diff options
Diffstat (limited to 'modules/CIAO/ciaosvcs/Events')
16 files changed, 1998 insertions, 0 deletions
diff --git a/modules/CIAO/ciaosvcs/Events/CIAO_EventService_Factory_impl.cpp b/modules/CIAO/ciaosvcs/Events/CIAO_EventService_Factory_impl.cpp new file mode 100644 index 00000000000..47f6370cf26 --- /dev/null +++ b/modules/CIAO/ciaosvcs/Events/CIAO_EventService_Factory_impl.cpp @@ -0,0 +1,100 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file CIAO_EventService_Factory_impl.cpp + * + * $Id$ + * + * @author Gan Deng <dengg@dre.vanderbilt.edu> + * @author George Edwards <g.edwards@vanderbilt.edu> + */ +//============================================================================= + +#include "CIAO_EventService_Factory_impl.h" +#include "ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.h" + +namespace CIAO +{ + EventService_Factory_impl::EventService_Factory_impl (void) + { + } + + EventService_Factory_impl::EventService_Factory_impl ( + CORBA::ORB_ptr orb, + PortableServer::POA_ptr poa) : + orb_ (CORBA::ORB::_duplicate (orb)), + poa_ (PortableServer::POA::_duplicate (poa)) + { + } + + EventService_Factory_impl::~EventService_Factory_impl (void) + { + } + + CIAO_Event_Service_ptr + EventService_Factory_impl::create (EventServiceType type, + const char * ec_name) + { + ACE_DEBUG ((LM_DEBUG, "CIAO::EventService_Factory_impl::create_event_service\n")); + + EventServiceBase * event_service = 0; + + switch (type) + { + case RTEC: + ACE_NEW_RETURN (event_service, + RTEventService (this->orb_.in (), + this->poa_.in (), + ec_name), + 0); + break; + + default: + ACE_ERROR_RETURN ((LM_ERROR, "CIAO::EventService_Factory_impl::" + "create_event_service: unsupported type.\n"), + 0); + + } + + // Activate the servant + PortableServer::ObjectId_var oid = + this->poa_->activate_object (event_service); + + CORBA::Object_var obj = poa_->id_to_reference (oid.in()); + + CIAO_Event_Service_var service = + CIAO_Event_Service::_narrow (obj.in ()); + + //CIAO_Event_Service_var service = event_service->_this (); + return service._retn (); + } + + int + EventService_Factory_impl::Initializer (void) + { + return + ACE_Service_Config::process_directive ( + ace_svc_desc_EventService_Factory_impl + ); + } + + void EventService_Factory_impl::initialize ( + CORBA::ORB_ptr orb, PortableServer::POA_ptr poa) + { + this->orb_ = CORBA::ORB::_duplicate (orb); + this->poa_ = PortableServer::POA::_duplicate (poa); + } + +ACE_STATIC_SVC_DEFINE ( + EventService_Factory_impl, + ACE_TEXT ("CIAO_EventService_Factory"), + ACE_SVC_OBJ_T, + &ACE_SVC_NAME (EventService_Factory_impl), + ACE_Service_Type::DELETE_THIS | ACE_Service_Type::DELETE_OBJ, + 0 + ) + +ACE_FACTORY_DEFINE (CIAO_EVENTS, EventService_Factory_impl) + +} // namespace CIAO diff --git a/modules/CIAO/ciaosvcs/Events/CIAO_EventService_Factory_impl.h b/modules/CIAO/ciaosvcs/Events/CIAO_EventService_Factory_impl.h new file mode 100644 index 00000000000..7ba36ac9a0d --- /dev/null +++ b/modules/CIAO/ciaosvcs/Events/CIAO_EventService_Factory_impl.h @@ -0,0 +1,82 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file CIAO_EventService_Factory_impl.h + * + * $Id$ + * + * @author Gan Deng <dengg@dre.vanderbilt.edu> + * @author George Edwards <g.edwards@vanderbilt.edu> + */ +//============================================================================= + +#ifndef CIAO_EVENTSERVICE_FACTORY_IMPL_H +#define CIAO_EVENTSERVICE_FACTORY_IMPL_H +#include /**/ "ace/pre.h" + +#include "tao/PortableServer/PortableServer.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +#pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "CIAO_Events_Export.h" +#include "ciaosvcs/Events/CIAO_Events_Base/CIAO_EventServiceBase.h" + +namespace CIAO +{ + class CIAO_EVENTS_Export EventService_Factory_impl : + public ACE_Service_Object + { + public: + EventService_Factory_impl (void); + + EventService_Factory_impl (CORBA::ORB_ptr orb, + PortableServer::POA_ptr poa); + + //void init (CORBA::ORB_ptr orb, + // PortableServer::POA_ptr poa); + + virtual ~EventService_Factory_impl (void); + + /// A factory method which creates an CIAO_Event_Service object + virtual CIAO_Event_Service_ptr create (EventServiceType type, + const char * ec_name); + + virtual void initialize (CORBA::ORB_ptr orb, PortableServer::POA_ptr poa); + + /// Used to force the initialization. + static int Initializer (void); + + private: + /// Reference to the ORB + CORBA::ORB_var orb_; + + /// Reference to the Root POA + PortableServer::POA_var poa_; + }; + +ACE_STATIC_SVC_DECLARE (EventService_Factory_impl) +ACE_FACTORY_DECLARE (CIAO_EVENTS, EventService_Factory_impl) + +} + +#if defined (ACE_HAS_BROKEN_STATIC_CONSTRUCTORS) + +typedef int (*CIAO_Module_Initializer) (void); + +static CIAO_Module_Initializer +CIAO_Requires_EventService_Initializer = + &CIAO::EventService_Factory_impl::Initializer; + +#else + +static int +CIAO_Requires_EventService_Initializer = + CIAO::EventService_Factory_impl::Initializer (); + +#endif /* ACE_HAS_BROKEN_STATIC_CONSTRUCTORS */ + +#include /**/ "ace/post.h" +#endif /* CIAO_EVENTSERVICE_FACTORY_IMPL_H */ diff --git a/modules/CIAO/ciaosvcs/Events/CIAO_Events.mpc b/modules/CIAO/ciaosvcs/Events/CIAO_Events.mpc new file mode 100644 index 00000000000..a5a1417673e --- /dev/null +++ b/modules/CIAO/ciaosvcs/Events/CIAO_Events.mpc @@ -0,0 +1,20 @@ +// -*- MPC -*- +// $Id$ + +project (CIAO_DnC_Events) : orbsvcslib, ciao_rtevent_dnc { + + sharedname = CIAO_DnC_Events + idlflags += -Wb,export_include=CIAO_Events_Export.h -Wb,export_macro=CIAO_EVENTS_Export + dynamicflags = CIAO_EVENTS_BUILD_DLL + + Source_Files { + CIAO_EventService_Factory_impl.cpp + } + + Header_Files { + CIAO_EventService_Factory_impl.h + } + + IDL_Files { + } +} diff --git a/modules/CIAO/ciaosvcs/Events/CIAO_Events_Base/CIAO_EventServiceBase.cpp b/modules/CIAO/ciaosvcs/Events/CIAO_Events_Base/CIAO_EventServiceBase.cpp new file mode 100644 index 00000000000..9f6729ea0ef --- /dev/null +++ b/modules/CIAO/ciaosvcs/Events/CIAO_Events_Base/CIAO_EventServiceBase.cpp @@ -0,0 +1,37 @@ +//============================================================================= +/** + * @file CIAO_EventServiceBase.cpp + * + * $Id$ + * + * @author Gan Deng <dengg@dre.vanderbilt.edu> + */ +//============================================================================= + +#include "CIAO_EventServiceBase.h" + +namespace CIAO +{ + EventServiceBase:: + EventServiceBase (void) + { + } + + EventServiceBase::~EventServiceBase (void) + { + } + + void + EventServiceBase::ciao_push_event ( + ::Components::EventBase * evt, + const char * source_id, + ::CORBA::TypeCode_ptr tc) + ACE_THROW_SPEC (( + ::CORBA::SystemException, + ::Components::BadEventType)) + { + ACE_UNUSED_ARG (evt); + ACE_UNUSED_ARG (source_id); + ACE_UNUSED_ARG (tc); + } +} diff --git a/modules/CIAO/ciaosvcs/Events/CIAO_Events_Base/CIAO_EventServiceBase.h b/modules/CIAO/ciaosvcs/Events/CIAO_Events_Base/CIAO_EventServiceBase.h new file mode 100644 index 00000000000..be9e8f68e36 --- /dev/null +++ b/modules/CIAO/ciaosvcs/Events/CIAO_Events_Base/CIAO_EventServiceBase.h @@ -0,0 +1,142 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file CIAO_EventServiceBase.h + * + * $Id$ + * + * @author Gan Deng <dengg@dre.vanderbilt.edu> + * @author George Edwards <g.edwards@vanderbilt.edu> + */ +//============================================================================= + +#ifndef CIAO_EVENTSERVICEBASE_H +#define CIAO_EVENTSERVICEBASE_H +#include /**/ "ace/pre.h" + +#include "CIAO_EventsS.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +namespace CIAO +{ + /** + * @class EventServiceBase + * + * An abstract base servant class to implement the CIAO_Event_Service + * interface. The derived classes will provide appropriate + * implementations of the connect, disconnect, and push methods depending on + * the event mechanism used. + */ + class CIAO_EVENTS_Export EventServiceBase : + public virtual POA_CIAO::CIAO_Event_Service + { + public: + EventServiceBase (void); + + virtual ~EventServiceBase (void); + + /// A factory method for Supplier_Config objects + virtual Supplier_Config_ptr + create_supplier_config (void) + ACE_THROW_SPEC ((CORBA::SystemException)) = 0; + + /// A factory method for Consumer_Config objects + virtual Consumer_Config_ptr + create_consumer_config (void) + ACE_THROW_SPEC ((CORBA::SystemException)) = 0; + + /** + * @fn void connect_event_supplier (Supplier_Config_ptr supplier_config) + * + * Connects an event supplier using the options specified by + * @c supplier_config. + */ + virtual void connect_event_supplier ( + Supplier_Config_ptr supplier_config) + ACE_THROW_SPEC (( + CORBA::SystemException)) = 0; + + /** + * @fn void connect_event_consumer (Consumer_Config_ptr consumer_config) + * + * Connects an event consumer using the options specified by + * @c consumer_config. + */ + virtual void connect_event_consumer ( + Consumer_Config_ptr consumer_config) + ACE_THROW_SPEC (( + CORBA::SystemException)) = 0; + + /** + * @fn void disconnect_event_supplier () + * + * Disconnects the event supplier associated with this object. + */ + virtual void disconnect_event_supplier ( + const char * consumer_id) + ACE_THROW_SPEC (( + CORBA::SystemException, + Components::InvalidConnection)) = 0; + + /** + * @fn void disconnect_event_consumer (CONNECTION_ID consumer_id) + * + * Disconnects the event consumer with UUID @c consumer_id. + */ + virtual void disconnect_event_consumer ( + const char * consumer_id) + ACE_THROW_SPEC (( + CORBA::SystemException, + Components::InvalidConnection)) = 0; + + /** + * @fn void push_event (Components::EventBase * ev) + * + * Pushes event @c ev to all consumers. + */ + virtual void push_event ( + Components::EventBase * ev) + ACE_THROW_SPEC (( + CORBA::SystemException)) = 0; + + /** + * Pushes event @c ev to all consumers. The source id of the + * supplier is specified through @c source_id. + */ + virtual void ciao_push_event ( + ::Components::EventBase * evt, + const char * source_id, + ::CORBA::TypeCode_ptr tc) + ACE_THROW_SPEC (( + ::CORBA::SystemException, + ::Components::BadEventType)); + }; + + class Event_Consumer_Config_Base : + public virtual POA_CIAO::Consumer_Config + { + public: + virtual void start_conjunction_group ( + ::CORBA::Long size) + ACE_THROW_SPEC ((::CORBA::SystemException)) = 0; + + virtual void start_disjunction_group ( + ::CORBA::Long size) + ACE_THROW_SPEC ((::CORBA::SystemException)) = 0; + + virtual void insert_source ( + const char * source_id) + ACE_THROW_SPEC ((::CORBA::SystemException)) = 0; + + virtual void insert_type ( + ::CORBA::Long event_type) + ACE_THROW_SPEC ((::CORBA::SystemException)) = 0; + }; +} + +#include /**/ "ace/post.h" +#endif /* CIAO_EVENTSERVICEBASE_H */ diff --git a/modules/CIAO/ciaosvcs/Events/CIAO_Events_Base/CIAO_Events.idl b/modules/CIAO/ciaosvcs/Events/CIAO_Events_Base/CIAO_Events.idl new file mode 100644 index 00000000000..266f8b472e9 --- /dev/null +++ b/modules/CIAO/ciaosvcs/Events/CIAO_Events_Base/CIAO_Events.idl @@ -0,0 +1,79 @@ +// $Id$ + +/** + * @file CIAO_Events.idl + * + * @author George Edwards <g.edwards@vanderbilt.edu> + * @author Gan Deng <dengg@dre.vanderbilt.edu> + * + * @brief Interfaces for configuring CIAO's event mechanism. + */ + +#if !defined (CIAO_EVENTS_IDL) +#define CIAO_EVENTS_IDL + +#include "ciao/CCM_EventConsumerBase.idl" +#include "ciao/CCM_Base.idl" + +module CIAO +{ + /// A component's UUID + port name. + typedef string CONNECTION_ID; + + enum EventServiceType + { + EC, + RTEC, + NOTIFY, + RTNOTIFY + }; + + interface Supplier_Config + { + attribute CONNECTION_ID supplier_id; + + readonly attribute EventServiceType service_type; + + void destroy (); + }; + + interface Consumer_Config + { + attribute CONNECTION_ID consumer_id; + + readonly attribute EventServiceType service_type; + + attribute Components::EventConsumerBase consumer; + + void start_conjunction_group (in long size); + + void start_disjunction_group (in long size); + + void insert_source (in CONNECTION_ID source_id); + + void insert_type (in long event_type); + + void destroy (); + }; + + interface CIAO_Event_Service : Components::EventConsumerBase + { + Supplier_Config create_supplier_config (); + + Consumer_Config create_consumer_config (); + + void connect_event_supplier (in Supplier_Config supplier_conf); + + void connect_event_consumer (in Consumer_Config consumer_conf); + + void disconnect_event_supplier (in CONNECTION_ID conn_id) + raises (Components::InvalidConnection); + + void disconnect_event_consumer (in CONNECTION_ID conn_id) + raises (Components::InvalidConnection); + + //void push_event (in Components::EventBase ev); + }; +}; + +#endif /* CIAO_EVENTS_IDL */ diff --git a/modules/CIAO/ciaosvcs/Events/CIAO_Events_Base/CIAO_Events_Base.mpc b/modules/CIAO/ciaosvcs/Events/CIAO_Events_Base/CIAO_Events_Base.mpc new file mode 100644 index 00000000000..ef9688d0d5b --- /dev/null +++ b/modules/CIAO/ciaosvcs/Events/CIAO_Events_Base/CIAO_Events_Base.mpc @@ -0,0 +1,26 @@ +// -*- MPC -*- +// $Id$ + + +project (CIAO_DnC_Events_Base) : orbsvcslib, ciao_client_dnc, ciao_container_dnc { + + sharedname = CIAO_DnC_Events_Base + idlflags += -Wb,export_include=CIAO_Events_Export.h -Wb,export_macro=CIAO_EVENTS_Export + dynamicflags = CIAO_EVENTS_BUILD_DLL + + IDL_Files { + CIAO_Events.idl + } + + Source_Files { + CIAO_EventServiceBase.cpp + CIAO_EventsC.cpp + CIAO_EventsS.cpp + } + + Header_Files { + CIAO_EventsC.h + CIAO_EventsS.h + CIAO_EventServiceBase.h + } +} diff --git a/modules/CIAO/ciaosvcs/Events/CIAO_Events_Base/CIAO_Events_Export.h b/modules/CIAO/ciaosvcs/Events/CIAO_Events_Base/CIAO_Events_Export.h new file mode 100644 index 00000000000..6678cef4bcc --- /dev/null +++ b/modules/CIAO/ciaosvcs/Events/CIAO_Events_Base/CIAO_Events_Export.h @@ -0,0 +1,53 @@ +// -*- C++ -*- +// $Id$ +// Definition for Win32 Export directives. +// This file is generated automatically by generate_export_file.pl CIAO_EVENTS +// ------------------------------ +#ifndef CIAO_EVENTS_EXPORT_H +#define CIAO_EVENTS_EXPORT_H + +#include "ace/config-all.h" + +#if !defined (CIAO_EVENTS_HAS_DLL) +# define CIAO_EVENTS_HAS_DLL 1 +#endif /* ! CIAO_EVENTS_HAS_DLL */ + +#if defined (CIAO_EVENTS_HAS_DLL) && (CIAO_EVENTS_HAS_DLL == 1) +# if defined (CIAO_EVENTS_BUILD_DLL) +# define CIAO_EVENTS_Export ACE_Proper_Export_Flag +# define CIAO_EVENTS_SINGLETON_DECLARATION(T) ACE_EXPORT_SINGLETON_DECLARATION (T) +# define CIAO_EVENTS_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) ACE_EXPORT_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) +# else /* CIAO_EVENTS_BUILD_DLL */ +# define CIAO_EVENTS_Export ACE_Proper_Import_Flag +# define CIAO_EVENTS_SINGLETON_DECLARATION(T) ACE_IMPORT_SINGLETON_DECLARATION (T) +# define CIAO_EVENTS_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) ACE_IMPORT_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) +# endif /* CIAO_EVENTS_BUILD_DLL */ +#else /* CIAO_EVENTS_HAS_DLL == 1 */ +# define CIAO_EVENTS_Export +# define CIAO_EVENTS_SINGLETON_DECLARATION(T) +# define CIAO_EVENTS_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) +#endif /* CIAO_EVENTS_HAS_DLL == 1 */ + +// Set CIAO_EVENTS_NTRACE = 0 to turn on library specific tracing even if +// tracing is turned off for ACE. +#if !defined (CIAO_EVENTS_NTRACE) +# if (ACE_NTRACE == 1) +# define CIAO_EVENTS_NTRACE 1 +# else /* (ACE_NTRACE == 1) */ +# define CIAO_EVENTS_NTRACE 0 +# endif /* (ACE_NTRACE == 1) */ +#endif /* !CIAO_EVENTS_NTRACE */ + +#if (CIAO_EVENTS_NTRACE == 1) +# define CIAO_EVENTS_TRACE(X) +#else /* (CIAO_EVENTS_NTRACE == 1) */ +# if !defined (ACE_HAS_TRACE) +# define ACE_HAS_TRACE +# endif /* ACE_HAS_TRACE */ +# define CIAO_EVENTS_TRACE(X) ACE_TRACE_IMPL(X) +# include "ace/Trace.h" +#endif /* (CIAO_EVENTS_NTRACE == 1) */ + +#endif /* CIAO_EVENTS_EXPORT_H */ + +// End of auto generated file. diff --git a/modules/CIAO/ciaosvcs/Events/CIAO_Events_Export.h b/modules/CIAO/ciaosvcs/Events/CIAO_Events_Export.h new file mode 100644 index 00000000000..6678cef4bcc --- /dev/null +++ b/modules/CIAO/ciaosvcs/Events/CIAO_Events_Export.h @@ -0,0 +1,53 @@ +// -*- C++ -*- +// $Id$ +// Definition for Win32 Export directives. +// This file is generated automatically by generate_export_file.pl CIAO_EVENTS +// ------------------------------ +#ifndef CIAO_EVENTS_EXPORT_H +#define CIAO_EVENTS_EXPORT_H + +#include "ace/config-all.h" + +#if !defined (CIAO_EVENTS_HAS_DLL) +# define CIAO_EVENTS_HAS_DLL 1 +#endif /* ! CIAO_EVENTS_HAS_DLL */ + +#if defined (CIAO_EVENTS_HAS_DLL) && (CIAO_EVENTS_HAS_DLL == 1) +# if defined (CIAO_EVENTS_BUILD_DLL) +# define CIAO_EVENTS_Export ACE_Proper_Export_Flag +# define CIAO_EVENTS_SINGLETON_DECLARATION(T) ACE_EXPORT_SINGLETON_DECLARATION (T) +# define CIAO_EVENTS_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) ACE_EXPORT_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) +# else /* CIAO_EVENTS_BUILD_DLL */ +# define CIAO_EVENTS_Export ACE_Proper_Import_Flag +# define CIAO_EVENTS_SINGLETON_DECLARATION(T) ACE_IMPORT_SINGLETON_DECLARATION (T) +# define CIAO_EVENTS_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) ACE_IMPORT_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) +# endif /* CIAO_EVENTS_BUILD_DLL */ +#else /* CIAO_EVENTS_HAS_DLL == 1 */ +# define CIAO_EVENTS_Export +# define CIAO_EVENTS_SINGLETON_DECLARATION(T) +# define CIAO_EVENTS_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) +#endif /* CIAO_EVENTS_HAS_DLL == 1 */ + +// Set CIAO_EVENTS_NTRACE = 0 to turn on library specific tracing even if +// tracing is turned off for ACE. +#if !defined (CIAO_EVENTS_NTRACE) +# if (ACE_NTRACE == 1) +# define CIAO_EVENTS_NTRACE 1 +# else /* (ACE_NTRACE == 1) */ +# define CIAO_EVENTS_NTRACE 0 +# endif /* (ACE_NTRACE == 1) */ +#endif /* !CIAO_EVENTS_NTRACE */ + +#if (CIAO_EVENTS_NTRACE == 1) +# define CIAO_EVENTS_TRACE(X) +#else /* (CIAO_EVENTS_NTRACE == 1) */ +# if !defined (ACE_HAS_TRACE) +# define ACE_HAS_TRACE +# endif /* ACE_HAS_TRACE */ +# define CIAO_EVENTS_TRACE(X) ACE_TRACE_IMPL(X) +# include "ace/Trace.h" +#endif /* (CIAO_EVENTS_NTRACE == 1) */ + +#endif /* CIAO_EVENTS_EXPORT_H */ + +// End of auto generated file. diff --git a/modules/CIAO/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEVENT_Export.h b/modules/CIAO/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEVENT_Export.h new file mode 100644 index 00000000000..d79e4a4581f --- /dev/null +++ b/modules/CIAO/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEVENT_Export.h @@ -0,0 +1,58 @@ + +// -*- C++ -*- +// $Id$ +// Definition for Win32 Export directives. +// This file is generated automatically by generate_export_file.pl CIAO_RTEVENT +// ------------------------------ +#ifndef CIAO_RTEVENT_EXPORT_H +#define CIAO_RTEVENT_EXPORT_H + +#include "ace/config-all.h" + +#if defined (ACE_AS_STATIC_LIBS) && !defined (CIAO_RTEVENT_HAS_DLL) +# define CIAO_RTEVENT_HAS_DLL 0 +#endif /* ACE_AS_STATIC_LIBS && CIAO_RTEVENT_HAS_DLL */ + +#if !defined (CIAO_RTEVENT_HAS_DLL) +# define CIAO_RTEVENT_HAS_DLL 1 +#endif /* ! CIAO_RTEVENT_HAS_DLL */ + +#if defined (CIAO_RTEVENT_HAS_DLL) && (CIAO_RTEVENT_HAS_DLL == 1) +# if defined (CIAO_RTEVENT_BUILD_DLL) +# define CIAO_RTEVENT_Export ACE_Proper_Export_Flag +# define CIAO_RTEVENT_SINGLETON_DECLARATION(T) ACE_EXPORT_SINGLETON_DECLARATION (T) +# define CIAO_RTEVENT_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) ACE_EXPORT_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) +# else /* CIAO_RTEVENT_BUILD_DLL */ +# define CIAO_RTEVENT_Export ACE_Proper_Import_Flag +# define CIAO_RTEVENT_SINGLETON_DECLARATION(T) ACE_IMPORT_SINGLETON_DECLARATION (T) +# define CIAO_RTEVENT_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) ACE_IMPORT_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) +# endif /* CIAO_RTEVENT_BUILD_DLL */ +#else /* CIAO_RTEVENT_HAS_DLL == 1 */ +# define CIAO_RTEVENT_Export +# define CIAO_RTEVENT_SINGLETON_DECLARATION(T) +# define CIAO_RTEVENT_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) +#endif /* CIAO_RTEVENT_HAS_DLL == 1 */ + +// Set CIAO_RTEVENT_NTRACE = 0 to turn on library specific tracing even if +// tracing is turned off for ACE. +#if !defined (CIAO_RTEVENT_NTRACE) +# if (ACE_NTRACE == 1) +# define CIAO_RTEVENT_NTRACE 1 +# else /* (ACE_NTRACE == 1) */ +# define CIAO_RTEVENT_NTRACE 0 +# endif /* (ACE_NTRACE == 1) */ +#endif /* !CIAO_RTEVENT_NTRACE */ + +#if (CIAO_RTEVENT_NTRACE == 1) +# define CIAO_RTEVENT_TRACE(X) +#else /* (CIAO_RTEVENT_NTRACE == 1) */ +# if !defined (ACE_HAS_TRACE) +# define ACE_HAS_TRACE +# endif /* ACE_HAS_TRACE */ +# define CIAO_RTEVENT_TRACE(X) ACE_TRACE_IMPL(X) +# include "ace/Trace.h" +#endif /* (CIAO_RTEVENT_NTRACE == 1) */ + +#endif /* CIAO_RTEVENT_EXPORT_H */ + +// End of auto generated file. diff --git a/modules/CIAO/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.cpp b/modules/CIAO/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.cpp new file mode 100644 index 00000000000..1ef5b1ad889 --- /dev/null +++ b/modules/CIAO/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.cpp @@ -0,0 +1,833 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file CIAO_RTEvent.cpp + * + * $Id$ + * + * @author Gan Deng <dengg@dre.vanderbilt.edu> + * @author George Edwards <g.edwards@vanderbilt.edu> + */ +//============================================================================= + +#include "CIAO_RTEvent.h" +#include "ciao/CIAO_common.h" +#include "SimpleAddressServer.h" +#include "tao/ORB_Core.h" +#include "tao/AnyTypeCode/Any_Unknown_IDL_Type.h" +#include "orbsvcs/CosNamingC.h" + +#include <sstream> + +namespace CIAO +{ + + + RTEventService::RTEventService (CORBA::ORB_ptr orb, + PortableServer::POA_ptr poa, + const char * ec_name) : + orb_ (CORBA::ORB::_duplicate (orb)), + root_poa_ (PortableServer::POA::_duplicate (poa)) + { + this->create_rt_event_channel (ec_name); + } + + + RTEventService::~RTEventService (void) + { + } + + + Supplier_Config_ptr + RTEventService::create_supplier_config (void) + ACE_THROW_SPEC ((CORBA::SystemException)) + { + RTEvent_Supplier_Config_impl * supplier_config = 0; + ACE_NEW_RETURN (supplier_config, + RTEvent_Supplier_Config_impl (this->root_poa_.in ()), + Supplier_Config::_nil ()); + RTEvent_Supplier_Config_var return_rtec = + supplier_config->_this (); + return return_rtec._retn (); + } + + + Consumer_Config_ptr + RTEventService::create_consumer_config (void) + ACE_THROW_SPEC ((CORBA::SystemException)) + { + RTEvent_Consumer_Config_impl * consumer_config = 0; + ACE_NEW_RETURN (consumer_config, + RTEvent_Consumer_Config_impl (this->root_poa_.in ()), + Consumer_Config::_nil ()); + RTEvent_Consumer_Config_var return_rtec = + consumer_config->_this (); + return return_rtec._retn (); + } + + + // @@TODO: We might want to maintain a map for managing multiple proxy consumers + // to multiple event suppliers. + void + RTEventService::connect_event_supplier ( + Supplier_Config_ptr supplier_config) + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + if (CIAO::debug_level () > 9) + { + ACE_DEBUG ((LM_DEBUG, "CIAO::RTEventService::connect_event_supplier\n")); + } + + RTEvent_Supplier_Config_ptr rt_config = + RTEvent_Supplier_Config::_narrow (supplier_config); + + if (CORBA::is_nil (rt_config)) + { + throw CORBA::BAD_PARAM (); + } + + // Get a proxy push consumer from the EventChannel. + RtecEventChannelAdmin::SupplierAdmin_var supplier_admin = + this->rt_event_channel_->for_suppliers (); + + RtecEventChannelAdmin::ProxyPushConsumer_var proxy_push_consumer = + supplier_admin->obtain_push_consumer(); + + // Create and register supplier servant + RTEventServiceSupplier_impl * supplier_servant = 0; + ACE_NEW (supplier_servant, + RTEventServiceSupplier_impl (root_poa_.in ())); + RtecEventComm::PushSupplier_var push_supplier = + supplier_servant->_this (); + + RtecEventChannelAdmin::SupplierQOS_var qos = + rt_config->rt_event_qos (); + + ACE_SupplierQOS_Factory supplier_qos; + supplier_qos.insert (ACE_ES_EVENT_SOURCE_ANY, ACE_ES_EVENT_ANY, 0, 1); + + supplier_qos.insert (ACE_ES_EVENT_SOURCE_ANY, + ACE_ES_EVENT_ANY, + 0, // handle to the rt_info structure + 1); + + proxy_push_consumer->connect_push_supplier (push_supplier.in (), + supplier_qos.get_SupplierQOS ()); + + + this->proxy_consumer_map_.bind ( + supplier_config->supplier_id (), + proxy_push_consumer._retn ()); + } + + void + RTEventService::connect_event_consumer ( + Consumer_Config_ptr consumer_config) + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + if (CIAO::debug_level () > 9) + { + ACE_DEBUG ((LM_DEBUG, "CIAO::RTEventService::connect_event_consumer\n")); + } + + RTEvent_Consumer_Config_ptr rt_config = + RTEvent_Consumer_Config::_narrow (consumer_config); + + if (CORBA::is_nil (rt_config)) + { + throw CORBA::BAD_PARAM (); + } + + Components::EventConsumerBase_var consumer = + consumer_config->consumer (); + + if (CORBA::is_nil (consumer.in ())) + ACE_DEBUG ((LM_DEBUG, "nil event consumer\n")); + + RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin = + this->rt_event_channel_->for_consumers (); + + RtecEventChannelAdmin::ProxyPushSupplier_var proxy_supplier = + consumer_admin->obtain_push_supplier (); + + // Create and register consumer servant + RTEventServiceConsumer_impl * consumer_servant = 0; + ACE_NEW (consumer_servant, + RTEventServiceConsumer_impl ( + root_poa_.in (), + consumer.in ())); + RtecEventComm::PushConsumer_var push_consumer = + consumer_servant->_this (); + + RtecEventChannelAdmin::ConsumerQOS_var qos = + rt_config->rt_event_qos (); + + ACE_DEBUG ((LM_DEBUG, "\n======== ConsumerQoS length is: %d\n\n", + qos->dependencies.length ())); + + if (qos->dependencies.length () == 0) + { + qos->dependencies.length (1); + qos->dependencies[0].event.header.type = ACE_ES_EVENT_ANY; + qos->dependencies[0].event.header.source = ACE_ES_EVENT_SOURCE_ANY; + qos->dependencies[0].rt_info = 0; + + ACE_DEBUG ((LM_DEBUG, "\n======== Normalized ConsumerQoS length is: %d\n\n", + qos->dependencies.length ())); + } + + proxy_supplier->connect_push_consumer (push_consumer.in (), + qos.in () + //qos_factory.get_ConsumerQOS () + ); + + ACE_CString consumer_id = + consumer_config->consumer_id (); + + this->proxy_supplier_map_.bind (consumer_id.c_str (), proxy_supplier._retn ()); + } + + void + RTEventService::disconnect_event_supplier ( + const char * connection_id) + ACE_THROW_SPEC (( + CORBA::SystemException, + Components::InvalidConnection)) + { + ACE_UNUSED_ARG (connection_id); + + RtecEventChannelAdmin::ProxyPushConsumer_var proxy_consumer; + + this->proxy_consumer_map_.unbind (connection_id, proxy_consumer); + + proxy_consumer->disconnect_push_consumer (); + } + + void + RTEventService::disconnect_event_consumer ( + const char * connection_id) + ACE_THROW_SPEC (( + CORBA::SystemException, + Components::InvalidConnection)) + { + RtecEventChannelAdmin::ProxyPushSupplier_var proxy_supplier; + + this->proxy_supplier_map_.unbind (connection_id, proxy_supplier); + + proxy_supplier->disconnect_push_supplier (); + } + + void + RTEventService::push_event ( + Components::EventBase * ev) + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + ACE_UNUSED_ARG (ev); + if (CIAO::debug_level () > 10) + { + ACE_DEBUG ((LM_DEBUG, "------CIAO::RTEventService::push_event------\n")); + } + } + + void + RTEventService::ciao_push_event ( + Components::EventBase * ev, + const char * source_id, + CORBA::TypeCode_ptr tc) + ACE_THROW_SPEC (( + CORBA::SystemException, + Components::BadEventType)) + { + if (CIAO::debug_level () > 10) + { + ACE_DEBUG ((LM_DEBUG, "------CIAO::RTEventService::ciao_push_event------\n")); + } + RtecEventComm::EventSet events (1); + events.length (1); + + ACE_Hash<ACE_CString> hasher; + + events[0].header.source = hasher (source_id); + events[0].header.type = ACE_ES_EVENT_ANY; + events[0].header.ttl = 10; + + // We can't use the Any insert operator here, since it will put the + // EventBase typecode into the Any, and the actual eventtype's fields + // (if any) will get truncated when the Any is demarshaled. So the + // signature of this method has been changed to pass in the derived + // typecode, and TAO-specific methods are used to assign it as the + // Any's typecode and encode the value. This incurs an extra + // encoding, which we may want to try to optimize someday. + TAO_OutputCDR out; + out << ev; + TAO_InputCDR in (out); + TAO::Unknown_IDL_Type *unk = 0; + ACE_NEW (unk, + TAO::Unknown_IDL_Type (tc, in)); + events[0].data.any_value.replace (unk); + + ACE_DEBUG ((LM_DEBUG, "******* push event for source string: %s\n", source_id)); + ACE_DEBUG ((LM_DEBUG, "******* push event for source id: %i\n", events[0].header.source)); + + RtecEventChannelAdmin::ProxyPushConsumer_var proxy_consumer; + + if (this->proxy_consumer_map_.find (source_id, proxy_consumer) != 0) + { + ACE_DEBUG ((LM_DEBUG, + "CIAO (%P|%t) - RTEventService::ciao_push_event, " + "Error in finding the proxy consumer object.\n")); + throw Components::BadEventType (); + } + + proxy_consumer->push (events); + } + + void + RTEventService::create_rt_event_channel (const char * ec_name) + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + if (CIAO::debug_level () > 10) + { + ACE_DEBUG ((LM_DEBUG, "CIAO::EventService_Factory_impl::create_rt_event_channel\n")); + } + + TAO_EC_Default_Factory::init_svcs (); + + TAO_EC_Event_Channel_Attributes attributes (this->root_poa_.in (), + this->root_poa_.in ()); + TAO_EC_Event_Channel * ec_servant = 0; + ACE_NEW (ec_servant, TAO_EC_Event_Channel (attributes)); + ec_servant->activate (); + this->rt_event_channel_ = ec_servant->_this (); + + if (false) + { + // Find the Naming Service. + CORBA::Object_var obj = orb_->resolve_initial_references("NameService"); + CosNaming::NamingContextExt_var root_context = CosNaming::NamingContextExt::_narrow(obj.in()); + + // Bind the Event Channel using Naming Services + CosNaming::Name_var name = root_context->to_name (ec_name); + ACE_DEBUG ((LM_DEBUG, "\nRegister naming: %s\n", ec_name)); + root_context->rebind (name.in(), rt_event_channel_.in()); + } + } + + ::CORBA::Boolean + RTEventService::create_addr_serv ( + const char * name, + ::CORBA::UShort port, + const char * address) + ACE_THROW_SPEC (( + ::CORBA::SystemException)) + { + ACE_DEBUG ((LM_ERROR, "Create an address server using port [%d]\n", port)); + + // Initialize the address server with the desired address. + // This will be used by the sender object and the multicast + // receiver. + ACE_INET_Addr send_addr (port, address); + + SimpleAddressServer * addr_srv_impl = new SimpleAddressServer (send_addr); + + PortableServer::ObjectId_var addr_srv_oid = + this->root_poa_->activate_object (addr_srv_impl); + CORBA::Object_var addr_srv_obj = + this->root_poa_->id_to_reference (addr_srv_oid.in()); + RtecUDPAdmin::AddrServer_var addr_srv = + RtecUDPAdmin::AddrServer::_narrow (addr_srv_obj.in()); + +/* + // First we convert the string into an INET address, then we + // convert that into the right IDL structure: + ACE_INET_Addr udp_addr (address); + ACE_DEBUG ((LM_DEBUG, + "udp mcast address is: %s\n", + address)); + RtecUDPAdmin::UDP_Addr addr; + addr.ipaddr = udp_addr.get_ip_address (); + addr.port = udp_addr.get_port_number (); + + // Now we create and activate the servant + SimpleAddressServer as_impl (addr); + RtecUDPAdmin::AddrServer_var addr_srv = + as_impl._this (); +*/ + + this->addr_serv_map_.bind ( + name, + RtecUDPAdmin::AddrServer::_duplicate (addr_srv.in ())); + + + return true; + } + + ::CORBA::Boolean + RTEventService::create_sender ( + const char * addr_serv_id) + ACE_THROW_SPEC (( + ::CORBA::SystemException)) + { + ACE_DEBUG ((LM_DEBUG, "Create a Sender object with addr_serv_id: %s\n",addr_serv_id )); + + // We need a local socket to send the data, open it and check + // that everything is OK: + TAO_ECG_Refcounted_Endpoint endpoint(new TAO_ECG_UDP_Out_Endpoint); + if (endpoint->dgram ().open (ACE_Addr::sap_any) == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, "Cannot open send endpoint\n"), + 1); + } + + RtecUDPAdmin::AddrServer_var addr_srv; + if (this->addr_serv_map_.find (addr_serv_id, addr_srv) != 0) + return false; + + // Now we setup the sender: + TAO_EC_Servant_Var<TAO_ECG_UDP_Sender> sender = TAO_ECG_UDP_Sender::create(); + sender->init (this->rt_event_channel_.in (), + addr_srv.in (), + endpoint); + + // Setup the subscription and connect to the EC + ACE_ConsumerQOS_Factory cons_qos_fact; + cons_qos_fact.start_disjunction_group (); + cons_qos_fact.insert (ACE_ES_EVENT_SOURCE_ANY, ACE_ES_EVENT_ANY, 0); + RtecEventChannelAdmin::ConsumerQOS sub = cons_qos_fact.get_ConsumerQOS (); + sub.is_gateway = 1; + sender->connect (sub); + + return true; + } + + ::CORBA::Boolean + RTEventService::create_receiver ( + const char * addr_serv_id, + ::CORBA::Boolean is_multicast, + ::CORBA::UShort listen_port) + ACE_THROW_SPEC (( + ::CORBA::SystemException)) + { + ACE_DEBUG ((LM_DEBUG, "Create a receiver object with addr_serv_id: %s\n",addr_serv_id )); + + // Create and initialize the receiver + TAO_EC_Servant_Var<TAO_ECG_UDP_Receiver> receiver = + TAO_ECG_UDP_Receiver::create(); + + // AddressServer is necessary when "multicast" is enabled, but not for "udp" + if (is_multicast) + { + TAO_ECG_UDP_Out_Endpoint endpoint; + if (endpoint.dgram ().open (ACE_Addr::sap_any) == -1) + { + ACE_DEBUG ((LM_ERROR, "Cannot open send endpoint\n")); + return false; + } + + // TAO_ECG_UDP_Receiver::init() takes a TAO_ECG_Refcounted_Endpoint. + // If we don't clone our endpoint and pass &endpoint, the receiver will + // attempt to delete endpoint during shutdown. + TAO_ECG_UDP_Out_Endpoint* clone; + ACE_NEW_RETURN (clone, + TAO_ECG_UDP_Out_Endpoint (endpoint), + false); + + RtecUDPAdmin::AddrServer_var addr_srv; + + if (this->addr_serv_map_.find (addr_serv_id, addr_srv) != 0) + return false; + + receiver->init (this->rt_event_channel_.in (), + clone, + addr_srv.in ()); + } + else + { + receiver->init (this->rt_event_channel_.in (), 0, 0); + } + + // Setup the registration and connect to the event channel + ACE_SupplierQOS_Factory supp_qos_fact; + supp_qos_fact.insert (ACE_ES_EVENT_SOURCE_ANY, ACE_ES_EVENT_ANY, 0, 1); + RtecEventChannelAdmin::SupplierQOS pub = supp_qos_fact.get_SupplierQOS (); + receiver->connect (pub); + + // Create the appropriate event handler and register it with the reactor + + if (is_multicast) + { + auto_ptr<TAO_ECG_Mcast_EH> mcast_eh (new TAO_ECG_Mcast_EH (receiver.in())); + mcast_eh->reactor (this->orb_->orb_core ()->reactor ()); + mcast_eh->open (this->rt_event_channel_.in()); + mcast_eh.release(); + } + else + { + ACE_DEBUG ((LM_DEBUG, "\nUDP Event Handler Port [%d]\n", listen_port)); + + //auto_ptr<TAO_ECG_UDP_EH> udp_eh (new TAO_ECG_UDP_EH (receiver.in())); + TAO_ECG_UDP_EH * udp_eh = new TAO_ECG_UDP_EH (receiver.in()); + + udp_eh->reactor (this->orb_->orb_core ()->reactor ()); + + ACE_INET_Addr local_addr (listen_port); + if (udp_eh->open (local_addr) == -1) + { + ACE_DEBUG ((LM_ERROR, "Cannot open event handler on port [%d]\n", listen_port)); + return false; + } + //udp_eh.release (); + } + + return true; + } + + + ::RtecEventChannelAdmin::EventChannel_ptr + RTEventService::tao_rt_event_channel () + ACE_THROW_SPEC ((::CORBA::SystemException)) + { + return this->rt_event_channel_.in (); + } + + ////////////////////////////////////////////////////////////////////// + /// Supplier Servant Implementation + ////////////////////////////////////////////////////////////////////// + + RTEventServiceSupplier_impl::RTEventServiceSupplier_impl ( + PortableServer::POA_ptr poa) : + poa_ (PortableServer::POA::_duplicate (poa)) + { + } + + void + RTEventServiceSupplier_impl::disconnect_push_supplier (void) + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + PortableServer::ObjectId_var oid = this->poa_->servant_to_id (this); + this->poa_->deactivate_object (oid); + this->_remove_ref (); + } + + ////////////////////////////////////////////////////////////////////// + /// Consumer Servant Implementation + ////////////////////////////////////////////////////////////////////// + + RTEventServiceConsumer_impl::RTEventServiceConsumer_impl ( + PortableServer::POA_ptr poa, + Components::EventConsumerBase_ptr consumer) : + poa_ (PortableServer::POA::_duplicate (poa)), + event_consumer_ (Components::EventConsumerBase::_duplicate (consumer)) + { + } + + void + RTEventServiceConsumer_impl::push (const RtecEventComm::EventSet& events) + ACE_THROW_SPEC ((CORBA::SystemException)) + { + if (CIAO::debug_level () > 10) + { + ACE_DEBUG ((LM_DEBUG, "CIAO::RTEventServiceConsumer_impl::push\n")); + } + + for (size_t i = 0; i < events.length (); ++i) + { + std::ostringstream out; + out << "Received event," + << " type: " << events[i].header.type + << " source: " << events[i].header.source; + + ACE_OS::printf("%s\n", out.str().c_str()); // printf is synchronized + + Components::EventBase * ev = 0; + try + { + TAO::Unknown_IDL_Type *unk = + dynamic_cast<TAO::Unknown_IDL_Type *> (events[i].data.any_value.impl ()); + TAO_InputCDR for_reading (unk->_tao_get_cdr ()); + + if (for_reading >> ev) + { + ev->_add_ref (); + this->event_consumer_->push_event (ev); + } + else + { + ACE_ERROR ((LM_ERROR, "CIAO::RTEventServiceConsumer_impl::push(), " + "failed to extract event\n")); + } + } + catch (const CORBA::Exception& ex) + { + ACE_ERROR ((LM_ERROR, + "CORBA EXCEPTION caught\n")); + ex._tao_print_exception ("RTEventServiceConsumer_impl::push()\n"); + } + } + + } + + void + RTEventServiceConsumer_impl::disconnect_push_consumer (void) + ACE_THROW_SPEC ((CORBA::SystemException)) + { + if (CIAO::debug_level () > 10) + { + ACE_DEBUG ((LM_DEBUG, "CIAO::RTEventServiceConsumer_impl::disconnect_push_consumer\n")); + } + + PortableServer::ObjectId_var oid = this->poa_->servant_to_id (this); + this->poa_->deactivate_object (oid); + this->_remove_ref (); + } + + + ////////////////////////////////////////////////////////////////////// + /// Supplier Config Implementation + ////////////////////////////////////////////////////////////////////// + + RTEvent_Supplier_Config_impl::RTEvent_Supplier_Config_impl (PortableServer::POA_ptr poa) : + service_type_ (RTEC), + poa_ (PortableServer::POA::_duplicate (poa)) + { + } + + RTEvent_Supplier_Config_impl::~RTEvent_Supplier_Config_impl (void) + { + if (CIAO::debug_level () > 10) + { + ACE_DEBUG + ((LM_DEBUG, "RTEvent_Supplier_Config_impl::~RTEvent_Supplier_Config_impl\n")); + } + } + + void + RTEvent_Supplier_Config_impl::supplier_id ( + const char * supplier_id) + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + if (CIAO::debug_level () > 10) + { + ACE_DEBUG ((LM_DEBUG, "supplier's id: %s\n", supplier_id)); + } + + this->supplier_id_ = supplier_id; + + ACE_Hash<ACE_CString> hasher; + + RtecEventComm::EventSourceID source_id = + hasher (this->supplier_id_.c_str ()); +/* + this->qos_.insert (source_id, + ACE_ES_EVENT_ANY, + 0, + 1); + +*/ + + this->qos_.insert (ACE_ES_EVENT_SOURCE_ANY, + ACE_ES_EVENT_ANY, + 0, // handle to the rt_info structure + 1); + + ACE_DEBUG ((LM_DEBUG, "supplier's source id is: %d\n", source_id)); + } + + CONNECTION_ID + RTEvent_Supplier_Config_impl::supplier_id () + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + return CORBA::string_dup (this->supplier_id_.c_str ()); + } + + EventServiceType + RTEvent_Supplier_Config_impl::service_type () + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + return this->service_type_; + } + + RtecEventChannelAdmin::SupplierQOS * + RTEvent_Supplier_Config_impl::rt_event_qos () + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + RtecEventChannelAdmin::SupplierQOS * supplier_qos = 0; + ACE_NEW_RETURN (supplier_qos, + RtecEventChannelAdmin::SupplierQOS (this->qos_.get_SupplierQOS ()), + 0); + return supplier_qos; + } + + void + RTEvent_Supplier_Config_impl::destroy () + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + PortableServer::ObjectId_var oid = this->poa_->servant_to_id (this); + this->poa_->deactivate_object (oid); + this->_remove_ref (); + } + + ////////////////////////////////////////////////////////////////////// + /// Consumer Config Implementation + ////////////////////////////////////////////////////////////////////// + + RTEvent_Consumer_Config_impl::RTEvent_Consumer_Config_impl (PortableServer::POA_ptr poa) : + service_type_ (RTEC), + poa_ (PortableServer::POA::_duplicate (poa)) + { + } + + RTEvent_Consumer_Config_impl::~RTEvent_Consumer_Config_impl (void) + { + ACE_DEBUG + ((LM_DEBUG, "RTEvent_Consumer_Config_impl::~RTEvent_Consumer_Config_impl\n")); + } + + void + RTEvent_Consumer_Config_impl::start_conjunction_group ( + CORBA::Long size) + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + ACE_DEBUG + ((LM_DEBUG, "RTEvent_Consumer_Config_impl::start_conjunction_group\n")); + + this->qos_.start_conjunction_group (size); + } + + void + RTEvent_Consumer_Config_impl::start_disjunction_group ( + CORBA::Long size) + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + // Note, since we only support basic builder here... + if (size == 0L) + this->qos_.start_disjunction_group (); + else + this->qos_.start_disjunction_group (size); + } + + void + RTEvent_Consumer_Config_impl::insert_source ( + const char * source_id) + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + ACE_Hash<ACE_CString> hasher; + RtecEventComm::EventSourceID int_source_id = hasher (source_id); + + ACE_DEBUG ((LM_DEBUG, "******* the source string is: %s\n", source_id)); + ACE_DEBUG ((LM_DEBUG, "******* the source id is: %i\n", int_source_id)); + + this->qos_.insert_source (int_source_id, 0); + } + + void + RTEvent_Consumer_Config_impl::insert_type ( + ::CORBA::Long event_type) + ACE_THROW_SPEC ((::CORBA::SystemException)) + { + if (event_type == 0L) + this->qos_.insert_type (ACE_ES_EVENT_ANY, 0); + else + this->qos_.insert_type (event_type, + 0); + } + + void + RTEvent_Consumer_Config_impl::consumer_id ( + const char * consumer_id) + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + if (CIAO::debug_level () > 10) + { + ACE_DEBUG ((LM_DEBUG, + "RTEvent_Consumer_Config_impl::set_consumer_id:%s\n", + consumer_id)); + } + + this->consumer_id_ = consumer_id; + } + + + void + RTEvent_Consumer_Config_impl::consumer ( + Components::EventConsumerBase_ptr consumer) + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + this->consumer_ = Components::EventConsumerBase::_duplicate (consumer); + } + + CONNECTION_ID + RTEvent_Consumer_Config_impl::consumer_id () + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + return CORBA::string_dup (this->consumer_id_.c_str ()); + } + + + EventServiceType + RTEvent_Consumer_Config_impl::service_type () + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + return this->service_type_; + } + + Components::EventConsumerBase_ptr + RTEvent_Consumer_Config_impl::consumer () + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + if (CIAO::debug_level () > 10) + { + ACE_DEBUG ((LM_DEBUG, "RTEvent_Consumer_Config_impl::get_consumer\n")); + } + + return Components::EventConsumerBase::_duplicate (this->consumer_.in ()); + } + + RtecEventChannelAdmin::ConsumerQOS * + RTEvent_Consumer_Config_impl::rt_event_qos () + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + RtecEventChannelAdmin::ConsumerQOS * consumer_qos = 0; + ACE_NEW_RETURN (consumer_qos, + RtecEventChannelAdmin::ConsumerQOS (this->qos_.get_ConsumerQOS ()), + 0); + + return consumer_qos; + } + + void + RTEvent_Consumer_Config_impl::destroy () + ACE_THROW_SPEC (( + CORBA::SystemException)) + { + if (CIAO::debug_level () > 10) + { + ACE_DEBUG + ((LM_DEBUG, "RTEvent_Consumer_Config_impl::destroy\n")); + } + + PortableServer::ObjectId_var oid = this->poa_->servant_to_id (this); + this->poa_->deactivate_object (oid); + this->_remove_ref (); + } +} diff --git a/modules/CIAO/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.h b/modules/CIAO/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.h new file mode 100644 index 00000000000..7737643b0a4 --- /dev/null +++ b/modules/CIAO/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.h @@ -0,0 +1,359 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file CIAO_RTEvent.h + * + * $Id$ + * + * @author Gan Deng <dengg@dre.vanderbilt.edu> + * @author George Edwards <g.edwards@vanderbilt.edu> + */ +//============================================================================= + +#ifndef CIAO_RTEVENT_H +#define CIAO_RTEVENT_H +#include /**/ "ace/pre.h" + +#include "ace/config-all.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +#pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "CIAO_RTEVENT_Export.h" +#include "ciaosvcs/Events/CIAO_Events_Base/CIAO_EventServiceBase.h" +#include "CIAO_RTEventS.h" + +#include "orbsvcs/orbsvcs/Event_Utilities.h" +#include "orbsvcs/orbsvcs/Event/EC_Event_Channel.h" +#include "orbsvcs/orbsvcs/Event/EC_Default_Factory.h" +#include "orbsvcs/Event/ECG_Mcast_EH.h" +#include "orbsvcs/Event/ECG_UDP_Sender.h" +#include "orbsvcs/Event/ECG_UDP_Receiver.h" +#include "orbsvcs/Event/ECG_UDP_Out_Endpoint.h" +#include "orbsvcs/Event/ECG_UDP_EH.h" +#include "ace/Hash_Map_Manager.h" + +namespace CIAO +{ + + /** + * @class RTEventService + * + * An implementation of EventServiceBase using the RT event channel. + * + * @@ (GD) There should be a place where the deployment tool could + * specify the RT Event Channel service configuration file. + * This should be the place where the RtecEventChannel servant was + * first time initialized. + */ + class CIAO_RTEVENT_Export RTEventService : + public virtual EventServiceBase, + public virtual POA_CIAO::CIAO_RT_Event_Service + { + public: + + RTEventService (CORBA::ORB_ptr orb, + PortableServer::POA_ptr poa, + const char * ec_name); + + virtual ~RTEventService (void); + + virtual Supplier_Config_ptr + create_supplier_config (void) + ACE_THROW_SPEC ((CORBA::SystemException)); + + virtual Consumer_Config_ptr + create_consumer_config (void) + ACE_THROW_SPEC ((CORBA::SystemException)); + + virtual void connect_event_supplier ( + CIAO::Supplier_Config_ptr supplier_config) + ACE_THROW_SPEC (( + CORBA::SystemException)); + + virtual void connect_event_consumer ( + CIAO::Consumer_Config_ptr consumer_config) + ACE_THROW_SPEC (( + CORBA::SystemException)); + + virtual void + disconnect_event_supplier ( + const char * consumer_id) + ACE_THROW_SPEC (( + CORBA::SystemException, + Components::InvalidConnection)); + + virtual void disconnect_event_consumer ( + const char * connection_id) + ACE_THROW_SPEC (( + CORBA::SystemException, + Components::InvalidConnection)); + + virtual void push_event ( + Components::EventBase * ev) + ACE_THROW_SPEC (( + CORBA::SystemException)); + + virtual void ciao_push_event ( + Components::EventBase * evt, + const char * source_id, + CORBA::TypeCode_ptr tc) + ACE_THROW_SPEC (( + ::CORBA::SystemException, + ::Components::BadEventType)); + + virtual ::CORBA::Boolean create_addr_serv ( + const char * name, + ::CORBA::UShort port, + const char * address) + ACE_THROW_SPEC (( + ::CORBA::SystemException)); + + virtual ::CORBA::Boolean create_sender ( + const char * addr_serv_id) + ACE_THROW_SPEC (( + ::CORBA::SystemException)); + + virtual ::CORBA::Boolean create_receiver ( + const char * addr_serv_id, + ::CORBA::Boolean is_multicast, + ::CORBA::UShort listen_port) + ACE_THROW_SPEC (( + ::CORBA::SystemException)); + + virtual ::RtecEventChannelAdmin::EventChannel_ptr tao_rt_event_channel ( + ) + ACE_THROW_SPEC ((::CORBA::SystemException)); + + private: + // @@ (GD) This is the place where use could provide a parameter + // which specifies the event channel service configuration file. + void create_rt_event_channel (const char * ec_name) + ACE_THROW_SPEC (( + CORBA::SystemException)); + + private: + + /// Reference to the ORB + CORBA::ORB_var orb_; + + /// Reference to the Root POA + PortableServer::POA_var root_poa_; + + /** + * @var RtecEventChannelAdmin::EventChannel_var rt_event_channel_ + * + * Reference to the RT event channel. + */ + RtecEventChannelAdmin::EventChannel_var rt_event_channel_; + + /** + * @var ACE_Hash_Map_Manager<> proxy_supplier_map_ + * + * Mapping of each event sink to a proxy supplier for disconnect purposes. + */ + ACE_Hash_Map_Manager_Ex<ACE_CString, + RtecEventChannelAdmin::ProxyPushConsumer_var, + ACE_Hash<ACE_CString>, + ACE_Equal_To<ACE_CString>, + ACE_Null_Mutex> proxy_consumer_map_; + + /** + * @var ACE_Hash_Map_Manager<> proxy_supplier_map_ + * + * Mapping of each event sink to a proxy supplier for disconnect purposes. + */ + ACE_Hash_Map_Manager_Ex<ACE_CString, + RtecEventChannelAdmin::ProxyPushSupplier_var, + ACE_Hash<ACE_CString>, + ACE_Equal_To<ACE_CString>, + ACE_Null_Mutex> proxy_supplier_map_; + + /** + * @var ACE_Hash_Map_Manager<> addr_serv_map_ + * + * A map which managers a set of address servers for event channel + * federation purpose. + */ + ACE_Hash_Map_Manager_Ex<ACE_CString, + RtecUDPAdmin::AddrServer_var, + ACE_Hash<ACE_CString>, + ACE_Equal_To<ACE_CString>, + ACE_Null_Mutex> addr_serv_map_; + + }; + + /** + * @class RTEventServiceSupplier_impl + * + * An implementation of the PushSupplier interface. + */ + class RTEventServiceSupplier_impl : + public virtual POA_RtecEventComm::PushSupplier + { + public: + + RTEventServiceSupplier_impl ( + PortableServer::POA_ptr poa); + + virtual void disconnect_push_supplier () + ACE_THROW_SPEC (( + CORBA::SystemException)); + + private: + + PortableServer::POA_var poa_; + }; + + /** + * @class RTEventServiceConsumer_impl + * + * An implementation of the PushConsumer interface. + */ + class RTEventServiceConsumer_impl : + public virtual POA_RtecEventComm::PushConsumer + { + public: + + RTEventServiceConsumer_impl ( + PortableServer::POA_ptr poa, + Components::EventConsumerBase_ptr consumer); + + virtual void push ( + const RtecEventComm::EventSet& events) + ACE_THROW_SPEC (( + CORBA::SystemException)); + + virtual void disconnect_push_consumer () + ACE_THROW_SPEC (( + CORBA::SystemException)); + + private: + + PortableServer::POA_var poa_; + + Components::EventConsumerBase_var event_consumer_; + }; + + /** + * @class RTEvent_Consumer_Config_impl + * + * Implementation of the RTEvent_Consumer_Config IDL interface that + * configures TAO's RT event channel. An object of this type will be returned + * from @c CIAO::Container::create_consumer_config () when @c RTEC is + * specified as the event service type. + */ + class RTEvent_Consumer_Config_impl : + public virtual POA_CIAO::RTEvent_Consumer_Config, + public virtual Event_Consumer_Config_Base + { + + public: + RTEvent_Consumer_Config_impl (PortableServer::POA_ptr poa); + + virtual ~RTEvent_Consumer_Config_impl (void); + + virtual void start_conjunction_group (CORBA::Long size) + ACE_THROW_SPEC ((CORBA::SystemException)); + + virtual void start_disjunction_group (CORBA::Long size) + ACE_THROW_SPEC ((CORBA::SystemException)); + + virtual void insert_source (const char * source_id) + ACE_THROW_SPEC ((CORBA::SystemException)); + + virtual void insert_type (::CORBA::Long event_type) + ACE_THROW_SPEC ((::CORBA::SystemException)); + + virtual void consumer_id (const char * consumer_id) + ACE_THROW_SPEC ((CORBA::SystemException)); + + virtual CONNECTION_ID consumer_id () + ACE_THROW_SPEC ((CORBA::SystemException)); + + //virtual void supplier_id (const char * supplier_id) + // ACE_THROW_SPEC ((CORBA::SystemException)); + + //virtual CONNECTION_ID supplier_id () + // ACE_THROW_SPEC ((CORBA::SystemException)); + + virtual void consumer (Components::EventConsumerBase_ptr consumer) + ACE_THROW_SPEC ((CORBA::SystemException)); + + virtual Components::EventConsumerBase_ptr consumer () + ACE_THROW_SPEC ((CORBA::SystemException)); + + virtual EventServiceType service_type () + ACE_THROW_SPEC ((CORBA::SystemException)); + + //@@ (GD) There should be a place where the deployment tool could + // set up the rt_event_qos properties for Consumer Config. + + virtual RtecEventChannelAdmin::ConsumerQOS * rt_event_qos () + ACE_THROW_SPEC ((CORBA::SystemException)); + + virtual void destroy () + ACE_THROW_SPEC ((CORBA::SystemException)); + + private: + + ACE_CString consumer_id_; + + Components::EventConsumerBase_var consumer_; + + EventServiceType service_type_; + + ACE_ConsumerQOS_Factory qos_; + + PortableServer::POA_var poa_; + }; + + /** + * @class RTEvent_Supplier_Config_impl + * + * Implementation of the RTEvent_Supplier_Config IDL interface that + * configures TAO's RT event channel. An object of this type will be returned + * from @c CIAO::Container::create_supplier_config () when @c RTEC is + * specified as the event service type. + */ + class RTEvent_Supplier_Config_impl : + public virtual POA_CIAO::RTEvent_Supplier_Config + { + public: + RTEvent_Supplier_Config_impl (PortableServer::POA_ptr poa); + + virtual ~RTEvent_Supplier_Config_impl (void); + + void supplier_id (const char * supplier_id) + ACE_THROW_SPEC ((CORBA::SystemException)); + + CONNECTION_ID supplier_id () + ACE_THROW_SPEC ((CORBA::SystemException)); + + EventServiceType service_type () + ACE_THROW_SPEC ((CORBA::SystemException)); + + //@@ (GD) There should be a place where the deployment tool could + // set up the rt_event_qos properties for Supplier Config. + + RtecEventChannelAdmin::SupplierQOS * rt_event_qos () + ACE_THROW_SPEC ((CORBA::SystemException)); + + virtual void destroy () + ACE_THROW_SPEC ((CORBA::SystemException)); + + private: + ACE_CString supplier_id_; + + EventServiceType service_type_; + + ACE_SupplierQOS_Factory qos_; + + PortableServer::POA_var poa_; + }; +} + +#include /**/ "ace/post.h" +#endif /* CIAO_RTEVENT_H */ diff --git a/modules/CIAO/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.idl b/modules/CIAO/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.idl new file mode 100644 index 00000000000..072bf1f89a1 --- /dev/null +++ b/modules/CIAO/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.idl @@ -0,0 +1,54 @@ +// $Id$ + +/** + * @file CIAO_RTEvent.idl + * + * @author Gan Deng <dengg@dre.vanderbilt.edu> + * @author George Edwards + * + * @brief Interfaces for configuring CIAO's RT event channel. + */ + +#include <ciaosvcs/Events/CIAO_Events_Base/CIAO_Events.idl> +#include <orbsvcs/orbsvcs/RtecEventChannelAdmin.idl> + +module CIAO +{ + interface RTEvent_Consumer_Config : + Consumer_Config + { + //void start_logical_and_group (in long size); + + //void start_negation (); + + //void insert_bitmasked_value (in long source_mask, + // in long type_mask, + // in long source_value, + // in long type_value); + + readonly attribute RtecEventChannelAdmin::ConsumerQOS rt_event_qos; + }; + + interface RTEvent_Supplier_Config : + Supplier_Config + { + readonly attribute RtecEventChannelAdmin::SupplierQOS rt_event_qos; + }; + + interface CIAO_RT_Event_Service : + CIAO_Event_Service + { + boolean create_addr_serv (in string name, + in unsigned short port, + in string address); + + boolean create_sender (in string addr_serv_id); + + boolean create_receiver (in string addr_serv_id, + in boolean is_multicast, + in unsigned short listen_port); + + RtecEventChannelAdmin::EventChannel tao_rt_event_channel (); + }; + +}; diff --git a/modules/CIAO/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.mpc b/modules/CIAO/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.mpc new file mode 100644 index 00000000000..40ba0138f4b --- /dev/null +++ b/modules/CIAO/ciaosvcs/Events/CIAO_RTEC/CIAO_RTEvent.mpc @@ -0,0 +1,27 @@ +// -*- MPC -*- +// $Id$ + +project (CIAO_RTEvent) : naming, rtevent_serv, ciao_events_base_dnc { + + sharedname = CIAO_RTEvent + idlflags += -Wb,export_include=CIAO_RTEVENT_Export.h -Wb,export_macro=CIAO_RTEVENT_Export + dynamicflags = CIAO_RTEVENT_BUILD_DLL + + IDL_Files { + CIAO_RTEvent.idl + } + + Source_Files { + CIAO_RTEvent.cpp + CIAO_RTEventC.cpp + CIAO_RTEventS.cpp + SimpleAddressServer.cpp + } + + Header_Files { + CIAO_RTEvent.h + CIAO_RTEventC.h + CIAO_RTEventS.h + SimpleAddressServer.h + } +} diff --git a/modules/CIAO/ciaosvcs/Events/CIAO_RTEC/SimpleAddressServer.cpp b/modules/CIAO/ciaosvcs/Events/CIAO_RTEC/SimpleAddressServer.cpp new file mode 100644 index 00000000000..cdce7218291 --- /dev/null +++ b/modules/CIAO/ciaosvcs/Events/CIAO_RTEC/SimpleAddressServer.cpp @@ -0,0 +1,37 @@ +/* $Id$ */ + +#include "SimpleAddressServer.h" +#include <ace/INET_Addr.h> +#include <ace/OS_NS_string.h> + +SimpleAddressServer::SimpleAddressServer (const ACE_INET_Addr& address) { + this->address_.ipaddr = address.get_ip_address (); + this->address_.port = address.get_port_number (); +} + +SimpleAddressServer::SimpleAddressServer (const RtecUDPAdmin::UDP_Addr& addr) + : address_ (addr) +{ +} + +void +SimpleAddressServer::get_addr (const RtecEventComm::EventHeader&, + RtecUDPAdmin::UDP_Addr& address) + throw (CORBA::SystemException) { + address = this->address_; +} +/* +void +SimpleAddressServer::get_ip_address (const RtecEventComm::EventHeader&, + RtecUDPAdmin::UDP_IP_Address_out address) + throw (CORBA::SystemException) { + address = new RtecUDPAdmin::UDP_IP_Address; + + ACE_INET_Addr x (this->address_.port, + static_cast<ACE_UINT32>(this->address_.ipaddr)); + address->length (x.get_addr_size ()); + ACE_OS::memcpy (address->get_buffer (), + x.get_addr (), + x.get_addr_size ()); +} +*/ diff --git a/modules/CIAO/ciaosvcs/Events/CIAO_RTEC/SimpleAddressServer.h b/modules/CIAO/ciaosvcs/Events/CIAO_RTEC/SimpleAddressServer.h new file mode 100644 index 00000000000..5a69d3171dc --- /dev/null +++ b/modules/CIAO/ciaosvcs/Events/CIAO_RTEC/SimpleAddressServer.h @@ -0,0 +1,38 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file SimpleAddressServer.h + * + * $Id$ + * + * @author Gan Deng <dengg@dre.vanderbilt.edu> + */ +//============================================================================= + +#ifndef SIMPLEADDRESSSERVER_H +#define SIMPLEADDRESSSERVER_H + +#include <orbsvcs/RtecUDPAdminS.h> + +class SimpleAddressServer : public POA_RtecUDPAdmin::AddrServer { +public: + SimpleAddressServer (const ACE_INET_Addr& address); + + SimpleAddressServer (const RtecUDPAdmin::UDP_Addr& addr); + + virtual void get_addr (const RtecEventComm::EventHeader& header, + RtecUDPAdmin::UDP_Addr& address) + throw (CORBA::SystemException); + + +/* + virtual void get_ip_address (const RtecEventComm::EventHeader& header, + RtecUDPAdmin::UDP_IP_Address_out address) + throw (CORBA::SystemException); +*/ +private: + RtecUDPAdmin::UDP_Addr address_; +}; + +#endif |