summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2018-09-03 18:17:25 +0200
committerantirez <antirez@gmail.com>2018-09-03 18:17:31 +0200
commit3e7349fdaf8cdbf96e595750034af43e6d6c56f0 (patch)
treebb218aafee722cdf33c36d90d068a61b0952c535
parentfebe102bf6d94428779f3943aea5947893301912 (diff)
downloadredis-3e7349fdaf8cdbf96e595750034af43e6d6c56f0.tar.gz
Make pending buffer processing safe for CLIENT_MASTER client.
Related to #5305.
-rw-r--r--src/blocked.c2
-rw-r--r--src/networking.c32
-rw-r--r--src/server.h1
3 files changed, 22 insertions, 13 deletions
diff --git a/src/blocked.c b/src/blocked.c
index 35c33d1cc..aeca87a6e 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -126,7 +126,7 @@ void processUnblockedClients(void) {
* the code is conceptually more correct this way. */
if (!(c->flags & CLIENT_BLOCKED)) {
if (c->querybuf && sdslen(c->querybuf) > 0) {
- processInputBuffer(c);
+ processInputBufferAndReplicate(c);
}
}
}
diff --git a/src/networking.c b/src/networking.c
index 27c695306..8e55ec902 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -1425,6 +1425,25 @@ void processInputBuffer(client *c) {
server.current_client = NULL;
}
+/* This is a wrapper for processInputBuffer that also cares about handling
+ * the replication forwarding to the sub-slaves, in case the client 'c'
+ * is flagged as master. Usually you want to call this instead of the
+ * raw processInputBuffer(). */
+void processInputBufferAndReplicate(client *c) {
+ if (!(c->flags & CLIENT_MASTER)) {
+ processInputBuffer(c);
+ } else {
+ size_t prev_offset = c->reploff;
+ processInputBuffer(c);
+ size_t applied = c->reploff - prev_offset;
+ if (applied) {
+ replicationFeedSlavesFromMasterStream(server.slaves,
+ c->pending_querybuf, applied);
+ sdsrange(c->pending_querybuf,applied,-1);
+ }
+ }
+}
+
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
client *c = (client*) privdata;
int nread, readlen;
@@ -1492,18 +1511,7 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
* was actually applied to the master state: this quantity, and its
* corresponding part of the replication stream, will be propagated to
* the sub-slaves and to the replication backlog. */
- if (!(c->flags & CLIENT_MASTER)) {
- processInputBuffer(c);
- } else {
- size_t prev_offset = c->reploff;
- processInputBuffer(c);
- size_t applied = c->reploff - prev_offset;
- if (applied) {
- replicationFeedSlavesFromMasterStream(server.slaves,
- c->pending_querybuf, applied);
- sdsrange(c->pending_querybuf,applied,-1);
- }
- }
+ processInputBufferAndReplicate(c);
}
void getClientsMaxBuffers(unsigned long *longest_output_list,
diff --git a/src/server.h b/src/server.h
index 4fb8e383c..4a5967f10 100644
--- a/src/server.h
+++ b/src/server.h
@@ -1418,6 +1418,7 @@ void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask);
void *addDeferredMultiBulkLength(client *c);
void setDeferredMultiBulkLength(client *c, void *node, long length);
void processInputBuffer(client *c);
+void processInputBufferAndReplicate(client *c);
void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask);
void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask);