diff options
Diffstat (limited to 'src/server.h')
-rw-r--r-- | src/server.h | 55 |
1 files changed, 40 insertions, 15 deletions
diff --git a/src/server.h b/src/server.h index 9b26221f0..ee3b7df5c 100644 --- a/src/server.h +++ b/src/server.h @@ -59,6 +59,7 @@ typedef long long mstime_t; /* millisecond time type. */ #include "anet.h" /* Networking the easy way */ #include "ziplist.h" /* Compact list data structure */ #include "intset.h" /* Compact integer set structure */ +#include "stream.h" /* Stream data type header file. */ #include "version.h" /* Version macro */ #include "util.h" /* Misc functions useful in many places */ #include "latency.h" /* Latency monitor API */ @@ -255,6 +256,8 @@ typedef long long mstime_t; /* millisecond time type. */ #define BLOCKED_LIST 1 /* BLPOP & co. */ #define BLOCKED_WAIT 2 /* WAIT for synchronous replication. */ #define BLOCKED_MODULE 3 /* Blocked by a loadable module. */ +#define BLOCKED_STREAM 4 /* XREAD. */ +#define BLOCKED_NUM 5 /* Number of blocked states. */ /* Client request types */ #define PROTO_REQ_INLINE 1 @@ -424,7 +427,8 @@ typedef long long mstime_t; /* millisecond time type. */ #define NOTIFY_ZSET (1<<7) /* z */ #define NOTIFY_EXPIRED (1<<8) /* x */ #define NOTIFY_EVICTED (1<<9) /* e */ -#define NOTIFY_ALL (NOTIFY_GENERIC | NOTIFY_STRING | NOTIFY_LIST | NOTIFY_SET | NOTIFY_HASH | NOTIFY_ZSET | NOTIFY_EXPIRED | NOTIFY_EVICTED) /* A */ +#define NOTIFY_STREAM (1<<10) /* t */ +#define NOTIFY_ALL (NOTIFY_GENERIC | NOTIFY_STRING | NOTIFY_LIST | NOTIFY_SET | NOTIFY_HASH | NOTIFY_ZSET | NOTIFY_EXPIRED | NOTIFY_EVICTED | NOTIFY_STREAM) /* A flag */ /* Get the first bind addr or NULL */ #define NET_FIRST_BIND_ADDR (server.bindaddr_count ? server.bindaddr[0] : NULL) @@ -446,11 +450,11 @@ typedef long long mstime_t; /* millisecond time type. */ /* A redis object, that is a type able to hold a string / list / set */ /* The actual Redis Object */ -#define OBJ_STRING 0 -#define OBJ_LIST 1 -#define OBJ_SET 2 -#define OBJ_ZSET 3 -#define OBJ_HASH 4 +#define OBJ_STRING 0 /* String object. */ +#define OBJ_LIST 1 /* List object. */ +#define OBJ_SET 2 /* Set object. */ +#define OBJ_ZSET 3 /* Sorted set object. */ +#define OBJ_HASH 4 /* Hash object. */ /* The "module" object type is a special one that signals that the object * is one directly managed by a Redis module. In this case the value points @@ -463,7 +467,8 @@ typedef long long mstime_t; /* millisecond time type. */ * by a 64 bit module type ID, which has a 54 bits module-specific signature * in order to dispatch the loading to the right module, plus a 10 bits * encoding version. */ -#define OBJ_MODULE 5 +#define OBJ_MODULE 5 /* Module object. */ +#define OBJ_STREAM 6 /* Stream object. */ /* Extract encver / signature from a module type ID. */ #define REDISMODULE_TYPE_ENCVER_BITS 10 @@ -575,6 +580,7 @@ typedef struct RedisModuleDigest { #define OBJ_ENCODING_SKIPLIST 7 /* Encoded as skiplist */ #define OBJ_ENCODING_EMBSTR 8 /* Embedded sds string encoding */ #define OBJ_ENCODING_QUICKLIST 9 /* Encoded as linked list of ziplists */ +#define OBJ_ENCODING_STREAM 10 /* Encoded as a radix tree of listpacks */ #define LRU_BITS 24 #define LRU_CLOCK_MAX ((1<<LRU_BITS)-1) /* Max value of obj->lru */ @@ -586,7 +592,7 @@ typedef struct redisObject { unsigned encoding:4; unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or * LFU data (least significant 8 bits frequency - * and most significant 16 bits decreas time). */ + * and most significant 16 bits access time). */ int refcount; void *ptr; } robj; @@ -638,12 +644,17 @@ typedef struct blockingState { mstime_t timeout; /* Blocking operation timeout. If UNIX current time * is > timeout then the operation timed out. */ - /* BLOCKED_LIST */ + /* BLOCKED_LIST and BLOCKED_STREAM */ dict *keys; /* The keys we are waiting to terminate a blocking - * operation such as BLPOP. Otherwise NULL. */ + * operation such as BLPOP or XREAD. Or NULL. */ robj *target; /* The key that should receive the element, * for BRPOPLPUSH. */ + /* BLOCK_STREAM */ + size_t xread_count; /* XREAD COUNT option. */ + robj *xread_group; /* XREAD group name. */ + mstime_t xread_retry_time, xread_retry_ttl; + /* BLOCKED_WAIT */ int numreplicas; /* Number of replicas we are waiting for ACK. */ long long reploffset; /* Replication offset to reach. */ @@ -722,6 +733,7 @@ typedef struct client { dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */ list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */ sds peerid; /* Cached peer ID. */ + listNode *client_list_node; /* list node in client list */ /* Response buffer */ int bufpos; @@ -1118,10 +1130,11 @@ struct redisServer { unsigned long long maxmemory; /* Max number of memory bytes to use */ int maxmemory_policy; /* Policy for key eviction */ int maxmemory_samples; /* Pricision of random sampling */ - unsigned int lfu_log_factor; /* LFU logarithmic counter factor. */ - unsigned int lfu_decay_time; /* LFU counter decay factor. */ + int lfu_log_factor; /* LFU logarithmic counter factor. */ + int lfu_decay_time; /* LFU counter decay factor. */ /* Blocked clients */ - unsigned int bpop_blocked_clients; /* Number of clients blocked by lists */ + unsigned int blocked_clients; /* # of clients executing a blocking cmd.*/ + unsigned int blocked_clients_by_type[BLOCKED_NUM]; list *unblocked_clients; /* list of clients to unblock before next loop */ list *ready_keys; /* List of readyList structures for BLPOP & co */ /* Sort parameters - qsort_r() is only available under BSD so we @@ -1288,6 +1301,7 @@ typedef struct { extern struct redisServer server; extern struct sharedObjectsStruct shared; extern dictType objectKeyPointerValueDictType; +extern dictType objectKeyHeapPointerValueDictType; extern dictType setDictType; extern dictType zsetDictType; extern dictType clusterNodesDictType; @@ -1386,6 +1400,7 @@ int handleClientsWithPendingWrites(void); int clientHasPendingReplies(client *c); void unlinkClient(client *c); int writeToClient(int fd, client *c, int handler_installed); +void linkClient(client *c); #ifdef __GNUC__ void addReplyErrorFormat(client *c, const char *fmt, ...) @@ -1411,9 +1426,7 @@ int listTypeEqual(listTypeEntry *entry, robj *o); void listTypeDelete(listTypeIterator *iter, listTypeEntry *entry); void listTypeConvert(robj *subject, int enc); void unblockClientWaitingData(client *c); -void handleClientsBlockedOnLists(void); void popGenericCommand(client *c, int where); -void signalListAsReady(redisDb *db, robj *key); /* MULTI/EXEC/WATCH... */ void unwatchAllKeys(client *c); @@ -1456,6 +1469,7 @@ robj *createIntsetObject(void); robj *createHashObject(void); robj *createZsetObject(void); robj *createZsetZiplistObject(void); +robj *createStreamObject(void); robj *createModuleObject(moduleType *mt, void *value); int getLongFromObjectOrReply(client *c, robj *o, long *target, const char *msg); int checkType(client *c, robj *o, int type); @@ -1755,6 +1769,7 @@ int *evalGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys); int *sortGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys); int *migrateGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys); int *georadiusGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys); +int *xreadGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys); /* Cluster */ void clusterInit(void); @@ -1782,6 +1797,7 @@ void scriptingInit(int setup); int ldbRemoveChild(pid_t pid); void ldbKillForkedSessions(void); int ldbPendingChildren(void); +sds luaCreateFunction(client *c, lua_State *lua, robj *body); /* Blocked clients */ void processUnblockedClients(void); @@ -1790,6 +1806,9 @@ void unblockClient(client *c); void replyToBlockedClientTimedOut(client *c); int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int unit); void disconnectAllBlockedClients(void); +void handleClientsBlockedOnKeys(void); +void signalKeyAsReady(redisDb *db, robj *key); +void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, streamID *ids); /* expire.c -- Handling of expired keys */ void activeExpireCycle(int type); @@ -1803,6 +1822,7 @@ void evictionPoolAlloc(void); #define LFU_INIT_VAL 5 unsigned long LFUGetTimeInMinutes(void); uint8_t LFULogIncr(uint8_t value); +unsigned long LFUDecrAndReturn(robj *o); /* Keys hashing / comparison functions for dict.c hash tables. */ uint64_t dictSdsHash(const void *key); @@ -1991,6 +2011,11 @@ void pfdebugCommand(client *c); void latencyCommand(client *c); void moduleCommand(client *c); void securityWarningCommand(client *c); +void xaddCommand(client *c); +void xrangeCommand(client *c); +void xrevrangeCommand(client *c); +void xlenCommand(client *c); +void xreadCommand(client *c); #if defined(__GNUC__) void *calloc(size_t count, size_t size) __attribute__ ((deprecated)); |