// file : Agent.cpp // author : Boris Kolpackov // cvs-id : $Id$ #include #if defined(ACE_USES_OLD_IOSTREAMS) # if defined(_MSC_VER) # include # else # include # endif #else # include #endif #include "ace/OS.h" #include "tao/corba.h" #include "orbsvcs/CosNotificationC.h" #include "orbsvcs/CosNotifyChannelAdminC.h" #include "orbsvcs/CosNotifyCommC.h" #include "orbsvcs/CosNotifyCommS.h" // For in-process Notification Service. // #include "ace/Dynamic_Service.h" #include "ace/Argv_Type_Converter.h" #include "orbsvcs/Notify/Service.h" #include "orbsvcs/Notify/CosNotify_Initializer.h" // NS static link helper. #include "Gate/Gate.h" using namespace CORBA; using namespace CosNotifyComm; using namespace CosNotification; using namespace CosNotifyChannelAdmin; class Agent : public POA_CosNotifyComm::StructuredPushConsumer, public PortableServer::RefCountServantBase { public: Agent (char const* space_craft_name, char const* agent_name, EventChannel_ptr ch) : space_craft_name_ (space_craft_name), agent_name_ (agent_name), counter_ (0) { // Obtain a proxy consumer. // ProxyConsumer_var pc ( ch->default_supplier_admin ()->obtain_notification_push_consumer ( STRUCTURED_EVENT, consumer_id_)); consumer_ = StructuredProxyPushConsumer::_narrow (pc.in ()); consumer_->connect_structured_push_supplier ( StructuredPushSupplier::_nil ()); // Register as a consumer. // StructuredPushConsumer_var ref (_this ()); // Activate on the default POA. ProxySupplier_var ps ( ch->default_consumer_admin ()->obtain_notification_push_supplier ( STRUCTURED_EVENT, supplier_id_)); supplier_ = StructuredProxyPushSupplier::_narrow (ps.in ()); supplier_->connect_structured_push_consumer (ref.in ()); // Start tracker thread. // if (ACE_OS::thr_create (&tracker_thunk, this, THR_JOINABLE, &thread_) != 0) ::abort (); } private: static ACE_THR_FUNC_RETURN tracker_thunk (void* arg) { Agent* a = reinterpret_cast (arg); a->tracker (); return 0; } void tracker () { while (true) { StructuredEvent e; // Header. // e.header.fixed_header.event_type.domain_name = string_dup ("Aerospace"); e.header.fixed_header.event_type.type_name = string_dup ("AgentDiscovery"); // Make a unique "event id" by combining space_craft_name, agent_name, // and counter. This can be handy for debugging. // #if defined(ACE_USES_OLD_IOSTREAMS) ostrstream ostr; #else std::ostringstream ostr; #endif ostr << space_craft_name_ << ":" << agent_name_ << ":" << counter_++; #if defined(ACE_USES_OLD_IOSTREAMS) e.header.fixed_header.event_name = ostr.str (); #else e.header.fixed_header.event_name = ostr.str ().c_str (); #endif // Also add space_craft_name and agent_name fields separately // into variable_header. This will make filtering easier. // e.header.variable_header.length (2); e.header.variable_header[0].name = string_dup ("space_craft_name"); e.header.variable_header[0].value <<= string_dup (space_craft_name_.in ()); e.header.variable_header[1].name = string_dup ("agent_name"); e.header.variable_header[1].value <<= string_dup (agent_name_.in ()); // Add the counter value into filterable_data section of the event. // e.filterable_data.length (1); e.filterable_data[0].name = string_dup ("counter"); e.filterable_data[0].value <<= counter_; consumer_->push_structured_event (e); ACE_OS::sleep (ACE_Time_Value (3, 0)); } } private: // NotifyPublish interface. // virtual void offer_change (EventTypeSeq const&, EventTypeSeq const& ACE_ENV_ARG_DECL_NOT_USED) ACE_THROW_SPEC ((CORBA::SystemException, CosNotifyComm::InvalidEventType)) { // We don't care. } // StructuredPushSupplier interface. // virtual void push_structured_event (StructuredEvent const& e ACE_ENV_ARG_DECL_NOT_USED) ACE_THROW_SPEC ((CORBA::SystemException, CosEventComm::Disconnected)) { // Extract space_craft_name and agent_name. // Char const* space_craft_name = 0; Char const* agent_name = 0; e.header.variable_header[0].value >>= space_craft_name; e.header.variable_header[1].value >>= agent_name; // Extract the counter value. // CORBA::ULong counter; e.filterable_data[0].value >>= counter; cerr << e.header.fixed_header.event_type.domain_name << "::" << e.header.fixed_header.event_type.type_name << " " << "id=" << e.header.fixed_header.event_name << " from " << "(" << space_craft_name << ", " << agent_name << ")" << ": " << counter << endl; } virtual void disconnect_structured_push_consumer (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) ACE_THROW_SPEC ((CORBA::SystemException)) { // We don't care. } private: String_var space_craft_name_; String_var agent_name_; ULong counter_; ACE_thread_t thread_; ProxyID consumer_id_; StructuredProxyPushConsumer_var consumer_; ProxyID supplier_id_; StructuredProxyPushSupplier_var supplier_; }; int ACE_TMAIN (int argc, ACE_TCHAR* argv[]) { ACE_Argv_Type_Converter convert (argc, argv); ACE_TRY_NEW_ENV { ORB_var orb (ORB_init (convert.get_argc(), convert.get_ASCII_argv())); if (argc < 2) { ACE_ERROR ((LM_ERROR, "Usage: %s [={a, b, c}]\n", argv[0])); return 1; } // Activate the root POA. // CORBA::Object_var obj ( orb->resolve_initial_references ("RootPOA" ACE_ENV_ARG_PARAMETER)); ACE_TRY_CHECK; PortableServer::POA_var root_poa (PortableServer::POA::_narrow (obj.in ())); PortableServer::POAManager_var poa_manager (root_poa->the_POAManager ()); poa_manager->activate (); // Initialize Notification Service. // TAO_Notify_Service* ns = ACE_Dynamic_Service::instance ( TAO_NOTIFICATION_SERVICE_NAME); if (ns == 0) { ns = ACE_Dynamic_Service::instance ( TAO_NOTIFY_DEF_EMO_FACTORY_NAME); } if (ns == 0) { ACE_ERROR ((LM_ERROR, "Notification Service not found! check svc.conf\n")); return -1; } ns->init_service (orb.in () ACE_ENV_ARG_PARAMETER); ACE_TRY_CHECK; // Create the channel factory. // EventChannelFactory_var factory (ns->create (root_poa.in () ACE_ENV_ARG_PARAMETER)); ACE_TRY_CHECK; if (is_nil (factory.in ())) { ACE_ERROR ((LM_ERROR, "Unable to create channel factory\n")); return 1; } // Create the channel. // QoSProperties qosp; AdminProperties ap; ChannelID id; EventChannel_var channel (factory->create_channel (qosp, ap, id)); // Find which space craft we are on. // ACE_INET_Addr addr; char const* space_craft_name = 0; if (argc < 3) space_craft_name = "a"; // Default to spacecraft "a". else space_craft_name = convert.get_ASCII_argv()[2]; // Do a quick mapping to mcast addresses. // switch (space_craft_name[0]) { case 'a': { addr = ACE_INET_Addr ("224.1.0.1:10000"); break; } case 'b': { addr = ACE_INET_Addr ("224.1.0.2:10000"); break; } case 'c': { addr = ACE_INET_Addr ("224.1.0.3:10000"); break; } default: { ACE_ERROR ((LM_ERROR, "Space craft name should be either 'a', 'b', or 'c'.\n")); return 1; } } // Create the gate. // Gate gate (addr, channel.in ()); // Start the agent. // Agent agent (space_craft_name, convert.get_ASCII_argv()[1], channel.in ()); orb->run (); return 0; } ACE_CATCH (CORBA::UserException, ue) { ACE_PRINT_EXCEPTION (ue, "User exception: "); return 1; } ACE_CATCH (CORBA::SystemException, se) { ACE_PRINT_EXCEPTION (se, "System exception: "); return 1; } ACE_ENDTRY; return 1; }