summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2015-09-28 10:47:45 +0200
committerantirez <antirez@gmail.com>2015-10-01 13:02:26 +0200
commitc69c6c80fb5376a30e8cda34443a55b6326c9aa7 (patch)
treee2bf663000f90c754c9b995c3b15218ba59e9243
parentb08c36c5f2e635a128f2a306b6f38a0159ed56e6 (diff)
downloadredis-c69c6c80fb5376a30e8cda34443a55b6326c9aa7.tar.gz
Lazyfree: ability to free whole DBs in background.
-rw-r--r--src/bio.c13
-rw-r--r--src/cluster.c2
-rw-r--r--src/db.c43
-rw-r--r--src/debug.c4
-rw-r--r--src/lazyfree.c47
-rw-r--r--src/replication.c2
-rw-r--r--src/server.h9
7 files changed, 106 insertions, 14 deletions
diff --git a/src/bio.c b/src/bio.c
index fa10a503c..da11f7b86 100644
--- a/src/bio.c
+++ b/src/bio.c
@@ -85,6 +85,8 @@ struct bio_job {
void *bioProcessBackgroundJobs(void *arg);
void lazyfreeFreeObjectFromBioThread(robj *o);
+void lazyfreeFreeDatabaseFromBioThread(dict *ht1, dict *ht2);
+void lazyfreeFreeSlotsMapFromBioThread(zskiplist *sl);
/* Make sure we have enough stack to perform all the things we do in the
* main thread. */
@@ -187,7 +189,16 @@ void *bioProcessBackgroundJobs(void *arg) {
} else if (type == BIO_AOF_FSYNC) {
aof_fsync((long)job->arg1);
} else if (type == BIO_LAZY_FREE) {
- lazyfreeFreeObjectFromBioThread(job->arg1);
+ /* What we free changes depending on what arguments are set:
+ * arg1 -> free the object at pointer.
+ * arg2 & arg3 -> free two dictionaries (a Redis DB).
+ * only arg3 -> free the skiplist. */
+ if (job->arg1)
+ lazyfreeFreeObjectFromBioThread(job->arg1);
+ else if (job->arg2 && job->arg3)
+ lazyfreeFreeDatabaseFromBioThread(job->arg2,job->arg3);
+ else if (job->arg3)
+ lazyfreeFreeSlotsMapFromBioThread(job->arg3);
} else {
serverPanic("Wrong job type in bioProcessBackgroundJobs().");
}
diff --git a/src/cluster.c b/src/cluster.c
index 6fdc22175..17e86a6c0 100644
--- a/src/cluster.c
+++ b/src/cluster.c
@@ -495,7 +495,7 @@ void clusterReset(int hard) {
if (nodeIsSlave(myself)) {
clusterSetNodeAsMaster(myself);
replicationUnsetMaster();
- emptyDb(NULL);
+ emptyDb(-1,EMPTYDB_NO_FLAGS,NULL);
}
/* Close slots, reset manual failover state. */
diff --git a/src/db.c b/src/db.c
index d4b215e53..0bfa14695 100644
--- a/src/db.c
+++ b/src/db.c
@@ -29,6 +29,7 @@
#include "server.h"
#include "cluster.h"
+#include "atomicvar.h"
#include <signal.h>
#include <ctype.h>
@@ -238,16 +239,46 @@ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o) {
return o;
}
-long long emptyDb(void(callback)(void*)) {
- int j;
+/* Remove all keys from all the databases in a Redis server.
+ * If callback is given the function is called from time to time to
+ * signal that work is in progress.
+ *
+ * The dbnum can be -1 if all teh DBs should be flushed, or the specified
+ * DB number if we want to flush only a single Redis database number.
+ *
+ * Flags are be EMPTYDB_NO_FLAGS if no special flags are specified or
+ * EMPTYDB_ASYCN if we want the memory to be freed in a different thread
+ * and the function to return ASAP.
+ *
+ * On success the fuction returns the number of keys removed from the
+ * database(s). Otherwise -1 is returned in the specific case the
+ * DB number is out of range, and errno is set to EINVAL. */
+long long emptyDb(int dbnum, int flags, void(callback)(void*)) {
+ int j, async = (flags & EMPTYDB_ASYNC);
long long removed = 0;
+ if (dbnum < -1 || dbnum >= server.dbnum) {
+ errno = EINVAL;
+ return -1;
+ }
+
for (j = 0; j < server.dbnum; j++) {
+ if (dbnum != 1 && dbnum != j) continue;
removed += dictSize(server.db[j].dict);
- dictEmpty(server.db[j].dict,callback);
- dictEmpty(server.db[j].expires,callback);
+ if (async) {
+ emptyDbAsync(&server.db[j]);
+ } else {
+ dictEmpty(server.db[j].dict,callback);
+ dictEmpty(server.db[j].expires,callback);
+ }
+ }
+ if (server.cluster_enabled) {
+ if (async) {
+ slotToKeyFlushAsync();
+ } else {
+ slotToKeyFlush();
+ }
}
- if (server.cluster_enabled) slotToKeyFlush();
return removed;
}
@@ -290,7 +321,7 @@ void flushdbCommand(client *c) {
void flushallCommand(client *c) {
signalFlushedDb(-1);
- server.dirty += emptyDb(NULL);
+ server.dirty += emptyDb(-1,EMPTYDB_NO_FLAGS,NULL);
addReply(c,shared.ok);
if (server.rdb_child_pid != -1) {
kill(server.rdb_child_pid,SIGUSR1);
diff --git a/src/debug.c b/src/debug.c
index 463b7f374..48a1b5137 100644
--- a/src/debug.c
+++ b/src/debug.c
@@ -271,7 +271,7 @@ void debugCommand(client *c) {
addReply(c,shared.err);
return;
}
- emptyDb(NULL);
+ emptyDb(-1,EMPTYDB_NO_FLAGS,NULL);
if (rdbLoad(server.rdb_filename) != C_OK) {
addReplyError(c,"Error trying to load the RDB dump");
return;
@@ -279,7 +279,7 @@ void debugCommand(client *c) {
serverLog(LL_WARNING,"DB reloaded by DEBUG RELOAD");
addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"loadaof")) {
- emptyDb(NULL);
+ emptyDb(-1,EMPTYDB_NO_FLAGS,NULL);
if (loadAppendOnlyFile(server.aof_filename) != C_OK) {
addReply(c,shared.err);
return;
diff --git a/src/lazyfree.c b/src/lazyfree.c
index f94edfd0a..321bd5411 100644
--- a/src/lazyfree.c
+++ b/src/lazyfree.c
@@ -1,6 +1,7 @@
#include "server.h"
#include "bio.h"
#include "atomicvar.h"
+#include "cluster.h"
static size_t lazyfree_objects = 0;
pthread_mutex_t lazyfree_objects_mutex = PTHREAD_MUTEX_INITIALIZER;
@@ -75,9 +76,51 @@ int dbAsyncDelete(redisDb *db, robj *key) {
}
}
-/* Implementation of function to release a single object called from the
- * lazyfree thread from bio.c. */
+/* Empty a Redis DB asynchronously. What the function does actually is to
+ * create a new empty set of hash tables and scheduling the old ones for
+ * lazy freeing. */
+void emptyDbAsync(redisDb *db) {
+ dict *oldht1 = db->dict, *oldht2 = db->expires;
+ db->dict = dictCreate(&dbDictType,NULL);
+ db->expires = dictCreate(&keyptrDictType,NULL);
+ atomicIncr(lazyfree_objects,dictSize(oldht1),
+ &lazyfree_objects_mutex);
+ bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,oldht1,oldht2);
+}
+
+/* Empty the slots-keys map of Redis CLuster by creating a new empty one
+ * and scheduiling the old for lazy freeing. */
+void slotToKeyFlushAsync(void) {
+ zskiplist *oldsl = server.cluster->slots_to_keys;
+ server.cluster->slots_to_keys = zslCreate();
+ atomicIncr(lazyfree_objects,oldsl->length,
+ &lazyfree_objects_mutex);
+ bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,NULL,oldsl);
+}
+
+/* Release objects from the lazyfree thread. It's just decrRefCount()
+ * updating the count of objects to release. */
void lazyfreeFreeObjectFromBioThread(robj *o) {
decrRefCount(o);
atomicDecr(lazyfree_objects,1,&lazyfree_objects_mutex);
}
+
+/* Release a database from the lazyfree thread. The 'db' pointer is the
+ * database which was substitutied with a fresh one in the main thread
+ * when the database was logically deleted. 'sl' is a skiplist used by
+ * Redis Cluster in order to take the hash slots -> keys mapping. This
+ * may be NULL if Redis Cluster is disabled. */
+void lazyfreeFreeDatabaseFromBioThread(dict *ht1, dict *ht2) {
+ size_t numkeys = dictSize(ht1);
+ dictRelease(ht1);
+ dictRelease(ht2);
+ atomicDecr(lazyfree_objects,numkeys,&lazyfree_objects_mutex);
+}
+
+/* Release the skiplist mapping Redis Cluster keys to slots in the
+ * lazyfree thread. */
+void lazyfreeFreeSlotsMapFromBioThread(zskiplist *sl) {
+ size_t len = sl->length;
+ zslFree(sl);
+ atomicDecr(lazyfree_objects,len,&lazyfree_objects_mutex);
+}
diff --git a/src/replication.c b/src/replication.c
index 7ab646362..5a61f4d99 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -1111,7 +1111,7 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
}
serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Flushing old data");
signalFlushedDb(-1);
- emptyDb(replicationEmptyDbCallback);
+ emptyDb(-1,EMPTYDB_NO_FLAGS,replicationEmptyDbCallback);
/* Before loading the DB into memory we need to delete the readable
* handler, otherwise it will get called recursively since
* rdbLoad() will call the event loop to process events from time to
diff --git a/src/server.h b/src/server.h
index 5b5bc9887..7ba9a4325 100644
--- a/src/server.h
+++ b/src/server.h
@@ -1050,6 +1050,7 @@ extern dictType shaScriptObjectDictType;
extern double R_Zero, R_PosInf, R_NegInf, R_Nan;
extern dictType hashDictType;
extern dictType replScriptCacheDictType;
+extern dictType keyptrDictType;
/*-----------------------------------------------------------------------------
* Functions prototypes
@@ -1384,7 +1385,11 @@ robj *dbRandomKey(redisDb *db);
int dbSyncDelete(redisDb *db, robj *key);
int dbDelete(redisDb *db, robj *key);
robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o);
-long long emptyDb(void(callback)(void*));
+
+#define EMPTYDB_NO_FLAGS 0 /* No flags. */
+#define EMPTYDB_ASYNC (1<<0) /* Reclaim memory in another thread. */
+long long emptyDb(int dbnum, int flags, void(callback)(void*));
+
int selectDb(client *c, int id);
void signalModifiedKey(redisDb *db, robj *key);
void signalFlushedDb(int dbid);
@@ -1407,6 +1412,8 @@ void slotToKeyFlush(void);
#define LAZYFREE_STEP_OOM 2 /* Free a few elements at any cost if there
is something to free: we are out of memory */
int dbAsyncDelete(redisDb *db, robj *key);
+void emptyDbAsync(redisDb *db);
+void slotToKeyFlushAsync(void);
/* API to get key arguments from commands */
int *getKeysFromCommand(struct redisCommand *cmd, robj **argv, int argc, int *numkeys);