summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoryamuna <yamuna@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2003-06-25 18:44:55 +0000
committeryamuna <yamuna@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2003-06-25 18:44:55 +0000
commit146c6f3c7ce06ce8dff28b77fb89a064f64f1efb (patch)
tree8cdc226b01b7ad7e980b879ff4e9410f8b18f9d8
parent19eba5022e40c89a36d0823efa90b4527de290c3 (diff)
downloadATCD-146c6f3c7ce06ce8dff28b77fb89a064f64f1efb.tar.gz
ChangelogTag: Wed June 25 14:46:19 2003 Yamuna Krishnamurthy <yamuna@oomworks.com>
-rw-r--r--TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/Connection_Manager.cpp151
-rw-r--r--TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/Connection_Manager.h29
-rw-r--r--TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/distributer.cpp11
-rw-r--r--TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/distributer.h2
-rw-r--r--TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/receiver.cpp8
-rw-r--r--TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/receiver.h2
-rw-r--r--TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/sender.cpp13
-rw-r--r--TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/sender.h3
8 files changed, 205 insertions, 14 deletions
diff --git a/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/Connection_Manager.cpp b/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/Connection_Manager.cpp
index 7f59e17ed6a..8fd9df060fe 100644
--- a/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/Connection_Manager.cpp
+++ b/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/Connection_Manager.cpp
@@ -10,6 +10,95 @@ Connection_Manager::~Connection_Manager (void)
{
}
+void
+Connection_Manager::load_ep_addr (const char* file_name)
+{
+ FILE* addr_file = ACE_OS::fopen (file_name, "r");
+
+ if (addr_file == 0)
+ {
+ ACE_ERROR ((LM_DEBUG,
+ "Cannot open addr file %s\n",
+ file_name));
+ return;
+ }
+ else
+ ACE_DEBUG ((LM_DEBUG,
+ "Addr file opened successfully\n"));
+
+ while (1)
+ {
+ char buf [BUFSIZ];
+
+ // Read from the file into a buffer
+
+
+ /*
+ int n = ACE_OS::fread (buf,
+ 1,
+ BUFSIZ,
+ addr_file);
+ */
+
+ if ((ACE_OS::fgets (buf,BUFSIZ,addr_file)) == NULL)
+ {
+ // At end of file break the loop and end the sender.
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,"End of Addr file\n"));
+ break;
+ }
+
+
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "%s\n",
+ buf));
+
+ Endpoint_Addresses* addr;
+ ACE_NEW (addr,
+ Endpoint_Addresses);
+
+ TAO_Tokenizer addr_tokenizer (buf,'/');
+
+ ACE_CString flowname;
+
+ if (addr_tokenizer [0] == 0)
+ {
+ ACE_ERROR ((LM_ERROR,
+ "Corresponding flow name not specified for endpoint addresses\n"));
+ return;
+ }
+ else
+ flowname += addr_tokenizer [0];
+
+ if (addr_tokenizer [1] != 0)
+ addr->sender_addr += CORBA::string_dup (addr_tokenizer [1]);
+
+ if (addr_tokenizer [2] != 0)
+ addr->receiver_addr += CORBA::string_dup (addr_tokenizer [2]);
+
+ int result = ep_addr_.bind (flowname,
+ addr);
+ if (result == 0)
+ {
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "Flowname %s Bound Successfully\n",
+ flowname.c_str ()));
+ }
+ else if (result == 1)
+ ACE_DEBUG ((LM_DEBUG,
+ "Flowname %s already exists\n",
+ flowname.c_str ()));
+ else ACE_DEBUG ((LM_DEBUG,
+ "Flowname %s Bound Failed\n",
+ flowname.c_str ()));
+
+
+ }
+
+}
+
int
Connection_Manager::init (CORBA::ORB_ptr orb)
{
@@ -30,8 +119,10 @@ Connection_Manager::bind_to_receivers (const ACE_CString &sender_name,
this->sender_name_ =
sender_name;
+ /*
this->sender_ =
AVStreams::MMDevice::_duplicate (sender);
+ */
CosNaming::Name name (1);
name.length (1);
@@ -184,7 +275,8 @@ Connection_Manager::add_to_receivers (CosNaming::BindingList &binding_list
}
void
-Connection_Manager::connect_to_receivers (ACE_ENV_SINGLE_ARG_DECL)
+Connection_Manager::connect_to_receivers (AVStreams::MMDevice_ptr sender
+ ACE_ENV_ARG_DECL)
{
// Connect to all receivers that we know about.
for (Receivers::iterator iterator = this->receivers_.begin ();
@@ -197,13 +289,37 @@ Connection_Manager::connect_to_receivers (ACE_ENV_SINGLE_ARG_DECL)
ACE_CString flowname =
(*iterator).ext_id_;
+
+ Endpoint_Addresses* addr = 0;
+ int result = ep_addr_.find (flowname,
+ addr);
+
+ ACE_CString sender_addr_str;
+ ACE_CString receiver_addr_str;
+
+ if (result != -1)
+ {
+ sender_addr_str = addr->sender_addr;
+ receiver_addr_str = addr->receiver_addr;
+ }
+ else ACE_DEBUG ((LM_DEBUG,
+ "No endpoint address for flowname %s\n",
+ flowname.c_str ()));
+
+ ACE_INET_Addr receiver_addr (receiver_addr_str.c_str ());
+ ACE_INET_Addr sender_addr (sender_addr_str.c_str ());
+
+
// Create the forward flow specification to describe the flow.
TAO_Forward_FlowSpec_Entry sender_entry (flowname.c_str (),
"IN",
"USER_DEFINED",
"",
"UDP",
- 0);
+ &sender_addr);
+
+ sender_entry.set_peer_addr (&receiver_addr);
+
// Set the flow specification for the stream between receiver
// and distributer
@@ -232,7 +348,7 @@ Connection_Manager::connect_to_receivers (ACE_ENV_SINGLE_ARG_DECL)
streamctrl_object);
// Bind the sender and receiver MMDevices.
- (void) streamctrl->bind_devs (this->sender_.in (),
+ (void) streamctrl->bind_devs (sender,
(*iterator).int_id_.in (),
the_qos.inout (),
flow_spec
@@ -370,13 +486,38 @@ Connection_Manager::connect_to_sender (ACE_ENV_SINGLE_ARG_DECL)
"_" +
this->receiver_name_;
+
+ Endpoint_Addresses* addr = 0;
+ int ret = ep_addr_.find (flowname,
+ addr);
+
+ ACE_CString sender_addr_str;
+ ACE_CString receiver_addr_str;
+
+ if (ret != -1)
+ {
+ sender_addr_str = addr->sender_addr;
+ receiver_addr_str = addr->receiver_addr;
+
+ ACE_DEBUG ((LM_DEBUG,
+ "Address Strings %s %s\n",
+ sender_addr_str.c_str (),
+ receiver_addr_str.c_str ()));
+ }
+
+ ACE_INET_Addr sender_addr (sender_addr_str.c_str ());
+ ACE_INET_Addr receiver_addr (receiver_addr_str.c_str ());
+
// Create the forward flow specification to describe the flow.
TAO_Forward_FlowSpec_Entry sender_entry (flowname.c_str (),
"IN",
"USER_DEFINED",
"",
"UDP",
- 0);
+ &sender_addr);
+
+ sender_entry.set_peer_addr (&receiver_addr);
+
// Set the flow specification for the stream between sender and
// receiver.
@@ -444,7 +585,7 @@ Connection_Manager::add_streamctrl (const ACE_CString &flowname,
if( streamctrl_any.in() >>= streamctrl )
{
- // Any still owns the pointer, so we duplicate it
+ // Any still owns the pointer, so we duplicate it
AVStreams::StreamCtrl::_duplicate( streamctrl );
this->streamctrls_.bind (flowname,
streamctrl);
diff --git a/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/Connection_Manager.h b/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/Connection_Manager.h
index 8c2e07dbf51..f27b97eef2f 100644
--- a/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/Connection_Manager.h
+++ b/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/Connection_Manager.h
@@ -23,6 +23,13 @@
#include "orbsvcs/AV/Protocol_Factory.h"
#include "tao/PortableServer/PortableServer.h"
+class Endpoint_Addresses
+{
+ public:
+ ACE_CString sender_addr;
+ ACE_CString receiver_addr;
+};
+
class Connection_Manager
{
// = TITLE
@@ -49,7 +56,8 @@ public:
// Method that binds the sender to the Naming Service and retreives
// the references of any registered receivers.
- void connect_to_receivers (ACE_ENV_SINGLE_ARG_DECL_WITH_DEFAULTS);
+ void connect_to_receivers (AVStreams::MMDevice_ptr sender
+ ACE_ENV_ARG_DECL_WITH_DEFAULTS);
// Connect to the receivers that we found.
void bind_to_sender (const ACE_CString &sender_name,
@@ -75,9 +83,9 @@ public:
// Map of receivers.
typedef ACE_Hash_Map_Manager<ACE_CString,
- AVStreams::MMDevice_var,
- ACE_Null_Mutex>
- Receivers;
+ AVStreams::MMDevice_var,
+ ACE_Null_Mutex>
+ Receivers;
// Map of protocol objects.
typedef ACE_Hash_Map_Manager<ACE_CString,
@@ -85,19 +93,31 @@ public:
ACE_Null_Mutex>
Protocol_Objects;
+
+
// Map of streamctrl.
typedef ACE_Hash_Map_Manager<ACE_CString,
AVStreams::StreamCtrl_var,
ACE_Null_Mutex>
StreamCtrls;
+ // Map of flownames and corresponding endpoint addresses
+ typedef ACE_Hash_Map_Manager<ACE_CString,
+ Endpoint_Addresses*,
+ ACE_Null_Mutex>
+ EP_Addr;
+
// Map accessors.
Receivers &receivers (void);
Protocol_Objects &protocol_objects (void);
StreamCtrls &streamctrls (void);
+ void load_ep_addr (const char* file_name);
+
protected:
+
+
void find_receivers (ACE_ENV_SINGLE_ARG_DECL);
void add_to_receivers (CosNaming::BindingList &binding_list
@@ -110,6 +130,7 @@ protected:
Receivers receivers_;
Protocol_Objects protocol_objects_;
StreamCtrls streamctrls_;
+ EP_Addr ep_addr_;
// Sender name.
ACE_CString sender_name_;
diff --git a/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/distributer.cpp b/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/distributer.cpp
index 4dd7074aded..de4512bf7da 100644
--- a/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/distributer.cpp
+++ b/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/distributer.cpp
@@ -97,6 +97,7 @@ Distributer::Distributer (void)
: sender_name_ ("sender")
, distributer_name_ ("distributer")
, done_ (0)
+ , addr_file_ ("addr_file")
{
}
@@ -115,13 +116,16 @@ Distributer::parse_args (int argc,
char **argv)
{
// Parse command line arguments
- ACE_Get_Opt opts (argc, argv, "s:r:");
+ ACE_Get_Opt opts (argc, argv, "s:r:a:");
int c;
while ((c= opts ()) != -1)
{
switch (c)
{
+ case 'a':
+ this->addr_file_ = opts.opt_arg ();
+ break;
case 's':
this->sender_name_ = opts.opt_arg ();
break;
@@ -168,6 +172,8 @@ Distributer::init (int argc,
if (result != 0)
return result;
+ this->connection_manager_.load_ep_addr (this->addr_file_.c_str ());
+
ACE_NEW_RETURN (this->distributer_sender_mmdevice_,
TAO_MMDevice (&this->sender_endpoint_strategy_),
-1);
@@ -196,7 +202,8 @@ Distributer::init (int argc,
ACE_CHECK_RETURN (-1);
// Connect to receivers
- this->connection_manager_.connect_to_receivers (ACE_ENV_SINGLE_ARG_PARAMETER);
+ this->connection_manager_.connect_to_receivers (distributer_sender_mmdevice.in ()
+ ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
// Bind to sender.
diff --git a/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/distributer.h b/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/distributer.h
index a135e2ffe4e..319f6287193 100644
--- a/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/distributer.h
+++ b/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/distributer.h
@@ -154,4 +154,6 @@ protected:
int done_;
// Flag to know when we are done.
+
+ ACE_CString addr_file_;
};
diff --git a/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/receiver.cpp b/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/receiver.cpp
index a2cd8fade22..4c1d94125a0 100644
--- a/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/receiver.cpp
+++ b/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/receiver.cpp
@@ -84,6 +84,7 @@ Receiver_Callback::handle_destroy (void)
Receiver::Receiver (void)
: mmdevice_ (0),
output_file_name_ ("output"),
+ addr_file_ ("addr_file"),
sender_name_ ("distributer"),
receiver_name_ ("receiver")
{
@@ -111,6 +112,8 @@ Receiver::init (int,
if (result != 0)
return result;
+ this->connection_manager_.load_ep_addr (this->addr_file_.c_str ());
+
// Register the receiver mmdevice object with the ORB
ACE_NEW_RETURN (this->mmdevice_,
TAO_MMDevice (&this->reactive_strategy_),
@@ -145,13 +148,16 @@ Receiver::parse_args (int argc,
// Parse the command line arguments
ACE_Get_Opt opts (argc,
argv,
- "f:s:r:");
+ "f:s:r:a:");
int c;
while ((c = opts ()) != -1)
{
switch (c)
{
+ case 'a':
+ this->addr_file_ = opts.opt_arg ();
+ break;
case 'f':
this->output_file_name_ = opts.opt_arg ();
break;
diff --git a/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/receiver.h b/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/receiver.h
index acdf204041f..8d9638f4d2e 100644
--- a/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/receiver.h
+++ b/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/receiver.h
@@ -114,6 +114,8 @@ protected:
ACE_CString output_file_name_;
// File name of the file into which received data is written.
+ ACE_CString addr_file_;
+
ACE_CString sender_name_;
// Sender name.
diff --git a/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/sender.cpp b/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/sender.cpp
index 22f7ce39ece..6fb8655f858 100644
--- a/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/sender.cpp
+++ b/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/sender.cpp
@@ -41,6 +41,7 @@ Sender::Sender (void)
frame_count_ (0),
filename_ ("input"),
input_file_ (0),
+ addr_file_ ("addr_file"),
frame_rate_ (10.0),
mb_ (BUFSIZ),
sender_name_ ("sender")
@@ -52,13 +53,16 @@ Sender::parse_args (int argc,
char **argv)
{
// Parse command line arguments
- ACE_Get_Opt opts (argc, argv, "s:f:r:d");
+ ACE_Get_Opt opts (argc, argv, "s:f:r:da:");
int c;
while ((c= opts ()) != -1)
{
switch (c)
{
+ case 'a':
+ this->addr_file_ = opts.opt_arg ();
+ break;
case 'f':
this->filename_ = opts.opt_arg ();
break;
@@ -97,6 +101,8 @@ Sender::init (int argc,
if (result != 0)
return result;
+
+
// Parse the command line arguments
result =
this->parse_args (argc,
@@ -104,6 +110,8 @@ Sender::init (int argc,
if (result != 0)
return result;
+ this->connection_manager_.load_ep_addr (this->addr_file_.c_str ());
+
// Open file to read.
this->input_file_ =
ACE_OS::fopen (this->filename_.c_str (),
@@ -139,7 +147,8 @@ Sender::init (int argc,
ACE_CHECK_RETURN (-1);
// Connect to the receivers
- this->connection_manager_.connect_to_receivers (ACE_ENV_SINGLE_ARG_PARAMETER);
+ this->connection_manager_.connect_to_receivers (mmdevice.in ()
+ ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
return 0;
diff --git a/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/sender.h b/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/sender.h
index 910bc9f55f3..b4392ed9a4c 100644
--- a/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/sender.h
+++ b/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/sender.h
@@ -91,6 +91,9 @@ private:
FILE *input_file_;
// File handle of the file read from.
+ ACE_CString addr_file_;
+ // File from which data is read.
+
double frame_rate_;
// Rate at which the data will be sent.