summaryrefslogtreecommitdiff
path: root/PACE/pace/emulation/mqueue.c
diff options
context:
space:
mode:
Diffstat (limited to 'PACE/pace/emulation/mqueue.c')
-rw-r--r--PACE/pace/emulation/mqueue.c285
1 files changed, 163 insertions, 122 deletions
diff --git a/PACE/pace/emulation/mqueue.c b/PACE/pace/emulation/mqueue.c
index c1c7440a5fc..e30e927e60c 100644
--- a/PACE/pace/emulation/mqueue.c
+++ b/PACE/pace/emulation/mqueue.c
@@ -1,4 +1,19 @@
-/* $Id$ */
+/* $Id$
+ * ============================================================================
+ *
+ * = LIBRARY
+ * pace
+ *
+ * = FILENAME
+ * pace/emulation/mqueue.c
+ *
+ * = AUTHOR
+ * John Heitmann
+ *
+ * ============================================================================ */
+
+#if PACE_LINUX
+
#include "pace/sys/mman.h"
#include "pace/stdio.h"
#include "pace/fcntl.h"
@@ -6,15 +21,15 @@
#include "pace/stdlib.h"
#include "pace/sys/types.h"
#include "pace/pthread.h"
+#include "pace/sys/stat.h"
#include "pace/emulation/mqueue.h"
-typedef struct mqd* pace_mqd_t;
-
typedef struct
{
pace_mq_attr attr;
pace_size_t num_open;
pace_pthread_mutex_t mutex;
+ pace_pthread_cond_t cond;
pace_size_t head;
pace_size_t freelist;
} mqfile;
@@ -28,26 +43,43 @@ typedef struct
struct mq_attr attrdefault = { 0, 32, 256, 0 };
-pace_mqd_t mq_open (const char* name, int oflag, pace_mode_t mode, pace_mq_attr* attr)
+/* This remains mq_open due to the macro in pace/mqueue.h */
+pace_mqd_t mq_open (const char* name,
+ int oflag,
+ pace_mode_t mode,
+ pace_mq_attr* attr)
{
int m_padding = sizeof (message_header); /* How much extra space per message do we need */
int f_padding = sizeof (mqfile); /* How much fixed padding is needed */
int mflags, mprot;
int fd;
- pace_size_t i;
+ int i;
pace_size_t mapsize;
char* mmaploc;
+ char* new_name;
int create_mmap = 0; /* 1 if the file has never be inited */
- pace_pthread_mutexattr_t mattr;
message_header* temp = 0; /*Used in initializaiton of mqueue*/
long index; /* index into the file */
pace_mqd_t result = pace_malloc (sizeof (struct mqd));
+ pace_stat_s statbuf;
+retry:
if (attr == 0)
{
attr = &attrdefault;
}
+ else
+ {
+ if (attr->mq_maxmsg < 0 || attr->mq_msgsize < 0)
+ {
+ errno = EBADF;
+ return (pace_mqd_t)-1;
+ }
+ }
+ /* Create a name that will go to /tmp with a unique name */
+ new_name = malloc (256);
+ snprintf (new_name, 256, "/tmp/mq013028%s", name);
/* Fix alignment */
if (attr->mq_msgsize % sizeof (long) != 0)
{
@@ -56,7 +88,7 @@ pace_mqd_t mq_open (const char* name, int oflag, pace_mode_t mode, pace_mq_attr*
if (oflag & PACE_O_CREAT)
{
/* We need to protect access without the help of O_RDONLY in the fs */
- fd = pace_open ((name, PACE_O_RDWR | PACE_O_CREAT | PACE_O_EXCL, mode));
+ fd = pace_open ((new_name, PACE_O_RDWR | PACE_O_CREAT | PACE_O_EXCL, mode & ~S_IXUSR));
if (fd == -1 && errno != EEXIST)
{
@@ -77,19 +109,49 @@ pace_mqd_t mq_open (const char* name, int oflag, pace_mode_t mode, pace_mq_attr*
else
{
/* We want the existing file */
- fd = pace_open ((name, PACE_O_RDWR));
+ fd = pace_open ((new_name, PACE_O_RDWR));
+ if (fd == -1 && errno == ENOENT)
+ {
+ /* Something odd is going on */
+ goto retry;
+ }
+ else if (fd == -1)
+ {
+ return (pace_mqd_t)-1;
+ }
}
}
else
{
- fd = pace_open ((name, PACE_O_RDWR));
+ fd = pace_open ((new_name, PACE_O_RDWR));
if (fd == -1)
{
- errno = ENOENT;
return (pace_mqd_t)-1;
}
}
+ /*
+ The following loop makes shure that we haven't entered a race condition. If a file
+ has been created but not initialized, its IXUSR will not be set (see above).
+ */
+ while (create_mmap == 0)
+ {
+ if (stat (new_name, &statbuf) == -1)
+ {
+ close (fd);
+ if (errno == ENOENT && (oflag & O_CREAT))
+ {
+ goto retry;
+ }
+ return (pace_mqd_t)-1;
+ }
+ else if ((statbuf.st_mode & S_IXUSR) == 0)
+ {
+ break;
+ }
+ pace_sleep (1);
+ }
+
mapsize = f_padding + (attr->mq_msgsize + m_padding) * (attr->mq_maxmsg);
mprot = PACE_PROT_READ | PACE_PROT_WRITE;
mflags = PACE_MAP_SHARED;
@@ -99,30 +161,43 @@ pace_mqd_t mq_open (const char* name, int oflag, pace_mode_t mode, pace_mq_attr*
/* Create and 0 out the file */
if (pace_lseek (fd, mapsize, PACE_SEEK_SET) == -1)
{
+ pace_unlink (new_name);
return (pace_mqd_t)-1;
}
if (pace_write (fd, "", 1) != 1)
{
+ pace_unlink (new_name);
return (pace_mqd_t)-1;
}
- mmaploc = mmap (0, mapsize, mprot, mflags, fd, 0);
- if ((int)mmaploc == -1)
+ mmaploc = pace_mmap (0, mapsize, mprot, mflags, fd, 0);
+
+ if (mmaploc == MAP_FAILED)
{
+ pace_unlink (new_name);
return (pace_mqd_t)-1;
}
+
+ pace_close (fd);
+
pace_memset (mmaploc, 0, mapsize);
- if (pace_pthread_mutexattr_init (&mattr) == -1)
+ if ((errno = pace_pthread_mutex_init (&(((mqfile*)mmaploc)->mutex), 0)) != 0)
{
+ pace_unlink (new_name);
+ pace_munmap (mmaploc, mapsize);
return (pace_mqd_t)-1;
}
- if (pace_pthread_mutex_init (&(((mqfile*)mmaploc)->mutex), &mattr) == -1)
+ if ((errno = pace_pthread_mutex_lock (&(((mqfile*)mmaploc)->mutex))) != 0)
{
+ pace_unlink (new_name);
+ pace_munmap (mmaploc, mapsize);
return (pace_mqd_t)-1;
}
- if (pace_pthread_mutex_lock (&(((mqfile*)mmaploc)->mutex)) == -1)
+ if ((errno = pace_pthread_cond_init (&(((mqfile*)mmaploc)->cond), 0)) != 0)
{
+ pace_unlink (new_name);
+ pace_munmap (mmaploc, mapsize);
return (pace_mqd_t)-1;
}
@@ -139,20 +214,30 @@ pace_mqd_t mq_open (const char* name, int oflag, pace_mode_t mode, pace_mq_attr*
temp->next = 0;
attr->mq_curmsgs = 0;
((mqfile*)mmaploc)->attr = *attr;
+
+ /* Set S_IXUSR so that the file is known to be inited */
+ if (pace_fchmod (fd, mode | S_IXUSR) == -1)
+ {
+ pace_unlink (new_name);
+ pace_munmap (mmaploc, mapsize);
+ return (pace_mqd_t)-1;
+ }
}
else
{
/* Just open the existing map */
- mmaploc = mmap (0, mapsize, mprot, mflags, fd, 0);
- if ((int)mmaploc == -1)
+ mmaploc = pace_mmap (0, mapsize, mprot, mflags, fd, 0);
+ if (mmaploc == MAP_FAILED)
{
return (pace_mqd_t)-1;
}
+ pace_close (fd);
/* ???? Test here for race */
if (pace_pthread_mutex_lock (&(((mqfile*)mmaploc)->mutex)) == -1)
{
+ pace_munmap (mmaploc, mapsize);
return (pace_mqd_t)-1;
}
((mqfile*)mmaploc)->attr.mq_flags = attr->mq_flags;
@@ -163,6 +248,7 @@ pace_mqd_t mq_open (const char* name, int oflag, pace_mode_t mode, pace_mq_attr*
if (pace_pthread_mutex_unlock (&(((mqfile*)mmaploc)->mutex)) == -1)
{
+ pace_munmap (mmaploc, mapsize);
return (pace_mqd_t)-1;
}
@@ -200,7 +286,10 @@ int mq_unlink (const char* name)
return pace_unlink (name);
}
-int mq_send (pace_mqd_t mqdes, const char* ptr, pace_size_t length, unsigned int priority)
+int mq_send (pace_mqd_t mqdes,
+ const char* ptr,
+ pace_size_t length,
+ unsigned int priority)
{
mqfile* queue = ((mqfile*)mqdes->mptr);
long index, old_index;
@@ -210,13 +299,16 @@ int mq_send (pace_mqd_t mqdes, const char* ptr, pace_size_t length, unsigned int
errno = EBADF;
return -1;
}
- if (queue->attr.mq_msgsize < length)
+ if (queue->attr.mq_msgsize < (int) length)
{
/* Message too long */
errno = EMSGSIZE;
return -1;
}
-
+ if ((errno = pace_pthread_mutex_lock (&queue->mutex)) != 0)
+ {
+ return -1;
+ }
/* If the queue is full... */
if (queue->attr.mq_curmsgs >= queue->attr.mq_maxmsg)
{
@@ -225,65 +317,70 @@ int mq_send (pace_mqd_t mqdes, const char* ptr, pace_size_t length, unsigned int
errno = EAGAIN;
return -1;
}
- else
+ while (queue->attr.mq_maxmsg <= queue->attr.mq_curmsgs)
{
- /* ???? */
+ pace_pthread_cond_wait (&queue->cond, &queue->mutex);
}
}
+
+ /* Fill in the fields of the header */
+ ((message_header*)(&mqdes->mptr[queue->freelist]))->priority = priority;
+ ((message_header*)(&mqdes->mptr[queue->freelist]))->length = length;
+ pace_memcpy (((void*)(&mqdes->mptr[queue->freelist + sizeof (message_header)])),
+ ptr, length);
+
+ /* Update the linked list */
+ old_index = 0;
+ index = queue->head;
+ while (index != 0 && ((message_header*)(&mqdes->mptr[index]))->priority >= priority)
+ {
+ old_index = index;
+ index = ((message_header*)(&mqdes->mptr[index]))->next;
+ }
+
+ /* If the msg goes at the head */
+ if (old_index == 0)
+ {
+ queue->head = queue->freelist;
+ queue->freelist = ((message_header*)(&mqdes->mptr[queue->freelist]))->next;
+ ((message_header*)(&mqdes->mptr[queue->head]))->next = index;
+ }
else
{
- if (pace_pthread_mutex_lock (&queue->mutex) == -1)
- {
- return -1;
- }
+ ((message_header*)(&mqdes->mptr[old_index]))->next = queue->freelist;
+ old_index = queue->freelist;
+ queue->freelist = ((message_header*)(&mqdes->mptr[queue->freelist]))->next;
+ ((message_header*)(&mqdes->mptr[old_index]))->next = index;
+ }
- queue->attr.mq_curmsgs++;
- /* Fill in the fields of the header */
- ((message_header*)(&mqdes->mptr[queue->freelist]))->priority = priority;
- ((message_header*)(&mqdes->mptr[queue->freelist]))->length = length;
- pace_memcpy (((void*)(&mqdes->mptr[queue->freelist + sizeof (message_header)])),
- ptr, length);
-
- /* Update the linked list */
- old_index = 0;
- index = queue->head;
- while (index != 0 && ((message_header*)(&mqdes->mptr[index]))->priority >= priority)
- {
- old_index = index;
- index = ((message_header*)(&mqdes->mptr[index]))->next;
- }
- /* If the msg goes at the head */
- if (old_index == 0)
- {
- queue->head = queue->freelist;
- queue->freelist = ((message_header*)(&mqdes->mptr[queue->freelist]))->next;
- ((message_header*)(&mqdes->mptr[queue->head]))->next = index;
- }
- else
- {
- ((message_header*)(&mqdes->mptr[old_index]))->next = queue->freelist;
- old_index = queue->freelist;
- queue->freelist = ((message_header*)(&mqdes->mptr[queue->freelist]))->next;
- ((message_header*)(&mqdes->mptr[old_index]))->next = index;
- }
- if (pace_pthread_mutex_unlock (&queue->mutex) == -1)
+ if (queue->attr.mq_curmsgs == 0)
+ {
+ /* Let other waiting threads know there is food on the table */
+ if ((errno = pace_pthread_cond_signal (&((mqfile*)mqdes->mptr)->cond)) != 0)
{
return -1;
}
}
+ queue->attr.mq_curmsgs++;
+
+ if ((errno = pace_pthread_mutex_unlock (&queue->mutex)) != 0)
+ {
+ return -1;
+ }
+
return 0;
}
pace_ssize_t mq_receive (pace_mqd_t mqdes,
- char * msg_ptr,
- pace_size_t msg_len,
- unsigned int * nmsg_prio)
+ char * msg_ptr,
+ pace_size_t msg_len,
+ unsigned int * nmsg_prio)
{
mqfile* queue = ((mqfile*)mqdes->mptr);
pace_size_t temp;
- if (queue->attr.mq_msgsize > msg_len)
+ if (queue->attr.mq_msgsize > (long) msg_len)
{
errno = EMSGSIZE;
return -1;
@@ -336,7 +433,9 @@ int mq_getattr (pace_mqd_t mqdes, pace_mq_attr * mqstat)
return 0;
}
-int mq_setattr(pace_mqd_t mqdes, const pace_mq_attr * mqstat, pace_mq_attr * omqstat)
+int mq_setattr(pace_mqd_t mqdes,
+ const pace_mq_attr * mqstat,
+ pace_mq_attr * omqstat)
{
if (omqstat != 0)
{
@@ -374,9 +473,9 @@ void print_queue (pace_mqd_t mqd)
i++;
index = ((message_header*)(&mqd->mptr[index]))->next;
}
- printf ("There are %i total blacks of size %i.\n", queue->attr.mq_maxmsg, queue->attr.mq_msgsize);
+ printf ("There are %li total blacks of size %li.\n", queue->attr.mq_maxmsg, queue->attr.mq_msgsize);
printf ("There are %i free blocks left.\n", i);
- printf ("There are %i messages on the queue.\n", queue->attr.mq_curmsgs);
+ printf ("There are %li messages on the queue.\n", queue->attr.mq_curmsgs);
i=0;
index = queue->head;
@@ -391,62 +490,4 @@ void print_queue (pace_mqd_t mqd)
printf ("\n");
}
-int main (int argc, char** argv)
-{
- int flags;
- pace_mqd_t mqd;
- pace_mq_attr attr;
- char* string;
- char* s2;
- flags = O_RDWR | O_CREAT | O_NONBLOCK;
- attr.mq_flags |= O_NONBLOCK;
- attr.mq_msgsize = 51;
- attr.mq_maxmsg = 50;
-
- mqd = mq_open ("hello", flags, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH, &attr);
-
- PACE_UNUSED_ARG (argc);
- PACE_UNUSED_ARG (argv);
-
- string = "Message\n";
- s2 = malloc (1600);
-
- print_queue (mqd);
-
- printf ("Sending message 1\n");
- if (mq_send (mqd, string, 9, 11) == -1)
- {
- perror ("send");
- return -1;
- }
- print_queue (mqd);
-
- printf ("Sending message 2\n");
- string = "Message2\n";
- if (mq_send (mqd, string, 10, 10) == -1)
- {
- perror ("send");
- return -1;
- }
-print_queue (mqd);
-
- printf ("Getting message one\n");
- flags = mq_receive (mqd, s2, 1600, 0);
- if (flags == -1)
- {
- perror ("receiev:");
- exit (0);
- }
-print_queue (mqd);
- printf (s2);
- flags = mq_receive (mqd, s2, 1600, 0);
- if (flags == -1)
- {
- perror ("receiev:");
- exit (0);
- }
-print_queue (mqd);
- printf (s2);
-
- return mq_close (mqd);
-}
+#endif /* PACE_LINUX */