diff options
Diffstat (limited to 'TAO/examples/Simulator/Event_Supplier/DOVE_Supplier.cpp')
-rw-r--r-- | TAO/examples/Simulator/Event_Supplier/DOVE_Supplier.cpp | 487 |
1 files changed, 487 insertions, 0 deletions
diff --git a/TAO/examples/Simulator/Event_Supplier/DOVE_Supplier.cpp b/TAO/examples/Simulator/Event_Supplier/DOVE_Supplier.cpp new file mode 100644 index 00000000000..654cee6b006 --- /dev/null +++ b/TAO/examples/Simulator/Event_Supplier/DOVE_Supplier.cpp @@ -0,0 +1,487 @@ +// $Id$ + +// ============================================================================ +// +// = FILENAME +// DOVE_Supplier.cpp +// +// = DESCRIPTION +// A wrapper around the event service initialization and +// marshalling +// +// = AUTHOR +// Michael Kircher (mk1@cs.wustl.edu) +// +// ============================================================================ + +#include "DOVE_Supplier.h" +#include "tao/ORB_Core.h" + +ACE_RCSID (Event_Supplier, + DOVE_Supplier, + "$Id$") + +// Static pointer member initialization for Singleton. + +ACE_Scheduler_Factory::POD_RT_Info * +DOVE_Supplier::pod_rt_info_instance_ = 0; + +// Constructor. + +DOVE_Supplier::DOVE_Supplier () + : initialized_ (0), + connected_ (0), + connection_params_list_ (0), + current_connection_params_ (0), + connection_count_ (0), + current_connection_index_ (0), + internal_DOVE_Supplier_ptr_ (0), + MIB_name_ (0) +{ + ACE_NEW (internal_DOVE_Supplier_ptr_, + Internal_DOVE_Supplier (this)); + + if (internal_DOVE_Supplier_ptr_ == 0) + { + ACE_ERROR ((LM_ERROR, + "DOVE_Supplier::DOVE_Supplier internal " + "supplier not allocated.")); + } +} + +// Destructor. + +DOVE_Supplier::~DOVE_Supplier () +{ + for (int i = 0; i < this->connection_count_; ++i) + { + delete (this->connection_params_list_ [i]); + } + + delete [] this->connection_params_list_; + + delete internal_DOVE_Supplier_ptr_; + +} + +// Initialize the ORB and the connection to the Name Service + +int +DOVE_Supplier::init (void) +{ + try + { + // Connect to the RootPOA. + CORBA::Object_var poaObject_var = + TAO_ORB_Core_instance()->orb()->resolve_initial_references("RootPOA"); + + if (CORBA::is_nil (poaObject_var.in ())) + ACE_ERROR_RETURN ((LM_ERROR, + " (%P|%t) Unable to initialize the POA.\n"), + -1); + + this->root_POA_var_ = + PortableServer::POA::_narrow (poaObject_var.in ()); + + this->poa_manager_ = + root_POA_var_->the_POAManager (); + + // Get the Naming Service object reference. + CORBA::Object_var namingObj_var = + TAO_ORB_Core_instance()->orb()->resolve_initial_references ( + "NameService"); + + if (CORBA::is_nil (namingObj_var.in ())) + ACE_ERROR_RETURN ((LM_ERROR, + " (%P|%t) Unable to get the Naming Service.\n"), + -1); + + this->namingContext_var_ = + CosNaming::NamingContext::_narrow (namingObj_var.in ()); + } + catch (const CORBA::Exception& ex) + { + ex._tao_print_exception ("DOVE_Supplier::init"); + return -1; + } + + initialized_ = 1; + return 0; +} + +int +DOVE_Supplier::connect (const char* MIB_name, + const char* es_name, + const char * ss_name, + ACE_Scheduler_Factory::POD_RT_Info * pod_rt_info) +{ + // Initialize the supplier if this has not already been done. + if ((initialized_ == 0) && (this->init () == -1)) + { + ACE_ERROR_RETURN ((LM_ERROR, + " (%P|%t) Unable to initialize the DOVE_Supplier.\n"), + -1); + } + + + // Grab the default RT_Info settings if others were not provided. + if (pod_rt_info == 0) + { + // Get the default singleton if we were not passed the data + pod_rt_info = DOVE_Supplier::pod_rt_info_instance (); + if (pod_rt_info == 0) + { + ACE_ERROR_RETURN ((LM_ERROR, + " (%P|%t) Unable to obtain" + " the default RT_Info data.\n"), + -1); + } + } + + // Save the passed MIB name + MIB_name_ = (MIB_name == 0) ? "MIB_unknown" : MIB_name; + + // Create a new connection parameters structure. + Connection_Params * cp_temp = 0; + ACE_NEW_RETURN (cp_temp, Connection_Params, -1); + + // Populate the known fields of the new connection params struct. + cp_temp->pod_rt_info_ = *pod_rt_info; + cp_temp->es_name_ = (es_name == 0) ? "EventService" : es_name; + cp_temp->ss_name_ = (ss_name == 0) ? "ScheduleService" : ss_name; + + // Allocate a new connection parameters pointer array. + // Cannot use ACE_NEW_RETURN here, as we need to clean up + // cp_temp if we fail here, and we need what cp_temp points + // to after the current scope if we succeed here. + Connection_Params ** cp_list_temp; + cp_list_temp = + new Connection_Params * [this->connection_count_ + 1]; + if (cp_list_temp == 0) + { + // Avoid a memory leak if we failed to allocate. + delete cp_temp; + + ACE_ERROR_RETURN ((LM_ERROR, + " (%P|%t) DOVE_Supplier::connect could not " + "reallocate connection params list"), + -1); + } + + // Copy the connection struct pointers from + // the old list (if any) to the new one. + for (int i = 0; i < this->connection_count_; ++i) + { + cp_list_temp [i] = + this->connection_params_list_ [i]; + } + + // Put a pointer to the new connection params structure + // in the new list, increment the connection params count, + // and point to the latest connection parameters. + cp_list_temp [this->connection_count_] = cp_temp; + this->current_connection_params_ = cp_temp; + current_connection_index_ = connection_count_; + ++ (this->connection_count_); + + // Replace the old list of pointers with the new one + delete [] this->connection_params_list_; + this->connection_params_list_ = cp_list_temp; + + // Resolve the event service reference. + if (this->get_EventChannel () == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, + " (%P|%t) Unable to resolve the event service.\n"), + -1); + } + + // Resolve the scheduling service reference. + if (this->get_Scheduler () == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, + " (%P|%t) Unable to resolve the scheduler.\n"), + -1); + } + + // Connect to the event service as a supplier. + if (this->connect_Supplier () == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, + " (%P|%t) Unable to connect to the event service.\n"), + -1); + } + + return 0; + +} + + +// This method is invoked after all connect calls are done. + +void +DOVE_Supplier::connected () +{ + if (! connected_) + { + // Code to do post-connection-establishment + // one-time logic goes here. + + connected_ = 1; + } +} + + +void +DOVE_Supplier::disconnect () +{ +} + + +void +DOVE_Supplier::notify (CORBA::Any &message) +{ + // Finalize connection establishment no later than the first event notification + if (! connected_) + { + this->connected (); + } + + try + { + RtecEventComm::Event event; + event.header.source = SOURCE_ID; + event.header.type = ACE_ES_EVENT_NOTIFICATION; + event.header.ttl = 1; + ACE_hrtime_t creation_time = ACE_OS::gethrtime (); + ORBSVCS_Time::hrtime_to_TimeT (event.header.creation_time, creation_time); + event.header.ec_recv_time = ORBSVCS_Time::zero (); + event.header.ec_send_time = ORBSVCS_Time::zero (); + event.data.any_value = message; + + RtecEventComm::EventSet events; + events.length (1); + events[0] = event; + + // Now we invoke a RPC + this->current_connection_params_->proxyPushConsumer_var_->push (events); + } + catch (const CORBA::Exception&) + { + ACE_ERROR ((LM_ERROR, + "DOVE_Supplier::notify: " + "unexpected exception.\n")); + } +} + + +// Use the next connection in the list of established connections. + +void +DOVE_Supplier::use_next_connection () +{ + if (connection_count_ > 0) + { + current_connection_index_ = + (current_connection_index_ == connection_count_ - 1) + ? 0 : current_connection_index_ + 1; + + current_connection_params_ = + connection_params_list_ [current_connection_index_]; + } +} + + +// Use the previous connection in the list of established connections. + +void +DOVE_Supplier::use_prev_connection () +{ + if (connection_count_ > 0) + { + current_connection_index_ = + (current_connection_index_ == 0) + ? connection_count_ - 1 + : current_connection_index_ - 1; + + current_connection_params_ = + connection_params_list_ [current_connection_index_]; + } +} + + + +// -------------------- Internal Demo Supplier ----------------------------- + +DOVE_Supplier::Internal_DOVE_Supplier::Internal_DOVE_Supplier (DOVE_Supplier *impl_ptr) + : impl_ptr_ (impl_ptr) +{ +} + +// ---------------------------------------------------------------------------- + +int +DOVE_Supplier::get_Scheduler () +{ + try + { + CosNaming::Name schedule_name (1); + schedule_name.length (1); + schedule_name[0].id = + CORBA::string_dup (this->current_connection_params_->ss_name_); + + CORBA::Object_var objref = + namingContext_var_->resolve (schedule_name); + + this->current_connection_params_->scheduler_var_ = + RtecScheduler::Scheduler::_narrow(objref.in ()); + } + catch (const CORBA::Exception&) + { + current_connection_params_->scheduler_var_ = 0; + ACE_ERROR_RETURN ((LM_ERROR, + "DOVE_Supplier::get_Scheduler: " + "error while resolving scheduler %s\n", + this->current_connection_params_->ss_name_), + -1); + } + + return 0; +} + + +int +DOVE_Supplier::get_EventChannel () +{ + try + { + // Get a reference to the Event Service + CosNaming::Name channel_name (1); + channel_name.length (1); + channel_name[0].id = + CORBA::string_dup (this->current_connection_params_->es_name_); + + CORBA::Object_var eventServiceObj_var = + this->namingContext_var_->resolve (channel_name); + + this->current_connection_params_->eventChannel_var_ = + RtecEventChannelAdmin::EventChannel::_narrow (eventServiceObj_var.in()); + + if (CORBA::is_nil (this->current_connection_params_->eventChannel_var_.in())) + ACE_ERROR_RETURN ((LM_ERROR, + "The reference to the event channel is nil!"), + 1); + } + catch (const CORBA::Exception& ex) + { + ex._tao_print_exception ("DOVE_Supplier::get_EventChannel"); + return -1; + } + + return 0; +} + + +int +DOVE_Supplier::connect_Supplier () +{ + try + { + // Generate the Real-time information descriptor. + this->current_connection_params_->rt_info_ = + this->current_connection_params_-> + scheduler_var_-> + create (this->current_connection_params_->pod_rt_info_.entry_point); + + + this->current_connection_params_->scheduler_var_-> + set (this->current_connection_params_->rt_info_, + static_cast<RtecScheduler::Criticality_t> (this->current_connection_params_->pod_rt_info_.criticality), + this->current_connection_params_->pod_rt_info_.worst_case_execution_time, + this->current_connection_params_->pod_rt_info_.typical_execution_time, + this->current_connection_params_->pod_rt_info_.cached_execution_time, + this->current_connection_params_->pod_rt_info_.period, + static_cast<RtecScheduler::Importance_t> (this->current_connection_params_->pod_rt_info_.importance), + this->current_connection_params_->pod_rt_info_.quantum, + this->current_connection_params_->pod_rt_info_.threads, + static_cast<RtecScheduler::Info_Type_t> (this->current_connection_params_->pod_rt_info_.info_type)); + + + + // Set the publications to report them to the event channel. + + CORBA::Short x = 0; + RtecEventChannelAdmin::SupplierQOS qos; + qos.publications.length (1); + qos.publications[0].event.header.source = SOURCE_ID; + qos.publications[0].event.header.type = ACE_ES_EVENT_NOTIFICATION; + qos.publications[0].event.header.ttl = 1; + qos.publications[0].event.header.creation_time = ORBSVCS_Time::zero (); + qos.publications[0].event.header.ec_recv_time = ORBSVCS_Time::zero (); + qos.publications[0].event.header.ec_send_time = ORBSVCS_Time::zero (); + qos.publications[0].event.data.any_value <<= x; + qos.publications[0].dependency_info.number_of_calls = 1; + qos.publications[0].dependency_info.rt_info = + this->current_connection_params_->rt_info_; + + // = Connect as a supplier. + this->current_connection_params_->supplierAdmin_var_ = + this->current_connection_params_->eventChannel_var_->for_suppliers (); + + this->current_connection_params_->proxyPushConsumer_var_ = + this->current_connection_params_->supplierAdmin_var_->obtain_push_consumer (); + + // In calling _this we get back an object reference and register + // the servant with the POA. + RtecEventComm::PushSupplier_var pushSupplier_var = + this->internal_DOVE_Supplier_ptr_->_this (); + + // Connect the supplier to the proxy consumer. + ACE_SupplierQOS_Factory::debug (qos); + this->current_connection_params_-> + proxyPushConsumer_var_->connect_push_supplier (pushSupplier_var.in (), + qos); + } + catch (const CORBA::Exception& ex) + { + ex._tao_print_exception ("DOVE_Supplier::connect_supplier"); + return -1; + } + + return 0; + +} + + +// Access the default rt_info singleton. + +ACE_Scheduler_Factory::POD_RT_Info * +DOVE_Supplier::pod_rt_info_instance () +{ + if (DOVE_Supplier::pod_rt_info_instance_ == 0) + { + ACE_NEW_RETURN (DOVE_Supplier::pod_rt_info_instance_, + ACE_Scheduler_Factory::POD_RT_Info, + 0); + + // Set up the default data. + DOVE_Supplier::pod_rt_info_instance_->entry_point = "ABC"; + DOVE_Supplier::pod_rt_info_instance_->criticality = + RtecScheduler::VERY_LOW_CRITICALITY; + DOVE_Supplier::pod_rt_info_instance_->worst_case_execution_time = + ORBSVCS_Time::zero (); + DOVE_Supplier::pod_rt_info_instance_->typical_execution_time = + ORBSVCS_Time::zero (); + DOVE_Supplier::pod_rt_info_instance_->cached_execution_time = + ORBSVCS_Time::zero (); + DOVE_Supplier::pod_rt_info_instance_->period = 10000000; + DOVE_Supplier::pod_rt_info_instance_->importance = + RtecScheduler::VERY_LOW_IMPORTANCE; + DOVE_Supplier::pod_rt_info_instance_->quantum = ORBSVCS_Time::zero (); + DOVE_Supplier::pod_rt_info_instance_->threads = 1; + DOVE_Supplier::pod_rt_info_instance_->info_type = + RtecScheduler::OPERATION; + } + + return DOVE_Supplier::pod_rt_info_instance_; +} |