summaryrefslogtreecommitdiff
path: root/src/server.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/server.h')
-rw-r--r--src/server.h55
1 files changed, 40 insertions, 15 deletions
diff --git a/src/server.h b/src/server.h
index 9b26221f0..ee3b7df5c 100644
--- a/src/server.h
+++ b/src/server.h
@@ -59,6 +59,7 @@ typedef long long mstime_t; /* millisecond time type. */
#include "anet.h" /* Networking the easy way */
#include "ziplist.h" /* Compact list data structure */
#include "intset.h" /* Compact integer set structure */
+#include "stream.h" /* Stream data type header file. */
#include "version.h" /* Version macro */
#include "util.h" /* Misc functions useful in many places */
#include "latency.h" /* Latency monitor API */
@@ -255,6 +256,8 @@ typedef long long mstime_t; /* millisecond time type. */
#define BLOCKED_LIST 1 /* BLPOP & co. */
#define BLOCKED_WAIT 2 /* WAIT for synchronous replication. */
#define BLOCKED_MODULE 3 /* Blocked by a loadable module. */
+#define BLOCKED_STREAM 4 /* XREAD. */
+#define BLOCKED_NUM 5 /* Number of blocked states. */
/* Client request types */
#define PROTO_REQ_INLINE 1
@@ -424,7 +427,8 @@ typedef long long mstime_t; /* millisecond time type. */
#define NOTIFY_ZSET (1<<7) /* z */
#define NOTIFY_EXPIRED (1<<8) /* x */
#define NOTIFY_EVICTED (1<<9) /* e */
-#define NOTIFY_ALL (NOTIFY_GENERIC | NOTIFY_STRING | NOTIFY_LIST | NOTIFY_SET | NOTIFY_HASH | NOTIFY_ZSET | NOTIFY_EXPIRED | NOTIFY_EVICTED) /* A */
+#define NOTIFY_STREAM (1<<10) /* t */
+#define NOTIFY_ALL (NOTIFY_GENERIC | NOTIFY_STRING | NOTIFY_LIST | NOTIFY_SET | NOTIFY_HASH | NOTIFY_ZSET | NOTIFY_EXPIRED | NOTIFY_EVICTED | NOTIFY_STREAM) /* A flag */
/* Get the first bind addr or NULL */
#define NET_FIRST_BIND_ADDR (server.bindaddr_count ? server.bindaddr[0] : NULL)
@@ -446,11 +450,11 @@ typedef long long mstime_t; /* millisecond time type. */
/* A redis object, that is a type able to hold a string / list / set */
/* The actual Redis Object */
-#define OBJ_STRING 0
-#define OBJ_LIST 1
-#define OBJ_SET 2
-#define OBJ_ZSET 3
-#define OBJ_HASH 4
+#define OBJ_STRING 0 /* String object. */
+#define OBJ_LIST 1 /* List object. */
+#define OBJ_SET 2 /* Set object. */
+#define OBJ_ZSET 3 /* Sorted set object. */
+#define OBJ_HASH 4 /* Hash object. */
/* The "module" object type is a special one that signals that the object
* is one directly managed by a Redis module. In this case the value points
@@ -463,7 +467,8 @@ typedef long long mstime_t; /* millisecond time type. */
* by a 64 bit module type ID, which has a 54 bits module-specific signature
* in order to dispatch the loading to the right module, plus a 10 bits
* encoding version. */
-#define OBJ_MODULE 5
+#define OBJ_MODULE 5 /* Module object. */
+#define OBJ_STREAM 6 /* Stream object. */
/* Extract encver / signature from a module type ID. */
#define REDISMODULE_TYPE_ENCVER_BITS 10
@@ -575,6 +580,7 @@ typedef struct RedisModuleDigest {
#define OBJ_ENCODING_SKIPLIST 7 /* Encoded as skiplist */
#define OBJ_ENCODING_EMBSTR 8 /* Embedded sds string encoding */
#define OBJ_ENCODING_QUICKLIST 9 /* Encoded as linked list of ziplists */
+#define OBJ_ENCODING_STREAM 10 /* Encoded as a radix tree of listpacks */
#define LRU_BITS 24
#define LRU_CLOCK_MAX ((1<<LRU_BITS)-1) /* Max value of obj->lru */
@@ -586,7 +592,7 @@ typedef struct redisObject {
unsigned encoding:4;
unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or
* LFU data (least significant 8 bits frequency
- * and most significant 16 bits decreas time). */
+ * and most significant 16 bits access time). */
int refcount;
void *ptr;
} robj;
@@ -638,12 +644,17 @@ typedef struct blockingState {
mstime_t timeout; /* Blocking operation timeout. If UNIX current time
* is > timeout then the operation timed out. */
- /* BLOCKED_LIST */
+ /* BLOCKED_LIST and BLOCKED_STREAM */
dict *keys; /* The keys we are waiting to terminate a blocking
- * operation such as BLPOP. Otherwise NULL. */
+ * operation such as BLPOP or XREAD. Or NULL. */
robj *target; /* The key that should receive the element,
* for BRPOPLPUSH. */
+ /* BLOCK_STREAM */
+ size_t xread_count; /* XREAD COUNT option. */
+ robj *xread_group; /* XREAD group name. */
+ mstime_t xread_retry_time, xread_retry_ttl;
+
/* BLOCKED_WAIT */
int numreplicas; /* Number of replicas we are waiting for ACK. */
long long reploffset; /* Replication offset to reach. */
@@ -722,6 +733,7 @@ typedef struct client {
dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */
list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */
sds peerid; /* Cached peer ID. */
+ listNode *client_list_node; /* list node in client list */
/* Response buffer */
int bufpos;
@@ -1118,10 +1130,11 @@ struct redisServer {
unsigned long long maxmemory; /* Max number of memory bytes to use */
int maxmemory_policy; /* Policy for key eviction */
int maxmemory_samples; /* Pricision of random sampling */
- unsigned int lfu_log_factor; /* LFU logarithmic counter factor. */
- unsigned int lfu_decay_time; /* LFU counter decay factor. */
+ int lfu_log_factor; /* LFU logarithmic counter factor. */
+ int lfu_decay_time; /* LFU counter decay factor. */
/* Blocked clients */
- unsigned int bpop_blocked_clients; /* Number of clients blocked by lists */
+ unsigned int blocked_clients; /* # of clients executing a blocking cmd.*/
+ unsigned int blocked_clients_by_type[BLOCKED_NUM];
list *unblocked_clients; /* list of clients to unblock before next loop */
list *ready_keys; /* List of readyList structures for BLPOP & co */
/* Sort parameters - qsort_r() is only available under BSD so we
@@ -1288,6 +1301,7 @@ typedef struct {
extern struct redisServer server;
extern struct sharedObjectsStruct shared;
extern dictType objectKeyPointerValueDictType;
+extern dictType objectKeyHeapPointerValueDictType;
extern dictType setDictType;
extern dictType zsetDictType;
extern dictType clusterNodesDictType;
@@ -1386,6 +1400,7 @@ int handleClientsWithPendingWrites(void);
int clientHasPendingReplies(client *c);
void unlinkClient(client *c);
int writeToClient(int fd, client *c, int handler_installed);
+void linkClient(client *c);
#ifdef __GNUC__
void addReplyErrorFormat(client *c, const char *fmt, ...)
@@ -1411,9 +1426,7 @@ int listTypeEqual(listTypeEntry *entry, robj *o);
void listTypeDelete(listTypeIterator *iter, listTypeEntry *entry);
void listTypeConvert(robj *subject, int enc);
void unblockClientWaitingData(client *c);
-void handleClientsBlockedOnLists(void);
void popGenericCommand(client *c, int where);
-void signalListAsReady(redisDb *db, robj *key);
/* MULTI/EXEC/WATCH... */
void unwatchAllKeys(client *c);
@@ -1456,6 +1469,7 @@ robj *createIntsetObject(void);
robj *createHashObject(void);
robj *createZsetObject(void);
robj *createZsetZiplistObject(void);
+robj *createStreamObject(void);
robj *createModuleObject(moduleType *mt, void *value);
int getLongFromObjectOrReply(client *c, robj *o, long *target, const char *msg);
int checkType(client *c, robj *o, int type);
@@ -1755,6 +1769,7 @@ int *evalGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys);
int *sortGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys);
int *migrateGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys);
int *georadiusGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys);
+int *xreadGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys);
/* Cluster */
void clusterInit(void);
@@ -1782,6 +1797,7 @@ void scriptingInit(int setup);
int ldbRemoveChild(pid_t pid);
void ldbKillForkedSessions(void);
int ldbPendingChildren(void);
+sds luaCreateFunction(client *c, lua_State *lua, robj *body);
/* Blocked clients */
void processUnblockedClients(void);
@@ -1790,6 +1806,9 @@ void unblockClient(client *c);
void replyToBlockedClientTimedOut(client *c);
int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int unit);
void disconnectAllBlockedClients(void);
+void handleClientsBlockedOnKeys(void);
+void signalKeyAsReady(redisDb *db, robj *key);
+void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, streamID *ids);
/* expire.c -- Handling of expired keys */
void activeExpireCycle(int type);
@@ -1803,6 +1822,7 @@ void evictionPoolAlloc(void);
#define LFU_INIT_VAL 5
unsigned long LFUGetTimeInMinutes(void);
uint8_t LFULogIncr(uint8_t value);
+unsigned long LFUDecrAndReturn(robj *o);
/* Keys hashing / comparison functions for dict.c hash tables. */
uint64_t dictSdsHash(const void *key);
@@ -1991,6 +2011,11 @@ void pfdebugCommand(client *c);
void latencyCommand(client *c);
void moduleCommand(client *c);
void securityWarningCommand(client *c);
+void xaddCommand(client *c);
+void xrangeCommand(client *c);
+void xrevrangeCommand(client *c);
+void xlenCommand(client *c);
+void xreadCommand(client *c);
#if defined(__GNUC__)
void *calloc(size_t count, size_t size) __attribute__ ((deprecated));