背景
在日常开发中,会存在大量的接口需要同时写 DB 和调用远程服务,由于网络的不可靠性和节点故障,分布式系统中的数据一致性问题成为一个重大挑战。在当前服务MySQL写入成功,远程服务调用失败,或者在进行跨服务调用时,如果某个服务调用失败,如何保证前一个已成功的服务操作能正确回滚,保持数据的一致性。
方案
单次远程调用场景
- 写数据库(1-n 次写)
- 调用 feign 写 APISIX(1 次调用)
这个场景可使用 事务管理 + 重试机制。在最大程度保证成功的前提下,也能完美的保障数据的一致性。
首先,需要一个重试的能力,可以考虑 guava-retrying,封装了丰富的重试时机、停止重试策略、重试间隔策略、任务执行时间限制等。
但我们只需要异常重试(主要是解决网络抖动引发的异常)、重试次数、重试间隔,也可以自己封装一个工具类。
public class RetryUtil {
public static <T> T retry(RetryableSupplier<T> operation, int attempts, long delay) throws Exception {
Exception lastException = null;
for (int i = 0; i < attempts; i++) {
try {
return operation.get();
} catch (Exception e) {
log.error("retry error.", e);
lastException = e;
if (delay > 0 && i < attempts - 1) {
Thread.sleep(delay);
}
}
}
if (null != lastException) {
throw lastException;
}
return null;
}
public static void retry(RetryableRunnable runner, int attempts, long delay) throws Exception {
Exception lastException = null;
for (int i = 0; i < attempts; i++) {
try {
runner.run();
return;
} catch (Exception e) {
log.error("retry error.", e);
lastException = e;
if (delay > 0 && i < attempts - 1) {
Thread.sleep(delay);
}
}
}
if (null != lastException) {
throw lastException;
}
}
}
@FunctionalInterface
public interface RetryableRunnable {
void run() throws Exception;
}
@FunctionalInterface
public interface RetryableSupplier<T> {
T get() throws Exception;
}
然后是业务接口,开启事务管理以及在 feign 接口增加重试机制。
@Service
public class MyService {
@Autowired
private MyRepository myRepository;
@Autowired
private MyFeignClient myFeignClient;
@Transactional
public int updateDatabaseAndCallFeign(Data data) {
try {
// 业务校验,所有的写接口都需要加强校验,让异常尽早抛出
bizCheck();
// 写数据库,先执行数据库
myRepository.save(data);
// 调用Feign接口,增加重试机制,需要确保服务是幂等的才用重试
RetryUtil.retry(() -> myFeignClient.sendData(data), 3, 2000);
} catch (Exception e) {
log.error();
// 手动回滚事务
TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
return 1;
}
}
@Transactional(rollbackFor=Exception.class)
public void updateDatabaseAndCallFeign(Data data) {
// 业务校验,所有的写接口都需要加强校验,让异常尽早抛出
bizCheck();
// 写数据库,先执行数据库
myRepository.save(data);
// 调用Feign接口,增加重试机制,需要确保服务是幂等的才用重试
RetryUtil.retry(() -> myFeignClient.sendData(data), 3, 2000);
}
}
这里先写数据库,可以利用数据库的事务管理回滚。
另外会存在一些批量接口场景,会多次调用 feign
- 写数据库(1-n 个表)
- 调用 feign 写 APISIX(n 次调用)
这种场景在以上方案的基础上,需要增加 feign 接口的回补处理
@Service
public class MyService {
@Autowired
private MyRepository myRepository;
@Autowired
private MyFeignClient1 myFeignClient1;
@Autowired
private MyFeignClient2 myFeignClient2;
@Transactional
public void updateDatabaseAndCallFeigns(Data data) {
try {
// 业务校验
bizCheck();
// 写数据库
myRepository.save(data);
List<Object> successList = new ArrayList<>();
// 调用第一个Feign接口,增加重试机制
RetryUtil.retry(() -> myFeignClient1.sendData(data), 3, 2000);
successList.add(first feign info);
// 调用第二个Feign接口,增加重试机制
RetryUtil.retry(() -> myFeignClient2.sendData(data), 3, 2000);
} catch (Exception e) {
// 在捕获异常后,执行补偿逻辑
try {
// 如果第二个Feign接口失败,调用第一个Feign接口的补偿方法,根据successList判断
myFeignClient1.rollbackSendData(data);
} catch (Exception rollbackException) {
// 记录补偿操作失败的情况
log.error();
}
// 手动回滚数据库事务
TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
}
}
}
另外还会存在一些批量处理的接口,只需要保障单条数据的一致性,即使中途发生异常,也会处理完所有数据,再返回异常数据。
@Service
// 1、需要把当前类的对象代理暴露
@EnableAspectJAutoProxy(exposeProxy = true)
public class MyService {
@Autowired
private MyRepository myRepository;
@Autowired
private MyFeignClient myFeignClient;
public List<Data> processList(List<Data> dataList) {
List<Data> failDataList = new ArrayList();
for (Data singleData : dataList) {
try {
// 2、通过代理类调用,如果用this调用的话,事务传播会失效
((MyService) AopContext.currentProxy()).processSingle(singleData);
} catch (Exception e) {
failDataList.add(data);
}
}
return failDataList;
}
// 3、开启事务传播,每次处理是单独的事务,单独提交
@Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor = Exception.class)
public void processSingle(Data singleData) {
// 业务校验,所有的写接口都需要加强校验,让异常尽早抛出
bizCheck();
// 写数据库,先执行数据库
myRepository.save(data);
// 调用Feign接口,增加重试机制,需要确保服务是幂等的才用重试
RetryUtil.retry(() -> myFeignClient.sendData(data), 3, 2000);
}
}