/* blocked.c - generic support for blocking operations like BLPOP & WAIT. * * Copyright (c) 2009-2012, Salvatore Sanfilippo * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * * Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * * Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * Neither the name of Redis nor the names of its contributors may be used * to endorse or promote products derived from this software without * specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. * * --------------------------------------------------------------------------- * * API: * * getTimeoutFromObjectOrReply() is just an utility function to parse a * timeout argument since blocking operations usually require a timeout. * * blockClient() set the REDIS_BLOCKED flag in the client, and set the * specified block type 'btype' filed to one of REDIS_BLOCKED_* macros. * * unblockClient() unblocks the client doing the following: * 1) It calls the btype-specific function to cleanup the state. * 2) It unblocks the client by unsetting the REDIS_BLOCKED flag. * 3) It puts the client into a list of just unblocked clients that are * processed ASAP in the beforeSleep() event loop callback, so that * if there is some query buffer to process, we do it. This is also * required because otherwise there is no 'readable' event fired, we * already read the pending commands. We also set the REDIS_UNBLOCKED * flag to remember the client is in the unblocked_clients list. * * processUnblockedClients() is called inside the beforeSleep() function * to process the query buffer from unblocked clients and remove the clients * from the blocked_clients queue. * * replyToBlockedClientTimedOut() is called by the cron function when * a client blocked reaches the specified timeout (if the timeout is set * to 0, no timeout is processed). * It usually just needs to send a reply to the client. * * When implementing a new type of blocking opeation, the implementation * should modify unblockClient() and replyToBlockedClientTimedOut() in order * to handle the btype-specific behavior of this two functions. */ #include "redis.h" /* Get a timeout value from an object and store it into 'timeout'. * The final timeout is always stored as milliseconds as a time where the * timeout will expire, however the parsing is performed according to * the 'unit' that can be seconds or milliseconds. * * Note that if the timeout is zero (usually from the point of view of * commands API this means no timeout) the value stored into 'timeout' * is zero. */ int getTimeoutFromObjectOrReply(redisClient *c, robj *object, mstime_t *timeout, int unit) { long long tval; if (getLongLongFromObjectOrReply(c,object,&tval, "timeout is not an integer or out of range") != REDIS_OK) return REDIS_ERR; if (tval < 0) { addReplyError(c,"timeout is negative"); return REDIS_ERR; } if (tval > 0) { if (unit == UNIT_SECONDS) tval *= 1000; tval += mstime(); } *timeout = tval; return REDIS_OK; } /* Block a client for the specific operation type. Once the REDIS_BLOCKED * flag is set client query buffer is not longer processed, but accumulated, * and will be processed when the client is unblocked. */ void blockClient(redisClient *c, int btype) { c->flags |= REDIS_BLOCKED; c->btype = btype; server.bpop_blocked_clients++; } /* This function is called in the beforeSleep() function of the event loop * in order to process the pending input buffer of clients that were * unblocked after a blocking operation. */ void processUnblockedClients(void) { listNode *ln; redisClient *c; while (listLength(server.unblocked_clients)) { ln = listFirst(server.unblocked_clients); redisAssert(ln != NULL); c = ln->value; listDelNode(server.unblocked_clients,ln); c->flags &= ~REDIS_UNBLOCKED; c->btype = REDIS_BLOCKED_NONE; /* Process remaining data in the input buffer. */ if (c->querybuf && sdslen(c->querybuf) > 0) { server.current_client = c; processInputBuffer(c); server.current_client = NULL; } } } /* Unblock a client calling the right function depending on the kind * of operation the client is blocking for. */ void unblockClient(redisClient *c) { if (c->btype == REDIS_BLOCKED_LIST) { unblockClientWaitingData(c); } else if (c->btype == REDIS_BLOCKED_WAIT) { unblockClientWaitingReplicas(c); } else { redisPanic("Unknown btype in unblockClient()."); } /* Clear the flags, and put the client in the unblocked list so that * we'll process new commands in its query buffer ASAP. */ c->flags &= ~REDIS_BLOCKED; c->flags |= REDIS_UNBLOCKED; c->btype = REDIS_BLOCKED_NONE; server.bpop_blocked_clients--; listAddNodeTail(server.unblocked_clients,c); } /* This function gets called when a blocked client timed out in order to * send it a reply of some kind. */ void replyToBlockedClientTimedOut(redisClient *c) { if (c->btype == REDIS_BLOCKED_LIST) { addReply(c,shared.nullmultibulk); } else if (c->btype == REDIS_BLOCKED_WAIT) { addReplyLongLong(c,replicationCountAcksByOffset(c->bpop.reploffset)); } else { redisPanic("Unknown btype in replyToBlockedClientTimedOut()."); } }