From fa8b7676a7d1aa93572fdc12cba2e4800d563f1a Mon Sep 17 00:00:00 2001 From: coryan Date: Fri, 10 Jul 1998 22:30:59 +0000 Subject: ChangeLogTag:Fri Jul 10 17:29:25 1998 Carlos O'Ryan --- TAO/ChangeLog-98c | 42 + TAO/orbsvcs/Event_Service/Event_Service.cpp | 18 +- TAO/orbsvcs/orbsvcs/Event/EC_Gateway.cpp | 15 +- TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.cpp | 482 +++++++++ TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.h | 249 +++++ TAO/orbsvcs/orbsvcs/Event/Event_Channel.cpp | 138 ++- TAO/orbsvcs/orbsvcs/Event/Event_Channel.h | 17 +- TAO/orbsvcs/orbsvcs/Event/Event_Channel.i | 6 + TAO/orbsvcs/orbsvcs/Makefile | 262 +++++ TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.cpp | 1239 ++++++++++++++++++++++ TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.h | 344 ++++++ TAO/orbsvcs/tests/EC_Mcast/Makefile | 34 + TAO/orbsvcs/tests/EC_Mcast/svc.conf | 7 + TAO/orbsvcs/tests/EC_Multiple/EC_Multiple.cpp | 10 +- TAO/orbsvcs/tests/EC_Throughput/ECT_Consumer.cpp | 5 +- TAO/tao/CDR.cpp | 54 - TAO/tao/CDR.h | 4 + TAO/tao/CDR.i | 56 + TAO/tao/GIOP.cpp | 2 +- 19 files changed, 2900 insertions(+), 84 deletions(-) create mode 100644 TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.cpp create mode 100644 TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.h create mode 100644 TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.cpp create mode 100644 TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.h create mode 100644 TAO/orbsvcs/tests/EC_Mcast/Makefile create mode 100644 TAO/orbsvcs/tests/EC_Mcast/svc.conf diff --git a/TAO/ChangeLog-98c b/TAO/ChangeLog-98c index 890dc13e4a1..ba085bd7644 100644 --- a/TAO/ChangeLog-98c +++ b/TAO/ChangeLog-98c @@ -1,3 +1,45 @@ +Fri Jul 10 17:29:25 1998 Carlos O'Ryan + + * orbsvcs/tests/EC_Mcast/Makefile: + * orbsvcs/tests/EC_Mcast/svc.conf: + * orbsvcs/tests/EC_Mcast/EC_Mcast.h: + * orbsvcs/tests/EC_Mcast/EC_Mcast.cpp: + * orbsvcs/orbsvcs/Makefile: + * orbsvcs/orbsvcs/Event/EC_Gateway_UDP.h: + * orbsvcs/orbsvcs/Event/EC_Gateway_UDP.cpp: + Added the first implementation of a UDP/Mcast based gateway to + the EC. The current classes require a lot of manual + configuration. + Also implemented a small test to show the usage. + + * tao/CDR.h: + * tao/CDR.i: + * tao/CDR.cpp: + Give access to external users of the swap_ methods, also + provide an accesor for the byte swapping flag in OutputCDRs. + + * tao/GIOP.cpp: + Use the new byte swap accessor. + + * orbsvcs/orbsvcs/Event/Event_Channel.h: + * orbsvcs/orbsvcs/Event/Event_Channel.i: + * orbsvcs/orbsvcs/Event/Event_Channel.cpp: + Completed support for gateways that are interested in the + supplier list changes. It is now possible to add Gateways + *after* the consumer or supplier list has stabilized. + + * orbsvcs/orbsvcs/Event/EC_Gateway.cpp: + Do nothing if receive a supplier update. + + * orbsvcs/tests/EC_Throughput/ECT_Consumer.cpp: + Destroy the Event Channel *after* the consumers unsubscribe from + it. + + * orbsvcs/tests/EC_Multiple/EC_Multiple.cpp: + * orbsvcs/Event_Service/Event_Service.cpp: + Before a normal exit we unbind the Event Service (and, if local, + the Scheduling Service) from the Naming Service. + Fri Jul 10 17:03:11 1998 Seth Benjamin Widoff * orbsvcs/orbsvcs/Trader/Constraint_Nodes.cpp: diff --git a/TAO/orbsvcs/Event_Service/Event_Service.cpp b/TAO/orbsvcs/Event_Service/Event_Service.cpp index 4d8ba9d5f3e..a7f25c69d17 100644 --- a/TAO/orbsvcs/Event_Service/Event_Service.cpp +++ b/TAO/orbsvcs/Event_Service/Event_Service.cpp @@ -113,6 +113,12 @@ int main (int argc, char *argv[]) auto_ptr scheduler_impl; RtecScheduler::Scheduler_var scheduler; + // This is the name we (potentially) register the Scheduling + // Service in the Naming Service. + CosNaming::Name schedule_name (1); + schedule_name.length (1); + schedule_name[0].id = CORBA::string_dup ("ScheduleService"); + if (global_scheduler == 0) { scheduler_impl = @@ -129,9 +135,6 @@ int main (int argc, char *argv[]) str.in ())); // Register the servant with the Naming Context.... - CosNaming::Name schedule_name (1); - schedule_name.length (1); - schedule_name[0].id = CORBA::string_dup ("ScheduleService"); naming_context->bind (schedule_name, scheduler.in (), TAO_TRY_ENV); TAO_CHECK_ENV; } @@ -164,6 +167,15 @@ int main (int argc, char *argv[]) if (orb->run () == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "run"), 1); + naming_context->unbind (channel_name, TAO_TRY_ENV); + TAO_CHECK_ENV; + + if (global_scheduler == 0) + { + naming_context->unbind (schedule_name, TAO_TRY_ENV); + TAO_CHECK_ENV; + } + } TAO_CATCHANY { diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.cpp index 1eab11d593e..ed67ff3c0a4 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.cpp @@ -186,20 +186,7 @@ TAO_EC_Gateway_IIOP::update_supplier (RtecEventChannelAdmin::ConsumerQOS& sub, RtecEventChannelAdmin::SupplierQOS& pub, CORBA::Environment& env) { - this->close (env); - if (env.exception () != 0) return; - - for (CORBA::ULong i = 0; i < sub.dependencies.length (); ++i) - { - sub.dependencies[i].rt_info = this->rmt_info_; - } - - for (CORBA::ULong j = 0; j < pub.publications.length (); ++j) - { - pub.publications[j].dependency_info.rt_info = this->lcl_info_; - } - - this->open (sub, pub, env); + // Do nothing... } void diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.cpp new file mode 100644 index 00000000000..f31281ec400 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.cpp @@ -0,0 +1,482 @@ +// +// $Id$ +// + +#include "orbsvcs/Event/EC_Gateway_UDP.h" +#include "orbsvcs/Event_Utilities.h" +#include "orbsvcs/Time_Utilities.h" + +// **************************************************************** + +TAO_ECG_UDP_Sender::TAO_ECG_UDP_Sender (void) +{ +} + +int +TAO_ECG_UDP_Sender::get_local_addr (ACE_INET_Addr& addr) +{ + return this->dgram_.get_local_addr (addr); +} + +void +TAO_ECG_UDP_Sender::init (RtecEventChannelAdmin::EventChannel_ptr lcl_ec, + RtecScheduler::Scheduler_ptr lcl_sched, + const char* lcl_name, + const ACE_INET_Addr& ipaddr, + CORBA::Environment &_env) +{ + this->lcl_ec_ = + RtecEventChannelAdmin::EventChannel::_duplicate (lcl_ec); + + this->lcl_info_ = + lcl_sched->create (lcl_name, _env); + if (_env.exception () != 0) return; + + ACE_Time_Value tv (0, 500); + TimeBase::TimeT time; + ORBSVCS_Time::Time_Value_to_TimeT (time, tv); + lcl_sched->set (this->lcl_info_, + RtecScheduler::VERY_HIGH_CRITICALITY, + time, time, time, + 25000 * 10, + RtecScheduler::VERY_LOW_IMPORTANCE, + time, + 0, + RtecScheduler::OPERATION, + _env); + if (_env.exception () != 0) return; + + if (this->dgram_.open (ipaddr) == -1) + { + // @@ TODO Use a Event Channel specific exception + ACE_ERROR ((LM_ERROR, "ECG_UDP::init - Dgram open failed\n")); + _env.exception (new CORBA::COMM_FAILURE (CORBA::COMPLETED_NO)); + } + if (_env.exception () != 0) return; +} + +void +TAO_ECG_UDP_Sender::shutdown (CORBA::Environment& _env) +{ + this->close (_env); + if (_env.exception () == 0) return; + this->lcl_ec_ = RtecEventChannelAdmin::EventChannel::_nil (); + + if (this->dgram_.close () == -1) + { + // @@ TODO Use a Event Channel specific exception + ACE_ERROR ((LM_ERROR, "ECG_UDP::init - Dgram close failed\n")); + _env.exception (new CORBA::COMM_FAILURE (CORBA::COMPLETED_NO)); + } + if (_env.exception () != 0) return; +} + +void +TAO_ECG_UDP_Sender::open (RtecEventChannelAdmin::ConsumerQOS& sub, + CORBA::Environment &_env) +{ + // ACE_DEBUG ((LM_DEBUG, "ECG (%t) Open gateway\n")); + if (CORBA::is_nil (this->lcl_ec_.in ())) + return; + + if (!CORBA::is_nil (this->supplier_proxy_.in ())) + this->close (_env); + if (_env.exception () != 0) return; + + if (sub.dependencies.length () == 0) + return; + for (CORBA::ULong j = 0; j < sub.dependencies.length (); ++j) + { + sub.dependencies[j].rt_info = this->lcl_info_; + } + + //ACE_DEBUG ((LM_DEBUG, "ECG (%t) Gateway/Supplier ")); + //ACE_SupplierQOS_Factory::debug (pub); + + RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin = + this->lcl_ec_->for_consumers (_env); + if (_env.exception () != 0) return; + + this->supplier_proxy_ = + consumer_admin->obtain_push_supplier (_env); + if (_env.exception () != 0) return; + + RtecEventComm::PushConsumer_var consumer_ref = + this->_this (_env); + if (_env.exception () != 0) return; + + //ACE_DEBUG ((LM_DEBUG, "ECG (%t) Gateway/Consumer ")); + //ACE_ConsumerQOS_Factory::debug (sub); + + this->supplier_proxy_->connect_push_consumer (consumer_ref.in (), + sub, + _env); + if (_env.exception () != 0) return; +} + +void +TAO_ECG_UDP_Sender::close (CORBA::Environment &env) +{ + // ACE_DEBUG ((LM_DEBUG, "ECG (%t) Closing gateway\n")); + if (CORBA::is_nil (this->supplier_proxy_.in ())) + return; + + this->supplier_proxy_->disconnect_push_supplier (env); + if (env.exception () != 0) return; + this->supplier_proxy_ = + RtecEventChannelAdmin::ProxyPushSupplier::_nil (); +} + +void +TAO_ECG_UDP_Sender::disconnect_push_consumer (CORBA::Environment &) +{ + ACE_DEBUG ((LM_DEBUG, + "ECG (%t): Supplier-consumer received " + "disconnect from channel.\n")); +} + +void +TAO_ECG_UDP_Sender::push (const RtecEventComm::EventSet &events, + CORBA::Environment & _env) +{ + ACE_DEBUG ((LM_DEBUG, "ECG_UDP_Sender::push - ")); + + if (events.length () == 0) + { + ACE_DEBUG ((LM_DEBUG, "no events\n")); + return; + } + + ACE_DEBUG ((LM_DEBUG, "%d event(s) - ", events.length ())); + + // @@ TODO, there is an extra data copy here, we should do the event + // modification without it and only compact the necessary events. + int count = 0; + RtecEventComm::EventSet out (events.length ()); + for (u_int i = 0; i < events.length (); ++i) + { + //ACE_DEBUG ((LM_DEBUG, "type = %d ", events[i].type_)); + if (events[i].ttl_ > 0) + { + count++; + out.length (count); + out[count - 1] = events[i]; + out[count - 1].ttl_--; + } + } + ACE_DEBUG ((LM_DEBUG, "count = %d\n", count)); + + if (count > 0) + { + TAO_OutputCDR cdr; + cdr.write_boolean (TAO_ENCAP_BYTE_ORDER); + cdr.write_ulong (0); // Place holder for size... + + cdr.encode (RtecEventComm::_tc_EventSet, &out, 0, _env); + if (_env.exception () != 0) return; + + CORBA::ULong bodylen = cdr.total_length (); + char* buf = ACE_const_cast(char*,cdr.buffer ()); + buf += 4; +#if !defined (TAO_ENABLE_SWAP_ON_WRITE) + *ACE_reinterpret_cast(CORBA::ULong*,buf) = bodylen; +#else + if (!cdr.do_byte_swap ()) + { + *ACE_reinterpret_cast(CORBA::ULong*, buf) = bodylen; + } + else + { + CDR::swap_4 (ACE_reinterpret_cast(char*,&bodylen), buf); + } +#endif + + // This is a good maximum, because Dgrams cannot be longer than + // 64K and the usual size for a CDR fragment is 512 bytes, still + // if this is not enough we allocate memory from the heap. + const int TAO_WRITEV_MAX = 128; + ACE_IO_Vector iov[TAO_WRITEV_MAX]; + + int iovcnt = 0; + for (const ACE_Message_Block* i = cdr.begin (); + i != cdr.end () && iovcnt < TAO_WRITEV_MAX; + i = i->cont ()) + { + iov[iovcnt].buffer (i->rd_ptr ()); + iov[iovcnt].length (i->length ()); + iovcnt++; + } + + ssize_t n = this->dgram_.send (iov, iovcnt); + if (n == -1) + { + // @@ TODO Use a Event Channel specific exception + ACE_DEBUG ((LM_DEBUG, + "ECG_UDP (%t) send failed %p\n", "")); + _env.exception (new CORBA::COMM_FAILURE (CORBA::COMPLETED_NO)); + } + else if (n == 0) + { + // @@ TODO Use a Event Channel specific exception + ACE_DEBUG ((LM_DEBUG, + "ECG_UDP (%t) EOF on send \n")); + _env.exception (new CORBA::COMM_FAILURE (CORBA::COMPLETED_NO)); + } + } +} + +// **************************************************************** + +TAO_ECG_UDP_Receiver::TAO_ECG_UDP_Receiver (void) +{ +} + +void +TAO_ECG_UDP_Receiver::init (RtecEventChannelAdmin::EventChannel_ptr lcl_ec, + RtecScheduler::Scheduler_ptr lcl_sched, + const char* lcl_name, + const ACE_INET_Addr& ignore_from, + CORBA::Environment &_env) +{ + this->ignore_from_ = ignore_from; + + this->lcl_ec_ = + RtecEventChannelAdmin::EventChannel::_duplicate (lcl_ec); + + this->lcl_info_ = + lcl_sched->create (lcl_name, _env); + if (_env.exception () != 0) return; + + ACE_Time_Value tv (0, 500); + TimeBase::TimeT time; + ORBSVCS_Time::Time_Value_to_TimeT (time, tv); + lcl_sched->set (this->lcl_info_, + RtecScheduler::VERY_HIGH_CRITICALITY, + time, time, time, + 25000 * 10, + RtecScheduler::VERY_LOW_IMPORTANCE, + time, + 1, + RtecScheduler::OPERATION, + _env); + if (_env.exception () != 0) return; +} + +void +TAO_ECG_UDP_Receiver::open (RtecEventChannelAdmin::SupplierQOS& pub, + CORBA::Environment &_env) +{ + if (CORBA::is_nil (this->lcl_ec_.in ())) + return; + + if (!CORBA::is_nil (this->consumer_proxy_.in ())) + this->close (_env); + if (_env.exception () != 0) return; + + if (pub.publications.length () == 0) + return; + + for (CORBA::ULong i = 0; i < pub.publications.length (); ++i) + { + pub.publications[i].dependency_info.rt_info = this->lcl_info_; + } + + // = Connect as a supplier to the local EC + RtecEventChannelAdmin::SupplierAdmin_var supplier_admin = + this->lcl_ec_->for_suppliers (_env); + if (_env.exception () != 0) return; + + this->consumer_proxy_ = + supplier_admin->obtain_push_consumer (_env); + if (_env.exception () != 0) return; + + RtecEventComm::PushSupplier_var supplier_ref = + this->_this (_env); + if (_env.exception () != 0) return; + + ACE_DEBUG ((LM_DEBUG, "ECG_UDP_Receiver (%t) Gateway/Supplier ")); + ACE_SupplierQOS_Factory::debug (pub); + + this->consumer_proxy_->connect_push_supplier (supplier_ref.in (), + pub, + _env); + if (_env.exception () != 0) return; +} + +void +TAO_ECG_UDP_Receiver::close (CORBA::Environment &env) +{ + // ACE_DEBUG ((LM_DEBUG, "ECG (%t) Closing gateway\n")); + if (CORBA::is_nil (this->consumer_proxy_.in ())) + return; + + this->consumer_proxy_->disconnect_push_consumer (env); + if (env.exception () != 0) return; + this->consumer_proxy_ = + RtecEventChannelAdmin::ProxyPushConsumer::_nil (); + +} + +void +TAO_ECG_UDP_Receiver::disconnect_push_supplier (CORBA::Environment &) +{ + ACE_DEBUG ((LM_DEBUG, + "ECG (%t): Supplier received " + "disconnect from channel.\n")); +} + +void +TAO_ECG_UDP_Receiver::shutdown (CORBA::Environment& _env) +{ + this->close (_env); + if (_env.exception () == 0) return; + + this->lcl_ec_ = RtecEventChannelAdmin::EventChannel::_nil (); +} + +int +TAO_ECG_UDP_Receiver::handle_input (ACE_SOCK_Dgram& dgram) +{ + // Use ULong so the alignment is right. + CORBA::ULong header[2]; + ACE_Addr from; + + ssize_t n = dgram.recv (header, sizeof(header), from, MSG_PEEK); + if (n == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "ECG_UDP_Receive_EH::handle_input - peek\n"), -1); + else if (n == 0) + ACE_ERROR_RETURN ((LM_ERROR, + "ECG_UDP_Receive_EH::handle_input - peek 0\n"), + 0); + + char* buf = ACE_reinterpret_cast(char*,header); + int byte_order = buf[0]; + CORBA::ULong length = header[1]; + + if (byte_order != TAO_ENCAP_BYTE_ORDER) + { + CDR::swap_4 (buf + 4, + ACE_reinterpret_cast(char*,&length)); + } + + ACE_Message_Block mb (length + CDR::MAX_ALIGNMENT); + CDR::mb_align (&mb); + mb.wr_ptr (length); + + n = dgram.recv (mb.rd_ptr (), length, from); + + if (n == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "ECG_UDP_Receive_EH::handle_input - read\n"), -1); + else if (n == 0) + ACE_ERROR_RETURN ((LM_ERROR, + "ECG_UDP_Receive_EH::handle_input - read 0\n"), + 0); + // This is to avoid receiving the events we send; notice that we + // drop the message after reading it. + if (from == this->ignore_from_) + return 0; + + TAO_TRY + { + TAO_InputCDR cdr (&mb, byte_order); + cdr.skip_bytes (8); // skip the header... + + RtecEventComm::EventSet events; + cdr.decode (RtecEventComm::_tc_EventSet, &events, 0, + TAO_TRY_ENV); + TAO_CHECK_ENV; + + this->consumer_proxy_->push (events, TAO_TRY_ENV); + TAO_CHECK_ENV; + } + TAO_CATCHANY + { + TAO_TRY_ENV.print_exception ("ECG_UDP_Receive_EH::handle_input"); + } + TAO_ENDTRY; + return 0; +} + +// **************************************************************** + +TAO_ECG_UDP_EH::TAO_ECG_UDP_EH (TAO_ECG_UDP_Receiver *recv) + : receiver_ (recv) +{ +} + +int +TAO_ECG_UDP_EH::open (const ACE_INET_Addr& ipaddr) +{ + if (this->dgram_.open (ipaddr) == -1) + return -1; + return this->reactor ()->register_handler (this, + ACE_Event_Handler::READ_MASK); +} + +int +TAO_ECG_UDP_EH::close (void) +{ + if (this->reactor ()->remove_handler (this, + ACE_Event_Handler::READ_MASK) == -1) + return -1; + + return this->dgram_.close (); +} + +int +TAO_ECG_UDP_EH::handle_input (ACE_HANDLE) +{ + return this->receiver_->handle_input (this->dgram_); +} + +ACE_HANDLE +TAO_ECG_UDP_EH::get_handle (void) const +{ + return this->dgram_.get_handle (); +} + +// **************************************************************** + +TAO_ECG_Mcast_EH::TAO_ECG_Mcast_EH (TAO_ECG_UDP_Receiver *recv) + : receiver_ (recv) +{ +} + +int +TAO_ECG_Mcast_EH::open (const ACE_INET_Addr& mcast_group) +{ + if (this->dgram_.subscribe (mcast_group) == -1) + return -1; + return this->reactor ()->register_handler (this, + ACE_Event_Handler::READ_MASK); +} + +int +TAO_ECG_Mcast_EH::close (void) +{ + if (this->reactor ()->remove_handler (this, + ACE_Event_Handler::READ_MASK) == -1) + return -1; + + return this->dgram_.unsubscribe (); +} + +int +TAO_ECG_Mcast_EH::handle_input (ACE_HANDLE) +{ + return this->receiver_->handle_input (this->dgram_); +} + +ACE_HANDLE +TAO_ECG_Mcast_EH::get_handle (void) const +{ + return this->dgram_.get_handle (); +} + +// **************************************************************** + +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) +#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.h b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.h new file mode 100644 index 00000000000..8d183de7560 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.h @@ -0,0 +1,249 @@ +/* -*- C++ -*- */ +// $Id$ +// +// ============================================================================ +// +// = LIBRARY +// TAO services +// +// = FILENAME +// EC_Gateway_UDP +// +// = AUTHOR +// Carlos O'Ryan +// +// = DESCRIPTION +// Define helper classes to propagate events between ECs using +// either UDP or multicast. +// The architecture is a bit complicated and deserves some +// explanation: sending the events over UDP (or mcast) is easy, a +// Consumer (TAO_ECG_UDP_Sender) subscribes for a certain set of +// events, its push() method marshalls the event set into a CDR +// stream that is sent using an ACE_SOCK_CODgram. The subscription +// set and IP address can be configured. +// Another helper class (TAO_ECG_UDP_Receiver) acts as a supplier of +// events; it receives a callback when an event is available on an +// ACE_SOCK_Dgram, it demarshalls the event and pushes it to the +// EC. Two ACE_Event_Handler classes are provided that can forward +// the events to this Supplier: TAO_ECG_Mcast_EH can receive events +// from a multicast group; TAO_ECG_UDP_EH can receive events from a +// regular UDP socket. +// +// Matching of the events types carried by a multicast group (or IP +// address) is up to the application. Gateway classes can be +// implemented to automate this: the EC informs its gateways about +// local changes in the subscriptions (for example), the Gateway +// could then consult an administrative server that will inform it +// which multicast groups carry those event types, it can then +// create the proper event handlers and TAO_ECG_Receivers. An +// analogous class can be implemented for the Supplier side. +// +// An alternative approach would be to look the current set of +// multicast groups and the events carried on each, using that +// information a suitable TAO_ECG_UDP_Receiver can be configured +// (and of course the Senders on the client side). +// +// = TODO +// The class makes an extra copy of the events, we need to +// investigate if closer collaboration with its collocated EC could +// be used to remove that copy. +// +// ============================================================================ + +#ifndef TAO_EC_GATEWAY_UDP_H +#define TAO_EC_GATEWAY_UDP_H + +#include "ace/SOCK_CODgram.h" +#include "ace/SOCK_Dgram_Mcast.h" +#include "orbsvcs/RtecEventCommS.h" +#include "orbsvcs/RtecEventChannelAdminS.h" + +class TAO_ORBSVCS_Export TAO_ECG_UDP_Sender : public POA_RtecEventComm::PushConsumer +{ + // + // = TITLE + // Send events received from a "local" EC using UDP. + // + // = DESCRIPTION + // This class connect as a consumer to an EventChannel + // and it sends the events using UDP, the UDP address can be a + // normal IP address or it can be a multicast group. + // It marshalls the events using TAO CDR classes. + // No provisions are taken for message fragmentation. + // +public: + TAO_ECG_UDP_Sender (void); + + int get_local_addr (ACE_INET_Addr& addr); + // Get the local endpoint used to send the events. + + void init (RtecEventChannelAdmin::EventChannel_ptr lcl_ec, + RtecScheduler::Scheduler_ptr lcl_sched, + const char* lcl_name, + const ACE_INET_Addr& ipaddr, + CORBA::Environment &_env); + // To do its job this class requires to know the local EC it will + // connect to; it also requires to build an RT_Info for the local + // scheduler. + // It only keeps a copy of its SupplierProxy, used for later + // connection and disconnections. + // @@ TODO part of the RT_Info is hardcoded, we need to make it + // parametric. + + void shutdown (CORBA::Environment&); + // Disconnect and shutdown the sender, no further connections will + // work unless init() is called again. + + void open (RtecEventChannelAdmin::ConsumerQOS& sub, + CORBA::Environment& env); + // Connect (or reconnect) to the EC with the given subscriptions. + + void close (CORBA::Environment& _env); + // Disconnect from the EC, but reconnection is still possible. + + virtual void disconnect_push_consumer (CORBA::Environment &); + virtual void push (const RtecEventComm::EventSet &events, + CORBA::Environment &); + // The PushConsumer methods. + +private: + RtecEventChannelAdmin::EventChannel_var lcl_ec_; + // The remote and the local EC, so we can reconnect when the + // subscription list changes. + + RtecScheduler::handle_t lcl_info_; + // Our local and remote RT_Infos. + + RtecEventChannelAdmin::ProxyPushSupplier_var supplier_proxy_; + // We talk to the EC (as a consumer) using this proxy. + + ACE_SOCK_CODgram dgram_; + // The datagram used to send the data. +}; + +class TAO_ORBSVCS_Export TAO_ECG_UDP_Receiver : public POA_RtecEventComm::PushSupplier +{ + // + // = TITLE + // Decodes events from an ACE_SOCK_Dgram and pushes them to the + // Event_Channel. + // + // = DESCRIPTION + // This supplier receives events from an ACE_SOCK_Dgram, either + // from a UDP socket or a Mcast group, decodes them and push them + // to the EC. + // No provisions are taken for message reassembly. + // +public: + TAO_ECG_UDP_Receiver (void); + + void init (RtecEventChannelAdmin::EventChannel_ptr lcl_ec, + RtecScheduler::Scheduler_ptr lcl_sched, + const char* lcl_name, + const ACE_INET_Addr& ignore_from, + CORBA::Environment &_env); + // To do its job this class requires to know the local EC it will + // connect to; it also requires to build an RT_Info for the local + // scheduler. + // @@ TODO part of the RT_Info is hardcoded, we need to make it + // parametric. + + void shutdown (CORBA::Environment&); + // Disconnect and shutdown the gateway, no further connectsions + + void open (RtecEventChannelAdmin::SupplierQOS& pub, + CORBA::Environment &env); + // Connect to the EC using the given publications lists. + + virtual void close (CORBA::Environment& env); + // Disconnect to the EC. + + int handle_input (ACE_SOCK_Dgram& dgram); + // The Event_Handlers call this method when data is available at the + // socket, the must be ready for reading and contain a full + // event. + + // The PushSupplier method. + virtual void disconnect_push_supplier (CORBA::Environment &); + +private: + RtecEventChannelAdmin::EventChannel_var lcl_ec_; + // The remote and the local EC, so we can reconnect when the list changes. + + RtecScheduler::handle_t lcl_info_; + // Our RT_Info. + + RtecEventChannelAdmin::ProxyPushConsumer_var consumer_proxy_; + // We talk to the EC (as a consumer) using this proxy. + + ACE_INET_Addr ignore_from_; + // Ignore any events coming from this IP addres. +}; + +class TAO_ORBSVCS_Export TAO_ECG_UDP_EH : public ACE_Event_Handler +{ + // + // = TITLE + // Event Handler for UDP messages. + // + // = DESCRIPTION + // This object receives callbacks from the Reactor when data is + // available on a UDP socket, it forwards to the ECG_UDP_Receiver + // which reads the events and transform it into an event. +public: + TAO_ECG_UDP_EH (TAO_ECG_UDP_Receiver *recv); + + int open (const ACE_INET_Addr& ipaddr); + // Open the datagram and register with this->reactor() + + int close (void); + // Close the datagram and unregister with the reactor. + + // Reactor callbacks + virtual int handle_input (ACE_HANDLE fd); + virtual ACE_HANDLE get_handle (void) const; + +private: + ACE_SOCK_Dgram dgram_; + // The datagram used to receive the data. + + TAO_ECG_UDP_Receiver* receiver_; + // We callback to this object when a message arrives. +}; + +class TAO_ORBSVCS_Export TAO_ECG_Mcast_EH : public ACE_Event_Handler +{ + // + // = TITLE + // Event Handler for UDP messages. + // + // = DESCRIPTION + // This object receives callbacks from the Reactor when data is + // available on the mcast socket, it forwards to the UDP_Receive + // gateway which reads the events and transform it into an event. + // +public: + TAO_ECG_Mcast_EH (TAO_ECG_UDP_Receiver *recv); + + int open (const ACE_INET_Addr& mcast_group); + // Open the datagram (join the mcast group) and register with + // this->reactor() + + int close (void); + // Close the datagram (leave the mcast group) and unregister with + // the reactor. + + // Reactor callbacks + virtual int handle_input (ACE_HANDLE fd); + virtual ACE_HANDLE get_handle (void) const; + +private: + ACE_SOCK_Dgram_Mcast dgram_; + // The datagram used to receive the data. + + TAO_ECG_UDP_Receiver* receiver_; + // We callback to this object when a message arrives. +}; + + +#endif /* ACE_EVENT_CHANNEL_UDP_H */ diff --git a/TAO/orbsvcs/orbsvcs/Event/Event_Channel.cpp b/TAO/orbsvcs/orbsvcs/Event/Event_Channel.cpp index e923549576c..3a48a798430 100644 --- a/TAO/orbsvcs/orbsvcs/Event/Event_Channel.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/Event_Channel.cpp @@ -769,13 +769,26 @@ ACE_EventChannel::report_disconnect_i (u_long event) } void -ACE_EventChannel::add_gateway (TAO_EC_Gateway* gw) +ACE_EventChannel::add_gateway (TAO_EC_Gateway* gw, + CORBA::Environment& _env) { this->gwys_.insert (gw); + + RtecEventChannelAdmin::ConsumerQOS c_qos; + RtecEventChannelAdmin::SupplierQOS s_qos; + + this->consumer_module_->fill_qos (c_qos, s_qos); + gw->update_consumer (c_qos, s_qos, _env); + if (_env.exception () != 0) return; + + this->supplier_module_->fill_qos (c_qos, s_qos); + gw->update_supplier (c_qos, s_qos, _env); + if (_env.exception () != 0) return; } void -ACE_EventChannel::del_gateway (TAO_EC_Gateway* gw) +ACE_EventChannel::del_gateway (TAO_EC_Gateway* gw, + CORBA::Environment&) { this->gwys_.remove (gw); } @@ -803,10 +816,25 @@ ACE_EventChannel::update_consumer_gwys (CORBA::Environment& _env) } void -ACE_EventChannel::update_supplier_gwys (CORBA::Environment&) +ACE_EventChannel::update_supplier_gwys (CORBA::Environment& _env) { if (this->gwys_.is_empty ()) return; + + ACE_DEBUG ((LM_DEBUG, + "EC (%t) Event_Channel::update_supplier_gwys\n")); + + RtecEventChannelAdmin::ConsumerQOS c_qos; + RtecEventChannelAdmin::SupplierQOS s_qos; + this->supplier_module_->fill_qos (c_qos, s_qos); + for (Gateway_Set_Iterator i = this->gwys_.begin (); + i != this->gwys_.end (); + ++i) + { + TAO_EC_Gateway* gw = *i; + gw->update_supplier (c_qos, s_qos, _env); + if (_env.exception () != 0) return; + } } // **************************************************************** @@ -3051,6 +3079,8 @@ ACE_ES_Supplier_Module::connected (ACE_Push_Supplier_Proxy *supplier, { channel_->report_connect (ACE_EventChannel::SUPPLIER); up_->connected (supplier, _env); + if (!supplier->qos ().is_gateway) + this->channel_->update_supplier_gwys (_env); } void @@ -3072,10 +3102,16 @@ ACE_ES_Supplier_Module::disconnecting (ACE_Push_Supplier_Proxy *supplier, channel_->report_disconnect_i (ACE_EventChannel::SUPPLIER); } + CORBA::Boolean dont_update = supplier->qos ().is_gateway; + + // @@ TODO It would seem // IMHO this release is broken: supplier is a parameter, we never // actually increased its reference count, so we shouldn't decrease // it. // CORBA::release (supplier); + + if (!dont_update) + this->channel_->update_supplier_gwys (_env); } void @@ -3185,6 +3221,102 @@ ACE_ES_Supplier_Module::push (ACE_Push_Supplier_Proxy *proxy, TAO_ENDTRY; } +void +ACE_ES_Supplier_Module::fill_qos (RtecEventChannelAdmin::ConsumerQOS& c_qos, + RtecEventChannelAdmin::SupplierQOS& s_qos) +{ + ACE_GUARD (ACE_ES_MUTEX, ace_mon, this->lock_); + + c_qos.is_gateway = CORBA::B_TRUE; + s_qos.is_gateway = CORBA::B_TRUE; + + int count = 0; + { + for (Supplier_Iterator i = this->all_suppliers_.begin (); + i != this->all_suppliers_.end (); + ++i) + { + ACE_Push_Supplier_Proxy *s = *i; + + if (s->qos ().is_gateway) + continue; + + count += s->qos ().publications.length (); + } + } + + RtecEventChannelAdmin::DependencySet& dep = c_qos.dependencies; + RtecEventChannelAdmin::PublicationSet& pub = s_qos.publications; + + dep.length (count + 1); + pub.length (count); + + CORBA::ULong cc = 0; + CORBA::ULong sc = 0; + dep[cc].event.type_ = ACE_ES_DISJUNCTION_DESIGNATOR; + dep[cc].event.source_ = 0; + dep[cc].event.creation_time_ = ORBSVCS_Time::zero; + dep[cc].rt_info = 0; + cc++; + + for (Supplier_Iterator i = this->all_suppliers_.begin (); + i != this->all_suppliers_.end (); + ++i) + { + ACE_Push_Supplier_Proxy *s = *i; + + if (s->qos ().is_gateway) + continue; + + CORBA::ULong count = s->qos ().publications.length (); + for (CORBA::ULong j = 0; j < count; ++j) + { + RtecEventComm::Event& event = + s->qos ().publications[j].event; + + RtecEventComm::EventType type = event.type_; + if (type <= ACE_ES_EVENT_UNDEFINED) + continue; + + // Only type and source dependencies are relevant, notice + // that we turn conjunctions into disjunctions because + // correlations could be satisfied by events coming from + // several remote ECs. + if (type <= ACE_ES_EVENT_UNDEFINED) + continue; + + // If the dependency is already there we don't add it. + CORBA::ULong k; + for (k = 0; k < sc; ++k) + { + if (pub[k].event.type_ == event.type_ + && pub[k].event.source_ == event.source_) + break; + } + if (k == sc) + { + dep[cc].event.type_ = event.type_; + dep[cc].event.source_ = event.source_; + dep[cc].event.creation_time_ = ORBSVCS_Time::zero; + // The RT_Info is filled up later. + dep[cc].rt_info = 0; + cc++; + + pub[sc].event.type_ = event.type_; + pub[sc].event.source_ = event.source_; + pub[sc].event.creation_time_ = ORBSVCS_Time::zero; + pub[sc].dependency_info.dependency_type = + RtecScheduler::TWO_WAY_CALL; + pub[sc].dependency_info.number_of_calls = 1; + pub[sc].dependency_info.rt_info = 0; + sc++; + } + } + } + dep.length (cc); + pub.length (sc); +} + // ************************************************************ ACE_ES_Priority_Timer::ACE_ES_Priority_Timer (ACE_Task_Manager* tm) diff --git a/TAO/orbsvcs/orbsvcs/Event/Event_Channel.h b/TAO/orbsvcs/orbsvcs/Event/Event_Channel.h index 942e27319f9..b500dee7566 100644 --- a/TAO/orbsvcs/orbsvcs/Event/Event_Channel.h +++ b/TAO/orbsvcs/orbsvcs/Event/Event_Channel.h @@ -237,8 +237,8 @@ public: void report_disconnect_i (u_long); // Consumer or supplier disconnected. - void add_gateway (TAO_EC_Gateway* gw); - void del_gateway (TAO_EC_Gateway* gw); + void add_gateway (TAO_EC_Gateway* gw, CORBA::Environment& _env); + void del_gateway (TAO_EC_Gateway* gw, CORBA::Environment& _env); // Add and remove gateways from the EC. void update_consumer_gwys (CORBA::Environment& _env); @@ -893,8 +893,8 @@ public: void fill_qos (RtecEventChannelAdmin::ConsumerQOS& c_qos, RtecEventChannelAdmin::SupplierQOS& s_qos); - // Fill the QoS factories with the disjuction off all the - // subscriptions in this EC. + // Fill the QoS with the disjuction off all the subscriptions in + // this EC. // It leaves the gateways out of the list. private: @@ -1163,6 +1163,12 @@ public: void shutdown (void); // Actively disconnect from all suppliers. + void fill_qos (RtecEventChannelAdmin::ConsumerQOS& c_qos, + RtecEventChannelAdmin::SupplierQOS& s_qos); + // Fill the QoS with the disjuction off all the publications in + // this EC. + // It leaves the gateways out of the list. + private: typedef ACE_Unbounded_Set_Iterator Supplier_Iterator; typedef ACE_Unbounded_Set Suppliers; @@ -1242,6 +1248,9 @@ public: RtecEventComm::EventSourceID source_id (void); // Returns underlying supplier object ref. + const RtecEventChannelAdmin::SupplierQOS& qos (void) const; + // The QoS for this supplier + private: RtecEventChannelAdmin::SupplierQOS qos_; // Reference to the supplier's qos params. diff --git a/TAO/orbsvcs/orbsvcs/Event/Event_Channel.i b/TAO/orbsvcs/orbsvcs/Event/Event_Channel.i index f9689302605..83b979ab94d 100644 --- a/TAO/orbsvcs/orbsvcs/Event/Event_Channel.i +++ b/TAO/orbsvcs/orbsvcs/Event/Event_Channel.i @@ -48,6 +48,12 @@ ACE_Push_Supplier_Proxy::source_id (void) return source_id_; } +ACE_INLINE const RtecEventChannelAdmin::SupplierQOS& +ACE_Push_Supplier_Proxy::qos (void) const +{ + return this->qos_; +} + // ************************************************** ACE_INLINE RtecEventChannelAdmin::ProxyPushSupplier_ptr diff --git a/TAO/orbsvcs/orbsvcs/Makefile b/TAO/orbsvcs/orbsvcs/Makefile index 9d20f0d8335..8047b19857f 100644 --- a/TAO/orbsvcs/orbsvcs/Makefile +++ b/TAO/orbsvcs/orbsvcs/Makefile @@ -84,6 +84,7 @@ FILES= $(IDL_FILES) \ Event/ReactorTask \ Event/Task_Manager \ Event/EC_Gateway \ + Event/EC_Gateway_UDP \ \ Sched/Config_Scheduler \ Sched/DynSched \ @@ -9167,6 +9168,267 @@ realclean: clean $(TAO_ROOT)/orbsvcs/orbsvcs/Time_Utilities.h \ $(TAO_ROOT)/orbsvcs/orbsvcs/Time_Utilities.i \ $(TAO_ROOT)/orbsvcs/orbsvcs/Event_Utilities.i +.obj/EC_Gateway_UDP.o .obj/EC_Gateway_UDP.so .shobj/EC_Gateway_UDP.o .shobj/EC_Gateway_UDP.so: Event/EC_Gateway_UDP.cpp \ + $(TAO_ROOT)/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.h \ + $(ACE_ROOT)/ace/SOCK_CODgram.h \ + $(ACE_ROOT)/ace/SOCK_IO.h \ + $(ACE_ROOT)/ace/SOCK.h \ + $(ACE_ROOT)/ace/ACE.h \ + $(ACE_ROOT)/ace/OS.h \ + $(ACE_ROOT)/ace/inc_user_config.h \ + $(ACE_ROOT)/ace/config.h \ + $(ACE_ROOT)/ace/config-sunos5.5.h \ + $(ACE_ROOT)/ace/config-g++-common.h \ + $(ACE_ROOT)/ace/streams.h \ + $(ACE_ROOT)/ace/Basic_Types.h \ + $(ACE_ROOT)/ace/Basic_Types.i \ + $(ACE_ROOT)/ace/OS.i \ + $(ACE_ROOT)/ace/Trace.h \ + $(ACE_ROOT)/ace/Log_Msg.h \ + $(ACE_ROOT)/ace/Log_Record.h \ + $(ACE_ROOT)/ace/ACE.i \ + $(ACE_ROOT)/ace/Log_Priority.h \ + $(ACE_ROOT)/ace/Log_Record.i \ + $(ACE_ROOT)/ace/Addr.h \ + $(ACE_ROOT)/ace/Addr.i \ + $(ACE_ROOT)/ace/IPC_SAP.h \ + $(ACE_ROOT)/ace/IPC_SAP.i \ + $(ACE_ROOT)/ace/SOCK.i \ + $(ACE_ROOT)/ace/SOCK_IO.i \ + $(ACE_ROOT)/ace/SOCK_CODgram.i \ + $(ACE_ROOT)/ace/SOCK_Dgram_Mcast.h \ + $(ACE_ROOT)/ace/SOCK_Dgram.h \ + $(ACE_ROOT)/ace/SOCK_Dgram.i \ + $(ACE_ROOT)/ace/INET_Addr.h \ + $(ACE_ROOT)/ace/INET_Addr.i \ + $(ACE_ROOT)/ace/SOCK_Dgram_Mcast.i \ + $(TAO_ROOT)/orbsvcs/orbsvcs/RtecEventCommS.h \ + $(TAO_ROOT)/orbsvcs/orbsvcs/CosTimeBaseS.h \ + $(TAO_ROOT)/orbsvcs/orbsvcs/CosTimeBaseC.h \ + $(TAO_ROOT)/tao/corba.h \ + $(TAO_ROOT)/tao/orbconf.h \ + $(ACE_ROOT)/ace/Get_Opt.h \ + $(ACE_ROOT)/ace/Get_Opt.i \ + $(ACE_ROOT)/ace/SOCK_Stream.h \ + $(ACE_ROOT)/ace/SOCK_Stream.i \ + $(ACE_ROOT)/ace/Synch_T.h \ + $(ACE_ROOT)/ace/Event_Handler.h \ + $(ACE_ROOT)/ace/Event_Handler.i \ + $(ACE_ROOT)/ace/Synch.h \ + $(ACE_ROOT)/ace/SV_Semaphore_Complex.h \ + $(ACE_ROOT)/ace/SV_Semaphore_Simple.h \ + $(ACE_ROOT)/ace/SV_Semaphore_Simple.i \ + $(ACE_ROOT)/ace/SV_Semaphore_Complex.i \ + $(ACE_ROOT)/ace/Synch.i \ + $(ACE_ROOT)/ace/Synch_T.i \ + $(ACE_ROOT)/ace/Thread.h \ + $(ACE_ROOT)/ace/Thread.i \ + $(ACE_ROOT)/ace/Atomic_Op.i \ + $(ACE_ROOT)/ace/Synch_T.cpp \ + $(ACE_ROOT)/ace/Hash_Map_Manager.h \ + $(ACE_ROOT)/ace/Hash_Map_Manager.cpp \ + $(ACE_ROOT)/ace/Service_Config.h \ + $(ACE_ROOT)/ace/Service_Object.h \ + $(ACE_ROOT)/ace/Shared_Object.h \ + $(ACE_ROOT)/ace/Shared_Object.i \ + $(ACE_ROOT)/ace/Service_Object.i \ + $(ACE_ROOT)/ace/Signal.h \ + $(ACE_ROOT)/ace/Containers.h \ + $(ACE_ROOT)/ace/Containers.i \ + $(ACE_ROOT)/ace/Containers.cpp \ + $(ACE_ROOT)/ace/Malloc.h \ + $(ACE_ROOT)/ace/Malloc_Base.h \ + $(ACE_ROOT)/ace/Malloc.i \ + $(ACE_ROOT)/ace/Malloc_T.h \ + $(ACE_ROOT)/ace/Free_List.h \ + $(ACE_ROOT)/ace/Free_List.i \ + $(ACE_ROOT)/ace/Free_List.cpp \ + $(ACE_ROOT)/ace/Malloc_T.i \ + $(ACE_ROOT)/ace/Malloc_T.cpp \ + $(ACE_ROOT)/ace/Memory_Pool.h \ + $(ACE_ROOT)/ace/Mem_Map.h \ + $(ACE_ROOT)/ace/Mem_Map.i \ + $(ACE_ROOT)/ace/Memory_Pool.i \ + $(ACE_ROOT)/ace/Signal.i \ + $(ACE_ROOT)/ace/Object_Manager.h \ + $(ACE_ROOT)/ace/Object_Manager.i \ + $(ACE_ROOT)/ace/Managed_Object.h \ + $(ACE_ROOT)/ace/Managed_Object.i \ + $(ACE_ROOT)/ace/Managed_Object.cpp \ + $(ACE_ROOT)/ace/SString.h \ + $(ACE_ROOT)/ace/SString.i \ + $(ACE_ROOT)/ace/Service_Config.i \ + $(ACE_ROOT)/ace/Reactor.h \ + $(ACE_ROOT)/ace/Handle_Set.h \ + $(ACE_ROOT)/ace/Handle_Set.i \ + $(ACE_ROOT)/ace/Timer_Queue.h \ + $(ACE_ROOT)/ace/Timer_Queue_T.h \ + $(ACE_ROOT)/ace/Timer_Queue_T.i \ + $(ACE_ROOT)/ace/Timer_Queue_T.cpp \ + $(ACE_ROOT)/ace/Reactor.i \ + $(ACE_ROOT)/ace/Reactor_Impl.h \ + $(ACE_ROOT)/ace/Svc_Conf_Tokens.h \ + $(ACE_ROOT)/ace/SOCK_Acceptor.h \ + $(ACE_ROOT)/ace/Time_Value.h \ + $(ACE_ROOT)/ace/SOCK_Acceptor.i \ + $(ACE_ROOT)/ace/SOCK_Connector.h \ + $(ACE_ROOT)/ace/SOCK_Connector.i \ + $(ACE_ROOT)/ace/Strategies.h \ + $(ACE_ROOT)/ace/Strategies_T.h \ + $(ACE_ROOT)/ace/Synch_Options.h \ + $(ACE_ROOT)/ace/Synch_Options.i \ + $(ACE_ROOT)/ace/Strategies_T.i \ + $(ACE_ROOT)/ace/Strategies_T.cpp \ + $(ACE_ROOT)/ace/Service_Repository.h \ + $(ACE_ROOT)/ace/Service_Types.h \ + $(ACE_ROOT)/ace/Service_Types.i \ + $(ACE_ROOT)/ace/Service_Repository.i \ + $(ACE_ROOT)/ace/Thread_Manager.h \ + $(ACE_ROOT)/ace/Thread_Manager.i \ + $(ACE_ROOT)/ace/WFMO_Reactor.h \ + $(ACE_ROOT)/ace/Message_Queue.h \ + $(ACE_ROOT)/ace/Message_Block.h \ + $(ACE_ROOT)/ace/Message_Block.i \ + $(ACE_ROOT)/ace/IO_Cntl_Msg.h \ + $(ACE_ROOT)/ace/Message_Queue_T.h \ + $(ACE_ROOT)/ace/Message_Queue_T.i \ + $(ACE_ROOT)/ace/Message_Queue_T.cpp \ + $(ACE_ROOT)/ace/Message_Queue.i \ + $(ACE_ROOT)/ace/WFMO_Reactor.i \ + $(ACE_ROOT)/ace/Strategies.i \ + $(ACE_ROOT)/ace/Connector.h \ + $(ACE_ROOT)/ace/Map_Manager.h \ + $(ACE_ROOT)/ace/Map_Manager.i \ + $(ACE_ROOT)/ace/Map_Manager.cpp \ + $(ACE_ROOT)/ace/Svc_Handler.h \ + $(ACE_ROOT)/ace/Task.h \ + $(ACE_ROOT)/ace/Task.i \ + $(ACE_ROOT)/ace/Task_T.h \ + $(ACE_ROOT)/ace/Task_T.i \ + $(ACE_ROOT)/ace/Task_T.cpp \ + $(ACE_ROOT)/ace/Module.h \ + $(ACE_ROOT)/ace/Module.i \ + $(ACE_ROOT)/ace/Module.cpp \ + $(ACE_ROOT)/ace/Stream_Modules.h \ + $(ACE_ROOT)/ace/Stream_Modules.i \ + $(ACE_ROOT)/ace/Stream_Modules.cpp \ + $(ACE_ROOT)/ace/Dynamic.h \ + $(ACE_ROOT)/ace/Dynamic.i \ + $(ACE_ROOT)/ace/Singleton.h \ + $(ACE_ROOT)/ace/Singleton.i \ + $(ACE_ROOT)/ace/Singleton.cpp \ + $(ACE_ROOT)/ace/Svc_Handler.i \ + $(ACE_ROOT)/ace/Svc_Handler.cpp \ + $(ACE_ROOT)/ace/Connector.i \ + $(ACE_ROOT)/ace/Connector.cpp \ + $(ACE_ROOT)/ace/Acceptor.h \ + $(ACE_ROOT)/ace/Acceptor.i \ + $(ACE_ROOT)/ace/Acceptor.cpp \ + $(TAO_ROOT)/tao/compat/objbase.h \ + $(TAO_ROOT)/tao/Align.h \ + $(TAO_ROOT)/tao/ORB.h \ + $(TAO_ROOT)/tao/Sequence.h \ + $(TAO_ROOT)/tao/Sequence.i \ + $(TAO_ROOT)/tao/Sequence_T.h \ + $(TAO_ROOT)/tao/Sequence_T.i \ + $(TAO_ROOT)/tao/Sequence_T.cpp \ + $(TAO_ROOT)/tao/Object_KeyC.h \ + $(TAO_ROOT)/tao/Object_KeyC.i \ + $(TAO_ROOT)/tao/Union.h \ + $(TAO_ROOT)/tao/ORB.i \ + $(TAO_ROOT)/tao/Exception.h \ + $(TAO_ROOT)/tao/Exception.i \ + $(TAO_ROOT)/tao/Any.h \ + $(TAO_ROOT)/tao/Any.i \ + $(TAO_ROOT)/tao/NVList.h \ + $(TAO_ROOT)/tao/NVList.i \ + $(TAO_ROOT)/tao/Principal.h \ + $(TAO_ROOT)/tao/Request.h \ + $(TAO_ROOT)/tao/Request.i \ + $(TAO_ROOT)/tao/Stub.h \ + $(TAO_ROOT)/tao/Stub.i \ + $(TAO_ROOT)/tao/Object.h \ + $(TAO_ROOT)/tao/Object.i \ + $(TAO_ROOT)/tao/Typecode.h \ + $(TAO_ROOT)/tao/Typecode.i \ + $(TAO_ROOT)/tao/Marshal.h \ + $(TAO_ROOT)/tao/Marshal.i \ + $(TAO_ROOT)/tao/CDR.h \ + $(TAO_ROOT)/tao/CDR.i \ + $(TAO_ROOT)/tao/PolicyC.h \ + $(TAO_ROOT)/tao/PolicyC.i \ + $(TAO_ROOT)/tao/CurrentC.h \ + $(TAO_ROOT)/tao/CurrentC.i \ + $(TAO_ROOT)/tao/POA.h \ + $(TAO_ROOT)/tao/POAC.h \ + $(TAO_ROOT)/tao/POAC.i \ + $(TAO_ROOT)/tao/Servant_Base.h \ + $(TAO_ROOT)/tao/POAS.h \ + $(TAO_ROOT)/tao/POA_CORBA.h \ + $(TAO_ROOT)/tao/POAS.i \ + $(TAO_ROOT)/tao/Object_Table.h \ + $(TAO_ROOT)/tao/POA.i \ + $(TAO_ROOT)/tao/poa_macros.h \ + $(TAO_ROOT)/tao/params.h \ + $(TAO_ROOT)/tao/params.i \ + $(TAO_ROOT)/tao/Connect.h \ + $(TAO_ROOT)/tao/Connect.i \ + $(TAO_ROOT)/tao/ORB_Core.h \ + $(TAO_ROOT)/tao/ORB_Core.i \ + $(ACE_ROOT)/ace/Dynamic_Service.h \ + $(ACE_ROOT)/ace/Dynamic_Service.cpp \ + $(TAO_ROOT)/tao/Operation_Table.h \ + $(TAO_ROOT)/tao/debug.h \ + $(TAO_ROOT)/tao/Client_Strategy_Factory.h \ + $(TAO_ROOT)/tao/Server_Strategy_Factory.h \ + $(TAO_ROOT)/tao/default_client.h \ + $(TAO_ROOT)/tao/default_client.i \ + $(TAO_ROOT)/tao/default_server.h \ + $(TAO_ROOT)/tao/ORB_Strategies_T.h \ + $(TAO_ROOT)/tao/ORB_Strategies_T.i \ + $(TAO_ROOT)/tao/ORB_Strategies_T.cpp \ + $(TAO_ROOT)/tao/default_server.i \ + $(TAO_ROOT)/tao/IIOP_Object.h \ + $(TAO_ROOT)/tao/IIOP_Object.i \ + $(TAO_ROOT)/tao/IIOP_ORB.h \ + $(TAO_ROOT)/tao/IIOP_ORB.i \ + $(TAO_ROOT)/tao/IIOP_Interpreter.h \ + $(TAO_ROOT)/tao/GIOP.h \ + $(TAO_ROOT)/tao/GIOP.i \ + $(TAO_ROOT)/tao/Server_Request.h \ + $(TAO_ROOT)/tao/Server_Request.i \ + $(TAO_ROOT)/tao/singletons.h \ + $(TAO_ROOT)/orbsvcs/orbsvcs/orbsvcs_export.h \ + $(TAO_ROOT)/orbsvcs/orbsvcs/CosTimeBaseC.i \ + $(TAO_ROOT)/orbsvcs/orbsvcs/CosTimeBaseS_T.h \ + $(TAO_ROOT)/orbsvcs/orbsvcs/CosTimeBaseS_T.i \ + $(TAO_ROOT)/orbsvcs/orbsvcs/CosTimeBaseS_T.cpp \ + $(TAO_ROOT)/orbsvcs/orbsvcs/CosTimeBaseS.i \ + $(TAO_ROOT)/orbsvcs/orbsvcs/RtecEventCommC.h \ + $(TAO_ROOT)/orbsvcs/orbsvcs/RtecEventCommC.i \ + $(TAO_ROOT)/orbsvcs/orbsvcs/RtecEventCommS_T.h \ + $(TAO_ROOT)/orbsvcs/orbsvcs/RtecEventCommS_T.i \ + $(TAO_ROOT)/orbsvcs/orbsvcs/RtecEventCommS_T.cpp \ + $(TAO_ROOT)/orbsvcs/orbsvcs/RtecEventCommS.i \ + $(TAO_ROOT)/orbsvcs/orbsvcs/RtecEventChannelAdminS.h \ + $(TAO_ROOT)/orbsvcs/orbsvcs/RtecSchedulerS.h \ + $(TAO_ROOT)/orbsvcs/orbsvcs/RtecSchedulerC.h \ + $(TAO_ROOT)/orbsvcs/orbsvcs/RtecSchedulerC.i \ + $(TAO_ROOT)/orbsvcs/orbsvcs/RtecSchedulerS_T.h \ + $(TAO_ROOT)/orbsvcs/orbsvcs/RtecSchedulerS_T.i \ + $(TAO_ROOT)/orbsvcs/orbsvcs/RtecSchedulerS_T.cpp \ + $(TAO_ROOT)/orbsvcs/orbsvcs/RtecSchedulerS.i \ + $(TAO_ROOT)/orbsvcs/orbsvcs/RtecEventChannelAdminC.h \ + $(TAO_ROOT)/orbsvcs/orbsvcs/RtecEventChannelAdminC.i \ + $(TAO_ROOT)/orbsvcs/orbsvcs/RtecEventChannelAdminS_T.h \ + $(TAO_ROOT)/orbsvcs/orbsvcs/RtecEventChannelAdminS_T.i \ + $(TAO_ROOT)/orbsvcs/orbsvcs/RtecEventChannelAdminS_T.cpp \ + $(TAO_ROOT)/orbsvcs/orbsvcs/RtecEventChannelAdminS.i \ + $(TAO_ROOT)/orbsvcs/orbsvcs/Event_Utilities.h \ + $(TAO_ROOT)/orbsvcs/orbsvcs/Event_Service_Constants.h \ + $(TAO_ROOT)/orbsvcs/orbsvcs/Time_Utilities.h \ + $(TAO_ROOT)/orbsvcs/orbsvcs/Time_Utilities.i \ + $(TAO_ROOT)/orbsvcs/orbsvcs/Event_Utilities.i .obj/Config_Scheduler.o .obj/Config_Scheduler.so .shobj/Config_Scheduler.o .shobj/Config_Scheduler.so: Sched/Config_Scheduler.cpp \ $(TAO_ROOT)/orbsvcs/orbsvcs/Time_Utilities.h \ $(TAO_ROOT)/orbsvcs/orbsvcs/CosTimeBaseC.h \ diff --git a/TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.cpp b/TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.cpp new file mode 100644 index 00000000000..58f4741044d --- /dev/null +++ b/TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.cpp @@ -0,0 +1,1239 @@ +// $Id$ + +#include "ace/Get_Opt.h" +#include "ace/Auto_Ptr.h" +#include "ace/Sched_Params.h" + +#include "orbsvcs/Event_Utilities.h" +#include "orbsvcs/Event_Service_Constants.h" +#include "orbsvcs/Scheduler_Factory.h" +#include "orbsvcs/Time_Utilities.h" +#include "orbsvcs/RtecEventChannelAdminC.h" +#include "orbsvcs/Sched/Config_Scheduler.h" +#include "orbsvcs/Runtime_Scheduler.h" +#include "orbsvcs/Event/Event_Channel.h" +#include "EC_Mcast.h" + +#define ECM_DEFAULT_SEND_MCAST_GROUP "224.9.9.1" +#define ECM_DEFAULT_RECV_MCAST_GROUP "224.9.9.2" + +ECM_Driver::ECM_Driver (void) + : lcl_name_ ("ECM"), + short_circuit_ (0), + n_suppliers_ (1), + n_consumers_ (1), + workload_ (10), + event_period_ (25000), + event_count_ (200), + s_event_a_ (ACE_ES_EVENT_UNDEFINED), + s_event_b_ (ACE_ES_EVENT_UNDEFINED + 1), + c_event_a_ (ACE_ES_EVENT_UNDEFINED), + c_event_b_ (ACE_ES_EVENT_UNDEFINED + 1), + r_event_a_ (ACE_ES_EVENT_UNDEFINED), + r_event_b_ (ACE_ES_EVENT_UNDEFINED + 1), + schedule_file_ (0), + pid_file_name_ (0), + send_mcast_group_ (u_short(23000), ECM_DEFAULT_SEND_MCAST_GROUP), + recv_mcast_group_ (u_short(23001), ECM_DEFAULT_RECV_MCAST_GROUP), + mcast_eh_ (&receiver_), + ready_ (0), + ready_cnd_ (ready_mtx_) +{ +} + + + +int +ECM_Driver::run (int argc, char* argv[]) +{ + TAO_TRY + { + CORBA::ORB_var orb = + CORBA::ORB_init (argc, argv, "", TAO_TRY_ENV); + TAO_CHECK_ENV; + + CORBA::Object_var poa_object = + orb->resolve_initial_references("RootPOA"); + if (CORBA::is_nil (poa_object.in ())) + ACE_ERROR_RETURN ((LM_ERROR, + " (%P|%t) Unable to initialize the POA.\n"), + 1); + + PortableServer::POA_var root_poa = + PortableServer::POA::_narrow (poa_object.in (), TAO_TRY_ENV); + TAO_CHECK_ENV; + + PortableServer::POAManager_var poa_manager = + root_poa->the_POAManager (TAO_TRY_ENV); + TAO_CHECK_ENV; + + if (this->parse_args (argc, argv)) + return 1; + + ACE_DEBUG ((LM_DEBUG, + "Execution parameters:\n" + " lcl name = <%s>\n" + " short circuit = <%d>\n" + " suppliers = <%d>\n" + " consumers = <%d>\n" + " workload = <%d> (iterations)\n" + " event period = <%d> (usecs)\n" + " event count = <%d>\n" + " supplier Event A = <%d>\n" + " supplier Event B = <%d>\n" + " consumer Event A = <%d>\n" + " consumer Event B = <%d>\n" + " remote Event A = <%d>\n" + " remote Event B = <%d>\n" + " schedule_file = <%s>\n" + " pid file name = <%s>\n", + this->lcl_name_?this->lcl_name_:"nil", + this->short_circuit_, + + this->n_suppliers_, + this->n_consumers_, + this->workload_, + this->event_period_, + this->event_count_, + this->s_event_a_, + this->s_event_b_, + this->c_event_a_, + this->c_event_b_, + this->r_event_a_, + this->r_event_b_, + + this->schedule_file_?this->schedule_file_:"nil", + this->pid_file_name_?this->pid_file_name_:"nil") ); + + if (this->pid_file_name_ != 0) + { + FILE* pid = ACE_OS::fopen (this->pid_file_name_, "w"); + if (pid != 0) + { + ACE_OS::fprintf (pid, "%d\n", ACE_OS::getpid ()); + ACE_OS::fclose (pid); + } + } + + int min_priority = + ACE_Sched_Params::priority_min (ACE_SCHED_FIFO); + // Enable FIFO scheduling, e.g., RT scheduling class on Solaris. + + if (ACE_OS::sched_params (ACE_Sched_Params (ACE_SCHED_FIFO, + min_priority, + ACE_SCOPE_PROCESS)) != 0) + { + if (ACE_OS::last_error () == EPERM) + ACE_DEBUG ((LM_DEBUG, + "%s: user is not superuser, " + "so remain in time-sharing class\n", argv[0])); + else + ACE_ERROR ((LM_ERROR, + "%s: ACE_OS::sched_params failed\n", argv[0])); + } + + if (ACE_OS::thr_setprio (min_priority) == -1) + { + ACE_ERROR ((LM_ERROR, "(%P|%t) main thr_setprio failed\n")); + } + + CORBA::Object_var naming_obj = + orb->resolve_initial_references ("NameService"); + if (CORBA::is_nil (naming_obj.in ())) + ACE_ERROR_RETURN ((LM_ERROR, + " (%P|%t) Unable to get the Naming Service.\n"), + 1); + + CosNaming::NamingContext_var naming_context = + CosNaming::NamingContext::_narrow (naming_obj.in (), TAO_TRY_ENV); + TAO_CHECK_ENV; + + ACE_Config_Scheduler scheduler_impl; + RtecScheduler::Scheduler_var scheduler = + scheduler_impl._this (TAO_TRY_ENV); + TAO_CHECK_ENV; + + // We use this buffer to generate the names of the local + // services. + const int bufsize = 512; + char buf[bufsize]; + + CORBA::String_var str = + orb->object_to_string (scheduler.in (), TAO_TRY_ENV); + TAO_CHECK_ENV; + ACE_DEBUG ((LM_DEBUG, "The (local) scheduler IOR is <%s>\n", + str.in ())); + + ACE_OS::strcpy (buf, "ScheduleService@"); + ACE_OS::strcat (buf, this->lcl_name_); + + // Register the servant with the Naming Context.... + CosNaming::Name schedule_name (1); + schedule_name.length (1); + schedule_name[0].id = CORBA::string_dup (buf); + naming_context->bind (schedule_name, scheduler.in (), TAO_TRY_ENV); + TAO_CHECK_ENV; + + if (ACE_Scheduler_Factory::use_config (naming_context.in (), + buf) == -1) + return -1; + + // Create the EventService implementation, but don't start its + // internal threads. + ACE_EventChannel ec_impl (CORBA::B_FALSE); + + // Register Event_Service with the Naming Service. + RtecEventChannelAdmin::EventChannel_var ec = + ec_impl._this (TAO_TRY_ENV); + TAO_CHECK_ENV; + + str = orb->object_to_string (ec.in (), TAO_TRY_ENV); + TAO_CHECK_ENV; + + ACE_DEBUG ((LM_DEBUG, "The (local) EC IOR is <%s>\n", str.in ())); + + ACE_OS::strcpy (buf, "EventChannel@"); + ACE_OS::strcat (buf, this->lcl_name_); + + CosNaming::Name channel_name (1); + channel_name.length (1); + channel_name[0].id = CORBA::string_dup (buf); + naming_context->bind (channel_name, ec.in (), TAO_TRY_ENV); + TAO_CHECK_ENV; + + ACE_DEBUG ((LM_DEBUG, "waiting to start\n")); + + ACE_Time_Value tv (15, 0); + + poa_manager->activate (TAO_TRY_ENV); + TAO_CHECK_ENV; + + ACE_DEBUG ((LM_DEBUG, "starting....\n")); + + RtecEventChannelAdmin::EventChannel_var local_ec = + ec_impl._this (TAO_TRY_ENV); + TAO_CHECK_ENV; + + ACE_DEBUG ((LM_DEBUG, "located local EC\n")); + + this->connect_ecg (local_ec, TAO_TRY_ENV); + TAO_CHECK_ENV; + + this->connect_suppliers (local_ec.in (), TAO_TRY_ENV); + TAO_CHECK_ENV; + + ACE_DEBUG ((LM_DEBUG, "connected supplier\n")); + + this->connect_consumers (local_ec.in (), TAO_TRY_ENV); + TAO_CHECK_ENV; + + ACE_DEBUG ((LM_DEBUG, "connected consumer\n")); + + this->activate_suppliers (local_ec.in (), TAO_TRY_ENV); + TAO_CHECK_ENV; + + ACE_DEBUG ((LM_DEBUG, "suppliers are active\n")); + + this->running_suppliers_ = this->n_suppliers_; + + // Acquire the mutex for the ready mutex, blocking any supplier + // that may start after this point. + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ready_mon, this->ready_mtx_, 1); + this->ready_ = 1; + this->test_start_ = ACE_OS::gethrtime (); + ready_mon.release (); + this->ready_cnd_.broadcast (); + + ACE_DEBUG ((LM_DEBUG, "activate the EC\n")); + + // Create the EC internal threads + ec_impl.activate (); + + ACE_DEBUG ((LM_DEBUG, "running the test\n")); + if (orb->run () == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "orb->run"), -1); + + this->test_stop_ = ACE_OS::gethrtime (); + + ACE_DEBUG ((LM_DEBUG, "shutdown the EC\n")); + ec_impl.shutdown (); + + this->dump_results (); + + this->receiver_.close (TAO_TRY_ENV); + TAO_CHECK_ENV; + this->receiver_.shutdown (TAO_TRY_ENV); + TAO_CHECK_ENV; + + this->sender_.close (TAO_TRY_ENV); + TAO_CHECK_ENV; + this->sender_.shutdown (TAO_TRY_ENV); + TAO_CHECK_ENV; + + this->disconnect_consumers (TAO_TRY_ENV); + TAO_CHECK_ENV; + this->disconnect_suppliers (TAO_TRY_ENV); + TAO_CHECK_ENV; + + ACE_DEBUG ((LM_DEBUG, "shutdown grace period\n")); + tv.set (5, 0); + if (orb->run (&tv) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "orb->run"), -1); + + naming_context->unbind (schedule_name, TAO_TRY_ENV); + TAO_CHECK_ENV; + + naming_context->unbind (channel_name, TAO_TRY_ENV); + TAO_CHECK_ENV; + + if (this->schedule_file_ != 0) + { + RtecScheduler::RT_Info_Set_var infos; + +#if defined (__SUNPRO_CC) + // Sun C++ 4.2 warns with the code below: + // Warning (Anachronism): Temporary used for non-const + // reference, now obsolete. + // Note: Type "CC -migration" for more on anachronisms. + // Warning (Anachronism): The copy constructor for argument + // infos of type RtecScheduler::RT_Info_Set_out should take + // const RtecScheduler::RT_Info_Set_out&. + // But, this code is not CORBA conformant, because users should + // not define instances of _out types. + + RtecScheduler::RT_Info_Set_out infos_out (infos); + ACE_Scheduler_Factory::server ()->compute_scheduling + (ACE_Sched_Params::priority_min (ACE_SCHED_FIFO, + ACE_SCOPE_THREAD), + ACE_Sched_Params::priority_max (ACE_SCHED_FIFO, + ACE_SCOPE_THREAD), + infos_out, TAO_TRY_ENV); +#else /* ! __SUNPRO_CC */ + ACE_Scheduler_Factory::server ()->compute_scheduling + (ACE_Sched_Params::priority_min (ACE_SCHED_FIFO, + ACE_SCOPE_THREAD), + ACE_Sched_Params::priority_max (ACE_SCHED_FIFO, + ACE_SCOPE_THREAD), + infos.out (), TAO_TRY_ENV); +#endif /* ! __SUNPRO_CC */ + + TAO_CHECK_ENV; + ACE_Scheduler_Factory::dump_schedule (infos.in (), + this->schedule_file_); + } + } + TAO_CATCH (CORBA::SystemException, sys_ex) + { + TAO_TRY_ENV.print_exception ("SYS_EX"); + } + TAO_CATCHANY + { + TAO_TRY_ENV.print_exception ("NON SYS EX"); + } + TAO_ENDTRY; + return 0; +} + +void +ECM_Driver::disconnect_suppliers (CORBA::Environment &_env) +{ + for (int i = 0; i < this->n_suppliers_; ++i) + { + this->suppliers_[i]->close (_env); + if (_env.exception () != 0) return; + } +} + +void +ECM_Driver::connect_suppliers (RtecEventChannelAdmin::EventChannel_ptr local_ec, + CORBA::Environment &_env) +{ + TAO_TRY + { + for (int i = 0; i < this->n_suppliers_; ++i) + { + // Limit the number of events sent by each supplier + int mc = this->event_count_ / this->n_suppliers_; + if (mc == 0) + mc = 1; + + char buf[BUFSIZ]; + ACE_OS::sprintf (buf, "supplier_%02.2d@%s", i, this->lcl_name_); + + ACE_NEW (this->suppliers_[i], + ECM_Supplier (this, this->suppliers_ + i)); + + this->suppliers_[i]->open (buf, + this->s_event_a_, + this->s_event_b_, + mc, + this->event_period_ * 10, + local_ec, + TAO_TRY_ENV); + TAO_CHECK_ENV; + } + + } + TAO_CATCHANY + { + TAO_RETHROW; + } + TAO_ENDTRY; +} + +void +ECM_Driver::disconnect_consumers (CORBA::Environment &_env) +{ + for (int i = 0; i < this->n_consumers_; ++i) + { + this->consumers_[i]->close (_env); + if (_env.exception () != 0) return; + } +} + +void +ECM_Driver::activate_suppliers (RtecEventChannelAdmin::EventChannel_ptr local_ec, + CORBA::Environment &_env) +{ + TAO_TRY + { + for (int i = 0; i < this->n_suppliers_; ++i) + { + // Limit the number of events sent by each supplier + int mc = this->event_count_ / this->n_suppliers_; + if (mc == 0) + mc = 1; + + char buf[BUFSIZ]; + ACE_OS::sprintf (buf, "supplier_%02.2d@%s", i, this->lcl_name_); + + this->suppliers_[i]->activate (buf, + this->event_period_ * 10, + local_ec, + TAO_TRY_ENV); + TAO_CHECK_ENV; + } + + } + TAO_CATCHANY + { + TAO_RETHROW; + } + TAO_ENDTRY; +} + +void +ECM_Driver::connect_consumers (RtecEventChannelAdmin::EventChannel_ptr local_ec, + CORBA::Environment &_env) +{ + TAO_TRY + { + for (int i = 0; i < this->n_consumers_; ++i) + { + char buf[BUFSIZ]; + ACE_OS::sprintf (buf, "consumer_%02.2d@%s", i, this->lcl_name_); + + ACE_NEW (this->consumers_[i], + ECM_Consumer (this, this->consumers_ + i)); + + this->consumers_[i]->open (buf, + this->c_event_a_, + this->c_event_b_, + local_ec, + TAO_TRY_ENV); + TAO_CHECK_ENV; + this->stats_[i].total_time_ = 0; + this->stats_[i].lcl_count_ = 0; + this->stats_[i].rmt_count_ = 0; + } + + this->running_consumers_ = this->n_consumers_; + } + TAO_CATCHANY + { + TAO_RETHROW; + } + TAO_ENDTRY; +} + +void +ECM_Driver::connect_ecg (RtecEventChannelAdmin::EventChannel_ptr local_ec, + CORBA::Environment &_env) +{ + TAO_TRY + { + RtecScheduler::Scheduler_ptr local_sch = + ACE_Scheduler_Factory::server (); + + // We could use the same name on the local and remote scheduler, + // but that fails when using a global scheduler. + const int bufsize = 512; + + char mcast_name[bufsize]; + ACE_OS::strcpy (mcast_name, "sender"); + ACE_OS::strcat (mcast_name, "@"); + ACE_OS::strcat (mcast_name, this->lcl_name_); + + this->sender_.init (local_ec, + local_sch, + mcast_name, + this->send_mcast_group_, + TAO_TRY_ENV); + TAO_CHECK_ENV; + + ACE_INET_Addr ignore_from; + this->sender_.get_local_addr (ignore_from); + + char recv_name[bufsize]; + ACE_OS::strcpy (recv_name, "receiver"); + ACE_OS::strcat (recv_name, "@"); + ACE_OS::strcat (recv_name, this->lcl_name_); + + this->receiver_.init (local_ec, + local_sch, + recv_name, + ignore_from, + TAO_TRY_ENV); + TAO_CHECK_ENV; + + this->mcast_eh_.reactor (TAO_ORB_Core_instance ()->reactor ()); + this->mcast_eh_.open (this->recv_mcast_group_); + + RtecEventChannelAdmin::ConsumerQOS sender_qos; + sender_qos.is_gateway = CORBA::B_TRUE; + sender_qos.dependencies.length (3); + sender_qos.dependencies[0].event.type_ = ACE_ES_DISJUNCTION_DESIGNATOR; + sender_qos.dependencies[0].event.source_ = 0; + sender_qos.dependencies[0].event.creation_time_ = ORBSVCS_Time::zero; + sender_qos.dependencies[0].rt_info = 0; + sender_qos.dependencies[1].event.type_ = this->s_event_a_; + sender_qos.dependencies[1].event.source_ = 0; + sender_qos.dependencies[1].event.creation_time_ = ORBSVCS_Time::zero; + sender_qos.dependencies[1].rt_info = 0; + sender_qos.dependencies[2].event.type_ = this->s_event_b_; + sender_qos.dependencies[2].event.source_ = 0; + sender_qos.dependencies[2].event.creation_time_ = ORBSVCS_Time::zero; + sender_qos.dependencies[2].rt_info = 0; + + this->sender_.open (sender_qos, TAO_TRY_ENV); + TAO_CHECK_ENV; + + RtecEventChannelAdmin::SupplierQOS receiver_qos; + receiver_qos.is_gateway = CORBA::B_TRUE; + receiver_qos.publications.length (2); + receiver_qos.publications[0].event.type_ = this->r_event_a_; + receiver_qos.publications[0].event.source_ = 0; + receiver_qos.publications[0].event.creation_time_ = ORBSVCS_Time::zero; + receiver_qos.publications[0].dependency_info.dependency_type = + RtecScheduler::TWO_WAY_CALL; + receiver_qos.publications[0].dependency_info.number_of_calls = 1; + receiver_qos.publications[0].dependency_info.rt_info = 0; + receiver_qos.publications[1].event.type_ = this->r_event_b_; + receiver_qos.publications[1].event.source_ = 0; + receiver_qos.publications[1].event.creation_time_ = ORBSVCS_Time::zero; + receiver_qos.publications[1].dependency_info.dependency_type = + RtecScheduler::TWO_WAY_CALL; + receiver_qos.publications[1].dependency_info.number_of_calls = 1; + receiver_qos.publications[1].dependency_info.rt_info = 0; + + this->receiver_.open (receiver_qos, TAO_TRY_ENV); + TAO_CHECK_ENV; + } + TAO_CATCHANY + { + TAO_RETHROW; + } + TAO_ENDTRY; +} + +void +ECM_Driver::push_supplier (void * /* cookie */, + RtecEventChannelAdmin::ProxyPushConsumer_ptr consumer, + const RtecEventComm::EventSet &events, + CORBA::Environment & _env) +{ + this->wait_until_ready (); + // ACE_DEBUG ((LM_DEBUG, "(%P|%t) events sent by supplier\n")); + // @@ TODO we could keep somekind of stats here... + if (!this->short_circuit_) + { + consumer->push (events, _env); + } + else + { + for (int i = 0; i < this->n_consumers_ && !_env.exception (); ++i) + { + this->consumers_[i]->push (events, _env); + } + } +} + +void +ECM_Driver::push_consumer (void *consumer_cookie, + ACE_hrtime_t arrival, + const RtecEventComm::EventSet &events, + CORBA::Environment &) +{ + int ID = + (ACE_reinterpret_cast(ECM_Consumer**,consumer_cookie) + - this->consumers_); + + // ACE_DEBUG ((LM_DEBUG, "(%P|%t) events received by consumer %d\n", ID)); + + if (events.length () == 0) + { + // ACE_DEBUG ((LM_DEBUG, "no events\n")); + return; + } + + // ACE_DEBUG ((LM_DEBUG, "%d event(s)\n", events.length ())); + +#if 0 + const int bufsize = 128; + char buf[bufsize]; + ACE_OS::sprintf (buf, "Consumer %d receives event in thread: ", ID); +#endif + + for (u_int i = 0; i < events.length (); ++i) + { + const RtecEventComm::Event& e = events[i]; + + if (e.type_ == ACE_ES_EVENT_SHUTDOWN) + { + this->shutdown_consumer (ID); + continue; + } + + ACE_hrtime_t s; + ORBSVCS_Time::TimeT_to_hrtime (s, e.creation_time_); + ACE_hrtime_t nsec = arrival - s; + if (this->local_source (e.source_)) + { + int& count = this->stats_[ID].lcl_count_; + + this->stats_[ID].lcl_latency_[count] = nsec; + int workload = this->workload_; + int interval = this->event_period_; + + for (int j = 0; j < workload; ++j) + { + // Eat a little CPU so the Utilization test can measure the + // consumed time.... + /* takes about 40.2 usecs on a 167 MHz Ultra2 */ + u_long n = 1279UL; + ACE::is_prime (n, 2, n / 2); + } + // Increment the elapsed time on this consumer. + ACE_hrtime_t now = ACE_OS::gethrtime (); + this->stats_[ID].total_time_ += now - arrival; + this->stats_[ID].end_[count] = now; + + // We estimate our laxity based on the event creation + // time... it may not be very precise, but will do; other + // strategies include: + // + Keep track of the "current frame", then then deadline + // is the end of the frame. + // + Use the start of the test to keep the current frame. + // + Use the last execution. + + // Work around MSVC++ bug, it does not not how to convert an + // unsigned 64 bit int into a long.... + CORBA::ULong tmp = ACE_static_cast(CORBA::ULong,(s - now)); + this->stats_[ID].laxity_[count] = 1 + tmp/1000.0/interval; + count++; + } + else + { + int& count = this->stats_[ID].rmt_count_; + this->stats_[ID].rmt_latency_[count] = nsec; + count++; + } + } +} + +void +ECM_Driver::wait_until_ready (void) +{ + ACE_GUARD (ACE_Thread_Mutex, ready_mon, this->ready_mtx_); + while (!this->ready_) + this->ready_cnd_.wait (); +} + +void +ECM_Driver::shutdown_supplier (void* /* supplier_cookie */, + RtecEventComm::PushConsumer_ptr consumer, + CORBA::Environment& _env) +{ + + this->running_suppliers_--; + if (this->running_suppliers_ == 0) + { + // We propagate a shutdown event through the system... + RtecEventComm::EventSet shutdown (1); + shutdown.length (1); + RtecEventComm::Event& s = shutdown[0]; + + s.source_ = 0; + s.ttl_ = 1; + + ACE_hrtime_t t = ACE_OS::gethrtime (); + ORBSVCS_Time::hrtime_to_TimeT (s.creation_time_, t); + s.ec_recv_time_ = ORBSVCS_Time::zero; + s.ec_send_time_ = ORBSVCS_Time::zero; + s.data_.x = 0; + s.data_.y = 0; + s.type_ = ACE_ES_EVENT_SHUTDOWN; + consumer->push (shutdown, _env); + } +} + +void +ECM_Driver::shutdown_consumer (int id) +{ + ACE_DEBUG ((LM_DEBUG, "Shutdown consumer %d\n", id)); + this->running_consumers_--; + if (this->running_consumers_ == 0) + TAO_ORB_Core_instance ()->orb ()->shutdown (); +} + +int +ECM_Driver::shutdown (CORBA::Environment& _env) +{ + ACE_DEBUG ((LM_DEBUG, "Shutting down the multiple EC test\n")); + + TAO_ORB_Core_instance ()->orb ()->shutdown (); + return 0; +} + +void +ECM_Driver::dump_results (void) +{ + const int bufsize = 512; + char buf[bufsize]; + + for (int i = 0; i < this->n_consumers_; ++i) + { + ACE_OS::sprintf (buf, "CO%02.2d", i); + this->dump_results (buf, this->stats_[i]); + } + // the cast is to workaround a msvc++ bug... + CORBA::ULong tmp = ACE_static_cast(CORBA::ULong, + this->test_stop_ - this->test_start_); + double usec = tmp / 1000.0; + ACE_DEBUG ((LM_DEBUG, "Time[TOTAL]: %.3f\n", usec)); +} + +void +ECM_Driver::dump_results (const char* name, Stats& stats) +{ + // @@ We are reporting the information without specifics about + // the cast is to workaround a msvc++ bug... + double usec = ACE_static_cast(CORBA::ULong,stats.total_time_) / 1000.0; + ACE_DEBUG ((LM_DEBUG, "Time[LCL,%s]: %.3f\n", name, usec)); + int i; + for (i = 1; i < stats.lcl_count_ - 1; ++i) + { + // the cast is to workaround a msvc++ bug... + usec = ACE_static_cast(CORBA::ULong,stats.lcl_latency_[i]) / 1000.0; + ACE_DEBUG ((LM_DEBUG, "Latency[LCL,%s]: %.3f\n", name, usec)); + + double percent = stats.laxity_[i] * 100.0; + ACE_DEBUG ((LM_DEBUG, "Laxity[LCL,%s]: %.3f\n", name, percent)); + + // the cast is to workaround a msvc++ bug... + usec = ACE_static_cast(CORBA::ULong,stats.end_[i] - this->test_start_) / 1000.0; + ACE_DEBUG ((LM_DEBUG, "Completion[LCL,%s]: %.3f\n", name, usec)); + } + for (i = 1; i < stats.rmt_count_ - 1; ++i) + { + // the cast is to workaround a msvc++ bug... + double usec = ACE_static_cast(CORBA::ULong,stats.rmt_latency_[i]) / 1000.0; + ACE_DEBUG ((LM_DEBUG, "Latency[RMT,%s]: %.3f\n", name, usec)); + } +} + +int +ECM_Driver::local_source (RtecEventComm::EventSourceID id) const +{ + for (int i = 0; i < this->n_suppliers_; ++i) + { + if (this->suppliers_[i]->supplier_id () == id) + return 1; + } + return 0; +} + +int +ECM_Driver::parse_args (int argc, char *argv []) +{ + ACE_Get_Opt get_opt (argc, argv, "xl:s:r:h:p:d:"); + int opt; + + while ((opt = get_opt ()) != EOF) + { + switch (opt) + { + case 'x': + this->short_circuit_ = 1; + break; + + case 'l': + this->lcl_name_ = get_opt.optarg; + break; + + case 'h': + { + char* aux; + char* arg = ACE_OS::strtok_r (get_opt.optarg, ",", &aux); + + this->n_suppliers_ = ACE_OS::atoi (arg); + arg = ACE_OS::strtok_r (0, ",", &aux); + this->n_consumers_ = ACE_OS::atoi (arg); + arg = ACE_OS::strtok_r (0, ",", &aux); + this->workload_ = ACE_OS::atoi (arg); + arg = ACE_OS::strtok_r (0, ",", &aux); + this->event_period_ = ACE_OS::atoi (arg); + arg = ACE_OS::strtok_r (0, ",", &aux); + this->event_count_ = ACE_OS::atoi (arg); + arg = ACE_OS::strtok_r (0, ",", &aux); + this->s_event_a_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg); + arg = ACE_OS::strtok_r (0, ",", &aux); + this->s_event_b_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg); + arg = ACE_OS::strtok_r (0, ",", &aux); + this->c_event_a_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg); + arg = ACE_OS::strtok_r (0, ",", &aux); + this->c_event_b_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg); + arg = ACE_OS::strtok_r (0, ",", &aux); + this->r_event_a_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg); + arg = ACE_OS::strtok_r (0, ",", &aux); + this->r_event_b_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg); + } + break; + + case 'p': + this->pid_file_name_ = get_opt.optarg; + break; + + case 'd': + this->schedule_file_ = get_opt.optarg; + break; + + case 's': + this->send_mcast_group_.set (get_opt.optarg); + break; + + case 'r': + this->recv_mcast_group_.set (get_opt.optarg); + break; + + case '?': + default: + ACE_DEBUG ((LM_DEBUG, + "Usage: %s " + "[ORB options] " + "-x (short circuit EC) " + "-h " + "-p " + "-d " + "-s " + "-r " + "\n", + argv[0])); + return -1; + } + } + + if (this->event_count_ < 0 + || this->event_count_ >= ECM_Driver::MAX_EVENTS) + { + ACE_DEBUG ((LM_DEBUG, + "%s: event count (%d) is out of range, " + "reset to default (%d)\n", + argv[0], this->event_count_, + 160)); + this->event_count_ = 160; + } + + if (this->n_consumers_ <= 0 + || this->n_consumers_ >= ECM_Driver::MAX_CONSUMERS + || this->n_suppliers_ <= 0 + || this->n_suppliers_ >= ECM_Driver::MAX_SUPPLIERS) + { + ACE_ERROR_RETURN ((LM_DEBUG, + "%s: number of consumers or " + "suppliers out of range\n", argv[0]), -1); + } + + return 0; +} + +// **************************************************************** + +ECM_Supplier::ECM_Supplier (ECM_Driver *test, + void *cookie) + : test_ (test), + cookie_ (cookie), + consumer_ (this) +{ +} + +void +ECM_Supplier::open (const char* name, + int event_a, + int event_b, + int event_count, + const RtecScheduler::Period& rate, + RtecEventChannelAdmin::EventChannel_ptr ec, + CORBA::Environment &_env) +{ + this->event_a_ = event_a; + this->event_b_ = event_b; + this->event_count_ = event_count; + + TAO_TRY + { + RtecScheduler::Scheduler_ptr server = + ACE_Scheduler_Factory::server (); + + RtecScheduler::handle_t rt_info = + server->create (name, TAO_TRY_ENV); + TAO_CHECK_ENV; + + // The execution times are set to reasonable values, but + // actually they are changed on the real execution, i.e. we + // lie to the scheduler to obtain right priorities; but we + // don't care if the set is schedulable. + ACE_Time_Value tv (0, 2000); + TimeBase::TimeT time; + ORBSVCS_Time::Time_Value_to_TimeT (time, tv); + server->set (rt_info, + RtecScheduler::VERY_HIGH_CRITICALITY, + time, time, time, + rate, + RtecScheduler::VERY_LOW_IMPORTANCE, + time, + 1, + RtecScheduler::OPERATION, + TAO_TRY_ENV); + TAO_CHECK_ENV; + + this->supplier_id_ = ACE::crc32 (name); + ACE_DEBUG ((LM_DEBUG, "ID for <%s> is %04.4x\n", name, + this->supplier_id_)); + + ACE_SupplierQOS_Factory qos; + qos.insert (this->supplier_id_, + this->event_a_, + rt_info, 1); + qos.insert (this->supplier_id_, + this->event_b_, + rt_info, 1); + qos.insert (this->supplier_id_, + ACE_ES_EVENT_SHUTDOWN, + rt_info, 1); + + RtecEventChannelAdmin::SupplierAdmin_var supplier_admin = + ec->for_suppliers (TAO_TRY_ENV); + TAO_CHECK_ENV; + + this->consumer_proxy_ = + supplier_admin->obtain_push_consumer (TAO_TRY_ENV); + TAO_CHECK_ENV; + + RtecEventComm::PushSupplier_var objref = this->_this (TAO_TRY_ENV); + TAO_CHECK_ENV; + + this->consumer_proxy_->connect_push_supplier (objref.in (), + qos.get_SupplierQOS (), + TAO_TRY_ENV); + TAO_CHECK_ENV; + + } + TAO_CATCHANY + { + TAO_RETHROW; + } + TAO_ENDTRY; +} + +void +ECM_Supplier::close (CORBA::Environment &_env) +{ + if (CORBA::is_nil (this->consumer_proxy_.in ())) + return; + + this->consumer_proxy_->disconnect_push_consumer (_env); + if (_env.exception () != 0) return; + + this->consumer_proxy_ = 0; +} + +void +ECM_Supplier::activate (const char* name, + const RtecScheduler::Period& rate, + RtecEventChannelAdmin::EventChannel_ptr ec, + CORBA::Environment &_env) +{ + TAO_TRY + { + RtecScheduler::Scheduler_ptr server = + ACE_Scheduler_Factory::server (); + + const int bufsize = 512; + char buf[bufsize]; + ACE_OS::strcpy (buf, "consumer_"); + ACE_OS::strcat (buf, name); + RtecScheduler::handle_t rt_info = + server->create (buf, TAO_TRY_ENV); + TAO_CHECK_ENV; + + + // The execution times are set to reasonable values, but + // actually they are changed on the real execution, i.e. we + // lie to the scheduler to obtain right priorities; but we + // don't care if the set is schedulable. + ACE_Time_Value tv (0, 2000); + TimeBase::TimeT time; + ORBSVCS_Time::Time_Value_to_TimeT (time, tv); + server->set (rt_info, + RtecScheduler::VERY_HIGH_CRITICALITY, + time, time, time, + rate, + RtecScheduler::VERY_LOW_IMPORTANCE, + time, + 1, + RtecScheduler::OPERATION, + TAO_TRY_ENV); + TAO_CHECK_ENV; + + // Also connect our consumer for timeout events from the EC. + int interval = rate / 10; + ACE_Time_Value tv_timeout (interval / ACE_ONE_SECOND_IN_USECS, + interval % ACE_ONE_SECOND_IN_USECS); + TimeBase::TimeT timeout; + ORBSVCS_Time::Time_Value_to_TimeT (timeout, tv_timeout); + + ACE_ConsumerQOS_Factory consumer_qos; + consumer_qos.start_disjunction_group (); + consumer_qos.insert_time (ACE_ES_EVENT_INTERVAL_TIMEOUT, + timeout, + rt_info); + + // = Connect as a consumer. + RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin = + ec->for_consumers (TAO_TRY_ENV); + TAO_CHECK_ENV; + + this->supplier_proxy_ = + consumer_admin->obtain_push_supplier (TAO_TRY_ENV); + TAO_CHECK_ENV; + + RtecEventComm::PushConsumer_var cref = + this->consumer_._this (TAO_TRY_ENV); + TAO_CHECK_ENV; + + this->supplier_proxy_->connect_push_consumer (cref.in (), + consumer_qos.get_ConsumerQOS (), + TAO_TRY_ENV); + TAO_CHECK_ENV; + } + TAO_CATCHANY + { + TAO_RETHROW; + } + TAO_ENDTRY; +} + +void +ECM_Supplier::push (const RtecEventComm::EventSet& events, + CORBA::Environment& _env) +{ +#if 0 + const int bufsize = 128; + char buf[bufsize]; + ACE_OS::sprintf (buf, "Supplier %d receives event in thread: ", + this->supplier_id_); +#endif + + if (events.length () == 0 || this->event_count_ < 0) + { + // ACE_DEBUG ((LM_DEBUG, "no events\n")); + return; + } + + RtecEventComm::EventSet sent (events.length ()); + sent.length (events.length ()); + + for (u_int i = 0; i < events.length (); ++i) + { + const RtecEventComm::Event& e = events[i]; + if (e.type_ != ACE_ES_EVENT_INTERVAL_TIMEOUT) + continue; + + // ACE_DEBUG ((LM_DEBUG, "ECM_Supplier - timeout (%t)\n")); + + RtecEventComm::Event& s = sent[i]; + s.source_ = this->supplier_id_; + s.ttl_ = 1; + + ACE_hrtime_t t = ACE_OS::gethrtime (); + ORBSVCS_Time::hrtime_to_TimeT (s.creation_time_, t); + s.ec_recv_time_ = ORBSVCS_Time::zero; + s.ec_send_time_ = ORBSVCS_Time::zero; + + s.data_.x = 0; + s.data_.y = 0; + + this->event_count_--; + + if (this->event_count_ < 0) + { + //this->supplier_proxy_->disconnect_push_supplier (_env); + //if (_env.exception () != 0) return; + this->test_->shutdown_supplier (this->cookie_, + this->consumer_proxy_.in (), + _env); + } + if (this->event_count_ % 2 == 0) + { + // Generate an A event... + s.type_ = this->event_a_; + } + else + { + s.type_ = this->event_b_; + } + } + this->test_->push_supplier (this->cookie_, + this->consumer_proxy_.in (), + sent, + _env); +} + +void +ECM_Supplier::disconnect_push_supplier (CORBA::Environment& _env) +{ + this->supplier_proxy_->disconnect_push_supplier (_env); +} + +void +ECM_Supplier::disconnect_push_consumer (CORBA::Environment &) +{ +} + +int ECM_Supplier::supplier_id (void) const +{ + return this->supplier_id_; +} + +// **************************************************************** + +ECM_Consumer::ECM_Consumer (ECM_Driver *test, + void *cookie) + : test_ (test), + cookie_ (cookie) +{ +} + +void +ECM_Consumer::open (const char* name, + int event_a, int event_b, + RtecEventChannelAdmin::EventChannel_ptr ec, + CORBA::Environment& _env) +{ + TAO_TRY + { + RtecScheduler::Scheduler_ptr server = + ACE_Scheduler_Factory::server (); + + RtecScheduler::handle_t rt_info = + server->create (name, TAO_TRY_ENV); + TAO_CHECK_ENV; + + // The worst case execution time is far less than 2 + // milliseconds, but that is a safe estimate.... + ACE_Time_Value tv (0, 2000); + TimeBase::TimeT time; + ORBSVCS_Time::Time_Value_to_TimeT (time, tv); + server->set (rt_info, + RtecScheduler::VERY_HIGH_CRITICALITY, + time, time, time, + 0, + RtecScheduler::VERY_LOW_IMPORTANCE, + time, + 0, + RtecScheduler::OPERATION, + TAO_TRY_ENV); + TAO_CHECK_ENV; + + ACE_ConsumerQOS_Factory qos; + qos.start_disjunction_group (); + qos.insert_type (ACE_ES_EVENT_SHUTDOWN, rt_info); + qos.insert_type (event_a, rt_info); + qos.insert_type (event_b, rt_info); + + // = Connect as a consumer. + RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin = + ec->for_consumers (TAO_TRY_ENV); + TAO_CHECK_ENV; + + this->supplier_proxy_ = + consumer_admin->obtain_push_supplier (TAO_TRY_ENV); + TAO_CHECK_ENV; + + RtecEventComm::PushConsumer_var objref = this->_this (TAO_TRY_ENV); + TAO_CHECK_ENV; + + this->supplier_proxy_->connect_push_consumer (objref.in (), + qos.get_ConsumerQOS (), + TAO_TRY_ENV); + TAO_CHECK_ENV; + } + TAO_CATCHANY + { + TAO_RETHROW; + } + TAO_ENDTRY; +} + +void +ECM_Consumer::close (CORBA::Environment &_env) +{ + if (CORBA::is_nil (this->supplier_proxy_.in ())) + return; + + this->supplier_proxy_->disconnect_push_supplier (_env); + if (_env.exception () != 0) return; + + this->supplier_proxy_ = 0; +} + +void +ECM_Consumer::push (const RtecEventComm::EventSet& events, + CORBA::Environment &_env) +{ + ACE_hrtime_t arrival = ACE_OS::gethrtime (); + this->test_->push_consumer (this->cookie_, arrival, events, _env); +} + +void +ECM_Consumer::disconnect_push_consumer (CORBA::Environment &) +{ +} + +// **************************************************************** + +int +main (int argc, char *argv []) +{ + ECM_Driver test; + return test.run (argc, argv); +} + +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) +template class ACE_Condition; +template class ACE_PushConsumer_Adapter; +#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) +#pragma instantiate ACE_Condition +#pragma instantiate ACE_PushConsumer_Adapter +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.h b/TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.h new file mode 100644 index 00000000000..9663b7cc035 --- /dev/null +++ b/TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.h @@ -0,0 +1,344 @@ +/* -*- C++ -*- */ +// $Id$ +// +// ============================================================================ +// +// = DESCRIPTION +// This test attempts to communicate several Event Channels UDP +// using multicast. +// The test hardcodes all the objects involved (consumers, +// suppliers, proxies, etc.); the objective is to gain understanding +// on the use of multicast events, not to provide a complete and +// clean implementation. +// +// ============================================================================ + +#if !defined (EC_MCAST_H) +#define EC_MCAST_H + +#include "ace/SString.h" +#include "ace/High_Res_Timer.h" +#include "orbsvcs/RtecEventChannelAdminC.h" +#include "orbsvcs/RtecEventCommS.h" +#include "orbsvcs/Channel_Clients_T.h" +#include "orbsvcs/Event/EC_Gateway_UDP.h" + +class ECM_Driver; + +class ECM_Supplier : public POA_RtecEventComm::PushSupplier +{ + // + // = TITLE + // Helper class to implement the different tests within ECM_Driver. + // + // = DESCRIPTION + // ECM_Driver can be configured to have a single or mcast + // suppliers, to use the EC or short-circuit it, to use the + // Gateway or not; this class connects as a consumer for timeouts + // in the EC, at each timeout it delegates on the ECM_Driver class + // to execute the proper test. +public: + ECM_Supplier (ECM_Driver* test, void* cookie); + + void open (const char* name, + int event_a, int event_b, + int event_count, + const RtecScheduler::Period& rate, + RtecEventChannelAdmin::EventChannel_ptr ec, + CORBA::Environment& _env); + // This method connects the supplier to the EC. + + void close (CORBA::Environment &_env); + // Disconnect from the EC. + + void activate (const char* name, + const RtecScheduler::Period& rate, + RtecEventChannelAdmin::EventChannel_ptr ec, + CORBA::Environment& _env); + + void push (const RtecEventComm::EventSet& events, + CORBA::Environment &_env); + void disconnect_push_consumer (CORBA::Environment &); + // Implement the callbacks for our consumer personality. + + + virtual void disconnect_push_supplier (CORBA::Environment &); + // The methods in the skeleton. + + RtecEventComm::EventSourceID supplier_id (void) const; + // The supplier ID. + +private: + ECM_Driver* test_; + + void *cookie_; + // The test provide us a cookie so we can give back our identity. + + RtecEventComm::EventSourceID supplier_id_; + // We generate an id based on the name.... + + int event_a_; + int event_b_; + // The two types of events we may generate... + + int event_count_; + // The number of events sent by this supplier. + + RtecEventChannelAdmin::ProxyPushConsumer_var consumer_proxy_; + // We talk to the EC (as a supplier) using this proxy. + + ACE_PushConsumer_Adapter consumer_; + // We also connect to the EC as a consumer so we can receive the + // timeout events. + + RtecEventChannelAdmin::ProxyPushSupplier_var supplier_proxy_; + // We talk to the EC (as a supplier) using this proxy. + +}; + +class ECM_Consumer : public POA_RtecEventComm::PushConsumer +{ + // + // = TITLE + // Helper class to implement the different tests within ECM_Driver. + // + // = DESCRIPTION + // ECM_Driver must collect events destined to many consumers, but + // needs to distinguish through which consumer it is receiving the + // event. The easiest way is to create a shallow class that + // forwards the events to the EC, but passing back some cookie to + // identify the consumer. +public: + ECM_Consumer (ECM_Driver* test, void *cookie); + + void open (const char* name, + int event_a, int event_b, + RtecEventChannelAdmin::EventChannel_ptr ec, + CORBA::Environment& _env); + // This method connects the consumer to the EC. + + void close (CORBA::Environment &_env); + // Disconnect from the EC. + + virtual void push (const RtecEventComm::EventSet& events, + CORBA::Environment &_env); + virtual void disconnect_push_consumer (CORBA::Environment &); + // The skeleton methods. + +private: + ECM_Driver* test_; + // The test class. + + void *cookie_; + // The magic cookie that serves as our ID. + + RtecEventChannelAdmin::ProxyPushSupplier_var supplier_proxy_; + // We talk to the EC using this proxy. +}; + +class ECM_Driver +{ + // + // = TITLE + // Test and demonstrate the use of TAO_EC_Gateway. + // + // = DESCRIPTION + // This class is design to exercise several features of the EC_Gateway + // class and the mcast EC architecture. + // We want to create two EC, each one having a single supplier and a + // single consumer. + // + To test the remote facilities the consumer register for both a + // local event and a remote one. + // + To test the remote filtering features the remote consumer only + // wants one of the local events, and this event is generated less + // frequently. + // + // This class creates the local EC_Gateway a consumer and a + // supplier, it uses the command line to figure the subscriptions + // and publications list. + // +public: + ECM_Driver (void); + + enum { + MAX_EVENTS = 1024, + // Maximum number of events to send... + + MAX_CONSUMERS = 16, + // Maximum number of consumers. + + MAX_SUPPLIERS = 16 + // Maximum number of suppliers. + }; + + int run (int argc, char* argv[]); + // Execute the test. + + void push_supplier (void* supplier_cookie, + RtecEventChannelAdmin::ProxyPushConsumer_ptr consumer, + const RtecEventComm::EventSet &events, + CORBA::Environment &); + // Callback method for suppliers, we push for them to their + // consumers and take statistics on the way. + // It is possible that we ignore the parameter when + // testing the short-circuit case. + + void push_consumer (void* consumer_cookie, + ACE_hrtime_t arrival, + const RtecEventComm::EventSet& events, + CORBA::Environment&); + // Callback method for consumers, if any of our consumers has + // received events it will invoke this method. + + void shutdown_supplier (void* supplier_cookie, + RtecEventComm::PushConsumer_ptr consumer, + CORBA::Environment& _env); + // One of the suppliers has completed its work. + +private: + RtecEventChannelAdmin::EventChannel_ptr + get_ec (CosNaming::NamingContext_ptr naming_context, + const char* ec_name, + CORBA::Environment &_env); + // Helper routine to obtain an EC given its name. + + void connect_suppliers (RtecEventChannelAdmin::EventChannel_ptr local_ec, + CORBA::Environment &_env); + void disconnect_suppliers (CORBA::Environment &_env); + // Connect the suppliers. + + void activate_suppliers (RtecEventChannelAdmin::EventChannel_ptr local_ec, + CORBA::Environment &_env); + // Activate the suppliers, i.e. they start generating events. + + void connect_ecg (RtecEventChannelAdmin::EventChannel_ptr local_ec, + CORBA::Environment &_env); + // Connect the EC gateway, it builds the Subscriptions and the + // Publications list. + + void connect_consumers (RtecEventChannelAdmin::EventChannel_ptr local_ec, + CORBA::Environment &_env); + void disconnect_consumers (CORBA::Environment &_env); + // Connect and disconnect the consumers. + + int shutdown (CORBA::Environment&); + // Called when the main thread (i.e. not the scavenger thread) is + // shutting down. + + int parse_args (int argc, char* argv[]); + // parse the command line args + + void dump_results (void); + // Dump the results to the standard output. + + void wait_until_ready (void); + // Block event delivery until all the consumers are ready. + + struct Stats; + void dump_results (const char* name, Stats& stats); + // Dump the results for a particular consumer. + + int local_source (RtecEventComm::EventSourceID id) const; + // Check if correspond to a local supplier. + + void shutdown_consumer (int id); + // One of the consumers has completed its work. + +private: + char* lcl_name_; + // The name of the "local" EC. + + int short_circuit_; + // Send events directly to local consumers. + + int n_suppliers_; + // The number of suppliers in this test. + + int n_consumers_; + // The number of consumers. + + int workload_; + // The number of iterations of ACE::is_prime() to execute. + + int event_period_; + // The events are generated using this interval. + + int event_count_; + // How many events will the suppliers send + + int s_event_a_; + int s_event_b_; + int c_event_a_; + int c_event_b_; + int r_event_a_; + int r_event_b_; + // Each supplier send two types of events, each consumer receives + // two other types. The remote suppliers send other two types of + // events. + + const char* schedule_file_; + // Ask the schedule to compute and dump its schedule after the test + // execution. + + const char* pid_file_name_; + // The name of a file where the process stores its pid + + ACE_INET_Addr send_mcast_group_; + // The multicast group to send the consumer events. + + TAO_ECG_UDP_Sender sender_; + // This consumer sends the local events through + + ACE_INET_Addr recv_mcast_group_; + // The multicast group to receive the consumer events. + + TAO_ECG_UDP_Receiver receiver_; + // This supplier pushes the remote events received throug + // + + TAO_ECG_Mcast_EH mcast_eh_; + // The event handler to receive the mcast events. + + ECM_Supplier* suppliers_[ECM_Driver::MAX_SUPPLIERS]; + ECM_Consumer* consumers_[ECM_Driver::MAX_CONSUMERS]; + // The suppliers and consumer arrays, the sizes are controlled using + // {lp,hp}_{suppliers,consumers}_ + + // @@ TODO it looks like the HP and LP data could be encapsulated. + + struct Stats { + ACE_hrtime_t total_time_; + float laxity_[MAX_EVENTS]; + ACE_hrtime_t lcl_latency_[MAX_EVENTS]; + ACE_hrtime_t end_[MAX_EVENTS]; + int lcl_count_; + // We keep laxity and total_time stats only for the local events. + + ACE_hrtime_t rmt_latency_[MAX_EVENTS]; + int rmt_count_; + }; + Stats stats_[ECM_Driver::MAX_CONSUMERS]; + // Store the measurements for local and remote events.. + + int ready_; + ACE_SYNCH_MUTEX ready_mtx_; + ACE_Condition ready_cnd_; + // Before accepting any events the suppliers must wait for the test + // to setup all the consumers. + // The suppliers wait on the condition variable. + + ACE_Atomic_Op running_suppliers_; + // keep track of how many suppliers are still running so we shutdown + // at the right moment. + + ACE_Atomic_Op running_consumers_; + // keep track of how many consumers are still running so we shutdown + // at the right moment. + + ACE_hrtime_t test_start_; + ACE_hrtime_t test_stop_; + // Measure the test elapsed time as well as mark the beginning of + // the frames. +}; + +#endif /* EC_MCAST_H */ diff --git a/TAO/orbsvcs/tests/EC_Mcast/Makefile b/TAO/orbsvcs/tests/EC_Mcast/Makefile new file mode 100644 index 00000000000..156a1eeca5e --- /dev/null +++ b/TAO/orbsvcs/tests/EC_Mcast/Makefile @@ -0,0 +1,34 @@ +# $Id$ + +BIN = EC_Mcast +BUILD = $(BIN) +SRC = $(BIN:%=%$(VAR).cpp) +LDLIBS= -lorbsvcs -lTAO + +ifndef TAO_ROOT +TAO_ROOT = $(ACE_ROOT)/TAO +endif + +CPPFLAGS += -I$(TAO_ROOT)/orbsvcs \ + -I$(TAO_ROOT) + +#---------------------------------------------------------------------------- +# Include macros and targets +#---------------------------------------------------------------------------- + +include $(ACE_ROOT)/include/makeinclude/wrapper_macros.GNU +include $(ACE_ROOT)/include/makeinclude/macros.GNU +include $(ACE_ROOT)/include/makeinclude/rules.common.GNU +include $(ACE_ROOT)/include/makeinclude/rules.nonested.GNU +include $(ACE_ROOT)/include/makeinclude/rules.local.GNU +include $(ACE_ROOT)/include/makeinclude/rules.bin.GNU + +#### Local rules and variables... + +#---------------------------------------------------------------------------- +# Dependencies +#---------------------------------------------------------------------------- + +# DO NOT DELETE THIS LINE -- g++dep uses it. +# DO NOT PUT ANYTHING AFTER THIS LINE, IT WILL GO AWAY. +# IF YOU PUT ANYTHING HERE IT WILL GO AWAY diff --git a/TAO/orbsvcs/tests/EC_Mcast/svc.conf b/TAO/orbsvcs/tests/EC_Mcast/svc.conf new file mode 100644 index 00000000000..272f088b3dd --- /dev/null +++ b/TAO/orbsvcs/tests/EC_Mcast/svc.conf @@ -0,0 +1,7 @@ +# $Id$ +# +# The options are described in $TAO_ROOT/docs/Options.html +# +dynamic Resource_Factory Service_Object * TAO:_make_TAO_Resource_Factory() "-ORBresources global -ORBpoa global -ORBcoltable global" +dynamic Client_Strategy_Factory Service_Object * TAO:_make_TAO_Default_Client_Strategy_Factory() +dynamic Server_Strategy_Factory Service_Object * TAO:_make_TAO_Default_Server_Strategy_Factory() "-ORBconcurrency reactive -ORBdemuxstrategy dynamic -ORBtablesize 128 -ORBpoalock thread -ORBcoltbllock thread -ORBpoamgrlock thread" diff --git a/TAO/orbsvcs/tests/EC_Multiple/EC_Multiple.cpp b/TAO/orbsvcs/tests/EC_Multiple/EC_Multiple.cpp index 14641742646..9fad4c0eeb2 100644 --- a/TAO/orbsvcs/tests/EC_Multiple/EC_Multiple.cpp +++ b/TAO/orbsvcs/tests/EC_Multiple/EC_Multiple.cpp @@ -452,7 +452,8 @@ Test_ECG::run (int argc, char* argv[]) if (orb->run (&tv) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "orb->run"), -1); - ec_impl.add_gateway (&this->ecg_); + ec_impl.add_gateway (&this->ecg_, TAO_TRY_ENV); + TAO_CHECK_ENV; } for (int cd = 0; cd < this->consumer_disconnects_; ++cd) @@ -502,7 +503,8 @@ Test_ECG::run (int argc, char* argv[]) if (this->rmt_name_ != 0) { - ec_impl.del_gateway (&this->ecg_); + ec_impl.del_gateway (&this->ecg_, TAO_TRY_ENV); + TAO_CHECK_ENV; this->ecg_.close (TAO_TRY_ENV); TAO_CHECK_ENV; @@ -520,6 +522,10 @@ Test_ECG::run (int argc, char* argv[]) if (orb->run (&tv) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "orb->run"), -1); + naming_context->unbind (channel_name, TAO_TRY_ENV); + TAO_CHECK_ENV; + + if (this->schedule_file_ != 0) { diff --git a/TAO/orbsvcs/tests/EC_Throughput/ECT_Consumer.cpp b/TAO/orbsvcs/tests/EC_Throughput/ECT_Consumer.cpp index 62b63e35ef1..19330cc8ee8 100644 --- a/TAO/orbsvcs/tests/EC_Throughput/ECT_Consumer.cpp +++ b/TAO/orbsvcs/tests/EC_Throughput/ECT_Consumer.cpp @@ -153,12 +153,11 @@ Driver::run (int argc, char* argv[]) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "orb->run"), -1); ACE_DEBUG ((LM_DEBUG, "event loop finished\n")); - channel->destroy (TAO_TRY_ENV); - TAO_CHECK_ENV; - this->disconnect_consumers (TAO_TRY_ENV); TAO_CHECK_ENV; + channel->destroy (TAO_TRY_ENV); + TAO_CHECK_ENV; } TAO_CATCH (CORBA::SystemException, sys_ex) { diff --git a/TAO/tao/CDR.cpp b/TAO/tao/CDR.cpp index e65e7e7ebcf..04b486fc60a 100644 --- a/TAO/tao/CDR.cpp +++ b/TAO/tao/CDR.cpp @@ -37,60 +37,6 @@ # include "tao/CDR.i" #endif /* ! __ACE_INLINE__ */ -// This functions are private, so it is safe to declare them inline in -// the .cpp file, we still use the ACE_INLINE macro to support -// compilations without any inline code. - -ACE_INLINE -void CDR::swap_2 (const char *orig, char* target) -{ - target[1] = *orig++; - target[0] = *orig++; -} - -ACE_INLINE -void CDR::swap_4 (const char *orig, char* target) -{ - target [3] = *orig++; - target [2] = *orig++; - target [1] = *orig++; - target [0] = *orig++; -} - -ACE_INLINE -void CDR::swap_8 (const char *orig, char* target) -{ - target [7] = *orig++; - target [6] = *orig++; - target [5] = *orig++; - target [4] = *orig++; - target [3] = *orig++; - target [2] = *orig++; - target [1] = *orig++; - target [0] = *orig++; -} - -ACE_INLINE -void CDR::swap_16 (const char *orig, char* target) -{ - target [15] = *orig++; - target [14] = *orig++; - target [13] = *orig++; - target [12] = *orig++; - target [11] = *orig++; - target [10] = *orig++; - target [9] = *orig++; - target [8] = *orig++; - target [7] = *orig++; - target [6] = *orig++; - target [5] = *orig++; - target [4] = *orig++; - target [3] = *orig++; - target [2] = *orig++; - target [1] = *orig++; - target [0] = *orig++; -} - int CDR::grow (ACE_Message_Block *mb, size_t minsize) { diff --git a/TAO/tao/CDR.h b/TAO/tao/CDR.h index 66b26684126..5b496cf543b 100644 --- a/TAO/tao/CDR.h +++ b/TAO/tao/CDR.h @@ -319,6 +319,10 @@ private: // could be interesting to find the break even point and optimize // for that case, but that would be too platform dependent. + int do_byte_swap (void) const; + // If non-zero then this stream is writing in non-native byte order, + // this is only meaningful if TAO_ENABLE_SWAP_ON_WRITE is defined. + private: ACE_Message_Block start_; // The start of the chain of message blocks. diff --git a/TAO/tao/CDR.i b/TAO/tao/CDR.i index 32b0d7041f1..c202a337b1e 100644 --- a/TAO/tao/CDR.i +++ b/TAO/tao/CDR.i @@ -3,6 +3,56 @@ // **************************************************************** +ACE_INLINE +void CDR::swap_2 (const char *orig, char* target) +{ + target[1] = *orig++; + target[0] = *orig++; +} + +ACE_INLINE +void CDR::swap_4 (const char *orig, char* target) +{ + target [3] = *orig++; + target [2] = *orig++; + target [1] = *orig++; + target [0] = *orig++; +} + +ACE_INLINE +void CDR::swap_8 (const char *orig, char* target) +{ + target [7] = *orig++; + target [6] = *orig++; + target [5] = *orig++; + target [4] = *orig++; + target [3] = *orig++; + target [2] = *orig++; + target [1] = *orig++; + target [0] = *orig++; +} + +ACE_INLINE +void CDR::swap_16 (const char *orig, char* target) +{ + target [15] = *orig++; + target [14] = *orig++; + target [13] = *orig++; + target [12] = *orig++; + target [11] = *orig++; + target [10] = *orig++; + target [9] = *orig++; + target [8] = *orig++; + target [7] = *orig++; + target [6] = *orig++; + target [5] = *orig++; + target [4] = *orig++; + target [3] = *orig++; + target [2] = *orig++; + target [1] = *orig++; + target [0] = *orig++; +} + ACE_INLINE void CDR::mb_align (ACE_Message_Block* mb) { @@ -282,6 +332,12 @@ TAO_OutputCDR::encode (CORBA::TypeCode_ptr tc, return CORBA::TypeCode::TRAVERSE_STOP; } +ACE_INLINE +TAO_OutputCDR::do_byte_swap (void) const +{ + return this->do_byte_swap_; +} + // **************************************************************** ACE_INLINE CORBA_Boolean diff --git a/TAO/tao/GIOP.cpp b/TAO/tao/GIOP.cpp index 37da590fddd..5faecca7023 100644 --- a/TAO/tao/GIOP.cpp +++ b/TAO/tao/GIOP.cpp @@ -240,7 +240,7 @@ TAO_GIOP::send_request (TAO_SVC_HANDLER *handler, #if !defined (TAO_ENABLE_SWAP_ON_WRITE) *ACE_reinterpret_cast(CORBA::ULong*,buf + 8) = bodylen; #else - if (!stream->do_byte_swap_) + if (!stream->do_byte_swap ()) { *ACE_reinterpret_cast(CORBA::ULong*, buf + 8) = bodylen; } -- cgit v1.2.1