kafka和mysql操作,如何确保先后顺序

问题描述

A项目中有一个方法,先将数据保存在mysql中,然后将mysql的插入操作返回数据的id作为消息体,通过kafka发送。(此方法已被@Transactional,所以插入数据和发送消息应该会)

B项目收到消息后,从MySQL中查询消息的详情,并进行后续操作。

线上部署时,mysql是远程的数据库服务器,而kafka,A项目,B项目部署在同一台服务器。出现一个问题就是,A发送的消息,B项目已经接收,并开始根据消息中的id查询数据库,但是此时,数据库的插入操作还未完成,导致查询结果为空。

这种情况下,如何对数据库插入操作和kafka发送消息进行控制,确保,B收到消息时,数据库插入操作已经完成,并且当kafka消息发送失败时,能够正确的回退操作。

目前,我的做法是进行手动控制,去掉@Transactional注解,当消息发送出现异常时,手动对之前的数据库插入操作新增的数据进行删除。

有没有更优雅的解决方案?

阅读 6.5k
3 个回答

先分析下为何会产生顺序的问题:
@Transactional属于数据库的事务。伪代码如下:

database transaction begin;
try{
    insert into table;
    send to kafka;//这个动作先发生,有可能消息已被消费,而下面的commit还没执行完成。
    database transaction commit;//这个必须放在后面以保证两个动作的事务特性。
} catch (exception) {
    database transaction rollback;
}

上面的send to kafka发送失败了,那么数据库也会回滚,这样数据是一致的。但是如果发送kafka成功而commit失败,那么数据库数据是无效的,而消费者照常去消费消息,这也可能导致B服务通过Id找不到数据。

上面是B服务找不到数据的两种可能情况。

所以问题的原因是对两个操作进行了不正确的事务控制。正确的事务控制应该是下面这样的:

kafka transaction begin;
database transaction begin;
try{
    insert into table;
    send to kafka;
    database transaction commit;
    kafka transaction commit;
} catch (exception) {
    database transaction rollback;
    kafka transaction rollback;
}

上面database事务提交先于kafka事务,这样会保证服务B在消费的时候数据已经在数据库了。database事务提交失败了,两者都回滚,数据是一致的,database事务提交成功,kafka事务也提交成功,两边的数据也还是一致的,但是如果database事务提交成功,kafka事务提交失败,也有可能导致数据不一致(数据插入成功了,服务B无法消费)。所以得有个补偿机制,消息中间件我了保证数据一致性,会有个超时询问机制,如图:

clipboard.png

所以系统A还需提供一个事务询问的接口,供消息中间件调用。当消息中间件收到一条事务型消息后便开始计时,如果到了超时时间也没收到系统A发来的Commit或Rollback指令的话,就会主动调用系统A提供的事务询问接口询问该系统目前的状态。

看了一下我的理解你并不需要控制前后顺序,为什么B服务要查询一次数据库?直接在A服务把需要插入数据库的内容通过消息的方式传递给B不就可以了吗?

@TransactionalEventListener

或者采取本地事件表的方式,将消息写入本地事件表,由轮询线程去处理投递的问题

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题