tfzh

tfzh 查看完整档案

深圳编辑深圳大学  |  土木工程 编辑  |  填写所在公司/组织 segmentfault.com/u/tfzh 编辑
编辑

今日长缨在手 何时缚住苍龙

个人动态

tfzh 赞了文章 · 1月15日

Redis 6.0 集群搭建实践

图片

本文是Redis集群学习的实践总结(基于Redis 6.0+),详细介绍逐步搭建Redis集群环境的过程,并完成集群伸缩的实践。

Redis集群简介

Redis集群(Redis Cluster) 是Redis提供的分布式数据库方案,通过 分片(sharding) 来进行数据共享,并提供复制和故障转移功能。相比于主从复制、哨兵模式,Redis集群实现了较为完善的高可用方案,解决了存储能力受到单机限制,写操作无法负载均衡的问题。

本文是Redis集群学习的实践总结,详细介绍逐步搭建Redis集群环境的过程,并完成集群伸缩的实践。

Redis集群环境搭建

方便起见,这里集群环境的所有节点全部位于同一个服务器上,共6个节点以端口号区分,3个主节点+3个从节点。集群的简单架构如图:图片本文基于最新的Redis 6.0+,直接从github下载最新的源码编译获得常用工具 redis-server ,  redis-cli 。值得注意的是,从Redis 5.0以后的版本,集群管理软件 redis-trib.rb 被集成到 redis-cli 客户端工具中(详细可参考cluster-tutorial)。

本节介绍集群环境搭建时,并未借助 redis-trib.rb 快速管理,而是按照标准步骤一步步搭建,这也是为了熟悉集群管理的基本步骤。在集群伸缩实践一节将借助 redis-trib.rb 完成集群重新分片工作。

集群的搭建可以分为四步:

  • 启动节点:将节点以集群方式启动,此时节点是独立的。
  • 节点握手:将独立的节点连成网络。
  • 槽指派:将16384个槽位分配给主节点,以达到分片保存数据库键值对的效果。
  • 主从复制:为从节点指定主节点。

启动节点

每个节点初始状态仍为 Master服务器,唯一不同的是:使用 Cluster 模式启动。需要对配置文件进行修改,以端口号为6379的节点为例,主要修改如下几项:

# redis_6379_cluster.conf
port 6379
cluster-enabled yes
cluster-config-file "node-6379.conf"
logfile "redis-server-6379.log"
dbfilename "dump-6379.rdb"
daemonize yes

其中 cluster-config-file 参数指定了集群配置文件的位置,每个节点在运行过程中,会维护一份集群配置文件;每当集群信息发生变化时(如增减节点),集群内所有节点会将最新信息更新到该配置文件;当节点重启后,会重新读取该配置文件,获取集群信息,可以方便的重新加入到集群中。也就是说,当Redis节点以集群模式启动时,会首先寻找是否有集群配置文件,如果有则使用文件中的配置启动,如果没有,则初始化配置并将配置保存到文件中。集群配置文件由Redis节点维护,不需要人工修改。

为6个节点修改好相应的配置文件后,即可利用 redis-server redis_xxxx_cluster.conf 工具启动6个服务器(xxxx表示端口号,对应相应的配置文件)。利用ps命令查看进程:

$ ps -aux | grep redis
... 800  0.1  0.0  49584  2444 ?        Ssl  20:42   0:00 redis-server 127.0.0.1:6379 [cluster]
... 805  0.1  0.0  49584  2440 ?        Ssl  20:42   0:00 redis-server 127.0.0.1:6380 [cluster]
... 812  0.3  0.0  49584  2436 ?        Ssl  20:42   0:00 redis-server 127.0.0.1:6381 [cluster]
... 817  0.1  0.0  49584  2432 ?        Ssl  20:43   0:00 redis-server 127.0.0.1:6479 [cluster]
... 822  0.0  0.0  49584  2380 ?        Ssl  20:43   0:00 redis-server 127.0.0.1:6480 [cluster]
... 827  0.5  0.0  49584  2380 ?        Ssl  20:43   0:00 redis-server 127.0.0.1:6481 [cluster]

节点握手

将上面的每个节点启动后,节点间是相互独立的,他们都处于一个只包含自己的集群当中,以端口号6379的服务器为例,利用 CLUSTER NODES 查看当前集群包含的节点。

127.0.0.1:6379> CLUSTER NODES
37784b3605ad216fa93e976979c43def42bf763d :6379@16379 myself,master - 0 0 0 connected 449 4576 5798 7568 8455 12706

我们需要将各个独立的节点连接起来,构成一个包含多个节点的集群,使用 CLUSTER MEET 命令。

$ redis-cli -p 6379 -c          # -c 选项指定以Cluster模式运行redis-cli 127.0.0.1:6379> CLUSTER MEET 127.0.0.1 6380 OK 127.0.0.1:6379> CLUSTER MEET 127.0.0.1 6381 OK 127.0.0.1:6379> CLUSTER MEET 127.0.0.1 6480 OK 127.0.0.1:6379> CLUSTER MEET 127.0.0.1 6381 OK 127.0.0.1:6379> CLUSTER MEET 127.0.0.1 6382 OK

再次查看此时集群中包含的节点情况:

127.0.0.1:6379> CLUSTER NODES
c47598b25205cc88abe2e5094d5bfd9ea202335f 127.0.0.1:6380@16380 master - 0 1603632309283 4 connected
87b7dfacde34b3cf57d5f46ab44fd6fffb2e4f52 127.0.0.1:6379@16379 myself,master - 0 1603632308000 1 connected
51081a64ddb3ccf5432c435a8cf20d45ab795dd8 127.0.0.1:6381@16381 master - 0 1603632310292 2 connected
9d587b75bdaed26ca582036ed706df8b2282b0aa 127.0.0.1:6481@16481 master - 0 1603632309000 5 connected
4c23b25bd4bcef7f4b77d8287e330ae72e738883 127.0.0.1:6479@16479 master - 0 1603632308000 3 connected
32ed645a9c9d13ca68dba5a147937fb1d05922ee 127.0.0.1:6480@16480 master - 0 1603632311302 0 connected

可以发现此时6个节点均作为主节点加入到集群中, CLUSTER NODES 返回的结果各项含义如下:

<id> <ip:port@cport> <flags> <master> <ping-sent> <pong-recv> <config-epoch> <link-state> <slot> <slot> ... <slot>
  • 节点id: 由40个16进制字符串组成,节点id只在集群初始化时创建一次,然后保存到集群配置文件(即前文提到的cluster-config-file)中,以后节点重新启动时会直接在集群配置文件中读取。
  • port@cport: 前者为普通端口,用于为客户端提供服务;后者为集群端口,分配方法为:普通端口+10000,只用于节点间的通讯。

其余各项的详细解释可以参考官方文档cluster nodes。

槽指派

Redis集群通过分片(sharding)的方式保存数据库的键值对,整个数据库被分为16384个槽(slot),数据库每个键都属于这16384个槽的一个,集群中的每个节点都可以处理0个或者最多16384个slot。

槽是数据管理和迁移的基本单位。当数据库中的16384个槽都分配了节点时,集群处于上线状态(ok);如果有任意一个槽没有分配节点,则集群处于下线状态(fail)。

注意,只有主节点有处理槽的能力,如果将槽指派步骤放在主从复制之后,并且将槽位分配给从节点,那么集群将无法正常工作(处于下线状态)。

利用 CLUSTER ADDSLOTS

redis-cli  -p 6379 cluster addslots {0..5000}
redis-cli  -p 6380 cluster addslots {5001..10000}
redis-cli  -p 6381 cluster addslots {10001..16383}

槽指派后集群中节点情况如下:

127.0.0.1:6379> CLUSTER NODES
c47598b25205cc88abe2e5094d5bfd9ea202335f 127.0.0.1:6380@16380 master - 0 1603632880310 4 connected 5001-10000 87b7dfacde34b3cf57d5f46ab44fd6fffb2e4f52 127.0.0.1:6379@16379 myself,master - 0 1603632879000 1 connected 0-5000 51081a64ddb3ccf5432c435a8cf20d45ab795dd8 127.0.0.1:6381@16381 master - 0 1603632879000 2 connected 10001-16383 9d587b75bdaed26ca582036ed706df8b2282b0aa 127.0.0.1:6481@16481 master - 0 1603632878000 5 connected
4c23b25bd4bcef7f4b77d8287e330ae72e738883 127.0.0.1:6479@16479 master - 0 1603632880000 3 connected
32ed645a9c9d13ca68dba5a147937fb1d05922ee 127.0.0.1:6480@16480 master - 0 1603632881317 0 connected
127.0.0.1:6379> CLUSTER INFO
cluster_state:ok                        # 集群处于上线状态
cluster_slots_assigned:16384 cluster_slots_ok:16384 cluster_slots_pfail:0 cluster_slots_fail:0 cluster_known_nodes:6 cluster_size:3 cluster_current_epoch:5 cluster_my_epoch:1 cluster_stats_messages_ping_sent:4763 cluster_stats_messages_pong_sent:4939 cluster_stats_messages_meet_sent:5 cluster_stats_messages_sent:9707 cluster_stats_messages_ping_received:4939 cluster_stats_messages_pong_received:4768 cluster_stats_messages_received:9707

主从复制

上述步骤后,集群节点均作为主节点存在,仍不能实现Redis的高可用,配置主从复制之后,才算真正实现了集群的高可用功能。

CLUSTER REPLICATE <node_id> 用来让集群中接收命令的节点成为 node_id 所指定节点的从节点,并开始对主节点进行复制。

redis-cli  -p 6479 cluster replicate 87b7dfacde34b3cf57d5f46ab44fd6fffb2e4f52
redis-cli  -p 6480 cluster replicate c47598b25205cc88abe2e5094d5bfd9ea202335f
redis-cli  -p 6481 cluster replicate 51081a64ddb3ccf5432c435a8cf20d45ab795dd8
127.0.0.1:6379> CLUSTER NODES
c47598b25205cc88abe2e5094d5bfd9ea202335f 127.0.0.1:6380@16380 master - 0 1603633105211 4 connected 5001-10000 87b7dfacde34b3cf57d5f46ab44fd6fffb2e4f52 127.0.0.1:6379@16379 myself,master - 0 1603633105000 1 connected 0-5000 51081a64ddb3ccf5432c435a8cf20d45ab795dd8 127.0.0.1:6381@16381 master - 0 1603633105000 2 connected 10001-16383 9d587b75bdaed26ca582036ed706df8b2282b0aa 127.0.0.1:6481@16481 slave 51081a64ddb3ccf5432c435a8cf20d45ab795dd8 0 1603633107229 5 connected
4c23b25bd4bcef7f4b77d8287e330ae72e738883 127.0.0.1:6479@16479 slave 87b7dfacde34b3cf57d5f46ab44fd6fffb2e4f52 0 1603633106221 3 connected
32ed645a9c9d13ca68dba5a147937fb1d05922ee 127.0.0.1:6480@16480 slave c47598b25205cc88abe2e5094d5bfd9ea202335f 0 1603633104000 4 connected

顺带补充,上述步骤1.2,1.3,1.4可以利用 redis-trib.rb 工具整体实现,在Redis 5.0之后直接利用 redis-cli 完成,参考命令如下:

redis-cli --cluster create  127.0.0.1:6379 127.0.0.1:6479  127.0.0.1:6380 127.0.0.1:6480  127.0.0.1:6381 127.0.0.1:6481  --cluster-replicas 1
--cluster-replicas 1 指示给定的创建节点列表是以主节点+从节点对组成的。

在集群中执行命令

集群此时处于上线状态,可以通过客户端向集群中的节点发送命令。接收命令的节点会计算出命令要处理的键属于哪个槽,并检查这个槽是否指派给自己。

  • 如果键所在的slot刚好指派给了当前节点,会直接执行这个命令。
  • 否则,节点向客户端返回 MOVED 错误,指引客户端转向 redirect 至正确的节点,并再次发送此前的命令。

此处,我们利用 CLUSTER KEYSLOT 查看到键 name 所在槽号为5798(被分配在6380节点),当对此键操作时,会被重定向到相应的节点。对键 fruits 的操作与此类似。

127.0.0.1:6379> CLUSTER KEYSLOT name
(integer) 5798
127.0.0.1:6379> set name huey -> Redirected to slot [5798] located at 127.0.0.1:6380 OK 127.0.0.1:6380>
127.0.0.1:6379> get fruits -> Redirected to slot [14943] located at 127.0.0.1:6381
"apple"
127.0.0.1:6381>

值得注意的是,当我们将命令通过客户端发送给一个从节点时,命令会被重定向至对应的主节点。

127.0.0.1:6480> KEYS *
1) "name"
127.0.0.1:6480> get name -> Redirected to slot [5798] located at 127.0.0.1:6380
"huey"

集群故障转移

集群中主节点下线时,复制此主节点的所有的从节点将会选出一个节点作为新的主节点,并完成故障转移。和主从复制的配置相似,当原先的从节点再次上线,它会被作为新主节点的的从节点存在于集群中。

下面模拟6379节点宕机的情况(将其SHUTDOWN),可以观察到其从节点6479将作为新的主节点继续工作。

462:S 26 Oct 14:08:12.750 * FAIL message received from c47598b25205cc88abe2e5094d5bfd9ea202335f about 87b7dfacde34b3cf57d5f46ab44fd6fffb2e4f52 462:S 26 Oct 14:08:12.751 # Cluster state changed: fail 462:S 26 Oct 14:08:12.829 # Start of election delayed for 595 milliseconds (rank #0, offset 9160). 462:S 26 Oct 14:08:13.434 # Starting a failover election for epoch 6. 462:S 26 Oct 14:08:13.446 # Failover election won: I'm the new master.
462:S 26 Oct 14:08:13.447 # configEpoch set to 6 after successful failover 462:M 26 Oct 14:08:13.447 # Setting secondary replication ID to d357886e00341b57bf17e46b6d9f8cf53b7fad21, valid up to offset: 9161. New replication ID is adbf41b16075ea22b17f145186c53c4499864d5b 462:M 26 Oct 14:08:13.447 * Discarding previously cached master state. 462:M 26 Oct 14:08:13.448 # Cluster state changed: ok

6379节点从宕机状态恢复后,将作为6380节点的从节点存在。

127.0.0.1:6379> CLUSTER NODES
51081a64ddb3ccf5432c435a8cf20d45ab795dd8 127.0.0.1:6381@16381 master - 0 1603692968000 2 connected 10001-16383 c47598b25205cc88abe2e5094d5bfd9ea202335f 127.0.0.1:6380@16380 master - 0 1603692968504 0 connected 5001-10000 4c23b25bd4bcef7f4b77d8287e330ae72e738883 127.0.0.1:6479@16479 master - 0 1603692967495 6 connected 0-5000 87b7dfacde34b3cf57d5f46ab44fd6fffb2e4f52 127.0.0.1:6379@16379 myself,slave 4c23b25bd4bcef7f4b77d8287e330ae72e738883 0 1603692964000 1 connected
9d587b75bdaed26ca582036ed706df8b2282b0aa 127.0.0.1:6481@16481 slave 51081a64ddb3ccf5432c435a8cf20d45ab795dd8 0 1603692967000 4 connected
32ed645a9c9d13ca68dba5a147937fb1d05922ee 127.0.0.1:6480@16480 slave c47598b25205cc88abe2e5094d5bfd9ea202335f 0 1603692967000 5 connected

前文提到 cluster-config-file 会记录下集群节点的状态,打开节点6379的配置文件 nodes-6379.conf ,可以看到 CLUSTER NODES 所示信息均被保存在配置文件中:

51081a64ddb3ccf5432c435a8cf20d45ab795dd8 127.0.0.1:6381@16381 master - 0 1603694920206 2 connected 10001-16383 c47598b25205cc88abe2e5094d5bfd9ea202335f 127.0.0.1:6380@16380 master - 0 1603694916000 0 connected 5001-10000 4c23b25bd4bcef7f4b77d8287e330ae72e738883 127.0.0.1:6479@16479 master - 0 1603694920000 6 connected 0-5000 87b7dfacde34b3cf57d5f46ab44fd6fffb2e4f52 127.0.0.1:6379@16379 myself,slave 4c23b25bd4bcef7f4b77d8287e330ae72e738883 0 1603694918000 1 connected
9d587b75bdaed26ca582036ed706df8b2282b0aa 127.0.0.1:6481@16481 slave 51081a64ddb3ccf5432c435a8cf20d45ab795dd8 0 1603694919000 4 connected
32ed645a9c9d13ca68dba5a147937fb1d05922ee 127.0.0.1:6480@16480 slave c47598b25205cc88abe2e5094d5bfd9ea202335f 0 1603694919200 5 connected
vars currentEpoch 6 lastVoteEpoch 0

集群伸缩实践

集群伸缩的关键在于对集群的进行重新分片,实现槽位在节点间的迁移。本节将以在集群中添加节点和删除节点为例,对槽迁移进行实践。

借助于 redis-cli 中集成的 redis-trib.rb 工具进行槽位的管理,工具的帮助菜单如下:

$ redis-cli --cluster help
Cluster Manager Commands:
  create         host1:port1 ... hostN:portN --cluster-replicas <arg> check          host:port --cluster-search-multiple-owners
  info           host:port
  fix            host:port --cluster-search-multiple-owners --cluster-fix-with-unreachable-masters
  reshard        host:port --cluster-from <arg>
                 --cluster-to <arg>
                 --cluster-slots <arg>
                 --cluster-yes --cluster-timeout <arg>
                 --cluster-pipeline <arg>
                 --cluster-replace
  rebalance      host:port --cluster-weight <node1=w1...nodeN=wN>
                 --cluster-use-empty-masters --cluster-timeout <arg>
                 --cluster-simulate --cluster-pipeline <arg>
                 --cluster-threshold <arg>
                 --cluster-replace
  add-node       new_host:new_port existing_host:existing_port --cluster-slave --cluster-master-id <arg> del-node       host:port node_id
  call           host:port command arg arg .. arg set-timeout    host:port milliseconds
  import         host:port --cluster-from <arg>
                 --cluster-copy --cluster-replace
  backup         host:port backup_directory
  help
  
For check, fix, reshard, del-node, set-timeout you can specify the host and port of any working node in the cluster.

集群伸缩-添加节点

考虑在集群中添加两个节点,端口号为6382和6482,其中节点6482对6382进行复制。

  • (1) 启动节点:按照1.1中介绍的步骤,启动6382和6482节点。
  • (2) 节点握手:借助 redis-cli --cluster add-node 命令分别添加节点6382和6482。
redis-cli --cluster add-node 127.0.0.1:6382 127.0.0.1:6379 redis-cli --cluster add-node 127.0.0.1:6482 127.0.0.1:6379
$ redis-cli --cluster add-node 127.0.0.1:6382 127.0.0.1:6379
>>> Adding node 127.0.0.1:6382 to cluster 127.0.0.1:6379
>>> Performing Cluster Check (using node 127.0.0.1:6379)
S: 87b7dfacde34b3cf57d5f46ab44fd6fffb2e4f52 127.0.0.1:6379 slots: (0 slots) slave
    replicates 4c23b25bd4bcef7f4b77d8287e330ae72e738883
M: 51081a64ddb3ccf5432c435a8cf20d45ab795dd8 127.0.0.1:6381 slots:[10001-16383] (6383 slots) master 1 additional replica(s)
M: c47598b25205cc88abe2e5094d5bfd9ea202335f 127.0.0.1:6380 slots:[5001-10000] (5000 slots) master 1 additional replica(s)
M: 4c23b25bd4bcef7f4b77d8287e330ae72e738883 127.0.0.1:6479 slots:[0-5000] (5001 slots) master 1 additional replica(s)
S: 9d587b75bdaed26ca582036ed706df8b2282b0aa 127.0.0.1:6481 slots: (0 slots) slave
    replicates 51081a64ddb3ccf5432c435a8cf20d45ab795dd8
S: 32ed645a9c9d13ca68dba5a147937fb1d05922ee 127.0.0.1:6480 slots: (0 slots) slave
    replicates c47598b25205cc88abe2e5094d5bfd9ea202335f
[OK] All nodes agree about slots configuration. >>> Check for open slots... >>> Check slots coverage...
[OK] All 16384 slots covered. >>> Send CLUSTER MEET to node 127.0.0.1:6382 to make it join the cluster.
[OK] New node added correctly.
  • 移动的槽位数:最终平均每个主节点有4096个slot,因此总共移动4096 slots
  • 接收槽位的目标节点ID:节点6382的ID
  • 移出槽位的源节点ID:节点6379/6380/6381的ID
  1. 重新分片:借助 redis-cli --cluster reshard 命令对集群重新分片,使得各节点槽位均衡(分别从节点6379/6380/6381中迁移一些slot到节点6382中)。需要指定:
$ redis-cli --cluster reshard 127.0.0.1 6479
>>> Performing Cluster Check (using node 127.0.0.1:6479)
M: 4c23b25bd4bcef7f4b77d8287e330ae72e738883 127.0.0.1:6479 slots:[0-5000] (5001 slots) master 1 additional replica(s)
S: 32ed645a9c9d13ca68dba5a147937fb1d05922ee 127.0.0.1:6480 slots: (0 slots) slave
  replicates c47598b25205cc88abe2e5094d5bfd9ea202335f
M: 706f399b248ed3a080cf1d4e43047a79331b714f 127.0.0.1:6482 slots: (0 slots) master
M: af81109fc29f69f9184ce9512c46df476fe693a3 127.0.0.1:6382 slots: (0 slots) master
M: 51081a64ddb3ccf5432c435a8cf20d45ab795dd8 127.0.0.1:6381 slots:[10001-16383] (6383 slots) master 1 additional replica(s)
S: 9d587b75bdaed26ca582036ed706df8b2282b0aa 127.0.0.1:6481 slots: (0 slots) slave
  replicates 51081a64ddb3ccf5432c435a8cf20d45ab795dd8
S: 87b7dfacde34b3cf57d5f46ab44fd6fffb2e4f52 127.0.0.1:6379 slots: (0 slots) slave
  replicates 4c23b25bd4bcef7f4b77d8287e330ae72e738883
M: c47598b25205cc88abe2e5094d5bfd9ea202335f 127.0.0.1:6380 slots:[5001-10000] (5000 slots) master 1 additional replica(s)
[OK] All nodes agree about slots configuration. >>> Check for open slots... >>> Check slots coverage...
[OK] All 16384 slots covered.
How many slots do you want to move (from 1 to 16384)? 4096 What is the receiving node ID?
  • (4) 设置主从关系:
redis-cli -p 6482 cluster replicate af81109fc29f69f9184ce9512c46df476fe693a3 
127.0.0.1:6482> CLUSTER NODES
32ed645a9c9d13ca68dba5a147937fb1d05922ee 127.0.0.1:6480@16480 slave c47598b25205cc88abe2e5094d5bfd9ea202335f 0 1603694930000 0 connected
51081a64ddb3ccf5432c435a8cf20d45ab795dd8 127.0.0.1:6381@16381 master - 0 1603694931000 2 connected 11597-16383 9d587b75bdaed26ca582036ed706df8b2282b0aa 127.0.0.1:6481@16481 slave 51081a64ddb3ccf5432c435a8cf20d45ab795dd8 0 1603694932000 2 connected
706f399b248ed3a080cf1d4e43047a79331b714f 127.0.0.1:6482@16482 myself,slave af81109fc29f69f9184ce9512c46df476fe693a3 0 1603694932000 8 connected
87b7dfacde34b3cf57d5f46ab44fd6fffb2e4f52 127.0.0.1:6379@16379 slave 4c23b25bd4bcef7f4b77d8287e330ae72e738883 0 1603694932000 6 connected
c47598b25205cc88abe2e5094d5bfd9ea202335f 127.0.0.1:6380@16380 master - 0 1603694933678 0 connected 6251-10000 4c23b25bd4bcef7f4b77d8287e330ae72e738883 127.0.0.1:6479@16479 master - 0 1603694932669 6 connected 1250-5000 af81109fc29f69f9184ce9512c46df476fe693a3 127.0.0.1:6382@16382 master - 0 1603694933000 9 connected 0-1249 5001-6250 10001-11596

集群伸缩-删除节点

这里考虑将新添加的两个节点6382和6482删除,需要将节点6382上分配的槽位迁移到其他节点。

  • (1) 重新分片:同样借助 redis-cli --cluster reshard 命令,将6382节点上的槽位全部转移到节点6479上。
$ redis-cli --cluster reshard 127.0.0.1 6382
>>> Performing Cluster Check (using node 127.0.0.1:6382)
M: af81109fc29f69f9184ce9512c46df476fe693a3 127.0.0.1:6382 slots:[0-1249],[5001-6250],[10001-11596] (4096 slots) master 1 additional replica(s)
M: 51081a64ddb3ccf5432c435a8cf20d45ab795dd8 127.0.0.1:6381 slots:[11597-16383] (4787 slots) master 1 additional replica(s)
S: 87b7dfacde34b3cf57d5f46ab44fd6fffb2e4f52 127.0.0.1:6379 slots: (0 slots) slave
    replicates 4c23b25bd4bcef7f4b77d8287e330ae72e738883
S: 32ed645a9c9d13ca68dba5a147937fb1d05922ee 127.0.0.1:6480 slots: (0 slots) slave
    replicates c47598b25205cc88abe2e5094d5bfd9ea202335f
M: 4c23b25bd4bcef7f4b77d8287e330ae72e738883 127.0.0.1:6479 slots:[1250-5000] (3751 slots) master 1 additional replica(s)
M: c47598b25205cc88abe2e5094d5bfd9ea202335f 127.0.0.1:6380 slots:[6251-10000] (3750 slots) master 1 additional replica(s)
S: 706f399b248ed3a080cf1d4e43047a79331b714f 127.0.0.1:6482 slots: (0 slots) slave
    replicates af81109fc29f69f9184ce9512c46df476fe693a3
S: 9d587b75bdaed26ca582036ed706df8b2282b0aa 127.0.0.1:6481 slots: (0 slots) slave
    replicates 51081a64ddb3ccf5432c435a8cf20d45ab795dd8
[OK] All nodes agree about slots configuration. >>> Check for open slots... >>> Check slots coverage...
[OK] All 16384 slots covered.
How many slots do you want to move (from 1 to 16384)? 4096 What is the receiving node ID? 4c23b25bd4bcef7f4b77d8287e330ae72e738883
Please enter all the source node IDs.
Type 'all' to use all the nodes as source nodes for the hash slots.
Type 'done' once you entered all the source nodes IDs.
Source node #1: af81109fc29f69f9184ce9512c46df476fe693a3
Source node #2: done
127.0.0.1:6379> CLUSTER NODES
c47598b25205cc88abe2e5094d5bfd9ea202335f 127.0.0.1:6380@16380 master - 0 1603773540922 0 connected 6251-10000 87b7dfacde34b3cf57d5f46ab44fd6fffb2e4f52 127.0.0.1:6379@16379 myself,slave 4c23b25bd4bcef7f4b77d8287e330ae72e738883 0 1603773539000 1 connected
4c23b25bd4bcef7f4b77d8287e330ae72e738883 127.0.0.1:6479@16479 master - 0 1603773541000 10 connected 0-6250 10001-11596 706f399b248ed3a080cf1d4e43047a79331b714f 127.0.0.1:6482@16482 slave 4c23b25bd4bcef7f4b77d8287e330ae72e738883 0 1603773541000 10 connected
32ed645a9c9d13ca68dba5a147937fb1d05922ee 127.0.0.1:6480@16480 slave c47598b25205cc88abe2e5094d5bfd9ea202335f 0 1603773539000 5 connected
9d587b75bdaed26ca582036ed706df8b2282b0aa 127.0.0.1:6481@16481 slave 51081a64ddb3ccf5432c435a8cf20d45ab795dd8 0 1603773541931 4 connected
af81109fc29f69f9184ce9512c46df476fe693a3 127.0.0.1:6382@16382 master - 0 1603773539000 9 connected
51081a64ddb3ccf5432c435a8cf20d45ab795dd8 127.0.0.1:6381@16381 master - 0 1603773540000 2 connected 11597-16383
  • (2) 删除节点:利用 redis-cli --cluster del-node 命令依次删除从节点6482和主节点6382。
$ redis-cli --cluster del-node 127.0.0.1:6482 706f399b248ed3a080cf1d4e43047a79331b714f >>> Removing node 706f399b248ed3a080cf1d4e43047a79331b714f from cluster 127.0.0.1:6482
>>> Sending CLUSTER FORGET messages to the cluster... >>> Sending CLUSTER RESET SOFT to the deleted node.
$ redis-cli --cluster del-node 127.0.0.1:6382 af81109fc29f69f9184ce9512c46df476fe693a3 >>> Removing node af81109fc29f69f9184ce9512c46df476fe693a3 from cluster 127.0.0.1:6382
>>> Sending CLUSTER FORGET messages to the cluster... >>> Sending CLUSTER RESET SOFT to the deleted node.
127.0.0.1:6379> CLUSTER NODES
c47598b25205cc88abe2e5094d5bfd9ea202335f 127.0.0.1:6380@16380 master - 0 1603773679121 0 connected 6251-10000 87b7dfacde34b3cf57d5f46ab44fd6fffb2e4f52 127.0.0.1:6379@16379 myself,slave 4c23b25bd4bcef7f4b77d8287e330ae72e738883 0 1603773677000 1 connected
4c23b25bd4bcef7f4b77d8287e330ae72e738883 127.0.0.1:6479@16479 master - 0 1603773678000 10 connected 0-6250 10001-11596 32ed645a9c9d13ca68dba5a147937fb1d05922ee 127.0.0.1:6480@16480 slave c47598b25205cc88abe2e5094d5bfd9ea202335f 0 1603773680130 5 connected
9d587b75bdaed26ca582036ed706df8b2282b0aa 127.0.0.1:6481@16481 slave 51081a64ddb3ccf5432c435a8cf20d45ab795dd8 0 1603773677099 4 connected
51081a64ddb3ccf5432c435a8cf20d45ab795dd8 127.0.0.1:6381@16381 master - 0 1603773678112 2 connected 11597-16383

总结

Redis集群环境的搭建主要包括启动节点、节点握手、槽指派和主从复制等四个步骤,集群伸缩同样涉及这几个方面。借助 redis-cli --cluster 命令来管理集群环境,不仅能增加简便性,还能降低操作失误的风险。

原文:https://www.cnblogs.com/hueyx...

image

查看原文

赞 6 收藏 3 评论 0

tfzh 发布了文章 · 1月15日

数据结构与算法: 哈希表 C语言描述

1. 目标:实现一个简易哈希表

期望构建一个这样的简易哈希表:

  • √ 长度为10
  • √ 使用time33散列函数
  • √ key使用16位字符串
  • √ data使用int
  • √ 用单向链表处理哈希碰撞
  • X 暂不支持扩容重哈希

2. 代码实现

2.1 哈希节点结构体

#define HASHSIZE 10

typedef struct HashNode {
    char   key[16];
    int    data;//数据域
    struct HashNode *next;//指针域,哈希碰撞时候的后继结点
} HashNode;

static HashNode *HashTable[HASHSIZE];

2.2 time33散列函数

这里因为要放在长度为HASHSIZEHashTable里,故而对HASHSIZE取余

typedef struct HashNode {
    char   key[16];
    int    data;//数据域
    struct HashNode *next;//指针域,哈希碰撞时候的后继结点
} HashNode;

static HashNode *HashTable[HASHSIZE];

2.3 set函数

  • 考虑该槽为NULL时直接插入
  • 考虑该槽不为空时,则遍历链表寻找目标key, 找得到就更新, 找不到就尾插新节点
void *set(char *key, int data)
{
    struct HashNode *node;
    unsigned int hash_key = time33(key);
    printf("set key:%s val:%d hash:%d\n", key, data, hash_key);
    
    struct HashNode *new_node;
    new_node = (struct HashNode *)malloc(sizeof(struct HashNode));
    strcpy(new_node->key, key);
    new_node->data = data;
    new_node->next = NULL;
    
    node = HashTable[hash_key];
    //先看在不在
    if (NULL == node)
    {
        HashTable[hash_key] = new_node; 
        return NULL;
    }else if(strcmp(node->key, key) == 0){
        printf("update key:%s %d => %d \n",key,node->data, data);
        //update
        node->data = data;
        free(new_node);
    }else{
        //碰撞
        while(node->next != NULL){
            node = node->next;
            if(strcmp(node->key, key) == 0){
                //update
                node->data = data;
                printf("update key:%s %d => %d \n",key,node->data, data);
                free(new_node);
            }
        }
        node->next = new_node;
    }
}

2.4 get函数

  • 考虑该槽为NULL时返回NULL
  • 考虑该槽不为空时,则遍历链表寻找目标key, 找得到就返回该节点, 找不到返回NULL
HashNode *get(char *key)
{
    unsigned int hash_key = time33(key);
    struct HashNode *node;
    node = HashTable[hash_key];
    while(NULL != node){
        if (strcmp(key, node->key) == 0)
        {
            printf("get %s :%d \n", key, node->data);
            return node;
        }
        node = node->next;
    }
    return NULL;
}

2.5 遍历哈希getAll函数 用于验证代码

void getAll(){
    struct HashNode *node;
    for (int i = 0; i < HASHSIZE; ++i)
    {
        if (HashTable[i] != NULL)
        {
            node = HashTable[i];
            while(node != NULL){
                printf("%d %s %d \n", i,node->key,node->data);
                node = node->next;
            }
        }        
    }
}

3 测试代码

int main(int argc, char const *argv[])
{
    set("zhangsan", 100);
    set("zhangsan", 200);//update
    set("lisi", 200);
    set("wangwu", 300);
    set("maliu", 400);
    set("qianer", 500);
    set("zhaojiu", 600);
    set("sunzi",700);
    set("wangwu", 3000);//update
    getAll();
    return 0;
}

Res:

terence@k8s-master:/mydata/c$ gcc hashTable.c
terence@k8s-master:/mydata/c$ ./a.out
set         key:zhangsan val:100     hash:1
set         key:zhangsan val:200     hash:1
update         key:zhangsan 100 => 200 
set         key:lisi     val:200    hash:6
set         key:wangwu   val:300     hash:0
set         key:maliu    val:400     hash:1
set         key:qianer   val:500     hash:5
set         key:zhaojiu  val:600     hash:7
set         key:sunzi    val:700     hash:4
set         key:wangwu   val:3000     hash:0
update         key:wangwu   300 => 3000 
0 wangwu 3000 
1 zhangsan 200 
1 maliu 400 
4 sunzi 700 
5 qianer 500 
6 lisi 200 
7 zhaojiu 600

Done!

查看原文

赞 0 收藏 0 评论 0

tfzh 收藏了文章 · 1月13日

Linux IO模式及 select、poll、epoll详解

注:本文是对众多博客的学习和总结,可能存在理解错误。请带着怀疑的眼光,同时如果有错误希望能指出。

同步IO和异步IO,阻塞IO和非阻塞IO分别是什么,到底有什么区别?不同的人在不同的上下文下给出的答案是不同的。所以先限定一下本文的上下文。

本文讨论的背景是Linux环境下的network IO。

一 概念说明

在进行解释之前,首先要说明几个概念:
- 用户空间和内核空间
- 进程切换
- 进程的阻塞
- 文件描述符
- 缓存 I/O

用户空间与内核空间

现在操作系统都是采用虚拟存储器,那么对32位操作系统而言,它的寻址空间(虚拟存储空间)为4G(2的32次方)。操作系统的核心是内核,独立于普通的应用程序,可以访问受保护的内存空间,也有访问底层硬件设备的所有权限。为了保证用户进程不能直接操作内核(kernel),保证内核的安全,操心系统将虚拟空间划分为两部分,一部分为内核空间,一部分为用户空间。针对linux操作系统而言,将最高的1G字节(从虚拟地址0xC0000000到0xFFFFFFFF),供内核使用,称为内核空间,而将较低的3G字节(从虚拟地址0x00000000到0xBFFFFFFF),供各个进程使用,称为用户空间。

进程切换

为了控制进程的执行,内核必须有能力挂起正在CPU上运行的进程,并恢复以前挂起的某个进程的执行。这种行为被称为进程切换。因此可以说,任何进程都是在操作系统内核的支持下运行的,是与内核紧密相关的。

从一个进程的运行转到另一个进程上运行,这个过程中经过下面这些变化:
1. 保存处理机上下文,包括程序计数器和其他寄存器。
2. 更新PCB信息。
3. 把进程的PCB移入相应的队列,如就绪、在某事件阻塞等队列。
4. 选择另一个进程执行,并更新其PCB。
5. 更新内存管理的数据结构。
6. 恢复处理机上下文。

注:总而言之就是很耗资源,具体的可以参考这篇文章:进程切换

进程的阻塞

正在执行的进程,由于期待的某些事件未发生,如请求系统资源失败、等待某种操作的完成、新数据尚未到达或无新工作做等,则由系统自动执行阻塞原语(Block),使自己由运行状态变为阻塞状态。可见,进程的阻塞是进程自身的一种主动行为,也因此只有处于运行态的进程(获得CPU),才可能将其转为阻塞状态。当进程进入阻塞状态,是不占用CPU资源的

文件描述符fd

文件描述符(File descriptor)是计算机科学中的一个术语,是一个用于表述指向文件的引用的抽象化概念。

文件描述符在形式上是一个非负整数。实际上,它是一个索引值,指向内核为每一个进程所维护的该进程打开文件的记录表。当程序打开一个现有文件或者创建一个新文件时,内核向进程返回一个文件描述符。在程序设计中,一些涉及底层的程序编写往往会围绕着文件描述符展开。但是文件描述符这一概念往往只适用于UNIX、Linux这样的操作系统。

缓存 I/O

缓存 I/O 又被称作标准 I/O,大多数文件系统的默认 I/O 操作都是缓存 I/O。在 Linux 的缓存 I/O 机制中,操作系统会将 I/O 的数据缓存在文件系统的页缓存( page cache )中,也就是说,数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。

缓存 I/O 的缺点:
数据在传输过程中需要在应用程序地址空间和内核进行多次数据拷贝操作,这些数据拷贝操作所带来的 CPU 以及内存开销是非常大的。

二 IO模式

刚才说了,对于一次IO访问(以read举例),数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。所以说,当一个read操作发生时,它会经历两个阶段:
1. 等待数据准备 (Waiting for the data to be ready)
2. 将数据从内核拷贝到进程中 (Copying the data from the kernel to the process)

正式因为这两个阶段,linux系统产生了下面五种网络模式的方案。
- 阻塞 I/O(blocking IO)
- 非阻塞 I/O(nonblocking IO)
- I/O 多路复用( IO multiplexing)
- 信号驱动 I/O( signal driven IO)
- 异步 I/O(asynchronous IO)

注:由于signal driven IO在实际中并不常用,所以我这只提及剩下的四种IO Model。

阻塞 I/O(blocking IO)

在linux中,默认情况下所有的socket都是blocking,一个典型的读操作流程大概是这样:
clipboard.png

当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段:准备数据(对于网络IO来说,很多时候数据在一开始还没有到达。比如,还没有收到一个完整的UDP包。这个时候kernel就要等待足够的数据到来)。这个过程需要等待,也就是说数据被拷贝到操作系统内核的缓冲区中是需要一个过程的。而在用户进程这边,整个进程会被阻塞(当然,是进程自己选择的阻塞)。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,然后kernel返回结果,用户进程才解除block的状态,重新运行起来。

所以,blocking IO的特点就是在IO执行的两个阶段都被block了。

非阻塞 I/O(nonblocking IO)

linux下,可以通过设置socket使其变为non-blocking。当对一个non-blocking socket执行读操作时,流程是这个样子:
clipboard.png

当用户进程发出read操作时,如果kernel中的数据还没有准备好,那么它并不会block用户进程,而是立刻返回一个error。从用户进程角度讲 ,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个error时,它就知道数据还没有准备好,于是它可以再次发送read操作。一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call,那么它马上就将数据拷贝到了用户内存,然后返回。

所以,nonblocking IO的特点是用户进程需要不断的主动询问kernel数据好了没有。

I/O 多路复用( IO multiplexing)

IO multiplexing就是我们说的select,poll,epoll,有些地方也称这种IO方式为event driven IO。select/epoll的好处就在于单个process就可以同时处理多个网络连接的IO。它的基本原理就是select,poll,epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。

clipboard.png

当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。

所以,I/O 多路复用的特点是通过一种机制一个进程能同时等待多个文件描述符,而这些文件描述符(套接字描述符)其中的任意一个进入读就绪状态,select()函数就可以返回。

这个图和blocking IO的图其实并没有太大的不同,事实上,还更差一些。因为这里需要使用两个system call (select 和 recvfrom),而blocking IO只调用了一个system call (recvfrom)。但是,用select的优势在于它可以同时处理多个connection。

所以,如果处理的连接数不是很高的话,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延迟还更大。select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。)

在IO multiplexing Model中,实际中,对于每一个socket,一般都设置成为non-blocking,但是,如上图所示,整个用户的process其实是一直被block的。只不过process是被select这个函数block,而不是被socket IO给block。

异步 I/O(asynchronous IO)

inux下的asynchronous IO其实用得很少。先看一下它的流程:
clipboard.png

用户进程发起read操作之后,立刻就可以开始去做其它的事。而另一方面,从kernel的角度,当它受到一个asynchronous read之后,首先它会立刻返回,所以不会对用户进程产生任何block。然后,kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作完成了。

总结

blocking和non-blocking的区别

调用blocking IO会一直block住对应的进程直到操作完成,而non-blocking IO在kernel还准备数据的情况下会立刻返回。

synchronous IO和asynchronous IO的区别

在说明synchronous IO和asynchronous IO的区别之前,需要先给出两者的定义。POSIX的定义是这样子的:
- A synchronous I/O operation causes the requesting process to be blocked until that I/O operation completes;
- An asynchronous I/O operation does not cause the requesting process to be blocked;

两者的区别就在于synchronous IO做”IO operation”的时候会将process阻塞。按照这个定义,之前所述的blocking IO,non-blocking IO,IO multiplexing都属于synchronous IO。

有人会说,non-blocking IO并没有被block啊。这里有个非常“狡猾”的地方,定义中所指的”IO operation”是指真实的IO操作,就是例子中的recvfrom这个system call。non-blocking IO在执行recvfrom这个system call的时候,如果kernel的数据没有准备好,这时候不会block进程。但是,当kernel中数据准备好的时候,recvfrom会将数据从kernel拷贝到用户内存中,这个时候进程是被block了,在这段时间内,进程是被block的。

而asynchronous IO则不一样,当进程发起IO 操作之后,就直接返回再也不理睬了,直到kernel发送一个信号,告诉进程说IO完成。在这整个过程中,进程完全没有被block。

各个IO Model的比较如图所示:
clipboard.png

通过上面的图片,可以发现non-blocking IO和asynchronous IO的区别还是很明显的。在non-blocking IO中,虽然进程大部分时间都不会被block,但是它仍然要求进程去主动的check,并且当数据准备完成以后,也需要进程主动的再次调用recvfrom来将数据拷贝到用户内存。而asynchronous IO则完全不同。它就像是用户进程将整个IO操作交给了他人(kernel)完成,然后他人做完后发信号通知。在此期间,用户进程不需要去检查IO操作的状态,也不需要主动的去拷贝数据。

三 I/O 多路复用之select、poll、epoll详解

select,poll,epoll都是IO多路复用的机制。I/O多路复用就是通过一种机制,一个进程可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。但select,poll,epoll本质上都是同步I/O,因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的,而异步I/O则无需自己负责进行读写,异步I/O的实现会负责把数据从内核拷贝到用户空间。(这里啰嗦下)

select

int select (int n, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

select 函数监视的文件描述符分3类,分别是writefds、readfds、和exceptfds。调用后select函数会阻塞,直到有描述副就绪(有数据 可读、可写、或者有except),或者超时(timeout指定等待时间,如果立即返回设为null即可),函数返回。当select函数返回后,可以 通过遍历fdset,来找到就绪的描述符。

select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点。select的一 个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,可以通过修改宏定义甚至重新编译内核的方式提升这一限制,但 是这样也会造成效率的降低。

poll

int poll (struct pollfd *fds, unsigned int nfds, int timeout);

不同与select使用三个位图来表示三个fdset的方式,poll使用一个 pollfd的指针实现。

struct pollfd {
    int fd; /* file descriptor */
    short events; /* requested events to watch */
    short revents; /* returned events witnessed */
};

pollfd结构包含了要监视的event和发生的event,不再使用select“参数-值”传递的方式。同时,pollfd并没有最大数量限制(但是数量过大后性能也是会下降)。 和select函数一样,poll返回后,需要轮询pollfd来获取就绪的描述符。

从上面看,select和poll都需要在返回后,通过遍历文件描述符来获取已经就绪的socket。事实上,同时连接的大量客户端在一时刻可能只有很少的处于就绪状态,因此随着监视的描述符数量的增长,其效率也会线性下降。

epoll

epoll是在2.6内核中提出的,是之前的select和poll的增强版本。相对于select和poll来说,epoll更加灵活,没有描述符限制。epoll使用一个文件描述符管理多个描述符,将用户关系的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的copy只需一次。

一 epoll操作过程

epoll操作过程需要三个接口,分别如下:

int epoll_create(int size);//创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

1. int epoll_create(int size);
创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大,这个参数不同于select()中的第一个参数,给出最大监听的fd+1的值,参数size并不是限制了epoll所能监听的描述符最大个数,只是对内核初始分配内部数据结构的一个建议
当创建好epoll句柄后,它就会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。

2. int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
函数是对指定描述符fd执行op操作。
- epfd:是epoll_create()的返回值。
- op:表示op操作,用三个宏来表示:添加EPOLL_CTL_ADD,删除EPOLL_CTL_DEL,修改EPOLL_CTL_MOD。分别添加、删除和修改对fd的监听事件。
- fd:是需要监听的fd(文件描述符)
- epoll_event:是告诉内核需要监听什么事,struct epoll_event结构如下:

struct epoll_event {
  __uint32_t events;  /* Epoll events */
  epoll_data_t data;  /* User data variable */
};

//events可以是以下几个宏的集合:
EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
EPOLLOUT:表示对应的文件描述符可以写;
EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
EPOLLERR:表示对应的文件描述符发生错误;
EPOLLHUP:表示对应的文件描述符被挂断;
EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里

3. int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
等待epfd上的io事件,最多返回maxevents个事件。
参数events用来从内核得到事件的集合,maxevents告之内核这个events有多大,这个maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)。该函数返回需要处理的事件数目,如返回0表示已超时。

二 工作模式

 epoll对文件描述符的操作有两种模式:LT(level trigger)ET(edge trigger)。LT模式是默认模式,LT模式与ET模式的区别如下:
  LT模式:当epoll_wait检测到描述符事件发生并将此事件通知应用程序,应用程序可以不立即处理该事件。下次调用epoll_wait时,会再次响应应用程序并通知此事件。
  ET模式:当epoll_wait检测到描述符事件发生并将此事件通知应用程序,应用程序必须立即处理该事件。如果不处理,下次调用epoll_wait时,不会再次响应应用程序并通知此事件。

1. LT模式

LT(level triggered)是缺省的工作方式,并且同时支持block和no-block socket.在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的。

2. ET模式

ET(edge-triggered)是高速工作方式,只支持no-block socket。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知,直到你做了某些操作导致那个文件描述符不再为就绪状态了(比如,你在发送,接收或者接收请求,或者发送接收的数据少于一定量时导致了一个EWOULDBLOCK 错误)。但是请注意,如果一直不对这个fd作IO操作(从而导致它再次变成未就绪),内核不会发送更多的通知(only once)

ET模式在很大程度上减少了epoll事件被重复触发的次数,因此效率要比LT模式高。epoll工作在ET模式的时候,必须使用非阻塞套接口,以避免由于一个文件句柄的阻塞读/阻塞写操作把处理多个文件描述符的任务饿死。

3. 总结

假如有这样一个例子:
1. 我们已经把一个用来从管道中读取数据的文件句柄(RFD)添加到epoll描述符
2. 这个时候从管道的另一端被写入了2KB的数据
3. 调用epoll_wait(2),并且它会返回RFD,说明它已经准备好读取操作
4. 然后我们读取了1KB的数据
5. 调用epoll_wait(2)......

LT模式:
如果是LT模式,那么在第5步调用epoll_wait(2)之后,仍然能受到通知。

ET模式:
如果我们在第1步将RFD添加到epoll描述符的时候使用了EPOLLET标志,那么在第5步调用epoll_wait(2)之后将有可能会挂起,因为剩余的数据还存在于文件的输入缓冲区内,而且数据发出端还在等待一个针对已经发出数据的反馈信息。只有在监视的文件句柄上发生了某个事件的时候 ET 工作模式才会汇报事件。因此在第5步的时候,调用者可能会放弃等待仍在存在于文件输入缓冲区内的剩余数据。

当使用epoll的ET模型来工作时,当产生了一个EPOLLIN事件后,
读数据的时候需要考虑的是当recv()返回的大小如果等于请求的大小,那么很有可能是缓冲区还有数据未读完,也意味着该次事件还没有处理完,所以还需要再次读取:

while(rs){
  buflen = recv(activeevents[i].data.fd, buf, sizeof(buf), 0);
  if(buflen < 0){
    // 由于是非阻塞的模式,所以当errno为EAGAIN时,表示当前缓冲区已无数据可读
    // 在这里就当作是该次事件已处理处.
    if(errno == EAGAIN){
        break;
    }
    else{
        return;
    }
  }
  else if(buflen == 0){
     // 这里表示对端的socket已正常关闭.
  }

 if(buflen == sizeof(buf){
      rs = 1;   // 需要再次读取
 }
 else{
      rs = 0;
 }
}

Linux中的EAGAIN含义

Linux环境下开发经常会碰到很多错误(设置errno),其中EAGAIN是其中比较常见的一个错误(比如用在非阻塞操作中)。
从字面上来看,是提示再试一次。这个错误经常出现在当应用程序进行一些非阻塞(non-blocking)操作(对文件或socket)的时候。

例如,以 O_NONBLOCK的标志打开文件/socket/FIFO,如果你连续做read操作而没有数据可读。此时程序不会阻塞起来等待数据准备就绪返回,read函数会返回一个错误EAGAIN,提示你的应用程序现在没有数据可读请稍后再试。
又例如,当一个系统调用(比如fork)因为没有足够的资源(比如虚拟内存)而执行失败,返回EAGAIN提示其再调用一次(也许下次就能成功)。

三 代码演示

下面是一段不完整的代码且格式不对,意在表述上面的过程,去掉了一些模板代码。

#define IPADDRESS   "127.0.0.1"
#define PORT        8787
#define MAXSIZE     1024
#define LISTENQ     5
#define FDSIZE      1000
#define EPOLLEVENTS 100

listenfd = socket_bind(IPADDRESS,PORT);

struct epoll_event events[EPOLLEVENTS];

//创建一个描述符
epollfd = epoll_create(FDSIZE);

//添加监听描述符事件
add_event(epollfd,listenfd,EPOLLIN);

//循环等待
for ( ; ; ){
    //该函数返回已经准备好的描述符事件数目
    ret = epoll_wait(epollfd,events,EPOLLEVENTS,-1);
    //处理接收到的连接
    handle_events(epollfd,events,ret,listenfd,buf);
}

//事件处理函数
static void handle_events(int epollfd,struct epoll_event *events,int num,int listenfd,char *buf)
{
     int i;
     int fd;
     //进行遍历;这里只要遍历已经准备好的io事件。num并不是当初epoll_create时的FDSIZE。
     for (i = 0;i < num;i++)
     {
         fd = events[i].data.fd;
        //根据描述符的类型和事件类型进行处理
         if ((fd == listenfd) &&(events[i].events & EPOLLIN))
            handle_accpet(epollfd,listenfd);
         else if (events[i].events & EPOLLIN)
            do_read(epollfd,fd,buf);
         else if (events[i].events & EPOLLOUT)
            do_write(epollfd,fd,buf);
     }
}

//添加事件
static void add_event(int epollfd,int fd,int state){
    struct epoll_event ev;
    ev.events = state;
    ev.data.fd = fd;
    epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&ev);
}

//处理接收到的连接
static void handle_accpet(int epollfd,int listenfd){
     int clifd;     
     struct sockaddr_in cliaddr;     
     socklen_t  cliaddrlen;     
     clifd = accept(listenfd,(struct sockaddr*)&cliaddr,&cliaddrlen);     
     if (clifd == -1)         
     perror("accpet error:");     
     else {         
         printf("accept a new client: %s:%d\n",inet_ntoa(cliaddr.sin_addr),cliaddr.sin_port);                       //添加一个客户描述符和事件         
         add_event(epollfd,clifd,EPOLLIN);     
     } 
}

//读处理
static void do_read(int epollfd,int fd,char *buf){
    int nread;
    nread = read(fd,buf,MAXSIZE);
    if (nread == -1)     {         
        perror("read error:");         
        close(fd); //记住close fd        
        delete_event(epollfd,fd,EPOLLIN); //删除监听 
    }
    else if (nread == 0)     {         
        fprintf(stderr,"client close.\n");
        close(fd); //记住close fd       
        delete_event(epollfd,fd,EPOLLIN); //删除监听 
    }     
    else {         
        printf("read message is : %s",buf);        
        //修改描述符对应的事件,由读改为写         
        modify_event(epollfd,fd,EPOLLOUT);     
    } 
}

//写处理
static void do_write(int epollfd,int fd,char *buf) {     
    int nwrite;     
    nwrite = write(fd,buf,strlen(buf));     
    if (nwrite == -1){         
        perror("write error:");        
        close(fd);   //记住close fd       
        delete_event(epollfd,fd,EPOLLOUT);  //删除监听    
    }else{
        modify_event(epollfd,fd,EPOLLIN); 
    }    
    memset(buf,0,MAXSIZE); 
}

//删除事件
static void delete_event(int epollfd,int fd,int state) {
    struct epoll_event ev;
    ev.events = state;
    ev.data.fd = fd;
    epoll_ctl(epollfd,EPOLL_CTL_DEL,fd,&ev);
}

//修改事件
static void modify_event(int epollfd,int fd,int state){     
    struct epoll_event ev;
    ev.events = state;
    ev.data.fd = fd;
    epoll_ctl(epollfd,EPOLL_CTL_MOD,fd,&ev);
}

//注:另外一端我就省了

四 epoll总结

在 select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而epoll事先通过epoll_ctl()来注册一 个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait() 时便得到通知。(此处去掉了遍历文件描述符,而是通过监听回调的的机制。这正是epoll的魅力所在。)

epoll的优点主要是一下几个方面:
1. 监视的描述符数量不受限制,它所支持的FD上限是最大可以打开文件的数目,这个数字一般远大于2048,举个例子,在1GB内存的机器上大约是10万左 右,具体数目可以cat /proc/sys/fs/file-max察看,一般来说这个数目和系统内存关系很大。select的最大缺点就是进程打开的fd是有数量限制的。这对 于连接数量比较大的服务器来说根本不能满足。虽然也可以选择多进程的解决方案( Apache就是这样实现的),不过虽然linux上面创建进程的代价比较小,但仍旧是不可忽视的,加上进程间数据同步远比不上线程间同步的高效,所以也不是一种完美的方案。

  1. IO的效率不会随着监视fd的数量的增长而下降。epoll不同于select和poll轮询的方式,而是通过每个fd定义的回调函数来实现的。只有就绪的fd才会执行回调函数。

如果没有大量的idle -connection或者dead-connection,epoll的效率并不会比select/poll高很多,但是当遇到大量的idle- connection,就会发现epoll的效率大大高于select/poll。

参考

用户空间与内核空间,进程上下文与中断上下文[总结]
进程切换
维基百科-文件描述符
Linux 中直接 I/O 机制的介绍
IO - 同步,异步,阻塞,非阻塞 (亡羊补牢篇)
Linux中select poll和epoll的区别
IO多路复用之select总结
IO多路复用之poll总结
IO多路复用之epoll总结

查看原文

tfzh 赞了文章 · 1月13日

Linux IO模式及 select、poll、epoll详解

注:本文是对众多博客的学习和总结,可能存在理解错误。请带着怀疑的眼光,同时如果有错误希望能指出。

同步IO和异步IO,阻塞IO和非阻塞IO分别是什么,到底有什么区别?不同的人在不同的上下文下给出的答案是不同的。所以先限定一下本文的上下文。

本文讨论的背景是Linux环境下的network IO。

一 概念说明

在进行解释之前,首先要说明几个概念:
- 用户空间和内核空间
- 进程切换
- 进程的阻塞
- 文件描述符
- 缓存 I/O

用户空间与内核空间

现在操作系统都是采用虚拟存储器,那么对32位操作系统而言,它的寻址空间(虚拟存储空间)为4G(2的32次方)。操作系统的核心是内核,独立于普通的应用程序,可以访问受保护的内存空间,也有访问底层硬件设备的所有权限。为了保证用户进程不能直接操作内核(kernel),保证内核的安全,操心系统将虚拟空间划分为两部分,一部分为内核空间,一部分为用户空间。针对linux操作系统而言,将最高的1G字节(从虚拟地址0xC0000000到0xFFFFFFFF),供内核使用,称为内核空间,而将较低的3G字节(从虚拟地址0x00000000到0xBFFFFFFF),供各个进程使用,称为用户空间。

进程切换

为了控制进程的执行,内核必须有能力挂起正在CPU上运行的进程,并恢复以前挂起的某个进程的执行。这种行为被称为进程切换。因此可以说,任何进程都是在操作系统内核的支持下运行的,是与内核紧密相关的。

从一个进程的运行转到另一个进程上运行,这个过程中经过下面这些变化:
1. 保存处理机上下文,包括程序计数器和其他寄存器。
2. 更新PCB信息。
3. 把进程的PCB移入相应的队列,如就绪、在某事件阻塞等队列。
4. 选择另一个进程执行,并更新其PCB。
5. 更新内存管理的数据结构。
6. 恢复处理机上下文。

注:总而言之就是很耗资源,具体的可以参考这篇文章:进程切换

进程的阻塞

正在执行的进程,由于期待的某些事件未发生,如请求系统资源失败、等待某种操作的完成、新数据尚未到达或无新工作做等,则由系统自动执行阻塞原语(Block),使自己由运行状态变为阻塞状态。可见,进程的阻塞是进程自身的一种主动行为,也因此只有处于运行态的进程(获得CPU),才可能将其转为阻塞状态。当进程进入阻塞状态,是不占用CPU资源的

文件描述符fd

文件描述符(File descriptor)是计算机科学中的一个术语,是一个用于表述指向文件的引用的抽象化概念。

文件描述符在形式上是一个非负整数。实际上,它是一个索引值,指向内核为每一个进程所维护的该进程打开文件的记录表。当程序打开一个现有文件或者创建一个新文件时,内核向进程返回一个文件描述符。在程序设计中,一些涉及底层的程序编写往往会围绕着文件描述符展开。但是文件描述符这一概念往往只适用于UNIX、Linux这样的操作系统。

缓存 I/O

缓存 I/O 又被称作标准 I/O,大多数文件系统的默认 I/O 操作都是缓存 I/O。在 Linux 的缓存 I/O 机制中,操作系统会将 I/O 的数据缓存在文件系统的页缓存( page cache )中,也就是说,数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。

缓存 I/O 的缺点:
数据在传输过程中需要在应用程序地址空间和内核进行多次数据拷贝操作,这些数据拷贝操作所带来的 CPU 以及内存开销是非常大的。

二 IO模式

刚才说了,对于一次IO访问(以read举例),数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。所以说,当一个read操作发生时,它会经历两个阶段:
1. 等待数据准备 (Waiting for the data to be ready)
2. 将数据从内核拷贝到进程中 (Copying the data from the kernel to the process)

正式因为这两个阶段,linux系统产生了下面五种网络模式的方案。
- 阻塞 I/O(blocking IO)
- 非阻塞 I/O(nonblocking IO)
- I/O 多路复用( IO multiplexing)
- 信号驱动 I/O( signal driven IO)
- 异步 I/O(asynchronous IO)

注:由于signal driven IO在实际中并不常用,所以我这只提及剩下的四种IO Model。

阻塞 I/O(blocking IO)

在linux中,默认情况下所有的socket都是blocking,一个典型的读操作流程大概是这样:
clipboard.png

当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段:准备数据(对于网络IO来说,很多时候数据在一开始还没有到达。比如,还没有收到一个完整的UDP包。这个时候kernel就要等待足够的数据到来)。这个过程需要等待,也就是说数据被拷贝到操作系统内核的缓冲区中是需要一个过程的。而在用户进程这边,整个进程会被阻塞(当然,是进程自己选择的阻塞)。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,然后kernel返回结果,用户进程才解除block的状态,重新运行起来。

所以,blocking IO的特点就是在IO执行的两个阶段都被block了。

非阻塞 I/O(nonblocking IO)

linux下,可以通过设置socket使其变为non-blocking。当对一个non-blocking socket执行读操作时,流程是这个样子:
clipboard.png

当用户进程发出read操作时,如果kernel中的数据还没有准备好,那么它并不会block用户进程,而是立刻返回一个error。从用户进程角度讲 ,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个error时,它就知道数据还没有准备好,于是它可以再次发送read操作。一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call,那么它马上就将数据拷贝到了用户内存,然后返回。

所以,nonblocking IO的特点是用户进程需要不断的主动询问kernel数据好了没有。

I/O 多路复用( IO multiplexing)

IO multiplexing就是我们说的select,poll,epoll,有些地方也称这种IO方式为event driven IO。select/epoll的好处就在于单个process就可以同时处理多个网络连接的IO。它的基本原理就是select,poll,epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。

clipboard.png

当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。

所以,I/O 多路复用的特点是通过一种机制一个进程能同时等待多个文件描述符,而这些文件描述符(套接字描述符)其中的任意一个进入读就绪状态,select()函数就可以返回。

这个图和blocking IO的图其实并没有太大的不同,事实上,还更差一些。因为这里需要使用两个system call (select 和 recvfrom),而blocking IO只调用了一个system call (recvfrom)。但是,用select的优势在于它可以同时处理多个connection。

所以,如果处理的连接数不是很高的话,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延迟还更大。select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。)

在IO multiplexing Model中,实际中,对于每一个socket,一般都设置成为non-blocking,但是,如上图所示,整个用户的process其实是一直被block的。只不过process是被select这个函数block,而不是被socket IO给block。

异步 I/O(asynchronous IO)

inux下的asynchronous IO其实用得很少。先看一下它的流程:
clipboard.png

用户进程发起read操作之后,立刻就可以开始去做其它的事。而另一方面,从kernel的角度,当它受到一个asynchronous read之后,首先它会立刻返回,所以不会对用户进程产生任何block。然后,kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作完成了。

总结

blocking和non-blocking的区别

调用blocking IO会一直block住对应的进程直到操作完成,而non-blocking IO在kernel还准备数据的情况下会立刻返回。

synchronous IO和asynchronous IO的区别

在说明synchronous IO和asynchronous IO的区别之前,需要先给出两者的定义。POSIX的定义是这样子的:
- A synchronous I/O operation causes the requesting process to be blocked until that I/O operation completes;
- An asynchronous I/O operation does not cause the requesting process to be blocked;

两者的区别就在于synchronous IO做”IO operation”的时候会将process阻塞。按照这个定义,之前所述的blocking IO,non-blocking IO,IO multiplexing都属于synchronous IO。

有人会说,non-blocking IO并没有被block啊。这里有个非常“狡猾”的地方,定义中所指的”IO operation”是指真实的IO操作,就是例子中的recvfrom这个system call。non-blocking IO在执行recvfrom这个system call的时候,如果kernel的数据没有准备好,这时候不会block进程。但是,当kernel中数据准备好的时候,recvfrom会将数据从kernel拷贝到用户内存中,这个时候进程是被block了,在这段时间内,进程是被block的。

而asynchronous IO则不一样,当进程发起IO 操作之后,就直接返回再也不理睬了,直到kernel发送一个信号,告诉进程说IO完成。在这整个过程中,进程完全没有被block。

各个IO Model的比较如图所示:
clipboard.png

通过上面的图片,可以发现non-blocking IO和asynchronous IO的区别还是很明显的。在non-blocking IO中,虽然进程大部分时间都不会被block,但是它仍然要求进程去主动的check,并且当数据准备完成以后,也需要进程主动的再次调用recvfrom来将数据拷贝到用户内存。而asynchronous IO则完全不同。它就像是用户进程将整个IO操作交给了他人(kernel)完成,然后他人做完后发信号通知。在此期间,用户进程不需要去检查IO操作的状态,也不需要主动的去拷贝数据。

三 I/O 多路复用之select、poll、epoll详解

select,poll,epoll都是IO多路复用的机制。I/O多路复用就是通过一种机制,一个进程可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。但select,poll,epoll本质上都是同步I/O,因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的,而异步I/O则无需自己负责进行读写,异步I/O的实现会负责把数据从内核拷贝到用户空间。(这里啰嗦下)

select

int select (int n, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

select 函数监视的文件描述符分3类,分别是writefds、readfds、和exceptfds。调用后select函数会阻塞,直到有描述副就绪(有数据 可读、可写、或者有except),或者超时(timeout指定等待时间,如果立即返回设为null即可),函数返回。当select函数返回后,可以 通过遍历fdset,来找到就绪的描述符。

select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点。select的一 个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,可以通过修改宏定义甚至重新编译内核的方式提升这一限制,但 是这样也会造成效率的降低。

poll

int poll (struct pollfd *fds, unsigned int nfds, int timeout);

不同与select使用三个位图来表示三个fdset的方式,poll使用一个 pollfd的指针实现。

struct pollfd {
    int fd; /* file descriptor */
    short events; /* requested events to watch */
    short revents; /* returned events witnessed */
};

pollfd结构包含了要监视的event和发生的event,不再使用select“参数-值”传递的方式。同时,pollfd并没有最大数量限制(但是数量过大后性能也是会下降)。 和select函数一样,poll返回后,需要轮询pollfd来获取就绪的描述符。

从上面看,select和poll都需要在返回后,通过遍历文件描述符来获取已经就绪的socket。事实上,同时连接的大量客户端在一时刻可能只有很少的处于就绪状态,因此随着监视的描述符数量的增长,其效率也会线性下降。

epoll

epoll是在2.6内核中提出的,是之前的select和poll的增强版本。相对于select和poll来说,epoll更加灵活,没有描述符限制。epoll使用一个文件描述符管理多个描述符,将用户关系的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的copy只需一次。

一 epoll操作过程

epoll操作过程需要三个接口,分别如下:

int epoll_create(int size);//创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

1. int epoll_create(int size);
创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大,这个参数不同于select()中的第一个参数,给出最大监听的fd+1的值,参数size并不是限制了epoll所能监听的描述符最大个数,只是对内核初始分配内部数据结构的一个建议
当创建好epoll句柄后,它就会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。

2. int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
函数是对指定描述符fd执行op操作。
- epfd:是epoll_create()的返回值。
- op:表示op操作,用三个宏来表示:添加EPOLL_CTL_ADD,删除EPOLL_CTL_DEL,修改EPOLL_CTL_MOD。分别添加、删除和修改对fd的监听事件。
- fd:是需要监听的fd(文件描述符)
- epoll_event:是告诉内核需要监听什么事,struct epoll_event结构如下:

struct epoll_event {
  __uint32_t events;  /* Epoll events */
  epoll_data_t data;  /* User data variable */
};

//events可以是以下几个宏的集合:
EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
EPOLLOUT:表示对应的文件描述符可以写;
EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
EPOLLERR:表示对应的文件描述符发生错误;
EPOLLHUP:表示对应的文件描述符被挂断;
EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里

3. int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
等待epfd上的io事件,最多返回maxevents个事件。
参数events用来从内核得到事件的集合,maxevents告之内核这个events有多大,这个maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)。该函数返回需要处理的事件数目,如返回0表示已超时。

二 工作模式

 epoll对文件描述符的操作有两种模式:LT(level trigger)ET(edge trigger)。LT模式是默认模式,LT模式与ET模式的区别如下:
  LT模式:当epoll_wait检测到描述符事件发生并将此事件通知应用程序,应用程序可以不立即处理该事件。下次调用epoll_wait时,会再次响应应用程序并通知此事件。
  ET模式:当epoll_wait检测到描述符事件发生并将此事件通知应用程序,应用程序必须立即处理该事件。如果不处理,下次调用epoll_wait时,不会再次响应应用程序并通知此事件。

1. LT模式

LT(level triggered)是缺省的工作方式,并且同时支持block和no-block socket.在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的。

2. ET模式

ET(edge-triggered)是高速工作方式,只支持no-block socket。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知,直到你做了某些操作导致那个文件描述符不再为就绪状态了(比如,你在发送,接收或者接收请求,或者发送接收的数据少于一定量时导致了一个EWOULDBLOCK 错误)。但是请注意,如果一直不对这个fd作IO操作(从而导致它再次变成未就绪),内核不会发送更多的通知(only once)

ET模式在很大程度上减少了epoll事件被重复触发的次数,因此效率要比LT模式高。epoll工作在ET模式的时候,必须使用非阻塞套接口,以避免由于一个文件句柄的阻塞读/阻塞写操作把处理多个文件描述符的任务饿死。

3. 总结

假如有这样一个例子:
1. 我们已经把一个用来从管道中读取数据的文件句柄(RFD)添加到epoll描述符
2. 这个时候从管道的另一端被写入了2KB的数据
3. 调用epoll_wait(2),并且它会返回RFD,说明它已经准备好读取操作
4. 然后我们读取了1KB的数据
5. 调用epoll_wait(2)......

LT模式:
如果是LT模式,那么在第5步调用epoll_wait(2)之后,仍然能受到通知。

ET模式:
如果我们在第1步将RFD添加到epoll描述符的时候使用了EPOLLET标志,那么在第5步调用epoll_wait(2)之后将有可能会挂起,因为剩余的数据还存在于文件的输入缓冲区内,而且数据发出端还在等待一个针对已经发出数据的反馈信息。只有在监视的文件句柄上发生了某个事件的时候 ET 工作模式才会汇报事件。因此在第5步的时候,调用者可能会放弃等待仍在存在于文件输入缓冲区内的剩余数据。

当使用epoll的ET模型来工作时,当产生了一个EPOLLIN事件后,
读数据的时候需要考虑的是当recv()返回的大小如果等于请求的大小,那么很有可能是缓冲区还有数据未读完,也意味着该次事件还没有处理完,所以还需要再次读取:

while(rs){
  buflen = recv(activeevents[i].data.fd, buf, sizeof(buf), 0);
  if(buflen < 0){
    // 由于是非阻塞的模式,所以当errno为EAGAIN时,表示当前缓冲区已无数据可读
    // 在这里就当作是该次事件已处理处.
    if(errno == EAGAIN){
        break;
    }
    else{
        return;
    }
  }
  else if(buflen == 0){
     // 这里表示对端的socket已正常关闭.
  }

 if(buflen == sizeof(buf){
      rs = 1;   // 需要再次读取
 }
 else{
      rs = 0;
 }
}

Linux中的EAGAIN含义

Linux环境下开发经常会碰到很多错误(设置errno),其中EAGAIN是其中比较常见的一个错误(比如用在非阻塞操作中)。
从字面上来看,是提示再试一次。这个错误经常出现在当应用程序进行一些非阻塞(non-blocking)操作(对文件或socket)的时候。

例如,以 O_NONBLOCK的标志打开文件/socket/FIFO,如果你连续做read操作而没有数据可读。此时程序不会阻塞起来等待数据准备就绪返回,read函数会返回一个错误EAGAIN,提示你的应用程序现在没有数据可读请稍后再试。
又例如,当一个系统调用(比如fork)因为没有足够的资源(比如虚拟内存)而执行失败,返回EAGAIN提示其再调用一次(也许下次就能成功)。

三 代码演示

下面是一段不完整的代码且格式不对,意在表述上面的过程,去掉了一些模板代码。

#define IPADDRESS   "127.0.0.1"
#define PORT        8787
#define MAXSIZE     1024
#define LISTENQ     5
#define FDSIZE      1000
#define EPOLLEVENTS 100

listenfd = socket_bind(IPADDRESS,PORT);

struct epoll_event events[EPOLLEVENTS];

//创建一个描述符
epollfd = epoll_create(FDSIZE);

//添加监听描述符事件
add_event(epollfd,listenfd,EPOLLIN);

//循环等待
for ( ; ; ){
    //该函数返回已经准备好的描述符事件数目
    ret = epoll_wait(epollfd,events,EPOLLEVENTS,-1);
    //处理接收到的连接
    handle_events(epollfd,events,ret,listenfd,buf);
}

//事件处理函数
static void handle_events(int epollfd,struct epoll_event *events,int num,int listenfd,char *buf)
{
     int i;
     int fd;
     //进行遍历;这里只要遍历已经准备好的io事件。num并不是当初epoll_create时的FDSIZE。
     for (i = 0;i < num;i++)
     {
         fd = events[i].data.fd;
        //根据描述符的类型和事件类型进行处理
         if ((fd == listenfd) &&(events[i].events & EPOLLIN))
            handle_accpet(epollfd,listenfd);
         else if (events[i].events & EPOLLIN)
            do_read(epollfd,fd,buf);
         else if (events[i].events & EPOLLOUT)
            do_write(epollfd,fd,buf);
     }
}

//添加事件
static void add_event(int epollfd,int fd,int state){
    struct epoll_event ev;
    ev.events = state;
    ev.data.fd = fd;
    epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&ev);
}

//处理接收到的连接
static void handle_accpet(int epollfd,int listenfd){
     int clifd;     
     struct sockaddr_in cliaddr;     
     socklen_t  cliaddrlen;     
     clifd = accept(listenfd,(struct sockaddr*)&cliaddr,&cliaddrlen);     
     if (clifd == -1)         
     perror("accpet error:");     
     else {         
         printf("accept a new client: %s:%d\n",inet_ntoa(cliaddr.sin_addr),cliaddr.sin_port);                       //添加一个客户描述符和事件         
         add_event(epollfd,clifd,EPOLLIN);     
     } 
}

//读处理
static void do_read(int epollfd,int fd,char *buf){
    int nread;
    nread = read(fd,buf,MAXSIZE);
    if (nread == -1)     {         
        perror("read error:");         
        close(fd); //记住close fd        
        delete_event(epollfd,fd,EPOLLIN); //删除监听 
    }
    else if (nread == 0)     {         
        fprintf(stderr,"client close.\n");
        close(fd); //记住close fd       
        delete_event(epollfd,fd,EPOLLIN); //删除监听 
    }     
    else {         
        printf("read message is : %s",buf);        
        //修改描述符对应的事件,由读改为写         
        modify_event(epollfd,fd,EPOLLOUT);     
    } 
}

//写处理
static void do_write(int epollfd,int fd,char *buf) {     
    int nwrite;     
    nwrite = write(fd,buf,strlen(buf));     
    if (nwrite == -1){         
        perror("write error:");        
        close(fd);   //记住close fd       
        delete_event(epollfd,fd,EPOLLOUT);  //删除监听    
    }else{
        modify_event(epollfd,fd,EPOLLIN); 
    }    
    memset(buf,0,MAXSIZE); 
}

//删除事件
static void delete_event(int epollfd,int fd,int state) {
    struct epoll_event ev;
    ev.events = state;
    ev.data.fd = fd;
    epoll_ctl(epollfd,EPOLL_CTL_DEL,fd,&ev);
}

//修改事件
static void modify_event(int epollfd,int fd,int state){     
    struct epoll_event ev;
    ev.events = state;
    ev.data.fd = fd;
    epoll_ctl(epollfd,EPOLL_CTL_MOD,fd,&ev);
}

//注:另外一端我就省了

四 epoll总结

在 select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而epoll事先通过epoll_ctl()来注册一 个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait() 时便得到通知。(此处去掉了遍历文件描述符,而是通过监听回调的的机制。这正是epoll的魅力所在。)

epoll的优点主要是一下几个方面:
1. 监视的描述符数量不受限制,它所支持的FD上限是最大可以打开文件的数目,这个数字一般远大于2048,举个例子,在1GB内存的机器上大约是10万左 右,具体数目可以cat /proc/sys/fs/file-max察看,一般来说这个数目和系统内存关系很大。select的最大缺点就是进程打开的fd是有数量限制的。这对 于连接数量比较大的服务器来说根本不能满足。虽然也可以选择多进程的解决方案( Apache就是这样实现的),不过虽然linux上面创建进程的代价比较小,但仍旧是不可忽视的,加上进程间数据同步远比不上线程间同步的高效,所以也不是一种完美的方案。

  1. IO的效率不会随着监视fd的数量的增长而下降。epoll不同于select和poll轮询的方式,而是通过每个fd定义的回调函数来实现的。只有就绪的fd才会执行回调函数。

如果没有大量的idle -connection或者dead-connection,epoll的效率并不会比select/poll高很多,但是当遇到大量的idle- connection,就会发现epoll的效率大大高于select/poll。

参考

用户空间与内核空间,进程上下文与中断上下文[总结]
进程切换
维基百科-文件描述符
Linux 中直接 I/O 机制的介绍
IO - 同步,异步,阻塞,非阻塞 (亡羊补牢篇)
Linux中select poll和epoll的区别
IO多路复用之select总结
IO多路复用之poll总结
IO多路复用之epoll总结

查看原文

赞 597 收藏 1054 评论 64

tfzh 关注了专栏 · 1月13日

Swoole

PHP的协程框架

关注 7100

tfzh 赞了文章 · 1月11日

红旗 Linux 桌面操作系统 11 来了

图片

红旗Linux桌面操作系统11将于1月10日开放预览版的下载,新版本具有良好的硬件兼容,支持多款国产自主CPU品牌,同时还具有丰富的外设支持及海量的易用生态软件,打造全新的UI设计风格,带来更灵动的视觉效果和便捷的操作体验。

红旗Linux桌面操作系统11兼容x86、ARM、MIPS、SW等CPU指令集架构;支持国产自主CPU品牌:龙芯、申威、鲲鹏、麒麟、飞腾、海光、兆芯。图片官方地址:http://www.chinaredflag.cn/

图片

更新日志:图片丰富的外设支持

图片

开机画面:

图片

关机效果:

图片

来源:https://news.mydrivers.com/1/...
查看原文

赞 2 收藏 0 评论 0

tfzh 赞了文章 · 1月11日

2020年终总结——前端入坑四年,今年实惨

写在前面

2020最后一天,我在KFC写总结。前几天就想写了,嗯,我是拖延症患者。😔 2020年,换工作、被裁员,体检一堆问题... 回顾我31年的人生,没有重点小学、没有重点初高中、专科学历、没有获奖经历、没有特殊技能、学习能力差、英语不好、拖延.... 加上现在算大龄程序员了吧,似乎我的人生拿了一把烂牌,可是我还是想赢啊。

我怎么走上了前端这条路?

11年毕业后,在工厂里打工,做技术员,跟IT无关。最好的4年光阴,感觉错付了,但是不后悔,每个人都有不同的经历。然后14年的时候,觉得这么下去我能看见我退休甚至死亡的时刻。所以我觉得我应该学点什么?好让我的人生稍微有点不同。 于是我买了大学接触过的单片机,想做硬件开发工程师哦。

(就是这个玩意)

所以要学汇编、学C(事实上我没学好,也能做个计时器啥的...) ,然后放弃了,这个不太好找工作。 然后学Java去了(因为上网找C语言资料的时候),学了一段时间后,大概到SSH框架吧,15年那时候来过一次杭州,面了一个做Java开发的,给了3500的工资(没去,那时候正在跟女朋友谈婚阶段了)。 后面16年初的时候接触了web前端(因为JSP要写HTML,查资料的时候web前端培训的广告挺火的),慎重考虑后,16年6月辞了工作(已经结婚了,老婆也支持),来杭州报了一个线下的培训班学习4个月,挺贵的,我还是觉得自己没学好。然后10月底的时候拿了个7K的offer,感觉月薪过万有希望了。 我就正式入坑了...^_^

四年的工作经历

第一份工作

16年10月份找工作,面了4家公司拿了2个offer,选择了这家做医疗检验的。工作了差不多三年。一开始去公司,改.net项目样式。后面前后端分离用JQ写项目。然后用Vue全家桶来做项目。这个阶段技术成长了很多。 也组建了一个8人的前端团队(特别感谢老板的信任,当初我买房还找他借了20多万周转,现在还欠着这个人情呢)。后面有想法做了一些工程化的工作,对于前端基建方面没有任何认知,团队管理方面也没有管理知识的支撑。

第二份工作

出于提升自己的目的来到这家公司的,结果跟我想的不太一样(虽然公司大了些,开发跟第一家公司也差不多),这一年成长太少了。好在学了算法。参加了早早聊,打开了视野。也在团队中也做了几次分享。

第三份工作

本打算好好工作的,奈何天不遂人愿,被裁了(为了融资扩招,然后裁员,找工作真的要擦亮眼啊)。三个多月的时间,简历被搞花了....

第四份工作(还在面试中......)

可能会选择去大一点的团队吧。

2020,我到底干了啥?

工作

7月份之前,主要还是写业务代码,不过从年初到7月,大概面试了有小百人吧,这个对我来说收获还是蛮大的。接触了从应届到工作10年工作经验、大小厂的各种前端大小朋友。最大的感觉就是:

  • 工作年限不等于工作能力
  • 平台牛不代表你牛
  • 基础任何阶段都不能拉下

7月份换工作的时候也不是我本意,本来是申请回到总部办公的。由于其他原因变成了离职。7月底的时候来了新公司(当时Scott建议我去丁香园的,考虑距离问题选择了这家),然后11月被裁了(为了融资扩招,可怜我试用期还没过呢)。内心委屈,毕竟是想好好上班的...
11月到现在,面试了9家大型互联网公司,暂时没有拿到offer,但是很有信心,我一定可以。 这个月面试的感触:

  • 基础知识要夯实
  • 要有前端广阔的视野,后端要有一定的认知
  • 要有某一方向的技术深度
  • 性能优化必问
  • 算法一定要刷

学习

这段时间刷了很多大厂面试题,做了一个整理,也是为了方便自己复习,有空就可以拿出来刷一刷。

HTML 和 CSS
  • 你如何理解 HTML 结构的语义化?
  • 谈谈以前端角度出发做好 SEO 需要考虑什么?
  • 有哪项方式可以对一个 DOM 设置它的 CSS 样式?
  • CSS 都有哪些选择器?
  • CSS 中可以通过哪些属性定义,使得一个 DOM 元素不显示在浏览器可视范围内?
  • 超链接访问过后 hover 样式就不出现的问题是什么?如何解决?
  • 什么是 Css Hack?ie6,7,8 的 hack 分别是什么?
  • 请用 Css 写一个简单的幻灯片效果页面
  • 行内元素和块级元素的具体区别是什么?行内元素的padding和margin可设置吗?
  • 什么是外边距重叠?重叠的结果是什么

......

JS基础
  • call 和 apply 的区别
  • b 继承 a 的方法
  • JavaScript this 指针、闭包、作用域
  • 事件委托是什么
  • 闭包是什么,有什么特性,对页面有什么影响
  • 如何阻止事件冒泡和默认事件
  • 添加 删除 替换 插入到某个接点的方法
  • javascript 的本地对象,内置对象和宿主对象
  • document load 和 document ready 的区别
  • “==”和“===”的不同
  • javascript 的同源策略
  • 编写一个数组去重的方法

Ajax
  • Ajax 是什么? 如何创建一个 Ajax?
  • 同步和异步的区别?
  • 如何解决跨域问题?
  • 页面编码和被请求的资源编码如果不一致如何处理?
  • 简述 ajax 的过程。
  • 阐述一下异步加载。
  • 请解释一下 JavaScript 的同源策略。
  • GET 和 POST 的区别,何时使用 POST?
  • Ajax 的最大的特点是什么。
  • ajax 请求的时候 get 和 post 方式的区别
  • 解释 jsonp 的原理,以及为什么不是真正的 ajax
  • http 常见的状态码有那些?分别代表是什么意思?
  • 一个页面从输入 URL 到页面加载显示完成,这个过程中都发生了什么?

......

JS高级
  • JQuery 一个对象可以同时绑定多个事件,这是如何实现的?
  • 知道什么是 webkit 么? 知道怎么用浏览器的各种工具来调试和 debug 代码么?
  • 如何测试前端代码么? 知道 BDD, TDD, Unit Test 么? 知道怎么测试你的前端工程么(mocha, sinon, jasmin, qUnit..)
  • 前端 templating(Mustache, underscore, handlebars)是干嘛的, 怎么用?
  • 简述一下 Handlebars 的基本用法?
  • 简述一下 Handlerbars 的对模板的基本处理流程, 如何编译的?如何缓存的?
  • 用 js 实现千位分隔符?
  • 检测浏览器版本版本有哪些方式?
  • 我们给一个 dom 同时绑定两个点击事件,一个用捕获,一个用冒泡,你来说下会执
  • 行几次事件,然后会先执行冒泡还是捕获

......

Vue
  • vuex 有哪几种属性?
  • vuex 的 State 特性是?
  • vuex 的 Getter 特性是?
  • vuex 的 Mutation 特性是?
  • Vue.js 中 ajax 请求代码应该写在组件的 methods 中还是 vuex 的 actions 中?
  • 什么是 MVVM?
  • mvvm 和 mvc 区别?它和其它框架(jquery)的区别是什么?哪些场景适合?
  • vue 的优点是什么?
  • 组件之间的传值?
  • vue.cli 中怎样使用自定义的组件?有遇到过哪些问题吗?
  • vue 如何实现按需加载配合 webpack 设置
  • Vue 中引入组件的步骤?
  • 指令 v-el 的作用是什么?
  • 在 Vue 中使用插件的步骤
  • vue 生命周期的作用是什么
  • vue 生命周期总共有几个阶段
  • 第一次页面加载会触发哪几个钩子
  • DOM 渲染在 哪个周期中就已经完成
  • 简单描述每个周期具体适合哪些场

浏览器
  • 跨标签页通讯
  • 浏览器架构
  • 浏览器下事件循环(Event Loop)
  • 从输入 url 到展示的过程
  • 重绘与回流
  • 存储
  • Web Worker
  • V8 垃圾回收机制
  • 内存泄露
  • reflow(回流)和 repaint(重绘)优化
  • 如何减少重绘和回流?
  • 一个页面从输入 URL 到页面加载显示完成,这个过程中都发生了什么?
  • localStorage 与 sessionStorage 与 cookie 的区别总结
  • 浏览器如何阻止事件传播,阻止默认行为

生活

平平淡淡,两个姑娘越来越漂亮了 这个月没上班,接送大姑娘上下学、去培训班。现在的小孩子要学的可真多,我3岁多的时候应该在捏泥巴吧。 遗憾的是,因为疫情的原因没能带老婆孩子出去玩玩,明年补上吧。

2021,一定会更好的

许多事情要提上日程了,健身、英语、算法、理财、旅游、装修........想做的事情很多。

  • [ ] 多写一些总结,多复盘【每月输出2篇文档】
  • [ ] 算法【每日打卡,要弄懂】
  • [ ] 一次旅游 【但愿疫情彻底过去吧】
  • [ ] 读书【尽量每月1本吧】
  • [ ] 装修【房子交付要装修了】
  • [ ] 健身 【动起来,体检好几项都不行,得关注一下身体健康了】
  • [ ] 英语【背背单词】
  • [ ] 理财 【暂定吧】
  • [ ] ....

2021年,走的慢一点没关系,千万不能停!未来一定会更好的。

最后

前端工作四年多点了,好在没有放弃。虽然离优秀还很远,但贵在一直坚持。技术慢慢在提升、视野越来越开阔、薪水也比刚做前端时多了3倍多,我没法跟那些优秀的人比,我只能跟自己比,今天的我比昨天的我进步了一点就很开心了。 2021年,一定要读完的3本书《刻意练习》、《复盘》、《戒了吧,拖延症》。 愿自己越来越好。

查看原文

赞 21 收藏 7 评论 9

tfzh 赞了文章 · 1月11日

axios用法总结

通过向axios传递相关配置项 ( confg对象 ),创建axios请求:

axios({
    method: "post",
        url: “/user”,
        data: { firstName: "Fred", lastName: "Flintstone" },
        params: { id: 1234}
})

axios请求方法别名

axios.request(config)
axios.get(url[, config])
axios.delete(url[, config])
axios.head(url[, config])
axios.options(url[, config])
axios.post(url[, data[, config]])
axios.put(url[, data[, config]])
axios.patch(url[, data[, config]])

这一段,信息量很大:
比如: axios.post(url[, data[, config]])
axios.post()的第一个参数是url,第2个参数是data参数(即post的body),第三个参数是config对象。

通过观察,使用别名时,method,url 和 data都不必在最后一个参数config对象中指定。

axios.all() 参数是可迭代对象,array,set 或者 map等
axios.spread() 参数是一个回调函数
axios.create( ) 参数是一个config对象

全局的axios默认配置:

axios.defaults.baseURL = "http://www.123.com/"
axios.defaults.headers.post['Content-Type'] = 'application/x-www-urlencoded'

当实例创建后,也可修改实例的默认配置

axiosInstance.defaults.headers.common['Autorization'] = AUTH_TOKEN

配置的优先级:

传入的config中的配置 > instance.defaults设置的配置 > axios.create() 中传入的配置

axios错误处理:
config配置项中加入了validateStatus后,会出现下面改变

validateStatus: (status) => {
      // return status <= 200 // e.response 会被打印出来
      return status <= 500 //e.response 不会被打印出来
    }

即:
validateStatus 函数返回true时,response会出现在 instance.interceptors.response.use(successCallBack, errorCallBack) 的successCallBack;为 false时,会出现在 errorCallBack中。

所以,拦截非200的请求可以这么做:

validateStatus: (status) => {
      return status === 200
    }

将axios请求中的token用批量替换的方式进行替换

instance.interceptors.request.use((config) => {
    let url = config.url
    if(url.includes("{token}")) {
      config.url = url.replace("{token}", token)
    }
    return config
  }, (error) => {
    console.log("error 结果", error)
  })

当参数要以application / x-www-form-urlencoded格式发送数据。可以有以下方法(测试过):
1,借用qs.stringify,请求的headers的Content-Type会自动改变为 application / x-www-form-urlencoded 数据也会变成url的格式
2,使用formData,看代码:
上面和下面的方式,axios.post传入params,都可以得到想要的结果

let params = new FormData()
params.append('name', obj.brokerCode)
params.append('longLink', obj.longLink)

const params = qs.stringify({
    name: obj.brokerCode,
    longLink: obj.longLink
})

以上就是axios请求,后面在使用过程中遇到问题,会继续补充。

查看原文

赞 2 收藏 0 评论 0

tfzh 发布了文章 · 1月9日

C 进程间通讯

先大概的总结一下,及基本的用法,其中每一个,展开都可以写一篇文章的

  • 匿名管道(Pipe)
  • 命名管道(Named pipe)
  • 消息队列 (Message Queues)
  • 信号 (Signal)
  • 信号量 (Semaphore)
  • 共享内存 (Shared Memory)
  • Socket

1.1匿名管道(Pipe)

//1.1匿名管道通讯
#include <stdio.h>
#include <unistd.h>
#include <string.h>
int main(int argc, char const *argv[])
{
    pid_t my_pid = getpid();
    char buffer[256];
    memset(buffer, 0, sizeof(buffer));
    int fildes[2];//fildes[0]接受数据,fildes[1]发送数据
    
    if (-1 == pipe(fildes)) {
        printf("创建管道失败\n");
        return 1;
    }
    int pid = fork();
    printf("fork result:%d\n",pid);

    if (pid == 0) { //子进程
        my_pid = getpid();//更新当前pid
        printf("子进程:%d: sleep 5s\n",my_pid);
        sleep(5);
        int nbyte = write(fildes[1], "你好呀,我是子进程", strlen("你好呀,我是子进程"));
        printf("子进程:%d:写了%d字节\n",my_pid, nbyte);
        close(fildes[1]);
    } else {
        my_pid = getpid();//更新当前pid
        read(fildes[0], buffer, sizeof(buffer));
        printf("父进程:%d:读了内容=>[%s]\n",my_pid, buffer);
        close(fildes[0]);
        printf("父进程:%d:sleep 5s\n",my_pid);
        sleep(5);
    }    
    
    return 0;
}

Res:

terence@k8s-master:/mydata/linux$ ./a.out
fork result:55440
fork result:0
子进程:55440
子进程:55440:写了27字节
父进程:55439:读了内容=>[你好呀,我是子进程]
父进程:55439

//与此同时另一个ssh窗口查看进程(因为代码中故意sleep了)
terence@k8s-master:/mydata/linux$ ps -ef | grep 'a.out'
terence   55439  97831  0 15:21 pts/0    00:00:00 ./a.out
terence   55440  55439  0 15:21 pts/0    00:00:00 ./a.out

其中ssh进程号是97831,测试进程a.out是55439,fork的子进程号为55440

1.2命名管道(Named pipe)

//1.2命名管道通讯
terence@k8s-master:/mydata/linux$ mkfifo named_pipe_1
terence@k8s-master:/mydata/linux$ ll
drwxrwxr-x  2 terence terence 4096 Jan  8 16:35 ./
drwxrwxrwx 20 root    root    4096 Jan  8 14:20 ../
prw-rw-r--  1 terence terence    0 Jan  8 16:35 named_pipe_1|
1.2.1 named_pipe_write.c
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <fcntl.h>
int main(int argc, char const *argv[])
{
    //O_RDWR   读写打开 不会阻塞
    //O_RDONLY 只读 会阻塞
    //O_WRONLY 只写 会阻塞
    char buf[255]="";
    int fd_write=open(argv[1],O_WRONLY);
    printf("fd_write=%d\n",fd_write);
    write(fd_write,"hello im writer",strlen("hello im writer"));
    printf("写入 hello im writer\n");
    return 0;
}
1.2.2 named_pipe_read.c
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <fcntl.h>
int main(int argc, char const *argv[])
{
    //O_RDWR   读写打开 不会阻塞
    //O_RDONLY 只读 会阻塞
    //O_WRONLY 只写 会阻塞
    char buf[255]="";
    int fd_read=open(argv[1],O_RDONLY);
    printf("fd_read=%d\n",fd_read);
    read(fd_read,buf,sizeof(buf));
    printf("read data:%s\n",buf);
    return 0;
}

Res:

terence@k8s-master:/mydata/linux$ ./write named_pipe_1  

---------------------------------------------------------------------------------------------------------------------------
阻塞......                                                            terence@k8s-master:/mydata/linux$ terence@k8s-master:/mydata/linux$./read named_pipe_1
---------------------------------------------------------------------------------------------------------------------------
fd_write=3
写入 hello im writer
---------------------------------------------------------------------------------------------------------------------------
                                                                      fd_read=3
                                                                      read data:hello im writer

1.3 消息队列 (Message Queues)

1.3.1 msg_send.c
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <string.h>
//这个结构体可以自己定义,但必须要有一个消息类型type
typedef struct _msg
{
    long mtype;
    char mtext[50];
}MSG;
int main(int argc, char const *argv[])
{
    key_t key;
    int   msgqid;
    MSG   msg;

    key = ftok(".", 2021); // key 值
    // 创建消息队列
    msgqid = msgget(key, IPC_CREAT|0666);
    if(msgqid == -1)
    {
        perror("msgget");
        exit(-1);
    }

    msg.mtype = 100;    //消息类型
    strcpy(msg.mtext, "hello world!"); // 正文内容

    /* 添加消息
    msg_id:消息队列标识符
    &msg:消息结构体地址
    sizeof(msg)-sizeof(long):消息正文大小
    0:习惯用0
    */
    msgsnd(msgqid, &msg, sizeof(msg)-sizeof(long), 0);
    printf("send: %s\n", msg.mtext);

    return 0;
}
1.3.2 msg_read.c
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <string.h>
//这个结构体可以自己定义,但必须要有一个消息类型type
typedef struct _msg
{
    long mtype;
    char mtext[50];
}MSG;
int main(int argc, char const *argv[])
{
    key_t key;
    int   msgqid;
    MSG   msg;
    
    key = ftok(".", 2021); // key 值
    // 创建消息队列
    msgqid = msgget(key, IPC_CREAT|0666);
    printf("msgqid=%d\n", msgqid);
    if(msgqid == -1)
    {
        perror("msgget");
        exit(-1);
    }

    memset(&msg, 0, sizeof(msg));
    /* 取出类型为 10 的消息
    msg_id:消息队列标识符
    &msg:消息结构体地址
    sizeof(msg)-sizeof(long):消息正文大小
    (long)10:消息的类型
    0:习惯用0
    */
    msgrcv(msgqid, &msg, 10000, 100, 0);
    printf("msg.mtext=%s\n", msg.mtext);

    // 把消息队列删除
    // IPC_RMID:删除标志位
    //msgctl(msgqid, IPC_RMID, NULL);
    return 0;
}

Res:

//msg_send
terence@k8s-master:/mydata/linux$ gcc -o msg_send msg_send.c
terence@k8s-master:/mydata/linux$ ./msg_send
send: hello world!

//查看队列
terence@k8s-master:/mydata/linux$ ipcs -q

------ Message Queues --------
key        msqid      owner      perms      used-bytes   messages    
0xe5013e9b 0          terence    666        56           1

//read
terence@k8s-master:/mydata/linux$ gcc -o msg_read msg_read.c
terence@k8s-master:/mydata/linux$ ./msg_read
msg.mtext=hello world!

1.3 信号 (Signal)

#include <stdio.h>
#include <unistd.h>
#include <signal.h>
#include <stdlib.h>
 
void SIGINT_handle(int sig_num)
{
    //Ctrl + C 捕获
    printf("reveice SIGINT signal %d \n", sig_num);
}
void SIGQUIT_handle(int sig_num)
{
    //Ctrl + \ 捕获
    printf("reveice SIGQUIT signal %d \n", sig_num);
}
 
int main(int argc, char argv[])
{
    int i = 0;
    signal(SIGINT, SIGINT_handle);
    signal(SIGQUIT, SIGQUIT_handle);
    while(i < 10){
        printf("wait signal\n");
        sleep(5);
        i++;
    }
    return 0;
}

Res:

terence@k8s-master:/mydata/linux$ gcc -o signal signal.c
terence@k8s-master:/mydata/linux$ ./signal
wait signal
^Creveice SIGINT signal 2 
wait signal
^\reveice SIGQUIT signal 3
---------------------------------------------------------------------------------------------------------------------------
                                                                    //也可以在别的窗口命令行kill发送信号
                                                                    terence@k8s-master:/mydata/linux$ ps -ef | grep signal
                                                                    terence@k8s-master:/mydata/linux$ kill -SIGINT 72553
---------------------------------------------------------------------------------------------------------------------------
wait signal
Killed

1.4 信号量 (Semaphore)

1.4.1 sem.c 操作信号量
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/ipc.h>
#include <sys/sem.h>
// struct sembuf{
//     short sem_num; // 除非使用一组信号量,否则它为0
//     short sem_op;  // 信号量在一次操作中需要改变的数据,通常是两个数,一个是-1,即P(等待)操作,
//                    // 一个是+1,即V(发送信号)操作。
//     short sem_flg; // 通常为SEM_UNDO,使操作系统跟踪信号,
//                    // 并在进程没有释放该信号量而终止时,操作系统释放信号量
// };
union semun
{
 int             val;
 struct semid_ds *buf;
 unsigned short  *array;
 struct seminfo  *__buf; 
};
int main(int argc, char const *argv[])
{
    key_t key;
    int   semid;
    int   num;
    struct sembuf sem_b;
    union semun   sem_args;
    unsigned short array[1]={1};
    int op;

    sem_args.array = array;
    
    key = ftok(".", 2021); // key 值
    printf("key=%d\n", (int)key);
    printf("semid=%d\n", semid);

    
    // 创建信号量
    semid = semget(key, 1, IPC_CREAT|0666);

    semctl(semid, 0, SETALL, sem_args);//0代表对1个信号来量初始化,即有1个资源
    while(1){
        num = rand() % 100 + 1;

        if (num > 50)
        {
            op = 1;
        }else{
            op = -1;
        }
        sem_b.sem_num = 0;
        sem_b.sem_op = op;//P()
        sem_b.sem_flg = SEM_UNDO;
        semop(semid, &sem_b, 1);
        printf("set semval=%d\n", op);
        sleep(3);
    }
    return 0;
}
1.4.2 sem_client.c 读取信号量
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <sys/ipc.h>
#include <sys/sem.h>
int main(int argc, char const *argv[])
{
    key_t key;
    int   semid;
    int   semval;
    
    key = ftok(".", 2021); // key 值
    printf("key=%d\n", (int)key);
    printf("semid=%d\n", semid);
    // 创建信号量
    semid = semget(key, 1, IPC_CREAT|0666);

    while(1){
        semval = semctl(semid, 0, GETVAL, 0);
        printf("semval=%d\n", (int)semval);
        sleep(3);
    }
    return 0;
}

Res:

//set
terence@k8s-master:/mydata/linux$ gcc -o sem sem.c
terence@k8s-master:/mydata/linux$ ./sem
key=-452903269
semid=0
set semval=1
set semval=-1
set semval=1
set semval=-1
set semval=1
...

//查看
terence@k8s-master:~$ ipcs -s

------ Semaphore Arrays --------
key        semid      owner      perms      nsems     
0xe5013e9b 0          terence    666        1         

terence@k8s-master:~$

//get
terence@k8s-master:/mydata/linux$ gcc -o sem_client sem_client.c
terence@k8s-master:/mydata/linux$ ./sem_client
key=-452903269
semid=0
semval=1
semval=2
semval=3
semval=4
...

1.5 共享内存 (Shared Memory)

1.5.1 mem_write.c 写入共享内存
#include <sys/shm.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
int main(int argc, char const *argv[])
{
    int   shmid;
    key_t key;
    //创建共享内存的key
    key = ftok(".", 2021); // key 值
    if (key == -1)
    {
        perror("ftok error");
    }
    // 创建1024字节的共享内存
    shmid = shmget(key, 1024, IPC_CREAT|0666);
    printf("shmid=%d\n", shmid);

    // 共享内存连接到本进程某个指针
    char *shmadd;
    shmadd = shmat(shmid, NULL, 0);

    char *msg;
    msg = "Hello World!";
    printf("set data to shared-memory :%s\n", msg);
    // Set data
    strcpy(shmadd, msg);
    // 分离
    shmdt(shmadd);
    return 0;
}
1.5.2 mem_read.c 读取共享内存
#include <sys/shm.h>
#include <stdio.h>
#include <sys/ipc.h>
#include <stdlib.h>
#include <strings.h>
int main(int argc, char const *argv[])
{
    int   shmid;
    key_t key;
    //创建共享内存的key
    key = ftok(".", 2021); // key 值
    if (key == -1)
    {
        perror("ftok error");
    }
    // 创建|打开1024字节的共享内存
    shmid = shmget(key, 1024, IPC_CREAT|0666);
    printf("shmid=%d\n", shmid);

    // 共享内存连接到本进程某个指针
    char *shmadd;
    shmadd = shmat(shmid, NULL, 0);

    printf("read data from shared-memory :%s\n", shmadd);

    // 分离
    shmdt(shmadd);

    // 删除共享内存
    shmctl(shmid, IPC_RMID, NULL);
    return 0;
}

1.6 Socket

这里使用TCP示例

1.6.1 socket_server.c 服务端
#include <sys/types.h>
#include <sys/socket.h>
#include <stdio.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <fcntl.h>
#define MYPORT  8887
#define QUEUE   20
#define BUFFER_SIZE 1024
int main(int argc, char const *argv[])
{
    //定义sockfd
    int server_socket = socket(AF_INET,SOCK_STREAM, 0);
    //定义sockaddr_in
    struct sockaddr_in server_sockaddr;
    memset(&server_sockaddr, 0, sizeof(server_sockaddr));  //每个字节都用0填充

    server_sockaddr.sin_family      = AF_INET;
    server_sockaddr.sin_port        = htons(MYPORT);
    server_sockaddr.sin_addr.s_addr = htonl(INADDR_ANY);

    //bind,成功返回0,出错返回-1
    if(bind(server_socket,(struct sockaddr *)&server_sockaddr,sizeof(server_sockaddr))==-1)
    {
        perror("bind");
        exit(1);
    }
    printf("blind...\n");

    //listen,成功返回0,出错返回-1
    if(listen(server_socket,QUEUE) == -1)
    {
        perror("listen");
        exit(1);
    }
    printf("listen...\n");

    //客户端套接字
    char   buffer[BUFFER_SIZE];
    struct sockaddr_in client_addr;
    socklen_t length = sizeof(client_addr);
 
    int conn;
    //成功返回非负描述字,出错返回-1
    conn = accept(server_socket, (struct sockaddr*)&client_addr, &length);
    if (conn == -1)
    {
        printf("accept error....\n");
        close(server_socket);
        exit(1);
    }
    printf("read socket=%d\n",conn);
    memset(buffer,0,sizeof(buffer));
    int len = read(conn, buffer, sizeof(buffer));
    printf("read data from %d:%s \n", conn, buffer);
    memset(buffer,0,sizeof(buffer));
    strcpy(buffer, "我收到啦!");
    write(conn, buffer, sizeof(buffer));
    printf("send data to %d:%s \n", conn,buffer);
    printf("close....\n");
    close(conn);
    close(server_socket);
    return 0;
}
1.6.2 socket_client.c 服务端
#include <sys/types.h>
#include <sys/socket.h>
#include <stdio.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <fcntl.h>
 
#define MYPORT  8887
#define BUFFER_SIZE 1024
 
int main()
{
    ///定义sockfd
    int sock_cli = socket(AF_INET,SOCK_STREAM, 0);
 
    ///定义sockaddr_in
    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(MYPORT);  ///服务器端口
    servaddr.sin_addr.s_addr = inet_addr("127.0.0.1");  ///服务器ip
 
    ///连接服务器,成功返回0,错误返回-1
    if (connect(sock_cli, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0)
    {
        perror("connect");
        exit(1);
    }
    printf("connected!\n");
    char sendbuf[BUFFER_SIZE];
    char recvbuf[BUFFER_SIZE];

    memset(sendbuf , 0,  sizeof(sendbuf));
    strcpy(sendbuf, "你好啊,我是client!");
    write(sock_cli, sendbuf, strlen(sendbuf)); ///发送
    printf("send data %s \n", sendbuf);

    memset(recvbuf , 0,  sizeof(recvbuf));
    read(sock_cli, recvbuf, sizeof(recvbuf)); ///接收
    printf("get data %s \n", recvbuf);
    printf("close....\n");
    close(sock_cli);
    return 0;
}

Res:

//server
terence@k8s-master:/mydata/linux$ gcc -o socket_server socket_server.c
terence@k8s-master:/mydata/linux$ ./socket_server
blind...
listen...
read socket=4
read data from 4:你好啊,我是client! 
send data to 4:我收到啦! 
close....

//client
terence@k8s-master:/mydata/linux$ gcc -o socket_client socket_client.c
terence@k8s-master:/mydata/linux$ ./socket_client
connected!
send data 你好啊,我是client! 
get data 我收到啦! 
close....~~~~
查看原文

赞 1 收藏 1 评论 0

tfzh 赞了文章 · 1月8日

go mod graph 可视化——gmchart

背景

之前构建 golang 遇到个问题,就是明明指定了依赖的包版本,在构建时,又自动把版本号给升上去了,当时不知道为什么。后面知道有个 go mod graph 的命令能列出所有的依赖,试了下,一点都不直观,还得复制到文本编辑器里面来来检索信息。

于是我就上网查了下相关工具。

别的工具

检索 Go Module 依赖关系 可视化

网上还是有很多工具。大部分方案都 graphvizechart, 或者用绘制图片,我就试了几个。

graphviz 这个工具很强大,但需要通过其他渠道安装,且生成的svg可读性也不那么好,特别是依赖的包多起来之后,例如下面这个:

go mod graphviz

于是乎我转向 echart,

哇哦,看着挺不错的,鼠标悬停反馈也不错,配色也很鲜艳。

go mod echart

echart 方案很漂亮,但没法用。svg我还能搜索包名,echart 这个不显示包名。

好了,吐槽归吐槽,问题还是要解决的。今天给大家介绍个 go mod graph 可视化工具—— gmchart

gmchart

github: https://github.com/PaulXu-cn/...

安装

go get -u github.com/PaulXu-cn/go-mod-graph-chart/gmchart

检查安装情况,如下就是成功了

gmchart --help

Usage of ~\go\bin\gmchart:
  -debug int
        is debug model
  -keep int
        start http server not exit

使用

进入 golang 项目,输入命令:

go mod graph | gmchart
会自动打开浏览器,如果没有就手动一下

访问 http://127.0.0.1:60306 就能看到

可以看到,它将依赖形成了一个 依赖树,你可以知道某个包在第几层被引入的,非常直观。网页内是一个svg,你要找某个包也非常方便,直接 ctr+F 就能检索。

总结

找了那么久,为啥还是没有一个趁手的工具呢?

我想了下,可视化是前端的擅长的啊,前端不用 golang 啊,会的也少,会 Golang 的前端,还对 go mod graph 这个功能感兴趣的就更少了,所以这个工具只能是我们后端来做了

哎~

其他方案

查看原文

赞 4 收藏 3 评论 0

认证与成就

  • 获得 62 次点赞
  • 获得 6 枚徽章 获得 0 枚金徽章, 获得 1 枚银徽章, 获得 5 枚铜徽章

擅长技能
编辑

开源项目 & 著作
编辑

(゚∀゚ )
暂时没有

注册于 2016-09-22
个人主页被 2.1k 人浏览