团
队
介
绍
我们是光大科技有限公司智能云计算部运维服务团队智慧教学、教务项目组,在集团“聚焦数字化能力,构建平台化光大,引领智能化未来”战略指引下,积极构建数字化、智能化、生态化的教学、培训、管理平台,用科技赋能集团数字化转型。
锁的由来
多线程环境中,经常遇到多个线程访问同一个“共享资源”,这时候作为开发者必须考虑如何维护数据一致性,这就需要某种机制来保证只有满足某个条件(获取锁成功)的线程才能访问资源,而不满足条件(获取锁失败)的线程只能等待,在下一轮竞争中来获取锁才能访问资源。
在经典的读改写操作(i++)中,如下图,共享变量会被多个处理器同时操作,此时读改写操作就不是原子操作了,这样会导致共享变量i的值和期望的结果不一致。如果要实现互斥操作共享资源的话,在同一个JVM中可以使用Java并发处理相关的API(如ReentrantLcok或synchronized)进行互斥控制。
以上是对锁简单的介绍。但是越来越多的项目都是以微服务的方式对外提供服务并且以集群分布式方式部署,在这样的部署环境下,简单的锁应用不能满足实际业务场景。所以针对以上问题,我们引入分布式锁来解决在分布式系统中对共享资源如何保证数据一致性的问题。
简介
在分布式场景中,当多个进程不在同一个系统中,就要用到分布式锁来控制多个进程对资源的访问。为了确保分布式锁可用,我们至少要确保锁的实现满足以下条件:
-
互斥性:任意时刻,只能有一个客户端获取到锁,不能同时有两个客户端获取到锁。
-
安全性:锁只能被持有该锁的客户端删除,不能由其它客户端删除。
-
防止死锁:获取锁的客户端因为某些原因(如宕机等)而未能释放锁,锁应具备失效机制,防止其它客户端再也无法获取到该锁。
-
非阻塞性:具备非阻塞锁的特性,在没有获取到锁的情况下,直接返回获取锁失败。
下图为从单机环境的锁到分布式环境的锁的演变:
下图为集群部署方式,一个应用需部署到多台服务器上然后做负载均衡,大致如下:
图片来源于网络
实现方式
实现分布式锁有很多种方式,例如可以采用Zookeeper、Memcached、Redis。本次主要介绍采用Redis来实现分布式锁,用Redis实现分布式锁的原理是使用Redis的SETNX命令。首先,我们先来看下Redis文档中对SETNX命令的介绍:
图片来源于网络
Redis文档
SETNX
SETNX key value
将key的值设为value,当且仅当key不存在。
若给定的key已经存在,则SETNX不做任何动作。
SETNX是『SET if Not eXists』(如果不存在,则SET)的简写。
返回值:
设置成功,返回1。
设置失败,返回0。
参照Redis文档,我们按照以下方式实现一个Redis分布式锁:客户端要获取一个特定名称的锁,用锁名称作为key调用SETNX命令向Redis写入,写入成功相当于获取锁成功,写入失败也即是SETNX方法返回0,获取锁失败。如返回0,表明该锁已被其他客户端取得,这时我们可以先返回或等待一会进行重试,等对方完成或等待锁超时。最后可以通过DEL命令来释放该锁。
采用Redis实现分布式锁存在一个问题:通常Redis采用主从复制的方式部署。如果Redis主节点挂了,可能会产生问题:
客户端A在主节点获得了一个锁,主节点挂了,而到从节点的写同步还没完成;
从节点被提升为主节点,客户端B获得和A相同的锁。
本次先不考虑这个问题。
在具体代码实现之前,我们应先定义几个概念:
1、获取锁的超时时间
-
是指一个客户端获取一个锁的最长等待时间。
-
从开始获取锁算起,当过了设定的获取锁超时时间后,依然没有获取到锁,则放弃继续获取锁,返回false。
-
客户端获取到锁并处理完业务后,要释放锁。
2、获取锁的等待重试时间
-
当获取锁失败,且当前时间未达到获取锁的超时时间,可以等待一段时间后,再次获取锁。
-
可以等待并重复多次获取锁。
3、锁过期时间
-
设置锁过期时间是用来解决死锁问题的。
-
当客户端获取到锁后,用“当前时间戳+锁过期时间”来设置当前锁的过期时间。
-
如果一个持有锁的客户端失败或崩溃了不能释放锁,而且当前的时间已经大于之前计算的锁过期时间的值,说明该锁已失效,可以被重新使用。
代码实现
1、我们先以流程图的方式简单看下Redis分布式锁的实现流程
2、按照以上流程图,我们创建名为RedisLock的java类,提供三种构造函数
public class RedisLock {
/**
* 此构造函数设置采用默认的获取锁超时时间、锁过期时间。
*/
public RedisLock(StringRedisTemplate redisTemplate, String lockKey) {
this(redisTemplate, lockKey,
DEFAULT_ACQUIRE_TIMEOUT_MILLIS,DEFAULT_EXPIRY_TIME_MILLIS);
}
/**
* 此构造函数设置获取锁超时时间,并采用默认的锁过期时间。
*/
public RedisLock(StringRedisTemplate redisTemplate, String lockKey,
int acquireTimeoutMillis) {
this(redisTemplate, lockKey, acquireTimeoutMillis,
DEFAULT_EXPIRY_TIME_MILLIS);
}
/**
* 此构造函数可设置获取锁超时时间和锁过期时间。
*/
public RedisLock(StringRedisTemplate redisTemplate, String lockKey,
int acquireTimeoutMillis, int expiryTimeMillis, UUID uuid) {
this.redisTemplate = redisTemplate;
this.lockKeyPath = KEY_PREFIX+lockKey;
this.acquiryTimeoutInMillis = acquireTimeoutMillis;
this.lockExpiryInMillis = expiryTimeMillis + 1;
this.lockUUID = uuid;
}
3、在其中定义一些需使用的静态常量
private static final Lock NO_LOCK = new Lock(new UUID(0l, 0l), 0l);
private static final int ONE_SECOND = 1000;
//缺省锁过期时间
public static final int DEFAULT_EXPIRY_TIME_MILLIS = 60 * ONE_SECOND;
//缺省的获取锁超时时间
public static final int DEFAULT_ACQUIRE_TIMEOUT_MILLIS = 10 * ONE_SECOND;
//获取锁失败后的重试、等待时间
public static final int DEFAULT_ACQUIRY_RESOLUTION_MILLIS = 100;
//key前缀
public static final String KEY_PREFIX = \"LOCK_KEY_\";
private final String lockKeyPath;
private final int lockExpiryInMillis;
private final int acquiryTimeoutInMillis;
private final UUID lockUUID;
4、定义名为Lock的内部类
protected static class Lock {
private UUID uuid;
private long expiryTime;
protected Lock(UUID uuid, long expiryTimeInMillis) {
this.uuid = uuid;
this.expiryTime = expiryTimeInMillis;
}
protected static Lock fromString(String text) {
try {
String[] parts = text.split(\":\");
UUID theUUID = UUID.fromString(parts[0]);
long theTime = Long.parseLong(parts[1]);
return new Lock(theUUID, theTime);
} catch (Exception any) {
return NO_LOCK;
}
}
public UUID getUUID() {
return uuid;
}
public long getExpiryTime() {
return expiryTime;
}
@Override
public String toString() {
return uuid.toString() + \":\" + expiryTime;
}
boolean isExpired() {
return getExpiryTime() < System.currentTimeMillis();
}
boolean isExpiredOrMine(UUID otherUUID) {
return this.isExpired() || this.getUUID().equals(otherUUID);
}
}
5、实现获得锁的方法
/**
* 获取锁方法
*/public synchronized boolean acquire() throws InterruptedException {
return acquire(redisTemplate);
}
protected synchronized boolean acquire(StringRedisTemplate redisTemplate) throws InterruptedException {
int timeout = acquiryTimeoutInMillis;
while (timeout >= 0) {
final Lock newLock = asLock(System.currentTimeMillis() + lockExpiryInMillis);
boolean result = redisTemplate.execute(new RedisCallback<Boolean>() {
public Boolean doInRedis(RedisConnection connection)throws DataAccessException {
return connection.setNX(lockKeyPath.getBytes(), newLock.toString().getBytes());
}
});
if (result) {
this.lock = newLock;
return true;
}
final String currentValueStr = redisTemplate.opsForValue().get(lockKeyPath);
final Lock currentLock = Lock.fromString(currentValueStr);
if (currentLock.isExpiredOrMine(lockUUID)) {
String oldValueStr = redisTemplate.opsForValue().getAndSet(lockKeyPath, newLock.toString());
if (oldValueStr != null && oldValueStr.equals(currentValueStr)) {
this.lock = newLock;
return true;
}
}
timeout -= DEFAULT_ACQUIRY_RESOLUTION_MILLIS;
Thread.sleep(DEFAULT_ACQUIRY_RESOLUTION_MILLIS);
}
return false;
}
6、实现释放锁的方法
/**
* 释放锁方法
*/public synchronized void release() {
release(redisTemplate);
}
protected synchronized void release(StringRedisTemplate redisTemplate) {
if (isLocked()) {
redisTemplate.delete(lockKeyPath);
this.lock = null;
}
}public synchronized boolean isLocked() {
return this.lock != null;
}
7、注解方式
/**
* 分布式锁,注解方式
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DistributeLock {
/**
* 分布锁key
*
* @return
*/
String key();
/**
* 锁自动过期时间 ,单位为毫秒
*
* @return
*/
int expiryTimeMillis();
/**
* 获取锁最长等待时间,单位为毫秒
*
* @return
*/
int acquireTimeoutMillis();
}
/**
* 分布式锁拦截器
*/
public class DistributeLockInteceptor implements MethodInterceptor {
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
DistributeLock distributeLock = AnnotationUtils.findAnnotation(invocation.getMethod(), DistributeLock.class);
if (distributeLock != null) {
RedisLock lock = new RedisLock(stringRedisTemplate,distributeLock.key(),distributeLock.acquireTimeoutMillis(), distributeLock.expiryTimeMillis());
try {
if (lock.acquire()) {
return invocation.proceed();
} else {
throw new RuntimeException(\"wait for lock timeout!\");
}
} catch (InterruptedException e) {
throw new RuntimeException(\"can\'t get lock!\");
} finally {
lock.release();
}
} else {
return invocation.proceed();
}
}
}
使用方法
我们已经初步实现了基于Redis的分布式锁,本节主要介绍在业务代码中如何使用分布式锁。
1、获取锁
获取锁时,需要传递StringRedisTemplate Bean作为参数,因此在类中使用@Autowired注解自动注入StringRedisTemplate。
RedisLock lock = new RedisLock(redisTemplate, \"LOCK_100001\");
try {
if (lock.acquire()) {
//TODO 在这里实现你自己的业务逻辑
} else {
throw new RuntimeException(\"can\'t get lock!\");
}
} catch (InterruptedException e) {
throw new RuntimeException(\"can\'t get lock!\");
} finally {
lock.release();
}
只需要在示例的TODO位置放置需要互斥执行的逻辑即可。
上面为最简单的调用方式,我们还可以根据实际并发情况、任务执行时间来设置不同的获取锁超时时间、锁过期时间等参数,来达到最优的效果。
2、释放锁
redisLock.release();
业务执行完一定要执行release()来及时释放锁,未释放的锁需要等到锁过期后,其他客户端才能获得该锁。
3、注解方式获取与释放锁
我们也可以通过方法注解的方式来使用分布式锁,可以在类方法上配置注解@DistributeLock,当调用该方法的时候,会自动去获取分布式锁,只有获取到锁后才会执行该方法。例如:
@DistributeLock(key = \"orderNo\",acquireTimeoutMillis = 3000 , expiryTimeMillis = 5000)
public void updateLockInfo(LockInfo lockInfo) {
//TODO 在这里实现你自己的业务逻辑
}
要使用此注解,需要进行如下声明:
<bean id=\"distributeLockInteceptor\" class=\"com.xxx.common.lock.aop.DistributeLockInteceptor\" />
<context:annotation-config />
<aop:config>
<!--切入点 -->
<aop:pointcut id=\"distibuteLock\"
expression=\"execution(public * com.xxx.common.lock.annotation.*.*(..)) \" />
<!--在该切入点使用自定义拦截器 -->
<aop:advisor pointcut-ref=\"distibuteLock\" advice-ref=\"distributeLockInteceptor\" />
</aop:config>
小结
通过前面的介绍,我们大致认识了分布式锁,并结合实际情况实现了以Redis为基础的分布式锁,也学会了如何运用Redis分布式锁。后续我们会继续介绍用其他方式如何高效的实现分布式锁,让我们带着对分布式锁的认识,继续深入研究如何实现高并发、高可用、一致性的分布式系统。
往期回顾
1
基于Prometheus的分布式监控平台落地与实践
2
浅析虚拟桌面安全设计
3
如何定位H5页面存在的性能问题?性能优化工具的使用探索
4
网络中的身份证明–数字签名
欢迎关注EBCloud!
作者 | 张庚建
原创文章,作者:EBCloud,如若转载,请注明出处:https://www.sudun.com/ask/32865.html