0x00 概述
隨著互聯網技術的不斷發展,用戶量的不斷增加,越來越多的業務場景需要用到分佈式系統。
分佈式系統有一個著名的理論CAP,指在一個分佈式系統中,最多隻能同時滿足下面三項中的兩項:
- 一致性(Consistency):在分佈式系統中的所有數據備份,在同一時刻是否同樣的值(等同於所有節點訪問同一份最新的數據副本)
- 可用性(Availability):保證每個請求不管成功或者失敗都有響應
- 分區容錯性(Partition tolerance):系統中任意信息的丟失或失敗不會影響系統的繼續運作
所以在設計系統時,往往需要權衡,在CAP中作選擇,要麼AP,要麼CP、要麼AC。
當然,這個理論也並不一定完美,不同系統對CAP的要求級別不一樣,選擇需要考慮方方面面。
而在分佈式系統中訪問共享資源就需要一種互斥機制,來防止彼此之間的互相幹擾,以保證一致性,這個時候就需要使用分佈式鎖。
分佈式鎖:
當在分佈式模型下,數據隻有一份(或有限制),此時需要利用鎖技術來控制某一時刻修改數據的進程數。這種鎖即為分佈式鎖。
為了保證一個方法或屬性在高並發情況下的同一時間隻能被同一個線程執行,在傳統單體應用單機部署的情況下,可以使用並發處理相關的功能進行互斥控制。但是,隨著業務發展的需要,原單體單機部署的系統被演化成分佈式集群系統後,由於分佈式系統多線程、多進程並且分佈在不同機器上,這將使原單機部署情況下的並發控制鎖策略失效,單純的應用並不能提供分佈式鎖的能力。為了解決這個問題就需要一種跨機器的互斥機制來控制共享資源的訪問,這就是分佈式鎖要解決的問題!
分佈式鎖應該具備哪些條件:
- 互斥性:在分佈式系統環境下,一個方法在同一時間隻能被一個機器的一個線程執行;
- 高可用的獲取鎖與釋放鎖;
- 高性能的獲取鎖與釋放鎖;
- 可重入性:具備可重入特性,具備鎖失效機制,防止死鎖,即就算一個客戶端持有鎖的期間崩潰而沒有主動釋放鎖,也需要保證後續其他客戶端能夠加鎖成功
- 非阻塞:具備非阻塞鎖特性,即沒有獲取到鎖將直接返回獲取鎖失敗
分佈式鎖的業務場景:
- 互聯網秒殺(商品庫存)
- 搶優惠券
0x02 實現方式
分佈式鎖主要有幾種實現方式:
- 基於數據庫實現
- 基於Zookeeper實現
- 基於Redis實現
- 其他
- Chubby:谷歌公司實現的粗粒度分佈式鎖服務,底層使用了Paxos一致性算法Tair:淘寶的分佈式Key/Value存儲系統,主要是使用Tair的put()方法,原理和Redis類似Memcached:利用Memcached的add命令,此命令是原子性操作,隻有在key不存在的情況下才能add成功,也就意味著加鎖成功
如圖:
![](https://news.xinpengboligang.com/upload/keji/9925e8ed74a2916bfeb566d054941268.jpeg)
0x03 分佈式鎖:基於數據庫
1. 實現思想
主要有兩種方式:
- 悲觀鎖
- 樂觀鎖
A. 悲觀鎖(排他鎖)
利用select … where xx=yy for update排他鎖
註意:這裡需要註意的是where xx=yy,xx字段必須要走索引,否則會鎖表。有些情況下,比如表不大,mysql優化器會不走這個索引,導致鎖表問題。
核心思想:以「悲觀的心態」操作資源,無法獲得鎖成功,就一直阻塞著等待。
註意:該方式有很多缺陷,一般不建議使用。
實現:
創建一張資源鎖表:
CREATE TABLE `resource_lock` (
`id` int(4) NOT NULL AUTO_INCREMENT COMMENT '主鍵',
`resource_name` varchar(64) NOT NULL DEFAULT '' COMMENT '鎖定的資源名',
`owner` varchar(64) NOT NULL DEFAULT '' COMMENT '鎖擁有者',
`desc` varchar(1024) NOT NULL DEFAULT '備註信息',
`update_time` timestamp NOT NULL DEFAULT '' COMMENT '保存數據時間,自動生成',
PRIMARY KEY (`id`),
UNIQUE KEY `uidx_resource_name` (`resource_name `) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='鎖定中的資源';
註意:resource_name 鎖資源名稱必須有唯一索引
使用事務查詢更新:
@Transaction
public void lock(String name) {
ResourceLock rlock = exeSql("select * from resource_lock where resource_name = name for update");
if (rlock == null) {
exeSql("insert into resource_lock(reosurce_name,owner,count) values (name, 'ip',0)");
}
}
使用 for update 鎖定的資源。如果執行成功,會立即返回,執行插入數據庫,後續再執行一些其他業務邏輯,直到事務提交,執行結束;如果執行失敗,就會一直阻塞著。
可以在數據庫客戶端工具上測試出來這個效果,當在一個終端執行了 for update,不提交事務。在另外的終端上執行相同條件的 for update,會一直卡著
雖然也能實現分佈式鎖的效果,但是會存在性能瓶頸。
優點:
簡單易用,好理解,保障數據強一致性。
缺點:
1)在 RR 事務級別,select 的 for update 操作是基於間隙鎖(gap lock) 實現的,是一種悲觀鎖的實現方式,所以存在阻塞問題。
2)高並發情況下,大量請求進來,會導致大部分請求進行排隊,影響數據庫穩定性,也會耗費服務的CPU等資源。
當獲得鎖的客戶端等待時間過長時,會提示:
[40001][1205] Lock wait timeout exceeded; try restarting transaction
高並發情況下,也會造成占用過多的應用線程,導致業務無法正常響應。
3)如果優先獲得鎖的線程因為某些原因,一直沒有釋放掉鎖,可能會導致死鎖的發生。
4)鎖的長時間不釋放,會一直占用數據庫連接,可能會將數據庫連接池撐爆,影響其他服務。
5)MySql數據庫會做查詢優化,即便使用了索引,優化時發現全表掃效率更高,則可能會將行鎖升級為表鎖,此時可能就更悲劇了。
6)不支持可重入特性,並且超時等待時間是全局的,不能隨便改動。
B. 樂觀鎖
所謂樂觀鎖與悲觀鎖最大區別在於基於CAS思想,表中添加一個時間戳或者是版本號的字段來實現,update xx set version=new_version where xx=yy and version=Old_version,通過增加遞增的版本號字段實現樂觀鎖。
不具有互斥性,不會產生鎖等待而消耗資源,操作過程中認為不存在並發沖突,隻有update version失敗後才能覺察到。
搶購、秒殺就是用了這種實現以防止超賣。
如下圖:
![](https://news.xinpengboligang.com/upload/keji/ccbb96ad963c68d089b8271784bff521.jpeg)
實現:
創建一張資源鎖表:
CREATE TABLE `resource` (
`id` int(4) NOT NULL AUTO_INCREMENT COMMENT '主鍵',
`resource_name` varchar(64) NOT NULL DEFAULT '' COMMENT '資源名',
`share` varchar(64) NOT NULL DEFAULT '' COMMENT '狀態',
`version` int(4) NOT NULL DEFAULT '' COMMENT '版本號',
`desc` varchar(1024) NOT NULL DEFAULT '備註信息',
`update_time` timestamp NOT NULL DEFAULT '' COMMENT '保存數據時間,自動生成',
PRIMARY KEY (`id`),
UNIQUE KEY `uidx_resource_name` (`resource_name `) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='資源';
為表添加一個字段,版本號或者時間戳都可以。通過版本號或者時間戳,來保證多線程同時間操作共享資源的有序性和正確性。
偽代碼實現:
Resrouce resource = exeSql("select * from resource where resource_name = xxx");
boolean succ = exeSql("update resource set version= 'newVersion' ... where resource_name = xxx and version = 'oldVersion'");
if (!succ) {
// 發起重試
}
實際代碼中可以寫個while循環不斷重試,版本號不一致,更新失敗,重新獲取新的版本號,直到更新成功。
2. 優缺點
優點:
- 實現簡單,復雜度低
- 保障數據一致性
缺點:
- 性能低,並且有鎖表的風險
- 可靠性差
- 非阻塞操作失敗後,需要輪詢,占用CPU資源
- 長時間不commit或者是長時間輪詢,可能會占用較多的連接資源
0x04 分佈式鎖:基於Zookeeper
1. 實現思想
ZooKeeper是一個為分佈式應用提供一致性服務的開源組件,它內部是一個分層的文件系統目錄樹結構,規定同一個目錄下隻能有一個唯一文件名。
基於ZooKeeper實現分佈式鎖的步驟如下:
- 創建一個目錄mylock;
- 線程A想獲取鎖就在mylock目錄下創建臨時順序節點;
- 獲取mylock目錄下所有的子節點,然後獲取比自己小的兄弟節點,如果不存在,則說明當前線程順序號最小,獲得鎖;
- 線程B獲取所有節點,判斷自己不是最小節點,設置監聽比自己次小的節點;
- 線程A處理完,刪除自己的節點,線程B監聽到變更事件,判斷自己是不是最小的節點,如果是則獲得鎖。
整個過程如圖:
![](https://news.xinpengboligang.com/upload/keji/5668cadd651a2f4dd177cf8593b721f1.jpeg)
業界推薦直接使用Apache的開源庫Curator,它是一個ZooKeeper客戶端,Curator提供的InterProcessMutex是分佈式鎖的實現,acquire方法用於獲取鎖,release方法用於釋放鎖。
使用方式很簡單:
InterProcessMutex interProcessMutex = new InterProcessMutex(client,"/anyLock");
interProcessMutex.acquire();
interProcessMutex.release();
其實現分佈式鎖的核心源碼如下:
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
{
boolean haveTheLock = false;
boolean doDelete = false;
try {
if ( revocable.get() != null ) {
client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
}
while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock ) {
// 獲取當前所有節點排序後的集合
List<String> children = getSortedChildren();
// 獲取當前節點的名稱
String sequenceNodeName = ourPath.substring(basePath.length() 1); // 1 to include the slash
// 判斷當前節點是否是最小的節點
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
if ( predicateResults.getsTheLock() ) {
// 獲取到鎖
haveTheLock = true;
} else {
// 沒獲取到鎖,對當前節點的上一個節點註冊一個監聽器
String previousSequencePath = basePath "/" predicateResults.getPathToWatch();
synchronized(this){
Stat stat = client.checkExists().usingWatcher(watcher).forPath(previousSequencePath);
if ( stat != null ){
if ( millisToWait != null ){
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if ( millisToWait <= 0 ){
doDelete = true; // timed out - delete our node
break;
}
wait(millisToWait);
}else{
wait();
}
}
}
// else it may have been deleted (i.e. lock released). Try to acquire again
}
}
}
catch ( Exception e ) {
doDelete = true;
throw e;
} finally{
if ( doDelete ){
deleteOurPath(ourPath);
}
}
return haveTheLock;
}
其實 Curator 實現分佈式鎖的底層原理和上面分析的是差不多的。如圖詳細描述其原理:
![](https://news.xinpengboligang.com/upload/keji/c1315f13069ef4e69ef43d3e622fcb32.jpeg)
另外,可基於Zookeeper自身的特性和原生Zookeeper API自行實現分佈式鎖。
2. 優缺點
優點:
- 可靠性非常高
- 性能較好
- CAP模型屬於CP,基於ZAB一致性算法實現
缺點:
- 性能並不如Redis(主要原因是在寫操作,即獲取鎖釋放鎖都需要在Leader上執行,然後同步到follower)
- 實現復雜度高
0x05 分佈式鎖:基於Redis
1. 實現思想
主要是基於命令:SETNX key value
命令官方文檔:
https://redis.io/commands/setnx
用法可參考:Redis命令參考
如圖:
![](https://news.xinpengboligang.com/upload/keji/65ea1f7e5fd3056637342a35061f43ee.jpeg)
實現思想的具體步驟:
- 獲取鎖的時候,使用setnx加鎖,並使用expire命令為鎖添加一個超時時間,超過該時間則自動釋放鎖,鎖的value值為一個隨機生成的UUID,通過此在釋放鎖的時候進行判斷。
- 獲取鎖的時候還設置一個獲取的超時時間,若超過這個時間則放棄獲取鎖。
- 釋放鎖的時候,通過UUID判斷是不是該鎖,若是該鎖,則執行delete進行鎖釋放。
具體的分佈式鎖的實現可參考後面內容
2. 優缺點
優點:
- 性能非常高
- 可靠性較高
- CAP模型屬於AP
缺點:
- 復雜度較高
- 無一致性算法,可靠性並不如Zookeeper
- 鎖刪除失敗 過期時間不好控制
- 非阻塞,獲取失敗後,需要輪詢不斷嘗試獲取鎖,比較消耗性能,占用cpu資源
0x06 分佈式鎖對比
- 從理解的難易程度角度(從低到高):數據庫 > 緩存 > Zookeeper
- 從實現的復雜性角度(從低到高):Zookeeper >= 緩存 > 數據庫
- 從性能角度(從高到低):緩存 > Zookeeper >= 數據庫
- 從可靠性角度(從高到低):Zookeeper > 緩存 > 數據庫
0x07 Redis分佈式鎖實現
下面以減庫存接口為例子,訪問接口的時候自動減商品的庫存
一、方案一
@Service
public class RedisLockDemo {
@Autowired
private StringRedisTemplate redisTemplate;
public String deduceStock() {
ValueOperations<String, String> valueOperations = redisTemplate.opsForValue();
//獲取redis中的庫存
int stock = Integer.valueOf(valueOperations.get("stock"));
if (stock > 0) {
int newStock = stock - 1;
valueOperations.set("stock", newStock "");
System.out.println("扣減庫存成功, 剩餘庫存:" newStock);
} else {
System.out.println("庫存已經為0,不能繼續扣減");
}
return "success";
}
}
表示:
- 先從Redis中讀取stock的值,表示商品的庫存
- 判斷商品庫存是否大於0,如果大於0,則庫存減1,然後再保存到Redis裡面去,否則就報錯
1. 改進
方案一這種簡單的從Redis讀取、判斷值再減1保存到Redis的操作,很容易在並發場景下出問題:
- 商品超賣
比如:
假設商品的庫存有50個,有3個用戶同時訪問該接口,先是同時讀取Redis中商品的庫存值,即都是讀取到了50,即同時執行到了這一行:
int stock = Integer.valueOf(valueOperations.get("stock"));
然後減1,即到了這一行:
int newStock = stock - 1;
此時3個用戶的realStock都是49,然後3個用戶都去設置stock為49,那麼就會產生庫存明明被3個用戶搶了,理論上是應該減去3的,結果庫存數隻減去了1導致商品超賣。
這種問題的產生原因是因為讀取庫存、減庫存、保存到Redis這幾步並不是原子操作
那麼可以使用加並發鎖synchronized來解決:
@Service
public class RedisLockDemo {
@Autowired
private StringRedisTemplate redisTemplate;
public String deduceStock() {
ValueOperations<String, String> valueOperations = redisTemplate.opsForValue();
synchronized (this) {
//獲取redis中的庫存
int stock = Integer.valueOf(valueOperations.get("stock"));
if (stock > 0) {
int newStock = stock - 1;
valueOperations.set("stock", newStock "");
System.out.println("扣減庫存成功, 剩餘庫存:" newStock);
} else {
System.out.println("庫存已經為0,不能繼續扣減");
}
}
return "success";
}
}
註意:在Java中關鍵字synchronized可以保證在同一時刻,隻有一個線程可以執行某個方法或某個代碼塊。
2. 再改進
以上的代碼在單體模式下並沒太大問題,但是在分佈式或集群架構環境下存在問題,比如架構如下:
![](https://news.xinpengboligang.com/upload/keji/2684e61c81ad09c98ac300e192035472.jpeg)
在分佈式或集群架構下,synchronized隻能保證當前的主機在同一時刻隻能有一個線程執行減庫存操作,但如圖同時有多個請求過來訪問的時候,不同主機在同一時刻依然是可以訪問減庫存接口的,這就導致問題1(商品超賣)在集群架構下依然存在。
註意:可以使用JMeter來模擬出高並發場景下訪問Nginx來測試觸發上面的問題
解決方法
使用如下的分佈式鎖進行解決
註意:方案一並不能稱之為分佈式鎖的
二、方案二
分佈式鎖的簡單實現如圖:
![](https://news.xinpengboligang.com/upload/keji/a84014583e5ce43534714ce72116451e.jpeg)
代碼實現如下:
@Service
public class RedisLockDemo {
@Autowired
private StringRedisTemplate redisTemplate;
public String deduceStock() {
ValueOperations<String, String> valueOperations = redisTemplate.opsForValue();
String lockKey = "product_001";
//加鎖: setnx
Boolean isSuccess = valueOperations.setIfAbsent(lockKey, "1");
if(null == isSuccess || isSuccess) {
System.out.println("服務器繁忙, 請稍後重試");
return "error";
}
//------ 執行業務邏輯 ----start------
int stock = Integer.valueOf(valueOperations.get("stock"));
if (stock > 0) {
int newStock = stock - 1;
//執行業務操作減庫存
valueOperations.set("stock", newStock "");
System.out.println("扣減庫存成功, 剩餘庫存:" newStock);
} else {
System.out.println("庫存已經為0,不能繼續扣減");
}
//------ 執行業務邏輯 ----end------
//釋放鎖
redisTemplate.delete(lockKey);
return "success";
}
}
其實就是對每一個商品加一把鎖,代碼裡面是product_001
- 使用setnx對商品進行加鎖
- 如成功說明加鎖成功,如失敗說明有其他請求搶占了該商品的鎖,則當前請求失敗退出
- 加鎖成功之後進行扣減庫存操作
- 刪除商品鎖
1. 改進1
上面的方式是有可能會造成死鎖的,比如說加鎖成功之後,扣減庫存的邏輯可能拋異常了,即並不會執行到釋放鎖的邏輯,那麼該商品鎖是一直沒有釋放,會成為死鎖的,其他請求完全無法扣減該商品的
使用try...catch...finally的方式可以解決拋異常的問題,如下:
@Service
public class RedisLockDemo {
@Autowired
private StringRedisTemplate redisTemplate;
public String deduceStock() {
ValueOperations<String, String> valueOperations = redisTemplate.opsForValue();
String lockKey = "product_001";
try {
//加鎖: setnx
Boolean isSuccess = valueOperations.setIfAbsent(lockKey, "1");
if(null == isSuccess || isSuccess) {
System.out.println("服務器繁忙, 請稍後重試");
return "error";
}
//------ 執行業務邏輯 ----start------
int stock = Integer.valueOf(valueOperations.get("stock"));
if (stock > 0) {
int newStock = stock - 1;
//執行業務操作減庫存
valueOperations.set("stock", newStock "");
System.out.println("扣減庫存成功, 剩餘庫存:" newStock);
} else {
System.out.println("庫存已經為0,不能繼續扣減");
}
//------ 執行業務邏輯 ----end------
} finally {
//釋放鎖
redisTemplate.delete(lockKey);
}
return "success";
}
}
把釋放鎖的邏輯放到finally裡面去,即不管try裡面的邏輯最終是成功還是失敗都會執行釋放鎖的邏輯
2. 改進2
那麼上面的方式是不是能夠解決死鎖的問題呢?
其實不然,除了拋異常之外,比如程序崩潰、服務器宕機、服務器重啟、請求超時被終止、發佈、人為kill等都有可能導致釋放鎖的邏輯沒有執行,比如對商品加分佈式鎖成功之後,在扣減庫存的時候服務器正在執行重啟,會導致沒有執行釋放鎖。
可以通過對鎖設置超時時間來防止死鎖的發生,使用Redis的expire命令可以對key進行設置超時時間,如圖:
![](https://news.xinpengboligang.com/upload/keji/c10c7500e1ae69a3850309fca9cdb9f6.jpeg)
代碼實現如下:
@Service
public class RedisLockDemo {
@Autowired
private StringRedisTemplate redisTemplate;
public String deduceStock() {
ValueOperations<String, String> valueOperations = redisTemplate.opsForValue();
String lockKey = "product_001";
try {
//加鎖: setnx
Boolean isSuccess = valueOperations.setIfAbsent(lockKey, "1");
//expire增加超時時間
redisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);
if(null == isSuccess || isSuccess) {
System.out.println("服務器繁忙, 請稍後重試");
return "error";
}
//------ 執行業務邏輯 ----start------
int stock = Integer.valueOf(valueOperations.get("stock"));
if (stock > 0) {
int newStock = stock - 1;
//執行業務操作減庫存
valueOperations.set("stock", newStock "");
System.out.println("扣減庫存成功, 剩餘庫存:" newStock);
} else {
System.out.println("庫存已經為0,不能繼續扣減");
}
//------ 執行業務邏輯 ----end------
} finally {
//釋放鎖
redisTemplate.delete(lockKey);
}
return "success";
}
}
加鎖成功之後,把鎖的超時時間設置為10秒,即10秒之後自動會釋放鎖,避免死鎖的發生。
3. 改進3
但是上面的方式同樣會產生死鎖問題,加鎖和對鎖設置超時時間並不是原子操作,在加鎖成功之後,即將執行設置超時時間的時候系統發生崩潰,同樣還是會導致死鎖。
如圖:
![](https://news.xinpengboligang.com/upload/keji/cba4201b91b70e22ec3f538433cca21d.jpeg)
對此,有兩種做法:
- lua腳本
- set原生命令(Redis 2.6.12版本及以上)
一般是推薦使用set命令,Redis官方在2.6.12版本對set命令增加了NX、EX、PX等參數,即可以將上面的加鎖和設置時間放到一條命令上執行,通過set命令即可:
命令官方文檔:
https://redis.io/commands/set
用法可參考:Redis命令參考
如圖:
![](https://news.xinpengboligang.com/upload/keji/e89ba0192e4eb7911ab1c2a2acf5b471.jpeg)
SET key value NX 等同於 SETNX key value命令,並且可以使用EX參數來設置過期時間
註意:其實目前在Redis 2.6.12版本之後,所說的setnx命令,並非單單指Redis的SETNX key value命令,一般是代指Redis中對set命令加上nx參數進行使用,一般不會直接使用SETNX key value命令了
註意:Redis2.6.12之前的版本,隻能通過lua腳本來保證原子性了。
如圖:
![](https://news.xinpengboligang.com/upload/keji/39dcb737d7cc7c9613767054115fbaa4.jpeg)
代碼實現如下:
@Service
public class RedisLockDemo {
@Autowired
private StringRedisTemplate redisTemplate;
public String deduceStock() {
ValueOperations<String, String> valueOperations = redisTemplate.opsForValue();
String lockKey = "product_001";
try {
//加鎖: setnx 和 expire增加超時時間
Boolean isSuccess = valueOperations.setIfAbsent(lockKey, "1", 10, TimeUnit.SECONDS);
if(null == isSuccess || isSuccess) {
System.out.println("服務器繁忙, 請稍後重試");
return "error";
}
//------ 執行業務邏輯 ----start------
int stock = Integer.valueOf(valueOperations.get("stock"));
if (stock > 0) {
int newStock = stock - 1;
//執行業務操作減庫存
valueOperations.set("stock", newStock "");
System.out.println("扣減庫存成功, 剩餘庫存:" newStock);
} else {
System.out.println("庫存已經為0,不能繼續扣減");
}
//------ 執行業務邏輯 ----end------
} finally {
//釋放鎖
redisTemplate.delete(lockKey);
}
return "success";
}
}
4. 改進4
以上的方式其實還是存在著問題,在高並發場景下會存在問題,超時時間設置不合理導致的問題
大概的流程圖可參考:
![](https://news.xinpengboligang.com/upload/keji/91cdd6b5eca280ffc2c1e3bddc3b27b0.jpeg)
流程:
- 進程A加鎖之後,扣減庫存的時間超過設置的超時時間,這裡設置的鎖是10秒
- 在第10秒的時候由於時間到期了所以進程A設置的鎖被Redis釋放了(T5)
- 剛好進程B請求進來了,加鎖成功(T6)
- 進程A操作完成(扣減庫存)之後,把進程B設置的鎖給釋放了
- 剛好進程C請求進來了,加鎖成功
- 進程B操作完成之後,也把進程C設置的鎖給釋放了
- 以此類推…
解決方法也很簡單:
- 加鎖的時候,把值設置為唯一值,比如說UUID這種隨機數
- 釋放鎖的時候,獲取鎖的值判斷value是不是當前進程設置的唯一值,如果是再去刪除
如圖:
![](https://news.xinpengboligang.com/upload/keji/986ca37e24dd1a5529c7aa47a805016a.jpeg)
代碼如下:
@Service
public class RedisLockDemo {
@Autowired
private StringRedisTemplate redisTemplate;
public String deduceStock() {
ValueOperations<String, String> valueOperations = redisTemplate.opsForValue();
String lockKey = "product_001";
String clientId = UUID.randomUUID().toString();
try {
//加鎖: setnx 和 expire增加超時時間
Boolean isSuccess = valueOperations.setIfAbsent(lockKey, clientId, 10, TimeUnit.SECONDS);
if(null == isSuccess || isSuccess) {
System.out.println("服務器繁忙, 請稍後重試");
return "error";
}
//------ 執行業務邏輯 ----start------
int stock = Integer.valueOf(valueOperations.get("stock"));
if (stock > 0) {
int newStock = stock - 1;
//執行業務操作減庫存
valueOperations.set("stock", newStock "");
System.out.println("扣減庫存成功, 剩餘庫存:" newStock);
} else {
System.out.println("庫存已經為0,不能繼續扣減");
}
//------ 執行業務邏輯 ----end------
} finally {
if (clientId.equals(valueOperations.get(lockKey))) {
//釋放鎖
redisTemplate.delete(lockKey);
}
}
return "success";
}
}
5. 改進5
上面的方式其實存在一個明顯的問題,就是在finally代碼塊中,釋放鎖的時候,get和del並非原子操作,存在進程安全問題。
那麼刪除鎖的正確姿勢是使用lua腳本,通過redis的eval/evalsha命令來運行:
-- lua刪除鎖:
-- KEYS和ARGV分別是以集合方式傳入的參數,對應上文的Test和uuid。
-- 如果對應的value等於傳入的uuid。
if redis.call('get', KEYS[1]) == ARGV[1]
then
-- 執行刪除操作
return redis.call('del', KEYS[1])
else
-- 不成功,返回0
return 0
end
通俗一點的說,即lua腳本能夠保證原子性,在lua腳本裡執行是一個命令(eval/evalsha)去執行的,一條命令沒有執行完,其他客戶端是看不到的。
到此,基本上Redis的分佈式鎖的實現思想如下:
- 獲取鎖的時候,使用setnx加鎖,並使用expire命令為鎖添加一個超時時間,超過該時間則自動釋放鎖,鎖的value值為一個隨機生成的UUID,通過此在釋放鎖的時候進行判斷。
- 獲取鎖的時候還設置一個獲取的超時時間,若超過這個時間則放棄獲取鎖。
- 釋放鎖的時候,通過UUID判斷是不是該鎖,若是該鎖,則執行delete進行鎖釋放。
6. 改進6
雖然通過上面的方式解決了會刪除其他進程的鎖的問題,但是超時時間的設置依然是沒有解決的,設置成多少依然是個比較棘手的問題,設置少了容易導致業務沒有執行完鎖就被釋放了,而設置過大萬一服務出現異常無法正常釋放鎖會導致出現異常鎖的時間也很長。
怎麼解決這個問題呢?
目前大公司的一個方案是這樣子的:
- 在加鎖成功之後,啟動一個守護線程
- 守護線程每隔1/3的鎖的超時時間就去延遲鎖的超時時間,比如說鎖設置為30秒,那就是每隔10秒就去延長鎖的超時時間,重新設置為30秒
- 業務代碼執行完成,關閉守護線程
在實際操作中,需要註意幾點:
- 隻續對的:和釋放鎖一樣,需要判斷鎖的對象有沒有發生變化,否則會造成無論誰加鎖,守護線程都會重新設置鎖的超時時間
- 不能動不動就續:守護線程要在合理的時間再去設置鎖的超時時間,否則會造成資源的浪費
- 及時銷毀:如果加鎖的線程/進程已經處理完業務了,那麼守護進程應該被銷毀,否則會造成資源的浪費
三、方案三
上面的方案還得考慮Redis的部署問題。
眾所周知,Redis有3種部署方式:
- 單機模式
- Master-Slave Sentinel(哨兵)選舉模式
- Redis Cluster(集群)模式
使用 Redis 做分佈式鎖的缺點在於:如果采用單機部署模式,會存在單點問題,隻要 Redis 故障了。加鎖就不行了。
采用 Master-Slave 模式/集群模式,如下:
線程1加了鎖去執行業務了
剛好Redis的 master 發生故障掛掉了,此時還沒有將數據同步到 slave 上
集群會選舉一個新的 master 出來,但是新的 master 上並沒有這個鎖
線程2可以在新選舉產生的 master 上去加鎖,然後處理業務
這樣的話,就導致了兩個線程同時持有了鎖,鎖就不再具有安全性。
針對這個問題,有兩個解決方案:
- RedLock
- Zookeeper【推薦】
1. RedLock
基於以上的考慮,Redis的作者提出了一個RedLock的算法。
這個算法的意思大概是這樣的:假設 Redis 的部署模式是 Redis Cluster,總共有 5 個 Master 節點。
通過以下步驟獲取一把鎖:
- 獲取當前時間戳,單位是毫秒。
- 輪流嘗試在每個 Master 節點上創建鎖,過期時間設置較短,一般就幾十毫秒。
- 嘗試在大多數節點上建立一個鎖,比如 5 個節點就要求是 3 個節點(n / 2 1)。
- 客戶端計算建立好鎖的時間,如果建立鎖的時間小於超時時間,就算建立成功了。
- 要是鎖建立失敗了,那麼就依次刪除這個鎖。
- 隻要別人建立了一把分佈式鎖,你就得不斷輪詢去嘗試獲取鎖。
如圖:
![](https://news.xinpengboligang.com/upload/keji/6e0e090dac307b841944713c8a3bbb80.jpeg)
但是這樣的這種算法還是頗具爭議的,可能還會存在不少的問題,無法保證加鎖的過程一定正確,不太推薦。
更多關於RedLock的資料可參考:
- Redis官網的Redlock: https://redis.io/topics/distlock/
- 基於Redis的分佈式鎖到底安全嗎(上):http://zhangtielei.com/posts/blog-redlock-reasoning.html
- How to do distributed locking:https://martin.kleppmann.com/2016/02/08/how-to-do-distributed-locking.html
註意:除了RedLock之外目前並沒有有效解決Redis主從切換導致鎖失效的方法。在這種情況下(一致性要求非常高的情況下)一般是不會使用Redis,而推薦使用Zookeeper。
四、Redisson
目前業界對於Redis的分佈式鎖有了現成的實現方案了,比較出名的是Redisson開源框架。
Redisson 是 Redis 的 Java 實現的客戶端,其 API 提供了比較全面的 Redis 命令的支持。
Redission 通過 Netty 支持非阻塞 I/O。
Redisson 封裝了鎖的實現,讓我們像操作我們的本地 Lock一樣來使用,除此之外還有對集合、對象、常用緩存框架等做了友好的封裝,易於使用。
除此之外,Redisson還實現了分佈式鎖的自動續期機制、鎖的互斥自等待機制、鎖的可重入加鎖於釋放鎖的機制,可以說Redisson對分佈式鎖的實現是實現了一整套機制的。
Redisson 可以便捷的支持多種Redis部署架構:
- 單機模式
- Master-Slave Sentinel(哨兵)選舉模式
- Redis Cluster(集群)模式
引入Redission之後,使用上非常簡單,RedissonClient客戶端提供了眾多的接口實現,支持可重入鎖、公平鎖、讀寫鎖、鎖超時、RedLock等都提供了完整實現。
使用如下:
A. 引入maven
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.13.4</version>
</dependency>
B. 增加配置文件
@Configuration
public class RedissonConfig {
@Bean
public Redisson redisson() {
Config config = new Config();
//單機版
//config.useSingleServer().setAddress("redis://192.168.1.1:8001").setDatabase(0);
//集群版
config.useClusterServers()
.addNodeAddress("redis://192.168.1.1:8001")
.addNodeAddress("redis://192.168.1.1:8002")
.addNodeAddress("redis://192.168.1.2:8001")
.addNodeAddress("redis://192.168.1.2:8002")
.addNodeAddress("redis://192.168.1.3:8001")
.addNodeAddress("redis://192.168.1.3:8002");
return (Redisson) Redisson.create(config);
}
}
C. 分佈式鎖的實現
@Service
public class RedisLockDemo {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private Redisson redisson;
public String deduceStock() {
String lockKey = "lockKey";
RLock redissonLock = redisson.getLock(lockKey);
try {
//加鎖(超時默認30s), 實現鎖續命的功能(後臺啟動一個timer, 默認每10s檢測一次是否持有鎖)
redissonLock.lock();
//------ 執行業務邏輯 ----start------
int stock = Integer.valueOf(redisTemplate.opsForValue().get("stock"));
if (stock > 0) {
int newStock = stock - 1;
//執行業務操作減庫存
redisTemplate.opsForValue().set("stock", newStock "");
System.out.println("扣減庫存成功, 剩餘庫存:" newStock);
} else {
System.out.println("庫存已經為0,不能繼續扣減");
}
//------ 執行業務邏輯 ----end------
} finally {
//解鎖
redissonLock.unlock();
}
return "success";
}
}
實現的原理如下:
![](https://news.xinpengboligang.com/upload/keji/574be9545f5a8a230641a5d1f529c355.jpeg)
RedissonLock的使用介紹
// 鎖默認有效時間30秒,每10秒去檢查並重新設置超時時間
void lock();
// 超過鎖有效時間 leaseTime,就會釋放鎖
void lock(long leaseTime, TimeUnit unit);
// 嘗試獲取鎖;成功則返回true,失敗則返回false
boolean tryLock();
// 不會去啟動定時任務;在 time 時間內還沒有獲取到鎖,則返回false
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
// 不會去啟動定時任務;當 waitTime 的時間到了,還沒有獲取到鎖則返回false;若獲取到鎖了,鎖的有效時間設置為 leaseTime
boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
也就是說,用法非常簡單,但是內部上實現了方案二裡面的所有細節:
- 為了兼容老的Redis版本,Redisson 所有指令都通過 Lua 腳本執行,Redis 支持 Lua 腳本原子性執行。
- Redisson 設置的Key 的默認過期時間為 30s,如果某個客戶端持有一個鎖超過了 30s 怎麼辦?Redisson 中有一個 Watchdog 的概念,翻譯過來就是看門狗,它會在你獲取鎖之後,每隔 10s 幫你把 Key 的超時時間設為 30s。
- 如果獲取鎖失敗,Redsson會通過while循環一直嘗試獲取鎖(可自定義等待時間,超時後返回失敗)
這樣的話,就算一直持有鎖也不會出現 Key 過期了,其他線程獲取到鎖的問題了。
另外,Redssion還提供了對Redlock算法的支持,用法也很簡單:
RedissonClient redisson = Redisson.create(config);
RLock lock1 = redisson.getFairLock("lock1");
RLock lock2 = redisson.getFairLock("lock2");
RLock lock3 = redisson.getFairLock("lock3");
RedissonRedLock multiLock = new RedissonRedLock(lock1, lock2, lock3);
multiLock.lock();
multiLock.unlock();
Redisson裡面關於加鎖/獲取鎖的Lua腳本流程圖如下:
![](https://news.xinpengboligang.com/upload/keji/78792ec1715287bd86b751c1d9e7b9e2.jpeg)
釋放鎖的Lua腳本流程圖如下:
![](https://news.xinpengboligang.com/upload/keji/6f7d47192fe9d1deb732eef3e415c4e0.jpeg)
強烈建議大傢看一下Redisson裡面關於分佈式鎖的源碼,更多關於Redisson的資料可參考:
- Redisson官網 https://redisson.org/
- Redisson官方中文文檔 https://www.bookstack.cn/read/redisson-doc-cn/overview.md
- Github https://gitcode.com/redisson/redisson?utm_source=csdn_github_accelerator&isLogin=1
- Redisson分佈式鎖實戰與Watch Dog機制解讀 https://www.cnblogs.com/keeya/p/14332131.html
- Redisson分佈式的原理 https://www.toutiao.com/article/6907120405061042692/
- 一文掌握Redisson分佈式鎖的原理 https://www.toutiao.com/article/6907120405061042692/
註意:Redison並不能有效的解決Redis的主從切換問題的,目前推薦使用Zookeeper分佈式鎖來解決。
五、分段鎖
怎麼在高並發的場景去實現一個高性能的分佈式鎖呢?
電商網站在大促的時候並發量很大:
(1)若搶購不是同一個商品,則可以增加Redis集群的cluster來實現,因為不是同一個商品,所以通過計算 key 的hash會落到不同的 cluster上;
(2)若搶購的是同一個商品,則計算key的hash值會落同一個cluster上,所以加機器也是沒有用的。
針對第二個問題,可以使用庫存分段鎖的方式去實現。
分段鎖
假如產品1有200個庫存,可以將這200個庫存分為10個段存儲(每段20個),每段存儲到一個cluster上;將key使用hash計算,使這些key最後落在不同的cluster上。
每個下單請求鎖了一個庫存分段,然後在業務邏輯裡面,就對數據庫或者是Redis中的那個分段庫存進行操作即可,包括查庫存 -> 判斷庫存是否充足 -> 扣減庫存。
具體可以參照 ConcurrentHashMap 的源碼去實現,它使用的就是分段鎖。
高性能分佈式鎖具體可參考鏈接:每秒上千訂單場景下的分佈式鎖高並發優化實踐!【石杉的架構筆記】
https://mp.weixin.qq.com/s/RLeujAj5rwZGNYMD0uLbrg
原理如圖:
![](https://news.xinpengboligang.com/upload/keji/f0e7a38fc2add76e6905a15bd89cb6fd.jpeg)
0x08 總結
總結:
- 追求數據可靠性/強一致性:使用Zookeeper
- 追求性能:選擇Redis,推薦Redisson
- Redis分佈式鎖目前最大問題在於:主從模式下/集群模式下,master節點宕機,異步同步數據導致鎖丟失問題
- Redis的RedLock算法具有很大爭議性,一般不推薦使用
0x09 附錄
Python代碼實現
註意:沒有實現看門狗的邏輯,需要自己實現
import redis
import uuid
import time
class LockService:
"""
基於Redis實現的分佈式鎖
"""
host = 'localhost'
port = 6379
password = ''
db = 1
def __init__(self, conn=None):
"""
如果不傳連接池的話,默認讀取配置的Redis作為連接池
:param conn:
"""
self.conn = conn if conn else self.get_redis_client()
def get_redis_client(self):
"""
獲取Redis連接
:return:
"""
return redis.Redis(
host=self.host,
port=self.port,
password=self.password,
db=self.db
)
def acquire_lock(self, lock_name, acquire_timeout=10, expire_time=30):
"""
加鎖/獲取鎖
如果不存在lock_name,則加鎖,並且設置過期時間,避免死鎖
如果存在lock_name,則刷新過期時間
:param lock_name: 鎖的名稱
:param acquire_timeout: 加鎖/獲取鎖的超時時間,默認10秒
:param expire_time: 鎖的超時時間,默認30秒
:return:
"""
lockname = f'lock:{lock_name}'
value = str(uuid.uuid4())
end_time = time.time() acquire_timeout
while time.time() < end_time:
# 如果不存在這個鎖則加鎖並設置過期時間,避免死鎖
if self.conn.set(lockname, value, ex=expire_time, nx=True):
return value
time.sleep(0.1)
return False
def release_lock(self, lock_name, value):
"""
釋放鎖
:param lock_name: 鎖的名稱
:param value: 鎖的值
:return:
"""
unlock_script = """
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
"""
lockname = f'lock:{lock_name}'
unlock = self.conn.register_script(unlock_script)
result = unlock(keys=[lockname], args=[value])
if result:
return True
else:
return False
參考
- 120分鐘搞懂Redis分佈式鎖的實現方式
- 分佈式鎖的三種實現方式
- Redis之分佈式鎖的使用
- 石杉的架構筆記:分佈式鎖的實現
- 石杉的架構筆記:每秒上千訂單場景下的分佈式鎖高並發優化實踐
- 面試不懂分佈式鎖?那得多吃虧
- Redisson官網
- Redisson官方中文文檔
- Redisson的Github
- Redisson分佈式鎖實戰與Watch Dog機制解讀
- Redisson分佈式的原理
- 一文掌握Redisson分佈式鎖的原理
- Redis官網的Redlock
- 基於Redis的分佈式鎖到底安全嗎(上)
- Distributed locks with Redis
- Maritin博客:How to do distributed locking
原文:
https://blog.csdn.net/jiandanokok/article/details/114296755
作者:jiandanokok