summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/tests/Notify/Persistent_Filter
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/tests/Notify/Persistent_Filter')
-rw-r--r--TAO/orbsvcs/tests/Notify/Persistent_Filter/Filter.cpp752
-rw-r--r--TAO/orbsvcs/tests/Notify/Persistent_Filter/Filter.h270
-rw-r--r--TAO/orbsvcs/tests/Notify/Persistent_Filter/Makefile.am95
-rw-r--r--TAO/orbsvcs/tests/Notify/Persistent_Filter/NS.conf5
-rw-r--r--TAO/orbsvcs/tests/Notify/Persistent_Filter/Persistent_Filter.mpc19
-rw-r--r--TAO/orbsvcs/tests/Notify/Persistent_Filter/README10
-rw-r--r--TAO/orbsvcs/tests/Notify/Persistent_Filter/consumer.cpp30
-rwxr-xr-xTAO/orbsvcs/tests/Notify/Persistent_Filter/run_test.pl132
-rw-r--r--TAO/orbsvcs/tests/Notify/Persistent_Filter/supplier.cpp29
9 files changed, 1342 insertions, 0 deletions
diff --git a/TAO/orbsvcs/tests/Notify/Persistent_Filter/Filter.cpp b/TAO/orbsvcs/tests/Notify/Persistent_Filter/Filter.cpp
new file mode 100644
index 00000000000..3176f89fcdd
--- /dev/null
+++ b/TAO/orbsvcs/tests/Notify/Persistent_Filter/Filter.cpp
@@ -0,0 +1,752 @@
+/* -*- C++ -*- $Id$ */
+
+#include "Filter.h"
+#include "ace/Get_Opt.h"
+#include "ace/OS.h"
+
+ACE_RCSID(Filter, Filter, "Filter.cpp,v 1.13 2002/05/28 20:24:16 pradeep Exp")
+
+#define NOTIFY_FACTORY_NAME "NotifyEventChannelFactory"
+#define NAMING_SERVICE_NAME "NameService"
+#define CA_FILTER "threshold < 20"
+#define SA_FILTER "threshold > 10"
+
+#define MOD_CA_FILTER "threshold < 15"
+#define MOD_SA_FILTER "threshold > 10"
+
+#define TCL_GRAMMAR "TCL"
+#define EVENTS_TO_SEND 30
+int EVENTS_EXPECTED_TO_RECEIVE = 9*4; // 2 consumers get the same events from 2 suppliers
+#define DOMAIN_NAME "*"
+#define TYPE_NAME "*"
+
+ ACE_Atomic_Op <TAO_SYNCH_MUTEX, int> g_result_count = 0;
+
+FilterClient::FilterClient (void)
+ :consumer_1 (0),
+ consumer_2 (0),
+ supplier_1 (0),
+ supplier_2 (0),
+ adminid_1_id_ (0),
+ adminid_2_id_ (0),
+ channel_id_ (0),
+ use_persistent_ (false),
+ modify_constraint_ (false),
+ done_ (0)
+{
+ g_result_count = 0;
+ // No-Op.
+ ifgop_ = CosNotifyChannelAdmin::AND_OP;
+}
+
+FilterClient::~FilterClient ()
+{
+}
+
+
+int
+FilterClient::parse_args (int argc, ACE_TCHAR *argv[])
+{
+ ACE_Get_Opt opts (argc, argv, ACE_TEXT("pm"));
+ int c;
+
+ while ((c = opts ()) != -1)
+ switch (c)
+ {
+ case 'p':
+ this->use_persistent_ = true;
+ break;
+ case 'm':
+ this->modify_constraint_ = true;
+ EVENTS_EXPECTED_TO_RECEIVE = 4*4;
+ break;
+ case '?':
+ default:
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("usage: %C [-p] [-m]\n"),
+ argv [0]),
+ -1);
+ }
+
+ return 0;
+}
+
+void
+FilterClient::init_supplier (int argc, ACE_TCHAR *argv [])
+{
+ init_ORB (argc, argv);
+
+ if (parse_args (argc, argv) == -1)
+ {
+ ACE_OS::exit (1);
+ }
+
+ resolve_naming_service ();
+
+ resolve_Notify_factory ();
+
+ if (this->use_persistent_)
+ {
+ get_EC ();
+ }
+ else
+ {
+ create_EC ();
+ }
+
+ if (this->use_persistent_)
+ {
+ get_supplieradmin ();
+ }
+ else
+ {
+ create_supplieradmin ();
+ }
+
+ create_suppliers ();
+}
+
+
+void
+FilterClient::init_consumer (int argc, ACE_TCHAR *argv [])
+{
+ init_ORB (argc, argv);
+
+ if (parse_args (argc, argv) == -1)
+ {
+ ACE_OS::exit (1);
+ }
+
+ resolve_naming_service ();
+
+ resolve_Notify_factory ();
+
+ get_EC ();
+
+ if (this->use_persistent_)
+ {
+ get_consumeradmin ();
+ }
+ else
+ {
+ create_consumeradmin ();
+ }
+
+ create_consumers ();
+}
+
+void
+FilterClient::wait_ready ()
+{
+ while ( 1 )
+ {
+ CosNotifyChannelAdmin::AdminIDSeq_var cons_ids
+ = ec_->get_all_consumeradmins ();
+
+ CosNotifyChannelAdmin::AdminIDSeq_var sup_ids
+ = ec_->get_all_supplieradmins ();
+
+ if (cons_ids->length () + sup_ids->length () == 3)
+ break;
+ else
+ ACE_OS::sleep (1);
+ }
+}
+
+
+void
+FilterClient::wait_consumer_complete ()
+{
+ int i = 0;
+ const int TIMEOUT = 30;
+
+ while ( i < TIMEOUT )
+ {
+ try
+ {
+ CosNotifyChannelAdmin::AdminIDSeq_var ids
+ = this->ec_->get_all_consumeradmins();
+
+ if (ids->length () > 0)
+ {
+ ACE_OS::sleep (1);
+ ++i;
+ }
+ else
+ break;
+ }
+ catch (const CORBA::OBJECT_NOT_EXIST&)
+ {
+ break;
+ }
+ }
+}
+
+
+void
+FilterClient::run_supplier ()
+{
+ this->wait_ready ();
+
+ //Add delay so consumer won't miss any events.
+ ACE_OS::sleep (5);
+
+ send_events ();
+
+ this->wait_consumer_complete ();
+}
+
+void
+FilterClient::run_consumer ()
+{
+ this->wait_ready ();
+
+ if (g_result_count != EVENTS_EXPECTED_TO_RECEIVE)
+ { // if we still need to wait for events, run the orb.
+ while (!this->done_)
+ if (this->orb_->work_pending ())
+ this->orb_->perform_work ();
+ }
+
+ //ACE_DEBUG ((LM_DEBUG, "(%P|%t)destroying consumer admins ... \n"));
+
+ //this->consumer_admin_1_->destroy ();
+ //this->consumer_admin_2_->destroy ();
+}
+
+void
+FilterClient::done (void)
+{
+ this->done_ = 1;
+}
+
+void
+FilterClient::init_ORB (int& argc, ACE_TCHAR **& argv)
+{
+ this->orb_ = CORBA::ORB_init (argc, argv);
+
+
+ CORBA::Object_ptr poa_object =
+ this->orb_->resolve_initial_references("RootPOA");
+
+ if (CORBA::is_nil (poa_object))
+ {
+ ACE_ERROR ((LM_ERROR,
+ " (%P|%t) Unable to initialize the POA.\n"));
+ return;
+ }
+ this->root_poa_ =
+ PortableServer::POA::_narrow (poa_object);
+
+ PortableServer::POAManager_var poa_manager =
+ root_poa_->the_POAManager ();
+
+ poa_manager->activate ();
+}
+
+void
+FilterClient::resolve_naming_service ()
+{
+ CORBA::Object_var naming_obj =
+ this->orb_->resolve_initial_references (NAMING_SERVICE_NAME);
+
+ // Need to check return value for errors.
+ if (CORBA::is_nil (naming_obj.in ()))
+ throw CORBA::UNKNOWN ();
+
+ this->naming_context_ =
+ CosNaming::NamingContext::_narrow (naming_obj.in ());
+}
+
+void
+FilterClient::resolve_Notify_factory ()
+{
+ CosNaming::Name name (1);
+ name.length (1);
+ name[0].id = CORBA::string_dup (NOTIFY_FACTORY_NAME);
+
+ CORBA::Object_var obj =
+ this->naming_context_->resolve (name);
+
+ this->notify_factory_ =
+ CosNotifyChannelAdmin::EventChannelFactory::_narrow (obj.in ());
+}
+
+void
+FilterClient::create_EC ()
+{
+ ec_ = notify_factory_->create_channel (initial_qos_,
+ initial_admin_,
+ channel_id_);
+
+ ACE_ASSERT (!CORBA::is_nil (ec_.in ()));
+}
+
+void
+FilterClient::get_EC ()
+{
+ int i = 0;
+ const int TIMEOUT = 20;
+ while (i < TIMEOUT)
+ {
+ CosNotifyChannelAdmin::ChannelIDSeq_var ids
+ = notify_factory_->get_all_channels ();
+
+ if (ids->length () > 0)
+ {
+ //ACE_DEBUG ((LM_DEBUG, "(%P|%t)get_EC %d \n", ids->length ()));
+ ec_ = notify_factory_->get_event_channel (ids[0]);
+
+ ACE_ASSERT (!CORBA::is_nil (ec_.in ()));
+ break;
+ }
+ else
+ {
+ ++ i;
+ ACE_OS::sleep (1);
+ }
+ }
+}
+
+void
+FilterClient::create_supplieradmin ()
+{
+ CosNotifyChannelAdmin::AdminID adminid = 0;
+
+ supplier_admin_ =
+ ec_->new_for_suppliers (this->ifgop_, adminid);
+
+ ACE_ASSERT (!CORBA::is_nil (supplier_admin_.in ()));
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t)create_supplieradmin %d \n"), adminid));
+
+ CosNotifyFilter::FilterFactory_var ffact =
+ ec_->default_filter_factory ();
+
+ // setup a filter at the consumer admin
+ CosNotifyFilter::Filter_var sa_filter =
+ ffact->create_filter (TCL_GRAMMAR);
+
+ ACE_ASSERT (!CORBA::is_nil (sa_filter.in ()));
+
+ CosNotifyFilter::ConstraintExpSeq constraint_list (2);
+ constraint_list.length (2);
+
+ constraint_list[0].event_types.length (1);
+ constraint_list[0].event_types[0].domain_name = CORBA::string_dup(DOMAIN_NAME);
+ constraint_list[0].event_types[0].type_name = CORBA::string_dup(TYPE_NAME);
+
+ constraint_list[0].constraint_expr = CORBA::string_dup (SA_FILTER);
+ constraint_list[1].event_types.length (1);
+ constraint_list[1].event_types[0].domain_name = CORBA::string_dup(DOMAIN_NAME);
+ constraint_list[1].event_types[0].type_name = CORBA::string_dup(TYPE_NAME);
+ constraint_list[1].constraint_expr = CORBA::string_dup (SA_FILTER);
+
+ sa_filter->add_constraints (constraint_list);
+
+ supplier_admin_->add_filter (sa_filter.in ());
+}
+
+
+void
+FilterClient::get_supplieradmin ()
+{
+ CosNotifyChannelAdmin::AdminIDSeq_var ids
+ = ec_->get_all_supplieradmins();
+
+ ACE_ASSERT (ids->length () == 1);
+
+ supplier_admin_ = ec_->get_supplieradmin (ids[0]);
+
+ ACE_ASSERT (!CORBA::is_nil (supplier_admin_.in ()));
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t)get_supplieradmin %d \n"), ids[0]));
+
+ CosNotifyFilter::FilterAdmin_var admin
+ = CosNotifyFilter::FilterAdmin::_narrow (supplier_admin_.in ());
+ verify_filter (admin, SA_FILTER, MOD_SA_FILTER);
+}
+
+
+void
+FilterClient::create_consumeradmin ()
+{
+ consumer_admin_1_ =
+ ec_->new_for_consumers (this->ifgop_, this->adminid_1_id_);
+
+ ACE_ASSERT (!CORBA::is_nil (consumer_admin_1_.in ()));
+
+ consumer_admin_2_ =
+ ec_->new_for_consumers (this->ifgop_, this->adminid_2_id_);
+
+ ACE_ASSERT (!CORBA::is_nil (consumer_admin_2_.in ()));
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t)create_consumeradmin %d %d\n"),
+ adminid_1_id_, adminid_2_id_));
+
+ CosNotifyFilter::FilterFactory_var ffact =
+ ec_->default_filter_factory ();
+
+ // setup a filter at the consumer admin
+ CosNotifyFilter::Filter_var ca_filter_1 =
+ ffact->create_filter (TCL_GRAMMAR);
+
+ ACE_ASSERT (!CORBA::is_nil (ca_filter_1.in ()));
+
+ // setup a filter at the consumer admin
+ CosNotifyFilter::Filter_var ca_filter_2 =
+ ffact->create_filter (TCL_GRAMMAR);
+
+ ACE_ASSERT (!CORBA::is_nil (ca_filter_2.in ()));
+
+ /* struct ConstraintExp {
+ CosNotification::EventTypeSeq event_types;
+ string constraint_expr;
+ };
+ */
+ CosNotifyFilter::ConstraintExpSeq constraint_list (2);
+ constraint_list.length (2);
+
+ constraint_list[0].event_types.length (1);
+ constraint_list[0].event_types[0].domain_name = CORBA::string_dup(DOMAIN_NAME);
+ constraint_list[0].event_types[0].type_name = CORBA::string_dup(TYPE_NAME);
+
+ constraint_list[0].constraint_expr = CORBA::string_dup (CA_FILTER);
+ constraint_list[1].event_types.length (1);
+ constraint_list[1].event_types[0].domain_name = CORBA::string_dup(DOMAIN_NAME);
+ constraint_list[1].event_types[0].type_name = CORBA::string_dup(TYPE_NAME);
+ constraint_list[1].constraint_expr = CORBA::string_dup (CA_FILTER);
+
+ ca_filter_1->add_constraints (constraint_list);
+ ca_filter_2->add_constraints (constraint_list);
+
+ consumer_admin_1_->add_filter (ca_filter_1.in ());
+
+ consumer_admin_2_->add_filter (ca_filter_2.in ());
+}
+
+void
+FilterClient::get_consumeradmin ()
+{
+ CosNotifyChannelAdmin::AdminIDSeq_var ids
+ = ec_->get_all_consumeradmins();
+
+ ACE_ASSERT (ids->length () == 2);
+
+ this->adminid_1_id_ = ids[0];
+ this->adminid_2_id_ = ids[1];
+
+ consumer_admin_1_ = ec_->get_consumeradmin (this->adminid_1_id_);
+ consumer_admin_2_ = ec_->get_consumeradmin (this->adminid_2_id_);
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t)get_consumeradmin %d %d\n"),
+ adminid_1_id_, adminid_2_id_));
+
+ CosNotifyFilter::FilterAdmin_var admin
+ = CosNotifyFilter::FilterAdmin::_narrow (consumer_admin_1_.in ());
+ verify_filter (admin, CA_FILTER, MOD_CA_FILTER);
+ admin = CosNotifyFilter::FilterAdmin::_narrow (consumer_admin_2_.in ());
+ verify_filter (admin, CA_FILTER, MOD_CA_FILTER);
+}
+
+
+void
+FilterClient::verify_filter (CosNotifyFilter::FilterAdmin_var& admin,
+ const char* constraint_expr,
+ const char* mod_constraint_expr)
+{
+ ACE_UNUSED_ARG (constraint_expr);
+ // only used to validate assert, which is
+ // compiled out for nondebug builds.
+ CosNotifyFilter::FilterIDSeq_var ids
+ = admin->get_all_filters ();
+
+ ACE_ASSERT (ids->length () == 1);
+
+ CosNotifyFilter::Filter_var filter
+ = admin->get_filter (ids[0]);
+
+ ACE_ASSERT (! CORBA::is_nil (filter.in ()));
+
+ CosNotifyFilter::ConstraintInfoSeq_var infos
+ = filter->get_all_constraints();
+
+ ACE_ASSERT (infos->length () == 2);
+
+ u_int index = 0;
+
+ for (index = 0; index < infos->length (); ++ index)
+ {
+ CosNotifyFilter::ConstraintID id = infos[index].constraint_id;
+ ACE_UNUSED_ARG (id);
+ // only used to validate assert, which is
+ // compiled out for nondebug builds.
+ ACE_ASSERT (id != 0);
+ ACE_ASSERT (ACE_OS::strcmp (infos[index].constraint_expression.constraint_expr.in (), constraint_expr) == 0);
+
+ CosNotification::EventTypeSeq& events = infos[index].constraint_expression.event_types;
+ ACE_UNUSED_ARG (events);
+ // only used to validate assert, which is
+ // compiled out for nondebug builds.
+ ACE_ASSERT (events.length () == 1);
+
+ ACE_ASSERT (ACE_OS::strcmp (events[0].domain_name.in (), DOMAIN_NAME) == 0);
+ ACE_ASSERT (ACE_OS::strcmp (events[0].type_name.in (), TYPE_NAME) == 0);
+ }
+
+ if (modify_constraint_)
+ {
+ CosNotifyFilter::ConstraintIDSeq_var ids = new CosNotifyFilter::ConstraintIDSeq (2);
+ ids->length (2);
+ for (index = 0; index < infos->length (); ++ index)
+ {
+ ids[index] = infos[index].constraint_id;
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t)modify constraint %d \n"), ids[index]));
+
+ infos[index].constraint_expression.constraint_expr
+ = CORBA::string_dup (mod_constraint_expr);
+ }
+
+ filter->modify_constraints (ids.in (), infos.in());
+ }
+}
+
+
+void
+FilterClient::create_consumers ()
+{
+ // startup the first consumer.
+ ACE_NEW_THROW_EX (consumer_1,
+ Filter_StructuredPushConsumer (this, "consumer1"),
+ CORBA::NO_MEMORY ());
+
+ consumer_1->connect (consumer_admin_1_.in ());
+
+ // startup the second consumer.
+ ACE_NEW_THROW_EX (consumer_2,
+ Filter_StructuredPushConsumer (this, "consumer2"),
+ CORBA::NO_MEMORY ());
+
+ consumer_2->connect (consumer_admin_2_.in ());
+}
+
+void
+FilterClient::create_suppliers ()
+{
+ // startup the first supplier
+ ACE_NEW_THROW_EX (supplier_1,
+ Filter_StructuredPushSupplier ("supplier1"),
+ CORBA::NO_MEMORY ());
+
+ supplier_1->connect (supplier_admin_.in ());
+
+ // startup the second supplier
+ ACE_NEW_THROW_EX (supplier_2,
+ Filter_StructuredPushSupplier ("supplier2"),
+ CORBA::NO_MEMORY ());
+
+ supplier_2->connect (supplier_admin_.in ());
+}
+
+void
+FilterClient::send_events ()
+{
+ // operations:
+ CosNotification::StructuredEvent event;
+
+ // EventHeader
+
+ // FixedEventHeader
+ // EventType
+ // string
+ event.header.fixed_header.event_type.domain_name = CORBA::string_dup(DOMAIN_NAME);
+ // string
+ event.header.fixed_header.event_type.type_name = CORBA::string_dup(TYPE_NAME);
+ // string
+ event.header.fixed_header.event_name = CORBA::string_dup("myevent");
+
+ // OptionalHeaderFields
+ // PropertySeq
+ // sequence<Property>: string name, any value
+ event.header.variable_header.length (1); // put nothing here
+
+ // FilterableEventBody
+ // PropertySeq
+ // sequence<Property>: string name, any value
+ event.filterable_data.length (3);
+ event.filterable_data[0].name = CORBA::string_dup("threshold");
+
+ event.filterable_data[1].name = CORBA::string_dup("temperature");
+ event.filterable_data[1].value <<= (CORBA::Long)70;
+
+ event.filterable_data[2].name = CORBA::string_dup("pressure");
+ event.filterable_data[2].value <<= (CORBA::Long)80;
+
+ event.filterable_data[0].value <<= (CORBA::Long)4;
+
+ // any
+ event.remainder_of_body <<= (CORBA::Long)4;
+
+ for (int i = 0; i < EVENTS_TO_SEND; i++)
+ {
+ event.filterable_data[0].value <<= (CORBA::Long)i;
+
+ // any
+ event.remainder_of_body <<= (CORBA::Long)i;
+
+ supplier_1->send_event (event);
+
+ supplier_2->send_event (event);
+ }
+}
+
+
+Filter_StructuredPushConsumer::Filter_StructuredPushConsumer (FilterClient* filter, const char* my_name)
+ :filter_ (filter),
+ my_name_ (my_name)
+{
+}
+
+Filter_StructuredPushConsumer::~Filter_StructuredPushConsumer (void)
+{
+}
+
+void
+Filter_StructuredPushConsumer::connect (CosNotifyChannelAdmin::ConsumerAdmin_ptr consumer_admin)
+{
+ // Activate the consumer with the default_POA_
+ CosNotifyComm::StructuredPushConsumer_var objref =
+ this->_this ();
+
+ CosNotifyChannelAdmin::ProxySupplier_var proxysupplier =
+ consumer_admin->obtain_notification_push_supplier (CosNotifyChannelAdmin::STRUCTURED_EVENT, proxy_supplier_id_);
+
+ ACE_ASSERT (!CORBA::is_nil (proxysupplier.in ()));
+
+ // narrow
+ this->proxy_supplier_ =
+ CosNotifyChannelAdmin::StructuredProxyPushSupplier::
+ _narrow (proxysupplier.in ());
+
+ ACE_ASSERT (!CORBA::is_nil (proxy_supplier_.in ()));
+
+ proxy_supplier_->connect_structured_push_consumer (objref.in ());
+}
+
+void
+Filter_StructuredPushConsumer::disconnect ()
+{
+ this->proxy_supplier_->
+ disconnect_structured_push_supplier();
+}
+
+void
+Filter_StructuredPushConsumer::offer_change
+ (const CosNotification::EventTypeSeq & /*added*/,
+ const CosNotification::EventTypeSeq & /*removed*/)
+{
+ // No-Op.
+}
+
+void
+Filter_StructuredPushConsumer::push_structured_event
+ (const CosNotification::StructuredEvent & notification)
+{
+ CORBA::Long val;
+
+ notification.remainder_of_body >>= val;
+
+ // @@ Pradeep: for your tests try to make sure that you count the
+ // number of expected and sent events to verify that things work
+ // correctly in an automatic way...
+
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%C received event, %d\n"),
+ my_name_.fast_rep (), val));
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("event count %d\n"), g_result_count.value ()));
+
+ if (++g_result_count == EVENTS_EXPECTED_TO_RECEIVE)
+ this->filter_->done (); // all events received, we're done.
+}
+
+void
+Filter_StructuredPushConsumer::disconnect_structured_push_consumer ()
+{
+ // No-Op.
+}
+
+
+/*****************************************************************/
+
+Filter_StructuredPushSupplier::Filter_StructuredPushSupplier (const char* my_name)
+ :my_name_ (my_name)
+{
+}
+
+Filter_StructuredPushSupplier::~Filter_StructuredPushSupplier ()
+{
+}
+
+void
+Filter_StructuredPushSupplier::connect (CosNotifyChannelAdmin::SupplierAdmin_ptr supplier_admin)
+{
+ CosNotifyComm::StructuredPushSupplier_var objref =
+ this->_this ();
+
+ CosNotifyChannelAdmin::ProxyConsumer_var proxyconsumer =
+ supplier_admin->obtain_notification_push_consumer (CosNotifyChannelAdmin::STRUCTURED_EVENT, proxy_consumer_id_);
+
+ ACE_ASSERT (!CORBA::is_nil (proxyconsumer.in ()));
+
+ // narrow
+ this->proxy_consumer_ =
+ CosNotifyChannelAdmin::StructuredProxyPushConsumer::_narrow (proxyconsumer.in ());
+
+ ACE_ASSERT (!CORBA::is_nil (proxy_consumer_.in ()));
+
+ proxy_consumer_->connect_structured_push_supplier (objref.in ());
+}
+
+void
+Filter_StructuredPushSupplier::disconnect ()
+{
+ ACE_ASSERT (!CORBA::is_nil (this->proxy_consumer_.in ()));
+
+ this->proxy_consumer_->disconnect_structured_push_consumer();
+}
+
+void
+Filter_StructuredPushSupplier::subscription_change
+ (const CosNotification::EventTypeSeq & /*added*/,
+ const CosNotification::EventTypeSeq & /*removed */)
+{
+ //No-Op.
+}
+
+void
+Filter_StructuredPushSupplier::send_event
+ (const CosNotification::StructuredEvent& event)
+{
+ ACE_ASSERT (!CORBA::is_nil (this->proxy_consumer_.in ()));
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%C is sending an event \n"), my_name_.fast_rep ()));
+
+ proxy_consumer_->push_structured_event (event);
+}
+
+void
+Filter_StructuredPushSupplier::disconnect_structured_push_supplier ()
+{
+ // No-Op.
+}
+
+
diff --git a/TAO/orbsvcs/tests/Notify/Persistent_Filter/Filter.h b/TAO/orbsvcs/tests/Notify/Persistent_Filter/Filter.h
new file mode 100644
index 00000000000..98c27a6d436
--- /dev/null
+++ b/TAO/orbsvcs/tests/Notify/Persistent_Filter/Filter.h
@@ -0,0 +1,270 @@
+/* -*- C++ -*- */
+// $Id$
+// ==========================================================================
+//
+// = FILENAME
+// Filter.h
+//
+// = DESCRIPTION
+// Class to demo structured event filtering.
+//
+// = AUTHOR
+// Pradeep Gore <pradeep@cs.wustl.edu>
+//
+// ==========================================================================
+
+#ifndef NOTIFY_FILTER_CLIENT_H
+#define NOTIFY_FILTER_CLIENT_H
+
+#include "orbsvcs/orbsvcs/CosNotifyChannelAdminS.h"
+#include "orbsvcs/orbsvcs/CosNotifyCommC.h"
+#include "orbsvcs/orbsvcs/CosNamingC.h"
+#include "ace/SString.h"
+
+#if defined(_MSC_VER)
+#pragma warning(push)
+#pragma warning(disable:4250)
+#endif /* _MSC_VER */
+
+class Filter_StructuredPushConsumer;
+class Filter_StructuredPushSupplier;
+
+class FilterClient
+{
+ // = TITLE
+ // Filter Client
+ // = DESCRIPTION
+ // Client example that shows how to do Structured Event filtering
+ // in the Notification Service.
+
+ public:
+ // = Initialization and Termination
+ FilterClient (void);
+ // Constructor
+
+ ~FilterClient ();
+ // Destructor
+
+ void init_supplier (int argc, ACE_TCHAR *argv []);
+ void init_consumer (int argc, ACE_TCHAR *argv []);
+ // Init the Client.
+
+ void run_supplier ();
+ void run_consumer ();
+ // Run the demo.
+
+ void done (void);
+ // Consumer calls done, We're done.
+
+ protected:
+
+ int parse_args (int argc,
+ ACE_TCHAR *argv[]);
+
+ void init_ORB (int& argc, ACE_TCHAR **& argv);
+ // Initializes the ORB.
+
+ void resolve_naming_service ();
+ // Try to get hold of a running naming service.
+
+ void resolve_Notify_factory ();
+ // Try to resolve the Notify factory from the Naming service.
+
+ void create_EC ();
+ void get_EC ();
+ // Create an EC.
+
+ void create_supplieradmin();
+ // Create the Supplier Admin.
+ void get_supplieradmin();
+
+ void create_consumeradmin ();
+ // Create the Consumer Admin.
+ void get_consumeradmin ();
+
+ void create_consumers ();
+ // Create and initialize the consumers.
+
+ void create_suppliers ();
+ // create and initialize the suppliers.
+
+ void send_events ();
+ // send the events.
+
+ void wait_ready ();
+
+ void wait_consumer_complete ();
+
+ void verify_filter (CosNotifyFilter::FilterAdmin_var& admin,
+ const char* constraint_expr,
+ const char* mod_constraint_expr);
+
+ // = Data Members
+ PortableServer::POA_var root_poa_;
+ // Reference to the root poa.
+
+ CORBA::ORB_var orb_;
+ // The ORB that we use.
+
+ CosNaming::NamingContext_var naming_context_;
+ // Handle to the name service.
+
+ CosNotifyChannelAdmin::EventChannelFactory_var notify_factory_;
+ // Channel factory.
+
+ CosNotifyChannelAdmin::EventChannel_var ec_;
+ // The one channel that we create using the factory.
+
+ CosNotifyChannelAdmin::InterFilterGroupOperator ifgop_;
+ // The group operator between admin-proxy's.
+
+ CosNotification::QoSProperties initial_qos_;
+ // Initial qos specified to the factory when creating the EC.
+
+ CosNotification::AdminProperties initial_admin_;
+ // Initial admin props specified to the factory when creating the EC.
+
+ CosNotifyChannelAdmin::ConsumerAdmin_var consumer_admin_1_;
+ CosNotifyChannelAdmin::ConsumerAdmin_var consumer_admin_2_;
+ // The consumer admin used by consumers.
+
+ CosNotifyChannelAdmin::SupplierAdmin_var supplier_admin_;
+ // The supplier admin used by suppliers.
+
+ Filter_StructuredPushConsumer* consumer_1;
+ // Consumer #1
+
+ Filter_StructuredPushConsumer* consumer_2;
+ // Consumer #2
+
+ Filter_StructuredPushSupplier* supplier_1;
+ // Supplier #1
+
+ Filter_StructuredPushSupplier* supplier_2;
+ // Supplier #2
+
+ CosNotifyChannelAdmin::AdminID adminid_1_id_;
+ CosNotifyChannelAdmin::AdminID adminid_2_id_;
+
+ CosNotifyChannelAdmin::ChannelID channel_id_;
+
+ bool use_persistent_;
+
+ bool modify_constraint_;
+
+ // Set this flag to exit the run loop.
+ CORBA::Boolean done_;
+};
+
+/*****************************************************************/
+class Filter_StructuredPushConsumer : public POA_CosNotifyComm::StructuredPushConsumer
+{
+ // = TITLE
+ // Filter_StructuredPushConsumer
+ //
+ // = DESCRIPTION
+ // Consumer for the Filter example.
+ //
+
+ public:
+ // = Initialization and Termination code
+ Filter_StructuredPushConsumer (FilterClient* filter, const char *my_name);
+ // Constructor.
+
+ void connect (CosNotifyChannelAdmin::ConsumerAdmin_ptr consumer_admin);
+ // Connect the Consumer to the EventChannel.
+ // Creates a new proxy supplier and connects to it.
+
+ virtual void disconnect ();
+ // Disconnect from the supplier.
+
+protected:
+ // = Data members
+
+ FilterClient* filter_;
+ // The callback for <done>
+
+ ACE_CString my_name_;
+ // The name of this consumer.
+
+ CosNotifyChannelAdmin::StructuredProxyPushSupplier_var proxy_supplier_;
+ // The proxy that we are connected to.
+
+ CosNotifyChannelAdmin::ProxyID proxy_supplier_id_;
+ // The proxy_supplier id.
+
+ // = Methods
+ virtual ~Filter_StructuredPushConsumer (void);
+ // Destructor
+
+ // = NotifyPublish method
+ virtual void offer_change (
+ const CosNotification::EventTypeSeq & added,
+ const CosNotification::EventTypeSeq & removed
+ );
+
+ // = StructuredPushSupplier methods
+ virtual void push_structured_event (
+ const CosNotification::StructuredEvent & notification
+ );
+
+ virtual void disconnect_structured_push_consumer ();
+};
+
+/*****************************************************************/
+
+class Filter_StructuredPushSupplier : public POA_CosNotifyComm::StructuredPushSupplier
+{
+ // = TITLE
+ // Filter_StructuredPushSupplier
+ //
+ // = DESCRIPTION
+ // Supplier for the filter example.
+ //
+ public:
+ // = Initialization and Termination code
+ Filter_StructuredPushSupplier (const char* my_name);
+ // Constructor.
+
+ void connect (CosNotifyChannelAdmin::SupplierAdmin_ptr supplier_admin);
+ // Connect the Supplier to the EventChannel.
+ // Creates a new proxy supplier and connects to it.
+
+ void disconnect ();
+ // Disconnect from the supplier.
+
+ virtual void send_event (const CosNotification::StructuredEvent& event);
+ // Send one event.
+
+protected:
+ // = Data members
+ ACE_CString my_name_;
+ // The name of this consumer.
+
+ CosNotifyChannelAdmin::StructuredProxyPushConsumer_var proxy_consumer_;
+ // The proxy that we are connected to.
+
+ CosNotifyChannelAdmin::ProxyID proxy_consumer_id_;
+ // This supplier's id.
+
+ // = Protected Methods
+ virtual ~Filter_StructuredPushSupplier ();
+ // Destructor
+
+ // = NotifySubscribe
+ virtual void subscription_change (
+ const CosNotification::EventTypeSeq & added,
+ const CosNotification::EventTypeSeq & removed
+ );
+
+ // = StructuredPushSupplier method
+ virtual void disconnect_structured_push_supplier ();
+};
+
+/***************************************************************************/
+
+#if defined(_MSC_VER)
+#pragma warning(pop)
+#endif /* _MSC_VER */
+
+#endif /* NOTIFY_FILTER_CLIENT_H */
diff --git a/TAO/orbsvcs/tests/Notify/Persistent_Filter/Makefile.am b/TAO/orbsvcs/tests/Notify/Persistent_Filter/Makefile.am
new file mode 100644
index 00000000000..83d7ba78ff8
--- /dev/null
+++ b/TAO/orbsvcs/tests/Notify/Persistent_Filter/Makefile.am
@@ -0,0 +1,95 @@
+## Process this file with automake to create Makefile.in
+##
+## $Id$
+##
+## This file was generated by MPC. Any changes made directly to
+## this file will be lost the next time it is generated.
+##
+## MPC Command:
+## ../bin/mwc.pl -type automake -noreldefs TAO.mwc
+
+ACE_BUILDDIR = $(top_builddir)/..
+ACE_ROOT = $(top_srcdir)/..
+TAO_BUILDDIR = $(top_builddir)
+TAO_ROOT = $(top_srcdir)
+
+noinst_PROGRAMS =
+
+## Makefile.consumer.am
+
+if BUILD_EXCEPTIONS
+
+noinst_PROGRAMS += consumer
+
+consumer_CPPFLAGS = \
+ -I$(ACE_ROOT) \
+ -I$(ACE_BUILDDIR) \
+ -I$(TAO_ROOT) \
+ -I$(TAO_BUILDDIR) \
+ -I$(TAO_ROOT)/orbsvcs \
+ -I$(TAO_BUILDDIR)/orbsvcs \
+ -DTAO_HAS_TYPED_EVENT_CHANNEL
+
+consumer_SOURCES = \
+ Filter.cpp \
+ consumer.cpp \
+ Filter.h
+
+consumer_LDADD = \
+ $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosNaming.la \
+ $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosNotification_Skel.la \
+ $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosEvent_Skel.la \
+ $(TAO_BUILDDIR)/tao/libTAO_PortableServer.la \
+ $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosNotification.la \
+ $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosEvent.la \
+ $(TAO_BUILDDIR)/tao/libTAO_AnyTypeCode.la \
+ $(TAO_BUILDDIR)/tao/libTAO.la \
+ $(ACE_BUILDDIR)/ace/libACE.la
+
+endif BUILD_EXCEPTIONS
+
+## Makefile.supplier.am
+
+if BUILD_EXCEPTIONS
+
+noinst_PROGRAMS += supplier
+
+supplier_CPPFLAGS = \
+ -I$(ACE_ROOT) \
+ -I$(ACE_BUILDDIR) \
+ -I$(TAO_ROOT) \
+ -I$(TAO_BUILDDIR) \
+ -I$(TAO_ROOT)/orbsvcs \
+ -I$(TAO_BUILDDIR)/orbsvcs \
+ -DTAO_HAS_TYPED_EVENT_CHANNEL
+
+supplier_SOURCES = \
+ Filter.cpp \
+ supplier.cpp \
+ Filter.h
+
+supplier_LDADD = \
+ $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosNaming.la \
+ $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosNotification_Skel.la \
+ $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosEvent_Skel.la \
+ $(TAO_BUILDDIR)/tao/libTAO_PortableServer.la \
+ $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosNotification.la \
+ $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosEvent.la \
+ $(TAO_BUILDDIR)/tao/libTAO_AnyTypeCode.la \
+ $(TAO_BUILDDIR)/tao/libTAO.la \
+ $(ACE_BUILDDIR)/ace/libACE.la
+
+endif BUILD_EXCEPTIONS
+
+
+ACLOCAL = @ACLOCAL@
+ACLOCAL_AMFLAGS = -I m4
+AUTOMAKE_OPTIONS = foreign
+
+## Clean up template repositories, etc.
+clean-local:
+ -rm -f *~ *.bak *.rpo *.sym lib*.*_pure_* core core.*
+ -rm -f gcctemp.c gcctemp so_locations *.ics
+ -rm -rf cxx_repository ptrepository ti_files
+ -rm -rf templateregistry ir.out
+ -rm -rf ptrepository SunWS_cache Templates.DB
diff --git a/TAO/orbsvcs/tests/Notify/Persistent_Filter/NS.conf b/TAO/orbsvcs/tests/Notify/Persistent_Filter/NS.conf
new file mode 100644
index 00000000000..b5fc33c0af7
--- /dev/null
+++ b/TAO/orbsvcs/tests/Notify/Persistent_Filter/NS.conf
@@ -0,0 +1,5 @@
+static TAO_CosNotify_Service "-AllocateTaskPerProxy -DispatchingThreads 1 -SourceThreads 1 -AllowReconnect -ValidateClient -ValidateClientDelay 1 -ValidateClientInterval 10 "
+
+dynamic Topology_Factory Service_Object* TAO_CosNotification_Persist:_make_TAO_Notify_XML_Topology_Factory() "-save_base_path ./persistency.notif -load_base_path ./persistency.notif -backup_count 1 -v"
+
+#dynamic Logging_Strategy Service_Object* ACE:_make_ACE_Logging_Strategy() "-s NS -f OSTREAM -t 0 -m 1024 -w"
diff --git a/TAO/orbsvcs/tests/Notify/Persistent_Filter/Persistent_Filter.mpc b/TAO/orbsvcs/tests/Notify/Persistent_Filter/Persistent_Filter.mpc
new file mode 100644
index 00000000000..64f8381180d
--- /dev/null
+++ b/TAO/orbsvcs/tests/Notify/Persistent_Filter/Persistent_Filter.mpc
@@ -0,0 +1,19 @@
+// -*- MPC -*-
+// $Id$
+
+
+project(supplier) : orbsvcsexe, notification, notification_skel, naming {
+ Source_Files {
+ supplier.cpp
+ Filter.cpp
+ }
+}
+
+
+project(consumer) : orbsvcsexe, notification, notification_skel, naming {
+ Source_Files {
+ consumer.cpp
+ Filter.cpp
+ }
+}
+
diff --git a/TAO/orbsvcs/tests/Notify/Persistent_Filter/README b/TAO/orbsvcs/tests/Notify/Persistent_Filter/README
new file mode 100644
index 00000000000..c3963500c29
--- /dev/null
+++ b/TAO/orbsvcs/tests/Notify/Persistent_Filter/README
@@ -0,0 +1,10 @@
+This test is based on the Filter example.
+It tests the persistent filter changes. The filter constraints
+are saved to persistent data instead of filter IOR.
+
+The run_test.pl starts NotificationService, consumer and supplier.
+After they complete, restart them again with using the persistent
+data. The restarted consumer and supplier will verify the filters
+and modify the constraints. The test finally should pass with no
+errors.
+
diff --git a/TAO/orbsvcs/tests/Notify/Persistent_Filter/consumer.cpp b/TAO/orbsvcs/tests/Notify/Persistent_Filter/consumer.cpp
new file mode 100644
index 00000000000..33f0e181f1d
--- /dev/null
+++ b/TAO/orbsvcs/tests/Notify/Persistent_Filter/consumer.cpp
@@ -0,0 +1,30 @@
+// -*- C++ -*-
+// $Id$
+
+#include "Filter.h"
+
+int
+ACE_TMAIN (int argc, ACE_TCHAR *argv [])
+{
+ FilterClient client;
+
+ try
+ {
+ client.init_consumer (argc, argv); //Init the Client
+
+ client.run_consumer ();
+ }
+ catch (const CORBA::UserException& ue)
+ {
+ ue._tao_print_exception (
+ "TLS_Client user error: ");
+ return 1;
+ }
+ catch (const CORBA::SystemException& se)
+ {
+ se._tao_print_exception ("Consumer system error: ");
+ return 1;
+ }
+
+ return 0;
+}
diff --git a/TAO/orbsvcs/tests/Notify/Persistent_Filter/run_test.pl b/TAO/orbsvcs/tests/Notify/Persistent_Filter/run_test.pl
new file mode 100755
index 00000000000..d22a0800507
--- /dev/null
+++ b/TAO/orbsvcs/tests/Notify/Persistent_Filter/run_test.pl
@@ -0,0 +1,132 @@
+eval '(exit $?0)' && eval 'exec perl -S $0 ${1+"$@"}'
+ & eval 'exec perl -S $0 $argv:q'
+ if 0;
+
+# $Id$
+# -*- perl -*-
+
+use lib "$ENV{ACE_ROOT}/bin";
+use PerlACE::Run_Test;
+
+
+$experiment_timeout = 60;
+$startup_timeout = 60;
+
+$notify_ior = PerlACE::LocalFile ("notify.ior");
+
+$naming_ior = PerlACE::LocalFile ("naming.ior");
+
+$svc_conf = PerlACE::LocalFile ("NS.conf");
+
+$persistent_prefix = "persistency.notif";
+
+$status = 0;
+
+$Naming = new PerlACE::Process ("../../../Naming_Service/Naming_Service",
+ "-o $naming_ior");
+
+$Notification = new PerlACE::Process ("../../../Notify_Service/Notify_Service");
+#-ORBDebugLevel 10 -ORBVerboseLogging 1
+$Notify_Args = "-ORBSvcConf $svc_conf -ORBInitRef NameService=file://$naming_ior -IORoutput $notify_ior ";
+
+$Supplier = new PerlACE::Process ("supplier");
+
+$Supplier_Args = "-ORBInitRef NameService=file://$naming_ior";
+
+$Consumer = new PerlACE::Process ("consumer");
+
+$Consumer_Args = "-ORBInitRef NameService=file://$naming_ior";
+
+unlink $naming_ior;
+$Naming->Spawn ();
+
+if (PerlACE::waitforfile_timed ($naming_ior, $startup_timeout) == -1) {
+ print STDERR "ERROR: waiting for the naming service to start\n";
+ $Naming->Kill ();
+ exit 1;
+}
+
+sub run_test
+{
+ my $client_args = shift;
+
+ unlink $notify_ior;
+ $Notification->Arguments ($Notify_Args);
+ $args = $Notification->Arguments ();
+ print STDERR "Running Notification with arguments: $args\n";
+ $Notification->Spawn ();
+
+ if (PerlACE::waitforfile_timed ($notify_ior, $startup_timeout) == -1) {
+ print STDERR "ERROR: waiting for the notify service to start\n";
+ $Notification->Kill ();
+ $Naming->Kill ();
+ exit 1;
+ }
+
+ sleep (5);
+
+ $Supplier->Arguments ($client_args . $Supplier_Args);
+ $Consumer->Arguments ($client_args . $Consumer_Args);
+ $args = $Supplier->Arguments ();
+ print STDERR "Running Supplier with arguments: $args\n";
+ $args = $Consumer->Arguments ();
+ print STDERR "Running Consumer with arguments: $args\n";
+
+ $status = $Supplier->Spawn ();
+
+ if ($status != 0)
+ {
+ print STDERR "ERROR: Supplier Spawn returned $status\n";
+ $Notification->Kill ();
+ $Naming->Kill ();
+ exit 1;
+ }
+
+ $status = $Consumer->Spawn ();
+
+ if ($status != 0)
+ {
+ print STDERR "ERROR: Consumer returned $status\n";
+ $Supplier->Kill ();
+ $Notification->Kill ();
+ $Naming->Kill ();
+ exit 1;
+ }
+
+ $status = $Consumer->WaitKill ($experiment_timeout);
+
+ if ($status != 0)
+ {
+ print STDERR "ERROR: Consumer WaitKill returned $status\n";
+ $Consumer->Kill ();
+ $Supplier->Kill ();
+ $Notification->Kill ();
+ $Naming->Kill ();
+ exit 1;
+ }
+
+ $status = $Supplier->WaitKill ($experiment_timeout);
+
+ if ($status != 0)
+ {
+ print STDERR "ERROR: Supplier WaitKill returned $status\n";
+ $Supplier->Kill ();
+ $Notification->Kill ();
+ $Naming->Kill ();
+ exit 1;
+ }
+
+ $Notification->Kill ();
+ unlink $notify_ior;
+}
+
+unlink <$persistent_prefix.*>;
+
+run_test ();
+run_test ("-m -p ");
+
+
+$Naming->Kill ();
+unlink $naming_ior;
+
+exit $status;
diff --git a/TAO/orbsvcs/tests/Notify/Persistent_Filter/supplier.cpp b/TAO/orbsvcs/tests/Notify/Persistent_Filter/supplier.cpp
new file mode 100644
index 00000000000..c2aca21a2c1
--- /dev/null
+++ b/TAO/orbsvcs/tests/Notify/Persistent_Filter/supplier.cpp
@@ -0,0 +1,29 @@
+// -*- C++ -*-
+// $Id$
+
+#include "Filter.h"
+
+int
+ACE_TMAIN (int argc, ACE_TCHAR *argv [])
+{
+ FilterClient client;
+
+ try
+ {
+ client.init_supplier (argc, argv); //Init the Client
+
+ client.run_supplier ();
+ }
+ catch (const CORBA::UserException& ue)
+ {
+ ue._tao_print_exception ("TLS_Client user error: ");
+ return 1;
+ }
+ catch (const CORBA::SystemException& se)
+ {
+ se._tao_print_exception ("Supplier system error: ");
+ return 1;
+ }
+
+ return 0;
+}