迈克丝

迈克丝 查看完整档案

北京编辑  |  填写毕业院校  |  填写所在公司/组织 1290722901@qq.com 编辑
编辑

一步一步学技术,踏踏实实涨经验,兴趣广泛,广交好友,希望大家多多指正/批评.

个人动态

迈克丝 发布了文章 · 10月9日

SpringBoot 整合 RabbitMQ

准备

1.新建springboot项目
2.添加Spring for Rabbitmq依赖
3.yml中配置rabbitmq信息:

spring:
  rabbitmq:
    host: 192.168.64.140
    username: admin
    password: admin

每个模式创建一个包,包中包含主程序(main方法),生产者,消费者,每测试一个程序启动一个main方法即可.

简单模式

主程序

@SpringBootApplication
public class Main {
    @Autowired
 private Producer p;
 public static void main(String[] args) {
        SpringApplication.run(Main.class,args);
 }
    /*
 springboot项目完整启动完成
 当前对象中需要注入的对象注入完成之后才会执行该方法
 */ @PostConstruct
 public void test(){
        p.send();
 }
    /*
 封装队列参数的对象
 RabbitAutoConfiguraion 自动配置类,会自动发现Queue
 在服务器上定义该队列
 */ @Bean
 public Queue helloworldQueue(){
        //return new Queue("helloworld");//true,false,false 持久,非独占,非自动删除
 return new Queue("helloworld",false);
 }
}

生产者

@Component
public class Producer {
    /*
 AmqpTemplate的实例是在
 RabbitmqAutoConfiguration自动配置类中创建的
 */ @Autowired
 private AmqpTemplate t;
 public void send(){
        t.convertAndSend("helloworld", "Hello World!");
 }
}

消费者

@Component
public class Consumer {
    /*
 消费者自动注册,自动连接服务器,自动开启消息监听
 */ @RabbitListener(queues = "helloworld")
    public void receive(String msg){
        System.out.println("收到:"+msg);
 }
}

工作模式

工作模式还需要实现两点:
1.合理分发:
手动ack:springboot整合rabbitmq默认手动ack,并且自动返回回执
qos=1:yml中配置:spring.rabbitmq.listener.simple.prefetch:1
2.持久化:
队列持久化/消息持久化:springboot整合rabbitmq中都是默认持久的
若是不想要其持久 -->

  • 使用 MessagePostProcessor 前置处理器参数
  • 从消息中获取消息的属性对象
  • 在属性中把 DeliveryMode 设置为非持久化
//如果需要设置消息为非持久化,可以取得消息的属性对象,修改它的deliveryMode属性
t.convertAndSend("task_queue", (Object) s, new MessagePostProcessor() {
    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        MessageProperties props = message.getMessageProperties();
        props.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
        return message;
    }
});

主程序

@SpringBootApplication
public class Main {
    @Autowired
 private Producer p;
 @PostConstruct
 public void test(){
        new Thread(new Runnable() {
            @Override
 public void run() {
                p.send();
 }
        }).start();
 //new Thread(() -> p.send()).start();
 }
    public static void main(String[] args) {
        SpringApplication.run(Main.class, args);
 }
    @Bean
 public Queue taskQueue(){
        return new Queue("task_queue");//true,false,false
 }
}

生产者

@Component
public class Producer {
    @Autowired
 private AmqpTemplate t;
 public void send(){
        while(true){
            System.out.println("nn输入消息:");
 String msg = new Scanner(System.in).nextLine();
 t.convertAndSend("task_queue", (Object) msg, new MessagePostProcessor() {
                @Override
 public Message postProcessMessage(Message message) throws AmqpException {
                    MessageProperties p = message.getMessageProperties();
 p.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
 return message;
 }
            });
 }
    }
}

消费者

@Component
public class Consumer {
    @RabbitListener(queues = "task_queue")
    public void receive1(String msg){
        System.out.println("消费者1收到:"+msg);
 }
    @RabbitListener(queues = "task_queue")
    public void receive2(String msg){
        System.out.println("消费者2收到:"+msg);
 }
}

发布订阅模式

主程序

创建 FanoutExcnahge 实例, 封装 fanout 类型交换机定义信息.

spring boot 的自动配置类会自动发现交换机实例, 并在 RabbitMQ 服务器中定义该交换机.

@SpringBootApplication
public class Main {
    @Autowired
 private Producer p;
 @PostConstruct
 public void test(){
        new Thread(new Runnable() {
            @Override
 public void run() {
                p.send();
 }
        }).start();
 }
    public static void main(String[] args) {
        SpringApplication.run(Main.class, args);
 }
    //定义fanout交换机:logs
 @Bean
 public FanoutExchange logs(){
        //return new FanoutExchange("logs");//默认true,false
 return new FanoutExchange("logs",false,false);
 }
}

生产者

生产者向指定的交换机 logs 发送数据.

不需要指定队列名或路由键, 即使指定也无效, 因为 fanout 交换机会向所有绑定的队列发送数据, 而不是有选择的发送

@Component
public class Producer {
    @Autowired
 private AmqpTemplate t;
 public void send(){
        while (true){
            System.out.println("输入消息:");
 String msg=new Scanner(System.in).nextLine();
 t.convertAndSend("logs", "",msg);
 }
    }
}

消费者

消费者需要执行以下操作:

  1. 定义随机队列(随机命名,非持久,排他,自动删除)
  2. 定义交换机(可以省略, 已在主程序中定义)
  3. 将队列绑定到交换机

spring boot 通过注解完成以上操作:(注意@RabbitListener注解的嵌套)

@Component
public class Consumer {
    /*
 1.随即队列
 2.指定绑定的交换机
 */ @RabbitListener(bindings = @QueueBinding(
            value = @Queue,//服务器自动取名,false,true,true
 exchange = @Exchange(name = "logs",declare = "false")//declare指定的是一个已定义过的交换机,而不是重新定义
 ))
    public void reveice1(String msg){
        System.out.println("消费者1收到:"+msg);
 }
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,//服务器自动取名,false,true,true
 exchange = @Exchange(name = "logs",declare = "false")
    ))
    public void reveice2(String msg){
        System.out.println("消费者2收到:"+msg);
 }
}

路由模式

与发布和订阅模式代码类似, 只是做以下三点调整:

  1. 使用 direct 交换机
  2. 队列和交换机绑定时, 设置绑定键
  3. 发送消息时, 指定路由键

主程序

@SpringBootApplication
public class Main {
    @Autowired
 private Producer p;
 @PostConstruct
 public void test(){
        new Thread(new Runnable() {
            @Override
 public void run() {
                p.send();
 }
        }).start();
 }
    public static void main(String[] args) {
        SpringApplication.run(Main.class, args);
 }
    //定义direct交换机:logs
 @Bean
 public DirectExchange logs(){
        //return new FanoutExchange("logs");//默认true,false
 return new DirectExchange("direct_logs",false,false);
 }
}

生产者

@Component
public class Producer {
    @Autowired
 private AmqpTemplate t;
 public void send(){
        while (true){
            System.out.println("输入消息:");
 String msg=new Scanner(System.in).nextLine();
 System.out.println("输入路由键:");
 String key=new Scanner(System.in).nextLine();
 t.convertAndSend("direct_logs", key,msg);
 }
    }
}

消费者

@Component
public class Consumer {
    /*
 1.随即队列
 2.指定绑定的交换机
 */ @RabbitListener(bindings = @QueueBinding(
            value = @Queue,//服务器自动取名,false,true,true
 exchange = @Exchange(name = "direct_logs",declare = "false"),//declare指定的是一个已定义过的交换机,而不是重新定义
 key = "error"
 ))
    public void reveice1(String msg){
        System.out.println("消费者1收到:"+msg);
 }
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,//服务器自动取名,false,true,true
 exchange = @Exchange(name = "direct_logs",declare = "false"),
 key = {"info","warning","error"}
    ))
    public void reveice2(String msg){
        System.out.println("消费者2收到:"+msg);
 }
}

主题模式

主题模式不过是具有特殊规则的路由模式, 代码与路由模式基本相同, 只做如下调整:

  1. 使用 topic 交换机
  2. 使用特殊的绑定键和路由键规则

主程序

{
    @Autowired
 private Producer p;
 @PostConstruct
 public void test(){
        new Thread(new Runnable() {
            @Override
 public void run() {
                p.send();
 }
        }).start();
 }
    public static void main(String[] args) {
        SpringApplication.run(Main.class, args);
 }
    //定义topic交换机:logs
 @Bean
 public TopicExchange logs(){
        //return new TopicExchange("logs");//默认true,false
 return new TopicExchange("topic_logs",false,false);
 }
}

生产者

@Component
public class Producer {
    @Autowired
 private AmqpTemplate t;
 public void send(){
        while (true){
            System.out.println("输入消息:");
 String msg=new Scanner(System.in).nextLine();
 System.out.println("输入路由键:");
 String key=new Scanner(System.in).nextLine();
 t.convertAndSend("topic_logs", key,msg);
 }
    }
}

消费者

@Component
public class Consumer {
    /*
 1.随即队列
 2.指定绑定的交换机
 */ @RabbitListener(bindings = @QueueBinding(
            value = @Queue,//服务器自动取名,false,true,true
 exchange = @Exchange(name = "topic_logs",declare = "false"),//declare指定的是一个已定义过的交换机,而不是重新定义
 key = "*.orange.*"
 ))
    public void reveice1(String msg){
        System.out.println("消费者1收到:"+msg);
 }
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,//服务器自动取名,false,true,true
 exchange = @Exchange(name = "topic_logs",declare = "false"),
 key = {"*.*.rabbit","lazy.#"}
    ))
    public void reveice2(String msg){
        System.out.println("消费者2收到:"+msg);
 }
}
查看原文

赞 0 收藏 0 评论 0

迈克丝 发布了文章 · 10月9日

链路追踪 sleuth 链路分析 zipkin

简介

随着系统规模越来越大,微服务之间调用关系变得错综复杂,一条调用链路中可能调用多个微服务,任何一个微服务不可用都可能造整个调用过程失败

sleuth

spring cloud sleuth 可以跟踪调用链路,分析链路中每个节点的执行情况

添加依赖

只需要添加 sleuth 依赖,不需要其他配置

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-sleuth</artifactId>
</dependency>

按顺序启动项目

先启动erueka/config配置中心 --> 在启动其他业务项目
可以在日志中看到:[服务id,链路总id,span id(每一步服务id),是否发送到zipkin]

zipkin

zipkin 可以收集链路跟踪数据,提供可视化的链路分析

下载并启动zipkin服务

启动命令:java -jar zipkin-server-2.12.9-exec.jar --zipkin.collector.rabbitmq.uri=amqp://admin:admin@192.168.64.140:5672

通过该路径:* http://localhost:9411/zipkin检查情况,能否看到界面

添加依赖

添加启步依赖:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-zipkin</artifactId>
</dependency>

yml添加rabbitmq连接信息

在config配置中心修改:`spring.rabbitmq.host/port/username/password
spring.zipkin.sender.type:rabbit`

查看原文

赞 0 收藏 0 评论 0

迈克丝 发布了文章 · 9月30日

RabbitMQ 05 主题模式/RPC模式

5.主题模式

在路由模式中,使用Direct交换机,从而可以选择性接收日志。

虽然使用Direct交换机改进了我们的系统,但它不能基于多个标准进行路由。

这是缺乏灵活性的,要在日志系统中实现这一点,我们需要了解更复杂的Topic交换机。

主题交换机 Topic exchange

发送到Topic交换机的消息,它的的routingKey,必须是由点分隔的多个单词。单词可以是任何东西,但通常是与消息相关的一些特性。

routingKey可以有任意多的单词,最多255个字节。

bindingKey也必须采用相同的形式。Topic交换机的逻辑与直连交换机类似——使用特定routingKey发送的消息将被传递到所有使用匹配bindingKey绑定的队列。bindingKey有两个重要的特殊点:

  • * 可以通配单个单词。
  • # 可以通配零个或多个单词。

如图所示:
image

如上图中,将routingKey设置为"quick.orange.rabbit"的消息将被发送到两个队列。消息 "lazy.orange.elephant“也发送到它们两个。另外”quick.orange.fox“只会发到第一个队列,”lazy.brown.fox“只发给第二个。”lazy.pink.rabbit“将只被传递到第二个队列一次,即使它匹配两个绑定。”quick.brown.fox"不匹配任何绑定,因此将被丢弃。

生产者

public class Test1 {
    public static void main(String[] args) throws Exception {
        ConnectionFactory f = new ConnectionFactory();
        f.setHost("192.168.64.140");
        f.setPort(5672);
        f.setUsername("admin");
        f.setPassword("admin");
        
        Connection c = f.newConnection();
        Channel ch = c.createChannel();
        
        //参数1: 交换机名
        //参数2: 交换机类型
        ch.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
        
        while (true) {
            System.out.print("输入消息: ");
            String msg = new Scanner(System.in).nextLine();
            if ("exit".contentEquals(msg)) {
                break;
            }
            System.out.print("输入routingKey: ");
            String routingKey = new Scanner(System.in).nextLine();
            
            //参数1: 交换机名
            //参数2: routingKey, 路由键,这里我们用日志级别,如"error","info","warning"
            //参数3: 其他配置属性
            //参数4: 发布的消息数据 
            ch.basicPublish("topic_logs", routingKey, null, msg.getBytes());
            
            System.out.println("消息已发送: "+routingKey+" - "+msg);
        }

        c.close();
    }
}

消费者

public class Test2 {
    public static void main(String[] args) throws Exception {
        ConnectionFactory f = new ConnectionFactory();
        f.setHost("192.168.64.140");
        f.setUsername("admin");
        f.setPassword("admin");
        Connection c = f.newConnection();
        Channel ch = c.createChannel();
        
        ch.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
        
        //自动生成对列名,
        //非持久,独占,自动删除
        String queueName = ch.queueDeclare().getQueue();
        
        System.out.println("输入bindingKey,用空格隔开:");
        String[] a = new Scanner(System.in).nextLine().split("\\s");
        
        //把该队列,绑定到 topic_logs 交换机
        //允许使用多个 bindingKey
        for (String bindingKey : a) {
            ch.queueBind(queueName, "topic_logs", bindingKey);
        }
        
        System.out.println("等待接收数据");
        
        //收到消息后用来处理消息的回调对象
        DeliverCallback callback = new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery message) throws IOException {
                String msg = new String(message.getBody(), "UTF-8");
                String routingKey = message.getEnvelope().getRoutingKey();
                System.out.println("收到: "+routingKey+" - "+msg);
            }
        };
        
        //消费者取消时的回调对象
        CancelCallback cancel = new CancelCallback() {
            @Override
            public void handle(String consumerTag) throws IOException {
            }
        };
        
        ch.basicConsume(queueName, true, callback, cancel);
    }
}

6.RPC模式

该模式不常使用,也较为复杂,了解即可.
RPC模式顾名思义也就是远程调用模式.

RabbitMQ去搭建一个RPC系统:一个客户端和一个可以升级(扩展)的RPC服务器

image

总结:
RPC的工作方式是这样的:

  • 对于RPC请求,客户端发送一条带有两个属性的消息:replyTo,设置为仅为请求创建的匿名独占队列,和correlationId,设置为每个请求的惟一id值。
  • 请求被发送到rpc_queue队列。
  • RPC工作进程(即:服务器)在队列上等待请求。当一个请求出现时,它执行任务,并使用replyTo字段中的队列将结果发回客户机。
  • 客户机在回应消息队列上等待数据。当消息出现时,它检查correlationId属性。如果匹配请求中的值,则向程序返回该响应数据。
查看原文

赞 0 收藏 0 评论 0

迈克丝 发布了文章 · 9月30日

Config 配置中心

创建存放配置文件的 git 仓库

  1. 新建模块: config

    当做一个空文件夹来使用

  2. 复制 2,3,4,11 项目的 application.yml 到 config 目录并改名

    • item-service-dev.yml
    • user-service-dev.yml
    • order-service-dev.yml
    • zuul-service-dev.yml
在dev.yml中添加spring.cloud.config.override-none:true
  1. 提交推送到远程仓库
  2. 最后,清空四个项目中的application.yml文件

dev指的是profile名,一般可写dev(开发)/test(测试)/prod(生产)

搭建配置中心config服务器

config配置中心从git仓库下载所有的配置文件,而其他微服务从config配置中心获取

  1. 创建config配置中心项目
  2. 添加config server/eureka client依赖
  3. 配置application.yml:application.name/git.uri:配置仓库路径/git.searchPaths:config/server.port/eureka.defaultZone
  4. 启动类添加注解:@EnableConfigServer

配置中心的客户端

修改2,3,4,11项目

  1. 添加 config client 依赖
  2. 新建配置文件 bootstrap.yml

    bootstrap.yml在springboot项目启动时,执行引导(初始化)操作,然后才加载 application.yml

  3. 配置

    • eureka
    • 配置中心的服务id
    • 下载哪个配置文件
查看原文

赞 0 收藏 0 评论 0

迈克丝 发布了文章 · 9月30日

Feign/Zuul为何不启用Hystrix/Ribbon重试

Feign不推荐启用Hystrix

  • Feign

    • 部署位置: 业务服务之间调用
    • 添加Hystrix会造成混乱,故障点难以确认

Zuul不推荐启用Ribbon重试

  • Zuul

    • 部署位置: 在最前面作为入口
    • zuul如果启用重试,可能造成后台多台服务器压力过大

image

查看原文

赞 0 收藏 0 评论 0

迈克丝 发布了文章 · 9月30日

Zuul

作用

API网关

  • 后台服务统一的调用入口
  • 转发的路由规则如下:
zuul:
 routes:
 # 远程服务: 调用路径
 # 下面配置的是默认规则,不配置也可以自动配置
 # 如果不配置,可以根据注册表自动配置

实现步骤

1.添加依赖:Zuul/eureka client
2.配置application.yml:application.name/server.port/zuul.route.服务

spring:
  application:
    name: zuul
    
server:
  port: 3001
  
eureka:
  client:
    service-url:
      defaultZone: http://eureka1:2001/eureka, http://eureka2:2002/eureka

zuul:
  routes:
    item-service: /item-service/**
    user-service: /user-service/**
    order-service: /order-service/**

3.启动类添加注解@EnableZuulProxy

@EnableZuulProxy
@SpringBootApplication
public class Sp11ZuulApplication {
    public static void main(String[] args) {
        SpringApplication.run(Sp11ZuulApplication.class, args);
    }
}

统一的权限校验

定义过滤器

通过继承 ZuulFilter 过滤器,来实现权限判断
没登陆就阻止访问
已登录就允许访问--http://.../...?token=...

@Component
public class AccessFilter extends ZuulFilter{
    //是否执行过滤代码
    @Override
    public boolean shouldFilter() {
        //对指定的serviceid过滤,如果要过滤所有服务,直接返回 true
        
        RequestContext ctx = RequestContext.getCurrentContext();
        String serviceId = (String) ctx.get(FilterConstants.SERVICE_ID_KEY);
        if(serviceId.equals("item-service")) {
            return true;
        }
        return false;
    }

    //过滤代码
    @Override
    public Object run() throws ZuulException {
        RequestContext ctx = RequestContext.getCurrentContext();
        HttpServletRequest req = ctx.getRequest();
        String token = req.getParameter("token");
        if (token == null) {
            //此设置会阻止请求被路由到后台微服务
            ctx.setSendZuulResponse(false);
            //向客户端的响应
            ctx.setResponseStatusCode(200);
            ctx.setResponseBody(JsonResult.err().code(JsonResult.NOT_LOGIN).toString());
        }
        //zuul过滤器返回的数据设计为以后扩展使用,
        //目前该返回值没有被使用
        return null;
    }

    //过滤器类型
    @Override
    public String filterType() {
        return FilterConstants.PRE_TYPE;
    }

    //过滤器顺序号
    @Override
    public int filterOrder() {
        //该过滤器顺序要 > 5,才能得到 serviceid
        return FilterConstants.PRE_DECORATION_FILTER_ORDER+1;
    }
}

zuul 集成 ribbon

默认启用了负载均衡,但没有启用重试

启用重试

  1. 添加 spring-retry 依赖
  2. zuul.retryable=true
  3. 有默认重试参数(无需配置重试参数)

zuul + hystrix 降级

创建降级类

  • getRoute() 方法中指定应用此降级类的服务id,星号或null值可以通配所有服务
@Slf4j
@Component
public class ItemServiceFallback implements FallbackProvider {
    @Override
    public String getRoute() {
        //当执行item-service失败,
        //应用当前这个降级类
        return "item-service";
        //星号和null都表示所有微服务失败都应用当前降级类
        //"*"; //null;
    }

    //该方法返回封装降级响应的对象
    //ClientHttpResponse中封装降级响应
    @Override
    public ClientHttpResponse fallbackResponse(String route, Throwable cause) {
        return response();
    }

    private ClientHttpResponse response() {
        return new ClientHttpResponse() {
            //下面三个方法都是协议号
            @Override
            public HttpStatus getStatusCode() throws IOException {
                return HttpStatus.OK;
            }
            @Override
            public int getRawStatusCode() throws IOException {
                return HttpStatus.OK.value();
            }
            @Override
            public String getStatusText() throws IOException {
                return HttpStatus.OK.getReasonPhrase();
            }

            @Override
            public void close() {
            }

            @Override
            public InputStream getBody() throws IOException {
                log.info("fallback body");
                String s = JsonResult.err().msg("后台服务错误").toString();
                return new ByteArrayInputStream(s.getBytes("UTF-8"));
            }

            @Override
            public HttpHeaders getHeaders() {
                HttpHeaders headers = new HttpHeaders();
                headers.setContentType(MediaType.APPLICATION_JSON);
                return headers;
            }
        };
    }
}
查看原文

赞 0 收藏 0 评论 0

迈克丝 发布了文章 · 9月28日

turbine

turbine

turbine起到监控数据聚合的作用
可以把多台服务器监控数据聚合起来,一起提供给hystrix仪表盘展现

搭建turbine步骤

1.添加依赖
turbine/eureka client
2.配置yml
application.yml添加:

spring:
  application:
    name: turbin
    
server:
  port: 5001
  
eureka:
  client:
    service-url:
      defaultZone: http://eureka1:2001/eureka, http://eureka2:2002/eureka
      
turbine:
  app-config: order-service
  cluster-name-expression: new String("default")

2.启动类
启动类添加注解@EnableTurbine

@EnableTurbine
@SpringBootApplication
public class Sp10TurbineApplication {

    public static void main(String[] args) {
        SpringApplication.run(Sp10TurbineApplication.class, args);
    }
}
查看原文

赞 0 收藏 0 评论 0

迈克丝 发布了文章 · 9月28日

RabbitMQ 04 订阅模式/路由模式

rabbitmq六种工作模式

3.发布订阅模式

即向多个消费者传递同一条信息
image

1).Exchanges 交换机

RabbitMQ消息传递模型的核心思想是,生产者永远不会将任何消息直接发送到队列。

相反,生产者只能向交换机(Exchange)发送消息。交换机是一个非常简单的东西。一边接收来自生产者的消息,另一边将消息推送到队列。交换器必须确切地知道如何处理它接收到的消息。它应该被添加到一个特定的队列中吗?它应该添加到多个队列中吗?或者它应该被丢弃。这些规则由exchange的类型定义。

有几种可用的交换类型:direct、topic、header和fanout
创建fanout交换机logs: c.exchangeDeclare("logs", "fanout");
c.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);

fanout交换机非常简单。它只是将接收到的所有消息广播给它所知道的所有队列。

2).绑定 Bindings

image

创建了一个fanout交换机和一个队列。现在我们需要告诉exchange向指定队列发送消息。exchange和队列之间的关系称为绑定。

//指定的队列,与指定的交换机关联起来
//称为绑定 -- binding
//第三个参数时 routingKey, 由于是fanout交换机, 这里忽略 routingKey
ch.queueBind(queueName, "logs", "");

3).整体代码

1.生产者
最重要的更改是,我们现在希望将消息发布到logs交换机,而不是无名的日志交换机。我们需要在发送时提供一个routingKey,但是对于fanout交换机类型,该值会被忽略。

public class Producer {
    public static void main(String[] args) throws Exception {
        //建立连接
 ConnectionFactory f = new ConnectionFactory();
 f.setHost("192.168.64.140");
 f.setPort(5672);
 f.setUsername("admin");
 f.setPassword("admin");
 Connection con = f.newConnection();
 Channel c = con.createChannel();
 //定义fanout类型交换机:logs
 //c.exchangeDeclare("logs", "fanout"); c.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);
 //向交换机发送信息
 while (true){
            System.out.println("输入消息:");
 String msg = new Scanner(System.in).nextLine();
 c.basicPublish("logs",
 "",
 null, msg.getBytes());
 }
    }
}

2.消费者
如果还没有队列绑定到交换器,消息就会丢失,但这对我们来说没有问题;如果还没有消费者在听,我们可以安全地丢弃这些信息。

public class Consumer {
    public static void main(String[] args) throws Exception {
        //建立连接
 ConnectionFactory f = new ConnectionFactory();
 f.setHost("192.168.64.140");
 f.setPort(5672);
 f.setUsername("admin");
 f.setPassword("admin");
 Connection con = f.newConnection();
 Channel c = con.createChannel();
 //1.定义随机队列 2.定义交换机 3.绑定
 //随机命名,非持久,独占,自动删除
 String queue = UUID.randomUUID().toString();
 c.queueDeclare(queue,
 false, true, true, null);
 c.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);
 //第三个参数对发布订阅模式fanout交换机无效
 c.queueBind(queue, "logs", "");
 DeliverCallback deliverCallback = new DeliverCallback() {
            @Override
 public void handle(String consumerTag, Delivery message) throws IOException {
                String msg = new String(message.getBody());
 System.out.println("收到:"+msg);
 }
        };
 CancelCallback cancelCallback = new CancelCallback() {
            @Override
 public void handle(String consumerTag) throws IOException {
            }
        };
 //正常的消费数据
 c.basicConsume(queue,
 true, deliverCallback,
 cancelCallback);
 }
}

4.路由模式

路由模式与订阅模式不同之处在于,我们将向其添加一个特性—我们将只订阅所有消息中的一部分.本文中已添加err/info/warning等报错提示来示范.
image

1).绑定 Bindings

绑定是交换机和队列之间的关系。这可以简单地理解为:队列对来自此交换的消息感兴趣。

绑定可以使用额外的routingKey参数。为了避免与basic_publish参数混淆,我们将其称为bindingKey。这是我们如何创建一个键绑定:

ch.queueBind(queueName, EXCHANGE_NAME, "black");

bindingKey的含义取决于交换机类型。我们前面使用的fanout交换机完全忽略它。

2).直连交换机 Direct exchange

上一节中的日志系统向所有消费者广播所有消息。我们希望扩展它,允许根据消息的严重性过滤消息。

前面我们使用的是fanout交换机,这并没有给我们太多的灵活性——它只能进行简单的广播。

我们将用直连交换机(Direct exchange)代替。它背后的路由算法很简单——消息传递到bindingKey与routingKey完全匹配的队列。

3).多重绑定 Multiple bindings

使用相同的bindingKey绑定多个队列是完全允许的。可以使用binding key "black"将X与Q1和Q2绑定。在这种情况下,直连交换机的行为类似于fanout,并将消息广播给所有匹配的队列。一条路由键为black的消息将同时发送到Q1和Q2。

4).更改

1.发送消息
我们将提供日志级别作为routingKey,这样,接收程序将能够选择它希望接收的级别

//参数1: 交换机名
//参数2: routingKey, 路由键,这里我们用日志级别,如"error","info","warning"
//参数3: 其他配置属性
//参数4: 发布的消息数据 
ch.basicPublish("direct_logs", "error", null, message.getBytes());

2.接收消息
我们将为感兴趣的每个日志级别创建一个新的绑定

ch.queueBind(queueName, "logs", "info");
ch.queueBind(queueName, "logs", "warning");

5).完整代码

1.生产者

public class Producer {
    public static void main(String[] args) throws Exception {
        //建立连接
 ConnectionFactory f = new ConnectionFactory();
 f.setHost("192.168.64.140");
 f.setPort(5672);
 f.setUsername("admin");
 f.setPassword("admin");
 Connection con = f.newConnection();
 Channel c = con.createChannel();
 //定义fanout类型交换机:logs
 //c.exchangeDeclare("logs", "fanout"); c.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);
 //向交换机发送信息
 while (true){
            System.out.println("输入消息:");
 String msg = new Scanner(System.in).nextLine();
 System.out.println("输入路由键:");
 String key = new Scanner(System.in).nextLine();
 c.basicPublish("direct_logs",
 key, //路由键关键词
 null,
 msg.getBytes());
 }
    }
}

2.消费者

public class Consumer {
    public static void main(String[] args) throws Exception {
        //建立连接
 ConnectionFactory f = new ConnectionFactory();
 f.setHost("192.168.64.140");
 f.setPort(5672);
 f.setUsername("admin");
 f.setPassword("admin");
 Connection con = f.newConnection();
 Channel c = con.createChannel();
 //1.定义随机队列 2.定义交换机 3.绑定
 //随机命名,非持久,独占,自动删除
 String queue = UUID.randomUUID().toString();
 c.queueDeclare(queue,
 false, true, true, null);
 c.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);
 //用输入绑定键进行绑定
 System.out.println("输入绑定键,用空格隔开:");
 String s = new Scanner(System.in).nextLine();
 String[] a = s.split(" "); //["aaa","bbb","ccc"]
 for (String key:a){
            c.queueBind(queue, "direct_logs", key);
 }
        DeliverCallback deliverCallback = new DeliverCallback() {
            @Override
 public void handle(String consumerTag, Delivery message) throws IOException {
                String msg = new String(message.getBody());
 String key = message.getEnvelope().getRoutingKey();
 System.out.println("收到:"+msg+" - "+key);
 }
        };
 CancelCallback cancelCallback = new CancelCallback() {
            @Override
 public void handle(String consumerTag) throws IOException {
            }
        };
 //正常的消费数据
 c.basicConsume(queue,
 true, deliverCallback,
 cancelCallback);
 }
}
查看原文

赞 1 收藏 1 评论 0

迈克丝 发布了文章 · 9月27日

Feign

Feign

微服务应用中,ribbon 和 hystrix 总是同时出现,feign 整合了两者,并提供了声明式消费者客户端

总的来说就是集成工具,集成了

  • 远程调用
  • ribbon
  • hystrix

声明式的客户端

只需要声明一个抽象的接口,就可以通过接口方法调用远程服务

// 调用商品服务的声明式客户端接口
// 需要配置三条:调用哪个服务,调用这个服务的哪个路径,向这个路径提交什么参数
@FeignClient(name="item-service")//服务id
public interface ItemFeignClient {
    @GetMapping("/{orderId}")//请求路径
    JsonResult<List<Item>> getItems(@PathVariable String orderId);//传递参数
}
feign 利用了我们熟悉的 spring mvc 注解来对接口方法进行设置,方便我们使用

实现

创建feign项目 -->
添加依赖actuator/eureka client/feign/spring web -->
application.yml添加

spring: application: name: feign 
server: port: 3001 
eureka: client: service-url: defaultZone: http://eureka1:2001/eureka, http://eureka2:2002/eureka

-->
主程序添加@EnableFeignClients -->
按业务添加声明式客户端:

@FeignClient("item-service")
public interface ItemFeignService {
    @GetMapping("/{orderId}")
    JsonResult<List<Item>> getItems(@PathVariable String orderId);
    @PostMapping("/decreaseNumber")
    JsonResult decreaseNumber(@RequestBody List<Item> items);
    }
@FeignClient("user-service")
public interface UserFeignService {
    @GetMapping("/{userId}")
    JsonResult<User> getUser(@PathVariable Integer userId);
    // 拼接路径 /{userId}/score?score=新增积分
    @GetMapping("/{userId}/score") 
    JsonResult addScore(@PathVariable Integer userId, @RequestParam Integer score);
    }

注意:由于@GetMapping等注解是由feign去解析,而不是springmvc解析,所以可能会因为环境原因无法拿到请求所传的参数,需要加上@RequestParam注解修饰参数,另外如果请求参数名与方法参数名不同,@RequestParam不能省略,并且要指定请求参数名:@RequestParam("score") Integer s -->
添加controller类,注入声明式客户端,将请求一一接受

@FeignClient("order-service")
public interface OrderFeignService {
    @GetMapping("/{orderId}")
    JsonResult<Order> getOrder(@PathVariable String orderId);

    @GetMapping("/")
    JsonResult addOrder();
}
@RestController
public class FeignController {
    @Autowired
    private ItemFeignService itemService;
    @Autowired
    private UserFeignService userService;
    @Autowired
    private OrderFeignService orderService;
    @GetMapping("/item-service/{orderId}")
    public JsonResult<List<Item>> getItems(@PathVariable String orderId) {
        return itemService.getItems(orderId);
    }
    @PostMapping("/item-service/decreaseNumber")
    public JsonResult decreaseNumber(@RequestBody List<Item> items) {
        return itemService.decreaseNumber(items);
    }
    @GetMapping("/user-service/{userId}")
    public JsonResult<User> getUser(@PathVariable Integer userId) {
        return userService.getUser(userId);
    }
    @GetMapping("/user-service/{userId}/score") 
    public JsonResult addScore(@PathVariable Integer userId, Integer score) {
        return userService.addScore(userId, score);
    }
    @GetMapping("/order-service/{orderId}")
    public JsonResult<Order> getOrder(@PathVariable String orderId) {
        return orderService.getOrder(orderId);
    }
    @GetMapping("/order-service")
    public JsonResult addOrder() {
        return orderService.addOrder();
    }
 }

feign整合ribbon

  • 无需额外配置,feign 默认已启用了 ribbon 负载均衡和重试机制。可以通过配置对参数进行调整

重试的默认配置参数:

ConnectTimeout=1000
ReadTimeout=1000
MaxAutoRetries=0
MaxAutoRetriesNextServer=1

虽然一般无需额外配置,但若是需要配置的话,可如下分别配置全局和局部:
application.yml 配置 ribbon 超时和重试

  • ribbon.xxx 全局配置
  • item-service.ribbon.xxx 对特定服务实例的配置

Feign 集成 Hystrix,添加降级代码

feign 默认没有启用 hystrix,添加配置,启用 hystrix

基础配置

  1. 添加完整hystrix依赖
  2. feign.hystrix.enabled=true
  3. 启动类上加@EnableCircuitBreaker注解

添加降级代码

  1. @FeignClient(name="服务id", fallback=降级类.class)
  2. 降级类需要实现声明式客户端接口,实现它的抽象方法
  3. 添加 @Component

例如下列代码:

@Component
public class ItemFeignServiceFB implements ItemFeignService {
    @Override
    public JsonResult<List<Item>> getItems(String orderId) {
        return JsonResult.err("无法获取订单商品列表");
    }

    @Override
    public JsonResult decreaseNumber(List<Item> items) {
        return JsonResult.err("无法修改商品库存");
    }
}

暴露监控端点

  1. 添加 actuator 依赖
  2. 暴露 hystrix.stream 监控端点
    m.e.w.e.i=hystrix.stream
management:
  endpoints:
    web:
      exposure:
        include: hystrix.stream

在order订单中添加feign调用item/user

1.添加依赖

右键点击项目编辑起步依赖,添加以下依赖:

  • actuator
  • feign
  • hystrix
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>cn.tedu</groupId>
    <artifactId>sp04-orderservice</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>sp04-orderservice</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>cn.tedu</groupId>
            <artifactId>sp01-commons</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>
                spring-cloud-starter-netflix-eureka-client
            </artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>
                spring-cloud-starter-netflix-hystrix
            </artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-openfeign</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>Hoxton.RELEASE</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
</project>

2.基本配置

application.yml配置:
ribbon 重试和 hystrix 超时这里没有设置,采用了默认值

spring:
  application:
    name: order-service
    
server:
  port: 8201  
  
eureka:
  client:
    service-url:
      defaultZone: http://eureka1:2001/eureka, http://eureka2:2002/eureka
      
feign:
  hystrix:
    enabled: true
    
management:
  endpoints:
    web:
      exposure:
        include: hystrix.stream

启动类配置注解:

@EnableCircuitBreaker//hystrix
@EnableFeignClients//feign
@SpringBootApplication
public class OrderserviceApplication {
    public static void main(String[] args) {
        SpringApplication.run(Sp04OrderserviceApplication.class, args);
 }

3.定义声明式接口

定义item/user声明式接口供order调用
@FeignClient(name="",fallback=)/@GetMapping/参数列表(@PathVariable/@RequestBody)

@FeignClient(name="item-service", fallback = ItemFeignServiceFB.class)
public interface ItemFeignService {
    @GetMapping("/{orderId}")
    JsonResult<List<Item>> getItems(@PathVariable String orderId);

    @PostMapping("/decreaseNumber")
    JsonResult decreaseNumber(@RequestBody List<Item> items);
}
@FeignClient(name="user-service", fallback = UserFeignServiceFB.class)
public interface UserFeignService {
    @GetMapping("/{userId}")
    JsonResult<User> getUser(@PathVariable Integer userId);

    @GetMapping("/{userId}/score") 
    JsonResult addScore(@PathVariable Integer userId, @RequestParam Integer score);
}

4.定义降级方法

仅作模拟,模拟使用缓存数据
@Component/实现重写方法

@Component
public class ItemFeignServiceFB implements ItemFeignService {

    @Override
    public JsonResult<List<Item>> getItems(String orderId) {
        if(Math.random()<0.5) {
            return JsonResult.ok().data(
            
                Arrays.asList(new Item[] {
                        new Item(1,"缓存aaa",2),
                        new Item(2,"缓存bbb",1),
                        new Item(3,"缓存ccc",3),
                        new Item(4,"缓存ddd",1),
                        new Item(5,"缓存eee",5)
                })
            
            );
        }
        return JsonResult.err("无法获取订单商品列表");
    }

    @Override
    public JsonResult decreaseNumber(List<Item> items) {
        return JsonResult.err("无法修改商品库存");
    }
}
@Component
public class UserFeignServiceFB implements UserFeignService {

    @Override
    public JsonResult<User> getUser(Integer userId) {
        if(Math.random()<0.4) {
            return JsonResult.ok(new User(userId, "缓存name"+userId, "缓存pwd"+userId));
        }
        return JsonResult.err("无法获取用户信息");
    }

    @Override
    public JsonResult addScore(Integer userId, Integer score) {
        return JsonResult.err("无法增加用户积分");
    }
}

5.orderServiceImpl完成远程调用

@Service
@Slf4j
public class OrderServiceImpl implements OrderService {
    @Autowired
     private ItemClient itemClient;
     @Autowired
     private UserClient userClient;
     @Override
     public Order getOrder(String orderId) {
     //调用user-service获取用户信息 user 
     JsonResult<User> user = userClient.getUser(7);
     //调用item-service获取商品信息 items 
     JsonResult<List<Item>> items = itemClient.getItems(orderId);
     Order order = new Order();
     order.setId(orderId);
     order.setUser(user.getData());
     order.setItems(items.getData());
     return order;
     }
    @Override
     public void addOrder(Order order) {
            //调用item-service减少商品库存
     itemClient.decreaseNumber(order.getItems());
     //调用user-service增加用户积分
     userClient.addScore(order.getUser().getId(), 100);
     log.info("保存订单:"+order);
     }
 }
查看原文

赞 0 收藏 0 评论 0

迈克丝 发布了文章 · 9月25日

RabbitMQ 03 简单模式/工作模式

rabbitmq六种工作模式

1.简单模式

RabbitMQ是一个消息中间件,你可以想象它是一个邮局。当你把信件放到邮箱里时,能够确信邮递员会正确地递送你的信件。RabbitMq就是一个邮箱、一个邮局和一个邮递员。

  • 发送消息的程序是生产者
  • 队列就代表一个邮箱。虽然消息会流经RbbitMQ和你的应用程序,但消息只能被存储在队列里。队列存储空间只受服务器内存和磁盘限制,它本质上是一个大的消息缓冲区。多个生产者可以向同一个队列发送消息,多个消费者也可以从同一个队列接收消息.
  • 消费者等待从队列接收消息

简单模式

1).pom.xml

添加 slf4j 依赖, 和 rabbitmq依赖

<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.tedu</groupId>
    <artifactId>rabbitmq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.4.3</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.8.0-alpha2</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.8.0-alpha2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

2).生产者发送消息

public class Test1 {
    public static void main(String[] args) throws Exception {
        //创建连接工厂,并设置连接信息
        ConnectionFactory f = new ConnectionFactory();
        f.setHost("192.168.64.140");
        f.setPort(5672);//可选,5672是默认端口
        f.setUsername("admin");
        f.setPassword("admin");

        /*
         * 与rabbitmq服务器建立连接,
         * rabbitmq服务器端使用的是nio,会复用tcp连接,
         * 并开辟多个信道与客户端通信
         * 以减轻服务器端建立连接的开销
         */
        Connection c = f.newConnection();
        //建立信道
        Channel ch = c.createChannel();

        /*
         * 声明队列,会在rabbitmq中创建一个队列
         * 如果已经创建过该队列,就不能再使用其他参数来创建
         * 
         * 参数含义:
         *   -queue: 队列名称
         *   -durable: 队列持久化,true表示RabbitMQ重启后队列仍存在
         *   -exclusive: 排他,true表示限制仅当前连接可用
         *   -autoDelete: 当最后一个消费者断开后,是否删除队列
         *   -arguments: 其他参数
         */
        ch.queueDeclare("helloworld", false,false,false,null);

        /*
         * 发布消息
         * 这里把消息向默认交换机发送.
         * 默认交换机隐含与所有队列绑定,routing key即为队列名称
         * 
         * 参数含义:
         *     -exchange: 交换机名称,空串表示默认交换机"(AMQP default)",不能用 null 
         *     -routingKey: 对于默认交换机,路由键就是目标队列名称
         *     -props: 其他参数,例如头信息
         *     -body: 消息内容byte[]数组
         */
        ch.basicPublish("", "helloworld", null, "Hello world!".getBytes());

        System.out.println("消息已发送");
        c.close();
    }
}

3).消费者接收队列

public class Test2 {
    public static void main(String[] args) throws Exception {
        //连接工厂
        ConnectionFactory f = new ConnectionFactory();
        f.setHost("192.168.64.140");
        f.setUsername("admin");
        f.setPassword("admin");
        //建立连接
        Connection c = f.newConnection();
        //建立信道
        Channel ch = c.createChannel();
        //声明队列,如果该队列已经创建过,则不会重复创建
        ch.queueDeclare("helloworld",false,false,false,null);
        System.out.println("等待接收数据");
        
        //收到消息后用来处理消息的回调对象
        DeliverCallback callback = new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery message) throws IOException {
                String msg = new String(message.getBody(), "UTF-8");
                System.out.println("收到: "+msg);
            }
        };
        
        //消费者取消时的回调对象
        CancelCallback cancel = new CancelCallback() {
            @Override
            public void handle(String consumerTag) throws IOException {
            }
        };
        
        ch.basicConsume("helloworld", true, callback, cancel);
    }
}

2.工作模式

image
工作队列(即任务队列)背后的主要思想是避免立即执行资源密集型任务,并且必须等待它完成。相反,我们将任务安排在稍后完成。

我们将任务封装为消息并将其发送到队列。后台运行的工作进程将获取任务并最终执行任务。当运行多个消费者时,任务将在它们之间分发。

使用任务队列的一个优点是能够轻松地并行工作。如果我们正在积压工作任务,我们可以添加更多工作进程,这样就可以轻松扩展。

简单来说就是可以简易的实现负载均衡

1).生产者发送消息

这里模拟耗时任务,发送的消息中,每个点使工作进程暂停一秒钟,例如"Hello…"将花费3秒钟来处理

public class Test1 {
    public static void main(String[] args) throws Exception {
        ConnectionFactory f = new ConnectionFactory();
        f.setHost("192.168.64.140");
        f.setPort(5672);
        f.setUsername("admin");
        f.setPassword("admin");
        
        Connection c = f.newConnection();
        Channel ch = c.createChannel();
        //参数:queue,durable,exclusive,autoDelete,arguments
        ch.queueDeclare("helloworld", false,false,false,null);

        while (true) {
            //控制台输入的消息发送到rabbitmq
            System.out.print("输入消息: ");
            String msg = new Scanner(System.in).nextLine();
            //如果输入的是"exit"则结束生产者进程
            if ("exit".equals(msg)) {
                break;
            }
            //参数:exchage,routingKey,props,body
            ch.basicPublish("", "helloworld", null, msg.getBytes());
            System.out.println("消息已发送: "+msg);
        }

        c.close();
    }
}

2).消费者接收消息

public class Consumer {
    public static void main(String[] args) throws Exception {
        //建立连接
         ConnectionFactory f = new ConnectionFactory();
         f.setHost("192.168.64.140");
         f.setPort(5672);
         f.setUsername("admin");
         f.setPassword("admin");
         Connection con = f.newConnection();
         Channel c = con.createChannel();
         //定义队列
         c.queueDeclare("helloworld",
         false, //是否是持久队列
         false, //是否是独占队列(消费者能否共享)
         false, //自动删除
         null); //其他的属性设置map类型
         DeliverCallback deliverCallback = new DeliverCallback() {
                    @Override
         public void handle(String consumerTag, Delivery message) throws IOException {
                        String msg = new String(message.getBody());
         System.out.println("收到:" + msg);
         for (int i = 0; i<msg.length();i++){
                            if(msg.charAt(i)=='.'){
                                try {
                                    Thread.sleep(1000);
         } catch (InterruptedException e) {
                                }
                            }
                        }
                        c.basicAck(message.getEnvelope().getDeliveryTag(), false);
         System.out.println("消息处理结束");
         }
                };
         CancelCallback cancelCallback = new CancelCallback() {
                    @Override
         public void handle(String consumerTag) throws IOException {
                    }
                };
         //合理分发
         c.basicQos(1);
         //消费数据,接收数据
         c.basicConsume("helloworld",
         false, //关闭自动确认ack,改为手动确认
         deliverCallback,
         cancelCallback);
         }
}

3).消息确认

一个消费者接收消息后,在消息没有完全处理完时就挂掉了,那么这时会发生什么呢?

就现在的代码来说,rabbitmq把消息发送给消费者后,会立即删除消息,那么消费者挂掉后,它没来得及处理的消息就会丢失,

为了确保消息不会丢失,rabbitmq支持消息确认(回执)。当一个消息被消费者接收到并且执行完成后,消费者会发送一个ack (acknowledgment) 给rabbitmq服务器, 告诉他我已经执行完成了,你可以把这条消息删除了。

如果一个消费者没有返回消息确认就挂掉了(信道关闭,连接关闭或者TCP链接丢失),rabbitmq就会明白,这个消息没有被处理完成,rebbitmq就会把这条消息重新放入队列,如果在这时有其他的消费者在线,那么rabbitmq就会迅速的把这条消息传递给其他的消费者,这样就确保了没有消息会丢失。

这里不存在消息超时, rabbitmq只在消费者挂掉时重新分派消息, 即使消费者花非常久的时间来处理消息也可以

手动消息确认默认是开启的,前面的例子我们通过autoAck=ture把它关闭了。我们现在要把它设置为autoAck=false,然后工作进程处理完意向任务时,发送一个消息确认(回执)

如上述代码所示.

4).合理地分发

rabbitmq会一次把多个消息分发给消费者, 这样可能造成有的消费者非常繁忙, 而其它消费者空闲. 而rabbitmq对此一无所知, 仍然会均匀的分发消息

我们可以使用 basicQos(1) 方法, 这告诉rabbitmq一次只向消费者发送一条消息, 在返回确认回执前, 不要向消费者发送新消息. 而是把消息发给下一个空闲的消费者

5).消息持久化

当rabbitmq关闭时, 我们队列中的消息仍然会丢失, 除非明确要求它不要丢失数据

要求rabbitmq不丢失数据要做如下两点: 把队列和消息都设置为可持久化(durable)

队列设置为可持久化, 可以在定义队列时指定参数durable为true

由于之前我们已经定义过队列"helloworld"是不可持久化的, 对已存在的队列, rabbitmq不允许对其定义不同的参数, 否则会出错,有两种方式进行修改:1.删除重建队列,2.另起一个名字 所以这里我们定义一个不同名字的队列"task_queue"

//定义队列
c.queueDeclare("task_queue",
 true, //是否是持久队列
 false, //是否是独占队列(消费者能否共享)
 false, //自动删除
 null); //其他的属性设置map类型

生产者和消费者代码都要修改

这样即使rabbitmq重新启动, 队列也不会丢失. 现在我们再设置队列中消息的持久化, 使用MessageProperties.PERSISTENT_TEXT_PLAIN参数

整体代码如下:
1.生产者

public class Producer {
    public static void main(String[] args) throws Exception {
        //建立连接
 ConnectionFactory f = new ConnectionFactory();
 f.setHost("192.168.64.140");
 f.setPort(5672);
 f.setUsername("admin");
 f.setPassword("admin");
 Connection con = f.newConnection();
 Channel c = con.createChannel();
 //定义队列
 c.queueDeclare("task_queue",
 true, //是否是持久队列
 false, //是否是独占队列(消费者能否共享)
 false, //自动删除
 null); //其他的属性设置map类型
 while (true){
            System.out.print("输入消息:");
 String msg = new Scanner(System.in).nextLine();
 //发送消息
 c.basicPublish("",
 "task_queue",
 MessageProperties.PERSISTENT_TEXT_PLAIN, //持久化纯文本消息 //其他消息属性
 msg.getBytes());
 }
    }
}

2.消费者

public class Consumer {
    public static void main(String[] args) throws Exception {
        //建立连接
 ConnectionFactory f = new ConnectionFactory();
 f.setHost("192.168.64.140");
 f.setPort(5672);
 f.setUsername("admin");
 f.setPassword("admin");
 Connection con = f.newConnection();
 Channel c = con.createChannel();
 //定义队列
 c.queueDeclare("task_queue",
 true, //是否是持久队列
 false, //是否是独占队列(消费者能否共享)
 false, //自动删除
 null); //其他的属性设置map类型
 DeliverCallback deliverCallback = new DeliverCallback() {
            @Override
 public void handle(String consumerTag, Delivery message) throws IOException {
                String msg = new String(message.getBody());
 System.out.println("收到:" + msg);
 for (int i = 0; i<msg.length();i++){
                    if(msg.charAt(i)=='.'){
                        try {
                            Thread.sleep(1000);
 } catch (InterruptedException e) {
                        }
                    }
                }
                c.basicAck(message.getEnvelope().getDeliveryTag(), false);
 System.out.println("消息处理结束");
 }
        };
 CancelCallback cancelCallback = new CancelCallback() {
            @Override
 public void handle(String consumerTag) throws IOException {
            }
        };
 c.basicQos(1);
 //消费数据,接收数据
 c.basicConsume("task_queue",
 false, //关闭自动确认ack,改为手动确认
 deliverCallback,
 cancelCallback);
 }
}
查看原文

赞 0 收藏 0 评论 0

认证与成就

  • 获得 11 次点赞
  • 获得 3 枚徽章 获得 0 枚金徽章, 获得 1 枚银徽章, 获得 2 枚铜徽章

擅长技能
编辑

开源项目 & 著作
编辑

(゚∀゚ )
暂时没有

注册于 7月28日
个人主页被 893 人浏览