diff options
Diffstat (limited to 'trunk/TAO/orbsvcs/orbsvcs/Notify/Routing_Slip.cpp')
-rw-r--r-- | trunk/TAO/orbsvcs/orbsvcs/Notify/Routing_Slip.cpp | 933 |
1 files changed, 933 insertions, 0 deletions
diff --git a/trunk/TAO/orbsvcs/orbsvcs/Notify/Routing_Slip.cpp b/trunk/TAO/orbsvcs/orbsvcs/Notify/Routing_Slip.cpp new file mode 100644 index 00000000000..eb101ef2105 --- /dev/null +++ b/trunk/TAO/orbsvcs/orbsvcs/Notify/Routing_Slip.cpp @@ -0,0 +1,933 @@ +// $Id$ + +#include "orbsvcs/Notify/Routing_Slip.h" + +#include "orbsvcs/Notify/Delivery_Request.h" +#include "orbsvcs/Notify/Worker_Task.h" +#include "orbsvcs/Notify/ProxyConsumer.h" +#include "orbsvcs/Notify/ProxySupplier.h" +#include "orbsvcs/Notify/Event_Persistence_Strategy.h" +#include "orbsvcs/Notify/Routing_Slip_Persistence_Manager.h" +#include "orbsvcs/Notify/Routing_Slip_Queue.h" +#include "orbsvcs/Notify/Method_Request_Lookup.h" +#include "orbsvcs/Notify/Method_Request_Dispatch.h" + +#include "tao/debug.h" +#include "tao/corba.h" + +#include "ace/Dynamic_Service.h" + +//#define DEBUG_LEVEL 9 +#ifndef DEBUG_LEVEL +# define DEBUG_LEVEL TAO_debug_level +#endif //DEBUG_LEVEL + +#define QUEUE_ALLOWED 1 + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +namespace TAO_Notify +{ +/////////////////////// +// Routing_Slip Statics + +Routing_Slip_Queue Routing_Slip::persistent_queue_(QUEUE_ALLOWED); + +TAO_SYNCH_MUTEX Routing_Slip::sequence_lock_; +int Routing_Slip::routing_slip_sequence_= 0; +size_t Routing_Slip::count_enter_transient_ = 0; +size_t Routing_Slip::count_continue_transient_ = 0; +size_t Routing_Slip::count_enter_reloaded_ = 0; +size_t Routing_Slip::count_enter_new_ = 0; +size_t Routing_Slip::count_continue_new_ = 0; +size_t Routing_Slip::count_enter_complete_while_new_ = 0; +size_t Routing_Slip::count_enter_saving_ = 0; +size_t Routing_Slip::count_enter_saved_ = 0; +size_t Routing_Slip::count_enter_updating_ = 0; +size_t Routing_Slip::count_enter_changed_while_saving_ = 0; +size_t Routing_Slip::count_continue_changed_while_saving_ = 0; +size_t Routing_Slip::count_enter_changed_ = 0; +size_t Routing_Slip::count_continue_changed_ = 0; +size_t Routing_Slip::count_enter_complete_ = 0; +size_t Routing_Slip::count_enter_deleting_ = 0; +size_t Routing_Slip::count_enter_terminal_ = 0; + +Routing_Slip_Ptr +Routing_Slip::create (const TAO_Notify_Event::Ptr& event ACE_ENV_ARG_DECL) +{ + Routing_Slip * prs; + ACE_NEW_THROW_EX (prs, Routing_Slip (event), CORBA::NO_MEMORY ()); + ACE_CHECK_RETURN (Routing_Slip_Ptr()); + Routing_Slip_Ptr result(prs); + result->this_ptr_ = result; // let the pointers touch so they use the same ref count + + // note we don't care about ultra-precise stats, so no guard for these + if (DEBUG_LEVEL > 8 && ((result->sequence_ % 100) == 0)) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%P|%t) Routing_Slip_Statistics\n") + ACE_TEXT (" enter_transient \t%d\n") + ACE_TEXT (" continue_transient \t%d\n") + ACE_TEXT (" enter_reloaded \t%d\n") + ACE_TEXT (" enter_new \t%d\n") + ACE_TEXT (" continue_new \t%d\n") + ACE_TEXT (" enter_complete_while_new \t%d\n") + ACE_TEXT (" enter_saving \t%d\n") + ACE_TEXT (" enter_saved \t%d\n") + ACE_TEXT (" enter_updating \t%d\n") + ACE_TEXT (" enter_changed_while_saving \t%d\n") + ACE_TEXT (" continue_changed_while_saving\t%d\n") + ACE_TEXT (" enter_changed \t%d\n") + ACE_TEXT (" continue_changed \t%d\n") + ACE_TEXT (" enter_complete \t%d\n") + ACE_TEXT (" enter_deleting \t%d\n") + ACE_TEXT (" enter_terminal \t%d\n") + , static_cast<int> (count_enter_transient_) + , static_cast<int> (count_continue_transient_) + , static_cast<int> (count_enter_reloaded_) + , static_cast<int> (count_enter_new_) + , static_cast<int> (count_continue_new_) + , static_cast<int> (count_enter_complete_while_new_) + , static_cast<int> (count_enter_saving_) + , static_cast<int> (count_enter_saved_) + , static_cast<int> (count_enter_updating_) + , static_cast<int> (count_enter_changed_while_saving_) + , static_cast<int> (count_continue_changed_while_saving_) + , static_cast<int> (count_enter_changed_) + , static_cast<int> (count_continue_changed_) + , static_cast<int> (count_enter_complete_) + , static_cast<int> (count_enter_deleting_) + , static_cast<int> (count_enter_terminal_) + )); + } + return result; +} + +// static +Routing_Slip_Ptr +Routing_Slip::create ( + TAO_Notify_EventChannelFactory & ecf, + Routing_Slip_Persistence_Manager * rspm) +{ + Routing_Slip_Ptr result; + ACE_Message_Block * event_mb = 0; + ACE_Message_Block * rs_mb = 0; + ACE_DECLARE_NEW_ENV; + ACE_TRY + { + if (rspm->reload (event_mb, rs_mb)) + { + TAO_InputCDR cdr_event (event_mb); + TAO_Notify_Event::Ptr event (TAO_Notify_Event::unmarshal (cdr_event)); + if (event.isSet()) + { + result = create (event ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + TAO_InputCDR cdr_rs (rs_mb); + if ( result->unmarshal (ecf, cdr_rs)) + { + result->set_rspm (rspm); + } + else + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%P|%t) Routing_Slip::create: Unmarshalling failed for routing slip.\n") + )); + result.reset (); + } + } + else + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%P|%t) Routing_Slip::create: Unmarshalling failed for event.\n") + )); + } + } + } + ACE_CATCHANY + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%P|%t) Routing_Slip::create: Exception reloading event.\n") + )); + } + ACE_ENDTRY; + delete event_mb; + delete rs_mb; + + return result; +} + +void +Routing_Slip::set_rspm (Routing_Slip_Persistence_Manager * rspm) +{ + this->rspm_ = rspm; + if (rspm_ != 0) + { + rspm->set_callback (this); + } +} + +Routing_Slip::Routing_Slip( + const TAO_Notify_Event::Ptr& event) + : is_safe_ (false) + , until_safe_ (internals_) + , this_ptr_ (0) + , event_(event) + , state_ (rssCREATING) + , complete_requests_ (0) + , rspm_ (0) +{ + Routing_Slip_Guard guard (sequence_lock_); + this->sequence_ = ++routing_slip_sequence_; + if (DEBUG_LEVEL > 1) ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) Routing Slip #%d: constructor\n"), + this->sequence_ + )); +} + +Routing_Slip::~Routing_Slip () +{ + if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) Routing Slip #%d: destructor\n"), + this->sequence_ + )); +} + +bool +Routing_Slip::create_persistence_manager() +{ + if (this->rspm_ == 0) + { + Event_Persistence_Strategy * strategy = + ACE_Dynamic_Service <TAO_Notify::Event_Persistence_Strategy>::instance ("Event_Persistence"); + if (strategy != 0) + { + Event_Persistence_Factory * factory = strategy->get_factory (); + if (factory != 0) + { + set_rspm (factory->create_routing_slip_persistence_manager(this)); + } + } + } + return this->rspm_ != 0; +} + +const TAO_Notify_Event::Ptr & +Routing_Slip::event () const +{ + return this->event_; +} + +void +Routing_Slip::wait_persist () +{ + Routing_Slip_Guard guard (this->internals_); + while (!this->is_safe_) + { + this->until_safe_.wait (); + } +} + +void +Routing_Slip::route (TAO_Notify_ProxyConsumer* pc, bool reliable_channel ACE_ENV_ARG_DECL) +{ + ACE_ASSERT(pc != 0); + + TAO_Notify_ProxyConsumer::Ptr pcgrd(pc); + + Routing_Slip_Guard guard (this->internals_); + + size_t request_id = delivery_requests_.size (); + + if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) Routing Slip #%d: add Delivery_Request #%d: lookup, completed %d of %d\n"), + this->sequence_, + static_cast<int> (request_id), + static_cast<int> (this->complete_requests_), + static_cast<int> (this->delivery_requests_.size ()) + )); + + Delivery_Request_Ptr request (new Delivery_Request (this->this_ptr_, request_id)); + this->delivery_requests_.push_back (request); + TAO_Notify_Method_Request_Lookup_Queueable method (request, pc); + + if (this->state_ == rssCREATING) + { + if (! reliable_channel) + { + enter_state_transient (guard); + } + else if (ACE_Dynamic_Service <TAO_Notify::Event_Persistence_Strategy>::instance ("Event_Persistence") == 0) + { + enter_state_transient (guard); + } + else if (! this->event_->reliable().is_valid()) + { + enter_state_new (guard); + } + else if (this->event_->reliable().value() == CosNotification::Persistent) + { + enter_state_new (guard); + } + else + { + enter_state_transient (guard); + } + } + guard.release (); + pc->execute_task (method ACE_ENV_ARG_PARAMETER); +} +#if 0 // forward +void +Routing_Slip::forward (TAO_Notify_ProxySupplier* ps, bool filter) +{ + // must be the first action + ACE_ASSERT (this->state_ == rssCREATING); + + TAO_Notify_ProxySupplier::Ptr psgrd(ps); + Routing_Slip_Guard guard (this->internals_); + + enter_state_transient (guard); + size_t request_id = delivery_requests_.size (); + + if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) Routing Slip #%d: add Delivery_Request #%d: Forward %s; completed %d of %d\n"), + this->sequence_, + static_cast<int> (request_id), + filter ? ACE_TEXT ("Filter") : ACE_TEXT ("No Filter"), + static_cast<int> (this->complete_requests_), + static_cast<int> (this->delivery_requests_.size ()) + )); + + Delivery_Request_Ptr request (new Delivery_Request (this->this_ptr_, request_id)); + if (! ps->has_shutdown() ) + { + this->delivery_requests_.push_back (request); +// Delivery_Method_Dispatch method (request, ps, filter); + TAO_Notify_Method_Request_Dispatch_No_Copy method (request, ps, filter); + guard.release (); + if (DEBUG_LEVEL > 8) + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Routing Slip #%d: dispatching Delivery_Request %d to " + "proxy supplier %d\n", + this->sequence_, + static_cast<int> (request_id), + ps->id())); + ps->worker_task()->execute (method); + } + else + { + if (DEBUG_LEVEL > 5) + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Routing Slip #%d: not dispatching Delivery_Request %d to " + "proxy supplier %d; already shut down\n", + this->sequence_, + static_cast<int> (request_id), + ps->id())); + } +} +#endif // forward + +void +Routing_Slip::dispatch ( + TAO_Notify_ProxySupplier* ps, + bool filter + ACE_ENV_ARG_DECL) +{ + // cannot be the first action + ACE_ASSERT (this->state_ != rssCREATING); + + TAO_Notify_ProxySupplier::Ptr psgrd(ps); + Routing_Slip_Guard guard (this->internals_); + + size_t request_id = delivery_requests_.size (); + + if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) Routing Slip #%d: add Delivery_Request #%d: Dispatch %s; completed %d of %d\n"), + this->sequence_, + static_cast<int> (request_id), + filter ? ACE_TEXT ("Filter") : ACE_TEXT ("No Filter"), + static_cast<int> (this->complete_requests_), + static_cast<int> (this->delivery_requests_.size ()) + )); + + Delivery_Request_Ptr request (new Delivery_Request (this->this_ptr_, request_id)); + if (! ps->has_shutdown() ) + { + this->delivery_requests_.push_back (request); + TAO_Notify_Method_Request_Dispatch_No_Copy method (request, ps, filter); + guard.release (); + if (DEBUG_LEVEL > 8) + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Routing Slip #%d: dispatching Delivery_Request %d to " + "proxy supplier %d\n", + this->sequence_, + static_cast<int> (request_id), + ps->id())); + ps->execute_task (method ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + } + else + { + if (DEBUG_LEVEL > 5) + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Routing Slip #%d: not dispatching Delivery_Request %d to " + "proxy supplier %d; already shut down\n", + this->sequence_, + static_cast<int> (request_id), + ps->id())); + } +} + +////////// +// signals + +void +Routing_Slip::delivery_request_complete (size_t request_id) +{ + Routing_Slip_Guard guard (this->internals_); + ACE_ASSERT (request_id < this->delivery_requests_.size ()); + // reset the pointer to allow the delivery_request to be deleted. + this->delivery_requests_[request_id].reset (); + this->complete_requests_ += 1; + + if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) Routing Slip #%d: delivery_request_complete #%d: completed %d of %d\n"), + this->sequence_, + static_cast<int> (request_id), + static_cast<int> (this->complete_requests_), + static_cast<int> (this->delivery_requests_.size ()) + )); + State state = this->state_; + switch (state) + { + case rssTRANSIENT: + { + continue_state_transient (guard); + break; + } + case rssNEW: + { + continue_state_new (guard); + break; + } + case rssSAVING: + { + enter_state_changed_while_saving (guard); + break; + } + case rssUPDATING: + { + enter_state_changed_while_saving (guard); + break; + } + case rssSAVED: + { + enter_state_changed (guard); + break; + } + case rssCHANGED_WHILE_SAVING: + { + continue_state_changed_while_saving (guard); + break; + } + case rssCHANGED: + { + continue_state_changed (guard); + break; + } + default: + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%P|%t) Notification Service Routing Slip: Unexpected delivery_request_complete in state %d\n"), + static_cast<int> (this->state_) + )); + break; + } + } +} + +void +Routing_Slip::at_front_of_persist_queue () +{ + Routing_Slip_Guard guard (this->internals_); + State state = this->state_; + switch (state) + { + case rssNEW: + { + if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) Routing Slip #%d: NEW Reached front of queue\n"), + this->sequence_ + )); + enter_state_saving (guard); + break; + } + case rssCOMPLETE_WHILE_NEW: + { + if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) Routing Slip #%d: COMPLETE_WHILE_NEW Reached front of queue\n"), + this->sequence_ + )); + this->persistent_queue_.complete (); + enter_state_terminal (guard); + break; + } + case rssCHANGED: + { + if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) Routing Slip #%d: CHANGED Reached front of queue\n"), + this->sequence_ + )); + enter_state_updating (guard); + break; + } + case rssCOMPLETE: + { + if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) Routing Slip #%d: COMPLETE Reached front of queue\n"), + this->sequence_ + )); + enter_state_deleting (guard); + break; + } + default: + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%P|%t) Routing Slip %d: Unexpected at_front_of_persist_queue in state %d\n"), + this->sequence_, + static_cast<int> (this->state_) + )); + break; + } + } +} + +void +Routing_Slip::persist_complete () +{ + // keep this object around til this method returns. + Routing_Slip_Ptr me(this->this_ptr_); + Routing_Slip_Guard guard (this->internals_); + ACE_ASSERT (guard.locked ()); + + // allow the ConsumerProxy to return from the CORBA push call. + if (! is_safe_) + { + is_safe_ = true; + this->until_safe_.signal (); + } + + State state = this->state_; + switch (state) + { + case rssSAVING: + { + if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) Routing Slip #%d: SAVING persist complete\n"), + this->sequence_ + )); + enter_state_saved(guard); + break; + } + case rssCHANGED_WHILE_SAVING: + { + enter_state_changed (guard); + break; + } + case rssUPDATING: + { + if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) Routing Slip #%d: UPDATING persist complete\n"), + this->sequence_ + )); + enter_state_saved (guard); + break; + } + case rssDELETING: + { + if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) Routing Slip #%d: DELETING persist complete\n"), + this->sequence_ + )); + enter_state_terminal (guard); + break; + } + default: + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%P|%t) Notification Service Routing Slip: Unexpected transition in state %d\n"), + static_cast<int> (this->state_) + )); + break; + } + } + this->persistent_queue_.complete (); +} + +////////////////// +// support methods + +bool +Routing_Slip::all_deliveries_complete () const +{ + return this->complete_requests_ == this->delivery_requests_.size (); +} + +void +Routing_Slip::add_to_persist_queue(Routing_Slip_Guard & guard) +{ + guard.release (); + this->persistent_queue_.add (this->this_ptr_); + guard.acquire (); // necessary? +} + +//////////////////// +// State transitions + +void +Routing_Slip::enter_state_new (Routing_Slip_Guard & guard) +{ + ++count_enter_new_; + if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state NEW\n"), + this->sequence_ + )); + this->state_ = rssNEW; + add_to_persist_queue(guard); +} + +void +Routing_Slip::continue_state_new (Routing_Slip_Guard & guard) +{ + ++count_continue_new_; + if (all_deliveries_complete ()) + { + this->enter_state_complete_while_new (guard); + } +} +void +Routing_Slip::enter_state_complete_while_new (Routing_Slip_Guard & guard) +{ + ++count_enter_complete_while_new_; + ACE_UNUSED_ARG (guard); + if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state COMPLETE_WHILE_NEW\n"), + this->sequence_ + )); + // allow the ConsumerProxy to return from the CORBA push call. + if (! is_safe_) + { + is_safe_ = true; + this->until_safe_.signal (); + } + this->state_ = rssCOMPLETE_WHILE_NEW; +} + +void +Routing_Slip::enter_state_reloaded (Routing_Slip_Guard & guard) +{ + ++count_enter_reloaded_; + ACE_UNUSED_ARG (guard); + if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) Routing Slip #&d: enter state RELOADED\n"), + this->sequence_ + )); + this->state_ = rssRELOADED; +} + +void +Routing_Slip::enter_state_transient (Routing_Slip_Guard & guard) +{ + ++count_enter_transient_; + ACE_UNUSED_ARG (guard); + if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state TRANSIENT\n"), + this->sequence_ + )); + this->state_ = rssTRANSIENT; + if (! is_safe_) + { + is_safe_ = true; + this->until_safe_.signal (); + } + if (all_deliveries_complete ()) + { + enter_state_terminal (guard); + } +} + +void +Routing_Slip::continue_state_transient (Routing_Slip_Guard & guard) +{ + ++count_continue_transient_; + if (all_deliveries_complete ()) + { + enter_state_terminal (guard); + } +} +void +Routing_Slip::enter_state_saving (Routing_Slip_Guard & guard) +{ + ++count_enter_saving_; + if (!create_persistence_manager ()) + { + // Note This should actually be a throw (out of memory) + // but we cheat and make this a transient event. + this->persistent_queue_.complete (); + enter_state_transient (guard); + } + else + { + if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state SAVING\n"), + this->sequence_ + )); + this->state_ = rssSAVING; + + TAO_OutputCDR event_cdr; + this->event_->marshal (event_cdr); + + const ACE_Message_Block *event_mb = event_cdr.begin (); + TAO_OutputCDR rs_cdr; + marshal (rs_cdr); + const ACE_Message_Block *rs_mb = rs_cdr.begin (); + + guard.release (); + this->rspm_->store (*event_mb, *rs_mb); + + guard.acquire (); // necessary? + } +} + +void +Routing_Slip::enter_state_saved (Routing_Slip_Guard & guard) +{ + ++count_enter_saved_; + ACE_UNUSED_ARG (guard); + if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state SAVED\n"), + this->sequence_ + )); + this->state_ = rssSAVED; +} + +void +Routing_Slip::enter_state_updating (Routing_Slip_Guard & guard) +{ + ++count_enter_updating_; + if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state UPDATING\n"), + this->sequence_ + )); + this->state_ = rssUPDATING; + + TAO_OutputCDR rs_cdr; + marshal (rs_cdr); + const ACE_Message_Block *rs_mb = rs_cdr.begin (); + guard.release (); + + ACE_ASSERT (this->rspm_ != 0); + this->rspm_->update (*rs_mb); + guard.acquire (); // necessary? +} + + +void +Routing_Slip::enter_state_changed_while_saving (Routing_Slip_Guard & guard) +{ + ++count_enter_changed_while_saving_; + ACE_UNUSED_ARG (guard); + if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state CHANGED_WHILE_SAVING\n"), + this->sequence_ + )); + this->state_ = rssCHANGED_WHILE_SAVING; +} + +void +Routing_Slip::continue_state_changed_while_saving (Routing_Slip_Guard & guard) +{ + ACE_UNUSED_ARG (guard); + // no action necessary +} + +void +Routing_Slip::enter_state_changed (Routing_Slip_Guard & guard) +{ + ++count_enter_changed_; + if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state CHANGED\n"), + this->sequence_ + )); + // complete state change BEFORE initiating request to avoid + // race condition if request finishes before state is stable. + this->state_ = rssCHANGED; + if (all_deliveries_complete ()) + { + enter_state_complete (guard); + } + add_to_persist_queue (guard); +} + +void +Routing_Slip::continue_state_changed (Routing_Slip_Guard & guard) +{ + ++count_continue_changed_; + if (all_deliveries_complete ()) + { + enter_state_complete (guard); + } +} + +void +Routing_Slip::enter_state_complete (Routing_Slip_Guard & guard) +{ + ++count_enter_complete_; + ACE_UNUSED_ARG (guard); + if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state COMPLETE\n"), + this->sequence_ + )); + this->state_ = rssCOMPLETE; +} + +void +Routing_Slip::enter_state_deleting (Routing_Slip_Guard & guard) +{ + ++count_enter_deleting_; + if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state DELETING\n"), + this->sequence_ + )); + this->state_ = rssDELETING; + guard.release (); + this->rspm_->remove (); + guard.acquire (); // necessary? +} + +void +Routing_Slip::enter_state_terminal (Routing_Slip_Guard & guard) +{ + ++count_enter_terminal_; + ACE_UNUSED_ARG (guard); + ACE_ASSERT( this->is_safe_); + if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state TERMINAL\n"), + this->sequence_ + )); + this->state_ = rssTERMINAL; + this->this_ptr_.reset (); +} + +void +Routing_Slip::marshal (TAO_OutputCDR & cdr) +{ + size_t request_count = this->delivery_requests_.size(); + cdr.write_ulong (request_count - this->complete_requests_); + for (size_t nreq = 0; nreq < request_count; ++nreq) + { + Delivery_Request * request = this->delivery_requests_[nreq].get (); + if (request != 0) + { + request->marshal (cdr); + } + } +} + +bool +Routing_Slip::unmarshal (TAO_Notify_EventChannelFactory &ecf, TAO_InputCDR & cdr) +{ + CORBA::ULong count = 0; + cdr.read_ulong (count); + for (size_t nreq = 0; nreq < count; ++nreq) + { + ACE_CDR::Octet code = 0; + while (cdr.read_octet(code)) + { + ACE_DECLARE_NEW_ENV; + ACE_TRY + { + if (code == TAO_Notify_Method_Request_Dispatch::persistence_code) + { + Delivery_Request * prequest; + ACE_NEW_THROW_EX ( + prequest, + Delivery_Request(this_ptr_, this->delivery_requests_.size ()), + CORBA::NO_MEMORY ()); + ACE_TRY_CHECK; + Delivery_Request_Ptr request(prequest); + TAO_Notify_Method_Request_Dispatch_Queueable * method = + TAO_Notify_Method_Request_Dispatch::unmarshal ( + request, + ecf, + cdr + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + if (method != 0) + { + this->delivery_requests_.push_back (request); + this->delivery_methods_.push_back (method); + } + } + else if (code == TAO_Notify_Method_Request_Lookup::persistence_code) + { + Delivery_Request_Ptr request(new Delivery_Request(this_ptr_, this->delivery_requests_.size ())); + TAO_Notify_Method_Request_Lookup_Queueable * method = + TAO_Notify_Method_Request_Lookup::unmarshal ( + request, + ecf, + cdr + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK + if (method != 0) + { + this->delivery_requests_.push_back (request); + this->delivery_methods_.push_back (method); + } + } + } + ACE_CATCHANY; + { + // @@todo should we log this? + // just ignore failures + } + ACE_ENDTRY; + } + } + return this->delivery_requests_.size () > 0; +} + +void +Routing_Slip::reconnect (ACE_ENV_SINGLE_ARG_DECL) +{ + Routing_Slip_Guard guard (this->internals_); + enter_state_saved (guard); + guard.release (); + //@@todo is there a worker_task available to do this? + size_t count = this->delivery_methods_.size (); + for (size_t nmethod = 0; nmethod < count; ++nmethod) + { + this->delivery_methods_[nmethod]->execute (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + } + this->delivery_methods_.clear (); +} + +int +Routing_Slip::sequence() const +{ + return this->sequence_; +} + +bool +Routing_Slip::should_retry () const +{ + // simple minded test: if it's transient, don't retry it + // @@todo Eventually this should check timeout, discard policy, etc. + return this->state_ != rssTRANSIENT; +} + +} // namespace + +TAO_END_VERSIONED_NAMESPACE_DECL |