diff options
Diffstat (limited to 'src/repmgr/repmgr_queue.c')
-rw-r--r-- | src/repmgr/repmgr_queue.c | 132 |
1 files changed, 115 insertions, 17 deletions
diff --git a/src/repmgr/repmgr_queue.c b/src/repmgr/repmgr_queue.c index 6a381acf..3a51b32b 100644 --- a/src/repmgr/repmgr_queue.c +++ b/src/repmgr/repmgr_queue.c @@ -1,7 +1,7 @@ /*- * See the file LICENSE for redistribution information. * - * Copyright (c) 2006, 2012 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2006, 2015 Oracle and/or its affiliates. All rights reserved. * * $Id$ */ @@ -22,13 +22,28 @@ __repmgr_queue_destroy(env) ENV *env; { DB_REP *db_rep; + REP *rep; REPMGR_MESSAGE *m; REPMGR_CONNECTION *conn; + u_int32_t mtype; int ret, t_ret; + COMPQUIET(mtype, 0); + db_rep = env->rep_handle; + rep = db_rep->region; ret = 0; + + /* + * Turn on the DB_EVENT_REP_INQUEUE_FULL event firing. We only do + * this for the main listener process. For a subordinate process, + * it is always turned on. + */ + if (!STAILQ_EMPTY(&db_rep->input_queue.header) && + !IS_SUBORDINATE(db_rep)) + rep->inqueue_full_event_on = 1; + while (!STAILQ_EMPTY(&db_rep->input_queue.header)) { m = STAILQ_FIRST(&db_rep->input_queue.header); STAILQ_REMOVE_HEAD(&db_rep->input_queue.header, entries); @@ -38,8 +53,25 @@ __repmgr_queue_destroy(env) ret == 0) ret = t_ret; } + if (m->msg_hdr.type == REPMGR_OWN_MSG) { + mtype = REPMGR_OWN_MSG_TYPE(m->msg_hdr); + if ((conn = m->v.gmdb_msg.conn) != NULL) { + /* + * A site that removed itself may have already + * closed its connections. + */ + if ((t_ret = __repmgr_close_connection(env, + conn)) != 0 && ret == 0 && + mtype != REPMGR_REMOVE_REQUEST) + ret = t_ret; + if ((t_ret = __repmgr_decr_conn_ref(env, + conn)) != 0 && ret == 0) + ret = t_ret; + } + } __os_free(env, m); } + return (ret); } @@ -60,14 +92,17 @@ __repmgr_queue_get(env, msgp, th) REPMGR_RUNNABLE *th; { DB_REP *db_rep; + REP *rep; REPMGR_MESSAGE *m; #ifdef DB_WIN32 HANDLE wait_events[2]; #endif + u_int32_t msgsize; int ret; ret = 0; db_rep = env->rep_handle; + rep = db_rep->region; while ((m = available_work(env)) == NULL && db_rep->repmgr_status == running && !th->quit_requested) { @@ -104,10 +139,42 @@ __repmgr_queue_get(env, msgp, th) else { STAILQ_REMOVE(&db_rep->input_queue.header, m, __repmgr_message, entries); - db_rep->input_queue.size--; + msgsize = (u_int32_t)m->size; + while (msgsize >= GIGABYTE) { + DB_ASSERT(env, db_rep->input_queue.gbytes > 0); + db_rep->input_queue.gbytes--; + msgsize -= GIGABYTE; + } + if (db_rep->input_queue.bytes < msgsize) { + DB_ASSERT(env, db_rep->input_queue.gbytes > 0); + db_rep->input_queue.gbytes--; + db_rep->input_queue.bytes += GIGABYTE; + } + db_rep->input_queue.bytes -= msgsize; + + /* + * Check if current size is out of the red zone. + * If it is, we will turn on the DB_EVENT_REP_INQUEUE_FULL + * event firing. + * + * We only have the redzone machanism for the main listener + * process. + */ + if (!IS_SUBORDINATE(db_rep) && + rep->inqueue_full_event_on == 0) { + MUTEX_LOCK(env, rep->mtx_repmgr); + if (db_rep->input_queue.gbytes < + rep->inqueue_rz_gbytes || + (db_rep->input_queue.gbytes == + rep->inqueue_rz_gbytes && + db_rep->input_queue.bytes < + rep->inqueue_rz_bytes)) + rep->inqueue_full_event_on = 1; + MUTEX_UNLOCK(env, rep->mtx_repmgr); + } + *msgp = m; } - err: return (ret); } @@ -157,24 +224,55 @@ __repmgr_queue_put(env, msg) REPMGR_MESSAGE *msg; { DB_REP *db_rep; + REP *rep; + u_int32_t msgsize; db_rep = env->rep_handle; + rep = db_rep->region; + + /* + * Drop message if incoming queue contains more messages than the + * limit. See dbenv->repmgr_set_incoming_queue_max() for more + * information. + */ + MUTEX_LOCK(env, rep->mtx_repmgr); + if (db_rep->input_queue.gbytes > rep->inqueue_max_gbytes || + (db_rep->input_queue.gbytes == rep->inqueue_max_gbytes && + db_rep->input_queue.bytes >= rep->inqueue_max_bytes)) { + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "incoming queue limit exceeded")); + STAT(rep->mstat.st_incoming_msgs_dropped++); + if (IS_SUBORDINATE(db_rep) || rep->inqueue_full_event_on) { + DB_EVENT(env, DB_EVENT_REP_INQUEUE_FULL, NULL); + /* + * We will always disable the event firing after + * the queue is full. It will be enabled again + * after the incoming queue size is out of the + * redzone. + * + * We only have the redzone machanism for the main + * listener process. + */ + if (!IS_SUBORDINATE(db_rep)) + rep->inqueue_full_event_on = 0; + } + MUTEX_UNLOCK(env, rep->mtx_repmgr); + __os_free(env, msg); + return (0); + } + MUTEX_UNLOCK(env, rep->mtx_repmgr); STAILQ_INSERT_TAIL(&db_rep->input_queue.header, msg, entries); - db_rep->input_queue.size++; + msgsize = (u_int32_t)msg->size; + while (msgsize >= GIGABYTE) { + msgsize -= GIGABYTE; + db_rep->input_queue.gbytes++; + } + db_rep->input_queue.bytes += msgsize; + if (db_rep->input_queue.bytes >= GIGABYTE) { + db_rep->input_queue.gbytes++; + db_rep->input_queue.bytes -= GIGABYTE; + } return (__repmgr_signal(&db_rep->msg_avail)); } - -/* - * PUBLIC: int __repmgr_queue_size __P((ENV *)); - * - * !!! - * Caller must hold repmgr->mutex. - */ -int -__repmgr_queue_size(env) - ENV *env; -{ - return (env->rep_handle->input_queue.size); -} |