spring cloud版本:2020.0.2
spring cloud stream 3版本:3.1.2
RabbitMQ 版本:3.7.7
消费者配置
cloud:
stream:
bindings:
topicAction-in-0:
destination: threadsEvent
binders:
defaultRabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
消息的处理方法
@Configuration
public class RabbitMQEventActionConfig {
@Autowired
private TopicActionCollectionService topicActionCollectionService;
private final static Logger logger = LoggerFactory.getLogger(RabbitMQEventActionConfig.class);
//spring-cloud-stream事件消费者
@Bean(name="topicAction")
public Consumer<String> topicAction() {
logger.error("[TE]start consume topic event");
return talBody -> {
TopicActionLog tal = new Gson().fromJson(talBody, TopicActionLog.class);
logger.error("[MDA][Receive]topic action: "+tal.getAction().getTitle()+", operator: "+tal.getMemberNickname());
};
}
消息的生产者:
@Component
public class TopicActionLogMessageProvider {
@Autowired
private StreamBridge streamBridge;
private final static Logger logger = LoggerFactory.getLogger(TopicActionLogMessageProvider.class);
@Override
public void logs(ForumActionEnum action, long topicId, long rodeMember, ActionEventCulpritor culpritor) {
TopicActionLog tal = new TopicActionLog(culpritor.getMemberId(), culpritor.getMemberNickname(), action, topicId, rodeMember, culpritor.getIpAddr(), culpritor.getToken());
String body = new Gson().toJson(tal);
streamBridge.send("topicAction-out-0", body);
logger.error("[MDA][Provider]topic action: "+action.getTitle()+", operator: "+culpritor.getMemberNickname());
}
}
生产者的配置
cloud:
config:
label: master
name: system
profile: dev
stream:
bindings:
topicAction-out-0:
destination: threadsEvent
binders:
defaultRabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
生产者可以连接到rabbitmq.交换机:threadsEvent能创建.消息能发送到交换机上
话题服务(消息的生产者)
java代码:
topicAction-out-0是绑定名称
yaml配置:
回复服务(消息的生产者)
java代码:
postsAction-out-0是绑定名称
yaml配置:
日志服务(消息的消费者)
java代码:
yaml配置:
最后:
spring.cloud.function.definition=topicAction|postsAction
哪么在消息的消费者服务启动完后会创建一个交换机名称:topicActionpostsAction-in-0. 并绑定一个队列,队列的名称是交换机名称后面拼接的anonymous加最后一串值交换机:
队列: