diff options
Diffstat (limited to 'trunk/TAO/orbsvcs/tests/AVStreams/Multiple_Flows/receiver.cpp')
-rw-r--r-- | trunk/TAO/orbsvcs/tests/AVStreams/Multiple_Flows/receiver.cpp | 306 |
1 files changed, 306 insertions, 0 deletions
diff --git a/trunk/TAO/orbsvcs/tests/AVStreams/Multiple_Flows/receiver.cpp b/trunk/TAO/orbsvcs/tests/AVStreams/Multiple_Flows/receiver.cpp new file mode 100644 index 00000000000..ef33207328f --- /dev/null +++ b/trunk/TAO/orbsvcs/tests/AVStreams/Multiple_Flows/receiver.cpp @@ -0,0 +1,306 @@ +// $Id$ + +#include "receiver.h" +#include "ace/Get_Opt.h" +#include "ace/High_Res_Timer.h" + +int endstream = 0; + +// Create a singleton instance of the Sender. + +// An Unmanaged_Singleton is used to avoid static object destruction +// order related problems since the underlying singleton object +// contains references to static TypeCodes. +typedef ACE_Unmanaged_Singleton<Receiver,ACE_Null_Mutex> RECEIVER; + + +int +Receiver_StreamEndPoint::get_callback (const char *flow_name, + TAO_AV_Callback *&callback) +{ + Receiver_Callback *callback_; + ACE_NEW_RETURN (callback_, + Receiver_Callback, + -1); + + // Return the receiver application callback to the AVStreams for + // future upcalls. + callback = callback_; + callback_->flowname (flow_name); + return 0; +} + + +int +Receiver_StreamEndPoint::set_protocol_object (const char * flowname, + TAO_AV_Protocol_Object *object) +{ + // Set the sender protocol object corresponding to the transport + // protocol selected. + if (ACE_OS::strcmp (flowname, "Data_Receiver1") == 0) + RECEIVER::instance ()->protocol_object (object); + return 0; +} + +Receiver_Callback::Receiver_Callback (void) + : frame_count_ (1), + mb_ (BUFSIZ) +{ + ACE_DEBUG ((LM_DEBUG, + "Receiver_Callback::Receiver_Callback\n")); +} + +void +Receiver_Callback::flowname (const char* flow_name) +{ + this->flowname_ = flow_name; + + // Make sure we have a valid <output_file> + this->output_file_ = ACE_OS::fopen (this->flowname_.c_str (), + "w"); + if (this->output_file_ == 0) + ACE_ERROR ((LM_DEBUG, + "Cannot open output file %s\n", + this->flowname_.c_str ())); + + else + ACE_DEBUG ((LM_DEBUG, + "%s File Opened Successfully\n", + this->flowname_.c_str ())); + + +} + +int +Receiver_Callback::receive_frame (ACE_Message_Block *frame, + TAO_AV_frame_info *, + const ACE_Addr &) +{ + // + // Upcall from the AVStreams when there is data to be received from + // the sender. + // + ACE_DEBUG ((LM_DEBUG, + "Receiver_Callback::receive_frame for frame %d for flow %s\n", + this->frame_count_++, + this->flowname_.c_str ())); + + while (frame != 0) + { + // Write the received data to the file. + size_t result = + ACE_OS::fwrite (frame->rd_ptr (), + frame->length (), + 1, + this->output_file_); + + if (result == frame->length ()) + ACE_ERROR_RETURN ((LM_ERROR, + "Receiver_Callback::fwrite failed\n"), + -1); + + frame = frame->cont (); + } + + return 0; +} + +int +Receiver_Callback::handle_destroy (void) +{ + // Called when the distributer requests the stream to be shutdown. + ACE_DEBUG ((LM_DEBUG, + "Receiver_Callback::handle_destroy\n")); + + endstream++; + + return 0; +} + +Receiver::Receiver (void) + : mmdevice_ (0), + frame_rate_ (30), + frame_count_ (0), + filename_ ("input"), + mb_ (BUFSIZ) +{ +} + +Receiver::~Receiver (void) +{ +} + +void +Receiver::protocol_object (TAO_AV_Protocol_Object *object) +{ + // Set the sender protocol object corresponding to the transport + // protocol selected. + this->protocol_object_ = object; +} + +int +Receiver::parse_args (int argc, + char **argv) +{ + // Parse command line arguments + ACE_Get_Opt opts (argc, argv, "f:r:d"); + + int c; + while ((c= opts ()) != -1) + { + switch (c) + { + case 'f': + this->filename_ = opts.opt_arg (); + break; + case 'r': + this->frame_rate_ = ACE_OS::atoi (opts.opt_arg ()); + break; + case 'd': + TAO_debug_level++; + break; + default: + ACE_DEBUG ((LM_DEBUG, "Unknown Option\n")); + return -1; + } + } + return 0; +} + +int +Receiver::init (int argc, + char ** argv + ACE_ENV_ARG_DECL) +{ + // Initialize the endpoint strategy with the orb and poa. + int result = + this->reactive_strategy_.init (TAO_AV_CORE::instance ()->orb (), + TAO_AV_CORE::instance ()->poa ()); + if (result != 0) + return result; + + // Parse the command line arguments + result = + this->parse_args (argc, + argv); + if (result != 0) + return result; + + + // Register the receiver mmdevice object with the ORB + ACE_NEW_RETURN (this->mmdevice_, + TAO_MMDevice (&this->reactive_strategy_), + -1); + + // Servant Reference Counting to manage lifetime + PortableServer::ServantBase_var safe_mmdevice = + this->mmdevice_; + + CORBA::Object_var mmdevice = + this->mmdevice_->_this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + + // Register the mmdevice with the naming service. + CosNaming::Name name (1); + name.length (1); + name [0].id = + CORBA::string_dup ("Receiver"); + + // Initialize the naming services + if (this->naming_client_.init (TAO_AV_CORE::instance ()->orb ()) != 0) + ACE_ERROR_RETURN ((LM_ERROR, + "Unable to initialize " + "the TAO_Naming_Client\n"), + -1); + + // Register the receiver object with the naming server. + this->naming_client_->rebind (name, + mmdevice.in () + ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + + return 0; +} + +TAO_AV_Protocol_Object* +Receiver::protocol_object (void) +{ + return this->protocol_object_; +} + +int +main (int argc, + char **argv) +{ + ACE_DECLARE_NEW_CORBA_ENV; + ACE_TRY + { + // Initialize the ORB first. + CORBA::ORB_var orb = + CORBA::ORB_init (argc, + argv, + 0 + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + CORBA::Object_var obj + = orb->resolve_initial_references ("RootPOA" + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Get the POA_var object from Object_var. + PortableServer::POA_var root_poa = + PortableServer::POA::_narrow (obj.in () + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + PortableServer::POAManager_var mgr + = root_poa->the_POAManager (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + mgr->activate (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Initialize the AVStreams components. + TAO_AV_CORE::instance ()->init (orb.in (), + root_poa.in () + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + int result = + RECEIVER::instance ()->init (argc, + argv + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + if (result != 0) + return result; + + while (endstream != 2) + { + orb->perform_work (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + } + + // Hack for now.... + ACE_OS::sleep (1); + + orb->destroy (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"receiver::init"); + return -1; + } + ACE_ENDTRY; + ACE_CHECK_RETURN (-1); + + RECEIVER::close (); // Explicitly finalize the Unmanaged_Singleton. + + return 0; +} + +#if defined (ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION) +template ACE_Unmanaged_Singleton<Receiver, ACE_Null_Mutex> *ACE_Unmanaged_Singleton<Receiver, ACE_Null_Mutex>::singleton_; +#endif /* ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION */ |