summaryrefslogtreecommitdiff
path: root/src/blocked.c
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2013-12-03 17:43:53 +0100
committerantirez <antirez@gmail.com>2013-12-03 17:43:53 +0100
commit82b672f6335ac2db32a724ba5dc10398c949a4a8 (patch)
tree20ea736c60ef3970e0df07e48ee0e9698d798bd1 /src/blocked.c
parent2e027c48e5ab5e43e547bc4fb091574d1c7ed52b (diff)
downloadredis-82b672f6335ac2db32a724ba5dc10398c949a4a8.tar.gz
BLPOP blocking code refactored to be generic & reusable.
Diffstat (limited to 'src/blocked.c')
-rw-r--r--src/blocked.c121
1 files changed, 121 insertions, 0 deletions
diff --git a/src/blocked.c b/src/blocked.c
new file mode 100644
index 000000000..3f4dd6e8d
--- /dev/null
+++ b/src/blocked.c
@@ -0,0 +1,121 @@
+/* blocked.c - generic support for blocking operations like BLPOP & WAIT.
+ *
+ * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * * Neither the name of Redis nor the names of its contributors may be used
+ * to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "redis.h"
+
+/* Get a timeout value from an object and store it into 'timeout'.
+ * The final timeout is always stored as milliseconds as a time where the
+ * timeout will expire, however the parsing is performed according to
+ * the 'unit' that can be seconds or milliseconds.
+ *
+ * Note that if the timeout is zero (usually from the point of view of
+ * commands API this means no timeout) the value stored into 'timeout'
+ * is zero. */
+int getTimeoutFromObjectOrReply(redisClient *c, robj *object, mstime_t *timeout, int unit) {
+ long long tval;
+
+ if (getLongLongFromObjectOrReply(c,object,&tval,
+ "timeout is not an integer or out of range") != REDIS_OK)
+ return REDIS_ERR;
+
+ if (tval < 0) {
+ addReplyError(c,"timeout is negative");
+ return REDIS_ERR;
+ }
+
+ if (tval > 0) {
+ if (unit == UNIT_SECONDS) tval *= 1000;
+ tval += mstime();
+ }
+ *timeout = tval;
+
+ return REDIS_OK;
+}
+
+/* Block a client for the specific operation type. Once the REDIS_BLOCKED
+ * flag is set client query buffer is not longer processed, but accumulated,
+ * and will be processed when the client is unblocked. */
+void blockClient(redisClient *c, int btype) {
+ c->flags |= REDIS_BLOCKED;
+ c->btype = btype;
+ server.bpop_blocked_clients++;
+}
+
+/* This function is called in the beforeSleep() function of the event loop
+ * in order to process the pending input buffer of clients that were
+ * unblocked after a blocking operation. */
+void processUnblockedClients(void) {
+ listNode *ln;
+ redisClient *c;
+
+ while (listLength(server.unblocked_clients)) {
+ ln = listFirst(server.unblocked_clients);
+ redisAssert(ln != NULL);
+ c = ln->value;
+ listDelNode(server.unblocked_clients,ln);
+ c->flags &= ~REDIS_UNBLOCKED;
+ c->btype = REDIS_BLOCKED_NONE;
+
+ /* Process remaining data in the input buffer. */
+ if (c->querybuf && sdslen(c->querybuf) > 0) {
+ server.current_client = c;
+ processInputBuffer(c);
+ server.current_client = NULL;
+ }
+ }
+}
+
+/* Unblock a client calling the right function depending on the kind
+ * of operation the client is blocking for. */
+void unblockClient(redisClient *c) {
+ if (c->btype == REDIS_BLOCKED_LIST) {
+ unblockClientWaitingData(c);
+ } else {
+ redisPanic("Unknown btype in unblockClient().");
+ }
+ /* Clear the flags, and put the client in the unblocked list so that
+ * we'll process new commands in its query buffer ASAP. */
+ c->flags &= ~REDIS_BLOCKED;
+ c->flags |= REDIS_UNBLOCKED;
+ c->btype = REDIS_BLOCKED_NONE;
+ server.bpop_blocked_clients--;
+ listAddNodeTail(server.unblocked_clients,c);
+}
+
+/* This function gets called when a blocked client timed out in order to
+ * send it a reply of some kind. */
+void replyToBlockedClientTimedOut(redisClient *c) {
+ if (c->btype == REDIS_BLOCKED_LIST) {
+ addReply(c,shared.nullmultibulk);
+ } else {
+ redisPanic("Unknown btype in replyToBlockedClientTimedOut().");
+ }
+}
+