【消息队列开发】 实现消息持久化

曾经终败给现在 2024-04-17 10:53 146阅读 0赞

文章目录

  • ?前言
  • ?消息存储格式设计
    • ?queue_data文件设计
    • ?queue_stat文件设计
    • ?拓展
  • ?实现统计文件(queue_stat)的读写
  • ⭕总结

?前言

本次开发目标,实现消息持久化
在这里插入图片描述

?消息存储格式设计

在前面最开始博主的设计就为将消息存储在硬盘上,那么我们应该以怎样的格式存储在硬盘上面呢?

我们的消息是需要依附于队列的,因此我们在存储的时候,可以将消息按照队列的维度展开。

在前面数据创建的时候,我们有了一个data目录(meta.db就在这个目录中)

我们可以在data中创建一些子目录,子目录的名字就是队列名。
在这里插入图片描述
然后我们再每个队列的子目录下,再分配两个文件,来存储消息

  1. 第一个文件:queue_data这里保存消息的内容
  2. 第二个文件:queue_stat 这里用来保存消息的统计信息(具体统计什么信息,后买你会说到)

?queue_data文件设计

关于queue_data.txt文件的设计

我是这样设计的,由于queue_data文件是一个二进制格式的文件。

所以我做出一下约定,这个文件包含若干个消息,每个消息都已二进制的方式进行存储。那么每个消息约定有以下几部分组成

在这里插入图片描述
而相应的二进制数据模块又由以下几部分组成

在这里插入图片描述

起初在设计消息类的时候,还涉及两个元素offsetBeg,offsetEnd,没有让它们序列化
在这里插入图片描述
此时呢,这两个属性就不会跟着Message进入硬盘中。而是在内存中进行存储,方便随时找到内存中的Message对象,就能找到对应的Message对象了

如此以来我们的queue_data文件设计就完成了

?queue_stat文件设计

那么queue_stat文件是用来干嘛的呢?

再上面的queue_data文件中有一个属性叫isValid,代表的是 是否持久化,也就是需要进行删除

对于Broker Server来说消息是需要新增,也需要删除的.

生产者生产-一个消息过来,就得新增这个消息

消费者把这个消息消费掉,这个消息就得删除.

新增和删除,对于内存中来说,好办 (直接使用一一些集合类)

但是在文件。上就麻烦了。新增消息可以直接把新的消息追加到文件末尾

删除消息不好搞文件可以视为是一个”顺序表”这样的结构如果直接

删除中间元素,就需要涉及到类似于”顺序表搬运”,这样的操作,效率是非常低的

因此,使用这种搬运的方式删除,是不合适的

因此我们使用逻辑删除,是比较合适的

  • isValid为1,有效
  • isValid为0,无效

但是呢,随着时间的推移,这些文件可能越来越大,并且可能大部分都是无效消息,针对这种情况,就需要考虑对当前文件进行垃圾回收

关于垃圾回收,博主这里使用的复制算法

我们只需要直接遍历原有的消息数据文件,把所有的有效的数据拷贝到一个新的文件中,再把之前的整个旧文件都删除就好了

那我们什么时候执行垃圾回收呢?

此处我们这里做出这样的约定,当总数目超过2000(这个可以随意定义),并且有效消息的数目低于总消息数目的50%(随意定义),就触发垃圾回收

如此以来,我们queue_stat文件的作用,就体现出来,

  • 用来记录总消息数目与有效消息数目

queue_stat文件定义为文本格式,只存一行数据,一行有两列:

  • 第一列 是总的消息数目
  • 第二列 是有效消息数目
  • 两者之间使用 \t 分割
  • 形如2000\t1888

如此以来,queue_stat文件也就此完成了

?拓展

如果整个队列中,消息特别特别多,而且都是有效消息

此时就会导致整个消息的数据库文件特别大,后续针对这个文件的操作,成本也会上移

假如这个文件非常大(10G),触发一次GC,整体耗时就会非常高

虽然博主这里没有解决该问题,但是可以提供以下思路

  1. 需要专门的数据结构,来存储当前队列中有多少个数据文件,每个文件大小是多少,消息数目是多少,无效消息是多少.
  2. 设计策略,什么时候触发文件的拆分.什么时候触发文件的合并

?实现统计文件(queue_stat)的读写

统计文件读写来说相对较为简单,所以这里就不进行讲解了,相关注解已包含再代码中,实现代码如下:

  1. // 定义一个内部类, 来表示该队列的统计信息
  2. // 有限考虑使用 static, 静态内部类.
  3. static public class Stat {
  4. // 此处直接定义成 public, 就不再搞 get set 方法了.
  5. // 对于这样的简单的类, 就直接使用成员, 类似于 C 的结构体了.
  6. public int totalCount; // 总消息数量
  7. public int validCount; // 有效消息数量
  8. }
  9. // 预定消息文件所在的目录和文件名
  10. // 这个方法, 用来获取到指定队列对应的消息文件所在路
  11. private String getQueueDir(String queuename) {
  12. return ".data" + queuename;
  13. }
  14. // 这个方法用来获取该队列的消息数据文件路径
  15. // 注意, 二进制文件, 使用 txt 作为后缀, 不太合适. txt 一般表示文本. 此处咱们也就不改.
  16. // .bin / .dat
  17. private String getQueueDataPath(String queueName) {
  18. return getQueueDir(queueName) + "/queue_data.txt";
  19. }
  20. // 这个方法用来获取该队列的消息统计文件路径
  21. private String getQueueStatPath(String queueName) {
  22. return getQueueDir(queueName) + "/queue_stat.txt";
  23. }
  24. private Stat readStat(String queuename) {
  25. // 由于当前的消息统计文件是文本文件, 可以直接使用 Scanner 来读取文件内容
  26. Stat stat = new Stat();
  27. try(InputStream inputStream = new FileInputStream(getQueueStatPath(queuename))) {
  28. Scanner scanner = new Scanner(inputStream);
  29. stat.totalCount = scanner.nextInt();
  30. stat.validCount = scanner.nextInt();
  31. return stat;
  32. } catch (IOException e) {
  33. e.printStackTrace();
  34. }
  35. return null;
  36. }
  37. private void writeStat(String queueName, Stat stat) {
  38. // 使用 PrintWrite 来写文件.
  39. // OutputStream 打开文件, 默认情况下, 会直接把原文件清空. 此时相当于新的数据覆盖了旧的.
  40. try(OutputStream outputStream = new FileOutputStream(getQueueStatPath(queueName))) {
  41. PrintWriter printWriter = new PrintWriter(outputStream);
  42. printWriter.print(stat.totalCount + "/t" + stat.validCount);
  43. printWriter.flush();
  44. } catch (IOException e) {
  45. e.printStackTrace();
  46. }
  47. }
  48. // 创建队列对应的文件和目录
  49. public void createQueueFiles(String queueName) throws IOException {
  50. // 1. 先创建队列对应的消息目录
  51. File baseDir = new File(getQueueDir(queueName));
  52. if (!baseDir.exists()) {
  53. // 不存在, 就创建这个目录
  54. boolean ok = baseDir.mkdirs();
  55. if (!ok) {
  56. throw new IOException("创建目录失败! baseDir=" + baseDir.getAbsolutePath());
  57. }
  58. }
  59. // 2. 创建队列数据文件
  60. File queueDataFile = new File(getQueueDataPath(queueName));
  61. if (!queueDataFile.exists()) {
  62. boolean ok = queueDataFile.createNewFile();
  63. if (!ok) {
  64. throw new IOException("创建文件失败! queueDataFile=" + queueDataFile.getAbsolutePath());
  65. }
  66. }
  67. // 3. 创建消息统计文件
  68. File queueStatFile = new File(getQueueStatPath(queueName));
  69. if (!queueStatFile.exists()) {
  70. boolean ok = queueStatFile.createNewFile();
  71. if (!ok) {
  72. throw new IOException("创建文件失败! queueStatFile=" + queueStatFile.getAbsolutePath());
  73. }
  74. }
  75. // 4. 给消息统计文件, 设定初始值. 0\t0
  76. Stat stat = new Stat();
  77. stat.totalCount = 0;
  78. stat.validCount = 0;
  79. writeStat(queueName, stat);
  80. }

除此之外呢,博主还提供了两个方法,作用分别为

  • 删除队列的文件和目录
  • 检查队列的目录和文件是否存在.

实现如下:

  1. public void destroyQueueFiles(String queueName) throws IOException {
  2. // 先删除里面的文件, 再删除目录.
  3. File queueDataFile = new File(getQueueDataPath(queueName));
  4. boolean ok1 = queueDataFile.delete();
  5. File queueStatFile = new File(getQueueStatPath(queueName));
  6. boolean ok2 = queueStatFile.delete();
  7. File baseDir = new File(getQueueDir(queueName));
  8. boolean ok3 = baseDir.delete();
  9. if (!ok1 || !ok2 || !ok3) {
  10. // 有任意一个删除失败, 都算整体删除失败.
  11. throw new IOException("删除队列目录和文件失败! baseDir=" + baseDir.getAbsolutePath());
  12. }
  13. }
  14. // 检查队列的目录和文件是否存在.
  15. // 比如后续有生产者给 broker server 生产消息了, 这个消息就可能需要记录到文件上(取决于消息是否要持久化)
  16. public boolean checkFilesExits(String queueName) {
  17. // 判定队列的数据文件和统计文件是否都存在!!
  18. File queueDataFile = new File(getQueueDataPath(queueName));
  19. if (!queueDataFile.exists()) {
  20. return false;
  21. }
  22. File queueStatFile = new File(getQueueStatPath(queueName));
  23. if (!queueStatFile.exists()) {
  24. return false;
  25. }
  26. return true;
  27. }

⭕总结

关于《【消息队列开发】 实现消息持久化》就讲解到这儿,感谢大家的支持,欢迎各位留言交流以及批评指正,如果文章对您有帮助或者觉得作者写的还不错可以点一下关注,点赞,收藏支持一下

发表评论

表情:
评论列表 (有 0 条评论,146人围观)

还没有评论,来说两句吧...

相关阅读

    相关 MQ持久消息

    ActiveMQ的另一个问题就是只要是软件就有可能挂掉,挂掉不可怕,怕的是挂掉之后把信息给丢了,所以本节分析一下几种持久化方式: 一、持久化为文件 ActiveMQ默认...