1 前言
上一章节,主要介绍了RocketMQ的物理架构和逻辑概念。这一章主要列举其中的消息类型以及各自的应用场景,举例。
2 普通消息
2.1 发送
RocketMQ支持以下三种方式发送一条MQ:同步发送、异步发送、单向发送。各自优劣见下表:
发送方式 | 发送TPS | 发送结果反馈 | 可靠性 | 适用场景 |
---|---|---|---|---|
同步发送 | 快 | 有 | 不丢失 | 重要通知邮件、报名短信通知、营销短信系统等 |
异步可靠发送 | 快 | 有 | 不丢失 | 用户视频上传后通知启动转码服务,转码完成后通知推送转码结果等 |
单向发送 | 最快 | 无 | 可能丢失 | 适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集 |
2.2 消费
一般RocketMQ默认消费模式是集群消费
,另外还可选广播消费
模式。
2.2.1 集群消费
上一章节,提到了RocketMQ官方建议的消息订阅一致性要求。如果符合这个要求,相当于在集群消费模式下,对于同一个消费者组的相同Topic
,相同Tag
的消息,做到了消费端的负载均衡,该条消息只会被RocketMQ投递到一个Consumer Group
下面的一个实例。且消费进度
(Consumer Offset)的存储会持久化到Broker
。
实际上,每个 Consumer 是平均分摊 Message Queue
的去做拉取消费。例如某个 Topic
有 3 条 Q
,其中一个 Consumer Group 有 3 个实例
(可能是 3 个进程,或者 3 台机器),那么每个实例只消费其中的 1 条 Q
。
例:
public class SyncProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("please rename _unique_group_name")
producer.setNamesrvAddr("'localhost:9876");
producer.start();
for (int i = 0; i < 100; i++) {
Message msg = new Message(
"TopicTest",
"TagA",
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
//发送消息到一个Broker
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
//如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}
消费结果:
"C: \Program Files\Java\jdk1.8.0_101 \bin\java.exe"
15:11:52.433 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework
Consumer Started.
ConsumeMessageThread_1 Receive New Messages: [MessageExt [brokerName=broker-a, queueld=1, storeSize=201, queueOffset=17
ConsumeMessageThread_2 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=2, storeSize=201, queueOffset=17
ConsumeMessageThread_3 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=3, storeSize=201, queueOffset=17
ConsumeMessageThread_4 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=0, storeSize=201, queueOffset=17
2.2.2 广播消息
消息将对一个Consumer Group
下的各个Consumer 实例
都投递一遍。即即使这些Consumer 属于同一个Consumer Group,消息也会被Consumer Group 中的每个Consumer 都消费一次。
消费进度
(Consumer Offset)会存储持久化到实例本地
。同时,出现重复的概率稍大于集群模式
。另外,广播模式下服务端不维护消费进度,所以消息队列RocketMQ 控制台不支持消息堆积查询
、消息堆积报警
和订阅关系查询
功能。
例:
public class BroadcastComsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
consumer.setNamesrvAddr("localhost: 9876");
//Topic
consumer.subscribe("TopicTest", "*");
//广播模式消费
consumer.setMessageModel(MessageModel.BROADCASTING);
//注册回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@override
public ConsumeConcurrentlyStatufconsumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消息吉
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
3 顺序消息
顺序消费的原理解析,在默认
的情况下消息发送会采取Round Robin轮询
方式把消息发送到不同的 queue
(分区队列);
而消费消息的时候从多个 queue
上拉取消息,这种情况发送和消费是不能保证顺序的。但是如果控制发送的顺序消息只依次发送到同一个 queue
中,消费
的时候只从这个 queue
上依次拉取,则就保证了顺序。
- 当发送和消费参与的
queue
只有一个,则是全局有序; - 如果多个
queue
参与,则为分区有序
,即相对每个queue
,消息都是有序的。
public class ProducerInOrder {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("OrderProducer");
producer.setNamesrvAddr("106.55.246.66:9876");//106.55.246.66
producer.start();
String[] tags = new String[]{"TagA", "TagC", "TagD"};
// 订单列表
List<Order> orderList = new ProducerInOrder().buildOrders();
Date date = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String dateStr = sdf.format(date);
for (int i = 0; i < orderList.size(); i++) {
// 加个时间前缀
String body = dateStr + " Order:" + orderList.get(i);
Message msg = new Message("PartOrder", tags[i % tags.length], "KEY" + i, body.getBytes());
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Long id = (Long) arg; //根据订单id选择发送queue
long index = id % mqs.size();
return mqs.get((int) index);
}
}, orderList.get(i).getOrderId());//订单id
System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
sendResult.getSendStatus(),
sendResult.getMessageQueue().getQueueId(),
body));
}
producer.shutdown();
}
/**
* 订单
*/
private static class Order {
private long orderId;
private String desc;
public long getOrderId() {
return orderId;
}
public void setOrderId(long orderId) {
this.orderId = orderId;
}
public String getDesc() {
return desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
@Override
public String toString() {
return "Order{" +
"orderId=" + orderId +
", desc='" + desc + '\'' +
'}';
}
}
/**
* 生成模拟订单数据 3个订单 每个订单4个状态
*/
private List<Order> buildOrders() {
List<Order> orderList = new ArrayList<Order>();
Order orderDemo = new Order();
orderDemo.setOrderId(20210406001L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(20210406002L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(20210406001L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(20210406003L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(20210406002L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(20210406003L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(20210406002L);
orderDemo.setDesc("推送");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(20210406003L);
orderDemo.setDesc("推送");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(20210406002L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(20210406001L);
orderDemo.setDesc("推送");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(20210406003L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(20210406001L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
return orderList;
}
}
结果:
SendResult status: SEND_OK, queueId:1, body:2021-04-06 16:21:05 Order: Order{orderId=20210406001, desc='创建'}
SendResult status: SEND_OK, queueId:2, body:2021-04-06 16:21:05 Order: Order{orderId=20210406002, desc='创建'}
SendResult status: SEND_OK, queueId:1, body:2021-04-06 16:21:05 Order: Order{orderId=20210406001, desc='付款'}
SendResult status: SEND_OK, queueId:3, body:2021-04-06 16:21:05 Order: Order{orderId=20210406003, desc='创建'}
SendResult status: SEND_OK, queueId:2, body:2021-04-06 16:21:05 Order: Order{orderId=20210406002, desc='付款'}
SendResult status: SEND_OK, queueId:3, body:2021-04-06 16:21:05 Order: Order{orderId=20210406003, desc='付款'}
SendResult status: SEND_OK, queueId:2, body:2021-04-06 16:21:05 Order: Order{orderId=20210406002, desc='推送'}
SendResult status: SEND_OK, queueId:3, body:2021-04-06 16:21:05 Order: Order{orderId=20210406003, desc='推送'}
SendResult status: SEND_OK, queueId:2, body:2021-04-06 16:21:05 Order: Order{orderId=20210406002, desc='完成'}
SendResult status: SEND_OK, queueId:1, body:2021-04-06 16:21:05 Order: Order{orderId=20210406001, desc='推送'}
SendResult status: SEND_OK, queueId:3, body:2021-04-06 16:21:05 Order: Order{orderId=20210406003, desc='完成'}
SendResult status: SEND_OK, queueId:1, body:2021-04-06 16:21:05 Order: Order{orderId=20210406001, desc='完成'}
注意:消息消费失败时,不能返回
reconsume_later
, 而是suspend_current_queue_a_moment
,意思是先等一会,一会儿再处理这批消息,而不是放到重试队列里。
4 延迟消息
Producer
将消息发送到消息队列 RocketMQ 服务端
,但并不期望这条消息立马投递,而是延迟一定时间后
才投递到 Consumer
进行消费, 该消息即延时消息
。
4.1 适用场景
消息生产和消费有时间窗口要求:比如在电商交易中超时未支付关闭订单的场景
,在订单创建时会发送一条延时消息。这条消息将会在 30 分钟以后
投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付
。 如支付未完成,则关闭订单。如已完成支付则忽略。
注意,Apache RocketMQ 目前只支持
固定精度
的定时消息,分为18
个等级:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
,默认等级为3
,即10s
。
阿里云 RocketMQ(ONS
) 提供了任意时刻的定时消息功能。(要用啊,给钱买吧!)
4.2 举例
4.2.1 生产者
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
/**
* @author zyx
* 延时消息-生产者
*/
public class ScheduledMessageProducer {
public static void main(String[] args) throws Exception {
// 实例化一个生产者来产生延时消息
DefaultMQProducer producer = new DefaultMQProducer("ScheduledProducer");
// 设置NameServer的地址
producer.setNamesrvAddr("106.55.246.66:9876");
// 启动Producer实例
producer.start();
int totalMessagesToSend = 10;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("ScheduledTopic", ("Hello scheduled message " + i).getBytes());
// 设置延时等级3,这个消息将在10s之后投递给消费者(详看delayTimeLevel)
// delayTimeLevel:(1~18个等级)"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
message.setDelayTimeLevel(4);
// 发送消息
producer.send(message);
}
// 关闭生产者
producer.shutdown();
}
}
4.2.2 消费者
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
* @author zyx
* 延时消息-消费者
*/
public class ScheduledMessageConsumer {
public static void main(String[] args) throws Exception {
// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ScheduledConsumer");
// 指定Namesrv地址信息.
consumer.setNamesrvAddr("106.55.246.66:9876");
// 订阅Topics
consumer.subscribe("ScheduledTopic", "*");
// 注册消息监听者
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
for (MessageExt message : messages) {
// Print approximate delay time period
System.out.println("Receive message[msgId=" + message.getMsgId() + "] "
+ (message.getStoreTimestamp() - message.getBornTimestamp()) + "ms later");
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
}
}
结果:
"C:\Program Files\Java\jdk1.8.0_101\bin\java.exe"...
16:53:00.493 [main] DEBUG i.n.u.i.l. InternalLoggerFactory - Using SLF4J as the default logging framework
Receive message[msgId=7F0000010F6C18B4AAC21D5F55620001] 10002ms later
Receive message[msgId=7F0000010F6C18B4AAC21D5F557C0012] 10002ms later
Receive message[msgId=7F0000010F6C18B4AAC21D5F557D0013] 10001ms later
Receive message[msgId=7F0000010F6C18B4AAC21D5F557B0011] 10001ms later
Receive message[msgId=7F0000010F6C18B4AAC21D5F557A0010] 10000ms later
Receive message[msgId=7F0000010F6C18B4AAC21D5F5572000B] 10000ms later
Receive message[msgId=7F0000010F6C18B4AAC21D5F5575000D] 10002ms later
Receive message[msgId=7F0000010F6C18B4AAC21D5F5573000C] 10002ms later
Receive message[msgId=7F0000010F6C18B4AAC21D5F556B0006] 10002ms later
Receive message[msgId=7F0000010F6C18B4AAC21D5F5577000E] 10002ms later
Receive message[msgId=7F0000010F6C18B4AAC21D5F5578000F] 10001ms later
5 事务消息
其中分为两个流程:正常事务消息的发送及提交
、事务消息的补偿流程
。
5.1 正常事务流程
- 发送消息(half 消息):图中步骤 1。
- 服务端响应消息写入结果:图中步骤 2。
- 根据发送结果执行本地事务(如果写入失败,此时 half 消息对业务不可见,本地逻辑不执行):图中步骤 3。
- 根据本地事务状态执行 Commit 或者 Rollback(Commit 操作生成消息索引,消息对消费者可见):图中步骤 4。
5.2 事务补偿流程
- 对没有
Commit/Rollback
的事务消息(pending 状态的消息),从服务端发起一次“回查”:图中步骤 5。 Producer
收到回查消息,检查回查消息对应的本地事务的状态:图中步骤 6。- 根据本地事务状态,重新
Commit
或者Rollback
:图中步骤 6。
其中,补偿阶段
用于解决消息Commit
或者Rollback
发生超时
或者失败
的情况。
5.3 举例
5.3.1 生产者
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author zyx
* 事务消息-消息发送方
*/
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
//创建事务监听器
TransactionListener transactionListener = new TransactionListenerImpl();
//创建消息生产者
TransactionMQProducer producer = new TransactionMQProducer("TransactionProducer");
// 设置NameServer的地址
producer.setNamesrvAddr("106.55.246.66:9876");
//创建线程池
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
//设置生产者回查线程池
producer.setExecutorService(executorService);
//生产者设置监听器
producer.setTransactionListener(transactionListener);
//启动消息生产者
producer.start();
String[] tags = new String[]{"TagA", "TagB", "TagC"};
for (int i = 0; i < 3; i++) {
try {
Message msg =
new Message("TransactionTopic",
tags[i % tags.length],
"KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
//1,2步 半事务的发送,确认。
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(1000);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
}
如果消息的总长度可能大于 4MB
时,这时候最好把消息进行分割
。比如:
public class ListSplitter implements Iterator<List<Message>> {
private int sizeLimit = 1000 * 1000;//1M
private final List<Message> messages;
private int currIndex;
public ListSplitter(List<Message> messages) {
this.messages = messages;
}
@Override
public boolean hasNext() {
return currIndex < messages.size();
}
@Override
public List<Message> next() {
int nextIndex = currIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20; // 增加日志的开销20字节
if (tmpSize > sizeLimit) {
//单个消息超过了最大的限制(1M)
//忽略,否则会阻塞分裂的进程
if (nextIndex - currIndex == 0) {
//假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环
nextIndex++;
}
break;
}
if (tmpSize + totalSize > sizeLimit) {
break;
} else {
totalSize += tmpSize;
}
}
List<Message> subList = messages.subList(currIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
@Override
public void remove() {
throw new UnsupportedOperationException("Not allowed to remove");
}
}
运用此切分类,新的生产者如下:
public class SplitBatchProducer {
public static void main(String[] args) throws Exception {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("BatchProducer");
// 设置NameServer的地址
producer.setNamesrvAddr("106.55.246.66:9876");
// 启动Producer实例
producer.start();
//large batch
String topic = "BatchTest";
List<Message> messages = new ArrayList<>(100 * 1000);
//10万元素的数组
for (int i = 0; i < 100 * 1000; i++) {
messages.add(new Message(topic, "Tag", "OrderID" + i, ("Hello world " + i).getBytes()));
}
//把大的消息分裂成若干个小的消息(1M左右)
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
List<Message> listItem = splitter.next();
producer.send(listItem);
Thread.sleep(100);
}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
System.out.printf("Consumer Started.%n");
}
}
5.3.2 事务监听
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
public class TransactionListenerImpl implements TransactionListener {
//事务状态记录
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
/**
* 执行本地事务 3
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
System.out.println("执行本地事务");
int value = transactionIndex.getAndIncrement();
//0,1,2
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
//这里模拟的不进行步骤4 A系统不知道的--UNKNOWN
return LocalTransactionState.UNKNOW;
}
/**
* 检查本地事务状态 默认是60s,一分钟检查一次
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
//打印每次回查的时间
//设置日期格式
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// new Date()为获取当前系统时间
System.out.println("checkLocalTransaction:" + df.format(new Date()));
Integer status = localTrans.get(msg.getTransactionId());
if (null != status) {
switch (status) {
case 0:
System.out.println("MQ检查消息【" + msg.getTransactionId() + "】事务状态【中间状态】");
return LocalTransactionState.UNKNOW;
case 2:
System.out.println("MQ检查消息【" + msg.getTransactionId() + "】事务状态【回滚状态】");
return LocalTransactionState.ROLLBACK_MESSAGE;
default:
System.out.println("MQ检查消息【" + msg.getTransactionId() + "】事务状态【提交状态】");
return LocalTransactionState.COMMIT_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
6 批量消息
批量发送消息能显著提高传递小消息
的性能。限制
是这些批量消息应该有相同的 topic
,相同的 waitStoreMsgOK
,而且不能是延时消息
。此外,这一批消息的总大小不应超过 4MB
。
6.1 生产者
import java.util.ArrayList;
import java.util.List;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
/**
* @author zyx
* 批量消息-生产者 list不要超过4m
*/
public class BatchProducer {
public static void main(String[] args) throws Exception {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("BatchProducer");
// 设置NameServer的地址
producer.setNamesrvAddr("106.55.246.66:9876");
// 启动Producer实例
producer.start();
String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));
try {
producer.send(messages);
} catch (Exception e) {
producer.shutdown();
e.printStackTrace();
}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}
6.2 消费者
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import java.util.List;
/**
* @author zyx
* 批量消息-消费者
*/
public class BatchConsumer {
public static void main(String[] args) throws Exception {
// 实例化消息生产者,指定组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("BatchComsuer");
// 指定Namesrv地址信息.
consumer.setNamesrvAddr("106.55.246.66:9876");
// 订阅Topic
consumer.subscribe("BatchTest", "*");
//负载均衡模式消费
consumer.setMessageModel(MessageModel.CLUSTERING);
// 注册回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n",
Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消息者
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
7 过滤消息
上一章节3.1.2节有介绍,参阅3.1.2节。
注意:上述所有例子均基于版本4.8.0,5.x版本可能会有所不同,请知悉。