博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
两种分布式锁实现方案(一)
阅读量:6565 次
发布时间:2019-06-24

本文共 11266 字,大约阅读时间需要 37 分钟。

  hot3.png

一。为何使用分布式锁?

当应用服务器数量超过1台,对相同数据的访问可能造成访问冲突(特别是写冲突)。单纯使用关系数据库比如MYSQL的应用可以借助于事务来实现锁,也可以使用版本号等实现乐观锁,最大的缺陷就是可用性降低(性能差)。对于GLEASY这种满足大规模并发访问请求的应用来说,使用数据库事务来实现数据库就有些捉襟见肘了。另外对于一些不依赖数据库的应用,比如分布式文件系统,为了保证同一文件在大量读写操作情况下的正确性,必须引入分布式锁来约束对同一文件的并发操作。

二。对分布式锁的要求

1.高性能(分布式锁不能成为系统的性能瓶颈)
2.避免死锁(拿到锁的结点挂掉不会导致其它结点永远无法继续)
3.支持锁重入

三。方案1,基于zookeeper的分布式锁

/*** DistributedLockUtil.java* 分布式锁工厂类,所有分布式请求都由该工厂类负责**/public class DistributedLockUtil {	private static Object schemeLock = new Object();	private static Object mutexLock = new Object();	private static Map
mutexLockMap = new ConcurrentHashMap(); private String schema; private Map
cache = new ConcurrentHashMap
(); private static Map
instances = new ConcurrentHashMap(); public static DistributedLockUtil getInstance(String schema){ DistributedLockUtil u = instances.get(schema); if(u==null){ synchronized(schemeLock){ u = instances.get(schema); if(u == null){ u = new DistributedLockUtil(schema); instances.put(schema, u); } } } return u; } private DistributedLockUtil(String schema){ this.schema = schema; } private Object getMutex(String key){ Object mx = mutexLockMap.get(key); if(mx == null){ synchronized(mutexLock){ mx = mutexLockMap.get(key); if(mx==null){ mx = new Object(); mutexLockMap.put(key,mx); } } } return mx; } private DistributedReentrantLock getLock(String key){ DistributedReentrantLock lock = cache.get(key); if(lock == null){ synchronized(getMutex(key)){ lock = cache.get(key); if(lock == null){ lock = new DistributedReentrantLock(key,schema); cache.put(key, lock); } } } return lock; } public void reset(){ for(String s : cache.keySet()){ getLock(s).unlock(); } } /** * 尝试加锁 * 如果当前线程已经拥有该锁的话,直接返回false,表示不用再次加锁,此时不应该再调用unlock进行解锁 * * @param key * @return * @throws InterruptedException * @throws KeeperException */ public LockStat lock(String key) throws InterruptedException, KeeperException{ if(getLock(key).isOwner()){ return LockStat.NONEED; } getLock(key).lock(); return LockStat.SUCCESS; } public void clearLock(String key) throws InterruptedException, KeeperException{ synchronized(getMutex(key)){ DistributedReentrantLock l = cache.get(key); l.clear(); cache.remove(key); } } public void unlock(String key,LockStat stat) throws InterruptedException, KeeperException{ unlock(key,stat,false); } public void unlock(String key,LockStat stat,boolean keepalive) throws InterruptedException, KeeperException{ if(stat == null) return; if(LockStat.SUCCESS.equals(stat)){ DistributedReentrantLock lock = getLock(key); boolean hasWaiter = lock.unlock(); if(!hasWaiter && !keepalive){ synchronized(getMutex(key)){ lock.clear(); cache.remove(key); } } } } public static enum LockStat{ NONEED, SUCCESS }}

/***DistributedReentrantLock.java*本地线程之间锁争用,先使用虚拟机内部锁机制,减少结点间通信开销*/public class DistributedReentrantLock {	private static final Logger logger =  Logger.getLogger(DistributedReentrantLock.class);    private ReentrantLock reentrantLock = new ReentrantLock();    private WriteLock writeLock;    private long timeout = 3*60*1000;        private final Object mutex = new Object();    private String dir;    private String schema;        private final ExitListener exitListener = new ExitListener(){		@Override		public void execute() {			initWriteLock();		}	};		private synchronized void initWriteLock(){		logger.debug("初始化writeLock");		writeLock = new WriteLock(dir,new LockListener(){			@Override			public void lockAcquired() {				synchronized(mutex){					mutex.notify();				}			}			@Override			public void lockReleased() {			}    		    	},schema);				if(writeLock != null && writeLock.zk != null){			writeLock.zk.addExitListener(exitListener);		}				synchronized(mutex){			mutex.notify();		}	}	    public DistributedReentrantLock(String dir,String schema) {	    	this.dir = dir;    	this.schema = schema;    	initWriteLock();    }    public void lock(long timeout) throws InterruptedException, KeeperException {        reentrantLock.lock();//多线程竞争时,先拿到第一层锁        try{        	boolean res = writeLock.trylock();        	if(!res){	        	synchronized(mutex){					mutex.wait(timeout);				}	        	if(writeLock == null || !writeLock.isOwner()){	        		throw new InterruptedException("锁超时");	        	}        	}        }catch(InterruptedException e){        	reentrantLock.unlock();        	throw e;        }catch(KeeperException e){        	reentrantLock.unlock();        	throw e;        }    }        public void lock() throws InterruptedException, KeeperException {    	lock(timeout);    }    public void destroy()  throws KeeperException {    	writeLock.unlock();    }        public boolean unlock(){    	if(!isOwner()) return false;        try{        	writeLock.unlock();        	reentrantLock.unlock();//多线程竞争时,释放最外层锁        }catch(RuntimeException e){        	reentrantLock.unlock();//多线程竞争时,释放最外层锁        	throw e;        }                return reentrantLock.hasQueuedThreads();    }    public boolean isOwner() {        return reentrantLock.isHeldByCurrentThread() && writeLock.isOwner();    }	public void clear() {		writeLock.clear();	}}

/***WriteLock.java*基于zk的锁实现*一个最简单的场景如下:*1.结点A请求加锁,在特定路径下注册自己(会话自增结点),得到一个ID号1*2.结点B请求加锁,在特定路径下注册自己(会话自增结点),得到一个ID号2*3.结点A获取所有结点ID,判断出来自己是最小结点号,于是获得锁*4.结点B获取所有结点ID,判断出来自己不是最小结点,于是监听小于自己的最大结点(结点A)变更事件*5.结点A拿到锁,处理业务,处理完,释放锁(删除自己)*6.结点B收到结点A变更事件,判断出来自己已经是最小结点号,于是获得锁。*/public class WriteLock extends ZkPrimative {   private static final Logger LOG =  Logger.getLogger(WriteLock.class);    private final String dir;    private String id;    private LockNode idName;    private String ownerId;    private String lastChildId;    private byte[] data = {0x12, 0x34};    private LockListener callback;        public WriteLock(String dir,String schema) {        super(schema,true);        this.dir = dir;    }        public WriteLock(String dir,LockListener callback,String schema) {    	this(dir,schema);        this.callback = callback;    }    public LockListener getLockListener() {        return this.callback;    }        public void setLockListener(LockListener callback) {        this.callback = callback;    }    public synchronized void unlock() throws RuntimeException {    	if(zk == null || zk.isClosed()){    		return;    	}        if (id != null) {            try {            	 zk.delete(id, -1);               } catch (InterruptedException e) {                LOG.warn("Caught: " + e, e);                //set that we have been interrupted.               Thread.currentThread().interrupt();            } catch (KeeperException.NoNodeException e) {                // do nothing            } catch (KeeperException e) {                LOG.warn("Caught: " + e, e);                throw (RuntimeException) new RuntimeException(e.getMessage()).                    initCause(e);            }finally {                if (callback != null) {                    callback.lockReleased();                }                id = null;            }        }    }        private class LockWatcher implements Watcher {        public void process(WatchedEvent event) {            LOG.debug("Watcher fired on path: " + event.getPath() + " state: " +                     event.getState() + " type " + event.getType());            try {                trylock();            } catch (Exception e) {                LOG.warn("Failed to acquire lock: " + e, e);            }        }    }        private void findPrefixInChildren(String prefix, ZooKeeper zookeeper, String dir)         throws KeeperException, InterruptedException {        List
names = zookeeper.getChildren(dir, false); for (String name : names) { if (name.startsWith(prefix)) { id = dir + "/" + name; if (LOG.isDebugEnabled()) { LOG.debug("Found id created last time: " + id); } break; } } if (id == null) { id = zookeeper.create(dir + "/" + prefix, data, acl, EPHEMERAL_SEQUENTIAL); if (LOG.isDebugEnabled()) { LOG.debug("Created id: " + id); } } } public void clear() { if(zk == null || zk.isClosed()){ return; } try { zk.delete(dir, -1); } catch (Exception e) { LOG.error("clear error: " + e,e); } } public synchronized boolean trylock() throws KeeperException, InterruptedException { if(zk == null){ LOG.info("zk 是空"); return false; } if (zk.isClosed()) { LOG.info("zk 已经关闭"); return false; } ensurePathExists(dir); LOG.debug("id:"+id); do { if (id == null) { long sessionId = zk.getSessionId(); String prefix = "x-" + sessionId + "-"; idName = new LockNode(id); LOG.debug("idName:"+idName); } if (id != null) { List
names = zk.getChildren(dir, false); if (names.isEmpty()) { LOG.warn("No children in: " + dir + " when we've just " + "created one! Lets recreate it..."); id = null; } else { SortedSet
sortedNames = new TreeSet
(); for (String name : names) { sortedNames.add(new LockNode(dir + "/" + name)); } ownerId = sortedNames.first().getName(); LOG.debug("all:"+sortedNames); SortedSet
lessThanMe = sortedNames.headSet(idName); LOG.debug("less than me:"+lessThanMe); if (!lessThanMe.isEmpty()) { LockNode lastChildName = lessThanMe.last(); lastChildId = lastChildName.getName(); if (LOG.isDebugEnabled()) { LOG.debug("watching less than me node: " + lastChildId); } Stat stat = zk.exists(lastChildId, new LockWatcher()); if (stat != null) { return Boolean.FALSE; } else { LOG.warn("Could not find the" + " stats for less than me: " + lastChildName.getName()); } } else { if (isOwner()) { if (callback != null) { callback.lockAcquired(); } return Boolean.TRUE; } } } } } while (id == null); return Boolean.FALSE; } public String getDir() { return dir; } public boolean isOwner() { return id != null && ownerId != null && id.equals(ownerId); } public String getId() { return this.id; }}

使用本方案实现的分布式锁,可以很好地解决锁重入的问题,而且使用会话结点来避免死锁;性能方面,根据笔者自测结果,加锁解锁各一次算是一个操作,本方案实现的分布式锁,TPS大概为2000-3000,性能比较一般;

转载于:https://my.oschina.net/boltwu/blog/425648

你可能感兴趣的文章
float 浮动详解
查看>>
php中time()与$_SERVER[REQUEST_TIME]用法区别
查看>>
truncate table
查看>>
跟我一起学习ASP.NET 4.5 MVC4.0 (转)
查看>>
我的vim(持续更新)
查看>>
关于UIP协议栈主动发送数据
查看>>
CocoaPods ReactiveCocoa 学习实践一 之 配置环境
查看>>
数据结构-顺序输出数字
查看>>
Hadoop 2.0 Yarn代码:ResourcesManager端代码_RM端各模块服务的启动
查看>>
课后作业-阅读任务-阅读提问-1
查看>>
poj2407(欧拉函数模板题)
查看>>
mysql安装步骤
查看>>
Unix 入门
查看>>
DD测磁盘读写性能
查看>>
括号闭合问题
查看>>
提供一些对象有效性校验的方法
查看>>
配置相关的一些辅助类
查看>>
LeetCode解题思路:575. Distribute Candies
查看>>
POJ1609 UVALive2815 UVA1196 ZOJ1787 Tiling Up Blocks【二维最长上升子序列+DP】
查看>>
扩展欧几里得算法
查看>>