「黑马点评」七、Redis 消息队列优化异步秒杀下单

2025 年 4 月 25 日 星期五(已编辑)
4
这篇文章上次修改于 2025 年 4 月 26 日 星期六,可能部分内容已经不适用,如有疑问可询问作者。

阅读此文章之前,你可能需要首先阅读以下的文章才能更好的理解上下文。

「黑马点评」七、Redis 消息队列优化异步秒杀下单

异步秒杀下单优化之 Redis Stream 消息队列

需求:

  1. 创建一个 Stream 类型的消息队列,名为 stream.orders
  2. 修改之前的秒杀下单 Lua 脚本,在认定有抢购资格后,直接向 stream.orders 中添加消息,内容包含 voucherId、userId、orderId
  3. 项目启动时,开启一个线程任务,尝试获取 stream.orders 中的消息,完成下单

创建消息队列

消息队列名为 stream.orders,消费者组名为 g1,无则创建

xgroup create stream.orders g1 0 MKSTREAM

在主入口处向消息队列发送资料

@Override
public Result<Long> seckillVoucher(Long voucherId) {
    // 1. 查询优惠券
    SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
    // 2. 判断秒杀时间
    if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
        return Result.error("秒杀尚未开始");
    }
    if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
        return Result.error("秒杀已结束");
    }
    // 3. 校验库存以及一人一单
    Long orderId = redisIdWorker.nextId("order");
    Long result = stringRedisTemplate.execute(
        SECKILL_SCRIPT,
        Collections.emptyList(),
        voucherId.toString(),UserHolder.getUser().getId().toString(),orderId.toString()
    );
    if (result != 0) {
        return Result.error(result == 1 ? "库存不足" : "不能重复下单");
    }
    // 6. 获取代理对象
    proxy = (IVoucherOrderService) AopContext.currentProxy();
    // 7. 返回订单id
    return Result.success(orderId);
}

Lua 脚本

检验库存是否充足以及一人一单(是否在优惠券 set 下)

 1.参数列表
local voucherId = ARGV[1]
local userId = ARGV[2]
local orderId = ARGV[3]
 
-- 2.数据key
local stockKey = 'seckill:stock:' .. voucherId
local orderKey = 'seckill:order:' .. voucherId
 
-- 3.脚本业务
if(tonumber(redis.call('get', stockKey)) <= 0) then
    return 1
end
if(redis.call('sismember', orderKey, userId) == 1) then
    return 2
end
redis.call('incrby', stockKey, -1)
redis.call('sadd', orderKey, userId)
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
return 0

异步处理任务的线程任务类

  1. 从消息队列的分组中读取消息,阻塞 2 s还没读取到重新读取
  2. 有消息就解析并调用真正的下单函数处理
  3. 随后进行 ACK 处理
  4. 如果还没确认成功前出现问题,那么处理 pending-list,从 pending-list 中读取(也就是从 0 读没有 ack 的消息)并处理,直到读取空为止(也就是处理完)
private class VoucherOrderHandler implements Runnable {
    String queueName = "streams.orders";
    @Override
    public void run() {
        while (true) {
            try {
                // XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS streams.orders >
                List<MapRecord<String, Object, Object> > list = stringRedisTemplate.opsForStream().read(
                    Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)), StreamOffset.create(queueName, ReadOffset.lastConsumed())
                );
                if (list == null || list.isEmpty()) {
                    continue;
                }
                MapRecord<String, Object, Object> record = list.get(0);
                Map<Object, Object> value = record.getValue();
                VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
                handleVoucherOrder(voucherOrder);
                stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
            } catch (Exception e) {
                log.error("处理订单异常", e);
                handlerPendingList();
            }
        }
    }

VoucherOrderHandler 中处理 pending-list 的方法

private void handlerPendingList() {
    while (true) {
        try {
            // XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS streams.orders >
            List<MapRecord<String, Object, Object> > list = stringRedisTemplate.opsForStream().read(
                Consumer.from("g1", "c1"),
                StreamReadOptions.empty().count(1),
                StreamOffset.create(queueName, ReadOffset.from("0"))
            );
            if (list == null || list.isEmpty()) {
                break;
            }
            MapRecord<String, Object, Object> record = list.get(0);
            Map<Object, Object> value = record.getValue();
            VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
            handleVoucherOrder(voucherOrder);
            stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
        } catch (Exception e) {
            log.error("处理 pending-list 异常", e);
            try {
                Thread.sleep(20);
            } catch (Exception e1) {
                e1.printStackTrace();
            }
        }
    }
}

使用社交账号登录

  • Loading...
  • Loading...
  • Loading...
  • Loading...
  • Loading...