diff options
author | huangming <huangminghuang@users.noreply.github.com> | 2003-10-15 18:28:41 +0000 |
---|---|---|
committer | huangming <huangminghuang@users.noreply.github.com> | 2003-10-15 18:28:41 +0000 |
commit | cd884da7fa189da8b338d90a13600bb23eceb95b (patch) | |
tree | 61b84981017228729842d0e5b8b54a907fbb174c | |
parent | 19ad9cc2b3a8625f902bfd1384f17cf17b017b2c (diff) | |
download | ATCD-cd884da7fa189da8b338d90a13600bb23eceb95b.tar.gz |
Wed Oct 15 13:00:24 2003 Huang-Ming Huang <hh1@cse.wustl.edu>
25 files changed, 1865 insertions, 0 deletions
diff --git a/TAO/orbsvcs/FTRT_Event_Service/Event_Service/FTRT_Event_Service.mpc b/TAO/orbsvcs/FTRT_Event_Service/Event_Service/FTRT_Event_Service.mpc new file mode 100644 index 00000000000..ad6d724e165 --- /dev/null +++ b/TAO/orbsvcs/FTRT_Event_Service/Event_Service/FTRT_Event_Service.mpc @@ -0,0 +1,9 @@ +project(FTRT_Event_Service) : orbsvcsexe, rtschedevent, ftrtevent, ftorbutils { + exename = ftrt_eventservice + includes += $(TAO_ROOT)/orbsvcs $(TAO_ROOT)/orbsvcs/FtRtEvent/Utils $(TAO_ROOT)/orbsvcs/FtRtEvent/EventChannel + after += FTRT_EventChannel + libs += TAO_Utils TAO_FTRT_EventChannel + specific(gnuace) { + lit_libs += TAO_Strategies + } +}
\ No newline at end of file diff --git a/TAO/orbsvcs/FTRT_Event_Service/Event_Service/FT_EventService.cpp b/TAO/orbsvcs/FTRT_Event_Service/Event_Service/FT_EventService.cpp new file mode 100644 index 00000000000..d0c04d42f2e --- /dev/null +++ b/TAO/orbsvcs/FTRT_Event_Service/Event_Service/FT_EventService.cpp @@ -0,0 +1,295 @@ +// $Id$ + +#include "FT_EventService.h" +#include "ace/Argv_Type_Converter.h" +#include "ace/Thread_Manager.h" +#include "ace/Get_Opt.h" +#include "ace/INET_Addr.h" +#include "ace/SOCK_Connector.h" +#include "ace/SOCK_Stream.h" +#include "orbsvcs/Sched/Config_Scheduler.h" +#include "orbsvcs/Scheduler_Factory.h" +#include "orbsvcs/FtRtEvent/EventChannel/FTRTEC_ServiceActivate.h" + +ACE_RCSID (Event_Service, + FT_EventService, + "$Id$") + + +int ACE_TMAIN (int argc, ACE_TCHAR* argv[]) +{ + FT_EventService event_service; + return event_service.run (argc, argv); +} + +FT_EventService::FT_EventService() +: global_scheduler_(0) +, sched_impl_(0) +, membership_(TAO_FTEC_Event_Channel::NONE) +, num_threads_(1) +, task_(orb_) +{ +} + +FT_EventService::~FT_EventService() +{ + delete sched_impl_; +} + +int +FT_EventService::run(int argc, ACE_TCHAR* argv[]) +{ + ACE_TRY_NEW_ENV + { + // Make a copy of command line parameter. + ACE_Argv_Type_Converter command(argc, argv); + + // Initialize ORB. + orb_ = CORBA::ORB_init (command.get_argc(), + command.get_ASCII_argv(), + "" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + if (this->parse_args (command.get_argc(), command.get_TCHAR_argv()) == -1) + return 1; + + CORBA::Object_var root_poa_object = + orb_->resolve_initial_references("RootPOA" + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + if (CORBA::is_nil (root_poa_object.in ())) + ACE_ERROR_RETURN ((LM_ERROR, + " (%P|%t) Unable to initialize the root POA.\n"), + 1); + + PortableServer::POA_var root_poa = + PortableServer::POA::_narrow (root_poa_object.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + PortableServer::POAManager_var poa_manager = + root_poa->the_POAManager (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + poa_manager->activate (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + CORBA::Object_var naming_obj = + orb_->resolve_initial_references ("NameService" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + if (CORBA::is_nil (naming_obj.in ())) + ACE_ERROR_RETURN ((LM_ERROR, + " (%P|%t) Unable to initialize the Naming Service.\n"), + 1); + + CosNaming::NamingContext_var naming_context = + CosNaming::NamingContext::_narrow (naming_obj.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + setup_scheduler(naming_context.in() ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN(-1); + + + poa_manager->activate(ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Activate the Event channel implementation + + TAO_FTEC_Event_Channel ec(orb_, root_poa); + + FtRtecEventChannelAdmin::EventChannel_var ec_ior = + ec.activate(membership_ + ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + if (report_factory(orb_.in(), ec_ior.in())==-1) + return -1; + + orb_->run(ACE_ENV_SINGLE_ARG_PARAMETER); + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION(ACE_ANY_EXCEPTION, "A CORBA Exception occurred."); + } + ACE_ENDTRY; + + ACE_CHECK_RETURN(-1); + + ACE_Thread_Manager::instance()->wait(); + return 0; +} + +int +FT_EventService::parse_args (int argc, ACE_TCHAR* argv []) +{ + /// get the membership from the environment variable + char* member = ACE_OS::getenv("FTEC_MEMBERSHIP"); + + membership_ = TAO_FTEC_Event_Channel::NONE; + + if (member) { + if (ACE_OS::strcasecmp(member, "PRIMARY")==0) { + membership_ = TAO_FTEC_Event_Channel::PRIMARY; + } + else if (ACE_OS::strcasecmp(member, "BACKUP")==0) { + membership_ = TAO_FTEC_Event_Channel::BACKUP; + } + } + + char* n_threads = ACE_OS::getenv("FTEC_NUM_THREAD"); + + this->num_threads_ = 1; + if (n_threads) + this->num_threads_ = ACE_OS::atoi(n_threads); + + ACE_Get_Opt get_opt (argc, argv, ACE_LIB_TEXT("jn:ps:")); + int opt; + + while ((opt = get_opt ()) != EOF) + { + switch (opt) + { + case 'j': + this->membership_ = TAO_FTEC_Event_Channel::BACKUP; + break; + case 'n': + this->num_threads_ = ACE_OS::atoi(get_opt.opt_arg ()); + break; + case 'p': + this->membership_ = TAO_FTEC_Event_Channel::PRIMARY; + break; + case 's': + // It could be just a flag (i.e. no "global" or "local" + // argument, but this is consistent with the EC_Multiple + // test and also allows for a runtime scheduling service. + + if (ACE_OS::strcasecmp (get_opt.opt_arg (), ACE_LIB_TEXT("global")) == 0) + { + this->global_scheduler_ = 1; + } + else if (ACE_OS::strcasecmp (get_opt.opt_arg (), ACE_LIB_TEXT("local")) == 0) + { + this->global_scheduler_ = 0; + } + else + { + ACE_DEBUG ((LM_DEBUG, + ACE_LIB_TEXT("Unknown scheduling type <%s> ") + ACE_LIB_TEXT("defaulting to local\n"), + get_opt.opt_arg ())); + this->global_scheduler_ = 0; + } + break; + + case '?': + default: + ACE_DEBUG ((LM_DEBUG, + ACE_LIB_TEXT("Usage: %s \n") + ACE_LIB_TEXT(" -j join the object group\n") + ACE_LIB_TEXT(" -p set as primary\n") + ACE_LIB_TEXT(" -s <global|local> \n") + ACE_LIB_TEXT("\n"), + argv[0])); + return -1; + } + } + + if (this->num_threads_ < 1) + ACE_ERROR_RETURN((LM_ERROR, "Invalid number of threads specified\n"), -1); + + return 0; +} + +void +FT_EventService::setup_scheduler(CosNaming::NamingContext_ptr naming_context + ACE_ENV_ARG_DECL_WITH_DEFAULTS) +{ + RtecScheduler::Scheduler_var scheduler; + if (CORBA::is_nil(naming_context)) { + ACE_NEW_THROW_EX (this->sched_impl_, + ACE_Config_Scheduler, + CORBA::NO_MEMORY()); + + scheduler = this->sched_impl_->_this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + if (ACE_Scheduler_Factory::server(scheduler.in()) == -1) + ACE_ERROR((LM_ERROR,"Unable to install scheduler\n")); + } + else { + // This is the name we (potentially) register the Scheduling + // Service in the Naming Service. + CosNaming::Name schedule_name (1); + schedule_name.length (1); + schedule_name[0].id = CORBA::string_dup ("ScheduleService"); + + + if (1) + { + // We must find the scheduler object reference... + + if (this->global_scheduler_ == 0) + { + ACE_NEW_THROW_EX (this->sched_impl_, + ACE_Config_Scheduler, + CORBA::NO_MEMORY()); + + scheduler = this->sched_impl_->_this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + // Register the servant with the Naming Context.... + naming_context->rebind (schedule_name, scheduler.in () + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + } + else + { + CORBA::Object_var tmp = + naming_context->resolve (schedule_name ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + scheduler = RtecScheduler::Scheduler::_narrow (tmp.in () + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + } + } + } +} + +int +FT_EventService::report_factory(CORBA::ORB_ptr orb, + FtRtecEventChannelAdmin::EventChannel_ptr ec) +{ + char* addr = ACE_OS::getenv("EventChannelFactoryAddr"); + + if (addr != NULL) { + // instaniated by object factory, report my ior back to the factory + ACE_INET_Addr factory_addr(addr); + ACE_SOCK_Connector connector; + ACE_SOCK_Stream stream; + + ACE_DEBUG((LM_DEBUG,"connecting to %s\n",addr)); + if (connector.connect(stream, factory_addr) == -1) + ACE_ERROR_RETURN((LM_ERROR, "(%P|%t) Invalid Factory Address\n"), -1); + + ACE_DEBUG((LM_DEBUG,"Factory connected\n")); + CORBA::String_var my_ior_string = orb->object_to_string(ec + ACE_ENV_ARG_PARAMETER); + + ACE_TRY_CHECK; + int len = strlen(my_ior_string.in()) ; + + if (stream.send_n(my_ior_string.in(), len) != len) + ACE_ERROR_RETURN((LM_ERROR, "(%P|%t) IOR Transmission Error\n"), -1); + + stream.close(); + } + return 0; +} + +void FT_EventService::become_primary() +{ + if (this->num_threads_ > 1) { + task_.activate(THR_NEW_LWP | THR_JOINABLE, num_threads_-1); + } +} + diff --git a/TAO/orbsvcs/FTRT_Event_Service/Event_Service/FT_EventService.h b/TAO/orbsvcs/FTRT_Event_Service/Event_Service/FT_EventService.h new file mode 100644 index 00000000000..d65b67dc17c --- /dev/null +++ b/TAO/orbsvcs/FTRT_Event_Service/Event_Service/FT_EventService.h @@ -0,0 +1,52 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file FT_EventService.h + * + * $Id$ + * + * @author Huang-Ming Huang <hh1@cse.wustl.edu> + */ +//============================================================================= +#ifndef FT_EVENTSERVICE_H +#define FT_EVENTSERVICE_H + +#include "orbsvcs/CosNamingC.h" +#include "orbsvcs/FtRtecEventChannelAdminC.h" +#include "orbsvcs/FtRtEvent/EventChannel/FTEC_Event_Channel.h" +#include "orbsvcs/FtRtEvent/EventChannel/FTEC_Become_Primary_Listener.h" +#include "TP_Task.h" + +namespace POA_RtecScheduler +{ + class Scheduler; +}; + +class FT_EventService : private TAO_FTEC_Become_Primary_Listener +{ +public: + FT_EventService(); + ~FT_EventService(); + + int run(int argc, ACE_TCHAR* argv[]); + +private: + int parse_args (int argc, ACE_TCHAR* argv []); + void setup_scheduler(CosNaming::NamingContext_ptr naming_context + ACE_ENV_ARG_DECL_WITH_DEFAULTS); + int report_factory(CORBA::ORB_ptr orb, + FtRtecEventChannelAdmin::EventChannel_ptr ec); + virtual void become_primary(); + + int global_scheduler_; + // Should we use a global scheduler or a local one? + POA_RtecScheduler::Scheduler *sched_impl_; + // The Scheduler implementation. + TAO_FTEC_Event_Channel::MEMBERSHIP membership_; + int num_threads_; + CORBA::ORB_var orb_; + TP_Task task_; +}; + +#endif diff --git a/TAO/orbsvcs/FTRT_Event_Service/Event_Service/TP_Task.h b/TAO/orbsvcs/FTRT_Event_Service/Event_Service/TP_Task.h new file mode 100644 index 00000000000..f5f2d410cac --- /dev/null +++ b/TAO/orbsvcs/FTRT_Event_Service/Event_Service/TP_Task.h @@ -0,0 +1,38 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file TP_Task.h + * + * $Id$ + * + * @author Huang-Ming Huang <hh1@cse.wustl.edu> + */ +//============================================================================= +#ifndef TP_TASK_H +#define TP_TASK_H + +#include "ace/Task.h" +#include "tao/corba.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +class TP_Task : public ACE_Task_Base { +public: + TP_Task(CORBA::ORB_var& orb) + : orb_(orb) + { + } + + ~TP_Task(){} + virtual int svc() { + this->orb_->run(); + return 0; + } +private: + CORBA::ORB_var& orb_; +}; + +#endif diff --git a/TAO/orbsvcs/FTRT_Event_Service/Event_Service/svc.conf b/TAO/orbsvcs/FTRT_Event_Service/Event_Service/svc.conf new file mode 100644 index 00000000000..d23c985de19 --- /dev/null +++ b/TAO/orbsvcs/FTRT_Event_Service/Event_Service/svc.conf @@ -0,0 +1,20 @@ +## $Id$ + +## The value of FTEC_DETECTOR_TRANSPORT_PROTOCL should be either "sctp" or "tcp". +## The FTEC_HEART_BEAT is only meanful for stcp. It represents the heart beat in +## seconds for the connections between replicas. + +static FTRTEC_Fault_Detector "$FTEC_DETECTOR_TRANSPORT_PROTOCL -HeartBeat $FTEC_HEART_BEAT" + + +## If FTEC_REPLICATION_STRATEGY is set to AMI, then use AMI for replicating operations; +## otherwise, two-way CORBA call is used for replication. + +static FTRTEC_Replication "$FTEC_REPLICATION_STRATEGY" + +## FTEC_EVENT_SERVICE_NAME is used for the event channnel to register to the naming service. +## FTEC_OBJECT_ID should be a globally unique identifier defined in +## draft-leach-uuids-guids-01.txt. This value should be the same +## for the entire object group. + +static FTRTEC_Identification "-Name $FTEC_EVENT_SERVICE_NAME -Object_ID $FTEC_OBJECT_ID"
\ No newline at end of file diff --git a/TAO/orbsvcs/FTRT_Event_Service/Factory_Service/EventChannelFactory_i.cpp b/TAO/orbsvcs/FTRT_Event_Service/Factory_Service/EventChannelFactory_i.cpp new file mode 100644 index 00000000000..2ec8cf8f842 --- /dev/null +++ b/TAO/orbsvcs/FTRT_Event_Service/Factory_Service/EventChannelFactory_i.cpp @@ -0,0 +1,201 @@ +// $Id$ + +#include "EventChannelFactory_i.h" +#include "ace/Process.h" +#include "ace/Read_Buffer.h" +#include "ace/SOCK_Acceptor.h" +#include "ace/SOCK_Stream.h" +#include "orbsvcs/FtRtEvent/Utils/ScopeGuard.h" +#include "orbsvcs/FtRtEvent/Utils/UUID.h" + +ACE_RCSID (Factory_Service, + EventChannelFactory_i, + "$Id$") + + +const int WAIT_FOR_REGISTRATION_TIME = 10; //sec +const int MAX_ID_STRING = 10; + +EventChannelFactory_i::EventChannelFactory_i(const char* conf_filename, CORBA::ORB_ptr orb_ptr) +: conf_file(conf_filename), id(0), orb(orb_ptr) +{ +} + +CORBA::Object_ptr EventChannelFactory_i::create_object ( + const char * type_id, + const FT::Criteria & the_criteria, + FT::GenericFactory::FactoryCreationId_out factory_creation_id + ACE_ENV_ARG_DECL_WITH_DEFAULTS + ) + ACE_THROW_SPEC (( + CORBA::SystemException + , FT::NoFactory + , FT::ObjectNotCreated + , FT::InvalidCriteria + , FT::InvalidProperty + , FT::CannotMeetCriteria + )) +{ + + ACE_DEBUG((LM_DEBUG,"EventChannelFactory_i::create_object\n")); + + FILE* file = fopen(conf_file, "r"); + if (file == NULL) + ACE_THROW_RETURN(FT::NoFactory(), CORBA::Object::_nil()); + + ScopeGuard file_guard = MakeGuard(fclose, file); + ACE_UNUSED_ARG(file_guard); + + char *id=0, *prog=0; + ACE_Read_Buffer read_buf(file); + ScopeGuard id_guard = MakeObjGuard(* ACE_Allocator::instance(), + &ACE_Allocator::free, id); + ACE_UNUSED_ARG(id_guard); + + ScopeGuard prog_guard = MakeObjGuard(* ACE_Allocator::instance(), + &ACE_Allocator::free, prog); + ACE_UNUSED_ARG(prog_guard); + + while ((id = read_buf.read(' ')) != NULL && + (prog = read_buf.read('\n')) != NULL) { + id[strlen(id)-1] = '\0'; + if (strcmp(id, type_id) == 0) { + return create_process(prog, the_criteria, factory_creation_id); + } + } + + ACE_THROW_RETURN(FT::ObjectNotCreated(), CORBA::Object::_nil()); +} + +void EventChannelFactory_i::delete_object ( + const FT::GenericFactory::FactoryCreationId & factory_creation_id + ACE_ENV_ARG_DECL_WITH_DEFAULTS + ) + ACE_THROW_SPEC (( + CORBA::SystemException + , FT::ObjectNotFound + )) +{ + ACE_TRACE("EventChannelFactory_i::delete_object"); + CORBA::ULong object_id; + factory_creation_id >>= object_id; + CORBA::Object_var obj; + if (objects.find(object_id, obj) == 0) { + objects.unbind(object_id); + //obj->shutdown(ACE_ENV_SINGLE_ARG_PARAMETER); + } +} + +CORBA::Object_ptr EventChannelFactory_i::create_process ( + char * process_str, + const FT::Criteria & the_criteria, + FT::GenericFactory::FactoryCreationId_out factory_creation_id) +{ + ACE_TRACE("EventChannelFactory_i::create_prcess"); + + CORBA::Object_ptr result = CORBA::Object::_nil(); + + // fill the factory_creation_id + + ACE_NEW_RETURN(factory_creation_id, + FT::GenericFactory::FactoryCreationId, + CORBA::Object::_nil()); + *factory_creation_id <<= (CORBA::ULong) ++id; + + // create an acceptor and get the listen address + + ACE_SOCK_Acceptor acceptor; + ACE_INET_Addr server_addr; + acceptor.open(server_addr); + acceptor.get_local_addr(server_addr); + + ACE_Process_Options options; + ACE_CString str; + + char* pos = ACE_OS::strrchr(process_str, '/'); + if (pos !=0) { // + *pos = '\0'; + options.working_directory(process_str); + *pos = '/'; + } + str = process_str; + + const int ENV_BUF_LEN = 512; + char buf[ENV_BUF_LEN]; + server_addr.addr_to_string(buf,ENV_BUF_LEN,0); + options.setenv("EventChannelFactoryAddr", buf); + + // extract the object ID from the criteria + for (size_t i = 0; i < the_criteria.length(); ++i) + { + const CosNaming::Name& name = the_criteria[i].nam; + if (name.length() > 0) { + const char* val; + const char* id = name[0].id.in(); + the_criteria[i].val >>= val; + if (id[0] != '-') // environment variable + options.setenv(id, "%s", val); + else {// command line option + ACE_OS::sprintf(buf, " %s %s", id, val); + str += buf; + } + } + } + + ACE_DEBUG((LM_DEBUG, "Command Line : %s\n", str.c_str())); + + options.command_line(str.c_str()); + + // Try to create a new process running date. + ACE_Process new_process; + + ACE_Time_Value timeout(WAIT_FOR_REGISTRATION_TIME); + timeout += ACE_OS::gettimeofday(); + if (new_process.spawn (options) == -1) + { + int error = ACE_OS::last_error (); + ACE_ERROR ((LM_ERROR, + "%p errno = %d.\n", + str.c_str(), + error)); + return result; + } + + ACE_INET_Addr client_addr; + ACE_SOCK_Stream stream; + + ACE_DEBUG((LM_DEBUG, "accepting connection from event channel\n")); + if (acceptor.accept(stream, &client_addr, &timeout) != -1) + { + ACE_DEBUG((LM_DEBUG, "Factory Connect established with %s:%d\n", + client_addr.get_host_name(), client_addr.get_port_number() )); + + // receive the ior string from the created object + + char ior[5000] = {'0'}; + int n = 0; + int byteRead=0; + while ((n = stream.recv(ior+byteRead, 5000-byteRead))) { + byteRead += n; + } + + if (strlen(ior) ==0) + return result; + + + CORBA::Object_var result = orb->string_to_object(ior + ACE_ENV_ARG_PARAMETER); + + ACE_CHECK_RETURN(result); + + if (objects.bind(id, result) ==0){ + return result._retn(); + } + } + else { + ACE_DEBUG((LM_DEBUG,"accept fail\n")); + } + + return result; +} + diff --git a/TAO/orbsvcs/FTRT_Event_Service/Factory_Service/EventChannelFactory_i.h b/TAO/orbsvcs/FTRT_Event_Service/Factory_Service/EventChannelFactory_i.h new file mode 100644 index 00000000000..9ceba6abc3b --- /dev/null +++ b/TAO/orbsvcs/FTRT_Event_Service/Factory_Service/EventChannelFactory_i.h @@ -0,0 +1,65 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file EventChannelFactory_i.h + * + * $Id$ + * + * @author Huang-Ming Huang <hh1@cse.wustl.edu> + */ +//============================================================================= + +#ifndef EVENTCHANNELFACTORY_I_H +#define EVENTCHANNELFACTORY_I_H + +#include "orbsvcs/FT_CORBAS.h" +#include "ace/Synch.h" +#include "ace/Hash_Map_Manager_T.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +class EventChannelFactory_i : public POA_FT::GenericFactory { +public: + EventChannelFactory_i(const char* conf_filename, CORBA::ORB_ptr); + + virtual CORBA::Object_ptr create_object ( + const char * type_id, + const FT::Criteria & the_criteria, + FT::GenericFactory::FactoryCreationId_out factory_creation_id + ACE_ENV_ARG_DECL_WITH_DEFAULTS + ) + ACE_THROW_SPEC (( + CORBA::SystemException + , FT::NoFactory + , FT::ObjectNotCreated + , FT::InvalidCriteria + , FT::InvalidProperty + , FT::CannotMeetCriteria + )) ; + + virtual void delete_object ( + const FT::GenericFactory::FactoryCreationId & factory_creation_id + ACE_ENV_ARG_DECL_WITH_DEFAULTS + ) + ACE_THROW_SPEC (( + CORBA::SystemException + , FT::ObjectNotFound + )); + +private: + CORBA::Object_ptr create_process ( + char * process, + const FT::Criteria & the_criteria, + FT::GenericFactory::FactoryCreationId_out factory_creation_id); + + const char* conf_file; + int id; + CORBA::ORB_ptr orb; + typedef ACE_Hash_Map_Manager<int, CORBA::Object_var, ACE_Null_Mutex> Objects; + Objects objects; +}; +#endif + diff --git a/TAO/orbsvcs/FTRT_Event_Service/Factory_Service/FTRTEC_Factory_Service.cpp b/TAO/orbsvcs/FTRT_Event_Service/Factory_Service/FTRTEC_Factory_Service.cpp new file mode 100644 index 00000000000..0a68d5fcdf5 --- /dev/null +++ b/TAO/orbsvcs/FTRT_Event_Service/Factory_Service/FTRTEC_Factory_Service.cpp @@ -0,0 +1,157 @@ +// $Id$ + +#include "EventChannelFactory_i.h" +#include "ace/Task.h" +#include "ace/SString.h" +#include "ace/Get_Opt.h" + +ACE_RCSID (Factory_Service, + FTRTEC_Factory_Service, + "$Id$") + +namespace { + ACE_CString id, kind, output; +} + +int parse_args(int argc, char* argv[]) +{ + ACE_Get_Opt get_opt (argc, argv, ACE_LIB_TEXT("i:k:o:")); + int opt; + + int result = 0; + while ((opt = get_opt ()) != EOF) + { + switch (opt) + { + case 'i': + id = get_opt.opt_arg (); + break; + case 'k': + kind = get_opt.opt_arg (); + break; + case 'o': + output = get_opt.opt_arg (); + break; + default: + result = -1; + break; + } + } + + if (result == -1 || (id.length() == 0 && output.length() == 0)) + { + ACE_DEBUG ((LM_DEBUG, + ACE_LIB_TEXT("Usage: %s \n") + ACE_LIB_TEXT(" [-i id] set the id that is used to register to the naming service\n") + ACE_LIB_TEXT(" [-k kind] set the kind that is used to register to the naming service\n") + ACE_LIB_TEXT(" [-o filename] set the output file name for the IOR\n") + ACE_LIB_TEXT("\n"), + argv[0])); + return -1; + } + return 0; +} + +int main(int argc, ACE_TCHAR* argv[]) +{ + + ACE_DECLARE_NEW_CORBA_ENV; + ACE_TRY { + CORBA::ORB_var orb = CORBA::ORB_init(argc, argv + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + + if (parse_args(argc, argv) == -1) + return -1; + + CORBA::Object_var obj = + orb->resolve_initial_references("RootPOA" + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + PortableServer::POA_var poa = + PortableServer::POA::_narrow(obj.in() + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + PortableServer::POAManager_var mgr = poa->the_POAManager(ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + mgr->activate(ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + EventChannelFactory_i servant("factory.cfg", orb.in()); + + FT::GenericFactory_var event_channel_factory = + servant._this(ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + // register to the Event Service + + if (id.length()) { + CORBA::Object_var namng_contex_object = + orb->resolve_initial_references("NameService" + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + CosNaming::NamingContext_var naming_context = + CosNaming::NamingContext::_narrow(namng_contex_object.in() + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + // register to naming service + CosNaming::Name name(1); + name.length(1); + name[0].id = CORBA::string_dup(id.c_str()); + if (kind.length()) + name[0].kind = CORBA::string_dup(kind.c_str()); + + naming_context->bind(name, event_channel_factory.in() + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + ACE_DEBUG((LM_DEBUG, "Register to naming service with %s", id.c_str())); + if (kind.length()) + ACE_DEBUG((LM_DEBUG, ", %s", kind.c_str())); + ACE_DEBUG((LM_DEBUG,"\n")); + } + + if (output.length()) { + // get the IOR of factory + CORBA::String_var str = orb->object_to_string(event_channel_factory.in() + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + if (ACE_OS::strcmp(output.c_str(), "") != 0) + { + FILE *output_file= + ACE_OS::fopen (ACE_TEXT_CHAR_TO_TCHAR(output.c_str()), + ACE_LIB_TEXT("w")); + if (output_file == 0) + ACE_ERROR_RETURN ((LM_ERROR, + "Cannot open output file for writing IOR: %s", + output.c_str()), + 1); + ACE_OS::fprintf (output_file, "%s", str.in ()); + ACE_OS::fclose (output_file); + } + } + + ACE_TRY_CHECK; + + orb->run(ACE_ENV_SINGLE_ARG_PARAMETER); + + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION(ACE_ANY_EXCEPTION, "A CORBA Exception occurred."); + } + ACE_ENDTRY; + + + ACE_CHECK_RETURN(1); + + return 0; +} + diff --git a/TAO/orbsvcs/FTRT_Event_Service/Factory_Service/FTRTEC_Factory_Service.mpc b/TAO/orbsvcs/FTRT_Event_Service/Factory_Service/FTRTEC_Factory_Service.mpc new file mode 100644 index 00000000000..55b7f390558 --- /dev/null +++ b/TAO/orbsvcs/FTRT_Event_Service/Factory_Service/FTRTEC_Factory_Service.mpc @@ -0,0 +1,11 @@ +project(FTRTEC_Factory_Service): orbsvcsexe, ftorbutils, notify { + after += FaultTolerance + exename = ftrtec_factory_service + includes += $(TAO_ROOT)/orbsvcs + libs += TAO_FaultTolerance + + specific(gnuace) { + lit_libs += TAO_Strategies + } +} + diff --git a/TAO/orbsvcs/FTRT_Event_Service/Factory_Service/factory.cfg b/TAO/orbsvcs/FTRT_Event_Service/Factory_Service/factory.cfg new file mode 100644 index 00000000000..021110f6781 --- /dev/null +++ b/TAO/orbsvcs/FTRT_Event_Service/Factory_Service/factory.cfg @@ -0,0 +1,2 @@ +IDL:FtRtecEventChannelAdmin/EventChannel:1.0 ../FTRT_Event_Service/ftrt_eventservice +IDL:RtecEventChannelAdmin/EventChannel:1.0 ../FTRT_Event_Service/ftrt_eventservice
\ No newline at end of file diff --git a/TAO/orbsvcs/FTRT_Event_Service/Gateway_Service/FTRTEC_Gateway_Service.cpp b/TAO/orbsvcs/FTRT_Event_Service/Gateway_Service/FTRTEC_Gateway_Service.cpp new file mode 100644 index 00000000000..9178b08b0be --- /dev/null +++ b/TAO/orbsvcs/FTRT_Event_Service/Gateway_Service/FTRTEC_Gateway_Service.cpp @@ -0,0 +1,134 @@ +// $Id$ + +#include "ace/Get_Opt.h" +#include "orbsvcs/FtRtEvent/Utils/FTEC_Gateway.h" +#include "orbsvcs/FtRtEvent/Utils/resolve_init.h" +/// include this file to statically linked with FT ORB +#include "orbsvcs/FaultTolerance/FT_ClientService_Activate.h" + +/// include this file to statically linked with Transaction Depth +#include "orbsvcs/FtRtEvent/ClientORB/FTRT_ClientORB_Loader.h" + +ACE_RCSID (Gateway_Service, + FTRTEC_Gateway_Service, + "$Id$") + + +namespace { + CORBA::ORB_var orb; + FtRtecEventChannelAdmin::EventChannel_var ftec; + ACE_CString ior_file_name; +} + +int parse_args(int argc, ACE_TCHAR** argv) +{ + ACE_TRY_NEW_ENV { + ACE_Get_Opt get_opt (argc, argv, ACE_LIB_TEXT("i:n:o:")); + int opt; + CosNaming::Name name(1); + name.length(1); + name[0].id = CORBA::string_dup("FT_EventService"); + + while ((opt = get_opt ()) != EOF) + { + switch (opt) + { + case 'i': + { + CORBA::Object_var obj = orb->string_to_object(get_opt.opt_arg () + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + ftec = FtRtecEventChannelAdmin::EventChannel::_narrow(obj.in() + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + } + break; + case 'n': + name[0].id = CORBA::string_dup(get_opt.opt_arg ()); + break; + case 'o': + ior_file_name = get_opt.opt_arg (); + break; + } + } + + if (CORBA::is_nil(ftec.in())) { + /// we should get the ftec from Naming Service + + CosNaming::NamingContext_var naming_context = + resolve_init<CosNaming::NamingContext>(orb.in(), "NameService" + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + ftec = resolve<FtRtecEventChannelAdmin::EventChannel>(naming_context.in(), + name + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + if (CORBA::is_nil(ftec.in())) + ACE_ERROR_RETURN((LM_ERROR, "Cannot Find FT_EventService\n"), -1); + } + } + ACE_CATCHANY { + ACE_ERROR_RETURN((LM_ERROR, "Cannot Find FT_EventService\n"), -1); + } + ACE_ENDTRY; + return 0; +} + +int main(int argc, ACE_TCHAR** argv) +{ + ACE_TRY_NEW_ENV + { + orb = CORBA::ORB_init (argc, argv ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + if (parse_args(argc, argv)==-1) + return 1; + + PortableServer::POA_var + root_poa = resolve_init<PortableServer::POA>(orb.in(), "RootPOA" + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + // create POAManager + PortableServer::POAManager_var + mgr = root_poa->the_POAManager(ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + + mgr->activate(ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + TAO_FTRTEC::FTEC_Gateway gateway_servant(orb.in(), ftec.in()); + + RtecEventChannelAdmin::EventChannel_var gateway = + gateway_servant.activate(root_poa.in() ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + if (ior_file_name.length()) + { + CORBA::String_var str = orb->object_to_string(gateway.in() + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + FILE *output_file= + ACE_OS::fopen (ACE_TEXT_CHAR_TO_TCHAR(ior_file_name.c_str()), + ACE_LIB_TEXT("w")); + if (output_file == 0) + ACE_ERROR_RETURN ((LM_ERROR, + "Cannot open output file for writing IOR: %s", + ior_file_name.c_str()), + 1); + ACE_OS::fprintf (output_file, "%s", str.in ()); + ACE_OS::fclose (output_file); + } + + orb->run(ACE_ENV_SINGLE_ARG_PARAMETER); + } + ACE_CATCHANY { + return 1; + } + ACE_ENDTRY; + return 0; +} diff --git a/TAO/orbsvcs/FTRT_Event_Service/Gateway_Service/FTRTEC_Gateway_Service.mpc b/TAO/orbsvcs/FTRT_Event_Service/Gateway_Service/FTRTEC_Gateway_Service.mpc new file mode 100644 index 00000000000..7c21a4791f3 --- /dev/null +++ b/TAO/orbsvcs/FTRT_Event_Service/Gateway_Service/FTRTEC_Gateway_Service.mpc @@ -0,0 +1,9 @@ + project(FTRTEC_Gateway_Service): orbsvcsexe, ftrteventclient { + exename = ftrtec_gateway_service + includes += $(TAO_ROOT)/orbsvcs $(TAO_ROOT)/orbsvcs/FtRtEvent/Utils + + specific(gnuace) { + lit_libs += TAO_Strategies + } +} + diff --git a/TAO/orbsvcs/FTRT_Event_Service/NameService b/TAO/orbsvcs/FTRT_Event_Service/NameService new file mode 100755 index 00000000000..4e599cf9a44 --- /dev/null +++ b/TAO/orbsvcs/FTRT_Event_Service/NameService @@ -0,0 +1,17 @@ +#!/bin/sh + +case $# in + 0) ARG="-multicast";; + 1) ARG=$1 +esac + +case "$ARG" in + -multicast) PARAM="-m 1";; + -iiop) PARAM=iiop://`hostname`:${ORBNameServicePort=10000};; + -sciop) PARAM=sciop://`hostname`:${ORBNameServicePort=10000};; + *) + @echo Usage : NameService [-iiop | -sciop] 2&>1 ; exit 2 + +esac + +$TAO_ROOT/orbsvcs/Naming_Service/Naming_Service -d -d -d -ORBEndPoint "$PARAM" diff --git a/TAO/orbsvcs/FTRT_Event_Service/Readme b/TAO/orbsvcs/FTRT_Event_Service/Readme new file mode 100644 index 00000000000..a31d33bc059 --- /dev/null +++ b/TAO/orbsvcs/FTRT_Event_Service/Readme @@ -0,0 +1,155 @@ +// $Id$ + + +Fault Tolerant Event Service + + +Important Note : In current stage, the Fault Tolerant Event Service can only be made under + MPC build. The conventional makefiles are yet to be supported. + +This directory contains the following programs: + + ftrt_eventservice : Implements the functionality of fault tolerant event channel. + It can be started directly or be started by the ft_factory. + + ftrtec_factory_service : A program used to spawn the ftrt_eventservice process. The process it + create can be controled through "test.cfg" whose contents should begin with the + repository id of EventChannel followed by the executable path of + ftrt_eventservice. + + ftrtec_gateway_service : An intermediator program between the ftrt_eventservice and the clients which do not support FT CORBA. + + consumer : A shell script to start the consumer test program. The actual consumer pragram is in orbsvcs/tests/FtRtEvent. + + supplier : A shell script to start the supplier test program. The actual supplier pragram is in orbsvcs/tests/FtRtEvent. + + ftec : a shell script to start ftrt_eventservice. + +Quick start: + + Run the applications as follows: + + + 1. Start Naming_Service + $ $TAO_ROOT/orbsvcs/Naming_Service/Naming_Service -m 1 + or you can use the shell script NameService in this directory to start it. + + 2. Start the ftrt_eventservice. Use the "-p" option to start it as a primary and + use the "-j" option to start it as a backup. + + $ ./ftec -p + $ ./ftec -j + $ ./ftec -j + + + 3. Start the consumer and supplier. + $ ./consumer + $ ./supplier + +How do we add a new FTRTEC to the system? + + Just use + + ./ftec -j + + The newly created process will contact to the naming service and then join to + the existing object group. + +Is there any adjustable options for FTRTEC? + + Here is the list of options for the ftec script + + -sciop Use SCIOP for CORBA communication + -sctp Use SCTP for fault detection + -hb n Specify the heart beat interval in sec + for SCTP connection, this option also activate sctp option. + -ami Use AMI call for replication messages (The default is + two-way CORBA call for replication) + -p activate as a primary replica. + -j activate as a backup replica. + + + Below are some options that are used for the consumer and supplier + test scripts. + + -sciop Use SCIOP for CORBA communication. This requires that the Naming + Service and ftec are also started using SCIOP transport protocol. + + -d n Specify the transaction depth. The transaction depth indicates the + number of replicas that must complete the subscription request before + the request can return. + + -t f.f For supplier only. Specify the time interval between event sending + in seconds, this value should be a float point. + + If you the naming service are not running at the same machine with above programs, + you can always set the environmental variables NameServiceIOR before starting the + ftec, consumer or supplier. + + + How do I start the FTRTEC using ftrtec_factory_service? + + The ftrtec_factory_service is a small program that can instaniate a ftrt_eventservice on demand. + It exports the FT::GenericFactory interface to its client. There are two ways that + you can get the IOR for the factory object. 1) specify the name you want the factory + register to the naming service and then get the IOR from the naming service by + the name. 2) output the IOR to a file when the factory starts. Here are the options + + ftrtec_factory_service : + + -i id_string The id field of the name that is used to register to the naming service + -k kind_string The kind field of the name that is used to register to the naming service + -o output_filename The output file name for the factory IOR. + +Once you get the IOR for the factory, you can use create_object to intantiate the ftrt_eventservice. +Here are the parameters in create_object() to control how ftrt_eventservice is created. + + type_id : this value should be "IDL:FtRtecEventChannelAdmin/EventChannel:1.0" + the_criteria : the_criteria is a sequence of Property which in term consists of + "nam" and "value". Below a a list of possible nam and values. + + nam value + ========================================================================== + FTEC_MEMBERSHIP PRIMARY + BACKUP + NONE + ---------------------------------------------------------------------- + FTEC_DETECTOR_TRANSPORT_PROTOCL TCP + SCTP + ----------------------------------------------------------------------- + FTEC_HEART_BEAT the heart beat value in sec. + (Note, you have to specify it using string, i.e. + the_criteria[0].value <<= "5"); + --------------------------------------------------------------------------------- + FTEC_REPLICATION_STRATEGY AMI + + (If not specified, the ftrt_eventservice use default + two-way call for replication) + ---------------------------------------------------------------------------------- + NameServieIOR the corbaloc representation for the + naming service + ======================================================================================= + + Any nam string started with "-" will be used as a command line option to start ftrt_eventservice. + For example, if you specfiy the name as "-ORBEndpoint" and value as "sciop://" then the + ftrt_eventservice can be started using sciop. + +How do I use the ftrtec_gateway_service program ? + + The FTRTEC uses some features in FT CORBA that requires every client to use FT ORB to work. If your + client is written based other ORBs other than TAO. You cannot get the desired fault tolerance feature. + In this case you can have the ftec_gateway as an intermediator between the FTRTEC and you client program. + For example, if you have an existing client called my_supplier. + + # setting up the event channel group as previously stated. + + $ftrtec_gateway_service -o gateway.ior ## start the gateway and output the IOR of the gateway to a file + $my_supplier -i file://gateway.ior ## start the supplier using the gateway + + + + + + + + diff --git a/TAO/orbsvcs/FTRT_Event_Service/consumer b/TAO/orbsvcs/FTRT_Event_Service/consumer new file mode 100755 index 00000000000..eec89bc5e69 --- /dev/null +++ b/TAO/orbsvcs/FTRT_Event_Service/consumer @@ -0,0 +1,22 @@ +#!/bin/sh + +HOSTNAME=`hostname` +EndpintParam= +FTEC_TransactionDepth=${FTEC_TransactionDepth=1} + + +while test "$1" != "" +do + case "$1" in + -sciop) ORBNameServicePort=${ORBNameServicePort=10000} + export NameServiceIOR=${NameServiceIOR=corbaloc:sciop:$HOSTNAME:$ORBNameServicePort/NameService} + EndpointParam="-ORBEndpoint sciop://";; + -d) $FTEC_TransactionDepth=$2 + shift;; + esac + shift +done + +cd $TAO_ROOT/orbsvcs/tests/FtRtEvent +export FTEC_TransactionDepth +./consumer $EndpointParam diff --git a/TAO/orbsvcs/FTRT_Event_Service/ftec b/TAO/orbsvcs/FTRT_Event_Service/ftec new file mode 100755 index 00000000000..390c1af594a --- /dev/null +++ b/TAO/orbsvcs/FTRT_Event_Service/ftec @@ -0,0 +1,25 @@ +#!/bin/sh +HOSTNAME=`hostname` +ARGS= + +while test "$1" != "" +do + case "$1" in + -sciop) ORBNameServicePort=${ORBNameServicePort=10000} + export NameServiceIOR=${NameServiceIOR=corbaloc:sciop:$HOSTNAME:$ORBNameServicePort/NameService} + ARGS="$ARGS -ORBEndpoint sciop:// ";; + -sctp) export FTEC_DETECTOR_TRANSPORT_PROTOCOL=sctp ;; + -hb) export FTEC_DETECTOR_TRANSPORT_PROTOCOL=sctp + export FTEC_HEART_BEAT=$2 + shift;; + -ami) export FTEC_REPLCATION_STRATEGY=AMI;; + -p) ARGS="$ARGS -p";; + -j) ARGS="$ARGS -j";; + -ns) ORBNameServicePort=${ORBNameServicePort=10000} + export NameServiceIOR=${NameServiceIOR=corbaloc:iiop:$2:$ORBNameServicePort/NameService}; + shift;; + esac + shift +done +cd ./Event_Service +./ftrt_eventservice $ARGS diff --git a/TAO/orbsvcs/FTRT_Event_Service/supplier b/TAO/orbsvcs/FTRT_Event_Service/supplier new file mode 100755 index 00000000000..96c63a29e90 --- /dev/null +++ b/TAO/orbsvcs/FTRT_Event_Service/supplier @@ -0,0 +1,21 @@ +#!/bin/sh + +HOSTNAME=`hostname` +EndpintParam= +FTEC_TransactionDepth=${FTEC_TransactionDepth=1} + +while test "$1" != "" +do + case "$1" in + -sciop) ORBNameServicePort=${ORBNameServicePort=10000} + export NameServiceIOR=${NameServiceIOR=corbaloc:sciop:$HOSTNAME:$ORBNameServicePort/NameService} + EndpointParam="-ORBEndpoint sciop://";; + -d) $FTEC_TransactionDepth=$2 + shift;; + esac + shift +done + +cd $TAO_ROOT/orbsvcs/tests/FtRtEvent +export FTEC_TransactionDepth +./supplier $EndpointParam diff --git a/TAO/orbsvcs/tests/FtRtEvent/FtRtEvent.mpc b/TAO/orbsvcs/tests/FtRtEvent/FtRtEvent.mpc new file mode 100644 index 00000000000..f73fae7c25d --- /dev/null +++ b/TAO/orbsvcs/tests/FtRtEvent/FtRtEvent.mpc @@ -0,0 +1,30 @@ +project(*Consumer): orbsvcsexe, ftrteventclient { + exename = consumer + specific(gnuace) { + lit_libs += TAO_Strategies + } + Source_Files { + consumer.cpp + PushConsumer.cpp + } + + Header_Files { + PushConsumer.h + } +} + +project(*Supplier): orbsvcsexe, ftrteventclient { + exename = supplier + + specific(gnuace) { + lit_libs += TAO_Strategies + } + Source_Files { + supplier.cpp + PushSupplier.cpp + } + + Header_Files { + PushSupplier.h + } +} diff --git a/TAO/orbsvcs/tests/FtRtEvent/PushConsumer.cpp b/TAO/orbsvcs/tests/FtRtEvent/PushConsumer.cpp new file mode 100644 index 00000000000..7a2f3049b87 --- /dev/null +++ b/TAO/orbsvcs/tests/FtRtEvent/PushConsumer.cpp @@ -0,0 +1,56 @@ +// $Id$ + +#include "PushConsumer.h" +#include "orbsvcs/FtRtEvent/Utils/resolve_init.h" +#include <stdio.h> + +ACE_RCSID (FtRtEvent, + PushConsumer, + "$Id$") + +PushConsumer_impl::PushConsumer_impl(CORBA::ORB_ptr orb) +: orb_(CORBA::ORB::_duplicate(orb)) +{ +} + + +void +PushConsumer_impl::push (const RtecEventComm::EventSet & event + ACE_ENV_ARG_DECL_WITH_DEFAULTS + ) + ACE_THROW_SPEC (( + CORBA::SystemException + )) +{ + CORBA::ULong x; + ACE_Time_Value time_val = ACE_OS::gettimeofday (); + + if (event.length() >0) { + TimeBase::TimeT elaps = + time_val.sec () * 10000000 + time_val.usec ()* 10 - event[0].header.ec_send_time; + event[0].data.any_value >>= x; + printf("Received data : %d, single trip time = %d usec\n", x, static_cast<long>(elaps/10)); + } +} + + +void +PushConsumer_impl::disconnect_push_consumer (ACE_ENV_SINGLE_ARG_DECL_WITH_DEFAULTS + ) + ACE_THROW_SPEC (( + CORBA::SystemException + )) +{ + PortableServer::Current_var current = + resolve_init<PortableServer::Current>(orb_.in(), "POACurrent" ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + PortableServer::POA_var poa = current->get_POA(ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + PortableServer::ObjectId_var oid = current->get_object_id(ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + poa->deactivate_object(oid ACE_ENV_ARG_PARAMETER); + +} diff --git a/TAO/orbsvcs/tests/FtRtEvent/PushConsumer.h b/TAO/orbsvcs/tests/FtRtEvent/PushConsumer.h new file mode 100644 index 00000000000..d84080a2eac --- /dev/null +++ b/TAO/orbsvcs/tests/FtRtEvent/PushConsumer.h @@ -0,0 +1,42 @@ +// -*- C++ -*- +//============================================================================= +/** + * @file PushConsumer.h + * + * $Id$ + * + * @author Huang-Ming Huang <hh1@cse.wustl.edu> + */ +//============================================================================= + +#ifndef PUSHCONSUMERIMPL_H +#define PUSHCONSUMERIMPL_H + +#include "orbsvcs/RtecEventCommS.h" + +class PushConsumer_impl : +public virtual POA_RtecEventComm::PushConsumer +{ +public: + PushConsumer_impl(CORBA::ORB_ptr orb); + + virtual void push ( + const RtecEventComm::EventSet & data + ACE_ENV_ARG_DECL_WITH_DEFAULTS + ) + ACE_THROW_SPEC (( + CORBA::SystemException + )); + + virtual void disconnect_push_consumer ( + ACE_ENV_SINGLE_ARG_DECL_WITH_DEFAULTS + ) + ACE_THROW_SPEC (( + CORBA::SystemException + )); +private: + CORBA::ORB_var orb_; + PushConsumer_impl(const PushConsumer_impl&); + void operator==(const PushConsumer_impl&); +}; +#endif diff --git a/TAO/orbsvcs/tests/FtRtEvent/PushSupplier.cpp b/TAO/orbsvcs/tests/FtRtEvent/PushSupplier.cpp new file mode 100644 index 00000000000..c3542fa1a78 --- /dev/null +++ b/TAO/orbsvcs/tests/FtRtEvent/PushSupplier.cpp @@ -0,0 +1,145 @@ +// $Id$ + +#include "orbsvcs/CosNamingC.h" +#include "orbsvcs/Event_Utilities.h" +#include "PushSupplier.h" +#include "ace/Reactor.h" +#include "ace/Select_Reactor.h" +#include "tao/MProfile.h" +#include "tao/Stub.h" +#include "orbsvcs/FtRtEvent/Utils/resolve_init.h" + +ACE_RCSID (FtRtEvent, + PushSupplier, + "$Id$") + + +int +PushSupplier_impl::ReactorTask::svc (void) +{ + ACE_DEBUG((LM_DEBUG, "Reactor Thread started\n")); + ACE_Reactor reactor (new ACE_Select_Reactor) ; + reactor_ = &reactor; + + extern ACE_Time_Value timer_interval; + + if (reactor_->schedule_timer(handler_, 0, ACE_Time_Value::zero, timer_interval)== -1) + ACE_ERROR_RETURN((LM_ERROR,"Cannot schedule timer\n"),-1); + + reactor_->run_reactor_event_loop(); + ACE_DEBUG((LM_DEBUG, "Reactor Thread ended\n")); + + return 0; +} + + + +PushSupplier_impl::PushSupplier_impl(CORBA::ORB_ptr orb) +: orb_(orb), seq_no_(0), reactor_task_(this) +{ +} + +PushSupplier_impl::~PushSupplier_impl() +{ + reactor_task_.wait(); +} + +int PushSupplier_impl::init(RtecEventChannelAdmin::EventChannel_ptr channel ACE_ENV_ARG_DECL) +{ + + ACE_DEBUG((LM_DEBUG, "for_suppliers\n")); + RtecEventChannelAdmin::SupplierAdmin_var supplier_admin = + channel->for_suppliers(ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + ACE_DEBUG((LM_DEBUG, "obtain_push_consumer\n")); + consumer_ = + supplier_admin->obtain_push_consumer(ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + + + ACE_DEBUG((LM_DEBUG, "got push_consumer with %d profiles\n", + consumer_->_stubobj ()->base_profiles ().profile_count ())); + + RtecEventChannelAdmin::SupplierQOS qos; + qos.publications.length (1); + RtecEventComm::EventHeader& h0 = + qos.publications[0].event.header; + h0.type = ACE_ES_EVENT_UNDEFINED; // first free event type + h0.source = 1; // first free event source + + RtecEventComm::PushSupplier_var supplier = _this(); + + ACE_DEBUG((LM_DEBUG, "connect_push_supplier\n")); + consumer_->connect_push_supplier(supplier.in(), + qos ACE_ENV_ARG_PARAMETER); + + ACE_DEBUG((LM_DEBUG, "push_consumer connected\n")); + + + if (!reactor_task_.thr_count() && + reactor_task_.activate (THR_NEW_LWP | THR_JOINABLE, 1) != 0) + ACE_ERROR_RETURN ((LM_ERROR, + "Cannot activate reactor thread\n"), + -1); + + return 0; + +} + + + +void PushSupplier_impl::disconnect_push_supplier ( + ACE_ENV_SINGLE_ARG_DECL_WITH_DEFAULTS + ) + ACE_THROW_SPEC (( + CORBA::SystemException + )) +{ + + reactor_task_.reactor_->end_reactor_event_loop(); + + PortableServer::Current_var current = + resolve_init<PortableServer::Current>(orb_.in(), "POACurrent" ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + PortableServer::POA_var poa = current->get_POA(ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + PortableServer::ObjectId_var oid = current->get_object_id(ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + poa->deactivate_object(oid ACE_ENV_ARG_PARAMETER); +} + +int PushSupplier_impl::handle_timeout (const ACE_Time_Value ¤t_time, + const void *act) +{ + ACE_UNUSED_ARG(act); + ACE_UNUSED_ARG(current_time); + + ACE_DECLARE_NEW_CORBA_ENV; + ACE_TRY { + RtecEventComm::EventSet event (1); + event.length (1); + event[0].header.type = ACE_ES_EVENT_UNDEFINED; + event[0].header.source = 1; + event[0].header.ttl = 1; + + ACE_Time_Value time_val = ACE_OS::gettimeofday (); + + event[0].header.ec_send_time = time_val.sec () * 10000000 + time_val.usec ()* 10; + event[0].data.any_value <<= seq_no_; + + consumer_->push(event ACE_ENV_ARG_PARAMETER); + ACE_DEBUG((LM_DEBUG, "sending data %d\n", seq_no_)); + ++seq_no_; + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION(ACE_ANY_EXCEPTION, "A CORBA Exception occurred."); + } + ACE_ENDTRY; + return 0; +} diff --git a/TAO/orbsvcs/tests/FtRtEvent/PushSupplier.h b/TAO/orbsvcs/tests/FtRtEvent/PushSupplier.h new file mode 100644 index 00000000000..a425411e6df --- /dev/null +++ b/TAO/orbsvcs/tests/FtRtEvent/PushSupplier.h @@ -0,0 +1,63 @@ +// -*- C++ -*- +//============================================================================= +/** + * @file PushSupplier.h + * + * $Id$ + * + * @author Huang-Ming Huang <hh1@cse.wustl.edu> + */ +//============================================================================= + +#ifndef PUSHSUPPLIER_H +#define PUSHSUPPLIER_H + +#include "orbsvcs/RtecEventCommS.h" +#include "orbsvcs/RtecEventChannelAdminC.h" +#include "ace/Event_Handler.h" +#include "ace/Task.h" +#include "ace/Reactor.h" +#include "ace/Time_Value.h" + +class ACE_Reactor; + +class PushSupplier_impl : +public ACE_Event_Handler +,public virtual POA_RtecEventComm::PushSupplier +{ +public: + PushSupplier_impl(CORBA::ORB_ptr orb); + ~PushSupplier_impl(); + + int init(RtecEventChannelAdmin::EventChannel_ptr ACE_ENV_ARG_DECL); + + virtual void disconnect_push_supplier ( + ACE_ENV_SINGLE_ARG_DECL_WITH_DEFAULTS + ) + ACE_THROW_SPEC (( + CORBA::SystemException + )); +private: + + class ReactorTask : public ACE_Task_Base + { + public: + // ctor + ReactorTask(ACE_Event_Handler* handler) : handler_(handler){} + virtual int svc (void); + // The thread entry point. + + ACE_Reactor* reactor_; + ACE_Event_Handler* handler_; + }; + + virtual int handle_timeout (const ACE_Time_Value ¤t_time, + const void *act = 0); + CORBA::ORB_var orb_; + CORBA::ULong seq_no_; + ReactorTask reactor_task_; + RtecEventChannelAdmin::ProxyPushConsumer_var consumer_; + PushSupplier_impl(const PushSupplier_impl&); + void operator==(const PushSupplier_impl&); +}; +#endif diff --git a/TAO/orbsvcs/tests/FtRtEvent/consumer.cpp b/TAO/orbsvcs/tests/FtRtEvent/consumer.cpp new file mode 100644 index 00000000000..f77c0f2e3ad --- /dev/null +++ b/TAO/orbsvcs/tests/FtRtEvent/consumer.cpp @@ -0,0 +1,151 @@ +// $Id$ + +#include "orbsvcs/CosNamingC.h" +#include "orbsvcs/FtRtecEventChannelAdminC.h" +#include "orbsvcs/Event_Utilities.h" +#include "PushConsumer.h" +#include "ace/Get_Opt.h" +#include "orbsvcs/FtRtEvent/Utils/resolve_init.h" +#include "orbsvcs/FtRtEvent/Utils/FTEC_Gateway.h" + +/// include this file to statically linked with FT ORB +#include "orbsvcs/FaultTolerance/FT_ClientService_Activate.h" + +/// include this file to statically linked with Transaction Depth +#include "orbsvcs/FtRtEvent/ClientORB/FTRT_ClientORB_Loader.h" + +ACE_RCSID (FtRtEvent, + PushConsumer, + "$Id$") + +CORBA::ORB_var orb; +auto_ptr<TAO_FTRTEC::FTEC_Gateway> gateway; + +RtecEventChannelAdmin::EventChannel_ptr +get_event_channel(int argc, ACE_TCHAR** argv ACE_ENV_ARG_DECL) +{ + FtRtecEventChannelAdmin::EventChannel_var channel; + ACE_Get_Opt get_opt (argc, argv, ACE_LIB_TEXT("hi:n")); + int opt; + int use_gateway = 1; + + while ((opt = get_opt ()) != EOF) + { + switch (opt) + { + case 'i': + { + CORBA::Object_var obj = orb->string_to_object(get_opt.opt_arg () + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + channel = FtRtecEventChannelAdmin::EventChannel::_narrow(obj.in() + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + } + break; + case 'n': + use_gateway = 0; + break; + case 'h': + case '?': + ACE_DEBUG((LM_DEBUG, + ACE_LIB_TEXT("Usage: %s ") + ACE_LIB_TEXT("-i ftrt_eventchannel_ior\n") + ACE_LIB_TEXT("-n do not use gateway\n") + ACE_LIB_TEXT("\n"), + argv[0])); + return 0; + } + } + + + if (CORBA::is_nil(channel.in())) + { + CosNaming::Name name(1); + name.length(1); + name[0].id = CORBA::string_dup("FT_EventService"); + + CosNaming::NamingContext_var naming_context = + resolve_init<CosNaming::NamingContext>(orb.in(), "NameService" + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + channel = resolve<FtRtecEventChannelAdmin::EventChannel>(naming_context.in(), + name + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + } + + if (use_gateway) + { + gateway.reset(new TAO_FTRTEC::FTEC_Gateway(orb.in(), channel.in())); + return gateway->_this(ACE_ENV_SINGLE_ARG_PARAMETER); + } + else + return channel._retn(); +} + +int main(int argc, ACE_TCHAR** argv) +{ + ACE_TRY_NEW_ENV { + orb = CORBA::ORB_init(argc, argv + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + RtecEventChannelAdmin::EventChannel_var channel + = get_event_channel(argc, argv ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + + if (CORBA::is_nil(channel.in())) + ACE_ERROR_RETURN((LM_ERROR, "Cannot Find FT_EventService\n"), -1); + + PortableServer::POA_var poa = + resolve_init<PortableServer::POA>(orb.in(), "RootPOA" + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + PortableServer::POAManager_var mgr = poa->the_POAManager(ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + mgr->activate(ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + PushConsumer_impl push_consumer_impl(orb.in()); + RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin = + channel->for_consumers(ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + RtecEventChannelAdmin::ProxyPushSupplier_var supplier = + consumer_admin->obtain_push_supplier(ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + RtecEventChannelAdmin::ConsumerQOS qos; + qos.is_gateway = 1; + qos.dependencies.length(1); + + RtecEventComm::EventHeader& h0 = + qos.dependencies[0].event.header; + h0.type = ACE_ES_EVENT_UNDEFINED; // first free event type + h0.source = ACE_ES_EVENT_SOURCE_ANY; + + RtecEventComm::PushConsumer_var push_consumer = + push_consumer_impl._this(); + + supplier->connect_push_consumer(push_consumer.in(), + qos ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + orb->run(ACE_ENV_SINGLE_ARG_PARAMETER); + + } + ACE_CATCHANY { + ACE_PRINT_EXCEPTION(ACE_ANY_EXCEPTION, "A CORBA Exception occurred."); + } + ACE_ENDTRY; + + ACE_CHECK_RETURN(-1); + + return 0; +} + diff --git a/TAO/orbsvcs/tests/FtRtEvent/supplier.cpp b/TAO/orbsvcs/tests/FtRtEvent/supplier.cpp new file mode 100644 index 00000000000..1bfae5f5b04 --- /dev/null +++ b/TAO/orbsvcs/tests/FtRtEvent/supplier.cpp @@ -0,0 +1,142 @@ +// $Id$ +#include "orbsvcs/CosNamingC.h" +#include "orbsvcs/FtRtecEventChannelAdminC.h" +#include "PushSupplier.h" +#include "ace/Get_Opt.h" +#include "orbsvcs/FtRtEvent/Utils/resolve_init.h" +#include "orbsvcs/FtRtEvent/Utils/FTEC_Gateway.h" + +/// include this file to statically linked with FT ORB +#include "orbsvcs/FaultTolerance/FT_ClientService_Activate.h" + +/// include this file to statically linked with Transaction Depth +#include "orbsvcs/FtRtEvent/ClientORB/FTRT_ClientORB_Loader.h" + +ACE_RCSID (FtRtEvent, + supplier, + "$Id$") + + +ACE_Time_Value timer_interval(1,0); +CORBA::ORB_var orb; +auto_ptr<TAO_FTRTEC::FTEC_Gateway> gateway; + +RtecEventChannelAdmin::EventChannel_ptr +get_event_channel(int argc, ACE_TCHAR** argv ACE_ENV_ARG_DECL) +{ + FtRtecEventChannelAdmin::EventChannel_var channel; + ACE_Get_Opt get_opt (argc, argv, ACE_LIB_TEXT("hi:nt:?")); + int opt; + int use_gateway = 1; + + while ((opt = get_opt ()) != EOF) + { + switch (opt) + { + case 'i': + { + CORBA::Object_var obj = orb->string_to_object(get_opt.opt_arg () + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + channel = FtRtecEventChannelAdmin::EventChannel::_narrow(obj.in() + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + } + break; + case 'n': + use_gateway = 0; + break; + case 't': + timer_interval.set(atof(get_opt.opt_arg ())); + case 'h': + case '?': + ACE_DEBUG((LM_DEBUG, + ACE_LIB_TEXT("Usage: %s ") + ACE_LIB_TEXT("-i ftrt_eventchannel_ior\n") + ACE_LIB_TEXT("-n do not use gateway\n") + ACE_LIB_TEXT("-t time Time interval in seconds between events (default 1.0)\n") + ACE_LIB_TEXT("\n"), + argv[0])); + return 0; + + } + } + + + if (CORBA::is_nil(channel.in())) + { + /// Find the FTRTEC from the Naming Service + CosNaming::Name name(1); + name.length(1); + name[0].id = CORBA::string_dup("FT_EventService"); + + CosNaming::NamingContext_var naming_context = + resolve_init<CosNaming::NamingContext>(orb.in(), "NameService" + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + channel = resolve<FtRtecEventChannelAdmin::EventChannel>(naming_context.in(), + name + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + } + + if (use_gateway) + { + // use local gateway to communicate with FTRTEC + gateway.reset(new TAO_FTRTEC::FTEC_Gateway(orb.in(), channel.in())); + return gateway->_this(ACE_ENV_SINGLE_ARG_PARAMETER); + } + else + return channel._retn(); +} + + +int main(int argc, ACE_TCHAR** argv) +{ + ACE_DECLARE_NEW_CORBA_ENV; + ACE_TRY { + orb = CORBA::ORB_init(argc, argv + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + + RtecEventChannelAdmin::EventChannel_var channel + = get_event_channel(argc, argv ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + + if (CORBA::is_nil(channel.in())) + return -1; + + PortableServer::POA_var poa = + resolve_init<PortableServer::POA>(orb.in(), "RootPOA" + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + PortableServer::POAManager_var mgr = poa->the_POAManager(ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + mgr->activate(ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + PushSupplier_impl push_supplier(orb.in()); + if (push_supplier.init(channel.in() ACE_ENV_ARG_PARAMETER) == -1) + return -1; + + RtecEventComm::PushSupplier_var + supplier = push_supplier._this(); + + + orb->run(ACE_ENV_SINGLE_ARG_PARAMETER); + + } + ACE_CATCHANY { + ACE_PRINT_EXCEPTION(ACE_ANY_EXCEPTION, "A CORBA Exception occurred."); + } + ACE_ENDTRY; + + ACE_CHECK_RETURN(-1); + + return 0; +} diff --git a/TAO/orbsvcs/tests/FtRtEvent/svc.conf b/TAO/orbsvcs/tests/FtRtEvent/svc.conf new file mode 100644 index 00000000000..49c3a8a3d04 --- /dev/null +++ b/TAO/orbsvcs/tests/FtRtEvent/svc.conf @@ -0,0 +1,3 @@ +## $Id$ + +static FTRT_ClientORB_Service "-ORBTransactionDepth $FTEC_TransactionDepth"
\ No newline at end of file |