diff options
Diffstat (limited to 'ACE/protocols/ace/RMCast/Socket.cpp')
-rw-r--r-- | ACE/protocols/ace/RMCast/Socket.cpp | 396 |
1 files changed, 396 insertions, 0 deletions
diff --git a/ACE/protocols/ace/RMCast/Socket.cpp b/ACE/protocols/ace/RMCast/Socket.cpp new file mode 100644 index 00000000000..51e6d450173 --- /dev/null +++ b/ACE/protocols/ace/RMCast/Socket.cpp @@ -0,0 +1,396 @@ +// file : ace/RMCast/Socket.cpp +// author : Boris Kolpackov <boris@kolpackov.net> +// cvs-id : $Id$ + +#include "ace/OS_Memory.h" +#include "ace/OS_NS_stdio.h" +#include "ace/OS_NS_stdlib.h" +#include "ace/OS_NS_string.h" +#include "ace/OS_NS_unistd.h" +#include "ace/OS_NS_sys_time.h" // gettimeofday + +#include "ace/Unbounded_Queue.h" + +#include "ace/Pipe.h" + +#include "Stack.h" +#include "Protocol.h" +#include "Bits.h" + +#include "Fragment.h" +#include "Reassemble.h" +#include "Acknowledge.h" +#include "Retransmit.h" +#include "Flow.h" +#include "Link.h" + +#include "Socket.h" + +/* +#include <iostream> +using std::cerr; +using std::endl; +*/ + +namespace ACE_RMCast +{ + class Socket_Impl : protected Element + { + public: + ~Socket_Impl (); + + Socket_Impl (Address const& a, bool loop, Parameters const& params); + + public: + void + send_ (void const* buf, size_t s); + + ssize_t + recv_ (void* buf, + size_t s, + ACE_Time_Value const* timeout, + ACE_INET_Addr* from); + + ssize_t + size_ (ACE_Time_Value const* timeout); + + ACE_HANDLE + get_handle_ (); + + private: + //FUZZ: disable check_for_lack_ACE_OS + virtual void recv (Message_ptr m); + //FUZZ: enable check_for_lack_ACE_OS + + private: + bool loop_; + Parameters const params_; + + Mutex mutex_; + Condition cond_; + + ACE_Unbounded_Queue<Message_ptr> queue_; + + ACE_Pipe signal_pipe_; + + ACE_Auto_Ptr<Fragment> fragment_; + ACE_Auto_Ptr<Reassemble> reassemble_; + ACE_Auto_Ptr<Acknowledge> acknowledge_; + ACE_Auto_Ptr<Retransmit> retransmit_; + ACE_Auto_Ptr<Flow> flow_; + ACE_Auto_Ptr<Link> link_; + }; + + + Socket_Impl:: + Socket_Impl (Address const& a, bool loop, Parameters const& params) + : loop_ (loop), + params_ (params), + cond_ (mutex_) + { + fragment_.reset (new Fragment (params_)); + reassemble_.reset (new Reassemble (params_)); + acknowledge_.reset (new Acknowledge (params_)); + retransmit_.reset (new Retransmit (params_)); + flow_.reset (new Flow (params_)); + link_.reset (new Link (a, params_)); + + // Start IN stack from top to bottom. + // + in_start (0); + fragment_->in_start (this); + reassemble_->in_start (fragment_.get ()); + acknowledge_->in_start (reassemble_.get ()); + retransmit_->in_start (acknowledge_.get ()); + flow_->in_start (retransmit_.get ()); + link_->in_start (flow_.get ()); + + // Start OUT stack from bottom up. + // + link_->out_start (0); + flow_->out_start (link_.get ()); + retransmit_->out_start (flow_.get ()); + acknowledge_->out_start (retransmit_.get ()); + reassemble_->out_start (acknowledge_.get ()); + fragment_->out_start (reassemble_.get ()); + out_start (fragment_.get ()); + } + + Socket_Impl:: + ~Socket_Impl () + { + // Stop OUT stack from top to bottom. + // + out_stop (); + fragment_->out_stop (); + reassemble_->out_stop (); + acknowledge_->out_stop (); + retransmit_->out_stop (); + flow_->out_stop (); + link_->out_stop (); + + // Stop IN stack from bottom up. + // + link_->in_stop (); + flow_->in_stop (); + retransmit_->in_stop (); + acknowledge_->in_stop (); + reassemble_->in_stop (); + fragment_->in_stop (); + in_stop (); + + // Close signal pipe. + // + if (signal_pipe_.read_handle () != ACE_INVALID_HANDLE) + signal_pipe_.close (); + } + + + void Socket_Impl:: + send_ (void const* buf, size_t s) + { + Message_ptr m (new Message); + + m->add (Profile_ptr (new Data (buf, s))); + + // Qualification is for VC6 and VxWorks. + // + Element::send (m); + } + + ssize_t Socket_Impl:: + recv_ (void* buf, + size_t s, + ACE_Time_Value const* timeout, + ACE_INET_Addr* from) + { + ACE_Time_Value abs_time; + + if (timeout) + abs_time = ACE_OS::gettimeofday () + *timeout; + + Lock l (mutex_); + + while (queue_.is_empty ()) + { + if (timeout) + { + if (cond_.wait (&abs_time) != -1) + break; + } + else + { + if (cond_.wait () != -1) + break; + } + + return -1; // errno is already set + } + + + Message_ptr m; + + if (queue_.dequeue_head (m) == -1) + ACE_OS::abort (); + + + if (queue_.is_empty ()) + { + // Remove data from the pipe. + // + if (signal_pipe_.read_handle () != ACE_INVALID_HANDLE) + { + char c; + + if (signal_pipe_.recv (&c, 1) != 1) + { + ACE_OS::perror ("read: "); + ACE_OS::abort (); + } + } + } + + if (from) + *from = static_cast<From const*> (m->find (From::id))->address (); + + if (m->find (NoData::id) != 0) + { + errno = ENOENT; + return -1; + } + + Data const* d = static_cast<Data const*>(m->find (Data::id)); + + ssize_t r (static_cast<ssize_t> (d->size () < s ? d->size () : s)); + + ACE_OS::memcpy (buf, d->buf (), r); + + return r; + } + + ssize_t Socket_Impl:: + size_ (ACE_Time_Value const* timeout) + { + ACE_Time_Value abs_time; + + if (timeout) + abs_time = ACE_OS::gettimeofday () + *timeout; + + Lock l (mutex_); + + while (queue_.is_empty ()) + { + if (timeout) + { + if (cond_.wait (&abs_time) != -1) + break; + } + else + { + if (cond_.wait () != -1) + break; + } + + return -1; // errno is already set + } + + // I can't get the head of the queue without actually dequeuing + // the element. + // + Message_ptr m; + + if (queue_.dequeue_head (m) == -1) + ACE_OS::abort (); + + if (queue_.enqueue_head (m) == -1) + ACE_OS::abort (); + + if (m->find (NoData::id) != 0) + { + errno = ENOENT; + return -1; + } + + Data const* d = static_cast<Data const*>(m->find (Data::id)); + + return static_cast<ssize_t> (d->size ()); + } + + ACE_HANDLE Socket_Impl:: + get_handle_ () + { + if (signal_pipe_.read_handle () == ACE_INVALID_HANDLE) + { + signal_pipe_.open (); + } + + return signal_pipe_.read_handle (); + } + + + void Socket_Impl::recv (Message_ptr m) + { + if (m->find (Data::id) != 0 || m->find (NoData::id) != 0) + { + if (!loop_) + { + Address to (static_cast<To const*> (m->find (To::id))->address ()); + + Address from ( + static_cast<From const*> (m->find (From::id))->address ()); + + if (to == from) + return; + } + + Lock l (mutex_); + + //if (queue_.size () != 0) + // cerr << "recv socket queue size: " << queue_.size () << endl; + + //FUZZ: disable check_for_lack_ACE_OS + bool signal (queue_.is_empty ()); + //FUZZ: enable check_for_lack_ACE_OS + + queue_.enqueue_tail (m); + + if (signal) + { + // Also write to the pipe. + if (signal_pipe_.write_handle () != ACE_INVALID_HANDLE) + { + char c; + + if (signal_pipe_.send (&c, 1) != 1) + { + // perror ("write: "); + ACE_OS::abort (); + } + } + + cond_.signal (); + } + } + } + + + // Socket + // + // + Socket:: + ~Socket () + { + } + + Socket:: + Socket (Address const& a, bool loop, Parameters const& params) + : impl_ (new Socket_Impl (a, loop, params)) + { + } + + void Socket::send (void const* buf, size_t s) + { + impl_->send_ (buf, s); + } + + ssize_t Socket::recv (void* buf, size_t s) + { + return impl_->recv_ (buf, s, 0, 0); + } + + ssize_t Socket::recv (void* buf, size_t s, ACE_INET_Addr& from) + { + return impl_->recv_ (buf, s, 0, &from); + } + + ssize_t Socket::recv (void* buf, size_t s, ACE_Time_Value const& timeout) + { + return impl_->recv_ (buf, s, &timeout, 0); + } + + ssize_t Socket::recv (void* buf, + size_t s, + ACE_Time_Value const& timeout, + ACE_INET_Addr& from) + { + return impl_->recv_ (buf, s, &timeout, &from); + } + + ssize_t Socket:: + size () + { + return impl_->size_ (0); + } + + ssize_t Socket:: + size (ACE_Time_Value const& timeout) + { + return impl_->size_ (&timeout); + } + + ACE_HANDLE Socket:: + get_handle () + { + return impl_->get_handle_ (); + } +} |