diff options
author | schmidt <douglascraigschmidt@users.noreply.github.com> | 2005-09-20 13:40:57 +0000 |
---|---|---|
committer | schmidt <douglascraigschmidt@users.noreply.github.com> | 2005-09-20 13:40:57 +0000 |
commit | 3a91c94da44c5ecc38034b5edc63ee02c8cd6456 (patch) | |
tree | e1450e5c84176d4200fb5ddc6a8b395e458a3923 /protocols | |
parent | b77a923d4dee47d6c8fa83b6c9e68d532c4fc929 (diff) | |
download | ATCD-3a91c94da44c5ecc38034b5edc63ee02c8cd6456.tar.gz |
ChangeLogTag:Tue Sep 20 10:10:00 UTC 2005 Simon Massey <simon.massey@prismtech.com>
Diffstat (limited to 'protocols')
-rw-r--r-- | protocols/ace/RMCast/Socket.cpp | 463 |
1 files changed, 231 insertions, 232 deletions
diff --git a/protocols/ace/RMCast/Socket.cpp b/protocols/ace/RMCast/Socket.cpp index 03564335ff3..448d4692e81 100644 --- a/protocols/ace/RMCast/Socket.cpp +++ b/protocols/ace/RMCast/Socket.cpp @@ -80,314 +80,313 @@ namespace ACE_RMCast Socket_Impl:: - Socket_Impl (Address const& a, bool loop, Parameters const& params) - : loop_ (loop), - params_ (params), - cond_ (mutex_) - { - fragment_.reset (new Fragment (params_)); - reassemble_.reset (new Reassemble (params_)); - acknowledge_.reset (new Acknowledge (params_)); - retransmit_.reset (new Retransmit (params_)); - flow_.reset (new Flow (params_)); - link_.reset (new Link (a, params_)); - - // Start IN stack from top to bottom. - // - in_start (0); - fragment_->in_start (this); - reassemble_->in_start (fragment_.get ()); - acknowledge_->in_start (reassemble_.get ()); - retransmit_->in_start (acknowledge_.get ()); - flow_->in_start (retransmit_.get ()); - link_->in_start (flow_.get ()); - - // Start OUT stack from bottom up. - // - link_->out_start (0); - flow_->out_start (link_.get ()); - retransmit_->out_start (flow_.get ()); - acknowledge_->out_start (retransmit_.get ()); - reassemble_->out_start (acknowledge_.get ()); - fragment_->out_start (reassemble_.get ()); - out_start (fragment_.get ()); - } + Socket_Impl (Address const& a, bool loop, Parameters const& params) + : loop_ (loop), + params_ (params), + cond_ (mutex_) + { + fragment_.reset (new Fragment (params_)); + reassemble_.reset (new Reassemble (params_)); + acknowledge_.reset (new Acknowledge (params_)); + retransmit_.reset (new Retransmit (params_)); + flow_.reset (new Flow (params_)); + link_.reset (new Link (a, params_)); + + // Start IN stack from top to bottom. + // + in_start (0); + fragment_->in_start (this); + reassemble_->in_start (fragment_.get ()); + acknowledge_->in_start (reassemble_.get ()); + retransmit_->in_start (acknowledge_.get ()); + flow_->in_start (retransmit_.get ()); + link_->in_start (flow_.get ()); + + // Start OUT stack from bottom up. + // + link_->out_start (0); + flow_->out_start (link_.get ()); + retransmit_->out_start (flow_.get ()); + acknowledge_->out_start (retransmit_.get ()); + reassemble_->out_start (acknowledge_.get ()); + fragment_->out_start (reassemble_.get ()); + out_start (fragment_.get ()); + } Socket_Impl:: - ~Socket_Impl () - { - // Stop OUT stack from top to bottom. - // - out_stop (); - fragment_->out_stop (); - reassemble_->out_stop (); - acknowledge_->out_stop (); - retransmit_->out_stop (); - flow_->out_stop (); - link_->out_stop (); - - // Stop IN stack from bottom up. - // - link_->in_stop (); - flow_->in_stop (); - retransmit_->in_stop (); - acknowledge_->in_stop (); - reassemble_->in_stop (); - fragment_->in_stop (); - in_stop (); - } + ~Socket_Impl () + { + // Stop OUT stack from top to bottom. + // + out_stop (); + fragment_->out_stop (); + reassemble_->out_stop (); + acknowledge_->out_stop (); + retransmit_->out_stop (); + flow_->out_stop (); + link_->out_stop (); + + // Stop IN stack from bottom up. + // + link_->in_stop (); + flow_->in_stop (); + retransmit_->in_stop (); + acknowledge_->in_stop (); + reassemble_->in_stop (); + fragment_->in_stop (); + in_stop (); + } void Socket_Impl:: - send_ (void const* buf, size_t s) - { - Message_ptr m (new Message); + send_ (void const* buf, size_t s) + { + Message_ptr m (new Message); - m->add (Profile_ptr (new Data (buf, s))); + m->add (Profile_ptr (new Data (buf, s))); - // Qualification is for VC6 and VxWorks. - // - Element::send (m); - } + // Qualification is for VC6 and VxWorks. + // + Element::send (m); + } ssize_t Socket_Impl:: - recv_ (void* buf, - size_t s, - ACE_Time_Value const* timeout, - ACE_INET_Addr* from) - { - ACE_Time_Value abs_time; - - if (timeout) - abs_time = ACE_OS::gettimeofday () + *timeout; - - Lock l (mutex_); - - while (queue_.is_empty ()) + recv_ (void* buf, + size_t s, + ACE_Time_Value const* timeout, + ACE_INET_Addr* from) { + ACE_Time_Value abs_time; + if (timeout) - { - if (cond_.wait (&abs_time) != -1) - break; - } - else - { - if (cond_.wait () != -1) - break; - } - - return -1; // errno is already set - } + abs_time = ACE_OS::gettimeofday () + *timeout; + Lock l (mutex_); + + while (queue_.is_empty ()) + { + if (timeout) + { + if (cond_.wait (&abs_time) != -1) + break; + } + else + { + if (cond_.wait () != -1) + break; + } + + return -1; // errno is already set + } - Message_ptr m; - if (queue_.dequeue_head (m) == -1) - abort (); + Message_ptr m; + if (queue_.dequeue_head (m) == -1) + abort (); - if (queue_.is_empty ()) - { - // Remove data from the pipe. - // - if (signal_pipe_.read_handle () != ACE_INVALID_HANDLE) - { - char c; - if (ACE_OS::read (signal_pipe_.read_handle (), &c, 1) != 1) + if (queue_.is_empty ()) { - perror ("read: "); - abort (); + // Remove data from the pipe. + // + if (signal_pipe_.read_handle () != ACE_INVALID_HANDLE) + { + char c; + + if (signal_pipe_.recv (&c, 1) != 1) + { + perror ("read: "); + abort (); + } + } } - } - } - if (from) - *from = static_cast<From const*> (m->find (From::id))->address (); + if (from) + *from = static_cast<From const*> (m->find (From::id))->address (); - if (m->find (NoData::id) != 0) - { - errno = ENOENT; - return -1; - } + if (m->find (NoData::id) != 0) + { + errno = ENOENT; + return -1; + } - Data const* d = static_cast<Data const*>(m->find (Data::id)); + Data const* d = static_cast<Data const*>(m->find (Data::id)); - ssize_t r (static_cast<ssize_t> (d->size () < s ? d->size () : s)); + ssize_t r (static_cast<ssize_t> (d->size () < s ? d->size () : s)); - ACE_OS::memcpy (buf, d->buf (), r); + ACE_OS::memcpy (buf, d->buf (), r); - return r; - } + return r; + } ssize_t Socket_Impl:: - size_ (ACE_Time_Value const* timeout) - { - ACE_Time_Value abs_time; + size_ (ACE_Time_Value const* timeout) + { + ACE_Time_Value abs_time; - if (timeout) - abs_time = ACE_OS::gettimeofday () + *timeout; + if (timeout) + abs_time = ACE_OS::gettimeofday () + *timeout; - Lock l (mutex_); + Lock l (mutex_); - while (queue_.is_empty ()) - { - if (timeout) - { - if (cond_.wait (&abs_time) != -1) - break; - } - else - { - if (cond_.wait () != -1) - break; - } - - return -1; // errno is already set - } + while (queue_.is_empty ()) + { + if (timeout) + { + if (cond_.wait (&abs_time) != -1) + break; + } + else + { + if (cond_.wait () != -1) + break; + } + + return -1; // errno is already set + } - // I can't get the head of the queue without actually dequeuing - // the element. - // - Message_ptr m; + // I can't get the head of the queue without actually dequeuing + // the element. + // + Message_ptr m; - if (queue_.dequeue_head (m) == -1) - abort (); + if (queue_.dequeue_head (m) == -1) + abort (); - if (queue_.enqueue_head (m) == -1) - abort (); + if (queue_.enqueue_head (m) == -1) + abort (); - if (m->find (NoData::id) != 0) - { - errno = ENOENT; - return -1; - } + if (m->find (NoData::id) != 0) + { + errno = ENOENT; + return -1; + } - Data const* d = static_cast<Data const*>(m->find (Data::id)); + Data const* d = static_cast<Data const*>(m->find (Data::id)); - return static_cast<ssize_t> (d->size ()); - } + return static_cast<ssize_t> (d->size ()); + } ACE_HANDLE Socket_Impl:: - get_handle_ () - { - if (signal_pipe_.read_handle () == ACE_INVALID_HANDLE) + get_handle_ () { - signal_pipe_.open (); - } + if (signal_pipe_.read_handle () == ACE_INVALID_HANDLE) + { + signal_pipe_.open (); + } - return signal_pipe_.read_handle (); - } + return signal_pipe_.read_handle (); + } void Socket_Impl:: - recv (Message_ptr m) - { - if (m->find (Data::id) != 0 || m->find (NoData::id) != 0) + recv (Message_ptr m) { - if (!loop_) - { - Address to (static_cast<To const*> (m->find (To::id))->address ()); + if (m->find (Data::id) != 0 || m->find (NoData::id) != 0) + { + if (!loop_) + { + Address to (static_cast<To const*> (m->find (To::id))->address ()); - Address from ( - static_cast<From const*> (m->find (From::id))->address ()); + Address from ( + static_cast<From const*> (m->find (From::id))->address ()); - if (to == from) - return; - } + if (to == from) + return; + } - Lock l (mutex_); + Lock l (mutex_); - //if (queue_.size () != 0) - // cerr << "recv socket queue size: " << queue_.size () << endl; + //if (queue_.size () != 0) + // cerr << "recv socket queue size: " << queue_.size () << endl; - bool signal (queue_.is_empty ()); + bool signal (queue_.is_empty ()); - queue_.enqueue_tail (m); + queue_.enqueue_tail (m); - if (signal) - { - // Also write to the pipe. - // - if (signal_pipe_.write_handle () != ACE_INVALID_HANDLE) - { - char c; + if (signal) + { + // Also write to the pipe. + if (signal_pipe_.write_handle () != ACE_INVALID_HANDLE) + { + char c; - if (ACE_OS::write (signal_pipe_.write_handle (), &c, 1) != 1) - { - // perror ("write: "); - abort (); - } - } + if (signal_pipe_.send (&c, 1) != 1) + { + // perror ("write: "); + abort (); + } + } - cond_.signal (); - } + cond_.signal (); + } + } } - } // Socket // // Socket:: - ~Socket () - { - } + ~Socket () + { + } Socket:: - Socket (Address const& a, bool loop, Parameters const& params) - : impl_ (new Socket_Impl (a, loop, params)) - { - } + Socket (Address const& a, bool loop, Parameters const& params) + : impl_ (new Socket_Impl (a, loop, params)) + { + } void Socket:: - send (void const* buf, size_t s) - { - impl_->send_ (buf, s); - } + send (void const* buf, size_t s) + { + impl_->send_ (buf, s); + } ssize_t Socket:: - recv (void* buf, size_t s) - { - return impl_->recv_ (buf, s, 0, 0); - } + recv (void* buf, size_t s) + { + return impl_->recv_ (buf, s, 0, 0); + } ssize_t Socket:: - recv (void* buf, size_t s, ACE_INET_Addr& from) - { - return impl_->recv_ (buf, s, 0, &from); - } + recv (void* buf, size_t s, ACE_INET_Addr& from) + { + return impl_->recv_ (buf, s, 0, &from); + } ssize_t Socket:: - recv (void* buf, size_t s, ACE_Time_Value const& timeout) - { - return impl_->recv_ (buf, s, &timeout, 0); - } + recv (void* buf, size_t s, ACE_Time_Value const& timeout) + { + return impl_->recv_ (buf, s, &timeout, 0); + } ssize_t Socket:: - recv (void* buf, - size_t s, - ACE_Time_Value const& timeout, - ACE_INET_Addr& from) - { - return impl_->recv_ (buf, s, &timeout, &from); - } + recv (void* buf, + size_t s, + ACE_Time_Value const& timeout, + ACE_INET_Addr& from) + { + return impl_->recv_ (buf, s, &timeout, &from); + } ssize_t Socket:: - size () - { - return impl_->size_ (0); - } + size () + { + return impl_->size_ (0); + } ssize_t Socket:: - size (ACE_Time_Value const& timeout) - { - return impl_->size_ (&timeout); - } + size (ACE_Time_Value const& timeout) + { + return impl_->size_ (&timeout); + } ACE_HANDLE Socket:: - get_handle () - { - return impl_->get_handle_ (); - } + get_handle () + { + return impl_->get_handle_ (); + } } |