summaryrefslogtreecommitdiff
path: root/src/redis-benchmark.c
diff options
context:
space:
mode:
authorartix <artix2@gmail.com>2019-01-17 17:40:15 +0100
committerartix <artix2@gmail.com>2019-03-01 17:53:14 +0100
commitf95e01266eb42d7442f3b62242957e9e99e3ac90 (patch)
treea6187e1f23bc12ab2bdb8fd268d28751333b98f4 /src/redis-benchmark.c
parent5fd5799cf91fb8612d03a4d4292f25cfc73e1f84 (diff)
downloadredis-f95e01266eb42d7442f3b62242957e9e99e3ac90.tar.gz
Redis benchmark: configurable thread count in cluster mode and fixes
Diffstat (limited to 'src/redis-benchmark.c')
-rw-r--r--src/redis-benchmark.c49
1 files changed, 35 insertions, 14 deletions
diff --git a/src/redis-benchmark.c b/src/redis-benchmark.c
index ab71839ae..4e15fa580 100644
--- a/src/redis-benchmark.c
+++ b/src/redis-benchmark.c
@@ -53,7 +53,7 @@
#define UNUSED(V) ((void) V)
#define RANDPTR_INITIAL_SIZE 8
#define MAX_LATENCY_PRECISION 3
-#define MAX_THREADS 16
+#define MAX_THREADS 500
#define CLUSTER_SLOTS 16384
#define CLIENT_GET_EVENTLOOP(c) \
@@ -189,7 +189,13 @@ static void freeClient(client c) {
listNode *ln;
aeDeleteFileEvent(el,c->context->fd,AE_WRITABLE);
aeDeleteFileEvent(el,c->context->fd,AE_READABLE);
- aeStop(el);
+ if (c->thread_id >= 0) {
+ int requests_finished = 0;
+ atomicGet(config.requests_finished, requests_finished);
+ if (requests_finished >= config.requests) {
+ aeStop(el);
+ }
+ }
redisFree(c->context);
sdsfree(c->obuf);
zfree(c->randptr);
@@ -261,6 +267,7 @@ static void clientDone(client c) {
else atomicGet(config.requests_finished, requests_finished);
if (requests_finished >= config.requests) {
freeClient(c);
+ if (!config.num_threads && config.el) aeStop(config.el);
return;
}
if (config.keepalive) {
@@ -306,6 +313,7 @@ static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int is_err = (r->type == REDIS_REPLY_ERROR);
if (is_err && config.showerrors) {
+ /* TODO: static lasterr_time not thread-safe */
static time_t lasterr_time = 0;
time_t now = time(NULL);
if (lasterr_time != now) {
@@ -326,10 +334,13 @@ static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
clusterNode *node = c->cluster_node;
assert(node);
if (++node->current_slot_index >= node->slots_count) {
- fprintf(stderr,"Cluster node %s:%d has no more "
- "valid slots, aborting...\n", node->ip,
- node->port);
- exit(1);
+ if (config.showerrors) {
+ fprintf(stderr, "WARN: No more available slots in "
+ "node %s:%d\n", node->ip, node->port);
+ }
+ freeReplyObject(reply);
+ freeClient(c);
+ return;
}
}
@@ -352,9 +363,8 @@ static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
}
int requests_finished = 0;
atomicGetIncr(config.requests_finished, requests_finished, 1);
- requests_finished--;
if (requests_finished < config.requests)
- config.latency[++requests_finished] = c->latency;
+ config.latency[requests_finished] = c->latency;
c->pending--;
if (c->pending == 0) {
clientDone(c);
@@ -433,15 +443,20 @@ static client createClient(char *cmd, size_t len, client from, int thread_id) {
int is_cluster_client = (config.cluster_mode && thread_id >= 0);
client c = zmalloc(sizeof(struct _client));
- const char *ip;
- int port;
+ const char *ip = NULL;
+ int port = 0;
c->cluster_node = NULL;
if (config.hostsocket == NULL || is_cluster_client) {
if (!is_cluster_client) {
ip = config.hostip;
port = config.hostport;
} else {
- clusterNode *node = config.cluster_nodes[thread_id];
+ int node_idx = 0;
+ if (config.num_threads < config.cluster_node_count)
+ node_idx = config.liveclients % config.cluster_node_count;
+ else
+ node_idx = thread_id % config.cluster_node_count;
+ clusterNode *node = config.cluster_nodes[node_idx];
if (node == NULL) exit(1);
ip = (const char *) node->ip;
port = node->port;
@@ -556,7 +571,7 @@ static client createClient(char *cmd, size_t len, client from, int thread_id) {
while ((p = strstr(p,"{tag}")) != NULL) {
if (c->stagfree == 0) {
c->stagptr = zrealloc(c->stagptr,
- sizeof(char*)*c->staglen*2);
+ sizeof(char*) * c->staglen*2);
c->stagfree += c->staglen;
}
c->stagptr[c->staglen++] = p;
@@ -1112,6 +1127,10 @@ int showThroughput(struct aeEventLoop *eventLoop, long long id, void *clientData
fprintf(stderr,"All clients disconnected... aborting.\n");
exit(1);
}
+ if (config.num_threads && requests_finished >= config.requests) {
+ aeStop(eventLoop);
+ return AE_NOMORE;
+ }
if (config.csv) return 250;
if (config.idlemode == 1) {
printf("clients: %d\r", config.liveclients);
@@ -1214,8 +1233,10 @@ int main(int argc, const char **argv) {
if (node->name) printf("%s ", node->name);
printf("%s:%d\n", node->ip, node->port);
}
- /* TODO: allow for more thrads per cluster node. */
- config.num_threads = config.cluster_node_count;
+ /* Automatically set thread number to node count if not specified
+ * by the user. */
+ if (config.num_threads == 0)
+ config.num_threads = config.cluster_node_count;
}
if (config.num_threads > 0) {