summaryrefslogtreecommitdiff
path: root/src/repmgr/repmgr_queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/repmgr/repmgr_queue.c')
-rw-r--r--src/repmgr/repmgr_queue.c132
1 files changed, 115 insertions, 17 deletions
diff --git a/src/repmgr/repmgr_queue.c b/src/repmgr/repmgr_queue.c
index 6a381acf..3a51b32b 100644
--- a/src/repmgr/repmgr_queue.c
+++ b/src/repmgr/repmgr_queue.c
@@ -1,7 +1,7 @@
/*-
* See the file LICENSE for redistribution information.
*
- * Copyright (c) 2006, 2012 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2006, 2015 Oracle and/or its affiliates. All rights reserved.
*
* $Id$
*/
@@ -22,13 +22,28 @@ __repmgr_queue_destroy(env)
ENV *env;
{
DB_REP *db_rep;
+ REP *rep;
REPMGR_MESSAGE *m;
REPMGR_CONNECTION *conn;
+ u_int32_t mtype;
int ret, t_ret;
+ COMPQUIET(mtype, 0);
+
db_rep = env->rep_handle;
+ rep = db_rep->region;
ret = 0;
+
+ /*
+ * Turn on the DB_EVENT_REP_INQUEUE_FULL event firing. We only do
+ * this for the main listener process. For a subordinate process,
+ * it is always turned on.
+ */
+ if (!STAILQ_EMPTY(&db_rep->input_queue.header) &&
+ !IS_SUBORDINATE(db_rep))
+ rep->inqueue_full_event_on = 1;
+
while (!STAILQ_EMPTY(&db_rep->input_queue.header)) {
m = STAILQ_FIRST(&db_rep->input_queue.header);
STAILQ_REMOVE_HEAD(&db_rep->input_queue.header, entries);
@@ -38,8 +53,25 @@ __repmgr_queue_destroy(env)
ret == 0)
ret = t_ret;
}
+ if (m->msg_hdr.type == REPMGR_OWN_MSG) {
+ mtype = REPMGR_OWN_MSG_TYPE(m->msg_hdr);
+ if ((conn = m->v.gmdb_msg.conn) != NULL) {
+ /*
+ * A site that removed itself may have already
+ * closed its connections.
+ */
+ if ((t_ret = __repmgr_close_connection(env,
+ conn)) != 0 && ret == 0 &&
+ mtype != REPMGR_REMOVE_REQUEST)
+ ret = t_ret;
+ if ((t_ret = __repmgr_decr_conn_ref(env,
+ conn)) != 0 && ret == 0)
+ ret = t_ret;
+ }
+ }
__os_free(env, m);
}
+
return (ret);
}
@@ -60,14 +92,17 @@ __repmgr_queue_get(env, msgp, th)
REPMGR_RUNNABLE *th;
{
DB_REP *db_rep;
+ REP *rep;
REPMGR_MESSAGE *m;
#ifdef DB_WIN32
HANDLE wait_events[2];
#endif
+ u_int32_t msgsize;
int ret;
ret = 0;
db_rep = env->rep_handle;
+ rep = db_rep->region;
while ((m = available_work(env)) == NULL &&
db_rep->repmgr_status == running && !th->quit_requested) {
@@ -104,10 +139,42 @@ __repmgr_queue_get(env, msgp, th)
else {
STAILQ_REMOVE(&db_rep->input_queue.header,
m, __repmgr_message, entries);
- db_rep->input_queue.size--;
+ msgsize = (u_int32_t)m->size;
+ while (msgsize >= GIGABYTE) {
+ DB_ASSERT(env, db_rep->input_queue.gbytes > 0);
+ db_rep->input_queue.gbytes--;
+ msgsize -= GIGABYTE;
+ }
+ if (db_rep->input_queue.bytes < msgsize) {
+ DB_ASSERT(env, db_rep->input_queue.gbytes > 0);
+ db_rep->input_queue.gbytes--;
+ db_rep->input_queue.bytes += GIGABYTE;
+ }
+ db_rep->input_queue.bytes -= msgsize;
+
+ /*
+ * Check if current size is out of the red zone.
+ * If it is, we will turn on the DB_EVENT_REP_INQUEUE_FULL
+ * event firing.
+ *
+ * We only have the redzone machanism for the main listener
+ * process.
+ */
+ if (!IS_SUBORDINATE(db_rep) &&
+ rep->inqueue_full_event_on == 0) {
+ MUTEX_LOCK(env, rep->mtx_repmgr);
+ if (db_rep->input_queue.gbytes <
+ rep->inqueue_rz_gbytes ||
+ (db_rep->input_queue.gbytes ==
+ rep->inqueue_rz_gbytes &&
+ db_rep->input_queue.bytes <
+ rep->inqueue_rz_bytes))
+ rep->inqueue_full_event_on = 1;
+ MUTEX_UNLOCK(env, rep->mtx_repmgr);
+ }
+
*msgp = m;
}
-
err:
return (ret);
}
@@ -157,24 +224,55 @@ __repmgr_queue_put(env, msg)
REPMGR_MESSAGE *msg;
{
DB_REP *db_rep;
+ REP *rep;
+ u_int32_t msgsize;
db_rep = env->rep_handle;
+ rep = db_rep->region;
+
+ /*
+ * Drop message if incoming queue contains more messages than the
+ * limit. See dbenv->repmgr_set_incoming_queue_max() for more
+ * information.
+ */
+ MUTEX_LOCK(env, rep->mtx_repmgr);
+ if (db_rep->input_queue.gbytes > rep->inqueue_max_gbytes ||
+ (db_rep->input_queue.gbytes == rep->inqueue_max_gbytes &&
+ db_rep->input_queue.bytes >= rep->inqueue_max_bytes)) {
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "incoming queue limit exceeded"));
+ STAT(rep->mstat.st_incoming_msgs_dropped++);
+ if (IS_SUBORDINATE(db_rep) || rep->inqueue_full_event_on) {
+ DB_EVENT(env, DB_EVENT_REP_INQUEUE_FULL, NULL);
+ /*
+ * We will always disable the event firing after
+ * the queue is full. It will be enabled again
+ * after the incoming queue size is out of the
+ * redzone.
+ *
+ * We only have the redzone machanism for the main
+ * listener process.
+ */
+ if (!IS_SUBORDINATE(db_rep))
+ rep->inqueue_full_event_on = 0;
+ }
+ MUTEX_UNLOCK(env, rep->mtx_repmgr);
+ __os_free(env, msg);
+ return (0);
+ }
+ MUTEX_UNLOCK(env, rep->mtx_repmgr);
STAILQ_INSERT_TAIL(&db_rep->input_queue.header, msg, entries);
- db_rep->input_queue.size++;
+ msgsize = (u_int32_t)msg->size;
+ while (msgsize >= GIGABYTE) {
+ msgsize -= GIGABYTE;
+ db_rep->input_queue.gbytes++;
+ }
+ db_rep->input_queue.bytes += msgsize;
+ if (db_rep->input_queue.bytes >= GIGABYTE) {
+ db_rep->input_queue.gbytes++;
+ db_rep->input_queue.bytes -= GIGABYTE;
+ }
return (__repmgr_signal(&db_rep->msg_avail));
}
-
-/*
- * PUBLIC: int __repmgr_queue_size __P((ENV *));
- *
- * !!!
- * Caller must hold repmgr->mutex.
- */
-int
-__repmgr_queue_size(env)
- ENV *env;
-{
- return (env->rep_handle->input_queue.size);
-}