Skip to content

Commit da5d934

Browse files
committed
分布式锁 redis实现
1 parent 2ecde20 commit da5d934

File tree

5 files changed

+173
-18
lines changed

5 files changed

+173
-18
lines changed

src/main/java/code/cache/CacheSnowCrash.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public Object getWithLock(String key) throws InterruptedException {
6565
Object value = cache.get(key);
6666
if (value == null) { //代表缓存值过期
6767
String lockKey = key;
68-
String lockSeq = lock.getLock(lockKey);
68+
lock.lock(lockKey);
6969
try {
7070
//双重判断
7171
if((value = cache.get(key)) == null){
@@ -74,7 +74,7 @@ public Object getWithLock(String key) throws InterruptedException {
7474
return value;
7575
}
7676
} finally {
77-
lock.unlock(lockKey, lockSeq);
77+
lock.unlock(lockKey);
7878
}
7979
} else{
8080
return value;

src/main/java/code/distribution/lock/DistributedLock.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,14 @@ public interface DistributedLock {
1212
/**
1313
* 获取锁
1414
* @param key 业务Key
15-
* @return 锁序号
15+
* @return
1616
*/
17-
String getLock(String key);
17+
void lock(String key);
1818

1919
/**
2020
* 解锁
2121
* @param key 业务Key
22-
* @param lockSeq 锁序号
2322
* @return
2423
*/
25-
boolean unlock(String key, String lockSeq);
24+
void unlock(String key);
2625
}
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
package code.distribution.lock;
2+
3+
import lombok.Data;
4+
5+
import java.util.HashMap;
6+
import java.util.Map;
7+
import java.util.UUID;
8+
import java.util.concurrent.atomic.AtomicInteger;
9+
10+
/**
11+
* 〈Redis锁〉<p>
12+
* 〈功能详细描述〉
13+
*
14+
* @author zixiao
15+
* @date 2019/8/26
16+
*/
17+
public class RedisLock implements DistributedLock {
18+
19+
private RedisClient redisClient = new RedisClient();
20+
21+
@Override
22+
public void lock(String key) {
23+
long expired = 3000;
24+
long threadId = Thread.currentThread().getId();
25+
Long ttl = tryAcquire(key, threadId, expired);
26+
if(ttl == null){
27+
return;
28+
}
29+
30+
while (true){
31+
ttl = tryAcquire(key, threadId, expired);
32+
if(ttl == null){
33+
return;
34+
}
35+
}
36+
}
37+
38+
private Long tryAcquire(String key, long threadId, long expired){
39+
return redisClient.tryLockInner(key, String.valueOf(threadId), expired);
40+
}
41+
42+
@Override
43+
public void unlock(String key) {
44+
tryRelease(key, Thread.currentThread().getId());
45+
}
46+
47+
private void tryRelease(String key, long threadId){
48+
redisClient.tryUnlockInner(key, String.valueOf(threadId));
49+
}
50+
51+
class RedisClient{
52+
53+
private Map<String/*lockKey*/, Map<String/*lockOwnerId*/, AtomicInteger>> lockHolder = new HashMap<>();
54+
55+
private Map<String, ExpiredObject> expiredMap = new HashMap<>();
56+
57+
public Long tryLockInner(String lockKey, String threadId, long expiredMills){
58+
String threadUk = UUID.randomUUID().toString() + threadId;
59+
//lua保证张一个事务里面
60+
synchronized (lockHolder){
61+
if(!exist(lockKey)){
62+
hset(lockKey, threadUk, 1);
63+
pexpire(lockKey, expiredMills);
64+
System.out.println("Lock success, I'm the first guy.");
65+
return null;
66+
}
67+
else if(hexist(lockKey, threadUk)){
68+
int locks = hincrby(lockKey, threadUk, 1);
69+
pexpire(lockKey, expiredMills);
70+
System.out.println("Locked by me(reentrant), locks="+locks);
71+
return null;
72+
}
73+
System.out.println("Locked by another guy.");
74+
return pttl(lockKey);
75+
}
76+
}
77+
78+
public void tryUnlockInner(String lockKey, String threadId){
79+
String threadUk = UUID.randomUUID().toString() + threadId;
80+
if(!exist(lockKey)){
81+
System.out.println("no lock existed.");
82+
return;
83+
}
84+
else if(hexist(lockKey, threadUk)){
85+
int locks = hdecrby(lockKey, threadUk, 1);
86+
System.out.println("Unlock by me(reentrant), remain locks="+locks);
87+
return;
88+
}
89+
}
90+
91+
public boolean exist(String key){
92+
return lockHolder.containsKey(key) ;
93+
}
94+
95+
public void hset(String key, String subKey, int value){
96+
Map<String, AtomicInteger> map = new HashMap<>();
97+
map.put(subKey, new AtomicInteger(Integer.valueOf(value)));
98+
lockHolder.put(key, map);
99+
}
100+
101+
/**
102+
* ms为单位
103+
*/
104+
public void pexpire(String key, long expireMills){
105+
//expire
106+
expiredMap.put(key, new ExpiredObject(expireMills));
107+
}
108+
109+
public boolean hexist(String key, String subKey){
110+
Map<String, AtomicInteger> map = lockHolder.get(key);
111+
if( map != null && map.containsKey(subKey)){
112+
return true;
113+
}
114+
return false;
115+
}
116+
117+
public int hincrby(String key, String subKey, int delta){
118+
Map<String, AtomicInteger> map = lockHolder.get(key);
119+
return map.get(subKey).addAndGet(delta);
120+
}
121+
122+
public int hdecrby(String key, String subKey, int delta){
123+
Map<String, AtomicInteger> map = lockHolder.get(key);
124+
return map.get(subKey).addAndGet(-1 * delta);
125+
}
126+
127+
public Long pttl(String key){
128+
ExpiredObject expiredObject = expiredMap.get(key);
129+
long esapled = System.currentTimeMillis() - expiredObject.start;
130+
if(esapled >= expiredObject.getExpiredMills()){
131+
return null;
132+
}else{
133+
return expiredObject.getExpiredMills() - esapled;
134+
}
135+
}
136+
}
137+
138+
139+
@Data
140+
class ExpiredObject{
141+
142+
private long start;
143+
144+
private long expiredMills;
145+
146+
public ExpiredObject(long expiredMills) {
147+
this.start = System.currentTimeMillis();
148+
this.expiredMills = expiredMills;
149+
}
150+
}
151+
}

src/main/java/code/distribution/lock/ZooKeeperLock.java

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,10 @@ public ZooKeeperLock(){
4141
zkClient = new ZkClient("zk.tbj.com:2181", 6000, 6000);
4242
}
4343

44+
private ThreadLocal<String> lockSeqHolder = new ThreadLocal<>();
45+
4446
@Override
45-
public String getLock(String key) {
47+
public void lock(String key) {
4648
String lockPath = ROOT + SEP + key;
4749
//1、新建已key作为节点名的锁lockPath
4850
if(!zkClient.exists(lockPath)){
@@ -60,7 +62,7 @@ public String getLock(String key) {
6062
//3、尝试获取锁
6163
tryGetLock(lockPath, myLockSeq);
6264

63-
return myLockSeq;
65+
lockSeqHolder.set(myLockSeq);
6466
}
6567

6668
/**
@@ -79,6 +81,7 @@ public int compare(String o1, String o2) {
7981
});
8082
// a、如果是序号最小的子节点, 则认为获得锁
8183
if(nodeSeqList.size() == 0 || nodeSeqList.size() == 1 || myLockSeq.equals(nodeSeqList.get(0))){
84+
System.out.println(">>>Get lock on node " + myLockSeq);
8285
return;
8386
}
8487

@@ -121,9 +124,13 @@ public void handleDataDeleted(String dataPath) throws Exception {
121124
}
122125

123126
@Override
124-
public boolean unlock(String key, String lockSeq) {
127+
public void unlock(String key) {
128+
String lockSeq = lockSeqHolder.get();
125129
String lockFullPath = ROOT + SEP + key + SEP +lockSeq;
126-
return zkClient.delete(lockFullPath);
130+
if(zkClient.delete(lockFullPath)){
131+
lockSeqHolder.remove();
132+
}
133+
System.out.println(">>>Release lock on node " + lockSeq);
127134
}
128135

129136
public static void main(String[] args) {
@@ -134,17 +141,15 @@ public static void main(String[] args) {
134141
executorService.execute(new Runnable() {
135142
@Override
136143
public void run() {
137-
DistributedLock dsLock = new ZooKeeperLock();
144+
DistributedLock lock = new ZooKeeperLock();
138145
String bizKey = "loanId-5";
139-
String lockSeq = dsLock.getLock(bizKey);
146+
lock.lock(bizKey);
140147
try {
141-
System.out.println(Thread.currentThread().getName() + " get lock on " + lockSeq);
142148
doBiz(bizKey);
143149
} catch (Exception e) {
144150
e.printStackTrace();
145151
} finally {
146-
System.out.println(Thread.currentThread().getName() + " release on " + lockSeq);
147-
dsLock.unlock(bizKey, lockSeq);
152+
lock.unlock(bizKey);
148153
}
149154
}
150155
});

src/main/java/code/http/HttpRequestConfig.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,21 +23,21 @@ public class HttpRequestConfig {
2323

2424
/**
2525
* 从连接池中获取可用连接超时时间
26-
* 尝试从连接池中获取,若是在等待了一定的时间后还没有获取到可用连接,则会抛出获取连接超时异常
26+
* 尝试从连接池中获取,若是在等待了一定的时间后还没有获取到可用连接,则会抛出获取 ConnectionPoolTimeoutException
2727
*/
2828
private int connectionRequestTimeout = TIMEOUT;
2929

3030
/**
3131
* 连接目标超时时间
3232
* 指的是连接目标url的连接超时时间,即客服端发送请求到与目标url建立起连接的最大时间。
33-
* 如果在该时间范围内还没有建立起连接,则就抛出ConnectionTimeOut异常
33+
* 如果在该时间范围内还没有建立起连接,则就抛出 ConnectTimeoutException
3434
*/
3535
private int connectTimeout = TIMEOUT;
3636

3737
/**
3838
* 等待响应超时(读取数据超时)
3939
* 连接上一个url后,获取response的返回等待时间 ,即在与目标url建立连接后,等待放回response的最大时间,
40-
* 在规定时间内没有返回响应的话就抛出SocketTimeout异常
40+
* 在规定时间内没有返回响应的话就抛出 SocketTimeoutException
4141
*/
4242
private int socketTimeout = TIMEOUT<<1;
4343

0 commit comments

Comments
 (0)