没有足够的数据
(゚∀゚ )
暂时没有任何数据
(゚∀゚ )
暂时没有任何数据
KerryWu 发布了文章 · 2月15日
本文先通过分布式事务中tcc方案,衍生出seata的tcc模式,主要还是会通过代码示例来做介绍。github代码地址可提前下载,该项目中包括数据库、seata配置,以及所有分布式服务的全部代码。大家如果想练练手,可以先拉取该项目代码,再结合本文学习。核心配置环境如下:
环境类型 | 版本号 |
---|---|
jdk | 1.8.0_251 |
mysql | 8.0.22 |
seata server | 1.4.1 |
我们前面有几篇文章都有介绍过分布式事务的方案,目前常见的分布式事务方案有:2pc、tcc和异步确保型。之前讲过用jta atomikos实现多数据源的 2pc
,用 异步确保型
方案实现支付业务的事务等等,就是没专门讲过 tcc
的应用。
因为tcc方案的操作难度还是比较大的。不能单打独斗,最好需要依托一个成熟的框架来实现。常见的tcc开源框架有tcc-transaction、Hmily和ByteTCC等,不过他们不像seata背靠大厂,无法提供持续的维护,因此我更推荐seata的tcc方案。
先说说seata吧,分布式事务的解决方案肯定不局限于上面说的三种,实际上五花八门。因为它的确很让人头疼,各位大神都想研发出最好用的框架。本文的主角 - seata
,就是阿里的一个开源项目。
seata提供了AT、TCC、SAGA 和 XA,一共4种事务模式。像AT模式就很受欢迎,我们在实现多数据源的事务一致性时,通常会选用 2PC
的方案,等待所有数据源的事务执行成功,最后再一起提交事务。这个等待所有数据源事务执行的过程就比较耗时,即影响性能,也不安全。
而seata AT模式的做法就很灵活,它学习数据库的 undo log,每个事务执行时立即提交事务,但会把 undo 的回退sql记录下来。如果所有事务执行成功,清除记录 undo sql的行记录,如果某个事务失败,则执行对应 undo sql 回滚数据。在保证事务的同时,并发量也大了起来。
但我们今天要讲的是 seata TCC 模式,如果你对 Seata的其他模式感兴趣,可以上官网了解。
先讲一下示例的业务吧,我们还是拿比较经典的电商支付场景举例。假设支付成功后,涉及到三个系统事务:
按照tcc(try-confirm-cancel)的思路,这三个事务可以分别分解成下面的过程。
订单系统 order
库存系统 storage
账户系统 account
为了模拟分布式事务,上述的不同系统业务,我们通过在不同数据库中创建表结构来模拟。当然tcc的分布式事务不局限于数据库层面,还包括http接口调用和rpc调用等,但是异曲同工,可以作为示例参考。
下面先列出三张业务表的表结构,具体的sql可见最后附件。
表:order
列名 | 类型 | 备注 |
---|---|---|
id | int | 主键 |
order_no | varchar | 订单号 |
user_id | int | 用户id |
product_id | int | 产品id |
amount | int | 数量 |
money | decimal | 金额 |
status | int | 订单状态:0:创建中;1:已完结 |
表:storage
列名 | 类型 | 备注 |
---|---|---|
id | int | 主键 |
product_id | int | 产品id |
residue | int | 剩余库存 |
frozen | int | TCC事务锁定的库存 |
表:account
列名 | 类型 | 备注 |
---|---|---|
id | int | 主键 |
user_id | int | 用户id |
residue | int | 剩余可用额度 |
frozen | int | TCC事务锁定的金额 |
seata server 的安装包可直接从官方github下载,下载压缩包后,解压到本地或服务器上。
Seata Server 的配置文件有两个:
registry.conf
Seata Server 要向注册中心进行注册,这样,其他服务就可以通过注册中心去发现 Seata Server,与 Seata Server 进行通信。Seata 支持多款注册中心服务:nacos 、eureka、redis、zk、consul、etcd3、sofa。我们项目中要使用 eureka 注册中心,eureka服务的连接地址、注册的服务名,这需要在 registry.conf 文件中对 registry
进行配置。
Seata 需要存储全局事务信息、分支事务信息、全局锁信息,这些数据存储到什么位置?针对存储位置的配置,支持放在配置中心,或者也可以放在本地文件。Seata Server 支持的配置中心服务有:nacos 、apollo、zk、consul、etcd3。这里我们选择最简单的,使用本地文件,这需要在 registry.conf 文件中对 config
进行配置。
file.conf
file.conf 中对事务信息的存储位置进行配置,存储位置支持:file、db、redis。
这里我们选择数据库作为存储位置,这需要在 file.conf 中进行配置。
执行 seata/bin/seata-server.sh(windows 是 seata-server.bat) 脚本即可启动seata server。还可以配置下列参数:
-h:注册到注册中心的ip
-p:server rpc 监听端口,默认 8091
-m:全局事务会话信息存储模式,file、db,优先读取启动参数
-n:server node,多个server时,需要区分各自节点,用于生成不同区间的transctionId,以免冲突
-e:多环境配置
mysql 8
默认启动后会报mysql-connector-java-x.jar
驱动的错误,是因为seata server 默认不支持mysql 8。
可以在seata server的 lib 文件夹下替换 mysql 的驱动 jar 包。lib 文件夹下,已经有一个 jdbc 文件夹,把里面驱动版本为 8 的 mysql-connector-java-x.jar 包拷贝到外面 lib 文件夹下即可。
github示例项目中包括3个业务服务、1个注册中心,以及resources下的数据库脚本和seata server配置文件。按照服务的启动顺序,如下分类:
3个业务服务中,order订单服务
可以被称为“主事务”,当订单创建成功后,再在订单服务中调用 account账号服务
和 storage库存服务
两个“副事务”。因此从 seata tcc代码层面上,可以分成下面两类。
下文中不会列举业务代码,完整代码可以从github上查看,只会列出 seata 的相关代码和配置。
配置文件中需要配置 tx-service-group
,需要注意的是,3个业务服务中都需要配置同样的值。
application.yml
spring:
cloud:
alibaba:
seata:
tx-service-group: order_tx_group
在application.yml同级目录,即 resources 目录下,创建两个seata 的配置文件。还记得在seata server 启动的时候也有这两个文件,但内容不一样,不要混淆了。
file.conf
transport {
type = "TCP"
server = "NIO"
heartbeat = true
enableClientBatchSendRequest = true
threadFactory {
bossThreadPrefix = "NettyBoss"
workerThreadPrefix = "NettyServerNIOWorker"
serverExecutorThread-prefix = "NettyServerBizHandler"
shareBossWorker = false
clientSelectorThreadPrefix = "NettyClientSelector"
clientSelectorThreadSize = 1
clientWorkerThreadPrefix = "NettyClientWorkerThread"
bossThreadSize = 1
workerThreadSize = "default"
}
shutdown {
wait = 3
}
serialization = "seata"
compressor = "none"
}
service {
vgroupMapping.order_tx_group = "seata-server"
order_tx_group.grouplist = "127.0.0.1:8091"
enableDegrade = false
disableGlobalTransaction = false
}
client {
rm {
asyncCommitBufferLimit = 10000
lock {
retryInterval = 10
retryTimes = 30
retryPolicyBranchRollbackOnConflict = true
}
reportRetryCount = 5
tableMetaCheckEnable = false
reportSuccessEnable = false
}
tm {
commitRetryCount = 5
rollbackRetryCount = 5
}
undo {
dataValidation = true
logSerialization = "jackson"
logTable = "undo_log"
}
log {
exceptionRate = 100
}
}
registry.conf
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "eureka"
eureka {
serviceUrl = "http://localhost:8761/eureka"
}
}
config {
# file、nacos 、apollo、zk、consul、etcd3、springCloudConfig
type = "file"
file {
name = "file.conf"
}
}
这是配置 TCC 子服务的核心代码,
该注解需要添加到上面描述的接口上,表示实现该接口的类被 seata 来管理,seata 根据事务的状态,自动调用我们定义的方法,如果没问题则调用 Commit 方法,否则调用 Rollback 方法。
该注解用在接口的 Try 方法上。
该注解用来修饰 Try 方法的入参,被修饰的入参可以在 Commit 方法和 Rollback 方法中通过 BusinessActionContext 获取。
在接口方法的实现代码中,可以通过 BusinessActionContext 来获取参数, BusinessActionContext 就是 seata tcc 的事务上下文,用于存放 tcc 事务的一些关键数据。BusinessActionContext 对象可以直接作为 commit 方法和 rollbakc 方法的参数,Seata 会自动注入参数。
OrderTccAction.java
@LocalTCC
public interface OrderTccAction {
/**
* try 尝试
*
* BusinessActionContext 上下文对象,用来在两个阶段之间传递数据
* BusinessActionContextParameter 注解的参数数据会被存入 BusinessActionContext
* TwoPhaseBusinessAction 注解中commitMethod、rollbackMethod 属性有默认值,可以不写
*
* @param businessActionContext
* @param orderNo
* @param userId
* @param productId
* @param amount
* @param money
* @return
*/
@TwoPhaseBusinessAction(name = "orderTccAction")
boolean prepareCreateOrder(BusinessActionContext businessActionContext,
@BusinessActionContextParameter(paramName = "orderNo") String orderNo,
@BusinessActionContextParameter(paramName = "userId") Long userId,
@BusinessActionContextParameter(paramName = "productId") Long productId,
@BusinessActionContextParameter(paramName = "amount") Integer amount,
@BusinessActionContextParameter(paramName = "money") BigDecimal money);
/**
* commit 提交
* @param businessActionContext
* @return
*/
boolean commit(BusinessActionContext businessActionContext);
/**
* cancel 撤销
* @param businessActionContext
* @return
*/
boolean rollback(BusinessActionContext businessActionContext);
}
OrderTccActionImpl.java
@Slf4j
@Component
public class OrderTccActionImpl implements OrderTccAction {
private final OrderMapper orderMapper;
public OrderTccActionImpl(OrderMapper orderMapper){
this.orderMapper=orderMapper;
}
/**
* try 尝试
*
* BusinessActionContext 上下文对象,用来在两个阶段之间传递数据
* BusinessActionContextParameter 注解的参数数据会被存入 BusinessActionContext
* TwoPhaseBusinessAction 注解中commitMethod、rollbackMethod 属性有默认值,可以不写
*
* @param businessActionContext
* @param orderNo
* @param userId
* @param productId
* @param amount
* @param money
* @return
*/
@Transactional(rollbackFor = Exception.class)
@Override
public boolean prepareCreateOrder(BusinessActionContext businessActionContext,
String orderNo,
Long userId,
Long productId,
Integer amount,
BigDecimal money) {
orderMapper.save(new OrderDO(orderNo,userId, productId, amount, money, 0));
ResultHolder.setResult(OrderTccAction.class, businessActionContext.getXid(), "p");
return true;
}
/**
* commit 提交
*
* @param businessActionContext
* @return
*/
@Transactional(rollbackFor = Exception.class)
@Override
public boolean commit(BusinessActionContext businessActionContext) {
//检查标记是否存在,如果标记不存在不重复提交
String p = ResultHolder.getResult(OrderTccAction.class, businessActionContext.getXid());
if (p == null){
return true;
}
/**
* 上下文对象从第一阶段向第二阶段传递时,先转成了json数据,然后还原成上下文对象
* 其中的整数比较小的会转成Integer类型,所以如果需要Long类型,需要先转换成字符串在用Long.valueOf()解析。
*/
String orderNo = businessActionContext.getActionContext("orderNo").toString();
orderMapper.updateStatusByOrderNo(orderNo, 1);
//提交完成后,删除标记
ResultHolder.removeResult(OrderTccAction.class, businessActionContext.getXid());
return true;
}
/**
* cancel 撤销
*
* 第一阶段没有完成的情况下,不必执行回滚。因为第一阶段有本地事务,事务失败时已经进行了回滚。
* 如果这里第一阶段成功,而其他全局事务参与者失败,这里会执行回滚
* 幂等性控制:如果重复执行回滚则直接返回
*
* @param businessActionContext
* @return
*/
@Transactional(rollbackFor = Exception.class)
@Override
public boolean rollback(BusinessActionContext businessActionContext) {
//检查标记是否存在,如果标记不存在不重复提交
String p = ResultHolder.getResult(OrderTccAction.class, businessActionContext.getXid());
if (p == null){
return true;
}
String orderNo = businessActionContext.getActionContext("orderNo").toString();
orderMapper.deleteByOrderNo(orderNo);
//提交完成后,删除标记
ResultHolder.removeResult(OrderTccAction.class, businessActionContext.getXid());
return true;
}
}
@GlobalTransactional
注解是唯一作用到“主事务”的方法。该注解加在“主事务”调用“副事务”的方法上。
OrderServiceImpl.java
@Service
public class OrderServiceImpl implements OrderService {
private final OrderTccAction orderTccAction;
private final AccountFeign accountFeign;
private final StorageFeign storageFeign;
public OrderServiceImpl(OrderTccAction orderTccAction, AccountFeign accountFeign, StorageFeign storageFeign){
this.orderTccAction=orderTccAction;
this.accountFeign=accountFeign;
this.storageFeign=storageFeign;
}
/**
* 创建订单
* @param orderDO
*/
@GlobalTransactional
@Override
public void createOrder(OrderDO orderDO) {
String orderNo=this.generateOrderNo();
//创建订单
orderTccAction.prepareCreateOrder(null,
orderNo,
orderDO.getUserId(),
orderDO.getProductId(),
orderDO.getAmount(),
orderDO.getMoney());
//扣余额
accountFeign.decreaseMoney(orderDO.getUserId(),orderDO.getMoney());
//扣库存
storageFeign.decreaseStorage(orderDO.getProductId(),orderDO.getAmount());
}
private String generateOrderNo(){
return LocalDateTime.now()
.format(
DateTimeFormatter.ofPattern("yyMMddHHmmssSSS")
);
}
}
account 和 storage 两个服务相比较于 order,只少了 “4.1.4. @GlobalTransactional 全局服务”,其他的配置完全一样。因此,这里就不再赘言了。
测试
通过调用“主事务” order-service 的创建订单接口,来模拟分布式事务。我们可以通过在3个业务服务的不同代码处故意抛出错误,看是否能够实现事务的一致回滚。
seata框架表结构
在 /resources/database-sql 的数据库脚本中,各自还有一些 seata 框架本身的表结构,用于存储分布式事务各自的中间状态。因为这个中间状态很短,一旦事务一致性达成,表数据就会自动删除,因此平时我们无法查看数据库。
因为seata tcc模式,会一直阻塞到所有的 try执行完毕,再执行后续的。从而我们可以通过在部分业务服务try的代码中加上Thread.sleep(10000)
,强制让事务过程变慢,从而就可以看到这些 seata 表数据。
幂等性
tcc模式中,Commit
和 Cancel
都是有自动重试功能的,处于事务一致性考虑,重试功能很有必要。但我们就一定要慎重考虑方法的 幂等性
,示例代码中的ResultHolder类并不是个好方案,还是要在Commit、Cancel业务方法本身做幂等性要求。
本文先通过分布式事务中tcc方案,衍生出seata的tcc模式,主要还是会通过代码示例来做介绍。github代码地址可提前下载,该项目中包括数据库、seata配置,以及所有分布式服务的全部代码。大家如果想练练手,可以先拉取该项目代码,再结合本文学习。核心配置环境如下:
赞 0 收藏 0 评论 0
KerryWu 赞了文章 · 1月25日
作者:不学无数的程序员\
链接:https://www.jianshu.com/p/a8e...
在做系统优化时,想到了将数据进行分级存储的思路。因为在系统中会存在一些数据,有些数据的实时性要求不高,比如一些配置信息。
基本上配置了很久才会变一次。而有一些数据实时性要求非常高,比如订单和流水的数据。所以这里根据数据要求实时性不同将数据分为三级。
但是只要使用到缓存,无论是本地内存做缓存还是使用 redis 做缓存,那么就会存在数据同步的问题,因为配置信息缓存在内存中,而内存时无法感知到数据在数据库的修改。这样就会造成数据库中的数据与缓存中数据不一致的问题。
接下来就讨论一下关于保证缓存和数据库双写时的数据一致性。
那么我们这里列出来所有策略,并且讨论他们优劣性。
这种场景一般是没有人使用的,主要原因是在更新缓存那一步,为什么呢?因为有的业务需求缓存中存在的值并不是直接从数据库中查出来的,有的是需要经过一系列计算来的缓存值,那么这时候后你要更新缓存的话其实代价是很高的。如果此时有大量的对数据库进行写数据的请求,但是读请求并不多,那么此时如果每次写请求都更新一下缓存,那么性能损耗是非常大的。
举个例子比如在数据库中有一个值为 1 的值,此时我们有 10 个请求对其每次加一的操作,但是这期间并没有读操作进来,如果用了先更新数据库的办法,那么此时就会有十个请求对缓存进行更新,会有大量的冷数据产生,如果我们不更新缓存而是删除缓存,那么在有读请求来的时候那么就会只更新缓存一次。
这一种情况应该不需要我们考虑了吧,和第一种情况是一样的。
该方案也会出问题,具体出现的原因如下。
此时来了两个请求,请求 A(更新操作) 和请求 B(查询操作)
那么这时候就会产生数据库和 Redis 数据不一致的问题。如何解决呢?其实最简单的解决办法就是延时双删的策略。
但是上述的保证事务提交完以后再进行删除缓存还有一个问题,就是如果你使用的是 Mysql 的读写分离的架构的话,那么其实主从同步之间也会有时间差。
此时来了两个请求,请求 A(更新操作) 和请求 B(查询操作)
此时的解决办法就是如果是对 Redis 进行填充数据的查询数据库操作,那么就强制将其指向主库进行查询。
问题:这一种情况也会出现问题,比如更新数据库成功了,但是在删除缓存的阶段出错了没有删除成功,那么此时再读取缓存的时候每次都是错误的数据了。
此时解决方案就是利用消息队列进行删除的补偿。具体的业务逻辑用语言描述如下:
但是这个方案会有一个缺点就是会对业务代码造成大量的侵入,深深的耦合在一起,所以这时会有一个优化的方案,我们知道对 Mysql 数据库更新操作后再 binlog 日志中我们都能够找到相应的操作,那么我们可以订阅 Mysql 数据库的 binlog 日志对缓存进行操作。
每种方案各有利弊,比如在第二种先删除缓存,后更新数据库这个方案我们最后讨论了要更新 Redis 的时候强制走主库查询就能解决问题,那么这样的操作会对业务代码进行大量的侵入,但是不需要增加的系统,不需要增加整体的服务的复杂度。
最后一种方案我们最后讨论了利用订阅 binlog 日志进行搭建独立系统操作 Redis,这样的缺点其实就是增加了系统复杂度。其实每一次的选择都需要我们对于我们的业务进行评估来选择,没有一种技术是对于所有业务都通用的。没有最好的,只有最适合我们的。
近期热文推荐:
1.Java 15 正式发布, 14 个新特性,刷新你的认知!!
2.终于靠开源项目弄到 IntelliJ IDEA 激活码了,真香!
3.我用 Java 8 写了一段逻辑,同事直呼看不懂,你试试看。。
觉得不错,别忘了随手点赞+转发哦!
查看原文作者:不学无数的程序员\链接:[链接]在做系统优化时,想到了将数据进行分级存储的思路。因为在系统中会存在一些数据,有些数据的实时性要求不高,比如一些配置信息。基本上配置了很久才会变一次。而有一些数据实时性要求非常高,比如订单和流水的数据。所以这里根据...
赞 4 收藏 3 评论 1
KerryWu 收藏了文章 · 1月18日
TCC 与 Seata AT 事务一样都是两阶段事务,它与 AT 事务的主要区别为:
以账户服务为例,当下订单时要扣减用户账户金额:
假如用户购买 100 元商品,要扣减 100 元。
TCC 事务首先对这100元的扣减金额进行预留,或者说是先冻结这100元:
如果第一阶段能够顺利完成,那么说明“扣减金额”业务(分支事务)最终肯定是可以成功的。当全局事务提交时, TC会控制当前分支事务进行提交,如果提交失败,TC 会反复尝试,直到提交成功为止。
当全局事务提交时,就可以使用冻结的金额来最终实现业务数据操作:
如果全局事务回滚,就把冻结的金额进行解冻,恢复到以前的状态,TC 会控制当前分支事务回滚,如果回滚失败,TC 会反复尝试,直到回滚完成为止。
多个TCC全局事务允许并发,它们执行扣减金额时,只需要冻结各自的金额即可:
Seata 支持 TCC 事务模式,与 AT 模式相同的,也需要以下组件来支持全局事务的控制:
新建 Empty Project:
工程命名为 seata-tcc
,存放到 seata-samples 文件夹下,与 seata-at
工程存放在一起:
压缩文件中的 7 个项目目录解压缩到 seata-tcc
目录:
在 idea 中按两下 shift
键,搜索 add maven projects
,打开 maven 工具:
然后选择 seata-tcc
工程目录下的 7 个项目的 pom.xml
导入:
在订单项目中执行添加订单:
我们要添加以下 TCC 事务操作的代码:
T
ry - 第一阶,冻结数据阶段,向订单表直接插入订单,订单状态设置为0(冻结状态)。C
onfirm - 第二阶段,提交事务,将订单状态修改成1(正常状态)。C
ancel - 第二阶段,回滚事务,删除订单。打开 order-parent 中注释掉的 seata 依赖:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.2.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>cn.tedu</groupId>
<artifactId>order-parent</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>pom</packaging>
<name>order-parent</name>
<properties>
<mybatis-plus.version>3.3.2</mybatis-plus.version>
<druid-spring-boot-starter.version>1.1.23</druid-spring-boot-starter.version>
<seata.version>1.3.0</seata.version>
<spring-cloud-alibaba-seata.version>2.0.0.RELEASE</spring-cloud-alibaba-seata.version>
<spring-cloud.version>Hoxton.SR6</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>${mybatis-plus.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>${druid-spring-boot-starter.version}</version>
</dependency>
<!-- 打开 seata 依赖 -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-seata</artifactId>
<version>${spring-cloud-alibaba-seata.version}</version>
<exclusions>
<exclusion>
<artifactId>seata-all</artifactId>
<groupId>io.seata</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
<version>${seata.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
设置全局事务组的组名:
spring:
application:
name: order
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost/seata_order?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8
username: root
password: root
# 事务组设置
cloud:
alibaba:
seata:
tx-service-group: order_tx_group
......
与 AT 事务中的配置完全相同:
registry.conf
:
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "eureka"
nacos {
serverAddr = "localhost"
namespace = ""
cluster = "default"
}
eureka {
serviceUrl = "http://localhost:8761/eureka"
# application = "default"
# weight = "1"
}
redis {
serverAddr = "localhost:6379"
db = "0"
password = ""
cluster = "default"
timeout = "0"
}
zk {
cluster = "default"
serverAddr = "127.0.0.1:2181"
session.timeout = 6000
connect.timeout = 2000
username = ""
password = ""
}
consul {
cluster = "default"
serverAddr = "127.0.0.1:8500"
}
etcd3 {
cluster = "default"
serverAddr = "http://localhost:2379"
}
sofa {
serverAddr = "127.0.0.1:9603"
application = "default"
region = "DEFAULT_ZONE"
datacenter = "DefaultDataCenter"
cluster = "default"
group = "SEATA_GROUP"
addressWaitTime = "3000"
}
file {
name = "file.conf"
}
}
config {
# file、nacos 、apollo、zk、consul、etcd3、springCloudConfig
type = "file"
nacos {
serverAddr = "localhost"
namespace = ""
group = "SEATA_GROUP"
}
consul {
serverAddr = "127.0.0.1:8500"
}
apollo {
app.id = "seata-server"
apollo.meta = "http://192.168.1.204:8801"
namespace = "application"
}
zk {
serverAddr = "127.0.0.1:2181"
session.timeout = 6000
connect.timeout = 2000
username = ""
password = ""
}
etcd3 {
serverAddr = "http://localhost:2379"
}
file {
name = "file.conf"
}
}
file.conf
:
transport {
# tcp udt unix-domain-socket
type = "TCP"
#NIO NATIVE
server = "NIO"
#enable heartbeat
heartbeat = true
# the client batch send request enable
enableClientBatchSendRequest = true
#thread factory for netty
threadFactory {
bossThreadPrefix = "NettyBoss"
workerThreadPrefix = "NettyServerNIOWorker"
serverExecutorThread-prefix = "NettyServerBizHandler"
shareBossWorker = false
clientSelectorThreadPrefix = "NettyClientSelector"
clientSelectorThreadSize = 1
clientWorkerThreadPrefix = "NettyClientWorkerThread"
# netty boss thread size,will not be used for UDT
bossThreadSize = 1
#auto default pin or 8
workerThreadSize = "default"
}
shutdown {
# when destroy server, wait seconds
wait = 3
}
serialization = "seata"
compressor = "none"
}
service {
#transaction service group mapping
# order_tx_group 与 yml 中的 “tx-service-group: order_tx_group” 配置一致
# “seata-server” 与 TC 服务器的注册名一致
# 从eureka获取seata-server的地址,再向seata-server注册自己,设置group
vgroupMapping.order_tx_group = "seata-server"
#only support when registry.type=file, please don't set multiple addresses
order_tx_group.grouplist = "127.0.0.1:8091"
#degrade, current not support
enableDegrade = false
#disable seata
disableGlobalTransaction = false
}
client {
rm {
asyncCommitBufferLimit = 10000
lock {
retryInterval = 10
retryTimes = 30
retryPolicyBranchRollbackOnConflict = true
}
reportRetryCount = 5
tableMetaCheckEnable = false
reportSuccessEnable = false
}
tm {
commitRetryCount = 5
rollbackRetryCount = 5
}
undo {
dataValidation = true
logSerialization = "jackson"
logTable = "undo_log"
}
log {
exceptionRate = 100
}
}
根据前面的分析,订单数据操作有以下三项:
在 OrderMapper 中已经有插入订单的方法了,现在需要添加修改订单和删除订单的方法(删除方法从BaseMapper继承):
package cn.tedu.order.mapper;
import cn.tedu.order.entity.Order;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Param;
public interface OrderMapper extends BaseMapper {
void create(Order order);
void updateStatus(@Param("orderId") Long orderId, @Param("status") Integer status);
}
那么对应的 OrderMapper.xml
中也要添加 sql:
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="cn.tedu.order.mapper.OrderMapper" >
<resultMap id="BaseResultMap" type="cn.tedu.order.entity.Order" >
<id column="id" property="id" jdbcType="BIGINT" />
<result column="user_id" property="userId" jdbcType="BIGINT" />
<result column="product_id" property="productId" jdbcType="BIGINT" />
<result column="count" property="count" jdbcType="INTEGER" />
<result column="money" property="money" jdbcType="DECIMAL" />
<result column="status" property="status" jdbcType="INTEGER" />
</resultMap>
<insert id="create">
INSERT INTO `order` (`id`,`user_id`,`product_id`,`count`,`money`,`status`)
VALUES(#{id}, #{userId}, #{productId}, #{count}, #{money}, ${status});
</insert>
<update id="updateStatus" >
UPDATE `order` SET `status`=#{status} WHERE `id`=#{orderId};
</update>
<delete id="deleteById">
DELETE FROM `order` WHERE `id`=#{orderId}
</delete>
</mapper>
第二阶段
第二阶段为了处理幂等性问题这里首先添加一个工具类 ResultHolder
。
这个工具也可以在第二阶段 Confirm 或 Cancel 阶段对第一阶段的成功与否进行判断,在第一阶段成功时需要保存一个标识。
ResultHolder
可以为每一个全局事务保存一个标识:
package cn.tedu.order.tcc;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class ResultHolder {
private static Map<Class<?>, Map<String, String>> map = new ConcurrentHashMap<Class<?>, Map<String, String>>();
public static void setResult(Class<?> actionClass, String xid, String v) {
Map<String, String> results = map.get(actionClass);
if (results == null) {
synchronized (map) {
if (results == null) {
results = new ConcurrentHashMap<>();
map.put(actionClass, results);
}
}
}
results.put(xid, v);
}
public static String getResult(Class<?> actionClass, String xid) {
Map<String, String> results = map.get(actionClass);
if (results != null) {
return results.get(xid);
}
return null;
}
public static void removeResult(Class<?> actionClass, String xid) {
Map<String, String> results = map.get(actionClass);
if (results != null) {
results.remove(xid);
}
}
}
Seata 实现 TCC 操作需要定义一个接口,我们在接口中添加以下方法:
prepareCreateOrder()
commit()
rollback()
package cn.tedu.order.tcc;
import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;
import java.math.BigDecimal;
@LocalTCC
public interface OrderTccAction {
// T (try - 预留资源,冻结订单)
/*
第一阶段的方法
通过注解指定第二阶段的两个方法名
BusinessActionContext 上下文对象,用来在两个阶段之间传递数据
@BusinessActionContextParameter 注解的参数数据会被存入 BusinessActionContext
@TwoPhaseBusinessAction 提交回滚的默认值可以不写
*/
@TwoPhaseBusinessAction(name = "orderTccAction", commitMethod = "commit", rollbackMethod = "rollback")
boolean prepareCreateOrder(BusinessActionContext businessActionContext,
@BusinessActionContextParameter(paramName = "orderId") Long orderId,
@BusinessActionContextParameter(paramName = "userId") Long userId,
@BusinessActionContextParameter(paramName = "productId") Long productId,
@BusinessActionContextParameter(paramName = "count") Integer count,
@BusinessActionContextParameter(paramName = "money") BigDecimal money);
//C (confirm - 确认,提交)
boolean commit(BusinessActionContext businessActionContext);
//C (cancel - 取消,回滚)
boolean rollback(BusinessActionContext businessActionContext);
}
实现类:
package cn.tedu.order.tcc;
import cn.tedu.order.entity.Order;
import cn.tedu.order.mapper.OrderMapper;
import io.seata.rm.tcc.api.BusinessActionContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.math.BigDecimal;
@Component
public class OrderTccActionImpl implements OrderTccAction {
@Autowired
private OrderMapper orderMapper;
@Override
public boolean prepareCreateOrder(BusinessActionContext ctx, Long orderId, Long userId, Long productId, Integer count, BigDecimal money) {
orderMapper.create(new Order(orderId, userId, productId, count, money, 0));
//保存一阶段的成功标记
ResultHolder.setResult(OrderTccAction.class, ctx.getXid(), "p");
return true;
}
@Override
public boolean commit(BusinessActionContext ctx) {
//检查标记是否存在,如果标记不存在不重复提交
String p = ResultHolder.getResult(OrderTccAction.class, ctx.getXid());
if (p == null){
return true;
}
/**
* 上下文对象从第一阶段向第二阶段传递时,先转成了json数据,然后还原成上下文对象
* 其中的整数比较小的会转成Integer类型,所以如果需要Long类型,需要先转换成字符串在用Long.valueOf()解析。
*/
Long orderId = Long.valueOf(ctx.getActionContext("orderId").toString());
orderMapper.updateStatus(orderId, 1);
//提交完成后,删除标记
ResultHolder.removeResult(OrderTccAction.class, ctx.getXid());
return true;
}
@Override
public boolean rollback(BusinessActionContext ctx) {
//第一阶段没有完成的情况下,不必执行回滚
//因为第一阶段有本地事务,事务失败时已经进行了回滚。
//如果这里第一阶段成功,而其他全局事务参与者失败,这里会执行回滚
//幂等性控制:如果重复执行回滚则直接返回
//检查标记是否存在,如果标记不存在不重复提交
String p = ResultHolder.getResult(OrderTccAction.class, ctx.getXid());
if (p == null){
return true;
}
Long orderId = Long.valueOf(ctx.getActionContext("orderId").toString());
orderMapper.deleteById(orderId);
//提交完成后,删除标记
ResultHolder.removeResult(OrderTccAction.class, ctx.getXid());
return true;
}
}
业务代码中不再直接保存订单数据,而是调用 TCC 第一阶段方法prepareCreateOrder()
,并添加全局事务注解 @GlobalTransactional
:
package cn.tedu.order.service;
import cn.tedu.order.entity.Order;
import cn.tedu.order.feign.AccountClient;
import cn.tedu.order.feign.EasyIdGeneratorClient;
import cn.tedu.order.feign.StorageClient;
import cn.tedu.order.mapper.OrderMapper;
import cn.tedu.order.tcc.OrderTccAction;
import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Random;
@Service
public class OrderServiceImpl implements OrderService {
// @Autowired
// private OrderMapper orderMapper;
@Autowired
EasyIdGeneratorClient easyIdGeneratorClient;
@Autowired
private AccountClient accountClient;
@Autowired
private StorageClient storageClient;
@Autowired
private OrderTccAction orderTccAction;
@GlobalTransactional
@Override
public void create(Order order) {
// 从全局唯一id发号器获得id
Long orderId = easyIdGeneratorClient.nextId("order_business");
order.setId(orderId);
// orderMapper.create(order);
// 这里修改成调用 TCC 第一节端方法
//orderTccAction是一个动态代理对象,其中添加了前置拦截器,所以底层会创建,所以传null值即可
orderTccAction.prepareCreateOrder(
null,
order.getId(),
order.getUserId(),
order.getProductId(),
order.getCount(),
order.getMoney());
// 修改库存
//storageClient.decrease(order.getProductId(), order.getCount());
// 修改账户余额
//accountClient.decrease(order.getUserId(), order.getMoney());
}
}
按顺序启动服务:
调用保存订单,地址:
http://localhost:8083/create?userId=1&productId=1&count=10&money=100
观察控制台日志:
查看数据库表中的订单数据:
在库存项目中执行减少库存:
我们要添加以下 TCC 事务操作的代码:
T
ry - 第一阶,冻结数据阶段,将要减少的库存量先冻结:C
onfirm - 第二阶段,提交事务,使用冻结的库存完成业务数据处理:C
ancel - 第二阶段,回滚事务,冻结的库存解冻,恢复以前的库存量:有三个文件需要配置:
这三个文件的设置与上面 order 项目的配置完全相同,请参考上面订单配置一章进行配置。
根据前面的分析,库存数据操作有以下三项:
在 StorageMapper 中添加三个方法:
package cn.tedu.storage.mapper;
import cn.tedu.storage.entity.Storage;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Param;
public interface StorageMapper extends BaseMapper<Storage> {
void decrease(Long productId, Integer count);
// 冻结库存
void updateFrozen(@Param("productId") Long productId, @Param("residue") Integer residue, @Param("frozen") Integer frozen);
// 提交时,把冻结量修改到已售出
void updateFrozenToUsed(@Param("productId") Long productId, @Param("count") Integer count);
// 回滚时,把冻结量修改到可用库存
void updateFrozenToResidue(@Param("productId") Long productId, @Param("count") Integer count);
}
那么对应的 StorageMapper.xml
中也要添加 sql:
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="cn.tedu.storage.mapper.StorageMapper" >
<resultMap id="BaseResultMap" type="cn.tedu.storage.entity.Storage" >
<id column="id" property="id" jdbcType="BIGINT" />
<result column="product_id" property="productId" jdbcType="BIGINT" />
<result column="total" property="total" jdbcType="INTEGER" />
<result column="used" property="used" jdbcType="INTEGER" />
<result column="residue" property="residue" jdbcType="INTEGER" />
</resultMap>
<update id="decrease">
UPDATE storage SET used = used + #{count},residue = residue - #{count} WHERE product_id = #{productId}
</update>
<select id="selectById" resultMap="BaseResultMap">
SELECT * FROM storage WHERE `product_id`=#{productId}
</select>
<update id="updateFrozen">
UPDATE storage SET `residue`=#{residue},`frozen`=#{frozen} WHERE `product_id`=#{productId}
</update>
<update id="updateFrozenToUsed">
UPDATE storage SET `frozen`=`frozen`-#{count}, `used`=`used`+#{count} WHERE `product_id`=#{productId}
</update>
<update id="updateFrozenToResidue">
UPDATE storage SET `frozen`=`frozen`-#{count}, `residue`=`residue`+#{count} WHERE `product_id`=#{productId}
</update>
</mapper>
工具类 ResultHolder
:
package cn.tedu.storage.tcc;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class ResultHolder {
private static Map<Class<?>, Map<String, String>> map = new ConcurrentHashMap<Class<?>, Map<String, String>>();
public static void setResult(Class<?> actionClass, String xid, String v) {
Map<String, String> results = map.get(actionClass);
if (results == null) {
synchronized (map) {
if (results == null) {
results = new ConcurrentHashMap<>();
map.put(actionClass, results);
}
}
}
results.put(xid, v);
}
public static String getResult(Class<?> actionClass, String xid) {
Map<String, String> results = map.get(actionClass);
if (results != null) {
return results.get(xid);
}
return null;
}
public static void removeResult(Class<?> actionClass, String xid) {
Map<String, String> results = map.get(actionClass);
if (results != null) {
results.remove(xid);
}
}
}
添加 TCC 接口,在接口中添加以下方法:
prepareDecreaseStorage()
commit()
rollback()
package cn.tedu.storage.tcc;
import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;
@LocalTCC
public interface StorageTccAction {
@TwoPhaseBusinessAction(name = "storageTccAction", commitMethod = "commit", rollbackMethod = "rollback")
boolean prepareDecreaseStorage(BusinessActionContext businessActionContext,
@BusinessActionContextParameter(paramName = "productId") Long productId,
@BusinessActionContextParameter(paramName = "count") Integer count);
boolean commit(BusinessActionContext businessActionContext);
boolean rollback(BusinessActionContext businessActionContext);
}
实现类:
package cn.tedu.storage.tcc;
import cn.tedu.storage.entity.Storage;
import cn.tedu.storage.mapper.StorageMapper;
import io.seata.rm.tcc.api.BusinessActionContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
@Component
@Slf4j
public class StorageTccActionImpl implements StorageTccAction {
@Autowired
private StorageMapper storageMapper;
@Transactional
@Override
public boolean prepareDecreaseStorage(BusinessActionContext businessActionContext, Long productId, Integer count) {
log.info("减少商品库存,第一阶段,锁定减少的库存量,productId="+productId+", count="+count);
Storage storage = storageMapper.selectById(productId);
if (storage.getResidue()-count<0) {
throw new RuntimeException("库存不足");
}
/*
库存减掉count, 冻结库存增加count
*/
storageMapper.updateFrozen(productId, storage.getResidue()-count, storage.getFrozen()+count);
//保存标识
ResultHolder.setResult(getClass(), businessActionContext.getXid(), "p");
return true;
}
@Transactional
@Override
public boolean commit(BusinessActionContext businessActionContext) {
long productId = Long.parseLong(businessActionContext.getActionContext("productId").toString());
int count = Integer.parseInt(businessActionContext.getActionContext("count").toString());
log.info("减少商品库存,第二阶段提交,productId="+productId+", count="+count);
//防止重复提交
if (ResultHolder.getResult(getClass(), businessActionContext.getXid()) == null) {
return true;
}
storageMapper.updateFrozenToUsed(productId, count);
//删除标识
ResultHolder.removeResult(getClass(), businessActionContext.getXid());
return true;
}
@Transactional
@Override
public boolean rollback(BusinessActionContext businessActionContext) {
long productId = Long.parseLong(businessActionContext.getActionContext("productId").toString());
int count = Integer.parseInt(businessActionContext.getActionContext("count").toString());
log.info("减少商品库存,第二阶段,回滚,productId="+productId+", count="+count);
//防止重复回滚
if (ResultHolder.getResult(getClass(), businessActionContext.getXid()) == null) {
return true;
}
storageMapper.updateFrozenToResidue(productId, count);
//删除标识
ResultHolder.removeResult(getClass(), businessActionContext.getXid());
return true;
}
}
业务代码中调用 TCC 第一阶段方法prepareDecreaseStorage()
,并添加全局事务注解 @GlobalTransactional
:
package cn.tedu.storage.service;
import cn.tedu.storage.tcc.StorageTccAction;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class StorageServiceImpl implements StorageService {
// @Autowired
// private StorageMapper storageMapper;
@Autowired
private StorageTccAction storageTccAction;
@Override
public void decrease(Long productId, Integer count) throws Exception {
// storageMapper.decrease(productId,count);
storageTccAction.prepareDecreaseStorage(null, productId, count);
}
}
按顺序启动服务:
调用保存订单,地址:
http://localhost:8083/create?userId=1&productId=1&count=10&money=100
观察 storage 的控制台日志:
查看数据库表中的库存数据:
扣减金额 TCC 事务分析请见《分布式事务(六)Seata TCC模式-TCC模式介绍》
有三个文件需要配置:
这三个文件的设置与上面 order 项目的配置完全相同,请参考上面订单配置一章进行配置。
根据前面的分析,库存数据操作有以下三项:
在 AccountMapper 中添加三个方法:
package cn.tedu.account.mapper;
import cn.tedu.account.entity.Account;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Param;
import java.math.BigDecimal;
public interface AccountMapper extends BaseMapper<Account> {
void decrease(Long userId, BigDecimal money);
void updateFrozen(@Param("userId") Long userId, @Param("residue") BigDecimal residue, @Param("frozen") BigDecimal frozen);
void updateFrozenToUsed(@Param("userId") Long userId, @Param("money") BigDecimal money);
void updateFrozenToResidue(@Param("userId") Long userId, @Param("money") BigDecimal money);
}
那么对应的 AccountMapper.xml
中添加 sql:
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="cn.tedu.account.mapper.AccountMapper" >
<resultMap id="BaseResultMap" type="cn.tedu.account.entity.Account" >
<id column="id" property="id" jdbcType="BIGINT" />
<result column="user_id" property="userId" jdbcType="BIGINT" />
<result column="total" property="total" jdbcType="DECIMAL" />
<result column="used" property="used" jdbcType="DECIMAL" />
<result column="residue" property="residue" jdbcType="DECIMAL"/>
<result column="frozen" property="frozen" jdbcType="DECIMAL"/>
</resultMap>
<update id="decrease">
UPDATE account SET residue = residue - #{money},used = used + #{money} where user_id = #{userId};
</update>
<select id="selectById" resultMap="BaseResultMap">
SELECT * FROM account WHERE `user_id`=#{userId}
</select>
<update id="updateFrozen">
UPDATE account SET `residue`=#{residue},`frozen`=#{frozen} WHERE `user_id`=#{userId}
</update>
<update id="updateFrozenToUsed">
UPDATE account SET `frozen`=`frozen`-#{money}, `used`=`used`+#{money} WHERE `user_id`=#{userId}
</update>
<update id="updateFrozenToResidue">
UPDATE account SET `frozen`=`frozen`-#{money}, `residue`=`residue`+#{money} WHERE `user_id`=#{userId}
</update>
</mapper>
工具类 ResultHolder
:
package cn.tedu.account.tcc;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class ResultHolder {
private static Map<Class<?>, Map<String, String>> map = new ConcurrentHashMap<Class<?>, Map<String, String>>();
public static void setResult(Class<?> actionClass, String xid, String v) {
Map<String, String> results = map.get(actionClass);
if (results == null) {
synchronized (map) {
if (results == null) {
results = new ConcurrentHashMap<>();
map.put(actionClass, results);
}
}
}
results.put(xid, v);
}
public static String getResult(Class<?> actionClass, String xid) {
Map<String, String> results = map.get(actionClass);
if (results != null) {
return results.get(xid);
}
return null;
}
public static void removeResult(Class<?> actionClass, String xid) {
Map<String, String> results = map.get(actionClass);
if (results != null) {
results.remove(xid);
}
}
}
添加 TCC 接口,在接口中添加以下方法:
prepareDecreaseAccount()
commit()
rollback()
package cn.tedu.account.tcc;
import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;
import java.math.BigDecimal;
@LocalTCC
public interface AccountTccAction {
@TwoPhaseBusinessAction(name = "accountTccAction", commitMethod = "commit", rollbackMethod = "rollback")
boolean prepareDecreaseAccount(BusinessActionContext businessActionContext,
@BusinessActionContextParameter(paramName = "userId") Long userId,
@BusinessActionContextParameter(paramName = "money") BigDecimal money);
boolean commit(BusinessActionContext businessActionContext);
boolean rollback(BusinessActionContext businessActionContext);
}
实现类:
package cn.tedu.account.tcc;
import cn.tedu.account.entity.Account;
import cn.tedu.account.mapper.AccountMapper;
import io.seata.rm.tcc.api.BusinessActionContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.math.BigDecimal;
@Component
@Slf4j
public class AccountTccActionImpl implements AccountTccAction {
@Autowired
private AccountMapper accountMapper;
@Transactional
@Override
public boolean prepareDecreaseAccount(BusinessActionContext businessActionContext, Long userId, BigDecimal money) {
log.info("减少账户金额,第一阶段锁定金额,userId="+userId+", money="+money);
Account account = accountMapper.selectById(userId);
if (account.getResidue().compareTo(money) < 0) {
throw new RuntimeException("账户金额不足");
}
/*
余额-money
冻结+money
*/
accountMapper.updateFrozen(userId, account.getResidue().subtract(money), account.getFrozen().add(money));
//保存标识
ResultHolder.setResult(getClass(), businessActionContext.getXid(), "p");
return true;
}
@Transactional
@Override
public boolean commit(BusinessActionContext businessActionContext) {
long userId = Long.parseLong(businessActionContext.getActionContext("userId").toString());
BigDecimal money = new BigDecimal(businessActionContext.getActionContext("money").toString());
log.info("减少账户金额,第二阶段,提交,userId="+userId+", money="+money);
//防止重复提交
if (ResultHolder.getResult(getClass(), businessActionContext.getXid()) == null) {
return true;
}
accountMapper.updateFrozenToUsed(userId, money);
//删除标识
ResultHolder.removeResult(getClass(), businessActionContext.getXid());
return true;
}
@Transactional
@Override
public boolean rollback(BusinessActionContext businessActionContext) {
long userId = Long.parseLong(businessActionContext.getActionContext("userId").toString());
BigDecimal money = new BigDecimal(businessActionContext.getActionContext("money").toString());
//防止重复回滚
if (ResultHolder.getResult(getClass(), businessActionContext.getXid()) == null) {
return true;
}
log.info("减少账户金额,第二阶段,回滚,userId="+userId+", money="+money);
accountMapper.updateFrozenToResidue(userId, money);
//删除标识
ResultHolder.removeResult(getClass(), businessActionContext.getXid());
return true;
}
}
业务代码中调用 TCC 第一阶段方法prepareDecreaseAccount()
,并添加全局事务注解 @GlobalTransactional
:
package cn.tedu.account.service;
import cn.tedu.account.mapper.AccountMapper;
import cn.tedu.account.tcc.AccountTccAction;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.math.BigDecimal;
@Service
public class AccountServiceImpl implements AccountService {
// @Autowired
// private AccountMapper accountMapper;
@Autowired
private AccountTccAction accountTccAction;
@Override
public void decrease(Long userId, BigDecimal money) {
// accountMapper.decrease(userId,money);
accountTccAction.prepareDecreaseAccount(null, userId, money);
}
}
按顺序启动服务:
调用保存订单,地址:
http://localhost:8083/create?userId=1&productId=1&count=10&money=100
观察 account 的控制台日志:
查看数据库表中的账户数据:
下面来测试全局事务回滚的情况。
订单和库存第一阶段成功,而账户第一阶段失败了,这时会触发全局事务的回滚,如下图所示:
首先在 account 的第一阶段代码中添加模拟异常:
AccountTccActionImpl
的 prepareDecreaseAccount
方法
@Transactional
@Override
public boolean prepareDecreaseAccount(BusinessActionContext businessActionContext, Long userId, BigDecimal money) {
log.info("减少账户金额,第一阶段锁定金额,userId="+userId+", money="+money);
Account account = accountMapper.selectById(userId);
if (account.getResidue().compareTo(money) < 0) {
throw new RuntimeException("账户金额不足");
}
/*
余额-money
冻结+money
*/
accountMapper.updateFrozen(userId, account.getResidue().subtract(money), account.getFrozen().add(money));
if (Math.random() < 0.5) {
throw new RuntimeException("模拟异常");
}
//保存标识
ResultHolder.setResult(getClass(), businessActionContext.getXid(), "p");
return true;
}
重启 account 后,访问订单:
http://localhost:8083/create?userId=1&productId=1&count=10&money=100
查看控制台,可以看到 storage 和 order 的回滚日志,order 的回滚日志如下:
TCC 基本原理TCC 与 Seata AT 事务一样都是两阶段事务,它与 AT 事务的主要区别为:TCC 对业务代码侵入严重每个阶段的数据操作都要自己进行编码来实现,事务框架无法自动处理。TCC 效率更高不必对数据加全局锁,允许多个事务同时操作数据。第一阶段 Try以账户服务为...
KerryWu 赞了文章 · 1月18日
TCC 与 Seata AT 事务一样都是两阶段事务,它与 AT 事务的主要区别为:
以账户服务为例,当下订单时要扣减用户账户金额:
假如用户购买 100 元商品,要扣减 100 元。
TCC 事务首先对这100元的扣减金额进行预留,或者说是先冻结这100元:
如果第一阶段能够顺利完成,那么说明“扣减金额”业务(分支事务)最终肯定是可以成功的。当全局事务提交时, TC会控制当前分支事务进行提交,如果提交失败,TC 会反复尝试,直到提交成功为止。
当全局事务提交时,就可以使用冻结的金额来最终实现业务数据操作:
如果全局事务回滚,就把冻结的金额进行解冻,恢复到以前的状态,TC 会控制当前分支事务回滚,如果回滚失败,TC 会反复尝试,直到回滚完成为止。
多个TCC全局事务允许并发,它们执行扣减金额时,只需要冻结各自的金额即可:
Seata 支持 TCC 事务模式,与 AT 模式相同的,也需要以下组件来支持全局事务的控制:
新建 Empty Project:
工程命名为 seata-tcc
,存放到 seata-samples 文件夹下,与 seata-at
工程存放在一起:
压缩文件中的 7 个项目目录解压缩到 seata-tcc
目录:
在 idea 中按两下 shift
键,搜索 add maven projects
,打开 maven 工具:
然后选择 seata-tcc
工程目录下的 7 个项目的 pom.xml
导入:
在订单项目中执行添加订单:
我们要添加以下 TCC 事务操作的代码:
T
ry - 第一阶,冻结数据阶段,向订单表直接插入订单,订单状态设置为0(冻结状态)。C
onfirm - 第二阶段,提交事务,将订单状态修改成1(正常状态)。C
ancel - 第二阶段,回滚事务,删除订单。打开 order-parent 中注释掉的 seata 依赖:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.2.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>cn.tedu</groupId>
<artifactId>order-parent</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>pom</packaging>
<name>order-parent</name>
<properties>
<mybatis-plus.version>3.3.2</mybatis-plus.version>
<druid-spring-boot-starter.version>1.1.23</druid-spring-boot-starter.version>
<seata.version>1.3.0</seata.version>
<spring-cloud-alibaba-seata.version>2.0.0.RELEASE</spring-cloud-alibaba-seata.version>
<spring-cloud.version>Hoxton.SR6</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>${mybatis-plus.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>${druid-spring-boot-starter.version}</version>
</dependency>
<!-- 打开 seata 依赖 -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-seata</artifactId>
<version>${spring-cloud-alibaba-seata.version}</version>
<exclusions>
<exclusion>
<artifactId>seata-all</artifactId>
<groupId>io.seata</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
<version>${seata.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
设置全局事务组的组名:
spring:
application:
name: order
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost/seata_order?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8
username: root
password: root
# 事务组设置
cloud:
alibaba:
seata:
tx-service-group: order_tx_group
......
与 AT 事务中的配置完全相同:
registry.conf
:
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "eureka"
nacos {
serverAddr = "localhost"
namespace = ""
cluster = "default"
}
eureka {
serviceUrl = "http://localhost:8761/eureka"
# application = "default"
# weight = "1"
}
redis {
serverAddr = "localhost:6379"
db = "0"
password = ""
cluster = "default"
timeout = "0"
}
zk {
cluster = "default"
serverAddr = "127.0.0.1:2181"
session.timeout = 6000
connect.timeout = 2000
username = ""
password = ""
}
consul {
cluster = "default"
serverAddr = "127.0.0.1:8500"
}
etcd3 {
cluster = "default"
serverAddr = "http://localhost:2379"
}
sofa {
serverAddr = "127.0.0.1:9603"
application = "default"
region = "DEFAULT_ZONE"
datacenter = "DefaultDataCenter"
cluster = "default"
group = "SEATA_GROUP"
addressWaitTime = "3000"
}
file {
name = "file.conf"
}
}
config {
# file、nacos 、apollo、zk、consul、etcd3、springCloudConfig
type = "file"
nacos {
serverAddr = "localhost"
namespace = ""
group = "SEATA_GROUP"
}
consul {
serverAddr = "127.0.0.1:8500"
}
apollo {
app.id = "seata-server"
apollo.meta = "http://192.168.1.204:8801"
namespace = "application"
}
zk {
serverAddr = "127.0.0.1:2181"
session.timeout = 6000
connect.timeout = 2000
username = ""
password = ""
}
etcd3 {
serverAddr = "http://localhost:2379"
}
file {
name = "file.conf"
}
}
file.conf
:
transport {
# tcp udt unix-domain-socket
type = "TCP"
#NIO NATIVE
server = "NIO"
#enable heartbeat
heartbeat = true
# the client batch send request enable
enableClientBatchSendRequest = true
#thread factory for netty
threadFactory {
bossThreadPrefix = "NettyBoss"
workerThreadPrefix = "NettyServerNIOWorker"
serverExecutorThread-prefix = "NettyServerBizHandler"
shareBossWorker = false
clientSelectorThreadPrefix = "NettyClientSelector"
clientSelectorThreadSize = 1
clientWorkerThreadPrefix = "NettyClientWorkerThread"
# netty boss thread size,will not be used for UDT
bossThreadSize = 1
#auto default pin or 8
workerThreadSize = "default"
}
shutdown {
# when destroy server, wait seconds
wait = 3
}
serialization = "seata"
compressor = "none"
}
service {
#transaction service group mapping
# order_tx_group 与 yml 中的 “tx-service-group: order_tx_group” 配置一致
# “seata-server” 与 TC 服务器的注册名一致
# 从eureka获取seata-server的地址,再向seata-server注册自己,设置group
vgroupMapping.order_tx_group = "seata-server"
#only support when registry.type=file, please don't set multiple addresses
order_tx_group.grouplist = "127.0.0.1:8091"
#degrade, current not support
enableDegrade = false
#disable seata
disableGlobalTransaction = false
}
client {
rm {
asyncCommitBufferLimit = 10000
lock {
retryInterval = 10
retryTimes = 30
retryPolicyBranchRollbackOnConflict = true
}
reportRetryCount = 5
tableMetaCheckEnable = false
reportSuccessEnable = false
}
tm {
commitRetryCount = 5
rollbackRetryCount = 5
}
undo {
dataValidation = true
logSerialization = "jackson"
logTable = "undo_log"
}
log {
exceptionRate = 100
}
}
根据前面的分析,订单数据操作有以下三项:
在 OrderMapper 中已经有插入订单的方法了,现在需要添加修改订单和删除订单的方法(删除方法从BaseMapper继承):
package cn.tedu.order.mapper;
import cn.tedu.order.entity.Order;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Param;
public interface OrderMapper extends BaseMapper {
void create(Order order);
void updateStatus(@Param("orderId") Long orderId, @Param("status") Integer status);
}
那么对应的 OrderMapper.xml
中也要添加 sql:
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="cn.tedu.order.mapper.OrderMapper" >
<resultMap id="BaseResultMap" type="cn.tedu.order.entity.Order" >
<id column="id" property="id" jdbcType="BIGINT" />
<result column="user_id" property="userId" jdbcType="BIGINT" />
<result column="product_id" property="productId" jdbcType="BIGINT" />
<result column="count" property="count" jdbcType="INTEGER" />
<result column="money" property="money" jdbcType="DECIMAL" />
<result column="status" property="status" jdbcType="INTEGER" />
</resultMap>
<insert id="create">
INSERT INTO `order` (`id`,`user_id`,`product_id`,`count`,`money`,`status`)
VALUES(#{id}, #{userId}, #{productId}, #{count}, #{money}, ${status});
</insert>
<update id="updateStatus" >
UPDATE `order` SET `status`=#{status} WHERE `id`=#{orderId};
</update>
<delete id="deleteById">
DELETE FROM `order` WHERE `id`=#{orderId}
</delete>
</mapper>
第二阶段
第二阶段为了处理幂等性问题这里首先添加一个工具类 ResultHolder
。
这个工具也可以在第二阶段 Confirm 或 Cancel 阶段对第一阶段的成功与否进行判断,在第一阶段成功时需要保存一个标识。
ResultHolder
可以为每一个全局事务保存一个标识:
package cn.tedu.order.tcc;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class ResultHolder {
private static Map<Class<?>, Map<String, String>> map = new ConcurrentHashMap<Class<?>, Map<String, String>>();
public static void setResult(Class<?> actionClass, String xid, String v) {
Map<String, String> results = map.get(actionClass);
if (results == null) {
synchronized (map) {
if (results == null) {
results = new ConcurrentHashMap<>();
map.put(actionClass, results);
}
}
}
results.put(xid, v);
}
public static String getResult(Class<?> actionClass, String xid) {
Map<String, String> results = map.get(actionClass);
if (results != null) {
return results.get(xid);
}
return null;
}
public static void removeResult(Class<?> actionClass, String xid) {
Map<String, String> results = map.get(actionClass);
if (results != null) {
results.remove(xid);
}
}
}
Seata 实现 TCC 操作需要定义一个接口,我们在接口中添加以下方法:
prepareCreateOrder()
commit()
rollback()
package cn.tedu.order.tcc;
import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;
import java.math.BigDecimal;
@LocalTCC
public interface OrderTccAction {
// T (try - 预留资源,冻结订单)
/*
第一阶段的方法
通过注解指定第二阶段的两个方法名
BusinessActionContext 上下文对象,用来在两个阶段之间传递数据
@BusinessActionContextParameter 注解的参数数据会被存入 BusinessActionContext
@TwoPhaseBusinessAction 提交回滚的默认值可以不写
*/
@TwoPhaseBusinessAction(name = "orderTccAction", commitMethod = "commit", rollbackMethod = "rollback")
boolean prepareCreateOrder(BusinessActionContext businessActionContext,
@BusinessActionContextParameter(paramName = "orderId") Long orderId,
@BusinessActionContextParameter(paramName = "userId") Long userId,
@BusinessActionContextParameter(paramName = "productId") Long productId,
@BusinessActionContextParameter(paramName = "count") Integer count,
@BusinessActionContextParameter(paramName = "money") BigDecimal money);
//C (confirm - 确认,提交)
boolean commit(BusinessActionContext businessActionContext);
//C (cancel - 取消,回滚)
boolean rollback(BusinessActionContext businessActionContext);
}
实现类:
package cn.tedu.order.tcc;
import cn.tedu.order.entity.Order;
import cn.tedu.order.mapper.OrderMapper;
import io.seata.rm.tcc.api.BusinessActionContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.math.BigDecimal;
@Component
public class OrderTccActionImpl implements OrderTccAction {
@Autowired
private OrderMapper orderMapper;
@Override
public boolean prepareCreateOrder(BusinessActionContext ctx, Long orderId, Long userId, Long productId, Integer count, BigDecimal money) {
orderMapper.create(new Order(orderId, userId, productId, count, money, 0));
//保存一阶段的成功标记
ResultHolder.setResult(OrderTccAction.class, ctx.getXid(), "p");
return true;
}
@Override
public boolean commit(BusinessActionContext ctx) {
//检查标记是否存在,如果标记不存在不重复提交
String p = ResultHolder.getResult(OrderTccAction.class, ctx.getXid());
if (p == null){
return true;
}
/**
* 上下文对象从第一阶段向第二阶段传递时,先转成了json数据,然后还原成上下文对象
* 其中的整数比较小的会转成Integer类型,所以如果需要Long类型,需要先转换成字符串在用Long.valueOf()解析。
*/
Long orderId = Long.valueOf(ctx.getActionContext("orderId").toString());
orderMapper.updateStatus(orderId, 1);
//提交完成后,删除标记
ResultHolder.removeResult(OrderTccAction.class, ctx.getXid());
return true;
}
@Override
public boolean rollback(BusinessActionContext ctx) {
//第一阶段没有完成的情况下,不必执行回滚
//因为第一阶段有本地事务,事务失败时已经进行了回滚。
//如果这里第一阶段成功,而其他全局事务参与者失败,这里会执行回滚
//幂等性控制:如果重复执行回滚则直接返回
//检查标记是否存在,如果标记不存在不重复提交
String p = ResultHolder.getResult(OrderTccAction.class, ctx.getXid());
if (p == null){
return true;
}
Long orderId = Long.valueOf(ctx.getActionContext("orderId").toString());
orderMapper.deleteById(orderId);
//提交完成后,删除标记
ResultHolder.removeResult(OrderTccAction.class, ctx.getXid());
return true;
}
}
业务代码中不再直接保存订单数据,而是调用 TCC 第一阶段方法prepareCreateOrder()
,并添加全局事务注解 @GlobalTransactional
:
package cn.tedu.order.service;
import cn.tedu.order.entity.Order;
import cn.tedu.order.feign.AccountClient;
import cn.tedu.order.feign.EasyIdGeneratorClient;
import cn.tedu.order.feign.StorageClient;
import cn.tedu.order.mapper.OrderMapper;
import cn.tedu.order.tcc.OrderTccAction;
import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Random;
@Service
public class OrderServiceImpl implements OrderService {
// @Autowired
// private OrderMapper orderMapper;
@Autowired
EasyIdGeneratorClient easyIdGeneratorClient;
@Autowired
private AccountClient accountClient;
@Autowired
private StorageClient storageClient;
@Autowired
private OrderTccAction orderTccAction;
@GlobalTransactional
@Override
public void create(Order order) {
// 从全局唯一id发号器获得id
Long orderId = easyIdGeneratorClient.nextId("order_business");
order.setId(orderId);
// orderMapper.create(order);
// 这里修改成调用 TCC 第一节端方法
//orderTccAction是一个动态代理对象,其中添加了前置拦截器,所以底层会创建,所以传null值即可
orderTccAction.prepareCreateOrder(
null,
order.getId(),
order.getUserId(),
order.getProductId(),
order.getCount(),
order.getMoney());
// 修改库存
//storageClient.decrease(order.getProductId(), order.getCount());
// 修改账户余额
//accountClient.decrease(order.getUserId(), order.getMoney());
}
}
按顺序启动服务:
调用保存订单,地址:
http://localhost:8083/create?userId=1&productId=1&count=10&money=100
观察控制台日志:
查看数据库表中的订单数据:
在库存项目中执行减少库存:
我们要添加以下 TCC 事务操作的代码:
T
ry - 第一阶,冻结数据阶段,将要减少的库存量先冻结:C
onfirm - 第二阶段,提交事务,使用冻结的库存完成业务数据处理:C
ancel - 第二阶段,回滚事务,冻结的库存解冻,恢复以前的库存量:有三个文件需要配置:
这三个文件的设置与上面 order 项目的配置完全相同,请参考上面订单配置一章进行配置。
根据前面的分析,库存数据操作有以下三项:
在 StorageMapper 中添加三个方法:
package cn.tedu.storage.mapper;
import cn.tedu.storage.entity.Storage;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Param;
public interface StorageMapper extends BaseMapper<Storage> {
void decrease(Long productId, Integer count);
// 冻结库存
void updateFrozen(@Param("productId") Long productId, @Param("residue") Integer residue, @Param("frozen") Integer frozen);
// 提交时,把冻结量修改到已售出
void updateFrozenToUsed(@Param("productId") Long productId, @Param("count") Integer count);
// 回滚时,把冻结量修改到可用库存
void updateFrozenToResidue(@Param("productId") Long productId, @Param("count") Integer count);
}
那么对应的 StorageMapper.xml
中也要添加 sql:
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="cn.tedu.storage.mapper.StorageMapper" >
<resultMap id="BaseResultMap" type="cn.tedu.storage.entity.Storage" >
<id column="id" property="id" jdbcType="BIGINT" />
<result column="product_id" property="productId" jdbcType="BIGINT" />
<result column="total" property="total" jdbcType="INTEGER" />
<result column="used" property="used" jdbcType="INTEGER" />
<result column="residue" property="residue" jdbcType="INTEGER" />
</resultMap>
<update id="decrease">
UPDATE storage SET used = used + #{count},residue = residue - #{count} WHERE product_id = #{productId}
</update>
<select id="selectById" resultMap="BaseResultMap">
SELECT * FROM storage WHERE `product_id`=#{productId}
</select>
<update id="updateFrozen">
UPDATE storage SET `residue`=#{residue},`frozen`=#{frozen} WHERE `product_id`=#{productId}
</update>
<update id="updateFrozenToUsed">
UPDATE storage SET `frozen`=`frozen`-#{count}, `used`=`used`+#{count} WHERE `product_id`=#{productId}
</update>
<update id="updateFrozenToResidue">
UPDATE storage SET `frozen`=`frozen`-#{count}, `residue`=`residue`+#{count} WHERE `product_id`=#{productId}
</update>
</mapper>
工具类 ResultHolder
:
package cn.tedu.storage.tcc;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class ResultHolder {
private static Map<Class<?>, Map<String, String>> map = new ConcurrentHashMap<Class<?>, Map<String, String>>();
public static void setResult(Class<?> actionClass, String xid, String v) {
Map<String, String> results = map.get(actionClass);
if (results == null) {
synchronized (map) {
if (results == null) {
results = new ConcurrentHashMap<>();
map.put(actionClass, results);
}
}
}
results.put(xid, v);
}
public static String getResult(Class<?> actionClass, String xid) {
Map<String, String> results = map.get(actionClass);
if (results != null) {
return results.get(xid);
}
return null;
}
public static void removeResult(Class<?> actionClass, String xid) {
Map<String, String> results = map.get(actionClass);
if (results != null) {
results.remove(xid);
}
}
}
添加 TCC 接口,在接口中添加以下方法:
prepareDecreaseStorage()
commit()
rollback()
package cn.tedu.storage.tcc;
import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;
@LocalTCC
public interface StorageTccAction {
@TwoPhaseBusinessAction(name = "storageTccAction", commitMethod = "commit", rollbackMethod = "rollback")
boolean prepareDecreaseStorage(BusinessActionContext businessActionContext,
@BusinessActionContextParameter(paramName = "productId") Long productId,
@BusinessActionContextParameter(paramName = "count") Integer count);
boolean commit(BusinessActionContext businessActionContext);
boolean rollback(BusinessActionContext businessActionContext);
}
实现类:
package cn.tedu.storage.tcc;
import cn.tedu.storage.entity.Storage;
import cn.tedu.storage.mapper.StorageMapper;
import io.seata.rm.tcc.api.BusinessActionContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
@Component
@Slf4j
public class StorageTccActionImpl implements StorageTccAction {
@Autowired
private StorageMapper storageMapper;
@Transactional
@Override
public boolean prepareDecreaseStorage(BusinessActionContext businessActionContext, Long productId, Integer count) {
log.info("减少商品库存,第一阶段,锁定减少的库存量,productId="+productId+", count="+count);
Storage storage = storageMapper.selectById(productId);
if (storage.getResidue()-count<0) {
throw new RuntimeException("库存不足");
}
/*
库存减掉count, 冻结库存增加count
*/
storageMapper.updateFrozen(productId, storage.getResidue()-count, storage.getFrozen()+count);
//保存标识
ResultHolder.setResult(getClass(), businessActionContext.getXid(), "p");
return true;
}
@Transactional
@Override
public boolean commit(BusinessActionContext businessActionContext) {
long productId = Long.parseLong(businessActionContext.getActionContext("productId").toString());
int count = Integer.parseInt(businessActionContext.getActionContext("count").toString());
log.info("减少商品库存,第二阶段提交,productId="+productId+", count="+count);
//防止重复提交
if (ResultHolder.getResult(getClass(), businessActionContext.getXid()) == null) {
return true;
}
storageMapper.updateFrozenToUsed(productId, count);
//删除标识
ResultHolder.removeResult(getClass(), businessActionContext.getXid());
return true;
}
@Transactional
@Override
public boolean rollback(BusinessActionContext businessActionContext) {
long productId = Long.parseLong(businessActionContext.getActionContext("productId").toString());
int count = Integer.parseInt(businessActionContext.getActionContext("count").toString());
log.info("减少商品库存,第二阶段,回滚,productId="+productId+", count="+count);
//防止重复回滚
if (ResultHolder.getResult(getClass(), businessActionContext.getXid()) == null) {
return true;
}
storageMapper.updateFrozenToResidue(productId, count);
//删除标识
ResultHolder.removeResult(getClass(), businessActionContext.getXid());
return true;
}
}
业务代码中调用 TCC 第一阶段方法prepareDecreaseStorage()
,并添加全局事务注解 @GlobalTransactional
:
package cn.tedu.storage.service;
import cn.tedu.storage.tcc.StorageTccAction;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class StorageServiceImpl implements StorageService {
// @Autowired
// private StorageMapper storageMapper;
@Autowired
private StorageTccAction storageTccAction;
@Override
public void decrease(Long productId, Integer count) throws Exception {
// storageMapper.decrease(productId,count);
storageTccAction.prepareDecreaseStorage(null, productId, count);
}
}
按顺序启动服务:
调用保存订单,地址:
http://localhost:8083/create?userId=1&productId=1&count=10&money=100
观察 storage 的控制台日志:
查看数据库表中的库存数据:
扣减金额 TCC 事务分析请见《分布式事务(六)Seata TCC模式-TCC模式介绍》
有三个文件需要配置:
这三个文件的设置与上面 order 项目的配置完全相同,请参考上面订单配置一章进行配置。
根据前面的分析,库存数据操作有以下三项:
在 AccountMapper 中添加三个方法:
package cn.tedu.account.mapper;
import cn.tedu.account.entity.Account;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Param;
import java.math.BigDecimal;
public interface AccountMapper extends BaseMapper<Account> {
void decrease(Long userId, BigDecimal money);
void updateFrozen(@Param("userId") Long userId, @Param("residue") BigDecimal residue, @Param("frozen") BigDecimal frozen);
void updateFrozenToUsed(@Param("userId") Long userId, @Param("money") BigDecimal money);
void updateFrozenToResidue(@Param("userId") Long userId, @Param("money") BigDecimal money);
}
那么对应的 AccountMapper.xml
中添加 sql:
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="cn.tedu.account.mapper.AccountMapper" >
<resultMap id="BaseResultMap" type="cn.tedu.account.entity.Account" >
<id column="id" property="id" jdbcType="BIGINT" />
<result column="user_id" property="userId" jdbcType="BIGINT" />
<result column="total" property="total" jdbcType="DECIMAL" />
<result column="used" property="used" jdbcType="DECIMAL" />
<result column="residue" property="residue" jdbcType="DECIMAL"/>
<result column="frozen" property="frozen" jdbcType="DECIMAL"/>
</resultMap>
<update id="decrease">
UPDATE account SET residue = residue - #{money},used = used + #{money} where user_id = #{userId};
</update>
<select id="selectById" resultMap="BaseResultMap">
SELECT * FROM account WHERE `user_id`=#{userId}
</select>
<update id="updateFrozen">
UPDATE account SET `residue`=#{residue},`frozen`=#{frozen} WHERE `user_id`=#{userId}
</update>
<update id="updateFrozenToUsed">
UPDATE account SET `frozen`=`frozen`-#{money}, `used`=`used`+#{money} WHERE `user_id`=#{userId}
</update>
<update id="updateFrozenToResidue">
UPDATE account SET `frozen`=`frozen`-#{money}, `residue`=`residue`+#{money} WHERE `user_id`=#{userId}
</update>
</mapper>
工具类 ResultHolder
:
package cn.tedu.account.tcc;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class ResultHolder {
private static Map<Class<?>, Map<String, String>> map = new ConcurrentHashMap<Class<?>, Map<String, String>>();
public static void setResult(Class<?> actionClass, String xid, String v) {
Map<String, String> results = map.get(actionClass);
if (results == null) {
synchronized (map) {
if (results == null) {
results = new ConcurrentHashMap<>();
map.put(actionClass, results);
}
}
}
results.put(xid, v);
}
public static String getResult(Class<?> actionClass, String xid) {
Map<String, String> results = map.get(actionClass);
if (results != null) {
return results.get(xid);
}
return null;
}
public static void removeResult(Class<?> actionClass, String xid) {
Map<String, String> results = map.get(actionClass);
if (results != null) {
results.remove(xid);
}
}
}
添加 TCC 接口,在接口中添加以下方法:
prepareDecreaseAccount()
commit()
rollback()
package cn.tedu.account.tcc;
import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;
import java.math.BigDecimal;
@LocalTCC
public interface AccountTccAction {
@TwoPhaseBusinessAction(name = "accountTccAction", commitMethod = "commit", rollbackMethod = "rollback")
boolean prepareDecreaseAccount(BusinessActionContext businessActionContext,
@BusinessActionContextParameter(paramName = "userId") Long userId,
@BusinessActionContextParameter(paramName = "money") BigDecimal money);
boolean commit(BusinessActionContext businessActionContext);
boolean rollback(BusinessActionContext businessActionContext);
}
实现类:
package cn.tedu.account.tcc;
import cn.tedu.account.entity.Account;
import cn.tedu.account.mapper.AccountMapper;
import io.seata.rm.tcc.api.BusinessActionContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.math.BigDecimal;
@Component
@Slf4j
public class AccountTccActionImpl implements AccountTccAction {
@Autowired
private AccountMapper accountMapper;
@Transactional
@Override
public boolean prepareDecreaseAccount(BusinessActionContext businessActionContext, Long userId, BigDecimal money) {
log.info("减少账户金额,第一阶段锁定金额,userId="+userId+", money="+money);
Account account = accountMapper.selectById(userId);
if (account.getResidue().compareTo(money) < 0) {
throw new RuntimeException("账户金额不足");
}
/*
余额-money
冻结+money
*/
accountMapper.updateFrozen(userId, account.getResidue().subtract(money), account.getFrozen().add(money));
//保存标识
ResultHolder.setResult(getClass(), businessActionContext.getXid(), "p");
return true;
}
@Transactional
@Override
public boolean commit(BusinessActionContext businessActionContext) {
long userId = Long.parseLong(businessActionContext.getActionContext("userId").toString());
BigDecimal money = new BigDecimal(businessActionContext.getActionContext("money").toString());
log.info("减少账户金额,第二阶段,提交,userId="+userId+", money="+money);
//防止重复提交
if (ResultHolder.getResult(getClass(), businessActionContext.getXid()) == null) {
return true;
}
accountMapper.updateFrozenToUsed(userId, money);
//删除标识
ResultHolder.removeResult(getClass(), businessActionContext.getXid());
return true;
}
@Transactional
@Override
public boolean rollback(BusinessActionContext businessActionContext) {
long userId = Long.parseLong(businessActionContext.getActionContext("userId").toString());
BigDecimal money = new BigDecimal(businessActionContext.getActionContext("money").toString());
//防止重复回滚
if (ResultHolder.getResult(getClass(), businessActionContext.getXid()) == null) {
return true;
}
log.info("减少账户金额,第二阶段,回滚,userId="+userId+", money="+money);
accountMapper.updateFrozenToResidue(userId, money);
//删除标识
ResultHolder.removeResult(getClass(), businessActionContext.getXid());
return true;
}
}
业务代码中调用 TCC 第一阶段方法prepareDecreaseAccount()
,并添加全局事务注解 @GlobalTransactional
:
package cn.tedu.account.service;
import cn.tedu.account.mapper.AccountMapper;
import cn.tedu.account.tcc.AccountTccAction;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.math.BigDecimal;
@Service
public class AccountServiceImpl implements AccountService {
// @Autowired
// private AccountMapper accountMapper;
@Autowired
private AccountTccAction accountTccAction;
@Override
public void decrease(Long userId, BigDecimal money) {
// accountMapper.decrease(userId,money);
accountTccAction.prepareDecreaseAccount(null, userId, money);
}
}
按顺序启动服务:
调用保存订单,地址:
http://localhost:8083/create?userId=1&productId=1&count=10&money=100
观察 account 的控制台日志:
查看数据库表中的账户数据:
下面来测试全局事务回滚的情况。
订单和库存第一阶段成功,而账户第一阶段失败了,这时会触发全局事务的回滚,如下图所示:
首先在 account 的第一阶段代码中添加模拟异常:
AccountTccActionImpl
的 prepareDecreaseAccount
方法
@Transactional
@Override
public boolean prepareDecreaseAccount(BusinessActionContext businessActionContext, Long userId, BigDecimal money) {
log.info("减少账户金额,第一阶段锁定金额,userId="+userId+", money="+money);
Account account = accountMapper.selectById(userId);
if (account.getResidue().compareTo(money) < 0) {
throw new RuntimeException("账户金额不足");
}
/*
余额-money
冻结+money
*/
accountMapper.updateFrozen(userId, account.getResidue().subtract(money), account.getFrozen().add(money));
if (Math.random() < 0.5) {
throw new RuntimeException("模拟异常");
}
//保存标识
ResultHolder.setResult(getClass(), businessActionContext.getXid(), "p");
return true;
}
重启 account 后,访问订单:
http://localhost:8083/create?userId=1&productId=1&count=10&money=100
查看控制台,可以看到 storage 和 order 的回滚日志,order 的回滚日志如下:
TCC 基本原理TCC 与 Seata AT 事务一样都是两阶段事务,它与 AT 事务的主要区别为:TCC 对业务代码侵入严重每个阶段的数据操作都要自己进行编码来实现,事务框架无法自动处理。TCC 效率更高不必对数据加全局锁,允许多个事务同时操作数据。第一阶段 Try以账户服务为...
赞 1 收藏 1 评论 1
KerryWu 发布了文章 · 1月10日
两年前在项目上实施oracle etl同步时,客户就提出cdc(Change Data Capture)增量同步的需求,并且明确要求基于日志来捕获数据变化。当时对于这方面的知识储备不够,只觉得这样的需求太苛刻。到了后来我实施分布式架构的方案越来越多,经常会思考如何保障数据的一致性,也让我回过头来,重新思考当年客户的需求。
本文的主角是canal,常用来保障mysql到redis、elasticsearch等产品数据的增量同步。下文先讲canal的安装配置,再结合具体的代码,实现mysql到redis的实时同步。
分布式架构近些年很受推崇,我们的系统开发不再局限于一台mysql数据库,可以为了缓存而引入redis,为了搜索而引入elasticsearch,等等,这些是分布式架构给我们带来的便利性。
但带来的挑战也加大了,比如:微服务治理、分布式事务,今天还会讲到数据同步。以前对于关系型数据库的数据同步,已经诞生了不少 ETL工具,比如我们熟悉的 oracle ODI。但是以现在微服务开发而论,还是不够灵活,我们需要可以自由的将mysql数据同步到redis、elasticsearch等地方。这里就可以用到本文的主角 -- canal。
canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。
早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。
基于日志增量订阅和消费的业务包括:
当前的 canal 支持源端 mysql 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x 。
MySQL主备复制原理
MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
canal 工作原理
canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
canal 解析 binary log 对象(原始为 byte 流)
关于canal的优点,肯定是要拿它和之前接触过 ETL工具做比较。
需要先开启mysql的 binlog 写入功能,配置 binlog-format 为 ROW 模式。这里修改 my.cnf 文件,添加下列配置:
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
重启mysql,用以下命令检查一下binlog是否正确启动:
mysql> show variables like 'log_bin%';
+---------------------------------+----------------------------------+
| Variable_name | Value |
+---------------------------------+----------------------------------+
| log_bin | ON |
| log_bin_basename | /data/mysql/data/mysql-bin |
| log_bin_index | /data/mysql/data/mysql-bin.index |
| log_bin_trust_function_creators | OFF |
| log_bin_use_v1_row_events | OFF |
+---------------------------------+----------------------------------+
5 rows in set (0.00 sec)
mysql> show variables like 'binlog_format%';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW |
+---------------+-------+
1 row in set (0.00 sec)
创建一个mysql用户canal 并且赋远程链接权限权限。
CREATE USER canal IDENTIFIED BY 'canal';
GRANT ALL PRIVILEGES ON test_canal.user TO 'canal'@'%';
FLUSH PRIVILEGES;
tar -zxvf canal.deployer-1.4.0.tar.gz
,得到四个目录bin、conf、lib、logs。canal.instance.master.address = 127.0.0.1:3306
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
# 启动
sh bin/startup.sh
# 关闭
sh bin/stop.sh
# 查看日志
tail -500f logs/canal/canal.log
# 查看具体实例日志
tail -500f logs/example/example.log
canal作为mysql的实时数据订阅组件,实现了对mysql binlog数据的抓取。
虽然阿里也开源了一个纯粹从mysql同步数据到mysql的项目otter(github.com/alibaba/otter,基于canal的),实现了mysql的单向同步、双向同步等能力。但是我们经常有从mysql同步数据到es、hbase等存储的需求,就需要用户自己用canal-client获取数据进行消费,比较麻烦。
从1.1.1版本开始,canal实现了一个配套的落地模块,实现对canal订阅的消息进行消费,就是client-adapter(github.com/alibaba/canal/wiki/ClientAdapter)。
目前的最新稳定版1.1.4版本中,client-adapter已经实现了同步数据到RDS、ES、HBase的能力。
目前Adapter具备以下基本能力:
本文不关注这部分canal.adapter的配置,具体的配置方式,请参考github官方文档。
简单贴一贴下游es的配置,修改启动器配置: application.yml
server:
port: 8081
logging:
level:
com.alibaba.otter.canal.client.adapter.es: DEBUG
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: non_null
canal.conf:
canalServerHost: 127.0.0.1:11111
flatMessage: true
srcDataSources:
defaultDS:
url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true
username: root
password: 121212
canalInstances:
- instance: example
adapterGroups:
- outAdapters:
- name: es
hosts: 127.0.0.1:9300 # es 集群地址, 逗号分隔
properties:
cluster.name: elasticsearch # es cluster name
adapter将会自动加载 conf/es 下的所有.yml结尾的配置文件,再修改 conf/es/mytest_user.yml文件:
dataSourceKey: defaultDS # 源数据源的key, 对应上面配置的srcDataSources中的值
destination: example # cannal的instance或者MQ的topic
esMapping:
_index: mytest_user # es 的索引名称
_type: _doc # es 的doc名称
_id: _id # es 的_id, 如果不配置该项必须配置下面的pk项_id则会由es自动分配
# pk: id # 如果不需要_id, 则需要指定一个属性为主键属性
# sql映射
sql: "select a.id as _id, a.name as _name, a.role_id as _role_id, b.role_name as _role_name,
a.c_time as _c_time, c.labels as _labels from user a
left join role b on b.id=a.role_id
left join (select user_id, group_concat(label order by id desc separator ';') as labels from label
group by user_id) c on c.user_id=a.id"
# objFields:
# _labels: array:; # 数组或者对象属性, array:; 代表以;字段里面是以;分隔的
# _obj: obj:{"test":"123"}
etlCondition: "where a.c_time>='{0}'" # etl 的条件参数
commitBatch: 3000 # 提交批大小
sql映射说明:
sql支持多表关联自由组合, 但是有一定的限制:
Elastic Search的mapping 属性与sql的查询值将一一对应(不支持 select *), 比如: select a.id as _id, a.name, a.email as _email from user, 其中name将映射到es mapping的name field, _email将 映射到mapping的_email field, 这里以别名(如果有别名)作为最终的映射字段. 这里的_id可以填写到配置文件的 _id: _id映射.
1、单表映射索引示例
select a.id as _id, a.name, a.role_id, a.c_time from user a
该sql对应的es mapping示例:
{
"mytest_user": {
"mappings": {
"_doc": {
"properties": {
"name": {
"type": "text"
},
"role_id": {
"type": "long"
},
"c_time": {
"type": "date"
}
}
}
}
}
}
2、单表映射索引示例sql带函数或运算操作
select a.id as _id, concat(a.name,'_test') as name, a.role_id+10000 as role_id, a.c_time from user a
函数字段后必须跟上别名, 该sql对应的es mapping示例:
{
"mytest_user": {
"mappings": {
"_doc": {
"properties": {
"name": {
"type": "text"
},
"role_id": {
"type": "long"
},
"c_time": {
"type": "date"
}
}
}
}
}
}
3、多表映射(一对一, 多对一)索引示例sql:
select a.id as _id, a.name, a.role_id, b.role_name, a.c_time from user a
left join role b on b.id = a.role_id
注:这里join操作只能是left outer join, 第一张表必须为主表!!该sql对应的es mapping示例:
{
"mytest_user": {
"mappings": {
"_doc": {
"properties": {
"name": {
"type": "text"
},
"role_id": {
"type": "long"
},
"role_name": {
"type": "text"
},
"c_time": {
"type": "date"
}
}
}
}
}
}
4、多表映射(一对多)索引示例sql:
select a.id as _id, a.name, a.role_id, c.labels, a.c_time from user a
left join (select user_id, group_concat(label order by id desc separator ';') as labels from label
group by user_id) c on c.user_id=a.id
注:left join 后的子查询只允许一张表, 即子查询中不能再包含子查询或者关联!!该sql对应的es mapping示例:
{
"mytest_user": {
"mappings": {
"_doc": {
"properties": {
"name": {
"type": "text"
},
"role_id": {
"type": "long"
},
"c_time": {
"type": "date"
},
"labels": {
"type": "text"
}
}
}
}
}
}
如果你是同步到es、rds、hbase,但是adapter实现不了你的需求,可以用下列代码的方式实现。
如果你是想要同步到redis、mongo等数据库,因为adapter目前还不支持,同样可以用代码的方式实现。
如果你是用springboot开发,目前有两种常见的方式:
本文选用第一种方式,实现 canal 到 redis的实时同步。
pom.xml
<!--lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.16</version>
</dependency>
<!-- canal-->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.3</version>
</dependency>
<!--redis-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
RedisTableUtil.java
@Slf4j
@Component
public class RedisTableUtil {
private static final String PK_NAME="id";
private static final String NULL_ID="NULL";
private final RedisTemplate redisTemplate;
public RedisTableUtil(RedisTemplate redisTemplate){
this.redisTemplate=redisTemplate;
}
/**
* 新增
* @param columnList
* @param databaseName
* @param tableName
*/
public void insert(List<CanalEntry.Column> columnList,String databaseName,String tableName){
String keyName=this.generateKeyName(columnList,databaseName,tableName);
HashOperations hashOperations= redisTemplate.opsForHash();
columnList.stream()
.forEach((column -> {
hashOperations.put(keyName,column.getName(),column.getValue());
}));
}
/**
* 删除
* @param columnList
* @param databaseName
* @param tableName
*/
public void delete(List<CanalEntry.Column> columnList,String databaseName,String tableName){
String keyName=this.generateKeyName(columnList,databaseName,tableName);
redisTemplate.delete(keyName);
}
/**
* 更新
* @param columnList
* @param databaseName
* @param tableName
*/
public void update(List<CanalEntry.Column> columnList,String databaseName,String tableName){
String keyName=this.generateKeyName(columnList,databaseName,tableName);
HashOperations hashOperations= redisTemplate.opsForHash();
columnList.stream()
.filter(CanalEntry.Column::getUpdated)
.forEach((column -> {
hashOperations.put(keyName,column.getName(),column.getValue());
}));
}
/**
* 生成 行记录 key
* @param columnList
* @param databaseName
* @param tableName
* @return
*/
private String generateKeyName(List<CanalEntry.Column> columnList,String databaseName,String tableName){
Optional<String> id= columnList.stream()
.filter(column -> PK_NAME.equals(column.getName()))
.map(CanalEntry.Column::getValue)
.findFirst();
return databaseName+"_"+tableName+"_"+id.orElse(NULL_ID);
}
}
RedisConfig.java
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate redisTemplate(RedisTemplate redisTemplate){
RedisSerializer<String> stringSerializer = new StringRedisSerializer();
redisTemplate.setKeySerializer(stringSerializer);
redisTemplate.setValueSerializer(stringSerializer);
redisTemplate.setHashKeySerializer(stringSerializer);
redisTemplate.setHashValueSerializer(stringSerializer);
return redisTemplate;
}
}
CanalServer.java
@Slf4j
@Component
public class CanalServer {
private static final String THREAD_NAME_PREFIX="canalStart-";
private final RedisTableUtil redisTableUtil;
public CanalServer(RedisTableUtil redisTableUtil){
this.redisTableUtil=redisTableUtil;
}
/**
* 初始化
* 单线程启动 canal客户端
*/
@PostConstruct
public void init() {
//需要开启一个新的线程来执行 canal 服务
Thread initThread = new CanalStartThread();
initThread.setName(THREAD_NAME_PREFIX);
initThread.start();
}
/**
* 定义 canal服务线程
*/
public class CanalStartThread extends Thread {
@Override
public void run() {
// 创建链接
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("localhost", 11111),
"example", "", "");
connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();
try {
while (true) {
// 获取指定数量的数据
Message message = connector.getWithoutAck(1000);
long batchId = message.getId();
if (batchId != -1 && message.getEntries().size() > 0) {
entryHandler(message.getEntries());
}
connector.ack(batchId); // 提交确认
Thread.sleep(1000);
}
}catch (Exception e){
log.error("Canal线程异常,已终止:"+e.getMessage());
} finally {
connector.disconnect();
}
}
}
/**
* canal 入口处理器
* @param entrys
*/
private void entryHandler( List<Entry> entrys) {
for (Entry entry : entrys) {
//操作事物 忽略
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { continue; }
CanalEntry.RowChange rowChage = null;
String databaseName=null;
String tableName=null;
try {
databaseName=entry.getHeader().getSchemaName();
tableName=entry.getHeader().getTableName();
rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (InvalidProtocolBufferException e) {
log.error("获取数据失败:"+e.getMessage());
}
//获取执行的事件
CanalEntry.EventType eventType = rowChage.getEventType();
for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
//删除操作
if (eventType.equals(CanalEntry.EventType.DELETE)) {
redisTableUtil.delete(rowData.getBeforeColumnsList(),databaseName,tableName);
}
//添加操作
else if (eventType.equals(CanalEntry.EventType.INSERT)) {
redisTableUtil.insert(rowData.getAfterColumnsList(),databaseName,tableName);
}
//修改操作
else if(eventType.equals(CanalEntry.EventType.UPDATE)) {
redisTableUtil.insert(rowData.getAfterColumnsList(),databaseName,tableName);
}
}
}
}
}
CanalServer 中单独起了一个线程,每秒获取mysql日志。当监听到mysql表数据的新增、更新、删除操作时,会在redis中做出对应的数据操作,具体redis的更新逻辑在RedisTableUtil类中定义。
这里在while(true)
循环中,加上了 Thread.sleep(1000);
,为了避免空轮询造成的CPU占有率飙升。那么有人会问Thread.sleep 不是暂停线程吗,它就不会占有CPU了吗?Thread.sleep,主要是为了暂停当前线程,把cpu片段让出给其他线程,减缓当前线程的执行。因此它会暂停线程,但并不会占有CPU运行片段。
一般生产环境日志量大,可以将监听到的binlog事件推到消息中间件,再在消息中间件的消费端做下游数据同步的处理。
另外通过binlog监听到的数据库操作不止DML,其实还有DQL和DDL等,只不过代码中没有体现。因此可以想象的到,使用canal能实现的功能,远不止数据同步这点功能。
查看原文两年前在项目上实施oracle etl同步时,客户就提出cdc(Change Data Capture)增量同步的需求,并且明确要求基于日志来捕获数据变化。当时对于这方面的知识储备不够,只觉得这样的需求太苛刻。到了后来我实施分布式架构的方案越来越多,经常会思考如何保障数据的一致...
赞 0 收藏 0 评论 0
KerryWu 赞了文章 · 2020-12-21
MapReduce是hadoop进行多节点计算时采用的计算模型,说白了就是hadoop拆分任务的一套方法论,刚接触MapReduce这个概念时,一时很难理解,也查了很多资料,因为每个人理解不一样,反而看的越多越糊涂,其实本质是很简单的东西,这里举一个例子帮助理解,因为网上大部分是hadoop官方计算单词(wordcount)的例子,这里就换一个场景举例。
假设有以下一份成绩单
1,张三,78,87,69
2,李四,56,76,91
3,王五,65,46,84
4,赵六,89,56,98
...
各列分别是编号,学生姓名,语文成绩,数学成绩,英语成绩
,现在要求统计各科成绩最高分,假设这份成绩单非常非常的长,有上千万行,在没有任何计算机系统的帮助下,要怎么靠人工解决这个问题?
专门派一个人进行统计工作,优点是简单,缺点也很明显,需要非常长的时间,甚至数据量达到一定程度,一个人一辈子可能也统计不完
如果有足够的人可以进行统计工作,要怎么去协调这些人?假设成绩单有100w行并且有1000人可以进行统计
科目 | 人员1结果 | 人员2结果 | ... | 人员1000结果 |
---|---|---|---|---|
语文 | ||||
数学 | ||||
英语 |
科目| 人员1结果|人员2结果|...|人员1000结果
语文 | 80 | 85 | ... | 76 | |
数学 | 89 | 90 | ... | 88 | |
英语 | 94 | 85 | ... | 90 |
第一个人领到的小表格
科目 | 人员1结果 | 人员2结果 | ... | 人员10结果 | |
---|---|---|---|---|---|
语文 | 80 | 85 | ... | 76 | |
数学 | 89 | 90 | ... | 88 | |
英语 | 94 | 85 | ... | 90 |
第二个领到的小表格
科目 | 人员11结果 | 人员12结果 | ... | 人员20结果 | |
---|---|---|---|---|---|
语文 | 83 | 75 | ... | 88 | |
数学 | 79 | 95 | ... | 58 | |
英语 | 94 | 85 | ... | 90 |
那么在这个过程中,我们看到了,一份庞大的成绩单经过以下几个步骤后,最终我们获得了我们想要的结果
那么把这个过程用MapReduce语言进行描述就可以是以下过程:
另外在管理员的表格中,三个科目后面记录
我们用实际java代码解决上面的问题,假设你已经按照上一篇教程安装好了hadoop集群环境
你可以用你熟悉的ide创建一个普通java工程,建议用maven进行包管理,并加入以下包依赖
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.5.1</version>
</dependency>
Mapper对应是MapReduce中的map过程,在以下mapper代码:
StudentMapper.java
public class StudentMapper extends MapReduceBase implements
Mapper<LongWritable, Text, Text, IntWritable> {
public void map(LongWritable longWritable, Text text, OutputCollector<Text, IntWritable> outputCollector, Reporter reporter) throws IOException {
String[] ss = text.toString().split(",");
outputCollector.collect(new Text("语文"), new IntWritable(Integer.parseInt(ss[2])));
outputCollector.collect(new Text("数学"), new IntWritable(Integer.parseInt(ss[3])));
outputCollector.collect(new Text("英语"), new IntWritable(Integer.parseInt(ss[4])));
}
}
StudentMapper实现了 Mapper<LongWritable, Text, Text, IntWritable>
接口,这里有四个参数,四个参数含义如下
1,张三,78,87,69
方法map(LongWritable longWritable, Text text, OutputCollector<Text, IntWritable> outputCollector, Reporter reporter)
的几个参数和上面含义一样,注意到outputCollector是一个数组,说明这里可以写入多个结果,reporter可以向hadoop汇报任务进度。在这个mapper里面,我们并没有做什么计算,我们只是把文本里面的成绩解析出来,并且按科目放到outputCollector中,相当于大家第一次都没干活,只是把数据整理好。经过mapper后,数据从
1,张三,78,87,69
2,李四,56,76,91
3,王五,65,46,84
4,赵六,89,56,98
...
变成了
- | - | - | ||||
---|---|---|---|---|---|---|
语文 | 78 | 56 | 65 | 89 | ... | |
数学 | 87 | 76 | 46 | 56 | ... | |
英语 | 69 | 91 | 84 | 98 | ... |
StudentReducer.java
public class StudentReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text text, Iterator<IntWritable> iterator, OutputCollector<Text, IntWritable> outputCollector, Reporter reporter) throws IOException {
StringBuffer str = new StringBuffer();
Integer max = -1;
while (iterator.hasNext()) {
Integer score = iterator.next().get();
if (score > max) {
max = score;
}
}
outputCollector.collect(new Text(text.toString()), new IntWritable(max));
}
}
Reducer就开始真正执行计算了,reducer函数reduce(Text text, Iterator<IntWritable> iterator, OutputCollector<Text, IntWritable> outputCollector, Reporter reporter)
参数含义如下:
key:语文 value:90
结构的数据,所以类型为<Text, IntWritable>
前面提到过mapper会把数据整理好,并且按科目将成绩写入的outputCollector中,那么到了reducer这一步,hadoop就会把mapper写入的数据按照key进行汇总(也就是科目),并且交付给reducer,reducer负责计算里面最高分,并且也将结果写入outputCollector。
StudentProcessor
public class StudentProcessor {
public static void main(String args[]) throws Exception {
JobConf conf = new JobConf(StudentProcessor.class);
conf.setJobName("max_scroe_poc1");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(StudentMapper.class);
conf.setReducerClass(StudentReducer.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf);
}
}
我们还需要一个包含main函数的启动类,执行mvn package
命令进行打包,我们假设包名为hadoop-score-job.jar
,将jar包通过ftp等工具上传到服务器目录下。
hadoop借助hdfs分布式文件系统,能够将大文件存储在多个节点,通过hdfs cli工具,我们感觉在操作本地文件一样,在上面的代码中FileInputFormat.setInputPaths(conf, new Path(args[0]));
设置了MapReduce的数据来源,用户指定目录,该目录下文件作为数据来源,这里的目录就是hdfs中的目录,并且该目录必须存在,而且数据需要上传到该目录下,执行以下命令创建目录
hadoop fs -mkdir poc01_input
执行以下命令将数据导入到hdfs中
hadoop fs -put score.txt poc01_input
score.txt
内容为
1,张三,78,87,69
2,李四,56,76,91
3,王五,65,46,84
4,赵六,89,56,98
通过ls
命令可以查看文件是否上传成功
$ hadoop fs -ls poc01_input
Found 1 items
-rw-r--r-- 1 hadoop supergroup 72 2020-12-13 15:43 poc01_input/score.txt
执行以下命令开始运行job
$ hadoop jar hadoop-score-job.jar com.hadoop.poc.StudentProcessor poc01_input poc01_output
20/12/13 16:01:33 INFO client.RMProxy: Connecting to ResourceManager at master/172.16.8.42:18040
20/12/13 16:01:33 INFO client.RMProxy: Connecting to ResourceManager at master/172.16.8.42:18040
20/12/13 16:01:34 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
20/12/13 16:01:34 INFO mapred.FileInputFormat: Total input files to process : 1
20/12/13 16:01:35 INFO mapreduce.JobSubmitter: number of splits:2
20/12/13 16:01:35 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1607087481584_0005
20/12/13 16:01:35 INFO conf.Configuration: resource-types.xml not found
20/12/13 16:01:35 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
20/12/13 16:01:35 INFO resource.ResourceUtils: Adding resource type - name = memory-mb, units = Mi, type = COUNTABLE
20/12/13 16:01:35 INFO resource.ResourceUtils: Adding resource type - name = vcores, units = , type = COUNTABLE
20/12/13 16:01:36 INFO impl.YarnClientImpl: Submitted application application_1607087481584_0005
20/12/13 16:01:36 INFO mapreduce.Job: The url to track the job: http://master:18088/proxy/application_1607087481584_0005/
20/12/13 16:01:36 INFO mapreduce.Job: Running job: job_1607087481584_0005
20/12/13 16:01:43 INFO mapreduce.Job: Job job_1607087481584_0005 running in uber mode : false
20/12/13 16:01:43 INFO mapreduce.Job: map 0% reduce 0%
20/12/13 16:01:51 INFO mapreduce.Job: map 100% reduce 0%
20/12/13 16:01:57 INFO mapreduce.Job: map 100% reduce 100%
20/12/13 16:01:57 INFO mapreduce.Job: Job job_1607087481584_0005 completed successfully
20/12/13 16:01:57 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=84
FILE: Number of bytes written=625805
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=316
HDFS: Number of bytes written=30
HDFS: Number of read operations=9
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=2
Launched reduce tasks=1
Data-local map tasks=2
Total time spent by all maps in occupied slots (ms)=12036
Total time spent by all reduces in occupied slots (ms)=3311
Total time spent by all map tasks (ms)=12036
Total time spent by all reduce tasks (ms)=3311
Total vcore-milliseconds taken by all map tasks=12036
Total vcore-milliseconds taken by all reduce tasks=3311
Total megabyte-milliseconds taken by all map tasks=12324864
Total megabyte-milliseconds taken by all reduce tasks=3390464
Map-Reduce Framework
Map input records=4
Map output records=12
Map output bytes=132
Map output materialized bytes=90
Input split bytes=208
Combine input records=12
Combine output records=6
Reduce input groups=3
Reduce shuffle bytes=90
Reduce input records=6
Reduce output records=3
Spilled Records=12
Shuffled Maps =2
Failed Shuffles=0
Merged Map outputs=2
GC time elapsed (ms)=395
CPU time spent (ms)=1790
Physical memory (bytes) snapshot=794595328
Virtual memory (bytes) snapshot=5784080384
Total committed heap usage (bytes)=533200896
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=108
File Output Format Counters
Bytes Written=30
job执行完后,结果会保存在poc01_output
目录下
$ hadoop fs -ls poc01_output2
Found 2 items
-rw-r--r-- 1 hadoop supergroup 0 2020-12-13 16:01 poc01_output2/_SUCCESS
-rw-r--r-- 1 hadoop supergroup 30 2020-12-13 16:01 poc01_output2/part-00000
$ hadoop fs -cat poc01_output2/part-00000
数学 87
英语 98
语文 89
查看原文MapReduce是hadoop进行多节点计算时采用的计算模型,说白了就是hadoop拆分任务的一套方法论,刚接触MapReduce这个概念时,一时很难理解,也查了很多资料,因为每个人理解不一样,反而看的越多越糊涂,其实本质是很简单的东西,这里举一个例子帮助理解,因为网上大部...
赞 1 收藏 0 评论 0
KerryWu 收藏了文章 · 2020-12-04
JNI是Java Native Interface
的缩写,Java本地接口(JNI)提供了将Java与C/C++、汇编等本地代码集成的方案,该规范使得在 Java 虚拟机内运行的 Java 代码能够与其它编程语言互相操作,包括创建本地方法、更新 Java 对象、调用 Java 方法,引用 Java 类,捕捉和抛出异常等,也允许 Java 代码调用 C/C++ 或汇编语言编写的程序和库。使用java与本地已编译的代码交互,通常会丧失平台可移植性。但是,有些情况下这样做是可以接受的,甚至是必须的。例如,使用一些旧的库,与硬件、操作系统进行交互,或者为了提高程序的性能。JNI标准至少要保证本地代码能工作在任何Java 虚拟机环境。
在Java里创建并启动一个线程的代码:
public static void main(String[] args) {
Thread thread = new Thread(){
@Override
public void run() {
// do something
}
};
thread.start();
}
通过查看 Thread 类的源码,可以发现 start() 方法中最核心的就是调用了 start0() 方法,而 start0() 方法又是一个native方法:
public synchronized void start() {
if (threadStatus != 0)
throw new IllegalThreadStateException();
group.add(this);
boolean started = false;
try {
start0(); // 调用本地方法启动线程
started = true;
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
/* do nothing. If start0 threw a Throwable then
it will be passed up the call stack */
}
}
}
// native 本地方法
private native void start0();
这里就是通过 Java 代码来调用 JVM 本地的 C 代码,C 中调用系统函数创建线程,并且会反过来调用 Thread 中的 run() 方法,这也是为什么调用了 start() 方法后会自动执行 run() 方法中的逻辑。
了解一下 Linux 操作系统中是如何创建一个线程的,创建线程函数:
int pthread_create(pthread_t *thread, const pthread_attr_t *attr,
void *(*start_routine) (void *), void *arg);
使用 man 命令可以查看 pthread_create,这个函数是linux系统的函数,可以用C或者C++直接调用,函数有四个参数:
在 Linux 上启动一个线程的代码:
#include <pthread.h> //头文件
#include <stdio.h>
pthread_t pid; //定义一个变量,接受创建线程后的线程id
//定义线程的主体函数
void* thread_entity(void* arg)
{
printf("new Thread!");
}
int main()
{
// 调用操作系统的函数创建线程,注意四个参数
pthread_create(&pid, NULL, thread_entity, NULL);
usleep(100);
printf("main\n");
}
下面我们就通过 JNI 技术简单实现一个 Thread 类来模拟线程创建和执行过程。
package com.fantasy;
public class Sample {
static {
// 装载库,保证JVM在启动的时候就会装载,这里的库指的是C程序生成的动态链接库
// Linux下是.so文件,Windows下是.dll文件
System.loadLibrary( "ThreadNative" );
}
public static void main(String[] args) {
new MyThread(() -> System.out.println("Java run method...")).start();
}
}
class MyThread implements Runnable {
private Runnable target;
public MyThread(Runnable target) {
this.target = target;
}
@Override
public void run() {
if (target != null) {
target.run();
}
}
public synchronized void start() {
start0();
}
private native void start0();
}
编译成class文件:
[root@sdb1 fantasy]# pwd
/root/com/fantasy
[root@sdb1 fantasy]# javac Sample.java
[root@sdb1 fantasy]# ll
total 12
-rw-r--r--. 1 root root 501 Jun 1 22:10 MyThread.class
-rw-r--r--. 1 root root 1176 Jun 1 22:10 Sample.class
-rw-r--r--. 1 root root 692 Jun 1 22:09 Sample.java
生成.h头文件:
[root@sdb1 ~]# pwd
/root
[root@sdb1 ~]# javah com.fantasy.MyThread
[root@sdb1 ~]# ll
total 4
drwxr-xr-x. 3 root root 21 Jun 1 22:07 com
-rw-r--r--. 1 root root 429 Jun 1 22:12 com_fantasy_MyThread.h
注意:native 方法在哪个类中就使用javah命令生成对应的头文件,运行 javah 命令需要在包路径外面,javah packageName.className。
下面是 com_fantasy_MyThread.h 头文件的内容:
/* DO NOT EDIT THIS FILE - it is machine generated */
#include <jni.h>
/* Header for class com_fantasy_MyThread */
#ifndef _Included_com_fantasy_MyThread
#define _Included_com_fantasy_MyThread
#ifdef __cplusplus
extern "C" {
#endif
/*
* Class: com_fantasy_MyThread
* Method: start0
* Signature: ()V
*/
JNIEXPORT void JNICALL Java_com_fantasy_MyThread_start0
(JNIEnv *, jobject);
#ifdef __cplusplus
}
#endif
#endif
其中 Java_com_fantasy_MyThread_start0 方法就是需要在C程序中定义的方法。
在注释中我们可以看到有一个 Signature,这个是方法的签名,表示方法的参数类型和返回值类型,简单了解一下:
Java类型 | Signature | 说明 |
---|---|---|
boolean | Z | |
byte | B | |
char | C | |
short | S | |
int | I | |
long | L | |
float | F | |
double | D | |
void | V | |
Object | L+/分隔的完整类名 | 例如:Ljava/lang/String表示String类型 |
Array | [签名 | 例如: [I表示int类型的数组, [Ljava/lang/String表示String类型的数组 |
Method | (参数签名)返回类型签名 | 例如: ([I)I表示参数类型为int数组,返回值int类型的方法 |
上一步已经生成了.h头文件,现在我们来实现里面定义的方法。创建 thread.c 文件,并编写如下代码:
#include <pthread.h>
#include <stdio.h>
#include "com_fantasy_MyThread.h" // 导入刚刚编译的那个.h文件
pthread_t pid;
void* thread_entity(void* arg)
{
printf("new Thread!\n");
}
// 这个方法定义要参考.h文件中的方法
JNIEXPORT void JNICALL Java_com_fantasy_MyThread_start0
(JNIEnv *env, jobject obj){
pthread_create(&pid, NULL, thread_entity, NULL);
sleep(1);
printf("main thread %lu, create thread %lu\n", pthread_self(), pid);
//通过反射调用java中的方法
//找class,使用 FindClass 方法,参数就是要调用的函数的类的完全限定名,但是需要把点换成/
jclass cls = (*env)->FindClass(env, "com/fantasy/MyThread");
//获取 run 方法
jmethodID mid = (*env)->GetMethodID(env, cls, "run", "()V");
// 调用方法
(*env)->CallVoidMethod(env, obj, mid);
printf("success to call run() method!\n");
}
将这个 thread.c 文件编译为一个动态链接库,命名规则为 libxxx.so,xxx要跟Java中 System.loadLibrary( "ThreadNative") 指定的字符串保持一致,也就是 ThreadNative,编译命令如下:
gcc -fPIC -I /opt/jdk1.8.0_161/include/ -I /opt/jdk1.8.0_161/include/linux -shared -o libThreadNative.so thread.c
至此,Java代码与C代码编写与编译完成。
运行前,还需要将 .so 文件所在路径加入到path中,这样Java才能找到这个库文件,.so 文件的路径为"/root/libThreadNative.so",所以配置如下:
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/root/
让我们来运行Java主类来看一下结果吧!
[root@sdb1 ~]# pwd
/root
[root@sdb1 ~]# ll
total 20
drwxr-xr-x. 3 root root 21 Jun 1 22:07 com
-rw-r--r--. 1 root root 429 Jun 1 22:12 com_fantasy_MyThread.h
-rwxr-xr-x. 1 root root 8336 Jun 1 23:32 libThreadNative.so
-rw-r--r--. 1 root root 666 Jun 1 23:32 thread.c
[root@sdb1 ~]# java com.fantasy.Sample
new Thread!
main thread 139986752292608, create thread 139986681366272
Java run method...
success to call run() method!
可以看到输出结果符合我们的预期。通过模拟Thread,也可以得出以下结论:
JNI是Java Native Interface的缩写,Java本地接口(JNI)提供了将Java与C/C++、汇编等本地代码集成的方案,该规范使得在 Java 虚拟机内运行的 Java 代码能够与其它编程语言互相操作,包括创建本地方法、更新 Java 对象、调用 Java 方法,引用 Java 类,捕捉和抛出异常...
KerryWu 赞了文章 · 2020-11-23
本文节选自我开源的 JavaGuide :https://github.com/Snailclimb/JavaGuide (Github标星92k+!一份涵盖大部分 Java 程序员所需要掌握的核心知识。准备 Java 面试,首选 JavaGuide!)
经历过技术面试的小伙伴想必对这个两个概念已经再熟悉不过了!
Guide哥当年参加面试的时候,不夸张地说,只要问到分布式相关的内容,面试官几乎是必定会问这两个分布式相关的理论。
并且,这两个理论也可以说是小伙伴们学习分布式相关内容的基础了!
因此,小伙伴们非常非常有必要将这理论搞懂,并且能够用自己的理解给别人讲出来。
这篇文章我会站在自己的角度对这两个概念进行解读!
个人能力有限。如果文章有任何需要改善和完善的地方,欢迎在评论区指出,共同进步!——爱你们的Guide哥
CAP 理论/定理起源于 2000年,由加州大学伯克利分校的Eric Brewer教授在分布式计算原理研讨会(PODC)上提出,因此 CAP定理又被称作 布鲁尔定理(Brewer’s theorem)
2年后,麻省理工学院的Seth Gilbert和Nancy Lynch 发表了布鲁尔猜想的证明,CAP理论正式成为分布式领域的定理。
CAP 也就是 Consistency(一致性)、Availability(可用性)、Partition Tolerance(分区容错性) 这三个单词首字母组合。
CAP 理论的提出者布鲁尔在提出 CAP 猜想的时候,并没有详细定义 Consistency、Availability、Partition Tolerance 三个单词的明确定义。
因此,对于 CAP 的民间解读有很多,一般比较被大家推荐的是下面 👇 这种版本的解。
在理论计算机科学中,CAP 定理(CAP theorem)指出对于一个分布式系统来说,当设计读写操作时,只能能同时满足以下三点中的两个:
什么是网络分区?
分布式系统中,多个节点之前的网络本来是连通的,但是因为某些故障(比如部分节点网络出了问题)某些节点之间不连通了,整个网络就分成了几块区域,这就叫网络分区。
大部分人解释这一定律时,常常简单的表述为:“一致性、可用性、分区容忍性三者你只能同时达到其中两个,不可能同时达到”。实际上这是一个非常具有误导性质的说法,而且在 CAP 理论诞生 12 年之后,CAP 之父也在 2012 年重写了之前的论文。
当发生网络分区的时候,如果我们要继续服务,那么强一致性和可用性只能 2 选 1。也就是说当网络分区之后 P 是前提,决定了 P 之后才有 C 和 A 的选择。也就是说分区容错性(Partition tolerance)我们是必须要实现的。简而言之就是:CAP 理论中分区容错性 P 是一定要满足的,在此基础上,只能满足可用性 A 或者一致性 C。
因此,分布式系统理论上不可能选择 CA 架构,只能选择 CP 或者 AP 架构。
为啥无同时保证 CA 呢?
举个例子:若系统出现“分区”,系统中的某个节点在进行写操作。为了保证 C, 必须要禁止其他节点的读写操作,这就和 A 发生冲突了。如果为了保证 A,其他节点的读写操作正常的话,那就和 C 发生冲突了。
选择的关键在于当前的业务场景,没有定论,比如对于需要确保强一致性的场景如银行一般会选择保证 CP 。
我这里以注册中心来探讨一下 CAP 的实际应用。考虑到很多小伙伴不知道注册中心是干嘛的,这里简单以 Dubbo 为例说一说。
下图是 Dubbo 的架构图。注册中心 Registry 在其中扮演了什么角色呢?提供了什么服务呢?
注册中心负责服务地址的注册与查找,相当于目录服务,服务提供者和消费者只在启动时与注册中心交互,注册中心不转发请求,压力较小。
常见的可以作为注册中心的组件有:ZooKeeper、Eureka、Nacos...。
在进行分布式系统设计和开发时,我们不应该仅仅局限在 CAP 问题上,还要关注系统的扩展性、可用性等等
在系统发生“分区”的情况下,CAP 理论只能满足 CP 或者 AP。要注意的是,这里的前提是系统发生了“分区”
如果系统没有发生“分区”的话,节点间的网络连接通信正常的话,也就不存在 P 了。这个时候,我们就可以同时保证 C 和 A 了。
总结:如果系统发生“分区”,我们要考虑选择 CP 还是 AP。如果系统没有发生“分区”的话,我们要思考如何保证 CA 。
BASE 理论起源于 2008 年, 由eBay的架构师Dan Pritchett在ACM上发表。
BASE 是 Basically Available(基本可用) 、Soft-state(软状态) 和 Eventually Consistent(最终一致性) 三个短语的缩写。BASE 理论是对 CAP 中一致性 C 和可用性 A 权衡的结果,其来源于对大规模互联网系统分布式实践的总结,是基于 CAP 定理逐步演化而来的,它大大降低了我们对系统的要求。
即使无法做到强一致性,但每个应用都可以根据自身业务特点,采用适当的方式来使系统达到最终一致性。
也就是牺牲数据的一致性来满足系统的高可用性,系统中一部分数据不可用或者不一致时,仍需要保持系统整体“主要可用”。
BASE 理论本质上是对 CAP 的延伸和补充,更具体地说,是对 CAP 中 AP 方案的一个补充。
为什么这样说呢?
CAP 理论这节我们也说过了:
如果系统没有发生“分区”的话,节点间的网络连接通信正常的话,也就不存在 P 了。这个时候,我们就可以同时保证 C 和 A 了。因此,如果系统发生“分区”,我们要考虑选择 CP 还是 AP。如果系统没有发生“分区”的话,我们要思考如何保证 CA 。
因此,AP 方案只是在系统发生分区的时候放弃一致性,而不是永远放弃一致性。在分区故障恢复后,系统应该达到最终一致性。这一点其实就是 BASE 理论延伸的地方。
基本可用是指分布式系统在出现不可预知故障的时候,允许损失部分可用性。但是,这绝不等价于系统不可用。
什么叫允许损失部分可用性呢?
软状态指允许系统中的数据存在中间状态(CAP 理论中的数据不一致),并认为该中间状态的存在不会影响系统的整体可用性,即允许系统在不同节点的数据副本之间进行数据同步的过程存在延时。
最终一致性强调的是系统中所有的数据副本,在经过一段时间的同步后,最终能够达到一个一致的状态。因此,最终一致性的本质是需要系统保证最终数据能够达到一致,而不需要实时保证系统数据的强一致性。
分布式一致性的 3 种级别:
- 强一致性 :系统写入了什么,读出来的就是什么。
- 弱一致性 :不一定可以读取到最新写入的值,也不保证多少时间之后读取到的数据是最新的,只是会尽量保证某个时刻达到数据一致的状态。
- 最终一致性 :弱一致性的升级版。,系统会保证在一定时间内达到数据一致的状态,
业界比较推崇是最终一致性级别,但是某些对数据一致要求十分严格的场景比如银行转账还是要保证强一致性。
ACID 是数据库事务完整性的理论,CAP 是分布式系统设计理论,BASE 是 CAP 理论中 AP 方案的延伸。
图解计算机基础+个人原创的 Java 面试手册PDF版。
微信搜“JavaGuide”回复“计算机基础”即可获取图解计算机基础+个人原创的 Java 面试手册。
本文节选自我开源的 JavaGuide :[链接] (Github标星92k+!一份涵盖大部分 Java 程序员所需要掌握的核心知识。准备 Java 面试,首选 JavaGuide!)
赞 9 收藏 5 评论 0
查看全部 个人动态 →
后端开发规范文档
注册于 2018-07-24
个人主页被 4k 人浏览
推荐关注