spring集成kafka如何配置用户认证的信息?

引入了kafka-clients跟spring-kafka两个包,版本分别是0.11.0.0和1.3.8,网上找了一些spring集成kafka的帖子都没有提及如何加入用户认证,也就是security.protocol和sasl.mechanism以及kafka_client_jaas配置文件,看到网上有用spring boot配置的,但是不知道转换成xml的形式是怎么样的?我用了网上找的配置方法后无论放在consumerProperties还是containerProperties里都会报错,没法设置,不知道是版本的问题还是设置有问题
spring boot代码:

consumer:
bootstrap-servers: 192.168.186.130:9092,192.168.186.131:9092,192.168.186.132:9092
group-id: group-1
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
sasl.mechanism: PLAIN
security.protocol: SASL_PLAINTEXT

我的代码:

<bean id="consumerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="${bootstrap.servers}"/>
                <entry key="group.id" value="${group.id}"/>
                <entry key="enable.auto.commit" value="true"/>
                <entry key="auto.commit.interval.ms" value="1000"/>
                <entry key="session.timeout.ms" value="15000"/>
                <entry key="max.poll.records" value="10"/>
                <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
                <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
            </map>
        </constructor-arg>
        <!-- <property name="security.protocol" value="${security.protocol}"/>
        <property name="sasl.mechanism" value="${sasl.mechanism}"/> -->
    </bean>

    <!-- 创建consumerFactory bean -->
    <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
        <constructor-arg>
            <ref bean="consumerProperties"/>
        </constructor-arg>
    </bean>


    <!-- 实际执行消息消费的类 -->
    <bean id="messageListernerConsumerService" class="com.rongdu.zgj.modules.dsc.kafka.KafkaConsumerListener"/>


    <!-- 消费者容器配置信息 -->
    <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
        <constructor-arg name="topics" value="${topic}"/>        
        <property name="messageListener" ref="messageListernerConsumerService"/>
        
    </bean>


    <!-- 创建kafkatemplate bean,使用的时候,只需要注入这个bean,即可使用template的send消息方法 -->
    <bean id="messageListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer"
          init-method="doStart">
        <constructor-arg ref="consumerFactory"/>
        <constructor-arg ref="containerProperties"/>
        <property name="concurrency" value="5"/>
    </bean>
阅读 4.5k
撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进