spring cloud stream 3 消费者绑不到交换机上

xiaofanku
  • 32

spring cloud版本:2020.0.2
spring cloud stream 3版本:3.1.2
RabbitMQ 版本:3.7.7

消费者配置

  cloud:
    stream:
      bindings:
        topicAction-in-0:
          destination: threadsEvent
      binders:
        defaultRabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest

消息的处理方法

@Configuration
public class RabbitMQEventActionConfig {
    @Autowired
    private TopicActionCollectionService topicActionCollectionService;
    private final static Logger logger = LoggerFactory.getLogger(RabbitMQEventActionConfig.class);

    //spring-cloud-stream事件消费者
    @Bean(name="topicAction")
    public Consumer<String> topicAction() {
        logger.error("[TE]start consume topic event");
        return talBody -> {
            TopicActionLog tal = new Gson().fromJson(talBody, TopicActionLog.class);
            logger.error("[MDA][Receive]topic action: "+tal.getAction().getTitle()+", operator: "+tal.getMemberNickname());
        };
    }

消息的生产者:

@Component
public class TopicActionLogMessageProvider {
    @Autowired
    private StreamBridge streamBridge;
    private final static Logger logger = LoggerFactory.getLogger(TopicActionLogMessageProvider.class);

    @Override
    public void logs(ForumActionEnum action, long topicId, long rodeMember, ActionEventCulpritor culpritor) {
        TopicActionLog tal = new TopicActionLog(culpritor.getMemberId(), culpritor.getMemberNickname(), action, topicId, rodeMember, culpritor.getIpAddr(), culpritor.getToken());
        String body = new Gson().toJson(tal);
        streamBridge.send("topicAction-out-0", body);
        logger.error("[MDA][Provider]topic action: "+action.getTitle()+", operator: "+culpritor.getMemberNickname());
    }
}

生产者的配置

  cloud:
    config:
      label: master
      name: system
      profile: dev
    stream:
      bindings:
        topicAction-out-0:
          destination: threadsEvent
      binders:
        defaultRabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest

生产者可以连接到rabbitmq.交换机:threadsEvent能创建.消息能发送到交换机上

回复
阅读 489
1 个回答

话题服务(消息的生产者)
java代码:

streamBridge.send("topicAction-out-0", body);

topicAction-out-0是绑定名称

yaml配置:

spring:
 cloud:    
   stream:
      bindings:
        topicAction-out-0:
          destination: threadsEvent
          group: threadsQueue
      binders:
        defaultRabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest

回复服务(消息的生产者)
java代码:

streamBridge.send("postsAction-out-0", body);

postsAction-out-0是绑定名称

yaml配置:

spring:
   cloud:    
    stream:
      bindings:
        postsAction-out-0:
          destination: repliesEvent
          group: repliesQueue
      binders:
        defaultRabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest

日志服务(消息的消费者)
java代码:

@Configuration
public class RabbitMQEventActionConfig {
    @Autowired
    private TopicActionCollectionService topicActionCollectionService;
    private final static Logger logger = LoggerFactory.getLogger(RabbitMQEventActionConfig.class);

    //spring-cloud-stream事件消费者
    @Bean
    public Consumer<String> topicAction() {
        logger.error("[TE]start consume topic event");
        return talBody -> {
            TopicActionLog tal = new Gson().fromJson(talBody, TopicActionLog.class);
            logger.error("[MDA][Receive]topic action: "+tal.getAction().getTitle()+", operator: "+tal.getMemberNickname());
            TopicActionCollection tac = new TopicActionCollection(tal.getMemberNickname(), tal.getMemberId(), tal.getAction(), tal.getTopicId(), tal.getIpAddr(), tal.getToken(), tal.getRodeMember());
            topicActionCollectionService.create(tac);
        };
    }

    @Bean
    public Consumer<String> postsAction(){
        logger.error("[TE]start consume posts event");
        return palBody -> {
            PostsActionLog pal = new Gson().fromJson(palBody, PostsActionLog.class);
            logger.error("[MDA][Receive]posts action: "+pal.getAction().getTitle()+", operator: "+pal.getMemberNickname());
            TopicActionCollection pac = new TopicActionCollection(pal.getMemberNickname(), pal.getMemberId(), pal.getAction(), pal.getTopicId(), pal.getPostsId(), pal.getIpAddr(), pal.getToken(), pal.getRodeMember());
            topicActionCollectionService.create(pac);
        };
    }
}

yaml配置:

  cloud:
    function:
      definition: topicAction;postsAction
    stream:
      bindings:
        topicAction-in-0:
          destination: threadsEvent
          group: threadsQueue
        postsAction-in-0:
          destination: repliesEvent
          group: repliesQueue
      binders:
        defaultRabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest

最后:

  1. 多看文档. 多试验。不要相信网上的各种搜索结果
  2. spring.cloud.function.definition中的管道符号和逗号都是拼接符号. 若消费者的配置中这样写:spring.cloud.function.definition=topicAction|postsAction哪么在消息的消费者服务启动完后会创建一个交换机名称:topicActionpostsAction-in-0. 并绑定一个队列,队列的名称是交换机名称后面拼接的anonymous加最后一串值
  3. 若消息消费者配的不对spring会在消息生产者执行完后创建一个交换机.并不会绑定队列.
  4. 上面的配置消费者服务启来后都可以创建好交换机和队列。rabbitMQ截图如下:
    交换机:
    mq交换机

队列:
mq队列

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
你知道吗?

宣传栏