summaryrefslogtreecommitdiff
path: root/src/server.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/server.h')
-rw-r--r--src/server.h81
1 files changed, 71 insertions, 10 deletions
diff --git a/src/server.h b/src/server.h
index b911697a1..21d5fcd65 100644
--- a/src/server.h
+++ b/src/server.h
@@ -377,6 +377,13 @@ typedef enum {
/* Synchronous read timeout - slave side */
#define CONFIG_REPL_SYNCIO_TIMEOUT 5
+/* The default number of replication backlog blocks to trim per call. */
+#define REPL_BACKLOG_TRIM_BLOCKS_PER_CALL 64
+
+/* In order to quickly find the requested offset for PSYNC requests,
+ * we index some nodes in the replication buffer linked list into a rax. */
+#define REPL_BACKLOG_INDEX_PER_BLOCKS 64
+
/* List related stuff */
#define LIST_HEAD 0
#define LIST_TAIL 1
@@ -767,6 +774,33 @@ typedef struct clientReplyBlock {
char buf[];
} clientReplyBlock;
+/* The replication buffer is a list of replBufBlock nodes.
+ *
+ * +--------------+ +--------------+ +--------------+
+ * | refcount = 1 | ... | refcount = 0 | ... | refcount = 2 |
+ * +--------------+ +--------------+ +--------------+
+ * | / \
+ * | / \
+ * | / \
+ * Repl Backlog Replica_A Replica_B
+ *
+ * Each replica, and the replication backlog, increments only the refcount of
+ * the 'ref_repl_buf_node' it points to. So when a replica walks to the next
+ * node, it should first increase the next node's refcount. When we trim the
+ * replication buffer nodes, we always remove nodes from the head, and only
+ * while the head's refcount is 0. If the refcount of the head node is not 0,
+ * we must stop trimming and never iterate to the next node. */
+
+/* Similar to 'clientReplyBlock', this is used for buffers shared between
+ * all replica clients and the replication backlog. */
+typedef struct replBufBlock {
+ int refcount; /* Number of replicas or repl backlog referencing it. */
+ long long id; /* The unique incremental number. */
+ long long repl_offset; /* Start replication offset of the block. */
+ size_t size, used; /* Presumably capacity of 'buf' and bytes in use - confirm. */
+ char buf[]; /* Flexible array member; the block's data follows the header. */
+} replBufBlock;
+
/* Redis database representation. There are multiple databases identified
* by integers from 0 (the default database) up to the max configured
* database. The database number is the 'id' field in the structure. */
@@ -929,6 +963,24 @@ typedef struct {
need more reserved IDs use UINT64_MAX-1,
-2, ... and so forth. */
+/* The replication backlog is not separate memory; it is just one consumer of
+ * the global replication buffer. This structure records its reference into
+ * the replication buffer. Since the replication buffer block list may be very
+ * long, searching it for a replication offset on partial resync could be slow,
+ * so we use a rax tree to index some blocks (every REPL_BACKLOG_INDEX_PER_BLOCKS)
+ * to make searching for an offset in the replication buffer blocks list faster. */
+typedef struct replBacklog {
+ listNode *ref_repl_buf_node; /* Referenced node of replication buffer blocks,
+ * see the definition of replBufBlock. */
+ size_t unindexed_count; /* The count since the last index block was created. */
+ rax *blocks_index; /* The index of recorded blocks of replication
+ * buffer for quickly searching replication
+ * offset on partial resynchronization. */
+ long long histlen; /* Backlog actual data length */
+ long long offset; /* Replication "master offset" of first
+ * byte in the replication backlog buffer.*/
+} replBacklog;
+
typedef struct {
list *clients;
size_t mem_usage_sum;
@@ -1029,6 +1081,11 @@ typedef struct client {
listNode *mem_usage_bucket_node;
clientMemUsageBucket *mem_usage_bucket;
+ listNode *ref_repl_buf_node; /* Referenced node of replication buffer blocks,
+ * see the definition of replBufBlock. */
+ size_t ref_block_pos; /* Access position of referenced buffer block,
+ * i.e. the next offset to send. */
+
/* Response buffer */
int bufpos;
size_t buf_usable_size; /* Usable size of buffer. */
@@ -1528,14 +1585,8 @@ struct redisServer {
long long second_replid_offset; /* Accept offsets up to this for replid2. */
int slaveseldb; /* Last SELECTed DB in replication output */
int repl_ping_slave_period; /* Master pings the slave every N seconds */
- char *repl_backlog; /* Replication backlog for partial syncs */
+ replBacklog *repl_backlog; /* Replication backlog for partial syncs */
long long repl_backlog_size; /* Backlog circular buffer size */
- long long cfg_repl_backlog_size;/* Backlog circular buffer size in config */
- long long repl_backlog_histlen; /* Backlog actual data length */
- long long repl_backlog_idx; /* Backlog circular buffer current offset,
- that is the next byte will'll write to.*/
- long long repl_backlog_off; /* Replication "master offset" of first
- byte in the replication backlog buffer.*/
time_t repl_backlog_time_limit; /* Time without slaves after the backlog
gets released. */
time_t repl_no_slaves_since; /* We have no slaves since that time.
@@ -1547,6 +1598,9 @@ struct redisServer {
int repl_diskless_load; /* Slave parse RDB directly from the socket.
* see REPL_DISKLESS_LOAD_* enum */
int repl_diskless_sync_delay; /* Delay to start a diskless repl BGSAVE. */
+ size_t repl_buffer_mem; /* The memory of replication buffer. */
+ list *repl_buffer_blocks; /* Replication buffers blocks list
+ * (serving replica clients and repl backlog) */
/* Replication (slave) */
char *masteruser; /* AUTH with this user and masterauth with master */
sds masterauth; /* AUTH with this password with master */
@@ -2031,6 +2085,7 @@ void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask);
void acceptTLSHandler(aeEventLoop *el, int fd, void *privdata, int mask);
void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask);
void readQueryFromClient(connection *conn);
+int prepareClientToWrite(client *c);
void addReplyNull(client *c);
void addReplyNullArray(client *c);
void addReplyBool(client *c, int b);
@@ -2063,8 +2118,8 @@ void addReplyPushLen(client *c, long length);
void addReplyHelp(client *c, const char **help);
void addReplySubcommandSyntaxError(client *c);
void addReplyLoadedModules(client *c);
+void copyReplicaOutputBuffer(client *dst, client *src);
void addListRangeReply(client *c, robj *o, long start, long end, int reverse);
-void copyClientOutputBuffer(client *dst, client *src);
size_t sdsZmallocSize(sds s);
size_t getStringObjectSdsUsedMemory(robj *o);
void freeClientReplyValue(void *o);
@@ -2238,7 +2293,10 @@ ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout);
/* Replication */
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc);
-void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t buflen);
+void replicationFeedStreamFromMasterStream(char *buf, size_t buflen);
+void resetReplicationBuffer(void);
+void feedReplicationBuffer(char *buf, size_t len);
+void freeReplicaReferencedReplBuffer(client *replica);
void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc);
void updateSlavesWaitingBgsave(int bgsaveerr, int type);
void replicationCron(void);
@@ -2264,8 +2322,11 @@ int replicationSetupSlaveForFullResync(client *slave, long long offset);
void changeReplicationId(void);
void clearReplicationId2(void);
void createReplicationBacklog(void);
+void freeReplicationBacklog(void);
void replicationCacheMasterUsingMyself(void);
void feedReplicationBacklog(void *ptr, size_t len);
+void incrementalTrimReplicationBacklog(size_t blocks);
+int canFeedReplicaReplBuffer(client *replica);
void showLatestBacklog(void);
void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
void rdbPipeWriteHandlerConnRemoved(struct connection *conn);
@@ -2613,7 +2674,7 @@ size_t lazyfreeGetPendingObjectsCount(void);
size_t lazyfreeGetFreedObjectsCount(void);
void lazyfreeResetStats(void);
void freeObjAsync(robj *key, robj *obj, int dbid);
-
+void freeReplicationBacklogRefMemAsync(list *blocks, rax *index);
/* API to get key arguments from commands */
int *getKeysPrepareResult(getKeysResult *result, int numkeys);