summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorcoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1998-09-04 22:00:26 +0000
committercoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1998-09-04 22:00:26 +0000
commit8fd36918500991a638d81cdcead9c9c80fd119c1 (patch)
tree6e61bb9e28f54be0b4e6e1bdc8a1b5ff748e426e
parent0eb92a3dd5f7ee31b0ef3a5c4c1780a4977a89bb (diff)
downloadATCD-8fd36918500991a638d81cdcead9c9c80fd119c1.tar.gz
ChangeLogTag:Fri Sep 4 16:44:19 1998 Carlos O'Ryan <coryan@cs.wustl.edu>
-rw-r--r--TAO/ChangeLog-98c59
-rw-r--r--TAO/examples/Simulator/Event_Supplier/DOVE_Supplier.cpp23
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/Dispatching_Modules.h7
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/Dispatching_Modules.i4
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Gateway.cpp4
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Gateway.h2
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.cpp97
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.h21
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_UDP_Admin.cpp23
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_UDP_Admin.h70
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/Event_Channel.cpp145
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/Event_Channel.h45
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/Event_Channel.i14
-rw-r--r--TAO/orbsvcs/orbsvcs/Event_Utilities.cpp22
-rw-r--r--TAO/orbsvcs/orbsvcs/Event_Utilities.i52
-rw-r--r--TAO/orbsvcs/orbsvcs/Makefile3
-rw-r--r--TAO/orbsvcs/orbsvcs/RtecEventChannelAdmin.idl114
-rw-r--r--TAO/orbsvcs/orbsvcs/RtecEventComm.idl45
-rw-r--r--TAO/orbsvcs/orbsvcs/RtecUDPAdmin.idl28
-rw-r--r--TAO/orbsvcs/tests/EC_Custom_Marshal/ECM_Consumer.cpp6
-rw-r--r--TAO/orbsvcs/tests/EC_Custom_Marshal/ECM_Supplier.cpp22
-rw-r--r--TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.cpp1637
-rw-r--r--TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.h554
-rw-r--r--TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.i120
-rw-r--r--TAO/orbsvcs/tests/EC_Mcast/README27
-rw-r--r--TAO/orbsvcs/tests/EC_Mcast/sample.cfg64
-rw-r--r--TAO/orbsvcs/tests/EC_Multiple/EC_Multiple.cpp43
-rw-r--r--TAO/orbsvcs/tests/EC_Throughput/ECT_Consumer.cpp2
-rw-r--r--TAO/orbsvcs/tests/EC_Throughput/ECT_Supplier.cpp24
-rw-r--r--TAO/orbsvcs/tests/Event_Latency/Event_Latency.cpp25
-rw-r--r--TAO/tests/Cubit/TAO/IDL_Cubit/svc.conf2
31 files changed, 1996 insertions, 1308 deletions
diff --git a/TAO/ChangeLog-98c b/TAO/ChangeLog-98c
index a57d875e88d..2d035267e17 100644
--- a/TAO/ChangeLog-98c
+++ b/TAO/ChangeLog-98c
@@ -1,3 +1,62 @@
+Fri Sep 4 16:44:19 1998 Carlos O'Ryan <coryan@cs.wustl.edu>
+
+ * orbsvcs/orbsvcs/Makefile:
+ * orbsvcs/orbsvcs/RtecUDPAdmin.idl:
+ * orbsvcs/orbsvcs/RtecEventChannelAdmin.idl:
+ * orbsvcs/orbsvcs/Event/EC_Gateway_UDP.h:
+ * orbsvcs/orbsvcs/Event/EC_Gateway_UDP.cpp:
+ * orbsvcs/orbsvcs/Event/EC_UDP_Admin.h:
+ * orbsvcs/orbsvcs/Event/EC_UDP_Admin.cpp:
+ * orbsvcs/orbsvcs/Event/Event_Channel.h:
+ * orbsvcs/orbsvcs/Event/Event_Channel.i:
+ * orbsvcs/orbsvcs/Event/Event_Channel.cpp:
+ * orbsvcs/tests/EC_Mcast/EC_Mcast.h:
+ * orbsvcs/tests/EC_Mcast/EC_Mcast.i:
+ * orbsvcs/tests/EC_Mcast/EC_Mcast.cpp:
+ * orbsvcs/tests/EC_Mcast/sample.cfg:
+ * orbsvcs/tests/EC_Mcast/README:
+ New implementation of the EC_Mcast test, the test can now handle
+ multiple processes, each process joins several multicast groups
+ and push events to a (potentially different) set of multicast
+ groups. Since the configuration is more complicated a sample
+ file is included.
+ The test is prepared to support several interesting features,
+ such as:
+ + Dynamic changes in the multicast group joined.
+ + Handle OS limitations wrt the maximum number of mcast groups
+ per socket.
+ + Support different mappings for the type->mcast group
+ relation (currently the event type *is* the multicast
+ group).
+ The current implementation also offers the initial interfaces to
+ observe changes in the subcription and/or publication list of a
+ *remote* event channel, this will enable the automation and
+ optimization of the local publication list (there is no sense in
+ sending an event if nobody is currently interested).
+
+ * orbsvcs/orbsvcs/Event_Utilities.cpp:
+ * orbsvcs/orbsvcs/Event_Utilities.i:
+ * orbsvcs/orbsvcs/Event/Dispatching_Modules.h:
+ * orbsvcs/orbsvcs/Event/Dispatching_Modules.i:
+ * orbsvcs/orbsvcs/RtecEventComm.idl:
+ * orbsvcs/orbsvcs/Event/EC_Gateway.h:
+ * orbsvcs/orbsvcs/Event/EC_Gateway.cpp:
+ * orbsvcs/orbsvcs/Event/Event_Channel.h:
+ * orbsvcs/orbsvcs/Event/Event_Channel.i:
+ * orbsvcs/orbsvcs/Event/Event_Channel.cpp:
+ * orbsvcs/tests/EC_Custom_Marshal/ECM_Supplier.cpp:
+ * orbsvcs/tests/EC_Custom_Marshal/ECM_Consumer.cpp:
+ * orbsvcs/tests/EC_Multiple/EC_Multiple.cpp:
+ * orbsvcs/tests/EC_Throughput/ECT_Consumer.cpp:
+ * orbsvcs/tests/EC_Throughput/ECT_Supplier.cpp:
+ * orbsvcs/tests/Event_Latency/Event_Latency.cpp:
+ * orbsvcs/tests/Simulator/Event_Supplier/DOVE_Supplier.cpp:
+ Added a new IDL structure to represent the Event Header, this
+ will let us factor out the minimum information needed to
+ transmit QoS and subscription/publication info.
+ I also normalized some of the field names in the RtecEventComm
+ structures.
+
Fri Sep 4 16:22:17 1998 Nagarajan Surendran <naga@cs.wustl.edu>
* tests/Cubit/TAO/MT_Cubit/Globals.{h,cpp}: Added macro VX_VME_INIT
diff --git a/TAO/examples/Simulator/Event_Supplier/DOVE_Supplier.cpp b/TAO/examples/Simulator/Event_Supplier/DOVE_Supplier.cpp
index bb142f67ec5..970b83cd83a 100644
--- a/TAO/examples/Simulator/Event_Supplier/DOVE_Supplier.cpp
+++ b/TAO/examples/Simulator/Event_Supplier/DOVE_Supplier.cpp
@@ -53,12 +53,12 @@ DOVE_Supplier::notify (CORBA::Any &message)
event.source_ = SOURCE_ID;
event.type_ = ACE_ES_EVENT_NOTIFICATION;
event.ttl_ = 1;
- event.creation_time_ = ORBSVCS_Time::zero;
- event.ec_recv_time_ = ORBSVCS_Time::zero;
- event.ec_send_time_ = ORBSVCS_Time::zero;
+ event.header.creation_time = ORBSVCS_Time::zero;
+ event.header.ec_recv_time = ORBSVCS_Time::zero;
+ event.header.ec_send_time = ORBSVCS_Time::zero;
//event.data_.x = 0;
//event.data_.y = 0;
- event.data_.any_value = message;
+ event.data.any_value = message;
RtecEventComm::EventSet events;
events.length (1);
@@ -192,13 +192,14 @@ DOVE_Supplier::connect_Supplier ()
qos.publications[0].event.source_ = SOURCE_ID;
qos.publications[0].event.type_ = ACE_ES_EVENT_NOTIFICATION;
qos.publications[0].event.ttl_ = 1;
- qos.publications[0].event.creation_time_ = ORBSVCS_Time::zero;
- qos.publications[0].event.ec_recv_time_ = ORBSVCS_Time::zero;
- qos.publications[0].event.ec_send_time_ = ORBSVCS_Time::zero;
- qos.publications[0].event.data_.any_value.replace (CORBA::_tc_short,
- &x,
- 0,
- TAO_TRY_ENV);
+ qos.publications[0].event.header.creation_time = ORBSVCS_Time::zero;
+ qos.publications[0].event.header.ec_recv_time = ORBSVCS_Time::zero;
+ qos.publications[0].event.header.ec_send_time = ORBSVCS_Time::zero;
+ qos.publications[0].event.data.any_value.replace (CORBA::_tc_short,
+ &x,
+ 0,
+ TAO_TRY_ENV);
+ TAO_CHECK_ENV;
qos.publications[0].dependency_info.number_of_calls = 1;
qos.publications[0].dependency_info.rt_info = rt_info;
diff --git a/TAO/orbsvcs/orbsvcs/Event/Dispatching_Modules.h b/TAO/orbsvcs/orbsvcs/Event/Dispatching_Modules.h
index eb6ee59581e..8949f967f73 100644
--- a/TAO/orbsvcs/orbsvcs/Event/Dispatching_Modules.h
+++ b/TAO/orbsvcs/orbsvcs/Event/Dispatching_Modules.h
@@ -151,9 +151,10 @@ public:
ACE_ES_Dispatch_Request (ACE_Push_Consumer_Proxy *consumer,
const RtecEventComm::Time &time,
RtecScheduler::handle_t rt_info);
- // Set consumer_ to <consumer> and sets single_event_.creation_time_
- // to <time>. Sets use_single_event_ to 1. <rt_info> describes the
- // method receiving this dispatch.
+ // Set consumer_ to <consumer> and sets
+ // single_event_.header.creation_time to <time>. Sets
+ // use_single_event_ to 1. <rt_info> describes the method
+ // receiving this dispatch.
ACE_ES_Dispatch_Request (ACE_Push_Consumer_Proxy *consumer,
ACE_ES_Event_Container *event,
diff --git a/TAO/orbsvcs/orbsvcs/Event/Dispatching_Modules.i b/TAO/orbsvcs/orbsvcs/Event/Dispatching_Modules.i
index 93b7b6b5c09..17c6be1cb0d 100644
--- a/TAO/orbsvcs/orbsvcs/Event/Dispatching_Modules.i
+++ b/TAO/orbsvcs/orbsvcs/Event/Dispatching_Modules.i
@@ -72,8 +72,8 @@ ACE_ES_Dispatch_Request::ACE_ES_Dispatch_Request (ACE_Push_Consumer_Proxy *consu
single_event_ (),
event_set_ ()
{
- single_event_.creation_time_ = time;
- single_event_.type_ = ACE_ES_EVENT_TIMEOUT;
+ single_event_.header.creation_time = time;
+ single_event_.header.type = ACE_ES_EVENT_TIMEOUT;
}
ACE_INLINE void
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.cpp
index 173edb03d0d..4d5585eff26 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.cpp
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.cpp
@@ -226,12 +226,12 @@ TAO_EC_Gateway_IIOP::push (const RtecEventComm::EventSet &events,
for (u_int i = 0; i < events.length (); ++i)
{
//ACE_DEBUG ((LM_DEBUG, "type = %d ", events[i].type_));
- if (events[i].ttl_ > 0)
+ if (events[i].header.ttl > 0)
{
count++;
out.length (count);
out[count - 1] = events[i];
- out[count - 1].ttl_--;
+ out[count - 1].header.ttl--;
}
}
//ACE_DEBUG ((LM_DEBUG, "count = %d\n", count));
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.h b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.h
index 1e90de9ada6..0897dcb7770 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.h
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.h
@@ -180,4 +180,4 @@ private:
// We talk to the EC (as a consumer) using this proxy.
};
-#endif /* ACE_EVENT_CHANNEL_H */
+#endif /* ACE_EC_GATEWAY_H */
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.cpp
index 5e9673ddbaa..a6ffad336f7 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.cpp
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.cpp
@@ -15,19 +15,27 @@ TAO_ECG_UDP_Sender::TAO_ECG_UDP_Sender (void)
int
TAO_ECG_UDP_Sender::get_local_addr (ACE_INET_Addr& addr)
{
- return this->dgram_.get_local_addr (addr);
+ if (this->dgram_ == 0)
+ return -1;
+ return this->dgram_->get_local_addr (addr);
}
void
TAO_ECG_UDP_Sender::init (RtecEventChannelAdmin::EventChannel_ptr lcl_ec,
RtecScheduler::Scheduler_ptr lcl_sched,
const char* lcl_name,
- const ACE_INET_Addr& ipaddr,
+ RtecUDPAdmin::AddrServer_ptr addr_server,
+ ACE_SOCK_Dgram *dgram,
CORBA::Environment &_env)
{
this->lcl_ec_ =
RtecEventChannelAdmin::EventChannel::_duplicate (lcl_ec);
+ this->addr_server_ =
+ RtecUDPAdmin::AddrServer::_duplicate (addr_server);
+
+ this->dgram_ = dgram;
+
this->lcl_info_ =
lcl_sched->create (lcl_name, _env);
if (_env.exception () != 0) return;
@@ -45,14 +53,6 @@ TAO_ECG_UDP_Sender::init (RtecEventChannelAdmin::EventChannel_ptr lcl_ec,
RtecScheduler::OPERATION,
_env);
if (_env.exception () != 0) return;
-
- if (this->dgram_.open (ipaddr) == -1)
- {
- // @@ TODO Use a Event Channel specific exception
- ACE_ERROR ((LM_ERROR, "ECG_UDP::init - Dgram open failed\n"));
- _env.exception (new CORBA::COMM_FAILURE (CORBA::COMPLETED_NO));
- }
- if (_env.exception () != 0) return;
}
void
@@ -61,14 +61,6 @@ TAO_ECG_UDP_Sender::shutdown (CORBA::Environment& _env)
this->close (_env);
if (_env.exception () == 0) return;
this->lcl_ec_ = RtecEventChannelAdmin::EventChannel::_nil ();
-
- if (this->dgram_.close () == -1)
- {
- // @@ TODO Use a Event Channel specific exception
- ACE_ERROR ((LM_ERROR, "ECG_UDP::init - Dgram close failed\n"));
- _env.exception (new CORBA::COMM_FAILURE (CORBA::COMPLETED_NO));
- }
- if (_env.exception () != 0) return;
}
void
@@ -139,7 +131,7 @@ void
TAO_ECG_UDP_Sender::push (const RtecEventComm::EventSet &events,
CORBA::Environment & _env)
{
- ACE_DEBUG ((LM_DEBUG, "ECG_UDP_Sender::push - "));
+ // ACE_DEBUG ((LM_DEBUG, "ECG_UDP_Sender::push - "));
if (events.length () == 0)
{
@@ -147,7 +139,7 @@ TAO_ECG_UDP_Sender::push (const RtecEventComm::EventSet &events,
return;
}
- ACE_DEBUG ((LM_DEBUG, "%d event(s) - ", events.length ()));
+ // ACE_DEBUG ((LM_DEBUG, "%d event(s) - ", events.length ()));
// @@ TODO, there is an extra data copy here, we should do the event
// modification without it and only compact the necessary events.
@@ -155,25 +147,32 @@ TAO_ECG_UDP_Sender::push (const RtecEventComm::EventSet &events,
RtecEventComm::EventSet out (events.length ());
for (u_int i = 0; i < events.length (); ++i)
{
- //ACE_DEBUG ((LM_DEBUG, "type = %d ", events[i].type_));
- if (events[i].ttl_ > 0)
- {
- count++;
- out.length (count);
- out[count - 1] = events[i];
- out[count - 1].ttl_--;
- }
- }
- ACE_DEBUG ((LM_DEBUG, "count = %d\n", count));
+ if (events[i].header.ttl <= 0)
+ continue;
+
+ const RtecEventComm::Event& e = events[i];
+
+ // Copy only the header...
+ RtecEventComm::EventHeader header = e.header;
+ header.ttl--;
+
+ RtecUDPAdmin::UDP_Addr udp_addr;
+ this->addr_server_->get_addr (header, udp_addr, _env);
+ TAO_CHECK_ENV_RETURN_VOID(_env);
- if (count > 0)
- {
TAO_OutputCDR cdr;
cdr.write_boolean (TAO_ENCAP_BYTE_ORDER);
cdr.write_ulong (0); // Place holder for size...
- cdr.encode (RtecEventComm::_tc_EventSet, &out, 0, _env);
- if (_env.exception () != 0) return;
+ // Marshal as if it was a sequence of one element, notice how we
+ // marshal a modified version of the header, but the data is not
+ // copied...
+ cdr.write_ulong (1);
+ cdr.encode (RtecEventComm::_tc_EventHeader, &header, 0, _env);
+ TAO_CHECK_ENV_RETURN_VOID(_env);
+
+ cdr.encode (RtecEventComm::_tc_EventData, &e.data, 0, _env);
+ TAO_CHECK_ENV_RETURN_VOID(_env);
CORBA::ULong bodylen = cdr.total_length ();
char* buf = ACE_const_cast(char*,cdr.buffer ());
@@ -192,8 +191,9 @@ TAO_ECG_UDP_Sender::push (const RtecEventComm::EventSet &events,
#endif
// This is a good maximum, because Dgrams cannot be longer than
- // 64K and the usual size for a CDR fragment is 512 bytes, still
- // if this is not enough we allocate memory from the heap.
+ // 64K and the usual size for a CDR fragment is 512 bytes.
+ // @@ TODO In the future we may need to allocate some memory
+ // from the heap.
const int TAO_WRITEV_MAX = 128;
ACE_IO_Vector iov[TAO_WRITEV_MAX];
@@ -207,20 +207,23 @@ TAO_ECG_UDP_Sender::push (const RtecEventComm::EventSet &events,
iovcnt++;
}
- ssize_t n = this->dgram_.send (iov, iovcnt);
+ ACE_INET_Addr inet_addr (udp_addr.port, udp_addr.ipaddr);
+ // ACE_DEBUG ((LM_DEBUG, "sending to (%d,%u)\n",
+ // udp_addr.port, udp_addr.ipaddr));
+ ssize_t n = this->dgram_->send (iov, iovcnt, inet_addr);
if (n == -1)
{
// @@ TODO Use a Event Channel specific exception
ACE_DEBUG ((LM_DEBUG,
"ECG_UDP (%t) send failed %p\n", ""));
- _env.exception (new CORBA::COMM_FAILURE (CORBA::COMPLETED_NO));
+ TAO_THROW(CORBA::COMM_FAILURE (CORBA::COMPLETED_NO));
}
else if (n == 0)
{
// @@ TODO Use a Event Channel specific exception
ACE_DEBUG ((LM_DEBUG,
"ECG_UDP (%t) EOF on send \n"));
- _env.exception (new CORBA::COMM_FAILURE (CORBA::COMPLETED_NO));
+ TAO_THROW(CORBA::COMM_FAILURE (CORBA::COMPLETED_NO));
}
}
}
@@ -445,15 +448,25 @@ TAO_ECG_Mcast_EH::TAO_ECG_Mcast_EH (TAO_ECG_UDP_Receiver *recv)
}
int
-TAO_ECG_Mcast_EH::open (const ACE_INET_Addr& mcast_group)
+TAO_ECG_Mcast_EH::open (void)
{
- if (this->dgram_.subscribe (mcast_group) == -1)
- return -1;
return this->reactor ()->register_handler (this,
ACE_Event_Handler::READ_MASK);
}
int
+TAO_ECG_Mcast_EH::subscribe (const ACE_INET_Addr &mcast_addr)
+{
+ return this->dgram_.subscribe (mcast_addr);
+}
+
+int
+TAO_ECG_Mcast_EH::unsubscribe (const ACE_INET_Addr &mcast_addr)
+{
+ return this->dgram_.unsubscribe ();
+}
+
+int
TAO_ECG_Mcast_EH::close (void)
{
if (this->reactor ()->remove_handler (this,
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.h b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.h
index 5031c3ad020..23d7d8416f0 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.h
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.h
@@ -55,8 +55,8 @@
#include "ace/SOCK_CODgram.h"
#include "ace/SOCK_Dgram_Mcast.h"
-#include "orbsvcs/RtecEventCommS.h"
#include "orbsvcs/RtecEventChannelAdminS.h"
+#include "orbsvcs/RtecUDPAdminS.h"
#include "orbsvcs/orbsvcs_export.h"
class TAO_ORBSVCS_Export TAO_ECG_UDP_Sender : public POA_RtecEventComm::PushConsumer
@@ -69,6 +69,8 @@ class TAO_ORBSVCS_Export TAO_ECG_UDP_Sender : public POA_RtecEventComm::PushCons
// This class connect as a consumer to an EventChannel
// and it sends the events using UDP, the UDP address can be a
// normal IP address or it can be a multicast group.
+ // The UDP address is obtained from a RtecUDPAdmin::AddrServer
+ // class.
// It marshalls the events using TAO CDR classes.
// No provisions are taken for message fragmentation.
//
@@ -81,7 +83,8 @@ public:
void init (RtecEventChannelAdmin::EventChannel_ptr lcl_ec,
RtecScheduler::Scheduler_ptr lcl_sched,
const char* lcl_name,
- const ACE_INET_Addr& ipaddr,
+ RtecUDPAdmin::AddrServer_ptr addr_server,
+ ACE_SOCK_Dgram* dgram,
CORBA::Environment &_env);
// To do its job this class requires to know the local EC it will
// connect to; it also requires to build an RT_Info for the local
@@ -118,8 +121,12 @@ private:
RtecEventChannelAdmin::ProxyPushSupplier_var supplier_proxy_;
// We talk to the EC (as a consumer) using this proxy.
- ACE_SOCK_CODgram dgram_;
- // The datagram used to send the data.
+ RtecUDPAdmin::AddrServer_var addr_server_;
+ // We query this object to determine where are the events sent.
+
+ ACE_SOCK_Dgram *dgram_;
+ // The datagram used to sendto(), this object is *not* owned by the
+ // UDP_Sender.
};
class TAO_ORBSVCS_Export TAO_ECG_UDP_Receiver : public POA_RtecEventComm::PushSupplier
@@ -226,7 +233,7 @@ class TAO_ORBSVCS_Export TAO_ECG_Mcast_EH : public ACE_Event_Handler
public:
TAO_ECG_Mcast_EH (TAO_ECG_UDP_Receiver *recv);
- int open (const ACE_INET_Addr& mcast_group);
+ int open (void);
// Open the datagram (join the mcast group) and register with
// this->reactor()
@@ -234,6 +241,10 @@ public:
// Close the datagram (leave the mcast group) and unregister with
// the reactor.
+ int subscribe (const ACE_INET_Addr &mcast_addr);
+ int unsubscribe (const ACE_INET_Addr &mcast_addr);
+ // Control the multicast group subscriptions
+
// Reactor callbacks
virtual int handle_input (ACE_HANDLE fd);
virtual ACE_HANDLE get_handle (void) const;
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_UDP_Admin.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_UDP_Admin.cpp
new file mode 100644
index 00000000000..d45ce1002ab
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_UDP_Admin.cpp
@@ -0,0 +1,23 @@
+// $Id$
+
+#include "orbsvcs/Event/EC_UDP_Admin.h"
+
+ACE_RCSID(Event, EC_UDP_Admin, "$Id$")
+
+TAO_EC_Simple_AddrServer::TAO_EC_Simple_AddrServer (CORBA::UShort port)
+ : port_ (port)
+{
+}
+
+TAO_EC_Simple_AddrServer::~TAO_EC_Simple_AddrServer (void)
+{
+}
+
+void
+TAO_EC_Simple_AddrServer::get_addr (const RtecEventComm::EventHeader& header,
+ RtecUDPAdmin::UDP_Addr_out addr,
+ CORBA::Environment&)
+{
+ addr.ipaddr = header.type;
+ addr.port = this->port_;
+}
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_UDP_Admin.h b/TAO/orbsvcs/orbsvcs/Event/EC_UDP_Admin.h
new file mode 100644
index 00000000000..ef2220d1714
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_UDP_Admin.h
@@ -0,0 +1,70 @@
+/* -*- C++ -*- */
+// $Id$
+//
+// ============================================================================
+//
+// = LIBRARY
+// TAO services
+//
+// = FILENAME
+// EC_UDP_Admin
+//
+// = AUTHOR
+// Carlos O'Ryan
+//
+// = DESCRIPTION
+// Simple implementations of the UDP Administration service.
+//
+// connects to a "remote" EC as a consumer, it also connects to the
+// <local> EC as a supplier of events, this later EC is usually
+// collocated.
+// The QoS parameters for both connections must be provided by the
+// user.
+// To avoid infinite loops of events the Gateway descreases the TTL
+// field of the events and will not deliver any events with TTL less
+// than or equal to 0.
+//
+// = TODO
+// The class makes an extra copy of the events, we need to
+// investigate if closer collaboration with its collocated EC could
+// be used to remove that copy.
+//
+// ============================================================================
+
+#ifndef TAO_EC_UDP_ADMIN_H
+#define TAO_EC_UDP_ADMIN_H
+
+#include "orbsvcs/RtecUDPAdminS.h"
+#include "orbsvcs/orbsvcs_export.h"
+
+class TAO_ORBSVCS_Export TAO_EC_Simple_AddrServer : public POA_RtecUDPAdmin::AddrServer
+{
+ // = TITLE
+ // TAO Real-time Event Service; a simple UDP address server.
+ //
+ // = DESCRIPTION
+ // The EC is able to use multiple multicast groups to transmit its
+ // data, the is given control over the mapping between the Event
+ // (type,source) pair and the (ipaddr,port) pair using a
+ // AddrServer.
+ // This class implements a very simple server that simply maps the
+ // <type> component to the <ipaddr> and uses a fixed <port>,
+ // provided at initialization time.
+ //
+public:
+ TAO_EC_Simple_AddrServer (CORBA::UShort port);
+ // Constructor
+
+ virtual ~TAO_EC_Simple_AddrServer (void);
+ // Destructor
+
+ // = The RtecUDPAdmin::AddrServer methods
+ virtual void get_addr (const RtecEventComm::EventHeader& header,
+ RtecUDPAdmin::UDP_Addr_out addr,
+ CORBA::Environment& env);
+
+private:
+ CORBA::UShort port_;
+};
+
+#endif /* TAO_EC_UDP_ADMIN_H */
diff --git a/TAO/orbsvcs/orbsvcs/Event/Event_Channel.cpp b/TAO/orbsvcs/orbsvcs/Event/Event_Channel.cpp
index d1c41ef578a..e050778f48c 100644
--- a/TAO/orbsvcs/orbsvcs/Event/Event_Channel.cpp
+++ b/TAO/orbsvcs/orbsvcs/Event/Event_Channel.cpp
@@ -165,7 +165,7 @@ public:
ACE_ES_Dependency_Iterator iter (consumer->qos ().dependencies);
while (iter.advance_dependency () == 0)
{
- RtecEventComm::EventType &type = (*iter).event.type_;
+ RtecEventComm::EventType &type = (*iter).event.header.type;
if (type != ACE_ES_GLOBAL_DESIGNATOR &&
type != ACE_ES_CONJUNCTION_DESIGNATOR &&
type != ACE_ES_DISJUNCTION_DESIGNATOR)
@@ -422,7 +422,7 @@ ACE_Push_Supplier_Proxy::connect_push_supplier (RtecEventComm::PushSupplier_ptr
// @@ TODO: The SupplierQOS should have a more reasonable interface to
// obtain the supplier_id(), BTW, a callback to push_supplier will
// not work: it usually results in some form of dead-lock.
- this->source_id_ = qos_.publications[0].event.source_;
+ this->source_id_ = qos_.publications[0].event.header.source;
supplier_module_->connected (this, _env);
}
@@ -438,7 +438,7 @@ ACE_Push_Supplier_Proxy::push (const RtecEventComm::EventSet &event,
{
RtecEventComm::Event& ev =
ACE_const_cast(RtecEventComm::Event&,event[i]);
- ORBSVCS_Time::hrtime_to_TimeT (ev.ec_recv_time_, ec_recv);
+ ORBSVCS_Time::hrtime_to_TimeT (ev.header.ec_recv_time, ec_recv);
}
supplier_module_->push (this, event, _env);
}
@@ -812,6 +812,21 @@ ACE_EventChannel::update_supplier_gwys (CORBA::Environment& _env)
}
}
+RtecEventChannelAdmin::Observer_Handle
+ACE_EventChannel::append_observer (RtecEventChannelAdmin::Observer_ptr,
+ CORBA::Environment &)
+{
+ return 0;
+ // @@ TODO fill in the "implementation details"
+}
+
+void
+ACE_EventChannel::remove_observer (RtecEventChannelAdmin::Observer_Handle,
+ CORBA::Environment &)
+{
+ // @@ TODO fill in the "implementation details"
+}
+
// ****************************************************************
ACE_ES_Disjunction_Group::~ACE_ES_Disjunction_Group (void)
@@ -1267,7 +1282,7 @@ ACE_ES_Consumer_Module::push (const ACE_ES_Dispatch_Request *request,
{
RtecEventComm::Event& ev =
ACE_const_cast(RtecEventComm::Event&,event_set[i]);
- ORBSVCS_Time::hrtime_to_TimeT (ev.ec_send_time_, ec_send);
+ ORBSVCS_Time::hrtime_to_TimeT (ev.header.ec_send_time, ec_send);
}
request->consumer ()->push (event_set, _env);
}
@@ -1333,9 +1348,9 @@ ACE_ES_Consumer_Module::fill_qos (RtecEventChannelAdmin::ConsumerQOS& c_qos,
CORBA::ULong cc = 0;
CORBA::ULong sc = 0;
- dep[cc].event.type_ = ACE_ES_DISJUNCTION_DESIGNATOR;
- dep[cc].event.source_ = 0;
- dep[cc].event.creation_time_ = ORBSVCS_Time::zero;
+ dep[cc].event.header.type = ACE_ES_DISJUNCTION_DESIGNATOR;
+ dep[cc].event.header.source = 0;
+ dep[cc].event.header.creation_time = ORBSVCS_Time::zero;
dep[cc].rt_info = 0;
cc++;
@@ -1354,7 +1369,7 @@ ACE_ES_Consumer_Module::fill_qos (RtecEventChannelAdmin::ConsumerQOS& c_qos,
RtecEventComm::Event& event =
c->qos ().dependencies[j].event;
- RtecEventComm::EventType type = event.type_;
+ RtecEventComm::EventType type = event.header.type;
if (type <= ACE_ES_EVENT_UNDEFINED)
continue;
@@ -1369,22 +1384,22 @@ ACE_ES_Consumer_Module::fill_qos (RtecEventChannelAdmin::ConsumerQOS& c_qos,
CORBA::ULong k;
for (k = 0; k < cc; ++k)
{
- if (dep[k].event.type_ == event.type_
- && dep[k].event.source_ == event.source_)
+ if (dep[k].event.header.type == event.header.type
+ && dep[k].event.header.source == event.header.source)
break;
}
if (k == cc)
{
- dep[cc].event.type_ = event.type_;
- dep[cc].event.source_ = event.source_;
- dep[cc].event.creation_time_ = ORBSVCS_Time::zero;
+ dep[cc].event.header.type = event.header.type;
+ dep[cc].event.header.source = event.header.source;
+ dep[cc].event.header.creation_time = ORBSVCS_Time::zero;
// The RT_Info is filled up later.
dep[cc].rt_info = 0;
cc++;
- pub[sc].event.type_ = event.type_;
- pub[sc].event.source_ = event.source_;
- pub[sc].event.creation_time_ = ORBSVCS_Time::zero;
+ pub[sc].event.header.type = event.header.type;
+ pub[sc].event.header.source = event.header.source;
+ pub[sc].event.header.creation_time = ORBSVCS_Time::zero;
pub[sc].dependency_info.dependency_type =
RtecScheduler::TWO_WAY_CALL;
pub[sc].dependency_info.number_of_calls = 1;
@@ -1471,9 +1486,9 @@ int
ACE_ES_Correlation_Module::schedule_timeout (ACE_ES_Consumer_Rep_Timeout *consumer)
{
RtecEventComm::Time &interval =
- consumer->dependency ()->event.creation_time_;
+ consumer->dependency ()->event.header.creation_time;
RtecEventComm::Time &delay =
- consumer->dependency ()->event.creation_time_;
+ consumer->dependency ()->event.header.creation_time;
// Store the preemption priority so we can cancel the correct timer.
// The priority values may change during the process lifetime (e.g.,
@@ -1528,9 +1543,9 @@ ACE_ES_Correlation_Module::reschedule_timeout (ACE_ES_Consumer_Rep_Timeout *cons
else
{
RtecEventComm::Time &interval =
- consumer->dependency ()->event.creation_time_;
+ consumer->dependency ()->event.header.creation_time;
RtecEventComm::Time &delay =
- consumer->dependency ()->event.creation_time_;
+ consumer->dependency ()->event.header.creation_time;
// Store the preemption priority so we can cancel the correct timer.
// The priority values may change during the process lifetime (e.g.,
@@ -1705,7 +1720,7 @@ ACE_ES_Consumer_Correlation::connected (ACE_Push_Consumer_Proxy *consumer,
// Keep track of how many conjunction and disjunction groups are
// registered. Update the index pointers so that the helper
// functions can update the appropriate group objects.
- switch ((*iter).event.type_)
+ switch ((*iter).event.header.type)
{
case ACE_ES_CONJUNCTION_DESIGNATOR:
cgroup_index++;
@@ -1880,8 +1895,8 @@ ACE_ES_Consumer_Correlation::get_consumer_rep (RtecEventChannelAdmin::Dependency
RtecEventComm::Event& e = consumer_reps_[x]->dependency ()->event;
// If <dependency> matches any previously subscribed consumer
// reps, we'll reuse it.
- if (e.type_ == dependency.event.type_
- && e.source_ == dependency.event.source_ )
+ if (e.header.type == dependency.event.header.type
+ && e.header.source == dependency.event.header.source )
{
rep = consumer_reps_[x];
break;
@@ -2120,8 +2135,10 @@ ACE_ES_Consumer_Rep_Timeout::execute (void)
CORBA::Environment __env;
ACE_Time_Value tv = ACE_OS::gettimeofday ();
ORBSVCS_Time::Time_Value_to_TimeT
- (timeout_event_->creation_time_, tv);
- correlation_->correlation_module_->push (this, timeout_event_, __env);
+ (timeout_event_->header.creation_time, tv);
+ correlation_->correlation_module_->push (this,
+ timeout_event_,
+ __env);
if (__env.exception () != 0)
ACE_ERROR ((LM_ERROR, "ACE_ES_Consumer_Rep_Timeout::execute: unexpected exception.\n"));
}
@@ -2174,7 +2191,7 @@ ACE_ES_Subscription_Module::connected (ACE_Push_Supplier_Proxy *supplier,
RtecEventChannelAdmin::PublicationSet &publications =
supplier->qos ().publications;
- sid = publications[0].event.source_;
+ sid = publications[0].event.header.source;
for (CORBA::ULong index=0; index < publications.length (); index++)
{
// Check to make sure an RT_Info was specified.
@@ -2188,7 +2205,7 @@ ACE_ES_Subscription_Module::connected (ACE_Push_Supplier_Proxy *supplier,
#endif
RtecEventComm::EventType event_type =
- publications[index].event.type_;
+ publications[index].event.header.type;
// @@ TODO we should throw something Check to make sure a type
// was specified.
@@ -2474,12 +2491,12 @@ ACE_ES_Subscription_Module::push_source_type (ACE_Push_Supplier_Proxy *source,
return 0;
}
- if (supplier_map.find (event->type_, subscribers) == -1)
+ if (supplier_map.find (event->header.type, subscribers) == -1)
{
ACE_DEBUG ((LM_ERROR,
"EC (%t) ACE_ES_Subscription_Module::push_source_type"
" Warning: event type %d not registered.\n",
- event->type_));
+ event->header.type));
ACE_TIMEPROBE (TAO_EVENT_CHANNEL_PUSH_SOURCE_TYPE);
return 0; // continue anyway
}
@@ -2789,23 +2806,23 @@ ACE_ES_Subscription_Module::subscribe (ACE_ES_Consumer_Rep *consumer)
int result = 0;
RtecEventComm::Event &event = consumer->dependency ()->event;
- if (event.source_ == 0)
+ if (event.header.source == 0)
// Not source-based subscription.
{
- if (event.type_ == ACE_ES_EVENT_ANY)
+ if (event.header.type == ACE_ES_EVENT_ANY)
result = this->subscribe_all (consumer);
else
- result = this->subscribe_type (consumer, event.type_);
+ result = this->subscribe_type (consumer, event.header.type);
}
else
// Source-based subscription.
{
- if (event.type_ == ACE_ES_EVENT_ANY)
- result = this->subscribe_source (consumer, event.source_);
+ if (event.header.type == ACE_ES_EVENT_ANY)
+ result = this->subscribe_source (consumer, event.header.source);
else
result = this->subscribe_source_type (consumer,
- event.source_,
- event.type_);
+ event.header.source,
+ event.header.type);
}
return result;
@@ -2823,18 +2840,18 @@ ACE_ES_Subscription_Module::unsubscribe (ACE_ES_Consumer_Rep *consumer)
RtecEventComm::Event &event = consumer->dependency ()->event;
- if (event.type_ != ACE_ES_EVENT_ANY)
+ if (event.header.type != ACE_ES_EVENT_ANY)
{
// Remove the consumer from the global type-based subscription list.
ACE_ES_Subscription_Info::remove (type_subscribers_,
consumer,
- event.type_);
+ event.header.type);
}
else
// Remove the consumer from the global source-based subscription list.
ACE_ES_Subscription_Info::remove (source_subscribers_,
consumer,
- event.source_);
+ event.header.source);
#if 0
// @@ TODO This code was removed and I'm (coryan) adding it again
@@ -2858,19 +2875,21 @@ ACE_ES_Subscription_Module::unsubscribe (ACE_ES_Consumer_Rep *consumer)
int result = 0;
- if (event.source_ == 0)
+ if (event.header.source == 0)
{
- if (event.type_ == ACE_ES_EVENT_ANY)
+ if (event.header.type == ACE_ES_EVENT_ANY)
result = this->unsubscribe_all (consumer);
else
- result = this->unsubscribe_type (consumer, event.type_);
+ result = this->unsubscribe_type (consumer, event.header.type);
}
else
{
- if (event.type_ == ACE_ES_EVENT_ANY)
- result = this->unsubscribe_source (consumer, event.source_);
+ if (event.header.type == ACE_ES_EVENT_ANY)
+ result = this->unsubscribe_source (consumer, event.header.source);
else
- result = this->unsubscribe_source_type (consumer, event.source_, event.type_);
+ result = this->unsubscribe_source_type (consumer,
+ event.header.source,
+ event.header.type);
}
return result;
#else
@@ -2943,13 +2962,13 @@ ACE_ES_Subscription_Module::unsubscribe_source_type (ACE_ES_Consumer_Rep *consum
Supplier_Iterator iter (all_suppliers_);
// Step through all supplier proxies looking for a match to the
- // consumer's event.source_. This is the same as unsubscribe_type,
- // only we can check the source first.
+ // consumer's event.header.source. This is the same as
+ // unsubscribe_type, only we can check the source first.
for (ACE_Push_Supplier_Proxy **proxy = 0;
iter.next (proxy) != 0;
iter.advance ())
// If the proxy matches the source id we're looking for, try to
- // remove <consumer> from the proxy's <event.type_> set.
+ // remove <consumer> from the proxy's <event.header.type> set.
if ((**proxy) == source)
{
ACE_ES_WGUARD mon ((*proxy)->subscription_info ().lock_);
@@ -3228,9 +3247,9 @@ ACE_ES_Supplier_Module::fill_qos (RtecEventChannelAdmin::ConsumerQOS& c_qos,
CORBA::ULong cc = 0;
CORBA::ULong sc = 0;
- dep[cc].event.type_ = ACE_ES_DISJUNCTION_DESIGNATOR;
- dep[cc].event.source_ = 0;
- dep[cc].event.creation_time_ = ORBSVCS_Time::zero;
+ dep[cc].event.header.type = ACE_ES_DISJUNCTION_DESIGNATOR;
+ dep[cc].event.header.source = 0;
+ dep[cc].event.header.creation_time = ORBSVCS_Time::zero;
dep[cc].rt_info = 0;
cc++;
@@ -3249,7 +3268,7 @@ ACE_ES_Supplier_Module::fill_qos (RtecEventChannelAdmin::ConsumerQOS& c_qos,
RtecEventComm::Event& event =
s->qos ().publications[j].event;
- RtecEventComm::EventType type = event.type_;
+ RtecEventComm::EventType type = event.header.type;
if (type <= ACE_ES_EVENT_UNDEFINED)
continue;
@@ -3264,22 +3283,22 @@ ACE_ES_Supplier_Module::fill_qos (RtecEventChannelAdmin::ConsumerQOS& c_qos,
CORBA::ULong k;
for (k = 0; k < sc; ++k)
{
- if (pub[k].event.type_ == event.type_
- && pub[k].event.source_ == event.source_)
+ if (pub[k].event.header.type == event.header.type
+ && pub[k].event.header.source == event.header.source)
break;
}
if (k == sc)
{
- dep[cc].event.type_ = event.type_;
- dep[cc].event.source_ = event.source_;
- dep[cc].event.creation_time_ = ORBSVCS_Time::zero;
+ dep[cc].event.header.type = event.header.type;
+ dep[cc].event.header.source = event.header.source;
+ dep[cc].event.header.creation_time = ORBSVCS_Time::zero;
// The RT_Info is filled up later.
dep[cc].rt_info = 0;
cc++;
- pub[sc].event.type_ = event.type_;
- pub[sc].event.source_ = event.source_;
- pub[sc].event.creation_time_ = ORBSVCS_Time::zero;
+ pub[sc].event.header.type = event.header.type;
+ pub[sc].event.header.source = event.header.source;
+ pub[sc].event.header.creation_time = ORBSVCS_Time::zero;
pub[sc].dependency_info.dependency_type =
RtecScheduler::TWO_WAY_CALL;
pub[sc].dependency_info.number_of_calls = 1;
@@ -3386,10 +3405,10 @@ dump_event (const RtecEventComm::Event &event)
ACE_DEBUG ((LM_DEBUG, "source_ = %d "
"type_ = %d "
"time_ = %u.\n",
- (void*)event.source_,
- event.type_,
+ (void*)event.header.source,
+ event.header.type,
// The divide-by-1 is for ACE_U_LongLong support.
- ORBSVCS_Time::to_hrtime (event.creation_time_) / 1));
+ ORBSVCS_Time::to_hrtime (event.header.creation_time) / 1));
}
// ************************************************************
diff --git a/TAO/orbsvcs/orbsvcs/Event/Event_Channel.h b/TAO/orbsvcs/orbsvcs/Event/Event_Channel.h
index a73b8e27a59..d84afcd5a3e 100644
--- a/TAO/orbsvcs/orbsvcs/Event/Event_Channel.h
+++ b/TAO/orbsvcs/orbsvcs/Event/Event_Channel.h
@@ -169,11 +169,12 @@ class TAO_Module_Factory;
// ec..
class TAO_ORBSVCS_Export ACE_EventChannel : public POA_RtecEventChannelAdmin::EventChannel
// = TITLE
-// ACE Event Channel.
+// TAO's Real-time Event Channel.
//
// = DESCRIPTION
-// Implementation of COSS Event Channel. For more detailed
-// information, see http://www.cs.wustl.edu/~mda/event.html.
+// This class implements the interface defined in
+// RtecEventChannelAdmin.idl. For more details check:
+// http://www.cs.wustl.edu/~coryan/EC/JSAC98.pdf
{
public:
enum { INITIAL_STATE = 0,
@@ -191,20 +192,6 @@ public:
virtual ~ACE_EventChannel (void);
// Calls destroy.
- // = Accessor methods to Event Channel objects. The Event Channel
- // acts as a sort of service repository of object references. All
- // objects in the Event Service come to this interface to obtain
- // object references during initialization.
-
- virtual RtecEventChannelAdmin::ConsumerAdmin_ptr for_consumers (CORBA::Environment &);
- // Consumer administration factory method.
-
- virtual RtecEventChannelAdmin::SupplierAdmin_ptr for_suppliers (CORBA::Environment &);
- // Supplier administration factory method.
-
- virtual void destroy (CORBA::Environment &);
- // Explicitly shut down the channel.
-
RtecEventChannelAdmin::EventChannel_ptr get_ref (CORBA::Environment &);
// Allow transformations to RtecEventChannelAdmin::EventChannel.
@@ -249,9 +236,31 @@ public:
void update_supplier_gwys (CORBA::Environment& _env);
// The consumer (or supplier) list has changed, thus the EC has to
// inform any gateways it has.
- // TODO: currently we only support consumer gateways.
ACE_Task_Manager* task_manager (void) const;
+ // Each Event Channel has its own Task_Manager to handle timers.
+
+ // = The RtecEventChannelAdmin::EventChannel methods.
+
+ virtual RtecEventChannelAdmin::ConsumerAdmin_ptr
+ for_consumers (CORBA::Environment &);
+ // In this implementation of the EC this returns the interface for
+ // the Consumer_Module.
+
+ virtual RtecEventChannelAdmin::SupplierAdmin_ptr
+ for_suppliers (CORBA::Environment &);
+ // Return an interface to the Supplier_Module.
+
+ virtual void destroy (CORBA::Environment &);
+ // Shutdown the EC, free all resources, stop all threads and then
+ // shutdown the server where the Servant is running.
+
+ virtual RtecEventChannelAdmin::Observer_Handle
+ append_observer (RtecEventChannelAdmin::Observer_ptr observer,
+ CORBA::Environment &env);
+ virtual void remove_observer (RtecEventChannelAdmin::Observer_Handle,
+ CORBA::Environment &env);
+ // The observer manipulators
private:
ACE_RTU_Manager *rtu_manager_;
diff --git a/TAO/orbsvcs/orbsvcs/Event/Event_Channel.i b/TAO/orbsvcs/orbsvcs/Event/Event_Channel.i
index 83b979ab94d..844831376e0 100644
--- a/TAO/orbsvcs/orbsvcs/Event/Event_Channel.i
+++ b/TAO/orbsvcs/orbsvcs/Event/Event_Channel.i
@@ -154,15 +154,15 @@ operator == (const RtecEventComm::Event &event1,
const RtecEventComm::Event &event2)
{
// Check if the sources are equal. 0 is a wildcard.
- if ((event1.source_ != 0)
- && (event2.source_ != 0)
- && (event1.source_ != event2.source_))
+ if ((event1.header.source != 0)
+ && (event2.header.source != 0)
+ && (event1.header.source != event2.header.source))
return 0;
// Check if the types are equal. ACE_ES_EVENT_ANY is a wildcard.
- if ((event1.type_ != ACE_ES_EVENT_ANY) &&
- (event2.type_ != ACE_ES_EVENT_ANY) &&
- (event1.type_ != event2.type_))
+ if ((event1.header.type != ACE_ES_EVENT_ANY) &&
+ (event2.header.type != ACE_ES_EVENT_ANY) &&
+ (event1.header.type != event2.header.type))
return 0;
return 1;
@@ -559,7 +559,7 @@ ACE_ES_Dependency_Iterator::parse (void)
if (rt_info_ == 0)
rt_info_ = rep_[x].rt_info;
- switch (rep_[x].event.type_)
+ switch (rep_[x].event.header.type)
{
case ACE_ES_CONJUNCTION_DESIGNATOR:
n_conjunctions_++;
diff --git a/TAO/orbsvcs/orbsvcs/Event_Utilities.cpp b/TAO/orbsvcs/orbsvcs/Event_Utilities.cpp
index d88c19c18cc..7e62bc12466 100644
--- a/TAO/orbsvcs/orbsvcs/Event_Utilities.cpp
+++ b/TAO/orbsvcs/orbsvcs/Event_Utilities.cpp
@@ -23,10 +23,9 @@ ACE_ConsumerQOS_Factory::start_conjunction_group (void)
{
int l = qos_.dependencies.length ();
qos_.dependencies.length (l + 1);
- qos_.dependencies[l].event.type_ = ACE_ES_CONJUNCTION_DESIGNATOR;
+ qos_.dependencies[l].event.header.type = ACE_ES_CONJUNCTION_DESIGNATOR;
qos_.dependencies[l].rt_info = 0;
- // TODO: qos_.dependencies[l].event.data_.lval (0);
- designator_set_ = 1;
+ this->designator_set_ = 1;
return 0;
}
@@ -35,10 +34,9 @@ ACE_ConsumerQOS_Factory::start_disjunction_group (void)
{
int l = qos_.dependencies.length ();
qos_.dependencies.length (l + 1);
- qos_.dependencies[l].event.type_ = ACE_ES_DISJUNCTION_DESIGNATOR;
+ qos_.dependencies[l].event.header.type = ACE_ES_DISJUNCTION_DESIGNATOR;
qos_.dependencies[l].rt_info = 0;
- // TODO: qos_.dependencies[l].event.data_.lval (0);
- designator_set_ = 1;
+ this->designator_set_ = 1;
return 0;
}
@@ -52,16 +50,14 @@ ACE_ConsumerQOS_Factory::insert (const RtecEventChannelAdmin::Dependency &subscr
int l = qos_.dependencies.length ();
qos_.dependencies.length (l + 1);
qos_.dependencies[l].rt_info = 0;
- qos_.dependencies[l].event.type_ = ACE_ES_GLOBAL_DESIGNATOR;
+ qos_.dependencies[l].event.header.type = ACE_ES_GLOBAL_DESIGNATOR;
- // TODO: IDL union qos_.dependencies[l].event.data_.lval (0);
this->designator_set_ = 1;
}
int l = qos_.dependencies.length ();
qos_.dependencies.length (l + 1);
qos_.dependencies[l] = subscribe;
- // TODO: IDL union qos_.dependencies[l].event.data_.lval (0);
return 0;
}
@@ -72,8 +68,8 @@ void event_debug (const char* p,
ACE_DEBUG ((LM_DEBUG,
"%*.*s - event.source: %d\n"
"%*.*s event.type: %d\n",
- l, l, p, event.source_,
- l, l, p, event.type_));
+ l, l, p, event.header.source,
+ l, l, p, event.header.type));
}
void
@@ -107,8 +103,8 @@ ACE_SupplierQOS_Factory::insert (RtecEventComm::EventSourceID sid,
{
int l = qos_.publications.length ();
qos_.publications.length (l + 1);
- qos_.publications[l].event.source_ = sid;
- qos_.publications[l].event.type_ = type;
+ qos_.publications[l].event.header.source = sid;
+ qos_.publications[l].event.header.type = type;
// TODO: IDL union qos_.publications[l].event.data_.lval (0);
qos_.publications[l].dependency_info.rt_info = rt_info;
qos_.publications[l].dependency_info.number_of_calls = ncalls;
diff --git a/TAO/orbsvcs/orbsvcs/Event_Utilities.i b/TAO/orbsvcs/orbsvcs/Event_Utilities.i
index 58423e9af27..53910d1ac1c 100644
--- a/TAO/orbsvcs/orbsvcs/Event_Utilities.i
+++ b/TAO/orbsvcs/orbsvcs/Event_Utilities.i
@@ -8,11 +8,11 @@ ACE_ConsumerQOS_Factory::insert (RtecEventComm::EventSourceID source,
RtecScheduler::handle_t rt_info)
{
RtecEventChannelAdmin::Dependency dependency;
- dependency.event.source_ = source;
- dependency.event.type_ = type;
- //dependency.event.creation_time_ = 0;
- //dependency.event.ec_recv_time_ = 0;
- //dependency.event.ec_send_time_ = 0;
+ dependency.event.header.source = source;
+ dependency.event.header.type = type;
+ //dependency.event.header.creation_time = 0;
+ //dependency.event.header.ec_recv_time = 0;
+ //dependency.event.header.ec_send_time = 0;
dependency.rt_info = rt_info;
return this->insert (dependency);
}
@@ -22,11 +22,11 @@ ACE_ConsumerQOS_Factory::insert_type (RtecEventComm::EventType type,
RtecScheduler::handle_t rt_info)
{
RtecEventChannelAdmin::Dependency dependency;
- dependency.event.source_ = 0;
- dependency.event.type_ = type;
- //dependency.event.creation_time_ = 0;
- //dependency.event.ec_recv_time_ = 0;
- //dependency.event.ec_send_time_ = 0;
+ dependency.event.header.source = 0;
+ dependency.event.header.type = type;
+ //dependency.event.header.creation_time = 0;
+ //dependency.event.header.ec_recv_time = 0;
+ //dependency.event.header.ec_send_time = 0;
dependency.rt_info = rt_info;
return this->insert (dependency);
}
@@ -36,11 +36,11 @@ ACE_ConsumerQOS_Factory::insert_source (RtecEventComm::EventSourceID source,
RtecScheduler::handle_t rt_info)
{
RtecEventChannelAdmin::Dependency dependency;
- dependency.event.source_ = source;
- dependency.event.type_ = ACE_ES_EVENT_ANY;
- //dependency.event.creation_time_ = 0;
- //dependency.event.ec_recv_time_ = 0;
- //dependency.event.ec_send_time_ = 0;
+ dependency.event.header.source = source;
+ dependency.event.header.type = ACE_ES_EVENT_ANY;
+ //dependency.event.header.creation_time = 0;
+ //dependency.event.header.ec_recv_time = 0;
+ //dependency.event.header.ec_send_time = 0;
dependency.rt_info = rt_info;
return this->insert (dependency);
}
@@ -51,11 +51,11 @@ ACE_ConsumerQOS_Factory::insert_time (RtecEventComm::EventType type,
RtecScheduler::handle_t rt_info)
{
RtecEventChannelAdmin::Dependency dependency;
- dependency.event.source_ = 0;
- dependency.event.type_ = type;
- dependency.event.creation_time_ = interval;
- //dependency.event.ec_recv_time_ = 0;
- //dependency.event.ec_send_time_ = 0;
+ dependency.event.header.source = 0;
+ dependency.event.header.type = type;
+ dependency.event.header.creation_time = interval;
+ //dependency.event.header.ec_recv_time = 0;
+ //dependency.event.header.ec_send_time = 0;
dependency.rt_info = rt_info;
return this->insert (dependency);
}
@@ -64,12 +64,12 @@ ACE_INLINE int
ACE_ConsumerQOS_Factory::insert_act (RtecEventComm::EventData act)
{
RtecEventChannelAdmin::Dependency dependency;
- dependency.event.source_ = 0;
- dependency.event.type_ = ACE_ES_EVENT_ACT;
- //dependency.event.creation_time_ = 0;
- //dependency.event.ec_recv_time_ = 0;
- //dependency.event.ec_send_time_ = 0;
- dependency.event.data_ = act;
+ dependency.event.header.source = 0;
+ dependency.event.header.type = ACE_ES_EVENT_ACT;
+ //dependency.event.header.creation_time = 0;
+ //dependency.event.header.ec_recv_time = 0;
+ //dependency.event.header.ec_send_time = 0;
+ dependency.event.data = act;
return this->insert (dependency);
}
diff --git a/TAO/orbsvcs/orbsvcs/Makefile b/TAO/orbsvcs/orbsvcs/Makefile
index 7ec933dfaef..d1edaf72a6b 100644
--- a/TAO/orbsvcs/orbsvcs/Makefile
+++ b/TAO/orbsvcs/orbsvcs/Makefile
@@ -37,6 +37,7 @@ IDL_SRCS= \
RtecEventComm \
RtecScheduler \
RtecEventChannelAdmin \
+ RtecUDPAdmin \
LifeCycleService \
CosTrading \
AVStreams \
@@ -69,6 +70,7 @@ FILES= $(IDL_FILES) \
Event/Task_Manager \
Event/EC_Gateway \
Event/EC_Gateway_UDP \
+ Event/EC_UDP_Admin \
Event/Module_Factory \
\
Sched/Config_Scheduler \
@@ -138,6 +140,7 @@ IDL_EXT=C.h C.i C.cpp S.h S.i S.cpp S_T.h S_T.i S_T.cpp
#
# Extra dependencies not catched by make depend.
#
+$(foreach ext, $(IDL_EXT), RtecUDPAdmin(ext)): RtecEventComm.idl
$(foreach ext, $(IDL_EXT), RtecScheduler$(ext)): CosTimeBase.idl
$(foreach ext, $(IDL_EXT), RtecEventComm$(ext)): CosTimeBase.idl
$(foreach ext, $(IDL_EXT), RtecEventChannelAdmin$(ext)): CosTimeBase.idl
diff --git a/TAO/orbsvcs/orbsvcs/RtecEventChannelAdmin.idl b/TAO/orbsvcs/orbsvcs/RtecEventChannelAdmin.idl
index 081c48ef498..a47ffc32cce 100644
--- a/TAO/orbsvcs/orbsvcs/RtecEventChannelAdmin.idl
+++ b/TAO/orbsvcs/orbsvcs/RtecEventChannelAdmin.idl
@@ -31,30 +31,109 @@ module RtecEventChannelAdmin {
boolean is_gateway;
};
- interface ProxyPushConsumer: RtecEventComm::PushConsumer {
- void connect_push_supplier(
- in RtecEventComm::PushSupplier push_supplier,
- in SupplierQOS qos) raises(AlreadyConnected);
- };
-
interface ProxyPushSupplier: RtecEventComm::PushSupplier {
+ // = TITLE
+ // The Proxy Supplier
+ //
+ // = DESCRIPTION
+ // Consumers receive their events from objects of this type. See
+ // the interfaces below to see how to gain access to an object
+ // reference of this type.
+
void connect_push_consumer(
in RtecEventComm::PushConsumer push_consumer,
- in ConsumerQOS qos) raises(AlreadyConnected, TypeError);
+ in ConsumerQOS qos)
+ raises(AlreadyConnected, TypeError);
+ // Before receiving any events the consumer must provide its
+ // publication list and QoS information to the Event Channel
+ // through this method.
+
void suspend_connection ();
+ // Temporarly suspend reception of events from the Event
+ // Channel. Calling this method is more efficient than dropping
+ // them on the receiving end and less expensive than disconnecting
+ // and connecting again (but it is not free!!)
+
void resume_connection ();
+ // Resume the reception of events.
+ };
+
+ interface ProxyPushConsumer: RtecEventComm::PushConsumer {
+ // = TITLE
+ // The Proxy Consumer
+ //
+ // = DESCRIPTION
+ // Suppliers push their events to objects of this type. See the
+ // interfaces below to see how to gain access to an object
+ // reference of this type.
+
+ void connect_push_supplier(
+ in RtecEventComm::PushSupplier push_supplier,
+ in SupplierQOS qos)
+ raises(AlreadyConnected);
+ // Before pushing events the supplier must provide its publication
+ // list and QoS information to the Event Channel through this
+ // method.
};
- // TODO: Find out the exception specs for the following interface's
+ // @@ TODO: Find out the exception specs for the following interface's
// methods.
interface ConsumerAdmin {
+ // = TITLE
+ // The Supplier factory
+ //
+ // = DESCRIPTION
+ // Consumers use this interface to create suppliers they can
+ // connect to.
+
ProxyPushSupplier obtain_push_supplier();
+ // Obtain a supplier
};
interface SupplierAdmin {
+ // = TITLE
+ // The Consumer factory
+ //
+ // = DESCRIPTION
+ // Suppliers use this interface to create consumers they can
+ // connect to.
+
ProxyPushConsumer obtain_push_consumer();
+ // Obtain a consumer
};
+ interface Observer {
+ // = TITLE
+ // Observes any changes in the consumer or supplier sets for an
+ // Event Channel
+ //
+ // = DESCRIPTION
+ // This object receives updates from Event Channels with any
+ // changes on set of consumer and or suppliers registered with
+ // the Event Channel.
+
+ void update_consumer (in ConsumerQOS sub);
+ // A change in the list of consumers has ocurred. The disjunction
+ // of the subscriptions (and its equivalent form ) is
+ // passed to the observer.
+
+ void update_supplier (in SupplierQOS pub);
+ // A change in the list of consumers has ocurred. The disjunction
+ // of the publications (and its equivalent form for suppliers).
+ };
+
+ typedef unsigned long long Observer_Handle;
+ // This is used as an opaque ID to control the addition and removal
+ // of handles from an event channel.
+
interface EventChannel {
+ // = TITLE
+ // The Event Channel class
+ //
+ // = DESCRIPTION
+ // This class provides the main entry point for the Event
+ // Channel. The class follows a protocol similar to the
+ // COS Event Service as described in the CORBAservices spec.
+ //
exception SYNCHRONIZATION_ERROR {};
exception QOS_ERROR {};
exception SUBSCRIPTION_ERROR {};
@@ -62,11 +141,24 @@ module RtecEventChannelAdmin {
exception DISPATCH_ERROR {};
ConsumerAdmin for_consumers();
+ // Consumers call this method to gain access to the
+ // ProxyPushSupplier factory.
+
SupplierAdmin for_suppliers();
+ // Suppliers call this method to gain access to the
+ // ProxyPushConsumer factory.
void destroy ();
- };
-
-};
+ // This method shutdown the Event Channel, destroy any resources
+ // for it and actually shutdown the server where the Event Channel
+ // is running.
+ Observer_Handle append_observer (in Observer gw);
+ // Add a gateway to the Event Channel, the handle returned must be
+ // used to remove the gateway from the ORB.
+ void remove_observer (in Observer_Handle gw);
+ // Remove the observer.
+ // @@ TODO: We should raise something if the handle is invalid.
+ };
+};
diff --git a/TAO/orbsvcs/orbsvcs/RtecEventComm.idl b/TAO/orbsvcs/orbsvcs/RtecEventComm.idl
index 25f39626f52..3a939249656 100644
--- a/TAO/orbsvcs/orbsvcs/RtecEventComm.idl
+++ b/TAO/orbsvcs/orbsvcs/RtecEventComm.idl
@@ -18,14 +18,6 @@ module RtecEventComm {
// Users willing to implement their own marshalling may use a
// sequence of octet.
-#if 0
- union EventData switch(long) {
- case 1: double dval;
- case 2: string sval;
- case 3: sequence<octet> bval;
- default: long lval;
- };
-#else
typedef sequence<octet> EventPayload;
struct EventData {
long x;
@@ -44,45 +36,54 @@ module RtecEventComm {
any any_value;
#endif /* TAO_LACKS_EVENT_CHANNEL_ANY */
};
-#endif
typedef TimeBase::TimeT Time;
typedef long EventSourceID;
typedef long EventType;
- struct Event
+ struct EventHeader
{
// = TITLE
- // The Event structure.
+ // The Event Header
//
// = DESCRIPTION
- // Events are represented by this structure, it is simply a
- // header,data pair.
- //
-
- EventType type_;
+ // Each event carries some information to do filtering,
+ // correlation, etc.
+ EventType type;
// The event type.
// This may be different from the discriminator in the EventData
// union above, the motivation is to allow filtering by data
// contents: different event types are assigned to different data
// contents though they use the same discriminator.
- EventSourceID source_;
+ EventSourceID source;
// Some way to identify the supplier.
- long ttl_;
+ long ttl;
// The "Time To Live" count, each time an EC process the event it
// decreases the TTL field, when it gets to zero the message is no
// longer forwarded.
- Time creation_time_;
- Time ec_recv_time_;
- Time ec_send_time_;
+ Time creation_time;
+ Time ec_recv_time;
+ Time ec_send_time;
// Some timestamps, they actually belong in the payload, for some
// kind of measument event.
+ };
+
+ struct Event
+ {
+ // = TITLE
+ // The Event structure.
+ //
+ // = DESCRIPTION
+ // Events are represented by this structure, it is simply a
+ // header,data pair.
+ //
+ EventHeader header;
- EventData data_;
+ EventData data;
// The event payload.
};
typedef sequence<Event> EventSet;
diff --git a/TAO/orbsvcs/orbsvcs/RtecUDPAdmin.idl b/TAO/orbsvcs/orbsvcs/RtecUDPAdmin.idl
new file mode 100644
index 00000000000..6f5d043d5c5
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/RtecUDPAdmin.idl
@@ -0,0 +1,28 @@
+//
+// $Id$
+//
+#include "RtecEventComm.idl"
+
+module RtecUDPAdmin {
+ // = TITLE
+ // Multicast Administration module
+ //
+ // = DESCRIPTION
+ // When the EC is used as an interface to multicast communication
+ // a mapping between event types and multicast addresses must be
+ // stablished.
+
+ struct UDP_Addr {
+ unsigned long ipaddr;
+ unsigned short port;
+ };
+
+ interface AddrServer {
+ void get_addr (in RtecEventComm::EventHeader header,
+ out UDP_Addr addr);
+ // Get the addr and port given the event header.
+ };
+
+};
+
+
diff --git a/TAO/orbsvcs/tests/EC_Custom_Marshal/ECM_Consumer.cpp b/TAO/orbsvcs/tests/EC_Custom_Marshal/ECM_Consumer.cpp
index e2d60d79670..7119810e965 100644
--- a/TAO/orbsvcs/tests/EC_Custom_Marshal/ECM_Consumer.cpp
+++ b/TAO/orbsvcs/tests/EC_Custom_Marshal/ECM_Consumer.cpp
@@ -214,7 +214,7 @@ Driver::push_consumer (void* consumer_cookie,
{
const RtecEventComm::Event& e = events[i];
- if (e.data_.payload.mb () == 0)
+ if (e.data.payload.mb () == 0)
{
ACE_DEBUG ((LM_DEBUG, "No data in event[%d]\n", i));
continue;
@@ -228,10 +228,10 @@ Driver::push_consumer (void* consumer_cookie,
// already!)?
// Note that there is no copying
- int byte_order = e.data_.payload[0];
+ int byte_order = e.data.payload[0];
ACE_Message_Block* mb =
- ACE_Message_Block::duplicate (e.data_.payload.mb ());
+ ACE_Message_Block::duplicate (e.data.payload.mb ());
mb->rd_ptr (1); // skip the byte order
TAO_InputCDR cdr (mb, byte_order);
diff --git a/TAO/orbsvcs/tests/EC_Custom_Marshal/ECM_Supplier.cpp b/TAO/orbsvcs/tests/EC_Custom_Marshal/ECM_Supplier.cpp
index 2c1f21924df..1dfd8b4bd6d 100644
--- a/TAO/orbsvcs/tests/EC_Custom_Marshal/ECM_Supplier.cpp
+++ b/TAO/orbsvcs/tests/EC_Custom_Marshal/ECM_Supplier.cpp
@@ -246,27 +246,27 @@ ECMS_Driver::supplier_task (Test_Supplier *supplier,
{
RtecEventComm::EventSet event (1);
event.length (1);
- event[0].source_ = supplier->supplier_id ();
- event[0].ttl_ = 1;
+ event[0].header.source = supplier->supplier_id ();
+ event[0].header.ttl = 1;
ACE_hrtime_t t = ACE_OS::gethrtime ();
- ORBSVCS_Time::hrtime_to_TimeT (event[0].creation_time_, t);
- event[0].ec_recv_time_ = ORBSVCS_Time::zero;
- event[0].ec_send_time_ = ORBSVCS_Time::zero;
+ ORBSVCS_Time::hrtime_to_TimeT (event[0].header.creation_time, t);
+ event[0].header.ec_recv_time = ORBSVCS_Time::zero;
+ event[0].header.ec_send_time = ORBSVCS_Time::zero;
if (i == ACE_static_cast (CORBA::Long, this->event_count_) - 1)
- event[0].type_ = ACE_ES_EVENT_SHUTDOWN;
+ event[0].header.type = ACE_ES_EVENT_SHUTDOWN;
else if (i % 2 == 0)
- event[0].type_ = this->event_a_;
+ event[0].header.type = this->event_a_;
else
- event[0].type_ = this->event_b_;
+ event[0].header.type = this->event_b_;
- event[0].data_.x = 0;
- event[0].data_.y = 0;
+ event[0].data.x = 0;
+ event[0].data.y = 0;
// We use replace to minimize the copies, this should result
// in just one memory allocation;
- event[0].data_.payload.replace (mblen, mb);
+ event[0].data.payload.replace (mblen, mb);
supplier->consumer_proxy ()->push(event, TAO_TRY_ENV);
TAO_CHECK_ENV;
diff --git a/TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.cpp b/TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.cpp
index ab30bcbbfc5..160e8dcb7c3 100644
--- a/TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.cpp
+++ b/TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.cpp
@@ -3,6 +3,7 @@
#include "ace/Get_Opt.h"
#include "ace/Auto_Ptr.h"
#include "ace/Sched_Params.h"
+#include "ace/Read_Buffer.h"
#include "orbsvcs/Event_Utilities.h"
#include "orbsvcs/Event_Service_Constants.h"
@@ -14,48 +15,34 @@
#include "orbsvcs/Event/Event_Channel.h"
#include "EC_Mcast.h"
-ACE_RCSID(EC_Mcast, EC_Mcast, "$Id$")
+#if !defined (__ACE_INLINE__)
+#include "EC_Mcast.i"
+#endif /* __ACE_INLINE__ */
-#define ECM_DEFAULT_SEND_MCAST_GROUP "224.9.9.1"
-#define ECM_DEFAULT_RECV_MCAST_GROUP "224.9.9.2"
+ACE_RCSID(EC_Mcast, EC_Mcast, "$Id$")
ECM_Driver::ECM_Driver (void)
: lcl_name_ ("ECM"),
- short_circuit_ (0),
- n_suppliers_ (1),
- n_consumers_ (1),
- workload_ (10),
event_period_ (25000),
- event_count_ (200),
- s_event_a_ (ACE_ES_EVENT_UNDEFINED),
- s_event_b_ (ACE_ES_EVENT_UNDEFINED + 1),
- c_event_a_ (ACE_ES_EVENT_UNDEFINED),
- c_event_b_ (ACE_ES_EVENT_UNDEFINED + 1),
- r_event_a_ (ACE_ES_EVENT_UNDEFINED),
- r_event_b_ (ACE_ES_EVENT_UNDEFINED + 1),
- schedule_file_ (0),
- pid_file_name_ (0),
- send_mcast_group_ (u_short(23000), ECM_DEFAULT_SEND_MCAST_GROUP),
- recv_mcast_group_ (u_short(23001), ECM_DEFAULT_RECV_MCAST_GROUP),
- mcast_eh_ (&receiver_),
- ready_ (0),
- ready_cnd_ (ready_mtx_)
+ event_count_ (100),
+ config_filename_ (0),
+ pid_filename_ (0),
+ local_federations_count_ (0),
+ all_federations_count_ (0)
{
}
-
-
int
ECM_Driver::run (int argc, char* argv[])
{
TAO_TRY
{
- CORBA::ORB_var orb =
+ this->orb_ =
CORBA::ORB_init (argc, argv, "", TAO_TRY_ENV);
TAO_CHECK_ENV;
CORBA::Object_var poa_object =
- orb->resolve_initial_references("RootPOA");
+ this->orb_->resolve_initial_references("RootPOA");
if (CORBA::is_nil (poa_object.in ()))
ACE_ERROR_RETURN ((LM_ERROR,
" (%P|%t) Unable to initialize the POA.\n"),
@@ -72,44 +59,62 @@ ECM_Driver::run (int argc, char* argv[])
if (this->parse_args (argc, argv))
return 1;
+ if (this->parse_config_file ())
+ return 1;
+
ACE_DEBUG ((LM_DEBUG,
"Execution parameters:\n"
" lcl name = <%s>\n"
- " short circuit = <%d>\n"
- " suppliers = <%d>\n"
- " consumers = <%d>\n"
- " workload = <%d> (iterations)\n"
" event period = <%d> (usecs)\n"
" event count = <%d>\n"
- " supplier Event A = <%d>\n"
- " supplier Event B = <%d>\n"
- " consumer Event A = <%d>\n"
- " consumer Event B = <%d>\n"
- " remote Event A = <%d>\n"
- " remote Event B = <%d>\n"
- " schedule_file = <%s>\n"
+ " config file name = <%s>\n"
" pid file name = <%s>\n",
this->lcl_name_?this->lcl_name_:"nil",
- this->short_circuit_,
- this->n_suppliers_,
- this->n_consumers_,
- this->workload_,
this->event_period_,
this->event_count_,
- this->s_event_a_,
- this->s_event_b_,
- this->c_event_a_,
- this->c_event_b_,
- this->r_event_a_,
- this->r_event_b_,
-
- this->schedule_file_?this->schedule_file_:"nil",
- this->pid_file_name_?this->pid_file_name_:"nil") );
- if (this->pid_file_name_ != 0)
+ this->config_filename_?this->config_filename_:"nil",
+ this->pid_filename_?this->pid_filename_:"nil") );
+
+ int i;
+ for (i = 0; i < this->local_federations_count_; ++i)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ " name = <%s>\n"
+ " port = <%d>\n"
+ " supplier types:\n",
+ this->local_federations_[i]->name ()?this->local_federations_[i]->name ():"nil",
+ this->local_federations_[i]->mcast_port ()));
+ int j;
+ for (j = 0;
+ j < this->local_federations_[i]->supplier_types ();
+ ++j)
+ {
+
+ ACE_DEBUG ((LM_DEBUG,
+ " name = <%s>\n"
+ " ipadd = <%x>\n",
+ this->local_federations_[i]->supplier_name (j),
+ this->local_federations_[i]->supplier_ipaddr (j)));
+ }
+ ACE_DEBUG ((LM_DEBUG,
+ " consumer types:\n"));
+ for (j = 0;
+ j < this->local_federations_[i]->consumer_types ();
+ ++j)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ " name = <%s>\n"
+ " ipadd = <%x>\n",
+ this->local_federations_[i]->consumer_name (j),
+ this->local_federations_[i]->consumer_ipaddr (j)));
+ }
+ }
+
+ if (this->pid_filename_ != 0)
{
- FILE* pid = ACE_OS::fopen (this->pid_file_name_, "w");
+ FILE* pid = ACE_OS::fopen (this->pid_filename_, "w");
if (pid != 0)
{
ACE_OS::fprintf (pid, "%d\n", ACE_OS::getpid ());
@@ -117,6 +122,7 @@ ECM_Driver::run (int argc, char* argv[])
}
}
+#if 0
int min_priority =
ACE_Sched_Params::priority_min (ACE_SCHED_FIFO);
// Enable FIFO scheduling, e.g., RT scheduling class on Solaris.
@@ -136,11 +142,12 @@ ECM_Driver::run (int argc, char* argv[])
if (ACE_OS::thr_setprio (min_priority) == -1)
{
- ACE_ERROR ((LM_ERROR, "(%P|%t) main thr_setprio failed\n"));
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) main thr_setprio failed\n"));
}
+#endif /* 0 */
CORBA::Object_var naming_obj =
- orb->resolve_initial_references ("NameService");
+ this->orb_->resolve_initial_references ("NameService");
if (CORBA::is_nil (naming_obj.in ()))
ACE_ERROR_RETURN ((LM_ERROR,
" (%P|%t) Unable to get the Naming Service.\n"),
@@ -161,7 +168,7 @@ ECM_Driver::run (int argc, char* argv[])
char buf[bufsize];
CORBA::String_var str =
- orb->object_to_string (scheduler.in (), TAO_TRY_ENV);
+ this->orb_->object_to_string (scheduler.in (), TAO_TRY_ENV);
TAO_CHECK_ENV;
ACE_DEBUG ((LM_DEBUG, "The (local) scheduler IOR is <%s>\n",
str.in ()));
@@ -189,7 +196,7 @@ ECM_Driver::run (int argc, char* argv[])
ec_impl._this (TAO_TRY_ENV);
TAO_CHECK_ENV;
- str = orb->object_to_string (ec.in (), TAO_TRY_ENV);
+ str = this->orb_->object_to_string (ec.in (), TAO_TRY_ENV);
TAO_CHECK_ENV;
ACE_DEBUG ((LM_DEBUG, "The (local) EC IOR is <%s>\n", str.in ()));
@@ -218,33 +225,33 @@ ECM_Driver::run (int argc, char* argv[])
ACE_DEBUG ((LM_DEBUG, "located local EC\n"));
- this->connect_ecg (local_ec.in (), TAO_TRY_ENV);
+ this->open_federations (local_ec.in (),
+ scheduler.in (),
+ TAO_TRY_ENV);
TAO_CHECK_ENV;
- this->connect_suppliers (local_ec.in (), TAO_TRY_ENV);
- TAO_CHECK_ENV;
-
- ACE_DEBUG ((LM_DEBUG, "connected supplier\n"));
+ ACE_DEBUG ((LM_DEBUG, "EC_Mcast: open_federations done\n"));
- this->connect_consumers (local_ec.in (), TAO_TRY_ENV);
+ this->open_senders (local_ec.in (),
+ scheduler.in (),
+ TAO_TRY_ENV);
TAO_CHECK_ENV;
- ACE_DEBUG ((LM_DEBUG, "connected consumer\n"));
+ ACE_DEBUG ((LM_DEBUG, "EC_Mcast: open_senders done\n"));
- this->activate_suppliers (local_ec.in (), TAO_TRY_ENV);
+ this->open_receivers (local_ec.in (),
+ scheduler.in (),
+ TAO_TRY_ENV);
TAO_CHECK_ENV;
- ACE_DEBUG ((LM_DEBUG, "suppliers are active\n"));
+ ACE_DEBUG ((LM_DEBUG, "EC_Mcast: open_receivers done\n"));
- this->running_suppliers_ = this->n_suppliers_;
+ this->activate_federations (local_ec.in (),
+ scheduler.in (),
+ TAO_TRY_ENV);
+ TAO_CHECK_ENV;
- // Acquire the mutex for the ready mutex, blocking any supplier
- // that may start after this point.
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ready_mon, this->ready_mtx_, 1);
- this->ready_ = 1;
- this->test_start_ = ACE_OS::gethrtime ();
- ready_mon.release ();
- this->ready_cnd_.broadcast ();
+ ACE_DEBUG ((LM_DEBUG, "EC_Mcast: activate_federations done\n"));
ACE_DEBUG ((LM_DEBUG, "activate the EC\n"));
@@ -252,34 +259,25 @@ ECM_Driver::run (int argc, char* argv[])
ec_impl.activate ();
ACE_DEBUG ((LM_DEBUG, "running the test\n"));
- if (orb->run () == -1)
+ if (this->orb_->run () == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "orb->run"), -1);
- this->test_stop_ = ACE_OS::gethrtime ();
-
ACE_DEBUG ((LM_DEBUG, "shutdown the EC\n"));
ec_impl.shutdown ();
this->dump_results ();
- this->receiver_.close (TAO_TRY_ENV);
- TAO_CHECK_ENV;
- this->receiver_.shutdown (TAO_TRY_ENV);
- TAO_CHECK_ENV;
-
- this->sender_.close (TAO_TRY_ENV);
+ this->close_receivers (TAO_TRY_ENV);
TAO_CHECK_ENV;
- this->sender_.shutdown (TAO_TRY_ENV);
+ this->close_senders (TAO_TRY_ENV);
TAO_CHECK_ENV;
- this->disconnect_consumers (TAO_TRY_ENV);
- TAO_CHECK_ENV;
- this->disconnect_suppliers (TAO_TRY_ENV);
+ this->close_federations (TAO_TRY_ENV);
TAO_CHECK_ENV;
ACE_DEBUG ((LM_DEBUG, "shutdown grace period\n"));
tv.set (5, 0);
- if (orb->run (&tv) == -1)
+ if (this->orb_->run (&tv) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "orb->run"), -1);
naming_context->unbind (schedule_name, TAO_TRY_ENV);
@@ -287,45 +285,6 @@ ECM_Driver::run (int argc, char* argv[])
naming_context->unbind (channel_name, TAO_TRY_ENV);
TAO_CHECK_ENV;
-
- if (this->schedule_file_ != 0)
- {
- RtecScheduler::RT_Info_Set_var infos;
- RtecScheduler::Config_Info_Set_var configs;
-
-#if defined (__SUNPRO_CC)
- // Sun C++ 4.2 warns with the code below:
- // Warning (Anachronism): Temporary used for non-const
- // reference, now obsolete.
- // Note: Type "CC -migration" for more on anachronisms.
- // Warning (Anachronism): The copy constructor for argument
- // infos of type RtecScheduler::RT_Info_Set_out should take
- // const RtecScheduler::RT_Info_Set_out&.
- // But, this code is not CORBA conformant, because users should
- // not define instances of _out types.
-
- RtecScheduler::RT_Info_Set_out infos_out (infos);
- RtecScheduler::Config_Info_Set_out configs_out (configs);
- ACE_Scheduler_Factory::server ()->compute_scheduling
- (ACE_Sched_Params::priority_min (ACE_SCHED_FIFO,
- ACE_SCOPE_THREAD),
- ACE_Sched_Params::priority_max (ACE_SCHED_FIFO,
- ACE_SCOPE_THREAD),
- infos_out, configs_out, TAO_TRY_ENV);
-#else /* ! __SUNPRO_CC */
- ACE_Scheduler_Factory::server ()->compute_scheduling
- (ACE_Sched_Params::priority_min (ACE_SCHED_FIFO,
- ACE_SCOPE_THREAD),
- ACE_Sched_Params::priority_max (ACE_SCHED_FIFO,
- ACE_SCOPE_THREAD),
- infos.out (), configs.out (), TAO_TRY_ENV);
-#endif /* ! __SUNPRO_CC */
-
- TAO_CHECK_ENV;
- ACE_Scheduler_Factory::dump_schedule (infos.in (),
- configs.in (),
- this->schedule_file_);
- }
}
TAO_CATCH (CORBA::SystemException, sys_ex)
{
@@ -340,510 +299,177 @@ ECM_Driver::run (int argc, char* argv[])
}
void
-ECM_Driver::disconnect_suppliers (CORBA::Environment &_env)
+ECM_Driver::federation_has_shutdown (ECM_Local_Federation *federation,
+ CORBA::Environment &)
{
- for (int i = 0; i < this->n_suppliers_; ++i)
- {
- this->suppliers_[i]->close (_env);
- if (_env.exception () != 0) return;
- }
+ ACE_DEBUG ((LM_DEBUG, "Federation <%s> shuting down\n",
+ federation->name ()));
+ this->federations_running_--;
+ if (this->federations_running_ <= 0)
+ this->orb_->shutdown ();
}
void
-ECM_Driver::connect_suppliers (RtecEventChannelAdmin::EventChannel_ptr local_ec,
- CORBA::Environment &_env)
+ECM_Driver::open_federations (RtecEventChannelAdmin::EventChannel_ptr ec,
+ RtecScheduler::Scheduler_ptr scheduler,
+ CORBA::Environment &_env)
{
- TAO_TRY
+ for (int i = 0; i < this->local_federations_count_; ++i)
{
- for (int i = 0; i < this->n_suppliers_; ++i)
- {
- // Limit the number of events sent by each supplier
- int mc = this->event_count_ / this->n_suppliers_;
- if (mc == 0)
- mc = 1;
-
- char buf[BUFSIZ];
- ACE_OS::sprintf (buf, "supplier_%02.2d@%s", i, this->lcl_name_);
-
- ACE_NEW (this->suppliers_[i],
- ECM_Supplier (this, this->suppliers_ + i));
-
- this->suppliers_[i]->open (buf,
- this->s_event_a_,
- this->s_event_b_,
- mc,
- this->event_period_ * 10,
- local_ec,
- TAO_TRY_ENV);
- TAO_CHECK_ENV;
- }
-
- }
- TAO_CATCHANY
- {
- TAO_RETHROW;
+ this->local_federations_[i]->open (this->event_count_,
+ this->event_period_,
+ ec, scheduler, _env);
+ TAO_CHECK_ENV_RETURN_VOID (_env);
}
- TAO_ENDTRY;
}
void
-ECM_Driver::disconnect_consumers (CORBA::Environment &_env)
+ECM_Driver::activate_federations (RtecEventChannelAdmin::EventChannel_ptr ec,
+ RtecScheduler::Scheduler_ptr scheduler,
+ CORBA::Environment &_env)
{
- for (int i = 0; i < this->n_consumers_; ++i)
+ this->federations_running_ = this->local_federations_count_;
+ for (int i = 0; i < this->local_federations_count_; ++i)
{
- this->consumers_[i]->close (_env);
- if (_env.exception () != 0) return;
+ this->local_federations_[i]->activate (this->event_period_,
+ ec, scheduler, _env);
+ TAO_CHECK_ENV_RETURN_VOID (_env);
}
}
void
-ECM_Driver::activate_suppliers (RtecEventChannelAdmin::EventChannel_ptr local_ec,
- CORBA::Environment &_env)
+ECM_Driver::close_federations (CORBA::Environment &_env)
{
- TAO_TRY
+ for (int i = 0; i < this->local_federations_count_; ++i)
{
- for (int i = 0; i < this->n_suppliers_; ++i)
- {
- // Limit the number of events sent by each supplier
- int mc = this->event_count_ / this->n_suppliers_;
- if (mc == 0)
- mc = 1;
-
- char buf[BUFSIZ];
- ACE_OS::sprintf (buf, "supplier_%02.2d@%s", i, this->lcl_name_);
-
- this->suppliers_[i]->activate (buf,
- this->event_period_ * 10,
- local_ec,
- TAO_TRY_ENV);
- TAO_CHECK_ENV;
- }
-
+ this->local_federations_[i]->close (_env);
+ TAO_CHECK_ENV_RETURN_VOID (_env);
}
- TAO_CATCHANY
- {
- TAO_RETHROW;
- }
- TAO_ENDTRY;
}
void
-ECM_Driver::connect_consumers (RtecEventChannelAdmin::EventChannel_ptr local_ec,
- CORBA::Environment &_env)
+ECM_Driver::open_senders (RtecEventChannelAdmin::EventChannel_ptr ec,
+ RtecScheduler::Scheduler_ptr scheduler,
+ CORBA::Environment &_env)
{
- TAO_TRY
+ if (this->send_dgram_.open (ACE_Addr::sap_any) == -1)
{
- for (int i = 0; i < this->n_consumers_; ++i)
- {
- char buf[BUFSIZ];
- ACE_OS::sprintf (buf, "consumer_%02.2d@%s", i, this->lcl_name_);
-
- ACE_NEW (this->consumers_[i],
- ECM_Consumer (this, this->consumers_ + i));
-
- this->consumers_[i]->open (buf,
- this->c_event_a_,
- this->c_event_b_,
- local_ec,
- TAO_TRY_ENV);
- TAO_CHECK_ENV;
- this->stats_[i].total_time_ = 0;
- this->stats_[i].lcl_count_ = 0;
- this->stats_[i].rmt_count_ = 0;
- }
-
- this->running_consumers_ = this->n_consumers_;
+ // @@ TODO throw an application specific exception.
+ _env.exception (new CORBA::COMM_FAILURE (CORBA::COMPLETED_NO));
+ return;
}
- TAO_CATCHANY
+ for (int i = 0; i < this->all_federations_count_; ++i)
{
- TAO_RETHROW;
+ this->all_federations_[i]->open (&this->send_dgram_,
+ ec,
+ scheduler,
+ _env);
+ TAO_CHECK_ENV_RETURN_VOID (_env);
}
- TAO_ENDTRY;
}
void
-ECM_Driver::connect_ecg (RtecEventChannelAdmin::EventChannel_ptr local_ec,
- CORBA::Environment &_env)
+ECM_Driver::close_senders (CORBA::Environment &_env)
{
- TAO_TRY
+ for (int i = 0; i < this->all_federations_count_; ++i)
{
- RtecScheduler::Scheduler_ptr local_sch =
- ACE_Scheduler_Factory::server ();
-
- // We could use the same name on the local and remote scheduler,
- // but that fails when using a global scheduler.
- const int bufsize = 512;
-
- char mcast_name[bufsize];
- ACE_OS::strcpy (mcast_name, "sender");
- ACE_OS::strcat (mcast_name, "@");
- ACE_OS::strcat (mcast_name, this->lcl_name_);
-
- this->sender_.init (local_ec,
- local_sch,
- mcast_name,
- this->send_mcast_group_,
- TAO_TRY_ENV);
- TAO_CHECK_ENV;
-
- ACE_INET_Addr ignore_from;
- this->sender_.get_local_addr (ignore_from);
-
- char recv_name[bufsize];
- ACE_OS::strcpy (recv_name, "receiver");
- ACE_OS::strcat (recv_name, "@");
- ACE_OS::strcat (recv_name, this->lcl_name_);
-
- this->receiver_.init (local_ec,
- local_sch,
- recv_name,
- ignore_from,
- TAO_TRY_ENV);
- TAO_CHECK_ENV;
-
- this->mcast_eh_.reactor (TAO_ORB_Core_instance ()->reactor ());
- this->mcast_eh_.open (this->recv_mcast_group_);
-
- RtecEventChannelAdmin::ConsumerQOS sender_qos;
- sender_qos.is_gateway = 1;
- sender_qos.dependencies.length (3);
- sender_qos.dependencies[0].event.type_ = ACE_ES_DISJUNCTION_DESIGNATOR;
- sender_qos.dependencies[0].event.source_ = 0;
- sender_qos.dependencies[0].event.creation_time_ = ORBSVCS_Time::zero;
- sender_qos.dependencies[0].rt_info = 0;
- sender_qos.dependencies[1].event.type_ = this->s_event_a_;
- sender_qos.dependencies[1].event.source_ = 0;
- sender_qos.dependencies[1].event.creation_time_ = ORBSVCS_Time::zero;
- sender_qos.dependencies[1].rt_info = 0;
- sender_qos.dependencies[2].event.type_ = this->s_event_b_;
- sender_qos.dependencies[2].event.source_ = 0;
- sender_qos.dependencies[2].event.creation_time_ = ORBSVCS_Time::zero;
- sender_qos.dependencies[2].rt_info = 0;
-
- this->sender_.open (sender_qos, TAO_TRY_ENV);
- TAO_CHECK_ENV;
-
- RtecEventChannelAdmin::SupplierQOS receiver_qos;
- receiver_qos.is_gateway = 1;
- receiver_qos.publications.length (2);
- receiver_qos.publications[0].event.type_ = this->r_event_a_;
- receiver_qos.publications[0].event.source_ = 0;
- receiver_qos.publications[0].event.creation_time_ = ORBSVCS_Time::zero;
- receiver_qos.publications[0].dependency_info.dependency_type =
- RtecScheduler::TWO_WAY_CALL;
- receiver_qos.publications[0].dependency_info.number_of_calls = 1;
- receiver_qos.publications[0].dependency_info.rt_info = 0;
- receiver_qos.publications[1].event.type_ = this->r_event_b_;
- receiver_qos.publications[1].event.source_ = 0;
- receiver_qos.publications[1].event.creation_time_ = ORBSVCS_Time::zero;
- receiver_qos.publications[1].dependency_info.dependency_type =
- RtecScheduler::TWO_WAY_CALL;
- receiver_qos.publications[1].dependency_info.number_of_calls = 1;
- receiver_qos.publications[1].dependency_info.rt_info = 0;
-
- this->receiver_.open (receiver_qos, TAO_TRY_ENV);
- TAO_CHECK_ENV;
- }
- TAO_CATCHANY
- {
- TAO_RETHROW;
+ this->all_federations_[i]->close (_env);
+ TAO_CHECK_ENV_RETURN_VOID (_env);
}
- TAO_ENDTRY;
+ this->send_dgram_.close ();
}
void
-ECM_Driver::push_supplier (void * /* cookie */,
- RtecEventChannelAdmin::ProxyPushConsumer_ptr consumer,
- const RtecEventComm::EventSet &events,
- CORBA::Environment & _env)
+ECM_Driver::open_receivers (RtecEventChannelAdmin::EventChannel_ptr ec,
+ RtecScheduler::Scheduler_ptr scheduler,
+ CORBA::Environment &_env)
{
- this->wait_until_ready ();
- // ACE_DEBUG ((LM_DEBUG, "(%P|%t) events sent by supplier\n"));
- // @@ TODO we could keep somekind of stats here...
- if (!this->short_circuit_)
+ for (int i = 0; i < this->local_federations_count_; ++i)
{
- consumer->push (events, _env);
- }
- else
- {
- for (int i = 0; i < this->n_consumers_ && !_env.exception (); ++i)
- {
- this->consumers_[i]->push (events, _env);
- }
+ this->local_federations_[i]->open_receiver (ec,
+ scheduler,
+ _env);
+ TAO_CHECK_ENV_RETURN_VOID (_env);
}
}
void
-ECM_Driver::push_consumer (void *consumer_cookie,
- ACE_hrtime_t arrival,
- const RtecEventComm::EventSet &events,
- CORBA::Environment &)
+ECM_Driver::close_receivers (CORBA::Environment &_env)
{
- int ID =
- (ACE_reinterpret_cast(ECM_Consumer**,consumer_cookie)
- - this->consumers_);
-
- // ACE_DEBUG ((LM_DEBUG, "(%P|%t) events received by consumer %d\n", ID));
-
- if (events.length () == 0)
+ for (int i = 0; i < this->local_federations_count_; ++i)
{
- // ACE_DEBUG ((LM_DEBUG, "no events\n"));
- return;
+ this->local_federations_[i]->close_receiver (_env);
+ TAO_CHECK_ENV_RETURN_VOID (_env);
}
-
- // ACE_DEBUG ((LM_DEBUG, "%d event(s)\n", events.length ()));
-
-#if 0
- const int bufsize = 128;
- char buf[bufsize];
- ACE_OS::sprintf (buf, "Consumer %d receives event in thread: ", ID);
-#endif
-
- for (u_int i = 0; i < events.length (); ++i)
- {
- const RtecEventComm::Event& e = events[i];
-
- if (e.type_ == ACE_ES_EVENT_SHUTDOWN)
- {
- this->shutdown_consumer (ID);
- continue;
- }
-
- ACE_hrtime_t s;
- ORBSVCS_Time::TimeT_to_hrtime (s, e.creation_time_);
- ACE_hrtime_t nsec = arrival - s;
- if (this->local_source (e.source_))
- {
- int& count = this->stats_[ID].lcl_count_;
-
- this->stats_[ID].lcl_latency_[count] = nsec;
- int workload = this->workload_;
- int interval = this->event_period_;
-
- for (int j = 0; j < workload; ++j)
- {
- // Eat a little CPU so the Utilization test can measure the
- // consumed time....
- /* takes about 40.2 usecs on a 167 MHz Ultra2 */
- u_long n = 1279UL;
- ACE::is_prime (n, 2, n / 2);
- }
- // Increment the elapsed time on this consumer.
- ACE_hrtime_t now = ACE_OS::gethrtime ();
- this->stats_[ID].total_time_ += now - arrival;
- this->stats_[ID].end_[count] = now;
-
- // We estimate our laxity based on the event creation
- // time... it may not be very precise, but will do; other
- // strategies include:
- // + Keep track of the "current frame", then then deadline
- // is the end of the frame.
- // + Use the start of the test to keep the current frame.
- // + Use the last execution.
-
- // Work around MSVC++ bug, it does not not how to convert an
- // unsigned 64 bit int into a long....
- CORBA::ULong tmp = ACE_static_cast(CORBA::ULong,(s - now));
- this->stats_[ID].laxity_[count] = 1 + tmp/1000.0F/interval;
- count++;
- }
- else
- {
- int& count = this->stats_[ID].rmt_count_;
- this->stats_[ID].rmt_latency_[count] = nsec;
- count++;
- }
- }
-}
-
-void
-ECM_Driver::wait_until_ready (void)
-{
- ACE_GUARD (ACE_Thread_Mutex, ready_mon, this->ready_mtx_);
- while (!this->ready_)
- this->ready_cnd_.wait ();
-}
-
-void
-ECM_Driver::shutdown_supplier (void* /* supplier_cookie */,
- RtecEventComm::PushConsumer_ptr consumer,
- CORBA::Environment& _env)
-{
-
- this->running_suppliers_--;
- if (this->running_suppliers_ == 0)
- {
- // We propagate a shutdown event through the system...
- RtecEventComm::EventSet shutdown (1);
- shutdown.length (1);
- RtecEventComm::Event& s = shutdown[0];
-
- s.source_ = 0;
- s.ttl_ = 1;
-
- ACE_hrtime_t t = ACE_OS::gethrtime ();
- ORBSVCS_Time::hrtime_to_TimeT (s.creation_time_, t);
- s.ec_recv_time_ = ORBSVCS_Time::zero;
- s.ec_send_time_ = ORBSVCS_Time::zero;
- s.data_.x = 0;
- s.data_.y = 0;
- s.type_ = ACE_ES_EVENT_SHUTDOWN;
- consumer->push (shutdown, _env);
- }
-}
-
-void
-ECM_Driver::shutdown_consumer (int id)
-{
- ACE_DEBUG ((LM_DEBUG, "Shutdown consumer %d\n", id));
- this->running_consumers_--;
- if (this->running_consumers_ == 0)
- TAO_ORB_Core_instance ()->orb ()->shutdown ();
-}
-
-int
-ECM_Driver::shutdown (CORBA::Environment& _env)
-{
- ACE_UNUSED_ARG (_env);
-
- ACE_DEBUG ((LM_DEBUG, "Shutting down the multiple EC test\n"));
-
- TAO_ORB_Core_instance ()->orb ()->shutdown ();
- return 0;
}
void
ECM_Driver::dump_results (void)
{
- const int bufsize = 512;
- char buf[bufsize];
-
- for (int i = 0; i < this->n_consumers_; ++i)
+ for (int i = 0; i < this->local_federations_count_; ++i)
{
- ACE_OS::sprintf (buf, "CO%02.2d", i);
- this->dump_results (buf, this->stats_[i]);
+ this->local_federations_[i]->dump_results ();
}
- // the cast is to workaround a msvc++ bug...
- CORBA::ULong tmp = ACE_static_cast(CORBA::ULong,
- this->test_stop_ - this->test_start_);
- double usec = tmp / 1000.0;
- ACE_DEBUG ((LM_DEBUG, "Time[TOTAL]: %.3f\n", usec));
}
-void
-ECM_Driver::dump_results (const char* name, Stats& stats)
-{
- // @@ We are reporting the information without specifics about
- // the cast is to workaround a msvc++ bug...
- double usec = ACE_static_cast(CORBA::ULong,stats.total_time_) / 1000.0;
- ACE_DEBUG ((LM_DEBUG, "Time[LCL,%s]: %.3f\n", name, usec));
- int i;
- for (i = 1; i < stats.lcl_count_ - 1; ++i)
- {
- // the cast is to workaround a msvc++ bug...
- usec = ACE_static_cast(CORBA::ULong,stats.lcl_latency_[i]) / 1000.0;
- ACE_DEBUG ((LM_DEBUG, "Latency[LCL,%s]: %.3f\n", name, usec));
-
- double percent = stats.laxity_[i] * 100.0;
- ACE_DEBUG ((LM_DEBUG, "Laxity[LCL,%s]: %.3f\n", name, percent));
-
- // the cast is to workaround a msvc++ bug...
- usec = ACE_static_cast(CORBA::ULong,stats.end_[i] - this->test_start_) / 1000.0;
- ACE_DEBUG ((LM_DEBUG, "Completion[LCL,%s]: %.3f\n", name, usec));
- }
- for (i = 1; i < stats.rmt_count_ - 1; ++i)
- {
- // the cast is to workaround a msvc++ bug...
- double usec = ACE_static_cast(CORBA::ULong,stats.rmt_latency_[i]) / 1000.0;
- ACE_DEBUG ((LM_DEBUG, "Latency[RMT,%s]: %.3f\n", name, usec));
- }
-}
-
-int
-ECM_Driver::local_source (RtecEventComm::EventSourceID id) const
-{
- for (int i = 0; i < this->n_suppliers_; ++i)
- {
- if (this->suppliers_[i]->supplier_id () == id)
- return 1;
- }
- return 0;
-}
+
+// ****************************************************************
int
ECM_Driver::parse_args (int argc, char *argv [])
{
- ACE_Get_Opt get_opt (argc, argv, "xl:s:r:h:p:d:");
+ ACE_Get_Opt get_opt (argc, argv, "l:p:c:n:t:f:");
int opt;
while ((opt = get_opt ()) != EOF)
{
switch (opt)
{
- case 'x':
- this->short_circuit_ = 1;
- break;
-
case 'l':
this->lcl_name_ = get_opt.optarg;
break;
- case 'h':
- {
- char* aux;
- char* arg = ACE_OS::strtok_r (get_opt.optarg, ",", &aux);
-
- this->n_suppliers_ = ACE_OS::atoi (arg);
- arg = ACE_OS::strtok_r (0, ",", &aux);
- this->n_consumers_ = ACE_OS::atoi (arg);
- arg = ACE_OS::strtok_r (0, ",", &aux);
- this->workload_ = ACE_OS::atoi (arg);
- arg = ACE_OS::strtok_r (0, ",", &aux);
- this->event_period_ = ACE_OS::atoi (arg);
- arg = ACE_OS::strtok_r (0, ",", &aux);
- this->event_count_ = ACE_OS::atoi (arg);
- arg = ACE_OS::strtok_r (0, ",", &aux);
- this->s_event_a_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
- arg = ACE_OS::strtok_r (0, ",", &aux);
- this->s_event_b_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
- arg = ACE_OS::strtok_r (0, ",", &aux);
- this->c_event_a_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
- arg = ACE_OS::strtok_r (0, ",", &aux);
- this->c_event_b_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
- arg = ACE_OS::strtok_r (0, ",", &aux);
- this->r_event_a_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
- arg = ACE_OS::strtok_r (0, ",", &aux);
- this->r_event_b_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
- }
- break;
-
case 'p':
- this->pid_file_name_ = get_opt.optarg;
+ this->pid_filename_ = get_opt.optarg;
break;
- case 'd':
- this->schedule_file_ = get_opt.optarg;
- break;
+ case 'c':
+ this->config_filename_ = get_opt.optarg;
+ break;
+
+ case 't':
+ this->event_period_ = ACE_OS::atoi (get_opt.optarg);
+ break;
- case 's':
- this->send_mcast_group_.set (get_opt.optarg);
- break;
+ case 'n':
+ this->event_count_ = ACE_OS::atoi (get_opt.optarg);
+ break;
- case 'r':
- this->recv_mcast_group_.set (get_opt.optarg);
- break;
+ case 'f':
+ {
+ char* aux;
+ int i = 0;
+ for (char* arg = ACE_OS::strtok_r (get_opt.optarg, ",", &aux);
+ arg != 0 && i < ECM_Driver::MAX_LOCAL_FEDERATIONS;
+ arg = ACE_OS::strtok_r (0, ",", &aux), ++i)
+ {
+ this->local_names_[i] = arg;
+ }
+ this->local_federations_count_ = i;
+ }
+ break;
case '?':
default:
ACE_DEBUG ((LM_DEBUG,
"Usage: %s "
"[ORB options] "
- "-x (short circuit EC) "
- "-h <high priority args> "
+ "-n <event_count> "
+ "-t <event_period> "
+ "-l <localname> "
"-p <pid file name> "
- "-d <schedule file name> "
- "-s <send_mcast group> "
- "-r <recv_mcast group> "
+ "-c <config file name> "
+ "-g federation,federation,... "
"\n",
argv[0]));
return -1;
@@ -857,110 +483,328 @@ ECM_Driver::parse_args (int argc, char *argv [])
"%s: event count (%d) is out of range, "
"reset to default (%d)\n",
argv[0], this->event_count_,
- 160));
- this->event_count_ = 160;
+ 100));
+ this->event_count_ = 100;
+ }
+
+ return 0;
+}
+
+int
+ECM_Driver::parse_config_file (void)
+{
+ FILE* cfg = 0;
+ if (this->config_filename_ != 0)
+ cfg = ACE_OS::fopen (this->config_filename_, "r");
+ else
+ cfg = stdin;
+
+ if (cfg == 0)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR, "cannot open config file <%s>\n",
+ this->config_filename_), -1);
}
- if (this->n_consumers_ <= 0
- || this->n_consumers_ >= ECM_Driver::MAX_CONSUMERS
- || this->n_suppliers_ <= 0
- || this->n_suppliers_ >= ECM_Driver::MAX_SUPPLIERS)
+ int s = fscanf (cfg, "%d", &this->all_federations_count_);
+ if (s == 0 || s == EOF)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "problem reading federation count\n"), -1);
+ }
+ // ACE_DEBUG ((LM_DEBUG,
+ // "total federations = %d\n",
+ // this->all_federations_count_));
+ for (int i = 0; i < this->all_federations_count_; ++i)
{
- ACE_ERROR_RETURN ((LM_DEBUG,
- "%s: number of consumers or "
- "suppliers out of range\n", argv[0]), -1);
+ if (this->skip_blanks (cfg, "reading federation name"))
+ return -1;
+ ACE_Read_Buffer reader(cfg);
+ char* buf = reader.read (' ', ' ', '\0');
+ char* name = CORBA::string_dup (buf);
+ reader.alloc()->free (buf);
+
+
+ int port;
+ if (this->skip_blanks (cfg, "reading federation port number"))
+ return -1;
+ fscanf (cfg, "%d", &port);
+ CORBA::UShort mcast_port = ACE_static_cast(CORBA::UShort, port);
+
+ int ns, nc;
+ if (this->skip_blanks (cfg, "reading supplier count"))
+ return -1;
+ s = fscanf (cfg, "%d", &ns);
+ if (s == 0 || s == EOF)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "problem reading supplier count (%d)\n",
+ i), -1);
+ }
+ if (this->skip_blanks (cfg, "reading constumer count"))
+ return -1;
+ s = fscanf (cfg, "%d", &nc);
+ if (s == 0 || s == EOF)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "problem reading consumer count (%d)\n",
+ i), -1);
+ }
+ // ACE_DEBUG ((LM_DEBUG, "i = %d <%s> <%d> <%d> <%d>\n",
+ // i, name, mcast_port, ns, nc));
+
+ char** supplier_names;
+ char** consumer_names;
+ ACE_NEW_RETURN (supplier_names, char*[ns], -1);
+ ACE_NEW_RETURN (consumer_names, char*[nc], -1);
+
+ if (this->parse_name_list (cfg, ns, supplier_names,
+ "reading supplier list"))
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "error parsing supplier list for <%s>\n",
+ name), -1);
+ }
+
+ if (this->parse_name_list (cfg, nc, consumer_names,
+ "reading consumer list"))
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "error parsing consumer list for <%s>\n",
+ name), -1);
+ }
+
+ ACE_NEW_RETURN (this->all_federations_[i],
+ ECM_Federation (name, mcast_port,
+ ns, supplier_names,
+ nc, consumer_names), -1);
+ }
+ ACE_OS::fclose (cfg);
+
+ for (int j = 0; j < this->local_federations_count_; ++j)
+ {
+ int k = 0;
+ for (; k < this->all_federations_count_; ++k)
+ {
+ if (ACE_OS::strcmp (this->local_names_[j],
+ this->all_federations_[k]->name ()) == 0)
+ {
+ ACE_NEW_RETURN (this->local_federations_[j],
+ ECM_Local_Federation (this->all_federations_[k],
+ this),
+ -1);
+ break;
+ }
+ }
+ if (k == this->all_federations_count_)
+ ACE_ERROR ((LM_ERROR,
+ "Cannot find federations <%s>\n",
+ this->local_names_[j]));
}
return 0;
}
+int
+ECM_Driver::parse_name_list (FILE* file,
+ int n,
+ char** names,
+ const char* error_msg)
+{
+ for (int i = 0; i < n; ++i)
+ {
+ if (this->skip_blanks (file, error_msg))
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "error on item %d while %s\n",
+ i, error_msg), -1);
+ }
+ ACE_Read_Buffer tmp(file);
+ char* buf = tmp.read ('\n', '\n', '\0');
+ names[i] = CORBA::string_dup (buf);
+ tmp.alloc ()->free (buf);
+ }
+ return 0;
+}
+
+int
+ECM_Driver::skip_blanks (FILE* file,
+ const char* error_msg)
+{
+ int c;
+ // Consume all the blanks.
+ while (isspace (c = fgetc (file)));
+ if (c == EOF)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Unexpected EOF in config file while %s\n",
+ error_msg),
+ -1);
+ }
+ ungetc (c, file);
+ return 0;
+}
// ****************************************************************
-ECM_Supplier::ECM_Supplier (ECM_Driver *test,
- void *cookie)
- : test_ (test),
- cookie_ (cookie),
- consumer_ (this)
+ECM_Federation::ECM_Federation (char* name,
+ CORBA::UShort mcast_port,
+ int supplier_types,
+ char** supplier_names,
+ int consumer_types,
+ char** consumer_names)
+ : name_ (name),
+ mcast_port_ (mcast_port),
+ supplier_types_ (supplier_types),
+ supplier_names_ (supplier_names),
+ consumer_types_ (consumer_types),
+ consumer_names_ (consumer_names),
+ addr_server_ (mcast_port)
{
+ ACE_NEW (this->supplier_ipaddr_, CORBA::ULong[this->supplier_types_]);
+ ACE_NEW (this->consumer_ipaddr_, CORBA::ULong[this->consumer_types_]);
+
+ int i;
+ for (i = 0; i < this->supplier_types_; ++i)
+ {
+ ACE_INET_Addr addr (u_short(0), this->supplier_names_[i]);
+ this->supplier_ipaddr_[i] = addr.get_ip_address ();
+ }
+ for (i = 0; i < this->consumer_types_; ++i)
+ {
+ ACE_INET_Addr addr (u_short(0), this->consumer_names_[i]);
+ this->consumer_ipaddr_[i] = addr.get_ip_address ();
+ }
}
void
-ECM_Supplier::open (const char* name,
- int event_a,
- int event_b,
- int event_count,
- const RtecScheduler::Period& rate,
- RtecEventChannelAdmin::EventChannel_ptr ec,
- CORBA::Environment &_env)
+ECM_Federation::open (ACE_SOCK_Dgram *dgram,
+ RtecEventChannelAdmin::EventChannel_ptr ec,
+ RtecScheduler::Scheduler_ptr scheduler,
+ CORBA::Environment &_env)
{
- this->event_a_ = event_a;
- this->event_b_ = event_b;
- this->event_count_ = event_count;
-
- TAO_TRY
+ const int bufsize = 512;
+ char buf[bufsize];
+ ACE_OS::strcpy (buf, this->name ());
+ ACE_OS::strcat (buf, "/sender");
+
+ RtecUDPAdmin::AddrServer_var addr_server =
+ this->addr_server_._this (_env);
+ TAO_CHECK_ENV_RETURN_VOID (_env);
+
+ this->sender_.init (ec, scheduler,
+ buf,
+ addr_server.in (),
+ dgram,
+ _env);
+ TAO_CHECK_ENV_RETURN_VOID (_env);
+
+ RtecScheduler::handle_t rt_info =
+ scheduler->create (buf, _env);
+ TAO_CHECK_ENV_RETURN_VOID(_env);
+
+ // The worst case execution time is far less than 2
+ // milliseconds, but that is a safe estimate....
+ ACE_Time_Value tv (0, 2000);
+ TimeBase::TimeT time;
+ ORBSVCS_Time::Time_Value_to_TimeT (time, tv);
+ scheduler->set (rt_info,
+ RtecScheduler::VERY_HIGH_CRITICALITY,
+ time, time, time,
+ 0,
+ RtecScheduler::VERY_LOW_IMPORTANCE,
+ time,
+ 0,
+ RtecScheduler::OPERATION,
+ _env);
+ TAO_CHECK_ENV_RETURN_VOID (_env);
+
+ ACE_ConsumerQOS_Factory qos;
+ qos.start_disjunction_group ();
+ for (int i = 0; i < this->consumer_types (); ++i)
{
- RtecScheduler::Scheduler_ptr server =
- ACE_Scheduler_Factory::server ();
+ qos.insert_type (this->consumer_ipaddr (i), rt_info);
+ }
+ RtecEventChannelAdmin::ConsumerQOS qos_copy = qos.get_ConsumerQOS ();
+ this->sender_.open (qos_copy, _env);
+ TAO_CHECK_ENV_RETURN_VOID (_env);
+}
- RtecScheduler::handle_t rt_info =
- server->create (name, TAO_TRY_ENV);
- TAO_CHECK_ENV;
+void
+ECM_Federation::close (CORBA::Environment &_env)
+{
+ this->sender_.close (_env);
+ TAO_CHECK_ENV_RETURN_VOID (_env);
+ this->sender_.shutdown (_env);
+ TAO_CHECK_ENV_RETURN_VOID (_env);
+}
- // The execution times are set to reasonable values, but
- // actually they are changed on the real execution, i.e. we
- // lie to the scheduler to obtain right priorities; but we
- // don't care if the set is schedulable.
- ACE_Time_Value tv (0, 2000);
- TimeBase::TimeT time;
- ORBSVCS_Time::Time_Value_to_TimeT (time, tv);
- server->set (rt_info,
- RtecScheduler::VERY_HIGH_CRITICALITY,
- time, time, time,
- rate,
- RtecScheduler::VERY_LOW_IMPORTANCE,
- time,
- 1,
- RtecScheduler::OPERATION,
- TAO_TRY_ENV);
- TAO_CHECK_ENV;
+// ****************************************************************
- this->supplier_id_ = ACE::crc32 (name);
- ACE_DEBUG ((LM_DEBUG, "ID for <%s> is %04.4x\n", name,
- this->supplier_id_));
+ECM_Supplier::ECM_Supplier (ECM_Local_Federation* federation)
+ : federation_ (federation),
+ consumer_ (this)
+{
+}
- ACE_SupplierQOS_Factory qos;
- qos.insert (this->supplier_id_,
- this->event_a_,
- rt_info, 1);
- qos.insert (this->supplier_id_,
- this->event_b_,
- rt_info, 1);
+void
+ECM_Supplier::open (const char* name,
+ RtecScheduler::Period period,
+ RtecEventChannelAdmin::EventChannel_ptr ec,
+ RtecScheduler::Scheduler_ptr scheduler,
+ CORBA::Environment &_env)
+{
+ RtecScheduler::handle_t rt_info =
+ scheduler->create (name, _env);
+ TAO_CHECK_ENV_RETURN_VOID (_env);
+
+ // The execution times are set to reasonable values, but
+ // actually they are changed on the real execution, i.e. we
+ // lie to the scheduler to obtain right priorities; but we
+ // don't care if the set is schedulable.
+ ACE_Time_Value tv (0, 2000);
+ TimeBase::TimeT time;
+ ORBSVCS_Time::Time_Value_to_TimeT (time, tv);
+
+ scheduler->set (rt_info,
+ RtecScheduler::VERY_HIGH_CRITICALITY,
+ time, time, time,
+ period,
+ RtecScheduler::VERY_LOW_IMPORTANCE,
+ time,
+ 1,
+ RtecScheduler::OPERATION,
+ _env);
+ TAO_CHECK_ENV_RETURN_VOID (_env);
+
+ this->supplier_id_ = ACE::crc32 (name);
+ ACE_DEBUG ((LM_DEBUG, "ID for <%s> is %04.4x\n", name,
+ this->supplier_id_));
+
+ ACE_SupplierQOS_Factory qos;
+ for (int i = 0; i < this->federation_->supplier_types (); ++i)
+ {
qos.insert (this->supplier_id_,
- ACE_ES_EVENT_SHUTDOWN,
- rt_info, 1);
-
- RtecEventChannelAdmin::SupplierAdmin_var supplier_admin =
- ec->for_suppliers (TAO_TRY_ENV);
- TAO_CHECK_ENV;
+ this->federation_->supplier_ipaddr (i),
+ rt_info, 1);
+ }
+ qos.insert (this->supplier_id_,
+ ACE_ES_EVENT_SHUTDOWN,
+ rt_info, 1);
- this->consumer_proxy_ =
- supplier_admin->obtain_push_consumer (TAO_TRY_ENV);
- TAO_CHECK_ENV;
+ RtecEventChannelAdmin::SupplierAdmin_var supplier_admin =
+ ec->for_suppliers (_env);
+ TAO_CHECK_ENV_RETURN_VOID (_env);
- RtecEventComm::PushSupplier_var objref = this->_this (TAO_TRY_ENV);
- TAO_CHECK_ENV;
+ this->consumer_proxy_ =
+ supplier_admin->obtain_push_consumer (_env);
+ TAO_CHECK_ENV_RETURN_VOID (_env);
- this->consumer_proxy_->connect_push_supplier (objref.in (),
- qos.get_SupplierQOS (),
- TAO_TRY_ENV);
- TAO_CHECK_ENV;
+ RtecEventComm::PushSupplier_var objref = this->_this (_env);
+ TAO_CHECK_ENV_RETURN_VOID (_env);
- }
- TAO_CATCHANY
- {
- TAO_RETHROW;
- }
- TAO_ENDTRY;
+ this->consumer_proxy_->connect_push_supplier (objref.in (),
+ qos.get_SupplierQOS (),
+ _env);
+ TAO_CHECK_ENV_RETURN_VOID (_env);
}
void
@@ -970,262 +814,423 @@ ECM_Supplier::close (CORBA::Environment &_env)
return;
this->consumer_proxy_->disconnect_push_consumer (_env);
- if (_env.exception () != 0) return;
+ TAO_CHECK_ENV_RETURN_VOID (_env);
this->consumer_proxy_ = 0;
}
void
ECM_Supplier::activate (const char* name,
- const RtecScheduler::Period& rate,
- RtecEventChannelAdmin::EventChannel_ptr ec,
- CORBA::Environment &_env)
+ RtecScheduler::Period period,
+ RtecEventChannelAdmin::EventChannel_ptr ec,
+ RtecScheduler::Scheduler_ptr scheduler,
+ CORBA::Environment &_env)
{
- TAO_TRY
- {
- RtecScheduler::Scheduler_ptr server =
- ACE_Scheduler_Factory::server ();
+ const int bufsize = 512;
+ char buf[bufsize];
+ ACE_OS::strcpy (buf, "consumer_");
+ ACE_OS::strcat (buf, name);
+ RtecScheduler::handle_t rt_info =
+ scheduler->create (buf, _env);
+ TAO_CHECK_ENV_RETURN_VOID (_env);
+
+ // The execution times are set to reasonable values, but
+ // actually they are changed on the real execution, i.e. we
+ // lie to the scheduler to obtain right priorities; but we
+ // don't care if the set is schedulable.
+ ACE_Time_Value tv (0, 2000);
+ TimeBase::TimeT time;
+ ORBSVCS_Time::Time_Value_to_TimeT (time, tv);
+ scheduler->set (rt_info,
+ RtecScheduler::VERY_HIGH_CRITICALITY,
+ time, time, time,
+ period,
+ RtecScheduler::VERY_LOW_IMPORTANCE,
+ time,
+ 1,
+ RtecScheduler::OPERATION,
+ _env);
+ TAO_CHECK_ENV_RETURN_VOID (_env);
+
+ // Also connect our consumer for timeout events from the EC.
+ int interval = period / 10;
+ ACE_Time_Value tv_timeout (interval / ACE_ONE_SECOND_IN_USECS,
+ interval % ACE_ONE_SECOND_IN_USECS);
+ TimeBase::TimeT timeout;
+ ORBSVCS_Time::Time_Value_to_TimeT (timeout, tv_timeout);
+
+ ACE_ConsumerQOS_Factory consumer_qos;
+ consumer_qos.start_disjunction_group ();
+ consumer_qos.insert_time (ACE_ES_EVENT_INTERVAL_TIMEOUT,
+ timeout,
+ rt_info);
+
+ // = Connect as a consumer.
+ RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin =
+ ec->for_consumers (_env);
+ TAO_CHECK_ENV_RETURN_VOID (_env);
+
+ this->supplier_proxy_ =
+ consumer_admin->obtain_push_supplier (_env);
+ TAO_CHECK_ENV_RETURN_VOID (_env);
+
+ RtecEventComm::PushConsumer_var cref =
+ this->consumer_._this (_env);
+ TAO_CHECK_ENV_RETURN_VOID (_env);
+
+ this->supplier_proxy_->connect_push_consumer (cref.in (),
+ consumer_qos.get_ConsumerQOS (),
+ _env);
+ TAO_CHECK_ENV_RETURN_VOID (_env);
+}
- const int bufsize = 512;
- char buf[bufsize];
- ACE_OS::strcpy (buf, "consumer_");
- ACE_OS::strcat (buf, name);
- RtecScheduler::handle_t rt_info =
- server->create (buf, TAO_TRY_ENV);
- TAO_CHECK_ENV;
+int
+ECM_Supplier::supplier_id (void) const
+{
+ return this->supplier_id_;
+}
+void
+ECM_Supplier::push (const RtecEventComm::EventSet& events,
+ CORBA::Environment& _env)
+{
+ for (u_int i = 0; i < events.length (); ++i)
+ {
+ const RtecEventComm::Event& e = events[i];
+ if (e.header.type != ACE_ES_EVENT_INTERVAL_TIMEOUT)
+ continue;
- // The execution times are set to reasonable values, but
- // actually they are changed on the real execution, i.e. we
- // lie to the scheduler to obtain right priorities; but we
- // don't care if the set is schedulable.
- ACE_Time_Value tv (0, 2000);
- TimeBase::TimeT time;
- ORBSVCS_Time::Time_Value_to_TimeT (time, tv);
- server->set (rt_info,
- RtecScheduler::VERY_HIGH_CRITICALITY,
- time, time, time,
- rate,
- RtecScheduler::VERY_LOW_IMPORTANCE,
- time,
- 1,
- RtecScheduler::OPERATION,
- TAO_TRY_ENV);
- TAO_CHECK_ENV;
+ this->federation_->supplier_timeout (this->consumer_proxy_.in (),
+ _env);
+ TAO_CHECK_ENV_RETURN_VOID (_env);
+ }
+}
- // Also connect our consumer for timeout events from the EC.
- int interval = rate / 10;
- ACE_Time_Value tv_timeout (interval / ACE_ONE_SECOND_IN_USECS,
- interval % ACE_ONE_SECOND_IN_USECS);
- TimeBase::TimeT timeout;
- ORBSVCS_Time::Time_Value_to_TimeT (timeout, tv_timeout);
-
- ACE_ConsumerQOS_Factory consumer_qos;
- consumer_qos.start_disjunction_group ();
- consumer_qos.insert_time (ACE_ES_EVENT_INTERVAL_TIMEOUT,
- timeout,
- rt_info);
-
- // = Connect as a consumer.
- RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin =
- ec->for_consumers (TAO_TRY_ENV);
- TAO_CHECK_ENV;
+void
+ECM_Supplier::disconnect_push_supplier (CORBA::Environment& _env)
+{
+ this->supplier_proxy_->disconnect_push_supplier (_env);
+}
- this->supplier_proxy_ =
- consumer_admin->obtain_push_supplier (TAO_TRY_ENV);
- TAO_CHECK_ENV;
+void
+ECM_Supplier::disconnect_push_consumer (CORBA::Environment &)
+{
+}
- RtecEventComm::PushConsumer_var cref =
- this->consumer_._this (TAO_TRY_ENV);
- TAO_CHECK_ENV;
+// ****************************************************************
- this->supplier_proxy_->connect_push_consumer (cref.in (),
- consumer_qos.get_ConsumerQOS (),
- TAO_TRY_ENV);
- TAO_CHECK_ENV;
- }
- TAO_CATCHANY
- {
- TAO_RETHROW;
- }
- TAO_ENDTRY;
+ECM_Consumer::ECM_Consumer (ECM_Local_Federation *federation)
+ : federation_ (federation)
+{
}
void
-ECM_Supplier::push (const RtecEventComm::EventSet& events,
- CORBA::Environment& _env)
+ECM_Consumer::open (const char* name,
+ RtecEventChannelAdmin::EventChannel_ptr ec,
+ RtecScheduler::Scheduler_ptr scheduler,
+ CORBA::Environment& _env)
{
-#if 0
- const int bufsize = 128;
- char buf[bufsize];
- ACE_OS::sprintf (buf, "Supplier %d receives event in thread: ",
- this->supplier_id_);
-#endif
-
- if (events.length () == 0 || this->event_count_ < 0)
+ RtecScheduler::handle_t rt_info =
+ scheduler->create (name, _env);
+ TAO_CHECK_ENV_RETURN_VOID(_env);
+
+ // The worst case execution time is far less than 2
+ // milliseconds, but that is a safe estimate....
+ ACE_Time_Value tv (0, 2000);
+ TimeBase::TimeT time;
+ ORBSVCS_Time::Time_Value_to_TimeT (time, tv);
+ scheduler->set (rt_info,
+ RtecScheduler::VERY_HIGH_CRITICALITY,
+ time, time, time,
+ 0,
+ RtecScheduler::VERY_LOW_IMPORTANCE,
+ time,
+ 0,
+ RtecScheduler::OPERATION,
+ _env);
+ TAO_CHECK_ENV_RETURN_VOID (_env);
+
+ ACE_ConsumerQOS_Factory qos;
+ qos.start_disjunction_group ();
+ qos.insert_type (ACE_ES_EVENT_SHUTDOWN, rt_info);
+ const ECM_Federation* federation = this->federation_->federation ();
+ for (int i = 0; i < federation->consumer_types (); ++i)
{
- // ACE_DEBUG ((LM_DEBUG, "no events\n"));
- return;
+ qos.insert_type (federation->consumer_ipaddr (i), rt_info);
}
- RtecEventComm::EventSet sent (events.length ());
- sent.length (events.length ());
+ // = Connect as a consumer.
+ RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin =
+ ec->for_consumers (_env);
+ TAO_CHECK_ENV_RETURN_VOID (_env);
- for (u_int i = 0; i < events.length (); ++i)
- {
- const RtecEventComm::Event& e = events[i];
- if (e.type_ != ACE_ES_EVENT_INTERVAL_TIMEOUT)
- continue;
-
- // ACE_DEBUG ((LM_DEBUG, "ECM_Supplier - timeout (%t)\n"));
+ this->supplier_proxy_ =
+ consumer_admin->obtain_push_supplier (_env);
+ TAO_CHECK_ENV_RETURN_VOID (_env);
- RtecEventComm::Event& s = sent[i];
- s.source_ = this->supplier_id_;
- s.ttl_ = 1;
+ RtecEventComm::PushConsumer_var objref = this->_this (_env);
+ TAO_CHECK_ENV_RETURN_VOID (_env);
- ACE_hrtime_t t = ACE_OS::gethrtime ();
- ORBSVCS_Time::hrtime_to_TimeT (s.creation_time_, t);
- s.ec_recv_time_ = ORBSVCS_Time::zero;
- s.ec_send_time_ = ORBSVCS_Time::zero;
+ this->supplier_proxy_->connect_push_consumer (objref.in (),
+ qos.get_ConsumerQOS (),
+ _env);
+ TAO_CHECK_ENV_RETURN_VOID (_env);
+}
- s.data_.x = 0;
- s.data_.y = 0;
+void
+ECM_Consumer::close (CORBA::Environment &_env)
+{
+ if (CORBA::is_nil (this->supplier_proxy_.in ()))
+ return;
- this->event_count_--;
+ this->supplier_proxy_->disconnect_push_supplier (_env);
+ TAO_CHECK_ENV_RETURN_VOID (_env);
- if (this->event_count_ < 0)
- {
- //this->supplier_proxy_->disconnect_push_supplier (_env);
- //if (_env.exception () != 0) return;
- this->test_->shutdown_supplier (this->cookie_,
- this->consumer_proxy_.in (),
- _env);
- }
- if (this->event_count_ % 2 == 0)
- {
- // Generate an A event...
- s.type_ = this->event_a_;
- }
- else
- {
- s.type_ = this->event_b_;
- }
- }
- this->test_->push_supplier (this->cookie_,
- this->consumer_proxy_.in (),
- sent,
- _env);
+ this->supplier_proxy_ = 0;
}
void
-ECM_Supplier::disconnect_push_supplier (CORBA::Environment& _env)
+ECM_Consumer::push (const RtecEventComm::EventSet& events,
+ CORBA::Environment &_env)
{
- this->supplier_proxy_->disconnect_push_supplier (_env);
+ ACE_hrtime_t arrival = ACE_OS::gethrtime ();
+ this->federation_->consumer_push (arrival, events, _env);
}
void
-ECM_Supplier::disconnect_push_consumer (CORBA::Environment &)
+ECM_Consumer::disconnect_push_consumer (CORBA::Environment &)
{
}
-int ECM_Supplier::supplier_id (void) const
+// ****************************************************************
+
+ECM_Local_Federation::ECM_Local_Federation (ECM_Federation *federation,
+ ECM_Driver *driver)
+ : federation_ (federation),
+ driver_ (driver),
+ consumer_ (this),
+ supplier_ (this),
+ recv_count_ (0),
+ unfiltered_count_ (0),
+ invalid_count_ (0),
+ send_count_ (0),
+ event_count_ (0),
+ last_publication_change_ (0),
+ last_subscription_change_ (0),
+ mcast_eh_ (&receiver_)
{
- return this->supplier_id_;
}
-// ****************************************************************
+void
+ECM_Local_Federation::open (int event_count,
+ RtecScheduler::Period period,
+ RtecEventChannelAdmin::EventChannel_ptr ec,
+ RtecScheduler::Scheduler_ptr scheduler,
+ CORBA::Environment& _env)
+{
+ this->event_count_ = event_count;
+
+ const int bufsize = 512;
+ char buf[bufsize];
+ ACE_OS::strcpy (buf, this->federation_->name ());
+ ACE_OS::strcat (buf, "::supplier");
+
+ this->supplier_.open (buf, period, ec, scheduler, _env);
+ TAO_CHECK_ENV_RETURN_VOID (_env);
+
+ ACE_OS::strcpy (buf, this->federation_->name ());
+ ACE_OS::strcat (buf, "::consumer");
+ this->consumer_.open (buf, ec, scheduler, _env);
+ TAO_CHECK_ENV_RETURN_VOID (_env);
+}
-ECM_Consumer::ECM_Consumer (ECM_Driver *test,
- void *cookie)
- : test_ (test),
- cookie_ (cookie)
+void
+ECM_Local_Federation::close (CORBA::Environment &_env)
{
+ this->consumer_.close (_env);
+ TAO_CHECK_ENV_RETURN_VOID (_env);
+
+ this->supplier_.close (_env);
+ TAO_CHECK_ENV_RETURN_VOID (_env);
}
void
-ECM_Consumer::open (const char* name,
- int event_a, int event_b,
- RtecEventChannelAdmin::EventChannel_ptr ec,
- CORBA::Environment& _env)
+ECM_Local_Federation::activate (RtecScheduler::Period period,
+ RtecEventChannelAdmin::EventChannel_ptr ec,
+ RtecScheduler::Scheduler_ptr scheduler,
+ CORBA::Environment& _env)
{
- TAO_TRY
- {
- RtecScheduler::Scheduler_ptr server =
- ACE_Scheduler_Factory::server ();
+ this->supplier_.activate (this->federation_->name (),
+ period,
+ ec, scheduler, _env);
+}
- RtecScheduler::handle_t rt_info =
- server->create (name, TAO_TRY_ENV);
- TAO_CHECK_ENV;
+void
+ECM_Local_Federation::supplier_timeout (RtecEventComm::PushConsumer_ptr consumer,
+ CORBA::Environment &_env)
+{
+ RtecEventComm::EventSet sent (1);
+ sent.length (1);
- // The worst case execution time is far less than 2
- // milliseconds, but that is a safe estimate....
- ACE_Time_Value tv (0, 2000);
- TimeBase::TimeT time;
- ORBSVCS_Time::Time_Value_to_TimeT (time, tv);
- server->set (rt_info,
- RtecScheduler::VERY_HIGH_CRITICALITY,
- time, time, time,
- 0,
- RtecScheduler::VERY_LOW_IMPORTANCE,
- time,
- 0,
- RtecScheduler::OPERATION,
- TAO_TRY_ENV);
- TAO_CHECK_ENV;
+ RtecEventComm::Event& s = sent[0];
+ s.header.source = this->supplier_.supplier_id();
+ s.header.ttl = 1;
- ACE_ConsumerQOS_Factory qos;
- qos.start_disjunction_group ();
- qos.insert_type (ACE_ES_EVENT_SHUTDOWN, rt_info);
- qos.insert_type (event_a, rt_info);
- qos.insert_type (event_b, rt_info);
+ ACE_hrtime_t t = ACE_OS::gethrtime ();
+ ORBSVCS_Time::hrtime_to_TimeT (s.header.creation_time, t);
+ s.header.ec_recv_time = ORBSVCS_Time::zero;
+ s.header.ec_send_time = ORBSVCS_Time::zero;
- // = Connect as a consumer.
- RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin =
- ec->for_consumers (TAO_TRY_ENV);
- TAO_CHECK_ENV;
+ s.data.x = 0;
+ s.data.y = 0;
- this->supplier_proxy_ =
- consumer_admin->obtain_push_supplier (TAO_TRY_ENV);
- TAO_CHECK_ENV;
+ this->event_count_--;
- RtecEventComm::PushConsumer_var objref = this->_this (TAO_TRY_ENV);
- TAO_CHECK_ENV;
+ // ACE_DEBUG ((LM_DEBUG, "Federation <%s> event count <%d>\n",
+ // this->name (), this->event_count_));
- this->supplier_proxy_->connect_push_consumer (objref.in (),
- qos.get_ConsumerQOS (),
- TAO_TRY_ENV);
- TAO_CHECK_ENV;
+ if (this->event_count_ < 0)
+ {
+ this->driver_->federation_has_shutdown (this, _env);
+ return;
}
- TAO_CATCHANY
+ int i = this->event_count_ % this->federation_->supplier_types ();
+ s.header.type = this->federation_->supplier_ipaddr (i);
+
+ consumer->push (sent, _env);
+ TAO_CHECK_ENV_RETURN_VOID (_env);
+
+ this->send_count_++;
+}
+
+void
+ECM_Local_Federation::consumer_push (ACE_hrtime_t,
+ const RtecEventComm::EventSet &event,
+ CORBA::Environment &_env)
+{
+ if (event.length () == 0)
{
- TAO_RETHROW;
+ return;
+ }
+
+ for (CORBA::ULong i = 0; i < event.length (); ++i)
+ {
+ const RtecEventComm::Event& e = event[i];
+
+ this->recv_count_++;
+
+ int j = 0;
+ for (; j < this->federation_->consumer_types (); ++j)
+ {
+ if (e.header.type == this->federation_->consumer_ipaddr(j))
+ {
+ // @@ TODO check if the type is in the current
+ // subscription list.
+ break;
+ }
+ }
+ if (j == this->federation_->consumer_types ())
+ this->invalid_count_++;
}
- TAO_ENDTRY;
}
void
-ECM_Consumer::close (CORBA::Environment &_env)
+ECM_Local_Federation::open_receiver (RtecEventChannelAdmin::EventChannel_ptr ec,
+ RtecScheduler::Scheduler_ptr scheduler,
+ CORBA::Environment &_env)
{
- if (CORBA::is_nil (this->supplier_proxy_.in ()))
- return;
+ const int bufsize = 512;
+ char buf[bufsize];
+ ACE_OS::strcpy (buf, this->name ());
+ ACE_OS::strcat (buf, "/receiver");
+
+ ACE_INET_Addr local_addr;
+ this->federation_->sender_local_addr (local_addr);
+ this->receiver_.init (ec, scheduler,
+ buf,
+ local_addr,
+ _env);
+ TAO_CHECK_ENV_RETURN_VOID (_env);
+
+ RtecScheduler::handle_t rt_info =
+ scheduler->create (buf, _env);
+ TAO_CHECK_ENV_RETURN_VOID(_env);
+
+ // The worst case execution time is far less than 2
+ // milliseconds, but that is a safe estimate....
+ ACE_Time_Value tv (0, 2000);
+ TimeBase::TimeT time;
+ ORBSVCS_Time::Time_Value_to_TimeT (time, tv);
+ scheduler->set (rt_info,
+ RtecScheduler::VERY_HIGH_CRITICALITY,
+ time, time, time,
+ 0,
+ RtecScheduler::VERY_LOW_IMPORTANCE,
+ time,
+ 1,
+ RtecScheduler::OPERATION,
+ _env);
+ TAO_CHECK_ENV_RETURN_VOID (_env);
+
+ RtecEventComm::EventSourceID source = ACE::crc32 (buf);
+
+ this->mcast_eh_.reactor (TAO_ORB_Core_instance ()->reactor ());
+
+ ACE_SupplierQOS_Factory qos;
+ for (int i = 0; i < this->consumer_types (); ++i)
+ {
+ qos.insert (source,
+ this->consumer_ipaddr (i),
+ rt_info, 1);
+ ACE_INET_Addr mcast_addr (this->mcast_port (),
+ this->consumer_ipaddr (i));
+ this->mcast_eh_.subscribe (mcast_addr);
+ }
- this->supplier_proxy_->disconnect_push_supplier (_env);
- if (_env.exception () != 0) return;
+ this->mcast_eh_.open ();
- this->supplier_proxy_ = 0;
+ RtecEventChannelAdmin::SupplierQOS qos_copy =
+ qos.get_SupplierQOS ();
+ this->receiver_.open (qos_copy, _env);
+ TAO_CHECK_ENV_RETURN_VOID (_env);
+
+
}
void
-ECM_Consumer::push (const RtecEventComm::EventSet& events,
- CORBA::Environment &_env)
+ECM_Local_Federation::close_receiver (CORBA::Environment &_env)
{
- ACE_hrtime_t arrival = ACE_OS::gethrtime ();
- this->test_->push_consumer (this->cookie_, arrival, events, _env);
+ this->receiver_.close (_env);
+ TAO_CHECK_ENV_RETURN_VOID (_env);
+ this->receiver_.shutdown (_env);
+ TAO_CHECK_ENV_RETURN_VOID (_env);
}
void
-ECM_Consumer::disconnect_push_consumer (CORBA::Environment &)
+ECM_Local_Federation::dump_results (void) const
{
+ double unfiltered_ratio = 0;
+ if (this->recv_count_ != 0)
+ unfiltered_ratio = double(this->unfiltered_count_)/this->recv_count_;
+ double invalid_ratio = 0;
+ if (this->recv_count_ != 0)
+ invalid_ratio = double(this->invalid_count_)/this->recv_count_;
+
+ ACE_DEBUG ((LM_DEBUG,
+ "Federation: %s\n"
+ " events received: %d\n"
+ " unfiltered events received: %d\n"
+ " ratio: %f\n"
+ " invalid events received: %d\n"
+ " ratio: %f\n"
+ " events sent: %d\n",
+ this->name (),
+ this->recv_count_,
+ this->unfiltered_count_,
+ unfiltered_ratio,
+ this->invalid_count_,
+ invalid_ratio,
+ this->send_count_));
}
// ****************************************************************
@@ -1233,8 +1238,8 @@ ECM_Consumer::disconnect_push_consumer (CORBA::Environment &)
int
main (int argc, char *argv [])
{
- ECM_Driver test;
- return test.run (argc, argv);
+ ECM_Driver driver;
+ return driver.run (argc, argv);
}
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
diff --git a/TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.h b/TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.h
index 46aa7a3aaad..758f5c3734b 100644
--- a/TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.h
+++ b/TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.h
@@ -6,10 +6,37 @@
// = DESCRIPTION
// This test attempts to communicate several Event Channels UDP
// using multicast.
-// The test hardcodes all the objects involved (consumers,
-// suppliers, proxies, etc.); the objective is to gain understanding
-// on the use of multicast events, not to provide a complete and
-// clean implementation.
+// The test reads a configuration file that describe what events are
+// received by each "VirtualConsumer". The user must provide, on the
+// command line, which virtual consumers are present on each process.
+// The test also creates one supplier for each consumer, the
+// supplier can send an event of any possible type described in the
+// file.
+
+// = HOW
+// The test creates a local consumer for each remote consumer, this
+// is necessary to send the event with the right port number; it
+// then sends the event using multicast.
+// Notice that there is still a win in using multicast because
+// multiple copies of the virtual consumer may be available.
+// To receive the event the test creates one local supplier for each
+// local "Virtual Consumer".
+//
+// = TODO
+// The class names in this test are *way* too artificial, I should
+// use the RTI names.
+//
+// It is unfortunate that the test must know before-hand the remote
+// consumer interests. It would be really simple to use a better
+// strategy: the test could "observe" changes in the remote EC
+// subscription list, it could then modify its local consumers
+// subscriptions.
+// Similarly the suppliers that supply Mcast packets as local events
+// could observe the changes in the local subscriptions and use that
+// to join or leave the multicast groups.
+// To demostrate this the test will need to reconfigure its
+// subscription list every so often (a few seconds seems like a good
+// idea).
//
// ============================================================================
@@ -22,29 +49,114 @@
#include "orbsvcs/RtecEventCommS.h"
#include "orbsvcs/Channel_Clients_T.h"
#include "orbsvcs/Event/EC_Gateway_UDP.h"
+#include "orbsvcs/Event/EC_UDP_Admin.h"
class ECM_Driver;
+class ECM_Federation
+{
+ // = DESCRIPTION
+ // The test reads a configuration file where it obtains the data
+ // about each "federation". A federation is some application,
+ // distributed over several processes. The potential set of
+ // publications and the potential set of subscriptions is known
+ // beforehand, but the actual publications (or subscriptions) may
+ // change dynamically.
+ // As stated above the federation may be present in more than one
+ // process, but also a process may participate in more than one
+ // federation.
+ //
+public:
+ ECM_Federation (char* name,
+ CORBA::UShort mcast_port,
+ int supplier_types,
+ char** supplier_names,
+ int consumer_types,
+ char** consumer_names);
+ // Constructor, it assumes ownership of the buffers, strings must be
+ // allocated using CORBA::string_alloc(), buffers using operator new.
+
+ ~ECM_Federation (void);
+ // Dtor
+
+ const char* name (void) const;
+ // The name of the federation....
+
+ CORBA::UShort mcast_port (void) const;
+ // The port used by this federation to receive mcast messages.
+
+ int supplier_types (void) const;
+ // The number of different event types published by this federation.
+
+ const char* supplier_name (CORBA::ULong i) const;
+ // The name (mcast addr in A.B.C.D format) of the event type <i>
+
+ CORBA::ULong supplier_ipaddr (CORBA::ULong i) const;
+ // The ipaddr (in host byte order) of the event type <i>
+
+ int consumer_types (void) const;
+ // The number of different event types consumed by this federation.
+
+ const char* consumer_name (CORBA::ULong i) const;
+ // The name (mcast addr in A.B.C.D format) of the event type <i>
+
+ CORBA::ULong consumer_ipaddr (CORBA::ULong i) const;
+ // The ipaddr (in host byte order) of the event type <i>
+
+ void open (ACE_SOCK_Dgram *dgram,
+ RtecEventChannelAdmin::EventChannel_ptr ec,
+ RtecScheduler::Scheduler_ptr scheduler,
+ CORBA::Environment &_env);
+ // Connect the UDP sender to the EC.
+
+ void close (CORBA::Environment &_env);
+ // Close the UDP sender, disconnect from the EC
+
+ int sender_local_addr (ACE_INET_Addr& addr);
+ // Return the sender local address
+
+private:
+ char* name_;
+ CORBA::UShort mcast_port_;
+
+ int supplier_types_;
+ char** supplier_names_;
+ CORBA::ULong* supplier_ipaddr_;
+
+ int consumer_types_;
+ char** consumer_names_;
+ CORBA::ULong* consumer_ipaddr_;
+
+ TAO_ECG_UDP_Sender sender_;
+ // The sender
+
+ TAO_EC_Simple_AddrServer addr_server_;
+ // Resolve event headers (type,source) to UDP addresses
+ // (ipaddr,port)
+};
+
+class ECM_Local_Federation;
+
class ECM_Supplier : public POA_RtecEventComm::PushSupplier
{
//
// = TITLE
- // Helper class to implement the different tests within ECM_Driver.
+ // Helper class to simulate an application acting as an event
+ // supplier.
//
// = DESCRIPTION
- // ECM_Driver can be configured to have a single or mcast
- // suppliers, to use the EC or short-circuit it, to use the
- // Gateway or not; this class connects as a consumer for timeouts
- // in the EC, at each timeout it delegates on the ECM_Driver class
- // to execute the proper test.
+ // This class connects as a consumer for timeouts in the EC. On
+ // every timeout it delegates on the ECM_Local_Federation class,
+ // usually this results in some reconfiguration and/or some events
+ // sent.
+ //
public:
- ECM_Supplier (ECM_Driver* test, void* cookie);
+ ECM_Supplier (ECM_Local_Federation* federation);
void open (const char* name,
- int event_a, int event_b,
- int event_count,
- const RtecScheduler::Period& rate,
- RtecEventChannelAdmin::EventChannel_ptr ec,
+ RtecScheduler::Period period,
+ RtecEventChannelAdmin::EventChannel_ptr event_channel,
+ RtecScheduler::Scheduler_ptr scheduler,
CORBA::Environment& _env);
// This method connects the supplier to the EC.
@@ -52,38 +164,30 @@ public:
// Disconnect from the EC.
void activate (const char* name,
- const RtecScheduler::Period& rate,
- RtecEventChannelAdmin::EventChannel_ptr ec,
+ RtecScheduler::Period period,
+ RtecEventChannelAdmin::EventChannel_ptr event_channel,
+ RtecScheduler::Scheduler_ptr scheduler,
CORBA::Environment& _env);
+ // Connect as a consumer to start receiving events.
+
+ RtecEventComm::EventSourceID supplier_id (void) const;
+ // The supplier ID.
void push (const RtecEventComm::EventSet& events,
CORBA::Environment &_env);
void disconnect_push_consumer (CORBA::Environment &);
// Implement the callbacks for our consumer personality.
-
+ // = The POA_RtecEventComm::PushSupplier methods.
virtual void disconnect_push_supplier (CORBA::Environment &);
- // The methods in the skeleton.
-
- RtecEventComm::EventSourceID supplier_id (void) const;
- // The supplier ID.
private:
- ECM_Driver* test_;
-
- void *cookie_;
- // The test provide us a cookie so we can give back our identity.
+ ECM_Local_Federation* federation_;
+ // To callback the federation.
RtecEventComm::EventSourceID supplier_id_;
// We generate an id based on the name....
- int event_a_;
- int event_b_;
- // The two types of events we may generate...
-
- int event_count_;
- // The number of events sent by this supplier.
-
RtecEventChannelAdmin::ProxyPushConsumer_var consumer_proxy_;
// We talk to the EC (as a supplier) using this proxy.
@@ -93,252 +197,298 @@ private:
RtecEventChannelAdmin::ProxyPushSupplier_var supplier_proxy_;
// We talk to the EC (as a supplier) using this proxy.
-
};
class ECM_Consumer : public POA_RtecEventComm::PushConsumer
{
//
// = TITLE
- // Helper class to implement the different tests within ECM_Driver.
+ // Helper class to simulate an application acting as an event
+ // consumer.
//
// = DESCRIPTION
- // ECM_Driver must collect events destined to many consumers, but
- // needs to distinguish through which consumer it is receiving the
- // event. The easiest way is to create a shallow class that
- // forwards the events to the EC, but passing back some cookie to
- // identify the consumer.
+ // This class connects as an event consumer to the EC. The events
+ // are actually handled by the ECM_Local_Federation.
public:
- ECM_Consumer (ECM_Driver* test, void *cookie);
+ ECM_Consumer (ECM_Local_Federation* federation);
void open (const char* name,
- int event_a, int event_b,
- RtecEventChannelAdmin::EventChannel_ptr ec,
+ RtecEventChannelAdmin::EventChannel_ptr event_channel,
+ RtecScheduler::Scheduler_ptr scheduler,
CORBA::Environment& _env);
// This method connects the consumer to the EC.
void close (CORBA::Environment &_env);
// Disconnect from the EC.
+ // = The POA_RtecEventComm::PushComsumer methods.
virtual void push (const RtecEventComm::EventSet& events,
CORBA::Environment &_env);
virtual void disconnect_push_consumer (CORBA::Environment &);
- // The skeleton methods.
private:
- ECM_Driver* test_;
- // The test class.
-
- void *cookie_;
- // The magic cookie that serves as our ID.
+ ECM_Local_Federation* federation_;
+ // To callback.
RtecEventChannelAdmin::ProxyPushSupplier_var supplier_proxy_;
// We talk to the EC using this proxy.
};
+class ECM_Local_Federation
+{
+ // = DESCRIPTION
+ // This class is used to represent a federation that is actually
+ // running in this process.
+ //
+public:
+ ECM_Local_Federation (ECM_Federation *federation,
+ ECM_Driver *driver);
+ // Constructor.
+
+ void open (int event_count,
+ RtecScheduler::Period period,
+ RtecEventChannelAdmin::EventChannel_ptr event_channel,
+ RtecScheduler::Scheduler_ptr scheduler,
+ CORBA::Environment& _env);
+ // Connect both the supplier and the consumer.
+
+ void close (CORBA::Environment& _env);
+ // Disconnect everybody from the EC
+
+ void activate (RtecScheduler::Period period,
+ RtecEventChannelAdmin::EventChannel_ptr event_channel,
+ RtecScheduler::Scheduler_ptr scheduler,
+ CORBA::Environment& _env);
+ // Activate the supplier
+
+ void supplier_timeout (RtecEventComm::PushConsumer_ptr consumer,
+ CORBA::Environment& _env);
+ // The supplier is ready to send a new event.
+
+ void consumer_push (ACE_hrtime_t arrival,
+ const RtecEventComm::EventSet& event,
+ CORBA::Environment& _env);
+ // The consumer just received an event.
+
+ const ECM_Federation *federation (void) const;
+ // The federation description.
+
+ void open_receiver (RtecEventChannelAdmin::EventChannel_ptr ec,
+ RtecScheduler::Scheduler_ptr scheduler,
+ CORBA::Environment &_env);
+ // Connect the UDP receiver to the EC.
+
+ void close_receiver (CORBA::Environment &_env);
+ // Close the UDP receiver, disconnect from the EC
+
+ void dump_results (void) const;
+ // Report the results back to the user...
+
+ // = Delegate on the federation description
+ const char* name (void) const;
+ CORBA::UShort mcast_port (void) const;
+ int supplier_types (void) const;
+ const char* supplier_name (CORBA::ULong i) const;
+ CORBA::ULong supplier_ipaddr (CORBA::ULong i) const;
+ int consumer_types (void) const;
+ const char* consumer_name (CORBA::ULong i) const;
+ CORBA::ULong consumer_ipaddr (CORBA::ULong i) const;
+
+private:
+ ECM_Federation *federation_;
+ // The description of the events we send and receive.
+
+ ECM_Driver *driver_;
+ // The test driver.
+
+ ECM_Consumer consumer_;
+ ECM_Supplier supplier_;
+ // The supplier and consumer helper classes, other than
+ // initialization this classes only forward events to the
+ // Federation.
+
+ // Collect statistics
+
+ CORBA::ULong recv_count_;
+ // Messages received.
+
+ CORBA::ULong unfiltered_count_;
+ // Messages received that were not properly filtered.
+
+ CORBA::ULong invalid_count_;
+ // Message received that could *not* be destined to this federation,
+ // yet they were received.
+
+ CORBA::ULong send_count_;
+ // Messages sent.
+
+ int event_count_;
+ // How many messages will we send before stop the simulation.
+
+ ACE_Time_Value last_publication_change_;
+ // The last time we changed our publication list, we don't change it
+ // too often.
+
+ ACE_Time_Value last_subscription_change_;
+ // The last time we changed our publication, so we don't change too
+ // often.
+
+ TAO_ECG_UDP_Receiver receiver_;
+ // This object reads the events and pushes them into the EC. Notice
+ // that it can receive events from multiple Event Handlers.
+
+ TAO_ECG_Mcast_EH mcast_eh_;
+ // The event handler, it receives callbacks from the reactor
+ // whenever an event is available in some of the multicast groups,
+ // it then forwards to the <mcast_recv_> object for processing and
+ // dispatching of the event.
+ // @@ TODO Eventually we may need several of this objects to handle
+ // OS limitations on the number of multicast groups per socket.
+};
+
class ECM_Driver
{
//
// = TITLE
- // Test and demonstrate the use of TAO_EC_Gateway.
+ // Demonstrate the use of the UDP Gateways.
//
// = DESCRIPTION
- // This class is design to exercise several features of the EC_Gateway
- // class and the mcast EC architecture.
- // We want to create two EC, each one having a single supplier and a
- // single consumer.
- // + To test the remote facilities the consumer register for both a
- // local event and a remote one.
- // + To test the remote filtering features the remote consumer only
- // wants one of the local events, and this event is generated less
- // frequently.
- //
- // This class creates the local EC_Gateway a consumer and a
- // supplier, it uses the command line to figure the subscriptions
- // and publications list.
+ // This class is design to exercise several features of the UDP
+ // Gateways and its companion classes.
+ // We create a set of processes, each running one EC, with
+ // multiple consumers and suppliers colocated with the EC.
+ // The ECs communicate among themselves using multicast.
+ // The test thus show how to use multicast, change the local
+ // ECG_UDP_Receiver and ECG_UDP_Sender QoS specifications
+ // dynamically, how to economically use the OS resources to
+ // receive and send multicast messages, etc.
//
public:
ECM_Driver (void);
enum {
MAX_EVENTS = 1024,
- // Maximum number of events to send...
+ // Maximum number of events to send on each Federation.
- MAX_CONSUMERS = 16,
- // Maximum number of consumers.
+ MAX_LOCAL_FEDERATIONS = 16,
+ // Maximum number of federations running on a single process
- MAX_SUPPLIERS = 16
- // Maximum number of suppliers.
+ MAX_FEDERATIONS = 128
+ // Maximum number of federations in the simulation
};
int run (int argc, char* argv[]);
- // Execute the test.
-
- void push_supplier (void* supplier_cookie,
- RtecEventChannelAdmin::ProxyPushConsumer_ptr consumer,
- const RtecEventComm::EventSet &events,
- CORBA::Environment &);
- // Callback method for suppliers, we push for them to their
- // consumers and take statistics on the way.
- // It is possible that we ignore the <consumer> parameter when
- // testing the short-circuit case.
-
- void push_consumer (void* consumer_cookie,
- ACE_hrtime_t arrival,
- const RtecEventComm::EventSet& events,
- CORBA::Environment&);
- // Callback method for consumers, if any of our consumers has
- // received events it will invoke this method.
-
- void shutdown_supplier (void* supplier_cookie,
- RtecEventComm::PushConsumer_ptr consumer,
- CORBA::Environment& _env);
- // One of the suppliers has completed its work.
+ // Run the test, read all the configuration files, etc.
+
+ void federation_has_shutdown (ECM_Local_Federation *federation,
+ CORBA::Environment& _env);
+ // One of the federations has completed its simulation, once all of
+ // them finish the test exists.
+
private:
- RtecEventChannelAdmin::EventChannel_ptr
- get_ec (CosNaming::NamingContext_ptr naming_context,
- const char* ec_name,
- CORBA::Environment &_env);
- // Helper routine to obtain an EC given its name.
-
- void connect_suppliers (RtecEventChannelAdmin::EventChannel_ptr local_ec,
- CORBA::Environment &_env);
- void disconnect_suppliers (CORBA::Environment &_env);
- // Connect the suppliers.
-
- void activate_suppliers (RtecEventChannelAdmin::EventChannel_ptr local_ec,
- CORBA::Environment &_env);
- // Activate the suppliers, i.e. they start generating events.
-
- void connect_ecg (RtecEventChannelAdmin::EventChannel_ptr local_ec,
- CORBA::Environment &_env);
- // Connect the EC gateway, it builds the Subscriptions and the
- // Publications list.
-
- void connect_consumers (RtecEventChannelAdmin::EventChannel_ptr local_ec,
- CORBA::Environment &_env);
- void disconnect_consumers (CORBA::Environment &_env);
- // Connect and disconnect the consumers.
+ void open_federations (RtecEventChannelAdmin::EventChannel_ptr ec,
+ RtecScheduler::Scheduler_ptr scheduler,
+ CORBA::Environment &_env);
+ // Connect the federations to the EC.
+
+ void activate_federations (RtecEventChannelAdmin::EventChannel_ptr ec,
+ RtecScheduler::Scheduler_ptr scheduler,
+ CORBA::Environment &_env);
+ // Activate all the federations
+
+ void close_federations (CORBA::Environment &_env);
+ // Close the federations, i.e. disconnect from the EC, deactivate
+ // the objects, etc.
+
+ void open_senders (RtecEventChannelAdmin::EventChannel_ptr ec,
+ RtecScheduler::Scheduler_ptr scheduler,
+ CORBA::Environment &_env);
+ // Connect all the senders, so we can start multicasting events.
+
+ void open_receivers (RtecEventChannelAdmin::EventChannel_ptr ec,
+ RtecScheduler::Scheduler_ptr scheduler,
+ CORBA::Environment &_env);
+ // Connect all the receivers, thus we accept events arriving through
+ // multicast.
+
+ void close_senders (CORBA::Environment &_env);
+ // Close all the senders to cleanup resources.
+
+ void close_receivers (CORBA::Environment &_env);
+ // Close all the receivers to cleanup resources.
int shutdown (CORBA::Environment&);
- // Called when the main thread (i.e. not the scavenger thread) is
- // shutting down.
+ // Called when the main thread.
int parse_args (int argc, char* argv[]);
- // parse the command line args
-
- void dump_results (void);
- // Dump the results to the standard output.
+ // parse the command line arguments
- void wait_until_ready (void);
- // Block event delivery until all the consumers are ready.
+ int parse_config_file (void);
+ // parse the command line arguments
- struct Stats;
- void dump_results (const char* name, Stats& stats);
- // Dump the results for a particular consumer.
+ int parse_name_list (FILE* file, int n, char** names,
+ const char* error_msg);
+ // parse one of the lists of names in the federation definition.
- int local_source (RtecEventComm::EventSourceID id) const;
- // Check if <id> correspond to a local supplier.
+ int skip_blanks (FILE* file,
+ const char* error_msg);
+ // skip the blanks in the file.
- void shutdown_consumer (int id);
- // One of the consumers has completed its work.
+ void dump_results (void);
+ // Dump the results to the standard output.
private:
char* lcl_name_;
// The name of the "local" EC.
- int short_circuit_;
- // Send events directly to local consumers.
-
- int n_suppliers_;
- // The number of suppliers in this test.
-
- int n_consumers_;
- // The number of consumers.
-
- int workload_;
- // The number of iterations of ACE::is_prime() to execute.
-
int event_period_;
// The events are generated using this interval.
int event_count_;
// How many events will the suppliers send
- int s_event_a_;
- int s_event_b_;
- int c_event_a_;
- int c_event_b_;
- int r_event_a_;
- int r_event_b_;
- // Each supplier send two types of events, each consumer receives
- // two other types. The remote suppliers send other two types of
- // events.
-
- const char* schedule_file_;
- // Ask the schedule to compute and dump its schedule after the test
- // execution.
-
- const char* pid_file_name_;
- // The name of a file where the process stores its pid
+ char* config_filename_;
+ // The name of the file where we read the configuration.
- ACE_INET_Addr send_mcast_group_;
- // The multicast group to send the consumer events.
-
- TAO_ECG_UDP_Sender sender_;
- // This consumer sends the local events through <send_mcast_group>
-
- ACE_INET_Addr recv_mcast_group_;
- // The multicast group to receive the consumer events.
-
- TAO_ECG_UDP_Receiver receiver_;
- // This supplier pushes the remote events received throug
- // <recv_mcast_group>
-
- TAO_ECG_Mcast_EH mcast_eh_;
- // The event handler to receive the mcast events.
-
- ECM_Supplier* suppliers_[ECM_Driver::MAX_SUPPLIERS];
- ECM_Consumer* consumers_[ECM_Driver::MAX_CONSUMERS];
- // The suppliers and consumer arrays, the sizes are controlled using
- // {lp,hp}_{suppliers,consumers}_
+ const char* pid_filename_;
+ // The name of a file where the process stores its pid
- // @@ TODO it looks like the HP and LP data could be encapsulated.
+ int local_federations_count_;
+ // How many federations are running in this process (or, if you
+ // prefer, in how many federations does this process participate).
- struct Stats {
- ACE_hrtime_t total_time_;
- float laxity_[MAX_EVENTS];
- ACE_hrtime_t lcl_latency_[MAX_EVENTS];
- ACE_hrtime_t end_[MAX_EVENTS];
- int lcl_count_;
- // We keep laxity and total_time stats only for the local events.
+ ECM_Local_Federation* local_federations_[MAX_LOCAL_FEDERATIONS];
+ // The local federations.
- ACE_hrtime_t rmt_latency_[MAX_EVENTS];
- int rmt_count_;
- };
- Stats stats_[ECM_Driver::MAX_CONSUMERS];
- // Store the measurements for local and remote events..
+ char* local_names_[MAX_LOCAL_FEDERATIONS];
+ // The names of the local federations.
- int ready_;
- ACE_SYNCH_MUTEX ready_mtx_;
- ACE_SYNCH_CONDITION ready_cnd_;
- // Before accepting any events the suppliers must wait for the test
- // to setup all the consumers.
- // The suppliers wait on the condition variable.
+ int all_federations_count_;
+ // The total number of federations we belong to.
- ACE_Atomic_Op<ACE_SYNCH_MUTEX,int> running_suppliers_;
- // keep track of how many suppliers are still running so we shutdown
- // at the right moment.
+ ECM_Federation* all_federations_[MAX_FEDERATIONS];
+ // All the federations.
- ACE_Atomic_Op<ACE_SYNCH_MUTEX,int> running_consumers_;
- // keep track of how many consumers are still running so we shutdown
- // at the right moment.
+ ACE_Atomic_Op<ACE_SYNCH_MUTEX,int> federations_running_;
+ // Keep track of how many federations are active so we can shutdown
+ // once they are all destroyed.
ACE_hrtime_t test_start_;
ACE_hrtime_t test_stop_;
// Measure the test elapsed time as well as mark the beginning of
// the frames.
+
+ CORBA::ORB_var orb_;
+ // The ORB, so we can shut it down.
+
+ ACE_SOCK_Dgram send_dgram_;
+ // This socket is shared by all the federations to send the
+ // multicast events.
};
+#if defined (__ACE_INLINE__)
+#include "EC_Mcast.i"
+#endif /* __ACE_INLINE__ */
+
#endif /* EC_MCAST_H */
diff --git a/TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.i b/TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.i
new file mode 100644
index 00000000000..0e1a45f7940
--- /dev/null
+++ b/TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.i
@@ -0,0 +1,120 @@
+//
+// $Id$
+//
+
+ACE_INLINE int
+ECM_Federation::sender_local_addr (ACE_INET_Addr& addr)
+{
+ return this->sender_.get_local_addr (addr);
+}
+
+
+ACE_INLINE const char*
+ECM_Federation::name (void) const
+{
+ return this->name_;
+}
+
+ACE_INLINE CORBA::UShort
+ECM_Federation::mcast_port (void) const
+{
+ return this->mcast_port_;
+}
+
+ACE_INLINE int
+ECM_Federation::supplier_types (void) const
+{
+ return this->supplier_types_;
+}
+
+ACE_INLINE const char*
+ECM_Federation::supplier_name (CORBA::ULong i) const
+{
+ if (i < this->supplier_types_)
+ return this->supplier_names_[i];
+ return 0;
+}
+
+ACE_INLINE CORBA::ULong
+ECM_Federation::supplier_ipaddr (CORBA::ULong i) const
+{
+ if (i < this->supplier_types_)
+ return this->supplier_ipaddr_[i];
+ return 0;
+}
+
+ACE_INLINE int
+ECM_Federation::consumer_types (void) const
+{
+ return this->consumer_types_;
+}
+
+ACE_INLINE const char*
+ECM_Federation::consumer_name (CORBA::ULong i) const
+{
+ if (i < this->consumer_types_)
+ return this->consumer_names_[i];
+ return 0;
+}
+
+ACE_INLINE CORBA::ULong
+ECM_Federation::consumer_ipaddr (CORBA::ULong i) const
+{
+ if (i < this->consumer_types_)
+ return this->consumer_ipaddr_[i];
+ return 0;
+}
+
+ACE_INLINE const ECM_Federation*
+ECM_Local_Federation::federation (void) const
+{
+ return this->federation_;
+}
+
+ACE_INLINE const char*
+ECM_Local_Federation::name (void) const
+{
+ return this->federation_->name ();
+}
+
+ACE_INLINE CORBA::UShort
+ECM_Local_Federation::mcast_port (void) const
+{
+ return this->federation_->mcast_port ();
+}
+
+ACE_INLINE int
+ECM_Local_Federation::supplier_types (void) const
+{
+ return this->federation_->supplier_types ();
+}
+
+ACE_INLINE const char*
+ECM_Local_Federation::supplier_name (CORBA::ULong i) const
+{
+ return this->federation_->supplier_name (i);
+}
+
+ACE_INLINE CORBA::ULong
+ECM_Local_Federation::supplier_ipaddr (CORBA::ULong i) const
+{
+ return this->federation_->supplier_ipaddr (i);
+}
+
+ACE_INLINE int
+ECM_Local_Federation::consumer_types (void) const
+{
+ return this->federation_->consumer_types ();
+}
+
+ACE_INLINE const char*
+ECM_Local_Federation::consumer_name (CORBA::ULong i) const
+{
+ return this->federation_->consumer_name (i);
+}
+
+ACE_INLINE CORBA::ULong
+ECM_Local_Federation::consumer_ipaddr (CORBA::ULong i) const
+{
+ return this->federation_->consumer_ipaddr (i);
+}
diff --git a/TAO/orbsvcs/tests/EC_Mcast/README b/TAO/orbsvcs/tests/EC_Mcast/README
new file mode 100644
index 00000000000..e3be683604a
--- /dev/null
+++ b/TAO/orbsvcs/tests/EC_Mcast/README
@@ -0,0 +1,27 @@
+# $Id$
+
+This test can be pretty complicated to run, a sample configuration file is
+included to help you startup.
+
+ The basic idea to remember is that a "Federation" is a logical
+grouping of incoming and outgoing mcast addresses that share the same
+multicast port. The test reads the configuration file to find out
+which Federations are globally available defined, but the comand line
+is used to specify the list of federations that are actually running
+on each process.
+ The test requires the Naming Serivice:
+
+$ ../../Naming_Service/Naming_Service
+
+ this is an *extremely* silent service, don't be surprized if
+you see no output.
+
+ Then you ough to run at least two copies of the test, on
+different windows (to make the test interesting):
+
+$ ./EC_Mcast -l ECM1 -p ECM1.pid -c sample.cfg -n 200 -t 500000 -f Set01
+$ ./EC_Mcast -l ECM2 -p ECM2.pid -c sample.cfg -n 200 -t 500000 -f Set02
+
+ the test will report the number of events received, if you run
+just one of the processes you will notice that this number is smaller,
+this indicates that some events come from a "remote" event.
diff --git a/TAO/orbsvcs/tests/EC_Mcast/sample.cfg b/TAO/orbsvcs/tests/EC_Mcast/sample.cfg
new file mode 100644
index 00000000000..060977e294f
--- /dev/null
+++ b/TAO/orbsvcs/tests/EC_Mcast/sample.cfg
@@ -0,0 +1,64 @@
+6
+Set00 12000 5 3
+ 224.100.0.1
+ 224.100.0.2
+ 224.100.0.3
+ 224.100.0.4
+ 224.100.0.5
+ 224.100.5.1
+ 224.100.5.2
+ 224.100.5.3
+Set01 12001 4 4
+ 224.100.1.1
+ 224.100.1.2
+ 224.100.1.3
+ 224.100.1.4
+ 224.100.2.1
+ 224.100.2.2
+ 224.100.2.4
+ 224.100.0.1
+Set02 12002 3 5
+ 224.100.2.1
+ 224.100.2.2
+ 224.100.2.3
+ 224.100.2.1
+ 224.100.2.2
+ 224.100.2.3
+ 224.100.1.1
+ 224.100.1.3
+Set03 12003 4 4
+ 224.100.3.1
+ 224.100.3.2
+ 224.100.3.3
+ 224.100.3.4
+ 224.100.2.1
+ 224.100.2.2
+ 224.100.2.3
+ 224.100.2.4
+Set04 12004 4 4
+ 224.100.4.1
+ 224.100.4.2
+ 224.100.4.3
+ 224.100.4.4
+ 224.100.5.1
+ 224.100.5.2
+ 224.100.4.3
+ 224.100.4.4
+Set05 12005 4 4
+ 224.100.5.1
+ 224.100.5.2
+ 224.100.5.3
+ 224.100.5.4
+ 224.100.4.1
+ 224.100.4.2
+ 224.100.5.3
+ 224.100.5.4
+Set06 12006 4 4
+ 224.100.0.1
+ 224.100.0.2
+ 224.100.2.1
+ 224.100.2.2
+ 224.100.0.3
+ 224.100.0.4
+ 224.100.2.3
+ 224.100.2.4
diff --git a/TAO/orbsvcs/tests/EC_Multiple/EC_Multiple.cpp b/TAO/orbsvcs/tests/EC_Multiple/EC_Multiple.cpp
index e263e080476..5ceddced4a0 100644
--- a/TAO/orbsvcs/tests/EC_Multiple/EC_Multiple.cpp
+++ b/TAO/orbsvcs/tests/EC_Multiple/EC_Multiple.cpp
@@ -119,9 +119,6 @@ print_priority_info (const char *const name)
#endif /* ACE_HAS_PTHREADS_STD */
}
-
-
-
int
Test_ECG::run (int argc, char* argv[])
{
@@ -926,16 +923,16 @@ Test_ECG::push_consumer (void *consumer_cookie,
{
const RtecEventComm::Event& e = events[i];
- if (e.type_ == ACE_ES_EVENT_SHUTDOWN)
+ if (e.header.type == ACE_ES_EVENT_SHUTDOWN)
{
this->shutdown_consumer (ID);
continue;
}
ACE_hrtime_t s;
- ORBSVCS_Time::TimeT_to_hrtime (s, e.creation_time_);
+ ORBSVCS_Time::TimeT_to_hrtime (s, e.header.creation_time);
ACE_hrtime_t nsec = arrival - s;
- if (this->local_source (e.source_))
+ if (this->local_source (e.header.source))
{
int& count = this->stats_[ID].lcl_count_;
@@ -1004,16 +1001,14 @@ Test_ECG::shutdown_supplier (void* /* supplier_cookie */,
shutdown.length (1);
RtecEventComm::Event& s = shutdown[0];
- s.source_ = 0;
- s.ttl_ = 1;
+ s.header.source = 0;
+ s.header.ttl = 1;
ACE_hrtime_t t = ACE_OS::gethrtime ();
- ORBSVCS_Time::hrtime_to_TimeT (s.creation_time_, t);
- s.ec_recv_time_ = ORBSVCS_Time::zero;
- s.ec_send_time_ = ORBSVCS_Time::zero;
- s.data_.x = 0;
- s.data_.y = 0;
- s.type_ = ACE_ES_EVENT_SHUTDOWN;
+ ORBSVCS_Time::hrtime_to_TimeT (s.header.creation_time, t);
+ s.header.ec_recv_time = ORBSVCS_Time::zero;
+ s.header.ec_send_time = ORBSVCS_Time::zero;
+ s.header.type = ACE_ES_EVENT_SHUTDOWN;
consumer->push (shutdown, _env);
}
}
@@ -1483,22 +1478,22 @@ Test_Supplier::push (const RtecEventComm::EventSet& events,
for (u_int i = 0; i < events.length (); ++i)
{
const RtecEventComm::Event& e = events[i];
- if (e.type_ != ACE_ES_EVENT_INTERVAL_TIMEOUT)
+ if (e.header.type != ACE_ES_EVENT_INTERVAL_TIMEOUT)
continue;
// ACE_DEBUG ((LM_DEBUG, "Test_Supplier - timeout (%t)\n"));
RtecEventComm::Event& s = sent[i];
- s.source_ = this->supplier_id_;
- s.ttl_ = 1;
+ s.header.source = this->supplier_id_;
+ s.header.ttl = 1;
ACE_hrtime_t t = ACE_OS::gethrtime ();
- ORBSVCS_Time::hrtime_to_TimeT (s.creation_time_, t);
- s.ec_recv_time_ = ORBSVCS_Time::zero;
- s.ec_send_time_ = ORBSVCS_Time::zero;
+ ORBSVCS_Time::hrtime_to_TimeT (s.header.creation_time, t);
+ s.header.ec_recv_time = ORBSVCS_Time::zero;
+ s.header.ec_send_time = ORBSVCS_Time::zero;
- s.data_.x = 0;
- s.data_.y = 0;
+ s.data.x = 0;
+ s.data.y = 0;
this->message_count_--;
@@ -1513,11 +1508,11 @@ Test_Supplier::push (const RtecEventComm::EventSet& events,
if (this->message_count_ % 2 == 0)
{
// Generate an A event...
- s.type_ = this->event_a_;
+ s.header.type = this->event_a_;
}
else
{
- s.type_ = this->event_b_;
+ s.header.type = this->event_b_;
}
}
this->test_->push_supplier (this->cookie_,
diff --git a/TAO/orbsvcs/tests/EC_Throughput/ECT_Consumer.cpp b/TAO/orbsvcs/tests/EC_Throughput/ECT_Consumer.cpp
index 4b2abf3637e..f94473fc6fa 100644
--- a/TAO/orbsvcs/tests/EC_Throughput/ECT_Consumer.cpp
+++ b/TAO/orbsvcs/tests/EC_Throughput/ECT_Consumer.cpp
@@ -213,7 +213,7 @@ Driver::push_consumer (void* consumer_cookie,
{
const RtecEventComm::Event& e = events[i];
- if (e.data_.payload.mb () == 0)
+ if (e.data.payload.mb () == 0)
{
ACE_DEBUG ((LM_DEBUG, "No data in event[%d]\n", i));
continue;
diff --git a/TAO/orbsvcs/tests/EC_Throughput/ECT_Supplier.cpp b/TAO/orbsvcs/tests/EC_Throughput/ECT_Supplier.cpp
index 014fcff7adb..b3600dc9363 100644
--- a/TAO/orbsvcs/tests/EC_Throughput/ECT_Supplier.cpp
+++ b/TAO/orbsvcs/tests/EC_Throughput/ECT_Supplier.cpp
@@ -199,28 +199,28 @@ ECTS_Driver::supplier_task (Test_Supplier *supplier,
{
RtecEventComm::EventSet event (1);
event.length (1);
- event[0].source_ = supplier->supplier_id ();
- event[0].ttl_ = 1;
+ event[0].header.source = supplier->supplier_id ();
+ event[0].header.ttl = 1;
ACE_hrtime_t t = ACE_OS::gethrtime ();
- ORBSVCS_Time::hrtime_to_TimeT (event[0].creation_time_, t);
- event[0].ec_recv_time_ = ORBSVCS_Time::zero;
- event[0].ec_send_time_ = ORBSVCS_Time::zero;
+ ORBSVCS_Time::hrtime_to_TimeT (event[0].header.creation_time, t);
+ event[0].header.ec_recv_time = ORBSVCS_Time::zero;
+ event[0].header.ec_send_time = ORBSVCS_Time::zero;
if (i == ACE_static_cast (CORBA::Long, this->event_count_) - 1)
- event[0].type_ = ACE_ES_EVENT_SHUTDOWN;
+ event[0].header.type = ACE_ES_EVENT_SHUTDOWN;
else if (i % 2 == 0)
- event[0].type_ = this->event_a_;
+ event[0].header.type = this->event_a_;
else
- event[0].type_ = this->event_b_;
+ event[0].header.type = this->event_b_;
- event[0].data_.x = 0;
- event[0].data_.y = 0;
+ event[0].data.x = 0;
+ event[0].data.y = 0;
// We use replace to minimize the copies, this should result
// in just one memory allocation;
- event[0].data_.payload.replace (this->event_size_,
- &mb);
+ event[0].data.payload.replace (this->event_size_,
+ &mb);
supplier->consumer_proxy ()->push(event, TAO_TRY_ENV);
TAO_CHECK_ENV;
diff --git a/TAO/orbsvcs/tests/Event_Latency/Event_Latency.cpp b/TAO/orbsvcs/tests/Event_Latency/Event_Latency.cpp
index 612f0b7ae1e..b1c5c1c8061 100644
--- a/TAO/orbsvcs/tests/Event_Latency/Event_Latency.cpp
+++ b/TAO/orbsvcs/tests/Event_Latency/Event_Latency.cpp
@@ -189,7 +189,7 @@ Latency_Consumer::push (const RtecEventComm::EventSet &events,
for (CORBA::ULong i = 0; i < events.length (); ++i)
{
- if (events[i].type_ == ACE_ES_EVENT_SHUTDOWN)
+ if (events[i].header.type == ACE_ES_EVENT_SHUTDOWN)
{
ACE_DEBUG ((LM_DEBUG,
"Latency Consumer: received shutdown event\n"));
@@ -201,15 +201,15 @@ Latency_Consumer::push (const RtecEventComm::EventSet &events,
{
ACE_hrtime_t creation;
ORBSVCS_Time::TimeT_to_hrtime (creation,
- events[i].creation_time_);
+ events[i].header.creation_time);
ACE_hrtime_t ec_recv;
ORBSVCS_Time::TimeT_to_hrtime (ec_recv,
- events[i].ec_recv_time_);
+ events[i].header.ec_recv_time);
ACE_hrtime_t ec_send;
ORBSVCS_Time::TimeT_to_hrtime (ec_send,
- events[i].ec_send_time_);
+ events[i].header.ec_send_time);
const ACE_hrtime_t now = ACE_OS::gethrtime ();
const ACE_hrtime_t elapsed = now - creation;
@@ -553,24 +553,25 @@ Latency_Supplier::push (const RtecEventComm::EventSet &events,
for (CORBA::ULong i = 0; i < events.length (); ++i)
{
- if (!master_ && events[i].type_ == ACE_ES_EVENT_SHUTDOWN)
+ if (!master_ && events[i].header.type == ACE_ES_EVENT_SHUTDOWN)
{
ACE_DEBUG ((LM_DEBUG,
"Latency Supplier: received shutdown event\n"));
this->shutdown ();
}
- else if (events[i].type_ == ACE_ES_EVENT_INTERVAL_TIMEOUT)
+ else if (events[i].header.type == ACE_ES_EVENT_INTERVAL_TIMEOUT)
{
// Create the event to send.
RtecEventComm::Event event;
- event.source_ = supplier_id_;
- event.type_ = ACE_ES_EVENT_NOTIFICATION;
+ event.header.source = this->supplier_id_;
+ event.header.type = ACE_ES_EVENT_NOTIFICATION;
++total_sent_;
- if (timestamp_)
+ if (this->timestamp_)
{
ACE_hrtime_t now = ACE_OS::gethrtime ();
- ORBSVCS_Time::hrtime_to_TimeT (event.creation_time_, now);
+ ORBSVCS_Time::hrtime_to_TimeT (event.header.creation_time,
+ now);
}
// @@ ACE_TIMEPROBE_RESET;
@@ -665,8 +666,8 @@ Latency_Supplier::shutdown (void)
{
// Create the shutdown message.
RtecEventComm::Event event;
- event.source_ = supplier_id_;
- event.type_ = ACE_ES_EVENT_SHUTDOWN;
+ event.header.source = this->supplier_id_;
+ event.header.type = ACE_ES_EVENT_SHUTDOWN;
// Push the shutdown event.
RtecEventComm::EventSet events (1);
diff --git a/TAO/tests/Cubit/TAO/IDL_Cubit/svc.conf b/TAO/tests/Cubit/TAO/IDL_Cubit/svc.conf
index ac7727fb103..3d0227a9192 100644
--- a/TAO/tests/Cubit/TAO/IDL_Cubit/svc.conf
+++ b/TAO/tests/Cubit/TAO/IDL_Cubit/svc.conf
@@ -5,4 +5,4 @@
dynamic Resource_Factory Service_Object * TAO:_make_TAO_Resource_Factory() "-ORBresources tss -ORBreactorlock null"
dynamic Client_Strategy_Factory Service_Object * TAO:_make_TAO_Default_Client_Strategy_Factory() "-ORBiiopprofilelock null"
-dynamic Server_Strategy_Factory Service_Object * TAO:_make_TAO_Default_Server_Strategy_Factory() "-ORBconcurrency thread-per-connection -ORBpoalock null -ORBconnectorlock null"
+dynamic Server_Strategy_Factory Service_Object * TAO:_make_TAO_Default_Server_Strategy_Factory() "-ORBconcurrency reactive -ORBpoalock null -ORBconnectorlock null"