Redis實現(xiàn)分布式鎖

            1.根據(jù)lockKey區(qū)進行setnx(set not
          exist,如果key值為空,則正常設(shè)置,返回1,否則不會進行設(shè)置并返回0)操作,如果設(shè)置成功,表示已經(jīng)獲得鎖,否則并沒有獲取鎖。


            2.如果沒有獲得鎖,去Redis上拿到該key對應(yīng)的值,在該key上我們存儲一個時間戳(用毫秒表示,t1),為了避免死鎖以及其他客戶端占用該鎖超過一定時間(5秒),使用該客戶端當(dāng)前時間戳,與存儲的時間戳作比較。


            3.如果沒有超過該key的使用時限,返回false,表示其他人正在占用該key,不能強制使用;如果已經(jīng)超過時限,那我們就可以進行解鎖,使用我們的時間戳來代替該字段的值。


            4.但是如果在setnx失敗后,get該值卻無法拿到該字段時,說明操作之前該鎖已經(jīng)被釋放,這個時候,最好的辦法就是重新執(zhí)行一遍setnx方法來獲取其值以獲得該鎖。

            釋放鎖:刪除redis中key

          ?
          1 public class RedisKeyLock { 2 private static Logger logger =
          Logger.getLogger(RedisKeyLock.class); 3 private final static long
          ACCQUIRE_LOCK_TIMEOUT_IN_MS = 10 * 1000; 4 private final static int
          EXPIRE_IN_SECOND = 5;//鎖失效時間 5 private final static long WAIT_INTERVAL_IN_MS =
          100; 6 private static RedisKeyLock lock; 7 private JedisPool jedisPool; 8
          private RedisKeyLock(JedisPool pool){ 9 this.jedisPool = pool; 10 } 11 public
          static RedisKeyLock getInstance(JedisPool pool){ 12 if(lock == null){ 13 lock =
          new RedisKeyLock(pool); 14 } 15 return lock; 16 } 17 18 public void lock(final
          String redisKey) {19 Jedis resource = null; 20 try { 21 long now =
          System.currentTimeMillis();22 resource = jedisPool.getResource(); 23 long
          timeoutAt = now + ACCQUIRE_LOCK_TIMEOUT_IN_MS; 24 boolean flag = false; 25 while
          (true) { 26 String expireAt = String.valueOf(now + EXPIRE_IN_SECOND * 1000); 27
          long ret = resource.setnx(redisKey, expireAt); 28 if (ret == 1) {//已獲取鎖 29 flag
          =true; 30 break; 31 } else {//未獲取鎖,重試獲取鎖 32 String oldExpireAt =
          resource.get(redisKey);33 if (oldExpireAt != null &&
          Long.parseLong(oldExpireAt) < now) { 34 oldExpireAt = resource.getSet(redisKey,
          expireAt);35 if (Long.parseLong(oldExpireAt) < now) { 36 flag = true; 37 break;
          38 } 39 } 40 } 41 if (timeoutAt < now) { 42 break; 43 } 44
          TimeUnit.NANOSECONDS.sleep(WAIT_INTERVAL_IN_MS);45 } 46 if (!flag) { 47 throw
          new RuntimeException("canot acquire lock now ..."); 48 } 49 } catch
          (JedisException je) {50 logger.error("lock", je); 51 je.printStackTrace(); 52
          if (resource != null) { 53 jedisPool.returnBrokenResource(resource); 54 } 55 }
          catch (Exception e) { 56 e.printStackTrace(); 57 logger.error("lock", e); 58 }
          finally { 59 if (resource != null) { 60 jedisPool.returnResource(resource); 61
          }62 } 63 } 64 public boolean unlock(final String redisKey) { 65 Jedis
          resource =null; 66 try { 67 resource = jedisPool.getResource(); 68
          resource.del(redisKey);69 return true; 70 } catch (JedisException je) { 71
          je.printStackTrace();72 if (resource != null) { 73
          jedisPool.returnBrokenResource(resource);74 } 75 return false; 76 } catch
          (Exception e) {77 logger.error("lock", e); 78 return false; 79 } finally { 80 if
          (resource !=null) { 81 jedisPool.returnResource(resource); 82 } 83 } 84 }
          85 }
          ?

          另一個版本:

            SET my:lock 隨機值 NX PX 30000

            這個的NX的意思就是只有key不存在的時候才會設(shè)置成功,PX 30000的意思是30秒后鎖自動釋放。別人創(chuàng)建的時候如果發(fā)現(xiàn)已經(jīng)有了就不能加鎖了。

            釋放鎖就是刪除key,但是一般可以用lua腳本刪除,判斷value一樣才刪除

          ?


            為啥要用隨機值呢?因為如果某個客戶端獲取到了鎖,但是阻塞了很長時間才執(zhí)行完,此時可能已經(jīng)自動釋放鎖了,此時可能別的客戶端已經(jīng)獲取到了這個鎖,要是你這個時候直接刪除key的話會有問題,所以得用隨機值加上面的lua腳本來釋放鎖。(就是根據(jù)這個隨機值來判斷這個鎖是不是自己加的)

          ?

            如果是Redis是單機,會有問題。因為如果是普通的redis單實例,那就是單點故障。單節(jié)點掛了會導(dǎo)致鎖失效。

            如果是redis普通主從,那redis主從異步復(fù)制,如果主節(jié)點掛了,key還沒同步到從節(jié)點,此時從節(jié)點切換為主節(jié)點,別人就會拿到鎖。

          ?

          RedLock算法

            這個場景是假設(shè)有一個redis cluster,有5個redis master實例。然后執(zhí)行如下步驟獲取一把鎖:

          ?

            獲取當(dāng)前時間戳,單位是毫秒

            跟上面類似,輪流嘗試在每個master節(jié)點上創(chuàng)建鎖,過期時間較短,一般就幾十毫秒

            嘗試在大多數(shù)節(jié)點上建立一個鎖,比如5個節(jié)點就要求是3個節(jié)點(n / 2 +1)

            客戶端計算建立好鎖的時間,如果建立鎖的時間小于超時時間,就算建立成功了

            要是鎖建立失敗了,那么就依次刪除這個鎖

            只要別人建立了一把分布式鎖,你就得不斷輪詢?nèi)L試獲取鎖

          ?



          ?

           

          Zookeeper實現(xiàn)分布式鎖

          基于臨時順序節(jié)點:


            1.客戶端調(diào)用create()方法創(chuàng)建名為“l(fā)ocknode/guid-lock-”的節(jié)點,需要注意的是,這里節(jié)點的創(chuàng)建類型需要設(shè)置為EPHEMERAL_SEQUENTIAL。

            2.客戶端調(diào)用getChildren(“l(fā)ocknode”)方法來獲取所有已經(jīng)創(chuàng)建的子節(jié)點。

            3.客戶端獲取到所有子節(jié)點path之后,如果發(fā)現(xiàn)自己在步驟1中創(chuàng)建的節(jié)點是所有節(jié)點中序號最小的,那么就認為這個客戶端獲得了鎖。


            4.如果創(chuàng)建的節(jié)點不是所有節(jié)點中序號最小的,那么則監(jiān)視比自己創(chuàng)建節(jié)點的序列號小的最大的節(jié)點,進入等待。直到下次監(jiān)視的子節(jié)點變更的時候,再進行子節(jié)點的獲取,判斷是否獲取鎖。

            釋放鎖的過程相對比較簡單,就是刪除自己創(chuàng)建的那個子節(jié)點即可。

          ?

          不太嚴謹?shù)拇a:

          ?
          1 public class ZooKeeperDistributedLock implements Watcher{ 2 3 private
          ZooKeeper zk; 4 private String locksRoot= "/locks"; 5 private String productId;
          6 private String waitNode; 7 private String lockNode; 8 private
          CountDownLatch latch; 9 private CountDownLatch connectedLatch = new
          CountDownLatch(1); 10 private int sessionTimeout = 30000; 11 12 public
          ZooKeeperDistributedLock(String productId){ 13 this.productId = productId; 14
          try { 15 String address =
          "192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181"; 16 zk = new
          ZooKeeper(address, sessionTimeout,this); 17 connectedLatch.await(); 18 }
          catch (IOException e) { 19 throw new LockException(e); 20 } catch
          (KeeperException e) { 21 throw new LockException(e); 22 } catch
          (InterruptedException e) { 23 throw new LockException(e); 24 } 25 } 26 27
          public void process(WatchedEvent event) { 28 if(event.getState()==
          KeeperState.SyncConnected){ 29 connectedLatch.countDown(); 30 return; 31 }
          32 33 if(this.latch != null) { 34 this.latch.countDown(); 35 } 36 } 37
          38 public void acquireDistributedLock() { 39 try { 40 if(this.tryLock()){ 41
          return; 42 } 43 else{ 44 waitForLock(waitNode, sessionTimeout); 45 } 46
          }catch (KeeperException e) { 47 throw new LockException(e); 48 } catch
          (InterruptedException e) { 49 throw new LockException(e); 50 } 51 } 52 53
          public boolean tryLock() { 54 try { 55 // 傳入進去的locksRoot + “/” + productId 56
          // 假設(shè)productId代表了一個商品id,比如說1 57 // locksRoot = locks 58 //
          /locks/10000000000,/locks/10000000001,/locks/10000000002 59 lockNode =
          zk.create(locksRoot + "/" + productId,new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
          CreateMode.EPHEMERAL_SEQUENTIAL); 60 61 // 看看剛創(chuàng)建的節(jié)點是不是最小的節(jié)點 62 //
          locks:10000000000,10000000001,10000000002 63 List<String> locks =
          zk.getChildren(locksRoot,false); 64 Collections.sort(locks); 65 66 if
          (lockNode.equals(locksRoot+"/"+ locks.get(0))){ 67 //如果是最小的節(jié)點,則表示取得鎖 68 return
          true; 69 } 70 71 //如果不是最小的節(jié)點,找到比自己小1的節(jié)點 72 int previousLockIndex = -1; 73
          for(int i = 0; i < locks.size(); i++) { 74 if(lockNode.equals(locksRoot + “/” +
          locks.get(i))) { 75 previousLockIndex = i - 1; 76 break; 77 } 78 } 79 80
          this.waitNode = locks.get(previousLockIndex); 81 } catch (KeeperException e) {
          82 throw new LockException(e); 83 } catch (InterruptedException e) { 84 throw
          new LockException(e); 85 } 86 return false; 87 } 88 89 private boolean
          waitForLock(String waitNode,long waitTime) throws InterruptedException,
          KeeperException { 90 Stat stat = zk.exists(locksRoot + "/" + waitNode, true);
          91 if(stat != null){ 92 this.latch = new CountDownLatch(1); 93 this
          .latch.await(waitTime, TimeUnit.MILLISECONDS);this.latch = null; 94 } 95
          return true; 96 } 97 98 public void unlock() { 99 try { 100 //
          刪除/locks/10000000000節(jié)點101 // 刪除/locks/10000000001節(jié)點 102
          System.out.println("unlock " + lockNode); 103 zk.delete(lockNode,-1); 104
          lockNode =null; 105 zk.close(); 106 } catch (InterruptedException e) { 107
          e.printStackTrace();108 } catch (KeeperException e) { 109 e.printStackTrace();
          110 } 111 } 112 113 public class LockException extends RuntimeException { 114
          private static final long serialVersionUID = 1L; 115 public
          LockException(String e){116 super(e); 117 } 118 public LockException(Exception
          e){119 super(e); 120 } 121 } 122 123 //
          如果有一把鎖,被多個人給競爭,此時多個人會排隊,第一個拿到鎖的人會執(zhí)行,然后釋放鎖,后面的每個人都會去監(jiān)聽排在自己前面的那個人創(chuàng)建的node上,一旦某個人釋放了鎖,排在自己后面的人就會被zookeeper給通知,一旦被通知了之后,就ok了,自己就獲取到了鎖,就可以執(zhí)行代碼了
          124 125 }
          ?

          ?

          ?

          ?

          另一個版本:

          ?

            zk分布式鎖,就是某個節(jié)點嘗試創(chuàng)建臨時znode,此時創(chuàng)建成功了就獲取了這個鎖;這個時候別的客戶端來創(chuàng)建鎖會失敗,只能注冊個監(jiān)聽器監(jiān)聽這個鎖。

            釋放鎖就是刪除這個znode,一旦釋放掉就會通知客戶端,然后有一個等待著的客戶端就可以再次重新加鎖。

          ?

          ?

          ?

            redis分布式鎖,其實需要自己不斷去嘗試獲取鎖,比較消耗性能

          ?

            zk分布式鎖,獲取不到鎖,注冊個監(jiān)聽器即可,不需要不斷主動嘗試獲取鎖,性能開銷較小

          ?


            另外一點就是,如果是redis獲取鎖的那個客戶端bug了或者掛了,那么只能等待超時時間之后才能釋放鎖;而zk的話,因為創(chuàng)建的是臨時znode,只要客戶端掛了,znode就沒了,此時就自動釋放鎖

          ?


          友情鏈接
          ioDraw流程圖
          API參考文檔
          OK工具箱
          云服務(wù)器優(yōu)惠
          阿里云優(yōu)惠券
          騰訊云優(yōu)惠券
          京東云優(yōu)惠券
          站點信息
          問題反饋
          郵箱:[email protected]
          QQ群:637538335
          關(guān)注微信

                国产人人操 | 日韩三级电影网 | 天天综合色一区二区三区 | 欧美色图视频在线观看 | 安娜大尺度做爰未删减视频 |