From ece658713b659513e2c43a9498da286cafec17dd Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 9 May 2017 11:57:09 +0200 Subject: Modules TSC: Improve inter-thread synchronization. More work to do with server.unixtime and similar. Need to write Helgrind suppression file in order to suppress the valse positives. --- src/atomicvar.h | 48 +++++++++++++++++++++++++++++++++++++++++------- src/evict.c | 15 +++++++++++++++ src/networking.c | 5 ++++- src/server.c | 15 +++++++++++---- src/server.h | 12 ++++-------- 5 files changed, 75 insertions(+), 20 deletions(-) diff --git a/src/atomicvar.h b/src/atomicvar.h index 1efa7bffb..9b5628ad6 100644 --- a/src/atomicvar.h +++ b/src/atomicvar.h @@ -3,18 +3,29 @@ * * The exported interaface is composed of three macros: * - * atomicIncr(var,count,mutex) -- Increment the atomic counter - * atomicDecr(var,count,mutex) -- Decrement the atomic counter - * atomicGet(var,dstvar,mutex) -- Fetch the atomic counter value + * atomicIncr(var,count) -- Increment the atomic counter + * atomicGetIncr(var,oldvalue_var,count) -- Get and increment the atomic counter + * atomicDecr(var,count) -- Decrement the atomic counter + * atomicGet(var,dstvar) -- Fetch the atomic counter value + * atomicSet(var,value) -- Set the atomic counter value + * + * The variable 'var' should also have a declared mutex with the same + * name and the "_mutex" postfix, for instance: + * + * long myvar; + * pthread_mutex_t myvar_mutex; + * atomicSet(myvar,12345); * * If atomic primitives are availble (tested in config.h) the mutex * is not used. * - * Never use return value from the macros. To update and get use instead: + * Never use return value from the macros, instead use the AtomicGetIncr() + * if you need to get the current value and increment it atomically, like + * in the followign example: * - * atomicIncr(mycounter,...); - * atomicGet(mycounter,newvalue); - * doSomethingWith(newvalue); + * long oldvalue; + * atomicGetIncr(myvar,oldvalue,1); + * doSomethingWith(oldvalue); * * ---------------------------------------------------------------------------- * @@ -55,19 +66,29 @@ /* Implementation using __atomic macros. */ #define atomicIncr(var,count) __atomic_add_fetch(&var,(count),__ATOMIC_RELAXED) +#define atomicGetIncr(var,oldvalue_var,count) do { \ + oldvalue_var = __atomic_fetch_add(&var,(count),__ATOMIC_RELAXED); \ +} while(0) #define atomicDecr(var,count) __atomic_sub_fetch(&var,(count),__ATOMIC_RELAXED) #define atomicGet(var,dstvar) do { \ dstvar = __atomic_load_n(&var,__ATOMIC_RELAXED); \ } while(0) +#define atomicSet(var,value) __atomic_store_n(&var,value,__ATOMIC_RELAXED) #elif defined(HAVE_ATOMIC) /* Implementation using __sync macros. */ #define atomicIncr(var,count) __sync_add_and_fetch(&var,(count)) +#define atomicGetIncr(var,oldvalue_var,count) do { \ + oldvalue_var = __sync_fetch_and_add(&var,(count)); \ +} while(0) #define atomicDecr(var,count) __sync_sub_and_fetch(&var,(count)) #define atomicGet(var,dstvar) do { \ dstvar = __sync_sub_and_fetch(&var,0); \ } while(0) +#define atomicSet(var,value) do { \ + while(!__sync_bool_compare_and_swap(&var,var,value)); \ +} while(0) #else /* Implementation using pthread mutex. */ @@ -78,6 +99,13 @@ pthread_mutex_unlock(&var ## _mutex); \ } while(0) +#define atomicGetIncr(var,oldvalue_var,count) do { \ + pthread_mutex_lock(&var ## _mutex); \ + oldvalue_var = var; \ + var += (count); \ + pthread_mutex_unlock(&var ## _mutex); \ +} while(0) + #define atomicDecr(var,count) do { \ pthread_mutex_lock(&var ## _mutex); \ var -= (count); \ @@ -89,6 +117,12 @@ dstvar = var; \ pthread_mutex_unlock(&var ## _mutex); \ } while(0) + +#define atomicSet(var,value) do { \ + pthread_mutex_lock(&var ## _mutex); \ + var = value; \ + pthread_mutex_unlock(&var ## _mutex); \ +} while(0) #endif #endif /* __ATOMIC_VAR_H */ diff --git a/src/evict.c b/src/evict.c index 62753c5a7..bf5bea6b0 100644 --- a/src/evict.c +++ b/src/evict.c @@ -32,6 +32,7 @@ #include "server.h" #include "bio.h" +#include "atomicvar.h" /* ---------------------------------------------------------------------------- * Data structures @@ -72,6 +73,20 @@ unsigned int getLRUClock(void) { return (mstime()/LRU_CLOCK_RESOLUTION) & LRU_CLOCK_MAX; } +/* This function is used to obtain the current LRU clock. + * If the current resolution is lower than the frequency we refresh the + * LRU clock (as it should be in production servers) we return the + * precomputed value, otherwise we need to resort to a system call. */ +unsigned int LRU_CLOCK(void) { + unsigned int lruclock; + if (1000/server.hz <= LRU_CLOCK_RESOLUTION) { + atomicGet(server.lruclock,lruclock); + } else { + lruclock = getLRUClock(); + } + return lruclock; +} + /* Given an object returns the min number of milliseconds the object was never * requested, using an approximated LRU algorithm. */ unsigned long long estimateObjectIdleTime(robj *o) { diff --git a/src/networking.c b/src/networking.c index fae8e52bd..efaca1bc6 100644 --- a/src/networking.c +++ b/src/networking.c @@ -28,6 +28,7 @@ */ #include "server.h" +#include "atomicvar.h" #include #include #include @@ -88,7 +89,9 @@ client *createClient(int fd) { } selectDb(c,0); - c->id = server.next_client_id++; + uint64_t client_id; + atomicGetIncr(server.next_client_id,client_id,1); + c->id = client_id; c->fd = fd; c->name = NULL; c->bufpos = 0; diff --git a/src/server.c b/src/server.c index 6be12cffe..e1858cb53 100644 --- a/src/server.c +++ b/src/server.c @@ -32,6 +32,7 @@ #include "slowlog.h" #include "bio.h" #include "latency.h" +#include "atomicvar.h" #include #include @@ -68,7 +69,8 @@ double R_Zero, R_PosInf, R_NegInf, R_Nan; /*================================= Globals ================================= */ /* Global vars */ -struct redisServer server; /* server global state */ +struct redisServer server; /* Server global state */ +volatile unsigned long lru_clock; /* Server global current LRU time. */ /* Our command table. * @@ -976,7 +978,8 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { * * Note that you can change the resolution altering the * LRU_CLOCK_RESOLUTION define. */ - server.lruclock = getLRUClock(); + unsigned long lruclock = getLRUClock(); + atomicSet(server.lruclock,lruclock); /* Record the max memory used since the server was started. */ if (zmalloc_used_memory() > server.stat_peak_memory) @@ -1420,6 +1423,7 @@ void initServerConfig(void) { server.cluster_announce_bus_port = CONFIG_DEFAULT_CLUSTER_ANNOUNCE_BUS_PORT; server.migrate_cached_sockets = dictCreate(&migrateCacheDictType,NULL); server.next_client_id = 1; /* Client IDs, start from 1 .*/ + pthread_mutex_init(&server.next_client_id_mutex,NULL); server.loading_process_events_interval_bytes = (1024*1024*2); server.lazyfree_lazy_eviction = CONFIG_DEFAULT_LAZYFREE_LAZY_EVICTION; server.lazyfree_lazy_expire = CONFIG_DEFAULT_LAZYFREE_LAZY_EXPIRE; @@ -1427,7 +1431,8 @@ void initServerConfig(void) { server.always_show_logo = CONFIG_DEFAULT_ALWAYS_SHOW_LOGO; server.lua_time_limit = LUA_SCRIPT_TIME_LIMIT; - server.lruclock = getLRUClock(); + unsigned int lruclock = getLRUClock(); + atomicSet(server.lruclock,lruclock); resetServerSaveParams(); appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */ @@ -2809,6 +2814,8 @@ sds genRedisInfoString(char *section) { call_uname = 0; } + unsigned int lruclock; + atomicGet(server.lruclock,lruclock); info = sdscatprintf(info, "# Server\r\n" "redis_version:%s\r\n" @@ -2848,7 +2855,7 @@ sds genRedisInfoString(char *section) { (intmax_t)uptime, (intmax_t)(uptime/(3600*24)), server.hz, - (unsigned long) server.lruclock, + (unsigned long) lruclock, server.executable ? server.executable : "", server.configfile ? server.configfile : ""); } diff --git a/src/server.h b/src/server.h index 2bc49299b..ea46e5e23 100644 --- a/src/server.h +++ b/src/server.h @@ -563,19 +563,13 @@ typedef struct RedisModuleIO { typedef struct redisObject { unsigned type:4; unsigned encoding:4; - unsigned lru:LRU_BITS; /* LRU time (relative to server.lruclock) or + unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or * LFU data (least significant 8 bits frequency * and most significant 16 bits decreas time). */ int refcount; void *ptr; } robj; -/* Macro used to obtain the current LRU clock. - * If the current resolution is lower than the frequency we refresh the - * LRU clock (as it should be in production servers) we return the - * precomputed value, otherwise we need to resort to a system call. */ -#define LRU_CLOCK() ((1000/server.hz <= LRU_CLOCK_RESOLUTION) ? server.lruclock : getLRUClock()) - /* Macro used to initialize a Redis object allocated on the stack. * Note that this macro is taken near the structure definition to make sure * we'll update it when the structure is changed, to avoid bugs like @@ -866,7 +860,7 @@ struct redisServer { dict *commands; /* Command table */ dict *orig_commands; /* Command table before command renaming. */ aeEventLoop *el; - unsigned lruclock:LRU_BITS; /* Clock for LRU eviction */ + unsigned int lruclock; /* Clock for LRU eviction */ int shutdown_asap; /* SHUTDOWN needed ASAP */ int activerehashing; /* Incremental rehash in serverCron() */ int active_defrag_running; /* Active defragmentation running (holds current scan aggressiveness) */ @@ -906,6 +900,7 @@ struct redisServer { char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */ dict *migrate_cached_sockets;/* MIGRATE cached sockets */ uint64_t next_client_id; /* Next client unique ID. Incremental. */ + pthread_mutex_t next_client_id_mutex; int protected_mode; /* Don't accept external connections. */ /* RDB / AOF loading information */ int loading; /* We are loading data from disk if true */ @@ -1608,6 +1603,7 @@ void updateCachedTime(void); void resetServerStats(void); void activeDefragCycle(void); unsigned int getLRUClock(void); +unsigned int LRU_CLOCK(void); const char *evictPolicyToString(void); struct redisMemOverhead *getMemoryOverheadData(void); void freeMemoryOverheadData(struct redisMemOverhead *mh); -- cgit v1.2.1