首先我这里的服务体系是:(网关认证这些不在此图中)
1 前端循环查询订单详情接口
异步下单,前端APP或者小程序调用下单接口,生成订单号
然后前端使用这个订单号,跳转到订单详情页面 轮循这个订单详情接口,下单成功显示订单详情。
轮循订单详情接口,下单失败,显示错误弹框,然后点击确认退出页面。
小程序的下单流程如下
那首先是根据 sn 来查询订单详情
- 查询 Redis 中是否有订单号标识
- 查询 对应订单号是否下单成功
- 根据订单号 查询 Redis 缓存中的订单详情
@Service
@Slf4j
public class OrderServiceImpl implements OrderService {
@Autowired
private RedisService redisService;
@Override
public OrderInfo getOrderInfoBySn(String sn) {
Boolean aBoolean = redisService.hasKey(Constant.ORDER_SN + sn);
if (!aBoolean) {
throw new RuntimeException("下单失败 库存不足");
}
//1正在处理中
int statues = Integer.parseInt(redisService.get(Constant.ORDER_SN + sn).toString());
if (statues == 1) {
return null;
}
if (statues == 3) {
throw new RuntimeException("下单失败 库存不足");
}
//缓存订单信息
OrderInfo orderInfo = (OrderInfo) redisService.get(Constant.ORDER_SN_NORMAL + sn);
return orderInfo;
}
}
预下单时会生成订单号,然后将订单号标识保存到Redis中,并设置有效期为1分钟,标识状态如下:
- 1 队列正在处理中
- 2 下单成功
- 3 下单失败
2 订单预下单生成订单号
我这里是在 mini-service 服务中预下单,mini-service 中要集成 kafaka
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
然后添加配置如下:
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: Qgchun// 设置默认的生产者消费者所属组id
template:
default-topic: ORDER-CREATE-TOPIC
需要注意的是,开发环境要安装kafaka ,然后在预下单的时候发送下单的消息如下:
@Service
@Slf4j
public class OrderServiceImpl implements OrderService {
@Autowired
private SnowFlakeCompone snowFlakeCompone;
@Autowired
private RedisService redisService;
@Resource
private KafkaTemplate<String, String> kafkaTemplate;
@Value("${spring.kafka.template.default-topic}")
private String kafkaTopic;
@Override
public String asyncCreatOrder(Long userId, Long goodsId) {
//判断库存
Boolean aBoolean = redisService.hasKey(Constant.GOOOD_STOCK + goodsId);
if (!aBoolean) {
throw new RuntimeException("库存不足");
}
//雪花算法订单号
String nextId = snowFlakeCompone.getInstance().nextId() + "";
//Redis保存预订单号状态 1 为正在处理中 2为下单完成 3为下单失败
redisService.set(Constant.ORDER_SN + nextId, 1);
//设置过期时间1分钟
redisService.expire(Constant.ORDER_SN + nextId, 60);
OrderMessage message = new OrderMessage();
message.setSn(nextId);
message.setUserId(userId);
message.setGoodsId(goodsId);
message.setNum(1);//下单商品数量
//kafka下单消息发送
kafkaTemplate.send(kafkaTopic, JSON.toJSONString(message));//使用send方法发送消息,需要传入topic名称
log.info("消息发送至 Kafka 成功");
return nextId;
}
}
3 订单服务定义 kafaka 消费者
创建一个消费者,使用 @KafkaListener 来监听对应的通道
@Slf4j
@Component
public class OrderConsumerListen {
@Resource
private OrderKafakaService orderKafakaService;
@KafkaListener(topics = "ORDER-CREATE-TOPIC")
public void listen(ConsumerRecord<String, String> record) throws Exception {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
// Object -> String
String message = (String) kafkaMessage.get();
// 反序列化
OrderMessage orderMessage = JSON.parseObject(message,OrderMessage.class);
// 创建订单
orderKafakaService.consumerTopicToCreateOrderWithKafka(orderMessage);
}
}
OrderKafakaService 就是实现下单的处理:
- 校验库存,从 Redis 中获取库存信息
- 乐观锁更新数据库库存
- 更新Redis缓存中的库存
- 创建订单
- 更新Redis中订单号的 标识,并缓存订单信息
@Service
@Slf4j
public class OrderKafakaServiceImpl extends ServiceImpl<OrderMapper, OrderInfo> implements OrderKafakaService {
@Resource
private GoodsInfoService goodsInfoService;
@Resource
RedisService redisService;
@Override
@Transactional
public void consumerTopicToCreateOrderWithKafka(OrderMessage orderMessage) throws Exception {
Long goodsId = orderMessage.getGoodsId();
Long userId = orderMessage.getUserId();
String sn = orderMessage.getSn();
Integer num = orderMessage.getNum();
// 第一步 校验库存,从 Redis 中获取
GoodsInfo goodsInfo = checkStockWithRedis(goodsId);
if(goodsInfo==null){
//更新下单标识下单失败
redisService.set(Constant.ORDER_SN+sn,3);
return;
}
// 第二步 乐观锁更新库存和Redis
boolean update = saleStockOptimsticWithRedis(goodsInfo);
if(!update){
//更新下单标识下单失败
redisService.set(Constant.ORDER_SN+sn,3);
return;
}
//第三步 创建订单
GoodsInfo goodsInfo1 = (GoodsInfo) redisService.get(Constant.GOODS_INFO + goodsId);
OrderInfo order = new OrderInfo();
order.setSn(sn);
order.setGoodsId(goodsId);
order.setUserId(userId);
order.setNum(num);
order.setPrice(goodsInfo1.getGoodsPrice().multiply(new BigDecimal(num)));
order.setName(goodsInfo1.getGoodsName());
order.setCreateTime(new Date());
order.setOrderStatues(0);
boolean res = this.save(order);
if (!res) {
//更新下单标识下单失败
redisService.set(Constant.ORDER_SN+sn,3);
return;
}
//第四步 更新预下订单标识
redisService.set(Constant.ORDER_SN+sn,2);
//未支付取消订单 设置过期时间30分钟
redisService.expire(Constant.ORDER_SN+sn,60*30);
//缓存订单信息
redisService.set(Constant.ORDER_SN_NORMAL+sn,order);
log.info("Kafka 消费 Topic 创建订单成功");
}
// Redis 中校验库存
private GoodsInfo checkStockWithRedis(Long goodsId) throws Exception {
int count = Integer.parseInt(redisService.get(Constant.GOOOD_STOCK + goodsId).toString());
int sale = Integer.parseInt(redisService.get(Constant.GOOOD_SALE + goodsId).toString());
int version = Integer.parseInt(redisService.get(Constant.GOOOD_VERSION + goodsId).toString());
String goodsName = redisService.get(Constant.GOODS_NAME + goodsId).toString();
if (count < 1) {
log.info("库存不足");
return null;
}
GoodsInfo stock = new GoodsInfo();
stock.setId(goodsId);
stock.setGoodsStock(count);
stock.setGoodsSale(sale);
stock.setVersion(version);
stock.setGoodsName(goodsName);
return stock;
}
// 更新 DB 和 Redis
private boolean saleStockOptimsticWithRedis(GoodsInfo goodsInfo) throws Exception {
boolean res = goodsInfoService.updateStockByOptimistic(goodsInfo);
if (!res) {
return false;
}
// 更新 Redis
redisService.updateStockWithRedis(goodsInfo);
return true;
}
}