summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.github/workflows/ci.yml19
-rw-r--r--.github/workflows/daily.yml29
-rw-r--r--src/Makefile14
-rw-r--r--src/atomicvar.h109
-rw-r--r--src/evict.c2
-rw-r--r--src/lazyfree.c3
-rw-r--r--src/module.c5
-rw-r--r--src/networking.c41
-rw-r--r--src/redis-benchmark.c25
-rw-r--r--src/replication.c10
-rw-r--r--src/server.c44
-rw-r--r--src/server.h23
-rw-r--r--src/zmalloc.c3
13 files changed, 207 insertions, 120 deletions
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 4d6c1c14c..d28d331db 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -47,3 +47,22 @@ jobs:
- name: make
run: make MALLOC=libc
+ build-centos7-jemalloc:
+ runs-on: ubuntu-latest
+ container: centos:7
+ steps:
+ - uses: actions/checkout@v2
+ - name: make
+ run: |
+ yum -y install gcc make
+ make
+
+ build-centos6-jemalloc:
+ runs-on: ubuntu-latest
+ container: centos:6
+ steps:
+ - uses: actions/checkout@v1
+ - name: make
+ run: |
+ yum -y install gcc make
+ make
diff --git a/.github/workflows/daily.yml b/.github/workflows/daily.yml
index 087b9f2ef..1ccd99805 100644
--- a/.github/workflows/daily.yml
+++ b/.github/workflows/daily.yml
@@ -124,12 +124,33 @@ jobs:
- uses: actions/checkout@v2
- name: make
run: |
- yum -y install centos-release-scl
- yum -y install devtoolset-7
- scl enable devtoolset-7 "make"
+ yum -y install gcc make
+ make
- name: test
run: |
- yum -y install tcl
+ yum -y install which tcl
+ ./runtest --accurate --verbose
+ - name: module api test
+ run: ./runtest-moduleapi --verbose
+ - name: sentinel tests
+ run: ./runtest-sentinel
+ - name: cluster tests
+ run: ./runtest-cluster
+
+ test-centos6-jemalloc:
+ runs-on: ubuntu-latest
+ if: github.repository == 'redis/redis'
+ container: centos:6
+ timeout-minutes: 14400
+ steps:
+ - uses: actions/checkout@v1
+ - name: make
+ run: |
+ yum -y install gcc make
+ make
+ - name: test
+ run: |
+ yum -y install which tcl util-linux-ng
./runtest --accurate --verbose
- name: module api test
run: ./runtest-moduleapi --verbose
diff --git a/src/Makefile b/src/Makefile
index 9ff344c6a..6414b8c78 100644
--- a/src/Makefile
+++ b/src/Makefile
@@ -20,7 +20,7 @@ DEPENDENCY_TARGETS=hiredis linenoise lua hdr_histogram
NODEPS:=clean distclean
# Default settings
-STD=-std=c11 -pedantic -DREDIS_STATIC=''
+STD=-pedantic -DREDIS_STATIC=''
ifneq (,$(findstring clang,$(CC)))
ifneq (,$(findstring FreeBSD,$(uname_S)))
STD+=-Wno-c11-extensions
@@ -29,6 +29,16 @@ endif
WARN=-Wall -W -Wno-missing-field-initializers
OPT=$(OPTIMIZATION)
+# Detect if the compiler supports C11 _Atomic
+C11_ATOMIC := $(shell sh -c 'echo "\#include <stdatomic.h>" > foo.c; \
+ $(CC) -std=c11 -c foo.c -o foo.o &> /dev/null; \
+ if [ -f foo.o ]; then echo "yes"; rm foo.o; fi; rm foo.c')
+ifeq ($(C11_ATOMIC),yes)
+ STD+=-std=c11
+else
+ STD+=-std=c99
+endif
+
PREFIX?=/usr/local
INSTALL_BIN=$(PREFIX)/bin
INSTALL=install
@@ -367,7 +377,7 @@ valgrind:
$(MAKE) OPTIMIZATION="-O0" MALLOC="libc"
helgrind:
- $(MAKE) OPTIMIZATION="-O0" MALLOC="libc" CFLAGS="-D__ATOMIC_VAR_FORCE_SYNC_MACROS"
+ $(MAKE) OPTIMIZATION="-O0" MALLOC="libc" CFLAGS="-D__ATOMIC_VAR_FORCE_SYNC_MACROS" REDIS_CFLAGS="-I/usr/local/include" REDIS_LDFLAGS="-L/usr/local/lib"
src/help.h:
@../utils/generate-command-help.rb > help.h
diff --git a/src/atomicvar.h b/src/atomicvar.h
index ecd26ad70..6ac04c605 100644
--- a/src/atomicvar.h
+++ b/src/atomicvar.h
@@ -1,5 +1,5 @@
-/* This file implements atomic counters using __atomic or __sync macros if
- * available, otherwise synchronizing different threads using a mutex.
+/* This file implements atomic counters using c11 _Atomic, __atomic or __sync
+ * macros if available, otherwise we will throw an error when compile.
*
* The exported interface is composed of three macros:
*
@@ -8,16 +8,8 @@
* atomicDecr(var,count) -- Decrement the atomic counter
* atomicGet(var,dstvar) -- Fetch the atomic counter value
* atomicSet(var,value) -- Set the atomic counter value
- *
- * The variable 'var' should also have a declared mutex with the same
- * name and the "_mutex" postfix, for instance:
- *
- * long myvar;
- * pthread_mutex_t myvar_mutex;
- * atomicSet(myvar,12345);
- *
- * If atomic primitives are available (tested in config.h) the mutex
- * is not used.
+ * atomicGetWithSync(var,value) -- 'atomicGet' with inter-thread synchronization
+ * atomicSetWithSync(var,value) -- 'atomicSet' with inter-thread synchronization
*
* Never use return value from the macros, instead use the AtomicGetIncr()
* if you need to get the current value and increment it atomically, like
@@ -58,17 +50,64 @@
*/
#include <pthread.h>
+#include "config.h"
#ifndef __ATOMIC_VAR_H
#define __ATOMIC_VAR_H
+/* Define redisAtomic for atomic variable. */
+#define redisAtomic
+
/* To test Redis with Helgrind (a Valgrind tool) it is useful to define
* the following macro, so that __sync macros are used: those can be detected
* by Helgrind (even if they are less efficient) so that no false positive
* is reported. */
// #define __ATOMIC_VAR_FORCE_SYNC_MACROS
-#if !defined(__ATOMIC_VAR_FORCE_SYNC_MACROS) && defined(__ATOMIC_RELAXED) && !defined(__sun) && (!defined(__clang__) || !defined(__APPLE__) || __apple_build_version__ > 4210057)
+/* There will be many false positives if we test Redis with Helgrind, since
+ * Helgrind can't understand we have imposed ordering on the program, so
+ * we use macros in helgrind.h to tell Helgrind inter-thread happens-before
+ * relationship explicitly for avoiding false positives.
+ *
+ * For more details, please see: valgrind/helgrind.h and
+ * https://www.valgrind.org/docs/manual/hg-manual.html#hg-manual.effective-use
+ *
+ * These macros take effect only when 'make helgrind', and you must first
+ * install Valgrind in the default path configuration. */
+#ifdef __ATOMIC_VAR_FORCE_SYNC_MACROS
+#include <valgrind/helgrind.h>
+#else
+#define ANNOTATE_HAPPENS_BEFORE(v) ((void) v)
+#define ANNOTATE_HAPPENS_AFTER(v) ((void) v)
+#endif
+
+#if !defined(__ATOMIC_VAR_FORCE_SYNC_MACROS) && defined(__STDC_VERSION__) && \
+ (__STDC_VERSION__ >= 201112L) && !defined(__STDC_NO_ATOMICS__)
+/* Use '_Atomic' keyword if the compiler supports. */
+#undef redisAtomic
+#define redisAtomic _Atomic
+/* Implementation using _Atomic in C11. */
+
+#include <stdatomic.h>
+#define atomicIncr(var,count) atomic_fetch_add_explicit(&var,(count),memory_order_relaxed)
+#define atomicGetIncr(var,oldvalue_var,count) do { \
+ oldvalue_var = atomic_fetch_add_explicit(&var,(count),memory_order_relaxed); \
+} while(0)
+#define atomicDecr(var,count) atomic_fetch_sub_explicit(&var,(count),memory_order_relaxed)
+#define atomicGet(var,dstvar) do { \
+ dstvar = atomic_load_explicit(&var,memory_order_relaxed); \
+} while(0)
+#define atomicSet(var,value) atomic_store_explicit(&var,value,memory_order_relaxed)
+#define atomicGetWithSync(var,dstvar) do { \
+ dstvar = atomic_load_explicit(&var,memory_order_seq_cst); \
+} while(0)
+#define atomicSetWithSync(var,value) \
+ atomic_store_explicit(&var,value,memory_order_seq_cst)
+#define REDIS_ATOMIC_API "c11-builtin"
+
+#elif !defined(__ATOMIC_VAR_FORCE_SYNC_MACROS) && !defined(__sun) && \
+ (!defined(__clang__) || !defined(__APPLE__) || __apple_build_version__ > 4210057) && \
+ defined(__ATOMIC_RELAXED) && defined(__ATOMIC_SEQ_CST)
/* Implementation using __atomic macros. */
#define atomicIncr(var,count) __atomic_add_fetch(&var,(count),__ATOMIC_RELAXED)
@@ -80,6 +119,11 @@
dstvar = __atomic_load_n(&var,__ATOMIC_RELAXED); \
} while(0)
#define atomicSet(var,value) __atomic_store_n(&var,value,__ATOMIC_RELAXED)
+#define atomicGetWithSync(var,dstvar) do { \
+ dstvar = __atomic_load_n(&var,__ATOMIC_SEQ_CST); \
+} while(0)
+#define atomicSetWithSync(var,value) \
+ __atomic_store_n(&var,value,__ATOMIC_SEQ_CST)
#define REDIS_ATOMIC_API "atomic-builtin"
#elif defined(HAVE_ATOMIC)
@@ -96,38 +140,19 @@
#define atomicSet(var,value) do { \
while(!__sync_bool_compare_and_swap(&var,var,value)); \
} while(0)
+/* Actually the builtin issues a full memory barrier by default. */
+#define atomicGetWithSync(var,dstvar) { \
+ dstvar = __sync_sub_and_fetch(&var,0,__sync_synchronize); \
+ ANNOTATE_HAPPENS_AFTER(&var); \
+} while(0)
+#define atomicSetWithSync(var,value) do { \
+ ANNOTATE_HAPPENS_BEFORE(&var); \
+ while(!__sync_bool_compare_and_swap(&var,var,value,__sync_synchronize)); \
+} while(0)
#define REDIS_ATOMIC_API "sync-builtin"
#else
-/* Implementation using pthread mutex. */
-
-#define atomicIncr(var,count) do { \
- pthread_mutex_lock(&var ## _mutex); \
- var += (count); \
- pthread_mutex_unlock(&var ## _mutex); \
-} while(0)
-#define atomicGetIncr(var,oldvalue_var,count) do { \
- pthread_mutex_lock(&var ## _mutex); \
- oldvalue_var = var; \
- var += (count); \
- pthread_mutex_unlock(&var ## _mutex); \
-} while(0)
-#define atomicDecr(var,count) do { \
- pthread_mutex_lock(&var ## _mutex); \
- var -= (count); \
- pthread_mutex_unlock(&var ## _mutex); \
-} while(0)
-#define atomicGet(var,dstvar) do { \
- pthread_mutex_lock(&var ## _mutex); \
- dstvar = var; \
- pthread_mutex_unlock(&var ## _mutex); \
-} while(0)
-#define atomicSet(var,value) do { \
- pthread_mutex_lock(&var ## _mutex); \
- var = value; \
- pthread_mutex_unlock(&var ## _mutex); \
-} while(0)
-#define REDIS_ATOMIC_API "pthread-mutex"
+#error "Unable to determine atomic operations for your platform"
#endif
#endif /* __ATOMIC_VAR_H */
diff --git a/src/evict.c b/src/evict.c
index 258e1a25e..0e8f818ff 100644
--- a/src/evict.c
+++ b/src/evict.c
@@ -79,7 +79,7 @@ unsigned int getLRUClock(void) {
unsigned int LRU_CLOCK(void) {
unsigned int lruclock;
if (1000/server.hz <= LRU_CLOCK_RESOLUTION) {
- lruclock = server.lruclock;
+ atomicGet(server.lruclock,lruclock);
} else {
lruclock = getLRUClock();
}
diff --git a/src/lazyfree.c b/src/lazyfree.c
index 821dc50df..b0fc26fcf 100644
--- a/src/lazyfree.c
+++ b/src/lazyfree.c
@@ -3,8 +3,7 @@
#include "atomicvar.h"
#include "cluster.h"
-static size_t lazyfree_objects = 0;
-pthread_mutex_t lazyfree_objects_mutex = PTHREAD_MUTEX_INITIALIZER;
+static redisAtomic size_t lazyfree_objects = 0;
/* Return the number of currently pending objects to free. */
size_t lazyfreeGetPendingObjectsCount(void) {
diff --git a/src/module.c b/src/module.c
index 315161ed9..a501a4830 100644
--- a/src/module.c
+++ b/src/module.c
@@ -357,11 +357,6 @@ unsigned long long ModulesInHooks = 0; /* Total number of modules in hooks
/* Data structures related to the redis module users */
-/* This callback type is called by moduleNotifyUserChanged() every time
- * a user authenticated via the module API is associated with a different
- * user or gets disconnected. */
-typedef void (*RedisModuleUserChangedFunc) (uint64_t client_id, void *privdata);
-
/* This is the object returned by RM_CreateModuleUser(). The module API is
* able to create users, set ACLs to such users, and later authenticate
* clients using such newly created users. */
diff --git a/src/networking.c b/src/networking.c
index 6becdc288..3fa298783 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -103,7 +103,8 @@ client *createClient(connection *conn) {
}
selectDb(c,0);
- uint64_t client_id = ++server.next_client_id;
+ uint64_t client_id;
+ atomicGetIncr(server.next_client_id, client_id, 1);
c->id = client_id;
c->resp = 2;
c->conn = conn;
@@ -1314,7 +1315,7 @@ client *lookupClientByID(uint64_t id) {
* thread safe. */
int writeToClient(client *c, int handler_installed) {
/* Update total number of writes on server */
- server.stat_total_writes_processed++;
+ atomicIncr(server.stat_total_writes_processed, 1);
ssize_t nwritten = 0, totwritten = 0;
size_t objlen;
@@ -1376,7 +1377,7 @@ int writeToClient(client *c, int handler_installed) {
zmalloc_used_memory() < server.maxmemory) &&
!(c->flags & CLIENT_SLAVE)) break;
}
- server.stat_net_output_bytes += totwritten;
+ atomicIncr(server.stat_net_output_bytes, totwritten);
if (nwritten == -1) {
if (connGetState(c->conn) == CONN_STATE_CONNECTED) {
nwritten = 0;
@@ -1933,7 +1934,7 @@ void readQueryFromClient(connection *conn) {
if (postponeClientRead(c)) return;
/* Update total number of reads on server */
- server.stat_total_reads_processed++;
+ atomicIncr(server.stat_total_reads_processed, 1);
readlen = PROTO_IOBUF_LEN;
/* If this is a multi bulk request, and we are processing a bulk reply
@@ -1979,7 +1980,7 @@ void readQueryFromClient(connection *conn) {
sdsIncrLen(c->querybuf,nread);
c->lastinteraction = server.unixtime;
if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
- server.stat_net_input_bytes += nread;
+ atomicIncr(server.stat_net_input_bytes, nread);
if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
@@ -2938,7 +2939,7 @@ int tio_debug = 0;
pthread_t io_threads[IO_THREADS_MAX_NUM];
pthread_mutex_t io_threads_mutex[IO_THREADS_MAX_NUM];
-_Atomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM];
+redisAtomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM];
int io_threads_op; /* IO_THREADS_OP_WRITE or IO_THREADS_OP_READ. */
/* This is the list of clients each thread will serve when threaded I/O is
@@ -2946,6 +2947,16 @@ int io_threads_op; /* IO_THREADS_OP_WRITE or IO_THREADS_OP_READ. */
* itself. */
list *io_threads_list[IO_THREADS_MAX_NUM];
+static inline unsigned long getIOPendingCount(int i) {
+ unsigned long count = 0;
+ atomicGetWithSync(io_threads_pending[i], count);
+ return count;
+}
+
+static inline void setIOPendingCount(int i, unsigned long count) {
+ atomicSetWithSync(io_threads_pending[i], count);
+}
+
void *IOThreadMain(void *myid) {
/* The ID is the thread number (from 0 to server.iothreads_num-1), and is
* used by the thread to just manipulate a single sub-array of clients. */
@@ -2959,17 +2970,17 @@ void *IOThreadMain(void *myid) {
while(1) {
/* Wait for start */
for (int j = 0; j < 1000000; j++) {
- if (io_threads_pending[id] != 0) break;
+ if (getIOPendingCount(id) != 0) break;
}
/* Give the main thread a chance to stop this thread. */
- if (io_threads_pending[id] == 0) {
+ if (getIOPendingCount(id) == 0) {
pthread_mutex_lock(&io_threads_mutex[id]);
pthread_mutex_unlock(&io_threads_mutex[id]);
continue;
}
- serverAssert(io_threads_pending[id] != 0);
+ serverAssert(getIOPendingCount(id) != 0);
if (tio_debug) printf("[%ld] %d to handle\n", id, (int)listLength(io_threads_list[id]));
@@ -2989,7 +3000,7 @@ void *IOThreadMain(void *myid) {
}
}
listEmpty(io_threads_list[id]);
- io_threads_pending[id] = 0;
+ setIOPendingCount(id, 0);
if (tio_debug) printf("[%ld] Done\n", id);
}
@@ -3018,7 +3029,7 @@ void initThreadedIO(void) {
/* Things we do only for the additional threads. */
pthread_t tid;
pthread_mutex_init(&io_threads_mutex[i],NULL);
- io_threads_pending[i] = 0;
+ setIOPendingCount(i, 0);
pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
@@ -3124,7 +3135,7 @@ int handleClientsWithPendingWritesUsingThreads(void) {
io_threads_op = IO_THREADS_OP_WRITE;
for (int j = 1; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
- io_threads_pending[j] = count;
+ setIOPendingCount(j, count);
}
/* Also use the main thread to process a slice of clients. */
@@ -3139,7 +3150,7 @@ int handleClientsWithPendingWritesUsingThreads(void) {
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
- pending += io_threads_pending[j];
+ pending += getIOPendingCount(j);
if (pending == 0) break;
}
if (tio_debug) printf("I/O WRITE All threads finshed\n");
@@ -3214,7 +3225,7 @@ int handleClientsWithPendingReadsUsingThreads(void) {
io_threads_op = IO_THREADS_OP_READ;
for (int j = 1; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
- io_threads_pending[j] = count;
+ setIOPendingCount(j, count);
}
/* Also use the main thread to process a slice of clients. */
@@ -3229,7 +3240,7 @@ int handleClientsWithPendingReadsUsingThreads(void) {
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
- pending += io_threads_pending[j];
+ pending += getIOPendingCount(j);
if (pending == 0) break;
}
if (tio_debug) printf("I/O READ All threads finshed\n");
diff --git a/src/redis-benchmark.c b/src/redis-benchmark.c
index a221ebdd2..7a7828184 100644
--- a/src/redis-benchmark.c
+++ b/src/redis-benchmark.c
@@ -76,11 +76,11 @@ static struct config {
int hostport;
const char *hostsocket;
int numclients;
- int liveclients;
+ redisAtomic int liveclients;
int requests;
- int requests_issued;
- int requests_finished;
- int previous_requests_finished;
+ redisAtomic int requests_issued;
+ redisAtomic int requests_finished;
+ redisAtomic int previous_requests_finished;
int last_printed_bytes;
long long previous_tick;
int keysize;
@@ -113,18 +113,12 @@ static struct config {
struct redisConfig *redis_config;
struct hdr_histogram* latency_histogram;
struct hdr_histogram* current_sec_latency_histogram;
- int is_fetching_slots;
- int is_updating_slots;
- int slots_last_update;
+ redisAtomic int is_fetching_slots;
+ redisAtomic int is_updating_slots;
+ redisAtomic int slots_last_update;
int enable_tracking;
- /* Thread mutexes to be used as fallbacks by atomicvar.h */
- pthread_mutex_t requests_issued_mutex;
- pthread_mutex_t requests_finished_mutex;
pthread_mutex_t liveclients_mutex;
- pthread_mutex_t is_fetching_slots_mutex;
pthread_mutex_t is_updating_slots_mutex;
- pthread_mutex_t updating_slots_mutex;
- pthread_mutex_t slots_last_update_mutex;
} config;
typedef struct _client {
@@ -1669,13 +1663,8 @@ int main(int argc, const char **argv) {
fprintf(stderr, "WARN: could not fetch server CONFIG\n");
}
if (config.num_threads > 0) {
- pthread_mutex_init(&(config.requests_issued_mutex), NULL);
- pthread_mutex_init(&(config.requests_finished_mutex), NULL);
pthread_mutex_init(&(config.liveclients_mutex), NULL);
- pthread_mutex_init(&(config.is_fetching_slots_mutex), NULL);
pthread_mutex_init(&(config.is_updating_slots_mutex), NULL);
- pthread_mutex_init(&(config.updating_slots_mutex), NULL);
- pthread_mutex_init(&(config.slots_last_update_mutex), NULL);
}
if (config.keepalive == 0) {
diff --git a/src/replication.c b/src/replication.c
index 3f98b1062..be05254e8 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -1032,7 +1032,7 @@ void sendBulkToSlave(connection *conn) {
freeClient(slave);
return;
}
- server.stat_net_output_bytes += nwritten;
+ atomicIncr(server.stat_net_output_bytes, nwritten);
sdsrange(slave->replpreamble,nwritten,-1);
if (sdslen(slave->replpreamble) == 0) {
sdsfree(slave->replpreamble);
@@ -1061,7 +1061,7 @@ void sendBulkToSlave(connection *conn) {
return;
}
slave->repldboff += nwritten;
- server.stat_net_output_bytes += nwritten;
+ atomicIncr(server.stat_net_output_bytes, nwritten);
if (slave->repldboff == slave->repldbsize) {
close(slave->repldbfd);
slave->repldbfd = -1;
@@ -1102,7 +1102,7 @@ void rdbPipeWriteHandler(struct connection *conn) {
return;
} else {
slave->repldboff += nwritten;
- server.stat_net_output_bytes += nwritten;
+ atomicIncr(server.stat_net_output_bytes, nwritten);
if (slave->repldboff < server.rdb_pipe_bufflen)
return; /* more data to write.. */
}
@@ -1180,7 +1180,7 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData,
/* Note: when use diskless replication, 'repldboff' is the offset
* of 'rdb_pipe_buff' sent rather than the offset of entire RDB. */
slave->repldboff = nwritten;
- server.stat_net_output_bytes += nwritten;
+ atomicIncr(server.stat_net_output_bytes, nwritten);
}
/* If we were unable to write all the data to one of the replicas,
* setup write handler (and disable pipe read handler, below) */
@@ -1558,7 +1558,7 @@ void readSyncBulkPayload(connection *conn) {
cancelReplicationHandshake(1);
return;
}
- server.stat_net_input_bytes += nread;
+ atomicIncr(server.stat_net_input_bytes, nread);
/* When a mark is used, we want to detect EOF asap in order to avoid
* writing the EOF mark into the file... */
diff --git a/src/server.c b/src/server.c
index e8c51ee86..31e082510 100644
--- a/src/server.c
+++ b/src/server.c
@@ -1764,7 +1764,8 @@ void databasesCron(void) {
void updateCachedTime(int update_daylight_info) {
server.ustime = ustime();
server.mstime = server.ustime / 1000;
- server.unixtime = server.mstime / 1000;
+ time_t unixtime = server.mstime / 1000;
+ atomicSet(server.unixtime, unixtime);
/* To get information about daylight saving time, we need to call
* localtime_r and cache the result. However calling localtime_r in this
@@ -1913,11 +1914,15 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
}
run_with_period(100) {
+ long long stat_net_input_bytes, stat_net_output_bytes;
+ atomicGet(server.stat_net_input_bytes, stat_net_input_bytes);
+ atomicGet(server.stat_net_output_bytes, stat_net_output_bytes);
+
trackInstantaneousMetric(STATS_METRIC_COMMAND,server.stat_numcommands);
trackInstantaneousMetric(STATS_METRIC_NET_INPUT,
- server.stat_net_input_bytes);
+ stat_net_input_bytes);
trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT,
- server.stat_net_output_bytes);
+ stat_net_output_bytes);
}
/* We have just LRU_BITS bits per object for LRU information.
@@ -1931,7 +1936,8 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
*
* Note that you can change the resolution altering the
* LRU_CLOCK_RESOLUTION define. */
- server.lruclock = getLRUClock();
+ unsigned int lruclock = getLRUClock();
+ atomicSet(server.lruclock,lruclock);
cronUpdateMemoryStats();
@@ -2443,7 +2449,8 @@ void initServerConfig(void) {
server.next_client_id = 1; /* Client IDs, start from 1 .*/
server.loading_process_events_interval_bytes = (1024*1024*2);
- server.lruclock = getLRUClock();
+ unsigned int lruclock = getLRUClock();
+ atomicSet(server.lruclock,lruclock);
resetServerSaveParams();
appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
@@ -2846,9 +2853,9 @@ void resetServerStats(void) {
server.stat_sync_partial_ok = 0;
server.stat_sync_partial_err = 0;
server.stat_io_reads_processed = 0;
- server.stat_total_reads_processed = 0;
+ atomicSet(server.stat_total_reads_processed, 0);
server.stat_io_writes_processed = 0;
- server.stat_total_writes_processed = 0;
+ atomicSet(server.stat_total_writes_processed, 0);
for (j = 0; j < STATS_METRIC_COUNT; j++) {
server.inst_metric[j].idx = 0;
server.inst_metric[j].last_sample_time = mstime();
@@ -2856,8 +2863,8 @@ void resetServerStats(void) {
memset(server.inst_metric[j].samples,0,
sizeof(server.inst_metric[j].samples));
}
- server.stat_net_input_bytes = 0;
- server.stat_net_output_bytes = 0;
+ atomicSet(server.stat_net_input_bytes, 0);
+ atomicSet(server.stat_net_output_bytes, 0);
server.stat_unexpected_error_replies = 0;
server.aof_delayed_fsync = 0;
server.blocked_last_cron = 0;
@@ -4186,6 +4193,8 @@ sds genRedisInfoString(const char *section) {
call_uname = 0;
}
+ unsigned int lruclock;
+ atomicGet(server.lruclock,lruclock);
info = sdscatfmt(info,
"# Server\r\n"
"redis_version:%s\r\n"
@@ -4230,7 +4239,7 @@ sds genRedisInfoString(const char *section) {
(int64_t)(uptime/(3600*24)),
server.hz,
server.config_hz,
- server.lruclock,
+ lruclock,
server.executable ? server.executable : "",
server.configfile ? server.configfile : "",
server.io_threads_active);
@@ -4472,6 +4481,13 @@ sds genRedisInfoString(const char *section) {
/* Stats */
if (allsections || defsections || !strcasecmp(section,"stats")) {
+ long long stat_total_reads_processed, stat_total_writes_processed;
+ long long stat_net_input_bytes, stat_net_output_bytes;
+ atomicGet(server.stat_total_reads_processed, stat_total_reads_processed);
+ atomicGet(server.stat_total_writes_processed, stat_total_writes_processed);
+ atomicGet(server.stat_net_input_bytes, stat_net_input_bytes);
+ atomicGet(server.stat_net_output_bytes, stat_net_output_bytes);
+
if (sections++) info = sdscat(info,"\r\n");
info = sdscatprintf(info,
"# Stats\r\n"
@@ -4513,8 +4529,8 @@ sds genRedisInfoString(const char *section) {
server.stat_numconnections,
server.stat_numcommands,
getInstantaneousMetric(STATS_METRIC_COMMAND),
- server.stat_net_input_bytes,
- server.stat_net_output_bytes,
+ stat_net_input_bytes,
+ stat_net_output_bytes,
(float)getInstantaneousMetric(STATS_METRIC_NET_INPUT)/1024,
(float)getInstantaneousMetric(STATS_METRIC_NET_OUTPUT)/1024,
server.stat_rejected_conn,
@@ -4541,8 +4557,8 @@ sds genRedisInfoString(const char *section) {
(unsigned long long) trackingGetTotalItems(),
(unsigned long long) trackingGetTotalPrefixes(),
server.stat_unexpected_error_replies,
- server.stat_total_reads_processed,
- server.stat_total_writes_processed,
+ stat_total_reads_processed,
+ stat_total_writes_processed,
server.stat_io_reads_processed,
server.stat_io_writes_processed);
}
diff --git a/src/server.h b/src/server.h
index 8cba2f9b5..cac8544b9 100644
--- a/src/server.h
+++ b/src/server.h
@@ -34,6 +34,7 @@
#include "config.h"
#include "solarisfixes.h"
#include "rio.h"
+#include "atomicvar.h"
#include <stdio.h>
#include <stdlib.h>
@@ -511,8 +512,10 @@ typedef void (*moduleTypeDigestFunc)(struct RedisModuleDigest *digest, void *val
typedef size_t (*moduleTypeMemUsageFunc)(const void *value);
typedef void (*moduleTypeFreeFunc)(void *value);
-/* A callback that is called when the client authentication changes. This
- * needs to be exposed since you can't cast a function pointer to (void *) */
+/* This callback type is called by moduleNotifyUserChanged() every time
+ * a user authenticated via the module API is associated with a different
+ * user or gets disconnected. This needs to be exposed since you can't cast
+ * a function pointer to (void *). */
typedef void (*RedisModuleUserChangedFunc) (uint64_t client_id, void *privdata);
@@ -1063,7 +1066,7 @@ struct redisServer {
dict *commands; /* Command table */
dict *orig_commands; /* Command table before command renaming. */
aeEventLoop *el;
- _Atomic unsigned int lruclock; /* Clock for LRU eviction */
+ redisAtomic unsigned int lruclock; /* Clock for LRU eviction */
int shutdown_asap; /* SHUTDOWN needed ASAP */
int activerehashing; /* Incremental rehash in serverCron() */
int active_defrag_running; /* Active defragmentation running (holds current scan aggressiveness) */
@@ -1111,7 +1114,7 @@ struct redisServer {
mstime_t clients_pause_end_time; /* Time when we undo clients_paused */
char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */
dict *migrate_cached_sockets;/* MIGRATE cached sockets */
- _Atomic uint64_t next_client_id; /* Next client unique ID. Incremental. */
+ redisAtomic uint64_t next_client_id; /* Next client unique ID. Incremental. */
int protected_mode; /* Don't accept external connections. */
int gopher_enabled; /* If true the server will reply to gopher
queries. Will still serve RESP2 queries. */
@@ -1160,8 +1163,8 @@ struct redisServer {
long long slowlog_log_slower_than; /* SLOWLOG time limit (to get logged) */
unsigned long slowlog_max_len; /* SLOWLOG max number of items logged */
struct malloc_stats cron_malloc_stats; /* sampled in serverCron(). */
- _Atomic long long stat_net_input_bytes; /* Bytes read from network. */
- _Atomic long long stat_net_output_bytes; /* Bytes written to network. */
+ redisAtomic long long stat_net_input_bytes; /* Bytes read from network. */
+ redisAtomic long long stat_net_output_bytes; /* Bytes written to network. */
size_t stat_rdb_cow_bytes; /* Copy on write bytes during RDB saving. */
size_t stat_aof_cow_bytes; /* Copy on write bytes during AOF rewrite. */
size_t stat_module_cow_bytes; /* Copy on write bytes during module fork. */
@@ -1169,8 +1172,8 @@ struct redisServer {
long long stat_unexpected_error_replies; /* Number of unexpected (aof-loading, replica to master, etc.) error replies */
long long stat_io_reads_processed; /* Number of read events processed by IO / Main threads */
long long stat_io_writes_processed; /* Number of write events processed by IO / Main threads */
- _Atomic long long stat_total_reads_processed; /* Total number of read events processed */
- _Atomic long long stat_total_writes_processed; /* Total number of write events processed */
+ redisAtomic long long stat_total_reads_processed; /* Total number of read events processed */
+ redisAtomic long long stat_total_writes_processed; /* Total number of write events processed */
/* The following two are used to track instantaneous metrics, like
* number of operations per second, network traffic. */
struct {
@@ -1193,7 +1196,7 @@ struct redisServer {
int active_defrag_cycle_min; /* minimal effort for defrag in CPU percentage */
int active_defrag_cycle_max; /* maximal effort for defrag in CPU percentage */
unsigned long active_defrag_max_scan_fields; /* maximum number of fields of set/hash/zset/list to process from within the main dict scan */
- _Atomic size_t client_max_querybuf_len; /* Limit for client query buffer length */
+ size_t client_max_querybuf_len; /* Limit for client query buffer length */
int dbnum; /* Total number of configured DBs */
int supervised; /* 1 if supervised, 0 otherwise. */
int supervised_mode; /* See SUPERVISED_* */
@@ -1393,7 +1396,7 @@ struct redisServer {
int list_max_ziplist_size;
int list_compress_depth;
/* time cache */
- _Atomic time_t unixtime; /* Unix time sampled every cron cycle. */
+ redisAtomic time_t unixtime; /* Unix time sampled every cron cycle. */
time_t timezone; /* Cached timezone. As set by tzset(). */
int daylight_active; /* Currently in daylight saving time. */
mstime_t mstime; /* 'unixtime' in milliseconds. */
diff --git a/src/zmalloc.c b/src/zmalloc.c
index e36319241..cefa39aa8 100644
--- a/src/zmalloc.c
+++ b/src/zmalloc.c
@@ -74,8 +74,7 @@ void zlibc_free(void *ptr) {
#define update_zmalloc_stat_alloc(__n) atomicIncr(used_memory,(__n))
#define update_zmalloc_stat_free(__n) atomicDecr(used_memory,(__n))
-static size_t used_memory = 0;
-pthread_mutex_t used_memory_mutex = PTHREAD_MUTEX_INITIALIZER;
+static redisAtomic size_t used_memory = 0;
static void zmalloc_default_oom(size_t size) {
fprintf(stderr, "zmalloc: Out of memory trying to allocate %zu bytes\n",