summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs
diff options
context:
space:
mode:
authorboris <boris@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2005-07-07 13:42:41 +0000
committerboris <boris@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2005-07-07 13:42:41 +0000
commitfd407016a2a6c847cd323254e012c5a50eb9d475 (patch)
tree6d5adf704e377abe5066b802acdc347155cf035f /TAO/orbsvcs
parentac3e9f6504fc53f8c87ac92e99face8118315299 (diff)
downloadATCD-fd407016a2a6c847cd323254e012c5a50eb9d475.tar.gz
ChangeLogTag:Thu Jul 7 15:18:01 2005 Boris Kolpackov <boris@kolpackov.net>
Diffstat (limited to 'TAO/orbsvcs')
-rw-r--r--TAO/orbsvcs/examples/Notify/Federation/Agent/Agent.cpp303
-rw-r--r--TAO/orbsvcs/examples/Notify/Federation/Agent/Agent.mpc9
-rw-r--r--TAO/orbsvcs/examples/Notify/Federation/Agent/README12
-rw-r--r--TAO/orbsvcs/examples/Notify/Federation/Agent/agent.diabin0 -> 4102 bytes
-rw-r--r--TAO/orbsvcs/examples/Notify/Federation/Gate/Gate.cpp258
-rw-r--r--TAO/orbsvcs/examples/Notify/Federation/Gate/Gate.h85
-rw-r--r--TAO/orbsvcs/examples/Notify/Federation/Gate/Gate.mpc8
-rw-r--r--TAO/orbsvcs/examples/Notify/Federation/SpaceCraft/README27
-rw-r--r--TAO/orbsvcs/examples/Notify/Federation/SpaceCraft/SpaceCraft.cpp175
-rw-r--r--TAO/orbsvcs/examples/Notify/Federation/SpaceCraft/SpaceCraft.mpc9
-rw-r--r--TAO/orbsvcs/examples/Notify/Federation/federation.mwc5
11 files changed, 891 insertions, 0 deletions
diff --git a/TAO/orbsvcs/examples/Notify/Federation/Agent/Agent.cpp b/TAO/orbsvcs/examples/Notify/Federation/Agent/Agent.cpp
new file mode 100644
index 00000000000..001cdcad97b
--- /dev/null
+++ b/TAO/orbsvcs/examples/Notify/Federation/Agent/Agent.cpp
@@ -0,0 +1,303 @@
+// file : Agent.cpp
+// author : Boris Kolpackov <boris@dre.vanderbilt.edu>
+// cvs-id : $Id$
+
+#include <iostream>
+#include <sstream>
+
+#include <ace/OS.h>
+
+#include <tao/corba.h>
+
+#include <orbsvcs/CosNotificationC.h>
+#include <orbsvcs/CosNotifyChannelAdminC.h>
+#include <orbsvcs/CosNotifyCommC.h>
+#include <orbsvcs/CosNotifyCommS.h>
+
+// For in-process Notification Service.
+//
+#include <ace/Dynamic_Service.h>
+#include <orbsvcs/Notify/Service.h>
+#include <orbsvcs/Notify/CosNotify_Initializer.h> // NS static link helper.
+
+
+#include <Gate/Gate.h>
+
+using std::cerr;
+using std::endl;
+
+using namespace CORBA;
+using namespace CosNotifyComm;
+using namespace CosNotification;
+using namespace CosNotifyChannelAdmin;
+
+class Agent : public POA_CosNotifyComm::StructuredPushConsumer,
+ public PortableServer::RefCountServantBase
+{
+public:
+ Agent (char const* space_craft_name,
+ char const* agent_name,
+ EventChannel_ptr ch)
+ : space_craft_name_ (space_craft_name),
+ agent_name_ (agent_name),
+ counter_ (0)
+ {
+ // Obtain a proxy consumer.
+ //
+ ProxyConsumer_var pc (
+ ch->default_supplier_admin ()->obtain_notification_push_consumer (
+ STRUCTURED_EVENT, consumer_id_));
+
+ consumer_ = StructuredProxyPushConsumer::_narrow (pc.in ());
+
+ consumer_->connect_structured_push_supplier (
+ StructuredPushSupplier::_nil ());
+
+ // Register as a consumer.
+ //
+ StructuredPushConsumer_var ref (_this ()); // Activate on the default POA.
+
+ ProxySupplier_var ps (
+ ch->default_consumer_admin ()->obtain_notification_push_supplier (
+ STRUCTURED_EVENT, supplier_id_));
+
+ supplier_ = StructuredProxyPushSupplier::_narrow (ps.in ());
+
+ supplier_->connect_structured_push_consumer (ref.in ());
+
+ // Start tracker thread.
+ //
+ if (ACE_OS::thr_create (&tracker_thunk,
+ this,
+ THR_JOINABLE,
+ &thread_) != 0) ::abort ();
+ }
+
+private:
+ static ACE_THR_FUNC_RETURN
+ tracker_thunk (void* arg)
+ {
+ Agent* a (reinterpret_cast<Agent*> (arg));
+ a->tracker ();
+ return 0;
+ }
+
+ void
+ tracker ()
+ {
+ while (true)
+ {
+ StructuredEvent e;
+
+ // Header.
+ //
+ e.header.fixed_header.event_type.domain_name = string_dup ("Aerospace");
+ e.header.fixed_header.event_type.type_name = string_dup ("AgentDiscovery");
+
+ // Make a unique "event id" by combining space_craft_name, agent_name,
+ // and counter. This can be handy for debugging.
+ //
+ std::ostringstream ostr;
+ ostr << space_craft_name_ << ":" << agent_name_ << ":" << counter_++;
+
+ e.header.fixed_header.event_name = ostr.str ().c_str ();
+
+ // Also add space_craft_name and agent_name fields separately
+ // into variable_header. This will make filtering easier.
+ //
+ e.header.variable_header.length (2);
+
+ e.header.variable_header[0].name = string_dup ("space_craft_name");
+ e.header.variable_header[0].value <<= string_dup (space_craft_name_);
+
+ e.header.variable_header[1].name = string_dup ("agent_name");
+ e.header.variable_header[1].value <<= string_dup (agent_name_);
+
+ // Add the counter value into filterable_data section of the event.
+ //
+ e.filterable_data.length (1);
+
+ e.filterable_data[0].name = string_dup ("counter");
+ e.filterable_data[0].value <<= counter_;
+
+
+ consumer_->push_structured_event (e);
+
+ ACE_OS::sleep (ACE_Time_Value (3, 0));
+ }
+ }
+
+private:
+ // NotifyPublish interface.
+ //
+ virtual void
+ offer_change (EventTypeSeq const&, EventTypeSeq const&) throw ()
+ {
+ // We don't care.
+ }
+
+ // StructuredPushSupplier interface.
+ //
+ virtual void
+ push_structured_event (StructuredEvent const& e) throw ()
+ {
+ // Extract space_craft_name and agent_name.
+ //
+ Char const* space_craft_name = 0;
+ Char const* agent_name = 0;
+
+ e.header.variable_header[0].value >>= space_craft_name;
+ e.header.variable_header[1].value >>= agent_name;
+
+ // Extract the counter value.
+ //
+ CORBA::ULong counter;
+ e.filterable_data[0].value >>= counter;
+
+ cerr << e.header.fixed_header.event_type.domain_name << "::"
+ << e.header.fixed_header.event_type.type_name << " "
+ << "id=" << e.header.fixed_header.event_name << " from "
+ << "(" << space_craft_name << ", " << agent_name << ")"
+ << ": " << counter << endl;
+ }
+
+
+ virtual void
+ disconnect_structured_push_consumer () throw ()
+ {
+ // We don't care.
+ }
+
+private:
+ String_var space_craft_name_;
+ String_var agent_name_;
+ ULong counter_;
+
+ ACE_thread_t thread_;
+
+ ProxyID consumer_id_;
+ StructuredProxyPushConsumer_var consumer_;
+
+ ProxyID supplier_id_;
+ StructuredProxyPushSupplier_var supplier_;
+
+};
+
+int
+main (int argc, char* argv[])
+{
+ ORB_var orb (ORB_init (argc, argv));
+
+ if (argc < 2)
+ {
+ ACE_ERROR ((LM_ERROR,
+ "Usage: %s <agent-name> [<space-craft-name>={a, b, c}]\n",
+ argv[0]));
+ return 1;
+ }
+
+
+ // Activate the root POA.
+ //
+ CORBA::Object_var obj (orb->resolve_initial_references ("RootPOA"));
+ PortableServer::POA_var root_poa (PortableServer::POA::_narrow (obj.in ()));
+
+ PortableServer::POAManager_var poa_manager (root_poa->the_POAManager ());
+
+ poa_manager->activate ();
+
+
+ // Initialize Notification Service.
+ //
+ TAO_Notify_Service* ns =
+ ACE_Dynamic_Service<TAO_Notify_Service>::instance (
+ TAO_NOTIFICATION_SERVICE_NAME);
+
+ if (ns == 0)
+ {
+ ns =
+ ACE_Dynamic_Service<TAO_Notify_Service>::instance (
+ TAO_NOTIFY_DEF_EMO_FACTORY_NAME);
+ }
+
+ if (ns == 0)
+ {
+ ACE_ERROR ((LM_ERROR,
+ "Notification Service not found! check svc.conf\n"));
+ return -1;
+ }
+
+ ns->init (orb.in () /*ACE_ENV_ARG_PARAMETER*/);
+ //ACE_CHECK_RETURN (-1);
+
+
+
+ // Create the channel factory.
+ //
+ EventChannelFactory_var factory (ns->create (root_poa.in ()
+ /*ACE_ENV_ARG_PARAMETER*/));
+ //ACE_CHECK_RETURN (-1);
+
+ if (is_nil (factory.in ()))
+ {
+ ACE_ERROR ((LM_ERROR,
+ "Unable to create channel factory\n"));
+ return 1;
+ }
+
+
+ // Create the channel.
+ //
+ QoSProperties qosp;
+ AdminProperties ap;
+ ChannelID id;
+
+ EventChannel_var channel (factory->create_channel (qosp, ap, id));
+
+ // Find which space craft we are on.
+ //
+ ACE_INET_Addr addr;
+ char const* space_craft_name = 0;
+
+ if (argc < 3)
+ space_craft_name = "a"; // Default to spacecraft "a".
+ else
+ space_craft_name = argv[2];
+
+ // Do a quick mapping to mcast addresses.
+ //
+ switch (space_craft_name[0])
+ {
+ case 'a':
+ {
+ addr = ACE_INET_Addr ("224.1.0.1:10000");
+ break;
+ }
+ case 'b':
+ {
+ addr = ACE_INET_Addr ("224.1.0.2:10000");
+ break;
+ }
+ case 'c':
+ {
+ addr = ACE_INET_Addr ("224.1.0.3:10000");
+ break;
+ }
+ default:
+ {
+ ACE_ERROR ((LM_ERROR,
+ "Space craft name should be either 'a', 'b', or 'c'.\n"));
+ return 1;
+ }
+ }
+
+ // Create the gate.
+ //
+ Gate gate (addr, channel);
+
+ // Start the agent.
+ //
+ Agent agent (space_craft_name, argv[1], channel);
+
+ orb->run ();
+}
diff --git a/TAO/orbsvcs/examples/Notify/Federation/Agent/Agent.mpc b/TAO/orbsvcs/examples/Notify/Federation/Agent/Agent.mpc
new file mode 100644
index 00000000000..897298573c4
--- /dev/null
+++ b/TAO/orbsvcs/examples/Notify/Federation/Agent/Agent.mpc
@@ -0,0 +1,9 @@
+// -*- MPC -*-
+// $Id$
+
+project : rmcast, orbsvcsexe, notification, notification_skel, notification_serv, typecodefactory {
+ exename = agent
+ after += Gate
+ libs += Gate
+ includes += ..
+}
diff --git a/TAO/orbsvcs/examples/Notify/Federation/Agent/README b/TAO/orbsvcs/examples/Notify/Federation/Agent/README
new file mode 100644
index 00000000000..53f999666f6
--- /dev/null
+++ b/TAO/orbsvcs/examples/Notify/Federation/Agent/README
@@ -0,0 +1,12 @@
+Agent is a simple application which sends and receives discovery messages
+via multicast-based federation of Notification Service. To run the example
+you don't need to start Notification Service; each agent process will create
+its own. Just start a few agents (in different terminal windows):
+
+
+$ ./agent smith
+$ ./agent johnson
+
+
+--
+Boris Kolpackov <boris@kolpackov.net>
diff --git a/TAO/orbsvcs/examples/Notify/Federation/Agent/agent.dia b/TAO/orbsvcs/examples/Notify/Federation/Agent/agent.dia
new file mode 100644
index 00000000000..9840e3fe8a6
--- /dev/null
+++ b/TAO/orbsvcs/examples/Notify/Federation/Agent/agent.dia
Binary files differ
diff --git a/TAO/orbsvcs/examples/Notify/Federation/Gate/Gate.cpp b/TAO/orbsvcs/examples/Notify/Federation/Gate/Gate.cpp
new file mode 100644
index 00000000000..bc50c1fafc6
--- /dev/null
+++ b/TAO/orbsvcs/examples/Notify/Federation/Gate/Gate.cpp
@@ -0,0 +1,258 @@
+// file : Gate.cpp
+// author : Boris Kolpackov <boris@dre.vanderbilt.edu>
+// cvs-id : $Id$
+
+#include "Gate.h"
+
+/*
+#include <iostream>
+
+using std::cerr;
+using std::endl;
+*/
+
+using namespace CORBA;
+using namespace CosNotifyComm;
+using namespace CosNotification;
+using namespace CosNotifyChannelAdmin;
+
+Gate::
+~Gate ()
+{
+ // Stop tracker thread.
+ //
+ {
+ Lock l (mutex_);
+ stop_ = true;
+ }
+
+ thread_mgr_.wait ();
+}
+
+
+Gate::
+Gate (ACE_INET_Addr const& group, EventChannel_ptr ch)
+ : socket_ (group, false),
+ stop_ (false)
+{
+ init (ch->default_consumer_admin (),
+ ch->default_supplier_admin ());
+}
+
+Gate::
+Gate (ACE_INET_Addr const& group,
+ ConsumerAdmin_ptr consumer_admin,
+ SupplierAdmin_ptr supplier_admin)
+ : socket_ (group, false),
+ stop_ (false)
+{
+ init (consumer_admin, supplier_admin);
+}
+
+void Gate::
+init (ConsumerAdmin_ptr consumer_admin,
+ SupplierAdmin_ptr supplier_admin)
+{
+ // Generate unique id. It is used to prevent event looping.
+ //
+ ACE_Utils::UUID uuid;
+ ACE_Utils::UUID_GENERATOR::instance ()->init ();
+ ACE_Utils::UUID_GENERATOR::instance ()->generateUUID (uuid);
+
+ id_ = string_alloc (uuid.to_string ()->length () + 2);
+ strcpy (id_.inout (), "_");
+ strcpy (id_.inout () + 1, uuid.to_string ()->rep ());
+
+ // ACE_DEBUG ((LM_DEBUG, "ID: %s\n", id_.in ()));
+
+
+ // Obtain proxy consumer.
+ //
+ ProxyConsumer_var pc (
+ supplier_admin->obtain_notification_push_consumer (
+ STRUCTURED_EVENT, consumer_id_));
+
+ consumer_ = StructuredProxyPushConsumer::_narrow (pc.in ());
+
+ consumer_->connect_structured_push_supplier (
+ StructuredPushSupplier::_nil ());
+
+
+ // Register as consumer.
+ //
+ StructuredPushConsumer_var ref (_this ()); // Activate on default POA.
+
+ ProxySupplier_var ps (
+ consumer_admin->obtain_notification_push_supplier (
+ STRUCTURED_EVENT, supplier_id_));
+
+ supplier_ = StructuredProxyPushSupplier::_narrow (ps.in ());
+
+ supplier_->connect_structured_push_consumer (ref.in ());
+
+
+ // Create tracker thread.
+ //
+ thread_mgr_.spawn (tracker_thunk, this);
+}
+
+ACE_THR_FUNC_RETURN Gate::
+tracker_thunk (void* arg)
+{
+ Gate* a (reinterpret_cast<Gate*> (arg));
+ a->tracker ();
+ return 0;
+}
+
+void Gate::
+tracker ()
+{
+ // Time period after which a manual cancellation request is
+ // checked for.
+ //
+ ACE_Time_Value const timeout (0, 500);
+
+ while (true)
+ {
+ ssize_t n;
+
+ while (true)
+ {
+ n = socket_.size (timeout);
+
+ // Check for cancellation request.
+ //
+ {
+ Lock l (mutex_);
+
+ if (stop_)
+ return;
+ }
+
+ if (n == -1)
+ {
+ if (errno != ETIME)
+ abort ();
+ }
+ else
+ break;
+ }
+
+ OctetSeq seq (n);
+ seq.length (n);
+
+ char* buffer = reinterpret_cast<char*> (seq.get_buffer ());
+
+ if (socket_.recv (buffer, n) != n)
+ {
+ ACE_ERROR ((LM_ERROR,
+ "recv() reported different size than size()\n"));
+ continue;
+ }
+
+ TAO_InputCDR cdr (buffer, n);
+
+ StructuredEvent e;
+
+ cdr >> e;
+
+ // Add TTL header to prevent infinite message looping.
+ //
+ ULong i (0);
+
+ for (; i < e.header.variable_header.length (); ++i)
+ {
+ if (strcmp (e.header.variable_header[i].name, id_) == 0) break;
+ }
+
+ if (i == e.header.variable_header.length ())
+ {
+ e.header.variable_header.length (i + 1);
+
+ e.header.variable_header[i].name = string_dup (id_);
+ }
+
+ //ACE_DEBUG ((LM_DEBUG,
+ // "adding %s as header #%d\n",
+ // e.header.variable_header[i].name.in (), i));
+
+ e.header.variable_header[i].value <<= ULong (1);
+
+ /*
+ cerr << "IN: "
+ << e.header.fixed_header.event_type.domain_name << "::"
+ << e.header.fixed_header.event_type.type_name << " "
+ << e.header.fixed_header.event_name << endl;
+ */
+
+ consumer_->push_structured_event (e);
+ }
+}
+
+void Gate::
+push_structured_event (StructuredEvent const& e) throw ()
+{
+ for (ULong i (0); i < e.header.variable_header.length (); ++i)
+ {
+ if (strcmp (e.header.variable_header[i].name, id_) == 0)
+ {
+ ULong ttl;
+
+ e.header.variable_header[i].value >>= ttl;
+
+ if (ttl <= 1)
+ {
+ //ACE_DEBUG ((LM_DEBUG,
+ // "DROPPED\n"));
+ return;
+ }
+
+ break;
+ }
+ }
+
+ /*
+ cerr << "OUT: "
+ << e.header.fixed_header.event_type.domain_name << "::"
+ << e.header.fixed_header.event_type.type_name << " "
+ << e.header.fixed_header.event_name << endl;
+ */
+
+ TAO_OutputCDR cdr;
+
+ cdr << e;
+
+ size_t size (cdr.total_length ());
+
+ OctetSeq seq (size);
+ seq.length (size);
+
+ char* buffer = reinterpret_cast<char*> (seq.get_buffer ());
+
+ {
+ char* buf (buffer);
+
+ for (ACE_Message_Block const* mb = cdr.begin ();
+ mb != 0;
+ mb = mb->cont ())
+ {
+ ACE_OS::memcpy (buf, mb->rd_ptr (), mb->length ());
+ buf += mb->length ();
+ }
+ }
+
+ socket_.send (buffer, size);
+}
+
+
+void Gate::
+disconnect_structured_push_consumer () throw ()
+{
+ // We don't care.
+}
+
+void Gate::
+offer_change (EventTypeSeq const&, EventTypeSeq const&) throw ()
+{
+ // We don't care.
+}
diff --git a/TAO/orbsvcs/examples/Notify/Federation/Gate/Gate.h b/TAO/orbsvcs/examples/Notify/Federation/Gate/Gate.h
new file mode 100644
index 00000000000..c70513c3dfc
--- /dev/null
+++ b/TAO/orbsvcs/examples/Notify/Federation/Gate/Gate.h
@@ -0,0 +1,85 @@
+// file : Gate.h
+// author : Boris Kolpackov <boris@dre.vanderbilt.edu>
+// cvs-id : $Id$
+
+#ifndef GATE_H
+#define GATE_H
+
+#include "ace/OS.h"
+#include "ace/INET_Addr.h"
+#include "ace/UUID.h"
+
+#include "ace/Thread_Mutex.h"
+#include "ace/Thread_Manager.h"
+
+#include "tao/corba.h"
+
+#include "orbsvcs/CosNotificationC.h"
+#include "orbsvcs/CosNotifyChannelAdminC.h"
+#include "orbsvcs/CosNotifyCommC.h"
+#include "orbsvcs/CosNotifyCommS.h"
+
+#include "ace/RMCast/Socket.h"
+
+class Gate : public POA_CosNotifyComm::StructuredPushConsumer,
+ public PortableServer::RefCountServantBase
+{
+public:
+ virtual
+ ~Gate ();
+
+ Gate (ACE_INET_Addr const& group,
+ CosNotifyChannelAdmin::EventChannel_ptr ch);
+
+ Gate (ACE_INET_Addr const& group,
+ CosNotifyChannelAdmin::ConsumerAdmin_ptr consumer_admin,
+ CosNotifyChannelAdmin::SupplierAdmin_ptr supplier_admin);
+
+private:
+ void
+ init (CosNotifyChannelAdmin::ConsumerAdmin_ptr consumer_admin,
+ CosNotifyChannelAdmin::SupplierAdmin_ptr supplier_admin);
+
+ static ACE_THR_FUNC_RETURN
+ tracker_thunk (void* arg);
+
+ void
+ tracker ();
+
+private:
+ // NotifyPublish interface.
+ //
+ virtual void
+ offer_change (CosNotification::EventTypeSeq const&,
+ CosNotification::EventTypeSeq const&) throw ();
+
+ // StructuredPushSupplier interface.
+ //
+ virtual void
+ push_structured_event (CosNotification::StructuredEvent const& e) throw ();
+
+
+ virtual void
+ disconnect_structured_push_consumer () throw ();
+
+private:
+ ACE_thread_t thread_;
+
+ CosNotifyChannelAdmin::ProxyID consumer_id_;
+ CosNotifyChannelAdmin::StructuredProxyPushConsumer_var consumer_;
+
+ CosNotifyChannelAdmin::ProxyID supplier_id_;
+ CosNotifyChannelAdmin::StructuredProxyPushSupplier_var supplier_;
+
+ ACE_RMCast::Socket socket_;
+ CORBA::String_var id_;
+
+ typedef ACE_SYNCH_MUTEX Mutex;
+ typedef ACE_Guard<Mutex> Lock;
+
+ bool stop_;
+ Mutex mutex_;
+ ACE_Thread_Manager thread_mgr_;
+};
+
+#endif // GATE_H
diff --git a/TAO/orbsvcs/examples/Notify/Federation/Gate/Gate.mpc b/TAO/orbsvcs/examples/Notify/Federation/Gate/Gate.mpc
new file mode 100644
index 00000000000..4b8db7f0477
--- /dev/null
+++ b/TAO/orbsvcs/examples/Notify/Federation/Gate/Gate.mpc
@@ -0,0 +1,8 @@
+// -*- MPC -*-
+// $Id$
+
+project(Gate) : rmcast, orbsvcsexe, notification, notification_skel {
+ sharedname = Gate
+ dynamicflags =
+
+}
diff --git a/TAO/orbsvcs/examples/Notify/Federation/SpaceCraft/README b/TAO/orbsvcs/examples/Notify/Federation/SpaceCraft/README
new file mode 100644
index 00000000000..3be18eaa394
--- /dev/null
+++ b/TAO/orbsvcs/examples/Notify/Federation/SpaceCraft/README
@@ -0,0 +1,27 @@
+This example simulates spacecraft constellation. Each spacecraft
+connects to a constellation-wide multicast group to which it
+forwards pre-filtered messages on behalf of its agents. To run
+the example start a few spacecrafts (in separate terminal windows):
+
+$ ./craft a
+$ ./craft b
+$ ./craft c
+
+Then start a few agents (from ../Agent) for each spacecraft:
+
+$ ./agent 1 a
+$ ./agent 2 a
+
+$ ./agent 1 b
+$ ./agent 2 b
+
+$ ./agent 1 c
+$ ./agent 2 c
+
+You should be able to observer that each agent receives only
+every third message from the agents on other spacecrafts.
+
+
+--
+Boris Kolpackov <boris@kolpackov.net>
+
diff --git a/TAO/orbsvcs/examples/Notify/Federation/SpaceCraft/SpaceCraft.cpp b/TAO/orbsvcs/examples/Notify/Federation/SpaceCraft/SpaceCraft.cpp
new file mode 100644
index 00000000000..9190b3bda84
--- /dev/null
+++ b/TAO/orbsvcs/examples/Notify/Federation/SpaceCraft/SpaceCraft.cpp
@@ -0,0 +1,175 @@
+// file : SpaceCraft.cpp
+// author : Boris Kolpackov <boris@dre.vanderbilt.edu>
+// cvs-id : $Id$
+
+#include <iostream>
+#include <sstream>
+
+#include <ace/OS.h>
+
+#include <tao/corba.h>
+
+#include <orbsvcs/CosNotificationC.h>
+#include <orbsvcs/CosNotifyChannelAdminC.h>
+#include <orbsvcs/CosNotifyCommC.h>
+#include <orbsvcs/CosNotifyCommS.h>
+
+// For in-process Notification Service.
+//
+#include <ace/Dynamic_Service.h>
+#include <orbsvcs/Notify/Service.h>
+#include <orbsvcs/Notify/CosNotify_Initializer.h> // NS static link helper.
+
+
+#include <Gate/Gate.h>
+
+using std::cerr;
+using std::endl;
+
+using namespace CORBA;
+using namespace CosNotifyComm;
+using namespace CosNotifyFilter;
+using namespace CosNotification;
+using namespace CosNotifyChannelAdmin;
+
+int
+main (int argc, char* argv[])
+{
+ ORB_var orb (ORB_init (argc, argv));
+
+ if (argc < 2)
+ {
+ ACE_ERROR ((LM_ERROR,
+ "Usage: %s <space-craft-name>={a, b, c}\n",
+ argv[0]));
+ return 1;
+ }
+
+ // Activate the root POA.
+ //
+ CORBA::Object_var obj (orb->resolve_initial_references ("RootPOA"));
+ PortableServer::POA_var root_poa (PortableServer::POA::_narrow (obj.in ()));
+
+ PortableServer::POAManager_var poa_manager (root_poa->the_POAManager ());
+
+ poa_manager->activate ();
+
+
+ // Initialize Notification Service.
+ //
+ TAO_Notify_Service* ns =
+ ACE_Dynamic_Service<TAO_Notify_Service>::instance (
+ TAO_NOTIFICATION_SERVICE_NAME);
+
+ if (ns == 0)
+ {
+ ns =
+ ACE_Dynamic_Service<TAO_Notify_Service>::instance (
+ TAO_NOTIFY_DEF_EMO_FACTORY_NAME);
+ }
+
+ if (ns == 0)
+ {
+ ACE_ERROR ((LM_ERROR,
+ "Notification Service not found! check svc.conf\n"));
+ return -1;
+ }
+
+ ns->init (orb.in () /*ACE_ENV_ARG_PARAMETER*/);
+ //ACE_CHECK_RETURN (-1);
+
+
+
+ // Create the channel factory.
+ //
+ EventChannelFactory_var factory (ns->create (root_poa.in ()
+ /*ACE_ENV_ARG_PARAMETER*/));
+ //ACE_CHECK_RETURN (-1);
+
+ if (is_nil (factory.in ()))
+ {
+ ACE_ERROR ((LM_ERROR,
+ "Unable to create channel factory\n"));
+ return 1;
+ }
+
+
+ // Create the channel.
+ //
+ QoSProperties qosp;
+ AdminProperties ap;
+ ChannelID id;
+
+ EventChannel_var channel (factory->create_channel (qosp, ap, id));
+
+ // Create and install the filter. We want to reduce the amount
+ // of dicovery messages that are propagated between space crafts.
+ //
+ FilterFactory_var filter_factory (channel->default_filter_factory ());
+ Filter_var filter (filter_factory->create_filter ("EXTENDED_TCL"));
+
+ ConstraintExpSeq constraints (1);
+ constraints.length (1);
+
+ constraints[0].event_types.length (0);
+ constraints[0].constraint_expr = string_dup (
+ "$domain_name == 'Aerospace' and "
+ "$type_name == 'AgentDiscovery' and "
+ "($.counter - 3 * ($.counter / 3)) == 0");// ETCL (or TAO) doesn't have %?
+
+ filter->add_constraints (constraints);
+
+ AdminID admin_id = 0;
+ ConsumerAdmin_var consumer_admin (
+ channel->new_for_consumers (AND_OP, admin_id));
+
+ consumer_admin->add_filter (filter);
+
+ // Find which space craft we are.
+ //
+ ACE_INET_Addr space_craft_addr;
+ char const* space_craft_name = 0;
+
+ space_craft_name = argv[1];
+
+ // Do a quick mapping to mcast addresses.
+ //
+ switch (space_craft_name[0])
+ {
+ case 'a':
+ {
+ space_craft_addr = ACE_INET_Addr ("224.1.0.1:10000");
+ break;
+ }
+ case 'b':
+ {
+ space_craft_addr = ACE_INET_Addr ("224.1.0.2:10000");
+ break;
+ }
+ case 'c':
+ {
+ space_craft_addr = ACE_INET_Addr ("224.1.0.3:10000");
+ break;
+ }
+ default:
+ {
+ ACE_ERROR ((LM_ERROR,
+ "Space craft name should be either 'a', 'b', or 'c'.\n"));
+ return 1;
+ }
+ }
+
+ // Create the SpaceCraft <=> Channel gate.
+ //
+ Gate space_craft_gate (space_craft_addr,
+ consumer_admin,
+ channel->default_supplier_admin ());
+
+
+ // Create the Channel <=> Constellation gate.
+ //
+ ACE_INET_Addr constellation_addr ("224.1.1.1:10000");
+ Gate constellation_gate (constellation_addr, channel);
+
+ orb->run ();
+}
diff --git a/TAO/orbsvcs/examples/Notify/Federation/SpaceCraft/SpaceCraft.mpc b/TAO/orbsvcs/examples/Notify/Federation/SpaceCraft/SpaceCraft.mpc
new file mode 100644
index 00000000000..e5d56c9be40
--- /dev/null
+++ b/TAO/orbsvcs/examples/Notify/Federation/SpaceCraft/SpaceCraft.mpc
@@ -0,0 +1,9 @@
+// -*- MPC -*-
+// $Id$
+
+project : rmcast, orbsvcsexe, notification, notification_skel, notification_serv, typecodefactory {
+ exename = craft
+ after += Gate
+ libs += Gate
+ includes += ..
+}
diff --git a/TAO/orbsvcs/examples/Notify/Federation/federation.mwc b/TAO/orbsvcs/examples/Notify/Federation/federation.mwc
new file mode 100644
index 00000000000..5be03124c71
--- /dev/null
+++ b/TAO/orbsvcs/examples/Notify/Federation/federation.mwc
@@ -0,0 +1,5 @@
+// -*- MPC -*-
+// $Id$
+
+workspace {
+}