KerryWu

KerryWu 查看完整档案

上海编辑福州大学  |  软件工程 编辑上海得帆  |  高级技术顾问 编辑 github.com/Kerry2019 编辑
编辑

保持饥饿

个人动态

KerryWu 发布了文章 · 2月28日

分享自画的一个支付方案图

赞 0 收藏 0 评论 0

KerryWu 发布了文章 · 2月15日

Seata TCC 分布式事务

1. 前言

本文先通过分布式事务中tcc方案,衍生出seata的tcc模式,主要还是会通过代码示例来做介绍。github代码地址可提前下载,该项目中包括数据库、seata配置,以及所有分布式服务的全部代码。大家如果想练练手,可以先拉取该项目代码,再结合本文学习。核心配置环境如下:

环境类型版本号
jdk1.8.0_251
mysql8.0.22
seata server1.4.1

1.1. tcc

我们前面有几篇文章都有介绍过分布式事务的方案,目前常见的分布式事务方案有:2pc、tcc和异步确保型。之前讲过用jta atomikos实现多数据源的 2pc,用 异步确保型 方案实现支付业务的事务等等,就是没专门讲过 tcc 的应用。

因为tcc方案的操作难度还是比较大的。不能单打独斗,最好需要依托一个成熟的框架来实现。常见的tcc开源框架有tcc-transaction、Hmily和ByteTCC等,不过他们不像seata背靠大厂,无法提供持续的维护,因此我更推荐seata的tcc方案。

1.2. seata

先说说seata吧,分布式事务的解决方案肯定不局限于上面说的三种,实际上五花八门。因为它的确很让人头疼,各位大神都想研发出最好用的框架。本文的主角 - seata ,就是阿里的一个开源项目。

seata提供了AT、TCC、SAGA 和 XA,一共4种事务模式。像AT模式就很受欢迎,我们在实现多数据源的事务一致性时,通常会选用 2PC的方案,等待所有数据源的事务执行成功,最后再一起提交事务。这个等待所有数据源事务执行的过程就比较耗时,即影响性能,也不安全。

而seata AT模式的做法就很灵活,它学习数据库的 undo log,每个事务执行时立即提交事务,但会把 undo 的回退sql记录下来。如果所有事务执行成功,清除记录 undo sql的行记录,如果某个事务失败,则执行对应 undo sql 回滚数据。在保证事务的同时,并发量也大了起来。

但我们今天要讲的是 seata TCC 模式,如果你对 Seata的其他模式感兴趣,可以上官网了解。

2. 业务

先讲一下示例的业务吧,我们还是拿比较经典的电商支付场景举例。假设支付成功后,涉及到三个系统事务:

  1. 订单系统(order):创建支付订单。
  2. 库存系统(storage):对应商品扣除库存。
  3. 账户系统(account):用户账户扣除响应金额。

2.1. tcc业务

按照tcc(try-confirm-cancel)的思路,这三个事务可以分别分解成下面的过程。

订单系统 order
  1. try: 创建订单,但是订单状态设置一个临时状态(如:status=0)。
  2. confirm: try成功,提交事务,将订单状态更新为完全状态(如:status=1)。
  3. cancel: 回滚事务,删除该订单记录。
库存系统 storage
  1. try: 将需要减少的库存量冻结起来。
  2. confirm: try成功,提交事务,使用冻结的库存扣除,完成业务数据处理。
  3. cancel: 回滚事务,冻结的库存解冻,恢复以前的库存量。
账户系统 account
  1. try: 将需要扣除的钱冻结起来。
  2. confirm: try成功,提交事务,使用冻结的钱扣除,完成业务数据处理。
  3. cancel: 回滚事务,冻结的钱解冻,恢复以前的账户余额。

2.2. 数据库

为了模拟分布式事务,上述的不同系统业务,我们通过在不同数据库中创建表结构来模拟。当然tcc的分布式事务不局限于数据库层面,还包括http接口调用和rpc调用等,但是异曲同工,可以作为示例参考。

下面先列出三张业务表的表结构,具体的sql可见最后附件。

表:order
列名类型备注
idint主键
order_novarchar订单号
user_idint用户id
product_idint产品id
amountint数量
moneydecimal金额
statusint订单状态:0:创建中;1:已完结
表:storage
列名类型备注
idint主键
product_idint产品id
residueint剩余库存
frozenintTCC事务锁定的库存
表:account
列名类型备注
idint主键
user_idint用户id
residueint剩余可用额度
frozenintTCC事务锁定的金额

3. seata server

3.1. 下载

seata server 的安装包可直接从官方github下载,下载压缩包后,解压到本地或服务器上。

3.2. 配置

Seata Server 的配置文件有两个:

  • seata/conf/registry.conf
  • seata/conf/file.conf
registry.conf

Seata Server 要向注册中心进行注册,这样,其他服务就可以通过注册中心去发现 Seata Server,与 Seata Server 进行通信。Seata 支持多款注册中心服务:nacos 、eureka、redis、zk、consul、etcd3、sofa。我们项目中要使用 eureka 注册中心,eureka服务的连接地址、注册的服务名,这需要在 registry.conf 文件中对 registry 进行配置。

Seata 需要存储全局事务信息、分支事务信息、全局锁信息,这些数据存储到什么位置?针对存储位置的配置,支持放在配置中心,或者也可以放在本地文件。Seata Server 支持的配置中心服务有:nacos 、apollo、zk、consul、etcd3。这里我们选择最简单的,使用本地文件,这需要在 registry.conf 文件中对 config 进行配置。

file.conf

file.conf 中对事务信息的存储位置进行配置,存储位置支持:file、db、redis。
这里我们选择数据库作为存储位置,这需要在 file.conf 中进行配置。

3.3. 启动

执行 seata/bin/seata-server.sh(windows 是 seata-server.bat) 脚本即可启动seata server。还可以配置下列参数:

-h:注册到注册中心的ip
-p:server rpc 监听端口,默认 8091
-m:全局事务会话信息存储模式,file、db,优先读取启动参数
-n:server node,多个server时,需要区分各自节点,用于生成不同区间的transctionId,以免冲突
-e:多环境配置

3.4. 常见问题

mysql 8

默认启动后会报mysql-connector-java-x.jar驱动的错误,是因为seata server 默认不支持mysql 8。
可以在seata server的 lib 文件夹下替换 mysql 的驱动 jar 包。lib 文件夹下,已经有一个 jdbc 文件夹,把里面驱动版本为 8 的 mysql-connector-java-x.jar 包拷贝到外面 lib 文件夹下即可。

4. 代码

github示例项目中包括3个业务服务、1个注册中心,以及resources下的数据库脚本和seata server配置文件。按照服务的启动顺序,如下分类:

  1. resources/database-sql:初始化数据库
  2. eureka-server:运行 注册中心
  3. resources/seata-server:下载、安装、配置、启动 seata server服务
  4. account-service:运行 用户账户服务
  5. storage-service:运行 商品库存服务
  6. order-service:运行 订单服务
  7. 测试:通过postman等工具,调用 order-server 的下订单接口

3个业务服务中,order订单服务 可以被称为“主事务”,当订单创建成功后,再在订单服务中调用 account账号服务storage库存服务两个“副事务”。因此从 seata tcc代码层面上,可以分成下面两类。
下文中不会列举业务代码,完整代码可以从github上查看,只会列出 seata 的相关代码和配置。

4.1. 主事务(order)

4.1.1. application 配置文件

配置文件中需要配置 tx-service-group,需要注意的是,3个业务服务中都需要配置同样的值。

application.yml
spring:
  cloud:
    alibaba:
      seata:
        tx-service-group: order_tx_group

4.1.2. seata配置文件

在application.yml同级目录,即 resources 目录下,创建两个seata 的配置文件。还记得在seata server 启动的时候也有这两个文件,但内容不一样,不要混淆了。

file.conf
transport {
  type = "TCP"
  server = "NIO"
  heartbeat = true
  enableClientBatchSendRequest = true

  threadFactory {
    bossThreadPrefix = "NettyBoss"
    workerThreadPrefix = "NettyServerNIOWorker"
    serverExecutorThread-prefix = "NettyServerBizHandler"
    shareBossWorker = false
    clientSelectorThreadPrefix = "NettyClientSelector"
    clientSelectorThreadSize = 1
    clientWorkerThreadPrefix = "NettyClientWorkerThread"
    bossThreadSize = 1
    workerThreadSize = "default"
  }
  shutdown {
    wait = 3
  }
  serialization = "seata"
  compressor = "none"
}
service {
  vgroupMapping.order_tx_group = "seata-server"
  order_tx_group.grouplist = "127.0.0.1:8091"
  enableDegrade = false
  disableGlobalTransaction = false
}

client {
  rm {
    asyncCommitBufferLimit = 10000
    lock {
      retryInterval = 10
      retryTimes = 30
      retryPolicyBranchRollbackOnConflict = true
    }
    reportRetryCount = 5
    tableMetaCheckEnable = false
    reportSuccessEnable = false
  }
  tm {
    commitRetryCount = 5
    rollbackRetryCount = 5
  }
  undo {
    dataValidation = true
    logSerialization = "jackson"
    logTable = "undo_log"
  }
  log {
    exceptionRate = 100
  }
}
registry.conf
registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "eureka"

  eureka {
    serviceUrl = "http://localhost:8761/eureka"
  }
}

config {
  # file、nacos 、apollo、zk、consul、etcd3、springCloudConfig
  type = "file"
  file {
    name = "file.conf"
  }
}

4.1.3. @LocalTCC tcc服务

这是配置 TCC 子服务的核心代码,

  • @LocalTCC:

该注解需要添加到上面描述的接口上,表示实现该接口的类被 seata 来管理,seata 根据事务的状态,自动调用我们定义的方法,如果没问题则调用 Commit 方法,否则调用 Rollback 方法。

  • @TwoPhaseBusinessAction:

该注解用在接口的 Try 方法上。

  • @BusinessActionContextParameter:

该注解用来修饰 Try 方法的入参,被修饰的入参可以在 Commit 方法和 Rollback 方法中通过 BusinessActionContext 获取。

  • BusinessActionContext:

在接口方法的实现代码中,可以通过 BusinessActionContext 来获取参数, BusinessActionContext 就是 seata tcc 的事务上下文,用于存放 tcc 事务的一些关键数据。BusinessActionContext 对象可以直接作为 commit 方法和 rollbakc 方法的参数,Seata 会自动注入参数。

OrderTccAction.java

@LocalTCC
public interface OrderTccAction {

    /**
     * try 尝试
     *
     * BusinessActionContext 上下文对象,用来在两个阶段之间传递数据
     * BusinessActionContextParameter 注解的参数数据会被存入 BusinessActionContext
     * TwoPhaseBusinessAction 注解中commitMethod、rollbackMethod 属性有默认值,可以不写
     *
     * @param businessActionContext
     * @param orderNo
     * @param userId
     * @param productId
     * @param amount
     * @param money
     * @return
     */
    @TwoPhaseBusinessAction(name = "orderTccAction")
    boolean prepareCreateOrder(BusinessActionContext businessActionContext,
                               @BusinessActionContextParameter(paramName = "orderNo") String orderNo,
                               @BusinessActionContextParameter(paramName = "userId") Long userId,
                               @BusinessActionContextParameter(paramName = "productId") Long productId,
                               @BusinessActionContextParameter(paramName = "amount") Integer amount,
                               @BusinessActionContextParameter(paramName = "money") BigDecimal money);

    /**
     * commit 提交
     * @param businessActionContext
     * @return
     */
    boolean commit(BusinessActionContext businessActionContext);

    /**
     * cancel 撤销
     * @param businessActionContext
     * @return
     */
    boolean rollback(BusinessActionContext businessActionContext);
}

OrderTccActionImpl.java

@Slf4j
@Component
public class OrderTccActionImpl implements OrderTccAction {
    private final OrderMapper orderMapper;
    public OrderTccActionImpl(OrderMapper orderMapper){
        this.orderMapper=orderMapper;
    }
    
    /**
     * try 尝试
     *
     * BusinessActionContext 上下文对象,用来在两个阶段之间传递数据
     * BusinessActionContextParameter 注解的参数数据会被存入 BusinessActionContext
     * TwoPhaseBusinessAction 注解中commitMethod、rollbackMethod 属性有默认值,可以不写
     *
     * @param businessActionContext
     * @param orderNo
     * @param userId
     * @param productId
     * @param amount
     * @param money
     * @return
     */
    @Transactional(rollbackFor = Exception.class)
    @Override
    public boolean prepareCreateOrder(BusinessActionContext businessActionContext,
                                      String orderNo,
                                      Long userId,
                                      Long productId,
                                      Integer amount,
                                      BigDecimal money) {
        orderMapper.save(new OrderDO(orderNo,userId, productId, amount, money, 0));
        ResultHolder.setResult(OrderTccAction.class, businessActionContext.getXid(), "p");
        return true;
    }

    /**
     * commit 提交
     *
     * @param businessActionContext
     * @return
     */
    @Transactional(rollbackFor = Exception.class)
    @Override
    public boolean commit(BusinessActionContext businessActionContext) {
        //检查标记是否存在,如果标记不存在不重复提交
        String p = ResultHolder.getResult(OrderTccAction.class, businessActionContext.getXid());
        if (p == null){
            return true;
        }

        /**
         * 上下文对象从第一阶段向第二阶段传递时,先转成了json数据,然后还原成上下文对象
         * 其中的整数比较小的会转成Integer类型,所以如果需要Long类型,需要先转换成字符串在用Long.valueOf()解析。
         */
        String orderNo = businessActionContext.getActionContext("orderNo").toString();
        orderMapper.updateStatusByOrderNo(orderNo, 1);
        //提交完成后,删除标记
        ResultHolder.removeResult(OrderTccAction.class, businessActionContext.getXid());
        return true;
    }

    /**
     * cancel 撤销
     *
     * 第一阶段没有完成的情况下,不必执行回滚。因为第一阶段有本地事务,事务失败时已经进行了回滚。
     * 如果这里第一阶段成功,而其他全局事务参与者失败,这里会执行回滚
     * 幂等性控制:如果重复执行回滚则直接返回
     *
     * @param businessActionContext
     * @return
     */
    @Transactional(rollbackFor = Exception.class)
    @Override
    public boolean rollback(BusinessActionContext businessActionContext) {
        //检查标记是否存在,如果标记不存在不重复提交
        String p = ResultHolder.getResult(OrderTccAction.class, businessActionContext.getXid());
        if (p == null){
            return true;
        }
        String orderNo = businessActionContext.getActionContext("orderNo").toString();
        orderMapper.deleteByOrderNo(orderNo);
        //提交完成后,删除标记
        ResultHolder.removeResult(OrderTccAction.class, businessActionContext.getXid());
        return true;
    }

}

4.1.4. @GlobalTransactional 全局服务

@GlobalTransactional 注解是唯一作用到“主事务”的方法。该注解加在“主事务”调用“副事务”的方法上。

OrderServiceImpl.java

@Service
public class OrderServiceImpl implements OrderService {
    private final OrderTccAction orderTccAction;
    private final AccountFeign accountFeign;
    private final StorageFeign storageFeign;

    public OrderServiceImpl(OrderTccAction orderTccAction, AccountFeign accountFeign, StorageFeign storageFeign){
        this.orderTccAction=orderTccAction;
        this.accountFeign=accountFeign;
        this.storageFeign=storageFeign;
    }

    /**
     * 创建订单
     * @param orderDO
     */
    @GlobalTransactional
    @Override
    public void createOrder(OrderDO orderDO) {
        String orderNo=this.generateOrderNo();
        //创建订单
        orderTccAction.prepareCreateOrder(null,
                orderNo,
                orderDO.getUserId(),
                orderDO.getProductId(),
                orderDO.getAmount(),
                orderDO.getMoney());
        //扣余额
        accountFeign.decreaseMoney(orderDO.getUserId(),orderDO.getMoney());
        //扣库存
        storageFeign.decreaseStorage(orderDO.getProductId(),orderDO.getAmount());
    }

    private String generateOrderNo(){
        return LocalDateTime.now()
                .format(
                        DateTimeFormatter.ofPattern("yyMMddHHmmssSSS")
                );
    }
}

4.2. 副事务(account、storage)

account 和 storage 两个服务相比较于 order,只少了 “4.1.4. @GlobalTransactional 全局服务”,其他的配置完全一样。因此,这里就不再赘言了。

5. 总结

测试

通过调用“主事务” order-service 的创建订单接口,来模拟分布式事务。我们可以通过在3个业务服务的不同代码处故意抛出错误,看是否能够实现事务的一致回滚。

seata框架表结构

在 /resources/database-sql 的数据库脚本中,各自还有一些 seata 框架本身的表结构,用于存储分布式事务各自的中间状态。因为这个中间状态很短,一旦事务一致性达成,表数据就会自动删除,因此平时我们无法查看数据库。

因为seata tcc模式,会一直阻塞到所有的 try执行完毕,再执行后续的。从而我们可以通过在部分业务服务try的代码中加上Thread.sleep(10000),强制让事务过程变慢,从而就可以看到这些 seata 表数据。

幂等性

tcc模式中,CommitCancel 都是有自动重试功能的,处于事务一致性考虑,重试功能很有必要。但我们就一定要慎重考虑方法的 幂等性,示例代码中的ResultHolder类并不是个好方案,还是要在Commit、Cancel业务方法本身做幂等性要求。

查看原文

赞 0 收藏 0 评论 0

KerryWu 赞了文章 · 1月25日

如何保证 Redis 缓存与数据库双写一致性?

作者:不学无数的程序员\
链接:https://www.jianshu.com/p/a8e...

在做系统优化时,想到了将数据进行分级存储的思路。因为在系统中会存在一些数据,有些数据的实时性要求不高,比如一些配置信息。

基本上配置了很久才会变一次。而有一些数据实时性要求非常高,比如订单和流水的数据。所以这里根据数据要求实时性不同将数据分为三级。

  • 第1级:订单数据和支付流水数据;这两块数据对实时性和精确性要求很高,所以不添加任何缓存,读写操作将直接操作数据库。
  • 第2级:用户相关数据;这些数据和用户相关,具有读多写少的特征,所以我们使用redis进行缓存。
  • 第3级:支付配置信息;这些数据和用户无关,具有数据量小,频繁读,几乎不修改的特征,所以我们使用本地内存进行缓存。

但是只要使用到缓存,无论是本地内存做缓存还是使用 redis 做缓存,那么就会存在数据同步的问题,因为配置信息缓存在内存中,而内存时无法感知到数据在数据库的修改。这样就会造成数据库中的数据与缓存中数据不一致的问题。

接下来就讨论一下关于保证缓存和数据库双写时的数据一致性。

解决方案

那么我们这里列出来所有策略,并且讨论他们优劣性。

  1. 先更新数据库,后更新缓存
  2. 先更新数据库,后删除缓存
  3. 先更新缓存,后更新数据库
  4. 先删除缓存,后更新数据库

先更新数据库,后更新缓存

这种场景一般是没有人使用的,主要原因是在更新缓存那一步,为什么呢?因为有的业务需求缓存中存在的值并不是直接从数据库中查出来的,有的是需要经过一系列计算来的缓存值,那么这时候后你要更新缓存的话其实代价是很高的。如果此时有大量的对数据库进行写数据的请求,但是读请求并不多,那么此时如果每次写请求都更新一下缓存,那么性能损耗是非常大的。

举个例子比如在数据库中有一个值为 1 的值,此时我们有 10 个请求对其每次加一的操作,但是这期间并没有读操作进来,如果用了先更新数据库的办法,那么此时就会有十个请求对缓存进行更新,会有大量的冷数据产生,如果我们不更新缓存而是删除缓存,那么在有读请求来的时候那么就会只更新缓存一次。

先更新缓存,后更新数据库

这一种情况应该不需要我们考虑了吧,和第一种情况是一样的。

先删除缓存,后更新数据库

该方案也会出问题,具体出现的原因如下。

先删除缓存,后更新数据库

此时来了两个请求,请求 A(更新操作) 和请求 B(查询操作)

  1. 请求 A 会先删除 Redis 中的数据,然后去数据库进行更新操作
  2. 此时请求 B 看到 Redis 中的数据时空的,会去数据库中查询该值,补录到 Redis 中
  3. 但是此时请求 A 并没有更新成功,或者事务还未提交

那么这时候就会产生数据库和 Redis 数据不一致的问题。如何解决呢?其实最简单的解决办法就是延时双删的策略。

延时双删

但是上述的保证事务提交完以后再进行删除缓存还有一个问题,就是如果你使用的是 Mysql 的读写分离的架构的话,那么其实主从同步之间也会有时间差。

主从同步时间差

此时来了两个请求,请求 A(更新操作) 和请求 B(查询操作)

  1. 请求 A 更新操作,删除了 Redis
  2. 请求主库进行更新操作,主库与从库进行同步数据的操作
  3. 请 B 查询操作,发现 Redis 中没有数据
  4. 去从库中拿去数据
  5. 此时同步数据还未完成,拿到的数据是旧数据

此时的解决办法就是如果是对 Redis 进行填充数据的查询数据库操作,那么就强制将其指向主库进行查询。

从主库中拿数据

先更新数据库,后删除缓存

问题:这一种情况也会出现问题,比如更新数据库成功了,但是在删除缓存的阶段出错了没有删除成功,那么此时再读取缓存的时候每次都是错误的数据了。

先更新数据库,后删除缓存

此时解决方案就是利用消息队列进行删除的补偿。具体的业务逻辑用语言描述如下:

  1. 请求 A 先对数据库进行更新操作
  2. 在对 Redis 进行删除操作的时候发现报错,删除失败
  3. 此时将Redis 的 key 作为消息体发送到消息队列中
  4. 系统接收到消息队列发送的消息后再次对 Redis 进行删除操作

但是这个方案会有一个缺点就是会对业务代码造成大量的侵入,深深的耦合在一起,所以这时会有一个优化的方案,我们知道对 Mysql 数据库更新操作后再 binlog 日志中我们都能够找到相应的操作,那么我们可以订阅 Mysql 数据库的 binlog 日志对缓存进行操作。

利用订阅 binlog 删除缓存

总结

每种方案各有利弊,比如在第二种先删除缓存,后更新数据库这个方案我们最后讨论了要更新 Redis 的时候强制走主库查询就能解决问题,那么这样的操作会对业务代码进行大量的侵入,但是不需要增加的系统,不需要增加整体的服务的复杂度。

最后一种方案我们最后讨论了利用订阅 binlog 日志进行搭建独立系统操作 Redis,这样的缺点其实就是增加了系统复杂度。其实每一次的选择都需要我们对于我们的业务进行评估来选择,没有一种技术是对于所有业务都通用的。没有最好的,只有最适合我们的。

近期热文推荐:

1.Java 15 正式发布, 14 个新特性,刷新你的认知!!

2.终于靠开源项目弄到 IntelliJ IDEA 激活码了,真香!

3.我用 Java 8 写了一段逻辑,同事直呼看不懂,你试试看。。

4.吊打 Tomcat ,Undertow 性能很炸!!

5.《Java开发手册(嵩山版)》最新发布,速速下载!

觉得不错,别忘了随手点赞+转发哦!

查看原文

赞 4 收藏 3 评论 1

KerryWu 关注了用户 · 1月18日

禾白少二 @musfsrz

关注 14

KerryWu 收藏了文章 · 1月18日

分布式事务(三)--Seata TCC模式事务

TCC 基本原理

TCC 与 Seata AT 事务一样都是两阶段事务,它与 AT 事务的主要区别为:

  • TCC 对业务代码侵入严重
    每个阶段的数据操作都要自己进行编码来实现,事务框架无法自动处理。
  • TCC 效率更高
    不必对数据加全局锁,允许多个事务同时操作数据。

a

第一阶段 Try

以账户服务为例,当下订单时要扣减用户账户金额:

a

假如用户购买 100 元商品,要扣减 100 元。

TCC 事务首先对这100元的扣减金额进行预留,或者说是先冻结这100元:

a

第二阶段 Confirm

如果第一阶段能够顺利完成,那么说明“扣减金额”业务(分支事务)最终肯定是可以成功的。当全局事务提交时, TC会控制当前分支事务进行提交,如果提交失败,TC 会反复尝试,直到提交成功为止。

当全局事务提交时,就可以使用冻结的金额来最终实现业务数据操作:
a

第二阶段 Cancel

如果全局事务回滚,就把冻结的金额进行解冻,恢复到以前的状态,TC 会控制当前分支事务回滚,如果回滚失败,TC 会反复尝试,直到回滚完成为止。

a

多个事务并发的情况

多个TCC全局事务允许并发,它们执行扣减金额时,只需要冻结各自的金额即可:

a

Seata TCC事务模式

Seata 支持 TCC 事务模式,与 AT 模式相同的,也需要以下组件来支持全局事务的控制:

  • TC 事务协调器
  • TM 事务管理器
  • RM 资源管理器

准备订单项目案例

新建 seata-tcc 工程

新建 Empty Project:
在这里插入图片描述
工程命名为 seata-tcc,存放到 seata-samples 文件夹下,与 seata-at 工程存放在一起:

a

导入订单项目,无事务版本

下载项目代码

  1. 访问 git 仓库 https://gitee.com/benwang6/seata-samples
  2. 访问项目标签
    a
  3. 下载无事务版
    a

解压到 seata-tcc 目录

压缩文件中的 7 个项目目录解压缩到 seata-tcc 目录:

a

导入项目

在 idea 中按两下 shift 键,搜索 add maven projects,打开 maven 工具:
a

然后选择 seata-tcc 工程目录下的 7 个项目的 pom.xml 导入:

a

order启动全局事务,添加“保存订单”分支事务

在订单项目中执行添加订单:

a

我们要添加以下 TCC 事务操作的代码:

  • Try - 第一阶,冻结数据阶段,向订单表直接插入订单,订单状态设置为0(冻结状态)。

a

  • Confirm - 第二阶段,提交事务,将订单状态修改成1(正常状态)。

a

  • Cancel - 第二阶段,回滚事务,删除订单。

a

order-parent 添加 seata 依赖

打开 order-parent 中注释掉的 seata 依赖:

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>cn.tedu</groupId>
    <artifactId>order-parent</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>pom</packaging>
    <name>order-parent</name>

    <properties>
        <mybatis-plus.version>3.3.2</mybatis-plus.version>
        <druid-spring-boot-starter.version>1.1.23</druid-spring-boot-starter.version>
        <seata.version>1.3.0</seata.version>
        <spring-cloud-alibaba-seata.version>2.0.0.RELEASE</spring-cloud-alibaba-seata.version>
        <spring-cloud.version>Hoxton.SR6</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-openfeign</artifactId>
        </dependency>

        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>${mybatis-plus.version}</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>${druid-spring-boot-starter.version}</version>
        </dependency>

        <!-- 打开 seata 依赖 -->
        <dependency>
          <groupId>com.alibaba.cloud</groupId>
          <artifactId>spring-cloud-alibaba-seata</artifactId>
          <version>${spring-cloud-alibaba-seata.version}</version>
          <exclusions>
            <exclusion>
              <artifactId>seata-all</artifactId>
              <groupId>io.seata</groupId>
            </exclusion>
          </exclusions>
        </dependency>
        <dependency>
          <groupId>io.seata</groupId>
          <artifactId>seata-all</artifactId>
          <version>${seata.version}</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project> 

配置

application.yml

设置全局事务组的组名:

spring:
  application:
    name: order

  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost/seata_order?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8
    username: root
    password: root

  # 事务组设置
  cloud:
    alibaba:
      seata:
        tx-service-group: order_tx_group

......

registry.conf 和 file.conf

与 AT 事务中的配置完全相同:

registry.conf

registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "eureka"

  nacos {
    serverAddr = "localhost"
    namespace = ""
    cluster = "default"
  }
  eureka {
    serviceUrl = "http://localhost:8761/eureka"
    # application = "default"
    # weight = "1"
  }
  redis {
    serverAddr = "localhost:6379"
    db = "0"
    password = ""
    cluster = "default"
    timeout = "0"
  }
  zk {
    cluster = "default"
    serverAddr = "127.0.0.1:2181"
    session.timeout = 6000
    connect.timeout = 2000
    username = ""
    password = ""
  }
  consul {
    cluster = "default"
    serverAddr = "127.0.0.1:8500"
  }
  etcd3 {
    cluster = "default"
    serverAddr = "http://localhost:2379"
  }
  sofa {
    serverAddr = "127.0.0.1:9603"
    application = "default"
    region = "DEFAULT_ZONE"
    datacenter = "DefaultDataCenter"
    cluster = "default"
    group = "SEATA_GROUP"
    addressWaitTime = "3000"
  }
  file {
    name = "file.conf"
  }
}

config {
  # file、nacos 、apollo、zk、consul、etcd3、springCloudConfig
  type = "file"

  nacos {
    serverAddr = "localhost"
    namespace = ""
    group = "SEATA_GROUP"
  }
  consul {
    serverAddr = "127.0.0.1:8500"
  }
  apollo {
    app.id = "seata-server"
    apollo.meta = "http://192.168.1.204:8801"
    namespace = "application"
  }
  zk {
    serverAddr = "127.0.0.1:2181"
    session.timeout = 6000
    connect.timeout = 2000
    username = ""
    password = ""
  }
  etcd3 {
    serverAddr = "http://localhost:2379"
  }
  file {
    name = "file.conf"
  }
}

file.conf

transport {
  # tcp udt unix-domain-socket
  type = "TCP"
  #NIO NATIVE
  server = "NIO"
  #enable heartbeat
  heartbeat = true
  # the client batch send request enable
  enableClientBatchSendRequest = true
  #thread factory for netty
  threadFactory {
    bossThreadPrefix = "NettyBoss"
    workerThreadPrefix = "NettyServerNIOWorker"
    serverExecutorThread-prefix = "NettyServerBizHandler"
    shareBossWorker = false
    clientSelectorThreadPrefix = "NettyClientSelector"
    clientSelectorThreadSize = 1
    clientWorkerThreadPrefix = "NettyClientWorkerThread"
    # netty boss thread size,will not be used for UDT
    bossThreadSize = 1
    #auto default pin or 8
    workerThreadSize = "default"
  }
  shutdown {
    # when destroy server, wait seconds
    wait = 3
  }
  serialization = "seata"
  compressor = "none"
}
service {
  #transaction service group mapping
  # order_tx_group 与 yml 中的 “tx-service-group: order_tx_group” 配置一致
  # “seata-server” 与 TC 服务器的注册名一致
  # 从eureka获取seata-server的地址,再向seata-server注册自己,设置group
  vgroupMapping.order_tx_group = "seata-server"
  #only support when registry.type=file, please don't set multiple addresses
  order_tx_group.grouplist = "127.0.0.1:8091"
  #degrade, current not support
  enableDegrade = false
  #disable seata
  disableGlobalTransaction = false
}

client {
  rm {
    asyncCommitBufferLimit = 10000
    lock {
      retryInterval = 10
      retryTimes = 30
      retryPolicyBranchRollbackOnConflict = true
    }
    reportRetryCount = 5
    tableMetaCheckEnable = false
    reportSuccessEnable = false
  }
  tm {
    commitRetryCount = 5
    rollbackRetryCount = 5
  }
  undo {
    dataValidation = true
    logSerialization = "jackson"
    logTable = "undo_log"
  }
  log {
    exceptionRate = 100
  }
}

OrderMapper 添加更新订单状态、删除订单

根据前面的分析,订单数据操作有以下三项:

  • 插入订单
  • 修改订单状态
  • 删除订单

在 OrderMapper 中已经有插入订单的方法了,现在需要添加修改订单和删除订单的方法(删除方法从BaseMapper继承):

package cn.tedu.order.mapper;

import cn.tedu.order.entity.Order;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Param;

public interface OrderMapper extends BaseMapper {
    void create(Order order);
    void updateStatus(@Param("orderId") Long orderId, @Param("status") Integer status);
}

那么对应的 OrderMapper.xml 中也要添加 sql:

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="cn.tedu.order.mapper.OrderMapper" >
    <resultMap id="BaseResultMap" type="cn.tedu.order.entity.Order" >
        <id column="id" property="id" jdbcType="BIGINT" />
        <result column="user_id" property="userId" jdbcType="BIGINT" />
        <result column="product_id" property="productId" jdbcType="BIGINT" />
        <result column="count" property="count" jdbcType="INTEGER" />
        <result column="money" property="money" jdbcType="DECIMAL" />
        <result column="status" property="status" jdbcType="INTEGER" />
    </resultMap>
    <insert id="create">
        INSERT INTO `order` (`id`,`user_id`,`product_id`,`count`,`money`,`status`)
        VALUES(#{id}, #{userId}, #{productId}, #{count}, #{money}, ${status});
    </insert>
    <update id="updateStatus" >
        UPDATE `order` SET `status`=#{status} WHERE `id`=#{orderId};
    </update>
    <delete id="deleteById">
        DELETE FROM `order` WHERE `id`=#{orderId}
    </delete>
</mapper>

Seata 实现订单的 TCC 操作方法

  • 第一阶段 Try
  • 第二阶段

    • Confirm
    • Cancel

第二阶段为了处理幂等性问题这里首先添加一个工具类 ResultHolder

这个工具也可以在第二阶段 Confirm 或 Cancel 阶段对第一阶段的成功与否进行判断,在第一阶段成功时需要保存一个标识。

ResultHolder可以为每一个全局事务保存一个标识:

package cn.tedu.order.tcc;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ResultHolder {
    private static Map<Class<?>, Map<String, String>> map = new ConcurrentHashMap<Class<?>, Map<String, String>>();

    public static void setResult(Class<?> actionClass, String xid, String v) {
        Map<String, String> results = map.get(actionClass);

        if (results == null) {
            synchronized (map) {
                if (results == null) {
                    results = new ConcurrentHashMap<>();
                    map.put(actionClass, results);
                }
            }
        }

        results.put(xid, v);
    }

    public static String getResult(Class<?> actionClass, String xid) {
        Map<String, String> results = map.get(actionClass);
        if (results != null) {
            return results.get(xid);
        }

        return null;
    }

    public static void removeResult(Class<?> actionClass, String xid) {
        Map<String, String> results = map.get(actionClass);
        if (results != null) {
            results.remove(xid);
        }
    }
}

Seata 实现 TCC 操作需要定义一个接口,我们在接口中添加以下方法:

  • Try - prepareCreateOrder()
  • Confirm - commit()
  • Cancel - rollback()
package cn.tedu.order.tcc;

import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;

import java.math.BigDecimal;

@LocalTCC
public interface OrderTccAction {
    // T (try - 预留资源,冻结订单)
    /*
    第一阶段的方法
    通过注解指定第二阶段的两个方法名
    
    BusinessActionContext 上下文对象,用来在两个阶段之间传递数据
    @BusinessActionContextParameter 注解的参数数据会被存入 BusinessActionContext
    @TwoPhaseBusinessAction 提交回滚的默认值可以不写
     */
    @TwoPhaseBusinessAction(name = "orderTccAction", commitMethod = "commit", rollbackMethod = "rollback")
    boolean prepareCreateOrder(BusinessActionContext businessActionContext,
                      @BusinessActionContextParameter(paramName = "orderId") Long orderId,
                      @BusinessActionContextParameter(paramName = "userId") Long userId,
                      @BusinessActionContextParameter(paramName = "productId") Long productId,
                      @BusinessActionContextParameter(paramName = "count") Integer count,
                      @BusinessActionContextParameter(paramName = "money") BigDecimal money);

    //C (confirm - 确认,提交)
    boolean commit(BusinessActionContext businessActionContext);

    //C (cancel - 取消,回滚)
    boolean rollback(BusinessActionContext businessActionContext);

}

实现类:

package cn.tedu.order.tcc;

import cn.tedu.order.entity.Order;
import cn.tedu.order.mapper.OrderMapper;
import io.seata.rm.tcc.api.BusinessActionContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.math.BigDecimal;
@Component
public class OrderTccActionImpl implements OrderTccAction {
    @Autowired
    private OrderMapper orderMapper;

    @Override
    public boolean prepareCreateOrder(BusinessActionContext ctx, Long orderId, Long userId, Long productId, Integer count, BigDecimal money) {
        orderMapper.create(new Order(orderId, userId, productId, count, money, 0));

        //保存一阶段的成功标记
        ResultHolder.setResult(OrderTccAction.class, ctx.getXid(), "p");
        return true;
    }

    @Override
    public boolean commit(BusinessActionContext ctx) {
        //检查标记是否存在,如果标记不存在不重复提交
        String p = ResultHolder.getResult(OrderTccAction.class, ctx.getXid());
        if (p == null){
            return true;
        }

        /**
         * 上下文对象从第一阶段向第二阶段传递时,先转成了json数据,然后还原成上下文对象
         * 其中的整数比较小的会转成Integer类型,所以如果需要Long类型,需要先转换成字符串在用Long.valueOf()解析。
         */
        Long orderId = Long.valueOf(ctx.getActionContext("orderId").toString());
        orderMapper.updateStatus(orderId, 1);

        //提交完成后,删除标记
        ResultHolder.removeResult(OrderTccAction.class, ctx.getXid());
        return true;
    }

    @Override
    public boolean rollback(BusinessActionContext ctx) {
        //第一阶段没有完成的情况下,不必执行回滚
        //因为第一阶段有本地事务,事务失败时已经进行了回滚。
        //如果这里第一阶段成功,而其他全局事务参与者失败,这里会执行回滚
        //幂等性控制:如果重复执行回滚则直接返回
        
        //检查标记是否存在,如果标记不存在不重复提交
        String p = ResultHolder.getResult(OrderTccAction.class, ctx.getXid());
        if (p == null){
            return true;
        }

        Long orderId = Long.valueOf(ctx.getActionContext("orderId").toString());
        orderMapper.deleteById(orderId);

        //提交完成后,删除标记
        ResultHolder.removeResult(OrderTccAction.class, ctx.getXid());
        return true;
    }
}

在业务代码中调用 Try 阶段方法

业务代码中不再直接保存订单数据,而是调用 TCC 第一阶段方法prepareCreateOrder(),并添加全局事务注解 @GlobalTransactional

package cn.tedu.order.service;

import cn.tedu.order.entity.Order;
import cn.tedu.order.feign.AccountClient;
import cn.tedu.order.feign.EasyIdGeneratorClient;
import cn.tedu.order.feign.StorageClient;
import cn.tedu.order.mapper.OrderMapper;
import cn.tedu.order.tcc.OrderTccAction;
import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Random;

@Service
public class OrderServiceImpl implements OrderService {
    // @Autowired
    // private OrderMapper orderMapper;
    @Autowired
    EasyIdGeneratorClient easyIdGeneratorClient;
    @Autowired
    private AccountClient accountClient;
    @Autowired
    private StorageClient storageClient;

    @Autowired
    private OrderTccAction orderTccAction;

    @GlobalTransactional
    @Override
    public void create(Order order) {
        // 从全局唯一id发号器获得id
        Long orderId = easyIdGeneratorClient.nextId("order_business");
        order.setId(orderId);

        // orderMapper.create(order);

        // 这里修改成调用 TCC 第一节端方法
        //orderTccAction是一个动态代理对象,其中添加了前置拦截器,所以底层会创建,所以传null值即可
        orderTccAction.prepareCreateOrder(
                null,
                order.getId(),
                order.getUserId(),
                order.getProductId(),
                order.getCount(),
                order.getMoney());

        // 修改库存
        //storageClient.decrease(order.getProductId(), order.getCount());

        // 修改账户余额
        //accountClient.decrease(order.getUserId(), order.getMoney());

    }
}

启动 order 进行测试

按顺序启动服务:

  1. Eureka
  2. Seata Server
  3. Easy Id Generator
  4. Order

调用保存订单,地址:
http://localhost:8083/create?userId=1&productId=1&count=10&money=100

观察控制台日志:
a

查看数据库表中的订单数据:

a

storage添加“减少库存”分支事务

在库存项目中执行减少库存:

a

我们要添加以下 TCC 事务操作的代码:

  • Try - 第一阶,冻结数据阶段,将要减少的库存量先冻结:

a

  • Confirm - 第二阶段,提交事务,使用冻结的库存完成业务数据处理:

a

  • Cancel - 第二阶段,回滚事务,冻结的库存解冻,恢复以前的库存量:

a

配置

有三个文件需要配置:

  • application.yml
  • registry.conf
  • file.conf

这三个文件的设置与上面 order 项目的配置完全相同,请参考上面订单配置一章进行配置。

StorageMapper 添加冻结库存相关方法

根据前面的分析,库存数据操作有以下三项:

  • 冻结库存
  • 冻结库存量修改为已售出量
  • 解冻库存

在 StorageMapper 中添加三个方法:

package cn.tedu.storage.mapper;

import cn.tedu.storage.entity.Storage;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Param;

public interface StorageMapper extends BaseMapper<Storage> {
    void decrease(Long productId, Integer count);
    // 冻结库存
    void updateFrozen(@Param("productId") Long productId, @Param("residue") Integer residue, @Param("frozen") Integer frozen);
    // 提交时,把冻结量修改到已售出
    void updateFrozenToUsed(@Param("productId") Long productId, @Param("count") Integer count);
    // 回滚时,把冻结量修改到可用库存
    void updateFrozenToResidue(@Param("productId") Long productId, @Param("count") Integer count);
} 

那么对应的 StorageMapper.xml 中也要添加 sql:

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="cn.tedu.storage.mapper.StorageMapper" >
    <resultMap id="BaseResultMap" type="cn.tedu.storage.entity.Storage" >
        <id column="id" property="id" jdbcType="BIGINT" />
        <result column="product_id" property="productId" jdbcType="BIGINT" />
        <result column="total" property="total" jdbcType="INTEGER" />
        <result column="used" property="used" jdbcType="INTEGER" />
        <result column="residue" property="residue" jdbcType="INTEGER" />
    </resultMap>
    <update id="decrease">
    UPDATE storage SET used = used + #{count},residue = residue - #{count} WHERE product_id = #{productId}
  </update>
    <select id="selectById" resultMap="BaseResultMap">
        SELECT * FROM storage WHERE `product_id`=#{productId}
    </select>

    <update id="updateFrozen">
        UPDATE storage SET `residue`=#{residue},`frozen`=#{frozen} WHERE `product_id`=#{productId}
    </update>

    <update id="updateFrozenToUsed">
        UPDATE storage SET `frozen`=`frozen`-#{count}, `used`=`used`+#{count} WHERE `product_id`=#{productId}
    </update>

    <update id="updateFrozenToResidue">
        UPDATE storage SET `frozen`=`frozen`-#{count}, `residue`=`residue`+#{count} WHERE `product_id`=#{productId}
    </update>
</mapper>

Seata 实现库存的 TCC 操作方法

工具类 ResultHolder

package cn.tedu.storage.tcc;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ResultHolder {
    private static Map<Class<?>, Map<String, String>> map = new ConcurrentHashMap<Class<?>, Map<String, String>>();

    public static void setResult(Class<?> actionClass, String xid, String v) {
        Map<String, String> results = map.get(actionClass);

        if (results == null) {
            synchronized (map) {
                if (results == null) {
                    results = new ConcurrentHashMap<>();
                    map.put(actionClass, results);
                }
            }
        }

        results.put(xid, v);
    }

    public static String getResult(Class<?> actionClass, String xid) {
        Map<String, String> results = map.get(actionClass);
        if (results != null) {
            return results.get(xid);
        }

        return null;
    }

    public static void removeResult(Class<?> actionClass, String xid) {
        Map<String, String> results = map.get(actionClass);
        if (results != null) {
            results.remove(xid);
        }
    }
}

添加 TCC 接口,在接口中添加以下方法:

  • Try - prepareDecreaseStorage()
  • Confirm - commit()
  • Cancel - rollback()
package cn.tedu.storage.tcc;

import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;

@LocalTCC
public interface StorageTccAction {

    @TwoPhaseBusinessAction(name = "storageTccAction", commitMethod = "commit", rollbackMethod = "rollback")
    boolean prepareDecreaseStorage(BusinessActionContext businessActionContext,
                                   @BusinessActionContextParameter(paramName = "productId") Long productId,
                                   @BusinessActionContextParameter(paramName = "count") Integer count);

    boolean commit(BusinessActionContext businessActionContext);

    boolean rollback(BusinessActionContext businessActionContext);

}

实现类:

package cn.tedu.storage.tcc;

import cn.tedu.storage.entity.Storage;
import cn.tedu.storage.mapper.StorageMapper;
import io.seata.rm.tcc.api.BusinessActionContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

@Component
@Slf4j
public class StorageTccActionImpl implements StorageTccAction {

    @Autowired
    private StorageMapper storageMapper;

    @Transactional
    @Override
    public boolean prepareDecreaseStorage(BusinessActionContext businessActionContext, Long productId, Integer count) {
        log.info("减少商品库存,第一阶段,锁定减少的库存量,productId="+productId+", count="+count);

        Storage storage = storageMapper.selectById(productId);
        if (storage.getResidue()-count<0) {
            throw new RuntimeException("库存不足");
        }

        /*
        库存减掉count, 冻结库存增加count
         */
        storageMapper.updateFrozen(productId, storage.getResidue()-count, storage.getFrozen()+count);

        //保存标识
        ResultHolder.setResult(getClass(), businessActionContext.getXid(), "p");
        return true;
    }

    @Transactional
    @Override
    public boolean commit(BusinessActionContext businessActionContext) {
        long productId = Long.parseLong(businessActionContext.getActionContext("productId").toString());
        int count = Integer.parseInt(businessActionContext.getActionContext("count").toString());
        log.info("减少商品库存,第二阶段提交,productId="+productId+", count="+count);

        //防止重复提交
        if (ResultHolder.getResult(getClass(), businessActionContext.getXid()) == null) {
            return true;
        }

        storageMapper.updateFrozenToUsed(productId, count);

        //删除标识
        ResultHolder.removeResult(getClass(), businessActionContext.getXid());
        return true;
    }

    @Transactional
    @Override
    public boolean rollback(BusinessActionContext businessActionContext) {

        long productId = Long.parseLong(businessActionContext.getActionContext("productId").toString());
        int count = Integer.parseInt(businessActionContext.getActionContext("count").toString());
        log.info("减少商品库存,第二阶段,回滚,productId="+productId+", count="+count);

        //防止重复回滚
        if (ResultHolder.getResult(getClass(), businessActionContext.getXid()) == null) {
            return true;
        }

        storageMapper.updateFrozenToResidue(productId, count);

        //删除标识
        ResultHolder.removeResult(getClass(), businessActionContext.getXid());
        return true;
    }
} 

在业务代码中调用 Try 阶段方法

业务代码中调用 TCC 第一阶段方法prepareDecreaseStorage(),并添加全局事务注解 @GlobalTransactional

package cn.tedu.storage.service;

import cn.tedu.storage.tcc.StorageTccAction;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class StorageServiceImpl implements StorageService {
    // @Autowired
    // private StorageMapper storageMapper;

    @Autowired
    private StorageTccAction storageTccAction;

    @Override
    public void decrease(Long productId, Integer count) throws Exception {
        // storageMapper.decrease(productId,count);
        storageTccAction.prepareDecreaseStorage(null, productId, count);
    }

}

启动 storage 进行测试

按顺序启动服务:

  1. Eureka
  2. Seata Server
  3. Easy Id Generator
  4. Storage
  5. Order

调用保存订单,地址:
http://localhost:8083/create?userId=1&productId=1&count=10&money=100

观察 storage 的控制台日志:
a

查看数据库表中的库存数据:

a

account添加“扣减金额”分支事务

扣减金额 TCC 事务分析请见《分布式事务(六)Seata TCC模式-TCC模式介绍

配置

有三个文件需要配置:

  • application.yml
  • registry.conf
  • file.conf

这三个文件的设置与上面 order 项目的配置完全相同,请参考上面订单配置一章进行配置。

AccountMapper 添加冻结库存相关方法

根据前面的分析,库存数据操作有以下三项:

  • 冻结库存
  • 冻结库存量修改为已售出量
  • 解冻库存

在 AccountMapper 中添加三个方法:

package cn.tedu.account.mapper;

import cn.tedu.account.entity.Account;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Param;

import java.math.BigDecimal;

public interface AccountMapper extends BaseMapper<Account> {
    void decrease(Long userId, BigDecimal money);

    void updateFrozen(@Param("userId") Long userId, @Param("residue") BigDecimal residue, @Param("frozen") BigDecimal frozen);

    void updateFrozenToUsed(@Param("userId") Long userId, @Param("money") BigDecimal money);

    void updateFrozenToResidue(@Param("userId") Long userId, @Param("money") BigDecimal money);
}

那么对应的 AccountMapper.xml 中添加 sql:

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="cn.tedu.account.mapper.AccountMapper" >
    <resultMap id="BaseResultMap" type="cn.tedu.account.entity.Account" >
        <id column="id" property="id" jdbcType="BIGINT" />
        <result column="user_id" property="userId" jdbcType="BIGINT" />
        <result column="total" property="total" jdbcType="DECIMAL" />
        <result column="used" property="used" jdbcType="DECIMAL" />
        <result column="residue" property="residue" jdbcType="DECIMAL"/>
        <result column="frozen" property="frozen" jdbcType="DECIMAL"/>
    </resultMap>
    <update id="decrease">
      UPDATE account SET residue = residue - #{money},used = used + #{money} where user_id = #{userId};
    </update>
    <select id="selectById" resultMap="BaseResultMap">
        SELECT * FROM account WHERE `user_id`=#{userId}
    </select>

    <update id="updateFrozen">
        UPDATE account SET `residue`=#{residue},`frozen`=#{frozen} WHERE `user_id`=#{userId}
    </update>

    <update id="updateFrozenToUsed">
        UPDATE account SET `frozen`=`frozen`-#{money}, `used`=`used`+#{money} WHERE `user_id`=#{userId}
    </update>

    <update id="updateFrozenToResidue">
        UPDATE account SET `frozen`=`frozen`-#{money}, `residue`=`residue`+#{money} WHERE `user_id`=#{userId}
    </update>
</mapper>

Seata 实现库存的 TCC 操作方法

工具类 ResultHolder

package cn.tedu.account.tcc;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ResultHolder {
    private static Map<Class<?>, Map<String, String>> map = new ConcurrentHashMap<Class<?>, Map<String, String>>();

    public static void setResult(Class<?> actionClass, String xid, String v) {
        Map<String, String> results = map.get(actionClass);

        if (results == null) {
            synchronized (map) {
                if (results == null) {
                    results = new ConcurrentHashMap<>();
                    map.put(actionClass, results);
                }
            }
        }

        results.put(xid, v);
    }

    public static String getResult(Class<?> actionClass, String xid) {
        Map<String, String> results = map.get(actionClass);
        if (results != null) {
            return results.get(xid);
        }

        return null;
    }

    public static void removeResult(Class<?> actionClass, String xid) {
        Map<String, String> results = map.get(actionClass);
        if (results != null) {
            results.remove(xid);
        }
    }
}

添加 TCC 接口,在接口中添加以下方法:

  • Try - prepareDecreaseAccount()
  • Confirm - commit()
  • Cancel - rollback()
package cn.tedu.account.tcc;

import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;

import java.math.BigDecimal;

@LocalTCC
public interface AccountTccAction {

    @TwoPhaseBusinessAction(name = "accountTccAction", commitMethod = "commit", rollbackMethod = "rollback")
    boolean prepareDecreaseAccount(BusinessActionContext businessActionContext,
                                   @BusinessActionContextParameter(paramName = "userId") Long userId,
                                   @BusinessActionContextParameter(paramName = "money") BigDecimal money);

    boolean commit(BusinessActionContext businessActionContext);

    boolean rollback(BusinessActionContext businessActionContext);

}

实现类:

package cn.tedu.account.tcc;

import cn.tedu.account.entity.Account;
import cn.tedu.account.mapper.AccountMapper;
import io.seata.rm.tcc.api.BusinessActionContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.math.BigDecimal;

@Component
@Slf4j
public class AccountTccActionImpl implements AccountTccAction {
    @Autowired
    private AccountMapper accountMapper;

    @Transactional
    @Override
    public boolean prepareDecreaseAccount(BusinessActionContext businessActionContext, Long userId, BigDecimal money) {
        log.info("减少账户金额,第一阶段锁定金额,userId="+userId+", money="+money);

        Account account = accountMapper.selectById(userId);
        if (account.getResidue().compareTo(money) < 0) {
            throw new RuntimeException("账户金额不足");
        }

        /*
        余额-money
        冻结+money
         */
        accountMapper.updateFrozen(userId, account.getResidue().subtract(money), account.getFrozen().add(money));

        //保存标识
        ResultHolder.setResult(getClass(), businessActionContext.getXid(), "p");
        return true;
    }

    @Transactional
    @Override
    public boolean commit(BusinessActionContext businessActionContext) {

        long userId = Long.parseLong(businessActionContext.getActionContext("userId").toString());
        BigDecimal money =  new BigDecimal(businessActionContext.getActionContext("money").toString());
        log.info("减少账户金额,第二阶段,提交,userId="+userId+", money="+money);

        //防止重复提交
        if (ResultHolder.getResult(getClass(), businessActionContext.getXid()) == null) {
            return true;
        }

        accountMapper.updateFrozenToUsed(userId, money);

        //删除标识
        ResultHolder.removeResult(getClass(), businessActionContext.getXid());
        return true;
    }

    @Transactional
    @Override
    public boolean rollback(BusinessActionContext businessActionContext) {
        long userId = Long.parseLong(businessActionContext.getActionContext("userId").toString());
        BigDecimal money =  new BigDecimal(businessActionContext.getActionContext("money").toString());

        //防止重复回滚
        if (ResultHolder.getResult(getClass(), businessActionContext.getXid()) == null) {
            return true;
        }

        log.info("减少账户金额,第二阶段,回滚,userId="+userId+", money="+money);

        accountMapper.updateFrozenToResidue(userId, money);

        //删除标识
        ResultHolder.removeResult(getClass(), businessActionContext.getXid());
        return true;
    }
}

在业务代码中调用 Try 阶段方法

业务代码中调用 TCC 第一阶段方法prepareDecreaseAccount(),并添加全局事务注解 @GlobalTransactional

package cn.tedu.account.service;

import cn.tedu.account.mapper.AccountMapper;
import cn.tedu.account.tcc.AccountTccAction;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.math.BigDecimal;
@Service
public class AccountServiceImpl implements AccountService {
    // @Autowired
    // private AccountMapper accountMapper;

    @Autowired
    private AccountTccAction accountTccAction;

    @Override
    public void decrease(Long userId, BigDecimal money) {
        // accountMapper.decrease(userId,money);
        accountTccAction.prepareDecreaseAccount(null, userId, money);
    }
}

启动 account 进行测试

按顺序启动服务:

  1. Eureka
  2. Seata Server
  3. Easy Id Generator
  4. Storage
  5. Account
  6. Order

调用保存订单,地址:
http://localhost:8083/create?userId=1&productId=1&count=10&money=100

观察 account 的控制台日志:
a

查看数据库表中的账户数据:

a

全局事务回滚测试

下面来测试全局事务回滚的情况。

订单和库存第一阶段成功,而账户第一阶段失败了,这时会触发全局事务的回滚,如下图所示:

a
首先在 account 的第一阶段代码中添加模拟异常:

AccountTccActionImplprepareDecreaseAccount 方法

@Transactional
    @Override
    public boolean prepareDecreaseAccount(BusinessActionContext businessActionContext, Long userId, BigDecimal money) {
        log.info("减少账户金额,第一阶段锁定金额,userId="+userId+", money="+money);

        Account account = accountMapper.selectById(userId);
        if (account.getResidue().compareTo(money) < 0) {
            throw new RuntimeException("账户金额不足");
        }

        /*
        余额-money
        冻结+money
         */
        accountMapper.updateFrozen(userId, account.getResidue().subtract(money), account.getFrozen().add(money));

        if (Math.random() < 0.5) {
            throw new RuntimeException("模拟异常");
        }

        //保存标识
        ResultHolder.setResult(getClass(), businessActionContext.getXid(), "p");
        return true;
    }

重启 account 后,访问订单:
http://localhost:8083/create?userId=1&productId=1&count=10&money=100

查看控制台,可以看到 storage 和 order 的回滚日志,order 的回滚日志如下:

a

查看原文

KerryWu 赞了文章 · 1月18日

分布式事务(三)--Seata TCC模式事务

TCC 基本原理

TCC 与 Seata AT 事务一样都是两阶段事务,它与 AT 事务的主要区别为:

  • TCC 对业务代码侵入严重
    每个阶段的数据操作都要自己进行编码来实现,事务框架无法自动处理。
  • TCC 效率更高
    不必对数据加全局锁,允许多个事务同时操作数据。

a

第一阶段 Try

以账户服务为例,当下订单时要扣减用户账户金额:

a

假如用户购买 100 元商品,要扣减 100 元。

TCC 事务首先对这100元的扣减金额进行预留,或者说是先冻结这100元:

a

第二阶段 Confirm

如果第一阶段能够顺利完成,那么说明“扣减金额”业务(分支事务)最终肯定是可以成功的。当全局事务提交时, TC会控制当前分支事务进行提交,如果提交失败,TC 会反复尝试,直到提交成功为止。

当全局事务提交时,就可以使用冻结的金额来最终实现业务数据操作:
a

第二阶段 Cancel

如果全局事务回滚,就把冻结的金额进行解冻,恢复到以前的状态,TC 会控制当前分支事务回滚,如果回滚失败,TC 会反复尝试,直到回滚完成为止。

a

多个事务并发的情况

多个TCC全局事务允许并发,它们执行扣减金额时,只需要冻结各自的金额即可:

a

Seata TCC事务模式

Seata 支持 TCC 事务模式,与 AT 模式相同的,也需要以下组件来支持全局事务的控制:

  • TC 事务协调器
  • TM 事务管理器
  • RM 资源管理器

准备订单项目案例

新建 seata-tcc 工程

新建 Empty Project:
在这里插入图片描述
工程命名为 seata-tcc,存放到 seata-samples 文件夹下,与 seata-at 工程存放在一起:

a

导入订单项目,无事务版本

下载项目代码

  1. 访问 git 仓库 https://gitee.com/benwang6/seata-samples
  2. 访问项目标签
    a
  3. 下载无事务版
    a

解压到 seata-tcc 目录

压缩文件中的 7 个项目目录解压缩到 seata-tcc 目录:

a

导入项目

在 idea 中按两下 shift 键,搜索 add maven projects,打开 maven 工具:
a

然后选择 seata-tcc 工程目录下的 7 个项目的 pom.xml 导入:

a

order启动全局事务,添加“保存订单”分支事务

在订单项目中执行添加订单:

a

我们要添加以下 TCC 事务操作的代码:

  • Try - 第一阶,冻结数据阶段,向订单表直接插入订单,订单状态设置为0(冻结状态)。

a

  • Confirm - 第二阶段,提交事务,将订单状态修改成1(正常状态)。

a

  • Cancel - 第二阶段,回滚事务,删除订单。

a

order-parent 添加 seata 依赖

打开 order-parent 中注释掉的 seata 依赖:

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>cn.tedu</groupId>
    <artifactId>order-parent</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>pom</packaging>
    <name>order-parent</name>

    <properties>
        <mybatis-plus.version>3.3.2</mybatis-plus.version>
        <druid-spring-boot-starter.version>1.1.23</druid-spring-boot-starter.version>
        <seata.version>1.3.0</seata.version>
        <spring-cloud-alibaba-seata.version>2.0.0.RELEASE</spring-cloud-alibaba-seata.version>
        <spring-cloud.version>Hoxton.SR6</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-openfeign</artifactId>
        </dependency>

        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>${mybatis-plus.version}</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>${druid-spring-boot-starter.version}</version>
        </dependency>

        <!-- 打开 seata 依赖 -->
        <dependency>
          <groupId>com.alibaba.cloud</groupId>
          <artifactId>spring-cloud-alibaba-seata</artifactId>
          <version>${spring-cloud-alibaba-seata.version}</version>
          <exclusions>
            <exclusion>
              <artifactId>seata-all</artifactId>
              <groupId>io.seata</groupId>
            </exclusion>
          </exclusions>
        </dependency>
        <dependency>
          <groupId>io.seata</groupId>
          <artifactId>seata-all</artifactId>
          <version>${seata.version}</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project> 

配置

application.yml

设置全局事务组的组名:

spring:
  application:
    name: order

  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost/seata_order?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8
    username: root
    password: root

  # 事务组设置
  cloud:
    alibaba:
      seata:
        tx-service-group: order_tx_group

......

registry.conf 和 file.conf

与 AT 事务中的配置完全相同:

registry.conf

registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "eureka"

  nacos {
    serverAddr = "localhost"
    namespace = ""
    cluster = "default"
  }
  eureka {
    serviceUrl = "http://localhost:8761/eureka"
    # application = "default"
    # weight = "1"
  }
  redis {
    serverAddr = "localhost:6379"
    db = "0"
    password = ""
    cluster = "default"
    timeout = "0"
  }
  zk {
    cluster = "default"
    serverAddr = "127.0.0.1:2181"
    session.timeout = 6000
    connect.timeout = 2000
    username = ""
    password = ""
  }
  consul {
    cluster = "default"
    serverAddr = "127.0.0.1:8500"
  }
  etcd3 {
    cluster = "default"
    serverAddr = "http://localhost:2379"
  }
  sofa {
    serverAddr = "127.0.0.1:9603"
    application = "default"
    region = "DEFAULT_ZONE"
    datacenter = "DefaultDataCenter"
    cluster = "default"
    group = "SEATA_GROUP"
    addressWaitTime = "3000"
  }
  file {
    name = "file.conf"
  }
}

config {
  # file、nacos 、apollo、zk、consul、etcd3、springCloudConfig
  type = "file"

  nacos {
    serverAddr = "localhost"
    namespace = ""
    group = "SEATA_GROUP"
  }
  consul {
    serverAddr = "127.0.0.1:8500"
  }
  apollo {
    app.id = "seata-server"
    apollo.meta = "http://192.168.1.204:8801"
    namespace = "application"
  }
  zk {
    serverAddr = "127.0.0.1:2181"
    session.timeout = 6000
    connect.timeout = 2000
    username = ""
    password = ""
  }
  etcd3 {
    serverAddr = "http://localhost:2379"
  }
  file {
    name = "file.conf"
  }
}

file.conf

transport {
  # tcp udt unix-domain-socket
  type = "TCP"
  #NIO NATIVE
  server = "NIO"
  #enable heartbeat
  heartbeat = true
  # the client batch send request enable
  enableClientBatchSendRequest = true
  #thread factory for netty
  threadFactory {
    bossThreadPrefix = "NettyBoss"
    workerThreadPrefix = "NettyServerNIOWorker"
    serverExecutorThread-prefix = "NettyServerBizHandler"
    shareBossWorker = false
    clientSelectorThreadPrefix = "NettyClientSelector"
    clientSelectorThreadSize = 1
    clientWorkerThreadPrefix = "NettyClientWorkerThread"
    # netty boss thread size,will not be used for UDT
    bossThreadSize = 1
    #auto default pin or 8
    workerThreadSize = "default"
  }
  shutdown {
    # when destroy server, wait seconds
    wait = 3
  }
  serialization = "seata"
  compressor = "none"
}
service {
  #transaction service group mapping
  # order_tx_group 与 yml 中的 “tx-service-group: order_tx_group” 配置一致
  # “seata-server” 与 TC 服务器的注册名一致
  # 从eureka获取seata-server的地址,再向seata-server注册自己,设置group
  vgroupMapping.order_tx_group = "seata-server"
  #only support when registry.type=file, please don't set multiple addresses
  order_tx_group.grouplist = "127.0.0.1:8091"
  #degrade, current not support
  enableDegrade = false
  #disable seata
  disableGlobalTransaction = false
}

client {
  rm {
    asyncCommitBufferLimit = 10000
    lock {
      retryInterval = 10
      retryTimes = 30
      retryPolicyBranchRollbackOnConflict = true
    }
    reportRetryCount = 5
    tableMetaCheckEnable = false
    reportSuccessEnable = false
  }
  tm {
    commitRetryCount = 5
    rollbackRetryCount = 5
  }
  undo {
    dataValidation = true
    logSerialization = "jackson"
    logTable = "undo_log"
  }
  log {
    exceptionRate = 100
  }
}

OrderMapper 添加更新订单状态、删除订单

根据前面的分析,订单数据操作有以下三项:

  • 插入订单
  • 修改订单状态
  • 删除订单

在 OrderMapper 中已经有插入订单的方法了,现在需要添加修改订单和删除订单的方法(删除方法从BaseMapper继承):

package cn.tedu.order.mapper;

import cn.tedu.order.entity.Order;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Param;

public interface OrderMapper extends BaseMapper {
    void create(Order order);
    void updateStatus(@Param("orderId") Long orderId, @Param("status") Integer status);
}

那么对应的 OrderMapper.xml 中也要添加 sql:

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="cn.tedu.order.mapper.OrderMapper" >
    <resultMap id="BaseResultMap" type="cn.tedu.order.entity.Order" >
        <id column="id" property="id" jdbcType="BIGINT" />
        <result column="user_id" property="userId" jdbcType="BIGINT" />
        <result column="product_id" property="productId" jdbcType="BIGINT" />
        <result column="count" property="count" jdbcType="INTEGER" />
        <result column="money" property="money" jdbcType="DECIMAL" />
        <result column="status" property="status" jdbcType="INTEGER" />
    </resultMap>
    <insert id="create">
        INSERT INTO `order` (`id`,`user_id`,`product_id`,`count`,`money`,`status`)
        VALUES(#{id}, #{userId}, #{productId}, #{count}, #{money}, ${status});
    </insert>
    <update id="updateStatus" >
        UPDATE `order` SET `status`=#{status} WHERE `id`=#{orderId};
    </update>
    <delete id="deleteById">
        DELETE FROM `order` WHERE `id`=#{orderId}
    </delete>
</mapper>

Seata 实现订单的 TCC 操作方法

  • 第一阶段 Try
  • 第二阶段

    • Confirm
    • Cancel

第二阶段为了处理幂等性问题这里首先添加一个工具类 ResultHolder

这个工具也可以在第二阶段 Confirm 或 Cancel 阶段对第一阶段的成功与否进行判断,在第一阶段成功时需要保存一个标识。

ResultHolder可以为每一个全局事务保存一个标识:

package cn.tedu.order.tcc;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ResultHolder {
    private static Map<Class<?>, Map<String, String>> map = new ConcurrentHashMap<Class<?>, Map<String, String>>();

    public static void setResult(Class<?> actionClass, String xid, String v) {
        Map<String, String> results = map.get(actionClass);

        if (results == null) {
            synchronized (map) {
                if (results == null) {
                    results = new ConcurrentHashMap<>();
                    map.put(actionClass, results);
                }
            }
        }

        results.put(xid, v);
    }

    public static String getResult(Class<?> actionClass, String xid) {
        Map<String, String> results = map.get(actionClass);
        if (results != null) {
            return results.get(xid);
        }

        return null;
    }

    public static void removeResult(Class<?> actionClass, String xid) {
        Map<String, String> results = map.get(actionClass);
        if (results != null) {
            results.remove(xid);
        }
    }
}

Seata 实现 TCC 操作需要定义一个接口,我们在接口中添加以下方法:

  • Try - prepareCreateOrder()
  • Confirm - commit()
  • Cancel - rollback()
package cn.tedu.order.tcc;

import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;

import java.math.BigDecimal;

@LocalTCC
public interface OrderTccAction {
    // T (try - 预留资源,冻结订单)
    /*
    第一阶段的方法
    通过注解指定第二阶段的两个方法名
    
    BusinessActionContext 上下文对象,用来在两个阶段之间传递数据
    @BusinessActionContextParameter 注解的参数数据会被存入 BusinessActionContext
    @TwoPhaseBusinessAction 提交回滚的默认值可以不写
     */
    @TwoPhaseBusinessAction(name = "orderTccAction", commitMethod = "commit", rollbackMethod = "rollback")
    boolean prepareCreateOrder(BusinessActionContext businessActionContext,
                      @BusinessActionContextParameter(paramName = "orderId") Long orderId,
                      @BusinessActionContextParameter(paramName = "userId") Long userId,
                      @BusinessActionContextParameter(paramName = "productId") Long productId,
                      @BusinessActionContextParameter(paramName = "count") Integer count,
                      @BusinessActionContextParameter(paramName = "money") BigDecimal money);

    //C (confirm - 确认,提交)
    boolean commit(BusinessActionContext businessActionContext);

    //C (cancel - 取消,回滚)
    boolean rollback(BusinessActionContext businessActionContext);

}

实现类:

package cn.tedu.order.tcc;

import cn.tedu.order.entity.Order;
import cn.tedu.order.mapper.OrderMapper;
import io.seata.rm.tcc.api.BusinessActionContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.math.BigDecimal;
@Component
public class OrderTccActionImpl implements OrderTccAction {
    @Autowired
    private OrderMapper orderMapper;

    @Override
    public boolean prepareCreateOrder(BusinessActionContext ctx, Long orderId, Long userId, Long productId, Integer count, BigDecimal money) {
        orderMapper.create(new Order(orderId, userId, productId, count, money, 0));

        //保存一阶段的成功标记
        ResultHolder.setResult(OrderTccAction.class, ctx.getXid(), "p");
        return true;
    }

    @Override
    public boolean commit(BusinessActionContext ctx) {
        //检查标记是否存在,如果标记不存在不重复提交
        String p = ResultHolder.getResult(OrderTccAction.class, ctx.getXid());
        if (p == null){
            return true;
        }

        /**
         * 上下文对象从第一阶段向第二阶段传递时,先转成了json数据,然后还原成上下文对象
         * 其中的整数比较小的会转成Integer类型,所以如果需要Long类型,需要先转换成字符串在用Long.valueOf()解析。
         */
        Long orderId = Long.valueOf(ctx.getActionContext("orderId").toString());
        orderMapper.updateStatus(orderId, 1);

        //提交完成后,删除标记
        ResultHolder.removeResult(OrderTccAction.class, ctx.getXid());
        return true;
    }

    @Override
    public boolean rollback(BusinessActionContext ctx) {
        //第一阶段没有完成的情况下,不必执行回滚
        //因为第一阶段有本地事务,事务失败时已经进行了回滚。
        //如果这里第一阶段成功,而其他全局事务参与者失败,这里会执行回滚
        //幂等性控制:如果重复执行回滚则直接返回
        
        //检查标记是否存在,如果标记不存在不重复提交
        String p = ResultHolder.getResult(OrderTccAction.class, ctx.getXid());
        if (p == null){
            return true;
        }

        Long orderId = Long.valueOf(ctx.getActionContext("orderId").toString());
        orderMapper.deleteById(orderId);

        //提交完成后,删除标记
        ResultHolder.removeResult(OrderTccAction.class, ctx.getXid());
        return true;
    }
}

在业务代码中调用 Try 阶段方法

业务代码中不再直接保存订单数据,而是调用 TCC 第一阶段方法prepareCreateOrder(),并添加全局事务注解 @GlobalTransactional

package cn.tedu.order.service;

import cn.tedu.order.entity.Order;
import cn.tedu.order.feign.AccountClient;
import cn.tedu.order.feign.EasyIdGeneratorClient;
import cn.tedu.order.feign.StorageClient;
import cn.tedu.order.mapper.OrderMapper;
import cn.tedu.order.tcc.OrderTccAction;
import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Random;

@Service
public class OrderServiceImpl implements OrderService {
    // @Autowired
    // private OrderMapper orderMapper;
    @Autowired
    EasyIdGeneratorClient easyIdGeneratorClient;
    @Autowired
    private AccountClient accountClient;
    @Autowired
    private StorageClient storageClient;

    @Autowired
    private OrderTccAction orderTccAction;

    @GlobalTransactional
    @Override
    public void create(Order order) {
        // 从全局唯一id发号器获得id
        Long orderId = easyIdGeneratorClient.nextId("order_business");
        order.setId(orderId);

        // orderMapper.create(order);

        // 这里修改成调用 TCC 第一节端方法
        //orderTccAction是一个动态代理对象,其中添加了前置拦截器,所以底层会创建,所以传null值即可
        orderTccAction.prepareCreateOrder(
                null,
                order.getId(),
                order.getUserId(),
                order.getProductId(),
                order.getCount(),
                order.getMoney());

        // 修改库存
        //storageClient.decrease(order.getProductId(), order.getCount());

        // 修改账户余额
        //accountClient.decrease(order.getUserId(), order.getMoney());

    }
}

启动 order 进行测试

按顺序启动服务:

  1. Eureka
  2. Seata Server
  3. Easy Id Generator
  4. Order

调用保存订单,地址:
http://localhost:8083/create?userId=1&productId=1&count=10&money=100

观察控制台日志:
a

查看数据库表中的订单数据:

a

storage添加“减少库存”分支事务

在库存项目中执行减少库存:

a

我们要添加以下 TCC 事务操作的代码:

  • Try - 第一阶,冻结数据阶段,将要减少的库存量先冻结:

a

  • Confirm - 第二阶段,提交事务,使用冻结的库存完成业务数据处理:

a

  • Cancel - 第二阶段,回滚事务,冻结的库存解冻,恢复以前的库存量:

a

配置

有三个文件需要配置:

  • application.yml
  • registry.conf
  • file.conf

这三个文件的设置与上面 order 项目的配置完全相同,请参考上面订单配置一章进行配置。

StorageMapper 添加冻结库存相关方法

根据前面的分析,库存数据操作有以下三项:

  • 冻结库存
  • 冻结库存量修改为已售出量
  • 解冻库存

在 StorageMapper 中添加三个方法:

package cn.tedu.storage.mapper;

import cn.tedu.storage.entity.Storage;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Param;

public interface StorageMapper extends BaseMapper<Storage> {
    void decrease(Long productId, Integer count);
    // 冻结库存
    void updateFrozen(@Param("productId") Long productId, @Param("residue") Integer residue, @Param("frozen") Integer frozen);
    // 提交时,把冻结量修改到已售出
    void updateFrozenToUsed(@Param("productId") Long productId, @Param("count") Integer count);
    // 回滚时,把冻结量修改到可用库存
    void updateFrozenToResidue(@Param("productId") Long productId, @Param("count") Integer count);
} 

那么对应的 StorageMapper.xml 中也要添加 sql:

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="cn.tedu.storage.mapper.StorageMapper" >
    <resultMap id="BaseResultMap" type="cn.tedu.storage.entity.Storage" >
        <id column="id" property="id" jdbcType="BIGINT" />
        <result column="product_id" property="productId" jdbcType="BIGINT" />
        <result column="total" property="total" jdbcType="INTEGER" />
        <result column="used" property="used" jdbcType="INTEGER" />
        <result column="residue" property="residue" jdbcType="INTEGER" />
    </resultMap>
    <update id="decrease">
    UPDATE storage SET used = used + #{count},residue = residue - #{count} WHERE product_id = #{productId}
  </update>
    <select id="selectById" resultMap="BaseResultMap">
        SELECT * FROM storage WHERE `product_id`=#{productId}
    </select>

    <update id="updateFrozen">
        UPDATE storage SET `residue`=#{residue},`frozen`=#{frozen} WHERE `product_id`=#{productId}
    </update>

    <update id="updateFrozenToUsed">
        UPDATE storage SET `frozen`=`frozen`-#{count}, `used`=`used`+#{count} WHERE `product_id`=#{productId}
    </update>

    <update id="updateFrozenToResidue">
        UPDATE storage SET `frozen`=`frozen`-#{count}, `residue`=`residue`+#{count} WHERE `product_id`=#{productId}
    </update>
</mapper>

Seata 实现库存的 TCC 操作方法

工具类 ResultHolder

package cn.tedu.storage.tcc;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ResultHolder {
    private static Map<Class<?>, Map<String, String>> map = new ConcurrentHashMap<Class<?>, Map<String, String>>();

    public static void setResult(Class<?> actionClass, String xid, String v) {
        Map<String, String> results = map.get(actionClass);

        if (results == null) {
            synchronized (map) {
                if (results == null) {
                    results = new ConcurrentHashMap<>();
                    map.put(actionClass, results);
                }
            }
        }

        results.put(xid, v);
    }

    public static String getResult(Class<?> actionClass, String xid) {
        Map<String, String> results = map.get(actionClass);
        if (results != null) {
            return results.get(xid);
        }

        return null;
    }

    public static void removeResult(Class<?> actionClass, String xid) {
        Map<String, String> results = map.get(actionClass);
        if (results != null) {
            results.remove(xid);
        }
    }
}

添加 TCC 接口,在接口中添加以下方法:

  • Try - prepareDecreaseStorage()
  • Confirm - commit()
  • Cancel - rollback()
package cn.tedu.storage.tcc;

import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;

@LocalTCC
public interface StorageTccAction {

    @TwoPhaseBusinessAction(name = "storageTccAction", commitMethod = "commit", rollbackMethod = "rollback")
    boolean prepareDecreaseStorage(BusinessActionContext businessActionContext,
                                   @BusinessActionContextParameter(paramName = "productId") Long productId,
                                   @BusinessActionContextParameter(paramName = "count") Integer count);

    boolean commit(BusinessActionContext businessActionContext);

    boolean rollback(BusinessActionContext businessActionContext);

}

实现类:

package cn.tedu.storage.tcc;

import cn.tedu.storage.entity.Storage;
import cn.tedu.storage.mapper.StorageMapper;
import io.seata.rm.tcc.api.BusinessActionContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

@Component
@Slf4j
public class StorageTccActionImpl implements StorageTccAction {

    @Autowired
    private StorageMapper storageMapper;

    @Transactional
    @Override
    public boolean prepareDecreaseStorage(BusinessActionContext businessActionContext, Long productId, Integer count) {
        log.info("减少商品库存,第一阶段,锁定减少的库存量,productId="+productId+", count="+count);

        Storage storage = storageMapper.selectById(productId);
        if (storage.getResidue()-count<0) {
            throw new RuntimeException("库存不足");
        }

        /*
        库存减掉count, 冻结库存增加count
         */
        storageMapper.updateFrozen(productId, storage.getResidue()-count, storage.getFrozen()+count);

        //保存标识
        ResultHolder.setResult(getClass(), businessActionContext.getXid(), "p");
        return true;
    }

    @Transactional
    @Override
    public boolean commit(BusinessActionContext businessActionContext) {
        long productId = Long.parseLong(businessActionContext.getActionContext("productId").toString());
        int count = Integer.parseInt(businessActionContext.getActionContext("count").toString());
        log.info("减少商品库存,第二阶段提交,productId="+productId+", count="+count);

        //防止重复提交
        if (ResultHolder.getResult(getClass(), businessActionContext.getXid()) == null) {
            return true;
        }

        storageMapper.updateFrozenToUsed(productId, count);

        //删除标识
        ResultHolder.removeResult(getClass(), businessActionContext.getXid());
        return true;
    }

    @Transactional
    @Override
    public boolean rollback(BusinessActionContext businessActionContext) {

        long productId = Long.parseLong(businessActionContext.getActionContext("productId").toString());
        int count = Integer.parseInt(businessActionContext.getActionContext("count").toString());
        log.info("减少商品库存,第二阶段,回滚,productId="+productId+", count="+count);

        //防止重复回滚
        if (ResultHolder.getResult(getClass(), businessActionContext.getXid()) == null) {
            return true;
        }

        storageMapper.updateFrozenToResidue(productId, count);

        //删除标识
        ResultHolder.removeResult(getClass(), businessActionContext.getXid());
        return true;
    }
} 

在业务代码中调用 Try 阶段方法

业务代码中调用 TCC 第一阶段方法prepareDecreaseStorage(),并添加全局事务注解 @GlobalTransactional

package cn.tedu.storage.service;

import cn.tedu.storage.tcc.StorageTccAction;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class StorageServiceImpl implements StorageService {
    // @Autowired
    // private StorageMapper storageMapper;

    @Autowired
    private StorageTccAction storageTccAction;

    @Override
    public void decrease(Long productId, Integer count) throws Exception {
        // storageMapper.decrease(productId,count);
        storageTccAction.prepareDecreaseStorage(null, productId, count);
    }

}

启动 storage 进行测试

按顺序启动服务:

  1. Eureka
  2. Seata Server
  3. Easy Id Generator
  4. Storage
  5. Order

调用保存订单,地址:
http://localhost:8083/create?userId=1&productId=1&count=10&money=100

观察 storage 的控制台日志:
a

查看数据库表中的库存数据:

a

account添加“扣减金额”分支事务

扣减金额 TCC 事务分析请见《分布式事务(六)Seata TCC模式-TCC模式介绍

配置

有三个文件需要配置:

  • application.yml
  • registry.conf
  • file.conf

这三个文件的设置与上面 order 项目的配置完全相同,请参考上面订单配置一章进行配置。

AccountMapper 添加冻结库存相关方法

根据前面的分析,库存数据操作有以下三项:

  • 冻结库存
  • 冻结库存量修改为已售出量
  • 解冻库存

在 AccountMapper 中添加三个方法:

package cn.tedu.account.mapper;

import cn.tedu.account.entity.Account;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Param;

import java.math.BigDecimal;

public interface AccountMapper extends BaseMapper<Account> {
    void decrease(Long userId, BigDecimal money);

    void updateFrozen(@Param("userId") Long userId, @Param("residue") BigDecimal residue, @Param("frozen") BigDecimal frozen);

    void updateFrozenToUsed(@Param("userId") Long userId, @Param("money") BigDecimal money);

    void updateFrozenToResidue(@Param("userId") Long userId, @Param("money") BigDecimal money);
}

那么对应的 AccountMapper.xml 中添加 sql:

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="cn.tedu.account.mapper.AccountMapper" >
    <resultMap id="BaseResultMap" type="cn.tedu.account.entity.Account" >
        <id column="id" property="id" jdbcType="BIGINT" />
        <result column="user_id" property="userId" jdbcType="BIGINT" />
        <result column="total" property="total" jdbcType="DECIMAL" />
        <result column="used" property="used" jdbcType="DECIMAL" />
        <result column="residue" property="residue" jdbcType="DECIMAL"/>
        <result column="frozen" property="frozen" jdbcType="DECIMAL"/>
    </resultMap>
    <update id="decrease">
      UPDATE account SET residue = residue - #{money},used = used + #{money} where user_id = #{userId};
    </update>
    <select id="selectById" resultMap="BaseResultMap">
        SELECT * FROM account WHERE `user_id`=#{userId}
    </select>

    <update id="updateFrozen">
        UPDATE account SET `residue`=#{residue},`frozen`=#{frozen} WHERE `user_id`=#{userId}
    </update>

    <update id="updateFrozenToUsed">
        UPDATE account SET `frozen`=`frozen`-#{money}, `used`=`used`+#{money} WHERE `user_id`=#{userId}
    </update>

    <update id="updateFrozenToResidue">
        UPDATE account SET `frozen`=`frozen`-#{money}, `residue`=`residue`+#{money} WHERE `user_id`=#{userId}
    </update>
</mapper>

Seata 实现库存的 TCC 操作方法

工具类 ResultHolder

package cn.tedu.account.tcc;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ResultHolder {
    private static Map<Class<?>, Map<String, String>> map = new ConcurrentHashMap<Class<?>, Map<String, String>>();

    public static void setResult(Class<?> actionClass, String xid, String v) {
        Map<String, String> results = map.get(actionClass);

        if (results == null) {
            synchronized (map) {
                if (results == null) {
                    results = new ConcurrentHashMap<>();
                    map.put(actionClass, results);
                }
            }
        }

        results.put(xid, v);
    }

    public static String getResult(Class<?> actionClass, String xid) {
        Map<String, String> results = map.get(actionClass);
        if (results != null) {
            return results.get(xid);
        }

        return null;
    }

    public static void removeResult(Class<?> actionClass, String xid) {
        Map<String, String> results = map.get(actionClass);
        if (results != null) {
            results.remove(xid);
        }
    }
}

添加 TCC 接口,在接口中添加以下方法:

  • Try - prepareDecreaseAccount()
  • Confirm - commit()
  • Cancel - rollback()
package cn.tedu.account.tcc;

import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;

import java.math.BigDecimal;

@LocalTCC
public interface AccountTccAction {

    @TwoPhaseBusinessAction(name = "accountTccAction", commitMethod = "commit", rollbackMethod = "rollback")
    boolean prepareDecreaseAccount(BusinessActionContext businessActionContext,
                                   @BusinessActionContextParameter(paramName = "userId") Long userId,
                                   @BusinessActionContextParameter(paramName = "money") BigDecimal money);

    boolean commit(BusinessActionContext businessActionContext);

    boolean rollback(BusinessActionContext businessActionContext);

}

实现类:

package cn.tedu.account.tcc;

import cn.tedu.account.entity.Account;
import cn.tedu.account.mapper.AccountMapper;
import io.seata.rm.tcc.api.BusinessActionContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.math.BigDecimal;

@Component
@Slf4j
public class AccountTccActionImpl implements AccountTccAction {
    @Autowired
    private AccountMapper accountMapper;

    @Transactional
    @Override
    public boolean prepareDecreaseAccount(BusinessActionContext businessActionContext, Long userId, BigDecimal money) {
        log.info("减少账户金额,第一阶段锁定金额,userId="+userId+", money="+money);

        Account account = accountMapper.selectById(userId);
        if (account.getResidue().compareTo(money) < 0) {
            throw new RuntimeException("账户金额不足");
        }

        /*
        余额-money
        冻结+money
         */
        accountMapper.updateFrozen(userId, account.getResidue().subtract(money), account.getFrozen().add(money));

        //保存标识
        ResultHolder.setResult(getClass(), businessActionContext.getXid(), "p");
        return true;
    }

    @Transactional
    @Override
    public boolean commit(BusinessActionContext businessActionContext) {

        long userId = Long.parseLong(businessActionContext.getActionContext("userId").toString());
        BigDecimal money =  new BigDecimal(businessActionContext.getActionContext("money").toString());
        log.info("减少账户金额,第二阶段,提交,userId="+userId+", money="+money);

        //防止重复提交
        if (ResultHolder.getResult(getClass(), businessActionContext.getXid()) == null) {
            return true;
        }

        accountMapper.updateFrozenToUsed(userId, money);

        //删除标识
        ResultHolder.removeResult(getClass(), businessActionContext.getXid());
        return true;
    }

    @Transactional
    @Override
    public boolean rollback(BusinessActionContext businessActionContext) {
        long userId = Long.parseLong(businessActionContext.getActionContext("userId").toString());
        BigDecimal money =  new BigDecimal(businessActionContext.getActionContext("money").toString());

        //防止重复回滚
        if (ResultHolder.getResult(getClass(), businessActionContext.getXid()) == null) {
            return true;
        }

        log.info("减少账户金额,第二阶段,回滚,userId="+userId+", money="+money);

        accountMapper.updateFrozenToResidue(userId, money);

        //删除标识
        ResultHolder.removeResult(getClass(), businessActionContext.getXid());
        return true;
    }
}

在业务代码中调用 Try 阶段方法

业务代码中调用 TCC 第一阶段方法prepareDecreaseAccount(),并添加全局事务注解 @GlobalTransactional

package cn.tedu.account.service;

import cn.tedu.account.mapper.AccountMapper;
import cn.tedu.account.tcc.AccountTccAction;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.math.BigDecimal;
@Service
public class AccountServiceImpl implements AccountService {
    // @Autowired
    // private AccountMapper accountMapper;

    @Autowired
    private AccountTccAction accountTccAction;

    @Override
    public void decrease(Long userId, BigDecimal money) {
        // accountMapper.decrease(userId,money);
        accountTccAction.prepareDecreaseAccount(null, userId, money);
    }
}

启动 account 进行测试

按顺序启动服务:

  1. Eureka
  2. Seata Server
  3. Easy Id Generator
  4. Storage
  5. Account
  6. Order

调用保存订单,地址:
http://localhost:8083/create?userId=1&productId=1&count=10&money=100

观察 account 的控制台日志:
a

查看数据库表中的账户数据:

a

全局事务回滚测试

下面来测试全局事务回滚的情况。

订单和库存第一阶段成功,而账户第一阶段失败了,这时会触发全局事务的回滚,如下图所示:

a
首先在 account 的第一阶段代码中添加模拟异常:

AccountTccActionImplprepareDecreaseAccount 方法

@Transactional
    @Override
    public boolean prepareDecreaseAccount(BusinessActionContext businessActionContext, Long userId, BigDecimal money) {
        log.info("减少账户金额,第一阶段锁定金额,userId="+userId+", money="+money);

        Account account = accountMapper.selectById(userId);
        if (account.getResidue().compareTo(money) < 0) {
            throw new RuntimeException("账户金额不足");
        }

        /*
        余额-money
        冻结+money
         */
        accountMapper.updateFrozen(userId, account.getResidue().subtract(money), account.getFrozen().add(money));

        if (Math.random() < 0.5) {
            throw new RuntimeException("模拟异常");
        }

        //保存标识
        ResultHolder.setResult(getClass(), businessActionContext.getXid(), "p");
        return true;
    }

重启 account 后,访问订单:
http://localhost:8083/create?userId=1&productId=1&count=10&money=100

查看控制台,可以看到 storage 和 order 的回滚日志,order 的回滚日志如下:

a

查看原文

赞 1 收藏 1 评论 1

KerryWu 发布了文章 · 1月10日

mysql增量同步 - canal

两年前在项目上实施oracle etl同步时,客户就提出cdc(Change Data Capture)增量同步的需求,并且明确要求基于日志来捕获数据变化。当时对于这方面的知识储备不够,只觉得这样的需求太苛刻。到了后来我实施分布式架构的方案越来越多,经常会思考如何保障数据的一致性,也让我回过头来,重新思考当年客户的需求。

本文的主角是canal,常用来保障mysql到redis、elasticsearch等产品数据的增量同步。下文先讲canal的安装配置,再结合具体的代码,实现mysql到redis的实时同步。

1. 简介

1.1. 背景

分布式架构近些年很受推崇,我们的系统开发不再局限于一台mysql数据库,可以为了缓存而引入redis,为了搜索而引入elasticsearch,等等,这些是分布式架构给我们带来的便利性。

但带来的挑战也加大了,比如:微服务治理、分布式事务,今天还会讲到数据同步。以前对于关系型数据库的数据同步,已经诞生了不少 ETL工具,比如我们熟悉的 oracle ODI。但是以现在微服务开发而论,还是不够灵活,我们需要可以自由的将mysql数据同步到redis、elasticsearch等地方。这里就可以用到本文的主角 -- canal

1.2. canal

canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。

早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

基于日志增量订阅和消费的业务包括:

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

当前的 canal 支持源端 mysql 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x 。

1.3. 工作原理

MySQL主备复制原理

MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)

MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)

MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

canal 工作原理

canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议

MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )

canal 解析 binary log 对象(原始为 byte 流)

1.4. 优点

关于canal的优点,肯定是要拿它和之前接触过 ETL工具做比较。

  • 面向编程: 我用过的几个ETL工具都偏工具化。同步规则的自定义空间不大,作为开发人员更倾向于用编程的方式实现,我想那些用 spring cloud gateway 替代nginx 的人应该能理解。而 canal的客户端,则是完全面向java编程的,开发起来更方便。
  • 增量同步: 大多ETL工具都专注于“全量同步”,对于实时性,也都靠设置定时策略来周期性执行,但 canal是专注于做实时“增量同步”的,而且它的做法也比较好。不少ETL工具老的方案,是通过数据库trigger来实现增量同步的,会给数据库带来很大的压力,侵入性较高,而canal用的是新的方案,基于binlog日志来监听数据变化。

2. 安装配置

2.1. mysql配置

需要先开启mysql的 binlog 写入功能,配置 binlog-format 为 ROW 模式。这里修改 my.cnf 文件,添加下列配置:

log-bin=mysql-bin   # 开启 binlog
binlog-format=ROW   # 选择 ROW 模式
server_id=1        # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

重启mysql,用以下命令检查一下binlog是否正确启动:

mysql> show variables like 'log_bin%';
+---------------------------------+----------------------------------+
| Variable_name                   | Value                            |
+---------------------------------+----------------------------------+
| log_bin                         | ON                               |
| log_bin_basename                | /data/mysql/data/mysql-bin       |
| log_bin_index                   | /data/mysql/data/mysql-bin.index |
| log_bin_trust_function_creators | OFF                              |
| log_bin_use_v1_row_events       | OFF                              |
+---------------------------------+----------------------------------+
5 rows in set (0.00 sec)
mysql> show variables like 'binlog_format%';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
+---------------+-------+
1 row in set (0.00 sec)

创建一个mysql用户canal 并且赋远程链接权限权限。

CREATE USER canal IDENTIFIED BY 'canal';
GRANT ALL PRIVILEGES ON test_canal.user TO 'canal'@'%';
FLUSH PRIVILEGES;

2.2. canal.deployer安装配置

  1. canal下载页找到对应版本的部署包(canal.deployer-1.x.x.tar.gz)。
  2. 解压安装包tar -zxvf canal.deployer-1.4.0.tar.gz,得到四个目录bin、conf、lib、logs。
  3. 修改配置conf/example/instance.properties,配置参数比较多,下面就列几个常见的数据库配置信息
canal.instance.master.address = 127.0.0.1:3306 
canal.instance.dbUsername = canal  
canal.instance.dbPassword = canal
  1. 通过脚本启动 canal
# 启动
sh bin/startup.sh
# 关闭
sh bin/stop.sh
# 查看日志
tail -500f logs/canal/canal.log
# 查看具体实例日志
tail -500f logs/example/example.log

3. 基于adapter同步

3.1. 简述

canal作为mysql的实时数据订阅组件,实现了对mysql binlog数据的抓取。

虽然阿里也开源了一个纯粹从mysql同步数据到mysql的项目otter(github.com/alibaba/otter,基于canal的),实现了mysql的单向同步、双向同步等能力。但是我们经常有从mysql同步数据到es、hbase等存储的需求,就需要用户自己用canal-client获取数据进行消费,比较麻烦。

从1.1.1版本开始,canal实现了一个配套的落地模块,实现对canal订阅的消息进行消费,就是client-adapter(github.com/alibaba/canal/wiki/ClientAdapter)。

目前的最新稳定版1.1.4版本中,client-adapter已经实现了同步数据到RDS、ES、HBase的能力。

目前Adapter具备以下基本能力:

  • 对接上游消息,包括kafka、rocketmq、canal-server
  • 实现mysql数据的增量同步
  • 实现mysql数据的全量同步
  • 下游写入支持rds、es、hbase

本文不关注这部分canal.adapter的配置,具体的配置方式,请参考github官方文档。

3.2. elasticsearch示例

简单贴一贴下游es的配置,修改启动器配置: application.yml

server:
  port: 8081
logging:
  level:
    com.alibaba.otter.canal.client.adapter.es: DEBUG
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  canalServerHost: 127.0.0.1:11111
  flatMessage: true
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true
      username: root
      password: 121212
  canalInstances:
  - instance: example
    adapterGroups:
    - outAdapters:
      - name: es
        hosts: 127.0.0.1:9300               # es 集群地址, 逗号分隔
        properties:
          cluster.name: elasticsearch       # es cluster name

adapter将会自动加载 conf/es 下的所有.yml结尾的配置文件,再修改 conf/es/mytest_user.yml文件:

dataSourceKey: defaultDS        # 源数据源的key, 对应上面配置的srcDataSources中的值
destination: example            # cannal的instance或者MQ的topic
esMapping:
  _index: mytest_user           # es 的索引名称
  _type: _doc                   # es 的doc名称
  _id: _id                      # es 的_id, 如果不配置该项必须配置下面的pk项_id则会由es自动分配
#  pk: id                       # 如果不需要_id, 则需要指定一个属性为主键属性
  # sql映射
  sql: "select a.id as _id, a.name as _name, a.role_id as _role_id, b.role_name as _role_name,
        a.c_time as _c_time, c.labels as _labels from user a
        left join role b on b.id=a.role_id
        left join (select user_id, group_concat(label order by id desc separator ';') as labels from label
        group by user_id) c on c.user_id=a.id"
#  objFields:
#    _labels: array:;           # 数组或者对象属性, array:; 代表以;字段里面是以;分隔的
#    _obj: obj:{"test":"123"}
  etlCondition: "where a.c_time>='{0}'"     # etl 的条件参数
  commitBatch: 3000                         # 提交批大小

sql映射说明:

sql支持多表关联自由组合, 但是有一定的限制:

  1. 主表不能为子查询语句
  2. 只能使用left outer join即最左表一定要是主表
  3. 关联从表如果是子查询不能有多张表
  4. 主sql中不能有where查询条件(从表子查询中可以有where条件但是不推荐, 可能会造成数据同步的不一致, 比如修改了where条件中的字段内容)
  5. 关联条件只允许主外键的'='操作不能出现其他常量判断比如: on a.role_id=b.id and b.statues=1
  6. 关联条件必须要有一个字段出现在主查询语句中比如: on a.role_id=b.id 其中的 a.role_id 或者 b.id 必须出现在主select语句中

Elastic Search的mapping 属性与sql的查询值将一一对应(不支持 select *), 比如: select a.id as _id, a.name, a.email as _email from user, 其中name将映射到es mapping的name field, _email将 映射到mapping的_email field, 这里以别名(如果有别名)作为最终的映射字段. 这里的_id可以填写到配置文件的 _id: _id映射.

1、单表映射索引示例
select a.id as _id, a.name, a.role_id, a.c_time from user a

该sql对应的es mapping示例:

{
    "mytest_user": {
        "mappings": {
            "_doc": {
                "properties": {
                    "name": {
                        "type": "text"
                    },
                    "role_id": {
                        "type": "long"
                    },
                    "c_time": {
                        "type": "date"
                    }
                }
            }
        }
    }
}
2、单表映射索引示例sql带函数或运算操作
select a.id as _id, concat(a.name,'_test') as name, a.role_id+10000 as role_id, a.c_time from user a

函数字段后必须跟上别名, 该sql对应的es mapping示例:

{
    "mytest_user": {
        "mappings": {
            "_doc": {
                "properties": {
                    "name": {
                        "type": "text"
                    },
                    "role_id": {
                        "type": "long"
                    },
                    "c_time": {
                        "type": "date"
                    }
                }
            }
        }
    }
}
3、多表映射(一对一, 多对一)索引示例sql:
select a.id as _id, a.name, a.role_id, b.role_name, a.c_time from user a 
left join role b on b.id = a.role_id

注:这里join操作只能是left outer join, 第一张表必须为主表!!该sql对应的es mapping示例:

{
    "mytest_user": {
        "mappings": {
            "_doc": {
                "properties": {
                    "name": {
                        "type": "text"
                    },
                    "role_id": {
                        "type": "long"
                    },
                    "role_name": {
                        "type": "text"
                    },
                    "c_time": {
                        "type": "date"
                    }
                }
            }
        }
    }
}
4、多表映射(一对多)索引示例sql:
select a.id as _id, a.name, a.role_id, c.labels, a.c_time from user a 
left join (select user_id, group_concat(label order by id desc separator ';') as labels from label
        group by user_id) c on c.user_id=a.id

注:left join 后的子查询只允许一张表, 即子查询中不能再包含子查询或者关联!!该sql对应的es mapping示例:

{
    "mytest_user": {
        "mappings": {
            "_doc": {
                "properties": {
                    "name": {
                        "type": "text"
                    },
                    "role_id": {
                        "type": "long"
                    },
                    "c_time": {
                        "type": "date"
                    },
                    "labels": {
                        "type": "text"
                    }
                }
            }
        }
    }
}

4. 基于代码同步

如果你是同步到es、rds、hbase,但是adapter实现不了你的需求,可以用下列代码的方式实现。
如果你是想要同步到redis、mongo等数据库,因为adapter目前还不支持,同样可以用代码的方式实现。

如果你是用springboot开发,目前有两种常见的方式:

  1. 阿里原生的canal.client,比较推荐这种,可参考GitHub官方文档
  2. 个人基于canal.client开发的starter,可参考GitHub官方文档

本文选用第一种方式,实现 canal 到 redis的实时同步。

4.1. 代码(redis同步)

pom.xml
        <!--lombok-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.16</version>
        </dependency>
        <!-- canal-->
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.3</version>
        </dependency>
        <!--redis-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
RedisTableUtil.java
@Slf4j
@Component
public class RedisTableUtil {
    private static final String PK_NAME="id";
    private static final String NULL_ID="NULL";

    private final RedisTemplate redisTemplate;

    public RedisTableUtil(RedisTemplate redisTemplate){
        this.redisTemplate=redisTemplate;
    }

    /**
     * 新增
     * @param columnList
     * @param databaseName
     * @param tableName
     */
    public void insert(List<CanalEntry.Column> columnList,String databaseName,String tableName){
        String keyName=this.generateKeyName(columnList,databaseName,tableName);
        HashOperations hashOperations= redisTemplate.opsForHash();
        columnList.stream()
                .forEach((column -> {
                    hashOperations.put(keyName,column.getName(),column.getValue());
                }));
    }

    /**
     * 删除
     * @param columnList
     * @param databaseName
     * @param tableName
     */
    public void delete(List<CanalEntry.Column> columnList,String databaseName,String tableName){
        String keyName=this.generateKeyName(columnList,databaseName,tableName);
        redisTemplate.delete(keyName);
    }

    /**
     * 更新
     * @param columnList
     * @param databaseName
     * @param tableName
     */
    public void update(List<CanalEntry.Column> columnList,String databaseName,String tableName){
        String keyName=this.generateKeyName(columnList,databaseName,tableName);
        HashOperations hashOperations= redisTemplate.opsForHash();
        columnList.stream()
                .filter(CanalEntry.Column::getUpdated)
                .forEach((column -> {
                    hashOperations.put(keyName,column.getName(),column.getValue());
                }));
    }

    /**
     * 生成 行记录 key
     * @param columnList
     * @param databaseName
     * @param tableName
     * @return
     */
    private String generateKeyName(List<CanalEntry.Column> columnList,String databaseName,String tableName){
        Optional<String> id= columnList.stream()
                .filter(column -> PK_NAME.equals(column.getName()))
                .map(CanalEntry.Column::getValue)
                .findFirst();
        return databaseName+"_"+tableName+"_"+id.orElse(NULL_ID);
    }
}
RedisConfig.java
@Configuration
public class RedisConfig {

    @Bean
    public RedisTemplate redisTemplate(RedisTemplate redisTemplate){
        RedisSerializer<String> stringSerializer = new StringRedisSerializer();
        redisTemplate.setKeySerializer(stringSerializer);
        redisTemplate.setValueSerializer(stringSerializer);
        redisTemplate.setHashKeySerializer(stringSerializer);
        redisTemplate.setHashValueSerializer(stringSerializer);
        return redisTemplate;
    }

}
CanalServer.java
@Slf4j
@Component
public class CanalServer {
    private static final String THREAD_NAME_PREFIX="canalStart-";

    private final RedisTableUtil redisTableUtil;

    public CanalServer(RedisTableUtil redisTableUtil){
        this.redisTableUtil=redisTableUtil;
    }

    /**
     * 初始化
     * 单线程启动 canal客户端
     */
    @PostConstruct
    public void init() {
        //需要开启一个新的线程来执行 canal 服务
        Thread initThread = new CanalStartThread();
        initThread.setName(THREAD_NAME_PREFIX);
        initThread.start();
    }


    /**
     * 定义 canal服务线程
     */
    public class CanalStartThread extends Thread {
        @Override
        public void run() {
            // 创建链接
            CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("localhost", 11111),
                    "example", "", "");
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            try {
                while (true) {
                    // 获取指定数量的数据
                    Message message = connector.getWithoutAck(1000);
                    long batchId = message.getId();
                    if (batchId != -1 &&  message.getEntries().size() > 0) {
                        entryHandler(message.getEntries());
                    }
                    connector.ack(batchId); // 提交确认
                    Thread.sleep(1000);
                }

            }catch (Exception e){
                log.error("Canal线程异常,已终止:"+e.getMessage());
            } finally {
                connector.disconnect();
            }
        }
    }

    /**
     * canal 入口处理器
     * @param entrys
     */
    private  void entryHandler( List<Entry> entrys) {
        for (Entry entry : entrys) {
            //操作事物 忽略
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { continue; }
            CanalEntry.RowChange rowChage = null;
            String databaseName=null;
            String tableName=null;
            try {
                databaseName=entry.getHeader().getSchemaName();
                tableName=entry.getHeader().getTableName();
                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (InvalidProtocolBufferException e) {
                log.error("获取数据失败:"+e.getMessage());
            }
            //获取执行的事件
            CanalEntry.EventType eventType = rowChage.getEventType();
            for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
                //删除操作
                if (eventType.equals(CanalEntry.EventType.DELETE)) {
                    redisTableUtil.delete(rowData.getBeforeColumnsList(),databaseName,tableName);
                }
                //添加操作
                else if (eventType.equals(CanalEntry.EventType.INSERT)) {
                    redisTableUtil.insert(rowData.getAfterColumnsList(),databaseName,tableName);
                }
                //修改操作
                else if(eventType.equals(CanalEntry.EventType.UPDATE)) {
                    redisTableUtil.insert(rowData.getAfterColumnsList(),databaseName,tableName);
                }
            }
        }
    }
    
}

CanalServer 中单独起了一个线程,每秒获取mysql日志。当监听到mysql表数据的新增、更新、删除操作时,会在redis中做出对应的数据操作,具体redis的更新逻辑在RedisTableUtil类中定义。

这里在while(true)循环中,加上了 Thread.sleep(1000);,为了避免空轮询造成的CPU占有率飙升。那么有人会问Thread.sleep 不是暂停线程吗,它就不会占有CPU了吗?Thread.sleep,主要是为了暂停当前线程,把cpu片段让出给其他线程,减缓当前线程的执行。因此它会暂停线程,但并不会占有CPU运行片段。

一般生产环境日志量大,可以将监听到的binlog事件推到消息中间件,再在消息中间件的消费端做下游数据同步的处理。

另外通过binlog监听到的数据库操作不止DML,其实还有DQL和DDL等,只不过代码中没有体现。因此可以想象的到,使用canal能实现的功能,远不止数据同步这点功能。

查看原文

赞 0 收藏 0 评论 0

KerryWu 赞了文章 · 2020-12-21

hadoop教程-MapReduce

什么是MapReduce

MapReduce是hadoop进行多节点计算时采用的计算模型,说白了就是hadoop拆分任务的一套方法论,刚接触MapReduce这个概念时,一时很难理解,也查了很多资料,因为每个人理解不一样,反而看的越多越糊涂,其实本质是很简单的东西,这里举一个例子帮助理解,因为网上大部分是hadoop官方计算单词(wordcount)的例子,这里就换一个场景举例。

假设有以下一份成绩单

1,张三,78,87,69
2,李四,56,76,91
3,王五,65,46,84
4,赵六,89,56,98
...

各列分别是编号,学生姓名,语文成绩,数学成绩,英语成绩,现在要求统计各科成绩最高分,假设这份成绩单非常非常的长,有上千万行,在没有任何计算机系统的帮助下,要怎么靠人工解决这个问题?

  • 单人统计

专门派一个人进行统计工作,优点是简单,缺点也很明显,需要非常长的时间,甚至数据量达到一定程度,一个人一辈子可能也统计不完

  • 多人统计

如果有足够的人可以进行统计工作,要怎么去协调这些人?假设成绩单有100w行并且有1000人可以进行统计

    1. 设一个管理员,管理员把成绩单平均拆分成1000份给1000个人,每个人需要统计1000行数据
    1. 管理员制作一个表格,要求每个人把自己统计的结果填入该表格,表格格式如下
科目人员1结果人员2结果...人员1000结果
语文
数学
英语
    1. 管理员最终得到了如下数据

    科目| 人员1结果|人员2结果|...|人员1000结果

语文8085...76
数学8990...88
英语9485...90
    1. 各科各有1000个结果,管理员又把这个表格拆成了100个小表格分给100个人进行统计,这样每个小表格各有10个数据,小表格格式如下

第一个人领到的小表格

科目人员1结果人员2结果...人员10结果
语文8085...76
数学8990...88
英语9485...90

第二个领到的小表格

科目人员11结果人员12结果...人员20结果
语文8375...88
数学7995...58
英语9485...90
    1. 管理员再次把每个人的结果收集上来,又得到了100份数据,如果管理员愿意又可以把这个数据进行拆分交给多个人进行统计,如此反复最终得到一个最大值结果,管理员也可以自己完成最后的统计,因为数据量不大。

那么在这个过程中,我们看到了,一份庞大的成绩单经过以下几个步骤后,最终我们获得了我们想要的结果

  • 成绩单拆分多份
  • 每一份进行单独统计
  • 对结果进行登记
  • 对统计的结果可以再次进行拆分,也可以直接进行统计
  • 如此反复之后最终得到了结果

那么把这个过程用MapReduce语言进行描述就可以是以下过程:

  • 成绩单拆分多份- 分片(split)
  • 每一份进行单独统计 - map
  • 并且对结果进行登记 - shuffle
  • 对统计的结果可以再次进行拆分- combiner
  • 也可以直接进行统计 - reduce

另外在管理员的表格中,三个科目后面记录

开发

我们用实际java代码解决上面的问题,假设你已经按照上一篇教程安装好了hadoop集群环境

  • 创建工程

你可以用你熟悉的ide创建一个普通java工程,建议用maven进行包管理,并加入以下包依赖

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.5.1</version>
</dependency>
  • 创建Mapper

Mapper对应是MapReduce中的map过程,在以下mapper代码:

StudentMapper.java

public class StudentMapper extends MapReduceBase implements
        Mapper<LongWritable, Text, Text, IntWritable> {
    public void map(LongWritable longWritable, Text text, OutputCollector<Text, IntWritable> outputCollector, Reporter reporter) throws IOException {
        String[] ss = text.toString().split(",");
        outputCollector.collect(new Text("语文"), new IntWritable(Integer.parseInt(ss[2])));
        outputCollector.collect(new Text("数学"), new IntWritable(Integer.parseInt(ss[3])));
        outputCollector.collect(new Text("英语"), new IntWritable(Integer.parseInt(ss[4])));
    }
}

StudentMapper实现了 Mapper<LongWritable, Text, Text, IntWritable>接口,这里有四个参数,四个参数含义如下

  • LongWritable:hadoop会把txt文件按行进行分割,这个表示该行在文件中的位置,一般不用
  • Text:行内容,比如第一行就是1,张三,78,87,69
  • Text:前面提到,最终我们要按照科目进行汇总然后计算最高分,那么科目名称就是key,每次计算的结果就是后面的value,所以这里用text表示key,因为我们要存储科目名称
  • IntWritable:存储计算结果,这里指的是基于本次统计所得到的科目最高分

方法map(LongWritable longWritable, Text text, OutputCollector<Text, IntWritable> outputCollector, Reporter reporter)的几个参数和上面含义一样,注意到outputCollector是一个数组,说明这里可以写入多个结果,reporter可以向hadoop汇报任务进度。在这个mapper里面,我们并没有做什么计算,我们只是把文本里面的成绩解析出来,并且按科目放到outputCollector中,相当于大家第一次都没干活,只是把数据整理好。经过mapper后,数据从

1,张三,78,87,69
2,李四,56,76,91
3,王五,65,46,84
4,赵六,89,56,98
...

变成了

---
语文78566589...
数学87764656...
英语69918498...

StudentReducer.java

public class StudentReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text text, Iterator<IntWritable> iterator, OutputCollector<Text, IntWritable> outputCollector, Reporter reporter) throws IOException {
        StringBuffer str = new StringBuffer();
        Integer max = -1;
        while (iterator.hasNext()) {
            Integer score = iterator.next().get();
            if (score > max) {
                max = score;
            }
        }
        outputCollector.collect(new Text(text.toString()), new IntWritable(max));
    }
}

Reducer就开始真正执行计算了,reducer函数reduce(Text text, Iterator<IntWritable> iterator, OutputCollector<Text, IntWritable> outputCollector, Reporter reporter)参数含义如下:

  • text:就是Mapper的第三个参数
  • iterator:就是Mapper中写入outputCollector的数据,和第一参数组合起来就是mapper中的outputCollector
  • outputCollector:reducer计算后的结果需要写入到该参数中,这里我们写入的内容是类似key:语文 value:90结构的数据,所以类型为<Text, IntWritable>

前面提到过mapper会把数据整理好,并且按科目将成绩写入的outputCollector中,那么到了reducer这一步,hadoop就会把mapper写入的数据按照key进行汇总(也就是科目),并且交付给reducer,reducer负责计算里面最高分,并且也将结果写入outputCollector。

StudentProcessor

public class StudentProcessor {

    public static void main(String args[]) throws Exception {
        JobConf conf = new JobConf(StudentProcessor.class);
        conf.setJobName("max_scroe_poc1");
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setMapperClass(StudentMapper.class);
        conf.setReducerClass(StudentReducer.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);
    }
}

我们还需要一个包含main函数的启动类,执行mvn package命令进行打包,我们假设包名为hadoop-score-job.jar,将jar包通过ftp等工具上传到服务器目录下。

  • 上传数据

hadoop借助hdfs分布式文件系统,能够将大文件存储在多个节点,通过hdfs cli工具,我们感觉在操作本地文件一样,在上面的代码中FileInputFormat.setInputPaths(conf, new Path(args[0]));设置了MapReduce的数据来源,用户指定目录,该目录下文件作为数据来源,这里的目录就是hdfs中的目录,并且该目录必须存在,而且数据需要上传到该目录下,执行以下命令创建目录

hadoop fs -mkdir poc01_input

执行以下命令将数据导入到hdfs中

hadoop fs -put score.txt poc01_input

score.txt内容为

1,张三,78,87,69
2,李四,56,76,91
3,王五,65,46,84
4,赵六,89,56,98

通过ls命令可以查看文件是否上传成功

$ hadoop fs -ls poc01_input
Found 1 items
-rw-r--r--   1 hadoop supergroup         72 2020-12-13 15:43 poc01_input/score.txt
  • 执行job

执行以下命令开始运行job

$ hadoop jar hadoop-score-job.jar com.hadoop.poc.StudentProcessor poc01_input poc01_output

20/12/13 16:01:33 INFO client.RMProxy: Connecting to ResourceManager at master/172.16.8.42:18040
20/12/13 16:01:33 INFO client.RMProxy: Connecting to ResourceManager at master/172.16.8.42:18040
20/12/13 16:01:34 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
20/12/13 16:01:34 INFO mapred.FileInputFormat: Total input files to process : 1
20/12/13 16:01:35 INFO mapreduce.JobSubmitter: number of splits:2
20/12/13 16:01:35 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1607087481584_0005
20/12/13 16:01:35 INFO conf.Configuration: resource-types.xml not found
20/12/13 16:01:35 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
20/12/13 16:01:35 INFO resource.ResourceUtils: Adding resource type - name = memory-mb, units = Mi, type = COUNTABLE
20/12/13 16:01:35 INFO resource.ResourceUtils: Adding resource type - name = vcores, units = , type = COUNTABLE
20/12/13 16:01:36 INFO impl.YarnClientImpl: Submitted application application_1607087481584_0005
20/12/13 16:01:36 INFO mapreduce.Job: The url to track the job: http://master:18088/proxy/application_1607087481584_0005/
20/12/13 16:01:36 INFO mapreduce.Job: Running job: job_1607087481584_0005
20/12/13 16:01:43 INFO mapreduce.Job: Job job_1607087481584_0005 running in uber mode : false
20/12/13 16:01:43 INFO mapreduce.Job:  map 0% reduce 0%
20/12/13 16:01:51 INFO mapreduce.Job:  map 100% reduce 0%
20/12/13 16:01:57 INFO mapreduce.Job:  map 100% reduce 100%
20/12/13 16:01:57 INFO mapreduce.Job: Job job_1607087481584_0005 completed successfully
20/12/13 16:01:57 INFO mapreduce.Job: Counters: 49
        File System Counters
                FILE: Number of bytes read=84
                FILE: Number of bytes written=625805
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=316
                HDFS: Number of bytes written=30
                HDFS: Number of read operations=9
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=2
        Job Counters 
                Launched map tasks=2
                Launched reduce tasks=1
                Data-local map tasks=2
                Total time spent by all maps in occupied slots (ms)=12036
                Total time spent by all reduces in occupied slots (ms)=3311
                Total time spent by all map tasks (ms)=12036
                Total time spent by all reduce tasks (ms)=3311
                Total vcore-milliseconds taken by all map tasks=12036
                Total vcore-milliseconds taken by all reduce tasks=3311
                Total megabyte-milliseconds taken by all map tasks=12324864
                Total megabyte-milliseconds taken by all reduce tasks=3390464
        Map-Reduce Framework
                Map input records=4
                Map output records=12
                Map output bytes=132
                Map output materialized bytes=90
                Input split bytes=208
                Combine input records=12
                Combine output records=6
                Reduce input groups=3
                Reduce shuffle bytes=90
                Reduce input records=6
                Reduce output records=3
                Spilled Records=12
                Shuffled Maps =2
                Failed Shuffles=0
                Merged Map outputs=2
                GC time elapsed (ms)=395
                CPU time spent (ms)=1790
                Physical memory (bytes) snapshot=794595328
                Virtual memory (bytes) snapshot=5784080384
                Total committed heap usage (bytes)=533200896
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters 
                Bytes Read=108
        File Output Format Counters 
                Bytes Written=30
  • hadoop-score-job.jar为上面打包的jar包,需要cd到jar包目录下执行命令
  • com.hadoop.poc.StudentProcessor 包含main函数的类
  • poc01_input 数据来源目录
  • poc01_output 数据输出目录

job执行完后,结果会保存在poc01_output目录下

$ hadoop fs -ls poc01_output2
Found 2 items
-rw-r--r--   1 hadoop supergroup          0 2020-12-13 16:01 poc01_output2/_SUCCESS
-rw-r--r--   1 hadoop supergroup         30 2020-12-13 16:01 poc01_output2/part-00000

$ hadoop fs -cat poc01_output2/part-00000
数学    87
英语    98
语文    89
查看原文

赞 1 收藏 0 评论 0

KerryWu 收藏了文章 · 2020-12-04

Java中的native是如何实现的(JNI)

什么是JNI

JNI是Java Native Interface的缩写,Java本地接口(JNI)提供了将Java与C/C++、汇编等本地代码集成的方案,该规范使得在 Java 虚拟机内运行的 Java 代码能够与其它编程语言互相操作,包括创建本地方法、更新 Java 对象、调用 Java 方法,引用 Java 类,捕捉和抛出异常等,也允许 Java 代码调用 C/C++ 或汇编语言编写的程序和库。使用java与本地已编译的代码交互,通常会丧失平台可移植性。但是,有些情况下这样做是可以接受的,甚至是必须的。例如,使用一些旧的库,与硬件、操作系统进行交互,或者为了提高程序的性能。JNI标准至少要保证本地代码能工作在任何Java 虚拟机环境。

JNI的编程步骤

  1. 编写带有native声明的方法的java类
  2. 使用javac命令编译所编写的java类
  3. 然后使用javah + java类名生成扩展名为h的头文件
  4. 使用C/C++实现本地方法
  5. 将C/C++编写的文件生成动态连接库

利用JNI简单实现自己的Thread

Java 中的线程

在Java里创建并启动一个线程的代码:

public static void main(String[] args) {
    Thread thread = new Thread(){
        @Override
        public void run() {
            // do something
        }
    }; 
    thread.start();
}

通过查看 Thread 类的源码,可以发现 start() 方法中最核心的就是调用了 start0() 方法,而 start0() 方法又是一个native方法:

public synchronized void start() {
  
    if (threadStatus != 0)
        throw new IllegalThreadStateException();

    group.add(this);

    boolean started = false;
    try {
        start0(); // 调用本地方法启动线程
        started = true;
    } finally {
        try {
            if (!started) {
                group.threadStartFailed(this);
            }
        } catch (Throwable ignore) {
            /* do nothing. If start0 threw a Throwable then
              it will be passed up the call stack */
        }
    }
}

// native 本地方法
private native void start0();

这里就是通过 Java 代码来调用 JVM 本地的 C 代码,C 中调用系统函数创建线程,并且会反过来调用 Thread 中的 run() 方法,这也是为什么调用了 start() 方法后会自动执行 run() 方法中的逻辑。

操作系统的线程

了解一下 Linux 操作系统中是如何创建一个线程的,创建线程函数:

 int pthread_create(pthread_t *thread, const pthread_attr_t *attr,
                          void *(*start_routine) (void *), void *arg);

pthread_create

使用 man 命令可以查看 pthread_create,这个函数是linux系统的函数,可以用C或者C++直接调用,函数有四个参数:

  • pthread_t *thread:传出参数,调用之后会传出被创建线程的id
  • const pthread_attr_t *attr:线程属性,传NULL即可,保持默认属性
  • void (start_routine) (void *): 线程的启动后的主体函数 相当于java当中的run
  • void *arg:主体函数的参数,如果没有可以传NULL

在 Linux 上启动一个线程的代码:

#include <pthread.h> //头文件
#include <stdio.h>
pthread_t pid; //定义一个变量,接受创建线程后的线程id
//定义线程的主体函数
void* thread_entity(void* arg)
{   
    printf("new Thread!");
}

int main()
{
    // 调用操作系统的函数创建线程,注意四个参数
    pthread_create(&pid, NULL, thread_entity, NULL);
    usleep(100);
    printf("main\n");
}

Java定义本地方法

下面我们就通过 JNI 技术简单实现一个 Thread 类来模拟线程创建和执行过程。

package com.fantasy;

public class Sample {

    static {
        // 装载库,保证JVM在启动的时候就会装载,这里的库指的是C程序生成的动态链接库
        // Linux下是.so文件,Windows下是.dll文件
        System.loadLibrary( "ThreadNative" );
    }

    public static void main(String[] args) {
        new MyThread(() -> System.out.println("Java run method...")).start();
    }
}

class MyThread implements Runnable {
    private Runnable target;

    public MyThread(Runnable target) {
        this.target = target;
    }

    @Override
    public void run() {
        if (target != null) {
            target.run();
        }
    }

    public synchronized void start() {
        start0();
    }

    private native void start0();
}

编译成class文件:

[root@sdb1 fantasy]# pwd
/root/com/fantasy
[root@sdb1 fantasy]# javac Sample.java
[root@sdb1 fantasy]# ll
total 12
-rw-r--r--. 1 root root  501 Jun  1 22:10 MyThread.class
-rw-r--r--. 1 root root 1176 Jun  1 22:10 Sample.class
-rw-r--r--. 1 root root  692 Jun  1 22:09 Sample.java

生成.h头文件:

[root@sdb1 ~]# pwd
/root
[root@sdb1 ~]# javah com.fantasy.MyThread
[root@sdb1 ~]# ll
total 4
drwxr-xr-x. 3 root root  21 Jun  1 22:07 com
-rw-r--r--. 1 root root 429 Jun  1 22:12 com_fantasy_MyThread.h
注意:native 方法在哪个类中就使用javah命令生成对应的头文件,运行 javah 命令需要在包路径外面,javah packageName.className。

下面是 com_fantasy_MyThread.h 头文件的内容:

/* DO NOT EDIT THIS FILE - it is machine generated */
#include <jni.h>
/* Header for class com_fantasy_MyThread */

#ifndef _Included_com_fantasy_MyThread
#define _Included_com_fantasy_MyThread
#ifdef __cplusplus
extern "C" {
#endif
/*
 * Class:     com_fantasy_MyThread
 * Method:    start0
 * Signature: ()V
 */
JNIEXPORT void JNICALL Java_com_fantasy_MyThread_start0
  (JNIEnv *, jobject);

#ifdef __cplusplus
}
#endif
#endif

其中 Java_com_fantasy_MyThread_start0 方法就是需要在C程序中定义的方法。
在注释中我们可以看到有一个 Signature,这个是方法的签名,表示方法的参数类型和返回值类型,简单了解一下:

Java类型Signature说明
booleanZ
byteB
charC
shortS
intI
longL
floatF
doubleD
voidV
ObjectL+/分隔的完整类名例如:Ljava/lang/String表示String类型
Array[签名例如: [I表示int类型的数组, [Ljava/lang/String表示String类型的数组
Method(参数签名)返回类型签名例如: ([I)I表示参数类型为int数组,返回值int类型的方法

C/C++实现本地方法

上一步已经生成了.h头文件,现在我们来实现里面定义的方法。创建 thread.c 文件,并编写如下代码:

#include <pthread.h>
#include <stdio.h>
#include "com_fantasy_MyThread.h" // 导入刚刚编译的那个.h文件

pthread_t pid;
void* thread_entity(void* arg)
{
    printf("new Thread!\n");
}

// 这个方法定义要参考.h文件中的方法
JNIEXPORT void JNICALL Java_com_fantasy_MyThread_start0
(JNIEnv *env, jobject obj){
    pthread_create(&pid, NULL, thread_entity, NULL);
    sleep(1);
    printf("main thread %lu, create thread %lu\n", pthread_self(), pid);
    
    //通过反射调用java中的方法
    //找class,使用 FindClass 方法,参数就是要调用的函数的类的完全限定名,但是需要把点换成/
    jclass cls = (*env)->FindClass(env, "com/fantasy/MyThread");
    //获取 run 方法
    jmethodID mid = (*env)->GetMethodID(env, cls, "run", "()V");
    // 调用方法
    (*env)->CallVoidMethod(env, obj, mid);
    printf("success to call run() method!\n");

}

将这个 thread.c 文件编译为一个动态链接库,命名规则为 libxxx.so,xxx要跟Java中 System.loadLibrary( "ThreadNative") 指定的字符串保持一致,也就是 ThreadNative,编译命令如下:

gcc -fPIC -I /opt/jdk1.8.0_161/include/ -I /opt/jdk1.8.0_161/include/linux -shared -o libThreadNative.so thread.c

至此,Java代码与C代码编写与编译完成。

运行Java代码

运行前,还需要将 .so 文件所在路径加入到path中,这样Java才能找到这个库文件,.so 文件的路径为"/root/libThreadNative.so",所以配置如下:

export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/root/

让我们来运行Java主类来看一下结果吧!

[root@sdb1 ~]# pwd
/root
[root@sdb1 ~]# ll
total 20
drwxr-xr-x. 3 root root   21 Jun  1 22:07 com
-rw-r--r--. 1 root root  429 Jun  1 22:12 com_fantasy_MyThread.h
-rwxr-xr-x. 1 root root 8336 Jun  1 23:32 libThreadNative.so
-rw-r--r--. 1 root root  666 Jun  1 23:32 thread.c
[root@sdb1 ~]# java com.fantasy.Sample
new Thread!
main thread 139986752292608, create thread 139986681366272
Java run method...
success to call run() method!

可以看到输出结果符合我们的预期。通过模拟Thread,也可以得出以下结论:

  • Thread 类调用 start() 方法会通过jvm调用系统底层函数创建线程,并且回调 run() 方法
  • java 级别的线程其实就是操作系统级别的线程
查看原文

KerryWu 赞了文章 · 2020-11-23

差点跪了!阿里3面真题:CAP和BASE理论了解么?可以结合实际案例说下不?

本文节选自我开源的 JavaGuide :https://github.com/Snailclimb/JavaGuide (Github标星92k+!一份涵盖大部分 Java 程序员所需要掌握的核心知识。准备 Java 面试,首选 JavaGuide!)

经历过技术面试的小伙伴想必对这个两个概念已经再熟悉不过了!

Guide哥当年参加面试的时候,不夸张地说,只要问到分布式相关的内容,面试官几乎是必定会问这两个分布式相关的理论。

并且,这两个理论也可以说是小伙伴们学习分布式相关内容的基础了!

因此,小伙伴们非常非常有必要将这理论搞懂,并且能够用自己的理解给别人讲出来。

这篇文章我会站在自己的角度对这两个概念进行解读!

个人能力有限。如果文章有任何需要改善和完善的地方,欢迎在评论区指出,共同进步!——爱你们的Guide哥

CAP理论

CAP 理论/定理起源于 2000年,由加州大学伯克利分校的Eric Brewer教授在分布式计算原理研讨会(PODC)上提出,因此 CAP定理又被称作 布鲁尔定理(Brewer’s theorem)

2年后,麻省理工学院的Seth Gilbert和Nancy Lynch 发表了布鲁尔猜想的证明,CAP理论正式成为分布式领域的定理。

简介

CAP 也就是 Consistency(一致性)Availability(可用性)Partition Tolerance(分区容错性) 这三个单词首字母组合。

CAP 理论的提出者布鲁尔在提出 CAP 猜想的时候,并没有详细定义 ConsistencyAvailabilityPartition Tolerance 三个单词的明确定义。

因此,对于 CAP 的民间解读有很多,一般比较被大家推荐的是下面 👇 这种版本的解。

在理论计算机科学中,CAP 定理(CAP theorem)指出对于一个分布式系统来说,当设计读写操作时,只能能同时满足以下三点中的两个:

  • 一致性(Consistence) : 所有节点访问同一份最新的数据副本
  • 可用性(Availability): 非故障的节点在合理的时间内返回合理的响应(不是错误或者超时的响应)。
  • 分区容错性(Partition tolerance) : 分布式系统出现网络分区的时候,仍然能够对外提供服务。

什么是网络分区?

分布式系统中,多个节点之前的网络本来是连通的,但是因为某些故障(比如部分节点网络出了问题)某些节点之间不连通了,整个网络就分成了几块区域,这就叫网络分区。

partition-tolerance

不是所谓的“3 选 2”

大部分人解释这一定律时,常常简单的表述为:“一致性、可用性、分区容忍性三者你只能同时达到其中两个,不可能同时达到”。实际上这是一个非常具有误导性质的说法,而且在 CAP 理论诞生 12 年之后,CAP 之父也在 2012 年重写了之前的论文。

当发生网络分区的时候,如果我们要继续服务,那么强一致性和可用性只能 2 选 1。也就是说当网络分区之后 P 是前提,决定了 P 之后才有 C 和 A 的选择。也就是说分区容错性(Partition tolerance)我们是必须要实现的。

简而言之就是:CAP 理论中分区容错性 P 是一定要满足的,在此基础上,只能满足可用性 A 或者一致性 C。

因此,分布式系统理论上不可能选择 CA 架构,只能选择 CP 或者 AP 架构。

为啥无同时保证 CA 呢?

举个例子:若系统出现“分区”,系统中的某个节点在进行写操作。为了保证 C, 必须要禁止其他节点的读写操作,这就和 A 发生冲突了。如果为了保证 A,其他节点的读写操作正常的话,那就和 C 发生冲突了。

选择的关键在于当前的业务场景,没有定论,比如对于需要确保强一致性的场景如银行一般会选择保证 CP 。

CAP 实际应用案例

我这里以注册中心来探讨一下 CAP 的实际应用。考虑到很多小伙伴不知道注册中心是干嘛的,这里简单以 Dubbo 为例说一说。

下图是 Dubbo 的架构图。注册中心 Registry 在其中扮演了什么角色呢?提供了什么服务呢?

注册中心负责服务地址的注册与查找,相当于目录服务,服务提供者和消费者只在启动时与注册中心交互,注册中心不转发请求,压力较小。

常见的可以作为注册中心的组件有:ZooKeeper、Eureka、Nacos...。

  1. ZooKeeper 保证的是 CP。 任何时刻对 ZooKeeper 的读请求都能得到一致性的结果,但是, ZooKeeper 不保证每次请求的可用性比如在 Leader 选举过程中或者半数以上的机器不可用的时候服务就是不可用的。
  2. Eureka 保证的则是 AP。 Eureka 在设计的时候就是优先保证 A (可用性)。在 Eureka 中不存在什么 Leader 节点,每个节点都是一样的、平等的。因此 Eureka 不会像 ZooKeeper 那样出现选举过程中或者半数以上的机器不可用的时候服务就是不可用的情况。 Eureka 保证即使大部分节点挂掉也不会影响正常提供服务,只要有一个节点是可用的就行了。只不过这个节点上的数据可能并不是最新的。
  3. Nacos 不仅支持 CP 也支持 AP。

总结

在进行分布式系统设计和开发时,我们不应该仅仅局限在 CAP 问题上,还要关注系统的扩展性、可用性等等

在系统发生“分区”的情况下,CAP 理论只能满足 CP 或者 AP。要注意的是,这里的前提是系统发生了“分区”

如果系统没有发生“分区”的话,节点间的网络连接通信正常的话,也就不存在 P 了。这个时候,我们就可以同时保证 C 和 A 了。

总结:如果系统发生“分区”,我们要考虑选择 CP 还是 AP。如果系统没有发生“分区”的话,我们要思考如何保证 CA 。

推荐阅读

  1. CAP 定理简化 (英文,有趣的案例)
  2. 神一样的 CAP 理论被应用在何方 (中文,列举了很多实际的例子)
  3. 请停止呼叫数据库 CP 或 AP (英文,带给你不一样的思考)

BASE 理论

BASE 理论起源于 2008 年, 由eBay的架构师Dan Pritchett在ACM上发表。

简介

BASEBasically Available(基本可用)Soft-state(软状态)Eventually Consistent(最终一致性) 三个短语的缩写。BASE 理论是对 CAP 中一致性 C 和可用性 A 权衡的结果,其来源于对大规模互联网系统分布式实践的总结,是基于 CAP 定理逐步演化而来的,它大大降低了我们对系统的要求。

BASE 理论的核心思想

即使无法做到强一致性,但每个应用都可以根据自身业务特点,采用适当的方式来使系统达到最终一致性。

也就是牺牲数据的一致性来满足系统的高可用性,系统中一部分数据不可用或者不一致时,仍需要保持系统整体“主要可用”。

BASE 理论本质上是对 CAP 的延伸和补充,更具体地说,是对 CAP 中 AP 方案的一个补充。

为什么这样说呢?

CAP 理论这节我们也说过了:

如果系统没有发生“分区”的话,节点间的网络连接通信正常的话,也就不存在 P 了。这个时候,我们就可以同时保证 C 和 A 了。因此,如果系统发生“分区”,我们要考虑选择 CP 还是 AP。如果系统没有发生“分区”的话,我们要思考如何保证 CA 。

因此,AP 方案只是在系统发生分区的时候放弃一致性,而不是永远放弃一致性。在分区故障恢复后,系统应该达到最终一致性。这一点其实就是 BASE 理论延伸的地方。

BASE 理论三要素

BASE理论三要素

1. 基本可用

基本可用是指分布式系统在出现不可预知故障的时候,允许损失部分可用性。但是,这绝不等价于系统不可用。

什么叫允许损失部分可用性呢?

  • 响应时间上的损失: 正常情况下,处理用户请求需要 0.5s 返回结果,但是由于系统出现故障,处理用户请求的时间变为 3 s。
  • 系统功能上的损失:正常情况下,用户可以使用系统的全部功能,但是由于系统访问量突然剧增,系统的部分非核心功能无法使用。

2. 软状态

软状态指允许系统中的数据存在中间状态(CAP 理论中的数据不一致),并认为该中间状态的存在不会影响系统的整体可用性,即允许系统在不同节点的数据副本之间进行数据同步的过程存在延时。

3. 最终一致性

最终一致性强调的是系统中所有的数据副本,在经过一段时间的同步后,最终能够达到一个一致的状态。因此,最终一致性的本质是需要系统保证最终数据能够达到一致,而不需要实时保证系统数据的强一致性。

分布式一致性的 3 种级别:

  1. 强一致性 :系统写入了什么,读出来的就是什么。
  2. 弱一致性 :不一定可以读取到最新写入的值,也不保证多少时间之后读取到的数据是最新的,只是会尽量保证某个时刻达到数据一致的状态。
  3. 最终一致性 :弱一致性的升级版。,系统会保证在一定时间内达到数据一致的状态,

业界比较推崇是最终一致性级别,但是某些对数据一致要求十分严格的场景比如银行转账还是要保证强一致性。

总结

ACID 是数据库事务完整性的理论,CAP 是分布式系统设计理论,BASE 是 CAP 理论中 AP 方案的延伸。

图解计算机基础+个人原创的 Java 面试手册PDF版。

微信搜“JavaGuide”回复“计算机基础”即可获取图解计算机基础+个人原创的 Java 面试手册。

查看原文

赞 9 收藏 5 评论 0

认证与成就

  • 获得 105 次点赞
  • 获得 2 枚徽章 获得 0 枚金徽章, 获得 0 枚银徽章, 获得 2 枚铜徽章

擅长技能
编辑

开源项目 & 著作
编辑

注册于 2018-07-24
个人主页被 4k 人浏览