ShardingSphere

ShardingSphere 查看完整档案

北京编辑  |  填写毕业院校Apache  |  TLP 编辑 shardingsphere.apache.org 编辑
编辑

Apache ShardingSphere 是一套开源的分布式数据库中间件解决方案组成的生态圈,它由Sharding-JDBC、Sharding-Proxy和Sharding-Sidecar(规划中)这3款相互独立,却又能够混合部署配合使用的产品组成。

它们均提供标准化的数据分片、分布式事务和数据库治理功能,可适用于如Java同构、异构语言、云原生等各种多样化的应用场景。

个人动态

ShardingSphere 发布了文章 · 11月16日

万字长文详解Shardingsphere对XA分布式事务的支持

Apache ShardingSphere 是一套开源的分布式数据库中间件解决方案组成的生态圈,它由 JDBC、Proxy 和 Sidecar(规划中)这 3 款相互独立,却又能够混合部署配合使用的产品组成。 它们均提供标准化的数据分片、分布式事务和数据库治理功能,可适用于如 Java 同构、异构语言、云原生等各种多样化的应用场景。

ShardingSphere 已于2020年4月16日成为 Apache 软件基金会的顶级项目。

分布式系统CAP理论

一致性(Consistency)
  • 一致性指 all nodes see the same data at the same time,即更新操作成功并返回客户端完成后,所有节点在同一时间的数据完全一致,不能存在中间状态。
  • 关于一致性,如果用户时刻看到的数据都是一致的,那么称之为强一致性。如果允许存在中间状态,只要求经过一段时间后,数据最终是一致的,则称之为最终一致性。此外,如果允许存在部分数据不一致,那么就称之为弱一致性
可用性(Availability)
  • 可用性是指系统提供的服务必须一直处于可用的状态,对于用户的每一个操作请求总是能够在有限的时间内返回结果。有限的时间内是指:对于用户的一个操作请求,系统必须能够在指定的时间内返回对应的处理结果,如果超过了这个时间范围,那么系统就被认为是不可用的。
  • 返回结果是可用性的另一个非常重要的指标,它要求系统在完成对用户请求的处理后,返回一个正常的响应结果,不论这个结果是成功还是失败。
分区容错性(Partition tolerance )
  • 布式系统在遇到任何网络分区故障的时候,仍然需要能够保证对外提供满足一致性和可用性的服务,除非是整个网络环境都发生了故障。

X/Open DTP模型与XA规范

X/Open,即现在的open group,是一个独立的组织,主要负责制定各种行业技术标准。官网地址:http://www.opengroup.org/。X/Open组织主要由各大知名公司或者厂商进行支持,这些组织不光遵循X/Open组织定义的行业技术标准,也参与到标准的制定。下图展示了open group目前主要成员(官网截图):

DTP模型

  • 应用程序(Application Program ,简称AP):用于定义事务边界(即定义事务的开始和结束),并且在事务边界内对资源进行操作。
  • 资源管理器(Resource Manager,简称RM,一般也称为事务参与者):如数据库、文件系统等,并提供访问资源的方式。

    • 事务管理器(Transaction Manager ,简称TM,一般也称为事务协调者):负责分配事务唯一标识,监控事务的执行进度,并负责事务的提交、回滚等。
XA规范


这里的接口规范特别多,我们只要来讲讲几个最重要的。

  • xa_start : 在 RM端调用此接口开启一个XA事务,后面需要接上XID 作为参数。
  • xa_end : 取消当前线程与事务的关联, 与 xa_start是配对使用。
  • xa_prepare : 询问RM 是否已经准备好了提交事务。
  • xa_commit : 通知RM 提交事务分支。
  • xa_rollback : 通知RM 提交回滚事务分支。
XA二阶段提交
  • 阶段一 :TM通知各个RM准备提交它们的事务分支。如果RM判断自己进行的工作可以被提交,那就就对工作内容进行持久化,再给TM肯定答复;要是发生了其他情况,那给TM的都是否定答复。在发送了否定答复并回滚了已经的工作后,RM就可以丢弃这个事务分支信息。
  • 阶段二 :TM根据阶段1各个RM prepare的结果,决定是提交还是回滚事务。如果所有的RM都prepare成功,那么TM通知所有的RM进行提交;如果有RM prepare失败的话,则TM通知所有RM回滚自己的事务分支。

MySQL对XA协议的支持

MySQL5.0.3开始支持XA分布式事务,且只有InnoDB存储引擎支持XA事务。
MySQLDTP模型中也是属于资源管理器RM

MySQL XA 事务的 SQL语法
XA START xid    //开启XA事务,xid是一个唯一值,表示事务分支标识符
XA END xid  //结束一个XA事务,
XA PREPARE xid 准备提交
XA COMMIT xid [ONE PHASE] //提交事务。两阶段提交协议中,如果只有一个RM参与,那么可以优化为一阶段提交
XA ROLLBACK xid  //回滚
XA RECOVER [CONVERT XID]  //列出所有处于PREPARE阶段的XA事务
MySQL xid详解

mysql中使用xid来作为一个事务分支的标识符。通过C语言进行描述,如下:

/∗
∗ Transaction branch identification: XID and NULLXID:
∗/
#define XIDDATASIZE 128  /∗ size in bytes ∗/
#define MAXGTRIDSIZE 64  /∗ maximum size in bytes of gtrid ∗/
#define MAXBQUALSIZE 64  /∗ maximum size in bytes of bqual ∗/
struct xid_t {
    long formatID;     /* format identifier */
    long gtrid_length; /* value 1-64 */
    long bqual_length; /* value 1-64 */
    char data[XIDDATASIZE];
    };
/∗
∗ A value of -1 in formatID means that the XID is null.
∗/
typedef struct xid_t XID;
/∗
∗ Declarations of routines by which RMs call TMs:
∗/
extern int ax_reg(int, XID ∗, long);
extern int ax_unreg(int, long);
  • gtrid :全局事务标识符(global transaction identifier),最大不能超过64字节。
  • bqual :分支限定符(branch qualifier),最大不能超过64字节。
  • formatId:记录gtrid、bqual的格式,类似于memcached中flags字段的作用。
  • data :xid的值,其是 gtrid和bqual拼接后的内容。。
MySQL XA事务状态

JTA规范

JTA(Java Transaction API):为J2EE平台提供了分布式事务服务(distributed transaction)的能力。 某种程度上,可以认为JTA规范是XA规范的Java版,其把XA规范中规定的DTP模型交互接口抽象成Java接口中的方法,并规定每个方法要实现什么样的功能。

JTA 定义的接口
  • javax.transaction.TransactionManager : 事务管理器,负责事务的begin, commitrollback 等命令。
  • javax.transaction.UserTransaction:用于声明一个分布式事务。
  • javax.transaction.TransactionSynchronizationRegistry:事务同步注册
  • javax.transaction.xa.XAResource:定义RM提供给TM操作的接口
  • javax.transaction.xa.Xid:事务xid接口。
TM provider:
  • 实现UserTransaction、TransactionManager、Transaction、TransactionSynchronizationRegistry、Synchronization、Xid接口,通过与XAResource接口交互来实现分布式事务。
RM provider:
  • XAResource接口需要由资源管理器者来实现,XAResource接口中定义了一些方法,这些方法将会被TM进行调用,如:

    • start方法:开启事务分支
    • end方法:结束事务分支
    • prepare方法:准备提交
    • commit方法:提交
    • rollback方法:回滚
    • recover方法:列出所有处于PREPARED状态的事务分支

ShardingSphere对XA分布式事务的支持

ShardingSphere针对XA分布式事务的接口以及JTA规范,提供了标准的,基于SPI实现的org.apache.shardingsphere.transaction.spi.ShardingTransactionManager

public interface ShardingTransactionManager extends AutoCloseable {

    /**
     * Initialize sharding transaction manager.
     *
     * @param databaseType database type
     * @param resourceDataSources resource data sources
     */
    void init(DatabaseType databaseType, Collection<ResourceDataSource> resourceDataSources);

    /**
     * Get transaction type.
     *
     * @return transaction type
     */
    TransactionType getTransactionType();

    /**
     * Judge is in transaction or not.
     *
     * @return in transaction or not
     */
    boolean isInTransaction();

    /**
     * Get transactional connection.
     *
     * @param dataSourceName data source name
     * @return connection
     * @throws SQLException SQL exception
     */
    Connection getConnection(String dataSourceName) throws SQLException;

    /**
     * Begin transaction.
     */
    void begin();

    /**
     * Commit transaction.
     */
    void commit();

    /**
     * Rollback transaction.
     */
    void rollback();
}

对于XA分布式事务的支持的具体实现类为 :org.apache.shardingsphere.transaction.xa.XAShardingTransactionManager
在此类中,会调用基于SPI实现的org.apache.shardingsphere.transaction.xa.spi.XATransactionManager,来进行XA事务的管理操作。

总结

我们了解了分布式事务的CAP理论,了解了X/Open的DTP模型,以及XA的接口规范,MySQL对XA协议的支持。最好我们讲解了JTA的规范,以及ShardingSphere对XA事务进行整合的时候定义的SPI接口,这些都是很重要的理论基础,接下来,我们将详细来讲解基于AtomkikosXATransactionManager的具体实现,以及源码解析。

Shardingsphere整合Atomikos对XA分布式事务的源码解析

Atomikos(https://www.atomikos.com/),其实是一家公司的名字,提供了基于JTA规范的XA分布式事务TM的实现。其旗下最著名的产品就是事务管理器。产品分两个版本:

  • TransactionEssentials:开源的免费产品;
  • ExtremeTransactions:上商业版,需要收费。

这两个产品的关系如下图所示:

ExtremeTransactions在TransactionEssentials的基础上额外提供了以下功能(重要的):

  • 支持TCC:这是一种柔性事务
  • 支持通过RMI、IIOP、SOAP这些远程过程调用技术,进行事务传播。
  • 事务日志云存储,云端对事务进行恢复,并且提供了完善的管理后台。

org.apache.shardingsphere.transaction.xa.XAShardingTransactionManager详解

我们简单的来回顾下org.apache.shardingsphere.transaction.spi.ShardingTransactionManager

public interface ShardingTransactionManager extends AutoCloseable {

    /**
     * Initialize sharding transaction manager.
     *
     * @param databaseType database type
     * @param resourceDataSources resource data sources
     */
    void init(DatabaseType databaseType, Collection<ResourceDataSource> resourceDataSources);

    /**
     * Get transaction type.
     *
     * @return transaction type
     */
    TransactionType getTransactionType();

    /**
     * Judge is in transaction or not.
     *
     * @return in transaction or not
     */
    boolean isInTransaction();

    /**
     * Get transactional connection.
     *
     * @param dataSourceName data source name
     * @return connection
     * @throws SQLException SQL exception
     */
    Connection getConnection(String dataSourceName) throws SQLException;

    /**
     * Begin transaction.
     */
    void begin();

    /**
     * Commit transaction.
     */
    void commit();

    /**
     * Rollback transaction.
     */
    void rollback();
}

我们重点县关注init方法,从它的命名,你就应该能够看出来,这是整个框架的初始化方法,让我们来看看它是如何进行初始化的。


 private final Map<String, XATransactionDataSource> cachedDataSources = new HashMap<>();

 private final XATransactionManager xaTransactionManager = XATransactionManagerLoader.getInstance().getTransactionManager();

    @Override
    public void init(final DatabaseType databaseType, final Collection<ResourceDataSource> resourceDataSources) {
        for (ResourceDataSource each : resourceDataSources) {
            cachedDataSources.put(each.getOriginalName(), new XATransactionDataSource(databaseType, each.getUniqueResourceName(), each.getDataSource(), xaTransactionManager));
        }
        xaTransactionManager.init();
    }
  • 首先SPI的方式加载XATransactionManager的具体实现类,这里返回的就是org.apache.shardingsphere.transaction.xa.atomikos.manager.AtomikosTransactionManager
  • 我们在关注下 new XATransactionDataSource() , 进入 org.apache.shardingsphere.transaction.xa.jta.datasource。XATransactionDataSource类的构造方法。
public XATransactionDataSource(final DatabaseType databaseType, final String resourceName, final DataSource dataSource, final XATransactionManager xaTransactionManager) {
        this.databaseType = databaseType;
        this.resourceName = resourceName;
        this.dataSource = dataSource;
        if (!CONTAINER_DATASOURCE_NAMES.contains(dataSource.getClass().getSimpleName())) {
            // 重点关注 1 ,返回了xaDatasource
            xaDataSource = XADataSourceFactory.build(databaseType, dataSource);
            this.xaTransactionManager = xaTransactionManager;
            // 重点关注2 注册资源
            xaTransactionManager.registerRecoveryResource(resourceName, xaDataSource);
        }
    }
  • 我们重点来关注 XADataSourceFactory.build(databaseType, dataSource),从名字我们就可以看出,这应该是返回JTA规范里面的XADataSource,在ShardingSphere里面很多的功能,可以从代码风格的命名上就能猜出来,这就是优雅代码(吹一波)。不多逼逼,我们进入该方法。
public final class XADataSourceFactory {

    public static XADataSource build(final DatabaseType databaseType, final DataSource dataSource) {
        return new DataSourceSwapper(XADataSourceDefinitionFactory.getXADataSourceDefinition(databaseType)).swap(dataSource);
    }
}
  • 首先又是一个SPI定义的 XADataSourceDefinitionFactory,它根据不同的数据库类型,来加载不同的方言。然后我们进入 swap方法。
 public XADataSource swap(final DataSource dataSource) {
        XADataSource result = createXADataSource();
        setProperties(result, getDatabaseAccessConfiguration(dataSource));
        return result;
    }
  • 很简明,第一步创建,XADataSource,第二步给它设置属性(包含数据的连接,用户名密码等),然后返回。
  • 返回 XATransactionDataSource 类,关注 xaTransactionManager.registerRecoveryResource(resourceName, xaDataSource); 从名字可以看出,这是注册事务恢复资源。这个我们在事务恢复的时候详解。
  • 返回 XAShardingTransactionManager.init() ,我们重点来关注:

xaTransactionManager.init();,最后进入AtomikosTransactionManager.init()。流程图如下:

代码:

public final class AtomikosTransactionManager implements XATransactionManager {

    private final UserTransactionManager transactionManager = new UserTransactionManager();

    private final UserTransactionService userTransactionService = new UserTransactionServiceImp();

    @Override
    public void init() {
        userTransactionService.init();
    }

}
  • 进入UserTransactionServiceImp.init()
private void initialize() {
       //添加恢复资源 不用关心
        for (RecoverableResource resource : resources_) {
            Configuration.addResource ( resource );
        }
        for (LogAdministrator logAdministrator : logAdministrators_) {
            Configuration.addLogAdministrator ( logAdministrator );
        }
         //注册插件 不用关心
        for (TransactionServicePlugin nxt : tsListeners_) {
            Configuration.registerTransactionServicePlugin ( nxt );
        }
        //获取配置属性 重点关心
        ConfigProperties configProps = Configuration.getConfigProperties();
        configProps.applyUserSpecificProperties(properties_);
        //进行初始化
        Configuration.init();
    }
  • 我们重点关注,获取配置属性。最后进入com.atomikos.icatch.provider.imp.AssemblerImp.initializeProperties()方法。
    @Override
    public ConfigProperties initializeProperties() {
         //读取classpath下的默认配置transactions-defaults.properties
        Properties defaults = new Properties();
        loadPropertiesFromClasspath(defaults, DEFAULT_PROPERTIES_FILE_NAME);
        //读取classpath下,transactions.properties配置,覆盖transactions-defaults.properties中相同key的值
        Properties transactionsProperties = new Properties(defaults);
        loadPropertiesFromClasspath(transactionsProperties, TRANSACTIONS_PROPERTIES_FILE_NAME);
        //读取classpath下,jta.properties,覆盖transactions-defaults.properties、transactions.properties中相同key的值
        Properties jtaProperties = new Properties(transactionsProperties);
        loadPropertiesFromClasspath(jtaProperties, JTA_PROPERTIES_FILE_NAME);

        //读取通过java -Dcom.atomikos.icatch.file方式指定的自定义配置文件路径,覆盖之前的同名配置
        Properties customProperties = new Properties(jtaProperties);
        loadPropertiesFromCustomFilePath(customProperties);
        //最终构造一个ConfigProperties对象,来表示实际要使用的配置
        Properties finalProperties = new Properties(customProperties);
        return new ConfigProperties(finalProperties);
    }
  • 接下来重点关注, Configuration.init(), 进行初始化。
ublic static synchronized boolean init() {
        boolean startupInitiated = false;
        if (service_ == null) {
            startupInitiated = true;
           //SPI方式加载插件注册,无需过多关心
            addAllTransactionServicePluginServicesFromClasspath();
            ConfigProperties configProperties = getConfigProperties();
          //调用插件的beforeInit方法进行初始化话,无需过多关心
            notifyBeforeInit(configProperties);
          //进行事务日志恢复的初始化,很重要,接下来详解
            assembleSystemComponents(configProperties);
         //进入系统注解的初始化,一般重要
            initializeSystemComponents(configProperties);
            notifyAfterInit();
            if (configProperties.getForceShutdownOnVmExit()) {
                addShutdownHook(new ForceShutdownHook());
            }
        }
        return startupInitiated;
    }
  • 我们先来关注 assembleSystemComponents(configProperties); 进入它,进入com.atomikos.icatch.provider.imp.AssemblerImp.assembleTransactionService()方法:
@Override
    public TransactionServiceProvider assembleTransactionService(
            ConfigProperties configProperties) {
        RecoveryLog recoveryLog =null;
       //打印日志
        logProperties(configProperties.getCompletedProperties());
       //生成唯一名字
        String tmUniqueName = configProperties.getTmUniqueName();

        long maxTimeout = configProperties.getMaxTimeout();
        int maxActives = configProperties.getMaxActives();
        boolean threaded2pc = configProperties.getThreaded2pc();
      //SPI方式加载OltpLog ,这是最重要的扩展地方,如果用户没有SPI的方式去扩展那么就为null
        OltpLog oltpLog = createOltpLogFromClasspath();
        if (oltpLog == null) {
            LOGGER.logInfo("Using default (local) logging and recovery...");
                        //创建事务日志存储资源
            Repository repository = createRepository(configProperties);
            oltpLog = createOltpLog(repository);
            //??? Assemble recoveryLog
            recoveryLog = createRecoveryLog(repository);
        }
        StateRecoveryManagerImp    recoveryManager = new StateRecoveryManagerImp();
        recoveryManager.setOltpLog(oltpLog);
           //生成唯一id生成器,以后生成XID会用的到
        UniqueIdMgr idMgr = new UniqueIdMgr ( tmUniqueName );
        int overflow = idMgr.getMaxIdLengthInBytes() - MAX_TID_LENGTH;
        if ( overflow > 0 ) {
            // see case 73086
            String msg = "Value too long : " + tmUniqueName;
            LOGGER.logFatal ( msg );
            throw new SysException(msg);
        }
        return new TransactionServiceImp(tmUniqueName, recoveryManager, idMgr, maxTimeout, maxActives, !threaded2pc, recoveryLog);
    }
  • 我们重点来分析createOltpLogFromClasspath(), 采用SPI的加载方式来获取,默认这里会返回 null, 什么意思呢?
    就是当没有扩展的时候,atomikos,会创建框架自定义的资源,来存储事务日志。
private OltpLog createOltpLogFromClasspath() {
        OltpLog ret = null;
        ServiceLoader<OltpLogFactory> loader = ServiceLoader.load(OltpLogFactory.class,Configuration.class.getClassLoader());
        int i = 0;
        for (OltpLogFactory l : loader ) {
            ret = l.createOltpLog();
            i++;
        }
        if (i > 1) {
            String msg = "More than one OltpLogFactory found in classpath - error in configuration!";
            LOGGER.logFatal(msg);
            throw new SysException(msg);
        }
        return ret;
    }
  • 我们跟着进入 Repository repository = createRepository(configProperties);
    private CachedRepository createCoordinatorLogEntryRepository(
            ConfigProperties configProperties) throws LogException {
        //创建内存资源存储
        InMemoryRepository inMemoryCoordinatorLogEntryRepository = new InMemoryRepository();
       //进行初始化
        inMemoryCoordinatorLogEntryRepository.init();
       //创建使用文件存储资源作为backup
        FileSystemRepository backupCoordinatorLogEntryRepository = new FileSystemRepository();
       //进行初始化
        backupCoordinatorLogEntryRepository.init();
      //内存与file资源进行合并
        CachedRepository repository = new CachedRepository(inMemoryCoordinatorLogEntryRepository, backupCoordinatorLogEntryRepository);
        repository.init();
        return repository;
    }
  • 这里就会创建出 CachedRepository,里面包含了 InMemoryRepositoryFileSystemRepository
  • 回到主线 com.atomikos.icatch.config.Configuration.init(), 最后来分析下notifyAfterInit();
    private static void notifyAfterInit() {
         //进行插件的初始化
        for (TransactionServicePlugin p : tsListenersList_) {
            p.afterInit();
        }
        for (LogAdministrator a : logAdministrators_) {
            a.registerLogControl(service_.getLogControl());
        }
         //设置事务恢复服务,进行事务的恢复
        for (RecoverableResource r : resourceList_ ) {
            r.setRecoveryService(recoveryService_);
        }

    }
  • 插件的初始化会进入com.atomikos.icatch.jta.JtaTransactionServicePlugin.afterInit()
    public void afterInit() {
        TransactionManagerImp.installTransactionManager(Configuration.getCompositeTransactionManager(), autoRegisterResources);
          //如果我们自定义扩展了 OltpLog ,这里就会返回null,如果是null,那么XaResourceRecoveryManager就是null
        RecoveryLog recoveryLog = Configuration.getRecoveryLog();
        long maxTimeout = Configuration.getConfigProperties().getMaxTimeout();
        if (recoveryLog != null) {
            XaResourceRecoveryManager.installXaResourceRecoveryManager(new DefaultXaRecoveryLog(recoveryLog, maxTimeout),Configuration.getConfigProperties().getTmUniqueName());
        }

    }
  • 重点注意 RecoveryLog recoveryLog = Configuration.getRecoveryLog(); ,如果用户采用SPI的方式,扩展了com.atomikos.recovery.OltpLog这里就会返回 null。 如果是null,则不会对 XaResourceRecoveryManager 进行初始化。
  • 回到 notifyAfterInit(), 我们来分析 setRecoveryService
public void setRecoveryService ( RecoveryService recoveryService )
            throws ResourceException
    {

        if ( recoveryService != null ) {
            if ( LOGGER.isTraceEnabled() ) LOGGER.logTrace ( "Installing recovery service on resource "
                    + getName () );
            this.branchIdentifier=recoveryService.getName();
            recover();
        }
    }
  • 我们进入 recover() 方法:
 public void recover() {
        XaResourceRecoveryManager xaResourceRecoveryManager = XaResourceRecoveryManager.getInstance();
        //null for LogCloud recovery
        if (xaResourceRecoveryManager != null) {
            try {
                xaResourceRecoveryManager.recover(getXAResource());
            } catch (Exception e) {
                refreshXAResource(); //cf case 156968
            }

        }
    }
  • 看到最关键的注释了吗,如果用户采用SPI的方式,扩展了com.atomikos.recovery.OltpLog,那么XaResourceRecoveryManager 为null,则就会进行云端恢复,反之则进行事务恢复。 事务恢复很复杂,我们会单独来讲。

到这里atomikos的基本的初始化已经完成。

atomikos事务begin流程

我们知道,本地的事务,都会有一个 trainsaction.begin, 对应XA分布式事务来说也不另外,我们再把思路切换回XAShardingTransactionManager.begin(), 会调用com.atomikos.icatch.jta.TransactionManagerImp.begin()。流程图如下:

代码:

  public void begin ( int timeout ) throws NotSupportedException,
            SystemException
    {
        CompositeTransaction ct = null;
        ResumePreviousTransactionSubTxAwareParticipant resumeParticipant = null;

        ct = compositeTransactionManager.getCompositeTransaction();
        if ( ct != null && ct.getProperty (  JTA_PROPERTY_NAME ) == null ) {
            LOGGER.logWarning ( "JTA: temporarily suspending incompatible transaction: " + ct.getTid() +
                    " (will be resumed after JTA transaction ends)" );
            ct = compositeTransactionManager.suspend();
            resumeParticipant = new ResumePreviousTransactionSubTxAwareParticipant ( ct );
        }

        try {
      //创建事务补偿点
            ct = compositeTransactionManager.createCompositeTransaction ( ( ( long ) timeout ) * 1000 );
            if ( resumeParticipant != null ) ct.addSubTxAwareParticipant ( resumeParticipant );
            if ( ct.isRoot () && getDefaultSerial () )
                ct.setSerial ();
            ct.setProperty ( JTA_PROPERTY_NAME , "true" );
        } catch ( SysException se ) {
            String msg = "Error in begin()";
            LOGGER.logError( msg , se );
            throw new ExtendedSystemException ( msg , se );
        }
        recreateCompositeTransactionAsJtaTransaction(ct);
    }
  • 这里我们主要关注 compositeTransactionManager.createCompositeTransaction(),
public CompositeTransaction createCompositeTransaction ( long timeout ) throws SysException
    {
        CompositeTransaction ct = null , ret = null;

        ct = getCurrentTx ();
        if ( ct == null ) {
            ret = getTransactionService().createCompositeTransaction ( timeout );
            if(LOGGER.isDebugEnabled()){
                LOGGER.logDebug("createCompositeTransaction ( " + timeout + " ): "
                    + "created new ROOT transaction with id " + ret.getTid ());
            }
        } else {
             if(LOGGER.isDebugEnabled()) LOGGER.logDebug("createCompositeTransaction ( " + timeout + " )");
            ret = ct.createSubTransaction ();

        }

        Thread thread = Thread.currentThread ();
        setThreadMappings ( ret, thread );

        return ret;
    }
  • 创建了事务补偿点,然后把他放到了用当前线程作为key的Map当中,这里思考,为啥它不用 threadLocal

到这里atomikos的事务begin流程已经完成。 大家可能有些疑惑,begin好像什么都没有做,XA start 也没调用? 别慌,下一节继续来讲。

XATransactionDataSource getConnection() 流程

我们都知道想要执行SQL语句,必须要获取到数据库的connection。让我们再回到 XAShardingTransactionManager.getConnection() 最后会调用到org.apache.shardingsphere.transaction.xa.jta.datasourceXATransactionDataSource.getConnection()。流程图如下:

代码 :

 public Connection getConnection() throws SQLException, SystemException, RollbackException {
      //先检查是否已经有存在的connection,这一步很关心,也是XA的关键,因为XA事务,必须在同一个connection
        if (CONTAINER_DATASOURCE_NAMES.contains(dataSource.getClass().getSimpleName())) {
            return dataSource.getConnection();
        }
      //获取数据库连接
        Connection result = dataSource.getConnection();
      //转成XAConnection,其实是同一个连接
        XAConnection xaConnection = XAConnectionFactory.createXAConnection(databaseType, xaDataSource, result);
      //获取JTA事务定义接口
        Transaction transaction = xaTransactionManager.getTransactionManager().getTransaction();
        if (!enlistedTransactions.get().contains(transaction)) {
      //进行资源注册
            transaction.enlistResource(new SingleXAResource(resourceName, xaConnection.getXAResource()));
            transaction.registerSynchronization(new Synchronization() {
                @Override
                public void beforeCompletion() {
                    enlistedTransactions.get().remove(transaction);
                }

                @Override
                public void afterCompletion(final int status) {
                    enlistedTransactions.get().clear();
                }
            });
            enlistedTransactions.get().add(transaction);
        }
        return result;
    }
  • 首先第一步很关心,尤其是对shardingsphere来说,因为在一个事务里面,会有多个SQL语句,打到相同的数据库,所以对相同的数据库,必须获取同一个XAConnection,这样才能进行XA事务的提交与回滚。
  • 我们接下来关心 transaction.enlistResource(new SingleXAResource(resourceName, xaConnection.getXAResource()));, 会进入com.atomikos.icatch.jta.TransactionImp.enlistResource(), 代码太长,截取一部分。
try {
                restx = (XAResourceTransaction) res
                        .getResourceTransaction(this.compositeTransaction);

                // next, we MUST set the xa resource again,
                // because ONLY the instance we got as argument
                // is available for use now !
                // older instances (set in restx from previous sibling)
                // have connections that may be in reuse already
                // ->old xares not valid except for 2pc operations

                restx.setXAResource(xares);
                restx.resume();
            } catch (ResourceException re) {
                throw new ExtendedSystemException(
                        "Unexpected error during enlist", re);
            } catch (RuntimeException e) {
                throw e;
            }

            addXAResourceTransaction(restx, xares);
  • 我们直接看 restx.resume();
public synchronized void resume() throws ResourceException {
        int flag = 0;
        String logFlag = "";
        if (this.state.equals(TxState.LOCALLY_DONE)) {// reused instance
            flag = XAResource.TMJOIN;
            logFlag = "XAResource.TMJOIN";
        } else if (!this.knownInResource) {// new instance
            flag = XAResource.TMNOFLAGS;
            logFlag = "XAResource.TMNOFLAGS";
        } else
            throw new IllegalStateException("Wrong state for resume: "
                    + this.state);

        try {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.logDebug("XAResource.start ( " + this.xidToHexString
                        + " , " + logFlag + " ) on resource "
                        + this.resourcename
                        + " represented by XAResource instance "
                        + this.xaresource);
            }
            this.xaresource.start(this.xid, flag);

        } catch (XAException xaerr) {
            String msg = interpretErrorCode(this.resourcename, "resume",
                    this.xid, xaerr.errorCode);
            LOGGER.logWarning(msg, xaerr);
            throw new ResourceException(msg, xaerr);
        }
        setState(TxState.ACTIVE);
        this.knownInResource = true;
    }
  • 哦多尅,看见了吗,各位,看见了 this.xaresource.start(this.xid, flag); 了吗????,我们进去,假设我们使用的Mysql数据库:
 public void start(Xid xid, int flags) throws XAException {
        StringBuilder commandBuf = new StringBuilder(300);
        commandBuf.append("XA START ");
        appendXid(commandBuf, xid);
        switch(flags) {
        case 0:
            break;
        case 2097152:
            commandBuf.append(" JOIN");
            break;
        case 134217728:
            commandBuf.append(" RESUME");
            break;
        default:
            throw new XAException(-5);
        }

        this.dispatchCommand(commandBuf.toString());
        this.underlyingConnection.setInGlobalTx(true);
    }
  • 组装XA start Xid SQL语句,进行执行。

到这里,我们总结下,在获取数据库连接的时候,我们执行了XA协议接口中的 XA start xid

atomikos事务commit流程

好了,上面我们已经开启了事务,现在我们来分析下事务commit流程,我们再把视角切换回XAShardingTransactionManager.commit(),最后我们会进入com.atomikos.icatch.imp.CompositeTransactionImp.commit() 方法。流程图如下:

代码:

 public void commit () throws HeurRollbackException, HeurMixedException,
            HeurHazardException, SysException, SecurityException,
            RollbackException
    {
       //首先更新下事务日志的状态
        doCommit ();
        setSiblingInfoForIncoming1pcRequestFromRemoteClient();

        if ( isRoot () ) {
         //真正的commit操作
          coordinator.terminate ( true );
        }
    }
  • 我们关注 coordinator.terminate ( true );
 protected void terminate ( boolean commit ) throws HeurRollbackException,
            HeurMixedException, SysException, java.lang.SecurityException,
            HeurCommitException, HeurHazardException, RollbackException,
            IllegalStateException

    {
        synchronized ( fsm_ ) {
            if ( commit ) {
                     //判断有几个参与者,如果只有一个,直接提交
                if ( participants_.size () <= 1 ) {
                    commit ( true );
                } else {
                                //否则,走XA 2阶段提交流程,先prepare, 再提交
                    int prepareResult = prepare ();
                    // make sure to only do commit if NOT read only
                    if ( prepareResult != Participant.READ_ONLY )
                        commit ( false );
                }
            } else {
                rollback ();
            }
        }
    }
  • 首先会判断参与者的个数,这里我们可以理解为MySQL的database数量,如果只有一个,退化成一阶段,直接提交。
    如果有多个,则走标准的XA二阶段提交流程。
  • 我们来看 prepare (); 流程,最后会走到com.atomikos.icatch.imp.PrepareMessage.send() ---> com.atomikos.datasource.xa.XAResourceTransaction.prepare()
int ret = 0;
        terminateInResource();

        if (TxState.ACTIVE == this.state) {
            // tolerate non-delisting apps/servers
            suspend();
        }

        // duplicate prepares can happen for siblings in serial subtxs!!!
        // in that case, the second prepare just returns READONLY
        if (this.state == TxState.IN_DOUBT)
            return Participant.READ_ONLY;
        else if (!(this.state == TxState.LOCALLY_DONE))
            throw new SysException("Wrong state for prepare: " + this.state);
        try {
            // refresh xaresource for MQSeries: seems to close XAResource after
            // suspend???
            testOrRefreshXAResourceFor2PC();
            if (LOGGER.isTraceEnabled()) {
                LOGGER.logTrace("About to call prepare on XAResource instance: "
                        + this.xaresource);
            }
            ret = this.xaresource.prepare(this.xid);

        } catch (XAException xaerr) {
            String msg = interpretErrorCode(this.resourcename, "prepare",
                    this.xid, xaerr.errorCode);
            if (XAException.XA_RBBASE <= xaerr.errorCode
                    && xaerr.errorCode <= XAException.XA_RBEND) {
                LOGGER.logWarning(msg, xaerr); // see case 84253
                throw new RollbackException(msg);
            } else {
                LOGGER.logError(msg, xaerr);
                throw new SysException(msg, xaerr);
            }
        }
        setState(TxState.IN_DOUBT);
        if (ret == XAResource.XA_RDONLY) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.logDebug("XAResource.prepare ( " + this.xidToHexString
                        + " ) returning XAResource.XA_RDONLY " + "on resource "
                        + this.resourcename
                        + " represented by XAResource instance "
                        + this.xaresource);
            }
            return Participant.READ_ONLY;
        } else {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.logDebug("XAResource.prepare ( " + this.xidToHexString
                        + " ) returning OK " + "on resource "
                        + this.resourcename
                        + " represented by XAResource instance "
                        + this.xaresource);
            }
            return Participant.READ_ONLY + 1;
        }
  • 终于,我们看到了这么一句 ret = this.xaresource.prepare(this.xid); 但是等等,我们之前不是说了,XA start xid 以后要先 XA end xid 吗? 答案就在 suspend(); 里面。
public synchronized void suspend() throws ResourceException {

        // BugzID: 20545
        // State may be IN_DOUBT or TERMINATED when a connection is closed AFTER
        // commit!
        // In that case, don't call END again, and also don't generate any
        // error!
        // This is required for some hibernate connection release strategies.
        if (this.state.equals(TxState.ACTIVE)) {
            try {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.logDebug("XAResource.end ( " + this.xidToHexString
                            + " , XAResource.TMSUCCESS ) on resource "
                            + this.resourcename
                            + " represented by XAResource instance "
                            + this.xaresource);
                }
                 //执行了 xa end 语句
                this.xaresource.end(this.xid, XAResource.TMSUCCESS);

            } catch (XAException xaerr) {
                String msg = interpretErrorCode(this.resourcename, "end",
                        this.xid, xaerr.errorCode);
                if (LOGGER.isTraceEnabled())
                    LOGGER.logTrace(msg, xaerr);
                // don't throw: fix for case 102827
            }
            setState(TxState.LOCALLY_DONE);
        }
    }

到了这里,我们已经执行了 XA start xid -> XA end xid --> XA prepare xid, 接下来就是最后一步 commit

  • 我们再回到 terminate(false) 方法,来看 commit()流程。其实和 prepare流程一样,最后会走到 com.atomikos.datasource.xa.XAResourceTransaction.commit()。 commit执行完,数据提交
//繁杂代码过多,就显示核心的
this.xaresource.commit(this.xid, onePhase);

思考:这里的参与者提交是在一个循环里面,一个一个提交的,如果之前的提交了,后面的参与者提交的时候,挂了,就会造成数据的不一致性。

Atomikos rollback() 流程


上面我们已经分析了commit流程,其实rollback流程和commit流程一样,我们在把目光切换回 org.apache.shardingsphere.transaction.xa.XAShardingTransactionManager.rollback() ,最后会执行到com.atomikos.icatch.imp.CompositeTransactionImp.rollback()

    public void rollback () throws IllegalStateException, SysException
    {
        //清空资源,更新事务日志状态等
        doRollback ();
        if ( isRoot () ) {
            try {
                coordinator.terminate ( false );
            } catch ( Exception e ) {
                throw new SysException ( "Unexpected error in rollback: " + e.getMessage (), e );
            }
        }
    }
  • 重点关注 coordinator.terminate ( false ); ,这个和 commit流程是一样的,只不过在 commit流程里面,参数传的是true。
 protected void terminate ( boolean commit ) throws HeurRollbackException,
            HeurMixedException, SysException, java.lang.SecurityException,
            HeurCommitException, HeurHazardException, RollbackException,
            IllegalStateException

    {
        synchronized ( fsm_ ) {
            if ( commit ) {
                if ( participants_.size () <= 1 ) {
                    commit ( true );
                } else {
                    int prepareResult = prepare ();
                    // make sure to only do commit if NOT read only
                    if ( prepareResult != Participant.READ_ONLY )
                        commit ( false );
                }
            } else {
                 //如果是false,走的是rollback
                rollback ();
            }
        }
    }
  • 我们重点关注 rollback() ,最后会走到com.atomikos.datasource.xa.XAResourceTransaction.rollback()
public synchronized void rollback()
            throws HeurCommitException, HeurMixedException,
            HeurHazardException, SysException {
        terminateInResource();

        if (rollbackShouldDoNothing()) {
            return;
        }
        if (this.state.equals(TxState.TERMINATED)) {
            return;
        }

        if (this.state.equals(TxState.HEUR_MIXED))
            throw new HeurMixedException();
        if (this.state.equals(TxState.HEUR_COMMITTED))
            throw new HeurCommitException();
        if (this.xaresource == null) {
            throw new HeurHazardException("XAResourceTransaction "
                    + getXid() + ": no XAResource to rollback?");
        }

        try {
            if (this.state.equals(TxState.ACTIVE)) { // first suspend xid
                suspend();
            }

            // refresh xaresource for MQSeries: seems to close XAResource after
            // suspend???
            testOrRefreshXAResourceFor2PC();
            if (LOGGER.isDebugEnabled()) {
                LOGGER.logDebug("XAResource.rollback ( " + this.xidToHexString
                        + " ) " + "on resource " + this.resourcename
                        + " represented by XAResource instance "
                        + this.xaresource);
            }
            this.xaresource.rollback(this.xid);

先在supend()方法里面执行了 XA end xid 语句, 接下来执行 this.xaresource.rollback(this.xid); 进行数据的回滚。

Atomikos-recover 流程

说事务恢复流程之前,我们来讨论下,会啥会出现事务恢复?XA二阶段提交协议不是强一致性的吗?要解答这个问题,我们就要来看看XA二阶段协议有什么问题?

问题一 :单点故障

由于协调者的重要性,一旦协调者TM发生故障。参与者RM会一直阻塞下去。尤其在第二阶段,协调者发生故障,那么所有的参与者还都处于锁定事务资源的状态中,而无法继续完成事务操作。(如果是协调者挂掉,可以重新选举一个协调者,但是无法解决因为协调者宕机导致的参与者处于阻塞状态的问题)

问题二 :数据不一致

数据不一致。在二阶段提交的阶段二中,当协调者向参与者发送commit请求之后,发生了局部网络异常或者在发送commit请求过程中协调者发生了故障,这回导致只有一部分参与者接受到了commit请求。而在这部分参与者接到commit请求之后就会执行commit操作。但是其他部分未接到commit请求的机器则无法执行事务提交。于是整个分布式系统便出现了数据不一致性的现象。

如何解决?

解决的方案简单,就是我们在事务的操作的每一步,我们都需要对事务状态的日志进行人为的记录,我们可以把日志记录存储在我们想存储的地方,可以是本地存储,也可以中心化的存储。atomikos的开源版本,我们之前也分析了,它是使用内存 + file的方式,存储在本地,这样的话,如果在一个集群系统里面,如果有节点宕机,日志又存储在本地,所以事务不能及时的恢复(需要重启服务)。

Atomikos 多场景下事务恢复。

Atomikos 提供了二种方式,来应对不同场景下的异常情况。

  • 场景一: 服务节点不宕机,因为其他的原因,产生需要事务恢复的情况。 这个时候才要定时任务进行恢复。

具体的代码 com.atomikos.icatch.imp.TransactionServiceImp.init() 方法,会初始化一个定时任务,进行事务的恢复。

public synchronized void init ( Properties properties ) throws SysException
    {
        shutdownInProgress_ = false;
        control_ = new com.atomikos.icatch.admin.imp.LogControlImp ( (AdminLog) this.recoveryLog );
        ConfigProperties configProperties = new ConfigProperties(properties);
        long recoveryDelay = configProperties.getRecoveryDelay();
        recoveryTimer = new PooledAlarmTimer(recoveryDelay);
        recoveryTimer.addAlarmTimerListener(new AlarmTimerListener() {
            @Override
            public void alarm(AlarmTimer timer) {
                //进行事务恢复
                performRecovery();

            }
        });

        TaskManager.SINGLETON.executeTask(recoveryTimer);
        initialized_ = true;
    }
  • 最终会进入com.atomikos.datasource.xa.XATransactionalResource.recover() 方法。
   public void recover() {
        XaResourceRecoveryManager xaResourceRecoveryManager = XaResourceRecoveryManager.getInstance();
        if (xaResourceRecoveryManager != null) { //null for LogCloud recovery
            try {
                xaResourceRecoveryManager.recover(getXAResource());
            } catch (Exception e) {
                refreshXAResource(); //cf case 156968
            }

        }
    }
  • 场景二: 当服务节点宕机重启动过程中进行事务的恢复。具体实现在com.atomikos.datasource.xa.XATransactionalResource.setRecoveryService()方法里面
 @Override
    public void setRecoveryService ( RecoveryService recoveryService )
            throws ResourceException
    {

        if ( recoveryService != null ) {
            if ( LOGGER.isTraceEnabled() ) LOGGER.logTrace ( "Installing recovery service on resource "
                    + getName () );
            this.branchIdentifier=recoveryService.getName();
         //进行事务恢复
            recover();
        }

    }

com.atomikos.datasource.xa.XATransactionalResource.recover() 流程详解。

主代码:

    public void recover(XAResource xaResource) throws XAException {
      // 根据XA recovery 协议获取 xid
        List<XID> xidsToRecover = retrievePreparedXidsFromXaResource(xaResource);
        Collection<XID> xidsToCommit;
        try {
            // xid 与日志记录的xid进行匹配
            xidsToCommit = retrieveExpiredCommittingXidsFromLog();
            for (XID xid : xidsToRecover) {
                if (xidsToCommit.contains(xid)) {
            //执行 XA commit xid 进行提交
                    replayCommit(xid, xaResource);
                } else {
                    attemptPresumedAbort(xid, xaResource);
                }
            }
        } catch (LogException couldNotRetrieveCommittingXids) {
            LOGGER.logWarning("Transient error while recovering - will retry later...", couldNotRetrieveCommittingXids);
        }
    }
  • 我们来看一下如何根据 XA recovery 协议获取RM端存储的xid。 进入方法 retrievePreparedXidsFromXaResource(xaResource), 最后进入 com.atomikos.datasource.xa.RecoveryScan.recoverXids()方法。
public static List<XID> recoverXids(XAResource xaResource, XidSelector selector) throws XAException {
        List<XID> ret = new ArrayList<XID>();

        boolean done = false;
        int flags = XAResource.TMSTARTRSCAN;
        Xid[] xidsFromLastScan = null;
        List<XID> allRecoveredXidsSoFar = new ArrayList<XID>();
        do {
            xidsFromLastScan = xaResource.recover(flags);
            flags = XAResource.TMNOFLAGS;
            done = (xidsFromLastScan == null || xidsFromLastScan.length == 0);
            if (!done) {
                // TEMPTATIVELY SET done TO TRUE
                // TO TOLERATE ORACLE 8.1.7 INFINITE
                // LOOP (ALWAYS RETURNS SAME RECOVER
                // SET). IF A NEW SET OF XIDS IS RETURNED
                // THEN done WILL BE RESET TO FALSE
                done = true;
                for ( int i = 0; i < xidsFromLastScan.length; i++ ) {
                    XID xid = new XID ( xidsFromLastScan[i] );
                    // our own XID implements equals and hashCode properly
                    if (!allRecoveredXidsSoFar.contains(xid)) {
                        // a new xid is returned -> we can not be in a recovery loop -> go on
                        allRecoveredXidsSoFar.add(xid);
                        done = false;
                        if (selector.selects(xid)) {
                            ret.add(xid);
                        }
                    }
                }
            }
        } while (!done);

        return ret;
    }
  • 我们重点关注xidsFromLastScan = xaResource.recover(flags); 这个方法,如果我们使用MySQL,那么久会进入 MysqlXAConnection.recover()方法。执行 XA recovery xid 语句来获取 xid。
 protected static Xid[] recover(Connection c, int flag) throws XAException {
        /*
         * The XA RECOVER statement returns information for those XA transactions on the MySQL server that are in the PREPARED state. (See Section 13.4.7.2, ???XA
         * Transaction States???.) The output includes a row for each such XA transaction on the server, regardless of which client started it.
         *
         * XA RECOVER output rows look like this (for an example xid value consisting of the parts 'abc', 'def', and 7):
         *
         * mysql> XA RECOVER;
         * +----------+--------------+--------------+--------+
         * | formatID | gtrid_length | bqual_length | data |
         * +----------+--------------+--------------+--------+
         * | 7 | 3 | 3 | abcdef |
         * +----------+--------------+--------------+--------+
         *
         * The output columns have the following meanings:
         *
         * formatID is the formatID part of the transaction xid
         * gtrid_length is the length in bytes of the gtrid part of the xid
         * bqual_length is the length in bytes of the bqual part of the xid
         * data is the concatenation of the gtrid and bqual parts of the xid
         */

        boolean startRscan = ((flag & TMSTARTRSCAN) > 0);
        boolean endRscan = ((flag & TMENDRSCAN) > 0);

        if (!startRscan && !endRscan && flag != TMNOFLAGS) {
            throw new MysqlXAException(XAException.XAER_INVAL, Messages.getString("MysqlXAConnection.001"), null);
        }

        //
        // We return all recovered XIDs at once, so if not  TMSTARTRSCAN, return no new XIDs
        //
        // We don't attempt to maintain state to check for TMNOFLAGS "outside" of a scan
        //

        if (!startRscan) {
            return new Xid[0];
        }

        ResultSet rs = null;
        Statement stmt = null;

        List<MysqlXid> recoveredXidList = new ArrayList<MysqlXid>();

        try {
            // TODO: Cache this for lifetime of XAConnection
            stmt = c.createStatement();

            rs = stmt.executeQuery("XA RECOVER");

            while (rs.next()) {
                final int formatId = rs.getInt(1);
                int gtridLength = rs.getInt(2);
                int bqualLength = rs.getInt(3);
                byte[] gtridAndBqual = rs.getBytes(4);

                final byte[] gtrid = new byte[gtridLength];
                final byte[] bqual = new byte[bqualLength];

                if (gtridAndBqual.length != (gtridLength + bqualLength)) {
                    throw new MysqlXAException(XAException.XA_RBPROTO, Messages.getString("MysqlXAConnection.002"), null);
                }

                System.arraycopy(gtridAndBqual, 0, gtrid, 0, gtridLength);
                System.arraycopy(gtridAndBqual, gtridLength, bqual, 0, bqualLength);

                recoveredXidList.add(new MysqlXid(gtrid, bqual, formatId));
            }
        } catch (SQLException sqlEx) {
            throw mapXAExceptionFromSQLException(sqlEx);
        } finally {
            if (rs != null) {
                try {
                    rs.close();
                } catch (SQLException sqlEx) {
                    throw mapXAExceptionFromSQLException(sqlEx);
                }
            }

            if (stmt != null) {
                try {
                    stmt.close();
                } catch (SQLException sqlEx) {
                    throw mapXAExceptionFromSQLException(sqlEx);
                }
            }
        }

        int numXids = recoveredXidList.size();

        Xid[] asXids = new Xid[numXids];
        Object[] asObjects = recoveredXidList.toArray();

        for (int i = 0; i < numXids; i++) {
            asXids[i] = (Xid) asObjects[i];
        }

        return asXids;
    }
  • 这里要注意如果Mysql的版本 <5.7.7 ,则不会有任何数据,在以后的版本中Mysql进行了修复,因此如果我们想要使用MySQL充当RM,版本必须 >= 5.7.7 ,原因是:
MySQL 5.6版本在客户端退出的时候,自动把已经prepare的事务回滚了,那么MySQL为什么要这样做?这主要取决于MySQL的内部实现,MySQL 5.7以前的版本,对于prepare的事务,MySQL是不会记录binlog的(官方说是减少fsync,起到了优化的作用)。只有当分布式事务提交的时候才会把前面的操作写入binlog信息,所以对于binlog来说,分布式事务与普通的事务没有区别,而prepare以前的操作信息都保存在连接的IO_CACHE中,如果这个时候客户端退出了,以前的binlog信息都会被丢失,再次重连后允许提交的话,会造成Binlog丢失,从而造成主从数据的不一致,所以官方在客户端退出的时候直接把已经prepare的事务都回滚了!
  • 回到主线再从自己记录的事务日志里面获取XID
  Collection<XID> xidsToCommit = retrieveExpiredCommittingXidsFromLog();
  • 我们来看下获取事务日志里面的XID的retrieveExpiredCommittingXidsFromLog()方法。 然后进入com.atomikos.recovery.imp.RecoveryLogImp.getCommittingParticipants()方法。
public Collection<ParticipantLogEntry> getCommittingParticipants()
            throws LogReadException {
        Collection<ParticipantLogEntry> committingParticipants = new HashSet<ParticipantLogEntry>();
        Collection<CoordinatorLogEntry> committingCoordinatorLogEntries = repository.findAllCommittingCoordinatorLogEntries();

        for (CoordinatorLogEntry coordinatorLogEntry : committingCoordinatorLogEntries) {
            for (ParticipantLogEntry participantLogEntry : coordinatorLogEntry.participants) {
                committingParticipants.add(participantLogEntry);
            }
        }
        return committingParticipants;
    }

到这里我们来简单介绍一下,事务日志的存储结构。首先是 CoordinatorLogEntry,这是一次XA事务的所有信息实体类。

public class CoordinatorLogEntry implements Serializable {

  //全局事务id
     public final String id;

   //是否已经提交
    public final boolean wasCommitted;

    /**
     * Only for subtransactions, null otherwise.
     */
    public final String superiorCoordinatorId;

   //参与者集合
    public final ParticipantLogEntry[] participants;
}
  • 再来看一下参与者实体类 ParticipantLogEntry :
public class ParticipantLogEntry implements Serializable {

    private static final long serialVersionUID = 1728296701394899871L;

    /**
     * The ID of the global transaction as known by the transaction core.
     */

    public final String coordinatorId;

    /**
     * Identifies the participant within the global transaction.
     */

    public final String uri;

    /**
     * When does this participant expire (expressed in millis since Jan 1, 1970)?
     */

    public final long expires;

    /**
     * Best-known state of the participant.
     */
    public final TxState state;

    /**
     * For diagnostic purposes, null if not relevant.
     */
    public final String resourceName;
}
  • 回到com.atomikos.recovery.xa.DefaultXaRecoveryLog.getExpiredCommittingXids() 方法,可以到获取了一次XA事务过程中,存储的事务日志中的xid。
public Set<XID> getExpiredCommittingXids() throws LogReadException {
        Set<XID> ret = new HashSet<XID>();
        Collection<ParticipantLogEntry> entries = log.getCommittingParticipants();
        for (ParticipantLogEntry entry : entries) {
            if (expired(entry) && !http(entry)) {
                XID xid = new XID(entry.coordinatorId, entry.uri);
                ret.add(xid);
            }
        }
        return ret;
    }
  • 如果从RM中通过XA recovery取出的XID,包含在从事务日志中取出的XID,则进行commit,否则进行rollback.
List<XID> xidsToRecover = retrievePreparedXidsFromXaResource(xaResource);
        Collection<XID> xidsToCommit;
        try {
            xidsToCommit = retrieveExpiredCommittingXidsFromLog();
            for (XID xid : xidsToRecover) {
                if (xidsToCommit.contains(xid)) {
                    replayCommit(xid, xaResource);
                } else {
                    attemptPresumedAbort(xid, xaResource);
                }
            }
        } catch (LogException couldNotRetrieveCommittingXids) {
            LOGGER.logWarning("Transient error while recovering - will retry later...", couldNotRetrieveCommittingXids);
        }
  • replayCommit 方法如下:
private void replayCommit(XID xid, XAResource xaResource) {
        if (LOGGER.isDebugEnabled()) LOGGER.logDebug("Replaying commit of xid: " + xid);
        try {
      //进行事务提交
            xaResource.commit(xid, false);
     //更新事务日志
            log.terminated(xid);
        } catch (XAException e) {
            if (alreadyHeuristicallyTerminatedByResource(e)) {
                handleHeuristicTerminationByResource(xid, xaResource, e, true);
            } else if (xidTerminatedInResourceByConcurrentCommit(e)) {
                log.terminated(xid);
            } else {
                LOGGER.logWarning("Transient error while replaying commit - will retry later...", e);
            }
        }
    }
  • attemptPresumedAbort(xid, xaResource); 方法如下:
private void attemptPresumedAbort(XID xid, XAResource xaResource) {
        try {
            log.presumedAborting(xid);
            if (LOGGER.isDebugEnabled()) LOGGER.logDebug("Presumed abort of xid: " + xid);
            try {
         //进行回滚
                xaResource.rollback(xid);
        //更新日志状态
                log.terminated(xid);
            } catch (XAException e) {
                if (alreadyHeuristicallyTerminatedByResource(e)) {
                    handleHeuristicTerminationByResource(xid, xaResource, e, false);
                } else if (xidTerminatedInResourceByConcurrentRollback(e)) {
                    log.terminated(xid);
                } else {
                    LOGGER.logWarning("Unexpected exception during recovery - ignoring to retry later...", e);
                }
            }
        } catch (IllegalStateException presumedAbortNotAllowedInCurrentLogState) {
            // ignore to retry later if necessary
        } catch (LogException logWriteException) {
            LOGGER.logWarning("log write failed for Xid: "+xid+", ignoring to retry later", logWriteException);
        }
    }

总结

文章到此,已经写的很长很多了,我们分析了ShardingSphere对于XA方案,提供了一套SPI解决方案,对Atomikos进行了整合,也分析了Atomikos初始化流程,开始事务流程,获取连接流程,提交事务流程,回滚事务流程,事务恢复流程。希望对大家理解XA的原理有所帮助。

加入我们

Apache ShardingSphere 一直践行Apache Way的开源之道,社区完全开放与平等,人人享受开源带来的快乐。

地址: https://github.com/apache/sha...

作者介绍: 肖宇,Apache ShardingSphere Committer,开源hmily分布式事务框架作者,
开源soul网关作者,热爱开源,追求写优雅代码。目前就职入京东数科,参与ShardingSphere的开源建设,以及分布式数据库的研发工作。

查看原文

赞 0 收藏 0 评论 0

ShardingSphere 发布了文章 · 11月13日

Apache ShardingSphere 5.0.0-alpha版本发布

本期看点

Apache ShardingSphere是一套开源的分布式数据库中间件解决方案组成的生态圈,它由Sharding-JDBC、Sharding-Proxy和Sharding-Sidecar(规划中)这3款相互独立的产品组成。他们均提供标准化的数据分片、分布式事务、数据迁移、数据库治理和管控界面功能,可适用于如Java同构、异构语言、容器、云原生等各种多样化的应用场景。

本次版本发布距离上次 4.1.1 的发布已有5个月有余,在这期间 Apache ShardingSphere不断的打磨优化,修复社区反馈的问题,加强功能和开发新特性。

在这里我们很高兴的宣布 5.0.0-alpha 的发布!大家的等待是值得的。5.x 是 Apache ShardingSphere从分库分表中间件向分布式数据库生态转化的里程碑,从 4.x 版本后期开始打磨的可插拔架构在 5.x 版本已逐渐成型,项目的设计理念和 API 都进行了大幅提升。欢迎大家测试使用!

5.0.0-alpha 具体版本发布信息如下:

A.
新特性

1.可插拔架构全面上线,支持开发者通过SPI机制扩展功能。
(扩展点请参见开发者手册:https://shardingsphere.apache...
2.提供独立SQL解析功能,用于解析多数据库方言。
3.提供RDL(Rule Definition Language)语句,支持使用SQL在线创建分片规则。
4.新增影子数据库功能。

B.
编译 & 依赖

1.升级JDK的最低支持版本至Java8。
2.更新Google Guava库到29.0-jre版本。
3.更新Zookeeper 至 3.6.x 版本,并更新curator至5.1.0版本。

C.
API 变更

1.全新分片/数据加密/影子库/主从规则配置API。
2.全新分片策略及分片算法API。
3.全新弹性迁移创建任务的API。
4.删除DefaultDataSourceName配置项,由ShardingSphere托管所有数据源。
5.属性配置项分隔符由点‘.’修改为减号‘-’。
6.参数allow.range.query.with.inline.sharding由全局参数调整至分片算法参数。

D.
重构

1.依据数据库方言,重构解析模块域模型对象。
2.使用SPI机制重构元数据在线变更处理。
3.Orchestration模块重名为Governance模块。
4.MasterSlave模块重名为QueryReplica模块。
5.重构Governance注册中心中的元数据结构。
6.ShardingSphere UI合并配置中心和注册中心显示布局。

E.
增强

1.MySQL SQL 和 PostgreSQL语法定义及解析支持增强。
2.增强对各方言数据库子查询的支持度。
3.支持对非分片表使用MySQL视图操作。
4.ShardingSphere Proxy支持对非分片表使用MySQL存储函数、存储过程操作。
5.支持使用SQLServer Top语法。
6.优化接入端metadata加载方式,提高启动速度。
7.优化批量插入性能。
8.接入端支持使用Oracle RAC连接串。
9.XA事务管理器增加对Oracle数据库的支持。
10.ShardingSphere Proxy支持使用p6sy驱动。
11.迁移工具支持断点续传功能。
12.迁移工具支持使用ShardingSphere JDBC迁移数据至新集群。

F.
漏洞修复

1.修复处理OrderBy条件时,别名改写错误问题。
2.修复MySQL Insert语句包含表达式时,SQL改写错误问题。
3.修复Update on duplicate SQL中参数计算错误问题。
4.修复批量插入时,generatedKeys获取错误的问题。
5.修复DML语句更新操作多表校验异常问题。
6.修复表不存在时执行SQL导致NPE问题。
7.修复对不在配置规则中的表使用Show table命令的报错问题。
8.修复Oracle数据库在多用户场景下元数据加载错误问题。
9.修复不能在线启用从库节点问题。
10.修复ShardingSphere JDBC不支持PostgreSQL数组类型问题。
11.修复ShardingSphere Proxy在查询超长blob数据时无响应问题。

G.
变更日志 MILESTONE

https://github.com/apache/sha...

H.
ShardingSphere-UI

1.合并配置中心和注册中心。
2.支持配置etcd注册中心。
3.支持查看metadata。
4.支持动态删除schema。

社区建设

Apache ShardingSphere 在社区建设方面也取得了较大的成功,ShardingSphere 自开源以来一直备受关注,在GitHub上一直保持着稳定的增长趋势,截止目前已经突破12k+ stars。
并且已登记的使用公司/组织140+且覆盖了各行各业,如互联网金融,物流,在线教育,企业服务,甚至还包括政府机关。未声明的公司不计其数,无法准确统计。感谢大家对社区建设的支持。
此外,Apache ShardingSphere自开源以来共产生164位 Contributor,29位 Committer(含17位PMC),在此也要感谢他们对社区的贡献。

点击查看Contributor列表:https://github.com/apache/inc...

欢迎更多的用户与使用公司查看与登记:https://shardingsphere.apache...


Apache ShardingSphere不断践行Apache Way,致力于打造充满活力、规范、互助的社区!开源路上,我们欢迎你的加入。

项目地址:
https://github.com/apache/sha...
https://github.com/apache/sha...

更多信息请浏览官网:
https://shardingsphere.apache...

扫二维码|关注我们
image

查看原文

赞 0 收藏 0 评论 0

ShardingSphere 发布了文章 · 11月13日

好消息!Elastic Job 3.0.0-beta 版本正式发布

本期看点

本周Apache ShardingSphere团队很高兴的向大家宣布:Apache
ShardingSphere ElasticJob-3.0.0-beta和ShardingSphere ElasticJob UI-3.0.0-beta的新版本正式发布了!

ElasticJob
是一个分布式调度解决方案,提供分布式任务的分片,弹性伸缩,全自动发现,基于时间驱动、数据驱动、常驻任务和临时任务的多任务类型,任务聚合和动态调配资源,故障检测、自动修复,失效转移和重试,完善的运维平台和管理工具,以及对云原生的良好支持等功能特性,可以全面满足企业对于任务管理和批量作业的全面调度处理能力。

ElasticJob于2020年5月28日成为Apache ShardingSphere子项目。

本次发布版本经过不断的打磨优化
做出的调整如下,欢迎大家测试和使用:

01

ElasticJob-3.0.0-beta

API 变更
API changes

  1. 重构作业监听器配置。
  2. 重构作业错误器程序配置。
  3. 重构作业跟踪配置。

新特性
New Feature

  1. 支持HTTP类型作业。
  2. 从内核模块中删除Spring Boot依赖项。
  3. 作业执行错误时支持电子邮件、微信、钉钉的通知机制。

Bug修复
Bug fixes

  1. 修复One-Off Job无法分布式执行的问题。
  2. 修复使用PostgreSQL作为事件跟踪存储时,会重复创建已有的表导致事件跟踪不可用的问题。
  3. 修复重新分片标志设置不正确时可能产生的死锁问题。

02

ElasticJob UI-3.0.0-beta

增强
Enhancement

  1. 在缺少JDBC驱动程序时提供更明确的报错信息。
  2. 支持作业自定义属性的增删改查管控。
  3. 在作业运行历史记录页面中,增加作业名称和服务器IP显示项。
  4. 升级Dockerfile。

GitHub地址:
https://github.com/apache/sha...
https://github.com/apache/sha...

官方网站:
https://shardingsphere.apache...

下载链接:
https://shardingsphere.apache...

扫码关注我们
image

查看原文

赞 0 收藏 0 评论 0

ShardingSphere 发布了文章 · 11月13日

ElasticJob后续设计规划

本文概览
产品定位
架构设计
ElasticJob-Lite 和 ElasticJob-Cloud 调整
模块规划

· 任务触发
· 资源治理
· 任务治理
· 产品形态

关于社区

1.产品定位

ElasticJob 目前是基于定时任务的分片调度中间件,在 ElasticJob-Cloud 中增加了资源治理的能力。
未来的 ElasticJob 希望将功能划分为独立的三个部分:任务触发、资源管理、任务治理。

任务触发是必选模块,其他两个模块是可选的。只有任务触发,相当于降级为 QuartZ。
任务触发 + 任务治理可以理解为 ElasticJob 的现状。在任务触发的同时,增加分布式治理和任务分片的能力,未来的基于有向无环图的任务编排也属于任务治理模块。
任务触发 + 资源治理可以理解为类似于操作系统的任务调度机制。在任务执行时增加资源的管控,在资源不足的情况下将任务排队,资源管控可以对接 Kubernetes 和 Apache Mesos。目前的 ElasticJob-Cloud 则是任务触发 + Mesos 资源治理实现方式。

任务触发 + 任务治理 + 资源治理是 ElasticJob 未来的全部能力,而 ElasticJob-Lite 和 ElasticJob-Cloud 则仅仅是部署形态不同。

2.架构设计

ElasticJob 希望采用可插拔架构设计,将功能模块通过 SPI 动态织入现有架构体系。针对于当前设计的三大功能模块,其可插拔设计分别是:

任务触发:目前是基于 CRON 表达式和一次性调度两种触发形式。希望调整为调度 SPI + 调度实现(CRON,One-Off,其他...)。
资源治理:目前没有独立的资源治理抽象层。希望未来在增加资源治理 SPI 的同时,增加基于Apache Mesos、Kubernetes 和 NoDep 的资源治理实现模块。
任务治理:目前只有任务分片和高可用治理两部分。希望未来提供任务治理 SPI,并将分片和高可用作为其实现模块,并在此基础上增加 DAG 治理能力。任务治理的各个模块相互隔离且可叠加。

  1. ElasticJob-Lite
    和 ElasticJob-Cloud 调整

ElasticJob-Lite 和 ElasticJob-Cloud 调整为仅仅是部署形态不同。
ElasticJob-Lite 采用无中心化的 jar 部署形态,提供 spring 等框架的接入层,适用于轻量级应用。
ElasticJob-Cloud 采用中心化的 server 部署形态,适用于集中化的作业云管平台。

  1. 模块规划

任务触发:

Trigger API
Trigger SPI
Trigger Kernel
CRON Trigger
One-Off Trigger
Customized Trigger

资源治理:

Resource Management API
Resource Management SPI
Resource Management Kernel
NoDep Resource Management
Mesos Resource Management
Kubernetes Resource Management

任务治理:

Job Governance API
Job Governance SPI
Job Governance Kernel
Sharding Job
HA Job
DAG Job

产品形态:

ElasticJob-Lite 架构调整
ElasticJob-Cloud 架构调整
ElasticJob-Cloud 剥离 Mesos

关于社区
欢迎开源爱好者加入 ElasticJob 社区的建设:

GitHub 地址:https://github.com/apache/sha...

官方网站:http://shardingsphere.apache....

长按二维码加入社区
image

查看原文

赞 0 收藏 0 评论 0

ShardingSphere 发布了文章 · 10月20日

Apache ShardingSphere荣获可信开源项目奖项

编辑:齐洋
图片来源见logo

2020年10月16日,由中国信息通信研究院主办,云计算标准和开源推进委员会承办的“2020云计算开源产业大会”在北京线下和网络直播的两种方式盛大开幕,在本次大会上正式发布了2020年可信开源评估结果,Apache ShardingSphere 荣获OSCAR可信开源项目的奖项!

1.png

OSCAR开源评估是中国信通院针对开源现状启动的评估工作,旨在为企业构建开源治理体系时提供可信参考。这一奖项意味着Apache ShardingSphere在许可证合规性,软件安全性,软件活跃度,技术成熟度,服务支持力和软件兼容性六个方面均获得了国家级认可。

4.png

ShardingSphere自2018年11月加入Apache基金会孵化器以来至今已有23个月的旅程,在这段时间里ShardingSphere不仅在项目建设上获得较大成功,同时还经营出了活跃的社区。目前ShardingSphere在github上已经拥有12k+ star,同时获得了140+的登记使用公司以及来自不同领域的committer进行代码贡献。

3.png


开源不易,我们从未停下过脚步!

在此要感谢所有关注Apache ShardingSphere和积极在社区贡献代码的朋友们!

项目地址:

https://github.com/sharding-sphere/sharding-sphere/

更多信息请浏览官网:

http://shardingsphere.io/

如果你想加入社区群,请扫描下方社区经理二维码。
查看原文

赞 0 收藏 0 评论 0

ShardingSphere 发布了文章 · 8月11日

停滞数年后,ElasticJob 携首个 Apache 版本 3.0.0-alpha 回归!

在成为 Apache ShardingSphere 的子项目的几个月时间里,ElasticJob 社区在修复与合并了535个 issue 和 pull request 之后,发布了加入 Apache 软件基金会后的第一个正式版本:3.0.0-alpha。

背景

ElasticJob(https://github.com/apache/sha...)是面向互联网生态和海量任务的分布式调度解决方案,由两个相互独立的子项目 ElasticJob-Lite 和 ElasticJob-Cloud 组成。它诞生于 2015年,当时业界虽然有 QuartZ 等出类拔萃的定时任务框架,但缺乏分布式方面的探索。分布式调度云平台产品的缺失,使得 ElasticJob 从出现伊始便备受关注。它有效地弥补了作业在分布式领域的短板,并且提供了一站式的自动化运维管控端,各个产品使用统一的作业 API,开发者仅需一次开发,即可随意部署。

ElasticJob 在技术选型时,选择站在了巨人的肩膀上而不是重复制造轮子的理念,将定时任务事实标准的 QuartZ 与 分布式协调的利器 ZooKeeper 完美结合,快速而稳定的搭建了全新概念的分布式调度框架。

ElasticJob调度模型

ElasticJob 的调度模型划分为支持线程级别调度的进程内调度 ElasticJob-Lite,和进程级别调度的ElasticJob-Cloud。

进程内调度

ElasticJob-Lite 是面向进程内的线程级调度框架。它能够与 Spring 、Dubbo等 Java 框架配合使用,在作业中可自由使用 Spring 注入的 Bean,如数据源连接池、Dubbo 远程服务等,更加方便地贴合业务开发。

ElasticJob-Lite与业务应用部署在一起,其生命周期与业务应用保持一致,是典型的嵌入式轻量级架构。ElasticJob-Lite 非常适合于资源使用稳定、部署架构简单的普通 Java 应用,可以理解为 Java 开发框架。

ElasticJob-Lite 本身是无中心化架构,无需独立的中心化调度节点,分布式下的每个任务节点均是以自调度的方式适时的调度作业。任务之间只需要一个注册中心来对分布式场景下的任务状态进行协调即可,目前支持 ZooKeeper 作为注册中心。

架构图如下:

通过图中可看出,ElasticJob-Lite 的分布式作业节点通过选举获取主节点,并通过主节点进行分片。分片完毕后,主节点与从节点并无二致,均以自我调度的方式执行任务。

进程级调度

ElasticJob-Cloud 拥有进程内调度和进程级别调度两种方式。由于 ElasticJob-Cloud 能够对作业服务器的资源进行控制,因此其作业类型可划分为常驻任务和瞬时任务。常驻任务类似于ElasticJob-Lite,是进程内调度;瞬时任务则完全不同,它充分的利用了资源分配的削峰填谷能力,是进程级的调度,每次任务的会启动全新的进程处理。

ElasticJob-Cloud 需要通过 Mesos 对资源进行控制,并且通过部署在 Mesos Master的调度器进行任务和资源的分配。Cloud采用中心化架构,将调度中心的高可用交由 Mesos管理。

它的架构图如下:

通过图中可看出,ElasticJob-Cloud 除了拥有 Lite 的全部能力之外,还拥有资源分配和任务分发的能力。它将作业的开发、打包、分发、调度、治理、分片等一些列的生命周期完全托管,是真正的作业云调度系统。

相比于 ElasticJob-Lite 的简单易用,ElasticJob-Cloud 对 Mesos 的强依赖增加了系统部署的复杂度,因此更加适合大规模的作业系统。

功能列表

ElasticJob 功能主要有弹性调度、资源分配、作业治理和可视化管控。

弹性调度

弹性调度是 ElasticJob 最重要的功能,也是这款产品名称的由来。它是一款能够让任务通过分片进行水平扩展的任务处理系统。

ElasticJob 中任务分片项的概念,使得任务可以在分布式的环境下运行,每台任务服务器只运行分配给该服务器的分片。随着服务器的增加或宕机,ElasticJob 会近乎实时的感知服务器数量的变更,从而重新为分布式的任务服务器分配更加合理的任务分片项,使得任务可以随着资源的增加而提升效率。

资源分配

调度是指在适合的时间将适合的资源分配给任务,并使其生效。ElasticJob 具备资源分配的能力,它能够像分布式的操作系统一样调度任务。资源分配是借由 Mesos 实现的,由 Mesos 负责分配任务声明的所需资源(CPU 和内存),并将分配出去的资源进行隔离。ElasticJob 在获取到资源之后才会执行任务。

考虑到 Mesos 系统部署相对复杂,因此 ElasticJob 将这部分拆分至 ElasticJob cloud 部分,供高级用户使用。随着 Kubernetes 的强劲发展,ElasticJob 未来也会完成 Cloud 部分与它的对接。

作业治理

作业在分布式场景下的高可用、失效转移、错过作业重新执行等行为的治理和协调。

可视化管控端

主要包括作业声明周期管控、执行历史记录查询、配置中心管理等。

3.0.0-alpha 功能先睹为快

构建 & 依赖

1. 升级至 Java 8

2. 升级最低支持的  ZooKeeper 版本 至 3.6.x

API 变更

1. 将 Maven 坐标的 groupId 变更为org.apache.shardingsphere.elasticjob

2. 将包名称变更为org.apache.shardingsphere.elasticjob

3. 将 Spring 命名空间名称变更为 http://shardingsphere.apache....

4. 全新的作业 API,可使用 SPI 自定制作业类型

5. 使用 SPI 引用配置策略,如任务分片、线程池使用和错误处理等策略

6. 将控制台代码从作业核心模块中分离

新功能

1. 调度器多元化,增加一次性任务调度器

2. 提供ElasticJob-Lite 项目的 官方 Spring Boot Starter

3. 支持使用多种数据库类型存储作业历史轨迹数据

4. 允许用户通过环境变量指定适合的 IP 地址

5. 全新的控制台界面

3.x版本设计解读

通过Release Notes能够看出,ElasticJob 3.x 并非 2.x 的修补版本,而是通过革新的设计理念践行的一套新产品。

ElasticJob 3.x 最直观的变化是将原有的个位数的模块数量拆分为数十个职责清理的微模块。

新版本的关键词是微内核、可扩展和生态对接。

微内核

ElasticJob 3.x 抽象了 API 和基础设施模块,并且将注册中心、历史执行轨迹、控制台、作业执行器、Lite和Cloud等模块全数分离。

内核模块高度可扩展,但不依赖于可扩展模块本身的实现。它继承了 ElasticJob 之前的能力,在继续为开发者提供分布式服务的工具包的同时,向开发者开放可自由定制化扩展的脚手架。

可扩展

ElasticJob 3.x在微内核的基础上定义了丰富的可扩展接口,包括作业类型、配置策略、历史执行轨迹存储端以及将要做的注册中心存储端等可扩展接口。

开发者可以在不修改 ElasticJob 源码的情况下织入定制化功能,真正做到对修改关闭,对扩展开放。

### 生态对接

ElasticJob 3.x 提供了官方的Spring Boot Starter,并已经着手开发基于 Apache SkyWalking 的自动探针,使其能更加便捷的融入现有的技术体系。

另外,从Release Notes中可以解读到的是,ElasticJob 3.x 并未对 Cloud进行大幅更新,其主要改动均集中在内核以及Lite模块。

对于部署复杂且逐渐不再流行的 Mesos,ElasticJob 的 3.x 将渐渐弱化对它的依赖,并计划在未来提供更加泛化资源隔离 API,使 Cloud 产品线可对接Mesos,Kubernetes 甚至无依赖的独立部署使用。

3.0.0-beta 功能预告

在调整完项目和包结构之后,ElasticJob 3.0.0-beta版本将工作重点放在新功能开发和操作 API 标准化这两个方面。

新功能预告

  • 作业依赖

支持基于有向无环图(DAG)的作业依赖。依赖包含基于作业整体维度的依赖,以及基于作业分片项的依赖,打造更加灵活的作业治理解决方案。

  • HTTP 作业类型

支持HTTP作业类型,在Script 之外提供另外的跨语言作业类型。

操作 API 标准化

  • 统一提供基于 RESTful 操作API接口
  • 简化基于 SDK 操作API接口

未来规划

未来,ElasticJob 将大刀阔斧的向前迈进,主要的规划如下:

  • 调度执行分离

将调度器和执行器完全分离。调度器可以与执行器一起部署,即为 ElasticJob lite 的无中心化轻量级版本;调度器可以与执行器分离部署,即为 ElasticJob cloud 的资源管控的一站式分布式调度系统。

  • 更加易用的云管产品

将目前仅支持 Mesos 的 ElasticJob cloud 打造为支持 Mesos 和 Kubernetes 的作业云管平台,并提供无 Mesos 和 Kubernetes 也能够独立使用的不包含资源管控的纯作业管控平台。

  • 可插拔生态

与 Apache ShardingSphere 一脉相承,ElasticJob 也将提供更加可插拔和模块化架构,为开发者提供基础设施。方便开发者基于 ElasticJob 二次开发,添加各种定制化功能,包括但不限于作业类型(如:大数据作业、HTTP作业等)、注册中心类型(如:Eureka等)、执行轨迹存储介质(如其他数据库类型)等。

ElasticJob 最终会将 Lite 和 Cloud 以更贴近的方式供开发工程师和运维工程师使用,共享其调度、执行和作业库。整体规划如下:

关于 ElasticJob 社区

ElasticJob 社区在之前的几年处于停滞状况,主要原因是作者精力有限,分身乏术。在接收到了作为 Apache ShardingSphere 弹性迁移的调度基础设施的需求之后,本就一脉相承的 ElasticJob 社区决定重启,并且作为 Apache ShardingSphere 的子项目继续发光发热。目前的 ElasticJob 已正式将项目源码迁入 Apache 的 GitHub 仓库,并且在重启的几个月来十分活跃,在GitHub 周和月度趋势排名中榜上有名。

ElasticJob 是Apache ShardingSphere(https://github.com/apache/sha...)的子项目,目标是成为独立的 Apache 顶级项目,以及为 Apache ShardingSphere 的弹性迁移提供数据调度的基石。

作者简介

张亮,京东数科数字技术中心架构专家,Apache ShardingSphere PMC Chair。

热爱开源,擅长以 Java 为主分布式架构,推崇优雅代码。

目前主要精力投入在将分布式数据库中间件 Apache ShardingSphere 打造为业界一流的金融级数据解决方案之上。

Apache ShardingSphere(https://github.com/apache/sha... Apache 软件基金会顶级项目,也是 Apache 软件基金会首个分布式数据库中间件。

曾出版书籍《未来架构——从服务化到云原生》。

GitHub: https://github.com/terrymanu,随时欢迎技术交流和指正。

查看原文

赞 1 收藏 0 评论 0

ShardingSphere 发布了文章 · 8月11日

快讯!Apache ShardingSphere 官方文档 pdf 版已上线

Apache ShardingSphere 的官方文档 pdf 版已上线,实时更新,与官网保持同步,欢迎大家留存。大家需要注意 pdf 首页的生成时间,以保证获取的 pdf 文档没有过期。

地址如下:

中文版

https://shardingsphere.apache..._docs_cn.pdf

英文版

https://shardingsphere.apache..._docs_en.pdf

欢迎精通 pdf 格式优化的同学联系 ShardingSphere 社区,共同把 pdf 文档做的更精美。

pdf 文档主要分为 8 章 - 简介、功能列表、快速入门、概念&功能、用户手册、开发者手册、下载、FAQ。文档理论讲解透彻,使用示例深入,干货满满,希望能够帮助大家更好的学习和实践。与此同时,我们也希望该文档能够成为 handbook 在工程师之间传阅,欢迎大家积极转发和分享!

查看原文

赞 2 收藏 0 评论 1

ShardingSphere 发布了文章 · 7月26日

ShardingSphere 4.x 下载

  • 最新版本
  • 全部版本
  • 校验版本

最新版本

ShardingSphere的发布版包括源码包及其对应的二进制包。由于下载内容分布在镜像服务器上,所以下载后应该进行GPG或SHA-512校验,以此来保证内容没有被篡改。

具体下载地址见:

https://shardingsphere.apache.org/document/legacy/4.x/document/cn/downloads/

全部版本

全部版本请到Archive repository查看。 全部孵化器版本请到Archive incubator repository查看。

校验版本

PGP签名文件

使用PGP或SHA签名验证下载文件的完整性至关重要。可以使用GPG或PGP验证PGP签名。请下载KEYS以及发布的asc签名文件。建议从主发布目录而不是镜像中获取这些文件。

gpg -i KEYS

or

pgpk -a KEYS

or

pgp -ka KEYS

要验证二进制文件或源代码,您可以从主发布目录下载相关的asc文件,并按照以下指南进行操作。

gpg --verify apache-shardingsphere-********.asc apache-shardingsphere-*********

or

pgpv apache-shardingsphere-********.asc

or

pgp apache-shardingsphere-********.asc
查看原文

赞 0 收藏 0 评论 0

ShardingSphere 发布了文章 · 7月26日

ShardingSphere 4.x FAQ常见问题

1. 如果SQL在ShardingSphere中执行不正确,该如何调试?

回答:

在Sharding-Proxy以及Sharding-JDBC 1.5.0版本之后提供了sql.show的配置,可以将解析上下文和改写后的SQL以及最终路由至的数据源的细节信息全部打印至info日志。 sql.show配置默认关闭,如果需要请通过配置开启。

2. 阅读源码时为什么会出现编译错误?

回答:

ShardingSphere使用lombok实现极简代码。关于更多使用和安装细节,请参考lombok官网。

sharding-orchestration-reg模块需要先执行mvn install命令,根据protobuf文件生成gRPC相关的java文件。

3. 使用Spring命名空间时找不到xsd?

回答:

Spring命名空间使用规范并未强制要求将xsd文件部署至公网地址,但考虑到部分用户的需求,我们也将相关xsd文件部署至ShardingSphere官网。

实际上sharding-jdbc-spring-namespace的jar包中META-INF\spring.schemas配置了xsd文件的位置:META-INF\namespace\sharding.xsd和META-INF\namespace\master-slave.xsd,只需确保jar包中该文件存在即可。

4. Cloud not resolve placeholder … in string value …异常的解决方法?

回答:

行表达式标识符可以使用${...}或$->{...},但前者与Spring本身的属性文件占位符冲突,因此在Spring环境中使用行表达式标识符建议使用$->{...}。

5. inline表达式返回结果为何出现浮点数?

回答:

Java的整数相除结果是整数,但是对于inline表达式中的Groovy语法则不同,整数相除结果是浮点数。 想获得除法整数结果需要将A/B改为A.intdiv(B)。

6. 如果只有部分数据库分库分表,是否需要将不分库分表的表也配置在分片规则中?

回答:

是的。因为ShardingSphere是将多个数据源合并为一个统一的逻辑数据源。因此即使不分库分表的部分,不配置分片规则ShardingSphere即无法精确的断定应该路由至哪个数据源。 但是ShardingSphere提供了两种变通的方式,有助于简化配置。

方法1:配置default-data-source,凡是在默认数据源中的表可以无需配置在分片规则中,ShardingSphere将在找不到分片数据源的情况下将表路由至默认数据源。

方法2:将不参与分库分表的数据源独立于ShardingSphere之外,在应用中使用多个数据源分别处理分片和不分片的情况。

7. ShardingSphere除了支持自带的分布式自增主键之外,还能否支持原生的自增主键?

回答:是的,可以支持。但原生自增主键有使用限制,即不能将原生自增主键同时作为分片键使用。

由于ShardingSphere并不知晓数据库的表结构,而原生自增主键是不包含在原始SQL中内的,因此ShardingSphere无法将该字段解析为分片字段。如自增主键非分片键,则无需关注,可正常返回;若自增主键同时作为分片键使用,ShardingSphere无法解析其分片值,导致SQL路由至多张表,从而影响应用的正确性。

而原生自增主键返回的前提条件是INSERT SQL必须最终路由至一张表,因此,面对返回多表的INSERT SQL,自增主键则会返回零。

8. 指定了泛型为Long的SingleKeyTableShardingAlgorithm,遇到ClassCastException: Integer can not cast to Long?

回答:

必须确保数据库表中该字段和分片算法该字段类型一致,如:数据库中该字段类型为int(11),泛型所对应的分片类型应为Integer,如果需要配置为Long类型,请确保数据库中该字段类型为bigint。

9. 使用SQLSever和PostgreSQL时,聚合列不加别名会抛异常?

回答:

SQLServer和PostgreSQL获取不加别名的聚合列会改名。例如,如下SQL:

SELECT SUM(num), SUM(num2) FROM tablexxx;

SQLServer获取到的列为空字符串和(2),PostgreSQL获取到的列为空sum和sum(2)。这将导致ShardingSphere在结果归并时无法找到相应的列而出错。

正确的SQL写法应为:

SELECT SUM(num) AS sum_num, SUM(num2) AS sum_num2 FROM tablexxx;

10. Oracle数据库使用Timestamp类型的Order By语句抛出异常提示“Order by value must implements Comparable”?

回答:

针对上面问题解决方式有两种: 1.配置启动JVM参数“-oracle.jdbc.J2EE13Compliant=true” 2.通过代码在项目初始化时设置System.getProperties().setProperty(“oracle.jdbc.J2EE13Compliant”, “true”);

原因如下:

com.dangdang.ddframe.rdb.sharding.merger.orderby.OrderByValue#getOrderValues()方法如下:

    private List<Comparable<?>> getOrderValues() throws SQLException {
        List<Comparable<?>> result = new ArrayList<>(orderByItems.size());
        for (OrderItem each : orderByItems) {
            Object value = resultSet.getObject(each.getIndex());
            Preconditions.checkState(null == value || value instanceof Comparable, "Order by value must implements Comparable");
            result.add((Comparable<?>) value);
        }
        return result;
    }

使用了resultSet.getObject(int index)方法,针对TimeStamp oracle会根据oracle.jdbc.J2EE13Compliant属性判断返回java.sql.TimeStamp还是自定义oralce.sql.TIMESTAMP 详见ojdbc源码oracle.jdbc.driver.TimestampAccessor#getObject(int var1)方法:

    Object getObject(int var1) throws SQLException {
        Object var2 = null;
        if(this.rowSpaceIndicator == null) {
            DatabaseError.throwSqlException(21);
        }

        if(this.rowSpaceIndicator[this.indicatorIndex + var1] != -1) {
            if(this.externalType != 0) {
                switch(this.externalType) {
                case 93:
                    return this.getTimestamp(var1);
                default:
                    DatabaseError.throwSqlException(4);
                    return null;
                }
            }

            if(this.statement.connection.j2ee13Compliant) {
                var2 = this.getTimestamp(var1);
            } else {
                var2 = this.getTIMESTAMP(var1);
            }
        }

        return var2;
    }

11. 使用Proxool时分库结果不正确?

回答:

使用Proxool配置多个数据源时,应该为每个数据源设置alias,因为Proxool在获取连接时会判断连接池中是否包含已存在的alias,不配置alias会造成每次都只从一个数据源中获取连接。

以下是Proxool源码中ProxoolDataSource类getConnection方法的关键代码:

    if(!ConnectionPoolManager.getInstance().isPoolExists(this.alias)) {
        this.registerPool();
    }

更多关于alias使用方法请参考Proxool官网。

PS:sourceforge网站需要翻墙访问。

12. ShardingSphere提供的默认分布式自增主键策略为什么是不连续的,且尾数大多为偶数?

回答:

ShardingSphere采用snowflake算法作为默认的分布式自增主键策略,用于保证分布式的情况下可以无中心化的生成不重复的自增序列。因此自增主键可以保证递增,但无法保证连续。

而snowflake算法的最后4位是在同一毫秒内的访问递增值。因此,如果毫秒内并发度不高,最后4位为零的几率则很大。因此并发度不高的应用生成偶数主键的几率会更高。

在3.1.0版本中,尾数大多为偶数的问题已彻底解决,参见:https://github.com/sharding-sphere/sharding-sphere/issues/1617

13. Windows环境下,通过Git克隆ShardingSphere源码时为什么提示文件名过长,如何解决?

回答:

为保证源码的可读性,ShardingSphere编码规范要求类、方法和变量的命名要做到顾名思义,避免使用缩写,因此可能导致部分源码文件命名较长。由于Windows版本的Git是使用msys编译的,它使用了旧版本的Windows Api,限制文件名不能超过260个字符。

解决方案如下:

打开cmd.exe(你需要将git添加到环境变量中)并执行下面的命令,可以让git支持长文件名:

git config --global core.longpaths true

如果是Windows 10,还需要通过注册表或组策略,解除操作系统的文件名长度限制(需要重启):

在注册表编辑器中创建HKLM\SYSTEM\CurrentControlSet\Control\FileSystem LongPathsEnabled, 类型为REG_DWORD,并设置为1。 或者从系统菜单点击设置图标,输入“编辑组策略”, 然后在打开的窗口依次进入“计算机管理” > “管理模板” > “系统” > “文件系统”,在右侧双击“启用 win32 长路径”。

参考资料: https://docs.microsoft.com/zh-cn/windows/desktop/FileIO/naming-a-filehttps://ourcodeworld.com/articles/read/109/how-to-solve-filename-too-long-error-in-git-powershell-and-github-application-for-windows

14. Windows环境下,运行Sharding-Proxy,找不到或无法加载主类 org.apache.shardingshpere.shardingproxy.Bootstrap,如何解决?

回答:

某些解压缩工具在解压Sharding-Proxy二进制包时可能将文件名截断,导致找不到某些类。

解决方案:

打开cmd.exe并执行下面的命令:

tar zxvf apache-shardingsphere-${RELEASE.VERSION}-sharding-proxy-bin.tar.gz

15. Type is required 异常的解决方法?

回答:

ShardingSphere中很多功能实现类的加载方式是通过SPI注入的方式完成的,如分布式主键,注册中心等;这些功能通过配置中type类型来寻找对应的SPI实现,因此必须在配置文件中指定类型。

16. 为什么我实现了ShardingKeyGenerator接口,也配置了Type,但是自定义的分布式主键依然不生效?

回答:

Service Provider Interface (SPI)是一种为了被第三方实现或扩展的API,除了实现接口外,还需要在META-INF/services中创建对应文件来指定SPI的实现类,JVM才会加载这些服务。

具体的SPI使用方式,请大家自行搜索。

与分布式主键ShardingKeyGenerator接口相同,其他ShardingSphere的扩展功能也需要用相同的方式注入才能生效。

17. JPA 和 数据脱敏无法一起使用,如何解决?

回答:

由于数据脱敏的DDL尚未开发完成,因此对于自动生成DDL语句的JPA与数据脱敏一起使用时,会导致JPA的实体类(Entity)无法同时满足DDL和DML的情况。

解决方案如下:

  1. 以需要脱敏的逻辑列名编写JPA的实体类(Entity)。
  2. 关闭JPA的auto-ddl,如 auto-ddl=none。
  3. 手动建表,建表时应使用数据脱敏配置的cipherColumn,plainColumn和assistedQueryColumn代替逻辑列。

18. 服务启动时如何加快metadata加载速度?

回答:

  1. 升级到4.0.1以上的版本,以提高default dataSource的table metadata的加载速度。
  2. 参照你采用的连接池,将配置项max.connections.size.per.query(默认值为1)调高(版本 >= 3.0.0.M3)。

19. 如何在inline分表策略时,允许执行范围查询操作(BETWEEN AND、>、<、>=、<=)?

回答:

  1. 需要使用4.1.0以上版本。
  2. 将配置项allow.range.query.with.inline.sharding设置为true即可(默认为false)。
  3. 需要注意的是,此时所有的范围查询将会使用广播的方式查询每一个分表。

20. 为什么配置了某个数据连接池的spring-boot-starter(比如druid)和sharding-jdbc-spring-boot-starter时,系统启动会报错?

回答:

  1. 因为数据连接池的starter(比如druid)可能会先加载并且其创建一个默认数据源,这将会使得sharding-jdbc创建数据源时发生冲突。
  2. 解决办法为,去掉数据连接池的starter即可,sharing-jdbc自己会创建数据连接池。

21. 在使用sharing-proxy的时候,如何动态在sharding-ui上添加新的logic schema?

回答:

  1. 4.1.0之前的版本不支持动态添加或删除logic schema的功能,例如一个proxy启动的时候有2个logic schema,就会一直持有这2个schema,只能感知这两个schema内部的表和rule的变更事件。
  2. 4.1.0版本支持在sharding-ui或直接在zookeeper上增加新的logic schema,删除logic schema的功能计划在5.0.0版本支持。

22. 在使用sharing-proxy时,怎么使用合适的工具连接到proxy?

回答:

  1. sharding-proxy可以看做是一个mysql server,所以首选支持mysql命令连接和操作。
  2. 如果使用其他第三方数据库工具,可能由于不同工具的特定实现导致出现异常。建议选择特定版本的工具或者打开特定参数,例如使用Navicat 11.1.13版本(不建议12.x),使用IDEA/DataGrip时打开introspect using JDBC metadata选项。
查看原文

赞 0 收藏 0 评论 0

ShardingSphere 发布了文章 · 7月26日

ShardingSphere 4.x Sharding-Scaling(Alpha) 用户手册

简介

Sharding-Scaling是一个提供给用户的通用的ShardingSphere数据接入迁移,及弹性伸缩的解决方案。

4.1.0开始向用户提供。

部署启动

  1. 执行以下命令,编译生成sharding-scaling二进制包:

git clone https://github.com/apache/shardingsphere.git;
cd shardingsphere;
mvn clean install -Prelease;

发布包所在目录为:/sharding-distribution/sharding-scaling-distribution/target/apache-shardingsphere-${latest.release.version}-sharding-scaling-bin.tar.gz

  1. 解压缩发布包,修改配置文件conf/server.yaml,这里主要修改启动端口,保证不与本机其他端口冲突,其他值保持默认即可:
port: 8888
blockQueueSize: 10000
pushTimeout: 1000
workerThread: 30
  1. 启动sharding-scaling:
sh bin/start.sh
  1. 查看日志logs/stdout.log,确保启动成功。
  2. 使用curl命令再次确认正常运行。
curl -X GET http://localhost:8888/shardingscaling/job/list

应答应为:

{"success":true,"errorCode":0,"errorMsg":null,"model":[]}

结束Sharding-Scaling

sh bin/stop.sh

应用配置项

应用现有配置项如下,相应的配置可在conf/server.yaml中修改:

名称说明默认值
portHTTP服务监听端口8888
blockQueueSize数据传输通道队列大小10000
pushTimeout数据推送超时时间,单位ms1000
workerThread工作线程池大小,允许同时运行的迁移任务线程数30

使用手册

环境要求

纯JAVA开发,JDK建议1.8以上版本。

支持迁移场景如下:

源端目标端是否支持
MySQL(5.1.15 ~ 5.7.x)sharding-proxy支持
PostgreSQL(9.4 ~ )sharding-proxy支持

注意
如果后端连接MySQL数据库,需要下载MySQL Connector/J
解压缩后,将mysql-connector-java-5.1.47.jar拷贝到${sharding-scaling}lib目录。

权限要求

MySQL 需要开启binlogbinlog format为Row模式,且迁移时所使用用户需要赋予Replication相关权限。

+-----------------------------------------+---------------------------------------+
| Variable_name                           | Value                                 |
+-----------------------------------------+---------------------------------------+
| log_bin                                 | ON                                    |
| binlog_format                           | ROW                                   |
+-----------------------------------------+---------------------------------------+

+------------------------------------------------------------------------------+
|Grants for ${username}@${host}                                                |
+------------------------------------------------------------------------------+
|GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO ${username}@${host}     |
|.......                                                                       |
+------------------------------------------------------------------------------+

PostgreSQL 需要开启test_decoding

API接口

弹性迁移组件提供了简单的HTTP API接口

创建迁移任务

接口描述:POST /shardingscaling/job/start

请求体:

ParameterDescribe
ruleConfiguration.sourceDatasource源端sharding sphere数据源相关配置
ruleConfiguration.sourceRule源端sharding sphere表规则相关配置
ruleConfiguration.destinationDataSources.name目标端sharding proxy名称
ruleConfiguration.destinationDataSources.url目标端sharding proxy jdbc url
ruleConfiguration.destinationDataSources.username目标端sharding proxy用户名
ruleConfiguration.destinationDataSources.password目标端sharding proxy密码
jobConfiguration.concurrency迁移并发度,举例:如果设置为3,则待迁移的表将会有三个线程同时对该表进行迁移,前提是该表有整数型主键

示例:

curl -X POST \
  http://localhost:8888/shardingscaling/job/start \
  -H 'content-type: application/json' \
  -d '{
   "ruleConfiguration": {
      "sourceDatasource": "ds_0: !!YamlDataSourceConfiguration\n  dataSourceClassName: com.zaxxer.hikari.HikariDataSource\n  properties:\n    jdbcUrl: jdbc:mysql://127.0.0.1:3306/test?serverTimezone=UTC&useSSL=false\n    username: root\n    password: '\''123456'\''\n    connectionTimeout: 30000\n    idleTimeout: 60000\n    maxLifetime: 1800000\n    maxPoolSize: 50\n    minPoolSize: 1\n    maintenanceIntervalMilliseconds: 30000\n    readOnly: false\n",
      "sourceRule": "defaultDatabaseStrategy:\n  inline:\n    algorithmExpression: ds_${user_id % 2}\n    shardingColumn: user_id\ntables:\n  t1:\n    actualDataNodes: ds_0.t1\n    keyGenerator:\n      column: order_id\n      type: SNOWFLAKE\n    logicTable: t1\n    tableStrategy:\n      inline:\n        algorithmExpression: t1\n        shardingColumn: order_id\n  t2:\n    actualDataNodes: ds_0.t2\n    keyGenerator:\n      column: order_item_id\n      type: SNOWFLAKE\n    logicTable: t2\n    tableStrategy:\n      inline:\n        algorithmExpression: t2\n        shardingColumn: order_id\n",
      "destinationDataSources": {
         "name": "dt_0",
         "password": "123456",
         "url": "jdbc:mysql://127.0.0.1:3306/test2?serverTimezone=UTC&useSSL=false",
         "username": "root"
      }
   },
   "jobConfiguration": {
      "concurrency": 3
   }
}'

返回信息:

{
   "success": true,
   "errorCode": 0,
   "errorMsg": null,
   "model": null
}

查询迁移任务进度

接口描述:GET /shardingscaling/job/progress/{jobId}

示例:

curl -X GET \
  http://localhost:8888/shardingscaling/job/progress/1

返回信息:

{
   "success": true,
   "errorCode": 0,
   "errorMsg": null,
   "model": {
        "id": 1,
        "jobName": "Local Sharding Scaling Job",
        "status": "RUNNING/STOPPED"
        "syncTaskProgress": [{
            "id": "127.0.0.1-3306-test",
            "status": "PREPARING/MIGRATE_HISTORY_DATA/SYNCHRONIZE_REALTIME_DATA/STOPPING/STOPPED",
            "historySyncTaskProgress": [{
                "id": "history-test-t1#0",
                "estimatedRows": 41147,
                "syncedRows": 41147
            }, {
                "id": "history-test-t1#1",
                "estimatedRows": 42917,
                "syncedRows": 42917
            }, {
                "id": "history-test-t1#2",
                "estimatedRows": 43543,
                "syncedRows": 43543
            }, {
                "id": "history-test-t2#0",
                "estimatedRows": 39679,
                "syncedRows": 39679
            }, {
                "id": "history-test-t2#1",
                "estimatedRows": 41483,
                "syncedRows": 41483
            }, {
                "id": "history-test-t2#2",
                "estimatedRows": 42107,
                "syncedRows": 42107
            }],
            "realTimeSyncTaskProgress": {
                "id": "realtime-test",
                "delayMillisecond": 1576563771372,
                "logPosition": {
                    "filename": "ON.000007",
                    "position": 177532875,
                    "serverId": 0
                }
            }
        }]
   }
}

查询所有迁移任务

接口描述:GET /shardingscaling/job/list

示例:

curl -X GET \
  http://localhost:8888/shardingscaling/job/list

返回信息:

{
  "success": true,
  "errorCode": 0,
  "model": [
    {
      "jobId": 1,
      "jobName": "Local Sharding Scaling Job",
      "status": "RUNNING"
    }
  ]
}

停止迁移任务

接口描述:POST /shardingscaling/job/stop

请求体:

ParameterDescribe
jobIdjob id

示例:

curl -X POST \
  http://localhost:8888/shardingscaling/job/stop \
  -H 'content-type: application/json' \
  -d '{
   "jobId":1
}'

返回信息:

{
   "success": true,
   "errorCode": 0,
   "errorMsg": null,
   "model": null
}

通过UI界面来操作

Sharding-scaling与sharding-ui集成了用户界面,所以上述所有任务相关的操作都可以通过UI界面点点鼠标来实现,当然本质上还是调用了上述基本接口。

更多信息请参考sharding-ui项目。

查看原文

赞 1 收藏 0 评论 0

认证与成就

  • 获得 36 次点赞
  • 获得 1 枚徽章 获得 0 枚金徽章, 获得 0 枚银徽章, 获得 1 枚铜徽章

擅长技能
编辑

开源项目 & 著作
编辑

(゚∀゚ )
暂时没有

注册于 4月20日
个人主页被 2.6k 人浏览