diff options
author | yamuna <yamuna@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2001-03-21 00:18:14 +0000 |
---|---|---|
committer | yamuna <yamuna@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2001-03-21 00:18:14 +0000 |
commit | abfe587ae9f02e039ba1e1e5830690facaa12053 (patch) | |
tree | 6269e654492a93917fd2252f5b23c136ab836088 | |
parent | e8a5df9320e95eb22f111f47d2cda1a96815060f (diff) | |
download | ATCD-abfe587ae9f02e039ba1e1e5830690facaa12053.tar.gz |
*** empty log message ***
7 files changed, 278 insertions, 264 deletions
diff --git a/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/README b/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/README index c11faefa8e8..9280d519f5a 100644 --- a/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/README +++ b/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/README @@ -51,6 +51,10 @@ receiver [-f <filename>] distributer: ----------- -distributer +distributer [-s <sender_port_number>] [-r <receiver_port_number] + +-s sender port number --> The port at which the sender stream endpoint is opened + +--r receiver port number --> The port at which the receiver stream endpoint is opened diff --git a/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/distributer.cpp b/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/distributer.cpp index 6ebe4ed39ce..d4a1db4000d 100644 --- a/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/distributer.cpp +++ b/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/distributer.cpp @@ -10,22 +10,25 @@ int Distributer_StreamEndPoint::set_protocol_object (const char *flow_name, TAO_AV_Protocol_Object *object) { - // Set the protocol object corresponding to the transport protocol selected. - DISTRIBUTER::instance ()->set_protocol_object (flow_name, object); - + if (ACE_OS::strcmp (flow_name, "Data_Sender") == 0) + { + // Set the protocol object corresponding to the transport protocol selected. + DISTRIBUTER::instance ()->set_sender_protocol_object (object); + } + // Store the flowname of the stream that this callback belongs to. - ACE_CString fname = flow_name; - this->callback_.flowname (fname); + this->callback_.flowname (flow_name); // Increment the stream count. DISTRIBUTER::instance ()->stream_created (); + return 0; } int Distributer_StreamEndPoint::get_callback (const char *, - TAO_AV_Callback *&callback) + TAO_AV_Callback *&callback) { // Create and return the application callback and return to the AVStreams // for further upcalls. @@ -34,7 +37,7 @@ Distributer_StreamEndPoint::get_callback (const char *, } void -Distributer_Callback::flowname (ACE_CString flowname) +Distributer_Callback::flowname (const ACE_CString &flowname) { this->flowname_ = flowname; } @@ -56,11 +59,15 @@ Distributer_Callback::receive_frame (ACE_Message_Block *frame, ACE_DEBUG ((LM_DEBUG, "Distributer_Callback::receive_frame\n")); - ACE_CString flowname = "Data_Receiver"; - // Get the protocol object corresponding to the receiver stream // send the data received from sender to the receiver. - DISTRIBUTER::instance ()->get_protocol_object (flowname.c_str())->send_frame (frame); + + int result = DISTRIBUTER::instance ()->get_sender_protocol_object ()->send_frame (frame); + + if (result < 0) + ACE_ERROR_RETURN ((LM_ERROR, + "Distributer_Callback::receive_frame send failed\n"), + -1); return 0; } @@ -69,81 +76,115 @@ int Distributer_Callback::handle_destroy (void) { // Called when the sender requests the stream to be shutdown. - ACE_DECLARE_NEW_CORBA_ENV; - - ACE_DEBUG ((LM_DEBUG, - "Distributer_Callback::end_stream\n")); - - if (ACE_OS::strcmp (this->flowname_.c_str (), "Data_Sender") == 0) + ACE_TRY_NEW_ENV { - // Destroy the sender receiver stream as the sender has requested a stream destroy - AVStreams::flowSpec stop_spec; - DISTRIBUTER::instance ()->get_receiver_streamctrl ()->destroy (stop_spec, - ACE_TRY_ENV); + + ACE_DEBUG ((LM_DEBUG, + "Distributer_Callback::end_stream\n")); + + if (ACE_OS::strcmp (this->flowname_.c_str (), "Data_Sender") == 0) + { + // Destroy the sender receiver stream as the sender has requested a stream destroy + AVStreams::flowSpec stop_spec; + DISTRIBUTER::instance ()->get_receiver_streamctrl ()->destroy (stop_spec, + ACE_TRY_ENV); + ACE_TRY_CHECK; + } + + // Decrement the stream count. + DISTRIBUTER::instance ()->stream_destroyed (); } - - // Decrement the stream count. - DISTRIBUTER::instance ()->stream_destroyed (); + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, + "Distributer_Callback::handle_destroy Failed\n"); + return -1; + } + ACE_ENDTRY; + return 0; } int -Distributer::set_protocol_object (const char* flowname, TAO_AV_Protocol_Object* object) +Distributer::set_sender_protocol_object (TAO_AV_Protocol_Object* object) { // Set the corresponding protocol objects for the different streams created. - if (ACE_OS::strcmp ("Data_Sender", flowname) == 0) - this->protocol_object_ [0] = object; - else if (ACE_OS::strcmp ("Data_Receiver", flowname) == 0) - this->protocol_object_ [1] = object; + this->sender_protocol_object_ = object; return 0; } TAO_AV_Protocol_Object* -Distributer::get_protocol_object (const char* flowname) +Distributer::get_sender_protocol_object (void) { - if (ACE_OS::strcmp ("Data_Sender", flowname) == 0) - return this->protocol_object_ [0]; - else if (ACE_OS::strcmp ("Data_Receiver", flowname) == 0) - return this->protocol_object_ [1]; - - return *(this->protocol_object_); + return this->sender_protocol_object_; } Distributer::Distributer (void) :distributer_mmdevice_ (0), + sender_protocol_object_ (0), count_ (0), protocol_ ("UDP"), - sender_streamctrl_ (0), receiver_streamctrl_ (0), stream_count_ (0), - done_ (0) + done_ (0), + host_sender_port_ ("8000"), + host_receiver_port_ ("8010") { - + // Get the local host name char buf [BUFSIZ]; ACE_OS::hostname (buf, BUFSIZ); // Set the address to the local host and port. - this->address_ = buf; - this->address_ += ":8000"; + this->sender_address_ = buf; + this->sender_address_ += ":"; + this->sender_address_ += host_sender_port_.c_str (); + + // Set the address to the local host and port. + this->receiver_address_ = buf; + this->receiver_address_ += ":"; + this->receiver_address_ += host_receiver_port_.c_str (); - protocol_object_ [0] = 0; - protocol_object_ [1] = 0; - this->mb.size (BUFSIZ); } Distributer::~Distributer (void) { } +int +Distributer::parse_args (int argc, + char **argv) +{ + // Parse command line arguments + ACE_Get_Opt opts (argc,argv,"r:s:"); + + int c; + while ((c= opts ()) != -1) + { + switch (c) + { + case 'r': + this->host_receiver_port_ = opts.optarg; + break; + case 's': + this->host_sender_port_ = opts.optarg; + break; + default: + ACE_DEBUG ((LM_DEBUG,"Unknown Option\n")); + return -1; + } + } + return 0; +} + // Method to bind the sender reference to the Naming Service. int Distributer::bind_to_mmdevice (AVStreams::MMDevice_ptr &mmdevice, - ACE_CString mmdevice_name, + const ACE_CString &mmdevice_name, CORBA::Environment &ACE_TRY_ENV) { // Initialize the naming services @@ -177,10 +218,11 @@ Distributer::bind_to_mmdevice (AVStreams::MMDevice_ptr &mmdevice, int -Distributer::init (int, - char **, - CORBA::Environment &ACE_TRY_ENV) +Distributer::init (int argc, + char **argv, + CORBA::Environment &ACE_TRY_ENV) { + // Initialize the endpoint strategy with the orb and poa. int result = this->a_endpoint_strategy_.init (TAO_AV_CORE::instance ()->orb (), @@ -188,6 +230,15 @@ Distributer::init (int, if (result != 0) return result; + result = this->parse_args (argc, + argv); + + if (result < 0) + ACE_ERROR_RETURN ((LM_ERROR, + " (%P|%t) Error in Parse Args \n"), + -1); + + // Bind to the receiver mmdevice ACE_CString mmdevice_name ("Receiver"); result = bind_to_mmdevice (this->receiver_mmdevice_.inout (), @@ -208,24 +259,17 @@ Distributer::init (int, if (result != 0) return result; - // Create the Flow protocol name - ACE_CString flow_protocol_str; - - flow_protocol_str = ""; - // Initialize the QoS AVStreams::streamQoS_var the_qos (new AVStreams::streamQoS); // Set the address of the of the distributer receiver endpoint. - ACE_INET_Addr receiver_addr (this->address_.c_str ()); + ACE_INET_Addr receiver_addr (this->receiver_address_.c_str ()); - this->receiver_flowname_ = "Data_Receiver"; - // Create the forward flow specification to describe the flow. - TAO_Forward_FlowSpec_Entry receiver_entry (this->receiver_flowname_.c_str (), + TAO_Forward_FlowSpec_Entry receiver_entry ("Data_Receiver", "IN", "USER_DEFINED", - flow_protocol_str.c_str (), + "", // Flowname this->protocol_.c_str (), &receiver_addr); @@ -263,54 +307,51 @@ Distributer::init (int, ACE_TRY_ENV); ACE_CHECK_RETURN (-1); - // Flowname for the stream bewteen the sender and distributer - this->sender_flowname_ = "Data_Sender"; - // Get the local host name char buf [BUFSIZ]; ACE_OS::hostname (buf, BUFSIZ); - // Set the address to the local host and port - this->address_ = buf; - this->address_ += ":8001"; - // Set the address of the sender. - ACE_INET_Addr sender_addr (this->address_.c_str ()); + ACE_INET_Addr sender_addr (this->sender_address_.c_str ()); // Create the forward flow specification to describe the flow. - TAO_Forward_FlowSpec_Entry sender_entry (this->sender_flowname_.c_str (), + TAO_Forward_FlowSpec_Entry sender_entry ("Data_Sender", "OUT", "USER_DEFINED", - flow_protocol_str.c_str (), + "", // Flowname this->protocol_.c_str (), &sender_addr); - // Set the flow specification for the stream between sender and distributer - flow_spec [0] = CORBA::string_dup (sender_entry.entry_to_string ()); - - ACE_NEW_RETURN (this->sender_streamctrl_, + TAO_StreamCtrl* sender_streamctrl_; + // Video stream controller for the stream between sender and distributer + + ACE_NEW_RETURN (sender_streamctrl_, TAO_StreamCtrl, -1); // Servant Reference Counting to manage lifetime PortableServer::ServantBase_var safe_sender_streamctrl = - this->sender_streamctrl_; + sender_streamctrl_; + + // Set the flow specification for the stream between sender and distributer + flow_spec [0] = CORBA::string_dup (sender_entry.entry_to_string ()); // Bind/Connect the sender and sitributer MMDevices. CORBA::Boolean res = - this->sender_streamctrl_->bind_devs (distributer_mmdevice.in (), - this->sender_mmdevice_.in (), - the_qos.inout (), - flow_spec, - ACE_TRY_ENV); + sender_streamctrl_->bind_devs (distributer_mmdevice.in (), + sender_mmdevice_.in (), + the_qos.inout (), + flow_spec, + ACE_TRY_ENV); ACE_CHECK_RETURN (-1); if (res == 0) ACE_ERROR_RETURN ((LM_ERROR,"Streamctrl::bind_devs failed\n"),-1); AVStreams::flowSpec start_spec; - this->sender_streamctrl_->start (start_spec,ACE_TRY_ENV); + sender_streamctrl_->start (start_spec, + ACE_TRY_ENV); ACE_CHECK_RETURN (-1); return 0; @@ -322,12 +363,6 @@ Distributer::get_receiver_streamctrl (void) return this->receiver_streamctrl_; } -TAO_StreamCtrl* -Distributer::get_sender_streamctrl (void) -{ - return this->sender_streamctrl_; -} - void Distributer::stream_created (void) @@ -369,7 +404,7 @@ main (int argc, CORBA::Object_var obj = orb->resolve_initial_references ("RootPOA", ACE_TRY_ENV); - ACE_TRY_CHECK; + ACE_TRY_CHECK; // Get the POA_var object from Object_var. PortableServer::POA_var root_poa = @@ -392,15 +427,15 @@ main (int argc, // Initialize the Distributer int result = DISTRIBUTER::instance ()->init (argc, - argv, - ACE_TRY_ENV); + argv, + ACE_TRY_ENV); ACE_TRY_CHECK; if (result != 0) return result; // run the orb till the streams are not destroyed. - while (!DISTRIBUTER::instance ()->done () && orb->work_pending ()) + while (!DISTRIBUTER::instance ()->done ()) { orb->perform_work (ACE_TRY_ENV); ACE_TRY_CHECK; @@ -420,14 +455,10 @@ main (int argc, #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) template class ACE_Singleton <Distributer,ACE_Null_Mutex>; -template class TAO_AV_Endpoint_Reactive_Strategy_A -<Distributer_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl>; -template class TAO_AV_Endpoint_Reactive_Strategy -<Distributer_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl>; +template class TAO_AV_Endpoint_Reactive_Strategy_A<Distributer_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl>; +template class TAO_AV_Endpoint_Reactive_Strategy<Distributer_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl>; #elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) #pragma instantiate ACE_Singleton <Client,ACE_Null_Mutex> -#pragma instantiate TAO_AV_Endpoint_Reactive_Strategy_A -<Distributer_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl> -#pragma instantiate TAO_AV_Endpoint_Reactive_Strategy -<Distributer_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl> +#pragma instantiate TAO_AV_Endpoint_Reactive_Strategy_A<Distributer_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl> +#pragma instantiate TAO_AV_Endpoint_Reactive_Strategy<Distributer_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl> #endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/distributer.h b/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/distributer.h index 03d2c5a4df8..9e77f1fcab8 100644 --- a/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/distributer.h +++ b/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/distributer.h @@ -24,12 +24,6 @@ #include "orbsvcs/AV/Endpoint_Strategy.h" #include "orbsvcs/AV/Policy.h" -#define AV_MAX_STREAM_COUNT 2 -// The distributer currently handles only two connections, one from -// the sender and one from the receiver. - -class Distributer; - class Distributer_Callback : public TAO_AV_Callback { // = TITLE @@ -51,7 +45,7 @@ public: // Called when the sender has finished reading the file and wants // to close down the connection. - void flowname (ACE_CString); + void flowname (const ACE_CString &flowname); ACE_CString flowname (void); // Accessor methods to set and get the flowname corresponding // to the callback @@ -108,27 +102,28 @@ public: CORBA::Environment &); // Initialize data components. - int set_protocol_object (const char*, - TAO_AV_Protocol_Object *object); - TAO_AV_Protocol_Object* get_protocol_object (const char*); + int set_sender_protocol_object (TAO_AV_Protocol_Object *object); + TAO_AV_Protocol_Object* get_sender_protocol_object (void); // Accessor methods to set/get the protocol object of the receiver/sender // process int bind_to_mmdevice (AVStreams::MMDevice_ptr &mmdevice, - ACE_CString mmdevice_name, + const ACE_CString &mmdevice_name, CORBA::Environment &ACE_TRY_ENV); // Resolve the reference of the mmdevice from the naming service. TAO_StreamCtrl* get_receiver_streamctrl (void); - - TAO_StreamCtrl* get_sender_streamctrl (void); + // Get the stream control of the receiver void stream_created (void); // Called when stream created void stream_destroyed (void); // Called when stream destroyed - + + int parse_args (int argc, + char **argv); + int done (void); // Return the flag that suggests orb shutdown. @@ -149,44 +144,36 @@ protected: AVStreams::MMDevice_var sender_mmdevice_; // The sender MMDevice. - TAO_AV_Protocol_Object *protocol_object_ [2]; - + TAO_AV_Protocol_Object *sender_protocol_object_; + // The sender protocol object + int count_; // Number of frames sent. - int argc_; - char **argv_; + ACE_CString sender_address_; + // Address of the distributer host machine or a multicast address - ACE_CString address_; - // Address of the distributer host machine or a multicast address - Default is - // UDP multicast addess + ACE_CString receiver_address_; + // Address of the distributer host machine or a multicast address ACE_CString protocol_; // Selected protocol - default is UDP - ACE_CString sender_flowname_; - // The flow name of the stream set up between the - // sender and the distributer. - - ACE_CString receiver_flowname_; - // The flow name of the stream set up between the - // receiver and the distributer. - - TAO_StreamCtrl* sender_streamctrl_; - // Video stream controller for the stream between sender and distributer - TAO_StreamCtrl* receiver_streamctrl_; // Video stream controller for the stream between receivers and distributer - ACE_Message_Block mb; - // Message block into which data is read from a file and then sent. - int stream_count_; // Number of active streams. When a stream is disconnected this // count is decremented. int done_; // Flag to indicate orb shutdown + + ACE_CString host_sender_port_; + //Distributer sender streamendpoint port + + ACE_CString host_receiver_port_; + //Distributer receiver streamendpoint port }; typedef ACE_Singleton<Distributer, ACE_Null_Mutex> DISTRIBUTER; diff --git a/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/receiver.cpp b/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/receiver.cpp index 5e180b0c668..513f6d9a542 100644 --- a/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/receiver.cpp +++ b/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/receiver.cpp @@ -20,8 +20,8 @@ Receiver_StreamEndPoint::get_callback (const char *, int Receiver_Callback::receive_frame (ACE_Message_Block *frame, - TAO_AV_frame_info *, - const ACE_Addr &) + TAO_AV_frame_info *, + const ACE_Addr &) { // Upcall from the AVStreams when there is data to be received from the // distributer. @@ -53,18 +53,22 @@ Receiver_Callback::handle_destroy (void) // Called when the distributer requests the stream to be shutdown. ACE_DEBUG ((LM_DEBUG, "Receiver_Callback::end_stream\n")); - TAO_AV_CORE::instance ()->orb ()->shutdown (); - return 0; -} - + + ACE_TRY_NEW_ENV + { + TAO_AV_CORE::instance ()->orb ()->shutdown (0, + ACE_TRY_ENV); + ACE_TRY_CHECK; + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, + "Receiver_Callback::handle_destroy Failed\n"); + return -1; + + } + ACE_ENDTRY; -int -Receiver_Callback::handle_stop (void) -{ - // Called when the distributer requests the stream to be shutdown. - ACE_DEBUG ((LM_DEBUG, - "Receiver_Callback::end_stream\n")); - TAO_AV_CORE::instance ()->orb ()->shutdown (); return 0; } @@ -79,8 +83,8 @@ Receiver::~Receiver (void) int Receiver::init (int, - char **, - CORBA::Environment &ACE_TRY_ENV) + char **, + CORBA::Environment &ACE_TRY_ENV) { // Initialize the endpoint strategy with the orb and poa. int result = @@ -94,20 +98,20 @@ Receiver::init (int, ACE_NEW_RETURN (this->mmdevice_, TAO_MMDevice (&this->reactive_strategy_), -1); - + // Servant Reference Counting to manage lifetime PortableServer::ServantBase_var safe_mmdevice = this->mmdevice_; + CORBA::Object_var mmdevice = + this->mmdevice_->_this (ACE_TRY_ENV); + ACE_CHECK_RETURN(-1); + // Register the receiver mmdevice with the naming service. CosNaming::Name receiver_mmdevice_name (1); receiver_mmdevice_name.length (1); receiver_mmdevice_name [0].id = CORBA::string_dup ("Receiver"); - CORBA::Object_var mmdevice = - this->mmdevice_->_this (ACE_TRY_ENV); - ACE_CHECK_RETURN(-1); - // Initialize the naming services if (this->my_naming_client_.init (TAO_AV_CORE::instance ()->orb ()) != 0) ACE_ERROR_RETURN ((LM_ERROR, @@ -165,25 +169,6 @@ main (int argc, ACE_TRY_ENV); ACE_TRY_CHECK; - int result = - parse_args (argc, - argv); - - if (result == -1) - return -1; - - // Make sure we have a valid <output_file> - output_file = ACE_OS::fopen (output_file_name, - "w"); - if (output_file == 0) - ACE_ERROR_RETURN ((LM_DEBUG, - "Cannot open output file %s\n", - output_file_name), - -1); - - else ACE_DEBUG ((LM_DEBUG, - "File Opened Successfull\n")); - CORBA::Object_var obj = orb->resolve_initial_references ("RootPOA", ACE_TRY_ENV); @@ -208,6 +193,25 @@ main (int argc, ACE_TRY_ENV); ACE_TRY_CHECK; + int result = + parse_args (argc, + argv); + + if (result == -1) + return -1; + + // Make sure we have a valid <output_file> + output_file = ACE_OS::fopen (output_file_name, + "w"); + if (output_file == 0) + ACE_ERROR_RETURN ((LM_DEBUG, + "Cannot open output file %s\n", + output_file_name), + -1); + + else ACE_DEBUG ((LM_DEBUG, + "File Opened Successfull\n")); + Receiver receiver; result = receiver.init (argc, diff --git a/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/receiver.h b/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/receiver.h index ece363d2dd4..dc0eeb47472 100644 --- a/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/receiver.h +++ b/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/receiver.h @@ -44,7 +44,6 @@ public: // to close down the connection. int handle_destroy (void); - int handle_stop (void); }; class Receiver_StreamEndPoint : public TAO_Server_StreamEndPoint diff --git a/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/sender.cpp b/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/sender.cpp index da4516db7a6..e40154e7ccd 100644 --- a/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/sender.cpp +++ b/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/sender.cpp @@ -4,20 +4,23 @@ #include "ace/Get_Opt.h" #include "ace/High_Res_Timer.h" -ACE_High_Res_Timer last_frame_sent_time; -// The time taken for sending a frmae and preparing for the next frame - -ACE_Time_Value inter_frame_time; -// The time that should lapse between two consecutive frames sent. - int Sender_Callback::handle_start (void) { // Connection setup, start sending data. - - ACE_DECLARE_NEW_CORBA_ENV; - SENDER::instance ()->pace_data (ACE_TRY_ENV); - ACE_CHECK_RETURN (-1); + ACE_TRY_NEW_ENV + { + SENDER::instance ()->pace_data (ACE_TRY_ENV); + ACE_TRY_CHECK; + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, + "Sender_Callback::handle_start pace data failed"); + + return -1; + } + ACE_ENDTRY; return 0; } @@ -36,6 +39,7 @@ Sender_StreamEndPoint::get_callback (const char *, // Store reference to the sender stream endpoint SENDER::instance ()->set_endpoint (this); + return 0; } @@ -49,15 +53,15 @@ Sender_StreamEndPoint::set_protocol_object (const char *, } Sender::Sender (void) - :sender_mmdevice_ (0), - endpoint_ (0), - count_ (0), - filename_ ("Makefile"), - fp_ (0), - frame_rate_ (30), - protocol_object_ (0) + : sender_mmdevice_ (0), + endpoint_ (0), + count_ (0), + filename_ ("Makefile"), + fp_ (0), + frame_rate_ (30), + protocol_object_ (0) { - this->mb.size (BUFSIZ); + this->mb_.size (BUFSIZ); } void @@ -117,13 +121,6 @@ Sender::file (void) return this->fp_; } -int -Sender::frame_rate (void) -{ - return this->frame_rate_; -} - - // Method to bind the sender reference to the Naming Service. int Sender::register_sender (CORBA::Environment &ACE_TRY_ENV) @@ -140,11 +137,6 @@ Sender::register_sender (CORBA::Environment &ACE_TRY_ENV) TAO_MMDevice (&this->endpoint_strategy_), -1); - - CosNaming::Name sender_mmdevice_name (1); - sender_mmdevice_name.length (1); - sender_mmdevice_name [0].id = CORBA::string_dup ("Sender"); - // Servant Reference Counting to manage lifetime PortableServer::ServantBase_var safe_sender_mmdevice = this->sender_mmdevice_; @@ -153,6 +145,10 @@ Sender::register_sender (CORBA::Environment &ACE_TRY_ENV) this->sender_mmdevice_->_this (ACE_TRY_ENV); ACE_CHECK_RETURN(-1); + CosNaming::Name sender_mmdevice_name (1); + sender_mmdevice_name.length (1); + sender_mmdevice_name [0].id = CORBA::string_dup ("Sender"); + // Register the server object with the naming server. this->my_naming_client_->rebind (sender_mmdevice_name, mmdevice.in (), @@ -165,26 +161,21 @@ Sender::register_sender (CORBA::Environment &ACE_TRY_ENV) int Sender::init (int argc, char **argv, - CORBA::Environment& ACE_TRY_ENV) + CORBA::Environment &ACE_TRY_ENV) { - this->argc_ = argc; - this->argv_ = argv; - - CORBA::String_var ior; - // Initialize the endpoint strategy with the orb and poa. - this->endpoint_strategy_.init(TAO_AV_CORE::instance ()->orb (), - TAO_AV_CORE::instance ()->poa ()); - + int result = + this->endpoint_strategy_.init (TAO_AV_CORE::instance ()->orb (), + TAO_AV_CORE::instance ()->poa ()); + // Parse the command line arguments - int result = this->parse_args (argc, + result = this->parse_args (argc, argv); if (result < 0) ACE_ERROR_RETURN ((LM_ERROR, " (%P|%t) Error in Parse Args \n"), -1); - // Open file to read. this->fp_ = ACE_OS::fopen (this->filename_.c_str (), @@ -194,8 +185,9 @@ Sender::init (int argc, "Cannot open output file %s\n", this->filename_.c_str ()), -1); - else ACE_DEBUG ((LM_DEBUG, - "File opened successfully\n")); + else + ACE_DEBUG ((LM_DEBUG, + "File opened successfully\n")); // Register the object reference with the Naming Service. result = this->register_sender (ACE_TRY_ENV); @@ -211,7 +203,7 @@ Sender::init (int argc, // Method to send data at the specified rate int -Sender::pace_data (CORBA::Environment& ACE_TRY_ENV) +Sender::pace_data (CORBA::Environment &ACE_TRY_ENV) { // Time within which a frame should be sent. @@ -222,7 +214,11 @@ Sender::pace_data (CORBA::Environment& ACE_TRY_ENV) "Frame Time ONE = %f\n Frame Rate = %d\n", frame_time, this->frame_rate_)); - + + // The time that should lapse between two consecutive frames sent. + ACE_Time_Value inter_frame_time; + + // The time between two consecutive frames. inter_frame_time.set (frame_time); @@ -233,65 +229,63 @@ Sender::pace_data (CORBA::Environment& ACE_TRY_ENV) ACE_TRY { - + + // The time taken for sending a frame and preparing for the next frame + ACE_High_Res_Timer elapsed_timer; + // Continue to send data till the file is read to the end. while (1) { - // Count the frames sent. count_++; // Reset the message block. - this->mb.reset (); + this->mb_.reset (); // Read from the file into a message block. - int n = ACE_OS::fread (this->mb.wr_ptr (), + int n = ACE_OS::fread (this->mb_.wr_ptr (), 1, - this->mb.size (), + this->mb_.size (), SENDER::instance ()->file ()); - this->mb.wr_ptr (n); - if (n < 0) ACE_ERROR_RETURN ((LM_ERROR, - "FTP_Client_Flow_Handler::fread end of file\n"), + "Sender::pace_data fread failed\n"), -1); if (n == 0) { - if (feof (SENDER::instance ()->file ())) - { - // At end of file break the loop and end the client. - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG,"Handle_Start:End of file\n")); - break; - } - - } + // At end of file break the loop and end the client. + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG,"Handle_Start:End of file\n")); + break; + } + + this->mb_.wr_ptr (n); if (this->count_ > 1) { // Second frame and beyond // Stop the timer that was started just before the previous frame was sent. - last_frame_sent_time.stop (); + elapsed_timer.stop (); // Get the time elapsed after sending the previous frame. - ACE_Time_Value tv; - last_frame_sent_time.elapsed_time (tv); + ACE_Time_Value elapsed_time; + elapsed_timer.elapsed_time (elapsed_time); if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "Elapsed Time = %d\n", - tv.msec ())); + elapsed_time.msec ())); // Check to see if the inter frame time has elapsed. - if (tv < inter_frame_time) + if (elapsed_time < inter_frame_time) { // Inter frame time has not elapsed. - // Claculate the time to wait before the next frame needs to be sent. - ACE_Time_Value wait_time (inter_frame_time - tv); + // Calculate the time to wait before the next frame needs to be sent. + ACE_Time_Value wait_time (inter_frame_time - elapsed_time); if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "Wait Time = %d\n", @@ -305,16 +299,16 @@ Sender::pace_data (CORBA::Environment& ACE_TRY_ENV) } // Start timer before sending the frame. - last_frame_sent_time.start (); + elapsed_timer.start (); // Send frame. - int result = this->protocol_object_->send_frame (&this->mb); + int result = this->protocol_object_->send_frame (&this->mb_); if (result < 0) ACE_ERROR_RETURN ((LM_ERROR, - "send failed:%p","FTP_Client_Flow_Handler::send\n"), + "send failed:%p","Sender::pace_data send\n"), -1); - ACE_DEBUG ((LM_DEBUG,"Client::pace_data buffer sent succesfully\n")); + ACE_DEBUG ((LM_DEBUG,"Sender::pace_data buffer sent succesfully\n")); } // end while @@ -322,8 +316,10 @@ Sender::pace_data (CORBA::Environment& ACE_TRY_ENV) AVStreams::flowSpec stop_spec; // Get the strem controoler for this stream. - CORBA::Any_ptr streamctrl_any = SENDER::instance ()->get_endpoint ()->get_property_value ("Related_StreamCtrl", - ACE_TRY_ENV); + CORBA::Any_ptr streamctrl_any = + SENDER::instance ()->get_endpoint ()->get_property_value ("Related_StreamCtrl", + ACE_TRY_ENV); + ACE_TRY_CHECK; AVStreams::StreamCtrl_ptr streamctrl; @@ -332,16 +328,18 @@ Sender::pace_data (CORBA::Environment& ACE_TRY_ENV) // Destroy the stream streamctrl->destroy (stop_spec, ACE_TRY_ENV); + ACE_TRY_CHECK; // Shut the orb down. - TAO_AV_CORE::instance ()->orb ()->shutdown (0); + TAO_AV_CORE::instance ()->orb ()->shutdown (0, + ACE_TRY_ENV); ACE_TRY_CHECK; } ACE_CATCHANY { ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, - "Client::pace_data Failed"); + "Client::pace_data Failed\n"); return -1; } ACE_ENDTRY; @@ -393,7 +391,7 @@ main (int argc, if (result < 0) ACE_ERROR_RETURN ((LM_ERROR, - "client::init failed\n"), -1); + "Sender::init failed\n"), -1); orb->run (ACE_TRY_ENV); ACE_TRY_CHECK; diff --git a/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/sender.h b/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/sender.h index 1b4a7ef940a..31e68c686b4 100644 --- a/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/sender.h +++ b/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/sender.h @@ -104,12 +104,6 @@ public: FILE *file (void); // File handle from which data is read to be sent. - TAO_StreamCtrl* streamctrl (void); - // The stream control interface that manages the stream set up - - int frame_rate (void); - // The requested frame rate for sending each frame of data read from the file. - private: int parse_args (int argc, char **argv); // Method to parse the command line arguments. @@ -119,22 +113,19 @@ private: ENDPOINT_STRATEGY endpoint_strategy_; // The reactive strategy of the client. - + AVStreams::MMDevice_var receiver_mmdevice_; // The receiver MMDevice that the sender connects to - + TAO_MMDevice* sender_mmdevice_; // The sender MMDevice. - + Sender_StreamEndPoint* endpoint_; // Reference to the sender streamendpoint - + int count_; // Number of frames sent. - int argc_; - char **argv_; - ACE_CString filename_; // File from which data is read. @@ -147,7 +138,7 @@ private: int frame_rate_; // The sepcified data frame rate - ACE_Message_Block mb; + ACE_Message_Block mb_; // Message block into which data is read from a file and then sent. TAO_AV_Protocol_Object* protocol_object_; |