张攀钦

张攀钦 查看完整档案

南京编辑  |  填写毕业院校  |  填写所在公司/组织 www.mflyyou.cn/ 编辑
编辑
_ | |__ _ _ __ _ | '_ \| | | |/ _` | | |_) | |_| | (_| | |_.__/ \__,_|\__, | |___/ 该用户太懒什么也没留下

个人动态

张攀钦 发布了文章 · 8月9日

你需要知道的TCP/IP

前言

TCP/IP 协议 是网络通信的基石,TCP/IP 协议 不是只有 TCPIP 协议,它是整个网络通信中所有协议的简称。

维基百科:TCP/IP协议簇)

维基百科:OSI模型

# TCP/IP 参考模型维基百科
https://zh.wikipedia.org/wiki/TCP/IP%E5%8D%8F%E8%AE%AE%E6%97%8F
# OIS 参考模型维基百科
https://zh.wikipedia.org/wiki/OSI%E6%A8%A1%E5%9E%8B

<img data-original="http://oss.mflyyou.cn/blog/20200801104517.png?author=zhangpanqin" alt="image-20200801104517510" style="zoom:50%;" />

图片来自 《图解 TCP/IP 与 OSI 参考模型》 中 TCP/IP 协议分层模型

OSI 参考模型 (七层)是个理论模型,实际我们用的是 TCP/IP (四层)模型。不过我们可以通过 OSI 参考模型 来学习 TCP/IP 模型。

应用层:应用程序通信细节的协议,比如常用的 HTTP

传输层:主要是负责两个节点之间数据传输,通信标识是 port 端口号。

网络层:地址管理和路由选择,在两点之间找到一条最佳的通信路线,通信标识是 IP

数据链路层:负责物理层面链接的通信(同一个网段内)。也就是局域网中通过交换机链接的节点。通信标识是 Mac 地址,网卡出厂自带的标识。

物理层:将链路层的数据帧(字节流)转换为电压或光信号传播。

网络通信可以做什么呢?

redisson (一个操作 redis 的 java 库),就是使用的 netty 来做网络通信连接 redis 服务的。

微服务中的服务发现和通信,就需要你熟悉网络通信。

你要是在通信行业,那就不是了解了,你连协议的规范都得很清楚,不然路由器你都整不出来,还说什么 5G

作为一个 Java 后端开发,主要是开发偏应用层面的程序,离底层相对比较远,熟练掌握即可,如果以后做通信行业的时候,你也一定会进一步学习的相关细节的。

TCP/IP 你不了解,也不会有多大问题,CRUD 还是没有问题的。但是你了解了之后,日常开发定位和解决问题方面有很大助力,总之学习 TCP/IP 是一个重要不紧急的事情,根据自己目标和层次安排。

本文内容

  • 局域网中各节点怎么通信
  • 介绍 IP,ICMP,ARP 协议在网络层的作用及路由表的作用,及网段划分,子网掩码、网关的作用
  • 介绍交换机和路由器的作用
  • 介绍 TCP/IP 三次握手和四次挥手,TCP 中通信状态的作用,滑动窗口
  • 介绍 tcp 包格式,ip 包格式,链路层 数据格式

交换机与路由器

交换机

维基百科:交换机)

交换机上有多个端口(不是 port)供计算机连接,交换机会维护端口与连接这个端口的 PC 的 Mac 地址映射表。当交换机接受到数据的时候,会根据目的 Mac 地址,发送到对应的端口上,然后经过网线发送到目的 PC。

交换机链接多个电脑组成一个局域网,交换机链接交换机又可以组成一个更大的局域网。

比如 A、B 交换机各有 100 个端口,A 链接了 99 个PC,然后 B 交换机链接99 个,再将其中的一个端口 A/B 之间相互连接组成一个更大的局域网。

路由器

维基百科:路由器)

路由器工作在网络层,主要用于将一个网段数据包转发到另一个网段内。路由器上也会有个几个 LAN 口 (Local Area Network,局域网),用于建立局域网。还会有一个 WAN(Wide Area Network,广域网),连接运营商的网络。

路由器也具有交换机的功能,只是 LAN 口 比较少,可以接入的电脑比较少。

PC 或者 手机 连接无线路由器时也会给 PC 分配一个局域网 IP,子网掩码,网关等。

我住的地方的网络拓扑图如下:未命名文件

当手机与电脑通信的时候,实际通过 LAN 口走局域网通信。

当手机访问 维基百科 时,实际是通过路由器跳入到光猫网段,再通过光猫跳入到小区运营商的网络,… 到维基百科的服务器上。

只要需要有 IP 地址的设备(光猫,路由器,PC,手机)都需要有网卡,网卡出厂自带有 Mac 地址。IP 和 Mac 地址的作用后文中会介绍。

image-20200801144243065

交互机和路由器的区别

<font color=red>这部分内容是我自己的理解,我没有在网上找到资料佐证,请谨慎对待</font>

其实交换机和路由器硬件差别不大,只是硬件上的软件决定了它能做什么。

2 层交换机上的软件(只有数据链路层)可能只做解析帧,拿到 mac 地址,然后查找当前交换机的端口对应的 mac 地址,然后从对应的端口传递过去。

路由器(有网络层和数据链路层),当拿到数据包的时候,发现目的 mac 地址不是自己,就会将数据包通过 LAN 口发送出去。

当发送的数据包的 目的 MAC 地址 是当前路由器上 MAC地址 ,路由器就会对其解包,拿到数据包 目的 IP ,然后根据 目的 IP 匹配下一跳 mac 地址,封包为新的帧数据发送出去。

TCP/IP 通信

TCP_IP 同一以太网 (2)

从发送端发送数据的时候,数据经过每层的封包,经物理层传送到接收端。接收端收到数据包,一层一层进行拆包,然后将数据数据发送给我接收端的应用层的应用程序。

通常我们说的第一层就是 物理层 ,第二层是 链路层 …...

数据链路层

image-20200801220255714

源 MAC 地址 就是发送端的 MAC 地址,目标 MAC 地址不是最终的 MAC 地址,是下一跳节点的 MAC 地址。

类型 指的是这个以太网帧中的 数据 是何种类型的数据,比如 IPV4,IPV6。然后调用对应的接口进行处理。

数据链路层传输的帧是有大小限制的(64-1518 字节),能传输的数据的最大值就是 最大传输单元,简称 MTUMaximum Transmission Unit。这个值在以太网中通常是 1500。

# 查看网卡对应的 MTU
ifconfig -a
netstat -i

网络层

网络层主要以 IP 协议为主,也有 ICMP,ARP(在 TCP\IP 模型 中,arp 属于网络层。在 osi 七层模型arp 数据链路层。)。

DNS

IP 是网络层通信的标识。但是 IP 不容易记忆,所以出现了 域名

访问 DNS 可以将域名解析为 IP

可以在本地配置 host ,定义域名和 IP 对应关系,这样就不用解析了。

也可以在电脑配置 DNS 解析时访问的 ip,这样域名解析时就会访问这个服务。

<img data-original="http://oss.mflyyou.cn/blog/20200801182357.png?author=zhangpanqin" alt="image-20200801182357581" style="zoom:50%;" />

# 解析域名的 ip
dig www.mflyyou.cn

IP 基础

IP 地址 又可以分为 IPV4IPV6,目前使用比较广的是 IPV4 ,所以只介绍 IPV4

IP 地址 由 32 (2 进制)位组成,32 位被 . 分为了四组。每组 8 位,十进制表示就是 xxx.xxx.xxx.xxx(xxx 取值在 0-255)。

IP 地址网络地址 (网段) 和 主机号

同一个网段的电脑用 2 层交互机相连,然后就可以局域网通信了。

同一个网段内,主机号不能重复,重复主机号的电脑不能上网。

为了便于区分出 IP 在那个网段,引入了子网掩码 (netmask)。IP 地址与子网掩码按位与计算可以得出网段,32 位 中取出网段所在的位,剩余就是主机号能取得值。

IP 中主机号全为 0 就是网段,全为 1 就是广播地址。这两个是不能被分配给电脑的。

IP:192.168.202.116

子网掩码:255.255.252.0

网段为:192.168.200.0

广播地址为:192.168.203.255

IP:192.168.201.56

子网掩码:255.255.252.0

网段为:192.168.200.0

广播地址为:192.168.203.255

ICMP

网络层是不可靠传输,发送失败的数据包,网络层是不会再发一次数据包,但是会有 ICMP 包回复告诉你发包到底是什么问题。传输层 可以根据 ICMP 来判断是否需要重发包。

ARP

ARP 用于 IP 的 对应的MAC 地址。

目的 IP 在路由表中查询下一跳的 IP,在查询这个 IP 对应的 mac 地址

查询的这个 IP 是当前网段内的 ip,它会通过广播地址发送给当前网段内所有主机,收到这个协议的主机会判断是否是当前主机,是的话就会恢复当前 ip 对应的 MAC 地址。

image-20200801223925086

通信过程分析

未命名文件

当我在浏览器输入 wwww.mflyyou.cn 的时候:

1、先解析域名(DNS) www.mflyyou.cnIP (目的 IP: 47.104.168.20)

2、将目的 IP 与本地路由表中的子网掩码进行按位与,计算出网段与 Destination 匹配,看哪个匹配度更高,走哪个条目。都没有匹配到走默认条目(0.0.0.0)

# 查看路由表
route -n
Kernel IP routing table
Destination     Gateway         Genmask         Flags Metric Ref    Use Iface
0.0.0.0         192.168.31.1    0.0.0.0         UG    100    0        0 eth0

3、然后用 arp 查询(有缓存可不查,走缓存)192.168.31.1 对应的 mac 地址

4、数据链路层封装以太网帧数据包中的目的 MAC 地址址就是 192.168.31.1 对应的 mac 地址,然后将数据帧发送到下一个节点(这也就常说的下一跳,数据包发送只是找到当前接节点的下一个节点)

5、到下一个路由器节点,路由器解包,看是发给自己的数据包(根据帧中的目的 MAC 地址与自己的 MAC 地址比较),不是就丢弃了;是的话就会解包拿到 目的 IP (47.104.168.20),然后在当前路由器上根据路由表查询下一跳,发送给下一个节点;。。。。 直到目的服务器,或者发送的包 TTL 为 0

6、发到目的服务器的网卡上,网卡将数据复制到内核缓冲区,应用程序从缓冲区中读取数据

IP 数据格式

<font color=red>IPv4 数据结构</font>

<img data-original="http://oss.mflyyou.cn/blog/20200802000153.png?author=zhangpanqin" alt="image-20200802000153692" style="zoom:50%;" />

图来自《图解 TCP/IP》
  • 版本(Version):4 bit 构成,代表当前 IP包是哪个版本,IPv4 或者 IPv6,为 4 时表示当前是 IPv4。
  • 首部长度(Internet Header Length):由 4 bit 构成,一般 20字节大小。
  • 标识(Identification):用于分片重组用,值相同的属于同一个 IP 数据包
  • 标志(Flags):用于判断是否还有分片。
  • 总长度(Total Length):16 个字节,IP 数据包总的长度,最长可为 65525 字节。
  • 分段偏移(Fragment Offset):表示这个包在原来 IP 包中的位置。
  • 生存时间 TTL(Time To Live): IP 包在路由转发中存活的时间,被路由转发一次,次数减 1,为 0 时,数据包被丢弃。
  • 挂载协议标识 (Protocol):记录数据包中 Data(实际发送的数据)是什么类型的数据,1 标识 ICMP, 4 标识 IP, 6标识 TCP, 17 标识 UDP。根据这个挂载协议程序就知道调用哪些接口来进行后续的处理了。

数据链路层中 以太网数据帧MTU 是 1500 字节,限定了 IP 数据包最大为 1500 字节。然后去掉 IP 包首部 20 字节,一般 IP 数据包发送的数据为 1480 字节。

当我们发送一个 3058 字节的 IP 数据包时,这显然大于了数据链路层的 MTU (1500 字节)。所以网络层会对大于链路层MTU 的数据包进行分片。拆分一个一个的1500 的数据包发送接收端,接收端接收到这三个包,在汇聚成一个完成的,在调用传输层接口。

# 会发送 3050 字节数据与 8 字节的 ICMP 首部,这个命令会总共发送 ip 数据大小 3058 字节。
ping -s 3050 www.mflyyou.cn

image-20200801230015436

<img data-original="http://oss.mflyyou.cn/blog/20200801230141.png?author=zhangpanqin" alt="image-20200801230141070" style="zoom:50%;" />

<img data-original="http://oss.mflyyou.cn/blog/20200801230528.png?author=zhangpanqin" alt="image-20200801230528418" style="zoom:50%;" />

<img data-original="http://oss.mflyyou.cn/blog/20200801230423.png?author=zhangpanqin" alt="image-20200801230423653" style="zoom:50%;" />

通过 wireshark 抓包可以看到,IP 数据包的首部长度占了 20 字节,实际每次发送数据为 1480 字节,最后一次发送了 98 字节。

从 Fragment 和 Identification 可以看到这三个包属于同一个 IP 数据包,并且从 Fragment offset 能将这三个包合成一个完成的网络层数据包。

传输层 TCP

TCP 是面向链接的,可靠的,全双工协议。

面向连接就是发送之前,需要建立一个链接通道,数据都是在这个链接中发送。

网络层 是不可靠协议,数据发送失败是不会重发的。

TCP 协议中发送端会记录发送的那些数据包被客户端收到了。接收端接受数据之后,会回复一个 ACK 包(由数据格式中的控制位决定),确认应答号告诉发送端哪些数据包接收到了。

发送端 发送了数据包之后,这个包会有一个重发倒计时,在这个倒计时内没有收到接收端 回复 ACK 包,就会再重发一个数据包。如果是 HTTP 请求 ,就相当于同样的数据请求了两次。

我们知道支付接口都要求幂等性,有一部分原因是因为这个超时重发。发送端发送了请求,接收端处理好业务之后回复的 ACK 包超时,发送端超时重发这个请求。如果不保证接口的幂等性,那么扣钱就会扣两次。

我们要做的就是保证这个重发 n+1 次不再扣用户的钱,一般会用一个 token 来判断是不是重复请求,重复就不走扣款处理了,直接返回已经支付,保证接口的幂等性。或者用一个账单流水来保证幂等性。

连接既然需要建立,那么也会有连接断开。断开连接需双方协商好之后断开连接,不能单方面关闭而不管对方。因为建立连接之后占用的计算机资源需要释放掉。你单方面强制断开连接释放了资源,但是对方不知道需要断开连接,分配的计算机资源一直占用那就是不可靠协议了。所以 TCP 有四次挥手断开连接。

全双工就是连接两边都可以主动发送接受数据,而不是轮训访问有没有数据到达。

TCP 数据格式

首先我们要先了解 TCP 数据格式,才能更容易知道 TCP 的工作原理。

<img data-original="http://oss.mflyyou.cn/blog/20200802000246.png?author=zhangpanqin" alt="image-20200802000246545" style="zoom:50%;" />

源端口号(Source Port)

占用 2 个字节。标识 发送端 程序的端口号,当接收端需要回复消息的时候,需要带上这个端口号。

目的端口号(Destination Port)

占用 2 个字节。标识 接收端 程序的端口号,可以传递给监听在这个端口的程序

控制位(Control Flag)

占用 6 位,不满一个字节。标识当前 TCP 包是什么包,在通信过程中有一些特殊作用。

SYN

表示希望建立三次握手链接,并初始化序列号。

ACK

对收到数据包的应答确认。接收端接受数据之后,会回复 ACK 包,发送端从其上 确认应答号 知道接收端哪些数据已经接受了。

FIN

表示没有数据发送了,希望断开连接

PASH

接收端接收到这个数据包需要立刻传递给应用层,不能等待接收更多的数据包

RET

链接出现异常,需要强制断开连接

URG

表示包中有需要紧急处理的数据

序列号(Sequence Number)

占用 4 个字节。TCP 三次握手的时候,发送端和接收端各自初始化(随机的)自己的 `序列号。

我们可以这样理解,发送端发送的数据就是一个字节数组,这个数组中每个字节都有一个 序列号

发送端和接收端都有自己的序列号,并且不相同,在三次握手的时候自己初始化,然后告知对方。

确认应答号(Acknowledgement Number)

占用 4 个字节。确认应答号 也是指的序列号,指的是期望发送端下次发送的序列号,这个序列号(确认应答号)之前的数据已经接受处理了。

下图是我抓包建立三次链接,然后我发送三次 1\n 数据。

三次握手,发送端通过发送 SYN 包,发送自己的初始化序列号(893189542),然后发送的每个字节都会有一个序列号。

接收端发送 ACK 包中的 确认应答号,指明这个序列号之前的数据我已经接受了。

<img data-original="http://oss.mflyyou.cn/blog/20200802205000.png?author=zhangpanqin" alt="image-20200802205000890" style="zoom:50%;" />

窗口大小(Window Size)

窗口大小适用于流控的。发送端不能一直发送消息,需要根据我的接受能力来调整发包的速率。

未命名文件

内核会为每个 TCP/IP 分配读写缓冲区,网卡会从这些读写缓冲区中把数据取走,然后发送。数据大致可以分为这几类。

TCP/IP 是可靠连接,所以它需要记录哪些数据发送已被对方接受了(由确认应答号可以知道),接受的数据会被淘汰掉,节省内存空间。

窗口大小作用:接收端会通过 ACK 告诉 发送端 调整窗口大小。

当窗口中的数据全都是 已发送未确认数据 时,发送端不能再发送新的数据,必须等待窗口空出位置来。

未命名文件 (2)

当有一个数据包被确认了,发送端就可以发送新的数据包。已发送未确认数据 会在超时的时候重新发包。

滑动窗口百度百科)

校验和 (Checksum)

占用 2 个字节。校验和 用于校验数据包是否损坏。每个数据包都一个 校验和接收端 接收到数据之后,使用相同的算法对数据计算出一个值,然后和 校验和比较,不一样说明数据在传输过程中损坏了,接收端 会丢弃这个包,等待 发送端 重新发这个包。

TCP 中 MSS

链路层能发送的最大以太网帧为 1500 字节,MTU 为 1500。

IP 数据包能发送的最大数据 = MTU - IP 首部大小(一般 20 字节),IP 数据包超过这个 1500 字节会分片

TCP 传输数据以段 (Segment) 为单位。

TCP 为了避免分片,会主动将数据分片之后交给网络层。 TCP 能传输的最大分段(只是数据不包括首部)称之为 Max Segment Size,简称为 MSS。

MSS = MTU - IP 首部大小 - TCP 首部大小

在以太网中 TCP 的 MSS = 1500(MTU) - 20(一般 IP 首部大小) - 20(一般 TCP 首部大小)= 1460,这个值需要根据首部计算

image-20200802211639395

MSS 值在三次握手时,会通过 MTU 计算的。

TCP 三次握手建立连接

<img data-original="http://oss.mflyyou.cn/blog/20200802212532.png?author=zhangpanqin" alt="image-20200802212532628" style="zoom: 33%;" />

图片来自 码出高效:Java 开发手册

为什么是三次握手建立连接呢?很多面试官也会问。这其实是可靠连接的最少握手次数。

<img data-original="http://oss.mflyyou.cn/blog/20200802212808.png?author=zhangpanqin" alt="image-20200802212808724" style="zoom:50%;" />

图片来自 码出高效:Java 开发手册

这里还有个 全连接队列和半链接队列 的知识点

TCP 四次挥手断开连接

<img data-original="http://oss.mflyyou.cn/blog/20200802213247.png?author=zhangpanqin" alt="image-20200802213247725" style="zoom: 33%;" />

图片来自 码出高效:Java 开发手册

CLOSE_WAIT 是收到对方 FIN 包之后,回复 ACK 之后进入的状态。之后不会接受数据了,进行已收数据的业务处理之后,在发送一个 ACK+FIN,进入 LASK_ACK,然后等待对方发送 ACK,超时没有等到,会重试发送(内核可以配置重试发送次数)。当你发现服务端有大量的 CLOSE_WAIT 链接,服务端的代码有问题,需要排查。

TIME_WAIT 的链接多的话,服务端可以优化,不然这个链接会占用很长时间,在高并发的时候,会导致没有资源释放的慢。

MSL 为 Maximum Segment Lifetime,在 centos 中默认值为 60s

# sysctl -a | grep tcp_fin_timeout
# 推荐小于 30,也不能太小,15-30
net.ipv4.tcp_fin_timeout = 60

说明 A 机器链接会在 120 s 之后才能释放。这个是为了保证 B 机器 能接收到最后一个 ACK,当处于 LAST_ACK 的超时没有收到A 发来的 ACK 的话,会重试发送一个 FIN+ACK。这个 2MSL 也是为了最大限度保证 B 机器正常关闭。

三次握手建立连接四次挥手断开连接 需要结合抓包工具自己分析一下,理解会更深刻。

网络抓包

Wireshark 抓包分析是很厉害的,mac oslinux 都有命令行程序 tshark,可以在服务器用 tshark 抓包,拿到本地来分析。

抓包的时候一定要指定抓什么包,什么包都抓的话,一会你的电脑内存就飙升好多(别问我为啥知道,问就是 30g 内存都让它吃了)。

Wireshark 有个 抓包过滤器显示过滤器。抓包的时候指定抓什么包这是 抓包过滤器的作用,抓包之后显示显示那些内容那是 显示过滤器的作用

# -i 指定那个网卡 
# -f 指定抓包过滤器
# -Y 显示过滤器
# -w 指定抓包数据到文件,没有 -w 输出屏幕
# -V 显示 TCP/IP 每层包的详细信息,建议将抓包的文件在图形化界面中查看,不指定 -V
tshark  -i en0 -f "tcp" -Y "http"

# 抓取访问 www.mflyyou.cn 的包
tshark  -i en0 -w a.pcap -f "host www.mflyyou.cn"

# 指定抓那个协议 tcp,ip,icmp,arp,udp
tshark  -i en0 -f "tcp"


# host 指定域名或者 ip
# port 指定端口
# 访问 www.mflyyou.cn 的包,或者 icmp. ping www.baidu.com 也会被抓到
tshark  -i en0 -f "host www.mflyyou.cn || icmp"
tshark  -i en0 -f "port 80"

# 条件之间支持逻辑运算符 || && !
# 抓取 ssh 链接的包
tshark  -i en0 -f "host www.mflyyou.cn && port 22"

参考资料

《图解 TCP/IP》

linux-tcp 说明

鸟哥私房菜:基础网络的概念


本文由 张攀钦的博客http://www.mflyyou.cn/ 创作。 可自由转载、引用,但需署名作者且注明文章出处。

如转载至微信公众号,请在文末添加作者公众号二维码。微信公众号名称:Mflyyou

查看原文

赞 1 收藏 1 评论 0

张攀钦 发布了文章 · 7月28日

你不知道的SpringBoot与Vue部署解决方案

前言

前段时间公司外网部署的演示环境全部转到内网环境中去,所有对外演示的环境都需要申请外网映射才能访问某个服务。我用一个外网地址 www.a.com 映射到一个内网地址 http://ip:port,然后在这个地址 http://ip:port 用 nginx 做代理转发到各个组的项目 http://ipn:portn 上去,其中也遇到一些静态资源 404,主要是是解决这个 404 问题。

最近又做了一个项目,考虑到用户的体验,减少部署的复杂性,我想了一个办法用 SpringBoot 做 web 服务器映射前端资源为 web 资源 。

<font color=red>条件允许或者对性能要求比较高,推荐是前后端分离部署,nginx 做 web 服务器,后端只提供接口服务</font>

以前部署的项目 A 外网访问地址是 http://ip1:8080,外网映射后只能访问 http://ip/app1 ,以前项目 B 外网访问地址是 http://ip1:8081 ,项目访问地址是 http://ip/app2 。这也算是一个不大不小的变动,但是切换之后遇到的第一个问题就是静态资源转发导致 404

比如以前项目 A 访问地址是 http://ip1:8080 它是没有上下文的。

而现在 A 的访问地址为 http://ip/app1 ,有一个上下文 app1 在这里,导致有一些资源 404。

比如说:原来 http://ip1:8080 请求到了 index.html 资源,现在只能 http://ip/app1 请求到 index.html。

<!-- index.html -->
<!-- 原来部署环境写法 -->
<link href="/index.css" rel="stylesheet">

以前访问 index.css 地址是 http://ip1:8080/index.css ,但是现在变成访问了 http://ip/index.css 导致 404,实际 index.css 地址为 http://ip/app1/index.css

前端使用 vue 编写,html 中的静态资源路径可以很好解决,修改 webpack 打包即可。

<!-- 原来部署环境写法 -->
<link href="/index.css" rel="stylesheet">

<!-- 写成相对路径 -->
<link href="./index.css" rel="stylesheet">

<!-- 结合 webpack 打包时进行路径补充 -->
<link href="<%= BASE_URL %>index.css" rel="stylesheet">

但是项目中有一些组件的请求没有办法统一处理,只能改代码。但我不想动代码,webpack 打包都不想动,基于这些需求想了一个办法来解决。

本文内容

  • Nginx 部署 vue 项目,怎么能友好处理静态资源的丢失
  • SpringBoot 提供 web 服务器的功能映射 vue 项目为 web 资源,并处理 vue 路由转发 index.html 问题。

演示代码地址

https://github.com/zhangpanqin/vue-springboot

Nginx 部署 Vue 项目

server {
    listen 8087;
    # 它的作用是不重定向地址,比如浏览器输入 /app1 访问,也可以访问到 /app1/ ,而浏览器地址是不改变的 /app1 。没办法,强迫症
    location / {
        try_files $uri $uri/;
    }
    root /Users/zhangpanqin/staic/;
    location ~ /(.*)/ {
        index index.html /index.html;
        try_files $uri $uri/ /$1/index.html;
    }
}

/Users/zhangpanqin/staic/ 放部署的项目,比如 app 的项目资源放到 /Users/zhangpanqin/staic/app 下。 访问地址为 http://ip/8087/app

<!DOCTYPE html>
<html lang="en">
<head>
    <!-- 也可以改成类似的地址  BASE_URL 等于 vue.config.js 配置的 publicPath-->
    <link rel="icon" href="<%= BASE_URL %>favicon.ico">
    <!-- 部署之后,访问不到 index.css -->
    <link href="/index.css" rel="stylesheet">
</head>
</html>

为了可以在浏览器输入 vue 的路由 /app/blog 也可以访问页面,需要添加 vue-router 中的 base 属性。

import Vue from 'vue';
import VueRouter from 'vue-router';

Vue.use(VueRouter);

const routes = [
    {
        path: '/',
        name: 'Home',
        component: () => import('@/views/Home.vue'),
    },
    {
        path: '/blog',
        name: 'Blog',
        component: () => import('@/views/Blog.vue'),
    },
    {
        // 匹配不到路由的时候跳转到这里
        path: '*',
        name: 'Error404',
        component: () => import('@/views/Error404.vue'),
    }
];
const router = new VueRouter({
    // 主要是修改这里,可以根据 vue mode 环境来取值。
    // https://cli.vuejs.org/zh/guide/mode-and-env.html
    // https://router.vuejs.org/zh/api/#base
    base: process.env.VUE_APP_DEPLOY_PATH,
    mode: 'history',
    routes,
});

export default router;

<img data-original="http://oss.mflyyou.cn/blog/20200727234702.png?author=zhangpanqin" alt="image-20200727234702928" style="zoom: 25%;" />

http://localhost:8087/app/index.css 为 css 的真实地址。所以想办法为这些不以 /app 开头的资源加上 /app 就可以了,想了想只有 cookie 能做到。

x_vue_path 记录每个项目的路径,然后静态资源去这个路径下寻找,$cookie_x_vue_path/$uri

下面这个配置使用了 try_files 内部重定向资源,是不会在浏览器端发生重定向的。

# gzip ,缓存 和 epoll 优化的都没写
server {
    listen 8087;
    # 它的作用是不重定向地址,比如浏览器输入 /app1 访问,也可以访问到 /app1/ ,而浏览器地址是不改变的 /app1 。没办法,强迫症
    location / {
        try_files $uri $uri/;
    }
    root /Users/zhangpanqin/staic/;

    # (.*) 匹配是哪个项目,比如说 app1 app2 等
    location ~ /(.*)/.*/ {
        index index.html /index.html;
        add_header Set-Cookie "x_vue_path=/$1;path=/;";
        # /Users/zhangpanqin/staic/+/$1/index.html 可以到每个项目下 index.html
        try_files $uri $uri/ /$1/index.html @404router;
    }
    # 查找静态资源,也可以在这里添加缓存。
    location ~ (.css|js)$ {
        try_files $uri $cookie_x_vue_path/$uri @404router;
    }
    location @404router {
        return 404;
    }
}

image-20200728014849158

下面这个是重定向的配置

server {
    listen 8087;
    root /Users/zhangpanqin/staic/;

    location ~ /(.*)/.*/? {
        index index.html /index.html;
        add_header Set-Cookie "x_vue_path=/$1;path=/;";
        try_files $uri $uri/ /$1/index.html @404router;
    }
    location ~ (.css|js)$ {
        # 匹配到 /app/index.css 的资源,直接访问
        rewrite ^($cookie_x_vue_path)/.* $uri break;
        # 访问的资源 /index.css  302 临时重定向到 /app/index.css
        rewrite (.css|js)$ $cookie_x_vue_path$uri redirect;
    }
    location @404router {
        return 404;
    }
}

image-20200728014654144

根据这个思路就可以把所有的资源进行转发了,不用改业务代码,只需给 vue-router 加上一个 base 基础路由。

SpringBoot 部署 Vue 项目

Nginx 走通了,SpringBoot 依葫芦画瓢就行了,还是 java 写的舒服,能 debug,哈哈。

SpringBoot 映射静态资源

@Configuration
public class VueWebConfig implements WebMvcConfigurer {
    /**
     * 映射的静态资源路径
     * file:./static/ 路径是相对于 user.dir 路径,jar 包同级目录下的 static
     */
    private static final String[] CLASSPATH_RESOURCE_LOCATIONS = {"file:./static/", "classpath:/META-INF/resources/",
            "classpath:/resources/", "classpath:/static/", "classpath:/public/"};

    @Override
    public void addResourceHandlers(ResourceHandlerRegistry registry) {
        // 添加静态资源缓存
        CacheControl cacheControl = CacheControl.maxAge(5, TimeUnit.HOURS).cachePublic();
        registry.addResourceHandler("/**").addResourceLocations(CLASSPATH_RESOURCE_LOCATIONS).setCacheControl(cacheControl);
    }
    
   
    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        // 配置要拦截的资源,主要用于 添加 cookie 
        registry.addInterceptor(new VueCookieInterceptor()).addPathPatterns("/test/**");
    }

    // vue 路由转发使用的,也做 接口请求找不到的
    @Bean
    public VueErrorController vueErrorController() {
        return new VueErrorController(new DefaultErrorAttributes());
    }
}

项目静态资源路径添加 cookie

public class VueCookieInterceptor implements HandlerInterceptor {
    public static final String VUE_HTML_COOKIE_NAME = "x_vue_path";

    public static final String VUE_HTML_COOKIE_VALUE = "/test";

    /**
     * 配置请求资源路径 /test 下全部加上 cookie
     */
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        final Cookie cookieByName = getCookieByName(request, VUE_HTML_COOKIE_NAME);
        if (Objects.isNull(cookieByName)) {
            final Cookie cookie = new Cookie(VUE_HTML_COOKIE_NAME, VUE_HTML_COOKIE_VALUE);
            // 项目下的 url 都带能带上
            cookie.setPath("/");
            cookie.setHttpOnly(true);
            response.addCookie(cookie);
        }
        return true;
    }

    public static Cookie getCookieByName(HttpServletRequest httpServletRequest, String cookieName) {
        final Cookie[] cookies = httpServletRequest.getCookies();
        if (Objects.isNull(cookieName) || Objects.isNull(cookies)) {
            return null;
        }
        for (Cookie cookie : cookies) {
            final String name = cookie.getName();
            if (Objects.equals(cookieName, name)) {
                return cookie;
            }
        }
        return null;
    }
}

请求出现错误做资源的转发

访问错误的跳转要分清楚 接口请求和静态资源的请求,通过 accept 可以判断。

@RequestMapping("/error")
public class VueErrorController extends AbstractErrorController {

    private static final String ONLINE_SAIL = VUE_HTML_COOKIE_NAME;

    private static final String ERROR_BEFORE_PATH = "javax.servlet.error.request_uri";

    public VueErrorController(DefaultErrorAttributes defaultErrorAttributes) {
        super(defaultErrorAttributes);
    }

    @Override
    public String getErrorPath() {
        return "/error";
    }
    
    @RequestMapping
    public ModelAndView errorHtml(HttpServletRequest httpServletRequest, HttpServletResponse response, @CookieValue(name = ONLINE_SAIL, required = false, defaultValue = "") String cookie) {
        final Object attribute = httpServletRequest.getAttribute(ERROR_BEFORE_PATH);
        if (cookie.length() > 0 && Objects.nonNull(attribute)) {
            response.setStatus(HttpStatus.OK.value());
            String requestURI = attribute.toString();
            // 访问的路径没有以 vue 部署的路径结尾,补充上路径转发去访问
            if (!requestURI.startsWith(cookie)) {
                ModelAndView modelAndView = new ModelAndView();
                modelAndView.setStatus(HttpStatus.OK);
                // 静态资源不想转发,重定向的话,修改为 redirect
                String viewName = "forward:" + cookie + requestURI;
                modelAndView.setViewName(viewName);
                return modelAndView;
            }
        }
        ModelAndView modelAndView = new ModelAndView();
        modelAndView.setStatus(HttpStatus.OK);
        modelAndView.setViewName("forward:/test/index.html");
        return modelAndView;
    }
    
    // 处理请求头为 accept 为 application/json 的请求,就是接口请求返回json 数据
    @RequestMapping(produces = MediaType.APPLICATION_JSON_VALUE)
    public ResponseEntity<Map<String, Object>> error(HttpServletRequest request) {
        HttpStatus status = getStatus(request);
        if (status == HttpStatus.NO_CONTENT) {
            return new ResponseEntity<>(status);
        }
        final Map<String, Object> errorAttributes = getErrorAttributes(request, true);
        return new ResponseEntity<>(errorAttributes, status);
    }

首页跳转

@Controller
public class IndexController {
    @RequestMapping(value = {"/test", "/test"})
    public String index() {
        return "forward:/test/index.html";
    }
}

本文由 张攀钦的博客http://www.mflyyou.cn/ 创作。 可自由转载、引用,但需署名作者且注明文章出处。

如转载至微信公众号,请在文末添加作者公众号二维码。微信公众号名称:Mflyyou

查看原文

赞 0 收藏 0 评论 0

张攀钦 发布了文章 · 7月26日

从linux内核理解Java怎样实现Socket通信

前言

前段时间买本书研究了 TCP/IP 通信,弄清楚了计算机之间是怎么通信的。网络通信的的基础就是 TCP/IP 协议簇,也被称为 TCP/IP 协议栈 ,也被简称为 TCP/IP 协议TCP/IP 协议 并不是只有 TCPIP 协议,只是这俩用的比较多,就用这两个起的名字。

我们目前使用的 HTTP , FTP , SMTP , DNS , HTTPS , SSH , MQTT , RPC 等都是以 TCP/IP协议 为基础。下图针对的是 传输层为 TCP

<img data-original="http://oss.mflyyou.cn/blog/20200718221518.svg?author=zhangpanqin" alt="TCP_IP 同一以太网 (1)" style="zoom:50%;" />

Linux 内核 为我们屏蔽了 TCP/IP 通信模型的复杂性,并且 Linux 中一切皆文件,因此为我们抽象了 Socket 文件,实际我们编码的时候,主要是通过一些系统调用和 Socket 打交道。

在 Java 中,网络通信这块 netty 提供了很大的便利,但是你了解了这些原理之后,netty 你也了解的差不多了。

内核参数说明

/proc/sys/net/* 说明

TCP/IP 内核参数说明

文件系统部分 /proc/sys/fs/* 说明

https://www.kernel.org/doc/Documentation/sysctl/net.txt
https://www.kernel.org/doc/Documentation/networking/ip-sysctl.txt
https://www.kernel.org/doc/Documentation/sysctl/fs.txt

修改内核参数,有两种改法,比如修改 tcp_syn_retries = 5

  • 临时修改
# 查看参数的完整值 net.ipv4.tcp_syn_retries = 6
sysctl -a  | grep tcp_syn_retries
# linux 一切皆文件,所以这个东西也是会在文件中保存,我们可以修改这个文件内容,临时生效,重启之后就不影响
# 内核属性文件路径都是在 /proc/sys 下,剩余的路径就是 net.ipv4.tcp_syn_retries 中的 . 替换为 /
echo 5 > /proc/sys/net/ipv4/tcp_syn_retries

# 查看修改之后的值
sysctl -a  | grep tcp_syn_retries
  • 永久修改
# tcp_syn_retries = 7
echo "net.ipv4.tcp_syn_retries = 7" >> /etc/sysctl.conf

# 让修改生效
sysctl -p

# 查看修改之后的值
sysctl -a  | grep tcp_syn_retries

本文内容

  • BIO 通信模型(画图说明)及 java 代码实现
  • NIO 通信模型及 java 代码实现
  • 多路复用通信模型(画图说明),主要是 epoll,会详细讲解

通信模型是按照 BIO -> NIO -> 多路复用 慢慢演变过来的,因为互联网的发展,并发要求比较高。

本文所用代码地址

https://github.com/zhangpanqin/fly-java-socket

本文内容环境:

  • jdk .18
  • Linux version 3.10.0-693.5.2.el7.x86_64

BIO 通信

Socket 通信 (1)

BIO 通信模型 中,服务端ServerSocket.accpet 会阻塞等待新的客户端经过 TCP 三次握手 建立连接,当客户端 Socket 建立了链接,就可以通过 ServerSocket.accpet 得到这个 Socket ,然后对这个 Socket 进行读写数据。

Socket 读写数据时,会阻塞当前线程直到操作完成,因此我们需要为每个客户端分配一个线程,然后在线程中死循环从 Socket 读取数据(客户端发来的数据)。还需要分配一个线程池对 Socket 进行写数据 (发送数据到客户端)。

<img data-original="http://oss.mflyyou.cn/blog/20200719151354.svg?author=zhangpanqin" alt="Java Bio" />

应用程序调用系统调用 read 将数据从 内核态用户态 ,这个过程在 BIO 中是阻塞的。而且数据你不知道什么时候过来,只能在一个线程中死循环查看数据是否可读。

try {
    // 当内核没有准备好数据的时候,一直在这里阻塞等待数据到来
    while ((length = inputStreamBySocket.read(data)) >= 0) {
        s = new String(data, 0, length, StandardCharsets.UTF_8);
        if (s.contains(EOF)) {
            this.close();
            return;
        }
        log.info("接收到客户端的消息,clientId: {} ,message: {}", clientId, s);

    }
    if (length == -1) {
        log.info("客户端关闭了,clientId: {},服务端释放资源", clientId);
        this.close();
    }
} catch (IOException e) {
    if (length == -1) {
        this.close();
    }
}

服务端主动往客户端写数据,应用程序调用 write 也是阻塞的。 我们可以通过线程池来做。为每个客户端会分配一个 id 属性维持会话,用 ConcurrentHashMap<Integer, SocketBioClient> 保持,要想 1 号客户端写数据,直接从这个 Map 拿出客户端,然后往里面写入数据。

public void writeMessage(Integer clientId, String message) {
    Objects.requireNonNull(clientId);
    Objects.requireNonNull(message);
    // 根据客户端 id 取出客户端。
    final SocketBioClient socketBioClient = CLIENT.get(clientId);
    Optional.ofNullable(socketBioClient).orElseThrow(() -> new RuntimeException("clientId: " + clientId + " 不合法"));
    // 在线程池中运行写入数据
    threadPoolExecutor.execute(() -> {
        if (socketBioClient.isClosed()) {
            CLIENT.remove(clientId);
            return;
        }
        socketBioClient.writeMessage(message);
    });
}

BIO 通信 在并发比较大的时候,就显得力不从心了。比如有五万链接建立,就需要建立五万个线程来进行维护通信。在 java 中线程占用的内存假设为 512KB,内存占用 24GB(50000*0.5/1024GB),还有 CPU 需要调度五万个线程来读取客户端数据和应答,CPU 绝大数的资源都会浪费在线程切换上去了,并且通信的实时性更不能保证。

全连接队列和半链接队列

1、服务端需要绑定一个 serverIpserverPort ; java 中 api 为 ServerSocket.bind

2、然后在这个 serverIpserverPort 上监听客户端的链接的到来

3、客户单绑定一个 clientIpclientPort,然后调用 Socket.conect(serverIp,serverPort),经过内核建立 Tcp 链接。

4、然后在服务端死循环调用 ServerSocket.accept 拿到建立连接 Socket

5、Socket.read 读取客户端发来的数据,Socket.wirte 写数据到客户端

serverIpserverPort 是确定的,只要 clientIpclientPort 只要有一个不同就可以看做是不同的客户端。

clientIpclientPortserverIpserverPort 在通信中也叫四元组,这四个确定才能建立 TCP/IP 链接。

比如我们的浏览器加载页面的时候,实际是随机创建了一个合法 本地 port ,加上已知的 clientIp 去请求 serverIpserverPort 获取数据。

TCP 链接建立 (2)

​ 客户端链接服务端的 TCP 三次握手过程:

1、客户端 发送一个 SYN 包给服务端,在 客户端 运行 netstat -natp ,可以查看到处于 SYN-SENT 状态

2、服务端 接受到 客户端SYN 包,将连接放入半链接队列,然后发送 客户端 一个 SYN+ACK 包,状态处于 SYN_REVD

3、客户端 收到来服务端的 SYN+ACK 包,回复一个 ACK,状态处于 ESTABLISHED (服务端全连接队列满的时候,客户端链接也是这个状态,当你发送数据的时候,服务端会回复一个 RST 包重置链接)

4、服务端 收到来自客户端的 ACK,链接状态变为 ESTABLISHED (只有服务端看这个状态状态的链接才是真正 TCP 链接过程走完的),并将连接放入到全连接队列

队列是一个有界队列,当全连接队列和半链接队列溢时,会有配置的内核参数决定采用对应的策略处理。

TCP 抓包

 # wireshark,需要安装这个程序,抓包相关的截图,我使用的 wireshark,mac 也有对应程序
 # -i 指定抓取那个网卡,port 指定只显示这个 port 的包
 tshark -i eth0 port 10222
 
 # linux 自带
 tcpdump -nn -i eth0 port 10222

全连接队列溢出

我在写代码验证及抓包的时候发现,设置的全队列长度为 10,但是可以建立 11 个链接,12 个链接建立的时候就发生了全连接溢出。

cat /proc/sys/net/ipv4/tcp_abort_on_overflow

# 临时修改
echo 1 > /proc/sys/net/ipv4/tcp_abort_on_overflow
# 临时修改,修改为 2 之后,发现重试只有两次了
echo 2 > /proc/sys/net/ipv4/tcp_synack_retries

tcp_abort_on_overflow 为 0 时(默认),表示如果第三次握手(客户端发送了 ACK)的时候,全连接队列满了,服务端会发送给客户端一个包让其重试发送 ACKsysctl -a | grep tcp_synack_retries 查看服务端配置第三次握手重试的次数,默认为 5 次。

image-20200725201134175

TCP 三次握手中的第三次客户端发送 ACK 给服务端,全连接队列满了,会丢弃第三次的 ACK 包,所以后续的过程中,是客户端再次发送 ACK 的包给服务端,服务端一直丢弃,所以,客户端一直发送 ACK

tcp_abort_on_overflow 为 1 时,表示如果第三次握手(客户端发送了 ACK)的时候,全连接队列满了,服务端会回复一个 RST 包,关闭连接过程

image-20200725200200971

半链接队列溢出

半链接队列的长度计算公式,来源于 从一次 Connection Reset 说起,TCP 半连接队列与全连接队列)

  • backloglisten 时传入的参数,我传入的 10
  • somaxconn ,我的是 128
  • tcp_max_syn_backlog,我的为 128

somaxconn 和 tcp_max_syn_backlog 参数含义

# 查看对应端口的 Send-Q
ss -lnt

# net.core.somaxconn = 128
sysctl -a | grep somaxconn

# net.ipv4.tcp_max_syn_backlog = 128
sysctl -a | grep tcp_max_syn_backlog

syn flood 攻击,模拟半链接溢出

# -p 指定端口
# --rand-source 伪造源 ip
# -S 只发送 SYN 包
# --flood 不停的攻击
# 10.211.55.8 攻击的目的 ip
hping3 -S --flood --rand-source -p 10222 10.211.55.8
# 计算半链接的数量
netstat -natp | grep SYN | wc -l

我分别将 backlog 设置为 7,123,511 测试的公式正确

nr_table_entries = min(backlog, somaxconn, tcp_max_syn_backlog)
nr_table_entries = max(nr_table_entries, 8)
// roundup_pow_of_two: 将参数(nr_table_entries + 1)向上取整到最小的 2^n
nr_table_entries = roundup_pow_of_two(nr_table_entries + 1)
max_qlen_log = max(3, log2(nr_table_entries))
max_queue_length = 2^max_qlen_log

SYN FLOOD 的防御

客户端发送大量的 SYN 包,然后就不走后面的握手过程,导致服务端半链接队列满了,无法接受正常用户的握手链接。

# 默认为 1,开启 syn cookie
cat /proc/sys/net/ipv4/tcp_syncookies

# 临时修改为 0 ,tcp_syncookies
echo 0 > /proc/sys/net/ipv4/tcp_syncookies

内核参数 tcp_syncookies 设置可以帮我们做一些防御 SYN FLOOD 攻击,当设置为 0 的时候,半链接队列满了,服务端会丢弃客户端的 SYN 包,客户端链接的时候,没有收到 SYN+ACK 会重试发送 SYN 包,超过了重试次数,建立连接失败。

linux 中是内核参数 net.ipv4.tcp_syn_retries = 6 ,限制 SYN 重试次数,当前半链接队列已经满了,新的正常链接建立的时候,重试发送的 SYN 次数。

当设置 tcp_syncookies=0 时,是不能抵御 SYN FLOOD 攻击的,新的正常用户建立不了链接。

image-20200726134431509

当设置 tcp_syncookies=1 时,新的正常链接(走三次握手)还是可以建立 TCP 连接的,前提是 全连接队列没有满,全连接队列满了,走全连接队列的逻辑。

# 临时修改
echo 1 > /proc/sys/net/ipv4/tcp_syncookies

全连接队列没有满,服务端会回复一个带 syncookieSYN+ACK 包给客户端,就是给这个包加一个会话标识,客户端收到这个 SYN+ACK 包必须将 syncookie 携带发送 ACK 才能建立三次握手的链接。

全连接队列满的话会从上面全连接队列。

Socket Bio 通信 GitHub 地址

NIO 通信

BIO 演变到 NIO ,只是支持了同步非阻塞。不要小看非阻塞这个特性,他可以将我们的线程模型降低为一个(在不考虑读写客户端实时性的情况下),BIO 不管你怎么修改,始终都要一个客户端对应一个读线程。NIO 在不考虑性能的情况下,理论可以一个线程管理 n 个客户端。

ServerSocketChannel.accept 可以不阻塞等待客户端建立连接;

while (true) {
    try {
        // bio 会在这里阻塞等待新的客户端建立。
        // nio 不阻塞等待,有链接建立,返回客户端。没有链接返回 null
        final SocketChannel accept = serverSocket.accept();
        if (Objects.nonNull(accept)) {
            accept.configureBlocking(false);
            final int currentIdClient = CLIENT_ID.incrementAndGet();
            final SocketNioClient socketNioClient = new SocketNioClient(currentIdClient, accept);
            CLIENT.put(currentIdClient, socketNioClient);
            new Thread(socketNioClient, "客户端-" + currentIdClient).start();
        }

    } catch (IOException e) {
        log.info("接受客户端你失败", e);
    }
}

SocketChannel.read 可以不阻塞等待数据从内核态到用户态,内核态中没有数据,直接返回。

ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);
while (true) {
    // bio 不管有没有数据,都要在这里等待读取
    // nio 当内核中没有数据可以读取,内核会返回 0
    length = this.client.read(byteBuffer);
    if (length > 0) {
        byteBuffer.flip();
        s = StandardCharsets.UTF_8.decode(byteBuffer).toString();
        log.info("接收到客户端的消息,clientId: {} ,message: {}", clientId, s);
        if (s.contains(EOF)) {
            this.close();
            return;
        }
    }
    if (length == -1) {
        log.info("客户端主动关闭了,clientId: {},服务端释放资源", clientId);
        this.close();
        return;
    }
    // 这里在内核没有准备好数据的时候,可以在这里执行一些别的业务代码
}

在 NIO 模型下,一个线程就可以管理所有的读写了(不考虑响应客户端的实时性)。

package com.fly.socket.nio;

import com.fly.socket.nio.chat.model.ChatPushDTO;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedDeque;

/**
 * @author 张攀钦
 * @date 2020-07-19-16:32
 */
@Slf4j
public class NioSingleThread implements AutoCloseable {
    // 客户端发送这个消息,说明要断开连接,服务端主动断开连接
    private static final String EOF = "exit";
    // 保存会话,由于这个是在单线程中操作的,不需要用并发容器
    private static final Map<Integer, SocketChannel> MAP = new HashMap<>(16);
    // http 接口主动发消息时,将消息保存在这个队列中
    private static final ConcurrentLinkedDeque<ChatPushDTO> QUEUE = new ConcurrentLinkedDeque<>();
    // 因为单线程操作,所以直接申请堆外 buffer,这样性能高,没有考虑能不能接受客户端发送消息的大小,简单写法,只考虑 1024 个字节。
    final ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);
    // 服务端 socket 绑定那个 端口
    private int port;
    // 全链接队列的 backlog,不理解这个属性,看上面的 BIO
    private int backlog;
    // 本次绑定 ServerSocketChannel
    private ServerSocketChannel open;

    // NioSingleThread 会注册到 ioc 中,closed 标记是否调用了NioSingleThread bean 被销毁时调用的 close 方法
    private boolean closed = false;

    public ServerSocketChannel getOpen() {
        return open;
    }

    public NioSingleThread(int port, int backlog) {
        this.port = port;
        this.backlog = backlog;
        try {
            open = ServerSocketChannel.open();
            // 设置使用 NIO 模型, ServerSocketChannel.accept 时候不阻塞
            open.configureBlocking(false);
            open.bind(new InetSocketAddress(port), backlog);
            this.init();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * @Bean(destroyMethod = "close")
     * public NioSingleThread nioSingleThread() {
     *     return new NioSingleThread(9998, 20);
     * }
     */
    @Override
    public void close() throws IOException {
        closed = true;
        if (Objects.nonNull(open)) {
            if (!open.socket().isClosed()) {
                open.close();
                log.info("关闭客户端了");
            }
        }
    }

    // 初始化之后,启动了一个线程
    private void init() {
        new Thread(
            () -> {
                Integer clientIdAuto = 1;
                while (true) {
                    // 先判断这个 bean 是否被销毁了,销毁了,说明服务端的在关闭,顺便也关闭 socket
                    if(closed){
                        if (open.socket().isClosed()) {
                            try {
                                open.close();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                        return;
                    }
                    try {
                        // 处理新的客户端链接建立
                        final SocketChannel accept = open.accept();
                        if (Objects.nonNull(accept)) {
                            accept.configureBlocking(false);
                            MAP.put(clientIdAuto, accept);
                            clientIdAuto++;
                        }

                        // 处理读取事件
                        MAP.forEach((clientId, client) -> {
                            if (!client.socket().isClosed()) {
                                byteBuffer.clear();
                                try {
                                    final int read = client.read(byteBuffer);
                                    if (read == -1) {
                                        client.close();
                                        MAP.remove(clientId);
                                    }
                                    if (read > 0) {
                                        byteBuffer.flip();
                                        final String s = StandardCharsets.UTF_8.decode(byteBuffer).toString();
                                        log.info("读取客户端 clientId: {} 到的数据: {}", clientId, s);
                                        if (s.contains(EOF)) {
                                            if (!client.socket().isClosed()) {
                                                client.close();
                                            }
                                        }
                                    }

                                } catch (IOException e) {
                                    log.error("读取数据异常,clientId: {}", clientId);
                                }
                            }

                        });

                        // 处理写事件
                        while (!QUEUE.isEmpty()) {
                            final ChatPushDTO peek = QUEUE.remove();
                            if (Objects.isNull(peek)) {
                                break;
                            }
                            final Integer chatId = peek.getChatId();
                            final String message = peek.getMessage();
                            final SocketChannel socketChannel = MAP.get(chatId);
                            if (Objects.isNull(socketChannel) || socketChannel.socket().isClosed()) {
                                continue;
                            }

                            byteBuffer.clear();
                            byteBuffer.put(message.getBytes(StandardCharsets.UTF_8));
                            byteBuffer.flip();
                            socketChannel.write(byteBuffer);

                        }


                    } catch (IOException e) {
                        throw new RuntimeException("服务端异常", e);
                    }
                }
            }, "NioSingleThread"
        ).start();

    }

    // 对外暴露的接口,写事件
    public void writeMessage(ChatPushDTO chatPushDTO) {
        Objects.requireNonNull(chatPushDTO);
        QUEUE.add(chatPushDTO);
    }
}

NIO 代码 GitHub 地址

NIO 模型已经不错了,减少了线程和内存占用。但是它有一个弊端就是客户端有没有数据还是需要调用系统调用 read 来看看是否有数据到达。

当比如有五万个链接的时候,我们需要调用系统调用五万次 int read = client.read(byteBuffer),换而言之用户态到内核态需要切换五万次,这也是不小的计算机资源消耗。

IO 模型 继续演变到目前常用比较广泛的 多路复用,它解决了这个系统调用多次的问题,将五万次的系统调用减少到一次或者多次。

IO 多路复用

NIO 存在的弊端:不管你客户端有没有数据传过来,我都要调用系统调用看看有没有数据到来。

客户端建立连接之后,内核会为这个客户端分配一个 fd(文件描述符)

IO 多路复用 指的是内核监控客户端(fd)有没有数据到来,当我们想要知道哪些客户端数据到来了,只需要调用多路复用器 select , poll , epoll 提供的系统调用即可,将想要知道的客户端(fd)传进去,内核就会返回哪些客户端(fd)数据准备好了。我们从原来的五万次系统调用,降低到一次,大大降低了系统开销。epoll 是这三个多路复用器中效率最高的一个。

1、select 一次调用传入的 fd 是有数量限制的(一次只能传入 1024 个,不同的内核参数可能会不同),五万链接会调用 30 次左右系统调用,但是内核还是会遍历这五万个链接,检查是否有数据可读。然后调用对应的系统调用,获得有数据到达的客户端 (fd),然后操作 fd 将数据从 内核态 copy 到 用户态 去做业务处理。

2、pollselect 差不多,只是系统调用时传入的 fd 没有限制。pollselect 只是减少了系统调用,实际内核也是遍历每个链接检查是否可读,所以效率和连接总数成线性关系,建立连接的客户端越多效率越低。

3、epoll 不是内核轮训每个 fd 检验是否可读。当客户端数据到达,内核将网卡中将数据读到到自己的内存空间,内核会将有数据到达的连接放入到一个队列中去,用户态的程序只需要调用 epoll 提供的系统调用,从这个队里中拿到链接对应的 fd 即可,所以效率和活跃连接数有关,和连接总数没有关系(百万链接中可能只有 20% 是活跃链接)。

epoll 相关的系统调用

epoll 内部维护了一个红黑树和队列,红黑树记录当前多路复用器需要监测哪些链接的那些操作(读写等),队列中就是哪些操作就绪的链接。

epoll_create

//  返回文件描述符,这个文件描述符对应 epoll 实例,fd 在后续 epoll 相关的系统调用中有用
int epoll_create(int size);

epoll_create 创造一个多路复用器实例 epoll,返回一个 epfd,这个 epfd 指向了epoll的实例。epfd 实际就是一个文件描述符。

epoll_ctl

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

epoll_ctl 将客户端或者服务端对应的 socket fd 注册 epoll 上,op 就是指定当前系统调用的类型,是将 fd 注册到 epoll ,还是从 epoll 删除 fd,还是修改在 epoll 上 event 。event 指的是 io 操作(读、写等)。

epoll_ctl 设置 epoll 的实例监听哪些客户端或者服务端,并且指定监听它们的那些 io 操作。

epoll_wait

# epoll 返回了准备好 io 操作的 fd 的数量
int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);

获取当前多路复用器(epfd)上有多少个客户端 io 操作就绪(注册 epoll 中时指定的操作)。epoll_wait 当没有指定 timeout 时,会一直阻塞等待至少有一个客户端 io 操作就绪。timeout 大于 0 会在超时时直接返回 0。

epoll_event 是接受这个系统调用中准备好的事件,事件数据结构中可以拿到对应的客户端 fd。

epoll_wait 是阻塞调用,返回的话:

  • 有 io 操作就绪
  • 指定的超时时间到了
  • 调用被打断就会返回

epoll 触发方式

epoll 监控多个文件描述符的 io 事件,什么样的情况 epoll 认为是可以读写呢,这是就事件的触发方式。epoll 支持两重触发方式,边缘触发(edge trigger,ET)和水平触发 (level trigger,LT)。

每个 fd 缓冲区,fd 缓冲区中又可以分为读缓冲区和写缓冲区。每个客户端链接对应一个 fd。

客户端数据来了,网卡会将客户端来的数据从网卡的内存中写入到链接对应内核中的 fd 读缓冲区。应用程序调用 epoll_wait 知道那个链接有数据到达了,再将这个数据从内核态读到用户态,然后做数据处理。

往客户端写数据。应用程序调用 socket (对应一个 fd) api,将数据从用户态写入到内核态中的 fd 写缓冲区中去,然后内核会将数据写入到网卡中去,网卡在适当的时机再发给客户端。

如果 fd 的写缓冲区满了,当调用 write 的时候就会阻塞等待写缓冲区腾出空间来。

TCP 链接数据发送的时候,会有一个滑动窗口控制数据的发送。当发送的快,接受的慢,当超过了这个流量控制,发送的数据包,没有收到客户端发来的 ACK ,会继续重试发送数据包。

下图是在流控之内正常发送,服务端发包,客户端接收到,恢复一个 ACK

<img data-original="http://oss.mflyyou.cn/blog/20200726191559.png?author=zhangpanqin" alt="image-20200726191559755" style="zoom:150%;" />

这个是流控之外没有发送成功,会等待接着发送的。

image-20200726191825717

这个也和 fd 的读写缓冲区有关系,客户端的度读缓冲区满了,服务端再怎么发,也不会成功的。

服务端写数据到客户端,会从

1、水平触发时机

  • 对于读操作,只要读缓冲内容不为空,LT模式返回读就绪。
  • 对于写操作,只要写缓冲区不满,LT模式会返回写就绪。

2、边缘触发时机

读操作
  • 当缓冲区由不可读变为可读的时候,即缓冲区由空变为不空的时候。
  • 当有新数据到达时,即缓冲区中的待读数据变多的时候。
写操作
  • 当缓冲区由不可写变为可写时。
  • 当有旧数据被发送走,即缓冲区中的内容变少的时候。

边缘触发相当于只有增量的时候才会触发。

Java 多路复用

Java 中对多路复用器的抽象是 Selector 。根据不同的平台通过 SPI获得不同的 SelectorProvider

// 根据 SPI 获取多路复用器,linux 是 epoll,mac 下是 KQueue
public abstract AbstractSelector openSelector()throws IOException;

// 获取服务端 socket
public abstract ServerSocketChannel openServerSocketChannel()throws IOException;

// 获取客户端 socket
public abstract SocketChannel openSocketChannel()throws IOException;
public abstract class Selector implements Closeable {

    // 相当于 epoll_create ,创建一个多路复用器
    public static Selector open() throws IOException {
        return SelectorProvider.provider().openSelector();
    }
    
    // 相当于 epoll_wait
    // select 实现使用了 synchronized ,它的锁和 register 使用的锁有重复,当 select 阻塞的时候,调用 register 也会被阻塞。
    public abstract int select(long timeout)throws IOException;
    public abstract int select() throws IOException;

    // 打断 epoll_wait 的阻塞
    public abstract Selector wakeup();

    // 释放 epoll 的示例
    public abstract void close() throws IOException;
    
    // 方法在 AbstractSelector extends Selector
    protected abstract SelectionKey register(AbstractSelectableChannel ch,int ops, Object att);
}
public abstract class SocketChannel extends AbstractSelectableChannel implements
        ByteChannel, ScatteringByteChannel, GatheringByteChannel, NetworkChannel {
    /**
     * 从通道读取数据是加锁的,方法线程安全。读取之后的结果 ByteBuffer 操作需要自己保证安全
     * synchronized(this.readLock)
     */
    @Override
    public abstract int read(ByteBuffer dst) throws IOException;
    
    /**
     * 将缓冲区的数据写入到通道中,加锁。但是 ByteBuffer 需要自己保证安全
     * synchronized(this.writeLock)
     */
    @Override
    public abstract int write(ByteBuffer src) throws IOException;
}

一个简单 Demo

/**
 * @author 张攀钦
 * @date 2020-07-26-16:15
 */
public class SocketDemo1 {
    public static void main(String[] args) throws IOException {
        // 调用 socket() 系统调用获取 socketfd
        final ServerSocketChannel open = ServerSocketChannel.open();
        // 注册多路复用器的 socket 必须是非阻塞的
        open.configureBlocking(false);
        // 调用 bind 系统调用,将 socketfd 绑定特定的 ip 和 port
        open.bind(new InetSocketAddress("10.211.55.8", 10224), 8);
        // 调用 epoll_create 多创建一个多路复用器,epoll
        final Selector open1 = Selector.open();
        // epoll_ctl 让 epoll 监听 socketfd 的 哪些io 操作
        open.register(open1, SelectionKey.OP_ACCEPT);
        // 解决 Selector.select 阻塞的时候,调用 Selector.register 被阻塞的问题,这个点很重要,一定要理解
        final LinkedBlockingQueue<Runnable> objects = new LinkedBlockingQueue<>(1024);
        
        // 创建监听客户端的 epoll,可以根据业务,创建一定数量 epoll,每个 epoll 下监听一定量客户端链接
        Selector open2 = Selector.open();

        // 这个线程用于读取数据
        new Thread(() -> {
            while (true) {
                try {
                    // 调用这个方法会阻塞,阻塞的时候等待 io 操作,select 阻塞的时候锁没有释放,当调用 register 也被阻塞了,最终可能造成多个线程                      // 都被阻塞
                    int select = open2.select();
                    if (select > 0) {
                        final Set<SelectionKey> selectionKeys = open2.selectedKeys();
                        final Iterator<SelectionKey> iterator = selectionKeys.iterator();
                        while (iterator.hasNext()) {
                            System.out.println("随便输入数据");
                            // 可以在这里阻塞将数据从内核态读入到用户态,主要为了验证缓冲区和 Tcp 的滑动窗口
                            System.in.read();
                            final SelectionKey next = iterator.next();
                            iterator.remove();
                            if (next.isReadable()) {
                                final SocketChannel channel = (SocketChannel) next.channel();
                                final ByteBuffer allocate = ByteBuffer.allocate(1024);
                                final int read = channel.read(allocate);
                                // 长度为 -1 的时候说明客户端关闭了
                                if (read == -1) {
                                    channel.close();
                                }
                                if (read > 0) {
                                    allocate.flip();
                                    System.out.println(StandardCharsets.UTF_8.decode(allocate).toString());
                                }
                            }
                        }
                    }
                
                    // 在这里解决 select 阻塞 register 的问题。
                    final Runnable poll = objects.poll();
                    if (Objects.nonNull(poll)) {
                        poll.run();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        
        // 主要用于接受客户端的链接,并将链接注册到 epoll 的逻辑
        new Thread(() -> {
            while (true) {
                try {
                    if (open1.select(100) <= 0) {
                        continue;
                    }
                    final Set<SelectionKey> selectionKeys = open1.selectedKeys();
                    final Iterator<SelectionKey> iterator = selectionKeys.iterator();
                    while (iterator.hasNext()) {
                        final SelectionKey next = iterator.next();
                        iterator.remove();
                        if (next.isValid() & next.isAcceptable()) {
                            final ServerSocketChannel channel = (ServerSocketChannel) next.channel();
                            final SocketChannel accept = channel.accept();
                            if (Objects.nonNull(accept)) {
                                accept.configureBlocking(false);
                                objects.put(() -> {
                                    open2.wakeup();
                                    try {
                                        accept.register(open2, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                                    } catch (ClosedChannelException e) {
                                        e.printStackTrace();
                                    }
                                });
                                open2.wakeup();
                            }
                        }
                    }
                } catch (IOException | InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

参考资料

TCP/IP 介绍


本文由 张攀钦的博客http://www.mflyyou.cn/ 创作。 可自由转载、引用,但需署名作者且注明文章出处。

如转载至微信公众号,请在文末添加作者公众号二维码。微信公众号名称:Mflyyou

查看原文

赞 0 收藏 0 评论 0

张攀钦 发布了文章 · 7月12日

从Linux内核理解JAVA的NIO

前言

IO 可以简单分为磁盘 IO网络 IO ,磁盘 IO 相对于网络 IO 速度会快一点,本文主要介绍 磁盘 IO网络 IO 下周写。

JAVA 对 NIO 抽象为 Channel , Channel 又可以分为 FileChannel (磁盘 io)和 SocketChannel (网络 io)。

如果你对 IO 的理解只是停留在 api 层面那是远远不够的,一定要了解 IO 在系统层面是怎么处理的。

本文内容:

  • FileChannel 读写复制文件的用法。
  • ByteBuffer 的介绍
  • jvm 文件进程锁,FileLock
  • HeapByteBuffer ,DirectByteBuffer 和 mmap 谁的速度更快
  • Linux 内核 中的 虚拟内存系统调用文件描述符InodePage Cache缺页异常讲述整个 IO 的过程
  • jvm 堆外的 DirectByteBuffer 的内存怎么回收

<img data-original="http://oss.mflyyou.cn/blog/20200711165857.png?author=zhangpanqin" alt="image-20200711165857889" style="zoom: 33%;" />

本文计算机系统相关的图全部来自 《深入理解计算机系统》

对 Linux 的了解都是来自书上和查阅资料,本文内容主要是我自己的理解和代码验证,有的描述不一定准确,重在理解过程即可。

NIO

NIO 是 从 Java 1.4 开始引入的,被称之为 Non Blocking IO,也有称之为 New IO。

NIO 抽象为 Channel 是面向缓冲区的(操作的是一块数据),非阻塞 IO。

Channel 只负责传输,数据由 Buffer 负责存储。

Buffer

Buffer 中的 capacitylimitposition 属性是比较重要的,这些弄不明白,读写文件会遇到很多坑。

capacity 标识 Buffer 最大数据容量,相等于一个数组的长度。

limit 为一个指针,标识当前数组可操作的数据的最大索引。

position 表示为下一个读取数据时的索引

<img data-original="http://oss.mflyyou.cn/blog/20200711202515.png?author=zhangpanqin" alt="image-20200711202515462" style="zoom:50%;" />

@Test
public void run1() {
    // `DirectByteBuffer`
    final ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);
    // `HeapByteBuffer`
    final ByteBuffer allocate = ByteBuffer.allocate(1024);
}

HeapByteBuffer 会分配在 Jvm堆内,受 JVM 堆大小的限制,创建速度快,但是读写速度慢。实际底层是一个字节数组。

DirectByteBuffer 会分配 Jvm 堆外,不受 JVM 堆大小的限制,创建速度慢,读写快。DirectByteBuffer 内存在 Linux 中,属于进程的堆内。DirectByteBuffer 受 jvm 参数 MaxDirectMemorySize 的影响。

设置 jvm 堆 100m,运行程序报错 Exception in thread "main" java.lang.OutOfMemoryError: Java heap space。因为指定了 jvm 堆为 100m,然后一些 class 文件也会放在 堆中的,实际堆内存时不足 100m,当申请 100m 堆内存只能报错了。

public class BufferNio {
    // -Xmx100m
    public static void main(String[] args) throws InterruptedException {
        // HeapByteBuffer 是 jvm 堆内,因为堆不足分配 100m(java 中的一些 class 也会占用堆),导致 oom
        System.out.println("申请 100 m `HeapByteBuffer`");
        Thread.sleep(5000);
        ByteBuffer.allocate(100 * 1024 * 1024);
    }
}

设置 jvm 堆为 100m,MaxDirectMemorySize 为 1g,死循环创建 DirectByteBuffer,打印 10 次 申请 directbuffer 成功,报错 Exception in thread "main" java.lang.OutOfMemoryError: Direct buffer memory,后面再说这个堆外的 DirectByteBuffer 怎么进行回收。

public class BufferNio {
//    -Xmx100m -XX:MaxDirectMemorySize=1g
    public static void main(String[] args) throws InterruptedException {
        System.out.println("申请 100 m DirectByteBuffer");
        final ArrayList<Object> objects = new ArrayList<>();
        while (true) {
            // DirectByteBuffer 不在 jvm 堆内,所以可以申请成功,但是不是无限制的,也有限制(MaxDirectMemorySize)
            final ByteBuffer byteBuffer = ByteBuffer.allocateDirect(100 * 1024 * 1024);
            objects.add(byteBuffer);
            System.out.println("申请 directbuffer 成功");
            System.out.println(ManagementFactory.getMemoryMXBean().getHeapMemoryUsage());
            System.out.println(ManagementFactory.getMemoryMXBean().getNonHeapMemoryUsage());
        }
    }
}

FileChannel

读文件

@Test
public void read() throws IOException {
    final Path path = Paths.get(FILE_NAME);
    // 创建一个 FileChannel,指定这个 channel 读写的权限
    final FileChannel open = FileChannel.open(path, StandardOpenOption.READ);
    // 创建一个和这个文件大小一样的 buffer,小文件可以这样,大文件,循环读
    final ByteBuffer allocate = ByteBuffer.allocate((int) open.size());
    open.read(allocate);
    open.close();
    // 切换为读模式,position=0
    allocate.flip();
    // 用 UTF-8 解码
    final CharBuffer decode = StandardCharsets.UTF_8.decode(allocate);
    System.out.println(decode.toString());
}

写文件

@Test
public void write() throws IOException {
    final Path path = Paths.get("demo" + FILE_NAME);
    // 通道具有写权限,create 标识文件不存在的时候创建
    final FileChannel open = FileChannel.open(path, StandardOpenOption.WRITE, StandardOpenOption.CREATE);
    final ByteBuffer allocate = ByteBuffer.allocate(1024);
    allocate.put("张攀钦aaaaa-1111111".getBytes(StandardCharsets.UTF_8));
    // 切换写模式,position=0
    allocate.flip();
    open.write(allocate);
    open.close();
}

复制文件

@Test
public void copy() throws IOException {
    final Path srcPath = Paths.get(FILE_NAME);
    final Path destPath = Paths.get("demo" + FILE_NAME);
    final FileChannel srcChannel = FileChannel.open(srcPath, StandardOpenOption.READ);
    final FileChannel destChannel = FileChannel.open(destPath, StandardOpenOption.WRITE, StandardOpenOption.CREATE);
    // transferTo 实现类中,用的是一个 8M MappedByteBuffer 做数据的 copy ,但是这个方法只能 copy 文件最大字节数为 Integer.MAX
    srcChannel.transferTo(0, srcChannel.size(), destChannel);
    destChannel.close();
    srcChannel.close();
}

FileLock

FileLcok 是 jvm 进程文件锁,在多个 jvm 进程间生效,进程享有文件的读写权限,有共享锁 和 独占锁。

同一个进程不能锁同一个文件的重复区域,不重复是可以锁的。

同一个进程中第一个线程锁文件的 (0,2),同时另一个线程锁(1,2),文件锁的区域有重复,程序会报错。

一个进程锁(0,2),另一个进程锁(1,2)这是可以的,因为 FileLock 是 JVM 进程锁。

运行下面程序两次,打印结果为

第一个程序顺利打印

获取到锁0-3,代码没有被阻塞
获取到锁4-7,代码没有被阻塞

第二个程序打印

获取到锁4-7,代码没有被阻塞
获取到锁0-3,代码没有被阻塞

第一个程序运行的时候,file_lock.txt 的 0-2 位置被锁住了,第一个程序持有锁 10 s,第二个程序运行的时候,会在这里阻塞等待 FileLock,直到第一个程序释放了锁。

public class FileLock {
    public static void main(String[] args) throws IOException, InterruptedException {
        final Path path = Paths.get("file_lock.txt");
        final FileChannel open = FileChannel.open(path, StandardOpenOption.WRITE, StandardOpenOption.READ);
        final CountDownLatch countDownLatch = new CountDownLatch(2);
        new Thread(() -> {
         
            try (final java.nio.channels.FileLock lock = open.lock(0, 3, false)) {
             
                System.out.println("获取到锁0-3,代码没有被阻塞");
                Thread.sleep(10000);
                final ByteBuffer wrap = ByteBuffer.wrap("aaa".getBytes());
                open.position(0);
                open.write(wrap);
                Thread.sleep(10000);
            } catch (IOException | InterruptedException e) {
                e.printStackTrace();
            } finally {
                countDownLatch.countDown();
            }
        }).start();
        Thread.sleep(1000);
        new Thread(() -> {
            try (final java.nio.channels.FileLock lock = open.lock(4, 3, false)) {
                System.out.println("获取到锁4-7,代码没有被阻塞");
                final ByteBuffer wrap = ByteBuffer.wrap("bbb".getBytes());
                open.position(4);
                open.write(wrap);
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                countDownLatch.countDown();
            }
        }).start();
        countDownLatch.await();
        open.close();
    }
}

当将上面的程序第二个线程改为 java.nio.channels.FileLock lock = open.lock(1, 3, false) ,因为同一个进程不允许锁文件的重复区域,程序会报错。

Exception in thread "Thread-1" java.nio.channels.OverlappingFileLockException

HeapByteBuffer 和 DirectByteBuffer 谁的读写效率高?

FileChannel 的实现类 FileChannelImpl,当读写 ByteBuffer 会判断是否是 DirectBuffer,不是的话,会创建一个 DirectBuffer,将原来的的 Buffer 数据 copy 到 DirectBuffer 中使用。所以读写效率上来说,DirectByteBuffer 读写更快。但是 DirectByteBuffer 创建相对来说耗时。

尽管 DirectByteBuffer 是堆外,但是当堆外内存占用达到 -XX:MaxDirectMemorySize 的时候,也会触发 FullGC ,如果堆外没有办法回收内存,就会抛出 OOM。

// 下面这个程序会一直执行下去,但是会触发 FullGC,来回收掉堆外的直接内存
public class BufferNio {
    //    -Xmx100m -XX:MaxDirectMemorySize=1g
    public static void main(String[] args) throws InterruptedException {
        System.out.println("申请 100 m `HeapByteBuffer`");
        while (true) {
            // 当前对象没有被引用,GC root 也就到达不了 DirectByteBuffer
            ByteBuffer.allocateDirect(100 * 1024 * 1024);
            System.out.println("申请 directbuffer 成功");
        }
    }
}

死循环创建的 DirectByteBuffer 没有 GC ROOT 到达,对象会被回收掉,回收掉的时候,也只是回收掉堆内啊,堆外的回收怎么做到的呢?

DirectByteBuffer 源码着手,可以看到它有一个成员变量 private final Cleaner cleaner;,当触发 FullGC 的时候,因为 cleaner 没有 gc root 可达,导致 cleaner 会被回收,回收的时候会触发 Cleaner.clean (在 Reference.tryHandlePending 触发)方法的调用,thunk 就是 DirectByteBuffer.Deallocator 的示例,这个 run 方法中,调用了Unsafe.freeMemory 来释放掉了堆外内存。

public class Cleaner extends PhantomReference<Object> {
      private final Runnable thunk;
     public void clean() {
        if (remove(this)) {
            try {
                this.thunk.run();
            } catch (final Throwable var2) {
                AccessController.doPrivileged(new PrivilegedAction<Void>() {
                    public Void run() {
                        if (System.err != null) {
                            (new Error("Cleaner terminated abnormally", var2)).printStackTrace();
                        }

                        System.exit(1);
                        return null;
                    }
                });
            }

        }
    }
}

内存映射

<img data-original="http://oss.mflyyou.cn/blog/20200712125658.png?author=zhangpanqin" alt="image-20200712125657989" style="zoom:50%;" />

当应用程序读文件的时候,数据需要从先从磁盘读取到内核空间(第一次读写,没有 page cache 缓存数据),在从内核空间 copy 到用户空间,这样应用程序才能使用读到的数据。当一个文件的全部数据都在内核的 Page Cache 上时,就不用再从磁盘读了,直接从内核空间 copy 到用户空间去了。

应用程序对一个文件写数据时,先将要写的数据 copy 到内核 的 page cache,然后调用 fsync 将数据从内核落盘到文件上(只要调用返回成功,数据就不会丢失)。或者不调用 fsync 落盘,应用程序的数据只要写入到 内核的 pagecache 上,写入操作就算完成了,数据的落盘交由 内核 的 Io 调度程序在适当的时机来落盘(突然断电会丢数据,MySQL 这样的程序都是自己维护数据的落盘的)。

我们可以看到数据的读写总会经过从用户空间与内核空间的 copy ,如果能把这个 copy 去掉,效率就会高很多,这就是 mmap (内存映射)。将用户空间和内核空间的内存指向同一块物理内存。内存映射 英文为 Memory Mapping ,缩写 mmap。对应系统调用 mmap

这样在用户空间读写数据,实际操作的也是内核空间的,减少了数据的 copy 。

<img data-original="http://oss.mflyyou.cn/blog/20200712145306.png?author=zhangpanqin" alt="image-20200712145306814" style="zoom:50%;" />

怎么实现的呢,简单来说就是 linux 中进程的地址是虚拟地址,cpu 会将虚拟地址映射到物理内存的物理地址上。mmap 实际是将用户进程的某块虚拟地址与内核空间的某块虚拟地址映射到同一块物理内存上,已达到减少数据的 copy 。

用户程序调用系统调用 mmap 之后的数据的读写都不需要调用系统调用 readwrite 了。

虚拟内存与物理内存的映射

计算机的主存可以看做是由 M 个连续字节组成的数组,每个字节都有一个唯一物理地址 (Physical Address)。

Cpu 使用的 虚拟寻址VA,Virtual Address) 来查找物理地址。

<img data-original="http://oss.mflyyou.cn/blog/20200711171400.png?author=zhangpanqin" alt="image-20200711171400757" style="zoom:50%;" />

CPU 会将进程使用的 虚拟地址 通过 CPU 上的硬件 内存管理单元 (Memory Management UnitMMU) 的进行地址翻译找到物理主存中的物理地址,从而获取数据。

当进程加载之后,系统会为进程分配一个虚拟地址空间,当虚拟地址空间中的某个 虚拟地址 被使用的时候,就会将其先映射到主存上的 物理地址

当多个进程需要共享数据的时候,只需要将其虚拟地址空间中的某些虚拟地址映射相同的物理地址即可。

通常我们操作数据的时候,不会一个字节一个字节的操作,这样效率太低,通常都是连续访问某些字节。所以在内存管理的时候,将内存空间分割为页来管理,物理内存中有物理页Physical Page),虚拟内存中有 Virtual Page 来管理。通常页的大小为 4KB。

系统通过 MMU 和 页表(Page Table) 来管理 虚拟页物理也 的对应关系,页表就是页表条目(Page Table Entry,PTE)的数组

<img data-original="http://oss.mflyyou.cn/blog/20200711183510.png?author=zhangpanqin" alt="image-20200711183510194" style="zoom:50%;" />

PTE 的有效为1时,标识数据在内存中,标识为 0 时,标识在磁盘上。

当访问的虚拟地址对应的数据不再物理内存上时,会有两种情况处理:

1、在内存够用的时候,会直接将虚拟页对应在磁盘上的数据加载到物理内存上,

2、当内存不够用的时候,就会触发 swap,会根据 LRU 将最近使用频率比较低的虚拟页对应物理也淘汰掉,写入到磁盘中去,淘汰掉一部分物理内存中的数据,然后对对应的虚拟页设置为 0,然后将磁盘上的数据再加载到内存中去。

进程的虚拟内存

Linux 会为每个进程分配一个单独的虚拟内存地址,

<img data-original="http://oss.mflyyou.cn/blog/20200711174755.png?author=zhangpanqin" alt="image-20200711174755550" style="zoom: 50%;" />

当我们的程序运行的时候,不是整个程序的代码文件一次性全部加载到内存中去,而是执行懒加载。

机械硬盘使用扇区来管理磁盘,磁盘控制器会通过块管理磁盘,系统通过 Page Cache 与磁盘控制器打交道。

一个块包含多个扇区,一个页也包含多个块。

磁盘上会有一个文件对应一个 Inode,Innode 记录文件的元数据及数据所在位置。

当系统启动的时候,这些 Inode 数据会被加载到主存中去。不过系统中的 Inode 还记录他们对应的物理内存中的位置(实际就是对应 Page Cache),有的 Inode 对应的数据没有加载到内存中,Inode 就不会记录其对应的内存地址。

程序执行之前会初始化其虚拟内存,虚拟内存会记录代码对应哪些 Innode。

当执行程序的时候,系统会初始化当前程序的虚拟内存,然后运行 main 函数,当发现执行代码时,有的代码没有加载到内存,就会触发缺页异常,将根据虚拟页找到对应的 Innoe ,然后将磁盘中需要的数据加载到内存中,然后将虚拟页标记为已加载到内存,下次访问直接从内存中访问。

Java 中的 mmap

看源码我们发现 open.map 返回的也是 DirectByteBuffer,只是这个方法返回的 DirectByteBuffer 使用了不同的构造方法,它绑定了一个 fd 。当我们读写数据的时候是不会触发系统调用 read 和 write 的,也就是内存映射的好处。

public class MMapDemo {
    public static void main(String[] args) throws URISyntaxException, IOException, InterruptedException {
        final URL resource = MMapDemo.class.getClassLoader().getResource("demo.txt");
        final Path path = Paths.get(resource.toURI());
        final FileChannel open = FileChannel.open(path, StandardOpenOption.READ);
        // 发起系统调用 mmap
        final MappedByteBuffer map = open.map(FileChannel.MapMode.READ_ONLY, 0, open.size());
        // 读取数据时,不会再出发调用 read,直接从自己的虚拟内存中即可拿数据
        final CharBuffer decode = StandardCharsets.UTF_8.decode(map);
        System.out.println(decode.toString());
        open.close();
        Thread.sleep(100000);
    }
}

尽管下面这个也是 DirectByteBuffer ,但是它和 mmap 不同的是,他没有绑定 fd,读写数据的时候还是要经过从用户空间到内核空间的 copy ,也会发生系统调用,效率相对 mmap 低。

public class MMapDemo {
    public static void main(String[] args) throws URISyntaxException, IOException, InterruptedException {
        final URL resource = MMapDemo.class.getClassLoader().getResource("demo.txt");
        final Path path = Paths.get(resource.toURI());
        final FileChannel open = FileChannel.open(path, StandardOpenOption.READ);
        // 这个 DirectByteBuffer 使用的构造不一样,它会走系统调用 read
        final ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);
        final int read = open.read(byteBuffer);
        byteBuffer.flip();
        System.out.println(StandardCharsets.UTF_8.decode(byteBuffer).toString());
        Thread.sleep(100000);
    }
}

追踪代码的系统调用,在 linux 下使用 strace

#!/bin/bash
rm -fr /nio/out.*
cd /nio/target/classes
strace -ff -o /nio/out java com.fly.blog.nio.MMapDemo

数据读写速度上 mmap 大于 ByteBuffer.allocateDirect 大于 ByteBuffer.allocate


本文由 张攀钦的博客http://www.mflyyou.cn/ 创作。 可自由转载、引用,但需署名作者且注明文章出处。

如转载至微信公众号,请在文末添加作者公众号二维码。微信公众号名称:Mflyyou

查看原文

赞 0 收藏 0 评论 0

张攀钦 发布了文章 · 7月5日

java中强软弱虚引用的妙用

前言

ThreadLocal 在什么情况下可能发生内存泄漏?如果你想清楚这个问题的来龙去脉,看源码是必不可少的,看了源码之后你发现,实际 ThreadLocal 中实际用到 static class Entry extends WeakReference<ThreadLocal<?>> {} ,谜底实际就是使用了弱引用 WeakReference

本文内容概要

  • 强引用:Object o = new Object()
  • 软引用:new SoftReference(o);
  • 弱引用:new WeakReference(o);
  • 虚引用:new PhantomReference(o);
  • ThreadLocal 的使用,及使用不当发生内存泄漏的原因

Jdk 1.2 增加了抽象类 ReferenceSoftReferenceWeakReferencePhantomReference,扩展了引用类型分类,达到对内存更细粒度的控制。

比如我们的缓存数据,当内存不够用的时候,我希望缓存可以释放内存,或者将缓存存入到堆外等。

但我们怎么区分哪些对象需要回收(垃圾回收算法,可达性分析),回收的时候可以让我们拿到回收的通知,所以 JDK 1.2 带来这几个引用类型。

引用类型什么时候回收
强引用强引用的对象,只要 GC root 可达,不会被回收,内存不够用了,会抛出 oom
软引用:SoftReference软引用对象,GC root 中,只有软引用可以到达某个对象 a,在 oom 之前,垃圾回收会回收对象 a
弱引用:WeakReference弱引用,GC root 中,只有弱引用可以到达某个对象 c,发生 gc 就会被回收掉 c
虚引用:PhantomReference虚引用,必须配合 ReferenceQueue 使用,什么时候回收不知道,但回收之后,可以操作 ReferenceQueue 获取被回收的引用

强引用

强引用就是我们经常用到的方式:Object o = new Object()。垃圾回收时,强引用的变量是不会被回收,只有设置 o=null,jvm 通过可达性分析,没有 GC root 到达对象,垃圾回收器才会清理堆中的对象,释放内存。 当继续申请内存分配,就会 oom。

定义一个类 Demo,Demo 实例占用内存大小为 10m,不停往 list 添加 Demo 的示例,由于不能申请到内存分配,程序抛出 oom 终止

// -Xmx600m
public class SoftReferenceDemo {
    // 1m
    private static int _1M = 1024 * 1024 * 1;
    public static void main(String[] args) throws InterruptedException {
        ArrayList<Object> objects = Lists.newArrayListWithCapacity(50);
        int count = 1;
        while (true) {
            Thread.sleep(100);
            // 获取 jvm 空闲的内存为多少 m
            long meme_free = Runtime.getRuntime().freeMemory() / _1M;
            if ((meme_free - 10) >= 0) {
                Demo demo = new Demo(count);
                objects.add(demo);
                count++;
                demo = null;
            }
            System.out.println("jvm 空闲内存" + meme_free + " m");
            System.out.println(objects.size());
        }
    }

    @Data
    static class Demo {
        private byte[] a = new byte[_1M * 10];
        private String str;
        public Demo(int i) {
            this.str = String.valueOf(i);
        }
    }
}

以上代码运行结果,抛出 oom 程序停止

jvm 空闲内存41 m
54
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
    at com.fly.blog.ref.SoftReferenceDemo$Demo.<init>(SoftReferenceDemo.java:37)
    at com.fly.blog.ref.SoftReferenceDemo.main(SoftReferenceDemo.java:25)

但是有的业务场景,需要我们在内存不够用,可以释放掉一些不必要的数据。比如我们在缓存中存的用户信息。

软引用

jdk 从 1.2 开始加入了 Reference ,SoftReference 是其中一个分类,它的作用是,通过 GC root 到达对象 a,仅有 SoftReference ,对象 a 将会在jvm oom 之前,被 jvm gc 释放掉。

无限循环往 List 添加 10m 左右大小的数据(SoftReference),发现没有出现 oom。

// -Xmx600m
public class SoftReferenceDemo {
    // 1m
    private static int _1M = 1024 * 1024 * 1;
    public static void main(String[] args) throws InterruptedException {
        ArrayList<Object> objects = Lists.newArrayListWithCapacity(50);
        int count = 1;
        while (true) {
            Thread.sleep(500);
            // 获取 jvm 空闲的内存为多少 m
            long meme_free = Runtime.getRuntime().freeMemory() / _1M;
            if ((meme_free - 10) >= 0) {
                Demo demo = new Demo(count);
                SoftReference<Demo> demoSoftReference = new SoftReference<>(demo);
                objects.add(demoSoftReference);
                count++;
                // demo 为 null,只有 demoSoftReference 一条引用到达 Demo 的实例,GC 将会在 oom 之前回收 Demo 的实例
                demo = null;
            }
            System.out.println("jvm 空闲内存" + meme_free + " m");
            System.out.println(objects.size());
        }
    }
    @Data
    static class Demo {
        private byte[] a = new byte[_1M * 10];
        private String str;
        public Demo(int i) {
            this.str = String.valueOf(i);
        }
    }
}

image-20200625213429845

通过 jvisualvm 查看 jvm 堆的使用,可以看到堆在要溢出的时候就会回收掉,空闲的内存很大的时候,你主动执行 执行垃圾回收,内存是不会回收的。

弱引用

对象 demo 的引用只有 WeakReference 可达时,会在 gc 之后回收 demo 释放掉内存。

以下程序也会一直不停的运行,只是内存释放的时机不同而已

// -Xmx600m -XX:+PrintGCDetails
public class WeakReferenceDemo {
    // 1m
    private static int _1M = 1024 * 1024 * 1;

    public static void main(String[] args) throws InterruptedException {
        ArrayList<Object> objects = Lists.newArrayListWithCapacity(50);
        int count = 1;
        while (true) {
            Thread.sleep(100);
            // 获取 jvm 空闲的内存为多少 m
            long meme_free = Runtime.getRuntime().freeMemory() / _1M;
            if ((meme_free - 10) >= 0) {
                Demo demo = new Demo(count);
                WeakReference<Demo> demoWeakReference = new WeakReference<>(demo);
                objects.add(demoWeakReference);
                count++;
                demo = null;
            }
            System.out.println("jvm 空闲内存" + meme_free + " m");
            System.out.println(objects.size());
        }
    }

    @Data
    static class Demo {
        private byte[] a = new byte[_1M * 10];
        private String str;
        public Demo(int i) {
            this.str = String.valueOf(i);
        }
    }
}

运行结果,SoftReference 可用内存在快用尽的时候就会释放掉内存,而 WeakReference 每次可用内存达到 360m 左右会进行垃圾,而释放掉内存

[GC (Allocation Failure) [PSYoungGen: 129159K->1088K(153088K)] 129175K->1104K(502784K), 0.0007990 secs] [Times: user=0.00 sys=0.00, real=0.00 secs] 
jvm 空闲内存364 m
36
jvm 空闲内存477 m

虚引用

也有称呼为 幻灵引用,因为你不知道什么时候被回收,所需必须配合 ReferenceQueue,当对象回收时,可以从这个队列拿到 PhantomReference 的实例。

// -Xmx600m -XX:+PrintGCDetails
public class PhantomReferenceDemo {
    // 1m
    private static int _1M = 1024 * 1024 * 1;

    private static ReferenceQueue referenceQueue = new ReferenceQueue();

    public static void main(String[] args) throws InterruptedException {
        ArrayList<Object> objects = Lists.newArrayListWithCapacity(50);
        int count = 1;
        new Thread(() -> {
            while (true) {
                try {
                    Reference remove = referenceQueue.remove();
                    // objects 可达性分析,可以到达 PhantomReference<Demo>,内存是不能及时释放的,我们需要在队里中拿到那个 Demo 被回收了,然后
                    // 从 objects 移除这个对象
                    if (objects.remove(remove)) {
                        System.out.println("移除元素");
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        while (true) {
            Thread.sleep(500);
            // 获取 jvm 空闲的内存为多少 m
            long meme_free = Runtime.getRuntime().freeMemory() / _1M;
            if ((meme_free - 10) > 40) {
                Demo demo = new Demo(count);
                PhantomReference<Demo> demoWeakReference = new PhantomReference<>(demo, referenceQueue);
                objects.add(demoWeakReference);
                count++;
                demo = null;
            }
            System.out.println("jvm 空闲内存" + meme_free + " m");
            System.out.println(objects.size());
        }
    }

    @Data
    static class Demo {
        private byte[] a = new byte[_1M * 10];
        private String str;

        public Demo(int i) {
            this.str = String.valueOf(i);
        }
    }
}

ThreadLocal

ThreadLocal 在我们实际开发中,用的还是比较多的。那它到底是个什么东东呢(线程本地变量),我们知道 局部变量 (方法内定义的变量)和 成员变量 (类的属性)。

有的时候呢,我们希望一个变量的生命周期可以贯穿整个线程的一个任务运行周期(线程池中的线程可以分配执行不同的任务),在各个方法调用的时候我们可以拿到这个预先设置的变量,这就是 ThreadLocal 的作用。

比如我们想要拿到当前请求的 HttpServletRequest,然后在当前各个方法都可以获取到,SpringBoot 已经帮我们封装好了,RequestContextFilter 在每个请求过来之后,都会通过 RequestContextHolder 设置线程本地变量,原理就是操作 ThreadLocal

ThreadLocal 只是针对当前线程中的调用,跨线程调用是不行的,所以 Jdk 通过 InheritableThreadLocal 继承 ThreadLocal 来实现。

ThreadLocal 获取当前请求的用户信息

看注释大致就能明白 TheadLocal 怎么使用了

/**
 * @author 张攀钦
 * @date 2018/12/21-22:59
 */
@RestController
public class UserInfoController {
    @RequestMapping("/user/info")
    public UserInfoDTO getUserInfoDTO() {
        return UserInfoInterceptor.getCurrentRequestUserInfoDTO();
    }
}

@Slf4j
public class UserInfoInterceptor implements HandlerInterceptor {
    private static final ThreadLocal<UserInfoDTO> THREAD_LOCAL = new ThreadLocal();
    // 请求头用户名
    private static final String USER_NAME = "userName";
    // 注意这个,只有注入到 ioc 中的 bean,才能注入进来
    @Autowired
    private IUserInfoService userInfoService;
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        // 判断是不是接口请求
        if (handler instanceof HandlerMethod) {
            String userName = request.getHeader(USER_NAME);
            UserInfoDTO userInfoByUserName = userInfoService.getUserInfoByUserName(userName);
            THREAD_LOCAL.set(userInfoByUserName);
            return true;
        }
        return false;
    }
    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
        // 用完之后记得释放掉内存
        THREAD_LOCAL.remove();
    }
    // 获取当前线程设置的用户信息
    public static UserInfoDTO getCurrentRequestUserInfoDTO() {
        return THREAD_LOCAL.get();
    }
}

@Configuration
public class WebMvcConfig implements WebMvcConfigurer {

    /**
     * 将 UserInfoInterceptor 注入到 ioc 容器中
     */
    @Bean
    public UserInfoInterceptor getUserInfoInterceptor() {
        return new UserInfoInterceptor();
    }

    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        // 调用这个方法返回的就是 ioc 的 bean
        registry.addInterceptor(getUserInfoInterceptor()).addPathPatterns("/**");
    }
}

InheritableThreadLocal

有的时候,我们希望当前线程的局部变量的生命周期可以延伸到子线程 中,父线程设置的变量,在子线程拿到。 InheritableThreadLocal 就是提供了这个能力。

/**
 * @author 张攀钦
 * @date 2020-06-27-21:18
 */
public class InheritableThreadLocalDemo {
    static InheritableThreadLocal<String> INHERITABLE_THREAD_LOCAL = new InheritableThreadLocal();
    static ThreadLocal<String> THREAD_LOCAL = new ThreadLocal<>();
    public static void main(String[] args) throws InterruptedException {
        INHERITABLE_THREAD_LOCAL.set("父线程中使用 InheritableThreadLocal 设置变量");
        THREAD_LOCAL.set("父线程中使用 ThreadLocal 设置变量");
        Thread thread = new Thread(
                () -> {
                    // 能拿到设置的变量
                    System.out.println("从 InheritableThreadLocal 拿父线程设置的变量: " + INHERITABLE_THREAD_LOCAL.get());
                    // 打印为 null
                    System.out.println("从 ThreadLocal 拿父线程设置的变量: " + THREAD_LOCAL.get());
                }
        );
        thread.start();
        thread.join();
    }
}

ThreadLocal get 方法源码分析

你可以理解 Thead 对象有个属性 Map,它的 key 是 ThreadLoal 实例,获取线程局部变量的源码

public class ThreadLocal<T> {
    public T get() {
        // 获取运行在那个线程中
        Thread t = Thread.currentThread();
        // 从 Thread 拿 Map 
        ThreadLocalMap map = getMap(t);
        if (map != null) {
            // 使用 ThreadLocal 实例从 Map 获取值
            ThreadLocalMap.Entry e = map.getEntry(this);
            if (e != null) {
                @SuppressWarnings("unchecked")
                T result = (T)e.value;
                return result;
            }
        }
        // 初始化 Map,并返回初始化值,默认为 null,你可以定义方法,从这个方法加载初始化值
        return setInitialValue();
    }
}

InheritableThreadLocal 获取父线程设置的数据分析

每个 Thread 还有一个 Map 属性为 inheritableThreadLocals,用于保存从父线程复制过来的 value 。

当初始化子线程的时候,它会将父线程的 Map (inheritableThreadLocals) 的值复制到自己的 Thead Map (inheritableThreadLocals)过来,每个线程维护自己的 inheritableThreadLocals, 所以子线程改不了父线程维护的数据,只是子线程可以获得父线程设置的数据。

public class Thread{
    
    // 维护线程本地变量
    ThreadLocal.ThreadLocalMap threadLocals = null;

    // 维护可以子线程可以继承的父线程的数据
    ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
    
   // 线程初始化
    public Thread(ThreadGroup group, Runnable target, String name,
                  long stackSize) {
        init(group, target, name, stackSize);
    }
    
    private void init(ThreadGroup g, Runnable target, String name,
                      long stackSize, AccessControlContext acc,
                      boolean inheritThreadLocals) {
        if (inheritThreadLocals && parent.inheritableThreadLocals != null){
            // 将父线程的 inheritableThreadLocals 数据复制到子线程中去
            this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
        }
    }
}

public class TheadLocal{
    static ThreadLocalMap createInheritedMap(ThreadLocalMap parentMap) {
        /// 创建自己线程的 Map,将父线程的值复制进去
        return new ThreadLocalMap(parentMap);
    }

    static class ThreadLocalMap {
        private ThreadLocalMap(ThreadLocalMap parentMap) {
            Entry[] parentTable = parentMap.table;
            int len = parentTable.length;
            setThreshold(len);
            table = new Entry[len];
            // 遍历父线程,将数据复制过来
            for (int j = 0; j < len; j++) {
                Entry e = parentTable[j];
                if (e != null) {
                    @SuppressWarnings("unchecked")
                    ThreadLocal<Object> key = (ThreadLocal<Object>) e.get();
                    if (key != null) {
                        Object value = key.childValue(e.value);
                        Entry c = new Entry(key, value);
                        int h = key.threadLocalHashCode & (len - 1);
                        while (table[h] != null)
                            h = nextIndex(h, len);
                        table[h] = c;
                        size++;
                    }
                }
            }
        }
    }
} 

demo 验证,以上分析

image-20200627232351534

image-20200627225502636

内存泄漏原因

定义了一个 20 大小的线程池,执行 50 次任务,执行完之后,将 threadLocal 置为 null,模拟内存泄漏的场景 。为了排除干扰因素,我设置 jvm 参数为 -Xms8g -Xmx8g -XX:+PrintGCDetails

public class ThreadLocalDemo {
    private static ExecutorService executorService = Executors.newFixedThreadPool(20);
    private static ThreadLocal threadLocal = new ThreadLocal();

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 50; i++) {
            executorService.submit(() -> {
                try {
                    threadLocal.set(new Demo());
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    if (Objects.nonNull(threadLocal)) {
                        // 为防止内存泄漏,当前线程用完,清除掉 value
//                        threadLocal.remove();
                    }
                }
            });
        }
        Thread.sleep(5000);
        threadLocal = null;
        while (true) {
            Thread.sleep(2000);
        }
    }
    @Data
    static class Demo {
        //
        private Demo[] demos = new Demo[1024 * 1024 * 5];
    }
}

运行程序,没有打印 gc 日志,说明没有进行垃圾回收

image-20200628020439866

image-20200628020512394

Java VisualVM 中我们 执行垃圾回收,回收之后的内存分布,这个 20 个 ThreadLocalDemo$Demo[] 是回收不了的,这就是内存泄漏。

image-20200628020811328

程序循环 50 次创建了 50 个 Demo ,程序运行期间是不会触发垃圾回收(设置 jvm 参数保证的),所以 ThreadLocalDemo$Demo[] 存活的实例数为 50

当我手动触发了 GC,实例数降为 20,并不是我们期望的 0,这就是程序发生了内存泄漏问题

为什么发生了内存泄漏呢?

因为每个线程对应一个 Thread,线程池大小为 20 个。Thread 中有 ThreadLocal.ThreadLocalMap threadLocals = null;

ThreadLocalMap 中有 Entry[] tables,k 为弱引用。当我们将 threadLocal 置为 null 的时候,GC ROOT 到 ThreadLocalDemo$Demo[] 引用链还是存在的,只是 k 回收掉了,value 依然存在的,tables 长度是不会变的,是不会被回收的。

image-20200628023936332

ThreadLocal 在setget 的时候,针对 k 为 null 的情况做了优化,会将对应的 tables[i] 设置为 null。这样单个 Entry 就可以被回收了。但是我们将 ThreadLocal 置为 null 之后,不能操作方法调用了。只能等到 Thread 再次调用别的 ThreadLocal 时操作 ThreadLocalMap 时根据条件判断,进行 Map 的 rehash,将 k 为 null 的 Entry 删除掉。

上述问题解决也比较方便,线程使用完 线程局部变量,调用 remove 主动清除 Entry 就可以了。


本文由 张攀钦的博客http://www.mflyyou.cn/ 创作。 可自由转载、引用,但需署名作者且注明文章出处。

如转载至微信公众号,请在文末添加作者公众号二维码。微信公众号名称:Mflyyou

查看原文

赞 0 收藏 0 评论 0

张攀钦 发布了文章 · 7月5日

从Linux内核理解Java中的IO

前言

刚接触 Java IO 的时候, 一直有一个 困惑:为什么 BufferedInputStreamFileInputStream 快? 随着对 Linux 了解,这个问题也得到解决。最近也在看 Linux 内核 方面的书,想了解程序在 Linux 上运行的过程,感觉收获还是很多的。

基于安全考虑,只有 Linux内核 才能权限去访问计算机的硬件,Linux内核会提供一些接口(系统调用)让我们可以和硬件交互。不过数据一般都是从硬件内核态 ,再从 Linux内核 复制到 用户态 进程的内存空间中,这样进程才能对读取的数据进行处理。

image-20200704231239764

本文内容:

  • Linux 中的虚拟文件系统介绍
  • Page Cache 和 Dirty Page
  • Java api 写入的数据,什么时候会被刷新到磁盘中

Linux 中 虚拟文件系统(VFS)

虚拟文件系统(Virtual File System,简称VFS)是Linux内核的子系统之一,它为操作文件(普通文件,socket 等)提供了统一的接口,屏蔽不同的硬件差异和操作细节。我们只需调用 openreadwriteclosefsync 这些系统调用,达到操作文件的目的。

我们实际看到的 linux的目录,实际就是 VFS 中的路径,我们可以通过将硬盘中的分区挂载到 linux 中的路径下,访问虚拟文件系统中的路径既可以访问硬盘中的内容。

df -i 可以看到 VFS 中路径挂载的分区。

image-20200704233624173

# 将分区挂载到虚拟文件系统的 /boot 目录下
mount /dev/sda1 /boot

# 卸载分区
umount /boot

操作系统会将硬盘分成两个区域,一个是数据区,用于保存文件的数据;还有一个 Inode 区用于保存文件的元数据(文件创建者,文件创建时间,文件权限,文件大小,块位置等)。

硬盘的最小存储单位叫做"扇区"(Sector),每个扇区储存512字节(相当于0.5KB)。Linux 内核 从硬盘读取内容时,不会一个扇区一个扇区读,而是一次性读取多个扇区,即一次性读取一个 块(Block)。文件的数据内容储存在 中。

基于以上介绍,可以知道,实际一个文件必须占有一个 Inode 和 至少一个 block

df -i 可以查看分区中,inode 的使用情况和分区对应 Linux 下的文件路径。

查看文件的 Inode 的基本大小(一般 4KB)

image-20200705002110818

当应用程序调用系统调用 open,会返回一个文件描述符 (简称 FD,File Decsriptor)。我们可以把 FD 理解为文件的指针,这个指针会指向一个Inode 。多个 FD 可以指向同一个 Inode,FD 会维护一个对文件内容操作的偏移量(读写到什么地方了)。FD 是上层应用程序使用的,Inode 是内核维护使用的。

但是进程打开的 FD 是有限制的,所以我们需要关闭流(实际上就是释放申请的计算机资源),不然 FD 不释放,程序发起系统调用没有 FD可用就会报错。

ulimit -n 可以查看系统限制的进程打开 FD 的数量,当程序并发很高的时候,需要调大此值,不然会报 (Too many open files)

public class ErrorOpenFile {
    public static void main(String[] args) throws IOException, InterruptedException {
        final Path path = Paths.get("/root/testfileio/out.txt");
        int count = 0;
        while (true) {
            // 为了查看 FD 的增长,所以设置阻塞五秒
            Thread.sleep(5000);
            count++;
            Files.newBufferedReader(path);
            System.out.println("打开一个文件描述符");
        }
    }
}

/proc/pid/fd 下可以看到一个进程打开的 FD,其中的 0、1、2 是默认输入(System.in),输出(System.out),错误输出(System.err),每个程序都会有。

image-20200705004254331

为什么 BufferedInputStreamFileInputStream 快?

下面的程序,FileOutputStreamBufferedOutputStream 循环 10000 次,写入相同大小的数据,FileOutputStream 用时 468 毫秒。BufferedOutputStream 用时 3 毫秒。

public class IoOperation {
    static byte[] data = "1234567890\n".getBytes();
    static String path = "/root/testfileio/out.txt";
    static int count = 0;
    public static void main(String[] args) throws Exception {
        switch (args[0]) {
            case "0":
                testBasicFileIO();
                break;
            case "1":
                testBufferedFileIO();
                break;
            default:

        }
    }
    // 468 毫秒执行完 
    public static void testBasicFileIO() throws Exception {
        File file = new File(path);
        FileOutputStream out = new FileOutputStream(file);
        final long start = System.currentTimeMillis();
        while (count < 10000) {
            out.write(data);
            count++;
        }
        System.out.println(System.currentTimeMillis() - start);
        out.close();
    }
    // 3 毫秒执行完 
    public static void testBufferedFileIO() throws Exception {
        File file = new File(path);
        BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(file));
        final long start = System.currentTimeMillis();
        while (count < 10000) {
            out.write(data);
            count++;
        }
        System.out.println(System.currentTimeMillis() - start);
        out.close();
    }
}

VFS 抽象出来的系统调用(open,read,write,close)是让应用程序调用的。我们可以在 Linux 中使用 man open(read/write/close) 查看系统调用的意思

也可以在 Linux 手册https://man7.org/linux/man-pages/dir_section_2.html 看系统调用。

ssize_t write(int fd, const void *buf, size_t count);

write 系统调用,是把缓存区 buf 中的前 count 个字节写入到 fd 中,返回的是实际写入到文件中的字节数 ssize_t,ssize_t 可能小于 count。

write 系统调用 会触发进程从用户态切换到内核态,Cpu 需要保存进程用户态的上下文(代码执行到哪里了,相关数据等),再执行内核代码,执行完内核代码,还要切换回用户态,将进程的上下文再还原,相对来说进程态的切换是比较消耗 Cpu 资源的,我们应该减少 Cpu 资源的切换。

# 执行上面代码,并追踪系统调用
strace -ff -o /root/testfileio/out java com.fly.io.IoOperation $1

image-20200705122935804

FileInputStream 会调用 10000 次系统调用,进程用户态到内核态切换了 10000 次,所以代码执行时间比较长。

BufferedOutputStream 有一个 8192 字节的缓冲区,当调用 BufferedOutputStream.write 会先写入这个缓冲区,在这个缓冲区满的时候,会将这个缓冲区的数据发起系统调用,这样减少了系统调用,所以用时比较少。

Page Cache 和 Dirty Page

文件数据的持久化,也被称为 落盘内存 的速度是 硬盘 N 倍,他俩不是一个量级的。 所以 Linux 引入 Page Cache 来作为数据的缓存,当 Page Cache 被修改之后变为了 Dirty Page,Linux 会在适当时机(可以通过参数调节),将脏页的数据,刷新到硬盘中。也可以调用系统调用(fsync),将脏页刷新到硬盘。

JAVA 程序 调用 FileOutputStream.write 的时候,实际是将用户态的数据,写入到了内核态中的 Page Cache (一个 Page Cache 大小为 4KB 左右),当我们调用 FileOutputStream.close 的时候,实际只是调用了系统调用 close,而没有落盘,这时对计算机断电,数据是没有持久化的。

当我们调用了 FileOutputStream.getFD().sync() 会触发系统调用 fsync,将数据落盘。

image-20200704201130013

Linux 内核进行 Io 调度,来控制数据落盘,时机是:

  1. 当空闲内存低于一个特定的阈值时,内核必须将脏页写回磁盘,以便释放内存。
  2. 当脏页在内存中驻留时间超过一个特定的阈值时,内核必须将超时的脏页写回磁盘吧
  3. 用户进程调用sync(2)fsync(2)fdatasync(2)系统调用时,内核会执行相应的写回操作。

一下是内核参数配置,进行控制内核的调度

sysctl -a | grep dirty 可以查看当前系统生效的配置

#若脏页占总物理内存10%以上,则触发flush把脏数据写回磁盘。内核后台线程写。
vm.dirty_background_ratio = 10
vm.dirty_background_bytes = 1048576
# 向内存写 pagecage 时,内核判断当前脏页占用物理内存的百分比,当超过这个值, 内核会阻塞掉写操作,并开始刷新脏页
vm.dirty_ratio = 10
vm.dirty_bytes = 1048576
# flush每隔5秒执行一次
vm.dirty_writeback_centisecs = 5000
#内存中驻留30秒以上的脏数据将由flush在下一次执行时写入磁盘
vm.dirty_expire_centisecs = 30000

代码验证 FileOutputStream.close 不会引起数据的落盘。为避免 Linux Io 调度的影响,我修改了内核的配置参数,这样数据只要没有调用系统调用 fsync 就不会触发系统调用。

# 编辑配置文件,将参数配置填入文件中
vim /etc/sysctl.conf

# 使配置生效
sysctl -p
vm.dirty_background_ratio = 90
vm.dirty_ratio = 90
vm.dirty_expire_centisecs = 300000
vm.dirty_writeback_centisecs = 50000

代码的逻辑为:往一个文件中写数据,然后关闭流,但是阻塞程序停止,程序停止,数据会刷新到磁盘中,然后模拟断电关闭虚拟机。

当打印 没有落盘的时候cat /root/testfileio/out.txt 是可以看到数据的,当我断电重启之后,数据就没有了。说明 close 不能触发数据的落盘。

public class IoOperation1 {
    static byte[] data = "1234567890\n".getBytes();
    static String path = "/root/testfileio/out.txt";
    static int count = 0;

    public static void main(String[] args) throws Exception {
        File file = new File(path);
        final FileOutputStream out = new FileOutputStream(file);
        while (count < 10) {
            out.write(data);
            count++;
        }
        out.close();
        System.out.println("没有落盘");
        Thread.sleep(1000000);
    }
}

当我们调用系统调用进行落盘的时候,断电重启虚拟机,发现 out.txt 是有数据的。

public class IoOperation1 {
    static byte[] data = "1234567890\n".getBytes();
    static String path = "/root/testfileio/out.txt";
    static int count = 0;

    public static void main(String[] args) throws Exception {
        File file = new File(path);
        final FileOutputStream out = new FileOutputStream(file);
        while (count < 10) {
            out.write(data);
            count++;
        }
        // 发起了系统调用 fsync,进行数据的落盘
        out.getFD().sync();
        out.close();
        System.out.println("落盘");
        Thread.sleep(1000000);
    }
}

本文由 张攀钦的博客http://www.mflyyou.cn/ 创作。 可自由转载、引用,但需署名作者且注明文章出处。

如转载至微信公众号,请在文末添加作者公众号二维码。微信公众号名称:Mflyyou

查看原文

赞 0 收藏 0 评论 0

张攀钦 赞了文章 · 7月4日

文件描述符(File Descriptor)简介

维基百科:文件描述符在形式上是一个非负整数。实际上,它是一个索引值,指向内核为每一个进程所维护的该进程打开文件的记录表。当程序打开一个现有文件或者创建一个新文件时,内核向进程返回一个文件描述符。在程序设计中,一些涉及底层的程序编写往往会围绕着文件描述符展开。

一、文件描述符概念

  Linux 系统中,把一切都看做是文件,当进程打开现有文件或创建新文件时,内核向进程返回一个文件描述符,文件描述符就是内核为了高效管理已被打开的文件所创建的索引,用来指向被打开的文件,所有执行I/O操作的系统调用都会通过文件描述符。

二、文件描述符、文件、进程间的关系

1.描述:

  • 每个文件描述符会与一个打开的文件相对应

  • 不同的文件描述符也可能指向同一个文件

  • 相同的文件可以被不同的进程打开,也可以在同一个进程被多次打开

2.系统为维护文件描述符,建立了三个表

3.通过这三个表,认识文件描述符

图片描述

  • 在进程A中,文件描述符1和30都指向了同一个打开的文件句柄(#23),这可能是该进程多次对执行打开操作

  • 进程A中的文件描述符2和进程B的文件描述符2都指向了同一个打开的文件句柄(#73),这种情况有几种可能,1.进程A和进程B可能是父子进程关系;2.进程A和进程B打开了同一个文件,且文件描述符相同(低概率事件=_=);3.A、B中某个进程通过UNIX域套接字将一个打开的文件描述符传递给另一个进程。

  • 进程A的描述符0和进程B的描述符3分别指向不同的打开文件句柄,但这些句柄均指向i-node表的相同条目(#1936),换言之,指向同一个文件。发生这种情况是因为每个进程各自对同一个文件发起了打开请求。同一个进程两次打开同一个文件,也会发生类似情况。

前人的思考,我们的阶梯,这部分参考自网络:链接

三、文件描述符限制

  有资源的地方就有战争,“文件描述符”也是一种资源,系统中的每个进程都需要有“文件描述符”才能进行改变世界的宏图霸业。世界需要秩序,于是就有了“文件描述符限制”的规定。

如下表:

图片描述

永久修改用户级限制时有三种设置类型:

  1. soft 指的是当前系统生效的设置值

  2. hard 指的是系统中所能设定的最大值

  3. - 指的是同时设置了 soft 和 hard 的值

命令讲解:

四、检查某个进程的文件描述符相关内容

步骤(以nginx为例,*注意权限问题,此示例是在本地环境):

  1. 找到需要检查的进程id

如图,找到的进程id为 1367

  1. 查看该进程的限制

图片描述

如图,在 Max open files 那一行,可以看到当前设置中最大文件描述符的数量为1024

  1. 查看该进程占用了多少个文件描述符

图片描述

如图所示,使用了17个文件描述符

总结

  实际应用过程中,如果出现“Too many open files” , 可以通过增大进程可用的文件描述符数量来解决,但往往故事不会这样结束,很多时候,并不是因为进程可用的文件描述符过少,而是因为程序bug,打开了大量的文件连接(web连接也会占用文件描述符)而没有释放。程序申请的资源在用完后及时释放,才是解决“Too many open files”的根本之道。

查看原文

赞 23 收藏 24 评论 3

张攀钦 发布了文章 · 6月28日

java中强软弱虚引用的妙用

前言

ThreadLocal 在什么情况下可能发生内存泄漏?如果你想清楚这个问题的来龙去脉,看源码是必不可少的,看了源码之后你发现,实际 ThreadLocal 中实际用到 static class Entry extends WeakReference<ThreadLocal<?>> {} ,谜底实际就是使用了弱引用 WeakReference

本文内容概要

  • 强引用:Object o = new Object()
  • 软引用:new SoftReference(o);
  • 弱引用:new WeakReference(o);
  • 虚引用:new PhantomReference(o);
  • ThreadLocal 的使用,及使用不当发生内存泄漏的原因

Jdk 1.2 增加了抽象类 ReferenceSoftReferenceWeakReferencePhantomReference,扩展了引用类型分类,达到对内存更细粒度的控制。

比如我们的缓存数据,当内存不够用的时候,我希望缓存可以释放内存,或者将缓存存入到堆外等。

但我们怎么区分哪些对象需要回收(垃圾回收算法,可达性分析),回收的时候可以让我们拿到回收的通知,所以 JDK 1.2 带来这几个引用类型。

引用类型什么时候回收
强引用强引用的对象,只要 GC root 可达,不会被回收,内存不够用了,会抛出 oom
软引用:SoftReference软引用对象,GC root 中,只有软引用可以到达某个对象 a,在 oom 之前,垃圾回收会回收对象 a
弱引用:WeakReference弱引用,GC root 中,只有弱引用可以到达某个对象 c,发生 gc 就会被回收掉 c
虚引用:PhantomReference虚引用,必须配合 ReferenceQueue 使用,什么时候回收不知道,但回收之后,可以操作 ReferenceQueue 获取被回收的引用

强引用

强引用就是我们经常用到的方式:Object o = new Object()。垃圾回收时,强引用的变量是不会被回收,只有设置 o=null,jvm 通过可达性分析,没有 GC root 到达对象,垃圾回收器才会清理堆中的对象,释放内存。 当继续申请内存分配,就会 oom。

定义一个类 Demo,Demo 实例占用内存大小为 10m,不停往 list 添加 Demo 的示例,由于不能申请到内存分配,程序抛出 oom 终止

// -Xmx600m
public class SoftReferenceDemo {
    // 1m
    private static int _1M = 1024 * 1024 * 1;
    public static void main(String[] args) throws InterruptedException {
        ArrayList<Object> objects = Lists.newArrayListWithCapacity(50);
        int count = 1;
        while (true) {
            Thread.sleep(100);
            // 获取 jvm 空闲的内存为多少 m
            long meme_free = Runtime.getRuntime().freeMemory() / _1M;
            if ((meme_free - 10) >= 0) {
                Demo demo = new Demo(count);
                objects.add(demo);
                count++;
                demo = null;
            }
            System.out.println("jvm 空闲内存" + meme_free + " m");
            System.out.println(objects.size());
        }
    }

    @Data
    static class Demo {
        private byte[] a = new byte[_1M * 10];
        private String str;
        public Demo(int i) {
            this.str = String.valueOf(i);
        }
    }
}

以上代码运行结果,抛出 oom 程序停止

jvm 空闲内存41 m
54
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
    at com.fly.blog.ref.SoftReferenceDemo$Demo.<init>(SoftReferenceDemo.java:37)
    at com.fly.blog.ref.SoftReferenceDemo.main(SoftReferenceDemo.java:25)

但是有的业务场景,需要我们在内存不够用,可以释放掉一些不必要的数据。比如我们在缓存中存的用户信息。

软引用

jdk 从 1.2 开始加入了 Reference ,SoftReference 是其中一个分类,它的作用是,通过 GC root 到达对象 a,仅有 SoftReference ,对象 a 将会在jvm oom 之前,被 jvm gc 释放掉。

无限循环往 List 添加 10m 左右大小的数据(SoftReference),发现没有出现 oom。

// -Xmx600m
public class SoftReferenceDemo {
    // 1m
    private static int _1M = 1024 * 1024 * 1;
    public static void main(String[] args) throws InterruptedException {
        ArrayList<Object> objects = Lists.newArrayListWithCapacity(50);
        int count = 1;
        while (true) {
            Thread.sleep(500);
            // 获取 jvm 空闲的内存为多少 m
            long meme_free = Runtime.getRuntime().freeMemory() / _1M;
            if ((meme_free - 10) >= 0) {
                Demo demo = new Demo(count);
                SoftReference<Demo> demoSoftReference = new SoftReference<>(demo);
                objects.add(demoSoftReference);
                count++;
                // demo 为 null,只有 demoSoftReference 一条引用到达 Demo 的实例,GC 将会在 oom 之前回收 Demo 的实例
                demo = null;
            }
            System.out.println("jvm 空闲内存" + meme_free + " m");
            System.out.println(objects.size());
        }
    }
    @Data
    static class Demo {
        private byte[] a = new byte[_1M * 10];
        private String str;
        public Demo(int i) {
            this.str = String.valueOf(i);
        }
    }
}

image-20200625213429845

通过 jvisualvm 查看 jvm 堆的使用,可以看到堆在要溢出的时候就会回收掉,空闲的内存很大的时候,你主动执行 执行垃圾回收,内存是不会回收的。

弱引用

对象 demo 的引用只有 WeakReference 可达时,会在 gc 之后回收 demo 释放掉内存。

以下程序也会一直不停的运行,只是内存释放的时机不同而已

// -Xmx600m -XX:+PrintGCDetails
public class WeakReferenceDemo {
    // 1m
    private static int _1M = 1024 * 1024 * 1;

    public static void main(String[] args) throws InterruptedException {
        ArrayList<Object> objects = Lists.newArrayListWithCapacity(50);
        int count = 1;
        while (true) {
            Thread.sleep(100);
            // 获取 jvm 空闲的内存为多少 m
            long meme_free = Runtime.getRuntime().freeMemory() / _1M;
            if ((meme_free - 10) >= 0) {
                Demo demo = new Demo(count);
                WeakReference<Demo> demoWeakReference = new WeakReference<>(demo);
                objects.add(demoWeakReference);
                count++;
                demo = null;
            }
            System.out.println("jvm 空闲内存" + meme_free + " m");
            System.out.println(objects.size());
        }
    }

    @Data
    static class Demo {
        private byte[] a = new byte[_1M * 10];
        private String str;
        public Demo(int i) {
            this.str = String.valueOf(i);
        }
    }
}

运行结果,SoftReference 可用内存在快用尽的时候就会释放掉内存,而 WeakReference 每次可用内存达到 360m 左右会进行垃圾,而释放掉内存

[GC (Allocation Failure) [PSYoungGen: 129159K->1088K(153088K)] 129175K->1104K(502784K), 0.0007990 secs] [Times: user=0.00 sys=0.00, real=0.00 secs] 
jvm 空闲内存364 m
36
jvm 空闲内存477 m

虚引用

也有称呼为 幻灵引用,因为你不知道什么时候被回收,所需必须配合 ReferenceQueue,当对象回收时,可以从这个队列拿到 PhantomReference 的实例。

// -Xmx600m -XX:+PrintGCDetails
public class PhantomReferenceDemo {
    // 1m
    private static int _1M = 1024 * 1024 * 1;

    private static ReferenceQueue referenceQueue = new ReferenceQueue();

    public static void main(String[] args) throws InterruptedException {
        ArrayList<Object> objects = Lists.newArrayListWithCapacity(50);
        int count = 1;
        new Thread(() -> {
            while (true) {
                try {
                    Reference remove = referenceQueue.remove();
                    // objects 可达性分析,可以到达 PhantomReference<Demo>,内存是不能及时释放的,我们需要在队里中拿到那个 Demo 被回收了,然后
                    // 从 objects 移除这个对象
                    if (objects.remove(remove)) {
                        System.out.println("移除元素");
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        while (true) {
            Thread.sleep(500);
            // 获取 jvm 空闲的内存为多少 m
            long meme_free = Runtime.getRuntime().freeMemory() / _1M;
            if ((meme_free - 10) > 40) {
                Demo demo = new Demo(count);
                PhantomReference<Demo> demoWeakReference = new PhantomReference<>(demo, referenceQueue);
                objects.add(demoWeakReference);
                count++;
                demo = null;
            }
            System.out.println("jvm 空闲内存" + meme_free + " m");
            System.out.println(objects.size());
        }
    }

    @Data
    static class Demo {
        private byte[] a = new byte[_1M * 10];
        private String str;

        public Demo(int i) {
            this.str = String.valueOf(i);
        }
    }
}

ThreadLocal

ThreadLocal 在我们实际开发中,用的还是比较多的。那它到底是个什么东东呢(线程本地变量),我们知道 局部变量 (方法内定义的变量)和 成员变量 (类的属性)。

有的时候呢,我们希望一个变量的生命周期可以贯穿整个线程的一个任务运行周期(线程池中的线程可以分配执行不同的任务),在各个方法调用的时候我们可以拿到这个预先设置的变量,这就是 ThreadLocal 的作用。

比如我们想要拿到当前请求的 HttpServletRequest,然后在当前各个方法都可以获取到,SpringBoot 已经帮我们封装好了,RequestContextFilter 在每个请求过来之后,都会通过 RequestContextHolder 设置线程本地变量,原理就是操作 ThreadLocal

ThreadLocal 只是针对当前线程中的调用,跨线程调用是不行的,所以 Jdk 通过 InheritableThreadLocal 继承 ThreadLocal 来实现。

ThreadLocal 获取当前请求的用户信息

看注释大致就能明白 TheadLocal 怎么使用了

/**
 * @author 张攀钦
 * @date 2018/12/21-22:59
 */
@RestController
public class UserInfoController {
    @RequestMapping("/user/info")
    public UserInfoDTO getUserInfoDTO() {
        return UserInfoInterceptor.getCurrentRequestUserInfoDTO();
    }
}

@Slf4j
public class UserInfoInterceptor implements HandlerInterceptor {
    private static final ThreadLocal<UserInfoDTO> THREAD_LOCAL = new ThreadLocal();
    // 请求头用户名
    private static final String USER_NAME = "userName";
    // 注意这个,只有注入到 ioc 中的 bean,才能注入进来
    @Autowired
    private IUserInfoService userInfoService;
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        // 判断是不是接口请求
        if (handler instanceof HandlerMethod) {
            String userName = request.getHeader(USER_NAME);
            UserInfoDTO userInfoByUserName = userInfoService.getUserInfoByUserName(userName);
            THREAD_LOCAL.set(userInfoByUserName);
            return true;
        }
        return false;
    }
    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
        // 用完之后记得释放掉内存
        THREAD_LOCAL.remove();
    }
    // 获取当前线程设置的用户信息
    public static UserInfoDTO getCurrentRequestUserInfoDTO() {
        return THREAD_LOCAL.get();
    }
}

@Configuration
public class WebMvcConfig implements WebMvcConfigurer {

    /**
     * 将 UserInfoInterceptor 注入到 ioc 容器中
     */
    @Bean
    public UserInfoInterceptor getUserInfoInterceptor() {
        return new UserInfoInterceptor();
    }

    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        // 调用这个方法返回的就是 ioc 的 bean
        registry.addInterceptor(getUserInfoInterceptor()).addPathPatterns("/**");
    }
}

InheritableThreadLocal

有的时候,我们希望当前线程的局部变量的生命周期可以延伸到子线程 中,父线程设置的变量,在子线程拿到。 InheritableThreadLocal 就是提供了这个能力。

/**
 * @author 张攀钦
 * @date 2020-06-27-21:18
 */
public class InheritableThreadLocalDemo {
    static InheritableThreadLocal<String> INHERITABLE_THREAD_LOCAL = new InheritableThreadLocal();
    static ThreadLocal<String> THREAD_LOCAL = new ThreadLocal<>();
    public static void main(String[] args) throws InterruptedException {
        INHERITABLE_THREAD_LOCAL.set("父线程中使用 InheritableThreadLocal 设置变量");
        THREAD_LOCAL.set("父线程中使用 ThreadLocal 设置变量");
        Thread thread = new Thread(
                () -> {
                    // 能拿到设置的变量
                    System.out.println("从 InheritableThreadLocal 拿父线程设置的变量: " + INHERITABLE_THREAD_LOCAL.get());
                    // 打印为 null
                    System.out.println("从 ThreadLocal 拿父线程设置的变量: " + THREAD_LOCAL.get());
                }
        );
        thread.start();
        thread.join();
    }
}

ThreadLocal get 方法源码分析

你可以理解 Thead 对象有个属性 Map,它的 key 是 ThreadLoal 实例,获取线程局部变量的源码

public class ThreadLocal<T> {
    public T get() {
        // 获取运行在那个线程中
        Thread t = Thread.currentThread();
        // 从 Thread 拿 Map 
        ThreadLocalMap map = getMap(t);
        if (map != null) {
            // 使用 ThreadLocal 实例从 Map 获取值
            ThreadLocalMap.Entry e = map.getEntry(this);
            if (e != null) {
                @SuppressWarnings("unchecked")
                T result = (T)e.value;
                return result;
            }
        }
        // 初始化 Map,并返回初始化值,默认为 null,你可以定义方法,从这个方法加载初始化值
        return setInitialValue();
    }
}

InheritableThreadLocal 获取父线程设置的数据分析

每个 Thread 还有一个 Map 属性为 inheritableThreadLocals,用于保存从父线程复制过来的 value 。

当初始化子线程的时候,它会将父线程的 Map (inheritableThreadLocals) 的值复制到自己的 Thead Map (inheritableThreadLocals)过来,每个线程维护自己的 inheritableThreadLocals, 所以子线程改不了父线程维护的数据,只是子线程可以获得父线程设置的数据。

public class Thread{
    
    // 维护线程本地变量
    ThreadLocal.ThreadLocalMap threadLocals = null;

    // 维护可以子线程可以继承的父线程的数据
    ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
    
   // 线程初始化
    public Thread(ThreadGroup group, Runnable target, String name,
                  long stackSize) {
        init(group, target, name, stackSize);
    }
    
    private void init(ThreadGroup g, Runnable target, String name,
                      long stackSize, AccessControlContext acc,
                      boolean inheritThreadLocals) {
        if (inheritThreadLocals && parent.inheritableThreadLocals != null){
            // 将父线程的 inheritableThreadLocals 数据复制到子线程中去
            this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
        }
    }
}

public class TheadLocal{
    static ThreadLocalMap createInheritedMap(ThreadLocalMap parentMap) {
        /// 创建自己线程的 Map,将父线程的值复制进去
        return new ThreadLocalMap(parentMap);
    }

    static class ThreadLocalMap {
        private ThreadLocalMap(ThreadLocalMap parentMap) {
            Entry[] parentTable = parentMap.table;
            int len = parentTable.length;
            setThreshold(len);
            table = new Entry[len];
            // 遍历父线程,将数据复制过来
            for (int j = 0; j < len; j++) {
                Entry e = parentTable[j];
                if (e != null) {
                    @SuppressWarnings("unchecked")
                    ThreadLocal<Object> key = (ThreadLocal<Object>) e.get();
                    if (key != null) {
                        Object value = key.childValue(e.value);
                        Entry c = new Entry(key, value);
                        int h = key.threadLocalHashCode & (len - 1);
                        while (table[h] != null)
                            h = nextIndex(h, len);
                        table[h] = c;
                        size++;
                    }
                }
            }
        }
    }
} 

demo 验证,以上分析

image-20200627232351534

image-20200627225502636

内存泄漏原因

定义了一个 20 大小的线程池,执行 50 次任务,执行完之后,将 threadLocal 置为 null,模拟内存泄漏的场景 。为了排除干扰因素,我设置 jvm 参数为 -Xms8g -Xmx8g -XX:+PrintGCDetails

public class ThreadLocalDemo {
    private static ExecutorService executorService = Executors.newFixedThreadPool(20);
    private static ThreadLocal threadLocal = new ThreadLocal();

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 50; i++) {
            executorService.submit(() -> {
                try {
                    threadLocal.set(new Demo());
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    if (Objects.nonNull(threadLocal)) {
                        // 为防止内存泄漏,当前线程用完,清除掉 value
//                        threadLocal.remove();
                    }
                }
            });
        }
        Thread.sleep(5000);
        threadLocal = null;
        while (true) {
            Thread.sleep(2000);
        }
    }
    @Data
    static class Demo {
        //
        private Demo[] demos = new Demo[1024 * 1024 * 5];
    }
}

运行程序,没有打印 gc 日志,说明没有进行垃圾回收

image-20200628020439866

image-20200628020512394

Java VisualVM 中我们 执行垃圾回收,回收之后的内存分布,这个 20 个 ThreadLocalDemo$Demo[] 是回收不了的,这就是内存泄漏。

image-20200628020811328

程序循环 50 次创建了 50 个 Demo ,程序运行期间是不会触发垃圾回收(设置 jvm 参数保证的),所以 ThreadLocalDemo$Demo[] 存活的实例数为 50

当我手动触发了 GC,实例数降为 20,并不是我们期望的 0,这就是程序发生了内存泄漏问题

为什么发生了内存泄漏呢?

因为每个线程对应一个 Thread,线程池大小为 20 个。Thread 中有 ThreadLocal.ThreadLocalMap threadLocals = null;

ThreadLocalMap 中有 Entry[] tables,k 为弱引用。当我们将 threadLocal 置为 null 的时候,GC ROOT 到 ThreadLocalDemo$Demo[] 引用链还是存在的,只是 k 回收掉了,value 依然存在的,tables 长度是不会变的,是不会被回收的。

image-20200628023936332

ThreadLocal 在setget 的时候,针对 k 为 null 的情况做了优化,会将对应的 tables[i] 设置为 null。这样单个 Entry 就可以被回收了。但是我们将 ThreadLocal 置为 null 之后,不能操作方法调用了。只能等到 Thread 再次调用别的 ThreadLocal 时操作 ThreadLocalMap 时根据条件判断,进行 Map 的 rehash,将 k 为 null 的 Entry 删除掉。

上述问题解决也比较方便,线程使用完 线程局部变量,调用 remove 主动清除 Entry 就可以了。


本文由 张攀钦的博客http://www.mflyyou.cn/ 创作。 可自由转载、引用,但需署名作者且注明文章出处。

如转载至微信公众号,请在文末添加作者公众号二维码。微信公众号名称:Mflyyou

查看原文

赞 1 收藏 1 评论 0

张攀钦 发布了文章 · 6月20日

你需要了解锁的前提-volatile

前言

java 中鼎鼎有名的 AQS 维护 private volatile int state 状态实现了用户态的锁。你如果不了解 volatile ,你看 AQS 的源码应该很难理解为什么Lock 能保证线程安全。

volatile 绝对是你打通 java 的任督二脉的首要条件。votaile 的特性很简单,可见性禁止指令重拍,如果你自己写代码验证过这两个特点,接下来的内容应该对你帮助不大。

单例模式懒汉式 的写法(DCL)是可以检验你对 volatile 的了解,这也是面试中被问频率较高的问题。

本文将会介绍如下内容:

  • volatile 的可见性是什么,有什么用
  • volatile 禁止指令重排是个什么东东
  • 单例模式,饿汉式和懒汉式(DCL)的写法,分析 (DCL)
  • 伪共享是什么,怎么避免伪共享

volatile

可见性

image-20200620152106412

计算机 CPU主存 交互的逻辑大致如图,CPU 的运算速度是 主存 的 100 倍左右,为了避免 CPU 被主存拖慢速度。当CPU 需要一个数据的时候,会先从 L1 找,找到直接使用;L1 中未找到,会去 L2 中,L2 中找不到会去 L3 ,L3 找不到再去主存加载到 L3,再从 L3加载到 L2 ,再从 L2 加载到 L1

这样提高的计算速度,同时也面临数据不一致问题。

主存中现在有一个变量 a=1,CPU1 a+1 之后,将结果 a=2 放入到 L1 去,但是后续代码计算还会用到 a,这时 CPU1 不会将 a=2 同步到主存中去。之后 CPU2 也从主存中取出变量 a(a=1),CPU2 将 a+2 的结果放入到 L1 中。这样就造成了数据不一致问题。缓存一致性协议就是为了解决这个问题的。

以上是计算机底层的实现原理,JAVA 在自己的虚拟机中执行,也有自己的内存模型,但不管怎么样,底层还是依靠的 CPU 指令集达到缓存一致性。JAVA 的内存模型屏蔽了不同平台缓存一致性协议的不同实现细节,定义了一套自己的内存模型。

image-20200620153620353

java 虚拟机中的变量全部储存在主存中,每个线程都有自己的工作内存,工作内存中的变量是主存变量的副本拷贝(使用那些变量,拷贝那些),每个线程只会操作工作内存的变量,当需要保存数据一致性的时候,线程会将工作内存中的变量同步到主存中去。volatile 就是让线程改变了 a 之后,回写到主存中,已达到缓存一致。

接下来代码体会一下,带不带 volatile 的区别。

public class VolatileDemo {
    private static  int a = 0;
    public static void main(String[] args) throws InterruptedException {
        new Thread(() -> {
            while (a == 0) {
            }
        }, "线程 1").start();
        System.out.println("修改 a=1 之前");
        Thread.sleep(3000);
        a = 1;
        System.out.println("修改 a=1 之后");
    }
}

运行这个程序,代码会一直运行,不会停止。这是因为 线程1 的工作内存 a 为 0,而主线程尽管修改了 a,但不会达到线程1重新加载主存中的变量 a。

public class VolatileDemo {
    // 代码的区别只是加了 volatile
    private static volatile int a = 0;
    public static void main(String[] args) throws InterruptedException {
        new Thread(() -> {
            while (a == 0) {

            }
        }, "线程 1").start();
        System.out.println("修改 a=1 之前");
        Thread.sleep(3000);
        a = 1;
        System.out.println("修改 a=1 之后");
    }
}

打印 修改 a=1 之后 程序停止。这是因为 volatile 标记的变量 a,主线程修改之后,并同步回主存,当其他的线程再使用变量 a 的时候,java 内存模型会让线程从主存加载变量 a。这就是 volatile可见性 特点。

禁止指令重排

java 中的字节码最终都会编译成机器码(CPU 指令)执行,CPU 在保证单线程中执行结果不变的情况下,可以对指令进行指令重排已达到提高执行效率。

public class VolatileOrdering2 {
    static  int b = 1;
    public static void main(String[] args) throws InterruptedException {
        int a = 0;
        b = 2;
        a += 1;
        System.out.println(a);
    }
}

上述代码指令重排执行顺序的可能:

int a=0;
a+=1;
System.out.println(a);
int b = 2;

网上也有人写的 demo 验证可能会发生指令重排的小程序

public class T04_Disorder {
    private static int x = 0, y = 0;
    private static int a = 0, b =0;

    public static void main(String[] args) throws InterruptedException {
        int i = 0;
        for(;;) {
            i++;
            x = 0; y = 0;
            a = 0; b = 0;
            Thread one = new Thread(new Runnable() {
                public void run() {
                    //由于线程one先启动,下面这句话让它等一等线程two. 读着可根据自己电脑的实际性能适当调整等待时间.
                    //shortWait(100000);
                    a = 1;
                    x = b;
                }
            });
            Thread other = new Thread(new Runnable() {
                public void run() {
                    b = 1;
                    y = a;
                }
            });
            one.start();other.start();
            one.join();other.join();
            String result = "第" + i + "次 (" + x + "," + y + ")";
            if(x == 0 && y == 0) {
                System.err.println(result);
                break;
            } else {
                //System.out.println(result);
            }
        }
    }
    public static void shortWait(long interval){
        long start = System.nanoTime();
        long end;
        do{
            end = System.nanoTime();
        }while(start + interval >= end);
    }
}

假设指令重排不会发生,那么 result 将不会打印,实际循环 n 次之后会打印 result

volatile 可以禁止指令重排。

image-20200620174915155

大致简单理解,加了内存屏障之后,代码分成 1,2,3部分。1 部分代码你怎么指令重排我不管,但是 1 部分代码执行完了之后,必须执行 2 部分代码,再执行 3 部分代码。

单例模式

饿汉式

public class SingletonDemo {
    private static final SingletonDemo INSTANCE = new SingletonDemo();
    private SingletonDemo() {
    }
    public static SingletonDemo getInstance() {
        return SingletonDemo.INSTANCE;
    }
}

一般项目中我们用这种用法即可,简单方便,也没谁闲着无聊利用别的手段给你打破单例。

懒汉式

饿汉式不管你用不用这个单例,只要类加载,单例就给你初始化好了。有的人就想让其懒加载,节约那可怜的内存,用的时候单例再实例化。

public class SingletonDemo1 {
    private SingletonDemo1() {
    }

    public static SingletonDemo1 getInstance() {
        System.out.println("SingletonDemo1Holder 类加载");
        return SingletonDemo1Holder.getInstance();
    }

    private static class SingletonDemo1Holder {
        private static final SingletonDemo1 INSTANCE = new SingletonDemo1();
        public static SingletonDemo1 getInstance() {
            return SingletonDemo1Holder.INSTANCE;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(SingletonDemo1.getInstance());
        System.out.println(SingletonDemo1.getInstance());
    }
}

运行的时候加上这个 -XX:+TraceClassLoading 会打印加载的类。

image-20200620182245333

从图中我们可以看到调用 SingletonDemo1.getInstance() 的时候,才加载的 SingletonDemo1Holder 类,再实例化单例,达到懒加载的要求。

DCL 实现单例

以上单例的实现看着没啥技术含量,下面介绍一下 DCL (Double-checked locking),双重检查锁的实现,这也是面试会问到的点。

public class SingletonDemo2 {
    // 考点在这里,要不要加 volitale
    private volatile static SingletonDemo2 INSTANCE;
    private SingletonDemo2() {

    }
    public static SingletonDemo2 getInstance() {
        if (INSTANCE == null) {
            synchronized (SingletonDemo2.class) {
                if (INSTANCE == null) {
                    // 对象实例化
                    INSTANCE = new SingletonDemo2();
                }
            }
        }
        return INSTANCE;
    }
}

对象实例化实际可以分为几个步骤:

1、分配对象空间

2、初始化对象

3、将对象指向分配的内存空间

当指令重排的时候,2 和 3 会进行重排序,导致有的线程可能拿到未初始化的对象调用,存在风险问题。

伪共享

volatile 给我们带来了变量 可见性 的功能,但是当使用不当,会掉入另一个 伪共享 的坑。先看 demo.

public class VolatileDemo3 {
    private static volatile Demo[] demos = new Demo[2];
//    @sun.misc.Contended
    private static final class Demo {
        private volatile long x = 0L;
    }
    static {
        demos[0] = new Demo();
        demos[1] = new Demo();
    }
    public static void main(String[] args) throws InterruptedException {
        Thread thread1 = new Thread(() -> {
            for (long i = 0; i < 10000_0000L; i++) {
                demos[0].x = i;
            }
        });
        Thread thread = new Thread(() -> {
            for (long i = 0; i < 10000_0000L; i++) {
                demos[1].x = i;
            }
        });
        long start = System.nanoTime();
        thread.start();
        thread1.start();
        thread.join();
        thread1.join();
        long end = System.nanoTime();
        long runSecond = (end - start) / 100_0000;
        System.out.println("运行毫秒:" + runSecond);
    }
}

上述代码,存在伪共享的情况,我电脑运行 运行毫秒:2764

// 运行的时候,需要加上参数 -XX:-RestrictContended
public class VolatileDemo3 {
    private static volatile Demo[] demos = new Demo[2];
    @sun.misc.Contended
    private static final class Demo {
        private volatile long x = 0L;
    }
    static {
        demos[0] = new Demo();
        demos[1] = new Demo();
    }
    public static void main(String[] args) throws InterruptedException {
        Thread thread1 = new Thread(() -> {
            for (long i = 0; i < 10000_0000L; i++) {
                demos[0].x = i;
            }
        });
        Thread thread = new Thread(() -> {
            for (long i = 0; i < 10000_0000L; i++) {
                demos[1].x = i;
            }
        });
        long start = System.nanoTime();
        thread.start();
        thread1.start();
        thread.join();
        thread1.join();
        long end = System.nanoTime();
        long runSecond = (end - start) / 100_0000;
        System.out.println("运行毫秒:" + runSecond);
    }
}

上述代码,使用 @sun.misc.Contended 避免伪共享,我电脑运行 运行毫秒:813

相似的用法在 ConcurrentHashMap 可以看到,

@sun.misc.Contended 
static final class CounterCell {
    volatile long value;
    CounterCell(long x) { value = x; }
}

上述代码展示了伪共享会降低代码的运行速度。什么是伪共享呢。

还记得 Cpu 中的 L1L2L3 吗,主存中的数据加载到 Cpu 的高速缓存的最小单位就是 缓存行(64 bit)。Cpu 的缓存失效,也是以缓存行为单位失效。

Cpu 从内存加载数据的时候,它会把可能会用到的数据和目标数据一起加载到 L1/L2/L3 中。上述代码的变量 private static volatile Demo[] demos = new Demo[2]; 这两个变量被一起加载到同一个缓存行中去了,一个线程修改了其中的 demos[0].x 导致缓存行失效,另一个线程修改 demos[1].x = i; 的时候发现缓存行失效,会去主存重新加载新的数据,两个线程相互影响导致不停从内存加载,运行速度自然降低了。

@sun.misc.Contended 作用就是让其单独在一个缓存行中去。

我们也可以通过对齐填充,而避免伪共享。

缓存行 通常都是 64 bit。而 long 为 8 个 bit,我们自己补充 7 个没有用 long 变量就可以让 x 和 7个没用的变量单独一个缓存行

public class VolatileDemo3 {
    private static volatile Demo[] demos = new Demo[2];
    private static final class Demo {
        private volatile long x = 0L;
        // 缓存行对齐填充的无用数据
        private volatile long pading1, pading2, pading3, pading4, pading5, pading6, pading7;
    }
    static {
        demos[0] = new Demo();
        demos[1] = new Demo();
    }

    public static void main(String[] args) throws InterruptedException {
        Thread thread1 = new Thread(() -> {
            for (long i = 0; i < 10000_0000L; i++) {
                demos[0].x = i;
            }
        });
        Thread thread = new Thread(() -> {
            for (long i = 0; i < 10000_0000L; i++) {
                demos[1].x = i;
            }
        });
        long start = System.nanoTime();
        thread.start();
        thread1.start();
        thread.join();
        thread1.join();
        long end = System.nanoTime();
        long runSecond = (end - start) / 100_0000;
        System.out.println("运行毫秒:" + runSecond);
    }


}

本文由 张攀钦的博客http://www.mflyyou.cn/ 创作。 可自由转载、引用,但需署名作者且注明文章出处。

如转载至微信公众号,请在文末添加作者公众号二维码。微信公众号名称:Mflyyou

查看原文

赞 0 收藏 0 评论 0

张攀钦 赞了文章 · 6月17日

关于spring boot集成MQTT的一写新人问题

这几天弄了下mqtt ,发现有很多问题,网上搜不到什么解决办法,所以自己记录下来,也让初识mqtt的人少走一些坑,关于我写的不对的也希望看到的人能指出来互相学习下

安装

说到mqtt,首先肯定要安装了,安装什么的地址:http://activemq.apache.org/ap...
我本地是Windows的环境,所以装的是Windows版本,这里是第一个注意的地方,因为后面使用的时候windows和linux的有一些不同

下载完成之后就是解压安装了,这里解压完成之后进入bin目录下,自己用cmd或者直接进去在此处打开命令窗口也行,然后运行apollo.cmd 创建一个服务实例我的实例名称是mybroker所以命令是 apollo.cmd create mybroker,这个名称自己可以随便指定

创建完实例后发现bin 目录下多了一个文件夹,这个文件夹就是你实例名称,进入文件夹运行
.apollo-broker.cmd run 命令
clipboard.png
这样就启动成功了

启动成功可以去http://localhost:61680/console/index.html看看,登录账号和密码在mybrokeretcusers.properties文件中找到输入就可以进去了

clipboard.png

页面上有连接信息和订阅主题的一些对应信息,有兴趣的自己看下,后面也会讲到的

使用

安装成功接下来就是使用了,首先创建一个maven工程,引入配置

    <!--mqtt-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-integration</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-stream</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-mqtt</artifactId>
    </dependency>

由于我们后面处理订阅消息的消费者打印的日志是用了slf4j为了方便也引入了lombok的配置 :

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>

引入完成以后就可以开始准备开始使用mqtt了
这里为了方便维护和配置我把一些配置参数放在了properties文件里面:

#MQTT配置信息

spring.mqtt.username=admin

spring.mqtt.password=password

spring.mqtt.url=tcp://localhost:61613

spring.mqtt.client.id=clientId

spring.mqtt.server.id=serverId

spring.mqtt.default.topic=topic

这里我遇到了一个坑,专门注释了,就是订阅端订阅消息的id 和 发布端发布消息的id 一定不能一样,这样会导致mqtt识别到两个一样的id,消息一发就断开连接了,订阅端总是收不到消息,这个问题我找了好长时间都不知道问题出在哪,刚接触的很容易搞错,第二个问题就是mqtt的服务器连接地址,在Windows和linux下tcp的端口是不一样的,在启动的apollo的日志中可以看出来
clipboard.png

监听的tcp端口是61613,看别人很多的demo上都是1883,如果一直连不上,原因可能是因为这个

接下来就是spring.mqtt.default.topic 配置了,这个是mqtt订阅和推送的消息主题,既然你想发消息那么订阅消息的主题和发布消息的主题一致才能收到消息,和rabbitmq一样

然后就是客户端

@Configuration
@IntegrationComponentScan
@Slf4j
public class MqttSenderConfig {

    @Value("${spring.mqtt.username}")
    private String username;

    @Value("${spring.mqtt.password}")
    private String password;

    @Value("${spring.mqtt.url}")
    private String hostUrl;

    @Value("${spring.mqtt.client.id}")
    private String clientId;

    @Value("${spring.mqtt.default.topic}")
    private String defaultTopic;

    @Bean
    public MqttConnectOptions getMqttConnectOptions(){
        MqttConnectOptions mqttConnectOptions=new MqttConnectOptions();
        mqttConnectOptions.setUserName(username);
        mqttConnectOptions.setPassword(password.toCharArray());
        mqttConnectOptions.setServerURIs(new String[]{hostUrl});
        mqttConnectOptions.setKeepAliveInterval(2);
        return mqttConnectOptions;
    }

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getMqttConnectOptions());
        return factory;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =  new MqttPahoMessageHandler(clientId, mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(defaultTopic);
        return messageHandler;
    }

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }
  }  
  

这里有点问题,如果你是复制我的代码的话MessageHandler 这个类是没有的需要自己手动导包,看了源码发现这里需要的是一个消息处理的handler需要是org.springframework.messaging.MessageHandler的实现,直接导入这个包就行了

@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MsgWriter {

    void sendToMqtt(String data);
    void sendToMqtt(String payload,@Header(MqttHeaders.TOPIC) String topic);
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);

}

这个是消息发送接口,需要发送消息的时候直接调用就行了,提供了几个重载方法payload或者data是发送消息的内容
topic是消息发送的主题,这里可以自己灵活定义,也可以使用默认的主题,就是配置文件的主题,qos是mqtt 对消息处理的几种机制分为0,1,2 其中0表示的是订阅者没收到消息不会再次发送,消息会丢失,1表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息,2相比多了一次去重的动作,确保订阅者收到的消息有一次
当然,这三种模式下的性能肯定也不一样,qos=0是最好的,2是最差的 ,有兴趣的可以去详细了解我在这不多赘述

上面就完成了消息的发送,可以去http://localhost:61680/console/index.html看看消息的记录,这里我写了一个接口调用sendToMqtt方法发送一条消息

clipboard.png

会看到收到有两个主题,我的是因为我订阅了两个主题所以上面显示的是两个,我的刚才发布消息的主题是too所以打开会看到too有消息送达过来

clipboard.png

如果你还没写订阅方的话consumers是没有的,现在显示我发了7条消息,证明发送成功了

接下来就是订阅方,为了方便我就直接写在启动类上了,没有用到所有的配置

@SpringBootApplication
@EnableAutoConfiguration
public class MytestApplication {

    public static void main(String[] args) {
        SpringApplication.run(MytestApplication.class, args);
    }


    @Value("${spring.mqtt.server.id}")
    private String serverId;

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setServerURIs("tcp://localhost:61613");
        factory.setUserName("admin");
        factory.setPassword("password");
        return factory;
    }

//     consumer 订阅者监听消息

    @Bean
    public IntegrationFlow mqttInFlow() {
        return IntegrationFlows.from(mqttInbound())
                .transform(p -> p + ", received from MQTT")
                .handle(logger())
                .get();
    }

    private LoggingHandler logger() {
        LoggingHandler loggingHandler = new LoggingHandler("INFO");
        loggingHandler.setLoggerName("siSample");
        return loggingHandler;
    }

    @Bean
    public MessageProducerSupport mqttInbound() {
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(serverId,
                mqttClientFactory(), "too");
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        return adapter;
    }

}

这里订阅的主题可以指定,我订阅的是刚才发的too主题,还有订阅方的id 别和发送方的id 一样
重新启动项目,发送消息,会发现控制台已经打印出消息

clipboard.png

代表订阅方已经成功收到消息,同时

clipboard.png

也显示消息订阅方和记录,至此一个完整的消息发送和订阅完成,比较简单,但是一不留神很容易出现问题,希望能帮助到新入门的人

查看原文

赞 8 收藏 4 评论 4

认证与成就

  • 获得 13 次点赞
  • 获得 1 枚徽章 获得 0 枚金徽章, 获得 0 枚银徽章, 获得 1 枚铜徽章

擅长技能
编辑

开源项目 & 著作
编辑

(゚∀゚ )
暂时没有

注册于 2018-10-30
个人主页被 67 人浏览