编辑仅供参考: 工作 gitHub 示例
我在互联网上搜索,找不到嵌入式 Kafka 测试的有效且简单的示例。
我的设置是:
- 弹簧靴
- 一个类中具有不同主题的多个 @KafkaListener
- 用于测试的嵌入式 Kafka 开始正常
- 使用发送到主题的 Kafka 模板进行测试,但 @KafkaListener 方法即使经过很长的睡眠时间也没有收到任何内容
- 不显示警告或错误,日志中仅显示来自 Kafka 的垃圾信息
请帮我。大多存在过度配置或过度设计的示例。我相信它可以简单地完成。多谢你们!
@Controller
public class KafkaController {
private static final Logger LOG = getLogger(KafkaController.class);
@KafkaListener(topics = "test.kafka.topic")
public void receiveDunningHead(final String payload) {
LOG.debug("Receiving event with payload [{}]", payload);
//I will do database stuff here which i could check in db for testing
}
}
私有静态字符串 SENDER_TOPIC = “test.kafka.topic”;
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SENDER_TOPIC);
@Test
public void testSend() throws InterruptedException, ExecutionException {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
producer.send(new ProducerRecord<>(SENDER_TOPIC, 0, 0, "message00")).get();
producer.send(new ProducerRecord<>(SENDER_TOPIC, 0, 1, "message01")).get();
producer.send(new ProducerRecord<>(SENDER_TOPIC, 1, 0, "message10")).get();
Thread.sleep(10000);
}
原文由 Yuna Braska 发布,翻译遵循 CC BY-SA 4.0 许可协议
嵌入式 Kafka 测试适用于以下配置,
测试类注释
设置方法的注释之前
注意:我没有使用
@ClassRule
来创建嵌入式 Kafka 而不是自动装配@Autowired embeddedKafka
希望这可以帮助!
编辑:用
@TestConfiguration
标记的测试配置类现在
@Test
方法将自动装配KafkaTemplate并使用是发送消息用上面的行更新了答案代码块