分布式系统设计模式 - 最低水位线(Low-Water Mark)

港控/mmm° 2022-12-09 14:58 140阅读 0赞

设计模式翻译自:https://martinfowler.com/articles/patterns-of-distributed-systems/low-watermark.html

最低水位线(Low-Water Mark)

最低水位线是指在 WAL(Write Ahead Log)预写日志这种设计模式中,标记在这个位置之前的日志可以被丢弃。

问题背景

WAL(Write Ahead Log)预写日志维护了对于存储的每次更新,随着时间不断增长,这个日志文件会变得无限大。Segmented Log 分割日志这种设计模式可以让我们每次只处理一个更小的文件,但是日志如果不清理,会无休止增长以至于硬盘被占满。

解决方案

最低水位线这种设计模式会告诉系统哪一部分的日志可以被删除了,即在最低水位线之前的所有日志可以被清理掉。一般的方式是,程序内有一个线程运行一个定时任务,不断地检查哪一部分的日志可以被清理并且删除这些日志文件。

  1. this.logCleaner = newLogCleaner(config);
  2. this.logCleaner.startup();

这里的 LogCleaner 可以用定时任务实现:

  1. public void startup() {
  2. scheduleLogCleaning();
  3. }
  4. private void scheduleLogCleaning() {
  5. singleThreadedExecutor.schedule(() -> {
  6. cleanLogs();
  7. }, config.getCleanTaskIntervalMs(), TimeUnit.MILLISECONDS);
  8. }

基于快照的最低水位线实现以及示例

大部分的分布式一致性系统(例如 Zookeeper(ZAB 简化 paxos协议),etcd(raft协议)),都实现了快照机制。在这种机制下,他们的存储引擎会定时的进行全量快照,并且记录下快照对应的日志位置,将这个位置作为最低水位线。

  1. //进行快照
  2. public SnapShot takeSnapshot() {
  3. //获取最近的日志id
  4. Long snapShotTakenAtLogIndex = wal.getLastLogEntryId();
  5. //利用这个日志 id 作为标识,生成快照
  6. return new SnapShot(serializeState(kv), snapShotTakenAtLogIndex);
  7. }

当生成了快照并成功存储到了磁盘上,对应的最低水位线将用来清理老的日志:

  1. //根据位置获取这个位置之前的所有日志文件
  2. List<WALSegment> getSegmentsBefore(Long snapshotIndex) {
  3. List<WALSegment> markedForDeletion = new ArrayList<>();
  4. List<WALSegment> sortedSavedSegments = wal.sortedSavedSegments;
  5. for (WALSegment sortedSavedSegment : sortedSavedSegments) {
  6. //如果这个日志文件的最新log id 小于快照位置,证明可以被清理掉
  7. if (sortedSavedSegment.getLastLogEntryId() < snapshotIndex) {
  8. markedForDeletion.add(sortedSavedSegment);
  9. }
  10. }
  11. return markedForDeletion;
  12. }

zookeeper 中的最低水位线实现

定时任务位于DatadirCleanupManagerstart方法:

  1. public void start() {
  2. //只启动一次
  3. if (PurgeTaskStatus.STARTED == purgeTaskStatus) {
  4. LOG.warn("Purge task is already running.");
  5. return;
  6. }
  7. //检查定时间隔有效性
  8. if (purgeInterval <= 0) {
  9. LOG.info("Purge task is not scheduled.");
  10. return;
  11. }
  12. //启动定时任务
  13. timer = new Timer("PurgeTask", true);
  14. TimerTask task = new PurgeTask(dataLogDir, snapDir,snapRetainCount);
  15. timer.scheduleAtFixedRate(task, 0, TimeUnit.HOURS.toMillis(purgeInterval));
  16. purgeTaskStatus = PurgeTaskStatus.STARTED;
  17. }

核心方法为PurgeTxnLogpurge方法:

  1. public static void purge(File dataDir, File snapDir, int num) throws IOException {
  2. //保留的snapshot数量不能超过3
  3. if (num < 3) {
  4. throw new IllegalArgumentException(COUNT_ERR_MSG);
  5. }
  6. FileTxnSnapLog txnLog = new FileTxnSnapLog(dataDir, snapDir);
  7. //统计文件数量
  8. List<File> snaps = txnLog.findNValidSnapshots(num);
  9. int numSnaps = snaps.size();
  10. if (numSnaps > 0) {
  11. //利用上一个文件的日志偏移,清理log文件和snapshot文件
  12. purgeOlderSnapshots(txnLog, snaps.get(numSnaps - 1));
  13. }
  14. }
  15. static void purgeOlderSnapshots(FileTxnSnapLog txnLog, File snapShot) {
  16. //名字包括开头的zxid,就是代表了日志位置
  17. final long leastZxidToBeRetain = Util.getZxidFromName(snapShot.getName(), PREFIX_SNAPSHOT);
  18. final Set<File> retainedTxnLogs = new HashSet<File>();
  19. retainedTxnLogs.addAll(Arrays.asList(txnLog.getSnapshotLogs(leastZxidToBeRetain)));
  20. class MyFileFilter implements FileFilter {
  21. private final String prefix;
  22. MyFileFilter(String prefix) {
  23. this.prefix = prefix;
  24. }
  25. public boolean accept(File f) {
  26. if (!f.getName().startsWith(prefix + ".")) {
  27. return false;
  28. }
  29. if (retainedTxnLogs.contains(f)) {
  30. return false;
  31. }
  32. long fZxid = Util.getZxidFromName(f.getName(), prefix);
  33. //根据文件名称代表的zxid,过滤出要删除的文件
  34. return fZxid < leastZxidToBeRetain;
  35. }
  36. }
  37. //筛选出符合条件的 log 文件和 snapshot 文件
  38. File[] logs = txnLog.getDataDir().listFiles(new MyFileFilter(PREFIX_LOG));
  39. List<File> files = new ArrayList<>();
  40. if (logs != null) {
  41. files.addAll(Arrays.asList(logs));
  42. }
  43. File[] snapshots = txnLog.getSnapDir().listFiles(new MyFileFilter(PREFIX_SNAPSHOT));
  44. if (snapshots != null) {
  45. files.addAll(Arrays.asList(snapshots));
  46. }
  47. //进行删除
  48. for (File f : files) {
  49. final String msg = String.format(
  50. "Removing file: %s\t%s",
  51. DateFormat.getDateTimeInstance().format(f.lastModified()),
  52. f.getPath());
  53. LOG.info(msg);
  54. System.out.println(msg);
  55. if (!f.delete()) {
  56. System.err.println("Failed to remove " + f.getPath());
  57. }
  58. }
  59. }

那么是什么时候 snapshot 呢?查看SyncRequestProcessorrun方法,这个方法时处理请求,处理请求的时候记录操作日志到 log 文件,同时在有需要进行 snapshot 的时候进行 snapshot:

  1. public void run() {
  2. try {
  3. //避免所有的server都同时进行snapshot
  4. resetSnapshotStats();
  5. lastFlushTime = Time.currentElapsedTime();
  6. while (true) {
  7. //获取请求代码省略
  8. // 请求操作纪录成功
  9. if (!si.isThrottled() && zks.getZKDatabase().append(si)) {
  10. //是否需要snapshot
  11. if (shouldSnapshot()) {
  12. //重置是否需要snapshot判断相关的统计
  13. resetSnapshotStats();
  14. //另起新文件
  15. zks.getZKDatabase().rollLog();
  16. //进行snapshot,先获取锁,保证只有一个进行中的snapshot
  17. if (!snapThreadMutex.tryAcquire()) {
  18. LOG.warn("Too busy to snap, skipping");
  19. } else {
  20. //异步snapshot
  21. new ZooKeeperThread("Snapshot Thread") {
  22. public void run() {
  23. try {
  24. zks.takeSnapshot();
  25. } catch (Exception e) {
  26. LOG.warn("Unexpected exception", e);
  27. } finally {
  28. //释放锁
  29. snapThreadMutex.release();
  30. }
  31. }
  32. }.start();
  33. }
  34. }
  35. }
  36. //省略其他
  37. }
  38. } catch (Throwable t) {
  39. handleException(this.getName(), t);
  40. }
  41. }

resetSnapshotStats()设置随机起始位,避免集群内所有实例同时进行 snapshot:

  1. private void resetSnapshotStats() {
  2. //生成随机roll,snapCount(默认100000)
  3. randRoll = ThreadLocalRandom.current().nextInt(snapCount / 2);
  4. //生成随机size,snapSizeInBytes(默认4GB)
  5. randSize = Math.abs(ThreadLocalRandom.current().nextLong() % (snapSizeInBytes / 2));
  6. }

shouldSnapshot()根据启动时设置的随机起始位以及配置,判断是否需要 snapshot

  1. private boolean shouldSnapshot() {
  2. //获取日志计数
  3. int logCount = zks.getZKDatabase().getTxnCount();
  4. //获取大小
  5. long logSize = zks.getZKDatabase().getTxnSize();
  6. //当日志个数大于snapCount(默认100000)/2 + 随机roll,或者日志大小大于snapSizeInBytes(默认4GB)/2+随机size
  7. return (logCount > (snapCount / 2 + randRoll))
  8. || (snapSizeInBytes > 0 && logSize > (snapSizeInBytes / 2 + randSize));
  9. }

``

基于时间的最低水位线实现与示例

在某些系统中,日志不是用来更新系统的状态,可以在一段时间之后删除,并且不用考虑任何子系统这个最低水位线之前的是否可以删除。例如,kafka 默认保留 7 天的 log,RocketMQ 默认保留 3 天的 commit log。

RocketMQ中最低水位线实现

DefaultMeesageStoreaddScheduleTask()方法中,定义了清理的定时任务:

  1. private void addScheduleTask() {
  2. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  3. @Override
  4. public void run() {
  5. DefaultMessageStore.this.cleanFilesPeriodically();
  6. }
  7. }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);
  8. //忽略其他定时任务
  9. }
  10. private void cleanFilesPeriodically() {
  11. //清理消息存储文件
  12. this.cleanCommitLogService.run();
  13. //清理消费队列文件
  14. this.cleanConsumeQueueService.run();
  15. }

我们这里只关心清理消息存储文件,即DefaultMessageStoredeleteExpiredFiles方法:

  1. private void deleteExpiredFiles() {
  2. int deleteCount = 0;
  3. //文件保留时间,就是文件最后一次更新时间到现在的时间间隔,如果超过了这个时间间隔,就认为可以被清理掉了
  4. long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
  5. //删除文件的间隔,每次清理可能不止删除一个文件,这个配置指定两个文件删除之间的最小间隔
  6. int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
  7. //清理文件时,可能文件被其他线程占用,例如读取消息,这时不能轻易删除
  8. //在第一次触发时,记录一个当前时间戳,当与当前时间间隔超过这个配置之后,强制删除
  9. int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
  10. //判断是否要删除的时间到了
  11. boolean timeup = this.isTimeToDelete();
  12. //判断磁盘空间是否还充足
  13. boolean spacefull = this.isSpaceToDelete();
  14. //是否是手工触发
  15. boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;
  16. //满足其一,就执行清理
  17. if (timeup || spacefull || manualDelete) {
  18. if (manualDelete)
  19. this.manualDeleteFileSeveralTimes--;
  20. boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;
  21. fileReservedTime *= 60 * 60 * 1000;
  22. //清理文件
  23. deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
  24. destroyMapedFileIntervalForcibly, cleanAtOnce);
  25. if (deleteCount > 0) {
  26. } else if (spacefull) {
  27. log.warn("disk space will be full soon, but delete file failed.");
  28. }
  29. }
  30. }

清理文件的代码MappedFiledeleteExpiredFileByTime方法:

  1. public int deleteExpiredFileByTime(final long expiredTime,
  2. final int deleteFilesInterval,
  3. final long intervalForcibly,
  4. final boolean cleanImmediately) {
  5. Object[] mfs = this.copyMappedFiles(0);
  6. if (null == mfs)
  7. return 0;
  8. //刨除最新的那个文件
  9. int mfsLength = mfs.length - 1;
  10. int deleteCount = 0;
  11. List<MappedFile> files = new ArrayList<MappedFile>();
  12. if (null != mfs) {
  13. for (int i = 0; i < mfsLength; i++) {
  14. MappedFile mappedFile = (MappedFile) mfs[i];
  15. long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;
  16. //如果超过了过期时间,或者需要立即清理
  17. if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {
  18. //关闭,清理并删除文件
  19. if (mappedFile.destroy(intervalForcibly)) {
  20. files.add(mappedFile);
  21. deleteCount++;
  22. if (files.size() >= DELETE_FILES_BATCH_MAX) {
  23. break;
  24. }
  25. //如果配置了删除文件时间间隔,则需要等待
  26. if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {
  27. try {
  28. Thread.sleep(deleteFilesInterval);
  29. } catch (InterruptedException e) {
  30. }
  31. }
  32. } else {
  33. break;
  34. }
  35. } else {
  36. //avoid deleting files in the middle
  37. break;
  38. }
  39. }
  40. }
  41. //从文件列表里面里将本次删除的文件剔除
  42. deleteExpiredFile(files);
  43. return deleteCount;
  44. }

发表评论

表情:
评论列表 (有 0 条评论,140人围观)

还没有评论,来说两句吧...

相关阅读

    相关 spark 水位线

    Spark 水位线是指在 Spark Streaming 程序中,用来标记数据的处理进度的一个概念。它表示已经处理过的数据的最大偏移量,也就是说,Spark Streaming