diff options
Diffstat (limited to 'src/server.h')
-rw-r--r-- | src/server.h | 85 |
1 files changed, 42 insertions, 43 deletions
diff --git a/src/server.h b/src/server.h index 765b67d62..1201cab3b 100644 --- a/src/server.h +++ b/src/server.h @@ -359,7 +359,11 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT]; #define CLIENT_LUA_DEBUG_SYNC (1<<26) /* EVAL debugging without fork() */ #define CLIENT_MODULE (1<<27) /* Non connected client used by some module. */ #define CLIENT_PROTECTED (1<<28) /* Client should not be freed for now. */ -/* #define CLIENT_... (1<<29) currently unused, feel free to use in the future */ +#define CLIENT_EXECUTING_COMMAND (1<<29) /* Indicates that the client is currently in the process of handling + a command. usually this will be marked only during call() + however, blocked clients might have this flag kept until they + will try to reprocess the command. */ + #define CLIENT_PENDING_COMMAND (1<<30) /* Indicates the client has a fully * parsed command ready for execution. */ #define CLIENT_TRACKING (1ULL<<31) /* Client enabled keys tracking in order to @@ -388,15 +392,18 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT]; /* Client block type (btype field in client structure) * if CLIENT_BLOCKED flag is set. */ -#define BLOCKED_NONE 0 /* Not blocked, no CLIENT_BLOCKED flag set. */ -#define BLOCKED_LIST 1 /* BLPOP & co. */ -#define BLOCKED_WAIT 2 /* WAIT for synchronous replication. */ -#define BLOCKED_MODULE 3 /* Blocked by a loadable module. */ -#define BLOCKED_STREAM 4 /* XREAD. */ -#define BLOCKED_ZSET 5 /* BZPOP et al. */ -#define BLOCKED_POSTPONE 6 /* Blocked by processCommand, re-try processing later. */ -#define BLOCKED_SHUTDOWN 7 /* SHUTDOWN. */ -#define BLOCKED_NUM 8 /* Number of blocked states. */ +typedef enum blocking_type { + BLOCKED_NONE, /* Not blocked, no CLIENT_BLOCKED flag set. */ + BLOCKED_LIST, /* BLPOP & co. */ + BLOCKED_WAIT, /* WAIT for synchronous replication. */ + BLOCKED_MODULE, /* Blocked by a loadable module. */ + BLOCKED_STREAM, /* XREAD. */ + BLOCKED_ZSET, /* BZPOP et al. */ + BLOCKED_POSTPONE, /* Blocked by processCommand, re-try processing later. */ + BLOCKED_SHUTDOWN, /* SHUTDOWN. */ + BLOCKED_NUM, /* Number of blocked states. */ + BLOCKED_END /* End of enumeration */ +} blocking_type; /* Client request types */ #define PROTO_REQ_INLINE 1 @@ -569,13 +576,11 @@ typedef enum { /* Command call flags, see call() function */ #define CMD_CALL_NONE 0 -#define CMD_CALL_SLOWLOG (1<<0) -#define CMD_CALL_STATS (1<<1) -#define CMD_CALL_PROPAGATE_AOF (1<<2) -#define CMD_CALL_PROPAGATE_REPL (1<<3) +#define CMD_CALL_PROPAGATE_AOF (1<<0) +#define CMD_CALL_PROPAGATE_REPL (1<<1) #define CMD_CALL_PROPAGATE (CMD_CALL_PROPAGATE_AOF|CMD_CALL_PROPAGATE_REPL) -#define CMD_CALL_FULL (CMD_CALL_SLOWLOG | CMD_CALL_STATS | CMD_CALL_PROPAGATE) -#define CMD_CALL_FROM_MODULE (1<<4) /* From RM_Call */ +#define CMD_CALL_FULL (CMD_CALL_PROPAGATE) +#define CMD_CALL_FROM_MODULE (1<<2) /* From RM_Call */ /* Command propagation flags, see propagateNow() function */ #define PROPAGATE_NONE 0 @@ -992,27 +997,13 @@ typedef struct multiState { * The fields used depend on client->btype. */ typedef struct blockingState { /* Generic fields. */ - long count; /* Elements to pop if count was specified (BLMPOP/BZMPOP), -1 otherwise. */ - mstime_t timeout; /* Blocking operation timeout. If UNIX current time - * is > timeout then the operation timed out. */ - - /* BLOCKED_LIST, BLOCKED_ZSET and BLOCKED_STREAM */ - dict *keys; /* The keys we are waiting to terminate a blocking - * operation such as BLPOP or XREAD. Or NULL. */ - robj *target; /* The key that should receive the element, - * for BLMOVE. */ - struct blockPos { - int wherefrom; /* Where to pop from */ - int whereto; /* Where to push to */ - } blockpos; /* The positions in the src/dst lists/zsets - * where we want to pop/push an element - * for BLPOP, BRPOP, BLMOVE and BZMPOP. */ - - /* BLOCK_STREAM */ - size_t xread_count; /* XREAD COUNT option. */ - robj *xread_group; /* XREADGROUP group name. */ - robj *xread_consumer; /* XREADGROUP consumer name. */ - int xread_group_noack; + blocking_type btype; /* Type of blocking op if CLIENT_BLOCKED. */ + mstime_t timeout; /* Blocking operation timeout. If UNIX current time + * is > timeout then the operation timed out. */ + int unblock_on_nokey; /* Whether to unblock the client when at least one of the keys + is deleted or does not exist anymore */ + /* BLOCKED_LIST, BLOCKED_ZSET and BLOCKED_STREAM or any other Keys related blocking */ + dict *keys; /* The keys we are blocked on */ /* BLOCKED_WAIT */ int numreplicas; /* Number of replicas we are waiting for ACK. */ @@ -1029,7 +1020,7 @@ typedef struct blockingState { * operation such as B[LR]POP, but received new data in the context of the * last executed command. * - * After the execution of every command or script, we run this list to check + * After the execution of every command or script, we iterate over this list to check * if as a result we should serve data to clients blocked, unblocking them. * Note that server.ready_keys will not have duplicates as there dictionary * also called ready_keys in every structure representing a Redis database, @@ -1167,8 +1158,7 @@ typedef struct client { int slave_capa; /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */ int slave_req; /* Slave requirements: SLAVE_REQ_* */ multiState mstate; /* MULTI/EXEC state */ - int btype; /* Type of blocking op if CLIENT_BLOCKED. */ - blockingState bpop; /* blocking state */ + blockingState bstate; /* blocking state */ long long woff; /* Last write global replication offset. */ list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */ dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */ @@ -2635,7 +2625,6 @@ int listTypeEqual(listTypeEntry *entry, robj *o); void listTypeDelete(listTypeIterator *iter, listTypeEntry *entry); robj *listTypeDup(robj *o); void listTypeDelRange(robj *o, long start, long stop); -void unblockClientWaitingData(client *c); void popGenericCommand(client *c, int where); void listElementsRemoved(client *c, robj *key, int where, robj *o, long count, int signal, int *deleted); typedef enum { @@ -2938,6 +2927,7 @@ int overMaxmemoryAfterAlloc(size_t moremem); uint64_t getCommandFlags(client *c); int processCommand(client *c); int processPendingCommandAndInputBuffer(client *c); +int processCommandAndResetClient(client *c); void setupSignalHandlers(void); void removeSignalHandlers(void); int createSocketAcceptHandler(connListener *sfd, aeFileProc *accept_handler); @@ -3166,6 +3156,7 @@ int objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle, #define LOOKUP_NOSTATS (1<<2) /* Don't update keyspace hits/misses counters. */ #define LOOKUP_WRITE (1<<3) /* Delete expired keys even in replicas. */ #define LOOKUP_NOEXPIRE (1<<4) /* Avoid deleting lazy expired keys. */ +#define LOKKUP_NOEFFECTS (LOOKUP_NONOTIFY | LOOKUP_NOSTATS | LOOKUP_NOTOUCH | LOOKUP_NOEXPIRE) /* Avoid any effects from fetching the key */ void dbAdd(redisDb *db, robj *key, robj *val); int dbAddRDBLoad(redisDb *db, sds key, robj *val); @@ -3281,20 +3272,28 @@ typedef struct luaScript { robj *body; } luaScript; -/* Blocked clients */ +/* Blocked clients API */ void processUnblockedClients(void); +void initClientBlockingState(client *c); void blockClient(client *c, int btype); void unblockClient(client *c); +void unblockClientOnTimeout(client *c); +void unblockClientOnError(client *c, const char *err_str); void queueClientForReprocessing(client *c); void replyToBlockedClientTimedOut(client *c); int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int unit); void disconnectAllBlockedClients(void); void handleClientsBlockedOnKeys(void); void signalKeyAsReady(redisDb *db, robj *key, int type); +void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, int unblock_on_nokey); +void blockClientShutdown(client *c); +void blockPostponeClient(client *c); +void blockForReplication(client *c, mstime_t timeout, long long offset, long numreplicas); void signalDeletedKeyAsReady(redisDb *db, robj *key, int type); -void blockForKeys(client *c, int btype, robj **keys, int numkeys, long count, mstime_t timeout, robj *target, struct blockPos *blockpos, streamID *ids, int unblock_on_nokey); void updateStatsOnUnblock(client *c, long blocked_us, long reply_us, int had_errors); void scanDatabaseForDeletedKeys(redisDb *emptied, redisDb *replaced_with); +void totalNumberOfBlockingKeys(unsigned long *blocking_keys, unsigned long *bloking_keys_on_nokey); + /* timeout.c -- Blocked clients timeout and connections timeout. */ void addClientToTimeoutTable(client *c); |