summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorArun Ranganathan <arun.ranga@hotmail.ca>2020-07-29 01:46:44 -0400
committerGitHub <noreply@github.com>2020-07-29 08:46:44 +0300
commitf6cad30bb69b2ad35bb0a870077fac2d4605d727 (patch)
treeb02a2bb74a44b6425a4a93579df877aecc3c845d /src
parent63dae5232415d216dfc1acce8b5335e20aa3b178 (diff)
downloadredis-f6cad30bb69b2ad35bb0a870077fac2d4605d727.tar.gz
Show threading configuration in INFO output (#7446)
Co-authored-by: Oran Agra <oran@redislabs.com>
Diffstat (limited to 'src')
-rw-r--r--src/networking.c33
-rw-r--r--src/server.c22
-rw-r--r--src/server.h5
3 files changed, 46 insertions, 14 deletions
diff --git a/src/networking.c b/src/networking.c
index a3c04efa6..0da9bbc9c 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -1295,6 +1295,9 @@ client *lookupClientByID(uint64_t id) {
* set to 0. So when handler_installed is set to 0 the function must be
* thread safe. */
int writeToClient(client *c, int handler_installed) {
+ /* Update total number of writes on server */
+ server.stat_total_writes_processed++;
+
ssize_t nwritten = 0, totwritten = 0;
size_t objlen;
clientReplyBlock *o;
@@ -1910,6 +1913,9 @@ void readQueryFromClient(connection *conn) {
* the event loop. This is the case if threaded I/O is enabled. */
if (postponeClientRead(c)) return;
+ /* Update total number of reads on server */
+ server.stat_total_reads_processed++;
+
readlen = PROTO_IOBUF_LEN;
/* If this is a multi bulk request, and we are processing a bulk reply
* that is large enough, try to maximize the probability that the query
@@ -2907,7 +2913,6 @@ int tio_debug = 0;
pthread_t io_threads[IO_THREADS_MAX_NUM];
pthread_mutex_t io_threads_mutex[IO_THREADS_MAX_NUM];
_Atomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM];
-int io_threads_active; /* Are the threads currently spinning waiting I/O? */
int io_threads_op; /* IO_THREADS_OP_WRITE or IO_THREADS_OP_READ. */
/* This is the list of clients each thread will serve when threaded I/O is
@@ -2966,7 +2971,7 @@ void *IOThreadMain(void *myid) {
/* Initialize the data structures needed for threaded I/O. */
void initThreadedIO(void) {
- io_threads_active = 0; /* We start with threads not active. */
+ server.io_threads_active = 0; /* We start with threads not active. */
/* Don't spawn any thread if the user selected a single thread:
* we'll handle I/O directly from the main thread. */
@@ -3000,10 +3005,10 @@ void initThreadedIO(void) {
void startThreadedIO(void) {
if (tio_debug) { printf("S"); fflush(stdout); }
if (tio_debug) printf("--- STARTING THREADED IO ---\n");
- serverAssert(io_threads_active == 0);
+ serverAssert(server.io_threads_active == 0);
for (int j = 1; j < server.io_threads_num; j++)
pthread_mutex_unlock(&io_threads_mutex[j]);
- io_threads_active = 1;
+ server.io_threads_active = 1;
}
void stopThreadedIO(void) {
@@ -3014,10 +3019,10 @@ void stopThreadedIO(void) {
if (tio_debug) printf("--- STOPPING THREADED IO [R%d] [W%d] ---\n",
(int) listLength(server.clients_pending_read),
(int) listLength(server.clients_pending_write));
- serverAssert(io_threads_active == 1);
+ serverAssert(server.io_threads_active == 1);
for (int j = 1; j < server.io_threads_num; j++)
pthread_mutex_lock(&io_threads_mutex[j]);
- io_threads_active = 0;
+ server.io_threads_active = 0;
}
/* This function checks if there are not enough pending clients to justify
@@ -3036,7 +3041,7 @@ int stopThreadedIOIfNeeded(void) {
if (server.io_threads_num == 1) return 1;
if (pending < (server.io_threads_num*2)) {
- if (io_threads_active) stopThreadedIO();
+ if (server.io_threads_active) stopThreadedIO();
return 1;
} else {
return 0;
@@ -3054,7 +3059,7 @@ int handleClientsWithPendingWritesUsingThreads(void) {
}
/* Start threads if needed. */
- if (!io_threads_active) startThreadedIO();
+ if (!server.io_threads_active) startThreadedIO();
if (tio_debug) printf("%d TOTAL WRITE pending clients\n", processed);
@@ -3111,6 +3116,10 @@ int handleClientsWithPendingWritesUsingThreads(void) {
}
}
listEmpty(server.clients_pending_write);
+
+ /* Update processed count on server */
+ server.stat_io_writes_processed += processed;
+
return processed;
}
@@ -3119,7 +3128,7 @@ int handleClientsWithPendingWritesUsingThreads(void) {
* As a side effect of calling this function the client is put in the
* pending read clients and flagged as such. */
int postponeClientRead(client *c) {
- if (io_threads_active &&
+ if (server.io_threads_active &&
server.io_threads_do_reads &&
!ProcessingEventsWhileBlocked &&
!(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
@@ -3139,7 +3148,7 @@ int postponeClientRead(client *c) {
* the reads in the buffers, and also parse the first command available
* rendering it in the client structures. */
int handleClientsWithPendingReadsUsingThreads(void) {
- if (!io_threads_active || !server.io_threads_do_reads) return 0;
+ if (!server.io_threads_active || !server.io_threads_do_reads) return 0;
int processed = listLength(server.clients_pending_read);
if (processed == 0) return 0;
@@ -3200,5 +3209,9 @@ int handleClientsWithPendingReadsUsingThreads(void) {
}
processInputBuffer(c);
}
+
+ /* Update processed count on server */
+ server.stat_io_reads_processed += processed;
+
return processed;
}
diff --git a/src/server.c b/src/server.c
index 22a910e1c..afe5094c5 100644
--- a/src/server.c
+++ b/src/server.c
@@ -2726,6 +2726,10 @@ void resetServerStats(void) {
server.stat_sync_full = 0;
server.stat_sync_partial_ok = 0;
server.stat_sync_partial_err = 0;
+ server.stat_io_reads_processed = 0;
+ server.stat_total_reads_processed = 0;
+ server.stat_io_writes_processed = 0;
+ server.stat_total_writes_processed = 0;
for (j = 0; j < STATS_METRIC_COUNT; j++) {
server.inst_metric[j].idx = 0;
server.inst_metric[j].last_sample_time = mstime();
@@ -4071,7 +4075,8 @@ sds genRedisInfoString(const char *section) {
"configured_hz:%i\r\n"
"lru_clock:%u\r\n"
"executable:%s\r\n"
- "config_file:%s\r\n",
+ "config_file:%s\r\n"
+ "io_threads_active:%d\r\n",
REDIS_VERSION,
redisGitSHA1(),
strtol(redisGitDirty(),NULL,10) > 0,
@@ -4095,7 +4100,8 @@ sds genRedisInfoString(const char *section) {
server.config_hz,
server.lruclock,
server.executable ? server.executable : "",
- server.configfile ? server.configfile : "");
+ server.configfile ? server.configfile : "",
+ server.io_threads_active);
}
/* Clients */
@@ -4367,7 +4373,11 @@ sds genRedisInfoString(const char *section) {
"tracking_total_keys:%lld\r\n"
"tracking_total_items:%lld\r\n"
"tracking_total_prefixes:%lld\r\n"
- "unexpected_error_replies:%lld\r\n",
+ "unexpected_error_replies:%lld\r\n"
+ "total_reads_processed:%lld\r\n"
+ "total_writes_processed:%lld\r\n"
+ "io_threaded_reads_processed:%lld\r\n"
+ "io_threaded_writes_processed:%lld\r\n",
server.stat_numconnections,
server.stat_numcommands,
getInstantaneousMetric(STATS_METRIC_COMMAND),
@@ -4398,7 +4408,11 @@ sds genRedisInfoString(const char *section) {
(unsigned long long) trackingGetTotalKeys(),
(unsigned long long) trackingGetTotalItems(),
(unsigned long long) trackingGetTotalPrefixes(),
- server.stat_unexpected_error_replies);
+ server.stat_unexpected_error_replies,
+ server.stat_total_reads_processed,
+ server.stat_total_writes_processed,
+ server.stat_io_reads_processed,
+ server.stat_io_writes_processed);
}
/* Replication */
diff --git a/src/server.h b/src/server.h
index 745ae76a2..adfac3293 100644
--- a/src/server.h
+++ b/src/server.h
@@ -1108,6 +1108,7 @@ struct redisServer {
queries. Will still serve RESP2 queries. */
int io_threads_num; /* Number of IO threads to use. */
int io_threads_do_reads; /* Read and parse from IO threads? */
+ int io_threads_active; /* Is IO threads currently active? */
long long events_processed_while_blocked; /* processEventsWhileBlocked() */
/* RDB / AOF loading information */
@@ -1157,6 +1158,10 @@ struct redisServer {
size_t stat_module_cow_bytes; /* Copy on write bytes during module fork. */
uint64_t stat_clients_type_memory[CLIENT_TYPE_COUNT];/* Mem usage by type */
long long stat_unexpected_error_replies; /* Number of unexpected (aof-loading, replica to master, etc.) error replies */
+ long long stat_io_reads_processed; /* Number of read events processed by IO / Main threads */
+ long long stat_io_writes_processed; /* Number of write events processed by IO / Main threads */
+ _Atomic long long stat_total_reads_processed; /* Total number of read events processed */
+ _Atomic long long stat_total_writes_processed; /* Total number of write events processed */
/* The following two are used to track instantaneous metrics, like
* number of operations per second, network traffic. */
struct {