diff options
author | William R. Otte <wotte@dre.vanderbilt.edu> | 2006-07-24 15:50:30 +0000 |
---|---|---|
committer | William R. Otte <wotte@dre.vanderbilt.edu> | 2006-07-24 15:50:30 +0000 |
commit | c44379cc7d9c7aa113989237ab0f56db12aa5219 (patch) | |
tree | 66a84b20d47f2269d8bdc6e0323f338763424d3a /ACE/ace/MEM_IO.cpp | |
parent | 3aff90f4a822fcf5d902bbfbcc9fa931d6191a8c (diff) | |
download | ATCD-c44379cc7d9c7aa113989237ab0f56db12aa5219.tar.gz |
Repo restructuring
Diffstat (limited to 'ACE/ace/MEM_IO.cpp')
-rw-r--r-- | ACE/ace/MEM_IO.cpp | 541 |
1 files changed, 541 insertions, 0 deletions
diff --git a/ACE/ace/MEM_IO.cpp b/ACE/ace/MEM_IO.cpp new file mode 100644 index 00000000000..7a9616f5220 --- /dev/null +++ b/ACE/ace/MEM_IO.cpp @@ -0,0 +1,541 @@ +// MEM_IO.cpp +// $Id$ + +#include "ace/MEM_IO.h" +#include "ace/Handle_Set.h" + +#if (ACE_HAS_POSITION_INDEPENDENT_POINTERS == 1) + +#if !defined (__ACE_INLINE__) +#include "ace/MEM_IO.inl" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(ace, MEM_IO, "$Id$") + +ACE_BEGIN_VERSIONED_NAMESPACE_DECL + +ACE_ALLOC_HOOK_DEFINE(ACE_MEM_IO) + +ACE_Reactive_MEM_IO::~ACE_Reactive_MEM_IO (void) +{ +} + +int +ACE_Reactive_MEM_IO::init (ACE_HANDLE handle, + const ACE_TCHAR *name, + MALLOC_OPTIONS *options) +{ + ACE_TRACE ("ACE_Reactive_MEM_IO::init"); + this->handle_ = handle; + return this->create_shm_malloc (name, + options); +} + +ssize_t +ACE_Reactive_MEM_IO::recv_buf (ACE_MEM_SAP_Node *&buf, + int flags, + const ACE_Time_Value *timeout) +{ + ACE_TRACE ("ACE_Reactive_MEM_IO::recv_buf"); + + if (this->shm_malloc_ == 0 || this->handle_ == ACE_INVALID_HANDLE) + return -1; + + off_t new_offset = 0; + ssize_t retv = ACE::recv (this->handle_, + (char *) &new_offset, + sizeof (off_t), + flags, + timeout); + + if (retv == 0) + { + // ACE_DEBUG ((LM_INFO, "MEM_Stream closed\n")); + buf = 0; + return 0; + } + else if (retv != sizeof (off_t)) + { + // Nothing available or we are really screwed. + buf = 0; + return -1; + } + + return this->get_buf_len (new_offset, buf); +} + +ssize_t +ACE_Reactive_MEM_IO::send_buf (ACE_MEM_SAP_Node *buf, + int flags, + const ACE_Time_Value *timeout) +{ + ACE_TRACE ("ACE_Reactive_MEM_IO::send_buf"); + + if (this->shm_malloc_ == 0 || this->handle_ == ACE_INVALID_HANDLE) + return -1; + + off_t offset = reinterpret_cast<char *> (buf) - + static_cast<char *> (this->shm_malloc_->base_addr ()); // the offset. + // Send the offset value over the socket. + if (ACE::send (this->handle_, + (const char *) &offset, + sizeof (offset), + flags, + timeout) != sizeof (offset)) + { + // unsucessful send, release the memory in the shared-memory. + this->release_buffer (buf); + + return -1; + } + return buf->size (); +} + +#if defined (ACE_WIN32) || !defined (_ACE_USE_SV_SEM) +int +ACE_MT_MEM_IO::Simple_Queue::write (ACE_MEM_SAP_Node *new_node) +{ + if (this->mq_ == 0) + return -1; + + // Here, we assume we already have acquired the lock necessary. + // And we are allowed to write. + if (this->mq_->tail_.addr () == 0) // nothing in the queue. + { + this->mq_->head_ = new_node; + this->mq_->tail_ = new_node; + new_node->next_ = 0; + } + else + { + this->mq_->tail_->next_ = new_node; + new_node->next_ = 0; + this->mq_->tail_ = new_node; + } + return 0; +} + +ACE_MEM_SAP_Node * +ACE_MT_MEM_IO::Simple_Queue::read () +{ + if (this->mq_ == 0) + return 0; + + ACE_MEM_SAP_Node *retv = 0; + + ACE_SEH_TRY + { + retv = this->mq_->head_; + // Here, we assume we already have acquired the lock necessary + // and there are soemthing in the queue. + if (this->mq_->head_ == this->mq_->tail_) + { + // Last message in the queue. + this->mq_->head_ = 0; + this->mq_->tail_ = 0; + } + else + this->mq_->head_ = retv->next_; + } + ACE_SEH_EXCEPT (this->malloc_->memory_pool ().seh_selector (GetExceptionInformation ())) + { + } + + return retv; +} + +ACE_MT_MEM_IO::~ACE_MT_MEM_IO () +{ + delete this->recv_channel_.sema_; + delete this->recv_channel_.lock_; + delete this->send_channel_.sema_; + delete this->send_channel_.lock_; +} + +int +ACE_MT_MEM_IO::init (ACE_HANDLE handle, + const ACE_TCHAR *name, + MALLOC_OPTIONS *options) +{ + ACE_TRACE ("ACE_MT_MEM_IO::init"); + ACE_UNUSED_ARG (handle); + + // @@ Give me a rule on naming and how the queue should + // be kept in the shared memory and we are done + // with this. + if (this->create_shm_malloc (name, options) == -1) + return -1; + + ACE_TCHAR server_sema [MAXPATHLEN]; + ACE_TCHAR client_sema [MAXPATHLEN]; + ACE_TCHAR server_lock [MAXPATHLEN]; + ACE_TCHAR client_lock [MAXPATHLEN]; + const ACE_TCHAR *basename = ACE::basename (name); + // size_t baselen = ACE_OS::strlen (basename); + + // Building names. @@ Check buffer overflow? + ACE_OS::strcpy (server_sema, basename); + ACE_OS::strcat (server_sema, ACE_LIB_TEXT ("_sema_to_server")); + ACE_OS::strcpy (client_sema, basename); + ACE_OS::strcat (client_sema, ACE_LIB_TEXT ("_sema_to_client")); + ACE_OS::strcpy (server_lock, basename); + ACE_OS::strcat (server_lock, ACE_LIB_TEXT ("_lock_to_server")); + ACE_OS::strcpy (client_lock, basename); + ACE_OS::strcat (client_lock, ACE_LIB_TEXT ("_lock_to_client")); + + void *to_server_ptr = 0; + // @@ Here, we assume the shared memory fill will never be resued. + // So we can determine whether we are server or client by examining + // if the simple message queues have already been set up in + // the Malloc object or not. + if (this->shm_malloc_->find ("to_server", to_server_ptr) == -1) + { + void *ptr = 0; + // We are server. + ACE_ALLOCATOR_RETURN (ptr, + this->shm_malloc_->malloc (2 * sizeof (MQ_Struct)), + -1); + + MQ_Struct *mymq = reinterpret_cast<MQ_Struct *> (ptr); + mymq->tail_ = 0; + mymq->head_ = 0; + (mymq + 1)->tail_ = 0; + (mymq + 1)->head_ = 0; + if (this->shm_malloc_->bind ("to_server", mymq) == -1) + return -1; + + if (this->shm_malloc_->bind ("to_client", mymq + 1) == -1) + return -1; + + this->recv_channel_.queue_.init (mymq, this->shm_malloc_); + ACE_NEW_RETURN (this->recv_channel_.sema_, + ACE_SYNCH_PROCESS_SEMAPHORE (0, server_sema), + -1); + ACE_NEW_RETURN (this->recv_channel_.lock_, + ACE_SYNCH_PROCESS_MUTEX (server_lock), + -1); + + this->send_channel_.queue_.init (mymq + 1, this->shm_malloc_); + ACE_NEW_RETURN (this->send_channel_.sema_, + ACE_SYNCH_PROCESS_SEMAPHORE (0, client_sema), + -1); + ACE_NEW_RETURN (this->send_channel_.lock_, + ACE_SYNCH_PROCESS_MUTEX (client_lock), + -1); + } + else + { + // we are client. + MQ_Struct *mymq = reinterpret_cast<MQ_Struct *> (to_server_ptr); + this->recv_channel_.queue_.init (mymq +1, this->shm_malloc_); + ACE_NEW_RETURN (this->recv_channel_.sema_, + ACE_SYNCH_PROCESS_SEMAPHORE (0, client_sema), + -1); + ACE_NEW_RETURN (this->recv_channel_.lock_, + ACE_SYNCH_PROCESS_MUTEX (client_lock), + -1); + + this->send_channel_.queue_.init (mymq, this->shm_malloc_); + ACE_NEW_RETURN (this->send_channel_.sema_, + ACE_SYNCH_PROCESS_SEMAPHORE (0, server_sema), + -1); + ACE_NEW_RETURN (this->send_channel_.lock_, + ACE_SYNCH_PROCESS_MUTEX (server_lock), + -1); + } + return 0; +} + +ssize_t +ACE_MT_MEM_IO::recv_buf (ACE_MEM_SAP_Node *&buf, + int flags, + const ACE_Time_Value *timeout) +{ + ACE_TRACE ("ACE_MT_MEM_IO::recv_buf"); + + // @@ Don't know how to handle timeout yet. + ACE_UNUSED_ARG (timeout); + ACE_UNUSED_ARG (flags); + + if (this->shm_malloc_ == 0) + return -1; + + // Need to handle timeout here. + if (this->recv_channel_.sema_->acquire () == -1) + return -1; + + { + // @@ We can probably skip the lock in certain circumstance. + ACE_GUARD_RETURN (ACE_SYNCH_PROCESS_MUTEX, ace_mon, *this->recv_channel_.lock_, -1); + + buf = this->recv_channel_.queue_.read (); + if (buf != 0) + return buf->size (); + return -1; + } +} + +ssize_t +ACE_MT_MEM_IO::send_buf (ACE_MEM_SAP_Node *buf, + int flags, + const ACE_Time_Value *timeout) +{ + ACE_TRACE ("ACE_MT_MEM_IO::send_buf"); + + // @@ Don't know how to handle timeout yet. + ACE_UNUSED_ARG (timeout); + ACE_UNUSED_ARG (flags); + + if (this->shm_malloc_ == 0) + return -1; + + { + // @@ We can probably skip the lock in certain curcumstances. + ACE_GUARD_RETURN (ACE_SYNCH_PROCESS_MUTEX, ace_mon, *this->send_channel_.lock_, -1); + + if (this->send_channel_.queue_.write (buf) == -1) + { + this->release_buffer (buf); + return -1; + } + } + + if (this->send_channel_.sema_->release () == -1) + return -1; + + return buf->size (); +} +#endif /* ACE_WIN32 || !_ACE_USE_SV_SEM */ + +void +ACE_MEM_IO::dump (void) const +{ +#if defined (ACE_HAS_DUMP) + ACE_TRACE ("ACE_MEM_IO::dump"); +#endif /* ACE_HAS_DUMP */ +} + +int +ACE_MEM_IO::init (const ACE_TCHAR *name, + ACE_MEM_IO::Signal_Strategy type, + ACE_MEM_SAP::MALLOC_OPTIONS *options) +{ + ACE_UNUSED_ARG (type); + + delete this->deliver_strategy_; + this->deliver_strategy_ = 0; + switch (type) + { + case ACE_MEM_IO::Reactive: + ACE_NEW_RETURN (this->deliver_strategy_, + ACE_Reactive_MEM_IO (), + -1); + break; +#if defined (ACE_WIN32) || !defined (_ACE_USE_SV_SEM) + case ACE_MEM_IO::MT: + ACE_NEW_RETURN (this->deliver_strategy_, + ACE_MT_MEM_IO (), + -1); + break; +#endif /* ACE_WIN32 || !_ACE_USE_SV_SEM */ + default: + return -1; + } + + return this->deliver_strategy_->init (this->get_handle (), + name, + options); +} + +int +ACE_MEM_IO::fini () +{ + if (this->deliver_strategy_ != 0) + return this->deliver_strategy_->fini (); + else + return -1; +} + +// Allows a client to read from a socket without having to provide +// a buffer to read. This method determines how much data is in the +// socket, allocates a buffer of this size, reads in the data, and +// returns the number of bytes read. + +ssize_t +ACE_MEM_IO::send (const ACE_Message_Block *message_block, + const ACE_Time_Value *timeout) +{ + ACE_TRACE ("ACE_MEM_IO::send"); + + if (this->deliver_strategy_ == 0) + return -1; // Something went seriously wrong. + + size_t len = message_block->total_length (); + + if (len != 0) + { + ACE_MEM_SAP_Node *buf = + reinterpret_cast<ACE_MEM_SAP_Node *> ( + this->deliver_strategy_->acquire_buffer (len)); + size_t n = 0; + while (message_block != 0) + { + ACE_OS::memcpy (static_cast<char *> (buf->data ()) + n, + message_block->rd_ptr (), + message_block->length ()); + n += message_block->length (); + + if (message_block->cont ()) + message_block = message_block->cont (); + else + message_block = message_block->next (); + } + + buf->size_ = len; + + return this->deliver_strategy_->send_buf (buf, + 0, + timeout); + } + return 0; +} + + +#if 0 +ssize_t +ACE_MEM_IO::recvv (iovec *io_vec, + const ACE_Time_Value *timeout) +{ + ACE_TRACE ("ACE_MEM_IO::recvv"); +#if defined (FIONREAD) + ACE_Handle_Set handle_set; + handle_set.reset (); + handle_set.set_bit (this->get_handle ()); + + io_vec->iov_base = 0; + + // Check the status of the current socket. + switch (ACE_OS::select (int (this->get_handle ()) + 1, + handle_set, + 0, 0, + timeout)) + { + case -1: + return -1; + /* NOTREACHED */ + case 0: + errno = ETIME; + return -1; + /* NOTREACHED */ + default: + // Goes fine, fallthrough to get data + break; + } + + int inlen; + + if (ACE_OS::ioctl (this->get_handle (), + FIONREAD, + &inlen) == -1) + return -1; + else if (inlen > 0) + { + ACE_NEW_RETURN (io_vec->iov_base, + char[inlen], + -1); + io_vec->iov_len = this->recv (io_vec->iov_base, + inlen); + return io_vec->iov_len; + } + else + return 0; +#else + ACE_UNUSED_ARG (io_vec); + ACE_UNUSED_ARG (timeout); + ACE_NOTSUP_RETURN (-1); +#endif /* FIONREAD */ +} + +// Send N char *ptrs and int lengths. Note that the char *'s precede +// the ints (basically, an varargs version of writev). The count N is +// the *total* number of trailing arguments, *not* a couple of the +// number of tuple pairs! + +ssize_t +ACE_MEM_IO::send (size_t n, ...) const +{ + ACE_TRACE ("ACE_MEM_IO::send"); + + va_list argp; + size_t total_tuples = n / 2; + iovec *iovp; +#if defined (ACE_HAS_ALLOCA) + iovp = (iovec *) alloca (total_tuples * sizeof (iovec)); +#else + ACE_NEW_RETURN (iovp, + iovec[total_tuples], + -1); +#endif /* !defined (ACE_HAS_ALLOCA) */ + + va_start (argp, n); + + for (size_t i = 0; i < total_tuples; i++) + { + iovp[i].iov_base = va_arg (argp, char *); + iovp[i].iov_len = va_arg (argp, ssize_t); + } + + ssize_t result = ACE_OS::sendv (this->get_handle (), + iovp, + total_tuples); +#if !defined (ACE_HAS_ALLOCA) + delete [] iovp; +#endif /* !defined (ACE_HAS_ALLOCA) */ + va_end (argp); + return result; +} + +// This is basically an interface to ACE_OS::readv, that doesn't use +// the struct iovec_Base explicitly. The ... can be passed as an arbitrary +// number of (char *ptr, int len) tuples. However, the count N is the +// *total* number of trailing arguments, *not* a couple of the number +// of tuple pairs! + +ssize_t +ACE_MEM_IO::recv (size_t n, ...) const +{ + ACE_TRACE ("ACE_MEM_IO::recv"); + + va_list argp; + size_t total_tuples = n / 2; + iovec *iovp; +#if defined (ACE_HAS_ALLOCA) + iovp = (iovec *) alloca (total_tuples * sizeof (iovec)); +#else + ACE_NEW_RETURN (iovp, + iovec[total_tuples], + -1); +#endif /* !defined (ACE_HAS_ALLOCA) */ + + va_start (argp, n); + + for (size_t i = 0; i < total_tuples; i++) + { + iovp[i].iov_base = va_arg (argp, char *); + iovp[i].iov_len = va_arg (argp, ssize_t); + } + + ssize_t result = ACE_OS::recvv (this->get_handle (), + iovp, + total_tuples); +#if !defined (ACE_HAS_ALLOCA) + delete [] iovp; +#endif /* !defined (ACE_HAS_ALLOCA) */ + va_end (argp); + return result; +} +#endif /* 0 */ + +ACE_END_VERSIONED_NAMESPACE_DECL + +#endif /* ACE_HAS_POSITION_INDEPENDENT_POINTERS == 1 */ |