summaryrefslogtreecommitdiff
path: root/PACE/pace/emulation/mqueue.c
diff options
context:
space:
mode:
Diffstat (limited to 'PACE/pace/emulation/mqueue.c')
-rw-r--r--PACE/pace/emulation/mqueue.c571
1 files changed, 0 insertions, 571 deletions
diff --git a/PACE/pace/emulation/mqueue.c b/PACE/pace/emulation/mqueue.c
deleted file mode 100644
index 36b74616f9e..00000000000
--- a/PACE/pace/emulation/mqueue.c
+++ /dev/null
@@ -1,571 +0,0 @@
-/* $Id$ -*- C -*-
- * ============================================================================
- *
- * = LIBRARY
- * pace
- *
- * = FILENAME
- * pace/emulation/mqueue.c
- *
- * = AUTHOR
- * John Heitmann
- *
- * ============================================================================ */
-
-#include "pace/sys/mman.h"
-#include "pace/stdio.h"
-#include "pace/fcntl.h"
-#include "pace/string.h"
-#include "pace/stdlib.h"
-#include "pace/sys/types.h"
-#include "pace/pthread.h"
-#include "pace/sys/stat.h"
-#include "pace/emulation/mqueue.h"
-
-typedef struct
-{
- pace_mq_attr attr;
- pace_size_t num_open; /* How many processes have a valid mqd_t to here */
- pace_size_t rec_wait; /* How many processes are blocked on mq_receive */
- pace_pid_t not_pid; /* Who is actually registered for notification */
- pace_sigevent notification;
- pace_pthread_mutex_t mutex;
- pace_pthread_cond_t cond;
- pace_size_t head;
- pace_size_t freelist;
-} mqfile;
-
-typedef struct
-{
- pace_size_t next; /* Index of next element */
- unsigned int priority;
- pace_size_t length;
-} message_header;
-
-static struct mq_attr pace_attrdefault = { 0, 32, 256, 0 };
-#define PACE_MQ_LOCKPOSTFIX "mqlock9587"
-#define PACE_MQ_DATAPOSTFIX "mqdata2355"
-
-/* This remains mq_open due to the macro in pace/mqueue.h */
-#if (PACE_HAS_POSIX_NONUOF_FUNCS)
-pace_mqd_t mq_open (const char* name,
- int oflag,
- pace_mode_t mode,
- pace_mq_attr* attr)
-{
- int m_padding = sizeof (message_header); /* How much extra space per message do we need */
- int f_padding = sizeof (mqfile); /* How much fixed padding is needed */
- int mflags, mprot;
- int fd;
- int i;
- pace_size_t mapsize;
- char* mmaploc;
- char* new_name;
- char* lock_name;
- int create_mmap = 0; /* 1 if the file has never be inited */
- message_header* temp = 0; /*Used in initialization of mqueue*/
- long index; /* index into the file */
- pace_mqd_t result = pace_malloc (sizeof (struct mqd));
- pace_stat_s statbuf;
-
-retry:
- if (attr == 0)
- {
- attr = &pace_attrdefault;
- }
- else
- {
- if (attr->mq_maxmsg < 0 || attr->mq_msgsize < 0)
- {
- errno = EBADF;
- return (pace_mqd_t)-1;
- }
- }
-
- /* Create a name that will go to /tmp with a unique name */
- new_name = malloc (256);
- lock_name = malloc (256);
- snprintf (new_name, 256, "/tmp%s%s", name, PACE_MQ_DATAPOSTFIX);
- snprintf (lock_name, 256, "/tmp%s%s", name, PACE_MQ_LOCKPOSTFIX);
-
- /* Fix alignment */
- if (attr->mq_msgsize % sizeof (long) != 0)
- {
- attr->mq_msgsize += 8 - (attr->mq_msgsize % sizeof (long));
- }
-
- if (oflag & PACE_O_CREAT)
- {
- /* We need to protect access without the help of O_RDONLY in the fs */
- fd = pace_open (new_name, PACE_O_RDWR | PACE_O_CREAT | PACE_O_EXCL, mode);
-
- if (fd == -1 && errno != EEXIST)
- {
- /* An error other than EEXIST has occurred. */
- return (pace_mqd_t)-1;
- }
- else if (fd != -1)
- {
- /* If a new file was created successfully */
- create_mmap = 1;
- }
- else if (oflag & PACE_O_EXCL)
- {
- /* If the file exists and we don't want it */
- errno = EEXIST;
- return (pace_mqd_t)-1;
- }
- else
- {
- /* We want the existing file */
- fd = pace_open (new_name, PACE_O_RDWR);
- if (fd == -1 && errno == ENOENT)
- {
- /* Something odd is going on */
- goto retry;
- }
- else if (fd == -1)
- {
- return (pace_mqd_t)-1;
- }
- }
- }
- else
- {
- fd = pace_open (new_name, PACE_O_RDWR);
- if (fd == -1)
- {
- return (pace_mqd_t)-1;
- }
- }
-
- /*
- The following loop makes shure that we haven't entered a race condition. If a file
- has been created but not initialized, its IXUSR will not be set (see above).
- */
- while (create_mmap == 0)
- {
- if (stat (lock_name, &statbuf) == -1)
- {
- close (fd);
- if (errno == ENOENT && (oflag & O_CREAT))
- {
- goto retry;
- }
- return (pace_mqd_t)-1;
- }
- else
- {
- break;
- }
- pace_sleep (1);
- }
-
- mapsize = f_padding + (attr->mq_msgsize + m_padding) * (attr->mq_maxmsg);
- mprot = PACE_PROT_READ | PACE_PROT_WRITE;
- mflags = PACE_MAP_SHARED;
-
- if (create_mmap)
- {
- /* Create and 0 out the file */
- if (pace_lseek (fd, mapsize, PACE_SEEK_SET) == -1)
- {
- pace_unlink (new_name);
- return (pace_mqd_t)-1;
- }
- if (pace_write (fd, "", 1) != 1)
- {
- pace_unlink (new_name);
- return (pace_mqd_t)-1;
- }
-
- mmaploc = pace_mmap (0, mapsize, mprot, mflags, fd, 0);
- pace_close (fd);
- if (mmaploc == MAP_FAILED)
- {
- pace_unlink (new_name);
- return (pace_mqd_t)-1;
- }
-
- pace_memset (mmaploc, 0, mapsize);
-
- if ((errno = pace_pthread_mutex_init (&(((mqfile*)mmaploc)->mutex), 0)) != 0)
- {
- pace_unlink (new_name);
- pace_munmap (mmaploc, mapsize);
- return (pace_mqd_t)-1;
- }
- if ((errno = pace_pthread_mutex_lock (&(((mqfile*)mmaploc)->mutex))) != 0)
- {
- pace_unlink (new_name);
- pace_munmap (mmaploc, mapsize);
- return (pace_mqd_t)-1;
- }
-
- if ((errno = pace_pthread_cond_init (&(((mqfile*)mmaploc)->cond), 0)) != 0)
- {
- pace_unlink (new_name);
- pace_munmap (mmaploc, mapsize);
- return (pace_mqd_t)-1;
- }
-
- index = sizeof (mqfile);
- ((mqfile*)mmaploc)->freelist = index;
- ((mqfile*)mmaploc)->head = 0;
-
- for (i = 0; i < attr->mq_maxmsg; ++i)
- {
- temp = (message_header *) &mmaploc[index];
- index += sizeof (message_header) + attr->mq_msgsize;
- temp->next = index;
- }
- temp->next = 0;
- attr->mq_curmsgs = 0;
- ((mqfile*)mmaploc)->attr = *attr;
-
- /* Create the lock file so that the file is known to be inited */
- if (pace_open (lock_name, O_CREAT | O_EXCL) == -1)
- {
- pace_unlink (new_name);
- pace_munmap (mmaploc, mapsize);
- return (pace_mqd_t)-1;
- }
-
- }
- else
- {
- /* Just open the existing map */
- mmaploc = pace_mmap (0, mapsize, mprot, mflags, fd, 0);
- if (mmaploc == MAP_FAILED)
- {
- return (pace_mqd_t)-1;
- }
- pace_close (fd);
-
- /* ???? Test here for race */
-
- if (pace_pthread_mutex_lock (&(((mqfile*)mmaploc)->mutex)) == -1)
- {
- pace_munmap (mmaploc, mapsize);
- return (pace_mqd_t)-1;
- }
- ((mqfile*)mmaploc)->attr.mq_flags = attr->mq_flags;
- }
-
- ((mqfile*)mmaploc)->num_open++;
-
-
- if (pace_pthread_mutex_unlock (&(((mqfile*)mmaploc)->mutex)) == -1)
- {
- pace_munmap (mmaploc, mapsize);
- return (pace_mqd_t)-1;
- }
-
- result->mptr = mmaploc;
- result->length = mapsize;
- result->oflag = oflag;
-
- return result;
-}
-#endif /* PACE_HAS_POSIX_NONUOF_FUNCS */
-
-#if (PACE_HAS_POSIX_NONUOF_FUNCS)
-int mq_close (pace_mqd_t mqdes)
-{
- if (pace_pthread_mutex_lock (&( ((mqfile*)mqdes->mptr)->mutex)) == -1)
- {
- errno = EBADF;
- return -1;
- }
- ((mqfile*)mqdes->mptr)->num_open--;
- if (pace_pthread_mutex_unlock (&(((mqfile*)mqdes->mptr)->mutex)) == -1)
- {
- errno = EBADF;
- return -1;
- }
- if (munmap (mqdes->mptr, mqdes->length) == -1)
- {
- return -1;
- }
- free (mqdes);
- return 0;
-}
-#endif /* PACE_HAS_POSIX_NONUOF_FUNCS */
-
-#if (PACE_HAS_POSIX_NONUOF_FUNCS)
-int mq_unlink (const char* name)
-{
- int result1, result2;
- char* new_name;
- new_name = malloc (256);
- snprintf (new_name, 256, "/tmp%s%s", name, PACE_MQ_DATAPOSTFIX);
- result1 = pace_unlink (new_name);
- snprintf (new_name, 256, "/tmp%s%s", name, PACE_MQ_LOCKPOSTFIX);
- result2 = pace_unlink (new_name);
- free (new_name);
- return (result1 == -1 || result2 == -1 ? -1 : 0);
-}
-#endif /* PACE_HAS_POSIX_NONUOF_FUNCS */
-
-#if (PACE_HAS_POSIX_NONUOF_FUNCS)
-int mq_send (pace_mqd_t mqdes,
- const char* ptr,
- pace_size_t length,
- unsigned int priority)
-{
- mqfile* queue = ((mqfile*)mqdes->mptr);
- long index, old_index;
- if (mqdes->oflag & O_RDONLY)
- {
- /* Incorrect access priviledges */
- errno = EBADF;
- return -1;
- }
- if (queue->attr.mq_msgsize < (int) length)
- {
- /* Message too long */
- errno = EMSGSIZE;
- return -1;
- }
- if ((errno = pace_pthread_mutex_lock (&queue->mutex)) != 0)
- {
- return -1;
- }
- /* If the queue is full... */
- if (queue->attr.mq_curmsgs >= queue->attr.mq_maxmsg)
- {
- if (queue->attr.mq_flags & O_NONBLOCK)
- {
- errno = EAGAIN;
- return -1;
- }
- while (queue->attr.mq_maxmsg <= queue->attr.mq_curmsgs)
- {
- pace_pthread_cond_wait (&queue->cond, &queue->mutex);
- pace_printf ("Send Woke Up\n");
- }
- }
-
- /* Fill in the fields of the header */
- ((message_header*)(&mqdes->mptr[queue->freelist]))->priority = priority;
- ((message_header*)(&mqdes->mptr[queue->freelist]))->length = length;
- pace_memcpy (((void*)(&mqdes->mptr[queue->freelist + sizeof (message_header)])),
- ptr, length);
-
- /* Update the linked list */
- old_index = 0;
- index = queue->head;
- while (index != 0 && ((message_header*)(&mqdes->mptr[index]))->priority >= priority)
- {
- old_index = index;
- index = ((message_header*)(&mqdes->mptr[index]))->next;
- }
-
- /* If the msg goes at the head */
- if (old_index == 0)
- {
- queue->head = queue->freelist;
- queue->freelist = ((message_header*)(&mqdes->mptr[queue->freelist]))->next;
- ((message_header*)(&mqdes->mptr[queue->head]))->next = index;
- }
- else
- {
- ((message_header*)(&mqdes->mptr[old_index]))->next = queue->freelist;
- old_index = queue->freelist;
- queue->freelist = ((message_header*)(&mqdes->mptr[queue->freelist]))->next;
- ((message_header*)(&mqdes->mptr[old_index]))->next = index;
- }
-
- queue->attr.mq_curmsgs++;
-
- if ((errno = pace_pthread_mutex_unlock (&queue->mutex)) != 0)
- {
- return -1;
- }
-
- if (queue->attr.mq_curmsgs == 1)
- {
- /* If there is no one waiting and blocked */
- if (queue->not_pid != 0 && queue->rec_wait == 0)
- {
- if (queue->notification.sigev_notify == SIGEV_SIGNAL)
- {
- sigqueue (queue->not_pid,
- queue->notification.sigev_signo,
- queue->notification.sigev_value);
- }
- queue->not_pid = 0;
- }
- else
- {
- pace_printf ("Send is Signalling\n");
- /* Let other waiting threads know there is food on the table */
- if ((errno = pace_pthread_cond_signal (&((mqfile*)mqdes->mptr)->cond)) != 0)
- {
- return -1;
- }
- }
- }
- return 0;
-}
-#endif /* PACE_HAS_POSIX_NONUOF_FUNCS */
-
-#if (PACE_HAS_POSIX_NONUOF_FUNCS)
-pace_ssize_t mq_receive (pace_mqd_t mqdes,
- char * msg_ptr,
- pace_size_t msg_len,
- unsigned int * nmsg_prio)
-{
- mqfile* queue = ((mqfile*)mqdes->mptr);
- pace_size_t temp;
-
- if (queue->attr.mq_msgsize > (long) msg_len)
- {
- errno = EMSGSIZE;
- return -1;
- }
-
- if ((errno = pace_pthread_mutex_lock (&queue->mutex)) != 0)
- {
- return -1;
- }
-
- /* If the queue is empty... */
- if (queue->attr.mq_curmsgs <= 0)
- {
- if (queue->attr.mq_flags & O_NONBLOCK)
- {
- errno = EAGAIN;
- return -1;
- }
- while (queue->attr.mq_curmsgs <= 0)
- {
- pace_printf ("Recv is going to sleep\n");
- queue->rec_wait++;
- pace_pthread_cond_wait (&(queue->cond), &(queue->mutex));
- queue->rec_wait--;
- pace_printf ("Recv is waking from sleep\n");
- }
- }
-
- if (nmsg_prio != 0)
- {
- *nmsg_prio = ((message_header*)(&mqdes->mptr[queue->head]))->priority;
- }
-
- pace_memcpy (msg_ptr, ((void*)(&mqdes->mptr[queue->head + sizeof (message_header)])),
- ((message_header*)(&mqdes->mptr[queue->head]))->length);
- temp = queue->head;
- queue->head = ((message_header*)(&mqdes->mptr[queue->head]))->next;
- ((message_header*)(&mqdes->mptr[temp]))->next = queue->freelist;
- queue->freelist = temp;
-
- queue->attr.mq_curmsgs--;
-
- if (pace_pthread_mutex_unlock (&queue->mutex) == -1)
- {
- errno = EBADMSG;
- return -1;
- }
-
- if (queue->attr.mq_curmsgs == (queue->attr.mq_maxmsg-1))
- {
- pace_printf ("Recv is signalling\n");
- /* Let other waiting threads know there is room available */
- if ((errno = pace_pthread_cond_signal (&((mqfile*)mqdes->mptr)->cond)) != 0)
- {
- return -1;
- }
- }
-
- return ((message_header*)(&mqdes->mptr[queue->head]))->length;
-}
-#endif /* PACE_HAS_POSIX_NONUOF_FUNCS */
-
-#if (PACE_HAS_POSIX_NONUOF_FUNCS)
-int mq_getattr (pace_mqd_t mqdes, pace_mq_attr * mqstat)
-{
- mqfile* queue = ((mqfile*)mqdes->mptr);
-
- if ((errno = pace_pthread_mutex_lock (&queue->mutex)) != 0)
- {
- return -1;
- }
-
- *mqstat = queue->attr;
-
- pace_pthread_mutex_unlock (&queue->mutex);
-
- return 0;
-}
-#endif /* PACE_HAS_POSIX_NONUOF_FUNCS */
-
-#if (PACE_HAS_POSIX_NONUOF_FUNCS)
-int mq_setattr(pace_mqd_t mqdes,
- const pace_mq_attr * mqstat,
- pace_mq_attr * omqstat)
-{
- mqfile* queue = ((mqfile*)(mqdes->mptr));
-
- if ((errno = pace_pthread_mutex_lock (&queue->mutex)) != 0)
- {
- return -1;
- }
- if (omqstat != 0)
- {
- *omqstat = ((mqfile*)mqdes->mptr)->attr;
- }
- if (mqstat == 0 || mqdes == 0)
- {
- /* You eediot*/
- errno = EFAULT;
- pace_pthread_mutex_unlock (&queue->mutex);
- return -1;
- }
-
- ((mqfile*)mqdes->mptr)->attr.mq_flags = mqstat->mq_flags;
-
- pace_pthread_mutex_unlock (&queue->mutex);
- return 0;
-}
-#endif /* PACE_HAS_POSIX_NONUOF_FUNCS */
-
-#if (PACE_HAS_POSIX_NONUOF_FUNCS)
-int mq_notify (pace_mqd_t mqd, const pace_sigevent* notification)
-{
- mqfile* queue = ((mqfile*)(mqd->mptr));
- pace_pid_t pid = pace_getpid ();
-
- if ((errno = pace_pthread_mutex_lock (&queue->mutex)) != 0)
- {
- return -1;
- }
-
- if (notification == 0)
- {
- /* Unregister if notification is null */
- if (queue->not_pid == pid)
- {
- queue->not_pid = 0;
- }
- }
- else
- {
- if (queue->not_pid && pace_kill (queue->not_pid, 0))
- {
- /* If another process is registered */
- if (errno != ESRCH)
- {
- pace_pthread_mutex_unlock (&queue->mutex);
- return -1;
- }
- }
- queue->not_pid = pid;
- queue->notification = *notification;
- }
-
- pthread_mutex_unlock (&queue->mutex);
-
- return 0;
-}
-#endif /* PACE_HAS_POSIX_NONUOF_FUNCS */