优惠券秒杀 全局唯一ID 每个店铺都可以发布优惠券。
当用户抢购时,就会生成订单并保存到tb_voucher_order这张表中,而订单表如果使用数据库自增ID就存在一些问题:
全局ID生成器,是一种在分布式系统 下用来生成全局唯一ID的工具 ,一般要满足下列特性:
ID组成部分 为了增加ID的安全性,我们可以不直接使用Redis自增的数值,而是拼接一些其它信息:
ID的组成部分:
符号位:1bit,永远为0
时间戳:31bit,以秒为单位,可以使用69年
序列号:32bit,秒内的计数器,支持每秒产生2^32个不同ID
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 package com.hmdp.utils;@Component public class RedisIdWorker { private static final long BEGIN_TIMESTAMP = 1640995200L ; private static final int COUNT_BITS = 32 ; @Resource private StringRedisTemplate stringRedisTemplate; public long nextId (String keyPrefix) { LocalDateTime now = LocalDateTime.now(); long nowSecond = now.toEpochSecond(ZoneOffset.UTC); long timestamp = nowSecond - BEGIN_TIMESTAMP; String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd" )); long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date); return timestamp << COUNT_BITS | count; } public static void main (String[] args) { LocalDateTime time = LocalDateTime.of(2022 , 1 , 1 , 0 , 0 , 0 ); long second = time.toEpochSecond(ZoneOffset.UTC); System.out.println("second = " + second); } }
使用测试方法进行测试:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 package com.hmdp;@SpringBootTest class HmDianPingApplicationTests { @Resource private RedisIdWorker redisIdWorker; private ExecutorService es = Executors.newFixedThreadPool(500 ); @Autowired private SpringUtil springUtil; @Test void testIdWorker () throws InterruptedException { CountDownLatch latch = new CountDownLatch (300 ); Runnable task = () -> { for (int i=0 ; i<100 ; i++){ long id = redisIdWorker.nextId("order" ); System.out.println("id = " + id); } latch.countDown(); }; long begin = System.currentTimeMillis(); for (int i=0 ; i<300 ; i++){ es.submit(task); } latch.await(); long end = System.currentTimeMillis(); System.out.println("time = " + (end - begin)); } }
总结 全局唯一ID生成策略:
UUID
Redis自增
snowflake算法
数据库自增
Redis自增ID策略:
每天一个key,方便统计订单量
ID构造是 时间戳 + 计数器
实现优惠券秒杀下单 优惠券 每个店铺都可以发布优惠券,分为平价券和特价券。平价券可以任意购买,而特价券需要秒杀抢购:
表关系如下:
tb_voucher:优惠券的基本信息,优惠金额、使用规则等。
tb_seckill_voucher:优惠券的库存、开始抢购时间,结束抢购时间。特价优惠券才需要填写这些信息。
添加秒杀优惠券 在VoucherController中提供了一个接口,可以添加秒杀优惠券:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 package com.hmdp.controller;@RestController @RequestMapping("/voucher") public class VoucherController { @Resource private IVoucherService voucherService; @PostMapping("seckill") public Result addSeckillVoucher (@RequestBody Voucher voucher) { voucherService.addSeckillVoucher(voucher); return Result.ok(voucher.getId()); } }
实现优惠券秒杀的下单功能 下单时需要判断两点:
秒杀是否开始或结束,如果尚未开始或已经结束则无法下单。
库存是否充足,不足则无法下单。
1.VoucherOrderController。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 package com.hmdp.controller;@RestController @RequestMapping("/voucher-order") public class VoucherOrderController { @Resource private IVoucherOrderService voucherOrderService; @PostMapping("seckill/{id}") public Result seckillVoucher (@PathVariable("id") Long voucherId) { return voucherOrderService.seckillVoucher(voucherId); } }
2.IVoucherOrderService。
1 2 3 4 5 package com.hmdp.service;public interface IVoucherOrderService extends IService <VoucherOrder> { Result seckillVoucher (Long voucherId) ; }
3.VoucherOrderServiceImpl。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 package com.hmdp.service.impl;@Service public class VoucherOrderServiceImpl extends ServiceImpl <VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService { @Resource private ISeckillVoucherService seckillVoucherService; @Autowired private RedisIdWorker redisIdWorker; @Override @Transactional public Result seckillVoucher (Long voucherId) { SeckillVoucher voucher = seckillVoucherService.getById(voucherId); if (voucher.getBeginTime().isAfter(LocalDateTime.now())){ return Result.fail("秒杀尚未开始!" ); } if (voucher.getEndTime().isBefore(LocalDateTime.now())){ return Result.fail("秒杀已经结束!" ); } if (voucher.getStock() < 1 ){ return Result.fail("库存不足" ); } Boolean success = seckillVoucherService.update() .setSql("stock = stock - 1" ) .eq("voucher_id" , voucherId).update(); if (!success){ return Result.fail("库存不足" ); } VoucherOrder voucherOrder = new VoucherOrder (); long orderId = redisIdWorker.nextId("order" ); voucherOrder.setId(orderId); long userId = UserHolder.getUser().getId(); voucherOrder.setUserId(userId); voucherOrder.setVoucherId(voucherId); save(voucherOrder); return Result.ok(orderId); } }
测试 在商店页面点击代金券的限时抢购按钮,抢购成功返回订单号。
超卖问题 超卖现象 1.正常情况下,不出现超卖。
2.多线程并发下,出现超卖现象。
分析:因为一开始有很多线程进来,查询库存时,库存是充足的,此时有线程扣减库存进行了更新,但是有很多线程在此之前已经查询到了旧的库存,导致不一致的情况。此后这些线程会在库存不足的情况下还进行扣减,导致出现了超卖现象。
测试 1.使用Jmeter进行测试,定义线程组,设置200个线程,执行时间为0s。
2.填写HTTP请求。
3.在登录状态头设置Authorization,然后运行。
4.运行结束后在查看结果树中查看运行的结果。
5.在数据库查看库存,可以发现库存是负数,订单数多于100,这是因为出现了超卖现象。
加锁 超卖问题是典型的多线程安全问题 ,针对这一问题的常见解决方案就是加锁 。
1.悲观锁
认为线程安全问题一定会发生,因此在操作数据之前先获取锁,确保线程串行执行。
例如Synchronized、Lock都属于悲观锁。悲观锁添加同步锁,让线程串行执行 。
优点:简单粗暴。
缺点:性能一般。
2.乐观锁
认为线程安全问题不一定会发生,因此不加锁,只是在更新数据时去判断有没有其它线程对数据做了修改。
如果没有修改则认为是安全的,自己才更新数据。
如果已经被其它线程修改说明发生了安全问题,此时可以重试或异常。
乐观锁不加锁,在更新时判断是否有其它线程在修改。
优点:性能好。
缺点:存在成功率低的问题。(当一下子涌入多个线程时,每个线程同时到达查询库存,此时多个线程中库存的值都是相同的,当有一个线程修改了库存,那么这些其他线程都无法成功扣减库存,导致成功率过低。)
乐观锁的关键是判断之前查询得到的数据是否有被修改过 ,常见的方式有两种:
(1)版本号法
(2)CAS法
使用CAS法 弊端:失败的概率大大增加。一开始有无数的线程涌进来,因为没加锁,线程全部并行运行。所以一堆线程查库存时会查到100个库存。加入100个线程都查到了库存为100,但是只会有一条线程执行扣减语句成功,此时库存为99,则剩下的99个线程因为stock查询时100,和现有库存99不一致,导致无法进行扣减,所以这99个线程都会失败。其实这些线程有些是能够进行扣减的,因此导致了失败率大大增加。
1 2 3 4 5 6 Boolean success = seckillVoucherService.update() .setSql("stock = stock - 1" ) .eq("voucher_id" , voucherId) .eq("stock" , voucher.getStock()) .update();
测试
1.使用Jmeter进行测试,定义线程组,设置200个线程,执行时间为0s。
2.填写HTTP请求。在登录状态头设置Authorization,然后运行(和上述超卖现象一样)。
3.运行结束后在查看结果树中查看运行的结果。
4.在数据库查看库存,可以发现库存大概在70-80之间,失败率较高。
只判断库存是否大于0 1 2 3 4 5 6 Boolean success = seckillVoucherService.update() .setSql("stock = stock - 1" ) .eq("voucher_id" , voucherId) .gt("stock" , 0 ) .update();
测试
和上述方法一样使用Jmeter,在数据库查看库存,可以看到刚好库存只剩0,说明没有出现超卖,也没有失败率高的情况。
一人一单 需求:修改秒杀业务,要求同一个优惠券,一个用户只能下一单。
一人一单的并发安全问题
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 @Override @Transactional public Result seckillVoucher (Long voucherId) { SeckillVoucher voucher = seckillVoucherService.getById(voucherId); if (voucher.getBeginTime().isAfter(LocalDateTime.now())){ return Result.fail("秒杀尚未开始!" ); } if (voucher.getEndTime().isBefore(LocalDateTime.now())){ return Result.fail("秒杀已经结束!" ); } if (voucher.getStock() < 1 ){ return Result.fail("库存不足" ); } Long userId = UserHolder.getUser().getId(); int count = query().eq("user_id" , userId).eq("voucher_id" , voucherId).count(); if (count > 0 ){ return Result.fail("用户已经购买一次!" ); } Boolean success = seckillVoucherService.update() .setSql("stock = stock - 1" ) .eq("voucher_id" , voucherId) .gt("stock" , 0 ) .update(); if (!success){ return Result.fail("库存不足" ); } VoucherOrder voucherOrder = new VoucherOrder (); long orderId = redisIdWorker.nextId("order" ); voucherOrder.setId(orderId); voucherOrder.setUserId(userId); voucherOrder.setVoucherId(voucherId); save(voucherOrder); return Result.ok(orderId); }
测试 和超卖问题一样使用Jmeter,在数据库查看库存,可以看到虽然同一个用户没有把所有库存都减为0,但是一个用户也扣减了10个库存,说明还是存在一人多单的问题。
分析:这是因为当一个用户的多个线程进入时,同时查询订单都不存在,此时这多个线程都会去扣减库存,直到建立订单之后的线程才会失败,因此出现一人多单的现象。
解决单点的一人一单并发安全问题
1.引入依赖。
1 2 3 4 <dependency > <groupId > org.aspectj</groupId > <artifactId > aspectjweaver</artifactId > </dependency >
2.IVoucherOrderService。
1 2 3 4 5 6 7 package com.hmdp.service;public interface IVoucherOrderService extends IService <VoucherOrder> { Result seckillVoucher (Long voucherId) ; Result createVoucherOrder (Long voucherId) ; }
3.VoucherOrderServiceImpl。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 package com.hmdp.service.impl;@Service public class VoucherOrderServiceImpl extends ServiceImpl <VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService { @Resource private ISeckillVoucherService seckillVoucherService; @Autowired private RedisIdWorker redisIdWorker; @Override public Result seckillVoucher (Long voucherId) { SeckillVoucher voucher = seckillVoucherService.getById(voucherId); if (voucher.getBeginTime().isAfter(LocalDateTime.now())){ return Result.fail("秒杀尚未开始!" ); } if (voucher.getEndTime().isBefore(LocalDateTime.now())){ return Result.fail("秒杀已经结束!" ); } if (voucher.getStock() < 1 ){ return Result.fail("库存不足" ); } Long userId = UserHolder.getUser().getId(); synchronized (userId.toString().intern()){ IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); return proxy.createVoucherOrder(voucherId); } } @Transactional public Result createVoucherOrder (Long voucherId) { Long userId = UserHolder.getUser().getId(); int count = query().eq("user_id" , userId).eq("voucher_id" , voucherId).count(); if (count > 0 ){ return Result.fail("用户已经购买一次!" ); } Boolean success = seckillVoucherService.update() .setSql("stock = stock - 1" ) .eq("voucher_id" , voucherId) .gt("stock" , 0 ) .update(); if (!success){ return Result.fail("库存不足" ); } VoucherOrder voucherOrder = new VoucherOrder (); long orderId = redisIdWorker.nextId("order" ); voucherOrder.setId(orderId); voucherOrder.setUserId(userId); voucherOrder.setVoucherId(voucherId); save(voucherOrder); return Result.ok(orderId); } }
4.启动类HmDianPingApplication暴露代理对象。
1 2 3 4 5 6 7 8 9 10 11 12 package com.hmdp;@EnableAspectJAutoProxy(exposeProxy = true) @MapperScan("com.hmdp.mapper") @SpringBootApplication public class HmDianPingApplication { public static void main (String[] args) { SpringApplication.run(HmDianPingApplication.class, args); } }
测试 和超卖问题一样使用Jmeter,在数据库查看库存,可以看到同一个用户只能下单一次,解决了一人一旦问题。
注意:
参考链接:Spring事务失效的场景
spring 事务失效的 12 种场景_spring 截获duplicatekeyexception 不抛异常-CSDN博客
多台JVM下并发安全问题
测试 通过加锁可以解决在单机情况下的一人一单安全问题,但是在集群模式下就不行了。
我们将服务启动两份,端口分别为8081和8082:
然后修改nginx的conf目录下的nginx.conf文件,配置反向代理和负载均衡:
修改如下两个地方:
1 2 3 4 5 6 7 8 9 10 11 12 http { location /api { #proxy_pass http://127.0.0.1:8081; proxy_pass http://backend; } } upstream backend { server 127.0.0.1:8081 max_fails=5 fail_timeout=10s weight=1; server 127.0.0.1:8082 max_fails=5 fail_timeout=10s weight=1; } }
改完之后重新加载:(如果还不行就重启nginx和重启电脑)
1 2 3 D:\software\nginx-1.18.0-dianping>nginx.exe -s reload D:\software\nginx-1.18.0-dianping>
现在,用户请求会在这两个节点上负载均衡,再次测试下是否存在线程安全问题。
测试:
通过网页访问http://localhost:8080/api/voucher/list/1,可以发现两个节点上都会出现查询请求。
使用Jmeter进行测试,查看数据库发现一个人有多个订单。因此,在集群模式下,锁没有锁住,出现了一人多单的情况。
原因分析:因为多集群下,用户的多个请求可能会被发送到不同的JVM下执行,而不同JVM下的锁监视器是不相同的,因此一个用户可以获得多个锁,从而导致一个用户有多个请求成功,出现一人多单。
分布式锁
分布式锁 :满足分布式系统或集群模式下多进程可见并且互斥的锁 。
分布式锁的核心是实现多进程之间互斥 ,而满足这一点的方式有很多,常见的有三种:
MySQL
Redis
Zookeeper
互斥
利用mysql本身的互斥锁机制 例如:业务执行前去mysql申请互斥锁,然后执行业务,当业务执行完以后提交事务,这时锁释放,当业务抛出异常时,会自动触发回滚,锁就释放了。
利用setnx这样的互斥命令 实现互斥:数据不存在时才能set成功,数据存在时set失败。删除key即可释放锁。
利用节点的唯一性和有序性实现互斥 唯一性:创建节点时,节点不能重复 有序性:每创建一个节点,节点的id是递增的(约定id最小的获取锁成功,由此来实现互斥。释放锁时把节点删除即可。)
高可用
好(mysql支持主从模式)
好(支持主从、集群模式)
好(支持集群)
高性能
一般(受限于mysql的性能)
好
一般(Zookeeper集群强调强一致性,会导致主从之间数据同步消耗时间,性能会比Redis差一些)
安全性
断开连接,自动释放锁
利用锁超时时间,到期释放(分析:服务出现故障,锁不能够自动释放,没有人执行删除动作时,锁会一直在那里得不到释放,其他进程得到锁会产生死锁的现象)
临时节点,断开连接自动释放
基于Redis的分布式锁 实现分布式锁时需要实现的两个基本方法:
1.获取锁:
互斥:确保只能有一个线程获取锁。
非阻塞:尝试一次,成功返回true,失败返回false。
1 2 3 4 # 添加锁,利用setnx的互斥特性 SETNX lock thread1 # 添加锁过期时间,避免服务宕机引起的死锁 EXPIRE lock 10
1 2 # 添加锁,NX是互斥、EX是设置超时时间 SET lock thread1 NX EX 10
2.释放锁:
版本一:基于Redis实现分布式锁初级版本
需求:定义一个类,实现下面接口,利用Redis实现分布式锁功能。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 package com.hmdp.utils;public interface ILock { boolean tryLock (long timeoutSec) ; void unlock () ; }
SimpleRedisLock。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 package com.hmdp.utils;public class SimpleRedisLock implements ILock { private String name; private StringRedisTemplate stringRedisTemplate; public SimpleRedisLock (String name, StringRedisTemplate stringRedisTemplate) { this .name = name; this .stringRedisTemplate = stringRedisTemplate; } private static final String KEY_PREFIX = "lock:" ; @Override public boolean tryLock (long timeoutSec) { long threadId = Thread.currentThread().getId(); Boolean success = stringRedisTemplate.opsForValue() .setIfAbsent(KEY_PREFIX + name, threadId + "" , timeoutSec, TimeUnit.SECONDS); return Boolean.TRUE.equals(success); } @Override public void unlock () { stringRedisTemplate.delete(KEY_PREFIX + name); } }
VoucherOrderServiceImpl。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 @Autowired private StringRedisTemplate stringRedisTemplate;@Override public Result seckillVoucher (Long voucherId) { SeckillVoucher voucher = seckillVoucherService.getById(voucherId); if (voucher.getBeginTime().isAfter(LocalDateTime.now())){ return Result.fail("秒杀尚未开始!" ); } if (voucher.getEndTime().isBefore(LocalDateTime.now())){ return Result.fail("秒杀已经结束!" ); } if (voucher.getStock() < 1 ){ return Result.fail("库存不足" ); } Long userId = UserHolder.getUser().getId(); SimpleRedisLock lock = new SimpleRedisLock ("order:" + userId, stringRedisTemplate); boolean isLock = lock.tryLock(1200 ); if (!isLock){ return Result.fail("不允许重复下单" ); } try { IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); return proxy.createVoucherOrder(voucherId); }finally { lock.unlock(); } }
测试:
1.在Postman中准备如下两个一样的请求http://localhost:8080/api/voucher-order/seckill/11,Header携带有效的authorization。必须保证两个相同,才能是同一用户。
2.在 VoucherOrderServiceImpl的seckillVoucher方法的判断是否获取锁成功if(!isLock)处打上断点。
3.运行两个程序,分别在8081和8082端口。同时运行Postman的两个相同的请求,分别进入这两个不同的端口后台进行执行。
4.可以发现,只有其中一个程序获取到了锁,另一个程序没有获取到锁。这时一个用户只能下一个订单,实现了分布式锁。
版本二:一个线程释放另一个线程的锁 问题分析:当线程1获取锁之后发生业务阻塞,阻塞期间触发超时释放锁。此时线程2可以获取锁,并执行业务。在线程2执行业务的过程中,线程1阻塞结束完成业务,并释放了线程2的锁,因此出现了一个释放另一个线程的锁的问题。
解决一个线程释放另一个线程的锁的问题:当释放锁的时候进行判断,判断当前要释放的锁和该线程的锁是否一致,如果一致才能进行释放。
流程图:
需求:修改之前的分布式锁实现,满足:
在获取锁时存入线程标示(可以用UUID表示)。注意:因为不同JVM的线程都是从1开始递增,所以不同JVM的线程有可能出现线程id一样的情况,所以需要用UUID来标识。
在释放锁时先获取锁中的线程标示,判断是否与当前线程标示一致。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 package com.hmdp.utils;public class SimpleRedisLock implements ILock { private String name; private StringRedisTemplate stringRedisTemplate; public SimpleRedisLock (String name, StringRedisTemplate stringRedisTemplate) { this .name = name; this .stringRedisTemplate = stringRedisTemplate; } private static final String KEY_PREFIX = "lock:" ; private static final String ID_PREFIX = UUID.randomUUID().toString(true ) + "-" ; @Override public boolean tryLock (long timeoutSec) { String threadId = ID_PREFIX + Thread.currentThread().getId(); Boolean success = stringRedisTemplate.opsForValue() .setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS); return Boolean.TRUE.equals(success); } @Override public void unlock () { String threadId = ID_PREFIX + Thread.currentThread().getId(); String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name); if (threadId.equals(id)){ stringRedisTemplate.delete(KEY_PREFIX + name); } } }
测试:
1.和上述一样,在Postman中准备如下两个一样的请求http://localhost:8080/api/voucher-order/seckill/11,Header携带有效的authorization。必须保证两个相同,才能是同一用户。
2.和上述一样,在 VoucherOrderServiceImpl的seckillVoucher方法的判断是否获取锁成功if(!isLock)处打上断点。
3.首先发送Postman的一个请求1,程序会进入其中一个端口1执行,这时进入断点,可以发现获取锁成功。
然后去Redis中可以发现该锁的已存在,接下来删除该锁,模拟线程阻塞。
之后发送Postman的另一条同样的请求2,程序会进入另外一个端口2执行,这时进入断点,因为之前的锁已经被删除,此时也能够获取锁成功。
此时回到请求1继续执行,释放锁的时候会判断是否是请求1的锁,不是则直接退出。
回到请求2继续执行,此时判断锁是请求2的锁,会执行释放锁的操作才会退出。
4.总结:上述执行流程保证了请求1无法释放请求2的锁,只有请求2才能释放请求2的锁,保证了释放锁的正确性。
版本三:判断锁标识是否一致和释放锁不同步执行
Redis的Lua脚本
Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。Lua是一种编程语言,它的基本语法大家可以参考网站:https://www.runoob.com/lua/lua-tutorial.html
Redis提供的调用函数,语法如下:
1 2 # 执行redis命令 redis.call('命令名称' , 'key' , '其它参数' , ...)
例如,我们要执行set name jack,则脚本是这样:
1 2 # 执行 set name jack redis.call('set' , 'name' , 'jack' )
例如,我们要先执行set name Rose,再执行get name,则脚本如下:
1 2 3 4 5 6 # 先执行 set name jack redis.call('set' , 'name' , 'jack' ) # 再执行 get namelocal name = redis.call('get' , 'name' ) # 返回return name
写好脚本以后,需要用Redis命令来调用脚本,调用脚本的常见命令如下:
1 2 3 4 5 127 .0 .0 .1 :6379 > help @scripting EVAL script numkeys key [key ...] arg [arg ...] summary: Execute a Lua script server side since : 2.6.0
例如,我们要执行redis.call('set', 'name', 'jack')这个脚本,语法如下:
如果脚本中的key、value不想写死,可以作为参数传递。key类型参数会放入KEYS数组,其它参数会放入ARGV数组,在脚本中可以从KEYS和ARGV数组获取这些参数:
释放锁的业务流程是这样的:
获取锁中的线程标示。
判断是否与指定的标示(当前线程标示)一致。
如果一致则释放锁(删除)。
如果不一致则什么都不做。
如果用Lua脚本来表示则是这样的:
1 2 3 4 5 6 7 8 if (redis.call('GET' , KEYS[1 ]) == ARGV[1 ]) then return redis.call('DEL' , KEYS[1 ])end return 0
需求:基于Lua脚本实现分布式锁的释放锁逻辑
提示:RedisTemplate调用Lua脚本的API如下:
1.新建unlock.lua脚本,位置在:src/main/resources/unlock.lua。
1 2 3 4 5 6 7 8 if (redis.call('GET' , KEYS[1 ]) == ARGV[1 ]) then return redis.call('DEL' , KEYS[1 ])end return 0
2.修改锁的实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 package com.hmdp.utils;public class SimpleRedisLock implements ILock { private String name; private StringRedisTemplate stringRedisTemplate; public SimpleRedisLock (String name, StringRedisTemplate stringRedisTemplate) { this .name = name; this .stringRedisTemplate = stringRedisTemplate; } private static final String KEY_PREFIX = "lock:" ; private static final String ID_PREFIX = UUID.randomUUID().toString(true ) + "-" ; private static final DefaultRedisScript<Long> UNLOCK_SCRIPT; static { UNLOCK_SCRIPT = new DefaultRedisScript <>(); UNLOCK_SCRIPT.setLocation(new ClassPathResource ("unlock.lua" )); UNLOCK_SCRIPT.setResultType(Long.class); } @Override public boolean tryLock (long timeoutSec) { String threadId = ID_PREFIX + Thread.currentThread().getId(); Boolean success = stringRedisTemplate.opsForValue() .setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS); return Boolean.TRUE.equals(success); } @Override public void unlock () { stringRedisTemplate.execute( UNLOCK_SCRIPT, Collections.singletonList(KEY_PREFIX + name), ID_PREFIX + Thread.currentThread().getId() ); } }
总结 基于Redis的分布式锁实现思路:
利用set nx ex获取锁,并设置过期时间,保存线程标示。
释放锁时先判断线程标示是否与自己一致,一致则删除锁。
特性:
利用set nx满足互斥性。
利用set ex保证故障时锁依然能释放,避免死锁,提高安全性。
利用Redis集群保证高可用和高并发特性。
基于setnx实现的分布式锁存在问题 1.不可重入:同一个线程无法多次获取同一把锁。
2.不可重试:获取锁只尝试一次就返回false,没有重试机制。
3.超时释放:锁超时释放虽然可以避免死锁,但如果是业务执行耗时较长,也会导致锁释放,存在安全隐患。
4.主从一致性:如果Redis提供了主从集群,主从同步存在延迟,当主宕机时,如果从并同步主中的锁数据,则会出现锁实现。
Redisson Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现 。
官网地址: https://redisson.org
GitHub地址: https://github.com/redisson/redisson
Redisson入门 1.引入依赖。
1 2 3 4 5 6 <dependency > <groupId > org.redisson</groupId > <artifactId > redisson</artifactId > <version > 3.40.0</version > </dependency >
2.配置Redisson客户端。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 package com.hmdp.config;@Configuration public class RedissonConfig { @Bean public RedissonClient redissonClient () { Config config = new Config (); config.useSingleServer().setAddress("redis://127.0.0.1:6379" ); return Redisson.create(config); } }
3.使用Redisson的分布式锁。(这里只是测试)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @Resource private RedissonClient redissonClient;@Test void testRedisson () throws InterruptedException { RLock lock = redissonClient.getLock("anyLock" ); boolean isLock = lock.tryLock(1 , 10 , TimeUnit.SECONDS); if (isLock){ try { System.out.println("执行业务" ); }finally { lock.unlock(); } } }
4.在VoucherOrderServiceImpl中修改seckillVoucher方法,只需要将获取锁的方式修改,将创建锁对象的方式修改即可:
1 2 3 RLock lock = redissonClient.getLock("lock:order:" + userId);
完整代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 @Resource private RedissonClient redissonClient;@Override public Result seckillVoucher (Long voucherId) { SeckillVoucher voucher = seckillVoucherService.getById(voucherId); if (voucher.getBeginTime().isAfter(LocalDateTime.now())){ return Result.fail("秒杀尚未开始!" ); } if (voucher.getEndTime().isBefore(LocalDateTime.now())){ return Result.fail("秒杀已经结束!" ); } if (voucher.getStock() < 1 ){ return Result.fail("库存不足" ); } Long userId = UserHolder.getUser().getId(); RLock lock = redissonClient.getLock("lock:order:" + userId); boolean isLock = lock.tryLock(); if (!isLock){ return Result.fail("不允许重复下单" ); } try { IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); return proxy.createVoucherOrder(voucherId); }finally { lock.unlock(); } }
测试: 和之前一样,使用Jmeter进行测试,同一个用户200个线程,查看数据库发现一个用户只能下一单,库存只减少了1,所以分布式锁设置成功。
Redisson可重入锁原理 1.出现死锁的原因:当线程1获取锁之后,调用线程2,线程2也需要获得锁,然而此时锁被线程1获得无法释放,线程2等待线程1执行完毕释放锁,线程1等到线程2执行完毕,造成互相等待的情况,这就是死锁。
2.Redisson可重入锁原理:使用哈希表来存储锁,哈希表的value存储获得锁的线程的个数。每当一个线程获得锁都进行加一操作,每当一个线程释放锁都进行减一操作。当value值为0时,即代表没有线程获得锁,则可以进行释放锁的操作。
3.lua脚本。
(1)获取锁的Lua脚本。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 local key = KEYS[1 ]; local threadId = ARGV[1 ]; local releaseTime = ARGV[2 ]; if (redis.call('exists' , key) == 0 ) then redis.call('hset' , key, threadId, '1' ); redis.call('expire' , key, releaseTime); return 1 ; end ;if (redis.call('hexists' , key, threadId) == 1 ) then redis.call('hincrby' , key, threadId, '1' ); redis.call('expire' , key, releaseTime); return 1 ; end ;return 0 ;
(2)释放锁的Lua脚本。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 local key = KEYS[1 ]; local threadId = ARGV[1 ]; local releaseTime = ARGV[2 ]; if (redis.call('HEXISTS' , key, threadId) == 0 ) then return nil ; end ;local count = redis.call('HINCRBY' , key, threadId, -1 );if (count > 0 ) then redis.call('EXPIRE' , key, releaseTime); return nil ;else redis.call('DEL' , key); return nil ;end ;
4.测试Redisson可重入锁代码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 package com.hmdp;@Slf4j @SpringBootTest class RedissonTest { @Resource private RedissonClient redissonClient; private RLock lock; @BeforeEach void setUp () { lock = redissonClient.getLock("order" ); } @Test void method1 () throws InterruptedException { boolean isLock = lock.tryLock(); if (!isLock) { log.error("获取锁失败 .... 1" ); return ; } try { log.info("获取锁成功 .... 1" ); method2(); log.info("开始执行业务 ... 1" ); } finally { log.warn("准备释放锁 .... 1" ); lock.unlock(); } } void method2 () { boolean isLock = lock.tryLock(); if (!isLock) { log.error("获取锁失败 .... 2" ); return ; } try { log.info("获取锁成功 .... 2" ); log.info("开始执行业务 ... 2" ); } finally { log.warn("准备释放锁 .... 2" ); lock.unlock(); } } }
测试: 在method1和method2两个方法的tryLock方法处打上断点,观察Redis中锁的value值变化。method1获取锁时value为1,method2获取锁时value从1变成2,method2释放锁时从2变成1,method1释放锁时value从1变为0,释放掉。
Redisson分布式锁原理
Redisson分布式锁原理:
可重入:利用hash结构记录线程id和重入次数。
可重试:利用信号量和PubSub功能实现等待、唤醒,获取锁失败的重试机制。
超时续约:利用watchDog,每隔一段时间(releaseTime / 3),重置超时时间。
Redisson分布式锁主从一致性问题 1.主从一致性问题:获取锁之后,主节点发生故障,此时还没有进行主从同步,当从节点变为主节点时,找不到锁,就会发生锁失效,这就是主从一致性问题。
2.Redisson分布式锁主从一致性问题。
多个独立的Redis节点,必须在所有节点都获取重入锁,才算获取锁成功。
3.创建多个Redis集群:复制多个Redis文件夹,修改每个Redis文件夹下的redis.windows.conf配置不同端口。这里我使用了三个Redis,端口分别为:6379、6380、6381。
修改RedissonConfig文件,配置不同端口的多个集群。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 package com.hmdp.config;@Configuration public class RedissonConfig { @Bean public RedissonClient redissonClient () { Config config = new Config (); config.useSingleServer().setAddress("redis://127.0.0.1:6379" ); return Redisson.create(config); } @Bean public RedissonClient redissonClient2 () { Config config = new Config (); config.useSingleServer().setAddress("redis://127.0.0.1:6380" ); return Redisson.create(config); } @Bean public RedissonClient redissonClient3 () { Config config = new Config (); config.useSingleServer().setAddress("redis://127.0.0.1:6381" ); return Redisson.create(config); } }
在RedissonTest进行联锁测试。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 package com.hmdp;@Slf4j @SpringBootTest class RedissonTest { @Resource private RedissonClient redissonClient; @Resource private RedissonClient redissonClient2; @Resource private RedissonClient redissonClient3; private RLock lock; @BeforeEach void setUp () { RLock lock1 = redissonClient.getLock("order" ); RLock lock2 = redissonClient2.getLock("order" ); RLock lock3 = redissonClient3.getLock("order" ); lock = redissonClient.getMultiLock(lock1, lock2, lock3); } @Test void method1 () throws InterruptedException { boolean isLock = lock.tryLock(); if (!isLock) { log.error("获取锁失败 .... 1" ); return ; } try { log.info("获取锁成功 .... 1" ); method2(); log.info("开始执行业务 ... 1" ); } finally { log.warn("准备释放锁 .... 1" ); lock.unlock(); } } void method2 () { boolean isLock = lock.tryLock(); if (!isLock) { log.error("获取锁失败 .... 2" ); return ; } try { log.info("获取锁成功 .... 2" ); log.info("开始执行业务 ... 2" ); } finally { log.warn("准备释放锁 .... 2" ); lock.unlock(); } } }
测试: 和之前可重入锁原理时的测试一样,这次观察三个Redis中锁的值的变化。可以发现在三个Redis节点上,所有的value值是统一变化的:在method1和method2两个方法的tryLock方法处打上断点,观察Redis中锁的value值变化。method1获取锁时value为1,method2获取锁时value从1变成2,method2释放锁时从2变成1,method1释放锁时value从1变为0,释放掉。
总结 (1)不可重入Redis分布式锁
原理:利用setnx的互斥性;利用ex避免死锁;释放锁时判断线程标示。
缺陷:不可重入、无法重试、锁超时失效。
(2)可重入的Redis分布式锁
原理:利用hash结构,记录线程标示和重入次数;利用watchDog延续锁时间;利用信号量控制锁重试等待。
缺陷:redis宕机引起锁失效问题。
(3)Redisson的multiLock
原理:多个独立的Redis节点,必须在所有节点都获取重入锁,才算获取锁成功。
缺陷:运维成本高、实现复杂。
Redis优化秒杀 Redis优化秒杀思路:
Redis优化秒杀流程图:
案例 1.新增秒杀优惠券的同时,将优惠券信息保存到Redis中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 package com.hmdp.service.impl;@Service public class VoucherServiceImpl extends ServiceImpl <VoucherMapper, Voucher> implements IVoucherService { @Resource private ISeckillVoucherService seckillVoucherService; @Autowired private StringRedisTemplate stringRedisTemplate; @Override @Transactional public void addSeckillVoucher (Voucher voucher) { save(voucher); SeckillVoucher seckillVoucher = new SeckillVoucher (); seckillVoucher.setVoucherId(voucher.getId()); seckillVoucher.setStock(voucher.getStock()); seckillVoucher.setBeginTime(voucher.getBeginTime()); seckillVoucher.setEndTime(voucher.getEndTime()); seckillVoucherService.save(seckillVoucher); stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString()); } }
2.基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功。
seckill.lua:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 local voucherId = ARGV[1 ]local userId = ARGV[2 ]local stockKey = 'seckill:stock:' .. voucherIdlocal orderKey = 'seckill:order:' .. voucherIdif (tonumber (redis.call('get' , stockKey)) <= 0 ) then return 1 end if (redis.call('sismember' , orderKey, userId) == 1 ) then return 2 end redis.call('incrby' , stockKey, -1 ) redis.call('sadd' , orderKey, userId)return 0
3.如果抢购成功,将优惠券id和用户id封装后存入阻塞队列。
4.开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 package com.hmdp.service.impl;@Slf4j @Service public class VoucherOrderServiceImpl extends ServiceImpl <VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService { @Resource private ISeckillVoucherService seckillVoucherService; @Autowired private RedisIdWorker redisIdWorker; @Autowired private StringRedisTemplate stringRedisTemplate; @Resource private RedissonClient redissonClient; private static final DefaultRedisScript<Long> SECKILL_SCRIPT; static { SECKILL_SCRIPT = new DefaultRedisScript <>(); SECKILL_SCRIPT.setLocation(new ClassPathResource ("seckill.lua" )); SECKILL_SCRIPT.setResultType(Long.class); } private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue <>(1024 *1024 ); private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor(); @PostConstruct private void init () { SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler ()); } private class VoucherOrderHandler implements Runnable { @Override public void run () { while (true ) { try { VoucherOrder voucherOrder = orderTasks.take(); handleVoucherOrder(voucherOrder); } catch (Exception e) { e.printStackTrace(); log.error("处理订单异常" , e); } } } } private void handleVoucherOrder (VoucherOrder voucherOrder) { Long userId = voucherOrder.getUserId(); RLock lock = redissonClient.getLock("lock:order:" + userId); boolean isLock = lock.tryLock(); if (!isLock){ log.error(("不允许重复下单" )); return ; } try { proxy.createVoucherOrder(voucherOrder); }finally { lock.unlock(); } } private IVoucherOrderService proxy; @Override public Result seckillVoucher (Long voucherId) { Long userId = UserHolder.getUser().getId(); Long result = stringRedisTemplate.execute( SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), userId.toString() ); int r = result.intValue(); if (r != 0 ){ return Result.fail(r == 1 ? "库存不足" : "不能重复下单" ); } VoucherOrder voucherOrder = new VoucherOrder (); long orderId = redisIdWorker.nextId("order" ); voucherOrder.setId(orderId); voucherOrder.setUserId(userId); voucherOrder.setVoucherId(voucherId); orderTasks.add(voucherOrder); proxy = (IVoucherOrderService) AopContext.currentProxy(); return Result.ok(orderId); } @Transactional public void createVoucherOrder (VoucherOrder voucherOrder) { Long userId = voucherOrder.getUserId(); int count = query().eq("user_id" , userId).eq("voucher_id" , voucherOrder.getVoucherId()).count(); if (count > 0 ){ log.error("用户已经购买过一次" ); return ; } Boolean success = seckillVoucherService.update() .setSql("stock = stock - 1" ) .eq("voucher_id" , voucherOrder.getVoucherId()) .gt("stock" , 0 ) .update(); if (!success){ log.error("库存不足" ); return ; } save(voucherOrder); } }
测试 在Postman中准备如下两个一样的请求http://localhost:8080/api/voucher-order/seckill/12,Header携带两个不同的有效的authorization。
发送请求,第一次返回订单号,后续请求返回不能重复下单,即为测试成功。
也有使用Jmeter进行高并发测试,但我没有进行测试。
Redis消息队列实现异步秒杀 消息队列(Message Queue),字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色:
消息队列:存储和管理消息,也被称为消息代理(Message Broker)。
生产者:发送消息到消息队列。
消费者:从消息队列获取消息并处理消息。
Redis提供了三种不同的方式来实现消息队列:
list结构:基于List结构模拟消息队列。
PubSub:基本的点对点消息模型。
Stream:比较完善的消息队列模型。
基于List结构模拟消息队列 消息队列(Message Queue),字面意思就是存放消息的队列。而Redis的list数据结构是一个双向链表,很容易模拟出队列效果。
队列是入口和出口不在一边,因此我们可以利用:LPUSH结合RPOP、或者RPUSH结合LPOP来实现。
不过要注意的是,当队列中没有消息时RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息。因此这里应该使用BRPOP或者BLPOP来实现阻塞效果。
优点:
利用Redis存储,不受限于JVM内存上限。
基于Redis的持久化机制,数据安全性有保证。
可以满足消息有序性。
缺点:
无法避免消息丢失。(消费者取到消息后没有处理就挂掉了,这条数据就丢失了。)
只支持单消费者。(消息一旦被一个消费者取走,则从队列里移除,其他消费者无法取到。)
基于PubSub的消息队列 PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。
SUBSCRIBE channel [channel]:订阅一个或多个频道。
PUBLISH channel msg:向一个频道发送消息。
PSUBSCRIBE pattern[pattern]:订阅与pattern格式匹配的所有频道。
优点:
缺点:
不支持数据持久化。(List结构本质不是消息队列,就是一个链表用于数据存储的,是当成消息队列来用了。Redis中用于存储的数据都支持持久化。而PubSub本身设计出来就是用于消息发送的,当发送一条消息时,这条消息所在的频道没有被订阅,则这条消息就丢失了。发出的所有消息都不会在Redis中保存。)
无法避免消息丢失。
消息堆积有上限,超出时数据丢失。(消费者缓存空间是有上限的。)
基于Stream的消息队列 Stream是Redis 5.0引入的一种新数据类型,可以实现一个功能非常完善的消息队列。
发送消息的命令:
例如:
1 2 3 ## 创建名为 users 的队列,并向其中发送一个消息,内容是:{name=jack,age=21},并且使用Redis自动生成ID 127.0.0.1:6379> XADD users * name jack age 21 "1644805700523-0"
XREAD 读取消息的方式之一:XREAD
例如,使用XREAD读取第一个消息:
1 2 3 4 5 6 7 127.0.0.1:6379[1]> XREAD COUNT 1 STREAMS users 0 1) 1) "users" 2) 1) 1) "1734487324128-0" 2) 1) "name" 2) "jack" 3) "age" 4) "21"
XREAD阻塞方式,读取最新的消息:
1 2 3 4 ## 指定起始ID为$时,代表读取最新的消息 127.0.0.1:6379[1]> XREAD COUNT 1 BLOCK 1000 STREAMS users $ (nil) (1.10s)
在业务开发中,我们可以循环的调用XREAD阻塞方式来查询最新消息,从而实现持续监听队列的效果,伪代码如下:
注意:当我们指定起始ID为$时,代表读取最新的消息,如果我们处理一条消息的过程中,又有超过1条以上的消息到达队列,则下次获取时也只能获取到最新的一条,会出现漏读消息的问题。
STREAM类型消息队列的XREAD命令特点:
消息可回溯。(消息读完后不丢失,永久保存在队列中。可以随时读取)
一个消息可以被多个消费者读取。
可以阻塞读取。
有消息漏读的风险。
消费者组 消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备下列特点:
消息分流 :队列中的消息会分流给组内的不同消费者,而不是重复消费,从而加快消息处理的速度。
消息标示 :消费者组会维护一个标示,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标示之后读取消息。确保每一个消息都会被消费。
消息确认 :消费者获取消息后,消息处于pending状态,并存入一个pending-list。当处理完成后需要通过XACK来确认消息,标记消息为已处理,才会从pending-list移除。
(1)创建消费者组:XGROUP CREATE key groupName ID [MKSTREAM]
key:队列名称。
groupName:消费者组名称。
ID:起始ID标示,$代表队列中最后一个消息,0则代表队列中第一个消息。
MKSTREAM:队列不存在时自动创建队列。
(2)其它常见命令:
1 2 3 4 5 6 7 8 # 删除指定的消费者组 XGROUP DESTORY key groupName # 给指定的消费者组添加消费者 XGROUP CREATECONSUMER key groupname consumername # 删除消费者组中的指定消费者 XGROUP DELCONSUMER key groupname consumername
(3)从消费者组读取消息:
1 XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
group:消费组名称。
consumer:消费者名称,如果消费者不存在,会自动创建一个消费者。
count:本次查询的最大数量。
BLOCK milliseconds:当没有消息时最长等待时间。
NOACK:无需手动ACK,获取到消息后自动确认。
STREAMS key:指定队列名称。
ID:获取消息的起始ID:
(4)消费者监听消息的基本思路:
使用实例:
1.创建消息队列。
1 2 3 4 5 6 7 8 > xadd s1 * k1 v1 1734511630654-0 > xadd s1 * k2 v2 1734511640519-0 > xadd s1 * k3 v3 1734511906934-0 > XGROUP CREATE s1 g1 0 OK
2.创建消费者组,组内有两个消费者读取消息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 127.0.0.1:6379 > select 1 OK127.0.0.1:6379 [1] > xreadgroup group g1 c1 count 1 block 2000 streams s1 >1 ) 1 ) "s1" 2 ) 1 ) 1 ) "1734511630654-0" 2 ) 1 ) "k1" 2 ) "v1" 127.0.0.1:6379 [1] > xreadgroup group g1 c2 count 1 block 2000 streams s1 >1 ) 1 ) "s1" 2 ) 1 ) 1 ) "1734511640519-0" 2 ) 1 ) "k2" 2 ) "v2" 127.0.0.1:6379 [1] > xack s1 g1 1734511630654 -0 (integer) 1 127.0.0.1:6379 [1] > xpending s1 g1 - + 10 1 ) 1 ) "1734511640519-0" 2 ) "c2" 3 ) (integer) 61958 4 ) (integer) 1 127.0.0.1:6379 [1] > xreadgroup group g1 c1 count 1 block 2000 streams s1 0 1 ) 1 ) "s1" 2 ) (empty array)127.0.0.1:6379 [1] > xreadgroup group g1 c2 count 1 block 2000 streams s1 0 1 ) 1 ) "s1" 2 ) 1 ) 1 ) "1734511640519-0" 2 ) 1 ) "k2" 2 ) "v2" 127.0.0.1:6379 [1] > xack s1 g1 1734511640519 -0 (integer) 1 127.0.0.1:6379 [1] > xpending s1 g1 - + 10 (empty array)
STREAM类型消息队列的XREADGROUP命令特点:
消息可回溯。
可以多消费者争抢消息,加快消费速度。
可以阻塞读取。
没有消息漏读的风险。
有消息确认机制,保证消息至少被消费一次。
案例 基于Redis的Stream结构作为消息队列,实现异步秒杀下单。
需求:
1.创建一个Stream类型的消息队列,名为stream.orders。
1 2 > xgroup create stream.orders g1 0 mkstream OK
2.修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包含voucherId、userId、orderId。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 local voucherId = ARGV[1 ]local userId = ARGV[2 ]local orderId = ARGV[3 ]local stockKey = 'seckill:stock:' .. voucherIdlocal orderKey = 'seckill:order:' .. voucherIdif (tonumber (redis.call('get' , stockKey)) <= 0 ) then return 1 end if (redis.call('sismember' , orderKey, userId) == 1 ) then return 2 end redis.call('incrby' , stockKey, -1 ) redis.call('sadd' , orderKey, userId) redis.call('xadd' , 'stream.orders' , '*' , 'userId' , userId, 'voucherId' , voucherId, 'id' , orderId)return 0
3.项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 package com.hmdp.service.impl;@Slf4j @Service public class VoucherOrderServiceImpl extends ServiceImpl <VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService { @Resource private ISeckillVoucherService seckillVoucherService; @Autowired private RedisIdWorker redisIdWorker; @Autowired private StringRedisTemplate stringRedisTemplate; @Resource private RedissonClient redissonClient; private static final DefaultRedisScript<Long> SECKILL_SCRIPT; static { SECKILL_SCRIPT = new DefaultRedisScript <>(); SECKILL_SCRIPT.setLocation(new ClassPathResource ("seckill.lua" )); SECKILL_SCRIPT.setResultType(Long.class); } private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor(); @PostConstruct private void init () { SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler ()); } private class VoucherOrderHandler implements Runnable { String queueName = "stream.orders" ; @Override public void run () { while (true ) { try { List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read( Consumer.from("g1" , "c1" ), StreamReadOptions.empty().count(1 ).block(Duration.ofSeconds(2 )), StreamOffset.create(queueName, ReadOffset.lastConsumed()) ); if (list == null || list.isEmpty()){ continue ; } MapRecord<String, Object, Object> record = list.get(0 ); Map<Object, Object> values = record.getValue(); VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder (), true ); handleVoucherOrder(voucherOrder); stringRedisTemplate.opsForStream().acknowledge(queueName, "g1" , record.getId()); } catch (Exception e) { e.printStackTrace(); log.error("处理订单异常" , e); handlePendingList(); } } } private void handlePendingList () { while (true ) { try { List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read( Consumer.from("g1" , "c1" ), StreamReadOptions.empty().count(1 ), StreamOffset.create(queueName, ReadOffset.from("0" )) ); if (list == null || list.isEmpty()){ break ; } MapRecord<String, Object, Object> record = list.get(0 ); Map<Object, Object> values = record.getValue(); VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder (), true ); handleVoucherOrder(voucherOrder); stringRedisTemplate.opsForStream().acknowledge(queueName, "g1" , record.getId()); } catch (Exception e) { e.printStackTrace(); log.error("处理pending-list订单异常" , e); try { Thread.sleep(20 ); } catch (InterruptedException ex) { ex.printStackTrace(); } } } } } private void handleVoucherOrder (VoucherOrder voucherOrder) { Long userId = voucherOrder.getUserId(); RLock lock = redissonClient.getLock("lock:order:" + userId); boolean isLock = lock.tryLock(); if (!isLock){ log.error(("不允许重复下单" )); return ; } try { proxy.createVoucherOrder(voucherOrder); }finally { lock.unlock(); } } private IVoucherOrderService proxy; @Override public Result seckillVoucher (Long voucherId) { Long userId = UserHolder.getUser().getId(); long orderId = redisIdWorker.nextId("order" ); Long result = stringRedisTemplate.execute( SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), userId.toString(), String.valueOf(orderId) ); int r = result.intValue(); if (r != 0 ){ return Result.fail(r == 1 ? "库存不足" : "不能重复下单" ); } proxy = (IVoucherOrderService) AopContext.currentProxy(); return Result.ok(orderId); } @Transactional public void createVoucherOrder (VoucherOrder voucherOrder) { Long userId = voucherOrder.getUserId(); int count = query().eq("user_id" , userId).eq("voucher_id" , voucherOrder.getVoucherId()).count(); if (count > 0 ){ log.error("用户已经购买过一次" ); return ; } Boolean success = seckillVoucherService.update() .setSql("stock = stock - 1" ) .eq("voucher_id" , voucherOrder.getVoucherId()) .gt("stock" , 0 ) .update(); if (!success){ log.error("库存不足" ); return ; } save(voucherOrder); } }
测试 在Postman中准备如下两个一样的请求http://localhost:8080/api/voucher-order/seckill/12,Header携带两个不同的有效的authorization。
发送请求,第一次返回订单号,后续请求返回不能重复下单,即为测试成功。
也有使用Jmeter进行高并发测试,但我没有进行测试。
三种消息队列对比
List
PubSub
Stream
消息持久化
支持
不支持
支持
阻塞读取
支持
支持
支持
消息堆积处理
受限于内存空间,可以利用多消费者加快处理
受限于消费者缓冲区
受限于队列长度,可以利用消费者组提高消费速度,减少堆积
消息确认机制
不支持
不支持
支持
消息回溯
不支持
不支持
支持