diff options
author | antirez <antirez@gmail.com> | 2013-12-03 17:43:53 +0100 |
---|---|---|
committer | antirez <antirez@gmail.com> | 2013-12-03 17:43:53 +0100 |
commit | 82b672f6335ac2db32a724ba5dc10398c949a4a8 (patch) | |
tree | 20ea736c60ef3970e0df07e48ee0e9698d798bd1 /src/blocked.c | |
parent | 2e027c48e5ab5e43e547bc4fb091574d1c7ed52b (diff) | |
download | redis-82b672f6335ac2db32a724ba5dc10398c949a4a8.tar.gz |
BLPOP blocking code refactored to be generic & reusable.
Diffstat (limited to 'src/blocked.c')
-rw-r--r-- | src/blocked.c | 121 |
1 files changed, 121 insertions, 0 deletions
diff --git a/src/blocked.c b/src/blocked.c new file mode 100644 index 000000000..3f4dd6e8d --- /dev/null +++ b/src/blocked.c @@ -0,0 +1,121 @@ +/* blocked.c - generic support for blocking operations like BLPOP & WAIT. + * + * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com> + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "redis.h" + +/* Get a timeout value from an object and store it into 'timeout'. + * The final timeout is always stored as milliseconds as a time where the + * timeout will expire, however the parsing is performed according to + * the 'unit' that can be seconds or milliseconds. + * + * Note that if the timeout is zero (usually from the point of view of + * commands API this means no timeout) the value stored into 'timeout' + * is zero. */ +int getTimeoutFromObjectOrReply(redisClient *c, robj *object, mstime_t *timeout, int unit) { + long long tval; + + if (getLongLongFromObjectOrReply(c,object,&tval, + "timeout is not an integer or out of range") != REDIS_OK) + return REDIS_ERR; + + if (tval < 0) { + addReplyError(c,"timeout is negative"); + return REDIS_ERR; + } + + if (tval > 0) { + if (unit == UNIT_SECONDS) tval *= 1000; + tval += mstime(); + } + *timeout = tval; + + return REDIS_OK; +} + +/* Block a client for the specific operation type. Once the REDIS_BLOCKED + * flag is set client query buffer is not longer processed, but accumulated, + * and will be processed when the client is unblocked. */ +void blockClient(redisClient *c, int btype) { + c->flags |= REDIS_BLOCKED; + c->btype = btype; + server.bpop_blocked_clients++; +} + +/* This function is called in the beforeSleep() function of the event loop + * in order to process the pending input buffer of clients that were + * unblocked after a blocking operation. */ +void processUnblockedClients(void) { + listNode *ln; + redisClient *c; + + while (listLength(server.unblocked_clients)) { + ln = listFirst(server.unblocked_clients); + redisAssert(ln != NULL); + c = ln->value; + listDelNode(server.unblocked_clients,ln); + c->flags &= ~REDIS_UNBLOCKED; + c->btype = REDIS_BLOCKED_NONE; + + /* Process remaining data in the input buffer. */ + if (c->querybuf && sdslen(c->querybuf) > 0) { + server.current_client = c; + processInputBuffer(c); + server.current_client = NULL; + } + } +} + +/* Unblock a client calling the right function depending on the kind + * of operation the client is blocking for. */ +void unblockClient(redisClient *c) { + if (c->btype == REDIS_BLOCKED_LIST) { + unblockClientWaitingData(c); + } else { + redisPanic("Unknown btype in unblockClient()."); + } + /* Clear the flags, and put the client in the unblocked list so that + * we'll process new commands in its query buffer ASAP. */ + c->flags &= ~REDIS_BLOCKED; + c->flags |= REDIS_UNBLOCKED; + c->btype = REDIS_BLOCKED_NONE; + server.bpop_blocked_clients--; + listAddNodeTail(server.unblocked_clients,c); +} + +/* This function gets called when a blocked client timed out in order to + * send it a reply of some kind. */ +void replyToBlockedClientTimedOut(redisClient *c) { + if (c->btype == REDIS_BLOCKED_LIST) { + addReply(c,shared.nullmultibulk); + } else { + redisPanic("Unknown btype in replyToBlockedClientTimedOut()."); + } +} + |