zookeeper分布式锁
分布式锁介绍
分布式锁主要用于在分布式环境中保护跨进程、跨主机、跨网络的**共享资源实现互斥访问,以达到保证数据的一致性**。
本文主要谈的是Zookeeper的分布式锁,在此之前先了解下基于数据库和redis的分布式锁的实现。
基于数据库
1. 基于数据库表
最简单的方式可能就是直接创建一张锁表,当我们要锁住某个方法或资源时,我们就在该表中增加一条记录,想要释放锁的时候就删除这条记录。给某字段添加唯一性约束,如果有多个请求同时提交到数据库的话,数据库会保证只有一个操作可以成功,那么我们就可以认为操作成功的那个线程获得了该方法的锁,可以执行方法体内容。
会引入数据库单点、无失效时间、不阻塞、不可重入等问题。
2. 基于数据库排他锁
如果使用的是MySql的InnoDB引擎,在查询语句后面增加for update
,数据库会在查询过程中(须通过唯一索引查询)给数据库表增加排他锁,我们可以认为获得排它锁的线程即可获得分布式锁,通过 connection.commit() 操作来释放锁。
会引入数据库单点、不可重入、无法保证一定使用行锁(部分情况下MySQL自动使用表锁而不是行锁)、排他锁长时间不提交导致占用数据库连接等问题。
3. 数据库实现分布式锁总结
优点:
- 直接借助数据库,容易理解。
缺点:
- 会引入更多的问题,使整个方案变得越来越复杂
- 操作数据库需要一定的开销,有一定的性能问题
- 使用数据库的行级锁并不一定靠谱,尤其是当我们的锁表并不大的时候
基于缓存
相比较于基于数据库实现分布式锁的方案来说,基于缓存来实现在性能方面会表现的更好一点。目前有很多成熟的缓存产品,包括Redis、memcached、tair等。
这里以Redis为例举出几种实现方法:
1. 基于 redis 的 setnx()、expire() 方法做分布式锁
setnx 的含义就是SET if Not Exists
,其主要有两个参数 setnx(key, value)
。该方法是原子的,如果 key 不存在,则设置当前 key 成功,返回 1;如果当前 key 已经存在,则设置当前 key 失败,返回 0。
expire 设置过期时间,要注意的是 setnx 命令不能设置 key 的超时时间,只能通过 expire() 来对 key 设置。
2. 基于 redis 的 setnx()、get()、getset()方法做分布式锁
getset 这个命令主要有两个参数 getset(key,newValue)
,该方法是原子的,对 key 设置 newValue 这个值,并且返回 key 原来的旧值。
3. 基于 Redlock 做分布式锁
Redlock 是 Redis 的作者 antirez 给出的集群模式的 Redis 分布式锁,它基于 N 个完全独立的 Redis 节点(通常情况下 N 可以设置成 5)
4. 基于 redisson 做分布式锁
redisson 是 redis 官方的分布式锁组件
5. 基于缓存实现分布式锁总结
优点:
- 性能好
缺点:
- 实现中需要考虑的因素太多
- 通过超时时间来控制锁的失效时间并不是十分的靠谱
Zookeeper 如何实现分布式锁?
下面讲如何实现排他锁和共享锁,以及如何解决羊群效应。
排他锁:利用zk上临时节点路径的唯一性,哪个JVM先创建临时节点,哪个JVM就成功获取锁。
共享锁:在zk统一持久节点下创建临时顺序节点,哪个JVM创建的顺序节点的序号最小哪个JVM先获取锁。
排他锁
排他锁,又称写锁或独占锁。如果事务T1对数据对象O1加上了排他锁,那么在整个加锁期间,只允许事务T1对O1进行读取或更新操作,其他任务事务都不能对这个数据对象进行任何操作,直到T1释放了排他锁。
排他锁核心是保证当前有且仅有一个事务获得锁,并且锁释放之后,所有正在等待获取锁的事务都能够被通知到。
Zookeeper 的强一致性特性,能够很好地保证在分布式高并发情况下节点的创建一定能够保证全局唯一性**(节点路径唯一)**,即Zookeeper将会保证客户端无法重复创建一个已经存在的数据节点。可以利用Zookeeper这个特性,实现排他锁。
- 定义锁:通过Zookeeper上创建临时节点来表示一个锁
- 获取锁:客户端通过调用
create
方法创建表示锁的临时节点,可以认为创建成功的客户端获得了锁,同时可以让没有获得锁的节点在该节点上注册Watcher监听,以便实时监听到/lock节点的变更情况 - 释放锁:以下两种情况都可以让锁释放
- 当前获得锁的客户端发生宕机或异常,那么Zookeeper上这个临时节点就会被删除
- 正常执行完业务逻辑,客户端主动删除自己创建的临时节点
排他锁流程图:
步骤:
1、客户端尝试获取锁,先判断锁是否被占用(判断/lock路径是否存在),是则客户端进入等待,并对/lock节点,注册watcher事件。反之进入步骤2。
2、客户端创建/lock临时节点,创建失败,客户端进入等待,并对/lock节点,注册watcher事件。创建成功进入步骤3。
3、客户端创建/lock节点成功,则获取锁成功,客户端正常完成业务逻辑或者宕机或异常,释放锁,进入步骤4。
4、释放锁,此时监听/lock节点watcher事件,会通知(唤醒)所有等待获取锁的客户端(一个Watcher实例是一个回调函数,被回调一次后就被移除了),然后所有被 唤醒的客户端进入步骤1,再次尝试获取锁。以上流程重复循环。
共享锁
共享锁,又称读锁。如果事务T1对数据对象O1加上了共享锁,那么当前事务只能对O1进行读取操作,其他事务也只能对这个数据对象加共享锁,直到该数据对象上的所有共享锁都被释放。
共享锁与排他锁的区别在于,加了排他锁之后,数据对象只对当前事务可见,而加了共享锁之后,数据对象对所有事务都可见。
- 定义锁:通过Zookeeper上的数据节点来表示一个锁,是一个类似于 /l
ock/[hostname]-请求类型-序号
的临时顺序节点, 临时顺序节点是指这个:/[hostname]-请求类型-序号
- 获取锁:客户端通过调用
create
方法创建表示锁的临时顺序节点,如果是读请求,则创建/lock/[hostname]-R-序号
节点,如果是写请求则创建/lockpath/[hostname]-W-序号
节点 - 释放锁:与排他锁逻辑一致
临时顺序节点结构:
基于Zookeeper实现共享锁流程:
- 判断读写顺序:大概分为4个步骤
- 1)创建完节点后,获取
/lock
节点下的所有子节点,并对该节点注册子节点变更的Watcher监听 - 2)确定自己的节点序号在所有子节点中的顺序
- 3.1)对于读请求:1. 如果没有比自己序号更小的子节点,或者比自己序号小的子节点都是读请求,那么表明自己已经成功获取到了共享锁,同时开始执行读取逻辑 2. 如果有比自己序号小的子节点有写请求,那么等待
- 3.2)对于写请求,如果自己不是序号最小的节点,那么等待
- 4)接收到Watcher通知后,重复步骤1)
羊群效应
在实现共享锁的 “判断读写顺序” 的第1个步骤是:创建完节点后,获取 /lockpath
节点下的所有子节点,并对该节点注册子节点变更的Watcher监听。这样的话,任何一次客户端移除共享锁之后,Zookeeper将会发送子节点变更的Watcher通知给所有机器,系统中将有大量的 “Watcher通知” 和 “子节点列表获取” 这个操作重复执行,然后所有节点再判断自己是否是序号最小的节点(写请求)或者判断比自己序号小的子节点是否都是读请求(读请求),从而继续等待下一次通知。
然而,这些重复操作很多都是 “无用的”,实际上每个锁竞争者只需要关注序号比自己小的那个节点是否存在即可
当集群规模比较大时,这些 “无用的” 操作不仅会对Zookeeper造成巨大的性能影响和网络冲击,更为严重的是,如果同一时间有多个客户端释放了共享锁,Zookeeper服务器就会在短时间内向其余客户端发送大量的事件通知–这就是所谓的 “羊群效应“。
改进后的分布式锁实现:
具体实现如下:
- 客户端调用
create
方法创建一个类似于/lock/[hostname]-请求类型-序号
的临时顺序节点 - 客户端调用
getChildren
方法获取所有已经创建的子节点列表(这里不注册任何Watcher)
- 读请求:向比自己序号小的最后一个写请求节点注册Watcher监听
- 写请求:向比自己序号小的最后一个节点注册Watcher监听
- 如果无法获取任何共享锁,那么调用
exist
来对比自己小的那个节点注册Watcher - 等待Watcher监听,继续进入步骤2
Zookeeper羊群效应改进前后Watcher监听图
代码案例:
方案一:采用临时节点,模拟分布式生成订单号
1、引入maven依赖
<dependencies>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>
</dependencies>
2、创建订单OrderNumGenerator类
//生成订单类(订单号:时间戳+业务ID)
public class OrderNumGenerator {
//全局订单id
public static int count = 0;
//时间戳+业务ID
public String getNumber() {
try {
Thread.sleep(200);
} catch (Exception e) {
}
SimpleDateFormat simpt = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss");
return simpt.format(new Date()) + "-" + ++count;
}
}
3、定义Lock接口
public interface Lock {
//获取到锁的资源
public void getLock();
// 释放锁
public void unLock();
}
4、创建ZookeeperAbstractLock抽象类
public abstract class ZookeeperAbstractLock implements Lock {
// zk连接地址
private static final String CONNECTSTRING = "127.0.0.1:2181";
// 创建zk连接
protected ZkClient zkClient = new ZkClient(CONNECTSTRING);
protected static final String PATH = "/lock";
public void getLock() {
if (tryLock()) {
System.out.println("##获取lock锁的资源####");
} else {
// 等待
waitLock();
// 重新获取锁资源
getLock();
}
}
// 获取锁资源
abstract boolean tryLock();
// 等待
abstract void waitLock();
//释放锁
public void unLock() {
if (zkClient != null) {
zkClient.close();
System.out.println("释放锁资源...");
}
}
}
5、创建ZookeeperDistrbuteLock类
public class ZookeeperDistrbuteLock extends ZookeeperAbstractLock {
private CountDownLatch countDownLatch = null;
@Override
boolean tryLock() {
try {
zkClient.createEphemeral(PATH);
return true;
} catch (Exception e) {
return false;
}
}
@Override
void waitLock() {
IZkDataListener izkDataListener = new IZkDataListener() {
public void handleDataDeleted(String path) throws Exception {
// 唤醒被等待的线程
if (countDownLatch != null) {
countDownLatch.countDown();
}
}
public void handleDataChange(String path, Object data) throws Exception {
}
};
// 注册事件
zkClient.subscribeDataChanges(PATH, izkDataListener);
if (zkClient.exists(PATH)) {
countDownLatch = new CountDownLatch(1);
try {
countDownLatch.await();
} catch (Exception e) {
e.printStackTrace();
}
}
// 删除监听
zkClient.unsubscribeDataChanges(PATH, izkDataListener);
}
}
测试
public class OrderService implements Runnable {
private OrderNumGenerator orderNumGenerator = new OrderNumGenerator();
// 使用lock锁
// private java.util.concurrent.locks.Lock lock = new ReentrantLock();
private Lock lock = new ZookeeperDistrbuteLock();
public void run() {
getNumber();
}
public void getNumber() {
try {
lock.getLock();
String number = orderNumGenerator.getNumber();
System.out.println(Thread.currentThread().getName() + ",生成订单ID:" + number);
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unLock();
}
}
public static void main(String[] args) {
System.out.println("####生成唯一订单号###");
// OrderService orderService = new OrderService();
for (int i = 0; i < 100; i++) {
new Thread(new OrderService()).start();
}
}
}
以上是以zookeeper创建临时节点的方式,实现分布式锁。会产生羊群之惊群效应。
方案二:采用临时顺序节点
参照方案一改一个地方即可:
public class ZooKeeperDistrbuteLock2 extends ZooKeeperAbstractLock {
private CountDownLatch countDownLatch = null;
/**
* 当前请求节点的前一个节点
*/
private String beforePath;
/**
* 当前请求的节点
*/
private String currentPath;
// 创建持久节点,保存临时顺序节点
public ZooKeeperDistrbuteLock2() {
if (!zkClient.exists(PATH)) {
zkClient.createPersistent(PATH);
}
}
@Override
public boolean tryLock() {
// 如果currentPath为空则为第一次尝试拿锁,第一次拿锁赋值currentPath
if (currentPath == null || currentPath.length() == 0) {
// 在指定的持久节点下创建临时顺序节点
currentPath = zkClient.createEphemeralSequential(PATH + "/", "lock");
}
// 获取所有临时节点并排序,例如:000044
List<String> childrenList = zkClient.getChildren(PATH);
Collections.sort(childrenList);
// 如果当前节点在所有节点中排名第一则获取锁成功
if (currentPath.equals(PATH + "/" + childrenList.get(0))) {
return true;
} else {
//查找当前节点的前一个节点
int wz = Collections.binarySearch(childrenList, currentPath.substring(6));
beforePath = PATH + "/" + childrenList.get(wz - 1);
}
return false;
}
@Override
public void waitLock() {
// 创建监听
IZkDataListener iZkDataListener = new IZkDataListener() {
@Override
public void handleDataChange(String s, Object o) throws Exception {
}
@Override
public void handleDataDeleted(String s) throws Exception {
// 释放锁,删除节点时唤醒等待的线程
if (countDownLatch != null) {
countDownLatch.countDown();
}
}
};
// 注册监听,这里是给排在当前节点前面的节点增加(删除数据的)监听,本质是启动另外一个线程去监听前置节点
zkClient.subscribeDataChanges(beforePath, iZkDataListener);
// 前置节点存在时,等待前置节点删除唤醒
if (zkClient.exists(beforePath)) {
countDownLatch = new CountDownLatch(1);
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 删除对前置节点的监听
zkClient.unsubscribeDataChanges(beforePath, iZkDataListener);
}
}
分布式锁比较:
还没有评论,来说两句吧...