diff options
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.cpp')
-rw-r--r-- | TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.cpp | 139 |
1 files changed, 84 insertions, 55 deletions
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.cpp index c6e919c2dd3..9e8b2a1d0de 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.cpp @@ -27,6 +27,8 @@ TAO_ECG_UDP_Sender::get_local_addr (ACE_INET_Addr& addr) void TAO_ECG_UDP_Sender::init (RtecEventChannelAdmin::EventChannel_ptr lcl_ec, + RtecScheduler::Scheduler_ptr lcl_sched, + const char* lcl_name, RtecUDPAdmin::AddrServer_ptr addr_server, TAO_ECG_UDP_Out_Endpoint* endpoint, CORBA::Environment &TAO_IN_ENV) @@ -38,6 +40,29 @@ TAO_ECG_UDP_Sender::init (RtecEventChannelAdmin::EventChannel_ptr lcl_ec, RtecUDPAdmin::AddrServer::_duplicate (addr_server); this->endpoint_ = endpoint; + + this->lcl_info_ = lcl_sched->lookup (lcl_name, TAO_IN_ENV); + TAO_CHECK_ENV_RETURN_VOID (TAO_IN_ENV); + if (this->lcl_info_ == -1) + { + this->lcl_info_ = + lcl_sched->create (lcl_name, TAO_IN_ENV); + TAO_CHECK_ENV_RETURN_VOID (TAO_IN_ENV); + + ACE_Time_Value tv (0, 500); + TimeBase::TimeT time; + ORBSVCS_Time::Time_Value_to_TimeT (time, tv); + lcl_sched->set (this->lcl_info_, + RtecScheduler::VERY_HIGH_CRITICALITY, + time, time, time, + 25000 * 10, + RtecScheduler::VERY_LOW_IMPORTANCE, + time, + 0, + RtecScheduler::OPERATION, + TAO_IN_ENV); + TAO_CHECK_ENV_RETURN_VOID (TAO_IN_ENV); + } } int @@ -72,6 +97,10 @@ TAO_ECG_UDP_Sender::open (RtecEventChannelAdmin::ConsumerQOS& sub, if (sub.dependencies.length () == 0) return; + for (CORBA::ULong j = 0; j < sub.dependencies.length (); ++j) + { + sub.dependencies[j].rt_info = this->lcl_info_; + } //ACE_DEBUG ((LM_DEBUG, "ECG (%t) Gateway/Supplier ")); //ACE_SupplierQOS_Factory::debug (pub); @@ -98,26 +127,16 @@ TAO_ECG_UDP_Sender::open (RtecEventChannelAdmin::ConsumerQOS& sub, } void -TAO_ECG_UDP_Sender::close (CORBA::Environment &ACE_TRY_ENV) +TAO_ECG_UDP_Sender::close (CORBA::Environment &env) { // ACE_DEBUG ((LM_DEBUG, "ECG (%t) Closing gateway\n")); if (CORBA::is_nil (this->supplier_proxy_.in ())) return; - this->supplier_proxy_->disconnect_push_supplier (ACE_TRY_ENV); - ACE_CHECK; - + this->supplier_proxy_->disconnect_push_supplier (env); + if (env.exception () != 0) return; this->supplier_proxy_ = RtecEventChannelAdmin::ProxyPushSupplier::_nil (); - - PortableServer::POA_var poa = - this->_default_POA (ACE_TRY_ENV); - ACE_CHECK; - PortableServer::ObjectId_var id = - poa->servant_to_id (this, ACE_TRY_ENV); - ACE_CHECK; - poa->deactivate_object (id.in (), ACE_TRY_ENV); - ACE_CHECK; } void @@ -130,7 +149,7 @@ TAO_ECG_UDP_Sender::disconnect_push_consumer (CORBA::Environment &) void TAO_ECG_UDP_Sender::push (const RtecEventComm::EventSet &events, - CORBA::Environment & ACE_TRY_ENV) + CORBA::Environment & TAO_IN_ENV) { // ACE_DEBUG ((LM_DEBUG, "ECG_UDP_Sender::push - \n")); @@ -167,8 +186,8 @@ TAO_ECG_UDP_Sender::push (const RtecEventComm::EventSet &events, // Grab the right mcast group for this event... RtecUDPAdmin::UDP_Addr udp_addr; - this->addr_server_->get_addr (header, udp_addr, ACE_TRY_ENV); - ACE_CHECK; + this->addr_server_->get_addr (header, udp_addr, TAO_IN_ENV); + TAO_CHECK_ENV_RETURN_VOID(TAO_IN_ENV); // Start building the message TAO_OutputCDR cdr; @@ -177,11 +196,11 @@ TAO_ECG_UDP_Sender::push (const RtecEventComm::EventSet &events, // marshal a modified version of the header, but the payload is // marshal without any extra copies. cdr.write_ulong (1); - cdr.encode (RtecEventComm::_tc_EventHeader, &header, 0, ACE_TRY_ENV); - ACE_CHECK; + cdr.encode (RtecEventComm::_tc_EventHeader, &header, 0, TAO_IN_ENV); + TAO_CHECK_ENV_RETURN_VOID(TAO_IN_ENV); - cdr.encode (RtecEventComm::_tc_EventData, &e.data, 0, ACE_TRY_ENV); - ACE_CHECK; + cdr.encode (RtecEventComm::_tc_EventData, &e.data, 0, TAO_IN_ENV); + TAO_CHECK_ENV_RETURN_VOID(TAO_IN_ENV); const int TAO_WRITEV_MAX = IOV_MAX; iovec iov[TAO_WRITEV_MAX]; @@ -229,8 +248,8 @@ TAO_ECG_UDP_Sender::push (const RtecEventComm::EventSet &events, fragment_count, iov, iovcnt, - ACE_TRY_ENV); - ACE_CHECK; + TAO_IN_ENV); + TAO_CHECK_ENV_RETURN_VOID(TAO_IN_ENV); fragment_id++; fragment_offset += max_fragment_payload; @@ -256,8 +275,8 @@ TAO_ECG_UDP_Sender::push (const RtecEventComm::EventSet &events, fragment_count, iov, iovcnt, - ACE_TRY_ENV); - ACE_CHECK; + TAO_IN_ENV); + TAO_CHECK_ENV_RETURN_VOID(TAO_IN_ENV); fragment_id++; fragment_offset += max_fragment_payload; @@ -277,8 +296,8 @@ TAO_ECG_UDP_Sender::push (const RtecEventComm::EventSet &events, fragment_count, iov, iovcnt, - ACE_TRY_ENV); - ACE_CHECK; + TAO_IN_ENV); + TAO_CHECK_ENV_RETURN_VOID(TAO_IN_ENV); fragment_id++; fragment_offset += fragment_size; @@ -330,7 +349,7 @@ TAO_ECG_UDP_Sender::send_fragment (const RtecUDPAdmin::UDP_Addr& udp_addr, { CORBA::ULong header[TAO_ECG_UDP_Sender::ECG_HEADER_SIZE / sizeof(CORBA::ULong) - + ACE_CDR::MAX_ALIGNMENT]; + + CDR::MAX_ALIGNMENT]; char* buf = ACE_reinterpret_cast(char*,header); TAO_OutputCDR cdr (buf, sizeof(header)); cdr.write_boolean (TAO_ENCAP_BYTE_ORDER); @@ -364,14 +383,14 @@ TAO_ECG_UDP_Sender::send_fragment (const RtecUDPAdmin::UDP_Addr& udp_addr, // @@ TODO Use a Event Channel specific exception ACE_DEBUG ((LM_DEBUG, "ECG_UDP (%t) send failed %p\n", "")); - TAO_THROW(CORBA::COMM_FAILURE ()); + TAO_THROW(CORBA::COMM_FAILURE (CORBA::COMPLETED_NO)); } else if (n == 0) { // @@ TODO Use a Event Channel specific exception ACE_DEBUG ((LM_DEBUG, "ECG_UDP (%t) EOF on send \n")); - TAO_THROW(CORBA::COMM_FAILURE ()); + TAO_THROW(CORBA::COMM_FAILURE (CORBA::COMPLETED_NO)); } } @@ -518,7 +537,7 @@ TAO_ECG_UDP_Request_Entry (CORBA::Boolean byte_order, request_size_ (request_size), fragment_count_ (fragment_count) { - ACE_CDR::grow (&this->payload_, this->request_size_); + CDR::grow (&this->payload_, this->request_size_); this->payload_.wr_ptr (request_size_); this->received_fragments_ = this->default_received_fragments_; @@ -638,6 +657,8 @@ TAO_ECG_UDP_Receiver::TAO_ECG_UDP_Receiver (void) void TAO_ECG_UDP_Receiver::init (RtecEventChannelAdmin::EventChannel_ptr lcl_ec, + RtecScheduler::Scheduler_ptr lcl_sched, + const char* lcl_name, TAO_ECG_UDP_Out_Endpoint* ignore_from, RtecUDPAdmin::AddrServer_ptr addr_server, ACE_Reactor *reactor, @@ -653,6 +674,29 @@ TAO_ECG_UDP_Receiver::init (RtecEventChannelAdmin::EventChannel_ptr lcl_ec, this->addr_server_ = RtecUDPAdmin::AddrServer::_duplicate (addr_server); + this->lcl_info_ = lcl_sched->lookup (lcl_name, TAO_IN_ENV); + TAO_CHECK_ENV_RETURN_VOID (TAO_IN_ENV); + if (this->lcl_info_ == -1) + { + this->lcl_info_ = + lcl_sched->create (lcl_name, TAO_IN_ENV); + TAO_CHECK_ENV_RETURN_VOID (TAO_IN_ENV); + + ACE_Time_Value tv (0, 500); + TimeBase::TimeT time; + ORBSVCS_Time::Time_Value_to_TimeT (time, tv); + lcl_sched->set (this->lcl_info_, + RtecScheduler::VERY_HIGH_CRITICALITY, + time, time, time, + 25000 * 10, + RtecScheduler::VERY_LOW_IMPORTANCE, + time, + 1, + RtecScheduler::REMOTE_DEPENDANT, + TAO_IN_ENV); + TAO_CHECK_ENV_RETURN_VOID (TAO_IN_ENV); + } + this->reactor_ = reactor; this->max_timeout_ = max_timeout; // @@ TODO throw an exception.... @@ -679,6 +723,11 @@ TAO_ECG_UDP_Receiver::open (RtecEventChannelAdmin::SupplierQOS& pub, if (pub.publications.length () == 0) return; + for (CORBA::ULong i = 0; i < pub.publications.length (); ++i) + { + pub.publications[i].dependency_info.rt_info = this->lcl_info_; + } + // = Connect as a supplier to the local EC RtecEventChannelAdmin::SupplierAdmin_var supplier_admin = this->lcl_ec_->for_suppliers (TAO_IN_ENV); @@ -702,26 +751,17 @@ TAO_ECG_UDP_Receiver::open (RtecEventChannelAdmin::SupplierQOS& pub, } void -TAO_ECG_UDP_Receiver::close (CORBA::Environment &ACE_TRY_ENV) +TAO_ECG_UDP_Receiver::close (CORBA::Environment &env) { // ACE_DEBUG ((LM_DEBUG, "ECG (%t) Closing gateway\n")); if (CORBA::is_nil (this->consumer_proxy_.in ())) return; - this->consumer_proxy_->disconnect_push_consumer (ACE_TRY_ENV); - ACE_CHECK; - + this->consumer_proxy_->disconnect_push_consumer (env); + if (env.exception () != 0) return; this->consumer_proxy_ = RtecEventChannelAdmin::ProxyPushConsumer::_nil (); - PortableServer::POA_var poa = - this->_default_POA (ACE_TRY_ENV); - ACE_CHECK; - PortableServer::ObjectId_var id = - poa->servant_to_id (this, ACE_TRY_ENV); - ACE_CHECK; - poa->deactivate_object (id.in (), ACE_TRY_ENV); - ACE_CHECK; } void @@ -749,7 +789,7 @@ TAO_ECG_UDP_Receiver::handle_input (ACE_SOCK_Dgram& dgram) // Use ULong so the alignment is right. CORBA::ULong header[TAO_ECG_UDP_Sender::ECG_HEADER_SIZE / sizeof(CORBA::ULong) - + ACE_CDR::MAX_ALIGNMENT]; + + CDR::MAX_ALIGNMENT]; ACE_INET_Addr from; ssize_t n = dgram.recv (header, sizeof(header), from, MSG_PEEK); if (n == -1) @@ -1065,17 +1105,6 @@ TAO_ECG_Mcast_EH::close (CORBA::Environment& TAO_IN_ENV) this->handle_ = 0; TAO_CHECK_ENV_RETURN (TAO_IN_ENV, -1); - { - PortableServer::POA_var poa = - this->observer_._default_POA (ACE_TRY_ENV); - ACE_CHECK_RETURN (-1); - PortableServer::ObjectId_var id = - poa->servant_to_id (&this->observer_, ACE_TRY_ENV); - ACE_CHECK_RETURN (-1); - poa->deactivate_object (id.in (), ACE_TRY_ENV); - ACE_CHECK_RETURN (-1); - } - return 0; } @@ -1137,7 +1166,7 @@ TAO_ECG_Mcast_EH::update_consumer (const RtecEventChannelAdmin::ConsumerQOS& sub ACE_INET_Addr inet_addr (addr.port, addr.ipaddr); if (this->subscribe (inet_addr) == -1) - ACE_ERROR ((LM_ERROR, + ACE_ERROR ((LM_DEBUG, "cannot subscribe to %s:%d\n", inet_addr.get_host_addr (), inet_addr.get_port_number ())); |