ソースを参照

Merge pull request #4 from just-a-stone/master

redis原子性操作
zhou hao 8 年 前
コミット
e4e3c909d5

+ 161 - 118
hsweb-web-concurrent/hsweb-web-concurrent-lock/src/main/java/org/hsweb/concurrent/lock/support/redis/RedisReadWriteLock.java

@@ -1,10 +1,16 @@
 package org.hsweb.concurrent.lock.support.redis;
 
+import org.springframework.core.io.ClassPathResource;
+import org.springframework.data.redis.connection.StringRedisConnection;
 import org.springframework.data.redis.core.RedisCallback;
 import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.core.StringRedisTemplate;
+import org.springframework.data.redis.core.script.DefaultRedisScript;
+import org.springframework.scripting.support.ResourceScriptSource;
 import org.springframework.util.Assert;
 
-import java.util.UUID;
+import java.util.*;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
@@ -20,20 +26,34 @@ public class RedisReadWriteLock implements ReadWriteLock {
     private WriteLock writeLock;
     private long lockKeyExpireTime = DEFAULT_EXPIRE;
     private long waitTime = 30;
-    protected byte[] lockValue;
-    private byte[] readLockKey, writeLockKey;
+    protected String lockValue;
+    private String readLockKey, writeLockKey;
 
-    private RedisTemplate redisTemplate;
+    private static DefaultRedisScript<Boolean> redisScriptRead;
+    private static DefaultRedisScript<Boolean> redisScriptWrite;
+
+    static {
+        //初始化脚本
+        redisScriptRead = new DefaultRedisScript<>();
+        redisScriptWrite = new DefaultRedisScript<>();
+
+        redisScriptRead.setScriptSource(new ResourceScriptSource(new ClassPathResource("MEAT_INF/scripts/vcheckAndSadd.lua")));
+        redisScriptRead.setResultType(Boolean.class);
+        redisScriptWrite.setScriptSource(new ResourceScriptSource(new ClassPathResource("MEAT_INF/scripts/scheckAndVset.lua")));
+        redisScriptWrite.setResultType(Boolean.class);
+    }
+
+    private StringRedisTemplate redisTemplate;
 
     public RedisReadWriteLock(String key, RedisTemplate redisTemplate) {
         Assert.notNull(key);
         Assert.notNull(redisTemplate);
-        this.redisTemplate = redisTemplate;
+        this.redisTemplate = new StringRedisTemplate(redisTemplate.getConnectionFactory());
         readLock = new ReadLock();
         writeLock = new WriteLock();
-        readLockKey = (PREFIX + key + ".read.lock").getBytes();
-        writeLockKey = (PREFIX + key + ".write.lock").getBytes();
-        lockValue = (UUID.randomUUID().toString()).getBytes();
+        readLockKey = PREFIX + key + ".read.lock";
+        writeLockKey = PREFIX + key + ".write.lock";
+        lockValue = UUID.randomUUID().toString();
     }
 
     @Override
@@ -46,11 +66,11 @@ public class RedisReadWriteLock implements ReadWriteLock {
         return writeLock;
     }
 
-    private byte[] getReadKey() {
+    private String getReadKey() {
         return readLockKey;
     }
 
-    private byte[] getWriteKey() {
+    private String getWriteKey() {
         return writeLockKey;
     }
 
@@ -69,89 +89,101 @@ public class RedisReadWriteLock implements ReadWriteLock {
         this.lockKeyExpireTime = lockKeyExpireTime;
     }
 
+
+
     class ReadLock implements Lock {
-        public byte[] lockValue() {
-            return new String(lockValue).concat(Thread.currentThread().getId() + "").getBytes();
+
+        private List<String> keys = new ArrayList<>();
+
+        public ReadLock() {
+            super();
+            keys.add(getWriteKey().toString());
+            keys.add(getReadKey().toString());
+        }
+
+        public String lockValue() {
+            return new String(lockValue).concat(Thread.currentThread().getId() + "");
         }
 
         @Override
         public void lock() {
-            redisTemplate.execute((RedisCallback<String>) connection -> {
-                boolean locked = false;
-                do {
-                    if (!connection.exists(getWriteKey())) {
-                        connection.setNX(getReadKey(), lockValue());
-                        connection.expire(getReadKey(), lockKeyExpireTime);
-                        locked = true;
-                    } else
-                        sleep();
-                } while (!locked);
-                return null;
-            });
+
+            while (true) {
+                Boolean locked = redisTemplate.execute(redisScriptRead, keys, lockValue());
+                if (!locked) {
+                    sleep();
+                } else {
+                    /*
+                    * 此处增加对所有读锁的过期
+                    * 1、防止项目停止,导致读锁一直存在
+                    *
+                    * @TODO 后期可以抽出到 redisScriptRead脚本中
+                    * */
+                    expire();
+                    break;
+                }
+            }
         }
 
         @Override
         public void lockInterruptibly() throws InterruptedException {
-            boolean locked = (Boolean) redisTemplate.execute((RedisCallback<Boolean>) connection ->
-            {
-                boolean writeLocked = connection.exists(getWriteKey());
-                if (!writeLocked) {
-                    if (connection.setNX(getReadKey(), lockValue)) {
-                        connection.expire(getReadKey(), lockKeyExpireTime);
-                    }
-                    writeLocked = true;
-                }
-                return writeLocked;
-            });
-            if (!locked) throw new InterruptedException("");
+
+            boolean locked = redisTemplate.execute(redisScriptRead, keys, lockValue());
+            if (locked) {
+                expire();
+            } else {
+                throw new InterruptedException("could not get the read lock!");
+            }
         }
 
         @Override
         public boolean tryLock() {
-            return (Boolean) redisTemplate.execute((RedisCallback<Boolean>) connection ->
-            {
-                if (connection.setNX(getReadKey(), lockValue)) {
-                    connection.expire(getReadKey(), lockKeyExpireTime);
-                }
-                return false;
-            });
+            boolean locked = redisTemplate.execute(redisScriptRead, keys, lockValue());
+            if (locked){
+                expire();
+            }
+            return locked;
         }
 
         @Override
         public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
             byte[] error = new byte[1];
-            boolean success = (Boolean) redisTemplate.execute((RedisCallback<Boolean>) connection -> {
-                boolean locked = false;
-                long startWith = System.nanoTime();
-                do {
-                    if (!connection.exists(getWriteKey())) {
-                        connection.setNX(getReadKey(), lockValue);
-                        connection.expire(getReadKey(), lockKeyExpireTime);
-                        return true;
-                    }
-                    long now = System.nanoTime();
-                    if (now - startWith > unit.toNanos(time)) {
-                        error[0] = 1;
-                        return false;
-                    }
-                    sleep();
-                } while (!locked);
-                return false;
-            });
+
+            boolean locked;
+            long startWith = System.nanoTime();
+            do {
+
+                locked = redisTemplate.execute(redisScriptRead, keys, lockValue());
+                if (locked) {
+                    expire();
+                    break;
+                }
+
+                long now = System.nanoTime();
+                if (now - startWith > unit.toNanos(time)) {
+                    error[0] = 1;
+                    break;
+                }
+                sleep();
+            } while (!locked);
+
             if (error[0] == 1) {
                 throw new InterruptedException("try lock time out!");
             }
-            return success;
+            return locked;
         }
 
         @Override
         public void unlock() {
             redisTemplate.execute((RedisCallback) conn -> {
-                byte[] lock = conn.get(getReadKey());
-                if (lock == null) return null;
+                StringRedisConnection strConn = (StringRedisConnection) conn;
+                Set<String> locks = strConn.sMembers(getReadKey());
+
+                if (locks == null || locks.size() == 0)
+                    return null;
                 //当前读锁为自己持有 才解锁
-                if (new String(lock).equals(new String(lockValue()))) {
-                    conn.del(getReadKey());
+                if (locks.contains(lockValue())) {
+                    strConn.sRem(getReadKey(), lockValue());
                 }
                 return null;
             });
@@ -161,88 +193,99 @@ public class RedisReadWriteLock implements ReadWriteLock {
         public Condition newCondition() {
             throw new UnsupportedOperationException();
         }
+
+        private void expire() {
+            redisTemplate.expire(getReadKey(), lockKeyExpireTime, TimeUnit.SECONDS);
+        }
     }
 
     class WriteLock implements Lock {
+
+        private List<String> keys = new ArrayList<>();
+
+        public WriteLock() {
+            super();
+            keys.add(getReadKey());
+            keys.add(getWriteKey());
+        }
+
         @Override
         public void lock() {
-            redisTemplate.execute((RedisCallback<String>) connection -> {
-                boolean locked = false, readLocked = false;
-                do {
-                    readLocked = connection.exists(getReadKey());
-                    if (!readLocked) {
-                        locked = connection.setNX(getWriteKey(), lockValue);
-                        connection.expire(getWriteKey(), lockKeyExpireTime);
-                    } else
-                        sleep();
-                } while (!locked);
-                return null;
-            });
+
+            boolean locked;
+
+            do {
+                locked = redisTemplate.execute(redisScriptWrite, keys, lockValue);
+                if (locked) {
+                    expire();
+                } else {
+                    sleep();
+                }
+            } while (!locked);
         }
 
         @Override
         public void lockInterruptibly() throws InterruptedException {
-            boolean locked = (Boolean) redisTemplate.execute((RedisCallback<Boolean>) connection ->
-            {
-                boolean readLocked = connection.exists(getReadKey());
-                if (!readLocked) {
-                    boolean _locked = connection.setNX(getWriteKey(), lockValue);
-                    if (_locked) connection.expire(getWriteKey(), lockKeyExpireTime);
-                    return _locked;
-                }
-                return false;
-            });
-            if (!locked) throw new InterruptedException("");
+            boolean locked = redisTemplate.execute(redisScriptWrite, keys, lockValue);
+            if (locked) {
+                expire();
+            } else {
+                throw new InterruptedException("");
+            }
         }
 
         @Override
         public boolean tryLock() {
-            return (Boolean) redisTemplate.execute((RedisCallback<Boolean>) connection -> {
-                if (connection.exists(getReadKey())) return false;
-                boolean locked = connection.setNX(getWriteKey(), lockValue);
-                if (locked)
-                    connection.expire(getWriteKey(), lockKeyExpireTime);
-                return locked;
-            });
+            boolean locked = redisTemplate.execute(redisScriptWrite, keys, lockValue);
+            if (locked) {
+                expire();
+            }
+            return locked;
         }
 
         @Override
         public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
             byte[] error = new byte[1];
-            boolean success = (Boolean) redisTemplate.execute((RedisCallback<Boolean>) connection -> {
-                boolean locked = false;
-                long startWith = System.nanoTime();
-                do {
-                    if (!connection.exists(getReadKey())) {
-                        locked = connection.setNX(getWriteKey(), lockValue);
-                        if (locked) {
-                            connection.expire(getWriteKey(), lockKeyExpireTime);
-                            return true;
-                        }
-                    }
-                    long now = System.nanoTime();
-                    if (now - startWith > unit.toNanos(time)) {
-                        error[0] = 1;
-                        return false;
-                    }
-                    sleep();
-                } while (!locked);
-                return null;
-            });
+
+            boolean locked;
+            long startWith = System.nanoTime();
+            do {
+                locked = redisTemplate.execute(redisScriptWrite, keys, lockValue);
+                long now = System.nanoTime();
+                if (now - startWith > unit.toNanos(time)) {
+                    error[0] = 1;
+                    break;
+                }
+                sleep();
+            } while (!locked);
+
+            if (locked) {
+                expire();
+            }
+
             if (error[0] == 1) {
                 throw new InterruptedException("lock time out!");
             }
-            return success;
+            return locked;
         }
 
         @Override
         public void unlock() {
-            redisTemplate.execute((RedisCallback) conn -> conn.del(getWriteKey()));
+            redisTemplate.execute((RedisCallback) conn -> {
+                StringRedisConnection strConn = (StringRedisConnection) conn;
+
+                strConn.del(getWriteKey());
+                return null;
+            });
         }
 
         @Override
         public Condition newCondition() {
             throw new UnsupportedOperationException();
         }
+
+        private void expire() {
+            redisTemplate.expire(getWriteKey(), lockKeyExpireTime, TimeUnit.SECONDS);
+        }
     }
 }

+ 21 - 0
hsweb-web-concurrent/hsweb-web-concurrent-lock/src/main/resources/META-INF/scheckAndVset.lua

@@ -0,0 +1,21 @@
+--
+-- Created by IntelliJ IDEA.
+-- User: aaa
+-- Date: 2016/9/19
+-- Time: 17:31
+-- To change this template use File | Settings | File Templates.
+--
+
+local size = redis.call('SCARD',KEYS[1])
+if(size == 0)
+then
+    local flag = redis.call('SETNX',KEYS[2],ARGV[1])
+    if(flag)
+    then
+        return true
+    else
+        return false
+    end
+else
+    return false
+end

+ 16 - 0
hsweb-web-concurrent/hsweb-web-concurrent-lock/src/main/resources/META-INF/vcheckAndsAdd.lua

@@ -0,0 +1,16 @@
+--
+-- Created by IntelliJ IDEA.
+-- User: aaa
+-- Date: 2016/9/19
+-- Time: 17:31
+-- To change this template use File | Settings | File Templates.
+--
+
+local v = redis.call('GET',KEYS[1])
+if(v)
+then
+    return false
+else
+    redis.call('SADD',KEYS[2],ARGV[1])
+    return true
+end