【RocketMQ】基本使用:Java操作RocketMQ(rocketmq-client)

短命女 2022-12-22 08:35 338阅读 0赞

1.引入依赖

  1. <dependency>
  2. <groupId>org.apache.rocketmq</groupId>
  3. <artifactId>rocketmq-client</artifactId>
  4. <version>4.5.2</version>
  5. </dependency>

2.Producer

  1. public class MyProducer {
  2. public static void main(String[] args) throws Exception{
  3. // 构造Producer时,必须指定groupId
  4. DefaultMQProducer producer = new DefaultMQProducer("my_producer_group");
  5. // 只用namesrv的地址就行,它会从namesrv上拿到broker的地址和topic信息
  6. producer.setNamesrvAddr("43.107.136.120:9876");
  7. producer.start();
  8. int num = 0;
  9. while (num < 20) {
  10. num++;
  11. /**
  12. * rocketmq封装了Message
  13. * String topic,
  14. * String tags, 标签(分类)---> 筛选
  15. * byte[] body
  16. */
  17. Message message = new Message("my_test_topic", "", ("hello rocketmq:" + num).getBytes());
  18. // 发送消息,拿到返回SendResult
  19. SendResult result = producer.send(message);
  20. System.out.println(result);
  21. }
  22. }
  23. }

启动并发送消成功后,返回的SendResult如下:

在这里插入图片描述

SendResult中,有一个sendStatus状态,表示消息的发送状态。一共有四种状态

  • FLUSH_DISK_TIMEOUT : 表示没有在规定时间内完成刷盘(需要Broker 的刷盘策Ill创立设置成 SYNC_FLUSH 才会报这个错误)
  • FLUSH_SLAVE_TIMEOUT :表示在主备方式下,并且Broker 被设置成SYNC_MASTER 方式,没有在设定时间内完成主从同步。
  • SLAVE_NOT_AVAILABLE : 这个状态产生的场景和FLUSH_SLAVE_TIMEOUT 类似, 表示在主备方式下,并且Broker 被设置成SYNC_MASTER ,但是没有找到被配置成Slave 的Broker 。
  • SEND OK :表示发送成功,发送成功的具体含义,比如消息是否已经被存储到磁盘?消息是否被同步到了Slave 上?消息在Slave 上是否被写入磁盘?需要结合所配置的刷盘策略、主从策略来定。这个状态还可以简单理解为,没有发生上面列出的三个问题状态就是SEND OK

在这里插入图片描述

3.Consumer

  1. public class MyConsumer {
  2. public static void main(String[] args) throws MQClientException {
  3. // 构造Consumer时,必须指定groupId
  4. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_consumer_group");
  5. consumer.setNamesrvAddr("39.105.136.112:9876"); // nameServer地址,用于获取broker、topic信息
  6. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  7. // 指定订阅的主题与tag,通过tag可以定制性消费(*表示全部tag)
  8. consumer.subscribe("my_test_topic", "*");
  9. // 异步消费
  10. consumer.registerMessageListener(new MessageListenerConcurrently() {
  11. @Override
  12. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
  13. ConsumeConcurrentlyContext context) {
  14. System.out.println("Receive Message:" + msgs);
  15. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 签收
  16. }
  17. });
  18. consumer.start();
  19. }
  20. }

收到消息的内容:

在这里插入图片描述

consumer Group:位于同一个consumer Group中的consumer实例

  • 和producer Group中的各个produer实例承担的角色类似
  • 同一个group中可以配置多个consumer,可以提高消费端的并发消费能力以及容灾
  • 和kafka一样,多个consumer会对消息做负载均衡,意味着同一个topic下的不同messageQueue会分发给同一个group中的不同consumer。
  • 同时,如果我们希望消息能够达到广播的目的,那么只需要把consumer加入到不同的group就行。

在这里插入图片描述

发表评论

表情:
评论列表 (有 0 条评论,338人围观)

还没有评论,来说两句吧...

相关阅读

    相关 java使用rocketMq

    RcoketMQ 是一款低延迟、高可靠、可伸缩、易于使用的消息中间件,号称消息中间件中的最强者,支持高并发,亿级的消息堆积能力,在高并发的电商,金融等业务场景中多有使用。其具有