「黑马点评」七、Redis 消息队列优化异步秒杀下单
异步秒杀下单优化之 Redis Stream 消息队列
需求:
- 创建一个 Stream 类型的消息队列,名为 stream.orders
- 修改之前的秒杀下单 Lua 脚本,在认定有抢购资格后,直接向 stream.orders 中添加消息,内容包含 voucherId、userId、orderId
- 项目启动时,开启一个线程任务,尝试获取 stream.orders 中的消息,完成下单
创建消息队列
消息队列名为 stream.orders,消费者组名为 g1,无则创建
xgroup create stream.orders g1 0 MKSTREAM
在主入口处向消息队列发送资料
@Override
public Result<Long> seckillVoucher(Long voucherId) {
// 1. 查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2. 判断秒杀时间
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
return Result.error("秒杀尚未开始");
}
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
return Result.error("秒杀已结束");
}
// 3. 校验库存以及一人一单
Long orderId = redisIdWorker.nextId("order");
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(),UserHolder.getUser().getId().toString(),orderId.toString()
);
if (result != 0) {
return Result.error(result == 1 ? "库存不足" : "不能重复下单");
}
// 6. 获取代理对象
proxy = (IVoucherOrderService) AopContext.currentProxy();
// 7. 返回订单id
return Result.success(orderId);
}
Lua 脚本
检验库存是否充足以及一人一单(是否在优惠券 set 下)
1.参数列表
local voucherId = ARGV[1]
local userId = ARGV[2]
local orderId = ARGV[3]
-- 2.数据key
local stockKey = 'seckill:stock:' .. voucherId
local orderKey = 'seckill:order:' .. voucherId
-- 3.脚本业务
if(tonumber(redis.call('get', stockKey)) <= 0) then
return 1
end
if(redis.call('sismember', orderKey, userId) == 1) then
return 2
end
redis.call('incrby', stockKey, -1)
redis.call('sadd', orderKey, userId)
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
return 0
异步处理任务的线程任务类
- 从消息队列的分组中读取消息,阻塞 2 s还没读取到重新读取
- 有消息就解析并调用真正的下单函数处理
- 随后进行 ACK 处理
- 如果还没确认成功前出现问题,那么处理 pending-list,从 pending-list 中读取(也就是从 0 读没有 ack 的消息)并处理,直到读取空为止(也就是处理完)
private class VoucherOrderHandler implements Runnable {
String queueName = "streams.orders";
@Override
public void run() {
while (true) {
try {
// XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS streams.orders >
List<MapRecord<String, Object, Object> > list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)), StreamOffset.create(queueName, ReadOffset.lastConsumed())
);
if (list == null || list.isEmpty()) {
continue;
}
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
handleVoucherOrder(voucherOrder);
stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
} catch (Exception e) {
log.error("处理订单异常", e);
handlerPendingList();
}
}
}
VoucherOrderHandler 中处理 pending-list 的方法
private void handlerPendingList() {
while (true) {
try {
// XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS streams.orders >
List<MapRecord<String, Object, Object> > list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1),
StreamOffset.create(queueName, ReadOffset.from("0"))
);
if (list == null || list.isEmpty()) {
break;
}
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
handleVoucherOrder(voucherOrder);
stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
} catch (Exception e) {
log.error("处理 pending-list 异常", e);
try {
Thread.sleep(20);
} catch (Exception e1) {
e1.printStackTrace();
}
}
}
}