diff options
Diffstat (limited to 'TAO/orbsvcs/Trading_Service/Trading_Service.cpp')
-rw-r--r-- | TAO/orbsvcs/Trading_Service/Trading_Service.cpp | 372 |
1 files changed, 360 insertions, 12 deletions
diff --git a/TAO/orbsvcs/Trading_Service/Trading_Service.cpp b/TAO/orbsvcs/Trading_Service/Trading_Service.cpp index 819155e826d..d02f72e64fe 100644 --- a/TAO/orbsvcs/Trading_Service/Trading_Service.cpp +++ b/TAO/orbsvcs/Trading_Service/Trading_Service.cpp @@ -36,8 +36,37 @@ Trading_Shutdown::handle_signal (int signum, } Trading_Service::Trading_Service (void) + : federate_ (0), + ior_output_file_ (0), + bootstrapper_ (0) { - // constructor + char *trader_name = + CORBA::string_alloc (MAXHOSTNAMELEN + 10); + + if (trader_name != 0) + { + // The trader name is the concatenation of the local host name + // and the server's process id. + char host_name[MAXHOSTNAMELEN]; + ACE_INET_Addr localhost ((u_short) 0); + localhost.get_host_name (host_name, + MAXHOSTNAMELEN); + ACE_OS::sprintf (trader_name, + "%s_%ld", + host_name, + ACE_static_cast (long, ACE_OS::getpid ())); + + for (char *dot = 0; + (dot = ACE_OS::strchr (trader_name, '.')) != 0; + *dot = '_') + continue; + + ACE_DEBUG ((LM_DEBUG, + "*** Trading Service %s initializing.\n", + trader_name)); + + this->name_ = trader_name; + } } Trading_Service::~Trading_Service (void) @@ -48,38 +77,357 @@ Trading_Service::~Trading_Service (void) int Trading_Service::init (int argc, char *argv[], - CORBA::Environment &) + CORBA::Environment &ACE_TRY_ENV) { - int result_trader = - this->trading_loader_.init (argc, argv); + this->orb_manager_.init (argc, + argv, + ACE_TRY_ENV); + ACE_CHECK_RETURN (-1); - if (result_trader == -1) + if (this->parse_args (argc, argv) == -1) return -1; + CORBA::ORB_ptr orb = + this->orb_manager_.orb (); + + this->orb_manager_.activate_poa_manager (ACE_TRY_ENV); + ACE_CHECK_RETURN (-1); + + // Create a Trader Object and set its Service Type Repository. + auto_ptr<TAO_Trader_Factory::TAO_TRADER> auto_trader (TAO_Trader_Factory::create_trader (argc, argv)); + this->trader_ = + auto_trader; + TAO_Support_Attributes_i &sup_attr = + this->trader_->support_attributes (); + TAO_Trading_Components_i &trd_comp = + this->trader_->trading_components (); + sup_attr.type_repos (this->type_repos_._this (ACE_TRY_ENV)); + ACE_CHECK_RETURN (-1); + + // The Spec says: return a reference to the Lookup interface from + // the resolve_initial_references method. + CosTrading::Lookup_ptr lookup = + trd_comp.lookup_if (); + this->ior_ = + orb->object_to_string (lookup, + ACE_TRY_ENV); + ACE_CHECK_RETURN (-1); + + // Dump the ior to a file. + if (this->ior_output_file_ != 0) + { + ACE_OS::fprintf (this->ior_output_file_, + "%s", + this->ior_.in ()); + ACE_OS::fclose (this->ior_output_file_); + } + + if (this->federate_) + { + // Only become a multicast server if we're the only trader + // on the multicast network. + // @@ Could do other things. For example, every timeout + // period try to federate again, but let's not hardcode that + // policy. + if (this->bootstrap_to_federation (ACE_TRY_ENV) == -1) + this->init_multicast_server (); + } + else + this->init_multicast_server (); return 0; } int Trading_Service::run (CORBA::Environment &ACE_TRY_ENV) { + int return_value; Trading_Shutdown trading_shutdown (*this); - int return_value = - this->trading_loader_.run (ACE_TRY_ENV); + // Run the Trading Service. + return_value = this->orb_manager_.run (ACE_TRY_ENV); ACE_CHECK_RETURN (-1); return return_value; } int +Trading_Service::init_multicast_server (void) +{ +#if defined ACE_HAS_IP_MULTICAST + // Get reactor instance from TAO. + ACE_Reactor *reactor = TAO_ORB_Core_instance ()->reactor (); + + // First, see if the user has given us a multicast port number for + // the name service on the command-line; + u_short port = + TAO_ORB_Core_instance ()->orb_params ()->service_port (TRADINGSERVICE); + + if (port == 0) + { + const char *port_number = + ACE_OS::getenv ("TradingServicePort"); + + if (port_number != 0) + port = ACE_OS::atoi (port_number); + else + port = TAO_DEFAULT_TRADING_SERVER_REQUEST_PORT; + } + + // Instantiate a server that will receive requests for an ior + if (this->ior_multicast_.init ((char *) this->ior_.in (), + port, + ACE_DEFAULT_MULTICAST_ADDR, + TAO_SERVICEID_TRADINGSERVICE) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "Failed to init IOR multicast.\n"), + -1); + + // Register event handler for the ior multicast. + if (reactor->register_handler (&this->ior_multicast_, + ACE_Event_Handler::READ_MASK) == -1) + ACE_DEBUG ((LM_DEBUG, + "cannot register Event handler\n")); + else + ACE_DEBUG ((LM_DEBUG, + "The multicast server setup is done.\n")); + + // Other trader instances will bootstrap to us. + this->bootstrapper_ = 1; + +#endif /* ACE_HAS_IP_MULTICAST */ + return 0; +} + +int +Trading_Service::bootstrap_to_federation (CORBA::Environment &ACE_TRY_ENV) +{ + // If all traders follow this strategy, it creates a complete graph + // of all known traders on a multicast network. + CORBA::ORB_var orb = + this->orb_manager_.orb (); + + ACE_DEBUG ((LM_DEBUG, + "*** Bootstrapping to another Trading Service.\n")); + CORBA::Object_var trading_obj = + orb->resolve_initial_references ("TradingService"); + + if (CORBA::is_nil (trading_obj.in ())) + ACE_ERROR_RETURN ((LM_ERROR, + "We're all alone. " + "Unable to link to other traders.\n"), + -1); + + ACE_DEBUG ((LM_DEBUG, + "*** Narrowing the lookup interface.\n")); + CosTrading::Lookup_var lookup_if = + CosTrading::Lookup::_narrow (trading_obj.in (), + ACE_TRY_ENV); + ACE_CHECK_RETURN (-1); + + ACE_DEBUG ((LM_DEBUG, + "*** Obtaining the link interface.\n")); + CosTrading::Link_var link_if = + lookup_if->link_if (ACE_TRY_ENV); + ACE_CHECK_RETURN (-1); + + TAO_Trading_Components_i &trd_comp = + this->trader_->trading_components (); + CosTrading::Lookup_ptr our_lookup = + trd_comp.lookup_if (); + CosTrading::Link_ptr our_link = + trd_comp.link_if (); + + ACE_DEBUG ((LM_DEBUG, + "*** Linking found trader to self.\n")); + link_if->add_link (this->name_.in (), + our_lookup, + CosTrading::always, + CosTrading::always, + ACE_TRY_ENV); + ACE_CHECK_RETURN (-1); + + ACE_DEBUG ((LM_DEBUG, + "*** Linking self to found trader.\n")); + our_link->add_link ("Bootstrap", + lookup_if.in (), + CosTrading::always, + CosTrading::always, + ACE_TRY_ENV); + ACE_CHECK_RETURN (-1); + + ACE_DEBUG ((LM_DEBUG, + "*** Retrieving list of known linked traders.\n")); + CosTrading::LinkNameSeq_var link_name_seq = + link_if->list_links (ACE_TRY_ENV); + ACE_CHECK_RETURN (-1); + + ACE_DEBUG ((LM_DEBUG, + "*** Linking self to all linked traders.\n")); + for (CORBA::ULong i = link_name_seq->length () - 1; + i > 0; + i--) + { + // Avoid linking to ourselves. + if (ACE_OS::strcmp (ACE_static_cast (const char *, + link_name_seq[i]), + this->name_.in ()) != 0) + { + ACE_DEBUG ((LM_DEBUG, + "*** Getting info for link %s.\n", + ACE_static_cast (const char *, + link_name_seq[i]))); + CosTrading::Link::LinkInfo_var link_info = + link_if->describe_link (link_name_seq[i], + ACE_TRY_ENV); + ACE_CHECK_RETURN (-1); + + CosTrading::Lookup_ptr remote_lookup; + remote_lookup = link_info->target.in (); + + ACE_DEBUG ((LM_DEBUG, + "*** Retrieving its link interface.\n")); + CosTrading::Link_var remote_link = + remote_lookup->link_if (ACE_TRY_ENV); + ACE_CHECK_RETURN (-1); + + ACE_DEBUG ((LM_DEBUG, + "*** Creating a link to me from it.\n")); + remote_link->add_link (this->name_.in (), + our_lookup, + CosTrading::always, + CosTrading::always, + ACE_TRY_ENV); + ACE_CHECK_RETURN (-1); + + ACE_DEBUG ((LM_DEBUG, + "*** Creating a link to it from me.\n")); + our_link->add_link (link_name_seq[i], + remote_lookup, + CosTrading::always, + CosTrading::always, + ACE_TRY_ENV); + ACE_CHECK_RETURN (-1); + } + } + + return 0; +} + +int Trading_Service::shutdown (void) { - // Invoke TAO_Trading_Loader::fini () - int shutdown_result = - this->trading_loader_.fini (); + ACE_TRY_NEW_ENV + { + if (this->trader_.get () != 0) + { + TAO_Trading_Components_i& trd_comp + = this->trader_->trading_components (); + CosTrading::Link_ptr our_link = + trd_comp.link_if (); - if (shutdown_result == -1) - return -1; + CosTrading::LinkNameSeq_var link_name_seq = + our_link->list_links (ACE_TRY_ENV); + ACE_TRY_CHECK; + + ACE_DEBUG ((LM_DEBUG, + "*** Unlinking from federated traders.\n")); + + for (CORBA::ULong j = 0; + j != link_name_seq->length (); + ++j) + { + CORBA::ULong i = + link_name_seq->length () - j - 1; + + ACE_DEBUG ((LM_DEBUG, + "*** Describing the next link.\n")); + CosTrading::Link::LinkInfo_var link_info = + our_link->describe_link (link_name_seq[i], + ACE_TRY_ENV); + ACE_TRY_CHECK; + + ACE_DEBUG ((LM_DEBUG, + "*** Removing link to %s.\n", + ACE_static_cast (const char *, + link_name_seq[i]))); + our_link->remove_link (link_name_seq[i], + ACE_TRY_ENV); + ACE_TRY_CHECK; + + CosTrading::Lookup_ptr remote_lookup; + remote_lookup = + link_info->target.in (); + + ACE_DEBUG ((LM_DEBUG, + "*** Retrieving its link interface.\n")); + CosTrading::Link_var remote_link = + remote_lookup->link_if (ACE_TRY_ENV); + ACE_TRY_CHECK; + + ACE_DEBUG ((LM_DEBUG, + "*** Removing its link to us.\n")); + + if (this->bootstrapper_) + remote_link->remove_link ("Bootstrap", + ACE_TRY_ENV); + else + remote_link->remove_link (this->name_.in (), + ACE_TRY_ENV); + ACE_TRY_CHECK; + } + } + } + ACE_CATCHANY + { + // ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "Trading Service shutting down"); + } + ACE_ENDTRY; + + return 0; +} + +int +Trading_Service::parse_args (int& argc, char *argv[]) +{ + ACE_Arg_Shifter arg_shifter (argc, argv); + + while (arg_shifter.is_anything_left ()) + { + char *current_arg = arg_shifter.get_current (); + + if (ACE_OS::strcmp (current_arg, + "-TSfederate") == 0) + { + arg_shifter.consume_arg (); + this->federate_ = 1; + } + if (ACE_OS::strcmp (current_arg, + "-TSdumpior") == 0) + { + arg_shifter.consume_arg (); + if (arg_shifter.is_parameter_next ()) + { + + char *file_name = + arg_shifter.get_current (); + this->ior_output_file_ = + ACE_OS::fopen (file_name, "w"); + + if (this->ior_output_file_ == 0) + ACE_ERROR_RETURN ((LM_ERROR, + "Unable to open %s for writing: %p\n", + file_name), -1); + arg_shifter.consume_arg (); + } + else + this->ior_output_file_ = + ACE_OS::fdopen (ACE_STDOUT, + "w"); + } + + else + arg_shifter.ignore_arg (); + } return 0; } |