Redis實現(xiàn)分布式鎖
1.根據(jù)lockKey區(qū)進行setnx(set not
exist,如果key值為空,則正常設(shè)置,返回1,否則不會進行設(shè)置并返回0)操作,如果設(shè)置成功,表示已經(jīng)獲得鎖,否則并沒有獲取鎖。
2.如果沒有獲得鎖,去Redis上拿到該key對應(yīng)的值,在該key上我們存儲一個時間戳(用毫秒表示,t1),為了避免死鎖以及其他客戶端占用該鎖超過一定時間(5秒),使用該客戶端當(dāng)前時間戳,與存儲的時間戳作比較。
3.如果沒有超過該key的使用時限,返回false,表示其他人正在占用該key,不能強制使用;如果已經(jīng)超過時限,那我們就可以進行解鎖,使用我們的時間戳來代替該字段的值。
4.但是如果在setnx失敗后,get該值卻無法拿到該字段時,說明操作之前該鎖已經(jīng)被釋放,這個時候,最好的辦法就是重新執(zhí)行一遍setnx方法來獲取其值以獲得該鎖。
釋放鎖:刪除redis中key
?
1 public class RedisKeyLock { 2 private static Logger logger =
Logger.getLogger(RedisKeyLock.class); 3 private final static long
ACCQUIRE_LOCK_TIMEOUT_IN_MS = 10 * 1000; 4 private final static int
EXPIRE_IN_SECOND = 5;//鎖失效時間 5 private final static long WAIT_INTERVAL_IN_MS =
100; 6 private static RedisKeyLock lock; 7 private JedisPool jedisPool; 8
private RedisKeyLock(JedisPool pool){ 9 this.jedisPool = pool; 10 } 11 public
static RedisKeyLock getInstance(JedisPool pool){ 12 if(lock == null){ 13 lock =
new RedisKeyLock(pool); 14 } 15 return lock; 16 } 17 18 public void lock(final
String redisKey) {19 Jedis resource = null; 20 try { 21 long now =
System.currentTimeMillis();22 resource = jedisPool.getResource(); 23 long
timeoutAt = now + ACCQUIRE_LOCK_TIMEOUT_IN_MS; 24 boolean flag = false; 25 while
(true) { 26 String expireAt = String.valueOf(now + EXPIRE_IN_SECOND * 1000); 27
long ret = resource.setnx(redisKey, expireAt); 28 if (ret == 1) {//已獲取鎖 29 flag
=true; 30 break; 31 } else {//未獲取鎖,重試獲取鎖 32 String oldExpireAt =
resource.get(redisKey);33 if (oldExpireAt != null &&
Long.parseLong(oldExpireAt) < now) { 34 oldExpireAt = resource.getSet(redisKey,
expireAt);35 if (Long.parseLong(oldExpireAt) < now) { 36 flag = true; 37 break;
38 } 39 } 40 } 41 if (timeoutAt < now) { 42 break; 43 } 44
TimeUnit.NANOSECONDS.sleep(WAIT_INTERVAL_IN_MS);45 } 46 if (!flag) { 47 throw
new RuntimeException("canot acquire lock now ..."); 48 } 49 } catch
(JedisException je) {50 logger.error("lock", je); 51 je.printStackTrace(); 52
if (resource != null) { 53 jedisPool.returnBrokenResource(resource); 54 } 55 }
catch (Exception e) { 56 e.printStackTrace(); 57 logger.error("lock", e); 58 }
finally { 59 if (resource != null) { 60 jedisPool.returnResource(resource); 61
}62 } 63 } 64 public boolean unlock(final String redisKey) { 65 Jedis
resource =null; 66 try { 67 resource = jedisPool.getResource(); 68
resource.del(redisKey);69 return true; 70 } catch (JedisException je) { 71
je.printStackTrace();72 if (resource != null) { 73
jedisPool.returnBrokenResource(resource);74 } 75 return false; 76 } catch
(Exception e) {77 logger.error("lock", e); 78 return false; 79 } finally { 80 if
(resource !=null) { 81 jedisPool.returnResource(resource); 82 } 83 } 84 }
85 }
?
另一個版本:
SET my:lock 隨機值 NX PX 30000
這個的NX的意思就是只有key不存在的時候才會設(shè)置成功,PX 30000的意思是30秒后鎖自動釋放。別人創(chuàng)建的時候如果發(fā)現(xiàn)已經(jīng)有了就不能加鎖了。
釋放鎖就是刪除key,但是一般可以用lua腳本刪除,判斷value一樣才刪除
?
為啥要用隨機值呢?因為如果某個客戶端獲取到了鎖,但是阻塞了很長時間才執(zhí)行完,此時可能已經(jīng)自動釋放鎖了,此時可能別的客戶端已經(jīng)獲取到了這個鎖,要是你這個時候直接刪除key的話會有問題,所以得用隨機值加上面的lua腳本來釋放鎖。(就是根據(jù)這個隨機值來判斷這個鎖是不是自己加的)
?
如果是Redis是單機,會有問題。因為如果是普通的redis單實例,那就是單點故障。單節(jié)點掛了會導(dǎo)致鎖失效。
如果是redis普通主從,那redis主從異步復(fù)制,如果主節(jié)點掛了,key還沒同步到從節(jié)點,此時從節(jié)點切換為主節(jié)點,別人就會拿到鎖。
?
RedLock算法
這個場景是假設(shè)有一個redis cluster,有5個redis master實例。然后執(zhí)行如下步驟獲取一把鎖:
?
獲取當(dāng)前時間戳,單位是毫秒
跟上面類似,輪流嘗試在每個master節(jié)點上創(chuàng)建鎖,過期時間較短,一般就幾十毫秒
嘗試在大多數(shù)節(jié)點上建立一個鎖,比如5個節(jié)點就要求是3個節(jié)點(n / 2 +1)
客戶端計算建立好鎖的時間,如果建立鎖的時間小于超時時間,就算建立成功了
要是鎖建立失敗了,那么就依次刪除這個鎖
只要別人建立了一把分布式鎖,你就得不斷輪詢?nèi)L試獲取鎖
?
?
Zookeeper實現(xiàn)分布式鎖
基于臨時順序節(jié)點:
1.客戶端調(diào)用create()方法創(chuàng)建名為“l(fā)ocknode/guid-lock-”的節(jié)點,需要注意的是,這里節(jié)點的創(chuàng)建類型需要設(shè)置為EPHEMERAL_SEQUENTIAL。
2.客戶端調(diào)用getChildren(“l(fā)ocknode”)方法來獲取所有已經(jīng)創(chuàng)建的子節(jié)點。
3.客戶端獲取到所有子節(jié)點path之后,如果發(fā)現(xiàn)自己在步驟1中創(chuàng)建的節(jié)點是所有節(jié)點中序號最小的,那么就認為這個客戶端獲得了鎖。
4.如果創(chuàng)建的節(jié)點不是所有節(jié)點中序號最小的,那么則監(jiān)視比自己創(chuàng)建節(jié)點的序列號小的最大的節(jié)點,進入等待。直到下次監(jiān)視的子節(jié)點變更的時候,再進行子節(jié)點的獲取,判斷是否獲取鎖。
釋放鎖的過程相對比較簡單,就是刪除自己創(chuàng)建的那個子節(jié)點即可。
?
不太嚴謹?shù)拇a:
?
1 public class ZooKeeperDistributedLock implements Watcher{ 2 3 private
ZooKeeper zk; 4 private String locksRoot= "/locks"; 5 private String productId;
6 private String waitNode; 7 private String lockNode; 8 private
CountDownLatch latch; 9 private CountDownLatch connectedLatch = new
CountDownLatch(1); 10 private int sessionTimeout = 30000; 11 12 public
ZooKeeperDistributedLock(String productId){ 13 this.productId = productId; 14
try { 15 String address =
"192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181"; 16 zk = new
ZooKeeper(address, sessionTimeout,this); 17 connectedLatch.await(); 18 }
catch (IOException e) { 19 throw new LockException(e); 20 } catch
(KeeperException e) { 21 throw new LockException(e); 22 } catch
(InterruptedException e) { 23 throw new LockException(e); 24 } 25 } 26 27
public void process(WatchedEvent event) { 28 if(event.getState()==
KeeperState.SyncConnected){ 29 connectedLatch.countDown(); 30 return; 31 }
32 33 if(this.latch != null) { 34 this.latch.countDown(); 35 } 36 } 37
38 public void acquireDistributedLock() { 39 try { 40 if(this.tryLock()){ 41
return; 42 } 43 else{ 44 waitForLock(waitNode, sessionTimeout); 45 } 46
}catch (KeeperException e) { 47 throw new LockException(e); 48 } catch
(InterruptedException e) { 49 throw new LockException(e); 50 } 51 } 52 53
public boolean tryLock() { 54 try { 55 // 傳入進去的locksRoot + “/” + productId 56
// 假設(shè)productId代表了一個商品id,比如說1 57 // locksRoot = locks 58 //
/locks/10000000000,/locks/10000000001,/locks/10000000002 59 lockNode =
zk.create(locksRoot + "/" + productId,new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL); 60 61 // 看看剛創(chuàng)建的節(jié)點是不是最小的節(jié)點 62 //
locks:10000000000,10000000001,10000000002 63 List<String> locks =
zk.getChildren(locksRoot,false); 64 Collections.sort(locks); 65 66 if
(lockNode.equals(locksRoot+"/"+ locks.get(0))){ 67 //如果是最小的節(jié)點,則表示取得鎖 68 return
true; 69 } 70 71 //如果不是最小的節(jié)點,找到比自己小1的節(jié)點 72 int previousLockIndex = -1; 73
for(int i = 0; i < locks.size(); i++) { 74 if(lockNode.equals(locksRoot + “/” +
locks.get(i))) { 75 previousLockIndex = i - 1; 76 break; 77 } 78 } 79 80
this.waitNode = locks.get(previousLockIndex); 81 } catch (KeeperException e) {
82 throw new LockException(e); 83 } catch (InterruptedException e) { 84 throw
new LockException(e); 85 } 86 return false; 87 } 88 89 private boolean
waitForLock(String waitNode,long waitTime) throws InterruptedException,
KeeperException { 90 Stat stat = zk.exists(locksRoot + "/" + waitNode, true);
91 if(stat != null){ 92 this.latch = new CountDownLatch(1); 93 this
.latch.await(waitTime, TimeUnit.MILLISECONDS);this.latch = null; 94 } 95
return true; 96 } 97 98 public void unlock() { 99 try { 100 //
刪除/locks/10000000000節(jié)點101 // 刪除/locks/10000000001節(jié)點 102
System.out.println("unlock " + lockNode); 103 zk.delete(lockNode,-1); 104
lockNode =null; 105 zk.close(); 106 } catch (InterruptedException e) { 107
e.printStackTrace();108 } catch (KeeperException e) { 109 e.printStackTrace();
110 } 111 } 112 113 public class LockException extends RuntimeException { 114
private static final long serialVersionUID = 1L; 115 public
LockException(String e){116 super(e); 117 } 118 public LockException(Exception
e){119 super(e); 120 } 121 } 122 123 //
如果有一把鎖,被多個人給競爭,此時多個人會排隊,第一個拿到鎖的人會執(zhí)行,然后釋放鎖,后面的每個人都會去監(jiān)聽排在自己前面的那個人創(chuàng)建的node上,一旦某個人釋放了鎖,排在自己后面的人就會被zookeeper給通知,一旦被通知了之后,就ok了,自己就獲取到了鎖,就可以執(zhí)行代碼了
124 125 }
?
?
?
?
另一個版本:
?
zk分布式鎖,就是某個節(jié)點嘗試創(chuàng)建臨時znode,此時創(chuàng)建成功了就獲取了這個鎖;這個時候別的客戶端來創(chuàng)建鎖會失敗,只能注冊個監(jiān)聽器監(jiān)聽這個鎖。
釋放鎖就是刪除這個znode,一旦釋放掉就會通知客戶端,然后有一個等待著的客戶端就可以再次重新加鎖。
?
?
?
redis分布式鎖,其實需要自己不斷去嘗試獲取鎖,比較消耗性能
?
zk分布式鎖,獲取不到鎖,注冊個監(jiān)聽器即可,不需要不斷主動嘗試獲取鎖,性能開銷較小
?
另外一點就是,如果是redis獲取鎖的那個客戶端bug了或者掛了,那么只能等待超時時間之后才能釋放鎖;而zk的話,因為創(chuàng)建的是臨時znode,只要客戶端掛了,znode就沒了,此時就自動釋放鎖
?
熱門工具 換一換