diff options
Diffstat (limited to 'TAO/orbsvcs/tests/Event/Mcast/Two_Way/application.cpp')
-rw-r--r-- | TAO/orbsvcs/tests/Event/Mcast/Two_Way/application.cpp | 666 |
1 files changed, 666 insertions, 0 deletions
diff --git a/TAO/orbsvcs/tests/Event/Mcast/Two_Way/application.cpp b/TAO/orbsvcs/tests/Event/Mcast/Two_Way/application.cpp new file mode 100644 index 00000000000..ce7c85cdd95 --- /dev/null +++ b/TAO/orbsvcs/tests/Event/Mcast/Two_Way/application.cpp @@ -0,0 +1,666 @@ +// $Id$ + +#include "Constants.h" + +#include "orbsvcs/Event_Utilities.h" +#include "orbsvcs/Event/EC_Lifetime_Utils_T.h" +#include "orbsvcs/Event/ECG_UDP_Sender.h" +#include "orbsvcs/Event/ECG_UDP_Receiver.h" + +#include "orbsvcs/RtecEventChannelAdminC.h" +#include "orbsvcs/RtecEventCommS.h" + +#include "tao/ORB_Core.h" + +#include "ace/Array_Base.h" +#include "ace/Get_Opt.h" +#include "ace/Reactor.h" +#include "ace/OS_NS_unistd.h" +#include "ace/os_include/os_netdb.h" + +// Indicates whether this application is responsible for destroying +// the Event Channel it's using upon exit. +int destroy_ec_flag = 0; + +/** + * @class Heartbeat_Application + * + * @brief A simple application for testing federation of Event + * Channels via multicast. + * + * NOTE: Contains platform-specific code (event data), i.e., + * might not work cross-platform. + * + * This class acts both as a receiver and a supplier of HEARTBEAT events + * to a multicast-federated Event Channel. After sending a prespecified + * number of heartbeat events, it prints out a summary about received + * heartbeats and shuts down. + */ +class Heartbeat_Application : + public POA_RtecEventComm::PushConsumer, + public TAO_EC_Deactivated_Object +{ +public: + + /// Constructor. + Heartbeat_Application (void); + + /// Destructor. + ~Heartbeat_Application (void); + + // Initializes the object: connects with EC as a supplier and a + // consumer and registers with reactor for timeouts. If init () + // completes successfully, shutdown () must be called when this + // object is no longer needed, for proper resource cleanup. (This + // is normally done by handle_timeout() method, but if handle_timeout() + // will not have a chance to execute, it is the responsibility of + // the user.) + void init (CORBA::ORB_var orb, + RtecEventChannelAdmin::EventChannel_var ec + ACE_ENV_ARG_DECL); + + // No-op if the object hasn't been fully initialized. Otherwise, + // deregister from reactor and poa, destroy ec or just disconnect from it + // (based on <destroy_ec> flag), and shut down the orb. + void shutdown (void); + + /// Send another heartbeat or, if we already sent/attempted the required + /// number of heartbeats, perform shutdown(). + int handle_timeout (const ACE_Time_Value& tv, + const void* act); + + /// PushConsumer methods. + //@{ + /// Update our <heartbeats_> database to reflect newly received heartbeats. + virtual void push (const RtecEventComm::EventSet &events + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC((CORBA::SystemException)); + + /// Initiate shutdown(). + virtual void disconnect_push_consumer (ACE_ENV_SINGLE_ARG_DECL) + ACE_THROW_SPEC((CORBA::SystemException)); + //@} + +private: + + /** + * @class Timeout_Handler + * + * @brief Helper class for receiving timeouts from Reactor. + */ + class Timeout_Handler : public ACE_Event_Handler + { + public: + /// Constructor. + Timeout_Handler (Heartbeat_Application *recv); + /// Reactor callback. + virtual int handle_timeout (const ACE_Time_Value& tv, + const void* act); + private: + /// We callback to this object when a message arrives. + Heartbeat_Application* receiver_; + }; + + /// Helpers. + //@{ + /// Verify that arguments are not nil and store their values. + int check_args (CORBA::ORB_var orb, + RtecEventChannelAdmin::EventChannel_var ec); + /// Connects to EC as a supplier. + void connect_as_supplier (ACE_ENV_SINGLE_ARG_DECL); + /// Connects to EC as a consumer. Activate with default POA. + void connect_as_consumer (ACE_ENV_SINGLE_ARG_DECL); + /// Call destroy() on the EC. Does not propagate exceptions. + void destroy_ec (void); + /// Registers with orb's reactor for timeouts ocurring every 0.5 + /// seconds. Returns 0 on success, -1 on error. + int register_for_timeouts (void); + /// Deregister from reactor. + void stop_timeouts (void); + //@} + + /// Flag indicating whether this object has been fully initialized. + int initialized_; + + /// Helper object for receiving timeouts from Reactor. + Timeout_Handler timeout_handler_; + + /// Number of heartbeats we sent so far. + size_t n_timeouts_; + + /// Info we keep on each HEARTBEAT source. + typedef struct { + pid_t pid; + char hostname [MAXHOSTNAMELEN]; + int total; + } HEARTBEAT_SOURCE_ENTRY; + + /// Stores info on all heartbeats we received so far. + ACE_Array_Base<HEARTBEAT_SOURCE_ENTRY> heartbeats_; + + /// Our identity: pid followed by hostname. We include this info into each + /// heartbeat we send. + char hostname_and_pid_ [MAXHOSTNAMELEN+11]; + + /// ORB and EC pointers - to allow cleanup down the road. + CORBA::ORB_var orb_; + RtecEventChannelAdmin::EventChannel_var ec_; + + /// Consumer proxy which represents us in EC as a supplier. + RtecEventChannelAdmin::ProxyPushConsumer_var consumer_; + + typedef TAO_EC_Auto_Command<TAO_ECG_UDP_Sender_Disconnect_Command> + Supplier_Proxy_Disconnect; + typedef TAO_EC_Auto_Command<TAO_ECG_UDP_Receiver_Disconnect_Command> + Consumer_Proxy_Disconnect; + + /// Manages our connection to Supplier Proxy. + Supplier_Proxy_Disconnect supplier_proxy_disconnect_; + /// Manages our connection to Consumer Proxy. + Consumer_Proxy_Disconnect consumer_proxy_disconnect_; +}; +// ************************************************************************** + +Heartbeat_Application::Timeout_Handler:: +Timeout_Handler (Heartbeat_Application* r) + : receiver_ (r) +{ +} + +int +Heartbeat_Application::Timeout_Handler:: +handle_timeout (const ACE_Time_Value& tv, + const void* act) +{ + return this->receiver_->handle_timeout (tv, act); +} + +// ************************************************************************** + +Heartbeat_Application::Heartbeat_Application (void) + : initialized_ (0) + , timeout_handler_ (this) + , n_timeouts_ (0) + , orb_ () + , ec_ () + , consumer_ () + , supplier_proxy_disconnect_ () + , consumer_proxy_disconnect_ () +{ +} + +Heartbeat_Application::~Heartbeat_Application (void) +{ +} + +int +Heartbeat_Application::check_args (CORBA::ORB_var orb, + RtecEventChannelAdmin::EventChannel_var ec) +{ + if (CORBA::is_nil (ec.in ())) + { + ACE_ERROR_RETURN ((LM_ERROR, + "%N (%l): Nil ec argument to " + "Heartbeat_Application::init\n"), + -1); + } + + if (CORBA::is_nil (orb.in ())) + { + ACE_ERROR_RETURN ((LM_ERROR, + "%N (%l): Nil orb argument to " + "Heartbeat_Application::init\n"), + -1); + } + + this->ec_ = ec; + this->orb_ = orb; + + return 0; +} + +void +Heartbeat_Application::init (CORBA::ORB_var orb, + RtecEventChannelAdmin::EventChannel_var ec + ACE_ENV_ARG_DECL) +{ + // Verify arguments. + if (this->check_args (orb, ec) == -1) + { + ACE_THROW (CORBA::INTERNAL ()); + } + + // Get hostname & process id, i.e., identity of this application. + pid_t pid = ACE_OS::getpid (); + ACE_OS::memcpy (this->hostname_and_pid_, + &pid, + sizeof (pid)); + + if (gethostname (this->hostname_and_pid_ + sizeof (pid), + MAXHOSTNAMELEN) + != 0) + { + ACE_ERROR ((LM_ERROR, + "Heartbeat_Application::init - " + "cannot get hostname\n")); + ACE_THROW (CORBA::INTERNAL ()); + } + + // Connect to EC as a supplier. + this->connect_as_supplier (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + // Connect to EC as a consumer. + ACE_TRY + { + this->connect_as_consumer (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + } + ACE_CATCHANY + { + this->consumer_proxy_disconnect_.execute (); + ACE_RE_THROW; + } + ACE_ENDTRY; + ACE_CHECK; + + // Register for reactor timeouts. + if (this->register_for_timeouts () == -1) + { + this->consumer_proxy_disconnect_.execute (); + this->supplier_proxy_disconnect_.execute (); + this->deactivator_.deactivate (); + ACE_THROW (CORBA::INTERNAL ()); + } + + this->initialized_ = 1; +} + +int +Heartbeat_Application::register_for_timeouts (void) +{ + // Schedule timeout every 0.5 seconds, for sending heartbeat events. + ACE_Time_Value timeout_interval (0, 500000); + ACE_Reactor *reactor = this->orb_->orb_core ()->reactor (); + if (!reactor + || reactor->schedule_timer (&this->timeout_handler_, 0, + timeout_interval, + timeout_interval) == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, + "Heartbeat_Application::register_for_timeouts - " + "cannot schedule timer\n"), + -1); + } + + return 0; +} + +void +Heartbeat_Application::stop_timeouts (void) +{ + ACE_Reactor *reactor = this->orb_->orb_core ()->reactor (); + if (!reactor + || reactor->cancel_timer (&this->timeout_handler_) == -1) + { + ACE_ERROR ((LM_ERROR, + "Heartbeat_Application::stop_timeouts - " + "cannot deregister from reactor.\n")); + } +} + +void +Heartbeat_Application::connect_as_supplier (ACE_ENV_SINGLE_ARG_DECL) +{ + // Obtain reference to SupplierAdmin. + RtecEventChannelAdmin::SupplierAdmin_var supplier_admin = + this->ec_->for_suppliers (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + // Obtain ProxyPushConsumer and connect this supplier. + RtecEventChannelAdmin::ProxyPushConsumer_var proxy = + supplier_admin->obtain_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + Consumer_Proxy_Disconnect new_proxy_disconnect (proxy.in ()); + + ACE_SupplierQOS_Factory qos; + qos.insert (SOURCE_ID, HEARTBEAT, 0, 1); + + proxy->connect_push_supplier (RtecEventComm::PushSupplier::_nil (), + qos.get_SupplierQOS () + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + // Update resource managers. + this->consumer_ = proxy._retn (); + this->consumer_proxy_disconnect_.set_command (new_proxy_disconnect); +} + +void +Heartbeat_Application::connect_as_consumer (ACE_ENV_SINGLE_ARG_DECL) +{ + // Activate with poa. + RtecEventComm::PushConsumer_var consumer_ref; + PortableServer::POA_var poa = this->_default_POA (); + + TAO_EC_Object_Deactivator deactivator; + activate (consumer_ref, + poa.in (), + this, + deactivator + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + // Obtain reference to ConsumerAdmin. + RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin = + this->ec_->for_consumers (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + // Obtain ProxyPushSupplier.. + RtecEventChannelAdmin::ProxyPushSupplier_var proxy = + consumer_admin->obtain_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + Supplier_Proxy_Disconnect new_proxy_disconnect (proxy.in ()); + + // Connect this consumer. + ACE_ConsumerQOS_Factory qos; + qos.start_disjunction_group (1); + qos.insert_type (ACE_ES_EVENT_ANY, 0); + proxy->connect_push_consumer (consumer_ref.in (), + qos.get_ConsumerQOS () + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + // Update resource managers. + this->supplier_proxy_disconnect_.set_command (new_proxy_disconnect); + this->set_deactivator (deactivator); +} + +int +Heartbeat_Application::handle_timeout (const ACE_Time_Value&, + const void*) +{ + ACE_TRY_NEW_ENV + { + if (this->n_timeouts_++ < HEARTBEATS_TO_SEND) + { + RtecEventComm::EventSet events (1); + events.length (1); + // Events travelling through gateways must have a ttl count of at + // least 1! + events[0].header.ttl = 1; + events[0].header.type = HEARTBEAT; + events[0].header.source = SOURCE_ID; + + // Store our hostname and process id in the data portion of + // the event. + events[0].data.payload.replace (MAXHOSTNAMELEN+11, + MAXHOSTNAMELEN+11, + (u_char *)this->hostname_and_pid_, + 0); + + this->consumer_->push (events ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + } + else + // We already sent the required number of heartbeats. Time to + // shutdown this app. + { + this->shutdown (); + } + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, + "Suppressed the following exception in " + "Heartbeat_Application::handle_timeout:\n"); + } + ACE_ENDTRY; + return 0; +} + +void +Heartbeat_Application::push (const RtecEventComm::EventSet &events + ACE_ENV_ARG_DECL_NOT_USED) + ACE_THROW_SPEC((CORBA::SystemException)) +{ + for (CORBA::ULong i = 0; i < events.length (); ++i) + { + // Figure out heartbeat source. + const u_char * buffer = events[i].data.payload.get_buffer (); + pid_t pid = *((pid_t*) buffer); + char * host = (char*) buffer + sizeof (pid); + + // Update heartbeat database. + int found = 0; + size_t size = this->heartbeats_.size (); + for (size_t j = 0; j < size; ++j) + { + if (this->heartbeats_[j].pid == pid + && ACE_OS::strcmp (this->heartbeats_[j].hostname, host) + == 0) + { + this->heartbeats_[j].total++; + found = 1; + break; + } + } + // Make new entry in the database. + if (!found) + { + if (this->heartbeats_.size (size + 1) + == -1) + { + ACE_ERROR ((LM_ERROR, + "Unable to add new entry " + "to heartbeat database \n")); + break; + } + + this->heartbeats_[size].pid = pid; + this->heartbeats_[size].total = 1; + ACE_OS::memcpy (this->heartbeats_[size].hostname, + host, + ACE_OS::strlen (host) + 1); + } + } +} + +void +Heartbeat_Application::disconnect_push_consumer (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) + ACE_THROW_SPEC((CORBA::SystemException)) +{ + this->shutdown (); +} + +void +Heartbeat_Application::destroy_ec (void) +{ + if (!CORBA::is_nil (this->ec_.in ())) + { + ACE_TRY_NEW_ENV + { + this->ec_->destroy (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, + "Suppressed the following exception in " + "Application_Heartbeat::destroy_ec\n"); + } + ACE_ENDTRY; + + this->ec_ = RtecEventChannelAdmin::EventChannel::_nil (); + } +} +void +Heartbeat_Application::shutdown (void) +{ + if (!this->initialized_) + return; + + this->initialized_ = 0; + + // Deregister from Reactor. + this->stop_timeouts (); + + // Disconnect from ECs as a consumer. + this->supplier_proxy_disconnect_.execute (); + // Disconnect from EC as a supplier. + this->consumer_proxy_disconnect_.execute (); + + if (destroy_ec_flag) + { + this->destroy_ec (); + } + + // Deregister from POA. + this->deactivator_.deactivate (); + + // Print out heartbeats report. + pid_t pid = ACE_OS::getpid (); + char hostname[MAXHOSTNAMELEN + 1]; + if (gethostname (hostname, MAXHOSTNAMELEN) != 0) + { + ACE_ERROR ((LM_ERROR, + "Heartbeat_Application::shutdown - " + "cannot get hostname\n")); + hostname[0] = '\0'; + } + ACE_DEBUG ((LM_DEBUG, + "%d@%s Received following heartbeats:\n", + pid, hostname)); + for (size_t i = 0; i < this->heartbeats_.size (); ++i) + { + ACE_DEBUG ((LM_DEBUG, + "Host %s, pid %d - total of %u\n", + this->heartbeats_[i].hostname, + this->heartbeats_[i].pid, + this->heartbeats_[i].total)); + } + + // Shutdown the ORB. + ACE_TRY_NEW_ENV + { + this->orb_->shutdown (0 ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, + "The following exception occured in " + "Heartbeat_Application::shutdown:\n"); + } + ACE_ENDTRY; +} + +//////////////////////////////////////////////////////////// +int +check_for_nil (CORBA::Object_ptr obj, const char *message) +{ + if (CORBA::is_nil (obj)) + ACE_ERROR_RETURN ((LM_ERROR, + "ERROR: Object reference <%s> is nil\n", + message), + -1); + else + return 0; +} + +int +parse_args (int argc, char ** argv) +{ + ACE_Get_Opt get_opt (argc, argv, "d"); + int opt; + + while ((opt = get_opt ()) != EOF) + { + switch (opt) + { + case 'd': + destroy_ec_flag = 1; + break; + + case '?': + default: + ACE_DEBUG ((LM_DEBUG, + "Usage: %s " + "-d" + "\n", + argv[0])); + return -1; + } + } + + return 0; +} + +int +main (int argc, char *argv[]) +{ + // We may want this to be alive beyond the next block. + TAO_EC_Servant_Var<Heartbeat_Application> app; + + ACE_TRY_NEW_ENV + { + // Initialize ORB and POA, POA Manager, parse args. + CORBA::ORB_var orb = + CORBA::ORB_init (argc, argv, "" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + if (parse_args (argc, argv) == -1) + return 1; + + CORBA::Object_var obj = + orb->resolve_initial_references ("RootPOA" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + PortableServer::POA_var poa = + PortableServer::POA::_narrow (obj.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + if (check_for_nil (poa.in (), "POA") == -1) + return 1; + + PortableServer::POAManager_var manager = + poa->the_POAManager (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Obtain reference to EC. + obj = orb->resolve_initial_references ("Event_Service" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + RtecEventChannelAdmin::EventChannel_var ec = + RtecEventChannelAdmin::EventChannel::_narrow (obj.in () + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + if (check_for_nil (ec.in (), "EC") == -1) + return 1; + + // Init our application. + app = new Heartbeat_Application; + if (!app.in ()) + return 1; + + app->init (orb, ec ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Allow processing of CORBA requests. + manager->activate (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Receive events from EC. + orb->run (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, + "Exception in Heartbeat Application:"); + // Since there was an exception, application might not have had + // a chance to shutdown. + app->shutdown (); + return 1; + } + ACE_ENDTRY; + + return 0; +} |