diff options
author | Johnny Willemsen <jwillemsen@remedy.nl> | 2013-08-29 11:15:22 +0000 |
---|---|---|
committer | Johnny Willemsen <jwillemsen@remedy.nl> | 2013-08-29 11:15:22 +0000 |
commit | 6fe5790f97a39adbfd9768f53a949e495a78987a (patch) | |
tree | 22af43971e86d9c7a00cb14a96c48b8cdc2e49f5 /CIAO/connectors | |
parent | 9b4dfe46b8b375a19e26085f02a98602b88ca8aa (diff) | |
download | ATCD-6fe5790f97a39adbfd9768f53a949e495a78987a.tar.gz |
Thu Aug 29 11:11:22 UTC 2013 Johnny Willemsen <jwillemsen@remedy.nl>
* connectors/dds4ccm/impl/Coherent_Changes_Guard.h:
* connectors/dds4ccm/impl/DDS_Event_Connector_T.h:
* connectors/dds4ccm/impl/DDS_Listen_T.h:
* connectors/dds4ccm/impl/DDS_Listen_T.cpp:
* connectors/dds4ccm/impl/DDS_StateListen_T.h:
* connectors/dds4ccm/impl/DDS_StateListen_T.cpp:
* connectors/dds4ccm/impl/DDS_State_Connector_T.h:
* connectors/dds4ccm/impl/DataReaderListener_T.h:
* connectors/dds4ccm/impl/DataReaderListener_T.cpp:
* connectors/dds4ccm/impl/DataReaderStateListener_T.h:
* connectors/dds4ccm/impl/DataReaderStateListener_T.cpp:
* connectors/dds4ccm/impl/Getter_T.h:
* connectors/dds4ccm/impl/LocalObject.h:
* connectors/dds4ccm/impl/Utils.h:
Fixes for the DDS State connector. The semantics of this
connector is that the state is kept in DDS, but the listeners
where doing a take which removed the data from DDS. The basic
and extended listener ports C++ templates are extended with a
new template argument which is used to control the semantics
of the listener, DDS4CCM_READ or DDS4CCM_TAKE. The State
connector
does use read, the Event connector does use take. This fixes
bugzilla 4123
* connectors/dds4ccm/examples/Hello/Sender/Hello_Sender_exec.h:
* connectors/dds4ccm/examples/Hello/Sender/Hello_Sender_exec.cpp:
* connectors/dds4ccm/tests/KeyedSamples/Sender/Keyed_Test_Sender_exec.h:
* connectors/dds4ccm/tests/KeyedSamples/Sender/Keyed_Test_Sender_exec.cpp:
* connectors/dds4ccm/tests/ListenManyByMany/Receiver/LMBM_Test_Receiver_exec.h:
* connectors/dds4ccm/tests/ListenManyByMany/Receiver/LMBM_Test_Receiver_exec.cpp:
* connectors/dds4ccm/tests/ListenManyByMany/descriptors/Plan.cdp:
* connectors/dds4ccm/tests/MultiTopic/Connector/MultiTopic_Connector_T.h:
* connectors/dds4ccm/tests/MultipleTemp/Sender/MultipleTemp_Sender_exec.cpp:
* connectors/dds4ccm/tests/SLDisabled/Sender/SL_Disabled_Sender_exec.cpp:
* connectors/dds4ccm/tests/SLManyByMany/Receiver/SL_ManyByMany_Receiver_exec.h:
* connectors/dds4ccm/tests/SLManyByMany/Receiver/SL_ManyByMany_Receiver_exec.cpp:
* connectors/dds4ccm/tests/SLOneByOne/Receiver/SL_OneByOne_Receiver_exec.h:
* connectors/dds4ccm/tests/SLOneByOne/Receiver/SL_OneByOne_Receiver_exec.cpp:
* connectors/dds4ccm/tests/SLOneByOne/Sender/SL_OneByOne_Sender_exec.h:
* connectors/dds4ccm/tests/SLOneByOne/Sender/SL_OneByOne_Sender_exec.cpp:
* connectors/dds4ccm/tests/Updater/Sender/Updater_Sender_exec.cpp:
Extended a few tests to check the state in DDS after they
have received some samples through the listener. In previous
versions this never returned data, with the updated DDS4CCM
State connector semantics this now returns data
Diffstat (limited to 'CIAO/connectors')
31 files changed, 598 insertions, 369 deletions
diff --git a/CIAO/connectors/dds4ccm/examples/Hello/Sender/Hello_Sender_exec.cpp b/CIAO/connectors/dds4ccm/examples/Hello/Sender/Hello_Sender_exec.cpp index 8d51425cda2..909c4a6c336 100644 --- a/CIAO/connectors/dds4ccm/examples/Hello/Sender/Hello_Sender_exec.cpp +++ b/CIAO/connectors/dds4ccm/examples/Hello/Sender/Hello_Sender_exec.cpp @@ -326,9 +326,8 @@ namespace CIAO_Hello_Sender_Impl if (!this->ready_to_start_.value()) { ACE_ERROR ((LM_ERROR, ACE_TEXT ("Sender_exec_i::stop - ") - ACE_TEXT ("Sender never got ready to start\n"))); + ACE_TEXT ("ERROR Sender never got ready to start\n"))); } - } diff --git a/CIAO/connectors/dds4ccm/examples/Hello/Sender/Hello_Sender_exec.h b/CIAO/connectors/dds4ccm/examples/Hello/Sender/Hello_Sender_exec.h index 7126b1daeb7..0b346ff8dce 100644 --- a/CIAO/connectors/dds4ccm/examples/Hello/Sender/Hello_Sender_exec.h +++ b/CIAO/connectors/dds4ccm/examples/Hello/Sender/Hello_Sender_exec.h @@ -185,8 +185,7 @@ namespace CIAO_Hello_Sender_Impl CORBA::Boolean log_time_; ACE_CString msg_; - ACE_CString create_message ( - const ACE_CString &msg); + ACE_CString create_message (const ACE_CString &msg); Atomic_Boolean ready_to_start_; }; diff --git a/CIAO/connectors/dds4ccm/impl/Coherent_Changes_Guard.h b/CIAO/connectors/dds4ccm/impl/Coherent_Changes_Guard.h index 90067271e52..43bdabba816 100644 --- a/CIAO/connectors/dds4ccm/impl/Coherent_Changes_Guard.h +++ b/CIAO/connectors/dds4ccm/impl/Coherent_Changes_Guard.h @@ -18,6 +18,10 @@ namespace CIAO { namespace DDS4CCM { + /** + * Guard class which calls begin_coherent_changes() on + * construction and end_coherent_changes() at destruction + */ class DDS4CCM_DDS_IMPL_Export Coherent_Changes_Guard : private ACE_Copy_Disabled { diff --git a/CIAO/connectors/dds4ccm/impl/DDS_Event_Connector_T.h b/CIAO/connectors/dds4ccm/impl/DDS_Event_Connector_T.h index b42d90ba7a8..b4390b281b9 100644 --- a/CIAO/connectors/dds4ccm/impl/DDS_Event_Connector_T.h +++ b/CIAO/connectors/dds4ccm/impl/DDS_Event_Connector_T.h @@ -104,7 +104,8 @@ private: typename CCM_TYPE::push_consumer_traits, typename DDS_TYPE::typed_reader_type, typename DDS_TYPE::value_type, - SEQ_TYPE> + SEQ_TYPE, + CIAO::DDS4CCM::DDS4CCM_TAKE> push_consumer_; //@} diff --git a/CIAO/connectors/dds4ccm/impl/DDS_Listen_T.cpp b/CIAO/connectors/dds4ccm/impl/DDS_Listen_T.cpp index d96d926e7a3..14892b2ae33 100644 --- a/CIAO/connectors/dds4ccm/impl/DDS_Listen_T.cpp +++ b/CIAO/connectors/dds4ccm/impl/DDS_Listen_T.cpp @@ -5,33 +5,33 @@ #include "dds4ccm/impl/DataReaderListener_T.h" #include "dds4ccm/impl/logger/Log_Macros.h" -template <typename CCM_TYPE, typename TYPED_DDS_READER, typename VALUE_TYPE, typename SEQ_VALUE_TYPE> -DDS_Listen_T<CCM_TYPE, TYPED_DDS_READER, VALUE_TYPE, SEQ_VALUE_TYPE>::DDS_Listen_T (void) +template <typename CCM_TYPE, typename TYPED_DDS_READER, typename VALUE_TYPE, typename SEQ_VALUE_TYPE, CIAO::DDS4CCM::DDS4CCM_LISTENER_READ_TAKE LRT> +DDS_Listen_T<CCM_TYPE, TYPED_DDS_READER, VALUE_TYPE, SEQ_VALUE_TYPE, LRT>::DDS_Listen_T (void) { } -template <typename CCM_TYPE, typename TYPED_DDS_READER, typename VALUE_TYPE, typename SEQ_VALUE_TYPE> -DDS_Listen_T<CCM_TYPE, TYPED_DDS_READER, VALUE_TYPE, SEQ_VALUE_TYPE>::~DDS_Listen_T (void) +template <typename CCM_TYPE, typename TYPED_DDS_READER, typename VALUE_TYPE, typename SEQ_VALUE_TYPE, CIAO::DDS4CCM::DDS4CCM_LISTENER_READ_TAKE LRT> +DDS_Listen_T<CCM_TYPE, TYPED_DDS_READER, VALUE_TYPE, SEQ_VALUE_TYPE, LRT>::~DDS_Listen_T (void) { } -template <typename CCM_TYPE, typename TYPED_DDS_READER, typename VALUE_TYPE, typename SEQ_VALUE_TYPE> +template <typename CCM_TYPE, typename TYPED_DDS_READER, typename VALUE_TYPE, typename SEQ_VALUE_TYPE, CIAO::DDS4CCM::DDS4CCM_LISTENER_READ_TAKE LRT> void -DDS_Listen_T<CCM_TYPE, TYPED_DDS_READER, VALUE_TYPE, SEQ_VALUE_TYPE>::set_component ( +DDS_Listen_T<CCM_TYPE, TYPED_DDS_READER, VALUE_TYPE, SEQ_VALUE_TYPE, LRT>::set_component ( ::CORBA::Object_ptr component) { DDS_Subscriber_Base_T<CCM_TYPE, TYPED_DDS_READER, VALUE_TYPE, SEQ_VALUE_TYPE>::set_component (component); this->data_control_->_set_component (component); } -template <typename CCM_TYPE, typename TYPED_DDS_READER, typename VALUE_TYPE, typename SEQ_VALUE_TYPE> +template <typename CCM_TYPE, typename TYPED_DDS_READER, typename VALUE_TYPE, typename SEQ_VALUE_TYPE, CIAO::DDS4CCM::DDS4CCM_LISTENER_READ_TAKE LRT> void -DDS_Listen_T<CCM_TYPE, TYPED_DDS_READER, VALUE_TYPE, SEQ_VALUE_TYPE>::activate ( +DDS_Listen_T<CCM_TYPE, TYPED_DDS_READER, VALUE_TYPE, SEQ_VALUE_TYPE, LRT>::activate ( typename CCM_TYPE::data_listener_type::_ptr_type listener, ::CCM_DDS::PortStatusListener_ptr status, ACE_Reactor* reactor) { - DDS4CCM_TRACE ("DDS_Listen_T<CCM_TYPE, TYPED_DDS_READER, VALUE_TYPE, SEQ_VALUE_TYPE>::activate"); + DDS4CCM_TRACE ("DDS_Listen_T::activate"); ::DDS::StatusMask const mask = DataReaderListener_type::get_mask (listener, status); @@ -74,21 +74,21 @@ DDS_Listen_T<CCM_TYPE, TYPED_DDS_READER, VALUE_TYPE, SEQ_VALUE_TYPE>::activate ( } } -template <typename CCM_TYPE, typename TYPED_DDS_READER, typename VALUE_TYPE, typename SEQ_VALUE_TYPE> +template <typename CCM_TYPE, typename TYPED_DDS_READER, typename VALUE_TYPE, typename SEQ_VALUE_TYPE, CIAO::DDS4CCM::DDS4CCM_LISTENER_READ_TAKE LRT> void -DDS_Listen_T<CCM_TYPE, TYPED_DDS_READER, VALUE_TYPE, SEQ_VALUE_TYPE>::remove ( +DDS_Listen_T<CCM_TYPE, TYPED_DDS_READER, VALUE_TYPE, SEQ_VALUE_TYPE, LRT>::remove ( ::DDS::Subscriber_ptr subscriber) { - DDS4CCM_TRACE ("DDS_Listen_T<CCM_TYPE, TYPED_DDS_READER, VALUE_TYPE, SEQ_VALUE_TYPE>::remove"); + DDS4CCM_TRACE ("DDS_Listen_T::remove"); SubscriberBase_type::remove (subscriber); } -template <typename CCM_TYPE, typename TYPED_DDS_READER, typename VALUE_TYPE, typename SEQ_VALUE_TYPE> +template <typename CCM_TYPE, typename TYPED_DDS_READER, typename VALUE_TYPE, typename SEQ_VALUE_TYPE, CIAO::DDS4CCM::DDS4CCM_LISTENER_READ_TAKE LRT> ::CCM_DDS::CCM_DataListenerControl_ptr -DDS_Listen_T<CCM_TYPE, TYPED_DDS_READER, VALUE_TYPE, SEQ_VALUE_TYPE>::get_data_control (void) +DDS_Listen_T<CCM_TYPE, TYPED_DDS_READER, VALUE_TYPE, SEQ_VALUE_TYPE, LRT>::get_data_control (void) { - DDS4CCM_TRACE ("DDS_Listen_T<CCM_TYPE, TYPED_DDS_READER, VALUE_TYPE, SEQ_VALUE_TYPE>::get_data_control"); + DDS4CCM_TRACE ("DDS_Listen_T::get_data_control"); return ::CCM_DDS::CCM_DataListenerControl::_duplicate (this->data_control_); } diff --git a/CIAO/connectors/dds4ccm/impl/DDS_Listen_T.h b/CIAO/connectors/dds4ccm/impl/DDS_Listen_T.h index 970678ed91c..858d8323ff9 100644 --- a/CIAO/connectors/dds4ccm/impl/DDS_Listen_T.h +++ b/CIAO/connectors/dds4ccm/impl/DDS_Listen_T.h @@ -11,12 +11,20 @@ #include "dds4ccm/impl/DDS_Subscriber_Base_T.h" #include "dds4ccm/impl/DataReaderListener_T.h" #include "dds4ccm/impl/DataListenerControl_T.h" +#include "dds4ccm/impl/Utils.h" ACE_BEGIN_VERSIONED_NAMESPACE_DECL class ACE_Reactor; ACE_END_VERSIONED_NAMESPACE_DECL -template <typename CCM_TYPE, typename TYPED_DDS_READER, typename VALUE_TYPE, typename SEQ_VALUE_TYPE> +/** + * Template implementing the DDS4CCM Listen basic port + * @tparam CCM_TYPE Set of type traits for this basic port + * @tparam TYPED_DDS_READER The typed DDS DataReader type + * @tparam SEQ_TYPE The type of sequence + * @tparam LRT An enum indicating the read or take semantics of this port + */ +template <typename CCM_TYPE, typename TYPED_DDS_READER, typename VALUE_TYPE, typename SEQ_VALUE_TYPE, CIAO::DDS4CCM::DDS4CCM_LISTENER_READ_TAKE LRT> class DDS_Listen_T : public DDS_Subscriber_Base_T<CCM_TYPE, TYPED_DDS_READER, VALUE_TYPE, SEQ_VALUE_TYPE> { @@ -42,7 +50,7 @@ public: void remove (::DDS::Subscriber_ptr subscriber); private: - typedef ::CIAO::DDS4CCM::DataReaderListener_T<CCM_TYPE, TYPED_DDS_READER, SEQ_VALUE_TYPE> + typedef ::CIAO::DDS4CCM::DataReaderListener_T<CCM_TYPE, TYPED_DDS_READER, SEQ_VALUE_TYPE, LRT> DataReaderListener_type; typedef CCM_DDS_DataListenerControl_T< ::CCM_DDS::CCM_DataListenerControl> DataListenerControl_type; diff --git a/CIAO/connectors/dds4ccm/impl/DDS_StateListen_T.cpp b/CIAO/connectors/dds4ccm/impl/DDS_StateListen_T.cpp index 62eb3c5db87..13f99f0d508 100644 --- a/CIAO/connectors/dds4ccm/impl/DDS_StateListen_T.cpp +++ b/CIAO/connectors/dds4ccm/impl/DDS_StateListen_T.cpp @@ -5,33 +5,33 @@ #include "dds4ccm/impl/StateListenerControl_T.h" #include "dds4ccm/impl/logger/Log_Macros.h" -template <typename CCM_TYPE, typename TYPED_DDS_READER, typename VALUE_TYPE, typename SEQ_VALUE_TYPE> -DDS_StateListen_T<CCM_TYPE, TYPED_DDS_READER, VALUE_TYPE, SEQ_VALUE_TYPE>::DDS_StateListen_T (void) +template <typename CCM_TYPE, typename TYPED_DDS_READER, typename VALUE_TYPE, typename SEQ_VALUE_TYPE, CIAO::DDS4CCM::DDS4CCM_LISTENER_READ_TAKE LRT> +DDS_StateListen_T<CCM_TYPE, TYPED_DDS_READER, VALUE_TYPE, SEQ_VALUE_TYPE, LRT>::DDS_StateListen_T (void) { } -template <typename CCM_TYPE, typename TYPED_DDS_READER, typename VALUE_TYPE, typename SEQ_VALUE_TYPE> -DDS_StateListen_T<CCM_TYPE, TYPED_DDS_READER, VALUE_TYPE, SEQ_VALUE_TYPE>::~DDS_StateListen_T (void) +template <typename CCM_TYPE, typename TYPED_DDS_READER, typename VALUE_TYPE, typename SEQ_VALUE_TYPE, CIAO::DDS4CCM::DDS4CCM_LISTENER_READ_TAKE LRT> +DDS_StateListen_T<CCM_TYPE, TYPED_DDS_READER, VALUE_TYPE, SEQ_VALUE_TYPE, LRT>::~DDS_StateListen_T (void) { } -template <typename CCM_TYPE, typename TYPED_DDS_READER, typename VALUE_TYPE, typename SEQ_VALUE_TYPE> +template <typename CCM_TYPE, typename TYPED_DDS_READER, typename VALUE_TYPE, typename SEQ_VALUE_TYPE, CIAO::DDS4CCM::DDS4CCM_LISTENER_READ_TAKE LRT> void -DDS_StateListen_T<CCM_TYPE, TYPED_DDS_READER, VALUE_TYPE, SEQ_VALUE_TYPE>::set_component ( +DDS_StateListen_T<CCM_TYPE, TYPED_DDS_READER, VALUE_TYPE, SEQ_VALUE_TYPE, LRT>::set_component ( ::CORBA::Object_ptr component) { DDS_Subscriber_Base_T<CCM_TYPE, TYPED_DDS_READER, VALUE_TYPE, SEQ_VALUE_TYPE>::set_component (component); this->data_control_->_set_component (component); } -template <typename CCM_TYPE, typename TYPED_DDS_READER, typename VALUE_TYPE, typename SEQ_VALUE_TYPE> +template <typename CCM_TYPE, typename TYPED_DDS_READER, typename VALUE_TYPE, typename SEQ_VALUE_TYPE, CIAO::DDS4CCM::DDS4CCM_LISTENER_READ_TAKE LRT> void -DDS_StateListen_T<CCM_TYPE, TYPED_DDS_READER, VALUE_TYPE, SEQ_VALUE_TYPE>::activate ( +DDS_StateListen_T<CCM_TYPE, TYPED_DDS_READER, VALUE_TYPE, SEQ_VALUE_TYPE, LRT>::activate ( typename CCM_TYPE::data_listener_type::_ptr_type listener, ::CCM_DDS::PortStatusListener_ptr status, ACE_Reactor* reactor) { - DDS4CCM_TRACE ("DDS_StateListen_T<CCM_TYPE, TYPED_DDS_READER, VALUE_TYPE, SEQ_VALUE_TYPE>::activate"); + DDS4CCM_TRACE ("DDS_StateListen_T::activate"); ::DDS::StatusMask const mask = DataReaderStateListener_type::get_mask (listener); @@ -75,21 +75,21 @@ DDS_StateListen_T<CCM_TYPE, TYPED_DDS_READER, VALUE_TYPE, SEQ_VALUE_TYPE>::activ } } -template <typename CCM_TYPE, typename TYPED_DDS_READER, typename VALUE_TYPE, typename SEQ_VALUE_TYPE> +template <typename CCM_TYPE, typename TYPED_DDS_READER, typename VALUE_TYPE, typename SEQ_VALUE_TYPE, CIAO::DDS4CCM::DDS4CCM_LISTENER_READ_TAKE LRT> void -DDS_StateListen_T<CCM_TYPE, TYPED_DDS_READER, VALUE_TYPE, SEQ_VALUE_TYPE>::remove ( +DDS_StateListen_T<CCM_TYPE, TYPED_DDS_READER, VALUE_TYPE, SEQ_VALUE_TYPE, LRT>::remove ( ::DDS::Subscriber_ptr subscriber) { - DDS4CCM_TRACE ("DDS_StateListen_T<CCM_TYPE, TYPED_DDS_READER, VALUE_TYPE, SEQ_VALUE_TYPE>::remove"); + DDS4CCM_TRACE ("DDS_StateListen_T::remove"); SubscriberBase_type::remove (subscriber); } -template <typename CCM_TYPE, typename TYPED_DDS_READER, typename VALUE_TYPE, typename SEQ_VALUE_TYPE> +template <typename CCM_TYPE, typename TYPED_DDS_READER, typename VALUE_TYPE, typename SEQ_VALUE_TYPE, CIAO::DDS4CCM::DDS4CCM_LISTENER_READ_TAKE LRT> ::CCM_DDS::CCM_StateListenerControl_ptr -DDS_StateListen_T<CCM_TYPE, TYPED_DDS_READER, VALUE_TYPE, SEQ_VALUE_TYPE>::get_data_control (void) +DDS_StateListen_T<CCM_TYPE, TYPED_DDS_READER, VALUE_TYPE, SEQ_VALUE_TYPE, LRT>::get_data_control (void) { - DDS4CCM_TRACE ("DDS_StateListen_T<CCM_TYPE, TYPED_DDS_READER, VALUE_TYPE, SEQ_VALUE_TYPE>::get_data_control"); + DDS4CCM_TRACE ("DDS_StateListen_T::get_data_control"); return ::CCM_DDS::CCM_StateListenerControl::_duplicate (this->data_control_); } diff --git a/CIAO/connectors/dds4ccm/impl/DDS_StateListen_T.h b/CIAO/connectors/dds4ccm/impl/DDS_StateListen_T.h index 5395c2dced7..609e1efe6cd 100644 --- a/CIAO/connectors/dds4ccm/impl/DDS_StateListen_T.h +++ b/CIAO/connectors/dds4ccm/impl/DDS_StateListen_T.h @@ -3,7 +3,7 @@ * * $Id$ * - * Wrapper facade for NDDS. + * DDS4CCM StateListen port */ #ifndef DDS_STATELISTEN_T_H_ #define DDS_STATELISTEN_T_H_ @@ -11,8 +11,9 @@ #include "dds4ccm/impl/DDS_Subscriber_Base_T.h" #include "dds4ccm/impl/StateListenerControl_T.h" #include "dds4ccm/impl/DataReaderStateListener_T.h" +#include "dds4ccm/impl/Utils.h" -template <typename CCM_TYPE, typename TYPED_DDS_READER, typename VALUE_TYPE, typename SEQ_VALUE_TYPE> +template <typename CCM_TYPE, typename TYPED_DDS_READER, typename VALUE_TYPE, typename SEQ_VALUE_TYPE, CIAO::DDS4CCM::DDS4CCM_LISTENER_READ_TAKE LRT> class DDS_StateListen_T : public DDS_Subscriber_Base_T<CCM_TYPE, TYPED_DDS_READER, @@ -41,7 +42,7 @@ public: void remove (::DDS::Subscriber_ptr subscriber); private: - typedef ::CIAO::DDS4CCM::DataReaderStateListener_T<CCM_TYPE, TYPED_DDS_READER, SEQ_VALUE_TYPE> + typedef ::CIAO::DDS4CCM::DataReaderStateListener_T<CCM_TYPE, TYPED_DDS_READER, SEQ_VALUE_TYPE, LRT> DataReaderStateListener_type; typedef CCM_DDS_StateListenerControl_T< ::CCM_DDS::CCM_StateListenerControl> StateListenerControl_type; diff --git a/CIAO/connectors/dds4ccm/impl/DDS_State_Connector_T.h b/CIAO/connectors/dds4ccm/impl/DDS_State_Connector_T.h index b27b8f2887d..b24d14ab19e 100644 --- a/CIAO/connectors/dds4ccm/impl/DDS_State_Connector_T.h +++ b/CIAO/connectors/dds4ccm/impl/DDS_State_Connector_T.h @@ -181,7 +181,8 @@ private: typename CCM_TYPE::push_observer_traits, typename DDS_TYPE::typed_reader_type, typename DDS_TYPE::value_type, - SEQ_TYPE> + SEQ_TYPE, + CIAO::DDS4CCM::DDS4CCM_READ> push_observer_; //@} @@ -193,7 +194,8 @@ private: typename CCM_TYPE::push_state_observer_traits, typename DDS_TYPE::typed_reader_type, typename DDS_TYPE::value_type, - SEQ_TYPE> + SEQ_TYPE, + CIAO::DDS4CCM::DDS4CCM_READ> push_state_observer_; //@} diff --git a/CIAO/connectors/dds4ccm/impl/DataReaderListener_T.cpp b/CIAO/connectors/dds4ccm/impl/DataReaderListener_T.cpp index 24b30dfb4a3..29453804c04 100644 --- a/CIAO/connectors/dds4ccm/impl/DataReaderListener_T.cpp +++ b/CIAO/connectors/dds4ccm/impl/DataReaderListener_T.cpp @@ -8,8 +8,8 @@ namespace CIAO { namespace DDS4CCM { - template <typename CCM_TYPE, typename TYPED_READER, typename SEQ_TYPE> - DataReaderListener_T<CCM_TYPE, TYPED_READER, SEQ_TYPE>::DataReaderListener_T ( + template <typename CCM_TYPE, typename TYPED_DDS_READER, typename SEQ_TYPE, DDS4CCM_LISTENER_READ_TAKE LRT> + DataReaderListenerBase_T<CCM_TYPE, TYPED_DDS_READER, SEQ_TYPE, LRT>::DataReaderListenerBase_T ( typename CCM_TYPE::data_listener_type::_ptr_type listener, ::CCM_DDS::PortStatusListener_ptr port_status_listener, ::CCM_DDS::DataListenerControl_ptr control, @@ -20,21 +20,21 @@ namespace CIAO control_ (::CCM_DDS::DataListenerControl::_duplicate (control)), condition_manager_ (condition_manager) { - DDS4CCM_TRACE ("DataReaderListener_T::DataReaderListener_T"); + DDS4CCM_TRACE ("DataReaderListenerBase_T::DataReaderListenerBase_T"); } - template <typename CCM_TYPE, typename TYPED_READER, typename SEQ_TYPE> - DataReaderListener_T<CCM_TYPE, TYPED_READER, SEQ_TYPE>::~DataReaderListener_T (void) + template <typename CCM_TYPE, typename TYPED_DDS_READER, typename SEQ_TYPE, DDS4CCM_LISTENER_READ_TAKE LRT> + DataReaderListenerBase_T<CCM_TYPE, TYPED_DDS_READER, SEQ_TYPE, LRT>::~DataReaderListenerBase_T (void) { - DDS4CCM_TRACE ("DataReaderListener_T::~DataReaderListener_T"); + DDS4CCM_TRACE ("DataReaderListenerBase_T::~DataReaderListenerBase_T"); } - template <typename CCM_TYPE, typename TYPED_READER, typename SEQ_TYPE> + template <typename CCM_TYPE, typename TYPED_DDS_READER, typename SEQ_TYPE, DDS4CCM_LISTENER_READ_TAKE LRT> void - DataReaderListener_T<CCM_TYPE, TYPED_READER, SEQ_TYPE>::on_data_available ( + DataReaderListenerBase_T<CCM_TYPE, TYPED_DDS_READER, SEQ_TYPE, LRT>::on_data_available ( ::DDS::DataReader_ptr rdr) { - DDS4CCM_TRACE ("DataReaderListener_T::on_data_available"); + DDS4CCM_TRACE ("DataReaderListenerBase_T::on_data_available"); if (!::CORBA::is_nil (rdr) && this->control_->mode () != ::CCM_DDS::NOT_ENABLED) @@ -59,17 +59,81 @@ namespace CIAO } } - template <typename CCM_TYPE, typename TYPED_READER, typename SEQ_TYPE> + template <typename CCM_TYPE, typename TYPED_DDS_READER, typename SEQ_TYPE> + ::DDS::ReturnCode_t + DataReaderListener_T<CCM_TYPE, TYPED_DDS_READER, SEQ_TYPE, CIAO::DDS4CCM::DDS4CCM_TAKE>::get_data_i ( + typename TYPED_DDS_READER::_ptr_type reader, + ::DDS::QueryCondition_ptr qc, + SEQ_TYPE &data, + ::DDS::SampleInfoSeq &sample_info, + ::CORBA::Long max_samples) + { + DDS4CCM_TRACE ("DataReaderListener_T::get_data_i"); + + ::DDS::ReturnCode_t result = ::DDS::RETCODE_OK; + if (! ::CORBA::is_nil (qc)) + { + ::DDS::ReadCondition_var rd = ::DDS::ReadCondition::_narrow (qc); + result = reader->take_w_condition (data, + sample_info, + max_samples, + rd.in ()); + } + else + { + result = reader->take (data, + sample_info, + max_samples, + ::DDS::NOT_READ_SAMPLE_STATE, + ::DDS::NEW_VIEW_STATE | ::DDS::NOT_NEW_VIEW_STATE, + ::DDS::ANY_INSTANCE_STATE); + } + return result; + } + + template <typename CCM_TYPE, typename TYPED_DDS_READER, typename SEQ_TYPE> + ::DDS::ReturnCode_t + DataReaderListener_T<CCM_TYPE, TYPED_DDS_READER, SEQ_TYPE, CIAO::DDS4CCM::DDS4CCM_READ>::get_data_i ( + typename TYPED_DDS_READER::_ptr_type reader, + ::DDS::QueryCondition_ptr qc, + SEQ_TYPE &data, + ::DDS::SampleInfoSeq &sample_info, + ::CORBA::Long max_samples) + { + DDS4CCM_TRACE ("DataReaderListener_T::get_data_i"); + + ::DDS::ReturnCode_t result = ::DDS::RETCODE_OK; + if (! ::CORBA::is_nil (qc)) + { + ::DDS::ReadCondition_var rd = ::DDS::ReadCondition::_narrow (qc); + result = reader->read_w_condition (data, + sample_info, + max_samples, + rd.in ()); + } + else + { + result = reader->read (data, + sample_info, + max_samples, + ::DDS::NOT_READ_SAMPLE_STATE, + ::DDS::NEW_VIEW_STATE | ::DDS::NOT_NEW_VIEW_STATE, + ::DDS::ANY_INSTANCE_STATE); + } + return result; + } + + template <typename CCM_TYPE, typename TYPED_DDS_READER, typename SEQ_TYPE, DDS4CCM_LISTENER_READ_TAKE LRT> void - DataReaderListener_T<CCM_TYPE, TYPED_READER, SEQ_TYPE>::on_data_available_i ( + DataReaderListenerBase_T<CCM_TYPE, TYPED_DDS_READER, SEQ_TYPE, LRT>::on_data_available_i ( ::DDS::DataReader_ptr rdr) { - DDS4CCM_TRACE ("DataReaderListener_T::on_data_available_i"); + DDS4CCM_TRACE ("DataReaderListenerBase_T::on_data_available_i"); if (::CORBA::is_nil (rdr)) { DDS4CCM_ERROR (DDS4CCM_LOG_LEVEL_ERROR, (LM_ERROR, DDS4CCM_INFO - ACE_TEXT ("DataReaderListener_T::on_data_available_i - ") + ACE_TEXT ("DataReaderListenerBase_T::on_data_available_i - ") ACE_TEXT ("No datareader received.\n"))); return; } @@ -80,13 +144,13 @@ namespace CIAO return; } - typename TYPED_READER::_var_type reader; - reader = TYPED_READER::_narrow (rdr); + typename TYPED_DDS_READER::_var_type reader; + reader = TYPED_DDS_READER::_narrow (rdr); if (::CORBA::is_nil (reader.in ())) { DDS4CCM_ERROR (DDS4CCM_LOG_LEVEL_ERROR, (LM_ERROR, DDS4CCM_INFO - ACE_TEXT ("DataReaderListener_T::on_data_available_i - ") + ACE_TEXT ("DataReaderListenerBase_T::on_data_available_i - ") ACE_TEXT ("Failed to narrow DataReader to a type ") ACE_TEXT ("specific DataReader.\n"))); return; @@ -107,29 +171,12 @@ namespace CIAO ::DDS::QueryCondition_var qc = this->condition_manager_.get_querycondition_listener (); - ::DDS::ReturnCode_t result = ::DDS::RETCODE_OK; - - if (! ::CORBA::is_nil (qc.in ())) - { - ::DDS::ReadCondition_var rd = ::DDS::ReadCondition::_narrow (qc.in ()); - result = reader->take_w_condition (data, - sample_info, - max_samples, - rd.in ()); - } - else - { - result = reader->take (data, - sample_info, - max_samples, - ::DDS::NOT_READ_SAMPLE_STATE, - ::DDS::NEW_VIEW_STATE | ::DDS::NOT_NEW_VIEW_STATE, - ::DDS::ANY_INSTANCE_STATE); - } + ::DDS::ReturnCode_t const result = + this->get_data_i (reader, qc.in (), data, sample_info, max_samples); DDS4CCM_DEBUG (DDS4CCM_LOG_LEVEL_DDS_STATUS, (LM_INFO, DDS4CCM_INFO - ACE_TEXT ("DataReaderListener_T::on_data_available_i - ") - ACE_TEXT ("Take data returned %C.\n"), + ACE_TEXT ("DataReaderListenerBase_T::on_data_available_i - ") + ACE_TEXT ("Get data returned %C.\n"), translate_retcode (result))); if (result == ::DDS::RETCODE_OK) @@ -186,7 +233,7 @@ namespace CIAO if (retval != ::DDS::RETCODE_OK) { DDS4CCM_ERROR (DDS4CCM_LOG_LEVEL_ERROR, (LM_ERROR, DDS4CCM_INFO - ACE_TEXT ("DataReaderListener_T::on_data_available_i - ") + ACE_TEXT ("DataReaderListenerBase_T::on_data_available_i - ") ACE_TEXT ("Error returning loan to DDS - <%C>\n"), translate_retcode (retval))); // No exception here since this the DDS vendor doesn't expect this. @@ -198,14 +245,14 @@ namespace CIAO DDS4CCM_PRINT_DEBUG_CORBA_EXCEPTION ( DDS4CCM_LOG_LEVEL_ACTION, ex, - "DataReaderListener_T::on_data_available_i"); + "DataReaderListenerBase_T::on_data_available_i"); } catch (const ::CORBA::Exception& ex) { DDS4CCM_PRINT_CORBA_EXCEPTION ( DDS4CCM_LOG_LEVEL_ERROR, ex, - "DataReaderListener_T::on_data_available_i"); + "DataReaderListenerBase_T::on_data_available_i"); } catch (...) { @@ -215,13 +262,13 @@ namespace CIAO } } - template <typename CCM_TYPE, typename TYPED_READER, typename SEQ_TYPE> + template <typename CCM_TYPE, typename TYPED_DDS_READER, typename SEQ_TYPE, DDS4CCM_LISTENER_READ_TAKE LRT> ::DDS::StatusMask - DataReaderListener_T<CCM_TYPE, TYPED_READER, SEQ_TYPE>::get_mask ( + DataReaderListenerBase_T<CCM_TYPE, TYPED_DDS_READER, SEQ_TYPE, LRT>::get_mask ( typename CCM_TYPE::data_listener_type::_ptr_type listener, ::CCM_DDS::PortStatusListener_ptr status) { - DDS4CCM_TRACE ("DataReaderListener_T::get_mask"); + DDS4CCM_TRACE ("DataReaderListenerBase_T::get_mask"); ::DDS::StatusMask mask = 0; @@ -237,7 +284,7 @@ namespace CIAO ACE_CString msk; translate_statusmask (msk, mask); DDS4CCM_DEBUG (DDS4CCM_LOG_LEVEL_DDS_STATUS, (LM_DEBUG, DDS4CCM_INFO - "DataReaderListener_T::get_mask - " + "DataReaderListenerBase_T::get_mask - " "Mask becomes %C\n", msk.c_str ())); } diff --git a/CIAO/connectors/dds4ccm/impl/DataReaderListener_T.h b/CIAO/connectors/dds4ccm/impl/DataReaderListener_T.h index b1914887483..cd87b3cb9ff 100644 --- a/CIAO/connectors/dds4ccm/impl/DataReaderListener_T.h +++ b/CIAO/connectors/dds4ccm/impl/DataReaderListener_T.h @@ -23,18 +23,21 @@ namespace CIAO { namespace DDS4CCM { - template <typename CCM_TYPE, typename TYPED_READER, typename SEQ_TYPE> - class DataReaderListener_T : + template <typename CCM_TYPE, typename TYPED_DDS_READER, typename SEQ_TYPE, DDS4CCM_LISTENER_READ_TAKE LRT> + class DataReaderListener_T; + + template <typename CCM_TYPE, typename TYPED_DDS_READER, typename SEQ_TYPE, DDS4CCM_LISTENER_READ_TAKE LRT> + class DataReaderListenerBase_T : public PortStatusListener { - typedef DataReaderListener_T<CCM_TYPE, TYPED_READER, SEQ_TYPE> + typedef DataReaderListenerBase_T<CCM_TYPE, TYPED_DDS_READER, SEQ_TYPE, LRT> DataReaderListener_type; typedef ::CIAO::DDS4CCM::DataReaderHandler_T<DataReaderListener_type> DataReaderHandler_type; public: /// Constructor - DataReaderListener_T ( + DataReaderListenerBase_T ( typename CCM_TYPE::data_listener_type::_ptr_type listener, ::CCM_DDS::PortStatusListener_ptr port_status_listener, ::CCM_DDS::DataListenerControl_ptr control, @@ -42,7 +45,7 @@ namespace CIAO ConditionManager& condition_manager); /// Destructor - virtual ~DataReaderListener_T (void); + virtual ~DataReaderListenerBase_T (void); virtual void on_data_available (::DDS::DataReader_ptr rdr); @@ -53,10 +56,70 @@ namespace CIAO void on_data_available_i (::DDS::DataReader_ptr rdr); private: + /// Helper method to get data from DDS + virtual ::DDS::ReturnCode_t get_data_i ( + typename TYPED_DDS_READER::_ptr_type reader, + ::DDS::QueryCondition_ptr qc, + SEQ_TYPE &data, + ::DDS::SampleInfoSeq &sample_info, + ::CORBA::Long max_samples) = 0; + typename CCM_TYPE::data_listener_type::_var_type listener_; ::CCM_DDS::DataListenerControl_var control_; ConditionManager& condition_manager_; }; + + template <typename CCM_TYPE, typename TYPED_DDS_READER, typename SEQ_TYPE> + class DataReaderListener_T <CCM_TYPE, TYPED_DDS_READER, SEQ_TYPE, CIAO::DDS4CCM::DDS4CCM_TAKE> : + public DataReaderListenerBase_T <CCM_TYPE, TYPED_DDS_READER, SEQ_TYPE, CIAO::DDS4CCM::DDS4CCM_TAKE> + { + public: + /// Constructor + DataReaderListener_T ( + typename CCM_TYPE::data_listener_type::_ptr_type listener, + ::CCM_DDS::PortStatusListener_ptr port_status_listener, + ::CCM_DDS::DataListenerControl_ptr control, + ACE_Reactor * reactor, + ConditionManager& condition_manager) : + DataReaderListenerBase_T<CCM_TYPE, TYPED_DDS_READER, SEQ_TYPE, CIAO::DDS4CCM::DDS4CCM_TAKE> ( + listener, port_status_listener, control, reactor, condition_manager) + { + } + private: + /// Helper method to get data from DDS + virtual ::DDS::ReturnCode_t get_data_i ( + typename TYPED_DDS_READER::_ptr_type reader, + ::DDS::QueryCondition_ptr qc, + SEQ_TYPE &data, + ::DDS::SampleInfoSeq &sample_info, + ::CORBA::Long max_samples); + }; + + template <typename CCM_TYPE, typename TYPED_DDS_READER, typename SEQ_TYPE> + class DataReaderListener_T <CCM_TYPE, TYPED_DDS_READER, SEQ_TYPE, CIAO::DDS4CCM::DDS4CCM_READ> : + public DataReaderListenerBase_T <CCM_TYPE, TYPED_DDS_READER, SEQ_TYPE, CIAO::DDS4CCM::DDS4CCM_READ> + { + public: + /// Constructor + DataReaderListener_T ( + typename CCM_TYPE::data_listener_type::_ptr_type listener, + ::CCM_DDS::PortStatusListener_ptr port_status_listener, + ::CCM_DDS::DataListenerControl_ptr control, + ACE_Reactor * reactor, + ConditionManager& condition_manager) : + DataReaderListenerBase_T<CCM_TYPE, TYPED_DDS_READER, SEQ_TYPE, CIAO::DDS4CCM::DDS4CCM_READ> ( + listener, port_status_listener, control, reactor, condition_manager) + { + } + private: + /// Helper method to get data from DDS + virtual ::DDS::ReturnCode_t get_data_i ( + typename TYPED_DDS_READER::_ptr_type reader, + ::DDS::QueryCondition_ptr qc, + SEQ_TYPE &data, + ::DDS::SampleInfoSeq &sample_info, + ::CORBA::Long max_samples); + }; } } diff --git a/CIAO/connectors/dds4ccm/impl/DataReaderStateListener_T.cpp b/CIAO/connectors/dds4ccm/impl/DataReaderStateListener_T.cpp index b1bf63f3525..a63df652ab5 100644 --- a/CIAO/connectors/dds4ccm/impl/DataReaderStateListener_T.cpp +++ b/CIAO/connectors/dds4ccm/impl/DataReaderStateListener_T.cpp @@ -12,8 +12,8 @@ namespace CIAO { namespace DDS4CCM { - template <typename CCM_TYPE, typename TYPED_DDS_READER, typename SEQ_TYPE> - DataReaderStateListener_T<CCM_TYPE, TYPED_DDS_READER, SEQ_TYPE>::DataReaderStateListener_T ( + template <typename CCM_TYPE, typename TYPED_DDS_READER, typename SEQ_TYPE, DDS4CCM_LISTENER_READ_TAKE LRT> + DataReaderStateListenerBase_T<CCM_TYPE, TYPED_DDS_READER, SEQ_TYPE, LRT>::DataReaderStateListenerBase_T ( typename CCM_TYPE::data_listener_type::_ptr_type listener, ::CCM_DDS::PortStatusListener_ptr port_status_listener, ::CCM_DDS::StateListenerControl_ptr control, @@ -24,22 +24,22 @@ namespace CIAO control_ (::CCM_DDS::StateListenerControl::_duplicate (control)), condition_manager_ (condition_manager) { - DDS4CCM_TRACE ("DataReaderStateListener_T::DataReaderStateListener_T"); + DDS4CCM_TRACE ("DataReaderStateListenerBase_T::DataReaderStateListenerBase_T"); } // Implementation skeleton destructor - template <typename CCM_TYPE, typename TYPED_DDS_READER, typename SEQ_TYPE> - DataReaderStateListener_T<CCM_TYPE, TYPED_DDS_READER, SEQ_TYPE>::~DataReaderStateListener_T (void) + template <typename CCM_TYPE, typename TYPED_DDS_READER, typename SEQ_TYPE, DDS4CCM_LISTENER_READ_TAKE LRT> + DataReaderStateListenerBase_T<CCM_TYPE, TYPED_DDS_READER, SEQ_TYPE, LRT>::~DataReaderStateListenerBase_T (void) { - DDS4CCM_TRACE ("DataReaderStateListener_T::~DataReaderStateListener_T"); + DDS4CCM_TRACE ("DataReaderStateListenerBase_T::~DataReaderStateListenerBase_T"); } - template <typename CCM_TYPE, typename TYPED_DDS_READER, typename SEQ_TYPE> + template <typename CCM_TYPE, typename TYPED_DDS_READER, typename SEQ_TYPE, DDS4CCM_LISTENER_READ_TAKE LRT> void - DataReaderStateListener_T<CCM_TYPE, TYPED_DDS_READER, SEQ_TYPE>::on_data_available( + DataReaderStateListenerBase_T<CCM_TYPE, TYPED_DDS_READER, SEQ_TYPE, LRT>::on_data_available( ::DDS::DataReader_ptr rdr) { - DDS4CCM_TRACE ("DataReaderStateListener_T::on_data_available"); + DDS4CCM_TRACE ("DataReaderStateListenerBase_T::on_data_available"); if (!::CORBA::is_nil (rdr) && this->control_->mode () != ::CCM_DDS::NOT_ENABLED) @@ -52,7 +52,7 @@ namespace CIAO if (this->reactor_->notify (rh) != 0) { DDS4CCM_ERROR (DDS4CCM_LOG_LEVEL_ERROR, (LM_ERROR, DDS4CCM_INFO - ACE_TEXT ("DataReaderStateListener_T::on_data_available") + ACE_TEXT ("DataReaderStateListenerBase_T::on_data_available") ACE_TEXT ("failed to use reactor.\n"))); } } @@ -63,12 +63,12 @@ namespace CIAO } } - template <typename CCM_TYPE, typename TYPED_DDS_READER, typename SEQ_TYPE> + template <typename CCM_TYPE, typename TYPED_DDS_READER, typename SEQ_TYPE, DDS4CCM_LISTENER_READ_TAKE LRT> void - DataReaderStateListener_T<CCM_TYPE, TYPED_DDS_READER, SEQ_TYPE>::on_data_available_i ( + DataReaderStateListenerBase_T<CCM_TYPE, TYPED_DDS_READER, SEQ_TYPE, LRT>::on_data_available_i ( ::DDS::DataReader_ptr rdr) { - DDS4CCM_TRACE ("DataReaderStateListener_T::on_data_available_i"); + DDS4CCM_TRACE ("DataReaderStateListenerBase_T::on_data_available_i"); ::CCM_DDS::ListenerMode const mode = this->control_->mode (); if (::CORBA::is_nil (rdr) || @@ -83,7 +83,7 @@ namespace CIAO if (::CORBA::is_nil (reader.in ())) { DDS4CCM_ERROR (DDS4CCM_LOG_LEVEL_ERROR, (LM_ERROR, DDS4CCM_INFO - ACE_TEXT ("DataReaderStateListener_T::on_data_available_i - ") + ACE_TEXT ("DataReaderStateListenerBase_T::on_data_available_i - ") ACE_TEXT ("Failed to narrow DataReader to a type ") ACE_TEXT ("specific DataReader.\n"))); return; @@ -104,25 +104,8 @@ namespace CIAO ::DDS::QueryCondition_var qc = this->condition_manager_.get_querycondition_listener (); - ::DDS::ReturnCode_t result = ::DDS::RETCODE_OK; - - if (! ::CORBA::is_nil (qc.in ())) - { - ::DDS::ReadCondition_var rd = ::DDS::ReadCondition::_narrow (qc.in ()); - result = reader->take_w_condition (data, - sample_info, - max_samples, - rd.in ()); - } - else - { - result = reader->take (data, - sample_info, - max_samples, - ::DDS::NOT_READ_SAMPLE_STATE, - ::DDS::NEW_VIEW_STATE | ::DDS::NOT_NEW_VIEW_STATE, - ::DDS::ANY_INSTANCE_STATE); - } + ::DDS::ReturnCode_t const result = + this->get_data_i (reader, qc.in (), data, sample_info, max_samples); if (result == ::DDS::RETCODE_NO_DATA) { @@ -131,8 +114,8 @@ namespace CIAO else if (result != ::DDS::RETCODE_OK) { DDS4CCM_ERROR (DDS4CCM_LOG_LEVEL_ERROR, (LM_ERROR, DDS4CCM_INFO - ACE_TEXT ("DataReaderStateListener_T::on_data_available_i - ") - ACE_TEXT ("Unable to take data from data reader, ") + ACE_TEXT ("DataReaderStateListenerBase_T::on_data_available_i - ") + ACE_TEXT ("Unable to get data from data reader, ") ACE_TEXT ("error %C.\n"), translate_retcode (result))); } if (mode == ::CCM_DDS::ONE_BY_ONE) @@ -239,7 +222,7 @@ namespace CIAO if (retval != ::DDS::RETCODE_OK) { DDS4CCM_ERROR (DDS4CCM_LOG_LEVEL_ERROR, (LM_ERROR, DDS4CCM_INFO - ACE_TEXT ("DataReaderStateListener_T::on_data_available_i - ") + ACE_TEXT ("DataReaderStateListenerBase_T::on_data_available_i - ") ACE_TEXT ("Error returning loan to DDS - <%C>\n"), translate_retcode (retval))); // No exception here since this the DDS vendor doesn't expect this. @@ -251,26 +234,26 @@ namespace CIAO DDS4CCM_PRINT_DEBUG_CORBA_EXCEPTION ( DDS4CCM_LOG_LEVEL_ACTION, ex, - "DataReaderStateListener_T::on_data_available_i"); + "DataReaderStateListenerBase_T::on_data_available_i"); } catch (const ::CORBA::Exception& ex) { DDS4CCM_PRINT_CORBA_EXCEPTION ( DDS4CCM_LOG_LEVEL_ERROR, ex, - "DataReaderStateListener_T::on_data_available_i"); + "DataReaderStateListenerBase_T::on_data_available_i"); } catch (...) { DDS4CCM_ERROR (DDS4CCM_LOG_LEVEL_ERROR, (LM_ERROR, DDS4CCM_INFO - ACE_TEXT ("DataReaderStateListener_T::on_data_available_i - ") + ACE_TEXT ("DataReaderStateListenerBase_T::on_data_available_i - ") ACE_TEXT ("Unexpected exception caught\n"))); } } - template <typename CCM_TYPE, typename TYPED_DDS_READER, typename SEQ_TYPE> + template <typename CCM_TYPE, typename TYPED_DDS_READER, typename SEQ_TYPE, DDS4CCM_LISTENER_READ_TAKE LRT> ::DDS::StatusMask - DataReaderStateListener_T<CCM_TYPE, TYPED_DDS_READER, SEQ_TYPE>::get_mask ( + DataReaderStateListenerBase_T<CCM_TYPE, TYPED_DDS_READER, SEQ_TYPE, LRT>::get_mask ( typename CCM_TYPE::data_listener_type::_ptr_type listener) { ::DDS::StatusMask mask = 0; @@ -288,12 +271,76 @@ namespace CIAO ACE_CString msk; translate_statusmask (msk, mask); DDS4CCM_DEBUG (DDS4CCM_LOG_LEVEL_DDS_STATUS, (LM_DEBUG, DDS4CCM_INFO - "DataReaderStateListener_T::get_mask - " + "DataReaderStateListenerBase_T::get_mask - " "Mask becomes %C\n", msk.c_str ())); } return mask; } + + template <typename CCM_TYPE, typename TYPED_DDS_READER, typename SEQ_TYPE> + ::DDS::ReturnCode_t + DataReaderStateListener_T<CCM_TYPE, TYPED_DDS_READER, SEQ_TYPE, CIAO::DDS4CCM::DDS4CCM_TAKE>::get_data_i ( + typename TYPED_DDS_READER::_ptr_type reader, + ::DDS::QueryCondition_ptr qc, + SEQ_TYPE &data, + ::DDS::SampleInfoSeq &sample_info, + ::CORBA::Long max_samples) + { + DDS4CCM_TRACE ("DataReaderStateListener_T::get_data_i"); + + ::DDS::ReturnCode_t result = ::DDS::RETCODE_OK; + if (! ::CORBA::is_nil (qc)) + { + ::DDS::ReadCondition_var rd = ::DDS::ReadCondition::_narrow (qc); + result = reader->take_w_condition (data, + sample_info, + max_samples, + rd.in ()); + } + else + { + result = reader->take (data, + sample_info, + max_samples, + ::DDS::NOT_READ_SAMPLE_STATE, + ::DDS::NEW_VIEW_STATE | ::DDS::NOT_NEW_VIEW_STATE, + ::DDS::ANY_INSTANCE_STATE); + } + return result; + } + + template <typename CCM_TYPE, typename TYPED_DDS_READER, typename SEQ_TYPE> + ::DDS::ReturnCode_t + DataReaderStateListener_T<CCM_TYPE, TYPED_DDS_READER, SEQ_TYPE, CIAO::DDS4CCM::DDS4CCM_READ>::get_data_i ( + typename TYPED_DDS_READER::_ptr_type reader, + ::DDS::QueryCondition_ptr qc, + SEQ_TYPE &data, + ::DDS::SampleInfoSeq &sample_info, + ::CORBA::Long max_samples) + { + DDS4CCM_TRACE ("DataReaderStateListener_T::get_data_i"); + + ::DDS::ReturnCode_t result = ::DDS::RETCODE_OK; + if (! ::CORBA::is_nil (qc)) + { + ::DDS::ReadCondition_var rd = ::DDS::ReadCondition::_narrow (qc); + result = reader->read_w_condition (data, + sample_info, + max_samples, + rd.in ()); + } + else + { + result = reader->read (data, + sample_info, + max_samples, + ::DDS::NOT_READ_SAMPLE_STATE, + ::DDS::NEW_VIEW_STATE | ::DDS::NOT_NEW_VIEW_STATE, + ::DDS::ANY_INSTANCE_STATE); + } + return result; + } } } diff --git a/CIAO/connectors/dds4ccm/impl/DataReaderStateListener_T.h b/CIAO/connectors/dds4ccm/impl/DataReaderStateListener_T.h index 41ce5dcc658..0f67581114a 100644 --- a/CIAO/connectors/dds4ccm/impl/DataReaderStateListener_T.h +++ b/CIAO/connectors/dds4ccm/impl/DataReaderStateListener_T.h @@ -17,26 +17,36 @@ namespace CIAO { namespace DDS4CCM { - template <typename CCM_TYPE, typename TYPED_DDS_READER, typename SEQ_TYPE> - class DataReaderStateListener_T : + template <typename CCM_TYPE, typename TYPED_DDS_READER, typename SEQ_TYPE, DDS4CCM_LISTENER_READ_TAKE LRT> + class DataReaderStateListener_T; + + /** + * Template implementing the DDS4CCM DataReaderStateListener basic port + * @tparam CCM_TYPE Set of type traits for this basic port + * @tparam TYPED_DDS_READER The typed DDS DataReader type + * @tparam SEQ_TYPE The type of sequence + * @tparam LRT An enum indicating the semantics of this port + */ + template <typename CCM_TYPE, typename TYPED_DDS_READER, typename SEQ_TYPE, DDS4CCM_LISTENER_READ_TAKE LRT> + class DataReaderStateListenerBase_T : public PortStatusListener { - typedef DataReaderStateListener_T<CCM_TYPE, TYPED_DDS_READER, SEQ_TYPE> + typedef DataReaderStateListenerBase_T<CCM_TYPE, TYPED_DDS_READER, SEQ_TYPE, LRT> DataReaderStateListener_type; typedef ::CIAO::DDS4CCM::DataReaderHandler_T<DataReaderStateListener_type> DataReaderStateHandler_type; public: /// Constructor - DataReaderStateListener_T ( - typename CCM_TYPE::data_listener_type::_ptr_type listener, - ::CCM_DDS::PortStatusListener_ptr port_status_listener, - ::CCM_DDS::StateListenerControl_ptr control, - ACE_Reactor* reactor, - ConditionManager& condition_manager); + DataReaderStateListenerBase_T ( + typename CCM_TYPE::data_listener_type::_ptr_type listener, + ::CCM_DDS::PortStatusListener_ptr port_status_listener, + ::CCM_DDS::StateListenerControl_ptr control, + ACE_Reactor* reactor, + ConditionManager& condition_manager); /// Destructor - virtual ~DataReaderStateListener_T (void); + virtual ~DataReaderStateListenerBase_T (void); virtual void on_data_available (::DDS::DataReader_ptr rdr); @@ -46,10 +56,70 @@ namespace CIAO void on_data_available_i (::DDS::DataReader_ptr rdr); private: + /// Helper method to get data from DDS + virtual ::DDS::ReturnCode_t get_data_i ( + typename TYPED_DDS_READER::_ptr_type reader, + ::DDS::QueryCondition_ptr qc, + SEQ_TYPE &data, + ::DDS::SampleInfoSeq &sample_info, + ::CORBA::Long max_samples) = 0; + typename CCM_TYPE::data_listener_type::_var_type listener_; ::CCM_DDS::StateListenerControl_var control_; ConditionManager& condition_manager_; }; + + template <typename CCM_TYPE, typename TYPED_DDS_READER, typename SEQ_TYPE> + class DataReaderStateListener_T <CCM_TYPE, TYPED_DDS_READER, SEQ_TYPE, CIAO::DDS4CCM::DDS4CCM_TAKE> : + public DataReaderStateListenerBase_T <CCM_TYPE, TYPED_DDS_READER, SEQ_TYPE, CIAO::DDS4CCM::DDS4CCM_TAKE> + { + public: + /// Constructor + DataReaderStateListener_T ( + typename CCM_TYPE::data_listener_type::_ptr_type listener, + ::CCM_DDS::PortStatusListener_ptr port_status_listener, + ::CCM_DDS::StateListenerControl_ptr control, + ACE_Reactor* reactor, + ConditionManager& condition_manager) : + DataReaderStateListenerBase_T<CCM_TYPE, TYPED_DDS_READER, SEQ_TYPE, CIAO::DDS4CCM::DDS4CCM_TAKE> ( + listener, port_status_listener, control, reactor, condition_manager) + { + } + private: + /// Helper method to get data from DDS + virtual ::DDS::ReturnCode_t get_data_i ( + typename TYPED_DDS_READER::_ptr_type reader, + ::DDS::QueryCondition_ptr qc, + SEQ_TYPE &data, + ::DDS::SampleInfoSeq &sample_info, + ::CORBA::Long max_samples); + }; + + template <typename CCM_TYPE, typename TYPED_DDS_READER, typename SEQ_TYPE> + class DataReaderStateListener_T <CCM_TYPE, TYPED_DDS_READER, SEQ_TYPE, CIAO::DDS4CCM::DDS4CCM_READ> : + public DataReaderStateListenerBase_T <CCM_TYPE, TYPED_DDS_READER, SEQ_TYPE, CIAO::DDS4CCM::DDS4CCM_READ> + { + public: + /// Constructor + DataReaderStateListener_T ( + typename CCM_TYPE::data_listener_type::_ptr_type listener, + ::CCM_DDS::PortStatusListener_ptr port_status_listener, + ::CCM_DDS::StateListenerControl_ptr control, + ACE_Reactor* reactor, + ConditionManager& condition_manager) : + DataReaderStateListenerBase_T<CCM_TYPE, TYPED_DDS_READER, SEQ_TYPE, CIAO::DDS4CCM::DDS4CCM_READ> ( + listener, port_status_listener, control, reactor, condition_manager) + { + } + private: + /// Helper method to get data from DDS + virtual ::DDS::ReturnCode_t get_data_i ( + typename TYPED_DDS_READER::_ptr_type reader, + ::DDS::QueryCondition_ptr qc, + SEQ_TYPE &data, + ::DDS::SampleInfoSeq &sample_info, + ::CORBA::Long max_samples); + }; } } diff --git a/CIAO/connectors/dds4ccm/impl/Getter_T.h b/CIAO/connectors/dds4ccm/impl/Getter_T.h index 46592aee2c0..48ad0486374 100644 --- a/CIAO/connectors/dds4ccm/impl/Getter_T.h +++ b/CIAO/connectors/dds4ccm/impl/Getter_T.h @@ -124,7 +124,6 @@ namespace CIAO /** * @brief Implementation of the Getter port for variable sized data types. - * */ template <typename GETTER_TYPE, typename TYPED_DDS_READER, typename VALUE_TYPE, typename SEQ_VALUE_TYPE> class Getter_T <GETTER_TYPE, TYPED_DDS_READER, VALUE_TYPE, SEQ_VALUE_TYPE, false> : @@ -137,10 +136,9 @@ namespace CIAO * * Spec : get_one returns the next sample to be gotten. * - * Returns false when 'wait' times out or when no valid data could be read + * @retval false When 'wait' times out or when no valid data could be read * from DDS. - * Return true, when 'wait' is triggered. - * + * @retval true When 'wait' is triggered. */ virtual bool get_one ( typename VALUE_TYPE::_out_type an_instance, @@ -161,10 +159,9 @@ namespace CIAO * * Spec : get_one returns the next sample to be gotten. * - * Returns false when 'wait' times out or when no valid data could be read + * @retval false When 'wait' times out or when no valid data could be read * from DDS. - * Return true, when 'wait' is triggered. - * + * @retval true When 'wait' is triggered. */ virtual bool get_one ( typename VALUE_TYPE::_out_type an_instance, diff --git a/CIAO/connectors/dds4ccm/impl/LocalObject.h b/CIAO/connectors/dds4ccm/impl/LocalObject.h index 4449c19265a..bbaace729fc 100644 --- a/CIAO/connectors/dds4ccm/impl/LocalObject.h +++ b/CIAO/connectors/dds4ccm/impl/LocalObject.h @@ -11,6 +11,11 @@ namespace CIAO { namespace DDS4CCM { + /** + * Base class for all DDS4CCM local facets. It stores + * an object reference to the component this local + * facet belongs too. + */ class DDS4CCM_DDS_IMPL_Export LocalObject : public virtual ::CORBA::LocalObject { diff --git a/CIAO/connectors/dds4ccm/impl/Utils.h b/CIAO/connectors/dds4ccm/impl/Utils.h index 835b8b3694e..aa809067924 100644 --- a/CIAO/connectors/dds4ccm/impl/Utils.h +++ b/CIAO/connectors/dds4ccm/impl/Utils.h @@ -25,6 +25,20 @@ namespace CIAO { namespace DDS4CCM { + /** + * Enum controlling the semantics of the + * DDS4CCM listeners. + */ + enum DDS4CCM_LISTENER_READ_TAKE + { + /// Listener does a DDS read + DDS4CCM_READ, + /// Listener does a DDS take + DDS4CCM_TAKE + }; + + /// Helper method translating a DDS ReturnCode_t into + /// a readable string inline const char * translate_retcode (::DDS::ReturnCode_t ret) { #define DDS4CCM_RETCODE(X) case X: return #X diff --git a/CIAO/connectors/dds4ccm/tests/KeyedSamples/Sender/Keyed_Test_Sender_exec.cpp b/CIAO/connectors/dds4ccm/tests/KeyedSamples/Sender/Keyed_Test_Sender_exec.cpp index 3c4035abe57..26f86d157f9 100644 --- a/CIAO/connectors/dds4ccm/tests/KeyedSamples/Sender/Keyed_Test_Sender_exec.cpp +++ b/CIAO/connectors/dds4ccm/tests/KeyedSamples/Sender/Keyed_Test_Sender_exec.cpp @@ -1,30 +1,6 @@ // -*- C++ -*- // $Id$ -/** - * Code generated by the The ACE ORB (TAO) IDL Compiler v1.8.3 - * TAO and the TAO IDL Compiler have been developed by: - * Center for Distributed Object Computing - * Washington University - * St. Louis, MO - * USA - * http://www.cs.wustl.edu/~schmidt/doc-center.html - * and - * Distributed Object Computing Laboratory - * University of California at Irvine - * Irvine, CA - * USA - * and - * Institute for Software Integrated Systems - * Vanderbilt University - * Nashville, TN - * USA - * http://www.isis.vanderbilt.edu/ - * - * Information about TAO is available at: - * http://www.cs.wustl.edu/~schmidt/TAO.html - **/ - #include "Keyed_Test_Sender_exec.h" #include "tao/ORB_Core.h" #include "ace/Reactor.h" @@ -113,6 +89,7 @@ namespace CIAO_Keyed_Test_Sender_Impl : rate_ (1) , keys_ (5) , iterations_ (10) + , ready_to_start_ (false) { ACE_NEW_THROW_EX (this->ticker_, pulse_Generator (*this), @@ -204,6 +181,7 @@ namespace CIAO_Keyed_Test_Sender_Impl void Sender_exec_i::get_started (void) { + this->ready_to_start_ = true; this->start (); ACE_GUARD_THROW_EX (TAO_SYNCH_MUTEX, _guard, @@ -226,6 +204,12 @@ namespace CIAO_Keyed_Test_Sender_Impl Sender_exec_i::stop (void) { this->reactor ()->cancel_timer (this->ticker_); + + if (!this->ready_to_start_.value()) + { + ACE_ERROR ((LM_ERROR, ACE_TEXT ("Sender_exec_i::stop - ") + ACE_TEXT ("ERROR Sender never got ready to start\n"))); + } } // Component attributes and port operations. diff --git a/CIAO/connectors/dds4ccm/tests/KeyedSamples/Sender/Keyed_Test_Sender_exec.h b/CIAO/connectors/dds4ccm/tests/KeyedSamples/Sender/Keyed_Test_Sender_exec.h index 09afa3d3157..9390b0ae894 100644 --- a/CIAO/connectors/dds4ccm/tests/KeyedSamples/Sender/Keyed_Test_Sender_exec.h +++ b/CIAO/connectors/dds4ccm/tests/KeyedSamples/Sender/Keyed_Test_Sender_exec.h @@ -1,29 +1,6 @@ // -*- C++ -*- // $Id$ -/** - * Code generated by the The ACE ORB (TAO) IDL Compiler v1.8.3 - * TAO and the TAO IDL Compiler have been developed by: - * Center for Distributed Object Computing - * Washington University - * St. Louis, MO - * USA - * http://www.cs.wustl.edu/~schmidt/doc-center.html - * and - * Distributed Object Computing Laboratory - * University of California at Irvine - * Irvine, CA - * USA - * and - * Institute for Software Integrated Systems - * Vanderbilt University - * Nashville, TN - * USA - * http://www.isis.vanderbilt.edu/ - * - * Information about TAO is available at: - * http://www.cs.wustl.edu/~schmidt/TAO.html - **/ #ifndef CIAO_KEYED_TEST_SENDER_EXEC_LVAFIH_H_ #define CIAO_KEYED_TEST_SENDER_EXEC_LVAFIH_H_ @@ -42,6 +19,7 @@ namespace CIAO_Keyed_Test_Sender_Impl { + typedef ACE_Atomic_Op <TAO_SYNCH_MUTEX, CORBA::Boolean > Atomic_Boolean; class Sender_exec_i; /** @@ -169,7 +147,7 @@ namespace CIAO_Keyed_Test_Sender_Impl Keyed_Test_Table ktests_; Keyed_Test_Table::iterator last_key_; - + Atomic_Boolean ready_to_start_; //@} //@{ diff --git a/CIAO/connectors/dds4ccm/tests/ListenManyByMany/Receiver/LMBM_Test_Receiver_exec.cpp b/CIAO/connectors/dds4ccm/tests/ListenManyByMany/Receiver/LMBM_Test_Receiver_exec.cpp index b2b8dffeb42..738d45c2020 100644 --- a/CIAO/connectors/dds4ccm/tests/ListenManyByMany/Receiver/LMBM_Test_Receiver_exec.cpp +++ b/CIAO/connectors/dds4ccm/tests/ListenManyByMany/Receiver/LMBM_Test_Receiver_exec.cpp @@ -11,10 +11,14 @@ namespace CIAO_LMBM_Test_Receiver_Impl // ListenManyByManyTest_Listener_exec_i //============================================================ ListenManyByManyTest_Listener_exec_i::ListenManyByManyTest_Listener_exec_i ( - Atomic_ULong &received_one_by_one, - Atomic_ULong &received_many_by_many) - : received_one_by_one_ (received_one_by_one), - received_many_by_many_ (received_many_by_many) + ::LMBM_Test::CCM_Receiver_Context_ptr context, + Atomic_ULong &received_one_by_one, + Atomic_ULong &received_many_by_many, + Atomic_ULong &samples_read) + : context_ (::LMBM_Test::CCM_Receiver_Context::_duplicate (context)), + received_one_by_one_ (received_one_by_one), + received_many_by_many_ (received_many_by_many), + samples_read_ (samples_read) { } @@ -38,19 +42,19 @@ namespace CIAO_LMBM_Test_Receiver_Impl { if (an_instance.length () == 0) { - ACE_ERROR ((LM_ERROR, "ERROR: ListenManyByManyTest_Listener_exec_i::on_many_data:" - "instance sequence length is nil\n")); + ACE_ERROR ((LM_ERROR, "ERROR: ListenManyByManyTest_Listener_exec_i::on_many_data: " + "instance sequence length is nil\n")); return; } for (CORBA::ULong i = 0 ; i < info.length(); ++i) { - ACE_DEBUG ((LM_DEBUG, "ListenManyByManyTest_Listener_exec_i::on_many_data:" + ACE_DEBUG ((LM_DEBUG, "ListenManyByManyTest_Listener_exec_i::on_many_data: " "key <%C> - iteration <%d>\n", an_instance[i].key.in (), an_instance[i].iteration)); if (info[i].instance_handle == ::DDS::HANDLE_NIL) { - ACE_ERROR ((LM_ERROR, "ERROR: ListenManyByManyTest_Listener_exec_i::on_many_data:" + ACE_ERROR ((LM_ERROR, "ERROR: ListenManyByManyTest_Listener_exec_i::on_many_data: " "instance handle %d seems to be nil" "key <%C> - iteration <%d>\n", i, @@ -60,7 +64,7 @@ namespace CIAO_LMBM_Test_Receiver_Impl if (info[i].source_timestamp.sec == 0 && info[i].source_timestamp.nanosec == 0) { - ACE_ERROR ((LM_ERROR, "ERROR: ListenManyByManyTest_Listener_exec_i::on_one_data: " + ACE_ERROR ((LM_ERROR, "ERROR: ListenManyByManyTest_Listener_exec_i::on_many_data: " "source timestamp seems to be invalid (nil) " "key <%C> - iteration <%d>\n", an_instance[i].key.in (), @@ -68,6 +72,30 @@ namespace CIAO_LMBM_Test_Receiver_Impl } } this->received_many_by_many_ += an_instance.length (); + try + { + ::LMBM_Test::ListenManyByManyTestConnector::Reader_var reader = + this->context_->get_connection_info_listen_data (); + if (::CORBA::is_nil (reader.in ())) + { + ACE_ERROR ((LM_ERROR, "ListenManyByManyTest_Listener_exec_i::on_many_data - " + "ERROR: Reader seems nil\n")); + } + ::ListenManyByManyTestSeq seq; + ::CCM_DDS::ReadInfoSeq infos; + reader->read_all (seq, infos); + this->samples_read_ += seq.length (); + + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("ListenManyByManyTest_Listener_exec_i::on_many_data - ") + ACE_TEXT ("Read <%u> samples\n"), + seq.length ())); + } + catch (const CCM_DDS::InternalError& ex) + { + ACE_ERROR ((LM_ERROR, ACE_TEXT ("ERROR: Internal Error ") + ACE_TEXT ("when using reader->read_all: index <%d> - retval <%d>\n"), + ex.index, ex.error_code)); + } } //============================================================ @@ -76,6 +104,7 @@ namespace CIAO_LMBM_Test_Receiver_Impl Receiver_exec_i::Receiver_exec_i (void) : received_one_by_one_ (0), received_many_by_many_ (0), + samples_read_ (0), iterations_ (10), keys_ (5) { @@ -97,8 +126,10 @@ namespace CIAO_LMBM_Test_Receiver_Impl Receiver_exec_i::get_info_listen_data_listener (void) { return new ListenManyByManyTest_Listener_exec_i ( - this->received_one_by_one_, - this->received_many_by_many_); + this->context_.in (), + this->received_one_by_one_, + this->received_many_by_many_, + this->samples_read_); } ::CCM_DDS::CCM_PortStatusListener_ptr @@ -204,6 +235,15 @@ namespace CIAO_LMBM_Test_Receiver_Impl "many_by_many callback. " "Test passed!\n")); } + if (this->samples_read_.value () == 0) + { + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("OK: read no samples\n"))); + } + else + { + ACE_ERROR ((LM_ERROR, ACE_TEXT ("ERROR: read <%u> samples\n"), + this->samples_read_.value ())); + } } extern "C" RECEIVER_EXEC_Export ::Components::EnterpriseComponent_ptr diff --git a/CIAO/connectors/dds4ccm/tests/ListenManyByMany/Receiver/LMBM_Test_Receiver_exec.h b/CIAO/connectors/dds4ccm/tests/ListenManyByMany/Receiver/LMBM_Test_Receiver_exec.h index 74a7d84572f..86261f0de2a 100644 --- a/CIAO/connectors/dds4ccm/tests/ListenManyByMany/Receiver/LMBM_Test_Receiver_exec.h +++ b/CIAO/connectors/dds4ccm/tests/ListenManyByMany/Receiver/LMBM_Test_Receiver_exec.h @@ -30,8 +30,10 @@ namespace CIAO_LMBM_Test_Receiver_Impl { public: ListenManyByManyTest_Listener_exec_i ( - Atomic_ULong &received_one_by_one, - Atomic_ULong &received_many_by_many); + ::LMBM_Test::CCM_Receiver_Context_ptr context, + Atomic_ULong &received_one_by_one, + Atomic_ULong &received_many_by_many, + Atomic_ULong &samples_read_); virtual ~ListenManyByManyTest_Listener_exec_i (void); virtual void @@ -43,8 +45,10 @@ namespace CIAO_LMBM_Test_Receiver_Impl const ListenManyByManyTestSeq & an_instance, const ::CCM_DDS::ReadInfoSeq & info); private: + ::LMBM_Test::CCM_Receiver_Context_var context_; Atomic_ULong &received_one_by_one_; Atomic_ULong &received_many_by_many_; + Atomic_ULong &samples_read_; }; //============================================================ @@ -92,6 +96,7 @@ namespace CIAO_LMBM_Test_Receiver_Impl Atomic_ULong received_one_by_one_; Atomic_ULong received_many_by_many_; + Atomic_ULong samples_read_; CORBA::UShort iterations_; CORBA::UShort keys_; diff --git a/CIAO/connectors/dds4ccm/tests/ListenManyByMany/descriptors/Plan.cdp b/CIAO/connectors/dds4ccm/tests/ListenManyByMany/descriptors/Plan.cdp index 3113997f546..d529ec16667 100644 --- a/CIAO/connectors/dds4ccm/tests/ListenManyByMany/descriptors/Plan.cdp +++ b/CIAO/connectors/dds4ccm/tests/ListenManyByMany/descriptors/Plan.cdp @@ -285,6 +285,26 @@ <resourceType>Local_Interface</resourceType> </deployRequirement> <internalEndpoint> + <portName>info_listen_data</portName> + <provider>false</provider> + <kind>SimplexReceptacle</kind> + <instance xmi:idref="ReceiverComponentInstance" /> + </internalEndpoint> + <internalEndpoint> + <portName>push_consumer_data</portName> + <provider>true</provider> + <kind>Facet</kind> + <instance xmi:idref="LMBM_Test_ConnectorComponentInstance2" /> + </internalEndpoint> + </connection> + + <connection> + <name>info_listen_data_listener</name> + <deployRequirement> + <name>edu.dre.vanderbilt.DAnCE.ConnectionType</name> + <resourceType>Local_Interface</resourceType> + </deployRequirement> + <internalEndpoint> <portName>info_listen_data_listener</portName> <provider>true</provider> <kind>Facet</kind> diff --git a/CIAO/connectors/dds4ccm/tests/MultiTopic/Connector/MultiTopic_Connector_T.h b/CIAO/connectors/dds4ccm/tests/MultiTopic/Connector/MultiTopic_Connector_T.h index 742e45bf8e8..5b8ba748238 100644 --- a/CIAO/connectors/dds4ccm/tests/MultiTopic/Connector/MultiTopic_Connector_T.h +++ b/CIAO/connectors/dds4ccm/tests/MultiTopic/Connector/MultiTopic_Connector_T.h @@ -256,10 +256,11 @@ private: typename CCM_TYPE::push_consumer_cl_traits, typename DDS_TYPE::typed_reader_type, typename DDS_TYPE::value_type, - SEQ_TYPE> + SEQ_TYPE, + CIAO::DDS4CCM::DDS4CCM_TAKE> push_consumer_cl_; - //connection to the receiver implementation + /// Connection to the receiver implementation typename CCM_TYPE::push_consumer_cl_traits::data_listener_type::_var_type dl_; CORBA::String_var topic_name_sq_; diff --git a/CIAO/connectors/dds4ccm/tests/MultipleTemp/Sender/MultipleTemp_Sender_exec.cpp b/CIAO/connectors/dds4ccm/tests/MultipleTemp/Sender/MultipleTemp_Sender_exec.cpp index 29eaa54ddda..6b649b5256a 100644 --- a/CIAO/connectors/dds4ccm/tests/MultipleTemp/Sender/MultipleTemp_Sender_exec.cpp +++ b/CIAO/connectors/dds4ccm/tests/MultipleTemp/Sender/MultipleTemp_Sender_exec.cpp @@ -189,7 +189,7 @@ namespace CIAO_MultipleTemp_Sender_Impl catch (const CCM_DDS::InternalError& ex) { ACE_ERROR ((LM_ERROR, ACE_TEXT ("ERROR: Internal Error ") - ACE_TEXT ("whit update_many: index <%d> - retval <%d>\n"), + ACE_TEXT ("with update_many: index <%d> - retval <%d>\n"), ex.index, ex.error_code)); result = false; } diff --git a/CIAO/connectors/dds4ccm/tests/SLDisabled/Sender/SL_Disabled_Sender_exec.cpp b/CIAO/connectors/dds4ccm/tests/SLDisabled/Sender/SL_Disabled_Sender_exec.cpp index daf84e16bbd..43e6c01ed5b 100644 --- a/CIAO/connectors/dds4ccm/tests/SLDisabled/Sender/SL_Disabled_Sender_exec.cpp +++ b/CIAO/connectors/dds4ccm/tests/SLDisabled/Sender/SL_Disabled_Sender_exec.cpp @@ -309,7 +309,7 @@ namespace CIAO_SL_Disabled_Sender_Impl catch (const CCM_DDS::InternalError& ex) { ACE_ERROR ((LM_ERROR, ACE_TEXT ("ERROR: Internal Error ") - ACE_TEXT ("whit update_many: index <%d> - retval <%d>\n"), + ACE_TEXT ("with update_many: index <%d> - retval <%d>\n"), ex.index, ex.error_code)); return false; } diff --git a/CIAO/connectors/dds4ccm/tests/SLManyByMany/Receiver/SL_ManyByMany_Receiver_exec.cpp b/CIAO/connectors/dds4ccm/tests/SLManyByMany/Receiver/SL_ManyByMany_Receiver_exec.cpp index 66981b6442a..40219975a82 100644 --- a/CIAO/connectors/dds4ccm/tests/SLManyByMany/Receiver/SL_ManyByMany_Receiver_exec.cpp +++ b/CIAO/connectors/dds4ccm/tests/SLManyByMany/Receiver/SL_ManyByMany_Receiver_exec.cpp @@ -1,30 +1,6 @@ // -*- C++ -*- // $Id$ -/** - * Code generated by the The ACE ORB (TAO) IDL Compiler v1.8.3 - * TAO and the TAO IDL Compiler have been developed by: - * Center for Distributed Object Computing - * Washington University - * St. Louis, MO - * USA - * http://www.cs.wustl.edu/~schmidt/doc-center.html - * and - * Distributed Object Computing Laboratory - * University of California at Irvine - * Irvine, CA - * USA - * and - * Institute for Software Integrated Systems - * Vanderbilt University - * Nashville, TN - * USA - * http://www.isis.vanderbilt.edu/ - * - * Information about TAO is available at: - * http://www.cs.wustl.edu/~schmidt/TAO.html - **/ - #include "SL_ManyByMany_Receiver_exec.h" #include "tao/ORB_Core.h" #include "ace/Reactor.h" @@ -71,7 +47,8 @@ namespace CIAO_SL_ManyByMany_Receiver_Impl Atomic_Long &on_many_upd_trigger, Atomic_Long &on_deletion, Atomic_Bool &create_data, - Atomic_Bool &update_data) + Atomic_Bool &update_data, + Atomic_Long &samples_read) : ciao_context_ ( ::SL_ManyByMany::CCM_Receiver_Context::_duplicate (ctx)) , no_operation_ (no_operation) @@ -81,6 +58,7 @@ namespace CIAO_SL_ManyByMany_Receiver_Impl , on_deletion_ (on_deletion) , create_data_ (create_data) , update_data_ (update_data) + , samples_read_ (samples_read) { } @@ -165,6 +143,36 @@ namespace CIAO_SL_ManyByMany_Receiver_Impl this->update_data_ = true; } } + + // When we have received all updates, check if there is + // something left in the cache + if(this->on_many_update_.value () == ON_MANY_EXPECTED) + { + try + { + SL_ManyByMany::SLManyByManyConnector::Reader_var reader = + this->ciao_context_->get_connection_info_out_data (); + if (::CORBA::is_nil (reader.in ())) + { + ACE_ERROR ((LM_ERROR, "info_out_data_listener_exec_i::on_many_updates - " + "ERROR: Reader seems nil\n")); + } + ::TestTopicSeq seq; + ::CCM_DDS::ReadInfoSeq infos; + reader->read_all (seq, infos); + this->samples_read_ += seq.length (); + + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("info_out_data_listener_exec_i::on_many_updates - ") + ACE_TEXT ("Read <%u> samples\n"), + seq.length ())); + } + catch (const CCM_DDS::InternalError& ex) + { + ACE_ERROR ((LM_ERROR, ACE_TEXT ("ERROR: Internal Error ") + ACE_TEXT ("when using reader->read_all: index <%d> - retval <%d>\n"), + ex.index, ex.error_code)); + } + } } void @@ -259,6 +267,7 @@ namespace CIAO_SL_ManyByMany_Receiver_Impl , create_data_ (false) , update_data_ (false) , reader_data_ (0) + , samples_read_ (0) { ACE_NEW_THROW_EX (this->ticker_, read_action_Generator (*this), @@ -351,7 +360,8 @@ namespace CIAO_SL_ManyByMany_Receiver_Impl this->on_many_upd_trigger_, this->on_deletion_, this->create_data_, - this->update_data_), + this->update_data_, + this->samples_read_), ::SL_ManyByMany::SLManyByManyConnector::CCM_StateListener::_nil ()); this->ciao_info_out_data_listener_ = tmp; @@ -518,7 +528,7 @@ namespace CIAO_SL_ManyByMany_Receiver_Impl { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("OK: did receive the expected ") ACE_TEXT ("number of 'on_many_update' samples: ") - ACE_TEXT ("expected <%d> - received <%d>,") + ACE_TEXT ("expected <%d> - received <%d>, ") ACE_TEXT ("on_many_update triggered at <%d> times.\n"), ON_MANY_EXPECTED, this->on_many_update_.value (), @@ -578,6 +588,15 @@ namespace CIAO_SL_ManyByMany_Receiver_Impl this->reader_data_.value () )); } + if (this->samples_read_.value () == 0) + { + ACE_ERROR ((LM_ERROR, ACE_TEXT ("ERROR: read no samples\n"))); + } + else + { + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("OK: read <%u> samples\n"), + this->samples_read_.value ())); + } } extern "C" RECEIVER_EXEC_Export ::Components::EnterpriseComponent_ptr diff --git a/CIAO/connectors/dds4ccm/tests/SLManyByMany/Receiver/SL_ManyByMany_Receiver_exec.h b/CIAO/connectors/dds4ccm/tests/SLManyByMany/Receiver/SL_ManyByMany_Receiver_exec.h index 5674cabf36d..1284d89d6c3 100644 --- a/CIAO/connectors/dds4ccm/tests/SLManyByMany/Receiver/SL_ManyByMany_Receiver_exec.h +++ b/CIAO/connectors/dds4ccm/tests/SLManyByMany/Receiver/SL_ManyByMany_Receiver_exec.h @@ -1,29 +1,6 @@ // -*- C++ -*- // $Id$ -/** - * Code generated by the The ACE ORB (TAO) IDL Compiler v1.8.3 - * TAO and the TAO IDL Compiler have been developed by: - * Center for Distributed Object Computing - * Washington University - * St. Louis, MO - * USA - * http://www.cs.wustl.edu/~schmidt/doc-center.html - * and - * Distributed Object Computing Laboratory - * University of California at Irvine - * Irvine, CA - * USA - * and - * Institute for Software Integrated Systems - * Vanderbilt University - * Nashville, TN - * USA - * http://www.isis.vanderbilt.edu/ - * - * Information about TAO is available at: - * http://www.cs.wustl.edu/~schmidt/TAO.html - **/ #ifndef CIAO_SL_MANYBYMANY_RECEIVER_EXEC_4U4QJF_H_ #define CIAO_SL_MANYBYMANY_RECEIVER_EXEC_4U4QJF_H_ @@ -84,7 +61,8 @@ namespace CIAO_SL_ManyByMany_Receiver_Impl Atomic_Long &on_many_upd_trigger, Atomic_Long &on_deletion, Atomic_Bool &create_data, - Atomic_Bool &update_data); + Atomic_Bool &update_data, + Atomic_Long &samples_read); virtual ~info_out_data_listener_exec_i (void); //@{ @@ -116,6 +94,7 @@ namespace CIAO_SL_ManyByMany_Receiver_Impl Atomic_Long &on_deletion_; Atomic_Bool &create_data_; Atomic_Bool &update_data_; + Atomic_Long &samples_read_; }; /** @@ -210,7 +189,7 @@ namespace CIAO_SL_ManyByMany_Receiver_Impl Atomic_Bool create_data_; Atomic_Bool update_data_; Atomic_Long reader_data_; - + Atomic_Long samples_read_; //@} //@{ diff --git a/CIAO/connectors/dds4ccm/tests/SLOneByOne/Receiver/SL_OneByOne_Receiver_exec.cpp b/CIAO/connectors/dds4ccm/tests/SLOneByOne/Receiver/SL_OneByOne_Receiver_exec.cpp index d0c89b3fe80..e3ba1e99626 100644 --- a/CIAO/connectors/dds4ccm/tests/SLOneByOne/Receiver/SL_OneByOne_Receiver_exec.cpp +++ b/CIAO/connectors/dds4ccm/tests/SLOneByOne/Receiver/SL_OneByOne_Receiver_exec.cpp @@ -1,32 +1,7 @@ // -*- C++ -*- // $Id$ -/** - * Code generated by the The ACE ORB (TAO) IDL Compiler v1.8.3 - * TAO and the TAO IDL Compiler have been developed by: - * Center for Distributed Object Computing - * Washington University - * St. Louis, MO - * USA - * http://www.cs.wustl.edu/~schmidt/doc-center.html - * and - * Distributed Object Computing Laboratory - * University of California at Irvine - * Irvine, CA - * USA - * and - * Institute for Software Integrated Systems - * Vanderbilt University - * Nashville, TN - * USA - * http://www.isis.vanderbilt.edu/ - * - * Information about TAO is available at: - * http://www.cs.wustl.edu/~schmidt/TAO.html - **/ - #include "SL_OneByOne_Receiver_exec.h" - #include "dds4ccm/impl/dds4ccm_conf.h" #define ON_CREATION_EXPECTED 4 @@ -39,13 +14,13 @@ namespace CIAO_SL_OneByOne_Receiver_Impl /** * Facet Executor Implementation Class: info_out_data_listener_exec_i */ - info_out_data_listener_exec_i::info_out_data_listener_exec_i ( ::SL_OneByOne::CCM_Receiver_Context_ptr ctx, Atomic_Long &on_many_updates, Atomic_Long &on_creation, Atomic_Long &on_one_update, Atomic_Long &on_deletion, + Atomic_Long &samples_read, ACE_Thread_ID &thread_id) : ciao_context_ ( ::SL_OneByOne::CCM_Receiver_Context::_duplicate (ctx)) @@ -53,6 +28,7 @@ namespace CIAO_SL_OneByOne_Receiver_Impl , on_creation_ (on_creation) , on_one_update_ (on_one_update) , on_deletion_ (on_deletion) + , samples_read_ (samples_read) , thread_id_ (thread_id) { } @@ -104,6 +80,36 @@ namespace CIAO_SL_OneByOne_Receiver_Impl { ++this->on_one_update_; } + + // When we have received all updates, check if there is something + // left in the cache + if (this->on_one_update_.value () == ON_ONE_UPDATE_EXPECTED) + { + try + { + ::SL_OneByOne::SL_OneByOneConnector::Reader_var reader = + this->ciao_context_->get_connection_info_out_data (); + if (::CORBA::is_nil (reader.in ())) + { + ACE_ERROR ((LM_ERROR, "info_out_data_listener_exec_i::on_one_update - " + "ERROR: Reader seems nil\n")); + } + ::TestTopicSeq seq; + ::CCM_DDS::ReadInfoSeq infos; + reader->read_all (seq, infos); + this->samples_read_ += seq.length (); + + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("StateListener_exec_i::on_one_update - ") + ACE_TEXT ("Read <%u> samples\n"), + seq.length ())); + } + catch (const CCM_DDS::InternalError& ex) + { + ACE_ERROR ((LM_ERROR, ACE_TEXT ("ERROR: Internal Error ") + ACE_TEXT ("when using reader->read_all: index <%d> - retval <%d>\n"), + ex.index, ex.error_code)); + } + } } void @@ -200,6 +206,7 @@ namespace CIAO_SL_OneByOne_Receiver_Impl on_creation_ (0), on_one_update_ (0), on_deletion_ (0), + samples_read_ (0), thread_id_listener_ (0, 0) { } @@ -226,6 +233,7 @@ namespace CIAO_SL_OneByOne_Receiver_Impl this->on_creation_, this->on_one_update_, this->on_deletion_, + this->samples_read_, this->thread_id_listener_), ::SL_OneByOne::SL_OneByOneConnector::CCM_StateListener::_nil ()); @@ -295,7 +303,6 @@ namespace CIAO_SL_OneByOne_Receiver_Impl void Receiver_exec_i::ccm_passivate (void) { - /* Your code here. */ } void @@ -373,6 +380,15 @@ namespace CIAO_SL_OneByOne_Receiver_Impl ON_DELETION_EXPECTED, this->on_deletion_.value ())); } + if (this->samples_read_.value () == 0) + { + ACE_ERROR ((LM_ERROR, ACE_TEXT ("ERROR: read no samples\n"))); + } + else + { + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("OK: read <%u> samples\n"), + this->samples_read_.value ())); + } char ccm_buf [65]; ACE_Thread_ID ccm_thread_id; diff --git a/CIAO/connectors/dds4ccm/tests/SLOneByOne/Receiver/SL_OneByOne_Receiver_exec.h b/CIAO/connectors/dds4ccm/tests/SLOneByOne/Receiver/SL_OneByOne_Receiver_exec.h index df1f04ce71c..74017d49a28 100644 --- a/CIAO/connectors/dds4ccm/tests/SLOneByOne/Receiver/SL_OneByOne_Receiver_exec.h +++ b/CIAO/connectors/dds4ccm/tests/SLOneByOne/Receiver/SL_OneByOne_Receiver_exec.h @@ -1,29 +1,6 @@ // -*- C++ -*- // $Id$ -/** - * Code generated by the The ACE ORB (TAO) IDL Compiler v1.8.3 - * TAO and the TAO IDL Compiler have been developed by: - * Center for Distributed Object Computing - * Washington University - * St. Louis, MO - * USA - * http://www.cs.wustl.edu/~schmidt/doc-center.html - * and - * Distributed Object Computing Laboratory - * University of California at Irvine - * Irvine, CA - * USA - * and - * Institute for Software Integrated Systems - * Vanderbilt University - * Nashville, TN - * USA - * http://www.isis.vanderbilt.edu/ - * - * Information about TAO is available at: - * http://www.cs.wustl.edu/~schmidt/TAO.html - **/ #ifndef CIAO_SL_ONEBYONE_RECEIVER_EXEC_TKIVXN_H_ #define CIAO_SL_ONEBYONE_RECEIVER_EXEC_TKIVXN_H_ @@ -59,6 +36,7 @@ namespace CIAO_SL_OneByOne_Receiver_Impl Atomic_Long &, Atomic_Long &, Atomic_Long &, + Atomic_Long &, ACE_Thread_ID &); virtual ~info_out_data_listener_exec_i (void); @@ -88,6 +66,7 @@ namespace CIAO_SL_OneByOne_Receiver_Impl Atomic_Long &on_creation_; Atomic_Long &on_one_update_; Atomic_Long &on_deletion_; + Atomic_Long &samples_read_; ACE_Thread_ID &thread_id_; }; @@ -183,6 +162,7 @@ namespace CIAO_SL_OneByOne_Receiver_Impl Atomic_Long on_creation_; Atomic_Long on_one_update_; Atomic_Long on_deletion_; + Atomic_Long samples_read_; ACE_Thread_ID thread_id_listener_; //@} diff --git a/CIAO/connectors/dds4ccm/tests/SLOneByOne/Sender/SL_OneByOne_Sender_exec.cpp b/CIAO/connectors/dds4ccm/tests/SLOneByOne/Sender/SL_OneByOne_Sender_exec.cpp index 28394253186..58b7c44e957 100644 --- a/CIAO/connectors/dds4ccm/tests/SLOneByOne/Sender/SL_OneByOne_Sender_exec.cpp +++ b/CIAO/connectors/dds4ccm/tests/SLOneByOne/Sender/SL_OneByOne_Sender_exec.cpp @@ -1,41 +1,15 @@ // -*- C++ -*- // $Id$ -/** - * Code generated by the The ACE ORB (TAO) IDL Compiler v1.8.3 - * TAO and the TAO IDL Compiler have been developed by: - * Center for Distributed Object Computing - * Washington University - * St. Louis, MO - * USA - * http://www.cs.wustl.edu/~schmidt/doc-center.html - * and - * Distributed Object Computing Laboratory - * University of California at Irvine - * Irvine, CA - * USA - * and - * Institute for Software Integrated Systems - * Vanderbilt University - * Nashville, TN - * USA - * http://www.isis.vanderbilt.edu/ - * - * Information about TAO is available at: - * http://www.cs.wustl.edu/~schmidt/TAO.html - **/ - #include "SL_OneByOne_Sender_exec.h" #include "tao/ORB_Core.h" #include "ace/Reactor.h" namespace CIAO_SL_OneByOne_Sender_Impl { - /** * Write action generator */ - pulse_Generator::pulse_Generator (Sender_exec_i &callback) : pulse_callback_ (callback) { @@ -107,7 +81,6 @@ namespace CIAO_SL_OneByOne_Sender_Impl /** * Component Executor Implementation Class: Sender_exec_i */ - Sender_exec_i::Sender_exec_i (void) : test_nr_(UPDATE_CREATE), test_ok_(true) @@ -312,7 +285,7 @@ namespace CIAO_SL_OneByOne_Sender_Impl catch (const CCM_DDS::InternalError& ex) { ACE_ERROR ((LM_ERROR, ACE_TEXT ("ERROR: Internal Error ") - ACE_TEXT ("whit update_many: index <%d> - retval <%d>\n"), + ACE_TEXT ("with update_many: index <%d> - retval <%d>\n"), ex.index, ex.error_code)); return false; } diff --git a/CIAO/connectors/dds4ccm/tests/SLOneByOne/Sender/SL_OneByOne_Sender_exec.h b/CIAO/connectors/dds4ccm/tests/SLOneByOne/Sender/SL_OneByOne_Sender_exec.h index 8098258b587..4b0cb743934 100644 --- a/CIAO/connectors/dds4ccm/tests/SLOneByOne/Sender/SL_OneByOne_Sender_exec.h +++ b/CIAO/connectors/dds4ccm/tests/SLOneByOne/Sender/SL_OneByOne_Sender_exec.h @@ -1,29 +1,6 @@ // -*- C++ -*- // $Id$ -/** - * Code generated by the The ACE ORB (TAO) IDL Compiler v1.8.3 - * TAO and the TAO IDL Compiler have been developed by: - * Center for Distributed Object Computing - * Washington University - * St. Louis, MO - * USA - * http://www.cs.wustl.edu/~schmidt/doc-center.html - * and - * Distributed Object Computing Laboratory - * University of California at Irvine - * Irvine, CA - * USA - * and - * Institute for Software Integrated Systems - * Vanderbilt University - * Nashville, TN - * USA - * http://www.isis.vanderbilt.edu/ - * - * Information about TAO is available at: - * http://www.cs.wustl.edu/~schmidt/TAO.html - **/ #ifndef CIAO_SL_ONEBYONE_SENDER_EXEC_FHYBU2_H_ #define CIAO_SL_ONEBYONE_SENDER_EXEC_FHYBU2_H_ diff --git a/CIAO/connectors/dds4ccm/tests/Updater/Sender/Updater_Sender_exec.cpp b/CIAO/connectors/dds4ccm/tests/Updater/Sender/Updater_Sender_exec.cpp index 734c3955e33..7fde39d9f1f 100644 --- a/CIAO/connectors/dds4ccm/tests/Updater/Sender/Updater_Sender_exec.cpp +++ b/CIAO/connectors/dds4ccm/tests/Updater/Sender/Updater_Sender_exec.cpp @@ -461,7 +461,7 @@ namespace CIAO_Updater_Sender_Impl catch (const CCM_DDS::InternalError& ex) { ACE_ERROR ((LM_ERROR, ACE_TEXT ("ERROR: Internal Error ") - ACE_TEXT ("whit update_many: index <%d> - retval <%d>\n"), + ACE_TEXT ("with update_many: index <%d> - retval <%d>\n"), ex.index, ex.error_code)); return false; } |