带有 Spring Boot 的简单嵌入式 Kafka 测试示例

新手上路,请多包涵

编辑仅供参考: 工作 gitHub 示例


我在互联网上搜索,找不到嵌入式 Kafka 测试的有效且简单的示例。

我的设置是:

  • 弹簧靴
  • 一个类中具有不同主题的多个 @KafkaListener
  • 用于测试的嵌入式 Kafka 开始正常
  • 使用发送到主题的 Kafka 模板进行测试,但 @KafkaListener 方法即使经过很长的睡眠时间也没有收到任何内容
  • 不显示警告或错误,日志中仅显示来自 Kafka 的垃圾信息

请帮我。大多存在过度配置或过度设计的示例。我相信它可以简单地完成。多谢你们!

 @Controller
public class KafkaController {

    private static final Logger LOG = getLogger(KafkaController.class);

    @KafkaListener(topics = "test.kafka.topic")
    public void receiveDunningHead(final String payload) {
        LOG.debug("Receiving event with payload [{}]", payload);
        //I will do database stuff here which i could check in db for testing
    }
}


私有静态字符串 SENDER_TOPIC = “test.kafka.topic”;

 @ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SENDER_TOPIC);

@Test
    public void testSend() throws InterruptedException, ExecutionException {

        Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);

        KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
        producer.send(new ProducerRecord<>(SENDER_TOPIC, 0, 0, "message00")).get();
        producer.send(new ProducerRecord<>(SENDER_TOPIC, 0, 1, "message01")).get();
        producer.send(new ProducerRecord<>(SENDER_TOPIC, 1, 0, "message10")).get();
        Thread.sleep(10000);
    }

原文由 Yuna Braska 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 743
1 个回答

嵌入式 Kafka 测试适用于以下配置,

测试类注释

@EnableKafka
@SpringBootTest(classes = {KafkaController.class}) // Specify @KafkaListener class if its not the same class, or not loaded with test config
@EmbeddedKafka(
    partitions = 1,
    controlledShutdown = false,
    brokerProperties = {
        "listeners=PLAINTEXT://localhost:3333",
        "port=3333"
})
public class KafkaConsumerTest {
    @Autowired
    KafkaEmbedded kafkaEmbeded;

    @Autowired
    KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

设置方法的注释之前

@Before
public void setUp() throws Exception {
  for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
    ContainerTestUtils.waitForAssignment(messageListenerContainer,
    kafkaEmbeded.getPartitionsPerTopic());
  }
}

注意:我没有使用 @ClassRule 来创建嵌入式 Kafka 而不是自动装配

@Autowired embeddedKafka

 @Test
public void testReceive() throws Exception {
     kafkaTemplate.send(topic, data);
}

希望这可以帮助!

编辑:用 @TestConfiguration 标记的测试配置类

@TestConfiguration
public class TestConfig {

@Bean
public ProducerFactory<String, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(kafkaEmbedded));
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
    kafkaTemplate.setDefaultTopic(topic);
    return kafkaTemplate;
}

现在 @Test 方法将自动装配KafkaTemplate并使用是发送消息

kafkaTemplate.send(topic, data);

用上面的行更新了答案代码块

原文由 donm 发布,翻译遵循 CC BY-SA 4.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进