spring kafka如何实现消费者消费指定分区?

kafka的原生API可以使用consumer.assign(partitions)来订阅指定分区,spring kafka的API有没有相应的方法?我只找到用@KafkaListener(topicPartitions ={@TopicPartition(topic = "topic1", partitions = { "0", "1" }))}注解实现的,但这种方式实现时topic、partitions的值都必须为常量,但实际上我是需要根据用户id(运行中获取)来确定partition,这样的话@KafkaListener就不能用了,请问有什么解决办法么?

另:如果使用了consumer.assign(partitions)方法,是不是partition.assignment.strategy属性就不生效了?

阅读 15.4k
3 个回答
新手上路,请多包涵

可以在生产者中配置统一配置分区规则

新手上路,请多包涵

遇到了同样的问题,还好是解决了回来分享一下。

可以写表达式,取自https://docs.spring.io/spring-kafka/docs/current/reference/html/#tip-assign-all-parts

@KafkaListener(topicPartitions = @TopicPartition(topic = "compacted",
            partitions = "#{@finder.partitions('compacted')}"),
            partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0")))
public void listen(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, String payload) {
    ...
}

甚至还可以:

spring:
  kafka:
    consumer:
      group-id: foo
    topics: foo
            bar
            baz

with

@KafkaListener(topics = "#{'${spring.kafka.topics}'.split(' ')}")
撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进