1 Producer样例
/**
 * Minimal producer sample: starts a producer, sends one message, prints the send result,
 * then queries messages by key and prints every hit before shutting the producer down.
 */
public class TestProducer {
    /**
     * Runs the sample once. Any exception raised while sending or querying is printed
     * and does not prevent the producer from being shut down afterwards.
     */
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.start();
        try {
            sendThenQuery(producer);
        } catch (Exception e) {
            e.printStackTrace();
        }
        producer.shutdown();
    }

    /** Sends a single "Hello world" message and prints it back via a key query. */
    private static void sendThenQuery(DefaultMQProducer producer) throws Exception {
        Message outgoing = new Message("TopicTest1", "TagA", "key113",
            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
        SendResult outcome = producer.send(outgoing);
        System.out.printf("%s%n", outcome);

        // NOTE(review): the numeric arguments look like (maxNum = 10, begin = 0, end = now) - confirm against the API.
        QueryResult matches = producer.queryMessage("TopicTest1", "key113", 10, 0, System.currentTimeMillis());
        for (MessageExt found : matches.getMessageList()) {
            System.out.printf("%s%n", found);
        }
    }
}
2 启动流程
启动流程从producer.start()
开始。
//DefaultMQProducer#start
/**
 * Starts this producer: namespaces the producer group, starts the default
 * implementation and, when a trace dispatcher is configured, starts that too.
 * A failure to start the trace dispatcher is logged as a warning and not rethrown.
 *
 * @throws MQClientException if the underlying implementation fails to start
 */
@Override
public void start() throws MQClientException {
    // Apply the namespace to the configured producer group before start-up.
    final String namespacedGroup = withNamespace(this.producerGroup);
    this.setProducerGroup(namespacedGroup);

    // The real start-up work lives in DefaultMQProducerImpl.
    this.defaultMQProducerImpl.start();

    if (traceDispatcher == null) {
        return;
    }
    try {
        traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
    } catch (MQClientException e) {
        log.warn("trace dispatcher start failed ", e);
    }
}
DefaultMQProducerImpl
中的start()
方法是生产者启动的核心方法
,包括检查
、获取MQClientInstance实例
、启动
三个步骤。
/**
 * Excerpt of the producer implementation showing only the start-up path.
 */
public class DefaultMQProducerImpl implements MQProducerInner {
/**
 * Starts the producer according to its current service state.
 * From CREATE_JUST it validates the configuration, obtains the shared MQClientInstance,
 * registers this producer with it and (optionally) starts the instance, ending in RUNNING.
 * From RUNNING, START_FAILED or SHUTDOWN_ALREADY it throws.
 * After the switch it sends a heartbeat to all brokers and schedules a periodic scan of
 * expired requests.
 *
 * @param startFactory whether to also start the MQClientInstance obtained here
 * @throws MQClientException if the configuration is invalid, the producer group is already
 *                           registered, or the producer is not in the CREATE_JUST state
 */
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
// 1. The first call always lands here; per the original author's note serviceState defaults to CREATE_JUST.
case CREATE_JUST:
// Mark as failed up front; the state only becomes RUNNING once every step below has succeeded.
this.serviceState = ServiceState.START_FAILED;
// 2. Check that the producer group satisfies the naming requirements.
this.checkConfig();
// Every group except the client-internal one has its instance name switched to the PID (per the method name).
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
// 3. Get or create the MQClientInstance through the MQClientManager singleton.
// The same clientId maps to the same MQClientInstance.
// Author's note: MQClientInstance wraps RocketMQ's network API and is the only path through which
// producers and consumers talk to the NameServer and brokers.
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
// 4. Register this producer with the MQClientInstance, keyed by producer group.
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
// Registration was refused: roll the state back to CREATE_JUST before reporting the duplicate group.
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
// Seed the publish-info table with an empty entry under the create-topic key.
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
// Start the client instance when requested.
if (startFactory) {
// Delegates to the MQClientInstance API.
mQClientFactory.start();
}
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
this.serviceState = ServiceState.RUNNING;
break;
// Any of these states means start() was already attempted; starting again is rejected.
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
// Reached after a successful start (or an unlisted state): heartbeat every broker once.
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
// Scan the request-future table for expired requests: first run after 1000 * 3 ms, then every 1000 ms.
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
RequestFutureTable.scanExpiredRequest();
} catch (Throwable e) {
// Catch everything so one failed scan does not propagate out of the timer task.
log.error("scan RequestFutureTable exception", e);
}
}
}, 1000 * 3, 1000);
}
}
2.1 检查
/**
 * Validates the configured producer group before start-up.
 *
 * @throws MQClientException if Validators.checkGroup rejects the name, the name is null,
 *                           or it equals MixAll.DEFAULT_PRODUCER_GROUP
 */
private void checkConfig() throws MQClientException {
    final String group = this.defaultMQProducer.getProducerGroup();

    // 1. Per the original author's note: rejects blank names, names longer than 255 characters,
    //    and names with characters other than %, letters, digits, '-' and '_'.
    Validators.checkGroup(group);

    // 2. Explicit null check. The original author flags it as redundant with step 1;
    //    it is kept here so behavior is unchanged.
    if (group == null) {
        throw new MQClientException("producerGroup is null", null);
    }

    // 3. The built-in default group name is reserved and may not be used by applications.
    if (group.equals(MixAll.DEFAULT_PRODUCER_GROUP)) {
        throw new MQClientException(
            "producerGroup can not equal " + MixAll.DEFAULT_PRODUCER_GROUP + ", please specify another one.",
            null);
    }
}
2.2 获取MQClientInstance实例
/**
 * Returns the MQClientInstance registered under the client id built from {@code clientConfig},
 * creating and registering a new one when none exists yet.
 * If another caller registers an instance for the same id first, that earlier instance is returned.
 *
 * @param clientConfig configuration used to build the client id and, if needed, the new instance
 * @param rpcHook      hook handed to a newly created instance
 * @return the instance stored in the factory table for this client id
 */
public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
    // 1. Build the client id that keys the factory table.
    final String clientId = clientConfig.buildMQClientId();

    // 2. Reuse an already-registered instance when there is one.
    final MQClientInstance cached = this.factoryTable.get(clientId);
    if (cached != null) {
        return cached;
    }

    // 3. Otherwise create one and publish it atomically; putIfAbsent resolves concurrent creation.
    final MQClientInstance created = new MQClientInstance(clientConfig.cloneClientConfig(),
        this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
    final MQClientInstance earlier = this.factoryTable.putIfAbsent(clientId, created);
    if (earlier == null) {
        log.info("Created new MQClientInstance for clientId:[{}]", clientId);
        return created;
    }
    log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
    return earlier;
}
这里,clientID
与MQClientInstance
一一对应,保存在一个ConcurrentHashMap当中。
// MQClientManager
// Maps each clientId to its MQClientInstance; read and populated by getOrCreateMQClientInstance.
private ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable = new ConcurrentHashMap<String, MQClientInstance>();
另外,创建完MQClientInstance
实例,还要把该Producer注册进实例当中,key
为生产者组名。
// 4. Register this producer with the MQClientInstance, keyed by its producer group name.
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
2.3 启动
通过MQClientInstance
启动。
/**
 * Starts this client instance under its own monitor.
 * From CREATE_JUST it starts the remoting channel, the scheduled tasks, the pull and rebalance
 * services and the internal producer, then moves to RUNNING.
 * From START_FAILED it throws; any other state falls through to default and does nothing.
 *
 * @throws MQClientException if a previous start attempt left the instance in START_FAILED
 */
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
// Mark as failed first; only a fully successful sequence below flips the state to RUNNING.
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel
// Author's note: mQClientAPIImpl is essentially a proxy around NettyRemotingClient.
this.mQClientAPIImpl.start();
// Start various schedule tasks
this.startScheduledTask();
// Start pull service
this.pullMessageService.start();
// Start rebalance service
this.rebalanceService.start();
// Start push service
// Passes false so DefaultMQProducerImpl.start(boolean) skips starting the client factory again.
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
3 消息发送流程
DefaultMQProducerImpl
中的sendDefaultImpl()
是生产者消息发送的核心方法
。
我们从开头给的例子中send()方法出发。
// Entry point of the send path: the same call the producer sample makes.
SendResult sendResult = producer.send(msg);
依次经过DefaultMQProducer#send
、DefaultMQProducerImpl#send(Message)
、DefaultMQProducerImpl#send(Message, long)
、DefaultMQProducerImpl#sendDefaultImpl()
。
主要分为四个步骤:
- 验证消息
- 查找路由
- 选择队列
- 消息发送
/**
 * Excerpt of the producer implementation showing the core send routine.
 * Parts of the method are omitted in this excerpt (marked below), so the braces do not balance.
 */
public class DefaultMQProducerImpl implements MQProducerInner {
/**
 * Core send routine: validates the message, looks up the topic route, then repeatedly selects
 * a queue and sends until an attempt succeeds, the attempts are used up, or the time budget is spent.
 *
 * @param msg               message to send
 * @param communicationMode SYNC, ASYNC or ONEWAY; only SYNC gets more than one attempt
 * @param sendCallback      callback forwarded to sendKernelImpl
 * @param timeout           overall budget in milliseconds, measured from the first attempt
 * @return the SendResult for SYNC sends; null for ASYNC and ONEWAY
 * @throws MQClientException when validation fails or no route is available for the topic
 */
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.makeSureStateOK();
// 1. Validate the message.
Validators.checkMessage(msg, this.defaultMQProducer);
final long invokeID = random.nextLong();
// beginTimestampFirst anchors the overall timeout; beginTimestampPrev marks the start of the current attempt.
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
long endTimestamp = beginTimestampFirst;
// 2. Look up the route for the topic.
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
boolean callTimeout = false;
MessageQueue mq = null;
Exception exception = null;
SendResult sendResult = null;
// SYNC sends get one attempt plus the configured retries; every other mode gets exactly one attempt.
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
// Records the broker used by each attempt.
String[] brokersSent = new String[timesTotal];
for (; times < timesTotal; times++) {
// Broker of the previous attempt (null on the first one), passed to queue selection.
String lastBrokerName = null == mq ? null : mq.getBrokerName();
// 3. Select a queue.
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (mqSelected != null) {
mq = mqSelected;
brokersSent[times] = mq.getBrokerName();
try {
beginTimestampPrev = System.currentTimeMillis();
if (times > 0) {
//Reset topic with namespace during resend.
msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
}
// Time already consumed by earlier attempts; stop once it exceeds the overall budget.
long costTime = beginTimestampPrev - beginTimestampFirst;
if (timeout < costTime) {
callTimeout = true;
break;
}
// 4. Send the message with the remaining time budget.
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
endTimestamp = System.currentTimeMillis();
// Record this attempt's latency for the broker (isolation = false).
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
switch (communicationMode) {
case ASYNC:
return null;
case ONEWAY:
return null;
case SYNC:
// A non-SEND_OK result moves on to the next attempt only when retry-another-broker is enabled;
// otherwise the result is returned as is.
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
continue;
}
}
return sendResult;
default:
break;
}
}catch (Exception e){
// ... exception handling omitted in this excerpt
}
} else {
// No queue could be selected: stop attempting.
break;
}
}
// ... post-loop handling omitted in this excerpt
validateNameServerSetting();
throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}
}
3.1 验证消息
/**
 * Excerpt of the validation helpers, showing only message validation.
 */
public class Validators {
    /**
     * Validates a message before it is sent.
     *
     * @param msg               message to validate
     * @param defaultMQProducer producer whose maximum message size bounds the body length
     * @throws MQClientException if the message is null, its topic is rejected, or its body is
     *                           null, empty or larger than the producer's maximum message size
     */
    public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) throws MQClientException {
        // 1. The message itself must exist.
        if (msg == null) {
            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
        }

        // 2. The topic must be well-formed.
        // 3. Per the original author's note, the topic must not be one that clients may not send to.
        final String topic = msg.getTopic();
        Validators.checkTopic(topic);
        Validators.isNotAllowedSendTopic(topic);

        // 4. The body must be present, non-empty and within the size limit.
        final byte[] body = msg.getBody();
        if (body == null) {
            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
        }
        if (body.length == 0) {
            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
        }
        final int maxSize = defaultMQProducer.getMaxMessageSize();
        if (body.length > maxSize) {
            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
                "the message body size over max value, MAX: " + maxSize);
        }
    }
}
3.2 查找路由
/**
 * Resolves the publish info (route) for a topic, refreshing it from the name server when the
 * locally cached entry is missing or not usable.
 *
 * @param topic topic to resolve
 * @return the entry held in the publish-info table after any refresh
 */
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    // 1. Try the local cache first.
    TopicPublishInfo info = this.topicPublishInfoTable.get(topic);

    // 2. Missing or unusable: make sure a placeholder exists, then refresh from the name server.
    final boolean usable = info != null && info.ok();
    if (!usable) {
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        // Per the original author's note, this call updates the cached TopicPublishInfo.
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        info = this.topicPublishInfoTable.get(topic);
    }

    if (info.isHaveTopicRouterInfo() || info.ok()) {
        return info;
    }

    // 3. Still nothing usable: refresh once more through the overload that also takes the producer.
    // NOTE(review): the `true` flag presumably selects the default-topic route - confirm against the callee.
    this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
    return this.topicPublishInfoTable.get(topic);
}
3.3 选择队列
/**
 * Excerpt of the topic publish info, showing only round-robin queue selection.
 * Per the original author's note, this is the path used when the broker fault-latency
 * mechanism is not enabled, which is the default.
 */
public class TopicPublishInfo {
    /**
     * Picks the next queue in rotation, preferring one that is not on {@code lastBrokerName}.
     *
     * @param lastBrokerName broker used by the previous attempt, or null for a first attempt
     * @return a queue on another broker when the rotation finds one within one pass over the
     *         list; otherwise simply the next queue in rotation
     */
    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        if (lastBrokerName != null) {
            // Retry: walk the rotation for at most one full pass, skipping the previous broker.
            for (int attempt = 0; attempt < this.messageQueueList.size(); attempt++) {
                final MessageQueue candidate = nextInRotation();
                if (!candidate.getBrokerName().equals(lastBrokerName)) {
                    return candidate;
                }
            }
        }
        // First attempt, or every queue seen was on the previous broker: plain rotation.
        return selectOneMessageQueue();
    }

    /**
     * Picks the next queue in rotation.
     *
     * @return the queue at the rotation counter modulo the queue count
     */
    public MessageQueue selectOneMessageQueue() {
        return nextInRotation();
    }

    /** Advances the rotation counter and maps it onto an index of the queue list. */
    private MessageQueue nextInRotation() {
        final int counter = this.sendWhichQueue.getAndIncrement();
        int slot = Math.abs(counter) % this.messageQueueList.size();
        // Math.abs(Integer.MIN_VALUE) stays negative, so the remainder can be negative too.
        if (slot < 0) {
            slot = 0;
        }
        return this.messageQueueList.get(slot);
    }
}
3.3.1 选择策略
3.3.1.1 默认策略-轮询
采用了最简单的轮询算法
,这种算法有个很好的特性就是,保证每一个Queue队列的消息投递数量尽可能均匀
。
代码见3.3节:
selectOneMessageQueue()
。
有一个注意的地方就是计数器
使用了线程的ThreadLocal
:
// Rotation counter consumed by selectOneMessageQueue(); backed by ThreadLocalIndex.
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
/**
 * Counter helper that keeps its index in a ThreadLocal.
 * Method bodies are omitted in this excerpt.
 */
public class ThreadLocalIndex {
// Per-thread storage for the index value.
private final ThreadLocal<Integer> threadLocalIndex = new ThreadLocal<Integer>();
// NOTE(review): presumably supplies a thread's initial index - the body is elided, confirm against the full source.
private final Random random = new Random();
// Returns an index and advances it (per the name); implementation omitted in this excerpt.
public int getAndIncrement() {
//..............
}
// String form of this counter; implementation omitted in this excerpt.
@Override
public String toString() {
//..............
}
}
因为本身消息的生产就可以多线程进行,所以当然要基于线程的上下文
来计数递增。
3.3.1.2 带有规避机制的策略
默认的投递方式比较简单,但是也暴露了一个问题:
有些Queue队列可能由于自身消息积压等原因,投递耗时比较长,继续向这样的Queue队列投递会影响后续投递的效果。
基于这种现象,RocketMQ在每发送一个MQ消息后,都会统计
一下消息投递的时间延迟
,根据这个时间延迟,可以知道往哪些Queue队列投递的速度快。
在这种场景下,会优先使用消息投递延迟最小的策略,如果没有生效,再使用Queue队列轮询的方式。
统计一下消息投递的时间延迟:org.apache.rocketmq.client.latency.MQFaultStrategy#updateFaultItem
的实现。
//MQFaultStrategy
/**
 * Excerpt of the fault strategy, showing how a send latency becomes a broker "not available" window.
 */
public class MQFaultStrategy {
    /**
     * Records the outcome of a send for a broker. Does nothing unless send-latency fault
     * tolerance is enabled.
     *
     * @param brokerName     broker the message was sent to
     * @param currentLatency measured send latency, stored as is
     * @param isolation      when true the not-available window is derived from a fixed 30000
     *                       instead of the measured latency
     */
    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
        if (!this.sendLatencyFaultEnable) {
            return;
        }
        final long latencyForLookup = isolation ? 30000 : currentLatency;
        final long duration = computeNotAvailableDuration(latencyForLookup);
        this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
    }

    /**
     * Maps a latency onto a not-available duration by scanning the thresholds from the largest
     * down and returning the duration paired with the first threshold the latency reaches.
     *
     * <p>Per the original author's note, the tables are
     * latencyMax: 50, 100, 550, 1000, 2000, 3000, 15000 and
     * notAvailableDuration: 0, 0, 30000, 60000, 120000, 180000, 600000, so for example
     * a 100 ms send yields 0 ms, a 600 ms send yields 30000 ms and a 4000 ms send yields 180000 ms.
     *
     * @param currentLatency latency to classify
     * @return the not-available duration for that latency, or 0 when it is below every threshold
     */
    private long computeNotAvailableDuration(final long currentLatency) {
        int level = latencyMax.length;
        while (--level >= 0) {
            if (latencyMax[level] <= currentLatency) {
                return this.notAvailableDuration[level];
            }
        }
        return 0;
    }
}
这里的操作大概如下:
根据消息发送时长(currentLatency),计算broker不可用时长(duration),即如果消息发送时间越久,mq会认为broker不可用的时长越久,broker不可用时长是个
经验值
,如果传入isolation为true,表示默认当前发送时长为30000ms,即broker不可用时长为600000ms。为啥broker不可用是600s?
发送延时latencyMax数组:50L,100L,550L,1000L,2000L,3000L,15000L
故障规避notAvailableDuration数组:0L,0L,30000L,60000L,120000L,180000L,600000L,根据代码可得:
- 假如:消息发送时长为100ms,则mq认为broker不可用的时长为0ms;
- 假如:消息发送时长为600ms,则mq认为broker不可用的时长为30000ms;
- 假如:消息发送时长为4000ms,则mq认为broker不可用的时长为180000ms;
- 现在,消息发送时长为30000ms,则对应的broker不可用时长为600000ms,即600s,10分钟。
调用
latencyFaultTolerance.updateFaultItem
更新broker异常容错信息。
这个方法最终会往一个ConcurrentHashMap
表中写每台broker的延时、key
是brokerName
,value
是FaultItem
,包含broker的发送时长
、恢复可用的时间点
。
broker的预计恢复正常时间为:当前时间
+ 不可用时长
,即System.currentTimeMillis() + notAvailableDuration
。
/**
 * Excerpt of the latency fault-tolerance store, showing how per-broker fault items are written.
 */
public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> {
    // Fault items keyed by broker name; each holds the last latency and the time the broker is usable again.
    private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<String, FaultItem>(16);
    private final ThreadLocalIndex whichItemWorst = new ThreadLocalIndex();

    /**
     * Creates or refreshes the fault item of a broker.
     *
     * @param name                 broker name, used as the table key
     * @param currentLatency       latency to store on the item
     * @param notAvailableDuration added to the current time to get the point at which the
     *                             broker is considered usable again
     */
    @Override
    public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
        FaultItem existing = this.faultItemTable.get(name);
        if (existing == null) {
            final FaultItem fresh = new FaultItem(name);
            fresh.setCurrentLatency(currentLatency);
            // Point in time at which the broker becomes usable again.
            fresh.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);

            existing = this.faultItemTable.putIfAbsent(name, fresh);
            if (existing == null) {
                // The fresh item was stored; nothing left to refresh.
                return;
            }
            // Another caller stored an item first: fall through and refresh that one.
        }
        existing.setCurrentLatency(currentLatency);
        existing.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
    }
}
因此LatencyFaultToleranceImpl#isAvailable
判断broker是否预计可用的实现也很清晰了:
只要
当前时间>=startTimestamp
,即表示该broker正常了(逻辑意义上的正常,预计broker会在这个时间点后恢复正常)。
//LatencyFaultToleranceImpl
/**
 * Tells whether a broker is currently considered usable.
 *
 * @param name broker name
 * @return true when no fault item is recorded for the broker, otherwise the item's own verdict
 */
@Override
public boolean isAvailable(final String name) {
    final FaultItem entry = this.faultItemTable.get(name);
    return entry == null || entry.isAvailable();
}
//FaultItem
/**
 * @return true once the current time has reached {@code startTimestamp}
 */
public boolean isAvailable() {
    final long sinceRecovery = System.currentTimeMillis() - startTimestamp;
    return sinceRecovery >= 0;
}
总结下带有故障规避策略的整体实现思路:
- 每次消息发送后(包括发送失败时),mq根据消息发送耗时来
预测该broker不可用的时长
,并将broker名称
,发送时长
及预计恢复时间点
,存储于ConcurrentHashMap
类型的faultItemTable
中; - 在开启消息容错后,选择消息队列时,会根据
当前时间
与FaultItem中该broker的预计恢复时间点
做比较,若(System.currentTimeMillis() - startTimestamp) >= 0
,则预计该broker恢复正常,选择该broker的消息队列; - 若所有的broker都预计不可用,
随机选择
一个不可用的broker,从路由信息中选择下一个消息队列,重置其brokerName,queueId,进行消息发送。
3.3.1.3 选择队列策略的对比
在默认队列选择机制
下,会随机选择
一个MessageQueue,若发送失败,轮询
队列重新进行重试发送(屏蔽单次发送中不可用的broker),同步模式下默认失败时重试发送2次
,但它的问题就是消息发送很大可能再次失败,引发再次重复失败,带来不必要的性能损耗。
在开启故障延迟机制
后,消息队列选择时,会在一段时间内过滤掉RocketMQ认为不可用的broker,以此来避免不断向宕机的broker发送消息,从而实现消息发送高可用
。
这两个策略没有绝对的好与坏,个人认为,如果在工作中选择,应该是看
网络环境
和服务器的环境
。
- 如果是网络和服务器环境比较好,那么我推荐
默认策略
,毕竟重试的次数和几率比较小。- 如果是网络和服务器环境压力比较大,推荐使用
故障延迟机制
。
3.4 消息发送
会经历很多方法,但最终还是会调用NettyRemotingClient
中的发送方法,选择同步发送
、异步发送
、oneway
发送其中之一。2.3节提到过,mQClientAPIImpl
其实是个NettyRemotingClient
代理。前者封装了很多API供生产者和消费者调用。