From 63d15dfc879489ebe811064886d69a347cca544d Mon Sep 17 00:00:00 2001 From: yoav Date: Wed, 12 Dec 2012 15:59:22 +0200 Subject: Chunked loading of RDB to prevent redis from stalling reading very large keys. --- src/rdb.c | 23 ++++++++++++++--------- src/redis.c | 1 + src/redis.h | 1 + src/rio.c | 4 ++++ src/rio.h | 31 +++++++++++++++++++++++++------ 5 files changed, 45 insertions(+), 15 deletions(-) diff --git a/src/rdb.c b/src/rdb.c index c24f2d58f..c53c157c5 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -1057,21 +1057,32 @@ void stopLoading(void) { server.loading = 0; } +/* Track loading progress in order to serve client's from time to time + and if needed calculate rdb checksum */ +void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) { + if (server.rdb_checksum) + rioGenericUpdateChecksum(r, buf, len); + if (server.loading_process_events_interval_bytes && + (r->processed_bytes + len)/server.loading_process_events_interval_bytes > r->processed_bytes/server.loading_process_events_interval_bytes) { + loadingProgress(r->processed_bytes); + aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT); + } +} + int rdbLoad(char *filename) { uint32_t dbid; int type, rdbver; redisDb *db = server.db+0; char buf[1024]; long long expiretime, now = mstime(); - long loops = 0; FILE *fp; rio rdb; if ((fp = fopen(filename,"r")) == NULL) return REDIS_ERR; rioInitWithFile(&rdb,fp); - if (server.rdb_checksum) - rdb.update_cksum = rioGenericUpdateChecksum; + rdb.update_cksum = rdbLoadProgressCallback; + rdb.max_processing_chunk = server.loading_process_events_interval_bytes; if (rioRead(&rdb,buf,9) == 0) goto eoferr; buf[9] = '\0'; if (memcmp(buf,"REDIS",5) != 0) { @@ -1093,12 +1104,6 @@ int rdbLoad(char *filename) { robj *key, *val; expiretime = -1; - /* Serve the clients from time to time */ - if (!(loops++ % 1000)) { - loadingProgress(rioTell(&rdb)); - aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT); - } - /* Read type. */ if ((type = rdbLoadType(&rdb)) == -1) goto eoferr; if (type == REDIS_RDB_OPCODE_EXPIRETIME) { diff --git a/src/redis.c b/src/redis.c index 714d9b65f..99955488e 100644 --- a/src/redis.c +++ b/src/redis.c @@ -1290,6 +1290,7 @@ void initServerConfig() { server.lua_client = NULL; server.lua_timedout = 0; server.migrate_cached_sockets = dictCreate(&migrateCacheDictType,NULL); + server.loading_process_events_interval_bytes = (1024*1024*2); updateLRUClock(); resetServerSaveParams(); diff --git a/src/redis.h b/src/redis.h index b067fdf45..e78dc528d 100644 --- a/src/redis.h +++ b/src/redis.h @@ -753,6 +753,7 @@ struct redisServer { off_t loading_total_bytes; off_t loading_loaded_bytes; time_t loading_start_time; + off_t loading_process_events_interval_bytes; /* Fast pointers to often looked up command */ struct redisCommand *delCommand, *multiCommand, *lpushCommand, *lpopCommand, *rpopCommand; diff --git a/src/rio.c b/src/rio.c index b2f46a08b..405e789e6 100644 --- a/src/rio.c +++ b/src/rio.c @@ -108,6 +108,8 @@ static const rio rioBufferIO = { rioBufferTell, NULL, /* update_checksum */ 0, /* current checksum */ + 0, /* bytes read or written */ + 0, /* read/write chunk size */ { { NULL, 0 } } /* union for io-specific vars */ }; @@ -117,6 +119,8 @@ static const rio rioFileIO = { rioFileTell, NULL, /* update_checksum */ 0, /* current checksum */ + 0, /* bytes read or written */ + 0, /* read/write chunk size */ { { NULL, 0 } } /* union for io-specific vars */ }; diff --git a/src/rio.h b/src/rio.h index 3cab66af0..c28b47dc4 100644 --- a/src/rio.h +++ b/src/rio.h @@ -53,6 +53,12 @@ struct _rio { /* The current checksum */ uint64_t cksum; + /* number of bytes read or written */ + size_t processed_bytes; + + /* maximum simgle read or write chunk size */ + size_t max_processing_chunk; + /* Backend-specific vars. */ union { struct { @@ -74,16 +80,29 @@ typedef struct _rio rio; * if needed. */ static inline size_t rioWrite(rio *r, const void *buf, size_t len) { - if (r->update_cksum) r->update_cksum(r,buf,len); - return r->write(r,buf,len); + while (len) { + size_t bytes_to_write = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len; + if (r->update_cksum) r->update_cksum(r,buf,bytes_to_write); + if (r->write(r,buf,bytes_to_write) == 0) + return 0; + buf = (char*)buf + bytes_to_write; + len -= bytes_to_write; + r->processed_bytes += bytes_to_write; + } + return 1; } static inline size_t rioRead(rio *r, void *buf, size_t len) { - if (r->read(r,buf,len) == 1) { - if (r->update_cksum) r->update_cksum(r,buf,len); - return 1; + while (len) { + size_t bytes_to_read = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len; + if (r->read(r,buf,bytes_to_read) == 0) + return 0; + if (r->update_cksum) r->update_cksum(r,buf,bytes_to_read); + buf = (char*)buf + bytes_to_read; + len -= bytes_to_read; + r->processed_bytes += bytes_to_read; } - return 0; + return 1; } static inline off_t rioTell(rio *r) { -- cgit v1.2.1