Charles

Charles 查看完整档案

广州编辑  |  填写毕业院校  |  填写所在公司/组织填写个人主网站
编辑

14年入行,后端开发

个人动态

Charles 赞了文章 · 11月9日

非常值得一看的 Curl 用法指南

导读

curl 是常用的命令行工具,用来请求 Web 服务器。它的名字就是客户端(client)的 URL 工具的意思。

它的功能非常强大,命令行参数多达几十种。如果熟练的话,完全可以取代 Postman 这一类的图形界面工具。

本文介绍它的主要命令行参数,作为日常的参考,方便查阅。内容主要翻译自《curl cookbook》。为了节约篇幅,下面的例子不包括运行时的输出,初学者可以先看我以前写的《curl 初学者教程》。

不带有任何参数时,curl 就是发出 GET 请求。

$ curl https://www.example.com

上面命令向www.example.com发出 GET 请求,服务器返回的内容会在命令行输出。

  • -A参数指定客户端的用户代理标头,即User-Agent。curl 的默认用户代理字符串是curl/[version]。
$ curl -A 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.100 Safari/537.36' https://google.com

上面命令将User-Agent改成 Chrome 浏览器。

$ curl -A '' https://google.com

上面命令会移除User-Agent标头。

也可以通过-H参数直接指定标头,更改User-Agent。

$ curl -H 'User-Agent: php/1.0' https://google.com
  • -b参数用来向服务器发送 Cookie。
$ curl -b 'foo=bar' https://google.com

上面命令会生成一个标头Cookie: foo=bar,向服务器发送一个名为foo、值为bar的 Cookie。

$ curl -b 'foo1=bar' -b 'foo2=baz' https://google.com

上面命令发送两个 Cookie。

$ curl -b cookies.txt https://www.google.com

上面命令读取本地文件cookies.txt,里面是服务器设置的 Cookie(参见-c参数),将其发送到服务器。

  • -c参数将服务器设置的 Cookie 写入一个文件。
$ curl -c cookies.txt https://www.google.com

上面命令将服务器的 HTTP 回应所设置 Cookie 写入文本文件cookies.txt。

  • -d参数用于发送 POST 请求的数据体。
$ curl -d'login=emma&password=123'-X POST https://google.com/login
# 或者
$ curl -d 'login=emma' -d 'password=123' -X POST  https://google.com/login

使用-d参数以后,HTTP 请求会自动加上标头Content-Type : application/x-www-form-urlencoded。并且会自动将请求转为 POST 方法,因此可以省略-X POST。

  • -d参数可以读取本地文本文件的数据,向服务器发送。
$ curl -d '@data.txt' https://google.com/login

上面命令读取data.txt文件的内容,作为数据体向服务器发送。

  • --data-urlencode

--data-urlencode参数等同于-d,发送 POST 请求的数据体,区别在于会自动将发送的数据进行 URL 编码。

$ curl --data-urlencode 'comment=hello world' https://google.com/login

上面代码中,发送的数据hello world之间有一个空格,需要进行 URL 编码。

  • -e参数用来设置 HTTP 的标头Referer,表示请求的来源。
curl -e 'https://google.com?q=example' https://www.example.com

上面命令将Referer标头设为https://google.com?q=example

  • -H参数可以通过直接添加标头Referer,达到同样效果。
curl -H 'Referer: https://google.com?q=example' https://www.example.com
  • -F参数用来向服务器上传二进制文件。
$ curl -F 'file=@photo.png' https://google.com/profile

上面命令会给 HTTP 请求加上标头Content-Type: multipart/form-data,然后将文件photo.png作为file字段上传。

  • -F参数可以指定 MIME 类型。
$ curl -F 'file=@photo.png;type=image/png' https://google.com/profile

上面命令指定 MIME 类型为image/png,否则 curl 会把 MIME 类型设为application/octet-stream。

  • -F参数也可以指定文件名。
$ curl -F 'file=@photo.png;filename=me.png' https://google.com/profile

上面命令中,原始文件名为photo.png,但是服务器接收到的文件名为me.png。

  • -G参数用来构造 URL 的查询字符串。
$ curl -G -d 'q=kitties' -d 'count=20' https://google.com/search

上面命令会发出一个 GET 请求,实际请求的 URL 为https://google.com/search?q=k...。如果省略--G,会发出一个 POST 请求。

如果数据需要 URL 编码,可以结合--data--urlencode参数。

$ curl -G --data-urlencode 'comment=hello world' https://www.example.com
  • -H参数添加 HTTP 请求的标头。
$ curl -H 'Accept-Language: en-US' https://google.com

上面命令添加 HTTP 标头Accept-Language: en-US。

$ curl -H 'Accept-Language: en-US' -H 'Secret-Message: xyzzy' https://google.com

上面命令添加两个 HTTP 标头。

$ curl -d '{"login": "emma", "pass": "123"}' -H 'Content-Type: application/json' https://google.com/login

上面命令添加 HTTP 请求的标头是Content-Type: application/json,然后用-d参数发送 JSON 数据。

  • -i参数打印出服务器回应的 HTTP 标头。
$ curl -i https://www.example.com

上面命令收到服务器回应后,先输出服务器回应的标头,然后空一行,再输出网页的源码。

  • -I参数向服务器发出 HEAD 请求,然会将服务器返回的 HTTP 标头打印出来。
$ curl -I https://www.example.com

上面命令输出服务器对 HEAD 请求的回应。

  • --head参数等同于-I。
$ curl --head https://www.example.com
  • -k参数指定跳过 SSL 检测。
$ curl -k https://www.example.com

上面命令不会检查服务器的 SSL 证书是否正确。

  • -L参数会让 HTTP 请求跟随服务器的重定向。curl 默认不跟随重定向。
$ curl -L -d 'tweet=hi' https://api.twitter.com/tweet
  • --limit-rate用来限制 HTTP 请求和回应的带宽,模拟慢网速的环境。
$ curl --limit-rate 200k https://google.com

上面命令将带宽限制在每秒 200K 字节。

  • -o参数将服务器的回应保存成文件,等同于wget命令。
$ curl -o example.html https://www.example.com

上面命令将www.example.com保存成example.html。

  • -O参数将服务器回应保存成文件,并将 URL 的最后部分当作文件名。
$ curl -O https://www.example.com/foo/bar.html

上面命令将服务器回应保存成文件,文件名为bar.html。

  • -s参数将不输出错误和进度信息。
$ curl -s https://www.example.com

上面命令一旦发生错误,不会显示错误信息。不发生错误的话,会正常显示运行结果。

如果想让 curl 不产生任何输出,可以使用下面的命令。

$ curl -s -o /dev/null https://google.com
  • -S参数指定只输出错误信息,通常与-o一起使用。
$ curl -s -o /dev/null https://google.com

上面命令没有任何输出,除非发生错误。

  • -u参数用来设置服务器认证的用户名和密码。
$ curl -u 'bob:12345' https://google.com/login

上面命令设置用户名为bob,密码为12345,然后将其转为 HTTP 标头Authorization: Basic Ym9iOjEyMzQ1。

curl 能够识别 URL 里面的用户名和密码。

$ curl https://bob:12345@google.com/login

上面命令能够识别 URL 里面的用户名和密码,将其转为上个例子里面的 HTTP 标头。

$ curl -u 'bob' https://google.com/login

上面命令只设置了用户名,执行后,curl 会提示用户输入密码。

  • -v参数输出通信的整个过程,用于调试。
$ curl -v https://www.example.com
  • --trace参数也可以用于调试,还会输出原始的二进制数据。
$ curl --trace - https://www.example.com
  • -x参数指定 HTTP 请求的代理。
$ curl -x socks5://james:cats@myproxy.com:8080 https://www.example.com

上面命令指定 HTTP 请求通过myproxy.com:8080的 socks5 代理发出。

如果没有指定代理协议,默认为 HTTP。

$ curl -x james:cats@myproxy.com:8080 https://www.example.com

上面命令中,请求的代理使用 HTTP 协议。

  • -X参数指定 HTTP 请求的方法。
$ curl -X POST https://www.example.com

上面命令对https://www.example.com发出 POST 请求。

_作者:阮一峰
原文:http://www.ruanyifeng.com/blo...

image

查看原文

赞 16 收藏 13 评论 0

Charles 收藏了文章 · 11月5日

为什么 Go 占用那么多的虚拟内存?

前段时间,某同学说某服务的容器因为超出内存限制,不断地重启,问我们是不是有内存泄露,赶紧排查,然后解决掉,省的出问题。我们大为震惊,赶紧查看监控+报警系统和性能分析,发现应用指标压根就不高,不像有泄露的样子。

那么问题是出在哪里了呢,我们进入某个容器里查看了 top 的系统指标,结果如下:

PID       VSZ    RSS   ... COMMAND
67459     2007m  136m  ... ./eddycjy-server

从结果上来看,也没什么大开销的东西,主要就一个 Go 进程,一看,某同学就说 VSZ 那么高,而某云上的容器内存指标居然恰好和 VSZ 的值相接近,因此某同学就怀疑是不是 VSZ 所导致的,觉得存在一定的关联关系。

而从最终的结论上来讲,上述的表述是不全对的,那么在今天,本篇文章将主要围绕 Go 进程的 VSZ 来进行剖析,看看到底它为什么那么 "高",而在正式开始分析前,第一节为前置的补充知识,大家可按顺序阅读。

基础知识

什么是 VSZ

VSZ 是该进程所能使用的虚拟内存总大小,它包括进程可以访问的所有内存,其中包括了被换出的内存(Swap)、已分配但未使用的内存以及来自共享库的内存。

为什么要虚拟内存

在前面我们有了解到 VSZ 其实就是该进程的虚拟内存总大小,那如果我们想了解 VSZ 的话,那我们得先了解 “为什么要虚拟内存?”

本质上来讲,在一个系统中的进程是与其他进程共享 CPU 和主存资源的,而在现代的操作系统中,多进程的使用非常的常见,那么如果太多的进程需要太多的内存,那么在没有虚拟内存的情况下,物理内存很可能会不够用,就会导致其中有些任务无法运行,更甚至会出现一些很奇怪的现象,例如 “某一个进程不小心写了另一个进程使用的内存”,就会造成内存破坏,因此虚拟内存是非常重要的一个媒介。

虚拟内存包含了什么

image

而虚拟内存,又分为内核虚拟内存和进程虚拟内存,每一个进程的虚拟内存都是独立的, 呈现如上图所示。

这里也补充说明一下,在内核虚拟内存中,是包含了内核中的代码和数据结构,而内核虚拟内存中的某些区域会被映射到所有进程共享的物理页面中去,因此你会看到 ”内核虚拟内存“ 中实际上是包含了 ”物理内存“ 的,它们两者存在映射关系。而在应用场景上来讲,每个进程也会去共享内核的代码和全局数据结构,因此就会被映射到所有进程的物理页面中去。

image

虚拟内存的重要能力

为了更有效地管理内存并且减少出错,现代系统提供了一种对主存的抽象概念,也就是今天的主角,叫做虚拟内存(VM),虚拟内存是硬件异常、硬件地址翻译、主存、磁盘文件和内核软件交互的地方,它为每个进程提供了一个大的、一致的和私有的地址空间,虚拟内存提供了三个重要的能力:

  1. 它将主存看成是一个存储在磁盘上的地址空间的高速缓存,在主存中只保存活动区域,并根据需要在磁盘和主存之间来回传送数据,通过这种方式,它高效地使用了主存。
  2. 它为每个进程提供了一致的地址空间,从而简化了内存管理。
  3. 它保护了每个进程的地址空间不被其他进程破坏。

小结

上面发散的可能比较多,简单来讲,对于本文我们重点关注这些知识点,如下:

  • 虚拟内存它是有各式各样内存交互的地方,它包含的不仅仅是 "自己",而在本文中,我们只需要关注 VSZ,也就是进程虚拟内存,它包含了你的代码、数据、堆、栈段和共享库
  • 虚拟内存作为内存保护的工具,能够保证进程之间的内存空间独立,不受其他进程的影响,因此每一个进程的 VSZ 大小都不一样,互不影响。
  • 虚拟内存的存在,系统给各进程分配的内存之和是可以大于实际可用的物理内存的,因此你也会发现你进程的物理内存总是比虚拟内存低的多的多。

排查问题

在了解了基础知识后,我们正式开始排查问题,第一步我们先编写一个测试程序,看看没有什么业务逻辑的 Go 程序,它初始的 VSZ 是怎么样的。

测试

应用代码:

func main() {
    r := gin.Default()
    r.GET("/ping", func(c *gin.Context) {
        c.JSON(200, gin.H{
            "message": "pong",
        })
    })
    r.Run(":8001")
}

查看进程情况:

$ ps aux 67459
USER      PID  %CPU %MEM      VSZ    RSS   ...
eddycjy 67459   0.0  0.0  4297048    960   ...

从结果上来看,VSZ 为 4297048K,也就是 4G 左右,咋一眼看过去还是挺吓人的,明明没有什么业务逻辑,但是为什么那么高呢,真是令人感到好奇。

确认有没有泄露

在未知的情况下,我们可以首先看下 runtime.MemStatspprof,确定应用到底有没有泄露。不过我们这块是演示程序,什么业务逻辑都没有,因此可以确定和应用没有直接关系。

# runtime.MemStats
# Alloc = 1298568
# TotalAlloc = 1298568
# Sys = 71893240
# Lookups = 0
# Mallocs = 10013
# Frees = 834
# HeapAlloc = 1298568
# HeapSys = 66551808
# HeapIdle = 64012288
# HeapInuse = 2539520
# HeapReleased = 64012288
# HeapObjects = 9179
...

Go FAQ

接着我第一反应是去翻了 Go FAQ(因为看到过,有印象),其问题为 "Why does my Go process use so much virtual memory?",回答如下:

The Go memory allocator reserves a large region of virtual memory as an arena for allocations. This virtual memory is local to the specific Go process; the reservation does not deprive other processes of memory.

To find the amount of actual memory allocated to a Go process, use the Unix top command and consult the RES (Linux) or RSIZE (macOS) columns.

这个 FAQ 是在 2012 年 10 月 提交 的,这么多年了也没有更进一步的说明,再翻了 issues 和 forum,一些关闭掉的 issue 都指向了 FAQ,这显然无法满足我的求知欲,因此我继续往下探索,看看里面到底都摆了些什么。

查看内存映射

在上图中,我们有提到进程虚拟内存,主要包含了你的代码、数据、堆、栈段和共享库,那初步怀疑是不是进程做了什么内存映射,导致了大量的内存空间被保留呢,为了确定这一点,我们通过如下命令去排查:

$ vmmap --wide 67459
...
==== Non-writable regions for process 67459
REGION TYPE                      START - END             [ VSIZE  RSDNT  DIRTY   SWAP] PRT/MAX SHRMOD PURGE    REGION DETAIL
__TEXT                 00000001065ff000-000000010667b000 [  496K   492K     0K     0K] r-x/rwx SM=COW          /bin/zsh
__LINKEDIT             0000000106687000-0000000106699000 [   72K    44K     0K     0K] r--/rwx SM=COW          /bin/zsh
MALLOC metadata        000000010669b000-000000010669c000 [    4K     4K     4K     0K] r--/rwx SM=COW          DefaultMallocZone_0x10669b000 zone structure
...
__TEXT                 00007fff76c31000-00007fff76c5f000 [  184K   168K     0K     0K] r-x/r-x SM=COW          /usr/lib/system/libxpc.dylib
__LINKEDIT             00007fffe7232000-00007ffff32cb000 [192.6M  17.4M     0K     0K] r--/r-- SM=COW          dyld shared cache combined __LINKEDIT
...        

==== Writable regions for process 67459
REGION TYPE                      START - END             [ VSIZE  RSDNT  DIRTY   SWAP] PRT/MAX SHRMOD PURGE    REGION DETAIL
__DATA                 000000010667b000-0000000106682000 [   28K    28K    28K     0K] rw-/rwx SM=COW          /bin/zsh
...   
__DATA                 0000000106716000-000000010671e000 [   32K    28K    28K     4K] rw-/rwx SM=COW          /usr/lib/zsh/5.3/zsh/zle.so
__DATA                 000000010671e000-000000010671f000 [    4K     4K     4K     0K] rw-/rwx SM=COW          /usr/lib/zsh/5.3/zsh/zle.so
__DATA                 0000000106745000-0000000106747000 [    8K     8K     8K     0K] rw-/rwx SM=COW          /usr/lib/zsh/5.3/zsh/complete.so
__DATA                 000000010675a000-000000010675b000 [    4K     4K     4K     0K] rw-
...

这块主要是利用 macOS 的 vmmap 命令去查看内存映射情况,这样就可以知道这个进程的内存映射情况,从输出分析来看,这些关联共享库占用的空间并不大,导致 VSZ 过高的根本原因不在共享库和二进制文件上,但是并没有发现大量保留内存空间的行为,这是一个问题点

注:若是 Linux 系统,可使用 cat /proc/PID/mapscat /proc/PID/smaps 查看。

查看系统调用

既然在内存映射中,我们没有明确的看到保留内存空间的行为,那我们接下来看看该进程的系统调用,确定一下它是否存在内存操作的行为,如下:

$ sudo dtruss -a ./awesomeProject
...
 4374/0x206a2:     15620       6      3 mprotect(0x1BC4000, 0x1000, 0x0)         = 0 0
...
 4374/0x206a2:     15781       9      4 sysctl([CTL_HW, 3, 0, 0, 0, 0] (2), 0x7FFEEFBFFA64, 0x7FFEEFBFFA68, 0x0, 0x0)         = 0 0
 4374/0x206a2:     15783       3      1 sysctl([CTL_HW, 7, 0, 0, 0, 0] (2), 0x7FFEEFBFFA64, 0x7FFEEFBFFA68, 0x0, 0x0)         = 0 0
 4374/0x206a2:     15899       7      2 mmap(0x0, 0x40000, 0x3, 0x1002, 0xFFFFFFFFFFFFFFFF, 0x0)         = 0x4000000 0
 4374/0x206a2:     15930       3      1 mmap(0xC000000000, 0x4000000, 0x0, 0x1002, 0xFFFFFFFFFFFFFFFF, 0x0)         = 0xC000000000 0
 4374/0x206a2:     15934       4      2 mmap(0xC000000000, 0x4000000, 0x3, 0x1012, 0xFFFFFFFFFFFFFFFF, 0x0)         = 0xC000000000 0
 4374/0x206a2:     15936       2      0 mmap(0x0, 0x2000000, 0x3, 0x1002, 0xFFFFFFFFFFFFFFFF, 0x0)         = 0x59B7000 0
 4374/0x206a2:     15942       2      0 mmap(0x0, 0x210800, 0x3, 0x1002, 0xFFFFFFFFFFFFFFFF, 0x0)         = 0x4040000 0
 4374/0x206a2:     15947       2      0 mmap(0x0, 0x10000, 0x3, 0x1002, 0xFFFFFFFFFFFFFFFF, 0x0)         = 0x1BD0000 0
 4374/0x206a2:     15993       3      0 madvise(0xC000000000, 0x2000, 0x8)         = 0 0
 4374/0x206a2:     16004       2      0 mmap(0x0, 0x10000, 0x3, 0x1002, 0xFFFFFFFFFFFFFFFF, 0x0)         = 0x1BE0000 0
...

在这小节中,我们通过 macOS 的 dtruss 命令监听并查看了运行这个程序所进行的所有系统调用,发现了与内存管理有一定关系的方法如下:

  • mmap:创建一个新的虚拟内存区域,但这里需要注意,就是当系统调用 mmap 时,它只是从虚拟内存中申请了一段空间出来,并不会去分配和映射真实的物理内存,而当你访问这段空间的时候,才会在当前时间真正的去分配物理内存。那么对应到我们实际应用的进程中,那就是 VSZ 的增长后,而该内存空间又未正式使用的话,物理内存是不会有增长的。
  • madvise:提供有关使用内存的建议,例如:MADV_NORMAL、MADV_RANDOM、MADV_SEQUENTIAL、MADV_WILLNEED、MADV_DONTNEED 等等。
  • mprotect:设置内存区域的保护情况,例如:PROT_NONE、PROT_READ、PROT_WRITE、PROT_EXEC、PROT_SEM、PROT_SAO、PROT_GROWSUP、PROT_GROWSDOWN 等等。
  • sysctl:在内核运行时动态地修改内核的运行参数。

在此比较可疑的是 mmap 方法,它在 dtruss 的最终统计中一共调用了 10 余次,我们可以相信它在 Go Runtime 的时候进行了大量的虚拟内存申请,我们再接着往下看,看看到底是在什么阶段进行了虚拟内存空间的申请。

注:若是 Linux 系统,可使用 strace 命令。

查看 Go Runtime

启动流程

通过上述的分析,我们可以知道在 Go 程序启动的时候 VSZ 就已经不低了,并且确定不是共享库等的原因,且程序在启动时系统调用确实存在 mmap 等方法的调用,那么我们可以充分怀疑 Go 在初始化阶段就保留了该内存空间。那我们第一步要做的就是查看一下 Go 的引导启动流程,看看是在哪里申请的,引导过程如下:

graph TD
A(rt0_darwin_amd64.s:8<br/>_rt0_amd64_darwin) -->|JMP| B(asm_amd64.s:15<br/>_rt0_amd64)
B --> |JMP|C(asm_amd64.s:87<br/>runtime-rt0_go)
C --> D(runtime1.go:60<br/>runtime-args)
D --> E(os_darwin.go:50<br/>runtime-osinit)
E --> F(proc.go:472<br/>runtime-schedinit)
F --> G(proc.go:3236<br/>runtime-newproc)
G --> H(proc.go:1170<br/>runtime-mstart)
H --> I(在新创建的 p 和 m 上运行 runtime-main)
  • runtime-osinit:获取 CPU 核心数。
  • runtime-schedinit:初始化程序运行环境(包括栈、内存分配器、垃圾回收、P等)。
  • runtime-newproc:创建一个新的 G 和 绑定 runtime.main。
  • runtime-mstart:启动线程 M。

注:来自@曹大的 《Go 程序的启动流程》和@全成的 《Go 程序是怎样跑起来的》,推荐大家阅读。

初始化运行环境

显然,我们要研究的是 runtime 里的 schedinit 方法,如下:

func schedinit() {
    ...
    stackinit()
    mallocinit()
    mcommoninit(_g_.m)
    cpuinit()       // must run before alginit
    alginit()       // maps must not be used before this call
    modulesinit()   // provides activeModules
    typelinksinit() // uses maps, activeModules
    itabsinit()     // uses activeModules

    msigsave(_g_.m)
    initSigmask = _g_.m.sigmask

    goargs()
    goenvs()
    parsedebugvars()
    gcinit()
  ...
}

从用途来看,非常明显, mallocinit 方法会进行内存分配器的初始化,我们继续往下看。

初始化内存分配器

mallocinit

接下来我们正式的分析一下 mallocinit 方法,在引导流程中, mallocinit 主要承担 Go 程序的内存分配器的初始化动作,而今天主要是针对虚拟内存地址这块进行拆解,如下:

func mallocinit() {
    ...
    if sys.PtrSize == 8 {
        for i := 0x7f; i >= 0; i-- {
            var p uintptr
            switch {
            case GOARCH == "arm64" && GOOS == "darwin":
                p = uintptr(i)<<40 | uintptrMask&(0x0013<<28)
            case GOARCH == "arm64":
                p = uintptr(i)<<40 | uintptrMask&(0x0040<<32)
            case GOOS == "aix":
                if i == 0 {
                    continue
                }
                p = uintptr(i)<<40 | uintptrMask&(0xa0<<52)
            case raceenabled:
                ...
            default:
                p = uintptr(i)<<40 | uintptrMask&(0x00c0<<32)
            }
            hint := (*arenaHint)(mheap_.arenaHintAlloc.alloc())
            hint.addr = p
            hint.next, mheap_.arenaHints = mheap_.arenaHints, hint
        }
    } else {
      ...
    }
}
  • 判断当前是 64 位还是 32 位的系统。
  • 从 0x7fc000000000~0x1c000000000 开始设置保留地址。
  • 判断当前 GOARCHGOOS 或是否开启了竞态检查,根据不同的情况申请不同大小的连续内存地址,而这里的 p 是即将要要申请的连续内存地址的开始地址。
  • 保存刚刚计算的 arena 的信息到 arenaHint 中。

可能会有小伙伴问,为什么要判断是 32 位还是 64 位的系统,这是因为不同位数的虚拟内存的寻址范围是不同的,因此要进行区分,否则会出现高位的虚拟内存映射问题。而在申请保留空间时,我们会经常提到 arenaHint 结构体,它是 arenaHints 链表里的一个节点,结构如下:

type arenaHint struct {
    addr uintptr
    down bool
    next *arenaHint
}
  • addr:arena 的起始地址
  • down:是否最后一个 arena
  • next:下一个 arenaHint 的指针地址

那么这里疯狂提到的 arena 又是什么东西呢,这其实是 Go 的内存管理中的概念,Go Runtime 会把申请的虚拟内存分为三个大块,如下:

image

  • spans:记录 arena 区域页号和 mspan 的映射关系。
  • bitmap:标识 arena 的使用情况,在功能上来讲,会用于标识 arena 的哪些空间地址已经保存了对象。
  • arean:arean 其实就是 Go 的堆区,是由 mheap 进行管理的,它的 MaxMem 是 512GB-1。而在功能上来讲,Go 会在初始化的时候申请一段连续的虚拟内存空间地址到 arean 保留下来,在真正需要申请堆上的空间时再从 arean 中取出来处理,这时候就会转变为物理内存了。

在这里的话,你需要理解 arean 区域在 Go 内存里的作用就可以了。

mmap

我们刚刚通过上述的分析,已经知道 mallocinit 的用途了,但是你可能还是会有疑惑,就是我们之前所看到的 mmap 系统调用,和它又有什么关系呢,怎么就关联到一起了,接下来我们先一起来看看更下层的代码,如下:

func sysAlloc(n uintptr, sysStat *uint64) unsafe.Pointer {
    p, err := mmap(nil, n, _PROT_READ|_PROT_WRITE, _MAP_ANON|_MAP_PRIVATE, -1, 0)
    ...
    mSysStatInc(sysStat, n)
    return p
}

func sysReserve(v unsafe.Pointer, n uintptr) unsafe.Pointer {
    p, err := mmap(v, n, _PROT_NONE, _MAP_ANON|_MAP_PRIVATE, -1, 0)
    ...
}

func sysMap(v unsafe.Pointer, n uintptr, sysStat *uint64) {
    ...
    munmap(v, n)
    p, err := mmap(v, n, _PROT_READ|_PROT_WRITE, _MAP_ANON|_MAP_FIXED|_MAP_PRIVATE, -1, 0)
  ...
}

在 Go Runtime 中存在着一系列的系统级内存调用方法,本文涉及的主要如下:

  • sysAlloc:从 OS 系统上申请清零后的内存空间,调用参数是 _PROT_READ|_PROT_WRITE, _MAP_ANON|_MAP_PRIVATE,得到的结果需进行内存对齐。
  • sysReserve:从 OS 系统中保留内存的地址空间,这时候还没有分配物理内存,调用参数是 _PROT_NONE, _MAP_ANON|_MAP_PRIVATE,得到的结果需进行内存对齐。
  • sysMap:通知 OS 系统我们要使用已经保留了的内存空间,调用参数是 _PROT_READ|_PROT_WRITE, _MAP_ANON|_MAP_FIXED|_MAP_PRIVATE

看上去好像很有道理的样子,但是 mallocinit 方法在初始化时,到底是在哪里涉及了 mmap 方法呢,表面看不出来,如下:

for i := 0x7f; i >= 0; i-- {
    ...
    hint := (*arenaHint)(mheap_.arenaHintAlloc.alloc())
    hint.addr = p
    hint.next, mheap_.arenaHints = mheap_.arenaHints, hint
}

实际上在调用 mheap_.arenaHintAlloc.alloc() 时,调用的是 mheap 下的 sysAlloc 方法,而 sysAlloc 又会与 mmap 方法产生调用关系,并且这个方法与常规的 sysAlloc 还不大一样,如下:

var mheap_ mheap
...
func (h *mheap) sysAlloc(n uintptr) (v unsafe.Pointer, size uintptr) {
    ...
    for h.arenaHints != nil {
        hint := h.arenaHints
        p := hint.addr
        if hint.down {
            p -= n
        }
        if p+n < p {
            v = nil
        } else if arenaIndex(p+n-1) >= 1<<arenaBits {
            v = nil
        } else {
            v = sysReserve(unsafe.Pointer(p), n)
        }
        ...
}

你可以惊喜的发现 mheap.sysAlloc 里其实有调用 sysReserve 方法,而 sysReserve 方法又正正是从 OS 系统中保留内存的地址空间的特定方法,是不是很惊喜,一切似乎都串起来了。

小结

在本节中,我们先写了一个测试程序,然后根据非常规的排查思路进行了一步步的跟踪怀疑,整体流程如下:

  • 通过 topps 等命令,查看进程运行情况,分析基础指标。
  • 通过 pprofruntime.MemStats 等工具链查看应用运行情况,分析应用层面是否有泄露或者哪儿高。
  • 通过 vmmap 命令,查看进程的内存映射情况,分析是不是进程虚拟空间内的某个区域比较高,例如:共享库等。
  • 通过 dtruss 命令,查看程序的系统调用情况,分析可能出现的一些特殊行为,例如:在分析中我们发现 mmap 方法调用的比例是比较高的,那我们有充分的理由怀疑 Go 在启动时就进行了大量的内存空间保留。
  • 通过上述的分析,确定可能是在哪个环节申请了那么多的内存空间后,再到 Go Runtime 中去做进一步的源码分析,因为源码面前,了无秘密,没必要靠猜。

从结论上而言,VSZ(进程虚拟内存大小)与共享库等没有太大的关系,主要与 Go Runtime 存在直接关联,也就是在前图中表示的运行时堆(malloc)。转换到 Go Runtime 里,就是在 mallocinit 这个内存分配器的初始化阶段里进行了一定量的虚拟空间的保留。

而保留虚拟内存空间时,受什么影响,又是一个哲学问题。从源码上来看,主要如下:

  • 受不同的 OS 系统架构(GOARCH/GOOS)和位数(32/64 位)的影响。
  • 受内存对齐的影响,计算回来的内存空间大小是需要经过对齐才会进行保留。

总结

我们通过一步步地分析,讲解了 Go 会在哪里,又会受什么因素,去调用了什么方法保留了那么多的虚拟内存空间,但是我们肯定会忧心进程虚拟内存(VSZ)高,会不会存在问题呢,我分析如下:

  • VSZ 并不意味着你真正使用了那些物理内存,因此是不需要担心的。
  • VSZ 并不会给 GC 带来压力,GC 管理的是进程实际使用的物理内存,而 VSZ 在你实际使用它之前,它并没有过多的代价。
  • VSZ 基本都是不可访问的内存映射,也就是它并没有内存的访问权限(不允许读、写和执行)。

思考

看到这里舒一口气,因为 Go VSZ 的高,并不会对我们产生什么非常实质性的问题,但是又仔细一想,为什么 Go 要申请那么多的虚拟内存呢?

总体考虑如下:

  • Go 的设计是考虑到 arenabitmap 的后续使用,先提早保留了整个内存地址空间。
  • 随着 Go Runtime 和应用的逐步使用,肯定也会开始实际的申请和使用内存,这时候 arenabitmap 的内存分配器就只需要将事先申请好的内存地址空间保留更改为实际可用的物理内存就好了,这样子可以极大的提高效能。

我的公众号

分享 Go 语言、微服务架构和奇怪的系统设计,欢迎大家关注我的公众号和我进行交流和沟通。

最好的关系是互相成就,各位的点赞就是煎鱼创作的最大动力,感谢支持。

查看原文

Charles 收藏了文章 · 11月5日

Redis还可以做哪些事?

上一篇文章中,讲到了redis五大基本数据类型的使用场景,除了string,hash,list,set,zset之外,redis还提供了一些其他的数据结构(当然,严格意义上也不算数据结构),一起来看看redis还可以做哪些事?

一 Bitmaps

在计算机中,使用二进制做为信息的基础单元,也就是输入的任何信息,最终在计算机底层都会转会为一串二进制的数字。在redis中,提供了Bitmaps来进行位操作。我们可以把Bitmaps想象成一个以位为单位的数组,数组的下标叫做偏移量。使用Bitmaps的优势就是占用空间更少。

假如我们想记录员工今天是否登录过公司官网,我们可以日期做为key,员工id做为偏移量(这里员工id在数据库中是自增的),如果id是从1000开始,为了节省空间,一般会将员工id减去这个初始值来做为偏移量,偏移量一般从0开始。是否访问官网用01来表示。

这样的话,id为3的员工访问了官网,就将他的值写成1

# id为3的员工访问了官网
setbit user:2020-11-04 3 1
# id为18的员工访问了官网
setbit user:2020-11-04 18 1

查看某个员工是否访问过官网

getbit user:2020-11-04 1

查询指定范围(字节)内值为1的个数,比如我想查看id从1-30之间有多少员工访问了官网

bitcount user:2020-11-04 0 3

二 HyperLogLog

HyperLogLog可以利用极小的内存空间完成数据统计,无法获取单条数据,只能做为统计使用,会有一定的误差率。

假如我想统计访问官网的IP地址

添加官网今天访问的ip列表

# 2020-11-04访问的ip
pfadd 2020-11-04:ip "ip1" "ip2" "ip3" 
# 2020-11-05访问的ip
pfadd 2020-11-05:ip "ip3" "ip4" "ip5" 

计算今天官网访问的ip数

pfcount 2020-11-04:ip

返回结果为3

查看2020-11-04和2020-11-05这两天总共有多少个独立ip访问过网站

先将两天的数据做并集,并复制给某个值

pfmerge 2020-11:ip 2020-11-04:ip 2020-11-05:ip

然后使用pfcount命令查询,获得的值为5

pfcount 2020-11:ip

三 GEO

Redis3.2版本中增加了GEO(地理位置定位)功能,可以使用此功能来获取附近的人。

添加命令如下,可批量添加

geoadd city longitud latitude member

我们添加几个城市的位置信息,来获取某个城市附近的城市

geoadd city 116.28 39.55 beijing 117.12 39.08 tianjin

获取北京的经纬度命令如下

geopos city beijing

查看beijing和tianjin两座城市的距离

geodist city beijing tianjin km

最后面的km表示距离单位是公里,支持的单位有以下几个:

  • m,米
  • km,千米
  • mi,英里
  • ft,尺

获取附近的位置有两个命令,georadius根据经纬度获取,georadiusbymember根据成员获取

georadius key longitude laitude [单位]
georadiusbymember key member [单位]

后面还可以跟非必须参数,参数分别如下

  • withcoord:返回结果中包含经纬度
  • withdist:返回结果中包含距离中心位置的距离
  • withhash:返回结果中包含geohash(就是将经纬度转换为hash值)
  • COUNT count:指定返回结果的数量
  • asc|desc:返回结果按距离中心位置的距离排序
  • store key:将返回结果的地理位置信息保存到指定key中
  • storedist key:将返回结果距离中心位置的距离保存到指定key中

四 发布订阅模式消息

上一篇文章中讲到了可以使用list和zset来实现消息队列,但是上面实现的消息队列是点对点模式,也就是一条消息只能由一个消费者来消费。除此之外,redis还支持发布订阅模式,即一个消息由所有订阅者消费,比如广播、公告等等,发布一条公告后,所有关注了我的用户都可以收到这条公告。

  1. 发布消息

发布到信道channel:message一条消息,消息内容为hi

pulish channel:message hi
  1. 订阅信道

订阅者可以订阅一个或多个信道,比如订阅channel:message

subscribe channel:message
  1. 取消订阅
unsubscribe channel:message
  1. 查看活跃信道
pubsub channels
  1. 查看订阅数

查看信道channel:message订阅个数

pubsub numsub channel:message

redis的发布订阅模式和专业的消息中间件相比,略显粗糙,但是实现起来非常简单,学习成本较低。

五 Bloom Filter

布隆过滤器是redis4版本中新增的一个功能。其实现原理和Bitmaps差不多,也是利用一个位数组,将你的值经过多个hash函数,得到对应的位数组的位置,将这些值设置为1。布隆过滤器经常别用来防止缓存穿透。

存在的问题,如果说某个元素不存在,则一定不存在,如果说某个元素存在,则可能不存在。这是因为如果有三个元素abc要放入同一个数组中去,假设a经过三次hash,得到1,5,7三个位置,那么就会将这三个位置修改成1b经过三次hash,得到2,4,6三个位置,将这三个位置修改成1c经过三次hash得到2,5,7三个位置,但是经过前两个元素hash后,这三个位置已经修改成1了,那么我们能说c一定存在吗?显然不能!


点关注、不迷路

如果觉得文章不错,欢迎关注点赞收藏,你们的支持是我创作的动力,感谢大家。

如果文章写的有问题,请不要吝惜文笔,欢迎留言指出,我会及时核查修改。

如果你还想更加深入的了解我,可以微信搜索「Java旅途」进行关注。回复「1024」即可获得学习视频及精美电子书。每天7:30准时推送技术文章,让你的上班路不在孤独,而且每月还有送书活动,助你提升硬实力!

查看原文

Charles 收藏了文章 · 11月5日

一文彻底了解 CDN 加速原理

一、什么是 CDN

CDN 的全称是(Content Delivery Network),即内容分发网络。其目的是通过在现有的Internet中增加一层新的CACHE(缓存)层,将网站的内容发布到最接近用户的网络”边缘“的节点,使用户可以就近取得所需的内容,提高用户访问网站的响应速度。从技术上全面解决由于网络带宽小、用户访问量大、网点分布不均等原因,提高用户访问网站的响应速度。

简单的说,CDN 的工作原理就是将您源站的资源缓存到位于全球各地的 CDN 节点上,用户请求资源时,就近返回节点上缓存的资源,而不需要每个用户的请求都回您的源站获取,避免网络拥塞、缓解源站压力,保证用户访问资源的速度和体验。

CDN 对网络的优化作用主要体现在如下几个方面

  • 解决服务器端的“第一公里”问题
  • 缓解甚至消除了不同运营商之间互联的瓶颈造成的影响
  • 减轻了各省的出口带宽压力
  • 缓解了骨干网的压力
  • 优化了网上热点内容的分布

二、CDN工作原理

传统访问过程由上图可见,用户访问未使用CDN缓存网站的过程为:

  • 用户输入访问的域名,操作系统向 LocalDns 查询域名的 ip 地址
  • LocalDns向 ROOT DNS 查询域名的授权服务器(这里假设LocalDns缓存过期)
  • ROOT DNS将域名授权 dns记录回应给 LocalDns
  • LocalDns 得到域名的授权 dns 记录后,继续向域名授权 dns 查询域名的 ip 地址
  • 域名授权 dns 查询域名记录后,回应给 LocalDns
  • LocalDns 将得到的域名ip地址,回应给 用户端
  • 用户得到域名 ip 地址后,访问站点服务器
  • 站点服务器应答请求,将内容返回给客户端

CDN 访问过程

通过上图,我们可以了解到,使用了CDN缓存后的网站的访问过程变为:

  • 用户输入访问的域名,操作系统向 LocalDns 查询域名的ip地址.
  • LocalDns向 ROOT DNS 查询域名的授权服务器(这里假设LocalDns缓存过期)
  • ROOT DNS将域名授权dns记录回应给 LocalDns
  • LocalDns得到域名的授权dns记录后,继续向域名授权dns查询域名的ip地址
  • 域名授权dns 查询域名记录后(一般是CNAME),回应给 LocalDns
  • LocalDns 得到域名记录后,向智能调度DNS查询域名的ip地址
  • 智能调度DNS 根据一定的算法和策略(比如静态拓扑,容量等),将最适合的CDN节点ip地址回应给 LocalDns
  • L-ocalDns 将得到的域名ip地址,回应给 用户端
  • 用户得到域名ip地址后,访问站点服务器
  • CDN 节点服务器应答请求,将内容返回给客户端。(缓存服务器一方面在本地进行保存,以备以后使用,二方面把获取的数据返回给客户端,完成数据服务过程)

通过以上的分析我们可以得到,为了实现对普通用户透明(使用缓存后用户客户端无需进行任何设置)访问,需要使用 DNS(域名解析)来引导用户来访问 Cache 服务器,以实现透明的加速服务。由于用户访问网站的第一步就是域名解析,所以通过修改dns来引导用户访问是最简单有效的方式。

CDN网络的组成要素

对于普通的 Internet 用户,每个 CDN 节点就相当于一个放置在它周围的网站服务器。 通过对 DNS 的接管,用户的请求被透明地指向离他最近的节点,节点中 CDN 服务器会像网站的原始服务器一样,响应用户的请求。 由于它离用户更近,因而响应时间必然更快。 从上面图中虚线圈起来的那块,就是 CDN 层,这层是位于用户端和站点服务器之间。

智能调度 DNS(比如 f5 的 3DNS)

  • 智能调度DNS是CDN服务中的关键系统.当用户访问加入CDN服务的网站时,域名解析请求将最终由 “智能调度DNS”负责处理。
  • 它通过一组预先定义好的策略,将当时最接近用户的节点地址提供给用户,使用户可以得到快速的服务。
  • 同时它需要与分布在各地的CDN节点保持通信,跟踪各节点的健康状态、容量等信息,确保将用户的请求分配到就近可用的节点上.

缓存功能服务

  • 负载均衡设备(如lvs,F5的BIG/IP)
  • 内容Cache服务器(如squid)
  • 共享存储

三、名词解释

CNAME记录(CNAME record)

CNAME即别名( Canonical Name );可以用来把一个域名解析到另一个域名,当 DNS 系统在查询 CNAME 左面的名称的时候,都会转向 CNAME 右面的名称再进行查询,一直追踪到最后的 PTR 或 A 名称,成功查询后才会做出回应,否则失败。

例如,你有一台服务器上存放了很多资料,你使用docs.example.com去访问这些资源,但又希望通过documents.example.com也能访问到这些资源,那么你就可以在您的DNS解析服务商添加一条CNAME记录,将documents.example.com指向docs.example.com,添加该条CNAME记录后,所有访问documents.example.com的请求都会被转到docs.example.com,获得相同的内容。

CNAME域名

接入CDN时,在CDN提供商控制台添加完加速域名后,您会得到一个CDN给您分配的CNAME域名, 您需要在您的DNS解析服务商添加CNAME记录,将自己的加速域名指向这个CNAME域名,这样该域名所有的请求才会都将转向CDN的节点,达到加速效果。

DNS

DNS 即 Domain Name System,是域名解析服务的意思。它在互联网的作用是:把域名转换成为网络可以识别的 IP 地址。人们习惯记忆域名,但机器间互相只认IP地址,域名与IP地址之间是一一对应的,它们之间的转换工作称为域名解析,域名解析需要由专门的域名解析服务器来完成,整个过程是自动进行的。比如:上网时输入的www.baidu.com 会自动转换成为 220.181.112.143。

常见的DNS解析服务商有:阿里云解析,万网解析,DNSPod,新网解析,Route53(AWS),Dyn,Cloudflare等。

回源 host

回源host:回源 host 决定回源请求访问到源站上的具体某个站点。

  • 例子1:源站是域名源站为www.a.com,回源host为www.b.com,那么实际回源是请求到`www.a.com解析到的IP,对应的主机上的站点www.b.com
  • 例子2:源站是IP源站为1.1.1.1, 回源host为www.b.com,那么实际回源的是1.1.1.1对应的主机上的站点www.b.com

协议回源 指回源时使用的协议和客户端访问资源时的协议保持一致,即如果客户端使用 HTTPS 方式请求资源,当CDN节点上未缓存该资源时,节点会使用相同的 HTTPS 方式回源获取资源;同理如果客户端使用 HTTP 协议的请求,CDN节点回源时也使用HTTP协议。

出处:https://www.jianshu.com/p/1da... 作者:Kandy

image

查看原文

Charles 关注了标签 · 10月10日

关注 84672

Charles 关注了标签 · 10月10日

并发编程

并发编程又称多线程编程,用于提高资源利用率

关注 10

Charles 关注了标签 · 10月10日

golang

Go语言是谷歌2009发布的第二款开源编程语言。Go语言专门针对多处理器系统应用程序的编程进行了优化,使用Go编译的程序可以媲美C或C++代码的速度,而且更加安全、支持并行进程。
Go语言是谷歌推出的一种全新的编程语言,可以在不损失应用程序性能的情况下降低代码的复杂性。谷歌首席软件工程师罗布派克(Rob Pike)说:我们之所以开发Go,是因为过去10多年间软件开发的难度令人沮丧。Go是谷歌2009发布的第二款编程语言。

七牛云存储CEO许式伟出版《Go语言编程
go语言翻译项目 http://code.google.com/p/gola...
《go编程导读》 http://code.google.com/p/ac-m...
golang的官方文档 http://golang.org/doc/docs.html
golang windows上安装 http://code.google.com/p/gomi...

关注 25995

Charles 赞了文章 · 10月10日

Go并发编程之传统同步—(2)条件变量

前言

回顾上篇文章《Go并发编程之传统同步—(1)互斥锁》其中说到,同步最终是为了达到以下两种目的:

  • 维持共享数据一致性,并发安全
  • 控制流程管理,更好的协同工作

示例程序通过使用互斥锁,达到了数据一致性目的,那么流程管理应该怎么做呢?

传统同步

条件变量

上篇文章的示例程序,仅仅实现了累加功能,但在现实的工作场景中,需求往往不可能这么简单,现在扩展一下这个程序,给它加上累减的功能。

加上了累减的示例程序,可以抽象的理解为一个固定容量的“储水池”,可以注水、排水。

仅用互斥锁

当水注满以后,停止注水,开始排水,当水排空以后,开始注水,反反复复...

func TestDemo1(t *testing.T) {
    var mut sync.Mutex
    maxSize := 10
    counter := 0

    // 排水口
    go func() {
        for {
            mut.Lock()
            if counter == maxSize {
                for i := 0; i < maxSize; i++ {
                    counter--
                    log.Printf("OUTPUT counter = %d", counter)
                }
            }
            mut.Unlock()
            time.Sleep(1 * time.Second)
        }
    }()

    // 注水口
    for {
        mut.Lock()
        if counter == 0 {
            for i := 0; i < maxSize; i++ {
                counter++
                log.Printf(" INPUT counter = %d", counter)
            }
        }
        mut.Unlock()
        time.Sleep(1 * time.Second)
    }
}

结果

=== RUN   TestDemo1
                ···
2020/10/06 13:52:50  INPUT counter = 8
2020/10/06 13:52:50  INPUT counter = 9
2020/10/06 13:52:50  INPUT counter = 10
2020/10/06 13:52:50 OUTPUT counter = 9
2020/10/06 13:52:50 OUTPUT counter = 8
2020/10/06 13:52:50 OUTPUT counter = 7
                ···

看着没有什么问题,一切正常,但就是这样工作的策略效率太低。

优化互斥锁

优化策略,不用等注满水再排水,也不用放空之后,再注水,注水口和排水口一起工作。

func TestDemo2(t *testing.T) {
    var mut sync.Mutex
    maxSize := 10
    counter := 0

    // 排水口
    go func() {
        for {
            mut.Lock()
            if counter != 0 {
                counter--
            }
            log.Printf("OUTPUT counter = %d", counter)
            mut.Unlock()
            time.Sleep(5 * time.Second) // 为了演示效果,睡眠5秒
        }
    }()

    // 注水口
    for {
        mut.Lock()
        if counter != maxSize {
            counter++
        }
        log.Printf(" INPUT counter = %d", counter)
        mut.Unlock()
        time.Sleep(1 * time.Second) // 为了演示效果,睡眠1秒
    }
}

结果

=== RUN   TestDemo2
                ···
2020/10/06 14:11:46  INPUT counter = 7
2020/10/06 14:11:47  INPUT counter = 8
2020/10/06 14:11:48 OUTPUT counter = 7
2020/10/06 14:11:48  INPUT counter = 8
2020/10/06 14:11:49  INPUT counter = 9
2020/10/06 14:11:50  INPUT counter = 10
2020/10/06 14:11:51  INPUT counter = 10
2020/10/06 14:11:52  INPUT counter = 10
2020/10/06 14:11:53 OUTPUT counter = 9
2020/10/06 14:11:53  INPUT counter = 10
2020/10/06 14:11:54  INPUT counter = 10
2020/10/06 14:11:55  INPUT counter = 10
2020/10/06 14:11:56  INPUT counter = 10
2020/10/06 14:11:57  INPUT counter = 10
2020/10/06 14:11:58 OUTPUT counter = 9
2020/10/06 14:11:58  INPUT counter = 10
2020/10/06 14:11:59  INPUT counter = 10
                ···

通过日志输出,可以看到程序达到了需求,运作正常。

但是,通过日志输出发现,当排水口效率低下的时候,注水口一直在轮询,这里频繁的上锁操作造成的开销很是浪费。

条件变量:单发通知

那有没有什么好的办法,省去不必要的轮询?如果注水口和排水口能互相“通知”就好了!这个功能,条件变量可以做到。

条件变量总是与互斥锁组合使用,除了可以使用 Lock、Unlock,还有如下三个方法:

  • Wait 等待通知
  • Signal 单发通知
  • Broadcast 广播通知
func TestDemo3(t *testing.T) {
    cond := sync.NewCond(new(sync.Mutex)) // 初始化条件变量
    maxSize := 10
    counter := 0

    // 排水口
    go func() {
        for {
            cond.L.Lock() // 上锁
            if counter == 0 { // 没水了
                cond.Wait() // 啥时候来水?等通知!
            }
            counter--
            log.Printf("OUTPUT counter = %d", counter)
            cond.Signal() // 单发通知:已排水
            cond.L.Unlock() // 解锁
            time.Sleep(5 * time.Second) // 为了演示效果,睡眠5秒
        }
    }()

    // 注水口
    for {
        cond.L.Lock() // 上锁
        if counter == maxSize { // 水满了
            cond.Wait() // 啥时候排水?等待通知!
        }
        counter++
        log.Printf(" INPUT counter = %d", counter)
        cond.Signal() // 单发通知:已来水
        cond.L.Unlock() // 解锁
        time.Sleep(1 * time.Second) // 为了演示效果,睡眠1秒
    }
}

结果

=== RUN   TestDemo3
                ···
2020/10/06 14:51:22  INPUT counter = 7
2020/10/06 14:51:23  INPUT counter = 8
2020/10/06 14:51:24 OUTPUT counter = 7
2020/10/06 14:51:24  INPUT counter = 8
2020/10/06 14:51:25  INPUT counter = 9
2020/10/06 14:51:26  INPUT counter = 10
2020/10/06 14:51:29 OUTPUT counter = 9
2020/10/06 14:51:29  INPUT counter = 10
2020/10/06 14:51:34 OUTPUT counter = 9
2020/10/06 14:51:34  INPUT counter = 10
                ···

通过日志输出,可以看出来,注水口没有一直轮询了,而是等到排水口发通知后,再进行注水,注水口一直再等排水口。那么新的问题又来了,如何提高排水口的效率呢?

条件变量:广播通知

多制造出一个排水口,提高排水效率。

那就不能继续使用单发通知了(Signal),因为单发通知只会通知到一个等待(Wait),针对多等待的这种情况,就需要使用广播通知(Broadcast)。

func TestDemo4(t *testing.T) {
    cond := sync.NewCond(new(sync.Mutex)) // 初始化条件变量
    maxSize := 10
    counter := 0

    // 排水口 1
    go func() {
        for {
            cond.L.Lock() // 上锁
            if counter == 0 { // 没水了
            //for counter == 0 { // 没水了
                cond.Wait() // 啥时候来水?等通知!
            }
            counter--
            log.Printf("OUTPUT A counter = %d", counter)
            cond.Broadcast() // 单发通知:已排水
            cond.L.Unlock() // 解锁
            //time.Sleep(2 * time.Second) // 为了演示效果,睡眠5秒
        }
    }()

    // 排水口 2
    go func() {
        for {
            cond.L.Lock() // 上锁
            if counter == 0 { // 没水了
            //for counter == 0 { // 没水了
                cond.Wait() // 啥时候来水?等通知!
            }
            counter--
            log.Printf("OUTPUT B counter = %d", counter)
            cond.Broadcast() // 单发通知:已排水
            cond.L.Unlock() // 解锁
            //time.Sleep(2 * time.Second) // 为了演示效果,睡眠5秒
        }
    }()

    // 注水口
    for {
        cond.L.Lock() // 上锁
        if counter == maxSize { // 水满了
        //for counter == maxSize { // 水满了
            cond.Wait() // 啥时候排水?等待通知!
        }
        counter++
        log.Printf(" INPUT   counter = %d", counter)
        cond.Broadcast() // 单发通知:已来水
        cond.L.Unlock() // 解锁
        //time.Sleep(1 * time.Second) // 为了演示效果,睡眠1秒
    }
}

结果

=== RUN   TestDemo4
                ···
2020/10/07 20:57:30 OUTPUT B counter = 2
2020/10/07 20:57:30 OUTPUT B counter = 1
2020/10/07 20:57:30 OUTPUT B counter = 0
2020/10/07 20:57:30 OUTPUT A counter = -1
2020/10/07 20:57:30 OUTPUT A counter = -2
2020/10/07 20:57:30 OUTPUT A counter = -3
2020/10/07 20:57:30 OUTPUT A counter = -4
                ···
2020/10/07 20:57:31 OUTPUT B counter = -7605
2020/10/07 20:57:31  INPUT   counter = -7604
2020/10/07 20:57:31 OUTPUT A counter = -7605
2020/10/07 20:57:31 OUTPUT A counter = -7606
                ···

通过日志输出可以看到,刚开始的时候还很正常,到后面的时候就变成负值了,一直在负增长,What?

《Go并发编程之传统同步—(1)互斥锁》文章中,程序因为没有加上互斥锁,出现过 counter 值异常的情况。

但这次程序这次加了互斥锁,按理说形成了一个临界区应该是没有问题了,所以问题应该不是出在临界区上,难道问题出在 Wait 上?

通过IDE追踪一下Wait的源码

func (c *Cond) Wait() {
    // 检查 c 是否是被复制的,如果是就 panic
    c.checker.check()
    // 将当前 goroutine 加入等待队列
    t := runtime_notifyListAdd(&c.notify)
    c.L.Unlock()
    // 等待当前 goroutine 被唤醒
    runtime_notifyListWait(&c.notify, t)
    c.L.Lock()
}

原来 Wait 内部的执行流程是,先执行了解锁,然后进入等待状态,接到通知之后,再执行加锁操作。

那按照这个代码逻辑结合输出日志,走一程序遍流程,看看能不能复现出 counter 为负值的情况:

  1. 注水口将 counter 累加到 10 之后,发送广播通知(Broadcast)。
  2. goroutine A 在“第1步”之前的时候进入了等待通知(Wait),现在接收到了广播通知(Broadcast),从 runtime_notifyListWait() 返回,并且成功执行了加锁(Lock)操作。
  3. goroutine B 在“第1步”之前的时候进入了等待通知(Wait),现在接收到了广播通知(Broadcast),从 runtime_notifyListWait() 返回,在执行加锁(Lock)操作的时候,发现 goroutine A 先抢占了临界区,所以一直阻塞在 c.L.Lock()。
  4. goroutine A 虽然完成任务后会释放锁,但是每次也成功将锁抢占,所以就这样 一直将 counter 减到了 0,然后发送广播通知(Broadcast)、解锁(Unlock)。
  5. goroutine B 在 goroutine A 解锁后,成功获得锁并从 Lock 方法中返回,接下来跳出 Wait 方法、跳出 if 判断,执行 counter--(0--),这时候 counter 的值是 -1

图示

image

问题就出现在第五步,只要 goroutine B 加锁成功的时候,再判断一下 counter 是否为 0 就好了。

所以将 if counter == 0 改成 for counter == 0,这样上面的“第五步”就变成了

5.goroutine B 在 goroutine A 解锁后,成功加锁(Lock)并从阻塞总返回,接下来跳出 Wait 方法、再次进入 for 循环,判断 counter == 0 结果为真,再次进入等待(Wait)。

代码做出相应的修改后,再执行看结果,没有问题了。

延伸

发送通知

等待通知(Wait)肯定是要在临界区里面的,那发送通知(Signal、Broadcast)在哪里更好呢?

Luck()
Wait()
Broadcast()// Signal()
Unlock()

// 或者

Luck()
Wait()
Unlock()
Broadcast()// Signal()

// 两种写法都不会报错 

在 go 的发送通知方法(Broadcast、Signal)上有这么一段话:

// It is allowed but not required for the caller to hold c.L
// during the call.

在我以往的 C 多线程开发的时候,发送通知总是在锁中的:

pthread_mutex_lock(&thread->mutex);
//              ...
pthread_cond_signal(&thread->cond);
pthread_mutex_unlock(&thread->mutex);

man 手册中有写到:

The pthread_cond_broadcast() or pthread_cond_signal() functions may be called by a thread whether or not it currently owns the mutex that threads calling pthread_cond_wait() or pthread_cond_timedwait() have associated with the condition variable during their waits; however, if predictable scheduling behavior is required, then that mutex shall be locked by the thread calling pthread_cond_broadcast() or pthread_cond_signal().

个人对此并没有什么见解,就不乱下定论了,有想法的小伙伴可以在文章下面留言,一起讨论。

等待通知

消息通知是有即时性的,如果没有 goroutine 在等待通知,那么这次通知直接被丢弃。

kubernetes

https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/client-go/tools/cache/fifo.go

总结

  1. Wait() 内会执行解锁、等待、加锁。
  2. Wait() 必须在 for 循环里面。
  3. Wait() 方法会把当前的 goroutine 添加到通知队列的队尾。
  4. 单发通知,唤醒通知队列第一个排队的 goroutine。
  5. 广播通知,唤醒通知队列里面全部的 goroutine。
  6. 程序示例只是为了演示效果,实际的开发中,生产者和消费者应该是异步消费,不应该使用同一个互斥锁。

文章示例代码

Sown专栏地址:https://segmentfault.com/blog/sown

查看原文

赞 8 收藏 3 评论 12

Charles 赞了文章 · 10月10日

OpenResty 和 Nginx 的共享内存区是如何消耗物理内存的

OpenResty 和 Nginx 服务器通常会配置共享内存区,用于储存在所有工作进程之间共享的数据。例如,Nginx 标准模块 ngx_http_limit_reqngx_http_limit_conn 使用共享内存区储存状态数据,以限制所有工作进程中的用户请求速率和用户请求的并发度。OpenResty 的 ngx_lua 模块通过 lua_shared_dict,向用户 Lua 代码提供基于共享内存的数据字典存储。

本文通过几个简单和独立的例子,探讨这些共享内存区如何使用物理内存资源(或 RAM)。我们还会探讨共享内存的使用率对系统层面的进程内存指标的影响,例如在 ps 等系统工具的结果中的 VSZRSS 等指标。

与本博客网站 中的几乎所有技术类文章类似,我们使用 OpenResty XRay 这款动态追踪产品对未经修改的 OpenResty 或 Nginx 服务器和应用的内部进行深度分析和可视化呈现。因为 OpenResty XRay 是一个非侵入性的分析平台,所以我们不需要对 OpenResty 或 Nginx 的目标进程做任何修改 -- 不需要代码注入,也不需要在目标进程中加载特殊插件或模块。这样可以保证我们通过 OpenResty XRay 分析工具所看到的目标进程内部状态,与没有观察者时的状态是完全一致的。

我们将在多数示例中使用 ngx_lua 模块的 lua_shared_dict,因为该模块可以使用自定义的 Lua 代码进行编程。我们在这些示例中展示的行为和问题,也同样适用于所有标准 Nginx 模块和第三方模块中的其他共享内存区。

Slab 与内存页

Nginx 及其模块通常使用 Nginx 核心里的 slab 分配器 来管理共享内存区内的空间。这个slab 分配器专门用于在固定大小的内存区内分配和释放较小的内存块。

在 slab 的基础之上,共享内存区会引入更高层面的数据结构,例如红黑树和链表等等。

slab 可能小至几个字节,也可能大至跨越多个内存页。

操作系统以内存页为单位来管理进程的共享内存(或其他种类的内存)。
x86_64 Linux 系统中,默认的内存页大小通常是 4 KB,但具体大小取决于体系结构和 Linux 内核的配置。例如,某些 Aarch64 Linux 系统的内存页大小高达 64 KB。

我们将会看到 OpenResty 和 Nginx 进程的共享内存区,分别在内存页层面和 slab 层面上的细节信息。

分配的内存不一定有消耗

与硬盘这样的资源不同,物理内存(或 RAM)总是一种非常宝贵的资源。
大部分现代操作系统都实现了一种优化技术,叫做 按需分页(demand-paging),用于减少用户应用对 RAM 资源的压力。具体来说,就是当你分配大块的内存时,操作系统核心会将 RAM 资源(或物理内存页)的实际分配推迟到内存页里的数据被实际使用的时候。例如,如果用户进程分配了 10 个内存页,但却只使用了 3 个内存页,则操作系统可能只把这 3 个内存页映射到了 RAM 设备。这种行为同样适用于 Nginx 或 OpenResty 应用中分配的共享内存区。用户可以在 nginx.conf 文件中配置庞大的共享内存区,但他可能会注意到在服务器启动之后,几乎没有额外占用多少内存,毕竟通常在刚启动的时候,几乎没有共享内存页被实际使用到。

空的共享内存区

我们以下面这个 nginx.conf 文件为例。该文件分配了一个空的共享内存区,并且从没有使用过它:

master_process on;
worker_processes 2;

events {
    worker_connections 1024;
}

http {
    lua_shared_dict dogs 100m;

    server {
        listen 8080;

        location = /t {
            return 200 "hello world\n";
        }
    }
}

我们通过 lua_shared_dict 指令配置了一个 100 MB 的共享内存区,名为 dogs。并且我们为这个服务器配置了 2 个工作进程。请注意,我们在配置里从没有触及这个 dogs 区,所以这个区是空的。

可以通过下列命令启动这个服务器:

mkdir ~/work/
cd ~/work/
mkdir logs/ conf/
vim conf/nginx.conf  # paste the nginx.conf sample above here
/usr/local/openresty/nginx/sbin/nginx -p $PWD/

然后用下列命令查看 nginx 进程是否已在运行:

$ ps aux|head -n1; ps aux|grep nginx
USER       PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
agentzh   9359  0.0  0.0 137508  1576 ?        Ss   09:10   0:00 nginx: master process /usr/local/openresty/nginx/sbin/nginx -p /home/agentzh/work/
agentzh   9360  0.0  0.0 137968  1924 ?        S    09:10   0:00 nginx: worker process
agentzh   9361  0.0  0.0 137968  1920 ?        S    09:10   0:00 nginx: worker process

这两个工作进程占用的内存大小很接近。下面我们重点研究 PID 为 9360 的这个工作进程。在 OpenResty XRay 控制台的 Web 图形界面中,我们可以看到这个进程一共占用了 134.73 MB 的虚拟内存(virtual memory)和 1.88 MB 的常驻内存(resident memory),这与上文中的 ps 命令输出的结果完全相同:

空的共享内存区的虚拟内存使用量明细

正如我们的另一篇文章 《OpenResty 和 Nginx 如何分配和管理内存》中所介绍的,我们最关心的就是常驻内存的使用量。常驻内存将硬件资源实际映射到相应的内存页(如 RAM 1)。所以我们从图中看到,实际映射到硬件资源的内存量很少,总计只有 1.88MB。上文配置的 100 MB 的共享内存区在这个常驻内存当中只占很小的一部分(详情请见后续的讨论)。

当然,共享内存区的这 100 MB 还是全部贡献到了该进程的虚拟内存总量中去了。操作系统会为这个共享内存区预留出虚拟内存的地址空间,不过,这只是一种簿记记录,此时并不占用任何的 RAM 资源或其他硬件资源。

不是 空无一物

我们可以通过该进程的“应用层面的内存使用量的分类明细”图,来检查空的共享内存区是否占用了常驻(或物理)内存。

应用层面内存使用量明细

有趣的是,我们在这个图中看到了一个非零的 Nginx Shm Loaded (已加载的 Nginx 共享内存)组分。这部分很小,只有 612 KB,但还是出现了。所以空的共享内存区也并非空无一物。这是因为 Nginx 已经在新初始化的共享内存区域中放置了一些元数据,用于簿记目的。这些元数据为 Nginx 的 slab 分配器所使用。

已加载和未加载内存页

我们可以通过 OpenResty XRay 自动生成的下列图表,查看共享内存区内被实际使用(或加载)的内存页数量。

共享内存区域内已加载和未加载的内存页

我们发现在dogs区域中已经加载(或实际使用)的内存大小为 608 KB,同时有一个特殊的 ngx_accept_mutex_ptr 被 Nginx 核心自动分配用于 accept_mutex 功能。

这两部分内存的大小相加为 612 KB,正是上文的饼状图中显示的 Nginx Shm Loaded 的大小。

如前文所述,dogs 区使用的 608 KB 内存实际上是 slab 分配器 使用的元数据。

未加载的内存页只是被保留的虚拟内存地址空间,并没有被使用过。

关于进程的页表

我们没有提及的一种复杂性是,每一个 nginx 工作进程其实都有各自的页表。CPU 硬件或操作系统内核正是通过查询这些页表来查找虚拟内存页所对应的存储。因此每个进程在不同共享内存区内可能有不同的已加载页集合,因为每个进程在运行过程中可能访问过不同的内存页集合。为了简化这里的分析,OpenResty XRay 会显示所有的为任意一个工作进程加载过的内存页,即使当前的目标工作进程从未碰触过这些内存页。也正因为这个原因,已加载内存页的总大小可能(略微)高于目标进程的常驻内存的大小。

空闲的和已使用的 slab

如上文所述,Nginx 通常使用 slabs 而不是内存页来管理共享内存区内的空间。我们可以通过 OpenResty XRay 直接查看某一个共享内存区内已使用的和空闲的(或未使用的)slabs 的统计信息:

dogs区域中空的和已使用的slab

如我们所预期的,我们这个例子里的大部分 slabs 是空闲的未被使用的。注意,这里的内存大小的数字远小于上一节中所示的内存页层面的统计数字。这是因为 slabs 层面的抽象层次更高,并不包含 slab 分配器针对内存页的大小补齐和地址对齐的内存消耗。

我们可以通过OpenResty XRay进一步观察在这个 dogs 区域中各个 slab 的大小分布情况:

空白区域的已使用 slab 大小分布

空的 slab 大小分布

我们可以看到这个空的共享内存区里,仍然有 3 个已使用的 slab 和 157 个空闲的 slab。这些 slab 的总个数为:3 + 157 = 160个。请记住这个数字,我们会在下文中跟写入了一些用户数据的 dogs 区里的情况进行对比。

写入了用户数据的共享内存区

下面我们会修改之前的配置示例,在 Nginx 服务器启动时主动写入一些数据。具体做法是,我们在 nginx.conf 文件的 http {} 配置分程序块中增加下面这条 init_by_lua_block 配置指令:

init_by_lua_block {
    for i = 1, 300000 do
        ngx.shared.dogs:set("key" .. i, i)
    end
}

这里在服务器启动的时候,主动对 dogs 共享内存区进行了初始化,写入了 300,000 个键值对。

然后运行下列的 shell 命令以重新启动服务器进程:

kill -QUIT `cat logs/nginx.pid`
/usr/local/openresty/nginx/sbin/nginx -p $PWD/

新启动的 Nginx 进程如下所示:

$ ps aux|head -n1; ps aux|grep nginx
USER       PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
agentzh  29733  0.0  0.0 137508  1420 ?        Ss   13:50   0:00 nginx: master process /usr/local/openresty/nginx/sbin/nginx -p /home/agentzh/work/
agentzh  29734 32.0  0.5 138544 41168 ?        S    13:50   0:00 nginx: worker process
agentzh  29735 32.0  0.5 138544 41044 ?        S    13:50   0:00 nginx: worker process

虚拟内存与常驻内存

针对 Nginx 工作进程 29735,OpenResty XRay 生成了下面这张饼图:

非空白区域的虚拟内存使用量明细

显然,常驻内存的大小远高于之前那个空的共享区的例子,而且在总的虚拟内存大小中所占的比例也更大(29.6%)。

虚拟内存的使用量也略有增加(从 134.73 MB 增加到了 135.30 MB)。因为共享内存区本身的大小没有变化,所以共享内存区对于虚拟内存使用量的增加其实并没有影响。这里略微增大的原因是我们通过 init_by_lua_block 指令新引入了一些 Lua 代码(这部分微小的内存也同时贡献到了常驻内存中去了)。

应用层面的内存使用量明细显示,Nginx 共享内存区域的已加载内存占用了最多常驻内存:

dogs 区域内已加载和未加载的内存页

已加载和未加载内存页

现在在这个 dogs 共享内存区里,已加载的内存页多了很多,而未加载的内存页也有了相应的显著减少:

dogs区域中的已加载和未加载内存页

空的和已使用的 slab

现在 dogs 共享内存区增加了 300,000 个已使用的 slab(除了空的共享内存区中那 3 个总是会预分配的 slab 以外):

dogs非空白区域中的已使用slab

显然,lua_shared_dict 区中的每一个键值对,其实都直接对应一个 slab。

空闲 slab 的数量与先前在空的共享内存区中的数量是完全相同的,即 157 个 slab:

dogs非空白区域的空slab

虚假的内存泄漏

正如我们上面所演示的,共享内存区在应用实际访问其内部的内存页之前,都不会实际耗费物理内存资源。因为这个原因,用户可能会观察到 Nginx 工作进程的常驻内存大小似乎会持续地增长,特别是在进程刚启动之后。这会让用户误以为存在内存泄漏。下面这张图展示了这样的一个例子:

process memory growing

通过查看 OpenResty XRay 生成的应用级别的内存使用明细图,我们可以清楚地看到 Nginx 的共享内存区域其实占用了绝大部分的常驻内存空间:

Memory usage breakdown for huge shm zones

这种内存增长是暂时的,会在共享内存区被填满时停止。但是当用户把共享内存区配置得特别大,大到超出当前系统中可用的物理内存的时候,仍然是有潜在风险的。正因为如此,我们应该注意观察如下所示的内存页级别的内存使用量的柱状图:

Loaded and unloaded memory pages in shared memory zones

图中蓝色的部分可能最终会被进程用尽(即变为红色),而对当前系统产生冲击。

HUP 重新加载

Nginx 支持通过 HUP 信号来重新加载服务器的配置而不用退出它的 master 进程(worker 进程仍然会优雅退出并重启)。通常 Nginx 共享内存区会在 HUP 重新加载(HUP reload)之后自动继承原有的数据。所以原先为已访问过的共享内存页分配的那些物理内存页也会保留下来。于是想通过 HUP 重新加载来释放共享内存区内的常驻内存空间的尝试是会失败的。用户应改用 Nginx 的重启或二进制升级操作。

值得提醒的是,某一个 Nginx 模块还是有权决定是否在 HUP 重新加载后保持原有的数据。所以可能会有例外。

结论

我们在上文中已经解释了 Nginx 的共享内存区所占用的物理内存资源,可能远少于 nginx.conf 文件中配置的大小。这要归功于现代操作系统中的按需分页特性。我们演示了空的共享内存区内依然会使用到一些内存页和 slab,以用于存储 slab 分配器本身需要的元数据。通过 OpenResty XRay 的高级分析器,我们可以实时检查运行中的 nginx 工作进程,查看其中的共享内存区实际使用或加载的内存,包括内存页和 slab 这两个不同层面。

另一方面,按需分页的优化也会产生内存在某段时间内持续增长的现象。这其实并不是内存泄漏,但仍然具有一定的风险。我们也解释了 Nginx 的 HUP 重新加载操作通常并不会清空共享内存区里已有的数据。

我们将在本博客网站后续的文章中,继续探讨共享内存区中使用的高级数据结构,例如红黑树和队列,以及如何分析和缓解共享内存区内的内存碎片的问题。

关于作者

章亦春是开源项目 OpenResty® 的创始人,同时也是 OpenResty Inc. 公司的创始人和 CEO。他贡献了许多 Nginx 的第三方模块,相当多 Nginx 和 LuaJIT 核心补丁,并且设计了 OpenResty XRay 等产品。

关注我们

如果您觉得本文有价值,非常欢迎关注我们 OpenResty Inc. 公司的博客网站 。也欢迎扫码关注我们的微信公众号:

我们的微信公众号

翻译

我们提供了英文版原文和中译版(本文) 。我们也欢迎读者提供其他语言的翻译版本,只要是全文翻译不带省略,我们都将会考虑采用,非常感谢!


  1. 当发生交换(swapping)时,一些常驻内存会被保存和映射到硬盘设备上去。
查看原文

赞 10 收藏 7 评论 0

Charles 收藏了文章 · 9月25日

聊聊分布式下的WebSocket解决方案

前言


最近自己搭建了个项目,项目本身很简单,但是里面有使用WebSocket进行消息提醒的功能,大体情况是这样的。

发布消息者在系统中发送消息,实时的把消息推送给对应的一个部门下的所有人。

这里面如果是单机应用的情况时,我们可以通过部门的id和用户的id组成一个唯一的key,与应用服务器建立WebSocket长连接,然后就可以接收到发布消息者发送的消息了。

但是真正把项目应用于生产环境中时,我们是不可能就部署一个单机应用的,而是要部署一个集群。

所以我通过Nginx+两台Tomcat搭建了一个简单的负载均衡集群,作为测试使用

但是问题出现了,我们的客户端浏览器只会与一台服务器建立WebSocket长连接,所以发布消息者在发送消息时,就没法保证所有目标部门的人都能接收到消息(因为这些人连接的可能不是一个服务器)。

本篇文章就是针对于这么一个问题展开讨论,提出一种解决方案,当然解决方案不止一种,那我们开始吧。

WebSocket单体应用介绍


在介绍分布式集群之前,我们先来看一下王子的WebSocket代码实现,先来看java后端代码如下:

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@ServerEndpoint("/webSocket/{key}")
public class WebSocket {

private static int onlineCount = 0;
/**
 * 存储连接的客户端
 */
private static Map<String, WebSocket> clients = new ConcurrentHashMap<String, WebSocket>();
private Session session;
/**
 * 发送的目标科室code
 */
private String key;

@OnOpen
public void onOpen(@PathParam("key") String key, Session session) throws IOException {
    this.key = key;
    this.session = session;
    if (!clients.containsKey(key)) {
        addOnlineCount();
    }
    clients.put(key, this);
    Log.info(key+"已连接消息服务!");
}

@OnClose
public void onClose() throws IOException {
    clients.remove(key);
    subOnlineCount();
}

@OnMessage
public void onMessage(String message) throws IOException {
    if(message.equals("ping")){
        return ;
    }
    JSONObject jsonTo = JSON.parseObject(message);
    String mes = (String) jsonTo.get("message");
    if (!jsonTo.get("to").equals("All")){
        sendMessageTo(mes, jsonTo.get("to").toString());
    }else{
        sendMessageAll(mes);
    }
}

@OnError
public void onError(Session session, Throwable error) {
    error.printStackTrace();
}

private void sendMessageTo(String message, String To) throws IOException {
    for (WebSocket item : clients.values()) {
        if (item.key.contains(To) )
            item.session.getAsyncRemote().sendText(message);
    }
}

private void sendMessageAll(String message) throws IOException {
    for (WebSocket item : clients.values()) {
        item.session.getAsyncRemote().sendText(message);
    }
}

public static synchronized int getOnlineCount() {
    return onlineCount;
}

public static synchronized void addOnlineCount() {
    WebSocket.onlineCount++;
}

public static synchronized void subOnlineCount() {
    WebSocket.onlineCount--;
}

public static synchronized Map<String, WebSocket> getClients() {
    return clients;
}

}

示例代码中并没有使用Spring,用的是原生的java web编写的,简单和大家介绍一下里面的方法。

onOpen:在客户端与WebSocket服务连接时触发方法执行

onClose:在客户端与WebSocket连接断开的时候触发执行

onMessage:在接收到客户端发送的消息时触发执行

onError:在发生错误时触发执行

可以看到,在onMessage方法中,我们直接根据客户端发送的消息,进行消息的转发功能,这样在单体消息服务中是没有问题的。

再来看一下js代码

var host = document.location.host;

// 获得当前登录科室
var deptCodes='${sessionScope.$UserContext.departmentID}';
deptCodes=deptCodes.replace(/[[|]|s]+/g, "");
var key = '${sessionScope.$UserContext.userID}'+deptCodes;
var lockReconnect = false;  //避免ws重复连接
var ws = null;          // 判断当前浏览器是否支持WebSocket
var wsUrl = 'ws://' + host + '/webSocket/'+ key;
createWebSocket(wsUrl);   //连接ws

function createWebSocket(url) {
    try{
        if('WebSocket' in window){
            ws = new WebSocket(url);
        }else if('MozWebSocket' in window){  
            ws = new MozWebSocket(url);
        }else{
              layer.alert("您的浏览器不支持websocket协议,建议使用新版谷歌、火狐等浏览器,请勿使用IE10以下浏览器,360浏览器请使用极速模式,不要使用兼容模式!"); 
        }
        initEventHandle();
    }catch(e){
        reconnect(url);
        console.log(e);
    }     
}

function initEventHandle() {
    ws.onclose = function () {
        reconnect(wsUrl);
        console.log("llws连接关闭!"+new Date().toUTCString());
    };
    ws.onerror = function () {
        reconnect(wsUrl);
        console.log("llws连接错误!");
    };
    ws.onopen = function () {
        heartCheck.reset().start();      //心跳检测重置
        console.log("llws连接成功!"+new Date().toUTCString());
    };
    ws.onmessage = function (event) {    //如果获取到消息,心跳检测重置
        heartCheck.reset().start();      //拿到任何消息都说明当前连接是正常的//接收到消息实际业务处理

        ...

    };
}
// 监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。
window.onbeforeunload = function() {
    ws.close();
}  

function reconnect(url) {
    if(lockReconnect) return;
    lockReconnect = true;
    setTimeout(function () {     //没连接上会一直重连,设置延迟避免请求过多
        createWebSocket(url);
        lockReconnect = false;
    }, 2000);
}

//心跳检测
var heartCheck = {
    timeout: 300000,        //5分钟发一次心跳
    timeoutObj: null,
    serverTimeoutObj: null,
    reset: function(){
        clearTimeout(this.timeoutObj);
        clearTimeout(this.serverTimeoutObj);
        return this;
    },
    start: function(){
        var self = this;
        this.timeoutObj = setTimeout(function(){
            //这里发送一个心跳,后端收到后,返回一个心跳消息,
            //onmessage拿到返回的心跳就说明连接正常
            ws.send("ping");
            console.log("ping!")
            self.serverTimeoutObj = setTimeout(function(){//如果超过一定时间还没重置,说明后端主动断开了
                ws.close();     //如果onclose会执行reconnect,我们执行ws.close()就行了.如果直接执行reconnect 会触发onclose导致重连两次
            }, self.timeout)
        }, this.timeout)
    }

  }

js部分使用的是原生H5编写的,如果为了更好的兼容浏览器,也可以使用SockJS,有兴趣小伙伴们可以自行百度。

接下来我们就手动的优化代码,实现WebSocket对分布式架构的支持。

解决方案的思考


现在我们已经了解单体应用下的代码结构,也清楚了WebSocket在分布式环境下面临的问题,那么是时候思考一下如何能够解决这个问题了。

我们先来看一看发生这个问题的根本原因是什么。

简单思考一下就能明白,单体应用下只有一台服务器,所有的客户端连接的都是这一台消息服务器,所以当发布消息者发送消息时,所有的客户端其实已经全部与这台服务器建立了连接,直接群发消息就可以了。

换成分布式系统后,假如我们有两台消息服务器,那么客户端通过Nginx负载均衡后,就会有一部分连接到其中一台服务器,另一部分连接到另一台服务器,所以发布消息者发送消息时,只会发送到其中的一台服务器上,而这台消息服务器就可以执行群发操作,但问题是,另一台服务器并不知道这件事,也就无法发送消息了。

现在我们知道了根本原因是生产消息时,只有一台消息服务器能够感知到,所以我们只要让另一台消息服务器也能感知到就可以了,这样感知到之后,它就可以群发消息给连接到它上边的客户端了。

那么什么方法可以实现这种功能呢,王子很快想到了引入消息中间件,并使用它的发布订阅模式来通知所有消息服务器就可以了。

引入RabbitMQ解决分布式下的WebSocket问题


在消息中间件的选择上,王子选择了RabbitMQ,原因是它的搭建比较简单,功能也很强大,而且我们只是用到它群发消息的功能。

RabbitMQ有一个广播模式(fanout),我们使用的就是这种模式。

首先我们写一个RabbitMQ的连接类:

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitMQUtil {

private static Connection connection;

/**
 * 与rabbitmq建立连接
 * @return
 */
public static Connection getConnection() {
    if (connection != null&&connection.isOpen()) {
        return connection;
    }

    ConnectionFactory factory = new ConnectionFactory();
    factory.setVirtualHost("/");
    factory.setHost("192.168.220.110"); // 用的是虚拟IP地址
    factory.setPort(5672);
    factory.setUsername("guest");
    factory.setPassword("guest");

    try {
        connection = factory.newConnection();
    } catch (IOException e) {
        e.printStackTrace();
    } catch (TimeoutException e) {
        e.printStackTrace();
    }

    return connection;
}

}

这个类没什么说的,就是获取MQ连接的一个工厂类。

然后按照我们的思路,就是每次服务器启动的时候,都会创建一个MQ的消费者监听MQ的消息,王子这里测试使用的是Servlet的监听器,如下:

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;

public class InitListener implements ServletContextListener {

@Override
public void contextInitialized(ServletContextEvent servletContextEvent) {
    WebSocket.init();
}

@Override
public void contextDestroyed(ServletContextEvent servletContextEvent) {

}

}

记得要在Web.xml中配置监听器信息

<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://xmlns.jcp.org/xml/ns/javaee"

     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_4_0.xsd"
     version="4.0">
<listener>
    <listener-class>InitListener</listener-class>
</listener>

</web-app>

WebSocket中增加init方法,作为MQ消费者部分

public static void init() {

    try {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        //交换机声明(参数为:交换机名称;交换机类型)
        channel.exchangeDeclare("fanoutLogs",BuiltinExchangeType.FANOUT);
        //获取一个临时队列
        String queueName = channel.queueDeclare().getQueue();
        //队列与交换机绑定(参数为:队列名称;交换机名称;routingKey忽略)
        channel.queueBind(queueName,"fanoutLogs","");


        //这里重写了DefaultConsumer的handleDelivery方法,因为发送的时候对消息进行了getByte(),在这里要重新组装成String
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                super.handleDelivery(consumerTag, envelope, properties, body);
                String message = new String(body,"UTF-8");
                System.out.println(message);

            //这里可以使用WebSocket通过消息内容发送消息给对应的客户端

            }
        };

        //声明队列中被消费掉的消息(参数为:队列名称;消息是否自动确认;consumer主体)
        channel.basicConsume(queueName,true,consumer);
        //这里不能关闭连接,调用了消费方法后,消费者会一直连接着rabbitMQ等待消费
    } catch (IOException e) {
        e.printStackTrace();
    }
}

同时在接收到消息时,不是直接通过WebSocket发送消息给对应客户端,而是发送消息给MQ,这样如果消息服务器有多个,就都会从MQ中获得消息,之后通过获取的消息内容再使用WebSocket推送给对应的客户端就可以了。

WebSocket的onMessage方法增加内容如下:

try {

        //尝试获取一个连接
        Connection connection = RabbitMQUtil.getConnection();
        //尝试创建一个channel
        Channel channel = connection.createChannel();
        //声明交换机(参数为:交换机名称; 交换机类型,广播模式)
        channel.exchangeDeclare("fanoutLogs", BuiltinExchangeType.FANOUT);
        //消息发布(参数为:交换机名称; routingKey,忽略。在广播模式中,生产者声明交换机的名称和类型即可)
        channel.basicPublish("fanoutLogs","", null,msg.getBytes("UTF-8"));
        System.out.println("发布消息");
        channel.close();
    } catch (IOException |TimeoutException e) {
        e.printStackTrace();
    }

增加后删除掉原来的Websocket推送部分代码。

这样一整套的解决方案就完成了。

总结


到这里,我们就解决了分布式下WebSocket的推送消息问题。

我们主要是引入了RabbitMQ,通过RabbitMQ的发布订阅模式,让每个消息服务器启动的时候都去订阅消息,而无论哪台消息服务器在发送消息的时候都会发送给MQ,这样每台消息服务器就都会感知到发送消息的事件,从而再通过Websocket发送给客户端。

大体流程就是这样,那么小伙伴们有没有想过,如果RabbitMQ挂掉了几分钟,之后重启了,消费者是否可以重新连接到RabbitMQ?是否还能正常接收消息呢?

生产环境下,这个问题是必须考虑的。

这里已经测试过,消费者是支持自动重连的,所以我们可以放心的使用这套架构来解决此问题。

本文到这里就结束了,欢迎各位小伙伴点赞文章留言讨论,一起学习,一起进步。

查看原文

认证与成就

  • 获得 55 次点赞
  • 获得 8 枚徽章 获得 0 枚金徽章, 获得 1 枚银徽章, 获得 7 枚铜徽章

擅长技能
编辑

开源项目 & 著作
编辑

  • Tiny框架

    公司经常会有h5游戏的开发需求,该类需求后端逻辑往往不复杂,而市面上的很多框架都较为重型,于是决定自己开发一套轻量的开发框架,专门用于满足公司h5游戏的后端开发需求。现在把它分享出来,有兴趣的朋友可以留言一起讨论!

注册于 2016-11-20
个人主页被 877 人浏览