summaryrefslogtreecommitdiff
path: root/daemons/clvmd/clvmd.c
diff options
context:
space:
mode:
Diffstat (limited to 'daemons/clvmd/clvmd.c')
-rw-r--r--daemons/clvmd/clvmd.c2422
1 files changed, 0 insertions, 2422 deletions
diff --git a/daemons/clvmd/clvmd.c b/daemons/clvmd/clvmd.c
deleted file mode 100644
index 73f471145..000000000
--- a/daemons/clvmd/clvmd.c
+++ /dev/null
@@ -1,2422 +0,0 @@
-/*
- * Copyright (C) 2002-2004 Sistina Software, Inc. All rights reserved.
- * Copyright (C) 2004-2014 Red Hat, Inc. All rights reserved.
- *
- * This file is part of LVM2.
- *
- * This copyrighted material is made available to anyone wishing to use,
- * modify, copy, or redistribute it subject to the terms and conditions
- * of the GNU General Public License v.2.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
- */
-
-/*
- * CLVMD: Cluster LVM daemon
- */
-
-#include "clvmd-common.h"
-
-#include "clvmd-comms.h"
-#include "daemons/clvmd/clvm.h"
-#include "clvmd.h"
-#include "lvm-functions.h"
-#include "lvm-version.h"
-#include "refresh_clvmd.h"
-
-#ifdef HAVE_COROSYNC_CONFDB_H
-#include <corosync/confdb.h>
-#endif
-
-#include <pthread.h>
-#include <getopt.h>
-#include <ctype.h>
-#include <stdarg.h>
-
-#include <fcntl.h>
-#include <netinet/in.h>
-#include <signal.h>
-#include <stddef.h>
-#include <syslog.h>
-#include <sys/un.h>
-#include <sys/utsname.h>
-
-#ifndef TRUE
-#define TRUE 1
-#endif
-#ifndef FALSE
-#define FALSE 0
-#endif
-
-#define MAX_RETRIES 4
-#define MAX_MISSING_LEN 8000 /* Max supported clvmd message size ? */
-
-#define ISLOCAL_CSID(c) (memcmp(c, our_csid, max_csid_len) == 0)
-
-/* Head of the fd list. Also contains
- the cluster_socket details */
-static struct local_client local_client_head;
-static int _local_client_count = 0;
-
-static unsigned short global_xid = 0; /* Last transaction ID issued */
-
-struct cluster_ops *clops = NULL;
-
-static char our_csid[MAX_CSID_LEN];
-static unsigned max_csid_len;
-static unsigned max_cluster_message;
-static unsigned max_cluster_member_name_len;
-
-static void _add_client(struct local_client *new_client, struct local_client *existing_client)
-{
- _local_client_count++;
- DEBUGLOG("(%p) Adding listener for fd %d. (Now %d monitored fds.)\n", new_client, new_client->fd, _local_client_count);
- new_client->next = existing_client->next;
- existing_client->next = new_client;
-}
-
-int add_client(struct local_client *new_client)
-{
- _add_client(new_client, &local_client_head);
-
- return 0;
-}
-
-/* Returns 0 if delfd is found and removed from list */
-static int _del_client(struct local_client *delfd)
-{
- struct local_client *lastfd, *thisfd;
-
- for (lastfd = &local_client_head; (thisfd = lastfd->next); lastfd = thisfd)
- if (thisfd == delfd) {
- DEBUGLOG("(%p) Removing listener for fd %d\n", thisfd, thisfd->fd);
- lastfd->next = delfd->next;
- _local_client_count--;
- return 0;
- }
-
- return 1;
-}
-
-/* Structure of items on the LVM thread list */
-struct lvm_thread_cmd {
- struct dm_list list;
-
- struct local_client *client;
- struct clvm_header *msg;
- char csid[MAX_CSID_LEN];
- int remote; /* Flag */
- int msglen;
- unsigned short xid;
-};
-
-struct lvm_startup_params {
- struct dm_hash_table *excl_uuid;
-};
-
-static debug_t debug = DEBUG_OFF;
-static int foreground_mode = 0;
-static pthread_t lvm_thread;
-/* Stack size 128KiB for thread, must be bigger then DEFAULT_RESERVED_STACK */
-static const size_t STACK_SIZE = 128 * 1024;
-static pthread_attr_t stack_attr;
-static int lvm_thread_exit = 0;
-static pthread_mutex_t lvm_thread_mutex;
-static pthread_mutex_t _debuglog_mutex = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t lvm_thread_cond;
-static pthread_barrier_t lvm_start_barrier;
-static struct dm_list lvm_cmd_head;
-static volatile sig_atomic_t quit = 0;
-static volatile sig_atomic_t reread_config = 0;
-static int child_pipe[2];
-
-/* Reasons the daemon failed initialisation */
-#define DFAIL_INIT 1
-#define DFAIL_LOCAL_SOCK 2
-#define DFAIL_CLUSTER_IF 3
-#define DFAIL_MALLOC 4
-#define DFAIL_TIMEOUT 5
-#define SUCCESS 0
-
-typedef enum {IF_AUTO, IF_CMAN, IF_OPENAIS, IF_COROSYNC, IF_SINGLENODE} if_type_t;
-
-/* Prototypes for code further down */
-static void sigusr2_handler(int sig);
-static void sighup_handler(int sig);
-static void sigterm_handler(int sig);
-static void send_local_reply(struct local_client *client, int status,
- int clientid);
-static void free_reply(struct local_client *client);
-static void send_version_message(void);
-static void *pre_and_post_thread(void *arg);
-static int send_message(void *buf, int msglen, const char *csid, int fd,
- const char *errtext);
-static int read_from_local_sock(struct local_client *thisfd);
-static int cleanup_zombie(struct local_client *thisfd);
-static int process_local_command(struct clvm_header *msg, int msglen,
- struct local_client *client,
- unsigned short xid);
-static void process_remote_command(struct clvm_header *msg, int msglen, int fd,
- const char *csid);
-static int process_reply(const struct clvm_header *msg, int msglen,
- const char *csid);
-static int open_local_sock(void);
-static void close_local_sock(int local_socket);
-static int check_local_clvmd(void);
-static struct local_client *find_client(int clientid);
-static void main_loop(int cmd_timeout);
-static void be_daemon(int start_timeout);
-static int check_all_clvmds_running(struct local_client *client);
-static int local_rendezvous_callback(struct local_client *thisfd, char *buf,
- int len, const char *csid,
- struct local_client **new_client);
-static void *lvm_thread_fn(void *) __attribute__((noreturn));
-static int add_to_lvmqueue(struct local_client *client, struct clvm_header *msg,
- int msglen, const char *csid);
-static int distribute_command(struct local_client *thisfd);
-static void hton_clvm(struct clvm_header *hdr);
-static void ntoh_clvm(struct clvm_header *hdr);
-static void add_reply_to_list(struct local_client *client, int status,
- const char *csid, const char *buf, int len);
-static if_type_t parse_cluster_interface(char *ifname);
-static if_type_t get_cluster_type(void);
-
-static void usage(const char *prog, FILE *file)
-{
- fprintf(file, "Usage: %s [options]\n"
- " -C Sets debug level (from -d) on all clvmd instances clusterwide\n"
- " -d[<n>] Set debug logging (0:none, 1:stderr (implies -f option), 2:syslog)\n"
- " -E<uuid> Take this lock uuid as exclusively locked resource (for restart)\n"
- " -f Don't fork, run in the foreground\n"
- " -h Show this help information\n"
- " -I<cmgr> Cluster manager (default: auto)\n"
- " Available cluster managers: "
-#ifdef USE_COROSYNC
- "corosync "
-#endif
-#ifdef USE_CMAN
- "cman "
-#endif
-#ifdef USE_OPENAIS
- "openais "
-#endif
-#ifdef USE_SINGLENODE
- "singlenode "
-#endif
- "\n"
- " -R Tell all running clvmds in the cluster to reload their device cache\n"
- " -S Restart clvmd, preserving exclusive locks\n"
- " -t<secs> Command timeout (default: 60 seconds)\n"
- " -T<secs> Startup timeout (default: 0 seconds)\n"
- " -V Show version of clvmd\n"
- "\n", prog);
-}
-
-/* Called to signal the parent how well we got on during initialisation */
-static void child_init_signal(int status)
-{
- if (child_pipe[1]) {
- /* FIXME Use a proper wrapper around write */
- if (write(child_pipe[1], &status, sizeof(status)) < 0)
- log_sys_error("write", "child_pipe");
- if (close(child_pipe[1]))
- log_sys_error("close", "child_pipe");
- }
-}
-
-static __attribute__((noreturn)) void child_init_signal_and_exit(int status)
-{
- child_init_signal(status);
- exit(status);
-}
-
-static void safe_close(int *fd)
-{
- if (*fd >= 0) {
- int to_close = *fd;
- *fd = -1;
- if (close(to_close))
- log_sys_error("close", ""); /* path */
- }
-}
-
-void debuglog(const char *fmt, ...)
-{
- time_t P;
- va_list ap;
- static int syslog_init = 0;
- char buf_ctime[64];
-
- switch (clvmd_get_debug()) {
- case DEBUG_STDERR:
- pthread_mutex_lock(&_debuglog_mutex);
- va_start(ap,fmt);
- time(&P);
- fprintf(stderr, "CLVMD[%x]: %.15s ", (int)pthread_self(), ctime_r(&P, buf_ctime) + 4);
- vfprintf(stderr, fmt, ap);
- va_end(ap);
- fflush(stderr);
- pthread_mutex_unlock(&_debuglog_mutex);
- break;
- case DEBUG_SYSLOG:
- pthread_mutex_lock(&_debuglog_mutex);
- if (!syslog_init) {
- openlog("clvmd", LOG_PID, LOG_DAEMON);
- syslog_init = 1;
- }
-
- va_start(ap,fmt);
- vsyslog(LOG_DEBUG, fmt, ap);
- va_end(ap);
- pthread_mutex_unlock(&_debuglog_mutex);
- break;
- case DEBUG_OFF:
- break;
- }
-}
-
-void clvmd_set_debug(debug_t new_debug)
-{
- if (!foreground_mode && new_debug == DEBUG_STDERR)
- new_debug = DEBUG_SYSLOG;
-
- if (new_debug > DEBUG_SYSLOG)
- new_debug = DEBUG_SYSLOG;
-
- debug = new_debug;
-}
-
-debug_t clvmd_get_debug(void)
-{
- return debug;
-}
-
-int clvmd_get_foreground(void)
-{
- return foreground_mode;
-}
-
-static const char *decode_cmd(unsigned char cmdl)
-{
- static char buf[128];
- const char *command;
-
- switch (cmdl) {
- case CLVMD_CMD_TEST:
- command = "TEST";
- break;
- case CLVMD_CMD_LOCK_VG:
- command = "LOCK_VG";
- break;
- case CLVMD_CMD_LOCK_LV:
- command = "LOCK_LV";
- break;
- case CLVMD_CMD_REFRESH:
- command = "REFRESH";
- break;
- case CLVMD_CMD_SET_DEBUG:
- command = "SET_DEBUG";
- break;
- case CLVMD_CMD_GET_CLUSTERNAME:
- command = "GET_CLUSTERNAME";
- break;
- case CLVMD_CMD_VG_BACKUP:
- command = "VG_BACKUP";
- break;
- case CLVMD_CMD_REPLY:
- command = "REPLY";
- break;
- case CLVMD_CMD_VERSION:
- command = "VERSION";
- break;
- case CLVMD_CMD_GOAWAY:
- command = "GOAWAY";
- break;
- case CLVMD_CMD_LOCK:
- command = "LOCK";
- break;
- case CLVMD_CMD_UNLOCK:
- command = "UNLOCK";
- break;
- case CLVMD_CMD_LOCK_QUERY:
- command = "LOCK_QUERY";
- break;
- case CLVMD_CMD_RESTART:
- command = "RESTART";
- break;
- case CLVMD_CMD_SYNC_NAMES:
- command = "SYNC_NAMES";
- break;
- default:
- command = "unknown";
- break;
- }
-
- snprintf(buf, sizeof(buf), "%s (0x%x)", command, cmdl);
-
- return buf;
-}
-
-static void remove_lockfile(void)
-{
- if (unlink(CLVMD_PIDFILE))
- log_sys_error("unlink", CLVMD_PIDFILE);
-}
-
-/*
- * clvmd require dm-ioctl capability for operation
- */
-static void check_permissions(void)
-{
- if (getuid() || geteuid()) {
- log_error("Cannot run as a non-root user.");
-
- /*
- * Fail cleanly here if not run as root, instead of failing
- * later when attempting a root-only operation
- * Preferred exit code from an initscript for this.
- */
- exit(4);
- }
-}
-
-int main(int argc, char *argv[])
-{
- int local_sock;
- struct local_client *newfd, *delfd;
- struct lvm_startup_params lvm_params;
- int opt;
- int cmd_timeout = DEFAULT_CMD_TIMEOUT;
- int start_timeout = 0;
- if_type_t cluster_iface = IF_AUTO;
- sigset_t ss;
- debug_t debug_opt = DEBUG_OFF;
- debug_t debug_arg = DEBUG_OFF;
- int clusterwide_opt = 0;
- mode_t old_mask;
- int ret = 1;
-
- struct option longopts[] = {
- { "help", 0, 0, 'h' },
- { NULL, 0, 0, 0 }
- };
-
- if (!(lvm_params.excl_uuid = dm_hash_create(128))) {
- fprintf(stderr, "Failed to allocate hash table\n");
- return 1;
- }
-
- /* Deal with command-line arguments */
- opterr = 0;
- optind = 0;
- while ((opt = getopt_long(argc, argv, "Vhfd:t:RST:CI:E:",
- longopts, NULL)) != -1) {
- switch (opt) {
- case 'h':
- usage(argv[0], stdout);
- exit(0);
-
- case 'R':
- check_permissions();
- ret = (refresh_clvmd(1) == 1) ? 0 : 1;
- goto out;
-
- case 'S':
- check_permissions();
- ret = (restart_clvmd(clusterwide_opt) == 1) ? 0 : 1;
- goto out;
-
- case 'C':
- clusterwide_opt = 1;
- break;
-
- case 'd':
- debug_opt = DEBUG_STDERR;
- debug_arg = (debug_t) atoi(optarg);
- if (debug_arg == DEBUG_STDERR)
- foreground_mode = 1;
- break;
-
- case 'f':
- foreground_mode = 1;
- break;
- case 't':
- cmd_timeout = atoi(optarg);
- if (!cmd_timeout) {
- fprintf(stderr, "command timeout is invalid\n");
- usage(argv[0], stderr);
- exit(1);
- }
- break;
- case 'I':
- cluster_iface = parse_cluster_interface(optarg);
- break;
- case 'E':
- if (!dm_hash_insert(lvm_params.excl_uuid, optarg, optarg)) {
- fprintf(stderr, "Failed to allocate hash entry\n");
- goto out;
- }
- break;
- case 'T':
- start_timeout = atoi(optarg);
- if (start_timeout <= 0) {
- fprintf(stderr, "startup timeout is invalid\n");
- usage(argv[0], stderr);
- exit(1);
- }
- break;
-
- case 'V':
- printf("Cluster LVM daemon version: %s\n", LVM_VERSION);
- printf("Protocol version: %d.%d.%d\n",
- CLVMD_MAJOR_VERSION, CLVMD_MINOR_VERSION,
- CLVMD_PATCH_VERSION);
- exit(0);
- break;
-
- default:
- usage(argv[0], stderr);
- exit(2);
- }
- }
-
- check_permissions();
-
- /*
- * Switch to C locale to avoid reading large locale-archive file
- * used by some glibc (on some distributions it takes over 100MB).
- * Daemon currently needs to use mlockall().
- */
- if (setenv("LC_ALL", "C", 1))
- perror("Cannot set LC_ALL to C");
-
- /* Setting debug options on an existing clvmd */
- if (debug_opt && !check_local_clvmd()) {
- dm_hash_destroy(lvm_params.excl_uuid);
- return debug_clvmd(debug_arg, clusterwide_opt)==1?0:1;
- }
-
- clvmd_set_debug(debug_arg);
-
- /* Fork into the background (unless requested not to) */
- if (!foreground_mode)
- be_daemon(start_timeout);
-
- (void) dm_prepare_selinux_context(DEFAULT_RUN_DIR, S_IFDIR);
- old_mask = umask(0077);
- if (dm_create_dir(DEFAULT_RUN_DIR) == 0) {
- DEBUGLOG("clvmd: unable to create %s directory\n",
- DEFAULT_RUN_DIR);
- umask(old_mask);
- exit(1);
- }
- umask(old_mask);
-
- /* Create pidfile */
- (void) dm_prepare_selinux_context(CLVMD_PIDFILE, S_IFREG);
- if (dm_create_lockfile(CLVMD_PIDFILE) == 0) {
- DEBUGLOG("clvmd: unable to create lockfile\n");
- exit(1);
- }
- (void) dm_prepare_selinux_context(NULL, 0);
-
- atexit(remove_lockfile);
-
- DEBUGLOG("CLVMD started\n");
-
- /* Open the Unix socket we listen for commands on.
- We do this before opening the cluster socket so that
- potential clients will block rather than error if we are running
- but the cluster is not ready yet */
- local_sock = open_local_sock();
- if (local_sock < 0) {
- child_init_signal_and_exit(DFAIL_LOCAL_SOCK);
- /* NOTREACHED */
- }
-
- /* Set up signal handlers, USR1 is for cluster change notifications (in cman)
- USR2 causes child threads to exit.
- (HUP used to cause gulm to re-read the nodes list from CCS.)
- PIPE should be ignored */
- signal(SIGUSR2, sigusr2_handler);
- signal(SIGHUP, sighup_handler);
- signal(SIGPIPE, SIG_IGN);
-
- /* Block SIGUSR2/SIGINT/SIGTERM in process */
- sigemptyset(&ss);
- sigaddset(&ss, SIGUSR2);
- sigaddset(&ss, SIGINT);
- sigaddset(&ss, SIGTERM);
- sigprocmask(SIG_BLOCK, &ss, NULL);
-
- /* Initialise the LVM thread variables */
- dm_list_init(&lvm_cmd_head);
- if (pthread_attr_init(&stack_attr) ||
- pthread_attr_setstacksize(&stack_attr, STACK_SIZE + getpagesize())) {
- log_sys_error("pthread_attr_init", "");
- exit(1);
- }
- pthread_mutex_init(&lvm_thread_mutex, NULL);
- pthread_cond_init(&lvm_thread_cond, NULL);
- pthread_barrier_init(&lvm_start_barrier, NULL, 2);
- init_lvhash();
-
- /* Start the cluster interface */
- if (cluster_iface == IF_AUTO)
- cluster_iface = get_cluster_type();
-
-#ifdef USE_CMAN
- if ((cluster_iface == IF_AUTO || cluster_iface == IF_CMAN) &&
- (clops = init_cman_cluster())) {
- max_csid_len = CMAN_MAX_CSID_LEN;
- max_cluster_message = CMAN_MAX_CLUSTER_MESSAGE;
- max_cluster_member_name_len = CMAN_MAX_NODENAME_LEN;
- syslog(LOG_NOTICE, "Cluster LVM daemon started - connected to CMAN");
- }
-#endif
-#ifdef USE_COROSYNC
- if (!clops)
- if (((cluster_iface == IF_AUTO || cluster_iface == IF_COROSYNC) &&
- (clops = init_corosync_cluster()))) {
- max_csid_len = COROSYNC_CSID_LEN;
- max_cluster_message = COROSYNC_MAX_CLUSTER_MESSAGE;
- max_cluster_member_name_len = COROSYNC_MAX_CLUSTER_MEMBER_NAME_LEN;
- syslog(LOG_NOTICE, "Cluster LVM daemon started - connected to Corosync");
- }
-#endif
-#ifdef USE_OPENAIS
- if (!clops)
- if ((cluster_iface == IF_AUTO || cluster_iface == IF_OPENAIS) &&
- (clops = init_openais_cluster())) {
- max_csid_len = OPENAIS_CSID_LEN;
- max_cluster_message = OPENAIS_MAX_CLUSTER_MESSAGE;
- max_cluster_member_name_len = OPENAIS_MAX_CLUSTER_MEMBER_NAME_LEN;
- syslog(LOG_NOTICE, "Cluster LVM daemon started - connected to OpenAIS");
- }
-#endif
-#ifdef USE_SINGLENODE
- if (!clops)
- if (cluster_iface == IF_SINGLENODE && (clops = init_singlenode_cluster())) {
- max_csid_len = SINGLENODE_CSID_LEN;
- max_cluster_message = SINGLENODE_MAX_CLUSTER_MESSAGE;
- max_cluster_member_name_len = MAX_CLUSTER_MEMBER_NAME_LEN;
- syslog(LOG_NOTICE, "Cluster LVM daemon started - running in single-node mode");
- }
-#endif
-
- if (!clops) {
- DEBUGLOG("Can't initialise cluster interface\n");
- log_error("Can't initialise cluster interface.");
- child_init_signal_and_exit(DFAIL_CLUSTER_IF);
- /* NOTREACHED */
- }
- DEBUGLOG("Cluster ready, doing some more initialisation\n");
-
- /* Save our CSID */
- clops->get_our_csid(our_csid);
-
- /* Initialise the FD list head */
- local_client_head.fd = clops->get_main_cluster_fd();
- local_client_head.type = CLUSTER_MAIN_SOCK;
- local_client_head.callback = clops->cluster_fd_callback;
- _local_client_count++;
-
- /* Add the local socket to the list */
- if (!(newfd = dm_zalloc(sizeof(struct local_client)))) {
- child_init_signal_and_exit(DFAIL_MALLOC);
- /* NOTREACHED */
- }
-
- newfd->fd = local_sock;
- newfd->type = LOCAL_RENDEZVOUS;
- newfd->callback = local_rendezvous_callback;
-
- (void) add_client(newfd);
-
- /* This needs to be started after cluster initialisation
- as it may need to take out locks */
- DEBUGLOG("Starting LVM thread\n");
- DEBUGLOG("(%p) Main cluster socket fd %d with local socket %d (%p)\n",
- &local_client_head, local_client_head.fd, newfd->fd, newfd);
-
- /* Don't let anyone else to do work until we are started */
- if (pthread_create(&lvm_thread, &stack_attr, lvm_thread_fn, &lvm_params)) {
- log_sys_error("pthread_create", "");
- goto out;
- }
-
- /* Don't start until the LVM thread is ready */
- pthread_barrier_wait(&lvm_start_barrier);
-
- /* Tell the rest of the cluster our version number */
- if (clops->cluster_init_completed)
- clops->cluster_init_completed();
-
- DEBUGLOG("clvmd ready for work\n");
- child_init_signal(SUCCESS);
-
- /* Try to shutdown neatly */
- signal(SIGTERM, sigterm_handler);
- signal(SIGINT, sigterm_handler);
-
- /* Do some work */
- main_loop(cmd_timeout);
-
- pthread_mutex_lock(&lvm_thread_mutex);
- lvm_thread_exit = 1;
- pthread_cond_signal(&lvm_thread_cond);
- pthread_mutex_unlock(&lvm_thread_mutex);
- if ((errno = pthread_join(lvm_thread, NULL)))
- log_sys_error("pthread_join", "");
-
- close_local_sock(local_sock);
-
- while ((delfd = local_client_head.next)) {
- local_client_head.next = delfd->next;
- _local_client_count--;
- /* Failing cleanup_zombie leaks... */
- if (delfd->type == LOCAL_SOCK && !cleanup_zombie(delfd))
- cmd_client_cleanup(delfd); /* calls sync_unlock */
- if (delfd->fd != local_sock)
- safe_close(&(delfd->fd));
- dm_free(delfd);
- }
-
- DEBUGLOG("cluster_closedown\n");
- destroy_lvhash();
- clops->cluster_closedown();
-
- ret = 0;
-out:
- dm_hash_destroy(lvm_params.excl_uuid);
-
- return ret;
-}
-
-/* Called when the cluster layer has completed initialisation.
- We send the version message */
-void clvmd_cluster_init_completed(void)
-{
- send_version_message();
-}
-
-/* Data on a connected socket */
-static int local_sock_callback(struct local_client *thisfd, char *buf, int len,
- const char *csid,
- struct local_client **new_client)
-{
- *new_client = NULL;
- return read_from_local_sock(thisfd);
-}
-
-/* Data on a connected socket */
-static int local_rendezvous_callback(struct local_client *thisfd, char *buf,
- int len, const char *csid,
- struct local_client **new_client)
-{
- /* Someone connected to our local socket, accept it. */
-
- struct sockaddr_un socka;
- struct local_client *newfd;
- socklen_t sl = sizeof(socka);
- int client_fd = accept(thisfd->fd, (struct sockaddr *) &socka, &sl);
-
- if (client_fd == -1 && errno == EINTR)
- return 1;
-
- if (client_fd >= 0) {
- if (!(newfd = dm_zalloc(sizeof(*newfd)))) {
- if (close(client_fd))
- log_sys_error("close", "socket");
- return 1;
- }
-
- pthread_cond_init(&newfd->bits.localsock.cond, NULL);
- pthread_mutex_init(&newfd->bits.localsock.mutex, NULL);
-
- if (fcntl(client_fd, F_SETFD, 1))
- DEBUGLOG("(%p) Setting CLOEXEC on client fd %d failed: %s\n", thisfd, client_fd, strerror(errno));
-
- newfd->fd = client_fd;
- newfd->type = LOCAL_SOCK;
- newfd->callback = local_sock_callback;
- newfd->bits.localsock.all_success = 1;
- DEBUGLOG("(%p) Got new connection on fd %d\n", newfd, newfd->fd);
- *new_client = newfd;
- }
- return 1;
-}
-
-static int local_pipe_callback(struct local_client *thisfd, char *buf,
- int maxlen, const char *csid,
- struct local_client **new_client)
-{
- int len;
- char buffer[PIPE_BUF];
- struct local_client *sock_client = thisfd->bits.pipe.client;
- int status = -1; /* in error by default */
-
- len = read(thisfd->fd, buffer, sizeof(int));
- if (len == -1 && errno == EINTR)
- return 1;
-
- if (len == sizeof(int))
- memcpy(&status, buffer, sizeof(int));
-
- DEBUGLOG("(%p) Read on pipe %d, %d bytes, status %d\n",
- thisfd, thisfd->fd, len, status);
-
- /* EOF on pipe or an error, close it */
- if (len <= 0) {
- void *ret = &status;
- if (close(thisfd->fd))
- log_sys_error("close", "local_pipe");
-
- /* Clear out the cross-link */
- if (thisfd->bits.pipe.client)
- thisfd->bits.pipe.client->bits.localsock.pipe_client = NULL;
-
- /* Reap child thread */
- if (thisfd->bits.pipe.threadid) {
- if ((errno = pthread_join(thisfd->bits.pipe.threadid, &ret)))
- log_sys_error("pthread_join", "");
-
- thisfd->bits.pipe.threadid = 0;
- if (thisfd->bits.pipe.client)
- thisfd->bits.pipe.client->bits.localsock.threadid = 0;
- }
- return -1;
- } else {
- DEBUGLOG("(%p) Background routine status was %d, sock_client %p\n",
- thisfd, status, sock_client);
- /* But has the client gone away ?? */
- if (!sock_client) {
- DEBUGLOG("(%p) Got pipe response for dead client, ignoring it\n", thisfd);
- } else {
- /* If error then just return that code */
- if (status)
- send_local_reply(sock_client, status,
- sock_client->fd);
- else {
- /* FIXME: closer inspect this code since state is write thread protected */
- pthread_mutex_lock(&sock_client->bits.localsock.mutex);
- if (sock_client->bits.localsock.state == POST_COMMAND) {
- pthread_mutex_unlock(&sock_client->bits.localsock.mutex);
- send_local_reply(sock_client, 0,
- sock_client->fd);
- } else {
- /* PRE_COMMAND finished. */
- pthread_mutex_unlock(&sock_client->bits.localsock.mutex);
- if ((status = distribute_command(sock_client)))
- send_local_reply(sock_client, EFBIG,
- sock_client->fd);
- }
- }
- }
- }
- return len;
-}
-
-/* If a noed is up, look for it in the reply array, if it's not there then
- add one with "ETIMEDOUT".
- NOTE: This won't race with real replies because they happen in the same thread.
-*/
-static void timedout_callback(struct local_client *client, const char *csid,
- int node_up)
-{
- struct node_reply *reply;
- char nodename[max_cluster_member_name_len];
-
- if (!node_up)
- return;
-
- clops->name_from_csid(csid, nodename);
- DEBUGLOG("(%p) Checking for a reply from %s\n", client, nodename);
- pthread_mutex_lock(&client->bits.localsock.mutex);
-
- reply = client->bits.localsock.replies;
- while (reply && strcmp(reply->node, nodename) != 0)
- reply = reply->next;
-
- pthread_mutex_unlock(&client->bits.localsock.mutex);
-
- if (!reply) {
- DEBUGLOG("(%p) Node %s timed-out\n", client, nodename);
- add_reply_to_list(client, ETIMEDOUT, csid,
- "Command timed out", 18);
- }
-}
-
-/* Called when the request has timed out on at least one node. We fill in
- the remaining node entries with ETIMEDOUT and return.
-
- By the time we get here the node that caused
- the timeout could have gone down, in which case we will never get the expected
- number of replies that triggers the post command so we need to do it here
-*/
-static void request_timed_out(struct local_client *client)
-{
- DEBUGLOG("(%p) Request timed-out. padding\n", client);
- clops->cluster_do_node_callback(client, timedout_callback);
-
- if (!client->bits.localsock.threadid)
- return;
-
- pthread_mutex_lock(&client->bits.localsock.mutex);
-
- if (!client->bits.localsock.finished &&
- (client->bits.localsock.num_replies !=
- client->bits.localsock.expected_replies)) {
- /* Post-process the command */
- client->bits.localsock.state = POST_COMMAND;
- pthread_cond_signal(&client->bits.localsock.cond);
- }
-
- pthread_mutex_unlock(&client->bits.localsock.mutex);
-}
-
-/* This is where the real work happens */
-static void main_loop(int cmd_timeout)
-{
- sigset_t ss;
-
- DEBUGLOG("Using timeout of %d seconds\n", cmd_timeout);
-
- sigemptyset(&ss);
- sigaddset(&ss, SIGINT);
- sigaddset(&ss, SIGTERM);
- pthread_sigmask(SIG_UNBLOCK, &ss, NULL);
- /* Main loop */
- while (!quit) {
- fd_set in;
- int select_status;
- struct local_client *thisfd, *nextfd;
- struct timeval tv = { cmd_timeout, 0 };
- int quorate = clops->is_quorate();
- int client_count = 0;
- int max_fd = 0;
-
- /* Wait on the cluster FD and all local sockets/pipes */
- local_client_head.fd = clops->get_main_cluster_fd();
- FD_ZERO(&in);
-
- for (thisfd = &local_client_head; thisfd; thisfd = thisfd->next) {
- client_count++;
- max_fd = max(max_fd, thisfd->fd);
- }
-
- if (max_fd > FD_SETSIZE - 32) {
- fprintf(stderr, "WARNING: There are too many connections to clvmd. Investigate and take action now!\n");
- fprintf(stderr, "WARNING: Your cluster may freeze up if the number of clvmd file descriptors (%d) exceeds %d.\n", max_fd + 1, FD_SETSIZE);
- }
-
- for (thisfd = &local_client_head; thisfd; thisfd = nextfd) {
- nextfd = thisfd->next;
-
- if (thisfd->removeme && !cleanup_zombie(thisfd)) {
- /* cleanup_zombie might have removed the next list element */
- nextfd = thisfd->next;
-
- (void) _del_client(thisfd);
-
- DEBUGLOG("(%p) removeme set with %d monitored fds remaining\n", thisfd, _local_client_count);
-
- /* Queue cleanup, this also frees the client struct */
- add_to_lvmqueue(thisfd, NULL, 0, NULL);
- continue;
- }
-
- if (thisfd->removeme)
- continue;
-
- /* if the cluster is not quorate then don't listen for new requests */
- if ((thisfd->type != LOCAL_RENDEZVOUS &&
- thisfd->type != LOCAL_SOCK) || quorate)
- if (thisfd->fd < FD_SETSIZE)
- FD_SET(thisfd->fd, &in);
- }
-
- select_status = select(FD_SETSIZE, &in, NULL, NULL, &tv);
-
- if (reread_config) {
- int saved_errno = errno;
-
- reread_config = 0;
- DEBUGLOG("got SIGHUP\n");
- if (clops->reread_config)
- clops->reread_config();
- errno = saved_errno;
- }
-
- if (select_status > 0) {
- char csid[MAX_CSID_LEN];
- char buf[max_cluster_message];
-
- for (thisfd = &local_client_head; thisfd; thisfd = thisfd->next) {
- if (thisfd->fd < FD_SETSIZE && FD_ISSET(thisfd->fd, &in)) {
- struct local_client *newfd = NULL;
- int ret;
-
- /* FIXME Remove from main thread in case it blocks! */
- /* Do callback */
- ret = thisfd->callback(thisfd, buf, sizeof(buf),
- csid, &newfd);
- /* Ignore EAGAIN */
- if (ret < 0 && (errno == EAGAIN || errno == EINTR)) {
- continue;
- }
-
- /* Got error or EOF: Remove it from the list safely */
- if (ret <= 0) {
- int type = thisfd->type;
-
- /* If the cluster socket shuts down, so do we */
- if (type == CLUSTER_MAIN_SOCK ||
- type == CLUSTER_INTERNAL)
- goto closedown;
-
- DEBUGLOG("(%p) ret == %d, errno = %d. removing client\n",
- thisfd, ret, errno);
- thisfd->removeme = 1;
- continue;
- }
-
- /* New client...simply add it to the list */
- if (newfd) {
- _add_client(newfd, thisfd);
- thisfd = newfd;
- }
- }
- }
- }
-
- /* Select timed out. Check for clients that have been waiting too long for a response */
- if (select_status == 0) {
- time_t the_time = time(NULL);
-
- for (thisfd = &local_client_head; thisfd; thisfd = thisfd->next) {
- if (thisfd->type == LOCAL_SOCK &&
- thisfd->bits.localsock.sent_out &&
- (thisfd->bits.localsock.sent_time + cmd_timeout) < the_time &&
- thisfd->bits.localsock.expected_replies !=
- thisfd->bits.localsock.num_replies) {
- /* Send timed out message + replies we already have */
- DEBUGLOG("Request to client %p timed-out (send: %ld, now: %ld)\n",
- thisfd, thisfd->bits.localsock.sent_time, the_time);
-
- thisfd->bits.localsock.all_success = 0;
-
- request_timed_out(thisfd);
- }
- }
- }
- if (select_status < 0) {
- if (errno == EINTR)
- continue;
-
-#ifdef DEBUG
- perror("select error");
- exit(-1);
-#endif
- }
- }
-
- closedown:
- if (quit)
- DEBUGLOG("SIGTERM received\n");
-}
-
-static __attribute__ ((noreturn)) void wait_for_child(int c_pipe, int timeout)
-{
- int child_status;
- fd_set fds;
- struct timeval tv = {timeout, 0};
-
- FD_ZERO(&fds);
- FD_SET(c_pipe, &fds);
-
- switch (select(c_pipe+1, &fds, NULL, NULL, timeout? &tv: NULL)) {
- case 0:
- fprintf(stderr, "clvmd startup timed out\n");
- exit(DFAIL_TIMEOUT);
- case 1:
- if (read(c_pipe, &child_status, sizeof(child_status)) !=
- sizeof(child_status)) {
- fprintf(stderr, "clvmd failed in initialisation\n");
- exit(DFAIL_INIT);
- }
-
- switch (child_status) {
- case SUCCESS:
- break;
- case DFAIL_INIT:
- fprintf(stderr, "clvmd failed in initialisation\n");
- break;
- case DFAIL_LOCAL_SOCK:
- fprintf(stderr, "clvmd could not create local socket\n");
- fprintf(stderr, "Another clvmd is probably already running\n");
- break;
- case DFAIL_CLUSTER_IF:
- fprintf(stderr, "clvmd could not connect to cluster manager\n");
- fprintf(stderr, "Consult syslog for more information\n");
- break;
- case DFAIL_MALLOC:
- fprintf(stderr, "clvmd failed, not enough memory\n");
- break;
- default:
- fprintf(stderr, "clvmd failed, error was %d\n", child_status);
- break;
- }
- exit(child_status);
- default:
- fprintf(stderr, "clvmd startup, select failed: %s\n", strerror(errno));
- exit(DFAIL_INIT);
- }
-}
-
-/*
- * Fork into the background and detach from our parent process.
- * In the interests of user-friendliness we wait for the daemon
- * to complete initialisation before returning its status
- * the the user.
- */
-static void be_daemon(int timeout)
-{
- int devnull = open("/dev/null", O_RDWR);
- if (devnull == -1) {
- perror("Can't open /dev/null");
- exit(3);
- }
-
- if (pipe(child_pipe)) {
- perror("Error creating pipe");
- exit(3);
- }
-
- switch (fork()) {
- case -1:
- perror("clvmd: can't fork");
- exit(2);
-
- case 0: /* Child */
- (void) close(child_pipe[0]);
- break;
-
- default: /* Parent */
- (void) close(devnull);
- (void) close(child_pipe[1]);
- wait_for_child(child_pipe[0], timeout); /* noreturn */
- }
-
- /* Detach ourself from the calling environment */
- if ((dup2(devnull, STDIN_FILENO) == -1) ||
- (dup2(devnull, STDOUT_FILENO) == -1) ||
- (dup2(devnull, STDERR_FILENO) == -1)) {
- perror("Error setting terminal FDs to /dev/null");
- log_error("Error setting terminal FDs to /dev/null: %m");
- exit(5);
- }
-
- if ((devnull > STDERR_FILENO) && close(devnull)) {
- log_sys_error("close", "/dev/null");
- exit(7);
- }
-
- if (chdir("/")) {
- log_error("Error setting current directory to /: %m");
- exit(6);
- }
-
- setsid();
-}
-
-static int verify_message(char *buf, int len)
-{
- struct clvm_header *h = (struct clvm_header *)buf;
-
- if (len < (int)sizeof(struct clvm_header)) {
- log_error("verify_message short len %d.", len);
- return -1;
- }
-
- switch (h->cmd) {
- case CLVMD_CMD_REPLY:
- case CLVMD_CMD_VERSION:
- case CLVMD_CMD_GOAWAY:
- case CLVMD_CMD_TEST:
- case CLVMD_CMD_LOCK:
- case CLVMD_CMD_UNLOCK:
- case CLVMD_CMD_LOCK_LV:
- case CLVMD_CMD_LOCK_VG:
- case CLVMD_CMD_LOCK_QUERY:
- case CLVMD_CMD_REFRESH:
- case CLVMD_CMD_GET_CLUSTERNAME:
- case CLVMD_CMD_SET_DEBUG:
- case CLVMD_CMD_VG_BACKUP:
- case CLVMD_CMD_RESTART:
- case CLVMD_CMD_SYNC_NAMES:
- break;
- default:
- log_error("verify_message bad cmd %x.", h->cmd);
- return -1;
- }
-
- /* TODO: we may be able to narrow len/flags/clientid/arglen checks based on cmd */
-
- if (h->flags & ~(CLVMD_FLAG_LOCAL | CLVMD_FLAG_SYSTEMLV | CLVMD_FLAG_NODEERRS | CLVMD_FLAG_REMOTE)) {
- log_error("verify_message bad flags %x.", h->flags);
- return -1;
- }
-
- if (h->arglen > max_cluster_message) {
- log_error("verify_message bad arglen %x max %d.", h->arglen, max_cluster_message);
- return -1;
- }
-
- return 0;
-}
-
-static void dump_message(char *buf, int len)
-{
- unsigned char row[8];
- char str[9];
- int i, j = 0;
-
- str[8] = '\0';
- if (len > 128)
- len = 128;
-
- for (i = 0; i < len; ++i) {
- row[j] = buf[i];
- str[j] = (isprint(buf[i])) ? buf[i] : ' ';
-
- if (i + 1 == len) {
- for (;j < 8; ++j) {
- row[j] = 0;
- str[j] = ' ';
- }
-
- log_error("%02x %02x %02x %02x %02x %02x %02x %02x [%s]",
- row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7], str);
- j = 0;
- }
- }
-}
-
-static int cleanup_zombie(struct local_client *thisfd)
-{
- int *status;
- struct local_client *pipe_client;
-
- if (thisfd->type != LOCAL_SOCK)
- return 0;
-
- if (!thisfd->bits.localsock.cleanup_needed)
- return 0;
-
- DEBUGLOG("(%p) EOF on local socket %d: inprogress=%d\n",
- thisfd, thisfd->fd, thisfd->bits.localsock.in_progress);
-
- if ((pipe_client = thisfd->bits.localsock.pipe_client))
- pipe_client = pipe_client->bits.pipe.client;
-
- /* If the client went away in mid command then tidy up */
- if (thisfd->bits.localsock.in_progress) {
- DEBUGLOG("Sending SIGUSR2 to pre&post thread (%p in-progress)\n", pipe_client);
- pthread_kill(thisfd->bits.localsock.threadid, SIGUSR2);
- if (pthread_mutex_trylock(&thisfd->bits.localsock.mutex))
- return 1;
- thisfd->bits.localsock.state = POST_COMMAND;
- thisfd->bits.localsock.finished = 1;
- pthread_cond_signal(&thisfd->bits.localsock.cond);
- pthread_mutex_unlock(&thisfd->bits.localsock.mutex);
-
- /* Free any unsent buffers */
- free_reply(thisfd);
- }
-
- /* Kill the subthread & free resources */
- if (thisfd->bits.localsock.threadid) {
- DEBUGLOG("(%p) Waiting for pre&post thread\n", pipe_client);
- pthread_mutex_lock(&thisfd->bits.localsock.mutex);
- thisfd->bits.localsock.state = PRE_COMMAND;
- thisfd->bits.localsock.finished = 1;
- pthread_cond_signal(&thisfd->bits.localsock.cond);
- pthread_mutex_unlock(&thisfd->bits.localsock.mutex);
-
- if ((errno = pthread_join(thisfd->bits.localsock.threadid,
- (void **) &status)))
- log_sys_error("pthread_join", "");
-
- DEBUGLOG("(%p) Joined pre&post thread\n", pipe_client);
-
- thisfd->bits.localsock.threadid = 0;
-
- /* Remove the pipe client */
- if (thisfd->bits.localsock.pipe_client) {
- struct local_client *delfd = thisfd->bits.localsock.pipe_client;
-
- (void) close(delfd->fd); /* Close pipe */
- (void) close(thisfd->bits.localsock.pipe);
-
- /* Remove pipe client */
- if (!_del_client(delfd)) {
- dm_free(delfd);
- thisfd->bits.localsock.pipe_client = NULL;
- }
- }
- }
-
- /* Free the command buffer */
- dm_free(thisfd->bits.localsock.cmd);
-
- safe_close(&(thisfd->fd));
- thisfd->bits.localsock.cleanup_needed = 0;
-
- return 0;
-}
-
-/* Called when we have a read from the local socket.
- was in the main loop but it's grown up and is a big girl now */
-static int read_from_local_sock(struct local_client *thisfd)
-{
- int len;
- int argslen;
- int missing_len;
- char buffer[PIPE_BUF + 1];
- char csid[MAX_CSID_LEN];
- int comms_pipe[2];
- struct local_client *newfd;
- struct clvm_header *inheader = (struct clvm_header *) buffer;
- int status;
-
- len = read(thisfd->fd, buffer, sizeof(buffer) - 1);
- if (len == -1 && errno == EINTR)
- return 1;
-
- DEBUGLOG("(%p) Read on local socket %d, len = %d\n", thisfd, thisfd->fd, len);
-
- if (len && verify_message(buffer, len) < 0) {
- log_error("read_from_local_sock from %d len %d bad verify.",
- thisfd->fd, len);
- dump_message(buffer, len);
- /* force error handling below */
- len = 0;
- }
-
- /* EOF or error on socket */
- if (len <= 0) {
- thisfd->bits.localsock.cleanup_needed = 1;
- (void) cleanup_zombie(thisfd); /* ignore errors here */
- return 0;
- }
-
- buffer[len] = 0; /* Ensure \0 terminated */
-
- /* Fill in the client ID */
- inheader->clientid = htonl(thisfd->fd);
-
- /* If we are already busy then return an error */
- if (thisfd->bits.localsock.in_progress) {
- struct clvm_header reply = {
- .cmd = CLVMD_CMD_REPLY,
- .status = EBUSY
- };
- send_message(&reply, sizeof(reply), our_csid, thisfd->fd,
- "Error sending EBUSY reply to local user");
- return len;
- }
-
- /* See if we have the whole message */
- argslen = len - strlen(inheader->node) - sizeof(struct clvm_header);
- missing_len = inheader->arglen - argslen;
-
- if (missing_len < 0)
- missing_len = 0;
-
- /* We need at least sizeof(struct clvm_header) bytes in buffer */
- if (len < (int)sizeof(struct clvm_header) || /* Already handled in verify_message() */
- argslen < 0 || missing_len > MAX_MISSING_LEN) {
- struct clvm_header reply = {
- .cmd = CLVMD_CMD_REPLY,
- .status = EINVAL
- };
- send_message(&reply, sizeof(reply), our_csid, thisfd->fd,
- "Error sending EINVAL reply to local user");
- return 0;
- }
-
- /* Free any old buffer space */
- dm_free(thisfd->bits.localsock.cmd);
-
- /* Save the message */
- if (!(thisfd->bits.localsock.cmd = dm_malloc(len + missing_len))) {
- struct clvm_header reply = {
- .cmd = CLVMD_CMD_REPLY,
- .status = ENOMEM
- };
- send_message(&reply, sizeof(reply), our_csid, thisfd->fd,
- "Error sending ENOMEM reply to local user");
- return 0;
- }
- memcpy(thisfd->bits.localsock.cmd, buffer, len);
- thisfd->bits.localsock.cmd_len = len + missing_len;
- inheader = (struct clvm_header *) thisfd->bits.localsock.cmd;
-
- /* If we don't have the full message then read the rest now */
- if (missing_len) {
- char *argptr = inheader->node + strlen(inheader->node) + 1;
-
- while (missing_len > 0) {
- DEBUGLOG("(%p) got %d bytes, need another %d (total %d)\n",
- thisfd, argslen, missing_len, inheader->arglen);
- len = read(thisfd->fd, argptr + argslen, missing_len);
- if (len == -1 && errno == EINTR)
- continue;
-
- if (len <= 0) {
- /* EOF or error on socket */
- DEBUGLOG("(%p) EOF on local socket\n", thisfd);
- dm_free(thisfd->bits.localsock.cmd);
- thisfd->bits.localsock.cmd = NULL;
- return 0;
- }
-
- missing_len -= len;
- argslen += len;
- }
- }
-
- /* Only run the command if all the cluster nodes are running CLVMD */
- if (((inheader->flags & CLVMD_FLAG_LOCAL) == 0) &&
- (check_all_clvmds_running(thisfd) == -1)) {
- thisfd->bits.localsock.expected_replies = 0;
- thisfd->bits.localsock.num_replies = 0;
- send_local_reply(thisfd, EHOSTDOWN, thisfd->fd);
- return len;
- }
-
- /* Check the node name for validity */
- if (inheader->node[0] && clops->csid_from_name(csid, inheader->node)) {
- /* Error, node is not in the cluster */
- struct clvm_header reply = {
- .cmd = CLVMD_CMD_REPLY,
- .status = ENOENT
- };
-
- DEBUGLOG("(%p) Unknown node: '%s'\n", thisfd, inheader->node);
- send_message(&reply, sizeof(reply), our_csid, thisfd->fd,
- "Error sending ENOENT reply to local user");
- thisfd->bits.localsock.expected_replies = 0;
- thisfd->bits.localsock.num_replies = 0;
- thisfd->bits.localsock.in_progress = FALSE;
- thisfd->bits.localsock.sent_out = FALSE;
- return len;
- }
-
- /* If we already have a subthread then just signal it to start */
- if (thisfd->bits.localsock.threadid) {
- pthread_mutex_lock(&thisfd->bits.localsock.mutex);
- thisfd->bits.localsock.state = PRE_COMMAND;
- pthread_cond_signal(&thisfd->bits.localsock.cond);
- pthread_mutex_unlock(&thisfd->bits.localsock.mutex);
- return len;
- }
-
- /* Create a pipe and add the reading end to our FD list */
- if (pipe(comms_pipe)) {
- struct clvm_header reply = {
- .cmd = CLVMD_CMD_REPLY,
- .status = EBUSY
- };
-
- DEBUGLOG("(%p) Creating pipe failed: %s\n", thisfd, strerror(errno));
- send_message(&reply, sizeof(reply), our_csid, thisfd->fd,
- "Error sending EBUSY reply to local user");
- return len;
- }
-
- if (!(newfd = dm_zalloc(sizeof(*newfd)))) {
- struct clvm_header reply = {
- .cmd = CLVMD_CMD_REPLY,
- .status = ENOMEM
- };
-
- (void) close(comms_pipe[0]);
- (void) close(comms_pipe[1]);
-
- send_message(&reply, sizeof(reply), our_csid, thisfd->fd,
- "Error sending ENOMEM reply to local user");
- return len;
- }
-
- DEBUGLOG("(%p) Creating pipe, [%d, %d]\n", thisfd, comms_pipe[0], comms_pipe[1]);
-
- if (fcntl(comms_pipe[0], F_SETFD, 1))
- DEBUGLOG("setting CLOEXEC on pipe[0] failed: %s\n", strerror(errno));
- if (fcntl(comms_pipe[1], F_SETFD, 1))
- DEBUGLOG("setting CLOEXEC on pipe[1] failed: %s\n", strerror(errno));
-
- newfd->fd = comms_pipe[0];
- newfd->type = THREAD_PIPE;
- newfd->callback = local_pipe_callback;
- newfd->bits.pipe.client = thisfd;
-
- _add_client(newfd, thisfd);
-
- /* Store a cross link to the pipe */
- thisfd->bits.localsock.pipe_client = newfd;
- thisfd->bits.localsock.pipe = comms_pipe[1];
-
- /* Make sure the thread has a copy of it's own ID */
- newfd->bits.pipe.threadid = thisfd->bits.localsock.threadid;
-
- /* Run the pre routine */
- thisfd->bits.localsock.in_progress = TRUE;
- thisfd->bits.localsock.state = PRE_COMMAND;
- thisfd->bits.localsock.cleanup_needed = 1;
- DEBUGLOG("(%p) Creating pre&post thread for pipe fd %d\n", newfd, newfd->fd);
- status = pthread_create(&thisfd->bits.localsock.threadid,
- &stack_attr, pre_and_post_thread, thisfd);
- DEBUGLOG("(%p) Created pre&post thread, state = %d\n", newfd, status);
-
- return len;
-}
-
-/* Add a file descriptor from the cluster or comms interface to
- our list of FDs for select
-*/
-
-/* Called when the pre-command has completed successfully - we
- now execute the real command on all the requested nodes */
-static int distribute_command(struct local_client *thisfd)
-{
- struct clvm_header *inheader =
- (struct clvm_header *) thisfd->bits.localsock.cmd;
- int len = thisfd->bits.localsock.cmd_len;
-
- thisfd->xid = global_xid++;
- DEBUGLOG("(%p) distribute command: XID = %d, flags=0x%x (%s%s)\n",
- thisfd, thisfd->xid, inheader->flags,
- (inheader->flags & CLVMD_FLAG_LOCAL) ? "LOCAL" : "",
- (inheader->flags & CLVMD_FLAG_REMOTE) ? "REMOTE" : "");
-
- /* Forward it to other nodes in the cluster if needed */
- if (!(inheader->flags & CLVMD_FLAG_LOCAL)) {
- /* if node is empty then do it on the whole cluster */
- if (inheader->node[0] == '\0') {
- thisfd->bits.localsock.expected_replies =
- clops->get_num_nodes();
- thisfd->bits.localsock.num_replies = 0;
- thisfd->bits.localsock.sent_time = time(NULL);
- thisfd->bits.localsock.in_progress = TRUE;
- thisfd->bits.localsock.sent_out = TRUE;
-
- /*
- * Send to local node first, even if CLVMD_FLAG_REMOTE
- * is set so we still get a reply if this is the
- * only node.
- */
- add_to_lvmqueue(thisfd, inheader, len, NULL);
-
- DEBUGLOG("(%p) Sending message to all cluster nodes\n", thisfd);
- inheader->xid = thisfd->xid;
- send_message(inheader, len, NULL, -1,
- "Error forwarding message to cluster");
- } else {
- /* Do it on a single node */
- char csid[MAX_CSID_LEN];
-
- if (clops->csid_from_name(csid, inheader->node))
- /* This has already been checked so should not happen */
- return 0;
-
- /* OK, found a node... */
- thisfd->bits.localsock.in_progress = TRUE;
- thisfd->bits.localsock.expected_replies = 1;
- thisfd->bits.localsock.num_replies = 0;
-
- /* Are we the requested node ?? */
- if (memcmp(csid, our_csid, max_csid_len) == 0) {
- DEBUGLOG("(%p) Doing command on local node only\n", thisfd);
- add_to_lvmqueue(thisfd, inheader, len, NULL);
- } else {
- DEBUGLOG("(%p) Sending message to single node: %s\n",
- thisfd, inheader->node);
- inheader->xid = thisfd->xid;
- send_message(inheader, len, csid, -1,
- "Error forwarding message to cluster node");
- }
- }
- } else {
- /* Local explicitly requested, ignore nodes */
- thisfd->bits.localsock.in_progress = TRUE;
- thisfd->bits.localsock.expected_replies = 1;
- thisfd->bits.localsock.num_replies = 0;
- DEBUGLOG("(%p) Doing command explicitly on local node only\n", thisfd);
- add_to_lvmqueue(thisfd, inheader, len, NULL);
- }
-
- return 0;
-}
-
-/* Process a command from a remote node and return the result */
-static void process_remote_command(struct clvm_header *msg, int msglen, int fd,
- const char *csid)
-{
- char *replyargs;
- char nodename[max_cluster_member_name_len];
- int replylen = 0;
- int buflen = max_cluster_message - sizeof(struct clvm_header) - 1;
- int status;
-
- /* Get the node name as we /may/ need it later */
- clops->name_from_csid(csid, nodename);
-
- DEBUGLOG("process_remote_command %s for clientid 0x%x XID %d on node %s\n",
- decode_cmd(msg->cmd), msg->clientid, msg->xid, nodename);
-
- /* Check for GOAWAY and sulk */
- if (msg->cmd == CLVMD_CMD_GOAWAY) {
- DEBUGLOG("Told to go away by %s\n", nodename);
- log_error("Told to go away by %s.", nodename);
- exit(99);
- }
-
- /* Version check is internal - don't bother exposing it in clvmd-command.c */
- if (msg->cmd == CLVMD_CMD_VERSION) {
- int version_nums[3];
- char node[256];
-
- memcpy(version_nums, msg->args, sizeof(version_nums));
-
- clops->name_from_csid(csid, node);
- DEBUGLOG("Remote node %s is version %d.%d.%d\n",
- node, ntohl(version_nums[0]),
- ntohl(version_nums[1]), ntohl(version_nums[2]));
-
- if (ntohl(version_nums[0]) != CLVMD_MAJOR_VERSION) {
- struct clvm_header byebyemsg = {
- .cmd = CLVMD_CMD_GOAWAY
- };
-
- DEBUGLOG("Telling node %s to go away because of incompatible version number\n",
- node);
- log_notice("Telling node %s to go away because of incompatible version number %d.%d.%d\n",
- node, ntohl(version_nums[0]),
- ntohl(version_nums[1]), ntohl(version_nums[2]));
-
- clops->cluster_send_message(&byebyemsg, sizeof(byebyemsg), our_csid,
- "Error Sending GOAWAY message");
- } else
- clops->add_up_node(csid);
-
- return;
- }
-
- /* Allocate a default reply buffer */
- if ((replyargs = dm_malloc(max_cluster_message - sizeof(struct clvm_header))))
- /* Run the command */
- /* FIXME: usage of init_test() is unprotected */
- status = do_command(NULL, msg, msglen, &replyargs,
- buflen, &replylen);
- else
- status = ENOMEM;
-
- /* If it wasn't a reply, then reply */
- if (msg->cmd != CLVMD_CMD_REPLY) {
- char *aggreply;
-
- aggreply = dm_realloc(replyargs, replylen + sizeof(struct clvm_header));
- if (aggreply) {
- struct clvm_header *agghead =
- (struct clvm_header *) aggreply;
-
- replyargs = aggreply;
- /* Move it up so there's room for a header in front of the data */
- memmove(aggreply + offsetof(struct clvm_header, args),
- replyargs, replylen);
-
- agghead->xid = msg->xid;
- agghead->cmd = CLVMD_CMD_REPLY;
- agghead->status = status;
- agghead->flags = 0;
- agghead->clientid = msg->clientid;
- agghead->arglen = replylen;
- agghead->node[0] = '\0';
- send_message(aggreply, sizeof(struct clvm_header) + replylen,
- csid, fd, "Error sending command reply");
- } else {
- /* Return a failure response */
- struct clvm_header reply = {
- .cmd = CLVMD_CMD_REPLY,
- .status = ENOMEM,
- .clientid = msg->clientid
- };
- DEBUGLOG("Error attempting to realloc return buffer\n");
- send_message(&reply, sizeof(reply), csid, fd,
- "Error sending ENOMEM command reply");
- }
- }
-
- dm_free(replyargs);
-}
-
-/* Add a reply to a command to the list of replies for this client.
- If we have got a full set then send them to the waiting client down the local
- socket */
-static void add_reply_to_list(struct local_client *client, int status,
- const char *csid, const char *buf, int len)
-{
- struct node_reply *reply;
-
- /* Add it to the list of replies */
- if (!(reply = dm_zalloc(sizeof(*reply)))) {
- /* It's all gone horribly wrong... */
- send_local_reply(client, ENOMEM, client->fd);
- return;
- }
-
- reply->status = status;
- clops->name_from_csid(csid, reply->node);
- DEBUGLOG("(%p) Reply from node %s: %d bytes\n", client, reply->node, len);
-
- if (len > 0) {
- if (!(reply->replymsg = dm_malloc(len)))
- reply->status = ENOMEM;
- else
- memcpy(reply->replymsg, buf, len);
- } else
- reply->replymsg = NULL;
-
- pthread_mutex_lock(&client->bits.localsock.mutex);
-
- if (client->bits.localsock.finished) {
- dm_free(reply->replymsg);
- dm_free(reply);
- } else {
- /* Hook it onto the reply chain */
- reply->next = client->bits.localsock.replies;
- client->bits.localsock.replies = reply;
-
- /* If we have the whole lot then do the post-process */
- /* Post-process the command */
- if (++client->bits.localsock.num_replies ==
- client->bits.localsock.expected_replies) {
- client->bits.localsock.state = POST_COMMAND;
- pthread_cond_signal(&client->bits.localsock.cond);
- }
- DEBUGLOG("(%p) Got %d replies, expecting: %d\n",
- client, client->bits.localsock.num_replies,
- client->bits.localsock.expected_replies);
- }
- pthread_mutex_unlock(&client->bits.localsock.mutex);
-}
-
-/* This is the thread that runs the PRE and post commands for a particular connection */
-static __attribute__ ((noreturn)) void *pre_and_post_thread(void *arg)
-{
- struct local_client *client = (struct local_client *) arg;
- int status;
- int write_status;
- sigset_t ss;
- int pipe_fd = client->bits.localsock.pipe;
-
- DEBUGLOG("(%p) Pre&post thread pipe fd %d\n", client, pipe_fd);
- pthread_mutex_lock(&client->bits.localsock.mutex);
-
- /* Ignore SIGUSR1 (handled by master process) but enable
- SIGUSR2 (kills subthreads) */
- sigemptyset(&ss);
- sigaddset(&ss, SIGUSR1);
- pthread_sigmask(SIG_BLOCK, &ss, NULL);
-
- sigdelset(&ss, SIGUSR1);
- sigaddset(&ss, SIGUSR2);
- pthread_sigmask(SIG_UNBLOCK, &ss, NULL);
-
- /* Loop around doing PRE and POST functions until the client goes away */
- while (!client->bits.localsock.finished) {
- /* Execute the code */
- /* FIXME: usage of init_test() is unprotected as in do_command() */
- if ((status = do_pre_command(client)))
- client->bits.localsock.all_success = 0;
-
- DEBUGLOG("(%p) Pre&post thread writes status %d down to pipe fd %d\n",
- client, status, pipe_fd);
-
- /* Tell the parent process we have finished this bit */
- while ((write_status = write(pipe_fd, &status, sizeof(int))) != sizeof(int))
- if (write_status >=0 || (errno != EINTR && errno != EAGAIN)) {
- log_error("Error sending to pipe: %m");
- break;
- }
-
- if (status) {
- client->bits.localsock.state = POST_COMMAND;
- goto next_pre;
- }
-
- /* We may need to wait for the condition variable before running the post command */
- if (client->bits.localsock.state != POST_COMMAND &&
- !client->bits.localsock.finished) {
- DEBUGLOG("(%p) Pre&post thread waiting to do post command, state = %d\n",
- client, client->bits.localsock.state);
- pthread_cond_wait(&client->bits.localsock.cond,
- &client->bits.localsock.mutex);
- }
-
- DEBUGLOG("(%p) Pre&post thread got post command condition...\n", client);
-
- /* POST function must always run, even if the client aborts */
- status = 0;
- do_post_command(client);
-
- while ((write_status = write(pipe_fd, &status, sizeof(int))) != sizeof(int))
- if (write_status >=0 || (errno != EINTR && errno != EAGAIN)) {
- log_error("Error sending to pipe: %m");
- break;
- }
-next_pre:
- if (client->bits.localsock.state != PRE_COMMAND &&
- !client->bits.localsock.finished) {
- DEBUGLOG("(%p) Pre&post thread waiting for next pre command\n", client);
- pthread_cond_wait(&client->bits.localsock.cond,
- &client->bits.localsock.mutex);
- }
-
- DEBUGLOG("(%p) Pre&post thread got pre command condition...\n", client);
- }
- pthread_mutex_unlock(&client->bits.localsock.mutex);
- DEBUGLOG("(%p) Pre&post thread finished\n", client);
-
- pthread_exit(NULL);
-}
-
-/* Process a command on the local node and store the result */
-static int process_local_command(struct clvm_header *msg, int msglen,
- struct local_client *client,
- unsigned short xid)
-{
- char *replybuf;
- int buflen = max_cluster_message - sizeof(struct clvm_header) - 1;
- int replylen = 0;
- int status;
-
- if (!(replybuf = dm_malloc(max_cluster_message)))
- return -1;
-
- DEBUGLOG("(%p) process_local_command: %s msg=%p, msglen =%d\n",
- client, decode_cmd(msg->cmd), msg, msglen);
-
- /* If remote flag is set, just set a successful status code. */
- if (msg->flags & CLVMD_FLAG_REMOTE)
- status = 0;
- else
- status = do_command(client, msg, msglen, &replybuf, buflen, &replylen);
-
- if (status)
- client->bits.localsock.all_success = 0;
-
- /* If we took too long then discard the reply */
- if (xid == client->xid)
- add_reply_to_list(client, status, our_csid, replybuf, replylen);
- else
- DEBUGLOG("(%p) Local command took too long, discarding xid %d, current is %d\n",
- client, xid, client->xid);
-
- dm_free(replybuf);
-
- return status;
-}
-
-static int process_reply(const struct clvm_header *msg, int msglen, const char *csid)
-{
- struct local_client *client;
-
- if (!(client = find_client(msg->clientid))) {
- DEBUGLOG("Got message for unknown client 0x%x\n",
- msg->clientid);
- log_error("Got message for unknown client 0x%x.",
- msg->clientid);
- return -1;
- }
-
- if (msg->status)
- client->bits.localsock.all_success = 0;
-
- /* Gather replies together for this client id */
- if (msg->xid == client->xid)
- add_reply_to_list(client, msg->status, csid, msg->args,
- msg->arglen);
- else
- DEBUGLOG("Discarding reply with old XID %d, current = %d\n",
- msg->xid, client->xid);
-
- return 0;
-}
-
-/* Send an aggregated reply back to the client */
-static void send_local_reply(struct local_client *client, int status, int fd)
-{
- struct clvm_header *clientreply;
- struct node_reply *thisreply = client->bits.localsock.replies;
- char *replybuf;
- char *ptr;
- int message_len = 0;
-
- DEBUGLOG("(%p) Send local reply\n", client);
-
- /* Work out the total size of the reply */
- while (thisreply) {
- if (thisreply->replymsg)
- message_len += strlen(thisreply->replymsg) + 1;
- else
- message_len++;
-
- message_len += strlen(thisreply->node) + 1 + sizeof(int);
-
- thisreply = thisreply->next;
- }
-
- /* Add in the size of our header */
- message_len = message_len + sizeof(struct clvm_header);
- if (!(replybuf = dm_malloc(message_len))) {
- DEBUGLOG("(%p) Memory allocation fails\n", client);
- return;
- }
-
- clientreply = (struct clvm_header *) replybuf;
- clientreply->status = status;
- clientreply->cmd = CLVMD_CMD_REPLY;
- clientreply->node[0] = '\0';
- clientreply->xid = 0;
- clientreply->clientid = 0;
- clientreply->flags = 0;
-
- ptr = clientreply->args;
-
- /* Add in all the replies, and free them as we go */
- thisreply = client->bits.localsock.replies;
- while (thisreply) {
- struct node_reply *tempreply = thisreply;
-
- strcpy(ptr, thisreply->node);
- ptr += strlen(thisreply->node) + 1;
-
- if (thisreply->status)
- clientreply->flags |= CLVMD_FLAG_NODEERRS;
-
- memcpy(ptr, &thisreply->status, sizeof(int));
- ptr += sizeof(int);
-
- if (thisreply->replymsg) {
- strcpy(ptr, thisreply->replymsg);
- ptr += strlen(thisreply->replymsg) + 1;
- } else {
- ptr[0] = '\0';
- ptr++;
- }
- thisreply = thisreply->next;
-
- dm_free(tempreply->replymsg);
- dm_free(tempreply);
- }
-
- /* Terminate with an empty node name */
- *ptr = '\0';
-
- clientreply->arglen = ptr - clientreply->args;
-
- /* And send it */
- send_message(replybuf, message_len, our_csid, fd,
- "Error sending REPLY to client");
- dm_free(replybuf);
-
- /* Reset comms variables */
- client->bits.localsock.replies = NULL;
- client->bits.localsock.expected_replies = 0;
- client->bits.localsock.in_progress = FALSE;
- client->bits.localsock.sent_out = FALSE;
-}
-
-/* Just free a reply chain baceuse it wasn't used. */
-static void free_reply(struct local_client *client)
-{
- /* Add in all the replies, and free them as we go */
- struct node_reply *thisreply = client->bits.localsock.replies;
- while (thisreply) {
- struct node_reply *tempreply = thisreply;
-
- thisreply = thisreply->next;
-
- dm_free(tempreply->replymsg);
- dm_free(tempreply);
- }
- client->bits.localsock.replies = NULL;
-}
-
-/* Send our version number to the cluster */
-static void send_version_message(void)
-{
- char message[sizeof(struct clvm_header) + sizeof(int) * 3];
- struct clvm_header *msg = (struct clvm_header *) message;
- int version_nums[3] = {
- htonl(CLVMD_MAJOR_VERSION),
- htonl(CLVMD_MINOR_VERSION),
- htonl(CLVMD_PATCH_VERSION)
- };
-
- msg->cmd = CLVMD_CMD_VERSION;
- msg->status = 0;
- msg->flags = 0;
- msg->clientid = 0;
- msg->arglen = sizeof(version_nums);
-
- memcpy(&msg->args, version_nums, sizeof(version_nums));
-
- hton_clvm(msg);
-
- clops->cluster_send_message(message, sizeof(message), NULL,
- "Error Sending version number");
-}
-
-/* Send a message to either a local client or another server */
-static int send_message(void *buf, int msglen, const char *csid, int fd,
- const char *errtext)
-{
- int len = 0;
- int ptr;
- struct timespec delay;
- struct timespec remtime;
- int retry_cnt = 0;
-
- /* Send remote messages down the cluster socket */
- if (!csid || !ISLOCAL_CSID(csid)) {
- hton_clvm((struct clvm_header *) buf);
- return clops->cluster_send_message(buf, msglen, csid, errtext);
- }
-
- if (fd < 0)
- return 0;
-
- /* Make sure it all goes */
- for (ptr = 0; ptr < msglen;) {
- if ((len = write(fd, (char*)buf + ptr, msglen - ptr)) <= 0) {
- if (errno == EINTR)
- continue;
- if ((errno == EAGAIN || errno == EIO || errno == ENOSPC) &&
- ++retry_cnt < MAX_RETRIES) {
- delay.tv_sec = 0;
- delay.tv_nsec = 100000;
- remtime.tv_sec = 0;
- remtime.tv_nsec = 0;
- (void) nanosleep (&delay, &remtime);
- continue;
- }
- DEBUGLOG("%s", errtext);
- log_error("%s", errtext);
- break;
- }
- ptr += len;
- }
-
- return len;
-}
-
-static int process_work_item(struct lvm_thread_cmd *cmd)
-{
- /* If msg is NULL then this is a cleanup request */
- if (cmd->msg == NULL) {
- DEBUGLOG("(%p) process_work_item: free\n", cmd->client);
- cmd_client_cleanup(cmd->client);
- pthread_mutex_destroy(&cmd->client->bits.localsock.mutex);
- pthread_cond_destroy(&cmd->client->bits.localsock.cond);
- dm_free(cmd->client);
- return 0;
- }
-
- if (!cmd->remote) {
- DEBUGLOG("(%p) process_work_item: local\n", cmd->client);
- process_local_command(cmd->msg, cmd->msglen, cmd->client,
- cmd->xid);
- } else {
- DEBUGLOG("(%p) process_work_item: remote\n", cmd->client);
- process_remote_command(cmd->msg, cmd->msglen, cmd->client->fd,
- cmd->csid);
- }
-
- return 0;
-}
-
-/*
- * Routine that runs in the "LVM thread".
- */
-static void *lvm_thread_fn(void *arg)
-{
- sigset_t ss;
- struct lvm_startup_params *lvm_params = arg;
- struct lvm_thread_cmd *cmd;
-
- DEBUGLOG("LVM thread function started\n");
-
- /* Ignore SIGUSR1 & 2 */
- sigemptyset(&ss);
- sigaddset(&ss, SIGUSR1);
- sigaddset(&ss, SIGUSR2);
- pthread_sigmask(SIG_BLOCK, &ss, NULL);
-
- /* Initialise the interface to liblvm */
- init_clvm(lvm_params->excl_uuid);
-
- /* Allow others to get moving */
- pthread_barrier_wait(&lvm_start_barrier);
- DEBUGLOG("LVM thread ready for work.\n");
-
- /* Now wait for some actual work */
- pthread_mutex_lock(&lvm_thread_mutex);
-
- for (;;) {
- while (!dm_list_empty(&lvm_cmd_head)) {
- cmd = dm_list_item(dm_list_first(&lvm_cmd_head),
- struct lvm_thread_cmd);
- dm_list_del(&cmd->list);
- pthread_mutex_unlock(&lvm_thread_mutex);
-
- process_work_item(cmd);
- dm_free(cmd->msg);
- dm_free(cmd);
-
- pthread_mutex_lock(&lvm_thread_mutex);
- }
-
- if (lvm_thread_exit)
- break;
-
- DEBUGLOG("LVM thread waiting for work\n");
- pthread_cond_wait(&lvm_thread_cond, &lvm_thread_mutex);
- }
-
- pthread_mutex_unlock(&lvm_thread_mutex);
- DEBUGLOG("LVM thread exits\n");
-
- destroy_lvm();
-
- pthread_exit(NULL);
-}
-
-/* Pass down some work to the LVM thread */
-static int add_to_lvmqueue(struct local_client *client, struct clvm_header *msg,
- int msglen, const char *csid)
-{
- struct lvm_thread_cmd *cmd;
-
- if (!(cmd = dm_malloc(sizeof(*cmd))))
- return ENOMEM;
-
- if (msglen) {
- if (!(cmd->msg = dm_malloc(msglen))) {
- log_error("Unable to allocate buffer space.");
- dm_free(cmd);
- return -1;
- }
- memcpy(cmd->msg, msg, msglen);
- }
- else
- cmd->msg = NULL;
-
- cmd->client = client;
- cmd->msglen = msglen;
- cmd->xid = client->xid;
-
- if (csid) {
- memcpy(cmd->csid, csid, max_csid_len);
- cmd->remote = 1;
- } else
- cmd->remote = 0;
-
- DEBUGLOG("(%p) add_to_lvmqueue: cmd=%p, msg=%p, len=%d, csid=%p, xid=%d\n",
- client, cmd, msg, msglen, csid, cmd->xid);
- pthread_mutex_lock(&lvm_thread_mutex);
- if (lvm_thread_exit) {
- pthread_mutex_unlock(&lvm_thread_mutex);
- dm_free(cmd->msg);
- dm_free(cmd);
- return -1; /* We are about to exit */
- }
- dm_list_add(&lvm_cmd_head, &cmd->list);
- pthread_cond_signal(&lvm_thread_cond);
- pthread_mutex_unlock(&lvm_thread_mutex);
-
- return 0;
-}
-
-/* Return 0 if we can talk to an existing clvmd */
-/*
- * FIXME:
- *
- * This function returns only -1 or 0, but there are
- * different levels of errors, some of them should stop
- * further execution of clvmd thus another state is needed
- * and some error message need to be only informational.
- */
-static int check_local_clvmd(void)
-{
- int local_socket;
- int ret = 0;
- struct sockaddr_un sockaddr = { .sun_family = AF_UNIX };
-
- if (!dm_strncpy(sockaddr.sun_path, CLVMD_SOCKNAME, sizeof(sockaddr.sun_path))) {
- log_error("%s: clvmd socket name too long.", CLVMD_SOCKNAME);
- return -1;
- }
-
- /* Open local socket */
- if ((local_socket = socket(PF_UNIX, SOCK_STREAM, 0)) < 0) {
- log_sys_error("socket", "local socket");
- return -1;
- }
-
- if (connect(local_socket,(struct sockaddr *) &sockaddr,
- sizeof(sockaddr))) {
- /* connection failure is expected state */
- if (errno == ENOENT)
- log_sys_debug("connect", "local socket");
- else
- log_sys_error("connect", "local socket");
- ret = -1;
- }
-
- if (close(local_socket))
- log_sys_error("close", "local socket");
-
- return ret;
-}
-
-static void close_local_sock(int local_socket)
-{
- if (local_socket != -1 && close(local_socket))
- log_sys_error("close", CLVMD_SOCKNAME);
-
- if (CLVMD_SOCKNAME[0] != '\0' && unlink(CLVMD_SOCKNAME))
- stack;
-}
-
-/* Open the local socket, that's the one we talk to libclvm down */
-static int open_local_sock(void)
-{
- mode_t old_mask;
- int local_socket = -1;
- struct sockaddr_un sockaddr = { .sun_family = AF_UNIX };
-
- if (!dm_strncpy(sockaddr.sun_path, CLVMD_SOCKNAME, sizeof(sockaddr.sun_path))) {
- log_error("%s: clvmd socket name too long.", CLVMD_SOCKNAME);
- return -1;
- }
-
- close_local_sock(local_socket);
-
- (void) dm_prepare_selinux_context(CLVMD_SOCKNAME, S_IFSOCK);
- old_mask = umask(0077);
-
- /* Open local socket */
- if ((local_socket = socket(PF_UNIX, SOCK_STREAM, 0)) < 0) {
- log_error("Can't create local socket: %m");
- goto error;
- }
-
- /* Set Close-on-exec & non-blocking */
- if (fcntl(local_socket, F_SETFD, 1))
- DEBUGLOG("setting CLOEXEC on local_socket failed: %s\n", strerror(errno));
- if (fcntl(local_socket, F_SETFL, fcntl(local_socket, F_GETFL, 0) | O_NONBLOCK))
- DEBUGLOG("setting O_NONBLOCK on local_socket failed: %s\n", strerror(errno));
-
-
- if (bind(local_socket, (struct sockaddr *) &sockaddr, sizeof(sockaddr))) {
- log_error("can't bind local socket: %m");
- goto error;
- }
- if (listen(local_socket, 1) != 0) {
- log_error("listen local: %m");
- goto error;
- }
-
- umask(old_mask);
- (void) dm_prepare_selinux_context(NULL, 0);
- return local_socket;
-error:
- close_local_sock(local_socket);
- umask(old_mask);
- (void) dm_prepare_selinux_context(NULL, 0);
- return -1;
-}
-
-void process_message(struct local_client *client, char *buf, int len,
- const char *csid)
-{
- char nodename[max_cluster_member_name_len];
- struct clvm_header *inheader = (struct clvm_header *) buf;
- ntoh_clvm(inheader); /* Byteswap fields */
-
- if (verify_message(buf, len) < 0) {
- clops->name_from_csid(csid, nodename);
- log_error("process_message from %s len %d bad verify.", nodename, len);
- dump_message(buf, len);
- return;
- }
-
- if (inheader->cmd == CLVMD_CMD_REPLY)
- process_reply(inheader, len, csid);
- else
- add_to_lvmqueue(client, inheader, len, csid);
-}
-
-
-static void check_all_callback(struct local_client *client, const char *csid,
- int node_up)
-{
- if (!node_up)
- add_reply_to_list(client, EHOSTDOWN, csid, "CLVMD not running", 18);
-}
-
-/* Check to see if all CLVMDs are running (ie one on
- every node in the cluster).
- If not, returns -1 and prints out a list of errant nodes */
-static int check_all_clvmds_running(struct local_client *client)
-{
- DEBUGLOG("(%p) check_all_clvmds_running\n", client);
-
- return clops->cluster_do_node_callback(client, check_all_callback);
-}
-
-/* Return a local_client struct given a client ID.
- client IDs are in network byte order */
-static struct local_client *find_client(int clientid)
-{
- struct local_client *thisfd;
-
- for (thisfd = &local_client_head; thisfd; thisfd = thisfd->next)
- if (thisfd->fd == (int)ntohl(clientid))
- return thisfd;
-
- return NULL;
-}
-
-/* Byte-swapping routines for the header so we
- work in a heterogeneous environment */
-static void hton_clvm(struct clvm_header *hdr)
-{
- hdr->status = htonl(hdr->status);
- hdr->arglen = htonl(hdr->arglen);
- hdr->xid = htons(hdr->xid);
- /* Don't swap clientid as it's only a token as far as
- remote nodes are concerned */
-}
-
-static void ntoh_clvm(struct clvm_header *hdr)
-{
- hdr->status = ntohl(hdr->status);
- hdr->arglen = ntohl(hdr->arglen);
- hdr->xid = ntohs(hdr->xid);
-}
-
-/* Handler for SIGUSR2 - sent to kill subthreads */
-static void sigusr2_handler(int sig)
-{
- DEBUGLOG("SIGUSR2 received\n");
-}
-
-static void sigterm_handler(int sig)
-{
- quit = 1;
-}
-
-static void sighup_handler(int sig)
-{
- reread_config = 1;
-}
-
-int sync_lock(const char *resource, int mode, int flags, int *lockid)
-{
- return clops->sync_lock(resource, mode, flags, lockid);
-}
-
-int sync_unlock(const char *resource, int lockid)
-{
- return clops->sync_unlock(resource, lockid);
-}
-
-static if_type_t parse_cluster_interface(char *ifname)
-{
- if_type_t iface = IF_AUTO;
-
- if (!strcmp(ifname, "auto"))
- iface = IF_AUTO;
- else if (!strcmp(ifname, "cman"))
- iface = IF_CMAN;
- else if (!strcmp(ifname, "openais"))
- iface = IF_OPENAIS;
- else if (!strcmp(ifname, "corosync"))
- iface = IF_COROSYNC;
- else if (!strcmp(ifname, "singlenode"))
- iface = IF_SINGLENODE;
-
- return iface;
-}
-
-/*
- * Try and find a cluster system in corosync's objdb, if it is running. This is
- * only called if the command-line option is not present, and if it fails
- * we still try the interfaces in order.
- */
-static if_type_t get_cluster_type(void)
-{
-#ifdef HAVE_COROSYNC_CONFDB_H
- confdb_handle_t handle;
- if_type_t type = IF_AUTO;
- int result;
- char buf[255];
- size_t namelen = sizeof(buf);
- hdb_handle_t cluster_handle;
- hdb_handle_t clvmd_handle;
- confdb_callbacks_t callbacks = { 0 };
-
- result = confdb_initialize (&handle, &callbacks);
- if (result != CS_OK)
- return type;
-
- result = confdb_object_find_start(handle, OBJECT_PARENT_HANDLE);
- if (result != CS_OK)
- goto out;
-
- result = confdb_object_find(handle, OBJECT_PARENT_HANDLE, (void *)"cluster", strlen("cluster"), &cluster_handle);
- if (result != CS_OK)
- goto out;
-
- result = confdb_object_find_start(handle, cluster_handle);
- if (result != CS_OK)
- goto out;
-
- result = confdb_object_find(handle, cluster_handle, (void *)"clvmd", strlen("clvmd"), &clvmd_handle);
- if (result != CS_OK)
- goto out;
-
- result = confdb_key_get(handle, clvmd_handle, (void *)"interface", strlen("interface"), buf, &namelen);
- if (result != CS_OK)
- goto out;
-
- if (namelen >= sizeof(buf))
- namelen = sizeof(buf) - 1;
-
- buf[namelen] = '\0';
- type = parse_cluster_interface(buf);
- DEBUGLOG("got interface type '%s' from confdb\n", buf);
-out:
- confdb_finalize(handle);
- return type;
-#else
- return IF_AUTO;
-#endif
-}