yexiaobai

yexiaobai 查看完整档案

杭州编辑  |  填写毕业院校阿里巴巴  |  SRE 编辑填写个人主网站
编辑

就是不告诉你 O(∩_∩)O哈哈~。

个人动态

yexiaobai 发布了文章 · 2019-05-24

K8S client-go Patch example

使用Patch方式更新K8S的 API Objects 一共有三种方式:strategic merge patch, json-patch,json merge patch。关于这三种方式的文字描述区别可看官方文档update-api-object-kubectl-patch

我在本文中主要会介绍使用client-go的Patch方式,主要包括strategic merge patchjson-patch。不介绍json merge patch的原因,是该方式使用场景比较少,因此不做介绍,如果有同学有兴趣,可做补充。

StrategicMergePatch

新增Object值

本次示例以给一个node新增一个labels为例,直接上代码:

//根据Pod Sn 更新 pod
func UpdatePodByPodSn(coreV1 v1.CoreV1Interface, podSn string, patchData map[string]interface{}) (*apiv1.Pod, error) {
    v1Pod, err := coreV1.Pods("").Get(podSn, metav1.GetOptions{})
    if err != nil {
        logs.Error("[UpdatePodByPodSn]  get pod %v  fail %v", podSn, err)
        return nil, fmt.Errorf("[UpdatePodByPodSn]  get pod %v  fail %v", podSn, err)
    }

    namespace := v1Pod.Namespace
    podName := v1Pod.Name

    playLoadBytes, _ := json.Marshal(patchData)

    newV1Pod, err := coreV1.Pods(namespace).Patch(podName, types.StrategicMergePatchType, playLoadBytes)

    if err != nil {
        logs.Error("[UpdatePodByPodSn] %v pod Patch fail %v", podName, err)
        return nil, fmt.Errorf("[UpdatePodByPodSn] %v pod Patch fail %v", podName, err)
    }

    return newV1Pod, nil
}
注意:上面的PatchData 必须是以 {"metadata":...}的go struct, 如:`map[string]interface{}{"metadata": map[string]map[string]string{"labels": {
        "test2": "test2",
    }}}`

对应单元测试用例

func pod(podName string, nodeName string, labels map[string]string, annotations map[string]string) *v1.Pod {
    return &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: podName, Labels: labels, Annotations: annotations}, Spec: v1.PodSpec{NodeName: nodeName}, Status: v1.PodStatus{}}
}

func TestUpdatePodByPodSn(t *testing.T) {
    var tests = []struct {
        expectedError      interface{}
        expectedAnnotation string
        expectedLabel      string
        podSn              string
        patchData          map[string]interface{}
        v1Pod              []runtime.Object
    }{
        {nil, "test2", "", "1.1.1.1", map[string]interface{}{"metadata": map[string]map[string]string{"annotations": {
            "test2": "test2",
        }}},
            []runtime.Object{pod("1.1.1.1", "1.1.1.1", map[string]string{"test1": "test1"}, map[string]string{"test1": "test1"})},
        },
        {nil, "", "", "1.1.1.2", map[string]interface{}{"metadata": map[string]map[string]string{"labels": {
            "test2": "",
        }}},
            []runtime.Object{pod("1.1.1.2", "1.1.1.1", map[string]string{"test1": "test1"}, map[string]string{"test1": "test1"})},
        },
        {nil, "", "test2", "1.1.1.3", map[string]interface{}{"metadata": map[string]map[string]string{"labels": {
            "test2": "test2",
        }}},
            []runtime.Object{pod("1.1.1.3", "1.1.1.1", map[string]string{"test1": "test1"}, map[string]string{"test1": "test1"})},
        },
    }

    for _, test := range tests {
        client := fake.NewSimpleClientset(test.v1Pod...)

        v1Pod, err := UpdatePodByPodSn(client.CoreV1(), test.podSn, test.patchData)
        if err != nil {
            t.Errorf("expected error  %s, got %s", test.expectedError, err)
        }

        assert.Equal(t, v1Pod.Annotations["test2"], test.expectedAnnotation)
        assert.Equal(t, v1Pod.Labels["test2"], test.expectedLabel)
    }
}

修改Object的值

修改Obejct的值使用方式如下,当使用strategic merge patch的时候,如果提交的数据中键已经存在,那就会使用新提交的值替换原先的数据。依旧以修改labels的值为例。
如新提交的数据为:

{
  "metadata":{
      "labels":{
          "test2":"test3",
      },
  }
}

Node中已经存在的labels为:

{
  "metadata":{
      "labels":{
          "test2":"test1",
      },
  }
}

最终Node中labels的key为test2的值会被替换成 test3

删除Object值

当需要把某个Object的值删除的时候,当使用strategic merge patch的时候,依旧是删除labels为例提交方式是:

golang里面的表现形式是:

{
  "metadata":{
      "labels":{
          "test2":nil
      },
  }
}

对应从浏览器提交的数据是:

{
  "metadata":{
      "labels":{
          "test2":null
      },
  }
}
PS:如果不喜欢使用上面struct的方式组成数据,可以使用如下的方式 labelsPatch := fmt.Sprintf({"metadata":{"labels":{"%s":"%s"}}}, labelkey, labelvalue) 直接代替上面示例中的patchData

JSONPatch

JSONPatch的详细说明请参考文档:http://jsonpatch.com/
JSONPatch 主要有三种操作方式:add,replace,remove。以下会以代码示例说明这三种操作在Client-go对应的代码示例来说明怎样操作K8s 的资源。

使用JSONPatch,如果Patch中带有斜杠“/”和 (~)这两个字符,不能直接传入这两个字符,需要你输入的时候就人工转换下,/转换成~1~转换成~0。以新增labels为例,如我要新增一个"test1/test2":"test3"的labels,可以把要传入的数据修改为"test1~1test2":"test3"即可。

Op:add

使用JSONPatch的方式新增一个标签,其提交的数据格式必须是[{ "op": "replace", "path": "/baz", "value": "boo" }] 这样的。代码如下:

//patchStringValue specifies a patch operation for a string.
type PatchStringValue struct {
    Op    string      `json:"op"`
    Path  string      `json:"path"`
    Value interface{} `json:"value"`
}

type PatchNodeParam struct {
    coreV1       v1.CoreV1Interface
    NodeSn       string                 `json:"nodeSn"`
    OperatorType string                 `json:"operator_type"`
    OperatorPath string                 `json:"operator_path"`
    OperatorData map[string]interface{} `json:"operator_data"`
}


//patch node info, example label, annotation
func patchNode(param PatchNodeParam) (*apiv1.Node, error) {
    coreV1 := param.coreV1
    nodeSn := param.NodeSn

    node, err := coreV1.Nodes().Get(nodeSn, metav1.GetOptions{})

    if err != nil {
        return nil, err
    }

    operatorData := param.OperatorData
    operatorType := param.OperatorType
    operatorPath := param.OperatorPath

    var payloads []interface{}

    for key, value := range operatorData {
        payload := PatchStringValue{
            Op:    operatorType,
            Path:  operatorPath + key,
            Value: value,
        }

        payloads = append(payloads, payload)

    }

    payloadBytes, _ := json.Marshal(payloads)

    newNode, err := coreV1.Nodes().Patch(nodeSn, types.JSONPatchType, payloadBytes)

    if err != nil {
        return nil, err
    }

    return newNode, err
}

单元测试:

func TestPatchNode(t *testing.T) {
    Convey("test patchNode", t, func() {
        Convey("Patch Node fail", func() {
            var tests = []struct {
                nodeSn        string
                operatorType  string
                operatorPath  string
                operatorData  map[string]interface{}
                expectedError interface{}
                expectedValue *v1.Node
                objs          []runtime.Object
            }{
                {"1.1.1.1", "add", "/metadata/labels/",
                    map[string]interface{}{
                        "test1": "test1",
                        "test2": "test2"},
                    "nodes \"1.1.1.1\" not found", nil, nil},
                {"1.1.1.1", "aaa", "/metadata/labels/",
                    map[string]interface{}{
                        "test1": "test1",
                        "test2": "test2"},
                    "Unexpected kind: aaa", nil, []runtime.Object{node("1.1.1.1", nil, nil)}},
            }


            for _, test := range tests {
                client := fake.NewSimpleClientset(test.objs...)

                param := PatchNodeParam{
                    coreV1:       client.CoreV1(),
                    NodeSn:       test.nodeSn,
                    OperatorType: test.operatorType,
                    OperatorPath: test.operatorPath,
                    OperatorData: test.operatorData,
                    EmpId:        test.empId,
                }

                output, err := patchNode(param)

                So(output, ShouldEqual, test.expectedValue)
                So(err.Error(), ShouldEqual, test.expectedError)

            }
        })

        Convey("Patch Node success", func() {
            var tests = []struct {
                nodeSn        string
                operatorType  string
                operatorPath  string
                operatorData  map[string]interface{}
                expectedError interface{}
                expectedValue string
                objs          []runtime.Object
            }{
                {"1.1.1.1", "add", "/metadata/labels/",
                    map[string]interface{}{
                        "test1": "test1",
                        "test2": "test2"},
                    nil, "1.1.1.1", []runtime.Object{node("1.1.1.1", map[string]string{"test3": "test3"}, map[string]string{"test3": "test3"})}},
                {"1.1.1.1", "add", "/metadata/labels/",
                    map[string]interface{}{
                        "test1": "test1",
                        "test2": "test2"},
                    nil, "1.1.1.1", []runtime.Object{node("1.1.1.1", map[string]string{"test1": "modifytest"}, map[string]string{"test1": "modifytest"})}},
            }

            for _, test := range tests {

                client := fake.NewSimpleClientset(test.objs...)

                param := PatchNodeParam{
                    coreV1:       client.CoreV1(),
                    NodeSn:       test.nodeSn,
                    OperatorType: test.operatorType,
                    OperatorPath: test.operatorPath,
                    OperatorData: test.operatorData,
                }

                output, err := patchNode(param)

                So(output, ShouldNotBeNil)
                So(err, ShouldBeNil)
                So(output.Name, ShouldEqual, test.expectedValue)
            }
        })

    })

}
使用add有个需要注意的地方就是,当你的Path是使用的/metadata/labels而不是/metadata/labels/labelkey的时候,那你这个add操作实际是对整个labels进行替换,而不是新增,一定要注意避免踩坑。
PS:如果不喜欢使用上面struct的方式组成数据,可以使用如下的方式 labelsPatch := fmt.Sprintf([{"op":"add","path":"/metadata/labels/%s","value":"%s" }], labelkey, labelvalue) 直接代替上面示例中的patchData

Op:remove

要删除一个标签的话,代码和增加区别不大,唯一的区别就是提交的数据要由键值对修改为提交一个string slice类型[]string,代码如下:

type PatchNodeParam struct {
    coreV1       v1.CoreV1Interface
    NodeSn       string                 `json:"nodeSn"`
    OperatorType string                 `json:"operator_type"`
    OperatorPath string                 `json:"operator_path"`
    OperatorData map[string]interface{} `json:"operator_data"`
}

//patchStringValue specifies a remove operation for a string.
type RemoveStringValue struct {
    Op   string `json:"op"`
    Path string `json:"path"`
}

//remove node info, example label, annotation
func removeNodeInfo(param RemoveNodeInfoParam) (*apiv1.Node, error) {
    coreV1 := param.coreV1
    nodeSn := param.NodeSn

    node, err := coreV1.Nodes().Get(nodeSn, metav1.GetOptions{})

    if err != nil {
        return nil, err
    }

    operatorKey := param.OperatorKey
    operatorType := param.OperatorType
    operatorPath := param.OperatorPath

    var payloads []interface{}

    for key := range operatorKey {
        payload := RemoveStringValue{
            Op:   operatorType,
            Path: operatorPath + operatorKey[key],
        }

        payloads = append(payloads, payload)

    }

    payloadBytes, _ := json.Marshal(payloads)

    newNode, err := coreV1.Nodes().Patch(nodeSn, types.JSONPatchType, payloadBytes)

    if err != nil {
        return nil, err
    }


    return newNode, err
}

Op:replace

replace操作,会对整个的Object进行替换。所以使用replace记住要把原始的数据取出来和你要新增的数据合并后再提交,如:

type ReplaceNodeInfoParam struct {
    coreV1       v1.CoreV1Interface
    NodeSn       string                 `json:"nodeSn"`
    OperatorType string                 `json:"operator_type"`
    OperatorPath string                 `json:"operator_path"`
    OperatorData map[string]interface{} `json:"operator_data"`
    DataType     string                 `json:"data_type"`
}

//patchStringValue specifies a patch operation for a string.
type PatchStringValue struct {
    Op    string      `json:"op"`
    Path  string      `json:"path"`
    Value interface{} `json:"value"`
}



func replaceNodeInfo(param ReplaceNodeInfoParam) (*apiv1.Node, error) {
    coreV1 := param.coreV1
    nodeSn := param.NodeSn

    node, err := coreV1.Nodes().Get(nodeSn, metav1.GetOptions{})

    if err != nil {
        return nil, err
    }

    var originOperatorData map[string]string

    dataType := param.DataType
    operatorData := param.OperatorData
    operatorType := param.OperatorType
    operatorPath := param.OperatorPath

    switch dataType {
    case "labels":
        originOperatorData = node.Labels
    case "annotations":
        originOperatorData = node.Annotations
    default:
        originOperatorData = nil
    }

    if originOperatorData == nil {
        return nil, fmt.Errorf("[replaceNodeInfo] fail, %v originOperatorData is nil", nodeSn)
    }

    for key, value := range originOperatorData {
        operatorData[key] = value
    }

    var payloads []interface{}

    payload := PatchStringValue{
        Op:    operatorType,
        Path:  operatorPath,
        Value: operatorData,
    }

    payloads = append(payloads, payload)

    payloadBytes, _ := json.Marshal(payloads)

    newNode, err := coreV1.Nodes().Patch(nodeSn, types.JSONPatchType, payloadBytes)

    if err != nil {
        return nil, err
    }

    return newNode, err
}

单元测试

func TestReplaceNodeInfo(t *testing.T) {
    Convey("test ReplaceNodeInfo", t, func() {
        Convey("Patch ReplaceNodeInfo fail", func() {
            var tests = []struct {
                nodeSn        string
                operatorType  string
                operatorPath  string
                dataType      string
                operatorData  map[string]interface{}
                expectedError interface{}
                expectedValue *v1.Node
                objs          []runtime.Object
            }{
                {"1.1.1.1", "add", "/metadata/labels", "labels",
                    map[string]interface{}{
                        "test1": "test1",
                        "test2": "test2"},
                    "nodes \"1.1.1.1\" not found", nil, nil},
                {"1.1.1.1", "aaa", "/metadata/annotations", "annotations",
                    map[string]interface{}{
                        "test1": "test1",
                        "test2": "test2"},
                    "[replaceNodeInfo] fail, 1.1.1.1 originOperatorData is nil", nil, []runtime.Object{node("1.1.1.1", nil, nil)}},
            }

            for _, test := range tests {
                client := fake.NewSimpleClientset(test.objs...)

                param := ReplaceNodeInfoParam{
                    coreV1:       client.CoreV1(),
                    NodeSn:       test.nodeSn,
                    OperatorType: test.operatorType,
                    OperatorPath: test.operatorPath,
                    OperatorData: test.operatorData,
                    DataType:     test.dataType,
                }

                output, err := replaceNodeInfo(param)

                So(output, ShouldEqual, test.expectedValue)
                So(err.Error(), ShouldEqual, test.expectedError)

            }
        })

        Convey("Patch Node success", func() {
            var tests = []struct {
                nodeSn             string
                operatorType       string
                operatorPath       string
                dataType           string
                operatorData       map[string]interface{}
                expectedError      interface{}
                expectedLabel      string
                expectedAnnotation string
                objs               []runtime.Object
            }{
                {"1.1.1.1", "replace", "/metadata/labels", "labels",
                    map[string]interface{}{
                        "test1": "test1",
                        "test2": "test2"},
                    nil, "test3", "", []runtime.Object{node("1.1.1.1", map[string]string{"test3": "test3"}, map[string]string{"test3": "test3"})}},
                {"1.1.1.1", "replace", "/metadata/annotations", "annotations",
                    map[string]interface{}{
                        "test1": "test1",
                        "test2": "test2"},
                    nil, "", "modifytest", []runtime.Object{node("1.1.1.1", map[string]string{"test1": "modifytest"}, map[string]string{"test1": "modifytest"})}},
            }

            for _, test := range tests {

                client := fake.NewSimpleClientset(test.objs...)

                param := ReplaceNodeInfoParam{
                    coreV1:       client.CoreV1(),
                    NodeSn:       test.nodeSn,
                    OperatorType: test.operatorType,
                    OperatorPath: test.operatorPath,
                    OperatorData: test.operatorData,
                    DataType:     test.dataType,
                }

                output, err := replaceNodeInfo(param)

                So(output, ShouldNotBeNil)
                So(err, ShouldBeNil)
                So(output.Labels["test3"], ShouldEqual, test.expectedLabel)
                So(output.Annotations["test1"], ShouldEqual, test.expectedAnnotation)
            }
        })

    })
}

PS:如各位还有其他更好的方式,欢迎交流补充。

查看原文

赞 1 收藏 0 评论 1

yexiaobai 收藏了文章 · 2019-03-21

golang防缓存击穿利器--singleflight

缓存击穿

    给缓存加一个过期时间,下次未命中缓存时再去从数据源获取结果写入新的缓存,这个是后端开发人员再熟悉不过的基操。本人之前在做直播平台活动业务的时候,当时带着这份再熟练不过的自信,把复杂的数据库链表语句写好,各种微服务之间调用捞数据最后算好的结果,丢进了缓存然后设了一个过期时间,当时噼里啪啦两下写完代码觉得稳如铁蛋,结果在活动快结束之前,数据库很友好的挂掉了。当时回去查看监控后发现,是在活动快结束前,大量用户都在疯狂的刷活动页,导致缓存过期的瞬间有大量未命中缓存的请求直接打到数据库上所导致的,所以这个经典的问题稍不注意还是害死人

    防缓存击穿的方式有很多种,比如通过计划任务来跟新缓存使得从前端过来的所有请求都是从缓存读取等等。之前读过 groupCache的源码,发现里面有一个很有意思的库,叫singleFlight, 因为groupCache从节点上获取缓存如果未命中,则会去其他节点寻找,其他节点还没有的话再从数据源获取,所以这个步骤对于防击穿非常有必要。singleFlight使得groupCache在多个并发请求对一个失效的key进行源数据获取时,只让其中一个得到执行,其余阻塞等待到执行的那个请求完成后,将结果传递给阻塞的其他请求达到防止击穿的效果。

SingleFlight 使用Demo

本文模拟一个数据源是从调用rpc获取的场景
图片描述
然后再模拟一百个并发请求在缓存失效的瞬间同时调用rpc访问源数据
图片描述
效果
图片描述
可以看到100个并发请求从源数据获取时,rpcServer端只收到了来自client 17的请求,而其余99个最后也都得到了正确的返回值。

SingleFlight 源码剖析

在看完singleFlight的实际效果后,欣喜若狂,想必其实现应该相当复杂吧, 结果翻看源码一看, 100行不到的代码就解决了这么个业务痛点, 不得不佩服。

package singlefilght

import "sync"

type Group struct {
    mu sync.Mutex
    m map[string]*Call // 对于每一个需要获取的key有一个对应的call
}

// call代表需要被执行的函数
type Call struct {
    wg sync.WaitGroup // 用于阻塞这个调用call的其他请求
    val interface{} // 函数执行后的结果
    err error         // 函数执行后的error
}

func (g *Group) Do(key string, fn func()(interface{}, error)) (interface{}, error) {

    g.mu.Lock()
    if g.m == nil {
        g.m = make(map[string]*Call)
    }
    
    // 如果获取当前key的函数正在被执行,则阻塞等待执行中的,等待其执行完毕后获取它的执行结果
    if c, ok := g.m[key]; ok {
        g.mu.Unlock()
        c.wg.Wait()
        return c.val, c.err
    }

    // 初始化一个call,往map中写后就解
    c := new(Call)
    c.wg.Add(1)
    g.m[key] = c
    g.mu.Unlock()
    
  // 执行获取key的函数,并将结果赋值给这个Call
    c.val, c.err = fn()
    c.wg.Done()
    
    // 重新上锁删除key
    g.mu.Lock()
    delete(g.m, key)
    g.mu.Unlock()

    return c.val, c.err

}

    对的没看错, 就这么100行不到的代码就能解决缓存击穿的问题,这算是我写过最愉快的一篇博了,同时也推荐大家去读一读groupCache这个项目的源码,会有更多惊喜的发现

查看原文

yexiaobai 回答了问题 · 2018-06-08

解决VUE 传值问题

已经通过使用 Vuex 解决这个变量传值问题

关注 2 回答 2

yexiaobai 提出了问题 · 2018-06-07

解决VUE 传值问题

图片描述
图片描述

问题描述

  1. 使用的 n3 框架的 n3-data-table组件,勾选按钮的时候,会触发图1的 onSelect 函数,checkRows 是选中的值,我的需求是怎么样才能把这个checkRows值传给 data 中的 needApplyAppTransfers 这个变量。目前按照我图1所示的方法

self.needApplyAppTransfers = checkRows 是不行的,虽然第一个的选择的数据打印出来了 needApplyAppTransfers的值, 但是在调用方法 applyServer,打印的 appTransfers 数据依旧为空。

关注 2 回答 2

yexiaobai 发布了文章 · 2018-04-23

抓包工具 ngrep - 入门指南

安装

下载 ngrep 的源码包

wget http://prdownloads.sourceforge.net/ngrep/ngrep-1.45.tar.bz2?download

解压刚刚下载的源码包:

tar -xvf ngrep-1.45.tar.bz

编译安装:

cd ngrep-1.45
./configure
make && make install
注:在编译过程中,如果你的系统未安装 libpcap 就如下错误:
checking for a broken redhat glibc udphdr declaration... no
checking for a complete set of pcap headers... no
**!!! couldn't find a complete set of pcap headers**

解决办法,就是安装 libpcap 包,我的系统是 CentOS,所以安装方法是:

yum -y install libpcap*
查看原文

赞 0 收藏 0 评论 0

yexiaobai 发布了文章 · 2018-04-22

Istio 架构

SideCar 解释的参考文档 Qcon2017 实录 | Service Mesh:下一代微服务

Istio 服务网格被逻辑的区分为数据层和控制层。

  • 数据层由一套作为 sidecars部署的智能代理(Envoy)组成,用于调节和控制微服务之间通信的所有流量。
  • 控制层负责管理和配置策略用于路由流量,同时也在运行期执行策略。

下图展示了组成每层的不同组件。

此处输入图片的描述

SideCar解释
此处输入图片的描述

Sidecar 这个词中文翻译为边车,或者车斗,也有一个乡土气息浓重的翻译叫做边三轮。Sidecar 这个东西出现的时间挺长的,它在原有的客户端和服务端之间加多了一个代理。

Envoy

Istio 使用 Envoy 代理的扩展版本,一个使用 c++开发的高性能代理,为了调节在服务网格中所有服务的进出流量。Istio 利用了Envoy的许多内建特性,比如动态服务发现,负载均衡,TLS终止,HTTP/2 & gRPC 代理,断路器,健康检查,分阶段的基于百分比的流量分配,故障注入和丰富的度量。

Envoy 作为一个 SideCar 部署与相关服务部署在相同的Kubernetes pod。这允许Istio提取大量流量行为的信号如Istio 属性,反过来可以在Mixer使用执行策略决定,并被发送到监控系统来提供关于整个网格行为的信息。sidecar 代理模型允许你添加Istio能力到一个已经存在的部署而不需要重新构建或重新写代码。你可以在我们的设计目标中阅读关于为什么我们选择这种方式的更多的信息

Mixer

Mixer 是一个平台无关的组件,负责执行访问控制,跨服务网格使用策略,以及收集从Envoy代理和其他服务自动探测到的数据。代理提取请求等级属性,被发送给Mixer用于计算。这个属性提取更多的信息可以在Mixer 配置中找到,Mixer包含一个非常灵活的插件模型使其能够和各种主机环境和后端基础架构交互,从这些细节中抽象出Envoy代理和Istio管理的服务。

Pilot

Pilot为Envoy sidecar 提供了服务发现功能,为智能路由提供了流量管理能力(比如A/B测试,金丝雀发布等等),以及弹性(超时控制,重试,断路器等等)。它转换高级路由规则,将流量行为控制在Envoy特定的配置中,并且在运行期把他们传播到sidecars。Pilot抽象特定平台的服务发现方法,并将他们合成为可被任何sidecar消费的标准格式,其符合Envoy 数据层 API。这种松散的耦合允许Istio运行在多个环境中(比如,Kubernetes, Consul/Nomad等等),然而只需要维护相同操作接口进行流量管理。

Pilot 的架构图

clipboard.png

  • Envoy API 负责和 Envoy 的通讯, 主要是发送服务发现信息和流量控制规则给 Envoy
  • Envoy 提供服务发现,负载均衡池和路由表的动态更新的 API。这些 API 将 Istio 和 - Envoy 的实现解耦。(另外,也使得 Linkerd 之类的其他服务网络实现得以平滑接管 Envoy)
  • Polit 定了一个抽象模型,以从特定平台细节中解耦,为跨平台提供基础
  • Platform Adapter 则是这个抽象模型的现实实现版本, 用于对接外部的不同平台
  • 最后是 Rules API,提供接口给外部调用以管理 Pilot,包括命令行工具 Istioctl 以及未来可能出现的第三方管理界面

Istio 的流量管理:

图片描述

Istio-Auth

Istio-Auth使用 mutual TLS提供强大的服务到服务的和终端用户的身份认证,具有内置的身份和凭证管理。它可以用于升级在服务网格中未加密的流量,提供基于服务身份策略而不是网络控制的执行能力。Istio 的未来版本将增加细粒度的访问控制,审计控制以及监控谁访问了你的服务、API或者资源,使用各种访问控制机制,包括属性和基于角色的访问控制以及授权钩子。

参考文档:

查看原文

赞 2 收藏 1 评论 1

yexiaobai 发布了文章 · 2018-04-22

为高效 Ops 和 SRE 团队准备的 10 个开源 k8s 工具

如果你正在 Kubernetes 上工作,你的 SRE 和 Ops 团队需要正确的工具来确保Kubernetes集群的高可用和在其中运行的工作负载。这里我们列出了10个开源Kubernetes工具来使得你的SRE和Ops团队更高效的达到他们的服务水平目标(SLA)。

Kube-ops-view

Kube-ops-view为多个Kubernetes集群提供了一个通用的操作视图,对于SRE和Ops团队来说这是一个方便的工具,Kube-ops-view提供只读的系统仪表。Kube-ops-view 提供了一些非常酷的特性:

  • 在多个Kubernetes 集群间切换。
  • 渲染节点并指明它们的总体状态(“Ready”)。
  • 展示节点的容量和资源利用率(CPU,内存)。
  • 指明pods的状态(绿色:ready/running,红色:error等)。
  • 为节点和pods提供工具提示信息。
  • pod创建和终止。
  • 使用屏幕令牌在TV屏幕上提供仪表盘。

此处输入图片的描述

Cabin

Cabin是一个Kubernetes 的原生的手机App仪表盘。Cabin UI是使用React Native,因此可以运行在 IOS和Android硬件上。它是一个移动助手,提供了细粒度操作来维护Kubernetes 资源。Cabin app做了触摸优化。例如,你可以通过一个左滑动来删除一个pod。你也可以通过一个手指滚动来扩展部署。
此处输入图片的描述
此处输入图片的描述
此处输入图片的描述
此处输入图片的描述
Cabin 的一些有趣的特性:

  • 无缝的支持 Google Kubernetes Engine (GKE),你可以直接在你的移动手机上创建 GKE集群。
  • 早期支持帮助图表,你可以浏览图表库,并且通过点击移动运行图表。
  • 访问 pod 日志,通过标签(label)搜索资源,通过改变你部署的镜像触发滚动升级等。

Kubectx

如果你工作在多k8s 集群,kubectx是另外一个必须要有的工具。Kubectx与kubens捆绑在一起,当你使用kubectl的时候,允许你在Kubernetes集群和命名空间之间切换。
此处输入图片的描述
此处输入图片的描述

kubectx 和 kubens支持在bash/zsh 环境通过 tab 来帮助你补全长的长下文名称。你不在需要记得完整的上下文名称。

Kube-shell

Kube-shell是一个和Kubernetes CLI集成的 Shell,它有一些非常漂亮的特性,比如:

  • 自动补全命令,自我提示,联机文档。
  • 通过使用 up/down 键盘命令访问历史命令的执行。
  • 从kubeconfig获取当前上下文,在 集群/命名空间之间非常容易的切换。

此处输入图片的描述

相关工具

Kube-prompt是另外一个有自动补全特性的交互式Kubernetes客户端。它接受没有kubect前缀的命令。

另外,Kube-ps1是一个类似的脚本让你添加配置在kubectl的当前Kubernetes 上下文和命名空间到你的Bash/Zsh提示字符。

最后,Kail是一个 Kubernetes tail。作为一个Kubernetes日志查看器,kail允许你使用选择器从匹配的pods流式的查看日志。

Kail - kubernetes 日志查看器
此处输入图片的描述

你可以基于标准的标签选择器匹配 pods,通过名字,通过服务,通过部署,等等。

Stern是另外一个专注于 pods和 pods中容器的日志 tail 解决方案。使用 Stern,为了快速调试,结果是有颜色的。

Telepresence

Telepresence是一个开源的工具,可以让你在本地调试服务,虽然该服务与它位于远程Kubernetes集群或者是远程云服务资源(如数据库)的依赖服务保持连接。

Telepresence 本地开发和和对远程Kubernetes 集群的调试。

此处输入图片的描述

就个人而言,我认为Telepresence有很大的潜力,对于运行在 Kubernetes 上的服务而言,Telepresence已经是一个丰富的本地开发环境。在线调试是一个新事物,但是发展很快。

Weave Scope

Weave Scope是一个Docker 和 Kubernetes的排错&监控工具。它为你的应用自动的构建逻辑拓扑以及基础设施,以便你的SRE和Ops团队可以直观的明白,监控,控制你的容器,基于应用的微服务。
此处输入图片的描述
此处输入图片的描述

除了拓扑视图,Weave Scope也提供了一个深入视图,比如节点和进程之间的任何事情,包括部署,服务,副本集,pods 和容器。另外,你基于CPU和内存使用率应用过滤,或者是通过名字,标签,甚至路径使用搜索快速的找到节点类型,容器和进程。

PowerfulSeal

PowerfulSeal 的灵感来源于 Chaos Monkey,由 Bloomberg 工程师团队开发。它可以给你的Kubernetes集群添加混乱,如杀掉目标的pods或者是节点。它以两个模式操作:交互式和自治的。

  • 交互式模式被设计为允许你发现你的集群组件,并且人工的停止一些事情看会发生什么。它操作在节点,pods,部署,和命名空间上。
  • 自治模式读取一个策略文件,可以包含任意数量的pod和节点的场景。每个场景描述了在集群上匹配,过滤,和行动的列表。

策略文件是以YAML 格式编写的,包含将被自治客户端执行的场景。

相关工具

kube-monkey是用于Kubernetes集群的Netflix的Chaos Monkey
的另外一种实现方式。它随机删除在Kubernetes集群中的pods,鼓励并验证恢复服务的开发。

Marmot

Marmot是一个来自于谷歌的工作流执行引擎,用于处理SRE和Ops需要的工作流。它被设计为处理基础架构变更的工具,但它可以和Kubernetes一起使用。

它特别适用于那些有一定节奏,可能需要对健康进行状态检查的任何类型的操作。因此,比如,你正在使用大量实例发布一个新服务版本,这时你执行了一个增量在受控的发布(金丝雀发布)。

Ark

Ark 是一个用于管理从你的Kubernetes资源和卷做灾难恢复的工具。Ark提供一个简单并且鲁棒的方式来备份和从系列的检查点恢复Kubernetes资源和持久化的卷。备份文件被存储在一个对象存储服务(如,Amazon S3)。

Ark 确保你以一个高效的方式自动化以下场景:

  • 减低灾难恢复的TTR(响应时间)。
  • Kubernetes API 对象提供跨云服务器商迁移。
  • 通过复制生产环境副本,开发和测试环境的设置(+CI)。

Ark 附带一个集群服务(Ark server)和CLI(Ark 客户端),集群服务最重要的工作就是它运行所有的Ark控制器。Ark服务器执行实际的备份,校验,和把备份文件加载进云对象存储中。

Sysdig

Sysdig是一个容器排错工具,它可以捕获系统调用和来自于Linux内核的事件。简单的说,对于整个集群,Sysdig就是strace + tcpdump + htop + iftop + lsof + wireshark。

  • Sysdig 在物理机和虚拟机的操作系统级别使用。通过安装进Linux内核,捕获系统调用和其他操作系统事件。Sysdig 也可以为系统活动创建trace文件。

相关工具

Sysdig Inspect是一个可视化通过Sysdig收集的数据的接口。Sysdig Inspect 使得SRE和Ops团队在容器排错和安全调查方面很方便。

  • Inspect的用户接口被设计为直观的导航Sysdig捕获的数据,包含系统,网络和一个Linux系统的应用活动。Sysdig Inspect帮助你明白趋势,相关性的指标,和大海捞针(从一堆数据中找到关键数据)。它包含了功能设计来支持性能和安全调查,深入容器查询。

此处输入图片的描述

Sysdig Falco 是另外一个构建与Sysdig收集的数据基础之上的工具。Falco监控活动行为,它被设计为发现你应用中异常的活动。比如,使用Falco你可以发现活动,如:

  • 运行在一个容器中的脚本。
  • 一个运行在私密模式的容器。
  • 一个挂载在敏感主机的容器。

最后的思考

Kubernetes 生态系统正在爆炸性增长。有大量的开源和商业工具可以帮助你更高效的操作非关键性的Kubernetes集群和服务。

查看原文

赞 4 收藏 3 评论 0

yexiaobai 发布了文章 · 2018-04-22

golang 的channels 行为

简介

当我第一次使用 Go 的 channels 工作的时候,我犯了一个错误,把 channels 考虑为一个数据结构。我把 channels 看作为 goroutines 之间提供自动同步访问的队列。这种结构上的理解导致我写了很多不好且结构复杂的并发代码。

随着时间的推移,我认识到最好的方式是忘记 channels 是数据结构,转而关注它的行为。所以现在谈论到 channels,我只考虑一件事情:signaling(信号)。一个 channel 允许一个 goroutine 给另外一个发特定事件的信号。信号是使用 channel 做一切事情的核心。将 channel 看作是一种信号机制,可以让你写出明确定义和精确行为的更好代码。

为了理解信号怎样工作,我们必须理解以下三个特性:

  • 交付保证
  • 状态
  • 有数据或无数据

这三个特性共同构成了围绕信号的设计哲学,在讨论这些特性之后,我将提供一系列代码示例,这些示例将演示使用这些属性的信号。

交付保证

交付保证基于一个问题:“我是否需要保证由特定的 goroutine 发送的信号已经被接收?”

换句话说,我们可以给出清单1的示例:

清单1

01 go func() {
02     p := <-ch // Receive
03 }()
04
05 ch <- "paper" // Send

发送的 goroutine 是否需要保证在第五行中发送给 channel 的 paper,在继续执行前, 会被第二行的 goroutine 接收。

基于这个问题的答案,你将知道使用两种类型的 channels 中的哪种:无缓冲有缓冲。每个channel围绕交付保证提供不同的行为。

图1

clipboard.png

保证很重要,并且如果你不这样认为,我有很多东西兜售给你。当然,我想开个玩笑,当你的生活没有保障的时候你不会害怕吗?在编写并发代码时,对是否需要一项保证有很强的理解是至关重要的。随着继续,你将学会如何做决策。

状态

一个 channel 的行为直接被它当前的状态所影响。一个channel 的状态是:nilopenclosed

下面的清单2展示了怎样声明或把一个 channel放进这三个状态。

清单2

// ** nil channel

// A channel is in a nil state when it is declared to its zero value
var ch chan string

// A channel can be placed in a nil state by explicitly setting it to nil.
ch = nil


// ** open channel

// A channel is in a open state when it’s made using the built-in function make.
ch := make(chan string)    


// ** closed channel

// A channel is in a closed state when it’s closed using the built-in function close.
close(ch)

状态决定了怎样send(发送)和receive(接收)操作行为。

信号通过一个 channel 发送和接收。不要说读和写,因为 channels 不执行 I/O。

图2

clipboard.png

当一个 channel 是 nil 状态,任何试图在 channel 的发送或接收都将会被阻塞。当一个 channel 是在 open 状态,信号可以被发送和接收。当一个 channel 被置为 closed 状态,信号将不在被发送,但是依然可以接收信号。

这些状态将在你遭遇不同的情况的时候可以提供不同的行为。当结合状态交付保证,作为你设计选择的结果,你可以分析你承担的成本/收益。你也可以仅仅通过读代码快速发现错误,因为你懂得 channel 将表现出什么行为。

有数据和无数据

最后的信号特性需要考虑你是否需要信号有数据或者无数据。

在一个 channel 中有数据的信号被执行一个发送。

清单3

01 ch <- "paper"

当你的信号有数据,它通常是因为:

  • 一个 goroutine 被要求启动一个新的 task。
  • 一个 goroutine 传达一个结果。

无数据信号通过关闭一个 channel。

清单4

01 close(ch)

当信号没有数据的时候,它通常是因为:

  • 一个 goroutine 被告知停止它正在做的事情。
  • 一个 goroutine 报告它们已经完成,没有结果。
  • 一个 goroutine 报告它已经完成处理并且关闭。

这些规则也有例外,但这些都是主要的用例,并且我们将在本文中重点讨论这些问题。我认为这些规则例外的情况是最初的代码味道。

无数据信号的一个好处是一个单独的 goroutine 可以立刻给很多 goroutines 信号。有数据的信号通常是在 goroutines 之间一对一的交换数据。

有数据信号

当你使用有数据信号的时候,依赖于你需要保证的类型,有三个channel配置选项可以选择。

图3:有数据信号

clipboard.png

这三个 channel 选项是:Unbuffered, Buffered >1Buffered =1

  • 有保证

    • 一个无缓冲的channel给你保证被发送的信号已经被接收。

      • 因为信号接收发生在信号发送完成之前。
  • 无保证

    • 一个 size > 1 的有缓冲的 channel 不会保证发送的信号已经被接收。

      • 因为信号发送发生在信号接送完成之前。
  • 延迟保证

    • 一个 size = 1 的有缓冲 channel 提供延迟保证。它可以保证先前发送的信号已经被接收。

      • 因为第一个接收信号,发生在第二个完成的发送信号之前。

缓冲大小绝对不能是一个随机数字,它必须是为一些定义好的约束而计算出来的。在计算中没有无穷大,无论是空间还是时间,所有的东西都必须要有良好的定义约束。

无数据信号

无数据信号主要用于取消,它允许一个 goroutine 发送信号给另外一个来取消它们正在做的事情。取消可以被有缓冲和无缓冲的channels实现,但是在没有数据发送的情况下使用缓冲 channel 会更好。

图4:无数据信号

clipboard.png

内建的函数 close 被用于无数据信号。正如上面状态章节所解释的那样,你依然可以在channel关闭的时候接收信号。实际上,在一个关闭的channel上的任何接收都不会被阻塞,并且接收操作将一直返回。

在大多数情况下,你想使用标准的库 context 包来实现无数据信号。context 包使用一个无缓冲channel传递信号以及内建函数close发送无数据信号。

如果你选择使用你自己的 channel 而不是 context包来取消,你的channel 应该是chan struct{} 类型,这是一种零空间的惯用方式,用来表示一个信号仅仅用于信号传递。

场景

有了这些特性,更进一步理解它们在实践中怎样工作的最好方式就是运行一系列的代码场景。当我在读写 channel 基础代码的时候,我喜欢把goroutines想像成人。这个形象对我非常有帮助,我将把它用作下面的辅助工具。

有数据信号 - 保证 - 无缓冲 Channels

当你需要知道一个被发送的信号已经被接收的时候,有两种情况需要考虑。它们是 等待任务等待结果

场景1 - 等待任务

考虑一下作为一名经理,需要雇佣一名新员工。在本场景中,你想你的新员工执行一个任务,但是他们需要等待直到你准备好。这是因为在他们开始前你需要递给他们一份报告。

清单5

在线演示地址

01 func waitForTask() {
02     ch := make(chan string)
03
04     go func() {
05         p := <-ch
06
07         // Employee performs work here.
08
09         // Employee is done and free to go.
10     }()
11
12     time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
13
14     ch <- "paper"
15 }

在清单5的第2行,一个带有属性的无缓冲channel被创建,string 数据将与信号一起被发送。在第4行,一名员工被雇佣并在开始工作前,被告诉等待你的信号【在第5行】。第5行是一个 channel 接收,引起员工阻塞直到等到你发送的报告。一旦报告被员工接收,员工将执行工作并在完成的时候可以离开。

你作为经理正在并发的与你的员工工作。因此在第4行你雇佣员工之后,你发现你自己需要做什么来解锁并且发信号给员工(第12行)。值得注意的是,不知道要花费多长的时间来准备这份报告(paper)。

最终你准备好给员工发信号,在第14行,你执行一个有数据信号,数据就是那份报告。由于一个无缓冲的channel被使用,你得到一个保证就是一旦你操作完成,员工就已经接收到了这份报告。接收发生在发送之前。

技术上你所知道的一切就是在你的channel发送操作完成的同时员工接收到了这份报告。在两个channel操作之后,调度器可以选择执行它想要执行的任何语句。下一行被执行的代码是被你还是员工是不确定的。这意味着使用print语句会欺骗你关于事件的执行顺序。

场景2 - 等待结果

在下一个场景中,事情是相反的。这时你想你的员工一被雇佣就立即执行他们的任务。然后你需要等待他们工作的结果。你需要等待是因为在你继续前你需要他们发来的报告。

清单6
在线演示地址

01 func waitForResult() {
02     ch := make(chan string)
03
04     go func() {
05         time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
06
07         ch <- "paper"
08
09         // Employee is done and free to go.
10     }()
11
12     p := <-ch
13 }

成本/收益

无缓冲 channel 提供了信号被发送就会被接收的保证,这很好,但是没有任何东西是没有代价的。这个成本就是保证是未知的延迟。在等待任务场景中,员工不知道你要花费多长时间发送你的报告。在等待结果场景中,你不知道员工会花费多长时间把报告发送给你。

在以上两个场景中,未知的延迟是我们必须面对的,因为它需要保证。没有这种保证行为,逻辑就不会起作用。

有数据信号 - 无保证 - 缓冲 Channels > 1

场景1 - 扇出(Fan Out)

扇出模式允许你抛出明确定义数量的员工在同时工作的问题上。由于你每个任务都有一个员工,你很明确的知道你会接收多少个报告。你可能需要确保你的盒子有适量的空间来接收所有的报告。这就是你员工的收益,不需要等待你来提交他们的报告。但是他们确实需要轮流把报告放进你的盒子,如果他们几乎同一时间到达盒子。

再次假设你是经理,但是这次你雇佣一个团队的员工,你有一个单独的任务,你想每个员工都执行它。作为每个单独的员工完成他们的任务,他们需要给你提供一张报告放进你桌子上的盒子里面。

清单7
演示地址

01 func fanOut() {
02     emps := 20
03     ch := make(chan string, emps)
04
05     for e := 0; e < emps; e++ {
06         go func() {
07             time.Sleep(time.Duration(rand.Intn(200)) * time.Millisecond)
08             ch <- "paper"
09         }()
10     }
11
12     for emps > 0 {
13         p := <-ch
14         fmt.Println(p)
15         emps--
16     }
17 }

在清单7的第3行,一个带有属性的有缓冲channel被创建,string 数据将与信号一起被发送。这时,由于在第2行声明的 emps 变量,将创建有 20个缓冲的 channel。

在第5行和第10行之间,20 个员工被雇佣,并且他们立即开始工作。在第7行你不知道每个员工将花费多长时间。这时在第8行,员工发送他们的报告,但这一次发送不会阻塞等待接收。因为在盒子里为每位员工准备的空间,在 channel 上的发送仅仅与其他在同一时间想发送他们报告的员工竞争。

在 12 行和16行之间的代码全部是你的操作。在这里你等待20个员工来完成他们的工作并且发送报告。在12行,你在一个循环中,在 13 行你被阻塞在一个 channel 等待接收你的报告。一旦报告接收完成,报告在14被打印,并且本地的计数器变量被消耗来表明一个员工意见完成了他的工作。

场景2 - Drop

Drop模式允许你在你的员工在满负荷的时候丢掉工作。这有利于继续接受客户端的工作,并且从不施加压力或者是这项工作可接受的延迟。这里的关键是知道你什么时候是满负荷的,因此你不承担或过度承诺你将尝试完成的工作量。通常集成测试或度量可以帮助你确定这个数字。

假设你是经理,你雇佣了单个员工来完成工作。你有一个单独的任务想员工去执行。当员工完成他们任务时,你不在乎知道他们已经完成了。最重要的是你能或不能把新工作放入盒子。如果你不能执行发送,这时你知道你的盒子满了并且员工是满负荷的。这时候,新工作需要丢弃以便让事情继续进行。

清单8
演示地址

01 func selectDrop() {
02     const cap = 5
03     ch := make(chan string, cap)
04
05     go func() {
06         for p := range ch {
07             fmt.Println("employee : received :", p)
08         }
09     }()
10
11     const work = 20
12     for w := 0; w < work; w++ {
13         select {
14             case ch <- "paper":
15                 fmt.Println("manager : send ack")
16             default:
17                 fmt.Println("manager : drop")
18         }
19     }
20
21     close(ch)
22 }

在清单8的第3行,一个有属性的有缓冲 channel 被创建,string 数据将与信号一起被发送。由于在第2行声明的cap 常量,这时创建了有5个缓冲的 channel。

从第5行到第9行,一个单独的员工被雇佣来处理工作,一个 for range被用于循环处理 channel 的接收。每次一份报告被接收,在第7行被处理。

在第11行和19行之间,你尝试发送20分报告给你的员工。这时一个 select语句在第14行的第一个case被用于执行发送。因为default从句被用于第16行的select语句。如果发送被堵塞,是因为缓冲中没有多余的空间,通过执行第17行发送被丢弃。

最后在第21行,内建函数close被调用来关闭channel。这将发送没有数据的信号给员工表明他们已经完成,并且一旦他们完成分派给他们的工作可以立即离开。

成本/收益

有缓冲的 channel 缓冲大于1提供无保证发送的信号被接收到。离开保证是有好处的,在两个goroutine之间通信可以降低或者是没有延迟。在扇出场景,这有一个有缓冲的空间用于存放员工将被发送的报告。在Drop场景,缓冲是测量能力的,如果容量满,工作被丢弃以便工作继续。

在两个选择中,这种缺乏保证是我们必须面对的,因为延迟降低非常重要。0到最小延迟的要求不会给系统的整体逻辑造成问题。

有数据信号 - 延迟保证- 缓冲1的channel

场景1 - 等待任务

清单9
演示地址

01 func waitForTasks() {
02     ch := make(chan string, 1)
03
04     go func() {
05         for p := range ch {
06             fmt.Println("employee : working :", p)
07         }
08     }()
09
10     const work = 10
11     for w := 0; w < work; w++ {
12         ch <- "paper"
13     }
14
15     close(ch)
16 }

在清单9的第2行,一个带有属性的一个缓冲大小的 channel 被创建,string 数据将与信号一起被发送。在第4行和第8行之间,一个员工被雇佣来处理工作。for range被用于循环处理 channel 的接收。在第6行每次一份报告被接收就被处理。

在第10行和13行之间,你开始发送你的任务给员工。如果你的员工可以跑的和你发送的一样快,你们之间的延迟会降低。但是每次发送你成功执行,你需要保证你提交的最后一份工作正在被进行。

在最后的第15行,内建函数close 被调用关闭channel,这将会发送无数据信号给员工告知他们工作已经完成,可以离开了。尽管如此,你提交的最后一份工作将在 for range中断前被接收。

无数据信号 - Context

在最后这个场景中,你将看到从 Context 包中使用 Context 值怎样取消一个正在运行的goroutine。这所有的工作是通过改变一个已经关闭的无缓冲channel来执行一个无数据信号。

最后一次你是经理,你雇佣了一个单独的员工来完成工作,这次你不会等待员工未知的时间完成他的工作。你分配了一个截止时间,如果你的员工没有按时完成工作,你将不会等待。

清单10
演示地址

01 func withTimeout() {
02     duration := 50 * time.Millisecond
03
04     ctx, cancel := context.WithTimeout(context.Background(), duration)
05     defer cancel()
06
07     ch := make(chan string, 1)
08
09     go func() {
10         time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
11         ch <- "paper"
12     }()
13
14     select {
15     case p := <-ch:
16         fmt.Println("work complete", p)
17
18     case <-ctx.Done():
19         fmt.Println("moving on")
20     }
21 }

在清单10的第2行,一个时间值被声明,它代表了员工将花费多长时间完成他们的工作。这个值被用在第4行来创建一个50毫秒超时的 context.Context 值。context 包的 WithTimeout 函数返回一个 Context 值和一个取消函数。

context包创建一个goroutine,一旦时间值到期,将关闭与Context 值关联的无缓冲channels。不管事情如何发生,你需要负责调用cancel 函数。这将清理被Context创建的东西。cancel被调用不止一次是可以的。

在第5行,一旦函数中断,cancel函数被 deferred 执行。在第7行,1个缓冲的channels被创建,它被用于被员工发送他们工作的结果给你。在第09行和12行,员工被雇佣兵立即投入工作,你不需要指定员工花费多长时间完成他们的工作。

在第14行和20行之间,你使用 select 语句来在两个channels接收。在第15行的接收,你等待员工发送他们的结果。在第18行的接收,你等待看context 包是否正在发送信号50毫秒的时间到了。无论你首先收到哪个信号,都将有一个被处理。

这个算法的一个重要方面是使用一个缓冲的channels。如果员工没有按时完成,你将离开而不会给员工任何通知。对于员工而言,在第11行他将一直发送他的报告,你在或者不在那里接收,他都是盲目的。如果你使用一个无缓冲channels,如果你离开,员工将一直阻塞在那尝试你给发送报告。这会引起goroutine泄漏。因此一个缓冲的channels用来防止这个问题发生。

总结

当使用 channels(或并发) 时,在保证,channel状态和发送过程中信号属性是非常重要的。它们将帮助你实现你并发程序需要的更好的行为以及你写的算法。它们将帮助你找出bug和闻出潜在的坏代码。

在本文中,我分享了一些程序示例来展示信号属性工作在不同的场景中。凡事都有例外,但是这些模式是非常良好的开端。

作为总结回顾下这些要点,何时,如何有效地思考和使用channels:

语言机制

  • 使用 channels 来编排和协作 goroutines:

    • 关注信号属性而不是数据共享
    • 有数据信号和无数据信号
    • 询问它们用于同步访问共享数据的用途

      • 有些情况下,对于这个问题,通道可以更简单一些,但是最初的问题是。
  • 无缓冲 channels:

    • 接收发生在发送之前
    • 收益:100%保证信号被接收
    • 成本:未知的延迟,不知道信号什么时候将被接收。
  • 有缓冲 channels:

    • 发送发生在接收之前。
    • 收益:降低信号之间的阻塞延迟。
    • 成本:不保证信号什么时候被接收。

      • 缓冲越大,保证越少。
      • 缓冲为1可以给你一个延迟发送保证。
  • 关闭的 channels:

    • 关闭发生在接收之前(像缓冲)。
    • 无数据信号。
    • 完美的信号取消或截止。
  • nil channels:

    • 发送和接收都阻塞。
    • 关闭信号。
    • 完美的速度限制或短时停工。

设计哲学

  • 如果在 channel上任何给定的发送能引起发送 goroutine 阻塞:

    • 不允许使用大于1的缓冲channels。

      • 缓冲大于1必须有原因/测量。
    • 必须知道当发送 goroutine阻塞的时候发生了什么。
  • 如果在 channel 上任何给定的发送不会引起发送阻塞:

    • 每个发送必须有确切的缓冲数字。

      • 扇出模式。
    • 有缓冲测量最大的容量。

      • Drop 模式。
  • 对于缓冲而言,少即是多。

    • 当考虑缓冲的时候,不要考虑性能。
    • 缓冲可以帮助降低信号之间的阻塞延迟。

      • 降低阻塞延迟到0并不一定意味着更好的吞吐量。
      • 如果一个缓冲可以给你足够的吞吐量,那就保持它。
      • 缓冲大于1的问题需要测量大小。
      • 尽可能找到提供足够吞吐量的最小缓冲
查看原文

赞 13 收藏 25 评论 0

yexiaobai 发布了文章 · 2018-04-21

Java 8 CompletableFuture 教程

Java 8 有大量的新特性和增强如 Lambda 表达式StreamsCompletableFuture等。在本篇文章中我将详细解释清楚CompletableFuture以及它所有方法的使用。

什么是CompletableFuture?

在Java中CompletableFuture用于异步编程,异步编程是编写非阻塞的代码,运行的任务在一个单独的线程,与主线程隔离,并且会通知主线程它的进度,成功或者失败。

在这种方式中,主线程不会被阻塞,不需要一直等到子线程完成。主线程可以并行的执行其他任务。

使用这种并行方式,可以极大的提高程序的性能。

Future vs CompletableFuture

CompletableFuture 是 Future API的扩展。

Future 被用于作为一个异步计算结果的引用。提供一个 isDone() 方法来检查计算任务是否完成。当任务完成时,get() 方法用来接收计算任务的结果。

Callbale和 Future 教程可以学习更多关于 Future 知识.

Future API 是非常好的 Java 异步编程进阶,但是它缺乏一些非常重要和有用的特性。

Future 的局限性

  1. 不能手动完成
    当你写了一个函数,用于通过一个远程API获取一个电子商务产品最新价格。因为这个 API 太耗时,你把它允许在一个独立的线程中,并且从你的函数中返回一个 Future。现在假设这个API服务宕机了,这时你想通过该产品的最新缓存价格手工完成这个Future 。你会发现无法这样做。
  2. Future 的结果在非阻塞的情况下,不能执行更进一步的操作
    Future 不会通知你它已经完成了,它提供了一个阻塞的 get() 方法通知你结果。你无法给 Future 植入一个回调函数,当 Future 结果可用的时候,用该回调函数自动的调用 Future 的结果。
  3. 多个 Future 不能串联在一起组成链式调用
    有时候你需要执行一个长时间运行的计算任务,并且当计算任务完成的时候,你需要把它的计算结果发送给另外一个长时间运行的计算任务等等。你会发现你无法使用 Future 创建这样的一个工作流。
  4. 不能组合多个 Future 的结果
    假设你有10个不同的Future,你想并行的运行,然后在它们运行未完成后运行一些函数。你会发现你也无法使用 Future 这样做。
  5. 没有异常处理
    Future API 没有任务的异常处理结构居然有如此多的限制,幸好我们有CompletableFuture,你可以使用 CompletableFuture 达到以上所有目的。

CompletableFuture 实现了 FutureCompletionStage接口,并且提供了许多关于创建,链式调用和组合多个 Future 的便利方法集,而且有广泛的异常处理支持。

创建 CompletableFuture

1. 简单的例子
可以使用如下无参构造函数简单的创建 CompletableFuture:

CompletableFuture<String> completableFuture = new CompletableFuture<String>();

这是一个最简单的 CompletableFuture,想获取CompletableFuture 的结果可以使用 CompletableFuture.get() 方法:

String result = completableFuture.get()

get() 方法会一直阻塞直到 Future 完成。因此,以上的调用将被永远阻塞,因为该Future一直不会完成。

你可以使用 CompletableFuture.complete() 手工的完成一个 Future:

completableFuture.complete("Future's Result")

所有等待这个 Future 的客户端都将得到一个指定的结果,并且 completableFuture.complete() 之后的调用将被忽略。

2. 使用 runAsync() 运行异步计算
如果你想异步的运行一个后台任务并且不想改任务返回任务东西,这时候可以使用 CompletableFuture.runAsync()方法,它持有一个Runnable 对象,并返回 CompletableFuture<Void>

// Run a task specified by a Runnable Object asynchronously.
CompletableFuture<Void> future = CompletableFuture.runAsync(new Runnable() {
    @Override
    public void run() {
        // Simulate a long-running Job
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        System.out.println("I'll run in a separate thread than the main thread.");
    }
});

// Block and wait for the future to complete
future.get()

你也可以以 lambda 表达式的形式传入 Runnable 对象:

// Using Lambda Expression
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    // Simulate a long-running Job   
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    System.out.println("I'll run in a separate thread than the main thread.");
});

在本文中,我使用lambda表达式会比较频繁,如果以前你没有使用过,建议你也多使用lambda 表达式。

3. 使用 supplyAsync() 运行一个异步任务并且返回结果
当任务不需要返回任何东西的时候, CompletableFuture.runAsync() 非常有用。但是如果你的后台任务需要返回一些结果应该要怎么样?

CompletableFuture.supplyAsync() 就是你的选择。它持有supplier<T> 并且返回CompletableFuture<T>T 是通过调用 传入的supplier取得的值的类型。

// Run a task specified by a Supplier object asynchronously
CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
    @Override
    public String get() {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return "Result of the asynchronous computation";
    }
});

// Block and get the result of the Future
String result = future.get();
System.out.println(result);

Supplier<T> 是一个简单的函数式接口,表示supplier的结果。它有一个get()方法,该方法可以写入你的后台任务中,并且返回结果。

你可以使用lambda表达式使得上面的示例更加简明:

// Using Lambda Expression
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    return "Result of the asynchronous computation";
});
一个关于Executor 和Thread Pool笔记
你可能想知道,我们知道runAsync() supplyAsync()方法在单独的线程中执行他们的任务。但是我们不会永远只创建一个线程。
CompletableFuture可以从全局的 ForkJoinPool.commonPool()获得一个线程中执行这些任务。
但是你也可以创建一个线程池并传给runAsync() supplyAsync()方法来让他们从线程池中获取一个线程执行它们的任务。
CompletableFuture API 的所有方法都有两个变体-一个接受Executor作为参数,另一个不这样:
// Variations of runAsync() and supplyAsync() methods
static CompletableFuture<Void>  runAsync(Runnable runnable)
static CompletableFuture<Void>  runAsync(Runnable runnable, Executor executor)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

创建一个线程池,并传递给其中一个方法:

Executor executor = Executors.newFixedThreadPool(10);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    return "Result of the asynchronous computation";
}, executor);

在 CompletableFuture 转换和运行

CompletableFuture.get()方法是阻塞的。它会一直等到Future完成并且在完成后返回结果。
但是,这是我们想要的吗?对于构建异步系统,我们应该附上一个回调给CompletableFuture,当Future完成的时候,自动的获取结果。
如果我们不想等待结果返回,我们可以把需要等待Future完成执行的逻辑写入到回调函数中。

可以使用 thenApply(), thenAccept()thenRun()方法附上一个回调给CompletableFuture。

1. thenApply()
可以使用 thenApply() 处理和改变CompletableFuture的结果。持有一个Function<R,T>作为参数。Function<R,T>是一个简单的函数式接口,接受一个T类型的参数,产出一个R类型的结果。

// Create a CompletableFuture
CompletableFuture<String> whatsYourNameFuture = CompletableFuture.supplyAsync(() -> {
   try {
       TimeUnit.SECONDS.sleep(1);
   } catch (InterruptedException e) {
       throw new IllegalStateException(e);
   }
   return "Rajeev";
});

// Attach a callback to the Future using thenApply()
CompletableFuture<String> greetingFuture = whatsYourNameFuture.thenApply(name -> {
   return "Hello " + name;
});

// Block and get the result of the future.
System.out.println(greetingFuture.get()); // Hello Rajeev

你也可以通过附加一系列的thenApply()在回调方法 在CompletableFuture写一个连续的转换。这样的话,结果中的一个 thenApply方法就会传递给该系列的另外一个 thenApply方法。

CompletableFuture<String> welcomeText = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return "Rajeev";
}).thenApply(name -> {
    return "Hello " + name;
}).thenApply(greeting -> {
    return greeting + ", Welcome to the CalliCoder Blog";
});

System.out.println(welcomeText.get());
// Prints - Hello Rajeev, Welcome to the CalliCoder Blog

2. thenAccept() 和 thenRun()
如果你不想从你的回调函数中返回任何东西,仅仅想在Future完成后运行一些代码片段,你可以使用thenAccept() thenRun()方法,这些方法经常在调用链的最末端的最后一个回调函数中使用。
CompletableFuture.thenAccept() 持有一个Consumer<T> ,返回一个CompletableFuture<Void>。它可以访问CompletableFuture的结果:

// thenAccept() example
CompletableFuture.supplyAsync(() -> {
    return ProductService.getProductDetail(productId);
}).thenAccept(product -> {
    System.out.println("Got product detail from remote service " + product.getName())
});

虽然thenAccept()可以访问CompletableFuture的结果,但thenRun()不能访Future的结果,它持有一个Runnable返回CompletableFuture<Void>:

// thenRun() example
CompletableFuture.supplyAsync(() -> {
    // Run some computation  
}).thenRun(() -> {
    // Computation Finished.
});
异步回调方法的笔记
CompletableFuture提供的所有回调方法都有两个变体:
`// thenApply() variants
<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)`
这些异步回调变体通过在独立的线程中执行回调任务帮助你进一步执行并行计算。
以下示例:
CompletableFuture.supplyAsync(() -> {
    try {
       TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
      throw new IllegalStateException(e);
    }
    return "Some Result"
}).thenApply(result -> {
    /* 
      Executed in the same thread where the supplyAsync() task is executed
      or in the main thread If the supplyAsync() task completes immediately (Remove sleep() call to verify)
    */
    return "Processed Result"
})

在以上示例中,在thenApply()中的任务和在supplyAsync()中的任务执行在相同的线程中。任何supplyAsync()立即执行完成,那就是执行在主线程中(尝试删除sleep测试下)。
为了控制执行回调任务的线程,你可以使用异步回调。如果你使用thenApplyAsync()回调,将从ForkJoinPool.commonPool()获取不同的线程执行。

CompletableFuture.supplyAsync(() -> {
    return "Some Result"
}).thenApplyAsync(result -> {
    // Executed in a different thread from ForkJoinPool.commonPool()
    return "Processed Result"
})

此外,如果你传入一个ExecutorthenApplyAsync()回调中,,任务将从Executor线程池获取一个线程执行。

Executor executor = Executors.newFixedThreadPool(2);
CompletableFuture.supplyAsync(() -> {
    return "Some result"
}).thenApplyAsync(result -> {
    // Executed in a thread obtained from the executor
    return "Processed Result"
}, executor);

组合两个CompletableFuture

1. 使用 thenCompose() 组合两个独立的future
假设你想从一个远程API中获取一个用户的详细信息,一旦用户信息可用,你想从另外一个服务中获取他的贷方。
考虑下以下两个方法getUserDetail() getCreditRating()的实现:

CompletableFuture<User> getUsersDetail(String userId) {
    return CompletableFuture.supplyAsync(() -> {
        UserService.getUserDetails(userId);
    });    
}

CompletableFuture<Double> getCreditRating(User user) {
    return CompletableFuture.supplyAsync(() -> {
        CreditRatingService.getCreditRating(user);
    });
}

现在让我们弄明白当使用了thenApply()后是否会达到我们期望的结果-

CompletableFuture<CompletableFuture<Double>> result = getUserDetail(userId)
.thenApply(user -> getCreditRating(user));

在更早的示例中,Supplier函数传入thenApply将返回一个简单的值,但是在本例中,将返回一个CompletableFuture。以上示例的最终结果是一个嵌套的CompletableFuture。
如果你想获取最终的结果给最顶层future,使用 thenCompose()方法代替-

CompletableFuture<Double> result = getUserDetail(userId)
.thenCompose(user -> getCreditRating(user));

因此,规则就是-如果你的回调函数返回一个CompletableFuture,但是你想从CompletableFuture链中获取一个直接合并后的结果,这时候你可以使用thenCompose()

2. 使用thenCombine()组合两个独立的 future
虽然thenCompose()被用于当一个future依赖另外一个future的时候用来组合两个future。thenCombine()被用来当两个独立的Future都完成的时候,用来做一些事情。

System.out.println("Retrieving weight.");
CompletableFuture<Double> weightInKgFuture = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return 65.0;
});

System.out.println("Retrieving height.");
CompletableFuture<Double> heightInCmFuture = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return 177.8;
});

System.out.println("Calculating BMI.");
CompletableFuture<Double> combinedFuture = weightInKgFuture
        .thenCombine(heightInCmFuture, (weightInKg, heightInCm) -> {
    Double heightInMeter = heightInCm/100;
    return weightInKg/(heightInMeter*heightInMeter);
});

System.out.println("Your BMI is - " + combinedFuture.get());

当两个Future都完成的时候,传给``thenCombine()的回调函数将被调用。

组合多个CompletableFuture

我们使用thenCompose() thenCombine()把两个CompletableFuture组合在一起。现在如果你想组合任意数量的CompletableFuture,应该怎么做?我们可以使用以下两个方法组合任意数量的CompletableFuture。

static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

1. CompletableFuture.allOf()
CompletableFuture.allOf的使用场景是当你一个列表的独立future,并且你想在它们都完成后并行的做一些事情。

假设你想下载一个网站的100个不同的页面。你可以串行的做这个操作,但是这非常消耗时间。因此你想写一个函数,传入一个页面链接,返回一个CompletableFuture,异步的下载页面内容。

CompletableFuture<String> downloadWebPage(String pageLink) {
    return CompletableFuture.supplyAsync(() -> {
        // Code to download and return the web page's content
    });
} 

现在,当所有的页面已经下载完毕,你想计算包含关键字CompletableFuture页面的数量。可以使用CompletableFuture.allOf()达成目的。

List<String> webPageLinks = Arrays.asList(...)    // A list of 100 web page links

// Download contents of all the web pages asynchronously
List<CompletableFuture<String>> pageContentFutures = webPageLinks.stream()
        .map(webPageLink -> downloadWebPage(webPageLink))
        .collect(Collectors.toList());


// Create a combined Future using allOf()
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
        pageContentFutures.toArray(new CompletableFuture[pageContentFutures.size()])
);

使用CompletableFuture.allOf()的问题是它返回CompletableFuture<Void>。但是我们可以通过写一些额外的代码来获取所有封装的CompletableFuture结果。

// When all the Futures are completed, call `future.join()` to get their results and collect the results in a list -
CompletableFuture<List<String>> allPageContentsFuture = allFutures.thenApply(v -> {
   return pageContentFutures.stream()
           .map(pageContentFuture -> pageContentFuture.join())
           .collect(Collectors.toList());
});

花一些时间理解下以上代码片段。当所有future完成的时候,我们调用了future.join(),因此我们不会在任何地方阻塞。

join()方法和get()方法非常类似,这唯一不同的地方是如果最顶层的CompletableFuture完成的时候发生了异常,它会抛出一个未经检查的异常。

现在让我们计算包含关键字页面的数量。

// Count the number of web pages having the "CompletableFuture" keyword.
CompletableFuture<Long> countFuture = allPageContentsFuture.thenApply(pageContents -> {
    return pageContents.stream()
            .filter(pageContent -> pageContent.contains("CompletableFuture"))
            .count();
});

System.out.println("Number of Web Pages having CompletableFuture keyword - " + 
        countFuture.get());

2. CompletableFuture.anyOf()

CompletableFuture.anyOf()和其名字介绍的一样,当任何一个CompletableFuture完成的时候【相同的结果类型】,返回一个新的CompletableFuture。以下示例:

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return "Result of Future 1";
});

CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return "Result of Future 2";
});

CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return "Result of Future 3";
});

CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2, future3);

System.out.println(anyOfFuture.get()); // Result of Future 2

在以上示例中,当三个中的任何一个CompletableFuture完成, anyOfFuture就会完成。因为future2的休眠时间最少,因此她最先完成,最终的结果将是future2的结果。

CompletableFuture.anyOf()传入一个Future可变参数,返回CompletableFuture<Object>。CompletableFuture.anyOf()的问题是如果你的CompletableFuture返回的结果是不同类型的,这时候你讲会不知道你最终CompletableFuture是什么类型。

CompletableFuture 异常处理

我们探寻了怎样创建CompletableFuture,转换它们,并组合多个CompletableFuture。现在让我们弄明白当发生错误的时候我们应该怎么做。

首先让我们明白在一个回调链中错误是怎么传递的。思考下以下回调链:

CompletableFuture.supplyAsync(() -> {
    // Code which might throw an exception
    return "Some result";
}).thenApply(result -> {
    return "processed result";
}).thenApply(result -> {
    return "result after further processing";
}).thenAccept(result -> {
    // do something with the final result
});

如果在原始的supplyAsync()任务中发生一个错误,这时候没有任何thenApply会被调用并且future将以一个异常结束。如果在第一个thenApply发生错误,这时候第二个和第三个将不会被调用,同样的,future将以异常结束。

1. 使用 exceptionally() 回调处理异常
exceptionally()回调给你一个从原始Future中生成的错误恢复的机会。你可以在这里记录这个异常并返回一个默认值。

Integer age = -1;

CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> {
    if(age < 0) {
        throw new IllegalArgumentException("Age can not be negative");
    }
    if(age > 18) {
        return "Adult";
    } else {
        return "Child";
    }
}).exceptionally(ex -> {
    System.out.println("Oops! We have an exception - " + ex.getMessage());
    return "Unknown!";
});

System.out.println("Maturity : " + maturityFuture.get()); 

2. 使用 handle() 方法处理异常
API提供了一个更通用的方法 - handle()从异常恢复,无论一个异常是否发生它都会被调用。

Integer age = -1;

CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> {
    if(age < 0) {
        throw new IllegalArgumentException("Age can not be negative");
    }
    if(age > 18) {
        return "Adult";
    } else {
        return "Child";
    }
}).handle((res, ex) -> {
    if(ex != null) {
        System.out.println("Oops! We have an exception - " + ex.getMessage());
        return "Unknown!";
    }
    return res;
});

System.out.println("Maturity : " + maturityFuture.get());

如果异常发生,res参数将是 null,否则,ex将是 null。

查看原文

赞 25 收藏 20 评论 2

yexiaobai 发布了文章 · 2018-04-19

Java 8 CompletableFuture 教程

Java 8 有大量的新特性和增强如 Lambda 表达式StreamsCompletableFuture等。在本篇文章中我将详细解释清楚CompletableFuture以及它所有方法的使用。

什么是CompletableFuture?

在Java中CompletableFuture用于异步编程,异步编程是编写非阻塞的代码,运行的任务在一个单独的线程,与主线程隔离,并且会通知主线程它的进度,成功或者失败。

在这种方式中,主线程不会被阻塞,不需要一直等到子线程完成。主线程可以并行的执行其他任务。

使用这种并行方式,可以极大的提高程序的性能。

Future vs CompletableFuture

CompletableFuture 是 Future API的扩展。

Future 被用于作为一个异步计算结果的引用。提供一个 isDone() 方法来检查计算任务是否完成。当任务完成时,get() 方法用来接收计算任务的结果。

Callbale和 Future 教程可以学习更多关于 Future 知识.

Future API 是非常好的 Java 异步编程进阶,但是它缺乏一些非常重要和有用的特性。

Future 的局限性

  1. 不能手动完成
    当你写了一个函数,用于通过一个远程API获取一个电子商务产品最新价格。因为这个 API 太耗时,你把它允许在一个独立的线程中,并且从你的函数中返回一个 Future。现在假设这个API服务宕机了,这时你想通过该产品的最新缓存价格手工完成这个Future 。你会发现无法这样做。
  2. Future 的结果在非阻塞的情况下,不能执行更进一步的操作
    Future 不会通知你它已经完成了,它提供了一个阻塞的 get() 方法通知你结果。你无法给 Future 植入一个回调函数,当 Future 结果可用的时候,用该回调函数自动的调用 Future 的结果。
  3. 多个 Future 不能串联在一起组成链式调用
    有时候你需要执行一个长时间运行的计算任务,并且当计算任务完成的时候,你需要把它的计算结果发送给另外一个长时间运行的计算任务等等。你会发现你无法使用 Future 创建这样的一个工作流。
  4. 不能组合多个 Future 的结果
    假设你有10个不同的Future,你想并行的运行,然后在它们运行未完成后运行一些函数。你会发现你也无法使用 Future 这样做。
  5. 没有异常处理
    Future API 没有任务的异常处理结构居然有如此多的限制,幸好我们有CompletableFuture,你可以使用 CompletableFuture 达到以上所有目的。

CompletableFuture 实现了 FutureCompletionStage接口,并且提供了许多关于创建,链式调用和组合多个 Future 的便利方法集,而且有广泛的异常处理支持。

创建 CompletableFuture

1. 简单的例子
可以使用如下无参构造函数简单的创建 CompletableFuture:

CompletableFuture<String> completableFuture = new CompletableFuture<String>();

这是一个最简单的 CompletableFuture,想获取CompletableFuture 的结果可以使用 CompletableFuture.get() 方法:

String result = completableFuture.get()

get() 方法会一直阻塞直到 Future 完成。因此,以上的调用将被永远阻塞,因为该Future一直不会完成。

你可以使用 CompletableFuture.complete() 手工的完成一个 Future:

completableFuture.complete("Future's Result")

所有等待这个 Future 的客户端都将得到一个指定的结果,并且 completableFuture.complete() 之后的调用将被忽略。

2. 使用 runAsync() 运行异步计算
如果你想异步的运行一个后台任务并且不想改任务返回任务东西,这时候可以使用 CompletableFuture.runAsync()方法,它持有一个Runnable 对象,并返回 CompletableFuture<Void>

// Run a task specified by a Runnable Object asynchronously.
CompletableFuture<Void> future = CompletableFuture.runAsync(new Runnable() {
    @Override
    public void run() {
        // Simulate a long-running Job
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        System.out.println("I'll run in a separate thread than the main thread.");
    }
});

// Block and wait for the future to complete
future.get()

你也可以以 lambda 表达式的形式传入 Runnable 对象:

// Using Lambda Expression
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    // Simulate a long-running Job   
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    System.out.println("I'll run in a separate thread than the main thread.");
});

在本文中,我使用lambda表达式会比较频繁,如果以前你没有使用过,建议你也多使用lambda 表达式。

3. 使用 supplyAsync() 运行一个异步任务并且返回结果
当任务不需要返回任何东西的时候, CompletableFuture.runAsync() 非常有用。但是如果你的后台任务需要返回一些结果应该要怎么样?

CompletableFuture.supplyAsync() 就是你的选择。它持有supplier<T> 并且返回CompletableFuture<T>T 是通过调用 传入的supplier取得的值的类型。

// Run a task specified by a Supplier object asynchronously
CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
    @Override
    public String get() {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return "Result of the asynchronous computation";
    }
});

// Block and get the result of the Future
String result = future.get();
System.out.println(result);

Supplier<T> 是一个简单的函数式接口,表示supplier的结果。它有一个get()方法,该方法可以写入你的后台任务中,并且返回结果。

你可以使用lambda表达式使得上面的示例更加简明:

// Using Lambda Expression
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    return "Result of the asynchronous computation";
});
一个关于Executor 和Thread Pool笔记
你可能想知道,我们知道runAsync() supplyAsync()方法在单独的线程中执行他们的任务。但是我们不会永远只创建一个线程。
CompletableFuture可以从全局的 ForkJoinPool.commonPool()获得一个线程中执行这些任务。
但是你也可以创建一个线程池并传给runAsync() supplyAsync()方法来让他们从线程池中获取一个线程执行它们的任务。
CompletableFuture API 的所有方法都有两个变体-一个接受Executor作为参数,另一个不这样:
// Variations of runAsync() and supplyAsync() methods
static CompletableFuture<Void>  runAsync(Runnable runnable)
static CompletableFuture<Void>  runAsync(Runnable runnable, Executor executor)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

创建一个线程池,并传递给其中一个方法:

Executor executor = Executors.newFixedThreadPool(10);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    return "Result of the asynchronous computation";
}, executor);

在 CompletableFuture 转换和运行

CompletableFuture.get()方法是阻塞的。它会一直等到Future完成并且在完成后返回结果。
但是,这是我们想要的吗?对于构建异步系统,我们应该附上一个回调给CompletableFuture,当Future完成的时候,自动的获取结果。
如果我们不想等待结果返回,我们可以把需要等待Future完成执行的逻辑写入到回调函数中。

可以使用 thenApply(), thenAccept()thenRun()方法附上一个回调给CompletableFuture。

1. thenApply()
可以使用 thenApply() 处理和改变CompletableFuture的结果。持有一个Function<R,T>作为参数。Function<R,T>是一个简单的函数式接口,接受一个T类型的参数,产出一个R类型的结果。

// Create a CompletableFuture
CompletableFuture<String> whatsYourNameFuture = CompletableFuture.supplyAsync(() -> {
   try {
       TimeUnit.SECONDS.sleep(1);
   } catch (InterruptedException e) {
       throw new IllegalStateException(e);
   }
   return "Rajeev";
});

// Attach a callback to the Future using thenApply()
CompletableFuture<String> greetingFuture = whatsYourNameFuture.thenApply(name -> {
   return "Hello " + name;
});

// Block and get the result of the future.
System.out.println(greetingFuture.get()); // Hello Rajeev

你也可以通过附加一系列的thenApply()在回调方法 在CompletableFuture写一个连续的转换。这样的话,结果中的一个 thenApply方法就会传递给该系列的另外一个 thenApply方法。

CompletableFuture<String> welcomeText = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return "Rajeev";
}).thenApply(name -> {
    return "Hello " + name;
}).thenApply(greeting -> {
    return greeting + ", Welcome to the CalliCoder Blog";
});

System.out.println(welcomeText.get());
// Prints - Hello Rajeev, Welcome to the CalliCoder Blog

2. thenAccept() 和 thenRun()
如果你不想从你的回调函数中返回任何东西,仅仅想在Future完成后运行一些代码片段,你可以使用thenAccept() thenRun()方法,这些方法经常在调用链的最末端的最后一个回调函数中使用。
CompletableFuture.thenAccept() 持有一个Consumer<T> ,返回一个CompletableFuture<Void>。它可以访问CompletableFuture的结果:

// thenAccept() example
CompletableFuture.supplyAsync(() -> {
    return ProductService.getProductDetail(productId);
}).thenAccept(product -> {
    System.out.println("Got product detail from remote service " + product.getName())
});

虽然thenAccept()可以访问CompletableFuture的结果,但thenRun()不能访Future的结果,它持有一个Runnable返回CompletableFuture<Void>:

// thenRun() example
CompletableFuture.supplyAsync(() -> {
    // Run some computation  
}).thenRun(() -> {
    // Computation Finished.
});
异步回调方法的笔记
CompletableFuture提供的所有回调方法都有两个变体:
`// thenApply() variants
<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)`
这些异步回调变体通过在独立的线程中执行回调任务帮助你进一步执行并行计算。
以下示例:
CompletableFuture.supplyAsync(() -> {
    try {
       TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
      throw new IllegalStateException(e);
    }
    return "Some Result"
}).thenApply(result -> {
    /* 
      Executed in the same thread where the supplyAsync() task is executed
      or in the main thread If the supplyAsync() task completes immediately (Remove sleep() call to verify)
    */
    return "Processed Result"
})

在以上示例中,在thenApply()中的任务和在supplyAsync()中的任务执行在相同的线程中。任何supplyAsync()立即执行完成,那就是执行在主线程中(尝试删除sleep测试下)。
为了控制执行回调任务的线程,你可以使用异步回调。如果你使用thenApplyAsync()回调,将从ForkJoinPool.commonPool()获取不同的线程执行。

CompletableFuture.supplyAsync(() -> {
    return "Some Result"
}).thenApplyAsync(result -> {
    // Executed in a different thread from ForkJoinPool.commonPool()
    return "Processed Result"
})

此外,如果你传入一个ExecutorthenApplyAsync()回调中,,任务将从Executor线程池获取一个线程执行。

Executor executor = Executors.newFixedThreadPool(2);
CompletableFuture.supplyAsync(() -> {
    return "Some result"
}).thenApplyAsync(result -> {
    // Executed in a thread obtained from the executor
    return "Processed Result"
}, executor);

组合两个CompletableFuture

1. 使用 thenCompose() 组合两个独立的future
假设你想从一个远程API中获取一个用户的详细信息,一旦用户信息可用,你想从另外一个服务中获取他的贷方。
考虑下以下两个方法getUserDetail() getCreditRating()的实现:

CompletableFuture<User> getUsersDetail(String userId) {
    return CompletableFuture.supplyAsync(() -> {
        UserService.getUserDetails(userId);
    });    
}

CompletableFuture<Double> getCreditRating(User user) {
    return CompletableFuture.supplyAsync(() -> {
        CreditRatingService.getCreditRating(user);
    });
}

现在让我们弄明白当使用了thenApply()后是否会达到我们期望的结果-

CompletableFuture<CompletableFuture<Double>> result = getUserDetail(userId)
.thenApply(user -> getCreditRating(user));

在更早的示例中,Supplier函数传入thenApply将返回一个简单的值,但是在本例中,将返回一个CompletableFuture。以上示例的最终结果是一个嵌套的CompletableFuture。
如果你想获取最终的结果给最顶层future,使用 thenCompose()方法代替-

CompletableFuture<Double> result = getUserDetail(userId)
.thenCompose(user -> getCreditRating(user));

因此,规则就是-如果你的回调函数返回一个CompletableFuture,但是你想从CompletableFuture链中获取一个直接合并后的结果,这时候你可以使用thenCompose()

2. 使用thenCombine()组合两个独立的 future
虽然thenCompose()被用于当一个future依赖另外一个future的时候用来组合两个future。thenCombine()被用来当两个独立的Future都完成的时候,用来做一些事情。

System.out.println("Retrieving weight.");
CompletableFuture<Double> weightInKgFuture = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return 65.0;
});

System.out.println("Retrieving height.");
CompletableFuture<Double> heightInCmFuture = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return 177.8;
});

System.out.println("Calculating BMI.");
CompletableFuture<Double> combinedFuture = weightInKgFuture
        .thenCombine(heightInCmFuture, (weightInKg, heightInCm) -> {
    Double heightInMeter = heightInCm/100;
    return weightInKg/(heightInMeter*heightInMeter);
});

System.out.println("Your BMI is - " + combinedFuture.get());

当两个Future都完成的时候,传给``thenCombine()的回调函数将被调用。

组合多个CompletableFuture

我们使用thenCompose() thenCombine()把两个CompletableFuture组合在一起。现在如果你想组合任意数量的CompletableFuture,应该怎么做?我们可以使用以下两个方法组合任意数量的CompletableFuture。

static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

1. CompletableFuture.allOf()
CompletableFuture.allOf的使用场景是当你一个列表的独立future,并且你想在它们都完成后并行的做一些事情。

假设你想下载一个网站的100个不同的页面。你可以串行的做这个操作,但是这非常消耗时间。因此你想写一个函数,传入一个页面链接,返回一个CompletableFuture,异步的下载页面内容。

CompletableFuture<String> downloadWebPage(String pageLink) {
    return CompletableFuture.supplyAsync(() -> {
        // Code to download and return the web page's content
    });
} 

现在,当所有的页面已经下载完毕,你想计算包含关键字CompletableFuture页面的数量。可以使用CompletableFuture.allOf()达成目的。

List<String> webPageLinks = Arrays.asList(...)    // A list of 100 web page links

// Download contents of all the web pages asynchronously
List<CompletableFuture<String>> pageContentFutures = webPageLinks.stream()
        .map(webPageLink -> downloadWebPage(webPageLink))
        .collect(Collectors.toList());


// Create a combined Future using allOf()
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
        pageContentFutures.toArray(new CompletableFuture[pageContentFutures.size()])
);

使用CompletableFuture.allOf()的问题是它返回CompletableFuture<Void>。但是我们可以通过写一些额外的代码来获取所有封装的CompletableFuture结果。

// When all the Futures are completed, call `future.join()` to get their results and collect the results in a list -
CompletableFuture<List<String>> allPageContentsFuture = allFutures.thenApply(v -> {
   return pageContentFutures.stream()
           .map(pageContentFuture -> pageContentFuture.join())
           .collect(Collectors.toList());
});

花一些时间理解下以上代码片段。当所有future完成的时候,我们调用了future.join(),因此我们不会在任何地方阻塞。

join()方法和get()方法非常类似,这唯一不同的地方是如果最顶层的CompletableFuture完成的时候发生了异常,它会抛出一个未经检查的异常。

现在让我们计算包含关键字页面的数量。

// Count the number of web pages having the "CompletableFuture" keyword.
CompletableFuture<Long> countFuture = allPageContentsFuture.thenApply(pageContents -> {
    return pageContents.stream()
            .filter(pageContent -> pageContent.contains("CompletableFuture"))
            .count();
});

System.out.println("Number of Web Pages having CompletableFuture keyword - " + 
        countFuture.get());

2. CompletableFuture.anyOf()

CompletableFuture.anyOf()和其名字介绍的一样,当任何一个CompletableFuture完成的时候【相同的结果类型】,返回一个新的CompletableFuture。以下示例:

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return "Result of Future 1";
});

CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return "Result of Future 2";
});

CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return "Result of Future 3";
});

CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2, future3);

System.out.println(anyOfFuture.get()); // Result of Future 2

在以上示例中,当三个中的任何一个CompletableFuture完成, anyOfFuture就会完成。因为future2的休眠时间最少,因此她最先完成,最终的结果将是future2的结果。

CompletableFuture.anyOf()传入一个Future可变参数,返回CompletableFuture<Object>。CompletableFuture.anyOf()的问题是如果你的CompletableFuture返回的结果是不同类型的,这时候你讲会不知道你最终CompletableFuture是什么类型。

CompletableFuture 异常处理

我们探寻了怎样创建CompletableFuture,转换它们,并组合多个CompletableFuture。现在让我们弄明白当发生错误的时候我们应该怎么做。

首先让我们明白在一个回调链中错误是怎么传递的。思考下以下回调链:

CompletableFuture.supplyAsync(() -> {
    // Code which might throw an exception
    return "Some result";
}).thenApply(result -> {
    return "processed result";
}).thenApply(result -> {
    return "result after further processing";
}).thenAccept(result -> {
    // do something with the final result
});

如果在原始的supplyAsync()任务中发生一个错误,这时候没有任何thenApply会被调用并且future将以一个异常结束。如果在第一个thenApply发生错误,这时候第二个和第三个将不会被调用,同样的,future将以异常结束。

1. 使用 exceptionally() 回调处理异常
exceptionally()回调给你一个从原始Future中生成的错误恢复的机会。你可以在这里记录这个异常并返回一个默认值。

Integer age = -1;

CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> {
    if(age < 0) {
        throw new IllegalArgumentException("Age can not be negative");
    }
    if(age > 18) {
        return "Adult";
    } else {
        return "Child";
    }
}).exceptionally(ex -> {
    System.out.println("Oops! We have an exception - " + ex.getMessage());
    return "Unknown!";
});

System.out.println("Maturity : " + maturityFuture.get()); 

2. 使用 handle() 方法处理异常
API提供了一个更通用的方法 - handle()从异常恢复,无论一个异常是否发生它都会被调用。

Integer age = -1;

CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> {
    if(age < 0) {
        throw new IllegalArgumentException("Age can not be negative");
    }
    if(age > 18) {
        return "Adult";
    } else {
        return "Child";
    }
}).handle((res, ex) -> {
    if(ex != null) {
        System.out.println("Oops! We have an exception - " + ex.getMessage());
        return "Unknown!";
    }
    return res;
});

System.out.println("Maturity : " + maturityFuture.get());

如果异常发生,res参数将是 null,否则,ex将是 null。

查看原文

赞 25 收藏 20 评论 2

认证与成就

  • 获得 392 次点赞
  • 获得 28 枚徽章 获得 1 枚金徽章, 获得 13 枚银徽章, 获得 14 枚铜徽章

擅长技能
编辑

开源项目 & 著作
编辑

(゚∀゚ )
暂时没有

注册于 2013-10-09
个人主页被 6.5k 人浏览