Yourtion

Yourtion 查看完整档案

深圳编辑  |  填写毕业院校  |  填写所在公司/组织 blog.yourtion.com 编辑
编辑
_ | |__ _ _ __ _ | '_ \| | | |/ _` | | |_) | |_| | (_| | |_.__/ \__,_|\__, | |___/ 该用户太懒什么也没留下

个人动态

Yourtion 收藏了文章 · 10月18日

如何设计一个牛逼的API接口

在日常开发中,总会接触到各种接口。前后端数据传输接口,第三方业务平台接口。一个平台的前后端数据传输接口一般都会在内网环境下通信,而且会使用安全框架,所以安全性可以得到很好的保护。这篇文章重点讨论一下提供给第三方平台的业务接口应当如何设计?我们应该考虑哪些问题?

主要从以上三个方面来设计一个安全的API接口。

一 安全性问题

安全性问题是一个接口必须要保证的规范。如果接口保证不了安全性,那么你的接口相当于直接暴露在公网环境中任人蹂躏。

1.1 调用接口的先决条件-token

获取token一般会涉及到几个参数appidappkeytimestampnoncesign。我们通过以上几个参数来获取调用系统的凭证。

appidappkey可以直接通过平台线上申请,也可以线下直接颁发。appid是全局唯一的,每个appid将对应一个客户,appkey需要高度保密。

timestamp是时间戳,使用系统当前的unix时间戳。时间戳的目的就是为了减轻DOS攻击。防止请求被拦截后一直尝试请求接口。服务器端设置时间戳阀值,如果请求时间戳和服务器时间超过阀值,则响应失败。

nonce是随机值。随机值主要是为了增加sign的多变性,也可以保护接口的幂等性,相邻的两次请求nonce不允许重复,如果重复则认为是重复提交,响应失败。

sign是参数签名,将appkeytimestampnonce拼接起来进行md5加密(当然使用其他方式进行不可逆加密也没问题)。

token,使用参数appidtimestampnoncesign来获取token,作为系统调用的唯一凭证。token可以设置一次有效(这样安全性更高),也可以设置时效性,这里推荐设置时效性。如果一次有效的话这个接口的请求频率可能会很高。token推荐加到请求头上,这样可以跟业务参数完全区分开来。

1.2 使用POST作为接口请求方式

一般调用接口最常用的两种方式就是GET和POST。两者的区别也很明显,GET请求会将参数暴露在浏览器URL中,而且对长度也有限制。为了更高的安全性,所有接口都采用POST方式请求。

1.3 客户端IP白名单

ip白名单是指将接口的访问权限对部分ip进行开放。这样就能避免其他ip进行访问攻击,设置ip白名单比较麻烦的一点就是当你的客户端进行迁移后,就需要重新联系服务提供者添加新的ip白名单。设置ip白名单的方式很多,除了传统的防火墙之外,spring cloud alibaba提供的组件sentinel也支持白名单设置。为了降低api的复杂度,推荐使用防火墙规则进行白名单设置。

1.4 单个接口针对ip限流

限流是为了更好的维护系统稳定性。使用redis进行接口调用次数统计,ip+接口地址作为key,访问次数作为value,每次请求value+1,设置过期时长来限制接口的调用频率。

1.5 记录接口请求日志

使用aop全局记录请求日志,快速定位异常请求位置,排查问题原因。

1.6 敏感数据脱敏

在接口调用过程中,可能会涉及到订单号等敏感数据,这类数据通常需要脱敏处理,最常用的方式就是加密。加密方式使用安全性比较高的RSA非对称加密。非对称加密算法有两个密钥,这两个密钥完全不同但又完全匹配。只有使用匹配的一对公钥和私钥,才能完成对明文的加密和解密过程。

二 幂等性问题

幂等性是指任意多次请求的执行结果和一次请求的执行结果所产生的影响相同。说的直白一点就是查询操作无论查询多少次都不会影响数据本身,因此查询操作本身就是幂等的。但是新增操作,每执行一次数据库就会发生变化,所以它是非幂等的。

幂等问题的解决有很多思路,这里讲一种比较严谨的。提供一个生成随机数的接口,随机数全局唯一。调用接口的时候带入随机数。第一次调用,业务处理成功后,将随机数作为key,操作结果作为value,存入redis,同时设置过期时长。第二次调用,查询redis,如果key存在,则证明是重复提交,直接返回错误。

三 数据规范问题

3.1 版本控制

一套成熟的API文档,一旦发布是不允许随意修改接口的。这时候如果想新增或者修改接口,就需要加入版本控制,版本号可以是整数类型,也可以是浮点数类型。一般接口地址都会带上版本号,http://ip:port//v1/list。

3.2 响应状态码规范

一个牛逼的API,还需要提供简单明了的响应值,根据状态码就可以大概知道问题所在。我们采用http的状态码进行数据封装,例如200表示请求成功,4xx表示客户端错误,5xx表示服务器内部发生错误。状态码设计参考如下:

分类描述
1xx信息,服务器收到请求,需要请求者继续执行操作
2xx成功
3xx重定向,需要进一步的操作以完成请求
4xx客户端错误,请求包含语法错误或无法完成请求
5xx服务端错误

状态码枚举类:

public enum CodeEnum {

    // 根据业务需求进行添加
    SUCCESS(200,"处理成功"),
    ERROR_PATH(404,"请求地址错误"),
    ERROR_SERVER(505,"服务器内部发生错误");
    
    private int code;
    private String message;
    
    CodeEnum(int code, String message) {
        this.code = code;
        this.message = message;
    }

    public int getCode() {
        return code;
    }

    public void setCode(int code) {
        this.code = code;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }
}

3.3 统一响应数据格式

为了方便给客户端响应,响应数据会包含三个属性,状态码(code),信息描述(message),响应数据(data)。客户端根据状态码及信息描述可快速知道接口,如果状态码返回成功,再开始处理数据。

响应结果定义及常用方法:

public class R implements Serializable {

    private static final long serialVersionUID = 793034041048451317L;

    private int code;
    private String message;
    private Object data = null;

    public int getCode() {
        return code;
    }
    public void setCode(int code) {
        this.code = code;
    }

    public String getMessage() {
        return message;
    }
    public void setMessage(String message) {
        this.message = message;
    }

    public Object getData() {
        return data;
    }

    /**
     * 放入响应枚举
     */
    public R fillCode(CodeEnum codeEnum){
        this.setCode(codeEnum.getCode());
        this.setMessage(codeEnum.getMessage());
        return this;
    }

    /**
     * 放入响应码及信息
     */
    public R fillCode(int code, String message){
        this.setCode(code);
        this.setMessage(message);
        return this;
    }

    /**
     * 处理成功,放入自定义业务数据集合
     */
    public R fillData(Object data) {
        this.setCode(CodeEnum.SUCCESS.getCode());
        this.setMessage(CodeEnum.SUCCESS.getMessage());
        this.data = data;
        return this;
    }
}

总结

本篇文章从安全性、幂等性、数据规范等方面讨论了API设计规范。除此之外,一个好的API还少不了一个优秀的接口文档。接口文档的可读性非常重要,虽然很多程序员都不喜欢写文档,而且不喜欢别人不写文档。为了不增加程序员的压力,推荐使用swagger或其他接口管理工具,通过简单配置,就可以在开发中测试接口的连通性,上线后也可以生成离线文档用于管理API。

点关注、不迷路

如果觉得文章不错,欢迎关注点赞收藏,你们的支持是我创作的动力,感谢大家。

如果文章写的有问题,请不要吝啬,欢迎留言指出,我会及时核查修改。

如果你还想更加深入的了解我,可以微信搜索「Java旅途」进行关注。回复「1024」即可获得学习视频及精美电子书。每天7:30准时推送技术文章,让你的上班路不在孤独,而且每月还有送书活动,助你提升硬实力!

查看原文

Yourtion 收藏了文章 · 10月17日

kafka核心原理的秘密,藏在这16张图里

文章首发公众号:码哥字节(ID:MageByte)

Kafka 是一个优秀的分布式消息中间件,许多系统中都会使用到 Kafka 来做消息通信。对分布式消息系统的了解和使用几乎成为一个后台开发人员必备的技能。今天码哥字节就从常见的 Kafka 面试题入手,和大家聊聊 Kafka 的那些事儿。

思维导图

讲一讲分布式消息中间件

问题

  • 什么是分布式消息中间件?
  • 消息中间件的作用是什么?
  • 消息中间件的使用场景是什么?
  • 消息中间件选型?

消息队列

分布式消息是一种通信机制,和 RPC、HTTP、RMI 等不一样,消息中间件采用分布式中间代理的方式进行通信。如图所示,采用了消息中间件之后,上游业务系统发送消息,先存储在消息中间件,然后由消息中间件将消息分发到对应的业务模块应用(分布式生产者 - 消费者模式)。这种异步的方式,减少了服务之间的耦合程度。

架构

定义消息中间件:

  • 利用高效可靠的消息传递机制进行平台无关的数据交流
  • 基于数据通信,来进行分布式系统的集成
  • 通过提供消息传递和消息排队模型,可以在分布式环境下扩展进程间的通信

在系统架构中引用额外的组件,必然提高系统的架构复杂度和运维的难度,那么在系统中使用分布式消息中间件有什么优势呢?消息中间件在系统中起的作用又是什么呢?

  • 解耦
  • 冗余(存储)
  • 扩展性
  • 削峰
  • 可恢复性
  • 顺序保证
  • 缓冲
  • 异步通信

面试时,面试官经常会关心面试者对开源组件的选型能力,这既可以考验面试者知识的广度,也可以考验面试者对某类系统的知识的认识深度,而且也可以看出面试者对系统整体把握和系统架构设计的能力。开源分布式消息系统有很多,不同的消息系统的特性也不一样,选择怎样的消息系统,不仅需要对各消息系统有一定的了解,也需要对自身系统需求有清晰的认识。

下面是常见的几种分布式消息系统的对比:

选择

答案关键字

  • 什么是分布式消息中间件?通信,队列,分布式,生产消费者模式。
  • 消息中间件的作用是什么? 解耦、峰值处理、异步通信、缓冲。
  • 消息中间件的使用场景是什么? 异步通信,消息存储处理。
  • 消息中间件选型?语言,协议、HA、数据可靠性、性能、事务、生态、简易、推拉模式。

Kafka 基本概念和架构

问题

  • 简单讲下 Kafka 的架构?
  • Kafka 是推模式还是拉模式,推拉的区别是什么?
  • Kafka 如何广播消息?
  • Kafka 的消息是否是有序的?
  • Kafka 是否支持读写分离?
  • Kafka 如何保证数据高可用?
  • Kafka 中 zookeeper 的作用?
  • 是否支持事务?
  • 分区数是否可以减少?

Kafka 架构中的一般概念:

架构

  • Producer:生产者,也就是发送消息的一方。生产者负责创建消息,然后将其发送到 Kafka。
  • Consumer:消费者,也就是接受消息的一方。消费者连接到 Kafka 上并接收消息,进而进行相应的业务逻辑处理。
  • Consumer Group:一个消费者组可以包含一个或多个消费者。使用多分区 + 多消费者方式可以极大提高数据下游的处理速度,同一消费组中的消费者不会重复消费消息,同样的,不同消费组中的消费者消息消息时互不影响。Kafka 就是通过消费组的方式来实现消息 P2P 模式和广播模式。
  • Broker:服务代理节点。Broker 是 Kafka 的服务节点,即 Kafka 的服务器。
  • Topic:Kafka 中的消息以 Topic 为单位进行划分,生产者将消息发送到特定的 Topic,而消费者负责订阅 Topic 的消息并进行消费。
  • Partition:Topic 是一个逻辑的概念,它可以细分为多个分区,每个分区只属于单个主题。同一个主题下不同分区包含的消息是不同的,分区在存储层面可以看作一个可追加的日志(Log)文件,消息在被追加到分区日志文件的时候都会分配一个特定的偏移量(offset)。
  • Offset:offset 是消息在分区中的唯一标识,Kafka 通过它来保证消息在分区内的顺序性,不过 offset 并不跨越分区,也就是说,Kafka 保证的是分区有序性而不是主题有序性。
  • Replication:副本,是 Kafka 保证数据高可用的方式,Kafka 同一 Partition 的数据可以在多 Broker 上存在多个副本,通常只有主副本对外提供读写服务,当主副本所在 broker 崩溃或发生网络一场,Kafka 会在 Controller 的管理下会重新选择新的 Leader 副本对外提供读写服务。
  • Record: 实际写入 Kafka 中并可以被读取的消息记录。每个 record 包含了 key、value 和 timestamp。

Kafka Topic Partitions Layout

主题

Kafka 将 Topic 进行分区,分区可以并发读写。

Kafka Consumer Offset

consumer offset

zookeeper

zookeeper

  • Broker 注册:Broker 是分布式部署并且之间相互独立,Zookeeper 用来管理注册到集群的所有 Broker 节点。
  • Topic 注册: 在 Kafka 中,同一个 Topic 的消息会被分成多个分区并将其分布在多个 Broker 上,这些分区信息及与 Broker 的对应关系也都是由 Zookeeper 在维护
  • 生产者负载均衡:由于同一个 Topic 消息会被分区并将其分布在多个 Broker 上,因此,生产者需要将消息合理地发送到这些分布式的 Broker 上。
  • 消费者负载均衡:与生产者类似,Kafka 中的消费者同样需要进行负载均衡来实现多个消费者合理地从对应的 Broker 服务器上接收消息,每个消费者分组包含若干消费者,每条消息都只会发送给分组中的一个消费者,不同的消费者分组消费自己特定的 Topic 下面的消息,互不干扰。

答案关键字

  • 简单讲下 Kafka 的架构?

    Producer、Consumer、Consumer Group、Topic、Partition
  • Kafka 是推模式还是拉模式,推拉的区别是什么?

    Kafka Producer 向 Broker 发送消息使用 Push 模式,Consumer 消费采用的 Pull 模式。拉取模式,让 consumer 自己管理 offset,可以提供读取性能
  • Kafka 如何广播消息?

    Consumer group
  • Kafka 的消息是否是有序的?

    Topic 级别无序,Partition 有序
  • Kafka 是否支持读写分离?

    不支持,只有 Leader 对外提供读写服务
  • Kafka 如何保证数据高可用?

    副本,ack,HW
  • Kafka 中 zookeeper 的作用?

    集群管理,元数据管理
  • 是否支持事务?

    0.11 后支持事务,可以实现”exactly once“
  • 分区数是否可以减少?

    不可以,会丢失数据

Kafka 使用

问题

  • Kafka 有哪些命令行工具?你用过哪些?
  • Kafka Producer 的执行过程?
  • Kafka Producer 有哪些常见配置?
  • 如何让 Kafka 的消息有序?
  • Producer 如何保证数据发送不丢失?
  • 如何提升 Producer 的性能?
  • 如果同一 group 下 consumer 的数量大于 part 的数量,kafka 如何处理?
  • Kafka Consumer 是否是线程安全的?
  • 讲一下你使用 Kafka Consumer 消费消息时的线程模型,为何如此设计?
  • Kafka Consumer 的常见配置?
  • Consumer 什么时候会被踢出集群?
  • 当有 Consumer 加入或退出时,Kafka 会作何反应?
  • 什么是 Rebalance,何时会发生 Rebalance?

命令行工具

Kafka 的命令行工具在 Kafka 包的/bin目录下,主要包括服务和集群管理脚本,配置脚本,信息查看脚本,Topic 脚本,客户端脚本等。

  • kafka-configs.sh: 配置管理脚本
  • kafka-console-consumer.sh: kafka 消费者控制台
  • kafka-console-producer.sh: kafka 生产者控制台
  • kafka-consumer-groups.sh: kafka 消费者组相关信息
  • kafka-delete-records.sh: 删除低水位的日志文件
  • kafka-log-dirs.sh:kafka 消息日志目录信息
  • kafka-mirror-maker.sh: 不同数据中心 kafka 集群复制工具
  • kafka-preferred-replica-election.sh: 触发 preferred replica 选举
  • kafka-producer-perf-test.sh:kafka 生产者性能测试脚本
  • kafka-reassign-partitions.sh: 分区重分配脚本
  • kafka-replica-verification.sh: 复制进度验证脚本
  • kafka-server-start.sh: 启动 kafka 服务
  • kafka-server-stop.sh: 停止 kafka 服务
  • kafka-topics.sh:topic 管理脚本
  • kafka-verifiable-consumer.sh: 可检验的 kafka 消费者
  • kafka-verifiable-producer.sh: 可检验的 kafka 生产者
  • zookeeper-server-start.sh: 启动 zk 服务
  • zookeeper-server-stop.sh: 停止 zk 服务
  • zookeeper-shell.sh:zk 客户端

我们通常可以使用kafka-console-consumer.shkafka-console-producer.sh脚本来测试 Kafka 生产和消费,kafka-consumer-groups.sh可以查看和管理集群中的 Topic,kafka-topics.sh通常用于查看 Kafka 的消费组情况。

Kafka Producer

Kafka producer 的正常生产逻辑包含以下几个步骤:

  1. 配置生产者客户端参数常见生产者实例。
  2. 构建待发送的消息。
  3. 发送消息。
  4. 关闭生产者实例。

Producer 发送消息的过程如下图所示,需要经过拦截器序列化器分区器,最终由累加器批量发送至 Broker。

producer

Kafka Producer 需要以下必要参数:

  • bootstrap.server: 指定 Kafka 的 Broker 的地址
  • key.serializer: key 序列化器
  • value.serializer: value 序列化器

常见参数:

  • batch.num.messages

    默认值:200,每次批量消息的数量,只对 asyc 起作用。
  • request.required.acks

    默认值:0,0 表示 producer 毋须等待 leader 的确认,1 代表需要 leader 确认写入它的本地 log 并立即确认,-1 代表所有的备份都完成后确认。 只对 async 模式起作用,这个参数的调整是数据不丢失和发送效率的 tradeoff,如果对数据丢失不敏感而在乎效率的场景可以考虑设置为 0,这样可以大大提高 producer 发送数据的效率。
  • request.timeout.ms

    默认值:10000,确认超时时间。
  • partitioner.class

    默认值:kafka.producer.DefaultPartitioner,必须实现 kafka.producer.Partitioner,根据 Key 提供一个分区策略。_有时候我们需要相同类型的消息必须顺序处理,这样我们就必须自定义分配策略,从而将相同类型的数据分配到同一个分区中。_
  • producer.type

    默认值:sync,指定消息发送是同步还是异步。异步 asyc 成批发送用 kafka.producer.AyncProducer, 同步 sync 用 kafka.producer.SyncProducer。同步和异步发送也会影响消息生产的效率。
  • compression.topic

    默认值:none,消息压缩,默认不压缩。其余压缩方式还有,"gzip"、"snappy"和"lz4"。对消息的压缩可以极大地减少网络传输量、降低网络 IO,从而提高整体性能。
  • compressed.topics

    默认值:null,在设置了压缩的情况下,可以指定特定的 topic 压缩,未指定则全部压缩。
  • message.send.max.retries

    默认值:3,消息发送最大尝试次数。
  • retry.backoff.ms

    默认值:300,每次尝试增加的额外的间隔时间。
  • topic.metadata.refresh.interval.ms

    默认值:600000,定期的获取元数据的时间。当分区丢失,leader 不可用时 producer 也会主动获取元数据,如果为 0,则每次发送完消息就获取元数据,不推荐。如果为负值,则只有在失败的情况下获取元数据。
  • queue.buffering.max.ms

    默认值:5000,在 producer queue 的缓存的数据最大时间,仅仅 for asyc。
  • queue.buffering.max.message

    默认值:10000,producer 缓存的消息的最大数量,仅仅 for asyc。
  • queue.enqueue.timeout.ms

    默认值:-1,0 当 queue 满时丢掉,负值是 queue 满时 block, 正值是 queue 满时 block 相应的时间,仅仅 for asyc。

Kafka Consumer

Kafka 有消费组的概念,每个消费者只能消费所分配到的分区的消息,每一个分区只能被一个消费组中的一个消费者所消费,所以同一个消费组中消费者的数量如果超过了分区的数量,将会出现有些消费者分配不到消费的分区。消费组与消费者关系如下图所示:

consumer group

Kafka Consumer Client 消费消息通常包含以下步骤:

  1. 配置客户端,创建消费者
  2. 订阅主题
  3. 拉去消息并消费
  4. 提交消费位移
  5. 关闭消费者实例

过程

因为 Kafka 的 Consumer 客户端是线程不安全的,为了保证线程安全,并提升消费性能,可以在 Consumer 端采用类似 Reactor 的线程模型来消费数据。

消费模型

Kafka consumer 参数

  • bootstrap.servers: 连接 broker 地址,host:port 格式。
  • group.id: 消费者隶属的消费组。
  • key.deserializer: 与生产者的key.serializer对应,key 的反序列化方式。
  • value.deserializer: 与生产者的value.serializer对应,value 的反序列化方式。
  • session.timeout.ms: coordinator 检测失败的时间。默认 10s 该参数是 Consumer Group 主动检测 (组内成员 comsummer) 崩溃的时间间隔,类似于心跳过期时间。
  • auto.offset.reset: 该属性指定了消费者在读取一个没有偏移量后者偏移量无效(消费者长时间失效当前的偏移量已经过时并且被删除了)的分区的情况下,应该作何处理,默认值是 latest,也就是从最新记录读取数据(消费者启动之后生成的记录),另一个值是 earliest,意思是在偏移量无效的情况下,消费者从起始位置开始读取数据。
  • enable.auto.commit: 否自动提交位移,如果为false,则需要在程序中手动提交位移。对于精确到一次的语义,最好手动提交位移
  • fetch.max.bytes: 单次拉取数据的最大字节数量
  • max.poll.records: 单次 poll 调用返回的最大消息数,如果处理逻辑很轻量,可以适当提高该值。 但是max.poll.records条数据需要在在 session.timeout.ms 这个时间内处理完 。默认值为 500
  • request.timeout.ms: 一次请求响应的最长等待时间。如果在超时时间内未得到响应,kafka 要么重发这条消息,要么超过重试次数的情况下直接置为失败。

Kafka Rebalance

rebalance 本质上是一种协议,规定了一个 consumer group 下的所有 consumer 如何达成一致来分配订阅 topic 的每个分区。比如某个 group 下有 20 个 consumer,它订阅了一个具有 100 个分区的 topic。正常情况下,Kafka 平均会为每个 consumer 分配 5 个分区。这个分配的过程就叫 rebalance。

什么时候 rebalance?

这也是经常被提及的一个问题。rebalance 的触发条件有三种:

  • 组成员发生变更(新 consumer 加入组、已有 consumer 主动离开组或已有 consumer 崩溃了——这两者的区别后面会谈到)
  • 订阅主题数发生变更
  • 订阅主题的分区数发生变更

如何进行组内分区分配?

Kafka 默认提供了两种分配策略:Range 和 Round-Robin。当然 Kafka 采用了可插拔式的分配策略,你可以创建自己的分配器以实现不同的分配策略。

答案关键字

  • Kafka 有哪些命令行工具?你用过哪些?/bin目录,管理 kafka 集群、管理 topic、生产和消费 kafka
  • Kafka Producer 的执行过程?拦截器,序列化器,分区器和累加器
  • Kafka Producer 有哪些常见配置?broker 配置,ack 配置,网络和发送参数,压缩参数,ack 参数
  • 如何让 Kafka 的消息有序?Kafka 在 Topic 级别本身是无序的,只有 partition 上才有序,所以为了保证处理顺序,可以自定义分区器,将需顺序处理的数据发送到同一个 partition
  • Producer 如何保证数据发送不丢失?ack 机制,重试机制
  • 如何提升 Producer 的性能?批量,异步,压缩
  • 如果同一 group 下 consumer 的数量大于 part 的数量,kafka 如何处理?多余的 Part 将处于无用状态,不消费数据
  • Kafka Consumer 是否是线程安全的?不安全,单线程消费,多线程处理
  • 讲一下你使用 Kafka Consumer 消费消息时的线程模型,为何如此设计?拉取和处理分离
  • Kafka Consumer 的常见配置?broker, 网络和拉取参数,心跳参数
  • Consumer 什么时候会被踢出集群?奔溃,网络异常,处理时间过长提交位移超时
  • 当有 Consumer 加入或退出时,Kafka 会作何反应?进行 Rebalance
  • 什么是 Rebalance,何时会发生 Rebalance?topic 变化,consumer 变化

高可用和性能

问题

  • Kafka 如何保证高可用?
  • Kafka 的交付语义?
  • Replic 的作用?
  • 什么事 AR,ISR?
  • Leader 和 Flower 是什么?
  • Kafka 中的 HW、LEO、LSO、LW 等分别代表什么?
  • Kafka 为保证优越的性能做了哪些处理?

分区与副本

分区副本

在分布式数据系统中,通常使用分区来提高系统的处理能力,通过副本来保证数据的高可用性。多分区意味着并发处理的能力,这多个副本中,只有一个是 leader,而其他的都是 follower 副本。仅有 leader 副本可以对外提供服务。 多个 follower 副本通常存放在和 leader 副本不同的 broker 中。通过这样的机制实现了高可用,当某台机器挂掉后,其他 follower 副本也能迅速”转正“,开始对外提供服务。

为什么 follower 副本不提供读服务?

这个问题本质上是对性能和一致性的取舍。试想一下,如果 follower 副本也对外提供服务那会怎么样呢?首先,性能是肯定会有所提升的。但同时,会出现一系列问题。类似数据库事务中的幻读,脏读。 比如你现在写入一条数据到 kafka 主题 a,消费者 b 从主题 a 消费数据,却发现消费不到,因为消费者 b 去读取的那个分区副本中,最新消息还没写入。而这个时候,另一个消费者 c 却可以消费到最新那条数据,因为它消费了 leader 副本。Kafka 通过 WH 和 Offset 的管理来决定 Consumer 可以消费哪些数据,已经当前写入的数据。

watermark

只有 Leader 可以对外提供读服务,那如何选举 Leader

kafka 会将与 leader 副本保持同步的副本放到 ISR 副本集合中。当然,leader 副本是一直存在于 ISR 副本集合中的,在某些特殊情况下,ISR 副本中甚至只有 leader 一个副本。 当 leader 挂掉时,kakfa 通过 zookeeper 感知到这一情况,在 ISR 副本中选取新的副本成为 leader,对外提供服务。 但这样还有一个问题,前面提到过,有可能 ISR 副本集合中,只有 leader,当 leader 副本挂掉后,ISR 集合就为空,这时候怎么办呢?这时候如果设置 unclean.leader.election.enable 参数为 true,那么 kafka 会在非同步,也就是不在 ISR 副本集合中的副本中,选取出副本成为 leader。

副本的存在就会出现副本同步问题

Kafka 在所有分配的副本 (AR) 中维护一个可用的副本列表 (ISR),Producer 向 Broker 发送消息时会根据ack配置来确定需要等待几个副本已经同步了消息才相应成功,Broker 内部会ReplicaManager服务来管理 flower 与 leader 之间的数据同步。

sync

性能优化

  • partition 并发
  • 顺序读写磁盘
  • page cache:按页读写
  • 预读:Kafka 会将将要消费的消息提前读入内存
  • 高性能序列化(二进制)
  • 内存映射
  • 无锁 offset 管理:提高并发能力
  • Java NIO 模型
  • 批量:批量读写
  • 压缩:消息压缩,存储压缩,减小网络和 IO 开销

Partition 并发

一方面,由于不同 Partition 可位于不同机器,因此可以充分利用集群优势,实现机器间的并行处理。另一方面,由于 Partition 在物理上对应一个文件夹,即使多个 Partition 位于同一个节点,也可通过配置让同一节点上的不同 Partition 置于不同的 disk drive 上,从而实现磁盘间的并行处理,充分发挥多磁盘的优势。

顺序读写

Kafka 每一个 partition 目录下的文件被平均切割成大小相等(默认一个文件是 500 兆,可以手动去设置)的数据文件,
每一个数据文件都被称为一个段(segment file), 每个 segment 都采用 append 的方式追加数据。

追加数据

答案关键字

  • Kafka 如何保证高可用?

    通过副本来保证数据的高可用,producer ack、重试、自动 Leader 选举,Consumer 自平衡
  • Kafka 的交付语义?

    交付语义一般有at least onceat most onceexactly once。kafka 通过 ack 的配置来实现前两种。
  • Replic 的作用?

    实现数据的高可用
  • 什么是 AR,ISR?

    AR:Assigned Replicas。AR 是主题被创建后,分区创建时被分配的副本集合,副本个 数由副本因子决定。
    ISR:In-Sync Replicas。Kafka 中特别重要的概念,指代的是 AR 中那些与 Leader 保 持同步的副本集合。在 AR 中的副本可能不在 ISR 中,但 Leader 副本天然就包含在 ISR 中。关于 ISR,还有一个常见的面试题目是如何判断副本是否应该属于 ISR。目前的判断 依据是:Follower 副本的 LEO 落后 Leader LEO 的时间,是否超过了 Broker 端参数 replica.lag.time.max.ms 值。如果超过了,副本就会被从 ISR 中移除。
  • Leader 和 Flower 是什么?
  • Kafka 中的 HW 代表什么?

    高水位值 (High watermark)。这是控制消费者可读取消息范围的重要字段。一 个普通消费者只能“看到”Leader 副本上介于 Log Start Offset 和 HW(不含)之间的 所有消息。水位以上的消息是对消费者不可见的。
  • Kafka 为保证优越的性能做了哪些处理?

    partition 并发、顺序读写磁盘、page cache 压缩、高性能序列化(二进制)、内存映射 无锁 offset 管理、Java NIO 模型

本文并没有深入 Kafka 的实现细节和源码分析,但 Kafka 确实是一个 优秀的开源系统,很多优雅的架构设计和源码设计都值得我们学习,十分建议感兴趣的同学更加深入的去了解一下这个开源系统,对于自身架构设计能力,编码能力,性能优化都会有很大的帮助。

推荐阅读

以下几篇文章阅读量与读者反馈都很好,推荐大家阅读:

MageByte

查看原文

Yourtion 收藏了文章 · 9月22日

王者荣耀背后的实时大数据平台用了什么黑科技?

大家好我是许振文,今天分享的主题是《基于 Flink+ServiceMesh 的腾讯游戏大数据服务应用实践》,内容主要分为以下四个部分:

  1. 背景和解决框架介绍
  2. 实时大数据计算 OneData
  3. 数据接口服务 OneFun
  4. 微服务化& ServiceMesh

一、背景和解决框架介绍

1、离线数据运营和实时数据运营

首先介绍下背景,我们做游戏数据运营的时间是比较久的了,在 13 年的时候就已经在做游戏离线数据分析,并且能把数据运用到游戏的运营活动中去。

但那时候的数据有一个缺陷,就是大部分都是离线数据,比如今天产生的数据我们算完后,第二天才会把这个数据推到线上。所以数据的实时性,和对游戏用户的实时干预、实时运营效果就会非常不好。尤其是比如我今天中的奖,明天才能拿到礼包,这点是玩家很不爽的。

现在提倡的是:“我看到的就是我想要的”或者“我想要的我立马就要”,所以我们从 16 年开始,整个游戏数据逐渐从离线运营转到实时运营,但同时我们在做的过程中,离线数据肯定少不了,因为离线的一些计算、累计值、数据校准都是非常有价值的。

实时方面主要是补足我们对游戏运营的体验,比如说在游戏里玩完一局或者做完一个任务后,立马就能得到相应的奖励,或者下一步的玩法指引。对用户来说,这种及时的刺激和干预,对于他们玩游戏的体验会更好。

其实不单单是游戏,其他方面也是一样的,所以我们在做这套系统的时候,就是离线+实时结合着用,但主要还是往实时方面去靠拢,未来大数据的方向也是,尽量会往实时方向去走。

2、应用场景

■ 1)游戏内任务系统

这个场景给大家介绍一下,是游戏内的任务系统,大家都应该看过。比如第一个是吃鸡里的,每日完成几局?分享没有?还有其他一些活动都会做简历,但这种简历我们现在都是实时的,尤其是需要全盘计算或者分享到其他社区里的。以前我们在做数据运营的时候,都是任务做完回去计算,第二天才会发到奖励,而现在所有任务都可以做到实时干预。

游戏的任务系统是游戏中特别重要的环节,大家不要认为任务系统就是让大家完成任务,收大家钱,其实任务系统给了玩家很好的指引,让玩家在游戏中可以得到更好的游戏体验。

■ 2)实时排行版

还有一个很重要的应用场景就是游戏内的排行榜,比如说王者荣耀里要上星耀、王者,其实都是用排行榜的方式。但我们这个排行榜可能会更具体一些,比如说是今天的战力排行榜,或者今天的对局排行榜,这些都是全局计算的实时排行榜。而且我们有快照的功能,比如 0 点 00 分 的时候有一个快照,就能立马给快照里的玩家发奖励。

这些是实时计算的典型应用案例,一个任务系统一个排行榜,其他的我们后面还会慢慢介绍。

3、游戏对数据的需求

再说一下为什么会有这样一个平台,其实我们最初在做数据运营的时候,是筒仓式或者手工作坊式的开发。当接到一个需求后,我们会做一个资源的评审、数据接入、大数据的编码,编码和数据开发完后,还要做线上资源的申请、发布、验证,再去开发大数据计算完成后的服务接口,然后再开发页面和线上的系统,这些都完了后再发到线上去,做线上监控,最后会有一个资源回收。

其实这种方式在很早期的时候是没有问题的,那为什么说现在不适应了?主要还是流程太长了。我们现在对游戏运营的要求非常高,比如说我们会接入数据挖掘的能力,大数据实时计算完成之后,我们还要把实时的用户画像,离线画像进行综合,接着推荐给他这个人适合哪些任务,然后指引去完成。

这种情况下,原来的做法门槛就比较高了,每一个都要单独去做,而且成本高效率低,在数据的复用性上也比较差,容易出错,而且没有办法沉淀。每一个做完之后代码回收就扔到一块,最多下次做的时候,想起来我有这个代码了可以稍微借鉴一下,但这种借鉴基本上也都是一种手工的方式。

所以我们希望能有一个平台化的方式,从项目的创建、资源分配、服务开发、在线测试、独立部署、服务上线、线上监控、效果分析、资源回收、项目结项整个综合成一站式的服务。

其实这块我们是借鉴 DevOps 的思路,就是你的开发和运营应该是一个人就可以独立完成的,有这样一个系统能够去支撑这件事。当一个服务在平台上呈现出来的时候,有可能会复用到计算的数据,比说实时的登录次数或击杀数,那这个指标在后面的服务中就可以共用。

而且有了这样一个平台之后,开发者只需主要关注他的开发逻辑就行了,其余两条运维发布和线上运营都由平台来保证。所以我们希望有一个平台化的方式,把数据计算和接口服务统一起来,通过数据的标准化和数据字典的统一,能够形成上面不同的数据应用,这个是我们的第一个目标。

其实我们现在都是这种方式了,第一是要在 DevOps 的指导思想下去做,尤其是腾讯去做的时候数据服务的量是非常大的,比如我们去年一共做了 5、6 万的营销服务,在这种情况下如果没有平台支撑,没有平台去治理和管理这些服务,单靠人的话成本非常大。

4、思路

3 个现代化,大数据应用的 DevOps。

我们的思路也是这样,三个现代化,而且把大数据应用的 DevOps 思路实现起来。

  • 规范化:流程规范、数据开发规范和开发框架;
  • 自动化:资源分配、发布上线、监控部署(这是 DevOps 里不可缺少的);
  • 一体化:数据开发、数据接口开发、测试发布、运维监控。

所以我们针对大数据的应用系统,会把它拆成这样三块,一个是大数据的开发,另外一个是数据服务接口的开发,当然接口后面就是一些页面和客户端,这些完了后这些开发还要有一个完整的开发流程支持。

这样我们就能够为各种数据应用场景提供一站式的数据开发及应用解决服务、统一的活动管理、数据指标计算开发管理和各种数据应用接口自动化生产管理的一站式的服务。

这样的系统能保障这些的事情,而且我们这里也合理拆分,不要把大数据和接口混到一块去,一定要做解耦,这是一个非常关键的地方。

5、数据服务平台整体架构

■ 1)计算存储 

这个框架大家可以看一下,我认为可以借鉴,如果你内部要去做一个数据服务平台的话,基本上思路也是这样的,底层的 Iass 可以不用管,直接用腾讯云或者阿里云或者其他云上的服务就可以了。

我们主要是做上层这一块的东西,最下面的计算存储这个部分我们内部在做系统的时候也不是 care 的,这块最好是能承包出去。现在 Iass 发展到这个程度,这些东西在云上可以直接像 MySQL 数据库或者 Redis 数据库一样购买就行了,比如 Kafka、Pulsar、Flink、Storm。

存储这块我们内部的有 TRedis、TSpider,其实就是 Redis 和 MySQL 的升级版本。基础这块我建议大家如果自己构建的话,也不需要太过于关注。

■ 2)服务调度

系统核心主要是在中间的服务调度这个部分,它是统一的调度 API,就是上层的一些服务能发下来,然后去统一调度。另外一个就是流程的开发,我们有一个不可缺少的调度系统,这里我们使用的是 DAG 调度引擎,这样我们可以把离线任务、实时任务、实时+离线、离线+函数接口的服务能够组合起来,来完成更复杂实时数据应用场景。

比如我们现在做的实时排行榜,把实时计算任务下发到 Flink 后,同时会给 Flink 下发一个 URL,Flink 拿到 URL 后,它会把符合条件的数据都发送到 URL,这个 URL 其实就是函数服务,这些函数服务把数据,在 Redis 里做排序,最终生成一个排行榜。

再往下的这个调度器,你可以不断地去横向拓展,比如我可以做 Storm 的调度器、Flink 的调度器、Spark 的调度器等等一系列。在这块可以形成自己算法库,这个算法库可以根据场景去做,比如有些是 Flink 的 SQL 的分装,也就是把 SQL 传进来,它就能够计算和封装的 Jar 包。另外比如一些简单的数据出发、规则判断也可以去做,直接把算法库分装到这块就行。

其实这块和业务场景没有直接关系的,但算法库一定是和场景是有关系的,另外下层我们会有写文件通道,比如说一些 Jar 包的分发,这里腾讯用的是 COS,能够去做一些数据的传输和 Jar 包的提交。

还有一个命令管道,它主要针对机器,比如提交 Flink 任务的时候一定是通过命令管道,然后在一台机器去把 Jar 包拉下来,然后同时把任务提交到 Flink 集群里去。数据管道也是类似的一个作用。

■ 3)各种管理

另外还要将一个蛮重要的内容,右边绿色这块的运营监控、集群管理、系统管理(用户权限管理,业务管理,场景管理,菜单配置管理等等),还有消息中心、帮助文档,这些都是配套的,整个系统不可缺少的。

还有一部分是组件管理,包括大数据组件管理、函数管理、服务的二进制管理都可以在这里能够做统一的管理。

数据资产,比如我们通过 Flink 或者 Storm 能够生成的数据指标,它的计算逻辑的管理都在这里面,包括我们计算出来后,把这些指标打上标签或者划后,我们也作为数据资产。

还有一个最重要的是数据表的管理,我们无论是 Flink 或 Storm,它的计算最终的落地点一定是通过一个数据表能算出来的。其他都还好,数据报表,比如每天计算多少数据,成功计算多少,每天有多少任务在跑,新增多少任务,这些都在里面可以做,包括我们版本的发布变更。还有一个是外部管理端,这个根据业务场景去做就行了,等会演示我们管理端的时候大家就可以看到,其实我们的菜单相对来说比较简单,根据比如我们的数据接入,从源头把数据接入到 Kafka 或者 Pulsar 里去。然后数据指标基于接入的数据表,进行数据指标的计算,比如一些特性的 Jar 包,它是多张表的数据混合计算,或者是加上的表的混合计算,等等一系列通过硬场景做的一些分装。

我们最终把这些做完后,所有的大数据都是通过对外的服务 API 暴露出去的,比如最终游戏任务是否完成,用户 ID 过来后我们能看这个用户的任务是否完成,这样的一些应用场景可以直接使用 API 去操作。

这是整个流程,讲得比较细后面大家才会更清晰。

二、实时大数据计算 OneData

1、数据开发流程

这是我们整体的数据应用流程:

我们的 Game Server 先把数据上传到日志 Server(数据接入部分),日志 Server 再把数据转到 Kafka 或者 Pulsar,就是消息队列里。

接进来后是数据表,数据表是描述,基于描述的表去开发指标、数据。比如我们这里一共有三类,一类是 SQL,另外一类是我们已经分装好的框架,你可以自己去填充它的个性代码,然后就可以在线完成 Flink 程序的编写。

还有一种是自己全新的在本地把代码写好,再发到系统里去调测。之前说了在大数据计算和数据接口一定要做解耦,我们解耦的方式是存储,存储我们用 Redis。它这种做法是把 Redis 和 SSD 盘能够结合起来,然后再加上 RockDB,就是 Redis 里面它 hold 热点数据,同时它把这些数据都通过这个 RockDB 落地到 SSD 盘里去,所以它的读写性非常好,就是把整个磁盘作为数据库存储,而不像普通的 Redis 一样再大数据情况下智能把内存作为存储对象。

在大数据把数据计算存储进去后,后面的就简单了,我们提供查询的服务有两种,一种是计算的指标,点一下就可以生成接口,我们叫规则接口;然后我们另外一种,也提供特性化的存储到介质里,我可以自己去定义他的 SQL 或者查询方式,然后在数据进行加工处理,生成接口 。

还有一种方式,是我们在 Flink 和 Storm 直接把数据配置我们这边的一个函数接口,比如我刚才讲的排行榜的方式,就给一个接口,他直接在 Flink 这边处理完成之后,把数据吐到函数接口里面,函数接口对这个数据进行二次处理。

这个是整个处理方式,所以我们前面讲的就是,基于 Flink 和 Storm 构建一个全面的、托管的、可配置化的大数据处理服务。主要消费的是 Kafka 的数据,Pulsar 现在在少量的使用。

这样做就是我们把数据的开发门槛降低,不需要很多人懂 Flink 或者 Storm,他只要会 SQL 或者一些简单的逻辑函数编写,那就可以去完成大数据的开发。

2、数据计算统一

其实我们之前在做的时候,有一些优化的过程,原来每一个计算任务都是用 Jar 包去写,写完之后就是编辑、打包、开发、发布。后来我们划分了三种场景,一种是 SQL 化,就是一些我们能用 SQL 表示的我们就尽量分装成 SQL,然后有一个 Jar 包能去执行这个提交的 SQL 就可以了。

还有一种是在线的 WebIDE,是处理函数的逻辑,举例子 Storm 里可以把 blot 和 spout 暴露出来,你把这两函数写完后,再把并行度提交就可以运行。但这里我们具体实现的时候是基于 Flink 去做的。

另一个是场景化的配置,我们个性化的 Jar 包能够统一调度,根据调度逻辑去执行。

3、数据计算服务体系

这是我们整个 OneData 计算体系的过程,支持三种,一种的自研的 SQL,一种是 Flink SQL,还有是 Jar 包。

我们自研的 SQL 是怎么存储,最早是使用 Storm,但 StormSQL 的效率非常低,所以我们根据 SQL Parser 做的 SQL 的分装,我们对 SQL 自己进行解析,自己形成函数,在 SQL 提交之后,我们用这样的方式直接把它编译成 Java 的字节码,再把字节码扔到 Storm 里去计算。

Flink 这块我们也继承了这种方式,后面会讲一下两种方式有什么区别。其实我们自研 SQL 在灵活性上比 Flink SQL 要好一点。

这里是做平台化,不能说直接放一个 FlinkSQL 去跑,因为我们想要在里面统计整个业务逻辑的执行情况,比如 SQL 处理的数据量,正确的和错误的,包括一些衰减,都是要做统计。

这是基本的过程,完了后我们在上面形成的一些基本场景,比如实时统计的场景,PV,UV,用独立的 Jar 包去算就行了,配置一下表就可以去计算。另外实时指标的服务,比如杀人书,金币的积累数,游戏的场次,王者荣耀里下路走的次数,这种数据都可以作为实时指标。

还有一种是规则触发服务,表里的某个数据满足什么条件时,触发一个接口。还有通讯实时排行榜和一些定制化的服务。

■ 1)自研 SQL

接下来说我们自研 SQL 的过程,我们早期为了避免像 Hive 一样(函数栈调用),而我们自己通过 SQL Paser 的语法抽象后,把它生成一段函数,就不需要这么多的对账调用。

这个是函数生成过程,最终生成的就是这样一段代码,它去做计算逻辑,一个函数完成,不需要函数栈的调用,这样效率就会大大提升。我们原来单核跑八万,放在现在可以跑二十多万。

整个处理的时候,我们把 SQL 编译成字节码,Flink 消费了数据后,把数据转化成 SQL 能够执行的函数,就是 roll 的方式。然后把 Roll 整个数据传到 class 里去执行,最后输出。

这种场景适合于,比如 FlinkSQL 它有状态值,我们要统计某个最大值的话,要一直把用户的最大值 hold 到内存里去。而我们自研的 SQL 呢,自己写的函数,它把数据借助第三方存储,比如刚才说的 TRedis 存储。每次只需要读取和写入数据即可,不需要做过多的内存的 hold。

当前做到状态的实时落地,就算挂掉也能立马起来接着去执行,所以超过 10G、100G 的数据计算,都不成问题,但是 FlinkSQL 如果要算的话,它的状态值就一直要 hould 到内存里去了,而且挂掉后只能用它的 check point 去恢复。

所以这是这两种 SQL 的应用场景。

■ 2)SQL 化

另外 SQL 里我们还可以做些其他的事情。我们的数据是持久化保存在存储里的,那存储里如果是同一张表,同一个纬度,比如我们都是用 QQ,在这个纬度上我们配置了两个指标,那能不能一次算完?只消费一次把数据算完,然后存储一次。

其实这种在大数据计算里是很多的,目前在我们在做的平台化就可以,比如一个是计算登录次数,另一个是计算最高等级,这两个计算逻辑不一样,但是消费的数据表是一样的,然后聚合纬度也是一样的,聚合关键字也是一样。那这个数据就可以进行一次消费,同时把数据计算出来同时去落地,大大减少了存储和计算成本。

我们现在整个游戏里面有一万一千多个指标,就是计算出来的,存储的纬度有两千六百多,实际节省计算和存储约有 60%以上。

两个 SQL,甚至更多的 SQL,我们一张表算十几个指标很正常,原来要消费十几次现在只需要一次就可以算出来。而且这种情况对用户是无感知的。A 用户在这张表上配了指标是 A 纬度,B 用户在这张表上配了指标也是 A 纬度,那这两个用户的数据,我们在底层计算的时候就消费一次计算两次存储一次,最终拿到的数据也是一样的。

**■ 3)在线实时编程

**

  • 无需搭建本地开发环境;
  • 在线开发测试;
  • 严格的输入输出管理;
  • 标准化输入和输出;
  • 一站式开发测试发布监控。

再介绍下刚才提到的在线实时编程,其实很多时候对开发者来说,搭建一个本地的 Flink 集群做开发调测也是非常麻烦的,所以我们现在就是提供一种测试环境,上层的代码都是固定的,不能修改。比如数据已经消费过来了,进行数据的加工处理,最终往存储里去塞就可以了。

通过这种方式,我们可以对简单逻辑进行分装,需要函数代码,但比 SQL 复杂,比自动的 Jar 包开发要简单一些,可以在线写代码,写完代码直接提交和测试就能完成结果的输出。而且这种的好处是,数据的上报逻辑,数据的统计逻辑,我都在这里面分装好了。只要管业务逻辑的开发就好了。

4、Flink 特性应用

  • 时间特性:基于事件时间水印的监控,减少计算量,提高准确性;
  • 异步化 IO:提高吞吐量,确保顺序性和一致性。

我们最早在 Storm 里做的时候,数据产生的时间和数据进到消息队列的时间,都是通过这种消息里自带的时间戳,每一个消息都是要对比的。有了 Flink 之后,有了 watermark 这个机制之后,这一部分的计算就可以减少了。

实际测试下来的效果也是比较理想的,我们原来在 Storm 里单核计算,大概是以前的 QPS,加上读写和处理性能,单核五个线程的情况下。但是 Flink 的时候我们可以到一万,还加上 Redis 存储 IO 的开销。

另一个我们原来数据想要从 Redis 里取出来,再去算最大值最小值,完了算了再写到 Redis 里,这个都是同步去写的,但是同步 IO 有一个问题就是性能不高。所以我们现在在把它改成异步 IO,但是异步 IO 也有个特点就是整个一条数据的处理必须是同步的,必须先从 Redis 里把数据取出来,然后再把值计算完,再塞到里面去,保证塞完后再处理下一个统一的数据。

我们再做这样的一些优化。Flink 这里有些特性可以保证我们数据的一致性,而且提升效率。

5、统一大数据开发服务—服务案例

接着介绍下更多的案例,如果大家玩英雄联盟的话,那这个任务系统就是我们设计的,下次玩做这个任务的时候,你就可以想起我。还有天龙八部、CF、王者荣耀 LBS 荣耀战区(通过大数据实时计算+LBS 的数据排行)、王者荣耀的日常活动(实时数据+接口+规则计算)、有哪些好友是实时在线的,跟你匹配的。

三、数据接口服务 OneFun

1、数据应用的出口

下面介绍下函数,我们原来在做的时候也是存在着一些问题,把数据存到存储里面,如果存储直接开放出去,让别人任意去使用的话,其实对存储的压力和管理程度都是很有问题的。所以后来我们采用了一种类似于 Fass 的的解决方式。我们把存储在里面的元数据进行管理,完了之后接口再配置化的方式,你要使用我这个 DB,这个 DB 最大 QPS 多少,我就进行对比,允许你之后才能使用这个量。

比如我这个 DB 的最大 QPS 只有 10 万,你要申请 11 万,那我就给你申请不了,我就只能通知 DB 把这个 Redis 进行扩容,扩容后才给你提供使用。所以这里面牵扯到我们的指标数据的元数据管理和接口之间的打通。

2、一体化函数执行引擎—OneFun

这个和刚才 OneData 的方式是一样的,比如这块提供了快速的函数,还有一些在线函数编程的方式的接口,你可以在上面写一点 JavaScript 或者 Golang 代码,然后就生成接口,接口里面可以直接对外提供服务,把他形成产品化的包装,在上层根据接口衍生出更多其他的一些应用系统。

3、基于 ssa 的在线 Golang 函数执行引擎

这里重点介绍下 Golang,其实我们是基于 Golang 语言本身 ssa 的特点去做的,我们有一个执行器,这个执行器已经写好的,它的作用就是可以把你写的 Golang 代码提交过来,加载到它的执行器里。

并且我们可以把我们写的代码作为函数库,积累下来然后放进去,它可以在执行的时候去调用这些函数库,而这里面写的代码语法和 Golang 是完全一样的。

同时我们在这里面执行的时候,指定了一个协程,每一个协程我们规定它的作用域,就是以沙箱机制的方式来去执行,最先实现的就是外部 context 去实现的,我们就可以实现 Web 化的 Golang 开发,这种有点像 Lua 那种脚本语言一样,你在线写完语言直接提交执行。

4、基于 V8 引擎的在线函数服务引擎

这是我们的 Javascript 的执行引擎,我们主要是做了 V8 引擎的池子,所有 Javascript 写完之后,丢到 V8 引擎上去执行,这应该大家都能够理解,如果大家玩过 JS 的可以理解这种方式,就是 V8 引擎里直接去执行。

5、一体化函数执行引擎--函数即服务

这是我们的在线函数编写过程:

右下角是我们的函数代码编写区,写完后左边的黑框是点击测试,输出可以在这里写,点击测试就会把结果输出出来,通过这种方式,我们极大地扩张了我们数据平台的开发能力。原来是本地要把 Golang 代码写完,然后调试完再发到线上环境去测试,而现在我们可以很大的规范化,比如说数据源的引入,我们就直接可以在这里去规定了,你只能引入申请过的数据源,你不能随便乱引入数据源,包括你数据源引入的时候,QPS 放大我都可以通过这种方式知道。

  • 降低启动成本;
  • 更快的部署流水线;
  • 更快的开发速度;
  • 系统安全性更高;
  • 适应微服务架构;
  • 自动扩展能力。

这个是我们一站式,把函数开发完后,直接提交,我们用 Prometheus + Grafana 可以里面看到实时报表。

6、案例介绍

这是一个典型的应用,Flink 里面去计算的时候,他对这个数据进行过滤,完了之后进行一个远程的 call,这个远程调用执行函数代码,大多数这种情况就是一个开发就可以完成大数据的开发和这个函数接口的开发,就可以完成这样一个活动的开发,整个活动开发的门槛就低了很多,真正实现了我们 DevOps,就是开发能够把整个流程自己走完。

四、微服务化 & ServiceMesh

1、数据应用必走之路—微服务化

上面讲的是 OneData 和 OneFun 的实现原理和机制,我们在内部是怎么去应用的,这套系统我们在游戏内部是大力推广。

这里尤其是接口这块,其实如果说要微服务化的话,大数据我们能做的也就是那样了,能够用 yarn 或者 K8S 去做资源的管控,和任务的管控,但真正去做服务治理还是在接口这块。目前我们上下接口大概是三千五百个,每周新增 50 个接口。

所以我们在做的时候也考虑到。原来我们服务是一个个开发,但是没有治理,现在我们加上服务还是一个个去开发,甚至有些服务我们会把它变成一个服务,但是我们加入了这个服务的治理。

好多人在提微服务,微服务如果没有一个平台去治理的话,将会是一种灾难。所以微服务化给我们带来便利的同时,也会给我们带来一些问题,所以在我们的场景里面,微服务是非常好的,每一个接口就可以作为一个服务,这种是天然的微服务。

2、一体化服务治理设计

但是这种微服务的治理将会是我们很大的一个问题,所以我们花了很大的精力去做了一个微服务的治理系统,从项目注册的时候,他就把项目注册的微服务中心,并且把 API 注册上来,然后在服务发布的时候,发到集群里的时候,这些服务都要主动的注册到我们的名册服务,就是 Consoul。

但注册到服务里不是作为服务路由来用的,而是到服务里后,我们在普罗米修斯这块去做它的健康检查和状态采集,只要注册上来,我立马就能感知和把状态采集过来,然后主要做实时报表和告警。

首先在服务的稳定性和健康度这块我们有一个保障,另外一个就是服务的信息注册到 Consul 里去后,我们有一个服务的网关,我们用的是 envoy,其实内部我们还把它作为 SideCar 使用,后面会介绍。

注册完了之后,envoy 会把这个所有的负载进信息加载到这块来,它去做它服务的路由,同时我们会把整个日志上报到日志中心去,包括网关的日志也会上传到日志中心去,日志中心再去做离线的报表和实时的一些报警监控。

所以这里面我们还加了一个基于 Consul 的一个配置,就是我们包括 server 的实时控制都可以通过 Consul 去配置,配置完了后立马能够 watch 到,然后去执行。

这个是基本的服务治理,但现在我们的服务治理升级了,比这个要更好一点,基本的原理是这样。

3、南北流量+东西流量的统一管控

并且我们在这里面实现了一个对 envoy 的管控,我们说是服务治理,主要是对流量的一些治理,比如贫富负载策略,路由策略,熔断,超时控制,故障注入等等一系列。

我们是通过 Consul 的配置管理,通过它能够下发到我们的 Agent,这个 Agent 再把这个数据能够通过 Istio 的接口和 K8s 的 API 能够下发到 envoy,这里面就是 API GeteWay 和 SideCar 都是 envoy,所以我们通过 Istio 对他的 XDS 的接口写入,就可以把所有的配置信息下发到这里。

这套系统能够整个去管控整个集群,南北流量和东西流量的统一管控。这套系统我们未来准备开源,现在主要是内部在使用,而且这里面我们也做了图形化的配置,所有 envoy 和 Istio 的配置我们都经过 YAML 转 Istio 再转 UI 的方式,把它图形化了,在这块能够做统一的管控。

而且我们把 Agent 开发完了之后就是多集群的支持,就是多个 K8s 集群只要加入进来,没问题可以去支持,我们管理 API GeteWay。

还有一块是 SideCar 的管理,就是 ServiceMash 里的 SideCar 管理。我们刚才说的函数接口也好,规则接口也好,是一个 server。

当然这里面还提到一个 chaos mesh 的功能,我们现在还在研究,我们准备在这套系统里把它实现了。

4、基于 ServiceMesh 的全链路流量分析

这是一个我们通过 ServiceMesh 做的分析,我们虽然可以宏观地看出来我们接口对 DB 的压力有多大,但实际上我们把流量导进来是我们对压力的监控是不够的,所以这种情况我们通过 ServiceMesh,他对出口流量和进口流量的管控,然后可以把流量进行详细的统计,统计完后可以生成一个快照,这次快照和下次快照之间的数据对比,入流量有多少的时候,对下面各个流量压力有多少。

这是整个展示图,我们有多个测试用例,这两个测试用例之间我们可以算出来对下游的压力的流量分析,后期对下游压力的分析和下游资源的扩容、缩容都有非常大的价值。

5、案例介绍

最后再介绍下我们目前用这套系统实现的一些案例,大数据的游戏回归,比如做一个游戏数据的回顾 (生涯回顾)、任务系统、排行榜。

Q & A

Q1:ServiceMesh 是怎么部署的?主要用来解决什么问题?

目前我们在使用的 ServiceMesh 技术实现是 Istio,版本是 1.3.6。这个版本还不支持物理机方式部署,所以我们是在 K8s 中部署使用,部署方式有 2 种,可以是直接使用 istioctl 命令安装,或者是生成 Yaml 文件后使用 kubectl 进行安装。

Servicemesh 的架构主要解决的问题是集群内东西流量的治理问题。同时 Servicemesh 的 Sidercar 作为协议代理服务和可以屏蔽后面的服务开发技术栈, Sidercar 后面的服务可以是各种语言开发,但是流量的管理和路由可以有统一的管控。

Q2:微服务治理架构能介绍一下吗?

微服务治理架构在我看来可以分为两类:

  • 服务实例的治理,这个在目前的 K8s 架构下,基本都是由 K8s 来管理了,包含了服务实例的发布,升级,阔所容,服务注册发现等等;
  • 服务流量的治理,这一个大家通常说的服务治理,目前主要是由微服务网关和服务网格两种技术来实现。服务网关实现集群内和集群外的流量治理,服务网格实现了集群内的流量治理。

Q3:开发人员主要具有什么样的技术背景?

针对大数据开发人员,要使用我们这套系统只需要会 SQL 语句和基本统计知识就可以了。

针对应用接口开发人员,要使用我们这套系统只需要会 JavaScript 或者 Golang,会基本的正则表达式,了解 HTTP 协议,会调试 HTTP 的 API 接口就可以了。

Q4:实时计算,Flink 与 Spark 选择上有没啥建议?

Spark 在 15,16 年的时候我们也在大规模使用,也在实时计算中使用过,但是当时的版本在实时计算上还是比较弱,在 500ms 的批处理中还是会出现数据堆积,所以在实时性上会有一定的问题,Spark 擅长在数据迭代计算和算法计算中。但是如果实时性要求不高而且有算法要求的场景中 Spark 还是不错的选择。

Flink 在设计之初就是一种流失处理模型,所以在针对实时性要求较高的场景中 Flink 还是比较合适的,在我们内部测试发现 Flink 的流失计算吞吐确实要比 Storm 好很多,比 Spark 也是好很多,而且 Flink 目前的窗口机制针对实时计算中的窗口计算非常好用。所以一般实时计算或者对实时性要求较高的场景 Flink 还是比较推荐的。

Q5:游戏回放数据服务端存储场景有么?

这种场景也是有的,游戏回放一般有 2 种方式,一种是录屏传输回放,这种成本非常高,但是简单且及时性比较好,另外一种是控制指令发回 Server,在另外的服务中去恢复当时的场景,这种方式在成本相对较小,但是使用复杂。

Q6:回放场景下客户端走什么协议将数据发送到服务端?

一般是游戏的私有协议。

查看原文

Yourtion 收藏了文章 · 8月6日

《RabbitMQ》如何保证消息的可靠性

一条消费成功被消费经历了生产者->MQ->消费者,因此在这三个步骤中都有可能造成消息丢失。

一 消息生产者没有把消息成功发送到MQ

1.1 事务机制

AMQP协议提供了事务机制,在投递消息时开启事务支持,如果消息投递失败,则回滚事务。

自定义事务管理器

@Configuration
public class RabbitTranscation {
    
    @Bean
    public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory){
        return new RabbitTransactionManager(connectionFactory);
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        return new RabbitTemplate(connectionFactory);
    }
}

修改yml

spring:
  rabbitmq:
    # 消息在未被队列收到的情况下返回
    publisher-returns: true

开启事务支持

rabbitTemplate.setChannelTransacted(true);

消息未接收时调用ReturnCallback

rabbitTemplate.setMandatory(true);

生产者投递消息

@Service
public class ProviderTranscation implements RabbitTemplate.ReturnCallback {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        // 设置channel开启事务
        rabbitTemplate.setChannelTransacted(true);
        rabbitTemplate.setReturnCallback(this);
    }
    
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("这条消息发送失败了"+message+",请处理");
    }
    
    @Transactional(rollbackFor = Exception.class,transactionManager = "rabbitTransactionManager")
    public void publishMessage(String message) throws Exception {
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.convertAndSend("javatrip",message);
    }
}

但是,很少有人这么干,因为这是同步操作,一条消息发送之后会使发送端阻塞,以等待RabbitMQ-Server的回应,之后才能继续发送下一条消息,生产者生产消息的吞吐量和性能都会大大降低。

1.2 发送方确认机制

发送消息时将信道设置为confirm模式,消息进入该信道后,都会被指派给一个唯一ID,一旦消息被投递到所匹配的队列后,RabbitMQ就会发送给生产者一个确认。

开启消息确认机制

spring:
  rabbitmq:
    # 消息在未被队列收到的情况下返回
    publisher-returns: true
    # 开启消息确认机制
    publisher-confirm-type: correlated

消息未接收时调用ReturnCallback

rabbitTemplate.setMandatory(true);

生产者投递消息

@Service
public class ConfirmProvider implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        rabbitTemplate.setReturnCallback(this);
        rabbitTemplate.setConfirmCallback(this);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if(ack){
            System.out.println("确认了这条消息:"+correlationData);
        }else{
            System.out.println("确认失败了:"+correlationData+";出现异常:"+cause);
        }
    }

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("这条消息发送失败了"+message+",请处理");
    }

    public void publisMessage(String message){
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.convertAndSend("javatrip",message);
    }
}

如果消息确认失败后,我们可以进行消息补偿,也就是消息的重试机制。当未收到确认信息时进行消息的重新投递。设置如下配置即可完成。

spring:
  rabbitmq:
    # 支持消息发送失败后重返队列
    publisher-returns: true
    # 开启消息确认机制
    publisher-confirm-type: correlated
    listener:
      simple:
        retry:
          # 开启重试
          enabled: true
          # 最大重试次数
          max-attempts: 5
          # 重试时间间隔
          initial-interval: 3000

二 消息发送到MQ后,MQ宕机导致内存中的消息丢失

消息在MQ中有可能发生丢失,这时候我们就需要将队列和消息都进行持久化。

@Queue注解为我们提供了队列相关的一些属性,具体如下:

  1. name: 队列的名称;
  2. durable: 是否持久化;
  3. exclusive: 是否独享、排外的;
  4. autoDelete: 是否自动删除;
  5. arguments:队列的其他属性参数,有如下可选项,可参看图2的arguments:

    • x-message-ttl:消息的过期时间,单位:毫秒;
    • x-expires:队列过期时间,队列在多长时间未被访问将被删除,单位:毫秒;
    • x-max-length:队列最大长度,超过该最大值,则将从队列头部开始删除消息;
    • x-max-length-bytes:队列消息内容占用最大空间,受限于内存大小,超过该阈值则从队列头部开始删除消息;
    • x-overflow:设置队列溢出行为。这决定了当达到队列的最大长度时消息会发生什么。有效值是drop-head、reject-publish或reject-publish-dlx。仲裁队列类型仅支持drop-head;
    • x-dead-letter-exchange:死信交换器名称,过期或被删除(因队列长度超长或因空间超出阈值)的消息可指定发送到该交换器中;
    • x-dead-letter-routing-key:死信消息路由键,在消息发送到死信交换器时会使用该路由键,如果不设置,则使用消息的原来的路由键值
    • x-single-active-consumer:表示队列是否是单一活动消费者,true时,注册的消费组内只有一个消费者消费消息,其他被忽略,false时消息循环分发给所有消费者(默认false)
    • x-max-priority:队列要支持的最大优先级数;如果未设置,队列将不支持消息优先级;
    • x-queue-mode(Lazy mode):将队列设置为延迟模式,在磁盘上保留尽可能多的消息,以减少RAM的使用;如果未设置,队列将保留内存缓存以尽可能快地传递消息;
    • x-queue-master-locator:在集群模式下设置镜像队列的主节点信息。

持久化队列

创建队列的时候将持久化属性durable设置为true,同时要将autoDelete设置为false

@Queue(value = "javatrip",durable = "false",autoDelete = "false")

持久化消息

发送消息的时候将消息的deliveryMode设置为2,在Spring Boot中消息默认就是持久化的。

三 消费者消费消息的时候,未消费完毕就出现了异常

消费者刚消费了消息,还没有处理业务,结果发生异常。这时候就需要关闭自动确认,改为手动确认消息。

修改yml为手动签收模式

spring:
  rabbitmq:
    listener:
      simple:
        # 手动签收模式
        acknowledge-mode: manual
        # 每次签收一条消息
        prefetch: 1

消费者手动签收

@Component
@RabbitListener(queuesToDeclare = @Queue(value = "javatrip", durable = "true"))
public class Consumer {

    @RabbitHandler
    public void receive(String message, @Headers Map<String,Object> headers, Channel channel) throws Exception{

        System.out.println(message);
        // 唯一的消息ID
        Long deliverTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        // 确认该条消息
        if(...){
            channel.basicAck(deliverTag,false);
        }else{
            // 消费失败,消息重返队列
            channel.basicNack(deliverTag,false,true);
        }
      
    }
}

四 总结

消息丢失的原因?

生产者、MQ、消费者都有可能造成消息丢失

如何保证消息的可靠性?

  • 发送方采取发送者确认模式
  • MQ进行队列及消息的持久化
  • 消费者消费成功后手动确认消息
查看原文

Yourtion 收藏了文章 · 8月6日

秒杀系统设计

背景

我之前写过一个秒杀系统的文章不过有些许瑕疵,所以我准备在之前的基础上进行二次创作,不过让我决心二创秒杀系统的原因是我最近面试了很多读者,动不动就是秒杀系统把我整蒙蔽了,我懵的主要是秒杀系统的细节大家都不知道,甚至不知道电商公司一个秒杀系统的组成部分。

我之前在某电商公司就是做电商活动的,所以这样的场景和很多解决方案我是比较清楚的,那我就从我自身去带着大家看看一个秒杀的设计细节以及中间各种解决方案的利弊,以下就是我设计的秒杀系统,几乎涵盖了市面上所有秒杀的实现细节:

正文

首先设计一个系统之前,我们需要先确认我们的业务场景是怎么样子的,我就带着大家一起假设一个场景好吧。

我们现场要卖1000件下面这个婴儿纸尿裤,然后我们根据以往这样秒杀活动的数据经验来看,目测来抢这100件纸尿裤的人足足有10万人。(南极人打钱!)

你一听,完了呀,这我们的服务器哪里顶得住啊!说真的直接打DB肯定挂,但是别急嘛,有暖男敖丙在,任何系统我们开始设计之前我们都应该去思考会出现哪些问题?这里我罗列了几个非常经典的问题:

问题

高并发:

是的高并发这个是我们想都不用想的一个点,一瞬间这么多人进来这不是高并发什么时候是呢?

是吧,秒杀的特点就是这样时间极短瞬间用户量大

正常的店铺营销都是用极低的价格配合上短信、APP的精准推送,吸引特别多的用户来参与这场秒杀,爽了商家苦了开发呀

秒杀大家都知道如果真的营销到位,价格诱人,几十万的流量我觉得完全不是问题,那单机的Redis我感觉3-4W的QPS还是能顶得住的,但是再高了就没办法了,那这个数据随便搞个热销商品的秒杀可能都不止了。

大量的请求进来,我们需要考虑的点就很多了,缓存雪崩缓存击穿缓存穿透这些我之前提到的点都是有可能发生的,出现问题打挂DB那就很难受了,活动失败用户体验差,活动人气没了,最后背锅的还是开发

超卖:

但凡是个秒杀,都怕超卖,我这里举例的只是尿不湿,要是换成100个MacBook Pro,商家的预算经费卖100个可以赚点还可以造势,结果你写错程序多卖出去200个,你不发货用户投诉你,平台封你店,你发货就血亏,你怎么办? (没事看了敖丙的文章直接不怕)

那最后只能杀个开发祭天解气了,秒杀的价格本来就低了,基本上都是不怎么赚钱的,超卖了就恐怖了呀,所以超卖也是很关键的一个点。

恶意请求:

你这么低的价格,假如我抢到了,我转手卖掉我不是血赚?就算我不卖我也不亏啊,那用户知道,你知道,别的别有用心的人(黑客、黄牛...)肯定也知道的。

那简单啊,我知道你什么时候抢,我搞个几十台机器搞点脚本,我也模拟出来十几万个人左右的请求,那我是不是意味着我基本上有80%的成功率了。

真实情况可能远远不止,因为机器请求的速度比人的手速往往快太多了,在贵州的敖丙我每年回家抢高铁票都是秒光的,我也不知道有没有黄牛的功劳,我要Diss你,黄牛。杰伦演唱会门票抢不到,我也Diss你。

Tip:科普下,小道消息了解到的,黄牛的抢票系统,比国内很多小公司的系统还吊很多,架构设计都是顶级的,我用顶配的服务加上顶配的架构设计,你还想看演唱会?还想回家?

不过不用黄牛我回家都难,我们云贵川跟我一样要回家过年的仔太多了555!

链接暴露:

前面几个问题大家可能都很好理解,一看到这个有的小伙伴可能会比较疑惑,啥是链接暴露呀?

相信是个开发同学都对这个画面一点都不陌生吧,懂点行的仔都可以打开谷歌的开发者模式,然后看看你的网页代码,有的就有URL,但是我写VUE的时候是事件触发然后去调用文件里面的接口看源码看不到,但是我可以点击一下查看你的请求地址啊,不过你好像可以对按钮在秒杀前置灰。

不管怎么样子都有危险,撇开外面的所有的东西你都挡住了,你卖这个东西实在便宜得过分,有诱惑力,你能保证开发不动心?开发知道地址,在秒杀的时候自己提前请求。。。(开发:怎么TM又是我)

数据库:

每秒上万甚至十几万的QPS(每秒请求数)直接打到数据库,基本上都要把库打挂掉,而且你服务不单单是做秒杀的还涉及其他的业务,你没做降级、限流、熔断啥的,别的一起挂,小公司的话可能全站崩溃404

反正不管你秒杀怎么挂,你别把别的搞挂了对吧,搞挂了就不是杀一个程序员能搞定的。

程序员:我TM好难啊!

问题都列出来了,那怎么设计,怎么解决这些问题就是接下去要考虑的了,我们对症下药。

我会从我设计的秒杀系统从上到下去给大家介绍我们正常电商秒杀系统在每一层做了些什么,每一层存在的问题,难点等。

我们从前端开始:

前端

秒杀系统普遍都是商城网页、H5、APP、小程序这几项。

在前端这一层其实我们可以做的事情有很多,如果用node去做,甚至能直接处理掉整个秒杀,但是node其实应该属于后端,所以我不讨论node Service了。

资源静态化:

秒杀一般都是特定的商品还有页面模板,现在一般都是前后端分离的,页面一般都是不会经过后端的,但是前端也要自己的服务器啊,那就把能提前放入cdn服务器的东西都放进去,反正把所有能提升效率的步骤都做一下,减少真正秒杀时候服务器的压力。

秒杀链接加盐:

我们上面说了链接要是提前暴露出去可能有人直接访问url就提前秒杀了,那又有小伙伴要说了我做个时间的校验就好了呀,那我告诉你,知道链接的地址比起页面人工点击的还是有很大优势

我知道url了,那我通过程序不断获取最新的北京时间,可以达到毫秒级别的,我就在00毫秒的时候请求,我敢说绝对比你人工点的成功率大太多了,而且我可以一毫秒发送N次请求,搞不好你卖100个产品我全拿了。

那这种情况怎么避免?

简单,把URL动态化,就连写代码的人都不知道,你就通过MD5之类的摘要算法加密随机的字符串去做url,然后通过前端代码获取url后台校验才能通过。

这个只能防止一部分没耐心继续破解下去的黑客,有耐心的人研究出来还是能破解,在电商场景存在很多这样的羊毛党,那怎么做呢?

后面我会说。

限流:

限流这里我觉得应该分为前端限流后端限流

物理控制:

大家有没有发现没到秒杀前,一般按钮都是置灰的,只有时间到了,才能点击。

这是因为怕大家在时间快到的最后几秒秒疯狂请求服务器,然后还没到秒杀的时候基本上服务器就挂了。

这个时候就需要前端的配合,定时去请求你的后端服务器,获取最新的北京时间,到时间点再给按钮可用状态。

按钮可以点击之后也得给他置灰几秒,不然他一样在开始之后一直点的。

你敢说你们秒杀的时候不是这样的?

前端限流:这个很简单,一般秒杀不会让你一直点的,一般都是点击一下或者两下然后几秒之后才可以继续点击,这也是保护服务器的一种手段。

后端限流:秒杀的时候肯定是涉及到后续的订单生成和支付等操作,但是都只是成功的幸运儿才会走到那一步,那一旦100个产品卖光了,return了一个false,前端直接秒杀结束,然后你后端也关闭后续无效请求的介入了。

Tip:真正的限流还会有限流组件的加入例如:阿里的Sentinel、Hystrix等。我这里就不展开了,就说一下物理的限流。

我们卖1000件商品,请求有10W,我们不需要把十万都放进来,你可以放1W请求进来,然后再进行操作,因为秒杀对于用户本身就是黑盒的,所以你怎么做的他们是没感知的,至于为啥放1W进来,而不是刚好1000,是因为会丢掉一些薅羊毛的用户,至于怎么判断,后面的风控阶段我会说。

Nginx:

Nginx大家想必都不陌生了吧,这玩意是高性能的web服务器,并发也随便顶几万不是梦,但是我们的Tomcat只能顶几百的并发呀,那简单呀负载均衡嘛,一台服务几百,那就多搞点,在秒杀的时候多租点流量机

Tip:据我所知国内某大厂就是在去年春节活动期间租光了亚洲所有的服务器,小公司也很喜欢在双十一期间买流量机来顶住压力。

这样一对比是不是觉得你的集群能顶很多了。

恶意请求拦截也需要用到它,一般单个用户请求次数太夸张,不像人为的请求在网关那一层就得拦截掉了,不然请求多了他抢不抢得到是一回事,服务器压力上去了,可能占用网络带宽或者把服务器打崩、缓存击穿等等。

风控

我可以明确的告诉大家,前面的所有措施还是拦不住很多羊毛党,因为他们是专业的团队,他们可以注册很多账号来薅你的羊毛,而且不用机器请求,就用群控,操作几乎跟真实用户一模一样。

那怎么办,是不是无解了?

这个时候就需要风控同学的介入了,在请求到达后端之前,风控可以根据账号行为分析出这个账号机器人的概率大不大,我现在负责公司的某些特殊系统,每个用户的行为都是会送到我们大数据团队进行分析处理,给你打上对应标签的。

那黑客其实也有办法:养号

他们去黑市买真实用户有过很多记录的账号,买到了还不闲着,帮他们去购物啥的,让系统无法识别他们是黑号还是真实用户的号。

怎么办?

通杀!是的没有办法,只能通杀了,通杀的意思就是,我们通过风管分析出来这个用户是真实用户的概率没有其他用户概率大,那就认为他是机器了,丢弃他的请求。

之前的限流我们放进来10000个请求,但是我们真正的库存只有1000个,那我们就算出最有可能是真实用户的1000人进行秒杀,丢弃其他请求,因为秒杀本来就是黑盒操作的,用户层面是无感知的,这样设计能让真实的用户买到东西,还可以减少自己被薅羊毛的概率。

风控可以说是流量进入的最后一道门槛了,所以很多公司的风控是很强的,蚂蚁金服的风控大家如果了解过就知道了,你的资金在支付宝被盗了,他们是能做到全款补偿是有原因的。

后端

服务单一职责:

设计个能抗住高并发的系统,我觉得还是得单一职责

什么意思呢,大家都知道现在设计都是微服务的设计思想,然后再用分布式的部署方式

也就是我们下单是有个订单服务,用户登录管理等有个用户服务等等,那为啥我们不给秒杀也开个服务,我们把秒杀的代码业务逻辑放一起。

单一职责的好处就是就算秒杀没抗住,秒杀库崩了,服务挂了,也不会影响到其他的服务。(高可用)

Redis集群:

之前不是说单机的Redis顶不住嘛,那简单多找几个兄弟啊,秒杀本来就是读多写少,那你们是不是瞬间想起来我之前跟你们提到过的,Redis集群主从同步读写分离,我们还搞点哨兵,开启持久化直接无敌高可用!

库存预热:

秒杀的本质,就是对库存的抢夺,每个秒杀的用户来你都去数据库查询库存校验库存,然后扣减库存,撇开性能因数,你不觉得这样好繁琐,对业务开发人员都不友好,而且数据库顶不住啊。

开发:你tm总算为我着想一次了。

那怎么办?

我们都知道数据库顶不住但是他的兄弟非关系型的数据库Redis能顶啊!

那不简单了,我们要开始秒杀前你通过定时任务或者运维同学提前把商品的库存加载到Redis中去,让整个流程都在Redis里面去做,然后等秒杀介绍了,再异步的去修改库存就好了。

但是用了Redis就有一个问题了,我们上面说了我们采用主从,就是我们会去读取库存然后再判断然后有库存才去减库存,正常情况没问题,但是高并发的情况问题就很大了。

**多品几遍!!!**就比如现在库存只剩下1个了,我们高并发嘛,4个服务器一起查询了发现都是还有1个,那大家都觉得是自己抢到了,就都去扣库存,那结果就变成了-3,是的只有一个是真的抢到了,别的都是超卖的。咋办?

事务:

Redis本身是支持事务的,而且他有很多原子命令的,大家也可以用LUA,还可以用他的管道,乐观锁他也知支持。

限流&降级&熔断&隔离:

这个为啥要做呢,不怕一万就怕万一,万一你真的顶不住了,限流,顶不住就挡一部分出去但是不能说不行,降级,降级了还是被打挂了,熔断,至少不要影响别的系统,隔离,你本身就独立的,但是你会调用其他的系统嘛,你快不行了你别拖累兄弟们啊。

消息队列(削峰填谷):

一说到这个名词,很多小伙伴就知道了,对的MQ,你买东西少了你直接100个请求改库我觉得没问题,但是万一秒杀一万个,10万个呢?服务器挂了,程序员又要背锅的

秒杀就是这种瞬间流量很高,但是平时又没有流量的场景,那消息队列完全契合这样的场景了呀,削峰填谷。

Tip:可能小伙伴说我们业务达不到这个量级,没必要。但是我想说我们写代码,就不应该写出有逻辑漏洞的代码,至少以后公司体量上去了,别人一看居然不用改代码,一看代码作者是敖丙?有点东西!

你可以把它放消息队列,然后一点点消费去改库存就好了嘛,不过单个商品其实一次修改就够了,我这里说的是某个点多个商品一起秒杀的场景,像极了双十一零点。

数据库

数据库用MySQL只要连接池设置合理一般问题是不大的,不过一般大公司不缺钱而且秒杀这样的活动十分频繁,我之前所在的公司就是这样秒杀特卖这样的场景一直都是不间断的。

单独给秒杀建立一个数据库,为秒杀服务,表的设计也是竟可能的简单点,现在的互联网架构部署都是分库的。

至于表就看大家怎么设计了,该设置索引的地方还是要设置索引的,建完后记得用explain看看SQL的执行计划。(不了解的小伙伴也没事,MySQL章节去康康)

分布式事务

这为啥我不放在后端而放到最后来讲呢?

因为上面的任何一步都是可能出错的,而且我们是在不同的服务里面出错的,那就涉及分布式事务了,但是分布式事务大家想的是一定要成功什么的那就不对了,还是那句话,几个请求丢了就丢了,要保证时效和服务的可用可靠。

所以TCC最终一致性其实不是很适合,TCC开发成本很大,所有接口都要写三次,因为涉及TCC的三个阶段。

最终一致性基本上都是靠轮训的操作去保证一个操作一定成功,那时效性就大打折扣了。

大家觉得不那么可靠的**两段式(2PC)三段式(3PC)**就派上用场了,他们不一定能保证数据最终一致,但是效率上还算ok。

总结

到这里我想我已经基本上把该考虑的点还有对应的解决方案也都说了一下,不知道还有没有没考虑到的,但是就算没考虑到我想我这个设计,应该也能撑住一个完整的秒杀流程。

最后大家再看看这个秒杀系统或许会有新的感悟,是不是一个系统真的没有大家想的那么简单,而且我还是有漏掉的细节,这是一定的。

秒杀这章我脑细胞死了很多,考虑了很多个点,最后还是出来了,忍不住给自己点赞

总结

我们玩归玩,闹归闹,别拿面试开玩笑。

秒杀不一定是每个同学都会问到的,至少肯定没Redis基础那样常问,但是一旦问到,大家一定要回答到点上。

至少你得说出可能出现的情况需要注意的情况,以及对于的解决思路和方案,因为这才是一个coder的基本素养,这些你不考虑你也很难去进步。

最后就是需要对整个链路比较熟悉,注意是一个完整的链路,前端怎么设计的呀,网关的作用呀,怎么解决Redis的并发竞争啊,数据的同步方式呀,MQ的作用啊等等,相信你会有不错的收获。

不知道这是一次成功还是失败的二创,我里面所有提到的技术细节我都写了对应的文章,大家可以关注我去历史文章看看,天色已晚,我溜了。

查看原文

Yourtion 收藏了文章 · 7月30日

VUE-多文件断点续传、秒传、分片上传

本文为:多文件断点续传、分片上传、秒传、重试机制 的更新版,若想看初始版本的实现,请查看该文章。

凡是要知其然知其所以然

文件上传相信很多朋友都有遇到过,那或许你也遇到过当上传大文件时,上传时间较长,且经常失败的困扰,并且失败后,又得重新上传很是烦人。那我们先了解下失败的原因吧!

据我了解大概有以下原因:

  1. 服务器配置:例如在PHP中默认的文件上传大小为8M【post_max_size = 8m】,若你在一个请求体中放入8M以上的内容时,便会出现异常
  2. 请求超时:当你设置了接口的超时时间为10s,那么上传大文件时,一个接口响应时间超过10s,那么便会被Faild掉。
  3. 网络波动:这个就属于不可控因素,也是较常见的问题。
基于以上原因,聪明的人们就想到了,将文件拆分多个小文件,依次上传,不就解决以上1,2问题嘛,这便是分片上传。 网络波动这个实在不可控,也许一阵大风刮来,就断网了呢。那这样好了,既然断网无法控制,那我可以控制只上传已经上传的文件内容,不就好了,这样大大加快了重新上传的速度。所以便有了“断点续传”一说。此时,人群中有人插了一嘴,有些文件我已经上传一遍了,为啥还要在上传,能不能不浪费我流量和时间。喔...这个嘛,简单,每次上传时判断下是否存在这个文件,若存在就不重新上传便可,于是又有了“秒传”一说。从此这"三兄弟" 便自行CP,统治了整个文件界。”

注意文中的代码并非实际代码,请移步至github查看最新代码
https://github.com/pseudo-god...


分片上传

HTML

原生INPUT样式较丑,这里通过样式叠加的方式,放一个Button.
  <div class="btns">
    <el-button-group>
      <el-button :disabled="changeDisabled">
        <i class="el-icon-upload2 el-icon--left" size="mini"></i>选择文件
        <input
          v-if="!changeDisabled"
          type="file"
          :multiple="multiple"
          class="select-file-input"
          :accept="accept"
          @change="handleFileChange"
        />
      </el-button>
      <el-button :disabled="uploadDisabled" @click="handleUpload()"><i class="el-icon-upload el-icon--left" size="mini"></i>上传</el-button>
      <el-button :disabled="pauseDisabled" @click="handlePause"><i class="el-icon-video-pause el-icon--left" size="mini"></i>暂停</el-button>
      <el-button :disabled="resumeDisabled" @click="handleResume"><i class="el-icon-video-play el-icon--left" size="mini"></i>恢复</el-button>
      <el-button :disabled="clearDisabled" @click="clearFiles"><i class="el-icon-video-play el-icon--left" size="mini"></i>清空</el-button>
    </el-button-group>
    <slot 
    
 //data 数据
 
var chunkSize = 10 * 1024 * 1024; // 切片大小
var fileIndex = 0; // 当前正在被遍历的文件下标

 data: () => ({
    container: {
      files: null
    },
    tempFilesArr: [], // 存储files信息
    cancels: [], // 存储要取消的请求
    tempThreads: 3,
    // 默认状态
    status: Status.wait
  }),
    

一个稍微好看的UI就出来了。

选择文件

选择文件过程中,需要对外暴露出几个钩子,熟悉elementUi的同学应该很眼熟,这几个钩子基本与其一致。onExceed:文件超出个数限制时的钩子、beforeUpload:文件上传之前

fileIndex 这个很重要,因为是多文件上传,所以定位当前正在被上传的文件就很重要,基本都靠它

handleFileChange(e) {
  const files = e.target.files;
  if (!files) return;
  Object.assign(this.$data, this.$options.data()); // 重置data所有数据

  fileIndex = 0; // 重置文件下标
  this.container.files = files;
  // 判断文件选择的个数
  if (this.limit && this.container.files.length > this.limit) {
    this.onExceed && this.onExceed(files);
    return;
  }

  // 因filelist不可编辑,故拷贝filelist 对象
  var index = 0; // 所选文件的下标,主要用于剔除文件后,原文件list与临时文件list不对应的情况
  for (const key in this.container.files) {
    if (this.container.files.hasOwnProperty(key)) {
      const file = this.container.files[key];

      if (this.beforeUpload) {
        const before = this.beforeUpload(file);
        if (before) {
          this.pushTempFile(file, index);
        }
      }

      if (!this.beforeUpload) {
        this.pushTempFile(file, index);
      }

      index++;
    }
  }
},
// 存入 tempFilesArr,为了上面的钩子,所以将代码做了拆分
pushTempFile(file, index) {
  // 额外的初始值
  const obj = {
    status: fileStatus.wait,
    chunkList: [],
    uploadProgress: 0,
    hashProgress: 0,
    index
  };
  for (const k in file) {
    obj[k] = file[k];
  }
  console.log('pushTempFile -> obj', obj);
  this.tempFilesArr.push(obj);
}

分片上传

  • 创建切片,循环分解文件即可

      createFileChunk(file, size = chunkSize) {
        const fileChunkList = [];
        var count = 0;
        while (count < file.size) {
          fileChunkList.push({
            file: file.slice(count, count + size)
          });
          count += size;
        }
        return fileChunkList;
      }
  • 循环创建切片,既然咱们做的是多文件,所以这里就有循环去处理,依次创建文件切片,及切片的上传。
async handleUpload(resume) {
  if (!this.container.files) return;
  this.status = Status.uploading;
  const filesArr = this.container.files;
  var tempFilesArr = this.tempFilesArr;

  for (let i = 0; i < tempFilesArr.length; i++) {
    fileIndex = i;
    //创建切片
    const fileChunkList = this.createFileChunk(
      filesArr[tempFilesArr[i].index]
    );
      
    tempFilesArr[i].fileHash ='xxxx'; // 先不用看这个,后面会讲,占个位置
    tempFilesArr[i].chunkList = fileChunkList.map(({ file }, index) => ({
      fileHash: tempFilesArr[i].hash,
      fileName: tempFilesArr[i].name,
      index,
      hash: tempFilesArr[i].hash + '-' + index,
      chunk: file,
      size: file.size,
      uploaded: false,
      progress: 0, // 每个块的上传进度
      status: 'wait' // 上传状态,用作进度状态显示
    }));
    
    //上传切片
    await this.uploadChunks(this.tempFilesArr[i]);
  }
}
  • 上传切片,这个里需要考虑的问题较多,也算是核心吧,uploadChunks方法只负责构造传递给后端的数据,核心上传功能放到sendRequest方法中
 async uploadChunks(data) {
  var chunkData = data.chunkList;
  const requestDataList = chunkData
    .map(({ fileHash, chunk, fileName, index }) => {
      const formData = new FormData();
      formData.append('md5', fileHash);
      formData.append('file', chunk);
      formData.append('fileName', index); // 文件名使用切片的下标
      return { formData, index, fileName };
    });

  try {
    await this.sendRequest(requestDataList, chunkData);
  } catch (error) {
    // 上传有被reject的
    this.$message.error('亲 上传失败了,考虑重试下呦' + error);
    return;
  }

  // 合并切片
  const isUpload = chunkData.some(item => item.uploaded === false);
  console.log('created -> isUpload', isUpload);
  if (isUpload) {
    alert('存在失败的切片');
  } else {
    // 执行合并
    await this.mergeRequest(data);
  }
}
  • sendReques。上传这是最重要的地方,也是容易失败的地方,假设有10个分片,那我们若是直接发10个请求的话,很容易达到浏览器的瓶颈,所以需要对请求进行并发处理。

    • 并发处理:这里我使用for循环控制并发的初始并发数,然后在 handler 函数里调用自己,这样就控制了并发。在handler中,通过数组API.shift模拟队列的效果,来上传切片。
    • 重试: retryArr 数组存储每个切片文件请求的重试次数,做累加。比如[1,0,2],就是第0个文件切片报错1次,第2个报错2次。为保证能与文件做对应,const index = formInfo.index; 我们直接从数据中拿之前定义好的index。 若失败后,将失败的请求重新加入队列即可。

      • 关于并发及重试我写了一个小Demo,若不理解可以自己在研究下,文件地址:https://github.com/pseudo-god... , 重试代码好像被我弄丢了,大家要是有需求,我再补吧!
    // 并发处理
sendRequest(forms, chunkData) {
  var finished = 0;
  const total = forms.length;
  const that = this;
  const retryArr = []; // 数组存储每个文件hash请求的重试次数,做累加 比如[1,0,2],就是第0个文件切片报错1次,第2个报错2次

  return new Promise((resolve, reject) => {
    const handler = () => {
      if (forms.length) {
        // 出栈
        const formInfo = forms.shift();

        const formData = formInfo.formData;
        const index = formInfo.index;
        
        instance.post('fileChunk', formData, {
          onUploadProgress: that.createProgresshandler(chunkData[index]),
          cancelToken: new CancelToken(c => this.cancels.push(c)),
          timeout: 0
        }).then(res => {
          console.log('handler -> res', res);
          // 更改状态
          chunkData[index].uploaded = true;
          chunkData[index].status = 'success';
          
          finished++;
          handler();
        })
          .catch(e => {
            // 若暂停,则禁止重试
            if (this.status === Status.pause) return;
            if (typeof retryArr[index] !== 'number') {
              retryArr[index] = 0;
            }

            // 更新状态
            chunkData[index].status = 'warning';

            // 累加错误次数
            retryArr[index]++;

            // 重试3次
            if (retryArr[index] >= this.chunkRetry) {
              return reject('重试失败', retryArr);
            }

            this.tempThreads++; // 释放当前占用的通道

            // 将失败的重新加入队列
            forms.push(formInfo);
            handler();
          });
      }

      if (finished >= total) {
        resolve('done');
      }
    };

    // 控制并发
    for (let i = 0; i < this.tempThreads; i++) {
      handler();
    }
  });
}
  • 切片的上传进度,通过axios的onUploadProgress事件,结合createProgresshandler方法进行维护
// 切片上传进度
createProgresshandler(item) {
  return p => {
    item.progress = parseInt(String((p.loaded / p.total) * 100));
    this.fileProgress();
  };
}

Hash计算

其实就是算一个文件的MD5值,MD5在整个项目中用到的地方也就几点。
  • 秒传,需要通过MD5值判断文件是否已存在。
  • 续传:需要用到MD5作为key值,当唯一值使用。
本项目主要使用worker处理,性能及速度都会有很大提升.
由于是多文件,所以HASH的计算进度也要体现在每个文件上,所以这里使用全局变量fileIndex来定位当前正在被上传的文件

执行计算hash

正在上传文件

// 生成文件 hash(web-worker)
calculateHash(fileChunkList) {
  return new Promise(resolve => {
    this.container.worker = new Worker('./hash.js');
    this.container.worker.postMessage({ fileChunkList });
    this.container.worker.onmessage = e => {
      const { percentage, hash } = e.data;
      if (this.tempFilesArr[fileIndex]) {
        this.tempFilesArr[fileIndex].hashProgress = Number(
          percentage.toFixed(0)
        );
      }

      if (hash) {
        resolve(hash);
      }
    };
  });
}

因使用worker,所以我们不能直接使用NPM包方式使用MD5。需要单独去下载spark-md5.js文件,并引入

//hash.js

self.importScripts("/spark-md5.min.js"); // 导入脚本
// 生成文件 hash
self.onmessage = e => {
  const { fileChunkList } = e.data;
  const spark = new self.SparkMD5.ArrayBuffer();
  let percentage = 0;
  let count = 0;
  const loadNext = index => {
    const reader = new FileReader();
    reader.readAsArrayBuffer(fileChunkList[index].file);
    reader.onload = e => {
      count++;
      spark.append(e.target.result);
      if (count === fileChunkList.length) {
        self.postMessage({
          percentage: 100,
          hash: spark.end()
        });
        self.close();
      } else {
        percentage += 100 / fileChunkList.length;
        self.postMessage({
          percentage
        });
        loadNext(count);
      }
    };
  };
  loadNext(0);
};

文件合并

当我们的切片全部上传完毕后,就需要进行文件的合并,这里我们只需要请求接口即可
mergeRequest(data) {
   const obj = {
     md5: data.fileHash,
     fileName: data.name,
     fileChunkNum: data.chunkList.length
   };

   instance.post('fileChunk/merge', obj, 
     {
       timeout: 0
     })
     .then((res) => {
       this.$message.success('上传成功');
     });
 }
Done: 至此一个分片上传的功能便已完成

断点续传

顾名思义,就是从那断的就从那开始,明确思路就很简单了。一般有2种方式,一种为服务器端返回,告知我从那开始,还有一种是浏览器端自行处理。2种方案各有优缺点。本项目使用第二种。

思路:已文件HASH为key值,每个切片上传成功后,记录下来便可。若需要续传时,直接跳过记录中已存在的便可。本项目将使用Localstorage进行存储,这里我已提前封装好addChunkStorage、getChunkStorage方法。

存储在Stroage的数据

缓存处理

在切片上传的axios成功回调中,存储已上传成功的切片

 instance.post('fileChunk', formData, )
  .then(res => {
    // 存储已上传的切片下标
+ this.addChunkStorage(chunkData[index].fileHash, index);
    handler();
  })

在切片上传前,先看下localstorage中是否存在已上传的切片,并修改uploaded

    async handleUpload(resume) {
+      const getChunkStorage = this.getChunkStorage(tempFilesArr[i].hash);
      tempFilesArr[i].chunkList = fileChunkList.map(({ file }, index) => ({
+        uploaded: getChunkStorage && getChunkStorage.includes(index), // 标识:是否已完成上传
+        progress: getChunkStorage && getChunkStorage.includes(index) ? 100 : 0,
+        status: getChunkStorage && getChunkStorage.includes(index)? 'success'
+              : 'wait' // 上传状态,用作进度状态显示
      }));

    }

构造切片数据时,过滤掉uploaded为true的

 async uploadChunks(data) {
  var chunkData = data.chunkList;
  const requestDataList = chunkData
+    .filter(({ uploaded }) => !uploaded)
    .map(({ fileHash, chunk, fileName, index }) => {
      const formData = new FormData();
      formData.append('md5', fileHash);
      formData.append('file', chunk);
      formData.append('fileName', index); // 文件名使用切片的下标
      return { formData, index, fileName };
    })
}

垃圾文件清理

随着上传文件的增多,相应的垃圾文件也会增多,比如有些时候上传一半就不再继续,或上传失败,碎片文件就会增多。解决方案我目前想了2种
  • 前端在localstorage设置缓存时间,超过时间就发送请求通知后端清理碎片文件,同时前端也要清理缓存。
  • 前后端都约定好,每个缓存从生成开始,只能存储12小时,12小时后自动清理
以上2中方案似乎都有点问题,极有可能造成前后端因时间差,引发切片上传异常的问题,后面想到合适的解决方案再来更新吧。

Done: 续传到这里也就完成了。


秒传

这算是最简单的,只是听起来很厉害的样子。原理:计算整个文件的HASH,在执行上传操作前,向服务端发送请求,传递MD5值,后端进行文件检索。若服务器中已存在该文件,便不进行后续的任何操作,上传也便直接结束。大家一看就明白
async handleUpload(resume) {
    if (!this.container.files) return;
    const filesArr = this.container.files;
    var tempFilesArr = this.tempFilesArr;

    for (let i = 0; i < tempFilesArr.length; i++) {
      const fileChunkList = this.createFileChunk(
        filesArr[tempFilesArr[i].index]
      );

      // hash校验,是否为秒传
+      tempFilesArr[i].hash = await this.calculateHash(fileChunkList);
+      const verifyRes = await this.verifyUpload(
+        tempFilesArr[i].name,
+        tempFilesArr[i].hash
+      );
+      if (verifyRes.data.presence) {
+       tempFilesArr[i].status = fileStatus.secondPass;
+       tempFilesArr[i].uploadProgress = 100;
+      } else {
        console.log('开始上传切片文件----》', tempFilesArr[i].name);
        await this.uploadChunks(this.tempFilesArr[i]);
      }
    }
  }
  // 文件上传之前的校验: 校验文件是否已存在
  verifyUpload(fileName, fileHash) {
    return new Promise(resolve => {
      const obj = {
        md5: fileHash,
        fileName,
        ...this.uploadArguments //传递其他参数
      };
      instance
        .post('fileChunk/presence', obj)
        .then(res => {
          resolve(res.data);
        })
        .catch(err => {
          console.log('verifyUpload -> err', err);
        });
    });
  }
Done: 秒传到这里也就完成了。

后端处理

文章好像有点长了,具体代码逻辑就先不贴了,除非有人留言要求,嘻嘻,有时间再更新

Node版

请前往 https://github.com/pseudo-god... 查看

JAVA版

下周应该会更新处理

PHP版

1年多没写PHP了,抽空我会慢慢补上来

待完善

  • 切片的大小:这个后面会做出动态计算的。需要根据当前所上传文件的大小,自动计算合适的切片大小。避免出现切片过多的情况。
  • 文件追加:目前上传文件过程中,不能继续选择文件加入队列。(这个没想好应该怎么处理。)

更新记录

组件已经运行一段时间了,期间也测试出几个问题,本来以为没BUG的,看起来BUG都挺严重

BUG-1:当同时上传多个内容相同但是文件名称不同的文件时,出现上传失败的问题。

预期结果:第一个上传成功后,后面相同的问文件应该直接秒传

实际结果:第一个上传成功后,其余相同的文件都失败,错误信息,块数不对。

原因:当第一个文件块上传完毕后,便立即进行了下一个文件的循环,导致无法及时获取文件是否已秒传的状态,从而导致失败。

解决方案:在当前文件分片上传完毕并且请求合并接口完毕后,再进行下一次循环。

将子方法都改为同步方式,mergeRequest 和 uploadChunks 方法


BUG-2: 当每次选择相同的文件并触发beforeUpload方法时,若第二次也选择了相同的文件,beforeUpload方法失效,从而导致整个流程失效。

原因:之前每次选择文件时,没有清空上次所选input文件的数据,相同数据的情况下,是不会触发input的change事件。

解决方案:每次点击input时,清空数据即可。我顺带优化了下其他的代码,具体看提交记录吧。

<input
  v-if="!changeDisabled"
  type="file"
  :multiple="multiple"
  class="select-file-input"
  :accept="accept"
+  οnclick="f.outerHTML=f.outerHTML"
  @change="handleFileChange"/>
重写了暂停和恢复的功能,实际上,主要是增加了暂停和恢复的状态

之前的处理逻辑太简单粗暴,存在诸多问题。现在将状态定位在每一个文件之上,这样恢复上传时,直接跳过即可

封装组件

写了一大堆,其实以上代码你直接复制也无法使用,这里我将此封装了一个组件。大家可以去github下载文件,里面有使用案例 ,若有用记得随手给个star,谢谢!

偷个懒,具体封装组件的代码就不列出来了,大家直接去下载文件查看,若有不明白的,可留言。

组件文档

Attribute

参数类型说明默认备注
headersObject设置请求头
before-uploadFunction上传文件前的钩子,返回false则停止上传
acceptString接受上传的文件类型
upload-argumentsObject上传文件时携带的参数
with-credentialsBoolean是否传递Cookiefalse
limitNumber最大允许上传个数00为不限制
on-exceedFunction文件超出个数限制时的钩子
multipleBoolean是否为多选模式true
base-urlString由于本组件为内置的AXIOS,若你需要走代理,可以直接在这里配置你的基础路径
chunk-sizeNumber每个切片的大小10M
threadsNumber请求的并发数3并发数越高,对服务器的性能要求越高,尽可能用默认值即可
chunk-retryNumber错误重试次数3分片请求的错误重试次数

Slot

方法名说明参数备注
header按钮区域
tip提示说明文字

后端接口文档:按文档实现即可



代码地址:https://github.com/pseudo-god...

接口文档地址 https://docs.apipost.cn/view/...

查看原文

Yourtion 收藏了文章 · 6月4日

正则表达式引擎执行原理——从未如此清晰!

目前越来越多的网站、编辑器、编程语言都已支持一种叫“正则表达式”的字符串查找“公式”,有过编程经验的同学都应该了解正则表达式(Regular Expression 简写regex)是什么东西,它是一种字符串匹配的模式(pattern),更像是一种逻辑公式。

使用正则表达式去匹配字符串Hello World 中的 Hello
伪代码:/Hello/, "Hello World"
输出:Hello

如何写好一篇关于 正则表达式 的文章,我思考了一周的时间,从未有一篇文章能让猪哥如此费神。

因为我觉得正则表达式 :难记忆、难描述、广而深且不受重视,有人说正则表达式既好写也难写!

  1. 好写:无非写一些常用、实用的案例,说实话你们每个人都能写出这种:在网上百度一下然后结合一点自己的实际经验,一篇文章就出来了。
  2. 难写:很多人都认为正则简单,不用记,要用就百度一下。但是绝大多数人了解的只是正则的一个小面,真正的精髓却很少关注!

猪哥希望大家能了解到正则的知识点其实非常非常多,尤其是正则引擎执行原理以及正则优化,这算是正则表达式的进阶知识点,面试中也可能会被问到。
在这里插入图片描述

一、起源与发展

我们在学习一门技术的时候有必要了解其起源与发展过程,这对我们去理解技术本身有一定的帮助!

20世纪40年代:正则表达式最初的想法来自两位神经学家:沃尔特·皮茨与麦卡洛克,他们研究出了一种用数学方式来描述神经网络的模型。

1956年:一位名叫Stephen Kleene的数学科学家发表了一篇题目是《神经网事件的表示法》的论文,利用称之为正则集合的数学符号来描述此模型,引入了正则表达式的概念。正则表达式被作为用来描述其称之为“正则集的代数”的一种表达式,因而采用了“正则表达式”这个术语。

1968年:C语言之父、UNIX之父肯·汤普森把这个“正则表达式”的理论成果用于做一些搜索算法的研究,他描述了一种正则表达式的编译器,于是出现了应该算是最早的正则表达式的编译器qed(这也就成为后来的grep编辑器)。

Unix使用正则之后,正则表达式不断的发展壮大,然后大规模应用于各种领域,根据这些领域各自的条件需要,又发展出了许多版本的正则表达式,出现了许多的分支。我们把这些分支叫做“流派”。

1987年:Perl语言诞生了,它综合了其他的语言,用正则表达式作为基础,开创了一个新的流派,Perl流派。之后很多编程语言如:Python、Java、Ruby、.Net、PHP等等在设计正则式支持的时候都参考Perl正则表达式
在这里插入图片描述

到这里我们也就知道为什么众多编程语言的正则表达式基本一样,因为他们都师从Perl

注:Perl语言是一种擅长处理文本的语言,但因晦涩语法和古怪符号不利于理解和记忆导致很多开发者并不喜欢。

二、语法

完整的正则表达式由两种字符构成:特殊字符(元字符)和普通字符。

ps:元字符表示正则表达式功能的最小单位,如 *^$\d 等等

关于语法部分猪哥并不想过多的讲解,给大家做一个详细的归纳整理,供大家日后快速查找吧!
在这里插入图片描述

如果想系统学习正则表达式的语法部分,猪哥推荐 菜鸟教程:https://www.runoob.com/regexp...
在这里插入图片描述

三、匹配原理

匹配原理是猪哥想要重点讲解的部分,也希望同学们可以认真了解这部分的内容。

很多人觉得开车没必要了解车的构造原理,但是我们学编程的还真的需要了解原理。

因为了解原理,你才能调优,这往往也是初级工程师与中高级工程师之间的差别点之一!

1.执行过程

正则表达是的执行,是由正则表达引擎编译执行的,大致的执行流程猪哥也花了一个流程图给大家看看。
在这里插入图片描述

这里给大家提一点就是:预编译(pre-use compile)

猪哥建议大家在生产环境中使用预编译功能,为什么呢?

以Python语言内置re模块举例:

  1. 通过re.compile(pattern)预编译返回Pattern对象,在后面代码中可以直接引用。
  2. 通过re.match(pattern, text)即用编译,虽然也会有缓存Pattern对象,但是每次使用都需要去缓存中取出,比预编译多一步取操作。

猪哥也通过实际测试来 验证预编译 确实比 即用编译 要快!

pattern = r'http:\/\/(?:.?\w+)+'
text = '<a href="http://www.xxx.com">xxx.com</a>'

在这里插入图片描述

2.引擎

既然正则表达式由执行引擎执行,那我们就来讲讲正则表达式的引擎吧,这一块是重点,希望大家仔细看看,弄懂了理解了才行!

正则引擎主要可以分为基本不同的两大类:

  1. DFA (Deterministic finite automaton) 确定型有穷自动机
  2. NFA (Non-deterministic finite automaton) 非确定型有穷自动机

ps:当然还有一种引擎为:POSIX NFA,这是根据NFA引擎出的规范版本,但因为使用较少所以我们这里也就不重点讲解。

这里需要和大家解释下何为确定型有穷自动机这几个名词:

  1. 确定型与非确定型:假设有一个字符串(text=abc)需要匹配,在没有编写正则表达式的前提下,就直接可以确定字符匹配顺序的就是确定型,不能确定字符匹配顺序的则为非确定型。
  2. 有穷:有穷即表示有限的意思,这里表示有限次数内能得到结果。
  3. 自动机:自动机便是自动完成,在我们设置好匹配规则后由引擎自动完成,不需要人为干预!

根据上面的解释我们可得知DFA引擎 和 NFA引擎 的区别就在于:在没有编写正则表达式的前提下,是否能确定字符执行顺序!

DFA引擎执行原理:
为了大家能很清楚的理解DFA引擎执行原理,猪哥制作了一个简易的动态执行过程图给大家看看
在这里插入图片描述
根据上面的动图我们可以得出DFA引擎的一些特点:

  1. 文本主导:按照文本的顺序执行,这也就能说明为什么DFA引擎是确定型(deterministic)了,稳定!
  2. 记录当前有效的所有可能:我们看到当执行到(d|b)时,同时比较表达式中的db,所以会需要更多的内存。
  3. 每个字符只检查一次:这提高了执行效率,而且速度与正则表达式无关。
  4. 不能使用反向引用等功能:因为每个字符只检查一次,文本零宽度(位置)只记录当前比较值,所以不能使用反向引用、环视等一些功能!

NFA引擎执行原理:
猪哥同样画了一个简易的NFA引擎执行过程图方便大家理解
在这里插入图片描述
根据上面的动图我们可以得出NFA引擎的一些特点:

  1. 文表达式主导:按照表达式的一部分执行,如果不匹配换其他部分继续匹配,直到表达式匹配完成。
  2. 会记录某个位置:我们看到当执行到(d|b)时,NFA引擎会记录字符的位置(零宽度),然后选择其中一个先匹配。
  3. 单个字符可能检查多次:我们看到当执行到(d|b)时,比较d后发现不匹配,于是NFA引擎换表达式的另一个分支b,同时文本位置回退,重新匹配字符'b'。这也是NFA引擎是非确定型的原因,同时带来另一个问题效率可能没有DFA引擎高。
  4. 可实现反向引用等功能:因为具有回退这一步,所以可以很容易的实现反向引用、环视等一些功能!

针对两种引擎的区别,猪哥进行了比较
在这里插入图片描述
关于这两种引擎的总结,猪哥引用《精通正则表达式》书本中的一句话来概括:

DFA(是电动机) 和NFA(汽油机) 都有很长的历史,不过,正如汽油机一样,NFA 的历史更长一些。也有些系统采用了混合引擎,它们会根据任务的不同选择合适的引擎(甚至对同一表达式中的不同部分采用不同的引擎,以求得功能与速度之间的最佳平衡)。 ——《精通正则表达式》

3.回溯

作为绝大多数编程语言都选择的引擎——NFA (非确定型有穷自动机) 引擎,我们当然要再详细了解一下它的精髓——回溯
在这里插入图片描述
动图中,我们可以看到当某个正则分支匹配不成功之后,文本的位置需要回退,然后换另一个分支匹配,而回退这步专业术语就叫:回溯

回溯的原理类似我们走迷宫时走过的路设置一个标志物,如果不对则原路返回,换另一条路。
在这里插入图片描述

回溯机制不但需要重新计算正则表达式文本的对应位置,也需要维护括号内的子表达式所匹配文本的状态(b匹配成功),保存到内存中以数字编号的组中,这就叫捕获组

保存括号内的匹配结果之后,我们在后面的正则表达式中就可以使用,这就是我们所说的反向引用,在上面的案例中只有一个捕获,所以$1=b

回溯陷阱:讲到回溯必须提到回溯陷阱,它导致的结果就是机器CPU使用率爆满(超100%),机器就卡死了。

举个例子:text=aaaaa,pattern=/^(a*)b$/,匹配过程大致是

  1. (a*):匹配到了文本中的aaaaa
  2. 匹配正则中的b,但是失败,因为(a*)已经把text都吃了
  3. 这时候引擎会要求(a*)吐出最后一个字符(a),但是无法匹配b
  4. 第二次是吐出倒数第二个字符(还是a),依然无法匹配
  5. 就这样引擎会要求(a*)逐个将吃进去的字符都吐出来
  6. 但是到最后都无法匹配b

这里的重点就在于 引擎会要求*匹配的东西一点一点吐回,我们假设如果文本长度为几万,那引擎就要回溯几万次,这对机器的CPU来说简直是灾难。

有些复杂的正则表达式可能有多个部分都要回溯,那回溯次数就是指数型。如果文本长度为500,一个表达式有两部分都要回溯,那次数可能是500^2=25万次,这谁受得了!

关于更多更详细的回溯介绍,推荐大家可以阅读《精通正则表达式》这本书!

四、优化

编写巧妙的正则表达式不仅仅是一种技能,而且还是一种艺术。

上面我们了解到,绝大多数的编程语言都采用的是NFA引擎,而NFA引擎的特点是:功能强大、但有回溯机制所以效率慢。所以我们需要学习一些NFA引擎的一些优化技巧,以减少引擎回溯次数以及更直接的匹配到结果!

针对NFA引擎的可优化的点其实挺多的,为了方便大家记忆,猪哥也画幅结构图归纳一下,方便大家收藏细看。
在这里插入图片描述
在面试过程中也许会被问到关于正则的优化,大家记住几点就可以。

五、推荐

上面我们讲解了关于正则表达式的诞生和发展、引擎、优化等知识,但是关于正则表达式的知识点远远不止这些,所以最后猪哥推荐一些好的学习资料,大家有空可以了解学习下。

1.书

推荐正则表达式的书,那必然是《精通正则表达式》 ,目前这本书已经出了第三版,豆瓣评分8.9。

内容虽然稍有啰嗦,但是对于正则新手很友好,唯一不足是Python案例少。
在这里插入图片描述

2.博客

入门:菜鸟教程:https://www.runoob.com/regexp...

3.在线测试工具

https://regex101.com/,这个网站可以选择不同编程语言的正则支持,有语义分析、匹配测试、参考列表等,非常实用。
在这里插入图片描述

4.常用案例

一些简单常用的小案例汇总,菜鸟教程:http://c.runoob.com/front-end...
在这里插入图片描述

最后祝愿大家都能搞定正则表达式,处理文本可以得心应手!

更多优质教程可关注猪哥微信公众号「裸睡的猪」!

查看原文

Yourtion 收藏了文章 · 6月4日

也许这才是你想要的微前端方案

前言

微前端是当下的前端热词,稍具规模的团队都会去做技术探索,作为一个不甘落后的团队,我们也去做了。也许你看过了Single-Spaqiankun这些业界成熟方案,非常强大:JS沙箱隔离、多栈支持、子应用并行、子应用嵌套,但仔细想想它真的适合你吗?

对于我来说,太重了,概念太多,理解困难。先说一下背景,我们之所以要对我司的小贷管理后台做微前端改造,主要基于以下几个述求:

  • 系统从接手时差不多30个页面,一年多时间,发展到目前150多个页面,并还在持续增长;
  • 项目体积变大,带来开发体验很差,打包构建速度很慢(初次构建,1分钟以上);
  • 小贷系统开发量占整个web组50%的人力,每个迭代都有两三个需求在这一个系统上开发,代码合并冲突,上线时间交叉。带来的是开发流程管理复杂;
  • 业务人员是分类的,没有谁会用到所有的功能,每个业务人员只拥有其中30%甚至更少的功能。但不得不加载所有业务代码,才能看到自己想要的页面;

所以和市面上很多前端团队引入微前端的目的不同的是,我们是,而更多的团队是。所以本方案适合和我目的一致的前端团队,将自己维护的巨婴系统瓦解,然后通过微前端"框架"来聚合,降低项目管理难度,提升开发体验与业务使用体验。

巨婴系统技术栈: Dva + Antd

方案参考美团一篇文章:微前端在美团外卖的实践

在做这个项目的按需提前加载设计时,自己去深究过webpack构建出的项目代码运行逻辑,收获比较多:webpack 打包的代码怎么在浏览器跑起来的?, 不了解的可以看看

方案设计

基于业务角色,我们将巨婴系统拆成了一个基座系统和四个子系统(可以按需扩展子系统),如下图所示:

20200528165839

基座系统除了提供基座功能,即系统的登录、权限获取、子系统的加载、公共组件共享、公共库的共享,还提供了一个基本所有业务人员都会使用的业务功能:用户授(guan)信(li)。

子系统以静态资源的方式,提供一个注册函数,函数返回值是一个Switch包裹的组件与子系统所有的models。

路由设计

子系统以组件的形式加载到基座系统中,所以路由是入口,也是整个设计的第一步,为了区分基座系统页面和子系统页面,在路由上约定了下面这种形式:

// 子系统路由匹配,伪代码
function Layout(layoutProps) {
  useEffect(() => {
      const apps = getIncludeSubAppMap();
      // 按需加载子项目;
      apps.forEach(subKey => startAsyncSubapp(subKey));
  }, []);

  return (
    <HLayout {...props}>
      <Switch>
          {/* 企业用户管理 */}
          <Route exact path={Paths.PRODUCT_WHITEBAR} component={pages.ProductManage} breadcrumbName="企业用户管理" />
          {/* ...省略一百行 */}
          <Route path="/subPage/" component={pages.AsyncComponent} />
      </Switch>
    </HLayout>
}

即只要以subPage路径开头,就默认这个路由对应的组件为子项目,从而通过AsyncComponent组件去异步获取子项目组件。

异步加载组件设计

路由设计完了,然后异步加载组件就是这个方案的灵魂了,流程是这样的:

  • 通过路由,匹配到要访问的具体是那个子项目;
  • 通过子项目id,获取对应的manifest.json文件;
  • 通过获取manifest.json,识别到对应的静态资源(js,css)
  • 加载静态资源,加载完,子项目执行注册
  • 动态加载model,更新子项目组件

直接上代码吧,简单明了,资源加载的逻辑后面再详讲,需要注意的是model和component的加载顺序

export default function AsyncComponent({ location }) {
  // 子工程资源是否加载完成
  const [ayncLoading, setAyncLoaded] = useState(true);
  // 子工程组件加载存取
  const [ayncComponent, setAyncComponent] = useState(null);
  const { pathname } = location;
  // 取路径中标识子工程前缀的部分, 例如 '/subPage/xxx/home' 其中xxx即子系统路由标识
  const id = pathname.split('/')[2];
  useEffect(() => {
    if (!subAppMapInfo[id]) {
      // 不存在这个子系统,直接重定向到首页去
      goBackToIndex();
    }
    const status = subAppRegisterStatus[id];
    if (status !== 'finish') {
      // 加载子项目
      loadAsyncSubapp(id).then(({ routes, models }) => {
        loadModule(id, models);
        setAyncComponent(routes);
        setAyncLoaded(false);
        // 已经加载过的,做个标记
        subAppRegisterStatus[id] = 'finish';
      }).catch((error = {}) => {
        // 如果加载失败,显示错误信息
        setAyncLoaded(false);
        setAyncComponent(
          <div style={{
            margin: '100px auto',
            textAlign: 'center',
            color: 'red',
            fontSize: '20px'
          }}
          >
            {error.message || '加载失败'}
          </div>);
      });
    } else {
      const models = subappModels[id];
      loadModule(id, models);
      // 如果能匹配上前缀则加载相应子工程模块
      setAyncLoaded(false);
      setAyncComponent(subappRoutes[id]);
    }
  }, [id]);
  return (
    <Spin spinning={ayncLoading} style={{ width: '100%', minHeight: '100%' }}>
      {ayncComponent}
    </Spin>
  );
}

子项目设计

子项目以静态资源的形式在基座项目中加载,需要暴露出子系统自己的全部页面组件和数据model;然后在打包构建上和以前也稍许不同,需要多生成一个manifest.json来搜集子项目的静态资源信息。

子项目暴露出自己自愿的代码长这样:

// 子项目资源输出代码
import routes from './layouts';

const models = {};

function importAll(r) {
  r.keys().forEach(key => models[key] = r(key).default);
}

// 搜集所有页面的model
importAll(require.context('./pages', true, /model\.js$/));

function registerApp(dep) {
  return {
    routes, // 子工程路由组件
    models, // 子工程数据模型集合
  };
}

// 数组第一个参数为子项目id,第二个参数为子项目模块获取函数
(window["registerApp"] = window["registerApp"] || []).push(['collection', registerApp]);

子项目页面组件搜集:

import menus from 'configs/menus';
import { Switch, Redirect, Route } from 'react-router-dom';
import pages from 'pages';

function flattenMenu(menus) {
  const result = [];
  menus.forEach((menu) => {
    if (menu.children) {
      result.push(...flattenMenu(menu.children));
    } else {
      menu.Component = pages[menu.component];
      result.push(menu);
    }
  });
  return result;
}

// 子项目自己路径分别 + /subpage/xxx 
const prefixRoutes = flattenMenu(menus);

export default (
  <Switch>
    {prefixRoutes.map(child =>
      <Route
        exact
        key={child.key}
        path={child.path}
        component={child.Component}
        breadcrumbName={child.title}
      />
    )}
    <Redirect to="/home" />
  </Switch>);

静态资源加载逻辑设计

开始做方案时,只是设计出按需加载的交互体验:即当业务切换到子项目路径时,开始加载子项目的资源,然后渲染页面。但后面感觉这种改动影响了业务体验,他们以前只需要加载数据时loading,现在还需要承受子项目加载loading。所以为了让业务尽量小的感知系统的重构,将按需加载换成了按需提前加载。简单点说,就是当业务登录时,我们会去遍历他的所有权限菜单,获取他拥有那些子项目的访问权限,然后提前加载这些资源。

遍历菜单,提前加载子项目资源:

// 本地开发环境不提前按需加载
if (getDeployEnv() !== 'local') {
  const apps = getIncludeAppMap();
  // 按需提前加载子项目资源;
  apps.forEach(subKey => startAsyncSubapp(subKey));
}

然后就是show代码的时候了,思路参考webpackJsonp,就是通过拦截一个全局数组的push操作,得知子项目已加载完成:

import { subAppMapInfo } from './menus';

// 子项目静态资源映射表存放:
/**
 * 状态定义:
 * '': 还未加载
 * ‘start’:静态资源映射表已存在;
 * ‘map’:静态资源映射表已存在;
 * 'init': 静态资源已加载;
 * 'wait': 资源加载已完成, 待注入;
 * 'finish': 模块已注入;
*/
export const subAppRegisterStatus = {};

export const subappSourceInfo = {};

// 项目加载待处理的Promise hash 表
const defferPromiseMap = {};

// 项目加载待处理的错误 hash 表
const errorInfoMap = {};

// 加载css,js 资源
function loadSingleSource(url) {
  // 此处省略了一写代码
  return new Promise((resolove, reject) => {
    link.onload = () => {
      resolove(true);
    };
    link.onerror = () => {
      reject(false);
    };
  });
}

// 加载json中包含的所有静态资源
async function loadSource(json) {
  const keys = Object.keys(json);
  const isOk = await Promise.all(keys.map(key => loadSingleSource(json[key])));

  if (!isOk || isOk.filter(res => res === true) < keys.length) {
    return false;
  }

  return true;
}

// 获取子项目的json 资源信息
async function getManifestJson(subKey) {
  const url = subAppMapInfo[subKey];
  if (subappSourceInfo[subKey]) {
    return subappSourceInfo[subKey];
  }

  const json = await fetch(url).then(response => response.json())
    .catch(() => false);

  subAppRegisterStatus[subKey] = 'map';
  return json;
}

// 子项目提前按需加载入口
export async function startAsyncSubapp(moduleName) {
  subAppRegisterStatus[moduleName] = 'start'; // 开始加载
  const json = await getManifestJson(moduleName);
  const [, reject] = defferPromiseMap[moduleName] || [];
  if (json === false) {
    subAppRegisterStatus[moduleName] = 'error';
    errorInfoMap[moduleName] = new Error(`模块:${moduleName}, manifest.json 加载错误`);
    reject && reject(errorInfoMap[moduleName]);
    return;
  }
  subAppRegisterStatus[moduleName] = 'map'; // json加载完毕
  const isOk = await loadSource(json);
  if (isOk) {
    subAppRegisterStatus[moduleName] = 'init';
    return;
  }
  errorInfoMap[moduleName] = new Error(`模块:${moduleName}, 静态资源加载错误`);
  reject && reject(errorInfoMap[moduleName]);
  subAppRegisterStatus[moduleName] = 'error';
}

// 回调处理
function checkDeps(moduleName) {
  if (!defferPromiseMap[moduleName]) {
    return;
  }
  // 存在待处理的,开始处理;
  const [resolove, reject] = defferPromiseMap[moduleName];
  const registerApp = subappSourceInfo[moduleName];

  try {
    const moduleExport = registerApp();
    resolove(moduleExport);
  } catch (e) {
    reject(e);
  } finally {
    // 从待处理中清理掉
    defferPromiseMap[moduleName] = null;
    subAppRegisterStatus[moduleName] = 'finish';
  }
}

// window.registerApp.push(['collection', registerApp])
// 这是子项目注册的核心,灵感来源于webpack,即对window.registerApp的push操作进行拦截
export function initSubAppLoader() {
  window.registerApp = [];
  const originPush = window.registerApp.push.bind(window.registerApp);
  // eslint-disable-next-line no-use-before-define
  window.registerApp.push = registerPushCallback;
  function registerPushCallback(module = []) {
    const [moduleName, register] = module;
    subappSourceInfo[moduleName] = register;
    originPush(module);
    checkDeps(moduleName);
  }
}

// 按需提前加载入口
export function loadAsyncSubapp(moduleName) {
  const subAppInfo = subAppRegisterStatus[moduleName];

  // 错误处理优先
  if (subAppInfo === 'error') {
    const error = errorInfoMap[moduleName] || new Error(`模块:${moduleName}, 资源加载错误`);
    return Promise.reject(error);
  }

  // 已经提前加载,等待注入
  if (typeof subappSourceInfo[moduleName] === 'function') {
    return Promise.resolve(subappSourceInfo[moduleName]());
  }

  // 还未加载的,就开始加载,已经开始加载的,直接返回
  if (!subAppInfo) {
    startAsyncSubapp(moduleName);
  }

  return new Promise((resolve, reject = (error) => { throw error; }) => {
    // 加入待处理map中;
    defferPromiseMap[moduleName] = [resolve, reject];
  });
}

这里需要强调一下子项目有两种加载场景:

  • 从基座页面路径进入系统, 那么就是按需提前加载的场景, 那么startAsyncSubapp先执行,提前缓存资源;
  • 从子项目页面路径进入系统, 那就是按需加载的场景,就存在loadAsyncSubapp先执行,利用Promise完成发布订阅。至于为什么startAsyncSubapp在前但后执行,是因为useEffect是组件挂载完成才执行;

至此,框架的大致逻辑就交代清楚了,剩下的就是优化了。

其他难点

其实不难,只是怪我太菜,但这些点确实值得记录,分享出来共勉。

公共依赖共享

我们由于基座项目与子项目技术栈一致,另外又是拆分系统,所以共享公共库依赖,优化打包是一个特别重要的点,以为就是webpack配个external就完事,但其实要复杂的多。

antd 构建

antd 3.x就支持了esm,即按需引入,但由于我们构建工具没有做相应升级,用了babel-plugin-import这个插件,所以导致了两个问题,打包冗余与无法全量导出antd Modules。分开来讲:

  • 打包冗余,就是通过BundleAnalyzer插件发现,一个模块即打了commonJs代码,也打了Esm代码;
  • 无法全量导出,因为基座项目不知道子项目会具体用哪个模块,所以只能暴力的导出Antd所有模块,但babel-plugin-import这个插件有个优化,会分析引入,然后删除没用的依赖,但我们的需求和它的目的是冲突的;

结论:使用babel-plugin-import这个插件打包commonJs代码已经过时, 其存在的唯一价值就是还可以帮我们按需引入css 代码;

项目公共组件共享

项目中公共组件的共享,我们开始尝试将常用的组件加入公司组件库来解决,但发现这个方案并不是最理想的,第一:很多组件和业务场景强相关,加入公共组件库,会造成组件库臃肿;第二:没有必要。所以我们最后还是采用了基座项目收集组件,并统一暴露:

function combineCommonComponent() {
 const contexts = require.context('./components/common', true, /\.js$/);
 return contexts.keys().reduce((next, key) => {
   // 合并components/common下的组件
   const compName = key.match(/\w+(?=\/index\.js)/)[0];
   next[compName] = contexts(key).default;
   return next;
 }, {});
}

webpackJsonp 全局变量污染

如果对webpack构建后的代码不熟悉,可以先看看开篇提到的那篇文章。

webpack构建时,在开发环境modules是一个对象,采用文件path作为module的key; 而正式环境,modules是一个数组,会采用index作为module的key。
由于我基座项目和子项目没有做沙箱隔离,即window被公用,所以存在webpackJsonp全局变量污染的情况,在开发环境,这个污染没有被暴露,因为文件Key是唯一的,但在打正式包时,发现qa 环境子项目无法加载,最后一分析,发现了window.webpackJsonp 环境变量污染的bug。

最后解决的方案就是子项目打包都拥有自己独立的webpackJsonp变量,即将webpackJsonp重命名,写了一个简单的webpack插件搞定:

// 将webpackJsonp 重命名为 webpackJsonpCollect
config.plugins.push(new RenameWebpack({ replace: 'webpackJsonpCollect' }));

子项目开发热加载

基座项目为什么会成为基座,就因为他迭代少且稳定的特殊性。但开发时,由于子项目无法独立运行,所以需要依赖基座项目联调。但做一个需求,要打开两个vscode,同时运行两个项目,对于那个开发,这都是一个不好的开发体验,所以我们希望将dev环境作为基座,来支持本地的开发联调,这才是最好的体验。

将dev环境的构建参数改成开发环境后,发现子项目能在线上基座项目运行,但webSocket通信一直失败,最后找到原因是webpack-dev-sever有个host check逻辑,称为主机检查,是一个安全选项,我们这里是可以确认的,所以直接注释就行。

总结

这篇文章,本身就是个总结。如果有什么疑惑或更好的建议,欢迎一起讨论,issues地址

查看原文

Yourtion 收藏了文章 · 5月18日

详解Go语言的计时器

Go语言的标准库里提供两种类型的计时器TimerTickerTimer经过指定的duration时间后被触发,往自己的时间channel发送当前时间,此后Timer不再计时。Ticker则是每隔duration时间都会把当前时间点发送给自己的时间channel,利用计时器的时间channel可以实现很多与计时相关的功能。

文章主要涉及如下内容:

  • TimerTicker计时器的内部结构表示
  • TimerTicker的使用方法和注意事项
  • 如何正确Reset定时器

计时器的内部表示

两种计时器都是基于Go语言的运行时计时器runtime.timer实现的,rumtime.timer的结构体表示如下:

type timer struct {
    pp puintptr

    when     int64
    period   int64
    f        func(interface{}, uintptr)
    arg      interface{}
    seq      uintptr
    nextwhen int64
    status   uint32
}

rumtime.timer结构体中的字段含义是

  • when — 当前计时器被唤醒的时间;
  • period — 两次被唤醒的间隔;
  • f — 每当计时器被唤醒时都会调用的函数;
  • arg — 计时器被唤醒时调用 f 传入的参数;
  • nextWhen — 计时器处于 timerModifiedLater/timerModifiedEairlier 状态时,用于设置 when 字段;
  • status — 计时器的状态;

这里的 runtime.timer 只是私有的计时器运行时表示,对外暴露的计时器 time.Timertime.Ticker的结构体表示如下:

type Timer struct {
    C <-chan Time
    r runtimeTimer
}

type Ticker struct {
    C <-chan Time 
    r runtimeTimer
}

Timer.CTicker.C就是计时器中的时间channel,接下来我们看一下怎么使用这两种计时器,以及使用时要注意的地方。

Timer计时器

time.Timer 计时器必须通过 time.NewTimertime.AfterFunc 或者 time.After 函数创建。 当计时器失效时,失效的时间就会被发送给计时器持有的 channel,订阅 channelgoroutine 会收到计时器失效的时间。

通过定时器Timer用户可以定义自己的超时逻辑,尤其是在应对使用select处理多个channel的超时、单channel读写的超时等情形时尤为方便。Timer常见的使用方法如下:

//使用time.AfterFunc:

t := time.AfterFunc(d, f)

//使用time.After:
select {
    case m := <-c:
       handle(m)
    case <-time.After(5 * time.Minute):
       fmt.Println("timed out")
}

// 使用time.NewTimer:
t := time.NewTimer(5 * time.Minute)
select {
    case m := <-c:
       handle(m)
    case <-t.C:
       fmt.Println("timed out")
}

time.AfterFunc这种方式创建的Timer,在到达超时时间后会在单独的goroutine里执行函数f

func AfterFunc(d Duration, f func()) *Timer {
    t := &Timer{
        r: runtimeTimer{
            when: when(d),
            f:    goFunc,
            arg:  f,
        },
    }
    startTimer(&t.r)
    return t
}

func goFunc(arg interface{}, seq uintptr) {
    go arg.(func())()
}

从上面AfterFunc的源码可以看到外面传入的f参数并非直接赋值给了运行时计时器的f,而是作为包装函数goFunc的参数传入的。goFunc会启动了一个新的goroutine来执行外部传入的函数f。这是因为所有计时器的事件函数都是由Go运行时内唯一的 goroutinetimerproc运行的。为了不阻塞timerproc的执行,必须启动一个新的goroutine执行到期的事件函数。

对于NewTimerAfter这两种创建方法,则是Timer在超时后,执行一个标准库中内置的函数:sendTime

func NewTimer(d Duration) *Timer {
    c := make(chan Time, 1)
    t := &Timer{
        C: c,
        r: runtimeTimer{
            when: when(d),
            f:    sendTime,
            arg:  c,
        },
    }
    startTimer(&t.r)
    return t
}

func sendTime(c interface{}, seq uintptr) {
    select {
    case c.(chan Time) <- Now():
    default:
    }
}

sendTime将当前时间发送到Timer的时间channel中。那么这个动作不会阻塞timerproc的执行么?答案是不会,原因是NewTimer创建的是一个带缓冲的channel所以无论Timer.C这个channel有没有接收方sendTime都可以非阻塞的将当前时间发送给Timer.C,而且sendTime中还加了双保险:通过select判断Timer.CBuffer是否已满,一旦满了,会直接退出,依然不会阻塞。

TimerStop方法可以阻止计时器触发,调用Stop方法成功停止了计时器的触发将会返回true,如果计时器已经过期了或者已经被Stop停止过了,再次调用Stop方法将会返回false

Go运行时将所有计时器维护在一个最小堆Min Heap中,Stop一个计时器就是从堆中删除该计时器。

Ticker计时器

Ticker可以周期性地触发时间事件,每次到达指定的时间间隔后都会触发事件。

time.Ticker需要通过time.NewTicker或者time.Tick创建。

// 使用time.Tick:
go func() {
    for t := range time.Tick(time.Minute) {
        fmt.Println("Tick at", t)
    }
}()

// 使用time.Ticker
var ticker *time.Ticker = time.NewTicker(1 * time.Second)

go func() {
    for t := range ticker.C {
        fmt.Println("Tick at", t)
    }
}()

time.Sleep(time.Second * 5)
ticker.Stop()     
fmt.Println("Ticker stopped")

不过time.Tick很少会被用到,除非你想在程序的整个生命周期里都使用time.Ticker的时间channel。官文文档里对time.Tick的描述是:

time.Tick底层的Ticker不能被垃圾收集器恢复;

所以使用time.Tick时一定要小心,为避免意外尽量使用time.NewTicker返回的Ticker替代。

NewTicker创建的计时器与NewTimer创建的计时器持有的时间channel一样都是带一个缓存的channel,每次触发后执行的函数也是sendTime,这样即保证了无论有误接收方Ticker触发时间事件时都不会阻塞:

func NewTicker(d Duration) *Ticker {
    if d <= 0 {
        panic(errors.New("non-positive interval for NewTicker"))
    }
    // Give the channel a 1-element time buffer.
    // If the client falls behind while reading, we drop ticks
    // on the floor until the client catches up.
    c := make(chan Time, 1)
    t := &Ticker{
        C: c,
        r: runtimeTimer{
            when:   when(d),
            period: int64(d),
            f:      sendTime,
            arg:    c,
        },
    }
    startTimer(&t.r)
    return t
}

Reset计时器时要注意的问题

关于Reset的使用建议,文档里的描述是:

重置计时器时必须注意不要与当前计时器到期发送时间到t.C的操作产生竞争。如果程序已经从t.C接收到值,则计时器是已知的已过期,并且t.Reset可以直接使用。如果程序尚未从t.C接收值,计时器必须先被停止,并且-如果使用t.Stop时报告计时器已过期,那么请排空其通道中值。

例如:

if !t.Stop() {
  <-t.C
}
t.Reset(d)

下面的例子里producer goroutine里每一秒向通道中发送一个false值,循环结束后等待一秒再往通道里发送一个true值。在consumer goroutine里通过循环试图从通道中读取值,用计时器设置了最长等待时间为5秒,如果计时器超时了,输出当前时间并进行下次循环尝试,如果从通道中读取出的不是期待的值(预期值是true),则尝试重新从通道中读取并重置计时器。

func main() {
    c := make(chan bool)

    go func() {
        for i := 0; i < 5; i++ {
            time.Sleep(time.Second * 1)
            c <- false
        }

        time.Sleep(time.Second * 1)
        c <- true
    }()

    go func() {
        // try to read from channel, block at most 5s.
        // if timeout, print time event and go on loop.
        // if read a message which is not the type we want(we want true, not false),
        // retry to read.
        timer := time.NewTimer(time.Second * 5)
        for {
            // timer is active , not fired, stop always returns true, no problems occurs.
            if !timer.Stop() {
                <-timer.C
            }
            timer.Reset(time.Second * 5)
            select {
            case b := <-c:
                if b == false {
                    fmt.Println(time.Now(), ":recv false. continue")
                    continue
                }
                //we want true, not false
                fmt.Println(time.Now(), ":recv true. return")
                return
            case <-timer.C:
                fmt.Println(time.Now(), ":timer expired")
                continue
            }
        }
    }()

    //to avoid that all goroutine blocks.
    var s string
    fmt.Scanln(&s)
}

程序的输出如下:

2020-05-13 12:49:48.90292 +0800 CST m=+1.004554120 :recv false. continue
2020-05-13 12:49:49.906087 +0800 CST m=+2.007748042 :recv false. continue
2020-05-13 12:49:50.910208 +0800 CST m=+3.011892138 :recv false. continue
2020-05-13 12:49:51.914291 +0800 CST m=+4.015997373 :recv false. continue
2020-05-13 12:49:52.916762 +0800 CST m=+5.018489240 :recv false. continue
2020-05-13 12:49:53.920384 +0800 CST m=+6.022129708 :recv true. return

目前来看没什么问题,使用Reset重置计时器也起作用了,接下来我们对producer goroutin做一些更改,我们把producer goroutine里每秒发送值的逻辑改成每6秒发送值,而consumer gouroutine里和计时器还是5秒就到期。

  // producer
    go func() {
        for i := 0; i < 5; i++ {
            time.Sleep(time.Second * 6)
            c <- false
        }

        time.Sleep(time.Second * 6)
        c <- true
    }()

再次运行会发现程序发生了deadlock在第一次报告计时器过期后直接阻塞住了:

2020-05-13 13:09:11.166976 +0800 CST m=+5.005266022 :timer expired

那程序是在哪阻塞住的呢?对就是在抽干timer.C通道时阻塞住了(英文叫做drain channel比喻成流干管道里的水,在程序里就是让timer.C管道中不再存在未接收的值)。

if !timer.Stop() {
    <-timer.C
}
timer.Reset(time.Second * 5)

producer goroutine的发送行为发生了变化,comsumer goroutine在收到第一个数据前有了一次计时器过期的事件,for循环进行一下次循环。这时timer.Stop函数返回的不再是true,而是false,因为计时器已经过期了,上面提到的维护着所有活跃计时器的最小堆中已经不包含该计时器了。而此时timer.C中并没有数据,接下来用于drain channel的代码会将consumer goroutine阻塞住。

这种情况,我们应该直接Reset计时器,而不用显式drain channel。如何将这两种情形合二为一呢?我们可以利用一个select来包裹drain channel的操作,这样无论channel中是否有数据,drain都不会阻塞住。

//consumer
    go func() {
        // try to read from channel, block at most 5s.
        // if timeout, print time event and go on loop.
        // if read a message which is not the type we want(we want true, not false),
        // retry to read.
        timer := time.NewTimer(time.Second * 5)
        for {
            // timer may be not active, and fired
            if !timer.Stop() {
                select {
                case <-timer.C: //try to drain from the channel
                default:
                }
            }
            timer.Reset(time.Second * 5)
            select {
            case b := <-c:
                if b == false {
                    fmt.Println(time.Now(), ":recv false. continue")
                    continue
                }
                //we want true, not false
                fmt.Println(time.Now(), ":recv true. return")
                return
            case <-timer.C:
                fmt.Println(time.Now(), ":timer expired")
                continue
            }
        }
    }()

运行修改后的程序,发现程序不会被阻塞住,能正常进行通道读取,读取到true值后会自行退出。输出结果如下:

2020-05-13 13:25:08.412679 +0800 CST m=+5.005475546 :timer expired
2020-05-13 13:25:09.409249 +0800 CST m=+6.002037341 :recv false. continue
2020-05-13 13:25:14.412282 +0800 CST m=+11.005029547 :timer expired
2020-05-13 13:25:15.414482 +0800 CST m=+12.007221569 :recv false. continue
2020-05-13 13:25:20.416826 +0800 CST m=+17.009524859 :timer expired
2020-05-13 13:25:21.418555 +0800 CST m=+18.011245687 :recv false. continue
2020-05-13 13:25:26.42388 +0800 CST m=+23.016530193 :timer expired
2020-05-13 13:25:27.42294 +0800 CST m=+24.015582511 :recv false. continue
2020-05-13 13:25:32.425666 +0800 CST m=+29.018267054 :timer expired
2020-05-13 13:25:33.428189 +0800 CST m=+30.020782483 :recv false. continue
2020-05-13 13:25:38.432428 +0800 CST m=+35.024980796 :timer expired
2020-05-13 13:25:39.428343 +0800 CST m=+36.020887629 :recv true. return

总结

以上比较详细地介绍了Go语言的计时器以及它们的使用方法和注意事项,总结一下有如下关键点:

  • TimerTicker都是在运行时计时器runtime.timer的基础上实现的。
  • 运行时里的所有计时器的事件函数都由运行时内唯一的goroutinetimerproc触发。
  • time.Tick创建的Ticker在运行时不会被gc回收,能不用就不用。
  • TimerTicker的时间channel都是带有一个缓冲的通道。
  • time.Aftertime.NewTimertime.NewTicker创建的计时器触发时都会执行sendTime
  • sendTime和计时器带缓存的时间通道保证了计时器不会阻塞程序。
  • Reset计时器时要注意drain channel和计时器过期存在竞争条件。

第1页.png

查看原文

Yourtion 收藏了文章 · 5月14日

系统服务化构建-状态码设计要点

Code状态码码是接口设计中的常见概念,本文主要讨论接口开发中Code码设计。从客户端和服务器端开发的角度,给出具体的工程实践建议和思考。

从笔者之前的一份接口文档定义开始说起,文档中定义的服务端接口输出格式如下

接口输出格式

返回数据由两部分构成,第一部分是对结果集的说明,第二部分是data节点

{
    "code": 4302,

    "message": "no sign",

    "time": 1487832032,

    "data": []
}

第一部分,无论错误与否,都会有如下片段。

code:信息代号

message:信息描述

time:接口返回时间

第二部分是具体数据如下:

data节点

我们可以 看到code=4302,4302并不是一个HTTP 协议状态码,而是一个业务状态码,是业务领域的含义,并非我们常见的HTTP 协议层面的响应状态码。

业务状态码与HTTP 状态码

在REST 接口设计规范中,我们通常都会被引导为这里的Code 应该是HTTP 协议状态码 200,404 或者501等。

实际上这是实践中的一种折中的方式,Code 会包含HTTP状态码和业务状态码

业界为什么会有这种实践,与客户端的解析数据方式有很大关系,下文中会给出答案。

说到这里,我们引出了两个概念,一个是业务状态码,一个是HTTP请求状态码。

两个概念很好理解

业务状态码

状态码对应.jpg

业务状态码是服务端给出的关于业务描述的码,用于客户端明确得知本次请求的资源的状态情况。上文例子中的4032被认为是一个缺少签名sign的业务状态码。有业务状态码输出表明当次HTTP 请求是通的。

业务状态码是可变的,没有业界标准,是一种资源状态描述,与HTTP响应状态码也不存在对应关系。

如下文图片HTTP-200 显示,接口是通的 HTTP 状态响应返回 200,但是业务没有执行成功,code用1 表示。

HTTP-200.png

HTTP/1.1 200 OK
Server: nginx
Date: Wed, 13 Nov 2019 01:27:03 GMT
Content-Type: application/json; charset=UTF-8
Transfer-Encoding: chunked
Connection: close
X-Powered-By: PHP/5.6.15
Access-Control-Allow-Origin: *
Access-Control-Allow-Methods: POST, PUT, GET, DELETE, OPTIONS
Access-Control-Allow-Headers: token, app-key, content-type, etcp-base
Access-Control-Max-Age: 86400
X-Frame-Options: SAMEORIGIN
X-Content-Type-Options: nosniff

{
  "code": 1,
  "message": "states is wrong",
  "data": []
}

HTTP状态码

HTTP请求状态码是HTTP协议的一部分,用于表明HTTP响应状态。

rest响应401.png

HTTP 状态码是HTTP 协议的工程实现,不符合协议规定的服务器端实现,我们可以认为 服务器的HTTP 实现是错误的。

这里举一个简单的幂等性例子,我们知道DELETE 方法是幂等的,如果之前已经删除过特定的资源,再次请求时也应该返回200的响应码,而不是404资源不存在的响应。

服务器端的开发实践

为什么上文中着重介绍状态码的两种分类,因为在业界开发中,这两种码会交叉使用,都有具体的使用场景,语义上不应该被混淆。

这里抛出几个问题

如何用Code码表明此次访问是连接成功的

如何用Code码表明此次访问达到了客户端预想的结果

客户端应该先接收HTTP状态码还是业务状态码

客户端HTTP 请求

先对本文中的客户端做一个简单定义,即调用服务器端接口的调用者,主要是前端WebView,安卓和iOS工程师,统称大前端。前端WebView的请求会涉及到跨域CORS

其实简单来说,客户端工程师最关心两个问题:

第一,接口有没有通。

第二,接口有没有返回我想要的数据。

有经验的客户端工程师会关心接口如果不通,返回提示是否可以指导我排除错误,或者说跟踪到问题所在。接下来接口设计是否合理,是否有隐患,就看工程师职业水平和职业素养了。

客户端排除法

客户端HTTP 请求的通用方法是采用排除法,什么是排除法,客户端在请求服务端的REST 接口时,会先在网络层面判断接口是否通,包括404或者200。客户端只关心本身有用的Code,其余都按异常处理。

网络层判断这个任务客户端会交给具体的HTTP 拦截器 (Intercept),之后才会接受当次接口的描述信息也就是data 和code,做业务前端处理。

axios 就是一个主要用于浏览器请求的HTTP 客户端,包含请求响应拦截器(Intercept request and response)

Promise based HTTP client for the browser and node.js

以下代码是两段响应拦截,分别是拦截HTTP 协议的401验证不通过和自定义业务代码的验证不通过。

HTTP 401

axios.interceptors.response.use(function (res) {
  return res.data;
}, function (res) {
  let response = res && res.response || {};
  if (response.status == 401) {
    tool.showToast('登录已过期,请重新登录。');
    tool.removeReUserInfo();
    location.hash = "#/login";
  } else {
    tool.showToast('请求数据失败,请稍后再试。');
  }
});

自定义业务代码

axios.interceptors.response.use(function (res) {
  Indicator.close(); //首先关闭所有的提示窗口
  var data = res.data;

  if (data.code == 4034) { //签名不合法
    tool.showToast('签名不合法。');
  } else if (data.code == 4033) { //token失效
    tool.showToast('登录已过期,自动登录中。');
    tool.removeUserInfo();
     location.hash = "#/login";
  } else {
    return data;
  }

}, function (err) {
  // alert(JSON.stringify(err));
  Indicator.close(); //首先关闭所有的提示窗口
  tool.showToast('请求数据失败,请稍后再试。');
});

安卓客户端拦截器

okhttp 是一个安卓平台的HTTP 客户端,其中包含一个网络拦截器(Network Interceptors)。网络状态码和业务状态码的截取都交给拦截器处理处理。

图片.png

设计倡导

这里我们重新梳理之前提出的三个问题,给出一些解决思路,同时总结一些经验

如何用Code码表明此次访问是连接成功的?

这里应该以HTTP 状态码为依据,主要有200, 401 ,表明请求是【触碰到关于的数据处理的业务部分了】如

HTTP/1.1 200 OK
{
  "code": 0,
  "message": "客户端已是最新版本",
  "data": {
    "code": 10
  },
  "debug_stack": []
}
HTTP/1.1 401 Unauthorized
Server: nginx

{
  "name": "Unauthorized",
  "message": "Token is expired",
  "code": 401,
  "status": 401,
  "type": "yii\\web\\HttpException"
}
如何用Code码表明此次访问达到了客户端预想的结果?

这里以业务状态码的数据为依据,获取到的就是真实的。Code可以用 0 表示。

{
  "code": 0,
  "message": "客户端已是最新版本",
  "data": {
    "code": 10
  },
  "debug_stack": []
}
客户端应该先接收HTTP 状态码还是业务状态码?

当然是先接收HTTP 状态码,其次是业务状态码,不混淆,也不能混淆。从软件分层的角度来说,接收HTTP 状态码在接收业务状态码的上层,通常由拦截器来做,比如token过期的401阻挡。

一般情况下,0表示成功,1表示业务操作失败。业务复杂时,需要维护多种业务状态码。下图是微信平台的业务状态码枚举,场景较多。

微信错误码.png

接口字段整齐

这里所说的字段整齐是指服务提供方给到的数据结构是完整的,最通用的,现在大部分接口格式如下

三个字段应该都存在,可以为空,避免NULL。

{
    "code": 200,
    "data": null,
    "message": "成功"
}

对于提供接口开发的服务者而言,code和message字段都会给出,存在异议的字段是data。更严谨的说法是 请求的资源描述中伙包含资源状态编码和描述信息,如message。

当data 没有数据时,有的工程师喜欢把data置为null,或者直接不返回data字段。这两种方式都不合理,都会增加调用方的判断成本,尤其是null,如果调用方写法不严谨的话,很容易引发程序异常。

接口总会有返回值,data字段就是实际的返回值,可以是空字符串,空数组,bool类型。

图灵奖得主Tony Hoare 曾经公开表达过null是一个糟糕的设计,null总是代表着不确定性。

null.jpeg

业务状态码不等于异常

业务状态码和异常是两个概念,切忌混淆。业务状态码指正常的业务处理结果的显示说明,而异常通常由于语法错误,数据缺失造成程序不能正常执行完成。不能通过业务状态码而屏蔽异常。

总结

本文从接口文档开始,引出了状态码的概念,细分为网络状态码和业务状态码。结合服务器端和客户端的编程角度,介绍了各自的使用场景。在分布式服务化的网络架构中,清晰的网络状态码和业务状态码有助于服务链路的跟踪和服务的链路跟踪,尤其是异常的定位和捕获。业务状态码应该趋于同一化,与网络状态码相互补充。

参考文档中给出一些资源,有兴趣的读者可以参考阅读。

参考文档

axios-interceptors-response-undefined

查看原文

认证与成就

  • 获得 34 次点赞
  • 获得 4 枚徽章 获得 0 枚金徽章, 获得 0 枚银徽章, 获得 4 枚铜徽章

擅长技能
编辑

(゚∀゚ )
暂时没有

开源项目 & 著作
编辑

(゚∀゚ )
暂时没有

注册于 2014-04-18
个人主页被 536 人浏览