diff options
Diffstat (limited to 'ACE/TAO/orbsvcs/tests/AVStreams/Bidirectional_Flows/sender.cpp')
-rw-r--r-- | ACE/TAO/orbsvcs/tests/AVStreams/Bidirectional_Flows/sender.cpp | 476 |
1 files changed, 476 insertions, 0 deletions
diff --git a/ACE/TAO/orbsvcs/tests/AVStreams/Bidirectional_Flows/sender.cpp b/ACE/TAO/orbsvcs/tests/AVStreams/Bidirectional_Flows/sender.cpp new file mode 100644 index 00000000000..9eef99a23e7 --- /dev/null +++ b/ACE/TAO/orbsvcs/tests/AVStreams/Bidirectional_Flows/sender.cpp @@ -0,0 +1,476 @@ +// $Id$ + +#include "sender.h" +#include "tao/debug.h" +#include "ace/Get_Opt.h" +#include "ace/High_Res_Timer.h" + +typedef ACE_Unmanaged_Singleton<Sender, ACE_Null_Mutex> SENDER; +// Create a singleton instance of the Sender. + +static FILE *output_file = 0; +// File handle of the file into which received data is written. + +static const ACE_TCHAR *output_file_name = ACE_TEXT ("output"); +// File name of the file into which received data is written. + + +int +Sender_StreamEndPoint::get_callback (const char *, + TAO_AV_Callback *&callback) +{ + // Create and return the sender application callback to AVStreams + // for further upcalls. + callback = &this->callback_; + return 0; +} + +int +Sender_StreamEndPoint::set_protocol_object (const char *, + TAO_AV_Protocol_Object *object) +{ + // Set the sender protocol object corresponding to the transport + // protocol selected. + SENDER::instance ()->protocol_object (object); + return 0; +} + +Sender_Callback::Sender_Callback (void) + : frame_count_ (1) +{ +} + +int +Sender_Callback::receive_frame (ACE_Message_Block *frame, + TAO_AV_frame_info *, + const ACE_Addr &) +{ + // + // Upcall from the AVStreams when there is data to be received from + // the sender. + // + ACE_DEBUG ((LM_DEBUG, + "Sender_Callback::receive_frame for frame %d\n", + this->frame_count_++)); + + while (frame != 0) + { + // Write the received data to the file. + size_t result = + ACE_OS::fwrite (frame->rd_ptr (), + frame->length (), + 1, + output_file); + + if (result == frame->length ()) + ACE_ERROR_RETURN ((LM_ERROR, + "Sender_Callback::fwrite failed\n"), + -1); + + frame = frame->cont (); + } + + if (SENDER::instance ()->eof () == 1) + SENDER::instance ()->shutdown (); + return 0; +} + +Sender::Sender (void) + : sender_mmdevice_ (0), + streamctrl_ (0), + frame_count_ (0), + filename_ ("input"), + input_file_ (0), + protocol_ ("UDP"), + frame_rate_ (30), + mb_ (BUFSIZ), + eof_ (0) +{ +} + +void +Sender::protocol_object (TAO_AV_Protocol_Object *object) +{ + // Set the sender protocol object corresponding to the transport + // protocol selected. + this->protocol_object_ = object; +} + +int +Sender::eof (void) +{ + return this->eof_; +} + +void +Sender::shutdown (void) +{ + try + { + // File reading is complete, destroy the stream. + AVStreams::flowSpec stop_spec; + this->streamctrl_->destroy (stop_spec); + + // Shut the orb down. + TAO_AV_CORE::instance ()->orb ()->shutdown (0); + } + catch (const CORBA::Exception& ex) + { + ex._tao_print_exception ("shutdown\n"); + } +} + +int +Sender::parse_args (int argc, + ACE_TCHAR *argv[]) +{ + // Parse command line arguments + ACE_Get_Opt opts (argc, argv, ACE_TEXT("f:p:r:d")); + + int c; + while ((c= opts ()) != -1) + { + switch (c) + { + case 'f': + this->filename_ = ACE_TEXT_ALWAYS_CHAR (opts.opt_arg ()); + break; + case 'p': + this->protocol_ = ACE_TEXT_ALWAYS_CHAR (opts.opt_arg ()); + break; + case 'r': + this->frame_rate_ = ACE_OS::atoi (opts.opt_arg ()); + break; + case 'd': + TAO_debug_level++; + break; + default: + ACE_DEBUG ((LM_DEBUG, "Unknown Option\n")); + return -1; + } + } + return 0; +} + +// Method to get the object reference of the receiver +int +Sender::bind_to_receiver (void) +{ + CosNaming::Name name (1); + name.length (1); + name [0].id = + CORBA::string_dup ("Receiver"); + + // Resolve the receiver object reference from the Naming Service + CORBA::Object_var receiver_mmdevice_obj = + this->naming_client_->resolve (name); + + this->receiver_mmdevice_ = + AVStreams::MMDevice::_narrow (receiver_mmdevice_obj.in ()); + + if (CORBA::is_nil (this->receiver_mmdevice_.in ())) + ACE_ERROR_RETURN ((LM_ERROR, + "Could not resolve Receiver_MMdevice in Naming service\n"), + -1); + + return 0; +} + +int +Sender::init (int argc, + ACE_TCHAR *argv[]) +{ + // Initialize the endpoint strategy with the orb and poa. + int result = + this->endpoint_strategy_.init (TAO_AV_CORE::instance ()->orb (), + TAO_AV_CORE::instance ()->poa ()); + if (result != 0) + return result; + + // Initialize the naming services + result = + this->naming_client_.init (TAO_AV_CORE::instance ()->orb ()); + if (result != 0) + return result; + + // Parse the command line arguments + result = + this->parse_args (argc, + argv); + if (result != 0) + return result; + + // Open file to read. + this->input_file_ = + ACE_OS::fopen (this->filename_.c_str (), + "r"); + + if (this->input_file_ == 0) + ACE_ERROR_RETURN ((LM_DEBUG, + "Cannot open input file %C\n", + this->filename_.c_str ()), + -1); + else + ACE_DEBUG ((LM_DEBUG, + "File opened successfully\n")); + + // Resolve the object reference of the receiver from the Naming Service. + result = this->bind_to_receiver (); + + if (result != 0) + ACE_ERROR_RETURN ((LM_ERROR, + "(%P|%t) Error binding to the naming service\n"), + -1); + + + // Initialize the QoS + AVStreams::streamQoS_var the_qos (new AVStreams::streamQoS); + + // Create the forward flow specification to describe the flow. + TAO_Forward_FlowSpec_Entry entry ("Data_Receiver", + "IN", + "USER_DEFINED", + "", + this->protocol_.c_str (), + 0); + + AVStreams::flowSpec flow_spec (1); + flow_spec.length (2); + flow_spec [0] = CORBA::string_dup (entry.entry_to_string ()); + + // Create the forward flow specification to describe the flow. + TAO_Forward_FlowSpec_Entry entry1 ("Data_Receiver1", + "OUT", + "USER_DEFINED", + "", + this->protocol_.c_str (), + 0); + + flow_spec [1] = CORBA::string_dup (entry1.entry_to_string ()); + + // Register the sender mmdevice object with the ORB + ACE_NEW_RETURN (this->sender_mmdevice_, + TAO_MMDevice (&this->endpoint_strategy_), + -1); + + // Servant Reference Counting to manage lifetime + PortableServer::ServantBase_var safe_mmdevice = + this->sender_mmdevice_; + + AVStreams::MMDevice_var mmdevice = + this->sender_mmdevice_->_this (); + + ACE_NEW_RETURN (this->streamctrl_, + TAO_StreamCtrl, + -1); + + PortableServer::ServantBase_var safe_streamctrl = + this->streamctrl_; + + // Bind/Connect the sender and receiver MMDevices. + CORBA::Boolean bind_result = + this->streamctrl_->bind_devs (mmdevice.in (), + this->receiver_mmdevice_.in (), + the_qos.inout (), + flow_spec); + + if (bind_result == 0) + ACE_ERROR_RETURN ((LM_ERROR, + "streamctrl::bind_devs failed\n"), + -1); + + return 0; +} + +// Method to send data at the specified rate +int +Sender::pace_data (void) +{ + // The time that should lapse between two consecutive frames sent. + ACE_Time_Value inter_frame_time; + + // The time between two consecutive frames. + inter_frame_time.set (1 / (double) this->frame_rate_); + + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "Frame Rate = %d / second\n" + "Inter Frame Time = %d (msec)\n", + this->frame_rate_, + inter_frame_time.msec ())); + + try + { + // The time taken for sending a frame and preparing for the next frame + ACE_High_Res_Timer elapsed_timer; + + // Continue to send data till the file is read to the end. + while (1) + { + // Read from the file into a message block. + int n = ACE_OS::fread (this->mb_.wr_ptr (), + 1, + this->mb_.size (), + this->input_file_); + + if (n < 0) + ACE_ERROR_RETURN ((LM_ERROR, + "Sender::pace_data fread failed\n"), + -1); + + if (n == 0) + { + // At end of file break the loop and end the sender. + ACE_DEBUG ((LM_DEBUG,"Handle_Start:End of file\n")); + this->eof_ = 1; + break; + } + + this->mb_.wr_ptr (n); + + if (this->frame_count_ > 1) + { + // + // Second frame and beyond + // + + // Stop the timer that was started just before the previous frame was sent. + elapsed_timer.stop (); + + // Get the time elapsed after sending the previous frame. + ACE_Time_Value elapsed_time; + elapsed_timer.elapsed_time (elapsed_time); + + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "Elapsed Time = %d\n", + elapsed_time.msec ())); + + // Check to see if the inter frame time has elapsed. + if (elapsed_time < inter_frame_time) + { + // Inter frame time has not elapsed. + + // Calculate the time to wait before the next frame needs to be sent. + ACE_Time_Value wait_time (inter_frame_time - elapsed_time); + + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "Wait Time = %d\n", + wait_time.msec ())); + + // Run the orb for the wait time so the sender can + // continue other orb requests. + TAO_AV_CORE::instance ()->orb ()->run (wait_time); + } + } + + // Start timer before sending the frame. + elapsed_timer.start (); + + // Send frame. + int result = + this->protocol_object_->send_frame (&this->mb_); + + if (result < 0) + ACE_ERROR_RETURN ((LM_ERROR, + "send failed:%p", + "Sender::pace_data send\n"), + -1); + + ACE_DEBUG ((LM_DEBUG, + "Sender::pace_data frame %d was sent succesfully\n", + ++this->frame_count_)); + + // Reset the message block. + this->mb_.reset (); + + } // end while + + // File reading is complete, destroy the stream. + AVStreams::flowSpec stop_spec; + this->streamctrl_->destroy (stop_spec); + + // Shut the orb down. + //TAO_AV_CORE::instance ()->orb ()->shutdown (1, + //); + } + catch (const CORBA::Exception&) + { + //ACE_PRINT_EXCEPTION (ex, + // "Sender::pace_data Failed\n"); + return -1; + } + return 0; +} + +int +ACE_TMAIN (int argc, + ACE_TCHAR *argv[]) +{ + try + { + CORBA::ORB_var orb = + CORBA::ORB_init (argc, argv); + + CORBA::Object_var obj + = orb->resolve_initial_references ("RootPOA"); + + // Get the POA_var object from Object_var + PortableServer::POA_var root_poa + = PortableServer::POA::_narrow (obj.in ()); + + PortableServer::POAManager_var mgr + = root_poa->the_POAManager (); + + mgr->activate (); + + // Initialize the AV Stream components. +/* TAO_AV_CORE::instance ()->init (orb.in (), + root_poa.in ()); */ + + // Initialize the AVStreams components. + TAO_AV_CORE::instance ()->init (orb.in (), root_poa.in ()); + + // Initialize the Sender. + int result = 0; + result = SENDER::instance ()->init (argc, + argv); + + if (result < 0) + ACE_ERROR_RETURN ((LM_ERROR, + "Sender::init failed\n"), + -1); + + // Make sure we have a valid <output_file> + output_file = ACE_OS::fopen (output_file_name, + "w"); + if (output_file == 0) + ACE_ERROR_RETURN ((LM_DEBUG, + "Cannot open output file %s\n", + output_file_name), + -1); + + else + ACE_DEBUG ((LM_DEBUG, + "File Opened Successfully\n")); + + // Start sending data. + result = SENDER::instance ()->pace_data (); + ACE_Time_Value tv(3,0); + orb->run (tv); + } + catch (const CORBA::Exception& ex) + { + ex._tao_print_exception ("Sender Failed\n"); + return -1; + } + + SENDER::close (); // Explicitly finalize the Unmanaged_Singleton. + + return 0; +} + +#if defined (ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION) +template ACE_Unmanaged_Singleton<Sender, ACE_Null_Mutex> *ACE_Unmanaged_Singleton<Sender, ACE_Null_Mutex>::singleton_; +#endif /* ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION */ |