diff options
Diffstat (limited to 'TAO/local/bin/Event_Service/Event_Channel.cpp')
-rw-r--r-- | TAO/local/bin/Event_Service/Event_Channel.cpp | 201 |
1 files changed, 34 insertions, 167 deletions
diff --git a/TAO/local/bin/Event_Service/Event_Channel.cpp b/TAO/local/bin/Event_Service/Event_Channel.cpp index dcb2c883bd7..47919109f66 100644 --- a/TAO/local/bin/Event_Service/Event_Channel.cpp +++ b/TAO/local/bin/Event_Service/Event_Channel.cpp @@ -1349,21 +1349,6 @@ ACE_ES_Consumer_Correlation::connected (ACE_Push_Consumer_Proxy *consumer, return -1; - // Check if client has specified event fowarding. - if (consumer->qos ().forward_event.type_ != ACE_ES_EVENT_ANY) - { - forwarding_rt_info_ = iter.first_rt_info (); - - if (forwarding_rt_info_ == 0) - ACE_ERROR_RETURN ((LM_ERROR, - "ACE_ES_Consumer_Correlation::connected: " - "could not get rt_info_ from event " - "forwarding consumer.\n"), -1); - - if (this->initialize_event_forwarding () == -1) - return -1; - } - int cgroup_index = -1; int dgroup_index = -1; int crep_index = 0; @@ -1634,53 +1619,6 @@ ACE_ES_Consumer_Correlation::register_event (RtecEventChannelAdmin::Dependency & } int -ACE_ES_Consumer_Correlation::initialize_event_forwarding (void) -{ - ACE_TRY - { - // Get supplier admin object. - RtecEventChannelAdmin::SupplierAdmin_ptr sa = - correlation_module_->channel_->for_suppliers (ACE_TRY_ENV); - ACE_CHECK_ENV; - // if (env.exception () != 0) - // ACE_ERROR_RETURN ((LM_ERROR, "ACE_ES_Consumer_Correlation::initialize_event_forwarding" - // "for_suppliers failed.\n"), -1); - - // Get push consumer proxy. - channel_ = sa->obtain_push_consumer (ACE_TRY_ENV); - ACE_CHECK_ENV; - - // if (env.exception () != 0) - // ACE_ERROR_RETURN ((LM_ERROR, "ACE_ES_Consumer_Correlation::initialize_event_forwarding" - // "obtain_push_consumer failed.\n"), -1); - - // Create the supplierQOS. - qos_.publications_.length (1); - qos_.publications_[0].event_.type_ = consumer_->qos ().forward_event.type_; - qos_.publications_[0].dependency_info_.rt_info = forwarding_rt_info_; - qos_.publications_[0].dependency_info_.number_of_calls = 1; - - // Connect to the channel. - channel_->connect_push_supplier (this, qos_, ACE_TRY_ENV); - ACE_CHECK_ENV; - // if (env.exception () != 0) - // ACE_ERROR_RETURN ((LM_ERROR, "ACE_ES_Consumer_Correlation::initialize_event_forwarding" - // "connect_push_supplier failed.\n"), -1); - - // We're connected to the channel. - connected_ = 1; - } - ACE_CATCHANY - { - ACE_ERROR_RETURN ((LM_ERROR, "ACE_ES_Consumer_Correlation::initialize_event_forwarding" - " failed.\n"), -1); - } - ACE_ENDTRY; - - return 0; -} - -int ACE_ES_Consumer_Correlation::disconnecting (void) { // If we were forwarding events, disconnect as a supplier. @@ -1713,60 +1651,42 @@ ACE_ES_Consumer_Correlation::push (ACE_ES_Consumer_Rep *cr, switch (cr->correlation_type ()) { case ACE_ES_Consumer_Rep::NO_CORRELATION: - // Calls reschedule on all disjunction groups it belongs to. - cr->reschedule_deadlines (); - - if (forwarding_rt_info_ != 0) - { - this->push_forward_event (); - return 0; - } - else - { - ACE_TIMEPROBE (" ACE_ES_Consumer_Correlation::push, determine NO CORRELATION"); - ACE_ES_Dispatch_Request *request = - new ACE_ES_Dispatch_Request (consumer_, event, cr->dependency ()->rt_info); - ACE_TIMEPROBE (" ACE_ES_Consumer_Correlation::push, NO_CORR: alloc"); + { + // Calls reschedule on all disjunction groups it belongs to. + cr->reschedule_deadlines (); - if (request == 0) - ACE_ERROR_RETURN ((LM_ERROR, "%p.\n", - "ACE_ES_Consumer_Correlation::push"), 0); + ACE_TIMEPROBE (" ACE_ES_Consumer_Correlation::push, determine NO CORRELATION"); + ACE_ES_Dispatch_Request *request = + new ACE_ES_Dispatch_Request (consumer_, event, cr->dependency ()->rt_info); + ACE_TIMEPROBE (" ACE_ES_Consumer_Correlation::push, NO_CORR: alloc"); + + if (request == 0) + ACE_ERROR_RETURN ((LM_ERROR, "%p.\n", + "ACE_ES_Consumer_Correlation::push"), 0); - return request; - } + return request; + } case ACE_ES_Consumer_Rep::CORRELATE: return this->correlate (cr, event); case ACE_ES_Consumer_Rep::DEADLINE_TIMEOUT: { - if (forwarding_rt_info_ != 0) - { - // This will clear any pending events for this group. - cr->top_group ()->add_events (0, pending_events_, pending_flags_); - - // Send the forward event. - this->push_forward_event (); - return 0; - } - else - { - ACE_ES_Dispatch_Request *request = - new ACE_ES_Dispatch_Request (consumer_, cr->dependency ()->rt_info); + ACE_ES_Dispatch_Request *request = + new ACE_ES_Dispatch_Request (consumer_, cr->dependency ()->rt_info); - if (request == 0) - ACE_ERROR_RETURN ((LM_ERROR, "%p.\n", - "ACE_ES_Consumer_Correlation::push"), 0); + if (request == 0) + ACE_ERROR_RETURN ((LM_ERROR, "%p.\n", + "ACE_ES_Consumer_Correlation::push"), 0); - // Add the deadline timeout to the outbox. - request->event_set () += event; + // Add the deadline timeout to the outbox. + request->event_set () += event; - // Add any pending events to the outbox. - cr->top_group ()->add_events (&(request->event_set ()), - pending_events_, pending_flags_); - - return request; - } + // Add any pending events to the outbox. + cr->top_group ()->add_events (&(request->event_set ()), + pending_events_, pending_flags_); + + return request; } default: @@ -1801,9 +1721,6 @@ ACE_ES_Consumer_Correlation::correlate (ACE_ES_Consumer_Rep *cr, // for all iterations through the conjunction groups. u_long freeze_pending_flags = pending_flags_; - // This is to flag forwarding. - int forward_event = 0; - for (int x=0; x < n_conjunction_groups_; x++) { if (conjunction_groups_[x].should_forward (freeze_pending_flags)) @@ -1812,22 +1729,17 @@ ACE_ES_Consumer_Correlation::correlate (ACE_ES_Consumer_Rep *cr, // this will reschedule them. conjunction_groups_[x].reschedule_deadline (); - if (forwarding_rt_info_ == 0) + // First time in, allocate the new dispatch request. + if (request == 0) { - // First time in, allocate the new dispatch request. + request = + new ACE_ES_Dispatch_Request (consumer_, + cr->dependency ()->rt_info); if (request == 0) - { - request = - new ACE_ES_Dispatch_Request (consumer_, - cr->dependency ()->rt_info); - if (request == 0) - ACE_ERROR_RETURN ((LM_ERROR, "%p.\n", - "ACE_ES_Consumer_Correlation::correlate"), 0); - outbox = &(request->event_set ()); - } + ACE_ERROR_RETURN ((LM_ERROR, "%p.\n", + "ACE_ES_Consumer_Correlation::correlate"), 0); + outbox = &(request->event_set ()); } - else - forward_event = 1; // Add each of the pending events for this correlation to // the outgoing dispatch request. If outbox == 0, then @@ -1838,55 +1750,10 @@ ACE_ES_Consumer_Correlation::correlate (ACE_ES_Consumer_Rep *cr, } } - if (forward_event) - { - this->push_forward_event (); - return 0; - } - else - return request; + return request; } -void -ACE_ES_Consumer_Correlation::push_forward_event (void) -{ - ACE_TRY - { - RtecEventComm::EventSet set; - set.length(1); - set[0] = consumer_->qos ().forward_event; - channel_->push (set, ACE_TRY_ENV); - } - ACE_CATCH (CORBA::BAD_OPERATION, bad_op) - { - ACE_ERROR ((LM_ERROR, - "ACE_ES_Consumer_Correlation::push_forward_event - ")); - // @@ TODO handle exceptions. - // "BAD_OP %s: %d, %s\n", - // bad_op.id(), - // bad_op.minor(), - // bad_op.param())); - } - ACE_CATCH (CORBA::SystemException, sys_ex) - { - ACE_ERROR ((LM_ERROR, - "ACE_ES_Consumer_Correlation::push_forward_event - ")); - // @@ TODO: How to check the exceptions? - // "SYS_EX %s: %d, %s\n", - // sys_ex.id(), - // sys_ex.minor(), - // sys_ex.param())); - } - ACE_CATCHANY - { - ACE_ERROR ((LM_ERROR, - "ACE_ES_Consumer_Correlation::push_forward_event - " - "unexpected exception\n")); - } - ACE_ENDTRY; -} - // ************************************************************ ACE_ES_Consumer_Rep::~ACE_ES_Consumer_Rep (void) |