问题描述
A项目中有一个方法,先将数据保存在mysql中,然后将mysql的插入操作返回数据的id作为消息体,通过kafka发送。(此方法已被@Transactional,所以插入数据和发送消息应该会)
B项目收到消息后,从MySQL中查询消息的详情,并进行后续操作。
线上部署时,mysql是远程的数据库服务器,而kafka,A项目,B项目部署在同一台服务器。出现一个问题就是,A发送的消息,B项目已经接收,并开始根据消息中的id查询数据库,但是此时,数据库的插入操作还未完成,导致查询结果为空。
这种情况下,如何对数据库插入操作和kafka发送消息进行控制,确保,B收到消息时,数据库插入操作已经完成,并且当kafka消息发送失败时,能够正确的回退操作。
目前,我的做法是进行手动控制,去掉@Transactional注解,当消息发送出现异常时,手动对之前的数据库插入操作新增的数据进行删除。
有没有更优雅的解决方案?
先分析下为何会产生顺序的问题:
加
@Transactional
属于数据库的事务。伪代码如下:上面的
send to kafka
发送失败了,那么数据库也会回滚,这样数据是一致的。但是如果发送kafka
成功而commit
失败,那么数据库数据是无效的,而消费者照常去消费消息,这也可能导致B服务通过Id找不到数据。上面是B服务找不到数据的两种可能情况。
所以问题的原因是对两个操作进行了不正确的事务控制。正确的事务控制应该是下面这样的:
上面
database
事务提交先于kafka
事务,这样会保证服务B在消费的时候数据已经在数据库了。database
事务提交失败了,两者都回滚,数据是一致的,database
事务提交成功,kafka
事务也提交成功,两边的数据也还是一致的,但是如果database
事务提交成功,kafka
事务提交失败,也有可能导致数据不一致(数据插入成功了,服务B无法消费)。所以得有个补偿机制,消息中间件我了保证数据一致性,会有个超时询问机制,如图:所以系统A还需提供一个事务询问的接口,供消息中间件调用。当消息中间件收到一条事务型消息后便开始计时,如果到了超时时间也没收到系统A发来的Commit或Rollback指令的话,就会主动调用系统A提供的事务询问接口询问该系统目前的状态。