Redis分布式锁实现高并发控制实践

分布式锁

分布式锁是控制分布式系统之间同步访问共享资源的一种方式。在分布式系统中,常常需要协调他们的动作。如果不同的系统或是同一个系统的不同主机之间共享了一个或一组资源,那么访问这些资源的时候,往往需要互斥来防止彼此干扰来保证一致性,在这种情况下,便需要使用到分布式锁。

分布式锁基本功能

1.同一时刻只能存在一个锁

2.需要解决意外死锁问题,也就是锁能超时自动释放

3.支持主动释放锁

分布式锁解决什么问题

多进程并发执行任务时,需要保证任务的有序性或者唯一性

分布式锁适用场景

场景一:从前端界面发起一笔支付请求,如果前端没有做防重处理,那么可能在某一个时刻会有二笔一样的单子同时到达系统后台。

场景二:在App中下订单的时候,点击确认之后,没反应,就又点击了几次。在这种情况下,如果无法保证该接口的幂等性,那么将会出现重复下单问题。
在接收消息的时候,消息推送重复。如果处理消息的接口无法保证幂等,那么重复消费消息产生的影响可能会非常大

场景三:秒杀场景,数据库里有一张表,column分别是商品ID,和商品ID对应的库存量,秒杀成功就将此商品库存量-1。现在假设有1000个线程来秒杀商品,我们来根据这个简单的业务场景来解释一下分布式锁。

代码解释:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
//RedisLock.java

/**
* @Author: Usher
* @Description: redis分布式锁
*/
@Component
@Slf4j
public class RedisLock {

@Autowired
private StringRedisTemplate stringRedisTemplate;


/**
* 加锁
*
* @param key productId - 商品的唯一标志
* @param value 当前时间+超时时间
* @return
*/
//setIfAbsent ,SETINX
public boolean lock(String key, String value) {
if (stringRedisTemplate.opsForValue().setIfAbsent(key, value)) {
//返回true,也就是key不存在,获得锁
return true;
}
//判断锁超时 - 防止原来的操作异常,没有运行解锁操作 防止死锁
String curValue = stringRedisTemplate.opsForValue().get(key);
//如果锁过期
//curValue不为空且小于当前时间,说明锁过期,其他线程获得锁
if (!StringUtils.isEmpty(curValue) && Long.parseLong(curValue) < System.currentTimeMillis()) {
//获取上一个锁的时间value
String oldValue = stringRedisTemplate.opsForValue().getAndSet(key, value);
//假设两个线程同时进来,key被占用了。获取的值currentValue=A(get取的旧的值肯定是一样的),两个线程的value都是B,key都是K.锁时间已经过期了。
//而这里面的getAndSet一次只会一个执行,也就是一个执行之后,上一个的value已经变成了B。只有一个线程获取的上一个值会是A,另一个线程拿到的值是B。
//而oldValue等于A,不等于B,只能有一个线程拿到锁
if (!StringUtils.isEmpty(oldValue) && oldValue.equals(curValue)) {
return true;
}
}
return false;
}

/**
* 解锁
*
* @param key
* @param value
*/
public void unlock(String key, String value) {
try {
String curValue = stringRedisTemplate.opsForValue().get(key);
if (!StringUtils.isEmpty(curValue) && curValue.equals(value)) {
stringRedisTemplate.opsForValue().getOperations().delete(key);//删除key
}
} catch (Exception e) {
log.error("[Redis分布式锁] 解锁出现异常了,{}", e);
}
}
}

Redis的SETNX命令:

key设置值为value,如果key不存在,这种情况下等同SET命令。 当key存在时,什么也不做。SETNX是”SET if Not eXists”的简写。

返回值

Integer reply, 特定值:

  • 1 如果key被设置了
  • 0 如果key没有被设置

##例子

1
2
3
4
5
6
7
redis> SETNX mykey "Hello"
(integer) 1
redis> SETNX mykey "World"
(integer) 0
redis> GET mykey
"Hello"
redis>
Design pattern: Locking with !SETNX
设计模式:使用!SETNX加锁

Please note that:

请注意:

  1. 不鼓励以下模式来实现the Redlock algorithm ,该算法实现起来有一些复杂,但是提供了更好的保证并且具有容错性。
  2. 无论如何,我们保留旧的模式,因为肯定存在一些已实现的方法链接到该页面作为引用。而且,这是一个有趣的例子说明Redis命令能够被用来作为编程原语的。
  3. 无论如何,即使假设一个单例的加锁原语,但是从 2.6.12 开始,可以创建一个更加简单的加锁原语,相当于使用SET命令来获取锁,并且用一个简单的 Lua 脚本来释放锁。该模式被记录在SET命令的页面中。

也就是说,SETNX能够被使用并且以前也在被使用去作为一个加锁原语。例如,获取键为foo的锁,客户端可以尝试一下操作:

1
SETNX lock.foo <current Unix time + lock timeout + 1>

如果客户端获得锁,SETNX返回1,那么将lock.foo键的Unix时间设置为不在被认为有效的时间。客户端随后会使用DEL lock.foo去释放该锁。

如果SETNX返回0,那么该键已经被其他的客户端锁定。如果这是一个非阻塞的锁,才能立刻返回给调用者,或者尝试重新获取该锁,直到成功或者过期超时。

处理死锁

以上加锁算法存在一个问题:如果客户端出现故障,崩溃或者其他情况无法释放该锁会发生什么情况?这是能够检测到这种情况,因为该锁包含一个Unix时间戳,如果这样一个时间戳等于当前的Unix时间,该锁将不再有效。

当以下这种情况发生时,我们不能调用DEL来删除该锁,并且尝试执行一个SETNX,因为这里存在一个竞态条件,当多个客户端察觉到一个过期的锁并且都尝试去释放它。

  • C1 和 C2 读lock.foo检查时间戳,因为他们执行完SETNX后都被返回了0,因为锁仍然被 C3 所持有,并且 C3 已经崩溃。
  • C1 发送DEL lock.foo
  • C1 发送SETNX lock.foo命令并且成功返回
  • C2 发送DEL lock.foo
  • C2 发送SETNX lock.foo命令并且成功返回
  • 错误:由于竞态条件导致 C1 和 C2 都获取到了锁

幸运的是,可以使用以下的算法来避免这种情况,请看 C4 客户端所使用的好的算法:

  • C4 发送SETNX lock.foo为了获得该锁

  • 已经崩溃的客户端 C3 仍然持有该锁,所以Redis将会返回0给 C4

  • C4 发送GET lock.foo检查该锁是否已经过期。如果没有过期,C4 客户端将会睡眠一会,并且从一开始进行重试操作

  • 另一种情况,如果因为 lock.foo键的Unix时间小于当前的Unix时间而导致该锁已经过期,C4 会尝试执行以下的操作:

    1
    GETSET lock.foo <current Unix timestamp + lock timeout + 1>
  • 由于GETSET 的语意,C4会检查已经过期的旧值是否仍然存储在lock.foo中。如果是的话,C4 会获得锁

  • 如果另一个客户端,假如为 C5 ,比 C4 更快的通过GETSET操作获取到锁,那么 C4 执行GETSET操作会被返回一个不过期的时间戳。C4 将会从第一个步骤重新开始。请注意:即使 C4 在将来几秒设置该键,这也不是问题。

为了使这种加锁算法更加的健壮,持有锁的客户端应该总是要检查是否超时,保证使用DEL释放锁之前不会过期,因为客户端故障的情况可能是复杂的,不止是崩溃,还会阻塞一段时间,阻止一些操作的执行,并且在阻塞恢复后尝试执行DEL(此时,该LOCK已经被其他客户端所持有)

Redis的GETSET命令

自动将key对应到value并且返回原来key对应的value。如果key存在但是对应的value不是字符串,就返回错误。

设计模式

GETSET可以和INCR一起使用实现支持重置的计数功能。举个例子:每当有事件发生的时候,一段程序都会调用INCR给key mycounter加1,但是有时我们需要获取计数器的值,并且自动将其重置为0。这可以通过GETSET mycounter “0”来实现:

1
2
3
INCR mycounter
GETSET mycounter "0"
GET mycounter
返回值

bulk-string-reply: 返回之前的旧值,如果之前Key不存在将返回nil

例子
1
2
3
4
5
6
7
redis> INCR mycounter
(integer) 1
redis> GETSET mycounter "0"
"1"
redis> GET mycounter
"0"
redis>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78

/**
* @Author: Usher
* @Description:
*/
@Service
public class SeckillServiceImpl implements SeckillService{

@Autowired
private RedisLock redisLock;

private static final int TIMEOUT = 10*1000;//超时时间 10s

/**
* 活动,特价,限量100000份
*/
static Map<String,Integer> products;//模拟商品信息表
static Map<String,Integer> stock;//模拟库存表
static Map<String,String> orders;//模拟下单成功用户表
static {
/**
* 模拟多个表,商品信息表,库存表,秒杀成功订单表
*/
products = new HashMap<>();
stock = new HashMap<>();
orders = new HashMap<>();
products.put("123456",100000);
stock.put("123456",100000);
}

private String queryMap(String productId){//模拟查询数据库
return "国庆活动,红烧肉特惠,限量"
+products.get(productId)
+"份,还剩:"+stock.get(productId)
+"份,该商品成功下单用户数:"
+orders.size()+"人";
}
@Override
public String querySecKillProductInfo(String productId) {
return this.queryMap(productId);
}

//synchronized锁,没做到细粒度控制。比如有很多商品的秒杀,但是这个把所有商品的秒杀都锁住了。只适合单机的情况,不适合集群,
//采用redis分布式锁
//如果采用redis锁再加syn锁,可以保证都可以获得锁
@Override
public void orderProductMockDiffUser(String productId) {

//加锁
long time = System.currentTimeMillis() + TIMEOUT;
if(!redisLock.lock(productId,String.valueOf(time))){
throw new Exception("很抱歉,人太多了!稍后再试!");
}

//1.查询该商品库存,为0则活动结束
int stockNum = stock.get(productId);
if(stockNum==0){
throw new Exception("活动结束");
}else {
//2.下单
orders.put(KeyUtil.genUniqueKey(),productId);
//3.减库存
stockNum -=1;//不做处理的话,高并发下会出现超卖的情况,下单数,大于减库存的情况。虽然这里减了,但由于并发,减的库存还没存到map中去。新的并发拿到的是原来的库存
try{
Thread.sleep(100);//模拟减库存的处理时间
}catch (InterruptedException e){
e.printStackTrace();
}
stock.put(productId,stockNum);
}

//解锁
redisLock.unlock(productId,String.valueOf(time));

}

}

使用Apache ab压力测试工具,500个请求,100个并发

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
H:\软件\Apache24\bin>ab -n 500 -c 100 http://localhost:8080/sell/skill/order/123456
This is ApacheBench, Version 2.3 <$Revision: 1826891 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/

Benchmarking localhost (be patient)
Completed 100 requests
Completed 200 requests
Completed 300 requests
Completed 400 requests
Completed 500 requests
Finished 500 requests


Server Software:
Server Hostname: localhost
Server Port: 8080

Document Path: /sell/skill/order/123456
Document Length: 101 bytes

Concurrency Level: 100
Time taken for tests: 9.222 seconds
Complete requests: 500
Failed requests: 494
(Connect: 0, Receive: 0, Length: 494, Exceptions: 0)
Total transferred: 95270 bytes
HTML transferred: 35680 bytes
Requests per second: 54.22 [#/sec] (mean)
Time per request: 1844.420 [ms] (mean)
Time per request: 18.444 [ms] (mean, across all concurrent requests)
Transfer rate: 10.09 [Kbytes/sec] received

Connection Times (ms)
min mean[+/-sd] median max
Connect: 2 3 0.5 3 8
Processing: 130 1557 1007.6 1012 7236
Waiting: 129 1556 1008.5 1012 7236
Total: 132 1560 1007.6 1015 7238

Percentage of the requests served within a certain time (ms)
50% 1015
66% 1715
75% 1829
80% 2489
90% 2769
95% 3337
98% 4431
99% 5124
100% 7238 (longest request)

测试结果

可以看到库存剩余数量和成功下单数一致,说明实现了分布式锁的并发控制