summaryrefslogtreecommitdiff
path: root/src/server.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/server.h')
-rw-r--r--src/server.h85
1 files changed, 42 insertions, 43 deletions
diff --git a/src/server.h b/src/server.h
index 765b67d62..1201cab3b 100644
--- a/src/server.h
+++ b/src/server.h
@@ -359,7 +359,11 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
#define CLIENT_LUA_DEBUG_SYNC (1<<26) /* EVAL debugging without fork() */
#define CLIENT_MODULE (1<<27) /* Non connected client used by some module. */
#define CLIENT_PROTECTED (1<<28) /* Client should not be freed for now. */
-/* #define CLIENT_... (1<<29) currently unused, feel free to use in the future */
+#define CLIENT_EXECUTING_COMMAND (1<<29) /* Indicates that the client is currently in the process of handling
+ a command. usually this will be marked only during call()
+ however, blocked clients might have this flag kept until they
+ will try to reprocess the command. */
+
#define CLIENT_PENDING_COMMAND (1<<30) /* Indicates the client has a fully
* parsed command ready for execution. */
#define CLIENT_TRACKING (1ULL<<31) /* Client enabled keys tracking in order to
@@ -388,15 +392,18 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
/* Client block type (btype field in client structure)
* if CLIENT_BLOCKED flag is set. */
-#define BLOCKED_NONE 0 /* Not blocked, no CLIENT_BLOCKED flag set. */
-#define BLOCKED_LIST 1 /* BLPOP & co. */
-#define BLOCKED_WAIT 2 /* WAIT for synchronous replication. */
-#define BLOCKED_MODULE 3 /* Blocked by a loadable module. */
-#define BLOCKED_STREAM 4 /* XREAD. */
-#define BLOCKED_ZSET 5 /* BZPOP et al. */
-#define BLOCKED_POSTPONE 6 /* Blocked by processCommand, re-try processing later. */
-#define BLOCKED_SHUTDOWN 7 /* SHUTDOWN. */
-#define BLOCKED_NUM 8 /* Number of blocked states. */
+typedef enum blocking_type {
+ BLOCKED_NONE, /* Not blocked, no CLIENT_BLOCKED flag set. */
+ BLOCKED_LIST, /* BLPOP & co. */
+ BLOCKED_WAIT, /* WAIT for synchronous replication. */
+ BLOCKED_MODULE, /* Blocked by a loadable module. */
+ BLOCKED_STREAM, /* XREAD. */
+ BLOCKED_ZSET, /* BZPOP et al. */
+ BLOCKED_POSTPONE, /* Blocked by processCommand, re-try processing later. */
+ BLOCKED_SHUTDOWN, /* SHUTDOWN. */
+ BLOCKED_NUM, /* Number of blocked states. */
+ BLOCKED_END /* End of enumeration */
+} blocking_type;
/* Client request types */
#define PROTO_REQ_INLINE 1
@@ -569,13 +576,11 @@ typedef enum {
/* Command call flags, see call() function */
#define CMD_CALL_NONE 0
-#define CMD_CALL_SLOWLOG (1<<0)
-#define CMD_CALL_STATS (1<<1)
-#define CMD_CALL_PROPAGATE_AOF (1<<2)
-#define CMD_CALL_PROPAGATE_REPL (1<<3)
+#define CMD_CALL_PROPAGATE_AOF (1<<0)
+#define CMD_CALL_PROPAGATE_REPL (1<<1)
#define CMD_CALL_PROPAGATE (CMD_CALL_PROPAGATE_AOF|CMD_CALL_PROPAGATE_REPL)
-#define CMD_CALL_FULL (CMD_CALL_SLOWLOG | CMD_CALL_STATS | CMD_CALL_PROPAGATE)
-#define CMD_CALL_FROM_MODULE (1<<4) /* From RM_Call */
+#define CMD_CALL_FULL (CMD_CALL_PROPAGATE)
+#define CMD_CALL_FROM_MODULE (1<<2) /* From RM_Call */
/* Command propagation flags, see propagateNow() function */
#define PROPAGATE_NONE 0
@@ -992,27 +997,13 @@ typedef struct multiState {
* The fields used depend on client->btype. */
typedef struct blockingState {
/* Generic fields. */
- long count; /* Elements to pop if count was specified (BLMPOP/BZMPOP), -1 otherwise. */
- mstime_t timeout; /* Blocking operation timeout. If UNIX current time
- * is > timeout then the operation timed out. */
-
- /* BLOCKED_LIST, BLOCKED_ZSET and BLOCKED_STREAM */
- dict *keys; /* The keys we are waiting to terminate a blocking
- * operation such as BLPOP or XREAD. Or NULL. */
- robj *target; /* The key that should receive the element,
- * for BLMOVE. */
- struct blockPos {
- int wherefrom; /* Where to pop from */
- int whereto; /* Where to push to */
- } blockpos; /* The positions in the src/dst lists/zsets
- * where we want to pop/push an element
- * for BLPOP, BRPOP, BLMOVE and BZMPOP. */
-
- /* BLOCK_STREAM */
- size_t xread_count; /* XREAD COUNT option. */
- robj *xread_group; /* XREADGROUP group name. */
- robj *xread_consumer; /* XREADGROUP consumer name. */
- int xread_group_noack;
+ blocking_type btype; /* Type of blocking op if CLIENT_BLOCKED. */
+ mstime_t timeout; /* Blocking operation timeout. If UNIX current time
+ * is > timeout then the operation timed out. */
+ int unblock_on_nokey; /* Whether to unblock the client when at least one of the keys
+ is deleted or does not exist anymore */
+ /* BLOCKED_LIST, BLOCKED_ZSET and BLOCKED_STREAM or any other Keys related blocking */
+ dict *keys; /* The keys we are blocked on */
/* BLOCKED_WAIT */
int numreplicas; /* Number of replicas we are waiting for ACK. */
@@ -1029,7 +1020,7 @@ typedef struct blockingState {
* operation such as B[LR]POP, but received new data in the context of the
* last executed command.
*
- * After the execution of every command or script, we run this list to check
+ * After the execution of every command or script, we iterate over this list to check
* if as a result we should serve data to clients blocked, unblocking them.
* Note that server.ready_keys will not have duplicates as there dictionary
* also called ready_keys in every structure representing a Redis database,
@@ -1167,8 +1158,7 @@ typedef struct client {
int slave_capa; /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */
int slave_req; /* Slave requirements: SLAVE_REQ_* */
multiState mstate; /* MULTI/EXEC state */
- int btype; /* Type of blocking op if CLIENT_BLOCKED. */
- blockingState bpop; /* blocking state */
+ blockingState bstate; /* blocking state */
long long woff; /* Last write global replication offset. */
list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */
dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */
@@ -2635,7 +2625,6 @@ int listTypeEqual(listTypeEntry *entry, robj *o);
void listTypeDelete(listTypeIterator *iter, listTypeEntry *entry);
robj *listTypeDup(robj *o);
void listTypeDelRange(robj *o, long start, long stop);
-void unblockClientWaitingData(client *c);
void popGenericCommand(client *c, int where);
void listElementsRemoved(client *c, robj *key, int where, robj *o, long count, int signal, int *deleted);
typedef enum {
@@ -2938,6 +2927,7 @@ int overMaxmemoryAfterAlloc(size_t moremem);
uint64_t getCommandFlags(client *c);
int processCommand(client *c);
int processPendingCommandAndInputBuffer(client *c);
+int processCommandAndResetClient(client *c);
void setupSignalHandlers(void);
void removeSignalHandlers(void);
int createSocketAcceptHandler(connListener *sfd, aeFileProc *accept_handler);
@@ -3166,6 +3156,7 @@ int objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle,
#define LOOKUP_NOSTATS (1<<2) /* Don't update keyspace hits/misses counters. */
#define LOOKUP_WRITE (1<<3) /* Delete expired keys even in replicas. */
#define LOOKUP_NOEXPIRE (1<<4) /* Avoid deleting lazy expired keys. */
+#define LOKKUP_NOEFFECTS (LOOKUP_NONOTIFY | LOOKUP_NOSTATS | LOOKUP_NOTOUCH | LOOKUP_NOEXPIRE) /* Avoid any effects from fetching the key */
void dbAdd(redisDb *db, robj *key, robj *val);
int dbAddRDBLoad(redisDb *db, sds key, robj *val);
@@ -3281,20 +3272,28 @@ typedef struct luaScript {
robj *body;
} luaScript;
-/* Blocked clients */
+/* Blocked clients API */
void processUnblockedClients(void);
+void initClientBlockingState(client *c);
void blockClient(client *c, int btype);
void unblockClient(client *c);
+void unblockClientOnTimeout(client *c);
+void unblockClientOnError(client *c, const char *err_str);
void queueClientForReprocessing(client *c);
void replyToBlockedClientTimedOut(client *c);
int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int unit);
void disconnectAllBlockedClients(void);
void handleClientsBlockedOnKeys(void);
void signalKeyAsReady(redisDb *db, robj *key, int type);
+void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, int unblock_on_nokey);
+void blockClientShutdown(client *c);
+void blockPostponeClient(client *c);
+void blockForReplication(client *c, mstime_t timeout, long long offset, long numreplicas);
void signalDeletedKeyAsReady(redisDb *db, robj *key, int type);
-void blockForKeys(client *c, int btype, robj **keys, int numkeys, long count, mstime_t timeout, robj *target, struct blockPos *blockpos, streamID *ids, int unblock_on_nokey);
void updateStatsOnUnblock(client *c, long blocked_us, long reply_us, int had_errors);
void scanDatabaseForDeletedKeys(redisDb *emptied, redisDb *replaced_with);
+void totalNumberOfBlockingKeys(unsigned long *blocking_keys, unsigned long *bloking_keys_on_nokey);
+
/* timeout.c -- Blocked clients timeout and connections timeout. */
void addClientToTimeoutTable(client *c);