diff options
Diffstat (limited to 'TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.h')
-rw-r--r-- | TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.h | 506 |
1 files changed, 506 insertions, 0 deletions
diff --git a/TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.h b/TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.h new file mode 100644 index 00000000000..96f43929d34 --- /dev/null +++ b/TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.h @@ -0,0 +1,506 @@ +// -*- C++ -*- + +// $Id$ + +// +// ============================================================================ +// +// = DESCRIPTION +// This test attempts to communicate several Event Channels UDP +// using multicast. +// The test reads a configuration file that describe what events are +// received by each "Federation". The user must provide, on the +// command line, which federations are present on each process +// (these are called the "Local Federations"). +// The test also creates one supplier for each federation, the +// supplier can send an event of any possible type described in the +// file. + +// = HOW +// The test creates one UDP_Sender for each remote federation, +// this is a PushConsumer that sends the events using UDP +// multicast. +// Notice that there is still a win in using multicast because +// multiple copies of the federation may be present. +// To receive the event the test creates one UDP_Receiver for each +// local federation, it joins to the right multicast groups and +// pushes the events it receives, acting as a PushSupplier. +// +// The UDP_Receiversfederation suppliers Mcast packets as local events +// could observe the changes in the local subscriptions and use that +// to join or leave the multicast groups. +// To demostrate this the test will need to reconfigure its +// subscription list every so often (a few seconds seems like a good +// idea). +// +// = TODO +// +// It is unfortunate that the test must know before-hand the remote +// consumer interests. It would be really simple to use a better +// strategy: the test could "observe" changes in the remote EC +// subscription list, it could then modify its local consumers +// subscriptions. +// +// ============================================================================ + +#ifndef EC_MCAST_H +#define EC_MCAST_H + +#include "ace/SString.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "ace/High_Res_Timer.h" +#include "orbsvcs/RtecEventChannelAdminC.h" +#include "orbsvcs/RtecEventCommS.h" +#include "orbsvcs/Channel_Clients_T.h" +#include "orbsvcs/Event/ECG_UDP_Sender.h" +#include "orbsvcs/Event/EC_UDP_Admin.h" +#include "orbsvcs/Event/ECG_Mcast_EH.h" +#include "orbsvcs/Event/ECG_UDP_Out_Endpoint.h" +#include "orbsvcs/Event/ECG_UDP_Receiver.h" +#include "orbsvcs/Event/ECG_UDP_Sender.h" + +class ECM_Driver; + +class ECM_Federation +{ + // = DESCRIPTION + // The test reads a configuration file where it obtains the data + // about each "federation". A federation is some application, + // distributed over several processes. The potential set of + // publications and the potential set of subscriptions is known + // beforehand, but the actual publications (or subscriptions) may + // change dynamically. + // As stated above the federation may be present in more than one + // process, but also a process may participate in more than one + // federation. + // +public: + ECM_Federation (char* name, + CORBA::UShort mcast_port, + int supplier_types, + char** supplier_names, + int consumer_types, + char** consumer_names); + // Constructor, it assumes ownership of the buffers, strings must be + // allocated using CORBA::string_alloc(), buffers using operator new. + + ~ECM_Federation (void); + // Dtor + + const char* name (void) const; + // The name of the federation.... + + CORBA::UShort mcast_port (void) const; + // The port used by this federation to receive mcast messages. + + int supplier_types (void) const; + // The number of different event types published by this federation. + + const char* supplier_name (CORBA::ULong i) const; + // The name (mcast addr in A.B.C.D format) of the event type <i> + + CORBA::ULong supplier_ipaddr (CORBA::ULong i) const; + // The ipaddr (in host byte order) of the event type <i> + + int consumer_types (void) const; + // The number of different event types consumed by this federation. + + const char* consumer_name (CORBA::ULong i) const; + // The name (mcast addr in A.B.C.D format) of the event type <i> + + CORBA::ULong consumer_ipaddr (CORBA::ULong i) const; + // The ipaddr (in host byte order) of the event type <i> + + void open (TAO_ECG_UDP_Out_Endpoint *endoint, + RtecEventChannelAdmin::EventChannel_ptr ec); + // Connect the UDP sender to the EC. + + void close (void); + // Close the UDP sender, disconnect from the EC + + int sender_local_addr (ACE_INET_Addr& addr); + // Return the sender local address + + RtecUDPAdmin::AddrServer_ptr addr_server (void); + // This address server can be used to convert event headers + // (type,source) to UDP addresses (ipaddr,port) + +private: + char* name_; + CORBA::UShort mcast_port_; + + int supplier_types_; + char** supplier_names_; + CORBA::ULong* supplier_ipaddr_; + + int consumer_types_; + char** consumer_names_; + CORBA::ULong* consumer_ipaddr_; + + TAO_EC_Servant_Var<TAO_ECG_UDP_Sender> sender_; + // The sender + + TAO_EC_Simple_AddrServer addr_server_; + // Resolve event headers (type,source) to UDP addresses + // (ipaddr,port) +}; + +class ECM_Local_Federation; + +class ECM_Supplier : public POA_RtecEventComm::PushSupplier +{ + // + // = TITLE + // Helper class to simulate an application acting as an event + // supplier. + // + // = DESCRIPTION + // This class connects as a consumer for timeouts in the EC. On + // every timeout it delegates on the ECM_Local_Federation class, + // usually this results in some reconfiguration and/or some events + // sent. + // +public: + ECM_Supplier (ECM_Local_Federation* federation); + + void open (const char* name, + RtecEventChannelAdmin::EventChannel_ptr event_channel); + // This method connects the supplier to the EC. + + void close (void); + // Disconnect from the EC. + + void activate (RtecEventChannelAdmin::EventChannel_ptr event_channel, + RtecEventComm::Time interval); + // Connect as a consumer to start receiving events. + + RtecEventComm::EventSourceID supplier_id (void) const; + // The supplier ID. + + void push (const RtecEventComm::EventSet& events); + void disconnect_push_consumer (void); + // Implement the callbacks for our consumer personality. + + // = The POA_RtecEventComm::PushSupplier methods. + virtual void disconnect_push_supplier (void); + +private: + ECM_Local_Federation* federation_; + // To callback the federation. + + RtecEventComm::EventSourceID supplier_id_; + // We generate an id based on the name.... + + RtecEventChannelAdmin::ProxyPushConsumer_var consumer_proxy_; + // We talk to the EC (as a supplier) using this proxy. + + ACE_PushConsumer_Adapter<ECM_Supplier> consumer_; + // We also connect to the EC as a consumer so we can receive the + // timeout events. + + RtecEventChannelAdmin::ProxyPushSupplier_var supplier_proxy_; + // We talk to the EC (as a supplier) using this proxy. +}; + +class ECM_Consumer : public POA_RtecEventComm::PushConsumer +{ + // + // = TITLE + // Helper class to simulate an application acting as an event + // consumer. + // + // = DESCRIPTION + // This class connects as an event consumer to the EC. The events + // are actually handled by the ECM_Local_Federation. +public: + ECM_Consumer (ECM_Local_Federation* federation); + + void open (const char* name, + RtecEventChannelAdmin::EventChannel_ptr event_channel, + ACE_RANDR_TYPE &seed); + // This method connects the consumer to the EC. + + void close (void); + // Disconnect from the EC. + + void connect (ACE_RANDR_TYPE& seed); + void disconnect (void); + // Disconnect from the supplier, but do not forget about it or close + // it. + + // = The POA_RtecEventComm::PushComsumer methods. + virtual void push (const RtecEventComm::EventSet& events); + virtual void disconnect_push_consumer (void); + +private: + ECM_Local_Federation* federation_; + // To callback. + + RtecEventChannelAdmin::ProxyPushSupplier_var supplier_proxy_; + // We talk to the EC using this proxy. + + RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin_; + // We talk to the EC using this proxy. +}; + +class ECM_Local_Federation +{ + // = DESCRIPTION + // This class is used to represent a federation that is actually + // running in this process. + // +public: + ECM_Local_Federation (ECM_Federation *federation, + ECM_Driver *driver); + // Constructor. + ~ECM_Local_Federation (void); + // Destructor + + void open (int event_count, + RtecEventChannelAdmin::EventChannel_ptr event_channel); + // Connect both the supplier and the consumer. + + void close (void); + // Disconnect everybody from the EC + + void activate (RtecEventChannelAdmin::EventChannel_ptr event_channel, + RtecEventComm::Time interval); + // Activate the supplier + + void supplier_timeout (RtecEventComm::PushConsumer_ptr consumer); + // The supplier is ready to send a new event. + + void consumer_push (ACE_hrtime_t arrival, + const RtecEventComm::EventSet& event); + // The consumer just received an event. + + const ECM_Federation *federation (void) const; + // The federation description. + + void open_receiver (RtecEventChannelAdmin::EventChannel_ptr ec, + TAO_ECG_Refcounted_Endpoint ignore_from); + // Connect the UDP receiver to the EC. + + void close_receiver (void); + // Close the UDP receiver, disconnect from the EC + + void dump_results (void) const; + // Report the results back to the user... + + void subscribed_bit (int i, CORBA::Boolean x); + CORBA::Boolean subscribed_bit (int i) const; + // Set&Get the subscribed bit; this defines the subset of events + // that we actually publish. + + // = Delegate on the federation description + const char* name (void) const; + CORBA::UShort mcast_port (void) const; + int supplier_types (void) const; + const char* supplier_name (CORBA::ULong i) const; + CORBA::ULong supplier_ipaddr (CORBA::ULong i) const; + int consumer_types (void) const; + const char* consumer_name (CORBA::ULong i) const; + CORBA::ULong consumer_ipaddr (CORBA::ULong i) const; + +private: + ECM_Federation *federation_; + // The description of the events we send and receive. + + ECM_Driver *driver_; + // The test driver. + + ECM_Consumer consumer_; + ECM_Supplier supplier_; + // The supplier and consumer helper classes, other than + // initialization this classes only forward events to the + // Federation. + + // Collect statistics + + CORBA::ULong recv_count_; + // Messages received. + + CORBA::ULong unfiltered_count_; + // Messages received that were not properly filtered. + + CORBA::ULong invalid_count_; + // Message received that could *not* be destined to this federation, + // yet they were received. + + CORBA::ULong send_count_; + // Messages sent. + + int event_count_; + // How many messages will we send before stop the simulation. + + ACE_Time_Value last_publication_change_; + // The last time we changed our publication list, we don't change it + // too often. + + ACE_Time_Value last_subscription_change_; + // The last time we changed our publication, so we don't change too + // often. + + TAO_EC_Servant_Var<TAO_ECG_UDP_Receiver> receiver_; + // This object reads the events and pushes them into the EC. Notice + // that it can receive events from multiple Event Handlers. + + TAO_ECG_Mcast_EH* mcast_eh_; + // The event handler, it receives callbacks from the reactor + // whenever an event is available in some of the multicast groups, + // it then forwards to the <mcast_recv_> object for processing and + // dispatching of the event. + // @@ TODO Eventually we may need several of this objects to handle + // OS limitations on the number of multicast groups per socket. + + ACE_RANDR_TYPE seed_; + // The seed for a random number generator. + + CORBA::ULong subscription_change_period_; + // The (average) period between subscription changes, in usecs + + CORBA::ULong publication_change_period_; + // The (average) period between publication changes, in usecs + + CORBA::Boolean* subscription_subset_; + // The events we are actually subscribed to. +}; + +class ECM_Driver +{ + // + // = TITLE + // Demonstrate the use of the UDP Gateways. + // + // = DESCRIPTION + // This class is design to exercise several features of the UDP + // Gateways and its companion classes. + // We create a set of processes, each running one EC, with + // multiple consumers and suppliers colocated with the EC. + // The ECs communicate among themselves using multicast. + // The test thus show how to use multicast, change the local + // ECG_UDP_Receiver and ECG_UDP_Sender QoS specifications + // dynamically, how to economically use the OS resources to + // receive and send multicast messages, etc. + // +public: + ECM_Driver (void); + + enum { + MAX_EVENTS = 1024, + // Maximum number of events to send on each Federation. + + MAX_LOCAL_FEDERATIONS = 16, + // Maximum number of federations running on a single process + + MAX_FEDERATIONS = 128 + // Maximum number of federations in the simulation + }; + + int run (int argc, char* argv[]); + // Run the test, read all the configuration files, etc. + + void federation_has_shutdown (ECM_Local_Federation *federation); + // One of the federations has completed its simulation, once all of + // them finish the test exists. + + +private: + void open_federations (RtecEventChannelAdmin::EventChannel_ptr ec); + // Connect the federations to the EC. + + void activate_federations (RtecEventChannelAdmin::EventChannel_ptr ec); + // Activate all the federations + + void close_federations (void); + // Close the federations, i.e. disconnect from the EC, deactivate + // the objects, etc. + + void open_senders (RtecEventChannelAdmin::EventChannel_ptr ec); + // Connect all the senders, so we can start multicasting events. + + void open_receivers (RtecEventChannelAdmin::EventChannel_ptr ec); + // Connect all the receivers, thus we accept events arriving through + // multicast. + + void close_senders (void); + // Close all the senders to cleanup resources. + + void close_receivers (void); + // Close all the receivers to cleanup resources. + + int shutdown (void); + // Called when the main thread. + + int parse_args (int argc, char* argv[]); + // parse the command line arguments + + int parse_config_file (void); + // parse the command line arguments + + int parse_name_list (FILE* file, int n, char** names, + const char* error_msg); + // parse one of the lists of names in the federation definition. + + int skip_blanks (FILE* file, + const char* error_msg); + // skip the blanks in the file. + + void dump_results (void); + // Dump the results to the standard output. + +private: + int event_period_; + // The events are generated using this interval, in microseconds. + + int event_count_; + // How many events will the suppliers send + + char* config_filename_; + // The name of the file where we read the configuration. + + const char* pid_filename_; + // The name of a file where the process stores its pid + + int local_federations_count_; + // How many federations are running in this process (or, if you + // prefer, in how many federations does this process participate). + + ECM_Local_Federation* local_federations_[MAX_LOCAL_FEDERATIONS]; + // The local federations. + + char* local_names_[MAX_LOCAL_FEDERATIONS]; + // The names of the local federations. + + int all_federations_count_; + // The total number of federations we belong to. + + ECM_Federation* all_federations_[MAX_FEDERATIONS]; + // All the federations. + + ACE_Atomic_Op<TAO_SYNCH_MUTEX,CORBA::ULong> federations_running_; + // Keep track of how many federations are active so we can shutdown + // once they are all destroyed. + + ACE_hrtime_t test_start_; + ACE_hrtime_t test_stop_; + // Measure the test elapsed time as well as mark the beginning of + // the frames. + + CORBA::ORB_var orb_; + // The ORB, so we can shut it down. + + TAO_ECG_UDP_Out_Endpoint endpoint_; + // This socket is shared by all the federations to send the + // multicast events. +}; + +#if defined (__ACE_INLINE__) +#include "EC_Mcast.inl" +#endif /* __ACE_INLINE__ */ + +#endif /* EC_MCAST_H */ |