summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorJaime Caamano Ruiz <jcaamano@suse.com>2020-06-16 13:00:52 -0400
committerSteve Dickson <steved@redhat.com>2020-06-17 10:58:37 -0400
commite7c34df8f57331063b9d795812c62cec3ddfbc17 (patch)
treeb0abe1765cb7d440584b7fb5114109ec5de27186 /src
parentc300af4954948019eb58bd2cefdf373cb2994eff (diff)
downloadti-rpc-e7c34df8f57331063b9d795812c62cec3ddfbc17.tar.gz
libtirpc: replace array with list for per-fd lockslibtirpc-1-2-7-rc3
Currently per-fd locks for the clients are pre-allocated up to the soft limit of maximum allowed open file desciptors per process as defined in __rpc_dtbsize(): if (getrlimit(RLIMIT_NOFILE, &rl) == 0) { return (tbsize = (int)rl.rlim_cur); } This limit can be arbitrarily large for any given process resulting in unreasonable memory allocation. For example, for systemd PID1 process this limit is set to 1073741816 since version 240. systemd is an indirect user of this library as it fetches information about users, groups, etc... This patch proposes a list implementation of per-fd locks based on glibc doubly linked lists. It also includes support for a fixed array based pre-allocation up to a compile-time defined limit of locks for equivalence to the previous implementation. Signed-off-by: Steve Dickson <steved@redhat.com>
Diffstat (limited to 'src')
-rw-r--r--src/clnt_dg.c116
-rw-r--r--src/clnt_fd_locks.h205
-rw-r--r--src/clnt_vc.c149
3 files changed, 316 insertions, 154 deletions
diff --git a/src/clnt_dg.c b/src/clnt_dg.c
index eb5467f..df402ec 100644
--- a/src/clnt_dg.c
+++ b/src/clnt_dg.c
@@ -53,6 +53,7 @@
#include <unistd.h>
#include <err.h>
#include "rpc_com.h"
+#include "clnt_fd_locks.h"
#ifdef IP_RECVERR
#include <asm/types.h>
@@ -78,24 +79,28 @@ static void clnt_dg_destroy(CLIENT *);
* This machinery implements per-fd locks for MT-safety. It is not
* sufficient to do per-CLIENT handle locks for MT-safety because a
* user may create more than one CLIENT handle with the same fd behind
- * it. Therfore, we allocate an array of flags (dg_fd_locks), protected
- * by the clnt_fd_lock mutex, and an array (dg_cv) of condition variables
- * similarly protected. Dg_fd_lock[fd] == 1 => a call is activte on some
- * CLIENT handle created for that fd.
+ * it.
+ *
+ * We keep track of a list of per-fd locks, protected by the clnt_fd_lock
+ * mutex. Each per-fd lock consists of a predicate indicating whether is
+ * active or not: fd_lock->active == TRUE => a call is active on some
+ * CLIENT handle created for that fd. Each fd predicate is guarded by a
+ * condition variable so that the global mutex can be unlocked while
+ * waiting for the predicate to change.
+ *
* The current implementation holds locks across the entire RPC and reply,
* including retransmissions. Yes, this is silly, and as soon as this
* code is proven to work, this should be the first thing fixed. One step
* at a time.
*/
-static int *dg_fd_locks;
+static fd_locks_t *dg_fd_locks;
extern mutex_t clnt_fd_lock;
-static cond_t *dg_cv;
-#define release_fd_lock(fd, mask) { \
+#define release_fd_lock(fd_lock, mask) { \
mutex_lock(&clnt_fd_lock); \
- dg_fd_locks[fd] = 0; \
+ fd_lock->active = FALSE; \
mutex_unlock(&clnt_fd_lock); \
thr_sigsetmask(SIG_SETMASK, &(mask), NULL); \
- cond_signal(&dg_cv[fd]); \
+ cond_signal(&fd_lock->cv); \
}
static const char mem_err_clnt_dg[] = "clnt_dg_create: out of memory";
@@ -107,6 +112,7 @@ static const char mem_err_clnt_dg[] = "clnt_dg_create: out of memory";
*/
struct cu_data {
int cu_fd; /* connections fd */
+ fd_lock_t *cu_fd_lock;
bool_t cu_closeit; /* opened by library */
struct sockaddr_storage cu_raddr; /* remote address */
int cu_rlen;
@@ -155,47 +161,20 @@ clnt_dg_create(fd, svcaddr, program, version, sendsz, recvsz)
sigset_t newmask;
struct __rpc_sockinfo si;
int one = 1;
+ fd_lock_t *fd_lock;
sigfillset(&newmask);
thr_sigsetmask(SIG_SETMASK, &newmask, &mask);
mutex_lock(&clnt_fd_lock);
- if (dg_fd_locks == (int *) NULL) {
- size_t cv_allocsz, fd_allocsz;
- unsigned int dtbsize = __rpc_dtbsize();
-
- if ( (size_t) dtbsize > SIZE_MAX/sizeof(cond_t)) {
- mutex_unlock(&clnt_fd_lock);
- thr_sigsetmask(SIG_SETMASK, &(mask), NULL);
- errno = EOVERFLOW;
- goto err1;
- }
-
- fd_allocsz = dtbsize * sizeof (int);
- dg_fd_locks = (int *) mem_alloc(fd_allocsz);
- if (dg_fd_locks == (int *) NULL) {
- mutex_unlock(&clnt_fd_lock);
- thr_sigsetmask(SIG_SETMASK, &(mask), NULL);
- errno = ENOMEM;
- goto err1;
- } else
- memset(dg_fd_locks, '\0', fd_allocsz);
-
- cv_allocsz = dtbsize * sizeof (cond_t);
- dg_cv = (cond_t *) mem_alloc(cv_allocsz);
- if (dg_cv == (cond_t *) NULL) {
- mem_free(dg_fd_locks, fd_allocsz);
- dg_fd_locks = (int *) NULL;
- mutex_unlock(&clnt_fd_lock);
- thr_sigsetmask(SIG_SETMASK, &(mask), NULL);
- errno = ENOMEM;
+ if (dg_fd_locks == (fd_locks_t *) NULL) {
+ dg_fd_locks = fd_locks_init();
+ if (dg_fd_locks == (fd_locks_t *) NULL) {
goto err1;
- } else {
- int i;
-
- for (i = 0; i < dtbsize; i++)
- cond_init(&dg_cv[i], 0, (void *) 0);
}
}
+ fd_lock = fd_lock_create(fd, dg_fd_locks);
+ if (fd_lock == (fd_lock_t *) NULL)
+ goto err1;
mutex_unlock(&clnt_fd_lock);
thr_sigsetmask(SIG_SETMASK, &(mask), NULL);
@@ -274,6 +253,7 @@ clnt_dg_create(fd, svcaddr, program, version, sendsz, recvsz)
*/
cu->cu_closeit = FALSE;
cu->cu_fd = fd;
+ cu->cu_fd_lock = fd_lock;
cl->cl_ops = clnt_dg_ops();
cl->cl_private = (caddr_t)(void *)cu;
cl->cl_auth = authnone_create();
@@ -319,17 +299,15 @@ clnt_dg_call(cl, proc, xargs, argsp, xresults, resultsp, utimeout)
sigset_t newmask;
socklen_t salen;
ssize_t recvlen = 0;
- int rpc_lock_value;
u_int32_t xid, inval, outval;
outlen = 0;
sigfillset(&newmask);
thr_sigsetmask(SIG_SETMASK, &newmask, &mask);
mutex_lock(&clnt_fd_lock);
- while (dg_fd_locks[cu->cu_fd])
- cond_wait(&dg_cv[cu->cu_fd], &clnt_fd_lock);
- rpc_lock_value = 1;
- dg_fd_locks[cu->cu_fd] = rpc_lock_value;
+ while (cu->cu_fd_lock->active)
+ cond_wait(&cu->cu_fd_lock->cv, &clnt_fd_lock);
+ cu->cu_fd_lock->active = TRUE;
mutex_unlock(&clnt_fd_lock);
if (cu->cu_total.tv_usec == -1) {
timeout = utimeout; /* use supplied timeout */
@@ -473,7 +451,7 @@ get_reply:
mem_free(cbuf, (outlen + 256));
e = (struct sock_extended_err *) CMSG_DATA(cmsg);
cu->cu_error.re_errno = e->ee_errno;
- release_fd_lock(cu->cu_fd, mask);
+ release_fd_lock(cu->cu_fd_lock, mask);
return (cu->cu_error.re_status = RPC_CANTRECV);
}
mem_free(cbuf, (outlen + 256));
@@ -553,7 +531,7 @@ get_reply:
}
out:
- release_fd_lock(cu->cu_fd, mask);
+ release_fd_lock(cu->cu_fd_lock, mask);
return (cu->cu_error.re_status);
}
@@ -582,13 +560,14 @@ clnt_dg_freeres(cl, xdr_res, res_ptr)
sigfillset(&newmask);
thr_sigsetmask(SIG_SETMASK, &newmask, &mask);
mutex_lock(&clnt_fd_lock);
- while (dg_fd_locks[cu->cu_fd])
- cond_wait(&dg_cv[cu->cu_fd], &clnt_fd_lock);
+ while (cu->cu_fd_lock->active)
+ cond_wait(&cu->cu_fd_lock->cv, &clnt_fd_lock);
+ cu->cu_fd_lock->active = TRUE;
xdrs->x_op = XDR_FREE;
dummy = (*xdr_res)(xdrs, res_ptr);
mutex_unlock(&clnt_fd_lock);
thr_sigsetmask(SIG_SETMASK, &mask, NULL);
- cond_signal(&dg_cv[cu->cu_fd]);
+ cond_signal(&cu->cu_fd_lock->cv);
return (dummy);
}
@@ -609,36 +588,34 @@ clnt_dg_control(cl, request, info)
struct netbuf *addr;
sigset_t mask;
sigset_t newmask;
- int rpc_lock_value;
sigfillset(&newmask);
thr_sigsetmask(SIG_SETMASK, &newmask, &mask);
mutex_lock(&clnt_fd_lock);
- while (dg_fd_locks[cu->cu_fd])
- cond_wait(&dg_cv[cu->cu_fd], &clnt_fd_lock);
- rpc_lock_value = 1;
- dg_fd_locks[cu->cu_fd] = rpc_lock_value;
+ while (cu->cu_fd_lock->active)
+ cond_wait(&cu->cu_fd_lock->cv, &clnt_fd_lock);
+ cu->cu_fd_lock->active = TRUE;
mutex_unlock(&clnt_fd_lock);
switch (request) {
case CLSET_FD_CLOSE:
cu->cu_closeit = TRUE;
- release_fd_lock(cu->cu_fd, mask);
+ release_fd_lock(cu->cu_fd_lock, mask);
return (TRUE);
case CLSET_FD_NCLOSE:
cu->cu_closeit = FALSE;
- release_fd_lock(cu->cu_fd, mask);
+ release_fd_lock(cu->cu_fd_lock, mask);
return (TRUE);
}
/* for other requests which use info */
if (info == NULL) {
- release_fd_lock(cu->cu_fd, mask);
+ release_fd_lock(cu->cu_fd_lock, mask);
return (FALSE);
}
switch (request) {
case CLSET_TIMEOUT:
if (time_not_ok((struct timeval *)info)) {
- release_fd_lock(cu->cu_fd, mask);
+ release_fd_lock(cu->cu_fd_lock, mask);
return (FALSE);
}
cu->cu_total = *(struct timeval *)info;
@@ -652,7 +629,7 @@ clnt_dg_control(cl, request, info)
break;
case CLSET_RETRY_TIMEOUT:
if (time_not_ok((struct timeval *)info)) {
- release_fd_lock(cu->cu_fd, mask);
+ release_fd_lock(cu->cu_fd_lock, mask);
return (FALSE);
}
cu->cu_wait = *(struct timeval *)info;
@@ -672,7 +649,7 @@ clnt_dg_control(cl, request, info)
case CLSET_SVC_ADDR: /* set to new address */
addr = (struct netbuf *)info;
if (addr->len < sizeof cu->cu_raddr) {
- release_fd_lock(cu->cu_fd, mask);
+ release_fd_lock(cu->cu_fd_lock, mask);
return (FALSE);
}
(void) memcpy(&cu->cu_raddr, addr->buf, addr->len);
@@ -735,10 +712,10 @@ clnt_dg_control(cl, request, info)
cu->cu_connect = *(int *)info;
break;
default:
- release_fd_lock(cu->cu_fd, mask);
+ release_fd_lock(cu->cu_fd_lock, mask);
return (FALSE);
}
- release_fd_lock(cu->cu_fd, mask);
+ release_fd_lock(cu->cu_fd_lock, mask);
return (TRUE);
}
@@ -754,8 +731,8 @@ clnt_dg_destroy(cl)
sigfillset(&newmask);
thr_sigsetmask(SIG_SETMASK, &newmask, &mask);
mutex_lock(&clnt_fd_lock);
- while (dg_fd_locks[cu_fd])
- cond_wait(&dg_cv[cu_fd], &clnt_fd_lock);
+ while (cu->cu_fd_lock->active)
+ cond_wait(&cu->cu_fd_lock->cv, &clnt_fd_lock);
if (cu->cu_closeit)
(void)close(cu_fd);
XDR_DESTROY(&(cu->cu_outxdrs));
@@ -765,9 +742,10 @@ clnt_dg_destroy(cl)
if (cl->cl_tp && cl->cl_tp[0])
mem_free(cl->cl_tp, strlen(cl->cl_tp) +1);
mem_free(cl, sizeof (CLIENT));
+ cond_signal(&cu->cu_fd_lock->cv);
+ fd_lock_destroy(cu_fd, cu->cu_fd_lock, dg_fd_locks);
mutex_unlock(&clnt_fd_lock);
thr_sigsetmask(SIG_SETMASK, &mask, NULL);
- cond_signal(&dg_cv[cu_fd]);
}
static struct clnt_ops *
diff --git a/src/clnt_fd_locks.h b/src/clnt_fd_locks.h
new file mode 100644
index 0000000..8263071
--- /dev/null
+++ b/src/clnt_fd_locks.h
@@ -0,0 +1,205 @@
+/*
+ * debug.h -- debugging routines for libtirpc
+ *
+ * Copyright (c) 2020 SUSE LINUX GmbH, Nuernberg, Germany.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ * - Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * - Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * - Neither the name of Sun Microsystems, Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _CLNT_FD_LOCKS_H
+#define _CLNT_FD_LOCKS_H
+
+#include <sys/queue.h>
+#include <errno.h>
+#include <reentrant.h>
+#include <rpc/xdr.h>
+
+
+/*
+ * This utility manages a list of per-fd locks for the clients.
+ *
+ * If MAX_FDLOCKS_PREALLOC is defined, a number of pre-fd locks will be
+ * pre-allocated. This number is the minimum of MAX_FDLOCKS_PREALLOC or
+ * the process soft limit of allowed fds.
+ */
+#ifdef MAX_FDLOCKS_PREALLOC
+static unsigned int fd_locks_prealloc = 0;
+#endif
+
+/* per-fd lock */
+struct fd_lock_t {
+ bool_t active;
+ cond_t cv;
+};
+typedef struct fd_lock_t fd_lock_t;
+
+
+/* internal type to store per-fd locks in a list */
+struct fd_lock_item_t {
+ /* fd_lock_t first so we can cast to fd_lock_item_t */
+ fd_lock_t fd_lock;
+ int fd;
+ unsigned int refs;
+ TAILQ_ENTRY(fd_lock_item_t) link;
+};
+typedef struct fd_lock_item_t fd_lock_item_t;
+#define to_fd_lock_item(fdlock_t_ptr) ((fd_lock_item_t*) fdlock_t_ptr)
+
+
+/* internal list of per-fd locks */
+typedef TAILQ_HEAD(,fd_lock_item_t) fd_lock_list_t;
+
+
+#ifdef MAX_FDLOCKS_PREALLOC
+
+/* With pre-allocation, keep track of both an array and a list */
+struct fd_locks_t {
+ fd_lock_list_t fd_lock_list;
+ fd_lock_t *fd_lock_array;
+};
+typedef struct fd_locks_t fd_locks_t;
+#define to_fd_lock_list(fd_locks_t_ptr) (&fd_locks_t_ptr->fd_lock_list)
+
+#else
+
+/* With no pre-allocation, just keep track of a list */
+typedef fd_lock_list_t fd_locks_t;
+#define to_fd_lock_list(fd_locks_t_ptr) ((fd_lock_list_t *) fd_locks_t_ptr)
+
+#endif
+
+
+/* allocate fd locks */
+static inline
+fd_locks_t* fd_locks_init() {
+ fd_locks_t *fd_locks;
+
+ fd_locks = (fd_locks_t *) mem_alloc(sizeof(fd_locks_t));
+ if (fd_locks == (fd_locks_t *) NULL) {
+ errno = ENOMEM;
+ return (NULL);
+ }
+ TAILQ_INIT(to_fd_lock_list(fd_locks));
+
+#ifdef MAX_FDLOCKS_PREALLOC
+ size_t fd_lock_arraysz;
+
+ if (fd_locks_prealloc == 0) {
+ unsigned int dtbsize = __rpc_dtbsize();
+ if (0 < dtbsize && dtbsize < MAX_FDLOCKS_PREALLOC)
+ fd_locks_prealloc = dtbsize;
+ else
+ fd_locks_prealloc = MAX_FDLOCKS_PREALLOC;
+ }
+
+ if ( (size_t) fd_locks_prealloc > SIZE_MAX/sizeof(fd_lock_t)) {
+ errno = EOVERFLOW;
+ return (NULL);
+ }
+
+ fd_lock_arraysz = fd_locks_prealloc * sizeof (fd_lock_t);
+ fd_locks->fd_lock_array = (fd_lock_t *) mem_alloc(fd_lock_arraysz);
+ if (fd_locks->fd_lock_array == (fd_lock_t *) NULL) {
+ errno = ENOMEM;
+ return (NULL);
+ }
+ else {
+ int i;
+
+ for (i = 0; i < fd_locks_prealloc; i++) {
+ fd_locks->fd_lock_array[i].active = FALSE;
+ cond_init(&fd_locks->fd_lock_array[i].cv, 0, (void *) 0);
+ }
+ }
+#endif
+
+ return fd_locks;
+}
+
+/* de-allocate fd locks */
+static inline
+void fd_locks_destroy(fd_locks_t *fd_locks) {
+#ifdef MAX_FDLOCKS_PREALLOC
+ fd_lock_t *array = fd_locks->fd_lock_array;
+ mem_free(array, fd_locks_prealloc * sizeof (fd_lock_t));
+#endif
+ fd_lock_item_t *item;
+ fd_lock_list_t *list = to_fd_lock_list(fd_locks);
+
+ TAILQ_FOREACH(item, list, link) {
+ cond_destroy(&item->fd_lock.cv);
+ mem_free(item, sizeof (*item));
+ }
+ mem_free(fd_locks, sizeof (*fd_locks));
+}
+
+/* allocate per-fd lock */
+static inline
+fd_lock_t* fd_lock_create(int fd, fd_locks_t *fd_locks) {
+#ifdef MAX_FDLOCKS_PREALLOC
+ if (fd < fd_locks_prealloc) {
+ return &fd_locks->fd_lock_array[fd];
+ }
+#endif
+ fd_lock_item_t* item;
+ fd_lock_list_t *list = to_fd_lock_list(fd_locks);
+
+ for (item = TAILQ_FIRST(list);
+ item != (fd_lock_item_t *) NULL && item->fd != fd;
+ item = TAILQ_NEXT(item, link));
+
+ if (item == (fd_lock_item_t *) NULL) {
+ item = (fd_lock_item_t *) mem_alloc(sizeof(fd_lock_item_t));
+ if (item == (fd_lock_item_t *) NULL) {
+ errno = ENOMEM;
+ return (NULL);
+ }
+ item->fd = fd;
+ item->refs = 1;
+ item->fd_lock.active = FALSE;
+ cond_init(&item->fd_lock.cv, 0, (void *) 0);
+ TAILQ_INSERT_HEAD(list, item, link);
+ } else {
+ item->refs++;
+ }
+ return &item->fd_lock;
+}
+
+/* de-allocate per-fd lock */
+static inline
+void fd_lock_destroy(int fd, fd_lock_t *fd_lock, fd_locks_t *fd_locks) {
+#ifdef MAX_FDLOCKS_PREALLOC
+ if (fd < fd_locks_prealloc)
+ return;
+#endif
+ fd_lock_item_t* item = to_fd_lock_item(fd_lock);
+ item->refs--;
+ if (item->refs <= 0) {
+ TAILQ_REMOVE(to_fd_lock_list(fd_locks), item, link);
+ cond_destroy(&item->fd_lock.cv);
+ mem_free(item, sizeof (*item));
+ }
+}
+
+#endif /* _CLNT_FD_LOCKS_H */
diff --git a/src/clnt_vc.c b/src/clnt_vc.c
index ec58892..2f3dde6 100644
--- a/src/clnt_vc.c
+++ b/src/clnt_vc.c
@@ -67,6 +67,7 @@
#include <rpc/rpc.h>
#include "rpc_com.h"
+#include "clnt_fd_locks.h"
#define MCALL_MSG_SIZE 24
@@ -110,6 +111,7 @@ static int write_vc(void *, void *, int);
struct ct_data {
int ct_fd; /* connection's fd */
+ fd_lock_t *ct_fd_lock;
bool_t ct_closeit; /* close it on destroy */
struct timeval ct_wait; /* wait interval in milliseconds */
bool_t ct_waitset; /* wait set by clnt_control? */
@@ -124,27 +126,32 @@ struct ct_data {
};
/*
- * This machinery implements per-fd locks for MT-safety. It is not
- * sufficient to do per-CLIENT handle locks for MT-safety because a
- * user may create more than one CLIENT handle with the same fd behind
- * it. Therfore, we allocate an array of flags (vc_fd_locks), protected
- * by the clnt_fd_lock mutex, and an array (vc_cv) of condition variables
- * similarly protected. Vc_fd_lock[fd] == 1 => a call is active on some
- * CLIENT handle created for that fd.
- * The current implementation holds locks across the entire RPC and reply.
- * Yes, this is silly, and as soon as this code is proven to work, this
- * should be the first thing fixed. One step at a time.
+ * This machinery implements per-fd locks for MT-safety. It is not
+ * sufficient to do per-CLIENT handle locks for MT-safety because a
+ * user may create more than one CLIENT handle with the same fd behind
+ * it.
+ *
+ * We keep track of a list of per-fd locks, protected by the clnt_fd_lock
+ * mutex. Each per-fd lock consists of a predicate indicating whether is
+ * active or not: fd_lock->active == TRUE => a call is active on some
+ * CLIENT handle created for that fd. Each fd predicate is guarded by a
+ * condition variable so that the global mutex can be unlocked while
+ * waiting for the predicate to change.
+ *
+ * The current implementation holds locks across the entire RPC and reply,
+ * including retransmissions. Yes, this is silly, and as soon as this
+ * code is proven to work, this should be the first thing fixed. One step
+ * at a time.
*/
-static int *vc_fd_locks;
+static fd_locks_t *vc_fd_locks;
extern pthread_mutex_t disrupt_lock;
extern mutex_t clnt_fd_lock;
-static cond_t *vc_cv;
-#define release_fd_lock(fd, mask) { \
+#define release_fd_lock(fd_lock, mask) { \
mutex_lock(&clnt_fd_lock); \
- vc_fd_locks[fd] = 0; \
+ fd_lock->active = FALSE; \
mutex_unlock(&clnt_fd_lock); \
thr_sigsetmask(SIG_SETMASK, &(mask), (sigset_t *) NULL); \
- cond_signal(&vc_cv[fd]); \
+ cond_signal(&fd_lock->cv); \
}
static const char clnt_vc_errstr[] = "%s : %s";
@@ -181,6 +188,7 @@ clnt_vc_create(fd, raddr, prog, vers, sendsz, recvsz)
struct sockaddr_storage ss;
socklen_t slen;
struct __rpc_sockinfo si;
+ fd_lock_t *fd_lock;
mutex_lock(&disrupt_lock);
if (disrupt == 0)
@@ -201,49 +209,22 @@ clnt_vc_create(fd, raddr, prog, vers, sendsz, recvsz)
sigfillset(&newmask);
thr_sigsetmask(SIG_SETMASK, &newmask, &mask);
mutex_lock(&clnt_fd_lock);
- if (vc_fd_locks == (int *) NULL) {
- size_t cv_allocsz, fd_allocsz;
- unsigned int dtbsize = __rpc_dtbsize();
- struct rpc_createerr *ce = &get_rpc_createerr();
-
- if ( (size_t) dtbsize > SIZE_MAX/sizeof(cond_t)) {
- mutex_unlock(&clnt_fd_lock);
- thr_sigsetmask(SIG_SETMASK, &(mask), NULL);
- ce->cf_stat = RPC_SYSTEMERROR;
- ce->cf_error.re_errno = EOVERFLOW;
- goto err;
- }
-
- fd_allocsz = dtbsize * sizeof (int);
- vc_fd_locks = (int *) mem_alloc(fd_allocsz);
- if (vc_fd_locks == (int *) NULL) {
- mutex_unlock(&clnt_fd_lock);
- thr_sigsetmask(SIG_SETMASK, &(mask), NULL);
- ce->cf_stat = RPC_SYSTEMERROR;
- ce->cf_error.re_errno = ENOMEM;
- goto err;
- } else
- memset(vc_fd_locks, '\0', fd_allocsz);
-
- assert(vc_cv == (cond_t *) NULL);
- cv_allocsz = dtbsize * sizeof (cond_t);
- vc_cv = (cond_t *) mem_alloc(cv_allocsz);
- if (vc_cv == (cond_t *) NULL) {
- mem_free(vc_fd_locks, fd_allocsz);
- vc_fd_locks = (int *) NULL;
- mutex_unlock(&clnt_fd_lock);
- thr_sigsetmask(SIG_SETMASK, &(mask), NULL);
+ if (vc_fd_locks == (fd_locks_t *) NULL) {
+ vc_fd_locks = fd_locks_init();
+ if (vc_fd_locks == (fd_locks_t *) NULL) {
+ struct rpc_createerr *ce = &get_rpc_createerr();
ce->cf_stat = RPC_SYSTEMERROR;
- ce->cf_error.re_errno = ENOMEM;
+ ce->cf_error.re_errno = errno;
goto err;
- } else {
- int i;
-
- for (i = 0; i < dtbsize; i++)
- cond_init(&vc_cv[i], 0, (void *) 0);
}
- } else
- assert(vc_cv != (cond_t *) NULL);
+ }
+ fd_lock = fd_lock_create(fd, vc_fd_locks);
+ if (fd_lock == (fd_lock_t *) NULL) {
+ struct rpc_createerr *ce = &get_rpc_createerr();
+ ce->cf_stat = RPC_SYSTEMERROR;
+ ce->cf_error.re_errno = errno;
+ goto err;
+ }
/*
* Do not hold mutex during connect
@@ -279,6 +260,7 @@ clnt_vc_create(fd, raddr, prog, vers, sendsz, recvsz)
* Set up private data struct
*/
ct->ct_fd = fd;
+ ct->ct_fd_lock = fd_lock;
ct->ct_wait.tv_usec = 0;
ct->ct_waitset = FALSE;
ct->ct_addr.buf = malloc(raddr->maxlen);
@@ -361,17 +343,15 @@ clnt_vc_call(cl, proc, xdr_args, args_ptr, xdr_results, results_ptr, timeout)
bool_t shipnow;
int refreshes = 2;
sigset_t mask, newmask;
- int rpc_lock_value;
assert(cl != NULL);
sigfillset(&newmask);
thr_sigsetmask(SIG_SETMASK, &newmask, &mask);
mutex_lock(&clnt_fd_lock);
- while (vc_fd_locks[ct->ct_fd])
- cond_wait(&vc_cv[ct->ct_fd], &clnt_fd_lock);
- rpc_lock_value = 1;
- vc_fd_locks[ct->ct_fd] = rpc_lock_value;
+ while (ct->ct_fd_lock->active)
+ cond_wait(&ct->ct_fd_lock->cv, &clnt_fd_lock);
+ ct->ct_fd_lock->active = TRUE;
mutex_unlock(&clnt_fd_lock);
if (!ct->ct_waitset) {
/* If time is not within limits, we ignore it. */
@@ -395,22 +375,22 @@ call_again:
if (ct->ct_error.re_status == RPC_SUCCESS)
ct->ct_error.re_status = RPC_CANTENCODEARGS;
(void)xdrrec_endofrecord(xdrs, TRUE);
- release_fd_lock(ct->ct_fd, mask);
+ release_fd_lock(ct->ct_fd_lock, mask);
return (ct->ct_error.re_status);
}
if (! xdrrec_endofrecord(xdrs, shipnow)) {
- release_fd_lock(ct->ct_fd, mask);
+ release_fd_lock(ct->ct_fd_lock, mask);
return (ct->ct_error.re_status = RPC_CANTSEND);
}
if (! shipnow) {
- release_fd_lock(ct->ct_fd, mask);
+ release_fd_lock(ct->ct_fd_lock, mask);
return (RPC_SUCCESS);
}
/*
* Hack to provide rpc-based message passing
*/
if (timeout.tv_sec == 0 && timeout.tv_usec == 0) {
- release_fd_lock(ct->ct_fd, mask);
+ release_fd_lock(ct->ct_fd_lock, mask);
return(ct->ct_error.re_status = RPC_TIMEDOUT);
}
@@ -424,14 +404,14 @@ call_again:
reply_msg.acpted_rply.ar_results.where = NULL;
reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void;
if (! xdrrec_skiprecord(xdrs)) {
- release_fd_lock(ct->ct_fd, mask);
+ release_fd_lock(ct->ct_fd_lock, mask);
return (ct->ct_error.re_status);
}
/* now decode and validate the response header */
if (! xdr_replymsg(xdrs, &reply_msg)) {
if (ct->ct_error.re_status == RPC_SUCCESS)
continue;
- release_fd_lock(ct->ct_fd, mask);
+ release_fd_lock(ct->ct_fd_lock, mask);
return (ct->ct_error.re_status);
}
if (reply_msg.rm_xid == x_id)
@@ -464,7 +444,7 @@ call_again:
if (refreshes-- && AUTH_REFRESH(cl->cl_auth, &reply_msg))
goto call_again;
} /* end of unsuccessful completion */
- release_fd_lock(ct->ct_fd, mask);
+ release_fd_lock(ct->ct_fd_lock, mask);
return (ct->ct_error.re_status);
}
@@ -502,13 +482,13 @@ clnt_vc_freeres(cl, xdr_res, res_ptr)
sigfillset(&newmask);
thr_sigsetmask(SIG_SETMASK, &newmask, &mask);
mutex_lock(&clnt_fd_lock);
- while (vc_fd_locks[ct->ct_fd])
- cond_wait(&vc_cv[ct->ct_fd], &clnt_fd_lock);
+ while (ct->ct_fd_lock->active)
+ cond_wait(&ct->ct_fd_lock->cv, &clnt_fd_lock);
xdrs->x_op = XDR_FREE;
dummy = (*xdr_res)(xdrs, res_ptr);
mutex_unlock(&clnt_fd_lock);
thr_sigsetmask(SIG_SETMASK, &(mask), NULL);
- cond_signal(&vc_cv[ct->ct_fd]);
+ cond_signal(&ct->ct_fd_lock->cv);
return dummy;
}
@@ -530,7 +510,6 @@ clnt_vc_control(cl, request, info)
void *infop = info;
sigset_t mask;
sigset_t newmask;
- int rpc_lock_value;
u_int32_t tmp;
u_int32_t ltmp;
@@ -541,20 +520,19 @@ clnt_vc_control(cl, request, info)
sigfillset(&newmask);
thr_sigsetmask(SIG_SETMASK, &newmask, &mask);
mutex_lock(&clnt_fd_lock);
- while (vc_fd_locks[ct->ct_fd])
- cond_wait(&vc_cv[ct->ct_fd], &clnt_fd_lock);
- rpc_lock_value = 1;
- vc_fd_locks[ct->ct_fd] = rpc_lock_value;
+ while (ct->ct_fd_lock->active)
+ cond_wait(&ct->ct_fd_lock->cv, &clnt_fd_lock);
+ ct->ct_fd_lock->active = TRUE;
mutex_unlock(&clnt_fd_lock);
switch (request) {
case CLSET_FD_CLOSE:
ct->ct_closeit = TRUE;
- release_fd_lock(ct->ct_fd, mask);
+ release_fd_lock(ct->ct_fd_lock, mask);
return (TRUE);
case CLSET_FD_NCLOSE:
ct->ct_closeit = FALSE;
- release_fd_lock(ct->ct_fd, mask);
+ release_fd_lock(ct->ct_fd_lock, mask);
return (TRUE);
default:
break;
@@ -562,13 +540,13 @@ clnt_vc_control(cl, request, info)
/* for other requests which use info */
if (info == NULL) {
- release_fd_lock(ct->ct_fd, mask);
+ release_fd_lock(ct->ct_fd_lock, mask);
return (FALSE);
}
switch (request) {
case CLSET_TIMEOUT:
if (time_not_ok((struct timeval *)info)) {
- release_fd_lock(ct->ct_fd, mask);
+ release_fd_lock(ct->ct_fd_lock, mask);
return (FALSE);
}
ct->ct_wait = *(struct timeval *)infop;
@@ -588,7 +566,7 @@ clnt_vc_control(cl, request, info)
*(struct netbuf *)info = ct->ct_addr;
break;
case CLSET_SVC_ADDR: /* set to new address */
- release_fd_lock(ct->ct_fd, mask);
+ release_fd_lock(ct->ct_fd_lock, mask);
return (FALSE);
case CLGET_XID:
/*
@@ -642,10 +620,10 @@ clnt_vc_control(cl, request, info)
break;
default:
- release_fd_lock(ct->ct_fd, mask);
+ release_fd_lock(ct->ct_fd_lock, mask);
return (FALSE);
}
- release_fd_lock(ct->ct_fd, mask);
+ release_fd_lock(ct->ct_fd_lock, mask);
return (TRUE);
}
@@ -666,8 +644,8 @@ clnt_vc_destroy(cl)
sigfillset(&newmask);
thr_sigsetmask(SIG_SETMASK, &newmask, &mask);
mutex_lock(&clnt_fd_lock);
- while (vc_fd_locks[ct_fd])
- cond_wait(&vc_cv[ct_fd], &clnt_fd_lock);
+ while (ct->ct_fd_lock->active)
+ cond_wait(&ct->ct_fd_lock->cv, &clnt_fd_lock);
if (ct->ct_closeit && ct->ct_fd != -1) {
(void)close(ct->ct_fd);
}
@@ -680,9 +658,10 @@ clnt_vc_destroy(cl)
if (cl->cl_tp && cl->cl_tp[0])
mem_free(cl->cl_tp, strlen(cl->cl_tp) +1);
mem_free(cl, sizeof(CLIENT));
+ cond_signal(&ct->ct_fd_lock->cv);
+ fd_lock_destroy(ct_fd, ct->ct_fd_lock, vc_fd_locks);
mutex_unlock(&clnt_fd_lock);
thr_sigsetmask(SIG_SETMASK, &(mask), NULL);
- cond_signal(&vc_cv[ct_fd]);
}
/*