summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/clnt_dg.c116
-rw-r--r--src/clnt_fd_locks.h205
-rw-r--r--src/clnt_vc.c149
-rw-r--r--tirpc/reentrant.h1
4 files changed, 317 insertions, 154 deletions
diff --git a/src/clnt_dg.c b/src/clnt_dg.c
index eb5467f..df402ec 100644
--- a/src/clnt_dg.c
+++ b/src/clnt_dg.c
@@ -53,6 +53,7 @@
#include <unistd.h>
#include <err.h>
#include "rpc_com.h"
+#include "clnt_fd_locks.h"
#ifdef IP_RECVERR
#include <asm/types.h>
@@ -78,24 +79,28 @@ static void clnt_dg_destroy(CLIENT *);
* This machinery implements per-fd locks for MT-safety. It is not
* sufficient to do per-CLIENT handle locks for MT-safety because a
* user may create more than one CLIENT handle with the same fd behind
- * it. Therfore, we allocate an array of flags (dg_fd_locks), protected
- * by the clnt_fd_lock mutex, and an array (dg_cv) of condition variables
- * similarly protected. Dg_fd_lock[fd] == 1 => a call is activte on some
- * CLIENT handle created for that fd.
+ * it.
+ *
+ * We keep track of a list of per-fd locks, protected by the clnt_fd_lock
+ * mutex. Each per-fd lock consists of a predicate indicating whether is
+ * active or not: fd_lock->active == TRUE => a call is active on some
+ * CLIENT handle created for that fd. Each fd predicate is guarded by a
+ * condition variable so that the global mutex can be unlocked while
+ * waiting for the predicate to change.
+ *
* The current implementation holds locks across the entire RPC and reply,
* including retransmissions. Yes, this is silly, and as soon as this
* code is proven to work, this should be the first thing fixed. One step
* at a time.
*/
-static int *dg_fd_locks;
+static fd_locks_t *dg_fd_locks;
extern mutex_t clnt_fd_lock;
-static cond_t *dg_cv;
-#define release_fd_lock(fd, mask) { \
+#define release_fd_lock(fd_lock, mask) { \
mutex_lock(&clnt_fd_lock); \
- dg_fd_locks[fd] = 0; \
+ fd_lock->active = FALSE; \
mutex_unlock(&clnt_fd_lock); \
thr_sigsetmask(SIG_SETMASK, &(mask), NULL); \
- cond_signal(&dg_cv[fd]); \
+ cond_signal(&fd_lock->cv); \
}
static const char mem_err_clnt_dg[] = "clnt_dg_create: out of memory";
@@ -107,6 +112,7 @@ static const char mem_err_clnt_dg[] = "clnt_dg_create: out of memory";
*/
struct cu_data {
int cu_fd; /* connections fd */
+ fd_lock_t *cu_fd_lock;
bool_t cu_closeit; /* opened by library */
struct sockaddr_storage cu_raddr; /* remote address */
int cu_rlen;
@@ -155,47 +161,20 @@ clnt_dg_create(fd, svcaddr, program, version, sendsz, recvsz)
sigset_t newmask;
struct __rpc_sockinfo si;
int one = 1;
+ fd_lock_t *fd_lock;
sigfillset(&newmask);
thr_sigsetmask(SIG_SETMASK, &newmask, &mask);
mutex_lock(&clnt_fd_lock);
- if (dg_fd_locks == (int *) NULL) {
- size_t cv_allocsz, fd_allocsz;
- unsigned int dtbsize = __rpc_dtbsize();
-
- if ( (size_t) dtbsize > SIZE_MAX/sizeof(cond_t)) {
- mutex_unlock(&clnt_fd_lock);
- thr_sigsetmask(SIG_SETMASK, &(mask), NULL);
- errno = EOVERFLOW;
- goto err1;
- }
-
- fd_allocsz = dtbsize * sizeof (int);
- dg_fd_locks = (int *) mem_alloc(fd_allocsz);
- if (dg_fd_locks == (int *) NULL) {
- mutex_unlock(&clnt_fd_lock);
- thr_sigsetmask(SIG_SETMASK, &(mask), NULL);
- errno = ENOMEM;
- goto err1;
- } else
- memset(dg_fd_locks, '\0', fd_allocsz);
-
- cv_allocsz = dtbsize * sizeof (cond_t);
- dg_cv = (cond_t *) mem_alloc(cv_allocsz);
- if (dg_cv == (cond_t *) NULL) {
- mem_free(dg_fd_locks, fd_allocsz);
- dg_fd_locks = (int *) NULL;
- mutex_unlock(&clnt_fd_lock);
- thr_sigsetmask(SIG_SETMASK, &(mask), NULL);
- errno = ENOMEM;
+ if (dg_fd_locks == (fd_locks_t *) NULL) {
+ dg_fd_locks = fd_locks_init();
+ if (dg_fd_locks == (fd_locks_t *) NULL) {
goto err1;
- } else {
- int i;
-
- for (i = 0; i < dtbsize; i++)
- cond_init(&dg_cv[i], 0, (void *) 0);
}
}
+ fd_lock = fd_lock_create(fd, dg_fd_locks);
+ if (fd_lock == (fd_lock_t *) NULL)
+ goto err1;
mutex_unlock(&clnt_fd_lock);
thr_sigsetmask(SIG_SETMASK, &(mask), NULL);
@@ -274,6 +253,7 @@ clnt_dg_create(fd, svcaddr, program, version, sendsz, recvsz)
*/
cu->cu_closeit = FALSE;
cu->cu_fd = fd;
+ cu->cu_fd_lock = fd_lock;
cl->cl_ops = clnt_dg_ops();
cl->cl_private = (caddr_t)(void *)cu;
cl->cl_auth = authnone_create();
@@ -319,17 +299,15 @@ clnt_dg_call(cl, proc, xargs, argsp, xresults, resultsp, utimeout)
sigset_t newmask;
socklen_t salen;
ssize_t recvlen = 0;
- int rpc_lock_value;
u_int32_t xid, inval, outval;
outlen = 0;
sigfillset(&newmask);
thr_sigsetmask(SIG_SETMASK, &newmask, &mask);
mutex_lock(&clnt_fd_lock);
- while (dg_fd_locks[cu->cu_fd])
- cond_wait(&dg_cv[cu->cu_fd], &clnt_fd_lock);
- rpc_lock_value = 1;
- dg_fd_locks[cu->cu_fd] = rpc_lock_value;
+ while (cu->cu_fd_lock->active)
+ cond_wait(&cu->cu_fd_lock->cv, &clnt_fd_lock);
+ cu->cu_fd_lock->active = TRUE;
mutex_unlock(&clnt_fd_lock);
if (cu->cu_total.tv_usec == -1) {
timeout = utimeout; /* use supplied timeout */
@@ -473,7 +451,7 @@ get_reply:
mem_free(cbuf, (outlen + 256));
e = (struct sock_extended_err *) CMSG_DATA(cmsg);
cu->cu_error.re_errno = e->ee_errno;
- release_fd_lock(cu->cu_fd, mask);
+ release_fd_lock(cu->cu_fd_lock, mask);
return (cu->cu_error.re_status = RPC_CANTRECV);
}
mem_free(cbuf, (outlen + 256));
@@ -553,7 +531,7 @@ get_reply:
}
out:
- release_fd_lock(cu->cu_fd, mask);
+ release_fd_lock(cu->cu_fd_lock, mask);
return (cu->cu_error.re_status);
}
@@ -582,13 +560,14 @@ clnt_dg_freeres(cl, xdr_res, res_ptr)
sigfillset(&newmask);
thr_sigsetmask(SIG_SETMASK, &newmask, &mask);
mutex_lock(&clnt_fd_lock);
- while (dg_fd_locks[cu->cu_fd])
- cond_wait(&dg_cv[cu->cu_fd], &clnt_fd_lock);
+ while (cu->cu_fd_lock->active)
+ cond_wait(&cu->cu_fd_lock->cv, &clnt_fd_lock);
+ cu->cu_fd_lock->active = TRUE;
xdrs->x_op = XDR_FREE;
dummy = (*xdr_res)(xdrs, res_ptr);
mutex_unlock(&clnt_fd_lock);
thr_sigsetmask(SIG_SETMASK, &mask, NULL);
- cond_signal(&dg_cv[cu->cu_fd]);
+ cond_signal(&cu->cu_fd_lock->cv);
return (dummy);
}
@@ -609,36 +588,34 @@ clnt_dg_control(cl, request, info)
struct netbuf *addr;
sigset_t mask;
sigset_t newmask;
- int rpc_lock_value;
sigfillset(&newmask);
thr_sigsetmask(SIG_SETMASK, &newmask, &mask);
mutex_lock(&clnt_fd_lock);
- while (dg_fd_locks[cu->cu_fd])
- cond_wait(&dg_cv[cu->cu_fd], &clnt_fd_lock);
- rpc_lock_value = 1;
- dg_fd_locks[cu->cu_fd] = rpc_lock_value;
+ while (cu->cu_fd_lock->active)
+ cond_wait(&cu->cu_fd_lock->cv, &clnt_fd_lock);
+ cu->cu_fd_lock->active = TRUE;
mutex_unlock(&clnt_fd_lock);
switch (request) {
case CLSET_FD_CLOSE:
cu->cu_closeit = TRUE;
- release_fd_lock(cu->cu_fd, mask);
+ release_fd_lock(cu->cu_fd_lock, mask);
return (TRUE);
case CLSET_FD_NCLOSE:
cu->cu_closeit = FALSE;
- release_fd_lock(cu->cu_fd, mask);
+ release_fd_lock(cu->cu_fd_lock, mask);
return (TRUE);
}
/* for other requests which use info */
if (info == NULL) {
- release_fd_lock(cu->cu_fd, mask);
+ release_fd_lock(cu->cu_fd_lock, mask);
return (FALSE);
}
switch (request) {
case CLSET_TIMEOUT:
if (time_not_ok((struct timeval *)info)) {
- release_fd_lock(cu->cu_fd, mask);
+ release_fd_lock(cu->cu_fd_lock, mask);
return (FALSE);
}
cu->cu_total = *(struct timeval *)info;
@@ -652,7 +629,7 @@ clnt_dg_control(cl, request, info)
break;
case CLSET_RETRY_TIMEOUT:
if (time_not_ok((struct timeval *)info)) {
- release_fd_lock(cu->cu_fd, mask);
+ release_fd_lock(cu->cu_fd_lock, mask);
return (FALSE);
}
cu->cu_wait = *(struct timeval *)info;
@@ -672,7 +649,7 @@ clnt_dg_control(cl, request, info)
case CLSET_SVC_ADDR: /* set to new address */
addr = (struct netbuf *)info;
if (addr->len < sizeof cu->cu_raddr) {
- release_fd_lock(cu->cu_fd, mask);
+ release_fd_lock(cu->cu_fd_lock, mask);
return (FALSE);
}
(void) memcpy(&cu->cu_raddr, addr->buf, addr->len);
@@ -735,10 +712,10 @@ clnt_dg_control(cl, request, info)
cu->cu_connect = *(int *)info;
break;
default:
- release_fd_lock(cu->cu_fd, mask);
+ release_fd_lock(cu->cu_fd_lock, mask);
return (FALSE);
}
- release_fd_lock(cu->cu_fd, mask);
+ release_fd_lock(cu->cu_fd_lock, mask);
return (TRUE);
}
@@ -754,8 +731,8 @@ clnt_dg_destroy(cl)
sigfillset(&newmask);
thr_sigsetmask(SIG_SETMASK, &newmask, &mask);
mutex_lock(&clnt_fd_lock);
- while (dg_fd_locks[cu_fd])
- cond_wait(&dg_cv[cu_fd], &clnt_fd_lock);
+ while (cu->cu_fd_lock->active)
+ cond_wait(&cu->cu_fd_lock->cv, &clnt_fd_lock);
if (cu->cu_closeit)
(void)close(cu_fd);
XDR_DESTROY(&(cu->cu_outxdrs));
@@ -765,9 +742,10 @@ clnt_dg_destroy(cl)
if (cl->cl_tp && cl->cl_tp[0])
mem_free(cl->cl_tp, strlen(cl->cl_tp) +1);
mem_free(cl, sizeof (CLIENT));
+ cond_signal(&cu->cu_fd_lock->cv);
+ fd_lock_destroy(cu_fd, cu->cu_fd_lock, dg_fd_locks);
mutex_unlock(&clnt_fd_lock);
thr_sigsetmask(SIG_SETMASK, &mask, NULL);
- cond_signal(&dg_cv[cu_fd]);
}
static struct clnt_ops *
diff --git a/src/clnt_fd_locks.h b/src/clnt_fd_locks.h
new file mode 100644
index 0000000..8263071
--- /dev/null
+++ b/src/clnt_fd_locks.h
@@ -0,0 +1,205 @@
+/*
+ * debug.h -- debugging routines for libtirpc
+ *
+ * Copyright (c) 2020 SUSE LINUX GmbH, Nuernberg, Germany.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ * - Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * - Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * - Neither the name of Sun Microsystems, Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _CLNT_FD_LOCKS_H
+#define _CLNT_FD_LOCKS_H
+
+#include <sys/queue.h>
+#include <errno.h>
+#include <reentrant.h>
+#include <rpc/xdr.h>
+
+
+/*
+ * This utility manages a list of per-fd locks for the clients.
+ *
+ * If MAX_FDLOCKS_PREALLOC is defined, a number of pre-fd locks will be
+ * pre-allocated. This number is the minimum of MAX_FDLOCKS_PREALLOC or
+ * the process soft limit of allowed fds.
+ */
+#ifdef MAX_FDLOCKS_PREALLOC
+static unsigned int fd_locks_prealloc = 0;
+#endif
+
+/* per-fd lock */
+struct fd_lock_t {
+ bool_t active;
+ cond_t cv;
+};
+typedef struct fd_lock_t fd_lock_t;
+
+
+/* internal type to store per-fd locks in a list */
+struct fd_lock_item_t {
+ /* fd_lock_t first so we can cast to fd_lock_item_t */
+ fd_lock_t fd_lock;
+ int fd;
+ unsigned int refs;
+ TAILQ_ENTRY(fd_lock_item_t) link;
+};
+typedef struct fd_lock_item_t fd_lock_item_t;
+#define to_fd_lock_item(fdlock_t_ptr) ((fd_lock_item_t*) fdlock_t_ptr)
+
+
+/* internal list of per-fd locks */
+typedef TAILQ_HEAD(,fd_lock_item_t) fd_lock_list_t;
+
+
+#ifdef MAX_FDLOCKS_PREALLOC
+
+/* With pre-allocation, keep track of both an array and a list */
+struct fd_locks_t {
+ fd_lock_list_t fd_lock_list;
+ fd_lock_t *fd_lock_array;
+};
+typedef struct fd_locks_t fd_locks_t;
+#define to_fd_lock_list(fd_locks_t_ptr) (&fd_locks_t_ptr->fd_lock_list)
+
+#else
+
+/* With no pre-allocation, just keep track of a list */
+typedef fd_lock_list_t fd_locks_t;
+#define to_fd_lock_list(fd_locks_t_ptr) ((fd_lock_list_t *) fd_locks_t_ptr)
+
+#endif
+
+
+/* allocate fd locks */
+static inline
+fd_locks_t* fd_locks_init() {
+ fd_locks_t *fd_locks;
+
+ fd_locks = (fd_locks_t *) mem_alloc(sizeof(fd_locks_t));
+ if (fd_locks == (fd_locks_t *) NULL) {
+ errno = ENOMEM;
+ return (NULL);
+ }
+ TAILQ_INIT(to_fd_lock_list(fd_locks));
+
+#ifdef MAX_FDLOCKS_PREALLOC
+ size_t fd_lock_arraysz;
+
+ if (fd_locks_prealloc == 0) {
+ unsigned int dtbsize = __rpc_dtbsize();
+ if (0 < dtbsize && dtbsize < MAX_FDLOCKS_PREALLOC)
+ fd_locks_prealloc = dtbsize;
+ else
+ fd_locks_prealloc = MAX_FDLOCKS_PREALLOC;
+ }
+
+ if ( (size_t) fd_locks_prealloc > SIZE_MAX/sizeof(fd_lock_t)) {
+ errno = EOVERFLOW;
+ return (NULL);
+ }
+
+ fd_lock_arraysz = fd_locks_prealloc * sizeof (fd_lock_t);
+ fd_locks->fd_lock_array = (fd_lock_t *) mem_alloc(fd_lock_arraysz);
+ if (fd_locks->fd_lock_array == (fd_lock_t *) NULL) {
+ errno = ENOMEM;
+ return (NULL);
+ }
+ else {
+ int i;
+
+ for (i = 0; i < fd_locks_prealloc; i++) {
+ fd_locks->fd_lock_array[i].active = FALSE;
+ cond_init(&fd_locks->fd_lock_array[i].cv, 0, (void *) 0);
+ }
+ }
+#endif
+
+ return fd_locks;
+}
+
+/* de-allocate fd locks */
+static inline
+void fd_locks_destroy(fd_locks_t *fd_locks) {
+#ifdef MAX_FDLOCKS_PREALLOC
+ fd_lock_t *array = fd_locks->fd_lock_array;
+ mem_free(array, fd_locks_prealloc * sizeof (fd_lock_t));
+#endif
+ fd_lock_item_t *item;
+ fd_lock_list_t *list = to_fd_lock_list(fd_locks);
+
+ TAILQ_FOREACH(item, list, link) {
+ cond_destroy(&item->fd_lock.cv);
+ mem_free(item, sizeof (*item));
+ }
+ mem_free(fd_locks, sizeof (*fd_locks));
+}
+
+/* allocate per-fd lock */
+static inline
+fd_lock_t* fd_lock_create(int fd, fd_locks_t *fd_locks) {
+#ifdef MAX_FDLOCKS_PREALLOC
+ if (fd < fd_locks_prealloc) {
+ return &fd_locks->fd_lock_array[fd];
+ }
+#endif
+ fd_lock_item_t* item;
+ fd_lock_list_t *list = to_fd_lock_list(fd_locks);
+
+ for (item = TAILQ_FIRST(list);
+ item != (fd_lock_item_t *) NULL && item->fd != fd;
+ item = TAILQ_NEXT(item, link));
+
+ if (item == (fd_lock_item_t *) NULL) {
+ item = (fd_lock_item_t *) mem_alloc(sizeof(fd_lock_item_t));
+ if (item == (fd_lock_item_t *) NULL) {
+ errno = ENOMEM;
+ return (NULL);
+ }
+ item->fd = fd;
+ item->refs = 1;
+ item->fd_lock.active = FALSE;
+ cond_init(&item->fd_lock.cv, 0, (void *) 0);
+ TAILQ_INSERT_HEAD(list, item, link);
+ } else {
+ item->refs++;
+ }
+ return &item->fd_lock;
+}
+
+/* de-allocate per-fd lock */
+static inline
+void fd_lock_destroy(int fd, fd_lock_t *fd_lock, fd_locks_t *fd_locks) {
+#ifdef MAX_FDLOCKS_PREALLOC
+ if (fd < fd_locks_prealloc)
+ return;
+#endif
+ fd_lock_item_t* item = to_fd_lock_item(fd_lock);
+ item->refs--;
+ if (item->refs <= 0) {
+ TAILQ_REMOVE(to_fd_lock_list(fd_locks), item, link);
+ cond_destroy(&item->fd_lock.cv);
+ mem_free(item, sizeof (*item));
+ }
+}
+
+#endif /* _CLNT_FD_LOCKS_H */
diff --git a/src/clnt_vc.c b/src/clnt_vc.c
index ec58892..2f3dde6 100644
--- a/src/clnt_vc.c
+++ b/src/clnt_vc.c
@@ -67,6 +67,7 @@
#include <rpc/rpc.h>
#include "rpc_com.h"
+#include "clnt_fd_locks.h"
#define MCALL_MSG_SIZE 24
@@ -110,6 +111,7 @@ static int write_vc(void *, void *, int);
struct ct_data {
int ct_fd; /* connection's fd */
+ fd_lock_t *ct_fd_lock;
bool_t ct_closeit; /* close it on destroy */
struct timeval ct_wait; /* wait interval in milliseconds */
bool_t ct_waitset; /* wait set by clnt_control? */
@@ -124,27 +126,32 @@ struct ct_data {
};
/*
- * This machinery implements per-fd locks for MT-safety. It is not
- * sufficient to do per-CLIENT handle locks for MT-safety because a
- * user may create more than one CLIENT handle with the same fd behind
- * it. Therfore, we allocate an array of flags (vc_fd_locks), protected
- * by the clnt_fd_lock mutex, and an array (vc_cv) of condition variables
- * similarly protected. Vc_fd_lock[fd] == 1 => a call is active on some
- * CLIENT handle created for that fd.
- * The current implementation holds locks across the entire RPC and reply.
- * Yes, this is silly, and as soon as this code is proven to work, this
- * should be the first thing fixed. One step at a time.
+ * This machinery implements per-fd locks for MT-safety. It is not
+ * sufficient to do per-CLIENT handle locks for MT-safety because a
+ * user may create more than one CLIENT handle with the same fd behind
+ * it.
+ *
+ * We keep track of a list of per-fd locks, protected by the clnt_fd_lock
+ * mutex. Each per-fd lock consists of a predicate indicating whether is
+ * active or not: fd_lock->active == TRUE => a call is active on some
+ * CLIENT handle created for that fd. Each fd predicate is guarded by a
+ * condition variable so that the global mutex can be unlocked while
+ * waiting for the predicate to change.
+ *
+ * The current implementation holds locks across the entire RPC and reply,
+ * including retransmissions. Yes, this is silly, and as soon as this
+ * code is proven to work, this should be the first thing fixed. One step
+ * at a time.
*/
-static int *vc_fd_locks;
+static fd_locks_t *vc_fd_locks;
extern pthread_mutex_t disrupt_lock;
extern mutex_t clnt_fd_lock;
-static cond_t *vc_cv;
-#define release_fd_lock(fd, mask) { \
+#define release_fd_lock(fd_lock, mask) { \
mutex_lock(&clnt_fd_lock); \
- vc_fd_locks[fd] = 0; \
+ fd_lock->active = FALSE; \
mutex_unlock(&clnt_fd_lock); \
thr_sigsetmask(SIG_SETMASK, &(mask), (sigset_t *) NULL); \
- cond_signal(&vc_cv[fd]); \
+ cond_signal(&fd_lock->cv); \
}
static const char clnt_vc_errstr[] = "%s : %s";
@@ -181,6 +188,7 @@ clnt_vc_create(fd, raddr, prog, vers, sendsz, recvsz)
struct sockaddr_storage ss;
socklen_t slen;
struct __rpc_sockinfo si;
+ fd_lock_t *fd_lock;
mutex_lock(&disrupt_lock);
if (disrupt == 0)
@@ -201,49 +209,22 @@ clnt_vc_create(fd, raddr, prog, vers, sendsz, recvsz)
sigfillset(&newmask);
thr_sigsetmask(SIG_SETMASK, &newmask, &mask);
mutex_lock(&clnt_fd_lock);
- if (vc_fd_locks == (int *) NULL) {
- size_t cv_allocsz, fd_allocsz;
- unsigned int dtbsize = __rpc_dtbsize();
- struct rpc_createerr *ce = &get_rpc_createerr();
-
- if ( (size_t) dtbsize > SIZE_MAX/sizeof(cond_t)) {
- mutex_unlock(&clnt_fd_lock);
- thr_sigsetmask(SIG_SETMASK, &(mask), NULL);
- ce->cf_stat = RPC_SYSTEMERROR;
- ce->cf_error.re_errno = EOVERFLOW;
- goto err;
- }
-
- fd_allocsz = dtbsize * sizeof (int);
- vc_fd_locks = (int *) mem_alloc(fd_allocsz);
- if (vc_fd_locks == (int *) NULL) {
- mutex_unlock(&clnt_fd_lock);
- thr_sigsetmask(SIG_SETMASK, &(mask), NULL);
- ce->cf_stat = RPC_SYSTEMERROR;
- ce->cf_error.re_errno = ENOMEM;
- goto err;
- } else
- memset(vc_fd_locks, '\0', fd_allocsz);
-
- assert(vc_cv == (cond_t *) NULL);
- cv_allocsz = dtbsize * sizeof (cond_t);
- vc_cv = (cond_t *) mem_alloc(cv_allocsz);
- if (vc_cv == (cond_t *) NULL) {
- mem_free(vc_fd_locks, fd_allocsz);
- vc_fd_locks = (int *) NULL;
- mutex_unlock(&clnt_fd_lock);
- thr_sigsetmask(SIG_SETMASK, &(mask), NULL);
+ if (vc_fd_locks == (fd_locks_t *) NULL) {
+ vc_fd_locks = fd_locks_init();
+ if (vc_fd_locks == (fd_locks_t *) NULL) {
+ struct rpc_createerr *ce = &get_rpc_createerr();
ce->cf_stat = RPC_SYSTEMERROR;
- ce->cf_error.re_errno = ENOMEM;
+ ce->cf_error.re_errno = errno;
goto err;
- } else {
- int i;
-
- for (i = 0; i < dtbsize; i++)
- cond_init(&vc_cv[i], 0, (void *) 0);
}
- } else
- assert(vc_cv != (cond_t *) NULL);
+ }
+ fd_lock = fd_lock_create(fd, vc_fd_locks);
+ if (fd_lock == (fd_lock_t *) NULL) {
+ struct rpc_createerr *ce = &get_rpc_createerr();
+ ce->cf_stat = RPC_SYSTEMERROR;
+ ce->cf_error.re_errno = errno;
+ goto err;
+ }
/*
* Do not hold mutex during connect
@@ -279,6 +260,7 @@ clnt_vc_create(fd, raddr, prog, vers, sendsz, recvsz)
* Set up private data struct
*/
ct->ct_fd = fd;
+ ct->ct_fd_lock = fd_lock;
ct->ct_wait.tv_usec = 0;
ct->ct_waitset = FALSE;
ct->ct_addr.buf = malloc(raddr->maxlen);
@@ -361,17 +343,15 @@ clnt_vc_call(cl, proc, xdr_args, args_ptr, xdr_results, results_ptr, timeout)
bool_t shipnow;
int refreshes = 2;
sigset_t mask, newmask;
- int rpc_lock_value;
assert(cl != NULL);
sigfillset(&newmask);
thr_sigsetmask(SIG_SETMASK, &newmask, &mask);
mutex_lock(&clnt_fd_lock);
- while (vc_fd_locks[ct->ct_fd])
- cond_wait(&vc_cv[ct->ct_fd], &clnt_fd_lock);
- rpc_lock_value = 1;
- vc_fd_locks[ct->ct_fd] = rpc_lock_value;
+ while (ct->ct_fd_lock->active)
+ cond_wait(&ct->ct_fd_lock->cv, &clnt_fd_lock);
+ ct->ct_fd_lock->active = TRUE;
mutex_unlock(&clnt_fd_lock);
if (!ct->ct_waitset) {
/* If time is not within limits, we ignore it. */
@@ -395,22 +375,22 @@ call_again:
if (ct->ct_error.re_status == RPC_SUCCESS)
ct->ct_error.re_status = RPC_CANTENCODEARGS;
(void)xdrrec_endofrecord(xdrs, TRUE);
- release_fd_lock(ct->ct_fd, mask);
+ release_fd_lock(ct->ct_fd_lock, mask);
return (ct->ct_error.re_status);
}
if (! xdrrec_endofrecord(xdrs, shipnow)) {
- release_fd_lock(ct->ct_fd, mask);
+ release_fd_lock(ct->ct_fd_lock, mask);
return (ct->ct_error.re_status = RPC_CANTSEND);
}
if (! shipnow) {
- release_fd_lock(ct->ct_fd, mask);
+ release_fd_lock(ct->ct_fd_lock, mask);
return (RPC_SUCCESS);
}
/*
* Hack to provide rpc-based message passing
*/
if (timeout.tv_sec == 0 && timeout.tv_usec == 0) {
- release_fd_lock(ct->ct_fd, mask);
+ release_fd_lock(ct->ct_fd_lock, mask);
return(ct->ct_error.re_status = RPC_TIMEDOUT);
}
@@ -424,14 +404,14 @@ call_again:
reply_msg.acpted_rply.ar_results.where = NULL;
reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void;
if (! xdrrec_skiprecord(xdrs)) {
- release_fd_lock(ct->ct_fd, mask);
+ release_fd_lock(ct->ct_fd_lock, mask);
return (ct->ct_error.re_status);
}
/* now decode and validate the response header */
if (! xdr_replymsg(xdrs, &reply_msg)) {
if (ct->ct_error.re_status == RPC_SUCCESS)
continue;
- release_fd_lock(ct->ct_fd, mask);
+ release_fd_lock(ct->ct_fd_lock, mask);
return (ct->ct_error.re_status);
}
if (reply_msg.rm_xid == x_id)
@@ -464,7 +444,7 @@ call_again:
if (refreshes-- && AUTH_REFRESH(cl->cl_auth, &reply_msg))
goto call_again;
} /* end of unsuccessful completion */
- release_fd_lock(ct->ct_fd, mask);
+ release_fd_lock(ct->ct_fd_lock, mask);
return (ct->ct_error.re_status);
}
@@ -502,13 +482,13 @@ clnt_vc_freeres(cl, xdr_res, res_ptr)
sigfillset(&newmask);
thr_sigsetmask(SIG_SETMASK, &newmask, &mask);
mutex_lock(&clnt_fd_lock);
- while (vc_fd_locks[ct->ct_fd])
- cond_wait(&vc_cv[ct->ct_fd], &clnt_fd_lock);
+ while (ct->ct_fd_lock->active)
+ cond_wait(&ct->ct_fd_lock->cv, &clnt_fd_lock);
xdrs->x_op = XDR_FREE;
dummy = (*xdr_res)(xdrs, res_ptr);
mutex_unlock(&clnt_fd_lock);
thr_sigsetmask(SIG_SETMASK, &(mask), NULL);
- cond_signal(&vc_cv[ct->ct_fd]);
+ cond_signal(&ct->ct_fd_lock->cv);
return dummy;
}
@@ -530,7 +510,6 @@ clnt_vc_control(cl, request, info)
void *infop = info;
sigset_t mask;
sigset_t newmask;
- int rpc_lock_value;
u_int32_t tmp;
u_int32_t ltmp;
@@ -541,20 +520,19 @@ clnt_vc_control(cl, request, info)
sigfillset(&newmask);
thr_sigsetmask(SIG_SETMASK, &newmask, &mask);
mutex_lock(&clnt_fd_lock);
- while (vc_fd_locks[ct->ct_fd])
- cond_wait(&vc_cv[ct->ct_fd], &clnt_fd_lock);
- rpc_lock_value = 1;
- vc_fd_locks[ct->ct_fd] = rpc_lock_value;
+ while (ct->ct_fd_lock->active)
+ cond_wait(&ct->ct_fd_lock->cv, &clnt_fd_lock);
+ ct->ct_fd_lock->active = TRUE;
mutex_unlock(&clnt_fd_lock);
switch (request) {
case CLSET_FD_CLOSE:
ct->ct_closeit = TRUE;
- release_fd_lock(ct->ct_fd, mask);
+ release_fd_lock(ct->ct_fd_lock, mask);
return (TRUE);
case CLSET_FD_NCLOSE:
ct->ct_closeit = FALSE;
- release_fd_lock(ct->ct_fd, mask);
+ release_fd_lock(ct->ct_fd_lock, mask);
return (TRUE);
default:
break;
@@ -562,13 +540,13 @@ clnt_vc_control(cl, request, info)
/* for other requests which use info */
if (info == NULL) {
- release_fd_lock(ct->ct_fd, mask);
+ release_fd_lock(ct->ct_fd_lock, mask);
return (FALSE);
}
switch (request) {
case CLSET_TIMEOUT:
if (time_not_ok((struct timeval *)info)) {
- release_fd_lock(ct->ct_fd, mask);
+ release_fd_lock(ct->ct_fd_lock, mask);
return (FALSE);
}
ct->ct_wait = *(struct timeval *)infop;
@@ -588,7 +566,7 @@ clnt_vc_control(cl, request, info)
*(struct netbuf *)info = ct->ct_addr;
break;
case CLSET_SVC_ADDR: /* set to new address */
- release_fd_lock(ct->ct_fd, mask);
+ release_fd_lock(ct->ct_fd_lock, mask);
return (FALSE);
case CLGET_XID:
/*
@@ -642,10 +620,10 @@ clnt_vc_control(cl, request, info)
break;
default:
- release_fd_lock(ct->ct_fd, mask);
+ release_fd_lock(ct->ct_fd_lock, mask);
return (FALSE);
}
- release_fd_lock(ct->ct_fd, mask);
+ release_fd_lock(ct->ct_fd_lock, mask);
return (TRUE);
}
@@ -666,8 +644,8 @@ clnt_vc_destroy(cl)
sigfillset(&newmask);
thr_sigsetmask(SIG_SETMASK, &newmask, &mask);
mutex_lock(&clnt_fd_lock);
- while (vc_fd_locks[ct_fd])
- cond_wait(&vc_cv[ct_fd], &clnt_fd_lock);
+ while (ct->ct_fd_lock->active)
+ cond_wait(&ct->ct_fd_lock->cv, &clnt_fd_lock);
if (ct->ct_closeit && ct->ct_fd != -1) {
(void)close(ct->ct_fd);
}
@@ -680,9 +658,10 @@ clnt_vc_destroy(cl)
if (cl->cl_tp && cl->cl_tp[0])
mem_free(cl->cl_tp, strlen(cl->cl_tp) +1);
mem_free(cl, sizeof(CLIENT));
+ cond_signal(&ct->ct_fd_lock->cv);
+ fd_lock_destroy(ct_fd, ct->ct_fd_lock, vc_fd_locks);
mutex_unlock(&clnt_fd_lock);
thr_sigsetmask(SIG_SETMASK, &(mask), NULL);
- cond_signal(&vc_cv[ct_fd]);
}
/*
diff --git a/tirpc/reentrant.h b/tirpc/reentrant.h
index 5f5c96e..5bb581a 100644
--- a/tirpc/reentrant.h
+++ b/tirpc/reentrant.h
@@ -57,6 +57,7 @@
#define mutex_unlock(m) pthread_mutex_unlock(m)
#define cond_init(c, a, p) pthread_cond_init(c, a)
+#define cond_destroy(c) pthread_cond_destroy(c)
#define cond_signal(m) pthread_cond_signal(m)
#define cond_broadcast(m) pthread_cond_broadcast(m)
#define cond_wait(c, m) pthread_cond_wait(c, m)