Java分布式锁解决方案

分布式锁实现方式

数据库悲观锁

所谓的悲观锁:顾名思义,就是很悲观,每次去拿数据的时候都认为别人会修改,所以每次拿数据的时候都会上锁。这样别人拿数据的时候就要等待直到锁的释放。

这里是采用oracle的 select …… where id=1 for update 来实现分布式锁,建议加上nowait,或者wait 以及 of

下面是demo:

select * from table where id=1 for update nowait;//当锁被占用,不等待直接报错
select * from table where id=1 for update wait 6;//当锁被占用,等待6s
select * from table where id=1 for update of columns nowait;//锁定执行的列,反之则锁所有列。

该方案,在高并发时显然不适用,依赖于数据库的性能以及锁机制,会造成锁无法释放。

数据库乐观锁

所谓的乐观锁:就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据。一般的方案都是加一个版本号字段(version),在查询数据时将版本号带出来,更新后将版本号+1,如果版本号一致才更新,并获取影响行数,如果没更新则报错。

redis的setnx

由于redis是单线程工作的,所以它存取key-value 的时候是单线程工作的,并且多个线程请求redis并不存在竞争问题。所以可以设置一个标识来作为一把锁,只有获取了该锁之后,才能对共享的资源进行操作,没有拿到锁的线程处于不断去取锁的状态,直到等到上一个线程释放锁(即后一个线程可以取到锁)或者超过规定超时时间不再取锁。

import com.test.core.base.utils.ApplicationUtil;
import com.test.redis.api.RedisStringOperationService;

/**
 * redis分布式锁
 * @author LIPENG
 * @date 2017年9月22日 下午4:25:56
 * @version V1.0
 */
public class RedisDistributionLock {
    private static final int DEFAULT_ACQUIRY_RESOLUTION_MILLIS = 100;
    /**
     * Lock key path.
     */
    private String lockKey;

    /**
     * 锁超时时间,防止线程在入锁以后,无限的执行等待
     */
    private int expireMsecs = 60 * 1000;

    /**
     * 锁等待时间,防止线程饥饿
     */
    private int timeoutMsecs = 10 * 1000;

    private volatile boolean locked = false;

    /**
     * Detailed constructor with default acquire timeout 10000 msecs and lock expiration of 60000 msecs.
     *
     * @param lockKey lock key (ex. account:1, ...)
     */
    public RedisDistributionLock(String lockKey) {
        this.lockKey = lockKey + "_lock";
    }

    /**
     * Detailed constructor with default lock expiration of 60000 msecs.
     *
     */
    public RedisDistributionLock(String lockKey, int timeoutMsecs) {
        this(lockKey);
        this.timeoutMsecs = timeoutMsecs;
    }

    /**
     * Detailed constructor.
     *
     */
    public RedisDistributionLock(String lockKey, int timeoutMsecs, int expireMsecs) {
        this(lockKey, timeoutMsecs);
        this.expireMsecs = expireMsecs;
    }

    /**
     * @return lock key
     */
    public String getLockKey() {
        return lockKey;
    }



    /**
     * 获得 lock.
     * 实现思路: 主要是使用了redis 的setnx命令,缓存了锁.
     * reids缓存的key是锁的key,所有的共享, value是锁的到期时间(注意:这里把过期时间放在value了,没有时间上设置其超时时间)
     * 执行过程:
     * 1.通过setnx尝试设置某个key的值,成功(当前没有这个锁)则返回,成功获得锁
     * 2.锁已经存在则获取锁的到期时间,和当前时间比较,超时的话,则设置新的值
     *
     * @return true if lock is acquired, false acquire timeouted
     * @throws InterruptedException in case of thread interruption
     */
    public synchronized boolean lock() throws InterruptedException {
        int timeout = timeoutMsecs;
        while (timeout >= 0) {
            long expires = System.currentTimeMillis() + expireMsecs + 1;
            String expiresStr = String.valueOf(expires); //锁到期时间
            if (getRedisStringOperationService().setNX(lockKey, expiresStr)) {
                // lock acquired
                locked = true;
                return true;
            }

            String currentValueStr = getRedisStringOperationService().get(lockKey); //redis里的时间
            if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) {
                //判断是否为空,不为空的情况下,如果被其他线程设置了值,则第二个条件判断是过不去的
                // lock is expired

                String oldValueStr = getRedisStringOperationService().getAndSet(lockKey, expiresStr);
                //获取上一个锁到期时间,并设置现在的锁到期时间,
                //只有一个线程才能获取上一个线上的设置时间,因为jedis.getSet是同步的
                if (oldValueStr != null && oldValueStr.equals(currentValueStr)) {
                    //防止误删(覆盖,因为key是相同的)了他人的锁——这里达不到效果,这里值会被覆盖,但是因为什么相差了很少的时间,所以可以接受

                    //[分布式的情况下]:如过这个时候,多个线程恰好都到了这里,但是只有一个线程的设置值和当前值相同,他才有权利获取锁
                    // lock acquired
                    locked = true;
                    return true;
                }
            }
            timeout -= DEFAULT_ACQUIRY_RESOLUTION_MILLIS;

            /*
                延迟100 毫秒,  这里使用随机时间可能会好一点,可以防止饥饿进程的出现,即,当同时到达多个进程,
                只会有一个进程获得锁,其他的都用同样的频率进行尝试,后面有来了一些进行,也以同样的频率申请锁,这将可能导致前面来的锁得不到满足.
                使用随机的等待时间可以一定程度上保证公平性
             */
            Thread.sleep(DEFAULT_ACQUIRY_RESOLUTION_MILLIS);

        }
        return false;
    }


    /**
     * 释放锁
     */
    public synchronized void unlock() {
        if (locked) {
        	getRedisStringOperationService().delete(lockKey);
            locked = false;
        }
    }
    
    private RedisStringOperationService getRedisStringOperationService(){
    	return	ApplicationUtil.getBean(RedisStringOperationService.class);
    }

}

调用方式:

RedisDistributionLock lock = new RedisDistributionLock("key", 10000, 20000);
		try {
			if (lock.lock()) {
				// 需要加锁的代码
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		} finally {
			lock.unlock();
		}

使用zookeeper

当很多进程需要访问共享资源时,我们可以通过zk来实现分布式锁。主要步骤是: 1.建立一个节点,假如名为:lock 。节点类型为持久节点(PERSISTENT) 2.每当进程需要访问共享资源时,会调用分布式锁的lock()或tryLock()方法获得锁,这个时候会在第一步创建的lock节点下建立相应的顺序子节点,节点类型为临时顺序节点(EPHEMERAL_SEQUENTIAL),通过组成特定的名字name+lock+顺序号。 3.在建立子节点后,对lock下面的所有以name开头的子节点进行排序,判断刚刚建立的子节点顺序号是否是最小的节点,假如是最小节点,则获得该锁对资源进行访问。 4.假如不是该节点,就获得该节点的上一顺序节点,并给该节点是否存在注册监听事件。同时在这里阻塞。等待监听事件的发生,获得锁控制权。 5.当调用完共享资源后,调用unlock()方法,关闭zk,进而可以引发监听事件,释放该锁。 实现的分布式锁是严格的按照顺序访问的并发锁。

package cn.wpeace.zktest;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.data.Stat;
/**
 * @author peace
 *
 */
public class DistributedLock implements Lock, Watcher{
    private ZooKeeper zk;
    private String root = "/locks";//根
    private String lockName;//竞争资源的标志
    private String waitNode;//等待前一个锁
    private String myZnode;//当前锁
    private CountDownLatch latch;//计数器
    private CountDownLatch connectedSignal=new CountDownLatch(1);
    private int sessionTimeout = 30000; 
    /**
     * 创建分布式锁,使用前请确认config配置的zookeeper服务可用
     * @param config 192.168.1.127:2181
     * @param lockName 竞争资源标志,lockName中不能包含单词_lock_
     */
    public DistributedLock(String config, String lockName){
        this.lockName = lockName;
        // 创建一个与服务器的连接
         try {
            zk = new ZooKeeper(config, sessionTimeout, this);
            connectedSignal.await();
            Stat stat = zk.exists(root, false);//此去不执行 Watcher
            if(stat == null){
                // 创建根节点
                zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); 
            }
        } catch (IOException e) {
            throw new LockException(e);
        } catch (KeeperException e) {
            throw new LockException(e);
        } catch (InterruptedException e) {
            throw new LockException(e);
        }
    }
    /**
     * zookeeper节点的监视器
     */
    public void process(WatchedEvent event) {
        //建立连接用
        if(event.getState()==KeeperState.SyncConnected){
            connectedSignal.countDown();
            return;
        }
        //其他线程放弃锁的标志
        if(this.latch != null) {  
            this.latch.countDown();  
        }
    }

    public void lock() {   
        try {
            if(this.tryLock()){
                System.out.println("Thread " + Thread.currentThread().getId() + " " +myZnode + " get lock true");
                return;
            }
            else{
                waitForLock(waitNode, sessionTimeout);//等待锁
            }
        } catch (KeeperException e) {
            throw new LockException(e);
        } catch (InterruptedException e) {
            throw new LockException(e);
        } 
    }
    public boolean tryLock() {
        try {
            String splitStr = "_lock_";
            if(lockName.contains(splitStr))
                throw new LockException("lockName can not contains \\u000B");
            //创建临时子节点
            myZnode = zk.create(root + "/" + lockName + splitStr, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
            System.out.println(myZnode + " is created ");
            //取出所有子节点
            List<String> subNodes = zk.getChildren(root, false);
            //取出所有lockName的锁
            List<String> lockObjNodes = new ArrayList<String>();
            for (String node : subNodes) {
                String _node = node.split(splitStr)[0];
                if(_node.equals(lockName)){
                    lockObjNodes.add(node);
                }
            }
            Collections.sort(lockObjNodes);

            if(myZnode.equals(root+"/"+lockObjNodes.get(0))){
                //如果是最小的节点,则表示取得锁
                System.out.println(myZnode + "==" + lockObjNodes.get(0));
                return true;
            }
            //如果不是最小的节点,找到比自己小1的节点
            String subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1);
            waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1);//找到前一个子节点
        } catch (KeeperException e) {
            throw new LockException(e);
        } catch (InterruptedException e) {
            throw new LockException(e);
        }
        return false;
    }
    public boolean tryLock(long time, TimeUnit unit) {
        try {
            if(this.tryLock()){
                return true;
            }
            return waitForLock(waitNode,time);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }
    private boolean waitForLock(String lower, long waitTime) throws InterruptedException, KeeperException {
        Stat stat = zk.exists(root + "/" + lower,true);//同时注册监听。
        //判断比自己小一个数的节点是否存在,如果不存在则无需等待锁,同时注册监听
        if(stat != null){
            System.out.println("Thread " + Thread.currentThread().getId() + " waiting for " + root + "/" + lower);
            this.latch = new CountDownLatch(1);
            this.latch.await(waitTime, TimeUnit.MILLISECONDS);//等待,这里应该一直等待其他线程释放锁
            this.latch = null;
        }
        return true;
    }
    public void unlock() {
        try {
            System.out.println("unlock " + myZnode);
            zk.delete(myZnode,-1);
            myZnode = null;
            zk.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }
    public void lockInterruptibly() throws InterruptedException {
        this.lock();
    }
    public Condition newCondition() {
        return null;
    }

    public class LockException extends RuntimeException {
        private static final long serialVersionUID = 1L;
        public LockException(String e){
            super(e);
        }
        public LockException(Exception e){
            super(e);
        }
    }
}

调用方法:

DistributedLock lock   = new DistributedLock("192.168.1.127:2181","lock");
 lock.lock();
 //共享资源
 if(lock != null)
  lock.unlock();

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

分布式锁注意事项

设置锁超时时间

redis、数据库等实现的分布式锁,需要设置锁超时时间的原因在于:其他客户端无法得知已经获取锁的客户端的状态 是挂了呢,还是正在执行。所以只能傻傻的设置一个超时,认为超时之后就简单的判定获取锁的客户端挂了。

一旦锁设定了超时时间,可能获取锁的客户端因各种原因执行业务操作的时候耗时较长,超出了锁的超时时间,这时其他客户端就可以再次获取锁了,所以就会带来并发问题。

消除锁超时时间

为了消除这个锁超时,就需要由服务器来作为代理来通知,

如ZooKeeper,一旦客户端挂了,就会删除对应的临时节点,然后通知watch该节点的其他客户端。所以客户端不需要设置锁超时,就等待通知即可。

从这点来说ZooKeeper是更可靠的,降低了因锁超时带来的并发问题。

img

方案的高可用问题

redis、数据库等方案要想实现高可用,则必须有对应的高可用方案。如最简单的主从架构,又引入了一致性的问题,又会有很多的坑。

ZooKeeper方案本身可以做到高可用、一致性,所以ZooKeeper方案也更简单一些。

连接的单点问题

这个单点不是说redis或zookeeper的单点问题,而是客户端和服务器端的这个连接的单点问题。先来举个例子:

如ZooKeeper还是会出现并发问题的,如客户端获取到锁了之后,和ZooKeeper连接出现了session超时, 就会导致ZooKeeper集群删除对应的临时节点,其他客户端也就能获取到锁了,此时就存在并发问题。

这种问题的根由就是:客户端和ZooKeeper集群之间的连接是单连接,即只连接其中的一台机器。一旦该连接出现网络抖动, 这种分布式锁方案也会出现并发问题。

减少并发的措施:增大session的超时时间,尽量减少网络抖动,但是这也会降低服务器端对客户端的状态检测的灵敏度,这个灵敏度在分布式锁的场景下也不是特别重要,所以无所谓了。

消除连接的单点问题

要消除单点,必然是建立多连接来防止网络的抖动,即客户端连接多个服务器端,向每个服务器都执行获取锁的操作。

如redis的Redlock实现的分布式锁。

有N个独立的master服务器,客户端会向所有的服务器发送获取锁的操作。过半的服务器都获取到锁了则认为获取到锁了,这种也有很多细节。这种方式就解决了上述所说的ZooKeeper单连接可能造成的并发问题。

然而redis由于上述1所说的redis自身设计的问题,Redlock实现的分布式锁也会有锁超时问题,即也会存在并发。

所以理想中更好的方案就是:解决了上述2个问题,从而来进一步减少并发的可能性。

redis如果能像ZooKeeper一样,实现了和客户端绑定的临时key,一旦redis客户端挂了,临时key删除,通知watch该key的其他客户端(感觉这个是一个不错的需求,不知redis未来是否要实现),就可以消除锁超时,再使用Redlock实现的分布式锁,这时候可靠性就更高了。

本文侧重总结在可靠性方面的问题,性能嘛,单机的redis当然是最快的了,其次zookeeper,最后数据库。而上述第五点,Redlock方案牺牲了一些性能来换取了可靠性。

概览分布式锁

其实要解决2个高可用的问题:

  • 数据存储的高可用(解决基本使用) 如使用redis、数据库、ZooKeeper,他们承载着分布式锁需要的数据,不能是单点的,要集群高可用
  • 连接的高可用(降低并发的概率) 那就需要建立多连接,如向N个redis master建立连接,向每一个都获取锁。

所以应该理想的布局是:

和N个独立的服务器(如ZooKeeper)都建立连接,向每台服务器都请求获取锁的操作,过半成功才表示获取到锁

这N个独立的服务器既有数据的保障,又有多连接的保障。所以简单来说,应该和3个独立的ZooKeeper机器都建立连接,而不是这3台构成一个ZooKeeper集群。

本文分享自微信公众号 - Java编程指南(JavaXxzyfx)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2018-07-25

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

这些信息有用吗?
Do you have any suggestions for improvement?

Thanks for your feedback!