1 消息中间件
属于分布式系统
中一个子系统,关注于数据的发送和接收,利用高效可靠的异步消息传递机制
对分布式系统中的其余各个子系统进行集成。
1.1 作用
解耦、异步、削峰、分发
2 物理架构
启动流程:
NameServer
先启动Broker
启动,向NameServer
注册生产者
先从NameServer
获取Broker服务地址列表
(或者集群),再根据负载均衡算法
选取一台Broker进行消息发送NameServer
与每台Broker服务器
保持长连接,并间隔30S
检测Broker是否存活,如果检测到Broker 宕机(使用心跳机制,如果检测超过120S
),则从路由注册表中将其移除消费者
先从NameServer
获取Broker服务地址列表
(或者集群),然后从Broker
中订阅消息,规则由Broker配置
决定
2.1 NameServer(服务注册中心)
NameServer
是整个 RocketMQ 的大脑
,它是 RocketMQ 的服务注册中心
,所以 RocketMQ 需要先启动 NameServer
,再启动 Rocket 中的 Broker
。
Broker 在启动时向所有 NameServer
注册(主要是服务器地址等),生产者
(或消费者
)在发送(收取)消息之前先从 NameServer
获取 Broker 服务器地址列表
,然后根据负载均衡算法
从列表中选择一台服务器进行消息发送。
NameServer
与每台 Broker 服务
保持长连接
,并间隔 30S
检查 Broker
是否存活,如果检测到 Broker
宕机,则从路由注册表中将其移除。这样就可以实现 RocketMQ 的高可用
。
NameServer
支持集群化部署,相互之间独立
,其他角色同时向多个NameServer
机器上报状态信息,从而达到热备份
的目的。
2.2 生产者(Producer)
也称为消息发布者
,负责生产并发送消息至 RocketMQ。
生产者通常被集成在业务系统中,将业务消息按照要求封装成Apache RocketMQ的消息
(Message)并发送至服务端。
生产者
和主题
的关系为多对多关系
,即同一个生产者可以向多个主题发送消息,对于平台类场景如果需要发送消息到多个主题,并不需要创建多个生产者;同一个主题也可以接收多个生产者的消息,以此可以实现生产者性能的水平扩展和容灾。
- 启动流程
- 创建消息生产者
producer
,并指定生产者组名
(3.x/4.x版本选配,5.x不需要配置,采用了匿名发送/生产) - 指定
Nameserver
地址 - 启动
producer
- 创建消息对象,指定
Topic
、Tag
和消息体
- 发送消息
- 关闭生产者
producer
- 创建消息生产者
2.3 消费者(Consumer)
也称为消息订阅者
,负责从 RocketMQ 接收并消费消息。
消费者通常被集成在业务系统中,从 Apache RocketMQ服务端
获取消息,并将消息转化成业务可理解的信息,供业务逻辑处理。
2.3.1 消费者分组
在 Apache RocketMQ 中,通过消费者分组内初始化多个消费者
实现消费性能的水平扩展
以及高可用容灾
。
在消费者分组中,统一定义以下消费行为,同一分组下的多个消费者将按照分组内统一的消费行为
和负载均衡策略
消费消息。
- 订阅关系:Apache RocketMQ 以
消费者分组
的粒度管理订阅关系,实现订阅关系的管理和追溯。 - 投递顺序性:Apache RocketMQ 的服务端将消息投递给消费者消费时,支持
顺序投递
和并发投递
,投递方式在消费者分组中统一配置。 - 消费重试策略:消费者消费消息失败时的重试策略,包括
重试次数
、死信队列
设置等。
2.3.2 启动流程
- 创建消费者
Consumer
,指定消费者组名
- 指定
Nameserver地址
- 订阅主题
Topic
和Tag
- 设置
回调函数
,处理消息 - 启动消费者
consumer
2.4 消息(Message)
生产或消费的数据,对于 RocketMQ 来说,消息就是字节数组
。
它是 Apache RocketMQ 中的最小数据传输单元
。生产者将业务数据的负载和拓展属性包装成消息发送到 Apache RocketMQ 服务端,服务端按照相关语义将消息投递到消费端进行消费。
常见的RocketMQ的消息有:
- 普通消息
- 顺序消息
- 延迟消息
- 事务消息
- 批量消息
- 过滤消息
2.5 主机(Broker)
Broker
是RocketMQ的核心
,大部分“重量级”工作都是由Broker
完成的,包括接收Producer发过来的消息
、处理Consumer的消费消息请求
、消息的持久化存储
、消息的HA机制
以及服务端过滤
功能等。 支持集群化部署
,主从架构
。
3 逻辑概念
3.1 分组(Group)
3.1.1 生产者分组
- 主要用于
事务消息
,若一个组内的某一生产者宕机,导致事务消息一直prepared
或者超时
,则选择这个组内的另一生产者进行消息发送。 3.x/4.x版本
才有生产者分组,在5.x
版本官方已经淡化,“对于历史版本服务端3.x和4.x版本,已经使用的生产者分组可以废弃无需再设置,且不会对当前业务产生影响。”
3.1.2 消费者分组
是 Apache RocketMQ 系统中承载多个消费行为一致
的消费者的负载均衡分组
。
官方建议:
- 按照业务合理拆分分组:Apache RocketMQ 的
消费者
和主题
是多对多的关系
,对于消费者分组的拆分设计,建议遵循以下原则:- 消费者的
投递顺序一致
:同一消费者分组下所有消费者的消费投递顺序是相同的,统一都是顺序投递或并发投递
,不同业务场景不能混用消费者分组。 - 消费者
业务类型一致
:一般消费者分组和主题对应,不同业务域对消息消费的要求不同,例如消息过滤属性、消费重试策略不同。因此,不同业务域主题的消费建议使用不同的消费者分组,避免一个消费者分组消费超过10个
主题。
- 消费者的
- 消费者分组管理尽量避免自动化机制:Apache RocketMQ 虽然提供了
自动创建消费者分组
的功能,但是建议仅在测试环境
使用,生产环境请勿打开,避免产生大量消费者分组,无法管理和回收,且浪费系统资源。 - 订阅关系一致:
同一个消费者组
下所有消费者实例所订阅的Topic
、Tag
必须完全一致。如果订阅关系(消费者分组名-Topic-Tag
)不一致,会导致消费消息紊乱,甚至消息丢失。
下图中,订阅的Topic一样,且过滤表达式
一致:
不一致情况:
- 同一
ConsumerGroup
下的Consumer实例
订阅的Topic
不同(3.x、4.x SDK适用)
在早期3.x/4.x
版本的SDK中,如下图所示,同一ConsumerGroup
下的三个Consumer
实例C1
、C2
和C3
分别订阅了TopicA
、TopicB
和TopicC
,订阅的Topic不一致,不符合订阅关系一致性原则。 - 同一
ConsumerGroup
下的Consumer
实例订阅的Topic
相同,但订阅的Tag
不一致
如下图所示,同一ConsumerGroup
下的三个Consumer
实例C1
、C2
和C3
分别都订阅了TopicA
,但是C1
订阅TopicA
的Tag
为Tag1
,C2
和C3
订阅的TopicA
的Tag
为Tag2
,订阅同一Topic的Tag不一致,不符合订阅关系一致性原则**。Tips:
过滤表达式:RocketMQ支持通过Tag/SQL
进行消息过滤。
上述几个图是通过Tag
进行过滤的。通过SQL
过滤支持以下几种方式:数值比较,比如:
>
,>=
,<
,<=
,BETWEEN
,=
;
字符比较,比如:=
,<>
,IN
;IS NULL
或IS NOT NULL
;
逻辑符号AND
,OR
,NOT
;
数值, 比如:123
,3.1415
;
字符, 比如:’abc’,必须用单引号
包裹起来;NULL
,特殊的常量
布尔值,TRUE
或者FALSE
.
举例,通过MessageSelector.bySql()
方法可以实现通过SQL
方式进行消息的过滤。
public class RocketMQReceiveTest {
public static void main(String[] args) throws MQClientException {
//1. 创建消息消费者, 指定消费者所属的组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myconsumer-group");
//2. 指定Nameserver地址
consumer.setNamesrvAddr("192.168.109.131:9876");
consumer.subscribe("SqlFilterTest", MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', TagB')) and (a is not null and a between 0 and 3)"));
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs, msgs.size());
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer started.");
}
}
Tips:
开启SQL过滤,需要修改Broker.conf
配置文件:enablePropertyFilter=true
,然后重启 Broker 服务。
3.2 主题(Topic)
主题
是 Apache RocketMQ 中消息传输和存储的顶层容器
,用于标识同一类业务逻辑的消息。
但主题是一个逻辑概念
,并不是实际的消息容器。主题内部由多个队列
组成,消息的存储和水平扩展能力最终是由队列
实现的;并且针对主题的所有约束和属性设置,最终也是通过主题内部的队列
来实现。
3.3 标签(Tag)
RocketMQ 支持在发送的时候给 Topic
打 Tag
,同一个 Topic 的消息虽然逻辑管理是一样的。但是消费 Topic 的时候,如果你消费订阅的时候指定的是 tagA
,那么 tagB
的消息将不会投递。
3.4 消息队列(Message Queue)
队列
是 Apache RocketMQ 中消息存储和传输的实际容器
,也是 Apache RocketMQ 消息的最小存储单元
。
一个 Topic
可有多个 MQ
,每一条MQ
均对应一个文件,这个文件存储了实际消息的索引信息。并且即使文件被删除,也能通过实际纯粹的消息文件(commit log)恢复回来。
3.5 偏移量(Offset)
RocketMQ 中,有很多 offset 的概念。一般我们只关心暴露到客户端的offset
。不指定的话,就是指 Message Queue
下面的offset
。
Message Queue
是无限长的数组。一条消息进来下标就会+1
,而这个数组的下标
就是 offset,Message Queue 中的max offset
表示消息的最大 offset;Consumer offset
可以理解为标记Consumer Group
在一条逻辑Message Queue
上,消息消费到哪里,即消费进度
。但从源码上看,这个数值是消费过的最新消费的消息 offset + 1
,即实际上表示的是下次拉取的 offset 位置。