diff options
Diffstat (limited to 'PACE/pace/emulation/mqueue.c')
-rw-r--r-- | PACE/pace/emulation/mqueue.c | 570 |
1 files changed, 0 insertions, 570 deletions
diff --git a/PACE/pace/emulation/mqueue.c b/PACE/pace/emulation/mqueue.c deleted file mode 100644 index 4e0e6912de6..00000000000 --- a/PACE/pace/emulation/mqueue.c +++ /dev/null @@ -1,570 +0,0 @@ -/* $Id$ -*- C -*- - * ============================================================================ - * - * = LIBRARY - * pace - * - * = FILENAME - * pace/emulation/mqueue.c - * - * = AUTHOR - * John Heitmann - * - * ============================================================================ */ - -#include "pace/sys/mman.h" -#include "pace/stdio.h" -#include "pace/fcntl.h" -#include "pace/string.h" -#include "pace/stdlib.h" -#include "pace/sys/types.h" -#include "pace/pthread.h" -#include "pace/sys/stat.h" -#include "pace/emulation/mqueue.h" - -typedef struct -{ - pace_mq_attr attr; - pace_size_t num_open; /* How many processes have a valid mqd_t to here */ - pace_size_t rec_wait; /* How many processes are blocked on mq_receive */ - pace_pid_t not_pid; /* Who is actually registered for notification */ - pace_sigevent notification; - pace_pthread_mutex_t mutex; - pace_pthread_cond_t cond; - pace_size_t head; - pace_size_t freelist; -} mqfile; - -typedef struct -{ - pace_size_t next; /* Index of next element */ - unsigned int priority; - pace_size_t length; -} message_header; - -static struct mq_attr pace_attrdefault = { 0, 32, 256, 0 }; -#define PACE_MQ_LOCKPOSTFIX "mqlock9587" -#define PACE_MQ_DATAPOSTFIX "mqdata2355" - -/* This remains mq_open due to the macro in pace/mqueue.h */ -#if (PACE_HAS_POSIX_NONUOF_FUNCS) -pace_mqd_t mq_open (const char* name, - int oflag, - pace_mode_t mode, - pace_mq_attr* attr) -{ - int m_padding = sizeof (message_header); /* How much extra space per message do we need */ - int f_padding = sizeof (mqfile); /* How much fixed padding is needed */ - int mflags, mprot; - int fd; - int i; - pace_size_t mapsize; - char* mmaploc; - char* new_name; - char* lock_name; - int create_mmap = 0; /* 1 if the file has never be inited */ - message_header* temp = 0; /*Used in initialization of mqueue*/ - long index; /* index into the file */ - pace_mqd_t result = (pace_mqd_t) pace_malloc (sizeof (struct mqd)); - pace_stat_s statbuf; - -retry: - if (attr == 0) - { - attr = &pace_attrdefault; - } - else - { - if (attr->mq_maxmsg < 0 || attr->mq_msgsize < 0) - { - errno = EBADF; - return (pace_mqd_t)-1; - } - } - - /* Create a name that will go to /tmp with a unique name */ - new_name = (char*) malloc (256); - lock_name = (char*) malloc (256); - snprintf (new_name, 256, "/tmp%s%s", name, PACE_MQ_DATAPOSTFIX); - snprintf (lock_name, 256, "/tmp%s%s", name, PACE_MQ_LOCKPOSTFIX); - - /* Fix alignment */ - if (attr->mq_msgsize % sizeof (long) != 0) - { - attr->mq_msgsize += 8 - (attr->mq_msgsize % sizeof (long)); - } - - if (oflag & PACE_O_CREAT) - { - /* We need to protect access without the help of O_RDONLY in the fs */ - fd = pace_open (new_name, PACE_O_RDWR | PACE_O_CREAT | PACE_O_EXCL, mode); - - if (fd == -1 && errno != EEXIST) - { - /* An error other than EEXIST has occurred. */ - return (pace_mqd_t)-1; - } - else if (fd != -1) - { - /* If a new file was created successfully */ - create_mmap = 1; - } - else if (oflag & PACE_O_EXCL) - { - /* If the file exists and we don't want it */ - errno = EEXIST; - return (pace_mqd_t)-1; - } - else - { - /* We want the existing file */ - fd = pace_open (new_name, PACE_O_RDWR); - if (fd == -1 && errno == ENOENT) - { - /* Something odd is going on */ - goto retry; - } - else if (fd == -1) - { - return (pace_mqd_t)-1; - } - } - } - else - { - fd = pace_open (new_name, PACE_O_RDWR); - if (fd == -1) - { - return (pace_mqd_t)-1; - } - } - - /* - The following loop makes shure that we haven't entered a race condition. If a file - has been created but not initialized, its IXUSR will not be set (see above). - */ - while (create_mmap == 0) - { - if (stat (lock_name, &statbuf) == -1) - { - close (fd); - if (errno == ENOENT && (oflag & O_CREAT)) - { - goto retry; - } - return (pace_mqd_t)-1; - } - else - { - break; - } - } - - mapsize = f_padding + (attr->mq_msgsize + m_padding) * (attr->mq_maxmsg); - mprot = PACE_PROT_READ | PACE_PROT_WRITE; - mflags = PACE_MAP_SHARED; - - if (create_mmap) - { - /* Create and 0 out the file */ - if (pace_lseek (fd, mapsize, PACE_SEEK_SET) == -1) - { - pace_unlink (new_name); - return (pace_mqd_t)-1; - } - if (pace_write (fd, "", 1) != 1) - { - pace_unlink (new_name); - return (pace_mqd_t)-1; - } - - mmaploc = (char*) pace_mmap (0, mapsize, mprot, mflags, fd, 0); - pace_close (fd); - if (mmaploc == MAP_FAILED) - { - pace_unlink (new_name); - return (pace_mqd_t)-1; - } - - pace_memset (mmaploc, 0, mapsize); - - if ((errno = pace_pthread_mutex_init (&(((mqfile*)mmaploc)->mutex), 0)) != 0) - { - pace_unlink (new_name); - pace_munmap (mmaploc, mapsize); - return (pace_mqd_t)-1; - } - if ((errno = pace_pthread_mutex_lock (&(((mqfile*)mmaploc)->mutex))) != 0) - { - pace_unlink (new_name); - pace_munmap (mmaploc, mapsize); - return (pace_mqd_t)-1; - } - - if ((errno = pace_pthread_cond_init (&(((mqfile*)mmaploc)->cond), 0)) != 0) - { - pace_unlink (new_name); - pace_munmap (mmaploc, mapsize); - return (pace_mqd_t)-1; - } - - index = sizeof (mqfile); - ((mqfile*)mmaploc)->freelist = index; - ((mqfile*)mmaploc)->head = 0; - - for (i = 0; i < attr->mq_maxmsg; ++i) - { - temp = (message_header *) &mmaploc[index]; - index += sizeof (message_header) + attr->mq_msgsize; - temp->next = index; - } - temp->next = 0; - attr->mq_curmsgs = 0; - ((mqfile*)mmaploc)->attr = *attr; - - /* Create the lock file so that the file is known to be inited */ - if (pace_open (lock_name, O_CREAT | O_EXCL) == -1) - { - pace_unlink (new_name); - pace_munmap (mmaploc, mapsize); - return (pace_mqd_t)-1; - } - - } - else - { - /* Just open the existing map */ - mmaploc = (char*) pace_mmap (0, mapsize, mprot, mflags, fd, 0); - if (mmaploc == MAP_FAILED) - { - return (pace_mqd_t)-1; - } - pace_close (fd); - - /* ???? Test here for race */ - - if (pace_pthread_mutex_lock (&(((mqfile*)mmaploc)->mutex)) == -1) - { - pace_munmap (mmaploc, mapsize); - return (pace_mqd_t)-1; - } - ((mqfile*)mmaploc)->attr.mq_flags = attr->mq_flags; - } - - ((mqfile*)mmaploc)->num_open++; - - - if (pace_pthread_mutex_unlock (&(((mqfile*)mmaploc)->mutex)) == -1) - { - pace_munmap (mmaploc, mapsize); - return (pace_mqd_t)-1; - } - - result->mptr = mmaploc; - result->length = mapsize; - result->oflag = oflag; - - return result; -} -#endif /* PACE_HAS_POSIX_NONUOF_FUNCS */ - -#if (PACE_HAS_POSIX_NONUOF_FUNCS) -int mq_close (pace_mqd_t mqdes) -{ - if (pace_pthread_mutex_lock (&( ((mqfile*)mqdes->mptr)->mutex)) == -1) - { - errno = EBADF; - return -1; - } - ((mqfile*)mqdes->mptr)->num_open--; - if (pace_pthread_mutex_unlock (&(((mqfile*)mqdes->mptr)->mutex)) == -1) - { - errno = EBADF; - return -1; - } - if (munmap (mqdes->mptr, mqdes->length) == -1) - { - return -1; - } - free (mqdes); - return 0; -} -#endif /* PACE_HAS_POSIX_NONUOF_FUNCS */ - -#if (PACE_HAS_POSIX_NONUOF_FUNCS) -int mq_unlink (const char* name) -{ - int result1, result2; - char* new_name; - new_name = (char*) malloc (256); - snprintf (new_name, 256, "/tmp%s%s", name, PACE_MQ_DATAPOSTFIX); - result1 = pace_unlink (new_name); - snprintf (new_name, 256, "/tmp%s%s", name, PACE_MQ_LOCKPOSTFIX); - result2 = pace_unlink (new_name); - free (new_name); - return (result1 == -1 || result2 == -1 ? -1 : 0); -} -#endif /* PACE_HAS_POSIX_NONUOF_FUNCS */ - -#if (PACE_HAS_POSIX_NONUOF_FUNCS) -int mq_send (pace_mqd_t mqdes, - const char* ptr, - pace_size_t length, - unsigned int priority) -{ - mqfile* queue = ((mqfile*)mqdes->mptr); - long index, old_index; - if (mqdes->oflag & O_RDONLY) - { - /* Incorrect access priviledges */ - errno = EBADF; - return -1; - } - if (queue->attr.mq_msgsize < (int) length) - { - /* Message too long */ - errno = EMSGSIZE; - return -1; - } - if ((errno = pace_pthread_mutex_lock (&queue->mutex)) != 0) - { - return -1; - } - /* If the queue is full... */ - if (queue->attr.mq_curmsgs >= queue->attr.mq_maxmsg) - { - if (queue->attr.mq_flags & O_NONBLOCK) - { - errno = EAGAIN; - return -1; - } - while (queue->attr.mq_maxmsg <= queue->attr.mq_curmsgs) - { - pace_pthread_cond_wait (&queue->cond, &queue->mutex); - pace_printf ("Send Woke Up\n"); - } - } - - /* Fill in the fields of the header */ - ((message_header*)(&mqdes->mptr[queue->freelist]))->priority = priority; - ((message_header*)(&mqdes->mptr[queue->freelist]))->length = length; - pace_memcpy (((void*)(&mqdes->mptr[queue->freelist + sizeof (message_header)])), - ptr, length); - - /* Update the linked list */ - old_index = 0; - index = queue->head; - while (index != 0 && ((message_header*)(&mqdes->mptr[index]))->priority >= priority) - { - old_index = index; - index = ((message_header*)(&mqdes->mptr[index]))->next; - } - - /* If the msg goes at the head */ - if (old_index == 0) - { - queue->head = queue->freelist; - queue->freelist = ((message_header*)(&mqdes->mptr[queue->freelist]))->next; - ((message_header*)(&mqdes->mptr[queue->head]))->next = index; - } - else - { - ((message_header*)(&mqdes->mptr[old_index]))->next = queue->freelist; - old_index = queue->freelist; - queue->freelist = ((message_header*)(&mqdes->mptr[queue->freelist]))->next; - ((message_header*)(&mqdes->mptr[old_index]))->next = index; - } - - queue->attr.mq_curmsgs++; - - if ((errno = pace_pthread_mutex_unlock (&queue->mutex)) != 0) - { - return -1; - } - - if (queue->attr.mq_curmsgs == 1) - { - /* If there is no one waiting and blocked */ - if (queue->not_pid != 0 && queue->rec_wait == 0) - { - if (queue->notification.sigev_notify == SIGEV_SIGNAL) - { - sigqueue (queue->not_pid, - queue->notification.sigev_signo, - queue->notification.sigev_value); - } - queue->not_pid = 0; - } - else - { - pace_printf ("Send is Signalling\n"); - /* Let other waiting threads know there is food on the table */ - if ((errno = pace_pthread_cond_signal (&((mqfile*)mqdes->mptr)->cond)) != 0) - { - return -1; - } - } - } - return 0; -} -#endif /* PACE_HAS_POSIX_NONUOF_FUNCS */ - -#if (PACE_HAS_POSIX_NONUOF_FUNCS) -pace_ssize_t mq_receive (pace_mqd_t mqdes, - char * msg_ptr, - pace_size_t msg_len, - unsigned int * nmsg_prio) -{ - mqfile* queue = ((mqfile*)mqdes->mptr); - pace_size_t temp; - - if (queue->attr.mq_msgsize > (long) msg_len) - { - errno = EMSGSIZE; - return -1; - } - - if ((errno = pace_pthread_mutex_lock (&queue->mutex)) != 0) - { - return -1; - } - - /* If the queue is empty... */ - if (queue->attr.mq_curmsgs <= 0) - { - if (queue->attr.mq_flags & O_NONBLOCK) - { - errno = EAGAIN; - return -1; - } - while (queue->attr.mq_curmsgs <= 0) - { - pace_printf ("Recv is going to sleep\n"); - queue->rec_wait++; - pace_pthread_cond_wait (&(queue->cond), &(queue->mutex)); - queue->rec_wait--; - pace_printf ("Recv is waking from sleep\n"); - } - } - - if (nmsg_prio != 0) - { - *nmsg_prio = ((message_header*)(&mqdes->mptr[queue->head]))->priority; - } - - pace_memcpy (msg_ptr, ((void*)(&mqdes->mptr[queue->head + sizeof (message_header)])), - ((message_header*)(&mqdes->mptr[queue->head]))->length); - temp = queue->head; - queue->head = ((message_header*)(&mqdes->mptr[queue->head]))->next; - ((message_header*)(&mqdes->mptr[temp]))->next = queue->freelist; - queue->freelist = temp; - - queue->attr.mq_curmsgs--; - - if (pace_pthread_mutex_unlock (&queue->mutex) == -1) - { - errno = EBADMSG; - return -1; - } - - if (queue->attr.mq_curmsgs == (queue->attr.mq_maxmsg-1)) - { - pace_printf ("Recv is signalling\n"); - /* Let other waiting threads know there is room available */ - if ((errno = pace_pthread_cond_signal (&((mqfile*)mqdes->mptr)->cond)) != 0) - { - return -1; - } - } - - return ((message_header*)(&mqdes->mptr[queue->head]))->length; -} -#endif /* PACE_HAS_POSIX_NONUOF_FUNCS */ - -#if (PACE_HAS_POSIX_NONUOF_FUNCS) -int mq_getattr (pace_mqd_t mqdes, pace_mq_attr * mqstat) -{ - mqfile* queue = ((mqfile*)mqdes->mptr); - - if ((errno = pace_pthread_mutex_lock (&queue->mutex)) != 0) - { - return -1; - } - - *mqstat = queue->attr; - - pace_pthread_mutex_unlock (&queue->mutex); - - return 0; -} -#endif /* PACE_HAS_POSIX_NONUOF_FUNCS */ - -#if (PACE_HAS_POSIX_NONUOF_FUNCS) -int mq_setattr(pace_mqd_t mqdes, - const pace_mq_attr * mqstat, - pace_mq_attr * omqstat) -{ - mqfile* queue = ((mqfile*)(mqdes->mptr)); - - if ((errno = pace_pthread_mutex_lock (&queue->mutex)) != 0) - { - return -1; - } - if (omqstat != 0) - { - *omqstat = ((mqfile*)mqdes->mptr)->attr; - } - if (mqstat == 0 || mqdes == 0) - { - /* You eediot*/ - errno = EFAULT; - pace_pthread_mutex_unlock (&queue->mutex); - return -1; - } - - ((mqfile*)mqdes->mptr)->attr.mq_flags = mqstat->mq_flags; - - pace_pthread_mutex_unlock (&queue->mutex); - return 0; -} -#endif /* PACE_HAS_POSIX_NONUOF_FUNCS */ - -#if (PACE_HAS_POSIX_NONUOF_FUNCS) -int mq_notify (pace_mqd_t mqd, const pace_sigevent* notification) -{ - mqfile* queue = ((mqfile*)(mqd->mptr)); - pace_pid_t pid = pace_getpid (); - - if ((errno = pace_pthread_mutex_lock (&queue->mutex)) != 0) - { - return -1; - } - - if (notification == 0) - { - /* Unregister if notification is null */ - if (queue->not_pid == pid) - { - queue->not_pid = 0; - } - } - else - { - if (queue->not_pid && pace_kill (queue->not_pid, 0)) - { - /* If another process is registered */ - if (errno != ESRCH) - { - pace_pthread_mutex_unlock (&queue->mutex); - return -1; - } - } - queue->not_pid = pid; - queue->notification = *notification; - } - - pthread_mutex_unlock (&queue->mutex); - - return 0; -} -#endif /* PACE_HAS_POSIX_NONUOF_FUNCS */ |