summaryrefslogtreecommitdiff
path: root/bdb/examples_c/ex_repquote/ex_rq_util.c
diff options
context:
space:
mode:
Diffstat (limited to 'bdb/examples_c/ex_repquote/ex_rq_util.c')
-rw-r--r--bdb/examples_c/ex_repquote/ex_rq_util.c412
1 files changed, 412 insertions, 0 deletions
diff --git a/bdb/examples_c/ex_repquote/ex_rq_util.c b/bdb/examples_c/ex_repquote/ex_rq_util.c
new file mode 100644
index 00000000000..89fd7ae485a
--- /dev/null
+++ b/bdb/examples_c/ex_repquote/ex_rq_util.c
@@ -0,0 +1,412 @@
+/*-
+ * See the file LICENSE for redistribution information.
+ *
+ * Copyright (c) 2001-2002
+ * Sleepycat Software. All rights reserved.
+ *
+ * $Id: ex_rq_util.c,v 1.20 2002/08/06 05:39:04 bostic Exp $
+ */
+
+#include <sys/types.h>
+
+#include <errno.h>
+#include <pthread.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+#include <db.h>
+
+#include "ex_repquote.h"
+
+static int connect_site __P((DB_ENV *, machtab_t *, const char *,
+ repsite_t *, int *, int *));
+void * elect_thread __P((void *));
+
+typedef struct {
+ DB_ENV *dbenv;
+ machtab_t *machtab;
+} elect_args;
+
+typedef struct {
+ DB_ENV *dbenv;
+ const char *progname;
+ const char *home;
+ int fd;
+ u_int32_t eid;
+ machtab_t *tab;
+} hm_loop_args;
+
+/*
+ * This is a generic message handling loop that is used both by the
+ * master to accept messages from a client as well as by clients
+ * to communicate with other clients.
+ */
+void *
+hm_loop(args)
+ void *args;
+{
+ DB_ENV *dbenv;
+ DBT rec, control;
+ const char *c, *home, *progname;
+ int fd, eid, n, newm;
+ int open, pri, r, ret, t_ret, tmpid;
+ elect_args *ea;
+ hm_loop_args *ha;
+ machtab_t *tab;
+ pthread_t elect_thr;
+ repsite_t self;
+ u_int32_t timeout;
+ void *status;
+
+ ea = NULL;
+
+ ha = (hm_loop_args *)args;
+ dbenv = ha->dbenv;
+ fd = ha->fd;
+ home = ha->home;
+ eid = ha->eid;
+ progname = ha->progname;
+ tab = ha->tab;
+ free(ha);
+
+ memset(&rec, 0, sizeof(DBT));
+ memset(&control, 0, sizeof(DBT));
+
+ for (ret = 0; ret == 0;) {
+ if ((ret = get_next_message(fd, &rec, &control)) != 0) {
+ /*
+ * Close this connection; if it's the master call
+ * for an election.
+ */
+ close(fd);
+ if ((ret = machtab_rem(tab, eid, 1)) != 0)
+ break;
+
+ /*
+ * If I'm the master, I just lost a client and this
+ * thread is done.
+ */
+ if (master_eid == SELF_EID)
+ break;
+
+ /*
+ * If I was talking with the master and the master
+ * went away, I need to call an election; else I'm
+ * done.
+ */
+ if (master_eid != eid)
+ break;
+
+ master_eid = DB_EID_INVALID;
+ machtab_parm(tab, &n, &pri, &timeout);
+ if ((ret = dbenv->rep_elect(dbenv,
+ n, pri, timeout, &newm)) != 0)
+ continue;
+
+ /*
+ * Regardless of the results, the site I was talking
+ * to is gone, so I have nothing to do but exit.
+ */
+ if (newm == SELF_EID && (ret =
+ dbenv->rep_start(dbenv, NULL, DB_REP_MASTER)) == 0)
+ ret = domaster(dbenv, progname);
+ break;
+ }
+
+ tmpid = eid;
+ switch(r = dbenv->rep_process_message(dbenv,
+ &control, &rec, &tmpid)) {
+ case DB_REP_NEWSITE:
+ /*
+ * Check if we got sent connect information and if we
+ * did, if this is me or if we already have a
+ * connection to this new site. If we don't,
+ * establish a new one.
+ */
+
+ /* No connect info. */
+ if (rec.size == 0)
+ break;
+
+ /* It's me, do nothing. */
+ if (strncmp(myaddr, rec.data, rec.size) == 0)
+ break;
+
+ self.host = (char *)rec.data;
+ self.host = strtok(self.host, ":");
+ if ((c = strtok(NULL, ":")) == NULL) {
+ dbenv->errx(dbenv, "Bad host specification");
+ goto out;
+ }
+ self.port = atoi(c);
+
+ /*
+ * We try to connect to the new site. If we can't,
+ * we treat it as an error since we know that the site
+ * should be up if we got a message from it (even
+ * indirectly).
+ */
+ if ((ret = connect_site(dbenv,
+ tab, progname, &self, &open, &eid)) != 0)
+ goto out;
+ break;
+ case DB_REP_HOLDELECTION:
+ if (master_eid == SELF_EID)
+ break;
+ /* Make sure that previous election has finished. */
+ if (ea != NULL) {
+ (void)pthread_join(elect_thr, &status);
+ ea = NULL;
+ }
+ if ((ea = calloc(sizeof(elect_args), 1)) == NULL) {
+ ret = errno;
+ goto out;
+ }
+ ea->dbenv = dbenv;
+ ea->machtab = tab;
+ ret = pthread_create(&elect_thr,
+ NULL, elect_thread, (void *)ea);
+ break;
+ case DB_REP_NEWMASTER:
+ /* Check if it's us. */
+ master_eid = tmpid;
+ if (tmpid == SELF_EID) {
+ if ((ret = dbenv->rep_start(dbenv,
+ NULL, DB_REP_MASTER)) != 0)
+ goto out;
+ ret = domaster(dbenv, progname);
+ }
+ break;
+ case 0:
+ break;
+ default:
+ dbenv->err(dbenv, r, "DB_ENV->rep_process_message");
+ break;
+ }
+ }
+
+out: if ((t_ret = machtab_rem(tab, eid, 1)) != 0 && ret == 0)
+ ret = t_ret;
+
+ /* Don't close the environment before any children exit. */
+ if (ea != NULL)
+ (void)pthread_join(elect_thr, &status);
+
+ return ((void *)ret);
+}
+
+/*
+ * This is a generic thread that spawns a thread to listen for connections
+ * on a socket and then spawns off child threads to handle each new
+ * connection.
+ */
+void *
+connect_thread(args)
+ void *args;
+{
+ DB_ENV *dbenv;
+ const char *home, *progname;
+ int fd, i, eid, ns, port, ret;
+ hm_loop_args *ha;
+ connect_args *cargs;
+ machtab_t *machtab;
+#define MAX_THREADS 25
+ pthread_t hm_thrs[MAX_THREADS];
+ pthread_attr_t attr;
+
+ ha = NULL;
+ cargs = (connect_args *)args;
+ dbenv = cargs->dbenv;
+ home = cargs->home;
+ progname = cargs->progname;
+ machtab = cargs->machtab;
+ port = cargs->port;
+
+ if ((ret = pthread_attr_init(&attr)) != 0)
+ return ((void *)EXIT_FAILURE);
+
+ if ((ret =
+ pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED)) != 0)
+ goto err;
+
+ /*
+ * Loop forever, accepting connections from new machines,
+ * and forking off a thread to handle each.
+ */
+ if ((fd = listen_socket_init(progname, port)) < 0) {
+ ret = errno;
+ goto err;
+ }
+
+ for (i = 0; i < MAX_THREADS; i++) {
+ if ((ns = listen_socket_accept(machtab,
+ progname, fd, &eid)) < 0) {
+ ret = errno;
+ goto err;
+ }
+ if ((ha = calloc(sizeof(hm_loop_args), 1)) == NULL)
+ goto err;
+ ha->progname = progname;
+ ha->home = home;
+ ha->fd = ns;
+ ha->eid = eid;
+ ha->tab = machtab;
+ ha->dbenv = dbenv;
+ if ((ret = pthread_create(&hm_thrs[i++], &attr,
+ hm_loop, (void *)ha)) != 0)
+ goto err;
+ ha = NULL;
+ }
+
+ /* If we fell out, we ended up with too many threads. */
+ dbenv->errx(dbenv, "Too many threads");
+ ret = ENOMEM;
+
+err: pthread_attr_destroy(&attr);
+ return (ret == 0 ? (void *)EXIT_SUCCESS : (void *)EXIT_FAILURE);
+}
+
+/*
+ * Open a connection to everyone that we've been told about. If we
+ * cannot open some connections, keep trying.
+ */
+void *
+connect_all(args)
+ void *args;
+{
+ DB_ENV *dbenv;
+ all_args *aa;
+ const char *home, *progname;
+ hm_loop_args *ha;
+ int failed, i, eid, nsites, open, ret, *success;
+ machtab_t *machtab;
+ repsite_t *sites;
+
+ ha = NULL;
+ aa = (all_args *)args;
+ dbenv = aa->dbenv;
+ progname = aa->progname;
+ home = aa->home;
+ machtab = aa->machtab;
+ nsites = aa->nsites;
+ sites = aa->sites;
+
+ ret = 0;
+
+ /* Some implementations of calloc are sad about alloc'ing 0 things. */
+ if ((success = calloc(nsites > 0 ? nsites : 1, sizeof(int))) == NULL) {
+ dbenv->err(dbenv, errno, "connect_all");
+ ret = 1;
+ goto err;
+ }
+
+ for (failed = nsites; failed > 0;) {
+ for (i = 0; i < nsites; i++) {
+ if (success[i])
+ continue;
+
+ ret = connect_site(dbenv, machtab,
+ progname, &sites[i], &open, &eid);
+
+ /*
+ * If we couldn't make the connection, this isn't
+ * fatal to the loop, but we have nothing further
+ * to do on this machine at the moment.
+ */
+ if (ret == DB_REP_UNAVAIL)
+ continue;
+
+ if (ret != 0)
+ goto err;
+
+ failed--;
+ success[i] = 1;
+
+ /* If the connection is already open, we're done. */
+ if (ret == 0 && open == 1)
+ continue;
+
+ }
+ sleep(1);
+ }
+
+err: free(success);
+ return (ret ? (void *)EXIT_FAILURE : (void *)EXIT_SUCCESS);
+}
+
+int
+connect_site(dbenv, machtab, progname, site, is_open, eidp)
+ DB_ENV *dbenv;
+ machtab_t *machtab;
+ const char *progname;
+ repsite_t *site;
+ int *is_open;
+ int *eidp;
+{
+ int ret, s;
+ hm_loop_args *ha;
+ pthread_t hm_thr;
+
+ if ((s = get_connected_socket(machtab, progname,
+ site->host, site->port, is_open, eidp)) < 0)
+ return (DB_REP_UNAVAIL);
+
+ if (*is_open)
+ return (0);
+
+ if ((ha = calloc(sizeof(hm_loop_args), 1)) == NULL) {
+ ret = errno;
+ goto err;
+ }
+
+ ha->progname = progname;
+ ha->fd = s;
+ ha->eid = *eidp;
+ ha->tab = machtab;
+ ha->dbenv = dbenv;
+
+ if ((ret = pthread_create(&hm_thr, NULL,
+ hm_loop, (void *)ha)) != 0) {
+ dbenv->err(dbenv, ret, "connect site");
+ goto err1;
+ }
+
+ return (0);
+
+err1: free(ha);
+err:
+ return (ret);
+}
+
+/*
+ * We need to spawn off a new thread in which to hold an election in
+ * case we are the only thread listening on for messages.
+ */
+void *
+elect_thread(args)
+ void *args;
+{
+ DB_ENV *dbenv;
+ elect_args *eargs;
+ int n, ret, pri;
+ machtab_t *machtab;
+ u_int32_t timeout;
+
+ eargs = (elect_args *)args;
+ dbenv = eargs->dbenv;
+ machtab = eargs->machtab;
+ free(eargs);
+
+ machtab_parm(machtab, &n, &pri, &timeout);
+ while ((ret =
+ dbenv->rep_elect(dbenv, n, pri, timeout, &master_eid)) != 0)
+ sleep(2);
+
+ /* Check if it's us. */
+ if (master_eid == SELF_EID)
+ ret = dbenv->rep_start(dbenv, NULL, DB_REP_MASTER);
+
+ return ((void *)(ret == 0 ? EXIT_SUCCESS : EXIT_FAILURE));
+}