summaryrefslogtreecommitdiff
path: root/src/repmgr/repmgr_posix.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/repmgr/repmgr_posix.c')
-rw-r--r--src/repmgr/repmgr_posix.c804
1 files changed, 804 insertions, 0 deletions
diff --git a/src/repmgr/repmgr_posix.c b/src/repmgr/repmgr_posix.c
new file mode 100644
index 00000000..0687681a
--- /dev/null
+++ b/src/repmgr/repmgr_posix.c
@@ -0,0 +1,804 @@
+/*-
+ * See the file LICENSE for redistribution information.
+ *
+ * Copyright (c) 2005, 2012 Oracle and/or its affiliates. All rights reserved.
+ *
+ * $Id$
+ */
+
+#include "db_config.h"
+
+#include "db_int.h"
+
+/*
+ * Invalid open file descriptor value, that can be used as an out-of-band
+ * sentinel to mark our signalling pipe as unopened.
+ */
+#define NO_SUCH_FILE_DESC (-1)
+
+/* Aggregated control info needed for preparing for select() call. */
+struct io_info {
+ fd_set *reads, *writes;
+ int maxfd;
+};
+
+static int __repmgr_conn_work __P((ENV *, REPMGR_CONNECTION *, void *));
+static int prepare_io __P((ENV *, REPMGR_CONNECTION *, void *));
+
+/*
+ * Starts the thread described in the argument, and stores the resulting thread
+ * ID therein.
+ *
+ * PUBLIC: int __repmgr_thread_start __P((ENV *, REPMGR_RUNNABLE *));
+ */
+int
+__repmgr_thread_start(env, runnable)
+ ENV *env;
+ REPMGR_RUNNABLE *runnable;
+{
+ pthread_attr_t *attrp;
+#if defined(_POSIX_THREAD_ATTR_STACKSIZE) && defined(DB_STACKSIZE)
+ pthread_attr_t attributes;
+ size_t size;
+ int ret;
+
+ attrp = &attributes;
+ if ((ret = pthread_attr_init(&attributes)) != 0) {
+ __db_err(env, ret, DB_STR("3630",
+ "pthread_attr_init in repmgr_thread_start"));
+ return (ret);
+ }
+
+ size = DB_STACKSIZE;
+
+#ifdef PTHREAD_STACK_MIN
+ if (size < PTHREAD_STACK_MIN)
+ size = PTHREAD_STACK_MIN;
+#endif
+ if ((ret = pthread_attr_setstacksize(&attributes, size)) != 0) {
+ __db_err(env, ret, DB_STR("3631",
+ "pthread_attr_setstacksize in repmgr_thread_start"));
+ return (ret);
+ }
+#else
+ attrp = NULL;
+#endif
+
+ runnable->finished = FALSE;
+ runnable->quit_requested = FALSE;
+ runnable->env = env;
+
+ return (pthread_create(&runnable->thread_id, attrp,
+ runnable->run, runnable));
+}
+
+/*
+ * PUBLIC: int __repmgr_thread_join __P((REPMGR_RUNNABLE *));
+ */
+int
+__repmgr_thread_join(thread)
+ REPMGR_RUNNABLE *thread;
+{
+ return (pthread_join(thread->thread_id, NULL));
+}
+
+/*
+ * PUBLIC: int __repmgr_set_nonblock_conn __P((REPMGR_CONNECTION *));
+ */
+int
+__repmgr_set_nonblock_conn(conn)
+ REPMGR_CONNECTION *conn;
+{
+ return (__repmgr_set_nonblocking(conn->fd));
+}
+
+/*
+ * PUBLIC: int __repmgr_set_nonblocking __P((socket_t));
+ */
+int
+__repmgr_set_nonblocking(fd)
+ socket_t fd;
+{
+ int flags;
+
+ if ((flags = fcntl(fd, F_GETFL, 0)) < 0)
+ return (errno);
+ if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0)
+ return (errno);
+ return (0);
+}
+
+/*
+ * PUBLIC: int __repmgr_wake_waiters __P((ENV *, waiter_t *));
+ *
+ * Wake any "waiter" threads (either sending threads waiting for acks, or
+ * channel users waiting for response to request).
+ *
+ * !!!
+ * Caller must hold the db_rep->mutex, if this thread synchronization is to work
+ * properly.
+ */
+int
+__repmgr_wake_waiters(env, waiter)
+ ENV *env;
+ waiter_t *waiter;
+{
+ COMPQUIET(env, NULL);
+ return (pthread_cond_broadcast(waiter));
+}
+
+/*
+ * Waits a limited time for a condition to become true. (If the limit is 0 we
+ * wait forever.) All calls share just the one db_rep->mutex, but use whatever
+ * waiter_t the caller passes us.
+ *
+ * PUBLIC: int __repmgr_await_cond __P((ENV *,
+ * PUBLIC: PREDICATE, void *, db_timeout_t, waiter_t *));
+ */
+int
+__repmgr_await_cond(env, pred, ctx, timeout, wait_condition)
+ ENV *env;
+ PREDICATE pred;
+ void *ctx;
+ db_timeout_t timeout;
+ waiter_t *wait_condition;
+{
+ DB_REP *db_rep;
+ struct timespec deadline;
+ int ret, timed;
+
+ db_rep = env->rep_handle;
+ if ((timed = (timeout > 0)))
+ __repmgr_compute_wait_deadline(env, &deadline, timeout);
+ else
+ COMPQUIET(deadline.tv_sec, 0);
+
+ while (!(*pred)(env, ctx)) {
+ if (timed)
+ ret = pthread_cond_timedwait(wait_condition,
+ db_rep->mutex, &deadline);
+ else
+ ret = pthread_cond_wait(wait_condition, db_rep->mutex);
+ if (db_rep->repmgr_status == stopped)
+ return (DB_REP_UNAVAIL);
+ if (ret == ETIMEDOUT)
+ return (DB_TIMEOUT);
+ if (ret != 0)
+ return (ret);
+ }
+ return (0);
+}
+
+/*
+ * Waits for an in-progress membership DB operation (if any) to complete.
+ *
+ * PUBLIC: int __repmgr_await_gmdbop __P((ENV *));
+ *
+ * Caller holds mutex; we drop it while waiting.
+ */
+int
+__repmgr_await_gmdbop(env)
+ ENV *env;
+{
+ DB_REP *db_rep;
+ int ret;
+
+ db_rep = env->rep_handle;
+ while (db_rep->gmdb_busy)
+ if ((ret = pthread_cond_wait(&db_rep->gmdb_idle,
+ db_rep->mutex)) != 0)
+ return (ret);
+ return (0);
+}
+
+/*
+ * __repmgr_compute_wait_deadline --
+ * Computes a deadline time a certain distance into the future.
+ *
+ * PUBLIC: void __repmgr_compute_wait_deadline __P((ENV*,
+ * PUBLIC: struct timespec *, db_timeout_t));
+ */
+void
+__repmgr_compute_wait_deadline(env, result, wait)
+ ENV *env;
+ struct timespec *result;
+ db_timeout_t wait;
+{
+ /*
+ * The result is suitable for the pthread_cond_timewait call. (That
+ * call uses nano-second resolution; elsewhere we use microseconds.)
+ *
+ * Start with "now"; then add the "wait" offset.
+ *
+ * A db_timespec is the same as a "struct timespec" so we can pass
+ * result directly to the underlying Berkeley DB OS routine.
+ *
+ * !!!
+ * We use the system clock for the pthread_cond_timedwait call, but
+ * that's not optimal on systems with monotonic timers. Instead,
+ * we should call pthread_condattr_setclock on systems where it and
+ * monotonic timers are available, and then configure both this call
+ * and the subsequent pthread_cond_timewait call to use a monotonic
+ * timer.
+ */
+ __os_gettime(env, (db_timespec *)result, 0);
+ TIMESPEC_ADD_DB_TIMEOUT(result, wait);
+}
+
+/*
+ * PUBLIC: int __repmgr_await_drain __P((ENV *,
+ * PUBLIC: REPMGR_CONNECTION *, db_timeout_t));
+ *
+ * Waits for space to become available on the connection's output queue.
+ * Various ways we can exit:
+ *
+ * 1. queue becomes non-full
+ * 2. exceed time limit
+ * 3. connection becomes defunct (due to error in another thread)
+ * 4. repmgr is shutting down
+ * 5. any unexpected system resource failure
+ *
+ * In cases #3 and #5 we return an error code. Caller is responsible for
+ * distinguishing the remaining cases if desired, though we do help with #2 by
+ * showing the connection as congested.
+ *
+ * !!!
+ * Caller must hold repmgr->mutex.
+ */
+int
+__repmgr_await_drain(env, conn, timeout)
+ ENV *env;
+ REPMGR_CONNECTION *conn;
+ db_timeout_t timeout;
+{
+ DB_REP *db_rep;
+ struct timespec deadline;
+ int ret;
+
+ db_rep = env->rep_handle;
+
+ __repmgr_compute_wait_deadline(env, &deadline, timeout);
+
+ ret = 0;
+ while (conn->out_queue_length >= OUT_QUEUE_LIMIT) {
+ ret = pthread_cond_timedwait(&conn->drained,
+ db_rep->mutex, &deadline);
+ switch (ret) {
+ case 0:
+ if (db_rep->repmgr_status == stopped)
+ goto out; /* #4. */
+ /*
+ * Another thread could have stumbled into an error on
+ * the socket while we were waiting.
+ */
+ if (conn->state == CONN_DEFUNCT) {
+ ret = DB_REP_UNAVAIL; /* #3. */
+ goto out;
+ }
+ break;
+ case ETIMEDOUT:
+ conn->state = CONN_CONGESTED;
+ ret = 0;
+ goto out; /* #2. */
+ default:
+ goto out; /* #5. */
+ }
+ }
+ /* #1. */
+
+out:
+ return (ret);
+}
+
+/*
+ * PUBLIC: int __repmgr_alloc_cond __P((cond_var_t *));
+ *
+ * Initialize a condition variable (in allocated space).
+ */
+int
+__repmgr_alloc_cond(c)
+ cond_var_t *c;
+{
+ return (pthread_cond_init(c, NULL));
+}
+
+/*
+ * PUBLIC: int __repmgr_free_cond __P((cond_var_t *));
+ *
+ * Clean up a previously initialized condition variable.
+ */
+int
+__repmgr_free_cond(c)
+ cond_var_t *c;
+{
+ return (pthread_cond_destroy(c));
+}
+
+/*
+ * PUBLIC: void __repmgr_env_create_pf __P((DB_REP *));
+ */
+void
+__repmgr_env_create_pf(db_rep)
+ DB_REP *db_rep;
+{
+ db_rep->read_pipe = db_rep->write_pipe = NO_SUCH_FILE_DESC;
+}
+
+/*
+ * "Platform"-specific mutex creation function.
+ *
+ * PUBLIC: int __repmgr_create_mutex_pf __P((mgr_mutex_t *));
+ */
+int
+__repmgr_create_mutex_pf(mutex)
+ mgr_mutex_t *mutex;
+{
+ return (pthread_mutex_init(mutex, NULL));
+}
+
+/*
+ * PUBLIC: int __repmgr_destroy_mutex_pf __P((mgr_mutex_t *));
+ */
+int
+__repmgr_destroy_mutex_pf(mutex)
+ mgr_mutex_t *mutex;
+{
+ return (pthread_mutex_destroy(mutex));
+}
+
+/*
+ * PUBLIC: int __repmgr_init __P((ENV *));
+ */
+int
+__repmgr_init(env)
+ ENV *env;
+{
+ DB_REP *db_rep;
+ struct sigaction sigact;
+ int ack_inited, elect_inited, file_desc[2], gmdb_inited, queue_inited;
+ int ret;
+
+ db_rep = env->rep_handle;
+
+ /*
+ * Make sure we're not ignoring SIGPIPE, 'cuz otherwise we'd be killed
+ * just for trying to write onto a socket that had been reset. Note
+ * that we don't undo this in case of a later error, since we document
+ * that we leave the signal handling state like this, even after env
+ * close.
+ */
+ if (sigaction(SIGPIPE, NULL, &sigact) == -1) {
+ ret = errno;
+ __db_err(env, ret, DB_STR("3632",
+ "can't access signal handler"));
+ return (ret);
+ }
+ if (sigact.sa_handler == SIG_DFL) {
+ sigact.sa_handler = SIG_IGN;
+ sigact.sa_flags = 0;
+ if (sigaction(SIGPIPE, &sigact, NULL) == -1) {
+ ret = errno;
+ __db_err(env, ret, DB_STR("3633",
+ "can't access signal handler"));
+ return (ret);
+ }
+ }
+
+ ack_inited = elect_inited = gmdb_inited = queue_inited = FALSE;
+ if ((ret = __repmgr_init_waiters(env, &db_rep->ack_waiters)) != 0)
+ goto err;
+ ack_inited = TRUE;
+
+ if ((ret = pthread_cond_init(&db_rep->check_election, NULL)) != 0)
+ goto err;
+ elect_inited = TRUE;
+
+ if ((ret = pthread_cond_init(&db_rep->gmdb_idle, NULL)) != 0)
+ goto err;
+ gmdb_inited = TRUE;
+
+ if ((ret = pthread_cond_init(&db_rep->msg_avail, NULL)) != 0)
+ goto err;
+ queue_inited = TRUE;
+
+ if ((ret = pipe(file_desc)) == -1) {
+ ret = errno;
+ goto err;
+ }
+
+ db_rep->read_pipe = file_desc[0];
+ db_rep->write_pipe = file_desc[1];
+ return (0);
+err:
+ if (queue_inited)
+ (void)pthread_cond_destroy(&db_rep->msg_avail);
+ if (gmdb_inited)
+ (void)pthread_cond_destroy(&db_rep->gmdb_idle);
+ if (elect_inited)
+ (void)pthread_cond_destroy(&db_rep->check_election);
+ if (ack_inited)
+ (void)__repmgr_destroy_waiters(env, &db_rep->ack_waiters);
+ db_rep->read_pipe = db_rep->write_pipe = NO_SUCH_FILE_DESC;
+
+ return (ret);
+}
+
+/*
+ * PUBLIC: int __repmgr_deinit __P((ENV *));
+ */
+int
+__repmgr_deinit(env)
+ ENV *env;
+{
+ DB_REP *db_rep;
+ int ret, t_ret;
+
+ db_rep = env->rep_handle;
+
+ if (!(REPMGR_INITED(db_rep)))
+ return (0);
+
+ ret = pthread_cond_destroy(&db_rep->msg_avail);
+
+ if ((t_ret = pthread_cond_destroy(&db_rep->gmdb_idle)) != 0 &&
+ ret == 0)
+ ret = t_ret;
+
+ if ((t_ret = pthread_cond_destroy(&db_rep->check_election)) != 0 &&
+ ret == 0)
+ ret = t_ret;
+
+ if ((t_ret = __repmgr_destroy_waiters(env,
+ &db_rep->ack_waiters)) != 0 && ret == 0)
+ ret = t_ret;
+
+ if (close(db_rep->read_pipe) == -1 && ret == 0)
+ ret = errno;
+ if (close(db_rep->write_pipe) == -1 && ret == 0)
+ ret = errno;
+
+ db_rep->read_pipe = db_rep->write_pipe = NO_SUCH_FILE_DESC;
+ return (ret);
+}
+
+/*
+ * PUBLIC: int __repmgr_init_waiters __P((ENV *, waiter_t *));
+ */
+int
+__repmgr_init_waiters(env, waiters)
+ ENV *env;
+ waiter_t *waiters;
+{
+ COMPQUIET(env, NULL);
+ return (pthread_cond_init(waiters, NULL));
+}
+
+/*
+ * PUBLIC: int __repmgr_destroy_waiters __P((ENV *, waiter_t *));
+ */
+int
+__repmgr_destroy_waiters(env, waiters)
+ ENV *env;
+ waiter_t *waiters;
+{
+ COMPQUIET(env, NULL);
+ return (pthread_cond_destroy(waiters));
+}
+
+/*
+ * PUBLIC: int __repmgr_lock_mutex __P((mgr_mutex_t *));
+ */
+int
+__repmgr_lock_mutex(mutex)
+ mgr_mutex_t *mutex;
+{
+ return (pthread_mutex_lock(mutex));
+}
+
+/*
+ * PUBLIC: int __repmgr_unlock_mutex __P((mgr_mutex_t *));
+ */
+int
+__repmgr_unlock_mutex(mutex)
+ mgr_mutex_t *mutex;
+{
+ return (pthread_mutex_unlock(mutex));
+}
+
+/*
+ * Signals a condition variable.
+ *
+ * !!!
+ * Caller must hold mutex.
+ *
+ * PUBLIC: int __repmgr_signal __P((cond_var_t *));
+ */
+int
+__repmgr_signal(v)
+ cond_var_t *v;
+{
+ return (pthread_cond_broadcast(v));
+}
+
+/*
+ * Wake repmgr message processing threads, expressly for the purpose of shutting
+ * some subset of them down.
+ *
+ * !!!
+ * Caller must hold mutex.
+ *
+ * PUBLIC: int __repmgr_wake_msngers __P((ENV*, u_int));
+ */
+int
+__repmgr_wake_msngers(env, n)
+ ENV *env;
+ u_int n;
+{
+ DB_REP *db_rep;
+
+ COMPQUIET(n, 0);
+
+ db_rep = env->rep_handle;
+ return (__repmgr_signal(&db_rep->msg_avail));
+}
+
+/*
+ * PUBLIC: int __repmgr_wake_main_thread __P((ENV*));
+ *
+ * Can be called either with or without the mutex being held.
+ */
+int
+__repmgr_wake_main_thread(env)
+ ENV *env;
+{
+ DB_REP *db_rep;
+ u_int8_t any_value;
+
+ COMPQUIET(any_value, 0);
+ db_rep = env->rep_handle;
+
+ /*
+ * It doesn't matter what byte value we write. Just the appearance of a
+ * byte in the stream is enough to wake up the select() thread reading
+ * the pipe.
+ */
+ if (write(db_rep->write_pipe, VOID_STAR_CAST &any_value, 1) == -1)
+ return (errno);
+ return (0);
+}
+
+/*
+ * PUBLIC: int __repmgr_writev __P((socket_t, db_iovec_t *, int, size_t *));
+ */
+int
+__repmgr_writev(fd, iovec, buf_count, byte_count_p)
+ socket_t fd;
+ db_iovec_t *iovec;
+ int buf_count;
+ size_t *byte_count_p;
+{
+ int nw, result;
+
+ if ((nw = writev(fd, iovec, buf_count)) == -1) {
+ /* Why? See note at __repmgr_readv(). */
+ result = errno;
+ DB_ASSERT(NULL, result != 0);
+ return (result);
+ }
+ *byte_count_p = (size_t)nw;
+ return (0);
+}
+
+/*
+ * PUBLIC: int __repmgr_readv __P((socket_t, db_iovec_t *, int, size_t *));
+ */
+int
+__repmgr_readv(fd, iovec, buf_count, byte_count_p)
+ socket_t fd;
+ db_iovec_t *iovec;
+ int buf_count;
+ size_t *byte_count_p;
+{
+ int result;
+ ssize_t nw;
+
+ if ((nw = readv(fd, iovec, buf_count)) == -1) {
+ /*
+ * Why bother to assert this obvious "truth"? On some systems
+ * when the library is loaded into a single-threaded Tcl
+ * configuration the differing errno mechanisms apparently
+ * conflict, and we occasionally "see" a 0 value here! And that
+ * turns out to be painful to debug.
+ */
+ result = errno;
+ DB_ASSERT(NULL, result != 0);
+ return (result);
+ }
+ *byte_count_p = (size_t)nw;
+ return (0);
+}
+
+/*
+ * PUBLIC: int __repmgr_select_loop __P((ENV *));
+ */
+int
+__repmgr_select_loop(env)
+ ENV *env;
+{
+ struct timeval select_timeout, *select_timeout_p;
+ DB_REP *db_rep;
+ db_timespec timeout;
+ fd_set reads, writes;
+ struct io_info io_info;
+ int ret;
+ u_int8_t buf[10]; /* arbitrary size */
+
+ db_rep = env->rep_handle;
+ /*
+ * Almost this entire thread operates while holding the mutex. But note
+ * that it never blocks, except in the call to select() (which is the
+ * one place we relinquish the mutex).
+ */
+ LOCK_MUTEX(db_rep->mutex);
+ if ((ret = __repmgr_first_try_connections(env)) != 0)
+ goto out;
+ for (;;) {
+ FD_ZERO(&reads);
+ FD_ZERO(&writes);
+
+ /*
+ * Figure out which sockets to ask for input and output. It's
+ * simple for the signalling pipe and listen socket; but depends
+ * on backlog states for the connections to other sites.
+ */
+ FD_SET((u_int)db_rep->read_pipe, &reads);
+ io_info.maxfd = db_rep->read_pipe;
+
+ if (!IS_SUBORDINATE(db_rep)) {
+ FD_SET((u_int)db_rep->listen_fd, &reads);
+ if (db_rep->listen_fd > io_info.maxfd)
+ io_info.maxfd = db_rep->listen_fd;
+ }
+
+ io_info.reads = &reads;
+ io_info.writes = &writes;
+ if ((ret = __repmgr_each_connection(env,
+ prepare_io, &io_info, TRUE)) != 0)
+ goto out;
+
+ if (__repmgr_compute_timeout(env, &timeout)) {
+ /* Convert the timespec to a timeval. */
+ select_timeout.tv_sec = timeout.tv_sec;
+ select_timeout.tv_usec = timeout.tv_nsec / NS_PER_US;
+ select_timeout_p = &select_timeout;
+ } else {
+ /* No time-based events, so wait only for I/O. */
+ select_timeout_p = NULL;
+ }
+
+ UNLOCK_MUTEX(db_rep->mutex);
+
+ if ((ret = select(io_info.maxfd + 1,
+ &reads, &writes, NULL, select_timeout_p)) == -1) {
+ switch (ret = errno) {
+ case EINTR:
+ case EWOULDBLOCK:
+ LOCK_MUTEX(db_rep->mutex);
+ continue; /* simply retry */
+ default:
+ __db_err(env, ret, DB_STR("3634",
+ "select"));
+ return (ret);
+ }
+ }
+ LOCK_MUTEX(db_rep->mutex);
+ if (db_rep->repmgr_status == stopped) {
+ ret = 0;
+ goto out;
+ }
+
+ /*
+ * Timer expiration events include retrying of lost connections.
+ * Obviously elements can be added to the connection list there.
+ */
+ if ((ret = __repmgr_check_timeouts(env)) != 0)
+ goto out;
+
+ if ((ret = __repmgr_each_connection(env,
+ __repmgr_conn_work, &io_info, TRUE)) != 0)
+ goto out;
+
+ /*
+ * Read any bytes in the signalling pipe. Note that we don't
+ * actually need to do anything with them; they're just there to
+ * wake us up when necessary.
+ */
+ if (FD_ISSET((u_int)db_rep->read_pipe, &reads) &&
+ read(db_rep->read_pipe, VOID_STAR_CAST buf,
+ sizeof(buf)) <= 0) {
+ ret = errno;
+ goto out;
+ }
+ /*
+ * Obviously elements can be added to the connection list here.
+ */
+ if (!IS_SUBORDINATE(db_rep) &&
+ FD_ISSET((u_int)db_rep->listen_fd, &reads) &&
+ (ret = __repmgr_accept(env)) != 0)
+ goto out;
+ }
+out:
+ UNLOCK_MUTEX(db_rep->mutex);
+ if (ret == DB_DELETED)
+ ret = __repmgr_bow_out(env);
+ LOCK_MUTEX(db_rep->mutex);
+ (void)__repmgr_net_close(env);
+ UNLOCK_MUTEX(db_rep->mutex);
+ return (ret);
+}
+
+/*
+ * Examines a connection to see what sort of I/O to ask for. Clean up defunct
+ * connections.
+ */
+static int
+prepare_io(env, conn, info_)
+ ENV *env;
+ REPMGR_CONNECTION *conn;
+ void *info_;
+{
+ struct io_info *info;
+
+ info = info_;
+
+ if (conn->state == CONN_DEFUNCT)
+ return (__repmgr_cleanup_defunct(env, conn));
+
+ if (!STAILQ_EMPTY(&conn->outbound_queue)) {
+ FD_SET((u_int)conn->fd, info->writes);
+ if (conn->fd > info->maxfd)
+ info->maxfd = conn->fd;
+ }
+ /*
+ * For now we always accept incoming data. If we ever implement some
+ * kind of flow control, we should override it for fledgling connections
+ * (!IS_VALID_EID(conn->eid)) -- in other words, allow reading such a
+ * connection even during flow control duress.
+ */
+ FD_SET((u_int)conn->fd, info->reads);
+ if (conn->fd > info->maxfd)
+ info->maxfd = conn->fd;
+
+ return (0);
+}
+
+/*
+ * Examine a connection, to see what work needs to be done.
+ */
+static int
+__repmgr_conn_work(env, conn, info_)
+ ENV *env;
+ REPMGR_CONNECTION *conn;
+ void *info_;
+{
+ struct io_info *info;
+ int ret;
+ u_int fd;
+
+ ret = 0;
+ fd = (u_int)conn->fd;
+ info = info_;
+
+ if (conn->state == CONN_DEFUNCT)
+ return (0);
+
+ if (FD_ISSET(fd, info->writes))
+ ret = __repmgr_write_some(env, conn);
+
+ if (ret == 0 && FD_ISSET(fd, info->reads))
+ ret = __repmgr_read_from_site(env, conn);
+
+ if (ret == DB_REP_UNAVAIL)
+ ret = __repmgr_bust_connection(env, conn);
+ return (ret);
+}