和耳朵

和耳朵 查看完整档案

上海编辑  |  填写毕业院校  |  填写所在公司/组织填写个人主网站
编辑

公众号:和耳朵 | 🏆争做认真学习冠军

个人动态

和耳朵 赞了文章 · 10月16日

思否有约丨@和耳朵:今天,我辞职了

访谈嘉宾:@和耳朵
访谈编辑:芒果果

“2020 年 10 月 16 日,天气晴,我辞职了。 ”

和耳朵今年 22 岁,是从业两年左右的 Java 开发工程师,他离职的决定并不是一时冲动下做出的,而是经过了理智的思考后决定换个环境刺激一下自己。

这个喜欢《海贼王》的男孩总会通过动漫故事里的路飞的冒险经历获取力量。很多年轻人被安上了“不顾后果”、“随性而为”的标签,其实他们只是更敢直面“后果”二字。

Q:辞职是冲动情绪下的决定么?你对裸辞怎么看?

不是冲动情绪,是理智思考过后的决定,比较想换个环境刺激一下自己。

离职会没有年终奖,从现在到发年终奖还有四个月,如果离职之后四个月的涨薪能把年终的部分都找补回来也不错的。

换言之,放弃年终奖可以让我提前四个月换个环境,我不是说之前的公司有什么不好,而是说四个月的时间其实蛮久的,他足以让人从积累很多的经验。

还有一个原因其实是关于女朋友的,虽然我俩认识不太久,但是我们可以确定彼此的真心,所以决定搬到一起。

裸辞的话,我是不建议的,现在程序员很多都是买方市场,环境上来讲找工作这个时间点还是挺不好的,可能会有很多挫折,甚至心态崩了。

不过你如果年轻的话,身上无房无贷,那有什么所谓呢,想换就换吧,只要想清楚,别后悔。

Q:把自己带入动漫角色的话你觉得自己是谁?希望自己是谁?

看海贼王喜欢路飞,看火影忍者喜欢鸣人。

小时候喜欢酷一点的角色,长大了喜欢乐观一点的角色。

Q:你觉得年轻的程序员在心态和工作上有什么不同?

更加积极乐观上进一点,碰到新技术还是蛮兴奋的,身体也能抗,处于一个人一生的黄金时代,觉得自己有一天迟早会成为大牛。

Q:生活中有什么爱好?

我的爱好是看动漫,最难过的一段时间天天看海贼王路飞的冒险故事,其实从故事那里能获取力量。

爱好文学,以前写过一点短小说,但是没公开过,现在也不写了。当了程序员之后整个圈子都在逼着你学习,让我其他的爱好几乎都被覆盖了,好久没看那些书了,现在信息流几乎是铺天盖地的。

以前喜欢看深度一点的电影,现代发现不带脑子的爆米花电影也很好看,可以让脑袋休息休息。

随性不是任性,虽然有时自己的想法和公司会有分歧,但和耳朵很清楚,项目是公司的,有想法可以充分沟通,但不能越界。他可以在自己的项目里单纯的体现自己的技术栈,也可以在公司的项目里按要求完成任务。

Q:从业至今最满意的开发项目是什么?

最满意的其实是我自己做着玩的电商系统,它没什么用就是体现自己的技术栈,但是却给了我很大的满足感。中间也遇到了很多问题 ,对我自己的成长有很大的帮助。

像公司的项目也挺好 不过它的开发方向和我想的不太一致,所以谈不上最满意。

Q:可以讲一下做电商系统遇到了哪些让自己成长的问题么?是如何解决的?

做系统是一个从0到1的过程,做之前可能对那些自动部署什么感觉很厉害,最后发现原理还是那些原理,自己也能实现。

这个流程踩过的坑都是你对开发这件事增加的经验,从0到1,从设计到开发,从开发到部署,从部署到运维,此中意味,只有本人才能体会到了。

各种问题解决方案网上铺天盖地的,凡是市面上有的,都是有解决方案的,慢慢踩坑吧。

Q:在自己的想法和公司的方向之间该怎么做权衡?

按照公司走,毕竟自己不是产品,在其位谋其政吧,不能越界,有想法可以沟通。

Q:您有什么个人比较特别的工作习惯么?

我喜欢整个背部都是靠在椅子上,这样更舒服,一般不会让背离开椅子靠背。

个人比较喜欢下午工作,感觉下午 2-3 点精神最好,其实很多开发工作都是想的要的写的多,思考是一件很费神的事情,但人在过程中不会感觉特别累,因为这是一种对自己的挑战的吧,完成一个工作的时候就像 LOL赢了之后出现 winter 一样的开心。

Q:你是如何看待国内的 IT 环境的?

国内太卷了,前文提到我准备离职了,当时会在闲暇之余刷一刷BOSS,发现现在外包都要求工作年限了,动不动三年起步。

国内IT其实到处充斥着奋斗口号,狼性文化,让你不得不努力,导致入行门槛越来越高。

这可以叫内卷也可以叫剧场效应,就是说本来大家在那坐的好好的看电影,突然有一人站起来了,后面的人看不见也跟着站起来,最后所有人都站起来看电影。

付出了站着的劳动损耗,观看体验没有上升反而下降。

但是你又不得不这样做,因为猜疑链。

可能可以号召大家都别当奋斗逼,但是我不知道你会不会当奋斗逼,所以我选择先当奋斗逼干掉不奋斗的,所有人都会这样想,最终形成闭环,所有人都得奋斗。

Q:是怎么开始在思否写文章的呢?

很早就知道思否了,但是感觉太高端,后来写了一些文章发到了思否,还是有人看的,让我觉得很惊喜。思否最好的一点就是能吸收我们这些作者的反馈。

Q:给编程初学者和怀抱梦想的年轻人提点建议吧。

编程上手容易,入行极难,小伙伴们要小心了。

三分思考七分练,只有理念没有落地几乎等于没有,作品是检验一个程序员的唯一标准。


小编有话说:

很多年轻人觉得裸辞很酷,甚至有段子说辞职的原因是“老板不听我的”。其实不管是物质上没有得到满足还是精神上无法承受压力,选择离开都没有错,只要能对自己的决定负责就好了。和耳朵就是想清楚了的那一类人,他对自己有清晰的认识,“还年轻、无房无贷”。

无论被迫还是自愿,我们每个人都在不停的向前奔跑,也许现在你落在很多人身后,但还没到终点一切还有可能。

“张华考进了北京大学,李萍进了中等技术学校,我在百货公司当售货员,我们都有着光明的未来。”

segmentfault 公众号

查看原文

赞 3 收藏 0 评论 3

和耳朵 发布了文章 · 9月2日

RabbitMQ高级之消息限流与延时队列

人生终将是场单人旅途,孤独之前是迷茫,孤独过后是成长。

楔子

本篇是消息队列RabbitMQ的第五弹。

上篇本来打算讲述RabbitMQ的一些高级用法:

  • 如何保证消息的可靠性?
  • 消息队列如何进行限流?
  • 如何设置延时队列进行延时消费?

最终因为篇幅缘故,上篇只讲了如何保证消息的可靠性?,本篇将会把剩下两个讲完,本篇也可能是RabbitMQ系列的最后一篇了~

我所讲的知识点在工作中基本上也够用了,希望大家好好消化。

旧坑填上之后可能会慢慢开新坑了,同时因为现在到九月中旬这段时间我有一场考试需要筹备,所以文章更新可能会比较慢,但是一周一更算是最低限度把,希望大家多多担待。

---

祝有好收获,先赞后看,快乐无限。

本文代码:码云地址GitHub地址

1. 🔍消息队列如何限流?

消息队列限流是指在服务器面临巨额流量时,为了进行自保,进行的一种救急措施。

因为巨大的流量代表着非常多的消息,这些消息如果多到服务器处理不过来就会造成服务器瘫痪,影响用户体验,造成不良影响。

所以要进行一次降级操作,把处理不了的流量隔绝在系统之外,避免它们打垮系统。

基本上任何一个消息队列都有限流的功能,今天我们就来看看在RabbitMQ之中进行限流具体应该怎么做?

RabbitMQ提供了一种QOS(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息还未被消费确认,则不进行新消息的消费。


spring:
  rabbitmq:
    addresses: 127.0.0.1
    host: 5672
    username: guest
    password: guest
    virtual-host: /
    # 手动确认消息
    listener:
      simple:
          acknowledge-mode: manual
          prefetch: 2

我们只需要配置一下rabbitmq.listener.simple下的prefetch属性即可,为了演示方便我这里配置为两条,语义即为:如果队列中有两条以上未签收的消息,则不进行新的消息消费。

我往我的队列中发送三条信息,并不进行签收,来看看效果:

消息限流演示01

发送完显示我们系统中有三条Ready消息,这代表这三条消息还在队列中没有消费端去消费。

这时我打开消费端进行消费但依旧不进行签收,接着来看效果:

消息限流演示02

unacked=2,ready=1,这就代表有两条消息在服务端消费了但是没有签收,还有一条消息还在队列中没有往服务端推送,因为我们设置了prefetch=2,所以现在队列的最大同时在消费的消息数量为2,通过此种方式,我们就完成了消费限流。

Tip : 这种方式下消息一定要进行手动签收,因为之前的文章中我们讲过,自动签收是消息一达到消费端就进行签收了,可能我们的业务逻辑还没运行就已经进行签收了,所以自动签收状态下开启限流几乎没有作用。

2. 📑RabbitMQ控制台

上一节我的截图中,大家可以发现居然出现了可视化的界面,以往在我的截图中一般都是DOS命令操作台界面,但其实RabbitMQ是自带了可视化界面的插件的,我们只需要开启即可。

在我们的RabbitMQ中输入如下命令:rabbitmq-plugins.bat enable rabbitmq_management

就可以开启可视化页面了,紧接着访问:http://localhost:15672/

可视化页面01

默认用户名和密码都是 guest,直接登录即可。

可视化页面02

很方便的控制台,大家可以自己试一下~

3. 📔TTL消息/队列

TTL是Time To Live的缩写,也就是生存时间的意思,RabbitMQ支持消息的过期时间,在消息发送时可以进行指定,也支持队列的过期时间,从消息入队列开始计算,只要超过了队列的超时时间配置,那么消息会自动的清除。

设置队列的话就是整个队列的消息到时都会过期,设置消息的话就是单条消息到时自动过期。

    // TTL队列示例
    @Bean
    public Queue ttlQueue() {
        Map<String, Object> arguments = new HashMap<>();
        // 设置3s过期
        arguments.put("x-message-ttl",3000);
        return new Queue("topicQueue1",false,false,false, arguments);
    }

上面的代码就是演示如何创建一个TTL队列,需要放入参数才行,队列构造中的其他参数我为了方便直接填了false。

    public void sendTtl() {
        String message = "Hello 我是作者和耳朵,欢迎关注我。" + LocalDateTime.now().toString();

        System.out.println("Message content : " + message);

        // 设置过期3s
        MessageProperties props = MessagePropertiesBuilder.newInstance()
                .setExpiration("3000").build();

        rabbitTemplate.send(Producer.QUEUE_NAME,new Message(message.getBytes(StandardCharsets.UTF_8),props));
        System.out.println("消息发送完毕。");
    }

设置消息的TTL也是设置参数即可。

以上就是RabbitMQ中关于TTL的知识点。

4. 📌DLX死信队列

DLX死信队列虽然叫队列,但其实指的是Exchange,或者说指的Exchange和它所属的Queue,他俩一块构成了死信队列。

当一条消息:

  • 消费被拒绝(basic.reject/basic.nack)并且requeue=false
  • TTL过期
  • 要进入的队列达到最大长度

这三种情况,就可以判定一条消息死了,这种消息如果我们没有做处理,它就会被自动删除。

但其实我们可以在队列上加上一个参数,使当队列中发现了死亡的消息之后会将它自动转发到某个Exchange,由指定的Exchange来处理这些死亡的消息。

这个处理死亡消息的Exchange和之前我们讲述的Exchange没什么区别,依然可以绑定队列然后进行消息消费。

    // DLX队列示例
    @Bean
    public Queue dlxQueue() {
        Map<String, Object> arguments = new HashMap<>();
        // 指定消息死亡后发送到ExchangeName="dlx.exchange"的交换机去
        arguments.put("x-dead-letter-exchange","dlx.exchange");
        return new Queue("topicQueue1", false, false, false, arguments);
    }

如上代码,就是设置了一个队列中的消息死亡后的去处,就等于消息死亡后给它不把它删掉而是做一次转发,发到其他Exchange去。

那这样搞有什么用呢?这就取决于业务需求了,不过下一节会用到它,接着往下看~

5. 💡延时队列

RabbitMQ的基因中没有延时队列这回事,它不能直接指定一个队列类型为延时队列,然后去延时处理,但是经过上面两节的铺垫,我们可以将TTL+DLX相结合,这就能组成一个延时队列。

设想一个场景,下完订单之后15分钟未付款我们就要将订单关闭,这就是一个很经典的演示消费的场景,如果拿RabbitMQ来做,我们就需要结合TTL+DLX了。

先把订单消息设置好15分钟过期时间,然后过期后队列将消息转发给我们设置好的DLX-ExchangeDLX-Exchange再将分发给它绑定的队列,我们的消费者再消费这个队列中的消息,就做到了延时十五分钟消费。

真是super~~~简单呢

后记

收尾了收尾了,RabbitMQ结束了,虽然有些东西没有讲比如:镜像队列,因为我没用过而且一般轮不到自己来做这个,所以就懒了一下就不写这个了,RabbitMQ毕竟不是一个天生的分布式消息队列,弄镜像什么的还是有点麻烦的。

陆陆续续似乎写了快一个月呢,东西有点多也有些繁杂,要不下期写一篇文章专门回顾一下,再画个思维导图什么的,给大家梳理一下,再抽几个小册六折码

最后再给优狐打个广告,最近掘金在GitHub上面建立了一个开源计划 - open-source,旨在收录各种好玩的好用的开源库,如果大家有想要自荐或者分享的开源库都可以参与进去,为这个开源计划做一份贡献,同时这个开源库的Start也在稳步增长中,参与进去也可以增加自己项目的曝光度,一举两得。

同时这个开源库还有一个兄弟项目 - open-source-translation,旨在招募技术文章翻译志愿者进行技术文章的翻译工作,
争做最棒开源翻译,翻译业界高质量文稿,为技术人的成长献一份力。


最近这段时间事情挺多,优狐令我八月底之前升级到三级,所以各位读者的赞对我很重要,希望大家能够高抬贵手,帮我一哈~

好了,以上就是本期的全部内容,感谢你能看到这里,欢迎对本文点赞收藏与评论,👍你们的每个点赞都是我创作的最大动力。

我是耳朵,一个一直想做知识输出的伪文艺程序员,我们下期见。

本文代码:码云地址GitHub地址

查看原文

赞 20 收藏 15 评论 1

和耳朵 发布了文章 · 8月24日

RabbitMQ高级之如何保证消息可靠性?

人生终将是场单人旅途,孤独之前是迷茫,孤独过后是成长。

楔子

本篇是消息队列RabbitMQ的第四弹。

RabbitMQ我已经写了三篇了,基础的收发消息和基础的概念我都已经写了,学任何东西都是这样,先基础的上手能用,然后遇到问题再去解决,无法理解就去深入源码,随着时间的积累对这一门技术的理解也会随之提高。

基础操作已经熟练后,相信大家不可避免的会生出向那更高处攀登的心来,今天我就罗列一些RabbitMQ比较高级的用法,有些用得到有些用不上,但是一定要有所了解,因为大部分情况我们都是面向面试学习~

  • 如何保证消息的可靠性?
  • 消息队列如何进行限流?
  • 如何设置延时队列进行延时消费?


祝有好收获,先赞后看,快乐无限。

本文代码:码云地址GitHub地址

1. 📖如何保证消息的可靠性?

rabbit架构图

先来看看我们的万年老图,从图上我们大概可以看出来一个消息会经历四个节点,只有保证这四个节点的可靠性才能保证整个系统的可靠性。

  • 生产者发出后保证到达了MQ。
  • MQ收到消息保证分发到了消息对应的Exchange。
  • Exchange分发消息入队之后保证消息的持久性。
  • 消费者收到消息之后保证消息的正确消费。

经历了这四个保证,我们才能保证消息的可靠性,从而保证消息不会丢失。

2. 🔍生产者发送消息到MQ失败

我们的生产者发送消息之后可能由于网络闪断等各种原因导致我们的消息并没有发送到MQ之中,但是这个时候我们生产端又不知道我们的消息没有发出去,这就会造成消息的丢失。

为了解决这个问题,RabbitMQ引入了事务机制发送方确认机制(publisher confirm),由于事务机制过于耗费性能所以一般不用,这里我着重讲述发送方确认机制

这个机制很好理解,就是消息发送到MQ那端之后,MQ会回一个确认收到的消息给我们


打开此功能需要配置,接下来我来演示一下配置:

spring:
  rabbitmq:
    addresses: 127.0.0.1
    host: 5672
    username: guest
    password: guest
    virtual-host: /
    # 打开消息确认机制
    publisher-confirm-type: correlated

我们只需要在配置里面打开消息确认即可(true是返回客户端,false是自动删除)。

生产者:

    public void sendAndConfirm() {
        User user = new User();

        log.info("Message content : " + user);

        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(Producer.QUEUE_NAME,user,correlationData);
        log.info("消息发送完毕。");

        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback(){
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("CorrelationData content : " + correlationData);
                log.info("Ack status : " + ack);
                log.info("Cause content : " + cause);
                if(ack){
                    log.info("消息成功发送,订单入库,更改订单状态");
                }else{
                    log.info("消息发送失败:"+correlationData+", 出现异常:"+cause);
                }
            }
        });
    }

生产者代码里我们看到又多了一个参数:CorrelationData,这个参数是用来做消息的唯一标识,同时我们打开消息确认之后需要对rabbitTemplate多设置一个setConfirmCallback,参数是一个匿名类,我们消息确认成功or失败之后的处理就是写在这个匿名类里面。

比如一条订单消息,当消息确认到达MQ确认之后再行入库或者修改订单的节点状态,如果消息没有成功到达MQ可以进行一次记录或者将订单状态修改。

Tip:消息确认失败不只有消息没发过去会触发,消息发过去但是找不到对应的Exchange,也会触发。

3. 📔MQ接收失败或者路由失败

生产者的发送消息处理好了之后,我们就可以来看看MQ端的处理,MQ可能出现两个问题:

  1. 消息找不到对应的Exchange。
  2. 找到了Exchange但是找不到对应的Queue。

这两种情况都可以用RabbitMQ提供的mandatory参数来解决,它会设置消息投递失败的策略,有两种策略:自动删除或返回到客户端。

我们既然要做可靠性,当然是设置为返回到客户端。


配置:

spring:
  rabbitmq:
    addresses: 127.0.0.1
    host: 5672
    username: guest
    password: guest
    virtual-host: /
    # 打开消息确认机制
    publisher-confirm-type: correlated
    # 打开消息返回
    publisher-returns: true
    template:
      mandatory: true

我们只需要在配置里面打开消息返回即可,template.mandatory: true这一步不要少~

生产者:

    public void sendAndReturn() {
        User user = new User();

        log.info("Message content : " + user);

        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.info("被退回的消息为:{}", message);
            log.info("replyCode:{}", replyCode);
            log.info("replyText:{}", replyText);
            log.info("exchange:{}", exchange);
            log.info("routingKey:{}", routingKey);
        });

        rabbitTemplate.convertAndSend("fail",user);
        log.info("消息发送完毕。");
    }

这里我们可以拿到被退回消息的所有信息,然后再进行处理,比如放到一个新的队列单独处理,路由失败一般都是配置问题了。

4. 📑消息入队之后MQ宕机

到这一步基本都是一些很小概率的问题了,比如MQ突然宕机了或者被关闭了,这种问题就必须要对消息做持久化,以便MQ重新启动之后消息还能重新恢复过来。

消息的持久化要做,但是不能只做消息的持久化,还要做队列的持久化和Exchange的持久化。

    @Bean
    public DirectExchange directExchange() {
        // 三个构造参数:name durable autoDelete
        return new DirectExchange("directExchange", false, false);
    }

    @Bean
    public Queue erduo() {
        // 其三个参数:durable exclusive autoDelete
        // 一般只设置一下持久化即可
        return new Queue("erduo",true);
    }

创建Exchange和队列时只要设置好持久化,发送的消息默认就是持久化消息。

设置持久化时一定要将Exchange和队列都设置上持久化:

单单只设置Exchange持久化,重启之后队列会丢失。单单只设置队列的持久化,重启之后Exchange会消失,既而消息也丢失,所以如果不两个一块设置持久化将毫无意义。

Tip: 这些都是MQ宕机引起的问题,如果出现服务器宕机或者磁盘损坏则上面的手段统统无效,必须引入镜像队列,做异地多活来抵御这种不可抗因素。

5. 📌消费者无法正常消费

最后一步会出问题的地方就在消费者端了,不过这个解决问题的方法我们之前的文章已经说过了,就是消费者的消息确认。

spring:
  rabbitmq:
    addresses: 127.0.0.1
    host: 5672
    username: guest
    password: guest
    virtual-host: /
    # 手动确认消息
    listener:
      simple:
          acknowledge-mode: manual

打开手动消息确认之后,只要我们这条消息没有成功消费,无论中间是出现消费者宕机还是代码异常,只要连接断开之后这条信息还没有被消费那么这条消息就会被重新放入队列再次被消费。

当然这也可能会出现重复消费的情况,不过在分布式系统中幂等性是一定要做的,所以一般重复消费都会被接口的幂等给拦掉。

所谓幂等性就是:一个操作多次执行产生的结果与一次执行产生的结果一致。

幂等性相关内容不在本章讨论范围~所以我就不多做阐述了。

6. 💡消息可靠性案例

消息可靠性架构

这个图是我很早之前画的,是为了记录当时使用RabbitMQ做消息可靠性的具体做法,这里我正好拿出来做个例子给大家看一看。

这个例子中的消息是先入库的,然后生产者从DB里面拿到数据包装成消息发给MQ,经过消费者消费之后对DB数据的状态进行更改,然后重新入库。

这中间有任何步骤失败,数据的状态都是没有更新的,这时通过一个定时任务不停的去刷库,找到有问题的数据将它重新扔到生产者那里进行重新投递。

这个方案其实和网上的很多方案大同小异,基础的可靠性保证之后,定时任务做一个兜底进行不断的扫描,力图100%可靠性。

后记

越写越长,因为篇幅缘故限流和延时队列放到下一篇了,我会尽快发出来供大家阅读,讲真,我真的不是故意多水一篇的!!!

最后再给优狐打个广告,最近掘金在GitHub上面建立了一个开源计划 - open-source,旨在收录各种好玩的好用的开源库,如果大家有想要自荐或者分享的开源库都可以参与进去,为这个开源计划做一份贡献,同时这个开源库的Start也在稳步增长中,参与进去也可以增加自己项目的曝光度,一举两得。

同时这个开源库还有一个兄弟项目 - open-source-translation,旨在招募技术文章翻译志愿者进行技术文章的翻译工作,
争做最棒开源翻译,翻译业界高质量文稿,为技术人的成长献一份力。


最近这段时间事情挺多,优狐令我八月底之前升级到三级,所以各位读者的赞对我很重要,希望大家能够高抬贵手,帮我一哈~

好了,以上就是本期的全部内容,感谢你能看到这里,欢迎对本文点赞收藏与评论,👍你们的每个点赞都是我创作的最大动力。

我是耳朵,一个一直想做知识输出的伪文艺程序员,我们下期见。

本文代码:码云地址GitHub地址

查看原文

赞 20 收藏 17 评论 2

和耳朵 发布了文章 · 8月19日

上手了RabbitMQ?再来看看它的交换机(Exchange)吧

人生终将是场单人旅途,孤独之前是迷茫,孤独过后是成长。

楔子

本篇是消息队列RabbitMQ的第三弹。

RabbitMQ的入门RabbitMQ+SpringBoot的整合可以点此链接进去回顾,今天要讲的是RabbitMQ的交换机。

本篇是理解RabbitMQ很重要的一篇,交换机是消息的第一站,只有理解了交换机的分发模式,我们才能知道不同交换机根据什么规则分发消息,才能明白在面对不同业务需求的时候应采用哪种交换机。


祝有好收获,先赞后看,快乐无限。

本文代码:码云地址GitHub地址

1. 🔍Exchange

rabbit架构图

先来放上几乎每篇都要出现一遍的我画了好久的RabbitMQ架构图。

前两篇文中我们一直没有显式的去使用Exchange,都是使用的默认Exchange,其实Exchange是一个非常关键的组件,有了它才有了各种消息分发模式。

我先简单说说Exchange有哪几种类型:

  1. fanoutFanout-Exchange会将它接收到的消息发往所有与他绑定的Queue中。
  2. directDirect-Exchange会把它接收到的消息发往与它有绑定关系且Routingkey完全匹配的Queue中(默认)。
  3. topicTopic-Exchange与Direct-Exchange相似,不过Topic-Exchange不需要全匹配,可以部分匹配,它约定:Routingkey为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词)。
  4. headerHeader-Exchange不依赖于RoutingKey或绑定关系来分发消息,而是根据发送的消息内容中的headers属性进行匹配。此模式已经不再使用,本文中也不会去讲,大家知道即可。

本文中我们主要讲前三种Exchange方式,相信凭借着我简练的文字和灵魂的画技给大家好好讲讲,争取老妪能解。

Tip:本文的代码演示直接使用SpringBoot+RabbitMQ的模式。

2. 📕Fanout-Exchange

先来看看Fanout-ExchangeFanout-Exchange又称扇形交换机,这个交换机应该是最容易理解的。

扇形交换机

ExchangeQueue建立一个绑定关系,Exchange会分发给所有和它有绑定关系的Queue中,绑定了十个Queue就把消息复制十份进行分发。

这种绑定关系为了效率肯定都会维护一张表,从算法效率上来说一般是O(1),所以Fanout-Exchange是这几个交换机中查找需要被分发队列最快的交换机。


下面是一段代码演示:

    @Bean
    public Queue fanout1() {
        return new Queue("fanout1");
    }

    @Bean
    public Queue fanout2() {
        return new Queue("fanout2");
    }

    @Bean
    public FanoutExchange fanoutExchange() {
        // 三个构造参数:name durable autoDelete
        return new FanoutExchange("fanoutExchange", false, false);
    }

    @Bean
    public Binding binding1() {
        return BindingBuilder.bind(fanout1()).to(fanoutExchange());
    }

    @Bean
    public Binding binding2() {
        return BindingBuilder.bind(fanout2()).to(fanoutExchange());
    }

为了清晰明了,我新建了两个演示用的队列,然后建了一个FanoutExchange,最后给他们都设置上绑定关系,这样一组队列和交换机的绑定设置就算完成了。

紧接着编写一下生产者和消费者:

    public void sendFanout() {
        Client client = new Client();

        // 应读者要求,以后代码打印的地方都会改成log方式,这是一种良好的编程习惯,用System.out.println一般是不推荐的。
        log.info("Message content : " + client);

        rabbitTemplate.convertAndSend("fanoutExchange",null,client);
        System.out.println("消息发送完毕。");
    }

    @Test
    public void sendFanoutMessage() {
        rabbitProduce.sendFanout();
    }
@Slf4j
@Component("rabbitFanoutConsumer")
public class RabbitFanoutConsumer {
    @RabbitListener(queues = "fanout1")
    public void onMessage1(Message message, Channel channel) throws Exception {
        log.info("Message content : " + message);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        log.info("消息已确认");
    }

    @RabbitListener(queues = "fanout2")
    public void onMessage2(Message message, Channel channel) throws Exception {
        log.info("Message content : " + message);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        log.info("消息已确认");
    }

}

这两段代码都很好理解,不再赘述,有遗忘的可以去看RabbitMQ第一弹的内容。

其中发送消息的代码有三个参数,第一个参数是Exchange的名称,第二个参数是routingKey的名称,这个参数在扇形交换机里面用不到,在其他两个交换机类型里面会用到。

代码的准备到此结束,我们可以运行发送方法之后run一下了~

项目启动后,我们可以先来观察一下队列与交换机的绑定关系有没有生效,我们在RabbitMQ控制台使用rabbitmqctl list_bindings命令查看绑定关系。

扇形交换机绑定关系

关键部分我用红框标记了起来,这就代表着名叫fanoutExchange的交换机绑定着两个队列,一个叫fanout1,另一个叫fanout2

紧接着,我们来看控制台的打印情况:

扇形交换机确认消息

可以看到,一条信息发送出去之后,两个队列都接收到了这条消息,紧接着由我们的两个消费者消费。

Tip: 如果你的演示应用启动之后没有消费信息,可以尝试重新运行一次生产者的方法发送消息。

3. 📗Direct-Exchange

Direct-Exchange是一种精准匹配的交换机,我们之前一直使用默认的交换机,其实默认的交换机就是Direct类型。

如果将Direct交换机都比作一所公寓的管理员,那么队列就是里面的住户。(绑定关系)

管理员每天都会收到各种各样的信件(消息),这些信件的地址不光要标明地址(ExchangeKey)还需要标明要送往哪一户(routingKey),不然消息无法投递。

扇形交换机

以上图为例,准备一条消息发往名为SendService的直接交换机中去,这个交换机主要是用来做发送服务,所以其绑定了两个队列,SMS队列和MAIL队列,用于发送短信和邮件。

我们的消息除了指定ExchangeKey还需要指定routingKeyroutingKey对应着最终要发送的是哪个队列,我们的示例中的routingKey是sms,这里这条消息就会交给SMS队列。


听了上面这段,可能大家对routingKey还不是很理解,我们上段代码实践一下,大家应该就明白了。

准备工作:

    @Bean
    public Queue directQueue1() {
        return new Queue("directQueue1");
    }

    @Bean
    public Queue directQueue2() {
        return new Queue("directQueue2");
    }

    @Bean
    public DirectExchange directExchange() {
        // 三个构造参数:name durable autoDelete
        return new DirectExchange("directExchange", false, false);
    }

    @Bean
    public Binding directBinding1() {
        return BindingBuilder.bind(directQueue1()).to(directExchange()).with("sms");
    }

    @Bean
    public Binding directBinding2() {
        return BindingBuilder.bind(directQueue2()).to(directExchange()).with("mail");
    }

新建两个队列,新建了一个直接交换机,并设置了绑定关系。

这里的示例代码和上面扇形交换机的代码很像,唯一可以说不同的就是绑定的时候多调用了一个withroutingKey设置了上去。

所以是交换机和队列建立绑定关系的时候设置的routingKey,一个消息到达交换机之后,交换机通过消息上带来的routingKey找到自己与队列建立绑定关系时设置的routingKey,然后将消息分发到这个队列去。

生产者:

    public void sendDirect() {
        Client client = new Client();

        log.info("Message content : " + client);

        rabbitTemplate.convertAndSend("directExchange","sms",client);
        System.out.println("消息发送完毕。");
    }

消费者:

@Slf4j
@Component("rabbitDirectConsumer")
public class RabbitDirectConsumer {
    @RabbitListener(queues = "directQueue1")
    public void onMessage1(Message message, Channel channel) throws Exception {
        log.info("Message content : " + message);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        log.info("消息已确认");
    }

    @RabbitListener(queues = "directQueue2")
    public void onMessage2(Message message, Channel channel) throws Exception {
        log.info("Message content : " + message);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        log.info("消息已确认");
    }

}

效果图如下:

扇形交换机

只有一个消费者进行了消息,符合我们的预期。

4. 📙Topic-Exchange

Topic-Exchange是直接交换机的模糊匹配版本,Topic类型的交换器,支持使用"*"和"#"通配符定义模糊bindingKey,然后按照routingKey进行模糊匹配队列进行分发。

  • *:能够模糊匹配一个单词。
  • #:能够模糊匹配零个或多个单词。

因为加入了两个通配定义符,所以Topic交换机的routingKey也有些变化,routingKey可以使用.将单词分开。


这里我们直接来用一个例子说明会更加的清晰:

准备工作:

    // 主题交换机示例
    @Bean
    public Queue topicQueue1() {
        return new Queue("topicQueue1");
    }

    @Bean
    public Queue topicQueue2() {
        return new Queue("topicQueue2");
    }

    @Bean
    public TopicExchange topicExchange() {
        // 三个构造参数:name durable autoDelete
        return new TopicExchange("topicExchange", false, false);
    }

    @Bean
    public Binding topicBinding1() {
        return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("sms.*");
    }

    @Bean
    public Binding topicBinding2() {
        return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("mail.#");
    }

新建两个队列,新建了一个Topic交换机,并设置了绑定关系。

这里的示例代码我们主要看设置routingKey,这里的routingKey用上了通配符,且中间用.隔开,这就代表topicQueue1消费sms开头的消息,topicQueue2消费mail开头的消息,具体不同往下看。

生产者:

    public void sendTopic() {
        Client client = new Client();

        log.info("Message content : " + client);

        rabbitTemplate.convertAndSend("topicExchange","sms.liantong",client);
        System.out.println("消息发送完毕。");
    }

消费者:

@Slf4j
@Component("rabbitTopicConsumer")
public class RabbitTopicConsumer {
    @RabbitListener(queues = "topicQueue1")
    public void onMessage1(Message message, Channel channel) throws Exception {
        log.info("Message content : " + message);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        log.info("消息已确认");
    }

    @RabbitListener(queues = "topicQueue2")
    public void onMessage2(Message message, Channel channel) throws Exception {
        log.info("Message content : " + message);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        log.info("消息已确认");
    }

}

这里我们的生产者发送的消息routingKeysms.liantong,它就会被发到topicQueue1队列中去,这里消息的routingKey也需要用.隔离开,用其他符号无法正确识别。

如果我们的routingKeysms.123.liantong,那么它将无法找到对应的队列,因为topicQueue1的模糊匹配用的通配符是*而不是#,只有#是可以匹配多个单词的。

Topic-ExchangeDirect-Exchange很相似,我就不再赘述了,通配符*#的区别也很简单,大家可以自己试一下。

后记

周一没更文实在惭愧,去医院抽血了,抽了三管~,吃多少才能补回来~

RabbitMQ已经更新了三篇了,这三篇的内容有些偏基础,下一篇将会更新高级部分内容:包括防止消息丢失,防止消息重复消费等等内容,希望大家持续关注。


最近这段时间压力挺大,优狐令我八月底之前升级到三级,所以各位读者的赞对我很重要,希望大家能够高抬贵手,帮我一哈~

好了,以上就是本期的全部内容,感谢你能看到这里,欢迎对本文点赞收藏与评论,👍你们的每个点赞都是我创作的最大动力。

我是耳朵,一个一直想做知识输出的伪文艺程序员,我们下期见。

本文代码:码云地址GitHub地址

查看原文

赞 18 收藏 11 评论 0

和耳朵 回答了问题 · 8月18日

Springboot集成swagger2 打开swagger-ui.html页面出现弹窗

去掉 @ComponentScan({"com.example"}),是不行的,这会导致无法扫描,这个问题我没遇见过但是通过提示应该是因为你的安全框架给的提示把,你是使用了SpringCloud吗?

如果使用了安全框架或者网关,可以将此URL加入白名单。

关注 2 回答 1

和耳朵 回答了问题 · 8月18日

解决springboot中,如何将自己封装的返回格式代码段的new改为注入的形式?

楼上已经说的很清楚了,问题是因为没有写泛型,楼上给的方案也很好。

关注 6 回答 5

和耳朵 回答了问题 · 8月18日

请求获取网页的response,获取网页的html 怎么那么慢

这个地方不转UTF8不行吗?默认不是UTF8吗

关注 3 回答 2

和耳朵 发布了文章 · 8月10日

刚体验完RabbitMQ?一文带你SpringBoot+RabbitMQ方式收发消息

人生终将是场单人旅途,孤独之前是迷茫,孤独过后是成长。

楔子

这篇是消息队列RabbitMQ的第二弹。

上一篇的结尾我也预告了本篇的内容:利用RabbitTemplate和注解进行收发消息,还有一个我临时加上的内容:消息的序列化转换。

本篇会和SpringBoot做整合,采用自动配置的方式进行开发,我们只需要声明RabbitMQ地址就可以了,关于各种创建连接关闭连接的事都由Spring帮我们了~

交给Spring帮我们管理连接可以让我们专注于业务逻辑,就像声明式事务一样易用,方便又高效。


祝有好收获,先赞后看,快乐无限。

本文代码:码云地址GitHub地址

Tip:上一篇的代码都放在prototype包下,本篇的代码都放在auto包下面。

1. 🔍环境配置

第一节我们先来搞一下环境的配置,上一篇中我们已经引入了自动配置的包,我们既然使用了自动配置的方式,那RabbitMQ的连接信息我们直接放在配置文件中就行了,就像我们需要用到JDBC连接的时候去配置一下DataSource一样。

rabbitmq-yml配置

如图所示,我们只需要指明一下连接的IP+端口号和用户名密码就行了,这里我用的是默认的用户名与密码,不写的话默认也都是guest,端口号也是默认5672。

主要我们需要看一下手动确认消息的配置,需要配置成manual才是手动确认,日后还会有其他的配置项,眼下我们配置这一个就可以了。


接下来我们要配置一个Queue,上一篇中我们往一个名叫erduo的队列中发送消息,当时是我们手动定义的此队列,这里我们也需要手动配置,声明一个Bean就可以了。

@Configuration
public class RabbitmqConfig {
    @Bean
    public Queue erduo() {
        // 其三个参数:durable exclusive autoDelete
        // 一般只设置一下持久化即可
        return new Queue("erduo",true);
    }

}

就这么简单声明一下就可以了,当然了RabbitMQ毕竟是一个独立的组件,如果你在RabbitMQ中通过其他方式已经创建过一个名叫erduo的队列了,你这里也可以不声明,这里起到的一个效果就是如果你没有这个队列,会按照你声明的方式帮你创建这个队列。

配置完环境之后,我们就可以以SpringBoot的方式来编写生产者和消费者了。

2. 📕生产者与RabbitTemplate

和上一篇的节奏一样,我们先来编写生产者,不过这次我要引入一个新的工具:RabbitTemplate

听它的这个名字就知道,又是一个拿来即用的工具类,Spring家族这点就很舒服,什么东西都给你封装一遍,让你用起来更方便更顺手。

RabbitTemplate实现了标准AmqpTemplate接口,功能大致可以分为发送消息和接受消息。

我们这里是在生产者中来用,主要就是使用它的发送消息功能:sendconvertAndSend方法。

// 发送消息到默认的Exchange,使用默认的routing key
void send(Message message) throws AmqpException;

// 使用指定的routing key发送消息到默认的exchange
void send(String routingKey, Message message) throws AmqpException;

// 使用指定的routing key发送消息到指定的exchange
void send(String exchange, String routingKey, Message message) throws AmqpException;

send方法是发送byte数组的数据的模式,这里代表消息内容的对象是Message对象,它的构造方法就是传入byte数组数据,所以我们需要把我们的数据转成byte数组然后构造成一个Message对象再进行发送。

// Object类型,可以传入POJO
void convertAndSend(Object message) throws AmqpException;

void convertAndSend(String routingKey, Object message) throws AmqpException;

void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;

convertAndSend方法是可以传入POJO对象作为参数,底层是有一个MessageConverter帮我们自动将数据转换成byte类型或String或序列化类型。

所以这里支持的传入对象也只有三种:byte类型,String类型和实现了Serializable接口的POJO。


介绍完了,我们可以看一下代码:

@Slf4j
@Component("rabbitProduce")
public class RabbitProduce {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send() {
        String message = "Hello 我是作者和耳朵,欢迎关注我。" + LocalDateTime.now().toString();

        System.out.println("Message content : " + message);

        // 指定消息类型
        MessageProperties props = MessagePropertiesBuilder.newInstance()
                .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN).build();

        rabbitTemplate.send(Producer.QUEUE_NAME,new Message(message.getBytes(StandardCharsets.UTF_8),props));
        System.out.println("消息发送完毕。");
    }

    public void convertAndSend() {
        User user = new User();

        System.out.println("Message content : " + user);

        rabbitTemplate.convertAndSend(Producer.QUEUE_NAME,user);
        System.out.println("消息发送完毕。");
    }

}

这里我特意写明了两个例子,一个用来测试send,另一个用来测试convertAndSend。

send方法里我们看下来和之前的代码是几乎一样的,定义一个消息,然后直接send,但是这个构造消息的构造方法可能比我们想的要多一个参数,我们原来说的只要把数据转成二进制数组放进去即可,现在看来还要多放一个参数了。

MessageProperties,是的我们需要多放一个MessageProperties对象,从他的名字我们也可以看出它的功能就是附带一些参数,但是某些参数是少不了的,不带不行。

比如我的代码这里就是设置了一下消息的类型,消息的类型有很多种可以是二进制类型,文本类型,或者序列化类型,JSON类型,我这里设置的就是文本类型,指定类型是必须的,也可以为我们拿到消息之后要将消息转换成什么样的对象提供一个参考。

convertAndSend方法就要简单太多,这里我放了一个User对象拿来测试用,直接指定队列然后放入这个对象即可。

Tips:User必须实现Serializable接口,不然的话调用此方法的时候会抛出IllegalArgumentException异常。


代码完成之后我们就可以调用了,这里我写一个测试类进行调用:

@SpringBootTest
public class RabbitProduceTest {
    @Autowired
    private RabbitProduce rabbitProduce;

    @Test
    public void sendSimpleMessage() {
        rabbitProduce.send();
        rabbitProduce.convertAndSend();
    }
}

效果如下图~

生产者测试

同时在控制台使用命令rabbitmqctl.bat list_queues查看队列-erduo现在的情况:

查看队列情况

如此一来,我们的生产者测试就算完成了,现在消息队列里两条消息了,而且消息类型肯定不一样,一个是我们设置的文本类型,一个是自动设置的序列化类型。

3. 📗消费者与RabbitListener

既然队列里面已经有消息了,接下来我们就要看我们该如何通过新的方式拿到消息并消费与确认了。

消费者这里我们要用到@RabbitListener来帮我们拿到指定队列消息,它的用法很简单也很复杂,我们可以先来说简单的方式,直接放到方法上,指定监听的队列就行了。

@Slf4j
@Component("rabbitConsumer")
public class RabbitConsumer {

    @RabbitListener(queues = Producer.QUEUE_NAME)
    public void onMessage(Message message, Channel channel) throws Exception {
        System.out.println("Message content : " + message);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        System.out.println("消息已确认");
    }

}

这段代码就代表onMessage方法会处理erduo(Producer.QUEUE_NAME是常量字符串"erduo")队列中的消息。

我们可以看到这个方法里面有两个参数,MessageChannel,如果用不到Channel可以不写此参数,但是Message消息一定是要的,它代表了消息本身。

我们可以想想,我们的程序从RabbitMQ之中拉回一条条消息之后,要以怎么样的方式展示给我们呢?

没错,就是封装为一个个Message对象,这里面放入了一条消息的所有信息,数据结构是什么样一会我一run你就能看到了。

同时这里我们使用Channel做一个消息确认的操作,这里的DeliveryTag代表的是这个消息在队列中的序号,这个信息存放在MessageProperties中。

4. 📖SpringBoot 启动!

编写完生产者和消费者,同时已经运行过生产者往消息队列里面放了两条信息,接下来我们可以直接启动消息,查看消费情况:

SpringBoot启动查看消费者

在我红色框线标记的地方可以看到,因为我们有了消费者所以项目启动后先和RabbitMQ建立了一个连接进行监听队列。

随后就开始消费我们队列中的两条消息:

第一条信息是contentType=text/plain类型,所以直接就在控制台上打印出了具体内容。

第二条信息是contentType=application/x-java-serialized-object,在打印的时候只打印了一个内存地址+字节大小。

不管怎么说,数据我们是拿到了,也就是代表我们的消费是没有问题的,同时也都进行了消息确认操作,从数据上看,整个消息可以分为两部分:bodyMessageProperties

我们可以单独使用一个注解拿到这个body的内容 - @Payload

@RabbitListener(queues = Producer.QUEUE_NAME)
public void onMessage(@Payload String body, Channel channel) throws Exception {
    System.out.println("Message content : " + body);
}

也可以单独使用一个注解拿到MessageProperties的headers属性,headers属性在截图里也可以看到,只不过是个空的 - @Headers。

@RabbitListener(queues = Producer.QUEUE_NAME)
public void onMessage(@Payload String body, @Headers Map<String,Object> headers) throws Exception {
    System.out.println("Message content : " + body);
    System.out.println("Message headers : " + headers);
}

这两个注解都算是扩展知识,我还是更喜欢直接拿到全部,全都要!!!

上面我们已经完成了消息的发送与消费,整个过程我们可以再次回想一下,一切都和我画的这张图上一样的轨迹:

rabbit架构图

只不过我们一直没有指定Exchage一直使用的默认路由,希望大家好好记住这张图。

5. 📘@RabbitListener与@RabbitHandler

下面再来补一些知识点,有关@RabbitListener@RabbitHandler

@RabbitListener上面我们已经简单的进行了使用,稍微扩展一下它其实是可以监听多个队列的,就像这样:

@RabbitListener(queues = { "queue1", "queue2" })
public void onMessage(Message message, Channel channel) throws Exception {
    System.out.println("Message content : " + message);
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    System.out.println("消息已确认");
}

还有一些其他的特性如绑定之类的,这里不再赘述因为太硬编码了一般用不上。

下面来说说这节要主要讲的一个特性:@RabbitListener和@RabbitHandler的搭配使用。

前面我们没有提到,@RabbitListener注解其实是可以注解在类上的,这个注解在类上标志着这个类监听某个队列或某些队列。

这两个注解的搭配使用就要让@RabbitListener注解在类上,然后用@RabbitHandler注解在方法上,根据方法参数的不同自动识别并去消费,写个例子给大家看一看更直观一些。

@Slf4j
@Component("rabbitConsumer")
@RabbitListener(queues = Producer.QUEUE_NAME)
public class RabbitConsumer {

    @RabbitHandler
    public void onMessage(@Payload String message){
        System.out.println("Message content : " + message);
    }

    @RabbitHandler
    public void onMessage(@Payload User user) {
        System.out.println("Message content : " + user);
    }
}

大家可以看看这个例子,我们先用@RabbitListener监听erduo队列中的消息,然后使用@RabbitHandler注解了两个方法。

  • 第一个方法的body类型是String类型,这就代表着这个方法只能处理文本类型的消息。
  • 第二个方法的body类型是User类型,这就代表着这个方法只能处理序列化类型且为User类型的消息。

这两个方法正好对应着我们第二节中测试类会发送的两种消息,所以我们往RabbitMQ中发送两条测试消息,用来测试这段代码,看看效果:

RabbitHandler效果

都在控制台上如常打印了,如果@RabbitHandler注解的方法中没有一个的类型可以和你消息的类型对的上,比如消息都是byte数组类型,这里没有对应的方法去接收,系统就会在控制台不断的报错,如果你出现这个情况就证明你类型写的不正确。

假设你的erduo队列中会出现三种类型的消息:byte,文本和序列化,那你就必须要有对应的处理这三种消息的方法,不然消息发过来的时候就会因为无法正确转换而报错。

而且使用了@RabbitHandler注解之后就不能再和之前一样使用Message做接收类型。

@RabbitHandler
public void onMessage(Message message, Channel channel) throws Exception {
    System.out.println("Message content : " + message);
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    System.out.println("消息已确认");
}

这样写的话会报类型转换异常的,所以二者选其一。

同时上文我的@RabbitHandler没有进行消息确认,大家可以自己试一下进行消息确认。

6. 📙消息的序列化转换

通过上文我们已经知道,能被自动转换的对象只有byte[]Stringjava序列化对象(实现了Serializable接口的对象),但是并不是所有的Java对象都会去实现Serializable接口,而且序列化的过程中使用的是JDK自带的序列化方法,效率低下。

所以我们更普遍的做法是:使用Jackson先将数据转换成JSON格式发送给RabbitMQ,再接收消息的时候再用Jackson将数据反序列化出来。

这样做可以完美解决上面的痛点:消息对象既不必再去实现Serializable接口,也有比较高的效率(Jackson序列化效率业界应该是最好的了)。

默认的消息转换方案是消息转换顶层接口-MessageConverter的一个子类:SimpleMessageConverter,我们如果要换到另一个消息转换器只需要替换掉这个转换器就行了。

MessageConverter结构树

上图是MessageConverter结构树的结构树,可以看到除了SimpleMessageConverter之外还有一个Jackson2JsonMessageConverter,我们只需要将它定义为Bean,就可以直接使用这个转换器了。

@Bean
    public MessageConverter jackson2JsonMessageConverter() {
        return new Jackson2JsonMessageConverter(jacksonObjectMapper);
    }

这样就可以了,这里的jacksonObjectMapper可以不传入,但是默认的ObjectMapper方案对JDK8的时间日期序列化会不太友好,具体可以参考我的上一篇文章:从LocalDateTime序列化探讨全局一致性序列化,总的来说就是定义了自己的ObjectMapper

同时为了接下来测试方便,我又定义了一个专门测试JSON序列化的队列:

@Bean
public Queue erduoJson() {
    // 其三个参数:durable exclusive autoDelete
    // 一般只设置一下持久化即可
    return new Queue("erduo_json",true);
}

如此之后就可以进行测试了,先是生产者代码

public void sendObject() {
        Client client = new Client();

        System.out.println("Message content : " + client);

        rabbitTemplate.convertAndSend(RabbitJsonConsumer.JSON_QUEUE,client);
        System.out.println("消息发送完毕。");
    }

我又重新定义了一个Client对象,它和之前测试使用的User对象成员变量都是一样的,不一样的是它没有实现Serializable接口。

同时为了保留之前的测试代码,我又新建了一个RabbitJsonConsumer,用于测试JSON序列化的相关消费代码,里面定义了一个静态变量:JSON_QUEUE = "erduo_json";

所以这段代码是将Client对象作为消息发送到"erduo_json"队列中去,随后我们在测试类中run一下进行一次发送。

紧着是消费者代码

@Slf4j
@Component("rabbitJsonConsumer")
@RabbitListener(queues = RabbitJsonConsumer.JSON_QUEUE)
public class RabbitJsonConsumer {
    public static final String JSON_QUEUE = "erduo_json";

    @RabbitHandler
    public void onMessage(Client client, @Headers Map<String,Object> headers, Channel channel) throws Exception {
        System.out.println("Message content : " + client);
        System.out.println("Message headers : " + headers);
        channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG),false);
        System.out.println("消息已确认");
    }

}

有了上文的经验之后,这段代码理解起来也是很简单了吧,同时给出了上一节没写的如何在@RabbitHandler模式下进行消息签收。

我们直接来看看效果:

json模式消息发送

json模式消息消费

在打印的Headers里面,往后翻可以看到contentType=application/json,这个contentType是表明了消息的类型,这里正是说明我们新的消息转换器生效了,将所有消息都转换成了JSON类型。

后记

这两篇讲完了RabbitMQ的基本收发消息,包括手动配置和自动配置的两种方式,这些大家仔细研读之后应该会对RabbitMQ收发消息没什么疑问了~

不过我们一直以来发消息时都是使用默认的交换机,下篇将会讲述一下RabbitMQ的几种交换机类型,以及其使用方式。

讲完了交换机之后,这些RabbitMQ的常用概念基本就完善了。


最近这段时间压力挺大,优狐令我八月底之前升级到三级,所以各位读者的赞对我很重要,希望大家能够高抬贵手,帮我一哈~

好了,以上就是本期的全部内容,感谢你能看到这里,欢迎对本文点赞收藏与评论,👍你们的每个点赞都是我创作的最大动力。

我是耳朵,一个一直想做知识输出的伪文艺程序员,我们下期见。

本文代码:码云地址GitHub地址

查看原文

赞 17 收藏 12 评论 0

和耳朵 赞了文章 · 8月6日

秒杀系统设计

背景

我之前写过一个秒杀系统的文章不过有些许瑕疵,所以我准备在之前的基础上进行二次创作,不过让我决心二创秒杀系统的原因是我最近面试了很多读者,动不动就是秒杀系统把我整蒙蔽了,我懵的主要是秒杀系统的细节大家都不知道,甚至不知道电商公司一个秒杀系统的组成部分。

我之前在某电商公司就是做电商活动的,所以这样的场景和很多解决方案我是比较清楚的,那我就从我自身去带着大家看看一个秒杀的设计细节以及中间各种解决方案的利弊,以下就是我设计的秒杀系统,几乎涵盖了市面上所有秒杀的实现细节:

正文

首先设计一个系统之前,我们需要先确认我们的业务场景是怎么样子的,我就带着大家一起假设一个场景好吧。

我们现场要卖1000件下面这个婴儿纸尿裤,然后我们根据以往这样秒杀活动的数据经验来看,目测来抢这100件纸尿裤的人足足有10万人。(南极人打钱!)

你一听,完了呀,这我们的服务器哪里顶得住啊!说真的直接打DB肯定挂,但是别急嘛,有暖男敖丙在,任何系统我们开始设计之前我们都应该去思考会出现哪些问题?这里我罗列了几个非常经典的问题:

问题

高并发:

是的高并发这个是我们想都不用想的一个点,一瞬间这么多人进来这不是高并发什么时候是呢?

是吧,秒杀的特点就是这样时间极短瞬间用户量大

正常的店铺营销都是用极低的价格配合上短信、APP的精准推送,吸引特别多的用户来参与这场秒杀,爽了商家苦了开发呀

秒杀大家都知道如果真的营销到位,价格诱人,几十万的流量我觉得完全不是问题,那单机的Redis我感觉3-4W的QPS还是能顶得住的,但是再高了就没办法了,那这个数据随便搞个热销商品的秒杀可能都不止了。

大量的请求进来,我们需要考虑的点就很多了,缓存雪崩缓存击穿缓存穿透这些我之前提到的点都是有可能发生的,出现问题打挂DB那就很难受了,活动失败用户体验差,活动人气没了,最后背锅的还是开发

超卖:

但凡是个秒杀,都怕超卖,我这里举例的只是尿不湿,要是换成100个MacBook Pro,商家的预算经费卖100个可以赚点还可以造势,结果你写错程序多卖出去200个,你不发货用户投诉你,平台封你店,你发货就血亏,你怎么办? (没事看了敖丙的文章直接不怕)

那最后只能杀个开发祭天解气了,秒杀的价格本来就低了,基本上都是不怎么赚钱的,超卖了就恐怖了呀,所以超卖也是很关键的一个点。

恶意请求:

你这么低的价格,假如我抢到了,我转手卖掉我不是血赚?就算我不卖我也不亏啊,那用户知道,你知道,别的别有用心的人(黑客、黄牛...)肯定也知道的。

那简单啊,我知道你什么时候抢,我搞个几十台机器搞点脚本,我也模拟出来十几万个人左右的请求,那我是不是意味着我基本上有80%的成功率了。

真实情况可能远远不止,因为机器请求的速度比人的手速往往快太多了,在贵州的敖丙我每年回家抢高铁票都是秒光的,我也不知道有没有黄牛的功劳,我要Diss你,黄牛。杰伦演唱会门票抢不到,我也Diss你。

Tip:科普下,小道消息了解到的,黄牛的抢票系统,比国内很多小公司的系统还吊很多,架构设计都是顶级的,我用顶配的服务加上顶配的架构设计,你还想看演唱会?还想回家?

不过不用黄牛我回家都难,我们云贵川跟我一样要回家过年的仔太多了555!

链接暴露:

前面几个问题大家可能都很好理解,一看到这个有的小伙伴可能会比较疑惑,啥是链接暴露呀?

相信是个开发同学都对这个画面一点都不陌生吧,懂点行的仔都可以打开谷歌的开发者模式,然后看看你的网页代码,有的就有URL,但是我写VUE的时候是事件触发然后去调用文件里面的接口看源码看不到,但是我可以点击一下查看你的请求地址啊,不过你好像可以对按钮在秒杀前置灰。

不管怎么样子都有危险,撇开外面的所有的东西你都挡住了,你卖这个东西实在便宜得过分,有诱惑力,你能保证开发不动心?开发知道地址,在秒杀的时候自己提前请求。。。(开发:怎么TM又是我)

数据库:

每秒上万甚至十几万的QPS(每秒请求数)直接打到数据库,基本上都要把库打挂掉,而且你服务不单单是做秒杀的还涉及其他的业务,你没做降级、限流、熔断啥的,别的一起挂,小公司的话可能全站崩溃404

反正不管你秒杀怎么挂,你别把别的搞挂了对吧,搞挂了就不是杀一个程序员能搞定的。

程序员:我TM好难啊!

问题都列出来了,那怎么设计,怎么解决这些问题就是接下去要考虑的了,我们对症下药。

我会从我设计的秒杀系统从上到下去给大家介绍我们正常电商秒杀系统在每一层做了些什么,每一层存在的问题,难点等。

我们从前端开始:

前端

秒杀系统普遍都是商城网页、H5、APP、小程序这几项。

在前端这一层其实我们可以做的事情有很多,如果用node去做,甚至能直接处理掉整个秒杀,但是node其实应该属于后端,所以我不讨论node Service了。

资源静态化:

秒杀一般都是特定的商品还有页面模板,现在一般都是前后端分离的,页面一般都是不会经过后端的,但是前端也要自己的服务器啊,那就把能提前放入cdn服务器的东西都放进去,反正把所有能提升效率的步骤都做一下,减少真正秒杀时候服务器的压力。

秒杀链接加盐:

我们上面说了链接要是提前暴露出去可能有人直接访问url就提前秒杀了,那又有小伙伴要说了我做个时间的校验就好了呀,那我告诉你,知道链接的地址比起页面人工点击的还是有很大优势

我知道url了,那我通过程序不断获取最新的北京时间,可以达到毫秒级别的,我就在00毫秒的时候请求,我敢说绝对比你人工点的成功率大太多了,而且我可以一毫秒发送N次请求,搞不好你卖100个产品我全拿了。

那这种情况怎么避免?

简单,把URL动态化,就连写代码的人都不知道,你就通过MD5之类的摘要算法加密随机的字符串去做url,然后通过前端代码获取url后台校验才能通过。

这个只能防止一部分没耐心继续破解下去的黑客,有耐心的人研究出来还是能破解,在电商场景存在很多这样的羊毛党,那怎么做呢?

后面我会说。

限流:

限流这里我觉得应该分为前端限流后端限流

物理控制:

大家有没有发现没到秒杀前,一般按钮都是置灰的,只有时间到了,才能点击。

这是因为怕大家在时间快到的最后几秒秒疯狂请求服务器,然后还没到秒杀的时候基本上服务器就挂了。

这个时候就需要前端的配合,定时去请求你的后端服务器,获取最新的北京时间,到时间点再给按钮可用状态。

按钮可以点击之后也得给他置灰几秒,不然他一样在开始之后一直点的。

你敢说你们秒杀的时候不是这样的?

前端限流:这个很简单,一般秒杀不会让你一直点的,一般都是点击一下或者两下然后几秒之后才可以继续点击,这也是保护服务器的一种手段。

后端限流:秒杀的时候肯定是涉及到后续的订单生成和支付等操作,但是都只是成功的幸运儿才会走到那一步,那一旦100个产品卖光了,return了一个false,前端直接秒杀结束,然后你后端也关闭后续无效请求的介入了。

Tip:真正的限流还会有限流组件的加入例如:阿里的Sentinel、Hystrix等。我这里就不展开了,就说一下物理的限流。

我们卖1000件商品,请求有10W,我们不需要把十万都放进来,你可以放1W请求进来,然后再进行操作,因为秒杀对于用户本身就是黑盒的,所以你怎么做的他们是没感知的,至于为啥放1W进来,而不是刚好1000,是因为会丢掉一些薅羊毛的用户,至于怎么判断,后面的风控阶段我会说。

Nginx:

Nginx大家想必都不陌生了吧,这玩意是高性能的web服务器,并发也随便顶几万不是梦,但是我们的Tomcat只能顶几百的并发呀,那简单呀负载均衡嘛,一台服务几百,那就多搞点,在秒杀的时候多租点流量机

Tip:据我所知国内某大厂就是在去年春节活动期间租光了亚洲所有的服务器,小公司也很喜欢在双十一期间买流量机来顶住压力。

这样一对比是不是觉得你的集群能顶很多了。

恶意请求拦截也需要用到它,一般单个用户请求次数太夸张,不像人为的请求在网关那一层就得拦截掉了,不然请求多了他抢不抢得到是一回事,服务器压力上去了,可能占用网络带宽或者把服务器打崩、缓存击穿等等。

风控

我可以明确的告诉大家,前面的所有措施还是拦不住很多羊毛党,因为他们是专业的团队,他们可以注册很多账号来薅你的羊毛,而且不用机器请求,就用群控,操作几乎跟真实用户一模一样。

那怎么办,是不是无解了?

这个时候就需要风控同学的介入了,在请求到达后端之前,风控可以根据账号行为分析出这个账号机器人的概率大不大,我现在负责公司的某些特殊系统,每个用户的行为都是会送到我们大数据团队进行分析处理,给你打上对应标签的。

那黑客其实也有办法:养号

他们去黑市买真实用户有过很多记录的账号,买到了还不闲着,帮他们去购物啥的,让系统无法识别他们是黑号还是真实用户的号。

怎么办?

通杀!是的没有办法,只能通杀了,通杀的意思就是,我们通过风管分析出来这个用户是真实用户的概率没有其他用户概率大,那就认为他是机器了,丢弃他的请求。

之前的限流我们放进来10000个请求,但是我们真正的库存只有1000个,那我们就算出最有可能是真实用户的1000人进行秒杀,丢弃其他请求,因为秒杀本来就是黑盒操作的,用户层面是无感知的,这样设计能让真实的用户买到东西,还可以减少自己被薅羊毛的概率。

风控可以说是流量进入的最后一道门槛了,所以很多公司的风控是很强的,蚂蚁金服的风控大家如果了解过就知道了,你的资金在支付宝被盗了,他们是能做到全款补偿是有原因的。

后端

服务单一职责:

设计个能抗住高并发的系统,我觉得还是得单一职责

什么意思呢,大家都知道现在设计都是微服务的设计思想,然后再用分布式的部署方式

也就是我们下单是有个订单服务,用户登录管理等有个用户服务等等,那为啥我们不给秒杀也开个服务,我们把秒杀的代码业务逻辑放一起。

单一职责的好处就是就算秒杀没抗住,秒杀库崩了,服务挂了,也不会影响到其他的服务。(高可用)

Redis集群:

之前不是说单机的Redis顶不住嘛,那简单多找几个兄弟啊,秒杀本来就是读多写少,那你们是不是瞬间想起来我之前跟你们提到过的,Redis集群主从同步读写分离,我们还搞点哨兵,开启持久化直接无敌高可用!

库存预热:

秒杀的本质,就是对库存的抢夺,每个秒杀的用户来你都去数据库查询库存校验库存,然后扣减库存,撇开性能因数,你不觉得这样好繁琐,对业务开发人员都不友好,而且数据库顶不住啊。

开发:你tm总算为我着想一次了。

那怎么办?

我们都知道数据库顶不住但是他的兄弟非关系型的数据库Redis能顶啊!

那不简单了,我们要开始秒杀前你通过定时任务或者运维同学提前把商品的库存加载到Redis中去,让整个流程都在Redis里面去做,然后等秒杀介绍了,再异步的去修改库存就好了。

但是用了Redis就有一个问题了,我们上面说了我们采用主从,就是我们会去读取库存然后再判断然后有库存才去减库存,正常情况没问题,但是高并发的情况问题就很大了。

**多品几遍!!!**就比如现在库存只剩下1个了,我们高并发嘛,4个服务器一起查询了发现都是还有1个,那大家都觉得是自己抢到了,就都去扣库存,那结果就变成了-3,是的只有一个是真的抢到了,别的都是超卖的。咋办?

事务:

Redis本身是支持事务的,而且他有很多原子命令的,大家也可以用LUA,还可以用他的管道,乐观锁他也知支持。

限流&降级&熔断&隔离:

这个为啥要做呢,不怕一万就怕万一,万一你真的顶不住了,限流,顶不住就挡一部分出去但是不能说不行,降级,降级了还是被打挂了,熔断,至少不要影响别的系统,隔离,你本身就独立的,但是你会调用其他的系统嘛,你快不行了你别拖累兄弟们啊。

消息队列(削峰填谷):

一说到这个名词,很多小伙伴就知道了,对的MQ,你买东西少了你直接100个请求改库我觉得没问题,但是万一秒杀一万个,10万个呢?服务器挂了,程序员又要背锅的

秒杀就是这种瞬间流量很高,但是平时又没有流量的场景,那消息队列完全契合这样的场景了呀,削峰填谷。

Tip:可能小伙伴说我们业务达不到这个量级,没必要。但是我想说我们写代码,就不应该写出有逻辑漏洞的代码,至少以后公司体量上去了,别人一看居然不用改代码,一看代码作者是敖丙?有点东西!

你可以把它放消息队列,然后一点点消费去改库存就好了嘛,不过单个商品其实一次修改就够了,我这里说的是某个点多个商品一起秒杀的场景,像极了双十一零点。

数据库

数据库用MySQL只要连接池设置合理一般问题是不大的,不过一般大公司不缺钱而且秒杀这样的活动十分频繁,我之前所在的公司就是这样秒杀特卖这样的场景一直都是不间断的。

单独给秒杀建立一个数据库,为秒杀服务,表的设计也是竟可能的简单点,现在的互联网架构部署都是分库的。

至于表就看大家怎么设计了,该设置索引的地方还是要设置索引的,建完后记得用explain看看SQL的执行计划。(不了解的小伙伴也没事,MySQL章节去康康)

分布式事务

这为啥我不放在后端而放到最后来讲呢?

因为上面的任何一步都是可能出错的,而且我们是在不同的服务里面出错的,那就涉及分布式事务了,但是分布式事务大家想的是一定要成功什么的那就不对了,还是那句话,几个请求丢了就丢了,要保证时效和服务的可用可靠。

所以TCC最终一致性其实不是很适合,TCC开发成本很大,所有接口都要写三次,因为涉及TCC的三个阶段。

最终一致性基本上都是靠轮训的操作去保证一个操作一定成功,那时效性就大打折扣了。

大家觉得不那么可靠的**两段式(2PC)三段式(3PC)**就派上用场了,他们不一定能保证数据最终一致,但是效率上还算ok。

总结

到这里我想我已经基本上把该考虑的点还有对应的解决方案也都说了一下,不知道还有没有没考虑到的,但是就算没考虑到我想我这个设计,应该也能撑住一个完整的秒杀流程。

最后大家再看看这个秒杀系统或许会有新的感悟,是不是一个系统真的没有大家想的那么简单,而且我还是有漏掉的细节,这是一定的。

秒杀这章我脑细胞死了很多,考虑了很多个点,最后还是出来了,忍不住给自己点赞

总结

我们玩归玩,闹归闹,别拿面试开玩笑。

秒杀不一定是每个同学都会问到的,至少肯定没Redis基础那样常问,但是一旦问到,大家一定要回答到点上。

至少你得说出可能出现的情况需要注意的情况,以及对于的解决思路和方案,因为这才是一个coder的基本素养,这些你不考虑你也很难去进步。

最后就是需要对整个链路比较熟悉,注意是一个完整的链路,前端怎么设计的呀,网关的作用呀,怎么解决Redis的并发竞争啊,数据的同步方式呀,MQ的作用啊等等,相信你会有不错的收获。

不知道这是一次成功还是失败的二创,我里面所有提到的技术细节我都写了对应的文章,大家可以关注我去历史文章看看,天色已晚,我溜了。

查看原文

赞 98 收藏 57 评论 9

和耳朵 回答了问题 · 8月3日

如何向一个指定的邮箱发送1W封邮件

for循环调用发送即可,邮件title应该定义好,不然可能会被识别为垃圾邮件。

关注 5 回答 4

认证与成就

  • 获得 148 次点赞
  • 获得 2 枚徽章 获得 0 枚金徽章, 获得 0 枚银徽章, 获得 2 枚铜徽章

擅长技能
编辑

(゚∀゚ )
暂时没有

开源项目 & 著作
编辑

(゚∀゚ )
暂时没有

注册于 7月11日
个人主页被 5.6k 人浏览