summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorWang Yuan <wangyuan21@baidu.com>2020-09-17 21:01:45 +0800
committerGitHub <noreply@github.com>2020-09-17 16:01:45 +0300
commit445a4b669a3a7232a18bf23340c5f7d580aa92c7 (patch)
treea129255257c274a946489b4389e410ab928f4484 /src
parent092cfca5224e8e88053cd5b8a7b82f0f14b17011 (diff)
downloadredis-445a4b669a3a7232a18bf23340c5f7d580aa92c7.tar.gz
Implement redisAtomic to replace _Atomic C11 builtin (#7707)
Redis 6.0 introduces I/O threads, it is so cool and efficient, we use C11 _Atomic to establish inter-thread synchronization without mutex. But the compiler that must supports C11 _Atomic can compile redis code, that brings a lot of inconvenience since some common platforms can't support by default such as CentOS7, so we want to implement redis atomic type to make it more portable. We have implemented our atomic variable for redis that only has 'relaxed' operations in src/atomicvar.h, so we implement some operations with 'sequentially-consistent', just like the default behavior of C11 _Atomic that can establish inter-thread synchronization. And we replace all uses of C11 _Atomic with redis atomic variable. Our implementation of redis atomic variable uses C11 _Atomic, __atomic or __sync macros if available, it supports most common platforms, and we will detect automatically which feature we use. In Makefile we use a dummy file to detect if the compiler supports C11 _Atomic. Now for gcc, we can compile redis code theoretically if your gcc version is not less than 4.1.2(starts to support __sync_xxx operations). Otherwise, we remove use mutex fallback to implement redis atomic variable for performance and test. You will get compiling errors if your compiler doesn't support all features of above. For cover redis atomic variable tests, we add other CI jobs that build redis on CentOS6 and CentOS7 and workflow daily jobs that run the tests on them. For them, we just install gcc by default in order to cover different compiler versions, gcc is 4.4.7 by default installation on CentOS6 and 4.8.5 on CentOS7. We restore the feature that we can test redis with Helgrind to find data race errors. But you need install Valgrind in the default path configuration firstly before running your tests, since we use macros in helgrind.h to tell Helgrind inter-thread happens-before relationship explicitly for avoiding false positives. Please open an issue on github if you find data race errors relate to this commit. Unrelated: - Fix redefinition of typedef 'RedisModuleUserChangedFunc' For some old version compilers, they will report errors or warnings, if we re-define function type.
Diffstat (limited to 'src')
-rw-r--r--src/Makefile14
-rw-r--r--src/atomicvar.h109
-rw-r--r--src/evict.c2
-rw-r--r--src/lazyfree.c3
-rw-r--r--src/module.c5
-rw-r--r--src/networking.c41
-rw-r--r--src/redis-benchmark.c25
-rw-r--r--src/replication.c10
-rw-r--r--src/server.c44
-rw-r--r--src/server.h23
-rw-r--r--src/zmalloc.c3
11 files changed, 163 insertions, 116 deletions
diff --git a/src/Makefile b/src/Makefile
index 9ff344c6a..6414b8c78 100644
--- a/src/Makefile
+++ b/src/Makefile
@@ -20,7 +20,7 @@ DEPENDENCY_TARGETS=hiredis linenoise lua hdr_histogram
NODEPS:=clean distclean
# Default settings
-STD=-std=c11 -pedantic -DREDIS_STATIC=''
+STD=-pedantic -DREDIS_STATIC=''
ifneq (,$(findstring clang,$(CC)))
ifneq (,$(findstring FreeBSD,$(uname_S)))
STD+=-Wno-c11-extensions
@@ -29,6 +29,16 @@ endif
WARN=-Wall -W -Wno-missing-field-initializers
OPT=$(OPTIMIZATION)
+# Detect if the compiler supports C11 _Atomic
+C11_ATOMIC := $(shell sh -c 'echo "\#include <stdatomic.h>" > foo.c; \
+ $(CC) -std=c11 -c foo.c -o foo.o &> /dev/null; \
+ if [ -f foo.o ]; then echo "yes"; rm foo.o; fi; rm foo.c')
+ifeq ($(C11_ATOMIC),yes)
+ STD+=-std=c11
+else
+ STD+=-std=c99
+endif
+
PREFIX?=/usr/local
INSTALL_BIN=$(PREFIX)/bin
INSTALL=install
@@ -367,7 +377,7 @@ valgrind:
$(MAKE) OPTIMIZATION="-O0" MALLOC="libc"
helgrind:
- $(MAKE) OPTIMIZATION="-O0" MALLOC="libc" CFLAGS="-D__ATOMIC_VAR_FORCE_SYNC_MACROS"
+ $(MAKE) OPTIMIZATION="-O0" MALLOC="libc" CFLAGS="-D__ATOMIC_VAR_FORCE_SYNC_MACROS" REDIS_CFLAGS="-I/usr/local/include" REDIS_LDFLAGS="-L/usr/local/lib"
src/help.h:
@../utils/generate-command-help.rb > help.h
diff --git a/src/atomicvar.h b/src/atomicvar.h
index ecd26ad70..6ac04c605 100644
--- a/src/atomicvar.h
+++ b/src/atomicvar.h
@@ -1,5 +1,5 @@
-/* This file implements atomic counters using __atomic or __sync macros if
- * available, otherwise synchronizing different threads using a mutex.
+/* This file implements atomic counters using c11 _Atomic, __atomic or __sync
+ * macros if available, otherwise we will throw an error when compile.
*
* The exported interface is composed of three macros:
*
@@ -8,16 +8,8 @@
* atomicDecr(var,count) -- Decrement the atomic counter
* atomicGet(var,dstvar) -- Fetch the atomic counter value
* atomicSet(var,value) -- Set the atomic counter value
- *
- * The variable 'var' should also have a declared mutex with the same
- * name and the "_mutex" postfix, for instance:
- *
- * long myvar;
- * pthread_mutex_t myvar_mutex;
- * atomicSet(myvar,12345);
- *
- * If atomic primitives are available (tested in config.h) the mutex
- * is not used.
+ * atomicGetWithSync(var,value) -- 'atomicGet' with inter-thread synchronization
+ * atomicSetWithSync(var,value) -- 'atomicSet' with inter-thread synchronization
*
* Never use return value from the macros, instead use the AtomicGetIncr()
* if you need to get the current value and increment it atomically, like
@@ -58,17 +50,64 @@
*/
#include <pthread.h>
+#include "config.h"
#ifndef __ATOMIC_VAR_H
#define __ATOMIC_VAR_H
+/* Define redisAtomic for atomic variable. */
+#define redisAtomic
+
/* To test Redis with Helgrind (a Valgrind tool) it is useful to define
* the following macro, so that __sync macros are used: those can be detected
* by Helgrind (even if they are less efficient) so that no false positive
* is reported. */
// #define __ATOMIC_VAR_FORCE_SYNC_MACROS
-#if !defined(__ATOMIC_VAR_FORCE_SYNC_MACROS) && defined(__ATOMIC_RELAXED) && !defined(__sun) && (!defined(__clang__) || !defined(__APPLE__) || __apple_build_version__ > 4210057)
+/* There will be many false positives if we test Redis with Helgrind, since
+ * Helgrind can't understand we have imposed ordering on the program, so
+ * we use macros in helgrind.h to tell Helgrind inter-thread happens-before
+ * relationship explicitly for avoiding false positives.
+ *
+ * For more details, please see: valgrind/helgrind.h and
+ * https://www.valgrind.org/docs/manual/hg-manual.html#hg-manual.effective-use
+ *
+ * These macros take effect only when 'make helgrind', and you must first
+ * install Valgrind in the default path configuration. */
+#ifdef __ATOMIC_VAR_FORCE_SYNC_MACROS
+#include <valgrind/helgrind.h>
+#else
+#define ANNOTATE_HAPPENS_BEFORE(v) ((void) v)
+#define ANNOTATE_HAPPENS_AFTER(v) ((void) v)
+#endif
+
+#if !defined(__ATOMIC_VAR_FORCE_SYNC_MACROS) && defined(__STDC_VERSION__) && \
+ (__STDC_VERSION__ >= 201112L) && !defined(__STDC_NO_ATOMICS__)
+/* Use '_Atomic' keyword if the compiler supports. */
+#undef redisAtomic
+#define redisAtomic _Atomic
+/* Implementation using _Atomic in C11. */
+
+#include <stdatomic.h>
+#define atomicIncr(var,count) atomic_fetch_add_explicit(&var,(count),memory_order_relaxed)
+#define atomicGetIncr(var,oldvalue_var,count) do { \
+ oldvalue_var = atomic_fetch_add_explicit(&var,(count),memory_order_relaxed); \
+} while(0)
+#define atomicDecr(var,count) atomic_fetch_sub_explicit(&var,(count),memory_order_relaxed)
+#define atomicGet(var,dstvar) do { \
+ dstvar = atomic_load_explicit(&var,memory_order_relaxed); \
+} while(0)
+#define atomicSet(var,value) atomic_store_explicit(&var,value,memory_order_relaxed)
+#define atomicGetWithSync(var,dstvar) do { \
+ dstvar = atomic_load_explicit(&var,memory_order_seq_cst); \
+} while(0)
+#define atomicSetWithSync(var,value) \
+ atomic_store_explicit(&var,value,memory_order_seq_cst)
+#define REDIS_ATOMIC_API "c11-builtin"
+
+#elif !defined(__ATOMIC_VAR_FORCE_SYNC_MACROS) && !defined(__sun) && \
+ (!defined(__clang__) || !defined(__APPLE__) || __apple_build_version__ > 4210057) && \
+ defined(__ATOMIC_RELAXED) && defined(__ATOMIC_SEQ_CST)
/* Implementation using __atomic macros. */
#define atomicIncr(var,count) __atomic_add_fetch(&var,(count),__ATOMIC_RELAXED)
@@ -80,6 +119,11 @@
dstvar = __atomic_load_n(&var,__ATOMIC_RELAXED); \
} while(0)
#define atomicSet(var,value) __atomic_store_n(&var,value,__ATOMIC_RELAXED)
+#define atomicGetWithSync(var,dstvar) do { \
+ dstvar = __atomic_load_n(&var,__ATOMIC_SEQ_CST); \
+} while(0)
+#define atomicSetWithSync(var,value) \
+ __atomic_store_n(&var,value,__ATOMIC_SEQ_CST)
#define REDIS_ATOMIC_API "atomic-builtin"
#elif defined(HAVE_ATOMIC)
@@ -96,38 +140,19 @@
#define atomicSet(var,value) do { \
while(!__sync_bool_compare_and_swap(&var,var,value)); \
} while(0)
+/* Actually the builtin issues a full memory barrier by default. */
+#define atomicGetWithSync(var,dstvar) { \
+ dstvar = __sync_sub_and_fetch(&var,0,__sync_synchronize); \
+ ANNOTATE_HAPPENS_AFTER(&var); \
+} while(0)
+#define atomicSetWithSync(var,value) do { \
+ ANNOTATE_HAPPENS_BEFORE(&var); \
+ while(!__sync_bool_compare_and_swap(&var,var,value,__sync_synchronize)); \
+} while(0)
#define REDIS_ATOMIC_API "sync-builtin"
#else
-/* Implementation using pthread mutex. */
-
-#define atomicIncr(var,count) do { \
- pthread_mutex_lock(&var ## _mutex); \
- var += (count); \
- pthread_mutex_unlock(&var ## _mutex); \
-} while(0)
-#define atomicGetIncr(var,oldvalue_var,count) do { \
- pthread_mutex_lock(&var ## _mutex); \
- oldvalue_var = var; \
- var += (count); \
- pthread_mutex_unlock(&var ## _mutex); \
-} while(0)
-#define atomicDecr(var,count) do { \
- pthread_mutex_lock(&var ## _mutex); \
- var -= (count); \
- pthread_mutex_unlock(&var ## _mutex); \
-} while(0)
-#define atomicGet(var,dstvar) do { \
- pthread_mutex_lock(&var ## _mutex); \
- dstvar = var; \
- pthread_mutex_unlock(&var ## _mutex); \
-} while(0)
-#define atomicSet(var,value) do { \
- pthread_mutex_lock(&var ## _mutex); \
- var = value; \
- pthread_mutex_unlock(&var ## _mutex); \
-} while(0)
-#define REDIS_ATOMIC_API "pthread-mutex"
+#error "Unable to determine atomic operations for your platform"
#endif
#endif /* __ATOMIC_VAR_H */
diff --git a/src/evict.c b/src/evict.c
index 258e1a25e..0e8f818ff 100644
--- a/src/evict.c
+++ b/src/evict.c
@@ -79,7 +79,7 @@ unsigned int getLRUClock(void) {
unsigned int LRU_CLOCK(void) {
unsigned int lruclock;
if (1000/server.hz <= LRU_CLOCK_RESOLUTION) {
- lruclock = server.lruclock;
+ atomicGet(server.lruclock,lruclock);
} else {
lruclock = getLRUClock();
}
diff --git a/src/lazyfree.c b/src/lazyfree.c
index 821dc50df..b0fc26fcf 100644
--- a/src/lazyfree.c
+++ b/src/lazyfree.c
@@ -3,8 +3,7 @@
#include "atomicvar.h"
#include "cluster.h"
-static size_t lazyfree_objects = 0;
-pthread_mutex_t lazyfree_objects_mutex = PTHREAD_MUTEX_INITIALIZER;
+static redisAtomic size_t lazyfree_objects = 0;
/* Return the number of currently pending objects to free. */
size_t lazyfreeGetPendingObjectsCount(void) {
diff --git a/src/module.c b/src/module.c
index 315161ed9..a501a4830 100644
--- a/src/module.c
+++ b/src/module.c
@@ -357,11 +357,6 @@ unsigned long long ModulesInHooks = 0; /* Total number of modules in hooks
/* Data structures related to the redis module users */
-/* This callback type is called by moduleNotifyUserChanged() every time
- * a user authenticated via the module API is associated with a different
- * user or gets disconnected. */
-typedef void (*RedisModuleUserChangedFunc) (uint64_t client_id, void *privdata);
-
/* This is the object returned by RM_CreateModuleUser(). The module API is
* able to create users, set ACLs to such users, and later authenticate
* clients using such newly created users. */
diff --git a/src/networking.c b/src/networking.c
index 6becdc288..3fa298783 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -103,7 +103,8 @@ client *createClient(connection *conn) {
}
selectDb(c,0);
- uint64_t client_id = ++server.next_client_id;
+ uint64_t client_id;
+ atomicGetIncr(server.next_client_id, client_id, 1);
c->id = client_id;
c->resp = 2;
c->conn = conn;
@@ -1314,7 +1315,7 @@ client *lookupClientByID(uint64_t id) {
* thread safe. */
int writeToClient(client *c, int handler_installed) {
/* Update total number of writes on server */
- server.stat_total_writes_processed++;
+ atomicIncr(server.stat_total_writes_processed, 1);
ssize_t nwritten = 0, totwritten = 0;
size_t objlen;
@@ -1376,7 +1377,7 @@ int writeToClient(client *c, int handler_installed) {
zmalloc_used_memory() < server.maxmemory) &&
!(c->flags & CLIENT_SLAVE)) break;
}
- server.stat_net_output_bytes += totwritten;
+ atomicIncr(server.stat_net_output_bytes, totwritten);
if (nwritten == -1) {
if (connGetState(c->conn) == CONN_STATE_CONNECTED) {
nwritten = 0;
@@ -1933,7 +1934,7 @@ void readQueryFromClient(connection *conn) {
if (postponeClientRead(c)) return;
/* Update total number of reads on server */
- server.stat_total_reads_processed++;
+ atomicIncr(server.stat_total_reads_processed, 1);
readlen = PROTO_IOBUF_LEN;
/* If this is a multi bulk request, and we are processing a bulk reply
@@ -1979,7 +1980,7 @@ void readQueryFromClient(connection *conn) {
sdsIncrLen(c->querybuf,nread);
c->lastinteraction = server.unixtime;
if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
- server.stat_net_input_bytes += nread;
+ atomicIncr(server.stat_net_input_bytes, nread);
if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
@@ -2938,7 +2939,7 @@ int tio_debug = 0;
pthread_t io_threads[IO_THREADS_MAX_NUM];
pthread_mutex_t io_threads_mutex[IO_THREADS_MAX_NUM];
-_Atomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM];
+redisAtomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM];
int io_threads_op; /* IO_THREADS_OP_WRITE or IO_THREADS_OP_READ. */
/* This is the list of clients each thread will serve when threaded I/O is
@@ -2946,6 +2947,16 @@ int io_threads_op; /* IO_THREADS_OP_WRITE or IO_THREADS_OP_READ. */
* itself. */
list *io_threads_list[IO_THREADS_MAX_NUM];
+static inline unsigned long getIOPendingCount(int i) {
+ unsigned long count = 0;
+ atomicGetWithSync(io_threads_pending[i], count);
+ return count;
+}
+
+static inline void setIOPendingCount(int i, unsigned long count) {
+ atomicSetWithSync(io_threads_pending[i], count);
+}
+
void *IOThreadMain(void *myid) {
/* The ID is the thread number (from 0 to server.iothreads_num-1), and is
* used by the thread to just manipulate a single sub-array of clients. */
@@ -2959,17 +2970,17 @@ void *IOThreadMain(void *myid) {
while(1) {
/* Wait for start */
for (int j = 0; j < 1000000; j++) {
- if (io_threads_pending[id] != 0) break;
+ if (getIOPendingCount(id) != 0) break;
}
/* Give the main thread a chance to stop this thread. */
- if (io_threads_pending[id] == 0) {
+ if (getIOPendingCount(id) == 0) {
pthread_mutex_lock(&io_threads_mutex[id]);
pthread_mutex_unlock(&io_threads_mutex[id]);
continue;
}
- serverAssert(io_threads_pending[id] != 0);
+ serverAssert(getIOPendingCount(id) != 0);
if (tio_debug) printf("[%ld] %d to handle\n", id, (int)listLength(io_threads_list[id]));
@@ -2989,7 +3000,7 @@ void *IOThreadMain(void *myid) {
}
}
listEmpty(io_threads_list[id]);
- io_threads_pending[id] = 0;
+ setIOPendingCount(id, 0);
if (tio_debug) printf("[%ld] Done\n", id);
}
@@ -3018,7 +3029,7 @@ void initThreadedIO(void) {
/* Things we do only for the additional threads. */
pthread_t tid;
pthread_mutex_init(&io_threads_mutex[i],NULL);
- io_threads_pending[i] = 0;
+ setIOPendingCount(i, 0);
pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
@@ -3124,7 +3135,7 @@ int handleClientsWithPendingWritesUsingThreads(void) {
io_threads_op = IO_THREADS_OP_WRITE;
for (int j = 1; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
- io_threads_pending[j] = count;
+ setIOPendingCount(j, count);
}
/* Also use the main thread to process a slice of clients. */
@@ -3139,7 +3150,7 @@ int handleClientsWithPendingWritesUsingThreads(void) {
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
- pending += io_threads_pending[j];
+ pending += getIOPendingCount(j);
if (pending == 0) break;
}
if (tio_debug) printf("I/O WRITE All threads finshed\n");
@@ -3214,7 +3225,7 @@ int handleClientsWithPendingReadsUsingThreads(void) {
io_threads_op = IO_THREADS_OP_READ;
for (int j = 1; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
- io_threads_pending[j] = count;
+ setIOPendingCount(j, count);
}
/* Also use the main thread to process a slice of clients. */
@@ -3229,7 +3240,7 @@ int handleClientsWithPendingReadsUsingThreads(void) {
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
- pending += io_threads_pending[j];
+ pending += getIOPendingCount(j);
if (pending == 0) break;
}
if (tio_debug) printf("I/O READ All threads finshed\n");
diff --git a/src/redis-benchmark.c b/src/redis-benchmark.c
index a221ebdd2..7a7828184 100644
--- a/src/redis-benchmark.c
+++ b/src/redis-benchmark.c
@@ -76,11 +76,11 @@ static struct config {
int hostport;
const char *hostsocket;
int numclients;
- int liveclients;
+ redisAtomic int liveclients;
int requests;
- int requests_issued;
- int requests_finished;
- int previous_requests_finished;
+ redisAtomic int requests_issued;
+ redisAtomic int requests_finished;
+ redisAtomic int previous_requests_finished;
int last_printed_bytes;
long long previous_tick;
int keysize;
@@ -113,18 +113,12 @@ static struct config {
struct redisConfig *redis_config;
struct hdr_histogram* latency_histogram;
struct hdr_histogram* current_sec_latency_histogram;
- int is_fetching_slots;
- int is_updating_slots;
- int slots_last_update;
+ redisAtomic int is_fetching_slots;
+ redisAtomic int is_updating_slots;
+ redisAtomic int slots_last_update;
int enable_tracking;
- /* Thread mutexes to be used as fallbacks by atomicvar.h */
- pthread_mutex_t requests_issued_mutex;
- pthread_mutex_t requests_finished_mutex;
pthread_mutex_t liveclients_mutex;
- pthread_mutex_t is_fetching_slots_mutex;
pthread_mutex_t is_updating_slots_mutex;
- pthread_mutex_t updating_slots_mutex;
- pthread_mutex_t slots_last_update_mutex;
} config;
typedef struct _client {
@@ -1669,13 +1663,8 @@ int main(int argc, const char **argv) {
fprintf(stderr, "WARN: could not fetch server CONFIG\n");
}
if (config.num_threads > 0) {
- pthread_mutex_init(&(config.requests_issued_mutex), NULL);
- pthread_mutex_init(&(config.requests_finished_mutex), NULL);
pthread_mutex_init(&(config.liveclients_mutex), NULL);
- pthread_mutex_init(&(config.is_fetching_slots_mutex), NULL);
pthread_mutex_init(&(config.is_updating_slots_mutex), NULL);
- pthread_mutex_init(&(config.updating_slots_mutex), NULL);
- pthread_mutex_init(&(config.slots_last_update_mutex), NULL);
}
if (config.keepalive == 0) {
diff --git a/src/replication.c b/src/replication.c
index 3f98b1062..be05254e8 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -1032,7 +1032,7 @@ void sendBulkToSlave(connection *conn) {
freeClient(slave);
return;
}
- server.stat_net_output_bytes += nwritten;
+ atomicIncr(server.stat_net_output_bytes, nwritten);
sdsrange(slave->replpreamble,nwritten,-1);
if (sdslen(slave->replpreamble) == 0) {
sdsfree(slave->replpreamble);
@@ -1061,7 +1061,7 @@ void sendBulkToSlave(connection *conn) {
return;
}
slave->repldboff += nwritten;
- server.stat_net_output_bytes += nwritten;
+ atomicIncr(server.stat_net_output_bytes, nwritten);
if (slave->repldboff == slave->repldbsize) {
close(slave->repldbfd);
slave->repldbfd = -1;
@@ -1102,7 +1102,7 @@ void rdbPipeWriteHandler(struct connection *conn) {
return;
} else {
slave->repldboff += nwritten;
- server.stat_net_output_bytes += nwritten;
+ atomicIncr(server.stat_net_output_bytes, nwritten);
if (slave->repldboff < server.rdb_pipe_bufflen)
return; /* more data to write.. */
}
@@ -1180,7 +1180,7 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData,
/* Note: when use diskless replication, 'repldboff' is the offset
* of 'rdb_pipe_buff' sent rather than the offset of entire RDB. */
slave->repldboff = nwritten;
- server.stat_net_output_bytes += nwritten;
+ atomicIncr(server.stat_net_output_bytes, nwritten);
}
/* If we were unable to write all the data to one of the replicas,
* setup write handler (and disable pipe read handler, below) */
@@ -1558,7 +1558,7 @@ void readSyncBulkPayload(connection *conn) {
cancelReplicationHandshake(1);
return;
}
- server.stat_net_input_bytes += nread;
+ atomicIncr(server.stat_net_input_bytes, nread);
/* When a mark is used, we want to detect EOF asap in order to avoid
* writing the EOF mark into the file... */
diff --git a/src/server.c b/src/server.c
index e8c51ee86..31e082510 100644
--- a/src/server.c
+++ b/src/server.c
@@ -1764,7 +1764,8 @@ void databasesCron(void) {
void updateCachedTime(int update_daylight_info) {
server.ustime = ustime();
server.mstime = server.ustime / 1000;
- server.unixtime = server.mstime / 1000;
+ time_t unixtime = server.mstime / 1000;
+ atomicSet(server.unixtime, unixtime);
/* To get information about daylight saving time, we need to call
* localtime_r and cache the result. However calling localtime_r in this
@@ -1913,11 +1914,15 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
}
run_with_period(100) {
+ long long stat_net_input_bytes, stat_net_output_bytes;
+ atomicGet(server.stat_net_input_bytes, stat_net_input_bytes);
+ atomicGet(server.stat_net_output_bytes, stat_net_output_bytes);
+
trackInstantaneousMetric(STATS_METRIC_COMMAND,server.stat_numcommands);
trackInstantaneousMetric(STATS_METRIC_NET_INPUT,
- server.stat_net_input_bytes);
+ stat_net_input_bytes);
trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT,
- server.stat_net_output_bytes);
+ stat_net_output_bytes);
}
/* We have just LRU_BITS bits per object for LRU information.
@@ -1931,7 +1936,8 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
*
* Note that you can change the resolution altering the
* LRU_CLOCK_RESOLUTION define. */
- server.lruclock = getLRUClock();
+ unsigned int lruclock = getLRUClock();
+ atomicSet(server.lruclock,lruclock);
cronUpdateMemoryStats();
@@ -2443,7 +2449,8 @@ void initServerConfig(void) {
server.next_client_id = 1; /* Client IDs, start from 1 .*/
server.loading_process_events_interval_bytes = (1024*1024*2);
- server.lruclock = getLRUClock();
+ unsigned int lruclock = getLRUClock();
+ atomicSet(server.lruclock,lruclock);
resetServerSaveParams();
appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
@@ -2846,9 +2853,9 @@ void resetServerStats(void) {
server.stat_sync_partial_ok = 0;
server.stat_sync_partial_err = 0;
server.stat_io_reads_processed = 0;
- server.stat_total_reads_processed = 0;
+ atomicSet(server.stat_total_reads_processed, 0);
server.stat_io_writes_processed = 0;
- server.stat_total_writes_processed = 0;
+ atomicSet(server.stat_total_writes_processed, 0);
for (j = 0; j < STATS_METRIC_COUNT; j++) {
server.inst_metric[j].idx = 0;
server.inst_metric[j].last_sample_time = mstime();
@@ -2856,8 +2863,8 @@ void resetServerStats(void) {
memset(server.inst_metric[j].samples,0,
sizeof(server.inst_metric[j].samples));
}
- server.stat_net_input_bytes = 0;
- server.stat_net_output_bytes = 0;
+ atomicSet(server.stat_net_input_bytes, 0);
+ atomicSet(server.stat_net_output_bytes, 0);
server.stat_unexpected_error_replies = 0;
server.aof_delayed_fsync = 0;
server.blocked_last_cron = 0;
@@ -4186,6 +4193,8 @@ sds genRedisInfoString(const char *section) {
call_uname = 0;
}
+ unsigned int lruclock;
+ atomicGet(server.lruclock,lruclock);
info = sdscatfmt(info,
"# Server\r\n"
"redis_version:%s\r\n"
@@ -4230,7 +4239,7 @@ sds genRedisInfoString(const char *section) {
(int64_t)(uptime/(3600*24)),
server.hz,
server.config_hz,
- server.lruclock,
+ lruclock,
server.executable ? server.executable : "",
server.configfile ? server.configfile : "",
server.io_threads_active);
@@ -4472,6 +4481,13 @@ sds genRedisInfoString(const char *section) {
/* Stats */
if (allsections || defsections || !strcasecmp(section,"stats")) {
+ long long stat_total_reads_processed, stat_total_writes_processed;
+ long long stat_net_input_bytes, stat_net_output_bytes;
+ atomicGet(server.stat_total_reads_processed, stat_total_reads_processed);
+ atomicGet(server.stat_total_writes_processed, stat_total_writes_processed);
+ atomicGet(server.stat_net_input_bytes, stat_net_input_bytes);
+ atomicGet(server.stat_net_output_bytes, stat_net_output_bytes);
+
if (sections++) info = sdscat(info,"\r\n");
info = sdscatprintf(info,
"# Stats\r\n"
@@ -4513,8 +4529,8 @@ sds genRedisInfoString(const char *section) {
server.stat_numconnections,
server.stat_numcommands,
getInstantaneousMetric(STATS_METRIC_COMMAND),
- server.stat_net_input_bytes,
- server.stat_net_output_bytes,
+ stat_net_input_bytes,
+ stat_net_output_bytes,
(float)getInstantaneousMetric(STATS_METRIC_NET_INPUT)/1024,
(float)getInstantaneousMetric(STATS_METRIC_NET_OUTPUT)/1024,
server.stat_rejected_conn,
@@ -4541,8 +4557,8 @@ sds genRedisInfoString(const char *section) {
(unsigned long long) trackingGetTotalItems(),
(unsigned long long) trackingGetTotalPrefixes(),
server.stat_unexpected_error_replies,
- server.stat_total_reads_processed,
- server.stat_total_writes_processed,
+ stat_total_reads_processed,
+ stat_total_writes_processed,
server.stat_io_reads_processed,
server.stat_io_writes_processed);
}
diff --git a/src/server.h b/src/server.h
index 8cba2f9b5..cac8544b9 100644
--- a/src/server.h
+++ b/src/server.h
@@ -34,6 +34,7 @@
#include "config.h"
#include "solarisfixes.h"
#include "rio.h"
+#include "atomicvar.h"
#include <stdio.h>
#include <stdlib.h>
@@ -511,8 +512,10 @@ typedef void (*moduleTypeDigestFunc)(struct RedisModuleDigest *digest, void *val
typedef size_t (*moduleTypeMemUsageFunc)(const void *value);
typedef void (*moduleTypeFreeFunc)(void *value);
-/* A callback that is called when the client authentication changes. This
- * needs to be exposed since you can't cast a function pointer to (void *) */
+/* This callback type is called by moduleNotifyUserChanged() every time
+ * a user authenticated via the module API is associated with a different
+ * user or gets disconnected. This needs to be exposed since you can't cast
+ * a function pointer to (void *). */
typedef void (*RedisModuleUserChangedFunc) (uint64_t client_id, void *privdata);
@@ -1063,7 +1066,7 @@ struct redisServer {
dict *commands; /* Command table */
dict *orig_commands; /* Command table before command renaming. */
aeEventLoop *el;
- _Atomic unsigned int lruclock; /* Clock for LRU eviction */
+ redisAtomic unsigned int lruclock; /* Clock for LRU eviction */
int shutdown_asap; /* SHUTDOWN needed ASAP */
int activerehashing; /* Incremental rehash in serverCron() */
int active_defrag_running; /* Active defragmentation running (holds current scan aggressiveness) */
@@ -1111,7 +1114,7 @@ struct redisServer {
mstime_t clients_pause_end_time; /* Time when we undo clients_paused */
char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */
dict *migrate_cached_sockets;/* MIGRATE cached sockets */
- _Atomic uint64_t next_client_id; /* Next client unique ID. Incremental. */
+ redisAtomic uint64_t next_client_id; /* Next client unique ID. Incremental. */
int protected_mode; /* Don't accept external connections. */
int gopher_enabled; /* If true the server will reply to gopher
queries. Will still serve RESP2 queries. */
@@ -1160,8 +1163,8 @@ struct redisServer {
long long slowlog_log_slower_than; /* SLOWLOG time limit (to get logged) */
unsigned long slowlog_max_len; /* SLOWLOG max number of items logged */
struct malloc_stats cron_malloc_stats; /* sampled in serverCron(). */
- _Atomic long long stat_net_input_bytes; /* Bytes read from network. */
- _Atomic long long stat_net_output_bytes; /* Bytes written to network. */
+ redisAtomic long long stat_net_input_bytes; /* Bytes read from network. */
+ redisAtomic long long stat_net_output_bytes; /* Bytes written to network. */
size_t stat_rdb_cow_bytes; /* Copy on write bytes during RDB saving. */
size_t stat_aof_cow_bytes; /* Copy on write bytes during AOF rewrite. */
size_t stat_module_cow_bytes; /* Copy on write bytes during module fork. */
@@ -1169,8 +1172,8 @@ struct redisServer {
long long stat_unexpected_error_replies; /* Number of unexpected (aof-loading, replica to master, etc.) error replies */
long long stat_io_reads_processed; /* Number of read events processed by IO / Main threads */
long long stat_io_writes_processed; /* Number of write events processed by IO / Main threads */
- _Atomic long long stat_total_reads_processed; /* Total number of read events processed */
- _Atomic long long stat_total_writes_processed; /* Total number of write events processed */
+ redisAtomic long long stat_total_reads_processed; /* Total number of read events processed */
+ redisAtomic long long stat_total_writes_processed; /* Total number of write events processed */
/* The following two are used to track instantaneous metrics, like
* number of operations per second, network traffic. */
struct {
@@ -1193,7 +1196,7 @@ struct redisServer {
int active_defrag_cycle_min; /* minimal effort for defrag in CPU percentage */
int active_defrag_cycle_max; /* maximal effort for defrag in CPU percentage */
unsigned long active_defrag_max_scan_fields; /* maximum number of fields of set/hash/zset/list to process from within the main dict scan */
- _Atomic size_t client_max_querybuf_len; /* Limit for client query buffer length */
+ size_t client_max_querybuf_len; /* Limit for client query buffer length */
int dbnum; /* Total number of configured DBs */
int supervised; /* 1 if supervised, 0 otherwise. */
int supervised_mode; /* See SUPERVISED_* */
@@ -1393,7 +1396,7 @@ struct redisServer {
int list_max_ziplist_size;
int list_compress_depth;
/* time cache */
- _Atomic time_t unixtime; /* Unix time sampled every cron cycle. */
+ redisAtomic time_t unixtime; /* Unix time sampled every cron cycle. */
time_t timezone; /* Cached timezone. As set by tzset(). */
int daylight_active; /* Currently in daylight saving time. */
mstime_t mstime; /* 'unixtime' in milliseconds. */
diff --git a/src/zmalloc.c b/src/zmalloc.c
index e36319241..cefa39aa8 100644
--- a/src/zmalloc.c
+++ b/src/zmalloc.c
@@ -74,8 +74,7 @@ void zlibc_free(void *ptr) {
#define update_zmalloc_stat_alloc(__n) atomicIncr(used_memory,(__n))
#define update_zmalloc_stat_free(__n) atomicDecr(used_memory,(__n))
-static size_t used_memory = 0;
-pthread_mutex_t used_memory_mutex = PTHREAD_MUTEX_INITIALIZER;
+static redisAtomic size_t used_memory = 0;
static void zmalloc_default_oom(size_t size) {
fprintf(stderr, "zmalloc: Out of memory trying to allocate %zu bytes\n",