坐死等吃

坐死等吃 查看完整档案

厦门编辑福州大学  |  计算机科学与技术 编辑  |  填写所在公司/组织填写个人主网站
编辑
_ | |__ _ _ __ _ | '_ \| | | |/ _` | | |_) | |_| | (_| | |_.__/ \__,_|\__, | |___/ 该用户太懒什么也没留下

个人动态

坐死等吃 赞了文章 · 今天 21:04

技术选型:为什么批处理我们却选择了Flink

最近接手了一个改造多平台日志服务的需求,经过梳理,我认为之前服务在设计上存在缺陷。经过一段时间的技术方案调研,最终我们决定选择使用 Flink 重构该服务。

目前重构后的服务已成功经受了国庆节流量洪峰的考验,今日特来总结回顾,和大家分享一下经验。

业务需求及背景

在了解改造服务的需求前,我们首先要明确,要解决什么问题以及目前的服务是如何解决的。

当前的业务逻辑还是比较清晰的:

  • 采集同一时段不同数据源的日志;
  • 对采集的数据进行处理;
  • 将处理后的数据上传到指定位置,供客户下载。

我们面临的痛点和难点:

  • 日志的数据量比较大:每小时未压缩的日志数据量有 50 多个 G,节假日等特殊时间节点,日志量会翻倍。
  • 目前服务使用单机进行处理,速度比较慢,扩容不方便。
  • 目前服务处理数据时需要清洗字段,按时间排序,统计某字段的频率等步骤。这些步骤都属于 ETL 中的常规操作,但是目前是以代码的形式实现的,我们想以配置形式减少重复编码,尽量更加简单、通用。

方案1:我们需要一个数据库吗?

针对以上业务需求,有同学提出:“我们可以把所有原始数据放到数据库中,后续的 ETL 可以通过 SQL 实现。”

如果你一听到"数据库"想到的就是 Pg、Mysql、Oracle 等,觉得这个方案不具有可行性,那你就错了。数据库的类型和维度是非常丰富的,如下图所示:

数据库行业全景图

按业务负载特征,关系型数据库可分为 OLTP 数据库(交易型)和 OLAP 数据库(分析型) :

  • OLTP,Online Transaction Processing。OLTP 数据库最大的特点是支持事务,增删查改等功能强大,适合需要被频繁修改的"热数据"。我们耳熟能详的 Mysql、Pg 等都属于这一类。缺点就是由于支持事务,插入时比较慢。拿来实现我们的需求显然是不合适的。
  • OLAP,Online Analytical Processing,数据分析为主。不支持事务,或者说是对事务的支持有限。OLAP 的场景是:大多数是读请求,数据总是以相当大的批(> 1000 rows)进行写入,不修改已添加的数据。

方案 1 小结

OLAP 的使用场景符合我们的需求,为此我们还专门去调研了一下 ClickHouse。但是有一个因素让我们最终放弃了使用 OLAP。请注意,数据库存储的数据都是二维的,有行和列两个维度。但是日志只有行一个维度。如果说为了把日志存入数据库把每行日志都切分,那统计字段的需求也就顺手实现了,又何必存到数据呢?

所以,OLAP 使用场景隐含的一个特点是:存入的数据需要被多维度反复分析的。这样才有把数据存入数据库的动力,像我们当前的需求对日志进行简单的变形后仍旧以文本日志的形式输出,使用 OLAP 是不合适的。

方案2:Hive 为什么不行?

看到这,熟悉大数据的同学可能会觉得我们水平很 Low,因为业务需求归根到底就是三个字:批处理。

那我们为什么第一时间没有考虑上大数据呢?

大数据处理流程

大数据确实如雷贯耳,但现在我们的日志处理这块大部分都是用 Golang 实现的,团队内的其他业务用了 Python、Lua、C,就是没有用过到 Java。而目前大数据都是基于 JVM 开发的。Golang 调用这些服务没有一个好用的客户端。

所以基于团队目前的技术储备,大数据才没有成为我们的首选。但是从目前的状况来看大数据是最优解了。那么我们该选用大数据的什么组件实现需求呢?

放弃使用数据库直接使用 HDFS 存储日志文件,应该是毋庸置疑的。

我们需求是离线批处理数据,对时效性没有要求,MapReduce 和 Hive 都能满足需求。但是 MapReduce 与 Hive 相比,Hive 在 MapReduce 上做了一层封装并且支持 SQL。看起来 Hive 是非常合适的。

那为什么最终放弃了 Hive 呢?

  • 机器资源问题。公司其他团队已经有一套 HDFS 的设施,只用来做存储,Hadoop 的 MapReduce 这个组件根本没跑起来。那套 HDFS 部署的机器资源比较紧张,他们担心我们使用 MapReduce 和 Hive 跑计算,会影响现在 HDFS 的性能; 我们想审批一批新的机器,重新使用 Ambari 搭建一套 Hadoop,却被告知没那么多闲置的机器资源。而且我们即便申请下来了机器,只跑目前服务也跑不满,机器资源大部分也会被闲置,也有浪费资源的嫌疑。
  • 存储分离是趋势。在调研中我们发现,像 Hadoop 这样把存储和计算放到一起的已经比较"落伍"了。Hadoop 存储分离,需要修改源码,目前没有开源实现,只是云厂商和各个大数据公司有相关商业产品。从这个角度讲,即便我们自己搞定了机器资源搭一套 Hadoop,也只不过是拾人牙慧罢了。

大数据生态图

方案 2 小结

再合适的技术方案不能落地也是空谈。但是技术方案想要落地时,已经不是一个单纯的技术问题了,资源限制,团队限制等都需要考虑在内。

一个优秀的技术方案立足于解决当下的问题,并且能放眼未来勾画蓝图,这样大家觉得 "有利可图",才愿意跟你一起折腾。

方案3:为什么我们放弃了 Spark?

通用的计算引擎

虽然使用 HDFS 的团队不赞成在他们的机器上跑 Hive,但是我们把日志数据存到他们的 HDFS 上还是没问题的。在已知 "存储和分离是趋势" 是前提的基础下,"我们到底需要什么" 这个问题已经有答案了。

我们需要的是一个通用的计算引擎。存储已经剥离给 HDFS 了,所以我们只需要找一个工具,帮我们处理 ETL 就可以了。Spark 和 Flink 正是这样的场景。

Spark 与 Flink 初次交锋

Spark 和 Flink 之间,我们毫不犹豫地选择了 Spark。原因非常简单:

  • Spark 适合批处理。Spark 当初的设计目标就是用来替换 MapReduce。而 Spark 流处理的能力是后来加上去的。所以用 Spark 进行批处理,可谓得心应手。
  • Spark 成熟度高。Spark 目前已经发布到 3.0,而 Flink 尚在 Flink 1.x 阶段。Flink 向来以流处理闻名,虽然被国内某云收购后开始鼓吹 "流批一体",但是线上效果还是有待检验的。
  • Scala 的加持。Spark 大部分是用 Scala 实现的。Scala 是一门多范式的编程语言,并且与 Haskell 有很深的渊源。Haskell 是一门大名鼎鼎的函数式编程语言。对于函数式编程语言,想必大多数程序猿都有一种 "虽不能至,然心向往之" 的情结。现在使用 Spark 能捎带着耍一耍函数式编程语言 Scala,岂不妙哉?

Scala

挥泪斩 Spark

前文已经交代过了,我们否决掉 Hive 的一个重要因素是没有足够的机器资源。所以我们把 Spark 直接部署到云平台上。

对于我司的云平台要补充一些细节。

我们的云平台是基于 K8S 二次开发的,目前还在迭代当中,因此"Spark on K8S" 的运行模式我们暂时用不了。在这样的情况下,我们采用了 "Spark Standalone" 的模式。Standalone 模式,也就是Master Slaver 模式,类似于 Nginx 那样的架构,Master 节点负责接收分发任务,Slaver 节点负责"干活"。

等到我们在云平台上以 "Spark Standalone" 模式部署好了,跑了几个测试 Case 发现了新问题。我们的云平台与办公网络是隔离的,如果办公网络想访问云平台的某个 Docker 容器,需要配置域名。而 Spark 的管理页面上很多 URL 的 domain 是所在机器的 IP,容器的 IP 是虚拟 IP,容器重启后IP 就会改变。具体如图:

部署在云平台的 spark

Spark 的管理平台非常重要,因为能从这上面看到当前各个节点运行情况,任务的异常信息等,现在很多链接不能访问,不利于我们对 Spark 任务进行问题排查和调优。基于这个原因,我们最终放弃了 Spark。

方案 3 小结

Spark 你真的很优秀,擅长批处理,如此成熟,还有函数式的基因 。。。这些优点早让我倾心不已。

Spark 你真的是个好人,如果不是云平台的限制,我一定选择你。

Spark,对不起。

方案4:Flink,真香!

给 Spark 发完好人卡后,我们看一看新欢 Flink。不客气的说,Flink 初期时很多实现都是抄的 Spark,所以二者的很多概念相似。所以 Flink 同样有 Standalone 模式,我们在部署阶段没遇到任何问题。

在跑了几个 Flink 测试 Case 后,我们由衷的感叹 Flink 真香。

放弃 Spark 时我们的痛点在于 "部署在云平台上的 Spark 服务的管理界面很多功能无法使用",而 Flink 的管理平台完全没有这个问题。除此之外,Flink 管理平台的 "颜值" 和功能都是 Spark 无法比拟的。

管理平台颜值对比

Spark管理平台页面

Flink管理平台页面

对比之下,Spark 的页面完全是个"黄脸婆"。

Flink 管理平台功能

由于 Spark 的功能很多不能使用,所以就不重点和 Flink 做比较了。这里只说 Flink 几个让人眼前一亮的功能。

  • 完善的 Restful API

部署了 Flink 或 Spark 服务后,该如何下发计算任务呢? 一般是通过 bin 目录下的一个名称中包含 submit 的可执行程序。那如果想把 Flink 或 Spark 做成微服务,通过 http 接口去下发任务呢?

Spark1.0 的时候支持 http,2.0时这个功能基本上废掉了,很多参数不支持了,把 http 这个功能交由 jobService 一个第三方开源组件去实现。这个 jobService 的开源组件对云平台的支持也非常不友好。所以在我们看来,Spark 通过 Http 下发任务的路子基本被堵死了。

反观 Flink,管理平台的接口是 Restful 的,不仅支持 Http 下发计算任务,还可以通过相关接口查看任务状态和获取异常或返回值。

  • 强大的任务分析能力

Flink 的任务分为几个不同的阶段,每个不同的阶段有不同的颜色。这样仅从颜色就可以判断出当前 Flink 任务执行的大致情况。如下图:

Flink管理平台页面

在任务详情页面,会有任务分解图和任务执行耗时表格,这两个结合起来能够知道当然 Flink 任务是如何分解的,是否出现数据倾斜的情况,哪个步骤耗时最多,是否有优化的空间。

管理平台页面

这就是做批处理技术选型时候的心路历程,随笔记了下来,希望对大家有所帮助。

推荐阅读

如何选择适合自己网站的防盗链

HTTP/3 来了,你了解它么?

查看原文

赞 4 收藏 3 评论 0

坐死等吃 赞了文章 · 9月21日

【写个工作用的脚手架cli】用脚手架整合模板和配置

学习总结篇,以能否造轮子来衡量学习效果。本篇主要介绍近期使用的一个cli工具

脚手架cli的解决的问题

随着公司各端的业务进行,前端方面会沉淀出一些通用的解决方案和模板。此时,统一维护和管理就非常有必要了。allen-cli就是基于这样的场景而诞生的。

这个项目脚手架,最终实现:整合各个模板,一键生成模板

使用示例

目前实现的功能为:

  1. 输入allen init命令选择一个脚手架模版进行下载,然后创建对应的app。
  2. 动态选择构建环境,适配移动端等不同情况。

allen-cli的具体流程

项目的整体结构:

1. 创建项目

npm init创建package.json, 主要加上bin命令

{
  "bin": {
    "allen": "bin/allen",
    "allen-init": "bin/allen-init"
  },
}

2. 解析参数

一个CLI需要通过命令行输入各种参数,可以直接用nodejs的process相关api进行解析,但是更推荐使用commander这个npm包可以大大简化解析的过程。

#!/usr/bin/env node
const program = require('commander')

console.log('version', require('../package').version)

program
  .version(require('../package').version)
    .usage('<command> [项目名称]')
    .command('init', '创建新项目')
    .parse(process.argv)

3. main主体流程

allen-init

// NODE moudle
//  node.js 命令行解决方案
const program = require("commander");

// node.js path模块
const path = require("path");

// node.js fs模块
const fs = require("fs");

// 常见的交互式命令行用户接口的集合
const inquirer = require("inquirer");

// 使用shell模式匹配文件
const glob = require("glob");

// 活动最新的npm包
const latestVersion = require("latest-version");

// node.js 子进程
const spawn = require("child_process").spawn;

// node.js 命令行环境的 loading效果, 和显示各种状态的图标
const ora = require("ora");

// The UNIX command rm -rf for node.
const rm = require("rimraf").sync;

async function main() {
  let projectRoot, templateName
  try {
    // 检测版本
    let isUpate = await checkVersion();
    // 更新版本
    if (isUpate) await updateCli();
    // 检测路径
    projectRoot = await checkDir();
    // 创建路径
    makeDir(projectRoot)
    // 选择模板
    let { git } = await selectTemplate();
    // 下载模板
    templateName = await dowload(rootName, git);
    // 本地配置
    let customizePrompt = await getCustomizePrompt(templateName, CONST.CUSTOMIZE_PROMPT)
    // 渲染本地配置
    await render(projectRoot, templateName, customizePrompt);
    // 删除无用文件
    deleteCusomizePrompt(projectRoot)
    // 构建结束
    afterBuild();
  } catch (err) {
    log.error(`创建失败:${err.message}`)
    afterError(projectRoot, templateName)
  }
}

3.1 创建文件下载模板

创建文件和选择模板

// 创建路径
function makeDir (projectRoot) {
  if (projectRoot !== ".") {
    fs.mkdirSync(projectName);
  }
}
/**
 * 模板选择
 */
function selectTemplate() {
  return new Promise((resolve, reject) => {
    let choices = Object.values(templateConfig).map(item => {
      return {
        name: item.name,
        value: item.value
      };
    });
    let config = {
      // type: 'checkbox',
      type: "list",
      message: "请选择创建项目类型",
      name: "select",
      choices: [new inquirer.Separator("模板类型"), ...choices]
    };
    inquirer.prompt(config).then(data => {
      let { select } = data;
      let { value, git } = templateConfig[select];
      resolve({
        git,
        // templateValue: value
      });
    });
  });
}

下载模板, 用的是download-git-repo

const download = require('download-git-repo')
const path = require('path')
const ora = require('ora')
const logSymbols = require("log-symbols");
const chalk = require("chalk");
const CONST = require('../conf/const')
module.exports = function (target, url) {
  const spinner = ora(`正在下载项目模板,源地址:${url}`)
  target = path.join(CONST.TEMPLATE_NAME)
  spinner.start()
  return new Promise((resolve,reject) => {
    download(`direct:${url}`,
    target, { clone: true }, (err) => {
      if (err) {
        spinner.fail()
        console.log(logSymbols.fail, chalk.red("模板下载失败:("));
        reject(err)
      } else {
        spinner.succeed()
        console.log(logSymbols.success, chalk.green("模板下载完毕:)"));
        resolve(target)
      }
    })
  })
}

3.2 界面交互配置

采用的是inquirer的这个库

// 常见的交互式命令行用户接口的集合
const inquirer = require("inquirer");

3.3 本地配置

如果需要将一些配置放在本地文件,则可以创建一些本地配置

/**
 * 
 * @param target 模板路径
 * @param fileName 读取文件名
 */
function getCustomizePrompt (target, fileName) {
  return new Promise ((resolve) => {
    const filePath = path.join(process.cwd(), target, fileName)
    if(fs.existsSync(filePath)) {
      console.log('读取模板配置文件')
      let file = require(filePath)
      resolve(file)
    } else {
      console.log('该文件没有配置文件')
      resolve([])
    }
  })
}

template.json

  {
    type: "confirm",
    name: "mobile",
    message: "是否用于移动端?"
  },
  {
    type: "confirm",
    name: "flexible",
    message: "是否使用移动端适配?",
    when: function (answers) {
      return answers.mobile
    }
  },

4. 涉及到的node.js操作

// NODE moudle
//  node.js 命令行解决方案
const program = require("commander");

// node.js path模块
const path = require("path");

// node.js fs模块
const fs = require("fs");

// 常见的交互式命令行用户接口的集合
const inquirer = require("inquirer");

// 使用shell模式匹配文件
const glob = require("glob");

// 活动最新的npm包
const latestVersion = require("latest-version");

// node.js 子进程
const spawn = require("child_process").spawn;

// node.js 命令行环境的 loading效果, 和显示各种状态的图标
const ora = require("ora");

// The UNIX command rm -rf for node.
const rm = require("rimraf").sync;

5. 本地安装使用

在项目目录下运行npm i -g,注册全局命令allen-cli即可使用

C:\Users\XX\AppData\Roaming\npm目录下会生成相应的可执行文件:

6. npm包allen-cli

一个基本的脚手架CLI就完成了。

欢迎试用:npm i -g allen-cli

相关解析

!/usr/bin/env node

使用过Linux或者Unix的开发者,对于Shebang应该不陌生,它是一个符号的名称,#!。这个符号通常在Unix系统的基本中第一行开头中出现,用于指明这个脚本文件的解释程序。了解了Shebang之后就可以理解,增加这一行是为了指定用node执行脚本文件

当你输入一个命令的时候,npm是如何识别并执行对应的文件的呢?

具体的原理阮一峰大神已经在npm scripts 使用指南中介绍过。简单的理解:

就是输入命令后,会有在一个新建的shell中执行指定的脚本,在执行这个脚本的时候,我们需要来指定这个脚本的解释程序是node。
在一些情况下,即使你增加了这一行,但还是可能会碰到一下错误,这是为什么呢?

No such file or directory

为了解决这个问题,首先需要了解一下/usr/bin/env。我们已经知道,Shebang是为了指定脚本的解释程序,可是不同用户或者不同的脚本解释器有可能安装在不同的目录下,系统如何知道要去哪里找你的解释程序呢?

/usr/bin/env就是告诉系统可以在PATH目录中查找。

所以配置#!/usr/bin/env node, 就是解决了不同的用户node路径不同的问题,可以让系统动态的去查找node来执行你的脚本文件。
看到这里你应该理解,为什么会出现No such file or directory的错误?因为你的node安装路径没有添加到系统的PATH中。所以去进行node环境变量配置就可以了。

NPM 执行脚本的原理

npm 脚本的原理非常简单。每当执行npm run,就会自动新建一个 Shell,在这个 Shell 里面执行指定的脚本命令。因此,只要是 Shell(一般是 Bash)可以运行的命令,就可以写在 npm 脚本里面。

比较特别的是,npm run新建的这个 Shell,会将当前目录的node_modules/.bin子目录加入PATH变量,执行结束后,再将PATH变量恢复原样。

参考链接

查看原文

赞 13 收藏 9 评论 6

坐死等吃 收藏了文章 · 2019-07-01

从实践到原理,带你参透 gRPC

image

原文地址:从实践到原理,带你参透 gRPC

gRPC 在 Go 语言中大放异彩,越来越多的小伙伴在使用,最近也在公司安利了一波,希望能通过这篇文章能带你一览 gRPC 的爱与恨。本文篇幅较长,希望你做好阅读准备,目录如下:

image

简述

gRPC 是一个高性能、开源和通用的 RPC 框架,面向移动和 HTTP/2 设计。目前提供 C、Java 和 Go 语言版本,分别是:grpc, grpc-java, grpc-go. 其中 C 版本支持 C, C++, Node.js, Python, Ruby, Objective-C, PHP 和 C# 支持。

gRPC 基于 HTTP/2 标准设计,带来诸如双向流、流控、头部压缩、单 TCP 连接上的多复用请求等特性。这些特性使得其在移动设备上表现更好,更省电和节省空间占用。

调用模型

image

1、客户端(gRPC Stub)调用 A 方法,发起 RPC 调用。

2、对请求信息使用 Protobuf 进行对象序列化压缩(IDL)。

3、服务端(gRPC Server)接收到请求后,解码请求体,进行业务逻辑处理并返回。

4、对响应结果使用 Protobuf 进行对象序列化压缩(IDL)。

5、客户端接受到服务端响应,解码请求体。回调被调用的 A 方法,唤醒正在等待响应(阻塞)的客户端调用并返回响应结果。

调用方式

一、Unary RPC:一元 RPC

image

Server

type SearchService struct{}

func (s *SearchService) Search(ctx context.Context, r *pb.SearchRequest) (*pb.SearchResponse, error) {
    return &pb.SearchResponse{Response: r.GetRequest() + " Server"}, nil
}

const PORT = "9001"

func main() {
    server := grpc.NewServer()
    pb.RegisterSearchServiceServer(server, &SearchService{})

    lis, err := net.Listen("tcp", ":"+PORT)
    ...

    server.Serve(lis)
}
  • 创建 gRPC Server 对象,你可以理解为它是 Server 端的抽象对象。
  • 将 SearchService(其包含需要被调用的服务端接口)注册到 gRPC Server。 的内部注册中心。这样可以在接受到请求时,通过内部的 “服务发现”,发现该服务端接口并转接进行逻辑处理。
  • 创建 Listen,监听 TCP 端口。
  • gRPC Server 开始 lis.Accept,直到 Stop 或 GracefulStop。

Client

func main() {
    conn, err := grpc.Dial(":"+PORT, grpc.WithInsecure())
    ...
    defer conn.Close()

    client := pb.NewSearchServiceClient(conn)
    resp, err := client.Search(context.Background(), &pb.SearchRequest{
        Request: "gRPC",
    })
    ...
}
  • 创建与给定目标(服务端)的连接句柄。
  • 创建 SearchService 的客户端对象。
  • 发送 RPC 请求,等待同步响应,得到回调后返回响应结果。

二、Server-side streaming RPC:服务端流式 RPC

image

Server

func (s *StreamService) List(r *pb.StreamRequest, stream pb.StreamService_ListServer) error {
    for n := 0; n <= 6; n++ {
        stream.Send(&pb.StreamResponse{
            Pt: &pb.StreamPoint{
                ...
            },
        })
    }

    return nil
}

Client

func printLists(client pb.StreamServiceClient, r *pb.StreamRequest) error {
    stream, err := client.List(context.Background(), r)
    ...
    
    for {
        resp, err := stream.Recv()
        if err == io.EOF {
            break
        }
        ...
    }

    return nil
}

三、Client-side streaming RPC:客户端流式 RPC

image

Server

func (s *StreamService) Record(stream pb.StreamService_RecordServer) error {
    for {
        r, err := stream.Recv()
        if err == io.EOF {
            return stream.SendAndClose(&pb.StreamResponse{Pt: &pb.StreamPoint{...}})
        }
        ...

    }

    return nil
}

Client

func printRecord(client pb.StreamServiceClient, r *pb.StreamRequest) error {
    stream, err := client.Record(context.Background())
    ...
    
    for n := 0; n < 6; n++ {
        stream.Send(r)
    }

    resp, err := stream.CloseAndRecv()
    ...

    return nil
}

四、Bidirectional streaming RPC:双向流式 RPC

image

Server

func (s *StreamService) Route(stream pb.StreamService_RouteServer) error {
    for {
        stream.Send(&pb.StreamResponse{...})
        r, err := stream.Recv()
        if err == io.EOF {
            return nil
        }
        ...
    }

    return nil
}

Client

func printRoute(client pb.StreamServiceClient, r *pb.StreamRequest) error {
    stream, err := client.Route(context.Background())
    ...

    for n := 0; n <= 6; n++ {
        stream.Send(r)
        resp, err := stream.Recv()
        if err == io.EOF {
            break
        }
        ...
    }

    stream.CloseSend()

    return nil
}

客户端与服务端是如何交互的

在开始分析之前,我们要先 gRPC 的调用有一个初始印象。那么最简单的就是对 Client 端调用 Server 端进行抓包去剖析,看看整个过程中它都做了些什么事。如下图:

image

  • Magic
  • SETTINGS
  • HEADERS
  • DATA
  • SETTINGS
  • WINDOW_UPDATE
  • PING
  • HEADERS
  • DATA
  • HEADERS
  • WINDOW_UPDATE
  • PING

我们略加整理发现共有十二个行为,是比较重要的。在开始分析之前,建议你自己先想一下,它们的作用都是什么?大胆猜测一下,带着疑问去学习效果更佳。

行为分析

Magic

image

Magic 帧的主要作用是建立 HTTP/2 请求的前言。在 HTTP/2 中,要求两端都要发送一个连接前言,作为对所使用协议的最终确认,并确定 HTTP/2 连接的初始设置,客户端和服务端各自发送不同的连接前言。

而上图中的 Magic 帧是客户端的前言之一,内容为 PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n,以确定启用 HTTP/2 连接。

SETTINGS

image

image

SETTINGS 帧的主要作用是设置这一个连接的参数,作用域是整个连接而并非单一的流。

而上图的 SETTINGS 帧都是空 SETTINGS 帧,图一是客户端连接的前言(Magic 和 SETTINGS 帧分别组成连接前言)。图二是服务端的。另外我们从图中可以看到多个 SETTINGS 帧,这是为什么呢?是因为发送完连接前言后,客户端和服务端还需要有一步互动确认的动作。对应的就是带有 ACK 标识 SETTINGS 帧。

HEADERS

image

HEADERS 帧的主要作用是存储和传播 HTTP 的标头信息。我们关注到 HEADERS 里有一些眼熟的信息,分别如下:

  • method:POST
  • scheme:http
  • path:/proto.SearchService/Search
  • authority::10001
  • content-type:application/grpc
  • user-agent:grpc-go/1.20.0-dev

你会发现这些东西非常眼熟,其实都是 gRPC 的基础属性,实际上远远不止这些,只是设置了多少展示多少。例如像平时常见的 grpc-timeoutgrpc-encoding 也是在这里设置的。

DATA

image

DATA 帧的主要作用是装填主体信息,是数据帧。而在上图中,可以很明显看到我们的请求参数 gRPC 存储在里面。只需要了解到这一点就可以了。

HEADERS, DATA, HEADERS

image

在上图中 HEADERS 帧比较简单,就是告诉我们 HTTP 响应状态和响应的内容格式。

imgae

在上图中 DATA 帧主要承载了响应结果的数据集,图中的 gRPC Server 就是我们 RPC 方法的响应结果。

image

在上图中 HEADERS 帧主要承载了 gRPC 状态 和 gRPC 状态消息,图中的 grpc-statusgrpc-message 就是我们的 gRPC 调用状态的结果。

其它步骤

WINDOW_UPDATE

主要作用是管理和流的窗口控制。通常情况下打开一个连接后,服务器和客户端会立即交换 SETTINGS 帧来确定流控制窗口的大小。默认情况下,该大小设置为约 65 KB,但可通过发出一个 WINDOW_UPDATE 帧为流控制设置不同的大小。

image

PING/PONG

主要作用是判断当前连接是否仍然可用,也常用于计算往返时间。其实也就是 PING/PONG,大家对此应该很熟。

小结

image

  • 在建立连接之前,客户端/服务端都会发送连接前言(Magic+SETTINGS),确立协议和配置项。
  • 在传输数据时,是会涉及滑动窗口(WINDOW_UPDATE)等流控策略的。
  • 传播 gRPC 附加信息时,是基于 HEADERS 帧进行传播和设置;而具体的请求/响应数据是存储的 DATA 帧中的。
  • 请求/响应结果会分为 HTTP 和 gRPC 状态响应两种类型。
  • 客户端发起 PING,服务端就会回应 PONG,反之亦可。

这块 gRPC 的基础使用,你可以看看我另外的 《gRPC 入门系列》,相信对你一定有帮助。

浅谈理解

服务端

image

为什么四行代码,就能够起一个 gRPC Server,内部做了什么逻辑。你有想过吗?接下来我们一步步剖析,看看里面到底是何方神圣。

一、初始化

// grpc.NewServer()
func NewServer(opt ...ServerOption) *Server {
    opts := defaultServerOptions
    for _, o := range opt {
        o(&opts)
    }
    s := &Server{
        lis:    make(map[net.Listener]bool),
        opts:   opts,
        conns:  make(map[io.Closer]bool),
        m:      make(map[string]*service),
        quit:   make(chan struct{}),
        done:   make(chan struct{}),
        czData: new(channelzData),
    }
    s.cv = sync.NewCond(&s.mu)
    ...

    return s
}

这块比较简单,主要是实例 grpc.Server 并进行初始化动作。涉及如下:

  • lis:监听地址列表。
  • opts:服务选项,这块包含 Credentials、Interceptor 以及一些基础配置。
  • conns:客户端连接句柄列表。
  • m:服务信息映射。
  • quit:退出信号。
  • done:完成信号。
  • czData:用于存储 ClientConn,addrConn 和 Server 的channelz 相关数据。
  • cv:当优雅退出时,会等待这个信号量,直到所有 RPC 请求都处理并断开才会继续处理。

二、注册

pb.RegisterSearchServiceServer(server, &SearchService{})

步骤一:Service API interface

// search.pb.go
type SearchServiceServer interface {
    Search(context.Context, *SearchRequest) (*SearchResponse, error)
}

func RegisterSearchServiceServer(s *grpc.Server, srv SearchServiceServer) {
    s.RegisterService(&_SearchService_serviceDesc, srv)
}

还记得我们平时编写的 Protobuf 吗?在生成出来的 .pb.go 文件中,会定义出 Service APIs interface 的具体实现约束。而我们在 gRPC Server 进行注册时,会传入应用 Service 的功能接口实现,此时生成的 RegisterServer 方法就会保证两者之间的一致性。

步骤二:Service API IDL

你想乱传糊弄一下?不可能的,请乖乖定义与 Protobuf 一致的接口方法。但是那个 &_SearchService_serviceDesc 又有什么作用呢?代码如下:

// search.pb.go
var _SearchService_serviceDesc = grpc.ServiceDesc{
    ServiceName: "proto.SearchService",
    HandlerType: (*SearchServiceServer)(nil),
    Methods: []grpc.MethodDesc{
        {
            MethodName: "Search",
            Handler:    _SearchService_Search_Handler,
        },
    },
    Streams:  []grpc.StreamDesc{},
    Metadata: "search.proto",
}

这看上去像服务的描述代码,用来向内部表述 “我” 都有什么。涉及如下:

  • ServiceName:服务名称
  • HandlerType:服务接口,用于检查用户提供的实现是否满足接口要求
  • Methods:一元方法集,注意结构内的 Handler 方法,其对应最终的 RPC 处理方法,在执行 RPC 方法的阶段会使用。
  • Streams:流式方法集
  • Metadata:元数据,是一个描述数据属性的东西。在这里主要是描述 SearchServiceServer 服务

步骤三:Register Service

func (s *Server) register(sd *ServiceDesc, ss interface{}) {
    ...
    srv := &service{
        server: ss,
        md:     make(map[string]*MethodDesc),
        sd:     make(map[string]*StreamDesc),
        mdata:  sd.Metadata,
    }
    for i := range sd.Methods {
        d := &sd.Methods[i]
        srv.md[d.MethodName] = d
    }
    for i := range sd.Streams {
        ...
    }
    s.m[sd.ServiceName] = srv
}

在最后一步中,我们会将先前的服务接口信息、服务描述信息给注册到内部 service 去,以便于后续实际调用的使用。涉及如下:

  • server:服务的接口信息
  • md:一元服务的 RPC 方法集
  • sd:流式服务的 RPC 方法集
  • mdata:metadata,元数据

小结

在这一章节中,主要介绍的是 gRPC Server 在启动前的整理和注册行为,看上去很简单,但其实一切都是为了后续的实际运行的预先准备。因此我们整理一下思路,将其串联起来看看,如下:

image

三、监听

接下来到了整个流程中,最重要也是大家最关注的监听/处理阶段,核心代码如下:

func (s *Server) Serve(lis net.Listener) error {
    ...
    var tempDelay time.Duration 
    for {
        rawConn, err := lis.Accept()
        if err != nil {
            if ne, ok := err.(interface {
                Temporary() bool
            }); ok && ne.Temporary() {
                if tempDelay == 0 {
                    tempDelay = 5 * time.Millisecond
                } else {
                    tempDelay *= 2
                }
                if max := 1 * time.Second; tempDelay > max {
                    tempDelay = max
                }
                ...
                timer := time.NewTimer(tempDelay)
                select {
                case <-timer.C:
                case <-s.quit:
                    timer.Stop()
                    return nil
                }
                continue
            }
            ...
            return err
        }
        tempDelay = 0

        s.serveWG.Add(1)
        go func() {
            s.handleRawConn(rawConn)
            s.serveWG.Done()
        }()
    }
}

Serve 会根据外部传入的 Listener 不同而调用不同的监听模式,这也是 net.Listener 的魅力,灵活性和扩展性会比较高。而在 gRPC Server 中最常用的就是 TCPConn,基于 TCP Listener 去做。接下来我们一起看看具体的处理逻辑,如下:

image

  • 循环处理连接,通过 lis.Accept 取出连接,如果队列中没有需处理的连接时,会形成阻塞等待。
  • lis.Accept 失败,则触发休眠机制,若为第一次失败那么休眠 5ms,否则翻倍,再次失败则不断翻倍直至上限休眠时间 1s,而休眠完毕后就会尝试去取下一个 “它”。
  • lis.Accept 成功,则重置休眠的时间计数和启动一个新的 goroutine 调用 handleRawConn 方法去执行/处理新的请求,也就是大家很喜欢说的 “每一个请求都是不同的 goroutine 在处理”。
  • 在循环过程中,包含了 “退出” 服务的场景,主要是硬关闭和优雅重启服务两种情况。

客户端

image

一、创建拨号连接

// grpc.Dial(":"+PORT, grpc.WithInsecure())
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
    cc := &ClientConn{
        target:            target,
        csMgr:             &connectivityStateManager{},
        conns:             make(map[*addrConn]struct{}),
        dopts:             defaultDialOptions(),
        blockingpicker:    newPickerWrapper(),
        czData:            new(channelzData),
        firstResolveEvent: grpcsync.NewEvent(),
    }
    ...
    chainUnaryClientInterceptors(cc)
    chainStreamClientInterceptors(cc)

    ...
}

grpc.Dial 方法实际上是对于 grpc.DialContext 的封装,区别在于 ctx 是直接传入 context.Background。其主要功能是创建与给定目标的客户端连接,其承担了以下职责:

  • 初始化 ClientConn
  • 初始化(基于进程 LB)负载均衡配置
  • 初始化 channelz
  • 初始化重试规则和客户端一元/流式拦截器
  • 初始化协议栈上的基础信息
  • 相关 context 的超时控制
  • 初始化并解析地址信息
  • 创建与服务端之间的连接

连没连

之前听到有的人说调用 grpc.Dial 后客户端就已经与服务端建立起了连接,但这对不对呢?我们先鸟瞰全貌,看看正在跑的 goroutine。如下:

image

我们可以有几个核心方法一直在等待/处理信号,通过分析底层源码可得知。涉及如下:

func (ac *addrConn) connect()
func (ac *addrConn) resetTransport()
func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time)
func (ac *addrConn) getReadyTransport()

在这里主要分析 goroutine 提示的 resetTransport 方法,看看都做了啥。核心代码如下:

func (ac *addrConn) resetTransport() {
    for i := 0; ; i++ {
        if ac.state == connectivity.Shutdown {
            return
        }
        ...
        connectDeadline := time.Now().Add(dialDuration)
        ac.updateConnectivityState(connectivity.Connecting)
        newTr, addr, reconnect, err := ac.tryAllAddrs(addrs, connectDeadline)
        if err != nil {
            if ac.state == connectivity.Shutdown {
                return
            }
            ac.updateConnectivityState(connectivity.TransientFailure)
            timer := time.NewTimer(backoffFor)
            select {
            case <-timer.C:
                ...
            }
            continue
        }

        if ac.state == connectivity.Shutdown {
            newTr.Close()
            return
        }
        ...
        if !healthcheckManagingState {
            ac.updateConnectivityState(connectivity.Ready)
        }
        ...

        if ac.state == connectivity.Shutdown {
            return
        }
        ac.updateConnectivityState(connectivity.TransientFailure)
    }
}

在该方法中会不断地去尝试创建连接,若成功则结束。否则不断地根据 Backoff 算法的重试机制去尝试创建连接,直到成功为止。从结论上来讲,单纯调用 DialContext 是异步建立连接的,也就是并不是马上生效,处于 Connecting 状态,而正式下要到达 Ready 状态才可用。

真的连了吗

image

在抓包工具上提示一个包都没有,那么这算真正连接了吗?我认为这是一个表述问题,我们应该尽可能的严谨。如果你真的想通过 DialContext 方法就打通与服务端的连接,则需要调用 WithBlock 方法,虽然会导致阻塞等待,但最终连接会到达 Ready 状态(握手成功)。如下图:

image

二、实例化 Service API

type SearchServiceClient interface {
    Search(ctx context.Context, in *SearchRequest, opts ...grpc.CallOption) (*SearchResponse, error)
}

type searchServiceClient struct {
    cc *grpc.ClientConn
}

func NewSearchServiceClient(cc *grpc.ClientConn) SearchServiceClient {
    return &searchServiceClient{cc}
}

这块就是实例 Service API interface,比较简单。

三、调用

// search.pb.go
func (c *searchServiceClient) Search(ctx context.Context, in *SearchRequest, opts ...grpc.CallOption) (*SearchResponse, error) {
    out := new(SearchResponse)
    err := c.cc.Invoke(ctx, "/proto.SearchService/Search", in, out, opts...)
    if err != nil {
        return nil, err
    }
    return out, nil
}

proto 生成的 RPC 方法更像是一个包装盒,把需要的东西放进去,而实际上调用的还是 grpc.invoke 方法。如下:

func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
    cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
    if err != nil {
        return err
    }
    if err := cs.SendMsg(req); err != nil {
        return err
    }
    return cs.RecvMsg(reply)
}

通过概览,可以关注到三块调用。如下:

  • newClientStream:获取传输层 Trasport 并组合封装到 ClientStream 中返回,在这块会涉及负载均衡、超时控制、 Encoding、 Stream 的动作,与服务端基本一致的行为。
  • cs.SendMsg:发送 RPC 请求出去,但其并不承担等待响应的功能。
  • cs.RecvMsg:阻塞等待接受到的 RPC 方法响应结果。

连接

// clientconn.go
func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
    t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickOptions{
        FullMethodName: method,
    })
    if err != nil {
        return nil, nil, toRPCErr(err)
    }
    return t, done, nil
}

newClientStream 方法中,我们通过 getTransport 方法获取了 Transport 层中抽象出来的 ClientTransport 和 ServerTransport,实际上就是获取一个连接给后续 RPC 调用传输使用。

四、关闭连接

// conn.Close()
func (cc *ClientConn) Close() error {
    defer cc.cancel()
    ...
    cc.csMgr.updateState(connectivity.Shutdown)
    ...
    cc.blockingpicker.close()
    if rWrapper != nil {
        rWrapper.close()
    }
    if bWrapper != nil {
        bWrapper.close()
    }

    for ac := range conns {
        ac.tearDown(ErrClientConnClosing)
    }
    if channelz.IsOn() {
        ...
        channelz.AddTraceEvent(cc.channelzID, ted)
        channelz.RemoveEntry(cc.channelzID)
    }
    return nil
}

该方法会取消 ClientConn 上下文,同时关闭所有底层传输。涉及如下:

  • Context Cancel
  • 清空并关闭客户端连接
  • 清空并关闭解析器连接
  • 清空并关闭负载均衡连接
  • 添加跟踪引用
  • 移除当前通道信息

Q&A

1. gRPC Metadata 是通过什么传输?

image

2. 调用 grpc.Dial 会真正的去连接服务端吗?

会,但是是异步连接的,连接状态为正在连接。但如果你设置了 grpc.WithBlock 选项,就会阻塞等待(等待握手成功)。另外你需要注意,当未设置 grpc.WithBlock 时,ctx 超时控制对其无任何效果。

3. 调用 ClientConn 不 Close 会导致泄露吗?

会,除非你的客户端不是常驻进程,那么在应用结束时会被动地回收资源。但如果是常驻进程,你又真的忘记执行 Close 语句,会造成的泄露。如下图:

3.1. 客户端

image

3.2. 服务端

image

3.3. TCP

image

4. 不控制超时调用的话,会出现什么问题?

短时间内不会出现问题,但是会不断积蓄泄露,积蓄到最后当然就是服务无法提供响应了。如下图:

image

5. 为什么默认的拦截器不可以传多个?

func chainUnaryClientInterceptors(cc *ClientConn) {
    interceptors := cc.dopts.chainUnaryInts
    if cc.dopts.unaryInt != nil {
        interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...)
    }
    var chainedInt UnaryClientInterceptor
    if len(interceptors) == 0 {
        chainedInt = nil
    } else if len(interceptors) == 1 {
        chainedInt = interceptors[0]
    } else {
        chainedInt = func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {
            return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...)
        }
    }
    cc.dopts.unaryInt = chainedInt
}

当存在多个拦截器时,取的就是第一个拦截器。因此结论是允许传多个,但并没有用。

6. 真的需要用到多个拦截器的话,怎么办?

可以使用 go-grpc-middleware 提供的 grpc.UnaryInterceptorgrpc.StreamInterceptor 链式方法,方便快捷省心。

单单会用还不行,我们再深剖一下,看看它是怎么实现的。核心代码如下:

func ChainUnaryClient(interceptors ...grpc.UnaryClientInterceptor) grpc.UnaryClientInterceptor {
    n := len(interceptors)
    if n > 1 {
        lastI := n - 1
        return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
            var (
                chainHandler grpc.UnaryInvoker
                curI         int
            )

            chainHandler = func(currentCtx context.Context, currentMethod string, currentReq, currentRepl interface{}, currentConn *grpc.ClientConn, currentOpts ...grpc.CallOption) error {
                if curI == lastI {
                    return invoker(currentCtx, currentMethod, currentReq, currentRepl, currentConn, currentOpts...)
                }
                curI++
                err := interceptors[curI](currentCtx, currentMethod, currentReq, currentRepl, currentConn, chainHandler, currentOpts...)
                curI--
                return err
            }

            return interceptors[0](ctx, method, req, reply, cc, chainHandler, opts...)
        }
    }
    ...
}

当拦截器数量大于 1 时,从 interceptors[1] 开始递归,每一个递归的拦截器 interceptors[i] 会不断地执行,最后才真正的去执行 handler 方法。同时也经常有人会问拦截器的执行顺序是什么,通过这段代码你得出结论了吗?

7. 频繁创建 ClientConn 有什么问题?

这个问题我们可以反向验证一下,假设不公用 ClientConn 看看会怎么样?如下:

func BenchmarkSearch(b *testing.B) {
    for i := 0; i < b.N; i++ {
        conn, err := GetClientConn()
        if err != nil {
            b.Errorf("GetClientConn err: %v", err)
        }
        _, err = Search(context.Background(), conn)
        if err != nil {
            b.Errorf("Search err: %v", err)
        }
    }
}

输出结果:

    ... connection error: desc = "transport: Error while dialing dial tcp :10001: socket: too many open files"
    ... connection error: desc = "transport: Error while dialing dial tcp :10001: socket: too many open files"
    ... connection error: desc = "transport: Error while dialing dial tcp :10001: socket: too many open files"
    ... connection error: desc = "transport: Error while dialing dial tcp :10001: socket: too many open files"
FAIL
exit status 1

当你的应用场景是存在高频次同时生成/调用 ClientConn 时,可能会导致系统的文件句柄占用过多。这种情况下你可以变更应用程序生成/调用 ClientConn 的模式,又或是池化它,这块可以参考 grpc-go-pool 项目。

8. 客户端请求失败后会默认重试吗?

会不断地进行重试,直到上下文取消。而重试时间方面采用 backoff 算法作为的重连机制,默认的最大重试时间间隔是 120s。

9. 为什么要用 HTTP/2 作为传输协议?

许多客户端要通过 HTTP 代理来访问网络,gRPC 全部用 HTTP/2 实现,等到代理开始支持 HTTP/2 就能透明转发 gRPC 的数据。不光如此,负责负载均衡、访问控制等等的反向代理都能无缝兼容 gRPC,比起自己设计 wire protocol 的 Thrift,这样做科学不少。@ctiller @滕亦飞

10. 在 Kubernetes 中 gRPC 负载均衡有问题?

gRPC 的 RPC 协议是基于 HTTP/2 标准实现的,HTTP/2 的一大特性就是不需要像 HTTP/1.1 一样,每次发出请求都要重新建立一个新连接,而是会复用原有的连接。

所以这将导致 kube-proxy 只有在连接建立时才会做负载均衡,而在这之后的每一次 RPC 请求都会利用原本的连接,那么实际上后续的每一次的 RPC 请求都跑到了同一个地方。

注:使用 k8s service 做负载均衡的情况下

总结

  • gRPC 基于 HTTP/2 + Protobuf。
  • gRPC 有四种调用方式,分别是一元、服务端/客户端流式、双向流式。
  • gRPC 的附加信息都会体现在 HEADERS 帧,数据在 DATA 帧上。
  • Client 请求若使用 grpc.Dial 默认是异步建立连接,当时状态为 Connecting。
  • Client 请求若需要同步则调用 WithBlock(),完成状态为 Ready。
  • Server 监听是循环等待连接,若没有则休眠,最大休眠时间 1s;若接收到新请求则起一个新的 goroutine 去处理。
  • grpc.ClientConn 不关闭连接,会导致 goroutine 和 Memory 等泄露。
  • 任何内/外调用如果不加超时控制,会出现泄漏和客户端不断重试。
  • 特定场景下,如果不对 grpc.ClientConn 加以调控,会影响调用。
  • 拦截器如果不用 go-grpc-middleware 链式处理,会覆盖。
  • 在选择 gRPC 的负责均衡模式时,需要谨慎。

参考

查看原文

坐死等吃 收藏了文章 · 2019-04-26

阿里45K高级Java岗,必备技能清单

相信你可能经历过这些:

已经工作两三年了,每个项目都会加班加点全力以赴去完成,薪资增长幅度却不如人意。

听说年后离职的老同事,金三刚拿下高薪offer,年薪直奔50万了。

由于现在的公司接触不到新技术,对自己的市场竞争力无比焦虑,未来职业道路怎么走?

这个凛冽寒冬,你终于明白,现在的工作机会很少,薪资很难有较大涨幅,Java工程师的年薪是技能决定的,是时候去提高技能,提前规划未来职业发展路径了。

作为Java工程师的你,平时工作已经很忙了,到底该如何进阶,才能快速提高薪资到45万甚至100万呢?

分享一些过来人的经验,供大家参考。

一朝成为Java工程师,就注定终身学习。

在James的13年的Java工程师生涯中,这一点深有体会,而让我坚持学习的动力,主要源于以下三个方面:

1、 不断增值自己,拥有更多选择权

如果将技术比做一棵树,每学会一门语言,就点亮树上一个果实,你会得越多,技术树的果实就越多,价值就越大。当技术树的果实积累到一定程度,你就能纵向往高阶技术方向晋级,职业发展之路更上一层楼。

2、 互联网没有舒适圈,止步不前便是退步

计算机技术更新迭代快、新技术层出不穷,如果想要成为一个优秀的Java工程师,就要做好随时学习的准备,并持之以恒,这样才能跟上互联网日新月异的发展节奏。

3、 学习是面对竞争、度过中年危机的最好方法

Java工程师是高薪职业,近年来互联网的高速发展下,更是一度成为抢手的热门职业,以至于投身到这个职业的人越来越多,千军万马过独木桥,想要不掉队,唯有努力学习,成为互联网公司高薪争抢的中高端Java工程师,才能不惧寒冬。

2018年这个寒冬让很多互联网人清醒了,众多大企裁员、招聘需求收缩,主要还是针对基础岗、温水煮青蛙给煮死的那波人,中高端Java工程师在市场上依然紧缺。

下面,James为大家梳理了一份目前互联网公司的主流技术选型。

互联网公司主流技术选型

进阶高级Java、架构师必学6大主要技能,包括:数据结构和算法、Java高级特性、Java web核心、数据库、Java框架与必备工具、系统架构设计等,希望能真正帮助到想要从程序员进阶为高级Java、架构师之路的朋友。

1、并发编程

通过深入了解最底层的运作原理,加强逻辑思维,才能编写出高效、安全的多线程并发程序。

包括:集合框架(源码)、工具类、框架Spring、SpringMVC、Mybatis、Shiro、Netty、服务器(tomcat、Nginx)、网络编程、序列化、JVM等。
图片描述

2、设计模式

设计模式是可复用面向对象软件的基础,学习设计模试是每一位Java工程师进阶的必经之路,灵活地使用设计模式,可以让代码变得简洁、易懂、复用性更高。

常见常用的设计模式有:工厂模式、代理模式等。
图片描述

3、分布式架构

随着业务体量及重要性的增大,单体架构模式无法对应大型应用场景,系统也决不允许存在单点故障导致整体不可用,所以只有垂直或水平拆分业务系统,形成一个分布式的架构来消除单点故障,从而提高整个系统的可用性。

包括:分布式Session、分布式缓存、数据库、一致性、负载均衡、消息队列(RabbitMQ、ZeroMQ、Kafka)等
图片描述

4、微服务

因时因地制宜,选择使用微服务架构的收益将远远大于成本。

规模较少的企业可以考虑适当引入合适的微服务架构,改造已有系统或新建微服务应用,逐步积累微服务架构经验,不要全盘实施微服务架构,综合考量成本与效率、实用性。
图片描述

5、JVM性能优化

图片描述

最后,James想说,职场也好,人生也罢,每个阶段都有一定的阻碍与瓶颈,这是我们都要经历的。

如果你能够认清自己以及自己所处的阶段,有针对性的去思考、充电,坚持做正确的事,付出比别人更多的努力,你就会比别人更加优秀,拥有更多的机会,这就是我们常说的马太效应:越努力,越幸运。

BAT面试真题+架构技术资料,提升软硬实力,收获高薪好offer

说了这么多,到底该如何学习呢,有没有资料或视频呀?

James潜心两年,专注Java面试通关、进阶架构师,不仅提供“知识”,更关注和强调从“知识获得”到“实战能力提升”的转化过程。坚持创作的BAT架构技术系列专题500+、面试资料库1000+,需要的可以私信:“架构”免费获取,供大家参考进阶。

查看原文

坐死等吃 评论了文章 · 2019-04-24

Activiti6通过监听修改实体id、springboot集成配置

1.前言

本文内容主要为以下两点,因为内容有交叉,所以会放在一起介绍。

  • 1.以自由跳转为基础实现不改变原先任务id的驳回
    关于Activiti6动态跳转可以查看我的另一篇文章Activiti6实现自由跳转
  • 2.java类方式进行Activiti6配置、spring boot集成
    因为有一些自定义的需求,如流程字体、自动部署、自定义监听器等,直接引入[activit-spring-boot]又没有必要,所以参考activit6源码中[activit-spring-boot]模块的代码完成。

2.实现介绍

关于自由跳转的内容我就不再多说,主要介绍如何修改Activiti生成的实体的id,以达到驳回时重新生成的任务id与原先的任务id一致。(某些业务场景下可能会用到,例如某流程中A环节提交的表单与task id绑定,当环节提交又被驳回时,为保证表单内容与任务关系不变,驳回后的任务id与原先任务id要一致)

2.1前提知识

  • 1.Activiti持久化实体的过程时先创建实体对象,记录到缓存中,在完成执行后统一进行缓存对象的持久化,并清空缓存。
  • 2.Activiti采用命令模式执行操作,所有操作都时一个CMD。执行一个CMD的时候会创建一个上下文环境,包含待持久化的实体缓存等,如果在CMD中嵌套执行CMD,新的CMD默认会使用上级上下文环境。当一个根级的CMD结束时,Activiti就会进行上述的缓存对象统一的持久化。
  • 3.Activiti有丰富的事件类型(具体可以查看事件枚举类ActivitiEventType)供我们实现相应监听器,进行特殊业务处理。例如ENTITY_CREATED——实体创建完成(task、activity、Execution等所有实体)、TASK_CREATED——任务创建完成(针对task)、TASK_COMPLETED——任务完成等等。

2.2关于修改任务id

结合上述内容我们就可以知道,只要在TASK_CREATED进行监听,直接在监听器中将id改为需要的值即可。理论上是这样,但是需要注意,Activiti6中历史任务实体创建是在TASK_CREATED之前的,如果你在TASK_CREATED中修改任务id,实际上历史任务实体创建时是获取不到的,这样就会导致历史任务的id与运行时任务id不一致。解决的办法也很简单,改为监听ENTITY_CREATED,判断是否时需要修改id的任务实体即可。

实现代码

properties配置文件

# 是否更新数据库表
spring.activiti.databaseSchemaUpdate=true
# 是否激活异步执行器
spring.activiti.asyncExecutorActivate=false
# 流程历史记录登录
spring.activiti.historyLevel=audit
# 是否检查更新流程定义
spring.activiti.checkProcessDefinitions=false
# 流程定义所在前缀
spring.activiti.processDefinitionLocationPrefix=classpath*:/procDef/
# 流程定义后缀
spring.activiti.processDefinitionLocationSuffixes=**.bpmn
# 部署流程定义时是否生成图片
spring.activiti.createDiagramOnDeploy=false
# 字体 下面内容为转成unicode的'宋体'
spring.activiti.activityFontName=\u5b8b\u4f53
spring.activiti.labelFontName=\u5b8b\u4f53

解析Properties类

@ConfigurationProperties("spring.activiti")
public class ActivitiProperties {
    private boolean checkProcessDefinitions = true;
    private boolean asyncExecutorActivate = true;
    private boolean restApiEnabled;
    private String deploymentName;
    private String mailServerHost = "localhost";
    private int mailServerPort = 1025;
    private String mailServerUserName;
    private String mailServerPassword;
    private String mailServerDefaultFrom;
    private boolean mailServerUseSsl;
    private boolean mailServerUseTls;
    private String databaseSchemaUpdate = "true";
    private String databaseSchema;
    private boolean isDbIdentityUsed = true;
    private boolean isDbHistoryUsed = true;
    private HistoryLevel historyLevel = HistoryLevel.AUDIT;
    private String processDefinitionLocationPrefix = "classpath:/processes/";
    private List<String> processDefinitionLocationSuffixes = Arrays.asList("**.bpmn20.xml", "**.bpmn");
    private String restApiMapping = "/api/*";
    private String restApiServletName = "activitiRestApi";
    private boolean jpaEnabled = true; // true by default
    private List<String> customMybatisMappers;
    private List<String> customMybatisXMLMappers;

    private boolean createDiagramOnDeploy;
    private String activityFontName;
    private String labelFontName;
    //省略getter、setter
}

spring boot配置类

@Configuration
@EnableConfigurationProperties(ActivitiProperties.class)
public class ActivitiConfig {
    private static final Logger logger = LoggerFactory.getLogger(ActivitiConfig.class);
    @Autowired
    private ActivitiProperties activitiProperties;
    @Autowired
    private DataSource dataSource;
    @Autowired
    private PlatformTransactionManager transactionManager;
    @Autowired
    private TaskCreatedListener taskCreatedListener;
    @Autowired
    private TaskCompletedListener taskCompletedListener;
    @Autowired
    private EntityCreatedListener entityCreatedListener;
    @Autowired
    private ResourcePatternResolver resourceLoader;

    @Bean
    public SpringProcessEngineConfiguration processEngineConfiguration() throws IOException {
        SpringProcessEngineConfiguration configuration = new SpringProcessEngineConfiguration();
        configuration.setDataSource(dataSource);
        configuration.setTransactionManager(transactionManager);
        configuration.setDatabaseSchemaUpdate(activitiProperties.getDatabaseSchemaUpdate());
        configuration.setAsyncExecutorActivate(activitiProperties.isAsyncExecutorActivate());
        configuration.setHistory(activitiProperties.getHistoryLevel().getKey());
        configuration.setCreateDiagramOnDeploy(activitiProperties.isCreateDiagramOnDeploy());
        configuration.setActivityFontName(activitiProperties.getActivityFontName());
        configuration.setLabelFontName(activitiProperties.getLabelFontName());
        //todo 修改自动部署,当前自动部署直接搬自[activit-spring-boot]
        //如果checkProcessDefinitions为true,则发布新版流程定义,后续可能根据流程定义文件MD5等判断是否真正变化而进行发布
        List<Resource> procDefResources = discoverProcessDefinitionResources(activitiProperties.getProcessDefinitionLocationPrefix(),            activitiProperties.getProcessDefinitionLocationSuffixes(),this.activitiProperties.isCheckProcessDefinitions());
        configuration.setDeploymentResources(procDefResources.toArray(new Resource[procDefResources.size()]));
        Map<String, List<ActivitiEventListener>> typedListeners = new HashMap<>();
        typedListeners.put("ENTITY_CREATED", Collections.singletonList(entityCreatedListener));
        typedListeners.put("TASK_CREATED", Collections.singletonList(taskCreatedListener));
        typedListeners.put("TASK_COMPLETED", Collections.singletonList(taskCompletedListener));
        configuration.setTypedEventListeners(typedListeners);
        return configuration;
    }
    private List<Resource> discoverProcessDefinitionResources(String prefix, List<String> suffixes, boolean checkPDs) throws IOException {
        if (checkPDs) {
            List<Resource> result = new ArrayList<>();
            for (String suffix : suffixes) {
                String path = prefix + suffix;
                Resource[] resources = resourceLoader.getResources(path);
                if (resources != null && resources.length > 0) {
                    CollectionUtils.mergeArrayIntoCollection(resources, result);
                }
            }
            if (result.isEmpty()) {
                logger.info("No process definitions were found for autodeployment");
            }
            return result;
        }
        return new ArrayList<>();
    }
    @Bean
    public ProcessEngineFactoryBean processEngine() throws IOException {
        ProcessEngineFactoryBean factoryBean = new ProcessEngineFactoryBean();
        factoryBean.setProcessEngineConfiguration(processEngineConfiguration());
        return factoryBean;
    }
    @Bean
    public RuntimeService runtimeService(ProcessEngine processEngine) {
        return processEngine.getRuntimeService();
    }
    @Bean
    public RepositoryService repositoryService(ProcessEngine processEngine) {
        return processEngine.getRepositoryService();
    }
    @Bean
    public TaskService taskService(ProcessEngine processEngine) {
        return processEngine.getTaskService();
    }
    @Bean
    public HistoryService historyService(ProcessEngine processEngine) {
        return processEngine.getHistoryService();
    }
    @Bean
    public ManagementService managementService(ProcessEngine processEngine) {
        return processEngine.getManagementService();
    }
    @Bean
    public IdentityService identityService(ProcessEngine processEngine) {
        return processEngine.getIdentityService();
    }

实体创建完成监听器

@Component
public class EntityCreatedListener implements ActivitiEventListener {
    public void onEvent(ActivitiEvent event){
        Object entity = ((ActivitiEntityEvent)event).getEntity();
        if(entity instanceof TaskEntity){
            TaskEntity taskEntity = (TaskEntity)entity;
            // 这个要改变的id值,可以在上篇文章中的SetFLowNodeAndGoCmd中设置相应流程变量即可。
            String changeTaskId = (String)taskEntity.getVariable("changeTaskIdVarKey");
            if(!StringUtils.isEmpty(changeTaskId)){
                taskEntity.setId(changeTaskId);
                taskEntity.setVariable("changeTaskIdKey","");
            }
        }
    }
    public boolean isFailOnException(){
        return true;
    }
}

2.3关于如何获取当前任务的来源任务,以进行驳回

我们知道Activiti中有TASK_CREATED和TASK_COMPLETED事件,在同一个流程实例中,一个任务A如果不是最后的结束任务,那么在它完成后,必定会有一个新的任务B创建,而我们简单理解为A为B的来源任务。(假设A是申请任务,B就时审批任务,B的处理人对当前审批不同意要驳回时,流程就要回退到任务A。)
这样一来,我们可以监听TASK_COMPLETED,在此时为流程设置一个变量fromTaskId,值为任务A的id,当任务A的TASK_COMPLETED结束后,就来到的了任务B的TASK_CREATED中,我们此时从流程变量中获取fromTaskId,并将次id作为任务B的来源id持久化到一张自己创建的任务关系表中。这样后面要进行驳回时,只要通过这样关系表,马上就可以定位到要驳回到的任务id了。

实现代码

任务完成监听器

// 关于监听器的注册看上面配置类中typedListeners部分已有
@Component
public class TaskCompletedListener implements ActivitiEventListener {
    public void onEvent(ActivitiEvent event){
        TaskEntity taskEntity = (TaskEntity)((ActivitiEntityEvent)event).getEntity();
        taskEntity.setVariable("fromTaskIdVarKey", taskEntity.getId());
    }

    public boolean isFailOnException(){
        return true;
    }
}

任务创建完成监听器

@Component
public class TaskCreatedListener implements ActivitiEventListener {
    public void onEvent(ActivitiEvent event){
        TaskEntity taskEntity = (TaskEntity)((ActivitiEntityEvent)event).getEntity();
        String fromTaskId = (String)taskEntity.getVariable(WfVarKeyConstants.fromTaskId);
        if(StringUtils.isEmpty(fromTaskId)) return;
        xxxTaskInfo info = new xxxTaskInfo();
        info.setId(taskEntity.getId());
        info.setFromId(fromTaskId);
        //此处进行任务关系持久化,自行实现
        xxxTaskInfoRepository.save(info);
    }
    public boolean isFailOnException(){
        return true;
    }
}

3.最后

本来打算做一个Activiti小贴士列表,不过看篇幅已经很长了,小贴士好像也凑不齐一篇文章,而且还没人看:)
那就放到下次来说
todo
1.Activiti命令执行模式
2.持久化过程与会话缓存(CRUD)
3.BPMN流程执行计划

查看原文

坐死等吃 评论了文章 · 2019-04-24

Activiti6通过监听修改实体id、springboot集成配置

1.前言

本文内容主要为以下两点,因为内容有交叉,所以会放在一起介绍。

  • 1.以自由跳转为基础实现不改变原先任务id的驳回
    关于Activiti6动态跳转可以查看我的另一篇文章Activiti6实现自由跳转
  • 2.java类方式进行Activiti6配置、spring boot集成
    因为有一些自定义的需求,如流程字体、自动部署、自定义监听器等,直接引入[activit-spring-boot]又没有必要,所以参考activit6源码中[activit-spring-boot]模块的代码完成。

2.实现介绍

关于自由跳转的内容我就不再多说,主要介绍如何修改Activiti生成的实体的id,以达到驳回时重新生成的任务id与原先的任务id一致。(某些业务场景下可能会用到,例如某流程中A环节提交的表单与task id绑定,当环节提交又被驳回时,为保证表单内容与任务关系不变,驳回后的任务id与原先任务id要一致)

2.1前提知识

  • 1.Activiti持久化实体的过程时先创建实体对象,记录到缓存中,在完成执行后统一进行缓存对象的持久化,并清空缓存。
  • 2.Activiti采用命令模式执行操作,所有操作都时一个CMD。执行一个CMD的时候会创建一个上下文环境,包含待持久化的实体缓存等,如果在CMD中嵌套执行CMD,新的CMD默认会使用上级上下文环境。当一个根级的CMD结束时,Activiti就会进行上述的缓存对象统一的持久化。
  • 3.Activiti有丰富的事件类型(具体可以查看事件枚举类ActivitiEventType)供我们实现相应监听器,进行特殊业务处理。例如ENTITY_CREATED——实体创建完成(task、activity、Execution等所有实体)、TASK_CREATED——任务创建完成(针对task)、TASK_COMPLETED——任务完成等等。

2.2关于修改任务id

结合上述内容我们就可以知道,只要在TASK_CREATED进行监听,直接在监听器中将id改为需要的值即可。理论上是这样,但是需要注意,Activiti6中历史任务实体创建是在TASK_CREATED之前的,如果你在TASK_CREATED中修改任务id,实际上历史任务实体创建时是获取不到的,这样就会导致历史任务的id与运行时任务id不一致。解决的办法也很简单,改为监听ENTITY_CREATED,判断是否时需要修改id的任务实体即可。

实现代码

properties配置文件

# 是否更新数据库表
spring.activiti.databaseSchemaUpdate=true
# 是否激活异步执行器
spring.activiti.asyncExecutorActivate=false
# 流程历史记录登录
spring.activiti.historyLevel=audit
# 是否检查更新流程定义
spring.activiti.checkProcessDefinitions=false
# 流程定义所在前缀
spring.activiti.processDefinitionLocationPrefix=classpath*:/procDef/
# 流程定义后缀
spring.activiti.processDefinitionLocationSuffixes=**.bpmn
# 部署流程定义时是否生成图片
spring.activiti.createDiagramOnDeploy=false
# 字体 下面内容为转成unicode的'宋体'
spring.activiti.activityFontName=\u5b8b\u4f53
spring.activiti.labelFontName=\u5b8b\u4f53

解析Properties类

@ConfigurationProperties("spring.activiti")
public class ActivitiProperties {
    private boolean checkProcessDefinitions = true;
    private boolean asyncExecutorActivate = true;
    private boolean restApiEnabled;
    private String deploymentName;
    private String mailServerHost = "localhost";
    private int mailServerPort = 1025;
    private String mailServerUserName;
    private String mailServerPassword;
    private String mailServerDefaultFrom;
    private boolean mailServerUseSsl;
    private boolean mailServerUseTls;
    private String databaseSchemaUpdate = "true";
    private String databaseSchema;
    private boolean isDbIdentityUsed = true;
    private boolean isDbHistoryUsed = true;
    private HistoryLevel historyLevel = HistoryLevel.AUDIT;
    private String processDefinitionLocationPrefix = "classpath:/processes/";
    private List<String> processDefinitionLocationSuffixes = Arrays.asList("**.bpmn20.xml", "**.bpmn");
    private String restApiMapping = "/api/*";
    private String restApiServletName = "activitiRestApi";
    private boolean jpaEnabled = true; // true by default
    private List<String> customMybatisMappers;
    private List<String> customMybatisXMLMappers;

    private boolean createDiagramOnDeploy;
    private String activityFontName;
    private String labelFontName;
    //省略getter、setter
}

spring boot配置类

@Configuration
@EnableConfigurationProperties(ActivitiProperties.class)
public class ActivitiConfig {
    private static final Logger logger = LoggerFactory.getLogger(ActivitiConfig.class);
    @Autowired
    private ActivitiProperties activitiProperties;
    @Autowired
    private DataSource dataSource;
    @Autowired
    private PlatformTransactionManager transactionManager;
    @Autowired
    private TaskCreatedListener taskCreatedListener;
    @Autowired
    private TaskCompletedListener taskCompletedListener;
    @Autowired
    private EntityCreatedListener entityCreatedListener;
    @Autowired
    private ResourcePatternResolver resourceLoader;

    @Bean
    public SpringProcessEngineConfiguration processEngineConfiguration() throws IOException {
        SpringProcessEngineConfiguration configuration = new SpringProcessEngineConfiguration();
        configuration.setDataSource(dataSource);
        configuration.setTransactionManager(transactionManager);
        configuration.setDatabaseSchemaUpdate(activitiProperties.getDatabaseSchemaUpdate());
        configuration.setAsyncExecutorActivate(activitiProperties.isAsyncExecutorActivate());
        configuration.setHistory(activitiProperties.getHistoryLevel().getKey());
        configuration.setCreateDiagramOnDeploy(activitiProperties.isCreateDiagramOnDeploy());
        configuration.setActivityFontName(activitiProperties.getActivityFontName());
        configuration.setLabelFontName(activitiProperties.getLabelFontName());
        //todo 修改自动部署,当前自动部署直接搬自[activit-spring-boot]
        //如果checkProcessDefinitions为true,则发布新版流程定义,后续可能根据流程定义文件MD5等判断是否真正变化而进行发布
        List<Resource> procDefResources = discoverProcessDefinitionResources(activitiProperties.getProcessDefinitionLocationPrefix(),            activitiProperties.getProcessDefinitionLocationSuffixes(),this.activitiProperties.isCheckProcessDefinitions());
        configuration.setDeploymentResources(procDefResources.toArray(new Resource[procDefResources.size()]));
        Map<String, List<ActivitiEventListener>> typedListeners = new HashMap<>();
        typedListeners.put("ENTITY_CREATED", Collections.singletonList(entityCreatedListener));
        typedListeners.put("TASK_CREATED", Collections.singletonList(taskCreatedListener));
        typedListeners.put("TASK_COMPLETED", Collections.singletonList(taskCompletedListener));
        configuration.setTypedEventListeners(typedListeners);
        return configuration;
    }
    private List<Resource> discoverProcessDefinitionResources(String prefix, List<String> suffixes, boolean checkPDs) throws IOException {
        if (checkPDs) {
            List<Resource> result = new ArrayList<>();
            for (String suffix : suffixes) {
                String path = prefix + suffix;
                Resource[] resources = resourceLoader.getResources(path);
                if (resources != null && resources.length > 0) {
                    CollectionUtils.mergeArrayIntoCollection(resources, result);
                }
            }
            if (result.isEmpty()) {
                logger.info("No process definitions were found for autodeployment");
            }
            return result;
        }
        return new ArrayList<>();
    }
    @Bean
    public ProcessEngineFactoryBean processEngine() throws IOException {
        ProcessEngineFactoryBean factoryBean = new ProcessEngineFactoryBean();
        factoryBean.setProcessEngineConfiguration(processEngineConfiguration());
        return factoryBean;
    }
    @Bean
    public RuntimeService runtimeService(ProcessEngine processEngine) {
        return processEngine.getRuntimeService();
    }
    @Bean
    public RepositoryService repositoryService(ProcessEngine processEngine) {
        return processEngine.getRepositoryService();
    }
    @Bean
    public TaskService taskService(ProcessEngine processEngine) {
        return processEngine.getTaskService();
    }
    @Bean
    public HistoryService historyService(ProcessEngine processEngine) {
        return processEngine.getHistoryService();
    }
    @Bean
    public ManagementService managementService(ProcessEngine processEngine) {
        return processEngine.getManagementService();
    }
    @Bean
    public IdentityService identityService(ProcessEngine processEngine) {
        return processEngine.getIdentityService();
    }

实体创建完成监听器

@Component
public class EntityCreatedListener implements ActivitiEventListener {
    public void onEvent(ActivitiEvent event){
        Object entity = ((ActivitiEntityEvent)event).getEntity();
        if(entity instanceof TaskEntity){
            TaskEntity taskEntity = (TaskEntity)entity;
            // 这个要改变的id值,可以在上篇文章中的SetFLowNodeAndGoCmd中设置相应流程变量即可。
            String changeTaskId = (String)taskEntity.getVariable("changeTaskIdVarKey");
            if(!StringUtils.isEmpty(changeTaskId)){
                taskEntity.setId(changeTaskId);
                taskEntity.setVariable("changeTaskIdKey","");
            }
        }
    }
    public boolean isFailOnException(){
        return true;
    }
}

2.3关于如何获取当前任务的来源任务,以进行驳回

我们知道Activiti中有TASK_CREATED和TASK_COMPLETED事件,在同一个流程实例中,一个任务A如果不是最后的结束任务,那么在它完成后,必定会有一个新的任务B创建,而我们简单理解为A为B的来源任务。(假设A是申请任务,B就时审批任务,B的处理人对当前审批不同意要驳回时,流程就要回退到任务A。)
这样一来,我们可以监听TASK_COMPLETED,在此时为流程设置一个变量fromTaskId,值为任务A的id,当任务A的TASK_COMPLETED结束后,就来到的了任务B的TASK_CREATED中,我们此时从流程变量中获取fromTaskId,并将次id作为任务B的来源id持久化到一张自己创建的任务关系表中。这样后面要进行驳回时,只要通过这样关系表,马上就可以定位到要驳回到的任务id了。

实现代码

任务完成监听器

// 关于监听器的注册看上面配置类中typedListeners部分已有
@Component
public class TaskCompletedListener implements ActivitiEventListener {
    public void onEvent(ActivitiEvent event){
        TaskEntity taskEntity = (TaskEntity)((ActivitiEntityEvent)event).getEntity();
        taskEntity.setVariable("fromTaskIdVarKey", taskEntity.getId());
    }

    public boolean isFailOnException(){
        return true;
    }
}

任务创建完成监听器

@Component
public class TaskCreatedListener implements ActivitiEventListener {
    public void onEvent(ActivitiEvent event){
        TaskEntity taskEntity = (TaskEntity)((ActivitiEntityEvent)event).getEntity();
        String fromTaskId = (String)taskEntity.getVariable(WfVarKeyConstants.fromTaskId);
        if(StringUtils.isEmpty(fromTaskId)) return;
        xxxTaskInfo info = new xxxTaskInfo();
        info.setId(taskEntity.getId());
        info.setFromId(fromTaskId);
        //此处进行任务关系持久化,自行实现
        xxxTaskInfoRepository.save(info);
    }
    public boolean isFailOnException(){
        return true;
    }
}

3.最后

本来打算做一个Activiti小贴士列表,不过看篇幅已经很长了,小贴士好像也凑不齐一篇文章,而且还没人看:)
那就放到下次来说
todo
1.Activiti命令执行模式
2.持久化过程与会话缓存(CRUD)
3.BPMN流程执行计划

查看原文

坐死等吃 评论了文章 · 2019-04-24

Activiti6通过监听修改实体id、springboot集成配置

1.前言

本文内容主要为以下两点,因为内容有交叉,所以会放在一起介绍。

  • 1.以自由跳转为基础实现不改变原先任务id的驳回
    关于Activiti6动态跳转可以查看我的另一篇文章Activiti6实现自由跳转
  • 2.java类方式进行Activiti6配置、spring boot集成
    因为有一些自定义的需求,如流程字体、自动部署、自定义监听器等,直接引入[activit-spring-boot]又没有必要,所以参考activit6源码中[activit-spring-boot]模块的代码完成。

2.实现介绍

关于自由跳转的内容我就不再多说,主要介绍如何修改Activiti生成的实体的id,以达到驳回时重新生成的任务id与原先的任务id一致。(某些业务场景下可能会用到,例如某流程中A环节提交的表单与task id绑定,当环节提交又被驳回时,为保证表单内容与任务关系不变,驳回后的任务id与原先任务id要一致)

2.1前提知识

  • 1.Activiti持久化实体的过程时先创建实体对象,记录到缓存中,在完成执行后统一进行缓存对象的持久化,并清空缓存。
  • 2.Activiti采用命令模式执行操作,所有操作都时一个CMD。执行一个CMD的时候会创建一个上下文环境,包含待持久化的实体缓存等,如果在CMD中嵌套执行CMD,新的CMD默认会使用上级上下文环境。当一个根级的CMD结束时,Activiti就会进行上述的缓存对象统一的持久化。
  • 3.Activiti有丰富的事件类型(具体可以查看事件枚举类ActivitiEventType)供我们实现相应监听器,进行特殊业务处理。例如ENTITY_CREATED——实体创建完成(task、activity、Execution等所有实体)、TASK_CREATED——任务创建完成(针对task)、TASK_COMPLETED——任务完成等等。

2.2关于修改任务id

结合上述内容我们就可以知道,只要在TASK_CREATED进行监听,直接在监听器中将id改为需要的值即可。理论上是这样,但是需要注意,Activiti6中历史任务实体创建是在TASK_CREATED之前的,如果你在TASK_CREATED中修改任务id,实际上历史任务实体创建时是获取不到的,这样就会导致历史任务的id与运行时任务id不一致。解决的办法也很简单,改为监听ENTITY_CREATED,判断是否时需要修改id的任务实体即可。

实现代码

properties配置文件

# 是否更新数据库表
spring.activiti.databaseSchemaUpdate=true
# 是否激活异步执行器
spring.activiti.asyncExecutorActivate=false
# 流程历史记录登录
spring.activiti.historyLevel=audit
# 是否检查更新流程定义
spring.activiti.checkProcessDefinitions=false
# 流程定义所在前缀
spring.activiti.processDefinitionLocationPrefix=classpath*:/procDef/
# 流程定义后缀
spring.activiti.processDefinitionLocationSuffixes=**.bpmn
# 部署流程定义时是否生成图片
spring.activiti.createDiagramOnDeploy=false
# 字体 下面内容为转成unicode的'宋体'
spring.activiti.activityFontName=\u5b8b\u4f53
spring.activiti.labelFontName=\u5b8b\u4f53

解析Properties类

@ConfigurationProperties("spring.activiti")
public class ActivitiProperties {
    private boolean checkProcessDefinitions = true;
    private boolean asyncExecutorActivate = true;
    private boolean restApiEnabled;
    private String deploymentName;
    private String mailServerHost = "localhost";
    private int mailServerPort = 1025;
    private String mailServerUserName;
    private String mailServerPassword;
    private String mailServerDefaultFrom;
    private boolean mailServerUseSsl;
    private boolean mailServerUseTls;
    private String databaseSchemaUpdate = "true";
    private String databaseSchema;
    private boolean isDbIdentityUsed = true;
    private boolean isDbHistoryUsed = true;
    private HistoryLevel historyLevel = HistoryLevel.AUDIT;
    private String processDefinitionLocationPrefix = "classpath:/processes/";
    private List<String> processDefinitionLocationSuffixes = Arrays.asList("**.bpmn20.xml", "**.bpmn");
    private String restApiMapping = "/api/*";
    private String restApiServletName = "activitiRestApi";
    private boolean jpaEnabled = true; // true by default
    private List<String> customMybatisMappers;
    private List<String> customMybatisXMLMappers;

    private boolean createDiagramOnDeploy;
    private String activityFontName;
    private String labelFontName;
    //省略getter、setter
}

spring boot配置类

@Configuration
@EnableConfigurationProperties(ActivitiProperties.class)
public class ActivitiConfig {
    private static final Logger logger = LoggerFactory.getLogger(ActivitiConfig.class);
    @Autowired
    private ActivitiProperties activitiProperties;
    @Autowired
    private DataSource dataSource;
    @Autowired
    private PlatformTransactionManager transactionManager;
    @Autowired
    private TaskCreatedListener taskCreatedListener;
    @Autowired
    private TaskCompletedListener taskCompletedListener;
    @Autowired
    private EntityCreatedListener entityCreatedListener;
    @Autowired
    private ResourcePatternResolver resourceLoader;

    @Bean
    public SpringProcessEngineConfiguration processEngineConfiguration() throws IOException {
        SpringProcessEngineConfiguration configuration = new SpringProcessEngineConfiguration();
        configuration.setDataSource(dataSource);
        configuration.setTransactionManager(transactionManager);
        configuration.setDatabaseSchemaUpdate(activitiProperties.getDatabaseSchemaUpdate());
        configuration.setAsyncExecutorActivate(activitiProperties.isAsyncExecutorActivate());
        configuration.setHistory(activitiProperties.getHistoryLevel().getKey());
        configuration.setCreateDiagramOnDeploy(activitiProperties.isCreateDiagramOnDeploy());
        configuration.setActivityFontName(activitiProperties.getActivityFontName());
        configuration.setLabelFontName(activitiProperties.getLabelFontName());
        //todo 修改自动部署,当前自动部署直接搬自[activit-spring-boot]
        //如果checkProcessDefinitions为true,则发布新版流程定义,后续可能根据流程定义文件MD5等判断是否真正变化而进行发布
        List<Resource> procDefResources = discoverProcessDefinitionResources(activitiProperties.getProcessDefinitionLocationPrefix(),            activitiProperties.getProcessDefinitionLocationSuffixes(),this.activitiProperties.isCheckProcessDefinitions());
        configuration.setDeploymentResources(procDefResources.toArray(new Resource[procDefResources.size()]));
        Map<String, List<ActivitiEventListener>> typedListeners = new HashMap<>();
        typedListeners.put("ENTITY_CREATED", Collections.singletonList(entityCreatedListener));
        typedListeners.put("TASK_CREATED", Collections.singletonList(taskCreatedListener));
        typedListeners.put("TASK_COMPLETED", Collections.singletonList(taskCompletedListener));
        configuration.setTypedEventListeners(typedListeners);
        return configuration;
    }
    private List<Resource> discoverProcessDefinitionResources(String prefix, List<String> suffixes, boolean checkPDs) throws IOException {
        if (checkPDs) {
            List<Resource> result = new ArrayList<>();
            for (String suffix : suffixes) {
                String path = prefix + suffix;
                Resource[] resources = resourceLoader.getResources(path);
                if (resources != null && resources.length > 0) {
                    CollectionUtils.mergeArrayIntoCollection(resources, result);
                }
            }
            if (result.isEmpty()) {
                logger.info("No process definitions were found for autodeployment");
            }
            return result;
        }
        return new ArrayList<>();
    }
    @Bean
    public ProcessEngineFactoryBean processEngine() throws IOException {
        ProcessEngineFactoryBean factoryBean = new ProcessEngineFactoryBean();
        factoryBean.setProcessEngineConfiguration(processEngineConfiguration());
        return factoryBean;
    }
    @Bean
    public RuntimeService runtimeService(ProcessEngine processEngine) {
        return processEngine.getRuntimeService();
    }
    @Bean
    public RepositoryService repositoryService(ProcessEngine processEngine) {
        return processEngine.getRepositoryService();
    }
    @Bean
    public TaskService taskService(ProcessEngine processEngine) {
        return processEngine.getTaskService();
    }
    @Bean
    public HistoryService historyService(ProcessEngine processEngine) {
        return processEngine.getHistoryService();
    }
    @Bean
    public ManagementService managementService(ProcessEngine processEngine) {
        return processEngine.getManagementService();
    }
    @Bean
    public IdentityService identityService(ProcessEngine processEngine) {
        return processEngine.getIdentityService();
    }

实体创建完成监听器

@Component
public class EntityCreatedListener implements ActivitiEventListener {
    public void onEvent(ActivitiEvent event){
        Object entity = ((ActivitiEntityEvent)event).getEntity();
        if(entity instanceof TaskEntity){
            TaskEntity taskEntity = (TaskEntity)entity;
            // 这个要改变的id值,可以在上篇文章中的SetFLowNodeAndGoCmd中设置相应流程变量即可。
            String changeTaskId = (String)taskEntity.getVariable("changeTaskIdVarKey");
            if(!StringUtils.isEmpty(changeTaskId)){
                taskEntity.setId(changeTaskId);
                taskEntity.setVariable("changeTaskIdKey","");
            }
        }
    }
    public boolean isFailOnException(){
        return true;
    }
}

2.3关于如何获取当前任务的来源任务,以进行驳回

我们知道Activiti中有TASK_CREATED和TASK_COMPLETED事件,在同一个流程实例中,一个任务A如果不是最后的结束任务,那么在它完成后,必定会有一个新的任务B创建,而我们简单理解为A为B的来源任务。(假设A是申请任务,B就时审批任务,B的处理人对当前审批不同意要驳回时,流程就要回退到任务A。)
这样一来,我们可以监听TASK_COMPLETED,在此时为流程设置一个变量fromTaskId,值为任务A的id,当任务A的TASK_COMPLETED结束后,就来到的了任务B的TASK_CREATED中,我们此时从流程变量中获取fromTaskId,并将次id作为任务B的来源id持久化到一张自己创建的任务关系表中。这样后面要进行驳回时,只要通过这样关系表,马上就可以定位到要驳回到的任务id了。

实现代码

任务完成监听器

// 关于监听器的注册看上面配置类中typedListeners部分已有
@Component
public class TaskCompletedListener implements ActivitiEventListener {
    public void onEvent(ActivitiEvent event){
        TaskEntity taskEntity = (TaskEntity)((ActivitiEntityEvent)event).getEntity();
        taskEntity.setVariable("fromTaskIdVarKey", taskEntity.getId());
    }

    public boolean isFailOnException(){
        return true;
    }
}

任务创建完成监听器

@Component
public class TaskCreatedListener implements ActivitiEventListener {
    public void onEvent(ActivitiEvent event){
        TaskEntity taskEntity = (TaskEntity)((ActivitiEntityEvent)event).getEntity();
        String fromTaskId = (String)taskEntity.getVariable(WfVarKeyConstants.fromTaskId);
        if(StringUtils.isEmpty(fromTaskId)) return;
        xxxTaskInfo info = new xxxTaskInfo();
        info.setId(taskEntity.getId());
        info.setFromId(fromTaskId);
        //此处进行任务关系持久化,自行实现
        xxxTaskInfoRepository.save(info);
    }
    public boolean isFailOnException(){
        return true;
    }
}

3.最后

本来打算做一个Activiti小贴士列表,不过看篇幅已经很长了,小贴士好像也凑不齐一篇文章,而且还没人看:)
那就放到下次来说
todo
1.Activiti命令执行模式
2.持久化过程与会话缓存(CRUD)
3.BPMN流程执行计划

查看原文

坐死等吃 评论了文章 · 2019-04-23

Activiti6通过监听修改实体id、springboot集成配置

1.前言

本文内容主要为以下两点,因为内容有交叉,所以会放在一起介绍。

  • 1.以自由跳转为基础实现不改变原先任务id的驳回
    关于Activiti6动态跳转可以查看我的另一篇文章Activiti6实现自由跳转
  • 2.java类方式进行Activiti6配置、spring boot集成
    因为有一些自定义的需求,如流程字体、自动部署、自定义监听器等,直接引入[activit-spring-boot]又没有必要,所以参考activit6源码中[activit-spring-boot]模块的代码完成。

2.实现介绍

关于自由跳转的内容我就不再多说,主要介绍如何修改Activiti生成的实体的id,以达到驳回时重新生成的任务id与原先的任务id一致。(某些业务场景下可能会用到,例如某流程中A环节提交的表单与task id绑定,当环节提交又被驳回时,为保证表单内容与任务关系不变,驳回后的任务id与原先任务id要一致)

2.1前提知识

  • 1.Activiti持久化实体的过程时先创建实体对象,记录到缓存中,在完成执行后统一进行缓存对象的持久化,并清空缓存。
  • 2.Activiti采用命令模式执行操作,所有操作都时一个CMD。执行一个CMD的时候会创建一个上下文环境,包含待持久化的实体缓存等,如果在CMD中嵌套执行CMD,新的CMD默认会使用上级上下文环境。当一个根级的CMD结束时,Activiti就会进行上述的缓存对象统一的持久化。
  • 3.Activiti有丰富的事件类型(具体可以查看事件枚举类ActivitiEventType)供我们实现相应监听器,进行特殊业务处理。例如ENTITY_CREATED——实体创建完成(task、activity、Execution等所有实体)、TASK_CREATED——任务创建完成(针对task)、TASK_COMPLETED——任务完成等等。

2.2关于修改任务id

结合上述内容我们就可以知道,只要在TASK_CREATED进行监听,直接在监听器中将id改为需要的值即可。理论上是这样,但是需要注意,Activiti6中历史任务实体创建是在TASK_CREATED之前的,如果你在TASK_CREATED中修改任务id,实际上历史任务实体创建时是获取不到的,这样就会导致历史任务的id与运行时任务id不一致。解决的办法也很简单,改为监听ENTITY_CREATED,判断是否时需要修改id的任务实体即可。

实现代码

properties配置文件

# 是否更新数据库表
spring.activiti.databaseSchemaUpdate=true
# 是否激活异步执行器
spring.activiti.asyncExecutorActivate=false
# 流程历史记录登录
spring.activiti.historyLevel=audit
# 是否检查更新流程定义
spring.activiti.checkProcessDefinitions=false
# 流程定义所在前缀
spring.activiti.processDefinitionLocationPrefix=classpath*:/procDef/
# 流程定义后缀
spring.activiti.processDefinitionLocationSuffixes=**.bpmn
# 部署流程定义时是否生成图片
spring.activiti.createDiagramOnDeploy=false
# 字体 下面内容为转成unicode的'宋体'
spring.activiti.activityFontName=\u5b8b\u4f53
spring.activiti.labelFontName=\u5b8b\u4f53

解析Properties类

@ConfigurationProperties("spring.activiti")
public class ActivitiProperties {
    private boolean checkProcessDefinitions = true;
    private boolean asyncExecutorActivate = true;
    private boolean restApiEnabled;
    private String deploymentName;
    private String mailServerHost = "localhost";
    private int mailServerPort = 1025;
    private String mailServerUserName;
    private String mailServerPassword;
    private String mailServerDefaultFrom;
    private boolean mailServerUseSsl;
    private boolean mailServerUseTls;
    private String databaseSchemaUpdate = "true";
    private String databaseSchema;
    private boolean isDbIdentityUsed = true;
    private boolean isDbHistoryUsed = true;
    private HistoryLevel historyLevel = HistoryLevel.AUDIT;
    private String processDefinitionLocationPrefix = "classpath:/processes/";
    private List<String> processDefinitionLocationSuffixes = Arrays.asList("**.bpmn20.xml", "**.bpmn");
    private String restApiMapping = "/api/*";
    private String restApiServletName = "activitiRestApi";
    private boolean jpaEnabled = true; // true by default
    private List<String> customMybatisMappers;
    private List<String> customMybatisXMLMappers;

    private boolean createDiagramOnDeploy;
    private String activityFontName;
    private String labelFontName;
    //省略getter、setter
}

spring boot配置类

@Configuration
@EnableConfigurationProperties(ActivitiProperties.class)
public class ActivitiConfig {
    private static final Logger logger = LoggerFactory.getLogger(ActivitiConfig.class);
    @Autowired
    private ActivitiProperties activitiProperties;
    @Autowired
    private DataSource dataSource;
    @Autowired
    private PlatformTransactionManager transactionManager;
    @Autowired
    private TaskCreatedListener taskCreatedListener;
    @Autowired
    private TaskCompletedListener taskCompletedListener;
    @Autowired
    private EntityCreatedListener entityCreatedListener;
    @Autowired
    private ResourcePatternResolver resourceLoader;

    @Bean
    public SpringProcessEngineConfiguration processEngineConfiguration() throws IOException {
        SpringProcessEngineConfiguration configuration = new SpringProcessEngineConfiguration();
        configuration.setDataSource(dataSource);
        configuration.setTransactionManager(transactionManager);
        configuration.setDatabaseSchemaUpdate(activitiProperties.getDatabaseSchemaUpdate());
        configuration.setAsyncExecutorActivate(activitiProperties.isAsyncExecutorActivate());
        configuration.setHistory(activitiProperties.getHistoryLevel().getKey());
        configuration.setCreateDiagramOnDeploy(activitiProperties.isCreateDiagramOnDeploy());
        configuration.setActivityFontName(activitiProperties.getActivityFontName());
        configuration.setLabelFontName(activitiProperties.getLabelFontName());
        //todo 修改自动部署,当前自动部署直接搬自[activit-spring-boot]
        //如果checkProcessDefinitions为true,则发布新版流程定义,后续可能根据流程定义文件MD5等判断是否真正变化而进行发布
        List<Resource> procDefResources = discoverProcessDefinitionResources(activitiProperties.getProcessDefinitionLocationPrefix(),            activitiProperties.getProcessDefinitionLocationSuffixes(),this.activitiProperties.isCheckProcessDefinitions());
        configuration.setDeploymentResources(procDefResources.toArray(new Resource[procDefResources.size()]));
        Map<String, List<ActivitiEventListener>> typedListeners = new HashMap<>();
        typedListeners.put("ENTITY_CREATED", Collections.singletonList(entityCreatedListener));
        typedListeners.put("TASK_CREATED", Collections.singletonList(taskCreatedListener));
        typedListeners.put("TASK_COMPLETED", Collections.singletonList(taskCompletedListener));
        configuration.setTypedEventListeners(typedListeners);
        return configuration;
    }
    private List<Resource> discoverProcessDefinitionResources(String prefix, List<String> suffixes, boolean checkPDs) throws IOException {
        if (checkPDs) {
            List<Resource> result = new ArrayList<>();
            for (String suffix : suffixes) {
                String path = prefix + suffix;
                Resource[] resources = resourceLoader.getResources(path);
                if (resources != null && resources.length > 0) {
                    CollectionUtils.mergeArrayIntoCollection(resources, result);
                }
            }
            if (result.isEmpty()) {
                logger.info("No process definitions were found for autodeployment");
            }
            return result;
        }
        return new ArrayList<>();
    }
    @Bean
    public ProcessEngineFactoryBean processEngine() throws IOException {
        ProcessEngineFactoryBean factoryBean = new ProcessEngineFactoryBean();
        factoryBean.setProcessEngineConfiguration(processEngineConfiguration());
        return factoryBean;
    }
    @Bean
    public RuntimeService runtimeService(ProcessEngine processEngine) {
        return processEngine.getRuntimeService();
    }
    @Bean
    public RepositoryService repositoryService(ProcessEngine processEngine) {
        return processEngine.getRepositoryService();
    }
    @Bean
    public TaskService taskService(ProcessEngine processEngine) {
        return processEngine.getTaskService();
    }
    @Bean
    public HistoryService historyService(ProcessEngine processEngine) {
        return processEngine.getHistoryService();
    }
    @Bean
    public ManagementService managementService(ProcessEngine processEngine) {
        return processEngine.getManagementService();
    }
    @Bean
    public IdentityService identityService(ProcessEngine processEngine) {
        return processEngine.getIdentityService();
    }

实体创建完成监听器

@Component
public class EntityCreatedListener implements ActivitiEventListener {
    public void onEvent(ActivitiEvent event){
        Object entity = ((ActivitiEntityEvent)event).getEntity();
        if(entity instanceof TaskEntity){
            TaskEntity taskEntity = (TaskEntity)entity;
            // 这个要改变的id值,可以在上篇文章中的SetFLowNodeAndGoCmd中设置相应流程变量即可。
            String changeTaskId = (String)taskEntity.getVariable("changeTaskIdVarKey");
            if(!StringUtils.isEmpty(changeTaskId)){
                taskEntity.setId(changeTaskId);
                taskEntity.setVariable("changeTaskIdKey","");
            }
        }
    }
    public boolean isFailOnException(){
        return true;
    }
}

2.3关于如何获取当前任务的来源任务,以进行驳回

我们知道Activiti中有TASK_CREATED和TASK_COMPLETED事件,在同一个流程实例中,一个任务A如果不是最后的结束任务,那么在它完成后,必定会有一个新的任务B创建,而我们简单理解为A为B的来源任务。(假设A是申请任务,B就时审批任务,B的处理人对当前审批不同意要驳回时,流程就要回退到任务A。)
这样一来,我们可以监听TASK_COMPLETED,在此时为流程设置一个变量fromTaskId,值为任务A的id,当任务A的TASK_COMPLETED结束后,就来到的了任务B的TASK_CREATED中,我们此时从流程变量中获取fromTaskId,并将次id作为任务B的来源id持久化到一张自己创建的任务关系表中。这样后面要进行驳回时,只要通过这样关系表,马上就可以定位到要驳回到的任务id了。

实现代码

任务完成监听器

// 关于监听器的注册看上面配置类中typedListeners部分已有
@Component
public class TaskCompletedListener implements ActivitiEventListener {
    public void onEvent(ActivitiEvent event){
        TaskEntity taskEntity = (TaskEntity)((ActivitiEntityEvent)event).getEntity();
        taskEntity.setVariable("fromTaskIdVarKey", taskEntity.getId());
    }

    public boolean isFailOnException(){
        return true;
    }
}

任务创建完成监听器

@Component
public class TaskCreatedListener implements ActivitiEventListener {
    public void onEvent(ActivitiEvent event){
        TaskEntity taskEntity = (TaskEntity)((ActivitiEntityEvent)event).getEntity();
        String fromTaskId = (String)taskEntity.getVariable(WfVarKeyConstants.fromTaskId);
        if(StringUtils.isEmpty(fromTaskId)) return;
        xxxTaskInfo info = new xxxTaskInfo();
        info.setId(taskEntity.getId());
        info.setFromId(fromTaskId);
        //此处进行任务关系持久化,自行实现
        xxxTaskInfoRepository.save(info);
    }
    public boolean isFailOnException(){
        return true;
    }
}

3.最后

本来打算做一个Activiti小贴士列表,不过看篇幅已经很长了,小贴士好像也凑不齐一篇文章,而且还没人看:)
那就放到下次来说
todo
1.Activiti命令执行模式
2.持久化过程与会话缓存(CRUD)
3.BPMN流程执行计划

查看原文

坐死等吃 评论了文章 · 2019-04-23

Activiti6通过监听修改实体id、springboot集成配置

1.前言

本文内容主要为以下两点,因为内容有交叉,所以会放在一起介绍。

  • 1.以自由跳转为基础实现不改变原先任务id的驳回
    关于Activiti6动态跳转可以查看我的另一篇文章Activiti6实现自由跳转
  • 2.java类方式进行Activiti6配置、spring boot集成
    因为有一些自定义的需求,如流程字体、自动部署、自定义监听器等,直接引入[activit-spring-boot]又没有必要,所以参考activit6源码中[activit-spring-boot]模块的代码完成。

2.实现介绍

关于自由跳转的内容我就不再多说,主要介绍如何修改Activiti生成的实体的id,以达到驳回时重新生成的任务id与原先的任务id一致。(某些业务场景下可能会用到,例如某流程中A环节提交的表单与task id绑定,当环节提交又被驳回时,为保证表单内容与任务关系不变,驳回后的任务id与原先任务id要一致)

2.1前提知识

  • 1.Activiti持久化实体的过程时先创建实体对象,记录到缓存中,在完成执行后统一进行缓存对象的持久化,并清空缓存。
  • 2.Activiti采用命令模式执行操作,所有操作都时一个CMD。执行一个CMD的时候会创建一个上下文环境,包含待持久化的实体缓存等,如果在CMD中嵌套执行CMD,新的CMD默认会使用上级上下文环境。当一个根级的CMD结束时,Activiti就会进行上述的缓存对象统一的持久化。
  • 3.Activiti有丰富的事件类型(具体可以查看事件枚举类ActivitiEventType)供我们实现相应监听器,进行特殊业务处理。例如ENTITY_CREATED——实体创建完成(task、activity、Execution等所有实体)、TASK_CREATED——任务创建完成(针对task)、TASK_COMPLETED——任务完成等等。

2.2关于修改任务id

结合上述内容我们就可以知道,只要在TASK_CREATED进行监听,直接在监听器中将id改为需要的值即可。理论上是这样,但是需要注意,Activiti6中历史任务实体创建是在TASK_CREATED之前的,如果你在TASK_CREATED中修改任务id,实际上历史任务实体创建时是获取不到的,这样就会导致历史任务的id与运行时任务id不一致。解决的办法也很简单,改为监听ENTITY_CREATED,判断是否时需要修改id的任务实体即可。

实现代码

properties配置文件

# 是否更新数据库表
spring.activiti.databaseSchemaUpdate=true
# 是否激活异步执行器
spring.activiti.asyncExecutorActivate=false
# 流程历史记录登录
spring.activiti.historyLevel=audit
# 是否检查更新流程定义
spring.activiti.checkProcessDefinitions=false
# 流程定义所在前缀
spring.activiti.processDefinitionLocationPrefix=classpath*:/procDef/
# 流程定义后缀
spring.activiti.processDefinitionLocationSuffixes=**.bpmn
# 部署流程定义时是否生成图片
spring.activiti.createDiagramOnDeploy=false
# 字体 下面内容为转成unicode的'宋体'
spring.activiti.activityFontName=\u5b8b\u4f53
spring.activiti.labelFontName=\u5b8b\u4f53

解析Properties类

@ConfigurationProperties("spring.activiti")
public class ActivitiProperties {
    private boolean checkProcessDefinitions = true;
    private boolean asyncExecutorActivate = true;
    private boolean restApiEnabled;
    private String deploymentName;
    private String mailServerHost = "localhost";
    private int mailServerPort = 1025;
    private String mailServerUserName;
    private String mailServerPassword;
    private String mailServerDefaultFrom;
    private boolean mailServerUseSsl;
    private boolean mailServerUseTls;
    private String databaseSchemaUpdate = "true";
    private String databaseSchema;
    private boolean isDbIdentityUsed = true;
    private boolean isDbHistoryUsed = true;
    private HistoryLevel historyLevel = HistoryLevel.AUDIT;
    private String processDefinitionLocationPrefix = "classpath:/processes/";
    private List<String> processDefinitionLocationSuffixes = Arrays.asList("**.bpmn20.xml", "**.bpmn");
    private String restApiMapping = "/api/*";
    private String restApiServletName = "activitiRestApi";
    private boolean jpaEnabled = true; // true by default
    private List<String> customMybatisMappers;
    private List<String> customMybatisXMLMappers;

    private boolean createDiagramOnDeploy;
    private String activityFontName;
    private String labelFontName;
    //省略getter、setter
}

spring boot配置类

@Configuration
@EnableConfigurationProperties(ActivitiProperties.class)
public class ActivitiConfig {
    private static final Logger logger = LoggerFactory.getLogger(ActivitiConfig.class);
    @Autowired
    private ActivitiProperties activitiProperties;
    @Autowired
    private DataSource dataSource;
    @Autowired
    private PlatformTransactionManager transactionManager;
    @Autowired
    private TaskCreatedListener taskCreatedListener;
    @Autowired
    private TaskCompletedListener taskCompletedListener;
    @Autowired
    private EntityCreatedListener entityCreatedListener;
    @Autowired
    private ResourcePatternResolver resourceLoader;

    @Bean
    public SpringProcessEngineConfiguration processEngineConfiguration() throws IOException {
        SpringProcessEngineConfiguration configuration = new SpringProcessEngineConfiguration();
        configuration.setDataSource(dataSource);
        configuration.setTransactionManager(transactionManager);
        configuration.setDatabaseSchemaUpdate(activitiProperties.getDatabaseSchemaUpdate());
        configuration.setAsyncExecutorActivate(activitiProperties.isAsyncExecutorActivate());
        configuration.setHistory(activitiProperties.getHistoryLevel().getKey());
        configuration.setCreateDiagramOnDeploy(activitiProperties.isCreateDiagramOnDeploy());
        configuration.setActivityFontName(activitiProperties.getActivityFontName());
        configuration.setLabelFontName(activitiProperties.getLabelFontName());
        //todo 修改自动部署,当前自动部署直接搬自[activit-spring-boot]
        //如果checkProcessDefinitions为true,则发布新版流程定义,后续可能根据流程定义文件MD5等判断是否真正变化而进行发布
        List<Resource> procDefResources = discoverProcessDefinitionResources(activitiProperties.getProcessDefinitionLocationPrefix(),            activitiProperties.getProcessDefinitionLocationSuffixes(),this.activitiProperties.isCheckProcessDefinitions());
        configuration.setDeploymentResources(procDefResources.toArray(new Resource[procDefResources.size()]));
        Map<String, List<ActivitiEventListener>> typedListeners = new HashMap<>();
        typedListeners.put("ENTITY_CREATED", Collections.singletonList(entityCreatedListener));
        typedListeners.put("TASK_CREATED", Collections.singletonList(taskCreatedListener));
        typedListeners.put("TASK_COMPLETED", Collections.singletonList(taskCompletedListener));
        configuration.setTypedEventListeners(typedListeners);
        return configuration;
    }
    private List<Resource> discoverProcessDefinitionResources(String prefix, List<String> suffixes, boolean checkPDs) throws IOException {
        if (checkPDs) {
            List<Resource> result = new ArrayList<>();
            for (String suffix : suffixes) {
                String path = prefix + suffix;
                Resource[] resources = resourceLoader.getResources(path);
                if (resources != null && resources.length > 0) {
                    CollectionUtils.mergeArrayIntoCollection(resources, result);
                }
            }
            if (result.isEmpty()) {
                logger.info("No process definitions were found for autodeployment");
            }
            return result;
        }
        return new ArrayList<>();
    }
    @Bean
    public ProcessEngineFactoryBean processEngine() throws IOException {
        ProcessEngineFactoryBean factoryBean = new ProcessEngineFactoryBean();
        factoryBean.setProcessEngineConfiguration(processEngineConfiguration());
        return factoryBean;
    }
    @Bean
    public RuntimeService runtimeService(ProcessEngine processEngine) {
        return processEngine.getRuntimeService();
    }
    @Bean
    public RepositoryService repositoryService(ProcessEngine processEngine) {
        return processEngine.getRepositoryService();
    }
    @Bean
    public TaskService taskService(ProcessEngine processEngine) {
        return processEngine.getTaskService();
    }
    @Bean
    public HistoryService historyService(ProcessEngine processEngine) {
        return processEngine.getHistoryService();
    }
    @Bean
    public ManagementService managementService(ProcessEngine processEngine) {
        return processEngine.getManagementService();
    }
    @Bean
    public IdentityService identityService(ProcessEngine processEngine) {
        return processEngine.getIdentityService();
    }

实体创建完成监听器

@Component
public class EntityCreatedListener implements ActivitiEventListener {
    public void onEvent(ActivitiEvent event){
        Object entity = ((ActivitiEntityEvent)event).getEntity();
        if(entity instanceof TaskEntity){
            TaskEntity taskEntity = (TaskEntity)entity;
            // 这个要改变的id值,可以在上篇文章中的SetFLowNodeAndGoCmd中设置相应流程变量即可。
            String changeTaskId = (String)taskEntity.getVariable("changeTaskIdVarKey");
            if(!StringUtils.isEmpty(changeTaskId)){
                taskEntity.setId(changeTaskId);
                taskEntity.setVariable("changeTaskIdKey","");
            }
        }
    }
    public boolean isFailOnException(){
        return true;
    }
}

2.3关于如何获取当前任务的来源任务,以进行驳回

我们知道Activiti中有TASK_CREATED和TASK_COMPLETED事件,在同一个流程实例中,一个任务A如果不是最后的结束任务,那么在它完成后,必定会有一个新的任务B创建,而我们简单理解为A为B的来源任务。(假设A是申请任务,B就时审批任务,B的处理人对当前审批不同意要驳回时,流程就要回退到任务A。)
这样一来,我们可以监听TASK_COMPLETED,在此时为流程设置一个变量fromTaskId,值为任务A的id,当任务A的TASK_COMPLETED结束后,就来到的了任务B的TASK_CREATED中,我们此时从流程变量中获取fromTaskId,并将次id作为任务B的来源id持久化到一张自己创建的任务关系表中。这样后面要进行驳回时,只要通过这样关系表,马上就可以定位到要驳回到的任务id了。

实现代码

任务完成监听器

// 关于监听器的注册看上面配置类中typedListeners部分已有
@Component
public class TaskCompletedListener implements ActivitiEventListener {
    public void onEvent(ActivitiEvent event){
        TaskEntity taskEntity = (TaskEntity)((ActivitiEntityEvent)event).getEntity();
        taskEntity.setVariable("fromTaskIdVarKey", taskEntity.getId());
    }

    public boolean isFailOnException(){
        return true;
    }
}

任务创建完成监听器

@Component
public class TaskCreatedListener implements ActivitiEventListener {
    public void onEvent(ActivitiEvent event){
        TaskEntity taskEntity = (TaskEntity)((ActivitiEntityEvent)event).getEntity();
        String fromTaskId = (String)taskEntity.getVariable(WfVarKeyConstants.fromTaskId);
        if(StringUtils.isEmpty(fromTaskId)) return;
        xxxTaskInfo info = new xxxTaskInfo();
        info.setId(taskEntity.getId());
        info.setFromId(fromTaskId);
        //此处进行任务关系持久化,自行实现
        xxxTaskInfoRepository.save(info);
    }
    public boolean isFailOnException(){
        return true;
    }
}

3.最后

本来打算做一个Activiti小贴士列表,不过看篇幅已经很长了,小贴士好像也凑不齐一篇文章,而且还没人看:)
那就放到下次来说
todo
1.Activiti命令执行模式
2.持久化过程与会话缓存(CRUD)
3.BPMN流程执行计划

查看原文

坐死等吃 发布了文章 · 2018-12-04

网狐荣耀6701/6801 手机打包发布

1.前言

本篇将介绍网狐cocos2dx-lua工程打包android的apk、ios编译过程以及vs2015模拟运行和调试、lua脚本热更新。
关于网狐的服务端编译运行,请查看网狐6701/6801服务端编译 测试部署

2.目录结构和地址修改

2.1.手机工程目录

-- client                    lua目录
--------- ciphercode         加密后用来发布的lua代码
--------- client             游戏大厅模块
--------- game               子游戏模块
--------- base               lua与cocos c++衔接的基础模块
-- frameworks
--------- cocos2d-x          cocos库
--------- runtime-src        cocos工程目录
-- info                      整理好的文档和批处理脚本
-- publish                   发布apk的目录

2.2.lua代码中配置修改

  1. 进入client目录。
  2. 查看修改base\src\app\models\AppDF.lua中,用来进行app更新和资源、脚本热更新的配置。
    BASE_C_VERSION(app版本)
    BASE_C_RESVERSION(资源版本,包括lua脚本、图片等)
  3. 修改basesrcappviewsWelcomeScene.lua中
    URL_REQUEST -> "http://172.16.58.129"
  4. 修改clientsrcplazamodelsyl.lua中
    LOGONSERVER -> "172.16.58.129"
    SERVER_LIST -> { "172.16.58.129", }

3.环境准备

以下各环境版本为笔者测试通过的,大家可以看情况更新。

3.1.windows环境

windows环境主要用于资源打包发布(publish)、安卓apk打包(android)和vs2015模拟运行和调试(debug)。

  1. Win7
  2. Cocos2dx-3.10(publish、android)Cocos_3.10.exe
    COCOS_CONSOLE_ROOT -> C:\cocos2d-x-3.10\tools\cocos2d-console\bin
    PATH -> %PATH%;%COCOS_CONSOLE_ROOT%
  3. Jdk1.8(android)
    JAVA_HOME -> C:Program FilesJavajdk1.8.0_131
    CLASSPATH -> %JAVA_HOME%lib;%JAVA_HOME%libtools.jar
  4. WinRAR(publish)
    PATH -> %PATH%;C:Program FilesWinRAR
  5. Python2.7(publish)
    PATH -> %PATH%;C:Python27
  6. Ant1.9.6(android)
    ANT_ROOT -> %ANT_HOME%bin
    ANT_HOME -> C:apache-ant-1.9.6
    PATH -> %PATH%;%ANT_HOME%bin
    CLASSPATH -> %ANT_HOME%lib
  7. AndroidSDK(android)adt-bundle-windows-x86-20140624
    ANDROID_SDK_ROOT -> C:sdk
  8. AndroidNDK(android)android-ndk-r10d-windows-x86_64
    NDK_HOME -> C:android-ndk-r10d
    NDK_ROOT -> C:android-ndk-r10d
  9. VS2015(debug)
    vs lua语法插件babelua-compat-1.06-vs2015

3.2.mac环境

  1. mac 10.11
  2. xcode 7.2.1 高版本会有问题,可自行解决

4.发布资源、脚本,网站后台增加手机游戏。

  1. 环境准备,参考博客
  2. info目录下,提供了一键发布资源脚本(发布资源.bat),执行即可。
  3. 将clientciphercode下的文件夹复制到网站前台目录下的download目录(没有则自己创建)。
  4. 进入网站后台->网站系统->站点配置->移动大厅。按字段说明填写,下载地址如下图,大厅版本和资源版本在2.2中提到了。
    clipboard.png
  5. 网站后台->系统维护->游戏管理->手游->新增。模块名称即子游戏在client\game\下的路径,用"."分割。
  6. 关于热更新。修改2.2中提到的资源版本号后,重新发布资源到前台网站download目录下,然后在网站后台,修改移动大厅为刚刚修改号的资源版本号即可。

clipboard.png

5.windows环境下,安卓打包

  1. 修改文件frameworks\runtime-src\proj.android\ant.properties,中keystore的路径。关于keystore的作用可以自行搜索了解。
  2. info目录下,提供了一键打包脚本(发布debug版.bat、发布release版.bat)会发布数到publish路径下。注意查看info目录下的GloryProjectR.bat,其中关键的打包命令call cocos compile -p android --ap android-20 -m release -j 4,需要确保sdk中要有命令中使用的sdk版本。
  3. 将apk放到前台网站的download目录下。apk的名称需要相应修改,因为代码中写死了名称。
    clipboard.png

6. mac环境下,ios编译

因为笔者没有证书,无法进行打包,就简单说明编译过程。需要注意因为网狐使用到的一个第三方支付sdk(竣付通),而该sdk不支持x86_64架构,所以当编译目标为模拟器时,将编译失败,需要使用真机进行编译调试。

  1. 发布资源(info目录下LuaiOSPackage.sh,作用类似发布资源.bat)。
  2. 使用xcode打开frameworks/runtime-src/proj.ios_mac/GloryProject.xcodeproj。
  3. 手机连接电脑,并在手机上选择信任。
  4. xcode点击编译即可,完成会在手机上安装app,打开即可。

7. windows环境下,vs2015模拟运行和调试

  1. 使用vs2015打开frameworks\runtime-src\proj.win32\GloryProject.sln解决方案。
  2. 选择debug配置,编译。生成GloryProject.exe,以此作为lua解释器。
  3. 使用以下命令即可运行手机项目。WORKDIR为client目录路径,game为GloryProject.exe路径。

    set WORKDIR=%CD%/client 
    set game=%CD%/run/debug/GloryProject.exe
    start %game% -workdir %WORKDIR%
  4. 关于调试,右键GloryProject项目->属性->调试。命令即GloryProject.exe的路径(同链接器中中输出文件的路径),命令参数即配置lua代码client目录,工作目录即生成的GloryProject.exe所在目录,完成。
    clipboard.png
  5. 安装完babelua-compat-1.06-vs2015后,vs菜单栏会多一个Lua选项,点击该选项->New Lua Project。点击ok后,因为会创建client目录的索引,clinet目录下文件特别多,所以会很卡,要么等,要么把子游戏目录game下的游戏删一些。
    Lua scripts folder -> lua脚本client目录
    Lua exe path -> GloryProject.exe路径
    Working path -> lua项目vs解决方案存放路径
    Command line -> 同上3
    Lua project name -> 名称
    clipboard.png
  6. 点击本地Windows调试器,即可。c++代码和lua代码均可断电调试了。
    clipboard.png
查看原文

赞 1 收藏 1 评论 0

认证与成就

  • 获得 58 次点赞
  • 获得 4 枚徽章 获得 0 枚金徽章, 获得 0 枚银徽章, 获得 4 枚铜徽章

擅长技能
编辑

(゚∀゚ )
暂时没有

开源项目 & 著作
编辑

(゚∀゚ )
暂时没有

注册于 2017-11-03
个人主页被 1.3k 人浏览