分布式鎖在分布式應用當中是要經常用到的,主要是解決分布式資源訪問衝突的問題。 一開始考慮採用ReentrantLock來實現,但是實際上去實現的時候,是有問題的,ReentrantLock的lock和unlock要求必須是在同一線程進行,而分布式應用中,lock和unlock是兩次不相關的請求,因此肯定不是同一線程,因此導致無法使用ReentrantLock。
接下來就考慮採用自己做個狀態來進行鎖狀態的記錄,結果發現總是死鎖,仔細一看代碼,能不鎖死麼。
public synchronized void lock{ while(lock){ Thread.sleep(1); } lock=true; ... } public synchronized void unlock{ lock=false; ... }
第一個請求要求獲得鎖,好麼,給他個鎖定狀態,然後他拿著鎖去幹活了。
這個時候,第二個請求也要求鎖,OK,他在lock中等待解鎖。
第一個幹完活了,過來還鎖了,這個時候悲催了,因為,他進不了unlock方法了。
可能有人會問,為什麼採用while,而不是採用wait...notify?這個問題留一下,看看有人能給出來不?
總之,上面的方安案流產了。
同樣,不把synchronized 放在方法上,直接放在方法裡放個同步對象可以不??道理是一樣的,也會發生上面一樣的死鎖。
到此為止前途一片黑暗。
@沈學良同學的寫了一個用zk做的同布鎖,感覺還是比較複雜的且存疑。自己做不出來吧,又不死心。
再來看看Lock的接口,想了一下,不遵守Lock的接口了。編寫了下面的接口。
public interface DistributedLock extends RemoteObject { long lock throws RemoteException, TimeoutException; long tryLock(long time, TimeUnit unit) throws RemoteException, TimeoutException; void
unlock(long token) throws RemoteException; }
呵呵,眼尖的同學可能已經發現不同了。
lock方法增加了個long返回值,tryLock方法,返回的也不是boolean,也是long,unlock方法多了一個long參數型參數,呵呵,技巧就在這裡了。
public class DistributedLockImpl extends UnicastRemoteObject implements DistributedLock { /** * 超時單位 */ private TimeUnit lockTimeoutUnit = TimeUnit.SECONDS; /** * 鎖的令牌 */ private volatile long token
= 0; /** * 同步對象 */ byte lock = new byte[0]; /** * 默認永不超時 */ long lockTimeout = 60 * 60;//默認超時3600秒 long beginLockTime;//獲取令牌時間,單位毫秒 public DistributedLockImpl throws RemoteException { super; } /** *
@param lockTimeout 鎖超時時間,如果加鎖的對象不解鎖,超時之後自動解鎖 * @param lockTimeoutUnit * @throws RemoteException */ public DistributedLockImpl(long lockTimeout, TimeUnit lockTimeoutUnit) throws RemoteException {
super; this.lockTimeout = lockTimeout; this.lockTimeoutUnit = this.lockTimeoutUnit; } public long lock throws TimeoutException { return tryLock(0, TimeUnit.MILLISECONDS); } private boolean
isLockTimeout { if (lockTimeout <= 0) { return false; } return (System.currentTimeMillis - beginLockTime) < lockTimeoutUnit.toMillis(lockTimeout); } private long getToken { beginLockTime =
System.currentTimeMillis; token = System.nanoTime; return token; } public long tryLock(long time, TimeUnit unit) throws TimeoutException { synchronized (lock) { long startTime = System.nanoTime;
while (token != 0 && isLockTimeout) { try { if (time > 0) { long endTime = System.nanoTime; if (endTime - startTime >= unit.toMillis(time)) { throw new TimeoutException; } }
Thread.sleep(1); } catch (InterruptedException e) { //DO Noting } } return getToken; } } public void unlock(long token) { if (this.token != 0 && token == this.token) { this.token = 0; } else {
throw new RuntimeException("令牌" + token + "無效."); } } }
下面對代碼進行一下講解。
上面的代碼提供了,永遠等待的獲取鎖的lock方法和如果在指定的時間獲取鎖失敗就獲得超時異常的tryLock方法,另外還有一個unlock方法。
技術的關鍵點實際上就是在token上,上面的實現,有一個基本的假設,就是兩次遠程調用之間的時間不可能在1納秒之內完成。因此,每次鎖的操作都會返回一個長整型的令牌,就是當時執行時間的納秒數。下次解鎖必須用獲得的令牌進行解鎖,才可以成功。如此,解鎖就不用添加同步操作了,從而解決掉上面死鎖的問題。
實際上,沒有令牌也是可以的,但是那樣就會導致a獲取了鎖,但是b執行unlock也會成功解鎖,是不安全的,而加入令牌,就可以保證只有加鎖者才可以解鎖。
下面是測試代碼:
public class TestDLock { public static void main(String args) throws Exception { RmiServer rmiServer = new LocalRmiServer; DistributedLockImpl distributedLock = new DistributedLockImpl;
rmiServer.registerRemoteObject("lock1", distributedLock); MultiThreadProcessor processor = new MultiThreadProcessor("aa"); for (int i = 0; i < 8; i++) { processor.addProcessor(new RunLock("aa" +
i)); } long s = System.currentTimeMillis; processor.start; long e = System.currentTimeMillis; System.out.println(e - s); rmiServer.unexportObject(distributedLock); } } class RunLock extends
AbstractProcessor { public RunLock(String name) { super(name); } @Override protected void action throws Exception { try { RmiServer client = new RemoteRmiServer; DistributedLock lock =
client.getRemoteObject("lock1"); for (int i = 0; i < 1000; i++) { long token = lock.lock; lock.unlock(token); } System.out.println("end-" + Thread.currentThread.getId); } catch (RemoteException e)
{ e.printStackTrace; } } }
運行情況:
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27-0 [main] INFO - 線程組<aa>運行開始,線程數8...
-3 [aa-aa0] INFO - 線程<aa-aa0>運行開始...
-3 [aa-aa1] INFO - 線程<aa-aa1>運行開始...
-3 [aa-aa2] INFO - 線程<aa-aa2>運行開始...
-3 [aa-aa3] INFO - 線程<aa-aa3>運行開始...
-3 [aa-aa4] INFO - 線程<aa-aa4>運行開始...
-4 [aa-aa5] INFO - 線程<aa-aa5>運行開始...
-4 [aa-aa6] INFO - 線程<aa-aa6>運行開始...
-8 [aa-aa7] INFO - 線程<aa-aa7>運行開始...
end-19
-9050 [aa-aa3] INFO - 線程<aa-aa3>運行結束
end-17
-9052 [aa-aa1] INFO - 線程<aa-aa1>運行結束
end-20
-9056 [aa-aa4] INFO - 線程<aa-aa4>運行結束
end-16
-9058 [aa-aa0] INFO - 線程<aa-aa0>運行結束
end-21
-9059 [aa-aa5] INFO - 線程<aa-aa5>運行結束
end-26
-9063 [aa-aa7] INFO - 線程<aa-aa7>運行結束
end-18
-9064 [aa-aa2] INFO - 線程<aa-aa2>運行結束
end-22
-9065 [aa-aa6] INFO - 線程<aa-aa6>運行結束
-9066 [main] INFO - 線程組<aa>運行結束, 用時:9065ms
9069
也就是9069ms中執行了8000次鎖定及解鎖操作。
小結:
上面的分布式鎖實現方案,綜合考慮了實現簡單,鎖安全,鎖超時等因素。實際測試,大概900到1000次獲取鎖和釋放鎖操作每秒,可以滿足大多數應用要求。