summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2017-05-09 11:57:09 +0200
committerantirez <antirez@gmail.com>2017-05-09 11:57:09 +0200
commitece658713b659513e2c43a9498da286cafec17dd (patch)
tree5102cb44ed2ed3c197bc901388a80b92c211c8cb
parent2a51bac44ebcff79dcd8ede2cab66ca213c9d504 (diff)
downloadredis-ece658713b659513e2c43a9498da286cafec17dd.tar.gz
Modules TSC: Improve inter-thread synchronization.
More work to do with server.unixtime and similar. Need to write Helgrind suppression file in order to suppress the valse positives.
-rw-r--r--src/atomicvar.h48
-rw-r--r--src/evict.c15
-rw-r--r--src/networking.c5
-rw-r--r--src/server.c15
-rw-r--r--src/server.h12
5 files changed, 75 insertions, 20 deletions
diff --git a/src/atomicvar.h b/src/atomicvar.h
index 1efa7bffb..9b5628ad6 100644
--- a/src/atomicvar.h
+++ b/src/atomicvar.h
@@ -3,18 +3,29 @@
*
* The exported interaface is composed of three macros:
*
- * atomicIncr(var,count,mutex) -- Increment the atomic counter
- * atomicDecr(var,count,mutex) -- Decrement the atomic counter
- * atomicGet(var,dstvar,mutex) -- Fetch the atomic counter value
+ * atomicIncr(var,count) -- Increment the atomic counter
+ * atomicGetIncr(var,oldvalue_var,count) -- Get and increment the atomic counter
+ * atomicDecr(var,count) -- Decrement the atomic counter
+ * atomicGet(var,dstvar) -- Fetch the atomic counter value
+ * atomicSet(var,value) -- Set the atomic counter value
+ *
+ * The variable 'var' should also have a declared mutex with the same
+ * name and the "_mutex" postfix, for instance:
+ *
+ * long myvar;
+ * pthread_mutex_t myvar_mutex;
+ * atomicSet(myvar,12345);
*
* If atomic primitives are availble (tested in config.h) the mutex
* is not used.
*
- * Never use return value from the macros. To update and get use instead:
+ * Never use return value from the macros, instead use the AtomicGetIncr()
+ * if you need to get the current value and increment it atomically, like
+ * in the followign example:
*
- * atomicIncr(mycounter,...);
- * atomicGet(mycounter,newvalue);
- * doSomethingWith(newvalue);
+ * long oldvalue;
+ * atomicGetIncr(myvar,oldvalue,1);
+ * doSomethingWith(oldvalue);
*
* ----------------------------------------------------------------------------
*
@@ -55,19 +66,29 @@
/* Implementation using __atomic macros. */
#define atomicIncr(var,count) __atomic_add_fetch(&var,(count),__ATOMIC_RELAXED)
+#define atomicGetIncr(var,oldvalue_var,count) do { \
+ oldvalue_var = __atomic_fetch_add(&var,(count),__ATOMIC_RELAXED); \
+} while(0)
#define atomicDecr(var,count) __atomic_sub_fetch(&var,(count),__ATOMIC_RELAXED)
#define atomicGet(var,dstvar) do { \
dstvar = __atomic_load_n(&var,__ATOMIC_RELAXED); \
} while(0)
+#define atomicSet(var,value) __atomic_store_n(&var,value,__ATOMIC_RELAXED)
#elif defined(HAVE_ATOMIC)
/* Implementation using __sync macros. */
#define atomicIncr(var,count) __sync_add_and_fetch(&var,(count))
+#define atomicGetIncr(var,oldvalue_var,count) do { \
+ oldvalue_var = __sync_fetch_and_add(&var,(count)); \
+} while(0)
#define atomicDecr(var,count) __sync_sub_and_fetch(&var,(count))
#define atomicGet(var,dstvar) do { \
dstvar = __sync_sub_and_fetch(&var,0); \
} while(0)
+#define atomicSet(var,value) do { \
+ while(!__sync_bool_compare_and_swap(&var,var,value)); \
+} while(0)
#else
/* Implementation using pthread mutex. */
@@ -78,6 +99,13 @@
pthread_mutex_unlock(&var ## _mutex); \
} while(0)
+#define atomicGetIncr(var,oldvalue_var,count) do { \
+ pthread_mutex_lock(&var ## _mutex); \
+ oldvalue_var = var; \
+ var += (count); \
+ pthread_mutex_unlock(&var ## _mutex); \
+} while(0)
+
#define atomicDecr(var,count) do { \
pthread_mutex_lock(&var ## _mutex); \
var -= (count); \
@@ -89,6 +117,12 @@
dstvar = var; \
pthread_mutex_unlock(&var ## _mutex); \
} while(0)
+
+#define atomicSet(var,value) do { \
+ pthread_mutex_lock(&var ## _mutex); \
+ var = value; \
+ pthread_mutex_unlock(&var ## _mutex); \
+} while(0)
#endif
#endif /* __ATOMIC_VAR_H */
diff --git a/src/evict.c b/src/evict.c
index 62753c5a7..bf5bea6b0 100644
--- a/src/evict.c
+++ b/src/evict.c
@@ -32,6 +32,7 @@
#include "server.h"
#include "bio.h"
+#include "atomicvar.h"
/* ----------------------------------------------------------------------------
* Data structures
@@ -72,6 +73,20 @@ unsigned int getLRUClock(void) {
return (mstime()/LRU_CLOCK_RESOLUTION) & LRU_CLOCK_MAX;
}
+/* This function is used to obtain the current LRU clock.
+ * If the current resolution is lower than the frequency we refresh the
+ * LRU clock (as it should be in production servers) we return the
+ * precomputed value, otherwise we need to resort to a system call. */
+unsigned int LRU_CLOCK(void) {
+ unsigned int lruclock;
+ if (1000/server.hz <= LRU_CLOCK_RESOLUTION) {
+ atomicGet(server.lruclock,lruclock);
+ } else {
+ lruclock = getLRUClock();
+ }
+ return lruclock;
+}
+
/* Given an object returns the min number of milliseconds the object was never
* requested, using an approximated LRU algorithm. */
unsigned long long estimateObjectIdleTime(robj *o) {
diff --git a/src/networking.c b/src/networking.c
index fae8e52bd..efaca1bc6 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -28,6 +28,7 @@
*/
#include "server.h"
+#include "atomicvar.h"
#include <sys/uio.h>
#include <math.h>
#include <ctype.h>
@@ -88,7 +89,9 @@ client *createClient(int fd) {
}
selectDb(c,0);
- c->id = server.next_client_id++;
+ uint64_t client_id;
+ atomicGetIncr(server.next_client_id,client_id,1);
+ c->id = client_id;
c->fd = fd;
c->name = NULL;
c->bufpos = 0;
diff --git a/src/server.c b/src/server.c
index 6be12cffe..e1858cb53 100644
--- a/src/server.c
+++ b/src/server.c
@@ -32,6 +32,7 @@
#include "slowlog.h"
#include "bio.h"
#include "latency.h"
+#include "atomicvar.h"
#include <time.h>
#include <signal.h>
@@ -68,7 +69,8 @@ double R_Zero, R_PosInf, R_NegInf, R_Nan;
/*================================= Globals ================================= */
/* Global vars */
-struct redisServer server; /* server global state */
+struct redisServer server; /* Server global state */
+volatile unsigned long lru_clock; /* Server global current LRU time. */
/* Our command table.
*
@@ -976,7 +978,8 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
*
* Note that you can change the resolution altering the
* LRU_CLOCK_RESOLUTION define. */
- server.lruclock = getLRUClock();
+ unsigned long lruclock = getLRUClock();
+ atomicSet(server.lruclock,lruclock);
/* Record the max memory used since the server was started. */
if (zmalloc_used_memory() > server.stat_peak_memory)
@@ -1420,6 +1423,7 @@ void initServerConfig(void) {
server.cluster_announce_bus_port = CONFIG_DEFAULT_CLUSTER_ANNOUNCE_BUS_PORT;
server.migrate_cached_sockets = dictCreate(&migrateCacheDictType,NULL);
server.next_client_id = 1; /* Client IDs, start from 1 .*/
+ pthread_mutex_init(&server.next_client_id_mutex,NULL);
server.loading_process_events_interval_bytes = (1024*1024*2);
server.lazyfree_lazy_eviction = CONFIG_DEFAULT_LAZYFREE_LAZY_EVICTION;
server.lazyfree_lazy_expire = CONFIG_DEFAULT_LAZYFREE_LAZY_EXPIRE;
@@ -1427,7 +1431,8 @@ void initServerConfig(void) {
server.always_show_logo = CONFIG_DEFAULT_ALWAYS_SHOW_LOGO;
server.lua_time_limit = LUA_SCRIPT_TIME_LIMIT;
- server.lruclock = getLRUClock();
+ unsigned int lruclock = getLRUClock();
+ atomicSet(server.lruclock,lruclock);
resetServerSaveParams();
appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
@@ -2809,6 +2814,8 @@ sds genRedisInfoString(char *section) {
call_uname = 0;
}
+ unsigned int lruclock;
+ atomicGet(server.lruclock,lruclock);
info = sdscatprintf(info,
"# Server\r\n"
"redis_version:%s\r\n"
@@ -2848,7 +2855,7 @@ sds genRedisInfoString(char *section) {
(intmax_t)uptime,
(intmax_t)(uptime/(3600*24)),
server.hz,
- (unsigned long) server.lruclock,
+ (unsigned long) lruclock,
server.executable ? server.executable : "",
server.configfile ? server.configfile : "");
}
diff --git a/src/server.h b/src/server.h
index 2bc49299b..ea46e5e23 100644
--- a/src/server.h
+++ b/src/server.h
@@ -563,19 +563,13 @@ typedef struct RedisModuleIO {
typedef struct redisObject {
unsigned type:4;
unsigned encoding:4;
- unsigned lru:LRU_BITS; /* LRU time (relative to server.lruclock) or
+ unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or
* LFU data (least significant 8 bits frequency
* and most significant 16 bits decreas time). */
int refcount;
void *ptr;
} robj;
-/* Macro used to obtain the current LRU clock.
- * If the current resolution is lower than the frequency we refresh the
- * LRU clock (as it should be in production servers) we return the
- * precomputed value, otherwise we need to resort to a system call. */
-#define LRU_CLOCK() ((1000/server.hz <= LRU_CLOCK_RESOLUTION) ? server.lruclock : getLRUClock())
-
/* Macro used to initialize a Redis object allocated on the stack.
* Note that this macro is taken near the structure definition to make sure
* we'll update it when the structure is changed, to avoid bugs like
@@ -866,7 +860,7 @@ struct redisServer {
dict *commands; /* Command table */
dict *orig_commands; /* Command table before command renaming. */
aeEventLoop *el;
- unsigned lruclock:LRU_BITS; /* Clock for LRU eviction */
+ unsigned int lruclock; /* Clock for LRU eviction */
int shutdown_asap; /* SHUTDOWN needed ASAP */
int activerehashing; /* Incremental rehash in serverCron() */
int active_defrag_running; /* Active defragmentation running (holds current scan aggressiveness) */
@@ -906,6 +900,7 @@ struct redisServer {
char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */
dict *migrate_cached_sockets;/* MIGRATE cached sockets */
uint64_t next_client_id; /* Next client unique ID. Incremental. */
+ pthread_mutex_t next_client_id_mutex;
int protected_mode; /* Don't accept external connections. */
/* RDB / AOF loading information */
int loading; /* We are loading data from disk if true */
@@ -1608,6 +1603,7 @@ void updateCachedTime(void);
void resetServerStats(void);
void activeDefragCycle(void);
unsigned int getLRUClock(void);
+unsigned int LRU_CLOCK(void);
const char *evictPolicyToString(void);
struct redisMemOverhead *getMemoryOverheadData(void);
void freeMemoryOverheadData(struct redisMemOverhead *mh);