diff options
Diffstat (limited to 'TAO/orbsvcs/tests/Notify/Persistent_Filter')
9 files changed, 1342 insertions, 0 deletions
diff --git a/TAO/orbsvcs/tests/Notify/Persistent_Filter/Filter.cpp b/TAO/orbsvcs/tests/Notify/Persistent_Filter/Filter.cpp new file mode 100644 index 00000000000..3176f89fcdd --- /dev/null +++ b/TAO/orbsvcs/tests/Notify/Persistent_Filter/Filter.cpp @@ -0,0 +1,752 @@ +/* -*- C++ -*- $Id$ */ + +#include "Filter.h" +#include "ace/Get_Opt.h" +#include "ace/OS.h" + +ACE_RCSID(Filter, Filter, "Filter.cpp,v 1.13 2002/05/28 20:24:16 pradeep Exp") + +#define NOTIFY_FACTORY_NAME "NotifyEventChannelFactory" +#define NAMING_SERVICE_NAME "NameService" +#define CA_FILTER "threshold < 20" +#define SA_FILTER "threshold > 10" + +#define MOD_CA_FILTER "threshold < 15" +#define MOD_SA_FILTER "threshold > 10" + +#define TCL_GRAMMAR "TCL" +#define EVENTS_TO_SEND 30 +int EVENTS_EXPECTED_TO_RECEIVE = 9*4; // 2 consumers get the same events from 2 suppliers +#define DOMAIN_NAME "*" +#define TYPE_NAME "*" + + ACE_Atomic_Op <TAO_SYNCH_MUTEX, int> g_result_count = 0; + +FilterClient::FilterClient (void) + :consumer_1 (0), + consumer_2 (0), + supplier_1 (0), + supplier_2 (0), + adminid_1_id_ (0), + adminid_2_id_ (0), + channel_id_ (0), + use_persistent_ (false), + modify_constraint_ (false), + done_ (0) +{ + g_result_count = 0; + // No-Op. + ifgop_ = CosNotifyChannelAdmin::AND_OP; +} + +FilterClient::~FilterClient () +{ +} + + +int +FilterClient::parse_args (int argc, ACE_TCHAR *argv[]) +{ + ACE_Get_Opt opts (argc, argv, ACE_TEXT("pm")); + int c; + + while ((c = opts ()) != -1) + switch (c) + { + case 'p': + this->use_persistent_ = true; + break; + case 'm': + this->modify_constraint_ = true; + EVENTS_EXPECTED_TO_RECEIVE = 4*4; + break; + case '?': + default: + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("usage: %C [-p] [-m]\n"), + argv [0]), + -1); + } + + return 0; +} + +void +FilterClient::init_supplier (int argc, ACE_TCHAR *argv []) +{ + init_ORB (argc, argv); + + if (parse_args (argc, argv) == -1) + { + ACE_OS::exit (1); + } + + resolve_naming_service (); + + resolve_Notify_factory (); + + if (this->use_persistent_) + { + get_EC (); + } + else + { + create_EC (); + } + + if (this->use_persistent_) + { + get_supplieradmin (); + } + else + { + create_supplieradmin (); + } + + create_suppliers (); +} + + +void +FilterClient::init_consumer (int argc, ACE_TCHAR *argv []) +{ + init_ORB (argc, argv); + + if (parse_args (argc, argv) == -1) + { + ACE_OS::exit (1); + } + + resolve_naming_service (); + + resolve_Notify_factory (); + + get_EC (); + + if (this->use_persistent_) + { + get_consumeradmin (); + } + else + { + create_consumeradmin (); + } + + create_consumers (); +} + +void +FilterClient::wait_ready () +{ + while ( 1 ) + { + CosNotifyChannelAdmin::AdminIDSeq_var cons_ids + = ec_->get_all_consumeradmins (); + + CosNotifyChannelAdmin::AdminIDSeq_var sup_ids + = ec_->get_all_supplieradmins (); + + if (cons_ids->length () + sup_ids->length () == 3) + break; + else + ACE_OS::sleep (1); + } +} + + +void +FilterClient::wait_consumer_complete () +{ + int i = 0; + const int TIMEOUT = 30; + + while ( i < TIMEOUT ) + { + try + { + CosNotifyChannelAdmin::AdminIDSeq_var ids + = this->ec_->get_all_consumeradmins(); + + if (ids->length () > 0) + { + ACE_OS::sleep (1); + ++i; + } + else + break; + } + catch (const CORBA::OBJECT_NOT_EXIST&) + { + break; + } + } +} + + +void +FilterClient::run_supplier () +{ + this->wait_ready (); + + //Add delay so consumer won't miss any events. + ACE_OS::sleep (5); + + send_events (); + + this->wait_consumer_complete (); +} + +void +FilterClient::run_consumer () +{ + this->wait_ready (); + + if (g_result_count != EVENTS_EXPECTED_TO_RECEIVE) + { // if we still need to wait for events, run the orb. + while (!this->done_) + if (this->orb_->work_pending ()) + this->orb_->perform_work (); + } + + //ACE_DEBUG ((LM_DEBUG, "(%P|%t)destroying consumer admins ... \n")); + + //this->consumer_admin_1_->destroy (); + //this->consumer_admin_2_->destroy (); +} + +void +FilterClient::done (void) +{ + this->done_ = 1; +} + +void +FilterClient::init_ORB (int& argc, ACE_TCHAR **& argv) +{ + this->orb_ = CORBA::ORB_init (argc, argv); + + + CORBA::Object_ptr poa_object = + this->orb_->resolve_initial_references("RootPOA"); + + if (CORBA::is_nil (poa_object)) + { + ACE_ERROR ((LM_ERROR, + " (%P|%t) Unable to initialize the POA.\n")); + return; + } + this->root_poa_ = + PortableServer::POA::_narrow (poa_object); + + PortableServer::POAManager_var poa_manager = + root_poa_->the_POAManager (); + + poa_manager->activate (); +} + +void +FilterClient::resolve_naming_service () +{ + CORBA::Object_var naming_obj = + this->orb_->resolve_initial_references (NAMING_SERVICE_NAME); + + // Need to check return value for errors. + if (CORBA::is_nil (naming_obj.in ())) + throw CORBA::UNKNOWN (); + + this->naming_context_ = + CosNaming::NamingContext::_narrow (naming_obj.in ()); +} + +void +FilterClient::resolve_Notify_factory () +{ + CosNaming::Name name (1); + name.length (1); + name[0].id = CORBA::string_dup (NOTIFY_FACTORY_NAME); + + CORBA::Object_var obj = + this->naming_context_->resolve (name); + + this->notify_factory_ = + CosNotifyChannelAdmin::EventChannelFactory::_narrow (obj.in ()); +} + +void +FilterClient::create_EC () +{ + ec_ = notify_factory_->create_channel (initial_qos_, + initial_admin_, + channel_id_); + + ACE_ASSERT (!CORBA::is_nil (ec_.in ())); +} + +void +FilterClient::get_EC () +{ + int i = 0; + const int TIMEOUT = 20; + while (i < TIMEOUT) + { + CosNotifyChannelAdmin::ChannelIDSeq_var ids + = notify_factory_->get_all_channels (); + + if (ids->length () > 0) + { + //ACE_DEBUG ((LM_DEBUG, "(%P|%t)get_EC %d \n", ids->length ())); + ec_ = notify_factory_->get_event_channel (ids[0]); + + ACE_ASSERT (!CORBA::is_nil (ec_.in ())); + break; + } + else + { + ++ i; + ACE_OS::sleep (1); + } + } +} + +void +FilterClient::create_supplieradmin () +{ + CosNotifyChannelAdmin::AdminID adminid = 0; + + supplier_admin_ = + ec_->new_for_suppliers (this->ifgop_, adminid); + + ACE_ASSERT (!CORBA::is_nil (supplier_admin_.in ())); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t)create_supplieradmin %d \n"), adminid)); + + CosNotifyFilter::FilterFactory_var ffact = + ec_->default_filter_factory (); + + // setup a filter at the consumer admin + CosNotifyFilter::Filter_var sa_filter = + ffact->create_filter (TCL_GRAMMAR); + + ACE_ASSERT (!CORBA::is_nil (sa_filter.in ())); + + CosNotifyFilter::ConstraintExpSeq constraint_list (2); + constraint_list.length (2); + + constraint_list[0].event_types.length (1); + constraint_list[0].event_types[0].domain_name = CORBA::string_dup(DOMAIN_NAME); + constraint_list[0].event_types[0].type_name = CORBA::string_dup(TYPE_NAME); + + constraint_list[0].constraint_expr = CORBA::string_dup (SA_FILTER); + constraint_list[1].event_types.length (1); + constraint_list[1].event_types[0].domain_name = CORBA::string_dup(DOMAIN_NAME); + constraint_list[1].event_types[0].type_name = CORBA::string_dup(TYPE_NAME); + constraint_list[1].constraint_expr = CORBA::string_dup (SA_FILTER); + + sa_filter->add_constraints (constraint_list); + + supplier_admin_->add_filter (sa_filter.in ()); +} + + +void +FilterClient::get_supplieradmin () +{ + CosNotifyChannelAdmin::AdminIDSeq_var ids + = ec_->get_all_supplieradmins(); + + ACE_ASSERT (ids->length () == 1); + + supplier_admin_ = ec_->get_supplieradmin (ids[0]); + + ACE_ASSERT (!CORBA::is_nil (supplier_admin_.in ())); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t)get_supplieradmin %d \n"), ids[0])); + + CosNotifyFilter::FilterAdmin_var admin + = CosNotifyFilter::FilterAdmin::_narrow (supplier_admin_.in ()); + verify_filter (admin, SA_FILTER, MOD_SA_FILTER); +} + + +void +FilterClient::create_consumeradmin () +{ + consumer_admin_1_ = + ec_->new_for_consumers (this->ifgop_, this->adminid_1_id_); + + ACE_ASSERT (!CORBA::is_nil (consumer_admin_1_.in ())); + + consumer_admin_2_ = + ec_->new_for_consumers (this->ifgop_, this->adminid_2_id_); + + ACE_ASSERT (!CORBA::is_nil (consumer_admin_2_.in ())); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t)create_consumeradmin %d %d\n"), + adminid_1_id_, adminid_2_id_)); + + CosNotifyFilter::FilterFactory_var ffact = + ec_->default_filter_factory (); + + // setup a filter at the consumer admin + CosNotifyFilter::Filter_var ca_filter_1 = + ffact->create_filter (TCL_GRAMMAR); + + ACE_ASSERT (!CORBA::is_nil (ca_filter_1.in ())); + + // setup a filter at the consumer admin + CosNotifyFilter::Filter_var ca_filter_2 = + ffact->create_filter (TCL_GRAMMAR); + + ACE_ASSERT (!CORBA::is_nil (ca_filter_2.in ())); + + /* struct ConstraintExp { + CosNotification::EventTypeSeq event_types; + string constraint_expr; + }; + */ + CosNotifyFilter::ConstraintExpSeq constraint_list (2); + constraint_list.length (2); + + constraint_list[0].event_types.length (1); + constraint_list[0].event_types[0].domain_name = CORBA::string_dup(DOMAIN_NAME); + constraint_list[0].event_types[0].type_name = CORBA::string_dup(TYPE_NAME); + + constraint_list[0].constraint_expr = CORBA::string_dup (CA_FILTER); + constraint_list[1].event_types.length (1); + constraint_list[1].event_types[0].domain_name = CORBA::string_dup(DOMAIN_NAME); + constraint_list[1].event_types[0].type_name = CORBA::string_dup(TYPE_NAME); + constraint_list[1].constraint_expr = CORBA::string_dup (CA_FILTER); + + ca_filter_1->add_constraints (constraint_list); + ca_filter_2->add_constraints (constraint_list); + + consumer_admin_1_->add_filter (ca_filter_1.in ()); + + consumer_admin_2_->add_filter (ca_filter_2.in ()); +} + +void +FilterClient::get_consumeradmin () +{ + CosNotifyChannelAdmin::AdminIDSeq_var ids + = ec_->get_all_consumeradmins(); + + ACE_ASSERT (ids->length () == 2); + + this->adminid_1_id_ = ids[0]; + this->adminid_2_id_ = ids[1]; + + consumer_admin_1_ = ec_->get_consumeradmin (this->adminid_1_id_); + consumer_admin_2_ = ec_->get_consumeradmin (this->adminid_2_id_); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t)get_consumeradmin %d %d\n"), + adminid_1_id_, adminid_2_id_)); + + CosNotifyFilter::FilterAdmin_var admin + = CosNotifyFilter::FilterAdmin::_narrow (consumer_admin_1_.in ()); + verify_filter (admin, CA_FILTER, MOD_CA_FILTER); + admin = CosNotifyFilter::FilterAdmin::_narrow (consumer_admin_2_.in ()); + verify_filter (admin, CA_FILTER, MOD_CA_FILTER); +} + + +void +FilterClient::verify_filter (CosNotifyFilter::FilterAdmin_var& admin, + const char* constraint_expr, + const char* mod_constraint_expr) +{ + ACE_UNUSED_ARG (constraint_expr); + // only used to validate assert, which is + // compiled out for nondebug builds. + CosNotifyFilter::FilterIDSeq_var ids + = admin->get_all_filters (); + + ACE_ASSERT (ids->length () == 1); + + CosNotifyFilter::Filter_var filter + = admin->get_filter (ids[0]); + + ACE_ASSERT (! CORBA::is_nil (filter.in ())); + + CosNotifyFilter::ConstraintInfoSeq_var infos + = filter->get_all_constraints(); + + ACE_ASSERT (infos->length () == 2); + + u_int index = 0; + + for (index = 0; index < infos->length (); ++ index) + { + CosNotifyFilter::ConstraintID id = infos[index].constraint_id; + ACE_UNUSED_ARG (id); + // only used to validate assert, which is + // compiled out for nondebug builds. + ACE_ASSERT (id != 0); + ACE_ASSERT (ACE_OS::strcmp (infos[index].constraint_expression.constraint_expr.in (), constraint_expr) == 0); + + CosNotification::EventTypeSeq& events = infos[index].constraint_expression.event_types; + ACE_UNUSED_ARG (events); + // only used to validate assert, which is + // compiled out for nondebug builds. + ACE_ASSERT (events.length () == 1); + + ACE_ASSERT (ACE_OS::strcmp (events[0].domain_name.in (), DOMAIN_NAME) == 0); + ACE_ASSERT (ACE_OS::strcmp (events[0].type_name.in (), TYPE_NAME) == 0); + } + + if (modify_constraint_) + { + CosNotifyFilter::ConstraintIDSeq_var ids = new CosNotifyFilter::ConstraintIDSeq (2); + ids->length (2); + for (index = 0; index < infos->length (); ++ index) + { + ids[index] = infos[index].constraint_id; + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t)modify constraint %d \n"), ids[index])); + + infos[index].constraint_expression.constraint_expr + = CORBA::string_dup (mod_constraint_expr); + } + + filter->modify_constraints (ids.in (), infos.in()); + } +} + + +void +FilterClient::create_consumers () +{ + // startup the first consumer. + ACE_NEW_THROW_EX (consumer_1, + Filter_StructuredPushConsumer (this, "consumer1"), + CORBA::NO_MEMORY ()); + + consumer_1->connect (consumer_admin_1_.in ()); + + // startup the second consumer. + ACE_NEW_THROW_EX (consumer_2, + Filter_StructuredPushConsumer (this, "consumer2"), + CORBA::NO_MEMORY ()); + + consumer_2->connect (consumer_admin_2_.in ()); +} + +void +FilterClient::create_suppliers () +{ + // startup the first supplier + ACE_NEW_THROW_EX (supplier_1, + Filter_StructuredPushSupplier ("supplier1"), + CORBA::NO_MEMORY ()); + + supplier_1->connect (supplier_admin_.in ()); + + // startup the second supplier + ACE_NEW_THROW_EX (supplier_2, + Filter_StructuredPushSupplier ("supplier2"), + CORBA::NO_MEMORY ()); + + supplier_2->connect (supplier_admin_.in ()); +} + +void +FilterClient::send_events () +{ + // operations: + CosNotification::StructuredEvent event; + + // EventHeader + + // FixedEventHeader + // EventType + // string + event.header.fixed_header.event_type.domain_name = CORBA::string_dup(DOMAIN_NAME); + // string + event.header.fixed_header.event_type.type_name = CORBA::string_dup(TYPE_NAME); + // string + event.header.fixed_header.event_name = CORBA::string_dup("myevent"); + + // OptionalHeaderFields + // PropertySeq + // sequence<Property>: string name, any value + event.header.variable_header.length (1); // put nothing here + + // FilterableEventBody + // PropertySeq + // sequence<Property>: string name, any value + event.filterable_data.length (3); + event.filterable_data[0].name = CORBA::string_dup("threshold"); + + event.filterable_data[1].name = CORBA::string_dup("temperature"); + event.filterable_data[1].value <<= (CORBA::Long)70; + + event.filterable_data[2].name = CORBA::string_dup("pressure"); + event.filterable_data[2].value <<= (CORBA::Long)80; + + event.filterable_data[0].value <<= (CORBA::Long)4; + + // any + event.remainder_of_body <<= (CORBA::Long)4; + + for (int i = 0; i < EVENTS_TO_SEND; i++) + { + event.filterable_data[0].value <<= (CORBA::Long)i; + + // any + event.remainder_of_body <<= (CORBA::Long)i; + + supplier_1->send_event (event); + + supplier_2->send_event (event); + } +} + + +Filter_StructuredPushConsumer::Filter_StructuredPushConsumer (FilterClient* filter, const char* my_name) + :filter_ (filter), + my_name_ (my_name) +{ +} + +Filter_StructuredPushConsumer::~Filter_StructuredPushConsumer (void) +{ +} + +void +Filter_StructuredPushConsumer::connect (CosNotifyChannelAdmin::ConsumerAdmin_ptr consumer_admin) +{ + // Activate the consumer with the default_POA_ + CosNotifyComm::StructuredPushConsumer_var objref = + this->_this (); + + CosNotifyChannelAdmin::ProxySupplier_var proxysupplier = + consumer_admin->obtain_notification_push_supplier (CosNotifyChannelAdmin::STRUCTURED_EVENT, proxy_supplier_id_); + + ACE_ASSERT (!CORBA::is_nil (proxysupplier.in ())); + + // narrow + this->proxy_supplier_ = + CosNotifyChannelAdmin::StructuredProxyPushSupplier:: + _narrow (proxysupplier.in ()); + + ACE_ASSERT (!CORBA::is_nil (proxy_supplier_.in ())); + + proxy_supplier_->connect_structured_push_consumer (objref.in ()); +} + +void +Filter_StructuredPushConsumer::disconnect () +{ + this->proxy_supplier_-> + disconnect_structured_push_supplier(); +} + +void +Filter_StructuredPushConsumer::offer_change + (const CosNotification::EventTypeSeq & /*added*/, + const CosNotification::EventTypeSeq & /*removed*/) +{ + // No-Op. +} + +void +Filter_StructuredPushConsumer::push_structured_event + (const CosNotification::StructuredEvent & notification) +{ + CORBA::Long val; + + notification.remainder_of_body >>= val; + + // @@ Pradeep: for your tests try to make sure that you count the + // number of expected and sent events to verify that things work + // correctly in an automatic way... + + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%C received event, %d\n"), + my_name_.fast_rep (), val)); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("event count %d\n"), g_result_count.value ())); + + if (++g_result_count == EVENTS_EXPECTED_TO_RECEIVE) + this->filter_->done (); // all events received, we're done. +} + +void +Filter_StructuredPushConsumer::disconnect_structured_push_consumer () +{ + // No-Op. +} + + +/*****************************************************************/ + +Filter_StructuredPushSupplier::Filter_StructuredPushSupplier (const char* my_name) + :my_name_ (my_name) +{ +} + +Filter_StructuredPushSupplier::~Filter_StructuredPushSupplier () +{ +} + +void +Filter_StructuredPushSupplier::connect (CosNotifyChannelAdmin::SupplierAdmin_ptr supplier_admin) +{ + CosNotifyComm::StructuredPushSupplier_var objref = + this->_this (); + + CosNotifyChannelAdmin::ProxyConsumer_var proxyconsumer = + supplier_admin->obtain_notification_push_consumer (CosNotifyChannelAdmin::STRUCTURED_EVENT, proxy_consumer_id_); + + ACE_ASSERT (!CORBA::is_nil (proxyconsumer.in ())); + + // narrow + this->proxy_consumer_ = + CosNotifyChannelAdmin::StructuredProxyPushConsumer::_narrow (proxyconsumer.in ()); + + ACE_ASSERT (!CORBA::is_nil (proxy_consumer_.in ())); + + proxy_consumer_->connect_structured_push_supplier (objref.in ()); +} + +void +Filter_StructuredPushSupplier::disconnect () +{ + ACE_ASSERT (!CORBA::is_nil (this->proxy_consumer_.in ())); + + this->proxy_consumer_->disconnect_structured_push_consumer(); +} + +void +Filter_StructuredPushSupplier::subscription_change + (const CosNotification::EventTypeSeq & /*added*/, + const CosNotification::EventTypeSeq & /*removed */) +{ + //No-Op. +} + +void +Filter_StructuredPushSupplier::send_event + (const CosNotification::StructuredEvent& event) +{ + ACE_ASSERT (!CORBA::is_nil (this->proxy_consumer_.in ())); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%C is sending an event \n"), my_name_.fast_rep ())); + + proxy_consumer_->push_structured_event (event); +} + +void +Filter_StructuredPushSupplier::disconnect_structured_push_supplier () +{ + // No-Op. +} + + diff --git a/TAO/orbsvcs/tests/Notify/Persistent_Filter/Filter.h b/TAO/orbsvcs/tests/Notify/Persistent_Filter/Filter.h new file mode 100644 index 00000000000..98c27a6d436 --- /dev/null +++ b/TAO/orbsvcs/tests/Notify/Persistent_Filter/Filter.h @@ -0,0 +1,270 @@ +/* -*- C++ -*- */ +// $Id$ +// ========================================================================== +// +// = FILENAME +// Filter.h +// +// = DESCRIPTION +// Class to demo structured event filtering. +// +// = AUTHOR +// Pradeep Gore <pradeep@cs.wustl.edu> +// +// ========================================================================== + +#ifndef NOTIFY_FILTER_CLIENT_H +#define NOTIFY_FILTER_CLIENT_H + +#include "orbsvcs/orbsvcs/CosNotifyChannelAdminS.h" +#include "orbsvcs/orbsvcs/CosNotifyCommC.h" +#include "orbsvcs/orbsvcs/CosNamingC.h" +#include "ace/SString.h" + +#if defined(_MSC_VER) +#pragma warning(push) +#pragma warning(disable:4250) +#endif /* _MSC_VER */ + +class Filter_StructuredPushConsumer; +class Filter_StructuredPushSupplier; + +class FilterClient +{ + // = TITLE + // Filter Client + // = DESCRIPTION + // Client example that shows how to do Structured Event filtering + // in the Notification Service. + + public: + // = Initialization and Termination + FilterClient (void); + // Constructor + + ~FilterClient (); + // Destructor + + void init_supplier (int argc, ACE_TCHAR *argv []); + void init_consumer (int argc, ACE_TCHAR *argv []); + // Init the Client. + + void run_supplier (); + void run_consumer (); + // Run the demo. + + void done (void); + // Consumer calls done, We're done. + + protected: + + int parse_args (int argc, + ACE_TCHAR *argv[]); + + void init_ORB (int& argc, ACE_TCHAR **& argv); + // Initializes the ORB. + + void resolve_naming_service (); + // Try to get hold of a running naming service. + + void resolve_Notify_factory (); + // Try to resolve the Notify factory from the Naming service. + + void create_EC (); + void get_EC (); + // Create an EC. + + void create_supplieradmin(); + // Create the Supplier Admin. + void get_supplieradmin(); + + void create_consumeradmin (); + // Create the Consumer Admin. + void get_consumeradmin (); + + void create_consumers (); + // Create and initialize the consumers. + + void create_suppliers (); + // create and initialize the suppliers. + + void send_events (); + // send the events. + + void wait_ready (); + + void wait_consumer_complete (); + + void verify_filter (CosNotifyFilter::FilterAdmin_var& admin, + const char* constraint_expr, + const char* mod_constraint_expr); + + // = Data Members + PortableServer::POA_var root_poa_; + // Reference to the root poa. + + CORBA::ORB_var orb_; + // The ORB that we use. + + CosNaming::NamingContext_var naming_context_; + // Handle to the name service. + + CosNotifyChannelAdmin::EventChannelFactory_var notify_factory_; + // Channel factory. + + CosNotifyChannelAdmin::EventChannel_var ec_; + // The one channel that we create using the factory. + + CosNotifyChannelAdmin::InterFilterGroupOperator ifgop_; + // The group operator between admin-proxy's. + + CosNotification::QoSProperties initial_qos_; + // Initial qos specified to the factory when creating the EC. + + CosNotification::AdminProperties initial_admin_; + // Initial admin props specified to the factory when creating the EC. + + CosNotifyChannelAdmin::ConsumerAdmin_var consumer_admin_1_; + CosNotifyChannelAdmin::ConsumerAdmin_var consumer_admin_2_; + // The consumer admin used by consumers. + + CosNotifyChannelAdmin::SupplierAdmin_var supplier_admin_; + // The supplier admin used by suppliers. + + Filter_StructuredPushConsumer* consumer_1; + // Consumer #1 + + Filter_StructuredPushConsumer* consumer_2; + // Consumer #2 + + Filter_StructuredPushSupplier* supplier_1; + // Supplier #1 + + Filter_StructuredPushSupplier* supplier_2; + // Supplier #2 + + CosNotifyChannelAdmin::AdminID adminid_1_id_; + CosNotifyChannelAdmin::AdminID adminid_2_id_; + + CosNotifyChannelAdmin::ChannelID channel_id_; + + bool use_persistent_; + + bool modify_constraint_; + + // Set this flag to exit the run loop. + CORBA::Boolean done_; +}; + +/*****************************************************************/ +class Filter_StructuredPushConsumer : public POA_CosNotifyComm::StructuredPushConsumer +{ + // = TITLE + // Filter_StructuredPushConsumer + // + // = DESCRIPTION + // Consumer for the Filter example. + // + + public: + // = Initialization and Termination code + Filter_StructuredPushConsumer (FilterClient* filter, const char *my_name); + // Constructor. + + void connect (CosNotifyChannelAdmin::ConsumerAdmin_ptr consumer_admin); + // Connect the Consumer to the EventChannel. + // Creates a new proxy supplier and connects to it. + + virtual void disconnect (); + // Disconnect from the supplier. + +protected: + // = Data members + + FilterClient* filter_; + // The callback for <done> + + ACE_CString my_name_; + // The name of this consumer. + + CosNotifyChannelAdmin::StructuredProxyPushSupplier_var proxy_supplier_; + // The proxy that we are connected to. + + CosNotifyChannelAdmin::ProxyID proxy_supplier_id_; + // The proxy_supplier id. + + // = Methods + virtual ~Filter_StructuredPushConsumer (void); + // Destructor + + // = NotifyPublish method + virtual void offer_change ( + const CosNotification::EventTypeSeq & added, + const CosNotification::EventTypeSeq & removed + ); + + // = StructuredPushSupplier methods + virtual void push_structured_event ( + const CosNotification::StructuredEvent & notification + ); + + virtual void disconnect_structured_push_consumer (); +}; + +/*****************************************************************/ + +class Filter_StructuredPushSupplier : public POA_CosNotifyComm::StructuredPushSupplier +{ + // = TITLE + // Filter_StructuredPushSupplier + // + // = DESCRIPTION + // Supplier for the filter example. + // + public: + // = Initialization and Termination code + Filter_StructuredPushSupplier (const char* my_name); + // Constructor. + + void connect (CosNotifyChannelAdmin::SupplierAdmin_ptr supplier_admin); + // Connect the Supplier to the EventChannel. + // Creates a new proxy supplier and connects to it. + + void disconnect (); + // Disconnect from the supplier. + + virtual void send_event (const CosNotification::StructuredEvent& event); + // Send one event. + +protected: + // = Data members + ACE_CString my_name_; + // The name of this consumer. + + CosNotifyChannelAdmin::StructuredProxyPushConsumer_var proxy_consumer_; + // The proxy that we are connected to. + + CosNotifyChannelAdmin::ProxyID proxy_consumer_id_; + // This supplier's id. + + // = Protected Methods + virtual ~Filter_StructuredPushSupplier (); + // Destructor + + // = NotifySubscribe + virtual void subscription_change ( + const CosNotification::EventTypeSeq & added, + const CosNotification::EventTypeSeq & removed + ); + + // = StructuredPushSupplier method + virtual void disconnect_structured_push_supplier (); +}; + +/***************************************************************************/ + +#if defined(_MSC_VER) +#pragma warning(pop) +#endif /* _MSC_VER */ + +#endif /* NOTIFY_FILTER_CLIENT_H */ diff --git a/TAO/orbsvcs/tests/Notify/Persistent_Filter/Makefile.am b/TAO/orbsvcs/tests/Notify/Persistent_Filter/Makefile.am new file mode 100644 index 00000000000..83d7ba78ff8 --- /dev/null +++ b/TAO/orbsvcs/tests/Notify/Persistent_Filter/Makefile.am @@ -0,0 +1,95 @@ +## Process this file with automake to create Makefile.in +## +## $Id$ +## +## This file was generated by MPC. Any changes made directly to +## this file will be lost the next time it is generated. +## +## MPC Command: +## ../bin/mwc.pl -type automake -noreldefs TAO.mwc + +ACE_BUILDDIR = $(top_builddir)/.. +ACE_ROOT = $(top_srcdir)/.. +TAO_BUILDDIR = $(top_builddir) +TAO_ROOT = $(top_srcdir) + +noinst_PROGRAMS = + +## Makefile.consumer.am + +if BUILD_EXCEPTIONS + +noinst_PROGRAMS += consumer + +consumer_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) \ + -I$(TAO_ROOT) \ + -I$(TAO_BUILDDIR) \ + -I$(TAO_ROOT)/orbsvcs \ + -I$(TAO_BUILDDIR)/orbsvcs \ + -DTAO_HAS_TYPED_EVENT_CHANNEL + +consumer_SOURCES = \ + Filter.cpp \ + consumer.cpp \ + Filter.h + +consumer_LDADD = \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosNaming.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosNotification_Skel.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosEvent_Skel.la \ + $(TAO_BUILDDIR)/tao/libTAO_PortableServer.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosNotification.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosEvent.la \ + $(TAO_BUILDDIR)/tao/libTAO_AnyTypeCode.la \ + $(TAO_BUILDDIR)/tao/libTAO.la \ + $(ACE_BUILDDIR)/ace/libACE.la + +endif BUILD_EXCEPTIONS + +## Makefile.supplier.am + +if BUILD_EXCEPTIONS + +noinst_PROGRAMS += supplier + +supplier_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) \ + -I$(TAO_ROOT) \ + -I$(TAO_BUILDDIR) \ + -I$(TAO_ROOT)/orbsvcs \ + -I$(TAO_BUILDDIR)/orbsvcs \ + -DTAO_HAS_TYPED_EVENT_CHANNEL + +supplier_SOURCES = \ + Filter.cpp \ + supplier.cpp \ + Filter.h + +supplier_LDADD = \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosNaming.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosNotification_Skel.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosEvent_Skel.la \ + $(TAO_BUILDDIR)/tao/libTAO_PortableServer.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosNotification.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosEvent.la \ + $(TAO_BUILDDIR)/tao/libTAO_AnyTypeCode.la \ + $(TAO_BUILDDIR)/tao/libTAO.la \ + $(ACE_BUILDDIR)/ace/libACE.la + +endif BUILD_EXCEPTIONS + + +ACLOCAL = @ACLOCAL@ +ACLOCAL_AMFLAGS = -I m4 +AUTOMAKE_OPTIONS = foreign + +## Clean up template repositories, etc. +clean-local: + -rm -f *~ *.bak *.rpo *.sym lib*.*_pure_* core core.* + -rm -f gcctemp.c gcctemp so_locations *.ics + -rm -rf cxx_repository ptrepository ti_files + -rm -rf templateregistry ir.out + -rm -rf ptrepository SunWS_cache Templates.DB diff --git a/TAO/orbsvcs/tests/Notify/Persistent_Filter/NS.conf b/TAO/orbsvcs/tests/Notify/Persistent_Filter/NS.conf new file mode 100644 index 00000000000..b5fc33c0af7 --- /dev/null +++ b/TAO/orbsvcs/tests/Notify/Persistent_Filter/NS.conf @@ -0,0 +1,5 @@ +static TAO_CosNotify_Service "-AllocateTaskPerProxy -DispatchingThreads 1 -SourceThreads 1 -AllowReconnect -ValidateClient -ValidateClientDelay 1 -ValidateClientInterval 10 " + +dynamic Topology_Factory Service_Object* TAO_CosNotification_Persist:_make_TAO_Notify_XML_Topology_Factory() "-save_base_path ./persistency.notif -load_base_path ./persistency.notif -backup_count 1 -v" + +#dynamic Logging_Strategy Service_Object* ACE:_make_ACE_Logging_Strategy() "-s NS -f OSTREAM -t 0 -m 1024 -w" diff --git a/TAO/orbsvcs/tests/Notify/Persistent_Filter/Persistent_Filter.mpc b/TAO/orbsvcs/tests/Notify/Persistent_Filter/Persistent_Filter.mpc new file mode 100644 index 00000000000..64f8381180d --- /dev/null +++ b/TAO/orbsvcs/tests/Notify/Persistent_Filter/Persistent_Filter.mpc @@ -0,0 +1,19 @@ +// -*- MPC -*- +// $Id$ + + +project(supplier) : orbsvcsexe, notification, notification_skel, naming { + Source_Files { + supplier.cpp + Filter.cpp + } +} + + +project(consumer) : orbsvcsexe, notification, notification_skel, naming { + Source_Files { + consumer.cpp + Filter.cpp + } +} + diff --git a/TAO/orbsvcs/tests/Notify/Persistent_Filter/README b/TAO/orbsvcs/tests/Notify/Persistent_Filter/README new file mode 100644 index 00000000000..c3963500c29 --- /dev/null +++ b/TAO/orbsvcs/tests/Notify/Persistent_Filter/README @@ -0,0 +1,10 @@ +This test is based on the Filter example. +It tests the persistent filter changes. The filter constraints +are saved to persistent data instead of filter IOR. + +The run_test.pl starts NotificationService, consumer and supplier. +After they complete, restart them again with using the persistent +data. The restarted consumer and supplier will verify the filters +and modify the constraints. The test finally should pass with no +errors. + diff --git a/TAO/orbsvcs/tests/Notify/Persistent_Filter/consumer.cpp b/TAO/orbsvcs/tests/Notify/Persistent_Filter/consumer.cpp new file mode 100644 index 00000000000..33f0e181f1d --- /dev/null +++ b/TAO/orbsvcs/tests/Notify/Persistent_Filter/consumer.cpp @@ -0,0 +1,30 @@ +// -*- C++ -*- +// $Id$ + +#include "Filter.h" + +int +ACE_TMAIN (int argc, ACE_TCHAR *argv []) +{ + FilterClient client; + + try + { + client.init_consumer (argc, argv); //Init the Client + + client.run_consumer (); + } + catch (const CORBA::UserException& ue) + { + ue._tao_print_exception ( + "TLS_Client user error: "); + return 1; + } + catch (const CORBA::SystemException& se) + { + se._tao_print_exception ("Consumer system error: "); + return 1; + } + + return 0; +} diff --git a/TAO/orbsvcs/tests/Notify/Persistent_Filter/run_test.pl b/TAO/orbsvcs/tests/Notify/Persistent_Filter/run_test.pl new file mode 100755 index 00000000000..d22a0800507 --- /dev/null +++ b/TAO/orbsvcs/tests/Notify/Persistent_Filter/run_test.pl @@ -0,0 +1,132 @@ +eval '(exit $?0)' && eval 'exec perl -S $0 ${1+"$@"}' + & eval 'exec perl -S $0 $argv:q' + if 0; + +# $Id$ +# -*- perl -*- + +use lib "$ENV{ACE_ROOT}/bin"; +use PerlACE::Run_Test; + + +$experiment_timeout = 60; +$startup_timeout = 60; + +$notify_ior = PerlACE::LocalFile ("notify.ior"); + +$naming_ior = PerlACE::LocalFile ("naming.ior"); + +$svc_conf = PerlACE::LocalFile ("NS.conf"); + +$persistent_prefix = "persistency.notif"; + +$status = 0; + +$Naming = new PerlACE::Process ("../../../Naming_Service/Naming_Service", + "-o $naming_ior"); + +$Notification = new PerlACE::Process ("../../../Notify_Service/Notify_Service"); +#-ORBDebugLevel 10 -ORBVerboseLogging 1 +$Notify_Args = "-ORBSvcConf $svc_conf -ORBInitRef NameService=file://$naming_ior -IORoutput $notify_ior "; + +$Supplier = new PerlACE::Process ("supplier"); + +$Supplier_Args = "-ORBInitRef NameService=file://$naming_ior"; + +$Consumer = new PerlACE::Process ("consumer"); + +$Consumer_Args = "-ORBInitRef NameService=file://$naming_ior"; + +unlink $naming_ior; +$Naming->Spawn (); + +if (PerlACE::waitforfile_timed ($naming_ior, $startup_timeout) == -1) { + print STDERR "ERROR: waiting for the naming service to start\n"; + $Naming->Kill (); + exit 1; +} + +sub run_test +{ + my $client_args = shift; + + unlink $notify_ior; + $Notification->Arguments ($Notify_Args); + $args = $Notification->Arguments (); + print STDERR "Running Notification with arguments: $args\n"; + $Notification->Spawn (); + + if (PerlACE::waitforfile_timed ($notify_ior, $startup_timeout) == -1) { + print STDERR "ERROR: waiting for the notify service to start\n"; + $Notification->Kill (); + $Naming->Kill (); + exit 1; + } + + sleep (5); + + $Supplier->Arguments ($client_args . $Supplier_Args); + $Consumer->Arguments ($client_args . $Consumer_Args); + $args = $Supplier->Arguments (); + print STDERR "Running Supplier with arguments: $args\n"; + $args = $Consumer->Arguments (); + print STDERR "Running Consumer with arguments: $args\n"; + + $status = $Supplier->Spawn (); + + if ($status != 0) + { + print STDERR "ERROR: Supplier Spawn returned $status\n"; + $Notification->Kill (); + $Naming->Kill (); + exit 1; + } + + $status = $Consumer->Spawn (); + + if ($status != 0) + { + print STDERR "ERROR: Consumer returned $status\n"; + $Supplier->Kill (); + $Notification->Kill (); + $Naming->Kill (); + exit 1; + } + + $status = $Consumer->WaitKill ($experiment_timeout); + + if ($status != 0) + { + print STDERR "ERROR: Consumer WaitKill returned $status\n"; + $Consumer->Kill (); + $Supplier->Kill (); + $Notification->Kill (); + $Naming->Kill (); + exit 1; + } + + $status = $Supplier->WaitKill ($experiment_timeout); + + if ($status != 0) + { + print STDERR "ERROR: Supplier WaitKill returned $status\n"; + $Supplier->Kill (); + $Notification->Kill (); + $Naming->Kill (); + exit 1; + } + + $Notification->Kill (); + unlink $notify_ior; +} + +unlink <$persistent_prefix.*>; + +run_test (); +run_test ("-m -p "); + + +$Naming->Kill (); +unlink $naming_ior; + +exit $status; diff --git a/TAO/orbsvcs/tests/Notify/Persistent_Filter/supplier.cpp b/TAO/orbsvcs/tests/Notify/Persistent_Filter/supplier.cpp new file mode 100644 index 00000000000..c2aca21a2c1 --- /dev/null +++ b/TAO/orbsvcs/tests/Notify/Persistent_Filter/supplier.cpp @@ -0,0 +1,29 @@ +// -*- C++ -*- +// $Id$ + +#include "Filter.h" + +int +ACE_TMAIN (int argc, ACE_TCHAR *argv []) +{ + FilterClient client; + + try + { + client.init_supplier (argc, argv); //Init the Client + + client.run_supplier (); + } + catch (const CORBA::UserException& ue) + { + ue._tao_print_exception ("TLS_Client user error: "); + return 1; + } + catch (const CORBA::SystemException& se) + { + se._tao_print_exception ("Supplier system error: "); + return 1; + } + + return 0; +} |