diff options
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/Event/Event_Channel.cpp')
-rw-r--r-- | TAO/orbsvcs/orbsvcs/Event/Event_Channel.cpp | 245 |
1 files changed, 122 insertions, 123 deletions
diff --git a/TAO/orbsvcs/orbsvcs/Event/Event_Channel.cpp b/TAO/orbsvcs/orbsvcs/Event/Event_Channel.cpp index 2f8f9662fea..c32a9280a30 100644 --- a/TAO/orbsvcs/orbsvcs/Event/Event_Channel.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/Event_Channel.cpp @@ -12,6 +12,10 @@ #include "orbsvcs/Event/Event_Manip.h" #include "orbsvcs/Event/Event_Channel.h" +// These are to save space. +#define WRITE_GUARD ACE_ES_WRITE_GUARD +#define READ_GUARD ACE_ES_READ_GUARD + #if !defined (__ACE_INLINE__) #include "Event_Channel.i" #endif /* __ACE_INLINE__ */ @@ -294,10 +298,10 @@ ACE_Push_Supplier_Proxy::ACE_Push_Supplier_Proxy (ACE_ES_Supplier_Module *sm) void ACE_Push_Supplier_Proxy::connect_push_supplier (RtecEventComm::PushSupplier_ptr push_supplier, const RtecEventChannelAdmin::SupplierQOS &qos, - CORBA::Environment &ACE_TRY_ENV) + CORBA::Environment &TAO_IN_ENV) { if (this->connected ()) - ACE_THROW (RtecEventChannelAdmin::AlreadyConnected()); + TAO_THROW (RtecEventChannelAdmin::AlreadyConnected()); this->push_supplier_ = RtecEventComm::PushSupplier::_duplicate(push_supplier); @@ -405,6 +409,7 @@ ACE_Push_Consumer_Proxy::push (const RtecEventComm::EventSet &events, { ACE_DEBUG ((LM_DEBUG, "EC (%t) Push to disconnected consumer\n")); + // ACE_ES_DEBUG_ST (::dump_sequence (events)); return; } @@ -413,13 +418,12 @@ ACE_Push_Consumer_Proxy::push (const RtecEventComm::EventSet &events, } void -ACE_Push_Consumer_Proxy::connect_push_consumer ( - RtecEventComm::PushConsumer_ptr push_consumer, - const RtecEventChannelAdmin::ConsumerQOS &qos, - CORBA::Environment &ACE_TRY_ENV) +ACE_Push_Consumer_Proxy::connect_push_consumer (RtecEventComm::PushConsumer_ptr push_consumer, + const RtecEventChannelAdmin::ConsumerQOS &qos, + CORBA::Environment &TAO_IN_ENV) { if (this->connected ()) - ACE_THROW (RtecEventChannelAdmin::AlreadyConnected()); + TAO_THROW (RtecEventChannelAdmin::AlreadyConnected()); this->push_consumer_ = RtecEventComm::PushConsumer::_duplicate(push_consumer); @@ -434,16 +438,15 @@ ACE_Push_Consumer_Proxy::connect_push_consumer ( // ACE_ConsumerQOS_Factory::debug (qos_); - this->consumer_module_->connected (this, ACE_TRY_ENV); + this->consumer_module_->connected (this, TAO_IN_ENV); } void -ACE_Push_Consumer_Proxy::disconnect_push_supplier ( - CORBA::Environment &TAO_IN_ENV) +ACE_Push_Consumer_Proxy::disconnect_push_supplier (CORBA::Environment &TAO_IN_ENV) { ACE_TIMEPROBE_PRINT; this->push_consumer_ = RtecEventComm::PushConsumer::_nil (); - this->consumer_module_->disconnecting (this, ACE_TRY_ENV); + this->consumer_module_->disconnecting (this, TAO_IN_ENV); } void @@ -601,12 +604,12 @@ ACE_EventChannel::destroy (CORBA::Environment &) // Flush all messages in the channel. Shutdown_Channel *sc = new Shutdown_Channel (this); if (sc == 0) - ACE_THROW (CORBA::NO_MEMORY ()); + TAO_THROW (CORBA::NO_MEMORY ()); // Create a wrapper around the dispatch request. Flush_Queue_ACT *act = new Flush_Queue_ACT (sc, dispatching_module_); if (act == 0) - ACE_THROW (CORBA::NO_MEMORY ()); + TAO_THROW (CORBA::NO_MEMORY ()); // Set a 100ns timer. if (this->timer_module ()->schedule_timer (0, // no rt-info @@ -641,7 +644,9 @@ ACE_EventChannel::shutdown (void) void ACE_EventChannel::report_connect (u_long event) { - ACE_GUARD (ACE_ES_MUTEX, ace_mon, this->lock_); + ACE_ES_GUARD ace_mon (lock_); + if (ace_mon.locked () == 0) + ACE_ERROR ((LM_ERROR, "ACE_EventChannel::report_connect")); this->report_connect_i (event); } @@ -656,7 +661,9 @@ void ACE_EventChannel::report_disconnect (u_long event) { // No need to gtrab the lock is already take by our callers. - ACE_GUARD (ACE_ES_MUTEX, ace_mon, this->lock_); + ACE_ES_GUARD ace_mon (lock_); + if (ace_mon.locked () == 0) + ACE_ERROR ((LM_ERROR, "ACE_EventChannel::report_disconnect")); this->report_disconnect (event); } @@ -691,14 +698,12 @@ ACE_EventChannel::del_gateway (TAO_EC_Gateway* gw, } void -ACE_EventChannel::update_consumer_gwys (CORBA::Environment& ACE_TRY_ENV) +ACE_EventChannel::update_consumer_gwys (CORBA::Environment& TAO_IN_ENV) { Observer_Map observers; { - ACE_GUARD_THROW_EX ( - ACE_ES_MUTEX, ace_mon, this->lock_, - RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR()); - ACE_CHECK; + TAO_GUARD_THROW (ACE_ES_MUTEX, ace_mon, this->lock_, TAO_IN_ENV, + RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR()); if (this->observers_.current_size () == 0 || this->state_ == ACE_EventChannel::SHUTDOWN) @@ -1120,7 +1125,7 @@ ACE_ES_Consumer_Module::connected (ACE_Push_Consumer_Proxy *consumer, void ACE_ES_Consumer_Module::shutdown_request (ACE_ES_Dispatch_Request *request) { - ACE_TRY_NEW_ENV + TAO_TRY { Shutdown_Consumer *sc = (Shutdown_Consumer *) request; @@ -1137,26 +1142,25 @@ ACE_ES_Consumer_Module::shutdown_request (ACE_ES_Dispatch_Request *request) // Deactivate the consumer proxy PortableServer::POA_var poa = - sc->consumer ()->_default_POA (ACE_TRY_ENV); - ACE_TRY_CHECK; + sc->consumer ()->_default_POA (TAO_TRY_ENV); + TAO_CHECK_ENV; PortableServer::ObjectId_var id = - poa->servant_to_id (sc->consumer (), ACE_TRY_ENV); - ACE_TRY_CHECK; - poa->deactivate_object (id.in (), ACE_TRY_ENV); - ACE_TRY_CHECK; + poa->servant_to_id (sc->consumer (), TAO_TRY_ENV); + TAO_CHECK_ENV; + poa->deactivate_object (id.in (), TAO_TRY_ENV); + TAO_CHECK_ENV; // Delete the consumer proxy, no need to delete it, is is owned // by the POA // delete sc->consumer (); if (!dont_update) - this->channel_->update_consumer_gwys (ACE_TRY_ENV); - ACE_TRY_CHECK; + this->channel_->update_consumer_gwys (TAO_TRY_ENV); + TAO_CHECK_ENV; - ACE_GUARD_THROW_EX ( - ACE_ES_MUTEX, ace_mon, this->lock_, - RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR()); - ACE_TRY_CHECK; + ACE_ES_GUARD ace_mon (lock_); + if (ace_mon.locked () == 0) + return; // Tell the channel that we may need to shut down. if (all_consumers_.size () <= 0) @@ -1166,12 +1170,11 @@ ACE_ES_Consumer_Module::shutdown_request (ACE_ES_Dispatch_Request *request) channel_->report_disconnect_i (ACE_EventChannel::CONSUMER); } } - ACE_CATCHANY + TAO_CATCHANY { - ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, - "Consumer_Module::shutdown_request"); + TAO_TRY_ENV.print_exception ("Consumer_Module::shutdown_request"); } - ACE_ENDTRY; + TAO_ENDTRY; } void @@ -1180,7 +1183,7 @@ ACE_ES_Consumer_Module::shutdown (void) Consumers copy; { - ACE_Guard<ACE_ES_MUTEX> ace_mon (lock_); + ACE_ES_GUARD ace_mon (lock_); if (ace_mon.locked () == 0) goto DONE; @@ -1219,7 +1222,7 @@ ACE_ES_Consumer_Module::shutdown (void) // Remove the consumer from our list. { - ACE_Guard<ACE_ES_MUTEX> ace_mon (lock_); + ACE_ES_GUARD ace_mon (lock_); if (ace_mon.locked () == 0) ACE_ERROR ((LM_ERROR, "%p Failed to acquire lock.\n", "ACE_ES_Consumer_Module::shutdown")); @@ -1238,16 +1241,15 @@ DONE: void ACE_ES_Consumer_Module::disconnecting (ACE_Push_Consumer_Proxy *consumer, - CORBA::Environment &ACE_TRY_ENV) + CORBA::Environment &TAO_IN_ENV) { { - ACE_GUARD_THROW_EX ( - ACE_ES_MUTEX, ace_mon, this->lock_, - RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR()); - ACE_CHECK; + ACE_ES_GUARD ace_mon (lock_); + if (ace_mon.locked () == 0) + TAO_THROW (RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR()); if (all_consumers_.remove (consumer) == -1) - ACE_THROW (RtecEventChannelAdmin::EventChannel::SUBSCRIPTION_ERROR()); + TAO_THROW (RtecEventChannelAdmin::EventChannel::SUBSCRIPTION_ERROR()); } // Tell everyone else that the consumer is disconnecting. This @@ -1255,7 +1257,7 @@ ACE_ES_Consumer_Module::disconnecting (ACE_Push_Consumer_Proxy *consumer, // etc. However, messages may still be queued in the ReactorEx or // in the Dispatching Module for this consumer, so no queues or // proxies can be deleted just yet. - down_->disconnecting (consumer, ACE_TRY_ENV); + down_->disconnecting (consumer, TAO_IN_ENV); // Send a shutdown message through the system. When this is // dispatched, the consumer proxy will be deleted. <request> is @@ -1270,13 +1272,13 @@ ACE_ES_Consumer_Module::disconnecting (ACE_Push_Consumer_Proxy *consumer, Shutdown_Consumer *sc = new Shutdown_Consumer (this, consumer, scheduler.in ()); if (sc == 0) - ACE_THROW (CORBA::NO_MEMORY ()); + TAO_THROW (CORBA::NO_MEMORY ()); // Create a wrapper around the dispatch request. Flush_Queue_ACT *act = new Flush_Queue_ACT (sc, channel_->dispatching_module_); if (act == 0) - ACE_THROW (CORBA::NO_MEMORY ()); + TAO_THROW (CORBA::NO_MEMORY ()); // ACE_DEBUG ((LM_DEBUG, "EC (%t) initiating consumer disconnect.\n")); @@ -1325,7 +1327,7 @@ ACE_ES_Consumer_Module::push (const ACE_ES_Dispatch_Request *request, RtecEventChannelAdmin::ProxyPushSupplier_ptr ACE_ES_Consumer_Module::obtain_push_supplier (CORBA::Environment &ACE_TRY_ENV) { - RtecEventChannelAdmin::ProxyPushSupplier_ptr proxy = + RtecEventChannelAdmin::ProxyPushSupplier_ptr proxy = RtecEventChannelAdmin::ProxyPushSupplier::_nil (); auto_ptr<ACE_Push_Consumer_Proxy> new_consumer (new ACE_Push_Consumer_Proxy (this)); @@ -1335,13 +1337,12 @@ ACE_ES_Consumer_Module::obtain_push_supplier (CORBA::Environment &ACE_TRY_ENV) { ACE_ERROR ((LM_ERROR, "ACE_EventChannel" "::obtain_push_supplier failed.\n")); - ACE_THROW_RETURN (CORBA::NO_MEMORY (), proxy); + TAO_THROW_RETURN (CORBA::NO_MEMORY (), proxy); } { - ACE_GUARD_THROW_EX ( - ACE_ES_MUTEX, ace_mon, this->lock_, - RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR()); + ACE_GUARD_THROW_EX (ACE_ES_MUTEX, ace_mon, this->lock_, + RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR()); ACE_CHECK_RETURN (proxy); if (all_consumers_.insert (new_consumer.get ()) == -1) @@ -2093,57 +2094,57 @@ ACE_ES_Dispatch_Request * ACE_ES_Consumer_Correlation::correlate (ACE_ES_Consumer_Rep *cr, const TAO_EC_Event &event) { - // If the consumer has specified correlation criteria, then we must - // first acquire the mutex. - ACE_Guard<ACE_ES_MUTEX> ace_mon (lock_); - if (ace_mon.locked () == 0) - ACE_ERROR_RETURN ((LM_ERROR, "%p.\n", - "ACE_ES_Consumer_Correlation::push"), 0); - - int bit = ACE_INT2BIT[cr->type_id ()]; - if (ACE_BIT_DISABLED (this->pending_flags_, bit)) - { - // Add the new event to the pending events. - pending_events_[cr->type_id ()] += event; - // Set the bit corresponding to the arrived event. - // This should be pending_flags_->event_arrived (index); - ACE_SET_BITS (pending_flags_, bit); - } - - ACE_ES_Dispatch_Request *request = 0; - TAO_EC_Event_Array *outbox = 0; - // Since add_events changes pending_flags_, we need to keep this - // for all iterations through the conjunction groups. - u_long freeze_pending_flags = pending_flags_; - - for (int x=0; x < n_conjunction_groups_; x++) - { - if (conjunction_groups_[x].should_forward (freeze_pending_flags)) - { - // If there is a deadline timer for this conjunction group, - // this will reschedule them. - conjunction_groups_[x].reschedule_deadline (); - - // First time in, allocate the new dispatch request. - if (request == 0) - { - request = - new ACE_ES_Dispatch_Request (consumer_, - cr->dependency ()->rt_info); - if (request == 0) - ACE_ERROR_RETURN ((LM_ERROR, "%p.\n", - "ACE_ES_Consumer_Correlation::correlate"), 0); - outbox = &(request->event_set ()); - } - - // Add each of the pending events for this correlation to - // the outgoing dispatch request. If outbox == 0, then - // this will just clear any pending events. - conjunction_groups_[x].add_events (outbox, - pending_events_, - pending_flags_); - } - } + // If the consumer has specified correlation criteria, then we must + // first acquire the mutex. + ACE_ES_GUARD ace_mon (lock_); + if (ace_mon.locked () == 0) + ACE_ERROR_RETURN ((LM_ERROR, "%p.\n", + "ACE_ES_Consumer_Correlation::push"), 0); + + int bit = ACE_INT2BIT[cr->type_id ()]; + if (ACE_BIT_DISABLED (this->pending_flags_, bit)) + { + // Add the new event to the pending events. + pending_events_[cr->type_id ()] += event; + // Set the bit corresponding to the arrived event. + // This should be pending_flags_->event_arrived (index); + ACE_SET_BITS (pending_flags_, bit); + } + + ACE_ES_Dispatch_Request *request = 0; + TAO_EC_Event_Array *outbox = 0; + // Since add_events changes pending_flags_, we need to keep this + // for all iterations through the conjunction groups. + u_long freeze_pending_flags = pending_flags_; + + for (int x=0; x < n_conjunction_groups_; x++) + { + if (conjunction_groups_[x].should_forward (freeze_pending_flags)) + { + // If there is a deadline timer for this conjunction group, + // this will reschedule them. + conjunction_groups_[x].reschedule_deadline (); + + // First time in, allocate the new dispatch request. + if (request == 0) + { + request = + new ACE_ES_Dispatch_Request (consumer_, + cr->dependency ()->rt_info); + if (request == 0) + ACE_ERROR_RETURN ((LM_ERROR, "%p.\n", + "ACE_ES_Consumer_Correlation::correlate"), 0); + outbox = &(request->event_set ()); + } + + // Add each of the pending events for this correlation to + // the outgoing dispatch request. If outbox == 0, then + // this will just clear any pending events. + conjunction_groups_[x].add_events (outbox, + pending_events_, + pending_flags_); + } + } return request; } @@ -2373,15 +2374,14 @@ ACE_ES_Subscription_Module::reregister_consumers (RtecEventComm::EventSourceID s void ACE_ES_Subscription_Module::disconnecting (ACE_Push_Supplier_Proxy *supplier, - CORBA::Environment &ACE_TRY_ENV) + CORBA::Environment &TAO_IN_ENV) { - ACE_WRITE_GUARD_THROW_EX ( - ACE_ES_RW_LOCK, ace_mon, this->lock_, - RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR()); - ACE_CHECK; + ACE_ES_WGUARD ace_mon (lock_); + if (ace_mon.locked () == 0) + TAO_THROW (RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR()); if (all_suppliers_.remove (supplier) == -1) - ACE_THROW (RtecEventChannelAdmin::EventChannel::SUBSCRIPTION_ERROR()); + TAO_THROW (RtecEventChannelAdmin::EventChannel::SUBSCRIPTION_ERROR()); // Remove all consumers from the supplier's source-based subscription lists. ACE_ES_Subscription_Info::Subscriber_Set_Iterator source_iterator @@ -3142,19 +3142,17 @@ ACE_ES_Supplier_Module::connected (ACE_Push_Supplier_Proxy *supplier, void ACE_ES_Supplier_Module::disconnecting (ACE_Push_Supplier_Proxy *supplier, - CORBA::Environment &ACE_TRY_ENV) + CORBA::Environment &TAO_IN_ENV) { CORBA::Boolean need_update = 0; { - ACE_GUARD_THROW_EX ( - ACE_ES_MUTEX, ace_mon, this->lock_, - RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR()); - ACE_CHECK; + TAO_GUARD_THROW (ACE_SYNCH_MUTEX, ace_mon, this->lock_, TAO_IN_ENV, + RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR()); if (all_suppliers_.remove (supplier) == -1) - ACE_THROW (RtecEventChannelAdmin::EventChannel::SUBSCRIPTION_ERROR()); + TAO_THROW (RtecEventChannelAdmin::EventChannel::SUBSCRIPTION_ERROR()); - up_->disconnecting (supplier, ACE_TRY_ENV); + up_->disconnecting (supplier, TAO_IN_ENV); if (this->all_suppliers_.size () <= 0) { @@ -3171,7 +3169,7 @@ ACE_ES_Supplier_Module::disconnecting (ACE_Push_Supplier_Proxy *supplier, // CORBA::release (supplier); } if (need_update) - this->channel_->update_supplier_gwys (ACE_TRY_ENV); + this->channel_->update_supplier_gwys (TAO_IN_ENV); } void @@ -3180,7 +3178,9 @@ ACE_ES_Supplier_Module::shutdown (void) Suppliers copy; { - ACE_GUARD (ACE_ES_MUTEX, ace_mon, this->lock_); + ACE_ES_GUARD ace_mon (lock_); + if (ace_mon.locked () == 0) + return; copy = all_suppliers_; } @@ -3206,7 +3206,7 @@ ACE_ES_Supplier_Module::shutdown (void) RtecEventChannelAdmin::ProxyPushConsumer_ptr ACE_ES_Supplier_Module::obtain_push_consumer (CORBA::Environment &ACE_TRY_ENV) { - RtecEventChannelAdmin::ProxyPushConsumer_ptr proxy = + RtecEventChannelAdmin::ProxyPushConsumer_ptr proxy = RtecEventChannelAdmin::ProxyPushConsumer::_nil (); auto_ptr<ACE_Push_Supplier_Proxy> new_supplier (new ACE_Push_Supplier_Proxy (this)); @@ -3215,9 +3215,8 @@ ACE_ES_Supplier_Module::obtain_push_consumer (CORBA::Environment &ACE_TRY_ENV) ACE_THROW_RETURN (CORBA::NO_MEMORY (), proxy); { - ACE_GUARD_THROW_EX ( - ACE_ES_MUTEX, ace_mon, this->lock_, - RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR()); + ACE_GUARD_THROW_EX (ACE_ES_MUTEX, ace_mon, this->lock_, + RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR()); ACE_CHECK_RETURN (proxy); if (all_suppliers_.insert (new_supplier.get ()) == -1) |