fevin

fevin 查看完整档案

北京编辑  |  填写毕业院校  |  填写所在公司/组织 fevin.github.io 编辑
编辑
_ | |__ _ _ __ _ | '_ \| | | |/ _` | | |_) | |_| | (_| | |_.__/ \__,_|\__, | |___/ 该用户太懒什么也没留下

个人动态

fevin 赞了文章 · 2019-08-29

一致性哈希的通用实现

一致性哈希算法在1997年由麻省理工学院的Karger等人在解决分布式Cache中提出的,设计目标是为了解决因特网中的热点(Hot spot)问题

国际惯例,先上源码 https://github.com/manerfan/c...

  1. 可自定义节点数据类型
  2. 可自定义hash函数

原理

一致性哈希可应用于负载均衡、分布式缓存(缓存分片)等场景中,以下以分布式缓存为例

传统方式

如,现有N个缓存实例,将一个对象object映射到某一个缓存上可以采取取模方式 hash(object) % N
我们称之为简单hash算法。一般,简单hash算法确实能够比较均匀地实现分布式映射,但如果考虑缓存实例变动(增删)的情况:

  1. 某一缓存实例宕机,需要将该实例从集群中摘除,则映射公式变为 hash(object) % (N - 1)
  2. 增加一台缓存实例,将该实例加入集群,则映射公式变为 hash(object) % (N + 1)

对于以上情况,无论新增还是移除,大部分object所映射的缓存实例均会改变,缓存命中率大幅度降低从而回源到服务器,短时间内造成缓存雪崩现象

一致性哈希

一致性 Hash 算法简单的说,在移除/添加一个缓存实例时,尽可能小的改变已存在key映射关系,尽可能的满足单调性的要求。

1. 环形空间

通常的hash算法都是将一个value映射到一个32位的key值,我们可以将这个[0, 2^32-1]空间想象成一个首尾相接的环形队列

consistent_hashing.001.jpeg

2. 将对象映射到hash空间

通过hash函数计算对象hash值,将对象映射到hash环形空间

consistent_hashing.002.jpeg

3. 将缓存实例映射到hash空间

使用缓存实例的ip、port等信息,通过hash函数计算其hash值,将缓存实例映射到hash空间

consistent_hashing.004.jpeg

4. 将对象映射到缓存实例

沿着顺时针方向,查找距离对象object最近的缓存实例,并将对象映射到该实例

consistent_hashing.003.jpeg

5. 添加缓存实例

按照同样的算法,在添加实例后发现,只有少部分对象的映射关系改变

consistent_hashing.004.jpeg

6. 移除缓存实例

按照同样的算法,在移除实例后,同样只有少部分对象的映射关系改变

consistent_hashing.005.jpeg

7. 虚拟节点

为了使对象尽可能均匀地映射到所有的缓存实例中(解决缓存实例分布不均匀的问题),引入虚拟节点的概念
虚拟节点其实为真实节点在hash空间中的复制品,一个真实节点可以对应多个虚拟节点

虚拟节点的hash求值可以在真实节点的求值基础上加入编号等信息 hash(realCacheKey#1)hash(realCacheKey#2)

consistent_hashing.006.jpeg

通用实现

实现目标:

  1. 由于一致性哈希应用较多,并不局限于某一特定场景,故需要能够 自定义节点数据类型
  2. 常规hash算法一般采用md5等,但不限制hash函数实现,故需要能够 自定义hash函数

自定义节点数据类型

这里,我们定义一个公共接口

/**
 * 真实节点
 */
interface PhysicalNode {
    fun hashKey(): String
}

自定义节点数据,只需要实现PhysicalNode接口及hashKey方法,程序会通过hashKey的值计算节点的hash值

如,我们定义常规服务节点

/**
 * 常规的服务节点
 *
 * @param name  服务名称
 * @param host  服务host/ip
 * @param port  服务port
 */
data class HostPortPhysicalNode(val name: String, val host: String, val port: Int) : PhysicalNode {
    override fun hashKey() = "$name:$host:$port"
}

这里,我们还需要定义一个虚拟节点

/**
 * 虚拟节点
 *
 * @param parent    真实节点
 * @param replica   虚拟节点id
 */
data class VirtualNode<out T : PhysicalNode>(val parent: T, private val replica: Int) : PhysicalNode {
    override fun hashKey() = "${parent.hashKey()}#$replica"
    fun matches(key: String) = parent.hashKey() == key
}

虚拟节点的hashKey为真实节点hashKey加上节点编号

自定义hash函数

这里,我们同样定义一个公共接口

/**
 * 哈希函数
 */
interface HashFunc {
    fun hash(str: String): Long
}

自定义hash含蓄,只需要实现HashFunc接口及hash方法

如,我们定义md5函数

class Md5 : HashFunc {
    override fun hash(str: String): Long {
        val md5 = MessageDigest.getInstance("MD5")
        md5.reset()
        md5.update(str.toByteArray())
        val bytes = md5.digest()

        var h: Long = 0
        bytes.forEach {
            h = h shl 8
            h = h or (it.toLong() and 0xFF)
        }
        return h
    }
 }

ConsistentHash的使用

ConsistentHashConsistentHashHelper的实现,见 ConsistentHashHelper.kt

ConsistentHashHelper

我们使用ConsistentHashHelper来构建ConsistentHash

val consistentHash = ConsistentHashHelper.create<HostPortPhysicalNode>().build()

如果需要自定义hash函数,可以通过withHash指定,默认使用md5

val consistentHash = ConsistentHashHelper.create<HostPortPhysicalNode>()
        .withHash(MyMd5HashFunc())
        .build()

同样,可以通过withNodes指定在初始化时生成节点信息

val master = HostPortPhysicalNode("master", "192.169.1.1", 8080)
val slave = HostPortPhysicalNode("slave", "192.169.1.2", 8080)

val consistentHash = ConsistentHashHelper.create<HostPortPhysicalNode>()
        .withHash(MyMd5HashFunc())
        .withNodes(listOf(master, slave), 2) // 节点,并指定每个节点的副本数(可以省略,缺省1)
        .build()

ConsistentHash

运行过程中,可以动态增删节点

val backup = HostPortPhysicalNode("backup", "192.168.1.13", 8888)

consistentHash.add(backup, 4) // 增加节点,并指定每个节点的副本数(可以省略,缺省1)
consistentHash.remove(slave.hashKey())

通过getNode函数获取对应object所映射的缓存实例

consistentHash.getNode(hashFunc.hash(object1.key))
consistentHash.getNode(hashFunc.hash(object2.key))

订阅号

查看原文

赞 2 收藏 1 评论 0

fevin 发布了文章 · 2018-07-29

【踩坑笔记】一次加锁和超时控制引起的交通事故

问题回顾

线上发现流量接入层好像扛不住,一直在被 OOM,并且客户出现大面积的超时。但是流量并没有打到后端的业务层。
在回滚代码,并且加机器之后,问题被解决了。

问题定位与解决

首先,怀疑是流量过大引起的。但是奇怪的点在于大部分流量没有打到业务层。通过分析流量接入层的日志,我们发现 有两个相邻日志输出的时间间隔比较长。而这两条日志输出之间正是有回滚的代码。所以,我们将问题定位的方向转移到了代码层面。

但是,线下压测过程中,并没有发现类似的严重耗时问题,(怀疑是测试 case 没有覆盖到)。于是,先人工 Review 一遍变动的代码。我们发现,有一个代码片段是加锁的,代码如下所示(golang 省略部分细节):

    // key1
    if val, exist := rateMap.Load(key1); exist {
        return true, val.(*RateLimit).Taken(count)
    }

    Lock.Lock()
    defer Lock.Unlock()
    if mapC, exist := RateLimitC[flag1]; exist {
        for _, val := range mapC {
            if key1_ok {
                rateLimit := NewRateLimit(val.Qps)
                rateLimit.Create()
                rateMap.Store(key1, &rateLimit)
                return true, rateLimit.Taken(count)
            }
        }
    }

    // key2
    if val, exist := rateMap.Load(key2); exist {
        return true, val.(*RateLimit).Taken(count)
    }

    for _, val := range RateLimitC[flag2] {
        if key2_ok {
            rateLimit := NewRateLimit(val.Qps)
            rateLimit.Create()
            rateMap.Store(key2, &rateLimit)
            return true, rateLimit.Taken(count)
        }
    }

这是一段 QPS 限流的逻辑,内部实现利用了令牌桶算法,(先忽略有待优化的逻辑,我们来看为什么会出现问题)
代码的大概意思是:

如果用 key1 获取到了 token,就直接返回;否则,加锁,看是 map 里是否有 flag1 代表的限流信息,如果有,则判断是否有符合 key1 条件的,如果有,则走正常获取 token 逻辑;如果没有,则尝试用 key2 获取 token,(下边逻辑类似 key1)

问题就出在线上大部分情况需要用 key2 来获取 token,所以大部分请求都会进入加锁区域。如果只是一个加锁,应该很快就能处理完,但是会有堆积性的耗时呢。
我们来看一下 val.(*RateLimit).Taken(count) 的实现:

func (this *RateLimit) Taken(count int) error {
    timer := time.NewTimer(time.Duration(count) * TimeDuration)
    defer timer.Stop()

    for i := count; i > 0; i-- {
        select {
        case <-this.BucketCh:
        case <-timer.C:
            return errors.New("not get bucket")
        }
    }

    return nil
}

里边有个超时机制,如果限定时间内没有获取到 token,则返回错误。

那么,现在的问题是,所有走通过 key2 获取 token 都会在加锁的区域串行通过,那么,当没有立即获取 token 的请求,阻塞在计时器的时候,其他等待锁的请求都会阻塞在加锁的阶段,直到上一个请求超时,或者获取到 token,他才能获得锁。
换句话说,这条路是一个单行道,一次只能有一人通过,这个人还经常卡在收费站口,那么后边来的人就可能会越积越多,等待的时间越来越长,最后把路都给压垮了。

总结

像这种错误,想到之后是很容易复现的,而且只要满足条件,这个 bug 必现。
反思:

  1. 开发最了解代码,功能完成之后,需要自己想一下测试的 case,尽量可以自己覆盖到;
  2. 如果只看锁的部分,不觉得有什么问题,但是跟上下文结合起来,问题显而易见(lock + timer);

另外,这种耗时问题,可以在线下用 go 官方的 pprof 包,查看一下程序的耗时情况,也是可以发现的。

查看原文

赞 2 收藏 0 评论 0

fevin 评论了文章 · 2018-07-14

解决 vim 报错:the imp module is deprecated in favour of importlib

问题描述

打开 vim 之后,出现如下错误:

Error detected while processing function youcompleteme#Enable[3]..<SNR>71_SetUpPython:
line   42:
/must>not&exist/foo:1: DeprecationWarning: the imp module is deprecated in favour of importlib; see the module's documentation for alternative uses

原因:
这是 python warning
imppython3.4 之后 已经不再使用。
显然,这个问题是由 ycm 这个插件加载时引起的,可以通过修改 ycm 源码解决。

解决办法

有以下几种:
1.重新安装 vim,但是采用较低版本的 python
2.修改 ycm 报错部分的代码
具体修改如下:
vim PLUG_PATH/YouCompleteMe/autoload/youcompleteme.vim
修改如下:

diff --git a/autoload/youcompleteme.vim b/autoload/youcompleteme.vim
index 597eb020..32461fa9 100644
--- a/autoload/youcompleteme.vim
+++ b/autoload/youcompleteme.vim
@@ -180,7 +180,7 @@ endfunction


 function! s:SetUpPython() abort
-  exec s:python_until_eof
+  silent! exec s:python_until_eof
 from __future__ import unicode_literals
 from __future__ import print_function
 from __future__ import division

参考:Error message printed first time python3 (version 3.7.0) dynamic library is imported


更新:这个问题出现在使用 Python 3.7 的情况,
可以暂时在 .vimrc 中做如下配置,并等待更新 Python 3.7 来解决这个问题:

" temporary fix
" https://github.com/vim/vim/issues/3117
if has('python3')
  silent! python3 1
endif
查看原文

fevin 评论了文章 · 2018-07-14

解决 vim 报错:the imp module is deprecated in favour of importlib

问题描述

打开 vim 之后,出现如下错误:

Error detected while processing function youcompleteme#Enable[3]..<SNR>71_SetUpPython:
line   42:
/must>not&exist/foo:1: DeprecationWarning: the imp module is deprecated in favour of importlib; see the module's documentation for alternative uses

原因:
这是 python warning
imppython3.4 之后 已经不再使用。
显然,这个问题是由 ycm 这个插件加载时引起的,可以通过修改 ycm 源码解决。

解决办法

有以下几种:
1.重新安装 vim,但是采用较低版本的 python
2.修改 ycm 报错部分的代码
具体修改如下:
vim PLUG_PATH/YouCompleteMe/autoload/youcompleteme.vim
修改如下:

diff --git a/autoload/youcompleteme.vim b/autoload/youcompleteme.vim
index 597eb020..32461fa9 100644
--- a/autoload/youcompleteme.vim
+++ b/autoload/youcompleteme.vim
@@ -180,7 +180,7 @@ endfunction


 function! s:SetUpPython() abort
-  exec s:python_until_eof
+  silent! exec s:python_until_eof
 from __future__ import unicode_literals
 from __future__ import print_function
 from __future__ import division

参考:Error message printed first time python3 (version 3.7.0) dynamic library is imported


更新:这个问题出现在使用 Python 3.7 的情况,
可以暂时在 .vimrc 中做如下配置,并等待更新 Python 3.7 来解决这个问题:

" temporary fix
" https://github.com/vim/vim/issues/3117
if has('python3')
  silent! python3 1
endif
查看原文

fevin 发布了文章 · 2018-07-08

解决 vim 报错:the imp module is deprecated in favour of importlib

问题描述

打开 vim 之后,出现如下错误:

Error detected while processing function youcompleteme#Enable[3]..<SNR>71_SetUpPython:
line   42:
/must>not&exist/foo:1: DeprecationWarning: the imp module is deprecated in favour of importlib; see the module's documentation for alternative uses

原因:
这是 python warning
imppython3.4 之后 已经不再使用。
显然,这个问题是由 ycm 这个插件加载时引起的,可以通过修改 ycm 源码解决。

解决办法

有以下几种:
1.重新安装 vim,但是采用较低版本的 python
2.修改 ycm 报错部分的代码
具体修改如下:
vim PLUG_PATH/YouCompleteMe/autoload/youcompleteme.vim
修改如下:

diff --git a/autoload/youcompleteme.vim b/autoload/youcompleteme.vim
index 597eb020..32461fa9 100644
--- a/autoload/youcompleteme.vim
+++ b/autoload/youcompleteme.vim
@@ -180,7 +180,7 @@ endfunction


 function! s:SetUpPython() abort
-  exec s:python_until_eof
+  silent! exec s:python_until_eof
 from __future__ import unicode_literals
 from __future__ import print_function
 from __future__ import division

参考:Error message printed first time python3 (version 3.7.0) dynamic library is imported


更新:这个问题出现在使用 Python 3.7 的情况,
可以暂时在 .vimrc 中做如下配置,并等待更新 Python 3.7 来解决这个问题:

" temporary fix
" https://github.com/vim/vim/issues/3117
if has('python3')
  silent! python3 1
endif
查看原文

赞 2 收藏 1 评论 5

fevin 分享了头条 · 2018-06-15

优雅的重启/关闭 Go http 程序。目的:应用升级时,提供不间断的服务,优化服务体验。

赞 0 收藏 7 评论 0

fevin 关注了标签 · 2018-06-15

golang

Go语言是谷歌2009发布的第二款开源编程语言。Go语言专门针对多处理器系统应用程序的编程进行了优化,使用Go编译的程序可以媲美C或C++代码的速度,而且更加安全、支持并行进程。
Go语言是谷歌推出的一种全新的编程语言,可以在不损失应用程序性能的情况下降低代码的复杂性。谷歌首席软件工程师罗布派克(Rob Pike)说:我们之所以开发Go,是因为过去10多年间软件开发的难度令人沮丧。Go是谷歌2009发布的第二款编程语言。

七牛云存储CEO许式伟出版《Go语言编程
go语言翻译项目 http://code.google.com/p/gola...
《go编程导读》 http://code.google.com/p/ac-m...
golang的官方文档 http://golang.org/doc/docs.html
golang windows上安装 http://code.google.com/p/gomi...

关注 25935

fevin 发布了文章 · 2018-06-08

gracehttp: 优雅重启 Go 程序(热启动 - Zero Downtime)

看完此篇你会知道,如何优雅的使用 HTTP Server

问题背景

http 应用程序重启时,如果我们直接 kill -9 使程序退出,然后在启动,会有以下几个问题:

  1. 旧的请求未处理完,如果服务端进程直接退出,会造成客户端链接中断(收到 RST);
  2. 新请求打过来,服务还没重启完毕,造成 connection refused
  3. 即使是要退出程序,直接 kill -9 仍然会让正在处理的请求中断;
  4. 面对海量请求,如何对链接数进行限制,并进行过载保护;
  5. 避免 open too many files 错误;

这些问题会造成不好的客户体验,严重的甚至影响客户业务。所以,我们需要以一种优雅的方式重启/关闭我们的应用,来达到热启动的效果,即:Zero Downtime

(Tips:名词解释)
热启动:新老程序(进程)无缝替换,同时可以保持对client的服务。让client端感觉不到你的服务挂掉了;
Zero Downtime: 0 宕机时间,即不间断的服务;

解决问题

Github: gracehttp

平滑启动

一般情况下,我们是退出旧版本,再启动新版本,总会有时间间隔,时间间隔内的请求怎么办?而且旧版本正在处理请求怎么办?
那么,针对这些问题,在升级应用过程中,我们需要达到如下目的:

  • 旧版本为退出之前,需要先启动新版本;
  • 旧版本继续处理完已经接受的请求,并且不再接受新请求;
  • 新版本接受并处理新请求的方式;

这样,我们就能实现 Zero Downtime 的升级效果。

实现原理

首先,我们需要用到以下基本知识:
1.linux 信号处理机制:在程序中,通过拦截 signal,并针对 signal 做出不同处理;
2.子进程继承父进程的资源:一切皆文件,子进程会继承父进程的资源句柄,网络端口也是文件;
3.通过给子进程重启标识(比如:重启时带着 -continue 参数),来实现子进程的初始化处理;

重启时,我们可以在程序中捕获 HUP 信号(通过 kill -HUP pid 可以触发),然后开启新进程,退出旧进程。信号处理代码示例如下:

package gracehttp

import (
    "fmt"
    "os"
    "os/signal"
    "syscall"
)

var sig chan os.Signal
var notifySignals []os.Signal

func init() {
    sig = make(chan os.Signal)
    notifySignals = append(notifySignals, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGTSTP, syscall.SIGQUIT)
    signal.Notify(sig, notifySignals...) // 注册需要拦截的信号
}

// 捕获系统信号,并处理
func handleSignals() {
    capturedSig := <-sig
    srvLog.Info(fmt.Sprintf("Received SIG. [PID:%d, SIG:%v]", syscall.Getpid(), capturedSig))
    switch capturedSig {
    case syscall.SIGHUP: // 重启信号
        startNewProcess() // 开启新进程
        shutdown() // 退出旧进程
    case syscall.SIGINT:
        fallthrough
    case syscall.SIGTERM:
        fallthrough
    case syscall.SIGTSTP:
        fallthrough
    case syscall.SIGQUIT:
        shutdown()
    }
}

startNewProcessshutdown 具体实现可以参考 Github

过载保护

通过限制 HTTP Serveraccept 数量实现链接数的限制,来达到如果并发量达到了最大值,客户端超时时间内可以等待,但不会消耗服务端文件句柄数(我们知道 Linux 系统对用户可以打开的最大文件数有限制,网络请求也是文件操作)

实现原理

  • 利用 channel 的缓冲机制实现,每个请求都会获取缓冲区的一个单元大小,知道缓冲区满了,后边的请求就会阻塞;
  • 如果客户端请求被阻塞,达到了客户端设置的超时时间,这时候链接会断开,那我们利用 goselect 机制,退出阻塞,并返回,不再进行 accept

处理代码如下:

package gracehttp

// about limit @see: "golang.org/x/net/netutil"

import (
    "net"
    "sync"
    "time"
)

type Listener struct {
    *net.TCPListener
    sem       chan struct{}
    closeOnce sync.Once     // ensures the done chan is only closed once
    done      chan struct{} // no values sent; closed when Close is called
}

func newListener(tl *net.TCPListener, n int) net.Listener {
    return &Listener{
        TCPListener: tl,
        sem:         make(chan struct{}, n),
        done:        make(chan struct{}),
    }
}

func (l *Listener) Fd() (uintptr, error) {
    file, err := l.TCPListener.File()
    if err != nil {
        return 0, err
    }
    return file.Fd(), nil
}

// override
func (l *Listener) Accept() (net.Conn, error) {
    acquired := l.acquire()
    tc, err := l.AcceptTCP()
    if err != nil {
        if acquired {
            l.release()
        }
        return nil, err
    }
    tc.SetKeepAlive(true)
    tc.SetKeepAlivePeriod(time.Minute)

    return &ListenerConn{Conn: tc, release: l.release}, nil
}

// override
func (l *Listener) Close() error {
    err := l.TCPListener.Close()
    l.closeOnce.Do(func() { close(l.done) })
    return err
}

// acquire acquires the limiting semaphore. Returns true if successfully
// accquired, false if the listener is closed and the semaphore is not
// acquired.
func (l *Listener) acquire() bool {
    select {
    case <-l.done:
        return false
    case l.sem <- struct{}{}:
        return true
    }
}

func (l *Listener) release() { <-l.sem }

type ListenerConn struct {
    net.Conn
    releaseOnce sync.Once
    release     func()
}

func (l *ListenerConn) Close() error {
    err := l.Conn.Close()
    l.releaseOnce.Do(l.release)
    return err
}

参考:grace-http:listener.go

gracehttp

现在我们把这个功能做得更优美有点,并提供一个开箱即用的代码库。
地址:Github-gracehttp

支持功能

  1. 平滑重启(Zero-Downtime);
  2. 平滑关闭;
  3. Server 添加(支持 HTTPHTTPS);
  4. 自定义日志组件;
  5. 支持单个端口 server 链接数限流,默认值为:C100K。超过该限制之后,链接阻塞进入等待,但是不消耗系统文件句柄,避免发生雪崩,压坏服务。

使用指南

添加服务

    import "fevin/gracehttp"
    
    ....

    // http
    srv1 := &http.Server{
        Addr:    ":80",
        Handler: sc,
    }
    gracehttp.AddServer(srv1, false, "", "")

    // https

    srv2 := &http.Server{
        Addr:    ":443",
        Handler: sc,
    }
    gracehttp.AddServer(srv2, true, "../config/https.crt", "../config/https.key")

    gracehttp.Run() // 此方法会阻塞,直到进程收到退出信号,或者 panic

如上所示,只需创建好 Server 对象,调用 gracehttp.AddServer 添加即可。

退出或者重启服务

  • 重启:kill -HUP pid
  • 退出:kill -QUIT pid

添加自定义日志组件

    gracehttp.SetErrorLogCallback(logger.LogConfigLoadError)

此处提供了三个 Set* 方法,分别对应不同的日志等级:

  • SetInfoLogCallback
  • SetNoticeLogCallback
  • SetErrorLogCallback

最后

实际中,很多情况会用到这种方式,不妨点个 star 吧!
欢迎一起来完善这个小项目,共同贡献代码。

查看原文

赞 10 收藏 7 评论 0

fevin 分享了头条 · 2018-04-27

知识点

赞 0 收藏 1 评论 0

认证与成就

  • 获得 132 次点赞
  • 获得 14 枚徽章 获得 0 枚金徽章, 获得 2 枚银徽章, 获得 12 枚铜徽章

擅长技能
编辑

开源项目 & 著作
编辑

  • my-vimrc

    This is my vimrc config for coding.

  • diffdir

    利用 vimdiff 在命令行下对文件夹下的文件进行递归式的对比。

  • gracehttp

    优雅的重启/关闭 Go http 程序。 目的:应用升级时,提供不间断的服务,优化服务体验。

注册于 2016-08-03
个人主页被 982 人浏览