
朱伟 published an article · September 6

The Kubernetes scheduler module

1. Introduction to the Kubernetes scheduler

(Figure: Kubernetes overall architecture)
This diagram shows the overall Kubernetes architecture and the major modules involved. Apart from the API server itself, every module talks to the API server to obtain the resources it needs, using the list/watch mechanism. When a user creates a Pod, its nodeName field is initially empty; the scheduler picks up such Pods and selects a suitable node for each of them. Choosing a node is a complex process that has to weigh many factors: zones, node affinity and anti-affinity, the node's CPU and memory, volume conflicts, taints, and so on. Once a suitable node has been found, its name is written into the Pod and persisted to etcd. The kubelet watches for Pods whose nodeName points at its own node and, on seeing one, runs the Pod on that host.
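The list/watch side of this can be illustrated with a short client-go program. This is only a sketch of the mechanism, not scheduler code: it watches for Pods whose spec.nodeName is still empty, which is exactly the set the scheduler picks up. The kubeconfig path and resync interval are placeholders.

package main

import (
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Build a client from a local kubeconfig (path is a placeholder).
	cfg, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	// Only list/watch Pods that have not been scheduled yet.
	factory := informers.NewSharedInformerFactoryWithOptions(client, 30*time.Second,
		informers.WithTweakListOptions(func(o *metav1.ListOptions) {
			o.FieldSelector = "spec.nodeName="
		}))

	podInformer := factory.Core().V1().Pods().Informer()
	podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			pod := obj.(*corev1.Pod)
			// A real scheduler would now filter/score nodes and bind the Pod.
			fmt.Printf("unscheduled pod seen: %s/%s\n", pod.Namespace, pod.Name)
		},
	})

	stop := make(chan struct{})
	defer close(stop)
	factory.Start(stop)
	factory.WaitForCacheSync(stop)
	select {} // block forever
}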

2. The Kubernetes scheduling framework

(Figure: scheduling framework extension points, from the official documentation)
This is the official framework diagram, showing how a node is selected in recent releases. Earlier versions registered plugins through predicates and priorities, which made custom plugins painful to maintain; after the switch to the framework, registration is clearer and far more extensible (covered in more detail in the extension section below). Every box in the diagram is an extension point.
Registration based on predicates and priorities:

// NewLegacyRegistry returns a legacy algorithm registry of predicates and priorities.
func NewLegacyRegistry() *LegacyRegistry {
    registry := &LegacyRegistry{
        // MandatoryPredicates the set of keys for predicates that the scheduler will
        // be configured with all the time.
        MandatoryPredicates: sets.NewString(
            PodToleratesNodeTaintsPred,
            CheckNodeUnschedulablePred,
        ),

        // Used as the default set of predicates if Policy was specified, but predicates was nil.
        DefaultPredicates: sets.NewString(
            NoVolumeZoneConflictPred,
            MaxEBSVolumeCountPred,
            MaxGCEPDVolumeCountPred,
            MaxAzureDiskVolumeCountPred,
            MaxCSIVolumeCountPred,
            MatchInterPodAffinityPred,
            NoDiskConflictPred,
            GeneralPred,
            PodToleratesNodeTaintsPred,
            CheckVolumeBindingPred,
            CheckNodeUnschedulablePred,
        ),

        // Used as the default set of predicates if Policy was specified, but priorities was nil.
        DefaultPriorities: map[string]int64{
            SelectorSpreadPriority:      1,
            InterPodAffinityPriority:    1,
            LeastRequestedPriority:      1,
            BalancedResourceAllocation:  1,
            NodePreferAvoidPodsPriority: 10000,
            NodeAffinityPriority:        1,
            TaintTolerationPriority:     1,
            ImageLocalityPriority:       1,
        },

        PredicateToConfigProducer: make(map[string]ConfigProducer),
        PriorityToConfigProducer:  make(map[string]ConfigProducer),
    }

    registry.registerPredicateConfigProducer(GeneralPred,
        func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            // GeneralPredicate is a combination of predicates.
            plugins.Filter = appendToPluginSet(plugins.Filter, noderesources.FitName, nil)
            plugins.PreFilter = appendToPluginSet(plugins.PreFilter, noderesources.FitName, nil)
            if args.NodeResourcesFitArgs != nil {
                pluginConfig = append(pluginConfig, NewPluginConfig(noderesources.FitName, args.NodeResourcesFitArgs))
            }
            plugins.Filter = appendToPluginSet(plugins.Filter, nodename.Name, nil)
            plugins.Filter = appendToPluginSet(plugins.Filter, nodeports.Name, nil)
            plugins.PreFilter = appendToPluginSet(plugins.PreFilter, nodeports.Name, nil)
            plugins.Filter = appendToPluginSet(plugins.Filter, nodeaffinity.Name, nil)
            return
        })
    registry.registerPredicateConfigProducer(PodToleratesNodeTaintsPred,
        func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            plugins.Filter = appendToPluginSet(plugins.Filter, tainttoleration.Name, nil)
            return
        })
    registry.registerPredicateConfigProducer(PodFitsResourcesPred,
        func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            plugins.Filter = appendToPluginSet(plugins.Filter, noderesources.FitName, nil)
            plugins.PreFilter = appendToPluginSet(plugins.PreFilter, noderesources.FitName, nil)
            if args.NodeResourcesFitArgs != nil {
                pluginConfig = append(pluginConfig, NewPluginConfig(noderesources.FitName, args.NodeResourcesFitArgs))
            }
            return
        })
    registry.registerPredicateConfigProducer(HostNamePred,
        func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            plugins.Filter = appendToPluginSet(plugins.Filter, nodename.Name, nil)
            return
        })
    registry.registerPredicateConfigProducer(PodFitsHostPortsPred,
        func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            plugins.Filter = appendToPluginSet(plugins.Filter, nodeports.Name, nil)
            plugins.PreFilter = appendToPluginSet(plugins.PreFilter, nodeports.Name, nil)
            return
        })
    registry.registerPredicateConfigProducer(MatchNodeSelectorPred,
        func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            plugins.Filter = appendToPluginSet(plugins.Filter, nodeaffinity.Name, nil)
            return
        })
    registry.registerPredicateConfigProducer(CheckNodeUnschedulablePred,
        func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            plugins.Filter = appendToPluginSet(plugins.Filter, nodeunschedulable.Name, nil)
            return
        })
    registry.registerPredicateConfigProducer(CheckVolumeBindingPred,
        func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            plugins.Filter = appendToPluginSet(plugins.Filter, volumebinding.Name, nil)
            return
        })
    registry.registerPredicateConfigProducer(NoDiskConflictPred,
        func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            plugins.Filter = appendToPluginSet(plugins.Filter, volumerestrictions.Name, nil)
            return
        })
    registry.registerPredicateConfigProducer(NoVolumeZoneConflictPred,
        func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            plugins.Filter = appendToPluginSet(plugins.Filter, volumezone.Name, nil)
            return
        })
    registry.registerPredicateConfigProducer(MaxCSIVolumeCountPred,
        func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            plugins.Filter = appendToPluginSet(plugins.Filter, nodevolumelimits.CSIName, nil)
            return
        })
    registry.registerPredicateConfigProducer(MaxEBSVolumeCountPred,
        func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            plugins.Filter = appendToPluginSet(plugins.Filter, nodevolumelimits.EBSName, nil)
            return
        })
    registry.registerPredicateConfigProducer(MaxGCEPDVolumeCountPred,
        func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            plugins.Filter = appendToPluginSet(plugins.Filter, nodevolumelimits.GCEPDName, nil)
            return
        })
    registry.registerPredicateConfigProducer(MaxAzureDiskVolumeCountPred,
        func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            plugins.Filter = appendToPluginSet(plugins.Filter, nodevolumelimits.AzureDiskName, nil)
            return
        })
    registry.registerPredicateConfigProducer(MaxCinderVolumeCountPred,
        func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            plugins.Filter = appendToPluginSet(plugins.Filter, nodevolumelimits.CinderName, nil)
            return
        })
    registry.registerPredicateConfigProducer(MatchInterPodAffinityPred,
        func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            plugins.Filter = appendToPluginSet(plugins.Filter, interpodaffinity.Name, nil)
            plugins.PreFilter = appendToPluginSet(plugins.PreFilter, interpodaffinity.Name, nil)
            return
        })
    registry.registerPredicateConfigProducer(CheckNodeLabelPresencePred,
        func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            plugins.Filter = appendToPluginSet(plugins.Filter, nodelabel.Name, nil)
            if args.NodeLabelArgs != nil {
                pluginConfig = append(pluginConfig, NewPluginConfig(nodelabel.Name, args.NodeLabelArgs))
            }
            return
        })
    registry.registerPredicateConfigProducer(CheckServiceAffinityPred,
        func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            plugins.Filter = appendToPluginSet(plugins.Filter, serviceaffinity.Name, nil)
            if args.ServiceAffinityArgs != nil {
                pluginConfig = append(pluginConfig, NewPluginConfig(serviceaffinity.Name, args.ServiceAffinityArgs))
            }
            plugins.PreFilter = appendToPluginSet(plugins.PreFilter, serviceaffinity.Name, nil)
            return
        })

    // Register Priorities.
    registry.registerPriorityConfigProducer(SelectorSpreadPriority,
        func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            plugins.Score = appendToPluginSet(plugins.Score, defaultpodtopologyspread.Name, &args.Weight)
            plugins.PreScore = appendToPluginSet(plugins.PreScore, defaultpodtopologyspread.Name, nil)
            return
        })
    registry.registerPriorityConfigProducer(TaintTolerationPriority,
        func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            plugins.PreScore = appendToPluginSet(plugins.PreScore, tainttoleration.Name, nil)
            plugins.Score = appendToPluginSet(plugins.Score, tainttoleration.Name, &args.Weight)
            return
        })
    registry.registerPriorityConfigProducer(NodeAffinityPriority,
        func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            plugins.Score = appendToPluginSet(plugins.Score, nodeaffinity.Name, &args.Weight)
            return
        })
    registry.registerPriorityConfigProducer(ImageLocalityPriority,
        func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            plugins.Score = appendToPluginSet(plugins.Score, imagelocality.Name, &args.Weight)
            return
        })
    registry.registerPriorityConfigProducer(InterPodAffinityPriority,
        func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            plugins.PreScore = appendToPluginSet(plugins.PreScore, interpodaffinity.Name, nil)
            plugins.Score = appendToPluginSet(plugins.Score, interpodaffinity.Name, &args.Weight)
            if args.InterPodAffinityArgs != nil {
                pluginConfig = append(pluginConfig, NewPluginConfig(interpodaffinity.Name, args.InterPodAffinityArgs))
            }
            return
        })
    registry.registerPriorityConfigProducer(NodePreferAvoidPodsPriority,
        func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            plugins.Score = appendToPluginSet(plugins.Score, nodepreferavoidpods.Name, &args.Weight)
            return
        })
    registry.registerPriorityConfigProducer(MostRequestedPriority,
        func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            plugins.Score = appendToPluginSet(plugins.Score, noderesources.MostAllocatedName, &args.Weight)
            return
        })
    registry.registerPriorityConfigProducer(BalancedResourceAllocation,
        func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            plugins.Score = appendToPluginSet(plugins.Score, noderesources.BalancedAllocationName, &args.Weight)
            return
        })
    registry.registerPriorityConfigProducer(LeastRequestedPriority,
        func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            plugins.Score = appendToPluginSet(plugins.Score, noderesources.LeastAllocatedName, &args.Weight)
            return
        })
    registry.registerPriorityConfigProducer(noderesources.RequestedToCapacityRatioName,
        func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            plugins.Score = appendToPluginSet(plugins.Score, noderesources.RequestedToCapacityRatioName, &args.Weight)
            if args.RequestedToCapacityRatioArgs != nil {
                pluginConfig = append(pluginConfig, NewPluginConfig(noderesources.RequestedToCapacityRatioName, args.RequestedToCapacityRatioArgs))
            }
            return
        })

    registry.registerPriorityConfigProducer(nodelabel.Name,
        func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            // If there are n LabelPreference priorities in the policy, the weight for the corresponding
            // score plugin is n*weight (note that the validation logic verifies that all LabelPreference
            // priorities specified in Policy have the same weight).
            weight := args.Weight * int32(len(args.NodeLabelArgs.PresentLabelsPreference)+len(args.NodeLabelArgs.AbsentLabelsPreference))
            plugins.Score = appendToPluginSet(plugins.Score, nodelabel.Name, &weight)
            if args.NodeLabelArgs != nil {
                pluginConfig = append(pluginConfig, NewPluginConfig(nodelabel.Name, args.NodeLabelArgs))
            }
            return
        })
    registry.registerPriorityConfigProducer(serviceaffinity.Name,
        func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            // If there are n ServiceAffinity priorities in the policy, the weight for the corresponding
            // score plugin is n*weight (note that the validation logic verifies that all ServiceAffinity
            // priorities specified in Policy have the same weight).
            weight := args.Weight * int32(len(args.ServiceAffinityArgs.AntiAffinityLabelsPreference))
            plugins.Score = appendToPluginSet(plugins.Score, serviceaffinity.Name, &weight)
            if args.ServiceAffinityArgs != nil {
                pluginConfig = append(pluginConfig, NewPluginConfig(serviceaffinity.Name, args.ServiceAffinityArgs))
            }
            return
        })

    // The following two features are the last ones to be supported as predicate/priority.
    // Once they graduate to GA, there will be no more checking for featue gates here.
    // Only register EvenPodsSpread predicate & priority if the feature is enabled
    if utilfeature.DefaultFeatureGate.Enabled(features.EvenPodsSpread) {
        klog.Infof("Registering EvenPodsSpread predicate and priority function")

        registry.registerPredicateConfigProducer(EvenPodsSpreadPred,
            func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
                plugins.PreFilter = appendToPluginSet(plugins.PreFilter, podtopologyspread.Name, nil)
                plugins.Filter = appendToPluginSet(plugins.Filter, podtopologyspread.Name, nil)
                return
            })
        registry.DefaultPredicates.Insert(EvenPodsSpreadPred)

        registry.registerPriorityConfigProducer(EvenPodsSpreadPriority,
            func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
                plugins.PreScore = appendToPluginSet(plugins.PreScore, podtopologyspread.Name, nil)
                plugins.Score = appendToPluginSet(plugins.Score, podtopologyspread.Name, &args.Weight)
                return
            })
        registry.DefaultPriorities[EvenPodsSpreadPriority] = 1
    }

    // Prioritizes nodes that satisfy pod's resource limits
    if utilfeature.DefaultFeatureGate.Enabled(features.ResourceLimitsPriorityFunction) {
        klog.Infof("Registering resourcelimits priority function")

        registry.registerPriorityConfigProducer(ResourceLimitsPriority,
            func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
                plugins.PreScore = appendToPluginSet(plugins.PreScore, noderesources.ResourceLimitsName, nil)
                plugins.Score = appendToPluginSet(plugins.Score, noderesources.ResourceLimitsName, &args.Weight)
                return
            })
        registry.DefaultPriorities[ResourceLimitsPriority] = 1
    }

    return registry
}

Registration based on the framework:

// ListAlgorithmProviders lists registered algorithm providers.
func ListAlgorithmProviders() string {
    r := NewRegistry()
    var providers []string
    for k := range r {
        providers = append(providers, k)
    }
    sort.Strings(providers)
    return strings.Join(providers, " | ")
}

func getDefaultConfig() *schedulerapi.Plugins {
    return &schedulerapi.Plugins{
        QueueSort: &schedulerapi.PluginSet{
            Enabled: []schedulerapi.Plugin{
                {Name: queuesort.Name},
            },
        },
        PreFilter: &schedulerapi.PluginSet{
            Enabled: []schedulerapi.Plugin{
                {Name: noderesources.FitName},
                {Name: nodeports.Name},
                {Name: interpodaffinity.Name},
            },
        },
        Filter: &schedulerapi.PluginSet{
            Enabled: []schedulerapi.Plugin{
                {Name: nodeunschedulable.Name},
                {Name: noderesources.FitName},
                {Name: nodename.Name},
                {Name: nodeports.Name},
                {Name: nodeaffinity.Name},
                {Name: volumerestrictions.Name},
                {Name: tainttoleration.Name},
                {Name: nodevolumelimits.EBSName},
                {Name: nodevolumelimits.GCEPDName},
                {Name: nodevolumelimits.CSIName},
                {Name: nodevolumelimits.AzureDiskName},
                {Name: volumebinding.Name},
                {Name: volumezone.Name},
                {Name: interpodaffinity.Name},
            },
        },
        PreScore: &schedulerapi.PluginSet{
            Enabled: []schedulerapi.Plugin{
                {Name: interpodaffinity.Name},
                {Name: defaultpodtopologyspread.Name},
                {Name: tainttoleration.Name},
            },
        },
        Score: &schedulerapi.PluginSet{
            Enabled: []schedulerapi.Plugin{
                {Name: noderesources.BalancedAllocationName, Weight: 1},
                {Name: imagelocality.Name, Weight: 1},
                {Name: interpodaffinity.Name, Weight: 1},
                {Name: noderesources.LeastAllocatedName, Weight: 1},
                {Name: nodeaffinity.Name, Weight: 1},
                {Name: nodepreferavoidpods.Name, Weight: 10000},
                {Name: defaultpodtopologyspread.Name, Weight: 1},
                {Name: tainttoleration.Name, Weight: 1},
            },
        },
        Bind: &schedulerapi.PluginSet{
            Enabled: []schedulerapi.Plugin{
                {Name: defaultbinder.Name},
            },
        },
    }
}
  • PreFilter
    PreFilter plugins perform up-front checks. They run only once per scheduling cycle, and if any of them returns an error the current attempt is aborted.
  • Filter
    Filter plugins rule out nodes that cannot run the Pod. Nodes can be checked concurrently, and every node is evaluated by each Filter plugin; if one plugin rejects a node or fails, the remaining plugins are skipped for that node. Filters may run more than once within a scheduling cycle. (See the plugin sketch after this list.)
  • Score
    Score plugins rate every node that survived filtering; the scores from all Score plugins are summed per node and the node with the highest total is selected.
  • Bind
    The selected node's name is written into the Pod and persisted through the API server.
  • Preemption
    If no suitable node is found in a scheduling cycle, preemption runs. The overall steps are:
  1. Check that the preemptor is allowed to preempt
    The preemptor must have preemption enabled and a policy that permits it. If it has already preempted once (its NominatedNodeName is set) and that node still holds lower-priority Pods that are already being deleted, further preemption is not allowed.
  2. Find the nodes whose Pods could potentially be preempted
    The reasons for the scheduling failure have been recorded; nodes that failed with UnschedulableAndUnresolvable are excluded.
  3. Pick the node where preemption costs the least
    Nodes are filtered again, taking PodDisruptionBudgets and affinity/anti-affinity into account, and the most suitable node is then chosen by criteria such as the lowest victim priority, the smallest number of victim Pods, and the smallest sum of victim priorities.
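To make the PreFilter/Filter/Score/Bind flow concrete, here is a self-contained toy model of a scheduling framework. None of these types are the real kube-scheduler API (the real interfaces live in the scheduler's framework package and their signatures change between releases); the point is only to show how plugins registered at extension points are strung together in one scheduling cycle.

package main

import "fmt"

// Toy types standing in for the real framework objects (names are invented).
type Pod struct{ Name string }
type Node struct {
	Name    string
	FreeCPU int
}

// Extension-point interfaces, mirroring the idea of Filter/Score plugins.
type FilterPlugin interface {
	Name() string
	Filter(pod *Pod, node *Node) bool // false == node is rejected
}
type ScorePlugin interface {
	Name() string
	Score(pod *Pod, node *Node) int
}

// A "profile": the plugins enabled at each extension point.
type Framework struct {
	filters []FilterPlugin
	scorers []ScorePlugin
}

// One scheduling cycle: filter every node, then score the survivors.
func (fw *Framework) Schedule(pod *Pod, nodes []*Node) (best *Node) {
	bestScore := -1
	for _, n := range nodes {
		ok := true
		for _, f := range fw.filters {
			if !f.Filter(pod, n) {
				ok = false
				break
			}
		}
		if !ok {
			continue
		}
		score := 0
		for _, s := range fw.scorers {
			score += s.Score(pod, n)
		}
		if score > bestScore {
			bestScore, best = score, n
		}
	}
	return best
}

// Example plugins: a CPU-fit filter and a "most free CPU wins" scorer.
type cpuFit struct{ need int }

func (c cpuFit) Name() string                { return "CPUFit" }
func (c cpuFit) Filter(_ *Pod, n *Node) bool { return n.FreeCPU >= c.need }

type mostFreeCPU struct{}

func (mostFreeCPU) Name() string               { return "MostFreeCPU" }
func (mostFreeCPU) Score(_ *Pod, n *Node) int  { return n.FreeCPU }

func main() {
	fw := &Framework{
		filters: []FilterPlugin{cpuFit{need: 2}},
		scorers: []ScorePlugin{mostFreeCPU{}},
	}
	nodes := []*Node{{"node-a", 1}, {"node-b", 4}, {"node-c", 8}}
	if n := fw.Schedule(&Pod{Name: "demo"}, nodes); n != nil {
		fmt.Println("selected node:", n.Name) // node-c
	}
}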

3. Kubernetes scheduler extension

The plugins that ship with the scheduler cover only the basics; in practice you will almost certainly need business-specific plugins of your own. The new framework offers the following ways to extend the scheduler:

  1. Implement the plugin inside the scheduler source tree
    Follow the existing code organization and compile your plugin in. This demands a lot from the implementer: you must know the whole Kubernetes architecture and the details of every module inside out, or you risk breaking the cluster. Not recommended.
  2. Extend over HTTP (the scheduler extender), as sketched after this list
    The extension hooks for this already exist in the scheduler source. You can implement an HTTP server in any language that receives the scheduler's arguments and applies your own logic. This is the commonly recommended approach, but the scheduler has to call the extender over HTTP(S), which adds latency; the extender cannot cancel scheduling or share the scheduler's cache, so it has to maintain a cache of its own.
  3. Modify the main function in the kube-scheduler code
    Write your own registration, register your plugins as functions, and compile them together with the scheduler source. This avoids the problems of approach 2; it does require touching a little source code, but far less than approach 1, and it does not affect the scheduler's core logic.
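For approach 2, an extender is just an HTTP endpoint that the scheduler calls with the Pod and the candidate nodes, wired up through the scheduler's extender configuration. The sketch below uses simplified request/response structs whose field names follow my reading of the extender protocol (the real types live in k8s.io/kube-scheduler/extender/v1, so verify them against your version); the port, path and the "gpu" node-name check are made up for illustration.

package main

import (
	"encoding/json"
	"log"
	"net/http"
	"strings"
)

// Minimal stand-ins for the extender request/response payloads.
// Field names are my assumption about the protocol; check them before use.
type extenderArgs struct {
	NodeNames *[]string `json:"NodeNames,omitempty"`
}
type extenderFilterResult struct {
	NodeNames   *[]string         `json:"NodeNames,omitempty"`
	FailedNodes map[string]string `json:"FailedNodes,omitempty"`
	Error       string            `json:"Error,omitempty"`
}

// filter keeps only nodes whose name carries a (hypothetical) "gpu" marker.
func filter(w http.ResponseWriter, r *http.Request) {
	var args extenderArgs
	if err := json.NewDecoder(r.Body).Decode(&args); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	kept := []string{}
	failed := map[string]string{}
	if args.NodeNames != nil {
		for _, name := range *args.NodeNames {
			if strings.Contains(name, "gpu") {
				kept = append(kept, name)
			} else {
				failed[name] = "node has no gpu marker"
			}
		}
	}
	json.NewEncoder(w).Encode(extenderFilterResult{NodeNames: &kept, FailedNodes: failed})
}

func main() {
	http.HandleFunc("/filter", filter)
	log.Fatal(http.ListenAndServe(":8888", nil))
}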

4. References

  1. https://github.com/kubernetes/enhancements/blob/master/keps/sig-scheduling/20180409-scheduling-framework.md#configuring-plugins
  2. https://github.com/kubernetes/enhancements/tree/master/keps/sig-scheduling/1819-scheduler-extender
  3. https://github.com/kubernetes/community/blob/b3349d5b1354df814b67bbdee6890477f3c250cb/contributors/design-proposals/scheduling/pod-preemption.md
  4. https://draveness.me/system-design-scheduler/
  5. https://developer.ibm.com/technologies/containers/articles/creating-a-custom-kube-scheduler/
  6. https://github.com/AliyunContainerService/gpushare-scheduler-extender

朱伟 published an article · August 14

A chat about containerd

Copying is prohibited; when reposting, please credit the source and the author.
Since containerd was split out of Docker, its feature set has grown richer and its adopters broader. It is no longer just Docker that depends on it: almost everywhere container technology shows up in cloud-native applications, containerd is underneath.

Image source: https://www.slideshare.net/Docker/leveraging-the-power-of-containerd-events-evan-hazlett

1. What containerd does

Below containerd sits runc. containerd's main job is to hand runc an OCI (Open Container Initiative) runtime spec. The OCI defines two standards, one for images and one for the container runtime; the runtime standard boils down to a config.json file plus a rootfs. See the official GitHub repositories or the references at the end for details. containerd is an implementation of the OCI standards.

(Figure: containerd's position in the Docker architecture)
2. containerd architecture
(Figure: containerd architecture)

containerd ships with the ctr command-line tool and a gRPC API for managing the container lifecycle. It also rethought image management: instead of Docker's graphdriver, it uses snapshots. In the container world there are two kinds of filesystems: overlay filesystems (AUFS, OverlayFS) and snapshotting filesystems (devicemapper, btrfs, ZFS).

(Figure: data flow in containerd)

containerd's work breaks down roughly into the following steps:
1) Pull the image and store it into metadata (a bolt key-value database) and the content store. Blobs in the content store carry labels while the records in metadata do not; the content store on its own is mainly used for standalone operation or testing, and queries still go through metadata. The content tracked in metadata consists of the manifest index, the manifests, config.json and the image layers. After the pull, an image record is created in metadata's image bucket and the base snapshots are built.
The metadata schema:

// keys.
//  ├──version : <varint>                        - Latest version, see migrations
//  └──v1                                        - Schema version bucket
//     ╘══*namespace*
//        ├──labels
//        │  ╘══*key* : <string>                 - Label value
//        ├──image
//        │  ╘══*image name*
//        │     ├──createdat : <binary time>     - Created at
//        │     ├──updatedat : <binary time>     - Updated at
//        │     ├──target
//        │     │  ├──digest : <digest>          - Descriptor digest
//        │     │  ├──mediatype : <string>       - Descriptor media type
//        │     │  └──size : <varint>            - Descriptor size
//        │     └──labels
//        │        ╘══*key* : <string>           - Label value
//        ├──containers
//        │  ╘══*container id*
//        │     ├──createdat : <binary time>     - Created at
//        │     ├──updatedat : <binary time>     - Updated at
//        │     ├──spec : <binary>               - Proto marshaled spec
//        │     ├──image : <string>              - Image name
//        │     ├──snapshotter : <string>        - Snapshotter name
//        │     ├──snapshotKey : <string>        - Snapshot key
//        │     ├──runtime
//        │     │  ├──name : <string>            - Runtime name
//        │     │  ├──extensions
//        │     │  │  ╘══*name* : <binary>       - Proto marshaled extension
//        │     │  └──options : <binary>         - Proto marshaled options
//        │     └──labels
//        │        ╘══*key* : <string>           - Label value
//        ├──snapshots
//        │  ╘══*snapshotter*
//        │     ╘══*snapshot key*
//        │        ├──name : <string>            - Snapshot name in backend
//        │        ├──createdat : <binary time>  - Created at
//        │        ├──updatedat : <binary time>  - Updated at
//        │        ├──parent : <string>          - Parent snapshot name
//        │        ├──children
//        │        │  ╘══*snapshot key* : <nil>  - Child snapshot reference
//        │        └──labels
//        │           ╘══*key* : <string>        - Label value
//        ├──content
//        │  ├──blob
//        │  │  ╘══*blob digest*
//        │  │     ├──createdat : <binary time>  - Created at
//        │  │     ├──updatedat : <binary time>  - Updated at
//        │  │     ├──size : <varint>            - Blob size
//        │  │     └──labels
//        │  │        ╘══*key* : <string>        - Label value
//        │  └──ingests
//        │     ╘══*ingest reference*
//        │        ├──ref : <string>             - Ingest reference in backend
//        │        ├──expireat : <binary time>   - Time to expire ingest
//        │        └──expected : <digest>        - Expected commit digest
//        └──leases
//           ╘══*lease id*
//              ├──createdat : <binary time>     - Created at
//              ├──labels
//              │  ╘══*key* : <string>           - Label value
//              ├──snapshots
//              │  ╘══*snapshotter*
//              │     ╘══*snapshot key* : <nil>  - Snapshot reference
//              ├──content
//              │  ╘══*blob digest* : <nil>      - Content blob reference
//              └──ingests
//                 ╘══*ingest reference* : <nil> - Content ingest reference

Note that containerd has the concept of namespaces, visible in the schema above.
2) When a container is run, the content, snapshot and image information in metadata is used to build an active snapshot (roughly the container layer in Docker terms) and a bundle, and then runc is invoked to start the container.
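Both steps can be driven from containerd's Go client. The sketch below follows the pattern in containerd's getting-started documentation: pull an image (content store plus metadata), create a container backed by a new snapshot and an OCI spec, then start a task, which ends up invoking runc. The socket path, namespace, image reference and container ID are placeholders.

package main

import (
	"context"
	"log"

	"github.com/containerd/containerd"
	"github.com/containerd/containerd/cio"
	"github.com/containerd/containerd/namespaces"
	"github.com/containerd/containerd/oci"
)

func main() {
	// Connect to containerd's gRPC socket (default path).
	client, err := containerd.New("/run/containerd/containerd.sock")
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Everything in containerd lives inside a namespace.
	ctx := namespaces.WithNamespace(context.Background(), "example")

	// Pull an image; blobs go into the content store, records into bolt.
	image, err := client.Pull(ctx, "docker.io/library/redis:alpine", containerd.WithPullUnpack)
	if err != nil {
		log.Fatal(err)
	}

	// Create a container: a new (active) snapshot plus an OCI runtime spec.
	container, err := client.NewContainer(ctx, "redis-demo",
		containerd.WithNewSnapshot("redis-demo-snapshot", image),
		containerd.WithNewSpec(oci.WithImageConfig(image)),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer container.Delete(ctx, containerd.WithSnapshotCleanup)

	// A task is the running instance; under the hood runc is invoked.
	task, err := container.NewTask(ctx, cio.NewCreator(cio.WithStdio))
	if err != nil {
		log.Fatal(err)
	}
	defer task.Delete(ctx)

	if err := task.Start(ctx); err != nil {
		log.Fatal(err)
	}
	log.Printf("container started with pid %d", task.Pid())
}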

The plugin mechanism in containerd

containerd exposes the ctr command line and a gRPC API for managing containers; ctr itself talks to the containerd daemon over gRPC. The gRPC server is not assembled the usual way, with every service imported into a single entry package and registered there. Instead, containerd uses a plugin registration mechanism: the task, content, snapshot, namespace, event, containers and other services are registered as plugins and served that way.
These are containerd's built-in plugins:

[root@master containerd]# ctr plugin ls
TYPE                            ID                       PLATFORMS      STATUS
io.containerd.content.v1        content                  -              ok
io.containerd.snapshotter.v1    aufs                     linux/amd64    error
io.containerd.snapshotter.v1    btrfs                    linux/amd64    error
io.containerd.snapshotter.v1    devmapper                linux/amd64    error
io.containerd.snapshotter.v1    native                   linux/amd64    ok
io.containerd.snapshotter.v1    overlayfs                linux/amd64    ok
io.containerd.snapshotter.v1    zfs                      linux/amd64    error
io.containerd.metadata.v1       bolt                     -              ok
io.containerd.differ.v1         walking                  linux/amd64    ok
io.containerd.gc.v1             scheduler                -              ok
io.containerd.service.v1        introspection-service    -              ok
io.containerd.service.v1        containers-service       -              ok
io.containerd.service.v1        content-service          -              ok
io.containerd.service.v1        diff-service             -              ok
io.containerd.service.v1        images-service           -              ok
io.containerd.service.v1        leases-service           -              ok
io.containerd.service.v1        namespaces-service       -              ok
io.containerd.service.v1        snapshots-service        -              ok
io.containerd.runtime.v1        linux                    linux/amd64    ok
io.containerd.runtime.v2        task                     linux/amd64    ok
io.containerd.monitor.v1        cgroups                  linux/amd64    ok
io.containerd.service.v1        tasks-service            -              ok
io.containerd.internal.v1       restart                  -              ok
io.containerd.grpc.v1           containers               -              ok
io.containerd.grpc.v1           content                  -              ok
io.containerd.grpc.v1           diff                     -              ok
io.containerd.grpc.v1           events                   -              ok
io.containerd.grpc.v1           healthcheck              -              ok
io.containerd.grpc.v1           images                   -              ok
io.containerd.grpc.v1           leases                   -              ok
io.containerd.grpc.v1           namespaces               -              ok
io.containerd.internal.v1       opt                      -              ok
io.containerd.grpc.v1           snapshots                -              ok
io.containerd.grpc.v1           tasks                    -              ok
io.containerd.grpc.v1           version                  -              ok

There are two ways to add external plugins:
1. Pass a binary plugin to containerd on the command line.
2. Configure a proxy plugin in containerd's configuration file.
The runtime comes in v1 and v2 versions in containerd, and it can be selected on the ctr run command line:

ctr run --runtime io.containerd.runc.v1

A custom snapshotter is configured as a proxy plugin in /etc/containerd/config.toml, containerd's default configuration file:

[proxy_plugins]
  [proxy_plugins.customsnapshot]
    type = "snapshot"
    address = "/var/run/mysnapshotter.sock"

Proxy plugins are started together with the built-in plugins when containerd starts.

Events in containerd

containerd implements publish/subscribe for events: after every operation on a resource, a corresponding event is pushed to subscribers. This can be used to monitor containerd or to build hook-like functionality.
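A minimal subscriber looks roughly like this. It is a sketch based on my reading of the containerd Go client (check the exact Subscribe signature for the version you use); the socket path and namespace are placeholders.

package main

import (
	"context"
	"log"

	"github.com/containerd/containerd"
	"github.com/containerd/containerd/namespaces"
)

func main() {
	client, err := containerd.New("/run/containerd/containerd.sock")
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	ctx := namespaces.WithNamespace(context.Background(), "example")

	// Subscribe to the event stream; optional filter expressions can narrow it.
	ch, errs := client.Subscribe(ctx)
	for {
		select {
		case env := <-ch:
			// Each envelope carries the namespace, the topic and a typed payload.
			log.Printf("event: ns=%s topic=%s", env.Namespace, env.Topic)
		case err := <-errs:
			log.Fatal(err)
		}
	}
}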

Namespaces in containerd

As the metadata schema above shows, containerd has the concept of namespaces, similar to multi-tenancy in OpenStack: different businesses and applications can be isolated from one another. For example, Kubernetes and Docker use different containerd namespaces when they run on top of containerd.
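A quick way to see that isolation is to run the same query under different namespaces. This is only a sketch, and the namespace names below are just common examples ("moby" for Docker, "k8s.io" for the Kubernetes integration).

package main

import (
	"context"
	"log"

	"github.com/containerd/containerd"
	"github.com/containerd/containerd/namespaces"
)

func main() {
	client, err := containerd.New("/run/containerd/containerd.sock")
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// The same API call returns different results depending on the namespace
	// bound to the context.
	for _, ns := range []string{"moby", "k8s.io", "default"} {
		ctx := namespaces.WithNamespace(context.Background(), ns)
		images, err := client.ListImages(ctx)
		if err != nil {
			log.Fatal(err)
		}
		log.Printf("namespace %q has %d images", ns, len(images))
	}
}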

3. Summary

After the containerd code was split out of Docker it became rich and capable enough that essentially all of the container-related plumbing in cloud computing builds on it. Its image management is a nice bit of innovation: Docker manages images with graph drivers, containerd with snapshots. Snapshots fit because incremental snapshots have a strict parent-child relationship, which matches the layered image model exactly. Of the two kinds of filesystems in the container world, block-level and filesystem-level, containerd presents the block-level ones as layers and the filesystem-level ones as snapshots.

4. References

1. runtime spec
2. https://github.com/containerd/containerd/blob/master/PLUGINS.md
3. https://github.com/containerd/containerd/blob/master/design/data-flow.md
4. https://www.slideshare.net/Docker/assessing-container-runtime-performance-with-bucketbench-by-phil-estes-ibm
5. https://cizixs.com/2017/11/05/oci-and-runc/
6. https://heychenbin.github.io/post/containerd_intro/
7. https://www.jianshu.com/p/86296691ca49
8. https://blog.mobyproject.org/where-are-containerds-graph-drivers-145fc9b7255
9. https://container42.com/2017/10/14/containerd-deep-dive-intro/


朱伟 published an article · July 11

How etcd uses the raft protocol

1. Introduction

etcd is a distributed, consistent key-value store built on the raft protocol. This article does not cover how to use etcd; instead it walks through the example shipped in the etcd source tree to learn how etcd uses the raft package.


2. Implementation

The example lives in the contrib directory of the etcd source tree:

tree -L 1
.
├── Makefile
├── NOTICE
├── OWNERS
├── Procfile
├── Procfile.v2
├── README.md
├── ROADMAP.md
├── auth
├── bill-of-materials.json
├── bill-of-materials.override.json
├── build
├── build.bat
├── build.ps1
├── client
├── clientv3
├── code-of-conduct.md
├── contrib    # today's focus
├── docs
├── embed
├── etcd.conf.yml.sample
├── etcdctl
├── etcdmain
├── etcdserver
├── functional
├── functional.yaml
├── go.mod
├── go.sum
├── hack
├── integration
├── lease
├── logos
├── main.go
├── main_test.go
├── mvcc
├── pkg
├── proxy
├── raft
├── scripts
├── test
├── tests
├── tools
├── vendor
├── version
└── wal
tree -L 1 contrib/raftexample/
contrib/raftexample/
├── Procfile
├── README.md
├── doc.go
├── httpapi.go
├── kvstore.go
├── kvstore_test.go
├── listener.go
├── main.go
├── raft.go
└── raftexample_test.go

Start with the entry point, main.go:

func main() {
    cluster := flag.String("cluster", "http://127.0.0.1:9021", "comma separated cluster peers")
    id := flag.Int("id", 1, "node ID")
    kvport := flag.Int("port", 9121, "key-value server port")
    join := flag.Bool("join", false, "join an existing cluster")
    flag.Parse()

    proposeC := make(chan string)
    defer close(proposeC)
    confChangeC := make(chan raftpb.ConfChange)
    defer close(confChangeC)

    // raft provides a commit stream for the proposals from the http api
    var kvs *kvstore
    getSnapshot := func() ([]byte, error) { return kvs.getSnapshot() }
    commitC, errorC, snapshotterReady := newRaftNode(*id, strings.Split(*cluster, ","), *join, getSnapshot, proposeC, confChangeC)

    kvs = newKVStore(<-snapshotterReady, proposeC, commitC, errorC)

    // the key-value http handler will propose updates to raft
    serveHttpKVAPI(kvs, *kvport, confChangeC, errorC)
}

After some initialization, let's look at newRaftNode in raft.go:

// newRaftNode initiates a raft instance and returns a committed log entry
// channel and error channel. Proposals for log updates are sent over the
// provided the proposal channel. All log entries are replayed over the
// commit channel, followed by a nil message (to indicate the channel is
// current), then new log entries. To shutdown, close proposeC and read errorC.
func newRaftNode(id int, peers []string, join bool, getSnapshot func() ([]byte, error), proposeC <-chan string,
    confChangeC <-chan raftpb.ConfChange) (<-chan *string, <-chan error, <-chan *snap.Snapshotter) {

    commitC := make(chan *string)
    errorC := make(chan error)
    // initialize the raftNode. This is etcd's application-level raft node; the raft protocol layer has its own node type as well. The application-level struct embeds a protocol-level node, and the two have a one-to-one relationship.
    rc := &raftNode{
        proposeC:    proposeC,
        confChangeC: confChangeC,
        commitC:     commitC,
        errorC:      errorC,
        id:          id,
        peers:       peers,
        join:        join,
        waldir:      fmt.Sprintf("raftexample-%d", id),
        snapdir:     fmt.Sprintf("raftexample-%d-snap", id),
        getSnapshot: getSnapshot,
        snapCount:   defaultSnapshotCount,
        stopc:       make(chan struct{}),
        httpstopc:   make(chan struct{}),
        httpdonec:   make(chan struct{}),

        snapshotterReady: make(chan *snap.Snapshotter, 1),
        // rest of structure populated after WAL replay
    }
    go rc.startRaft()
    return commitC, errorC, rc.snapshotterReady
}

看一下 startRaft 做了什么,还是在当前文件下

func (rc *raftNode) startRaft() {
    if !fileutil.Exist(rc.snapdir) {
        if err := os.Mkdir(rc.snapdir, 0750); err != nil {
            log.Fatalf("raftexample: cannot create dir for snapshot (%v)", err)
        }
    }
    // create the snapshotter
    rc.snapshotter = snap.New(zap.NewExample(), rc.snapdir)
    rc.snapshotterReady <- rc.snapshotter
    // replay the WAL into memory; etcd keeps an in-memory index that queries go through (key values and revisions)
    oldwal := wal.Exist(rc.waldir)
    rc.wal = rc.replayWAL()

    rpeers := make([]raft.Peer, len(rc.peers))
    for i := range rpeers {
        rpeers[i] = raft.Peer{ID: uint64(i + 1)}
    }
    // configuration for the raft consensus protocol
    c := &raft.Config{
        ID:                        uint64(rc.id),
        ElectionTick:              10,
        HeartbeatTick:             1,
        Storage:                   rc.raftStorage,
        MaxSizePerMsg:             1024 * 1024,
        MaxInflightMsgs:           256,
        MaxUncommittedEntriesSize: 1 << 30,
    }
    // initialize the protocol-level raft node
    if oldwal {
        rc.node = raft.RestartNode(c)
    } else {
        startPeers := rpeers
        if rc.join {
            startPeers = nil
        }
        rc.node = raft.StartNode(c, startPeers)
    }
    // initialize the transport, the bridge that carries messages between cluster members; the raft package only keeps messages and state consistent, it does not ship them over the network, so the application layer has to implement that part
    rc.transport = &rafthttp.Transport{
        Logger:      zap.NewExample(),
        ID:          types.ID(rc.id),
        ClusterID:   0x1000,
        Raft:        rc,
        ServerStats: stats.NewServerStats("", ""),
        LeaderStats: stats.NewLeaderStats(strconv.Itoa(rc.id)),
        ErrorC:      make(chan error),
    }
    // record the peers in the cluster so we can communicate with them
    rc.transport.Start()
    for i := range rc.peers {
        if i+1 != rc.id {
            rc.transport.AddPeer(types.ID(i+1), []string{rc.peers[i]})
        }
    }
    // serveRaft is the transport's HTTP server handling peer traffic; skip it for now
    go rc.serveRaft()
    go rc.serveChannels()
}

Now serveChannels, which is the heart of the example; let's walk through the whole flow:

func (rc *raftNode) serveChannels() {
    snap, err := rc.raftStorage.Snapshot()
    if err != nil {
        panic(err)
    }
    rc.confState = snap.Metadata.ConfState
    rc.snapshotIndex = snap.Metadata.Index
    rc.appliedIndex = snap.Metadata.Index

    defer rc.wal.Close()

    ticker := time.NewTicker(100 * time.Millisecond)
    defer ticker.Stop()
     // every operation against etcd (create, update, delete, query) is a proposal; proposals are handed to the raft layer, which keeps the state of all cluster members consistent
    // send proposals over raft
    go func() {
        confChangeCount := uint64(0)

        for rc.proposeC != nil && rc.confChangeC != nil {
            select {
            // a proposal received here is forwarded into raft; later we will see the HTTP handler feeding this proposeC
            case prop, ok := <-rc.proposeC:
                if !ok {
                    rc.proposeC = nil
                } else {
                    // blocks until accepted by raft state machine
                    // hand the proposal to raft
                    rc.node.Propose(context.TODO(), []byte(prop))
                }

            case cc, ok := <-rc.confChangeC:
                if !ok {
                    rc.confChangeC = nil
                } else {
                    confChangeCount++
                    cc.ID = confChangeCount
                    rc.node.ProposeConfChange(context.TODO(), cc)
                }
            }
        }
        // client closed channel; shutdown raft if not already
        close(rc.stopc)
    }()

    // event loop on raft state machine updates
    for {
        select {
        case <-ticker.C:
            rc.node.Tick()

        // store raft entries to wal, then publish over commit channel
        // once the raft layer has processed things it hands a message back to the application layer for etcd to apply; what raft does internally is not analyzed here and will be covered in a dedicated article
        case rd := <-rc.node.Ready():
            // persist to the WAL
            rc.wal.Save(rd.HardState, rd.Entries)
            if !raft.IsEmptySnap(rd.Snapshot) {
                // save the snapshot
                rc.saveSnap(rd.Snapshot)
    rc.raftStorage.ApplySnapshot(rd.Snapshot)
                rc.publishSnapshot(rd.Snapshot)
            }
            // append the entries to the in-memory raft storage
            rc.raftStorage.Append(rd.Entries)
            // send messages to the other cluster members
            rc.transport.Send(rd.Messages)
            if ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries)); !ok {
                rc.stop()
                return
            }
            rc.maybeTriggerSnapshot()
            // signal raft that this batch has been applied, so it can deliver the next one
            rc.node.Advance()

        case err := <-rc.transport.ErrorC:
            rc.writeError(err)
            return

        case <-rc.stopc:
            rc.stop()
            return
        }
    }
}

Back in main.go: we went quite deep into newRaftNode, which ultimately returns several values:

    commitC, errorC, snapshotterReady := newRaftNode(*id, strings.Split(*cluster, ","), *join, getSnapshot, proposeC, confChangeC)

Continuing from that line, the next call is newKVStore:

kvs = newKVStore(<-snapshotterReady, proposeC, commitC, errorC)

This is a simple in-memory key-value store that stands in for etcd's real storage (etcd v3 uses bolt, a key-value store written in Go); since the example only wants to demonstrate how raft is used, a map is enough. The code simply reads from the commitC channel and stores what it receives into the map, so we won't go through it line by line.
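For reference, the core loop in kvstore.go looks roughly like the simplified fragment below (snapshot loading and some error handling trimmed; it assumes the bytes, encoding/gob, log and sync imports):

// Decode each committed entry coming out of raft and apply it to the map.
type kv struct {
	Key string
	Val string
}

type kvstore struct {
	mu       sync.RWMutex
	kvStore  map[string]string
	proposeC chan<- string
}

func (s *kvstore) readCommits(commitC <-chan *string, errorC <-chan error) {
	for data := range commitC {
		if data == nil {
			// nil signals "replay finished / load snapshot"; skipped in this sketch
			continue
		}
		var dataKv kv
		dec := gob.NewDecoder(bytes.NewBufferString(*data))
		if err := dec.Decode(&dataKv); err != nil {
			log.Fatalf("raftexample: could not decode message (%v)", err)
		}
		s.mu.Lock()
		s.kvStore[dataKv.Key] = dataKv.Val
		s.mu.Unlock()
	}
	if err, ok := <-errorC; ok {
		log.Fatal(err)
	}
}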

serveHttpKVAPI(kvs, *kvport, confChangeC, errorC)

Right after that the HTTP service is started; the implementation is in httpapi.go:

func (h *httpKVAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    key := r.RequestURI
    switch {
    // look at the PUT operation
    case r.Method == "PUT":
        v, err := ioutil.ReadAll(r.Body)
        if err != nil {
            log.Printf("Failed to read on PUT (%v)\n", err)
            http.Error(w, "Failed on PUT", http.StatusBadRequest)
            return
        }
        // store is the in-memory kvstore
        h.store.Propose(key, string(v))

        // Optimistic-- no waiting for ack from raft. Value is not yet
        // committed so a subsequent GET on the key may return old value
        w.WriteHeader(http.StatusNoContent)
    case r.Method == "GET":
        if v, ok := h.store.Lookup(key); ok {
            w.Write([]byte(v))
        } else {
            http.Error(w, "Failed to GET", http.StatusNotFound)
        }
    case r.Method == "POST":
        url, err := ioutil.ReadAll(r.Body)
        if err != nil {
            log.Printf("Failed to read on POST (%v)\n", err)
            http.Error(w, "Failed on POST", http.StatusBadRequest)
            return
        }

        nodeId, err := strconv.ParseUint(key[1:], 0, 64)
        if err != nil {
            log.Printf("Failed to convert ID for conf change (%v)\n", err)
            http.Error(w, "Failed on POST", http.StatusBadRequest)
            return
        }

        cc := raftpb.ConfChange{
            Type:    raftpb.ConfChangeAddNode,
            NodeID:  nodeId,
            Context: url,
        }
        h.confChangeC <- cc

        // As above, optimistic that raft will apply the conf change
        w.WriteHeader(http.StatusNoContent)
    case r.Method == "DELETE":
        nodeId, err := strconv.ParseUint(key[1:], 0, 64)
        if err != nil {
            log.Printf("Failed to convert ID for conf change (%v)\n", err)
            http.Error(w, "Failed on DELETE", http.StatusBadRequest)
            return
        }

        cc := raftpb.ConfChange{
            Type:   raftpb.ConfChangeRemoveNode,
            NodeID: nodeId,
        }
        h.confChangeC <- cc

        // As above, optimistic that raft will apply the conf change
        w.WriteHeader(http.StatusNoContent)
    default:
        w.Header().Set("Allow", "PUT")
        w.Header().Add("Allow", "GET")
        w.Header().Add("Allow", "POST")
        w.Header().Add("Allow", "DELETE")
        http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
    }
}

// serveHttpKVAPI starts a key-value server with a GET/PUT API and listens.
func serveHttpKVAPI(kv *kvstore, port int, confChangeC chan<- raftpb.ConfChange, errorC <-chan error) {
    srv := http.Server{
        Addr: ":" + strconv.Itoa(port),
        Handler: &httpKVAPI{
            store:       kv,
            confChangeC: confChangeC,
        },
    }
    go func() {
        if err := srv.ListenAndServe(); err != nil {
            log.Fatal(err)
        }
    }()

    // exit when raft goes down
    if err, ok := <-errorC; ok {
        log.Fatal(err)
    }
}
func (s *kvstore) Propose(k string, v string) {
    var buf bytes.Buffer
    if err := gob.NewEncoder(&buf).Encode(kv{k, v}); err != nil {
        log.Fatal(err)
    }
     // send the request data into proposeC; as analyzed above, serveChannels is listening on this channel
    s.proposeC <- buf.String()
}

That completes the raft flow in the example; it is fairly straightforward. A follow-up article will dig into the internals of the raft protocol itself.


朱伟 published an article · May 29

NSQ Internals (Part 2)

The previous article gave a brief introduction to NSQ and looked at how it is used from the Go client; this one analyzes the nsqd code.

nsqd code analysis

What nsqd does

  • nsqd handles the creation of topics and channels and the storage and delivery of messages
  • nsqd registers its service information (IP and port) and its metadata (topics, channels) with nsqlookupd, and it also queries nsqlookupd for topic and channel information

(Figure: NSQ architecture)
nsqadmin is a simple management UI. It shows topics, channels, the consumers on each channel and other basic information, all fetched from nsqlookupd. Topics and channels can also be created through nsqadmin; they are created in nsqlookupd and kept in its memory, and nsqd pulls them down and creates them locally at an appropriate moment.
nsqd startup

func (n *NSQD) Main() error {
    ctx := &context{n}

    exitCh := make(chan error)
    var once sync.Once
    exitFunc := func(err error) {
        once.Do(func() {
            if err != nil {
                n.logf(LOG_FATAL, "%s", err)
            }
            exitCh <- err
        })
    }

    n.tcpServer.ctx = ctx
    // start the TCP listener
    n.waitGroup.Wrap(func() {
        exitFunc(protocol.TCPServer(n.tcpListener, n.tcpServer, n.logf))
    })
    // start the HTTP listener
    httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired)
    n.waitGroup.Wrap(func() {
        exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf))
    })

    if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
        httpsServer := newHTTPServer(ctx, true, true)
        n.waitGroup.Wrap(func() {
            exitFunc(http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf))
        })
    }
   // queue scan loop, handling in-flight timeouts and deferred messages
    n.waitGroup.Wrap(n.queueScanLoop)
    // register this nsqd's metadata with nsqlookupd
    n.waitGroup.Wrap(n.lookupLoop)
    if n.getOpts().StatsdAddress != "" {
        n.waitGroup.Wrap(n.statsdLoop)
    }

    err := <-exitCh
    return err
}
func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) error {
    logf(lg.INFO, "TCP: listening on %s", listener.Addr())

    var wg sync.WaitGroup

    for {
       // wait for incoming connections
        clientConn, err := listener.Accept()
        if err != nil {
            if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
                logf(lg.WARN, "temporary Accept() failure - %s", err)
                runtime.Gosched()
                continue
            }
            // theres no direct way to detect this error because it is not exposed
            if !strings.Contains(err.Error(), "use of closed network connection") {
                return fmt.Errorf("listener.Accept() error - %s", err)
            }
            break
        }

        wg.Add(1)
        // handle each connection in its own goroutine
        go func() {
            handler.Handle(clientConn)
            wg.Done()
        }()
    }

    // wait to return until all handler goroutines complete
    wg.Wait()

    logf(lg.INFO, "TCP: closing %s", listener.Addr())

    return nil
}
func (p *tcpServer) Handle(clientConn net.Conn) {
    p.ctx.nsqd.logf(LOG_INFO, "TCP: new client(%s)", clientConn.RemoteAddr())

    // The client should initialize itself by sending a 4 byte sequence indicating
    // the version of the protocol that it intends to communicate, this will allow us
    // to gracefully upgrade the protocol away from text/line oriented to whatever...
    buf := make([]byte, 4)
    _, err := io.ReadFull(clientConn, buf)
    if err != nil {
        p.ctx.nsqd.logf(LOG_ERROR, "failed to read protocol version - %s", err)
        clientConn.Close()
        return
    }
    // negotiate the protocol version
    protocolMagic := string(buf)

    p.ctx.nsqd.logf(LOG_INFO, "CLIENT(%s): desired protocol magic '%s'",
        clientConn.RemoteAddr(), protocolMagic)

    var prot protocol.Protocol
    switch protocolMagic {
    case "  V2":
        prot = &protocolV2{ctx: p.ctx}
    default:
        protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL"))
        clientConn.Close()
        p.ctx.nsqd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'",
            clientConn.RemoteAddr(), protocolMagic)
        return
    }

    p.conns.Store(clientConn.RemoteAddr(), clientConn)
    // enter the main loop
    err = prot.IOLoop(clientConn)
    if err != nil {
        p.ctx.nsqd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err)
    }

    p.conns.Delete(clientConn.RemoteAddr())
}
func (p *protocolV2) IOLoop(conn net.Conn) error {
    var err error
    var line []byte
    var zeroTime time.Time

    clientID := atomic.AddInt64(&p.ctx.nsqd.clientIDSequence, 1)
    client := newClientV2(clientID, conn, p.ctx)
    p.ctx.nsqd.AddClient(client.ID, client)

    // synchronize the startup of messagePump in order
    // to guarantee that it gets a chance to initialize
    // goroutine local state derived from client attributes
    // and avoid a potential race with IDENTIFY (where a client
    // could have changed or disabled said attributes)
    messagePumpStartedChan := make(chan bool)
    go p.messagePump(client, messagePumpStartedChan)
    // message pump: pushes messages to the consumer
    <-messagePumpStartedChan

    for {
    // set the socket read deadline; if the consumer sends nothing within the window the connection is closed, causing the consumer to exit
        if client.HeartbeatInterval > 0 {
            client.SetReadDeadline(time.Now().Add(client.HeartbeatInterval * 2))
        } else {
            client.SetReadDeadline(zeroTime)
        }

        // ReadSlice does not allocate new space for the data each request
        // ie. the returned slice is only valid until the next call to it
        // read the next command sent by the producer or consumer
        line, err = client.Reader.ReadSlice('\n')
        if err != nil {
            if err == io.EOF {
                err = nil
            } else {
                err = fmt.Errorf("failed to read command - %s", err)
            }
            break
        }

        // trim the '\n'
        line = line[:len(line)-1]
        // optionally trim the '\r'
        if len(line) > 0 && line[len(line)-1] == '\r' {
            line = line[:len(line)-1]
        }
        params := bytes.Split(line, separatorBytes)

        p.ctx.nsqd.logf(LOG_DEBUG, "PROTOCOL(V2): [%s] %s", client, params)

        var response []byte
        // dispatch on the command
        response, err = p.Exec(client, params)
        if err != nil {
            ctx := ""
            if parentErr := err.(protocol.ChildErr).Parent(); parentErr != nil {
                ctx = " - " + parentErr.Error()
            }
            p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, err, ctx)

            sendErr := p.Send(client, frameTypeError, []byte(err.Error()))
            if sendErr != nil {
                p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, sendErr, ctx)
                break
            }

            // errors of type FatalClientErr should forceably close the connection
            if _, ok := err.(*protocol.FatalClientErr); ok {
                break
            }
            continue
        }

        if response != nil {
            err = p.Send(client, frameTypeResponse, response)
            if err != nil {
                err = fmt.Errorf("failed to send response - %s", err)
                break
            }
        }
    }

    p.ctx.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] exiting ioloop", client)
    conn.Close()
    close(client.ExitChan)
    if client.Channel != nil {
        client.Channel.RemoveClient(client.ID)
    }

    p.ctx.nsqd.RemoveClient(client.ID)
    return err
}

Before going further, let's see what a producer's PUB request does inside nsqd:

func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error) {
    var err error

    if len(params) < 2 {
        return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "PUB insufficient number of parameters")
    }

    topicName := string(params[1])
    if !protocol.IsValidTopicName(topicName) {
        return nil, protocol.NewFatalClientErr(nil, "E_BAD_TOPIC",
            fmt.Sprintf("PUB topic name %q is not valid", topicName))
    }

    bodyLen, err := readLen(client.Reader, client.lenSlice)
    if err != nil {
        return nil, protocol.NewFatalClientErr(err, "E_BAD_MESSAGE", "PUB failed to read message body size")
    }

    if bodyLen <= 0 {
        return nil, protocol.NewFatalClientErr(nil, "E_BAD_MESSAGE",
            fmt.Sprintf("PUB invalid message body size %d", bodyLen))
    }

    if int64(bodyLen) > p.ctx.nsqd.getOpts().MaxMsgSize {
        return nil, protocol.NewFatalClientErr(nil, "E_BAD_MESSAGE",
            fmt.Sprintf("PUB message too big %d > %d", bodyLen, p.ctx.nsqd.getOpts().MaxMsgSize))
    }

    messageBody := make([]byte, bodyLen)
    _, err = io.ReadFull(client.Reader, messageBody)
    if err != nil {
        return nil, protocol.NewFatalClientErr(err, "E_BAD_MESSAGE", "PUB failed to read message body")
    }

    if err := p.CheckAuth(client, "PUB", topicName, ""); err != nil {
        return nil, err
    }
    // topics are created lazily in nsqd: a topic is created only when a producer first publishes to it
    topic := p.ctx.nsqd.GetTopic(topicName)
    msg := NewMessage(topic.GenerateID(), messageBody)
    err = topic.PutMessage(msg)
    if err != nil {
        return nil, protocol.NewFatalClientErr(err, "E_PUB_FAILED", "PUB failed "+err.Error())
    }

    client.PublishedMessage(topicName, 1)

    return okBytes, nil
}
// GetTopic performs a thread safe operation
// to return a pointer to a Topic object (potentially new)
func (n *NSQD) GetTopic(topicName string) *Topic {
    // most likely, we already have this topic, so try read lock first.
    n.RLock()
    // if the topic already exists in nsqd, return it directly
    t, ok := n.topicMap[topicName]
    n.RUnlock()
    if ok {
        return t
    }

    n.Lock()

    t, ok = n.topicMap[topicName]
    if ok {
        n.Unlock()
        return t
    }
    deleteCallback := func(t *Topic) {
        n.DeleteExistingTopic(t.name)
    }
    // we will look at this function shortly
    t = NewTopic(topicName, &context{n}, deleteCallback)
    n.topicMap[topicName] = t

    n.Unlock()

    n.logf(LOG_INFO, "TOPIC(%s): created", t.name)
    // topic is created but messagePump not yet started

    // if loading metadata at startup, no lookupd connections yet, topic started after load
    if atomic.LoadInt32(&n.isLoading) == 1 {
        return t
    }

    // if using lookupd, make a blocking call to get the topics, and immediately create them.
    // this makes sure that any message received is buffered to the right channels
    // if nsqlookupd is configured, query it for the topic's channels and create any that do not exist in nsqd yet
    lookupdHTTPAddrs := n.lookupdHTTPAddrs()
    if len(lookupdHTTPAddrs) > 0 {
        channelNames, err := n.ci.GetLookupdTopicChannels(t.name, lookupdHTTPAddrs)
        if err != nil {
            n.logf(LOG_WARN, "failed to query nsqlookupd for channels to pre-create for topic %s - %s", t.name, err)
        }
        for _, channelName := range channelNames {
            if strings.HasSuffix(channelName, "#ephemeral") {
                continue // do not create ephemeral channel with no consumer client
            }
            t.GetChannel(channelName)
        }
    } else if len(n.getOpts().NSQLookupdTCPAddresses) > 0 {
        n.logf(LOG_ERROR, "no available nsqlookupd to query for channels to pre-create for topic %s", t.name)
    }

    // now that all channels are added, start topic messagePump
    t.Start()
    return t
}
// Topic constructor
func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topic {
    t := &Topic{
        name:              topicName,
        channelMap:        make(map[string]*Channel),
        memoryMsgChan:     nil,
        startChan:         make(chan int, 1),
        exitChan:          make(chan int),
        channelUpdateChan: make(chan int),
        ctx:               ctx,
        paused:            0,
        pauseChan:         make(chan int),
        deleteCallback:    deleteCallback,
        idFactory:         NewGUIDFactory(ctx.nsqd.getOpts().ID),
    }
    // create mem-queue only if size > 0 (do not use unbuffered chan)
    if ctx.nsqd.getOpts().MemQueueSize > 0 {
        t.memoryMsgChan = make(chan *Message, ctx.nsqd.getOpts().MemQueueSize)
    }
    if strings.HasSuffix(topicName, "#ephemeral") {
        t.ephemeral = true
        t.backend = newDummyBackendQueue()
    } else {
        dqLogf := func(level diskqueue.LogLevel, f string, args ...interface{}) {
            opts := ctx.nsqd.getOpts()
            lg.Logf(opts.Logger, opts.LogLevel, lg.LogLevel(level), f, args...)
        }
        // the on-disk backend queue
        t.backend = diskqueue.New(
            topicName,
            ctx.nsqd.getOpts().DataPath,
            ctx.nsqd.getOpts().MaxBytesPerFile,
            int32(minValidMsgLength),
            int32(ctx.nsqd.getOpts().MaxMsgSize)+minValidMsgLength,
            ctx.nsqd.getOpts().SyncEvery,
            ctx.nsqd.getOpts().SyncTimeout,
            dqLogf,
        )
    }
   // the topic also starts a messagePump; protocolV2 starts a function with the same name, but that one pushes messages to consumers, while this one copies messages into the topic's channel(s)
    t.waitGroup.Wrap(t.messagePump)
    // notify nsqd so the metadata gets persisted
    t.ctx.nsqd.Notify(t)

    return t
}

func (t *Topic) Start() {
    select {
    case t.startChan <- 1:
    default:
    }
}

Now let's see how nsqd registers its metadata with nsqlookupd; at startup nsqd launches the lookupLoop goroutine:

func (n *NSQD) lookupLoop() {
    var lookupPeers []*lookupPeer
    var lookupAddrs []string
    connect := true

    hostname, err := os.Hostname()
    if err != nil {
        n.logf(LOG_FATAL, "failed to get hostname - %s", err)
        os.Exit(1)
    }

    // for announcements, lookupd determines the host automatically
    ticker := time.Tick(15 * time.Second)
    for {
        if connect {
            for _, host := range n.getOpts().NSQLookupdTCPAddresses {
                if in(host, lookupAddrs) {
                    continue
                }
                n.logf(LOG_INFO, "LOOKUP(%s): adding peer", host)
                lookupPeer := newLookupPeer(host, n.getOpts().MaxBodySize, n.logf,
                    connectCallback(n, hostname))
                lookupPeer.Command(nil) // start the connection
                lookupPeers = append(lookupPeers, lookupPeer)
                lookupAddrs = append(lookupAddrs, host)
            }
            n.lookupPeers.Store(lookupPeers)
            connect = false
        }

        select {
        case <-ticker:
        // send a heartbeat to each nsqlookupd
            // send a heartbeat and read a response (read detects closed conns)
            for _, lookupPeer := range lookupPeers {
                n.logf(LOG_DEBUG, "LOOKUPD(%s): sending heartbeat", lookupPeer)
                cmd := nsq.Ping()
                _, err := lookupPeer.Command(cmd)
                if err != nil {
                    n.logf(LOG_ERROR, "LOOKUPD(%s): %s - %s", lookupPeer, cmd, err)
                }
            }
        case val := <-n.notifyChan:
            var cmd *nsq.Command
            var branch string

            switch val.(type) {
            // register/unregister a channel
            case *Channel:
                // notify all nsqlookupds that a new channel exists, or that it's removed
                branch = "channel"
                channel := val.(*Channel)
                if channel.Exiting() == true {
                    cmd = nsq.UnRegister(channel.topicName, channel.name)
                } else {
                    cmd = nsq.Register(channel.topicName, channel.name)
                }
            // register/unregister a topic
            case *Topic:
           
                // notify all nsqlookupds that a new topic exists, or that it's removed
                branch = "topic"
                topic := val.(*Topic)
                if topic.Exiting() == true {
                    cmd = nsq.UnRegister(topic.name, "")
                } else {
                    cmd = nsq.Register(topic.name, "")
                }
            }

            for _, lookupPeer := range lookupPeers {
                n.logf(LOG_INFO, "LOOKUPD(%s): %s %s", lookupPeer, branch, cmd)
                _, err := lookupPeer.Command(cmd)
                if err != nil {
                    n.logf(LOG_ERROR, "LOOKUPD(%s): %s - %s", lookupPeer, cmd, err)
                }
            }
        case <-n.optsNotificationChan:
            var tmpPeers []*lookupPeer
            var tmpAddrs []string
            for _, lp := range lookupPeers {
                if in(lp.addr, n.getOpts().NSQLookupdTCPAddresses) {
                    tmpPeers = append(tmpPeers, lp)
                    tmpAddrs = append(tmpAddrs, lp.addr)
                    continue
                }
                n.logf(LOG_INFO, "LOOKUP(%s): removing peer", lp)
                lp.Close()
            }
            lookupPeers = tmpPeers
            lookupAddrs = tmpAddrs
            connect = true
        case <-n.exitChan:
            goto exit
        }
    }

exit:
    n.logf(LOG_INFO, "LOOKUP: closing")
}

When nsqd starts the lookupLoop goroutine, it also starts another goroutine, queueScanLoop, which is mainly responsible for handling in-flight messages that time out.
To summarize:

  • On startup, nsqd listens on both a TCP port and an HTTP port
  • It starts the lookupLoop goroutine to register its own information with nsqlookupd
  • It starts the queueScanLoop goroutine to handle timed-out messages
  • It starts the statsdLoop goroutine to collect performance statistics and metrics about topics, channels, etc.
  • When a producer client connects and issues a PUB command, nsqd starts a dedicated goroutine to handle it; the topic and channels are created at this point, and the topic starts a messagePump goroutine that dispatches messages to each of its channels
  • When a consumer client connects, a dedicated goroutine handles it, and a messagePump goroutine pushes messages to the consumers

Note that message consumption has a timeout: every message a consumer receives must be processed within the configured timeout, otherwise the message is re-queued and problems can follow.
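To make that concrete, here is a minimal go-nsq sketch (the handler, topic/channel names, the nsqlookupd address and the durations are all placeholders for illustration) showing how a consumer can raise msg_timeout and call Touch() to keep a long-running message alive:

package main

import (
    "time"

    "github.com/nsqio/go-nsq"
)

// slowHandler simulates work that may approach the message timeout.
type slowHandler struct{}

func (h *slowHandler) HandleMessage(m *nsq.Message) error {
    m.Touch()                   // reset the server-side timeout for this message while we keep working
    time.Sleep(2 * time.Second) // pretend to do slow work
    return nil                  // returning nil lets go-nsq auto-send FIN
}

func main() {
    cfg := nsq.NewConfig()
    cfg.MsgTimeout = 2 * time.Minute // negotiated with nsqd via IDENTIFY
    c, _ := nsq.NewConsumer("topic", "channel", cfg)
    c.AddHandler(&slowHandler{})
    _ = c.ConnectToNSQLookupd("192.168.1.1:4161")
    select {} // block forever in this sketch
}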


Zhu Wei published an article · May 27

NSQ Principle Analysis (Part 1)

NSQ is a lightweight distributed message queue written in Go. It is a good fit for small projects, and also a good codebase for learning how message queues are implemented, how Go channels work, and how to write distributed systems in Go. The reason it only suits small projects is that, unless you are able to do secondary development on it, NSQ still has quite a few shortcomings out of the box.


NSQ Modules

nsqd: a process that listens on both TCP and HTTP. It creates topics and channels, dispatches messages to consumers, and registers its metadata (topics, channels, consumers) and its own service information with nsqlookupd. It is the core module.

nsqlookupd: stores nsqd's metadata and service information (endpoints), provides service discovery for consumers, and provides data queries for nsqadmin.

nsqadmin: a simple admin UI that shows topics, channels and the consumers on each channel; it can also create topics and channels.
(nsq.gif — message flow diagram, taken from the official site)
A producer publishes a message to a topic; if the topic has one or more channels, the message is copied and delivered to every channel, similar to RabbitMQ's fanout exchange, with a channel playing the role of a queue.
Officially NSQ is described as a distributed message queue service, but in my view only the channel-to-consumer part actually feels distributed. The nsqd module itself is a single point: it stores topics, channels and messages on local disk, and the official advice is even to run one nsqd per producer, which not only wastes resources but also offers no guarantee of data backup. Once the disk of the host running nsqd is damaged, all of its data is lost.
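To make the topic/channel fan-out described above concrete, here is a minimal go-nsq sketch (topic, channel names and addresses are placeholders): two consumers subscribed to different channels of the same topic each receive their own copy of every message, whereas consumers sharing one channel would split the messages between them.

package main

import (
    "fmt"

    "github.com/nsqio/go-nsq"
)

type printHandler struct{ channel string }

func (h *printHandler) HandleMessage(m *nsq.Message) error {
    fmt.Printf("channel %s got: %s\n", h.channel, m.Body)
    return nil
}

func main() {
    cfg := nsq.NewConfig()
    // two channels on the same topic: each receives a full copy of every message
    for _, ch := range []string{"ch-a", "ch-b"} {
        c, _ := nsq.NewConsumer("orders", ch, cfg)
        c.AddHandler(&printHandler{channel: ch})
        _ = c.ConnectToNSQD("192.168.1.1:4152")
    }

    p, _ := nsq.NewProducer("192.168.1.1:4152", cfg)
    _ = p.Publish("orders", []byte("hello")) // both ch-a and ch-b receive "hello"
    select {}
}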

NSQ Source Code Analysis

First, set up a simple environment, using CentOS as the example operating system.

# download
wget https://s3.amazonaws.com/bitly-downloads/nsq/nsq-1.2.0.linux-amd64.go1.12.9.tar.gz
# extract
tar xvf nsq-1.2.0.linux-amd64.go1.12.9.tar.gz
cd nsq-1.2.0.linux-amd64.go1.12.9/bin
cp * /bin

Open three terminals, one each for nsqadmin (the admin UI), nsqlookupd (nsqd service and metadata management) and nsqd (the core module: metadata, message storage and message dispatching). Replace the IP with your real IP.

# terminal 1
/bin/nsqd --lookupd-tcp-address 192.168.1.1:4160 -tcp-address 0.0.0.0:4152 -http-address 0.0.0.0:4153  --broadcast-address 192.168.1.1
# terminal 2
/bin/nsqlookupd --broadcast-address 192.168.1.1
# terminal 3 (nsqadmin talks to nsqlookupd's HTTP port, 4161 by default)
/bin/nsqadmin --lookupd-http-address 192.168.1.1:4161

Let's look at a simple example of using NSQ.

cat producer.go
package main

import (
    "fmt"

    "github.com/nsqio/go-nsq"
)

func main() {
    addr := "192.168.1.1:4152" // nsqd TCP address
    config := nsq.NewConfig()
    p, _ := nsq.NewProducer(addr, config)
    err := p.Publish("topic", []byte("message"))
    if err != nil {
        fmt.Printf("dispatch task failed %s", err)
    }
}

cat consumer.go
package main

import (
    "fmt"

    "github.com/nsqio/go-nsq"
)

type MyHandler struct{}

func (h *MyHandler) HandleMessage(message *nsq.Message) error {
    fmt.Printf("consume message %+v\n", message)
    return nil
}

func main() {
    config := nsq.NewConfig()
    c, _ := nsq.NewConsumer("topic", "channel", config)
    c.SetLoggerLevel(nsq.LogLevelDebug)
    handler := &MyHandler{}
    c.AddHandler(handler)
    // port 4161 is nsqlookupd's HTTP port; both nsqd and nsqlookupd listen on TCP and HTTP
    err := c.ConnectToNSQLookupd("192.168.1.1:4161")
    if err != nil {
        fmt.Printf("Connect nsq lookup failed %+v\n", err)
    }
    select {} // keep the consumer running
}

1. Producer code analysis

go-nsq/producer.go

// After Config is passed into NewProducer the values are no longer mutable (they are copied).
func NewProducer(addr string, config *Config) (*Producer, error) {
    err := config.Validate()
    if err != nil {
        return nil, err
    }

    p := &Producer{
        id: atomic.AddInt64(&instCount, 1),

        addr:   addr,
        config: *config,

        logger: make([]logger, int(LogLevelMax+1)),
        logLvl: LogLevelInfo,

        transactionChan: make(chan *ProducerTransaction),
        exitChan:        make(chan int),
        responseChan:    make(chan []byte),
        errorChan:       make(chan []byte),
    }

    // Set default logger for all log levels
    l := log.New(os.Stderr, "", log.Flags())
    for index, _ := range p.logger {
        p.logger[index] = l
    }
    return p, nil
}

This initializes the Producer struct.

// Publish synchronously publishes a message body to the specified topic, returning
// an error if publish failed
func (w *Producer) Publish(topic string, body []byte) error {   
    return w.sendCommand(Publish(topic, body))
}

This specifies which topic to publish to and the message body to send.

// Publish creates a new Command to write a message to a given topic
func Publish(topic string, body []byte) *Command {
    var params = [][]byte{[]byte(topic)}
    return &Command{[]byte("PUB"), params, body}
}

This wraps the command: the PUB verb, the topic name as a parameter, and the message as the command body.
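For reference, the frame that WriteCommand eventually puts on the wire for this PUB is simple. The standalone sketch below (address and topic are placeholders) writes the same bytes by hand, which helps when reading the protocol code later: a 4-byte protocol magic, then the command line terminated by \n, then a 4-byte big-endian body size, then the body.

package main

import (
    "encoding/binary"
    "net"
)

func main() {
    conn, _ := net.Dial("tcp", "192.168.1.1:4152")
    defer conn.Close()

    conn.Write([]byte("  V2")) // 4-byte magic: protocol version negotiation

    body := []byte("message")
    conn.Write([]byte("PUB topic\n")) // command name + params, newline terminated
    size := make([]byte, 4)
    binary.BigEndian.PutUint32(size, uint32(len(body))) // 4-byte big-endian body length
    conn.Write(size)
    conn.Write(body) // the message body itself
}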

func (w *Producer) sendCommand(cmd *Command) error {
    doneChan := make(chan *ProducerTransaction)
    // internally this uses asynchronous sending
    err := w.sendCommandAsync(cmd, doneChan, nil)
    if err != nil {
        close(doneChan)
        return err
    }
    // wait for the asynchronous send to complete
    t := <-doneChan
    return t.Error
}
func (w *Producer) sendCommandAsync(cmd *Command, doneChan chan *ProducerTransaction,
    args []interface{}) error {
    // keep track of how many outstanding producers we're dealing with
    // in order to later ensure that we clean them all up...
    atomic.AddInt32(&w.concurrentProducers, 1)
    defer atomic.AddInt32(&w.concurrentProducers, -1)
    // check whether a connection to nsqd already exists; skip connecting if it does
    if atomic.LoadInt32(&w.state) != StateConnected {
        err := w.connect()
        if err != nil {
            return err
        }
    }

    t := &ProducerTransaction{
        cmd:      cmd,
        doneChan: doneChan,
        Args:     args,
    }

    select {
    case w.transactionChan <- t:
    case <-w.exitChan:
        return ErrStopped
    }
    return nil
}

In the code above we still have not seen where the PUB command is actually sent to the nsqd process, so let's look at that connect function.

func (w *Producer) connect() error {
    w.guard.Lock()
    defer w.guard.Unlock()

    if atomic.LoadInt32(&w.stopFlag) == 1 {
        return ErrStopped
    }

    switch state := atomic.LoadInt32(&w.state); state {
    case StateInit:
    case StateConnected:
        return nil
    default:
        return ErrNotConnected
    }

    w.log(LogLevelInfo, "(%s) connecting to nsqd", w.addr)

    w.conn = NewConn(w.addr, &w.config, &producerConnDelegate{w})
    w.conn.SetLoggerLevel(w.getLogLevel())
    format := fmt.Sprintf("%3d (%%s)", w.id)
    for index := range w.logger {
        w.conn.SetLoggerForLevel(w.logger[index], LogLevel(index), format)
    }
    // this is mainly used by consumers; it will be analyzed in detail in the consumer section
    _, err := w.conn.Connect()
    if err != nil {
        w.conn.Close()
        w.log(LogLevelError, "(%s) error connecting to nsqd - %s", w.addr, err)
        return err
    }
    atomic.StoreInt32(&w.state, StateConnected)
    w.closeChan = make(chan int)
    w.wg.Add(1)
    // the producer uses this goroutine to send commands to nsqd and receive responses
    go w.router()

    return nil
}
func (w *Producer) router() {
    for {
        select {
        // sendCommandAsync above only wrapped the pending command and pushed it onto a channel; here we listen on that channel and actually send the command to nsqd
        case t := <-w.transactionChan:
            w.transactions = append(w.transactions, t)
            err := w.conn.WriteCommand(t.cmd)
            if err != nil {
                w.log(LogLevelError, "(%s) sending command - %s", w.conn.String(), err)
                w.close()
            }
            // receive nsqd's response
        case data := <-w.responseChan:
            w.popTransaction(FrameTypeResponse, data)
        case data := <-w.errorChan:
            w.popTransaction(FrameTypeError, data)
        case <-w.closeChan:
            goto exit
        case <-w.exitChan:
            goto exit
        }
    }

exit:
    w.transactionCleanup()
    w.wg.Done()
    w.log(LogLevelInfo, "exiting router")
}

2. Consumer code analysis

// NewConsumer creates a new instance of Consumer for the specified topic/channel
//
// The only valid way to create a Config is via NewConfig, using a struct literal will panic.
// After Config is passed into NewConsumer the values are no longer mutable (they are copied).
// specify the topic and channel to subscribe to
func NewConsumer(topic string, channel string, config *Config) (*Consumer, error) {
    if err := config.Validate(); err != nil {
        return nil, err
    }

    if !IsValidTopicName(topic) {
        return nil, errors.New("invalid topic name")
    }

    if !IsValidChannelName(channel) {
        return nil, errors.New("invalid channel name")
    }

    r := &Consumer{
        id: atomic.AddInt64(&instCount, 1),

        topic:   topic,
        channel: channel,
        config:  *config,

        logger:      make([]logger, LogLevelMax+1),
        logLvl:      LogLevelInfo,
        maxInFlight: int32(config.MaxInFlight),

        incomingMessages: make(chan *Message),

        rdyRetryTimers:     make(map[string]*time.Timer),
        pendingConnections: make(map[string]*Conn),
        connections:        make(map[string]*Conn),

        lookupdRecheckChan: make(chan int, 1),

        rng: rand.New(rand.NewSource(time.Now().UnixNano())),

        StopChan: make(chan int),
        exitChan: make(chan int),
    }

    // Set default logger for all log levels
    l := log.New(os.Stderr, "", log.Flags())
    for index := range r.logger {
        r.logger[index] = l
    }

    r.wg.Add(1)
    // since NSQ pushes messages to consumers, the consumer side controls the consumption rate (RDY flow control); it is configurable and can be adjusted automatically
    go r.rdyLoop()
    return r, nil
}

This initializes the Consumer struct.

After initialization, a message handler must be added via AddHandler.

// AddHandler sets the Handler for messages received by this Consumer. This can be called
// multiple times to add additional handlers. Handler will have a 1:1 ratio to message handling goroutines.
//
// This panics if called after connecting to NSQD or NSQ Lookupd
//
// (see Handler or HandlerFunc for details on implementing this interface)
func (r *Consumer) AddHandler(handler Handler) {
    r.AddConcurrentHandlers(handler, 1)
}

// AddConcurrentHandlers sets the Handler for messages received by this Consumer.  It
// takes a second argument which indicates the number of goroutines to spawn for
// message handling.
//
// This panics if called after connecting to NSQD or NSQ Lookupd
//
// (see Handler or HandlerFunc for details on implementing this interface)
func (r *Consumer) AddConcurrentHandlers(handler Handler, concurrency int) {
    if atomic.LoadInt32(&r.connectedFlag) == 1 {
        panic("already connected")
    }

    atomic.AddInt32(&r.runningHandlers, int32(concurrency))
    for i := 0; i < concurrency; i++ {
        // concurrency can be configured here
        go r.handlerLoop(handler)
    }
}

func (r *Consumer) handlerLoop(handler Handler) {
    r.log(LogLevelDebug, "starting Handler")

    for {
        // keep receiving messages pushed by nsqd; the readLoop loop (covered later) feeds messages into this channel
        message, ok := <-r.incomingMessages
        if !ok {
            goto exit
        }

        if r.shouldFailMessage(message, handler) {
            message.Finish()
            continue
        }
        // consume the message with the handler we registered
        err := handler.HandleMessage(message)
        if err != nil {
            r.log(LogLevelError, "Handler returned error (%s) for msg %s", err, message.ID)
            if !message.IsAutoResponseDisabled() {
                message.Requeue(-1)
            }
            continue
        }
        // decide whether to remove the message from the queue once it is processed (an ack, in effect); by default each message is acked automatically after being consumed, and batching can be configured
        if !message.IsAutoResponseDisabled() {
            message.Finish()
        }
    }

exit:
    r.log(LogLevelDebug, "stopping Handler")
    if atomic.AddInt32(&r.runningHandlers, -1) == 0 {
        r.exit()
    }
}

func (r *Consumer) shouldFailMessage(message *Message, handler interface{}) bool {
    // message passed the max number of attempts
    if r.config.MaxAttempts > 0 && message.Attempts > r.config.MaxAttempts {
        r.log(LogLevelWarning, "msg %s attempted %d times, giving up",
            message.ID, message.Attempts)

        logger, ok := handler.(FailedMessageLogger)
        if ok {
            logger.LogFailedMessage(message)
        }

        return true
    }
    return false
}

func (r *Consumer) exit() {
    r.exitHandler.Do(func() {
        close(r.exitChan)
        r.wg.Wait()
        close(r.StopChan)
    })
}
// ConnectToNSQLookupd adds an nsqlookupd address to the list for this Consumer instance.
//
// If it is the first to be added, it initiates an HTTP request to discover nsqd
// producers for the configured topic.
//
// A goroutine is spawned to handle continual polling.
func (r *Consumer) ConnectToNSQLookupd(addr string) error {
    if atomic.LoadInt32(&r.stopFlag) == 1 {
        return errors.New("consumer stopped")
    }
    if atomic.LoadInt32(&r.runningHandlers) == 0 {
        return errors.New("no handlers")
    }

    if err := validatedLookupAddr(addr); err != nil {
        return err
    }

    atomic.StoreInt32(&r.connectedFlag, 1)

    r.mtx.Lock()
    for _, x := range r.lookupdHTTPAddrs {
        if x == addr {
            r.mtx.Unlock()
            return nil
        }
    }
    r.lookupdHTTPAddrs = append(r.lookupdHTTPAddrs, addr)
    numLookupd := len(r.lookupdHTTPAddrs)
    r.mtx.Unlock()

    // if this is the first one, kick off the go loop
    if numLookupd == 1 {
        r.queryLookupd()
        r.wg.Add(1)
        go r.lookupdLoop()
    }

    return nil
}

The consumer connects to nsqlookupd, queries it for nsqd service information, and then connects to those nsqd instances.

// make an HTTP req to one of the configured nsqlookupd instances to discover
// which nsqd's provide the topic we are consuming.
//
// initiate a connection to any new producers that are identified.
func (r *Consumer) queryLookupd() {
    retries := 0

retry:
    endpoint := r.nextLookupdEndpoint()

    r.log(LogLevelInfo, "querying nsqlookupd %s", endpoint)

    var data lookupResp
    err := apiRequestNegotiateV1("GET", endpoint, nil, &data)
    if err != nil {
        r.log(LogLevelError, "error querying nsqlookupd (%s) - %s", endpoint, err)
        retries++
        if retries < 3 {
            r.log(LogLevelInfo, "retrying with next nsqlookupd")
            goto retry
        }
        return
    }

    var nsqdAddrs []string
    for _, producer := range data.Producers {
        broadcastAddress := producer.BroadcastAddress
        port := producer.TCPPort
        joined := net.JoinHostPort(broadcastAddress, strconv.Itoa(port))
        nsqdAddrs = append(nsqdAddrs, joined)
    }
    // apply filter
    if discoveryFilter, ok := r.behaviorDelegate.(DiscoveryFilter); ok {
        nsqdAddrs = discoveryFilter.Filter(nsqdAddrs)
    }
    // get all nsqd addresses known to nsqlookupd, then connect to each of them
    for _, addr := range nsqdAddrs {
        err = r.ConnectToNSQD(addr)
        if err != nil && err != ErrAlreadyConnected {
            r.log(LogLevelError, "(%s) error connecting to nsqd - %s", addr, err)
            continue
        }
    }
}

The official recommendation is not to have consumers connect to nsqd directly:

// ConnectToNSQD takes a nsqd address to connect directly to.
//
// It is recommended to use ConnectToNSQLookupd so that topics are discovered
// automatically.  This method is useful when you want to connect to a single, local,
// instance.
func (r *Consumer) ConnectToNSQD(addr string) error {
    if atomic.LoadInt32(&r.stopFlag) == 1 {
        return errors.New("consumer stopped")
    }

    if atomic.LoadInt32(&r.runningHandlers) == 0 {
        return errors.New("no handlers")
    }

    atomic.StoreInt32(&r.connectedFlag, 1)
    // initialize the connection object
    conn := NewConn(addr, &r.config, &consumerConnDelegate{r})
    conn.SetLoggerLevel(r.getLogLevel())
    format := fmt.Sprintf("%3d [%s/%s] (%%s)", r.id, r.topic, r.channel)
    for index := range r.logger {
        conn.SetLoggerForLevel(r.logger[index], LogLevel(index), format)
    }
    r.mtx.Lock()
    _, pendingOk := r.pendingConnections[addr]
    _, ok := r.connections[addr]
    if ok || pendingOk {
        r.mtx.Unlock()
        return ErrAlreadyConnected
    }
    r.pendingConnections[addr] = conn
    if idx := indexOf(addr, r.nsqdTCPAddrs); idx == -1 {
        r.nsqdTCPAddrs = append(r.nsqdTCPAddrs, addr)
    }
    r.mtx.Unlock()

    r.log(LogLevelInfo, "(%s) connecting to nsqd", addr)

    cleanupConnection := func() {
        r.mtx.Lock()
        delete(r.pendingConnections, addr)
        r.mtx.Unlock()
        conn.Close()
    }
    // establish the connection; we saw this when analyzing the producer — this is where the consumer connects to nsqd
    resp, err := conn.Connect()
    if err != nil {
        cleanupConnection()
        return err
    }

    if resp != nil {
        if resp.MaxRdyCount < int64(r.getMaxInFlight()) {
            r.log(LogLevelWarning,
                "(%s) max RDY count %d < consumer max in flight %d, truncation possible",
                conn.String(), resp.MaxRdyCount, r.getMaxInFlight())
        }
    }
    // the consumer sends a SUB command to nsqd and thereby registers itself; more precisely, it registers itself in the client list of the channel under the topic, and when messages arrive the channel delivers them to clients in that list
    cmd := Subscribe(r.topic, r.channel)
    err = conn.WriteCommand(cmd)
    if err != nil {
        cleanupConnection()
        return fmt.Errorf("[%s] failed to subscribe to %s:%s - %s",
            conn, r.topic, r.channel, err.Error())
    }

    r.mtx.Lock()
    delete(r.pendingConnections, addr)
    r.connections[addr] = conn
    r.mtx.Unlock()

    // pre-emptive signal to existing connections to lower their RDY count
    for _, c := range r.conns() {
        r.maybeUpdateRDY(c)
    }

    return nil
}

go-nsq/conn.go

// Connect dials and bootstraps the nsqd connection
// (including IDENTIFY) and returns the IdentifyResponse
func (c *Conn) Connect() (*IdentifyResponse, error) {
    dialer := &net.Dialer{
        LocalAddr: c.config.LocalAddr,
        Timeout:   c.config.DialTimeout,
    }
    // this is where a producer or consumer establishes the TCP connection to nsqd
    conn, err := dialer.Dial("tcp", c.addr)
    if err != nil {
        return nil, err
    }
    c.conn = conn.(*net.TCPConn)
    c.r = conn
    c.w = conn
    // after connecting, first send 4 bytes indicating which protocol to use; currently there are V1 and V2
    _, err = c.Write(MagicV2)
    if err != nil {
        c.Close()
        return nil, fmt.Errorf("[%s] failed to write magic - %s", c.addr, err)
    }
    // tell nsqd some basic information about ourselves, such as heartbeat interval, message processing timeout, client id, etc.
    resp, err := c.identify()
    if err != nil {
        return nil, err
    }

    if resp != nil && resp.AuthRequired {
        if c.config.AuthSecret == "" {
            c.log(LogLevelError, "Auth Required")
            return nil, errors.New("Auth Required")
        }
        err := c.auth(c.config.AuthSecret)
        if err != nil {
            c.log(LogLevelError, "Auth Failed %s", err)
            return nil, err
        }
    }

    c.wg.Add(2)
    atomic.StoreInt32(&c.readLoopRunning, 1)
    // these two goroutines are important
    go c.readLoop()
    go c.writeLoop()
    return resp, nil
}
func (c *Conn) readLoop() {
    delegate := &connMessageDelegate{c}
    for {
        if atomic.LoadInt32(&c.closeFlag) == 1 {
            goto exit
        }
        // read a frame from nsqd
        frameType, data, err := ReadUnpackedResponse(c)
        if err != nil {
            if err == io.EOF && atomic.LoadInt32(&c.closeFlag) == 1 {
                goto exit
            }
            if !strings.Contains(err.Error(), "use of closed network connection") {
                c.log(LogLevelError, "IO error - %s", err)
                c.delegate.OnIOError(c, err)
            }
            goto exit
        }
        // heartbeat check, every 30s by default; more on this later
        if frameType == FrameTypeResponse && bytes.Equal(data, []byte("_heartbeat_")) {
            c.log(LogLevelDebug, "heartbeat received")
            c.delegate.OnHeartbeat(c)
            err := c.WriteCommand(Nop())
            if err != nil {
                c.log(LogLevelError, "IO error - %s", err)
                c.delegate.OnIOError(c, err)
                goto exit
            }
            continue
        }

        switch frameType {
        // handle response frames
        case FrameTypeResponse:
            c.delegate.OnResponse(c, data)
            // receive a message frame for consumption
        case FrameTypeMessage:
            msg, err := DecodeMessage(data)
            if err != nil {
                c.log(LogLevelError, "IO error - %s", err)
                c.delegate.OnIOError(c, err)
                goto exit
            }
            msg.Delegate = delegate
            msg.NSQDAddress = c.String()

            atomic.AddInt64(&c.messagesInFlight, 1)
            atomic.StoreInt64(&c.lastMsgTimestamp, time.Now().UnixNano())
            // push the message received from nsqd onto a channel — the same channel the handlerLoop above is waiting on
            c.delegate.OnMessage(c, msg)
        case FrameTypeError:
            c.log(LogLevelError, "protocol error - %s", data)
            c.delegate.OnError(c, data)
        default:
            c.log(LogLevelError, "IO error - %s", err)
            c.delegate.OnIOError(c, fmt.Errorf("unknown frame type %d", frameType))
        }
    }

exit:
    atomic.StoreInt32(&c.readLoopRunning, 0)
    // start the connection close
    messagesInFlight := atomic.LoadInt64(&c.messagesInFlight)
    if messagesInFlight == 0 {
        // if we exited readLoop with no messages in flight
        // we need to explicitly trigger the close because
        // writeLoop won't
        c.close()
    } else {
        c.log(LogLevelWarning, "delaying close, %d outstanding messages", messagesInFlight)
    }
    c.wg.Done()
    c.log(LogLevelInfo, "readLoop exiting")
}
func (c *Conn) writeLoop() {
    for {
        select {
        case <-c.exitChan:
            c.log(LogLevelInfo, "breaking out of writeLoop")
            // Indicate drainReady because we will not pull any more off msgResponseChan
            close(c.drainReady)
            goto exit
        case cmd := <-c.cmdChan:
            err := c.WriteCommand(cmd)
            if err != nil {
                c.log(LogLevelError, "error sending command %s - %s", cmd, err)
                c.close()
                continue
            }
        case resp := <-c.msgResponseChan:
            // Decrement this here so it is correct even if we can't respond to nsqd
            msgsInFlight := atomic.AddInt64(&c.messagesInFlight, -1)

            if resp.success {
                c.log(LogLevelDebug, "FIN %s", resp.msg.ID)
                c.delegate.OnMessageFinished(c, resp.msg)
                c.delegate.OnResume(c)
            } else {
                c.log(LogLevelDebug, "REQ %s", resp.msg.ID)
                c.delegate.OnMessageRequeued(c, resp.msg)
                if resp.backoff {
                    c.delegate.OnBackoff(c)
                } else {
                    c.delegate.OnContinue(c)
                }
            }

            err := c.WriteCommand(resp.cmd)
            if err != nil {
                c.log(LogLevelError, "error sending command %s - %s", resp.cmd, err)
                c.close()
                continue
            }

            if msgsInFlight == 0 &&
                atomic.LoadInt32(&c.closeFlag) == 1 {
                c.close()
                continue
            }
        }
    }

exit:
    c.wg.Done()
    c.log(LogLevelInfo, "writeLoop exiting")
}

When a message has been processed, the consumer sends a FIN command to nsqd through writeLoop, telling nsqd which messages have been consumed and can be removed from the queue.
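As a side note, a handler can also take manual control of those FIN/REQ responses instead of relying on the default auto-response. The sketch below (handler name, process function, topic/channel and address are placeholders) shows the idea:

package main

import (
    "errors"
    "time"

    "github.com/nsqio/go-nsq"
)

// process is a made-up stand-in for real business logic.
func process(body []byte) error {
    if len(body) == 0 {
        return errors.New("empty body")
    }
    return nil
}

type manualHandler struct{}

func (h *manualHandler) HandleMessage(m *nsq.Message) error {
    m.DisableAutoResponse() // take over the FIN/REQ response ourselves
    if err := process(m.Body); err != nil {
        m.Requeue(5 * time.Second) // writeLoop will send REQ so nsqd re-queues the message
        return nil
    }
    m.Finish() // writeLoop will send FIN so nsqd drops the message from its in-flight state
    return nil
}

func main() {
    cfg := nsq.NewConfig()
    c, _ := nsq.NewConsumer("topic", "channel", cfg)
    c.AddHandler(&manualHandler{})
    _ = c.ConnectToNSQLookupd("192.168.1.1:4161")
    select {}
}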
What we have looked at so far is actually the go-nsq client code; we have not touched NSQ itself yet. Let's summarize first and then move on to the nsqd code.
Producer

  1. The producer initializes the Producer struct and applies its configuration
  2. The producer establishes a TCP connection to nsqd
  3. The protocol version is negotiated
  4. The producer starts a router goroutine, which keeps sending PUB commands (carrying the messages) to nsqd

Consumer

  1. The consumer initializes the Consumer struct
  2. The consumer establishes TCP connections to nsqd via nsqlookupd; there may be one nsqd or several
  3. The protocol version is negotiated
  4. After connecting, the consumer sends its identification info to nsqd, carrying basic configuration such as heartbeat interval, message consumption timeout, client id, and so on
  5. The RDY flow-control mechanism is started
  6. The readLoop and writeLoop goroutines are started


Zhu Wei answered a question · May 14

What is distributed task scheduling?

Celery's official documentation and code samples do not explain very well in what sense Celery is distributed. In fact Celery builds on the kombu framework: kombu wraps the common message queues and abstracts the message queue itself, while Celery abstracts the producer and consumer roles on top of that and wraps functions or classes into tasks.


Zhu Wei published an article · May 14

Kubernetes Informer Principle Analysis

Anyone familiar with OpenStack knows that within an OpenStack module, different processes pass messages through the RabbitMQ message queue, while different modules exchange information over HTTP. In Kubernetes, the informer mechanism is extremely important: it carries essentially all resource information throughout Kubernetes. And not only inside Kubernetes itself — when we write Kubernetes extensions, as soon as resource information such as pods, nodes or serviceaccounts is involved, we cannot avoid the informer framework. Without it, fetching resources from Kubernetes would be cumbersome; with it, the Kubernetes components cooperate very elegantly.


What the informer framework does

The informer framework makes it easy for every Kubernetes component and extension program to obtain resource information from the cluster.

Using the informer framework

Let's first look at the overall architecture diagram of how the informer works.
(k8s informer architecture diagram)
reflector: the reflector talks directly to the kube-apiserver. Internally it implements the list-watch mechanism, which watches for resource changes; each list-watch corresponds to exactly one resource type, which can be a built-in Kubernetes resource or a custom resource. When a change event (create, delete, update) is received, the object is placed into the Delta FIFO queue.
informer: the informer is the code abstraction of the resource we want to watch. Driven by the controller, it pops objects off the Delta FIFO queue, saves them into the local cache (step 5 in the diagram), and distributes them to the custom controller for event handling (step 6 in the diagram).
indexer: the indexer computes indices for objects based on index functions and object labels and stores them in the local cache. When the custom controller processes an object, it uses the object's key to look the object up in this local cache.

package main

import (
    "time"

    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    // client is the kube-apiserver client, since the data is pulled from the apiserver;
    // the resync period is the periodic re-list interval, explained in detail later
    cfg, _ := clientcmd.BuildConfigFromFlags("", "/root/.kube/config") // kubeconfig path, adjust as needed
    client, _ := kubernetes.NewForConfig(cfg)
    sharedInformers := informers.NewSharedInformerFactory(client, 30*time.Second)
    // watch pod resources
    podInformer := sharedInformers.Core().V1().Pods()
    // watch service resources
    servicesInformer := sharedInformers.Core().V1().Services()
    podLister := podInformer.Lister() // used later to read objects from the local cache
    servicesLister := servicesInformer.Lister()
    servicesListerSynced := servicesInformer.Informer().HasSynced
    podListerSynced := podInformer.Informer().HasSynced
    _, _, _, _ = podLister, servicesLister, servicesListerSynced, podListerSynced // silence "declared but not used" in this snippet
    // start the informer for every registered resource
    stopChannel := make(chan struct{})
    sharedInformers.Start(stopChannel)
    <-stopChannel
}

With just these few steps we have resource retrieval working: initialize the informer factory instance and declare which resources to watch.
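In practice you usually also register event handlers and read from the lister's local cache. The following sketch (the kubeconfig path is just an example) shows the pattern; the AddFunc/DeleteFunc callbacks registered here are exactly what the OnAdd/OnDelete calls in processorListener.run (shown later) end up invoking:

package main

import (
    "fmt"
    "time"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/labels"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    cfg, _ := clientcmd.BuildConfigFromFlags("", "/root/.kube/config") // example path
    client, _ := kubernetes.NewForConfig(cfg)
    factory := informers.NewSharedInformerFactory(client, 30*time.Second)
    podInformer := factory.Core().V1().Pods()

    // register callbacks; these correspond to the OnAdd/OnUpdate/OnDelete calls in processorListener.run
    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    func(obj interface{}) { fmt.Println("pod added:", obj.(*corev1.Pod).Name) },
        DeleteFunc: func(obj interface{}) { fmt.Println("pod deleted") },
    })

    stopCh := make(chan struct{})
    factory.Start(stopCh)
    // wait for the first full List to be synced into the local cache before reading from the lister
    cache.WaitForCacheSync(stopCh, podInformer.Informer().HasSynced)

    pods, _ := podInformer.Lister().List(labels.Everything()) // served from the local cache, not the apiserver
    fmt.Println("pods in cache:", len(pods))
    <-stopCh
}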

How the informer framework works

Using the code snippet above, let's see how the informer works internally.

Starting the informer

vender/k8s.io/client-go/informers/factory.go

// NewSharedInformerFactory constructs a new instance of sharedInformerFactory for all namespaces.
func NewSharedInformerFactory(client
kubernetes.Interface, defaultResync time.Duration) SharedInformerFactory {
    return NewSharedInformerFactoryWithOptions(client, defaultResync)
}

// NewFilteredSharedInformerFactory constructs a new instance of sharedInformerFactory.
// Listers obtained via this SharedInformerFactory will be subject to the same filters
// as specified here.
// Deprecated: Please use NewSharedInformerFactoryWithOptions instead
func NewFilteredSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) SharedInformerFactory {
    return NewSharedInformerFactoryWithOptions(client, defaultResync, WithNamespace(namespace), WithTweakListOptions(tweakListOptions))
}

// NewSharedInformerFactoryWithOptions constructs a new instance of a SharedInformerFactory with additional options.
func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {
    // this mainly initializes the factory data structure
    factory := &sharedInformerFactory{
        client:           client,
        namespace:        v1.NamespaceAll,
        defaultResync:    defaultResync,
        informers:        make(map[reflect.Type]cache.SharedIndexInformer),
        startedInformers: make(map[reflect.Type]bool),
        customResync:     make(map[reflect.Type]time.Duration),
    }

    // Apply all options
    for _, opt := range options {
        factory = opt(factory)
    }

    return factory
}

// Start initializes all requested informers.
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
    f.lock.Lock()
    defer f.lock.Unlock()
    // iterate over the registered informers and start each of them; the informers map holds the resources we watch, e.g. pods and services
    for informerType, informer := range f.informers {
        if !f.startedInformers[informerType] {
            go informer.Run(stopCh)
            f.startedInformers[informerType] = true
        }
    }
}
Registering an informer

Something is missing from the code above: when were the entries in the informers map registered?

sListerSynced = sInformer.Informer().HasSynced

This single line registers a resource to watch with the informer framework. Let's see what happens behind it.
kubernetes/pkg/controller/service.go
The code below is the Run method of the service controller.

func (s *Controller) Run(stopCh <-chan struct{}, workers int) {
    defer runtime.HandleCrash()
    defer s.queue.ShutDown()

    klog.Info("Starting service controller")
    defer klog.Info("Shutting down service controller")
    // wait for the initial cache sync to complete
    if !cache.WaitForNamedCacheSync("service", stopCh, s.serviceListerSynced, s.nodeListerSynced) {
        return
    }

    for i := 0; i < workers; i++ {
        go wait.Until(s.worker, time.Second, stopCh)
    }

    go wait.Until(s.nodeSyncLoop, nodeSyncPeriod, stopCh)

    <-stopCh
}

client-go/informers/core/v1/service.go
Every resource type has a file with a similar code structure.

type ServiceInformer interface {
    Informer() cache.SharedIndexInformer
    Lister() v1.ServiceLister
}

type serviceInformer struct {
    factory          internalinterfaces.SharedInformerFactory
    tweakListOptions internalinterfaces.TweakListOptionsFunc
    namespace        string
}

// NewServiceInformer constructs a new informer for Service type.
// Always prefer using an informer factory to get a shared informer instead of getting an independent
// one. This reduces memory footprint and number of connections to the server.
func NewServiceInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer {
    return NewFilteredServiceInformer(client, namespace, resyncPeriod, indexers, nil)
}

// NewFilteredServiceInformer constructs a new informer for Service type.
// Always prefer using an informer factory to get a shared informer instead of getting an independent
// one. This reduces memory footprint and number of connections to the server.
func NewFilteredServiceInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
    return cache.NewSharedIndexInformer(
        &cache.ListWatch{
            ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                return client.CoreV1().Services(namespace).List(options)
            },
            WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                return client.CoreV1().Services(namespace).Watch(options)
            },
        },
        &corev1.Service{},
        resyncPeriod,
        indexers,
    )
}

func (f *serviceInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
    return NewFilteredServiceInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}
// this registers the service resource into the factory's informers map
func (f *serviceInformer) Informer() cache.SharedIndexInformer {
    return f.factory.InformerFor(&corev1.Service{}, f.defaultInformer)
}

func (f *serviceInformer) Lister() v1.ServiceLister {
    return v1.NewServiceLister(f.Informer().GetIndexer())
}
Running the informer

After registration, let's see what starting the informer does.

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()

    fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
    // initialize the controller's configuration in preparation for running
    cfg := &Config{
        Queue:            fifo, // the delta FIFO queue
        ListerWatcher:    s.listerWatcher,
        ObjectType:       s.objectType,
        FullResyncPeriod: s.resyncCheckPeriod,
        RetryOnError:     false,
        ShouldResync:     s.processor.shouldResync,

        Process: s.HandleDeltas,
    }

    func() {
        s.startedLock.Lock()
        defer s.startedLock.Unlock()

        s.controller = New(cfg)
        s.controller.(*controller).clock = s.clock
        s.started = true
    }()

    // Separate stop channel because Processor should be stopped strictly after controller
    processorStopCh := make(chan struct{})
    var wg wait.Group
    defer wg.Wait()              // Wait for Processor to stop
    defer close(processorStopCh) // Tell Processor to stop
    wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
    // a processor goroutine is started before the controller runs
    // the informer runs under the controller's drive: the controller pops data off the delta FIFO queue for processing, so the controller plus the delta FIFO queue can be seen as the producer, and this processor is the consumer
    wg.StartWithChannel(processorStopCh, s.processor.run)

    defer func() {
        s.startedLock.Lock()
        defer s.startedLock.Unlock()
        s.stopped = true // Don't want any new listeners
    }()
    // start the controller
    s.controller.Run(stopCh)
}

Let's see what work the processor does as the "consumer" ("consumer" is my own term — you can also think of it as the resource processor).

func (p *sharedProcessor) run(stopCh <-chan struct{}) {
    func() {
        p.listenersLock.RLock()
        defer p.listenersLock.RUnlock()
        for _, listener := range p.listeners {
            // notice that two more goroutines are started here
            p.wg.Start(listener.run)
            p.wg.Start(listener.pop)
        }
        p.listenersStarted = true
    }()
    <-stopCh
    p.listenersLock.RLock()
    defer p.listenersLock.RUnlock()
    for _, listener := range p.listeners {
        close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
    }
    p.wg.Wait() // Wait for all .pop() and .run() to stop
}
func (p *processorListener) run() {
    // this call blocks until the channel is closed.  When a panic happens during the notification
    // we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
    // the next notification will be attempted.  This is usually better than the alternative of never
    // delivering again.
    stopCh := make(chan struct{})
    wait.Until(func() {
        // this gives us a few quick retries before a long pause and then a few more quick retries
        err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
            // block here waiting for objects to be delivered
            for next := range p.nextCh {
                // dispatch the event to the registered callbacks
                switch notification := next.(type) {
                case updateNotification:
                    p.handler.OnUpdate(notification.oldObj, notification.newObj)
                case addNotification:
                    p.handler.OnAdd(notification.newObj)
                case deleteNotification:
                    p.handler.OnDelete(notification.oldObj)
                default:
                    utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
                }
            }
            // the only way to get here is if the p.nextCh is empty and closed
            return true, nil
        })

        // the only way to get here is if the p.nextCh is empty and closed
        if err == nil {
            close(stopCh)
        }
    }, 1*time.Minute, stopCh)
}
func (p *processorListener) pop() {
    defer utilruntime.HandleCrash()
    defer close(p.nextCh) // Tell .run() to stop

    var nextCh chan<- interface{}
    var notification interface{}
    for {
        select {
        // sending data into the nextCh channel triggers run() above
        case nextCh <- notification:
            // Notification dispatched
            var ok bool
            notification, ok = p.pendingNotifications.ReadOne()
            if !ok { // Nothing to pop
                // disable this case when there is nothing left to send
                nextCh = nil // Disable this select case
            }
        case notificationToAdd, ok := <-p.addCh:
            if !ok {
                return
            }
            if notification == nil { // No notification to pop (and pendingNotifications is empty)
                // Optimize the case - skip adding to pendingNotifications
                notification = notificationToAdd
                nextCh = p.nextCh // data has arrived, enable the send case
            } else { // There is already a notification waiting to be dispatched
                p.pendingNotifications.WriteOne(notificationToAdd)
            }
        }
    }
}

Now let's go back and look at what the controller does once it starts.

func (c *controller) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    go func() {
        <-stopCh
        c.config.Queue.Close()
    }()
    r := NewReflector(
        c.config.ListerWatcher,
        c.config.ObjectType,
        c.config.Queue,
        c.config.FullResyncPeriod,
    )
    r.ShouldResync = c.config.ShouldResync
    r.clock = c.clock

    c.reflectorMutex.Lock()
    c.reflector = r
    c.reflectorMutex.Unlock()

    var wg wait.Group
    defer wg.Wait()
    // run the reflector; from the architecture diagram we know the reflector talks to the kube-apiserver, watches resource change events via list-watch, and syncs them into the delta FIFO queue
    wg.StartWithChannel(stopCh, r.Run)
    // pop objects off the delta FIFO queue and hand them to the informer for processing
    wait.Until(c.processLoop, time.Second, stopCh)
}
// as the name suggests, this handles items popped from the delta FIFO queue
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
    s.blockDeltas.Lock()
    defer s.blockDeltas.Unlock()

    // from oldest to newest
    for _, d := range obj.(Deltas) {
        // perform steps 5 and 6 of the informer architecture diagram
        switch d.Type {
        case Sync, Added, Updated:
            isSync := d.Type == Sync
            s.cacheMutationDetector.AddObject(d.Object)
            if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
                if err := s.indexer.Update(d.Object); err != nil {
                    return err
                }
                s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
            } else {
                if err := s.indexer.Add(d.Object); err != nil {
                    return err
                }
                s.processor.distribute(addNotification{newObj: d.Object}, isSync)
            }
        case Deleted:
            if err := s.indexer.Delete(d.Object); err != nil {
                return err
            }
            s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
        }
    }
    return nil
}

From the informer architecture diagram we can see that when a custom controller handles resource change events, it gets its data through a workqueue, so there are effectively two levels of local cache: the workqueue stores only the object's key (index), and when the resource is actually processed, the full object is fetched from the thread-safe store cache by that key.
This article mentions two different "controller" concepts. One is the custom controller: the controller extensions developers write for Kubernetes, usually paired with CRDs (custom resources); it also covers Kubernetes' own controllers such as the deployment, service and daemonset controllers. The other is the controller inside the informer framework itself.
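A minimal sketch of that two-level cache pattern (the kubeconfig path is an example): the event handler only puts the object's namespace/name key into a workqueue, and the worker later fetches the full object from the indexer-backed lister by that key.

package main

import (
    "fmt"
    "time"

    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/util/workqueue"
)

func main() {
    cfg, _ := clientcmd.BuildConfigFromFlags("", "/root/.kube/config") // example path
    client, _ := kubernetes.NewForConfig(cfg)
    factory := informers.NewSharedInformerFactory(client, 30*time.Second)
    podInformer := factory.Core().V1().Pods()

    queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            if key, err := cache.MetaNamespaceKeyFunc(obj); err == nil {
                queue.Add(key) // only the "namespace/name" key goes into the workqueue
            }
        },
    })

    stopCh := make(chan struct{})
    factory.Start(stopCh)
    cache.WaitForCacheSync(stopCh, podInformer.Informer().HasSynced)

    for {
        item, shutdown := queue.Get()
        if shutdown {
            return
        }
        ns, name, _ := cache.SplitMetaNamespaceKey(item.(string))
        // second cache level: fetch the full object from the indexer-backed lister
        if pod, err := podInformer.Lister().Pods(ns).Get(name); err == nil {
            fmt.Println("processing pod from cache:", pod.Name)
        }
        queue.Done(item)
    }
}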

References

https://github.com/kubernetes/sample-controller/blob/master/docs/controller-client-go.md


Zhu Wei published an article · May 14

Notes on Celery

This article does not dive deeply into any single aspect of Celery; instead it records some questions that confused me while learning and using Celery.


1. What Celery does

The official documentation describes Celery as a distributed task queue, but reading it, a developer can hardly feel in what sense it is distributed, since the docs provide neither distributed code samples nor related explanations.
Celery wraps message queues — commonly RabbitMQ and Kafka, or pseudo message queues such as MySQL and Redis, which it maps onto RabbitMQ/Kafka-like concepts. In fact Celery does not abstract these message queues itself; it relies heavily on kombu, which is the layer that really abstracts the message queues. So why does Celery add another layer on top? Because Celery abstracts the way producers and consumers are used, while kombu abstracts the way message queues are used. In the Celery world you can ignore the producer and consumer concepts and think only in terms of tasks: Celery wraps the function you want to execute into a task, and calling that task runs it either on the local machine or on a remote one. This lets developers focus purely on business logic.


2. How a Celery task is executed

@app.task
def add(x, y):
    return x+y

As in the snippet above, we wrap our function with the decorator to turn it into a task. When we call add.delay(1, 2), we are in fact invoking the message-queue producer: based on the Celery configuration, or the parameters passed to the decorator, a route and a queue are chosen and the task is sent to a consumer. When a consumer (worker) receives the task, it hands it to one of its execution strategies — a process pool, thread pool or coroutine pool — to run.
What the producer sends is the task's metadata; it does not send a serialized object over the wire (not that it is impossible, just not recommended). The most important pieces of metadata are the task id and the task name. The consumer uses the task name to find the task to execute; how it does that is covered in the next section.
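A small sketch of this (the module name, broker URL and task are made up for illustration): the task's name is what travels in the metadata, and on the worker side app.tasks is the registry that maps that name back to the task object.

# celery_sketch.py -- a minimal sketch, assuming a locally reachable Redis broker
from celery import Celery

app = Celery("project", broker="redis://localhost:6379/0")

@app.task
def add(x, y):
    return x + y

if __name__ == "__main__":
    print(add.name)               # the task name carried in the metadata, e.g. "project.add"
    print(add.name in app.tasks)  # True: the worker resolves the name through this registry
    result = add.delay(1, 2)      # publishes only metadata (id, name, args) to the broker
    print(result.id)              # the task id, the other key piece of metadata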


3. How a Celery worker finds the task to execute by its name

The task files must exist not only on the producer machine (the one dispatching tasks) but also on the consumer machines (the ones running the workers); only then can the tasks be registered into the worker's data structures so that, upon receiving the task metadata, the worker can find the task to execute. The official docs show no example and give no clear explanation of this, which is why it is so puzzling how Celery is "distributed". There are four ways to make a worker register all tasks at startup.
1) Write the Celery instance and all the tasks in the same place

mkdir /root/project
cd /root/project
touch __init__.py
touch celery.py

cat celery.py
from celery import Celery
app = Celery(include=["project.tasks"])

touch tasks.py
cat tasks.py
from project.celery import app

@app.task
def add(x,y):
    return x+y
   
@app.task
def sub(x,y):
    return x-y

When starting the worker you then specify the path of the Celery instance in the project; at that point the decorators run and the tasks get registered. This is fine for casual scripts, but in a real engineering project it becomes messy: no categorization, all tasks lumped together, no clear structure or layering.
2) The command line
When starting the worker there is a -I option, which accepts one or more task module paths separated by commas, for example when tasks live in two files.
3) Pass them as a parameter when instantiating the Celery class

app = Celery(include=["project.tasks"])

This approach is recommended.
4) Specify them in the configuration file

Celery's user configuration file supports CELERY_IMPORTS = ("project.tasks",). This approach is recommended as well; it is flexible and easy to maintain.

Putting the Celery instance and all related tasks into one file is the least recommended option for a project (it is fine for simple tools), and passing modules on the command line is not very elegant either. Whether you choose option 3 or 4, the task code has to be kept consistent on the dispatching machine and on all worker machines, so the configuration-file approach is preferable: when the file changes, push it out to every machine.
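A rough sketch of option 4 (module names and the broker URL are examples): the same config file is distributed to the dispatching machine and to every worker machine, so every worker registers the same tasks at startup.

# celeryconfig.py -- shared by the dispatching machine and every worker machine
BROKER_URL = "redis://localhost:6379/0"  # example broker
CELERY_IMPORTS = ("project.tasks",)      # modules whose tasks the worker registers at startup

# project/celery.py
from celery import Celery

app = Celery("project")
app.config_from_object("celeryconfig")

# start a worker on any machine that has the same code and config:
#   celery -A project worker --loglevel=info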



Zhu Wei published an article · May 3

Asyncio Source Code Analysis


Coroutines in Python

Let me talk about coroutines in Python from my own understanding. We know that on Linux a thread is lighter than a process and cheaper to switch, and a coroutine is lighter still, so its switching cost is even lower. Python coroutines are built on generators, i.e. the yield statement, and later the yield from syntax for sub-coroutines appeared. A generator is an iterator, but an iterator is not necessarily a generator; a generator can both produce and receive values, and can be suspended and resumed. Python 3.5 introduced the new async/await syntax, which changes nothing essential and mainly avoids syntactic confusion. Coroutine switching can be implicit or explicit, with many coroutines switching within a single thread; asyncio switches coroutines explicitly.


The asyncio framework

The asyncio framework is built on top of epoll, poll, select and similar facilities; below I will just say epoll, though which event mechanism is used depends on the operating system. When using asyncio, most of the time we are running tasks, and when running tasks asyncio does not actually go through epoll — as we know, epoll requires registering file descriptors — it is using coroutines instead. How coroutines and epoll work together is explained below. epoll is what asynchronous web frameworks are built on; coroutines are what run the user's tasks.


How asyncio runs

Let's write a simple asynchronous task — deliberately simple, since this article is about how asyncio works rather than how to use it.

import asyncio
async def print_hello_after3second():
    await asyncio.sleep(3)
    print("hello")
asyncio.run(print_hello_after3second())

Here we use the run API. There are many ways to run async tasks with asyncio; run feels to me like a command-line front end: the interface looks simple from the outside, but internally it does a lot of work for us. To save space and keep the article clear, each code snippet below keeps only the important parts and omits the rest.
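As a rough picture of what run() does for us (simplified — the real implementation also handles debug mode, cancels leftover tasks and shuts down async generators), the task above could equivalently be driven like this:

import asyncio

async def print_hello_after3second():
    await asyncio.sleep(3)
    print("hello")

# roughly what asyncio.run() does internally (simplified sketch)
loop = asyncio.new_event_loop()
try:
    asyncio.set_event_loop(loop)
    loop.run_until_complete(print_hello_after3second())
finally:
    asyncio.set_event_loop(None)
    loop.close()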

asyncio/runners.py

# the first argument to run must be a coroutine
def run(main, *, debug=False):
    ....
    loop = events.new_event_loop()  # think of loop as a wrapper around epoll
    events.set_event_loop(loop)
    # the key point: the coroutine is handed to the event loop here
    return loop.run_until_complete(main)
    ....

asyncio/base_events.py

    def run_until_complete(self, future):
        ....
        # asyncio wraps the task we pass in into a Task (which is a Future subclass)
        future = tasks.ensure_future(future, loop=self)
        # internally _run_once drives each iteration of the event loop
        self.run_forever()
        ....

asyncio/tasks.py

# ensure_future is another interface for handing over a task
def ensure_future(coro_or_future, *, loop=None):
        ....
        # this calls Task.__init__ to initialize the task, and registers Task.__step as a callback with the event loop
        task = loop.create_task(coro_or_future)
        ....

asyncio/base_events.py

    # this method is important, so it is listed in full; it embodies asyncio's scheduling idea: scheduling tasks and epoll events
    def _run_once(self):
        """Run one full iteration of the event loop.

        This calls all currently ready callbacks, polls for I/O,
        schedules the resulting callbacks, and finally schedules
        'call_later' callbacks.
        """
        print("run once")
        sched_count = len(self._scheduled)
        if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
            self._timer_cancelled_count / sched_count >
                _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
            # Remove delayed calls that were cancelled if their number
            # is too high
            new_scheduled = []

            for handle in self._scheduled:
                if handle._cancelled:
                    handle._scheduled = False
                else:
                    new_scheduled.append(handle)

            heapq.heapify(new_scheduled)
            self._scheduled = new_scheduled
            self._timer_cancelled_count = 0
        else:
            # Remove delayed calls that were cancelled from head of queue.
            while self._scheduled and self._scheduled[0]._cancelled:
                self._timer_cancelled_count -= 1
                handle = heapq.heappop(self._scheduled)
                handle._scheduled = False

        timeout = None
        if self._ready or self._stopping:
            timeout = 0
        elif self._scheduled:
            # Compute the desired timeout.
            when = self._scheduled[0]._when
            timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)

        if self._debug and timeout != 0:
            t0 = self.time()
            # this is where epoll events are polled
            event_list = self._selector.select(timeout)
            dt = self.time() - t0
            if dt >= 1.0:
                level = logging.INFO
            else:
                level = logging.DEBUG
            nevent = len(event_list)
            if timeout is None:
                logger.log(level, 'poll took %.3f ms: %s events',
                           dt * 1e3, nevent)
            elif nevent:
                logger.log(level,
                           'poll %.3f ms took %.3f ms: %s events',
                           timeout * 1e3, dt * 1e3, nevent)
            elif dt >= 1.0:
                logger.log(level,
                           'poll %.3f ms took %.3f ms: timeout',
                           timeout * 1e3, dt * 1e3)
        else:
            event_list = self._selector.select(timeout)
        # after readable/writable events come back from epoll, their callbacks are appended to self._ready; this list also holds the user's async tasks — so when were those added?
        self._process_events(event_list)

        # Handle 'later' callbacks that are ready.
        # timer callbacks let async tasks run at some point in the future; a heap-based priority queue ordered by time
        end_time = self.time() + self._clock_resolution
        while self._scheduled:
            handle = self._scheduled[0]
            if handle._when >= end_time:
                break
            handle = heapq.heappop(self._scheduled)
            handle._scheduled = False
            self._ready.append(handle)

        # This is the only place where callbacks are actually *called*.
        # All other places just add them to ready.
        # Note: We run all currently scheduled callbacks, but not any
        # callbacks scheduled by callbacks run this time around --
        # they will be run the next time (after another I/O poll).
        # Use an idiom that is thread-safe without using locks.
        ntodo = len(self._ready)
        for i in range(ntodo):
            handle = self._ready.popleft()
            if handle._cancelled:
                continue
            if self._debug:
                try:
                    self._current_handle = handle
                    t0 = self.time()
                    # the callback of our async task has been wrapped into a Handle instance
                    handle._run()
                    dt = self.time() - t0
                    if dt >= self.slow_callback_duration:
                        logger.warning('Executing %s took %.3f seconds',
                                       _format_handle(handle), dt)
                finally:
                    self._current_handle = None
            else:
                print("hanle %s", handle)
                handle._run()
        handle = None  # Needed to break cycles when an exception occurs.

To summarize: asyncio puts both the async tasks and the readable/writable callbacks obtained from epoll into the self._ready list and runs them uniformly. So when does an async task get put into self._ready?
asyncio/base_events.py

    # who calls this? As mentioned earlier, our async tasks are wrapped into asyncio's Task class, and Task.__init__ calls call_soon. What about call_at-style tasks scheduled for the future? They eventually call call_soon too, once their scheduled time arrives.
    def call_soon(self, callback, *args, context=None):
        """Arrange for a callback to be called as soon as possible.

        This operates as a FIFO queue: callbacks are called in the
        order in which they are registered.  Each callback will be
        called exactly once.

        Any positional arguments after the callback will be passed to
        the callback when it is called.
        """
        self._check_closed()
        if self._debug:
            self._check_thread()
            self._check_callback(callback, 'call_soon')
        handle = self._call_soon(callback, args, context)
        if handle._source_traceback:
            del handle._source_traceback[-1]
        return handle
 # this is exactly where the callback is wrapped into a Handle and appended to self._ready
 def _call_soon(self, callback, args, context):
        print("register")
        handle = events.Handle(callback, args, self, context)
        if handle._source_traceback:
            del handle._source_traceback[-1]
        self._ready.append(handle)
        return handle

The explanation above only covers how asyncio runs user-side tasks and async events; in fact the user-side async tasks are hidden behind the epoll-style callback concept, which is one of asyncio's cleverer designs.
So far we have only shown how asyncio schedules the execution of task callbacks and epoll-event callbacks. The callback of an async task matters a lot here — it is the _step method mentioned above, which lives in the Task class. This also relates to how the Python async libraries under aio-libs adapt existing libraries; below we will touch on how to adapt or write such asynchronous libraries ourselves.
asyncio/tasks.py

    # this method is important, so nothing is trimmed
    def __step(self, exc=None):
        if self.done():
            raise futures.InvalidStateError(
                f'_step(): already done: {self!r}, {exc!r}')
        if self._must_cancel:
            if not isinstance(exc, futures.CancelledError):
                exc = futures.CancelledError()
            self._must_cancel = False
        # this coro is the async task we submitted
        coro = self._coro
        self._fut_waiter = None

        _enter_task(self._loop, self)
        # Call either coro.throw(exc) or coro.send(None).
        try:
            if exc is None:
                # We use the `send` method directly, because coroutines
                # don't have `__iter__` and `__next__` methods.
                # drive the async task forward
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        # when the coroutine finishes running it raises StopIteration
        except StopIteration as exc:
            if self._must_cancel:
                # Task is cancelled right before coro stops.
                self._must_cancel = False
                super().set_exception(futures.CancelledError())
            else:
                # a StopIteration means the async task has finished; set the result on the future — this matters, because without setting the result the task would never stop
                super().set_result(exc.value)
        except futures.CancelledError:
            super().cancel()  # I.e., Future.cancel(self).
        except Exception as exc:
            super().set_exception(exc)
        except BaseException as exc:
            super().set_exception(exc)
            raise
        else:
            # result is a Future instance
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking is not None:
                # Yielded Future must come from Future.__iter__().
                if futures._get_loop(result) is not self._loop:
                    new_exc = RuntimeError(
                        f'Task {self!r} got Future '
                        f'{result!r} attached to a different loop')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)
                elif blocking:
                    if result is self:
                        new_exc = RuntimeError(
                            f'Task cannot await on itself: {self!r}')
                        self._loop.call_soon(
                            self.__step, new_exc, context=self._context)
                    else:
                        # register __wakeup as the future's callback; __wakeup is used to pull the result out of the future
                        result._asyncio_future_blocking = False
                        result.add_done_callback(
                            self.__wakeup, context=self._context)
                        self._fut_waiter = result
                        if self._must_cancel:
                            if self._fut_waiter.cancel():
                                self._must_cancel = False
          # below, except for the result-is-None branch, everything else is an error case
                else:
                    new_exc = RuntimeError(
                        f'yield was used instead of yield from '
                        f'in task {self!r} with {result!r}')
                    print("call soon")
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)

            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self.__step, context=self._context)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                new_exc = RuntimeError(
                    f'yield was used instead of yield from for '
                    f'generator in task {self!r} with {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
            else:
                # Yielding something else is an error.
                new_exc = RuntimeError(f'Task got bad yield: {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
        finally:
            _leave_task(self._loop, self)
            self = None  # Needed to break cycles when an exception occurs.

_run_once is the scheduler: it schedules tasks and epoll events. _step is what handles statements like await something(). _step is not easy to understand, so let me summarize what it does in one paragraph — though even the summary is fairly abstract.
A coroutine function written by the user is wrapped by asyncio into a Task; the coroutine is held by the Task and driven by its _step method, and _step in turn is wrapped into a Handle and invoked as an event-loop callback. Every awaited coroutine has a future and a __wakeup bound to it: the function's result is set on the future, __wakeup is registered as the future's callback (the event loop is what actually invokes it), and once the result has been set, __wakeup resumes the coroutine so it can pick the result up.

async def a():
    await b()  # coroutine b has a future and a __wakeup bound to it

asyncio is hard both to use and to understand, unlike Go, which supports coroutines natively; Python coroutines went through a long, slow evolution, and using them without understanding the underlying principles leads to many problems. The asyncio ecosystem is also still incomplete, and sometimes you have to do the asynchronous adaptation yourself. That is why understanding how asyncio works matters: only then do you know how to adapt libraries or build tools that fit asyncio.
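As a closing sketch of the future/__wakeup mechanism described above, here is a tiny asyncio-friendly sleep built directly on a Future plus loop.call_later; this is essentially the pattern an asyncio-compatible library uses to turn a callback-style API into something awaitable (the function name is made up):

import asyncio

async def my_sleep(delay, result=None):
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    # when the timer fires, the event loop sets the result on the future,
    # which triggers Task.__wakeup and resumes the awaiting coroutine
    loop.call_later(delay, fut.set_result, result)
    return await fut

async def main():
    print(await my_sleep(1, "done"))

asyncio.run(main())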

References

深入理解asyncio(二)

