summaryrefslogtreecommitdiff
path: root/src/redis-benchmark.c
diff options
context:
space:
mode:
authorartix <artix2@gmail.com>2018-09-29 12:59:03 +0200
committerartix <artix2@gmail.com>2019-03-01 17:53:14 +0100
commit434f7613041045d6794bde8b93aa1e733bbab6ca (patch)
treec727a66537783d06670df2a4787bfbd2fa187dfe /src/redis-benchmark.c
parentdfd3cc5f783630c1611ff2922f6f1f1171be44f5 (diff)
downloadredis-434f7613041045d6794bde8b93aa1e733bbab6ca.tar.gz
Redis benchmark: table-based slot hashtag placeholder replacement in cluster mode.
Diffstat (limited to 'src/redis-benchmark.c')
-rw-r--r--src/redis-benchmark.c122
1 files changed, 97 insertions, 25 deletions
diff --git a/src/redis-benchmark.c b/src/redis-benchmark.c
index 636114a28..8db554fea 100644
--- a/src/redis-benchmark.c
+++ b/src/redis-benchmark.c
@@ -98,7 +98,6 @@ static struct config {
int cluster_mode;
int cluster_node_count;
struct clusterNode **cluster_nodes;
- struct clusterNode *cluster_slots[CLUSTER_SLOTS];
/* Thread mutexes to be used as fallbacks by atomicvar.h */
pthread_mutex_t requests_issued_mutex;
pthread_mutex_t requests_finished_mutex;
@@ -111,6 +110,9 @@ typedef struct _client {
char **randptr; /* Pointers to :rand: strings inside the command buf */
size_t randlen; /* Number of pointers in client->randptr */
size_t randfree; /* Number of unused pointers in client->randptr */
+ char **stagptr; /* Pointers to slot hashtags (cluster mode only) */
+ size_t staglen; /* Number of pointers in client->stagptr */
+ size_t stagfree; /* Number of unused pointers in client->stagptr */
size_t written; /* Bytes of 'obuf' already written */
long long start; /* Start time of a request */
long long latency; /* Request latency */
@@ -120,6 +122,7 @@ typedef struct _client {
benchmark commands and discarded after the first send. */
int prefixlen; /* Size in bytes of the pending prefix commands */
int thread_id;
+ struct clusterNode *cluster_node;
} *client;
/* Threads. */
@@ -137,8 +140,8 @@ typedef struct clusterNode {
sds name;
int flags;
sds replicate; /* Master ID if node is a slave */
- uint8_t slots[CLUSTER_SLOTS];
- int slots_count;
+ list *slots;
+ listIter slot_iter;
int replicas_count;
sds *migrating; /* An array of sds where even strings are slots and odd
* strings are the destination node IDs. */
@@ -188,6 +191,7 @@ static void freeClient(client c) {
redisFree(c->context);
sdsfree(c->obuf);
zfree(c->randptr);
+ zfree(c->stagptr);
zfree(c);
if (config.num_threads) pthread_mutex_lock(&(config.liveclients_mutex));
config.liveclients--;
@@ -232,6 +236,28 @@ static void randomizeClientKey(client c) {
}
}
+static void setClusterKeyHashTag(client c) {
+ assert(c->thread_id >= 0);
+ clusterNode *node = c->cluster_node;
+ assert(node);
+ listNode *ln = listNext(&node->slot_iter);
+ if (ln == NULL) {
+ listRewind(node->slots, &(node->slot_iter));
+ ln = listNext(&(node->slot_iter));
+ assert(ln != NULL);
+ }
+ int slot = (int) ln->value;
+ const char *tag = crc16_slot_table[slot];
+ int taglen = strlen(tag);
+ size_t i;
+ for (i = 0; i < c->staglen; i++) {
+ char *p = c->stagptr[i] + 1;
+ p[0] = tag[0];
+ p[1] = (taglen >= 2 ? tag[1] : '}');
+ p[2] = (taglen == 3 ? tag[2] : '}');
+ }
+}
+
static void clientDone(client c) {
int requests_finished = 0;
if (!config.num_threads) requests_finished = config.requests_finished;
@@ -294,7 +320,12 @@ static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
redisReply *r = reply;
if (r->type == REDIS_REPLY_ERROR && lasterr_time != now) {
lasterr_time = now;
- printf("Error from server: %s\n", r->str);
+ if (c->cluster_node) {
+ printf("Error from server %s:%d: %s\n",
+ c->cluster_node->ip,
+ c->cluster_node->port,
+ r->str);
+ } else printf("Error from server: %s\n", r->str);
}
}
@@ -354,6 +385,7 @@ static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
/* Really initialize: randomize keys and set start time. */
if (config.randomkeys) randomizeClientKey(c);
+ if (config.cluster_mode && c->staglen > 0) setClusterKeyHashTag(c);
c->start = ustime();
c->latency = -1;
}
@@ -401,9 +433,10 @@ static client createClient(char *cmd, size_t len, client from, int thread_id) {
int is_cluster_client = (config.cluster_mode && thread_id >= 0);
client c = zmalloc(sizeof(struct _client));
+ const char *ip;
+ int port;
+ c->cluster_node = NULL;
if (config.hostsocket == NULL || is_cluster_client) {
- const char *ip;
- int port;
if (!is_cluster_client) {
ip = config.hostip;
port = config.hostport;
@@ -412,6 +445,7 @@ static client createClient(char *cmd, size_t len, client from, int thread_id) {
if (node == NULL) exit(1);
ip = (const char *) node->ip;
port = node->port;
+ c->cluster_node = node;
}
c->context = redisConnectNonBlock(ip,port);
} else {
@@ -419,8 +453,8 @@ static client createClient(char *cmd, size_t len, client from, int thread_id) {
}
if (c->context->err) {
fprintf(stderr,"Could not connect to Redis at ");
- if (config.hostsocket == NULL)
- fprintf(stderr,"%s:%d: %s\n",config.hostip,config.hostport,c->context->errstr);
+ if (config.hostsocket == NULL || is_cluster_client)
+ fprintf(stderr,"%s:%d: %s\n",ip,port,c->context->errstr);
else
fprintf(stderr,"%s: %s\n",config.hostsocket,c->context->errstr);
exit(1);
@@ -469,6 +503,8 @@ static client createClient(char *cmd, size_t len, client from, int thread_id) {
c->pending = config.pipeline+c->prefix_pending;
c->randptr = NULL;
c->randlen = 0;
+ c->stagptr = NULL;
+ c->staglen = 0;
/* Find substrings in the output buffer that need to be randomized. */
if (config.randomkeys) {
@@ -499,6 +535,36 @@ static client createClient(char *cmd, size_t len, client from, int thread_id) {
}
}
}
+ /* If cluster mode is enabled, set slot hashtags pointers. */
+ if (config.cluster_mode) {
+ if (from) {
+ c->staglen = from->staglen;
+ c->stagfree = 0;
+ c->stagptr = zmalloc(sizeof(char*)*c->staglen);
+ /* copy the offsets. */
+ for (j = 0; j < (int)c->staglen; j++) {
+ c->stagptr[j] = c->obuf + (from->stagptr[j]-from->obuf);
+ /* Adjust for the different select prefix length. */
+ c->stagptr[j] += c->prefixlen - from->prefixlen;
+ }
+ } else {
+ char *p = c->obuf;
+
+ c->staglen = 0;
+ c->stagfree = RANDPTR_INITIAL_SIZE;
+ c->stagptr = zmalloc(sizeof(char*)*c->stagfree);
+ while ((p = strstr(p,"{tag}")) != NULL) {
+ if (c->stagfree == 0) {
+ c->stagptr = zrealloc(c->stagptr,
+ sizeof(char*)*c->staglen*2);
+ c->stagfree += c->staglen;
+ }
+ c->stagptr[c->staglen++] = p;
+ c->stagfree--;
+ p += 5; /* 12 is strlen("{tag}"). */
+ }
+ }
+ }
aeEventLoop *el = NULL;
if (thread_id < 0) el = config.el;
else {
@@ -667,8 +733,7 @@ static clusterNode *createClusterNode(char *ip, int port) {
node->flags = 0;
node->replicate = NULL;
node->replicas_count = 0;
- memset(node->slots, 0, sizeof(node->slots));
- node->slots_count = 0;
+ node->slots = listCreate();
node->migrating = NULL;
node->importing = NULL;
node->migrating_count = 0;
@@ -688,6 +753,11 @@ static void freeClusterNode(clusterNode *node) {
for (i = 0; i < node->importing_count; i++) sdsfree(node->importing[i]);
zfree(node->importing);
}
+ /* If the node is not the reference node, that uses the address from
+ * config.hostip and config.hostport, then the node ip has been
+ * allocated by fetchClusterConfiguration, so it must be freed. */
+ if (node->ip && strcmp(node->ip, config.hostip) != 0) sdsfree(node->ip);
+ listRelease(node->slots);
zfree(node);
}
@@ -785,17 +855,13 @@ static int fetchClusterConfiguration() {
/* If internal bus is specified, then just drop it. */
if ((paddr = strchr(addr, '@')) != NULL) *paddr = '\0';
int port = atoi(addr);
- node = createClusterNode(ip, port);
+ node = createClusterNode(sdsnew(ip), port);
}
if (node == NULL) {
success = 0;
goto cleanup;
}
node->name = sdsnew(name);
- if (!addClusterNode(node)) {
- success = 0;
- goto cleanup;
- }
if (i == 8) {
int remaining = strlen(line);
while (remaining > 0) {
@@ -848,20 +914,26 @@ static int fetchClusterConfiguration() {
*p = '\0';
start = atoi(slotsdef);
stop = atoi(p + 1);
- node->slots_count += (stop - (start - 1));
while (start <= stop) {
int slot = start++;
- node->slots[slot] = 1;
- config.cluster_slots[slot] = node;
+ listAddNodeTail(node->slots, (void *)(uintptr_t)slot);
}
} else if (p > slotsdef) {
int slot = atoi(slotsdef);
- node->slots[slot] = 1;
- node->slots_count++;
- config.cluster_slots[slot] = node;
+ listAddNodeTail(node->slots, (void *)(uintptr_t)slot);
}
}
}
+ if (listLength(node->slots) == 0) {
+ printf("WARNING: master node %s:%d has no slots, skipping...\n",
+ node->ip, node->port);
+ continue;
+ }
+ if (!addClusterNode(node)) {
+ success = 0;
+ goto cleanup;
+ }
+ listRewind(node->slots, &(node->slot_iter));
}
cleanup:
if (ctx) redisFree(ctx);
@@ -1106,7 +1178,6 @@ int main(int argc, const char **argv) {
config.cluster_mode = 0;
config.cluster_node_count = 0;
config.cluster_nodes = NULL;
- memset(config.cluster_slots, 0, sizeof(config.cluster_slots));
i = parseOptions(argc,argv);
argc -= i;
@@ -1142,6 +1213,7 @@ int main(int argc, const char **argv) {
if (node->name) printf("%s ", node->name);
printf("%s:%d\n", node->ip, node->port);
}
+ /* TODO: allow for more thrads per cluster node. */
config.num_threads = config.cluster_node_count;
}
@@ -1196,19 +1268,19 @@ int main(int argc, const char **argv) {
}
if (test_is_selected("set")) {
- len = redisFormatCommand(&cmd,"SET key:__rand_int__ %s",data);
+ len = redisFormatCommand(&cmd,"SET key:__rand_int__{tag} %s",data);
benchmark("SET",cmd,len);
free(cmd);
}
if (test_is_selected("get")) {
- len = redisFormatCommand(&cmd,"GET key:__rand_int__");
+ len = redisFormatCommand(&cmd,"GET key:__rand_int__{tag}");
benchmark("GET",cmd,len);
free(cmd);
}
if (test_is_selected("incr")) {
- len = redisFormatCommand(&cmd,"INCR counter:__rand_int__");
+ len = redisFormatCommand(&cmd,"INCR counter:__rand_int__{tag}");
benchmark("INCR",cmd,len);
free(cmd);
}