diff options
Diffstat (limited to 'TAO/orbsvcs/examples/FaultTolerance/RolyPoly/ReplicaController.cpp')
-rw-r--r-- | TAO/orbsvcs/examples/FaultTolerance/RolyPoly/ReplicaController.cpp | 506 |
1 files changed, 506 insertions, 0 deletions
diff --git a/TAO/orbsvcs/examples/FaultTolerance/RolyPoly/ReplicaController.cpp b/TAO/orbsvcs/examples/FaultTolerance/RolyPoly/ReplicaController.cpp new file mode 100644 index 00000000000..bf3c1e9bf0b --- /dev/null +++ b/TAO/orbsvcs/examples/FaultTolerance/RolyPoly/ReplicaController.cpp @@ -0,0 +1,506 @@ +// file : RolyPoly/ReplicaController.cpp +// author : Boris Kolpackov <boris@dre.vanderbilt.edu> +// cvs-id : $Id$ + +#include "ace/UUID.h" +#include "ace/Thread_Manager.h" +#include "ace/TMCast/Group.hpp" + +#include "tao/PortableServer/Servant_Base.h" + +#include "orbsvcs/FT_CORBA_ORBC.h" + +#include "CrashPoint.h" +#include "StateUpdate.h" +#include "ReplicaController.h" + + +// State slot. +// +// + +namespace +{ + PortableInterceptor::SlotId state_slot_id_; +} + +PortableInterceptor::SlotId +state_slot_id () +{ + return state_slot_id_; +} + +void +state_slot_id (PortableInterceptor::SlotId slot_id) +{ + state_slot_id_ = slot_id; +} + +Checkpointable:: +~Checkpointable () +{ +} + +CORBA::Any* Checkpointable:: +get_state () +{ + return 0; +} + +void Checkpointable:: +associate_state (CORBA::ORB_ptr orb, CORBA::Any const& state) +{ + CORBA::Object_var pic_obj = + orb->resolve_initial_references ("PICurrent"); + + PortableInterceptor::Current_var pic = + PortableInterceptor::Current::_narrow (pic_obj.in ()); + + pic->set_slot (state_slot_id (), state); +} + +// ReplyLogger +// +// + +ReplicaController:: +~ReplicaController () +{ +} + +ReplicaController:: +ReplicaController (CORBA::ORB_ptr orb) + : orb_ (CORBA::ORB::_duplicate (orb)) +{ + CORBA::Object_var poa_object = + orb_->resolve_initial_references ("RootPOA"); + + root_poa_ = PortableServer::POA::_narrow (poa_object.in ()); + + // Generate member id + ACE_Utils::UUID uuid; + ACE_Utils::UUID_GENERATOR::instance ()->init (); + ACE_Utils::UUID_GENERATOR::instance ()->generateUUID (uuid); + + ACE_INET_Addr address (10000, "239.255.0.1"); + + ACE_DEBUG ((LM_DEBUG, "Becoming a member with id %s\n", + uuid.to_string ()->c_str ())); + + group_.reset (new TMCast::Group (address, uuid.to_string ()->c_str ())); + + int r = ACE_Thread_Manager::instance ()->spawn ( + &ReplicaController::listener_thunk, this); + + if (r < 0) + { + orb_->shutdown (0); + } +} + +void ReplicaController:: +listener () +{ + try + { + for (char buffer[1024];;) + { + size_t n = group_->recv (buffer, sizeof (buffer)); + + ACE_HEX_DUMP ((LM_DEBUG, buffer, n)); + + TAO_InputCDR cdr (buffer, n); + + CORBA::OctetSeq object_id; + PortableInterceptor::AdapterName adapter_name; + CORBA::String_var client_id; + CORBA::Long retention_id; + CORBA::OctetSeq reply; + CORBA::Any state; + + cdr >> object_id; + cdr >> adapter_name; + cdr >> client_id.out (); + cdr >> retention_id; + cdr >> reply; + cdr >> state; + + if (!cdr.good_bit ()) + { + ACE_DEBUG ((LM_DEBUG, "CDR failed\n")); + //@@ what to do? + } + + ACE_DEBUG ((LM_DEBUG, + "Received log for %s with rid %i\n", + client_id.in (), + retention_id)); + + + RecordId rid (client_id.in (), retention_id); + + CORBA::OctetSeq_var tmp (new CORBA::OctetSeq (reply)); + log_.insert (rid, tmp); + + // Update state. + CORBA::TypeCode_var tc = state.type (); + + if (tc->kind () != CORBA::tk_null) + { + PortableServer::POA_var poa = resolve_poa (adapter_name); + + PortableServer::ServantBase_var servant = + poa->id_to_servant (object_id); + + Checkpointable* target = + dynamic_cast<Checkpointable*> (servant.in ()); + + if (target) target->set_state (state); + } + } + } + catch (TMCast::Group::Failed const&) + { + ACE_DEBUG ((LM_DEBUG, + "Group failure. Perhaps, I am alone in the group.\n")); + } + catch (TMCast::Group::InsufficienSpace const&) + { + ACE_DEBUG ((LM_DEBUG, "Group::InsufficienSpace\n")); + } + + orb_->shutdown (0); +} + +PortableServer::POA_ptr ReplicaController:: +resolve_poa (PortableInterceptor::AdapterName const&) +{ + //@@ Assume for now it's a root poa. + return PortableServer::POA::_duplicate (root_poa_.in ()); +} + + +void* ReplicaController:: +listener_thunk (void* p) +{ + ReplicaController* obj = reinterpret_cast<ReplicaController*> (p); + obj->listener(); + return 0; +} + +namespace +{ + FT::FTRequestServiceContext* + extract_context ( + PortableInterceptor::ServerRequestInfo_ptr ri + ACE_ENV_ARG_DECL) ACE_THROW_SPEC ((CORBA::SystemException)); +} + +void +ReplicaController::tao_ft_interception_point ( + PortableInterceptor::ServerRequestInfo_ptr ri, + CORBA::OctetSeq_out ocs + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException, + PortableInterceptor::ForwardRequest)) +{ + FT::FTRequestServiceContext_var ftr ( + extract_context (ri ACE_ENV_ARG_PARAMETER)); + + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Received request from %s with rid %i\n", + ftr->client_id.in (), + ftr->retention_id)); + + // Check if this request is eligible for replay. + + RecordId rid (ftr->client_id.in (), ftr->retention_id); + + if (log_.contains (rid)) + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Replaying reply for %s with rid %i\n", + ftr->client_id.in (), + ftr->retention_id)); + + CORBA::OctetSeq_var copy (log_.lookup (rid)); // make a copy + + ocs = copy._retn (); + } + + return; +} + +void +ReplicaController::send_reply ( + PortableInterceptor::ServerRequestInfo_ptr ri + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + FT::FTRequestServiceContext_var ftr ( + extract_context (ri ACE_ENV_ARG_PARAMETER)); + + + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Sending reply for %s with rid %i\n", + ftr->client_id.in (), + ftr->retention_id)); + + + // Prepare reply for logging. + + CORBA::Any_var result = + ri->result (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + TAO_OutputCDR cdr; + result->impl ()->marshal_value (cdr); + + Dynamic::ParameterList_var pl = + ri->arguments (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + CORBA::ULong len = pl->length (); + + for (CORBA::ULong index = 0; index != len ; ++index) + { + //@@ No chance for PARAM_OUT + if ((*pl)[index].mode == CORBA::PARAM_INOUT) + { + (*pl)[index].argument.impl ()->marshal_value (cdr); + } + } + + CORBA::OctetSeq_var reply; + + ACE_NEW (reply, CORBA::OctetSeq (cdr.total_length ())); + + reply->length (cdr.total_length ()); + + CORBA::Octet* buf = reply->get_buffer (); + + // @@ What if this throws an exception?? We don't have any way to + // check whether this succeeded + for (ACE_Message_Block const* mb = cdr.begin (); + mb != 0; + mb = mb->cont ()) + { + ACE_OS::memcpy (buf, mb->rd_ptr (), mb->length ()); + buf += mb->length (); + } + + // Logging the reply and state update. + // + + // First send message to members. + // + { + // Extract state update. + + CORBA::OctetSeq_var oid = ri->object_id (); + PortableInterceptor::AdapterName_var an = ri->adapter_name (); + + CORBA::Any_var state = ri->get_slot (state_slot_id ()); + + CORBA::TypeCode_var tc = state->type (); + + if (tc->kind () == CORBA::tk_null) + { + ACE_DEBUG ((LM_DEBUG, "Slot update is void\n")); + + PortableServer::POA_var poa = resolve_poa (an); + + PortableServer::ServantBase_var servant = + poa->id_to_servant (oid); + + Checkpointable* target = + dynamic_cast<Checkpointable*> (servant.in ()); + + if (target) + { + CORBA::Any_var tmp = target->get_state (); + + if (tmp != 0) state = tmp._retn (); + } + } + + TAO_OutputCDR cdr; + + cdr << oid; + cdr << an; + cdr << ftr->client_id.in (); + cdr << ftr->retention_id; + cdr << reply.in (); + cdr << *state; + + size_t size = cdr.total_length (); + + CORBA::OctetSeq_var msg; + + ACE_NEW (msg, CORBA::OctetSeq (size)); + + msg->length (size); + + { + CORBA::Octet* buf (msg->get_buffer ()); + + for (ACE_Message_Block const* mb = cdr.begin (); + mb != 0; + mb = mb->cont ()) + { + ACE_OS::memcpy (buf, mb->rd_ptr (), mb->length ()); + buf += mb->length (); + } + } + + CORBA::Octet* buf (msg->get_buffer ()); + + // Crash point 1. + // + if (crash_point == 1 && ftr->retention_id > 2) exit (1); + + try + { + while (true) + { + try + { + group_->send (buf, size); + ACE_DEBUG ((LM_DEBUG, "Sent log record of length %i\n", size)); + break; + } + catch (TMCast::Group::Aborted const&) + { + ACE_DEBUG ((LM_DEBUG, "Retrying to send log record.\n")); + } + } + } + catch (TMCast::Group::Failed const&) + { + ACE_DEBUG ((LM_DEBUG, + "Group failure. Perhaps, I am alone in the group.\n")); + } + } + + + // Now perform local logging. + // + RecordId rid (ftr->client_id.in (), ftr->retention_id); + + // This is slow but eh-safe ;-). + // + log_.insert (rid, reply); + + + // Crash point 2. + // + if (crash_point == 2 && ftr->retention_id > 2) exit (1); +} + + +namespace +{ + FT::FTRequestServiceContext* + extract_context (PortableInterceptor::ServerRequestInfo_ptr ri + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException)) + { + IOP::ServiceContext_var svc = + ri->get_request_service_context (IOP::FT_REQUEST + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + TAO_InputCDR cdr (ACE_reinterpret_cast (const char*, + svc->context_data.get_buffer ()), + svc->context_data.length ()); + + CORBA::Boolean byte_order; + + if ((cdr >> ACE_InputCDR::to_boolean (byte_order)) == 0) + { + //@@ what to throw? + ACE_THROW (CORBA::BAD_CONTEXT ()); + } + + cdr.reset_byte_order (ACE_static_cast (int,byte_order)); + + // Funny, the following two lines should normally translate + // just to one ctor call. But because we have to use this + // ACE_NEW macro hackery we have a default ctor call plus + // assignment operator call. Yet another example how the + // majority is being penalized by some broken platforms. + // + FT::FTRequestServiceContext_var req; + + //@@ completed status maybe wrong + // + ACE_NEW_THROW_EX (req, + FT::FTRequestServiceContext, + CORBA::NO_MEMORY ( + CORBA::SystemException::_tao_minor_code ( + TAO_DEFAULT_MINOR_CODE, + ENOMEM), + CORBA::COMPLETED_NO)); + + cdr >> *req; + + if (!cdr.good_bit ()) + { + //@@ what to throw? + ACE_THROW (CORBA::UNKNOWN ()); + } + + return req._retn (); + } +} + + +char* +ReplicaController::name (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + return CORBA::string_dup ("ReplicaController"); +} + +void +ReplicaController::send_exception ( + PortableInterceptor::ServerRequestInfo_ptr + ACE_ENV_ARG_DECL_NOT_USED) + ACE_THROW_SPEC ((CORBA::SystemException, + PortableInterceptor::ForwardRequest)) +{ +} + +void +ReplicaController::send_other ( + PortableInterceptor::ServerRequestInfo_ptr + ACE_ENV_ARG_DECL_NOT_USED) + ACE_THROW_SPEC ((CORBA::SystemException, + PortableInterceptor::ForwardRequest)) +{ +} + +void +ReplicaController::destroy (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ +} + +void +ReplicaController::receive_request_service_contexts ( + PortableInterceptor::ServerRequestInfo_ptr + ACE_ENV_ARG_DECL_NOT_USED) + ACE_THROW_SPEC ((CORBA::SystemException, + PortableInterceptor::ForwardRequest)) +{ + +} + +void +ReplicaController::receive_request ( + PortableInterceptor::ServerRequestInfo_ptr + ACE_ENV_ARG_DECL_NOT_USED) + ACE_THROW_SPEC ((CORBA::SystemException, + PortableInterceptor::ForwardRequest)) +{ +} |