searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

基于MySQL事务管理与远程调用的数据一致性探索

2024-06-17 07:05:54
9
0

背景

在日常开发中,会存在大量的接口需要同时写 DB 和调用远程服务,由于网络的不可靠性和节点故障,分布式系统中的数据一致性问题成为一个重大挑战。在当前服务MySQL写入成功,远程服务调用失败,或者在进行跨服务调用时,如果某个服务调用失败,如何保证前一个已成功的服务操作能正确回滚,保持数据的一致性。

 

方案

单次远程调用场景

  • 写数据库(1-n 次写)
  • 调用 feign 写 APISIX(1 次调用)

 

这个场景可使用 事务管理 + 重试机制。在最大程度保证成功的前提下,也能完美的保障数据的一致性。

 

首先,需要一个重试的能力,可以考虑 guava-retrying,封装了丰富的重试时机、停止重试策略、重试间隔策略、任务执行时间限制等。

但我们只需要异常重试(主要是解决网络抖动引发的异常)、重试次数、重试间隔,也可以自己封装一个工具类。

public class RetryUtil {

    public static <T> T retry(RetryableSupplier<T> operation, int attempts, long delay) throws Exception {
        Exception lastException = null;
        for (int i = 0; i < attempts; i++) {
            try {
                return operation.get();
            } catch (Exception e) {
                log.error("retry error.", e);
                lastException = e;
                if (delay > 0 && i < attempts - 1) {
                    Thread.sleep(delay);
                }
            }
        }
        if (null != lastException) {
            throw lastException;
        }
        return null;
    }

    public static void retry(RetryableRunnable runner, int attempts, long delay) throws Exception {
        Exception lastException = null;
        for (int i = 0; i < attempts; i++) {
            try {
                runner.run();
                return;
            } catch (Exception e) {
                log.error("retry error.", e);
                lastException = e;
                if (delay > 0 && i < attempts - 1) {
                    Thread.sleep(delay);
                }
            }
        }
        if (null != lastException) {
            throw lastException;
        }
    }
}

@FunctionalInterface
public interface RetryableRunnable {

    void run() throws Exception;
}

@FunctionalInterface
public interface RetryableSupplier<T> {

    T get() throws Exception;
}

 

然后是业务接口,开启事务管理以及在 feign 接口增加重试机制。

@Service
public class MyService {


    @Autowired
    private MyRepository myRepository;


    @Autowired
    private MyFeignClient myFeignClient;


    @Transactional
    public int updateDatabaseAndCallFeign(Data data) {
        try {
            // 业务校验,所有的写接口都需要加强校验,让异常尽早抛出
            bizCheck();      


            // 写数据库,先执行数据库
            myRepository.save(data);


            // 调用Feign接口,增加重试机制,需要确保服务是幂等的才用重试
            RetryUtil.retry(() -> myFeignClient.sendData(data), 3, 2000);
        } catch (Exception e) {
            log.error();
            // 手动回滚事务
            TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
            return 1;
        }
    }


    @Transactional(rollbackFor=Exception.class)
    public void updateDatabaseAndCallFeign(Data data) {
        // 业务校验,所有的写接口都需要加强校验,让异常尽早抛出
        bizCheck();      


        // 写数据库,先执行数据库
        myRepository.save(data);


        // 调用Feign接口,增加重试机制,需要确保服务是幂等的才用重试
        RetryUtil.retry(() -> myFeignClient.sendData(data), 3, 2000);
    }
}

 

这里先写数据库,可以利用数据库的事务管理回滚。

 

另外会存在一些批量接口场景,会多次调用 feign

  • 写数据库(1-n 个表)
  • 调用 feign 写 APISIX(n 次调用)

 

这种场景在以上方案的基础上,需要增加 feign 接口的回补处理

@Service
public class MyService {


    @Autowired
    private MyRepository myRepository;


    @Autowired
    private MyFeignClient1 myFeignClient1;


    @Autowired
    private MyFeignClient2 myFeignClient2;


    @Transactional
    public void updateDatabaseAndCallFeigns(Data data) {
        try {
            // 业务校验
            bizCheck();


            // 写数据库
            myRepository.save(data);


            List<Object> successList = new ArrayList<>();
            // 调用第一个Feign接口,增加重试机制
            RetryUtil.retry(() -> myFeignClient1.sendData(data), 3, 2000);
            successList.add(first feign info);
            
            // 调用第二个Feign接口,增加重试机制
            RetryUtil.retry(() -> myFeignClient2.sendData(data), 3, 2000);


        } catch (Exception e) {
            // 在捕获异常后,执行补偿逻辑
            try {
                // 如果第二个Feign接口失败,调用第一个Feign接口的补偿方法,根据successList判断
                myFeignClient1.rollbackSendData(data);
            } catch (Exception rollbackException) {
                // 记录补偿操作失败的情况
                log.error();
            }


            // 手动回滚数据库事务
            TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
        }
    }
}

 

另外还会存在一些批量处理的接口,只需要保障单条数据的一致性,即使中途发生异常,也会处理完所有数据,再返回异常数据。

 
@Service
// 1、需要把当前类的对象代理暴露
@EnableAspectJAutoProxy(exposeProxy = true)
public class MyService {


    @Autowired
    private MyRepository myRepository;


    @Autowired
    private MyFeignClient myFeignClient;

    public List<Data> processList(List<Data> dataList) {
        List<Data> failDataList = new ArrayList();
        for (Data singleData : dataList) {
            try {
                // 2、通过代理类调用,如果用this调用的话,事务传播会失效
                ((MyService) AopContext.currentProxy()).processSingle(singleData);
            } catch (Exception e) {
                failDataList.add(data);
            }
        }
        return failDataList;
    }
    
        // 3、开启事务传播,每次处理是单独的事务,单独提交
    @Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor = Exception.class)
    public void processSingle(Data singleData) {
      // 业务校验,所有的写接口都需要加强校验,让异常尽早抛出
      bizCheck();      

            // 写数据库,先执行数据库
      myRepository.save(data);

            // 调用Feign接口,增加重试机制,需要确保服务是幂等的才用重试
      RetryUtil.retry(() -> myFeignClient.sendData(data), 3, 2000);
    }

}
0条评论
0 / 1000
wjchan
1文章数
0粉丝数
wjchan
1 文章 | 0 粉丝
wjchan
1文章数
0粉丝数
wjchan
1 文章 | 0 粉丝
原创

基于MySQL事务管理与远程调用的数据一致性探索

2024-06-17 07:05:54
9
0

背景

在日常开发中,会存在大量的接口需要同时写 DB 和调用远程服务,由于网络的不可靠性和节点故障,分布式系统中的数据一致性问题成为一个重大挑战。在当前服务MySQL写入成功,远程服务调用失败,或者在进行跨服务调用时,如果某个服务调用失败,如何保证前一个已成功的服务操作能正确回滚,保持数据的一致性。

 

方案

单次远程调用场景

  • 写数据库(1-n 次写)
  • 调用 feign 写 APISIX(1 次调用)

 

这个场景可使用 事务管理 + 重试机制。在最大程度保证成功的前提下,也能完美的保障数据的一致性。

 

首先,需要一个重试的能力,可以考虑 guava-retrying,封装了丰富的重试时机、停止重试策略、重试间隔策略、任务执行时间限制等。

但我们只需要异常重试(主要是解决网络抖动引发的异常)、重试次数、重试间隔,也可以自己封装一个工具类。

public class RetryUtil {

    public static <T> T retry(RetryableSupplier<T> operation, int attempts, long delay) throws Exception {
        Exception lastException = null;
        for (int i = 0; i < attempts; i++) {
            try {
                return operation.get();
            } catch (Exception e) {
                log.error("retry error.", e);
                lastException = e;
                if (delay > 0 && i < attempts - 1) {
                    Thread.sleep(delay);
                }
            }
        }
        if (null != lastException) {
            throw lastException;
        }
        return null;
    }

    public static void retry(RetryableRunnable runner, int attempts, long delay) throws Exception {
        Exception lastException = null;
        for (int i = 0; i < attempts; i++) {
            try {
                runner.run();
                return;
            } catch (Exception e) {
                log.error("retry error.", e);
                lastException = e;
                if (delay > 0 && i < attempts - 1) {
                    Thread.sleep(delay);
                }
            }
        }
        if (null != lastException) {
            throw lastException;
        }
    }
}

@FunctionalInterface
public interface RetryableRunnable {

    void run() throws Exception;
}

@FunctionalInterface
public interface RetryableSupplier<T> {

    T get() throws Exception;
}

 

然后是业务接口,开启事务管理以及在 feign 接口增加重试机制。

@Service
public class MyService {


    @Autowired
    private MyRepository myRepository;


    @Autowired
    private MyFeignClient myFeignClient;


    @Transactional
    public int updateDatabaseAndCallFeign(Data data) {
        try {
            // 业务校验,所有的写接口都需要加强校验,让异常尽早抛出
            bizCheck();      


            // 写数据库,先执行数据库
            myRepository.save(data);


            // 调用Feign接口,增加重试机制,需要确保服务是幂等的才用重试
            RetryUtil.retry(() -> myFeignClient.sendData(data), 3, 2000);
        } catch (Exception e) {
            log.error();
            // 手动回滚事务
            TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
            return 1;
        }
    }


    @Transactional(rollbackFor=Exception.class)
    public void updateDatabaseAndCallFeign(Data data) {
        // 业务校验,所有的写接口都需要加强校验,让异常尽早抛出
        bizCheck();      


        // 写数据库,先执行数据库
        myRepository.save(data);


        // 调用Feign接口,增加重试机制,需要确保服务是幂等的才用重试
        RetryUtil.retry(() -> myFeignClient.sendData(data), 3, 2000);
    }
}

 

这里先写数据库,可以利用数据库的事务管理回滚。

 

另外会存在一些批量接口场景,会多次调用 feign

  • 写数据库(1-n 个表)
  • 调用 feign 写 APISIX(n 次调用)

 

这种场景在以上方案的基础上,需要增加 feign 接口的回补处理

@Service
public class MyService {


    @Autowired
    private MyRepository myRepository;


    @Autowired
    private MyFeignClient1 myFeignClient1;


    @Autowired
    private MyFeignClient2 myFeignClient2;


    @Transactional
    public void updateDatabaseAndCallFeigns(Data data) {
        try {
            // 业务校验
            bizCheck();


            // 写数据库
            myRepository.save(data);


            List<Object> successList = new ArrayList<>();
            // 调用第一个Feign接口,增加重试机制
            RetryUtil.retry(() -> myFeignClient1.sendData(data), 3, 2000);
            successList.add(first feign info);
            
            // 调用第二个Feign接口,增加重试机制
            RetryUtil.retry(() -> myFeignClient2.sendData(data), 3, 2000);


        } catch (Exception e) {
            // 在捕获异常后,执行补偿逻辑
            try {
                // 如果第二个Feign接口失败,调用第一个Feign接口的补偿方法,根据successList判断
                myFeignClient1.rollbackSendData(data);
            } catch (Exception rollbackException) {
                // 记录补偿操作失败的情况
                log.error();
            }


            // 手动回滚数据库事务
            TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
        }
    }
}

 

另外还会存在一些批量处理的接口,只需要保障单条数据的一致性,即使中途发生异常,也会处理完所有数据,再返回异常数据。

 
@Service
// 1、需要把当前类的对象代理暴露
@EnableAspectJAutoProxy(exposeProxy = true)
public class MyService {


    @Autowired
    private MyRepository myRepository;


    @Autowired
    private MyFeignClient myFeignClient;

    public List<Data> processList(List<Data> dataList) {
        List<Data> failDataList = new ArrayList();
        for (Data singleData : dataList) {
            try {
                // 2、通过代理类调用,如果用this调用的话,事务传播会失效
                ((MyService) AopContext.currentProxy()).processSingle(singleData);
            } catch (Exception e) {
                failDataList.add(data);
            }
        }
        return failDataList;
    }
    
        // 3、开启事务传播,每次处理是单独的事务,单独提交
    @Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor = Exception.class)
    public void processSingle(Data singleData) {
      // 业务校验,所有的写接口都需要加强校验,让异常尽早抛出
      bizCheck();      

            // 写数据库,先执行数据库
      myRepository.save(data);

            // 调用Feign接口,增加重试机制,需要确保服务是幂等的才用重试
      RetryUtil.retry(() -> myFeignClient.sendData(data), 3, 2000);
    }

}
文章来自个人专栏
数据一致性
1 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0