屈天航

屈天航 查看完整档案

北京编辑重庆大学  |  计算机科学与技术 编辑融360  |  高级后端研发工程师 编辑 segmentfault.com/u/qutianhang 编辑
编辑

后端研发工程师(python、golang),专注性能优化、架构设计、微服务等

个人动态

屈天航 关注了用户 · 11月23日

小小小小雨 @xiaoxiaoxiaoxiaoyu

关注 1

屈天航 发布了文章 · 11月20日

聊一聊python和golang协程的区别

背景

最近在做后端服务python到go的迁移和重构,这两种语言里,最大的特色和优势就是都支持协程。之前主要做python的性能优化和架构优化,一开始觉得两个协程原理和应用应该差不多,后来发现还是有很大的区别,今天就在这里总结一下。

什么是协程

在说它们两者区别前,我们首先聊一下什么是协程,好像它没有一个官方的定义,那就结合平时的应用经验和学习内容来谈谈自己的理解。

协程,其实可以理解为一种特殊的程序调用。特殊的是在执行过程中,在子程序(或者说函数)内部可中断,然后转而执行别的子程序,在适当的时候再返回来接着执行。
注意,它有两个特征:

可中断,这里的中断不是普通的函数调用,而是类似CPU的中断,CPU在这里直接释放转到其他程序断点继续执行。
可恢复,等到合适的时候,可以恢复到中断的地方继续执行,至于什么是合适的时候,我们后面再探讨。

和进程线程的区别

上面两个特点就导致了它相对于线程和进程切换来说极高的执行效率,为什么这么说呢?我们先老生常谈地说一下进程和线程。

进程是操作系统资源分配的基本单位,线程是操作系统调度和执行的最小单位。这两句应该是我们最常听到的两句话,拆开来说,进程是程序的启动实例,拥有代码和打开的文件资源、数据资源、独立的内存空间。线程从属于进程,是程序的实际执行者,一个进程至少包含一个主线程,也可以有更多的子线程,线程拥有自己的栈空间。无论是进程还是线程,都是由操作系统所管理和切换的。

我们再来看协程,它又叫做微线程,但其实它和进程还有线程完全不是一个维度上的概念。进程和线程的切换完全是用户无感,由操作系统控制,从用户态到内核态再到用户态。而协程的切换完全是程序代码控制的,在用户态的切换,就像函数回调的消耗一样,在线程的栈内即可完成。

python的协程(coroutine)

python的协程其实是我们通常意义上的协程Goroutine。

从概念上来讲,python的协程同样是在适当的时候可中断可恢复。那么什么是适当的时候呢,就是你认为适当的时候,因为程序在哪里发生协程切换完全控制在开发者手里。当然,对于python来说,由于GIL锁,在CPU密集的代码上做协程切换是没啥意义的,CPU本来就在忙着没偷懒,切换到其他协程,也只是在单核内换个地方忙而已。很明显,我们应该在IO密集的地方来起协程,这样可以让CPU不再空等转而去别的地方干活,才能真正发挥协程的威力。

从实现上来讲,如果熟知了python生成器,还可以将协程理解为生成器+调度策略,生成器中的yield关键字,就可以让生成器函数发生中断,而调度策略,可以驱动着协程的执行和恢复。这样就实现了协程的概念。这里的调度策略可能有很多种,简单的例如忙轮循:while True,更简单的甚至是一个for循环。就可以驱动生成器的运行,因为生成器本身也是可迭代的。复杂的比如可能是基于epool的事件循环,在python2的tornado中,以及python3的asyncio中,都对协程的用法做了更好的封装,通过yield和await就可以使用协程,通过事件循环监控文件描述符状态来驱动协程恢复执行。

我们看一个简单的协程:

import time

def consumer():
    r = ''
    while True:
        n = yield r
        if not n:
            return
        print('[CONSUMER] Consuming %s...' % n)
        time.sleep(1)
        r = '200 OK'
def produce(c):
    c.next()
    n = 0
    while n < 5:
        n = n + 1
        print('[PRODUCER] Producing %s...' % n)
        r = c.send(n)
        print('[PRODUCER] Consumer return: %s' % r)
    c.close()

if __name__=='__main__':
    c = consumer()
    produce(c)

很明显这是一个传统的生产者-消费者模型,这里consumer函数就是一个协程(生成器),它在n = yield r 的地方发生中断,生产者produce中的c.send(n),可以驱动协程的恢复,并且向协程函数传递数据n,接收返回结果r。 而while n < 5,就是我们所说的调度策略。 在生产中,这种模式很适合我们来做一些pipeline数据的消费,我们不需要写死几个生产者进程几个消费者进程,而是用这种协程的方式,来实现CPU动态地分配调度。

如果你看过上篇文章的话,是不是发现这个golang中流水线模型有点像呢,也是生产者和消费者间进行通信,但go是通过channel这种安全的数据结构,为什么python不需要呢,因为python的协程是在单线程内切换本身就是安全的,换句话说,协程间本身就是串行执行的。而golang则不然。思考一个有意思的问题,如果我们将go流水线模型中channel设置为无缓冲区时,生产者绝对驱动消费者的执行,是不是就跟python很像了呢。所以python的协程从某种意义来说,是不是golang协程的一种特殊情况呢?
后端在线服务中我们更常用的python协程其实是在异步IO框架中使用,之前我们也提过python协程在IO密集的系统中使用才能发挥它的威力。并且大多数的数据中间件都已经提供支持了异步包的支持,这里顺便贴一个python3支持的异步IO库,基本支持了常见的异步数据中间件。

再看一个代码片段,asyncio支持的原生协程:
asyncio支持的基于epool的事件循环:

def main():
    define_options()
    options.parse_command_line()
    # 使用uvloop代替原生事件循环
    # asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
    app = tornado.web.Application(handlers=handlers, debug=options.debug)
    http_server = tornado.httpserver.HTTPServer(app)
    http_server.listen(options.port)
    asyncio.get_event_loop().run_forever()

async/await支持的原生协程:

class CreditHandler(BaseHandler):
    async def post(self):
        status, msg, user = self.check_args('uid', 'order_no', 'phone', 'name',
                                            'product_id')
        if status != ErrorCodeConfig.SUCCESS:
            status, msg, report = status, msg, None
        else:                                                                              RcOutputFlowControler())
            status, msg, report = await rcoutput_flow_instance.get_rcoutput_result(user)
        res = self.generate_response_data(status, msg, report)
        await self.finish(res)
      
        await AccompanyRunningFlowControler().get_accompany_data(user)

总结一下python协程的特点:

单线程内切换,适用于IO密集型程序中,可以最大化IO多路复用的效果。
无法利用多核。
协程间完全同步,不会并行。不需要考虑数据安全。
用法多样,可以用在web服务中,也可用在pipeline数据/任务消费中

golang的协程(goroutine)

golang的协程就和传统意义上的协程不大一样了,兼具协程和线程的优势。这也是go最大的特色,就是从语言层面支持并发。Go语言里,启动一个goroutine很容易:go function 就行。

同样从概念上来讲,golang的协程同样是在适当的时候可中断可恢复。当协程中发生channel读写的阻塞或者系统调用时,就会切换到其他协程。具体的代码示例可以看上篇文章,就不再赘述了。

从实现上来说,goroutine可以在多核上运行,从而实现协程并行,我们先直接看下go的调度模型MPG。

image.png

如上图,M指的是Machine,一个M直接关联了一个内核线程。由操作系统管理。
P指的是”processor”,代表了M所需的上下文环境,也是处理用户级代码逻辑的处理器。它负责衔接M和G的调度上下文,将等待执行的G与M对接。
G指的是Goroutine,其实本质上也是一种轻量级的线程。包括了调用栈,重要的调度信息,例如channel等。

每次go调用的时候,都会:

  1. 创建一个G对象,加入到本地队列或者全局队列
  2. 如果还有空闲的P,则创建一个M
  3. M会启动一个底层线程,循环执行能找到的G任务
  4. G任务的执行顺序是,先从本地队列找,本地没有则从全局队列找(一次性转移(全局G个数/P个数)个,再去其它P中找(一次性转移一半)

对于上面的第2-3步,创建一个M,其过程:

  1. 先找到一个空闲的P,如果没有则直接返回,(哈哈,这个地方就保证了进程不会占用超过自己设定的cpu个数)
  2. 调用系统api创建线程,不同的操作系统,调用不一样,其实就是和c语言创建过程是一致的
  3. 然后创建的这个线程里面才是真正做事的,循环执行G任务

当协程发生阻塞切换时:

  1. M0出让P
  2. 创建M1接管P及其任务队列继续执行其他G。
  3. 当阻塞结束后,M0会尝试获取空闲的P,失败的话,就把当前的G放到全局队列的队尾。

这里我们需要注意三点:
1、M与P的数量没有绝对关系,一个M阻塞,P就会去创建或者切换另一个M,所以,即使P的默认数量是1,也有可能会创建很多个M出来。

2、P何时创建:在确定了P的最大数量n后,运行时系统会根据这个数量创建n个P。

3、M何时创建:没有足够的M来关联P并运行其中的可运行的G。比如所有的M此时都阻塞住了,而P中还有很多就绪任务,就会去寻找空闲的M,而没有空闲的,就会去创建新的M。

总结一下go协程的特点:

协程间需要保证数据安全,比如通过channel或锁。
可以利用多核并行执行。
协程间不完全同步,可以并行运行,具体要看channel的设计。
抢占式调度,可能无法实现公平。

coroutine(python)和goroutine(go)的区别

除了python,C#, Lua语言都支持 coroutine 特性。coroutine 与 goroutine 在名字上类似,都是可中断可恢复的协程,它们之间最大的不同是,goroutine 可能在多核上发生并行执行,单但 coroutine 始终是顺序执行。也基于此,我们应该清楚coroutine适用于IO密集程序中,而goroutine在 IO密集和CPU密集中都有很好的表现。不过话说回来,go就一定比python快么,假如在完全IO并发密集的程序中,python的表现反而更好,因为单线程内的协程切换效率更高。

从运行机制上来说,coroutine 的运行机制属于协作式任务处理, 程序需要主动交出控制权,宿主才能获得控制权并将控制权交给其他 coroutine。如果开发者无意间或者故意让应用程序长时间占用 CPU,操作系统也无能为力,表现出来的效果就是计算机很容易失去响应或者死机。goroutine 属于抢占式任务处理,已经和现有的多线程和多进程任务处理非常类似, 虽然无法控制自己获取高优先度支持。但如果发现一个应用程序长时间大量地占用 CPU,那么用户有权终止这个任务。

从协程:线程的对应方式来看

N:1,Python协程模式,多个协程在一个线程中切换。在IO密集时切换效率高,但没有用到多核
1:1,Java多线程模式,每个协程只在一个线程中运行,这样协程和线程没区别,虽然用了多核,但是线程切换开销大。
1:1,go模式,多个协程在多个线程上切换,既可以用到多核,又可以减少切换开销。(当都是cpu密集时,在多核上切换好,当都是io密集时,在单核上切换好)。

从协程通信和调度机制来看
image.png

查看原文

赞 7 收藏 6 评论 0

屈天航 发布了文章 · 11月18日

Golang协程并发的流水线模型

背景

最近由于性能问题,后端服务一直在做python到golang的迁移和重构。go语言精简优雅,既有编译型语言的严谨和高性能,又有解释型语言的开发效率,出色的并发性能也是go区别于其他语言的一大特色。go的并发编程代码虽然简单,但重在其并发模型和流程的设计。这里就总结下golang协程并发常用的流水线模型。

简单的流水线思维

流水线模式并不是什么新奇的概念,但是它能极大地提高生产效率。比如实际生活中的汽车生产流水线,流水线上的每一个流程负责不同的工作,比如第一个流程是拼装车身,第二个流程是安装发动机,第三个流程是装轮胎...,这些步骤我们可以类比成go并发流程中的协程,每一个协程就是一个任务。流水线上面传递的车身、发动机、轮胎,这些我们可以类比成协程间需要传递的数据,而在这些流程(协程)间传递这些配件(数据),自然就要通过传送带(channel)。在流水线上,我们装四个轮胎肯定不是一个一个来装的,肯定是有四个机械臂同时来装。因此装轮胎这个步骤我们有4个协程在并发工作来提高效率。这么一来,流水线模型的基本要素就构成了。
Golang的并发模型灵感其实都来自我们生活,对程序而言,高的生产效率就是高的性能。在Golang中,流水线由多个流程节点组成,流程之间通过channel连接,每个流程节点可以由多个同时运行的goroutine组成。
image.png

如何构造流水线

有了流水线模式的思维,接下来就是如何构造流水线了。简单来说,其实就是通过channel将任务流程连接起来,两个相邻的流程互为生产者和消费者,通过channel进行通信。耗时的流程可以将任务分散到多个协程来执行。
我们先来看一个最简单的流水线,如下图,A是生产者流程,B是它的消费流程,同时又是C的生产者流程。A,B,C三个协程直接,通过读写channel进行通信。
image.png

那如果此时B流程可以将a channel中的任务并发执行呢,很简单,我们只需要起多个B协程就可以了。如下图。
image.png

总之,我们构造流水线并发的思路是关注数据的流动,数据流动的过程交给channel,channel两端数据处理的每个环节都交给goroutine,这个流程连起来,就构成了流水线模型。

关于channel

为什么我们可以选择channel来进行协程间的通信呢,协程之间又是怎么保持同步顺序呢,当然这都要归功于channel。channel是go提供的进程内协程间的通信方式,它是协程/线程安全的,channe的读写阻塞会导致协程的切换。
channel的操作和状态组合可以有以下几种情况:
image.png

**有1个特殊场景**:当`nil`的通道在`select`的某个`case`中时,这个case会阻塞,但不会造成死锁。

channel不仅可以保证协程安全的数据流动,还可以保证协程的同步。当有并发问题时,channel也是我们首先应该想到的数据结构。不过显而易见,当使用有缓冲区的channel时,才能达到协程并发的效果,并且生产者和消费者的协程间是相对同步的。使用无缓冲区的channel时,是没有并发效果的,协程间是绝对同步的,生产者和消费者必须同时写和读协程才能运行。
channel关注的是数据的流动,这种场景下都可以考虑使用channel。比如:消息传递、信号广播、任务分发、结果汇总、同步与异步、并发控制... 更多的不在这里赘述了,总之,Share memory by communicating, don't communicate by sharing memory.

流水线模型实例

举个简单栗子,计算80000以内的质数并输出。
这个例子如果我们采用非并发的方式,就是for循环80000,挨个判断是不是素数再输出。不过如果我们采用流水线的并发模型会更高效。

从数据流动的角度来分析,需要遍历生成1-80000的数字到一个channel中,数字判断是否为素数,输出结果到一个channel中。因此我们需要两个channel,channel的两端就设计成协程即可。
1、遍历生成原始80000个数据(生产者)
2、计算这80000个数据中的素数(生产者+消费者)
3、取结果输出(消费者)

代码如下:

package gen_channel
import "fmt"
import "time"
func generate_source(data_source_chan chan int) {
   for i := 1; i <= 80000; i++ {
      data_source_chan <- i
   }
   fmt.Println("写入协程结束")
   close(data_source_chan)
}
func generate_sushu(data_source_chan chan int, data_result_chan chan int, gen_chan chan bool) {
   for num:= range data_source_chan {
      falg := true
 for i := 2; i < num; i++ {
         if num%i == 0 {
            falg = false
 break }
      }
      if falg == true {
         data_result_chan <- num
      }
   }
   fmt.Println("该协程结束")
   gen_chan <- true
}
func workpool(data_source_chan chan int, data_result_chan chan int, gen_chan chan bool, gen_num int){
   // 开启8个协程
 for i := 0; i < gen_num; i++ {
      go generate_sushu(data_source_chan, data_result_chan, gen_chan)
   }
}
func Channel_main() {
   // 任务数据
   data_source_chan := make(chan int, 2000)
   // 结果数据
   data_result_chan := make(chan int, 2000)
   // 所有任务协程是否结束
   gen_chan := make(chan bool, 8)
   time1 := time.Now().Unix()
   go generate_source(data_source_chan)
   // 协程池,任务分发
   workpool(data_source_chan, data_result_chan, gen_chan, 8)
   // 所有协程结束后关闭结果数据channel
   go func() {
      for i := 0; i < 8; i++ {
         <-gen_chan
      }
      close(data_result_chan)
      fmt.Println("spend timeis ", time.Now().Unix()-time1)
   }()
   for date_result := range data_result_chan {
      fmt.Println(date_result)
   }
}

上面这段代码中。data_source_chandata_result_chan这两个channel分别用来放原始数据和结果数据,buffer分别为2000。

generate_source协程: 生产数据,它会把数据写入data_source_chan通道,全部写入完成后关闭通道。
generate_sushu协程: 负责计算并判断data_source_chan中的数据是否为质数,是的话就写入data_result_chan通道。
主协程for date_result := range data_result_chan: 最后负责读取data_result_chan中的结果,直到data_result_chan关闭后结束程序。

可以看到我们通过workpool方法起了8个generate_sushu协程来并发处理data_source_chan的任务。那么就有一个问题,如何知道所有数据都已处理完毕呢,等到生产者generate_source协程结束data_source_chan关闭吗? 恐怕不是,因为可能data_source_chan关闭后8个任务协程仍然在继续计算。那么只能等8个协程全部处理完毕后,才能说明所有数据已处理完,从而才能关闭data_result_chan,然后主协程读取data_result_chan结束。

因此我们这里引入了另一个channel:gen_chan,来记录计算结束的任务。每个generate_sushu协程处理完,就写入一个记录到channel中。因此我们有一个匿名协程,当可以从gen_chan中取8个结果出来的话,就说明所有协程已计算完成,那么可以关上阻塞程序的最后阀门data_result_chan

当然这种设计方式并不唯一,我们也可以不用统一的data_result_chan来接收结果,而是每个协程分配一个channel来存放结果,最后再merge到一起。

可能大家觉得这种方式很复杂,确实比较高效但写起来并不友好,那有没有更友好的方式呢?

sync包

在处理并发任务时我们首先想到的应该是channel,但有时候channel不是万能或者最方便的,所以go也为我们提供了sync包。

sync包提供了各种异步及锁类型及其内置方法。用起来也很方便,比如Mutex就是给协程加锁,某个时段内不能有多个协程访问同一段代码。WaitGroup就是等待一些工作完成后,再进行下一步工作。Once可以用来确保协程中某个函数只执行1次...当我们面对一个并发问题的时候,应该去分析采用哪种协程同步方式,是channel还是Mutex呢。这需要看我们关注的是数据的流动还是数据的安全性。篇幅原因这里不再展开讲了。

  1. Mutex:互斥锁
  2. RWMutex:读写锁
  3. WaitGroup:等待组
  4. Once:单次执行
  5. Cond:信号量
  6. Pool:临时对象池
  7. Map:自带锁的map

我们接着上面质数的问题,使用sync中的WaitGroup,会让我们的代码更加友好,因为我们不需要引入一个channel来记录是否4个车轮都换完了,让WaitGroup来做就好了。

 package gen_channel
import (
   "fmt"
 "time")
import "sync"
func generate_source3(data_source_chan chan int) {
   for i := 1; i <= 80000; i++ {
      data_source_chan <- i
   }
   fmt.Println("写入协程结束")
   close(data_source_chan)
}
func generate_sushu3(data_source_chan, data_result_chan chan int, wg *sync.WaitGroup) {
   defer wg.Done()
   for num := range data_source_chan {
      falg := true
 for i := 2; i < num; i++ {
         if num%i == 0 {
            falg = false
 break }
      }
      if falg == true {
         data_result_chan <- num
      }
   }
   fmt.Println("该协程结束")
}
func workpool3(data_source_chan chan int, data_result_chan chan int, wg *sync.WaitGroup, gen_num int) {
   // 开启8个协程
 for i := 0; i < gen_num; i++ {
      wg.Add(1)
      go generate_sushu3(data_source_chan, data_result_chan, wg)
   }
}
func Channel_main3() {
   data_source_chan := make(chan int, 500)
   data_result_chan := make(chan int, 2000)
   time1 := time.Now().Unix()
   var wg sync.WaitGroup
 go generate_source3(data_source_chan)
   // 开启8个协程
 for i := 0; i < 8; i++ {
      wg.Add(1)
      go generate_sushu3(data_source_chan, data_result_chan, &wg)
   }
   wg.Wait()
   close(data_result_chan)
   fmt.Println("spend timeis ", time.Now().Unix()-time1)
   for date_result := range data_result_chan {
      fmt.Println(date_result)
   }
}

总结

流水线模式的设计要关注数据的流动,然后在数据流动的路径中将数据放到channel中,将channel的两端设计成协程。
并发设计中channel和sync可以从开发效率和性能的角度自由组合,channel不一定是最优解
写入channel的协程来控制该协程的关闭,消费者协程不关闭读协程,防止报错。养成在协程入口限制channel读写类型的习惯。

以上是我们在go并发的流水线模型中的一些总结。可以看出go的协程并发更考验我们的设计能力,因为协程间的同步和数据传递都交给了开发者来设计。同时也留给我们一些引申思考,协程在IO密集和CPU密集的情况下是否都能大幅提高性能呢?是否和channel的缓冲区或者并发设计有关呢?协程异常该怎么处理呢?go的协程和python的协程又有什么区别呢?...我们后面慢慢探讨~

查看原文

赞 17 收藏 13 评论 0

屈天航 关注了用户 · 11月11日

大彬 @lessisbetter

公众号:Go语言充电站
二维码:https://segmentfault.com/img/...

关注 851

屈天航 发布了文章 · 11月6日

阿里云对象存储OSS的python SDK示例

背景

最近公司项目需要使用阿里云的oss存储来线上实时存储图片文件。因此调研开发了python版本的阿里云oss SDK。这里我们介绍一下oss以及python oss的常用方法。

oss介绍

阿里云对象存储OSS(Object Storage Service)是阿里云提供的海量、安全、低成本、高持久的云存储服务。其数据设计持久性不低于99.9999999999%(12个9),服务可用性(或业务连续性)不低于99.995%。
OSS具有与平台无关的RESTful API接口,您可以在任何应用、任何时间、任何地点存储和访问任意类型的数据。
您可以使用阿里云提供的API、SDK接口或者OSS迁移工具轻松地将海量数据移入或移出阿里云OSS。数据存储到阿里云OSS以后,您可以选择标准存储(Standard)作为移动应用、大型网站、图片分享或热点音视频的主要存储方式,也可以选择成本更低、存储期限更长的低频访问存储(Infrequent Access)、归档存储(Archive)、冷归档存储(Cold Archive)作为不经常访问数据的存储方式。
以上是阿里云官方对于oss的介绍,总结来看,就是其可以提供高可用,高持久、低成本、支持多种文件及数据格式、支持RESTful API接口的云存储服务。就我们实际使用来说确实还不错,成本的话我们申请的海外节点,30多T数据每月差不多5000,也算比较低了。

在代码示例前,我们先认识一些概念:

存储类型(Storage Class)

OSS提供标准、低频访问、归档、冷归档四种存储类型,全面覆盖从热到冷的各种数据存储场景。其中标准存储类型提供高持久、高可用、高性能的对象存储服务,能够支持频繁的数据访问;低频访问存储类型适合长期保存不经常访问的数据(平均每月访问频率1到2次),存储单价低于标准类型;归档存储类型适合需要长期保存(建议半年以上)的归档数据;冷归档存储适合需要超长时间存放的极冷数据。

存储空间(Bucket)

存储空间是您用于存储对象(Object)的容器,所有的对象都必须隶属于某个存储空间。存储空间具有各种配置属性,包括地域、访问权限、存储类型等。您可以根据实际需求,创建不同类型的存储空间来存储不同的数据。

对象(Object)

对象是OSS存储数据的基本单元,也被称为OSS的文件。对象由元信息(Object Meta)、用户数据(Data)和文件名(Key)组成。对象由存储空间内部唯一的Key来标识。对象元信息是一组键值对,表示了对象的一些属性,例如最后修改时间、大小等信息,同时您也可以在元信息中存储一些自定义的信息。

地域(Region)

地域表示OSS的数据中心所在物理位置。您可以根据费用、请求来源等选择合适的地域创建Bucket。

访问域名(Endpoint)

Endpoint表示OSS对外服务的访问域名。OSS以HTTP RESTful API的形式对外提供服务,当访问不同地域的时候,需要不同的域名。通过内网和外网访问同一个地域所需要的域名也是不同的。

访问密钥(AccessKey)

AccessKey简称AK,指的是访问身份验证中用到的AccessKey Id和AccessKey Secret。OSS通过使用AccessKey Id和AccessKey Secret对称加密的方法来验证某个请求的发送者身份。AccessKey Id用于标识用户;AccessKey Secret是用户用于加密签名字符串和OSS用来验证签名字符串的密钥,必须保密。

oss python SDK

OssClient支持的方法:
1、上传文件对象到oss存储空间
2、上传本地指定路径文件到oss存储空间
3、下载文件到文件流对象
4、下载文件到本地指定路径
5、生成加签的临时URL以供授信用户下载

以上功能基本可以满足日常业务需求,当然oss也提供了很多个性化的操作,比如断点下载、范围下载、断点上传、追加上传、上传回调、图片调整等等,这里不再赘述,都有专门的接口方法。
以下列举了接口实现方法,并没有在其中加入各种判断逻辑,比如文件是否存在、权限控制、是否覆盖、是否加密等等。有需要的可以自己添加逻辑

import oss2


AccessKeyId = 'LTA************hpmoN9'
AccessKeySecret = '0ise*************bkIyF'
BucketName = '*******'
Endpoint = 'http://oss-ap-south-1.aliyuncs.com'


class OssClient(object):
    __instance = None
    __first_init = False

    # 单例模式
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super().__new__(cls)
        return cls.__instance

    def __init__(self):
        cls = self.__class__
        if not cls.__first_init:
            self.auth = oss2.Auth(AccessKeyId, AccessKeySecret)
            self.bucket = oss2.Bucket(self.auth, Endpoint, BucketName)
            cls.__first_init = True


    def upload_file_from_fileobj(self):
        """
            upload_file_from_fileobj方法:上传文件对象到oss存储空间, 该方法可用于我们从上游服务接收了图片参数,然后以二进制形式读文件,上传到oss存储空间指定位置(abc/efg/00),
        当然也可以将本地文件上传到oss我们的bucket. 其中fileobj不止可以是文件对象,也可以是本地文件路径。 put_object方法底层仍是RESTful API的调用,可以指定headers,规定Content-Type等内容
        """
        # 判断bucket中文件是否存在,也可以不判断,会上传更新
        exist = self.bucket.object_exists('abc/efg/00') #<yourObjectName>
        if exist:
            return True
        with open('/home/rong/www/0', 'rb') as fileobj:
            result = self.bucket.put_object('abc/efg/00', fileobj, headers=None) #<yourObjectName>
        if result.status == 200:
            return True
        else:
            return False


    def upload_file_from_loaclfilepath(self):
        """
            upload_file_from_loaclfilepath:上传本地指定路径文件(/home/rong/www/0)到oss存储空间指定位置(abc/efg/0)。与put_object方法不同,put_object_from_file的第二个参数只能是本地文件路径
        """
        # 判断bucket中文件是否存在,也可以不判断,会上传更新
        exist = self.bucket.object_exists('abc/efg/0') #<yourObjectName>
        if exist:
            return True
        result = self.bucket.put_object_from_file('abc/efg/0', '/home/rong/www/0', headers=None) #(<yourObjectName>, <yourLocalFile>)
        if result.status == 200:
            return True
        else:
            return False


    def download_file_to_fileobj(self):
        """
            download_file_to_fileobj:下载文件到文件流对象。由于get_object接口返回的是一个stream流,需要执行read()后才能计算出返回Object数据的CRC checksum,因此需要在调用该接口后做CRC校验。
        """
        object_stream = self.bucket.get_object('abc/efg/0') #<yourObjectName>
        result = object_stream.read()
        if object_stream.client_crc != object_stream.server_crc:
            print("The CRC checksum between client and server is inconsistent!")
            result = None
        return result


    def download_file_to_loaclfilepath(self):
        """
            download_file_to_loaclfilepath:下载文件到本地路径。get_object和get_object_to_file的区别是前者是获取文件流实例,可用于代码处理和远程调用参赛。后者是存储到本地路径,返回的是一个http状态的json结果
        """
        result = self.bucket.get_object_to_file('abc/efg/0', '/home/rong/www/download/0') # ('<yourObjectName>', '<yourLocalFile>')
        if result.status == 200:
            return True
        else:
            return False

    def generate_temporary_download_url(self):
        """
            generate_temporary_download_url: 生成加签的临时URL以供授信用户下载。一般在实际业务中,我们是提供给调用方一个临时下载链接,来让其获取文件数据,而不是直接使用以上暴露AccessKeyId和AccessKeySecret的方法。
            因此一般我们会存储某条数据oss的路径(<yourObjectName>)与调用方某个唯一标识的对应关系(如手机号身份证号),在调用方请求时,通过该标识获取其数据的oss文件路径(<yourObjectName>),
            然后制定过期时间,为其生成临时下载链接
            http://bucketname.oss-ap-south-1.aliyuncs.com/abc/efg/0?OSSAccessKeyId=LTA************oN9&Expires=1604638842&Signature=tPgvWz*************Uk%3D
        """
        res_temporary_url = self.bucket.sign_url('GET', 'abc/efg/0', 60, slash_safe=True)
        return res_temporary_url


if __name__ == '__main__':
    oss_client = OssClient()
    print(oss_client.bucket.bucket_name)
    print(oss_client.bucket.ACL)
    from itertools import islice
    for b in islice(oss2.ObjectIterator(oss_client.bucket), 10):
        print(b.key)
    ...
查看原文

赞 0 收藏 0 评论 0

屈天航 发布了文章 · 9月22日

机器学习模型python在线服务部署的两种实例

背景

众所周知python在机器学习实践中的应用广泛深入,而在我们业务中的应用集中在提供线上实时风控输出服务,比如国内业务的模型在线服务架构和海外业务的后台决策引擎架构。这两种应用的结合就要求我们考虑如何高效安全便捷地来实现模型的在线部署,为上游提供服务。
   在我们的考虑中,无论是代码复杂程度和业务场景,还是语言本身的特点,模型部署都有趋于向微服务架构转型的趋势和需要。一方面,需要进行代码分离来明确责任分工提高开发效率和容错性。另外一个方面,python在CPU密集型的应用中表现是无法令人满意的。为了使用协程来提高异步性从而处理更多的并发请求,最直接地就是将CPU密集转化为IO密集,因为Python天生就适合IO密集型的网络应用。
   因此,我们生产中将模型计算抽取为model_lib代码库,并且通过微服务online_model进行交互。这里我们调研过两种模型部署的方式,最终选择了第一种。

一、基于flask框架进行模型部署

Flask是一个轻量级的可定制框架,具有灵活、轻便且高效的特点,并且是标准的wsgi接口框架,易于扩展和维护。

1. 为什么选用nginx+uwsgi+flask这种技术架构

1)    Uwsgi搭配nginx性能快,内存占用低,高度可定制,自带详尽日志功能,支持平滑重启。

2)    Flask完全兼容了wsgi标准; 微框架,扩展性强; 完全基于unicode,不需处理编码问题;自带服务可独立做单元测试及开发。
image.png

3)    我们客户端采用了tornado协程,已经实现了将cpu计算转为io操作,服务端完全是CPU密集的模型计算,不会释放进程,异步框架保持大量文件描述符状态耗费内存,因此不适用异步IO框架。

2. 业务流程框架

image.png

3. 部署方式:

部署方式采用nginx+uwsgi+flask的方式,uwsgi可直接接受socket而不是http请求提高性能,再将服务转发给flask框架,这里注意flask此类wsgi标准接口的服务框架比如djangoweb.py在生产中一般不使用自带服务,而是在上层部署uwsgi或者gunicorn作为服务器来进行服务转发,上层再用nginx来做负载均衡,这样可以提高服务稳定性和性能。
(这里的inrouter层是我们自己封装的路由层,用来做集群中的路由转发,nginx只在本地做了端口转发。)
image.png

4. 代码示例:

uwsgi服务配置:

[uwsgi]
# 监听端口
socket=127.0.0.1:8200
# 进程数
processes=20

;async=4
;threads=2
;enable-threads = true
# 运行的目录
chdir = /home/rong/www/online_model
# wsgi文件
wsgi-file = model_main.py
callable=app
# 是否要有主进程
master = true
# 后台运行及其打印的日志
daemonize = /home/rong/www/log/uwsgi.log
# 主进程pid文件
pidfile = /home/rong/www/log/online_model/pid/uwsgi.pid
# 日志切割大小
log-maxsize = 5000000
# 不记录请求信息的日志。只记录错误以及uWSGI内部消息到日志中。
disable-logging = false
# 超时时间
http-timeout= 3
# 传输数据大小限制
buffer-size  = 1048576
# 每个进程单独加载
lazy-apps = true

flask服务关键代码:

import importlib
import json
import cProfile
import pstats
import StringIO
import time
import traceback
from flask import Flask, request
from common import rong_logger
from common.global_variable import StaticCacheClass
import autopath  # 不能去掉

app = Flask(__name__)
# 这里是模型代码库的统一入口,模型代码库中是通过抽象类实现的规范化的模型代码实例,通过此服务提供调用,也通过离线调度进行跑批任务。保证线上线下模型调用一致性
online_model_main = importlib.import_module('online_model_main')

MUST_PARAMS = ['resource_id', 'feature_dict']
SUCCESS_STATUS = 0
ERROR_STATUS = 1


# 路由函数只允许post请求
@app.route("/", methods=['POST'])
def model_main():
    uniq_id = '[%s][%s]' % (request.form.get('resource_id', ''), request.url)
    try:
        status, msg, params = _check_params(request)
        if status != SUCCESS_STATUS:
            rong_logger.error(uniq_id + 'params error, detail: %s' % msg)
            status, msg, result = status, msg, None
        else:
            resource_id, feature_dict = params['resource_id'], json.loads(params['feature_dict'])
            status, msg, result = online_model_main.main(resource_id, feature_dict, request, rong_logger)
            rong_logger.info(uniq_id + '[%s][%s][%s]' % (status, msg, result))

    except Exception as e:
        rong_logger.error(uniq_id + 'error: %s, detail: %s' % (str(e), traceback.format_exc()))
        status, msg, result = 5, 'online_model_error:' + str(e), None
    return _get_response(status, msg, result)

模型代码库模型实例:

其中 XgboostExecutor类是基于xgb模型抽象类实现的类,通过它来实例化一个模型对象,给flask应用提供调用。具体我们不再深究,有兴趣可以再写一个专题来介绍模型代码库的部署。

# -*- coding:utf-8 -*-
# !/usr/bin/python

import logging
import os

from executor.src.load_helper import read_cur_path
from executor.xgboost_model_executor import XgboostExecutor

logging.basicConfig(level=logging.INFO, format='%(asctime)s:%(message)s')




[model_path, features_path,feature_importance_path] = map(
    read_cur_path, ["xgb_model", "feature_list","feature_importance"]
)
model = XgboostExecutor(model_path, features_path,
                        feature_check_white_list=["n21_score"],
                        white_or_weight=False,
                        feature_check_weight_limit= 1,
                        feature_importance_path=feature_importance_path,
                        manager="qutianhang@xx.com",
                        developer="qutianhang@xx.com",
                        correlation="negative")

5. 性能比对

微服务改造后20并发请求模型:
image.png
微服务改造前20并发请求模型:
image.png

本机测试并发性能就提高了20%,但注意这是在高并发的情况下,就单条请求来看,微服务并不能显著提高性能。

二、 基于grpc进行在线模型部署

在 gRPC 里客户端应用可以像调用本地对象一样直接调用另一台不同的机器上服务端应用的方法,能够更容易地创建分布式应用和微服务。与许多 RPC 系统类似,gRPC 也是基于以下理念:定义一个服务,指定其能够被远程调用的方法(包含参数和返回类型)。在服务端实现这个接口,并运行一个 gRPC 服务器来处理客户端调用。在客户端拥有一个存根能够执行在服务端实现的一样的方法(这个方法就类似于接口)
image.png

1. 为什么选用grpc进行模型部署

  • 1)grpc使用ProtoBuf来定义服务、请求返回的数据格式,压缩和传输效率高,语法简单,表达力强。(如下为ProtoBuf的序列化和反序列话性能表现

image.pngimage.png

  • 2)grpc可支持tensorflow serving的接口调用,tensorflow完成模型训练和部署后,可提供服务接口,给grpc进行调用。实现方便,高效,自带版本管理、模型热更新等,很适合大规模线上业务,是我们下一步模型部署的技术方向。
  • 3)gRPC支持多种语言,并能够基于语言自动生成客户端和服务端功能库。

2. 部署方式(业务流程与之前相同)

部署方式采用nginx+grpc,要求nginx支持http2.0。在客户端将json特征字典转为protobuf。(https://github.com/NextTuesday/py-pb-converters/blob/master/pbjson.py 这里附上json和protobuf互相转化的脚本。)

image.png

3. 服务发现与负载均衡

image.png

4. 开发流程

image.png

客户端:
image.png

服务端:
image.png

三、 两种方式线上模型部署对比

1)    grpc使用protbuf更加复杂,需要在客户端服务端均保留protbuf文件并做校验,而flask只需要做好统一的接口标准规范即可。

2)    grpc使用http2.0更适用移动端的在线模型预测,或者基于tensorflowd的大规模线上模型部署和预测,flask更适用后端面向服务的手动模型部署和预测。

3) grpc节省数据空间,但与python交互需要做json和protbuf数据转换,flask兼容wsgi标准,适用于RESTful类服务,但数据传输占用空间较大。

查看原文

赞 4 收藏 1 评论 0

屈天航 关注了用户 · 9月8日

陈星星 @chen_xingxing

谜一样的烟火🎆

关注 29

屈天航 关注了用户 · 9月8日

ronniesong @sxssxs

Seek the truth!

关注 128

屈天航 发布了文章 · 9月7日

python性能优化

背景

本文主要在python代码的性能分析优化方面进行讨论,旨在解决一些语言层面比较常见的性能瓶颈,是在平时工作中的一些积累和总结,会比较基础和全面,顺便也会介绍一些在服务架构上的优化经验。

一、Python语言背景

python简单易学以及在数据计算分析方面优异的特点催生了庞大的用户群体和活跃社区性,使得它在数据分析、机器学习等领域有着先天的优势, 同时由于其协程特性和广泛的第三方支持,python在在线服务上也有广泛的使用。但是python在性能问题上有所有动态解释型高级语言的通病,也是制约python进一步广泛应用的重要因素。这也是这类解释型脚本语言的通病:

image.png

二、业务架构背景

单独脱离具体的业务应用场景来看性能问题是比较片面的。下面以我们当前的后端架构来看下python性能瓶颈在业务应用上的具体表现。该系统是基于大数据和机器学习模型的风在线风控系统,它为大量金融机构提供风控服务,基于大量结构化和非结构化数据、外部数据源、超万维的特征、以及复杂的建模技术。些也导致我们基于python的服务性能面临着严峻考验 。下图是架构简图:

image.png

性能分析

对代码优化的前提是需要了解性能瓶颈在什么地方,程序运行的主要时间是消耗在哪里,常见的可以在日志中打点来统计运行时间,对于比较复杂的代码也可以借助一些工具来定位,python 内置了丰富的性能分析工具,能够描述程序运行时候的性能,并提供各种统计帮助用户定位程序的性能瓶颈。常见的 profilers:cProfile,profile,line_profile,pprofile 以及 hotshot等,当然一些IDE比如pycharm中也继承了完善的profiling。这里我们只介绍有代表性的几种性能分析方法:

一、利用装饰器实现函数耗时统计打点

装饰器就是通过闭包来给原有函数增加新功能,python可以用装饰器这种语法糖来给函数进行耗时统计,但是这仅限于一般的同步方法,在协程中,更一般地说在生成器函数中,因为yield会释放当前线程,耗时统计执行到yield处就会中断返回,导致统计的失效。
如下是一个包含两层闭包(因为要给装饰器传参)的装饰器:

def time_consumer(module_name='public_module'):

    def time_cost(func):
        #获取调用装饰器的函数路径
        filepath =sys._getframe(1).f_code.co_filename
        @wraps(func)
        def warpper(*args,**kwargs):
            t1 = time.time()
            res = func(*args,**kwargs)
            t2 = time.time()
            content={}
            try:
            
                content['time_cost'] = round(float(t2-t1),3)
                content['method'] = func.__name__
                content['file'] = filepath.split(os.sep)[-1]
                content['module'] = module_name
            
                content_res = json.dumps(content)
                time_cost_logger.info(content_res)
            
            except Exception as e:
                time_cost_logger.warning('%s detail: %s' % (str(e), traceback.format_exc()))
            return res
        return warpper
    return time_cost

二、函数级性能分析工具cprofile

cProfile自python2.5以来就是标准版Python解释器默认的性能分析器,它是一种确定性分析器,只测量CPU时间,并不关心内存消耗和其他与内存相关联的信息。

性能分析结果

image.png

字段含义

ncalls:函数被调用的次数。
tottime:函数内部消耗总时间。
percall:每次调用平均消耗时间。
cumtime:消费时间的累计和。
filename:lineno(function):被分析函数所在文件名、行号、函数名。

使用方法

1、针对单个文件的性能分析:

 python -m cProfile -s tottime test.py

2、针对某个方法的性能分析:

import cProfile
def function():
    pass
if __name__ == '__main__':
    cProfile.run("function()")

3、项目中针对实时服务的性能分析:

 # 一般需要绑定在服务框架的钩子函数中来实现,如下两个方法分别放在入口和出口钩子中;pstats格式化统计信息,并根据需要做排序分析处理。

 def _start_profile(self):
     import cProfile
     self.pr = cProfile.Profile()
     self.pr.enable()
     
 def _print_profile_result(self):
     if not self.pr:
         return
     self.pr.disable()
     import pstats
     import StringIO
     s = StringIO.StringIO()
     stats = pstats.Stats(self.pr, stream=s).strip_dirs().sort_stats('tottime')
     stats.print_stats(50)

三、行级分析工具pprofile/line_profile

使用line_profile需要引入_kernprof__,因此我们这里选用pprofile,虽然pprofile的效率没有line_profile高,但在做性能分析时这是可以忽略的。pprofile的用法和cprofile的用法三完全一致。

性能分析结果

image.png
image.png

字段含义

Line:行号
Hits:该行代码执行次数
Time:总执行耗时
Time per hit:单次执行耗时
%:耗时占比

四、性能分析总结

我们在做性能分析时,可以挑选任何方便易用的方法或工具进行分析。但总体的思路是由整体到具体的。例如可以通过cprofile寻找整个代码执行过程中的耗时较长的函数,然后再通过pprofile对这些函数进行逐行分析,最终将代码的性能瓶颈精确到行级。

python性能优化方法

一、优化思路

Python的性能优化方式有很多,可以从不同的角度出发考虑具体问题具体分析,但可以归结为两个思路:从服务架构和CPU效率层面,将CPU密集型向IO密集型优化。从代码执行和cpu利用率层面,要提高代码性能以及多核利用率。比如,基于此,python在线服务的优化思路可以从这几方面考虑:

image.png

二、代码优化

使用字典/集合等hash等数据结构

常用操作:检索、去重、交集、并集、差集
1、在字典/集合中查找(以下代码中均省略记时部分)

dic = {str(k):1 for k in xrange(1000000)}
if 'qth' in dic.keys():
    pass
if 'qth' in dic:
    pass

耗时:
0.0590000152588
0.0

2、使用集合求交集

list1=list(range(10000))
list2=[i*2 for i in list1]
s1=set(list1)
s2=set(list2)
list3 = []
# 列表求交集
for k in list1:
    if k in list2:
        list3.append(k)
# 集合求交集
s3 = s1&s2

耗时:
0.819999933243
0.001000165939

Ps:集合操作在去重、交并差集等方面性能突出:

image.png

使用生成器代替可迭代对象

节省内存和计算资源,不需要计算整个可迭代对象,只计算需要循环的部分。
1、使用xrange而不是range(python3中无区别)

for i in range(1000000):
    pass
for i in xrange(1000000):
    pass

耗时:
0.0829999446869
0.0320000648499

2、列表推导使用生成器

dic = {str(k):1 for k in xrange(100000)}
list1 = [k for k in dic.keys()]
list1 = (k for k in dic.keys())

耗时:
0.0130000114441
0.00300002098083

3、复杂逻辑产生的迭代对象使用生成器函数来代替

def fib(max):
    n,a,b =0,0,1
    list = []
    while n < max:
        a,b =b,a+b
        n = n+1
        list.append(b)
    return list
# 迭代列表
for i in fib(100000):
    pass
def fib2(max):
    n, a, b = 0, 0, 1
    while n < max:
        yield b
        a, b = b, a + b
        n = n + 1
# 迭代生成器
for  i in fib2(100000):
    pass

耗时:
0.713000059128
0.138999938965

减少循环和冗余

这部分比较容易理解就不再附上示例了。
i) 在循环中不要做和迭代对象无关的事。将无关代码提到循环上层。
ii) 使用列表解析和生成器表达式
iii) 对于and,应该把满足条件少的放在前面,对于or,把满足条件多的放在前面。
iv) 迭代器中的字符串操作:是有join不要使用+。
v) 尽量减少嵌套循环,不要超过三层,否则考虑优化代码逻辑、优化数据格式、使用dataframe代替循环等方式。

尝试pandas和numpy中的数据结构

一个NumPy数组基本上是由元数据(维数、形状、数据类型等)和实际数据构成。数据存储在一个均匀连续的内存块中,该内存在系统内存(随机存取存储器,或RAM)的一个特定地址处,被称为数据缓冲区。这是和list等纯Python结构的主要区别,list的元素在系统内存中是分散存储的。这是使NumPy数组如此高效的决定性因素。

import numpy as np
def pySum(n):
    a=list(range(n))
    b=list(range(0,5*n,5))
    c=[]
    for i in range(len(a)):
        c.append(a[i]**2+b[i]**3)
    return c
def npSum(n):
    a=np.arange(n)
    b=np.arange(0,5*n,5)
    c=a**2+b**3
    return c
a=pySum(100000)
b=npSum(100000)

耗时:
0.138999891281
0.007123823012

多进程

python多进程multiprocessing的目的是为了提高多核利用率,适用于cpu密集的代码。需要注意的两点是,Pytho的普通变量不是进程安全的,考虑同步互斥时,要使用共享变量类型;协程中可以包含多进程,但是多进程中不能包含协程,因为多进程中协程会在yield处释放cpu直接返回,导致该进程无法再恢复。从另一个角度理解,协程本身的特点也是在单进程中实现cpu调度。
1、进程通信、共享变量
python多进程提供了基本所有的共享变量类型,常用的包括:共享队列、共享字典、共享列表、简单变量等,因此也提供了锁机制。具体不在这里赘述,相关模块:from multiprocessing import Process,Manager,Queue

2、分片与合并
多进程在优化cpu密集的操作时,一般需要将列表、字典等进行分片操作,在多进程里分别处理,再通过共享变量merge到一起,达到利用多核的目的,注意根据具体逻辑来判断是否需要加锁。这里的处理其实类似于golang中的协程并发,只是它的协程可以分配到多核,同样也需要channel来进行通信 。

from multiprocessing import Pool
p = Pool(4)
# 对循环传入的参数做分片处理
for i in range(5):
    p.apply_async(long_time_task, args=(i,))
p.close()
p.join()

多线程

Python多线程一般适用于IO密集型的代码,IO阻塞可以释放GIL锁,其他线程可以继续执行,并且线程切换代价要小于进程切换。要注意的是python中time.sleep()可以阻塞进程,但不会阻塞线程

class ThreadObj():
    executor = ThreadPoolExecutor(16)

    @run_on_executor
    def function(self):
        # 模拟IO操作, time.sleep不会阻塞多线程,线程会发生切换
        time.sleep(5)

协程

协程可以简单地理解为一种特殊的程序调用,特殊的是在执行过程中,在子程序内部可中断,然后转而执行别的子程序,在适当的时候再返回来接着执行。如果熟知了python生成器,其实可以知道协程也是由生成器实现的,因此也可以将协程理解为生成器+调度策略。通过调度策略来驱动生成器的执行和调度,达到协程的目的。这里的调度策略可能有很多种,简单的例如忙轮循:while True,更简单的甚至是一个for循环。复杂的可能是基于epoll的事件循环。在python2的tornado中,以及python3的asyncio中,都对协程的用法做了更好的封装,通过yield和await就可以使用协程。但其基本实现仍然是这种生成器+调度策略的模式。使用协程可以在单线程内实现cpu的释放和调度,不再需要进程或线程切换,只是函数调用的消耗。在这里我们举一个简单的生产消费例子:

def consumer():
    r = ''
    while True:
        n = yield r
        if not n:
            return
        print('[CONSUMER] Consuming %s...' % n)
        r = '200 OK'

def produce(c):
    r=c.send(None)
    print r
    n = 0
    while n<5:
        n = n + 1
        print('[PRODUCER] Producing %s...' % n)
        r = c.send(n)
        print('[PRODUCER] Consumer return: %s' % r)
    c.close()

c = consumer()
produce(c)

使用合适的python解释器

CPython:是用C语言实现Pyhon,是目前应用最广泛的解释器。最新的语言特性都是在这个上面先实现,基本包含了所有第三方库支持,但是CPython有几个缺陷,一是全局锁使Python在多线程效能上表现不佳,二是CPython无法支持JIT(即时编译),导致其执行速度不及Java和Javascipt等语言。于是出现了Pypy。

Pypy:是用Python自身实现的解释器。针对CPython的缺点进行了各方面的改良,性能得到很大的提升。最重要的一点就是Pypy集成了JIT。但是,Pypy无法支持官方的C/Python API,导致无法使用例如Numpy,Scipy等重要的第三方库。这也是现在Pypy没有被广泛使用的原因吧。

Jython:Jython是将python code在JVM上面跑和调用java code的解释器。

一些建议

合理使用copy与deepcopy

使用 join 合并迭代器中的字符串

使用最佳的反序列化方式 json>cPickle>eval。

不借助中间变量交换两个变量的值(有循环引用造成内存泄露的风险)。

不局限于python内置函数,一些情况下,内置函数的性能,远远不如自己写的。比如python的strptime方法,会生成一个9位的时间元祖,经常需要根据此元祖计算时间戳,该方法性能很差。我们完全可以自己将时间字符串转成split成需要的时间元祖。

用生成器改写直接返回列表的复杂函数,用列表推导替代简单函数,但是列表推导不要超过两个表达式。生成器> 列表推导>map/filter。

关键代码可以依赖于高性能的扩展包,因此有时候需要牺牲一些可移植性换取性能; 勇于尝试python新版本。

考虑优化的成本,一般先从数据结构和算法上优化,改善时间/空间复杂度,比如使用集合、分治、贪心、动态规划等,最后再从架构和整体框架上考虑。

Python代码的优化也需要具体问题具体分析,不局限于以上方式,但只要能够分析出性能瓶颈,问题就解决了一半。《约束理论与企业优化》中指出:“除了瓶颈之外,任何改进都是幻觉”。

三、优化实例

优化循环

将无关代码提到循环上层
image.png
去掉冗余循环
image.png
平均耗时由2.0239s提升到0.7896s,性能提升了61%

多进程

采用多进程将无关主进程的函数放到后台执行:
image.png
将列表分片到多进程中执行:
image.png
如图,1s内返回的请求比例提升了十个百分点,性能提升200ms左右,但不建议代码中过多使用,在业务高峰时会对机器负载造成压力。
image.png

时间元祖代替strptime方法

image.png
image.png
如图,模块平均耗时由123ms提升到79ms,提升35.7%,并且对一些badcase优化效果会更明显:
image.pngimage.png

应用集合

将复杂字典转成md5的可hash的字符串后,通过集合去重,性能提升60%以上。数据量越大,优化效果越好。
image.png

微服务业务解耦

将特征计算作为分布式微服务,实现IO与计算解耦,将cpu密集型转为IO密集,在框架和服务选用方面,我们分别测试了tornado协程、uwsgi多进程、import代码库、celery分布式计算等多种方式,在性能及可用性上tornado都有一定优势,上层nginx来代理做端口转发和负载均衡:
ab压测前后性能对比,虽然在单条请求上并没有优势,但是对高并发系统来说,并发量明显提升:
ab压测前后性能对比,虽然在单条请求上并没有优势,但是对高并发系统来说,并发量明显提升:
image.png
image.png

耗时模块pipeline实时计算:

image.png

image.png
image.png

命中pipeline实时特征后的性能提升:
image.png

python的高性能服务框架

虽然python的语言特性导致它在cpu密集型的代码中性能堪忧,但是python却很适合IO密集型的网络应用,加上它优异的数据分析处理能力以及广泛的第三方支持,python在服务框架上也应用广泛。

例如Django、flask、Tornado,如果考虑性能优先,就要选择高性能的服务框架。Python的高性能服务基本都是协程和基于epoll的事件循环实现的IO多路复用框架。tornado依靠强大的ioloop事件循环和gen封装的协程,让我们可以用yield关键字同步式地写出异步代码。

在python3.5+中,python引入原生的异步网络库asyncio,提供了原生的事件循环get_event_loop来支持协程。并用async/await对协程做了更好的封装。在tornado6.0中,ioloop已经已经实现了对asyncio事件循环的封装。除了标准库asyncio的事件循环,社区使用Cython实现了另外一个事件循环uvloop。用来取代标准库。号称是性能最好的python异步IO库。之前提到python的高性能服务实现都是基于协程和事件循环,因此我们可以尝试不同的协程和事件循环组合,对tornado服务进行改造,实现最优的性能搭配
image.png
篇幅原因这里不详细展开,我们可以简单看下在python2和python3中异步服务框架的性能表现,发现在服务端的事件循环中,python3优势明显。而且在三方库的兼容,其他异步性能库的支持上,以及在协程循环及关键字支持等语法上,还是推荐使用python3,在更加复杂的项目中,新版的优势会显而易见。但不论新旧版本的python,协程+事件循环的效率都要比多进程或线程高的多。这里顺便贴一个python3支持协程的异步IO库,基本支持了常见的中间件:https://github.com/aio-libs?p...
image.png
image.png
image.png
image.png

查看原文

赞 0 收藏 0 评论 0

屈天航 关注了标签 · 9月7日

vue.js

Reactive Components for Modern Web Interfaces.

Vue.js 是一个用于创建 web 交互界面的。其特点是

  • 简洁 HTML 模板 + JSON 数据,再创建一个 Vue 实例,就这么简单。
  • 数据驱动 自动追踪依赖的模板表达式和计算属性。
  • 组件化 用解耦、可复用的组件来构造界面。
  • 轻量 ~24kb min+gzip,无依赖。
  • 快速 精确有效的异步批量 DOM 更新。
  • 模块友好 通过 NPM 或 Bower 安装,无缝融入你的工作流。

官网:https://vuejs.org
GitHub:https://github.com/vuejs/vue

关注 100941

认证与成就

  • 获得 28 次点赞
  • 获得 2 枚徽章 获得 0 枚金徽章, 获得 0 枚银徽章, 获得 2 枚铜徽章

擅长技能
编辑

开源项目 & 著作
编辑

(゚∀゚ )
暂时没有

注册于 9月7日
个人主页被 1.6k 人浏览