diff options
Diffstat (limited to 'PACE/pace/emulation/mqueue.c')
-rw-r--r-- | PACE/pace/emulation/mqueue.c | 285 |
1 files changed, 163 insertions, 122 deletions
diff --git a/PACE/pace/emulation/mqueue.c b/PACE/pace/emulation/mqueue.c index c1c7440a5fc..e30e927e60c 100644 --- a/PACE/pace/emulation/mqueue.c +++ b/PACE/pace/emulation/mqueue.c @@ -1,4 +1,19 @@ -/* $Id$ */ +/* $Id$ + * ============================================================================ + * + * = LIBRARY + * pace + * + * = FILENAME + * pace/emulation/mqueue.c + * + * = AUTHOR + * John Heitmann + * + * ============================================================================ */ + +#if PACE_LINUX + #include "pace/sys/mman.h" #include "pace/stdio.h" #include "pace/fcntl.h" @@ -6,15 +21,15 @@ #include "pace/stdlib.h" #include "pace/sys/types.h" #include "pace/pthread.h" +#include "pace/sys/stat.h" #include "pace/emulation/mqueue.h" -typedef struct mqd* pace_mqd_t; - typedef struct { pace_mq_attr attr; pace_size_t num_open; pace_pthread_mutex_t mutex; + pace_pthread_cond_t cond; pace_size_t head; pace_size_t freelist; } mqfile; @@ -28,26 +43,43 @@ typedef struct struct mq_attr attrdefault = { 0, 32, 256, 0 }; -pace_mqd_t mq_open (const char* name, int oflag, pace_mode_t mode, pace_mq_attr* attr) +/* This remains mq_open due to the macro in pace/mqueue.h */ +pace_mqd_t mq_open (const char* name, + int oflag, + pace_mode_t mode, + pace_mq_attr* attr) { int m_padding = sizeof (message_header); /* How much extra space per message do we need */ int f_padding = sizeof (mqfile); /* How much fixed padding is needed */ int mflags, mprot; int fd; - pace_size_t i; + int i; pace_size_t mapsize; char* mmaploc; + char* new_name; int create_mmap = 0; /* 1 if the file has never be inited */ - pace_pthread_mutexattr_t mattr; message_header* temp = 0; /*Used in initializaiton of mqueue*/ long index; /* index into the file */ pace_mqd_t result = pace_malloc (sizeof (struct mqd)); + pace_stat_s statbuf; +retry: if (attr == 0) { attr = &attrdefault; } + else + { + if (attr->mq_maxmsg < 0 || attr->mq_msgsize < 0) + { + errno = EBADF; + return (pace_mqd_t)-1; + } + } + /* Create a name that will go to /tmp with a unique name */ + new_name = malloc (256); + snprintf (new_name, 256, "/tmp/mq013028%s", name); /* Fix alignment */ if (attr->mq_msgsize % sizeof (long) != 0) { @@ -56,7 +88,7 @@ pace_mqd_t mq_open (const char* name, int oflag, pace_mode_t mode, pace_mq_attr* if (oflag & PACE_O_CREAT) { /* We need to protect access without the help of O_RDONLY in the fs */ - fd = pace_open ((name, PACE_O_RDWR | PACE_O_CREAT | PACE_O_EXCL, mode)); + fd = pace_open ((new_name, PACE_O_RDWR | PACE_O_CREAT | PACE_O_EXCL, mode & ~S_IXUSR)); if (fd == -1 && errno != EEXIST) { @@ -77,19 +109,49 @@ pace_mqd_t mq_open (const char* name, int oflag, pace_mode_t mode, pace_mq_attr* else { /* We want the existing file */ - fd = pace_open ((name, PACE_O_RDWR)); + fd = pace_open ((new_name, PACE_O_RDWR)); + if (fd == -1 && errno == ENOENT) + { + /* Something odd is going on */ + goto retry; + } + else if (fd == -1) + { + return (pace_mqd_t)-1; + } } } else { - fd = pace_open ((name, PACE_O_RDWR)); + fd = pace_open ((new_name, PACE_O_RDWR)); if (fd == -1) { - errno = ENOENT; return (pace_mqd_t)-1; } } + /* + The following loop makes shure that we haven't entered a race condition. If a file + has been created but not initialized, its IXUSR will not be set (see above). + */ + while (create_mmap == 0) + { + if (stat (new_name, &statbuf) == -1) + { + close (fd); + if (errno == ENOENT && (oflag & O_CREAT)) + { + goto retry; + } + return (pace_mqd_t)-1; + } + else if ((statbuf.st_mode & S_IXUSR) == 0) + { + break; + } + pace_sleep (1); + } + mapsize = f_padding + (attr->mq_msgsize + m_padding) * (attr->mq_maxmsg); mprot = PACE_PROT_READ | PACE_PROT_WRITE; mflags = PACE_MAP_SHARED; @@ -99,30 +161,43 @@ pace_mqd_t mq_open (const char* name, int oflag, pace_mode_t mode, pace_mq_attr* /* Create and 0 out the file */ if (pace_lseek (fd, mapsize, PACE_SEEK_SET) == -1) { + pace_unlink (new_name); return (pace_mqd_t)-1; } if (pace_write (fd, "", 1) != 1) { + pace_unlink (new_name); return (pace_mqd_t)-1; } - mmaploc = mmap (0, mapsize, mprot, mflags, fd, 0); - if ((int)mmaploc == -1) + mmaploc = pace_mmap (0, mapsize, mprot, mflags, fd, 0); + + if (mmaploc == MAP_FAILED) { + pace_unlink (new_name); return (pace_mqd_t)-1; } + + pace_close (fd); + pace_memset (mmaploc, 0, mapsize); - if (pace_pthread_mutexattr_init (&mattr) == -1) + if ((errno = pace_pthread_mutex_init (&(((mqfile*)mmaploc)->mutex), 0)) != 0) { + pace_unlink (new_name); + pace_munmap (mmaploc, mapsize); return (pace_mqd_t)-1; } - if (pace_pthread_mutex_init (&(((mqfile*)mmaploc)->mutex), &mattr) == -1) + if ((errno = pace_pthread_mutex_lock (&(((mqfile*)mmaploc)->mutex))) != 0) { + pace_unlink (new_name); + pace_munmap (mmaploc, mapsize); return (pace_mqd_t)-1; } - if (pace_pthread_mutex_lock (&(((mqfile*)mmaploc)->mutex)) == -1) + if ((errno = pace_pthread_cond_init (&(((mqfile*)mmaploc)->cond), 0)) != 0) { + pace_unlink (new_name); + pace_munmap (mmaploc, mapsize); return (pace_mqd_t)-1; } @@ -139,20 +214,30 @@ pace_mqd_t mq_open (const char* name, int oflag, pace_mode_t mode, pace_mq_attr* temp->next = 0; attr->mq_curmsgs = 0; ((mqfile*)mmaploc)->attr = *attr; + + /* Set S_IXUSR so that the file is known to be inited */ + if (pace_fchmod (fd, mode | S_IXUSR) == -1) + { + pace_unlink (new_name); + pace_munmap (mmaploc, mapsize); + return (pace_mqd_t)-1; + } } else { /* Just open the existing map */ - mmaploc = mmap (0, mapsize, mprot, mflags, fd, 0); - if ((int)mmaploc == -1) + mmaploc = pace_mmap (0, mapsize, mprot, mflags, fd, 0); + if (mmaploc == MAP_FAILED) { return (pace_mqd_t)-1; } + pace_close (fd); /* ???? Test here for race */ if (pace_pthread_mutex_lock (&(((mqfile*)mmaploc)->mutex)) == -1) { + pace_munmap (mmaploc, mapsize); return (pace_mqd_t)-1; } ((mqfile*)mmaploc)->attr.mq_flags = attr->mq_flags; @@ -163,6 +248,7 @@ pace_mqd_t mq_open (const char* name, int oflag, pace_mode_t mode, pace_mq_attr* if (pace_pthread_mutex_unlock (&(((mqfile*)mmaploc)->mutex)) == -1) { + pace_munmap (mmaploc, mapsize); return (pace_mqd_t)-1; } @@ -200,7 +286,10 @@ int mq_unlink (const char* name) return pace_unlink (name); } -int mq_send (pace_mqd_t mqdes, const char* ptr, pace_size_t length, unsigned int priority) +int mq_send (pace_mqd_t mqdes, + const char* ptr, + pace_size_t length, + unsigned int priority) { mqfile* queue = ((mqfile*)mqdes->mptr); long index, old_index; @@ -210,13 +299,16 @@ int mq_send (pace_mqd_t mqdes, const char* ptr, pace_size_t length, unsigned int errno = EBADF; return -1; } - if (queue->attr.mq_msgsize < length) + if (queue->attr.mq_msgsize < (int) length) { /* Message too long */ errno = EMSGSIZE; return -1; } - + if ((errno = pace_pthread_mutex_lock (&queue->mutex)) != 0) + { + return -1; + } /* If the queue is full... */ if (queue->attr.mq_curmsgs >= queue->attr.mq_maxmsg) { @@ -225,65 +317,70 @@ int mq_send (pace_mqd_t mqdes, const char* ptr, pace_size_t length, unsigned int errno = EAGAIN; return -1; } - else + while (queue->attr.mq_maxmsg <= queue->attr.mq_curmsgs) { - /* ???? */ + pace_pthread_cond_wait (&queue->cond, &queue->mutex); } } + + /* Fill in the fields of the header */ + ((message_header*)(&mqdes->mptr[queue->freelist]))->priority = priority; + ((message_header*)(&mqdes->mptr[queue->freelist]))->length = length; + pace_memcpy (((void*)(&mqdes->mptr[queue->freelist + sizeof (message_header)])), + ptr, length); + + /* Update the linked list */ + old_index = 0; + index = queue->head; + while (index != 0 && ((message_header*)(&mqdes->mptr[index]))->priority >= priority) + { + old_index = index; + index = ((message_header*)(&mqdes->mptr[index]))->next; + } + + /* If the msg goes at the head */ + if (old_index == 0) + { + queue->head = queue->freelist; + queue->freelist = ((message_header*)(&mqdes->mptr[queue->freelist]))->next; + ((message_header*)(&mqdes->mptr[queue->head]))->next = index; + } else { - if (pace_pthread_mutex_lock (&queue->mutex) == -1) - { - return -1; - } + ((message_header*)(&mqdes->mptr[old_index]))->next = queue->freelist; + old_index = queue->freelist; + queue->freelist = ((message_header*)(&mqdes->mptr[queue->freelist]))->next; + ((message_header*)(&mqdes->mptr[old_index]))->next = index; + } - queue->attr.mq_curmsgs++; - /* Fill in the fields of the header */ - ((message_header*)(&mqdes->mptr[queue->freelist]))->priority = priority; - ((message_header*)(&mqdes->mptr[queue->freelist]))->length = length; - pace_memcpy (((void*)(&mqdes->mptr[queue->freelist + sizeof (message_header)])), - ptr, length); - - /* Update the linked list */ - old_index = 0; - index = queue->head; - while (index != 0 && ((message_header*)(&mqdes->mptr[index]))->priority >= priority) - { - old_index = index; - index = ((message_header*)(&mqdes->mptr[index]))->next; - } - /* If the msg goes at the head */ - if (old_index == 0) - { - queue->head = queue->freelist; - queue->freelist = ((message_header*)(&mqdes->mptr[queue->freelist]))->next; - ((message_header*)(&mqdes->mptr[queue->head]))->next = index; - } - else - { - ((message_header*)(&mqdes->mptr[old_index]))->next = queue->freelist; - old_index = queue->freelist; - queue->freelist = ((message_header*)(&mqdes->mptr[queue->freelist]))->next; - ((message_header*)(&mqdes->mptr[old_index]))->next = index; - } - if (pace_pthread_mutex_unlock (&queue->mutex) == -1) + if (queue->attr.mq_curmsgs == 0) + { + /* Let other waiting threads know there is food on the table */ + if ((errno = pace_pthread_cond_signal (&((mqfile*)mqdes->mptr)->cond)) != 0) { return -1; } } + queue->attr.mq_curmsgs++; + + if ((errno = pace_pthread_mutex_unlock (&queue->mutex)) != 0) + { + return -1; + } + return 0; } pace_ssize_t mq_receive (pace_mqd_t mqdes, - char * msg_ptr, - pace_size_t msg_len, - unsigned int * nmsg_prio) + char * msg_ptr, + pace_size_t msg_len, + unsigned int * nmsg_prio) { mqfile* queue = ((mqfile*)mqdes->mptr); pace_size_t temp; - if (queue->attr.mq_msgsize > msg_len) + if (queue->attr.mq_msgsize > (long) msg_len) { errno = EMSGSIZE; return -1; @@ -336,7 +433,9 @@ int mq_getattr (pace_mqd_t mqdes, pace_mq_attr * mqstat) return 0; } -int mq_setattr(pace_mqd_t mqdes, const pace_mq_attr * mqstat, pace_mq_attr * omqstat) +int mq_setattr(pace_mqd_t mqdes, + const pace_mq_attr * mqstat, + pace_mq_attr * omqstat) { if (omqstat != 0) { @@ -374,9 +473,9 @@ void print_queue (pace_mqd_t mqd) i++; index = ((message_header*)(&mqd->mptr[index]))->next; } - printf ("There are %i total blacks of size %i.\n", queue->attr.mq_maxmsg, queue->attr.mq_msgsize); + printf ("There are %li total blacks of size %li.\n", queue->attr.mq_maxmsg, queue->attr.mq_msgsize); printf ("There are %i free blocks left.\n", i); - printf ("There are %i messages on the queue.\n", queue->attr.mq_curmsgs); + printf ("There are %li messages on the queue.\n", queue->attr.mq_curmsgs); i=0; index = queue->head; @@ -391,62 +490,4 @@ void print_queue (pace_mqd_t mqd) printf ("\n"); } -int main (int argc, char** argv) -{ - int flags; - pace_mqd_t mqd; - pace_mq_attr attr; - char* string; - char* s2; - flags = O_RDWR | O_CREAT | O_NONBLOCK; - attr.mq_flags |= O_NONBLOCK; - attr.mq_msgsize = 51; - attr.mq_maxmsg = 50; - - mqd = mq_open ("hello", flags, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH, &attr); - - PACE_UNUSED_ARG (argc); - PACE_UNUSED_ARG (argv); - - string = "Message\n"; - s2 = malloc (1600); - - print_queue (mqd); - - printf ("Sending message 1\n"); - if (mq_send (mqd, string, 9, 11) == -1) - { - perror ("send"); - return -1; - } - print_queue (mqd); - - printf ("Sending message 2\n"); - string = "Message2\n"; - if (mq_send (mqd, string, 10, 10) == -1) - { - perror ("send"); - return -1; - } -print_queue (mqd); - - printf ("Getting message one\n"); - flags = mq_receive (mqd, s2, 1600, 0); - if (flags == -1) - { - perror ("receiev:"); - exit (0); - } -print_queue (mqd); - printf (s2); - flags = mq_receive (mqd, s2, 1600, 0); - if (flags == -1) - { - perror ("receiev:"); - exit (0); - } -print_queue (mqd); - printf (s2); - - return mq_close (mqd); -} +#endif /* PACE_LINUX */ |