基于redis的分布式锁怎么实现
发表于:2025-12-01 作者:千家信息网编辑
千家信息网最后更新 2025年12月01日,本篇内容介绍了"基于redis的分布式锁怎么实现"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!基于r
千家信息网最后更新 2025年12月01日基于redis的分布式锁怎么实现
本篇内容介绍了"基于redis的分布式锁怎么实现"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
基于redis的分布式锁
/***分布式锁工厂类*/public class RedisLockUtil { private static final Logger logger = Logger.getLogger(RedisLockUtil.class); private static Object schemeLock = new Object(); private static Map instances = new ConcurrentHashMap(); public static RedisLockUtil getInstance(String schema){ RedisLockUtil u = instances.get(schema); if(u==null){ synchronized(schemeLock){ u = instances.get(schema); if(u == null){ LockObserver lo = new LockObserver(schema); u = new RedisLockUtil(schema,lo); instances.put(schema, u); } } } return u; } private Object mutexLock = new Object(); private Map mutexLockMap = new ConcurrentHashMap(); private Map cache = new ConcurrentHashMap(); private DelayQueue dq = new DelayQueue(); private AbstractLockObserver lo; public RedisLockUtil(String schema, AbstractLockObserver lo){ Thread th = new Thread(lo); th.setDaemon(false); th.setName("Lock Observer:"+schema); th.start(); clearUselessLocks(schema); this.lo = lo; } public void clearUselessLocks(String schema){ Thread th = new Thread(new Runnable(){ @Override public void run() { while(!SystemExitListener.isOver()){ try { RedisReentrantLock t = dq.take(); if(t.clear()){ String key = t.getKey(); synchronized(getMutex(key)){ cache.remove(key); } } t.resetCleartime(); } catch (InterruptedException e) { } } } }); th.setDaemon(true); th.setName("Lock cleaner:"+schema); th.start(); } private Object getMutex(String key){ Object mx = mutexLockMap.get(key); if(mx == null){ synchronized(mutexLock){ mx = mutexLockMap.get(key); if(mx==null){ mx = new Object(); mutexLockMap.put(key,mx); } } } return mx; } private RedisReentrantLock getLock(String key,boolean addref){ RedisReentrantLock lock = cache.get(key); if(lock == null){ synchronized(getMutex(key)){ lock = cache.get(key); if(lock == null){ lock = new RedisReentrantLock(key,lo); cache.put(key, lock); } } } if(addref){ if(!lock.incRef()){ synchronized(getMutex(key)){ lock = cache.get(key); if(!lock.incRef()){ lock = new RedisReentrantLock(key,lo); cache.put(key, lock); } } } } return lock; } public void reset(){ for(String s : cache.keySet()){ getLock(s,false).unlock(); } } /** * 尝试加锁 * 如果当前线程已经拥有该锁的话,直接返回,表示不用再次加锁,此时不应该再调用unlock进行解锁 * * @param key * @return * @throws Exception * @throws InterruptedException * @throws KeeperException */ public LockStat lock(String key) { return lock(key,-1); } public LockStat lock(String key,int timeout) { RedisReentrantLock ll = getLock(key,true); ll.incRef(); try{ if(ll.isOwner(false)){ ll.descrRef(); return LockStat.NONEED; } if(ll.lock(timeout)){ return LockStat.SUCCESS; }else{ ll.descrRef(); if(ll.setCleartime()){ dq.put(ll); } return null; } }catch(LockNotExistsException e){ ll.descrRef(); return lock(key,timeout); }catch(RuntimeException e){ ll.descrRef(); throw e; } } public void unlock(String key,LockStat stat) { unlock(key,stat,false); } public void unlock(String key,LockStat stat,boolean keepalive){ if(stat == null) return; if(LockStat.SUCCESS.equals(stat)){ RedisReentrantLock lock = getLock(key,false); boolean candestroy = lock.unlock(); if(candestroy && !keepalive){ if(lock.setCleartime()){ dq.put(lock); } } } } public static enum LockStat{ NONEED, SUCCESS }} /**
*分布式锁本地代理类
*/
public class RedisReentrantLock implements Delayed{
private static final Logger logger = Logger.getLogger(RedisReentrantLock.class);
private ReentrantLock reentrantLock = new ReentrantLock();
private RedisLock redisLock;
private long timeout = 3*60;
private CountDownLatch lockcount = new CountDownLatch(1);
private String key;
private AbstractLockObserver observer;
private int ref = 0;
private Object refLock = new Object();
private boolean destroyed = false;
private long cleartime = -1;
public RedisReentrantLock(String key,AbstractLockObserver observer) {
this.key = key;
this.observer = observer;
initWriteLock();
}
public boolean isDestroyed() {
return destroyed;
}
private synchronized void initWriteLock(){
redisLock = new RedisLock(key,new LockListener(){
@Override
public void lockAcquired() {
lockcount.countDown();
}
@Override
public long getExpire() {
return 0;
}
@Override
public void lockError() {
/*synchronized(mutex){
mutex.notify();
}*/
lockcount.countDown();
}
},observer);
}
public boolean incRef(){
synchronized(refLock){
if(destroyed) return false;
ref ++;
}
return true;
}
public void descrRef(){
synchronized(refLock){
ref --;
}
}
public boolean clear() {
if(destroyed) return true;
synchronized(refLock){
if(ref > 0){
return false;
}
destroyed = true;
redisLock.clear();
redisLock = null;
return true;
}
}
public boolean lock(long timeout) throws LockNotExistsException{
if(timeout <= 0) timeout = this.timeout;
//incRef();
reentrantLock.lock();//多线程竞争时,先拿到第一层锁
if(redisLock == null){
reentrantLock.unlock();
//descrRef();
throw new LockNotExistsException();
}
try{
lockcount = new CountDownLatch(1);
boolean res = redisLock.trylock(timeout);
if(!res){
lockcount.await(timeout, TimeUnit.SECONDS);
//mutex.wait(timeout*1000);
if(!redisLock.doExpire()){
reentrantLock.unlock();
return false;
}
}
return true;
}catch(InterruptedException e){
reentrantLock.unlock();
return false;
}
}
public boolean lock() throws LockNotExistsException {
return lock(timeout);
}
public boolean unlock(){
if(!isOwner(true)) {
try{
throw new RuntimeException("big ================================================ error.key:"+key);
}catch(Exception e){
logger.error("err:"+e,e);
}
return false;
}
try{
redisLock.unlock();
reentrantLock.unlock();//多线程竞争时,释放最外层锁
}catch(RuntimeException e){
reentrantLock.unlock();//多线程竞争时,释放最外层锁
throw e;
}finally{
descrRef();
}
return canDestroy();
}
public boolean canDestroy(){
synchronized(refLock){
return ref <= 0;
}
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
public boolean isOwner(boolean check) {
synchronized(refLock){
if(redisLock == null) {
logger.error("reidsLock is null:key="+key);
return false;
}
boolean a = reentrantLock.isHeldByCurrentThread();
boolean b = redisLock.isOwner();
if(check){
if(!a || !b){
logger.error(key+";a:"+a+";b:"+b);
}
}
return a && b;
}
}
public boolean setCleartime() {
synchronized(this){
if(cleartime>0) return false;
this.cleartime = System.currentTimeMillis() + 10*1000;
return true;
}
}
public void resetCleartime(){
synchronized(this){
this.cleartime = -1;
}
}
@Override
public int compareTo(Delayed object) {
if(object instanceof RedisReentrantLock){
RedisReentrantLock t = (RedisReentrantLock)object;
long l = this.cleartime - t.cleartime;
if(l > 0) return 1 ; //比当前的小则返回1,比当前的大则返回-1,否则为0
else if(l < 0 ) return -1;
else return 0;
}
return 0;
}
@Override
public long getDelay(TimeUnit unit) {
long d = unit.convert(cleartime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
return d;
}
}
/***使用Redis实现的分布式锁*基本工作原理如下:*1. 使用setnx(key,时间戮+超时),如果设置成功,则直接拿到锁*2. 如果设置不成功,获取key的值v1(它的到期时间戮),跟当前时间对比,看是否已经超时*3. 如果超时(说明拿到锁的结点已经挂掉),v2=getset(key,时间戮+超时+1),判断v2是否等于v1,如果相等,加锁成功,否则加锁失败,等过段时间再重试(200MS)*/public class RedisLock implements LockListener{ private String key; private boolean owner = false; private AbstractLockObserver observer = null; private LockListener lockListener = null; private boolean waiting = false; private long expire;//锁超时时间,以秒为单位 private boolean expired = false; public RedisLock(String key, LockListener lockListener, AbstractLockObserver observer) { this.key = key; this.lockListener = lockListener; this.observer = observer; } public boolean trylock(long expire) { synchronized(this){ if(owner){ return true; } this.expire = expire; this.expired = false; if(!waiting){ owner = observer.tryLock(key,expire); if(!owner){ waiting = true; observer.addLockListener(key, this); } } return owner; } } public boolean isOwner() { return owner; } public void unlock() { synchronized(this){ observer.unLock(key); owner = false; } } public void clear() { synchronized(this){ if(waiting) { observer.removeLockListener(key); waiting = false; } } } public boolean doExpire(){ synchronized(this){ if(owner) return true; if(expired) return false; expired = true; clear(); } return false; } @Override public void lockAcquired() { synchronized(this){ if(expired){ unlock(); return; } owner = true; waiting = false; } lockListener.lockAcquired(); } @Override public long getExpire() { return this.expire; } @Override public void lockError() { synchronized(this){ owner = false; waiting = false; lockListener.lockError(); } }}public class LockObserver extends AbstractLockObserver implements Runnable{ private CacheRedisClient client; private Object mutex = new Object(); private Map lockMap = new ConcurrentHashMap(); private boolean stoped = false; private long interval = 500; private boolean terminated = false; private CountDownLatch doneSignal = new CountDownLatch(1); public LockObserver(String schema){ client = new CacheRedisClient(schema); SystemExitListener.addTerminateListener(new ExitHandler(){ public void run() { stoped = true; try { doneSignal.await(); } catch (InterruptedException e) { } } }); } public void addLockListener(String key,LockListener listener){ if(terminated){ listener.lockError(); return; } synchronized(mutex){ lockMap.put(key, listener); } } public void removeLockListener(String key){ synchronized(mutex){ lockMap.remove(key); } } @Override public void run() { while(!terminated){ long p1 = System.currentTimeMillis(); Map clone = new HashMap(); synchronized(mutex){ clone.putAll(lockMap); } Set keyset = clone.keySet(); if(keyset.size() > 0){ ConnectionFactory.setSingleConnectionPerThread(keyset.size()); for(String key : keyset){ LockListener ll = clone.get(key); try{ if(tryLock(key,ll.getExpire())) { ll.lockAcquired(); removeLockListener(key); } }catch(Exception e){ ll.lockError(); removeLockListener(key); } } ConnectionFactory.releaseThreadConnection(); }else{ if(stoped){ terminated = true; doneSignal.countDown(); return; } } try { long p2 = System.currentTimeMillis(); long cost = p2 - p1; if(cost <= interval){ Thread.sleep(interval - cost); }else{ Thread.sleep(interval*2); } } catch (InterruptedException e) { } } } /** * 超时时间单位为s!!! * @param key * @param expire * @return */ public boolean tryLock(final String key,final long expireInSecond){ if(terminated) return false; final long tt = System.currentTimeMillis(); final long expire = expireInSecond * 1000; final Long ne = tt + expire; List 使用本方案实现的分布式锁,可以完美地解决锁重入问题;通过引入超时也避免了死锁问题;性能方面,笔者自测试结果如下:
500线程 tps = 35000
[root@DB1 benchtest-util]# target/benchtest/bin/TestFastRedis /data/config/util/config_0_11.properties lock 500 500000
线程总时间:6553466;平均:13.106932
实际总时间:13609; 平均:0.027218
TPS达到35000,比方案1强了整整一个数量级
"基于redis的分布式锁怎么实现"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!
时间
分布式
线程
成功
竞争
内容
单位
外层
实际
更多
知识
问题
实用
学有所成
接下来
不用
再次
原理
困境
工厂
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
数据库三级考试速过
网络技术本科学校
毕业论文使用别人的数据库
方舟搜不到国内服务器
数据库中为什么创建索引
戴尔塔式服务器柜
义乌电脑软件开发自学步骤
明日之后圣诞服务器
深圳市创邦帮网络技术
网络安全的学Python吗
左外连接数据库
海南新概念互联网科技有限公司
上海hp服务器阵列卡品牌
混沌与秩序2几个服务器
网络安全保密七条刚性规定
找靓机网络服务器
dota2日本服务器人多吗
软件开发人员 智商
专业课程思政网络安全
网管是网络技术员吗
主服务器进度条wa
广西网络安全等级保护测评机构
艾尔登法环联机连不上服务器
数据库字段前两个字符替换
网络安全教育板报 大学生
oracle数据库算法题
网络安全策划活动背景图片
北京拉卡拉网络技术有
与internet时间服务器同步选哪个
如何租借服务器