Java-Redis:实战篇(2)

优惠券秒杀

全局唯一ID

每个店铺都可以发布优惠券。

当用户抢购时,就会生成订单并保存到tb_voucher_order这张表中,而订单表如果使用数据库自增ID就存在一些问题:

  • id的规律性太明显。
  • 受单表数据量的限制。

全局ID生成器,是一种在分布式系统下用来生成全局唯一ID的工具,一般要满足下列特性:

  • 唯一性
  • 高可用
  • 高性能
  • 递增性
  • 安全性
ID组成部分

为了增加ID的安全性,我们可以不直接使用Redis自增的数值,而是拼接一些其它信息:

ID的组成部分:

  • 符号位:1bit,永远为0
  • 时间戳:31bit,以秒为单位,可以使用69年
  • 序列号:32bit,秒内的计数器,支持每秒产生2^32个不同ID
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.hmdp.utils;

@Component
public class RedisIdWorker {
//开始时间戳
private static final long BEGIN_TIMESTAMP = 1640995200L;

//序列号的位数
private static final int COUNT_BITS = 32;

@Resource
private StringRedisTemplate stringRedisTemplate;

public long nextId(String keyPrefix) {
//1.生成时间戳
LocalDateTime now = LocalDateTime.now();
long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
long timestamp = nowSecond - BEGIN_TIMESTAMP;

//2.生成序列号
//2.1 获取当前日期,精确到天
String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
//2.2 自增长
long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);
// 每天的订单对应一个key,不同的订单对应不同的key,key自增的上限是当天的订单数。
// 这样避免了全部的订单对应同一个key,可能全部订单数有一天会超出2^32。

//3.拼接并返回
return timestamp << COUNT_BITS | count;
}

public static void main(String[] args) {
LocalDateTime time = LocalDateTime.of(2022, 1, 1, 0, 0, 0);
long second = time.toEpochSecond(ZoneOffset.UTC);
System.out.println("second = " + second);
}
}

使用测试方法进行测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.hmdp;

@SpringBootTest
class HmDianPingApplicationTests {
@Resource
private RedisIdWorker redisIdWorker;

private ExecutorService es = Executors.newFixedThreadPool(500);//500个线程
@Autowired
private SpringUtil springUtil;

@Test
void testIdWorker() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(300);//使得异步变同步,让计时器有效

Runnable task = () -> {//任务:生成100个id
for(int i=0; i<100; i++){
long id = redisIdWorker.nextId("order");
System.out.println("id = " + id);
}
latch.countDown();//每次任务进行一次,countDown计数器减一
};

long begin = System.currentTimeMillis();
for(int i=0; i<300; i++){//任务提交300次,每次100个id,一共是30000个id
es.submit(task);//异步
}
latch.await();//await谁调用就是让谁暂停,要等CountDownLatch计数器为0,才能进行主线程
long end = System.currentTimeMillis();
System.out.println("time = " + (end - begin));
}
}
总结

全局唯一ID生成策略:

  • UUID
  • Redis自增
  • snowflake算法
  • 数据库自增

Redis自增ID策略:

  • 每天一个key,方便统计订单量
  • ID构造是 时间戳 + 计数器

实现优惠券秒杀下单

优惠券

每个店铺都可以发布优惠券,分为平价券和特价券。平价券可以任意购买,而特价券需要秒杀抢购:

表关系如下:

tb_voucher:优惠券的基本信息,优惠金额、使用规则等。

tb_seckill_voucher:优惠券的库存、开始抢购时间,结束抢购时间。特价优惠券才需要填写这些信息。

添加秒杀优惠券

VoucherController中提供了一个接口,可以添加秒杀优惠券:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package com.hmdp.controller;

@RestController
@RequestMapping("/voucher")
public class VoucherController {
@Resource
private IVoucherService voucherService;

/**
* 新增秒杀券
* @param voucher 优惠券信息,包含秒杀信息
* @return 优惠券id
*/
@PostMapping("seckill")
public Result addSeckillVoucher(@RequestBody Voucher voucher) {
voucherService.addSeckillVoucher(voucher);
return Result.ok(voucher.getId());
}
}

实现优惠券秒杀的下单功能

下单时需要判断两点:

  • 秒杀是否开始或结束,如果尚未开始或已经结束则无法下单。
  • 库存是否充足,不足则无法下单。

1.VoucherOrderController

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package com.hmdp.controller;

@RestController
@RequestMapping("/voucher-order")
public class VoucherOrderController {

@Resource
private IVoucherOrderService voucherOrderService;

@PostMapping("seckill/{id}")
public Result seckillVoucher(@PathVariable("id") Long voucherId) {
return voucherOrderService.seckillVoucher(voucherId);
}
}

2.IVoucherOrderService

1
2
3
4
5
package com.hmdp.service;

public interface IVoucherOrderService extends IService<VoucherOrder> {
Result seckillVoucher(Long voucherId);
}

3.VoucherOrderServiceImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package com.hmdp.service.impl;

@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Autowired
private RedisIdWorker redisIdWorker;

@Override
@Transactional
public Result seckillVoucher(Long voucherId){
//1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
//2.判断秒杀是否开始
if(voucher.getBeginTime().isAfter(LocalDateTime.now())){
//尚未开始
return Result.fail("秒杀尚未开始!");
}
//3.判断秒杀是否已经结束
if(voucher.getEndTime().isBefore(LocalDateTime.now())){
return Result.fail("秒杀已经结束!");
}
//4.判断库存是否充足
if(voucher.getStock() < 1){
//库存不足
return Result.fail("库存不足");
}
//5.扣减库存
Boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId).update();
if(!success){
//扣减失败
return Result.fail("库存不足");
}

//6.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
//6.1 订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
//6.2 用户id
long userId = UserHolder.getUser().getId();
voucherOrder.setUserId(userId);
//6.3 代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);

//7.返回订单id
return Result.ok(orderId);
}
}
测试

在商店页面点击代金券的限时抢购按钮,抢购成功返回订单号。

超卖问题

超卖现象

1.正常情况下,不出现超卖。

2.多线程并发下,出现超卖现象。

分析:因为一开始有很多线程进来,查询库存时,库存是充足的,此时有线程扣减库存进行了更新,但是有很多线程在此之前已经查询到了旧的库存,导致不一致的情况。此后这些线程会在库存不足的情况下还进行扣减,导致出现了超卖现象。

测试

1.使用Jmeter进行测试,定义线程组,设置200个线程,执行时间为0s。

2.填写HTTP请求

3.在登录状态头设置Authorization,然后运行。

4.运行结束后在查看结果树中查看运行的结果。

5.在数据库查看库存,可以发现库存是负数,订单数多于100,这是因为出现了超卖现象。

加锁

超卖问题是典型的多线程安全问题,针对这一问题的常见解决方案就是加锁

1.悲观锁

认为线程安全问题一定会发生,因此在操作数据之前先获取锁,确保线程串行执行。

  • 例如SynchronizedLock都属于悲观锁。悲观锁添加同步锁,让线程串行执行
  • 优点:简单粗暴。
  • 缺点:性能一般。

2.乐观锁

认为线程安全问题不一定会发生,因此不加锁,只是在更新数据时去判断有没有其它线程对数据做了修改。

  • 如果没有修改则认为是安全的,自己才更新数据。
  • 如果已经被其它线程修改说明发生了安全问题,此时可以重试或异常。

乐观锁不加锁,在更新时判断是否有其它线程在修改。

  • 优点:性能好。
  • 缺点:存在成功率低的问题。(当一下子涌入多个线程时,每个线程同时到达查询库存,此时多个线程中库存的值都是相同的,当有一个线程修改了库存,那么这些其他线程都无法成功扣减库存,导致成功率过低。)

乐观锁的关键是判断之前查询得到的数据是否有被修改过,常见的方式有两种:

(1)版本号法

(2)CAS法

使用CAS法

弊端:失败的概率大大增加。一开始有无数的线程涌进来,因为没加锁,线程全部并行运行。所以一堆线程查库存时会查到100个库存。加入100个线程都查到了库存为100,但是只会有一条线程执行扣减语句成功,此时库存为99,则剩下的99个线程因为stock查询时100,和现有库存99不一致,导致无法进行扣减,所以这99个线程都会失败。其实这些线程有些是能够进行扣减的,因此导致了失败率大大增加。

1
2
3
4
5
6
//5.扣减库存
Boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") // set stock = stock - 1
.eq("voucher_id", voucherId)
.eq("stock", voucher.getStock()) // where id = ? and stock = ?
.update();

测试

1.使用Jmeter进行测试,定义线程组,设置200个线程,执行时间为0s。

2.填写HTTP请求。在登录状态头设置Authorization,然后运行(和上述超卖现象一样)。

3.运行结束后在查看结果树中查看运行的结果。

4.在数据库查看库存,可以发现库存大概在70-80之间,失败率较高。

只判断库存是否大于0
1
2
3
4
5
6
//5.扣减库存
Boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") // set stock = stock - 1
.eq("voucher_id", voucherId)
.gt("stock", 0) // where id = ? and stock > 0
.update();

测试

和上述方法一样使用Jmeter,在数据库查看库存,可以看到刚好库存只剩0,说明没有出现超卖,也没有失败率高的情况。

一人一单

需求:修改秒杀业务,要求同一个优惠券,一个用户只能下一单。

一人一单的并发安全问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
@Override
@Transactional
public Result seckillVoucher(Long voucherId){
//1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
//2.判断秒杀是否开始
if(voucher.getBeginTime().isAfter(LocalDateTime.now())){
//尚未开始
return Result.fail("秒杀尚未开始!");
}
//3.判断秒杀是否已经结束
if(voucher.getEndTime().isBefore(LocalDateTime.now())){
return Result.fail("秒杀已经结束!");
}
//4.判断库存是否充足
if(voucher.getStock() < 1){
//库存不足
return Result.fail("库存不足");
}

//5.一人一单
Long userId = UserHolder.getUser().getId();
//5.1 查询订单
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
//5.2 判断是否存在
if(count > 0){
return Result.fail("用户已经购买一次!");
}

//6.扣减库存
Boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") // set stock = stock - 1
.eq("voucher_id", voucherId)
.gt("stock", 0) // where id = ? and stock > 0
.update();
if(!success){
//扣减失败
return Result.fail("库存不足");
}

//7.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
//7.1 订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
//7.2 用户id
voucherOrder.setUserId(userId);
//7.3 代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);

//8.返回订单id
return Result.ok(orderId);
}
测试

和超卖问题一样使用Jmeter,在数据库查看库存,可以看到虽然同一个用户没有把所有库存都减为0,但是一个用户也扣减了10个库存,说明还是存在一人多单的问题。

分析:这是因为当一个用户的多个线程进入时,同时查询订单都不存在,此时这多个线程都会去扣减库存,直到建立订单之后的线程才会失败,因此出现一人多单的现象。

解决单点的一人一单并发安全问题

1.引入依赖。

1
2
3
4
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
</dependency>

2.IVoucherOrderService

1
2
3
4
5
6
7
package com.hmdp.service;

public interface IVoucherOrderService extends IService<VoucherOrder> {

Result seckillVoucher(Long voucherId);
Result createVoucherOrder(Long voucherId);
}

3.VoucherOrderServiceImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package com.hmdp.service.impl;

@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Autowired
private RedisIdWorker redisIdWorker;

@Override
public Result seckillVoucher(Long voucherId){
//1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
//2.判断秒杀是否开始
if(voucher.getBeginTime().isAfter(LocalDateTime.now())){
//尚未开始
return Result.fail("秒杀尚未开始!");
}
//3.判断秒杀是否已经结束
if(voucher.getEndTime().isBefore(LocalDateTime.now())){
return Result.fail("秒杀已经结束!");
}
//4.判断库存是否充足
if(voucher.getStock() < 1){
//库存不足
return Result.fail("库存不足");
}

Long userId = UserHolder.getUser().getId();//每个用户一个锁
// 锁应该锁住整个事务。
// 原因:如果在事务里面定义锁,当锁执行完释放锁之后,其他线程进来查询订单时,
// 事务里面新增的订单可能还没有写入数据库,因为还没有提交事务。所有查询时依然可能存在并发安全问题。
// 因此锁锁定的范围应该是整个事务,在事务提交之后再释放锁。
synchronized (userId.toString().intern()){
// toString不能保证安装值来加锁,因为toString内部new String,所以每调用一次toString都是一个全新的字符串对象,
// 因此即使userId一样,toString之后都是全新的对象,都是不一样的锁。
// 调用intern方法:返回字符串对象的规范表示,去字符串常量池里面找值一样的字符串地址,此时值一样,intern返回结果一样。锁就一样。

// 获取代理对象(事务)
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();//拿到当前对象的代理对象
return proxy.createVoucherOrder(voucherId);

// return createVoucherOrder(voucherId);
// 这里return createVoucherOrder(voucherId);相当于调用 return this.createVoucherOrder(voucherId);
// 这种调用this拿到的是当前VoucherOrderServiceImpl的对象,而不是它的代理对象。
// 事务要想生效用的是Spring对VoucherOrderServiceImpl这个类做了动态代理,拿到他的代理对象,用代理对象做事务处理
// 这里的this指的是非代理对象,也就是目标对象,所以他没有事务功能。(事务失效的可能性之一)
}
}

@Transactional //事务的范围是更新数据库,上述查询数据库不需要加事务,事务的方法必须为public才能生效,不能是private
public Result createVoucherOrder(Long voucherId) {
//5.一人一单
Long userId = UserHolder.getUser().getId();
//5.1 查询订单
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
//5.2 判断是否存在
if(count > 0){
return Result.fail("用户已经购买一次!");
}

//6.扣减库存
Boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") // set stock = stock - 1
.eq("voucher_id", voucherId)
.gt("stock", 0) // where id = ? and stock > 0
.update();
if(!success){
//扣减失败
return Result.fail("库存不足");
}

//7.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
//7.1 订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
//7.2 用户id
voucherOrder.setUserId(userId);
//7.3 代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);

//8.返回订单id
return Result.ok(orderId);
}
}

4.启动类HmDianPingApplication暴露代理对象。

1
2
3
4
5
6
7
8
9
10
11
12
package com.hmdp;

@EnableAspectJAutoProxy(exposeProxy = true)//暴露代理对象(默认是不暴露)
@MapperScan("com.hmdp.mapper")
@SpringBootApplication
public class HmDianPingApplication {

public static void main(String[] args) {
SpringApplication.run(HmDianPingApplication.class, args);
}

}
测试

和超卖问题一样使用Jmeter,在数据库查看库存,可以看到同一个用户只能下单一次,解决了一人一旦问题。

注意:

参考链接:Spring事务失效的场景

spring 事务失效的 12 种场景_spring 截获duplicatekeyexception 不抛异常-CSDN博客

多台JVM下并发安全问题

测试

通过加锁可以解决在单机情况下的一人一单安全问题,但是在集群模式下就不行了。

我们将服务启动两份,端口分别为8081和8082:

然后修改nginxconf目录下的nginx.conf文件,配置反向代理和负载均衡:

修改如下两个地方:

1
2
3
4
5
6
7
8
9
10
11
12
http {
location /api {
#proxy_pass http://127.0.0.1:8081;
proxy_pass http://backend;
}
}

upstream backend {
server 127.0.0.1:8081 max_fails=5 fail_timeout=10s weight=1;
server 127.0.0.1:8082 max_fails=5 fail_timeout=10s weight=1;
}
}

改完之后重新加载:(如果还不行就重启nginx和重启电脑)

1
2
3
D:\software\nginx-1.18.0-dianping>nginx.exe -s reload

D:\software\nginx-1.18.0-dianping>

现在,用户请求会在这两个节点上负载均衡,再次测试下是否存在线程安全问题。

测试:

通过网页访问http://localhost:8080/api/voucher/list/1,可以发现两个节点上都会出现查询请求。

使用Jmeter进行测试,查看数据库发现一个人有多个订单。因此,在集群模式下,锁没有锁住,出现了一人多单的情况。

原因分析:因为多集群下,用户的多个请求可能会被发送到不同的JVM下执行,而不同JVM下的锁监视器是不相同的,因此一个用户可以获得多个锁,从而导致一个用户有多个请求成功,出现一人多单。

分布式锁

分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁

  • 多进程可见
  • 互斥
  • 高可用
  • 高性能
  • 安全性

分布式锁的核心是实现多进程之间互斥,而满足这一点的方式有很多,常见的有三种:

MySQL Redis Zookeeper
互斥 利用mysql本身的互斥锁机制
例如:业务执行前去mysql申请互斥锁,然后执行业务,当业务执行完以后提交事务,这时锁释放,当业务抛出异常时,会自动触发回滚,锁就释放了。
利用setnx这样的互斥命令
实现互斥:数据不存在时才能set成功,数据存在时set失败。删除key即可释放锁。
利用节点的唯一性和有序性实现互斥
唯一性:创建节点时,节点不能重复
有序性:每创建一个节点,节点的id是递增的(约定id最小的获取锁成功,由此来实现互斥。释放锁时把节点删除即可。)
高可用 好(mysql支持主从模式) 好(支持主从、集群模式) 好(支持集群)
高性能 一般(受限于mysql的性能) 一般(Zookeeper集群强调强一致性,会导致主从之间数据同步消耗时间,性能会比Redis差一些)
安全性 断开连接,自动释放锁 利用锁超时时间,到期释放(分析:服务出现故障,锁不能够自动释放,没有人执行删除动作时,锁会一直在那里得不到释放,其他进程得到锁会产生死锁的现象) 临时节点,断开连接自动释放
基于Redis的分布式锁

实现分布式锁时需要实现的两个基本方法:

1.获取锁:

  • 互斥:确保只能有一个线程获取锁。
  • 非阻塞:尝试一次,成功返回true,失败返回false
1
2
3
4
# 添加锁,利用setnx的互斥特性
SETNX lock thread1
# 添加锁过期时间,避免服务宕机引起的死锁
EXPIRE lock 10
1
2
# 添加锁,NX是互斥、EX是设置超时时间
SET lock thread1 NX EX 10

2.释放锁:

  • 手动释放。
  • 超时释放。
1
2
# 释放锁,删除即可
DEL key
版本一:基于Redis实现分布式锁初级版本

需求:定义一个类,实现下面接口,利用Redis实现分布式锁功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package com.hmdp.utils;

public interface ILock {
/**
* 尝试获取锁
* @param timeoutSec 锁持有的超时时间,过期后自动释放
* @return true代表获取锁成功; false代表获取锁失败
*/
boolean tryLock(long timeoutSec);

//释放锁
void unlock();
}

SimpleRedisLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.hmdp.utils;

public class SimpleRedisLock implements ILock{

private String name;
private StringRedisTemplate stringRedisTemplate;

public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}

private static final String KEY_PREFIX = "lock:";

@Override
public boolean tryLock(long timeoutSec) {
//获取线程标识
long threadId = Thread.currentThread().getId();
//获取锁
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}

@Override
public void unlock() {
//释放锁
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}

VoucherOrderServiceImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Autowired
private StringRedisTemplate stringRedisTemplate;

@Override
public Result seckillVoucher(Long voucherId){
//1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
//2.判断秒杀是否开始
if(voucher.getBeginTime().isAfter(LocalDateTime.now())){
//尚未开始
return Result.fail("秒杀尚未开始!");
}
//3.判断秒杀是否已经结束
if(voucher.getEndTime().isBefore(LocalDateTime.now())){
return Result.fail("秒杀已经结束!");
}
//4.判断库存是否充足
if(voucher.getStock() < 1){
//库存不足
return Result.fail("库存不足");
}

Long userId = UserHolder.getUser().getId();
//创建锁对象
SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);//每个用户一个锁
//获取锁
boolean isLock = lock.tryLock(1200);
//判断是否获取锁成功
if(!isLock){
//获取锁失败,返回错误或重试
return Result.fail("不允许重复下单");
}
try {
//获取代理对象(事务)
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();//拿到当前对象的代理对象
return proxy.createVoucherOrder(voucherId);
}finally {
//释放锁
lock.unlock();
}
}

测试:

1.在Postman中准备如下两个一样的请求http://localhost:8080/api/voucher-order/seckill/11Header携带有效的authorization。必须保证两个相同,才能是同一用户。

2.在 VoucherOrderServiceImplseckillVoucher方法的判断是否获取锁成功if(!isLock)处打上断点。

3.运行两个程序,分别在8081和8082端口。同时运行Postman的两个相同的请求,分别进入这两个不同的端口后台进行执行。

4.可以发现,只有其中一个程序获取到了锁,另一个程序没有获取到锁。这时一个用户只能下一个订单,实现了分布式锁。

版本二:一个线程释放另一个线程的锁

问题分析:当线程1获取锁之后发生业务阻塞,阻塞期间触发超时释放锁。此时线程2可以获取锁,并执行业务。在线程2执行业务的过程中,线程1阻塞结束完成业务,并释放了线程2的锁,因此出现了一个释放另一个线程的锁的问题。

解决一个线程释放另一个线程的锁的问题:当释放锁的时候进行判断,判断当前要释放的锁和该线程的锁是否一致,如果一致才能进行释放。

流程图:

需求:修改之前的分布式锁实现,满足:

  1. 在获取锁时存入线程标示(可以用UUID表示)。注意:因为不同JVM的线程都是从1开始递增,所以不同JVM的线程有可能出现线程id一样的情况,所以需要用UUID来标识。

  2. 在释放锁时先获取锁中的线程标示,判断是否与当前线程标示一致。

    • 如果一致则释放锁。

    • 如果不一致则不释放锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.hmdp.utils;

public class SimpleRedisLock implements ILock{

private String name;
private StringRedisTemplate stringRedisTemplate;

public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}

private static final String KEY_PREFIX = "lock:";
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";//用UUID作为锁

@Override
public boolean tryLock(long timeoutSec) {
//获取线程标识
String threadId = ID_PREFIX + Thread.currentThread().getId();
//获取锁
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}

@Override
public void unlock() {
//获取线程标识
String threadId = ID_PREFIX + Thread.currentThread().getId();
//获取锁中的标识
String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
//判断标识是否一致
if(threadId.equals(id)){
//释放锁
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}
}

测试:

1.和上述一样,在Postman中准备如下两个一样的请求http://localhost:8080/api/voucher-order/seckill/11Header携带有效的authorization。必须保证两个相同,才能是同一用户。

2.和上述一样,在 VoucherOrderServiceImplseckillVoucher方法的判断是否获取锁成功if(!isLock)处打上断点。

3.首先发送Postman的一个请求1,程序会进入其中一个端口1执行,这时进入断点,可以发现获取锁成功。

然后去Redis中可以发现该锁的已存在,接下来删除该锁,模拟线程阻塞。

之后发送Postman的另一条同样的请求2,程序会进入另外一个端口2执行,这时进入断点,因为之前的锁已经被删除,此时也能够获取锁成功。

此时回到请求1继续执行,释放锁的时候会判断是否是请求1的锁,不是则直接退出。

回到请求2继续执行,此时判断锁是请求2的锁,会执行释放锁的操作才会退出。

4.总结:上述执行流程保证了请求1无法释放请求2的锁,只有请求2才能释放请求2的锁,保证了释放锁的正确性。

版本三:判断锁标识是否一致和释放锁不同步执行

Redis的Lua脚本

Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。Lua是一种编程语言,它的基本语法大家可以参考网站:https://www.runoob.com/lua/lua-tutorial.html

Redis提供的调用函数,语法如下:

1
2
# 执行redis命令
redis.call('命令名称', 'key', '其它参数', ...)

例如,我们要执行set name jack,则脚本是这样:

1
2
# 执行 set name jack
redis.call('set', 'name', 'jack')

例如,我们要先执行set name Rose,再执行get name,则脚本如下:

1
2
3
4
5
6
# 先执行 set name jack
redis.call('set', 'name', 'jack')
# 再执行 get name
local name = redis.call('get', 'name')
# 返回
return name

写好脚本以后,需要用Redis命令来调用脚本,调用脚本的常见命令如下:

1
2
3
4
5
127.0.0.1:6379> help @scripting

EVAL script numkeys key [key ...] arg [arg ...]
summary: Execute a Lua script server side
since: 2.6.0

例如,我们要执行redis.call('set', 'name', 'jack')这个脚本,语法如下:

如果脚本中的keyvalue不想写死,可以作为参数传递。key类型参数会放入KEYS数组,其它参数会放入ARGV数组,在脚本中可以从KEYSARGV数组获取这些参数:

释放锁的业务流程是这样的:

  1. 获取锁中的线程标示。
  2. 判断是否与指定的标示(当前线程标示)一致。
  3. 如果一致则释放锁(删除)。
  4. 如果不一致则什么都不做。

如果用Lua脚本来表示则是这样的:

1
2
3
4
5
6
7
8
-- 这里的 KEYS[1] 就是锁的key,这里的ARGV[1] 就是当前线程标示
-- 获取锁中的标示,判断是否与当前线程标示一致
if (redis.call('GET', KEYS[1]) == ARGV[1]) then
-- 一致,则删除锁
return redis.call('DEL', KEYS[1])
end
-- 不一致,则直接返回
return 0

需求:基于Lua脚本实现分布式锁的释放锁逻辑

提示:RedisTemplate调用Lua脚本的API如下:

1.新建unlock.lua脚本,位置在:src/main/resources/unlock.lua

1
2
3
4
5
6
7
8
-- 这里的 KEYS[1] 就是锁的key,这里的ARGV[1] 就是当前线程标示
-- 获取锁中的标示,判断是否与当前线程标示一致
if (redis.call('GET', KEYS[1]) == ARGV[1]) then
-- 一致,则删除锁
return redis.call('DEL', KEYS[1])
end
-- 不一致,则直接返回
return 0

2.修改锁的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package com.hmdp.utils;

public class SimpleRedisLock implements ILock{

private String name;
private StringRedisTemplate stringRedisTemplate;

public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}

private static final String KEY_PREFIX = "lock:";
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";//用UUID作为锁
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}

@Override
public boolean tryLock(long timeoutSec) {
//获取线程标识
String threadId = ID_PREFIX + Thread.currentThread().getId();
//获取锁
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}

@Override
public void unlock() {
//调用lua脚本
stringRedisTemplate.execute(
UNLOCK_SCRIPT,
Collections.singletonList(KEY_PREFIX + name), // 参数KEYS集合
ID_PREFIX + Thread.currentThread().getId() // 参数ARGV
);
}
}
总结

基于Redis的分布式锁实现思路:

  • 利用set nx ex获取锁,并设置过期时间,保存线程标示。
  • 释放锁时先判断线程标示是否与自己一致,一致则删除锁。

特性:

  • 利用set nx满足互斥性。
  • 利用set ex保证故障时锁依然能释放,避免死锁,提高安全性。
  • 利用Redis集群保证高可用和高并发特性。
基于setnx实现的分布式锁存在问题

1.不可重入:同一个线程无法多次获取同一把锁。

2.不可重试:获取锁只尝试一次就返回false,没有重试机制。

3.超时释放:锁超时释放虽然可以避免死锁,但如果是业务执行耗时较长,也会导致锁释放,存在安全隐患。

4.主从一致性:如果Redis提供了主从集群,主从同步存在延迟,当主宕机时,如果从并同步主中的锁数据,则会出现锁实现。

Redisson

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现

官网地址: https://redisson.org

GitHub地址: https://github.com/redisson/redisson

Redisson入门

1.引入依赖。

1
2
3
4
5
6
<!-- redisson -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.40.0</version>
</dependency>

2.配置Redisson客户端。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package com.hmdp.config;

@Configuration
public class RedissonConfig {

@Bean
public RedissonClient redissonClient() {
//配置
Config config = new Config();
// 添加redis地址,这里添加了单点的地址,也可以使用config.useClusterServers()添加集群地址
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
//创建RedissonClient对象
return Redisson.create(config);
}
}

3.使用Redisson的分布式锁。(这里只是测试)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Resource
private RedissonClient redissonClient;

@Test
void testRedisson() throws InterruptedException {
// 获取锁(可重入),指定锁的名称
RLock lock = redissonClient.getLock("anyLock");
// 尝试获取锁,参数分别是:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位
boolean isLock = lock.tryLock(1, 10, TimeUnit.SECONDS);
// 判断释放获取成功
if(isLock){
try {
System.out.println("执行业务");
}finally {
// 释放锁
lock.unlock();
}
}
}

4.在VoucherOrderServiceImpl中修改seckillVoucher方法,只需要将获取锁的方式修改,将创建锁对象的方式修改即可:

1
2
3
//创建锁对象
//SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);//每个用户一个锁
RLock lock = redissonClient.getLock("lock:order:" + userId);//只添加这一句,注释上一句

完整代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@Resource
private RedissonClient redissonClient;

@Override
public Result seckillVoucher(Long voucherId){
//1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
//2.判断秒杀是否开始
if(voucher.getBeginTime().isAfter(LocalDateTime.now())){
//尚未开始
return Result.fail("秒杀尚未开始!");
}
//3.判断秒杀是否已经结束
if(voucher.getEndTime().isBefore(LocalDateTime.now())){
return Result.fail("秒杀已经结束!");
}
//4.判断库存是否充足
if(voucher.getStock() < 1){
//库存不足
return Result.fail("库存不足");
}

Long userId = UserHolder.getUser().getId();
//创建锁对象
//SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);//每个用户一个锁
RLock lock = redissonClient.getLock("lock:order:" + userId);//只添加这一句,注释上一句
//获取锁
boolean isLock = lock.tryLock();//释放不等待
//判断是否获取锁成功
if(!isLock){
//获取锁失败,返回错误或重试
return Result.fail("不允许重复下单");
}
try {
//获取代理对象(事务)
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();//拿到当前对象的代理对象
return proxy.createVoucherOrder(voucherId);
}finally {
//释放锁
lock.unlock();
}
}

测试:和之前一样,使用Jmeter进行测试,同一个用户200个线程,查看数据库发现一个用户只能下一单,库存只减少了1,所以分布式锁设置成功。

Redisson可重入锁原理

1.出现死锁的原因:当线程1获取锁之后,调用线程2,线程2也需要获得锁,然而此时锁被线程1获得无法释放,线程2等待线程1执行完毕释放锁,线程1等到线程2执行完毕,造成互相等待的情况,这就是死锁。

2.Redisson可重入锁原理:使用哈希表来存储锁,哈希表的value存储获得锁的线程的个数。每当一个线程获得锁都进行加一操作,每当一个线程释放锁都进行减一操作。当value值为0时,即代表没有线程获得锁,则可以进行释放锁的操作。

3.lua脚本。

(1)获取锁的Lua脚本。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
local key = KEYS[1]; -- 锁的key
local threadId = ARGV[1]; -- 线程唯一标识
local releaseTime = ARGV[2]; -- 锁的自动释放时间
-- 判断是否存在
if(redis.call('exists', key) == 0) then
-- 不存在, 获取锁
redis.call('hset', key, threadId, '1');
-- 设置有效期
redis.call('expire', key, releaseTime);
return 1; -- 返回结果
end;
-- 锁已经存在,判断threadId是否是自己
if(redis.call('hexists', key, threadId) == 1) then
-- 不存在, 获取锁,重入次数+1
redis.call('hincrby', key, threadId, '1');
-- 设置有效期
redis.call('expire', key, releaseTime);
return 1; -- 返回结果
end;
return 0; -- 代码走到这里,说明获取锁的不是自己,获取锁失败

(2)释放锁的Lua脚本。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
local key = KEYS[1]; -- 锁的key
local threadId = ARGV[1]; -- 线程唯一标识
local releaseTime = ARGV[2]; -- 锁的自动释放时间
-- 判断当前锁是否还是被自己持有
if (redis.call('HEXISTS', key, threadId) == 0) then
return nil; -- 如果已经不是自己,则直接返回
end;
-- 是自己的锁,则重入次数-1
local count = redis.call('HINCRBY', key, threadId, -1);
-- 判断是否重入次数是否已经为0
if (count > 0) then
-- 大于0说明不能释放锁,重置有效期然后返回
redis.call('EXPIRE', key, releaseTime);
return nil;
else -- 等于0说明可以释放锁,直接删除
redis.call('DEL', key);
return nil;
end;

4.测试Redisson可重入锁代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package com.hmdp;

@Slf4j
@SpringBootTest
class RedissonTest {

@Resource
private RedissonClient redissonClient;

private RLock lock;

@BeforeEach
void setUp() {
lock = redissonClient.getLock("order");
}

@Test
void method1() throws InterruptedException {
// 尝试获取锁
boolean isLock = lock.tryLock();
if (!isLock) {
log.error("获取锁失败 .... 1");
return;
}
try {
log.info("获取锁成功 .... 1");
method2();
log.info("开始执行业务 ... 1");
} finally {
log.warn("准备释放锁 .... 1");
lock.unlock();
}
}
void method2() {
// 尝试获取锁
boolean isLock = lock.tryLock();
if (!isLock) {
log.error("获取锁失败 .... 2");
return;
}
try {
log.info("获取锁成功 .... 2");
log.info("开始执行业务 ... 2");
} finally {
log.warn("准备释放锁 .... 2");
lock.unlock();
}
}
}

测试:method1method2两个方法的tryLock方法处打上断点,观察Redis中锁的value值变化。method1获取锁时value为1,method2获取锁时value从1变成2,method2释放锁时从2变成1,method1释放锁时value从1变为0,释放掉。

Redisson分布式锁原理

Redisson分布式锁原理:

  • 可重入:利用hash结构记录线程id和重入次数。
  • 可重试:利用信号量和PubSub功能实现等待、唤醒,获取锁失败的重试机制。
  • 超时续约:利用watchDog,每隔一段时间(releaseTime / 3),重置超时时间。
Redisson分布式锁主从一致性问题

1.主从一致性问题:获取锁之后,主节点发生故障,此时还没有进行主从同步,当从节点变为主节点时,找不到锁,就会发生锁失效,这就是主从一致性问题。

2.Redisson分布式锁主从一致性问题。

多个独立的Redis节点,必须在所有节点都获取重入锁,才算获取锁成功。

3.创建多个Redis集群:复制多个Redis文件夹,修改每个Redis文件夹下的redis.windows.conf配置不同端口。这里我使用了三个Redis,端口分别为:6379、6380、6381。

修改RedissonConfig文件,配置不同端口的多个集群。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.hmdp.config;

@Configuration
public class RedissonConfig {

@Bean
public RedissonClient redissonClient() {
//配置
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
//创建RedissonClient对象
return Redisson.create(config);
}

@Bean
public RedissonClient redissonClient2() {
//配置
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6380");
//创建RedissonClient对象
return Redisson.create(config);
}

@Bean
public RedissonClient redissonClient3() {
//配置
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6381");
//创建RedissonClient对象
return Redisson.create(config);
}
}

RedissonTest进行联锁测试。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package com.hmdp;

@Slf4j
@SpringBootTest
class RedissonTest {

@Resource
private RedissonClient redissonClient;
@Resource
private RedissonClient redissonClient2;
@Resource
private RedissonClient redissonClient3;


private RLock lock;

@BeforeEach
void setUp() {
RLock lock1 = redissonClient.getLock("order");
RLock lock2 = redissonClient2.getLock("order");
RLock lock3 = redissonClient3.getLock("order");

//创建联锁 multiLock
lock = redissonClient.getMultiLock(lock1, lock2, lock3);
}

@Test
void method1() throws InterruptedException {
// 尝试获取锁
boolean isLock = lock.tryLock();
if (!isLock) {
log.error("获取锁失败 .... 1");
return;
}
try {
log.info("获取锁成功 .... 1");
method2();
log.info("开始执行业务 ... 1");
} finally {
log.warn("准备释放锁 .... 1");
lock.unlock();
}
}
void method2() {
// 尝试获取锁
boolean isLock = lock.tryLock();
if (!isLock) {
log.error("获取锁失败 .... 2");
return;
}
try {
log.info("获取锁成功 .... 2");
log.info("开始执行业务 ... 2");
} finally {
log.warn("准备释放锁 .... 2");
lock.unlock();
}
}
}

测试:和之前可重入锁原理时的测试一样,这次观察三个Redis中锁的值的变化。可以发现在三个Redis节点上,所有的value值是统一变化的:在method1method2两个方法的tryLock方法处打上断点,观察Redis中锁的value值变化。method1获取锁时value为1,method2获取锁时value从1变成2,method2释放锁时从2变成1,method1释放锁时value从1变为0,释放掉。

总结

(1)不可重入Redis分布式锁

原理:利用setnx的互斥性;利用ex避免死锁;释放锁时判断线程标示。

缺陷:不可重入、无法重试、锁超时失效。

(2)可重入的Redis分布式锁

原理:利用hash结构,记录线程标示和重入次数;利用watchDog延续锁时间;利用信号量控制锁重试等待。

缺陷:redis宕机引起锁失效问题。

(3)RedissonmultiLock

原理:多个独立的Redis节点,必须在所有节点都获取重入锁,才算获取锁成功。

缺陷:运维成本高、实现复杂。

Redis优化秒杀

Redis优化秒杀思路:

Redis优化秒杀流程图:

案例

1.新增秒杀优惠券的同时,将优惠券信息保存到Redis中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package com.hmdp.service.impl;

@Service
public class VoucherServiceImpl extends ServiceImpl<VoucherMapper, Voucher> implements IVoucherService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Autowired
private StringRedisTemplate stringRedisTemplate;

@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
// 保存优惠券
save(voucher);
// 保存秒杀信息
SeckillVoucher seckillVoucher = new SeckillVoucher();
seckillVoucher.setVoucherId(voucher.getId());
seckillVoucher.setStock(voucher.getStock());
seckillVoucher.setBeginTime(voucher.getBeginTime());
seckillVoucher.setEndTime(voucher.getEndTime());
seckillVoucherService.save(seckillVoucher);

//保存秒杀库存到Redis中
stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString());
}
}

2.基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功。

seckill.lua

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
-- 1.参数列表
-- 1.1.优惠券id
local voucherId = ARGV[1]
-- 1.2.用户id
local userId = ARGV[2]

-- 2.数据key
-- 2.1.库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2.订单key
local orderKey = 'seckill:order:' .. voucherId

-- 3.脚本业务
-- 3.1.判断库存是否充足 get stockKey
if(tonumber(redis.call('get', stockKey)) <= 0) then
-- 3.2.库存不足,返回1
return 1
end
-- 3.2.判断用户是否下单 SISMEMBER orderKey userId
if(redis.call('sismember', orderKey, userId) == 1) then
-- 3.3.存在,说明是重复下单,返回2
return 2
end
-- 3.4.扣库存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 3.5.下单(保存用户)sadd orderKey userId
redis.call('sadd', orderKey, userId)
return 0

3.如果抢购成功,将优惠券id和用户id封装后存入阻塞队列。

4.开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package com.hmdp.service.impl;

@Slf4j
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Autowired
private RedisIdWorker redisIdWorker;
@Autowired
private StringRedisTemplate stringRedisTemplate;

@Resource
private RedissonClient redissonClient;

private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}

//阻塞队列
private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024*1024);//阻塞队列添加新订单
//线程池
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();//获取单线程处理订单

@PostConstruct//在当前类初始化完毕后执行
private void init() {
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}

private class VoucherOrderHandler implements Runnable {
@Override
public void run() {//从阻塞队列取出订单信息处理,该方法应该一初始化就开始执行
while (true) {
try {
//1.获取队列中的订单信息
VoucherOrder voucherOrder = orderTasks.take();//take方法获取不到会阻塞
//2.创建订单
handleVoucherOrder(voucherOrder);
} catch (Exception e) {
e.printStackTrace();
log.error("处理订单异常", e);
}
}
}
}

private void handleVoucherOrder(VoucherOrder voucherOrder) {
//1.获取用户
//Long userId = UserHolder.getUser().getId();//异步的子线程,不能通过ThreadLocal获得userId
Long userId = voucherOrder.getUserId();

//注意:这里其实不用加锁也是可以的。
//2.创建锁对象
RLock lock = redissonClient.getLock("lock:order:" + userId);
//3.获取锁
boolean isLock = lock.tryLock();//释放不等待
//4.判断是否获取锁成功
if(!isLock){
//获取锁失败,返回错误或重试
log.error(("不允许重复下单"));
return;
}
try {
//获取代理对象(事务)
// IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();//拿到当前对象的代理对象
// 注意:这里是拿不到代理对象的,因为currentProxy底层是通过ThreadLocal获取的,
// 当前handleVoucherOrder是基于线程做的,现在是子线程,子线程无法从ThreadLocal取出。
proxy.createVoucherOrder(voucherOrder);
}finally {
//释放锁
lock.unlock();
}
}

private IVoucherOrderService proxy;

@Override
public Result seckillVoucher(Long voucherId){
//获取用户
Long userId = UserHolder.getUser().getId();
//1.执行lua脚本
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), userId.toString()
);
//2.判断结果是否为0
int r = result.intValue();
if(r != 0){
//2.1 不为0,代表没有购买资格
return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
}
//2.2 为0,有购买资格,把下单信息保存到阻塞队列
//创建订单
VoucherOrder voucherOrder = new VoucherOrder();
//2.3 订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
//2.4 用户id
voucherOrder.setUserId(userId);
//2.5 代金券id
voucherOrder.setVoucherId(voucherId);
//2.6 放入阻塞队列
orderTasks.add(voucherOrder);

//3.获取代理对象
proxy = (IVoucherOrderService) AopContext.currentProxy();//拿到当前对象的代理对象

//4.返回订单id
return Result.ok(orderId);
}

@Transactional //事务的范围是更新数据库,上述查询数据库不需要加事务,事务的方法必须为public才能生效,不能是private
public void createVoucherOrder(VoucherOrder voucherOrder) {
//5.一人一单
//Long userId = UserHolder.getUser().getId();//异步的子线程,不能通过ThreadLocal获得userId
Long userId = voucherOrder.getUserId();

//5.1 查询订单
int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();
//5.2 判断是否存在
if(count > 0){
//用户已经购买过一次
log.error("用户已经购买过一次");
return ;
}

//6.扣减库存
Boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") // set stock = stock - 1
.eq("voucher_id", voucherOrder.getVoucherId())
.gt("stock", 0) // where id = ? and stock > 0
.update();
if(!success){
//扣减失败
log.error("库存不足");
return ;
}

//7.创建订单
save(voucherOrder);
}
}
测试

Postman中准备如下两个一样的请求http://localhost:8080/api/voucher-order/seckill/12Header携带两个不同的有效的authorization

发送请求,第一次返回订单号,后续请求返回不能重复下单,即为测试成功。

也有使用Jmeter进行高并发测试,但我没有进行测试。

Redis消息队列实现异步秒杀

消息队列(Message Queue),字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色:

  • 消息队列:存储和管理消息,也被称为消息代理(Message Broker)。
  • 生产者:发送消息到消息队列。
  • 消费者:从消息队列获取消息并处理消息。

Redis提供了三种不同的方式来实现消息队列:

  • list结构:基于List结构模拟消息队列。
  • PubSub:基本的点对点消息模型。
  • Stream:比较完善的消息队列模型。
基于List结构模拟消息队列

消息队列(Message Queue),字面意思就是存放消息的队列。而Redislist数据结构是一个双向链表,很容易模拟出队列效果。

队列是入口和出口不在一边,因此我们可以利用:LPUSH结合RPOP、或者RPUSH结合LPOP来实现。

不过要注意的是,当队列中没有消息时RPOPLPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息。因此这里应该使用BRPOP或者BLPOP来实现阻塞效果。

优点:

  • 利用Redis存储,不受限于JVM内存上限。
  • 基于Redis的持久化机制,数据安全性有保证。
  • 可以满足消息有序性。

缺点:

  • 无法避免消息丢失。(消费者取到消息后没有处理就挂掉了,这条数据就丢失了。)
  • 只支持单消费者。(消息一旦被一个消费者取走,则从队列里移除,其他消费者无法取到。)

基于PubSub的消息队列

PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。

SUBSCRIBE channel [channel]:订阅一个或多个频道。

PUBLISH channel msg:向一个频道发送消息。

PSUBSCRIBE pattern[pattern]:订阅与pattern格式匹配的所有频道。

优点:

  • 采用发布订阅模型,支持多生产、多消费。

缺点:

  • 不支持数据持久化。(List结构本质不是消息队列,就是一个链表用于数据存储的,是当成消息队列来用了。Redis中用于存储的数据都支持持久化。而PubSub本身设计出来就是用于消息发送的,当发送一条消息时,这条消息所在的频道没有被订阅,则这条消息就丢失了。发出的所有消息都不会在Redis中保存。)
  • 无法避免消息丢失。
  • 消息堆积有上限,超出时数据丢失。(消费者缓存空间是有上限的。)

基于Stream的消息队列

StreamRedis 5.0引入的一种新数据类型,可以实现一个功能非常完善的消息队列。

发送消息的命令:

例如:

1
2
3
## 创建名为 users 的队列,并向其中发送一个消息,内容是:{name=jack,age=21},并且使用Redis自动生成ID
127.0.0.1:6379> XADD users * name jack age 21
"1644805700523-0"
XREAD

读取消息的方式之一:XREAD

例如,使用XREAD读取第一个消息:

1
2
3
4
5
6
7
127.0.0.1:6379[1]> XREAD COUNT 1 STREAMS users 0
1) 1) "users"
2) 1) 1) "1734487324128-0"
2) 1) "name"
2) "jack"
3) "age"
4) "21"

XREAD阻塞方式,读取最新的消息:

1
2
3
4
## 指定起始ID为$时,代表读取最新的消息
127.0.0.1:6379[1]> XREAD COUNT 1 BLOCK 1000 STREAMS users $
(nil)
(1.10s)

在业务开发中,我们可以循环的调用XREAD阻塞方式来查询最新消息,从而实现持续监听队列的效果,伪代码如下:

注意:当我们指定起始ID$时,代表读取最新的消息,如果我们处理一条消息的过程中,又有超过1条以上的消息到达队列,则下次获取时也只能获取到最新的一条,会出现漏读消息的问题。

STREAM类型消息队列的XREAD命令特点:

  • 消息可回溯。(消息读完后不丢失,永久保存在队列中。可以随时读取)
  • 一个消息可以被多个消费者读取。
  • 可以阻塞读取。
  • 有消息漏读的风险。
消费者组

消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备下列特点:

  1. 消息分流:队列中的消息会分流给组内的不同消费者,而不是重复消费,从而加快消息处理的速度。
  2. 消息标示:消费者组会维护一个标示,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标示之后读取消息。确保每一个消息都会被消费。
  3. 消息确认:消费者获取消息后,消息处于pending状态,并存入一个pending-list。当处理完成后需要通过XACK来确认消息,标记消息为已处理,才会从pending-list移除。

(1)创建消费者组:XGROUP CREATE key groupName ID [MKSTREAM]

  • key:队列名称。
  • groupName:消费者组名称。
  • ID:起始ID标示,$代表队列中最后一个消息,0则代表队列中第一个消息。
  • MKSTREAM:队列不存在时自动创建队列。

(2)其它常见命令:

1
2
3
4
5
6
7
8
# 删除指定的消费者组
XGROUP DESTORY key groupName

# 给指定的消费者组添加消费者
XGROUP CREATECONSUMER key groupname consumername

# 删除消费者组中的指定消费者
XGROUP DELCONSUMER key groupname consumername

(3)从消费者组读取消息:

1
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
  • group:消费组名称。

  • consumer:消费者名称,如果消费者不存在,会自动创建一个消费者。

  • count:本次查询的最大数量。

  • BLOCK milliseconds:当没有消息时最长等待时间。

  • NOACK:无需手动ACK,获取到消息后自动确认。

  • STREAMS key:指定队列名称。

  • ID:获取消息的起始ID

    • “>”:从下一个未消费的消息开始

    • 其它:根据指定idpending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始。

(4)消费者监听消息的基本思路:

使用实例:

1.创建消息队列。

1
2
3
4
5
6
7
8
> xadd s1 * k1 v1
1734511630654-0
> xadd s1 * k2 v2
1734511640519-0
> xadd s1 * k3 v3
1734511906934-0
> XGROUP CREATE s1 g1 0
OK

2.创建消费者组,组内有两个消费者读取消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
127.0.0.1:6379> select 1
OK
127.0.0.1:6379[1]> xreadgroup group g1 c1 count 1 block 2000 streams s1 >
1) 1) "s1"
2) 1) 1) "1734511630654-0"
2) 1) "k1"
2) "v1"
127.0.0.1:6379[1]> xreadgroup group g1 c2 count 1 block 2000 streams s1 >
1) 1) "s1"
2) 1) 1) "1734511640519-0"
2) 1) "k2"
2) "v2"
127.0.0.1:6379[1]> xack s1 g1 1734511630654-0
(integer) 1
127.0.0.1:6379[1]> xpending s1 g1 - + 10
1) 1) "1734511640519-0"
2) "c2"
3) (integer) 61958
4) (integer) 1
127.0.0.1:6379[1]> xreadgroup group g1 c1 count 1 block 2000 streams s1 0
1) 1) "s1"
2) (empty array)
127.0.0.1:6379[1]> xreadgroup group g1 c2 count 1 block 2000 streams s1 0
1) 1) "s1"
2) 1) 1) "1734511640519-0"
2) 1) "k2"
2) "v2"
127.0.0.1:6379[1]> xack s1 g1 1734511640519-0
(integer) 1
127.0.0.1:6379[1]> xpending s1 g1 - + 10
(empty array)

STREAM类型消息队列的XREADGROUP命令特点:

  • 消息可回溯。
  • 可以多消费者争抢消息,加快消费速度。
  • 可以阻塞读取。
  • 没有消息漏读的风险。
  • 有消息确认机制,保证消息至少被消费一次。
案例

基于RedisStream结构作为消息队列,实现异步秒杀下单。

需求:

1.创建一个Stream类型的消息队列,名为stream.orders

1
2
> xgroup create stream.orders g1 0 mkstream
OK

2.修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包含voucherIduserIdorderId

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
-- 1.参数列表
-- 1.1.优惠券id
local voucherId = ARGV[1]
-- 1.2.用户id
local userId = ARGV[2]
-- 1.3.订单id
local orderId = ARGV[3]

-- 2.数据key
-- 2.1.库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2.订单key
local orderKey = 'seckill:order:' .. voucherId

-- 3.脚本业务
-- 3.1.判断库存是否充足 get stockKey
if(tonumber(redis.call('get', stockKey)) <= 0) then
-- 3.2.库存不足,返回1
return 1
end
-- 3.2.判断用户是否下单 SISMEMBER orderKey userId
if(redis.call('sismember', orderKey, userId) == 1) then
-- 3.3.存在,说明是重复下单,返回2
return 2
end
-- 3.4.扣库存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 3.5.下单(保存用户)sadd orderKey userId
redis.call('sadd', orderKey, userId)
-- 3.6.发送到消息队列中, XADD stream.orders * k1 v1 k2 v2 ...
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
return 0

3.项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
package com.hmdp.service.impl;

@Slf4j
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Autowired
private RedisIdWorker redisIdWorker;
@Autowired
private StringRedisTemplate stringRedisTemplate;

@Resource
private RedissonClient redissonClient;

private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}

//线程池
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();//获取单线程处理订单

@PostConstruct//在当前类初始化完毕后执行
private void init() {
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}

private class VoucherOrderHandler implements Runnable {
String queueName = "stream.orders";
@Override
public void run() {//从阻塞队列取出订单信息处理,该方法应该一初始化就开始执行
while (true) {
try {
//1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS streams.order >
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create(queueName, ReadOffset.lastConsumed())
);
//2.判断消息是否获取成功
if(list == null || list.isEmpty()){
//如果获取失败,说明没有消息,继续下一次循环
continue;
}
//解析消息中的订单信息
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> values = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
//3.如果获取成功,可以下单
handleVoucherOrder(voucherOrder);
//4.ACK确认 SACK stream.orders g1 id
stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
} catch (Exception e) {
e.printStackTrace();
log.error("处理订单异常", e);
handlePendingList();
}
}
}

private void handlePendingList() {
while (true) {
try {
//1.获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 STREAMS streams.order 0
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1),
StreamOffset.create(queueName, ReadOffset.from("0"))
);
//2.判断消息是否获取成功
if(list == null || list.isEmpty()){
//如果获取失败,说明pending-list没有异常消息,结束循环
break;
}
//解析消息中的订单信息
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> values = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
//3.如果获取成功,可以下单
handleVoucherOrder(voucherOrder);
//4.ACK确认 SACK stream.orders g1 id
stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
} catch (Exception e) {
e.printStackTrace();
log.error("处理pending-list订单异常", e);
try {
Thread.sleep(20);//防止抛出异常太频繁,可以休眠一下
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
}
}

private void handleVoucherOrder(VoucherOrder voucherOrder) {
//1.获取用户
//Long userId = UserHolder.getUser().getId();//异步的子线程,不能通过ThreadLocal获得userId
Long userId = voucherOrder.getUserId();

//注意:这里其实不用加锁也是可以的。
//2.创建锁对象
RLock lock = redissonClient.getLock("lock:order:" + userId);
//3.获取锁
boolean isLock = lock.tryLock();//释放不等待
//4.判断是否获取锁成功
if(!isLock){
//获取锁失败,返回错误或重试
log.error(("不允许重复下单"));
return;
}
try {
//获取代理对象(事务)
// IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();//拿到当前对象的代理对象
// 注意:这里是拿不到代理对象的,因为currentProxy底层是通过ThreadLocal获取的,
// 当前handleVoucherOrder是基于线程做的,现在是子线程,子线程无法从ThreadLocal取出。
proxy.createVoucherOrder(voucherOrder);
}finally {
//释放锁
lock.unlock();
}
}

private IVoucherOrderService proxy;

@Override
public Result seckillVoucher(Long voucherId){
//获取用户
Long userId = UserHolder.getUser().getId();
//获取订单id
long orderId = redisIdWorker.nextId("order");
//1.执行lua脚本
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), userId.toString(), String.valueOf(orderId)
);
//2.判断结果是否为0
int r = result.intValue();
if(r != 0){
//2.1 不为0,代表没有购买资格
return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
}

//3.获取代理对象
proxy = (IVoucherOrderService) AopContext.currentProxy();//拿到当前对象的代理对象

//4.返回订单id
return Result.ok(orderId);
}

@Transactional //事务的范围是更新数据库,上述查询数据库不需要加事务,事务的方法必须为public才能生效,不能是private
public void createVoucherOrder(VoucherOrder voucherOrder) {
//5.一人一单
//Long userId = UserHolder.getUser().getId();//异步的子线程,不能通过ThreadLocal获得userId
Long userId = voucherOrder.getUserId();

//5.1 查询订单
int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();
//5.2 判断是否存在
if(count > 0){
//用户已经购买过一次
log.error("用户已经购买过一次");
return ;
}

//6.扣减库存
Boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") // set stock = stock - 1
.eq("voucher_id", voucherOrder.getVoucherId())
.gt("stock", 0) // where id = ? and stock > 0
.update();
if(!success){
//扣减失败
log.error("库存不足");
return ;
}

//7.创建订单
save(voucherOrder);
}
}
测试

Postman中准备如下两个一样的请求http://localhost:8080/api/voucher-order/seckill/12Header携带两个不同的有效的authorization

发送请求,第一次返回订单号,后续请求返回不能重复下单,即为测试成功。

也有使用Jmeter进行高并发测试,但我没有进行测试。

三种消息队列对比
List PubSub Stream
消息持久化 支持 不支持 支持
阻塞读取 支持 支持 支持
消息堆积处理 受限于内存空间,可以利用多消费者加快处理 受限于消费者缓冲区 受限于队列长度,可以利用消费者组提高消费速度,减少堆积
消息确认机制 不支持 不支持 支持
消息回溯 不支持 不支持 支持

Java-Redis:实战篇(2)
http://surourou8.github.io/2024/12/19/Java-Redis:实战篇(2)/
作者
Su Rourou
发布于
2024年12月19日
许可协议