【RocketMQ学习】11.源码之事务消息


1 前言

回顾下事务消息流程:

RocketMQ事务消息流程

其中分为两个流程:正常事务消息的发送及提交事务消息的补偿流程

1.1 正常事务流程

  • 发送消息(half 消息):图中步骤 1。
  • 服务端响应消息写入结果:图中步骤 2。
  • 根据发送结果执行本地事务(如果写入失败,此时 half 消息对业务不可见,本地逻辑不执行):图中步骤 3。
  • 根据本地事务状态执行 Commit 或者 Rollback(Commit 操作生成消息索引,消息对消费者可见):图中步骤 4。

1.2 事务补偿流程

  • 对没有 Commit/Rollback 的事务消息(pending 状态的消息),从服务端发起一次“回查”:图中步骤 5。
  • Producer 收到回查消息,检查回查消息对应的本地事务的状态:图中步骤 6。
  • 根据本地事务状态,重新 Commit 或者 Rollback:图中步骤 6。
    其中,补偿阶段用于解决消息 Commit 或者 Rollback 发生超时或者失败的情况。

1.3 事务消息状态

事务消息共有三种状态:

  • CommitTransaction:提交状态,它允许消费者消费此消息(完成图中了1,2,3,4步,第4步是Commit)。
  • RollbackTransaction:回滚状态,它代表该消息将被删除,不允许被消费(完成图中了1,2,3,4步, 第4步是Rollback)。
  • Unknown:中间状态,它代表需要检查消息队列来确定状态(完成图中了1,2,3步, 但是没有4或者没有7,无法Commit或Rollback)。
public enum LocalTransactionState {
    //本地事务执行成功,给Broker发送一个commit的标识
    COMMIT_MESSAGE,
    //本地事务执行失败,给Broker发送一个回滚的标识
    ROLLBACK_MESSAGE,
    //未知,既不是成功,也不是失败,引发回查操作
    UNKNOW,
}

2 例子

2.1 生产者

public class TransactionProducer {
  public static void main(String[] args) throws MQClientException, InterruptedException {
    //创建事务监听器,就是2.2节的东东
    TransactionListener transactionListener = new TransactionListenerImpl();
    //创建消息生产者
    TransactionMQProducer producer = new TransactionMQProducer("TransactionProducer");
    // 设置NameServer的地址
    producer.setNamesrvAddr("localhost:9876");
    //创建线程池
    ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
      @Override
      public Thread newThread(Runnable r) {
        Thread thread = new Thread(r);
        thread.setName("client-transaction-msg-check-thread");
        return thread;
      }
    });
    //设置生产者回查线程池
    producer.setExecutorService(executorService);
    //生产者设置监听器,指定本地事务监听器
    producer.setTransactionListener(transactionListener);
    //启动消息生产者
    producer.start();
    String[] tags = new String[]{"TagA", "TagB", "TagC"};
    for (int i = 0; i < 3; i++) {
      try {
        Message msg = new Message(
                "TransactionTopic", 
                tags[i % tags.length], 
                "KEY" + i,
                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
        //1,2步  半事务的发送,确认。
        SendResult sendResult = producer.sendMessageInTransaction(msg, null);
        System.out.printf("%s%n", sendResult);
        Thread.sleep(1000);
      } catch (MQClientException | UnsupportedEncodingException e) {
        e.printStackTrace();
      }
    }

    //先不要停掉生产者,观察事务回调和回查
    for (int i = 0; i < 100000; i++) {
      Thread.sleep(1000);
    }
    producer.shutdown();
  }
}

2.2 事务监听(消息回查)

public class TransactionListenerImpl implements TransactionListener {
  //事务状态记录
  private AtomicInteger transactionIndex = new AtomicInteger(0);
  private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

  /**
   * 执行本地事务 3
   */
  @Override
  public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
    System.out.println("执行本地事务");
    int value = transactionIndex.getAndIncrement();
    //0,1,2
    int status = value % 3;
    localTrans.put(msg.getTransactionId(), status);
    //这里模拟的不进行步骤4  A系统不知道的--UNKNOWN,这个状态会引发回查动作
    return LocalTransactionState.UNKNOW;
  }

  /**
   * 检查本地事务状态 6,  默认是60s,一分钟检查一次
   */
  @Override
  public LocalTransactionState checkLocalTransaction(MessageExt msg) {
    //打印每次回查的时间
    //设置日期格式
    SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    // new Date()为获取当前系统时间
    System.out.println("checkLocalTransaction:" + df.format(new Date()));
    Integer status = localTrans.get(msg.getTransactionId());
    if (null != status) {
      switch (status) {
        case 0 -> {
          System.out.println("MQ检查消息【" + msg.getTransactionId() + "】事务状态【中间状态】");
          return LocalTransactionState.UNKNOW;
        }
        case 2 -> {
          System.out.println("MQ检查消息【" + msg.getTransactionId() + "】事务状态【回滚状态】");
          return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        default -> {
          System.out.println("MQ检查消息【" + msg.getTransactionId() + "】事务状态【提交状态】");
          return LocalTransactionState.COMMIT_MESSAGE;
        }
      }
    }
    return LocalTransactionState.COMMIT_MESSAGE;
  }
}

3 源码追踪

3.1 Producer启动

事务消息生产者TransactionMQProducer继承自DefaultMQProducer,大部分与普通消息生产者无异。

启动前,会初始化相关回查线程池、事务消息生产者:

//TransactionMQProducer
@Override
public void start() throws MQClientException {
    //初始化:事务消息生产者、回查线程池、阻塞队列
    this.defaultMQProducerImpl.initTransactionEnv();
    //这里与普通消息生产者无异
    super.start();
}

//DefaultMQProducerImpl
public void initTransactionEnv() {
    TransactionMQProducer producer = (TransactionMQProducer) this.defaultMQProducer;
    //生产者回查线程池
    if (producer.getExecutorService() != null) {
        this.checkExecutor = producer.getExecutorService();
    } else {
    //队列长度:2000
    this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(producer.getCheckRequestHoldMax());
    this.checkExecutor = new ThreadPoolExecutor(
      //默认核心、最大线程数均为1个
      producer.getCheckThreadPoolMinSize(),
      producer.getCheckThreadPoolMaxSize(),
      1000 * 60,
      TimeUnit.MILLISECONDS,
      this.checkRequestQueue);
    }
}

不难发现,它就是检查我们是否指定了线程池,如果没有指定,则MQ为我们指定一个。

然后是调用super.start()方法,既然是调用父类的,那就是我们前面熟悉的普通消息的启动过程,这里就不在赘述了。

3.2 半消息发送(Send half message)

上述TransactionMQProducer已经启动,接下来就要进行事务消息发送的第一步了,即半消息发送

SendResult sendResult = producer.sendMessageInTransaction(msg, null);

定位至sendMessageInTransaction()方法:

//TransactionMQProducer
@Override
public TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException {
    if (null == this.transactionListener) {
        throw new MQClientException("TransactionListener is null", null);
    }

    msg.setTopic(NamespaceUtil.wrapNamespace(this.getNamespace(), msg.getTopic()));
    return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
}

实际最后调用的是DefaultMQProducerImpl#sendMessageInTransaction()

//DefaultMQProducerImpl
public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter localTransactionExecuter, final Object arg)
        throws MQClientException {
        //获取本地事务监听器
        TransactionListener transactionListener = getCheckListener();
        if (null == localTransactionExecuter && null == transactionListener) {
            throw new MQClientException("tranExecutor is null", null);
        }

        // ignore DelayTimeLevel parameter
        //事务消息不支持延迟
        if (msg.getDelayTimeLevel() != 0) {
            MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
        }
        //校验消息
        Validators.checkMessage(msg, this.defaultMQProducer);

        SendResult sendResult = null;
        //设置事务消息的标识
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
        try {
            //同步发送消息,这个消息的发送过程和普通消息一模一样,就不赘述了
            //在Broker端将会根据属性判断是否为事务消息,从而做特殊处理
            sendResult = this.send(msg);
        } catch (Exception e) {
            throw new MQClientException("send message Exception", e);
        }

        LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
        Throwable localException = null;
        switch (sendResult.getSendStatus()) {
            case SEND_OK: {
                try {
                    if (sendResult.getTransactionId() != null) {
                        msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                    }
                    String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                    if (null != transactionId && !"".equals(transactionId)) {
                        msg.setTransactionId(transactionId);
                    }
                    if (null != localTransactionExecuter) {
                        localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                    } else if (transactionListener != null) {
                        log.debug("Used new transaction API");
                        //使用这个API来执行本地事务,也就是我们指定的那个,步骤3
                        localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                    }
                    if (null == localTransactionState) {
                        localTransactionState = LocalTransactionState.UNKNOW;
                    }

                    if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                        log.info("executeLocalTransactionBranch return {}", localTransactionState);
                        log.info(msg.toString());
                    }
                } catch (Throwable e) {
                    log.info("executeLocalTransactionBranch exception", e);
                    log.info(msg.toString());
                    localException = e;
                }
            }
            break;
            //如果发送失败,则不会执行本地事务,直接设置为Rollback状态,Broker将回滚
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
                break;
            default:
                break;
        }

        try {
            //本地事务结束,设置事务状态,发给Broker
            this.endTransaction(sendResult, localTransactionState, localException);
        } catch (Exception e) {
            log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
        }

        TransactionSendResult transactionSendResult = new TransactionSendResult();
        transactionSendResult.setSendStatus(sendResult.getSendStatus());
        transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
        transactionSendResult.setMsgId(sendResult.getMsgId());
        transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
        transactionSendResult.setTransactionId(sendResult.getTransactionId());
        transactionSendResult.setLocalTransactionState(localTransactionState);
        return transactionSendResult;
    }

3.3 执行本地事务

如果半消息发送成功,即发送结果状态为SEND_OK,则执行本地事务,即图【RocketMQ事务消息流程】步骤3:

localTransactionState = transactionListener.executeLocalTransaction(msg, arg);

也就是执行2.2节,我们自定义的消息监听中的这个方法:TransactionListenerImpl#executeLocalTransaction()

3.4 结束事务(commit/rollback)

将本地执行事务的结果同步给Broker,让Broker去处理下本次事务消息的结果:

  • 如果半消息发送失败或本地事务执行失败,则告诉Broker去删除半消息
  • 半消息发送成功且本地事务执行成功,则告诉Broker生效半消息

定位至方法:DefaultMQProducerImpl#endTransaction()

public void endTransaction(
    final SendResult sendResult,
    final LocalTransactionState localTransactionState,
    final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
    final MessageId id;
    if (sendResult.getOffsetMsgId() != null) {
        id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
    } else {
        id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
    }
    String transactionId = sendResult.getTransactionId();
    final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
    EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
    requestHeader.setTransactionId(transactionId);
    requestHeader.setCommitLogOffset(id.getOffset());
    switch (localTransactionState) {
        case COMMIT_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
            break;
        case ROLLBACK_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
            break;
        //如果是UNKNOW 将会引起回查
        case UNKNOW:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
            break;
        default:
            break;
    }

    requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
    requestHeader.setMsgId(sendResult.getMsgId());
    String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
    //将本地事务的执行结果状态发送给Broker
    this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark, this.defaultMQProducer.getSendMsgTimeout());
}

总结下消息发送过程就是,客户端(消息生产者)发送事务消息主要做三件事

  • 设置事务消息属性,将消息(半事务消息)发往Broker,这个发送过程就和普通消息一样;
  • 如果消息发送成功,则执行本地事务;如果发送失败,则不会执行本地事务,直接回滚半事务消息;
  • 本地事务执行完毕,则将本地事务执行的状态发送给Broker,Broker根据状态来判断是回滚还是提交。

3.5 Broker的”两阶段”

3.5.1 一阶段

一阶段为Broker接收半消息,Broker端通过SendMessageProcessor.asyncProcessRequest()方法接收处理 Producer 发送的半消息。

最后会调用到SendMessageProcessor.asyncSendMessage(),判断消息类型,进行消息存储。

这里也分成两个步骤来跟进:

  • 解析过程
  • 存储过程

3.5.1.1 解析过程

即解析生产者发过来的半消息,依次经历:TransactionalMessageServiceImpl#asyncPrepareMessage()->TransactionalMessageBridge#parseHalfMessageInner()

//TransactionalMessageServiceImpl
@Override
public CompletableFuture<PutMessageResult> asyncPrepareMessage(MessageExtBrokerInner messageInner) {
    return transactionalMessageBridge.asyncPutHalfMessage(messageInner);
}

暂存真实的topic、队列ID,替换为内部topic,队列ID为0:

private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
        //暂存真实的topic、queueID
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
            String.valueOf(msgInner.getQueueId()));
        msgInner.setSysFlag(
            MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
        //替换topic为 RMQ_SYS_TRANS_HALF_TOPIC
        msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
        //替换消息的队列ID为 0号队列
        msgInner.setQueueId(0);
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
        return msgInner;
    }

3.5.1.2 存储过程

同普通消息的存储过程,见源码之Store,2.1节DefaultMessageStore

3.5.2 二阶段

即接收本地事务的执行结果状态。

3.2节中,LocalTransactionState可能会有三种状态,COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW,其中之一会发送给Broker进行处理。

Broker端接收事务状态的处理器是:EndTransactionProcessor。针对可能收到的事务消息状态,Broker有不同的处理方式。

3.5.2.1 本地事务提交

如果本地事务执行成功,反馈给Broker需要进行事务的提交:

半消息提交

3.5.2.2 本地事务回滚

半消息提交

3.5.2.3 回查

EndTransactionProcessor只有COMMIT_MESSAGE, ROLLBACK_MESSAGE的状态处理,唯独没有UNKNOW,是不是阿里巴巴搞错了,漏掉了?

实际上并不是,UNKNOW的消息状态处理是通过异步线程TransactionalMessageCheckService来完成处理的。

该类会在BrokerController初始化时,创建实例;在BrokerController执行start()方法时,开启线程执行run()方法。

TransactionalMessageCheckService初始化
TransactionalMessageCheckService启动调用路径

BrokerController#startProcessorByHa()方法:

private void startProcessorByHa(BrokerRole role) {
        //当节点为主节点时,开启对应的事务状态回查处理器,对PREPARE状态的消息发起事务状态回查请求。
        if (BrokerRole.SLAVE != role) {
            if (this.transactionalMessageCheckService != null) {
                this.transactionalMessageCheckService.start();
            }
        }
    }

TransactionalMessageCheckService#run()

public class TransactionalMessageCheckService extends ServiceThread {
   
    //.....省略部分代码......

    @Override
    public void run() {
        log.info("Start transaction check service thread!");
        //60s执行一次
        long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
        while (!this.isStopped()) {
            this.waitForRunning(checkInterval);
        }
        log.info("End transaction check service thread!");
    }

    @Override
    protected void onWaitEnd() {
        long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
        int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
        long begin = System.currentTimeMillis();
        log.info("Begin to check prepare message, begin time:{}", begin);
        //check操作
        this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
        log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
    }

}

check()方法很长,我们只要关注回查的部分:

boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)
          || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
          || (valueOfCurrentMinusBorn <= -1);

      if (isNeedCheck) {
          if (!putBackHalfMsgQueue(msgExt, i)) {
              continue;
          }
          listener.resolveHalfMsg(msgExt);
      } else {
          pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
          log.debug("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,
              messageQueue, pullResult);
          continue;
      }

这个流程,有大佬把图画好了,借用:

半消息回查流程图

那什么样的消息需要回查?
1)每一条半事务消息如果经过了commit/rollback, 那么对应就会创建一个Op消息;反过来说,如果没有Op消息,但是有Half消息,那么说明是二阶段失败了(本地事务返回的状态码是UNKNOW),此时需要回查;
2)还有就是TM(生产者)在6秒内没有将最终确认状态发送给TC(Broker),此时也会触发回查。

回查发起,我们从方法resolveHalfMsg()追踪。

发起回查
发起回查
发起回查

3.6 Producer回查处理

接下来,Producer收到Broker发起的回查请求,执行本地事务回查处理,处理方法就是我们自定义的监听器内的方法:checkLocalTransaction()

Producer接收消息回查请求的是ClientRemotingProcessor类

public class ClientRemotingProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
    
    //......略.......
    
    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        switch (request.getCode()) {
            //处理Broker发过来的回查请求
            case RequestCode.CHECK_TRANSACTION_STATE:
                return this.checkTransactionState(ctx, request);
            
            //.....忽略其他case.......
            default:
                break;
        }
        return null;
    }
    
     //......略........
}    

checkTransactionState()方法会调用到DefaultMQProducerImpl类checkTransactionState(...)方法中,这方法内容比较多,但其实主要就做了3件事

  • 获取本地事务监听器(就是代码实例中指定的TransactionListenerImpl);
  • 执行本地事务的回查;
  • 将回查结果上报给Broker;
@Override
public void checkTransactionState(final String addr, final MessageExt msg,
    final CheckTransactionStateRequestHeader header) {
    Runnable request = new Runnable() {
        private final String brokerAddr = addr;
        private final MessageExt message = msg;
        private final CheckTransactionStateRequestHeader checkRequestHeader = header;
        private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup();

        @Override
        public void run() {
            //这个listener已废弃
            TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
            //1.获取事务监听器,就是demo中我们指定的TransactionListenerImpl
            TransactionListener transactionListener = getCheckListener();
            if (transactionCheckListener != null || transactionListener != null) {
                LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
                
                // 2. 执行本地事务回查(省略其他if判断代码),步骤6,见2.2节事务监听
                localTransactionState = transactionListener.checkLocalTransaction(message);
                //3. 反馈给Broker,步骤7
                this.processTransactionState(
                    localTransactionState,
                    group,
                    exception);
            } else {
                log.warn("CheckTransactionState, pick transactionCheckListener by group[{}] failed", group);
            }
        }

        //这个方法和前面的endTransaction(....)方法的逻辑是一模一样的,就是将回查后的状态发给broker
        private void processTransactionState(
            final LocalTransactionState localTransactionState,
            final String producerGroup,
            final Throwable exception) {
            
            //.....省略诸多代码..........

            try {
               //3. 将回查结果上报给Broker
               DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,
                    3000);
            } catch (Exception e) {
                log.error("endTransactionOneway exception", e);
            }
        }
    };

    // 这个 checkExecutor 就是demo 示例中我们指定的线程池
    this.checkExecutor.submit(request);
}

然后将回查后的状态,再次发送给Broker

DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark, 3000);

至此事务消息整个流程源码跟踪完毕。

事务消息整体流程

4 事务消息的使用总结

  • 事务消息不支持延时消息批量消息
  • 为了避免单个消息被检查太多次而导致半队列消息累积,MQ默认将单个消息的检查次数限制为 15 次,用户可以通过 transactionCheckMax 参数来修改此限制。如果已经回查某条消息超过 transactionCheckMax 次的话,则Broker将丢弃此消息,并打印错误日志。
  • 事务消息将在参数 transactionTimeout 指定的时间长度之后被回查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionTimeout 参数。
  • 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。

文章作者: Kezade
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Kezade !
评论
  目录