summaryrefslogtreecommitdiff
path: root/CIAO/connectors/dds4ccm/impl/ndds/Subscriber.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'CIAO/connectors/dds4ccm/impl/ndds/Subscriber.cpp')
-rw-r--r--CIAO/connectors/dds4ccm/impl/ndds/Subscriber.cpp531
1 files changed, 531 insertions, 0 deletions
diff --git a/CIAO/connectors/dds4ccm/impl/ndds/Subscriber.cpp b/CIAO/connectors/dds4ccm/impl/ndds/Subscriber.cpp
new file mode 100644
index 00000000000..41ea04b1552
--- /dev/null
+++ b/CIAO/connectors/dds4ccm/impl/ndds/Subscriber.cpp
@@ -0,0 +1,531 @@
+// $Id$
+
+#include "Subscriber.h"
+#include "SubscriberListener.h"
+#include "Topic.h"
+#include "ContentFilteredTopic.h"
+#include "DataReader.h"
+#include "DataReaderListener.h"
+#include "Utils.h"
+#include "StatusCondition.h"
+#include "InstanceHandle_t.h"
+
+#include "DataReaderQos.h"
+#include "SubscriberQos.h"
+#include "TopicQos.h"
+
+#include "dds4ccm/idl/dds4ccm_BaseC.h"
+
+#include "dds4ccm/impl/logger/Log_Macros.h"
+
+namespace CIAO
+{
+ namespace DDS4CCM
+ {
+ CCM_DDS_Subscriber_i::CCM_DDS_Subscriber_i (DDSSubscriber * sub)
+ : impl_ (sub)
+ {
+ DDS4CCM_TRACE ("CCM_DDS_Subscriber_i::CCM_DDS_Subscriber_i");
+ }
+
+ CCM_DDS_Subscriber_i::~CCM_DDS_Subscriber_i (void)
+ {
+ DDS4CCM_TRACE ("CCM_DDS_Subscriber_i::~CCM_DDS_Subscriber_i");
+ }
+
+ ::DDS::ReturnCode_t
+ CCM_DDS_Subscriber_i::enable (void)
+ {
+ return this->impl ()->enable ();
+ }
+
+ ::DDS::StatusCondition_ptr
+ CCM_DDS_Subscriber_i::get_statuscondition (void)
+ {
+ ::DDS::StatusCondition_var retval = ::DDS::StatusCondition::_nil ();
+#if defined (CIAO_DDS4CCM_NDDS) && (CIAO_DDS4CCM_NDDS==1)
+ DDSStatusCondition* sc = this->impl ()->get_statuscondition ();
+ if (sc)
+ {
+ ACE_NEW_THROW_EX (retval,
+ CCM_DDS_StatusCondition_i (sc),
+ CORBA::NO_MEMORY ());
+ }
+#else
+ ::DDS::StatusCondition_var sc = this->impl ()->get_statuscondition ();
+ if (!CORBA::is_nil (sc.in ()))
+ {
+ ACE_NEW_THROW_EX (retval,
+ CCM_DDS_StatusCondition_i (sc.in ()),
+ CORBA::NO_MEMORY ());
+ }
+#endif
+ return retval._retn ();
+ }
+
+ ::DDS::StatusMask
+ CCM_DDS_Subscriber_i::get_status_changes (void)
+ {
+ return this->impl ()->get_status_changes ();
+ }
+
+ ::DDS::InstanceHandle_t
+ CCM_DDS_Subscriber_i::get_instance_handle (void)
+ {
+#if defined (CIAO_DDS4CCM_NDDS) && (CIAO_DDS4CCM_NDDS==1)
+ ::DDS_InstanceHandle_t const rtihandle = this->impl ()->get_instance_handle ();
+ ::DDS::InstanceHandle_t handle;
+ handle <<= rtihandle;
+ return handle;
+#else
+ return this->impl ()->get_instance_handle ();
+#endif
+ }
+
+ DDSDataReader *
+ CCM_DDS_Subscriber_i::create_datareader (
+ DDSContentFilteredTopic * topic,
+ DDSDataReaderListener * rti_drl,
+ ::DDS::StatusMask mask,
+ const ::DDS::DataReaderQos & qos)
+ {
+#if defined (CIAO_DDS4CCM_NDDS) && (CIAO_DDS4CCM_NDDS==1)
+ ACE_UNUSED_ARG (qos);
+ DDS_DataReaderQos rti_qos = DDS_DATAREADER_QOS_DEFAULT;
+ return this->impl ()->create_datareader (topic,
+ rti_qos,
+ rti_drl,
+ mask);
+#else
+ return this->impl ()->create_datareader (topic,
+ qos,
+ rti_drl,
+ mask);
+#endif
+ }
+
+ DDSDataReader *
+ CCM_DDS_Subscriber_i::create_datareader (
+ DDSTopic * topic,
+ DDSDataReaderListener * rti_drl,
+ ::DDS::StatusMask mask,
+ const ::DDS::DataReaderQos & qos)
+ {
+#if defined (CIAO_DDS4CCM_NDDS) && (CIAO_DDS4CCM_NDDS==1)
+ ACE_UNUSED_ARG (qos);
+ DDS_DataReaderQos rti_qos = DDS_DATAREADER_QOS_DEFAULT;
+ return this->impl ()->create_datareader (topic,
+ rti_qos,
+ rti_drl,
+ mask);
+#else
+ return this->impl ()->create_datareader (topic,
+ qos,
+ rti_drl,
+ mask);
+#endif
+ }
+
+#if defined (CIAO_DDS4CCM_NDDS) && (CIAO_DDS4CCM_NDDS==1)
+ DDSDataReader *
+ CCM_DDS_Subscriber_i::create_datareader_with_profile (
+ DDSContentFilteredTopic * topic,
+ const char * library_name,
+ const char * profile_name,
+ DDSDataReaderListener * rti_drl,
+ ::DDS::StatusMask mask)
+ {
+ return this->impl ()->create_datareader_with_profile (topic,
+ library_name,
+ profile_name,
+ rti_drl,
+ mask);
+ }
+#endif
+
+#if defined (CIAO_DDS4CCM_NDDS) && (CIAO_DDS4CCM_NDDS==1)
+ DDSDataReader *
+ CCM_DDS_Subscriber_i::create_datareader_with_profile (
+ DDSTopic * topic,
+ const char * library_name,
+ const char * profile_name,
+ DDSDataReaderListener * rti_drl,
+ ::DDS::StatusMask mask)
+ {
+ return this->impl ()->create_datareader_with_profile (topic,
+ library_name,
+ profile_name,
+ rti_drl,
+ mask);
+ }
+#endif
+
+ ::DDS::DataReader_ptr
+ CCM_DDS_Subscriber_i::create_datareader (
+ ::DDS::TopicDescription_ptr a_topic,
+ const ::DDS::DataReaderQos & qos,
+ ::DDS::DataReaderListener_ptr a_listener,
+ ::DDS::StatusMask mask)
+ {
+ DDS4CCM_TRACE ("CCM_DDS_Subscriber_i::create_datareader");
+ ::DDS::DataReader_var retval = ::DDS::DataReader::_nil ();
+ DDSDataReaderListener *rti_drl = 0;
+ if (!CORBA::is_nil (a_listener))
+ {
+ ACE_NEW_THROW_EX (rti_drl,
+ CCM_DDS_DataReaderListener_i (a_listener),
+ CORBA::NO_MEMORY ());
+ }
+
+ DDSDataReader * rti_dr = 0;
+ CCM_DDS_Topic_i * topic = dynamic_cast < CCM_DDS_Topic_i * > (a_topic);
+
+ if (!topic)
+ {
+ CCM_DDS_ContentFilteredTopic_i * cf_topic =
+ dynamic_cast < CCM_DDS_ContentFilteredTopic_i * > (a_topic);
+ if (!cf_topic)
+ {
+ DDS4CCM_ERROR (1, (LM_ERROR, CLINFO "CCM_DDS_Subscriber_i::create_datareader - "
+ "Error: Unable to cast provided topic to one of its servant.\n"));
+ delete rti_drl;
+ throw CCM_DDS::InternalError (::DDS::RETCODE_BAD_PARAMETER, 0);
+ }
+ else
+ rti_dr = this->create_datareader (cf_topic->get_impl (), rti_drl, mask, qos);
+ }
+ else
+ rti_dr = this->create_datareader (topic->get_impl (), rti_drl, mask, qos);
+
+ if (!rti_dr)
+ {
+ DDS4CCM_ERROR (1, (LM_ERROR, CLINFO "CCM_DDS_Subscriber_i::create_datareader - "
+ "Error: RTI Topic returned a nil datareader.\n"));
+ delete rti_drl;
+ throw CCM_DDS::InternalError (::DDS::RETCODE_ERROR, 0);
+ }
+ else
+ {
+ DDS4CCM_DEBUG (6, (LM_DEBUG, CLINFO "CCM_DDS_Subscriber_i::create_datareader_with_profile - "
+ "Successfully created datareader.\n"));
+ }
+
+ rti_dr->enable ();
+ ACE_NEW_THROW_EX (retval,
+ CCM_DDS_DataReader_i (rti_dr),
+ CORBA::NO_MEMORY ());
+ return retval._retn ();
+ }
+
+ ::DDS::DataReader_ptr
+ CCM_DDS_Subscriber_i::create_datareader_with_profile (
+ ::DDS::TopicDescription_ptr a_topic,
+ const char * library_name,
+ const char * profile_name,
+ ::DDS::DataReaderListener_ptr a_listener,
+ ::DDS::StatusMask mask)
+ {
+ DDS4CCM_TRACE ("CCM_DDS_Subscriber_i::create_datareader_with_profile");
+ ::DDS::DataReader_var retval = ::DDS::DataReader::_nil ();
+ DDSDataReaderListener *rti_drl = 0;
+ if (!CORBA::is_nil (a_listener))
+ {
+ ACE_NEW_THROW_EX (rti_drl,
+ CCM_DDS_DataReaderListener_i (a_listener),
+ CORBA::NO_MEMORY ());
+ }
+
+ DDSDataReader * rti_dr = 0;
+ CCM_DDS_Topic_i * topic = dynamic_cast < CCM_DDS_Topic_i * > (a_topic);
+
+ if (!topic)
+ {
+ CCM_DDS_ContentFilteredTopic_i * cf_topic =
+ dynamic_cast < CCM_DDS_ContentFilteredTopic_i * > (a_topic);
+ if (!cf_topic)
+ {
+ DDS4CCM_ERROR (1, (LM_ERROR, CLINFO "CCM_DDS_Subscriber_i::create_datareader_with_profile - "
+ "Error: Unable to cast provided topic to one of its servant.\n"));
+ delete rti_drl;
+ throw CCM_DDS::InternalError (::DDS::RETCODE_BAD_PARAMETER, 0);
+ }
+ else
+ rti_dr = this->create_datareader_with_profile (cf_topic->get_impl (),
+ library_name,
+ profile_name,
+ rti_drl,
+ mask);
+ }
+ else
+ rti_dr = this->create_datareader_with_profile (topic->get_impl (),
+ library_name,
+ profile_name,
+ rti_drl,
+ mask);
+
+ if (!rti_dr)
+ {
+ DDS4CCM_ERROR (1, (LM_ERROR, CLINFO "CCM_DDS_Subscriber_i::create_datareader_with_profile - "
+ "Error: RTI Topic returned a nil datareader.\n"));
+ delete rti_drl;
+ throw CCM_DDS::InternalError (::DDS::RETCODE_ERROR, 0);
+ }
+ else
+ {
+ DDS4CCM_DEBUG (6, (LM_DEBUG, CLINFO "CCM_DDS_Subscriber_i::create_datareader_with_profile - "
+ "Successfully created datareader with profile <%C#%C>.\n",
+ library_name,
+ profile_name));
+ }
+
+ rti_dr->enable ();
+ ACE_NEW_THROW_EX (retval,
+ CCM_DDS_DataReader_i (rti_dr),
+ CORBA::NO_MEMORY ());
+ return retval._retn ();
+ }
+
+ ::DDS::ReturnCode_t
+ CCM_DDS_Subscriber_i::delete_datareader (
+ ::DDS::DataReader_ptr a_datareader)
+ {
+ CCM_DDS_DataReader_i *dr = dynamic_cast< CCM_DDS_DataReader_i *> (a_datareader);
+ if (!dr)
+ {
+ DDS4CCM_ERROR (1, (LM_ERROR, CLINFO "CCM_DDS_Subscriber_i::delete_datareader - "
+ "Unable to cast provided object reference to servant.\n"));
+ return ::DDS::RETCODE_BAD_PARAMETER;
+ }
+
+ DDS4CCM_DEBUG (9, (LM_TRACE, CLINFO "CCM_DDS_Subscriber_i::delete_datareader - "
+ "Successfully casted provided object reference to servant.\n"));
+
+ DDS_ReturnCode_t const retval = this->impl ()->delete_datareader (dr->get_impl ());
+
+ if (retval != DDS_RETCODE_OK)
+ {
+ DDS4CCM_ERROR (1, (LM_ERROR, CLINFO "CCM_DDS_Subscriber_i::delete_datareader - "
+ "Error: Returned non-ok error code %C\n",
+ translate_retcode (retval)));
+ }
+ else
+ {
+ DDS4CCM_DEBUG (6, (LM_INFO, CLINFO "CCM_DDS_Subscriber_i::delete_datareader - "
+ "Datareader successfully deleted\n"));
+ }
+
+ return retval;
+ }
+
+ ::DDS::ReturnCode_t
+ CCM_DDS_Subscriber_i::delete_contained_entities (void)
+ {
+ return this->impl ()->delete_contained_entities ();
+ }
+
+ ::DDS::DataReader_ptr
+ CCM_DDS_Subscriber_i::lookup_datareader (
+ const char * impl_name)
+ {
+ ::DDS::DataReader_var retval = ::DDS::DataReader::_nil ();
+ DDSDataReader* dr = this->impl ()->lookup_datareader (impl_name);
+ if (dr)
+ {
+ ACE_NEW_THROW_EX (retval,
+ CCM_DDS_DataReader_i (dr),
+ CORBA::NO_MEMORY ());
+ }
+ return retval._retn ();
+ }
+
+ ::DDS::ReturnCode_t
+ CCM_DDS_Subscriber_i::get_datareaders (
+ ::DDS::DataReaderSeq & /*readers*/,
+ ::DDS::SampleStateMask /*sample_states*/,
+ ::DDS::ViewStateMask /*view_states*/,
+ ::DDS::InstanceStateMask /*instance_states*/)
+ {
+ throw CORBA::NO_IMPLEMENT ();
+ // Add your implementation here
+ }
+
+ ::DDS::ReturnCode_t
+ CCM_DDS_Subscriber_i::notify_datareaders (void)
+ {
+ return this->impl ()->notify_datareaders ();
+ }
+
+ ::DDS::ReturnCode_t
+ CCM_DDS_Subscriber_i::set_qos (
+ const ::DDS::SubscriberQos & qos)
+ {
+ CIAO_TRACE ("CCM_DDS_Subscriber_i::set_qos");
+#if defined (CIAO_DDS4CCM_NDDS) && (CIAO_DDS4CCM_NDDS==1)
+ ::DDS_SubscriberQos rti_qos;
+ rti_qos <<= qos;
+ return this->impl ()->get_qos (rti_qos);
+#else
+ return this->impl ()->set_qos (qos);
+#endif
+ }
+
+ ::DDS::ReturnCode_t
+ CCM_DDS_Subscriber_i::get_qos (
+ ::DDS::SubscriberQos & qos)
+ {
+ CIAO_TRACE ("CCM_DDS_Subscriber_i::get_qos");
+#if defined (CIAO_DDS4CCM_NDDS) && (CIAO_DDS4CCM_NDDS==1)
+ ::DDS_SubscriberQos rti_qos;
+ ::DDS::ReturnCode_t retcode = this->impl ()->get_qos (rti_qos);
+ qos <<= rti_qos;
+ return retcode;
+#else
+ return this->impl ()->get_qos (qos);
+#endif
+ }
+
+ ::DDS::ReturnCode_t
+ CCM_DDS_Subscriber_i::set_listener (
+ ::DDS::SubscriberListener_ptr a_listener,
+ ::DDS::StatusMask mask)
+ {
+ DDS4CCM_TRACE ("CCM_DDS_Subscriber_i::set_listener");
+
+ CCM_DDS_SubscriberListener_i* rti_impl_list = 0;
+ if (!CORBA::is_nil (a_listener))
+ {
+ ACE_NEW_THROW_EX (rti_impl_list,
+ CCM_DDS_SubscriberListener_i (a_listener),
+ CORBA::NO_MEMORY ());
+ }
+ return this->impl ()->set_listener (rti_impl_list, mask);
+ }
+
+ ::DDS::SubscriberListener_ptr
+ CCM_DDS_Subscriber_i::get_listener (void)
+ {
+ DDS4CCM_TRACE ("CCM_DDS_Subscriber_i::get_listener");
+
+ DDSSubscriberListener *rti_impl_list = this->impl ()->get_listener ();
+ CCM_DDS_SubscriberListener_i *list_proxy = dynamic_cast <CCM_DDS_SubscriberListener_i *> (rti_impl_list);
+ if (!list_proxy)
+ {
+ DDS4CCM_DEBUG (6, (LM_DEBUG, "CCM_DDS_Subscriber_i::get_listener - "
+ "DDS returned a NIL listener.\n"));
+ return ::DDS::SubscriberListener::_nil ();
+ }
+ return list_proxy->get_subscriber_listener ();
+ }
+
+ ::DDS::ReturnCode_t
+ CCM_DDS_Subscriber_i::begin_access (void)
+ {
+ return this->impl ()->begin_access ();
+ }
+
+ ::DDS::ReturnCode_t
+ CCM_DDS_Subscriber_i::end_access (void)
+ {
+ return this->impl ()->end_access ();
+ }
+
+ ::DDS::DomainParticipant_ptr
+ CCM_DDS_Subscriber_i::get_participant (void)
+ {
+ ::DDS::DomainParticipant_var retval = ::DDS::DomainParticipant::_nil ();
+#if defined (CIAO_DDS4CCM_NDDS) && (CIAO_DDS4CCM_NDDS==1)
+ DDSDomainParticipant* p = this->impl ()->get_participant ();
+ if (p)
+ {
+ ACE_NEW_THROW_EX (retval,
+ CCM_DDS_DomainParticipant_i (p),
+ CORBA::NO_MEMORY ());
+ }
+#else
+ ::DDS::DomainParticipant_var p = this->impl ()->get_participant ();
+ if (!CORBA::is_nil (p.in))
+ {
+ ACE_NEW_THROW_EX (retval,
+ CCM_DDS_DomainParticipant_i (p.in ()),
+ CORBA::NO_MEMORY ());
+ }
+#endif
+ return retval._retn ();
+ }
+
+ ::DDS::ReturnCode_t
+ CCM_DDS_Subscriber_i::set_default_datareader_qos (
+ const ::DDS::DataReaderQos & qos)
+ {
+ CIAO_TRACE ("CCM_DDS_Subscriber_i::set_default_datareader_qos");
+#if defined (CIAO_DDS4CCM_NDDS) && (CIAO_DDS4CCM_NDDS==1)
+ ::DDS_DataReaderQos rti_qos;
+ rti_qos <<= qos;
+ return this->impl ()->set_default_datareader_qos (rti_qos);
+#else
+ return this->impl ()->set_default_datareader_qos (qos);
+#endif
+ }
+
+ ::DDS::ReturnCode_t
+ CCM_DDS_Subscriber_i::get_default_datareader_qos (
+ ::DDS::DataReaderQos & qos)
+ {
+ CIAO_TRACE ("CCM_DDS_Subscriber_i::get_default_datareader_qos");
+#if defined (CIAO_DDS4CCM_NDDS) && (CIAO_DDS4CCM_NDDS==1)
+ ::DDS_DataReaderQos rti_qos;
+ ::DDS::ReturnCode_t retcode = this->impl ()->get_default_datareader_qos (rti_qos);
+ qos <<= rti_qos;
+ return retcode;
+#else
+ return this->impl ()->get_default_datareader_qos (qos);
+#endif
+ }
+
+ ::DDS::ReturnCode_t
+ CCM_DDS_Subscriber_i::copy_from_topic_qos (
+ ::DDS::DataReaderQos & a_datareader_qos,
+ const ::DDS::TopicQos & a_impl_qos)
+ {
+ DDS4CCM_TRACE ("CCM_DDS_Subscriber_i::copy_from_topic_qos");
+#if defined (CIAO_DDS4CCM_NDDS) && (CIAO_DDS4CCM_NDDS==1)
+ ::DDS_DataReaderQos rti_qos;
+ ::DDS_TopicQos rti_topic_qos;
+
+ rti_qos <<= a_datareader_qos;
+ rti_topic_qos <<= a_impl_qos;
+ ::DDS::ReturnCode_t retcode =
+ this->impl()->copy_from_topic_qos (rti_qos,
+ rti_topic_qos);
+ a_datareader_qos <<= rti_qos;
+ return retcode;
+#else
+ return this->impl()->copy_from_topic_qos (a_datareader_qos,
+ a_impl_qos);
+#endif
+ }
+
+ DDSSubscriber *
+ CCM_DDS_Subscriber_i::get_impl (void)
+ {
+ return this->impl_;
+ }
+
+ void
+ CCM_DDS_Subscriber_i::set_impl (DDSSubscriber * sub)
+ {
+ this->impl_ = sub;
+ }
+
+ DDSSubscriber *
+ CCM_DDS_Subscriber_i::impl (void)
+ {
+ if (!this->impl_)
+ {
+ throw ::CORBA::BAD_INV_ORDER ();
+ }
+ return this->impl_;
+ }
+ }
+}
+