spring kafka 标头中没有类型信息,也没有提供默认类型

新手上路,请多包涵

我有一个 spring boot 应用程序,它定义了:

  • 一个写入 kafka 主题的 REST 控制器,STREAM_TOPIC_IN_QQQ
  • 从 STREAM_TOPIC_IN_QQQ (groupId=“bar”) 和日志中读取的 KafkaListener
  • 一个查看主题并记录它的 KStream,将其转换为另一种类型,然后将其写入 STREAM_TOPIC_OUT_QQQ
  • 另一个从 STREAM_TOPIC_OUT_QQQ 读取的 KafkaListener。

(我一直在更改后缀以避免任何可能的混淆,并手动创建主题,因为否则我会收到警告,STREAM_TOPIC_IN_ xxx =LEADER_NOT_AVAILABLE 并且流不会运行一分钟左右。)

第一个侦听器和流似乎正在工作,但是当 STREAM_OUT_TOPIC 上的侦听器尝试反序列化消息时,我得到下面的异常。我正在使用 Produced.with 在流中提供 serde。我需要做什么才能让侦听器知道要反序列化的类型?

日志

11 Mar 2019 14:34:00,194   DEBUG    [KafkaMessageController [] http-nio-8080-exec-1]   Sending a Kafka Message
11 Mar 2019 14:34:00,236   INFO     [KafkaConfig [] kafka9000-v0.1-b0a60795-0258-48d9-8c87-30fa9a97d7b8-StreamThread-1]   -------------- STREAM_IN_TOPIC peek: Got a greeting in the stream: Hello, World!
11 Mar 2019 14:34:00,241   INFO     [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1]   STREAM_IN_TOPIC Listener: ConsumerRecord: {}ConsumerRecord(topic = STREAM_TOPIC_IN_QQQ, partition = 0, offset = 0, CreateTime = 1552332840188, serialized key size = 1, serialized value size = 34, headers = RecordHeaders(headers = [], isReadOnly = false), key = 1, value = com.teramedica.kafakaex001web.model.Greeting@7b6c8fcc)
11 Mar 2019 14:34:00,243   INFO     [Metadata [] kafka-producer-network-thread | kafka9000-v0.1-b0a60795-0258-48d9-8c87-30fa9a97d7b8-StreamThread-1-producer]   Cluster ID: y48IEZaGQWKcWDVGf4mD6g
11 Mar 2019 14:34:00,367   ERROR    [LoggingErrorHandler [] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]   Error while processing: ConsumerRecord(topic = STREAM_TOPIC_OUT_QQQ, partition = 0, offset = 0, CreateTime = 1552332840188, serialized key size = 1, serialized value size = 48, headers = RecordHeaders(headers = [RecordHeader(key = springDeserializerExceptionValue, value = [ REDACTED ])], isReadOnly = false), key = 1, value = null)
org.springframework.kafka.support.serializer.DeserializationException: failed to deserialize; nested exception is java.lang.IllegalStateException: No type information in headers and no default type provided
    at org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2.deserializationException(ErrorHandlingDeserializer2.java:204) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]

这是配置:

休息(弹簧MVC):

 @RequestMapping("/greeting")
public Greeting greeting(@RequestParam(value = "name", defaultValue = "World") String name) {
    Greeting gr = new Greeting(counter.incrementAndGet(), String.format(msgTemplate, name));
    this.kafkaTemplate.send(K9000Consts.STREAM_TOPIC_IN, "1", gr);
    logger.debug("Sending a Kafka Message");
    return gr;
}

卡夫卡配置(spring-kafka):

 @Bean
public KStream<String, Greeting> kStream(StreamsBuilder kStreamBuilder) {
    KStream<String, Greeting> stream = kStreamBuilder.stream(K9000Consts.STREAM_TOPIC_IN);
    stream.peek((k, greeting) -> {
        logger.info("-------------- STREAM_IN_TOPIC peek: Got a greeting in the stream: {}", greeting.getContent());
    })
          .map((k, v) -> new KeyValue<>(k, new GreetingResponse(v)))
          .to(K9000Consts.STREAM_TOPIC_OUT, Produced.with(stringSerde, new JsonSerde<>(GreetingResponse.class)));
    return stream;
}

@KafkaListener(topics = K9000Consts.STREAM_TOPIC_OUT, groupId="oofda", errorHandler = "myTopicErrorHandler")
public void listenForGreetingResponse(ConsumerRecord<String, GreetingResponse> cr) throws Exception {
    logger.info("STREAM_OUT_TOPIC Listener : {}" + cr.toString());
}

@KafkaListener(topics = K9000Consts.STREAM_TOPIC_IN, groupId = "bar")
public void listenForGreetingResponses(ConsumerRecord<String, Greeting> cr) throws Exception {
    logger.info("STREAM_IN_TOPIC Listener: ConsumerRecord: {}" + cr.toString());
}

应用程序.yml

 spring:
kafka:
  bootstrap-servers:  localhost:9092
  consumer:
    group-id: foo
    auto-offset-reset: latest
    key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
    value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
    properties:
      spring.json.trusted.packages: com.teramedica.kafakaex001web.model
      spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
      spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
  producer:
    key-serializer: org.apache.kafka.common.serialization.StringSerializer
    value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
  streams:
    application-id: kafka9000-v0.1
    properties: # properties not explicitly handled by KafkaProperties.streams
      default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
      default.value.serde: org.springframework.kafka.support.serializer.JsonSerde
      spring.json.trusted.packages: com.teramedica.kafakaex001web.model

原文由 mconner 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 2.1k
1 个回答

请参阅 文档

具体来说…

JsonDeserializer.VALUE_DEFAULT_TYPE :如果不存在标头信息,则用于反序列化值的后备类型。

它是 spring.json.value.default.type

您还可以设置 spring.json.use.type.headers (默认为 true)以防止甚至查找标头。

反序列化程序会自动信任默认类型的包,因此无需在其中添加它。

编辑

但是,另请参阅 Spring Messaging 消息转换

使用 BytesDeserializerBytesJsonMessageConverter 框架会将方法参数类型作为转换目标传递。

原文由 Gary Russell 发布,翻译遵循 CC BY-SA 4.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进