Xavier

Xavier 查看完整档案

北京编辑  |  填写毕业院校  |  填写所在公司/组织 segmentfault.com/u/lihanx 编辑
编辑

Clairvoyant, Telepathist, PythonCoder
透视者,心灵感应者,Python开发者

个人动态

Xavier 回答了问题 · 2月25日

Consul 按照教程搭建集群后,无法获取已注册的服务

刚刚找到一个解决方案,自问自答一下..

client.Agent().Services() 获取的内容不完整,看起来只能获取本地 Agent 上注册的服务。如果要获取整个集群完整同步的信息,需要使用 client.Catalog().Services(nil)

集群的配置也不知道有没有问题,参考了网上很多人的代码,全都是使用的 Agent,但是我用就是没办法获取其他节点的服务信息。后面还要继续了解一下 Agent 的 Gossip Protocol

另外就是查看文档之后才了解到 Catalog 这一层。 Catalog HTTP APIAgent HTTP API 这块儿还要再了解一下。

就算解决了一半吧,希望对其他遇到这个问题的人有帮助,也希望有大佬或者前辈能给出更准确的解答。

再来补充两个参考:
Consul difference between agent and catalog
Anti-Entropy

对不起,打扰了

关注 1 回答 1

Xavier 提出了问题 · 2月23日

Consul 按照教程搭建集群后,无法获取已注册的服务

题目描述

按照 Deployment Guide 搭建了3个 Server 节点的集群,然后添加了2个 Client 节点,在任意节点注册一个服务后,尝试从其他节点获取,获取不到,但是在 UI 界面可以看到. 集群日志没有发现异常信息,设置完全是按照教程进行的,至少没有查出来差异.. 提前感谢..

相关代码

服务代码

package main

import (
    "context"
    "fmt"
    "log"
    "net"
    "net/http"
    "os"
    "os/signal"
    "strings"
    "syscall"
    "time"

    "github.com/gin-gonic/gin"
    consulapi "github.com/hashicorp/consul/api"
)

const (
    serviceName string = "111"
    port        int    = 8003
)

var (
    consulEndpoint string = "0.0.0.0:8500"
)

/*
服务主体
Graceful shutdown
启动时服务注册
终止时解注册
*/
func main() {
    router := gin.Default()

    router.GET("/", func(c *gin.Context) {
        c.String(http.StatusOK, "ok")
    })

    srv := &http.Server{
        Addr:    fmt.Sprintf(":%d", port),
        Handler: router,
    }

    register()

    go func() {
        if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
            log.Fatalf("listen: %s\n", err)
        }
    }()

    quit := make(chan os.Signal)
    signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
    <-quit

    deregister()

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    if err := srv.Shutdown(ctx); err != nil {
        log.Fatal("Server Shutdown:", err)
    }
    select {
    case <-ctx.Done():
        log.Println("timeout of 5 seconds")
    }
    log.Println("Server exiting")
}

/*
获取本机IP地址
*/
func getLocalIP() (string, error) {
    conn, err := net.Dial("udp", "8.8.8.8:53")
    if err != nil {
        return "", err
    }
    addr := conn.LocalAddr().(*net.UDPAddr)
    host := strings.Split(addr.String(), ":")[0]
    return host, nil
}

/*
服务注册
*/
func register() {
    // init client
    config := consulapi.DefaultConfig()
    config.Address = consulEndpoint
    client, err := consulapi.NewClient(config)
    if err != nil {
        log.Fatalf("Service Register Failed: %s\n", err.Error())
    }
    host, err := getLocalIP()
    if err != nil {
        log.Fatalf("Service Register Failed: %s\n", err.Error())
    }
    // define registry
    reg := &consulapi.AgentServiceRegistration{
        ID:      serviceName,
        Name:    serviceName,
        Port:    port,
        Tags:    []string{"test"},
        Address: host,
        Check: &consulapi.AgentServiceCheck{
            TCP:                            fmt.Sprintf("%s:%d", host, port),
            Timeout:                        "5s",
            Interval:                       "10s",
            Status:                         "passing",
            DeregisterCriticalServiceAfter: "30s",
        },
    }
    err = client.Agent().ServiceRegister(reg)
    if err != nil {
        log.Fatal(err.Error())
    }
    log.Println("Service Registered")
}

/*
服务解注册
*/
func deregister() {
    config := consulapi.DefaultConfig()
    config.Address = consulEndpoint
    client, err := consulapi.NewClient(config)
    if err != nil {
        log.Fatal(err)
    }
    client.Agent().ServiceDeregister(serviceName)
    log.Println("Service Deregistered")
}

调用代码

只能在注册服务的节点查到服务,其他节点拿不到任何一个服务的信息

package main

import (
    "net"
    "fmt"
    "log"
    "strings"

    consulapi "github.com/hashicorp/consul/api"
)


const (
    serviceName string = "111"
)

var (
    consulEndpoint string
)


func main() {
    host, err := getLocalIP()
    if err != nil {
        log.Fatal(err)
    }

    consulEndpoint = fmt.Sprintf("%s:8500", host)
    config := consulapi.DefaultConfig()
    config.Address = consulEndpoint
    client, err := consulapi.NewClient(config)
    if err != nil {
        log.Fatal(err)
    }
    services, _ := client.Agent().Services()
    if len(services) == 0 {
        log.Fatal("No Services")
    }
    for k, v := range services {
        fmt.Println(k)
        fmt.Printf("%#v", v)
    }

    service, qm, err := client.Agent().Service(serviceName, nil)
    if err != nil {
        log.Fatal("Get Service Failed:", err)
    }
    fmt.Printf("%#v\n", service)
    fmt.Printf("%#v\n", qm)
}


func getLocalIP() (string, error) {
    conn, err := net.Dial("udp", "8.8.8.8:53")
    if err != nil {
        return "", err
    }
    addr := conn.LocalAddr().(*net.UDPAddr)
    host := strings.Split(addr.String(), ":")[0]
    return host, nil
}

关注 1 回答 1

Xavier 关注了标签 · 2月23日

consul

Consul提供了服务注册,服务发现,健康检查,多数据中心的集群软件

关注 29

Xavier 关注了标签 · 2020-05-21

golang

Go语言是谷歌2009发布的第二款开源编程语言。Go语言专门针对多处理器系统应用程序的编程进行了优化,使用Go编译的程序可以媲美C或C++代码的速度,而且更加安全、支持并行进程。
Go语言是谷歌推出的一种全新的编程语言,可以在不损失应用程序性能的情况下降低代码的复杂性。谷歌首席软件工程师罗布派克(Rob Pike)说:我们之所以开发Go,是因为过去10多年间软件开发的难度令人沮丧。Go是谷歌2009发布的第二款编程语言。

七牛云存储CEO许式伟出版《Go语言编程
go语言翻译项目 http://code.google.com/p/gola...
《go编程导读》 http://code.google.com/p/ac-m...
golang的官方文档 http://golang.org/doc/docs.html
golang windows上安装 http://code.google.com/p/gomi...

关注 26184

Xavier 赞了文章 · 2019-07-16

Redis与Lua及Redis-py应用Lua

基本命令

Redis 脚本使用 Lua 解释器来执行脚本。 Reids 2.6 版本通过内嵌支持 Lua 环境。执行脚本的常用命令为 EVAL。

EVAL script numkeys key [key ...] arg [arg ...]
EVAL "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}" 2 key1 key2 first second
1) "key1"
2) "key2"
3) "first"
4) "second"

1 EVAL script numkeys key [key ...] arg [arg ...] 执行 Lua 脚本。
2 EVALSHA sha1 numkeys key [key ...] arg [arg ...] 执行 Lua 脚本。
3 SCRIPT EXISTS script [script ...] 查看指定的脚本是否已经被保存在缓存当中。
4 SCRIPT FLUSH 从脚本缓存中移除所有脚本。
5 SCRIPT KILL 杀死当前正在运行的 Lua 脚本。
6 SCRIPT LOAD script 将脚本 script 添加到脚本缓存中,但并不立即执行这个脚本。

Redis Eval 命令使用 Lua 解释器执行脚本。

EVAL script numkeys key [key ...] arg [arg ...]
参数说明
script: 参数是一段 Lua 5.1 脚本程序。脚本不必(也不应该)定义为一个 Lua 函数。
numkeys: 用于指定键名参数的个数。
key [key ...]: 从 EVAL 的第三个参数开始算起,表示在脚本中所用到的那些 Redis 键(key),这些键名参数可以在 Lua 中通过全局变量 KEYS 数组,用 1 为基址的形式访问( KEYS[1] , KEYS[2] ,以此类推)。
arg [arg ...]: 附加参数,在 Lua 中通过全局变量 ARGV 数组访问,访问的形式和 KEYS 变量类似( ARGV[1] 、 ARGV[2] ,诸如此类)。

Redis Evalsha 命令根据给定的 sha1 校验码,执行缓存在服务器中的脚本。
EVALSHA sha1 numkeys key [key ...] arg [arg ...]

redis 127.0.0.1:6379> SCRIPT LOAD "return 'hello moto'"
"232fd51614574cf0867b83d384a5e898cfd24e5a"
 
redis 127.0.0.1:6379> EVALSHA "232fd51614574cf0867b83d384a5e898cfd24e5a" 0
"hello moto"

Redis Script Exists 命令用于校验指定的脚本是否已经被保存在缓存当中。
SCRIPT EXISTS script [script ...]

redis 127.0.0.1:6379> SCRIPT LOAD "return 'hello moto'"    # 载入一个脚本
"232fd51614574cf0867b83d384a5e898cfd24e5a"
 
redis 127.0.0.1:6379> SCRIPT EXISTS 232fd51614574cf0867b83d384a5e898cfd24e5a
1) (integer) 1
 
redis 127.0.0.1:6379> SCRIPT FLUSH     # 清空缓存
OK
 
redis 127.0.0.1:6379> SCRIPT EXISTS 232fd51614574cf0867b83d384a5e898cfd24e5a
1) (integer) 0

SCRIPT FLUSH 从脚本缓存中移除所有脚本。
SCRIPT KILL 杀死当前正在运行的 Lua 脚本。
SCRIPT LOAD script 将脚本 script 添加到脚本缓存中,但并不立即执行这个脚本。

详细说明

这是从一个Lua脚本中使用两个不同的Lua函数来调用Redis的命令的例子:

redis.call()
redis.pcall()

redis.call() 与 redis.pcall()很类似, 他们唯一的区别是当redis命令执行结果返回错误时, redis.call()将返回给调用者一个错误,而redis.pcall()会将捕获的错误以Lua表的形式返回
redis.call() 和 redis.pcall() 两个函数的参数可以是任意的 Redis 命令:

> eval "return redis.call('set','foo','bar')" 0
OK

需要注意的是,上面这段脚本的确实现了将键 foo 的值设为 bar 的目的,但是,它违反了 EVAL 命令的语义,因为脚本里使用的所有键都应该由 KEYS 数组来传递,就像这样:

> eval "return redis.call('set',KEYS[1],'bar')" 1 foo
OK

要求使用正确的形式来传递键(key)是有原因的,**因为不仅仅是 EVAL 这个命令,所有的 Redis 命令,在执行之前都会被分析,籍此来确定命令会对哪些键进行操作。
因此,对于 EVAL 命令来说,必须使用正确的形式来传递键,才能确保分析工作正确地执行。 **

Lua 数据类型和 Redis 数据类型之间转换

当 Lua 通过 call() 或 pcall() 函数执行 Redis 命令的时候,命令的返回值会被转换成 Lua 数据结构。 同样地,当 Lua 脚本在 Redis 内置的解释器里运行时,Lua 脚本的返回值也会被转换成 Redis 协议(protocol),然后由 EVAL 将值返回给客户端。
下面两点需要重点注意:
lua中整数和浮点数之间没有什么区别。因此,我们始终将Lua的数字转换成整数的回复,这样将舍去小数部分。如果你想从Lua返回一个浮点数,你应该将它作为一个字符串
有两个辅助函数从Lua返回Redis的类型。

  • redis.error_reply(error_string) returns an error reply. This function simply returns the single field table with the err field set to the specified string for you.

  • redis.status_reply(status_string) returns a status reply. This function simply returns the single field table with the ok field set to the specified string for you.

return {err="My Error"}
return redis.error_reply("My Error")

脚本的原子性

Redis 使用单个 Lua 解释器去运行所有脚本,并且, Redis 也保证脚本会以原子性(atomic)的方式执行: 当某个脚本正在运行的时候,不会有其他脚本或 Redis 命令被执行。 这和使用 MULTI / EXEC 包围的事务很类似。 在其他别的客户端看来,脚本的效果(effect)要么是不可见的(not visible),要么就是已完成的(already completed)。

脚本缓存和 EVALSHA

EVAL 命令要求你在每次执行脚本的时候都发送一次脚本主体(script body)。Redis 有一个内部的脚本缓存机制,因此它不会每次都重新编译脚本
EVALSHA 命令,它的作用和 EVAL 一样,都用于对脚本求值,但它接受的第一个参数不是脚本,而是脚本的 SHA1 校验和(sum)。
客户端库的底层实现可以一直乐观地使用 EVALSHA 来代替 EVAL ,并期望着要使用的脚本已经保存在服务器上了,只有当 NOSCRIPT 错误发生时,才使用 EVAL 命令重新发送脚本,这样就可以最大限度地节省带宽。
刷新脚本缓存的唯一办法是显式地调用 SCRIPT FLUSH 命令,这个命令会清空运行过的所有脚本的缓存。通常只有在云计算环境中,才会执行这个命令。

Redis对lua脚本做出的限制

  • 不能访问系统时间或者其他内部状态

  • Redis 会返回一个错误,阻止这样的脚本运行: 这些脚本在执行随机命令之后(比如 RANDOMKEY 、 SRANDMEMBER 或 TIME 等),还会执行可以修改数据集的 Redis 命令。如果脚本只是执行只读操作,那么就没有这一限制。

  • 每当从 Lua 脚本中调用那些返回无序元素的命令时,执行命令所得的数据在返回给 Lua 之前会先执行一个静默(slient)的字典序排序(lexicographical sorting)。举个例子,因为 Redis 的 Set 保存的是无序的元素,所以在 Redis 命令行客户端中直接执行 SMEMBERS ,返回的元素是无序的,但是,假如在脚本中执行 redis.call(“smembers”, KEYS[1]) ,那么返回的总是排过序的元素。

  • 对 Lua 的伪随机数生成函数 math.random 和 math.randomseed 进行修改,使得每次在运行新脚本的时候,总是拥有同样的 seed 值。这意味着,每次运行脚本时,只要不使用 math.randomseed ,那么 math.random 产生的随机数序列总是相同的。

  • 全局变量保护,为了防止不必要的数据泄漏进 Lua 环境, Redis 脚本不允许创建全局变量。如果一个脚本需要在多次执行之间维持某种状态,它应该使用 Redis key 来进行状态保存。避免引入全局变量的一个诀窍是:将脚本中用到的所有变量都使用 local 关键字定义为局部变量。

可用库

Redis Lua解释器可用加载以下Lua库:
base
table
string
math
debug
struct 一个Lua装箱/拆箱的库
cjson 为Lua提供极快的JSON处理
cmsgpack为Lua提供了简单、快速的MessagePack操纵
bitop 为Lua的位运算模块增加了按位操作数。
redis.sha1hex function. 对字符串执行SHA1算法
每一个Redis实例都拥有以上的所有类库,以确保您使用脚本的环境都是一样的。
struct, CJSON 和 cmsgpack 都是外部库, 所有其他库都是标准。

redis 127.0.0.1:6379> eval 'return cjson.encode({["foo"]= "bar"})' 0
"{\"foo\":\"bar\"}"
redis 127.0.0.1:6379> eval 'return cjson.decode(ARGV[1])["foo"]' 0 "{\"foo\":\"bar\"}"
"bar"

127.0.0.1:6379> eval 'return cmsgpack.pack({"foo", "bar", "baz"})' 0
"\x93\xa3foo\xa3bar\xa3baz"
127.0.0.1:6379> eval 'return cmsgpack.unpack(ARGV[1])' 0 "\x93\xa3foo\xa3bar\xa3baz"
1) "foo"
2) "bar"
3) "baz"

使用脚本记录Redis 日志

在 Lua 脚本中,可以通过调用 redis.log 函数来写 Redis 日志(log):

redis.log(loglevel,message)

其中, message 参数是一个字符串,而 loglevel 参数可以是以下任意一个值:

  • redis.LOG_DEBUG

  • redis.LOG_VERBOSE

  • redis.LOG_NOTICE

  • redis.LOG_WARNING
    上面的这些等级(level)和标准 Redis 日志的等级相对应。

只有那些和当前 Redis 实例所设置的日志等级相同或更高级的日志才会被散发。
以下是一个日志示例:

redis.log(redis.LOG_WARNING, "Something is wrong with this script.")
执行上面的函数会产生这样的信息:
[32343] 22 Mar 15:21:39 # Something is wrong with this script.

沙箱(sandbox)和最大执行时间

脚本应该仅仅用于传递参数和对 Redis 数据进行处理,它不应该尝试去访问外部系统(比如文件系统),或者执行任何系统调用。
除此之外,脚本还有一个最大执行时间限制,它的默认值是 5 秒钟,一般正常运作的脚本通常可以在几分之几毫秒之内完成,花不了那么多时间,这个限制主要是为了防止因编程错误而造成的无限循环而设置的。
最大执行时间的长短由 lua-time-limit 选项来控制(以毫秒为单位),可以通过编辑 redis.conf 文件或者使用 CONFIG GET 和 CONFIG SET 命令来修改它。

当一个脚本达到最大执行时间的时候,它并不会自动被 Redis 结束,因为 Redis 必须保证脚本执行的原子性,而中途停止脚本的运行意味着可能会留下未处理完的数据在数据集(data set)里面。
因此,当脚本运行的时间超过最大执行时间后,以下动作会被执行:
Redis 记录一个脚本正在超时运行
Redis 开始重新接受其他客户端的命令请求,但是只有 SCRIPT KILL 和 SHUTDOWN NOSAVE 两个命令会被处理,对于其他命令请求, Redis 服务器只是简单地返回 BUSY 错误。
可以使用 SCRIPT KILL 命令将一个仅执行只读命令的脚本杀死,因为只读命令并不修改数据,因此杀死这个脚本并不破坏数据的完整性
如果脚本已经执行过写命令,那么唯一允许执行的操作就是 SHUTDOWN NOSAVE ,它通过停止服务器来阻止当前数据集写入磁盘

pipeline上下文(context)中的 EVALSHA

一旦在pipeline中因为 EVALSHA 命令而发生 NOSCRIPT 错误,那么这个pipeline就再也没有办法重新执行了,否则的话,命令的执行顺序就会被打乱。
为了防止出现以上所说的问题,客户端库实现应该实施以下的其中一项措施:

  • 总是在pipeline中使用 EVAL 命令

  • 检查pipeline中要用到的所有命令,找到其中的 EVAL 命令,并使用 SCRIPT EXISTS 命令检查要用到的脚本是不是全都已经保存在缓存里面了。如果所需的全部脚本都可以在缓存里找到,那么就可以放心地将所有 EVAL 命令改成 EVALSHA 命令,否则的话,就要在pipeline的顶端(top)将缺少的脚本用 SCRIPT LOAD 命令加上去。

案例1-实现访问频率限制:

实现访问者 $ip 在一定的时间 $time 内只能访问 $limit 次.
非脚本实现

private boolean accessLimit(String ip, int limit, int time, Jedis jedis) {
    boolean result = true;

    String key = "rate.limit:" + ip;
    if (jedis.exists(key)) {
        long afterValue = jedis.incr(key);
        if (afterValue > limit) {
            result = false;
        }
    } else {
        Transaction transaction = jedis.multi();
        transaction.incr(key);
        transaction.expire(key, time);
        transaction.exec();
    }
    return result;
}

以上代码有两点缺陷

  • 可能会出现竞态条件: 解决方法是用 WATCH 监控 rate.limit:$IP 的变动, 但较为麻烦;

  • 以上代码在不使用 pipeline 的情况下最多需要向Redis请求5条指令, 传输过多.

Lua脚本实现
Redis 允许将 Lua 脚本传到 Redis 服务器中执行, 脚本内可以调用大部分 Redis 命令, 且 Redis 保证脚本的 原子性 :
首先需要准备Lua代码: script.lua

local key = "rate.limit:" .. KEYS[1]
local limit = tonumber(ARGV[1])
local expire_time = ARGV[2]

local is_exists = redis.call("EXISTS", key)
if is_exists == 1 then
    if redis.call("INCR", key) > limit then
        return 0
    else
        return 1
    end
else
    redis.call("SET", key, 1)
    redis.call("EXPIRE", key, expire_time)
    return 1
end

Java

private boolean accessLimit(String ip, int limit, int timeout, Jedis connection) throws IOException {
    List<String> keys = Collections.singletonList(ip);
    List<String> argv = Arrays.asList(String.valueOf(limit), String.valueOf(timeout));

    return 1 == (long) connection.eval(loadScriptString("script.lua"), keys, argv);
}

// 加载Lua代码
private String loadScriptString(String fileName) throws IOException {
    Reader reader = new InputStreamReader(Client.class.getClassLoader().getResourceAsStream(fileName));
    return CharStreams.toString(reader);
}

Lua 嵌入 Redis 优势:

  • 减少网络开销: 不使用 Lua 的代码需要向 Redis 发送多次请求, 而脚本只需一次即可, 减少网络传输;

  • 原子操作: Redis 将整个脚本作为一个原子执行, 无需担心并发, 也就无需事务;

  • 复用: 脚本会永久保存 Redis 中, 其他客户端可继续使用.

案例2-使用Lua脚本重新构建带有过期时间的分布式锁.

案例来源: < Redis实战 > 第6、11章, 构建步骤:

  • 锁申请

  • 首先尝试加锁:

  • 成功则为锁设定过期时间; 返回;

  • 失败检测锁是否添加了过期时间;

  • wait.

  • 锁释放

  • 检查当前线程是否真的持有了该锁:

  • 持有: 则释放; 返回成功;

  • 失败: 返回失败.

非Lua实现

String acquireLockWithTimeOut(Jedis connection, String lockName, long acquireTimeOut, int lockTimeOut) {
    String identifier = UUID.randomUUID().toString();
    String key = "lock:" + lockName;

    long acquireTimeEnd = System.currentTimeMillis() + acquireTimeOut;
    while (System.currentTimeMillis() < acquireTimeEnd) {
        // 获取锁并设置过期时间
        if (connection.setnx(key, identifier) != 0) {
            connection.expire(key, lockTimeOut);
            return identifier;
        }
        // 检查过期时间, 并在必要时对其更新
        else if (connection.ttl(key) == -1) {
            connection.expire(key, lockTimeOut);
        }

        try {
            Thread.sleep(10);
        } catch (InterruptedException ignored) {
        }
    }
    return null;
}

boolean releaseLock(Jedis connection, String lockName, String identifier) {
    String key = "lock:" + lockName;

    connection.watch(key);
    // 确保当前线程还持有锁
    if (identifier.equals(connection.get(key))) {
        Transaction transaction = connection.multi();
        transaction.del(key);
        return transaction.exec().isEmpty();
    }
    connection.unwatch();

    return false;
}

Lua脚本实现
Lua脚本: acquire

local key = KEYS[1]
local identifier = ARGV[1]
local lockTimeOut = ARGV[2]

-- 锁定成功
if redis.call("SETNX", key, identifier) == 1 then
    redis.call("EXPIRE", key, lockTimeOut)
    return 1
elseif redis.call("TTL", key) == -1 then
    redis.call("EXPIRE", key, lockTimeOut)
end
return 0

Lua脚本: release

local key = KEYS[1]
local identifier = ARGV[1]

if redis.call("GET", key) == identifier then
    redis.call("DEL", key)
    return 1
end
return 0

参考:http://www.redis.cn/commands/...
http://www.redis.net.cn/tutor...
http://www.oschina.net/transl...
http://www.tuicool.com/articl...

查看原文

赞 3 收藏 4 评论 0

Xavier 赞了文章 · 2019-06-26

lxml 解析巨大深嵌套DOM树的问题

今天客户反映,我们的微信爬虫,有一篇文章的信息不全:问题链接

仔细观察之后,我们发现,这篇文章是由135微信编辑器生成的,正文内容的DOM树非常深,有几百层。

使用 lxml.etree.HTML(text).xp(xpath)进行解析的时候,如果DOM树过深,就解析会提前中止。

在build etree时,调用的是lxml.etree.XMLParser 类,而XMLParser接收 huge_tree=True的参数,允许解析巨大DOM树,而etree.HTML又接收自定义Parser,所以上述代码修改为:

lxml.etree.HTML(text, lxml.etree.XMLParser(huge_tree=True)).xp(xpath)之后,就可以顺利解析了。

查看原文

赞 1 收藏 0 评论 1

Xavier 关注了用户 · 2019-06-26

PETCoder亚洲善待程序猿组织 @secondsun

Sorry, but I have to leave.-Weibo

关注 21

Xavier 收藏了文章 · 2019-03-15

Redis与Lua及Redis-py应用Lua

基本命令

Redis 脚本使用 Lua 解释器来执行脚本。 Reids 2.6 版本通过内嵌支持 Lua 环境。执行脚本的常用命令为 EVAL。

EVAL script numkeys key [key ...] arg [arg ...]
EVAL "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}" 2 key1 key2 first second
1) "key1"
2) "key2"
3) "first"
4) "second"

1 EVAL script numkeys key [key ...] arg [arg ...] 执行 Lua 脚本。
2 EVALSHA sha1 numkeys key [key ...] arg [arg ...] 执行 Lua 脚本。
3 SCRIPT EXISTS script [script ...] 查看指定的脚本是否已经被保存在缓存当中。
4 SCRIPT FLUSH 从脚本缓存中移除所有脚本。
5 SCRIPT KILL 杀死当前正在运行的 Lua 脚本。
6 SCRIPT LOAD script 将脚本 script 添加到脚本缓存中,但并不立即执行这个脚本。

Redis Eval 命令使用 Lua 解释器执行脚本。

EVAL script numkeys key [key ...] arg [arg ...]
参数说明
script: 参数是一段 Lua 5.1 脚本程序。脚本不必(也不应该)定义为一个 Lua 函数。
numkeys: 用于指定键名参数的个数。
key [key ...]: 从 EVAL 的第三个参数开始算起,表示在脚本中所用到的那些 Redis 键(key),这些键名参数可以在 Lua 中通过全局变量 KEYS 数组,用 1 为基址的形式访问( KEYS[1] , KEYS[2] ,以此类推)。
arg [arg ...]: 附加参数,在 Lua 中通过全局变量 ARGV 数组访问,访问的形式和 KEYS 变量类似( ARGV[1] 、 ARGV[2] ,诸如此类)。

Redis Evalsha 命令根据给定的 sha1 校验码,执行缓存在服务器中的脚本。
EVALSHA sha1 numkeys key [key ...] arg [arg ...]

redis 127.0.0.1:6379> SCRIPT LOAD "return 'hello moto'"
"232fd51614574cf0867b83d384a5e898cfd24e5a"
 
redis 127.0.0.1:6379> EVALSHA "232fd51614574cf0867b83d384a5e898cfd24e5a" 0
"hello moto"

Redis Script Exists 命令用于校验指定的脚本是否已经被保存在缓存当中。
SCRIPT EXISTS script [script ...]

redis 127.0.0.1:6379> SCRIPT LOAD "return 'hello moto'"    # 载入一个脚本
"232fd51614574cf0867b83d384a5e898cfd24e5a"
 
redis 127.0.0.1:6379> SCRIPT EXISTS 232fd51614574cf0867b83d384a5e898cfd24e5a
1) (integer) 1
 
redis 127.0.0.1:6379> SCRIPT FLUSH     # 清空缓存
OK
 
redis 127.0.0.1:6379> SCRIPT EXISTS 232fd51614574cf0867b83d384a5e898cfd24e5a
1) (integer) 0

SCRIPT FLUSH 从脚本缓存中移除所有脚本。
SCRIPT KILL 杀死当前正在运行的 Lua 脚本。
SCRIPT LOAD script 将脚本 script 添加到脚本缓存中,但并不立即执行这个脚本。

详细说明

这是从一个Lua脚本中使用两个不同的Lua函数来调用Redis的命令的例子:

redis.call()
redis.pcall()

redis.call() 与 redis.pcall()很类似, 他们唯一的区别是当redis命令执行结果返回错误时, redis.call()将返回给调用者一个错误,而redis.pcall()会将捕获的错误以Lua表的形式返回
redis.call() 和 redis.pcall() 两个函数的参数可以是任意的 Redis 命令:

> eval "return redis.call('set','foo','bar')" 0
OK

需要注意的是,上面这段脚本的确实现了将键 foo 的值设为 bar 的目的,但是,它违反了 EVAL 命令的语义,因为脚本里使用的所有键都应该由 KEYS 数组来传递,就像这样:

> eval "return redis.call('set',KEYS[1],'bar')" 1 foo
OK

要求使用正确的形式来传递键(key)是有原因的,**因为不仅仅是 EVAL 这个命令,所有的 Redis 命令,在执行之前都会被分析,籍此来确定命令会对哪些键进行操作。
因此,对于 EVAL 命令来说,必须使用正确的形式来传递键,才能确保分析工作正确地执行。 **

Lua 数据类型和 Redis 数据类型之间转换

当 Lua 通过 call() 或 pcall() 函数执行 Redis 命令的时候,命令的返回值会被转换成 Lua 数据结构。 同样地,当 Lua 脚本在 Redis 内置的解释器里运行时,Lua 脚本的返回值也会被转换成 Redis 协议(protocol),然后由 EVAL 将值返回给客户端。
下面两点需要重点注意:
lua中整数和浮点数之间没有什么区别。因此,我们始终将Lua的数字转换成整数的回复,这样将舍去小数部分。如果你想从Lua返回一个浮点数,你应该将它作为一个字符串
有两个辅助函数从Lua返回Redis的类型。

  • redis.error_reply(error_string) returns an error reply. This function simply returns the single field table with the err field set to the specified string for you.

  • redis.status_reply(status_string) returns a status reply. This function simply returns the single field table with the ok field set to the specified string for you.

return {err="My Error"}
return redis.error_reply("My Error")

脚本的原子性

Redis 使用单个 Lua 解释器去运行所有脚本,并且, Redis 也保证脚本会以原子性(atomic)的方式执行: 当某个脚本正在运行的时候,不会有其他脚本或 Redis 命令被执行。 这和使用 MULTI / EXEC 包围的事务很类似。 在其他别的客户端看来,脚本的效果(effect)要么是不可见的(not visible),要么就是已完成的(already completed)。

脚本缓存和 EVALSHA

EVAL 命令要求你在每次执行脚本的时候都发送一次脚本主体(script body)。Redis 有一个内部的脚本缓存机制,因此它不会每次都重新编译脚本
EVALSHA 命令,它的作用和 EVAL 一样,都用于对脚本求值,但它接受的第一个参数不是脚本,而是脚本的 SHA1 校验和(sum)。
客户端库的底层实现可以一直乐观地使用 EVALSHA 来代替 EVAL ,并期望着要使用的脚本已经保存在服务器上了,只有当 NOSCRIPT 错误发生时,才使用 EVAL 命令重新发送脚本,这样就可以最大限度地节省带宽。
刷新脚本缓存的唯一办法是显式地调用 SCRIPT FLUSH 命令,这个命令会清空运行过的所有脚本的缓存。通常只有在云计算环境中,才会执行这个命令。

Redis对lua脚本做出的限制

  • 不能访问系统时间或者其他内部状态

  • Redis 会返回一个错误,阻止这样的脚本运行: 这些脚本在执行随机命令之后(比如 RANDOMKEY 、 SRANDMEMBER 或 TIME 等),还会执行可以修改数据集的 Redis 命令。如果脚本只是执行只读操作,那么就没有这一限制。

  • 每当从 Lua 脚本中调用那些返回无序元素的命令时,执行命令所得的数据在返回给 Lua 之前会先执行一个静默(slient)的字典序排序(lexicographical sorting)。举个例子,因为 Redis 的 Set 保存的是无序的元素,所以在 Redis 命令行客户端中直接执行 SMEMBERS ,返回的元素是无序的,但是,假如在脚本中执行 redis.call(“smembers”, KEYS[1]) ,那么返回的总是排过序的元素。

  • 对 Lua 的伪随机数生成函数 math.random 和 math.randomseed 进行修改,使得每次在运行新脚本的时候,总是拥有同样的 seed 值。这意味着,每次运行脚本时,只要不使用 math.randomseed ,那么 math.random 产生的随机数序列总是相同的。

  • 全局变量保护,为了防止不必要的数据泄漏进 Lua 环境, Redis 脚本不允许创建全局变量。如果一个脚本需要在多次执行之间维持某种状态,它应该使用 Redis key 来进行状态保存。避免引入全局变量的一个诀窍是:将脚本中用到的所有变量都使用 local 关键字定义为局部变量。

可用库

Redis Lua解释器可用加载以下Lua库:
base
table
string
math
debug
struct 一个Lua装箱/拆箱的库
cjson 为Lua提供极快的JSON处理
cmsgpack为Lua提供了简单、快速的MessagePack操纵
bitop 为Lua的位运算模块增加了按位操作数。
redis.sha1hex function. 对字符串执行SHA1算法
每一个Redis实例都拥有以上的所有类库,以确保您使用脚本的环境都是一样的。
struct, CJSON 和 cmsgpack 都是外部库, 所有其他库都是标准。

redis 127.0.0.1:6379> eval 'return cjson.encode({["foo"]= "bar"})' 0
"{\"foo\":\"bar\"}"
redis 127.0.0.1:6379> eval 'return cjson.decode(ARGV[1])["foo"]' 0 "{\"foo\":\"bar\"}"
"bar"

127.0.0.1:6379> eval 'return cmsgpack.pack({"foo", "bar", "baz"})' 0
"\x93\xa3foo\xa3bar\xa3baz"
127.0.0.1:6379> eval 'return cmsgpack.unpack(ARGV[1])' 0 "\x93\xa3foo\xa3bar\xa3baz"
1) "foo"
2) "bar"
3) "baz"

使用脚本记录Redis 日志

在 Lua 脚本中,可以通过调用 redis.log 函数来写 Redis 日志(log):

redis.log(loglevel,message)

其中, message 参数是一个字符串,而 loglevel 参数可以是以下任意一个值:

  • redis.LOG_DEBUG

  • redis.LOG_VERBOSE

  • redis.LOG_NOTICE

  • redis.LOG_WARNING
    上面的这些等级(level)和标准 Redis 日志的等级相对应。

只有那些和当前 Redis 实例所设置的日志等级相同或更高级的日志才会被散发。
以下是一个日志示例:

redis.log(redis.LOG_WARNING, "Something is wrong with this script.")
执行上面的函数会产生这样的信息:
[32343] 22 Mar 15:21:39 # Something is wrong with this script.

沙箱(sandbox)和最大执行时间

脚本应该仅仅用于传递参数和对 Redis 数据进行处理,它不应该尝试去访问外部系统(比如文件系统),或者执行任何系统调用。
除此之外,脚本还有一个最大执行时间限制,它的默认值是 5 秒钟,一般正常运作的脚本通常可以在几分之几毫秒之内完成,花不了那么多时间,这个限制主要是为了防止因编程错误而造成的无限循环而设置的。
最大执行时间的长短由 lua-time-limit 选项来控制(以毫秒为单位),可以通过编辑 redis.conf 文件或者使用 CONFIG GET 和 CONFIG SET 命令来修改它。

当一个脚本达到最大执行时间的时候,它并不会自动被 Redis 结束,因为 Redis 必须保证脚本执行的原子性,而中途停止脚本的运行意味着可能会留下未处理完的数据在数据集(data set)里面。
因此,当脚本运行的时间超过最大执行时间后,以下动作会被执行:
Redis 记录一个脚本正在超时运行
Redis 开始重新接受其他客户端的命令请求,但是只有 SCRIPT KILL 和 SHUTDOWN NOSAVE 两个命令会被处理,对于其他命令请求, Redis 服务器只是简单地返回 BUSY 错误。
可以使用 SCRIPT KILL 命令将一个仅执行只读命令的脚本杀死,因为只读命令并不修改数据,因此杀死这个脚本并不破坏数据的完整性
如果脚本已经执行过写命令,那么唯一允许执行的操作就是 SHUTDOWN NOSAVE ,它通过停止服务器来阻止当前数据集写入磁盘

pipeline上下文(context)中的 EVALSHA

一旦在pipeline中因为 EVALSHA 命令而发生 NOSCRIPT 错误,那么这个pipeline就再也没有办法重新执行了,否则的话,命令的执行顺序就会被打乱。
为了防止出现以上所说的问题,客户端库实现应该实施以下的其中一项措施:

  • 总是在pipeline中使用 EVAL 命令

  • 检查pipeline中要用到的所有命令,找到其中的 EVAL 命令,并使用 SCRIPT EXISTS 命令检查要用到的脚本是不是全都已经保存在缓存里面了。如果所需的全部脚本都可以在缓存里找到,那么就可以放心地将所有 EVAL 命令改成 EVALSHA 命令,否则的话,就要在pipeline的顶端(top)将缺少的脚本用 SCRIPT LOAD 命令加上去。

案例1-实现访问频率限制:

实现访问者 $ip 在一定的时间 $time 内只能访问 $limit 次.
非脚本实现

private boolean accessLimit(String ip, int limit, int time, Jedis jedis) {
    boolean result = true;

    String key = "rate.limit:" + ip;
    if (jedis.exists(key)) {
        long afterValue = jedis.incr(key);
        if (afterValue > limit) {
            result = false;
        }
    } else {
        Transaction transaction = jedis.multi();
        transaction.incr(key);
        transaction.expire(key, time);
        transaction.exec();
    }
    return result;
}

以上代码有两点缺陷

  • 可能会出现竞态条件: 解决方法是用 WATCH 监控 rate.limit:$IP 的变动, 但较为麻烦;

  • 以上代码在不使用 pipeline 的情况下最多需要向Redis请求5条指令, 传输过多.

Lua脚本实现
Redis 允许将 Lua 脚本传到 Redis 服务器中执行, 脚本内可以调用大部分 Redis 命令, 且 Redis 保证脚本的 原子性 :
首先需要准备Lua代码: script.lua

local key = "rate.limit:" .. KEYS[1]
local limit = tonumber(ARGV[1])
local expire_time = ARGV[2]

local is_exists = redis.call("EXISTS", key)
if is_exists == 1 then
    if redis.call("INCR", key) > limit then
        return 0
    else
        return 1
    end
else
    redis.call("SET", key, 1)
    redis.call("EXPIRE", key, expire_time)
    return 1
end

Java

private boolean accessLimit(String ip, int limit, int timeout, Jedis connection) throws IOException {
    List<String> keys = Collections.singletonList(ip);
    List<String> argv = Arrays.asList(String.valueOf(limit), String.valueOf(timeout));

    return 1 == (long) connection.eval(loadScriptString("script.lua"), keys, argv);
}

// 加载Lua代码
private String loadScriptString(String fileName) throws IOException {
    Reader reader = new InputStreamReader(Client.class.getClassLoader().getResourceAsStream(fileName));
    return CharStreams.toString(reader);
}

Lua 嵌入 Redis 优势:

  • 减少网络开销: 不使用 Lua 的代码需要向 Redis 发送多次请求, 而脚本只需一次即可, 减少网络传输;

  • 原子操作: Redis 将整个脚本作为一个原子执行, 无需担心并发, 也就无需事务;

  • 复用: 脚本会永久保存 Redis 中, 其他客户端可继续使用.

案例2-使用Lua脚本重新构建带有过期时间的分布式锁.

案例来源: < Redis实战 > 第6、11章, 构建步骤:

  • 锁申请

  • 首先尝试加锁:

  • 成功则为锁设定过期时间; 返回;

  • 失败检测锁是否添加了过期时间;

  • wait.

  • 锁释放

  • 检查当前线程是否真的持有了该锁:

  • 持有: 则释放; 返回成功;

  • 失败: 返回失败.

非Lua实现

String acquireLockWithTimeOut(Jedis connection, String lockName, long acquireTimeOut, int lockTimeOut) {
    String identifier = UUID.randomUUID().toString();
    String key = "lock:" + lockName;

    long acquireTimeEnd = System.currentTimeMillis() + acquireTimeOut;
    while (System.currentTimeMillis() < acquireTimeEnd) {
        // 获取锁并设置过期时间
        if (connection.setnx(key, identifier) != 0) {
            connection.expire(key, lockTimeOut);
            return identifier;
        }
        // 检查过期时间, 并在必要时对其更新
        else if (connection.ttl(key) == -1) {
            connection.expire(key, lockTimeOut);
        }

        try {
            Thread.sleep(10);
        } catch (InterruptedException ignored) {
        }
    }
    return null;
}

boolean releaseLock(Jedis connection, String lockName, String identifier) {
    String key = "lock:" + lockName;

    connection.watch(key);
    // 确保当前线程还持有锁
    if (identifier.equals(connection.get(key))) {
        Transaction transaction = connection.multi();
        transaction.del(key);
        return transaction.exec().isEmpty();
    }
    connection.unwatch();

    return false;
}

Lua脚本实现
Lua脚本: acquire

local key = KEYS[1]
local identifier = ARGV[1]
local lockTimeOut = ARGV[2]

-- 锁定成功
if redis.call("SETNX", key, identifier) == 1 then
    redis.call("EXPIRE", key, lockTimeOut)
    return 1
elseif redis.call("TTL", key) == -1 then
    redis.call("EXPIRE", key, lockTimeOut)
end
return 0

Lua脚本: release

local key = KEYS[1]
local identifier = ARGV[1]

if redis.call("GET", key) == identifier then
    redis.call("DEL", key)
    return 1
end
return 0

参考:http://www.redis.cn/commands/...
http://www.redis.net.cn/tutor...
http://www.oschina.net/transl...
http://www.tuicool.com/articl...

查看原文

Xavier 赞了文章 · 2019-03-15

Redis与Lua及Redis-py应用Lua

基本命令

Redis 脚本使用 Lua 解释器来执行脚本。 Reids 2.6 版本通过内嵌支持 Lua 环境。执行脚本的常用命令为 EVAL。

EVAL script numkeys key [key ...] arg [arg ...]
EVAL "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}" 2 key1 key2 first second
1) "key1"
2) "key2"
3) "first"
4) "second"

1 EVAL script numkeys key [key ...] arg [arg ...] 执行 Lua 脚本。
2 EVALSHA sha1 numkeys key [key ...] arg [arg ...] 执行 Lua 脚本。
3 SCRIPT EXISTS script [script ...] 查看指定的脚本是否已经被保存在缓存当中。
4 SCRIPT FLUSH 从脚本缓存中移除所有脚本。
5 SCRIPT KILL 杀死当前正在运行的 Lua 脚本。
6 SCRIPT LOAD script 将脚本 script 添加到脚本缓存中,但并不立即执行这个脚本。

Redis Eval 命令使用 Lua 解释器执行脚本。

EVAL script numkeys key [key ...] arg [arg ...]
参数说明
script: 参数是一段 Lua 5.1 脚本程序。脚本不必(也不应该)定义为一个 Lua 函数。
numkeys: 用于指定键名参数的个数。
key [key ...]: 从 EVAL 的第三个参数开始算起,表示在脚本中所用到的那些 Redis 键(key),这些键名参数可以在 Lua 中通过全局变量 KEYS 数组,用 1 为基址的形式访问( KEYS[1] , KEYS[2] ,以此类推)。
arg [arg ...]: 附加参数,在 Lua 中通过全局变量 ARGV 数组访问,访问的形式和 KEYS 变量类似( ARGV[1] 、 ARGV[2] ,诸如此类)。

Redis Evalsha 命令根据给定的 sha1 校验码,执行缓存在服务器中的脚本。
EVALSHA sha1 numkeys key [key ...] arg [arg ...]

redis 127.0.0.1:6379> SCRIPT LOAD "return 'hello moto'"
"232fd51614574cf0867b83d384a5e898cfd24e5a"
 
redis 127.0.0.1:6379> EVALSHA "232fd51614574cf0867b83d384a5e898cfd24e5a" 0
"hello moto"

Redis Script Exists 命令用于校验指定的脚本是否已经被保存在缓存当中。
SCRIPT EXISTS script [script ...]

redis 127.0.0.1:6379> SCRIPT LOAD "return 'hello moto'"    # 载入一个脚本
"232fd51614574cf0867b83d384a5e898cfd24e5a"
 
redis 127.0.0.1:6379> SCRIPT EXISTS 232fd51614574cf0867b83d384a5e898cfd24e5a
1) (integer) 1
 
redis 127.0.0.1:6379> SCRIPT FLUSH     # 清空缓存
OK
 
redis 127.0.0.1:6379> SCRIPT EXISTS 232fd51614574cf0867b83d384a5e898cfd24e5a
1) (integer) 0

SCRIPT FLUSH 从脚本缓存中移除所有脚本。
SCRIPT KILL 杀死当前正在运行的 Lua 脚本。
SCRIPT LOAD script 将脚本 script 添加到脚本缓存中,但并不立即执行这个脚本。

详细说明

这是从一个Lua脚本中使用两个不同的Lua函数来调用Redis的命令的例子:

redis.call()
redis.pcall()

redis.call() 与 redis.pcall()很类似, 他们唯一的区别是当redis命令执行结果返回错误时, redis.call()将返回给调用者一个错误,而redis.pcall()会将捕获的错误以Lua表的形式返回
redis.call() 和 redis.pcall() 两个函数的参数可以是任意的 Redis 命令:

> eval "return redis.call('set','foo','bar')" 0
OK

需要注意的是,上面这段脚本的确实现了将键 foo 的值设为 bar 的目的,但是,它违反了 EVAL 命令的语义,因为脚本里使用的所有键都应该由 KEYS 数组来传递,就像这样:

> eval "return redis.call('set',KEYS[1],'bar')" 1 foo
OK

要求使用正确的形式来传递键(key)是有原因的,**因为不仅仅是 EVAL 这个命令,所有的 Redis 命令,在执行之前都会被分析,籍此来确定命令会对哪些键进行操作。
因此,对于 EVAL 命令来说,必须使用正确的形式来传递键,才能确保分析工作正确地执行。 **

Lua 数据类型和 Redis 数据类型之间转换

当 Lua 通过 call() 或 pcall() 函数执行 Redis 命令的时候,命令的返回值会被转换成 Lua 数据结构。 同样地,当 Lua 脚本在 Redis 内置的解释器里运行时,Lua 脚本的返回值也会被转换成 Redis 协议(protocol),然后由 EVAL 将值返回给客户端。
下面两点需要重点注意:
lua中整数和浮点数之间没有什么区别。因此,我们始终将Lua的数字转换成整数的回复,这样将舍去小数部分。如果你想从Lua返回一个浮点数,你应该将它作为一个字符串
有两个辅助函数从Lua返回Redis的类型。

  • redis.error_reply(error_string) returns an error reply. This function simply returns the single field table with the err field set to the specified string for you.

  • redis.status_reply(status_string) returns a status reply. This function simply returns the single field table with the ok field set to the specified string for you.

return {err="My Error"}
return redis.error_reply("My Error")

脚本的原子性

Redis 使用单个 Lua 解释器去运行所有脚本,并且, Redis 也保证脚本会以原子性(atomic)的方式执行: 当某个脚本正在运行的时候,不会有其他脚本或 Redis 命令被执行。 这和使用 MULTI / EXEC 包围的事务很类似。 在其他别的客户端看来,脚本的效果(effect)要么是不可见的(not visible),要么就是已完成的(already completed)。

脚本缓存和 EVALSHA

EVAL 命令要求你在每次执行脚本的时候都发送一次脚本主体(script body)。Redis 有一个内部的脚本缓存机制,因此它不会每次都重新编译脚本
EVALSHA 命令,它的作用和 EVAL 一样,都用于对脚本求值,但它接受的第一个参数不是脚本,而是脚本的 SHA1 校验和(sum)。
客户端库的底层实现可以一直乐观地使用 EVALSHA 来代替 EVAL ,并期望着要使用的脚本已经保存在服务器上了,只有当 NOSCRIPT 错误发生时,才使用 EVAL 命令重新发送脚本,这样就可以最大限度地节省带宽。
刷新脚本缓存的唯一办法是显式地调用 SCRIPT FLUSH 命令,这个命令会清空运行过的所有脚本的缓存。通常只有在云计算环境中,才会执行这个命令。

Redis对lua脚本做出的限制

  • 不能访问系统时间或者其他内部状态

  • Redis 会返回一个错误,阻止这样的脚本运行: 这些脚本在执行随机命令之后(比如 RANDOMKEY 、 SRANDMEMBER 或 TIME 等),还会执行可以修改数据集的 Redis 命令。如果脚本只是执行只读操作,那么就没有这一限制。

  • 每当从 Lua 脚本中调用那些返回无序元素的命令时,执行命令所得的数据在返回给 Lua 之前会先执行一个静默(slient)的字典序排序(lexicographical sorting)。举个例子,因为 Redis 的 Set 保存的是无序的元素,所以在 Redis 命令行客户端中直接执行 SMEMBERS ,返回的元素是无序的,但是,假如在脚本中执行 redis.call(“smembers”, KEYS[1]) ,那么返回的总是排过序的元素。

  • 对 Lua 的伪随机数生成函数 math.random 和 math.randomseed 进行修改,使得每次在运行新脚本的时候,总是拥有同样的 seed 值。这意味着,每次运行脚本时,只要不使用 math.randomseed ,那么 math.random 产生的随机数序列总是相同的。

  • 全局变量保护,为了防止不必要的数据泄漏进 Lua 环境, Redis 脚本不允许创建全局变量。如果一个脚本需要在多次执行之间维持某种状态,它应该使用 Redis key 来进行状态保存。避免引入全局变量的一个诀窍是:将脚本中用到的所有变量都使用 local 关键字定义为局部变量。

可用库

Redis Lua解释器可用加载以下Lua库:
base
table
string
math
debug
struct 一个Lua装箱/拆箱的库
cjson 为Lua提供极快的JSON处理
cmsgpack为Lua提供了简单、快速的MessagePack操纵
bitop 为Lua的位运算模块增加了按位操作数。
redis.sha1hex function. 对字符串执行SHA1算法
每一个Redis实例都拥有以上的所有类库,以确保您使用脚本的环境都是一样的。
struct, CJSON 和 cmsgpack 都是外部库, 所有其他库都是标准。

redis 127.0.0.1:6379> eval 'return cjson.encode({["foo"]= "bar"})' 0
"{\"foo\":\"bar\"}"
redis 127.0.0.1:6379> eval 'return cjson.decode(ARGV[1])["foo"]' 0 "{\"foo\":\"bar\"}"
"bar"

127.0.0.1:6379> eval 'return cmsgpack.pack({"foo", "bar", "baz"})' 0
"\x93\xa3foo\xa3bar\xa3baz"
127.0.0.1:6379> eval 'return cmsgpack.unpack(ARGV[1])' 0 "\x93\xa3foo\xa3bar\xa3baz"
1) "foo"
2) "bar"
3) "baz"

使用脚本记录Redis 日志

在 Lua 脚本中,可以通过调用 redis.log 函数来写 Redis 日志(log):

redis.log(loglevel,message)

其中, message 参数是一个字符串,而 loglevel 参数可以是以下任意一个值:

  • redis.LOG_DEBUG

  • redis.LOG_VERBOSE

  • redis.LOG_NOTICE

  • redis.LOG_WARNING
    上面的这些等级(level)和标准 Redis 日志的等级相对应。

只有那些和当前 Redis 实例所设置的日志等级相同或更高级的日志才会被散发。
以下是一个日志示例:

redis.log(redis.LOG_WARNING, "Something is wrong with this script.")
执行上面的函数会产生这样的信息:
[32343] 22 Mar 15:21:39 # Something is wrong with this script.

沙箱(sandbox)和最大执行时间

脚本应该仅仅用于传递参数和对 Redis 数据进行处理,它不应该尝试去访问外部系统(比如文件系统),或者执行任何系统调用。
除此之外,脚本还有一个最大执行时间限制,它的默认值是 5 秒钟,一般正常运作的脚本通常可以在几分之几毫秒之内完成,花不了那么多时间,这个限制主要是为了防止因编程错误而造成的无限循环而设置的。
最大执行时间的长短由 lua-time-limit 选项来控制(以毫秒为单位),可以通过编辑 redis.conf 文件或者使用 CONFIG GET 和 CONFIG SET 命令来修改它。

当一个脚本达到最大执行时间的时候,它并不会自动被 Redis 结束,因为 Redis 必须保证脚本执行的原子性,而中途停止脚本的运行意味着可能会留下未处理完的数据在数据集(data set)里面。
因此,当脚本运行的时间超过最大执行时间后,以下动作会被执行:
Redis 记录一个脚本正在超时运行
Redis 开始重新接受其他客户端的命令请求,但是只有 SCRIPT KILL 和 SHUTDOWN NOSAVE 两个命令会被处理,对于其他命令请求, Redis 服务器只是简单地返回 BUSY 错误。
可以使用 SCRIPT KILL 命令将一个仅执行只读命令的脚本杀死,因为只读命令并不修改数据,因此杀死这个脚本并不破坏数据的完整性
如果脚本已经执行过写命令,那么唯一允许执行的操作就是 SHUTDOWN NOSAVE ,它通过停止服务器来阻止当前数据集写入磁盘

pipeline上下文(context)中的 EVALSHA

一旦在pipeline中因为 EVALSHA 命令而发生 NOSCRIPT 错误,那么这个pipeline就再也没有办法重新执行了,否则的话,命令的执行顺序就会被打乱。
为了防止出现以上所说的问题,客户端库实现应该实施以下的其中一项措施:

  • 总是在pipeline中使用 EVAL 命令

  • 检查pipeline中要用到的所有命令,找到其中的 EVAL 命令,并使用 SCRIPT EXISTS 命令检查要用到的脚本是不是全都已经保存在缓存里面了。如果所需的全部脚本都可以在缓存里找到,那么就可以放心地将所有 EVAL 命令改成 EVALSHA 命令,否则的话,就要在pipeline的顶端(top)将缺少的脚本用 SCRIPT LOAD 命令加上去。

案例1-实现访问频率限制:

实现访问者 $ip 在一定的时间 $time 内只能访问 $limit 次.
非脚本实现

private boolean accessLimit(String ip, int limit, int time, Jedis jedis) {
    boolean result = true;

    String key = "rate.limit:" + ip;
    if (jedis.exists(key)) {
        long afterValue = jedis.incr(key);
        if (afterValue > limit) {
            result = false;
        }
    } else {
        Transaction transaction = jedis.multi();
        transaction.incr(key);
        transaction.expire(key, time);
        transaction.exec();
    }
    return result;
}

以上代码有两点缺陷

  • 可能会出现竞态条件: 解决方法是用 WATCH 监控 rate.limit:$IP 的变动, 但较为麻烦;

  • 以上代码在不使用 pipeline 的情况下最多需要向Redis请求5条指令, 传输过多.

Lua脚本实现
Redis 允许将 Lua 脚本传到 Redis 服务器中执行, 脚本内可以调用大部分 Redis 命令, 且 Redis 保证脚本的 原子性 :
首先需要准备Lua代码: script.lua

local key = "rate.limit:" .. KEYS[1]
local limit = tonumber(ARGV[1])
local expire_time = ARGV[2]

local is_exists = redis.call("EXISTS", key)
if is_exists == 1 then
    if redis.call("INCR", key) > limit then
        return 0
    else
        return 1
    end
else
    redis.call("SET", key, 1)
    redis.call("EXPIRE", key, expire_time)
    return 1
end

Java

private boolean accessLimit(String ip, int limit, int timeout, Jedis connection) throws IOException {
    List<String> keys = Collections.singletonList(ip);
    List<String> argv = Arrays.asList(String.valueOf(limit), String.valueOf(timeout));

    return 1 == (long) connection.eval(loadScriptString("script.lua"), keys, argv);
}

// 加载Lua代码
private String loadScriptString(String fileName) throws IOException {
    Reader reader = new InputStreamReader(Client.class.getClassLoader().getResourceAsStream(fileName));
    return CharStreams.toString(reader);
}

Lua 嵌入 Redis 优势:

  • 减少网络开销: 不使用 Lua 的代码需要向 Redis 发送多次请求, 而脚本只需一次即可, 减少网络传输;

  • 原子操作: Redis 将整个脚本作为一个原子执行, 无需担心并发, 也就无需事务;

  • 复用: 脚本会永久保存 Redis 中, 其他客户端可继续使用.

案例2-使用Lua脚本重新构建带有过期时间的分布式锁.

案例来源: < Redis实战 > 第6、11章, 构建步骤:

  • 锁申请

  • 首先尝试加锁:

  • 成功则为锁设定过期时间; 返回;

  • 失败检测锁是否添加了过期时间;

  • wait.

  • 锁释放

  • 检查当前线程是否真的持有了该锁:

  • 持有: 则释放; 返回成功;

  • 失败: 返回失败.

非Lua实现

String acquireLockWithTimeOut(Jedis connection, String lockName, long acquireTimeOut, int lockTimeOut) {
    String identifier = UUID.randomUUID().toString();
    String key = "lock:" + lockName;

    long acquireTimeEnd = System.currentTimeMillis() + acquireTimeOut;
    while (System.currentTimeMillis() < acquireTimeEnd) {
        // 获取锁并设置过期时间
        if (connection.setnx(key, identifier) != 0) {
            connection.expire(key, lockTimeOut);
            return identifier;
        }
        // 检查过期时间, 并在必要时对其更新
        else if (connection.ttl(key) == -1) {
            connection.expire(key, lockTimeOut);
        }

        try {
            Thread.sleep(10);
        } catch (InterruptedException ignored) {
        }
    }
    return null;
}

boolean releaseLock(Jedis connection, String lockName, String identifier) {
    String key = "lock:" + lockName;

    connection.watch(key);
    // 确保当前线程还持有锁
    if (identifier.equals(connection.get(key))) {
        Transaction transaction = connection.multi();
        transaction.del(key);
        return transaction.exec().isEmpty();
    }
    connection.unwatch();

    return false;
}

Lua脚本实现
Lua脚本: acquire

local key = KEYS[1]
local identifier = ARGV[1]
local lockTimeOut = ARGV[2]

-- 锁定成功
if redis.call("SETNX", key, identifier) == 1 then
    redis.call("EXPIRE", key, lockTimeOut)
    return 1
elseif redis.call("TTL", key) == -1 then
    redis.call("EXPIRE", key, lockTimeOut)
end
return 0

Lua脚本: release

local key = KEYS[1]
local identifier = ARGV[1]

if redis.call("GET", key) == identifier then
    redis.call("DEL", key)
    return 1
end
return 0

参考:http://www.redis.cn/commands/...
http://www.redis.net.cn/tutor...
http://www.oschina.net/transl...
http://www.tuicool.com/articl...

查看原文

赞 3 收藏 4 评论 0

Xavier 赞了文章 · 2018-11-06

Linux IO模式及 select、poll、epoll详解

注:本文是对众多博客的学习和总结,可能存在理解错误。请带着怀疑的眼光,同时如果有错误希望能指出。

同步IO和异步IO,阻塞IO和非阻塞IO分别是什么,到底有什么区别?不同的人在不同的上下文下给出的答案是不同的。所以先限定一下本文的上下文。

本文讨论的背景是Linux环境下的network IO。

一 概念说明

在进行解释之前,首先要说明几个概念:
- 用户空间和内核空间
- 进程切换
- 进程的阻塞
- 文件描述符
- 缓存 I/O

用户空间与内核空间

现在操作系统都是采用虚拟存储器,那么对32位操作系统而言,它的寻址空间(虚拟存储空间)为4G(2的32次方)。操作系统的核心是内核,独立于普通的应用程序,可以访问受保护的内存空间,也有访问底层硬件设备的所有权限。为了保证用户进程不能直接操作内核(kernel),保证内核的安全,操心系统将虚拟空间划分为两部分,一部分为内核空间,一部分为用户空间。针对linux操作系统而言,将最高的1G字节(从虚拟地址0xC0000000到0xFFFFFFFF),供内核使用,称为内核空间,而将较低的3G字节(从虚拟地址0x00000000到0xBFFFFFFF),供各个进程使用,称为用户空间。

进程切换

为了控制进程的执行,内核必须有能力挂起正在CPU上运行的进程,并恢复以前挂起的某个进程的执行。这种行为被称为进程切换。因此可以说,任何进程都是在操作系统内核的支持下运行的,是与内核紧密相关的。

从一个进程的运行转到另一个进程上运行,这个过程中经过下面这些变化:
1. 保存处理机上下文,包括程序计数器和其他寄存器。
2. 更新PCB信息。
3. 把进程的PCB移入相应的队列,如就绪、在某事件阻塞等队列。
4. 选择另一个进程执行,并更新其PCB。
5. 更新内存管理的数据结构。
6. 恢复处理机上下文。

注:总而言之就是很耗资源,具体的可以参考这篇文章:进程切换

进程的阻塞

正在执行的进程,由于期待的某些事件未发生,如请求系统资源失败、等待某种操作的完成、新数据尚未到达或无新工作做等,则由系统自动执行阻塞原语(Block),使自己由运行状态变为阻塞状态。可见,进程的阻塞是进程自身的一种主动行为,也因此只有处于运行态的进程(获得CPU),才可能将其转为阻塞状态。当进程进入阻塞状态,是不占用CPU资源的

文件描述符fd

文件描述符(File descriptor)是计算机科学中的一个术语,是一个用于表述指向文件的引用的抽象化概念。

文件描述符在形式上是一个非负整数。实际上,它是一个索引值,指向内核为每一个进程所维护的该进程打开文件的记录表。当程序打开一个现有文件或者创建一个新文件时,内核向进程返回一个文件描述符。在程序设计中,一些涉及底层的程序编写往往会围绕着文件描述符展开。但是文件描述符这一概念往往只适用于UNIX、Linux这样的操作系统。

缓存 I/O

缓存 I/O 又被称作标准 I/O,大多数文件系统的默认 I/O 操作都是缓存 I/O。在 Linux 的缓存 I/O 机制中,操作系统会将 I/O 的数据缓存在文件系统的页缓存( page cache )中,也就是说,数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。

缓存 I/O 的缺点:
数据在传输过程中需要在应用程序地址空间和内核进行多次数据拷贝操作,这些数据拷贝操作所带来的 CPU 以及内存开销是非常大的。

二 IO模式

刚才说了,对于一次IO访问(以read举例),数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。所以说,当一个read操作发生时,它会经历两个阶段:
1. 等待数据准备 (Waiting for the data to be ready)
2. 将数据从内核拷贝到进程中 (Copying the data from the kernel to the process)

正式因为这两个阶段,linux系统产生了下面五种网络模式的方案。
- 阻塞 I/O(blocking IO)
- 非阻塞 I/O(nonblocking IO)
- I/O 多路复用( IO multiplexing)
- 信号驱动 I/O( signal driven IO)
- 异步 I/O(asynchronous IO)

注:由于signal driven IO在实际中并不常用,所以我这只提及剩下的四种IO Model。

阻塞 I/O(blocking IO)

在linux中,默认情况下所有的socket都是blocking,一个典型的读操作流程大概是这样:
clipboard.png

当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段:准备数据(对于网络IO来说,很多时候数据在一开始还没有到达。比如,还没有收到一个完整的UDP包。这个时候kernel就要等待足够的数据到来)。这个过程需要等待,也就是说数据被拷贝到操作系统内核的缓冲区中是需要一个过程的。而在用户进程这边,整个进程会被阻塞(当然,是进程自己选择的阻塞)。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,然后kernel返回结果,用户进程才解除block的状态,重新运行起来。

所以,blocking IO的特点就是在IO执行的两个阶段都被block了。

非阻塞 I/O(nonblocking IO)

linux下,可以通过设置socket使其变为non-blocking。当对一个non-blocking socket执行读操作时,流程是这个样子:
clipboard.png

当用户进程发出read操作时,如果kernel中的数据还没有准备好,那么它并不会block用户进程,而是立刻返回一个error。从用户进程角度讲 ,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个error时,它就知道数据还没有准备好,于是它可以再次发送read操作。一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call,那么它马上就将数据拷贝到了用户内存,然后返回。

所以,nonblocking IO的特点是用户进程需要不断的主动询问kernel数据好了没有。

I/O 多路复用( IO multiplexing)

IO multiplexing就是我们说的select,poll,epoll,有些地方也称这种IO方式为event driven IO。select/epoll的好处就在于单个process就可以同时处理多个网络连接的IO。它的基本原理就是select,poll,epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。

clipboard.png

当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。

所以,I/O 多路复用的特点是通过一种机制一个进程能同时等待多个文件描述符,而这些文件描述符(套接字描述符)其中的任意一个进入读就绪状态,select()函数就可以返回。

这个图和blocking IO的图其实并没有太大的不同,事实上,还更差一些。因为这里需要使用两个system call (select 和 recvfrom),而blocking IO只调用了一个system call (recvfrom)。但是,用select的优势在于它可以同时处理多个connection。

所以,如果处理的连接数不是很高的话,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延迟还更大。select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。)

在IO multiplexing Model中,实际中,对于每一个socket,一般都设置成为non-blocking,但是,如上图所示,整个用户的process其实是一直被block的。只不过process是被select这个函数block,而不是被socket IO给block。

异步 I/O(asynchronous IO)

inux下的asynchronous IO其实用得很少。先看一下它的流程:
clipboard.png

用户进程发起read操作之后,立刻就可以开始去做其它的事。而另一方面,从kernel的角度,当它受到一个asynchronous read之后,首先它会立刻返回,所以不会对用户进程产生任何block。然后,kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作完成了。

总结

blocking和non-blocking的区别

调用blocking IO会一直block住对应的进程直到操作完成,而non-blocking IO在kernel还准备数据的情况下会立刻返回。

synchronous IO和asynchronous IO的区别

在说明synchronous IO和asynchronous IO的区别之前,需要先给出两者的定义。POSIX的定义是这样子的:
- A synchronous I/O operation causes the requesting process to be blocked until that I/O operation completes;
- An asynchronous I/O operation does not cause the requesting process to be blocked;

两者的区别就在于synchronous IO做”IO operation”的时候会将process阻塞。按照这个定义,之前所述的blocking IO,non-blocking IO,IO multiplexing都属于synchronous IO。

有人会说,non-blocking IO并没有被block啊。这里有个非常“狡猾”的地方,定义中所指的”IO operation”是指真实的IO操作,就是例子中的recvfrom这个system call。non-blocking IO在执行recvfrom这个system call的时候,如果kernel的数据没有准备好,这时候不会block进程。但是,当kernel中数据准备好的时候,recvfrom会将数据从kernel拷贝到用户内存中,这个时候进程是被block了,在这段时间内,进程是被block的。

而asynchronous IO则不一样,当进程发起IO 操作之后,就直接返回再也不理睬了,直到kernel发送一个信号,告诉进程说IO完成。在这整个过程中,进程完全没有被block。

各个IO Model的比较如图所示:
clipboard.png

通过上面的图片,可以发现non-blocking IO和asynchronous IO的区别还是很明显的。在non-blocking IO中,虽然进程大部分时间都不会被block,但是它仍然要求进程去主动的check,并且当数据准备完成以后,也需要进程主动的再次调用recvfrom来将数据拷贝到用户内存。而asynchronous IO则完全不同。它就像是用户进程将整个IO操作交给了他人(kernel)完成,然后他人做完后发信号通知。在此期间,用户进程不需要去检查IO操作的状态,也不需要主动的去拷贝数据。

三 I/O 多路复用之select、poll、epoll详解

select,poll,epoll都是IO多路复用的机制。I/O多路复用就是通过一种机制,一个进程可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。但select,poll,epoll本质上都是同步I/O,因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的,而异步I/O则无需自己负责进行读写,异步I/O的实现会负责把数据从内核拷贝到用户空间。(这里啰嗦下)

select

int select (int n, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

select 函数监视的文件描述符分3类,分别是writefds、readfds、和exceptfds。调用后select函数会阻塞,直到有描述副就绪(有数据 可读、可写、或者有except),或者超时(timeout指定等待时间,如果立即返回设为null即可),函数返回。当select函数返回后,可以 通过遍历fdset,来找到就绪的描述符。

select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点。select的一 个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,可以通过修改宏定义甚至重新编译内核的方式提升这一限制,但 是这样也会造成效率的降低。

poll

int poll (struct pollfd *fds, unsigned int nfds, int timeout);

不同与select使用三个位图来表示三个fdset的方式,poll使用一个 pollfd的指针实现。

struct pollfd {
    int fd; /* file descriptor */
    short events; /* requested events to watch */
    short revents; /* returned events witnessed */
};

pollfd结构包含了要监视的event和发生的event,不再使用select“参数-值”传递的方式。同时,pollfd并没有最大数量限制(但是数量过大后性能也是会下降)。 和select函数一样,poll返回后,需要轮询pollfd来获取就绪的描述符。

从上面看,select和poll都需要在返回后,通过遍历文件描述符来获取已经就绪的socket。事实上,同时连接的大量客户端在一时刻可能只有很少的处于就绪状态,因此随着监视的描述符数量的增长,其效率也会线性下降。

epoll

epoll是在2.6内核中提出的,是之前的select和poll的增强版本。相对于select和poll来说,epoll更加灵活,没有描述符限制。epoll使用一个文件描述符管理多个描述符,将用户关系的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的copy只需一次。

一 epoll操作过程

epoll操作过程需要三个接口,分别如下:

int epoll_create(int size);//创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

1. int epoll_create(int size);
创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大,这个参数不同于select()中的第一个参数,给出最大监听的fd+1的值,参数size并不是限制了epoll所能监听的描述符最大个数,只是对内核初始分配内部数据结构的一个建议
当创建好epoll句柄后,它就会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。

2. int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
函数是对指定描述符fd执行op操作。
- epfd:是epoll_create()的返回值。
- op:表示op操作,用三个宏来表示:添加EPOLL_CTL_ADD,删除EPOLL_CTL_DEL,修改EPOLL_CTL_MOD。分别添加、删除和修改对fd的监听事件。
- fd:是需要监听的fd(文件描述符)
- epoll_event:是告诉内核需要监听什么事,struct epoll_event结构如下:

struct epoll_event {
  __uint32_t events;  /* Epoll events */
  epoll_data_t data;  /* User data variable */
};

//events可以是以下几个宏的集合:
EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
EPOLLOUT:表示对应的文件描述符可以写;
EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
EPOLLERR:表示对应的文件描述符发生错误;
EPOLLHUP:表示对应的文件描述符被挂断;
EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里

3. int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
等待epfd上的io事件,最多返回maxevents个事件。
参数events用来从内核得到事件的集合,maxevents告之内核这个events有多大,这个maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)。该函数返回需要处理的事件数目,如返回0表示已超时。

二 工作模式

 epoll对文件描述符的操作有两种模式:LT(level trigger)ET(edge trigger)。LT模式是默认模式,LT模式与ET模式的区别如下:
  LT模式:当epoll_wait检测到描述符事件发生并将此事件通知应用程序,应用程序可以不立即处理该事件。下次调用epoll_wait时,会再次响应应用程序并通知此事件。
  ET模式:当epoll_wait检测到描述符事件发生并将此事件通知应用程序,应用程序必须立即处理该事件。如果不处理,下次调用epoll_wait时,不会再次响应应用程序并通知此事件。

1. LT模式

LT(level triggered)是缺省的工作方式,并且同时支持block和no-block socket.在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的。

2. ET模式

ET(edge-triggered)是高速工作方式,只支持no-block socket。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知,直到你做了某些操作导致那个文件描述符不再为就绪状态了(比如,你在发送,接收或者接收请求,或者发送接收的数据少于一定量时导致了一个EWOULDBLOCK 错误)。但是请注意,如果一直不对这个fd作IO操作(从而导致它再次变成未就绪),内核不会发送更多的通知(only once)

ET模式在很大程度上减少了epoll事件被重复触发的次数,因此效率要比LT模式高。epoll工作在ET模式的时候,必须使用非阻塞套接口,以避免由于一个文件句柄的阻塞读/阻塞写操作把处理多个文件描述符的任务饿死。

3. 总结

假如有这样一个例子:
1. 我们已经把一个用来从管道中读取数据的文件句柄(RFD)添加到epoll描述符
2. 这个时候从管道的另一端被写入了2KB的数据
3. 调用epoll_wait(2),并且它会返回RFD,说明它已经准备好读取操作
4. 然后我们读取了1KB的数据
5. 调用epoll_wait(2)......

LT模式:
如果是LT模式,那么在第5步调用epoll_wait(2)之后,仍然能受到通知。

ET模式:
如果我们在第1步将RFD添加到epoll描述符的时候使用了EPOLLET标志,那么在第5步调用epoll_wait(2)之后将有可能会挂起,因为剩余的数据还存在于文件的输入缓冲区内,而且数据发出端还在等待一个针对已经发出数据的反馈信息。只有在监视的文件句柄上发生了某个事件的时候 ET 工作模式才会汇报事件。因此在第5步的时候,调用者可能会放弃等待仍在存在于文件输入缓冲区内的剩余数据。

当使用epoll的ET模型来工作时,当产生了一个EPOLLIN事件后,
读数据的时候需要考虑的是当recv()返回的大小如果等于请求的大小,那么很有可能是缓冲区还有数据未读完,也意味着该次事件还没有处理完,所以还需要再次读取:

while(rs){
  buflen = recv(activeevents[i].data.fd, buf, sizeof(buf), 0);
  if(buflen < 0){
    // 由于是非阻塞的模式,所以当errno为EAGAIN时,表示当前缓冲区已无数据可读
    // 在这里就当作是该次事件已处理处.
    if(errno == EAGAIN){
        break;
    }
    else{
        return;
    }
  }
  else if(buflen == 0){
     // 这里表示对端的socket已正常关闭.
  }

 if(buflen == sizeof(buf){
      rs = 1;   // 需要再次读取
 }
 else{
      rs = 0;
 }
}

Linux中的EAGAIN含义

Linux环境下开发经常会碰到很多错误(设置errno),其中EAGAIN是其中比较常见的一个错误(比如用在非阻塞操作中)。
从字面上来看,是提示再试一次。这个错误经常出现在当应用程序进行一些非阻塞(non-blocking)操作(对文件或socket)的时候。

例如,以 O_NONBLOCK的标志打开文件/socket/FIFO,如果你连续做read操作而没有数据可读。此时程序不会阻塞起来等待数据准备就绪返回,read函数会返回一个错误EAGAIN,提示你的应用程序现在没有数据可读请稍后再试。
又例如,当一个系统调用(比如fork)因为没有足够的资源(比如虚拟内存)而执行失败,返回EAGAIN提示其再调用一次(也许下次就能成功)。

三 代码演示

下面是一段不完整的代码且格式不对,意在表述上面的过程,去掉了一些模板代码。

#define IPADDRESS   "127.0.0.1"
#define PORT        8787
#define MAXSIZE     1024
#define LISTENQ     5
#define FDSIZE      1000
#define EPOLLEVENTS 100

listenfd = socket_bind(IPADDRESS,PORT);

struct epoll_event events[EPOLLEVENTS];

//创建一个描述符
epollfd = epoll_create(FDSIZE);

//添加监听描述符事件
add_event(epollfd,listenfd,EPOLLIN);

//循环等待
for ( ; ; ){
    //该函数返回已经准备好的描述符事件数目
    ret = epoll_wait(epollfd,events,EPOLLEVENTS,-1);
    //处理接收到的连接
    handle_events(epollfd,events,ret,listenfd,buf);
}

//事件处理函数
static void handle_events(int epollfd,struct epoll_event *events,int num,int listenfd,char *buf)
{
     int i;
     int fd;
     //进行遍历;这里只要遍历已经准备好的io事件。num并不是当初epoll_create时的FDSIZE。
     for (i = 0;i < num;i++)
     {
         fd = events[i].data.fd;
        //根据描述符的类型和事件类型进行处理
         if ((fd == listenfd) &&(events[i].events & EPOLLIN))
            handle_accpet(epollfd,listenfd);
         else if (events[i].events & EPOLLIN)
            do_read(epollfd,fd,buf);
         else if (events[i].events & EPOLLOUT)
            do_write(epollfd,fd,buf);
     }
}

//添加事件
static void add_event(int epollfd,int fd,int state){
    struct epoll_event ev;
    ev.events = state;
    ev.data.fd = fd;
    epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&ev);
}

//处理接收到的连接
static void handle_accpet(int epollfd,int listenfd){
     int clifd;     
     struct sockaddr_in cliaddr;     
     socklen_t  cliaddrlen;     
     clifd = accept(listenfd,(struct sockaddr*)&cliaddr,&cliaddrlen);     
     if (clifd == -1)         
     perror("accpet error:");     
     else {         
         printf("accept a new client: %s:%d\n",inet_ntoa(cliaddr.sin_addr),cliaddr.sin_port);                       //添加一个客户描述符和事件         
         add_event(epollfd,clifd,EPOLLIN);     
     } 
}

//读处理
static void do_read(int epollfd,int fd,char *buf){
    int nread;
    nread = read(fd,buf,MAXSIZE);
    if (nread == -1)     {         
        perror("read error:");         
        close(fd); //记住close fd        
        delete_event(epollfd,fd,EPOLLIN); //删除监听 
    }
    else if (nread == 0)     {         
        fprintf(stderr,"client close.\n");
        close(fd); //记住close fd       
        delete_event(epollfd,fd,EPOLLIN); //删除监听 
    }     
    else {         
        printf("read message is : %s",buf);        
        //修改描述符对应的事件,由读改为写         
        modify_event(epollfd,fd,EPOLLOUT);     
    } 
}

//写处理
static void do_write(int epollfd,int fd,char *buf) {     
    int nwrite;     
    nwrite = write(fd,buf,strlen(buf));     
    if (nwrite == -1){         
        perror("write error:");        
        close(fd);   //记住close fd       
        delete_event(epollfd,fd,EPOLLOUT);  //删除监听    
    }else{
        modify_event(epollfd,fd,EPOLLIN); 
    }    
    memset(buf,0,MAXSIZE); 
}

//删除事件
static void delete_event(int epollfd,int fd,int state) {
    struct epoll_event ev;
    ev.events = state;
    ev.data.fd = fd;
    epoll_ctl(epollfd,EPOLL_CTL_DEL,fd,&ev);
}

//修改事件
static void modify_event(int epollfd,int fd,int state){     
    struct epoll_event ev;
    ev.events = state;
    ev.data.fd = fd;
    epoll_ctl(epollfd,EPOLL_CTL_MOD,fd,&ev);
}

//注:另外一端我就省了

四 epoll总结

在 select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而epoll事先通过epoll_ctl()来注册一 个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait() 时便得到通知。(此处去掉了遍历文件描述符,而是通过监听回调的的机制。这正是epoll的魅力所在。)

epoll的优点主要是一下几个方面:
1. 监视的描述符数量不受限制,它所支持的FD上限是最大可以打开文件的数目,这个数字一般远大于2048,举个例子,在1GB内存的机器上大约是10万左 右,具体数目可以cat /proc/sys/fs/file-max察看,一般来说这个数目和系统内存关系很大。select的最大缺点就是进程打开的fd是有数量限制的。这对 于连接数量比较大的服务器来说根本不能满足。虽然也可以选择多进程的解决方案( Apache就是这样实现的),不过虽然linux上面创建进程的代价比较小,但仍旧是不可忽视的,加上进程间数据同步远比不上线程间同步的高效,所以也不是一种完美的方案。

  1. IO的效率不会随着监视fd的数量的增长而下降。epoll不同于select和poll轮询的方式,而是通过每个fd定义的回调函数来实现的。只有就绪的fd才会执行回调函数。

如果没有大量的idle -connection或者dead-connection,epoll的效率并不会比select/poll高很多,但是当遇到大量的idle- connection,就会发现epoll的效率大大高于select/poll。

参考

用户空间与内核空间,进程上下文与中断上下文[总结]
进程切换
维基百科-文件描述符
Linux 中直接 I/O 机制的介绍
IO - 同步,异步,阻塞,非阻塞 (亡羊补牢篇)
Linux中select poll和epoll的区别
IO多路复用之select总结
IO多路复用之poll总结
IO多路复用之epoll总结

查看原文

赞 608 收藏 1059 评论 64

认证与成就

  • 获得 15 次点赞
  • 获得 20 枚徽章 获得 0 枚金徽章, 获得 5 枚银徽章, 获得 15 枚铜徽章

擅长技能
编辑

开源项目 & 著作
编辑

(゚∀゚ )
暂时没有

注册于 2016-06-08
个人主页被 874 人浏览