开发学院

您的位置:首页>教程>正文

教程正文

RocketMQ 事务实例

什么是事务性消息?

  它可以被认为是两阶段提交消息实现,以确保分布式系统中的最终一致性。事务性消息确保本地事务的执行和消息的发送可以原子地执行。

使用限制

  (1)事务性消息没有时间表和批处理支持。

  (2)为了避免单个消息被检查太多次并导致半队列消息累积,默认情况下,我们将单个消息的检查次数限制为15次,但是用户可以通过更改代理配置中的transactionCheckMax参数来更改此限制,如果一个消息被检查了transactionCheckMax次,代理将丢弃此消息并在默认情况下同时打印错误日志。用户可以通过重写AbstractTransactionCheckListener类来改变这种行为。

  (3)在由代理配置中的参数transactionTimeout确定的特定时间段之后,将检查事务性消息。用户也可以通过在发送事务消息时设置用户属性CHECK_IMMUNITY_TIME_IN_SECONDS来更改此限制,此参数优先于transactionMsgTimeout参数。

  (4)事务性消息可能被检查或消费多次。

  (5)提交给用户目标主题的消息可能会失败。目前,这取决于日志记录。RocketMQ本身的高可用性机制确保了高可用性。如果您想确保事务性消息不会丢失,并且事务完整性得到保证,建议使用同步双写机制。

  (6)事务性消息的生产者标识不能与其他类型消息的生产者标识共享。与其他类型的消息不同,事务性消息允许向后查询。MQ服务器通过客户端的生产者标识来查询客户端。

应用实例

1、事务消息状态

  事务性消息有三种状态:

  (1)TransactionStatus.CommitTransaction:提交事务,这意味着允许消费者使用此消息。

  (2)TransactionStatus.RollbackTransaction:回滚事务,这意味着消息将被删除,不允许消费。

  (3)TransactionStatus.Unknown:中间状态,这意味着MQ需要检查以确定状态。

2、发送事务性消息

  (1)创建事务生产者

使用TransactionMQproducer类创建Producer客户端,并指定唯一的Producer组,您可以设置自定义线程池来处理检查请求。执行本地事务后,您需要根据执行结果回复MQ,回复状态在上面的章节中有所描述。

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;

public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });

        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();

        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                Message msg =
                    new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);

                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }

        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}

  (2)实现TransactionOnListener接口

  当一半发送消息成功时,executeLocalTransaction方法用于执行本地事务。它返回上一节提到的三种事务状态之一。

  checkLocalTransaction方法用于检查本地事务状态并响应MQ检查请求。它同样返回上一节提到的三种事务状态之一。

 import ...
   
   public class TransactionListenerImpl implements TransactionListener {
       private AtomicInteger transactionIndex = new AtomicInteger(0);
   
       private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
   
       @Override
       public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
           int value = transactionIndex.getAndIncrement();
           int status = value % 3;
           localTrans.put(msg.getTransactionId(), status);
           return LocalTransactionState.UNKNOW;
       }
   
       @Override
       public LocalTransactionState checkLocalTransaction(MessageExt msg) {
           Integer status = localTrans.get(msg.getTransactionId());
           if (null != status) {
               switch (status) {
                   case 0:
                       return LocalTransactionState.UNKNOW;
                   case 1:
                       return LocalTransactionState.COMMIT_MESSAGE;
                   case 2:
                       return LocalTransactionState.ROLLBACK_MESSAGE;
               }
           }
           return LocalTransactionState.COMMIT_MESSAGE;
       }
   }