simon_woo

simon_woo 查看完整档案

广州编辑华南理工大学  |  计算机科学与工程 编辑阿里巴巴  |  前端开发工程师 编辑 simonwoo.github.io 编辑
编辑

我是一个爱生活,爱摄影,爱代码的前端工程师。我要成为这个宇宙最牛逼的大神。

个人动态

simon_woo 关注了专栏 · 2019-12-12

pure render

一群志同道合的小伙伴,分享关于 React, Flux 在实践中的经验与想法 为 segmentfault 的小伙伴可以看到文章,特此同步在知乎上写的专栏 原作请到,http://zhuanlan.zhihu.com/purerender 未加允许,不得转载

关注 271

simon_woo 发布了文章 · 2019-07-10

基于Umi的开发方案

基于Umi的开发方案

Umi是阿里的一款基于React的企业级应用框架。本文将会从3个方面介绍下基于Umi的开发方案:

  • umi是什么?
  • 怎么使用umi?
  • umi是如何实现的?

    umi是什么

    umi是一款可插拔的企业级react应用框架,支持约定式路由以及各种进阶路由功能,并以此进行功能扩展,拥有完善的插件体系,覆盖从源码到构建产物的每个生命周期,支持各种功能扩展和业务需求。

    它有以下特性:

  • 开箱即用,内置react,react-router等
  • 约定式路由,同时支持可配置路由
  • 完善的插件体系
  • 支持typescript
  • 支持dva数据方案

一个umi工程的生命周期如下,它包含源码到上线的整个流程,umi 首先会加载用户的配置和插件,然后基于配置或者目录,生成一份路由配置,再基于此路由配置,把 JS/CSS 源码和 HTML 完整地串联起来。用户配置的参数和插件会影响流程里的每个环节。

生命周期

使用

创建一个umi项目

创建一个umi项目可通过2种方式,手工创建和脚手架创建。

手工创建

第一步,创建相关文件夹:

mkdir umi_app && cd umi_app
npm init
mkdir pages

第二步,增加npm script:

"scripts": {
    "start": "umi dev",
    "build": "umi build”,
}

第三步,增加依赖:

"devDependencies": {
    "umi": "^2.6.3"
}

第四步,在pages目录下,增加新模块。
最后,使用npm run start即可运行该项目。

脚手架创建

umi提供了脚手架工具create-umi来加快umi工程的创建。

mkdir umi_app && cd umi_app
create-umi
npm i

最后,使用npm run start即可运行项目,在localhost:8000访问该项目。

业务开发

创建出来的项目目录结构如下:

.
├── dist/                          // 默认的 build 输出目录
├── mock/                          // mock 文件所在目录,基于 express
├── config/
    ├── config.js                  // umi 配置,同 .umirc.js,二选一
└── src/                           // 源码目录,可选
    ├── layouts/index.js           // 全局布局
    ├── pages/                     // 页面目录,里面的文件即路由
        ├── .umi/                  // dev 临时目录,需添加到 .gitignore
        ├── .umi-production/       // build 临时目录,会自动删除
        ├── document.ejs           // HTML 模板
        ├── 404.js                 // 404 页面
        ├── page1.js               // 页面 1,任意命名,导出 react 组件
        ├── page1.test.js          // 用例文件,umi test 会匹配所有 .test.js 和 .e2e.js 结尾的文件
        └── page2.js               // 页面 2,任意命名
    ├── global.css                 // 约定的全局样式文件,自动引入,也可以用 global.less
    ├── global.js                  // 可以在这里加入 polyfill
    ├── app.js                     // 运行时配置文件
├── .umirc.js                      // umi 配置,同 config/config.js,二选一
├── .env                           // 环境变量
└── package.json

.umi目录是umi dev生成的临时目录,默认包含 umi.js 和 router.js。.umi-production是在umi build生成的临时目录。

通过命令行umi g page users生成users页面,在localhost:8000/users即可访问该页面。

同时,也可以使用dva作为状态管理工具配合umi进行开发,可参考umi + dva,完成用户管理的 CURD 应用

使用插件

基于umi的插件机制,你可以获得扩展项目的编译时和运行时的能力。通过插件支持的功能也会变得更强大,我们针对功能的需要可以去使用修改代码打包配置,修改启动代码,约定目录结构,修改 HTML 等更丰富接口。插件可以是一个 npm 包,也可以是路径直接引向一个 JS 的模块。用户通过配置 plugins 来使用插件。如下所示:

// .umirc.js
export default {
  plugins: [
    [
      'umi-plugin-dva',
      {
        immer: true,
      },
    ],
    [
      './src/plugins/customPlugin.js',
      {
        // plugin config
      },
    ],
  ],
};

如何实现

umi的插件机制非常优秀,我们通过umi dev进行分析,来窥探下umi的插件机制是如何实现的。umi的源代码地址:umi github。通过源代码看出,它是一个lerna的多packages项目,源代码在packages目录下。

umi dev的整个流程如下:
umi dev

umi包主要对外提供一些命令,如umi devumi buildumi inspectumi test

它通过实例化一个Service实例完成整个流程,new Service().run('dev', args);

Service的结构如下:
umiService.png

Service实例化之后,运行run方法,该方法有两步,第一步初始化,第二步运行对应的命令:
run

init方法对plugins中的各个plugin进行初始化:
initPlugin.png

initPlugin方法中通过Proxy对各个插件进行挂载更多的方法,dev命令注册的方法位于/umi-build-dev/src/plugins/commands/dev/index.js下。可以看出该命令最终通过af-webpack启动webpack运行起项目。

.umi目录是怎么生成的?

dev命令行中通过注册filesGenerator去生成.umi目录:
.umi

它通过chokidar对文件目录进行监控,当文件有更新时,会重新进行该目录的生成。

webpack配置如何封装?

Service是通过webpack-chain对webpack配置进行链式封装的:
webpack

查看原文

赞 6 收藏 1 评论 1

simon_woo 发布了文章 · 2019-06-25

Ant Design Pro 的 Docker 部署方式

背景

Ant Design Pro是一个企业级中后台解决方案,在Ant Design组件库的基础上,提炼出典型模板/业务组件/通用页等,在此基础上能够使开发者快速的完成中后台应用的开发。

在使用Ant Design Pro的过程中,可以发现它提供了一系列基于docker的开发部署方式,如下图。但是官方文档中并没有具体的介绍,本文的主要目的就是解析Ant Design Pro中对于docker的使用。

npm scripts

docker相关

为什么使用docker?

  • 环境部署是所有团队都必须面对的问题,随着系统越来越大,依赖的服务也越来越多,例如:Web服务器 + MySql数据库 + Redis缓存等
  • 依赖服务很多,本地搭建一套环境成本越来越高,初级人员很难解决环境部署中的一些问题
  • 服务的版本差异及OS的差异都可能导致线上环境BUG,项目引入新的服务时所有人的环境需要重新配置

任何安装过Docker的机器都可以运行这个容器获得同样的结果, 同的容器,从而完全消除了不同环境,不同版本可能引起的各种问题。例如,在前端开发中通常会遇到nodejs版本问题,就可以通过docker的方式进行解决。

docker中的概念

Docker有三个基本概念:镜像(image),容器(container),仓库(repository)

  • 镜像(image): 镜像中包含有需要运行的文件。镜像用来创建container,一个镜像可以运行多个container;镜像可以通过Dockerfile创建,也可以从Docker hub/registry上下载。
  • 容器(container): 容器是Docker的运行组件,启动一个镜像就是一个容器,容器是一个隔离环境,多个容器之间不会相互影响,保证容器中的程序运行在一个相对安全的环境中。
  • 仓库(repository): 共享和管理Docker镜像,用户可以上传或者下载上面的镜像,官方地址为 https://registry.hub.docker.com/ (类似于github对源代码的管理),也可以搭建自己私有的Docker registry。

常见docker命令

  • 使用当前目录Dockerfile创建镜像,标签为xxx:v1: docker build -t xxx:v1 .
  • 创建新容器并运行: docker run --name mynginx -d nginx:latest
  • 在容器中开启交互终端:docker exec -i -t container_id /bin/bash
  • 启动容器:docker start container_name/container_id
  • 停止容器:docker stop container_name/container_id
  • 重启容器:docker restart container_name/container_id

什么是docker-compose?

实际项目中,不可能只单单依赖于一个服务,例如一个常见的Web项目可能依赖于: 静态文件服务器,应用服务器,Mysql数据库等。我们可以通过分别启动单个镜像,并把镜像绑定到本地对应端口的形式进行部署,达到容器可通信的目的。但是为了更方便的管理多容器的情况,官方提供了docker-compose的方式。docker-compose是Docker的一种编排服务,是一个用于在 Docker 上定义并运行复杂应用的工具,可以让用户在集群中部署分布式应用。

compose中有两个重要的概念:

  • 服务 (service):一个应用的容器,实际上可以包括若干运行相同镜像的容器实例。
  • 项目 (project):由一组关联的应用容器组成的一个完整业务单元,在docker-compose.yml 文件中定义。

一个项目可以由多个服务(容器)关联而成,compose 面向项目进行管理,通过子命令对项目中的一组容器进行便捷地生命周期管理。

脚本解析

本地开发运行

npm run docker:dev该命令使用docker-compose up命令通过docker-compose.dev.yml模板启动相关容器。

docker-compose.dev.yml内容如下:
docker-compose-dev

这个compose文件定义了一个服务:ant-design-pro_dev。该服务使用Dockerfile.dev构建了当前镜像。将该容器内部的8000端口映射到host的8000端口。为了容器和host的数据同步,该容器挂载三个数据卷:../src:/usr/src/app/src, ../config:/usr/src/app/config, ../mock:/usr/src/app/mock。它将主机目录映射到容器,这样容器内的三个目录可以跟host对应的三个目录做到数据同步。

Dockerfile.dev内容如下:
dockerfile-dev.png
该容器使用node:latest作为基础镜像,并设定/usr/src/app作为工作目录。首先将package.json文件复制到该目录,并安装相关的依赖包,之后复制该文件夹下所有内容到该目录下,并使用npm run start启动应用。由于数据卷的存在,本地的三个文件夹下的任何修改都可以同步到容器中,达到更新的目的。

至此,整个开发环境搭建完成。

生产环境

npm run docker-pro:dev该命令使用docker-compose up命令通过docker-compose.yml模板启动相关容器。

docker-compose.yml内容如下:
docker-compose
这个compose文件定义了两个服务:ant-design-pro_buildant-design-pro_web

ant-design-pro_build使用Dockerfile构建镜像。

Dockerfile内容如下:
Dockerfile

该镜像实际只做了文件的构建(npm run build),构建的结果存放在/dist目录中,并通过数据卷共享该目录。由此看来,该容器只作为一个数据卷容器为其他镜像提供数据服务。

ant-design-pro_web使用nginx镜像,nginx容器的80端口绑定host的80端口,并将ant-design-pro_build的dist目录挂载到nginx服务器/usr/share/nginx/html目录,将nginx.conf挂载到/etc/nginx/conf.d/default.conf

容器视图如下:
compose.png

通过http://localhost:80就可以访问该部署了前端静态文件的nginx容器了。

生产镜像构建

npm run docker-hub:build命令通过docker build构建Dockerfile.hub定义的镜像。

Dockerfile.hub文件定义如下:

Dockerfile.hub

该Dockerfile使用了多阶段构建的方式,第一阶段构建出编译后的前端资源文件,第二阶段将第一阶段构建出来的前端资源文件复制到nginx的指定目录,构建出一个包含前端静态资源以及nginx.conf的镜像。

在指定服务器直接运行该镜像,便可以使用服务。

顺便提一句,这个Dockerfile.hub文件是由我提给官方并合并到master分支的,详情见:https://github.com/ant-design... 。还是有小小的激动的。

Reference

查看原文

赞 1 收藏 0 评论 0

simon_woo 发布了文章 · 2019-06-20

小团队适用的git-workflow

主流程

gitworkflow

涉及分支

  • master分支:主干分支,用于发布到生成环境,master分支上的commit源自于release分支和hotfix分支的合并,每个commit都应该有相应的tag。
  • develop分支:主开发分支,feature分支和release分支都是基于该分支进行,所有开发经过code review后都会应用到该分支。
  • feature分支:开发新功能使用的分支,它从develop分支切出,并最终将会合并到develop分支。feature分支的命名规范为:feature-YYYYMMDD-特性(以下划线分割),例:feature-20180701-xxx
  • release分支:发布分支,当完成某个里程碑的开发时,需要发布版本。这是从develop分支切出release分支,进行发布。针对发布后接收到的反馈,在该release分支进行bugfix,当该分支稳定时,分别合并到master和develop分支,之后删除该release分支。release分支的命名规范为:release-版本号,例:release-1.0.0
  • hotfix分支:针对线上版本进行bugfix,然后分别合并到master和develop分支,hotfix分支的命名规范为:hotfix-bug, 例:hotfix-table_height_fix

Git flow流程

开发新分支

建立新分支 - feature分支

git checkout develop // 切换到develop分支

git pull // 与远程分支同步develop

git checkout -b feature-20180701-xxx// 建立新分支,进行开发

将新分支推到远程

git checkout develop // 切换到develop分支

git pull // 与远程分支同步develop

git checkout feature-20180701-xxxx // 切换到你的开发分支

git rebase develop // 基于最新的develop分支进行代码合并,解决冲突

git push origin feature-20180701-xxx // 将你的分支推向远程

pull request

去gitlab发起一个针对develop分支的pull request

需了解:

  • git rebase vs git merge ?
  • 代码冲突如何解决 ?
  • 如何发起一个pull request?

发布

建立release分支

git checkout develop // 切换到develop分支

git pull // 与远程分支同步develop

git checkout -b release-1.0.0 // 建立release分支

git push origin release-1.0.0 // 推向远程

基于Release分支进行bugfix

git checkout release-1.0.0 // 切换到release分支

git pull // 与远程同步release

git checkout -b release-1.0.0_bugfix // 基于release分支建立bugfix分支,进行开发

git push origin release-1.0.0_bugfix // 推向远程

pull request

去gitlab发起一个针对release分支的pull request

将release分支分别合并到master和develop,对master分支进行打tag

git branch -D release-1.0.0 // 删除该release分支

线上bugfix

同上,hotfix是基于master的。

将hotfix分别合并到master和develop,对master分支进行打tag

其他常见git操作

  • 将n个commit合成一个commit,git rebase -i HEAD~n
  • 将某个分支的某个commit放到另外一个分支上,git cherry-pick commitID
  • 重命名commit,git commit --amend

工具

  • bash-git-promot - An informative and fancy bash prompt for Git users
  • source-tree - git client

Reference

查看原文

赞 2 收藏 1 评论 0

simon_woo 发布了文章 · 2019-06-15

基于前端技术生成PDF方案

需求背景

  • 业务系统需要预览报告(如产品周报,体检报告等)并生成pdf格式供用户下载,或者定期发送给指定用户
  • 报告格式相对固定,由文本,图片和图表组成,基本与前端页面保持一致

解决方案

需求分为两步:报告预览和报告生成。

  • 报告预览在前端进行展示,可使用前端技术,如React/Vue等技术栈对其进行还原,数据从服务端获取。
  • 报告生成需要对第一步生成的HTML进行PDF的转换生成,HTML2PDF的方式又分为两种:

    • 基于canvas的客户端生成方案
    • 基于nodejs + puppeteer的服务端生成方案

一个完整的案例

下面以一个体检报告的案例进行这两种方案的说明:
体检报告展示形式如下,格式相对固定,分为四个页面:个人信息页,建议页,原理页,个人信息页与建议页数据来源于服务器。
report.png
clipboard.png

基于canvas的客户端生成方案

canvas是HTML5标准中新增的元素,可用于通过使用JS的脚本来绘制图形。canvas提供了toDataURL/toBlob方法,用于把canvas中的内容转换为图片,API文档如下(来源于MDN):

canvas api

由于HTML文档再浏览器中是以DOM树的形式存在,所以我们可以通过三步完成HTML到PDF的转换:

  • 将DOM树转换为canvas对象,可使用html2canvas完成
  • 将canvas转换为图片,可使用canvas.toDataURL完成
  • 将图片转换为PDF,可使用jsPDF完成

完整代码实现:https://github.com/simonwoo/d...

截图如下,点击下载按钮可进行pdf生产:

clipboard.png

该方案完全基于客户端的方式生成,不需要服务器进行支持。在使用该方案的过程中,发现了一些问题:

  • 生产的PDF比较模糊,质量不高
  • 如果HTML中有外链图片,无法生成
  • 由于第一步是通过DOM去生成canvas,所以针对特别长的报告,DOM尚未加载完便点击下载时,会造成报告生成问题
  • 因为是客户端方案,所以需要用户主动触发生成,但对于一些定期发送给用户的报告,该方案无法使用

基于nodejs + puppeteer的服务端生成方案

puppeteer是google推出的headless浏览器,即没有图形界面的浏览器,但又可以实现普通浏览器HTML/JS/CSS的渲染,以及其他基本浏览器功能。你可以理解为一个没有界面的Chrome浏览器。主要有以下几种使用场景:

  • 生成页面的截图和PDF
  • 抓取SPA并生成预先呈现的内容(即“SSR”)
  • 爬虫,从网站抓取你需要的内容
  • 自动化测试,自动表单提交,UI测试,键盘输入等
  • 创建一个最新的自动化测试环境。使用最新的JavaScript和浏览器功能,直接在最新版本的Chrome中运行测试

通过理解puppeteer的功能,我们可以开启一个实例去渲染HTML报告,然后再利用其提供的转换PDF功能进行PDF的生成。

两个重要的API:

  • page.goto(url, [options]) - 打开指定url的文件,可以是本地文件(file://)也可以是网络文件(http://
  • page.pdf([options]) - 转换页面成PDF文件

puppeteer使用一个小例子,将百度网页转换为pdf:

puppeteer使用

完整代码如下:

项目启动流程如下:

  • 进入到webapp目录,使用npm install和npm run start启动前端服务器,地址为:localhost:3000
  • 进入到server目录,使用npm install和npm run dev启动node服务器,地址为:localhost:7001

整个服务架构如下:
架构图

node服务器通过路由增加一个pdf生成的controller,该controller通过启动puppeteer实例去加载localhost:3000的页面并生成pdf。直接在浏览器中通过http://localhost:7001/pdf即可访问到生成的pdf.

在实际环境中,前端页面可部署在nginx服务器上或者直接放在Node服务器上,puppeteer也支持使用cookie的操作,这样可以避免一些需要身份认证的问题。

相比客户端生成方式,使用puppeteer生成的pdf质量比较高,可满足生产要求。

本文中提到的两种方案中,均省去了ajax后端请求数据部分,读者可根据需要自行增加。

Reference

原文链接:https://juejin.im/post/5d036a...

查看原文

赞 6 收藏 6 评论 1

simon_woo 发布了文章 · 2019-06-10

工作中常用的npm包

工具类

lodash

工具库,封装了处理arrays,numbers,objects,string等常见的函数,是对标准库的补充。业务开发中常用的函数有很多,如:assign, times, debounce, get, find, filter, keyBy, cloneDeep, groupBy, omit, pick等。示例如下:

const _ = require('lodash');

const obj = {
    name: 'zhangsan',
    age: 20,
    friends: [{
        name: 'lisi',
        age: 18
    }, {
        name: 'wanger',
        age: 19
    }]
};

// 深复制,等价于JSON.parse(JSON.stringify(obj)),但是JSON.parse的形式无法复制函数
const obj2 = _.cloneDeep(obj); 
obj === obj2; // false

// 深获取属性,业务开发中可以取代if(a && a.b && a.b.c)的形式
_.get(obj, 'friends[0].age', 0); // 18

// extend properties
_.assign({}, {
    name: 'zhangsan'
}, {
    age: 20
}); // { name: 'zhangsan', age: 20 }

// remove properties
_.omit({
    name: 'zhangsan',
    age: 20
}, ['age']); // { name: 'zhangsan'}

// pick properties
_.pick({
    name: 'zhangsan',
    age: 20
}, ['age']); // { age: 20}

const arr = [{
        name: 'zhangsan',
        age: 20
    },
    {
        name: 'lisi',
        age: 19
    },
    {
        name: 'wanger',
        age: 18
    }
];

_.keyBy(arr, 'name'); // { zhangsan: { name: 'zhangsan', age: 20 }, lisi: { name: 'lisi', age: 19 }, wanger: { name: 'wanger', age: 18 } }

// debounce(消抖,停止的时候执行一次)和throttle(节流,每间隔一定时候执行一次)
_.debounce(keyword => {
    console.log(keyword);
}, 500);

// N loop
_.times(6, _.uniqueId.bind(null, 'key_')); // [ 'key_1', 'key_2', 'key_3', 'key_4', 'key_5', 'key_6' ]

// 安全的JSON.parse
function safeParse(str, defaultValue) {
    let result = _.attempt(function(str) {
        return JSON.parse(str)
    }, str)

    return _.isError(result) ? defalutValue : result
}

safeParse(JSON.stringify({
    a: 1
}), {}); // { a: 1 }

is-type-of

js类型判断库,可判断js中的类型,包括promise,generator等。示例如下:

is.array([1]) // => true
is.null(null) // => true
is.undefined(undefined) // => true
is.object({a: 1}) // => true

numeral

格式化处理数字。示例如下:

const TEN_THOUSANDS = 10000
const HUNDRED_MILLION = 100000000

numeral(345333).format('0,0’) => ‘345,333’ // 整数处理
numeral(3.45333).format('0.00’) => ‘3.45'   // 保留两位小数
numeral(0.9756).format('0.00%’) => ’97.56%’ // 百分比处理

numeral(14321235334343).divide(HUNDRED_MILLION).format('0,0.00’) => ‘143,212.35亿’ //亿处理
numeral(143212353).divide(TEN_THOUSANDS).format('¥0,0.00') => '14,321.24'万 //万处理

// 格式化数字, 大于亿的展示为亿,大于万的展示为万...
formatNum() {
    if(number > HUNDREND_MILLION) {
        return numeral(number).divide(HUNDREND_MILLION).format(‘0,0.00’) + ‘亿'
    } else if(number > TEN_THOUSANDS) {
        return numeral(number).divide(TEN_THOUSANDS).format(‘0,0.00’) + ‘万'    
    } else {
        return numeral(number).format(‘0,0.00')
    }
}

moment.js/day.js

时间处理库。示例如下:

moment().format(‘YYYY-MM-DD')

Excel处理

json文件转excel文件

excel-export库,基于node.js将数据生成excel文件,生成格式为xlsx。

// json转excel
const nodeExcel = require('excel-export');
const _ = require('lodash');
const fs = require('co-fs');
const co = require('co');
/** 
 * 使用场景:
 * 导出Excel
 * 
 * 用法:
 * params:
 *  - name: excel文件名称
 *  - path: 导出的excel路径
 * 
 * rows:
 * [
 *    {
 *      name: ''
 *      _created_at: ''
 *    },
 * .....
 * ]
 * 
 * cols:
 * [
 *   {
 *     key: 'name',
 *     text: '名称'
 *   },
 *   {
 *     key: '_created_at',
 *     text: '提交时间',
 *     filter: dateStr => {
 *        return dateStr.length === 0 ? dateStr : dateStr.split('.')[0].replace('T', ' ');
 *     }
 *   }
 * ];
 */
function wrapConf(rows, cols) {
    let conf = {};
    conf.name = 'sheet1';
    conf.cols = cols.map(item => {
        return {
            caption: item.text,
            type: item.type || 'string',
            width: item.width || 20
        };
    });

    conf.rows = rows.map((row) => {
        return cols.map((col) => {
            if (col.filter) {
                return col.filter(_.get(row, col.key), row);
            } else {
                return _.get(row, col.key);
            }
        });
    });

    return conf;
}

function* exportExcel(path, rows, cols) {
    let conf = wrapConf(rows, cols); // 配置项
    let result = nodeExcel.execute(conf); // 导出excel
    return yield fs.writeFile(path, result, 'binary'); // 写入到路径
}

module.exports = exportExcel;

excel文件转json文件

js-xlsx库,读取和解析多种格式表格的js库。

// excel转json
const fs = require('co-fs');
const co = require('co');
const XLSX = require('xlsx');
// {
//     SheetNames: ['sheet1', 'sheet2'],
//     Sheets: {
//         // worksheet
//         'sheet1': {
//             // cell
//             'A1': { ... },
//             // cell
//             'A2': { ... },
//             ...
//         },
//         // worksheet
//         'sheet2': {
//             // cell
//             'A1': { ... },
//             // cell
//             'A2': { ... },
//             ...
//         }
//     }
// }
function toJson(workbook, keys) {
    let result = {};
    let sheetNames = workbook.SheetNames;
    sheetNames.forEach(sheetName => {
        let worksheet = workbook.Sheets[sheetName];
        result[sheetName] = XLSX.utils.sheet_to_json(worksheet, {
            header: keys
        });
    });

    return result;
};
/**
 * 
 * 
 * @param {any} src: excel文件地址
 * @param {any} dest: 导出json文件地址
 * @param {any} keys: excel列名映射到json的keys 
 * @returns 
 */
function* excelToJson(src, dest, keys) {
    let data = yield fs.readFile(src)
    let json = toJson(XLSX.read(data))
    return yield fs.writeFile(dest, JSON.stringify(json, 2, 2))
}

module.exports = excelToJson;

Markdown

markdown文件转html文件,使用marked-toc(生成文件目录),marked(markdown解析库),hightlight(代码高亮)。

const md5 = require('md5');
const markedToc = require('marked-toc');
const marked = require('marked');
const highlight = require('highlight');
const fs = require('fs');

function generateTocName(name) {
    name = name.trim().replace(/\s+/g, '').replace(/\)/g, '').replace(/[\(\,]/g, '-').toLowerCase();
    if (/^[\w\-]+$/.test(name)) {
        return name;
    }

    return `toc-${md5(name).slice(0, 3)}`;
}

// filePath为markdown文件目录
function markdownToHtml(filePath) {
    let content = fs.readFileSync(filePath, 'utf8');

    let tocContent = marked(markedToc(content)).replace(/<a\s+href="#([^\"]+)">([^<>]+)<\/a>/g, (a, b, c) => {
        return `<a href="#${generateTocName(c)}">${c}</a>`;
    });

    let markedContent = marked(content).replace(/<h(\d)[^<>]*>(.*?)<\/h\1>/g, (a, b, c) => {
        if (b == 2) {
            return `<h${b} id="${generateTocName(c)}">${c}</h${b}>`;
        }
        return `<h${b} id="${generateTocName(c)}"><a class="anchor" href="#${generateTocName(c)}"></a>${c}</h${b}>`;
    });
    markedContent = markedContent.replace(/<h(\d)[^<>]*>([^<>]+)<\/h\1>/, (a, b, c) => {
        return `${a}<div class="toc">${tocContent}</div>`;
    });

    let highlightContent = markedContent.replace(/<pre><code\s*(?:class="lang-(\w+)")?>([\s\S]+?)<\/code><\/pre>/mg, (a, language, text) => {
        text = text.replace(/&#39;/g, '\'').replace(/&gt;/g, '>').replace(/&lt;/g, '<').replace(/\&quot;/g, '"').replace(/\&amp;/g, "&");
        var result = highlight.highlightAuto(text, language ? [language] : undefined);
        return `<pre><code class="hljs lang-${result.language}">${result.value}</code></pre>`;
    });

    return highlightContent;
}
查看原文

赞 1 收藏 0 评论 0

simon_woo 赞了文章 · 2018-04-21

Vue原理解析之Virtual Dom

DOM是文档对象模型(Document Object Model)的简写,在浏览器中我们可以通过js来操作DOM,但是这样的操作性能很差,于是Virtual Dom应运而生。我的理解,Virtual Dom就是在js中模拟DOM对象树来优化DOM操作的一种技术或思路。

本文将对于Vue框架2.1.8版本中使用的Virtual Dom进行分析。

VNode对象

一个VNode的实例对象包含了以下属性

  • tag: 当前节点的标签名

  • data: 当前节点的数据对象,具体包含哪些字段可以参考vue源码types/vnode.d.ts中对VNodeData的定义
    clipboard.png

  • children: 数组类型,包含了当前节点的子节点

  • text: 当前节点的文本,一般文本节点或注释节点会有该属性

  • elm: 当前虚拟节点对应的真实的dom节点

  • ns: 节点的namespace

  • context: 编译作用域

  • functionalContext: 函数化组件的作用域

  • key: 节点的key属性,用于作为节点的标识,有利于patch的优化

  • componentOptions: 创建组件实例时会用到的选项信息

  • child: 当前节点对应的组件实例

  • parent: 组件的占位节点

  • raw: raw html

  • isStatic: 静态节点的标识

  • isRootInsert: 是否作为根节点插入,被<transition>包裹的节点,该属性的值为false

  • isComment: 当前节点是否是注释节点

  • isCloned: 当前节点是否为克隆节点

  • isOnce: 当前节点是否有v-once指令

VNode分类

clipboard.png

VNode可以理解为vue框架的虚拟dom的基类,通过new实例化的VNode大致可以分为几类

  • EmptyVNode: 没有内容的注释节点

  • TextVNode: 文本节点

  • ElementVNode: 普通元素节点

  • ComponentVNode: 组件节点

  • CloneVNode: 克隆节点,可以是以上任意类型的节点,唯一的区别在于isCloned属性为true

  • ...

createElement解析

const SIMPLE_NORMALIZE = 1
const ALWAYS_NORMALIZE = 2

function createElement (context, tag, data, children, normalizationType, alwaysNormalize) {
  // 兼容不传data的情况
  if (Array.isArray(data) || isPrimitive(data)) {
    normalizationType = children
    children = data
    data = undefined
  }
  // 如果alwaysNormalize是true
  // 那么normalizationType应该设置为常量ALWAYS_NORMALIZE的值
  if (alwaysNormalize) normalizationType = ALWAYS_NORMALIZE
  // 调用_createElement创建虚拟节点
  return _createElement(context, tag, data, children, normalizationType)
}

function _createElement (context, tag, data, children, normalizationType) {
  /**
   * 如果存在data.__ob__,说明data是被Observer观察的数据
   * 不能用作虚拟节点的data
   * 需要抛出警告,并返回一个空节点
   * 
   * 被监控的data不能被用作vnode渲染的数据的原因是:
   * data在vnode渲染过程中可能会被改变,这样会触发监控,导致不符合预期的操作
   */
  if (data && data.__ob__) {
    process.env.NODE_ENV !== 'production' && warn(
      `Avoid using observed data object as vnode data: ${JSON.stringify(data)}\n` +
      'Always create fresh vnode data objects in each render!',
      context
    )
    return createEmptyVNode()
  }
  // 当组件的is属性被设置为一个falsy的值
  // Vue将不会知道要把这个组件渲染成什么
  // 所以渲染一个空节点
  if (!tag) {
    return createEmptyVNode()
  }
  // 作用域插槽
  if (Array.isArray(children) &&
      typeof children[0] === 'function') {
    data = data || {}
    data.scopedSlots = { default: children[0] }
    children.length = 0
  }
  // 根据normalizationType的值,选择不同的处理方法
  if (normalizationType === ALWAYS_NORMALIZE) {
    children = normalizeChildren(children)
  } else if (normalizationType === SIMPLE_NORMALIZE) {
    children = simpleNormalizeChildren(children)
  }
  let vnode, ns
  // 如果标签名是字符串类型
  if (typeof tag === 'string') {
    let Ctor
    // 获取标签名的命名空间
    ns = config.getTagNamespace(tag)
    // 判断是否为保留标签
    if (config.isReservedTag(tag)) {
      // 如果是保留标签,就创建一个这样的vnode
      vnode = new VNode(
        config.parsePlatformTagName(tag), data, children,
        undefined, undefined, context
      )
      // 如果不是保留标签,那么我们将尝试从vm的components上查找是否有这个标签的定义
    } else if ((Ctor = resolveAsset(context.$options, 'components', tag))) {
      // 如果找到了这个标签的定义,就以此创建虚拟组件节点
      vnode = createComponent(Ctor, data, context, children, tag)
    } else {
      // 兜底方案,正常创建一个vnode
      vnode = new VNode(
        tag, data, children,
        undefined, undefined, context
      )
    }
    // 当tag不是字符串的时候,我们认为tag是组件的构造类
    // 所以直接创建
  } else {
    vnode = createComponent(tag, data, context, children)
  }
  // 如果有vnode
  if (vnode) {
    // 如果有namespace,就应用下namespace,然后返回vnode
    if (ns) applyNS(vnode, ns)
    return vnode
  // 否则,返回一个空节点
  } else {
    return createEmptyVNode()
  }
}

简单的梳理了一个流程图,可以参考下

clipboard.png

patch原理

patch函数的定义在src/core/vdom/patch.js中,我们先来看下这个函数的逻辑

patch函数接收6个参数:

  • oldVnode: 旧的虚拟节点或旧的真实dom节点

  • vnode: 新的虚拟节点

  • hydrating: 是否要跟真是dom混合

  • removeOnly: 特殊flag,用于<transition-group>组件

  • parentElm: 父节点

  • refElm: 新节点将插入到refElm之前

patch的策略是:

  1. 如果vnode不存在但是oldVnode存在,说明意图是要销毁老节点,那么就调用invokeDestroyHook(oldVnode)来进行销毁

  2. 如果oldVnode不存在但是vnode存在,说明意图是要创建新节点,那么就调用createElm来创建新节点

  3. vnodeoldVnode都存在时

    • 如果oldVnodevnode是同一个节点,就调用patchVnode来进行patch

    • vnodeoldVnode不是同一个节点时,如果oldVnode是真实dom节点或hydrating设置为true,需要用hydrate函数将虚拟dom和真是dom进行映射,然后将oldVnode设置为对应的虚拟dom,找到oldVnode.elm的父节点,根据vnode创建一个真实dom节点并插入到该父节点中oldVnode.elm的位置

    这里面值得一提的是patchVnode函数,因为真正的patch算法是由它来实现的(patchVnode中更新子节点的算法其实是在updateChildren函数中实现的,为了便于理解,我统一放到patchVnode中来解释)。

patchVnode算法是:

  1. 如果oldVnodevnode完全一致,那么不需要做任何事情

  2. 如果oldVnodevnode都是静态节点,且具有相同的key,当vnode是克隆节点或是v-once指令控制的节点时,只需要把oldVnode.elmoldVnode.child都复制到vnode上,也不用再有其他操作

  3. 否则,如果vnode不是文本节点或注释节点

    • 如果oldVnodevnode都有子节点,且2方的子节点不完全一致,就执行更新子节点的操作(这一部分其实是在updateChildren函数中实现),算法如下

      • 分别获取oldVnodevnodefirstChildlastChild,赋值给oldStartVnodeoldEndVnodenewStartVnodenewEndVnode

      • 如果oldStartVnodenewStartVnode是同一节点,调用patchVnode进行patch,然后将oldStartVnodenewStartVnode都设置为下一个子节点,重复上述流程
        clipboard.png

      • 如果oldEndVnodenewEndVnode是同一节点,调用patchVnode进行patch,然后将oldEndVnodenewEndVnode都设置为上一个子节点,重复上述流程
        clipboard.png

      • 如果oldStartVnodenewEndVnode是同一节点,调用patchVnode进行patch,如果removeOnlyfalse,那么可以把oldStartVnode.elm移动到oldEndVnode.elm之后,然后把oldStartVnode设置为下一个节点,newEndVnode设置为上一个节点,重复上述流程
        clipboard.png

      • 如果newStartVnodeoldEndVnode是同一节点,调用patchVnode进行patch,如果removeOnlyfalse,那么可以把oldEndVnode.elm移动到oldStartVnode.elm之前,然后把newStartVnode设置为下一个节点,oldEndVnode设置为上一个节点,重复上述流程
        clipboard.png

      • 如果以上都不匹配,就尝试在oldChildren中寻找跟newStartVnode具有相同key的节点,如果找不到相同key的节点,说明newStartVnode是一个新节点,就创建一个,然后把newStartVnode设置为下一个节点

      • 如果上一步找到了跟newStartVnode相同key的节点,那么通过其他属性的比较来判断这2个节点是否是同一个节点,如果是,就调用patchVnode进行patch,如果removeOnlyfalse,就把newStartVnode.elm插入到oldStartVnode.elm之前,把newStartVnode设置为下一个节点,重复上述流程
        clipboard.png

      • 如果在oldChildren中没有寻找到newStartVnode的同一节点,那就创建一个新节点,把newStartVnode设置为下一个节点,重复上述流程

      • 如果oldStartVnodeoldEndVnode重合了,并且newStartVnodenewEndVnode也重合了,这个循环就结束了

    • 如果只有oldVnode有子节点,那就把这些节点都删除

    • 如果只有vnode有子节点,那就创建这些子节点

    • 如果oldVnodevnode都没有子节点,但是oldVnode是文本节点或注释节点,就把vnode.elm的文本设置为空字符串

  4. 如果vnode是文本节点或注释节点,但是vnode.text != oldVnode.text时,只需要更新vnode.elm的文本内容就可以

生命周期

patch提供了5个生命周期钩子,分别是

  • create: 创建patch时

  • activate: 激活组件时

  • update: 更新节点时

  • remove: 移除节点时

  • destroy: 销毁节点时

这些钩子是提供给Vue内部的directives/ref/attrs/style等模块使用的,方便这些模块在patch的不同阶段进行相应的操作,这里模块定义在src/core/vdom/modulessrc/platforms/web/runtime/modules2个目录中

vnode也提供了生命周期钩子,分别是

  • init: vdom初始化时

  • create: vdom创建时

  • prepatch: patch之前

  • insert: vdom插入后

  • update: vdom更新前

  • postpatch: patch之后

  • remove: vdom移除时

  • destroy: vdom销毁时

vue组件的生命周期底层其实就依赖于vnode的生命周期,在src/core/vdom/create-component.js中我们可以看到,vue为自己的组件vnode已经写好了默认的init/prepatch/insert/destroy,而vue组件的mounted/activated就是在insert中触发的,deactivated就是在destroy中触发的

实践

在Vue里面,Vue.prototype.$createElement对应vdom的createElement方法,Vue.prototype.__patch__对应patch方法,我写了个简单的demo来验证下功能

<p data-height="265" data-theme-id="0" data-slug-hash="rjZKZz" data-default-tab="html,result" data-user="JoeRay" data-embed-version="2" data-pen-title="Vue Virtual Dom" class="codepen">See the Pen Vue Virtual Dom by zhulei (@JoeRay) on CodePen.</p>
<script async data-original="https://production-assets.cod...

查看原文

赞 131 收藏 184 评论 12

simon_woo 赞了文章 · 2018-01-03

《Node.js设计模式》欢迎来到Node.js平台

本系列文章为《Node.js Design Patterns Second Edition》的原文翻译和读书笔记,在GitHub连载更新,同步翻译版链接

欢迎关注我的专栏,之后的博文将在专栏同步:

Welcom to the Node.js Platform

Node.js 的发展

  • 技术本身的发展
  • 庞大的Node.js生态圈的发展
  • 官方组织的维护

Node.js的特点

小模块

package的形式尽可能多的复用模块,原则上每个模块的容量尽量小而精。

原则:

  • "Small is beautiful" ---小而精
  • "Make each program do one thing well" ---单一职责原则

因此,一个Node.js应用由多个包搭建而成,包管理器(npm)的管理使得他们相互依赖而不起冲突。

如果设计一个Node.js的模块,尽可能做到以下三点:

  • 易于理解和使用
  • 易于测试和维护
  • 考虑到对客户端(浏览器)的支持更友好

以及,Don't Repeat Yourself(DRY)复用性原则。

以接口形式提供

每个Node.js模块都是一个函数(类也是以构造函数的形式呈现),我们只需要调用相关API即可,而不需要知道其它模块的实现。Node.js模块是为了使用它们而创建,不仅仅是在拓展性上,更要考虑到维护性和可用性。

简单且实用

“简单就是终极的复杂” ————达尔文

遵循KISS(Keep It Simple, Stupid)原则,即优秀的简洁的设计,能够更有效地传递信息。

设计必须很简单,无论在实现还是接口上,更重要的是实现比接口更简单,简单是重要的设计原则。

我们做一个设计简单,功能完备,而不是完美的软件:

  • 实现起来需要更少的努力
  • 允许用更少的速度进行更快的运输资源
  • 具有伸缩性,更易于维护和理解
  • 促进社区贡献,允许软件本身的成长和改进

而对于Node.js而言,因为其支持JavaScript,简单和函数、闭包、对象等特性,可取代复杂的面向对象的类语法。如单例模式和装饰者模式,它们在面向对象的语言都需要很复杂的实现,而对于JavaScript则较为简单。

介绍Node.js 6 和 ES2015的新语法

let和const关键字

ES5之前,只有函数和全局作用域。

if (false) {
  var x = "hello";
}

console.log(x); // undefined

现在用let,创建词法作用域,则会报出一个错误Uncaught ReferenceError: x is not defined

if (false) {
  let x = "hello";
}

console.log(x);

在循环语句中使用let,也会报错Uncaught ReferenceError: i is not defined

for (let i = 0; i < 10; i++) {
  // do something here
}

console.log(i);

使用letconst关键字,可以让代码更安全,如果意外的访问另一个作用域的变量,更容易发现错误。

使用const关键字声明变量,变量不会被意外更改。

const x = 'This will never change';
x = '...';

这里会报出一个错误Uncaught TypeError: Assignment to constant variable.

但是对于对象属性的更改,const显得毫无办法:

const x = {};
x.name = 'John';

上述代码并不会报错

但是如果直接更改对象,还是会抛出一个错误。

const x = {};
x = null;

实际运用中,我们使用const引入模块,防止意外被更改:

const path = require('path');
let path = './some/path';

上述代码会报错,提醒我们意外更改了模块。

如果需要创建不可变对象,只是简单的使用const是不够的,需要使用Object.freeze()deep-freeze

我看了一下源码,其实很少,就是递归使用Object.freeze()

module.exports = function deepFreeze (o) {
  Object.freeze(o);

  Object.getOwnPropertyNames(o).forEach(function (prop) {
    if (o.hasOwnProperty(prop)
    && o[prop] !== null
    && (typeof o[prop] === "object" || typeof o[prop] === "function")
    && !Object.isFrozen(o[prop])) {
      deepFreeze(o[prop]);
    }
  });
  
  return o;
};

箭头函数

箭头函数更易于理解,特别是在我们定义回调的时候:

const numbers = [2, 6, 7, 8, 1];
const even = numbers.filter(function(x) {
  return x % 2 === 0;
});

使用箭头函数语法,更简洁:

const numbers = [2, 6, 7, 8, 1];
const even = numbers.filter(x => x % 2 === 0);

如果不止一个return语句则使用=> {}

const numbers = [2, 6, 7, 8, 1];
const even = numbers.filter((x) => {
  if (x % 2 === 0) {
    console.log(x + ' is even');
    return true;
  }
});

最重要是,箭头函数绑定了它的词法作用域,其this与父级代码块的this相同。

function DelayedGreeter(name) {
  this.name = name;
}

DelayedGreeter.prototype.greet = function() {
  setTimeout(function cb() {
    console.log('Hello' + this.name);
  }, 500);
}

const greeter = new DelayedGreeter('World');
greeter.greet(); // 'Hello'

要解决这个问题,使用箭头函数或bind

function DelayedGreeter(name) {
  this.name = name;
}

DelayedGreeter.prototype.greet = function() {
  setTimeout(function cb() {
    console.log('Hello' + this.name);
  }.bind(this), 500);
}

const greeter = new DelayedGreeter('World');
greeter.greet(); // 'HelloWorld'

或者箭头函数,与父级代码块作用域相同:

function DelayedGreeter(name) {
  this.name = name;
}

DelayedGreeter.prototype.greet = function() {
  setTimeout(() => console.log('Hello' + this.name), 500);
}

const greeter = new DelayedGreeter('World');
greeter.greet(); // 'HelloWorld'

类语法糖

class是原型继承的语法糖,对于来自传统的面向对象语言的所有开发人员(如JavaC#)来说更熟悉,新语法并没有改变JavaScript的运行特征,通过原型来完成更加方便和易读。

传统的通过构造器 + 原型的写法:

function Person(name, surname, age) {
  this.name = name;
  this.surname = surname;
  this.age = age;
}

Person.prototype.getFullName = function() {
  return this.name + '' + this.surname;
}

Person.older = function(person1, person2) {
  return (person1.age >= person2.age) ? person1 : person2;
}

使用class语法显得更加简洁、方便、易懂:

class Person {
  constructor(name, surname, age) {
    this.name = name;
    this.surname = surname;
    this.age = age;
  }

  getFullName() {
    return this.name + '' + this.surname;
  }

  static older(person1, person2) {
    return (person1.age >= person2.age) ? person1 : person2;
  }
}

但是上面的实现是可以互换的,但是,对于class语法来说,最有意义的是extendssuper关键字。

class PersonWithMiddlename extends Person {
  constructor(name, middlename, surname, age) {
    super(name, surname, age);
    this.middlename = middlename;
  }

  getFullName() {
    return this.name + '' + this.middlename + '' + this.surname;
  }
}

这个例子是真正的面向对象的方式,我们声明了一个希望被继承的类,定义新的构造器,并可以使用super关键字调用父构造器,并重写getFullName方法,使得其支持middlename

对象字面量的新语法

允许缺省值:

const x = 22;
const y = 17;
const obj = { x, y };

允许省略方法名

module.exports = {
  square(x) {
    return x * x;
  },
  cube(x) {
    return x * x * x;
  },
};

key的计算属性

const namespace = '-webkit-';
const style = {
  [namespace + 'box-sizing']: 'border-box',
  [namespace + 'box-shadow']: '10px 10px 5px #888',
};

新的定义getter和setter方式

const person = {
  name: 'George',
  surname: 'Boole',

  get fullname() {
    return this.name + ' ' + this.surname;
  },

  set fullname(fullname) {
    let parts = fullname.split(' ');
    this.name = parts[0];
    this.surname = parts[1];
  }
};

console.log(person.fullname); // "George Boole"
console.log(person.fullname = 'Alan Turing'); // "Alan Turing"
console.log(person.name); // "Alan"

这里,第二个console.log触发了set方法。

模板字符串

其它ES2015语法

reactor模式

reactor模式Node.js异步编程的核心模块,其核心概念是:单线程非阻塞I/O,通过下列例子可以看到reactor模式Node.js平台的体现。

I/O是缓慢的

在计算机的基本操作中,输入输出肯定是最慢的。访问内存的速度是纳秒级(10e-9 s),同时访问磁盘上的数据或访问网络上的数据则更慢,是毫秒级(10e-3 s)。内存的传输速度一般认为是GB/s来计算,然而磁盘或网络的访问速度则比较慢,一般是MB/s。虽然对于CPU而言,I/O操作的资源消耗并不算大,但是在发送I/O请求和操作完成之间总会存在时间延迟。除此之外,我们还必须考虑人为因素,通常情况下,应用程序的输入是人为产生的,例如:按钮的点击、即时聊天工具的信息发送。因此,输入输出的速度并不因网络和磁盘访问速率慢造成的,还有多方面的因素。

阻塞I/O

在一个阻塞I/O模型的进程中,I/O请求会阻塞之后代码块的运行。在I/O请求操作完成之前,线程会有一段不定长的时间浪费。(它可能是毫秒级的,但甚至有可能是分钟级的,如用户按着一个按键不放的情况)。以下例子就是一个阻塞I/O模型。

// 直到请求完成,数据可用,线程都是阻塞的
data = socket.read();
// 请求完成,数据可用
print(data);

我们知道,阻塞I/O的服务器模型并不能在一个线程中处理多个连接,每次I/O都会阻塞其它连接的处理。出于这个原因,对于每个需要处理的并发连接,传统的web服务器的处理方式是新开一个新的进程或线程(或者从线程池中重用一个进程)。这样,当一个线程因 I/O操作被阻塞时,它并不会影响另一个线程的可用性,因为他们是在彼此独立的线程中处理的。

通过下面这张图:

通过上面的图片我们可以看到每个线程都有一段时间处于空闲等待状态,等待从关联连接接收新数据。如果所有种类的I/O操作都会阻塞后续请求。例如,连接数据库和访问文件系统,现在我们能很快知晓一个线程需要因等待I/O操作的结果等待许多时间。不幸的是,一个线程所持有的CPU资源并不廉价,它需要消耗内存、造成CPU上下文切换,因此,长期占有CPU而大部分时间并没有使用的线程,在资源利用率上考虑,并不是高效的选择。

非阻塞I/O

阻塞I/O之外,大部分现代的操作系统支持另外一种访问资源的机制,即非阻塞I/O。在这种机制下,后续代码块不会等到I/O请求数据的返回之后再执行。如果当前时刻所有数据都不可用,函数会先返回预先定义的常量值(如undefined),表明当前时刻暂无数据可用。

例如,在Unix操作系统中,fcntl()函数操作一个已存在的文件描述符,改变其操作模式为非阻塞I/O(通过O_NONBLOCK状态字)。一旦资源是非阻塞模式,如果读取文件操作没有可读取的数据,或者如果写文件操作被阻塞,读操作或写操作返回-1EAGAIN错误。

非阻塞I/O最基本的模式是通过轮询获取数据,这也叫做忙-等模型。看下面这个例子,通过非阻塞I/O和轮询机制获取I/O的结果。

resources = [socketA, socketB, pipeA];
while(!resources.isEmpty()) {
  for (i = 0; i < resources.length; i++) {
    resource = resources[i];
    // 进行读操作
    let data = resource.read();
    if (data === NO_DATA_AVAILABLE) {
      // 此时还没有数据
      continue;
    }
    if (data === RESOURCE_CLOSED) {
      // 资源被释放,从队列中移除该链接
      resources.remove(i);
    } else {
      consumeData(data);
    }
  }
}

我们可以看到,通过这个简单的技术,已经可以在一个线程中处理不同的资源了,但依然不是高效的。事实上,在前面的例子中,用于迭代资源的循环只会消耗宝贵的CPU,而这些资源的浪费比起阻塞I/O反而更不可接受,轮询算法通常浪费大量CPU时间。

事件多路复用

对于获取非阻塞的资源而言,忙-等模型不是一个理想的技术。但是幸运的是,大多数现代的操作系统提供了一个原生的机制来处理并发,非阻塞资源(同步事件多路复用器)是一个有效的方法。这种机制被称作事件循环机制,这种事件收集和I/O队列源于发布-订阅模式。事件多路复用器收集资源的I/O事件并且把这些事件放入队列中,直到事件被处理时都是阻塞状态。看下面这个伪代码:

socketA, pipeB;
wachedList.add(socketA, FOR_READ);
wachedList.add(pipeB, FOR_READ);
while(events = demultiplexer.watch(wachedList)) {
  // 事件循环
  foreach(event in events) {
    // 这里并不会阻塞,并且总会有返回值(不管是不是确切的值)
    data = event.resource.read();
    if (data === RESOURCE_CLOSED) {
      // 资源已经被释放,从观察者队列移除
      demultiplexer.unwatch(event.resource);
    } else {
      // 成功拿到资源,放入缓冲池
      consumeData(data);
    }
  }
}

事件多路复用的三个步骤:

  • 资源被添加到一个数据结构中,为每个资源关联一个特定的操作,在这个例子中是read
  • 事件通知器由一组被观察的资源组成,一旦事件即将触发,会调用同步的watch函数,并返回这个可被处理的事件。
  • 最后,处理事件多路复用器返回的每个事件,此时,与系统资源相关联的事件将被读并且在整个操作中都是非阻塞的。直到所有事件都被处理完时,事件多路复用器会再次阻塞,然后重复这个步骤,以上就是event loop

上图可以很好的帮助我们理解在一个单线程的应用程序中使用同步的时间多路复用器和非阻塞I/O实现并发。我们能够看到,只使用一个线程并不会影响我们处理多个I/O任务的性能。同时,我们看到任务是在单个线程中随着时间的推移而展开的,而不是分散在多个线程中。我们看到,在单线程中传播的任务相对于多线程中传播的任务反而节约了线程的总体空闲时间,并且更利于程序员编写代码。在这本书中,你可以看到我们可以用更简单的并发策略,因为不需要考虑多线程的互斥和同步问题。

在下一章中,我们有更多机会讨论Node.js的并发模型。

介绍reactor模式

现在来说reactor模式,它通过一种特殊的算法设计的处理程序(在Node.js中是使用一个回调函数表示),一旦事件产生并在事件循环中被处理,那么相关handler将会被调用。

它的结构如图所示:

reactor模式的步骤为:

  • 应用程序通过提交请求到时间多路复用器产生一个新的I/O操作。应用程序指定handlerhandler 在操作完成后被调用。提交请求到事件多路复用器是非阻塞的,其调用所以会立马返回,将执行权返回给应用程序。
  • 当一组I/O操作完成,事件多路复用器会将这些新事件添加到事件循环队列中。
  • 此时,事件循环会迭代事件循环队列中的每个事件。
  • 对于每个事件,对应的handler被处理。
  • handler,是应用程序代码的一部分,handler执行结束后执行权会交回事件循环。但是,在handler 执行时可能请求新的异步操作,从而新的操作被添加到事件多路复用器。
  • 当事件循环队列的全部事件被处理完后,循环会在事件多路复用器再次阻塞直到有一个新的事件可处理触发下一次循环。

我们现在可以定义Node.js的核心模式:

模式(反应器)阻塞处理I/O到在一组观察的资源有新的事件可处理,然后以分派每个事件对应handler的方式反应。

OS的非阻塞I/O引擎

每个操作系统对于事件多路复用器有其自身的接口,LinuxepollMac OSXkqueueWindowsIOCP API。除外,即使在相同的操作系统中,每个I/O操作对于不同的资源表现不一样。例如,在Unix下,普通文件系统不支持非阻塞操作,所以,为了模拟非阻塞行为,需要使用在事件循环外用一个独立的线程。所有这些平台内和跨平台的不一致性需要在事件多路复用器的上层做抽象。这就是为什么Node.js为了兼容所有主流平台而
编写C语言库libuv,目的就是为了使得Node.js兼容所有主流平台和规范化不同类型资源的非阻塞行为。libuv今天作为Node.jsI/O引擎的底层。

查看原文

赞 4 收藏 10 评论 0

simon_woo 赞了文章 · 2018-01-03

《Node.js设计模式》使用流进行编码

本系列文章为《Node.js Design Patterns Second Edition》的原文翻译和读书笔记,在GitHub连载更新,同步翻译版链接

欢迎关注我的专栏,之后的博文将在专栏同步:

Coding with Streams

StreamsNode.js最重要的组件和模式之一。 社区中有一句格言“Stream all the things(Steam就是所有的)”,仅此一点就足以描述流在Node.js中的地位。 Dominic Tarr作为Node.js社区的最大贡献者,它将流定义为Node.js最好,也是最难以理解的概念。

使Node.jsStreams如此吸引人还有其它原因; 此外,Streams不仅与性能或效率等技术特性有关,更重要的是它们的优雅性以及它们与Node.js的设计理念完美契合的方式。

在本章中,将会学到以下内容:

  • Streams对于Node.js的重要性。
  • 如何创建并使用Streams
  • Streams作为编程范式,不只是对于I/O而言,在多种应用场景下它的应用和强大的功能。
  • 管道模式和在不同的配置中连接Streams

发现Streams的重要性

在基于事件的平台(如Node.js)中,处理I / O的最有效的方法是实时处理,一旦有输入的信息,立马进行处理,一旦有需要输出的结果,也立马输出反馈。

在本节中,我们将首先介绍Node.jsStreams和它的优点。 请记住,这只是一个概述,因为本章后面将会详细介绍如何使用和组合Streams

Streams和Buffer的比较

我们在本书中几乎所有看到过的异步API都是使用的Buffer模式。 对于输入操作,Buffer模式会将来自资源的所有数据收集到Buffer区中; 一旦读取完整个资源,就会把结果传递给回调函数。 下图显示了这个范例的一个真实的例子:

从上图我们可以看到,在t1时刻,一些数据从资源接收并保存到缓冲区。 在t2时刻,最后一段数据被接收到另一个数据块,完成读取操作,这时,把整个缓冲区的内容发送给消费者。

另一方面,Streams允许你在数据到达时立即处理数据。 如下图所示:

这一张图显示了Streams如何从资源接收每个新的数据块,并立即提供给消费者,消费者现在不必等待缓冲区中收集所有数据再处理每个数据块。

但是这两种方法有什么区别呢? 我们可以将它们概括为两点:

  • 空间效率
  • 时间效率

此外,Node.jsStreams具有另一个重要的优点:可组合性(composability)。 现在让我们看看这些属性对我们设计和编写应用程序的方式会产生什么影响。

空间效率

首先,Streams允许我们做一些看起来不可能的事情,通过缓冲数据并一次性处理。 例如,考虑一下我们必须读取一个非常大的文件,比如说数百MB甚至千MB。 显然,等待完全读取文件时返回大BufferAPI不是一个好主意。 想象一下,如果并发读取一些大文件, 我们的应用程序很容易耗尽内存。 除此之外,V8中的Buffer不能大于0x3FFFFFFF字节(小于1GB)。 所以,在耗尽物理内存之前,我们可能会碰壁。

使用Buffered的API进行压缩文件

举一个具体的例子,让我们考虑一个简单的命令行接口(CLI)的应用程序,它使用Gzip格式压缩文件。 使用BufferedAPI,这样的应用程序在Node.js中大概这么编写(为简洁起见,省略了异常处理):

const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];
fs.readFile(file, (err, buffer) => {
  zlib.gzip(buffer, (err, buffer) => {
    fs.writeFile(file + '.gz', buffer, err => {
      console.log('File successfully compressed');
    });
  });
});

现在,我们可以尝试将前面的代码放在一个叫做gzip.js的文件中,然后执行下面的命令:

node gzip <path to file>

如果我们选择一个足够大的文件,比如说大于1GB的文件,我们会收到一个错误信息,说明我们要读取的文件大于最大允许的缓冲区大小,如下所示:

RangeError: File size is greater than possible Buffer:0x3FFFFFFF

上面的例子中,没找到一个大文件,但确实对于大文件的读取速率慢了许多。

正如我们所预料到的那样,使用Buffer来进行大文件的读取显然是错误的。

使用Streams进行压缩文件

我们必须修复我们的Gzip应用程序,并使其处理大文件的最简单方法是使用StreamsAPI。 让我们看看如何实现这一点。 让我们用下面的代码替换刚创建的模块的内容:

const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];
fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream(file + '.gz'))
  .on('finish', () => console.log('File successfully compressed'));

“是吗?”你可能会问。是的;正如我们所说的,由于Streams的接口和可组合性,因此我们还能写出这样的更加简洁,优雅和精炼的代码。 我们稍后会详细地看到这一点,但是现在需要认识到的重要一点是,程序可以顺畅地运行在任何大小的文件上,理想情况是内存利用率不变。 尝试一下(但考虑压缩一个大文件可能需要一段时间)。

时间效率

现在让我们考虑一个压缩文件并将其上传到远程HTTP服务器的应用程序的例子,该远程HTTP服务器进而将其解压缩并保存到文件系统中。如果我们的客户端是使用BufferedAPI实现的,那么只有当整个文件被读取和压缩时,上传才会开始。 另一方面,只有在接收到所有数据的情况下,解压缩才会在服务器上启动。 实现相同结果的更好的解决方案涉及使用Streams。 在客户端机器上,Streams只要从文件系统中读取就可以压缩和发送数据块,而在服务器上,只要从远程对端接收到数据块,就可以解压每个数据块。 我们通过构建前面提到的应用程序来展示这一点,从服务器端开始。

我们创建一个叫做gzipReceive.js的模块,代码如下:

const http = require('http');
const fs = require('fs');
const zlib = require('zlib');

const server = http.createServer((req, res) => {
  const filename = req.headers.filename;
  console.log('File request received: ' + filename);
  req
    .pipe(zlib.createGunzip())
    .pipe(fs.createWriteStream(filename))
    .on('finish', () => {
      res.writeHead(201, {
        'Content-Type': 'text/plain'
      });
      res.end('That\'s it\n');
      console.log(`File saved: ${filename}`);
    });
});

server.listen(3000, () => console.log('Listening'));

服务器从网络接收数据块,将其解压缩,并在接收到数据块后立即保存,这要归功于Node.jsStreams

我们的应用程序的客户端将进入一个名为gzipSend.js的模块,如下所示:

在前面的代码中,我们再次使用Streams从文件中读取数据,然后在从文件系统中读取的同时压缩并发送每个数据块。

现在,运行这个应用程序,我们首先使用以下命令启动服务器:

node gzipReceive

然后,我们可以通过指定要发送的文件和服务器的地址(例如localhost)来启动客户端:

node gzipSend <path to file> localhost

如果我们选择一个足够大的文件,我们将更容易地看到数据如何从客户端流向服务器,但为什么这种模式下,我们使用Streams,比使用BufferedAPI更有效率? 下图应该给我们一个提示:

一个文件被处理的过程,它经过以下阶段:

  1. 客户端从文件系统中读取
  2. 客户端压缩数据
  3. 客户端将数据发送到服务器
  4. 服务端接收数据
  5. 服务端解压数据
  6. 服务端将数据写入磁盘

为了完成处理,我们必须按照流水线顺序那样经过每个阶段,直到最后。在上图中,我们可以看到,使用BufferedAPI,这个过程完全是顺序的。为了压缩数据,我们首先必须等待整个文件被读取完毕,然后,发送数据,我们必须等待整个文件被读取和压缩,依此类推。当我们使用Streams时,只要我们收到第一个数据块,流水线就会被启动,而不需要等待整个文件的读取。但更令人惊讶的是,当下一块数据可用时,不需要等待上一组任务完成;相反,另一条装配线是并行启动的。因为我们执行的每个任务都是异步的,这样显得很完美,所以可以通过Node.js来并行执行Streams的相关操作;唯一的限制就是每个阶段都必须保证数据块的到达顺序。

从前面的图可以看出,使用Streams的结果是整个过程花费的时间更少,因为我们不用等待所有数据被全部读取完毕和处理。

组合性

到目前为止,我们已经看到的代码已经告诉我们如何使用pipe()方法来组装Streams的数据块,Streams允许我们连接不同的处理单元,每个处理单元负责单一的职责(这是符合Node.js风格的)。这是可能的,因为Streams具有统一的接口,并且就API而言,不同Streams也可以很好的进行交互。唯一的先决条件是管道的下一个Streams必须支持上一个Streams生成的数据类型,可以是二进制,文本甚至是对象,我们将在后面的章节中看到。

为了证明Streams组合性的优势,我们可以尝试在我们先前构建的gzipReceive / gzipSend应用程序中添加加密功能。
为此,我们只需要通过向流水线添加另一个Streams来更新客户端。 确切地说,由crypto.createChipher()返回的流。 由此产生的代码应如下所示:

const fs = require('fs');
const zlib = require('zlib');
const crypto = require('crypto');
const http = require('http');
const path = require('path');

const file = process.argv[2];
const server = process.argv[3];

const options = {
  hostname: server,
  port: 3000,
  path: '/',
  method: 'PUT',
  headers: {
    filename: path.basename(file),
    'Content-Type': 'application/octet-stream',
    'Content-Encoding': 'gzip'
  }
};

const req = http.request(options, res => {
  console.log('Server response: ' + res.statusCode);
});

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(crypto.createCipher('aes192', 'a_shared_secret'))
  .pipe(req)
  .on('finish', () => {
    console.log('File successfully sent');
  });

使用相同的方式,我们更新服务端的代码,使得它可以在数据块进行解压之前先解密:

const http = require('http');
const fs = require('fs');
const zlib = require('zlib');
const crypto = require('crypto');

const server = http.createServer((req, res) => {
  const filename = req.headers.filename;
  console.log('File request received: ' + filename);
  req
    .pipe(crypto.createDecipher('aes192', 'a_shared_secret'))
    .pipe(zlib.createGunzip())
    .pipe(fs.createWriteStream(filename))
    .on('finish', () => {
      res.writeHead(201, {
        'Content-Type': 'text/plain'
      });
      res.end('That\'s it\n');
      console.log(`File saved: ${filename}`);
    });
});

server.listen(3000, () => console.log('Listening'));
crypto是Node.js的核心模块之一,提供了一系列加密算法。

只需几行代码,我们就在应用程序中添加了一个加密层。 我们只需要简单地通过把已经存在的Streams模块和加密层组合到一起,就可以。类似的,我们可以添加和合并其他Streams,如同在玩乐高积木一样。

显然,这种方法的主要优点是可重用性,但正如我们从目前为止所介绍的代码中可以看到的那样,Streams也可以实现更清晰,更模块化,更加简洁的代码。 出于这些原因,流通常不仅仅用于处理纯粹的I / O,而且它还是简化和模块化代码的手段。

开始使用Streams

在前面的章节中,我们了解了为什么Streams如此强大,而且它在Node.js中无处不在,甚至在Node.js的核心模块中也有其身影。 例如,我们已经看到,fs模块具有用于从文件读取的createReadStream()和用于写入文件的createWriteStream()HTTP请求和响应对象本质上是Streams,并且zlib模块允许我们使用StreamsAPI压缩和解压缩数据块。

现在我们知道为什么Streams是如此重要,让我们退后一步,开始更详细地探索它。

Streams的结构

Node.js中的每个Streams都是Streams核心模块中可用的四个基本抽象类之一的实现:

  • stream.Readable
  • stream.Writable
  • stream.Duplex
  • stream.Transform

每个stream类也是EventEmitter的一个实例。实际上,Streams可以产生几种类型的事件,比如end事件会在一个可读的Streams完成读取,或者错误读取,或其过程中产生异常时触发。

请注意,为简洁起见,在本章介绍的例子中,我们经常会忽略适当的错误处理。但是,在生产环境下中,总是建议为所有Stream注册错误事件侦听器。

Streams之所以如此灵活的原因之一是它不仅能够处理二进制数据,而且几乎可以处理任何JavaScript值。实际上,Streams可以支持两种操作模式:

  • 二进制模式:以数据块形式(例如buffersstrings)流式传输数据
  • 对象模式:将流数据视为一系列离散对象(这使得我们几乎可以使用任何JavaScript值)

这两种操作模式使我们不仅可以使用I / O流,而且还可以作为一种工具,以函数式的风格优雅地组合处理单元,我们将在本章后面看到。

在本章中,我们将主要使用在Node.js 0.11中引入的Node.js流接口,也称为版本3。 有关与旧接口差异的更多详细信息,请参阅StrongLoop在https://strongloop.com/strong...

可读的Streams

一个可读的Streams表示一个数据源,在Node.js中,它使用stream模块中的Readableabstract类实现。

从Streams中读取信息

从可读Streams接收数据有两种方式:non-flowing模式和flowing模式。 我们来更详细地分析这些模式。

non-flowing模式(不流动模式)

从可读的Streams中读取数据的默认模式是为其附加一个可读事件侦听器,用于指示要读取的新数据的可用性。然后,在一个循环中,我们读取所有的数据,直到内部buffer被清空。这可以使用read()方法完成,该方法同步从内部缓冲区中读取数据,并返回表示数据块的BufferString对象。read()方法以如下使用模式:

readable.read([size]);

使用这种方法,数据随时可以直接从Streams中按需提取。

为了说明这是如何工作的,我们创建一个名为readStdin.js的新模块,它实现了一个简单的程序,它从标准输入(一个可读流)中读取数据,并将所有数据回送到标准输出:

process.stdin
  .on('readable', () => {
    let chunk;
    console.log('New data available');
    while ((chunk = process.stdin.read()) !== null) {
      console.log(
        `Chunk read: (${chunk.length}) "${chunk.toString()}"`
      );
    }
  })
  .on('end', () => process.stdout.write('End of stream'));

read()方法是一个同步操作,它从可读Streams的内部Buffers区中提取数据块。如果Streams在二进制模式下工作,返回的数据块默认为一个Buffer对象。

在以二进制模式工作的可读的Stream中,我们可以通过在Stream上调用setEncoding(encoding)来读取字符串而不是Buffer对象,并提供有效的编码格式(例如utf8)。

数据是从可读的侦听器中读取的,只要有新的数据,就会调用这个侦听器。当内部缓冲区中没有更多数据可用时,read()方法返回null;在这种情况下,我们不得不等待另一个可读的事件被触发,告诉我们可以再次读取或者等待表示Streams读取过程结束的end事件触发。当一个流以二进制模式工作时,我们也可以通过向read()方法传递一个size参数来指定我们想要读取的数据大小。这在实现网络协议或解析特定数据格式时特别有用。

现在,我们准备运行readStdin模块并进行实验。让我们在控制台中键入一些字符,然后按Enter键查看回显到标准输出中的数据。要终止流并因此生成一个正常的结束事件,我们需要插入一个EOF(文件结束)字符(在Windows上使用Ctrl + Z或在Linux上使用Ctrl + D)。

我们也可以尝试将我们的程序与其他程序连接起来;这可以使用管道运算符(|),它将程序的标准输出重定向到另一个程序的标准输入。例如,我们可以运行如下命令:

cat <path to a file> | node readStdin

这是流式范例是一个通用接口的一个很好的例子,它使得我们的程序能够进行通信,而不管它们是用什么语言写的。

flowing模式(流动模式)

Streams中读取的另一种方法是将侦听器附加到data事件;这会将Streams切换为flowing模式,其中数据不是使用read()函数来提取的,而是一旦有数据到达data监听器就被推送到监听器内。例如,我们之前创建的readStdin应用程序将使用流动模式:

process.stdin
  .on('data', chunk => {
    console.log('New data available');
    console.log(
      `Chunk read: (${chunk.length}) "${chunk.toString()}"`
    );
  })
  .on('end', () => process.stdout.write('End of stream'));

flowing模式是旧版Streams接口(也称为Streams1)的继承,其灵活性较低,API较少。随着Streams2接口的引入,flowing模式不是默认的工作模式,要启用它,需要将侦听器附加到data事件或显式调用resume()方法。 要暂时中断Streams触发data事件,我们可以调用pause()方法,导致任何传入数据缓存在内部buffer中。

调用pause()不会导致Streams切换回non-flowing模式。

实现可读的Streams

现在我们知道如何从Streams中读取数据,下一步是学习如何实现一个新的Readable数据流。为此,有必要通过继承stream.Readable的原型来创建一个新的类。 具体流必须提供_read()方法的实现:

readable._read(size)

Readable类的内部将调用_read()方法,而该方法又将启动
使用push()填充内部缓冲区:

请注意,read()是Stream消费者调用的方法,而_read()是一个由Stream子类实现的方法,不能直接调用。下划线通常表示该方法为私有方法,不应该直接调用。

为了演示如何实现新的可读Streams,我们可以尝试实现一个生成随机字符串的Streams。 我们来创建一个名为randomStream.js的新模块,它将包含我们的字符串的generator的代码:

const stream = require('stream');
const Chance = require('chance');

const chance = new Chance();

class RandomStream extends stream.Readable {
  constructor(options) {
    super(options);
  }

  _read(size) {
    const chunk = chance.string(); //[1]
    console.log(`Pushing chunk of size: ${chunk.length}`);
    this.push(chunk, 'utf8'); //[2]
    if (chance.bool({
        likelihood: 5
      })) { //[3]
      this.push(null);
    }
  }
}

module.exports = RandomStream;

在文件顶部,我们将加载我们的依赖关系。除了我们正在加载一个chance的npm模块之外,没有什么特别之处,它是一个用于生成各种随机值的库,从数字到字符串到整个句子都能生成随机值。

下一步是创建一个名为RandomStream的新类,并指定stream.Readable作为其父类。 在前面的代码中,我们调用父类的构造函数来初始化其内部状态,并将收到的options参数作为输入。通过options对象传递的可能参数包括以下内容:

  • 用于将Buffers转换为Stringsencoding参数(默认值为null
  • 是否启用对象模式(objectMode默认为false
  • 存储在内部buffer区中的数据的上限,一旦超过这个上限,则暂停从data source读取(highWaterMark默认为16KB

好的,现在让我们来解释一下我们重写的stream.Readable类的_read()方法:

  • 该方法使用chance生成随机字符串。
  • 它将字符串push内部buffer。 请注意,由于我们push的是String,此外我们还指定了编码为utf8(如果数据块只是一个二进制Buffer,则不需要)。
  • 5%的概率随机中断stream的随机字符串产生,通过pushnull到内部Buffer来表示EOF,即stream的结束。

我们还可以看到在_read()函数的输入中给出的size参数被忽略了,因为它是一个建议的参数。 我们可以简单地把所有可用的数据都push到内部的buffer中,但是如果在同一个调用中有多个推送,那么我们应该检查push()是否返回false,因为这意味着内部buffer已经达到了highWaterMark限制,我们应该停止添加更多的数据。

以上就是RandomStream模块,我们现在准备好使用它。我们来创建一个名为generateRandom.js的新模块,在这个模块中我们实例化一个新的RandomStream对象并从中提取一些数据:

const RandomStream = require('./randomStream');
const randomStream = new RandomStream();

randomStream.on('readable', () => {
  let chunk;
  while ((chunk = randomStream.read()) !== null) {
    console.log(`Chunk received: ${chunk.toString()}`);
  }
});

现在,一切都准备好了,我们尝试新的自定义的stream。 像往常一样简单地执行generateRandom模块,观察随机的字符串在屏幕上流动。

可写的Streams

一个可写的stream表示一个数据终点,在Node.js中,它使用stream模块中的Writable抽象类来实现。

写入一个stream

把一些数据放在可写入的stream中是一件简单的事情, 我们所要做的就是使用write()方法,它具有以下格式:

writable.write(chunk, [encoding], [callback])

encoding参数是可选的,其在chunkString类型时指定(默认为utf8,如果chunkBuffer,则忽略);当数据块被刷新到底层资源中时,callback就会被调用,callback参数也是可选的。

为了表示没有更多的数据将被写入stream中,我们必须使用end()方法:

writable.end([chunk], [encoding], [callback])

我们可以通过end()方法提供最后一块数据。在这种情况下,callbak函数相当于为finish事件注册一个监听器,当数据块全部被写入stream中时,会触发该事件。

现在,让我们通过创建一个输出随机字符串序列的小型HTTP服务器来演示这是如何工作的:

const Chance = require('chance');
const chance = new Chance();

require('http').createServer((req, res) => {
  res.writeHead(200, {
    'Content-Type': 'text/plain'
  }); //[1]
  while (chance.bool({
      likelihood: 95
    })) { //[2]
    res.write(chance.string() + '\n'); //[3]
  }
  res.end('\nThe end...\n'); //[4]
  res.on('finish', () => console.log('All data was sent')); //[5]
}).listen(8080, () => console.log('Listening on http://localhost:8080'));

我们创建了一个HTTP服务器,并把数据写入res对象,res对象是http.ServerResponse的一个实例,也是一个可写入的stream。下面来解释上述代码发生了什么:

  1. 我们首先写HTTP response的头部。请注意,writeHead()不是Writable接口的一部分,实际上,这个方法是http.ServerResponse类公开的辅助方法。
  2. 我们开始一个5%的概率终止的循环(进入循环体的概率为chance.bool()产生,其为95%)。
  3. 在循环内部,我们写入一个随机字符串到stream
  4. 一旦我们不在循环中,我们调用streamend(),表示没有更多

数据块将被写入。另外,我们在结束之前提供一个最终的字符串写入流中。

  1. 最后,我们注册一个finish事件的监听器,当所有的数据块都被刷新到底层socket中时,这个事件将被触发。

我们可以调用这个小模块称为entropyServer.js,然后执行它。要测试这个服务器,我们可以在地址http:// localhost:8080打开一个浏览器,或者从终端使用curl命令,如下所示:

curl localhost:8080

此时,服务器应该开始向您选择的HTTP客户端发送随机字符串(请注意,某些浏览器可能会缓冲数据,并且流式传输行为可能不明显)。

Back-pressure(反压)

类似于在真实管道系统中流动的液体,Node.jsstream也可能遭受瓶颈,数据写入速度可能快于stream的消耗。 解决这个问题的机制包括缓冲输入数据;然而,如果数据stream没有给生产者任何反馈,我们可能会产生越来越多的数据被累积到内部缓冲区的情况,导致内存泄露的发生。

为了防止这种情况的发生,当内部buffer超过highWaterMark限制时,writable.write()将返回false。 可写入的stream具有highWaterMark属性,这是write()方法开始返回false的内部Buffer区大小的限制,一旦Buffer区的大小超过这个限制,表示应用程序应该停止写入。 当缓冲器被清空时,会触发一个叫做drain的事件,通知再次开始写入是安全的。 这种机制被称为back-pressure

本节介绍的机制同样适用于可读的stream。事实上,在可读stream中也存在back-pressure,并且在_read()内调用的push()方法返回false时触发。 但是,这对于stream实现者来说是一个特定的问题,所以我们将不经常处理它。

我们可以通过修改之前创建的entropyServer模块来演示可写入的streamback-pressure

const Chance = require('chance');
const chance = new Chance();

require('http').createServer((req, res) => {
  res.writeHead(200, {
    'Content-Type': 'text/plain'
  });

  function generateMore() { //[1]
    while (chance.bool({
        likelihood: 95
      })) {
      const shouldContinue = res.write(
        chance.string({
          length: (16 * 1024) - 1
        }) //[2]
      );
      if (!shouldContinue) { //[3]
        console.log('Backpressure');
        return res.once('drain', generateMore);
      }
    }
    res.end('\nThe end...\n', () => console.log('All data was sent'));
  }
  generateMore();
}).listen(8080, () => console.log('Listening on http://localhost:8080'));

前面代码中最重要的步骤可以概括如下:

  1. 我们将主逻辑封装在一个名为generateMore()的函数中。
  2. 为了增加获得一些back-pressure的机会,我们将数据块的大小增加到16KB-1Byte,这非常接近默认的highWaterMark限制。
  3. 在写入一大块数据之后,我们检查res.write()的返回值。 如果它返回false,这意味着内部buffer已满,我们应该停止发送更多的数据。在这种情况下,我们从函数中退出,然后新注册一个写入事件的发布者,当drain事件触发时调用generateMore

如果我们现在尝试再次运行服务器,然后使用curl生成客户端请求,则很可能会有一些back-pressure,因为服务器以非常高的速度生成数据,速度甚至会比底层socket更快。

实现可写入的Streams

我们可以通过继承stream.Writable类来实现一个新的可写入的流,并为_write()方法提供一个实现。实现一个我们自定义的可写入的Streams类。

让我们构建一个可写入的stream,它接收对象的格式如下:

{
  path: <path to a file>
  content: <string or buffer>
}

这个类的作用是这样的:对于每一个对象,我们的stream必须将content部分保存到在给定路径中创建的文件中。 我们可以立即看到,我们stream的输入是对象,而不是StringsBuffers,这意味着我们的stream必须以对象模式工作。

调用模块toFileStream.js

const stream = require('stream');
const fs = require('fs');
const path = require('path');
const mkdirp = require('mkdirp');

class ToFileStream extends stream.Writable {
  constructor() {
    super({
      objectMode: true
    });
  }

  _write(chunk, encoding, callback) {
    mkdirp(path.dirname(chunk.path), err => {
      if (err) {
        return callback(err);
      }
      fs.writeFile(chunk.path, chunk.content, callback);
    });
  }
}
module.exports = ToFileStream;

作为第一步,我们加载所有我们所需要的依赖包。注意,我们需要模块mkdirp,正如你应该从前几章中所知道的,它应该使用npm安装。

我们创建了一个新类,它从stream.Writable扩展而来。

我们不得不调用父构造函数来初始化其内部状态;我们还提供了一个option对象作为参数,用于指定流在对象模式下工作(objectMode:true)。stream.Writable接受的其他选项如下:

  • highWaterMark(默认值是16KB):控制back-pressure的上限。
  • decodeStrings(默认为true):在字符串传递给_write()方法之前,将字符串自动解码为二进制buffer区。 在对象模式下这个参数被忽略。

最后,我们为_write()方法提供了一个实现。正如你所看到的,这个方法接受一个数据块,一个编码方式(只有在二进制模式下,stream选项decodeStrings设置为false时才有意义)。

另外,该方法接受一个回调函数,该函数在操作完成时需要调用;而不必要传递操作的结果,但是如果需要的话,我们仍然可以传递一个error对象,这将导致stream触发error事件。

现在,为了尝试我们刚刚构建的stream,我们可以创建一个名为writeToFile.js的新模块,并对该流执行一些写操作:

const ToFileStream = require('./toFileStream.js');
const tfs = new ToFileStream();

tfs.write({path: "file1.txt", content: "Hello"});
tfs.write({path: "file2.txt", content: "Node.js"});
tfs.write({path: "file3.txt", content: "Streams"});
tfs.end(() => console.log("All files created"));

有了这个,我们创建并使用了我们的第一个自定义的可写入流。 像往常一样运行新模块来检查其输出;你会看到执行后会创建三个新文件。

双重的Streams

双重的stream既是可读的,也可写的。 当我们想描述一个既是数据源又是数据终点的实体时(例如socket),这就显得十分有用了。 双工流继承stream.Readablestream.Writable的方法,所以它对我们来说并不新鲜。这意味着我们可以read()write()数据,或者可以监听readabledrain事件。

要创建一个自定义的双重stream,我们必须为_read()_write()提供一个实现。传递给Duplex()构造函数的options对象在内部被转发给ReadableWritable的构造函数。options参数的内容与前面讨论的相同,options增加了一个名为allowHalfOpen值(默认为true),如果设置为false,则会导致只要stream的一方(ReadableWritable)结束,stream就结束了。

为了使双重的stream在一方以对象模式工作,而在另一方以二进制模式工作,我们需要在流构造器中手动设置以下属性:
this._writableState.objectMode
this._readableState.objectMode

转换的Streams

转换的Streams是专门设计用于处理数据转换的一种特殊类型的双重Streams

在一个简单的双重Streams中,从stream中读取的数据和写入到其中的数据之间没有直接的关系(至少stream是不可知的)。 想想一个TCP socket,它只是向远程节点发送数据和从远程节点接收数据。TCP socket自身没有意识到输入和输出之间有任何关系。

下图说明了双重Streams中的数据流:

另一方面,转换的Streams对从可写入端接收到的每个数据块应用某种转换,然后在其可读端使转换的数据可用。

下图显示了数据如何在转换的Streams中流动:

从外面看,转换的Streams的接口与双重Streams的接口完全相同。但是,当我们想要构建一个新的双重Streams时,我们必须提供_read()_write()方法,而为了实现一个新的变换流,我们必须填写另一对方法:_transform()_flush())。

我们来演示如何用一个例子来创建一个新的转换的Streams

实现转换的Streams

我们来实现一个转换的Streams,它将替换给定所有出现的字符串。 要做到这一点,我们必须创建一个名为replaceStream.js的新模块。 让我们直接看怎么实现它:

const stream = require('stream');
const util = require('util');

class ReplaceStream extends stream.Transform {
  constructor(searchString, replaceString) {
    super();
    this.searchString = searchString;
    this.replaceString = replaceString;
    this.tailPiece = '';
  }

  _transform(chunk, encoding, callback) {
    const pieces = (this.tailPiece + chunk)         //[1]
      .split(this.searchString);
    const lastPiece = pieces[pieces.length - 1];
    const tailPieceLen = this.searchString.length - 1;

    this.tailPiece = lastPiece.slice(-tailPieceLen);     //[2]
    pieces[pieces.length - 1] = lastPiece.slice(0,-tailPieceLen);

    this.push(pieces.join(this.replaceString));       //[3]
    callback();
  }

  _flush(callback) {
    this.push(this.tailPiece);
    callback();
  }
}

module.exports = ReplaceStream;

与往常一样,我们将从其依赖项开始构建模块。这次我们没有使用第三方模块。

然后我们创建了一个从stream.Transform基类继承的新类。该类的构造函数接受两个参数:searchStringreplaceString。 正如你所想象的那样,它们允许我们定义要匹配的文本以及用作替换的字符串。 我们还初始化一个将由_transform()方法使用的tailPiece内部变量。

现在,我们来分析一下_transform()方法,它是我们新类的核心。_transform()方法与可写入的stream_write()方法具有几乎相同的格式,但不是将数据写入底层资源,而是使用this.push()将其推入内部buffer,这与我们会在可读流的_read()方法中执行。这显示了转换的Streams的双方如何实际连接。

ReplaceStream_transform()方法实现了我们这个新类的核心。正常情况下,搜索和替换buffer区中的字符串是一件容易的事情;但是,当数据流式传输时,情况则完全不同,可能的匹配可能分布在多个数据块中。代码后面的程序可以解释如下:

  1. 我们的算法使用searchString函数作为分隔符来分割块。
  2. 然后,它取出分隔后生成的数组的最后一项lastPiece,并提取其最后一个字符searchString.length - 1。结果被保存到tailPiece变量中,它将会被作为下一个数据块的前缀。
  3. 最后,所有从split()得到的片段用replaceString作为分隔符连接在一起,并推入内部buffer区。

stream结束时,我们可能仍然有最后一个tailPiece变量没有被压入内部缓冲区。这正是_flush()方法的用途;它在stream结束之前被调用,并且这是我们最终有机会完成流或者在完全结束流之前推送任何剩余数据的地方。

_flush()方法只需要一个回调函数作为参数,当所有的操作完成后,我们必须确保调用这个回调函数。完成了这个,我们已经完成了我们的ReplaceStream类。

现在,是时候尝试新的stream。我们可以创建另一个名为replaceStreamTest.js的模块来写入一些数据,然后读取转换的结果:

const ReplaceStream = require('./replaceStream');

const rs = new ReplaceStream('World', 'Node.js');
rs.on('data', chunk => console.log(chunk.toString()));

rs.write('Hello W');
rs.write('orld!');
rs.end();

为了使得这个例子更复杂一些,我们把搜索词分布在两个不同的数据块上;然后,使用flowing模式,我们从同一个stream中读取数据,记录每个已转换的块。运行前面的程序应该产生以下输出:

Hel
lo Node.js
!
有一个值得提及是,第五种类型的stream:stream.PassThrough。 与我们介绍的其他流类不同,PassThrough不是抽象的,可以直接实例化,而不需要实现任何方法。实际上,这是一个可转换的stream,它可以输出每个数据块,而不需要进行任何转换。

使用管道连接Streams

Unix管道的概念是由Douglas Mcllroy发明的;这使程序的输出能够连接到下一个的输入。看看下面的命令:

echo Hello World! | sed s/World/Node.js/g

在前面的命令中,echo会将Hello World!写入标准输出,然后被重定向到sed命令的标准输入(因为有管道操作符 |)。 然后sedNode.js替换任何World,并将结果打印到它的标准输出(这次是控制台)。

以类似的方式,可以使用可读的Streamspipe()方法将Node.jsStreams连接在一起,它具有以下接口:

readable.pipe(writable, [options])

非常直观地,pipe()方法将从可读的Streams中发出的数据抽取到所提供的可写入的Streams中。 另外,当可读的Streams发出end事件(除非我们指定{end:false}作为options)时,可写入的Streams将自动结束。 pipe()方法返回作为参数传递的可写入的Streams,如果这样的stream也是可读的(例如双重或可转换的Streams),则允许我们创建链式调用。

将两个Streams连接到一起时,则允许数据自动流向可写入的Streams,所以不需要调用read()write()方法;但最重要的是不需要控制back-pressure,因为它会自动处理。

举个简单的例子(将会有大量的例子),我们可以创建一个名为replace.js的新模块,它接受来自标准输入的文本流,应用替换转换,然后将数据返回到标准输出:

const ReplaceStream = require('./replaceStream');
process.stdin
  .pipe(new ReplaceStream(process.argv[2], process.argv[3]))
  .pipe(process.stdout);

上述程序将来自标准输入的数据传送到ReplaceStream,然后返回到标准输出。 现在,为了实践这个小应用程序,我们可以利用Unix管道将一些数据重定向到它的标准输入,如下所示:

echo Hello World! | node replace World Node.js

运行上述程序,会输出如下结果:

Hello Node.js

这个简单的例子演示了Streams(特别是文本Streams)是一个通用接口,管道几乎是构成和连接所有这些接口的通用方式。

error事件不会通过管道自动传播。举个例子,看如下代码片段:
stream1
  .pipe(stream2)
  .on('error', function() {});
在前面的链式调用中,我们将只捕获来自stream2的错误,这是由于我们给其添加了erorr事件侦听器。这意味着,如果我们想捕获从stream1生成的任何错误,我们必须直接附加另一个错误侦听器。 稍后我们将看到一种可以实现共同错误捕获的另一种模式(合并Streams)。 此外,我们应该注意到,如果目标Streams(读取的Streams)发出错误,它将会对源Streams通知一个error,之后导致管道的中断。

Streams如何通过管道

到目前为止,我们创建自定义Streams的方式并不完全遵循Node定义的模式;实际上,从stream基类继承是违反small surface area的,并需要一些示例代码。 这并不意味着Streams设计得不好,实际上,我们不应该忘记,因为StreamsNode.js核心的一部分,所以它们必须尽可能地灵活,广泛拓展Streams以致于用户级模块能够将它们充分运用。

然而,大多数情况下,我们并不需要原型继承可以给予的所有权力和可扩展性,但通常我们想要的仅仅是定义新Streams的一种快速开发的模式。Node.js社区当然也为此创建了一个解决方案。 一个完美的例子是through2,一个使得我们可以简单地创建转换的Streams的小型库。 通过through2,我们可以通过调用一个简单的函数来创建一个新的可转换的Streams

const transform = through2([options], [_transform], [_flush]);

类似的,from2也允许我们像下面这样创建一个可读的Streams

const readable = from2([options], _read);

接下来,我们将在本章其余部分展示它们的用法,那时,我们会清楚使用这些小型库的好处。

throughfrom是基于Stream1规范的顶层库。

基于Streams的异步控制流

通过我们已经介绍的例子,应该清楚的是,Streams不仅可以用来处理I / O,而且可以用作处理任何类型数据的优雅编程模式。 但优点并不止这些;还可以利用Streams来实现异步控制流,在本节将会看到。

顺序执行

默认情况下,Streams将按顺序处理数据;例如,转换的Streams_transform()函数在前一个数据块执行callback()之后才会进行下一块数据块的调用。这是Streams的一个重要属性,按正确顺序处理每个数据块至关重要,但是也可以利用这一属性将Streams实现优雅的传统控制流模式。

代码总是比太多的解释要好得多,所以让我们来演示一下如何使用流来按顺序执行异步任务的例子。让我们创建一个函数来连接一组接收到的文件作为输入,确保遵守提供的顺序。我们创建一个名为concatFiles.js的新模块,并从其依赖开始:

const fromArray = require('from2-array');
const through = require('through2');
const fs = require('fs');

我们将使用through2来简化转换的Streams的创建,并使用from2-array从一个对象数组中创建可读的Streams
接下来,我们可以定义concatFiles()函数:

function concatFiles(destination, files, callback) {
  const destStream = fs.createWriteStream(destination);
  fromArray.obj(files)             //[1]
    .pipe(through.obj((file, enc, done) => {   //[2]
      const src = fs.createReadStream(file);
      src.pipe(destStream, {end: false});
      src.on('end', done); //[3]
    }))
    .on('finish', () => {         //[4]
      destStream.end();
      callback();
    });
}

module.exports = concatFiles;

前面的函数通过将files数组转换为Streams来实现对files数组的顺序迭代。 该函数所遵循的程序解释如下:

  1. 首先,我们使用from2-arrayfiles数组创建一个可读的Streams
  2. 接下来,我们使用through来创建一个转换的Streams来处理序列中的每个文件。对于每个文件,我们创建一个可读的Streams,并通过管道将其输入到表示输出文件的destStream中。 在源文件完成读取后,通过在pipe()方法的第二个参数中指定{end:false},我们确保不关闭destStream
  3. 当源文件的所有内容都被传送到destStream时,我们调用through.obj公开的done函数来传递当前处理已经完成,在我们的情况下这是需要触发处理下一个文件。
  4. 所有文件处理完后,finish事件被触发。我们最后可以结束destStream并调用concatFiles()callback()函数,这个函数表示整个操作的完成。

我们现在可以尝试使用我们刚刚创建的小模块。让我们创建一个名为concat.js的新文件来完成一个示例:

const concatFiles = require('./concatFiles');

concatFiles(process.argv[2], process.argv.slice(3), () => {
  console.log('Files concatenated successfully');
});

我们现在可以运行上述程序,将目标文件作为第一个命令行参数,接着是要连接的文件列表,例如:

node concat allTogether.txt file1.txt file2.txt

执行这一条命令,会创建一个名为allTogether.txt的新文件,其中按顺序保存file1.txtfile2.txt的内容。

使用concatFiles()函数,我们能够仅使用Streams实现异步操作的顺序执行。正如我们在Chapter3 Asynchronous Control Flow Patters with Callbacks中看到的那样,如果使用纯JavaScript实现,或者使用async等外部库,则需要使用或实现迭代器。我们现在提供了另外一个可以达到同样效果的方法,正如我们所看到的,它的实现方式非常优雅且可读性高。

模式:使用Streams或Streams的组合,可以轻松地按顺序遍历一组异步任务。

无序并行执行

我们刚刚看到Streams按顺序处理每个数据块,但有时这可能并不能这么做,因为这样并没有充分利用Node.js的并发性。如果我们必须对每个数据块执行一个缓慢的异步操作,那么并行化执行这一组异步任务完全是有必要的。当然,只有在每个数据块之间没有关系的情况下才能应用这种模式,这些数据块可能经常发生在对象模式的Streams中,但是对于二进制模式的Streams很少使用无序的并行执行。

注意:当处理数据的顺序很重要时,不能使用无序并行执行的Streams。

为了并行化一个可转换的Streams的执行,我们可以运用Chapter3 Asynchronous Control Flow Patters with Callbacks所讲到的无序并行执行的相同模式,然后做出一些改变使它们适用于Streams。让我们看看这是如何更改的。

实现一个无序并行的Streams

让我们用一个例子直接说明:我们创建一个叫做parallelStream.js的模块,然后自定义一个普通的可转换的Streams,然后给出一系列可转换流的方法:

const stream = require('stream');

class ParallelStream extends stream.Transform {
  constructor(userTransform) {
    super({objectMode: true});
    this.userTransform = userTransform;
    this.running = 0;
    this.terminateCallback = null;
  }

  _transform(chunk, enc, done) {
    this.running++;
    this.userTransform(chunk, enc, this._onComplete.bind(this), this.push.bind(this));
    done();
  }

  _flush(done) {
    if(this.running > 0) {
      this.terminateCallback = done;
    } else {
      done();
    }
  }

  _onComplete(err) {
    this.running--;
    if(err) {
      return this.emit('error', err);
    }
    if(this.running === 0) {
      this.terminateCallback && this.terminateCallback();
    }
  }
}

module.exports = ParallelStream;

我们来分析一下这个新的自定义的类。正如你所看到的一样,构造函数接受一个userTransform()函数作为参数,然后将其另存为一个实例变量;我们也调用父构造函数,并且我们默认启用对象模式。

接下来,来看_transform()方法,在这个方法中,我们执行userTransform()函数,然后增加当前正在运行的任务个数; 最后,我们通过调用done()来通知当前转换步骤已经完成。_transform()方法展示了如何并行处理另一项任务。我们不用等待userTransform()方法执行完毕再调用done()。 相反,我们立即执行done()方法。另一方面,我们提供了一个特殊的回调函数给userTransform()方法,这就是this._onComplete()方法;以便我们在userTransform()完成的时候收到通知。

Streams终止之前,会调用_flush()方法,所以如果仍有任务正在运行,我们可以通过不立即调用done()回调函数来延迟finish事件的触发。相反,我们将其分配给this.terminateCallback变量。为了理解Streams如何正确终止,来看_onComplete()方法。

在每组异步任务最终完成时,_onComplete()方法会被调用。首先,它会检查是否有任务正在运行,如果没有,则调用this.terminateCallback()函数,这将导致Streams结束,触发_flush()方法的finish事件。

利用刚刚构建的ParallelStream类可以轻松地创建一个无序并行执行的可转换的Streams实例,但是有个注意:它不会保留项目接收的顺序。实际上,异步操作可以在任何时候都有可能完成并推送数据,而跟它们开始的时刻并没有必然的联系。因此我们知道,对于二进制模式的Streams并不适用,因为二进制的Streams对顺序要求较高。

实现一个URL监控应用程序

现在,让我们使用ParallelStream模块实现一个具体的例子。让我们想象以下我们想要构建一个简单的服务来监控一个大URL列表的状态,让我们想象以下,所有的这些URL包含在一个单独的文件中,并且每一个URL占据一个空行。

Streams能够为这个场景提供一个高效且优雅的解决方案。特别是当我们使用我们刚刚写的ParallelStream类来无序地审核这些URL

接下来,让我们创建一个简单的放在checkUrls.js模块的应用程序。

const fs = require('fs');
const split = require('split');
const request = require('request');
const ParallelStream = require('./parallelStream');

fs.createReadStream(process.argv[2])         //[1]
  .pipe(split())                             //[2]
  .pipe(new ParallelStream((url, enc, done, push) => {     //[3]
    if(!url) return done();
    request.head(url, (err, response) => {
      push(url + ' is ' + (err ? 'down' : 'up') + '\n');
      done();
    });
  }))
  .pipe(fs.createWriteStream('results.txt'))   //[4]
  .on('finish', () => console.log('All urls were checked'))
;

正如我们所看到的,通过流,我们的代码看起来非常优雅,直观。 让我们看看它是如何工作的:

  1. 首先,我们通过给定的文件参数创建一个可读的Streams,便于接下来读取文件。
  2. 我们通过split将输入的文件的Streams的内容输出一个可转换的Streams到管道中,并且分隔了数据块的每一行。
  3. 然后,是时候使用我们的ParallelStream来检查URL了,我们发送一个HEAD请求然后等待请求的response。当请求返回时,我们把请求的结果pushstream中。
  4. 最后,通过管道把结果保存到results.txt文件中。
node checkUrls urlList.txt

这里的文件urlList.txt包含一组URL,例如:

  • http://www.mariocasciaro.me/
  • http://loige.co/
  • http://thiswillbedownforsure.com/

当应用执行完成后,我们可以看到一个文件results.txt被创建,里面包含有操作的结果,例如:

  • http://thiswillbedownforsure.com is down
  • http://loige.co is up
  • http://www.mariocasciaro.me is up

输出的结果的顺序很有可能与输入文件中指定URL的顺序不同。这是Streams无序并行执行任务的明显特征。

出于好奇,我们可能想尝试用一个正常的through2流替换ParallelStream,并比较两者的行为和性能(你可能想这样做的一个练习)。我们将会看到,使用through2的方式会比较慢,因为每个URL都将按顺序进行检查,而且文件results.txt中结果的顺序也会被保留。

无序限制并行执行

如果运行包含数千或数百万个URL的文件的checkUrls应用程序,我们肯定会遇到麻烦。我们的应用程序将同时创建不受控制的连接数量,并行发送大量数据,并可能破坏应用程序的稳定性和整个系统的可用性。我们已经知道,控制负载的无序限制并行执行是一个极好的解决方案。

让我们通过创建一个limitedParallelStream.js模块来看看它是如何工作的,这个模块是改编自上一节中创建的parallelStream.js模块。

让我们看看它的构造函数:

class LimitedParallelStream extends stream.Transform {
  constructor(concurrency, userTransform) {
    super({objectMode: true});
    this.concurrency = concurrency;
    this.userTransform = userTransform;
    this.running = 0;
    this.terminateCallback = null;
    this.continueCallback = null;
  }
// ...
}

我们需要一个concurrency变量作为输入来限制并发量,这次我们要保存两个回调函数,continueCallback用于任何挂起的_transform方法,terminateCallback用于_flush方法的回调。
接下来看_transform()方法:

_transform(chunk, enc, done) {
  this.running++;
  this.userTransform(chunk, enc,  this.push.bind(this), this._onComplete.bind(this));
  if(this.running < this.concurrency) {
    done();
  } else {
    this.continueCallback = done;
  }
}

这次在_transform()方法中,我们必须在调用done()之前检查是否达到了最大并行数量的限制,如果没有达到了限制,才能触发下一个项目的处理。如果我们已经达到最大并行数量的限制,我们可以简单地将done()回调保存到continueCallback变量中,以便在任务完成后立即调用它。

_flush()方法与ParallelStream类保持完全一样,所以我们直接转到实现_onComplete()方法:

_onComplete(err) {
  this.running--;
  if(err) {
    return this.emit('error', err);
  }
  const tmpCallback = this.continueCallback;
  this.continueCallback = null;
  tmpCallback && tmpCallback();
  if(this.running === 0) {
    this.terminateCallback && this.terminateCallback();
  }
}

每当任务完成,我们调用任何已保存的continueCallback()将导致
stream解锁,触发下一个项目的处理。

这就是limitedParallelStream模块。 我们现在可以在checkUrls模块中使用它来代替parallelStream,并且将我们的任务的并发限制在我们设置的值上。

顺序并行执行

我们以前创建的并行Streams可能会使得数据的顺序混乱,但是在某些情况下这是不可接受的。有时,实际上,有那种需要每个数据块都以接收到的相同顺序发出的业务场景。我们仍然可以并行运行transform函数。我们所要做的就是对每个任务发出的数据进行排序,使其遵循与接收数据相同的顺序。

这种技术涉及使用buffer,在每个正在运行的任务发出时重新排序块。为简洁起见,我们不打算提供这样一个stream的实现,因为这本书的范围是相当冗长的;我们要做的就是重用为了这个特定目的而构建的npm上的一个可用包,例如through2-parallel

我们可以通过修改现有的checkUrls模块来快速检查一个有序的并行执行的行为。 假设我们希望我们的结果按照与输入文件中的URL相同的顺序编写。 我们可以使用通过through2-parallel来实现:

const fs = require('fs');
const split = require('split');
const request = require('request');
const throughParallel = require('through2-parallel');

fs.createReadStream(process.argv[2])
  .pipe(split())
  .pipe(throughParallel.obj({concurrency: 2}, function (url, enc, done) {
    if(!url) return done();
    request.head(url, (err, response) => {
      this.push(url + ' is ' + (err ? 'down' : 'up') + '\n');
      done();
    });
  }))
  .pipe(fs.createWriteStream('results.txt'))
  .on('finish', () => console.log('All urls were checked'))
;

正如我们所看到的,through2-parallel的接口与through2的接口非常相似;唯一的不同是在through2-parallel还可以为我们提供的transform函数指定一个并发限制。如果我们尝试运行这个新版本的checkUrls,我们会看到results.txt文件列出结果的顺序与输入文件中
URLs的出现顺序是一样的。

通过这个,我们总结了使用Streams实现异步控制流的分析;接下来,我们研究管道模式。

管道模式

就像在现实生活中一样,Node.jsStreams也可以按照不同的模式进行管道连接。事实上,我们可以将两个不同的Streams合并成一个Streams,将一个Streams分成两个或更多的管道,或者根据条件重定向流。 在本节中,我们将探讨可应用于Node.jsStreams最重要的管道技术。

组合的Streams

在本章中,我们强调Streams提供了一个简单的基础结构来模块化和重用我们的代码,但是却漏掉了一个重要的部分:如果我们想要模块化和重用整个流水线?如果我们想要合并多个Streams,使它们看起来像外部的Streams,那该怎么办?下图显示了这是什么意思:

从上图中,我们看到了如何组合几个流的了:

  • 当我们写入组合的Streams的时候,实际上我们是写入组合的Streams的第一个单元,即StreamA
  • 当我们从组合的Streams中读取信息时,实际上我们从组合的Streams的最后一个单元中读取。

一个组合的Streams通常是一个多重的Streams,通过连接第一个单元的写入端和连接最后一个单元的读取端。

要从两个不同的Streams(一个可读的Streams和一个可写入的Streams)中创建一个多重的Streams,我们可以使用一个npm模块,例如duplexer2

但上述这么做并不完整。实际上,组合的Streams还应该做到捕获到管道中任意一段Streams单元产生的错误。我们已经说过,任何错误都不会自动传播到管道中。 所以,我们必须有适当的错误管理,我们将不得不显式附加一个错误监听器到每个Streams。但是,组合的Streams实际上是一个黑盒,这意味着我们无法访问管道中间的任何单元,所以对于管道中任意单元的异常捕获,组合的Streams也充当聚合器的角色。

总而言之,组合的Streams具有两个主要优点:

  • 管道内部是一个黑盒,对使用者不可见。
  • 简化了错误管理,因为我们不必为管道中的每个单元附加一个错误侦听器,而只需要给组合的Streams自身附加上就可以了。

组合的Streams是一个非常通用和普遍的做法,所以如果我们没有任何特殊的需要,我们可能只想重用现有的解决方案,如multipipecombine-stream

实现一个组合的Streams

为了说明一个简单的例子,我们来考虑下面两个组合的Streams的情况:

  • 压缩和加密数据
  • 解压和解密数据

使用诸如multipipe之类的库,我们可以通过组合一些核心库中已有的Streams(文件combinedStreams.js)来轻松地构建组合的Streams

const zlib = require('zlib');
const crypto = require('crypto');
const combine = require('multipipe');
module.exports.compressAndEncrypt = password => {
  return combine(
    zlib.createGzip(),
    crypto.createCipher('aes192', password)
  );
};
module.exports.decryptAndDecompress = password => {
  return combine(
    crypto.createDecipher('aes192', password),
    zlib.createGunzip()
  );
};

例如,我们现在可以使用这些组合的数据流,如同黑盒,这些对我们均是不可见的,可以创建一个小型应用程序,通过压缩和加密来归档文件。 让我们在一个名为archive.js的新模块中做这件事:

const fs = require('fs');
const compressAndEncryptStream = require('./combinedStreams').compressAndEncrypt;
fs.createReadStream(process.argv[3])
  .pipe(compressAndEncryptStream(process.argv[2]))
  .pipe(fs.createWriteStream(process.argv[3] + ".gz.enc"));

我们可以通过从我们创建的流水线中构建一个组合的Stream来进一步改进前面的代码,但这次并不只是为了获得对外不可见的黑盒,而是为了进行异常捕获。 实际上,正如我们已经提到过的那样,写下如下的代码只会捕获最后一个Stream单元发出的错误:

fs.createReadStream(process.argv[3])
  .pipe(compressAndEncryptStream(process.argv[2]))
  .pipe(fs.createWriteStream(process.argv[3] + ".gz.enc"))
  .on('error', function(err) {
    // 只会捕获最后一个单元的错误
    console.log(err);
  });

但是,通过把所有的Streams结合在一起,我们可以优雅地解决这个问题。重构后的archive.js如下:

const combine = require('multipipe');
   const fs = require('fs');
   const compressAndEncryptStream =
     require('./combinedStreams').compressAndEncrypt;
   combine(
     fs.createReadStream(process.argv[3])
     .pipe(compressAndEncryptStream(process.argv[2]))
     .pipe(fs.createWriteStream(process.argv[3] + ".gz.enc"))
   ).on('error', err => {
     // 使用组合的Stream可以捕获任意位置的错误
     console.log(err);
   });

正如我们所看到的,我们现在可以将一个错误侦听器直接附加到组合的Streams,它将接收任何内部流发出的任何error事件。
现在,要运行archive模块,只需在命令行参数中指定passwordfile参数,即压缩模块的参数:

node archive mypassword /path/to/a/file.text

通过这个例子,我们已经清楚地证明了组合的Stream是多么重要; 从一个方面来说,它允许我们创建流的可重用组合,从另一方面来说,它简化了管道的错误管理。

分开的Streams

我们可以通过将单个可读的Stream管道化为多个可写入的Stream来执行Stream的分支。当我们想要将相同的数据发送到不同的目的地时,这便体现其作用了,例如,两个不同的套接字或两个不同的文件。当我们想要对相同的数据执行不同的转换时,或者当我们想要根据一些标准拆分数据时,也可以使用它。如图所示:

Node.js中分开的Stream是一件小事。举例说明。

实现一个多重校验和的生成器

让我们创建一个输出给定文件的sha1md5散列的小工具。我们来调用这个新模块generateHashes.js,看如下的代码:

const fs = require('fs');
const crypto = require('crypto');
const sha1Stream = crypto.createHash('sha1');
sha1Stream.setEncoding('base64');
const md5Stream = crypto.createHash('md5');
md5Stream.setEncoding('base64');

目前为止没什么特别的 该模块的下一个部分实际上是我们将从文件创建一个可读的Stream,并将其分叉到两个不同的流,以获得另外两个文件,其中一个包含sha1散列,另一个包含md5校验和:

const inputFile = process.argv[2];
const inputStream = fs.createReadStream(inputFile);
inputStream
  .pipe(sha1Stream)
  .pipe(fs.createWriteStream(inputFile + '.sha1'));
inputStream
  .pipe(md5Stream)
  .pipe(fs.createWriteStream(inputFile + '.md5'));

这很简单:inputStream变量通过管道一边输入到sha1Stream,另一边输入到md5Stream。但是要注意:

  • inputStream结束时,md5Streamsha1Stream会自动结束,除非当调用pipe()时指定了end选项为false
  • Stream的两个分支会接受相同的数据块,因此当对数据执行一些副作用的操作时我们必须非常谨慎,因为那样会影响另外一个分支。
  • 黑盒外会产生背压,来自inputStream的数据流的流速会根据接收最慢的分支的流速作出调整。

合并的Streams

合并与分开相对,通过把一组可读的Streams合并到一个单独的可写的Stream里,如图所示:

将多个Streams合并为一个通常是一个简单的操作; 然而,我们必须注意我们处理end事件的方式,因为使用自动结束选项的管道系统会在一个源结束时立即结束目标流。 这通常会导致错误,因为其他还未结束的源将继续写入已终止的Stream。 解决此问题的方法是在将多个源传输到单个目标时使用选项{end:false},并且只有在所有源完成读取后才在目标Stream上调用end()

用多个源文件压缩为一个压缩包

举一个简单的例子,我们来实现一个小程序,它根据两个不同目录的内容创建一个压缩包。 为此,我们将介绍两个新的npm模块:

  • tar用来创建压缩包
  • fstream从文件系统文件创建对象streams的库

我们创建一个新模块mergeTar.js,如下开始初始化:

var tar = require('tar');
var fstream = require('fstream');
var path = require('path');
var destination = path.resolve(process.argv[2]);
var sourceA = path.resolve(process.argv[3]);
var sourceB = path.resolve(process.argv[4]);

在前面的代码中,我们只加载全部依赖包和初始化包含目标文件和两个源目录(sourceAsourceB)的变量。

接下来,我们创建tarStream并通过管道输出到一个可写入的Stream

const pack = tar.Pack();
pack.pipe(fstream.Writer(destination));

现在,我们开始初始化源Stream

let endCount = 0;

function onEnd() {
  if (++endCount === 2) {
    pack.end();
  }
}

const sourceStreamA = fstream.Reader({
    type: "Directory",
    path: sourceA
  })
  .on('end', onEnd);

const sourceStreamB = fstream.Reader({
    type: "Directory",
    path: sourceB
  })
  .on('end', onEnd);

在前面的代码中,我们创建了从两个源目录(sourceStreamAsourceStreamB)中读取的Stream那么对于每个源Stream,我们附加一个end事件订阅者,只有当这两个目录被完全读取时,才会触发packend事件。

最后,合并两个Stream

sourceStreamA.pipe(pack, {end: false});
sourceStreamB.pipe(pack, {end: false});

我们将两个源文件都压缩到pack这个Stream中,并通过设定pipe()option参数为{end:false}配置终点Stream的自动触发end事件。

这样,我们已经完成了我们简单的TAR程序。我们可以通过提供目标文件作为第一个命令行参数,然后是两个源目录来尝试运行这个实用程序:

node mergeTar dest.tar /path/to/sourceA /path/to/sourceB

npm中我们可以找到一些可以简化Stream的合并的模块:

要注意,流入目标Stream的数据是随机混合的,这是一个在某些类型的对象流中可以接受的属性(正如我们在上一个例子中看到的那样),但是在处理二进制Stream时通常是一个不希望这样。

然而,我们可以通过一种模式按顺序合并Stream; 它包含一个接一个地合并源Stream,当前一个结束时,开始发送第二段数据块(就像连接所有源Stream的输出一样)。在npm上,我们可以找到一些也处理这种情况的软件包。其中之一是multistream

多路复用和多路分解

合并Stream模式有一个特殊的模式,我们并不是真的只想将多个Stream合并在一起,而是使用一个共享通道来传送一组数据Stream。与之前的不一样,因为源数据Stream在共享通道内保持逻辑分离,这使得一旦数据到达共享通道的另一端,我们就可以再次分离数据Stream。如图所示:

将多个Stream组合在单个Stream上传输的操作被称为多路复用,而相反的操作(即,从共享Stream接收数据重构原始的Stream)则被称为多路分用。执行这些操作的设备分别称为多路复用器和多路分解器(。 这是一个在计算机科学和电信领域广泛研究的话题,因为它是几乎任何类型的通信媒体,如电话,广播,电视,当然还有互联网本身的基础之一。 对于本书的范围,我们不会过多解释,因为这是一个很大的话题。

我们想在本节中演示的是,如何使用共享的Node.js Streams来传送多个逻辑上分离的Stream,然后在共享Stream的另一端再次分离,即实现一次多路复用和多路分解。

创建一个远程logger日志记录

举例说明,我们希望有一个小程序来启动子进程,并将其标准输出和标准错误都重定向到远程服务器,服务器接受它们然后保存为两个单独的文件。因此,在这种情况下,共享介质是TCP连接,而要复用的两个通道是子进程的stdoutstderr。 我们将利用分组交换的技术,这种技术与IPTCPUDP等协议所使用的技术相同,包括将数据封装在数据包中,允许我们指定各种源信息,这对多路复用,路由,控制 流程,检查损坏的数据都十分有帮助。

如图所示,这个例子的协议大概是这样,数据被封装成具有以下结构的数据包:

在客户端实现多路复用

先说客户端,创建一个名为client.js的模块,这是我们这个应用程序的一部分,它负责启动一个子进程并实现Stream多路复用。

开始定义模块,首先加载依赖:

const child_process = require('child_process');
const net = require('net');

然后开始实现多路复用的函数:

function multiplexChannels(sources, destination) {
  let totalChannels = sources.length;

  for(let i = 0; i < sources.length; i++) {
    sources[i]
      .on('readable', function() { // [1]
        let chunk;
        while ((chunk = this.read()) !== null) {
          const outBuff = new Buffer(1 + 4 + chunk.length); // [2]
          outBuff.writeUInt8(i, 0);
          outBuff.writeUInt32BE(chunk.length, 1);
          chunk.copy(outBuff, 5);
          console.log('Sending packet to channel: ' + i);
          destination.write(outBuff); // [3]
        }
      })
      .on('end', () => { //[4]
        if (--totalChannels === 0) {
          destination.end();
        }
      });
  }
}

multiplexChannels()函数接受要复用的源Stream作为输入
和复用接口作为参数,然后执行以下步骤:

  1. 对于每个源Stream,它会注册一个readable事件侦听器,我们使用non-flowing模式从流中读取数据。
  2. 每读取一个数据块,我们将其封装到一个首部中,首部的顺序为:channel ID为1字节(UInt8),数据包大小为4字节(UInt32BE),然后为实际数据。
  3. 数据包准备好后,我们将其写入目标Stream
  4. 我们为end事件注册一个监听器,以便当所有源Stream结束时,end事件触发,通知目标Stream触发end事件。
注意,我们的协议最多能够复用多达256个不同的源流,因为我们只有1个字节来标识channel
const socket = net.connect(3000, () => { // [1]
  const child = child_process.fork( // [2]
    process.argv[2],
    process.argv.slice(3), {
      silent: true
    }
  );
  multiplexChannels([child.stdout, child.stderr], socket); // [3]
});

在最后,我们执行以下操作:

  1. 我们创建一个新的TCP客户端连接到地址localhost:3000
  2. 我们通过使用第一个命令行参数作为路径来启动子进程,同时我们提供剩余的process.argv数组作为子进程的参数。我们指定选项{silent:true},以便子进程不会继承父级的stdoutstderr
  3. 我们使用mutiplexChannels()函数将stdoutstderr多路复用到socket里。
在服务端实现多路分解

现在来看服务端,创建server.js模块,在这里我们将来自远程连接的Stream多路分解,并将它们传送到两个不同的文件中。

首先创建一个名为demultiplexChannel()的函数:

function demultiplexChannel(source, destinations) {
  let currentChannel = null;
  let currentLength = null;
  source
    .on('readable', () => { //[1]
      let chunk;
      if(currentChannel === null) {          //[2]
        chunk = source.read(1);
        currentChannel = chunk && chunk.readUInt8(0);
      }
    
      if(currentLength === null) {          //[3]
        chunk = source.read(4);
        currentLength = chunk && chunk.readUInt32BE(0);
        if(currentLength === null) {
          return;
        }
      }
    
      chunk = source.read(currentLength);        //[4]
      if(chunk === null) {
        return;
      }
    
      console.log('Received packet from: ' + currentChannel);
    
      destinations[currentChannel].write(chunk);      //[5]
      currentChannel = null;
      currentLength = null;
    })
    .on('end', () => {            //[6]
      destinations.forEach(destination => destination.end());
      console.log('Source channel closed');
    })
  ;
}

上面的代码可能看起来很复杂,仔细阅读并非如此;由于Node.js可读的Stream的拉动特性,我们可以很容易地实现我们的小协议的多路分解,如下所示:

  1. 我们开始使用non-flowing模式从流中读取数据。
  2. 首先,如果我们还没有读取channel ID,我们尝试从流中读取1个字节,然后将其转换为数字。
  3. 下一步是读取首部的长度。我们需要读取4个字节,所以有可能在内部Buffer还没有足够的数据,这将导致this.read()调用返回null。在这种情况下,我们只是中断解析,然后重试下一个readable事件。
  4. 当我们最终还可以读取数据大小时,我们知道从内部Buffer中拉出多少数据,所以我们尝试读取所有数据。
  5. 当我们读取所有的数据时,我们可以把它写到正确的目标通道,一定要记得重置currentChannelcurrentLength变量(这些变量将被用来解析下一个数据包)。
  6. 最后,当源channel结束时,一定不要忘记调用目标Streamend()方法。

既然我们可以多路分解源Stream,进行如下调用:

net.createServer(socket => {
  const stdoutStream = fs.createWriteStream('stdout.log');
  const stderrStream = fs.createWriteStream('stderr.log');
  demultiplexChannel(socket, [stdoutStream, stderrStream]);
})
  .listen(3000, () => console.log('Server started'))
;

在上面的代码中,我们首先在3000端口上启动一个TCP服务器,然后对于我们接收到的每个连接,我们将创建两个可写入的Stream,指向两个不同的文件,一个用于标准输出,另一个用于标准错误; 这些是我们的目标channel。 最后,我们使用demultiplexChannel()将套接字流解复用为stdoutStreamstderrStream

运行多路复用和多路分解应用程序

现在,我们准备尝试运行我们的新的多路复用/多路分解应用程序,但首先让我们创建一个小的Node.js程序来产生一些示例输出; 我们把它叫做generateData.js

console.log("out1");
console.log("out2");
console.error("err1");
console.log("out3");
console.error("err2");

首先,让我们开始运行服务端:

node server

然后运行客户端,需要提供作为子进程的文件参数:

node client generateData.js

客户端几乎立马运行,但是进程结束时,generateData应用程序的标准输入和标准输出经过一个TCP连接,然后在服务器端,被多路分解成两个文件。

注意,当我们使用child_process.fork()时,我们的客户端能够启动别的Node.js模块。

对象Streams的多路复用和多路分解

我们刚刚展示的例子演示了如何复用和解复用二进制/文本Stream,但值得一提的是,相同的规则也适用于对象Stream。 最大的区别是,使用对象,我们已经有了使用原子消息(对象)传输数据的方法,所以多路复用就像设置一个属性channel ID到每个对象一样简单,而多路分解只需要读·channel ID属性,并将每个对象路由到正确的目标Stream

还有一种模式是取一个对象上的几个属性并分发到多个目的Stream的模式 通过这种模式,我们可以实现复杂的流程,如下图所示:

如上图所示,取一个对象Stream表示animals,然后根据动物类型:reptilesamphibiansmammals,然后分发到正确的目标Stream中。

总结

在本章中,我们已经对Node.js Streams及其使用案例进行了阐述,但同时也应该为编程范式打开一扇大门,几乎具有无限的可能性。我们了解了为什么StreamNode.js社区赞誉,并且我们掌握了它们的基本功能,使我们能够利用它做更多有趣的事情。我们分析了一些先进的模式,并开始了解如何将不同配置的Streams连接在一起,掌握这些特性,从而使流如此多才多艺,功能强大。

如果我们遇到不能用一个Stream来实现的功能,我们可以通过将其他Streams连接在一起来实现,这是Node.js的一个很好的特性;Streams在处理二进制数据,字符串和对象都十分有用,并具有鲜明的特点。

在下一章中,我们将重点介绍传统的面向对象的设计模式。尽管JavaScript在某种程度上是面向对象的语言,但在Node.js中,函数式或混合方法通常是首选。在阅读下一章便揭晓答案。

查看原文

赞 27 收藏 121 评论 2

simon_woo 关注了标签 · 2017-09-25

graphql

GraphQL 是一个由Facebook提出的 应用层查询语言. 使用 GraphQL, 你可以基于图模式定义你的后端. 然后客户端就可以请求所需要的数据集。

关注 118

认证与成就

  • 获得 690 次点赞
  • 获得 6 枚徽章 获得 0 枚金徽章, 获得 1 枚银徽章, 获得 5 枚铜徽章

擅长技能
编辑

开源项目 & 著作
编辑

  • bootstrap-grid-system

    抽取bootstrap网格系统

  • 我的个人简历

    这是我个人简历的线上版本,页面重构使用了响应式布局,兼容各种设备。使用了js自动化工具对简历进行开发,部署,上线。

注册于 2014-07-31
个人主页被 3.4k 人浏览