From a69a0c9c3bc199a627a794e07f3449290ddb0dc8 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 11 Jan 2010 05:15:54 -0500 Subject: More threaded I/O VM work + Redis init script --- Makefile | 8 +++---- redis.c | 62 +++++++++++++++++++++++++++++++++++++------------ staticsymbols.h | 15 +++++++++++- utils/redis_init_script | 36 ++++++++++++++++++++++++++++ zmalloc.c | 2 ++ 5 files changed, 102 insertions(+), 21 deletions(-) create mode 100755 utils/redis_init_script diff --git a/Makefile b/Makefile index 50b328874..5caf21b8e 100644 --- a/Makefile +++ b/Makefile @@ -5,12 +5,10 @@ uname_S := $(shell sh -c 'uname -s 2>/dev/null || echo not') ifeq ($(uname_S),SunOS) CFLAGS?= -std=c99 -pedantic -O2 -Wall -W -D__EXTENSIONS__ -D_XPG6 - CCLINK?= -ldl -lnsl -lsocket -lm - PTLINK?= -lpthread + CCLINK?= -ldl -lnsl -lsocket -lm -lpthread else CFLAGS?= -std=c99 -pedantic -O2 -Wall -W $(ARCH) $(PROF) - CCLINK?= -lm - PTLINK?= -lpthread + CCLINK?= -lm -pthread endif CCOPT= $(CFLAGS) $(CCLINK) $(ARCH) $(PROF) DEBUG?= -g -rdynamic -ggdb @@ -42,7 +40,7 @@ sds.o: sds.c sds.h zmalloc.h zmalloc.o: zmalloc.c config.h redis-server: $(OBJ) - $(CC) -o $(PRGNAME) $(CCOPT) $(PTLINK) $(DEBUG) $(OBJ) + $(CC) -o $(PRGNAME) $(CCOPT) $(DEBUG) $(OBJ) @echo "" @echo "Hint: To run the test-redis.tcl script is a good idea." @echo "Launch the redis server with ./redis-server, then in another" diff --git a/redis.c b/redis.c index 36bee82cf..6d9e95d57 100644 --- a/redis.c +++ b/redis.c @@ -536,7 +536,8 @@ static void vmInit(void); static void vmMarkPagesFree(off_t page, off_t count); static robj *vmLoadObject(robj *key); static robj *vmPreviewObject(robj *key); -static int vmSwapOneObject(void); +static int vmSwapOneObjectBlocking(void); +static int vmSwapOneObjectThreaded(void); static int vmCanSwapOut(void); static void freeOneObjectFromFreelist(void); static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask); @@ -1232,11 +1233,17 @@ static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientD { if (listLength(server.objfreelist)) { freeOneObjectFromFreelist(); - } else if (vmSwapOneObject() == REDIS_ERR) { - if ((loops % 30) == 0 && zmalloc_used_memory() > - (server.vm_max_memory+server.vm_max_memory/10)) { - redisLog(REDIS_WARNING,"WARNING: vm-max-memory limit exceeded by more than 10%% but unable to swap more objects out!"); + } else { + if (vmSwapOneObjectThreaded() == REDIS_ERR) { + if ((loops % 30) == 0 && zmalloc_used_memory() > + (server.vm_max_memory+server.vm_max_memory/10)) { + redisLog(REDIS_WARNING,"WARNING: vm-max-memory limit exceeded by more than 10%% but unable to swap more objects out!"); + } } + /* Note that we freed just one object, because anyway when + * the I/O thread in charge to swap this object out will + * do its work, the handler of completed jobs will try to swap + * more objects if we are out of memory. */ break; } } @@ -1635,6 +1642,7 @@ static void freeClient(redisClient *c) { aeDeleteFileEvent(server.el,c->fd,AE_READABLE); aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE); listRelease(c->reply); + listRelease(c->io_keys); freeClientArgv(c); close(c->fd); /* Remove from the list of clients */ @@ -3329,7 +3337,7 @@ static int rdbLoad(char *filename) { loadedkeys++; if (server.vm_enabled && (loadedkeys % 5000) == 0) { while (zmalloc_used_memory() > server.vm_max_memory) { - if (vmSwapOneObject() == REDIS_ERR) break; + if (vmSwapOneObjectBlocking() == REDIS_ERR) break; } } } @@ -6605,7 +6613,7 @@ int loadAppendOnlyFile(char *filename) { loadedkeys++; if (server.vm_enabled && (loadedkeys % 5000) == 0) { while (zmalloc_used_memory() > server.vm_max_memory) { - if (vmSwapOneObject() == REDIS_ERR) break; + if (vmSwapOneObjectBlocking() == REDIS_ERR) break; } } } @@ -7071,7 +7079,7 @@ static int vmFindContiguousPages(off_t *first, int n) { * needed to later retrieve the object into the key object. * If we can't find enough contiguous empty pages to swap the object on disk * REDIS_ERR is returned. */ -static int vmSwapObject(robj *key, robj *val) { +static int vmSwapObjectBlocking(robj *key, robj *val) { off_t pages = rdbSavedObjectPages(val); off_t page; @@ -7080,7 +7088,7 @@ static int vmSwapObject(robj *key, robj *val) { if (vmFindContiguousPages(&page,pages) == REDIS_ERR) return REDIS_ERR; if (fseeko(server.vm_fp,page*server.vm_page_size,SEEK_SET) == -1) { redisLog(REDIS_WARNING, - "Critical VM problem in vmSwapObject(): can't seek: %s", + "Critical VM problem in vmSwapObjectBlocking(): can't seek: %s", strerror(errno)); return REDIS_ERR; } @@ -7100,6 +7108,13 @@ static int vmSwapObject(robj *key, robj *val) { return REDIS_OK; } +static int vmSwapObjectThreaded(robj *key, robj *val) { + + key = key; + val = val; + return REDIS_OK; +} + /* Load the value object relative to the 'key' object from swap to memory. * The newly allocated object is returned. * @@ -7221,8 +7236,11 @@ static double computeObjectSwappability(robj *o) { /* Try to swap an object that's a good candidate for swapping. * Returns REDIS_OK if the object was swapped, REDIS_ERR if it's not possible - * to swap any object at all. */ -static int vmSwapOneObject(void) { + * to swap any object at all. + * + * If 'usethreaded' is true, Redis will try to swap the object in background + * using I/O threads. */ +static int vmSwapOneObject(int usethreads) { int j, i; struct dictEntry *best = NULL; double best_swappability = 0; @@ -7269,14 +7287,27 @@ static int vmSwapOneObject(void) { key = dictGetEntryKey(best) = newkey; } /* Swap it */ - if (vmSwapObject(key,val) == REDIS_OK) { - dictGetEntryVal(best) = NULL; + if (usethreads) { + vmSwapObjectThreaded(key,val); return REDIS_OK; } else { - return REDIS_ERR; + if (vmSwapObjectBlocking(key,val) == REDIS_OK) { + dictGetEntryVal(best) = NULL; + return REDIS_OK; + } else { + return REDIS_ERR; + } } } +static int vmSwapOneObjectBlocking() { + return vmSwapOneObject(0); +} + +static int vmSwapOneObjectThreaded() { + return vmSwapOneObject(1); +} + /* Return true if it's safe to swap out objects in a given moment. * Basically we don't want to swap objects out while there is a BGSAVE * or a BGAEOREWRITE running in backgroud. */ @@ -7358,6 +7389,7 @@ static void vmCancelThreadedIOJob(robj *o) { if (job->type == REDIS_IOJOB_SWAP) decrRefCount(job->val); listDelNode(lists[i],ln); + zfree(job); break; case 1: /* io_processing */ case 2: /* io_processed */ @@ -7448,7 +7480,7 @@ static void debugCommand(redisClient *c) { /* Swap it */ if (key->storage != REDIS_VM_MEMORY) { addReplySds(c,sdsnew("-ERR This key is not in memory\r\n")); - } else if (vmSwapObject(key,val) == REDIS_OK) { + } else if (vmSwapObjectBlocking(key,val) == REDIS_OK) { dictGetEntryVal(de) = NULL; addReply(c,shared.ok); } else { diff --git a/staticsymbols.h b/staticsymbols.h index c8ad4a992..a7f946948 100644 --- a/staticsymbols.h +++ b/staticsymbols.h @@ -14,6 +14,7 @@ static struct redisFunctionSym symsTable[] = { {"blockingPopGenericCommand",(unsigned long)blockingPopGenericCommand}, {"blpopCommand",(unsigned long)blpopCommand}, {"brpopCommand",(unsigned long)brpopCommand}, +{"bytesToHuman",(unsigned long)bytesToHuman}, {"call",(unsigned long)call}, {"closeTimedoutClients",(unsigned long)closeTimedoutClients}, {"compareStringObjects",(unsigned long)compareStringObjects}, @@ -33,6 +34,7 @@ static struct redisFunctionSym symsTable[] = { {"decrRefCount",(unsigned long)decrRefCount}, {"decrbyCommand",(unsigned long)decrbyCommand}, {"delCommand",(unsigned long)delCommand}, +{"deleteIfSwapped",(unsigned long)deleteIfSwapped}, {"deleteIfVolatile",(unsigned long)deleteIfVolatile}, {"deleteKey",(unsigned long)deleteKey}, {"dictEncObjKeyCompare",(unsigned long)dictEncObjKeyCompare}, @@ -60,6 +62,7 @@ static struct redisFunctionSym symsTable[] = { {"freeHashObject",(unsigned long)freeHashObject}, {"freeListObject",(unsigned long)freeListObject}, {"freeMemoryIfNeeded",(unsigned long)freeMemoryIfNeeded}, +{"freeOneObjectFromFreelist",(unsigned long)freeOneObjectFromFreelist}, {"freeSetObject",(unsigned long)freeSetObject}, {"freeStringObject",(unsigned long)freeStringObject}, {"freeZsetObject",(unsigned long)freeZsetObject}, @@ -90,6 +93,7 @@ static struct redisFunctionSym symsTable[] = { {"lindexCommand",(unsigned long)lindexCommand}, {"llenCommand",(unsigned long)llenCommand}, {"loadServerConfig",(unsigned long)loadServerConfig}, +{"lockThreadedIO",(unsigned long)lockThreadedIO}, {"lookupKey",(unsigned long)lookupKey}, {"lookupKeyByPattern",(unsigned long)lookupKeyByPattern}, {"lookupKeyRead",(unsigned long)lookupKeyRead}, @@ -198,17 +202,26 @@ static struct redisFunctionSym symsTable[] = { {"ttlCommand",(unsigned long)ttlCommand}, {"typeCommand",(unsigned long)typeCommand}, {"unblockClient",(unsigned long)unblockClient}, +{"unlockThreadedIO",(unsigned long)unlockThreadedIO}, {"updateSlavesWaitingBgsave",(unsigned long)updateSlavesWaitingBgsave}, +{"vmCanSwapOut",(unsigned long)vmCanSwapOut}, +{"vmCancelThreadedIOJob",(unsigned long)vmCancelThreadedIOJob}, {"vmFindContiguousPages",(unsigned long)vmFindContiguousPages}, {"vmFreePage",(unsigned long)vmFreePage}, +{"vmGenericLoadObject",(unsigned long)vmGenericLoadObject}, {"vmInit",(unsigned long)vmInit}, {"vmLoadObject",(unsigned long)vmLoadObject}, {"vmMarkPageFree",(unsigned long)vmMarkPageFree}, {"vmMarkPageUsed",(unsigned long)vmMarkPageUsed}, {"vmMarkPagesFree",(unsigned long)vmMarkPagesFree}, {"vmMarkPagesUsed",(unsigned long)vmMarkPagesUsed}, -{"vmSwapObject",(unsigned long)vmSwapObject}, +{"vmPreviewObject",(unsigned long)vmPreviewObject}, +{"vmSwapObjectBlocking",(unsigned long)vmSwapObjectBlocking}, +{"vmSwapObjectThreaded",(unsigned long)vmSwapObjectThreaded}, {"vmSwapOneObject",(unsigned long)vmSwapOneObject}, +{"vmSwapOneObjectBlocking",(unsigned long)vmSwapOneObjectBlocking}, +{"vmSwapOneObjectThreaded",(unsigned long)vmSwapOneObjectThreaded}, +{"vmThreadedIOCompletedJob",(unsigned long)vmThreadedIOCompletedJob}, {"yesnotoi",(unsigned long)yesnotoi}, {"zaddCommand",(unsigned long)zaddCommand}, {"zaddGenericCommand",(unsigned long)zaddGenericCommand}, diff --git a/utils/redis_init_script b/utils/redis_init_script new file mode 100755 index 000000000..35b906fce --- /dev/null +++ b/utils/redis_init_script @@ -0,0 +1,36 @@ +#!/bin/sh + +REDISPORT=6379 +EXEC=/usr/local/bin/redis-server + +PIDFILE=/var/run/redis_${REDISPORT}.pid +CONF="/etc/redis/${REDISPORT}.conf" + +case "$1" in + start) + if [ -f $PIDFILE ] + then + echo -n "$PIDFILE exists, process is already running or crashed\n" + else + echo -n "Starting Redis server...\n" + $EXEC $CONF + fi + ;; + stop) + if [ ! -f $PIDFILE ] + then + echo -n "$PIDFILE does not exist, process is not running\n" + else + echo -n "Stopping ...\n" + echo -n "Sending SHUTDOWN\r\n" | nc localhost $REDISPORT & + PID=$(cat $PIDFILE) + while [ -x /proc/${PIDFILE} ] + do + echo "Waiting for Redis to shutdown ..." + sleep 1 + done + rm $PIDFILE + echo "Redis stopped" + fi + ;; +esac diff --git a/zmalloc.c b/zmalloc.c index eb06da3b8..64263756e 100644 --- a/zmalloc.c +++ b/zmalloc.c @@ -31,6 +31,7 @@ #include #include #include +#include #include "config.h" #if defined(__sun) @@ -40,6 +41,7 @@ #endif static size_t used_memory = 0; +pthread_mutex_t used_memory_mutex = PTHREAD_MUTEX_INITIALIZER; static void zmalloc_oom(size_t size) { fprintf(stderr, "zmalloc: Out of memory trying to allocate %zu bytes\n", -- cgit v1.2.1