diff options
Diffstat (limited to 'src/server.h')
-rw-r--r-- | src/server.h | 81 |
1 files changed, 71 insertions, 10 deletions
diff --git a/src/server.h b/src/server.h index b911697a1..21d5fcd65 100644 --- a/src/server.h +++ b/src/server.h @@ -377,6 +377,13 @@ typedef enum { /* Synchronous read timeout - slave side */ #define CONFIG_REPL_SYNCIO_TIMEOUT 5 +/* The default number of replication backlog blocks to trim per call. */ +#define REPL_BACKLOG_TRIM_BLOCKS_PER_CALL 64 + +/* In order to quickly find the requested offset for PSYNC requests, + * we index some nodes in the replication buffer linked list into a rax. */ +#define REPL_BACKLOG_INDEX_PER_BLOCKS 64 + /* List related stuff */ #define LIST_HEAD 0 #define LIST_TAIL 1 @@ -767,6 +774,33 @@ typedef struct clientReplyBlock { char buf[]; } clientReplyBlock; +/* Replication buffer blocks is the list of replBufBlock. + * + * +--------------+ +--------------+ +--------------+ + * | refcount = 1 | ... | refcount = 0 | ... | refcount = 2 | + * +--------------+ +--------------+ +--------------+ + * | / \ + * | / \ + * | / \ + * Repl Backlog Replica_A Replica_B + * + * Each replica or replication backlog increments only the refcount of the + * 'ref_repl_buf_node' which it points to. So when replica walks to the next + * node, it should first increase the next node's refcount, and when we trim + * the replication buffer nodes, we always remove nodes from the head, and only + * while the head node's refcount is 0. If the refcount of the head node is not 0, we must stop + * trimming and never iterate the next node. */ + +/* Similar to 'clientReplyBlock', it is used for shared buffers between + * all replica clients and replication backlog. */ +typedef struct replBufBlock { + int refcount; /* Number of replicas or repl backlog using. */ + long long id; /* The unique incremental number. */ + long long repl_offset; /* Start replication offset of the block. */ + size_t size, used; + char buf[]; +} replBufBlock; + /* Redis database representation. There are multiple databases identified * by integers from 0 (the default database) up to the max configured * database. 
The database number is the 'id' field in the structure. */ @@ -929,6 +963,24 @@ typedef struct { need more reserved IDs use UINT64_MAX-1, -2, ... and so forth. */ +/* Replication backlog is not separate memory, it is just one consumer of + * the global replication buffer. This structure records the reference of + * replication buffers. Since the replication buffer block list may be very long, + * it would cost much time to search replication offset on partial resync, so + * we use one rax tree to index some blocks every REPL_BACKLOG_INDEX_PER_BLOCKS + * to make searching offset from replication buffer blocks list faster. */ +typedef struct replBacklog { + listNode *ref_repl_buf_node; /* Referenced node of replication buffer blocks, + * see the definition of replBufBlock. */ + size_t unindexed_count; /* The count from last creating index block. */ + rax *blocks_index; /* The index of recorded blocks of replication + * buffer for quickly searching replication + * offset on partial resynchronization. */ + long long histlen; /* Backlog actual data length */ + long long offset; /* Replication "master offset" of first + * byte in the replication backlog buffer.*/ +} replBacklog; + typedef struct { list *clients; size_t mem_usage_sum; @@ -1029,6 +1081,11 @@ typedef struct client { listNode *mem_usage_bucket_node; clientMemUsageBucket *mem_usage_bucket; + listNode *ref_repl_buf_node; /* Referenced node of replication buffer blocks, + * see the definition of replBufBlock. */ + size_t ref_block_pos; /* Access position of referenced buffer block, + * i.e. the next offset to send. */ + /* Response buffer */ int bufpos; size_t buf_usable_size; /* Usable size of buffer. */ @@ -1528,14 +1585,8 @@ struct redisServer { long long second_replid_offset; /* Accept offsets up to this for replid2. 
*/ int slaveseldb; /* Last SELECTed DB in replication output */ int repl_ping_slave_period; /* Master pings the slave every N seconds */ - char *repl_backlog; /* Replication backlog for partial syncs */ + replBacklog *repl_backlog; /* Replication backlog for partial syncs */ long long repl_backlog_size; /* Backlog circular buffer size */ - long long cfg_repl_backlog_size;/* Backlog circular buffer size in config */ - long long repl_backlog_histlen; /* Backlog actual data length */ - long long repl_backlog_idx; /* Backlog circular buffer current offset, - that is the next byte will'll write to.*/ - long long repl_backlog_off; /* Replication "master offset" of first - byte in the replication backlog buffer.*/ time_t repl_backlog_time_limit; /* Time without slaves after the backlog gets released. */ time_t repl_no_slaves_since; /* We have no slaves since that time. @@ -1547,6 +1598,9 @@ struct redisServer { int repl_diskless_load; /* Slave parse RDB directly from the socket. * see REPL_DISKLESS_LOAD_* enum */ int repl_diskless_sync_delay; /* Delay to start a diskless repl BGSAVE. */ + size_t repl_buffer_mem; /* The memory of replication buffer. 
*/ + list *repl_buffer_blocks; /* Replication buffers blocks list + * (serving replica clients and repl backlog) */ /* Replication (slave) */ char *masteruser; /* AUTH with this user and masterauth with master */ sds masterauth; /* AUTH with this password with master */ @@ -2031,6 +2085,7 @@ void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask); void acceptTLSHandler(aeEventLoop *el, int fd, void *privdata, int mask); void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask); void readQueryFromClient(connection *conn); +int prepareClientToWrite(client *c); void addReplyNull(client *c); void addReplyNullArray(client *c); void addReplyBool(client *c, int b); @@ -2063,8 +2118,8 @@ void addReplyPushLen(client *c, long length); void addReplyHelp(client *c, const char **help); void addReplySubcommandSyntaxError(client *c); void addReplyLoadedModules(client *c); +void copyReplicaOutputBuffer(client *dst, client *src); void addListRangeReply(client *c, robj *o, long start, long end, int reverse); -void copyClientOutputBuffer(client *dst, client *src); size_t sdsZmallocSize(sds s); size_t getStringObjectSdsUsedMemory(robj *o); void freeClientReplyValue(void *o); @@ -2238,7 +2293,10 @@ ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout); /* Replication */ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc); -void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t buflen); +void replicationFeedStreamFromMasterStream(char *buf, size_t buflen); +void resetReplicationBuffer(void); +void feedReplicationBuffer(char *buf, size_t len); +void freeReplicaReferencedReplBuffer(client *replica); void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc); void updateSlavesWaitingBgsave(int bgsaveerr, int type); void replicationCron(void); @@ -2264,8 +2322,11 @@ int replicationSetupSlaveForFullResync(client *slave, long long offset); void 
changeReplicationId(void); void clearReplicationId2(void); void createReplicationBacklog(void); +void freeReplicationBacklog(void); void replicationCacheMasterUsingMyself(void); void feedReplicationBacklog(void *ptr, size_t len); +void incrementalTrimReplicationBacklog(size_t blocks); +int canFeedReplicaReplBuffer(client *replica); void showLatestBacklog(void); void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask); void rdbPipeWriteHandlerConnRemoved(struct connection *conn); @@ -2613,7 +2674,7 @@ size_t lazyfreeGetPendingObjectsCount(void); size_t lazyfreeGetFreedObjectsCount(void); void lazyfreeResetStats(void); void freeObjAsync(robj *key, robj *obj, int dbid); - +void freeReplicationBacklogRefMemAsync(list *blocks, rax *index); /* API to get key arguments from commands */ int *getKeysPrepareResult(getKeysResult *result, int numkeys); |