diff options
Diffstat (limited to 'modules/CIAO/connectors/dds4ccm/impl/ndds/Getter_T.cpp')
-rw-r--r-- | modules/CIAO/connectors/dds4ccm/impl/ndds/Getter_T.cpp | 305 |
1 files changed, 305 insertions, 0 deletions
diff --git a/modules/CIAO/connectors/dds4ccm/impl/ndds/Getter_T.cpp b/modules/CIAO/connectors/dds4ccm/impl/ndds/Getter_T.cpp new file mode 100644 index 00000000000..6ddc60bb46c --- /dev/null +++ b/modules/CIAO/connectors/dds4ccm/impl/ndds/Getter_T.cpp @@ -0,0 +1,305 @@ +// $Id$ +#include "dds4ccm/impl/ndds/DataReader.h" +#include "dds4ccm/impl/ndds/Utils.h" +#include "dds4ccm/impl/ndds/Duration_t.h" +#include "dds4ccm/impl/ndds/SampleInfo.h" + +#include "ciao/Logger/Log_Macros.h" + +template <typename DDS_TYPE, typename CCM_TYPE > +CIAO::DDS4CCM::RTI::Getter_T<DDS_TYPE, CCM_TYPE>::Getter_T (void) : + impl_ (0), + condition_(0), + time_out_ (), + max_delivered_data_ (0), + gd_ (0), + ws_ (0), + rd_condition_ (0) +{ + CIAO_TRACE ("CIAO::DDS4CCM::RTI::Getter_T::Getter_T"); +} + +template <typename DDS_TYPE, typename CCM_TYPE > +CIAO::DDS4CCM::RTI::Getter_T<DDS_TYPE, CCM_TYPE>::~Getter_T (void) +{ + CIAO_TRACE ("CIAO::DDS4CCM::RTI::Getter_T::~Getter_T"); + delete gd_; + delete ws_; +} + +template <typename DDS_TYPE, typename CCM_TYPE> +typename DDS_TYPE::data_reader * +CIAO::DDS4CCM::RTI::Getter_T<DDS_TYPE, CCM_TYPE>::impl (void) +{ + if (this->impl_) + { + return this->impl_; + } + else + { + throw ::CORBA::BAD_INV_ORDER (); + } +} + +template <typename DDS_TYPE, typename CCM_TYPE > +bool +CIAO::DDS4CCM::RTI::Getter_T<DDS_TYPE, CCM_TYPE>::wait ( + DDSConditionSeq& active_conditions) +{ + DDS_Duration_t timeout; + timeout <<= this->time_out_; + DDS_ReturnCode_t const retcode = ws_->wait (active_conditions, timeout); + if (retcode == DDS_RETCODE_TIMEOUT) + { + CIAO_DEBUG (6, (LM_DEBUG, ACE_TEXT ("Getter: No data available after timeout.\n"))); + return false; + } + return true; +} + +template <typename DDS_TYPE, typename CCM_TYPE > +bool +CIAO::DDS4CCM::RTI::Getter_T<DDS_TYPE, CCM_TYPE>::get_many ( + typename CCM_TYPE::seq_type::_out_type instances, + ::CCM_DDS::ReadInfoSeq_out infos) +{ + instances = new typename CCM_TYPE::seq_type; + infos = new ::CCM_DDS::ReadInfoSeq; + + DDSConditionSeq active_conditions; + if (!this->wait (active_conditions)) + { + return false; + } + + ::DDS_Long max_samples = this->max_delivered_data_; + if (max_samples == 0) + { + max_samples = DDS_LENGTH_UNLIMITED; + } + + DDS_SampleInfoSeq sample_info; + typename DDS_TYPE::dds_seq_type data; + for (::DDS_Long i = 0; i < active_conditions.length(); i++) + { + if (active_conditions[i] == gd_) + { + gd_->set_trigger_value (false); + } + + if (active_conditions[i] == rd_condition_) + { + // Check trigger + active_conditions[i]->get_trigger_value (); + + // Take read condition + DDS_ReturnCode_t retcode = this->impl ()->read (data, + sample_info, + max_samples, + DDS_NOT_READ_SAMPLE_STATE , + DDS_ANY_VIEW_STATE, + DDS_ANY_INSTANCE_STATE); + + if (retcode == DDS_RETCODE_OK && data.length () >= 1) + { + ::CORBA::ULong number_read = 0; + for (::DDS_Long index = 0; index < sample_info.length (); index ++) + { + if (sample_info[index].valid_data) + { + ++number_read; + } + } + CIAO_DEBUG (6, (LM_DEBUG, ACE_TEXT ("Getter_T::get_many: ") + ACE_TEXT ("read <%d> - valid <%d>\n"), + sample_info.length (), + number_read)); + infos->length (number_read); + instances->length (number_read); + number_read = 0; + for (::DDS_Long j = 0; j < data.length (); j ++) + { + if (sample_info[j].valid_data) + { + infos->operator[](number_read) <<= sample_info[j]; + instances->operator[](number_read) = data[j]; + ++number_read; + } + } + } + else + { + // RETCODE_NO_DATA should be an error + // because after a timeout there should be + // data. + CIAO_ERROR (1, (LM_ERROR, CLINFO + "CIAO::DDS4CCM::RTI::Getter_T::Getter_T - " + "Error while reading from DDS: <%C>\n", + translate_retcode (retcode))); + this->impl ()->return_loan(data,sample_info); + throw CCM_DDS::InternalError (retcode, 1); + } + + retcode = this->impl ()->return_loan(data,sample_info); + if (retcode != DDS_RETCODE_OK) + { + CIAO_ERROR (1, (LM_ERROR, ACE_TEXT ("return loan error %C\n"), + translate_retcode (retcode))); + } + } + } + return true; +} + +template <typename DDS_TYPE, typename CCM_TYPE > +bool +CIAO::DDS4CCM::RTI::Getter_T<DDS_TYPE, CCM_TYPE>::get_one ( + typename DDS_TYPE::value_type::_out_type an_instance, + ::CCM_DDS::ReadInfo_out info) +{ + an_instance = new typename DDS_TYPE::value_type; + + DDSConditionSeq active_conditions; + if (!this->wait (active_conditions)) + { + return false; + } + + DDS_SampleInfoSeq sample_info; + typename DDS_TYPE::dds_seq_type data; + for (::DDS_Long i = 0; i < active_conditions.length(); i++) + { + if (active_conditions[i] == gd_) + { + gd_->set_trigger_value (false); + } + + if (active_conditions[i] == rd_condition_) + { + // Check trigger + active_conditions[i]->get_trigger_value (); + + // Take read condition + DDS_ReturnCode_t retcode = this->impl ()->read (data, + sample_info, + DDS_LENGTH_UNLIMITED, + DDS_NOT_READ_SAMPLE_STATE , + DDS_ANY_VIEW_STATE, + DDS_ANY_INSTANCE_STATE); + + if (retcode == DDS_RETCODE_OK && data.length () >= 1) + { + info <<= sample_info[0]; //retrieves the last sample. + *an_instance = data[0]; + } + else + { + // RETCODE_NO_DATA should be an error + // because after a timeout there should be + // data. + CIAO_ERROR (1, (LM_ERROR, CLINFO + "CIAO::DDS4CCM::RTI::Getter_T::Getter_T - " + "Error while reading from DDS: <%C>\n", + translate_retcode (retcode))); + this->impl ()->return_loan(data,sample_info); + throw CCM_DDS::InternalError (retcode, 1); + } + + retcode = this->impl ()->return_loan(data,sample_info); + if (retcode != DDS_RETCODE_OK) + { + CIAO_ERROR (1, (LM_ERROR, + ACE_TEXT ("return loan error %C\n"), + translate_retcode (retcode))); + } + } + } + return true; +} + +template <typename DDS_TYPE, typename CCM_TYPE > +::DDS::Duration_t +CIAO::DDS4CCM::RTI::Getter_T<DDS_TYPE, CCM_TYPE>::time_out (void) +{ + return this->time_out_; +} + +template <typename DDS_TYPE, typename CCM_TYPE > +void +CIAO::DDS4CCM::RTI::Getter_T<DDS_TYPE, CCM_TYPE>::time_out ( + const ::DDS::Duration_t & time_out) +{ + this->time_out_ = time_out; +} + +template <typename DDS_TYPE, typename CCM_TYPE > +::CCM_DDS::DataNumber_t +CIAO::DDS4CCM::RTI::Getter_T<DDS_TYPE, CCM_TYPE>::max_delivered_data (void) +{ + return this->max_delivered_data_; +} + +template <typename DDS_TYPE, typename CCM_TYPE > +void +CIAO::DDS4CCM::RTI::Getter_T<DDS_TYPE, CCM_TYPE>::max_delivered_data ( + ::CCM_DDS::DataNumber_t max_delivered_data) +{ + this->max_delivered_data_ = max_delivered_data; +} + +template <typename DDS_TYPE, typename CCM_TYPE> +void +CIAO::DDS4CCM::RTI::Getter_T<DDS_TYPE, CCM_TYPE>::set_impl ( + ::DDS::DataReader_ptr reader) +{ + CIAO_TRACE ("CIAO::DDS4CCM::RTI::Getter_T::set_impl"); + + if (::CORBA::is_nil (reader)) + { + impl_ = 0; + delete gd_; + gd_ = 0; + delete ws_; + ws_ = 0; + } + else + { + RTI_DataReader_i *rdr = dynamic_cast <RTI_DataReader_i *> (reader); + if (!rdr) + { + CIAO_ERROR (1, (LM_ERROR, CLINFO "CIAO::DDS4CCM::RTI::Getter_T::data_reader - " + "Unable to cast provided DataReader to servant\n")); + throw CORBA::INTERNAL (); + } + + this->impl_ = DDS_TYPE::data_reader::narrow (rdr->get_impl ()); + + if (!this->impl_) + { + CIAO_ERROR (1, (LM_ERROR, CLINFO "CIAO::DDS4CCM::RTI::Getter_T::data_reader - " + "Unable to narrow the provided writer entity to the specific " + "type necessary to publish messages\n")); + throw CORBA::INTERNAL (); + } + + // Now create the waitset conditions + gd_ = new DDSGuardCondition (); + ws_ = new DDSWaitSet (); + rd_condition_ = this->impl ()->create_readcondition (DDS_NOT_READ_SAMPLE_STATE, + DDS_NEW_VIEW_STATE | DDS_NOT_NEW_VIEW_STATE, + DDS_ALIVE_INSTANCE_STATE | DDS_NOT_ALIVE_INSTANCE_STATE); + DDS_ReturnCode_t retcode = ws_->attach_condition (gd_); + if (retcode != DDS_RETCODE_OK) + { + CIAO_ERROR (1, (LM_ERROR, CLINFO "GETTER: Unable to attach guard condition to waitset.\n")); + throw CCM_DDS::InternalError (retcode, 0); + } + retcode = ws_->attach_condition (rd_condition_); + if (retcode != DDS_RETCODE_OK) + { + CIAO_ERROR (1, (LM_ERROR, CLINFO "GETTER: Unable to attach read condition to waitset.\n")); + throw CCM_DDS::InternalError (retcode, 1); + } + } +} + |