summaryrefslogtreecommitdiff
path: root/CIAO/connectors/dds4ccm/impl/ndds/DataReaderListener_T.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'CIAO/connectors/dds4ccm/impl/ndds/DataReaderListener_T.cpp')
-rw-r--r--CIAO/connectors/dds4ccm/impl/ndds/DataReaderListener_T.cpp170
1 files changed, 170 insertions, 0 deletions
diff --git a/CIAO/connectors/dds4ccm/impl/ndds/DataReaderListener_T.cpp b/CIAO/connectors/dds4ccm/impl/ndds/DataReaderListener_T.cpp
new file mode 100644
index 00000000000..103b226130b
--- /dev/null
+++ b/CIAO/connectors/dds4ccm/impl/ndds/DataReaderListener_T.cpp
@@ -0,0 +1,170 @@
+// $Id$
+#include "dds4ccm/impl/ndds/Utils.h"
+
+#include "dds4ccm/impl/ndds/DataReader.h"
+#include "dds4ccm/impl/logger/Log_Macros.h"
+#include "ace/Reactor.h"
+
+template <typename DDS_TYPE, typename CCM_TYPE>
+CIAO::DDS4CCM::DataReaderListener_T<DDS_TYPE, CCM_TYPE>::DataReaderListener_T (
+ typename CCM_TYPE::listener_type::_ptr_type listener,
+ ::CCM_DDS::PortStatusListener_ptr port_status_listener,
+ ::CCM_DDS::DataListenerControl_ptr control,
+ ACE_Reactor* reactor)
+ : PortStatusListener_T <DDS_TYPE, CCM_TYPE> (port_status_listener, reactor) ,
+ listener_ (CCM_TYPE::listener_type::_duplicate (listener)),
+ control_ (::CCM_DDS::DataListenerControl::_duplicate (control))
+{
+ DDS4CCM_TRACE ("CIAO::DDS4CCM::DataReaderListener_T::DataReaderListener_T");
+}
+
+template <typename DDS_TYPE, typename CCM_TYPE>
+CIAO::DDS4CCM::DataReaderListener_T<DDS_TYPE, CCM_TYPE>::~DataReaderListener_T (void)
+{
+ DDS4CCM_TRACE ("CIAO::DDS4CCM::DataReaderListener_T::~DataReaderListener_T");
+}
+
+template <typename DDS_TYPE, typename CCM_TYPE>
+void
+CIAO::DDS4CCM::DataReaderListener_T<DDS_TYPE, CCM_TYPE>::on_data_available(::DDS::DataReader_ptr rdr)
+{
+ DDS4CCM_TRACE ("CIAO::DDS4CCM::DataReaderListener_T::on_data_available");
+
+ if (CORBA::is_nil (this->control_.in ()) || this->control_->mode () == ::CCM_DDS::NOT_ENABLED)
+ {
+ return;
+ }
+ else
+ {
+ if (this->reactor_)
+ {
+ drh* rh = 0;
+ ACE_NEW (rh, drh (this, rdr));
+
+ ACE_Event_Handler_var safe_handler (rh);
+ if (this->reactor_->notify (rh) != 0)
+ {
+ DDS4CCM_ERROR (1, (LM_ERROR, ACE_TEXT ("DataReaderListener_T::failed to use reactor.\n")));
+ }
+ }
+ else
+ {
+ this->on_data_available_i (rdr);
+ }
+ }
+}
+
+template <typename DDS_TYPE, typename CCM_TYPE>
+void
+CIAO::DDS4CCM::DataReaderListener_T<DDS_TYPE, CCM_TYPE>::on_data_available_i (::DDS::DataReader_ptr rdr)
+{
+ DDS4CCM_TRACE ("CIAO::DDS4CCM::DataReaderListener_T::on_data_available_i");
+
+ if (CORBA::is_nil (this->control_.in ()) || this->control_->mode () == ::CCM_DDS::NOT_ENABLED)
+ {
+ return;
+ }
+
+ ::CIAO::DDS4CCM::CCM_DDS_DataReader_i* rd =
+ dynamic_cast < ::CIAO::DDS4CCM::CCM_DDS_DataReader_i*>(rdr);
+ if (!rd)
+ {
+ /* In this specific case, this will never fail */
+ DDS4CCM_ERROR (1, (LM_ERROR, ACE_TEXT ("DataReaderListener_T::dynamic_cast failed.\n")));
+ return;
+ }
+
+ typename DDS_TYPE::data_reader * reader =
+ dynamic_cast< typename DDS_TYPE::data_reader * > ((rd->get_impl ()));
+
+ if (!reader)
+ {
+ /* In this specific case, this will never fail */
+ DDS4CCM_ERROR (1, (LM_ERROR, ACE_TEXT ("DataReaderListener_T::narrow failed.\n")));
+ return;
+ }
+
+ typename DDS_TYPE::dds_seq_type data;
+ DDS_SampleInfoSeq sample_info;
+ ::DDS::ReturnCode_t const result = reader->take (
+ data,
+ sample_info,
+ DDS_LENGTH_UNLIMITED,
+ DDS_NOT_READ_SAMPLE_STATE,
+ DDS_NEW_VIEW_STATE | DDS_NOT_NEW_VIEW_STATE,
+ DDS_ANY_INSTANCE_STATE);
+ if (result == DDS_RETCODE_NO_DATA)
+ {
+ return;
+ }
+ else if (result != DDS_RETCODE_OK)
+ {
+ DDS4CCM_ERROR (1, (LM_ERROR, ACE_TEXT ("Unable to take data from data reader, error %C.\n"), translate_retcode (result)));
+ return;
+ }
+
+ if (this->control_->mode () == ::CCM_DDS::ONE_BY_ONE)
+ {
+ for (::DDS_Long i = 0; i < data.length (); ++i)
+ {
+ if (sample_info[i].valid_data)
+ {
+ ::CCM_DDS::ReadInfo info;
+ info <<= sample_info[i];
+ listener_->on_one_data (data[i], info);
+ }
+ }
+ }
+ else
+ {
+ CORBA::ULong nr_of_samples = 0;
+ for (::DDS_Long i = 0 ; i < sample_info.length(); i++)
+ {
+ if (sample_info[i].valid_data)
+ {
+ ++nr_of_samples;
+ }
+ }
+
+ if (nr_of_samples > 0)
+ {
+ typename CCM_TYPE::seq_type::_var_type inst_seq = new typename CCM_TYPE::seq_type (nr_of_samples);
+ ::CCM_DDS::ReadInfoSeq_var infoseq = new ::CCM_DDS::ReadInfoSeq (nr_of_samples);
+
+ infoseq->length (nr_of_samples);
+ inst_seq->length (nr_of_samples);
+
+ // Copy the valid samples
+ CORBA::ULong ix = 0;
+ for (::DDS_Long i = 0 ; i < sample_info.length(); i++)
+ {
+ if(sample_info[i].valid_data)
+ {
+ infoseq[ix] <<= sample_info[i];
+ inst_seq[ix] = data[i];
+ ++ix;
+ }
+ }
+ listener_->on_many_data (inst_seq, infoseq);
+ }
+ }
+ // Return the loan
+ reader->return_loan(data, sample_info);
+}
+
+template <typename DDS_TYPE, typename CCM_TYPE>
+::DDS::StatusMask
+CIAO::DDS4CCM::DataReaderListener_T<DDS_TYPE, CCM_TYPE>::get_mask (
+ typename CCM_TYPE::listener_type::_ptr_type listener)
+{
+ if (!CORBA::is_nil (listener) || CIAO_debug_level >= 10)
+ {
+ return ::DDS::DATA_AVAILABLE_STATUS |
+ ::DDS::REQUESTED_DEADLINE_MISSED_STATUS |
+ ::DDS::SAMPLE_LOST_STATUS;
+ }
+ else
+ {
+ return 0;
+ }
+}