summaryrefslogtreecommitdiff
path: root/protocols/ace/RMCast/Socket.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'protocols/ace/RMCast/Socket.cpp')
-rw-r--r--protocols/ace/RMCast/Socket.cpp112
1 files changed, 112 insertions, 0 deletions
diff --git a/protocols/ace/RMCast/Socket.cpp b/protocols/ace/RMCast/Socket.cpp
new file mode 100644
index 00000000000..640a1a7d694
--- /dev/null
+++ b/protocols/ace/RMCast/Socket.cpp
@@ -0,0 +1,112 @@
+// file : ace/RMCast/Socket.cpp
+// author : Boris Kolpackov <boris@kolpackov.net>
+// cvs-id : $Id$
+
+#include <ace/OS.h>
+
+#include <ace/RMCast/Socket.h>
+
+namespace ACE_RMCast
+{
+ Socket::
+ Socket (Address const& a, bool loop)
+ : loop_ (loop), sn_ (1), cond_ (mutex_)
+ {
+ acknowledge_ = auto_ptr<Acknowledge> (new Acknowledge ());
+ retransmit_ = auto_ptr<Retransmit> (new Retransmit ());
+ simulator_ = auto_ptr<Simulator> (new Simulator ());
+ link_ = auto_ptr<Link> (new Link (a));
+
+ // Start IN stack from top to bottom.
+ //
+ in_start (0);
+ acknowledge_->in_start (this);
+ retransmit_->in_start (acknowledge_.get ());
+ simulator_->in_start (retransmit_.get ());
+ link_->in_start (simulator_.get ());
+
+ // Start OUT stack from bottom up.
+ //
+ link_->out_start (0);
+ simulator_->out_start (link_.get ());
+ retransmit_->out_start (simulator_.get ());
+ acknowledge_->out_start (retransmit_.get ());
+ out_start (acknowledge_.get ());
+ }
+
+ Socket::
+ ~Socket ()
+ {
+ // Stop OUT stack from top to bottom.
+ //
+ out_stop ();
+ acknowledge_->out_stop ();
+ retransmit_->out_stop ();
+ simulator_->out_stop ();
+ link_->out_stop ();
+
+ // Stop IN stack from bottom up.
+ //
+ link_->in_stop ();
+ simulator_->in_stop ();
+ retransmit_->in_stop ();
+ acknowledge_->in_stop ();
+ in_stop ();
+ }
+
+
+ void Socket::
+ send (void const* buf, size_t s)
+ {
+ Message_ptr m (new Message);
+
+ m->add (Profile_ptr (new SN (sn_++)));
+ m->add (Profile_ptr (new Data (buf, s)));
+
+ send (m);
+ }
+
+ size_t Socket::
+ recv (void* buf, size_t s)
+ {
+ Lock l (mutex_);
+
+ while (queue_.is_empty ()) cond_.wait ();
+
+ Message_ptr m;
+ if (queue_.dequeue_head (m) == -1) abort ();
+
+ Data const* d (static_cast<Data const*>(m->find (Data::id)));
+
+ size_t r (d->size () < s ? d->size () : s);
+
+ ACE_OS::memcpy (buf, d->buf (), r);
+
+ return r;
+ }
+
+ void Socket::
+ recv (Message_ptr m)
+ {
+ if (m->find (Data::id) != 0)
+ {
+ if (!loop_)
+ {
+ Address to (static_cast<To const*> (m->find (To::id))->address ());
+
+ Address from (
+ static_cast<From const*> (m->find (From::id))->address ());
+
+ if (to == from) return;
+ }
+
+ Lock l (mutex_);
+
+ bool signal (queue_.is_empty ());
+
+ queue_.enqueue_tail (m);
+
+ if (signal) cond_.signal ();
+ }
+ }
+}