Chris

Chris 查看完整档案

上海编辑  |  填写毕业院校  |  填写所在公司/组织填写个人主网站
编辑

Keep Hungery!

个人动态

Chris 发布了文章 · 2020-12-28

RabbitMQ实践-code

简单模式-生产与消费

producer生产者

import pika
import json
 
credentials = pika.PlainCredentials('guest', 'guest')
conn = pika.BlockingConnection(
    pika.ConnectionParameters(host='your robbitmq server', port=5672, virtual_host='/', credentials=credentials))
 
channel = conn.channel()
result = channel.queue_declare(queue='zy_test')
 
for i in range(100):
    message = json.dumps({'OrderId': f'1000{i}'})
    channel.basic_publish(exchange='', routing_key='zy_test', body=message)
    print(message)
 
conn.close()

consumer消费者

import pika
import json
 
credentials = pika.PlainCredentials('guest', 'guest')
conn = pika.BlockingConnection(
    pika.ConnectionParameters(host='your robbitmq server', port=5672, virtual_host='/', credentials=credentials))
 
channel = conn.channel()
# durable 持久化 消息不丢失
channel.queue_declare(queue='zy_test', durable=True)
# 负载均衡:使用basicQos( prefetchCount = 1)方法,来限制RabbitMQ只发不超过1条的消息给同一个消费者。当消息处理完毕后,有了反馈,才会进行第二次发送。
channel.basic_qos(prefetch_count=1)
 
def callback(ch, method, properties, body):
    ch.basic_ack(delivery_tag=method.delivery_tag)
    print(body.decode())
 
channel.basic_consume(queue='zy_test', on_message_callback=callback)
channel.start_consuming()

发布与订阅

发布与订阅要借助交换机(EXchange)的原理来实现:
image.png

EXchange一共有四种工作模式:fanout,direct,topic,headers(不常用)

1、fanout模式

fanout模式下,传递到exchange的消息将会转发到所有与其绑定的queue上。通俗理解为:发送给与exchange绑定的所有queue

  • 不需要指定routing_key,即使指定了也是无效的
  • 需要提前将exchange和queue绑定,多对多关系:一个exchange可以绑定多个queue,一个queue可以绑定多个exchange
  • 需要先启动订阅者,此模式下的队列是consumer随机生成的,发布者仅仅发消息到exchange,由exchange转发消息至queue。

发布者:

import json
from time import sleep
 
import pika
 
credentials = pika.PlainCredentials('guest', 'guest')
conn = pika.BlockingConnection(
    pika.ConnectionParameters(host='your robbitmq server', port=5672, virtual_host='/', credentials=credentials))
 
channel = conn.channel()
# 声明exchange,由exchange指定消息在哪个队列传递,如不存在,则创建。durable = True 代表exchange持久化存储,False 非持久化存储
channel.exchange_declare(exchange='zy_test', exchange_type='fanout', durable=True)
 
for i in range(100):
    message = json.dumps({'OrderId': f'20000{i}'})
    # 向队列插入数值 routing_key是队列名。delivery_mode = 2 声明消息在队列中持久化,delivery_mod = 1 消息非持久化。routing_key 不需要配置
    channel.basic_publish(
        exchange='zy_test',
        routing_key='',
        body=message,
        properties=pika.BasicProperties(delivery_mode=2)
    )
    print(f'Sent: {message}')
    sleep(1)
conn.close()

订阅者:

import pika
 
credentials = pika.PlainCredentials('guest', 'guest')
conn = pika.BlockingConnection(
    pika.ConnectionParameters(host='your robbitmq server', port=5672, virtual_host='/', credentials=credentials))
 
channel = conn.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
# 创建临时队列,队列名传空字符,consumer关闭后,队列自动删除
result = channel.queue_declare(queue='', exclusive=True)
# 绑定exchange和队列queue,exchange 使我们能够确切地指定消息应该到哪个队列去
channel.queue_bind(exchange='zy_test', queue=result.method.queue)
 
 
# 回调函数,处理消息内容
def callback(ch, method, properties, body):
    ch.basic_ack(delivery_tag=method.delivery_tag)
    print(body.decode())
 
 
# auto_ack设置成 False,在调用callback函数时,未收到确认标识,消息会重回队列。True,无论调用callback成功与否,消息都被消费掉
channel.basic_consume(result.method.queue, callback, auto_ack=False)
channel.start_consuming()

2、direct模式

direct模式原理:消息发送至exchange,exchange根据路由键(routing_key)转达到对应的queue上;通俗的理解为:根据routing_key过滤。

  • 可以使用默认exchange='', 也可以自定义exchange
  • direct模式不需要将exchange和queue,routing_key,queue进行绑定,当然可以绑定;
  • 传递或接收消息时 需要指定 routing_key
  • 需要先启动 订阅者,此模式下的consumer是随机生成的,发布者仅仅发布消息到exchange,由exchange转发消息至queue。如果后启动订阅者,则会丢失启动前的消息数据。

发布者:

import json
from time import sleep
 
import pika
 
credentials = pika.PlainCredentials('guest', 'guest')
conn = pika.BlockingConnection(
    pika.ConnectionParameters(host='your robbitmq server', port=5672, virtual_host='/', credentials=credentials))
 
channel = conn.channel()
channel.exchange_declare(exchange='zy_test_d', durable=True, exchange_type='direct')
for i in range(100):
    message = json.dumps({'OrderId': f'30000{i}'})
    # 向队列插入数值 routing_key是队列名。delivery_mode = 2 声明消息在队列中持久化,delivery_mod = 1 消息非持久化。routing_key 不需要配置
    channel.basic_publish(
        exchange='zy_test_d',
        routing_key='OrderId',
        body=message,
        properties=pika.BasicProperties(delivery_mode=2)
    )
    print(f'Sent: {message}')
    sleep(1)
conn.close()

订阅者:

import pika
 
credentials = pika.PlainCredentials('guest', 'guest')
conn = pika.BlockingConnection(
    pika.ConnectionParameters(host='your robbitmq server', port=5672, virtual_host='/', credentials=credentials))
 
channel = conn.channel()
channel.exchange_declare(exchange='zy_test_d', exchange_type='direct', durable=True)
# 创建临时队列,队列名传空字符,consumer关闭后,队列自动删除
result = channel.queue_declare(queue='', exclusive=True)
# 绑定exchange和队列queue,exchange 使我们能够确切地指定消息应该到哪个队列去
channel.queue_bind(exchange='zy_test_d', queue=result.method.queue, routing_key='OrderId')
 
 
# 回调函数,处理消息内容
def callback(ch, method, properties, body):
    ch.basic_ack(delivery_tag=method.delivery_tag)
    print(body.decode())
 
 
# auto_ack设置成 False,在调用callback函数时,未收到确认标识,消息会重回队列。True,无论调用callback成功与否,消息都被消费掉
channel.basic_consume(result.method.queue, callback, auto_ack=False)
channel.start_consuming()

3、topic模式

topic模式和第二种模式差不多,exchange 也是通过 路由键 routing_key 来转发消息到指定的 queue 。 _不同点是 routing_key 使用正则表达式支持模糊匹配_,但匹配规则又与常规的正则表达式不同,比如“#”是匹配全部,“*”是匹配一个词。
举例:routing_key =“#orderid#”,意思是将消息转发至所有 routing_key 包含 “orderid” 字符的队列中。代码和模式二 类似,就不贴出来了。

基于rabbitMQ的RPC

Callback queue 回调队列

一个客户端向服务器发送请求,服务器端处理请求后,将其处理结果保存在一个存储体中。而客户端为了获得处理结果,那么客户在向服务器发送请求时,同时发送一个回调队列地址 reply_to

Correlation id 关联标识

一个客户端可能会发送多个请求给服务器,当服务器处理完后,客户端无法辨别在回调队列中的响应具体和那个请求时对应的。为了处理这种情况,客户端在发送每个请求时,同时会附带一个独有correlation_id属性,这样客户端在回调队列中根据correlation_id字段的值就可以分辨此响应属于哪个请求。

客户端发送请求

客户端在发送RPC请求到RPC请求队列时,客户端至少发送带有reply_to以及correlation_id两个属性的信息

服务端工作流

等待接受客户端发来RPC请求,当请求出现的时候,服务器从RPC请求队列中取出请求,进行处理后,将响应发送到reply_to指定的回调队列中

客户端接受处理结果

客户端等待回调队列中出现响应,当响应出现时,它会根据响应中correlation_id字段的值,将其返回给对应的应用

server端

import pika
 
 
def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)
 
 
# 处理RPC请求队列中数据,并回调
def on_request(ch, method, properties, body):
    n = int(body)
    print(f'[recv] n={n}')
    response = fib(n)
 
    ch.basic_publish(exchange='',
                     routing_key=properties.reply_to,
                     properties=pika.BasicProperties(correlation_id=properties.correlation_id),
                     body=str(response)
                     )
    ch.basic_ack(delivery_tag=method.delivery_tag)
 
# 鉴权
credentials = pika.PlainCredentials('guest', 'guest')
# 建立链接
conn = pika.BlockingConnection(
    pika.ConnectionParameters(host='your rabbitmq host', port=5672, virtual_host='/', credentials=credentials))
# 建立会话
channel = conn.channel()
# 声明RPC请求队列
channel.queue_declare(queue='rpc_queue')
# 负载均衡
channel.basic_qos(prefetch_count=1)
# 消费客户端的消息,并请求回调方法on_request
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)
channel.start_consuming()

client端

import uuid
import pika
 
 
class fibRpcClient:
    def __init__(self):
        self.__credentials = pika.PlainCredentials('guest', 'guest')
        self.conn = pika.BlockingConnection(pika.ConnectionParameters(
            host='your rabbitmq server', port=5672, virtual_host='/', credentials=self.__credentials
        ))
        # 定义回调消息会话
        self.channel = self.conn.channel()
        result = self.channel.queue_declare(queue='rpc_back', exclusive=True)
        self.callback_queue = result.method.queue
        self.correlation_id = str(uuid.uuid4())
        self.response = None
        # 收到回调消息,开始消费
        self.channel.basic_consume(
            on_message_callback=self.on_response,
            queue=self.callback_queue,
            auto_ack=False
        )
 
    # 收到消息后 数据处理方法
    def on_response(self, ch, method, properties, body):
        if self.correlation_id == properties.correlation_id:
            self.response = body
 
    def call(self, n):
        # 发送消息
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',
            body=str(n),
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.correlation_id
            )
        )
        print(f'[send] n={n}')
        # 没有收到相应,就一直非阻塞式的start_consumer
        while self.response is None:
            self.conn.process_data_events()
        print(f'[recv] result is {int(self.response)}')
 
if __name__ == '__main__':
    fib = fibRpcClient()
    fib.call(17)

持久化

MQ默认建立的是临时 queue 和 exchange,如果不声明持久化,一旦 rabbitmq 挂掉,queue、exchange 将会全部丢失。所以我们一般在创建 queue 或者 exchange 的时候会声明 持久化。

1、queue 声明持久化

声明消息队列,消息将在这个队列传递,如不存在,则创建。
durable = True 代表消息队列持久化存储,False 非持久化存储

result = channel.queue_declare(queue = 'python-test',durable = True)

2、exchange 声明持久化

声明exchange,由exchange指定消息在哪个队列传递,如不存在,则创建。
durable = True 代表exchange持久化存储,False 非持久化存储

channel.exchange_declare(exchange = 'python-test', durable = True)

3、消息持久化

虽然exchange 和 queue 都申明了持久化,但如果消息只存在内存里,rabbitmq 重启后,内存里的东西还是会丢失。所以必须声明消息也是持久化,从内存转存到硬盘。

向队列插入数值 routing_key是队列名。
delivery_mode = 2 声明消息在队列中持久化,delivery_mod = 1 消息非持久化 

channel.basic_publish(exchange = '',routing_key = 'python-test',body = message, properties=pika.BasicProperties(delivery_mode = 2))

4、acknowledgement消息不丢失

消费者(consumer)调用callback函数时,会存在处理消息失败的风险,如果处理失败,则消息丢失。但是也可以选择消费者处理失败时,将消息回退给 rabbitmq ,重新再被消费者消费,这个时候需要设置确认标识。

auto_ack设置成 False,在调用callback函数时,未收到确认标识,消息会重回队列。
True,无论调用callback成功与否,消息都被消费掉 

channel.basic_consume(callback,queue = 'python-test', auto_ack= False)
查看原文

赞 0 收藏 0 评论 0

Chris 发布了文章 · 2020-12-28

RobbitMQ安装

安装以CentOS系统为例

1、在服务器上配置epel源

# CentOS6
wget -O /etc/yum.repos.d/epel.repo http://mirrors.aliyun.com/repo/epel-6.repo
# 验证是否成功
yum repolist 
# 如果出错,就编辑vi epel.repo,取消baseurl的注释,将mirrorlist行注释,就差不多了.  
 
# CentOS7同理
wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo
wget -O /etc/yum.repos.d/epel.repo http://mirrors.aliyun.com/repo/epel-7.repo

2、安装

#安装erlang语言,rabbitMQ使用erlang语言写的
yum -y install erlang
yum -y install rabbitmq-server

3、启动、停止

# 启动
service rabbitmq-server start
# 停止
service rabbitmq-server stop
# 更多命令:start|stop|status|rotate-logs|restart|condrestart|try-restart|reload|force-reload

4、主要端口说明

4369 -- erlang发现口

5672 --client端通信口

15672 -- 管理界面ui端口

25672 -- server间内部通信口

RabbitMQ界面http://host:15672/
默认用户名/密码:guest/guest

连接RabbitMQ要用client端通信口:server: amqp://guest:guest@localhost:5672/

5、web界面打不开解决方案

cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.1.5/sbin
./rabbitmq-plugins list
 
# 如果下图中几项前面中括号为空,则执行如下命令:
./rabbitmq-plugins enable rabbitmq_management
# 然后再执行list命令,看web页面相关plugins是否启动,参照下图
./rabbitmq-plugins list
# 如果已经启动则重启
rabbitmq service rabbitmq-server restart

image.png

6、常用概念

Broker简单来说就是消息队列服务器实体。
Exchange消息交换机,他制定消息按什么规则,路由到哪个队列。
Queue消息队列载体,每个消息都会被投入一个或多个队列。
Binding绑定,他的作用就是把exchange和queue按照路由规则绑定起来。
Routing Key路由关键字,exchange根据这个关键字进行消息投递。
vhost虚拟主机,一个broker里可以设多个vhost,用作不同用户得权限分离。
producer消息生产者,就是投递消息得程序。
consumer消息消费者,就是接受消息得程序。
channel消息通道,在客户端得每个连接里。可以建立多个channel,每个channel代表一个会话任务。

查看原文

赞 0 收藏 0 评论 0

Chris 发布了文章 · 2020-12-28

RobbitMQ使用入门

什么是MQ

消息总线(Message Queue),是一种跨进程异步的通信机制,用于上下游传递消息。由消息系统来确保消息的可靠传递。

MQ是干什么用的?

应用解耦异步流量削锋、数据分发、错峰流控、日志收集等等...

解耦举例:看这么个场景。A 系统发送数据到 BCD 三个系统,通过接口调用发送。如果 E 系统也要这个数据呢?那如果 C 系统现在不需要了呢?A 系统负责人几乎崩溃......,所以通过发布订阅模式,谁需要谁订阅即可。

异步举例:A 系统接收一个请求,需要在自己本地写库,还需要在 BCD 三个系统写库,自己本地写库要 3ms,BCD 三个系统分别写库要 300ms、450ms、200ms。最终请求总延时是 3 + 300 + 450 + 200 = 953ms,接近 1s,用户感觉搞个什么东西,慢死了慢死了。用户通过浏览器发起请求,等待个 1s,这几乎是不可接受的。 如果使用异步,A系统处理完自己的逻辑,直接返回给用户;然后发消息让BCD写库;用户收到请求只要3ms,体验流畅美滋滋。

流量削峰举例:每天 0:00 到 12:00,A 系统风平浪静,每秒并发请求数量就 50 个。结果每次一到 12:00 ~ 13:00 ,每秒并发请求数量突然会暴增到 5k+ 条。但是系统是直接基于 MySQL 的,大量的请求涌入 MySQL,每秒钟对 MySQL 执行约 5k 条 SQL。一般的 MySQL,扛到每秒 2k 个请求就差不多了,如果每秒请求到 5k 的话,可能就直接把 MySQL 给打死了,导致系统崩溃,用户也就没法再使用系统了。 这时可以通过MQ,把消息挤压在队列中,至少可以保证系统正常可用;然后按照系统正常处理速度写入数据,也不会丢失数据。

MQ的潜在缺点

  • 系统可用性降低
    系统引入的外部依赖越多,越容易挂掉。本来你就是 A 系统调用 BCD 三个系统的接口就好了,人 ABCD 四个系统好好的,没啥问题,你偏加个 MQ 进来,万一 MQ 挂了咋整,MQ 一挂,整套系统崩溃的,你不就完了?如何保证消息队列的高可用
  • 系统复杂度提高
    硬生生加个 MQ 进来,你怎么保证消息没有重复消费?怎么处理消息丢失的情况?怎么保证消息传递的顺序性?头大头大,问题一大堆,痛苦不已。
  • 一致性问题
    A 系统处理完了直接返回成功了,人都以为你这个请求就成功了;但是问题是,要是 BCD 三个系统那里,BD 两个系统写库成功了,结果 C 系统写库失败了,咋整?你这数据就不一致了。

主流产品

RabbitMQ,Kafka,ActiveMQ,RocketMQ等等

image.png

综上,各种对比之后,有如下建议:

一般的业务系统要引入 MQ,最早大家都用 ActiveMQ,但是现在确实大家用的不多了,没经过大规模吞吐量场景的验证,社区也不是很活跃;

后来大家开始用 RabbitMQ,但是确实 erlang 语言阻止了大量的 Java 工程师去深入研究和掌控它,对公司而言,几乎处于不可控的状态,但是确实人家是开源的,比较稳定的支持,活跃度也高;

不过现在确实越来越多的公司,会去用 RocketMQ,确实很不错(阿里出品),对自己公司技术实力有绝对自信的,推荐用 RocketMQ。

所以中小型公司,技术实力较为一般,技术挑战不是特别高,用 RabbitMQ 是不错的选择;大型公司,基础架构研发实力较强,用 RocketMQ 是很好的选择。

如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高,何况几乎是全世界这个领域的事实性规范

RabbitMQ的优势:

  • 可靠性(Reliablity):使用了一些机制来保证可靠性,比如持久化、传输确认、发布确认。
  • 灵活的路由(Flexible Routing):在消息进入队列之前,通过Exchange来路由消息。对于典型的路由功能,Rabbit已经提供了一些内置的Exchange来实现。针对更复杂的路由功能,可以将多个Exchange绑定在一起,也通过插件机制实现自己的Exchange。
  • 消息集群(Clustering):多个RabbitMQ服务器可以组成一个集群,形成一个逻辑Broker。
  • 高可用(Highly Avaliable Queues):队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。
  • 多种协议(Multi-protocol):支持多种消息队列协议,如STOMP、MQTT等。
  • 多种语言客户端(Many Clients):几乎支持所有常用语言,比如Java、.NET、Ruby等。
  • 管理界面(Management UI):提供了易用的用户界面,使得用户可以监控和管理消息Broker的许多方面。
  • 跟踪机制(Tracing):如果消息异常,RabbitMQ提供了消息的跟踪机制,使用者可以找出发生了什么。
  • 插件机制(Plugin System):提供了许多插件,来从多方面进行扩展,也可以编辑自己的插件。

RabbitMQ的几种使用模式

1、生产与消费

生产端通过路由规则发送消息到不同queue,消费端根据queue名称消费消息。

RabbitMQ是向消费端推送消息,订阅关系和消费状态保存在服务端。

生产端发送一条消息通过路由投递到Queue,只有一个消费者能消费到。

举例:系统内部的点赞操作,和用户的点赞数据列表。 点赞操作执行完成后,如果还需要处理用户已点赞数据等一些列逻辑,会增加当前点赞接口耗时。所以只需要发消息出来,点赞接口先行返回点赞结果,供点赞数据收到消息后处理已点赞数据逻辑等。点赞服务可能也是集群,只要一个服务消费到这个点赞消息即可。如果存在多个服务均消费该数据,有可能会存在数据重复问题。

image.png

2、发布与订阅

当RabbitMQ需要支持多订阅时,发布者发送的消息通过路由同时写到多个Queue,不同订阅组消费此消息。

举例:棋谱负责处理feed流数据,记录feed的各种状态。小视频,创作中心都关心feed的状态,需要知道feed是否发生变化。那qipu就说 我把消息放到消息池里好了,你们谁需要新数据,通过订阅我,然后就可到池里拿了。后续新增随刻创作,也可以通过订阅的qipu的数据,来获取feed数据。

image.png

两者的主要区别:

消费模式:生产消费者是所有消费者抢占消息,订阅发布是所有订阅者共享消息。

(1)生产者消费者模式:生产者生产消息放到队列里,多个消费者同时监听队列,谁先抢到消息谁就会从队列中取走消息;即对于每个消息只能被最多一个消费者拥有;

(2)发布者订阅者模式:发布者生产消息放到队列里,多个监听队列的消费者都会收到同一份消息;即正常情况下每个消费者收到的消息应该都是一样的。

发布订阅中EXchange有四种工作模式:fanout,direct,topic,headers(不常用)

发布与订阅要借助交换机(EXchange)的原理来实现

Exchange消息交换机,他制定消息按什么规则,路由到哪个队列。
Queue消息队列载体,每个消息都会被投入一个或多个队列。
Binding绑定,他的作用就是把exchange和queue按照路由规则绑定起来。
Routing Key路由关键字,exchange根据这个关键字进行消息投递。

image.png

1、fanout

每个发到fanout类型交换器的消息都会分到所有绑定的队列上去。

fanout交换器不处理该路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout类型转发消息是最快的。

image.png

2、direct

每个发到fanout类型交换器的消息都会分到所有绑定的队列上去。

消息中的路由键(routing key)如果和Binding中的binding key一致,交换器就将消息发到对应的队列中。路由键与队列名完全匹配
image.png

3、topic

topic交换器通过模式匹配(可以理解按照一定规则为模糊匹配)分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。

image.png

查看原文

赞 0 收藏 0 评论 0

Chris 发布了文章 · 2020-11-27

图像相似度对比

【声明】度量两张图片的相似度有许多算法,本文将对常用的图片相似度算法进行汇总。部分数据、资料来源于各技术网站,如有侵权烦请联系删除。
常用的算法有几类:

一、Hash算法

  • Hash算法常用的有三种,分别为平均哈希算法(aHash)、感知哈希算法你(pHash)和差异哈哈希算法(dHash);还有一种是小波哈希算法(whash)。
  • Hash算法都是通过获取图片的hash值,再比较两张图片hash值的汉明距离来度量两张图片是否相似。两张图片越相似,那么两张图片的hash数的汉明距离越小。

1、aHash-平均哈希

算法步骤

平均哈希算法是三种Hash算法中最简单的一种,它通过下面几个步骤来获得图片的Hash值:

(1) 缩放图片;
(2) 转灰度图; 
(3) 算像素均值;
(4) 根据相似均值计算指纹.

image.png
得到图片的ahash值后,比较两张图片ahash值的汉明距离,通常认为汉明距离小于10的一组图片为相似图片。

举栗

我们以一张动物的图片为例,详解ahash算法过程:
原图:
husky.jpg

缩放图片
先将图片缩放为8*8的图片,得到下图:
image.png

图片灰度
得到灰度图:
image.png
得到单通道灰度图的各个像素值
image.png

计算平均值
求得的平均值为152.4375

每个像素点与平均值比较
image.png
那么该图片的ahash值即为:1111100011110000111111001111011011100111111011101111000000001000

ahash代码

import cv2
import numpy as np
import copy


def ahash_process(pic_path):
    '''
    ahash算法过程
    :param pic_path: 图片路径
    :return: 
    '''
    img = cv2.imread(pic_path, cv2.IMREAD_UNCHANGED)
    img_resize = cv2.resize(img, (8, 8), cv2.INTER_AREA)
    gray_img = cv2.cvtColor(img_resize, cv2.COLOR_BGR2GRAY)
    print('单通道灰度图:', gray_img)

    avg = np.mean(gray_img)
    print('平均值:', avg)

    dis = copy.deepcopy(gray_img)
    dis_hash_str = ''
    for x_index, x in enumerate(gray_img):
        for y_index, y in enumerate(x):
            if y >= avg:
                dis[x_index, y_index] = 1
            else:
                dis[x_index, y_index] = 0
            dis_hash_str += str(dis[x_index, y_index])
    print('与平均值比较:', dis)
    print('该图片的ahash值:', dis_hash_str)

    cv2.namedWindow('resize', 0)
    cv2.resizeWindow('resize', 600, 600)
    cv2.imshow('resize', img_resize)

    cv2.namedWindow('gray', 0)
    cv2.resizeWindow('gray', 600, 600)
    cv2.imshow('gray', gray_img)

    cv2.waitKey(0)
    cv2.destroyAllWindows()

两张图片对比自然需要另外一张图片,同理通过上述方法,计算出hash值后,计算汉明距离即可。

2、dHash-差异哈希

算法步骤

dhash和ahash算法的差别在于,ahash使用每个像素点与平均像素做对比得出布尔值,dhash是使用当前像素与后一个像素点做对比得到布尔值,正是这个原因,所以dhash算法需要将图片缩放为9*8个像素点。

(1)图片缩放为9*8,保留结构,出去细节; 
(2)灰度化:转换为256阶灰度图; 
(3)求平均值:计算灰度图所有像素的平均值;
(4)比较:像素值大于后一个像素值记作1,相反记作0。本行不与下一行对比,每行9个像素,八个差值,有8行,总共64位 ;
(5)生成hash:将上述步骤生成的1和0按顺序组合起来既是图片的指纹(hash); 

举栗

原图:
husky.jpg

缩放图片
先将图片缩放为9*8的图片,得到下图:
image.png
图片灰度
得到灰度图:
image.png

得到单通道灰度图的各个像素值
image.png

每个像素点与后面一个像素点比较值
image.png

那么该图片的dhash值即为:0011011010111011010110101111001011010011101110011110101111001101

dHash代码

import cv2


def dhash_process(pic_path):
    img = cv2.imread(pic_path, cv2.IMREAD_UNCHANGED)
    img_resize = cv2.resize(img, (9, 8), cv2.INTER_AREA)
    gray_img = cv2.cvtColor(img_resize, cv2.COLOR_BGR2GRAY)
    print('单通道灰度图:', gray_img)

    dis = [[] for x in range(8)]
    dis_hash_str = ''
    for row in range(8):
        for col in range(8):
            if gray_img[row, col] >= gray_img[row, col + 1]:
                dis[row].append(1)
                dis_hash_str += str(1)
            else:
                dis[row].append(0)
                dis_hash_str += str(0)

    print('比较值:', dis)
    print('该图片的dhash值:', dis_hash_str)

    cv2.namedWindow('resize', 0)
    cv2.resizeWindow('resize', 600, 600)
    cv2.imshow('resize', img_resize)

    cv2.namedWindow('gray', 0)
    cv2.resizeWindow('gray', 600, 600)
    cv2.imshow('gray', gray_img)

    cv2.waitKey(0)
    cv2.destroyAllWindows()

3、pHash-感知哈希

算法步骤

dhash是三种Hash算法中较为复杂的一种,它是基于DCT(离散余弦变换)来得到图片的hash值:

(1)缩小图片:32*32是一个较好的大小,这样方便DCT计算;
(2)灰度化:转换为256阶灰度图; 
(3)计算DCT:DCT把图片分离成分率的集合,DCT(离散余弦变换);
(4)缩小DCT:DCT计算后的矩阵是32 * 32,保留左上角的8 * 8,这些代表的图片的最低频率;
(5)计算平均值:计算缩小DCT后的所有像素点的平均值; 
(6)比较平均值:大于平均值记录为1,反之记录为0,得到phash值。

举栗

原图:
husky.jpg

缩放图片
先将图片缩放为32*32的图片,得到下图:
image.png

图片灰度
得到灰度图:
image.png

计算图片DCT
得到DCT图:
image.png

得到图片低频DCT
得到低频DCT图:
image.png
低频DCT值:
image.png
平均值为:97.10013

低频DCT值与平均值比较
得到phash值为:1110011010100100000010001101001100010000110000001001000000010000

pHash代码

import cv2
import numpy as np


def phash_process(pic_path):
    img = cv2.imread(pic_path, cv2.IMREAD_UNCHANGED)
    img_resize = cv2.resize(img, (32, 32), cv2.INTER_AREA)
    gray_img = cv2.cvtColor(img_resize, cv2.COLOR_BGR2GRAY)
    print('单通道灰度图:', gray_img)

    gray_img_dct = cv2.dct(np.float32(gray_img))
    gray_img_low_dct = gray_img_dct[0:8, 0:8]
    print('低频DCT图值:', gray_img_low_dct)

    avg = np.mean(gray_img_low_dct)
    print('低频DCT图平均值:', avg)

    dis = [[] for x in range(8)]
    dis_hash_str = ''

    for row_index, row in enumerate(gray_img_low_dct):
        for col_index, col in enumerate(row):
            print(row_index, col_index)
            if col >= avg:
                dis[row_index].append(1)
                dis_hash_str += '1'
            else:
                dis[row_index].append(0)
                dis_hash_str += '0'

    print('phash值:', dis_hash_str)

    cv2.namedWindow('resize', 0)
    cv2.resizeWindow('resize', 600, 600)
    cv2.imshow('resize', img_resize)

    cv2.namedWindow('gray', 0)
    cv2.resizeWindow('gray', 600, 600)
    cv2.imshow('gray', gray_img)

    cv2.namedWindow('DCT', 0)
    cv2.resizeWindow('DCT', 600, 600)
    cv2.imshow('DCT', gray_img_dct)

    cv2.namedWindow('low_DCT', 0)
    cv2.resizeWindow('low_DCT', 600, 600)
    cv2.imshow('low_DCT', gray_img_low_dct)

    cv2.waitKey(0)
    cv2.destroyAllWindows()

4、wHash-小波哈希

wavelet hash是频表示的另一种形式。whash相当于将phash中的DCT变换改为DWT。
离散小波变换(DWT)是频表示的另一种形式。流行的DCT和傅立叶变换使用余弦函数作为sin\cos的基础:sin(x),sin(2x),sin(3x)等等。与此相反,DWT使用一个单一的功能作为基础,但在不同的形式:缩放和移动。

小波hash在平时用的不多,在此不展开。

以上是所有hash算法的总结

5、彩蛋

那我们在平时使用过程中,是不需要自己手写hash算法过程的。为什么写在最后,其实是想让大家能够耐心看完前面的内容,对算法底层有一定的了解,求不挨打!
有现有的包(imagehash)供大家使用:

pip install imagehash
from imagehash import phash

def image_similar_compare(img1, img2):
    # 以phash为栗子,其余hash方法也可以直接引用的
    hash1 = phash(Image.open(img1))
    hash2 = phash(Image.open(img2))
    # 计算汉明距离
    return 1 - (hash1 - hash2) / len(hash1.hash) ** 2

二、SSIM算法

SSIM(结构相似性度量),这是一种全参考的图像质量评价指标,分别从亮度、对比度、结构三个方面度量图像相似性。 均值作为亮度的估计,标准差作为对比度的估计,协方差作为结构相似程度的度量。 SSIM取值范围[0,1],值越大,表示图像失真越小,越相似。

计算步骤

在实际应用中,可以利用滑动窗将图像分块,令分块总数为N,考虑到窗口形状对分块的影响,采用高斯加权计算每一窗口的均值、方差以及协方差;然后计算对应块的结构相似度SSIM,最后将平均值作为两图像的结构相似性度量,即平均结构相似性MSSIM。

优点

结构相似度指数从图像组成的角度将结构信息定义为独立于亮度、对比度的反映场景中物体结构的属性,并将失真建模为亮度、对比度和结构三个不同因素的组合。

应用

SSIM已经成为广播和有线电视中广为使用的一种衡量视频质量的方法。在超分辨率,图像去模糊 中都有广泛的应用。

更多的是应用于同一张照片,在传输过程前后的对比。

image.png

uX、uY分别表示图像X和Y的均值,σX、σY分别表示图像X和Y的标准差,σXσX、σYσY(实在打不出上标啊,理解万岁)分别表示图像X和Y的方差。σXY代表图像X和Y协方差。C1,C2和C3为常数,是为了避免分母为0而维持稳定。通常取C1=(K1L)^2, C2=(K2L)^2, C3=C2/2, 一般地K1=0.01, K2=0.03, L=255( 是像素值的动态范围,一般都取为255)

image.png

参考文章:

https://cloud.tencent.com/dev...
https://www.cnblogs.com/Kalaf...
https://blog.csdn.net/u010977...

查看原文

赞 0 收藏 0 评论 0

Chris 发布了文章 · 2020-11-26

python+openCV对视频进行截取

使用cv2对视频进行切割

import cv2


def clip_video(source_video, target_video, start_time, end_time):
    cap = cv2.VideoCapture(source_video)
    if not cap.isOpened():
        logger_warning('video is not opened')
    else:
        success, frame = cap.read()
        f_shape = frame.shape
        f_height = f_shape[0]  # 原视频图片的高度
        f_width = f_shape[1]
        fps = cap.get(5)  # 帧速率
        frame_number = cap.get(7)  # 视频文件的帧数
        duration = frame_number / fps  # 视频总帧数/帧速率 是时间/秒【总共有多少秒的视频时间】
        if start_time > duration or end_time > duration:
            return
        start_time = fps * float(start_time)
        end_time = fps * float(end_time)
        # AVI格式编码输出 XVID
        four_cc = cv2.VideoWriter_fourcc(*'H264')
        video_writer = cv2.VideoWriter(target_video, four_cc, fps, (int(f_width), int(f_height)))
        num = 0
        while True:
            success, frame = cap.read()
            if int(start_time) <= int(num) <= int(end_time):
                if success:
                    video_writer.write(frame)
                else:
                    break
            num += 1
            if num > frame_number:
                break
        cap.release()

VideoWriter_fourcc编码格式:

fourcc意为四字符代码(Four-Character Codes),顾名思义,该编码由四个字符组成,下面是VideoWriter_fourcc对象一些常用的参数,注意:字符顺序不能弄混
cv2.VideoWriter_fourcc('I', '4', '2', '0'),该参数是YUV编码类型,文件名后缀为.avi
cv2.VideoWriter_fourcc('P', 'I', 'M', 'I'),该参数是MPEG-1编码类型,文件名后缀为.avi
cv2.VideoWriter_fourcc('X', 'V', 'I', 'D'),该参数是MPEG-4编码类型,文件名后缀为.avi
cv2.VideoWriter_fourcc('T', 'H', 'E', 'O'),该参数是Ogg Vorbis,文件名后缀为.ogv
cv2.VideoWriter_fourcc('F', 'L', 'V', '1'),该参数是Flash视频,文件名后缀为.flv
查看原文

赞 0 收藏 0 评论 0

Chris 发布了文章 · 2020-11-26

django+redis+celery异步任务执行

一、安装redis

参考redis文件夹下:redis安装

文档:redis安装.note
链接:http://note.youdao.com/notesh...

二、django工程配置

1、安装依赖包

pip install celery 
pip install celery-with-redis 
pip install django-celery

2、配置

settings.py文件

import djcelery 

#注册jdcelery 
INSTALLED_APPS = [ 
... ,
'djcelery', 
] 
# celery 设置 
# celery中间人 redis://redis服务所在的ip地址:端口/数据库号 
BROKER_URL = 'redis://redis_ip:6379/0' 
# celery结果返回,可用于跟踪结果 CELERY_RESULT_BACKEND = 'redis://redis_ip:6379/0' 
# celery内容等消息的格式设置,这里使用pickle,如果不使用,会序列化报错 
CELERY_ACCEPT_CONTENT = ['pickle', ] CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json' 

# celery时区设置,使用settings中TIME_ZONE同样的时区 CELERY_TIMEZONE = TIME_ZONE
在工程目录下,创建celery.py文件
# coding:utf-8 
from __future__ import absolute_import, unicode_literals 
from celery import Celery 
from django.conf import settings 
import os

# 获取当前文件夹名,即为该Django的项目名 
project_name = os.path.split(os.path.abspath('.'))[-1] 
project_settings = '%s.settings' % project_name 

# 设置环境变量 
os.environ.setdefault('DJANGO_SETTINGS_MODULE', project_settings)
# 实例化Celery,网上很多教程这里都是没有设置broker造成启动失败 
app = Celery('tasks', broker='redis://redis_ip:6379/0', backend='redis://redis_ip:6379/0') 

# 使用django的settings文件配置celery 
app.config_from_object('django.conf:settings') 

# Celery加载所有注册的应用 
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
在工程目录的__init__.py文件中添加:
# 引入celery实例对象 
# 一定要添加在文件最前面,否则会报错 
from __future__ import absolute_import, unicode_literals 
from .celery import app as celery_app 
__all__ = [celery_app]
在对应app的目录下创建tasks.py文件

注意:tasks.py文件一定要创建在应用根目录下,且文件名固定,在该文件里添加你对应的后台任务代码,并注解@task

import time 
from celery import task, Celery 
app = Celery('tasks', broker='redis://redis_ip:6379/0', backend='redis://redis_ip:6379/0')
#在这里添加你的后台任务方法代码,并注解task 
@task 
def add(a, b): 
    print("这是任务开始")
    c = a + b 
    print(c) 
    time.sleep(10) 
    print("这是任务结束")

3、启动celery worker

在python manage.py shell命令中输入

celery -A project_name worker --pool=solo -l info

如下图,即为启动成功
clipboard.png

4、测试

在tasks.py文件中加入测试代码,这里复用上面tasks.py代码中的add方法

在views.py文件中创建测试view方法:

from . import tasks 

def add(request,*args,**kwargs):
    tasks.add.delay(1, 2)
    result = {'code': 0, 'msg': '这是一个后台任务'} 
    return JsonResponse(result)

在url中配置url路径

from django.urls import path 
from . import views 

urlpatterns = [ path('add', views.add, name="add") ]

启动django

然后调用对应url,发现响应只用63ms
clipboard.png

在manage.py shell日志中静候后台任务执行,发现任务已经执行,耗时10s多(因为上述代码sleep了10s)
clipboard.png
到这里,celery已成功运行。

三、问题积累

1、启动celery worker时,报链接失败

(1)可能是限制bind没有注释,仅允许本机访问;参考redis配置项

(2)保护模式为关闭,参考redis配置项

(3)需要鉴权

2、运行后台任务时,报错 kombu.exceptions.ContentDisallowed: Refusing to deserialize untrusted content of type pickle (application/x-python-serialize)

https://blog.csdn.net/libing_thinking/article/details/78622943

kombu.exceptions.ContentDisallowed: Refusing to deserialize untrusted
content of type pickle (application/x-python-serialize)

需要将settings.py文件中修改celery配置

CELERY_ACCEPT_CONTENT = ['pickle', ]

3、运行后台任务时,报错AttributeError: 'str' object has no attribute 'items'

python版本为3.6

#貌似是redis版本高了,重新安装后得以解决 pip install redis==2.10.6

4、启动celery时报错:TypeError: can only concatenate tuple (not “NoneType”) to tuple

这个问题是由于安装包不全导致的,分别检查是否安装了以下安装包:

pip install celery
pip install celery-with-redis
pip install django-celery
查看原文

赞 0 收藏 0 评论 0

Chris 发布了文章 · 2020-11-26

使用django-crontab建立定时任务

1、安装django-crontab

pip install django-crontab

2、在django项目settings.py中注册

INSTALLED_APPS = [
    ......
    'django_crontab',
    'app_name'
]

【注意】django_crontab一定要注册在应用名之前,在这里是下划线,不是短横。

3、在settings.py中配置定时任务

# 定时任务
CRONJOBS = [
    ('*/5 * * * *', 'RecomEvalBackend.job.syncMcnData.task'),
    ('*/5 * * * *', 'RecomEvalBackend.job.syncMcnData.task','>> test.log'),
    ('*/5 * * * *', 'RecomEvalBackend.job.syncMcnData.task',['param1','param2'],{'param3': 4},'>> test.log')
]

【注意】
(1)这里的定时任务脚本,一定是要放在应用文件夹下的;
(2)RecomEvalBackend.job.syncMcnData.task这部分的写法应为:应用名.文件夹.文件名.方法名

4、如何在django中添加任务

#django中添加定时任务
python manage.py crontab add
#django中移出定时任务
python manage.py crontab remove
#django中展示已添加的定时任务
python manage.py crontab show
#django中单次手动执行定时任务
python manage.py crontab run <tash_hash_id>

5、原理

原理是django把定时任务添加到了linux的定时任务crond服务中。所以这里要求crond服务必须是开启的
查看服务器中定时任务命令:

crontab -e

查看crond服务状态:

#查看状态
service crond status
#开启服务
service crond start
#关闭服务
service crond stop
#重启服务
service crond restart

6、其他注意事项

(1)django-crontab不支持windows和mac系统;在windows上执行会报错,如下:

  ...
  File "C:\Users\youngzhang\AppData\Local\Programs\Python\Python36\lib\site-packages\django_crontab\management\commands\crontab.py", line 4, in <module>
    from django_crontab.crontab import Crontab
  File "C:\Users\youngzhang\AppData\Local\Programs\Python\Python36\lib\site-packages\django_crontab\crontab.py", line 3, in <module>
    import fcntl
ModuleNotFoundError: No module named 'fcntl'
查看原文

赞 0 收藏 0 评论 0

Chris 发布了文章 · 2020-11-26

在django中使用redis

一、安装&配置

1、安装

pip install django-redis

注:版本选择

Django 版本支持:
django-redis 3.8.x 支持 django 1.4, 1.5, 1.6, 1.7 (或许会有 1.8)
django-redis 4.4.x 支持 django 1.6, 1.7, 1.8, 1.9 和 1.10
Redis Server 支持:
django-redis 3.x.y 支持 redis-server 2.6.x 或更高
django-redis 4.x.y 支持 redis-server 2.8.x 或更高

2、配置

在django项目中的setting.py文件中配置:

CACHES = {
    "default": {
        "BACKEND": "django_redis.cache.RedisCache",
        "LOCATION": "redis://127.0.0.1:6379/1",
        "OPTIONS": {
            "CLIENT_CLASS": "django_redis.client.DefaultClient",
        }
    }
}

或者

CACHES = {
    "default": {
        "BACKEND": "django_redis.cache.RedisCache",
        "LOCATION": "redis://127.0.0.1:6379/1",
        "OPTIONS": {
            "CLIENT_CLASS": "django_redis.client.DefaultClient",
            "PASSWORD": "mysecret"
        }
    }
}

作为session backend的使用配置:

Django 默认可以使用任何 cache backend 作为 session backend, 将 django-redis 作为 session 储存后端不用安装任何额外的 backend

SESSION_ENGINE = "django.contrib.sessions.backends.cache"
SESSION_CACHE_ALIAS = "default"

Pickle版本

django-redis 使用 pickle 序列化几乎所有数据,默认使用-1=最新版本
CACHES = {

"default": {
    # ...
    "OPTIONS": {
        "PICKLE_VERSION": -1  # Use the latest protocol version
    }
}

}

设置超时时间

SOCKET_CONNECT_TIMEOUT : socket 建立连接超时设置
SOCKET_TIMEOUT : 连接建立后的读写操作超时设置

CACHES = {
    "default": {
        # ...
        "OPTIONS": {
            "SOCKET_CONNECT_TIMEOUT": 5,  # in seconds
            "SOCKET_TIMEOUT": 5,  # in seconds
        }
    }
}

支持压缩

django-redis 支持压缩, 但默认是关闭的. 你可以激活它:

CACHES = {
    "default": {
        # ...
        "OPTIONS": {
            "COMPRESSOR": "django_redis.compressors.zlib.ZlibCompressor",
        }
    }
}

使用 lzma 压缩的例子

import lzma

CACHES = {
    "default": {
        # ...
        "OPTIONS": {
            "COMPRESSOR": "django_redis.compressors.lzma.LzmaCompressor",
        }
    }
}

二、django_redis使用

1、设置key的ttl

from django.core.cache import cache

//timeout为过期时间,单位:秒,timeout=0为立即过期, timeout为None永不超时
cache.set(key, val, timeout) 

//ttl搜索过期时间,返回值:0--标示key不存在或过期,
//None--key存在,但是没有设置过期时间
cache.ttl(key)

//expire指定一个key的过期时间
cache.expire(key, timeout)

//persist设置key永不过期
chache.persist(key)

2、redis锁

django-redis 支持 redis 分布式锁. 锁的线程接口是相同的, 因此你可以使用它作为替代.
使用 python 上下文管理器分配锁的例子:

from django.core.cache import cache

with cache.lock(key):
    do_something()

3、扫描key

from django.core.cache import cache

cache.keys("demo_*")
// redis 的 server side cursors 2.8 版及以上, 
// 使用 iter_keys 取代 keys 方法
cache.iter_keys("demo_*")
from django_redis import get_redis_connection

scan_res = conn.scan_iter('key_*')
查看原文

赞 0 收藏 0 评论 0

Chris 发布了文章 · 2020-11-26

nginx分发请求到docker配置

1、Nginx配置:

nginx.conf

在配置文件中增加配置:
Inkedclipboard_LI.jpg

2、Docker启动时设置端口映射

将宿主机上的8000端口映射到docker上的8001端口

docker run -d -p 8000:8001 -it docker-registry.company.virtual/username/test:0.1 /bin/bash

3、查看docker服务映射的端口

netstat -ltunp | grep docker

Inkedclipboard_LI.jpg

查看宿主机端口与docker端口映射情况(图片可以看到,宿主机的8001端口映射到了docker8000端口)

docker ps

Inkedclipboard_LI.jpg

查看原文

赞 0 收藏 0 评论 0

Chris 发布了文章 · 2020-11-26

使用nginx缓存服务器上的静态文件

一、nginx缓存的优点

nginx缓存图示

如图所示,nginx缓存,可以在一定程度上,减少源服务器的处理请求压力。 因为静态文件(比如css,js, 图片)中,很多都是不经常更新的。 nginx使用proxy_cache将用户的请求缓存到本地一个目录。下一个相同请求可以直接调取缓存文件,就不用去请求服务器了。 毕竟,IO密集型服务的处理是nginx的强项。

二、如何进行设置

先上个栗子:

http{
    proxy_connect_timeout 10;
    proxy_read_timeout 180;
    proxy_send_timeout 5;
    proxy_buffer_size 16k;
    proxy_buffers 4 32k;
    proxy_busy_buffers_size 96k;
    proxy_temp_file_write_size 96k;
    proxy_temp_path /tmp/temp_dir;
    proxy_cache_path /tmp/cache levels=1:2 keys_zone=cache_one:100m inactive=1d max_size=10g;


    server {
        listen       80 default_server;
        server_name  localhost;
        root /mnt/blog/;

        location / {

        }

        #要缓存文件的后缀,可以在以下设置。
        location ~ .*.(gif|jpg|png|css|js)(.*) {
                proxy_pass http://ip地址:90;
                proxy_redirect off;
                proxy_set_header Host $host;
                proxy_cache cache_one;
                proxy_cache_valid 200 302 24h;
                proxy_cache_valid 301 30d;
                proxy_cache_valid any 5m;
                expires 90d;
                add_header wall  "hey!guys!give me a star.";
        }
    }

    # 无nginx缓存的blog端口
    server {
        listen  90;
        server_name localhost;
        root /mnt/blog/;

        location / {

        }
    }
}
复制代码

因为我是在一台服务器上做试验(敲重点,做试验),所以用了两个端口8090进行模拟两台服务器之间的交互。

80端口对接的是普通的域名(http://wangxiaokai.vip)访问。
90端口负责处理80端口代理过来的资源访问。
相当于90端口是源服务器,80端口是nginx反向缓存代理服务器。

接下来讲一下配置项:

2.1 http层设置

 proxy_connect_timeout 10;
    proxy_read_timeout 180;
    proxy_send_timeout 5;
    proxy_buffer_size 16k;
    proxy_buffers 4 32k;
    proxy_busy_buffers_size 96k;
    proxy_temp_file_write_size 96k;
    proxy_temp_path /tmp/temp_dir;
    proxy_cache_path /tmp/cache levels=1:2 keys_zone=cache_one:100m inactive=1d max_size=10g;
复制代码
  • proxy_connect_timeout 服务器连接的超时时间
  • proxy_read_timeout 连接成功后,等候后端服务器响应时间
  • proxy_send_timeout 后端服务器数据回传时间
  • proxy_buffer_size 缓冲区的大小
  • proxy_buffers 每个连接设置缓冲区的数量为number,每块缓冲区的大小为size
  • proxy_busy_buffers_size 开启缓冲响应的功能以后,在没有读到全部响应的情况下,写缓冲到达一定大小时,nginx一定会向客户端发送响应,直到缓冲小于此值。
  • proxy_temp_file_write_size 设置nginx每次写数据到临时文件的size(大小)限制
  • proxy_temp_path 从后端服务器接收的临时文件的存放路径
  • proxy_cache_path 设置缓存的路径和其他参数。被缓存的数据如果在inactive参数(当前为1天)指定的时间内未被访问,就会被从缓存中移除

2.2 server层设置

2.2.1 反向缓存代理服务器

 server {
        listen       80 default_server;
        server_name  localhost;
        root /mnt/blog/;

        location / {

        }

        #要缓存文件的后缀,可以在以下设置。
        location ~ .*.(gif|jpg|png|css|js)(.*) {
                proxy_pass http://ip地址:90;
                proxy_redirect off;
                proxy_set_header Host $host;
                proxy_cache cache_one;
                proxy_cache_valid 200 302 24h;
                proxy_cache_valid 301 30d;
                proxy_cache_valid any 5m;
                expires 90d;
                add_header wall  "hey!guys!give me a star.";
        }
    }
复制代码
  • proxy_pass nginx缓存里拿不到资源,向该地址转发请求,拿到新的资源,并进行缓存
  • proxy_redirect 设置后端服务器“Location”响应头和“Refresh”响应头的替换文本
  • proxy_set_header 允许重新定义或者添加发往后端服务器的请求头
  • proxy_cache 指定用于页面缓存的共享内存,对应http层设置的keys_zone
  • proxy_cache_valid 为不同的响应状态码设置不同的缓存时间
  • expires 缓存时间
    • *

这里我设置了图片cssjs静态资源进行缓存。 当用户输入http://wangxiaokai.vip域名时,解析得到ip:port的访问地址。port默认为80。所以页面请求会被当前server截取到,进行请求处理。 当解析到上述文件名结尾的静态资源,会到缓存区获取静态资源。 如果获取到对应资源,则直接返回数据。 如果获取不到,则将请求转发给proxy_pass指向的地址进行处理。

2.2.2 源服务器

 server {
        listen  90;
        server_name localhost;
        root /mnt/blog/;

        location / {

        }
    }
复制代码

这里直接处理90端口接受到的请求,到服务器本地目录/mnt/blog下抓取资源进行响应。

三、如何验证缓存是否有效

细心的读者应该发现,我在第二段的栗子里,留了个彩蛋 add_header wall "hey!guys!give me a star."add_header是用于在报头设置自定义的信息。 所以,如果缓存有效的话,那么静态资源返回的报头,一定会带上这个信息。

访问http://wangxiaokai.vip结果如下:

nginx缓存结果

四、参考

[1] nginx文档
[2] nginx反向缓存代理详解
[3] Nginx缓存服务器静态文件

查看原文

赞 0 收藏 0 评论 0

认证与成就

  • 获得 28 次点赞
  • 获得 14 枚徽章 获得 0 枚金徽章, 获得 2 枚银徽章, 获得 12 枚铜徽章

擅长技能
编辑

开源项目 & 著作
编辑

(゚∀゚ )
暂时没有

注册于 2017-02-18
个人主页被 1.5k 人浏览