本文将介绍分布式事务,以xa事务为主,简要剖析mysql中的xa实现、dble的xa实现(因dble复用mycat,故也会覆盖到mycat的xa事务),最后介绍udal的xa事务
分布式事务是指事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点之上。
随着并发量、数据量越来越大及业务已经细化到不能再按照业务划分,我们不得不使用分布式数据库提高系统的性能。在分布式系统中,各个节点在物理上都是相对独立的,每个节点上的数据操作都可以满足 ACID。但是,各独立节点之间无法知道其他节点事务的执行情况,如果想让多台机器中的数据保存一致,就必须保证所有节点上的数据操作要么全部执行成功,要么全部不执行,比较常规的解决方法是引入“协调者”来统一调度所有节点的执行。
实现分布式事务的方案比较多,常见的比如基于 XA
协议的 2PC
、3PC
,基于业务层的 TCC
,还有应用消息队列 + 消息表实现的最终一致性方案。本文将重点研究基于XA协议,并介绍分布式事务的实现。
dble默认开启的是一般分布式事务,需要设置set xa=on后开启xa事务,同时采用最大努力通知的方式。
最大努力通知是最简单的一种柔性事务,适用于一些最终一致性时间敏感度低的业务,且被动方处理结果 不影响主动方的处理结果。
这个方案的大致意思就是:
- 系统 A 本地事务执行完之后,发送个消息到 MQ;
- 这里会有个专门消费 MQ 的服务,这个服务会消费 MQ 并调用系统 B 的接口;
- 要是系统 B 执行成功就 ok 了;要是系统 B 执行失败了,那么最大努力通知服务就定时尝试重新调用系统 B, 反复 N 次,最后还是不行就放弃。
由于dble本身就是一个中间件,不会再使用消息队列中间件,如果在COMMIT节点发生失败,则尝试重新下发,几次尝试未果将事务交给定时任务来继续重试,从而完成最大努力通知方式。
一般分布式事务
dble默认是不开启XA事务方式,采用一般分布式事务。在分布式事务中整体的逻辑和mysql的事务逻辑类似,通过长期持有的连接来进行,每个前端连接frontconnection对应一个session,在dble的每个session中有对应的事务状态以及session所持有的后端连接集合target,在非事务状态下或者是autocommit状态下每次后端连接被使用完毕之后就会被移除target并释放回空闲连接池,但是在事务开启的状态下,在SQL执行完毕的时候connection会在target中长期储存,直到session发起commit或者是rollback。
Dble中的普通分布式事务其实就是后端mysql事务的集合,并且这个事务是没有文件记录的,由于mysql的事务特性在事务发生的过程中若断开连接等同于放弃事务,所以可能出现在commit的过程中由于各种意外情况导致事务的部分提交,例如连接后端四个节点dn1,dn2,dn3,dn4在提交commit进行依次下发的时候dn1,dn2,dn3都提交成功,但是dn4由于网络意外提交失败,导致了预期执行的部分内容丢失,并且由于dn1,dn2.dn3已经提交成功无法进行数据回滚,只能进行人工的数据补偿。**
XA事务
XA 规范 是 X/Open 组织定义的分布式事务处理(DTP,Distributed Transaction Processing)标准。
XA 规范 描述了全局的事务管理器与局部的资源管理器之间的接口。 XA规范 的目的是允许的多个资源(如数据库,应用服务器,消息队列等)在同一事务中访问,这样可以使 ACID 属性跨越应用程序而保持有效。XA 支持分布式事务,即允许多个独立的事务资源参与一个全局事务的能力。
XA 规范 使用两阶段提交(2PC,Two-Phase Commit)来保证所有资源同时提交或回滚任何特定的事务。
为了降低单点压力,通常会根据业务情况进行分表分库,将表分布在不同的库中(库可能分布在不同的机器上)。在这种场景下,事务的提交会变得相对复杂,因为多个节点(库)的存在,可能存在部分节点提交失败的情况。
基于 XA 协议实现的分布式事务,XA 协议中分为两部分:事务管理器和本地资源管理器。其中本地资源管理器往往由数据库实现,比如 Oracle、MYSQL 这些数据库都实现了 XA 接口,而事务管理器则作为一个全局的调度者。
两阶段提交(2PC
),对业务侵⼊很小,它最⼤的优势就是对使⽤⽅透明,用户可以像使⽤本地事务⼀样使⽤基于 XA 协议的分布式事务(也是dble的主要功能实现),能够严格保障事务 ACID 特性。
在XA协议中分为两阶段:
第一阶段:事务管理器要求每个涉及到事务的数据库预提交(precommit)此操作,并反映是否可以提交.
第二阶段:事务协调器要求每个数据库提交数据,或者回滚数据。
准备阶段指事务协调者(事务管理器)向每个参与者(资源管理器)发送准备消息,每个参与者要么直接返回失败消息(如权限验证失败),要么在本地执行事务,写本地的 redo 和undo日志但不提交,可以进一步将准备阶段分为以下三步。
(1)协调者节点向所有参与者节点询问是否可以执行提交操作(vote),并开始等待各参与者节点的响应。
(2)参与者节点执行询问发起为止的所有事务操作,并将 undo 信息和 redo 信息写入日志。
(3)各参与者节点响应协调者节点发起的询问。如果参与者节点的事务操作实际执行成功,则它返回一个“同意”消息;如果参与者节点的事务操作实际执行失败,则它返回一个“中止”消息。
提交阶段指如果协调者收到了参与者的失败消息或者超时,则直接向每个参与者发送回滚(Rollback)消息,否则发送提交(Commit)消息,参与者根据协调者的指令执行提交或者回滚操作,释放所有事务在处理过程中使用的锁资源。
二阶段提交所存在的缺点如下。
(1)同步阻塞问题,在执行过程中所有参与节点都是事务阻塞型的,当参与者占用公共资源时,其他第三方节点访问公共资源时不得不处于阻塞状态。
(2)单点故障,由于协调者的重要性,一旦协调者发生故障,则参与者会一直阻塞下去。
(3)数据不一致,在二阶段提交的第 2 个阶段中,当协调者向参与者发送 commit 请求之后发生了局部网络异常或者在发送 commit 请求的过程中协调者发生了故障,则会导致只有一部分参与者接收到了 commit 请求,而在这部分参与者在接收到 commit 请求之后就会执行commit操作,其他部分未接收到 commit 请求的机器则无法执行事务提交,于是整个分布式系统便出现了数据不一致的现象。
mysql的XA介绍
MySQL XA 的命令集合如下:
XA START xid: 开启一个事务,并将事务置于ACTIVE状态,此后执行的SQL语句都将置于该是事务中。
XA END xid: 将事务置于IDLE状态,表示事务内的SQL操作完成。
XA PREPARE xid: 实现事务提交的准备工作,事务状态置于PREPARED状态。事务如果无法完成提交前的准备操作,该语句会执行失败。
XA COMMIT xid: 事务最终提交,完成持久化。
XA ROLLBACK xid: 事务回滚终止。
XA RECOVER: 查看MySQL中存在的PREPARED状态的xa事务。
Mycat的XA实现
通过阅读dble源码,发现是在复用Mycat的多个类,因此对mycat的xa实现做简单剖析。
XA 事务启动的源码如下:
public class MySQLConnection extends BackendAIOConnection {
//设置开启事务
private void getAutocommitCommand(StringBuilder sb, boolean autoCommit) {
if (autoCommit) {
sb.append("SET autocommit=1;");
} else {
sb.append("SET autocommit=0;");
}
}
public void execute(RouteResultsetNode rrn, ServerConnection sc,boolean autocommit) throws UnsupportedEncodingException {
if(!modifiedSQLExecuted && rrn.isModifySQL()) {
modifiedSQLExecuted = true;
}
//获取当前事务 ID
String xaTXID = sc.getSession2().getXaTXID();
synAndDoExecute(xaTXID, rrn, sc.getCharsetIndex(), sc.getTxIsolation(),autocommit);
}
……
……//
}
用户应用侧设置手动提交以后,Mycat 会在当前连接中加入
SET autocommit=0;
将该语句加入到 StringBuffer 中,等待提交到数据库。 用户连接 Session 的源码如下:
public class NonBlockingSession implements Session {
……
……// NonBlockingSession.java 源码
}
SET XA = ON ;语句分析
用户应用侧发送该语句到 Mycat 中,由 SQL 语句解析器解析后交由 SetHandle 进行处理c.getSession2().setXATXEnabled (true);
调用 NonBlockSession 中的 setXATXEnable d 方法设置 XA 开关启动,并生成 XID,代码如下:
public void setXATXEnabled(boolean xaTXEnabled) {
LOGGER.info("XA Transaction enabled ,con " + this.getSource());
if (xaTXEnabled && this.xaTXID == null) {
xaTXID = genXATXID();
}
}
另外,NonBlockSession 会接收来自于用户应用侧的 commit, 调用 commit 方法进行处理事务提交的逻辑。 在 commit()方法中,首先会 check 节点个数,一个节点和多个节点分为不同的处理过程,这里只讲下多个节点的处理方法 checkDistriTransaxAndExecute(); 该方法会对多个节点的事务进行提交。 协调者的源码如下:
public class MultiNodeCoordinator implements ResponseHandler {
//MultiNodeCoordinator.java 源码
}
在 NonBlockSession 的 checkDistriTransaxAndExecute()方法中, NonBlockSession 会话类会调用专门进行多节点协同的 MultiNodeCoordinator 类进行具体的处理,在 MultiNodeCoordinator类中,executeBatchNodeCmd 方法加入 XA 1PC 提交的处理,代码片段如下:
for (RouteResultsetNode rrn : session.getTargetKeys()) {
……
if (mysqlCon.getXaStatus() == TxState.TX_STARTED_STATE){
//recovery Log
participantLogEntry[started] = new
ParticipantLogEntry(xaTxId,conn.getHost(),0,conn.getSchema(),((MySQLConnection) conn).getXaStatus());
String[] cmds = new String[]{"XA END " + xaTxId,"XA PREPARE " + xaTxId};
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Start execute the batch cmd : "+ cmds[0] + ";" +cmds[1]+","+"current
connection:"+conn.getHost()+":"+conn.getPort());
}
mysqlCon.execBatchCmd(cmds);
}
……
}
在 MultiNodeCoordinator 类的 okResponse 方法中,则进行 2pc 的事务提交
for (RouteResultsetNode rrn : session.getTargetKeys()) {
……
if (mysqlCon.getXaStatus() == TxState.TX_STARTED_STATE){
//recovery Log
participantLogEntry[started] = new
ParticipantLogEntry(xaTxId,conn.getHost(),0,conn.getSchema(),((MySQLConnection) conn).getXaStatus());
String[] cmds = new String[]{"XA END " + xaTxId,"XA PREPARE " + xaTxId};
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Start execute the batch cmd : "+ cmds[0] + ";" +cmds[1]+","+"current
connection:"+conn.getHost()+":"+conn.getPort());
}
mysqlCon.execBatchCmd(cmds);
}
……
}
分片事务提交处理的源码如下:
public class CommitNodeHandler implements ResponseHandler {
//结束 XA
public void commit(BackendConnection conn) {
……
……//省略此处代码,CommitNodeHandler.java源码
}
//提交 XA
@Override
public void okResponse(byte[] ok, BackendConnection conn) {
……
……//省略此处代码,
}
在 Mycat 中同样支持单节点 MySQL 数据库的 XA 事务处理,在 CommitNodeHandler 类中就是对单节点的 XA 二阶段处理,处理方式与 MultiNodeCoordinator 类同,通过 commit 方法进行 1pc 的提交,而通过okResponse 的方法进行 2pc 阶段的事务提交。 ** **分片事务回滚处理的源码如下:
public class RollbackNodeHandler extends MultiNodeHandler {
……
……//省略此处代码,RollbackNodeHandler.java 源码
}
在 RollbackNodeHandler 的 rollback 方法中加入了对 XA 事务的 rollback 处理,用户应用侧发起的 rollback 会在这个方法中进行处理。
for (final RouteResultsetNode node : session.getTargetKeys()) {
……
//support the XA rollback
MySQLConnection mysqlCon = (MySQLConnection) conn;
if(session.getXaTXID()!=null) {
String xaTxId = session.getXaTXID();
mysqlCon.execCmd("XA END " + xaTxId + ";");
mysqlCon.execCmd("XA ROLLBACK " + xaTxId + ";");
}else {
conn.rollback();
}
……
}
同样,该方法会对所有的 MySQL 数据库节点发起 xa rollback 指令。
dble的XA支持
Dble提供分布式事务支持,使用两阶段提交的分布式事务。可以为了性能选择普通模式或者为了数据安全采用XA模式。当然,XA模式依赖于MySQL-5.7的XA Transaction,MySQL节点的高可用性和数据的可靠性。
- 事务开始前需要设置手动提交:set autocommit=0;
- 使用命令开启XA 事务:set xa=on;
Dble整体来说的处理原则如下:
- 将XA事务的提交分为END PREPARE COMMIT三个部分
- 如果在PREPARE下发之前有节点失败或报错,关闭所有后端连接放弃事务数据
- 如果在PREPARE下发过程中发生失败,则回滚事务,所有节点下发ROLLBACK
- 如果在COMMIT节点发生失败,则尝试重新下发,几次尝试未果将事务交给定时任务来继续重试。
XA事务记录的内容
由于在Dble中采用两段提交的分布式事务,所以使用XA事务的时候对于DBLE本身就拥有了状态。状态就需要有文件或者其他方式的记录,其中关于XA事务细节的记录主要是记录以下几个部分
1. 事务ID
2. 事务状态
3. 事务中每个节点的连接host
4. 事务中每个节点的连接端口
5. 事务中每个节点连接最后的事务状态
6. 事务中每个节点连接的过期状态(没有实际作用)
7. 事务中每个节点连接对应的后端数据库
关键类说明:
- XAstage抽象类,继承自接口TransactionStage。包含String属性,"XA END STAGE"、"XA PREPARE STAGE"、"XA COMMIT STAGE"、"XA COMMIT FAIL STAGE"、"XA ROLLBACK STAGE"、"XA ROLLBACK FAIL STAGE",记录状态;包含两个会话类,NonBlockingSession和AbstractXAHandler; ** **onEnterStage对所有路由节点获取的连接进行处理,通过ConcurrentMap<RouteResultsetNode, BackendConnection>的结构,如果获取backend Connection成功并且获取BackendService成功,则调用onEnterStage(MySQLResponseService conn)方法(抽象方法,必须在子类实现)
- **XACommitStage,继承自XAStage,重写next(XAStage的父类),onEnterStage,onEnterStage(MySQLResponseService service),onConnectionOk,onConnectionError,onConnectionClose,onConnectError,getStage方法;每个方法都通过XAStateLog.saveXARecoveryLog记录日志状态。 **