diff options
author | yamuna <yamuna@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2003-06-25 18:44:55 +0000 |
---|---|---|
committer | yamuna <yamuna@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2003-06-25 18:44:55 +0000 |
commit | 017034bb613e359f9e2b5e8ec1ff0bb6ebba90af (patch) | |
tree | 8cdc226b01b7ad7e980b879ff4e9410f8b18f9d8 /TAO/orbsvcs/tests/AVStreams | |
parent | 4a4b1dc566368794aa534568365ce8e58c289d25 (diff) | |
download | ATCD-017034bb613e359f9e2b5e8ec1ff0bb6ebba90af.tar.gz |
ChangelogTag: Wed June 25 14:46:19 2003 Yamuna Krishnamurthy <yamuna@oomworks.com>
Diffstat (limited to 'TAO/orbsvcs/tests/AVStreams')
8 files changed, 205 insertions, 14 deletions
diff --git a/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/Connection_Manager.cpp b/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/Connection_Manager.cpp index 7f59e17ed6a..8fd9df060fe 100644 --- a/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/Connection_Manager.cpp +++ b/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/Connection_Manager.cpp @@ -10,6 +10,95 @@ Connection_Manager::~Connection_Manager (void) { } +void +Connection_Manager::load_ep_addr (const char* file_name) +{ + FILE* addr_file = ACE_OS::fopen (file_name, "r"); + + if (addr_file == 0) + { + ACE_ERROR ((LM_DEBUG, + "Cannot open addr file %s\n", + file_name)); + return; + } + else + ACE_DEBUG ((LM_DEBUG, + "Addr file opened successfully\n")); + + while (1) + { + char buf [BUFSIZ]; + + // Read from the file into a buffer + + + /* + int n = ACE_OS::fread (buf, + 1, + BUFSIZ, + addr_file); + */ + + if ((ACE_OS::fgets (buf,BUFSIZ,addr_file)) == NULL) + { + // At end of file break the loop and end the sender. + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG,"End of Addr file\n")); + break; + } + + + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "%s\n", + buf)); + + Endpoint_Addresses* addr; + ACE_NEW (addr, + Endpoint_Addresses); + + TAO_Tokenizer addr_tokenizer (buf,'/'); + + ACE_CString flowname; + + if (addr_tokenizer [0] == 0) + { + ACE_ERROR ((LM_ERROR, + "Corresponding flow name not specified for endpoint addresses\n")); + return; + } + else + flowname += addr_tokenizer [0]; + + if (addr_tokenizer [1] != 0) + addr->sender_addr += CORBA::string_dup (addr_tokenizer [1]); + + if (addr_tokenizer [2] != 0) + addr->receiver_addr += CORBA::string_dup (addr_tokenizer [2]); + + int result = ep_addr_.bind (flowname, + addr); + if (result == 0) + { + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "Flowname %s Bound Successfully\n", + flowname.c_str ())); + } + else if (result == 1) + ACE_DEBUG ((LM_DEBUG, + "Flowname %s already exists\n", + flowname.c_str ())); + else ACE_DEBUG ((LM_DEBUG, + "Flowname %s Bound Failed\n", + flowname.c_str ())); + + + } + +} + int Connection_Manager::init (CORBA::ORB_ptr orb) { @@ -30,8 +119,10 @@ Connection_Manager::bind_to_receivers (const ACE_CString &sender_name, this->sender_name_ = sender_name; + /* this->sender_ = AVStreams::MMDevice::_duplicate (sender); + */ CosNaming::Name name (1); name.length (1); @@ -184,7 +275,8 @@ Connection_Manager::add_to_receivers (CosNaming::BindingList &binding_list } void -Connection_Manager::connect_to_receivers (ACE_ENV_SINGLE_ARG_DECL) +Connection_Manager::connect_to_receivers (AVStreams::MMDevice_ptr sender + ACE_ENV_ARG_DECL) { // Connect to all receivers that we know about. for (Receivers::iterator iterator = this->receivers_.begin (); @@ -197,13 +289,37 @@ Connection_Manager::connect_to_receivers (ACE_ENV_SINGLE_ARG_DECL) ACE_CString flowname = (*iterator).ext_id_; + + Endpoint_Addresses* addr = 0; + int result = ep_addr_.find (flowname, + addr); + + ACE_CString sender_addr_str; + ACE_CString receiver_addr_str; + + if (result != -1) + { + sender_addr_str = addr->sender_addr; + receiver_addr_str = addr->receiver_addr; + } + else ACE_DEBUG ((LM_DEBUG, + "No endpoint address for flowname %s\n", + flowname.c_str ())); + + ACE_INET_Addr receiver_addr (receiver_addr_str.c_str ()); + ACE_INET_Addr sender_addr (sender_addr_str.c_str ()); + + // Create the forward flow specification to describe the flow. TAO_Forward_FlowSpec_Entry sender_entry (flowname.c_str (), "IN", "USER_DEFINED", "", "UDP", - 0); + &sender_addr); + + sender_entry.set_peer_addr (&receiver_addr); + // Set the flow specification for the stream between receiver // and distributer @@ -232,7 +348,7 @@ Connection_Manager::connect_to_receivers (ACE_ENV_SINGLE_ARG_DECL) streamctrl_object); // Bind the sender and receiver MMDevices. - (void) streamctrl->bind_devs (this->sender_.in (), + (void) streamctrl->bind_devs (sender, (*iterator).int_id_.in (), the_qos.inout (), flow_spec @@ -370,13 +486,38 @@ Connection_Manager::connect_to_sender (ACE_ENV_SINGLE_ARG_DECL) "_" + this->receiver_name_; + + Endpoint_Addresses* addr = 0; + int ret = ep_addr_.find (flowname, + addr); + + ACE_CString sender_addr_str; + ACE_CString receiver_addr_str; + + if (ret != -1) + { + sender_addr_str = addr->sender_addr; + receiver_addr_str = addr->receiver_addr; + + ACE_DEBUG ((LM_DEBUG, + "Address Strings %s %s\n", + sender_addr_str.c_str (), + receiver_addr_str.c_str ())); + } + + ACE_INET_Addr sender_addr (sender_addr_str.c_str ()); + ACE_INET_Addr receiver_addr (receiver_addr_str.c_str ()); + // Create the forward flow specification to describe the flow. TAO_Forward_FlowSpec_Entry sender_entry (flowname.c_str (), "IN", "USER_DEFINED", "", "UDP", - 0); + &sender_addr); + + sender_entry.set_peer_addr (&receiver_addr); + // Set the flow specification for the stream between sender and // receiver. @@ -444,7 +585,7 @@ Connection_Manager::add_streamctrl (const ACE_CString &flowname, if( streamctrl_any.in() >>= streamctrl ) { - // Any still owns the pointer, so we duplicate it + // Any still owns the pointer, so we duplicate it AVStreams::StreamCtrl::_duplicate( streamctrl ); this->streamctrls_.bind (flowname, streamctrl); diff --git a/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/Connection_Manager.h b/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/Connection_Manager.h index 8c2e07dbf51..f27b97eef2f 100644 --- a/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/Connection_Manager.h +++ b/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/Connection_Manager.h @@ -23,6 +23,13 @@ #include "orbsvcs/AV/Protocol_Factory.h" #include "tao/PortableServer/PortableServer.h" +class Endpoint_Addresses +{ + public: + ACE_CString sender_addr; + ACE_CString receiver_addr; +}; + class Connection_Manager { // = TITLE @@ -49,7 +56,8 @@ public: // Method that binds the sender to the Naming Service and retreives // the references of any registered receivers. - void connect_to_receivers (ACE_ENV_SINGLE_ARG_DECL_WITH_DEFAULTS); + void connect_to_receivers (AVStreams::MMDevice_ptr sender + ACE_ENV_ARG_DECL_WITH_DEFAULTS); // Connect to the receivers that we found. void bind_to_sender (const ACE_CString &sender_name, @@ -75,9 +83,9 @@ public: // Map of receivers. typedef ACE_Hash_Map_Manager<ACE_CString, - AVStreams::MMDevice_var, - ACE_Null_Mutex> - Receivers; + AVStreams::MMDevice_var, + ACE_Null_Mutex> + Receivers; // Map of protocol objects. typedef ACE_Hash_Map_Manager<ACE_CString, @@ -85,19 +93,31 @@ public: ACE_Null_Mutex> Protocol_Objects; + + // Map of streamctrl. typedef ACE_Hash_Map_Manager<ACE_CString, AVStreams::StreamCtrl_var, ACE_Null_Mutex> StreamCtrls; + // Map of flownames and corresponding endpoint addresses + typedef ACE_Hash_Map_Manager<ACE_CString, + Endpoint_Addresses*, + ACE_Null_Mutex> + EP_Addr; + // Map accessors. Receivers &receivers (void); Protocol_Objects &protocol_objects (void); StreamCtrls &streamctrls (void); + void load_ep_addr (const char* file_name); + protected: + + void find_receivers (ACE_ENV_SINGLE_ARG_DECL); void add_to_receivers (CosNaming::BindingList &binding_list @@ -110,6 +130,7 @@ protected: Receivers receivers_; Protocol_Objects protocol_objects_; StreamCtrls streamctrls_; + EP_Addr ep_addr_; // Sender name. ACE_CString sender_name_; diff --git a/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/distributer.cpp b/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/distributer.cpp index 4dd7074aded..de4512bf7da 100644 --- a/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/distributer.cpp +++ b/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/distributer.cpp @@ -97,6 +97,7 @@ Distributer::Distributer (void) : sender_name_ ("sender") , distributer_name_ ("distributer") , done_ (0) + , addr_file_ ("addr_file") { } @@ -115,13 +116,16 @@ Distributer::parse_args (int argc, char **argv) { // Parse command line arguments - ACE_Get_Opt opts (argc, argv, "s:r:"); + ACE_Get_Opt opts (argc, argv, "s:r:a:"); int c; while ((c= opts ()) != -1) { switch (c) { + case 'a': + this->addr_file_ = opts.opt_arg (); + break; case 's': this->sender_name_ = opts.opt_arg (); break; @@ -168,6 +172,8 @@ Distributer::init (int argc, if (result != 0) return result; + this->connection_manager_.load_ep_addr (this->addr_file_.c_str ()); + ACE_NEW_RETURN (this->distributer_sender_mmdevice_, TAO_MMDevice (&this->sender_endpoint_strategy_), -1); @@ -196,7 +202,8 @@ Distributer::init (int argc, ACE_CHECK_RETURN (-1); // Connect to receivers - this->connection_manager_.connect_to_receivers (ACE_ENV_SINGLE_ARG_PARAMETER); + this->connection_manager_.connect_to_receivers (distributer_sender_mmdevice.in () + ACE_ENV_ARG_PARAMETER); ACE_CHECK_RETURN (-1); // Bind to sender. diff --git a/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/distributer.h b/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/distributer.h index a135e2ffe4e..319f6287193 100644 --- a/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/distributer.h +++ b/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/distributer.h @@ -154,4 +154,6 @@ protected: int done_; // Flag to know when we are done. + + ACE_CString addr_file_; }; diff --git a/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/receiver.cpp b/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/receiver.cpp index a2cd8fade22..4c1d94125a0 100644 --- a/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/receiver.cpp +++ b/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/receiver.cpp @@ -84,6 +84,7 @@ Receiver_Callback::handle_destroy (void) Receiver::Receiver (void) : mmdevice_ (0), output_file_name_ ("output"), + addr_file_ ("addr_file"), sender_name_ ("distributer"), receiver_name_ ("receiver") { @@ -111,6 +112,8 @@ Receiver::init (int, if (result != 0) return result; + this->connection_manager_.load_ep_addr (this->addr_file_.c_str ()); + // Register the receiver mmdevice object with the ORB ACE_NEW_RETURN (this->mmdevice_, TAO_MMDevice (&this->reactive_strategy_), @@ -145,13 +148,16 @@ Receiver::parse_args (int argc, // Parse the command line arguments ACE_Get_Opt opts (argc, argv, - "f:s:r:"); + "f:s:r:a:"); int c; while ((c = opts ()) != -1) { switch (c) { + case 'a': + this->addr_file_ = opts.opt_arg (); + break; case 'f': this->output_file_name_ = opts.opt_arg (); break; diff --git a/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/receiver.h b/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/receiver.h index acdf204041f..8d9638f4d2e 100644 --- a/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/receiver.h +++ b/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/receiver.h @@ -114,6 +114,8 @@ protected: ACE_CString output_file_name_; // File name of the file into which received data is written. + ACE_CString addr_file_; + ACE_CString sender_name_; // Sender name. diff --git a/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/sender.cpp b/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/sender.cpp index 22f7ce39ece..6fb8655f858 100644 --- a/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/sender.cpp +++ b/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/sender.cpp @@ -41,6 +41,7 @@ Sender::Sender (void) frame_count_ (0), filename_ ("input"), input_file_ (0), + addr_file_ ("addr_file"), frame_rate_ (10.0), mb_ (BUFSIZ), sender_name_ ("sender") @@ -52,13 +53,16 @@ Sender::parse_args (int argc, char **argv) { // Parse command line arguments - ACE_Get_Opt opts (argc, argv, "s:f:r:d"); + ACE_Get_Opt opts (argc, argv, "s:f:r:da:"); int c; while ((c= opts ()) != -1) { switch (c) { + case 'a': + this->addr_file_ = opts.opt_arg (); + break; case 'f': this->filename_ = opts.opt_arg (); break; @@ -97,6 +101,8 @@ Sender::init (int argc, if (result != 0) return result; + + // Parse the command line arguments result = this->parse_args (argc, @@ -104,6 +110,8 @@ Sender::init (int argc, if (result != 0) return result; + this->connection_manager_.load_ep_addr (this->addr_file_.c_str ()); + // Open file to read. this->input_file_ = ACE_OS::fopen (this->filename_.c_str (), @@ -139,7 +147,8 @@ Sender::init (int argc, ACE_CHECK_RETURN (-1); // Connect to the receivers - this->connection_manager_.connect_to_receivers (ACE_ENV_SINGLE_ARG_PARAMETER); + this->connection_manager_.connect_to_receivers (mmdevice.in () + ACE_ENV_ARG_PARAMETER); ACE_CHECK_RETURN (-1); return 0; diff --git a/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/sender.h b/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/sender.h index 910bc9f55f3..b4392ed9a4c 100644 --- a/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/sender.h +++ b/TAO/orbsvcs/tests/AVStreams/Asynch_Three_Stage/sender.h @@ -91,6 +91,9 @@ private: FILE *input_file_; // File handle of the file read from. + ACE_CString addr_file_; + // File from which data is read. + double frame_rate_; // Rate at which the data will be sent. |