summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoryoav <yoav@monfort.co.il>2012-12-12 15:59:22 +0200
committerantirez <antirez@gmail.com>2013-07-16 15:41:24 +0200
commit63d15dfc879489ebe811064886d69a347cca544d (patch)
tree50d59b50a89f725b5025a908525a49a4ecdfe552
parent9d520a7f70d24ea663b8de9c7178624d76a8d846 (diff)
downloadredis-63d15dfc879489ebe811064886d69a347cca544d.tar.gz
Chunked loading of RDB to prevent redis from stalling reading very large keys.
-rw-r--r--src/rdb.c23
-rw-r--r--src/redis.c1
-rw-r--r--src/redis.h1
-rw-r--r--src/rio.c4
-rw-r--r--src/rio.h31
5 files changed, 45 insertions, 15 deletions
diff --git a/src/rdb.c b/src/rdb.c
index c24f2d58f..c53c157c5 100644
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -1057,21 +1057,32 @@ void stopLoading(void) {
server.loading = 0;
}
+/* Track loading progress in order to serve client's from time to time
+ and if needed calculate rdb checksum */
+void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
+ if (server.rdb_checksum)
+ rioGenericUpdateChecksum(r, buf, len);
+ if (server.loading_process_events_interval_bytes &&
+ (r->processed_bytes + len)/server.loading_process_events_interval_bytes > r->processed_bytes/server.loading_process_events_interval_bytes) {
+ loadingProgress(r->processed_bytes);
+ aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT);
+ }
+}
+
int rdbLoad(char *filename) {
uint32_t dbid;
int type, rdbver;
redisDb *db = server.db+0;
char buf[1024];
long long expiretime, now = mstime();
- long loops = 0;
FILE *fp;
rio rdb;
if ((fp = fopen(filename,"r")) == NULL) return REDIS_ERR;
rioInitWithFile(&rdb,fp);
- if (server.rdb_checksum)
- rdb.update_cksum = rioGenericUpdateChecksum;
+ rdb.update_cksum = rdbLoadProgressCallback;
+ rdb.max_processing_chunk = server.loading_process_events_interval_bytes;
if (rioRead(&rdb,buf,9) == 0) goto eoferr;
buf[9] = '\0';
if (memcmp(buf,"REDIS",5) != 0) {
@@ -1093,12 +1104,6 @@ int rdbLoad(char *filename) {
robj *key, *val;
expiretime = -1;
- /* Serve the clients from time to time */
- if (!(loops++ % 1000)) {
- loadingProgress(rioTell(&rdb));
- aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT);
- }
-
/* Read type. */
if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;
if (type == REDIS_RDB_OPCODE_EXPIRETIME) {
diff --git a/src/redis.c b/src/redis.c
index 714d9b65f..99955488e 100644
--- a/src/redis.c
+++ b/src/redis.c
@@ -1290,6 +1290,7 @@ void initServerConfig() {
server.lua_client = NULL;
server.lua_timedout = 0;
server.migrate_cached_sockets = dictCreate(&migrateCacheDictType,NULL);
+ server.loading_process_events_interval_bytes = (1024*1024*2);
updateLRUClock();
resetServerSaveParams();
diff --git a/src/redis.h b/src/redis.h
index b067fdf45..e78dc528d 100644
--- a/src/redis.h
+++ b/src/redis.h
@@ -753,6 +753,7 @@ struct redisServer {
off_t loading_total_bytes;
off_t loading_loaded_bytes;
time_t loading_start_time;
+ off_t loading_process_events_interval_bytes;
/* Fast pointers to often looked up command */
struct redisCommand *delCommand, *multiCommand, *lpushCommand, *lpopCommand,
*rpopCommand;
diff --git a/src/rio.c b/src/rio.c
index b2f46a08b..405e789e6 100644
--- a/src/rio.c
+++ b/src/rio.c
@@ -108,6 +108,8 @@ static const rio rioBufferIO = {
rioBufferTell,
NULL, /* update_checksum */
0, /* current checksum */
+ 0, /* bytes read or written */
+ 0, /* read/write chunk size */
{ { NULL, 0 } } /* union for io-specific vars */
};
@@ -117,6 +119,8 @@ static const rio rioFileIO = {
rioFileTell,
NULL, /* update_checksum */
0, /* current checksum */
+ 0, /* bytes read or written */
+ 0, /* read/write chunk size */
{ { NULL, 0 } } /* union for io-specific vars */
};
diff --git a/src/rio.h b/src/rio.h
index 3cab66af0..c28b47dc4 100644
--- a/src/rio.h
+++ b/src/rio.h
@@ -53,6 +53,12 @@ struct _rio {
/* The current checksum */
uint64_t cksum;
+ /* number of bytes read or written */
+ size_t processed_bytes;
+
+ /* maximum simgle read or write chunk size */
+ size_t max_processing_chunk;
+
/* Backend-specific vars. */
union {
struct {
@@ -74,16 +80,29 @@ typedef struct _rio rio;
* if needed. */
static inline size_t rioWrite(rio *r, const void *buf, size_t len) {
- if (r->update_cksum) r->update_cksum(r,buf,len);
- return r->write(r,buf,len);
+ while (len) {
+ size_t bytes_to_write = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
+ if (r->update_cksum) r->update_cksum(r,buf,bytes_to_write);
+ if (r->write(r,buf,bytes_to_write) == 0)
+ return 0;
+ buf = (char*)buf + bytes_to_write;
+ len -= bytes_to_write;
+ r->processed_bytes += bytes_to_write;
+ }
+ return 1;
}
static inline size_t rioRead(rio *r, void *buf, size_t len) {
- if (r->read(r,buf,len) == 1) {
- if (r->update_cksum) r->update_cksum(r,buf,len);
- return 1;
+ while (len) {
+ size_t bytes_to_read = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
+ if (r->read(r,buf,bytes_to_read) == 0)
+ return 0;
+ if (r->update_cksum) r->update_cksum(r,buf,bytes_to_read);
+ buf = (char*)buf + bytes_to_read;
+ len -= bytes_to_read;
+ r->processed_bytes += bytes_to_read;
}
- return 0;
+ return 1;
}
static inline off_t rioTell(rio *r) {