diff options
author | boris <boris@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2005-07-07 13:42:41 +0000 |
---|---|---|
committer | boris <boris@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2005-07-07 13:42:41 +0000 |
commit | fd407016a2a6c847cd323254e012c5a50eb9d475 (patch) | |
tree | 6d5adf704e377abe5066b802acdc347155cf035f /TAO/orbsvcs | |
parent | ac3e9f6504fc53f8c87ac92e99face8118315299 (diff) | |
download | ATCD-fd407016a2a6c847cd323254e012c5a50eb9d475.tar.gz |
ChangeLogTag:Thu Jul 7 15:18:01 2005 Boris Kolpackov <boris@kolpackov.net>
Diffstat (limited to 'TAO/orbsvcs')
-rw-r--r-- | TAO/orbsvcs/examples/Notify/Federation/Agent/Agent.cpp | 303 | ||||
-rw-r--r-- | TAO/orbsvcs/examples/Notify/Federation/Agent/Agent.mpc | 9 | ||||
-rw-r--r-- | TAO/orbsvcs/examples/Notify/Federation/Agent/README | 12 | ||||
-rw-r--r-- | TAO/orbsvcs/examples/Notify/Federation/Agent/agent.dia | bin | 0 -> 4102 bytes | |||
-rw-r--r-- | TAO/orbsvcs/examples/Notify/Federation/Gate/Gate.cpp | 258 | ||||
-rw-r--r-- | TAO/orbsvcs/examples/Notify/Federation/Gate/Gate.h | 85 | ||||
-rw-r--r-- | TAO/orbsvcs/examples/Notify/Federation/Gate/Gate.mpc | 8 | ||||
-rw-r--r-- | TAO/orbsvcs/examples/Notify/Federation/SpaceCraft/README | 27 | ||||
-rw-r--r-- | TAO/orbsvcs/examples/Notify/Federation/SpaceCraft/SpaceCraft.cpp | 175 | ||||
-rw-r--r-- | TAO/orbsvcs/examples/Notify/Federation/SpaceCraft/SpaceCraft.mpc | 9 | ||||
-rw-r--r-- | TAO/orbsvcs/examples/Notify/Federation/federation.mwc | 5 |
11 files changed, 891 insertions, 0 deletions
diff --git a/TAO/orbsvcs/examples/Notify/Federation/Agent/Agent.cpp b/TAO/orbsvcs/examples/Notify/Federation/Agent/Agent.cpp new file mode 100644 index 00000000000..001cdcad97b --- /dev/null +++ b/TAO/orbsvcs/examples/Notify/Federation/Agent/Agent.cpp @@ -0,0 +1,303 @@ +// file : Agent.cpp +// author : Boris Kolpackov <boris@dre.vanderbilt.edu> +// cvs-id : $Id$ + +#include <iostream> +#include <sstream> + +#include <ace/OS.h> + +#include <tao/corba.h> + +#include <orbsvcs/CosNotificationC.h> +#include <orbsvcs/CosNotifyChannelAdminC.h> +#include <orbsvcs/CosNotifyCommC.h> +#include <orbsvcs/CosNotifyCommS.h> + +// For in-process Notification Service. +// +#include <ace/Dynamic_Service.h> +#include <orbsvcs/Notify/Service.h> +#include <orbsvcs/Notify/CosNotify_Initializer.h> // NS static link helper. + + +#include <Gate/Gate.h> + +using std::cerr; +using std::endl; + +using namespace CORBA; +using namespace CosNotifyComm; +using namespace CosNotification; +using namespace CosNotifyChannelAdmin; + +class Agent : public POA_CosNotifyComm::StructuredPushConsumer, + public PortableServer::RefCountServantBase +{ +public: + Agent (char const* space_craft_name, + char const* agent_name, + EventChannel_ptr ch) + : space_craft_name_ (space_craft_name), + agent_name_ (agent_name), + counter_ (0) + { + // Obtain a proxy consumer. + // + ProxyConsumer_var pc ( + ch->default_supplier_admin ()->obtain_notification_push_consumer ( + STRUCTURED_EVENT, consumer_id_)); + + consumer_ = StructuredProxyPushConsumer::_narrow (pc.in ()); + + consumer_->connect_structured_push_supplier ( + StructuredPushSupplier::_nil ()); + + // Register as a consumer. + // + StructuredPushConsumer_var ref (_this ()); // Activate on the default POA. + + ProxySupplier_var ps ( + ch->default_consumer_admin ()->obtain_notification_push_supplier ( + STRUCTURED_EVENT, supplier_id_)); + + supplier_ = StructuredProxyPushSupplier::_narrow (ps.in ()); + + supplier_->connect_structured_push_consumer (ref.in ()); + + // Start tracker thread. + // + if (ACE_OS::thr_create (&tracker_thunk, + this, + THR_JOINABLE, + &thread_) != 0) ::abort (); + } + +private: + static ACE_THR_FUNC_RETURN + tracker_thunk (void* arg) + { + Agent* a (reinterpret_cast<Agent*> (arg)); + a->tracker (); + return 0; + } + + void + tracker () + { + while (true) + { + StructuredEvent e; + + // Header. + // + e.header.fixed_header.event_type.domain_name = string_dup ("Aerospace"); + e.header.fixed_header.event_type.type_name = string_dup ("AgentDiscovery"); + + // Make a unique "event id" by combining space_craft_name, agent_name, + // and counter. This can be handy for debugging. + // + std::ostringstream ostr; + ostr << space_craft_name_ << ":" << agent_name_ << ":" << counter_++; + + e.header.fixed_header.event_name = ostr.str ().c_str (); + + // Also add space_craft_name and agent_name fields separately + // into variable_header. This will make filtering easier. + // + e.header.variable_header.length (2); + + e.header.variable_header[0].name = string_dup ("space_craft_name"); + e.header.variable_header[0].value <<= string_dup (space_craft_name_); + + e.header.variable_header[1].name = string_dup ("agent_name"); + e.header.variable_header[1].value <<= string_dup (agent_name_); + + // Add the counter value into filterable_data section of the event. + // + e.filterable_data.length (1); + + e.filterable_data[0].name = string_dup ("counter"); + e.filterable_data[0].value <<= counter_; + + + consumer_->push_structured_event (e); + + ACE_OS::sleep (ACE_Time_Value (3, 0)); + } + } + +private: + // NotifyPublish interface. + // + virtual void + offer_change (EventTypeSeq const&, EventTypeSeq const&) throw () + { + // We don't care. + } + + // StructuredPushSupplier interface. + // + virtual void + push_structured_event (StructuredEvent const& e) throw () + { + // Extract space_craft_name and agent_name. + // + Char const* space_craft_name = 0; + Char const* agent_name = 0; + + e.header.variable_header[0].value >>= space_craft_name; + e.header.variable_header[1].value >>= agent_name; + + // Extract the counter value. + // + CORBA::ULong counter; + e.filterable_data[0].value >>= counter; + + cerr << e.header.fixed_header.event_type.domain_name << "::" + << e.header.fixed_header.event_type.type_name << " " + << "id=" << e.header.fixed_header.event_name << " from " + << "(" << space_craft_name << ", " << agent_name << ")" + << ": " << counter << endl; + } + + + virtual void + disconnect_structured_push_consumer () throw () + { + // We don't care. + } + +private: + String_var space_craft_name_; + String_var agent_name_; + ULong counter_; + + ACE_thread_t thread_; + + ProxyID consumer_id_; + StructuredProxyPushConsumer_var consumer_; + + ProxyID supplier_id_; + StructuredProxyPushSupplier_var supplier_; + +}; + +int +main (int argc, char* argv[]) +{ + ORB_var orb (ORB_init (argc, argv)); + + if (argc < 2) + { + ACE_ERROR ((LM_ERROR, + "Usage: %s <agent-name> [<space-craft-name>={a, b, c}]\n", + argv[0])); + return 1; + } + + + // Activate the root POA. + // + CORBA::Object_var obj (orb->resolve_initial_references ("RootPOA")); + PortableServer::POA_var root_poa (PortableServer::POA::_narrow (obj.in ())); + + PortableServer::POAManager_var poa_manager (root_poa->the_POAManager ()); + + poa_manager->activate (); + + + // Initialize Notification Service. + // + TAO_Notify_Service* ns = + ACE_Dynamic_Service<TAO_Notify_Service>::instance ( + TAO_NOTIFICATION_SERVICE_NAME); + + if (ns == 0) + { + ns = + ACE_Dynamic_Service<TAO_Notify_Service>::instance ( + TAO_NOTIFY_DEF_EMO_FACTORY_NAME); + } + + if (ns == 0) + { + ACE_ERROR ((LM_ERROR, + "Notification Service not found! check svc.conf\n")); + return -1; + } + + ns->init (orb.in () /*ACE_ENV_ARG_PARAMETER*/); + //ACE_CHECK_RETURN (-1); + + + + // Create the channel factory. + // + EventChannelFactory_var factory (ns->create (root_poa.in () + /*ACE_ENV_ARG_PARAMETER*/)); + //ACE_CHECK_RETURN (-1); + + if (is_nil (factory.in ())) + { + ACE_ERROR ((LM_ERROR, + "Unable to create channel factory\n")); + return 1; + } + + + // Create the channel. + // + QoSProperties qosp; + AdminProperties ap; + ChannelID id; + + EventChannel_var channel (factory->create_channel (qosp, ap, id)); + + // Find which space craft we are on. + // + ACE_INET_Addr addr; + char const* space_craft_name = 0; + + if (argc < 3) + space_craft_name = "a"; // Default to spacecraft "a". + else + space_craft_name = argv[2]; + + // Do a quick mapping to mcast addresses. + // + switch (space_craft_name[0]) + { + case 'a': + { + addr = ACE_INET_Addr ("224.1.0.1:10000"); + break; + } + case 'b': + { + addr = ACE_INET_Addr ("224.1.0.2:10000"); + break; + } + case 'c': + { + addr = ACE_INET_Addr ("224.1.0.3:10000"); + break; + } + default: + { + ACE_ERROR ((LM_ERROR, + "Space craft name should be either 'a', 'b', or 'c'.\n")); + return 1; + } + } + + // Create the gate. + // + Gate gate (addr, channel); + + // Start the agent. + // + Agent agent (space_craft_name, argv[1], channel); + + orb->run (); +} diff --git a/TAO/orbsvcs/examples/Notify/Federation/Agent/Agent.mpc b/TAO/orbsvcs/examples/Notify/Federation/Agent/Agent.mpc new file mode 100644 index 00000000000..897298573c4 --- /dev/null +++ b/TAO/orbsvcs/examples/Notify/Federation/Agent/Agent.mpc @@ -0,0 +1,9 @@ +// -*- MPC -*- +// $Id$ + +project : rmcast, orbsvcsexe, notification, notification_skel, notification_serv, typecodefactory { + exename = agent + after += Gate + libs += Gate + includes += .. +} diff --git a/TAO/orbsvcs/examples/Notify/Federation/Agent/README b/TAO/orbsvcs/examples/Notify/Federation/Agent/README new file mode 100644 index 00000000000..53f999666f6 --- /dev/null +++ b/TAO/orbsvcs/examples/Notify/Federation/Agent/README @@ -0,0 +1,12 @@ +Agent is a simple application which sends and receives discovery messages +via multicast-based federation of Notification Service. To run the example +you don't need to start Notification Service; each agent process will create +its own. Just start a few agents (in different terminal windows): + + +$ ./agent smith +$ ./agent johnson + + +-- +Boris Kolpackov <boris@kolpackov.net> diff --git a/TAO/orbsvcs/examples/Notify/Federation/Agent/agent.dia b/TAO/orbsvcs/examples/Notify/Federation/Agent/agent.dia Binary files differnew file mode 100644 index 00000000000..9840e3fe8a6 --- /dev/null +++ b/TAO/orbsvcs/examples/Notify/Federation/Agent/agent.dia diff --git a/TAO/orbsvcs/examples/Notify/Federation/Gate/Gate.cpp b/TAO/orbsvcs/examples/Notify/Federation/Gate/Gate.cpp new file mode 100644 index 00000000000..bc50c1fafc6 --- /dev/null +++ b/TAO/orbsvcs/examples/Notify/Federation/Gate/Gate.cpp @@ -0,0 +1,258 @@ +// file : Gate.cpp +// author : Boris Kolpackov <boris@dre.vanderbilt.edu> +// cvs-id : $Id$ + +#include "Gate.h" + +/* +#include <iostream> + +using std::cerr; +using std::endl; +*/ + +using namespace CORBA; +using namespace CosNotifyComm; +using namespace CosNotification; +using namespace CosNotifyChannelAdmin; + +Gate:: +~Gate () +{ + // Stop tracker thread. + // + { + Lock l (mutex_); + stop_ = true; + } + + thread_mgr_.wait (); +} + + +Gate:: +Gate (ACE_INET_Addr const& group, EventChannel_ptr ch) + : socket_ (group, false), + stop_ (false) +{ + init (ch->default_consumer_admin (), + ch->default_supplier_admin ()); +} + +Gate:: +Gate (ACE_INET_Addr const& group, + ConsumerAdmin_ptr consumer_admin, + SupplierAdmin_ptr supplier_admin) + : socket_ (group, false), + stop_ (false) +{ + init (consumer_admin, supplier_admin); +} + +void Gate:: +init (ConsumerAdmin_ptr consumer_admin, + SupplierAdmin_ptr supplier_admin) +{ + // Generate unique id. It is used to prevent event looping. + // + ACE_Utils::UUID uuid; + ACE_Utils::UUID_GENERATOR::instance ()->init (); + ACE_Utils::UUID_GENERATOR::instance ()->generateUUID (uuid); + + id_ = string_alloc (uuid.to_string ()->length () + 2); + strcpy (id_.inout (), "_"); + strcpy (id_.inout () + 1, uuid.to_string ()->rep ()); + + // ACE_DEBUG ((LM_DEBUG, "ID: %s\n", id_.in ())); + + + // Obtain proxy consumer. + // + ProxyConsumer_var pc ( + supplier_admin->obtain_notification_push_consumer ( + STRUCTURED_EVENT, consumer_id_)); + + consumer_ = StructuredProxyPushConsumer::_narrow (pc.in ()); + + consumer_->connect_structured_push_supplier ( + StructuredPushSupplier::_nil ()); + + + // Register as consumer. + // + StructuredPushConsumer_var ref (_this ()); // Activate on default POA. + + ProxySupplier_var ps ( + consumer_admin->obtain_notification_push_supplier ( + STRUCTURED_EVENT, supplier_id_)); + + supplier_ = StructuredProxyPushSupplier::_narrow (ps.in ()); + + supplier_->connect_structured_push_consumer (ref.in ()); + + + // Create tracker thread. + // + thread_mgr_.spawn (tracker_thunk, this); +} + +ACE_THR_FUNC_RETURN Gate:: +tracker_thunk (void* arg) +{ + Gate* a (reinterpret_cast<Gate*> (arg)); + a->tracker (); + return 0; +} + +void Gate:: +tracker () +{ + // Time period after which a manual cancellation request is + // checked for. + // + ACE_Time_Value const timeout (0, 500); + + while (true) + { + ssize_t n; + + while (true) + { + n = socket_.size (timeout); + + // Check for cancellation request. + // + { + Lock l (mutex_); + + if (stop_) + return; + } + + if (n == -1) + { + if (errno != ETIME) + abort (); + } + else + break; + } + + OctetSeq seq (n); + seq.length (n); + + char* buffer = reinterpret_cast<char*> (seq.get_buffer ()); + + if (socket_.recv (buffer, n) != n) + { + ACE_ERROR ((LM_ERROR, + "recv() reported different size than size()\n")); + continue; + } + + TAO_InputCDR cdr (buffer, n); + + StructuredEvent e; + + cdr >> e; + + // Add TTL header to prevent infinite message looping. + // + ULong i (0); + + for (; i < e.header.variable_header.length (); ++i) + { + if (strcmp (e.header.variable_header[i].name, id_) == 0) break; + } + + if (i == e.header.variable_header.length ()) + { + e.header.variable_header.length (i + 1); + + e.header.variable_header[i].name = string_dup (id_); + } + + //ACE_DEBUG ((LM_DEBUG, + // "adding %s as header #%d\n", + // e.header.variable_header[i].name.in (), i)); + + e.header.variable_header[i].value <<= ULong (1); + + /* + cerr << "IN: " + << e.header.fixed_header.event_type.domain_name << "::" + << e.header.fixed_header.event_type.type_name << " " + << e.header.fixed_header.event_name << endl; + */ + + consumer_->push_structured_event (e); + } +} + +void Gate:: +push_structured_event (StructuredEvent const& e) throw () +{ + for (ULong i (0); i < e.header.variable_header.length (); ++i) + { + if (strcmp (e.header.variable_header[i].name, id_) == 0) + { + ULong ttl; + + e.header.variable_header[i].value >>= ttl; + + if (ttl <= 1) + { + //ACE_DEBUG ((LM_DEBUG, + // "DROPPED\n")); + return; + } + + break; + } + } + + /* + cerr << "OUT: " + << e.header.fixed_header.event_type.domain_name << "::" + << e.header.fixed_header.event_type.type_name << " " + << e.header.fixed_header.event_name << endl; + */ + + TAO_OutputCDR cdr; + + cdr << e; + + size_t size (cdr.total_length ()); + + OctetSeq seq (size); + seq.length (size); + + char* buffer = reinterpret_cast<char*> (seq.get_buffer ()); + + { + char* buf (buffer); + + for (ACE_Message_Block const* mb = cdr.begin (); + mb != 0; + mb = mb->cont ()) + { + ACE_OS::memcpy (buf, mb->rd_ptr (), mb->length ()); + buf += mb->length (); + } + } + + socket_.send (buffer, size); +} + + +void Gate:: +disconnect_structured_push_consumer () throw () +{ + // We don't care. +} + +void Gate:: +offer_change (EventTypeSeq const&, EventTypeSeq const&) throw () +{ + // We don't care. +} diff --git a/TAO/orbsvcs/examples/Notify/Federation/Gate/Gate.h b/TAO/orbsvcs/examples/Notify/Federation/Gate/Gate.h new file mode 100644 index 00000000000..c70513c3dfc --- /dev/null +++ b/TAO/orbsvcs/examples/Notify/Federation/Gate/Gate.h @@ -0,0 +1,85 @@ +// file : Gate.h +// author : Boris Kolpackov <boris@dre.vanderbilt.edu> +// cvs-id : $Id$ + +#ifndef GATE_H +#define GATE_H + +#include "ace/OS.h" +#include "ace/INET_Addr.h" +#include "ace/UUID.h" + +#include "ace/Thread_Mutex.h" +#include "ace/Thread_Manager.h" + +#include "tao/corba.h" + +#include "orbsvcs/CosNotificationC.h" +#include "orbsvcs/CosNotifyChannelAdminC.h" +#include "orbsvcs/CosNotifyCommC.h" +#include "orbsvcs/CosNotifyCommS.h" + +#include "ace/RMCast/Socket.h" + +class Gate : public POA_CosNotifyComm::StructuredPushConsumer, + public PortableServer::RefCountServantBase +{ +public: + virtual + ~Gate (); + + Gate (ACE_INET_Addr const& group, + CosNotifyChannelAdmin::EventChannel_ptr ch); + + Gate (ACE_INET_Addr const& group, + CosNotifyChannelAdmin::ConsumerAdmin_ptr consumer_admin, + CosNotifyChannelAdmin::SupplierAdmin_ptr supplier_admin); + +private: + void + init (CosNotifyChannelAdmin::ConsumerAdmin_ptr consumer_admin, + CosNotifyChannelAdmin::SupplierAdmin_ptr supplier_admin); + + static ACE_THR_FUNC_RETURN + tracker_thunk (void* arg); + + void + tracker (); + +private: + // NotifyPublish interface. + // + virtual void + offer_change (CosNotification::EventTypeSeq const&, + CosNotification::EventTypeSeq const&) throw (); + + // StructuredPushSupplier interface. + // + virtual void + push_structured_event (CosNotification::StructuredEvent const& e) throw (); + + + virtual void + disconnect_structured_push_consumer () throw (); + +private: + ACE_thread_t thread_; + + CosNotifyChannelAdmin::ProxyID consumer_id_; + CosNotifyChannelAdmin::StructuredProxyPushConsumer_var consumer_; + + CosNotifyChannelAdmin::ProxyID supplier_id_; + CosNotifyChannelAdmin::StructuredProxyPushSupplier_var supplier_; + + ACE_RMCast::Socket socket_; + CORBA::String_var id_; + + typedef ACE_SYNCH_MUTEX Mutex; + typedef ACE_Guard<Mutex> Lock; + + bool stop_; + Mutex mutex_; + ACE_Thread_Manager thread_mgr_; +}; + +#endif // GATE_H diff --git a/TAO/orbsvcs/examples/Notify/Federation/Gate/Gate.mpc b/TAO/orbsvcs/examples/Notify/Federation/Gate/Gate.mpc new file mode 100644 index 00000000000..4b8db7f0477 --- /dev/null +++ b/TAO/orbsvcs/examples/Notify/Federation/Gate/Gate.mpc @@ -0,0 +1,8 @@ +// -*- MPC -*- +// $Id$ + +project(Gate) : rmcast, orbsvcsexe, notification, notification_skel { + sharedname = Gate + dynamicflags = + +} diff --git a/TAO/orbsvcs/examples/Notify/Federation/SpaceCraft/README b/TAO/orbsvcs/examples/Notify/Federation/SpaceCraft/README new file mode 100644 index 00000000000..3be18eaa394 --- /dev/null +++ b/TAO/orbsvcs/examples/Notify/Federation/SpaceCraft/README @@ -0,0 +1,27 @@ +This example simulates spacecraft constellation. Each spacecraft +connects to a constellation-wide multicast group to which it +forwards pre-filtered messages on behalf of its agents. To run +the example start a few spacecrafts (in separate terminal windows): + +$ ./craft a +$ ./craft b +$ ./craft c + +Then start a few agents (from ../Agent) for each spacecraft: + +$ ./agent 1 a +$ ./agent 2 a + +$ ./agent 1 b +$ ./agent 2 b + +$ ./agent 1 c +$ ./agent 2 c + +You should be able to observer that each agent receives only +every third message from the agents on other spacecrafts. + + +-- +Boris Kolpackov <boris@kolpackov.net> + diff --git a/TAO/orbsvcs/examples/Notify/Federation/SpaceCraft/SpaceCraft.cpp b/TAO/orbsvcs/examples/Notify/Federation/SpaceCraft/SpaceCraft.cpp new file mode 100644 index 00000000000..9190b3bda84 --- /dev/null +++ b/TAO/orbsvcs/examples/Notify/Federation/SpaceCraft/SpaceCraft.cpp @@ -0,0 +1,175 @@ +// file : SpaceCraft.cpp +// author : Boris Kolpackov <boris@dre.vanderbilt.edu> +// cvs-id : $Id$ + +#include <iostream> +#include <sstream> + +#include <ace/OS.h> + +#include <tao/corba.h> + +#include <orbsvcs/CosNotificationC.h> +#include <orbsvcs/CosNotifyChannelAdminC.h> +#include <orbsvcs/CosNotifyCommC.h> +#include <orbsvcs/CosNotifyCommS.h> + +// For in-process Notification Service. +// +#include <ace/Dynamic_Service.h> +#include <orbsvcs/Notify/Service.h> +#include <orbsvcs/Notify/CosNotify_Initializer.h> // NS static link helper. + + +#include <Gate/Gate.h> + +using std::cerr; +using std::endl; + +using namespace CORBA; +using namespace CosNotifyComm; +using namespace CosNotifyFilter; +using namespace CosNotification; +using namespace CosNotifyChannelAdmin; + +int +main (int argc, char* argv[]) +{ + ORB_var orb (ORB_init (argc, argv)); + + if (argc < 2) + { + ACE_ERROR ((LM_ERROR, + "Usage: %s <space-craft-name>={a, b, c}\n", + argv[0])); + return 1; + } + + // Activate the root POA. + // + CORBA::Object_var obj (orb->resolve_initial_references ("RootPOA")); + PortableServer::POA_var root_poa (PortableServer::POA::_narrow (obj.in ())); + + PortableServer::POAManager_var poa_manager (root_poa->the_POAManager ()); + + poa_manager->activate (); + + + // Initialize Notification Service. + // + TAO_Notify_Service* ns = + ACE_Dynamic_Service<TAO_Notify_Service>::instance ( + TAO_NOTIFICATION_SERVICE_NAME); + + if (ns == 0) + { + ns = + ACE_Dynamic_Service<TAO_Notify_Service>::instance ( + TAO_NOTIFY_DEF_EMO_FACTORY_NAME); + } + + if (ns == 0) + { + ACE_ERROR ((LM_ERROR, + "Notification Service not found! check svc.conf\n")); + return -1; + } + + ns->init (orb.in () /*ACE_ENV_ARG_PARAMETER*/); + //ACE_CHECK_RETURN (-1); + + + + // Create the channel factory. + // + EventChannelFactory_var factory (ns->create (root_poa.in () + /*ACE_ENV_ARG_PARAMETER*/)); + //ACE_CHECK_RETURN (-1); + + if (is_nil (factory.in ())) + { + ACE_ERROR ((LM_ERROR, + "Unable to create channel factory\n")); + return 1; + } + + + // Create the channel. + // + QoSProperties qosp; + AdminProperties ap; + ChannelID id; + + EventChannel_var channel (factory->create_channel (qosp, ap, id)); + + // Create and install the filter. We want to reduce the amount + // of dicovery messages that are propagated between space crafts. + // + FilterFactory_var filter_factory (channel->default_filter_factory ()); + Filter_var filter (filter_factory->create_filter ("EXTENDED_TCL")); + + ConstraintExpSeq constraints (1); + constraints.length (1); + + constraints[0].event_types.length (0); + constraints[0].constraint_expr = string_dup ( + "$domain_name == 'Aerospace' and " + "$type_name == 'AgentDiscovery' and " + "($.counter - 3 * ($.counter / 3)) == 0");// ETCL (or TAO) doesn't have %? + + filter->add_constraints (constraints); + + AdminID admin_id = 0; + ConsumerAdmin_var consumer_admin ( + channel->new_for_consumers (AND_OP, admin_id)); + + consumer_admin->add_filter (filter); + + // Find which space craft we are. + // + ACE_INET_Addr space_craft_addr; + char const* space_craft_name = 0; + + space_craft_name = argv[1]; + + // Do a quick mapping to mcast addresses. + // + switch (space_craft_name[0]) + { + case 'a': + { + space_craft_addr = ACE_INET_Addr ("224.1.0.1:10000"); + break; + } + case 'b': + { + space_craft_addr = ACE_INET_Addr ("224.1.0.2:10000"); + break; + } + case 'c': + { + space_craft_addr = ACE_INET_Addr ("224.1.0.3:10000"); + break; + } + default: + { + ACE_ERROR ((LM_ERROR, + "Space craft name should be either 'a', 'b', or 'c'.\n")); + return 1; + } + } + + // Create the SpaceCraft <=> Channel gate. + // + Gate space_craft_gate (space_craft_addr, + consumer_admin, + channel->default_supplier_admin ()); + + + // Create the Channel <=> Constellation gate. + // + ACE_INET_Addr constellation_addr ("224.1.1.1:10000"); + Gate constellation_gate (constellation_addr, channel); + + orb->run (); +} diff --git a/TAO/orbsvcs/examples/Notify/Federation/SpaceCraft/SpaceCraft.mpc b/TAO/orbsvcs/examples/Notify/Federation/SpaceCraft/SpaceCraft.mpc new file mode 100644 index 00000000000..e5d56c9be40 --- /dev/null +++ b/TAO/orbsvcs/examples/Notify/Federation/SpaceCraft/SpaceCraft.mpc @@ -0,0 +1,9 @@ +// -*- MPC -*- +// $Id$ + +project : rmcast, orbsvcsexe, notification, notification_skel, notification_serv, typecodefactory { + exename = craft + after += Gate + libs += Gate + includes += .. +} diff --git a/TAO/orbsvcs/examples/Notify/Federation/federation.mwc b/TAO/orbsvcs/examples/Notify/Federation/federation.mwc new file mode 100644 index 00000000000..5be03124c71 --- /dev/null +++ b/TAO/orbsvcs/examples/Notify/Federation/federation.mwc @@ -0,0 +1,5 @@ +// -*- MPC -*- +// $Id$ + +workspace { +} |