summaryrefslogtreecommitdiff
path: root/src/server.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/server.h')
-rw-r--r--src/server.h99
1 files changed, 77 insertions, 22 deletions
diff --git a/src/server.h b/src/server.h
index 485c051fd..4da7a010f 100644
--- a/src/server.h
+++ b/src/server.h
@@ -57,6 +57,10 @@
#include <systemd/sd-daemon.h>
#endif
+#ifndef static_assert
+#define static_assert(expr, lit) extern char __static_assert_failure[(expr) ? 1:-1]
+#endif
+
typedef long long mstime_t; /* millisecond time type. */
typedef long long ustime_t; /* microsecond time type. */
@@ -105,6 +109,7 @@ typedef long long ustime_t; /* microsecond time type. */
#define PROTO_SHARED_SELECT_CMDS 10
#define OBJ_SHARED_INTEGERS 10000
#define OBJ_SHARED_BULKHDR_LEN 32
+#define OBJ_SHARED_HDR_STRLEN(_len_) (((_len_) < 10) ? 4 : 5) /* see shared.mbulkhdr etc. */
#define LOG_MAX_LEN 1024 /* Default maximum length of syslog messages.*/
#define AOF_REWRITE_ITEMS_PER_CMD 64
#define AOF_ANNOTATION_LINE_MAX_LEN 1024
@@ -159,11 +164,14 @@ typedef long long ustime_t; /* microsecond time type. */
#define PROTO_INLINE_MAX_SIZE (1024*64) /* Max size of inline reads */
#define PROTO_MBULK_BIG_ARG (1024*32)
#define PROTO_RESIZE_THRESHOLD (1024*32) /* Threshold for determining whether to resize query buffer */
+#define PROTO_REPLY_MIN_BYTES (1024) /* the lower limit on reply buffer size */
#define LONG_STR_SIZE 21 /* Bytes needed for long -> str + '\0' */
#define REDIS_AUTOSYNC_BYTES (1024*1024*4) /* Sync file every 4MB. */
#define LIMIT_PENDING_QUERYBUF (4*1024*1024) /* 4mb */
+#define REPLY_BUFFER_DEFAULT_PEAK_RESET_TIME 5000 /* 5 seconds */
+
/* When configuring the server eventloop, we setup it so that the total number
* of file descriptors we can handle are server.maxclients + RESERVED_FDS +
* a few more to stay safe. Since RESERVED_FDS defaults to 32, we add 96
@@ -210,6 +218,7 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
#define CMD_NO_MULTI (1ULL<<24)
#define CMD_MOVABLE_KEYS (1ULL<<25) /* populated by populateCommandMovableKeys */
#define CMD_ALLOW_BUSY ((1ULL<<26))
+#define CMD_MODULE_GETCHANNELS (1ULL<<27) /* Use the modules getchannels interface. */
/* Command flags that describe ACLs categories. */
#define ACL_CATEGORY_KEYSPACE (1ULL<<0)
@@ -262,12 +271,23 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
#define CMD_KEY_DELETE (1ULL<<7) /* Explicitly deletes some content
* from the value of the key. */
/* Other flags: */
-#define CMD_KEY_CHANNEL (1ULL<<8) /* PUBSUB shard channel */
+#define CMD_KEY_NOT_KEY (1ULL<<8) /* A 'fake' key that should be routed
+ * like a key in cluster mode but is
+ * excluded from other key checks. */
#define CMD_KEY_INCOMPLETE (1ULL<<9) /* Means that the keyspec might not point
* out to all keys it should cover */
#define CMD_KEY_VARIABLE_FLAGS (1ULL<<10) /* Means that some keys might have
* different flags depending on arguments */
+/* Key flags for when access type is unknown */
+#define CMD_KEY_FULL_ACCESS (CMD_KEY_RW | CMD_KEY_ACCESS | CMD_KEY_UPDATE)
+
+/* Channel flags share the same flag space as the key flags */
+#define CMD_CHANNEL_PATTERN (1ULL<<11) /* The argument is a channel pattern */
+#define CMD_CHANNEL_SUBSCRIBE (1ULL<<12) /* The command subscribes to channels */
+#define CMD_CHANNEL_UNSUBSCRIBE (1ULL<<13) /* The command unsubscribes to channels */
+#define CMD_CHANNEL_PUBLISH (1ULL<<14) /* The command publishes to channels. */
+
/* AOF states */
#define AOF_OFF 0 /* AOF is off */
#define AOF_ON 1 /* AOF is on */
@@ -1076,6 +1096,9 @@ typedef struct client {
robj **original_argv; /* Arguments of original command if arguments were rewritten. */
size_t argv_len_sum; /* Sum of lengths of objects in argv list. */
struct redisCommand *cmd, *lastcmd; /* Last command executed. */
+ struct redisCommand *realcmd; /* The original command that was executed by the client,
+ Used to update error stats in case the c->cmd was modified
+ during the command invocation (like on GEOADD for example). */
user *user; /* User associated with this connection. If the
user is set to NULL the connection can do
anything (admin). */
@@ -1084,6 +1107,7 @@ typedef struct client {
long bulklen; /* Length of bulk argument in multi bulk request. */
list *reply; /* List of reply objects to send to the client. */
unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */
+ list *deferred_reply_errors; /* Used for module thread safe contexts. */
size_t sentlen; /* Amount of bytes already sent in the current
buffer or object being sent. */
time_t ctime; /* Client creation time. */
@@ -1159,14 +1183,11 @@ typedef struct client {
* i.e. the next offset to send. */
/* Response buffer */
+ size_t buf_peak; /* Peak used size of buffer in last 5 sec interval. */
+ mstime_t buf_peak_last_reset_time; /* keeps the last time the buffer peak value was reset */
int bufpos;
size_t buf_usable_size; /* Usable size of buffer. */
- /* Note that 'buf' must be the last field of client struct, because memory
- * allocator may give us more memory than our apply for reducing fragments,
- * but we want to make full use of given memory, i.e. we may access the
- * memory after 'buf'. To avoid make others fields corrupt, 'buf' must be
- * the last one. */
- char buf[PROTO_REPLY_CHUNK_BYTES];
+ char *buf;
} client;
struct saveparam {
@@ -1205,14 +1226,16 @@ struct sharedObjectsStruct {
*rpop, *lpop, *lpush, *rpoplpush, *lmove, *blmove, *zpopmin, *zpopmax,
*emptyscan, *multi, *exec, *left, *right, *hset, *srem, *xgroup, *xclaim,
*script, *replconf, *eval, *persist, *set, *pexpireat, *pexpire,
- *time, *pxat, *absttl, *retrycount, *force, *justid,
+ *time, *pxat, *absttl, *retrycount, *force, *justid, *entriesread,
*lastid, *ping, *setid, *keepttl, *load, *createconsumer,
*getack, *special_asterick, *special_equals, *default_username, *redacted,
*ssubscribebulk,*sunsubscribebulk,
*select[PROTO_SHARED_SELECT_CMDS],
*integers[OBJ_SHARED_INTEGERS],
*mbulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "*<value>\r\n" */
- *bulkhdr[OBJ_SHARED_BULKHDR_LEN]; /* "$<value>\r\n" */
+ *bulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "$<value>\r\n" */
+ *maphdr[OBJ_SHARED_BULKHDR_LEN], /* "%<value>\r\n" */
+ *sethdr[OBJ_SHARED_BULKHDR_LEN]; /* "~<value>\r\n" */
sds minstring, maxstring;
};
@@ -1536,6 +1559,8 @@ struct redisServer {
long long stat_total_active_defrag_time; /* Total time memory fragmentation over the limit, unit us */
monotime stat_last_active_defrag_time; /* Timestamp of current active defrag start */
size_t stat_peak_memory; /* Max used memory record */
+ long long stat_aof_rewrites; /* number of aof file rewrites performed */
+ long long stat_rdb_saves; /* number of rdb saves performed */
long long stat_fork_time; /* Time needed to perform latest fork() */
double stat_fork_rate; /* Fork rate in GB/sec. */
long long stat_total_forks; /* Total count of fork. */
@@ -1576,6 +1601,9 @@ struct redisServer {
long long samples[STATS_METRIC_SAMPLES];
int idx;
} inst_metric[STATS_METRIC_COUNT];
+ long long stat_reply_buffer_shrinks; /* Total number of output buffer shrinks */
+ long long stat_reply_buffer_expands; /* Total number of output buffer expands */
+
/* Configuration */
int verbosity; /* Loglevel in redis.conf */
int maxidletime; /* Client timeout in seconds */
@@ -1635,7 +1663,7 @@ struct redisServer {
int aof_last_write_status; /* C_OK or C_ERR */
int aof_last_write_errno; /* Valid if aof write/fsync status is ERR */
int aof_load_truncated; /* Don't stop on unexpected AOF EOF. */
- int aof_use_rdb_preamble; /* Use RDB preamble on AOF rewrites. */
+ int aof_use_rdb_preamble; /* Specify base AOF to use RDB encoding on AOF rewrites. */
redisAtomic int aof_bio_fsync_status; /* Status of AOF fsync in bio job. */
redisAtomic int aof_bio_fsync_errno; /* Errno of AOF fsync in bio job. */
aofManifest *aof_manifest; /* Used to track AOFs. */
@@ -1752,10 +1780,6 @@ struct redisServer {
char master_replid[CONFIG_RUN_ID_SIZE+1]; /* Master PSYNC runid. */
long long master_initial_offset; /* Master PSYNC offset. */
int repl_slave_lazy_flush; /* Lazy FLUSHALL before loading DB? */
- /* Replication script cache. */
- dict *repl_scriptcache_dict; /* SHA1 all slaves are aware of. */
- list *repl_scriptcache_fifo; /* First in, first out LRU eviction. */
- unsigned int repl_scriptcache_size; /* Max number of elements. */
/* Synchronous replication. */
list *clients_waiting_acks; /* Clients waiting in WAIT command. */
int get_ack_from_slaves; /* If true we send REPLCONF GETACK. */
@@ -1889,6 +1913,7 @@ struct redisServer {
int failover_state; /* Failover state */
int cluster_allow_pubsubshard_when_down; /* Is pubsubshard allowed when the cluster
is down, doesn't affect pubsub global. */
+ long reply_buffer_peak_reset_time; /* The amount of time (in milliseconds) to wait between reply buffer peak resets */
};
#define MAX_KEYS_BUFFER 256
@@ -1900,7 +1925,8 @@ typedef struct {
} keyReference;
/* A result structure for the various getkeys function calls. It lists the
- * keys as indices to the provided argv.
+ * keys as indices to the provided argv. This functionality is also re-used
+ * for returning channel information.
*/
typedef struct {
keyReference keysbuf[MAX_KEYS_BUFFER]; /* Pre-allocated buffer, to save heap allocations */
@@ -2294,11 +2320,12 @@ extern struct sharedObjectsStruct shared;
extern dictType objectKeyPointerValueDictType;
extern dictType objectKeyHeapPointerValueDictType;
extern dictType setDictType;
+extern dictType BenchmarkDictType;
extern dictType zsetDictType;
extern dictType dbDictType;
extern double R_Zero, R_PosInf, R_NegInf, R_Nan;
extern dictType hashDictType;
-extern dictType replScriptCacheDictType;
+extern dictType stringSetDictType;
extern dictType dbExpiresDictType;
extern dictType modulesDictType;
extern dictType sdsReplyDictType;
@@ -2308,8 +2335,9 @@ extern dict *modules;
* Functions prototypes
*----------------------------------------------------------------------------*/
-/* Key arguments specs */
+/* Command metadata */
void populateCommandLegacyRangeSpec(struct redisCommand *c);
+int populateArgsStructure(struct redisCommandArg *args);
/* Modules */
void moduleInitModulesSystem(void);
@@ -2318,6 +2346,7 @@ void modulesCron(void);
int moduleLoad(const char *path, void **argv, int argc);
void moduleLoadFromQueue(void);
int moduleGetCommandKeysViaAPI(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result);
+int moduleGetCommandChannelsViaAPI(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result);
moduleType *moduleTypeLookupModuleByID(uint64_t id);
void moduleTypeNameByID(char *name, uint64_t moduleid);
const char *moduleTypeModuleName(moduleType *mt);
@@ -2337,7 +2366,7 @@ int TerminateModuleForkChild(int child_pid, int wait);
ssize_t rdbSaveModulesAux(rio *rdb, int when);
int moduleAllDatatypesHandleErrors();
int moduleAllModulesHandleReplAsyncLoad();
-sds modulesCollectInfo(sds info, const char *section, int for_crash_report, int sections);
+sds modulesCollectInfo(sds info, dict *sections_dict, int for_crash_report, int sections);
void moduleFireServerEvent(uint64_t eid, int subid, void *data);
void processModuleLoadingProgressEvent(int is_aof);
int moduleTryServeClientBlockedOnKey(client *c, robj *key);
@@ -2368,6 +2397,9 @@ int validateProcTitleTemplate(const char *template);
int redisCommunicateSystemd(const char *sd_notify_msg);
void redisSetCpuAffinity(const char *cpulist);
+/* afterErrorReply flags */
+#define ERR_REPLY_FLAG_NO_STATS_UPDATE (1ULL<<0) /* Indicating that we should not update
+ error stats after sending error reply */
/* networking.c -- Networking and Client related operations */
client *createClient(connection *conn);
void freeClient(client *c);
@@ -2377,6 +2409,7 @@ int beforeNextClient(client *c);
void clearClientConnectionState(client *c);
void resetClient(client *c);
void freeClientOriginalArgv(client *c);
+void freeClientArgv(client *c);
void sendReplyToClient(connection *conn);
void *addReplyDeferredLen(client *c);
void setDeferredArrayLen(client *c, void *node, long length);
@@ -2406,6 +2439,8 @@ void addReplyBulkSds(client *c, sds s);
void setDeferredReplyBulkSds(client *c, void *node, sds s);
void addReplyErrorObject(client *c, robj *err);
void addReplyOrErrorObject(client *c, robj *reply);
+void afterErrorReply(client *c, const char *s, size_t len, int flags);
+void addReplyErrorSdsEx(client *c, sds err, int flags);
void addReplyErrorSds(client *c, sds err);
void addReplyError(client *c, const char *err);
void addReplyErrorArity(client *c);
@@ -2426,6 +2461,7 @@ void addReplySubcommandSyntaxError(client *c);
void addReplyLoadedModules(client *c);
void copyReplicaOutputBuffer(client *dst, client *src);
void addListRangeReply(client *c, robj *o, long start, long end, int reverse);
+void deferredAfterErrorReply(client *c, list *errors);
size_t sdsZmallocSize(sds s);
size_t getStringObjectSdsUsedMemory(robj *o);
void freeClientReplyValue(void *o);
@@ -2477,11 +2513,14 @@ int authRequired(client *c);
void clientInstallWriteHandler(client *c);
#ifdef __GNUC__
+void addReplyErrorFormatEx(client *c, int flags, const char *fmt, ...)
+ __attribute__((format(printf, 3, 4)));
void addReplyErrorFormat(client *c, const char *fmt, ...)
__attribute__((format(printf, 2, 3)));
void addReplyStatusFormat(client *c, const char *fmt, ...)
__attribute__((format(printf, 2, 3)));
#else
+void addReplyErrorFormatEx(client *c, int flags, const char *fmt, ...);
void addReplyErrorFormat(client *c, const char *fmt, ...);
void addReplyStatusFormat(client *c, const char *fmt, ...);
#endif
@@ -2760,6 +2799,10 @@ typedef struct {
int minex, maxex; /* are min or max exclusive? */
} zlexrangespec;
+/* flags for incrCommandFailedCalls */
+#define ERROR_COMMAND_REJECTED (1<<0) /* Indicate to update the command rejected stats */
+#define ERROR_COMMAND_FAILED (1<<1) /* Indicate to update the command failed stats */
+
zskiplist *zslCreate(void);
void zslFree(zskiplist *zsl);
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele);
@@ -2814,6 +2857,8 @@ struct redisCommand *lookupCommandBySds(sds s);
struct redisCommand *lookupCommandByCStringLogic(dict *commands, const char *s);
struct redisCommand *lookupCommandByCString(const char *s);
struct redisCommand *lookupCommandOrOriginal(robj **argv, int argc);
+void startCommandExecution();
+int incrCommandStatsOnError(struct redisCommand *cmd, int flags);
void call(client *c, int flags);
void alsoPropagate(int dbid, robj **argv, int argc, int target);
void propagatePendingCommands();
@@ -2932,7 +2977,7 @@ void resetServerSaveParams(void);
struct rewriteConfigState; /* Forward declaration to export API. */
void rewriteConfigRewriteLine(struct rewriteConfigState *state, const char *option, sds line, int force);
void rewriteConfigMarkAsProcessed(struct rewriteConfigState *state, const char *option);
-int rewriteConfig(char *path, int force_all);
+int rewriteConfig(char *path, int force_write);
void initConfigValues();
sds getConfigDebugInfo();
int allowProtectedAction(int config, client *c);
@@ -3001,13 +3046,15 @@ void freeReplicationBacklogRefMemAsync(list *blocks, rax *index);
/* API to get key arguments from commands */
#define GET_KEYSPEC_DEFAULT 0
-#define GET_KEYSPEC_INCLUDE_CHANNELS (1<<0) /* Consider channels as keys */
+#define GET_KEYSPEC_INCLUDE_NOT_KEYS (1<<0) /* Consider 'fake' keys as keys */
#define GET_KEYSPEC_RETURN_PARTIAL (1<<1) /* Return all keys that can be found */
int getKeysFromCommandWithSpecs(struct redisCommand *cmd, robj **argv, int argc, int search_flags, getKeysResult *result);
keyReference *getKeysPrepareResult(getKeysResult *result, int numkeys);
int getKeysFromCommand(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result);
int doesCommandHaveKeys(struct redisCommand *cmd);
+int getChannelsFromCommand(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result);
+int doesCommandHaveChannelsWithFlags(struct redisCommand *cmd, int flags);
void getKeysFreeResult(getKeysResult *result);
int sintercardGetKeys(struct redisCommand *cmd,robj **argv, int argc, getKeysResult *result);
int zunionInterDiffGetKeys(struct redisCommand *cmd,robj **argv, int argc, getKeysResult *result);
@@ -3064,6 +3111,11 @@ unsigned long evalMemory();
dict* evalScriptsDict();
unsigned long evalScriptsMemory();
+typedef struct luaScript {
+ uint64_t flags;
+ robj *body;
+} luaScript;
+
/* Blocked clients */
void processUnblockedClients(void);
void blockClient(client *c, int btype);
@@ -3075,7 +3127,7 @@ void disconnectAllBlockedClients(void);
void handleClientsBlockedOnKeys(void);
void signalKeyAsReady(redisDb *db, robj *key, int type);
void blockForKeys(client *c, int btype, robj **keys, int numkeys, long count, mstime_t timeout, robj *target, struct blockPos *blockpos, streamID *ids);
-void updateStatsOnUnblock(client *c, long blocked_us, long reply_us);
+void updateStatsOnUnblock(client *c, long blocked_us, long reply_us, int had_errors);
/* timeout.c -- Blocked clients timeout and connections timeout. */
void addClientToTimeoutTable(client *c);
@@ -3125,6 +3177,7 @@ void commandCountCommand(client *c);
void commandListCommand(client *c);
void commandInfoCommand(client *c);
void commandGetKeysCommand(client *c);
+void commandGetKeysAndFlagsCommand(client *c);
void commandHelpCommand(client *c);
void commandDocsCommand(client *c);
void setCommand(client *c);
@@ -3390,7 +3443,9 @@ void _serverPanic(const char *file, int line, const char *msg, ...);
void serverLogObjectDebugInfo(const robj *o);
void sigsegvHandler(int sig, siginfo_t *info, void *secret);
const char *getSafeInfoString(const char *s, size_t len, char **tmp);
-sds genRedisInfoString(const char *section);
+dict *genInfoSectionDict(robj **argv, int argc, char **defaults, int *out_all, int *out_everything);
+void releaseInfoSectionDict(dict *sec);
+sds genRedisInfoString(dict *section_dict, int all_sections, int everything);
sds genModulesInfoString(sds info);
void applyWatchdogPeriod();
void watchdogScheduleSignal(int period);