1 前言
回顾下事务消息流程:
其中分为两个流程:正常事务消息的发送及提交
、事务消息的补偿流程
。
1.1 正常事务流程
- 发送消息(half 消息):图中步骤 1。
- 服务端响应消息写入结果:图中步骤 2。
- 根据发送结果执行本地事务(如果写入失败,此时 half 消息对业务不可见,本地逻辑不执行):图中步骤 3。
- 根据本地事务状态执行 Commit 或者 Rollback(Commit 操作生成消息索引,消息对消费者可见):图中步骤 4。
1.2 事务补偿流程
- 对没有
Commit/Rollback
的事务消息(pending 状态的消息),从服务端发起一次“回查”:图中步骤 5。 Producer
收到回查消息,检查回查消息对应的本地事务的状态:图中步骤 6。- 根据本地事务状态,重新
Commit
或者Rollback
:图中步骤 6。
其中,补偿阶段
用于解决消息Commit
或者Rollback
发生超时
或者失败
的情况。
1.3 事务消息状态
事务消息共有三种状态:
CommitTransaction
:提交状态,它允许消费者消费此消息(完成图中了1,2,3,4步,第4步是Commit)。RollbackTransaction
:回滚状态,它代表该消息将被删除,不允许被消费(完成图中了1,2,3,4步, 第4步是Rollback)。Unknown
:中间状态,它代表需要检查消息队列来确定状态(完成图中了1,2,3步, 但是没有4或者没有7,无法Commit或Rollback)。
public enum LocalTransactionState {
//本地事务执行成功,给Broker发送一个commit的标识
COMMIT_MESSAGE,
//本地事务执行失败,给Broker发送一个回滚的标识
ROLLBACK_MESSAGE,
//未知,既不是成功,也不是失败,引发回查操作
UNKNOW,
}
2 例子
2.1 生产者
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
//创建事务监听器,就是2.2节的东东
TransactionListener transactionListener = new TransactionListenerImpl();
//创建消息生产者
TransactionMQProducer producer = new TransactionMQProducer("TransactionProducer");
// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
//创建线程池
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
//设置生产者回查线程池
producer.setExecutorService(executorService);
//生产者设置监听器,指定本地事务监听器
producer.setTransactionListener(transactionListener);
//启动消息生产者
producer.start();
String[] tags = new String[]{"TagA", "TagB", "TagC"};
for (int i = 0; i < 3; i++) {
try {
Message msg = new Message(
"TransactionTopic",
tags[i % tags.length],
"KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
//1,2步 半事务的发送,确认。
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(1000);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
//先不要停掉生产者,观察事务回调和回查
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
}
2.2 事务监听(消息回查)
public class TransactionListenerImpl implements TransactionListener {
//事务状态记录
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
/**
* 执行本地事务 3
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
System.out.println("执行本地事务");
int value = transactionIndex.getAndIncrement();
//0,1,2
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
//这里模拟的不进行步骤4 A系统不知道的--UNKNOWN,这个状态会引发回查动作
return LocalTransactionState.UNKNOW;
}
/**
* 检查本地事务状态 6, 默认是60s,一分钟检查一次
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
//打印每次回查的时间
//设置日期格式
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// new Date()为获取当前系统时间
System.out.println("checkLocalTransaction:" + df.format(new Date()));
Integer status = localTrans.get(msg.getTransactionId());
if (null != status) {
switch (status) {
case 0 -> {
System.out.println("MQ检查消息【" + msg.getTransactionId() + "】事务状态【中间状态】");
return LocalTransactionState.UNKNOW;
}
case 2 -> {
System.out.println("MQ检查消息【" + msg.getTransactionId() + "】事务状态【回滚状态】");
return LocalTransactionState.ROLLBACK_MESSAGE;
}
default -> {
System.out.println("MQ检查消息【" + msg.getTransactionId() + "】事务状态【提交状态】");
return LocalTransactionState.COMMIT_MESSAGE;
}
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
3 源码追踪
3.1 Producer启动
事务消息生产者TransactionMQProducer
继承自DefaultMQProducer
,大部分与普通消息生产者无异。
启动前,会初始化相关回查线程池、事务消息生产者:
//TransactionMQProducer
@Override
public void start() throws MQClientException {
//初始化:事务消息生产者、回查线程池、阻塞队列
this.defaultMQProducerImpl.initTransactionEnv();
//这里与普通消息生产者无异
super.start();
}
//DefaultMQProducerImpl
public void initTransactionEnv() {
TransactionMQProducer producer = (TransactionMQProducer) this.defaultMQProducer;
//生产者回查线程池
if (producer.getExecutorService() != null) {
this.checkExecutor = producer.getExecutorService();
} else {
//队列长度:2000
this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(producer.getCheckRequestHoldMax());
this.checkExecutor = new ThreadPoolExecutor(
//默认核心、最大线程数均为1个
producer.getCheckThreadPoolMinSize(),
producer.getCheckThreadPoolMaxSize(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.checkRequestQueue);
}
}
不难发现,它就是检查我们是否指定了线程池,如果没有指定,则MQ为我们指定一个。
然后是调用super.start()
方法,既然是调用父类的,那就是我们前面熟悉的普通消息的启动过程,这里就不在赘述了。
3.2 半消息发送(Send half message)
上述TransactionMQProducer
已经启动,接下来就要进行事务消息发送的第一步了,即半消息发送
:
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
定位至sendMessageInTransaction()
方法:
//TransactionMQProducer
@Override
public TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException {
if (null == this.transactionListener) {
throw new MQClientException("TransactionListener is null", null);
}
msg.setTopic(NamespaceUtil.wrapNamespace(this.getNamespace(), msg.getTopic()));
return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
}
实际最后调用的是DefaultMQProducerImpl#sendMessageInTransaction()
:
//DefaultMQProducerImpl
public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter localTransactionExecuter, final Object arg)
throws MQClientException {
//获取本地事务监听器
TransactionListener transactionListener = getCheckListener();
if (null == localTransactionExecuter && null == transactionListener) {
throw new MQClientException("tranExecutor is null", null);
}
// ignore DelayTimeLevel parameter
//事务消息不支持延迟
if (msg.getDelayTimeLevel() != 0) {
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
}
//校验消息
Validators.checkMessage(msg, this.defaultMQProducer);
SendResult sendResult = null;
//设置事务消息的标识
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
try {
//同步发送消息,这个消息的发送过程和普通消息一模一样,就不赘述了
//在Broker端将会根据属性判断是否为事务消息,从而做特殊处理
sendResult = this.send(msg);
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
switch (sendResult.getSendStatus()) {
case SEND_OK: {
try {
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
if (null != localTransactionExecuter) {
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
} else if (transactionListener != null) {
log.debug("Used new transaction API");
//使用这个API来执行本地事务,也就是我们指定的那个,步骤3
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}
if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
log.info("executeLocalTransactionBranch return {}", localTransactionState);
log.info(msg.toString());
}
} catch (Throwable e) {
log.info("executeLocalTransactionBranch exception", e);
log.info(msg.toString());
localException = e;
}
}
break;
//如果发送失败,则不会执行本地事务,直接设置为Rollback状态,Broker将回滚
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}
try {
//本地事务结束,设置事务状态,发给Broker
this.endTransaction(sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
TransactionSendResult transactionSendResult = new TransactionSendResult();
transactionSendResult.setSendStatus(sendResult.getSendStatus());
transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
transactionSendResult.setMsgId(sendResult.getMsgId());
transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
transactionSendResult.setTransactionId(sendResult.getTransactionId());
transactionSendResult.setLocalTransactionState(localTransactionState);
return transactionSendResult;
}
3.3 执行本地事务
如果半消息发送成功,即发送结果状态为SEND_OK
,则执行本地事务,即图【RocketMQ事务消息流程】步骤3:
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
也就是执行2.2节,我们自定义的消息监听中的这个方法:TransactionListenerImpl#executeLocalTransaction()
。
3.4 结束事务(commit/rollback)
将本地执行事务的结果同步给Broker,让Broker去处理下本次事务消息的结果:
- 如果半消息发送失败或本地事务执行失败,则告诉Broker去
删除半消息
; - 半消息发送成功且本地事务执行成功,则告诉Broker
生效半消息
。
定位至方法:DefaultMQProducerImpl#endTransaction()
:
public void endTransaction(
final SendResult sendResult,
final LocalTransactionState localTransactionState,
final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
final MessageId id;
if (sendResult.getOffsetMsgId() != null) {
id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
} else {
id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
}
String transactionId = sendResult.getTransactionId();
final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
requestHeader.setTransactionId(transactionId);
requestHeader.setCommitLogOffset(id.getOffset());
switch (localTransactionState) {
case COMMIT_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break;
case ROLLBACK_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
break;
//如果是UNKNOW 将会引起回查
case UNKNOW:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
break;
default:
break;
}
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
requestHeader.setMsgId(sendResult.getMsgId());
String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
//将本地事务的执行结果状态发送给Broker
this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark, this.defaultMQProducer.getSendMsgTimeout());
}
总结下消息发送过程就是,客户端(消息生产者)发送事务消息主要做三件事
:
- 设置事务消息属性,将消息(半事务消息)发往Broker,这个发送过程就和普通消息一样;
- 如果消息发送成功,则执行本地事务;如果发送失败,则不会执行本地事务,直接回滚半事务消息;
- 本地事务执行完毕,则将本地事务执行的状态发送给Broker,Broker根据状态来判断是回滚还是提交。
3.5 Broker的”两阶段”
3.5.1 一阶段
一阶段为Broker接收半消息,Broker端通过SendMessageProcessor.asyncProcessRequest()
方法接收处理 Producer 发送的半消息。
最后会调用到SendMessageProcessor.asyncSendMessage()
,判断消息类型,进行消息存储。
这里也分成两个步骤来跟进:
- 解析过程
- 存储过程
3.5.1.1 解析过程
即解析生产者发过来的半消息,依次经历:TransactionalMessageServiceImpl#asyncPrepareMessage()
->TransactionalMessageBridge#parseHalfMessageInner()
。
//TransactionalMessageServiceImpl
@Override
public CompletableFuture<PutMessageResult> asyncPrepareMessage(MessageExtBrokerInner messageInner) {
return transactionalMessageBridge.asyncPutHalfMessage(messageInner);
}
暂存真实的topic、队列ID,替换为内部topic,队列ID为0:
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
//暂存真实的topic、queueID
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
String.valueOf(msgInner.getQueueId()));
msgInner.setSysFlag(
MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
//替换topic为 RMQ_SYS_TRANS_HALF_TOPIC
msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
//替换消息的队列ID为 0号队列
msgInner.setQueueId(0);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
return msgInner;
}
3.5.1.2 存储过程
同普通消息的存储过程,见源码之Store,2.1节DefaultMessageStore。
3.5.2 二阶段
即接收本地事务的执行结果状态。
3.2节中,LocalTransactionState可能会有三种状态,COMMIT_MESSAGE
, ROLLBACK_MESSAGE
, UNKNOW
,其中之一会发送给Broker进行处理。
Broker端接收事务状态的处理器是:EndTransactionProcessor
。针对可能收到的事务消息状态,Broker有不同的处理方式。
3.5.2.1 本地事务提交
如果本地事务执行成功,反馈给Broker需要进行事务的提交:
3.5.2.2 本地事务回滚
3.5.2.3 回查
EndTransactionProcessor
只有COMMIT_MESSAGE
, ROLLBACK_MESSAGE
的状态处理,唯独没有UNKNOW
,是不是阿里巴巴搞错了,漏掉了?
实际上并不是,UNKNOW
的消息状态处理是通过异步线程TransactionalMessageCheckService
来完成处理的。
该类会在BrokerController
初始化时,创建实例;在BrokerController
执行start()
方法时,开启线程执行run()
方法。
BrokerController#startProcessorByHa()
方法:
private void startProcessorByHa(BrokerRole role) {
//当节点为主节点时,开启对应的事务状态回查处理器,对PREPARE状态的消息发起事务状态回查请求。
if (BrokerRole.SLAVE != role) {
if (this.transactionalMessageCheckService != null) {
this.transactionalMessageCheckService.start();
}
}
}
TransactionalMessageCheckService#run()
:
public class TransactionalMessageCheckService extends ServiceThread {
//.....省略部分代码......
@Override
public void run() {
log.info("Start transaction check service thread!");
//60s执行一次
long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
while (!this.isStopped()) {
this.waitForRunning(checkInterval);
}
log.info("End transaction check service thread!");
}
@Override
protected void onWaitEnd() {
long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
long begin = System.currentTimeMillis();
log.info("Begin to check prepare message, begin time:{}", begin);
//check操作
this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}
}
check()
方法很长,我们只要关注回查的部分:
boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)
|| (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
|| (valueOfCurrentMinusBorn <= -1);
if (isNeedCheck) {
if (!putBackHalfMsgQueue(msgExt, i)) {
continue;
}
listener.resolveHalfMsg(msgExt);
} else {
pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
log.debug("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,
messageQueue, pullResult);
continue;
}
这个流程,有大佬把图画好了,借用:
那什么样的消息需要回查?
1)每一条半事务消息如果经过了commit/rollback
, 那么对应就会创建一个Op消息
;反过来说,如果没有Op消息,但是有Half消息,那么说明是二阶段失败了(本地事务返回的状态码是UNKNOW
),此时需要回查;
2)还有就是TM
(生产者)在6秒内
没有将最终确认状态发送给TC
(Broker),此时也会触发回查。
回查发起,我们从方法resolveHalfMsg()
追踪。
3.6 Producer回查处理
接下来,Producer收到Broker发起的回查请求,执行本地事务回查处理,处理方法就是我们自定义的监听器内的方法:checkLocalTransaction()
。
Producer接收消息回查请求的是ClientRemotingProcessor类
:
public class ClientRemotingProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
//......略.......
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
switch (request.getCode()) {
//处理Broker发过来的回查请求
case RequestCode.CHECK_TRANSACTION_STATE:
return this.checkTransactionState(ctx, request);
//.....忽略其他case.......
default:
break;
}
return null;
}
//......略........
}
checkTransactionState()
方法会调用到DefaultMQProducerImpl类
的checkTransactionState(...)
方法中,这方法内容比较多,但其实主要就做了3件事
:
- 获取本地事务监听器(就是代码实例中指定的
TransactionListenerImpl
); - 执行本地事务的回查;
- 将回查结果上报给Broker;
@Override
public void checkTransactionState(final String addr, final MessageExt msg,
final CheckTransactionStateRequestHeader header) {
Runnable request = new Runnable() {
private final String brokerAddr = addr;
private final MessageExt message = msg;
private final CheckTransactionStateRequestHeader checkRequestHeader = header;
private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup();
@Override
public void run() {
//这个listener已废弃
TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
//1.获取事务监听器,就是demo中我们指定的TransactionListenerImpl
TransactionListener transactionListener = getCheckListener();
if (transactionCheckListener != null || transactionListener != null) {
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
// 2. 执行本地事务回查(省略其他if判断代码),步骤6,见2.2节事务监听
localTransactionState = transactionListener.checkLocalTransaction(message);
//3. 反馈给Broker,步骤7
this.processTransactionState(
localTransactionState,
group,
exception);
} else {
log.warn("CheckTransactionState, pick transactionCheckListener by group[{}] failed", group);
}
}
//这个方法和前面的endTransaction(....)方法的逻辑是一模一样的,就是将回查后的状态发给broker
private void processTransactionState(
final LocalTransactionState localTransactionState,
final String producerGroup,
final Throwable exception) {
//.....省略诸多代码..........
try {
//3. 将回查结果上报给Broker
DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,
3000);
} catch (Exception e) {
log.error("endTransactionOneway exception", e);
}
}
};
// 这个 checkExecutor 就是demo 示例中我们指定的线程池
this.checkExecutor.submit(request);
}
然后将回查后的状态,再次发送给Broker
DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark, 3000);
至此事务消息整个流程源码跟踪完毕。
4 事务消息的使用总结
- 事务消息不支持
延时消息
和批量消息
。 - 为了避免单个消息被检查太多次而导致半队列消息累积,MQ默认将
单个消息
的检查次数限制为15 次
,用户可以通过transactionCheckMax
参数来修改此限制。如果已经回查某条消息超过transactionCheckMax
次的话,则Broker将丢弃
此消息,并打印错误日志。 - 事务消息将在参数
transactionTimeout
指定的时间长度之后被回查。当发送事务消息时,用户还可以通过设置用户属性CHECK_IMMUNITY_TIME_IN_SECONDS
来改变这个限制,该参数优先于 transactionTimeout
参数。 - 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。