diff options
-rw-r--r-- | src/clnt_dg.c | 116 | ||||
-rw-r--r-- | src/clnt_fd_locks.h | 205 | ||||
-rw-r--r-- | src/clnt_vc.c | 149 | ||||
-rw-r--r-- | tirpc/reentrant.h | 1 |
4 files changed, 317 insertions, 154 deletions
diff --git a/src/clnt_dg.c b/src/clnt_dg.c index eb5467f..df402ec 100644 --- a/src/clnt_dg.c +++ b/src/clnt_dg.c @@ -53,6 +53,7 @@ #include <unistd.h> #include <err.h> #include "rpc_com.h" +#include "clnt_fd_locks.h" #ifdef IP_RECVERR #include <asm/types.h> @@ -78,24 +79,28 @@ static void clnt_dg_destroy(CLIENT *); * This machinery implements per-fd locks for MT-safety. It is not * sufficient to do per-CLIENT handle locks for MT-safety because a * user may create more than one CLIENT handle with the same fd behind - * it. Therfore, we allocate an array of flags (dg_fd_locks), protected - * by the clnt_fd_lock mutex, and an array (dg_cv) of condition variables - * similarly protected. Dg_fd_lock[fd] == 1 => a call is activte on some - * CLIENT handle created for that fd. + * it. + * + * We keep track of a list of per-fd locks, protected by the clnt_fd_lock + * mutex. Each per-fd lock consists of a predicate indicating whether is + * active or not: fd_lock->active == TRUE => a call is active on some + * CLIENT handle created for that fd. Each fd predicate is guarded by a + * condition variable so that the global mutex can be unlocked while + * waiting for the predicate to change. + * * The current implementation holds locks across the entire RPC and reply, * including retransmissions. Yes, this is silly, and as soon as this * code is proven to work, this should be the first thing fixed. One step * at a time. */ -static int *dg_fd_locks; +static fd_locks_t *dg_fd_locks; extern mutex_t clnt_fd_lock; -static cond_t *dg_cv; -#define release_fd_lock(fd, mask) { \ +#define release_fd_lock(fd_lock, mask) { \ mutex_lock(&clnt_fd_lock); \ - dg_fd_locks[fd] = 0; \ + fd_lock->active = FALSE; \ mutex_unlock(&clnt_fd_lock); \ thr_sigsetmask(SIG_SETMASK, &(mask), NULL); \ - cond_signal(&dg_cv[fd]); \ + cond_signal(&fd_lock->cv); \ } static const char mem_err_clnt_dg[] = "clnt_dg_create: out of memory"; @@ -107,6 +112,7 @@ static const char mem_err_clnt_dg[] = "clnt_dg_create: out of memory"; */ struct cu_data { int cu_fd; /* connections fd */ + fd_lock_t *cu_fd_lock; bool_t cu_closeit; /* opened by library */ struct sockaddr_storage cu_raddr; /* remote address */ int cu_rlen; @@ -155,47 +161,20 @@ clnt_dg_create(fd, svcaddr, program, version, sendsz, recvsz) sigset_t newmask; struct __rpc_sockinfo si; int one = 1; + fd_lock_t *fd_lock; sigfillset(&newmask); thr_sigsetmask(SIG_SETMASK, &newmask, &mask); mutex_lock(&clnt_fd_lock); - if (dg_fd_locks == (int *) NULL) { - size_t cv_allocsz, fd_allocsz; - unsigned int dtbsize = __rpc_dtbsize(); - - if ( (size_t) dtbsize > SIZE_MAX/sizeof(cond_t)) { - mutex_unlock(&clnt_fd_lock); - thr_sigsetmask(SIG_SETMASK, &(mask), NULL); - errno = EOVERFLOW; - goto err1; - } - - fd_allocsz = dtbsize * sizeof (int); - dg_fd_locks = (int *) mem_alloc(fd_allocsz); - if (dg_fd_locks == (int *) NULL) { - mutex_unlock(&clnt_fd_lock); - thr_sigsetmask(SIG_SETMASK, &(mask), NULL); - errno = ENOMEM; - goto err1; - } else - memset(dg_fd_locks, '\0', fd_allocsz); - - cv_allocsz = dtbsize * sizeof (cond_t); - dg_cv = (cond_t *) mem_alloc(cv_allocsz); - if (dg_cv == (cond_t *) NULL) { - mem_free(dg_fd_locks, fd_allocsz); - dg_fd_locks = (int *) NULL; - mutex_unlock(&clnt_fd_lock); - thr_sigsetmask(SIG_SETMASK, &(mask), NULL); - errno = ENOMEM; + if (dg_fd_locks == (fd_locks_t *) NULL) { + dg_fd_locks = fd_locks_init(); + if (dg_fd_locks == (fd_locks_t *) NULL) { goto err1; - } else { - int i; - - for (i = 0; i < dtbsize; i++) - cond_init(&dg_cv[i], 0, (void *) 0); } } + fd_lock = fd_lock_create(fd, dg_fd_locks); + if (fd_lock == (fd_lock_t *) NULL) + goto err1; mutex_unlock(&clnt_fd_lock); thr_sigsetmask(SIG_SETMASK, &(mask), NULL); @@ -274,6 +253,7 @@ clnt_dg_create(fd, svcaddr, program, version, sendsz, recvsz) */ cu->cu_closeit = FALSE; cu->cu_fd = fd; + cu->cu_fd_lock = fd_lock; cl->cl_ops = clnt_dg_ops(); cl->cl_private = (caddr_t)(void *)cu; cl->cl_auth = authnone_create(); @@ -319,17 +299,15 @@ clnt_dg_call(cl, proc, xargs, argsp, xresults, resultsp, utimeout) sigset_t newmask; socklen_t salen; ssize_t recvlen = 0; - int rpc_lock_value; u_int32_t xid, inval, outval; outlen = 0; sigfillset(&newmask); thr_sigsetmask(SIG_SETMASK, &newmask, &mask); mutex_lock(&clnt_fd_lock); - while (dg_fd_locks[cu->cu_fd]) - cond_wait(&dg_cv[cu->cu_fd], &clnt_fd_lock); - rpc_lock_value = 1; - dg_fd_locks[cu->cu_fd] = rpc_lock_value; + while (cu->cu_fd_lock->active) + cond_wait(&cu->cu_fd_lock->cv, &clnt_fd_lock); + cu->cu_fd_lock->active = TRUE; mutex_unlock(&clnt_fd_lock); if (cu->cu_total.tv_usec == -1) { timeout = utimeout; /* use supplied timeout */ @@ -473,7 +451,7 @@ get_reply: mem_free(cbuf, (outlen + 256)); e = (struct sock_extended_err *) CMSG_DATA(cmsg); cu->cu_error.re_errno = e->ee_errno; - release_fd_lock(cu->cu_fd, mask); + release_fd_lock(cu->cu_fd_lock, mask); return (cu->cu_error.re_status = RPC_CANTRECV); } mem_free(cbuf, (outlen + 256)); @@ -553,7 +531,7 @@ get_reply: } out: - release_fd_lock(cu->cu_fd, mask); + release_fd_lock(cu->cu_fd_lock, mask); return (cu->cu_error.re_status); } @@ -582,13 +560,14 @@ clnt_dg_freeres(cl, xdr_res, res_ptr) sigfillset(&newmask); thr_sigsetmask(SIG_SETMASK, &newmask, &mask); mutex_lock(&clnt_fd_lock); - while (dg_fd_locks[cu->cu_fd]) - cond_wait(&dg_cv[cu->cu_fd], &clnt_fd_lock); + while (cu->cu_fd_lock->active) + cond_wait(&cu->cu_fd_lock->cv, &clnt_fd_lock); + cu->cu_fd_lock->active = TRUE; xdrs->x_op = XDR_FREE; dummy = (*xdr_res)(xdrs, res_ptr); mutex_unlock(&clnt_fd_lock); thr_sigsetmask(SIG_SETMASK, &mask, NULL); - cond_signal(&dg_cv[cu->cu_fd]); + cond_signal(&cu->cu_fd_lock->cv); return (dummy); } @@ -609,36 +588,34 @@ clnt_dg_control(cl, request, info) struct netbuf *addr; sigset_t mask; sigset_t newmask; - int rpc_lock_value; sigfillset(&newmask); thr_sigsetmask(SIG_SETMASK, &newmask, &mask); mutex_lock(&clnt_fd_lock); - while (dg_fd_locks[cu->cu_fd]) - cond_wait(&dg_cv[cu->cu_fd], &clnt_fd_lock); - rpc_lock_value = 1; - dg_fd_locks[cu->cu_fd] = rpc_lock_value; + while (cu->cu_fd_lock->active) + cond_wait(&cu->cu_fd_lock->cv, &clnt_fd_lock); + cu->cu_fd_lock->active = TRUE; mutex_unlock(&clnt_fd_lock); switch (request) { case CLSET_FD_CLOSE: cu->cu_closeit = TRUE; - release_fd_lock(cu->cu_fd, mask); + release_fd_lock(cu->cu_fd_lock, mask); return (TRUE); case CLSET_FD_NCLOSE: cu->cu_closeit = FALSE; - release_fd_lock(cu->cu_fd, mask); + release_fd_lock(cu->cu_fd_lock, mask); return (TRUE); } /* for other requests which use info */ if (info == NULL) { - release_fd_lock(cu->cu_fd, mask); + release_fd_lock(cu->cu_fd_lock, mask); return (FALSE); } switch (request) { case CLSET_TIMEOUT: if (time_not_ok((struct timeval *)info)) { - release_fd_lock(cu->cu_fd, mask); + release_fd_lock(cu->cu_fd_lock, mask); return (FALSE); } cu->cu_total = *(struct timeval *)info; @@ -652,7 +629,7 @@ clnt_dg_control(cl, request, info) break; case CLSET_RETRY_TIMEOUT: if (time_not_ok((struct timeval *)info)) { - release_fd_lock(cu->cu_fd, mask); + release_fd_lock(cu->cu_fd_lock, mask); return (FALSE); } cu->cu_wait = *(struct timeval *)info; @@ -672,7 +649,7 @@ clnt_dg_control(cl, request, info) case CLSET_SVC_ADDR: /* set to new address */ addr = (struct netbuf *)info; if (addr->len < sizeof cu->cu_raddr) { - release_fd_lock(cu->cu_fd, mask); + release_fd_lock(cu->cu_fd_lock, mask); return (FALSE); } (void) memcpy(&cu->cu_raddr, addr->buf, addr->len); @@ -735,10 +712,10 @@ clnt_dg_control(cl, request, info) cu->cu_connect = *(int *)info; break; default: - release_fd_lock(cu->cu_fd, mask); + release_fd_lock(cu->cu_fd_lock, mask); return (FALSE); } - release_fd_lock(cu->cu_fd, mask); + release_fd_lock(cu->cu_fd_lock, mask); return (TRUE); } @@ -754,8 +731,8 @@ clnt_dg_destroy(cl) sigfillset(&newmask); thr_sigsetmask(SIG_SETMASK, &newmask, &mask); mutex_lock(&clnt_fd_lock); - while (dg_fd_locks[cu_fd]) - cond_wait(&dg_cv[cu_fd], &clnt_fd_lock); + while (cu->cu_fd_lock->active) + cond_wait(&cu->cu_fd_lock->cv, &clnt_fd_lock); if (cu->cu_closeit) (void)close(cu_fd); XDR_DESTROY(&(cu->cu_outxdrs)); @@ -765,9 +742,10 @@ clnt_dg_destroy(cl) if (cl->cl_tp && cl->cl_tp[0]) mem_free(cl->cl_tp, strlen(cl->cl_tp) +1); mem_free(cl, sizeof (CLIENT)); + cond_signal(&cu->cu_fd_lock->cv); + fd_lock_destroy(cu_fd, cu->cu_fd_lock, dg_fd_locks); mutex_unlock(&clnt_fd_lock); thr_sigsetmask(SIG_SETMASK, &mask, NULL); - cond_signal(&dg_cv[cu_fd]); } static struct clnt_ops * diff --git a/src/clnt_fd_locks.h b/src/clnt_fd_locks.h new file mode 100644 index 0000000..8263071 --- /dev/null +++ b/src/clnt_fd_locks.h @@ -0,0 +1,205 @@ +/* + * debug.h -- debugging routines for libtirpc + * + * Copyright (c) 2020 SUSE LINUX GmbH, Nuernberg, Germany. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * - Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * - Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * - Neither the name of Sun Microsystems, Inc. nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef _CLNT_FD_LOCKS_H +#define _CLNT_FD_LOCKS_H + +#include <sys/queue.h> +#include <errno.h> +#include <reentrant.h> +#include <rpc/xdr.h> + + +/* + * This utility manages a list of per-fd locks for the clients. + * + * If MAX_FDLOCKS_PREALLOC is defined, a number of pre-fd locks will be + * pre-allocated. This number is the minimum of MAX_FDLOCKS_PREALLOC or + * the process soft limit of allowed fds. + */ +#ifdef MAX_FDLOCKS_PREALLOC +static unsigned int fd_locks_prealloc = 0; +#endif + +/* per-fd lock */ +struct fd_lock_t { + bool_t active; + cond_t cv; +}; +typedef struct fd_lock_t fd_lock_t; + + +/* internal type to store per-fd locks in a list */ +struct fd_lock_item_t { + /* fd_lock_t first so we can cast to fd_lock_item_t */ + fd_lock_t fd_lock; + int fd; + unsigned int refs; + TAILQ_ENTRY(fd_lock_item_t) link; +}; +typedef struct fd_lock_item_t fd_lock_item_t; +#define to_fd_lock_item(fdlock_t_ptr) ((fd_lock_item_t*) fdlock_t_ptr) + + +/* internal list of per-fd locks */ +typedef TAILQ_HEAD(,fd_lock_item_t) fd_lock_list_t; + + +#ifdef MAX_FDLOCKS_PREALLOC + +/* With pre-allocation, keep track of both an array and a list */ +struct fd_locks_t { + fd_lock_list_t fd_lock_list; + fd_lock_t *fd_lock_array; +}; +typedef struct fd_locks_t fd_locks_t; +#define to_fd_lock_list(fd_locks_t_ptr) (&fd_locks_t_ptr->fd_lock_list) + +#else + +/* With no pre-allocation, just keep track of a list */ +typedef fd_lock_list_t fd_locks_t; +#define to_fd_lock_list(fd_locks_t_ptr) ((fd_lock_list_t *) fd_locks_t_ptr) + +#endif + + +/* allocate fd locks */ +static inline +fd_locks_t* fd_locks_init() { + fd_locks_t *fd_locks; + + fd_locks = (fd_locks_t *) mem_alloc(sizeof(fd_locks_t)); + if (fd_locks == (fd_locks_t *) NULL) { + errno = ENOMEM; + return (NULL); + } + TAILQ_INIT(to_fd_lock_list(fd_locks)); + +#ifdef MAX_FDLOCKS_PREALLOC + size_t fd_lock_arraysz; + + if (fd_locks_prealloc == 0) { + unsigned int dtbsize = __rpc_dtbsize(); + if (0 < dtbsize && dtbsize < MAX_FDLOCKS_PREALLOC) + fd_locks_prealloc = dtbsize; + else + fd_locks_prealloc = MAX_FDLOCKS_PREALLOC; + } + + if ( (size_t) fd_locks_prealloc > SIZE_MAX/sizeof(fd_lock_t)) { + errno = EOVERFLOW; + return (NULL); + } + + fd_lock_arraysz = fd_locks_prealloc * sizeof (fd_lock_t); + fd_locks->fd_lock_array = (fd_lock_t *) mem_alloc(fd_lock_arraysz); + if (fd_locks->fd_lock_array == (fd_lock_t *) NULL) { + errno = ENOMEM; + return (NULL); + } + else { + int i; + + for (i = 0; i < fd_locks_prealloc; i++) { + fd_locks->fd_lock_array[i].active = FALSE; + cond_init(&fd_locks->fd_lock_array[i].cv, 0, (void *) 0); + } + } +#endif + + return fd_locks; +} + +/* de-allocate fd locks */ +static inline +void fd_locks_destroy(fd_locks_t *fd_locks) { +#ifdef MAX_FDLOCKS_PREALLOC + fd_lock_t *array = fd_locks->fd_lock_array; + mem_free(array, fd_locks_prealloc * sizeof (fd_lock_t)); +#endif + fd_lock_item_t *item; + fd_lock_list_t *list = to_fd_lock_list(fd_locks); + + TAILQ_FOREACH(item, list, link) { + cond_destroy(&item->fd_lock.cv); + mem_free(item, sizeof (*item)); + } + mem_free(fd_locks, sizeof (*fd_locks)); +} + +/* allocate per-fd lock */ +static inline +fd_lock_t* fd_lock_create(int fd, fd_locks_t *fd_locks) { +#ifdef MAX_FDLOCKS_PREALLOC + if (fd < fd_locks_prealloc) { + return &fd_locks->fd_lock_array[fd]; + } +#endif + fd_lock_item_t* item; + fd_lock_list_t *list = to_fd_lock_list(fd_locks); + + for (item = TAILQ_FIRST(list); + item != (fd_lock_item_t *) NULL && item->fd != fd; + item = TAILQ_NEXT(item, link)); + + if (item == (fd_lock_item_t *) NULL) { + item = (fd_lock_item_t *) mem_alloc(sizeof(fd_lock_item_t)); + if (item == (fd_lock_item_t *) NULL) { + errno = ENOMEM; + return (NULL); + } + item->fd = fd; + item->refs = 1; + item->fd_lock.active = FALSE; + cond_init(&item->fd_lock.cv, 0, (void *) 0); + TAILQ_INSERT_HEAD(list, item, link); + } else { + item->refs++; + } + return &item->fd_lock; +} + +/* de-allocate per-fd lock */ +static inline +void fd_lock_destroy(int fd, fd_lock_t *fd_lock, fd_locks_t *fd_locks) { +#ifdef MAX_FDLOCKS_PREALLOC + if (fd < fd_locks_prealloc) + return; +#endif + fd_lock_item_t* item = to_fd_lock_item(fd_lock); + item->refs--; + if (item->refs <= 0) { + TAILQ_REMOVE(to_fd_lock_list(fd_locks), item, link); + cond_destroy(&item->fd_lock.cv); + mem_free(item, sizeof (*item)); + } +} + +#endif /* _CLNT_FD_LOCKS_H */ diff --git a/src/clnt_vc.c b/src/clnt_vc.c index ec58892..2f3dde6 100644 --- a/src/clnt_vc.c +++ b/src/clnt_vc.c @@ -67,6 +67,7 @@ #include <rpc/rpc.h> #include "rpc_com.h" +#include "clnt_fd_locks.h" #define MCALL_MSG_SIZE 24 @@ -110,6 +111,7 @@ static int write_vc(void *, void *, int); struct ct_data { int ct_fd; /* connection's fd */ + fd_lock_t *ct_fd_lock; bool_t ct_closeit; /* close it on destroy */ struct timeval ct_wait; /* wait interval in milliseconds */ bool_t ct_waitset; /* wait set by clnt_control? */ @@ -124,27 +126,32 @@ struct ct_data { }; /* - * This machinery implements per-fd locks for MT-safety. It is not - * sufficient to do per-CLIENT handle locks for MT-safety because a - * user may create more than one CLIENT handle with the same fd behind - * it. Therfore, we allocate an array of flags (vc_fd_locks), protected - * by the clnt_fd_lock mutex, and an array (vc_cv) of condition variables - * similarly protected. Vc_fd_lock[fd] == 1 => a call is active on some - * CLIENT handle created for that fd. - * The current implementation holds locks across the entire RPC and reply. - * Yes, this is silly, and as soon as this code is proven to work, this - * should be the first thing fixed. One step at a time. + * This machinery implements per-fd locks for MT-safety. It is not + * sufficient to do per-CLIENT handle locks for MT-safety because a + * user may create more than one CLIENT handle with the same fd behind + * it. + * + * We keep track of a list of per-fd locks, protected by the clnt_fd_lock + * mutex. Each per-fd lock consists of a predicate indicating whether is + * active or not: fd_lock->active == TRUE => a call is active on some + * CLIENT handle created for that fd. Each fd predicate is guarded by a + * condition variable so that the global mutex can be unlocked while + * waiting for the predicate to change. + * + * The current implementation holds locks across the entire RPC and reply, + * including retransmissions. Yes, this is silly, and as soon as this + * code is proven to work, this should be the first thing fixed. One step + * at a time. */ -static int *vc_fd_locks; +static fd_locks_t *vc_fd_locks; extern pthread_mutex_t disrupt_lock; extern mutex_t clnt_fd_lock; -static cond_t *vc_cv; -#define release_fd_lock(fd, mask) { \ +#define release_fd_lock(fd_lock, mask) { \ mutex_lock(&clnt_fd_lock); \ - vc_fd_locks[fd] = 0; \ + fd_lock->active = FALSE; \ mutex_unlock(&clnt_fd_lock); \ thr_sigsetmask(SIG_SETMASK, &(mask), (sigset_t *) NULL); \ - cond_signal(&vc_cv[fd]); \ + cond_signal(&fd_lock->cv); \ } static const char clnt_vc_errstr[] = "%s : %s"; @@ -181,6 +188,7 @@ clnt_vc_create(fd, raddr, prog, vers, sendsz, recvsz) struct sockaddr_storage ss; socklen_t slen; struct __rpc_sockinfo si; + fd_lock_t *fd_lock; mutex_lock(&disrupt_lock); if (disrupt == 0) @@ -201,49 +209,22 @@ clnt_vc_create(fd, raddr, prog, vers, sendsz, recvsz) sigfillset(&newmask); thr_sigsetmask(SIG_SETMASK, &newmask, &mask); mutex_lock(&clnt_fd_lock); - if (vc_fd_locks == (int *) NULL) { - size_t cv_allocsz, fd_allocsz; - unsigned int dtbsize = __rpc_dtbsize(); - struct rpc_createerr *ce = &get_rpc_createerr(); - - if ( (size_t) dtbsize > SIZE_MAX/sizeof(cond_t)) { - mutex_unlock(&clnt_fd_lock); - thr_sigsetmask(SIG_SETMASK, &(mask), NULL); - ce->cf_stat = RPC_SYSTEMERROR; - ce->cf_error.re_errno = EOVERFLOW; - goto err; - } - - fd_allocsz = dtbsize * sizeof (int); - vc_fd_locks = (int *) mem_alloc(fd_allocsz); - if (vc_fd_locks == (int *) NULL) { - mutex_unlock(&clnt_fd_lock); - thr_sigsetmask(SIG_SETMASK, &(mask), NULL); - ce->cf_stat = RPC_SYSTEMERROR; - ce->cf_error.re_errno = ENOMEM; - goto err; - } else - memset(vc_fd_locks, '\0', fd_allocsz); - - assert(vc_cv == (cond_t *) NULL); - cv_allocsz = dtbsize * sizeof (cond_t); - vc_cv = (cond_t *) mem_alloc(cv_allocsz); - if (vc_cv == (cond_t *) NULL) { - mem_free(vc_fd_locks, fd_allocsz); - vc_fd_locks = (int *) NULL; - mutex_unlock(&clnt_fd_lock); - thr_sigsetmask(SIG_SETMASK, &(mask), NULL); + if (vc_fd_locks == (fd_locks_t *) NULL) { + vc_fd_locks = fd_locks_init(); + if (vc_fd_locks == (fd_locks_t *) NULL) { + struct rpc_createerr *ce = &get_rpc_createerr(); ce->cf_stat = RPC_SYSTEMERROR; - ce->cf_error.re_errno = ENOMEM; + ce->cf_error.re_errno = errno; goto err; - } else { - int i; - - for (i = 0; i < dtbsize; i++) - cond_init(&vc_cv[i], 0, (void *) 0); } - } else - assert(vc_cv != (cond_t *) NULL); + } + fd_lock = fd_lock_create(fd, vc_fd_locks); + if (fd_lock == (fd_lock_t *) NULL) { + struct rpc_createerr *ce = &get_rpc_createerr(); + ce->cf_stat = RPC_SYSTEMERROR; + ce->cf_error.re_errno = errno; + goto err; + } /* * Do not hold mutex during connect @@ -279,6 +260,7 @@ clnt_vc_create(fd, raddr, prog, vers, sendsz, recvsz) * Set up private data struct */ ct->ct_fd = fd; + ct->ct_fd_lock = fd_lock; ct->ct_wait.tv_usec = 0; ct->ct_waitset = FALSE; ct->ct_addr.buf = malloc(raddr->maxlen); @@ -361,17 +343,15 @@ clnt_vc_call(cl, proc, xdr_args, args_ptr, xdr_results, results_ptr, timeout) bool_t shipnow; int refreshes = 2; sigset_t mask, newmask; - int rpc_lock_value; assert(cl != NULL); sigfillset(&newmask); thr_sigsetmask(SIG_SETMASK, &newmask, &mask); mutex_lock(&clnt_fd_lock); - while (vc_fd_locks[ct->ct_fd]) - cond_wait(&vc_cv[ct->ct_fd], &clnt_fd_lock); - rpc_lock_value = 1; - vc_fd_locks[ct->ct_fd] = rpc_lock_value; + while (ct->ct_fd_lock->active) + cond_wait(&ct->ct_fd_lock->cv, &clnt_fd_lock); + ct->ct_fd_lock->active = TRUE; mutex_unlock(&clnt_fd_lock); if (!ct->ct_waitset) { /* If time is not within limits, we ignore it. */ @@ -395,22 +375,22 @@ call_again: if (ct->ct_error.re_status == RPC_SUCCESS) ct->ct_error.re_status = RPC_CANTENCODEARGS; (void)xdrrec_endofrecord(xdrs, TRUE); - release_fd_lock(ct->ct_fd, mask); + release_fd_lock(ct->ct_fd_lock, mask); return (ct->ct_error.re_status); } if (! xdrrec_endofrecord(xdrs, shipnow)) { - release_fd_lock(ct->ct_fd, mask); + release_fd_lock(ct->ct_fd_lock, mask); return (ct->ct_error.re_status = RPC_CANTSEND); } if (! shipnow) { - release_fd_lock(ct->ct_fd, mask); + release_fd_lock(ct->ct_fd_lock, mask); return (RPC_SUCCESS); } /* * Hack to provide rpc-based message passing */ if (timeout.tv_sec == 0 && timeout.tv_usec == 0) { - release_fd_lock(ct->ct_fd, mask); + release_fd_lock(ct->ct_fd_lock, mask); return(ct->ct_error.re_status = RPC_TIMEDOUT); } @@ -424,14 +404,14 @@ call_again: reply_msg.acpted_rply.ar_results.where = NULL; reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void; if (! xdrrec_skiprecord(xdrs)) { - release_fd_lock(ct->ct_fd, mask); + release_fd_lock(ct->ct_fd_lock, mask); return (ct->ct_error.re_status); } /* now decode and validate the response header */ if (! xdr_replymsg(xdrs, &reply_msg)) { if (ct->ct_error.re_status == RPC_SUCCESS) continue; - release_fd_lock(ct->ct_fd, mask); + release_fd_lock(ct->ct_fd_lock, mask); return (ct->ct_error.re_status); } if (reply_msg.rm_xid == x_id) @@ -464,7 +444,7 @@ call_again: if (refreshes-- && AUTH_REFRESH(cl->cl_auth, &reply_msg)) goto call_again; } /* end of unsuccessful completion */ - release_fd_lock(ct->ct_fd, mask); + release_fd_lock(ct->ct_fd_lock, mask); return (ct->ct_error.re_status); } @@ -502,13 +482,13 @@ clnt_vc_freeres(cl, xdr_res, res_ptr) sigfillset(&newmask); thr_sigsetmask(SIG_SETMASK, &newmask, &mask); mutex_lock(&clnt_fd_lock); - while (vc_fd_locks[ct->ct_fd]) - cond_wait(&vc_cv[ct->ct_fd], &clnt_fd_lock); + while (ct->ct_fd_lock->active) + cond_wait(&ct->ct_fd_lock->cv, &clnt_fd_lock); xdrs->x_op = XDR_FREE; dummy = (*xdr_res)(xdrs, res_ptr); mutex_unlock(&clnt_fd_lock); thr_sigsetmask(SIG_SETMASK, &(mask), NULL); - cond_signal(&vc_cv[ct->ct_fd]); + cond_signal(&ct->ct_fd_lock->cv); return dummy; } @@ -530,7 +510,6 @@ clnt_vc_control(cl, request, info) void *infop = info; sigset_t mask; sigset_t newmask; - int rpc_lock_value; u_int32_t tmp; u_int32_t ltmp; @@ -541,20 +520,19 @@ clnt_vc_control(cl, request, info) sigfillset(&newmask); thr_sigsetmask(SIG_SETMASK, &newmask, &mask); mutex_lock(&clnt_fd_lock); - while (vc_fd_locks[ct->ct_fd]) - cond_wait(&vc_cv[ct->ct_fd], &clnt_fd_lock); - rpc_lock_value = 1; - vc_fd_locks[ct->ct_fd] = rpc_lock_value; + while (ct->ct_fd_lock->active) + cond_wait(&ct->ct_fd_lock->cv, &clnt_fd_lock); + ct->ct_fd_lock->active = TRUE; mutex_unlock(&clnt_fd_lock); switch (request) { case CLSET_FD_CLOSE: ct->ct_closeit = TRUE; - release_fd_lock(ct->ct_fd, mask); + release_fd_lock(ct->ct_fd_lock, mask); return (TRUE); case CLSET_FD_NCLOSE: ct->ct_closeit = FALSE; - release_fd_lock(ct->ct_fd, mask); + release_fd_lock(ct->ct_fd_lock, mask); return (TRUE); default: break; @@ -562,13 +540,13 @@ clnt_vc_control(cl, request, info) /* for other requests which use info */ if (info == NULL) { - release_fd_lock(ct->ct_fd, mask); + release_fd_lock(ct->ct_fd_lock, mask); return (FALSE); } switch (request) { case CLSET_TIMEOUT: if (time_not_ok((struct timeval *)info)) { - release_fd_lock(ct->ct_fd, mask); + release_fd_lock(ct->ct_fd_lock, mask); return (FALSE); } ct->ct_wait = *(struct timeval *)infop; @@ -588,7 +566,7 @@ clnt_vc_control(cl, request, info) *(struct netbuf *)info = ct->ct_addr; break; case CLSET_SVC_ADDR: /* set to new address */ - release_fd_lock(ct->ct_fd, mask); + release_fd_lock(ct->ct_fd_lock, mask); return (FALSE); case CLGET_XID: /* @@ -642,10 +620,10 @@ clnt_vc_control(cl, request, info) break; default: - release_fd_lock(ct->ct_fd, mask); + release_fd_lock(ct->ct_fd_lock, mask); return (FALSE); } - release_fd_lock(ct->ct_fd, mask); + release_fd_lock(ct->ct_fd_lock, mask); return (TRUE); } @@ -666,8 +644,8 @@ clnt_vc_destroy(cl) sigfillset(&newmask); thr_sigsetmask(SIG_SETMASK, &newmask, &mask); mutex_lock(&clnt_fd_lock); - while (vc_fd_locks[ct_fd]) - cond_wait(&vc_cv[ct_fd], &clnt_fd_lock); + while (ct->ct_fd_lock->active) + cond_wait(&ct->ct_fd_lock->cv, &clnt_fd_lock); if (ct->ct_closeit && ct->ct_fd != -1) { (void)close(ct->ct_fd); } @@ -680,9 +658,10 @@ clnt_vc_destroy(cl) if (cl->cl_tp && cl->cl_tp[0]) mem_free(cl->cl_tp, strlen(cl->cl_tp) +1); mem_free(cl, sizeof(CLIENT)); + cond_signal(&ct->ct_fd_lock->cv); + fd_lock_destroy(ct_fd, ct->ct_fd_lock, vc_fd_locks); mutex_unlock(&clnt_fd_lock); thr_sigsetmask(SIG_SETMASK, &(mask), NULL); - cond_signal(&vc_cv[ct_fd]); } /* diff --git a/tirpc/reentrant.h b/tirpc/reentrant.h index 5f5c96e..5bb581a 100644 --- a/tirpc/reentrant.h +++ b/tirpc/reentrant.h @@ -57,6 +57,7 @@ #define mutex_unlock(m) pthread_mutex_unlock(m) #define cond_init(c, a, p) pthread_cond_init(c, a) +#define cond_destroy(c) pthread_cond_destroy(c) #define cond_signal(m) pthread_cond_signal(m) #define cond_broadcast(m) pthread_cond_broadcast(m) #define cond_wait(c, m) pthread_cond_wait(c, m) |