本文共 7168 字,大约阅读时间需要 23 分钟。
其中有一些幂等性的细节。
- springboot 2.2.6
常用的有
execute
有很多参数execute(RedisCallback)
和execute(SessionCallback)
通常都是使用SessionCallback
,因为封装的更好用,不需要自己转byte
在execute
中,你可以在回调中随意获取值,这和executePipelined
有很大区别。
看看两者的区别
redisTemplate.execute(new SessionCallback
它和execute差不多,但是使用了Pipeline,也就是说传入的SessionCallback,里面exec的代码你是无法真实的看到返回内容,有点绕口,并且你不能在回调中返回非null的值,它会被管道覆盖,看个例子:
redisTemplate.executePipelined(new SessionCallback() { @Override public Object execute(RedisOperations operations) throws DataAccessException { operations.multi(); operations.delete("删掉这个"); List exec = operations.exec(); //这里你无法看到任何内容 //事实上 executePipelined中,不让你返回除了null的任何东西 System.out.println(exec); //如果这里需要判断exec.get(0) xxxx是不行的 return null;//这里你不返回null会出错,你不能返回exec } });
- 正确的方式
Listresults = redisTemplate.executePipelined(new SessionCallback () { @Override public Object execute(RedisOperations operations) throws DataAccessException { operations.multi(); operations.delete("删掉这个"); operations.exec(); return null; } }); //在这里可以使用results了 System.out.println(results);
executePipelined 只适合串行操作,并且中途不需要判断,比如批量的set操作,或者get的值,你最后拿出来只是读,并不需要它在中途去操作事务。
- 清楚了上面了
executePipelined
和execute
后,为了保证幂等性,锁的实现我们就需要使用到excute
一个简单的例子
@Componentpublic class MyLock { @Autowired private RedisTemplateobjectRedisTemplate; public String acquire(String lockName, Duration expire){ String identifier = UUID.randomUUID().toString(); //尝试获取锁的等待时间,如果需要,这里可以改为while(true),看实际情况 long timeOut = System.currentTimeMillis() + 5000; while(timeOut > System.currentTimeMillis()) { //这里使用了setIfAbsent,其实就是SET key value [EX seconds] [PX milliseconds] NX Boolean setBoolean = objectRedisTemplate.opsForValue().setIfAbsent(lockName, identifier, expire); if (setBoolean) return identifier; } return null; } public String tryLock(String lockName, Duration expire){ String identifier = UUID.randomUUID().toString(); int retryLimit = 0; long delay = 10; while(retryLimit<10) { try { //每次多尝试一次,就增加延迟时间 Thread.sleep(delay << retryLimit); } catch (InterruptedException e) { //nothing to do } retryLimit++; Boolean setBoolean = objectRedisTemplate.opsForValue().setIfAbsent(lockName, identifier, expire); if (setBoolean) return identifier; } return null; } public Object release(String lockName, String identifier){ assert lockName!=null && identifier!=null; return objectRedisTemplate.execute(new SessionCallback () { @Override public Object execute(RedisOperations operations) throws DataAccessException { List results = null; //如果在watch中被改,results是个长度为0的list,并不是null //result为null的情况是锁刚好自动到期,并被其他线程获取到了锁 //标识符不同了,没有执行到operations.multi();我们unwatch返回就行了 while(results==null||results.size()==0){ operations.watch(lockName); if (identifier.equals(operations.opsForValue().get(lockName))){ operations.multi(); operations.delete(lockName); results = operations.exec(); }else{ operations.unwatch(); return null; } } return results; } }); }}
@Test public void testLock() throws InterruptedException { CountDownLatch latch = new CountDownLatch(3); new Thread(()->{ String identifier = myLock.acquire("Lock007", Duration.ofSeconds(5)); if (StringUtils.isNotBlank(identifier)){ System.out.println(Thread.currentThread().getName()+" 拿到了锁"); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } myLock.release("Lock007",identifier); } else System.out.println(Thread.currentThread().getName()+" 没拿到锁"); latch.countDown(); },"一号线程").start(); new Thread(()->{ String identifier = myLock.acquire("Lock007", Duration.ofMillis(5500)); if (StringUtils.isNotBlank(identifier)){ System.out.println(Thread.currentThread().getName()+" 拿到了锁"); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } myLock.release("Lock007",identifier); } else System.out.println(Thread.currentThread().getName()+" 没拿到锁"); latch.countDown(); },"二号线程").start(); new Thread(()->{ String identifier = myLock.acquire("Lock007", Duration.ofMillis(5500)); if (StringUtils.isNotBlank(identifier)){ System.out.println(Thread.currentThread().getName()+" 拿到了锁"); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } myLock.release("Lock007",identifier); } else System.out.println(Thread.currentThread().getName()+" 没拿到锁"); latch.countDown(); },"三号线程").start(); latch.await(); }
- 细说一下 在watch中,如果被监视的key被篡改,那么exec一定会返回一个长度为0的list,并不是一个null,注意这点。我看好多文章都写的是个null,事实上并不是
- 在watch中,无论如何都会返回list,不会是个null,只是如果list中没有元素,则表明中途被人篡改。
- 在watch中 如果自动到期,那么执行delete后exec,返回值list一定包含0(del 返回0 表示删除失败),不影响事务。
- 在watch中 如果另外一个线程手动删除key, 那么返回的一定是个[](长度为0的list),而不是返回删除失败。
- 自动到期和人为删除watch的表现是不同的。
当对redis集群
使用上面的获取锁代码后,如果从服务器
在同步数据过程中
,主服务器挂了
,此时从服务器变为master
,但从服务器会丢失这个锁的KEY
,其他worker
在当前worker没有释放锁
的情况下,仍然可以获取到一把锁
,这类问题解决 要么用redis
的wait
命令,要么读写分离
后,setnx
到锁后用get再读一次
,直到从服务器有这条数据
为止。
转载地址:http://uxlsi.baihongyu.com/