diff options
author | coryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1998-02-09 21:16:13 +0000 |
---|---|---|
committer | coryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1998-02-09 21:16:13 +0000 |
commit | e941cef7a751a97cc16564eb13741b86a2abd661 (patch) | |
tree | b38b5a828c9a5b1fc00aa855361d6dece26b6000 /TAO/orbsvcs | |
parent | 3d496410aa90721ca568447feb229aff57d06863 (diff) | |
download | ATCD-e941cef7a751a97cc16564eb13741b86a2abd661.tar.gz |
ChangeLogTag:Mon Feb 9 12:59:29 1998 Carlos O'Ryan <coryan@cs.wustl.edu>
Diffstat (limited to 'TAO/orbsvcs')
-rw-r--r-- | TAO/orbsvcs/Event_Service/Event_Channel.cpp | 50 | ||||
-rw-r--r-- | TAO/orbsvcs/Event_Service/Event_Channel.i | 11 | ||||
-rw-r--r-- | TAO/orbsvcs/Event_Service/Event_Service.cpp | 35 | ||||
-rw-r--r-- | TAO/orbsvcs/Naming_Service/CosNaming_i.cpp | 131 | ||||
-rw-r--r-- | TAO/orbsvcs/Scheduling_Service/Config_Scheduler.cpp | 2 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/Channel_Clients_T.cpp | 19 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/Channel_Clients_T.h | 12 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/Channel_Clients_T.i | 17 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/RtecEventComm.idl | 56 | ||||
-rw-r--r-- | TAO/orbsvcs/tests/EC_Multiple/EC_Multiple.cpp | 589 | ||||
-rw-r--r-- | TAO/orbsvcs/tests/EC_Multiple/EC_Multiple.h | 156 | ||||
-rw-r--r-- | TAO/orbsvcs/tests/EC_Multiple/Makefile | 269 | ||||
-rw-r--r-- | TAO/orbsvcs/tests/EC_Multiple/README | 34 | ||||
-rw-r--r-- | TAO/orbsvcs/tests/Event_Latency/Event_Latency.cpp | 6 | ||||
-rw-r--r-- | TAO/orbsvcs/tests/Makefile | 1 |
15 files changed, 1271 insertions, 117 deletions
diff --git a/TAO/orbsvcs/Event_Service/Event_Channel.cpp b/TAO/orbsvcs/Event_Service/Event_Channel.cpp index 376b0c05c59..7d85f77295a 100644 --- a/TAO/orbsvcs/Event_Service/Event_Channel.cpp +++ b/TAO/orbsvcs/Event_Service/Event_Channel.cpp @@ -412,15 +412,14 @@ ACE_Push_Supplier_Proxy::push (const RtecEventComm::EventSet &event, CORBA::Environment &_env) { ACE_TIMEPROBE (" enter Push_Supplier_Proxy::push"); - if (!this->connected ()) - TAO_THROW (RtecEventComm::Disconnected); // @@ TOTAL HACK ACE_hrtime_t ec_recv = ACE_OS::gethrtime (); for (CORBA::ULong i = 0; i < event.length (); ++i) { - ACE_OS::memcpy (ACE_const_cast(void*,&event[i].ec_recv_time_), &ec_recv, - sizeof (RtecEventComm::Time)); + ACE_OS::memcpy + (ACE_const_cast(void*,&event[i].ec_recv_time_), + &ec_recv, sizeof (RtecEventComm::Time)); } supplier_module_->push (this, event, _env); } @@ -1134,8 +1133,10 @@ ACE_ES_Correlation_Module::push (ACE_ES_Consumer_Rep *consumer, int ACE_ES_Correlation_Module::schedule_timeout (ACE_ES_Consumer_Rep_Timeout *consumer) { - RtecEventComm::Time &interval = consumer->dependency ()->event_.creation_time_; - RtecEventComm::Time &delay = consumer->dependency ()->event_.creation_time_; + RtecEventComm::Time &interval = + consumer->dependency ()->event_.creation_time_; + RtecEventComm::Time &delay = + consumer->dependency ()->event_.creation_time_; // Store the preemption priority so we can cancel the correct timer. // The priority values may change during the process lifetime (e.g., @@ -1184,8 +1185,10 @@ ACE_ES_Correlation_Module::reschedule_timeout (ACE_ES_Consumer_Rep_Timeout *cons ACE_ERROR_RETURN ((LM_ERROR, "%p.\n", "ACE_ES_Disjunction_Group::reschedule_deadline"), -1); else { - RtecEventComm::Time &interval = consumer->dependency ()->event_.creation_time_; - RtecEventComm::Time &delay = consumer->dependency ()->event_.creation_time_; + RtecEventComm::Time &interval = + consumer->dependency ()->event_.creation_time_; + RtecEventComm::Time &delay = + consumer->dependency ()->event_.creation_time_; // Store the preemption priority so we can cancel the correct timer. // The priority values may change during the process lifetime (e.g., @@ -1515,14 +1518,11 @@ ACE_ES_Consumer_Correlation::get_consumer_rep (RtecEventChannelAdmin::Dependency // Step through all existing consumer reps. for (int x=0; x < crep_index; x++) { + RtecEventComm::Event& e = consumer_reps_[x]->dependency ()->event_; // If <dependency> matches any previously subscribed consumer // reps, we'll reuse it. - if (consumer_reps_[x]->dependency ()->event_.type_ == dependency.event_.type_ -#if defined(ACE_ES_LACKS_ORB) - && consumer_reps_[x]->dependency ()->event_.source_ == - dependency.event_.source_ -#endif /* ACE_ES_LACKS_ORB */ -) + if (e.type_ == dependency.event_.type_ + && e.source_ == dependency.event_.source_ ) { rep = consumer_reps_[x]; break; @@ -1752,7 +1752,8 @@ ACE_ES_Consumer_Rep_Timeout::execute (void) { CORBA::Environment __env; ACE_Time_Value tv = ACE_OS::gettimeofday (); - timeout_event_->creation_time_ = tv.sec () * 10000000 + tv.usec () * 10; + timeout_event_->creation_time_ = + tv.sec () * 10000000 + tv.usec () * 10; correlation_->correlation_module_->push (this, timeout_event_, __env); if (__env.exception () != 0) ACE_ERROR ((LM_ERROR, "ACE_ES_Consumer_Rep_Timeout::execute: unexpected exception.\n")); @@ -1818,7 +1819,8 @@ ACE_ES_Subscription_Module::connected (ACE_Push_Supplier_Proxy *supplier, } #endif - RtecEventComm::EventType &event_type = publications[index].event_.type_; + RtecEventComm::EventType &event_type = + publications[index].event_.type_; // Check to make sure a type was specified. if (event_type == ACE_ES_EVENT_ANY) @@ -2242,7 +2244,9 @@ ACE_ES_Subscription_Module::subscribe (ACE_ES_Consumer_Rep *consumer) if (event.type_ == ACE_ES_EVENT_ANY) result = this->subscribe_source (consumer, event.source_); else - result = this->subscribe_source_type (consumer, event.source_, event.type_); + result = this->subscribe_source_type (consumer, + event.source_, + event.type_); } return result; @@ -2264,13 +2268,15 @@ ACE_ES_Subscription_Module::unsubscribe (ACE_ES_Consumer_Rep *consumer) { // Remove the consumer from the global type-based subscription list. if (ACE_ES_Subscription_Info::remove (type_subscribers_, - consumer, event.type_) == 0) + consumer, + event.type_) == 0) consumer->_release (); } else // Remove the consumer from the global source-based subscription list. if (ACE_ES_Subscription_Info::remove (source_subscribers_, - consumer, event.source_) == 0) + consumer, + event.source_) == 0) consumer->_release (); return 0; @@ -2579,12 +2585,6 @@ ACE_ES_Supplier_Module::push (ACE_Push_Supplier_Proxy *proxy, TAO_CHECK_ENV; } } - TAO_CATCH (RtecEventComm::Disconnected, d) - { - ACE_ERROR ((LM_ERROR, "%p Disconnected.\n", - "ACE_ES_Supplier_Module::push")); - TAO_RETHROW; - } TAO_CATCH (RtecEventChannelAdmin::TypeError, t) { ACE_ERROR ((LM_ERROR, "%p Type Error.\n", diff --git a/TAO/orbsvcs/Event_Service/Event_Channel.i b/TAO/orbsvcs/Event_Service/Event_Channel.i index 4e1eb79db94..996ca7dc7d4 100644 --- a/TAO/orbsvcs/Event_Service/Event_Channel.i +++ b/TAO/orbsvcs/Event_Service/Event_Channel.i @@ -89,11 +89,6 @@ ACE_Push_Consumer_Proxy::push (const RtecEventComm::EventSet &events, push_consumer_->push (events, TAO_TRY_ENV); TAO_CHECK_ENV; } - TAO_CATCH (RtecEventComm::Disconnected, d) - { - ACE_ERROR ((LM_ERROR, "consumer disconnected.\n")); - TAO_RETHROW; - } TAO_CATCH (CORBA::SystemException, se) { ACE_ERROR ((LM_ERROR, "system exception.\n")); @@ -179,7 +174,8 @@ operator == (const RtecEventComm::Event &event1, const RtecEventComm::Event &event2) { // Check if the sources are equal. 0 is a wildcard. - if ((event1.source_ != 0) && (event2.source_ != 0) + if ((event1.source_ != 0) + && (event2.source_ != 0) && (event1.source_ != event2.source_)) return 0; @@ -518,7 +514,8 @@ ACE_ES_Subscription_Module::push_source_type (ACE_Push_Supplier_Proxy *source, if (supplier_map.find (event->type_, subscribers) == -1) { ACE_DEBUG ((LM_ERROR, "ACE_ES_Subscription_Module::push_source_type" - " Warning: event type %d not registered.\n", event->type_)); + " Warning: event type %d not registered.\n", + event->type_)); ACE_TIMEPROBE (" push_source_type"); return 0; // continue anyway } diff --git a/TAO/orbsvcs/Event_Service/Event_Service.cpp b/TAO/orbsvcs/Event_Service/Event_Service.cpp index d7bf66ba7aa..c352933570e 100644 --- a/TAO/orbsvcs/Event_Service/Event_Service.cpp +++ b/TAO/orbsvcs/Event_Service/Event_Service.cpp @@ -12,6 +12,36 @@ +const char* service_name = "EventService"; + +int +parse_args (int argc, char *argv []) +{ + ACE_Get_Opt get_opt (argc, argv, "n:"); + int opt; + + while ((opt = get_opt ()) != EOF) + { + switch (opt) + { + case 'n': + service_name = get_opt.optarg; + break; + case '?': + default: + ACE_DEBUG ((LM_DEBUG, + "Usage: %s " + "-n service_name " + "\n", + argv[0])); + return -1; + } + } + + return 0; +} + + int main (int argc, char *argv[]) { TAO_TRY @@ -21,6 +51,9 @@ int main (int argc, char *argv[]) CORBA::ORB_init (argc, argv, "internet", TAO_TRY_ENV); TAO_CHECK_ENV; + if (parse_args (argc, argv) == -1) + return 1; + CORBA::Object_var poa_object = orb->resolve_initial_references("RootPOA"); if (CORBA::is_nil (poa_object.in ())) @@ -63,7 +96,7 @@ int main (int argc, char *argv[]) CosNaming::Name channel_name (1); channel_name.length (1); - channel_name[0].id = CORBA::string_dup ("EventService"); + channel_name[0].id = CORBA::string_dup (service_name); naming_context->bind (channel_name, ec.in (), TAO_TRY_ENV); TAO_CHECK_ENV; diff --git a/TAO/orbsvcs/Naming_Service/CosNaming_i.cpp b/TAO/orbsvcs/Naming_Service/CosNaming_i.cpp index 1450ad12b82..562668bd920 100644 --- a/TAO/orbsvcs/Naming_Service/CosNaming_i.cpp +++ b/TAO/orbsvcs/Naming_Service/CosNaming_i.cpp @@ -32,7 +32,7 @@ NS_NamingContext::get_context (const CosNaming::Name &name) { // create compound name to be resolved // (<name> - last component) - CORBA::Environment IT_env; + CORBA::Environment _env; CORBA::ULong len = name.length (); CosNaming::Name comp_name (name); comp_name.length (len - 1); @@ -40,13 +40,13 @@ NS_NamingContext::get_context (const CosNaming::Name &name) // resolve CORBA::Object_ptr cont_ref; - cont_ref = resolve (comp_name, IT_env); + cont_ref = resolve (comp_name, _env); // Deal with exceptions in resolve: basicly, add the last component // of the name to <rest_of_name> and rethrow. - if (IT_env.exception () != 0) + if (_env.exception () != 0) { - IT_env.print_exception ("NS_NamingContext::get_context"); + _env.print_exception ("NS_NamingContext::get_context"); return 0; } @@ -55,10 +55,10 @@ NS_NamingContext::get_context (const CosNaming::Name &name) // Try narrowing object reference to a context type. CosNaming::NamingContext_ptr c; - c = CosNaming::NamingContext::_narrow (cont_ref, IT_env); - if (IT_env.exception () != 0) + c = CosNaming::NamingContext::_narrow (cont_ref, _env); + if (_env.exception () != 0) { - IT_env.print_exception ("NS_NamingContext::get_context - _narrow"); + _env.print_exception ("NS_NamingContext::get_context - _narrow"); return 0; } @@ -80,9 +80,9 @@ NS_NamingContext::get_context (const CosNaming::Name &name) void NS_NamingContext::bind (const CosNaming::Name& n, CORBA::Object_ptr obj, - CORBA::Environment &IT_env) + CORBA::Environment &_env) { - IT_env.clear (); + _env.clear (); // get the length of the name CORBA::ULong len = n.length (); @@ -90,8 +90,8 @@ NS_NamingContext::bind (const CosNaming::Name& n, // Check for invalid name. if (len == 0) { - IT_env.clear (); - IT_env.exception (new CosNaming::NamingContext::InvalidName); + _env.clear (); + _env.exception (new CosNaming::NamingContext::InvalidName); return; } @@ -104,7 +104,7 @@ NS_NamingContext::bind (const CosNaming::Name& n, CosNaming::Name simple_name; simple_name.length (1); simple_name[0] = n[len - 1]; - cont->bind (simple_name, obj, IT_env); + cont->bind (simple_name, obj, _env); } // If we received a simple name, we need to bind it in this context. @@ -116,20 +116,19 @@ NS_NamingContext::bind (const CosNaming::Name& n, // Try binding the name. if (context_.bind (name, entry) == -1) { - IT_env.clear (); - IT_env.exception (new CosNaming::NamingContext::AlreadyBound); + _env.clear (); + _env.exception (new CosNaming::NamingContext::AlreadyBound); return; } - /**/// throw CosNaming::NamingContext::AlreadyBound (); - // May need to add case dealing with -1. (Maybe throw cannot - // proceed). + ACE_DEBUG ((LM_DEBUG, "bound: <%s,%s>\n", + n[0].id.in (), n[0].kind.in ())); } } void NS_NamingContext::rebind (const CosNaming::Name& n, CORBA::Object_ptr obj, - CORBA::Environment &IT_env) + CORBA::Environment &_env) { // get the length of the name CORBA::ULong len = n.length (); @@ -137,8 +136,8 @@ NS_NamingContext::rebind (const CosNaming::Name& n, // check for invalid name. if (len == 0) { - IT_env.clear (); - IT_env.exception (new CosNaming::NamingContext::InvalidName); + _env.clear (); + _env.exception (new CosNaming::NamingContext::InvalidName); return; } @@ -151,7 +150,7 @@ NS_NamingContext::rebind (const CosNaming::Name& n, CosNaming::Name simple_name; simple_name.length (1); simple_name[0] = n[len - 1]; - cont->rebind (simple_name, obj, IT_env); + cont->rebind (simple_name, obj, _env); } // If we received a simple name, we need to rebind it in this context. else @@ -171,7 +170,7 @@ NS_NamingContext::rebind (const CosNaming::Name& n, void NS_NamingContext::bind_context (const CosNaming::Name &n, CosNaming::NamingContext_ptr nc, - CORBA::Environment &IT_env) + CORBA::Environment &_env) { // Get the length of the name. CORBA::ULong len = n.length (); @@ -179,8 +178,8 @@ NS_NamingContext::bind_context (const CosNaming::Name &n, // Check for invalid name. if (len == 0) { - IT_env.clear (); - IT_env.exception (new CosNaming::NamingContext::InvalidName); + _env.clear (); + _env.exception (new CosNaming::NamingContext::InvalidName); return; } @@ -193,7 +192,7 @@ NS_NamingContext::bind_context (const CosNaming::Name &n, CosNaming::Name simple_name; simple_name.length (1); simple_name[0] = n[len - 1]; - cont->bind_context (simple_name, nc, IT_env); + cont->bind_context (simple_name, nc, _env); } // If we received a simple name, we need to bind it in this context. @@ -206,8 +205,8 @@ NS_NamingContext::bind_context (const CosNaming::Name &n, // Try binding the name. if (context_.bind (name, entry) == 1) { - IT_env.clear (); - IT_env.exception (new CosNaming::NamingContext::AlreadyBound); + _env.clear (); + _env.exception (new CosNaming::NamingContext::AlreadyBound); return; } @@ -218,7 +217,7 @@ NS_NamingContext::bind_context (const CosNaming::Name &n, void NS_NamingContext::rebind_context (const CosNaming::Name &n, CosNaming::NamingContext_ptr nc, - CORBA::Environment &IT_env) + CORBA::Environment &_env) { // Get the length of the name. CORBA::ULong len = n.length (); @@ -226,8 +225,8 @@ NS_NamingContext::rebind_context (const CosNaming::Name &n, // Check for invalid name. if (len == 0) { - IT_env.clear (); - IT_env.exception (new CosNaming::NamingContext::InvalidName); + _env.clear (); + _env.exception (new CosNaming::NamingContext::InvalidName); return; } @@ -240,7 +239,7 @@ NS_NamingContext::rebind_context (const CosNaming::Name &n, CosNaming::Name simple_name; simple_name.length (1); simple_name[0] = n[len - 1]; - cont->rebind_context (simple_name, nc, IT_env); + cont->rebind_context (simple_name, nc, _env); } // if we received a simple name, we need to rebind it in this context. @@ -260,7 +259,7 @@ NS_NamingContext::rebind_context (const CosNaming::Name &n, CORBA::Object_ptr NS_NamingContext::resolve (const CosNaming::Name& n, - CORBA::Environment &IT_env) + CORBA::Environment &_env) { // get the length of the name CORBA::ULong len = n.length (); @@ -268,19 +267,22 @@ NS_NamingContext::resolve (const CosNaming::Name& n, // check for invalid name. if (len == 0) { - IT_env.clear (); - IT_env.exception (new CosNaming::NamingContext::InvalidName); - return 0; + _env.clear (); + _env.exception (new CosNaming::NamingContext::InvalidName); + return CORBA::Object::_nil (); } + ACE_DEBUG ((LM_DEBUG, "Trying to resolve <%s,%s>\n", + n[0].id.in (), n[0].kind.in ())); + // resolve the first component of the name NS_ExtId name (n[0].id, n[0].kind); NS_IntId entry; if (context_.find (name, entry) == -1) { - IT_env.clear (); - IT_env.exception (new CosNaming::NamingContext::NotFound (CosNaming::NamingContext::not_object, n)); - return 0; + _env.clear (); + _env.exception (new CosNaming::NamingContext::NotFound (CosNaming::NamingContext::not_object, n)); + return CORBA::Object::_nil (); } CORBA::Object_ptr item = entry.ref_; @@ -292,28 +294,32 @@ NS_NamingContext::resolve (const CosNaming::Name& n, CosNaming::NamingContext_var cont; if (entry.type_ == CosNaming::ncontext) { - cont = CosNaming::NamingContext::_narrow (item, IT_env); - if (IT_env.exception () != 0) + cont = CosNaming::NamingContext::_narrow (item, _env); + if (_env.exception () != 0) { - IT_env.print_exception ("NS_NamingContext::resolve"); - return 0; + _env.print_exception ("NS_NamingContext::resolve"); + return CORBA::Object::_nil (); } } else { - IT_env.clear (); - IT_env.exception (new CosNaming::NamingContext::NotFound (CosNaming::NamingContext::not_context, n)); - return 0; + _env.clear (); + _env.exception (new CosNaming::NamingContext::NotFound (CosNaming::NamingContext::not_context, n)); + return CORBA::Object::_nil (); } - CosNaming::Name rest_of_name; + CosNaming::Name rest_of_name (len - 1); rest_of_name.length (len - 1); for (CORBA::ULong i = 1; i < len; i++) rest_of_name[i-1] = n[i]; - return (cont->resolve (rest_of_name, IT_env)); + return (cont->resolve (rest_of_name, _env)); } + ACE_DEBUG ((LM_DEBUG, "Resolved <%s,%s> to %08.8x\n", + n[0].id.in (), n[0].kind.in (), + item)); + // if the name we had to resolve was simple, we just need // to return the result. return CORBA::Object::_duplicate (item); @@ -321,7 +327,7 @@ NS_NamingContext::resolve (const CosNaming::Name& n, void NS_NamingContext::unbind (const CosNaming::Name& n, - CORBA::Environment &IT_env) + CORBA::Environment &_env) { // if (do_operation (n, CORBA::_nil (), NS_NamingContext::unbind) == 0) @@ -331,8 +337,8 @@ NS_NamingContext::unbind (const CosNaming::Name& n, // check for invalid name. if (len == 0) { - IT_env.clear (); - IT_env.exception (new CosNaming::NamingContext::InvalidName); + _env.clear (); + _env.exception (new CosNaming::NamingContext::InvalidName); return; } @@ -345,7 +351,7 @@ NS_NamingContext::unbind (const CosNaming::Name& n, CosNaming::Name simple_name; simple_name.length (1); simple_name[0] = n[len - 1]; - cont->unbind (simple_name, IT_env); + cont->unbind (simple_name, _env); } else // If we received a simple name, we need to unbind it in this @@ -355,8 +361,8 @@ NS_NamingContext::unbind (const CosNaming::Name& n, // try unbinding the name. if (context_.unbind (name) == -1) { - IT_env.clear (); - IT_env.exception (new CosNaming::NamingContext::NotFound (CosNaming::NamingContext::not_object, n)); + _env.clear (); + _env.exception (new CosNaming::NamingContext::NotFound (CosNaming::NamingContext::not_object, n)); return; } } @@ -365,7 +371,6 @@ NS_NamingContext::unbind (const CosNaming::Name& n, CosNaming::NamingContext_ptr NS_NamingContext::new_context (CORBA::Environment &_env) { - NS_NamingContext *c = new NS_NamingContext; return c->_this (_env); @@ -390,12 +395,12 @@ NS_NamingContext::bind_new_context (const CosNaming::Name& n, } void -NS_NamingContext::destroy (CORBA::Environment &IT_env) +NS_NamingContext::destroy (CORBA::Environment &_env) { if (context_.current_size () != 0) { - IT_env.clear (); - IT_env.exception (new CosNaming::NamingContext::NotEmpty); + _env.clear (); + _env.exception (new CosNaming::NamingContext::NotEmpty); return; } @@ -479,10 +484,10 @@ NS_BindingIterator::~NS_BindingIterator (void) CORBA::Boolean NS_BindingIterator::next_one (CosNaming::Binding_out b, - CORBA::Environment &IT_env) + CORBA::Environment &_env) { // Macro to avoid "warning: unused parameter" type warning. - ACE_UNUSED_ARG (IT_env); + ACE_UNUSED_ARG (_env); if (hash_iter_->done ()) { b = new CosNaming::Binding; @@ -511,10 +516,10 @@ NS_BindingIterator::next_one (CosNaming::Binding_out b, CORBA::Boolean NS_BindingIterator::next_n (CORBA::ULong how_many, CosNaming::BindingList_out bl, - CORBA::Environment &IT_env) + CORBA::Environment &_env) { // Macro to avoid "warning: unused parameter" type warning. - ACE_UNUSED_ARG (IT_env); + ACE_UNUSED_ARG (_env); if (hash_iter_->done ()) { bl = new CosNaming::BindingList; @@ -559,10 +564,10 @@ NS_BindingIterator::next_n (CORBA::ULong how_many, } void -NS_BindingIterator::destroy (CORBA::Environment &IT_env) +NS_BindingIterator::destroy (CORBA::Environment &_env) { // Macro to avoid "warning: unused parameter" type warning. - ACE_UNUSED_ARG (IT_env); + ACE_UNUSED_ARG (_env); // CORBA::release (tie_ref_); } diff --git a/TAO/orbsvcs/Scheduling_Service/Config_Scheduler.cpp b/TAO/orbsvcs/Scheduling_Service/Config_Scheduler.cpp index fbce0144337..f4ea457bb25 100644 --- a/TAO/orbsvcs/Scheduling_Service/Config_Scheduler.cpp +++ b/TAO/orbsvcs/Scheduling_Service/Config_Scheduler.cpp @@ -16,7 +16,7 @@ ACE_Config_Scheduler::ACE_Config_Scheduler (void) : impl(new Scheduler_Generic) { - impl->output_level (10); + // impl->output_level (10); } ACE_Config_Scheduler::~ACE_Config_Scheduler (void) diff --git a/TAO/orbsvcs/orbsvcs/Channel_Clients_T.cpp b/TAO/orbsvcs/orbsvcs/Channel_Clients_T.cpp index 64704ad2060..cc1d2ea78b9 100644 --- a/TAO/orbsvcs/orbsvcs/Channel_Clients_T.cpp +++ b/TAO/orbsvcs/orbsvcs/Channel_Clients_T.cpp @@ -9,4 +9,23 @@ #include "orbsvcs/Channel_Clients_T.i" #endif /* __ACE_INLINE__ */ +template<class TARGET> void +ACE_PushConsumer_Adapter<TARGET>::push (const RtecEventComm::EventSet& events, + CORBA::Environment &_env) +{ + target_->push (events, _env); +} + +template<class TARGET> void +ACE_PushConsumer_Adapter<TARGET>::disconnect_push_consumer (CORBA::Environment &_env) +{ + target_->disconnect_push_consumer (_env); +} + +template<class TARGET> void +ACE_PushSupplier_Adapter<TARGET>::disconnect_push_supplier (CORBA::Environment &_env) +{ + target_->disconnect_push_supplier (_env); +} + #endif /* ACE_CHANNEL_CLIENTS_T_C */ diff --git a/TAO/orbsvcs/orbsvcs/Channel_Clients_T.h b/TAO/orbsvcs/orbsvcs/Channel_Clients_T.h index 1dd52e82ea6..09002a8ebaa 100644 --- a/TAO/orbsvcs/orbsvcs/Channel_Clients_T.h +++ b/TAO/orbsvcs/orbsvcs/Channel_Clients_T.h @@ -1,5 +1,5 @@ /* -*- C++ -*- */ -// $Id $ +// $Id$ // // ============================================================================ // @@ -24,12 +24,12 @@ #ifndef ACE_CHANNEL_CLIENTS_T_H #define ACE_CHANNEL_CLIENTS_T_H -#include "orbsvcs/RtecEventCommC.h" +#include "orbsvcs/RtecEventCommS.h" -// TODO: Add throw specs to this classes. +// @@ TODO: Add throw specs to this classes. template <class TARGET> -class ACE_PushConsumer_Adapter : public RtecEventComm::PushConsumer +class ACE_PushConsumer_Adapter : public POA_RtecEventComm::PushConsumer // = TITLE // ACE Push Consumer Adapter // @@ -41,7 +41,7 @@ public: // Forwards all calls to <owner>. virtual void push (const RtecEventComm::EventSet& events, - CORBA::Environment &); + CORBA::Environment &_env); // Forwards to target_. virtual void disconnect_push_consumer (CORBA::Environment &); @@ -54,7 +54,7 @@ private: // ************************************************************ template <class TARGET> -class ACE_PushSupplier_Adapter : public RtecEventComm::PushSupplier +class ACE_PushSupplier_Adapter : public POA_RtecEventComm::PushSupplier // = TITLE // ACE Push Supplier Adapter // diff --git a/TAO/orbsvcs/orbsvcs/Channel_Clients_T.i b/TAO/orbsvcs/orbsvcs/Channel_Clients_T.i new file mode 100644 index 00000000000..8a2c727c083 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Channel_Clients_T.i @@ -0,0 +1,17 @@ +// +// $Id$ +// + +template<class TARGET> +ACE_PushConsumer_Adapter<TARGET>::ACE_PushConsumer_Adapter (TARGET *t) + : target_ (t) +{ +} + +template<class TARGET> +ACE_PushSupplier_Adapter<TARGET>::ACE_PushSupplier_Adapter (TARGET *t) + : target_ (t) +{ +} + + diff --git a/TAO/orbsvcs/orbsvcs/RtecEventComm.idl b/TAO/orbsvcs/orbsvcs/RtecEventComm.idl index 5171aaa4438..4b060aad8c2 100644 --- a/TAO/orbsvcs/orbsvcs/RtecEventComm.idl +++ b/TAO/orbsvcs/orbsvcs/RtecEventComm.idl @@ -3,10 +3,21 @@ // module RtecEventComm { - exception Disconnected {}; + + // = TITLE + // User defined Event Data. + // + // = DESCRIPTION + // The Event payload is defined by this type. + // Users wanting maximum flexibility can use an Any, + // users that only have one type of event may use structures, + // other users may preffer union, trying to strike a balance + // between performance and flexibility. + // Users willing to implement their own marshalling may use a + // sequence of octet. #if 0 - union EventData switch(short) { + union EventData switch(long) { case 1: double dval; case 2: string sval; case 3: sequence<octet> bval; @@ -19,26 +30,53 @@ module RtecEventComm { }; #endif + // @@ TODO: Use CosTimeService? + // The current definition (double) is a TOTAL HACK, we store a + // <long long> here just because it fits; unfortunately <long long> + // is not fully supported in the IDL compiler (yet). + typedef double Time; + typedef long EventSourceID; typedef long EventType; - // @@ TODO: Use CosTimeService? - // The current definition (double) is a TOTAL HACK, we store a long - // long here just because it fits.... - typedef double Time; + struct Event + { + // = TITLE + // The Event structure. + // + // = DESCRIPTION + // Events are represented by this structure, it is simply a + // header,data pair. + // - struct Event { - EventSourceID source_; EventType type_; + // The event type. + // This may be different from the discriminator in the EventData + // union above, the motivation is to allow filtering by data + // contents: different event types are assigned to different data + // contents though they use the same discriminator. + + EventSourceID source_; + // Some way to identify the supplier. + + long ttl_; + // The "Time To Live" count, each time an EC process the event it + // decreases the TTL field, when it gets to zero the message is no + // longer forwarded. + Time creation_time_; Time ec_recv_time_; Time ec_send_time_; + // Some timestamps, they actually belong in the payload, for some + // kind of measument event. + EventData data_; + // The event payload. }; typedef sequence<Event> EventSet; interface PushConsumer { - oneway void push (in EventSet data); // raises(Disconnected); + oneway void push (in EventSet data); oneway void disconnect_push_consumer(); }; diff --git a/TAO/orbsvcs/tests/EC_Multiple/EC_Multiple.cpp b/TAO/orbsvcs/tests/EC_Multiple/EC_Multiple.cpp new file mode 100644 index 00000000000..6073f896e2e --- /dev/null +++ b/TAO/orbsvcs/tests/EC_Multiple/EC_Multiple.cpp @@ -0,0 +1,589 @@ +// +// $Id$ +// + +#include "ace/Get_Opt.h" + +#include "tao/Timeprobe.h" +#include "orbsvcs/Event_Utilities.h" +#include "orbsvcs/Event_Service_Constants.h" +#include "orbsvcs/Scheduler_Factory.h" +#include "orbsvcs/RtecEventChannelAdminC.h" +#include "EC_Multiple.h" + +// ************************************************************ + +EC_Proxy::EC_Proxy (void) + : consumer_ (this), + supplier_ (this) +{ +} + +EC_Proxy::~EC_Proxy (void) +{ +} + +int +EC_Proxy::open (RtecEventChannelAdmin::EventChannel_ptr remote_ec, + RtecEventChannelAdmin::EventChannel_ptr local_ec, + const RtecEventChannelAdmin::ConsumerQOS& subscriptions, + const RtecEventChannelAdmin::SupplierQOS& publications, + CORBA::Environment &_env) +{ + TAO_TRY + { + // = Connect as a supplier to the local EC + RtecEventChannelAdmin::SupplierAdmin_var supplier_admin = + local_ec->for_suppliers (TAO_TRY_ENV); + TAO_CHECK_ENV; + + this->consumer_proxy_ = + supplier_admin->obtain_push_consumer (TAO_TRY_ENV); + TAO_CHECK_ENV; + + RtecEventComm::PushSupplier_var supplier_ref = + this->supplier_._this (TAO_TRY_ENV); + TAO_CHECK_ENV; + + this->consumer_proxy_->connect_push_supplier (supplier_ref.in (), + publications, + TAO_TRY_ENV); + TAO_CHECK_ENV; + + RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin = + remote_ec->for_consumers (TAO_TRY_ENV); + TAO_CHECK_ENV; + + this->supplier_proxy_ = + consumer_admin->obtain_push_supplier (TAO_TRY_ENV); + TAO_CHECK_ENV; + + RtecEventComm::PushConsumer_var consumer_ref = + this->consumer_._this (TAO_TRY_ENV); + TAO_CHECK_ENV; + + this->supplier_proxy_->connect_push_consumer (consumer_ref.in (), + subscriptions, + TAO_TRY_ENV); + TAO_CHECK_ENV; + } + TAO_CATCHANY + { + TAO_TRY_ENV.print_exception ("EC_Proxy::open"); + return -1; + } + TAO_ENDTRY; + + return 0; +} + +void +EC_Proxy::disconnect_push_consumer (CORBA::Environment &) +{ + ACE_DEBUG ((LM_DEBUG, + "Supplier-consumer received disconnect from channel.\n")); +} + +void +EC_Proxy::disconnect_push_supplier (CORBA::Environment &) +{ + ACE_DEBUG ((LM_DEBUG, + "Supplier received disconnect from channel.\n")); +} + +void +EC_Proxy::push (const RtecEventComm::EventSet &events, + CORBA::Environment & _env) +{ + // ACE_DEBUG ((LM_DEBUG, "EC_Proxy::push - ")); + + if (events.length () == 0) + { + // ACE_DEBUG ((LM_DEBUG, "no events\n")); + return; + } + + ACE_DEBUG ((LM_DEBUG, "ECP: %d event(s)\n", events.length ())); + + // @@ TODO, there is an extra data copy here, we should do the event + // modification without it and only compact the necessary events. + int count = 0; + RtecEventComm::EventSet out (events.length ()); + for (int i = 0; i < events.length (); ++i) + { + if (events[i].ttl_ > 0) + { + count++; + out.length (count); + out[count - 1] = events[i]; + out[count - 1].ttl_--; + } + } + + if (count > 0) + { + this->consumer_proxy_->push (events, _env); + } +} + +// **************************************************************** + +Test_ECP::Test_ECP (void) + : consumer_ (this), + supplier_ (this), + event_a_ (0), + event_b_ (0), + event_c_ (0), + interval_ (250) +{ +} + +int +Test_ECP::run (int argc, char* argv[]) +{ + TAO_TRY + { + CORBA::ORB_var orb = + CORBA::ORB_init (argc, argv, "internet", TAO_TRY_ENV); + TAO_CHECK_ENV; + + CORBA::Object_var poa_object = + orb->resolve_initial_references("RootPOA"); + if (CORBA::is_nil (poa_object.in ())) + ACE_ERROR_RETURN ((LM_ERROR, + " (%P|%t) Unable to initialize the POA.\n"), + 1); + + PortableServer::POA_var root_poa = + PortableServer::POA::_narrow (poa_object.in (), TAO_TRY_ENV); + TAO_CHECK_ENV; + + PortableServer::POAManager_var poa_manager = + root_poa->the_POAManager (TAO_TRY_ENV); + TAO_CHECK_ENV; + + if (this->parse_args (argc, argv)) + return 1; + + CORBA::Object_var naming_obj = + orb->resolve_initial_references ("NameService"); + if (CORBA::is_nil (naming_obj.in ())) + ACE_ERROR_RETURN ((LM_ERROR, + " (%P|%t) Unable to get the Naming Service.\n"), + 1); + + CosNaming::NamingContext_var naming_context = + CosNaming::NamingContext::_narrow (naming_obj.in (), TAO_TRY_ENV); + TAO_CHECK_ENV; + + ACE_Scheduler_Factory::use_config (naming_context.in ()); + + RtecEventChannelAdmin::EventChannel_var local_ec = + this->get_ec (naming_context.in (), + this->lcl_ec_name_, + TAO_TRY_ENV); + TAO_CHECK_ENV; + + RtecEventChannelAdmin::EventChannel_var remote_ec = + this->get_ec (naming_context.in (), + this->rmt_ec_name_, + TAO_TRY_ENV); + TAO_CHECK_ENV; + + if (this->connect_supplier (local_ec.in (), + TAO_TRY_ENV) == -1) + return 1; + + if (this->connect_consumer (local_ec.in (), + TAO_TRY_ENV) == -1) + return 1; + + if (this->connect_ecp (local_ec.in (), + remote_ec.in (), + TAO_TRY_ENV) == -1) + return 1; + + orb->run (); + } + TAO_CATCH (CORBA::SystemException, sys_ex) + { + TAO_TRY_ENV.print_exception ("SYS_EX"); + } + TAO_ENDTRY; +} + +RtecEventChannelAdmin::EventChannel_ptr +Test_ECP::get_ec (CosNaming::NamingContext_ptr naming_context, + const char* ec_name, + CORBA::Environment &_env) +{ + CosNaming::Name channel_name (1); + channel_name.length (1); + channel_name[0].id = CORBA::string_dup (ec_name); + + CORBA::Object_ptr ec_ptr = + naming_context->resolve (channel_name, _env); + if (_env.exception () != 0 || CORBA::is_nil (ec_ptr)) + return RtecEventChannelAdmin::EventChannel::_nil (); + + return RtecEventChannelAdmin::EventChannel::_narrow (ec_ptr, _env); +} + +int +Test_ECP::connect_supplier (RtecEventChannelAdmin::EventChannel_ptr local_ec, + CORBA::Environment &_env) +{ + TAO_TRY + { + char buf[BUFSIZ]; + ACE_OS::strcpy (buf, "supplier@"); + ACE_OS::strcat (buf, this->lcl_ec_name_); + + RtecScheduler::Scheduler_ptr server = + ACE_Scheduler_Factory::server (); + + RtecScheduler::handle_t rt_info = + server->create (buf, TAO_TRY_ENV); + TAO_CHECK_ENV; + + server->set (rt_info, 1, 1, 1, + this->interval_ * 10000, // @@ Make it parametric + RtecScheduler::VERY_LOW, + RtecScheduler::NO_QUANTUM, 1, + TAO_TRY_ENV); + TAO_CHECK_ENV; + + this->supplier_id_ = ACE::crc32 (buf); + + ACE_SupplierQOS_Factory qos; + qos.insert (this->supplier_id_, + ACE_ES_EVENT_UNDEFINED + this->event_a_, + rt_info, 1); + qos.insert (this->supplier_id_, + ACE_ES_EVENT_UNDEFINED + this->event_b_, + rt_info, 1); + qos.insert (this->supplier_id_, + ACE_ES_EVENT_SHUTDOWN, + rt_info, 1); + + RtecEventChannelAdmin::SupplierAdmin_var supplier_admin = + local_ec->for_suppliers (TAO_TRY_ENV); + TAO_CHECK_ENV; + + this->consumer_proxy_ = + supplier_admin->obtain_push_consumer (TAO_TRY_ENV); + TAO_CHECK_ENV; + + RtecEventComm::PushSupplier_var objref = + this->supplier_._this (TAO_TRY_ENV); + TAO_CHECK_ENV; + + this->consumer_proxy_->connect_push_supplier (objref.in (), + qos.get_SupplierQOS (), + TAO_TRY_ENV); + TAO_CHECK_ENV; + } + TAO_CATCHANY + { + TAO_RETHROW_RETURN (-1); + } + TAO_ENDTRY; + return 0; +} + +int +Test_ECP::connect_consumer (RtecEventChannelAdmin::EventChannel_ptr local_ec, + CORBA::Environment &_env) +{ + TAO_TRY + { + RtecScheduler::Scheduler_ptr server = + ACE_Scheduler_Factory::server (); + + char buf[BUFSIZ]; + ACE_OS::strcpy (buf, "consumer@"); + ACE_OS::strcat (buf, this->lcl_ec_name_); + + RtecScheduler::handle_t rt_info = + server->create (buf, TAO_TRY_ENV); + TAO_CHECK_ENV; + + server->set (rt_info, 1, 1, 1, + this->interval_ * 10000, // @@ Make it parametric + RtecScheduler::VERY_LOW, + RtecScheduler::NO_QUANTUM, 1, + TAO_TRY_ENV); + TAO_CHECK_ENV; + + ACE_ConsumerQOS_Factory qos; + qos.start_disjunction_group (); + qos.insert_type (ACE_ES_EVENT_SHUTDOWN, + rt_info); + qos.insert_time (ACE_ES_EVENT_INTERVAL_TIMEOUT, + this->interval_ * 10000, + rt_info); + qos.insert_type (ACE_ES_EVENT_UNDEFINED + this->event_a_, + rt_info); + qos.insert_type (ACE_ES_EVENT_UNDEFINED + this->event_c_, + rt_info); + + // = Connect as a consumer. + RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin = + local_ec->for_consumers (TAO_TRY_ENV); + TAO_CHECK_ENV; + + this->supplier_proxy_ = + consumer_admin->obtain_push_supplier (TAO_TRY_ENV); + TAO_CHECK_ENV; + + RtecEventComm::PushConsumer_var objref = + this->consumer_._this (TAO_TRY_ENV); + TAO_CHECK_ENV; + + this->supplier_proxy_->connect_push_consumer (objref.in (), + qos.get_ConsumerQOS (), + TAO_TRY_ENV); + TAO_CHECK_ENV; + } + TAO_CATCHANY + { + TAO_RETHROW_RETURN (-1); + } + TAO_ENDTRY; + + return 0; +} + +int +Test_ECP::connect_ecp (RtecEventChannelAdmin::EventChannel_ptr local_ec, + RtecEventChannelAdmin::EventChannel_ptr remote_ec, + CORBA::Environment &_env) +{ + TAO_TRY + { + RtecScheduler::Scheduler_ptr server = + ACE_Scheduler_Factory::server (); + + // Generate its ConsumerQOS + char rmt[BUFSIZ]; + ACE_OS::strcpy (rmt, "ecp@"); + ACE_OS::strcat (rmt, this->rmt_ec_name_); + + RtecScheduler::handle_t rmt_info = + server->create (rmt, TAO_TRY_ENV); + TAO_CHECK_ENV; + + server->set (rmt_info, 1, 1, 1, + this->interval_ * 10000, + RtecScheduler::VERY_LOW, + RtecScheduler::NO_QUANTUM, 1, + TAO_TRY_ENV); + TAO_CHECK_ENV; + + ACE_ConsumerQOS_Factory consumer_qos; + consumer_qos.start_disjunction_group (); + consumer_qos.insert_type (ACE_ES_EVENT_UNDEFINED + this->event_a_, + rmt_info); + consumer_qos.insert_type (ACE_ES_EVENT_UNDEFINED + this->event_c_, + rmt_info); + + + // Generate its SupplierQOS + char lcl[BUFSIZ]; + ACE_OS::strcpy (lcl, "ecp@"); + ACE_OS::strcat (lcl, this->lcl_ec_name_); + + RtecScheduler::handle_t lcl_info = + server->create (lcl, TAO_TRY_ENV); + TAO_CHECK_ENV; + + server->set (lcl_info, 1, 1, 1, + this->interval_ * 10000, + RtecScheduler::VERY_LOW, + RtecScheduler::NO_QUANTUM, 1, + TAO_TRY_ENV); + TAO_CHECK_ENV; + + CORBA::ULong supplier_id = ACE::crc32 (lcl); + + ACE_SupplierQOS_Factory supplier_qos; + supplier_qos.insert (supplier_id, + ACE_ES_EVENT_UNDEFINED + this->event_a_, + lcl_info, 1); + supplier_qos.insert (supplier_id, + ACE_ES_EVENT_UNDEFINED + this->event_c_, + lcl_info, 1); + + this->ecp_.open (remote_ec, local_ec, + consumer_qos.get_ConsumerQOS (), + supplier_qos.get_SupplierQOS (), + TAO_TRY_ENV); + TAO_CHECK_ENV; + } + TAO_CATCHANY + { + TAO_RETHROW_RETURN (-1); + } + TAO_ENDTRY; + + return 0; +} + +void +Test_ECP::disconnect_push_consumer (CORBA::Environment &) +{ + ACE_DEBUG ((LM_DEBUG, + "Supplier-consumer received disconnect from channel.\n")); +} + +void +Test_ECP::disconnect_push_supplier (CORBA::Environment &) +{ + ACE_DEBUG ((LM_DEBUG, + "Supplier received disconnect from channel.\n")); +} + +void +Test_ECP::push (const RtecEventComm::EventSet &events, + CORBA::Environment & _env) +{ + // ACE_DEBUG ((LM_DEBUG, "EC_Proxy::push - ")); + + if (events.length () == 0) + { + ACE_DEBUG ((LM_DEBUG, "no events\n")); + return; + } + + ACE_DEBUG ((LM_DEBUG, "%d event(s)\n", events.length ())); + + for (int i = 0; i < events.length (); ++i) + { + const RtecEventComm::Event& e = events[i]; + if (e.type_ == ACE_ES_EVENT_INTERVAL_TIMEOUT) + { + // Generate some random events (acting as a supplier)... + int n = 2; // ACE_OS::rand () % 2; + RtecEventComm::EventSet sent (n); + sent.length (n); + + for (int j = 0; j < n; ++j) + { + Event& s = sent[j]; + s.source_ = this->supplier_id_; + s.ttl_ = 1; + + // @@ TOTAL HACK + ACE_hrtime_t t = ACE_OS::gethrtime (); + ACE_OS::memcpy (&s.creation_time_, &t, + sizeof (s.creation_time_)); + s.ec_recv_time_ = 0; + s.ec_send_time_ = 0; + + s.data_.x = 0; + s.data_.y = 0; + + if (j % 2 == 0) + { + // Generate an A event... + s.type_ = ACE_ES_EVENT_UNDEFINED + this->event_a_; + } + else + { + s.type_ = ACE_ES_EVENT_UNDEFINED + this->event_b_; + } + } + this->consumer_proxy_->push (sent, _env); + ACE_DEBUG ((LM_DEBUG, "Sent %d events\n", + n)); + } + else if (e.type_ == ACE_ES_EVENT_SHUTDOWN) + { + // @@ TODO + } + else + { + // Print out the events received... + ACE_DEBUG ((LM_DEBUG, "Received event %d from %04.4x\n", + e.type_, e.source_)); + } + } +} + +int +Test_ECP::parse_args (int argc, char *argv []) +{ + ACE_Get_Opt get_opt (argc, argv, "l:r:a:b:c:t:"); + int opt; + + while ((opt = get_opt ()) != EOF) + { + switch (opt) + { + case 'l': + this->lcl_ec_name_ = get_opt.optarg; + break; + case 'r': + this->rmt_ec_name_ = get_opt.optarg; + break; + case 'a': + this->event_a_ = ACE_OS::atoi (get_opt.optarg); + break; + case 'b': + this->event_b_ = ACE_OS::atoi (get_opt.optarg); + break; + case 'c': + this->event_c_ = ACE_OS::atoi (get_opt.optarg); + break; + case 't': + this->interval_ = ACE_OS::atoi (get_opt.optarg); + break; + case '?': + default: + ACE_DEBUG ((LM_DEBUG, + "Usage: %s " + "-l local_ec_name " + "-r remote_ec_name " + "<-a event_type_a> " + "<-b event_type_b> " + "<-c event_type_c> " + "-t event_interval " + "\n", + argv[0])); + return -1; + } + } + + if (this->event_a_ <= 0 + || this->event_b_ <= 0 + || this->event_c_ <= 0 ) + { + ACE_DEBUG ((LM_DEBUG, + "%s: you must specify the event types\n", + argv[0])); + return -1; + } + + return 0; +} + +// **************************************************************** + +int +main (int argc, char *argv []) +{ + Test_ECP test; + return test.run (argc, argv); +} + +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) +template class ACE_PushConsumer_Adapter<Test_ECP>; +template class ACE_PushSupplier_Adapter<Test_ECP>; +template class ACE_PushConsumer_Adapter<EC_Proxy>; +template class ACE_PushSupplier_Adapter<EC_Proxy>; +#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) +template class ACE_PushConsumer_Adapter<Test_ECP>; +template class ACE_PushSupplier_Adapter<Test_ECP>; +template class ACE_PushConsumer_Adapter<EC_Proxy>; +template class ACE_PushSupplier_Adapter<EC_Proxy>; +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/TAO/orbsvcs/tests/EC_Multiple/EC_Multiple.h b/TAO/orbsvcs/tests/EC_Multiple/EC_Multiple.h new file mode 100644 index 00000000000..5503863c9be --- /dev/null +++ b/TAO/orbsvcs/tests/EC_Multiple/EC_Multiple.h @@ -0,0 +1,156 @@ +/* -*- C++ -*- */ +// $Id$ +// +// ============================================================================ +// +// = DESCRIPTION +// This test attempts to communicate several Event Channels. +// The test hardcodes all the objects involved (consumers, +// suppliers, proxies, etc.); the objective is to gain understanding +// on the architecture needed to exploit locality in the Event +// cycle, not to provide a definite solution. +// +// ============================================================================ + +#if !defined (EC_MULTIPLE_H) +#define EC_MULTIPLE_H + +#include "ace/SString.h" +#include "orbsvcs/RtecEventChannelAdminC.h" +#include "orbsvcs/RtecEventCommS.h" +#include "orbsvcs/Channel_Clients_T.h" + + +class EC_Proxy +// = TITLE +// Event Channel Proxy. +// +// = DESCRIPTION +// This class mediates among two event channels, it connects as a +// consumer of events with a remote event channel, and as a supplier +// of events with the local EC. +// As a consumer it gives a QoS designed to only accept the events +// in which *local* consumers are interested. +// Eventually the local EC should create this object and compute its +// QoS in an automated manner; but this requires some way to filter +// out the peers registered as consumers, otherwise we will get +// loops in the QoS graph. +// It uses exactly the same set of events in the publications list +// when connected as a supplier. +// +// = NOTES +// An alternative implementation would be to register with the +// remote EC as a supplier, and then filter on the remote EC, but +// one of the objectives is to minimize network traffic. +// On the other hand the events will be pushed to remote consumers, +// event though they will be dropped upon receipt (due to the TTL +// field); IMHO this is another suggestion that the EC needs to know +// (somehow) which consumers are truly its peers in disguise. +// +// = ALTERNATIVES +// Check http://www.cs.wustl.edu/~coryan/Multiple_EC.html for a +// discussion on that topic. +// +{ +public: + EC_Proxy (void); + ~EC_Proxy (void); + + int open (RtecEventChannelAdmin::EventChannel_ptr remote_ec, + RtecEventChannelAdmin::EventChannel_ptr local_ec, + const RtecEventChannelAdmin::ConsumerQOS& subscriptions, + const RtecEventChannelAdmin::SupplierQOS& publications, + CORBA::Environment &_env); + // Establish the connections. + + void disconnect_push_supplier (CORBA::Environment &); + // The channel is disconnecting. + + void disconnect_push_consumer (CORBA::Environment &); + // The channel is disconnecting. + + void push (const RtecEventComm::EventSet &events, + CORBA::Environment &); + // This is the Consumer side behavior, it pushes the events to the + // local event channel. + +private: + ACE_PushConsumer_Adapter<EC_Proxy> consumer_; + ACE_PushSupplier_Adapter<EC_Proxy> supplier_; + + RtecEventChannelAdmin::ProxyPushConsumer_var consumer_proxy_; + RtecEventChannelAdmin::ProxyPushSupplier_var supplier_proxy_; +}; + +class Test_ECP +// +// = TITLE +// A simple test for the EC_Proxy class. +// +// = DESCRIPTION +// This class is design to exercise several features of the EC_Proxy +// class and the multiple EC architecture. +// We want to create two EC, each one having a single supplier and a +// single consumer. +// + To test the remote facilities the consumer register for both a +// local event and a remote one. +// + To test the remote filtering features the remote consumer only +// wants one of the local events, and this event is generated less +// frequently. +// +// This class creates the local ECP a consumer and a supplier, it +// uses the command line to figure the +// +{ +public: + Test_ECP (void); + + int run (int argc, char* argv[]); + // Execute the test. + + void disconnect_push_supplier (CORBA::Environment &); + void disconnect_push_consumer (CORBA::Environment &); + void push (const RtecEventComm::EventSet &events, + CORBA::Environment &); + // Implement the consumer and supplier upcalls. + + +private: + int parse_args (int argc, char* argv[]); + + RtecEventChannelAdmin::EventChannel_ptr + get_ec (CosNaming::NamingContext_ptr naming_context, + const char* ec_name, + CORBA::Environment &_env); + + int connect_supplier (RtecEventChannelAdmin::EventChannel_ptr local_ec, + CORBA::Environment &_env); + int connect_consumer (RtecEventChannelAdmin::EventChannel_ptr local_ec, + CORBA::Environment &_env); + int connect_ecp (RtecEventChannelAdmin::EventChannel_ptr local_ec, + RtecEventChannelAdmin::EventChannel_ptr remote_ec, + CORBA::Environment &_env); + +private: + ACE_PushConsumer_Adapter<Test_ECP> consumer_; + ACE_PushSupplier_Adapter<Test_ECP> supplier_; + + EC_Proxy ecp_; + + RtecEventChannelAdmin::ProxyPushConsumer_var consumer_proxy_; + RtecEventChannelAdmin::ProxyPushSupplier_var supplier_proxy_; + + RtecEventComm::EventSourceID supplier_id_; + + char* rmt_ec_name_; + char* lcl_ec_name_; + + int event_a_; + int event_b_; + int event_c_; + + int interval_; +}; + + +#endif /* EC_MULTIPLE_H */ diff --git a/TAO/orbsvcs/tests/EC_Multiple/Makefile b/TAO/orbsvcs/tests/EC_Multiple/Makefile new file mode 100644 index 00000000000..6b37bf1a0b1 --- /dev/null +++ b/TAO/orbsvcs/tests/EC_Multiple/Makefile @@ -0,0 +1,269 @@ +# +# $Id$ +# + +BIN = EC_Multiple + +BUILD = $(BIN) + +EC_MULTIPLE_SRCS= \ + EC_Multiple.cpp + +LSRC= \ + $(EC_MULTIPLE_SRCS) \ + +EC_MULTIPLE_OBJS = $(EC_MULTIPLE_SRCS:.cpp=.o) + +LDLIBS= -lorbsvcs -lTAO + +#---------------------------------------------------------------------------- +# Include macros and targets +#---------------------------------------------------------------------------- + +include $(ACE_ROOT)/include/makeinclude/wrapper_macros.GNU +include $(ACE_ROOT)/include/makeinclude/macros.GNU +include $(ACE_ROOT)/include/makeinclude/rules.common.GNU +include $(ACE_ROOT)/include/makeinclude/rules.nonested.GNU +include $(ACE_ROOT)/include/makeinclude/rules.local.GNU + +#### compiler-specific options +ifeq ($(CXX),g++) + CCFLAGS += -pedantic +else +ifeq ($(CXX),CC) +endif +endif + +ifdef quantify + CCFLAGS += -Dquantify + CPPFLAGS += -I/pkg/purify/quantify-2.1-solaris2 +endif # quantify + +#### Local rules and variables... + +ifndef TAO_ROOT +TAO_ROOT = $(ACE_ROOT)/TAO +endif +TSS_ORB_FLAG = #-DTAO_HAS_TSS_ORBCORE +DCFLAGS = -g +LDFLAGS += -L$(TAO_ROOT)/orbsvcs/orbsvcs -L$(TAO_ROOT)/tao +CPPFLAGS += -I$(TAO_ROOT)/orbsvcs -I$(TAO_ROOT) -I$(TAO_ROOT)/tao/compat $(TSS_ORB_FLAG)#-H + +# Leave the scheduler output out if this is a config run. +ifeq ($(runtime),1) +EC_MULTIPLE_CONFIG_OBJS=EC_Multiple_Scheduler_Runtime.o +endif # runtime + +ifeq ($(probe),1) + CCFLAGS += -DACE_ENABLE_TIMEPROBES +endif # probe + +EC_Multiple: $(addprefix $(VDIR),$(EC_MULTIPLE_OBJS) $(EC_MULTIPLE_CONFIG_OBJS)) + $(LINK.cc) $(LDFLAGS) -o $@ $^ $(VLDLIBS) + +#---------------------------------------------------------------------------- +# Dependencies +#---------------------------------------------------------------------------- + +# DO NOT DELETE THIS LINE -- g++dep uses it. +# DO NOT PUT ANYTHING AFTER THIS LINE, IT WILL GO AWAY. + +.obj/EC_Multiple.o .obj/EC_Multiple.so .shobj/EC_Multiple.o .shobj/EC_Multiple.so: EC_Multiple.cpp \ + $(ACE_ROOT)/ace/Get_Opt.h \ + $(ACE_ROOT)/ace/ACE.h \ + $(ACE_ROOT)/ace/OS.h \ + $(ACE_ROOT)/ace/inc_user_config.h \ + $(ACE_ROOT)/ace/config.h \ + $(ACE_ROOT)/ace/streams.h \ + $(ACE_ROOT)/ace/Basic_Types.h \ + $(ACE_ROOT)/ace/Basic_Types.i \ + $(ACE_ROOT)/ace/OS.i \ + $(ACE_ROOT)/ace/Trace.h \ + $(ACE_ROOT)/ace/Log_Msg.h \ + $(ACE_ROOT)/ace/Log_Record.h \ + $(ACE_ROOT)/ace/Log_Priority.h \ + $(ACE_ROOT)/ace/Log_Record.i \ + $(ACE_ROOT)/ace/Version.h \ + $(ACE_ROOT)/ace/ACE.i \ + $(ACE_ROOT)/ace/Get_Opt.i \ + $(TAO_ROOT)/tao/Timeprobe.h \ + $(ACE_ROOT)/ace/Synch.h \ + $(ACE_ROOT)/ace/SV_Semaphore_Complex.h \ + $(ACE_ROOT)/ace/SV_Semaphore_Simple.h \ + $(ACE_ROOT)/ace/SV_Semaphore_Simple.i \ + $(ACE_ROOT)/ace/SV_Semaphore_Complex.i \ + $(ACE_ROOT)/ace/Synch.i \ + $(ACE_ROOT)/ace/Synch_T.h \ + $(ACE_ROOT)/ace/Event_Handler.h \ + $(ACE_ROOT)/ace/Event_Handler.i \ + $(ACE_ROOT)/ace/Synch_T.i \ + $(ACE_ROOT)/ace/Thread.h \ + $(ACE_ROOT)/ace/Thread.i \ + $(ACE_ROOT)/ace/Atomic_Op.i \ + $(TAO_ROOT)/tao/Timeprobe.i \ + $(TAO_ROOT)/orbsvcs/orbsvcs/Event_Utilities.h \ + $(TAO_ROOT)/orbsvcs/orbsvcs/RtecEventChannelAdminC.h \ + $(TAO_ROOT)/tao/corba.h \ + $(ACE_ROOT)/ace/SOCK_Stream.h \ + $(ACE_ROOT)/ace/SOCK_IO.h \ + $(ACE_ROOT)/ace/SOCK.h \ + $(ACE_ROOT)/ace/Addr.h \ + $(ACE_ROOT)/ace/Addr.i \ + $(ACE_ROOT)/ace/IPC_SAP.h \ + $(ACE_ROOT)/ace/IPC_SAP.i \ + $(ACE_ROOT)/ace/SOCK.i \ + $(ACE_ROOT)/ace/SOCK_IO.i \ + $(ACE_ROOT)/ace/INET_Addr.h \ + $(ACE_ROOT)/ace/INET_Addr.i \ + $(ACE_ROOT)/ace/SOCK_Stream.i \ + $(ACE_ROOT)/ace/Hash_Map_Manager.h \ + $(ACE_ROOT)/ace/SString.h \ + $(ACE_ROOT)/ace/SString.i \ + $(ACE_ROOT)/ace/SOCK_Acceptor.h \ + $(ACE_ROOT)/ace/Time_Value.h \ + $(ACE_ROOT)/ace/SOCK_Acceptor.i \ + $(ACE_ROOT)/ace/SOCK_Connector.h \ + $(ACE_ROOT)/ace/SOCK_Connector.i \ + $(ACE_ROOT)/ace/Strategies.h \ + $(ACE_ROOT)/ace/Strategies_T.h \ + $(ACE_ROOT)/ace/Service_Config.h \ + $(ACE_ROOT)/ace/Service_Object.h \ + $(ACE_ROOT)/ace/Shared_Object.h \ + $(ACE_ROOT)/ace/Shared_Object.i \ + $(ACE_ROOT)/ace/Service_Object.i \ + $(ACE_ROOT)/ace/Signal.h \ + $(ACE_ROOT)/ace/Containers.h \ + $(ACE_ROOT)/ace/Containers.i \ + $(ACE_ROOT)/ace/Signal.i \ + $(ACE_ROOT)/ace/Object_Manager.h \ + $(ACE_ROOT)/ace/Object_Manager.i \ + $(ACE_ROOT)/ace/Managed_Object.h \ + $(ACE_ROOT)/ace/Managed_Object.i \ + $(ACE_ROOT)/ace/Service_Config.i \ + $(ACE_ROOT)/ace/Reactor.h \ + $(ACE_ROOT)/ace/Handle_Set.h \ + $(ACE_ROOT)/ace/Handle_Set.i \ + $(ACE_ROOT)/ace/Timer_Queue.h \ + $(ACE_ROOT)/ace/Timer_Queue_T.h \ + $(ACE_ROOT)/ace/Free_List.h \ + $(ACE_ROOT)/ace/Free_List.i \ + $(ACE_ROOT)/ace/Timer_Queue_T.i \ + $(ACE_ROOT)/ace/Reactor.i \ + $(ACE_ROOT)/ace/Reactor_Impl.h \ + $(ACE_ROOT)/ace/Svc_Conf_Tokens.h \ + $(ACE_ROOT)/ace/Synch_Options.h \ + $(ACE_ROOT)/ace/Connector.h \ + $(ACE_ROOT)/ace/Map_Manager.h \ + $(ACE_ROOT)/ace/Map_Manager.i \ + $(ACE_ROOT)/ace/Svc_Handler.h \ + $(ACE_ROOT)/ace/Task.h \ + $(ACE_ROOT)/ace/Thread_Manager.h \ + $(ACE_ROOT)/ace/Thread_Manager.i \ + $(ACE_ROOT)/ace/Task.i \ + $(ACE_ROOT)/ace/Task_T.h \ + $(ACE_ROOT)/ace/Message_Queue.h \ + $(ACE_ROOT)/ace/Message_Block.h \ + $(ACE_ROOT)/ace/Malloc.h \ + $(ACE_ROOT)/ace/Malloc.i \ + $(ACE_ROOT)/ace/Malloc_T.h \ + $(ACE_ROOT)/ace/Malloc_T.i \ + $(ACE_ROOT)/ace/Memory_Pool.h \ + $(ACE_ROOT)/ace/Mem_Map.h \ + $(ACE_ROOT)/ace/Mem_Map.i \ + $(ACE_ROOT)/ace/Memory_Pool.i \ + $(ACE_ROOT)/ace/Message_Block.i \ + $(ACE_ROOT)/ace/IO_Cntl_Msg.h \ + $(ACE_ROOT)/ace/Message_Queue.i \ + $(ACE_ROOT)/ace/Task_T.i \ + $(ACE_ROOT)/ace/Dynamic.h \ + $(ACE_ROOT)/ace/Dynamic.i \ + $(ACE_ROOT)/ace/Singleton.h \ + $(ACE_ROOT)/ace/Singleton.i \ + $(ACE_ROOT)/ace/Svc_Handler.i \ + $(ACE_ROOT)/ace/Connector.i \ + $(ACE_ROOT)/ace/Acceptor.h \ + $(ACE_ROOT)/ace/Acceptor.i \ + $(TAO_ROOT)/tao/compat/objbase.h \ + $(TAO_ROOT)/tao/compat/initguid.h \ + $(TAO_ROOT)/tao/orbconf.h \ + $(TAO_ROOT)/tao/orb.h \ + $(TAO_ROOT)/tao/align.h \ + $(TAO_ROOT)/tao/corbacom.h \ + $(TAO_ROOT)/tao/sequence.h \ + $(TAO_ROOT)/tao/sequence.i \ + $(TAO_ROOT)/tao/sequence_T.h \ + $(TAO_ROOT)/tao/sequence_T.i \ + $(TAO_ROOT)/tao/objkeyC.h \ + $(TAO_ROOT)/tao/objkeyC.i \ + $(TAO_ROOT)/tao/any.h \ + $(TAO_ROOT)/tao/params.h \ + $(TAO_ROOT)/tao/client_factory.h \ + $(TAO_ROOT)/tao/server_factory.h \ + $(TAO_ROOT)/tao/default_client.h \ + $(TAO_ROOT)/tao/default_server.h \ + $(TAO_ROOT)/tao/strategy_T.h \ + $(TAO_ROOT)/tao/strategy_T.i \ + $(TAO_ROOT)/tao/except.h \ + $(TAO_ROOT)/tao/orbobj.h \ + $(TAO_ROOT)/tao/nvlist.h \ + $(TAO_ROOT)/tao/object.h \ + $(TAO_ROOT)/tao/principa.h \ + $(TAO_ROOT)/tao/request.h \ + $(TAO_ROOT)/tao/svrrqst.h \ + $(TAO_ROOT)/tao/typecode.h \ + $(TAO_ROOT)/tao/marshal.h \ + $(TAO_ROOT)/tao/cdr.h \ + $(TAO_ROOT)/tao/stub.h \ + $(TAO_ROOT)/tao/poa.h \ + $(TAO_ROOT)/tao/poaC.h \ + $(TAO_ROOT)/tao/poaC.i \ + $(TAO_ROOT)/tao/servant_base.h \ + $(TAO_ROOT)/tao/poaS.h \ + $(TAO_ROOT)/tao/poaS.i \ + $(TAO_ROOT)/tao/objtable.h \ + $(TAO_ROOT)/tao/connect.h \ + $(TAO_ROOT)/tao/orb_core.h \ + $(TAO_ROOT)/tao/optable.h \ + $(TAO_ROOT)/tao/debug.h \ + $(TAO_ROOT)/tao/iiopobj.h \ + $(TAO_ROOT)/tao/iioporb.h \ + $(TAO_ROOT)/tao/giop.h \ + $(TAO_ROOT)/tao/orb_core.i \ + $(ACE_ROOT)/ace/Dynamic_Service.h \ + $(TAO_ROOT)/tao/corbacom.i \ + $(TAO_ROOT)/tao/typecode.i \ + $(TAO_ROOT)/tao/nvlist.i \ + $(TAO_ROOT)/tao/any.i \ + $(TAO_ROOT)/tao/stub.i \ + $(TAO_ROOT)/tao/object.i \ + $(TAO_ROOT)/tao/orbobj.i \ + $(TAO_ROOT)/tao/marshal.i \ + $(TAO_ROOT)/tao/cdr.i \ + $(TAO_ROOT)/tao/giop.i \ + $(TAO_ROOT)/tao/iioporb.i \ + $(TAO_ROOT)/tao/iiopobj.i \ + $(TAO_ROOT)/tao/params.i \ + $(TAO_ROOT)/tao/server_factory.i \ + $(TAO_ROOT)/tao/default_client.i \ + $(TAO_ROOT)/tao/default_server.i \ + $(TAO_ROOT)/tao/connect.i \ + $(TAO_ROOT)/tao/singletons.h \ + $(TAO_ROOT)/orbsvcs/orbsvcs/orbsvcs_export.h \ + $(TAO_ROOT)/orbsvcs/orbsvcs/RtecEventCommC.h \ + $(TAO_ROOT)/orbsvcs/orbsvcs/RtecEventCommC.i \ + $(TAO_ROOT)/orbsvcs/orbsvcs/RtecSchedulerC.h \ + $(TAO_ROOT)/orbsvcs/orbsvcs/RtecSchedulerC.i \ + $(TAO_ROOT)/orbsvcs/orbsvcs/RtecEventChannelAdminC.i \ + $(TAO_ROOT)/orbsvcs/orbsvcs/Event_Service_Constants.h \ + $(TAO_ROOT)/orbsvcs/orbsvcs/Event_Utilities.i \ + $(TAO_ROOT)/orbsvcs/orbsvcs/Scheduler_Factory.h \ + $(TAO_ROOT)/orbsvcs/orbsvcs/CosNamingC.h \ + $(TAO_ROOT)/orbsvcs/orbsvcs/CosNamingC.i \ + $(TAO_ROOT)/orbsvcs/orbsvcs/Scheduler_Factory.i \ + EC_Multiple.h \ + $(TAO_ROOT)/orbsvcs/orbsvcs/RtecEventCommS.h \ + $(TAO_ROOT)/orbsvcs/orbsvcs/RtecEventCommS.i \ + $(TAO_ROOT)/orbsvcs/orbsvcs/Channel_Clients_T.h \ + $(TAO_ROOT)/orbsvcs/orbsvcs/Channel_Clients_T.i + +# IF YOU PUT ANYTHING HERE IT WILL GO AWAY diff --git a/TAO/orbsvcs/tests/EC_Multiple/README b/TAO/orbsvcs/tests/EC_Multiple/README new file mode 100644 index 00000000000..8b7c3cffb4e --- /dev/null +++ b/TAO/orbsvcs/tests/EC_Multiple/README @@ -0,0 +1,34 @@ +# $Id$ + + This test ilustrates how to connect multiple ECs. + + The test requires the several other processes are running, the +user must remember to assign a different port to each one: + +$ Naming_Service +$ Scheduling_Service -ORBport 10030 + + we must run at least two EC, each one with a different name: + +$ Event_Service -ORBport 10040 -n EC1 +$ Event_Service -ORBport 10050 -n EC2 + + then we run two instances of the test, configuring the events +that are local, only remote, remote and local: + +$ EC_Multiple -ORBport 10060 -l EC1 -r EC2 -a 1 -b 2 -c 3 +$ EC_Multiple -ORBport 10070 -l EC2 -r EC1 -a 4 -b 3 -c 2 + + In this examples the first test treats EC1 as a local EC, and +EC2 as remote, it generates events <1> and <2> and listens to events +<1> and <3>. The second instance treats EC2 as local an EC1 as +remote, generates events <4> and <3> but listens to <4> and <2>. + + Note how this configuration will have pure local events, some +events that are both local and remote. Right now you have to examine +the output carefully to notice what is happening. + + Another configuration of interest is: + +$ EC_Multiple -ORBport 10060 -l EC1 -r EC2 -a 1 -b 2 -c 2 +$ EC_Multiple -ORBport 10070 -l EC2 -r EC1 -a 4 -b 2 -c 2 diff --git a/TAO/orbsvcs/tests/Event_Latency/Event_Latency.cpp b/TAO/orbsvcs/tests/Event_Latency/Event_Latency.cpp index 4443b19ed69..0a60d93ddc8 100644 --- a/TAO/orbsvcs/tests/Event_Latency/Event_Latency.cpp +++ b/TAO/orbsvcs/tests/Event_Latency/Event_Latency.cpp @@ -535,10 +535,6 @@ Latency_Supplier::push (const RtecEventComm::EventSet &events, } TAO_CHECK_ENV; } - TAO_CATCH (RtecEventComm::Disconnected, d) - { - ACE_ERROR ((LM_ERROR, "(%t) Latency_Supplier::push: disconnected.\n")); - } TAO_CATCHANY { ACE_ERROR ((LM_ERROR, "(%t) %s Latency_Supplier::push:" @@ -798,7 +794,7 @@ main (int argc, char *argv []) orb->resolve_initial_references ("NameService"); if (CORBA::is_nil (naming_obj.in ())) ACE_ERROR_RETURN ((LM_ERROR, - " (%P|%t) Unable to initialize the POA.\n"), + " (%P|%t) Unable to get the Naming Service.\n"), 1); CosNaming::NamingContext_var naming_context = diff --git a/TAO/orbsvcs/tests/Makefile b/TAO/orbsvcs/tests/Makefile index daef1459fd9..f2c8bca6fbf 100644 --- a/TAO/orbsvcs/tests/Makefile +++ b/TAO/orbsvcs/tests/Makefile @@ -11,6 +11,7 @@ DIRS = Simple_Naming \ Logger \ Event_Latency \ + EC_Multiple \ #---------------------------------------------------------------------------- # Include macros and targets |