找回密码
 会员注册
查看: 29|回复: 0

RocketMQ事务消息转转版与社区版的分析

[复制链接]

2万

主题

0

回帖

6万

积分

超级版主

积分
64437
发表于 2024-9-19 17:24:00 | 显示全部楼层 |阅读模式
一、背景二、转转版2.1基本原理2.2如何发送消息2.3如何处理未发送消息三、社区版3.1基本原理3.2如何发送消息3.3如何处理未知状态消息四、总结一、背景在公司使用RocketMQ的时候发现事务消息不是社区版的,而是自研版本。这就引发了强烈的好奇,为什么要自己研发一套呢?和社区版的又有什么不同呢?带着问题,咨询了公司的架构部苑同学后,得到了答案:转转的RocketMQ版本是3.4.6并且事务消息第一版提交于2017年12月,但是社区版直到2018年8月的4.3.0版本才开始支持事务消息。这里查找了一下社区版对于事务消息支持的一些重要版本信息:RocketMQ3.0.8以及之前的版本是支持分布式事务消息;RocketMQ3.0.8之后,分布式事务阉割了,不支持分布式事务消息;RocketMQ4.0.0开始apache孵化,但是也不支持分布式事务消息;直到上面说的4.3.0版本才支持事务消息。在这种情况下,如果要用到事务消息该怎么处理呢? 相信你也猜到了,那就是咱们做自己的事务消息。那么各自主要的实现方式是怎样的呢?转转版是将事务消息处理的压力放在了发送方这一端,需要在发送方的数据库(MySQL)中建一张事务消息表。RocketMQ由于本身是开源项目无法强制要求使用方来配合,所以将事务消息处理放在了自己的Broker中,但是Broker自身无法得知事务的结果,只能设计一个回查的流程来确认消息的结果,这就导致了在使用社区版事务消息时无法避免的需要提供回查的能力。二、转转版2.1基本原理转转的事务消息利用了数据库(MySQL)本地事务的特性,需要在发送端的数据库(MySQL)中创建一个事务消息表来和本地事务一起提交或者回滚。在使用上非常简单,这里列一个简单的使用例子:仓储商品入库需要向上游同步入库消息@Transactional(rollbackFor = Exception.class)  // 事务中public void testTransaction() throws Exception {  SingleInboundReq inboundReq= new SingleInboundReq();  // 入库实体  inboundService.singleInbound(inboundReq);  // 本地事务操作  提交入库  InventoryOrderMsg inventoryOrderMsg = new InventoryOrderMsg();  // 消息内容   mqProduceComponent.sendCisAddMq(inventoryOrderMsg);  //发送事务消息  同步入库信息}可以看到,只要你的消息放在本地事务中即可,不用做其他处理。那么它是如何发送事务消息的呢?2.2如何发送消息发送中和本地事务一起提交数据,随后交给内存中的一个队列,再由其他线程异步处理。当然这里的处理线程是可以配置的,到底需要多少个线程由客户端根据自己的业务情况决定。那么取到msg后怎么处理呢?msg中会存放数据库主键以及相应的数据库信息,取到信息后会去数据库查询一次信息内容,经过一些基础的判断后发送MQ消息给到Broker,随后删除本地事务表中的数据。解释下一下基础判断的内容:主要是对消息的内容,重试的次数,重试的时间等做验证,后面的图中会详细说明在发送消息时可能会出现1个很重要的问题:发送失败原因有很多,但结果都是同样的,针对这种问题,又要怎么处理呢?在发送失败或者未查询到数据后,会将这个msg丢入到另一个队列timeWheelQueue,由另一个定时任务去处理。具体流程如图:这里采用scheduleAtFixedRate定时任务每5毫秒从另一个队列(timeWheelQueue)来获取重试消息,并且逻辑只是判断过期时间,没过期则重新丢入处理队列(msgQueue)进行处理。为什么是5毫秒一次呢?同一条消息的重试次数和时间间隔是有限制的,经过这几年的迭代后,目前定的时间间隔(单位毫秒)是0,5,10,25,50,100,200,300,500,800,1000,1000,1000,大部分的消息是在前面5个时间点就能发出,可以看到首次的重试时间就是5毫秒,所以重试任务的执行时间就定为了5毫秒一次。2.3如何处理未发送消息如果因为上线或者其他情况导致服务重启,未发送完的消息该如何处理呢?启动一个线程定时拉取10秒之前的数据,组成消息发送给broker,随后删除该条数据。为什么是10秒之前的数据?大部分事务10秒内都会提交完成,这部分数据会在内存中直接发送掉,为了避免和扫表的任务并发,综合考虑是只扫10秒之前的数据,当然10秒是经过几次迭代考量得出来的一个值。具体流程如图:当然转转版的事务消息是特定场景下的产物,并不适用于外部。一些参数上的配置都是在场景里迭代出来的一个值。那么社区版后面提供的事务消息是怎么做的呢?三、社区版3.1基本原理社区版事物消息则是利用2阶段提交的想法来实现的第一阶段:先发送一个half消息,随后执行本地事务第二阶段:得到本地事务结果后发送一个op消息说明一下2个概念:half消息本质是指写入RMQ_SYS_TRANS_HALF_TOPIC的消息用来保存第一阶段的事务消息意思是说我这有一些不知道事务最后是提交还是回滚的消息op消息本质上是指写入RMQ_SYS_TRANS_OP_HALF_TOPIC的消息 用来保存第二阶段本地事务的状态意思是说我这知道一些half消息的事务结果来看下社区版的事务消息是怎么使用的:社区版的事务消息重点在于setTransactionListener中,这里面需要实现2个方法。executeLocalTransaction用来执行本地事务,返回本地事务结果给到Broker。checkLocalTransaction用来给Broker查询本地事务结果。那么是怎么发送事务消息的呢?3.2如何发送消息在发送一个事务消息时,第一次先把真实消息的topic和queueId,tags等信息放入自己的properties中,随后设置该消息的topic为RMQ_SYS_TRANS_HALF_TOPIC发送给Broker。Broker返回成功后执行本地事务也就是executeLocalTransaction此方法的内容,得到本地事务的结果。最后执行endTransaction方法确认该消息的事务结果,当然这次写入的就是op消息。为了方便理解,画了一幅简图:第一阶段发的事务消息其实就可以看作是发送一个普通的消息给Broker去保存下来,只不过是保存到事务消息专用的half消息中。第二阶段是我们要了解的重点,得到事务结果后Broker会怎么处理呢?源码是在EndTransactionProcessor#processRequest方法中,这里不展开说源码细节。说白了就2个点:如果事务状态是提交,那么会组装出真实消息内容写入到commitLog中,随后组装一个tag为d的op消息写入commitLog中。如果事务状态是回滚,那么只会往commitLog中写入一个tag为d的op消息。这里的commitLog是RocketMQ用来保存消息的文件,这里不展开讨论commitLog的细节。如图:那么为什么要写op消息呢?我们知道RocketMQ消息写入后是不会修改和删除的,如果要指明一个事务消息的结果只能通过另外一个topic来指明,RocketMQ就是使用了op消息来指明某条事务消息的最终结果,该op消息的body内容会保存对应half消息的位置。op消息是如何表示一条事务消息的状态呢?op消息通过一个tag为d的标签来表示对应的half消息是完结状态从而无需再去请求生产者获取该消息的结果,直接忽略不处理即可。3.3如何处理未知状态消息有的half消息由于各种原因,一直没有得到本地事务的结果,那么RocketMQ是怎么处理的呢?在Broker启动时会开启一个线程每60秒去核对一次事务消息的结果,源码在BrokerController#start中。最后执行会调用TransactionalMessageServiceImpl#check方法,具体源码这里不细说。总的来说分以下几个步骤:获取当前half消息op消息的消费位置;一次性获取op消息的32条消息,判断是否有已经处理过的消息组成一个removeMap,键为half消息的位置,值为op消息的位置;while(true)处理,从half消息的当前位置依次获取消息,如果在removeMap中则跳过;不在removeMap则判断是否已经处理超过5次BrokerConfig.transactionCheckMax由此配置控制,或者距今是否超过72小时由MessageStoreConfig.fileReservedTime配置控制,如果是则跳过不处理;如果该half消息没有对应的op结果,则重新发送half消息并且异步去询问生产者结果。整体流程如图小小的吐槽一下,看完源码才明白为什么说阿里规范都是对外的)四、总结通过上面的分析,可以看到在不同场景下会出现不同的技术方案。各种方案都有其优劣,好与坏不是思考的重点,适用与否或许更加重要。最后附上整理后的源码流程图:社区版整体流程图: https://www.processon.com/view/link/6283410c5653bb3b4934e348转转版整体流程图: https://www.processon.com/view/link/6274d9af5653bb45ea4ab5d6作者简介 涂晓伟,终身学习践行者,转转履约中台研发工程师想了解更多转转公司的业务实践,欢迎点击关注下方公众号:
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 会员注册

本版积分规则

QQ|手机版|心飞设计-版权所有:微度网络信息技术服务中心 ( 鲁ICP备17032091号-12 )|网站地图

GMT+8, 2024-12-26 23:38 , Processed in 0.519864 second(s), 25 queries .

Powered by Discuz! X3.5

© 2001-2024 Discuz! Team.

快速回复 返回顶部 返回列表