beanlam

beanlam 查看完整档案

广州编辑华南理工大学  |  ebusiness 编辑  |  填写所在公司/组织 www.beanlam.me 编辑
编辑

十年学会编程

个人动态

beanlam 赞了文章 · 11月18日

年轻人不讲武德来白piao我这个老同志

朋友们好啊,我是码农小胖哥。

今天有个同学问我在吗,我说什么事?

给我发个截图,我一看!噢,原来是帮忙搞个定时任务,还是动态的。

他说了两种选择,一种是用DelayQueue,一种是用消息队列。

他说,胖哥你能不能教我点招式混元功法,帮我完成这个需求。

我说可以!

我说你这两种都不好用,他不服气。

我说那你写个DelayQueue来看看,他写不出来。

他说你这估计也不会,我说我确实不会。

这是 JUC,传统底层开发是要讲基础的,必须融会贯通,我只会调包。

这种定时任务我用 Redis 更简单些。

他让我写个 DEMO,我说可以!

我一说,他啪就发了个表情img

很快啊,我就打开 IDEA,一个 DEMO 就出来了。

一个重写了 Redis 的 Key 失效监听器:

/**
 *  当redis 中的key过期时,触发一个事件,并不会准点触发事件,适用于时间不是特别敏感的触发需求。
 *  我们可以算好需要执行的时间间隔作为key失效时间,这样就可以保证到点执行逻辑了。
 */
public class RedisJobEventMessageListener extends KeyExpirationEventMessageListener {

    /**
     * Instantiates a new Redis event message listener.
     *
     * @param listenerContainer the listener container
     */
    public RedisEventMessageListener(RedisMessageListenerContainer listenerContainer) {
        super(listenerContainer);
    }


    @Override
    protected void doHandleMessage(Message message) {
        String key = message.toString();
        // 这个就是过期的key ,过期后,也就是事件触发后对应的value是拿不到的。
        // 这里实现业务逻辑,如果是服务器集群的话需要使用分布式锁进行抢占执行。
        System.out.println("key = " + key);
        System.out.println("end = " + LocalDateTime.now());
    }
}

一个监听器的配置:

/**
 * Redis 消息监听器容器.
 *
 * @param redisConnectionFactory the redis connection factory
 * @return the redis message listener container
 */
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory) {
    RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
    redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
    return redisMessageListenerContainer;
}


/**
 * Redis 定时任务监听器注册为Bean.
 *
 * @param redisMessageListenerContainer the redis message listener container
 * @return the redis event message listener
 */
@Bean
public RedisJobEventMessageListener redisEventMessageListener(RedisMessageListenerContainer redisMessageListenerContainer){
    return new RedisJobEventMessageListener(redisMessageListenerContainer);
}

一个执行测试:

@Test
public void redisJobTest() {
    // 调用 redisTemplate 对象设置一个10s 后过期的键,不出意外 10s 后键过期后会触发事件打印结果
    redisTemplate.boundValueOps("job").set("10s",10, TimeUnit.SECONDS);
    System.out.println("begin = " + LocalDateTime.now());
    try {
        // 测试需要休眠才能看到结果
        Thread.sleep(20000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    // ---------测试结果---------
    // begin = 2020-11-18T00:19:09.272
    // key = job
    // end = 2020-11-18T00:19:19.369
}

写完之后自然是传统编程思路点到为止,我把代码发给他,我打算放松一下,摸一摸鱼。

我本来想让他关注我,我想着他会主动去关注。

我大意了,没有说。

然后十分钟后他告诉我他搞定了,而且没有关注我。

我说同学你不讲规矩,你不懂。

他忙说对不起,我不懂规矩啊!

我说年轻人,

不讲规矩,

来,

白嫖!

我五年经验的小码农。

这好吗?这不好。

我劝这位同学,

耗子尾汁。

好好反思。

好好关注。

好好点赞。

好好评论。

要以和为贵,要讲规矩,

不要老是白嫖。

多多关注:码农小胖哥

谢谢同学们!

关注公众号:Felordcn 获取更多资讯

个人博客:https://felord.cn

查看原文

赞 4 收藏 0 评论 1

beanlam 关注了专栏 · 3月12日

金融级分布式架构SOFAStack

蚂蚁金服自主研发的分布式中间件(Scalable Open Financial Architecture)

关注 1047

beanlam 关注了用户 · 3月12日

SOFAStack @sofastack

SOFAStack™(Scalable Open Financial Architecture Stack)是一套用于快速构建金融级分布式架构的中间件,也是在金融场景里锤炼出来的最佳实践。

关注 528

beanlam 发布了文章 · 2019-10-11

阿里开源分布式事务组件 seata : 客户端事务执行逻辑分析

前言

先前在《阿里开源分布式事务组件 seata :demo 环境搭建以及运行流程简析》 这篇文章中已经提到过:
seata 客户端在处理事务逻辑的时候,实际上采用模板模式,委托给了 TransactionalTemplate 类去执行标准的事务处理流程,如下代码所示:

   public Object execute(TransactionalExecutor business) throws Throwable {
        // 1. get or create a transaction
        GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();

        // 1.1 get transactionInfo
        TransactionInfo txInfo = business.getTransactionInfo();
        if (txInfo == null) {
            throw new ShouldNeverHappenException("transactionInfo does not exist");
        }
        try {

            // 2. begin transaction
            beginTransaction(txInfo, tx);

            Object rs = null;
            try {

                // Do Your Business
                rs = business.execute();

            } catch (Throwable ex) {

                // 3.the needed business exception to rollback.
                completeTransactionAfterThrowing(txInfo,tx,ex);
                throw ex;
            }

            // 4. everything is fine, commit.
            commitTransaction(tx);

            return rs;
        } finally {
            //5. clear
            triggerAfterCompletion();
            cleanUp();
        }
    }

在客户端的事务处理流程中,流程比较清晰,处理流程也不复杂,除了客户端自身采用了一些机制,其实 seata 把比较“重”的逻辑都放在了 server 端。
比如说,开启一个全局事务时,事务 id 如何生成,事务的信息如何存储,这些是不需要客户端的关心的。

当我们使用标准的 JDBC 规范来处理单库数据库事务时,代码几乎都和下面是同一个模板:

conn.setAutocommit(false);
try {
    // 在这里使用 connection 进行 sql 操作;
    conn.xxxxxx
    //如果一切正常,则直接进行提交
    conn.commit();
} catch (Exception e) {
    conn.rollback();
}

虽然分布式事务和单机事务在“分布式” 和 “单机” 上区别很大,但在 “事务” 这个角度,却是相同的。
我们可以看到 seata 客户端的事务处理逻辑,跟单机事务的处理逻辑大同小异。
有差异的两个地方主要是:

  1. seata 中的全局事务如果提交失败,是不需要进行回滚,会有别的补救措施。
  2. 针对事务主体执行期间发生的异常,是不一定要回滚的,遇到有些异常可以直接提交,而有时候又可以直接忽略,不过这块跟具体场景有关系,默认出现了异常就是要回滚的。

事务信息

TransactionTemplate 处理事务时,必须知道事务的配置信息,包括:

  1. 超时时间
  2. 事务标识名称
  3. 回滚时机或者不回滚时机

这些信息主要通过在方法上的 @GlobalTransactional 注解携带进来,我们可以看一下这个注解包含了哪些属性:

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@Inherited
public @interface GlobalTransactional {
    int timeoutMills() default TransactionInfo.DEFAULT_TIME_OUT;
    String name() default "";
    Class<? extends Throwable>[] rollbackFor() default {};
    String[] rollbackForClassName() default {};
    Class<? extends Throwable>[] noRollbackFor() default {};
    String[] noRollbackForClassName() default {};
}

这些信息由 TransactionExecutor 携带给 TransactionTemplate。
TransactionExecutor 这个类除了携带事务信息 TransactionInfo,它还携带了事务的执行主体,即标识了 @GlobalTransactional 注解的方法的方法体。
调用 TransactionExecutor 的 invoke 方法,就相当于执行了事务的执行主体。
TransactionExecutor 是一个接口,我们通过观察它的匿名实现类的构造方式,就能正式上面说的观点,例如 GlobalTransactionInterceptor 的 handleGlobalTransaction 方法:

    private Object handleGlobalTransaction(final MethodInvocation methodInvocation,
                                           final GlobalTransactional globalTrxAnno) throws Throwable {
        try {
            return transactionalTemplate.execute(new TransactionalExecutor() {
                @Override
                public Object execute() throws Throwable {
                    return methodInvocation.proceed();
                }

                public String name() {
                    String name = globalTrxAnno.name();
                    if (!StringUtils.isNullOrEmpty(name)) {
                        return name;
                    }
                    return formatMethod(methodInvocation.getMethod());
                }

                @Override
                public TransactionInfo getTransactionInfo() {
                    TransactionInfo transactionInfo = new TransactionInfo();
                    transactionInfo.setTimeOut(globalTrxAnno.timeoutMills());
                    transactionInfo.setName(name());
                    Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
                    for (Class<?> rbRule : globalTrxAnno.rollbackFor()) {
                        rollbackRules.add(new RollbackRule(rbRule));
                    }
                    for (String rbRule : globalTrxAnno.rollbackForClassName()) {
                        rollbackRules.add(new RollbackRule(rbRule));
                    }
                    for (Class<?> rbRule : globalTrxAnno.noRollbackFor()) {
                        rollbackRules.add(new NoRollbackRule(rbRule));
                    }
                    for (String rbRule : globalTrxAnno.noRollbackForClassName()) {
                        rollbackRules.add(new NoRollbackRule(rbRule));
                    }
                    transactionInfo.setRollbackRules(rollbackRules);
                    return transactionInfo;
                }
            });
        } catch (TransactionalExecutor.ExecutionException e) {
            TransactionalExecutor.Code code = e.getCode();
            switch (code) {
                case RollbackDone:
                    throw e.getOriginalException();
                case BeginFailure:
                    failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                case CommitFailure:
                    failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                case RollbackFailure:
                    failureHandler.onRollbackFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                default:
                    throw new ShouldNeverHappenException("Unknown TransactionalExecutor.Code: " + code);

            }
        }

全局事务的创建

  • 事务的发起者,需要创建全局事务,向 seata server 申请全局事务 id
  • 事务的参与者,则需要获取已经创建好的全局事务,以便将自己注册为正确全局事务下的一个事务分支,认清自己的身份....

GlobalTransactionContext 这个类负责上述说的两个功能,事务发起者调用它提供的 getCurrentOrCreate 方法创建全局事务,事务的参与者调用它的 getCurrent 方法获取当前所处的全局事务(实际上参与者也是根据全局事务 xid 进行创建)。
通过这些方法获取到的是一个 GlobalTransaction 接口的具体实现实例。代表了一个全局事务。
GlobalTransaction 提供了事务的操作流程 api,例如基本的 begin、commit 和 rollback,另外还有事务的状态,还有 server 分配的全局事务 id;
接口定义如下所示:

public interface GlobalTransaction {
    void begin() throws TransactionException;
    void begin(int timeout) throws TransactionException;
    void begin(int timeout, String name) throws TransactionException;
    void commit() throws TransactionException;
    void rollback() throws TransactionException;
    GlobalStatus getStatus() throws TransactionException;
    String getXid();
}

无论是参与者还是发起者,都需要在创建 GlobalTransaction 实例的时候,把它自己的身份讲清楚。由 GlobalTransactionRole 来定义这些角色。

全局事务 xid 的传播

全局事务的参与者和发起者是不要求部署在同一个操作系统环境下的,否则就分布式事务就谈不上分布式了。
因此,全局事务的发起者在向 server 注册了全局事务 id 后,必须通过某种方式把全局事务 xid 通过服务的调用链传下去。
如果 seata 是在 dubbo 的环境下运行的,那么通过 dubbo 调用方式(例如说泛化调用、或者可变参数、或者直接修改底层协议),就能够在服务调用时,将 xid 在整个服务调用链上传播。
在 seata 中,RootContext 这个类,就是用来保存参与者和发起者之间传播的 xid 的。传播流程大致如下:

  1. 发起者开启全局事务后,将 xid 塞进 RootContext 里
  2. 服务框架将 xid 沿着服务调用链一直传播,塞进每个参与者进程的 RootContext 里。
  3. 参与者发现 RootContext 里的 xid 存在时,它便知道自己处于全局事务中,并且知道 xid 是什么。

通过 GlobalTransactionContext 的 getCurrent 方法,我们可以看到这些事实:

    /**
     * Get GlobalTransaction instance bind on current thread.
     *
     * @return null if no transaction context there.
     */
    private static GlobalTransaction getCurrent() {
        String xid = RootContext.getXID();
        if (xid == null) {
            return null;
        }
        return new DefaultGlobalTransaction(xid, GlobalStatus.Begin, GlobalTransactionRole.Participant);
    }

与 server 端如何交互

GlobalTransaction 定义的事务操作 api,其具体实现类需要真正与服务端通信了。以 begin 这个 api 为例,它的默认实现类 DefaultGlobalTransaction 是这样实现的:

    public void begin(int timeout, String name) throws TransactionException {
        if (role != GlobalTransactionRole.Launcher) {
            check();
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Ignore Begin(): just involved in global transaction [" + xid + "]");
            }
            return;
        }
        if (xid != null) {
            throw new IllegalStateException();
        }
        if (RootContext.getXID() != null) {
            throw new IllegalStateException();
        }
        xid = transactionManager.begin(null, null, name, timeout);
        status = GlobalStatus.Begin;
        RootContext.bind(xid);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Begin new global transaction [" + xid + "]");
        }
    }

实际上除了一些必要的条件检查和其它附带操作,它把与服务端具体的通信委托给了 TransactionManager 去做,那我们再看看 TransactionManager 做了什么事:

    @Override
    public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
        throws TransactionException {
        GlobalBeginRequest request = new GlobalBeginRequest();
        request.setTransactionName(name);
        request.setTimeout(timeout);
        GlobalBeginResponse response = (GlobalBeginResponse)syncCall(request);
        if (response.getResultCode() == ResultCode.Failed) {
            throw new TransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
        }
        return response.getXid();
    }

逻辑也十分简单,就是发送一个请求,然后接收一个响应。
不得不说,seata 在代码逻辑的抽象和结构层次这方面做得很好。

钩子及异常处理

seata 客户端通过 TransactionHook 这个接口,在一个事务的处理过程中,允许为它的各个阶段添加钩子逻辑

public interface TransactionHook {
    void beforeBegin();
    void afterBegin();
    void beforeCommit();
    void afterCommit();
    void beforeRollback();
    void afterRollback();
    void afterCompletion();
}

由于全局事务的提交和回滚依然是由可能失败的,比如说全局事务如果提交失败,意味着 undo_log 表里的数据目前是没有删除成功的,可能需要记录并在后面重试删除;
如果回滚失败,意味着当前这个全局事务属于异常结束,需要特殊处理、甚至人工介入。
seata 通过 FailureHandler 这个接口,提供了一个可扩展的点:

public interface FailureHandler {
    void onBeginFailure(GlobalTransaction tx, Throwable cause);
    void onCommitFailure(GlobalTransaction tx, Throwable cause);
    void onRollbackFailure(GlobalTransaction tx, Throwable cause);
}

扫一扫关注我的微信公众号

查看原文

赞 0 收藏 0 评论 0

beanlam 发布了文章 · 2019-10-09

阿里开源分布式事务组件 seata :demo 环境搭建以及运行流程简析

案例设计

seata 官方给出了一系列 demo 样例,不过我在用的过程中发现总有这个那个的问题,所以自己维护了一份基于 dubbo 的 demo 在 github 上,适配的 seata 版本是 0.8.0。
案例的设计直接参考官方 quick start给出的案例:
案例设计

整个案例分为三个服务,分别是存储服务、订单服务和账户服务,这些服务通过 dubbo 进行发布和调用,内部调用逻辑如上面图所示。
整个 demo 的工程样例如下所示:
工程样例

undo_log 表

这个案例除了在数据库需要建立业务表以外,还要额外建立一张 undo_log 表,这个表的主要作用是记录事务的前置镜像和后置镜像。
全局事务进行到提交阶段,则删除该表对应的记录,全局事务如果需要回滚,则会利用这个表里记录的镜像数据,恢复数据。
undo_log 表里的数据实际上是“朝生夕死”的,数据不需要在表里存活太久。表结构如下所示:

CREATE TABLE `undo_log` (
                          `id` bigint(20) NOT NULL AUTO_INCREMENT,
                          `branch_id` bigint(20) NOT NULL,
                          `xid` varchar(100) NOT NULL,
                          `context` varchar(128) NOT NULL,
                          `rollback_info` longblob NOT NULL,
                          `log_status` int(11) NOT NULL,
                          `log_created` datetime NOT NULL,
                          `log_modified` datetime NOT NULL,
                          `ext` varchar(100) DEFAULT NULL,
                          PRIMARY KEY (`id`),
                          UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

服务逻辑

每个服务都对应了一个 starter 类,这个类主要用来在 spring 环境下,将该服务启动,并通过 dubbo 发布出去,以账户服务为例:

/**
 * The type Dubbo account service starter.
 */
public class DubboAccountServiceStarter {
    /**
     * 2. Account service is ready . A buyer register an account: U100001 on my e-commerce platform
     *
     * @param args the input arguments
     */
    public static void main(String[] args) {
        ClassPathXmlApplicationContext accountContext = new ClassPathXmlApplicationContext(new String[]{"spring/dubbo-account-service.xml"});
        accountContext.getBean("service");
        JdbcTemplate accountJdbcTemplate = (JdbcTemplate) accountContext.getBean("jdbcTemplate");
        accountJdbcTemplate.update("delete from account_tbl where user_id = 'U100001'");
        accountJdbcTemplate.update("insert into account_tbl(user_id, money) values ('U100001', 999)");

        new ApplicationKeeper(accountContext).keep();
    }
}

首先通过 ClassPathXmlApplicationContext 读取 dubbo-account-service.xml 这个 spring 配置文件并启动 spring 容器环境,并通过 spring 的 jdbc template 对账户表的数据进行初始化。
dubbo-account-service.xml 配置文件中进行了各类 bean 的配置,包括 dubbo 与 spring 结合时的标准配置:

    <bean id="accountDataSourceProxy" class="io.seata.rm.datasource.DataSourceProxy">
        <constructor-arg ref="accountDataSource" />
    </bean>

    <bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
        <property name="dataSource" ref="accountDataSourceProxy" />
    </bean>

    <dubbo:application name="dubbo-demo-account-service" />
    <dubbo:registry address="zookeeper://localhost:2181" />
    <dubbo:protocol name="dubbo" port="20881" />
    <dubbo:service interface="io.seata.samples.dubbo.service.AccountService" ref="service" timeout="10000"/>

    <bean id="service" class="io.seata.samples.dubbo.service.impl.AccountServiceImpl">
        <property name="jdbcTemplate" ref="jdbcTemplate"/>
    </bean>

    <bean class="io.seata.spring.annotation.GlobalTransactionScanner">
        <constructor-arg value="dubbo-demo-account-service"/>
        <constructor-arg value="my_test_tx_group"/>
    </bean>

这份配置里主要有两个需要引起注意的关键点

  1. jdbcTemplate 这个 bean 所依赖的数据源 bean,是一个类名为 io.seata.rm.datasource.DataSourceProxy 的数据源类,通过它的名字可以很明显地看出这是一个代理模式的应用,因为 seata 为完成全局事务的逻辑,需要在普通的 sql 操作前后添加一些逻辑,比如说 sql 执行前对 sql 进行语法解析,生成前置镜像,sql 执行后生成后置镜像,通过代理的方式,可以方便地对 connection,statement 等进行代理包装,在调用的时候进行拦截,加入自己的逻辑。
  2. 配置文件中还有一个 io.seata.spring.annotation.GlobalTransactionScanner 类型的 bean,这个 bean 是支撑 seata 能在 spring 环境中通过注解的方式来划定事务边界的基础。在 spring 容器启动时,会扫描 @GlobalTransactional 注解是否存在,这个注解标识了全局事务的开始和结束,也就是我们常说的“事务的边界”

业务逻辑

业务逻辑的具体详情在 BusinessServiceImpl 类中可以看到:

    @Override
    @GlobalTransactional(timeoutMills = 300000, name = "dubbo-demo-tx")
    public void purchase(String userId, String commodityCode, int orderCount) {
        LOGGER.info("purchase begin ... xid: " + RootContext.getXID());
        storageService.deduct(commodityCode, orderCount);
        orderService.create(userId, commodityCode, orderCount);
        // throw new RuntimeException("xxx");
    }

先调用存储服务,减少库存,然后调用订单服务,新建订单。这两个动作属于一个整体的事务,任何一个动作失败,都需要撤销所有的操作。
这个方法也有两个需要注意的点:

  1. 该方法上声明了 @GlobalTransactional(timeoutMills = 300000, name = "dubbo-demo-tx") 这样的注解,用于让上文提到的 GlobalTransactionScanner 扫描的时候发现这是一个全局事务。
  2. 方法的最后有一行代码抛出了 RuntimeException,这主要是为了模仿全局事务的失败,并让 seata 走全局事务回滚逻辑。

事务扫描与边界定义

上文提到的 GlobalTransactionScanner 类,会在 spring 容器启动的时候,也被初始化。
在它的 afterPropertiesSet 方法被调用时,会触发 seata client 的初始化

    @Override
    public void afterPropertiesSet() {
        if (disableGlobalTransaction) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Global transaction is disabled.");
            }
            return;
        }
        initClient();

    }

关于 seata client 的初始化的细节,可以看看我写的另外一篇文章《阿里开源分布式事务组件 seata :seata client 通信层解析》
初始化客户端做的事情主要是建立与 seata server 的连接,并注册 TM 和 RM。接下来,在 wrapIfNecessary 方法里,实现对注解的扫描,并对添加了注解的方法添加 interceptor。
这篇文章里我们暂时不讨论 TCC 模式,只讨论 AT 模式,也暂不讨论全局事务锁 GlobalLock 的实现,先忽略这些有关的逻辑,只关注事务处理逻辑。

                    Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
                    Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
                    if (!existsAnnotation(new Class[] {serviceInterface})
                        && !existsAnnotation(interfacesIfJdk)) {
                        return bean;
                    }
                    if (interceptor == null) {
                        interceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                    }

在这里,interceptor 的实现是 GlobalTransactionalInterceptor,也就是说,以上文的案例为例子,当 BusinessServiceImpl 的 purchase 方法被调用的时候,实际上这个方法会被拦截器拦截,执行拦截器里的逻辑:

    @Override
    public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
        Class<?> targetClass = (methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null);
        Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
        final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);

        final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class);
        final GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class);
        if (globalTransactionalAnnotation != null) {
            return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
        } else if (globalLockAnnotation != null) {
            return handleGlobalLock(methodInvocation);
        } else {
            return methodInvocation.proceed();
        }
    }

    private Object handleGlobalTransaction(final MethodInvocation methodInvocation,
                                           final GlobalTransactional globalTrxAnno) throws Throwable {
        try {
            return transactionalTemplate.execute(new TransactionalExecutor() {
                @Override
                public Object execute() throws Throwable {
                    return methodInvocation.proceed();
                }

                public String name() {
                    String name = globalTrxAnno.name();
                    if (!StringUtils.isNullOrEmpty(name)) {
                        return name;
                    }
                    return formatMethod(methodInvocation.getMethod());
                }

                @Override
                public TransactionInfo getTransactionInfo() {
                    TransactionInfo transactionInfo = new TransactionInfo();
                    transactionInfo.setTimeOut(globalTrxAnno.timeoutMills());
                    transactionInfo.setName(name());
                    Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
                    for (Class<?> rbRule : globalTrxAnno.rollbackFor()) {
                        rollbackRules.add(new RollbackRule(rbRule));
                    }
                    for (String rbRule : globalTrxAnno.rollbackForClassName()) {
                        rollbackRules.add(new RollbackRule(rbRule));
                    }
                    for (Class<?> rbRule : globalTrxAnno.noRollbackFor()) {
                        rollbackRules.add(new NoRollbackRule(rbRule));
                    }
                    for (String rbRule : globalTrxAnno.noRollbackForClassName()) {
                        rollbackRules.add(new NoRollbackRule(rbRule));
                    }
                    transactionInfo.setRollbackRules(rollbackRules);
                    return transactionInfo;
                }
            });
        } catch (TransactionalExecutor.ExecutionException e) {
            TransactionalExecutor.Code code = e.getCode();
            switch (code) {
                case RollbackDone:
                    throw e.getOriginalException();
                case BeginFailure:
                    failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                case CommitFailure:
                    failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                case RollbackFailure:
                    failureHandler.onRollbackFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                default:
                    throw new ShouldNeverHappenException("Unknown TransactionalExecutor.Code: " + code);
            }
        }
    }

在执行 handleGlobalTransaction 方法时,实际上采用模板模式,委托给了 TransactionalTemplate 类去执行标准的事务处理流程。如下所示:

   /**
     * Execute object.
     *
     * @param business the business
     * @return the object
     * @throws TransactionalExecutor.ExecutionException the execution exception
     */
    public Object execute(TransactionalExecutor business) throws Throwable {
        // 1. get or create a transaction
        GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();

        // 1.1 get transactionInfo
        TransactionInfo txInfo = business.getTransactionInfo();
        if (txInfo == null) {
            throw new ShouldNeverHappenException("transactionInfo does not exist");
        }
        try {

            // 2. begin transaction
            beginTransaction(txInfo, tx);

            Object rs = null;
            try {

                // Do Your Business
                rs = business.execute();

            } catch (Throwable ex) {

                // 3.the needed business exception to rollback.
                completeTransactionAfterThrowing(txInfo,tx,ex);
                throw ex;
            }

            // 4. everything is fine, commit.
            commitTransaction(tx);

            return rs;
        } finally {
            //5. clear
            triggerAfterCompletion();
            cleanUp();
        }
    }

事务处理逻辑实际上是一种模板,将事务相关的处理逻辑放在 try 块里,发现异常后执行回滚,正常执行则执行提交。
在这里有个需要注意的地方是,seata 不把提交这个动作放在 try 块里,因为在 seata 里,全局事务的提交实际上是可以异步执行的。
因为全局事务如果进行到提交这一阶段,那么意味着各个分支事务已经执行过本地提交,全局事务的提交阶段仅仅是删除 undo_log 里的记录,这个记录删除或者不删除,实际上不会改变全局事务已经正常完成的事实。所以它可以用程序异步去做,或者以人工介入的方式去做,所以 seata 认为,全局事务提交失败,不需要执行回滚流程。

扫一扫关注我的微信公众号

查看原文

赞 0 收藏 0 评论 0

beanlam 发布了文章 · 2019-09-20

阿里开源分布式事务组件 seata :seata client 通信层解析

前言

之前在《阿里开源分布式事务组件 seata :seata server 通信层解析》这篇文章中,站在 server 端的角度简单分析过 seata 的网络通信模块。
上篇文章中提到了一些概念,例如说:异步转同步的 Future 机制,防洪机制,以及消息队列机制等,并非只是服务端才具有的功能特性,实际上对于客户端来说也同样适用,它们是整个通用的通信机制。
这篇文章讲讲 client 角度的通信细节,以及在上篇文章中未提及的一些关键点,算是对 seata 整个 client-server 通信模块的一个总结。
已经在上篇文章提及的概念在这篇文章中就不再赘述。

客户端角色

在 seata 中,它对客户端是有角色区分的,它把客户端分为两种角色,分别是 TM 和 RM。
在 seata 的设计中, TM 是全局事务的发起者, RM 则是全局事务的参与者。
这两种角色在进行启动的时候,都要向 server 进行注册,这个注册也可以等同于一个认证的作用,只有已经注册的客户端才可以进行其它操作。
不过注册的意义不仅仅在于认证,特别是对于 RM 来说,很关键的一点是要向 server 注册它的 resourceId,这一点十分重要,它关系到 RM 的高可用问题。
如果某个 RM 停机了,而全局事务还未进行完,这个时候就可以采用委托给其它 RM 的方式,保证全局事务正常结束,这里一个关键的地方,就是 resourceId。
这篇文章暂时不在这里展开,后面再单独写一写这里的设计。

《阿里开源分布式事务组件 seata :seata server 通信层解析》这篇文章中,我们已经看到,无论是客户端还是服务端的实现,它们都以 AbstractRpcRemoting 作为抽象骨架。
AbstractRpcRemoting 这个类包含了客户端和服务端的公共逻辑,包括

  1. 消息的发送和接收逻辑
  2. future 机制
  3. 防洪机制

连接池

这些在前面的文章里已经有提到过了。
客户端的实现细节主要是在 AbstractRpcRemotingClient 类里,它继承自 AbstractRpcRemoting 类。
因为使用的是 netty,所以必然会有 Bootstrap 逻辑,seata 把这一块的逻辑单独放到 RpcClientBootstrap 这个类里去,然后只对外提供一个 getNewChannel 方法。
这样来看,创建连接的细节已经被良好的封装,并且 RpcClientBootstrap 的职责也非常明确,那就是建立新连接。
不过大多数客户端都会引入连接池的设计,以达到对连接的重用,避免频繁创建和销毁连接带来的额外开销。seata 也用了连接池管理机制。
不过 seata 的连接池管理是借助 apache 的 common pools 来实现的,common pools 是一个对象池,这里把一条连接 channel 当作一个对象,就变成了连接池了。
借助 common pools 实现对象池,需要实现 KeyedPoolableObjectFactory 接口对应的相关方法,关键是以下三个方法:

V makeObject(K key) throws Exception; //方法定义了新对象是如何创建的。
void destroyObject(K key, V obj) throws Exception; //方法定义了对象应该怎么样去销毁。
boolean validateObject(K key, V obj);  //方法定义了如何检测对象的有效性。

另外还要指定对象的类型,以及用于存取对象的 key 的类型。

NettyPoolableFactory 这个类便实现了 KeyedPoolableObjectFactory 接口,并且定义了对象的类型是 Channel,而 key 是 NettyPoolKey。
makeObject 的实现主要分解为几个步骤,第一个是建立新连接:

  InetSocketAddress address = NetUtil.toInetSocketAddress(key.getAddress());
  if (LOGGER.isInfoEnabled()) {
   LOGGER.info("NettyPool create channel to " + key);
  }
  Channel tmpChannel = clientBootstrap.getNewChannel(address);
  long start = System.currentTimeMillis();
  Object response;
  Channel channelToServer = null;
  if (null == key.getMessage()) {
   throw new FrameworkException(
     "register msg is null, role:" + key.getTransactionRole().name());
  }

接下来,对连接进行注册,如果注册失败,则会抛出异常,并关闭刚刚建立的连接,这意味着一个连接建立以后,必须注册成功才能使用。如下代码所示:

  try {
   response = rpcRemotingClient.sendAsyncRequestWithResponse(tmpChannel, key.getMessage());
   if (!isResponseSuccess(response, key.getTransactionRole())) {
    rpcRemotingClient.onRegisterMsgFail(key.getAddress(), tmpChannel, response, key.getMessage());
   } else {
    channelToServer = tmpChannel;
    rpcRemotingClient.onRegisterMsgSuccess(key.getAddress(), tmpChannel, response,
      key.getMessage());
   }
  } catch (Exception exx) {
   if (tmpChannel != null) {
    tmpChannel.close();
   }
   throw new FrameworkException(
     "register error,role:" + key.getTransactionRole().name() + ",err:" + exx.getMessage());
  }

在上面我们也可以看到,注册消息是通过 NettyPoolKey 携带进来的。

销毁对象池的对象,在这里,其实就是关闭连接,而验证对象的有效性,其实就是验证连接的有效性。这两个方法的实现比较简单:

 @Override
 public void destroyObject(NettyPoolKey key, Channel channel) throws Exception {

  if (null != channel) {
   if (LOGGER.isInfoEnabled()) {
    LOGGER.info("will destroy channel:" + channel);
   }
   channel.disconnect();
   channel.close();
  }
 }


 @Override
 public boolean validateObject(NettyPoolKey key, Channel obj) {
  if (null != obj && obj.isActive()) {
   return true;
  }
  if (LOGGER.isInfoEnabled()) {
   LOGGER.info("channel valid false,channel:" + obj);
  }
  return false;
 }

连接管理

NettyClientChannelManager 这个类集成连接池机制和建立新连接的实现以后,成为了一个连接管理器。
当根据内部逻辑,需要向服务端发送信息时,都会通过 NettyClientChannelManager 获取连接。
虽然 NettyClientChannelManager 已经集成了一个连接池,不过它还是内部维护了 channel 的缓存

private final ConcurrentMap<String, Channel> channels = new ConcurrentHashMap<>();

当外部调用它来获取连接时,它先从自己的缓存里找,如果找到了,先验证channel 是否有效性,如果有效,直接返回这个 channel。
如果 channel 已经无效,或者没有缓存,那么就会委托连接池去建立新连接。这些细节在它的 acquireChannel 方法里:

    /**
     * Acquire netty client channel connected to remote server.
     *
     * @param serverAddress server address
     * @return netty channel
     */
    Channel acquireChannel(String serverAddress) {
        Channel channelToServer = channels.get(serverAddress);
        if (channelToServer != null) {
            channelToServer = getExistAliveChannel(channelToServer, serverAddress);
            if (null != channelToServer) {
                return channelToServer;
            }
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("will connect to " + serverAddress);
        }
        channelLocks.putIfAbsent(serverAddress, new Object());
        synchronized (channelLocks.get(serverAddress)) {
            return doConnect(serverAddress);
        }
    }

这里也可以看到,如果需要建立新连接,是需要加锁做同步的,NettyClientChannelManager 内部也维护了一串锁。
下面看看建立连接的细节,前面我们说到 注册消息是通过 NettyPoolKey 携带进来的。
在建立连接时,需要生成 NettyPoolKey, seata 在这里应用了 java8 的一个新的特性 Function,即函数式接口。
也就是说,一个 NettyPoolKey 如何生成,由外部传入一个 函数 来决定的。并不需要 NettyClientChannelManager 本身去关心,这个生成方式相关的变量就是

    private Function<String, NettyPoolKey> poolKeyFunction;

它作为 NettyClientChannelManager 的构造函数的一个参数被传进来。
对于 RM Client 来说,它的 NettyPoolKey 的生成方式如下所示:

    @Override
    protected Function<String, NettyPoolKey> getPoolKeyFunction() {
        return (serverAddress) -> {
            String resourceIds = customerKeys == null ? getMergedResourceKeys() : customerKeys;
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("RM will register :" + resourceIds);
            }
            RegisterRMRequest message = new RegisterRMRequest(applicationId, transactionServiceGroup);
            message.setResourceIds(resourceIds);
            return new NettyPoolKey(NettyPoolKey.TransactionRole.RMROLE, serverAddress, message);
        };
    }

它同时也决定了 NettyPoolKey 里面携带的 message 到底是 RegisterRMRequest 还是 RegisterTMRequest。

同理,对于 TM Client 来说,它也有它自己的 NettyPoolKey 生成规则:

    @Override
    protected Function<String, NettyPoolKey> getPoolKeyFunction() {
        return (severAddress) -> {
            RegisterTMRequest message = new RegisterTMRequest(applicationId, transactionServiceGroup);
            return new NettyPoolKey(NettyPoolKey.TransactionRole.TMROLE, severAddress, message);
        };
    }

函数式编程的特点突然给人一种 IoC 的感觉。

那么回到 NettyClientChannelManager 建立连接的逻辑,上面我们已经解释过一些函数式和 NettyPoolKey 的背景了,直接看它的 doConnect 方法:

    private Channel doConnect(String serverAddress) {
        Channel channelToServer = channels.get(serverAddress);
        if (channelToServer != null && channelToServer.isActive()) {
            return channelToServer;
        }
        Channel channelFromPool;
        try {
            NettyPoolKey currentPoolKey = poolKeyFunction.apply(serverAddress);
            NettyPoolKey previousPoolKey = poolKeyMap.putIfAbsent(serverAddress, currentPoolKey);
            if (null != previousPoolKey && previousPoolKey.getMessage() instanceof RegisterRMRequest) {
                RegisterRMRequest registerRMRequest = (RegisterRMRequest) currentPoolKey.getMessage();
                ((RegisterRMRequest) previousPoolKey.getMessage()).setResourceIds(registerRMRequest.getResourceIds());
            }
            channelFromPool = nettyClientKeyPool.borrowObject(poolKeyMap.get(serverAddress));
            channels.put(serverAddress, channelFromPool);
        } catch (Exception exx) {
            LOGGER.error(FrameworkErrorCode.RegisterRM.getErrCode(), "register RM failed.", exx);
            throw new FrameworkException("can not register RM,err:" + exx.getMessage());
        }
        return channelFromPool;
    }

可以看到,其实逻辑没什么特别的,就是去 对象池里新生成个对象,创建一个新的连接而已。不过这里也有可以注意的地方:
NettyClientChannelManager 不仅缓存了连接,维护了锁,它也缓存 NettyPoolKey,如果它发现要新建连接,并且 NettyPoolKey 在之前就缓存过的话,它会看看是否是 RM Client 建立的连接,如果是,那么需要更新一下 resource ids。前面我们有说到,RM 注册时,注册 resource ids 是非常重要的。

另外,NettyClientChannelManager 里还有一个 reconnect 方法,它是在 AbstractRpcRemotingClient 初始化的时候,作为一个定时任务运行的

    @Override
    public void init() {
        clientBootstrap.start();
        timerExecutor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                clientChannelManager.reconnect(getTransactionServiceGroup());
            }
        }, SCHEDULE_INTERVAL_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.SECONDS);
        mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
            MAX_MERGE_SEND_THREAD,
            KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(),
            new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
        mergeSendExecutorService.submit(new MergedSendRunnable());
        super.init();

它的作用,实际上是去注册中心查找当前服务组下有哪些可用的 server,定时检查与这些 server 之间的连接是否可用,如果不可用,则立马新建连接并缓存。

消息打包

在这个里我们也可以看到 mergeSendExecutorService 这个线程池,之前的文章《阿里开源分布式事务组件 seata :seata server 通信层解析》讲过这里,实际上它是消息队列机制,用来合并消息,再进行发送。我们在之前的文章里也提示过,seata server端的实现里,并没有定时去清除 basketMap,实际上是因为 server 端在通信时,根本没有用到这个 basketMap。
而客户端则有可能会用到,所以这里专门有一个线程池,去清空 basketMap 里的打包消息。

对于 seata 的通信模式和细节大体上就这些,本文到次结束。

扫一扫关注我的微信公众号

查看原文

赞 1 收藏 1 评论 0

beanlam 发布了文章 · 2019-09-19

阿里开源分布式事务组件 seata : 配置机制简析

seata 的客户端代码和服务端代码逻辑里,读取配置时统一采用的以下这种 API

ConfigurationFactory.getInstance().getString()

seata 目前(0.8.0)支持以下几种配置方式

  1. 本地文件方式
  2. zookeeper
  3. nacos
  4. apollo
  5. consul
  6. etcd

在分析 seata 的配置解析细节之前,先看看 seata 对于配置解析机制的设计
具体来说,seata 的配置项的命名风格都是类似于 computer.apple.macbookpro 这种的文本扁平化风格。
它本质上还是一个结构形的配置方式,即每个具体配置项都有父节点,举个例子:

computer.apple {
    macbookpro = 12000
    macbookair = 8000
}

这种配置方式与普通的 xml 配置文件在结构上没有什么大的区别,但与 xml 相比 还是有不少的好处和优点。

  1. 简洁明了,xml 本质上还是个标记型语言,引入了许多不必要的复杂标记,间接增加了解析成本
  2. 解析阶段,xml 定位到某个配置项的逻辑更加繁琐
  3. 配置更新时如果需要更新文件,xml 的文件的更新动作不够轻量级,如果依赖一些第三方实现,还会造成代码入侵,可扩展性差。

相比之下,目前主流的配置中心例如 zookeeper 或者 apollo,这些软件设计之初对数据结构的选型就与 computer.apple.macbookpro 这样的配置形态很相称。
例如,zookeeper 的数据结构是类似于文件系统的树状风格,所以 zookeeper 之前是很适合拿来做公共配置中心的,直到后面更优秀的配置中心出现,zookeeper 才慢慢淡出配置界,毕竟 zookeeper 它更擅长做的事情是分布式一致性的协调。
正是 seata 采用了这种配置风格,所以 seata 在配置中心的支持这一块,就很方便地与当前主流的配置中心做集成。

试想一下,如果要把一个 xml 配置文件存到 zookeeper 上做全局管理。大概有这么两种方式吧:

  1. 把整个 xml 文本存到一个 znode 上
  2. 把 xml 的结构解析称树状结构,再一个一个对应地到 zookeeper 上创建节点

第一种方式,显然很 low 哦。干脆存到数据库算了。
第二种方式,会带来额外的解析成本,并且不容易做配置变更这样的逻辑,因为一个配置项的变更,意味着要重新在内存里生成整个 xml 文本,然后再写进本地配置文件里面。

下面说一说 seata 的配置机制,因为 seata 支持上述这么多主流的配置中心,但是实际上使用的时候,必须也只能用一个。
因此,一个 seata 相关的进程启动的时候,必然要从本地某个地方直到要用什么配置中心。
而实际上,seata 是先读取本地的一个配置文件 registry.conf,再决定要用什么样的配置方式的。
这个逻辑在我们上面提到的 API ConfigurationFactory.getInstance() 的源码里可以看到具体的细节。

下面是 buildConfiguration 方法:

    private static Configuration buildConfiguration() {
        ConfigType configType = null;
        String configTypeName = null;
        try {
            // 这里读取的是本地 registry.conf 配置文件,获取配置中心的类型
            configTypeName = CURRENT_FILE_INSTANCE.getConfig(
                ConfigurationKeys.FILE_ROOT_CONFIG + ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR
                    + ConfigurationKeys.FILE_ROOT_TYPE);  
            configType = ConfigType.getType(configTypeName);
        } catch (Exception e) {
            throw new NotSupportYetException("not support register type: " + configTypeName, e);
        }
        if (ConfigType.File == configType) {
            String pathDataId = ConfigurationKeys.FILE_ROOT_CONFIG + ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR
                + FILE_TYPE + ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR
                + NAME_KEY;
            String name = CURRENT_FILE_INSTANCE.getConfig(pathDataId);
            return new FileConfiguration(name);
        } else {
            return EnhancedServiceLoader.load(ConfigurationProvider.class, Objects.requireNonNull(configType).name())
                .provide();
        }
    }

可以看到,seata 先读取本地的一个配置文件,默认是读取 registry.conf 文件,再从这个文件里面读取具体的配置类型。
那么 registry.conf 里面关于配置中心的配置是怎么样的呢,大概像这样:

config {
  # file、nacos 、apollo、zk、consul、etcd3
  type = "file"

  nacos {
    serverAddr = "localhost"
    namespace = "public"
    cluster = "default"
  }
  consul {
    serverAddr = "127.0.0.1:8500"
  }
  apollo {
    app.id = "seata-server"
    apollo.meta = "http://192.168.1.204:8801"
  }
  zk {
    serverAddr = "127.0.0.1:2181"
    session.timeout = 6000
    connect.timeout = 2000
  }
  etcd3 {
    serverAddr = "http://localhost:2379"
  }
  file {
    name = "file.conf"
  }
}

buildConfiguration() 方法会先读取 type,再根据 type 继续读取这个 type 所对应的具体配置。
比如说,type 如果是 file 类型,那么就会读取 config.file.name,获取到本地配置文件的名字,然后再去读取具体的配置文件。
如果 type 是 zk,那么就会读取 config.zk.* ,获取到 zk 的地址信息,然后通过请求 zk 的方式获取存放在 zk 上的信息。

读取配置时,主要是通过 Configuration 接口去获取,Configuration 接口有许多不同的实现,例如 FileConfiguration 或者 ZooKeeperConfiguration,分别代表不同的配置中心。

通过 buildConfiguration() 代码的逻辑可以看出,除了本地文件配置方式,其它配置方式都是采用 SPI 的服务发现机制进行扩展的。
不过这个 SPI 并不是直接用的 JDK 本身自带的 SPI 机制,而是 seata 自己实现的一种比 JDK SPI 功能更多的 SPI 机制,不过两者的主要思想是一样的。
阿里系的很多中间件软件都很喜欢用 SPI 机制,比如说 Dubbo,也是疯狂地 SPI。
这种自己实现的 SPI 机制的好处在于灵活性比较强,比如说可以自定义注解来标识实现类的优先级。

Configuration 接口定义的方法基本上是很基本的 getInt 或者 getString 这类方法,例如:

    /**
     * Gets int.
     *
     * @param dataId the data id
     * @param defaultValue the default value
     * @param timeoutMills the timeout mills
     * @return the int
     */
    int getInt(String dataId, int defaultValue, long timeoutMills);

    /**
     * Gets int.
     *
     * @param dataId the data id
     * @param defaultValue the default value
     * @return the int
     */
    int getInt(String dataId, int defaultValue);

这里有个需要注意的地方是,这些方法都有两个不同的参数列表,多了一个 timeoutMillis。
因为读取配置已经不能假设在本地文件读取来,而是有可能通过网络去某个注册中心读取,因为需要经过网络,那么必然会有读取不到的情况,这个参数是用来限制配置读取的超时时间。

接下来以 FileConfiguration 这个实现来看看 seata 的一些配置解析细节。
首先,本地配置文件的读取, seata 引用的是 typesafe 公司的一个解析库叫做 Config,这个解析库支持的配置风格,就是包括上文展示的 registry.conf 的这种类似于 json 的风格。
其官方介绍是这样的:纯Java写成、零外部依赖、代码精简、功能灵活、API友好。支持Java properties、JSON、JSON超集格式HOCON以及环境变量。
不过,它有一个重要的功能未实现,那就是配置文件的写入,所以目前 seata 也没有配置变更后更新本地配置文件的功能。
可以看一下 ConfigOperateRunnable 的 run 方法:

        @Override
        public void run() {
            if (null != configFuture) {
                if (configFuture.isTimeout()) {
                    setFailResult(configFuture);
                    return;
                }
                try {
                    if (configFuture.getOperation() == ConfigOperation.GET) {
                        String result = fileConfig.getString(configFuture.getDataId());
                        configFuture.setResult(result);
                    } else if (configFuture.getOperation() == ConfigOperation.PUT) {
                        //todo
                        configFuture.setResult(Boolean.TRUE);
                    } else if (configFuture.getOperation() == ConfigOperation.PUTIFABSENT) {
                        //todo
                        configFuture.setResult(Boolean.TRUE);
                    } else if (configFuture.getOperation() == ConfigOperation.REMOVE) {
                        //todo
                        configFuture.setResult(Boolean.TRUE);
                    }
                } catch (Exception e) {
                    setFailResult(configFuture);
                    LOGGER.warn("Could not found property {}, try to use default value instead.",
                        configFuture.getDataId());
                }
            }
        }

赫然写着 TODO。

前面提到来读取配置有超时,那 seata 读取的时候怎么样做到检测超时呢,从下面这个方法入手:

    @Override
    public String getConfig(String dataId, String defaultValue, long timeoutMills) {
        String value;
        if ((value = getConfigFromSysPro(dataId)) != null) {
            return value;
        }
        ConfigFuture configFuture = new ConfigFuture(dataId, defaultValue, ConfigOperation.GET, timeoutMills);
        configOperateExecutor.submit(new ConfigOperateRunnable(configFuture));
        return (String)configFuture.get();
    }

每次读取配置时,都会读取封装成一个任务,扔给一个指定的线程池,再通过 ConfigFuture 去获取结果,ConfigFuture 是支持超时时间的设置的,只不过这里 FileConfiguration 没有在乎这个超时时间,毕竟超时是用来针对需要通过网络访问的第三方配置中心的。
这里用到了一个“异步转同步”的思想,也比较好地运用了 Future 机制。
在我的另一篇文章中有专门讲到了 seata 的这种机制《阿里开源分布式事务组件 seata :seata server 通信层解析》

此外,Configuration 接口还定义了配置变更监听器相关的接口,允许监听某个配置项的变更。不过这块实现还不是很完整,比如说 zookeeper 的监听器目前还没有任何一种实现。

本文完。
扫一扫关注我的微信公众号

查看原文

赞 0 收藏 0 评论 3

beanlam 关注了用户 · 2019-09-02

良许 @liangxu_5d63396040bb5

公众号:良许Linux

关注 1721

beanlam 发布了文章 · 2019-05-21

阿里开源分布式事务组件 seata :seata server 通信层解析

RPC ?

seata client 和 seata server 间是需要通过网络通信来传递信息的,client 发送请求消息给 server,server 根据实际的处理逻辑,可能会给 client 发送相应的响应消息,或者不响应任何消息。在 seata 中,客户端和服务端的通信实现,被抽象成来公共的模块,它的 package 位于 io.seata.core.rpc 中。

这个包名叫 rpc,这个包下的很多类名也有 rpc 相关的字眼,而实际上在我看来,这个通信框架并不是一个常规意义的 rpc 框架,如果硬要揪书本知识,那么 rpc 的解释如下:

远程过程调用(英语:Remote Procedure Call,缩写为 RPC)是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程。如果涉及的软件采用面向对象编程,那么远程过程调用亦可称作远程调用或远程方法调用。

在以 dubbo 为代表的微服务时代下,dubbo 常规意义上我们都称之为 rpc 框架,rpc 的理论原则是:程序员无需额外地为这个交互作用编程。那么对于像 dubbo 这样的 rpc 实现,它能让 client 像调用本地代码 api 一样,来调用远程 server 上的某个 method。
在 client 这一层直接面向 interface 编程,通过动态代理的方式,对上层屏蔽掉通信细节,在底层,将方法调用,通过序列化方式,封装成一个二进制数据串发送给 server,server 层解析该消息,通过反射的方式,将 interface 对应的 implemention 执行起来,将执行结果,扁平化成一个二进制数据串,回送给 client,client 收到数据后,拼装成 interface api 所定义的返回值类型的一个实例,作为方法调用的返回值。整个底层的细节,应用层面并不需要了解,应用层只需要以 interface.method 的方式,就像代码在本地执行一样,就能把远端 interface_implemention.method 给调用起来。
rpc

而 seata 的 rpc 框架上,实际上仅仅是一个普通的基于 netty 的网络通信框架,client 与 server 之间通过发送 request 和 response 来达到相互通信的目的,在 seata 中的每个 request 和 response 类,都实现了如何把自己序列化的逻辑。
各种消息类型,都实现了 io.seata.core.protocol.MessageCodec 接口

public interface MessageCodec {
    /**
     * Gets type code.
     *
     * @return the type code
     */
    short getTypeCode();

    /**
     * Encode byte [ ].
     *
     * @return the byte [ ]
     */
    byte[] encode();

    /**
     * Decode boolean.
     *
     * @param in the in
     * @return the boolean
     */
    boolean decode(ByteBuf in);
}

MessageCodec

io.seata.core.protocol.GlobalBeginRequest 为例,它都 decode 和 encode 实现如下所示:

@Override
public byte[] encode() {
    ByteBuffer byteBuffer = ByteBuffer.allocate(256);
    byteBuffer.putInt(timeout);

    if (this.transactionName != null) {
        byte[] bs = transactionName.getBytes(UTF8);
        byteBuffer.putShort((short)bs.length);
        if (bs.length > 0) {
            byteBuffer.put(bs);
        }
    } else {
        byteBuffer.putShort((short)0);
    }

    byteBuffer.flip();
    byte[] content = new byte[byteBuffer.limit()];
    byteBuffer.get(content);
    return content;
}

@Override
public void decode(ByteBuffer byteBuffer) {
    this.timeout = byteBuffer.getInt();

    short len = byteBuffer.getShort();
    if (len > 0) {
        byte[] bs = new byte[len];
        byteBuffer.get(bs);
        this.setTransactionName(new String(bs, UTF8));
    }
}

这意味着,发送方先对 message 做 encode 动作形成字节数组,将字节数组发往接收方,接收方收到字节数组后,对字节数组先判断 message type,再用对应的 message 类型对字节数组做 decode 动作。

类的组织形式

从 seata server 的入口类 io.seata.server.Server 分析,main 方法如下所示:

/**
 * The entry point of application.
 *
 * @param args the input arguments
 * @throws IOException the io exception
 */
public static void main(String[] args) throws IOException {
    RpcServer rpcServer = new RpcServer(WORKING_THREADS);

    int port = SERVER_DEFAULT_PORT;
    //server port
    if (args.length > 0) {
        try {
            port = Integer.parseInt(args[0]);
        } catch (NumberFormatException e) {
            System.err.println("Usage: sh services-server.sh $LISTEN_PORT $PATH_FOR_PERSISTENT_DATA");
            System.exit(0);
        }
    }
    rpcServer.setListenPort(port);

    //log store mode : file、db
    String storeMode = null;
    if (args.length > 1) {
        storeMode = args[1];
    }

    UUIDGenerator.init(1);
    SessionHolder.init(storeMode);

    DefaultCoordinator coordinator = new DefaultCoordinator(rpcServer);
    coordinator.init();
    rpcServer.setHandler(coordinator);
    // register ShutdownHook
    ShutdownHook.getInstance().addDisposable(coordinator);

    if (args.length > 2) {
        XID.setIpAddress(args[2]);
    } else {
        XID.setIpAddress(NetUtil.getLocalIp());
    }
    XID.setPort(rpcServer.getListenPort());

    rpcServer.init();

    System.exit(0);
}

可以看到 seata server 使用一个 RpcServer 类来启动它的服务监听端口,这个端口用来接收 seata client 的消息,RpcServer 这个类是通信层的实现分析的入口。
在这里,SessionHolder 用来做全局事务树的管理,DefaultCoordinator 用来处理事务执行逻辑,而 RpcServer 是这两者可以正常运行的基础,这篇文章的重点在于剖析 RpcServer 的实现,进而延伸到 seata 整个通信框架的细节。
如果先从 RpcServer 的类继承图看的话,那么我们能发现一些与常规思维不太一样的地方,类继承图如下:

继承体系

褐色部分是 netty 的类,灰色部分是 seata 的类。
在一般常规的思维中,依赖 netty 做一个 server,大致的思路是:

  1. 定义一个 xxx server 类
  2. 在这个类中设置初始化 netty bootstrap,eventloop,以及设置相应的 ChannelHandler

在这种思维下,很容易想到,server 与 ChannelHandler 之间的关系应该是一个“组合”的关系,即在我们构建 server 的过程中,应该把 ChannelHandler 当作参数传递给 server,成为 server 类的成员变量。
没错,这是我们一般情况下的思维。不过 seata 在这方面却不那么“常规”,从上面的类继承图中可以看到,从 RpcServer 这个类开始向上追溯,发现它其实是 ChannelDuplexHandler 的一个子类或者实例。这种逻辑让人一时很困惑,一个问题在我脑海里浮现:“当我启动一个 RpcServer 的时候,我是真的在启动一个 server 吗?看起来我好像在启动一个 ChannelHandler,可是 ChannelHandler 怎么谈得上‘启动’呢?”

异步转同步的 Future 机制

首先分析 AbstractRpcRemoting 这个类,它直接继承自 ChannelDuplexHandler 类,而 ChannelDuplexHandler 是 netty 中 inbound handler 和 outbound handler 的结合体。
AbstractRpcRemoting 的 init 方法里,仅仅通过 Java 中的定时任务执行线程池启动了一个定时执行的任务:

/**
 * Init.
 */
public void init() {
    timerExecutor.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            List<MessageFuture> timeoutMessageFutures = new ArrayList<MessageFuture>(futures.size());

            for (MessageFuture future : futures.values()) {
                if (future.isTimeout()) {
                    timeoutMessageFutures.add(future);
                }
            }

            for (MessageFuture messageFuture : timeoutMessageFutures) {
                futures.remove(messageFuture.getRequestMessage().getId());
                messageFuture.setResultMessage(null);
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("timeout clear future : " + messageFuture.getRequestMessage().getBody());
                }
            }
            nowMills = System.currentTimeMillis();
        }
    }, TIMEOUT_CHECK_INTERNAL, TIMEOUT_CHECK_INTERNAL, TimeUnit.MILLISECONDS);
}

这个定时任务的逻辑也比较简单:扫描 ConcurrentHashMap<Long, MessageFuture> futures 这个成员变量里的 MessageFuture,如果这个 Future 超时了,就将 Future 的结果设置为 null。逻辑虽然简单,但这个功能涉及到了异步通信里一个很常见的功能,即异步转同步的功能。
在 netty 这种基于 NIO 的通信方式中,数据的发送,接收,全部是非阻塞的,因此判断一个动作完成与否,并不能像传统的 Java 同步代码一样,代码执行完了就认为相应的动作也真正完成了,例如,在 netty 中,如果通过 channel.write(); 方法往对端发送一个数据,这个方法执行完了,并不代表数据发送出去了,channel.write() 方法会返回一个 future,应用代码应该利用这个 future ,通过这个 future 可以知道数据到底发送出去了没有,也可以为这个 future 添加动作完成后的回调逻辑,也可以阻塞等待这个 future 所关联的动作执行完毕。
在 seata 中,存在着发送一个请求,并等待相应这样的使用场景,上层的 api 可能是这么定义的:
public Response request(Request request) {}
而基于 nio 的底层数据发送逻辑却是这样的:

1. send request message
2. 为业务的请求构建一个业务层面的 future 实例
3. 阻塞等待在这个 future 上
4. 当收到对应的 response message 后,唤醒上面的 future,阻塞等待在这个 future 上的线程继续执行
5. 拿到结果,request 方法结束

AbstractRpcRemoting 定义了几个数据发送相关的方法,分别是:

/**
 * Send async request with response object.
 *
 * @param address the address
 * @param channel the channel
 * @param msg     the msg
 * @return the object
 * @throws TimeoutException the timeout exception
 */
protected Object sendAsyncRequestWithResponse(String address, Channel channel, Object msg) throws TimeoutException;

/**
 * Send async request with response object.
 *
 * @param address the address
 * @param channel the channel
 * @param msg     the msg
 * @param timeout the timeout
 * @return the object
 * @throws TimeoutException the timeout exception
 */
protected Object sendAsyncRequestWithResponse(String address, Channel channel, Object msg, long timeout) throws
    TimeoutException;

/**
 * Send async request without response object.
 *
 * @param address the address
 * @param channel the channel
 * @param msg     the msg
 * @return the object
 * @throws TimeoutException the timeout exception
 */
protected Object sendAsyncRequestWithoutResponse(String address, Channel channel, Object msg) throws
    TimeoutException;

这几个方法就符合上面说到的发送一个请求,并等待相应这样的使用场景,上面这三个方法,其实都委托给了 sendAsyncRequest 来实现,这个方法的代码是这样子的:

private Object sendAsyncRequest(String address, Channel channel, Object msg, long timeout)
    throws TimeoutException {
    if (channel == null) {
        LOGGER.warn("sendAsyncRequestWithResponse nothing, caused by null channel.");
        return null;
    }
    final RpcMessage rpcMessage = new RpcMessage();
    rpcMessage.setId(RpcMessage.getNextMessageId());
    rpcMessage.setAsync(false);
    rpcMessage.setHeartbeat(false);
    rpcMessage.setRequest(true);
    rpcMessage.setBody(msg);

    final MessageFuture messageFuture = new MessageFuture();
    messageFuture.setRequestMessage(rpcMessage);
    messageFuture.setTimeout(timeout);
    futures.put(rpcMessage.getId(), messageFuture);

    if (address != null) {
        ConcurrentHashMap<String, BlockingQueue<RpcMessage>> map = basketMap;
        BlockingQueue<RpcMessage> basket = map.get(address);
        if (basket == null) {
            map.putIfAbsent(address, new LinkedBlockingQueue<RpcMessage>());
            basket = map.get(address);
        }
        basket.offer(rpcMessage);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("offer message: " + rpcMessage.getBody());
        }
        if (!isSending) {
            synchronized (mergeLock) {
                mergeLock.notifyAll();
            }
        }
    } else {
        ChannelFuture future;
        channelWriteableCheck(channel, msg);
        future = channel.writeAndFlush(rpcMessage);
        future.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                if (!future.isSuccess()) {
                    MessageFuture messageFuture = futures.remove(rpcMessage.getId());
                    if (messageFuture != null) {
                        messageFuture.setResultMessage(future.cause());
                    }
                    destroyChannel(future.channel());
                }
            }
        });
    }
    if (timeout > 0) {
        try {
            return messageFuture.get(timeout, TimeUnit.MILLISECONDS);
        } catch (Exception exx) {
            LOGGER.error("wait response error:" + exx.getMessage() + ",ip:" + address + ",request:" + msg);
            if (exx instanceof TimeoutException) {
                throw (TimeoutException)exx;
            } else {
                throw new RuntimeException(exx);
            }
        }
    } else {
        return null;
    }
}

先抛开方法的其它细节,比如说同步写还是异步写,以及发送频率控制。我们可以发现,这个方法其实从大角度来划分,就是如下的步骤:

  1. 构造请求 message
  2. 为这个请求 message 构造一个 message future
  3. 发送数据
  4. 阻塞等待在 message future

不过 AbstractRpcRemoting 也定义了方法用于仅发送消息,不接收响应的使用场景,如下所示:

/**
 * Send request.
 *
 * @param channel the channel
 * @param msg     the msg
 */
protected void sendRequest(Channel channel, Object msg) {
    RpcMessage rpcMessage = new RpcMessage();
    rpcMessage.setAsync(true);
    rpcMessage.setHeartbeat(msg instanceof HeartbeatMessage);
    rpcMessage.setRequest(true);
    rpcMessage.setBody(msg);
    rpcMessage.setId(RpcMessage.getNextMessageId());
    if (msg instanceof MergeMessage) {
        mergeMsgMap.put(rpcMessage.getId(), (MergeMessage)msg);
    }
    channelWriteableCheck(channel, msg);
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug("write message:" + rpcMessage.getBody() + ", channel:" + channel + ",active?"
            + channel.isActive() + ",writable?" + channel.isWritable() + ",isopen?" + channel.isOpen());
    }
    channel.writeAndFlush(rpcMessage);
}

/**
 * Send response.
 *
 * @param msgId   the msg id
 * @param channel the channel
 * @param msg     the msg
 */
protected void sendResponse(long msgId, Channel channel, Object msg) {
    RpcMessage rpcMessage = new RpcMessage();
    rpcMessage.setAsync(true);
    rpcMessage.setHeartbeat(msg instanceof HeartbeatMessage);
    rpcMessage.setRequest(false);
    rpcMessage.setBody(msg);
    rpcMessage.setId(msgId);
    channelWriteableCheck(channel, msg);
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug("send response:" + rpcMessage.getBody() + ",channel:" + channel);
    }
    channel.writeAndFlush(rpcMessage);
}

这样的场景就不需要引入 future 机制,直接调用 netty 的 api 把数据发送出去就完事了。
分析思路回到有 future 的场景,发送数据后,要在 future 上进行阻塞等待,即调用 get 方法,那 get 方法什么返回呢,我们上面说到 future 被唤醒的时候,我们先不讨论 future 的实现细节,一个 future 什么时候被唤醒呢,在这种 请求-响应 的模式下,显然是收到了响应的时候。所以我们需要查看一下 AbstractRpcRemoting 的 channelRead 方法

@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof RpcMessage) {
        final RpcMessage rpcMessage = (RpcMessage)msg;
        if (rpcMessage.isRequest()) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
            }
            try {
                AbstractRpcRemoting.this.messageExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            dispatch(rpcMessage.getId(), ctx, rpcMessage.getBody());
                        } catch (Throwable th) {
                            LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                        }
                    }
                });
            } catch (RejectedExecutionException e) {
                LOGGER.error(FrameworkErrorCode.ThreadPoolFull.getErrCode(),
                    "thread pool is full, current max pool size is " + messageExecutor.getActiveCount());
                if (allowDumpStack) {
                    String name = ManagementFactory.getRuntimeMXBean().getName();
                    String pid = name.split("@")[0];
                    int idx = new Random().nextInt(100);
                    try {
                        Runtime.getRuntime().exec("jstack " + pid + " >d:/" + idx + ".log");
                    } catch (IOException exx) {
                        LOGGER.error(exx.getMessage());
                    }
                    allowDumpStack = false;
                }
            }
        } else {
            MessageFuture messageFuture = futures.remove(rpcMessage.getId());
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(String
                    .format("%s msgId:%s, future :%s, body:%s", this, rpcMessage.getId(), messageFuture,
                        rpcMessage.getBody()));
            }
            if (messageFuture != null) {
                messageFuture.setResultMessage(rpcMessage.getBody());
            } else {
                try {
                    AbstractRpcRemoting.this.messageExecutor.execute(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                dispatch(rpcMessage.getId(), ctx, rpcMessage.getBody());
                            } catch (Throwable th) {
                                LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                            }
                        }
                    });
                } catch (RejectedExecutionException e) {
                    LOGGER.error(FrameworkErrorCode.ThreadPoolFull.getErrCode(),
                        "thread pool is full, current max pool size is " + messageExecutor.getActiveCount());
                }
            }
        }
    }
}

可以看到调用了 messageFuture 当 setResultMessage() 方法,设置 future 的结果,也就是说,唤醒了 future,那么阻塞在 future 的 get 方法上的线程就被唤醒了,得到结果,继续往下执行。

接下来我们讨论 MessageFuture 的实现细节,其实 seata 里面有很多种 future 相关的类,实现方式也不太一样,不过都大同小异,有的是基于 CompletableFuture 实现,有的是基于 CountDownLatch 实现。比如说,MessageFuture 就是基于 CompletableFuture 实现的,先看看它的成员变量:

private RpcMessage requestMessage;
private long timeout;
private long start = System.currentTimeMillis();
private transient CompletableFuture origin = new CompletableFuture();

CompletableFuture 是它的一个成员变量,它被利用来阻塞当前线程。MessageFuture 的 get 方法,依赖于 CompletableFuture 的 get 方法,来实现有一定时间限制的等待,直到另一个线程唤醒 CompletableFuture。如下所示:

/**
 * Get object.
 *
 * @param timeout the timeout
 * @param unit    the unit
 * @return the object
 * @throws TimeoutException the timeout exception
 * @throws InterruptedException the interrupted exception
 */
public Object get(long timeout, TimeUnit unit) throws TimeoutException,
    InterruptedException {
    Object result = null;
    try {
        result = origin.get(timeout, unit);
    } catch (ExecutionException e) {
        throw new ShouldNeverHappenException("Should not get results in a multi-threaded environment", e);
    } catch (TimeoutException e) {
        throw new TimeoutException("cost " + (System.currentTimeMillis() - start) + " ms");
    }

    if (result instanceof RuntimeException) {
        throw (RuntimeException)result;
    } else if (result instanceof Throwable) {
        throw new RuntimeException((Throwable)result);
    }

    return result;
}

/**
 * Sets result message.
 *
 * @param obj the obj
 */
public void setResultMessage(Object obj) {
    origin.complete(obj);
}

既然说到了 future 机制,这里也顺便把 io.seata.config.ConfigFuture 提一下,它就是上面提到的基于 CountDownLatch 实现的一种 future 机制,虽然实现方式两者不一样,但完成的功能和作用是一样的。

private final CountDownLatch latch = new CountDownLatch(1);

/**
 * Get object.
 *
 * @param timeout the timeout
 * @param unit    the unit
 * @return the object
 * @throws InterruptedException the interrupted exception
 */
public Object get(long timeout, TimeUnit unit) {
    this.timeoutMills = unit.toMillis(timeout);
    try {
        boolean success = latch.await(timeout, unit);
        if (!success) {
            LOGGER.error(
                "config operation timeout,cost:" + (System.currentTimeMillis() - start) + " ms,op:" + operation
                    .name()
                    + ",dataId:" + dataId);
            return getFailResult();
        }
    } catch (InterruptedException exx) {
        LOGGER.error("config operate interrupted,error:" + exx.getMessage());
        return getFailResult();
    }
    if (operation == ConfigOperation.GET) {
        return result == null ? content : result;
    } else {
        return result == null ? Boolean.FALSE : result;
    }
}

/**
 * Sets result.
 *
 * @param result the result
 */
public void setResult(Object result) {
    this.result = result;
    latch.countDown();
}

阻塞操作调用了 CountDownLatch 的 await 方法,而唤醒操作则调用 countDown 方法,核心在于需要把 CountDownLatch 的 latch 值设置为 1。
实际上,Java 语言本身已经提供了 java.util.concurrent.Future 这个类来提供 Future 机制,但 Java 原生的 Future 机制功能过于单一,比如说不能主动设置 future 的结果,也不能为它添加 listener,所有有许多像 seata 这样的软件,会选择去重新实现一种 future 机制来满足异步转同步的需求。也有像 netty 这样的软件,它不会借助类似于 countdownlatch 来实现,而是直接扩展 java.util.concurrent.Future,在它的基础上添加功能。

防洪机制

在 AbstractRpcRemoting 中,往外发数据的时候,它都会先进行一个检查,即检查当前的 channel 是否可写。

private void channelWriteableCheck(Channel channel, Object msg) {
    int tryTimes = 0;
    synchronized (lock) {
        while (!channel.isWritable()) {
            try {
                tryTimes++;
                if (tryTimes > NettyClientConfig.getMaxNotWriteableRetry()) {
                    destroyChannel(channel);
                    throw new FrameworkException("msg:" + ((msg == null) ? "null" : msg.toString()),
                        FrameworkErrorCode.ChannelIsNotWritable);
                }
                lock.wait(NOT_WRITEABLE_CHECK_MILLS);
            } catch (InterruptedException exx) {
                LOGGER.error(exx.getMessage());
            }
        }
    }
}

这要从 netty 的内部机制说起,当调用 ChannelHandlerContext 或者 Channel 的 write 方法时,netty 只是把要写的数据放入了自身的一个环形队列里面,再由后台线程真正往链路上发。如果接受方的处理速度慢,也就是说,接收的速度慢,那么根据 tcpip 协议的滑动窗口机制,它也会导致发送方发送得慢。
我们可以把 netty 的环形队列想像成一个水池,调用 write 方法往池子里加水,netty 通过后台线程,慢慢把池子的水流走。这就有可能出现一种情况,由于池子水流走的速度远远慢于往池子里加水的速度,这样会导致池子的总水量随着时间的推移越来越多。所以往池子里加水时应该考虑当前池子里的水量,否则最终会导致应用的内存溢出。
netty 对于水池提供了两个设置,一个是高水位,一个是低水位,当池子里的水高于高水位时,这个时候 channel.isWritable() 返回 false,并且直到水位慢慢降回到低水位时,这个方法才会返回 true。

高低水位

上述的 channelWriteableCheck 方法,发现channel 不可写的时候,进入循环等待,等待的目的是让池子的水位下降到 low water mark,如果等待超过最大允许等待的时间,那么将会抛出异常并关闭连接。

消息队列

在 AbstractRpcRemoting 中,发送数据有两种方式,一种是直接调用 channel 往外写,另一种是先把数据放进“数据篮子”里,它实际上是一个 map, key 为远端地址,value为一个消息队列。数据放队列后,再由其它线程往外发。下面是 sendAsycRequest 方法的一部分代码,显示了这种机制:

ConcurrentHashMap<String, BlockingQueue<RpcMessage>> map = basketMap;
BlockingQueue<RpcMessage> basket = map.get(address);
if (basket == null) {
    map.putIfAbsent(address, new LinkedBlockingQueue<RpcMessage>());
    basket = map.get(address);
}
basket.offer(rpcMessage);
if (LOGGER.isDebugEnabled()) {
    LOGGER.debug("offer message: " + rpcMessage.getBody());
}
if (!isSending) {
    synchronized (mergeLock) {
        mergeLock.notifyAll();
    }
}

但我们在 AbstractRpcRemoting 里面没有看有任何额外的线程在晴空这个 basketMap。回顾一下上面的 RpcServer 的类继承体系,接下来我们要分析一下,AbstractRpcRemotingServer 这个类。

继承体系

AbstractRpcRemotingServer 这个类主要定义了于netty 启动一个 server bootstrap 相关的类,可见真正启动服务监听端口的是在这个类,先看一下它的start方法

@Override
public void start() {
    this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupWorker)
        .channel(nettyServerConfig.SERVER_CHANNEL_CLAZZ)
        .option(ChannelOption.SO_BACKLOG, nettyServerConfig.getSoBackLogSize())
        .option(ChannelOption.SO_REUSEADDR, true)
        .childOption(ChannelOption.SO_KEEPALIVE, true)
        .childOption(ChannelOption.TCP_NODELAY, true)
        .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSendBufSize())
        .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketResvBufSize())
        .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
            new WriteBufferWaterMark(nettyServerConfig.getWriteBufferLowWaterMark(),
                nettyServerConfig.getWriteBufferHighWaterMark()))
        .localAddress(new InetSocketAddress(listenPort))
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) {
                ch.pipeline().addLast(new IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0))
                    .addLast(new MessageCodecHandler());
                if (null != channelHandlers) {
                    addChannelPipelineLast(ch, channelHandlers);
                }

            }
        });

    if (nettyServerConfig.isEnableServerPooledByteBufAllocator()) {
        this.serverBootstrap.childOption(ChannelOption.ALLOCATOR, NettyServerConfig.DIRECT_BYTE_BUF_ALLOCATOR);
    }

    try {
        ChannelFuture future = this.serverBootstrap.bind(listenPort).sync();
        LOGGER.info("Server started ... ");
        RegistryFactory.getInstance().register(new InetSocketAddress(XID.getIpAddress(), XID.getPort()));
        initialized.set(true);
        future.channel().closeFuture().sync();
    } catch (Exception exx) {
        throw new RuntimeException(exx);
    }
}

这个类很常规,就是遵循 netty 的使用规范,用合适的配置启动一个 server,并调用注册中心 api 把自己作为一个服务发布出去。
我们可以看到,配置中确实也出现了我们上文中提到过的上下水位的配置。
另外,channelpipeline 中,除了添加一个保持链路有效性探测的 IdleStateHandler,和一个 MessageCodec,处理事务逻辑相关的 Handler 还需要由参数传入。
接下来我们看 RpcServer 这个类,从它的 init 方法里,我们可以看到,它把自己做为一个 ChannelHandler,加入到了 channel pipeline 中

/**
 * Init.
 */
@Override
public void init() {
    super.init();
    setChannelHandlers(RpcServer.this);
    DefaultServerMessageListenerImpl defaultServerMessageListenerImpl = new DefaultServerMessageListenerImpl(
        transactionMessageHandler);
    defaultServerMessageListenerImpl.init();
    defaultServerMessageListenerImpl.setServerMessageSender(this);
    this.setServerMessageListener(defaultServerMessageListenerImpl);
    super.start();

}

RpcServer 自身也实现了 channelRead 方法,但它只处理心跳相关的信息和注册相关的信息,其它的业务消息,它交给父类处理,而先前我们也已经看到,父类的channelRead
方法里,反过来会调用 dispatch 这个抽象方法去做消息的分发,而 RpcServer 类实现了这个抽象方法,在接收到不同的消息类型是,采取不同的处理流程。
关于事务的处理流程的细节,本篇文章暂不涉及,后续文章再慢慢分析。

行文至此,回想我们先前提到的一个疑惑:
“当我启动一个 RpcServer 的时候,我是真的在启动一个 server 吗?看起来我好像在启动一个 ChannelHandler,可是 ChannelHandler 怎么谈得上‘启动’呢?”
是的,我们既在启动一个 server,这个 server 也实现了事务处理逻辑,它同时也是个 ChannelHandler。
没有一定的事实标准去衡量这样写的代码是好是坏,我们也没必要去争论 Effective Java 提到的什么时候该用组合,什么时候该用继承。
本文到此结束。
扫一扫关注我的微信公众号

查看原文

赞 1 收藏 0 评论 0

beanlam 赞了文章 · 2019-04-22

一键生成微信个人专属数据报告,了解你的微信社交历史

[TOC]

一键生成微信个人专属数据报告,了解你的微信社交历史

简介

你是否想过生成一份属于你的微信个人数据报告,了解你的微信社交历史。现在,我们基于python对微信好友进行全方位数据分析,包括:昵称、性别、年龄、地区、备注名、个性签名、头像、群聊和公众号等。

其中,在分析好友类型方面,主要统计出你的陌生人、星标好友、不让他看我的朋友圈的好友、不看他的朋友圈的好友数据。在分析地区方面,主要统计所有好友在全国的分布以及对好友数最多的省份进行进一步分析。在其他方面,统计出你的好友性别比例、猜出你最亲密的好友,分析你的特殊好友,找出与你所在共同群聊数最多的好友数据,对你的好友个性签名进行分析,对你的好友头像进行分析,并进一步检测出使用真人头像的好友数据。

目前网上关于这方面的数据分析文章比较多,但是运行起来比较麻烦,而本程序的运行十分简单,只需要扫码登录一步操作即可。

功能截图

example1.png
example2.png
example3.png
example4.png
example5.png
example6.png
example7.png
example8.png
example9.png
example10.png

如何运行

# 跳转到当前目录
cd 目录名
# 先卸载依赖库
pip uninstall -y -r requirement.txt
# 再重新安装依赖库
pip install -r requirement.txt
# 开始运行
python generate_wx_data.py

如何打包成二进制可执行文件

# 安装pyinstaller
pip install pyinstaller
# 跳转到当前目录
cd 目录名
# 先卸载依赖库
pip uninstall -y -r requirement.txt
# 再重新安装依赖库
pip install -r requirement.txt
# 更新 setuptools
pip install --upgrade setuptools
# 开始打包
pyinstaller generate_wx_data.py

编写思路

  1. 首先,进行初始化,并根据不同操作系统,启用微信机器人。
    # 初始化所需文件夹
    init_folders()


    # 启动微信机器人,自动根据操作系统执行不同的指令
    if('Windows' in system()):
        # Windows
        bot = Bot(cache_path=True)
    elif('Darwin' in system()):
        # MacOSX
        bot = Bot(cache_path=True)
    elif('Linux' in system()):
        # Linux
        bot = Bot(console_qr=2,cache_path=True)
    else:
        # 自行确定
        print(u"无法识别你的操作系统类型,请自己设置")
        exit()
  1. 登录完微信后,开始获取好友数据和群聊数据。
# 获取所有好友
friends = bot.friends(update=False)

# 获取所有活跃群聊
groups = bot.groups()
  1. 共同所在群聊成员分析,依次对每个好友进行检测。
def group_common_in():

    # 获取所有活跃的群聊
    groups = bot.groups()

    # 每个好友与你相同的群聊个数
    dict_common_in = {}

    # 遍历所有好友,第0个为你自己,所以去掉
    for x in friends[1:]:
        # 依次在每个群聊中搜索
        for y in groups:
            # x在y中
            if(x in y):
                # 获取微信名称
                name = x.nick_name
                # 判断是否有备注,有的话就使用备注
                if(x.remark_name and x.remark_name != ''):
                    name = x.remark_name

                # 增加计数
                if(name in dict_common_in.keys()):
                    dict_common_in[name] += 1
                else:
                    dict_common_in[name] = 1
  1. 获取微信好友头像,以便进一步分析。这里下载头像比较慢,所以采取多线程方式进行下载。在多线程中,使用队列保存我们的头像url,不同线程从队列中获取头像url,并下载到本地。
    # 创建一个队列,用于多线程下载头像,提高下载速度
    queue_head_image = Queue()

    # 将每个好友元素存入队列中
    # 如果为了方便调试,可以仅仅插入几个数据,friends[1:10]
    for user in friends:
        queue_head_image.put(user)

    # 启动10个线程下载头像
    for i in range(1, 10):
        t = Thread(target=download_head_image,args=(i,))
        t.start()

其中download_head_image的具体实现为:

# 下载好友头像,此步骤消耗时间比较长
def download_head_image(thread_name):

    # 队列不为空的情况
    while(not queue_head_image.empty()):
        # 取出一个好友元素
        user = queue_head_image.get()

        # 下载该好友头像,并保存到指定位置,生成一个15位数的随机字符串
        random_file_name = ''.join([str(random.randint(0,9)) for x in range(15)])
        user.get_avatar(save_path='image/' + random_file_name + '.jpg')

        # 输出提示
        print(u'线程%d:正在下载微信好友头像数据,进度%d/%d,请耐心等待……' %(thread_name, len(friends)-queue_head_image.qsize(), len(friends)))
  1. 进行性别、地区统计,并将生产的html文件保存到本地。这里没什么难度,所以就不详细展开了。
# 分析好友性别比例
def sex_ratio():

    # 初始化
    male, female, other = 0, 0, 0

    # 遍历
    for user in friends:
        if(user.sex == 1):
            male += 1
        elif(user.sex == 2):
            female += 1
        else:
            other += 1

    name_list = ['男性', '女性', '未设置']
    num_list = [male, female, other]

    pie = Pie("微信好友性别比例")
    pie.add("", name_list, num_list, is_label_show=True)
    pie.render('data/好友性别比例.html')
  1. 分析你认识的好友、最亲密的人以及特殊好友。以特殊好友为例,我们将好友分为星标好友(很重要的人), 不让他看我的朋友圈的好友, 不看他朋友圈的好友, 消息置顶好友, 陌生人。这里分类的依据是根据itchat中的StarFriendContactFlag而来的。根据经验可知,StarFriend为1表示为星标好友,ContactFlag为1和3表示好友,259和33027表示不让他看我的朋友圈,65539和65537和66051表示不看他的朋友圈,65795表示两项设置全禁止, 73731表示陌生人。
# 特殊好友分析
def analyze_special_friends():

    # 星标好友(很重要的人), 不让他看我的朋友圈的好友, 不看他朋友圈的好友, 消息置顶好友, 陌生人
    star_friends, hide_my_post_friends, hide_his_post_friends, sticky_on_top_friends, stranger_friends = 0, 0, 0, 0, 0

    for user in friends:


        # 星标好友为1,为0表示非星标,不存在星标选项的为陌生人
        if('StarFriend' in (user.raw).keys()):
            if((user.raw)['StarFriend'] == 1):
                star_friends += 1
        else:
            stranger_friends += 1

        # 好友类型及权限:1和3好友,259和33027不让他看我的朋友圈,65539和65537和66051不看他的朋友圈,65795两项设置全禁止, 73731陌生人
        if((user.raw)['ContactFlag'] in [259, 33027, 65795]):
            hide_my_post_friends += 1
        if ((user.raw)['ContactFlag'] in [66051, 65537, 65539, 65795]):
            hide_his_post_friends += 1

        # 消息置顶好友为2051
        if ((user.raw)['ContactFlag'] in [2051]):
            sticky_on_top_friends += 1

        # 陌生人
        if ((user.raw)['ContactFlag'] in [73731]):
            stranger_friends += 1


    bar = Bar('特殊好友分析')
    bar.add(name='', x_axis=['星标', '不让他看我朋友圈', '不看他朋友圈', '消息置顶', '陌生人'], y_axis=[star_friends, hide_my_post_friends, hide_his_post_friends, sticky_on_top_friends, stranger_friends], legend_orient="vertical", legend_pos="left")
    bar.render('data/特殊好友分析.html')
  1. 对好友个性签名进行分析,并绘制出词语。这里比较复杂,首先将个性签名列表转化为字符串,调用nlp处理接口,对返回的数据进行过滤。同时,对短语进行分词,过滤,词频统计操作。最后,使用pyechart进行绘制词语图。代码中注释非常多,基本都能看懂,所以在此也无需再详细展开了。
# 分析个性签名
def analyze_signature():

    # 个性签名列表
    data = []
    for user in friends:

        # 清除签名中的微信表情emoj,即<span class.*?</span>
        # 使用正则查找并替换方式,user.signature为源文本,将<span class.*?</span>替换成空
        new_signature = re.sub(re.compile(r"<span class.*?</span>", re.S), "", user.signature)

        # 只保留签名为1行的数据,过滤为多行的签名
        if(len(new_signature.split('\n')) == 1):
            data.append(new_signature)

    # 将个性签名列表转为string
    data = '\n'.join(data)

    # 进行分词处理,调用接口进行分词
    # 这里不使用jieba或snownlp的原因是无法打包成exe文件或者打包后文件非常大
    postData = {'data':data, 'type':'exportword', 'arg':'', 'beforeSend':'undefined'}
    response = post('http://life.chacuo.net/convertexportword',data=postData)
    data = response.text.replace('{"status":1,"info":"ok","data":["', '')
    # 解码
    data = data.encode('utf-8').decode('unicode_escape')

    # 将返回的分词结果json字符串转化为python对象,并做一些处理
    data = data.split("=====================================")[0]

    # 将分词结果转化为list,根据分词结果,可以知道以2个空格为分隔符
    data = data.split('  ')

    # 对分词结果数据进行去除一些无意义的词操作
    stop_words_list = [',', ',', '、', 'the', 'a', 'is', '…', '·', 'э', 'д', 'э', 'м', 'ж', 'и', 'л', 'т', 'ы', 'н', 'з', 'м', '…', '…', '…', '…', '…', '、', '.', '。', '!', '!', ':', ':', '~', '|', '▽', '`', 'ノ', '♪', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '\'', '‘', '’', '“', '”', '的', '了', '是', '你', '我', '他', '她','=', '\r', '\n', '\r\n', '\t', '以下关键词', '[', ']', '{', '}', '(', ')', '(', ')', 'span', '<', '>', 'class', 'html', '?', '就', '于', '下', '在', '吗', '嗯']
    tmp_data = []
    for word in data:
        if(word not in stop_words_list):
            tmp_data.append(word)
    data = tmp_data


    # 进行词频统计,结果存入字典signature_dict中
    signature_dict = {}
    for index, word in enumerate(data):

        print(u'正在统计好友签名数据,进度%d/%d,请耐心等待……' % (index + 1, len(data)))

        if(word in signature_dict.keys()):
            signature_dict[word] += 1
        else:
            signature_dict[word] = 1

    # 开始绘制词云
    name = [x for x in signature_dict.keys()]
    value = [x for x in signature_dict.values()]
    wordcloud = WordCloud('微信好友个性签名词云图')
    wordcloud.add("", name, value, shape='star', word_size_range=[1,100])
    wordcloud.render('data/好友个性签名词云.html')
  1. 拼接所有好友头像,这里使用到PIL的图像处理功能,首先对头像个数进行统计,自适应生成矩形图片。由于我们知道微信头像尺寸为640 * 640,所以处理起来就很方便了。
# 拼接所有微信好友头像
def merge_head_image():
    # 拼接头像
    pics = listdir('image')  # 得到user目录下的所有文件,即各个好友头像
    numPic = len(pics)
    eachsize = int(math.sqrt(float(640 * 640) / numPic))  # 先圈定每个正方形小头像的边长,如果嫌小可以加大
    numrow = int(640 / eachsize)
    numcol = int(numPic / numrow)  # 向下取整
    toImage = Image.new('RGB', (eachsize * numrow, eachsize * numcol))  # 先生成头像集模板

    x = 0  # 小头像拼接时的左上角横坐标
    y = 0  # 小头像拼接时的左上角纵坐标

    for index, i in enumerate(pics):

        print(u'正在拼接微信好友头像数据,进度%d/%d,请耐心等待……' % (index + 1, len(pics)))

        try:
            # 打开图片
            img = Image.open('image/' + i)
        except IOError:
            print(u'Error: 没有找到文件或读取文件失败')
        else:
            # 缩小图片
            img = img.resize((eachsize, eachsize), Image.ANTIALIAS)
            # 拼接图片
            toImage.paste(img, (x * eachsize, y * eachsize))
            x += 1
            if x == numrow:
                x = 0
                y += 1

    toImage.save('data/拼接' + ".jpg")
  1. 检测使用人脸作为头像的好友数量,这里使用到opencv的人脸检测功能,使用opencv默认的模型进行检测。首先载入图片,并进行灰度处理,最后加载人脸识别模型进行检测,若检测到脸数大于0,则说明存在。同时要注意的是,对错误的头像要进行舍弃操作。
# 检测使用真实人脸的好友个数
def detect_human_face():

    # 得到user目录下的所有文件名称,即各个好友头像
    pics = listdir('image')

    # 使用人脸的头像个数
    count_face_image = 0

    # 存储使用人脸的头像的文件名
    list_name_face_image = []

    # 加载人脸识别模型
    face_cascade = CascadeClassifier('model/haarcascade_frontalface_default.xml')

    for index, file_name in enumerate(pics):
        print(u'正在进行人脸识别,进度%d/%d,请耐心等待……' % (index+1, len(pics)))
        # 读取图片
        img = imread('image/' + file_name)

        # 检测图片是否读取成功,失败则跳过
        if img is None:
            continue

        # 对图片进行灰度处理
        gray = cvtColor(img, COLOR_BGR2GRAY)
        # 进行实际的人脸检测,传递参数是scaleFactor和minNeighbor,分别表示人脸检测过程中每次迭代时图
        faces = face_cascade.detectMultiScale(gray, 1.3, 5)
        if (len(faces) > 0):
            count_face_image += 1
            list_name_face_image.append(file_name)

    print(u'使用人脸的头像%d/%d' %(count_face_image,len(pics)))
  1. 所有数据统计完后,我们生产一个总的html网页文件,方便我们直接查看。
# 生成一个html文件,并保存到文件file_name中
def generate_html(file_name):
    with open(file_name, 'w', encoding='utf-8') as f:
        data = '''
            <meta http-equiv='Content-Type' content='text/html; charset=utf-8'>
            <meta charset="UTF-8">
            <title>一键生成微信个人专属数据报告(了解你的微信社交历史)</title>
            <meta name='keywords' content='微信个人数据'>
            <meta name='description' content=''>
            <iframe name="iframe1" marginwidth=0 marginheight=0 width=100% height=60% data-original="data/好友地区分布.html" frameborder=0></iframe>
            <iframe name="iframe2" marginwidth=0 marginheight=0 width=100% height=60% data-original="data/某省好友地区分布.html" frameborder=0></iframe>
            <iframe name="iframe3" marginwidth=0 marginheight=0 width=100% height=60% data-original="data/好友性别比例.html" frameborder=0></iframe>
            <iframe name="iframe4" marginwidth=0 marginheight=0 width=100% height=60% data-original="data/你认识的好友比例.html" frameborder=0></iframe>
            <iframe name="iframe5" marginwidth=0 marginheight=0 width=100% height=60% data-original="data/你最亲密的人.html" frameborder=0></iframe>
            <iframe name="iframe6" marginwidth=0 marginheight=0 width=100% height=60% data-original="data/特殊好友分析.html" frameborder=0></iframe>
            <iframe name="iframe7" marginwidth=0 marginheight=0 width=100% height=60% data-original="data/共同所在群聊分析.html" frameborder=0></iframe>
            <iframe name="iframe8" marginwidth=0 marginheight=0 width=100% height=60% data-original="data/好友个性签名词云.html" frameborder=0></iframe>
            <iframe name="iframe9" marginwidth=0 marginheight=0 width=100% height=60% data-original="data/微信好友头像拼接图.html" frameborder=0></iframe>
            <iframe name="iframe10" marginwidth=0 marginheight=0 width=100% height=60% data-original="data/使用人脸的微信好友头像拼接图.html" frameborder=0></iframe>
        '''
        f.write(data)

补充

完整版源代码存放在github上,有需要的可以下载

项目持续更新,欢迎您star本项目

查看原文

赞 18 收藏 12 评论 1

认证与成就

  • 获得 356 次点赞
  • 获得 24 枚徽章 获得 2 枚金徽章, 获得 6 枚银徽章, 获得 16 枚铜徽章

擅长技能
编辑

(゚∀゚ )
暂时没有

开源项目 & 著作
编辑

(゚∀゚ )
暂时没有

注册于 2014-12-19
个人主页被 3.9k 人浏览