现在有一个场景,会订阅kafka的topic,实时获取消息,之后通过消息内容,反查数据库,获取到一些构建文档必须的字段; 之后消费构建得到的文档;
现在遇到一个比较大的问题就是消费能力跟不上;不知道这种场景下有什么好的解决方案。
这么多年了,最后放弃手撸的伪流处理代码,用了flink,map里翻查数据库,也够用了。不够就加并行度完事。
现在有一个场景,会订阅kafka的topic,实时获取消息,之后通过消息内容,反查数据库,获取到一些构建文档必须的字段; 之后消费构建得到的文档;
现在遇到一个比较大的问题就是消费能力跟不上;不知道这种场景下有什么好的解决方案。
这么多年了,最后放弃手撸的伪流处理代码,用了flink,map里翻查数据库,也够用了。不够就加并行度完事。
本人用python消费kafka也遇到消费能力低下问题。我用的kafka-python, 然后是多进程处理。但是这方案每个进程内都是同步阻塞的。目前了解下来可以用aiokafka 替代实现。另外分享一点。
有两种方法能避免阻塞型调用:
. 在单独的线程中运行各个阻塞版本
. 把每个阻塞型的操作转换成非阻塞型的异步调用
消费端目前用的是多线程么?可以采用多实例部署,每个实例是一个进程,即一个consumer,每个consumer再使用线程池,异步消费