博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RocketMQ顺序消息
阅读量:6245 次
发布时间:2019-06-22

本文共 2512 字,大约阅读时间需要 8 分钟。

rocketmq的顺序消息需要满足2点:

1.Producer端保证发送消息有序,且发送到同一个队列。

2.consumer端保证消费同一个队列。

 

生产端:

RocketMQ可以严格的保证消息有序。但这个顺序,不是全局顺序,只是分区(queue)顺序。要全局顺序只能一个分区。

但是同一条queue里面,RocketMQ的确是能保证FIFO的

确保消息放到同一个queue中,需要使用 MessageQueueSelector

列如:

String body = dateStr + " Hello RocketMQ " + orderList.get(i);Message msg = new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i, body.getBytes());//确保同一个订单号的数据放到同一个queue中SendResult sendResult = producer.send(msg, new MessageQueueSelector() {                    @Override                    public MessageQueue select(List
mqs, Message msg, Object arg) { Long id = (Long) arg; long index = id % mqs.size(); return mqs.get((int)index); } }, orderList.get(i).getOrderId());//订单id

 

消费端:

需要使用 MessageListenerOrderly 来消费数据。

MessageListenerOrderly与MessageListenerConcurrently区别

MessageListenerOrderly:有序消费,同一队列的消息同一时刻只能一个线程消费,可保证消息在同一队列严格有序消费  

MessageListenerConcurrently:并发消费

public class ConsumerInOrder {     public static void main(String[] args) throws MQClientException {        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");        consumer.setNamesrvAddr("10.11.11.11:9876;10.11.11.12:9876");        /**         * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
* 如果非第一次启动,那么按照上次消费的位置继续消费 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTestjjj", "TagA || TagC || TagD"); consumer.registerMessageListener(new MessageListenerOrderly() { Random random = new Random(); @Override public ConsumeOrderlyStatus consumeMessage(List
msgs, ConsumeOrderlyContext context) { context.setAutoCommit(true); System.out.print(Thread.currentThread().getName() + " Receive New Messages: " ); for (MessageExt msg: msgs) { System.out.println(msg + ", content:" + new String(msg.getBody())); } try { //模拟业务逻辑处理中... TimeUnit.SECONDS.sleep(random.nextInt(10)); } catch (Exception e) { e.printStackTrace(); } return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); }}

 

参考文档

 

转载于:https://www.cnblogs.com/fqybzhangji/p/11044119.html

你可能感兴趣的文章
用图帮你了解https的原理
查看>>
区块链如何改变AI
查看>>
HTML5/JavaScript UI控件Wijmo Enterprise 2018v2发布
查看>>
工业仪表盘控件Iocomp ActiveX常见问题(2):Visual Basic中的错误
查看>>
Docker下使用selenium+testng实现web自动化
查看>>
当执行npm时遇到的问题
查看>>
JAVA程序员面试30问(附带答案)
查看>>
Java性能调优攻略全分享,七步搞定!(附学习资料分享)
查看>>
企业级 SpringBoot 教程 (六)springboot整合mybatis
查看>>
程序员写了一段注释, 第二天惨被公司开除, 公司巧妙回怼
查看>>
8.eclipse 安装 lombook插件
查看>>
Maven项目中使用本地JAR包方案4
查看>>
如何利用XMind创建概念图
查看>>
ldap接触(3)之LDAP特定错误以及错误一览表
查看>>
Zookeeper的功能以及工作原理
查看>>
朝花夕拾之Oracle11g 表分区
查看>>
本分类说明 -- django
查看>>
Android Binder IPC分析
查看>>
mysql分隔字符串,并将分隔字符串作为新列
查看>>
图学java基础篇之集合
查看>>