```java
@Component
public class RedisWorker {
private static final long BEGIN_TIMESTAMP;
private final StringRedisTemplate stringRedisTemplate;
private final String INCR_PREFIX="incr:";
private final String INCR_DELIMITER=":";
/**
* 位运算向高位移动的位数,为了给redis自增长key腾出32位的空间
*/
private final int COUNT_BITS=32;
public RedisWorker(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}
static {
//生产时间戳
LocalDateTime begin_time = LocalDateTime.of(2002, 1, 2, 0, 0, 0);
//计算开始时间戳
BEGIN_TIMESTAMP=begin_time.toEpochSecond(ZoneOffset.UTC);
}
/**
* <P>
* 基于传入key生成一个全局唯一ID
* </P>
* @param keyPrefix 需要为某个传入的key生成一个全局唯一ID
* @return
*/
public long nextId(String keyPrefix){
//1.生成时间戳
LocalDateTime now = LocalDateTime.now();
long nowTimeStamp = now.toEpochSecond(ZoneOffset.UTC);
long timeStampGap=nowTimeStamp-BEGIN_TIMESTAMP;
//2.生成序列号
//2.1 获取当前日期,精确到天
String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
//redis单个key的自增长有上限,最大为2的64次方
//如果自增长key不存在,redis会自动创建一个
Long increment = stringRedisTemplate.opsForValue().increment(INCR_PREFIX + keyPrefix + INCR_DELIMITER + date);
return timeStampGap << COUNT_BITS | increment;
}
}
@SpringBootTest
class HmDianPingApplicationTests {
@Autowired
private IShopService iShopService;
@Autowired
private RedisWorker redisWorker;
private static final ExecutorService es= Executors.newFixedThreadPool(500);
/**
* 测试生成全局唯一ID
*/
@Test
public void testGloballyUniqueID() throws InterruptedException {
CountDownLatch countDownLatch=new CountDownLatch(300);
Runnable task=()->{
for (int i = 0; i < 100 ; i++) {
long nextId = redisWorker.nextId("order");
System.out.println(nextId);
}
countDownLatch.countDown();
};
long start = System.currentTimeMillis();
for (int i = 0; i < 300; i++) {
es.submit(task);
}
countDownLatch.await();
long end = System.currentTimeMillis();
System.out.println("cost time "+(end-start)+" ms");
}
}
大家可以自己测试一下
数据库自增指的是单独使用数据库中某一张表来专门存放主键,当我们需要的时候,只需要提前从该表中读取出一批主键集合,缓存在内存中即可,但是该方法显然太慢了,因此不推荐使用
@Override
@Transactional
public Result seckillVoucher(Long voucherId) {
//1.查询优惠卷
SeckillVoucher seckillVoucher = iSeckillVoucherService.getById(voucherId);
//2.判断秒杀是否开始
if(seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())){
//尚未开始
return Result.fail("秒杀尚未开始!");
}
//3.判断秒杀书否已经结束
if(seckillVoucher.getEndTime().isBefore(LocalDateTime.now())){
//已经结束
return Result.fail("秒杀已经结束");
}
//4.判断库存是否充足
if(seckillVoucher.getStock()<1){
return Result.fail("库存不足!");
}
//5.扣减库存
boolean success = iSeckillVoucherService.update()
.setSql("stock = stock -1")
.eq("voucher_id", voucherId)
.update();
if(!success){
return Result.fail("扣减失败");
}
//6.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
//6.1 订单id
long orderId = redisWorker.nextId("order");
voucherOrder.setId(orderId);
//6.2用户id
Long userId = UserHolder.getUser().getId();
voucherOrder.setUserId(userId);
//6.3 代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
//7。 返回订单id
return Result.ok(orderId);
}
}
为什么会产生超卖问题:
当库存只剩一件的时候,此时三个线程打进入,同时查询,发现只剩一件库存,然后会挨个执行扣减库存的逻辑,此时就会导致超卖问题的发生。
比较版本号是否变化,每次操作完版本号加一
比较数据本身是否发生变化
就拿上面例子中出现的超卖问题为例,通过cas法进行解决,其实很简单,只需要改一行代码即可:
//5.扣减库存
boolean success = iSeckillVoucherService.update()
.setSql("stock = stock -1")
.eq("voucher_id", voucherId)
//cas比较stock数据是否变化,如果发生了变化,不进行处理
.eq("stock",seckillVoucher.getStock())
.update();
发现超卖问题没有了,但是却只卖出去了23件,只是为什么?
这是因为当一堆线程尝试去并发修改数据时,最先修改得手的线程,改变了stock的值后,后面其他的线程,都会因为stock值与旧值不符,而更新失败。
这里可以简单优化一下,让stock大于0即可
//5.扣减库存
boolean success = iSeckillVoucherService.update()
.setSql("stock = stock -1")
.eq("voucher_id", voucherId)
.gt("stock",0)
.update();
其实我们只需要再扣减库存前判断一下当前用户是否已经抢购过票否,即可:
@Override
@Transactional
public Result seckillVoucher(Long voucherId) {
//1.查询优惠卷
SeckillVoucher seckillVoucher = iSeckillVoucherService.getById(voucherId);
//2.判断秒杀是否开始
if(seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())){
//尚未开始
return Result.fail("秒杀尚未开始!");
}
//3.判断秒杀书否已经结束
if(seckillVoucher.getEndTime().isBefore(LocalDateTime.now())){
//已经结束
return Result.fail("秒杀已经结束");
}
//4.判断库存是否充足
if(seckillVoucher.getStock()<1){
return Result.fail("库存不足!");
}
//5.一人一单
Long userId = UserHolder.getUser().getId();
//加上悲观锁--我们这里要确保每一个用户id一把锁,toString底层是创建一个新的String对象,
// 我们这里把每次得到的用户id放入字符串常量池中,确保其唯一性
synchronized (userId.toString().intern()){
Integer count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
if(count>0){
return Result.fail("用户已经购买过一次了");
}
}
//6.扣减库存
boolean success = iSeckillVoucherService.update()
.setSql("stock = stock -1")
.eq("voucher_id", voucherId)
.gt("stock",0)
.update();
if(!success){
return Result.fail("扣减失败");
}
//7.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
//7.1 订单id
long orderId = redisWorker.nextId("order");
voucherOrder.setId(orderId);
//7.2用户id
voucherOrder.setUserId(userId);
//7.3 代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
//8。 返回订单id
return Result.ok(orderId);
}
在单机模式下,我们可以通过加互斥锁来保证线程安全性,原理是利用jvm的锁监视器来完成的
但是在集群模式下,我们会部署多台tomcat,每一台tomcat对应一台全新的JVM,那么每台jvm都有自己的锁监视器,这样就导致每台jvm内部能够保证线程安全性,但是多台jvm之间无法保证线程安全性,从而导致集群模式下的并发安全问题
上面获取锁的过程还是存在一些问题,如果添加锁和设置过期时间两条命令之间,发生故障,也会导致锁无法释放,因此我们必须确保添加锁和设置过期时间两者执行的原子性
set命令可以同时设置过期时候,和添加互斥性,实现获取锁和设置过期时间的原子性。
如果获取锁失败,我们之间快速返回失败信息,不会阻塞去尝试获取锁。
public class SimpleRedisLock implements Ilock{
private String name;
private StringRedisTemplate stringRedisTemplate;
private static final String KEY_PREFIX="lock:";
public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}
@Override
public boolean tryLock(long timeSec) {
//获取线程编号
long threadId = Thread.currentThread().getId();
//获取锁
Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId + "", timeSec, TimeUnit.SECONDS);
//success可能为null,这样拆箱过程会报错
return Boolean.TRUE.equals(success);
}
@Override
public void unLock() {
stringRedisTemplate.delete(KEY_PREFIX+name);
}
}
应用到上面悲观锁解决一人一单的代码中去:
@Override
@Transactional
public Result seckillVoucher(Long voucherId) {
//1.查询优惠卷
SeckillVoucher seckillVoucher = iSeckillVoucherService.getById(voucherId);
//2.判断秒杀是否开始
if(seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())){
//尚未开始
return Result.fail("秒杀尚未开始!");
}
//3.判断秒杀书否已经结束
if(seckillVoucher.getEndTime().isBefore(LocalDateTime.now())){
//已经结束
return Result.fail("秒杀已经结束");
}
//4.判断库存是否充足
if(seckillVoucher.getStock()<1){
return Result.fail("库存不足!");
}
//5.一人一单
Long userId = UserHolder.getUser().getId();
//创建锁对象
SimpleRedisLock simpleRedisLock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
//尝试获取分布式锁
boolean isLock = simpleRedisLock.tryLock(1200L);
if(!isLock){
return Result.fail("重复下单!!!");
}
//我们只需要确保下面这两行代码的集群并发问题被解决
try{
Integer count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
if(count>0){
return Result.fail("用户已经购买过一次了");
}
}finally {
simpleRedisLock.unLock();
}
//6.扣减库存
boolean success = iSeckillVoucherService.update()
.setSql("stock = stock -1")
.eq("voucher_id", voucherId)
.gt("stock",0)
.update();
if(!success){
return Result.fail("扣减失败");
}
//7.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
//7.1 订单id
long orderId = redisWorker.nextId("order");
voucherOrder.setId(orderId);
//7.2用户id
voucherOrder.setUserId(userId);
//7.3 代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
//8。 返回订单id
return Result.ok(orderId);
}
大家自习用jemeter去进行并发测试即可
这一切的根源都在于线程一误删了别人的锁,导致一系列错误发生
就是释放锁前,判断一下锁的标识是否改变
注意这个锁标识不能只是线程ID,因为分布式环境下,不同的jvm可能会分配到相同的线程ID,因此需要再加上UUID进行区分
jvm内部通过维护一个递增的数字,来标识当前已经创建的线程数,而这个递增的数字就会分配给当前线程作为线程ID
即用UUID来确保不同jvm之间,锁标识的不同,加上线程ID方便在同一个jvm进程中,进行锁标识的区分
public class SimpleRedisLock implements Ilock{
private String name;
private StringRedisTemplate stringRedisTemplate;
private static final String KEY_PREFIX="lock:";
private static final String ID_PREFIX= UUID.randomUUID().toString(true)+"-";
public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}
@Override
public boolean tryLock(long timeSec) {
//获取线程编号
String threadId = ID_PREFIX+Thread.currentThread().getId();
//获取锁
Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId + "", timeSec, TimeUnit.SECONDS);
//success可能为null,这样拆箱过程会报错
return Boolean.TRUE.equals(success);
}
@Override
public void unLock() {
//对比锁标识是否发生改变
String curLockTag = ID_PREFIX + Thread.currentThread().getId();
String lockTag = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
//标识没有改变,可以释放锁,否则不进行操作
if(curLockTag.equals(lockTag)){
stringRedisTemplate.delete(KEY_PREFIX+name);
}
}
}
上面好像已经完美解决了分布式锁的误删除问题,但是真的是这样吗?
因此,我们必须要确保判断标识和释放锁的原子性执行,即释放锁的过程必须是原子性的
大家感兴趣可以看一下LUA的教程,该语言比较简单,容易上手:
Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性,Lua是一种编程语言。
这里重点介绍Redis提供的调用函数.语法如下:
#执行redis命令
redis.call('命令名称','key','其他参数',...)
例如,我们要执行set name jack,则脚本是这样的:
#执行set name jack
redis.call('set','name','jack')
例如,我们要先执行set name Rose,再执行get name,则脚本如下:
#先执行set name jack
redis.call('set','name','jack')
#再执行get name
local name=redis.call('get','name')
#返回
return name
写好脚本后,需要用Redis命令来调用脚本,调用脚本的常见命令如下:
例如,我们要执行redis.call(‘set’,‘name’,‘jack’)这个脚本,语法如下:
如果脚本中的key,value不想写死,可以作为参数传递,key类型参数会放入KEYS数组,其他参数会放入ARGV数组,在脚本中可以从KEYS和ARGV数组获取这些参数。
释放锁的业务流程是这样的:
如果用lua脚本来标识则是这样的:
--这里KEYS[i]就是锁的key,这里的ARGV[1]就是当前线程标识
--lua数组下标从1开始
--获取锁中的标识,判断是否与当前线程标识一致
if(redis.call('GET',KEYS[1])==ARGV[1]) then
-- 一致,则删除锁
return redis.call('DEL',KEYS[1])
end
--不一致,则直接返回
return 0
public class SimpleRedisLock implements Ilock{
private String name;
private StringRedisTemplate stringRedisTemplate;
private static final String KEY_PREFIX="lock:";
private static final String ID_PREFIX= UUID.randomUUID().toString(true)+"-";
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
UNLOCK_SCRIPT=new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("delLock.lua"));
//设置脚本返回结果
UNLOCK_SCRIPT.setResultType(Long.class);
}
public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}
@Override
public boolean tryLock(long timeSec) {
//获取线程编号
String threadId = ID_PREFIX+Thread.currentThread().getId();
//获取锁
Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId + "", timeSec, TimeUnit.SECONDS);
//success可能为null,这样拆箱过程会报错
return Boolean.TRUE.equals(success);
}
@Override
public void unLock() {
stringRedisTemplate.execute(UNLOCK_SCRIPT, Collections.singletonList(KEY_PREFIX + name),ID_PREFIX+Thread.currentThread().getId());
}
}
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.17.1</version>
</dependency>
@Configuration
public class RedisConfig {
@Bean
public RedissonClient redissonClient(@Value("${spring.redis.host}")String redisAddress,
@Value("${spring.redis.port}")String redisPort,
@Value("${spring.redis.password}")String redisPwd){
//配置类
Config config=new Config();
//添加redis地址,这里添加了单点的地址,也可以使用config.useClusterServers()添加集群地址
config.useSingleServer().setAddress("redis://"+redisAddress+":"+redisPort).setPassword(redisPwd);
//创建客户端
return Redisson.create(config);
}
}
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Autowired
private ISeckillVoucherService iSeckillVoucherService;
@Autowired
private RedisWorker redisWorker;
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Autowired
private RedissonClient redissonClient;
@Override
@Transactional
public Result seckillVoucher(Long voucherId) {
//1.查询优惠卷
SeckillVoucher seckillVoucher = iSeckillVoucherService.getById(voucherId);
//2.判断秒杀是否开始
if(seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())){
//尚未开始
return Result.fail("秒杀尚未开始!");
}
//3.判断秒杀书否已经结束
if(seckillVoucher.getEndTime().isBefore(LocalDateTime.now())){
//已经结束
return Result.fail("秒杀已经结束");
}
//4.判断库存是否充足
if(seckillVoucher.getStock()<1){
return Result.fail("库存不足!");
}
//5.一人一单
Long userId = UserHolder.getUser().getId();
//创建锁对象
RLock lock = redissonClient.getLock("lock:order:" + userId);
//尝试获取分布式锁
// 第一个参数为获取锁的最大等待时间(期间会重试)--默认-1,,失败直接返回
//锁自动释放时间--默认30秒
//时间单位
boolean tryLock = lock.tryLock();
if(!tryLock){
return Result.fail("重复下单!!!");
}
//我们只需要确保下面这两行代码的集群并发问题被解决
try{
Integer count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
if(count>0){
return Result.fail("用户已经购买过一次了");
}
}finally {
lock.unlock();
}
//6.扣减库存
boolean success = iSeckillVoucherService.update()
.setSql("stock = stock -1")
.eq("voucher_id", voucherId)
.gt("stock",0)
.update();
if(!success){
return Result.fail("扣减失败");
}
//7.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
//7.1 订单id
long orderId = redisWorker.nextId("order");
voucherOrder.setId(orderId);
//7.2用户id
voucherOrder.setUserId(userId);
//7.3 代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
//8。 返回订单id
return Result.ok(orderId);
}
}
和jdk的可重入锁ReentrantLock思路一致,如果是同一个线程在锁没有释放前,尝试去获取锁,那么锁计数加一,然后再释放锁的时候,只有当计数为0的时候,才会真正去释放锁,否则只是将计数减去一个
这里因为需要一个计数属性,因此之前的String结构,需要变为hash结构来实现
这里加锁和解锁的过程都需要保证原子性,因此还是需要使用lua脚本来实现
大家感兴趣可以自己去看一下Redisson中源码实现,最终也是使用了lua脚本来确保多条redis命令执行的原子性的,而且和我们上面给出的lua脚本基本一致
@Override
public boolean tryLock() {
//tryLockAsync---暗示该任务是异步执行的
//get()就是获取该方法返回的Future对象,然后阻塞等待,知道获取到返回的异步结果为止
return get(tryLockAsync());
}
get方法:
@Override
public <V> V get(RFuture<V> future) {
if (Thread.currentThread().getName().startsWith("redisson-netty")) {
throw new IllegalStateException("Sync methods can't be invoked from async/rx/reactive listeners");
}
try {
//无限制阻塞等待
return future.toCompletableFuture().get();
} catch (InterruptedException e) {
future.cancel(true);
Thread.currentThread().interrupt();
throw new RedisException(e);
} catch (ExecutionException e) {
throw convertException(e);
}
}
tryLockAsync方法:
@Override
public RFuture<Boolean> tryLockAsync() {
//传入的当前线程ID号
return tryLockAsync(Thread.currentThread().getId());
}
重载方法:
@Override
public RFuture<Boolean> tryLockAsync(long threadId) {
//第一个参数是获取锁失败后最大重试等待时间(-1表示不等待,获取失败直接返回)
//第二个参数是锁超时释放的时间(-1表示用户没有设置,那么设置为默认的30sec)
//第三个参数是时间单位
//第四个参数是线程id
return tryAcquireOnceAsync(-1, -1, null, threadId);
}
tryAcquireOnceAsync方法:
private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Boolean> acquiredFuture;
//用户手动设置了leaseTime,走这里
if (leaseTime > 0) {
acquiredFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
} else {
//如果leaseTime 为-1走这里
//internalLockLeaseTime就是给出的默认值,默认值为30sec
acquiredFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
}
//acquiredFuture执行结束后,会将该future的返回值作为结果传入thenApply方法进行处理
CompletionStage<Boolean> f = acquiredFuture.thenApply(acquired -> {
// lock acquired
//如果锁获取成功
if (acquired) {
if (leaseTime > 0) {
//将internalLockLeaseTime 设置为leaseTime转换为Millis的值
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
//如果用户没设置leaseTime ,或者leaseTime 设置为了-1,会进入下面这个定时续约有效期的方法
scheduleExpirationRenewal(threadId);
}
}
//返回是否获取锁成功
return acquired;
});
return new CompletableFutureWrapper<>(f);
}
tryLockInnerAsync:真正尝试去获取锁:
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
//异步执行一段lua脚本
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
//锁是否存在
"if (redis.call('exists', KEYS[1]) == 0) then " +
//锁不存在--那么获取锁,设置key为name,值为线程ID+随机数字
//然后计数器为1
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
//设置有效期
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
//返回nil--锁获取成功
"return nil; " +
"end; " +
//如果锁是自己的
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
//重入处理--计数加一
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
//有效期重置
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
//返回nil--锁获取成功
"return nil; " +
"end; " +
//最终返回当前锁的剩余有效期---走到这里说明锁获取失败了
"return redis.call('pttl', KEYS[1]);",
//key[1]为我们设置的锁名,getLock时传入的name,argv[1]为锁超时释放事件,argv[2]为线程ID拼接上一串随机数字
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
scheduleExpirationRenewal方法:
进入该方法的前提是leaseTime 为-1并且锁获取成功了
private static final ConcurrentMap<String, ExpirationEntry> EXPIRATION_RENEWAL_MAP = new ConcurrentHashMap<>();
protected void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
//该map是一个全局静态共享的map
//this.entryName = id + ":" + name;--->可以理解为锁名
//能够放入的前提是当前锁在map中不存在
//保证一把锁无论重入几次,拿到的永远是同一个ExpirationEntry
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
//说明放入失败,但是我们可以拿到当前锁已经存在的ExpirationEntry
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
//说明是第一次进来
entry.addThreadId(threadId);
try {
//刷新当前锁有效期
renewExpiration();
} finally {
if (Thread.currentThread().isInterrupted()) {
cancelExpirationRenewal(threadId);
}
}
}
}
renewExpiration方法:
private void renewExpiration() {
//获取到当前锁名
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
//一个定时任务,并且会延时执行
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
//拿到锁对应的ExpirationEntry
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
//更新当前锁的有效期
CompletionStage<Boolean> future = renewExpirationAsync(threadId);
future.whenComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getRawName() + " expiration", e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
//如果续期成功,那么递归继续执行当前方法,然后又是延迟十秒后,再去执行任务,续约有效期
if (res) {
// reschedule itself
renewExpiration();
} else {
cancelExpirationRenewal(null);
}
});
}
},
// internalLockLeaseTime 为30,因此这里是过了10秒后,该任务执行一次
internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
//将ExpirationEntry与当前续约的task任务关联,方便在释放锁的时候,结束该任务
ee.setTimeout(task);
}
renewExpirationAsync方法
protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
//执行一段lua脚本
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
//当前锁是否是否存在
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
//如果存在,那么就重置有效期
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
//返回一,表示续期成功
"return 1; " +
"end; " +
//锁续期失败
"return 0;",
Collections.singletonList(getRawName()),
internalLockLeaseTime, getLockName(threadId));
}
@Override
public void unlock() {
try {
//get还是阻塞等待获取结果
get(unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
if (e.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException) e.getCause();
} else {
throw e;
}
}
}
unlockAsync方法源码分析:
@Override
public RFuture<Void> unlockAsync(long threadId) {
//真正释放锁的流程
RFuture<Boolean> future = unlockInnerAsync(threadId);
CompletionStage<Void> f = future.handle((opStatus, e) -> {
//取消锁的续约
cancelExpirationRenewal(threadId);
if (e != null) {
throw new CompletionException(e);
}
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
throw new CompletionException(cause);
}
return null;
});
return new CompletableFutureWrapper<>(f);
}
unlockInnerAsync真正释放锁:
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
//还是执行一段lua脚本
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
//锁是否存在
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
//锁不存在,返回nil
"return nil;" +
"end; " +
//锁计数减一
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
//计数是否大于0
"if (counter > 0) then " +
//如果计数大于0,那么重置锁的有效期
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
//返回0
"return 0; " +
"else " +
//计数小于0--删除锁,然后发布订阅消息
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
cancelExpirationRenewal取消锁的续约
protected void cancelExpirationRenewal(Long threadId) {
//拿到锁对应的ExpirationEntry
ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (task == null) {
return;
}
//移除定时续期任务
if (threadId != null) {
task.removeThreadId(threadId);
}
if (threadId == null || task.hasNoThreads()) {
Timeout timeout = task.getTimeout();
if (timeout != null) {
timeout.cancel();
}
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
//尝试获取锁--这段逻辑上面分析过了--返回剩余过期时间
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
//如果返回的剩余过期时间为null,表示获取锁成功,上面分析的获取锁的lua脚本中讲过了
if (ttl == null) {
return true;
}
//计算剩余等待时间还剩多少
time -= System.currentTimeMillis() - current;
//如果waitTime被消耗完了,那么返回获取锁失败
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
current = System.currentTimeMillis();
//订阅当前锁的释放消息---lua脚本解锁成功后,会发布一个消息
CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
try {
//当然不是一直等待,而是等待time时间
subscribeFuture.get(time, TimeUnit.MILLISECONDS);
} catch (ExecutionException | TimeoutException e) {
//如果超时前,锁还是没有被释放,那么返回获取锁失败
if (!subscribeFuture.cancel(false)) {
subscribeFuture.whenComplete((res, ex) -> {
if (ex == null) {
unsubscribe(res, threadId);
}
});
}
acquireFailed(waitTime, unit, threadId);
return false;
}
try {
//计算剩余waitTime时间,如果没了,返回获取锁失败
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
//循环尝试
while (true) {
long currentTime = System.currentTimeMillis();
//重试去获取锁
ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
// waiting for message
currentTime = System.currentTimeMillis();
//通过信号量机制等待一段时间后,如果还是没能通知去获取锁,那么就返回
if (ttl >= 0 && ttl < time) {
commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
}
} finally {
//取消订阅
unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);
}
// return get(tryLockAsync(waitTime, leaseTime, unit));
}
大家思考一个场景:
在Redis主从集群环境下,如果我们要将锁存入Redis的主节点中去,按照正常操作是,锁设置到主节点后,会进行主从同步,这样从节点上也能同步主节点上的锁记录,这样当主节点挂掉的时候,从节点中某一个上位后,也能够确保锁不会丢失
但是如果主从同步过程中出现意外会发生什么事情呢?
如果主从同步过程中,主节点挂掉了,那么存储到主节点锁就丢失了,导致下次再获取锁的时候,能够获取成功
使用分片集群,即我们可以使用多个互不相干的redis节点,或者redis集群。
我们将锁存入每个redis节点或集群中,这样就算其中一台挂掉了,那么只要其中某一台中锁记录保存着,就说明锁还存在。
RedisConfig进行修改,增加一个redissionClient
@Configuration
public class RedisConfig {
@Bean
public RedissonClient redissonClient(@Value("${spring.redis.host}")String redisAddress,
@Value("${spring.redis.port}")String redisPort,
@Value("${spring.redis.password}")String redisPwd){
//配置类
Config config=new Config();
//添加redis地址,这里添加了单点的地址,也可以使用config.useClusterServers()添加集群地址
config.useSingleServer().setAddress("redis://"+redisAddress+":"+redisPort).setPassword(redisPwd);
//创建客户端
return Redisson.create(config);
}
@Bean
public RedissonClient redissonClient1(@Value("${spring.redis1.host}")String redisAddress,
@Value("${spring.redis1.port}")String redisPort,
@Value("${spring.redis1.password}")String redisPwd){
//配置类
Config config=new Config();
//添加redis地址,这里添加了单点的地址,也可以使用config.useClusterServers()添加集群地址
config.useSingleServer().setAddress("redis://"+redisAddress+":"+redisPort);
//创建客户端
return Redisson.create(config);
}
}
联锁使用测试:
@Resource(name = "redissonClient")
private RedissonClient redissonClient;
@Resource(name = "redissonClient1")
private RedissonClient redissonClient1;
private RLock multiLock;
@BeforeEach
void setUp(){
RLock lock = redissonClient.getLock("order");
RLock lock2 = redissonClient1.getLock("order");
//创建联锁
multiLock = redissonClient.getMultiLock(lock, lock2);
}
@Test
public void multiLockTest(){
boolean tryLock = multiLock.tryLock();
boolean tryLock1 = multiLock.tryLock();
System.out.println("你好");
multiLock.unlock();
multiLock.unlock();
}
两次unlock过后,两台redis中的锁都被释放掉了
@Override
public boolean tryLock() {
try {
return tryLock(-1, -1, null);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long newLeaseTime = -1;
//如果用户指定了leaseTime或者waitTime就把相关时间单位统一化管理
if (leaseTime > 0) {
if (waitTime > 0) {
newLeaseTime = unit.toMillis(waitTime)*2;
} else {
newLeaseTime = unit.toMillis(leaseTime);
}
}
long time = System.currentTimeMillis();
long remainTime = -1;
if (waitTime > 0) {
//remainTime 就是waitTime,剩余等待时间
remainTime = unit.toMillis(waitTime);
}
long lockWaitTime = calcLockWaitTime(remainTime);
//failedLocksLimit为0
int failedLocksLimit = failedLocksLimit();
//acquiredLocks 保存获取成功锁集合
List<RLock> acquiredLocks = new ArrayList<>(locks.size());
//遍历联锁集合
for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
RLock lock = iterator.next();
boolean lockAcquired;
try {
//空参的tryLock
if (waitTime <= 0 && leaseTime <= 0) {
lockAcquired = lock.tryLock();
} else {
//有参的
long awaitTime = Math.min(lockWaitTime, remainTime);
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
}
} catch (RedisResponseTimeoutException e) {
unlockInner(Arrays.asList(lock));
lockAcquired = false;
} catch (Exception e) {
lockAcquired = false;
}
//如果当前锁获取成功,那么加入acquiredLocks集合中
if (lockAcquired) {
acquiredLocks.add(lock);
} else {
//只有当所有锁都获取成功后,才会跳出集合
if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
break;
}
//failedLocksLimit 默认为0
if (failedLocksLimit == 0) {
//因为当前锁获取失败了,那么就把所已经获取成功的锁都给释放掉
unlockInner(acquiredLocks);
//如果waitTime设置为-1,即失败快速返回,那么就直接返回false
if (waitTime <= 0) {
return false;
}
failedLocksLimit = failedLocksLimit();
//清空已经获取成功的锁集合
acquiredLocks.clear();
// reset iterator
//将迭代器的指针移动到集合首位--再下一轮循环时,重新从第一把锁开始获取
while (iterator.hasPrevious()) {
iterator.previous();
}
} else {
failedLocksLimit--;
}
}
//将剩余时间减去获取当前锁消耗的时间
if (remainTime > 0) {
remainTime -= System.currentTimeMillis() - time;
time = System.currentTimeMillis();
//如果剩余时间没了,那么释放已经获取的锁,然后返回false
if (remainTime <= 0) {
unlockInner(acquiredLocks);
return false;
}
}
//获取下一把锁,或者重头开始获取
}
//所有锁获取成功后,如果还有剩余时间
if (leaseTime > 0) {
acquiredLocks.stream()
.map(l -> (RedissonBaseLock) l)
//重置每一把锁的过期时间---为了保证每一把锁的过期时间都是一样的
//因为上面循环获取每一把锁时,最先获取到的锁,过期时间与其他锁而言会短很多
.map(l -> l.expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS))
.forEach(f -> f.toCompletableFuture().join());
}
return true;
}
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://cjdhy.blog.csdn.net/article/details/124461134
内容来源于网络,如有侵权,请联系作者删除!