公子

公子 查看完整档案

上海编辑河南农业大学  |  电子信息工程 编辑腾道科技  |  Python 开发 编辑 zhaoxicn@foxmail.com 编辑
编辑

别说话, 让我安静一会...

个人动态

公子 赞了回答 · 2月22日

解决crontab任务python程序如何读取docker-compose里的environment变量

问题终于解决了。crontab的环境变量与系统的环境变量确实不一样,如此解决思路便是使其所在环境变量一致即可。

打印crontab环境变量:

SHELL=/bin/bash
PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin

* * * * * root env >> /work/logs/demo.log 2>&1

可以看出两者环境变量确实不一样:

$ docker exec -it demo bash
root@b6b6fd26170b:/work# env
PYTHONUNBUFFERED=1
HOSTNAME=xxx
PYTHON_VERSION=3.6.9
REDIS_HOST=127.0.0.1
PWD=/work
MYSQL_PASSWORD=password
MYSQL_USER=user_name
HOME=/root
LANG=C.UTF-8
GPG_KEY=xxx
MYSQL_HOST=127.0.0.1
TERM=xterm
SHLVL=1
PROD=1
MYSQL_DATABASE=demo
PYTHON_PIP_VERSION=19.3.1
PYTHON_GET_PIP_SHA256=b86f36cc4345ae87bfd4f10ef6b2dbfa7a872fbff70608a1e43944d283fd0eee
PYTHON_GET_PIP_URL=https://github.com/pypa/get-pip/raw/ffe826207a010164265d9cc807978e3604d18ca0/get-pip.py
PATH=/usr/local/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
_=/usr/bin/env

root@b6b6fd26170b:/work# tail -n 8 logs/demo.log 
###################
SHELL=/bin/bash
PWD=/root
LOGNAME=root
HOME=/root
SHLVL=1
PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
_=/usr/bin/env

解决方案是让shell包裹python程序,然后用crontab调度shell脚本。

其中shell需要引入系统的环境变量:

#!/bin/bash
PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
export $(cat /proc/1/environ |tr '\0' '\n' | xargs)
python /work/demo.py >> /work/logs/demo.log 

crontab调度shell脚本:

PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin

* * * * * /work/deploy/start.sh

如此python便能正常引入环境变量

PROD:  1
## 2021-02-08 17:32:01
MYSQL_HOST:  127.0.0.1 REDIS_HOST:  127.0.0.1

参考链接:

https://www.tony-yin.site/2018/10/29/Why-Crontab-Not-Work/#footer

https://www.cnblogs.com/xuxinkun/p/10531091.html

关注 2 回答 2

公子 赞了文章 · 1月26日

python模块之enum_上

enum模块定义了:

  • 4种枚举类:Enum, IntEnum, Flag, IntFlag
  • 装饰器:unique()
  • 助手:auto

Flag, IntFlag, auto在python3.6中加入

创建枚举

from enum import Enum
class Color(Enum):
    RED = 2
    GREEN = 4
    BLUE = 6

注意点:
1. 枚举值可以是任何类型,如果值不重要可以使用auto()自动选择。但在有其他已定义的值的情况下,谨慎与auto混用
2. Color是枚举类,Color.RED等是枚举成员,枚举成员拥有name和value属性
3. 虽然使用class关键字创建,但枚举并不是常规意义上的python类

枚举成员的展现形式:

>>>print(Color.RED)
Color.RED

>>>print(repr(Color.RED))
<Color.RED: 2>

枚举成员的type类型是其所属的枚举类:

>>>type(Color.RED)
<enum 'Color'>
>>>isinstance(Color.RED, Color)
True

枚举支持按照定义时的顺序进行迭代:

>>>for color in Color:
...    print(color)
...
Color.RED
Color.GREEN
Color.BLUE    

枚举成员是可哈希的,因此可以在字典和集合中使用:

>>> apples = {}
>>> apples[Color.RED] = 'red delicious'
>>> apples[Color.GREEN] = 'granny smith'
>>> apples == {Color.RED: 'red delicious', Color.GREEN: 'granny smith'}
True

对枚举成员及其属性的程序化访问

通过值访问枚举成员:

>>>Color(2)
<Color.RED: 2>

通过名称访问枚举成员:

>>>Color["RED"]
<Color.RED: 2>

获取枚举成员的名称和值:

>>>member = Color.RED
>>>member.name
"RED"
>>>member.value
2

枚举成员及其值的重复性问题

拥有两个相同名称的枚举成员是不允许的:

>>> class Shape(Enum):
...     SQUARE = 2
...     SQUARE = 3
...
Traceback (most recent call last):
...
TypeError: Attempted to reuse key: 'SQUARE'

不过不同的枚举成员允许拥有相同的值。后定义的成员是先定义的成员的别名,通过值或名称访问时都将返回先定义的成员:

>>> class Shape(Enum):
...     SQUARE = 2
...     DIAMOND = 1
...     CIRCLE = 3
...     ALIAS_FOR_SQUARE = 2
...
>>> Shape.SQUARE
<Shape.SQUARE: 2>
>>> Shape.ALIAS_FOR_SQUARE
<Shape.SQUARE: 2>
>>> Shape(2)
<Shape.SQUARE: 2>

注意点:任意两个枚举属性(包括成员、方法等)不允许存在相同的名称

枚举值唯一约束

默认情况下,允许多个成员拥有相同的值。使用unique装饰器可以对枚举值进行唯一约束

@enum.unique: 枚举专用的类装饰器。它在枚举的__members__属性中只要查找到成员别名就抛出ValueError异常

>>> from enum import Enum, unique
>>> @unique
... class Mistake(Enum):
...     ONE = 1
...     TWO = 2
...     THREE = 3
...     FOUR = 3
...
Traceback (most recent call last):
...
ValueError: duplicate values found in <enum 'Mistake'>: FOUR -> THREE

自动生成枚举值

对于不重要的枚举值,可以使用auto自动生成:

>>> from enum import Enum, auto
>>> class Color(Enum):
...     RED = auto()
...     BLUE = auto()
...     GREEN = auto()
...
>>> list(Color)
[<Color.RED: 1>, <Color.BLUE: 2>, <Color.GREEN: 3>]

auto生成什么值取决于_generate_next_value_()方法,可重写:

>>> class AutoName(Enum):
...     def _generate_next_value_(name, start, count, last_values):
...         return name
...
>>> class Ordinal(AutoName):
...     NORTH = auto()
...     SOUTH = auto()
...     EAST = auto()
...     WEST = auto()
...
>>> list(Ordinal)
[<Ordinal.NORTH: 'NORTH'>, <Ordinal.SOUTH: 'SOUTH'>, <Ordinal.EAST: 'EAST'>, <Ordinal.WEST: 'WEST'>]

迭代

对枚举成员的迭代,并不会包含成员别名:

>>> list(Shape)
[<Shape.SQUARE: 2>, <Shape.DIAMOND: 1>, <Shape.CIRCLE: 3>]

__members__属性是一个映射了枚举成员及其名称的有序字典,包括成员别名:

>>> for name, member in Shape.__members__.items():
...     name, member
...
('SQUARE', <Shape.SQUARE: 2>)
('DIAMOND', <Shape.DIAMOND: 1>)
('CIRCLE', <Shape.CIRCLE: 3>)
('ALIAS_FOR_SQUARE', <Shape.SQUARE: 2>)

>>> [name for name, member in Shape.__members__.items() if member.name != name]
['ALIAS_FOR_SQUARE']

枚举比较(后两种不适用于IntEnum)

>>> Color.RED is Color.RED
True

>>> Color.RED == Color.BLUE
False

>>> Color.RED < Color.BLUE
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: '<' not supported between instances of 'Color' and 'Color'

>>> Color.BLUE == 6 # 与非枚举的值进行等值比较总是返回False
False

允许的枚举成员与属性

枚举是python类,也可以拥有普通方法和特殊方法:

class Mood(Enum):
    FUNKY = 1
    HAPPY = 3

    def describe(self):
        # self is the member here
        return self.name, self.value

    def __str__(self):
        return 'my custom str! {0}'.format(self.value)

    @classmethod
    def favorite_mood(cls):
        # cls here is the enumeration
        return cls.HAPPY

注意点:如果枚举中定义了__new()__或者__init__()方法,赋值给枚举成员的值将被传递到__new()__或者__init__()

枚举的继承限制

自定义枚举类必须继承自一个枚举基类,至多一个具体的数据类型以及0至多个混合类。继承顺序如下:

class EnumName([mix-in, ...,] [data-type,] base-enum):
    pass

基类枚举如果已经定义了成员,则不能被任何子类继承,如下第一种是不允许的,但第二种可以:

>>> class MoreColor(Color):
...     PINK = 17
...
Traceback (most recent call last):
...
TypeError: Cannot extend enumerations
>>> class Foo(Enum):
...     def some_behavior(self):
...         pass
...
>>> class Bar(Foo):
...     HAPPY = 1
...     SAD = 2
...

不能这么做的原因是可能破坏某些重要的不允许改变的值(原话是would lead to a violation of some important invariants of bytes and instances)。

序列化

>>> from a.b import Color
>>> from pickle import dumps, loads
>>> Color.RED is loads(dumps(Color.RED))
True

一般要求序列化的枚举要定义在模块顶层,因为反序列化要求枚举能够从模块导入。不过在第4版的pickle协议中,已经可以序列化嵌套在类中的枚举

通过在枚举中定义__reduce_ex__()方法,可以修改枚举成员的序列化/反序列化行为

Functional API

枚举类是可调用的:

>>> Animal = Enum("Pet", "Tortoise CAT DOG")

完整的API如下:
Enum(value='NewEnumName', names=<...>, *, module='...', qualname='...', type=<mixed-in class>, start=1)
图片描述

查看原文

赞 1 收藏 0 评论 1

公子 回答了问题 · 2020-10-29

解决虚拟机 CentOS 不能访问 HTTPS 地址

由于我是通过 WindowsHyper-v 虚拟机装的 CentOS8, 使用的是默认虚拟交换机(Default Switch), 不能访问 HTTPS, 具体原因我也不清楚....

(临时)解决办法

就是创建一个新的虚拟交换机(内部), 并共享网络给他, 虚拟机使用这个网络.

关注 3 回答 2

公子 提出了问题 · 2020-10-28

解决虚拟机 CentOS 不能访问 HTTPS 地址

访问 HTTP 是正常的:

[root@localhost ~]# wget http://www.baidu.com
--2020-10-28 18:00:30--  http://www.baidu.com/
Resolving www.baidu.com (www.baidu.com)... 180.101.49.12, 180.101.49.11
Connecting to www.baidu.com (www.baidu.com)|180.101.49.12|:80... connected.
HTTP request sent, awaiting response... 302 Moved Temporarily
Location: http://172.28.3.10/webAuth/index.htm?www.baidu.com/ [following]
--2020-10-28 18:00:30--  http://172.28.3.10/webAuth/index.htm?www.baidu.com/
Connecting to 172.28.3.10:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 3995 (3.9K) [text/html]
Saving to: ‘index.html.1’

index.html.1                                    100%[==============>]   3.90K  --.-KB/s    in 0s      

2020-10-28 18:00:31 (419 MB/s) - ‘index.html.1’ saved [3995/3995]

访问 HTTPS 异常:

[root@localhost ~]# wget https://www.baidu.com
--2020-10-28 18:05:07--  https://www.baidu.com/
Resolving www.baidu.com (www.baidu.com)... 180.101.49.11, 180.101.49.12
Connecting to www.baidu.com (www.baidu.com)|180.101.49.11|:443... failed: Connection timed out.
Connecting to www.baidu.com (www.baidu.com)|180.101.49.12|:443... 

防火墙和 iptables 都已经停了

[root@localhost ~]# systemctl status firewalld.service 
● firewalld.service - firewalld - dynamic firewall daemon
   Loaded: loaded (/usr/lib/systemd/system/firewalld.service; disabled; vendor preset: enabled)
   Active: inactive (dead)
     Docs: man:firewalld(1)
[root@localhost ~]# systemctl status iptables.service 
● iptables.service - IPv4 firewall with iptables
   Loaded: loaded (/usr/lib/systemd/system/iptables.service; disabled; vendor preset: disabled)
   Active: inactive (dead)

总感觉是网络策略的问题, 但是对 Linux 不精通, 不知道真正问题出在哪里

关注 3 回答 2

公子 提出了问题 · 2020-10-28

解决虚拟机 CentOS 不能访问 HTTPS 地址

访问 HTTP 是正常的:

[root@localhost ~]# wget http://www.baidu.com
--2020-10-28 18:00:30--  http://www.baidu.com/
Resolving www.baidu.com (www.baidu.com)... 180.101.49.12, 180.101.49.11
Connecting to www.baidu.com (www.baidu.com)|180.101.49.12|:80... connected.
HTTP request sent, awaiting response... 302 Moved Temporarily
Location: http://172.28.3.10/webAuth/index.htm?www.baidu.com/ [following]
--2020-10-28 18:00:30--  http://172.28.3.10/webAuth/index.htm?www.baidu.com/
Connecting to 172.28.3.10:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 3995 (3.9K) [text/html]
Saving to: ‘index.html.1’

index.html.1                                    100%[==============>]   3.90K  --.-KB/s    in 0s      

2020-10-28 18:00:31 (419 MB/s) - ‘index.html.1’ saved [3995/3995]

访问 HTTPS 异常:

[root@localhost ~]# wget https://www.baidu.com
--2020-10-28 18:05:07--  https://www.baidu.com/
Resolving www.baidu.com (www.baidu.com)... 180.101.49.11, 180.101.49.12
Connecting to www.baidu.com (www.baidu.com)|180.101.49.11|:443... failed: Connection timed out.
Connecting to www.baidu.com (www.baidu.com)|180.101.49.12|:443... 

防火墙和 iptables 都已经停了

[root@localhost ~]# systemctl status firewalld.service 
● firewalld.service - firewalld - dynamic firewall daemon
   Loaded: loaded (/usr/lib/systemd/system/firewalld.service; disabled; vendor preset: enabled)
   Active: inactive (dead)
     Docs: man:firewalld(1)
[root@localhost ~]# systemctl status iptables.service 
● iptables.service - IPv4 firewall with iptables
   Loaded: loaded (/usr/lib/systemd/system/iptables.service; disabled; vendor preset: disabled)
   Active: inactive (dead)

总感觉是网络策略的问题, 但是对 Linux 不精通, 不知道真正问题出在哪里

关注 3 回答 2

公子 赞了问题 · 2020-10-27

修改了/etc/docker/daemon.json,发现拉取数据时还是用的docke.io

使用了网易镜像

{
"registry-mirrors": ["https://hub-mirror.c.163.com"]
}

但在使用 docker-compose up 的时候,但是为什么还从 docker.io 拉取的数据,不是应该从 163 获取的么?

已经重启过docker
systemctl daemon-reload
systemctl restart docker

关注 2 回答 0

公子 赞了回答 · 2020-10-22

解决计算密集型和IO密集型

  • IO密集型:就是IO比较多的应用,比如网络传输、数据库等调用。web应用大多数是这种

  • 计算密集型:顾名思义就是需要大量的CPU计算的应用类型。像云计算一类的应用应该属于这种。

关注 7 回答 4

公子 赞了回答 · 2020-10-22

解决计算密集型和IO密集型

有一个被用烂了的栗子,解释 Node.js 语言 非阻塞事件驱动 概念,大约是这样的:

你到餐馆吃饭,餐馆里的服务员在帮你点单之后,会直接把菜单塞给厨房,然后立刻去到下一位顾客处,而非在厨房等待(阻塞)。直到烹饪结束后,厨师大喊“来拿菜”(事件)。服务员跑回厨房,把菜品端到你的桌子上(事件处理/回调

在这个栗子中,我们可以简单的理解为:服务员相当于 CPU,而厨房的工作就是I/O。显然的,在一个饭店里,服务员的工作并不复杂,”等待上菜的时间“多数是花在”等待厨房“上。这与如今大多数网站的情况相似:网站通常不需要做太多复杂的运算,但需要花费大量的时间等待 I/O 处理,比如数据库查询,比如图片、视频的读取。

于是 Node.js 舍弃了传统 Web 服务中“每当一次请求来临,都打开一个线程来单独处理”的做法,而是采用事件驱动的模型,默认情况下,仅用单线程就可以负担相当高的并发量。

于是在这种情境下,我们说:Node.js 更适合 IO密集型的任务处理

但如果我们把上面的栗子更换一下,比如说咱们不开饭馆,开银行。每当客户到来,要求取出指定的款项,作为服务员,你需要根据客户的账户等级计算利率,计算利息,计算分红,等等……(随意想到的比方,可能不太恰当),而“取钱并交给客户”这个动作本身却并不复杂。

这时候,就不能指望像饭店那样,只靠一个服务员就能应付大量的客户,因为每个请求都需要独占服务员大量的时间(不可避免的阻塞)。那么此时,传统的模型,例如 PHP 或许就变得更加合适了。

以上,希望能解你所惑

关注 7 回答 4

公子 赞了文章 · 2020-09-03

RabbitMQ高级之如何保证消息可靠性?

人生终将是场单人旅途,孤独之前是迷茫,孤独过后是成长。

楔子

本篇是消息队列RabbitMQ的第四弹。

RabbitMQ我已经写了三篇了,基础的收发消息和基础的概念我都已经写了,学任何东西都是这样,先基础的上手能用,然后遇到问题再去解决,无法理解就去深入源码,随着时间的积累对这一门技术的理解也会随之提高。

基础操作已经熟练后,相信大家不可避免的会生出向那更高处攀登的心来,今天我就罗列一些RabbitMQ比较高级的用法,有些用得到有些用不上,但是一定要有所了解,因为大部分情况我们都是面向面试学习~

  • 如何保证消息的可靠性?
  • 消息队列如何进行限流?
  • 如何设置延时队列进行延时消费?


祝有好收获,先赞后看,快乐无限。

本文代码:码云地址GitHub地址

1. 📖如何保证消息的可靠性?

rabbit架构图

先来看看我们的万年老图,从图上我们大概可以看出来一个消息会经历四个节点,只有保证这四个节点的可靠性才能保证整个系统的可靠性。

  • 生产者发出后保证到达了MQ。
  • MQ收到消息保证分发到了消息对应的Exchange。
  • Exchange分发消息入队之后保证消息的持久性。
  • 消费者收到消息之后保证消息的正确消费。

经历了这四个保证,我们才能保证消息的可靠性,从而保证消息不会丢失。

2. 🔍生产者发送消息到MQ失败

我们的生产者发送消息之后可能由于网络闪断等各种原因导致我们的消息并没有发送到MQ之中,但是这个时候我们生产端又不知道我们的消息没有发出去,这就会造成消息的丢失。

为了解决这个问题,RabbitMQ引入了事务机制发送方确认机制(publisher confirm),由于事务机制过于耗费性能所以一般不用,这里我着重讲述发送方确认机制

这个机制很好理解,就是消息发送到MQ那端之后,MQ会回一个确认收到的消息给我们


打开此功能需要配置,接下来我来演示一下配置:

spring:
  rabbitmq:
    addresses: 127.0.0.1
    host: 5672
    username: guest
    password: guest
    virtual-host: /
    # 打开消息确认机制
    publisher-confirm-type: correlated

我们只需要在配置里面打开消息确认即可(true是返回客户端,false是自动删除)。

生产者:

    public void sendAndConfirm() {
        User user = new User();

        log.info("Message content : " + user);

        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(Producer.QUEUE_NAME,user,correlationData);
        log.info("消息发送完毕。");

        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback(){
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("CorrelationData content : " + correlationData);
                log.info("Ack status : " + ack);
                log.info("Cause content : " + cause);
                if(ack){
                    log.info("消息成功发送,订单入库,更改订单状态");
                }else{
                    log.info("消息发送失败:"+correlationData+", 出现异常:"+cause);
                }
            }
        });
    }

生产者代码里我们看到又多了一个参数:CorrelationData,这个参数是用来做消息的唯一标识,同时我们打开消息确认之后需要对rabbitTemplate多设置一个setConfirmCallback,参数是一个匿名类,我们消息确认成功or失败之后的处理就是写在这个匿名类里面。

比如一条订单消息,当消息确认到达MQ确认之后再行入库或者修改订单的节点状态,如果消息没有成功到达MQ可以进行一次记录或者将订单状态修改。

Tip:消息确认失败不只有消息没发过去会触发,消息发过去但是找不到对应的Exchange,也会触发。

3. 📔MQ接收失败或者路由失败

生产者的发送消息处理好了之后,我们就可以来看看MQ端的处理,MQ可能出现两个问题:

  1. 消息找不到对应的Exchange。
  2. 找到了Exchange但是找不到对应的Queue。

这两种情况都可以用RabbitMQ提供的mandatory参数来解决,它会设置消息投递失败的策略,有两种策略:自动删除或返回到客户端。

我们既然要做可靠性,当然是设置为返回到客户端。


配置:

spring:
  rabbitmq:
    addresses: 127.0.0.1
    host: 5672
    username: guest
    password: guest
    virtual-host: /
    # 打开消息确认机制
    publisher-confirm-type: correlated
    # 打开消息返回
    publisher-returns: true
    template:
      mandatory: true

我们只需要在配置里面打开消息返回即可,template.mandatory: true这一步不要少~

生产者:

    public void sendAndReturn() {
        User user = new User();

        log.info("Message content : " + user);

        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.info("被退回的消息为:{}", message);
            log.info("replyCode:{}", replyCode);
            log.info("replyText:{}", replyText);
            log.info("exchange:{}", exchange);
            log.info("routingKey:{}", routingKey);
        });

        rabbitTemplate.convertAndSend("fail",user);
        log.info("消息发送完毕。");
    }

这里我们可以拿到被退回消息的所有信息,然后再进行处理,比如放到一个新的队列单独处理,路由失败一般都是配置问题了。

4. 📑消息入队之后MQ宕机

到这一步基本都是一些很小概率的问题了,比如MQ突然宕机了或者被关闭了,这种问题就必须要对消息做持久化,以便MQ重新启动之后消息还能重新恢复过来。

消息的持久化要做,但是不能只做消息的持久化,还要做队列的持久化和Exchange的持久化。

    @Bean
    public DirectExchange directExchange() {
        // 三个构造参数:name durable autoDelete
        return new DirectExchange("directExchange", false, false);
    }

    @Bean
    public Queue erduo() {
        // 其三个参数:durable exclusive autoDelete
        // 一般只设置一下持久化即可
        return new Queue("erduo",true);
    }

创建Exchange和队列时只要设置好持久化,发送的消息默认就是持久化消息。

设置持久化时一定要将Exchange和队列都设置上持久化:

单单只设置Exchange持久化,重启之后队列会丢失。单单只设置队列的持久化,重启之后Exchange会消失,既而消息也丢失,所以如果不两个一块设置持久化将毫无意义。

Tip: 这些都是MQ宕机引起的问题,如果出现服务器宕机或者磁盘损坏则上面的手段统统无效,必须引入镜像队列,做异地多活来抵御这种不可抗因素。

5. 📌消费者无法正常消费

最后一步会出问题的地方就在消费者端了,不过这个解决问题的方法我们之前的文章已经说过了,就是消费者的消息确认。

spring:
  rabbitmq:
    addresses: 127.0.0.1
    host: 5672
    username: guest
    password: guest
    virtual-host: /
    # 手动确认消息
    listener:
      simple:
          acknowledge-mode: manual

打开手动消息确认之后,只要我们这条消息没有成功消费,无论中间是出现消费者宕机还是代码异常,只要连接断开之后这条信息还没有被消费那么这条消息就会被重新放入队列再次被消费。

当然这也可能会出现重复消费的情况,不过在分布式系统中幂等性是一定要做的,所以一般重复消费都会被接口的幂等给拦掉。

所谓幂等性就是:一个操作多次执行产生的结果与一次执行产生的结果一致。

幂等性相关内容不在本章讨论范围~所以我就不多做阐述了。

6. 💡消息可靠性案例

消息可靠性架构

这个图是我很早之前画的,是为了记录当时使用RabbitMQ做消息可靠性的具体做法,这里我正好拿出来做个例子给大家看一看。

这个例子中的消息是先入库的,然后生产者从DB里面拿到数据包装成消息发给MQ,经过消费者消费之后对DB数据的状态进行更改,然后重新入库。

这中间有任何步骤失败,数据的状态都是没有更新的,这时通过一个定时任务不停的去刷库,找到有问题的数据将它重新扔到生产者那里进行重新投递。

这个方案其实和网上的很多方案大同小异,基础的可靠性保证之后,定时任务做一个兜底进行不断的扫描,力图100%可靠性。

后记

越写越长,因为篇幅缘故限流和延时队列放到下一篇了,我会尽快发出来供大家阅读,讲真,我真的不是故意多水一篇的!!!

最后再给优狐打个广告,最近掘金在GitHub上面建立了一个开源计划 - open-source,旨在收录各种好玩的好用的开源库,如果大家有想要自荐或者分享的开源库都可以参与进去,为这个开源计划做一份贡献,同时这个开源库的Start也在稳步增长中,参与进去也可以增加自己项目的曝光度,一举两得。

同时这个开源库还有一个兄弟项目 - open-source-translation,旨在招募技术文章翻译志愿者进行技术文章的翻译工作,
争做最棒开源翻译,翻译业界高质量文稿,为技术人的成长献一份力。


最近这段时间事情挺多,优狐令我八月底之前升级到三级,所以各位读者的赞对我很重要,希望大家能够高抬贵手,帮我一哈~

好了,以上就是本期的全部内容,感谢你能看到这里,欢迎对本文点赞收藏与评论,👍你们的每个点赞都是我创作的最大动力。

我是耳朵,一个一直想做知识输出的伪文艺程序员,我们下期见。

本文代码:码云地址GitHub地址

查看原文

赞 21 收藏 17 评论 2

公子 赞了文章 · 2020-09-02

上手了RabbitMQ?再来看看它的交换机(Exchange)吧

人生终将是场单人旅途,孤独之前是迷茫,孤独过后是成长。

楔子

本篇是消息队列RabbitMQ的第三弹。

RabbitMQ的入门RabbitMQ+SpringBoot的整合可以点此链接进去回顾,今天要讲的是RabbitMQ的交换机。

本篇是理解RabbitMQ很重要的一篇,交换机是消息的第一站,只有理解了交换机的分发模式,我们才能知道不同交换机根据什么规则分发消息,才能明白在面对不同业务需求的时候应采用哪种交换机。


祝有好收获,先赞后看,快乐无限。

本文代码:码云地址GitHub地址

1. 🔍Exchange

rabbit架构图

先来放上几乎每篇都要出现一遍的我画了好久的RabbitMQ架构图。

前两篇文中我们一直没有显式的去使用Exchange,都是使用的默认Exchange,其实Exchange是一个非常关键的组件,有了它才有了各种消息分发模式。

我先简单说说Exchange有哪几种类型:

  1. fanoutFanout-Exchange会将它接收到的消息发往所有与他绑定的Queue中。
  2. directDirect-Exchange会把它接收到的消息发往与它有绑定关系且Routingkey完全匹配的Queue中(默认)。
  3. topicTopic-Exchange与Direct-Exchange相似,不过Topic-Exchange不需要全匹配,可以部分匹配,它约定:Routingkey为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词)。
  4. headerHeader-Exchange不依赖于RoutingKey或绑定关系来分发消息,而是根据发送的消息内容中的headers属性进行匹配。此模式已经不再使用,本文中也不会去讲,大家知道即可。

本文中我们主要讲前三种Exchange方式,相信凭借着我简练的文字和灵魂的画技给大家好好讲讲,争取老妪能解。

Tip:本文的代码演示直接使用SpringBoot+RabbitMQ的模式。

2. 📕Fanout-Exchange

先来看看Fanout-ExchangeFanout-Exchange又称扇形交换机,这个交换机应该是最容易理解的。

扇形交换机

ExchangeQueue建立一个绑定关系,Exchange会分发给所有和它有绑定关系的Queue中,绑定了十个Queue就把消息复制十份进行分发。

这种绑定关系为了效率肯定都会维护一张表,从算法效率上来说一般是O(1),所以Fanout-Exchange是这几个交换机中查找需要被分发队列最快的交换机。


下面是一段代码演示:

    @Bean
    public Queue fanout1() {
        return new Queue("fanout1");
    }

    @Bean
    public Queue fanout2() {
        return new Queue("fanout2");
    }

    @Bean
    public FanoutExchange fanoutExchange() {
        // 三个构造参数:name durable autoDelete
        return new FanoutExchange("fanoutExchange", false, false);
    }

    @Bean
    public Binding binding1() {
        return BindingBuilder.bind(fanout1()).to(fanoutExchange());
    }

    @Bean
    public Binding binding2() {
        return BindingBuilder.bind(fanout2()).to(fanoutExchange());
    }

为了清晰明了,我新建了两个演示用的队列,然后建了一个FanoutExchange,最后给他们都设置上绑定关系,这样一组队列和交换机的绑定设置就算完成了。

紧接着编写一下生产者和消费者:

    public void sendFanout() {
        Client client = new Client();

        // 应读者要求,以后代码打印的地方都会改成log方式,这是一种良好的编程习惯,用System.out.println一般是不推荐的。
        log.info("Message content : " + client);

        rabbitTemplate.convertAndSend("fanoutExchange",null,client);
        System.out.println("消息发送完毕。");
    }

    @Test
    public void sendFanoutMessage() {
        rabbitProduce.sendFanout();
    }
@Slf4j
@Component("rabbitFanoutConsumer")
public class RabbitFanoutConsumer {
    @RabbitListener(queues = "fanout1")
    public void onMessage1(Message message, Channel channel) throws Exception {
        log.info("Message content : " + message);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        log.info("消息已确认");
    }

    @RabbitListener(queues = "fanout2")
    public void onMessage2(Message message, Channel channel) throws Exception {
        log.info("Message content : " + message);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        log.info("消息已确认");
    }

}

这两段代码都很好理解,不再赘述,有遗忘的可以去看RabbitMQ第一弹的内容。

其中发送消息的代码有三个参数,第一个参数是Exchange的名称,第二个参数是routingKey的名称,这个参数在扇形交换机里面用不到,在其他两个交换机类型里面会用到。

代码的准备到此结束,我们可以运行发送方法之后run一下了~

项目启动后,我们可以先来观察一下队列与交换机的绑定关系有没有生效,我们在RabbitMQ控制台使用rabbitmqctl list_bindings命令查看绑定关系。

扇形交换机绑定关系

关键部分我用红框标记了起来,这就代表着名叫fanoutExchange的交换机绑定着两个队列,一个叫fanout1,另一个叫fanout2

紧接着,我们来看控制台的打印情况:

扇形交换机确认消息

可以看到,一条信息发送出去之后,两个队列都接收到了这条消息,紧接着由我们的两个消费者消费。

Tip: 如果你的演示应用启动之后没有消费信息,可以尝试重新运行一次生产者的方法发送消息。

3. 📗Direct-Exchange

Direct-Exchange是一种精准匹配的交换机,我们之前一直使用默认的交换机,其实默认的交换机就是Direct类型。

如果将Direct交换机都比作一所公寓的管理员,那么队列就是里面的住户。(绑定关系)

管理员每天都会收到各种各样的信件(消息),这些信件的地址不光要标明地址(ExchangeKey)还需要标明要送往哪一户(routingKey),不然消息无法投递。

扇形交换机

以上图为例,准备一条消息发往名为SendService的直接交换机中去,这个交换机主要是用来做发送服务,所以其绑定了两个队列,SMS队列和MAIL队列,用于发送短信和邮件。

我们的消息除了指定ExchangeKey还需要指定routingKeyroutingKey对应着最终要发送的是哪个队列,我们的示例中的routingKey是sms,这里这条消息就会交给SMS队列。


听了上面这段,可能大家对routingKey还不是很理解,我们上段代码实践一下,大家应该就明白了。

准备工作:

    @Bean
    public Queue directQueue1() {
        return new Queue("directQueue1");
    }

    @Bean
    public Queue directQueue2() {
        return new Queue("directQueue2");
    }

    @Bean
    public DirectExchange directExchange() {
        // 三个构造参数:name durable autoDelete
        return new DirectExchange("directExchange", false, false);
    }

    @Bean
    public Binding directBinding1() {
        return BindingBuilder.bind(directQueue1()).to(directExchange()).with("sms");
    }

    @Bean
    public Binding directBinding2() {
        return BindingBuilder.bind(directQueue2()).to(directExchange()).with("mail");
    }

新建两个队列,新建了一个直接交换机,并设置了绑定关系。

这里的示例代码和上面扇形交换机的代码很像,唯一可以说不同的就是绑定的时候多调用了一个withroutingKey设置了上去。

所以是交换机和队列建立绑定关系的时候设置的routingKey,一个消息到达交换机之后,交换机通过消息上带来的routingKey找到自己与队列建立绑定关系时设置的routingKey,然后将消息分发到这个队列去。

生产者:

    public void sendDirect() {
        Client client = new Client();

        log.info("Message content : " + client);

        rabbitTemplate.convertAndSend("directExchange","sms",client);
        System.out.println("消息发送完毕。");
    }

消费者:

@Slf4j
@Component("rabbitDirectConsumer")
public class RabbitDirectConsumer {
    @RabbitListener(queues = "directQueue1")
    public void onMessage1(Message message, Channel channel) throws Exception {
        log.info("Message content : " + message);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        log.info("消息已确认");
    }

    @RabbitListener(queues = "directQueue2")
    public void onMessage2(Message message, Channel channel) throws Exception {
        log.info("Message content : " + message);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        log.info("消息已确认");
    }

}

效果图如下:

扇形交换机

只有一个消费者进行了消息,符合我们的预期。

4. 📙Topic-Exchange

Topic-Exchange是直接交换机的模糊匹配版本,Topic类型的交换器,支持使用"*"和"#"通配符定义模糊bindingKey,然后按照routingKey进行模糊匹配队列进行分发。

  • *:能够模糊匹配一个单词。
  • #:能够模糊匹配零个或多个单词。

因为加入了两个通配定义符,所以Topic交换机的routingKey也有些变化,routingKey可以使用.将单词分开。


这里我们直接来用一个例子说明会更加的清晰:

准备工作:

    // 主题交换机示例
    @Bean
    public Queue topicQueue1() {
        return new Queue("topicQueue1");
    }

    @Bean
    public Queue topicQueue2() {
        return new Queue("topicQueue2");
    }

    @Bean
    public TopicExchange topicExchange() {
        // 三个构造参数:name durable autoDelete
        return new TopicExchange("topicExchange", false, false);
    }

    @Bean
    public Binding topicBinding1() {
        return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("sms.*");
    }

    @Bean
    public Binding topicBinding2() {
        return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("mail.#");
    }

新建两个队列,新建了一个Topic交换机,并设置了绑定关系。

这里的示例代码我们主要看设置routingKey,这里的routingKey用上了通配符,且中间用.隔开,这就代表topicQueue1消费sms开头的消息,topicQueue2消费mail开头的消息,具体不同往下看。

生产者:

    public void sendTopic() {
        Client client = new Client();

        log.info("Message content : " + client);

        rabbitTemplate.convertAndSend("topicExchange","sms.liantong",client);
        System.out.println("消息发送完毕。");
    }

消费者:

@Slf4j
@Component("rabbitTopicConsumer")
public class RabbitTopicConsumer {
    @RabbitListener(queues = "topicQueue1")
    public void onMessage1(Message message, Channel channel) throws Exception {
        log.info("Message content : " + message);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        log.info("消息已确认");
    }

    @RabbitListener(queues = "topicQueue2")
    public void onMessage2(Message message, Channel channel) throws Exception {
        log.info("Message content : " + message);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        log.info("消息已确认");
    }

}

这里我们的生产者发送的消息routingKeysms.liantong,它就会被发到topicQueue1队列中去,这里消息的routingKey也需要用.隔离开,用其他符号无法正确识别。

如果我们的routingKeysms.123.liantong,那么它将无法找到对应的队列,因为topicQueue1的模糊匹配用的通配符是*而不是#,只有#是可以匹配多个单词的。

Topic-ExchangeDirect-Exchange很相似,我就不再赘述了,通配符*#的区别也很简单,大家可以自己试一下。

后记

周一没更文实在惭愧,去医院抽血了,抽了三管~,吃多少才能补回来~

RabbitMQ已经更新了三篇了,这三篇的内容有些偏基础,下一篇将会更新高级部分内容:包括防止消息丢失,防止消息重复消费等等内容,希望大家持续关注。


最近这段时间压力挺大,优狐令我八月底之前升级到三级,所以各位读者的赞对我很重要,希望大家能够高抬贵手,帮我一哈~

好了,以上就是本期的全部内容,感谢你能看到这里,欢迎对本文点赞收藏与评论,👍你们的每个点赞都是我创作的最大动力。

我是耳朵,一个一直想做知识输出的伪文艺程序员,我们下期见。

本文代码:码云地址GitHub地址

查看原文

赞 18 收藏 11 评论 0

认证与成就

  • 获得 2 次点赞
  • 获得 7 枚徽章 获得 0 枚金徽章, 获得 0 枚银徽章, 获得 7 枚铜徽章

擅长技能
编辑

开源项目 & 著作
编辑

(゚∀゚ )
暂时没有

注册于 2018-09-10
个人主页被 273 人浏览