From e0855857e56fd07749b4eddec8fb09e2295137cc Mon Sep 17 00:00:00 2001 From: nobody Date: Thu, 8 Jan 2004 13:51:26 +0000 Subject: This commit was manufactured by cvs2svn to create branch 'unlabeled-1.3.2'. --- protocols/ace/TMCast/LinkListener.hpp | 166 ++++++++++++++++++++++++++++++++++ protocols/ace/TMCast/TMCast.mpc | 8 ++ 2 files changed, 174 insertions(+) create mode 100644 protocols/ace/TMCast/LinkListener.hpp create mode 100644 protocols/ace/TMCast/TMCast.mpc diff --git a/protocols/ace/TMCast/LinkListener.hpp b/protocols/ace/TMCast/LinkListener.hpp new file mode 100644 index 00000000000..aee1263aa0a --- /dev/null +++ b/protocols/ace/TMCast/LinkListener.hpp @@ -0,0 +1,166 @@ +// file : TMCast/LinkListener.hpp +// author : Boris Kolpackov +// cvs-id : $Id$ + +// OS primitives +#include +#include +#include + + +#include "Messaging.hpp" + +namespace TMCast +{ + // + // + // + class LinkFailure : public virtual Message {}; + + + // + // + // + class LinkData : public virtual Message + { + public: + LinkData (Protocol::MessageHeader const* header, + void* payload, + size_t size) + : size_ (size) + { + ACE_OS::memcpy (&header_, header, sizeof (Protocol::MessageHeader)); + ACE_OS::memcpy (payload_, payload, size_); + } + + Protocol::MessageHeader const& + header () const + { + return header_; + } + + void const* + payload () const + { + return payload_; + } + + size_t + size () const + { + return size_; + } + + private: + Protocol::MessageHeader header_; + char payload_[Protocol::MAX_MESSAGE_SIZE]; + size_t size_; + }; + + typedef + ACE_Refcounted_Auto_Ptr + LinkDataPtr; + + // + // + // + class LinkListener + { + private: + class Terminate : public virtual Message {}; + + public: + LinkListener (ACE_SOCK_Dgram_Mcast& sock, MessageQueue& out) + : sock_(sock), out_ (out) + { + if (ACE_OS::thr_create (&thread_thunk, + this, + THR_JOINABLE, + &thread_) != 0) ::abort (); + } + + ~LinkListener () + { + { + MessageQueueAutoLock lock (control_); + + control_.push (MessagePtr (new Terminate)); + } + + if (ACE_OS::thr_join (thread_, &thread_, 0) != 0) ::abort (); + + // cerr << "Link listener is down." << endl; + } + + + static ACE_THR_FUNC_RETURN + thread_thunk (void* arg) + { + LinkListener* obj = reinterpret_cast (arg); + + obj->execute (); + return 0; + } + + void + execute () + { + char msg[Protocol::MAX_MESSAGE_SIZE]; + + ssize_t header_size = sizeof (Protocol::MessageHeader); + + // OS::Time timeout (1000000); // one millisecond + + ACE_Time_Value timeout (0, 1000); // one millisecond + + try + { + while (true) + { + // Check control message queue + + { + MessageQueueAutoLock lock (control_); + + if (!control_.empty ()) break; + } + + ACE_Addr junk; + ssize_t n = sock_.recv (msg, + Protocol::MAX_MESSAGE_SIZE, + junk, + 0, + &timeout); + + if (n != -1) + { + if (n < header_size) throw false; + + Protocol::MessageHeader* header = + reinterpret_cast (msg); + + MessageQueueAutoLock lock (out_); + + out_.push (MessagePtr (new LinkData (header, + msg + header_size, + n - header_size))); + } + } + } + catch (...) + { + MessageQueueAutoLock lock (out_); + + out_.push (MessagePtr (new LinkFailure)); + } + } + + private: + typedef ACE_Guard AutoLock; + + ACE_thread_t thread_; + ACE_SOCK_Dgram_Mcast& sock_; + MessageQueue& out_; + MessageQueue control_; + }; +} diff --git a/protocols/ace/TMCast/TMCast.mpc b/protocols/ace/TMCast/TMCast.mpc new file mode 100644 index 00000000000..0899982dd09 --- /dev/null +++ b/protocols/ace/TMCast/TMCast.mpc @@ -0,0 +1,8 @@ +// -*- MPC -*- +// $Id$ + +project : acelib, core { + requires += exceptions + sharedname = TMCast + dynamicflags += TMCAST_BUILD_DLL +} -- cgit v1.2.1