summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.cpp')
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.cpp139
1 files changed, 84 insertions, 55 deletions
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.cpp
index c6e919c2dd3..9e8b2a1d0de 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.cpp
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.cpp
@@ -27,6 +27,8 @@ TAO_ECG_UDP_Sender::get_local_addr (ACE_INET_Addr& addr)
void
TAO_ECG_UDP_Sender::init (RtecEventChannelAdmin::EventChannel_ptr lcl_ec,
+ RtecScheduler::Scheduler_ptr lcl_sched,
+ const char* lcl_name,
RtecUDPAdmin::AddrServer_ptr addr_server,
TAO_ECG_UDP_Out_Endpoint* endpoint,
CORBA::Environment &TAO_IN_ENV)
@@ -38,6 +40,29 @@ TAO_ECG_UDP_Sender::init (RtecEventChannelAdmin::EventChannel_ptr lcl_ec,
RtecUDPAdmin::AddrServer::_duplicate (addr_server);
this->endpoint_ = endpoint;
+
+ this->lcl_info_ = lcl_sched->lookup (lcl_name, TAO_IN_ENV);
+ TAO_CHECK_ENV_RETURN_VOID (TAO_IN_ENV);
+ if (this->lcl_info_ == -1)
+ {
+ this->lcl_info_ =
+ lcl_sched->create (lcl_name, TAO_IN_ENV);
+ TAO_CHECK_ENV_RETURN_VOID (TAO_IN_ENV);
+
+ ACE_Time_Value tv (0, 500);
+ TimeBase::TimeT time;
+ ORBSVCS_Time::Time_Value_to_TimeT (time, tv);
+ lcl_sched->set (this->lcl_info_,
+ RtecScheduler::VERY_HIGH_CRITICALITY,
+ time, time, time,
+ 25000 * 10,
+ RtecScheduler::VERY_LOW_IMPORTANCE,
+ time,
+ 0,
+ RtecScheduler::OPERATION,
+ TAO_IN_ENV);
+ TAO_CHECK_ENV_RETURN_VOID (TAO_IN_ENV);
+ }
}
int
@@ -72,6 +97,10 @@ TAO_ECG_UDP_Sender::open (RtecEventChannelAdmin::ConsumerQOS& sub,
if (sub.dependencies.length () == 0)
return;
+ for (CORBA::ULong j = 0; j < sub.dependencies.length (); ++j)
+ {
+ sub.dependencies[j].rt_info = this->lcl_info_;
+ }
//ACE_DEBUG ((LM_DEBUG, "ECG (%t) Gateway/Supplier "));
//ACE_SupplierQOS_Factory::debug (pub);
@@ -98,26 +127,16 @@ TAO_ECG_UDP_Sender::open (RtecEventChannelAdmin::ConsumerQOS& sub,
}
void
-TAO_ECG_UDP_Sender::close (CORBA::Environment &ACE_TRY_ENV)
+TAO_ECG_UDP_Sender::close (CORBA::Environment &env)
{
// ACE_DEBUG ((LM_DEBUG, "ECG (%t) Closing gateway\n"));
if (CORBA::is_nil (this->supplier_proxy_.in ()))
return;
- this->supplier_proxy_->disconnect_push_supplier (ACE_TRY_ENV);
- ACE_CHECK;
-
+ this->supplier_proxy_->disconnect_push_supplier (env);
+ if (env.exception () != 0) return;
this->supplier_proxy_ =
RtecEventChannelAdmin::ProxyPushSupplier::_nil ();
-
- PortableServer::POA_var poa =
- this->_default_POA (ACE_TRY_ENV);
- ACE_CHECK;
- PortableServer::ObjectId_var id =
- poa->servant_to_id (this, ACE_TRY_ENV);
- ACE_CHECK;
- poa->deactivate_object (id.in (), ACE_TRY_ENV);
- ACE_CHECK;
}
void
@@ -130,7 +149,7 @@ TAO_ECG_UDP_Sender::disconnect_push_consumer (CORBA::Environment &)
void
TAO_ECG_UDP_Sender::push (const RtecEventComm::EventSet &events,
- CORBA::Environment & ACE_TRY_ENV)
+ CORBA::Environment & TAO_IN_ENV)
{
// ACE_DEBUG ((LM_DEBUG, "ECG_UDP_Sender::push - \n"));
@@ -167,8 +186,8 @@ TAO_ECG_UDP_Sender::push (const RtecEventComm::EventSet &events,
// Grab the right mcast group for this event...
RtecUDPAdmin::UDP_Addr udp_addr;
- this->addr_server_->get_addr (header, udp_addr, ACE_TRY_ENV);
- ACE_CHECK;
+ this->addr_server_->get_addr (header, udp_addr, TAO_IN_ENV);
+ TAO_CHECK_ENV_RETURN_VOID(TAO_IN_ENV);
// Start building the message
TAO_OutputCDR cdr;
@@ -177,11 +196,11 @@ TAO_ECG_UDP_Sender::push (const RtecEventComm::EventSet &events,
// marshal a modified version of the header, but the payload is
// marshal without any extra copies.
cdr.write_ulong (1);
- cdr.encode (RtecEventComm::_tc_EventHeader, &header, 0, ACE_TRY_ENV);
- ACE_CHECK;
+ cdr.encode (RtecEventComm::_tc_EventHeader, &header, 0, TAO_IN_ENV);
+ TAO_CHECK_ENV_RETURN_VOID(TAO_IN_ENV);
- cdr.encode (RtecEventComm::_tc_EventData, &e.data, 0, ACE_TRY_ENV);
- ACE_CHECK;
+ cdr.encode (RtecEventComm::_tc_EventData, &e.data, 0, TAO_IN_ENV);
+ TAO_CHECK_ENV_RETURN_VOID(TAO_IN_ENV);
const int TAO_WRITEV_MAX = IOV_MAX;
iovec iov[TAO_WRITEV_MAX];
@@ -229,8 +248,8 @@ TAO_ECG_UDP_Sender::push (const RtecEventComm::EventSet &events,
fragment_count,
iov,
iovcnt,
- ACE_TRY_ENV);
- ACE_CHECK;
+ TAO_IN_ENV);
+ TAO_CHECK_ENV_RETURN_VOID(TAO_IN_ENV);
fragment_id++;
fragment_offset += max_fragment_payload;
@@ -256,8 +275,8 @@ TAO_ECG_UDP_Sender::push (const RtecEventComm::EventSet &events,
fragment_count,
iov,
iovcnt,
- ACE_TRY_ENV);
- ACE_CHECK;
+ TAO_IN_ENV);
+ TAO_CHECK_ENV_RETURN_VOID(TAO_IN_ENV);
fragment_id++;
fragment_offset += max_fragment_payload;
@@ -277,8 +296,8 @@ TAO_ECG_UDP_Sender::push (const RtecEventComm::EventSet &events,
fragment_count,
iov,
iovcnt,
- ACE_TRY_ENV);
- ACE_CHECK;
+ TAO_IN_ENV);
+ TAO_CHECK_ENV_RETURN_VOID(TAO_IN_ENV);
fragment_id++;
fragment_offset += fragment_size;
@@ -330,7 +349,7 @@ TAO_ECG_UDP_Sender::send_fragment (const RtecUDPAdmin::UDP_Addr& udp_addr,
{
CORBA::ULong header[TAO_ECG_UDP_Sender::ECG_HEADER_SIZE
/ sizeof(CORBA::ULong)
- + ACE_CDR::MAX_ALIGNMENT];
+ + CDR::MAX_ALIGNMENT];
char* buf = ACE_reinterpret_cast(char*,header);
TAO_OutputCDR cdr (buf, sizeof(header));
cdr.write_boolean (TAO_ENCAP_BYTE_ORDER);
@@ -364,14 +383,14 @@ TAO_ECG_UDP_Sender::send_fragment (const RtecUDPAdmin::UDP_Addr& udp_addr,
// @@ TODO Use a Event Channel specific exception
ACE_DEBUG ((LM_DEBUG,
"ECG_UDP (%t) send failed %p\n", ""));
- TAO_THROW(CORBA::COMM_FAILURE ());
+ TAO_THROW(CORBA::COMM_FAILURE (CORBA::COMPLETED_NO));
}
else if (n == 0)
{
// @@ TODO Use a Event Channel specific exception
ACE_DEBUG ((LM_DEBUG,
"ECG_UDP (%t) EOF on send \n"));
- TAO_THROW(CORBA::COMM_FAILURE ());
+ TAO_THROW(CORBA::COMM_FAILURE (CORBA::COMPLETED_NO));
}
}
@@ -518,7 +537,7 @@ TAO_ECG_UDP_Request_Entry (CORBA::Boolean byte_order,
request_size_ (request_size),
fragment_count_ (fragment_count)
{
- ACE_CDR::grow (&this->payload_, this->request_size_);
+ CDR::grow (&this->payload_, this->request_size_);
this->payload_.wr_ptr (request_size_);
this->received_fragments_ = this->default_received_fragments_;
@@ -638,6 +657,8 @@ TAO_ECG_UDP_Receiver::TAO_ECG_UDP_Receiver (void)
void
TAO_ECG_UDP_Receiver::init (RtecEventChannelAdmin::EventChannel_ptr lcl_ec,
+ RtecScheduler::Scheduler_ptr lcl_sched,
+ const char* lcl_name,
TAO_ECG_UDP_Out_Endpoint* ignore_from,
RtecUDPAdmin::AddrServer_ptr addr_server,
ACE_Reactor *reactor,
@@ -653,6 +674,29 @@ TAO_ECG_UDP_Receiver::init (RtecEventChannelAdmin::EventChannel_ptr lcl_ec,
this->addr_server_ =
RtecUDPAdmin::AddrServer::_duplicate (addr_server);
+ this->lcl_info_ = lcl_sched->lookup (lcl_name, TAO_IN_ENV);
+ TAO_CHECK_ENV_RETURN_VOID (TAO_IN_ENV);
+ if (this->lcl_info_ == -1)
+ {
+ this->lcl_info_ =
+ lcl_sched->create (lcl_name, TAO_IN_ENV);
+ TAO_CHECK_ENV_RETURN_VOID (TAO_IN_ENV);
+
+ ACE_Time_Value tv (0, 500);
+ TimeBase::TimeT time;
+ ORBSVCS_Time::Time_Value_to_TimeT (time, tv);
+ lcl_sched->set (this->lcl_info_,
+ RtecScheduler::VERY_HIGH_CRITICALITY,
+ time, time, time,
+ 25000 * 10,
+ RtecScheduler::VERY_LOW_IMPORTANCE,
+ time,
+ 1,
+ RtecScheduler::REMOTE_DEPENDANT,
+ TAO_IN_ENV);
+ TAO_CHECK_ENV_RETURN_VOID (TAO_IN_ENV);
+ }
+
this->reactor_ = reactor;
this->max_timeout_ = max_timeout;
// @@ TODO throw an exception....
@@ -679,6 +723,11 @@ TAO_ECG_UDP_Receiver::open (RtecEventChannelAdmin::SupplierQOS& pub,
if (pub.publications.length () == 0)
return;
+ for (CORBA::ULong i = 0; i < pub.publications.length (); ++i)
+ {
+ pub.publications[i].dependency_info.rt_info = this->lcl_info_;
+ }
+
// = Connect as a supplier to the local EC
RtecEventChannelAdmin::SupplierAdmin_var supplier_admin =
this->lcl_ec_->for_suppliers (TAO_IN_ENV);
@@ -702,26 +751,17 @@ TAO_ECG_UDP_Receiver::open (RtecEventChannelAdmin::SupplierQOS& pub,
}
void
-TAO_ECG_UDP_Receiver::close (CORBA::Environment &ACE_TRY_ENV)
+TAO_ECG_UDP_Receiver::close (CORBA::Environment &env)
{
// ACE_DEBUG ((LM_DEBUG, "ECG (%t) Closing gateway\n"));
if (CORBA::is_nil (this->consumer_proxy_.in ()))
return;
- this->consumer_proxy_->disconnect_push_consumer (ACE_TRY_ENV);
- ACE_CHECK;
-
+ this->consumer_proxy_->disconnect_push_consumer (env);
+ if (env.exception () != 0) return;
this->consumer_proxy_ =
RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
- PortableServer::POA_var poa =
- this->_default_POA (ACE_TRY_ENV);
- ACE_CHECK;
- PortableServer::ObjectId_var id =
- poa->servant_to_id (this, ACE_TRY_ENV);
- ACE_CHECK;
- poa->deactivate_object (id.in (), ACE_TRY_ENV);
- ACE_CHECK;
}
void
@@ -749,7 +789,7 @@ TAO_ECG_UDP_Receiver::handle_input (ACE_SOCK_Dgram& dgram)
// Use ULong so the alignment is right.
CORBA::ULong header[TAO_ECG_UDP_Sender::ECG_HEADER_SIZE
/ sizeof(CORBA::ULong)
- + ACE_CDR::MAX_ALIGNMENT];
+ + CDR::MAX_ALIGNMENT];
ACE_INET_Addr from;
ssize_t n = dgram.recv (header, sizeof(header), from, MSG_PEEK);
if (n == -1)
@@ -1065,17 +1105,6 @@ TAO_ECG_Mcast_EH::close (CORBA::Environment& TAO_IN_ENV)
this->handle_ = 0;
TAO_CHECK_ENV_RETURN (TAO_IN_ENV, -1);
- {
- PortableServer::POA_var poa =
- this->observer_._default_POA (ACE_TRY_ENV);
- ACE_CHECK_RETURN (-1);
- PortableServer::ObjectId_var id =
- poa->servant_to_id (&this->observer_, ACE_TRY_ENV);
- ACE_CHECK_RETURN (-1);
- poa->deactivate_object (id.in (), ACE_TRY_ENV);
- ACE_CHECK_RETURN (-1);
- }
-
return 0;
}
@@ -1137,7 +1166,7 @@ TAO_ECG_Mcast_EH::update_consumer (const RtecEventChannelAdmin::ConsumerQOS& sub
ACE_INET_Addr inet_addr (addr.port, addr.ipaddr);
if (this->subscribe (inet_addr) == -1)
- ACE_ERROR ((LM_ERROR,
+ ACE_ERROR ((LM_DEBUG,
"cannot subscribe to %s:%d\n",
inet_addr.get_host_addr (),
inet_addr.get_port_number ()));