程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> JAVA編程 >> 關於JAVA >> 詳解Java若何完成基於Redis的散布式鎖

詳解Java若何完成基於Redis的散布式鎖

編輯:關於JAVA

詳解Java若何完成基於Redis的散布式鎖。本站提示廣大學習愛好者:(詳解Java若何完成基於Redis的散布式鎖)文章只能為提供參考,不一定能成為您想要的結果。以下是詳解Java若何完成基於Redis的散布式鎖正文


媒介

單JVM內同步好辦, 直接用JDK供給的鎖便可以了,然則跨過程同步靠這個確定是弗成能的,這類情形下確定要借助第三方,我這裡完成用Redis,固然還有許多其他的完成方法。其實基於Redis完成的道理還算比擬簡略的,在看代碼之前建議年夜家先去看看道理,看懂了以後看代碼應當就輕易懂得了。

我這裡不完成JDK的java.util.concurrent.locks.Lock接口,而是自界說一個,由於JDK的有個newCondition辦法我這裡臨時沒完成。這個Lock供給了5個lock辦法的變體,可以自行選擇應用哪個來獲得鎖,我的設法主意是最好用帶超時前往的那幾個辦法,由於不如許的話,假設redis掛了,線程永久都在那逝世輪回了(關於這裡,應當還可以進一步優化,假如redis掛了,Jedis的操作確定會拋異常之類的,可以界說個機制讓redis掛了的時刻告訴應用這個lock的用戶,或許說是線程)

package cc.lixiaohui.lock;

import java.util.concurrent.TimeUnit;

public interface Lock {

 /**
 * 壅塞性的獲得鎖, 不呼應中止
 */
 void lock;
 
 /**
 * 壅塞性的獲得鎖, 呼應中止
 * 
 * @throws InterruptedException
 */
 void lockInterruptibly throws InterruptedException;
 
 /**
 * 測驗考試獲得鎖, 獲得不到立刻前往, 不壅塞
 */
 boolean tryLock;
 
 /**
 * 超時主動前往的壅塞性的獲得鎖, 不呼應中止
 * 
 * @param time
 * @param unit
 * @return {@code true} 若勝利獲得到鎖, {@code false} 若在指准時間內未���取到鎖
  * 
 */
 boolean tryLock(long time, TimeUnit unit);
 
 /**
 * 超時主動前往的壅塞性的獲得鎖, 呼應中止
 * 
 * @param time
 * @param unit
 * @return {@code true} 若勝利獲得到鎖, {@code false} 若在指准時間內未獲得到鎖
 * @throws InterruptedException 在測驗考試獲得鎖確當前哨程被中止
 */
 boolean tryLockInterruptibly(long time, TimeUnit unit) throws InterruptedException;
 
 /**
 * 釋放鎖
 */
 void unlock;
 
}

看其籠統完成:

package cc.lixiaohui.lock;

import java.util.concurrent.TimeUnit;

/**
 * 鎖的骨架完成, 真實的獲得鎖的步調由子類去完成.
 * 
 * @author lixiaohui
 *
 */
public abstract class AbstractLock implements Lock {

 /**
 * <pre>
 * 這裡需不須要包管可見性值得評論辯論, 由於是散布式的鎖, 
 * 1.統一個jvm的多個線程應用分歧的鎖對象其實也是可以的, 這類情形下不須要包管可見性 
 * 2.統一個jvm的多個線程應用統一個鎖對象, 那可見性就必需要包管了.
 * </pre>
 */
 protected volatile boolean locked;

 /**
 * 以後jvm內持有該鎖的線程(if have one)
 */
 private Thread exclusiveOwnerThread;

 public void lock {
 try {
 lock(false, 0, null, false);
 } catch (InterruptedException e) {
 // TODO ignore
 }
 }

 public void lockInterruptibly throws InterruptedException {
 lock(false, 0, null, true);
 }

 public boolean tryLock(long time, TimeUnit unit) {
 try {
 return lock(true, time, unit, false);
 } catch (InterruptedException e) {
 // TODO ignore
 }
 return false;
 }

 public boolean tryLockInterruptibly(long time, TimeUnit unit) throws InterruptedException {
 return lock(true, time, unit, true);
 }

 public void unlock {
 // TODO 檢討以後線程能否持有鎖
 if (Thread.currentThread != getExclusiveOwnerThread) {
 throw new IllegalMonitorStateException("current thread does not hold the lock");
 }
 
 unlock0;
 setExclusiveOwnerThread(null);
 }

 protected void setExclusiveOwnerThread(Thread thread) {
 exclusiveOwnerThread = thread;
 }

 protected final Thread getExclusiveOwnerThread {
 return exclusiveOwnerThread;
 }

 protected abstract void unlock0;
 
 /**
 * 壅塞式獲得鎖的完成
 * 
 * @param useTimeout 
 * @param time
 * @param unit
 * @param interrupt 能否呼應中止
 * @return
 * @throws InterruptedException
 */
 protected abstract boolean lock(boolean useTimeout, long time, TimeUnit unit, boolean interrupt) throws InterruptedException;

}

基於Redis的終究完成,症結的獲得鎖,釋放鎖的代碼在這個類的lock辦法和unlock0辦法裡,年夜家可以只看這兩個辦法然後完整本身寫一個:

package cc.lixiaohui.lock;

import java.util.concurrent.TimeUnit;

import redis.clients.jedis.Jedis;

/**
 * <pre>
 * 基於Redis的SETNX操作完成的散布式鎖
 * 
 * 獲得鎖時最好用lock(long time, TimeUnit unit), 以避免網路成績而招致線程一向壅塞
 * 
 * <a href="http://redis.io/commands/setnx">SETNC操作參考材料</a>
 * </pre>
 * 
 * @author lixiaohui
 *
 */
public class RedisBasedDistributedLock extends AbstractLock {
 
 private Jedis jedis;
 
 // 鎖的名字
 protected String lockKey;
 
 // 鎖的有用時長(毫秒)
 protected long lockExpires;
 
 public RedisBasedDistributedLock(Jedis jedis, String lockKey, long lockExpires) {
 this.jedis = jedis;
 this.lockKey = lockKey;
 this.lockExpires = lockExpires;
 }

 // 壅塞式獲得鎖的完成
 protected boolean lock(boolean useTimeout, long time, TimeUnit unit, boolean interrupt) throws InterruptedException{
 if (interrupt) {
 checkInterruption;
 }
 
 long start = System.currentTimeMillis;
 long timeout = unit.toMillis(time); // if !useTimeout, then it's useless
 
 while (useTimeout ? isTimeout(start, timeout) : true) {
 if (interrupt) {
 checkInterruption;
 }
 
 long lockExpireTime = System.currentTimeMillis + lockExpires + 1;//鎖超不時間
 String stringOfLockExpireTime = String.valueOf(lockExpireTime);
 
 if (jedis.setnx(lockKey, stringOfLockExpireTime) == 1) { // 獲得到鎖
 // TODO 勝利獲得到鎖, 設置相干標識
 locked = true;
 setExclusiveOwnerThread(Thread.currentThread);
 return true;
 }
 
 String value = jedis.get(lockKey);
 if (value != null && isTimeExpired(value)) { // lock is expired
 // 假定多個線程(非單jvm)同時走到這裡
 String oldValue = jedis.getSet(lockKey, stringOfLockExpireTime); // getset is atomic
 // 然則走到這裡時每一個線程拿到的oldValue確定弗成能一樣(由於getset是原子性的)
 // 參加拿到的oldValue仍然是expired的,那末就解釋拿到鎖了
 if (oldValue != null && isTimeExpired(oldValue)) {
  // TODO 勝利獲得到鎖, 設置相干標識
  locked = true;
  setExclusiveOwnerThread(Thread.currentThread);
  return true;
 }
 } else { 
 // TODO lock is not expired, enter next loop retrying
 }
 }
 return false;
 }
 
 public boolean tryLock {
 long lockExpireTime = System.currentTimeMillis + lockExpires + 1;//鎖超不時間
 String stringOfLockExpireTime = String.valueOf(lockExpireTime);
 
 if (jedis.setnx(lockKey, stringOfLockExpireTime) == 1) { // 獲得到鎖
 // TODO 勝利獲得到鎖, 設置相干標識
 locked = true;
 setExclusiveOwnerThread(Thread.currentThread);
 return true;
 }
 
 String value = jedis.get(lockKey);
 if (value != null && isTimeExpired(value)) { // lock is expired
 // 假定多個線程(非單jvm)同時走到這裡
 String oldValue = jedis.getSet(lockKey, stringOfLockExpireTime); // getset is atomic
 // 然則走到這裡時每一個線程拿到的oldValue確定弗成能一樣(由於getset是原子性的)
 // 假設拿到的oldValue仍然是expired的,那末就解釋拿到鎖了
 if (oldValue != null && isTimeExpired(oldValue)) {
 // TODO 勝利獲得到鎖, 設置相干標識
 locked = true;
 setExclusiveOwnerThread(Thread.currentThread);
 return true;
 }
 } else { 
 // TODO lock is not expired, enter next loop retrying
 }
 
 return false;
 }
 
 /**
 * Queries if this lock is held by any thread.
 * 
 * @return {@code true} if any thread holds this lock and
  *   {@code false} otherwise
 */
 public boolean isLocked {
 if (locked) {
 return true;
 } else {
 String value = jedis.get(lockKey);
 // TODO 這裡實際上是有成績的, 想:當get辦法前往value後, 假定這個value曾經是過時的了,
 // 而就在這剎時, 另外一個節點set了value, 這時候鎖是被其余線程(節點持有), 而接上去的斷定
 // 是檢測不出這類情形的.不外這個成績應當不會招致其它的成績湧現, 由於這個辦法的目標原來就
 // 不是同步掌握, 它只是一種鎖狀況的申報.
 return !isTimeExpired(value);
 }
 }

 @Override
 protected void unlock0 {
 // TODO 斷定鎖能否過時
 String value = jedis.get(lockKey);
 if (!isTimeExpired(value)) {
 doUnlock;
 }
 }

 private void checkInterruption throws InterruptedException {
 if(Thread.currentThread.isInterrupted) {
 throw new InterruptedException;
 }
 }
 
 private boolean isTimeExpired(String value) {
 return Long.parseLong(value) < System.currentTimeMillis;
 }
 
 private boolean isTimeout(long start, long timeout) {
 return start + timeout > System.currentTimeMillis;
 }
 
 private void doUnlock {
 jedis.del(lockKey);
 }

}

假如未來還換一種完成方法(好比zookeeper之類的),到時直接繼續AbstractLock並完成lock(boolean useTimeout, long time, TimeUnit unit, boolean interrupt)unlock0辦法便可(所謂籠統嘛)

測試

模仿全局ID增加器,設計一個IDGenerator類,該類擔任生玉成局遞增ID,其代碼以下:

package cc.lixiaohui.lock;

import java.math.BigInteger;
import java.util.concurrent.TimeUnit;

/**
 * 模仿ID生成 
 * @author lixiaohui
 *
 */
public class IDGenerator {

 private static BigInteger id = BigInteger.valueOf(0);

 private final Lock lock;

 private static final BigInteger INCREMENT = BigInteger.valueOf(1);

 public IDGenerator(Lock lock) {
 this.lock = lock;
 }
 
 public String getAndIncrement {
 if (lock.tryLock(3, TimeUnit.SECONDS)) {
 try {
 // TODO 這裡獲得到鎖, 拜訪臨界區資本
 return getAndIncrement0;
 } finally {
 lock.unlock;
 }
 }
 return null;
 //return getAndIncrement0;
 }

 private String getAndIncrement0 {
 String s = id.toString;
 id = id.add(INCREMENT);
 return s;
 }
}

測試主邏輯:統一個JVM內開兩個線程逝世輪回地(輪回之間無距離,有的話測試就沒意義了)獲得ID(我這裡其實不是逝世輪回而是跑20s),獲得到ID存到統一個Set外面,在存之前先檢討該IDset中能否存在,假如已存在,則讓兩個線程都停滯。假如法式能正常跑完20s,那末解釋這個散布式鎖還算可以知足請求,如斯測試的後果應當和分歧JVM(也就是真實的散布式情況中)測試的後果是一樣的,上面是測試類的代碼:

package cc.lixiaohui.DistributedLock.DistributedLock;

import java.util.HashSet;
import java.util.Set;

import org.junit.Test;

import redis.clients.jedis.Jedis;
import cc.lixiaohui.lock.IDGenerator;
import cc.lixiaohui.lock.Lock;
import cc.lixiaohui.lock.RedisBasedDistributedLock;

public class IDGeneratorTest {
 
 private static Set<String> generatedIds = new HashSet<String>;
 
 private static final String LOCK_KEY = "lock.lock";
 private static final long LOCK_EXPIRE = 5 * 1000;
 
 @Test
 public void test throws InterruptedException {
 Jedis jedis1 = new Jedis("localhost", 6379);
 Lock lock1 = new RedisBasedDistributedLock(jedis1, LOCK_KEY, LOCK_EXPIRE);
 IDGenerator g1 = new IDGenerator(lock1);
 IDConsumeMission consume1 = new IDConsumeMission(g1, "consume1");
 
 Jedis jedis2 = new Jedis("localhost", 6379);
 Lock lock2 = new RedisBasedDistributedLock(jedis2, LOCK_KEY, LOCK_EXPIRE);
 IDGenerator g2 = new IDGenerator(lock2);
 IDConsumeMission consume2 = new IDConsumeMission(g2, "consume2");
 
 Thread t1 = new Thread(consume1);
 Thread t2 = new Thread(consume2);
 t1.start;
 t2.start;
 
 Thread.sleep(20 * 1000); //讓兩個線程跑20秒
 
 IDConsumeMission.stop;
 
 t1.join;
 t2.join;
 }
 
 static String time {
 return String.valueOf(System.currentTimeMillis / 1000);
 }
 
 static class IDConsumeMission implements Runnable {

 private IDGenerator idGenerator;
 
 private String name;
 
 private static volatile boolean stop;
 
 public IDConsumeMission(IDGenerator idGenerator, String name) {
 this.idGenerator = idGenerator;
 this.name = name;
 }
 
 public static void stop {
 stop = true;
 }
 
 public void run {
 System.out.println(time + ": consume " + name + " start ");
 while (!stop) {
 String id = idGenerator.getAndIncrement;
 if(generatedIds.contains(id)) {
  System.out.println(time + ": duplicate id generated, id = " + id);
  stop = true;
  continue;
 } 
 
 generatedIds.add(id);
 System.out.println(time + ": consume " + name + " add id = " + id);
 }
 System.out.println(time + ": consume " + name + " done ");
 }
 
 }
 
}

解釋一點,我這裡停滯兩個線程的方法其實不是很好,我是為了便利才這麼做的,由於只是測試,最好不要這麼做。

測試成果

跑20s打印的器械太多,後面打印的被clear了,只要差不多跑完的時刻才有,上面截圖。解釋了這個鎖能正常任務:

IDGererator沒有加鎖(即IDGereratorgetAndIncrement辦法外部獲得id時不上鎖)時,測試是欠亨過的,異常年夜的幾率半途就會停滯,上面是不加鎖時的測試成果:

這個1秒都不到:

這個也1秒都不到:

停止語

好了,以上就是Java完成基於Redis的散布式鎖的全體內容,列位假如發明成績願望能斧正,願望這篇文章能對年夜家的進修和任務帶來必定的贊助,假如有疑問可以留言交換。

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved