diff options
Diffstat (limited to 'src/server.h')
-rw-r--r-- | src/server.h | 99 |
1 files changed, 77 insertions, 22 deletions
diff --git a/src/server.h b/src/server.h index 485c051fd..4da7a010f 100644 --- a/src/server.h +++ b/src/server.h @@ -57,6 +57,10 @@ #include <systemd/sd-daemon.h> #endif +#ifndef static_assert +#define static_assert(expr, lit) extern char __static_assert_failure[(expr) ? 1:-1] +#endif + typedef long long mstime_t; /* millisecond time type. */ typedef long long ustime_t; /* microsecond time type. */ @@ -105,6 +109,7 @@ typedef long long ustime_t; /* microsecond time type. */ #define PROTO_SHARED_SELECT_CMDS 10 #define OBJ_SHARED_INTEGERS 10000 #define OBJ_SHARED_BULKHDR_LEN 32 +#define OBJ_SHARED_HDR_STRLEN(_len_) (((_len_) < 10) ? 4 : 5) /* see shared.mbulkhdr etc. */ #define LOG_MAX_LEN 1024 /* Default maximum length of syslog messages.*/ #define AOF_REWRITE_ITEMS_PER_CMD 64 #define AOF_ANNOTATION_LINE_MAX_LEN 1024 @@ -159,11 +164,14 @@ typedef long long ustime_t; /* microsecond time type. */ #define PROTO_INLINE_MAX_SIZE (1024*64) /* Max size of inline reads */ #define PROTO_MBULK_BIG_ARG (1024*32) #define PROTO_RESIZE_THRESHOLD (1024*32) /* Threshold for determining whether to resize query buffer */ +#define PROTO_REPLY_MIN_BYTES (1024) /* the lower limit on reply buffer size */ #define LONG_STR_SIZE 21 /* Bytes needed for long -> str + '\0' */ #define REDIS_AUTOSYNC_BYTES (1024*1024*4) /* Sync file every 4MB. */ #define LIMIT_PENDING_QUERYBUF (4*1024*1024) /* 4mb */ +#define REPLY_BUFFER_DEFAULT_PEAK_RESET_TIME 5000 /* 5 seconds */ + /* When configuring the server eventloop, we setup it so that the total number * of file descriptors we can handle are server.maxclients + RESERVED_FDS + * a few more to stay safe. Since RESERVED_FDS defaults to 32, we add 96 @@ -210,6 +218,7 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT]; #define CMD_NO_MULTI (1ULL<<24) #define CMD_MOVABLE_KEYS (1ULL<<25) /* populated by populateCommandMovableKeys */ #define CMD_ALLOW_BUSY ((1ULL<<26)) +#define CMD_MODULE_GETCHANNELS (1ULL<<27) /* Use the modules getchannels interface. */ /* Command flags that describe ACLs categories. */ #define ACL_CATEGORY_KEYSPACE (1ULL<<0) @@ -262,12 +271,23 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT]; #define CMD_KEY_DELETE (1ULL<<7) /* Explicitly deletes some content * from the value of the key. */ /* Other flags: */ -#define CMD_KEY_CHANNEL (1ULL<<8) /* PUBSUB shard channel */ +#define CMD_KEY_NOT_KEY (1ULL<<8) /* A 'fake' key that should be routed + * like a key in cluster mode but is + * excluded from other key checks. */ #define CMD_KEY_INCOMPLETE (1ULL<<9) /* Means that the keyspec might not point * out to all keys it should cover */ #define CMD_KEY_VARIABLE_FLAGS (1ULL<<10) /* Means that some keys might have * different flags depending on arguments */ +/* Key flags for when access type is unknown */ +#define CMD_KEY_FULL_ACCESS (CMD_KEY_RW | CMD_KEY_ACCESS | CMD_KEY_UPDATE) + +/* Channel flags share the same flag space as the key flags */ +#define CMD_CHANNEL_PATTERN (1ULL<<11) /* The argument is a channel pattern */ +#define CMD_CHANNEL_SUBSCRIBE (1ULL<<12) /* The command subscribes to channels */ +#define CMD_CHANNEL_UNSUBSCRIBE (1ULL<<13) /* The command unsubscribes to channels */ +#define CMD_CHANNEL_PUBLISH (1ULL<<14) /* The command publishes to channels. */ + /* AOF states */ #define AOF_OFF 0 /* AOF is off */ #define AOF_ON 1 /* AOF is on */ @@ -1076,6 +1096,9 @@ typedef struct client { robj **original_argv; /* Arguments of original command if arguments were rewritten. */ size_t argv_len_sum; /* Sum of lengths of objects in argv list. */ struct redisCommand *cmd, *lastcmd; /* Last command executed. */ + struct redisCommand *realcmd; /* The original command that was executed by the client, + Used to update error stats in case the c->cmd was modified + during the command invocation (like on GEOADD for example). */ user *user; /* User associated with this connection. If the user is set to NULL the connection can do anything (admin). */ @@ -1084,6 +1107,7 @@ typedef struct client { long bulklen; /* Length of bulk argument in multi bulk request. */ list *reply; /* List of reply objects to send to the client. */ unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */ + list *deferred_reply_errors; /* Used for module thread safe contexts. */ size_t sentlen; /* Amount of bytes already sent in the current buffer or object being sent. */ time_t ctime; /* Client creation time. */ @@ -1159,14 +1183,11 @@ typedef struct client { * i.e. the next offset to send. */ /* Response buffer */ + size_t buf_peak; /* Peak used size of buffer in last 5 sec interval. */ + mstime_t buf_peak_last_reset_time; /* keeps the last time the buffer peak value was reset */ int bufpos; size_t buf_usable_size; /* Usable size of buffer. */ - /* Note that 'buf' must be the last field of client struct, because memory - * allocator may give us more memory than our apply for reducing fragments, - * but we want to make full use of given memory, i.e. we may access the - * memory after 'buf'. To avoid make others fields corrupt, 'buf' must be - * the last one. */ - char buf[PROTO_REPLY_CHUNK_BYTES]; + char *buf; } client; struct saveparam { @@ -1205,14 +1226,16 @@ struct sharedObjectsStruct { *rpop, *lpop, *lpush, *rpoplpush, *lmove, *blmove, *zpopmin, *zpopmax, *emptyscan, *multi, *exec, *left, *right, *hset, *srem, *xgroup, *xclaim, *script, *replconf, *eval, *persist, *set, *pexpireat, *pexpire, - *time, *pxat, *absttl, *retrycount, *force, *justid, + *time, *pxat, *absttl, *retrycount, *force, *justid, *entriesread, *lastid, *ping, *setid, *keepttl, *load, *createconsumer, *getack, *special_asterick, *special_equals, *default_username, *redacted, *ssubscribebulk,*sunsubscribebulk, *select[PROTO_SHARED_SELECT_CMDS], *integers[OBJ_SHARED_INTEGERS], *mbulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "*<value>\r\n" */ - *bulkhdr[OBJ_SHARED_BULKHDR_LEN]; /* "$<value>\r\n" */ + *bulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "$<value>\r\n" */ + *maphdr[OBJ_SHARED_BULKHDR_LEN], /* "%<value>\r\n" */ + *sethdr[OBJ_SHARED_BULKHDR_LEN]; /* "~<value>\r\n" */ sds minstring, maxstring; }; @@ -1536,6 +1559,8 @@ struct redisServer { long long stat_total_active_defrag_time; /* Total time memory fragmentation over the limit, unit us */ monotime stat_last_active_defrag_time; /* Timestamp of current active defrag start */ size_t stat_peak_memory; /* Max used memory record */ + long long stat_aof_rewrites; /* number of aof file rewrites performed */ + long long stat_rdb_saves; /* number of rdb saves performed */ long long stat_fork_time; /* Time needed to perform latest fork() */ double stat_fork_rate; /* Fork rate in GB/sec. */ long long stat_total_forks; /* Total count of fork. */ @@ -1576,6 +1601,9 @@ struct redisServer { long long samples[STATS_METRIC_SAMPLES]; int idx; } inst_metric[STATS_METRIC_COUNT]; + long long stat_reply_buffer_shrinks; /* Total number of output buffer shrinks */ + long long stat_reply_buffer_expands; /* Total number of output buffer expands */ + /* Configuration */ int verbosity; /* Loglevel in redis.conf */ int maxidletime; /* Client timeout in seconds */ @@ -1635,7 +1663,7 @@ struct redisServer { int aof_last_write_status; /* C_OK or C_ERR */ int aof_last_write_errno; /* Valid if aof write/fsync status is ERR */ int aof_load_truncated; /* Don't stop on unexpected AOF EOF. */ - int aof_use_rdb_preamble; /* Use RDB preamble on AOF rewrites. */ + int aof_use_rdb_preamble; /* Specify base AOF to use RDB encoding on AOF rewrites. */ redisAtomic int aof_bio_fsync_status; /* Status of AOF fsync in bio job. */ redisAtomic int aof_bio_fsync_errno; /* Errno of AOF fsync in bio job. */ aofManifest *aof_manifest; /* Used to track AOFs. */ @@ -1752,10 +1780,6 @@ struct redisServer { char master_replid[CONFIG_RUN_ID_SIZE+1]; /* Master PSYNC runid. */ long long master_initial_offset; /* Master PSYNC offset. */ int repl_slave_lazy_flush; /* Lazy FLUSHALL before loading DB? */ - /* Replication script cache. */ - dict *repl_scriptcache_dict; /* SHA1 all slaves are aware of. */ - list *repl_scriptcache_fifo; /* First in, first out LRU eviction. */ - unsigned int repl_scriptcache_size; /* Max number of elements. */ /* Synchronous replication. */ list *clients_waiting_acks; /* Clients waiting in WAIT command. */ int get_ack_from_slaves; /* If true we send REPLCONF GETACK. */ @@ -1889,6 +1913,7 @@ struct redisServer { int failover_state; /* Failover state */ int cluster_allow_pubsubshard_when_down; /* Is pubsubshard allowed when the cluster is down, doesn't affect pubsub global. */ + long reply_buffer_peak_reset_time; /* The amount of time (in milliseconds) to wait between reply buffer peak resets */ }; #define MAX_KEYS_BUFFER 256 @@ -1900,7 +1925,8 @@ typedef struct { } keyReference; /* A result structure for the various getkeys function calls. It lists the - * keys as indices to the provided argv. + * keys as indices to the provided argv. This functionality is also re-used + * for returning channel information. */ typedef struct { keyReference keysbuf[MAX_KEYS_BUFFER]; /* Pre-allocated buffer, to save heap allocations */ @@ -2294,11 +2320,12 @@ extern struct sharedObjectsStruct shared; extern dictType objectKeyPointerValueDictType; extern dictType objectKeyHeapPointerValueDictType; extern dictType setDictType; +extern dictType BenchmarkDictType; extern dictType zsetDictType; extern dictType dbDictType; extern double R_Zero, R_PosInf, R_NegInf, R_Nan; extern dictType hashDictType; -extern dictType replScriptCacheDictType; +extern dictType stringSetDictType; extern dictType dbExpiresDictType; extern dictType modulesDictType; extern dictType sdsReplyDictType; @@ -2308,8 +2335,9 @@ extern dict *modules; * Functions prototypes *----------------------------------------------------------------------------*/ -/* Key arguments specs */ +/* Command metadata */ void populateCommandLegacyRangeSpec(struct redisCommand *c); +int populateArgsStructure(struct redisCommandArg *args); /* Modules */ void moduleInitModulesSystem(void); @@ -2318,6 +2346,7 @@ void modulesCron(void); int moduleLoad(const char *path, void **argv, int argc); void moduleLoadFromQueue(void); int moduleGetCommandKeysViaAPI(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result); +int moduleGetCommandChannelsViaAPI(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result); moduleType *moduleTypeLookupModuleByID(uint64_t id); void moduleTypeNameByID(char *name, uint64_t moduleid); const char *moduleTypeModuleName(moduleType *mt); @@ -2337,7 +2366,7 @@ int TerminateModuleForkChild(int child_pid, int wait); ssize_t rdbSaveModulesAux(rio *rdb, int when); int moduleAllDatatypesHandleErrors(); int moduleAllModulesHandleReplAsyncLoad(); -sds modulesCollectInfo(sds info, const char *section, int for_crash_report, int sections); +sds modulesCollectInfo(sds info, dict *sections_dict, int for_crash_report, int sections); void moduleFireServerEvent(uint64_t eid, int subid, void *data); void processModuleLoadingProgressEvent(int is_aof); int moduleTryServeClientBlockedOnKey(client *c, robj *key); @@ -2368,6 +2397,9 @@ int validateProcTitleTemplate(const char *template); int redisCommunicateSystemd(const char *sd_notify_msg); void redisSetCpuAffinity(const char *cpulist); +/* afterErrorReply flags */ +#define ERR_REPLY_FLAG_NO_STATS_UPDATE (1ULL<<0) /* Indicating that we should not update + error stats after sending error reply */ /* networking.c -- Networking and Client related operations */ client *createClient(connection *conn); void freeClient(client *c); @@ -2377,6 +2409,7 @@ int beforeNextClient(client *c); void clearClientConnectionState(client *c); void resetClient(client *c); void freeClientOriginalArgv(client *c); +void freeClientArgv(client *c); void sendReplyToClient(connection *conn); void *addReplyDeferredLen(client *c); void setDeferredArrayLen(client *c, void *node, long length); @@ -2406,6 +2439,8 @@ void addReplyBulkSds(client *c, sds s); void setDeferredReplyBulkSds(client *c, void *node, sds s); void addReplyErrorObject(client *c, robj *err); void addReplyOrErrorObject(client *c, robj *reply); +void afterErrorReply(client *c, const char *s, size_t len, int flags); +void addReplyErrorSdsEx(client *c, sds err, int flags); void addReplyErrorSds(client *c, sds err); void addReplyError(client *c, const char *err); void addReplyErrorArity(client *c); @@ -2426,6 +2461,7 @@ void addReplySubcommandSyntaxError(client *c); void addReplyLoadedModules(client *c); void copyReplicaOutputBuffer(client *dst, client *src); void addListRangeReply(client *c, robj *o, long start, long end, int reverse); +void deferredAfterErrorReply(client *c, list *errors); size_t sdsZmallocSize(sds s); size_t getStringObjectSdsUsedMemory(robj *o); void freeClientReplyValue(void *o); @@ -2477,11 +2513,14 @@ int authRequired(client *c); void clientInstallWriteHandler(client *c); #ifdef __GNUC__ +void addReplyErrorFormatEx(client *c, int flags, const char *fmt, ...) + __attribute__((format(printf, 3, 4))); void addReplyErrorFormat(client *c, const char *fmt, ...) __attribute__((format(printf, 2, 3))); void addReplyStatusFormat(client *c, const char *fmt, ...) __attribute__((format(printf, 2, 3))); #else +void addReplyErrorFormatEx(client *c, int flags, const char *fmt, ...); void addReplyErrorFormat(client *c, const char *fmt, ...); void addReplyStatusFormat(client *c, const char *fmt, ...); #endif @@ -2760,6 +2799,10 @@ typedef struct { int minex, maxex; /* are min or max exclusive? */ } zlexrangespec; +/* flags for incrCommandFailedCalls */ +#define ERROR_COMMAND_REJECTED (1<<0) /* Indicate to update the command rejected stats */ +#define ERROR_COMMAND_FAILED (1<<1) /* Indicate to update the command failed stats */ + zskiplist *zslCreate(void); void zslFree(zskiplist *zsl); zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele); @@ -2814,6 +2857,8 @@ struct redisCommand *lookupCommandBySds(sds s); struct redisCommand *lookupCommandByCStringLogic(dict *commands, const char *s); struct redisCommand *lookupCommandByCString(const char *s); struct redisCommand *lookupCommandOrOriginal(robj **argv, int argc); +void startCommandExecution(); +int incrCommandStatsOnError(struct redisCommand *cmd, int flags); void call(client *c, int flags); void alsoPropagate(int dbid, robj **argv, int argc, int target); void propagatePendingCommands(); @@ -2932,7 +2977,7 @@ void resetServerSaveParams(void); struct rewriteConfigState; /* Forward declaration to export API. */ void rewriteConfigRewriteLine(struct rewriteConfigState *state, const char *option, sds line, int force); void rewriteConfigMarkAsProcessed(struct rewriteConfigState *state, const char *option); -int rewriteConfig(char *path, int force_all); +int rewriteConfig(char *path, int force_write); void initConfigValues(); sds getConfigDebugInfo(); int allowProtectedAction(int config, client *c); @@ -3001,13 +3046,15 @@ void freeReplicationBacklogRefMemAsync(list *blocks, rax *index); /* API to get key arguments from commands */ #define GET_KEYSPEC_DEFAULT 0 -#define GET_KEYSPEC_INCLUDE_CHANNELS (1<<0) /* Consider channels as keys */ +#define GET_KEYSPEC_INCLUDE_NOT_KEYS (1<<0) /* Consider 'fake' keys as keys */ #define GET_KEYSPEC_RETURN_PARTIAL (1<<1) /* Return all keys that can be found */ int getKeysFromCommandWithSpecs(struct redisCommand *cmd, robj **argv, int argc, int search_flags, getKeysResult *result); keyReference *getKeysPrepareResult(getKeysResult *result, int numkeys); int getKeysFromCommand(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result); int doesCommandHaveKeys(struct redisCommand *cmd); +int getChannelsFromCommand(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result); +int doesCommandHaveChannelsWithFlags(struct redisCommand *cmd, int flags); void getKeysFreeResult(getKeysResult *result); int sintercardGetKeys(struct redisCommand *cmd,robj **argv, int argc, getKeysResult *result); int zunionInterDiffGetKeys(struct redisCommand *cmd,robj **argv, int argc, getKeysResult *result); @@ -3064,6 +3111,11 @@ unsigned long evalMemory(); dict* evalScriptsDict(); unsigned long evalScriptsMemory(); +typedef struct luaScript { + uint64_t flags; + robj *body; +} luaScript; + /* Blocked clients */ void processUnblockedClients(void); void blockClient(client *c, int btype); @@ -3075,7 +3127,7 @@ void disconnectAllBlockedClients(void); void handleClientsBlockedOnKeys(void); void signalKeyAsReady(redisDb *db, robj *key, int type); void blockForKeys(client *c, int btype, robj **keys, int numkeys, long count, mstime_t timeout, robj *target, struct blockPos *blockpos, streamID *ids); -void updateStatsOnUnblock(client *c, long blocked_us, long reply_us); +void updateStatsOnUnblock(client *c, long blocked_us, long reply_us, int had_errors); /* timeout.c -- Blocked clients timeout and connections timeout. */ void addClientToTimeoutTable(client *c); @@ -3125,6 +3177,7 @@ void commandCountCommand(client *c); void commandListCommand(client *c); void commandInfoCommand(client *c); void commandGetKeysCommand(client *c); +void commandGetKeysAndFlagsCommand(client *c); void commandHelpCommand(client *c); void commandDocsCommand(client *c); void setCommand(client *c); @@ -3390,7 +3443,9 @@ void _serverPanic(const char *file, int line, const char *msg, ...); void serverLogObjectDebugInfo(const robj *o); void sigsegvHandler(int sig, siginfo_t *info, void *secret); const char *getSafeInfoString(const char *s, size_t len, char **tmp); -sds genRedisInfoString(const char *section); +dict *genInfoSectionDict(robj **argv, int argc, char **defaults, int *out_all, int *out_everything); +void releaseInfoSectionDict(dict *sec); +sds genRedisInfoString(dict *section_dict, int all_sections, int everything); sds genModulesInfoString(sds info); void applyWatchdogPeriod(); void watchdogScheduleSignal(int period); |