summaryrefslogtreecommitdiff
path: root/protocols
diff options
context:
space:
mode:
authorschmidt <douglascraigschmidt@users.noreply.github.com>2005-09-20 13:40:57 +0000
committerschmidt <douglascraigschmidt@users.noreply.github.com>2005-09-20 13:40:57 +0000
commit3a91c94da44c5ecc38034b5edc63ee02c8cd6456 (patch)
treee1450e5c84176d4200fb5ddc6a8b395e458a3923 /protocols
parentb77a923d4dee47d6c8fa83b6c9e68d532c4fc929 (diff)
downloadATCD-3a91c94da44c5ecc38034b5edc63ee02c8cd6456.tar.gz
ChangeLogTag:Tue Sep 20 10:10:00 UTC 2005 Simon Massey <simon.massey@prismtech.com>
Diffstat (limited to 'protocols')
-rw-r--r--protocols/ace/RMCast/Socket.cpp463
1 files changed, 231 insertions, 232 deletions
diff --git a/protocols/ace/RMCast/Socket.cpp b/protocols/ace/RMCast/Socket.cpp
index 03564335ff3..448d4692e81 100644
--- a/protocols/ace/RMCast/Socket.cpp
+++ b/protocols/ace/RMCast/Socket.cpp
@@ -80,314 +80,313 @@ namespace ACE_RMCast
Socket_Impl::
- Socket_Impl (Address const& a, bool loop, Parameters const& params)
- : loop_ (loop),
- params_ (params),
- cond_ (mutex_)
- {
- fragment_.reset (new Fragment (params_));
- reassemble_.reset (new Reassemble (params_));
- acknowledge_.reset (new Acknowledge (params_));
- retransmit_.reset (new Retransmit (params_));
- flow_.reset (new Flow (params_));
- link_.reset (new Link (a, params_));
-
- // Start IN stack from top to bottom.
- //
- in_start (0);
- fragment_->in_start (this);
- reassemble_->in_start (fragment_.get ());
- acknowledge_->in_start (reassemble_.get ());
- retransmit_->in_start (acknowledge_.get ());
- flow_->in_start (retransmit_.get ());
- link_->in_start (flow_.get ());
-
- // Start OUT stack from bottom up.
- //
- link_->out_start (0);
- flow_->out_start (link_.get ());
- retransmit_->out_start (flow_.get ());
- acknowledge_->out_start (retransmit_.get ());
- reassemble_->out_start (acknowledge_.get ());
- fragment_->out_start (reassemble_.get ());
- out_start (fragment_.get ());
- }
+ Socket_Impl (Address const& a, bool loop, Parameters const& params)
+ : loop_ (loop),
+ params_ (params),
+ cond_ (mutex_)
+ {
+ fragment_.reset (new Fragment (params_));
+ reassemble_.reset (new Reassemble (params_));
+ acknowledge_.reset (new Acknowledge (params_));
+ retransmit_.reset (new Retransmit (params_));
+ flow_.reset (new Flow (params_));
+ link_.reset (new Link (a, params_));
+
+ // Start IN stack from top to bottom.
+ //
+ in_start (0);
+ fragment_->in_start (this);
+ reassemble_->in_start (fragment_.get ());
+ acknowledge_->in_start (reassemble_.get ());
+ retransmit_->in_start (acknowledge_.get ());
+ flow_->in_start (retransmit_.get ());
+ link_->in_start (flow_.get ());
+
+ // Start OUT stack from bottom up.
+ //
+ link_->out_start (0);
+ flow_->out_start (link_.get ());
+ retransmit_->out_start (flow_.get ());
+ acknowledge_->out_start (retransmit_.get ());
+ reassemble_->out_start (acknowledge_.get ());
+ fragment_->out_start (reassemble_.get ());
+ out_start (fragment_.get ());
+ }
Socket_Impl::
- ~Socket_Impl ()
- {
- // Stop OUT stack from top to bottom.
- //
- out_stop ();
- fragment_->out_stop ();
- reassemble_->out_stop ();
- acknowledge_->out_stop ();
- retransmit_->out_stop ();
- flow_->out_stop ();
- link_->out_stop ();
-
- // Stop IN stack from bottom up.
- //
- link_->in_stop ();
- flow_->in_stop ();
- retransmit_->in_stop ();
- acknowledge_->in_stop ();
- reassemble_->in_stop ();
- fragment_->in_stop ();
- in_stop ();
- }
+ ~Socket_Impl ()
+ {
+ // Stop OUT stack from top to bottom.
+ //
+ out_stop ();
+ fragment_->out_stop ();
+ reassemble_->out_stop ();
+ acknowledge_->out_stop ();
+ retransmit_->out_stop ();
+ flow_->out_stop ();
+ link_->out_stop ();
+
+ // Stop IN stack from bottom up.
+ //
+ link_->in_stop ();
+ flow_->in_stop ();
+ retransmit_->in_stop ();
+ acknowledge_->in_stop ();
+ reassemble_->in_stop ();
+ fragment_->in_stop ();
+ in_stop ();
+ }
void Socket_Impl::
- send_ (void const* buf, size_t s)
- {
- Message_ptr m (new Message);
+ send_ (void const* buf, size_t s)
+ {
+ Message_ptr m (new Message);
- m->add (Profile_ptr (new Data (buf, s)));
+ m->add (Profile_ptr (new Data (buf, s)));
- // Qualification is for VC6 and VxWorks.
- //
- Element::send (m);
- }
+ // Qualification is for VC6 and VxWorks.
+ //
+ Element::send (m);
+ }
ssize_t Socket_Impl::
- recv_ (void* buf,
- size_t s,
- ACE_Time_Value const* timeout,
- ACE_INET_Addr* from)
- {
- ACE_Time_Value abs_time;
-
- if (timeout)
- abs_time = ACE_OS::gettimeofday () + *timeout;
-
- Lock l (mutex_);
-
- while (queue_.is_empty ())
+ recv_ (void* buf,
+ size_t s,
+ ACE_Time_Value const* timeout,
+ ACE_INET_Addr* from)
{
+ ACE_Time_Value abs_time;
+
if (timeout)
- {
- if (cond_.wait (&abs_time) != -1)
- break;
- }
- else
- {
- if (cond_.wait () != -1)
- break;
- }
-
- return -1; // errno is already set
- }
+ abs_time = ACE_OS::gettimeofday () + *timeout;
+ Lock l (mutex_);
+
+ while (queue_.is_empty ())
+ {
+ if (timeout)
+ {
+ if (cond_.wait (&abs_time) != -1)
+ break;
+ }
+ else
+ {
+ if (cond_.wait () != -1)
+ break;
+ }
+
+ return -1; // errno is already set
+ }
- Message_ptr m;
- if (queue_.dequeue_head (m) == -1)
- abort ();
+ Message_ptr m;
+ if (queue_.dequeue_head (m) == -1)
+ abort ();
- if (queue_.is_empty ())
- {
- // Remove data from the pipe.
- //
- if (signal_pipe_.read_handle () != ACE_INVALID_HANDLE)
- {
- char c;
- if (ACE_OS::read (signal_pipe_.read_handle (), &c, 1) != 1)
+ if (queue_.is_empty ())
{
- perror ("read: ");
- abort ();
+ // Remove data from the pipe.
+ //
+ if (signal_pipe_.read_handle () != ACE_INVALID_HANDLE)
+ {
+ char c;
+
+ if (signal_pipe_.recv (&c, 1) != 1)
+ {
+ perror ("read: ");
+ abort ();
+ }
+ }
}
- }
- }
- if (from)
- *from = static_cast<From const*> (m->find (From::id))->address ();
+ if (from)
+ *from = static_cast<From const*> (m->find (From::id))->address ();
- if (m->find (NoData::id) != 0)
- {
- errno = ENOENT;
- return -1;
- }
+ if (m->find (NoData::id) != 0)
+ {
+ errno = ENOENT;
+ return -1;
+ }
- Data const* d = static_cast<Data const*>(m->find (Data::id));
+ Data const* d = static_cast<Data const*>(m->find (Data::id));
- ssize_t r (static_cast<ssize_t> (d->size () < s ? d->size () : s));
+ ssize_t r (static_cast<ssize_t> (d->size () < s ? d->size () : s));
- ACE_OS::memcpy (buf, d->buf (), r);
+ ACE_OS::memcpy (buf, d->buf (), r);
- return r;
- }
+ return r;
+ }
ssize_t Socket_Impl::
- size_ (ACE_Time_Value const* timeout)
- {
- ACE_Time_Value abs_time;
+ size_ (ACE_Time_Value const* timeout)
+ {
+ ACE_Time_Value abs_time;
- if (timeout)
- abs_time = ACE_OS::gettimeofday () + *timeout;
+ if (timeout)
+ abs_time = ACE_OS::gettimeofday () + *timeout;
- Lock l (mutex_);
+ Lock l (mutex_);
- while (queue_.is_empty ())
- {
- if (timeout)
- {
- if (cond_.wait (&abs_time) != -1)
- break;
- }
- else
- {
- if (cond_.wait () != -1)
- break;
- }
-
- return -1; // errno is already set
- }
+ while (queue_.is_empty ())
+ {
+ if (timeout)
+ {
+ if (cond_.wait (&abs_time) != -1)
+ break;
+ }
+ else
+ {
+ if (cond_.wait () != -1)
+ break;
+ }
+
+ return -1; // errno is already set
+ }
- // I can't get the head of the queue without actually dequeuing
- // the element.
- //
- Message_ptr m;
+ // I can't get the head of the queue without actually dequeuing
+ // the element.
+ //
+ Message_ptr m;
- if (queue_.dequeue_head (m) == -1)
- abort ();
+ if (queue_.dequeue_head (m) == -1)
+ abort ();
- if (queue_.enqueue_head (m) == -1)
- abort ();
+ if (queue_.enqueue_head (m) == -1)
+ abort ();
- if (m->find (NoData::id) != 0)
- {
- errno = ENOENT;
- return -1;
- }
+ if (m->find (NoData::id) != 0)
+ {
+ errno = ENOENT;
+ return -1;
+ }
- Data const* d = static_cast<Data const*>(m->find (Data::id));
+ Data const* d = static_cast<Data const*>(m->find (Data::id));
- return static_cast<ssize_t> (d->size ());
- }
+ return static_cast<ssize_t> (d->size ());
+ }
ACE_HANDLE Socket_Impl::
- get_handle_ ()
- {
- if (signal_pipe_.read_handle () == ACE_INVALID_HANDLE)
+ get_handle_ ()
{
- signal_pipe_.open ();
- }
+ if (signal_pipe_.read_handle () == ACE_INVALID_HANDLE)
+ {
+ signal_pipe_.open ();
+ }
- return signal_pipe_.read_handle ();
- }
+ return signal_pipe_.read_handle ();
+ }
void Socket_Impl::
- recv (Message_ptr m)
- {
- if (m->find (Data::id) != 0 || m->find (NoData::id) != 0)
+ recv (Message_ptr m)
{
- if (!loop_)
- {
- Address to (static_cast<To const*> (m->find (To::id))->address ());
+ if (m->find (Data::id) != 0 || m->find (NoData::id) != 0)
+ {
+ if (!loop_)
+ {
+ Address to (static_cast<To const*> (m->find (To::id))->address ());
- Address from (
- static_cast<From const*> (m->find (From::id))->address ());
+ Address from (
+ static_cast<From const*> (m->find (From::id))->address ());
- if (to == from)
- return;
- }
+ if (to == from)
+ return;
+ }
- Lock l (mutex_);
+ Lock l (mutex_);
- //if (queue_.size () != 0)
- // cerr << "recv socket queue size: " << queue_.size () << endl;
+ //if (queue_.size () != 0)
+ // cerr << "recv socket queue size: " << queue_.size () << endl;
- bool signal (queue_.is_empty ());
+ bool signal (queue_.is_empty ());
- queue_.enqueue_tail (m);
+ queue_.enqueue_tail (m);
- if (signal)
- {
- // Also write to the pipe.
- //
- if (signal_pipe_.write_handle () != ACE_INVALID_HANDLE)
- {
- char c;
+ if (signal)
+ {
+ // Also write to the pipe.
+ if (signal_pipe_.write_handle () != ACE_INVALID_HANDLE)
+ {
+ char c;
- if (ACE_OS::write (signal_pipe_.write_handle (), &c, 1) != 1)
- {
- // perror ("write: ");
- abort ();
- }
- }
+ if (signal_pipe_.send (&c, 1) != 1)
+ {
+ // perror ("write: ");
+ abort ();
+ }
+ }
- cond_.signal ();
- }
+ cond_.signal ();
+ }
+ }
}
- }
// Socket
//
//
Socket::
- ~Socket ()
- {
- }
+ ~Socket ()
+ {
+ }
Socket::
- Socket (Address const& a, bool loop, Parameters const& params)
- : impl_ (new Socket_Impl (a, loop, params))
- {
- }
+ Socket (Address const& a, bool loop, Parameters const& params)
+ : impl_ (new Socket_Impl (a, loop, params))
+ {
+ }
void Socket::
- send (void const* buf, size_t s)
- {
- impl_->send_ (buf, s);
- }
+ send (void const* buf, size_t s)
+ {
+ impl_->send_ (buf, s);
+ }
ssize_t Socket::
- recv (void* buf, size_t s)
- {
- return impl_->recv_ (buf, s, 0, 0);
- }
+ recv (void* buf, size_t s)
+ {
+ return impl_->recv_ (buf, s, 0, 0);
+ }
ssize_t Socket::
- recv (void* buf, size_t s, ACE_INET_Addr& from)
- {
- return impl_->recv_ (buf, s, 0, &from);
- }
+ recv (void* buf, size_t s, ACE_INET_Addr& from)
+ {
+ return impl_->recv_ (buf, s, 0, &from);
+ }
ssize_t Socket::
- recv (void* buf, size_t s, ACE_Time_Value const& timeout)
- {
- return impl_->recv_ (buf, s, &timeout, 0);
- }
+ recv (void* buf, size_t s, ACE_Time_Value const& timeout)
+ {
+ return impl_->recv_ (buf, s, &timeout, 0);
+ }
ssize_t Socket::
- recv (void* buf,
- size_t s,
- ACE_Time_Value const& timeout,
- ACE_INET_Addr& from)
- {
- return impl_->recv_ (buf, s, &timeout, &from);
- }
+ recv (void* buf,
+ size_t s,
+ ACE_Time_Value const& timeout,
+ ACE_INET_Addr& from)
+ {
+ return impl_->recv_ (buf, s, &timeout, &from);
+ }
ssize_t Socket::
- size ()
- {
- return impl_->size_ (0);
- }
+ size ()
+ {
+ return impl_->size_ (0);
+ }
ssize_t Socket::
- size (ACE_Time_Value const& timeout)
- {
- return impl_->size_ (&timeout);
- }
+ size (ACE_Time_Value const& timeout)
+ {
+ return impl_->size_ (&timeout);
+ }
ACE_HANDLE Socket::
- get_handle ()
- {
- return impl_->get_handle_ ();
- }
+ get_handle ()
+ {
+ return impl_->get_handle_ ();
+ }
}