diff options
Diffstat (limited to 'modules/CIAO/connectors/dds4ccm/examples/Quoter/Connector/Quoter_Connector_exec.cpp')
-rw-r--r-- | modules/CIAO/connectors/dds4ccm/examples/Quoter/Connector/Quoter_Connector_exec.cpp | 217 |
1 files changed, 118 insertions, 99 deletions
diff --git a/modules/CIAO/connectors/dds4ccm/examples/Quoter/Connector/Quoter_Connector_exec.cpp b/modules/CIAO/connectors/dds4ccm/examples/Quoter/Connector/Quoter_Connector_exec.cpp index 099d7e6a1c3..5eab90d13c7 100644 --- a/modules/CIAO/connectors/dds4ccm/examples/Quoter/Connector/Quoter_Connector_exec.cpp +++ b/modules/CIAO/connectors/dds4ccm/examples/Quoter/Connector/Quoter_Connector_exec.cpp @@ -35,30 +35,31 @@ #include "dds4ccm/impl/ndds/NDDS_Traits.h" #include "dds4ccm/impl/ndds/DomainParticipantFactory.h" +#include "dds4ccm/impl/ndds/DomainParticipant.h" // should be removed after lem fix #include "../Broker/BrokerEC.h" // should be removed after we refactor stuff back into ndds impl. #include "dds4ccm/impl/ndds/DataReader.h" - +#include "dds4ccm/impl/ndds/ListenerControl.h" namespace CIAO_Quoter_Quoter_Connector_Impl { //============================================================ // Facet Executor Implementation Class: Stock_Info_Reader_exec_i //============================================================ - + Stock_Info_Reader_exec_i::Stock_Info_Reader_exec_i (void) { } - + Stock_Info_Reader_exec_i::~Stock_Info_Reader_exec_i (void) { } - + // Operations from ::CCM_DDS::Stock_Info_Reader - + void Stock_Info_Reader_exec_i::read_all ( ::CCM_DDS::Stock_Info_Reader::Stock_InfoSeq_out /* instances */, @@ -66,7 +67,7 @@ namespace CIAO_Quoter_Quoter_Connector_Impl { /* Your code here. */ } - + void Stock_Info_Reader_exec_i::read_all_history ( ::CCM_DDS::Stock_Info_Reader::Stock_InfoSeq_out /* instances */, @@ -74,7 +75,7 @@ namespace CIAO_Quoter_Quoter_Connector_Impl { /* Your code here. */ } - + void Stock_Info_Reader_exec_i::read_one ( ::Quoter::Stock_Info & /* an_instance */, @@ -82,7 +83,7 @@ namespace CIAO_Quoter_Quoter_Connector_Impl { /* Your code here. */ } - + void Stock_Info_Reader_exec_i::read_one_history ( const ::Quoter::Stock_Info & /* an_instance */, @@ -91,51 +92,52 @@ namespace CIAO_Quoter_Quoter_Connector_Impl { /* Your code here. */ } - + ::CCM_DDS::QueryFilter * Stock_Info_Reader_exec_i::filter (void) { /* Your code here. */ return 0; } - + void Stock_Info_Reader_exec_i::filter ( const ::CCM_DDS::QueryFilter & /* filter */) { /* Your code here. */ } - + //============================================================ // Component Executor Implementation Class: Quoter_Connector_exec_i //============================================================ - + Quoter_Connector_exec_i::Quoter_Connector_exec_i (void) : default_domain_configured_ (false), domain_id_ (0), default_topic_configured_ (false), topic_name_ ("Quoter_Topic"), __info_in_configured_ (false), - __info_out_configured_ (false) + __info_out_configured_ (false), + __info_out_rawlistener_enabled_ (false) { } - + Quoter_Connector_exec_i::~Quoter_Connector_exec_i (void) { } - + // Supported operations and attributes. - + // Component attributes. - + char * Quoter_Connector_exec_i::topic_name (void) { // @from DDS_TopicBase return CORBA::string_dup (this->topic_name_.in ()); } - + void Quoter_Connector_exec_i::topic_name ( const char * topic_name) @@ -143,38 +145,38 @@ namespace CIAO_Quoter_Quoter_Connector_Impl // @from DDS_TopicBase this->topic_name_ = topic_name; } - + ::DDS::StringSeq * Quoter_Connector_exec_i::key_fields (void) { // @from DDS_TopicBase - ::DDS::StringSeq *retval = + ::DDS::StringSeq *retval = new ::DDS::StringSeq (this->key_fields_.length ()); - + for (CORBA::ULong i = 0; i < this->key_fields_.length (); ++i) (*retval)[i] = CORBA::string_dup (this->key_fields_[i]); - + return retval; } - + void Quoter_Connector_exec_i::key_fields ( const ::DDS::StringSeq & key_fields) { // @from DDS_TopicBase this->key_fields_.length (key_fields.length ()); - + for (CORBA::ULong i = 0; i < this->key_fields_.length (); ++i) this->key_fields_[i] = CORBA::string_dup (key_fields[i]); } - + ::DDS::DomainId_t Quoter_Connector_exec_i::domain_id (void) { // @from DDS_Base return this->domain_id_; } - + void Quoter_Connector_exec_i::domain_id ( ::DDS::DomainId_t domain_id) @@ -182,14 +184,14 @@ namespace CIAO_Quoter_Quoter_Connector_Impl // @from DDS_Base this->domain_id_ = domain_id; } - + char * Quoter_Connector_exec_i::qos_profile (void) { // @from DDS_Base return CORBA::string_dup (this->qos_profile_.in ()); } - + void Quoter_Connector_exec_i::qos_profile ( const char * qos_profile) @@ -197,7 +199,7 @@ namespace CIAO_Quoter_Quoter_Connector_Impl // @from DDS_Base this->qos_profile_ = qos_profile; } - + // Port operations. void @@ -205,16 +207,19 @@ namespace CIAO_Quoter_Quoter_Connector_Impl { CIAO_DEBUG ((LM_TRACE, CLINFO "Quoter_Connector_exec_i::configure_default_domain_ - " "Configuring default domain\n")); - + if (this->default_domain_configured_) return; try { + NDDSConfigLogger::get_instance()->set_verbosity_by_category(NDDS_CONFIG_LOG_CATEGORY_API, + NDDS_CONFIG_LOG_VERBOSITY_STATUS_ALL ); + // Generic code this->domain_factory_ = new ::CIAO::DDS4CCM::RTI::RTI_DomainParticipantFactory_i (); - + ::DDS::DomainParticipantQos qos; - this->domain_ = + this->domain_ = this->domain_factory_->create_participant (this->domain_id_, qos, 0, @@ -227,26 +232,36 @@ namespace CIAO_Quoter_Quoter_Connector_Impl } } - void + void Quoter_Connector_exec_i::configure_default_topic_ (void) { CIAO_DEBUG ((LM_TRACE, CLINFO "Quoter_Connector_exec_i::configure_default_topic_ - " "Configuring default topic\n")); if (this->default_topic_configured_) return; - + this->configure_default_domain_ (); - + try { if (CORBA::is_nil (this->topic_)) { - ::DDS::TopicQos tqos; - this->topic_ = - this->domain_->create_topic (this->topic_name_.in (), - Stock_Info_Traits::type_support::get_type_name (), - tqos, - 0, - 0); + CIAO::DDS4CCM::RTI::RTI_DomainParticipant_i *part = dynamic_cast< CIAO::DDS4CCM::RTI::RTI_DomainParticipant_i * > (this->domain_.in ()); + DDS_ReturnCode_t retcode = Stock_Info_Traits::type_support::register_type( + part->get_participant (), Stock_Info_Traits::type_support::get_type_name ()); + if (retcode == DDS_RETCODE_OK) + { + ::DDS::TopicQos tqos; + this->topic_ = + this->domain_->create_topic (this->topic_name_.in (), + Stock_Info_Traits::type_support::get_type_name (), + tqos, + 0, + 0); + } + else + { + throw CORBA::INTERNAL (); + } } } catch (...) @@ -254,16 +269,16 @@ namespace CIAO_Quoter_Quoter_Connector_Impl CIAO_ERROR ((LM_ERROR, "Caught unknown error while configuring default topic\n")); throw CORBA::INTERNAL (); } - } - - void + } + + void Quoter_Connector_exec_i::configure_port_info_in_ (void) { if (this->__info_in_configured_) return; - + this->configure_default_topic_ (); - + try { if (CORBA::is_nil (this->__info_in_publisher_.in ())) @@ -273,7 +288,7 @@ namespace CIAO_Quoter_Quoter_Connector_Impl 0, 0); } - + if (CORBA::is_nil (this->__info_in_datawriter_.in ())) { ::DDS::DataWriterQos dwqos; @@ -282,7 +297,7 @@ namespace CIAO_Quoter_Quoter_Connector_Impl 0, 0); this->__info_in_datawriter_ = ::DDS::CCM_DataWriter::_narrow (dwv_tmp); - } + } } catch (...) { @@ -290,23 +305,25 @@ namespace CIAO_Quoter_Quoter_Connector_Impl throw CORBA::INTERNAL (); } } - - + + class info_out_Listener : public virtual ::DDS::DataReaderListener { public: info_out_Listener (::CCM_DDS::Stock_Info_RawListener_ptr listen, - ::CCM_DDS::PortStatusListener_ptr psl) - : enable_ (false), - listener_ (::CCM_DDS::Stock_Info_RawListener::_duplicate (listen)), - portlistener_ (::CCM_DDS::PortStatusListener::_duplicate (psl)) + ::CCM_DDS::PortStatusListener_ptr psl, + ACE_Atomic_Op <TAO_SYNCH_MUTEX, bool> &enabled) + : listener_ (::CCM_DDS::Stock_Info_RawListener::_duplicate (listen)), + portlistener_ (::CCM_DDS::PortStatusListener::_duplicate (psl)), + enable_ (enabled) { }; // from DataReaderListener virtual void on_data_available( ::DDS::DataReader *rdr) { + printf ("*** on data available\n"); if (!this->enable_.value ()) return; @@ -318,7 +335,7 @@ namespace CIAO_Quoter_Quoter_Connector_Impl ACE_ERROR ((LM_ERROR, ACE_TEXT ("Stock_InfoDataReader::narrow failed.\n"))); return; } - + /* Loop until there are messages available in the queue */ for(;;) { ::Quoter::Stock_Info instance; @@ -326,6 +343,7 @@ namespace CIAO_Quoter_Quoter_Connector_Impl ::DDS::ReturnCode_t result = reader->take_next_sample(instance, sampleinfo); if (result == DDS_RETCODE_NO_DATA) { + printf ("no more samples\n"); /* No more samples */ break; } else if (result != DDS_RETCODE_OK) { @@ -333,47 +351,42 @@ namespace CIAO_Quoter_Quoter_Connector_Impl return; } if (sampleinfo.valid_data) { + printf ("got valid data\n"); ::CCM_DDS::ReadInfo empty; listener_->on_data (instance, empty); } - + } - + }; - + virtual void on_requested_deadline_missed (::DDS::DataReader_ptr the_reader, const ::DDS::RequestedDeadlineMissedStatus & status) { this->portlistener_->on_requested_deadline_missed (the_reader, status); }; - - + + virtual void on_sample_lost (::DDS::DataReader_ptr the_reader, const ::DDS::SampleLostStatus & status) { this->portlistener_->on_sample_lost (the_reader, status); }; - - - - // From ListenerControl - bool enabled () const; - void enabled (bool enable); - + private: ::CCM_DDS::Stock_Info_RawListener_var listener_; ::CCM_DDS::PortStatusListener_var portlistener_; - ACE_Atomic_Op <TAO_SYNCH_MUTEX, bool> enable_;; + ACE_Atomic_Op <TAO_SYNCH_MUTEX, bool> &enable_; }; - + void Quoter_Connector_exec_i::configure_port_info_out_ (void) { if (this->__info_out_configured_) return; - + this->configure_default_topic_ (); - + try { if (CORBA::is_nil (this->__info_out_subscriber_.in ())) @@ -383,22 +396,23 @@ namespace CIAO_Quoter_Quoter_Connector_Impl 0, 0); } - + if (CORBA::is_nil (this->__info_out_datareader_.in ())) { this->__info_out_portstatus_ = this->context_->get_connection_info_out_status (); - + this->__info_out_datareaderlistener = new info_out_Listener (this->context_->get_connection_info_out_listener (), - this->context_->get_connection_info_out_status ()); - + this->context_->get_connection_info_out_status (), + this->__info_out_rawlistener_enabled_); + ::DDS::DataReaderQos drqos; - this->__info_out_datareader_ = + this->__info_out_datareader_ = this->__info_out_subscriber_->create_datareader (this->topic_.in (), drqos, this->__info_out_datareaderlistener.in (), DDS_DATA_AVAILABLE_STATUS); } - + } catch (...) { @@ -406,97 +420,102 @@ namespace CIAO_Quoter_Quoter_Connector_Impl throw CORBA::INTERNAL (); } } - + ::CCM_DDS::CCM_Stock_Info_Writer_ptr Quoter_Connector_exec_i::get_info_in_data (void) { std::cerr << "get_info_in_data" << std::endl; - + this->configure_port_info_in_ (); - + return new CIAO::DDS4CCM::RTI::Writer_T<Stock_Info_Traits, ::CCM_DDS::CCM_Stock_Info_Writer> (this->__info_in_datawriter_.in ()); } - + ::DDS::CCM_DataWriter_ptr Quoter_Connector_exec_i::get_info_in_dds_entity (void) { this->configure_port_info_in_ (); - + return this->__info_in_datawriter_.in (); } - + ::CCM_DDS::CCM_Stock_Info_Reader_ptr Quoter_Connector_exec_i::get_info_out_data (void) { /* Your code here. */ return ::CCM_DDS::CCM_Stock_Info_Reader::_nil (); } - + ::CCM_DDS::CCM_ListenerControl_ptr Quoter_Connector_exec_i::get_info_out_control (void) { /* Your code here. */ - return ::CCM_DDS::CCM_ListenerControl::_nil (); + return new CCM_DDS_ListenerControl_i (this->__info_out_rawlistener_enabled_); } - + ::DDS::CCM_DataReader_ptr Quoter_Connector_exec_i::get_info_out_dds_entity (void) { /* Your code here. */ return ::DDS::CCM_DataReader::_nil (); } - + // Operations from Components::SessionComponent. - + void Quoter_Connector_exec_i::set_session_context ( ::Components::SessionContext_ptr ctx) { - ::Quoter::CCM_Quoter_Connector_Context_var lctx = + ::Quoter::CCM_Quoter_Connector_Context_var lctx = ::Quoter::CCM_Quoter_Connector_Context::_narrow (ctx); - + if ( ::CORBA::is_nil (lctx.in ())) { throw ::CORBA::INTERNAL (); } - + this->context_ = lctx; } - + void Quoter_Connector_exec_i::configuration_complete (void) { } - + void Quoter_Connector_exec_i::ccm_activate (void) { /* Your code here. */ + if (!CORBA::is_nil (this->context_->get_connection_info_out_listener ()) || + !CORBA::is_nil (this->context_->get_connection_info_out_status ())) + { + this->configure_port_info_out_ (); + } } - + void Quoter_Connector_exec_i::ccm_passivate (void) { /* Your code here. */ } - + void Quoter_Connector_exec_i::ccm_remove (void) { /* Your code here. */ } - + extern "C" QUOTER_CONNECTOR_EXEC_Export ::Components::EnterpriseComponent_ptr create_Quoter_Quoter_Connector_Impl (void) { ::Components::EnterpriseComponent_ptr retval = ::Components::EnterpriseComponent::_nil (); - + ACE_NEW_NORETURN ( retval, Quoter_Connector_exec_i); - + return retval; } } |