summaryrefslogtreecommitdiff
path: root/ACE/TAO/orbsvcs/tests/AVStreams/Simple_Two_Stage/receiver.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ACE/TAO/orbsvcs/tests/AVStreams/Simple_Two_Stage/receiver.cpp')
-rw-r--r--ACE/TAO/orbsvcs/tests/AVStreams/Simple_Two_Stage/receiver.cpp296
1 files changed, 296 insertions, 0 deletions
diff --git a/ACE/TAO/orbsvcs/tests/AVStreams/Simple_Two_Stage/receiver.cpp b/ACE/TAO/orbsvcs/tests/AVStreams/Simple_Two_Stage/receiver.cpp
new file mode 100644
index 00000000000..9c4a3d00fbc
--- /dev/null
+++ b/ACE/TAO/orbsvcs/tests/AVStreams/Simple_Two_Stage/receiver.cpp
@@ -0,0 +1,296 @@
+// $Id$
+
+#include "receiver.h"
+#include "ace/Get_Opt.h"
+#include "ace/High_Res_Timer.h"
+
+static FILE *output_file = 0;
+// File handle of the file into which received data is written.
+
+static const ACE_TCHAR *output_file_name = ACE_TEXT ("output");
+// File name of the file into which received data is written.
+
+static const ACE_TCHAR *stats_file_name = ACE_TEXT ("Stats.dat");
+
+int stats [1000010];
+long stats_index = 0;
+
+int start = 1;
+ACE_Time_Value start_time;
+
+int
+Receiver_StreamEndPoint::get_callback (const char *,
+ TAO_AV_Callback *&callback)
+{
+ // Return the receiver application callback to the AVStreams for
+ // future upcalls.
+ callback = &this->callback_;
+ return 0;
+}
+
+Receiver_Callback::Receiver_Callback (void)
+ : frame_count_ (0)
+{
+}
+
+int
+Receiver_Callback::receive_frame (ACE_Message_Block *frame,
+ TAO_AV_frame_info *,
+ const ACE_Addr &)
+{
+ //
+ // Upcall from the AVStreams when there is data to be received from
+ // the sender.
+ //
+ ACE_DEBUG ((LM_DEBUG,
+ "Receiver_Callback::receive_frame for frame %d\n",
+ this->frame_count_++));
+
+ if (start)
+ {
+ start_time = ACE_OS::gettimeofday ();
+ start = 0;
+ }
+ else
+ {
+ ACE_Time_Value elapsed_time = ACE_OS::gettimeofday () - start_time;
+ stats [stats_index++] = elapsed_time.msec ();
+ start_time = ACE_OS::gettimeofday ();
+ }
+
+
+ while (frame != 0)
+ {
+ // Write the received data to the file.
+ size_t result =
+ ACE_OS::fwrite (frame->rd_ptr (),
+ frame->length (),
+ 1,
+ output_file);
+
+
+ if (result == frame->length ())
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Receiver_Callback::fwrite failed\n"),
+ -1);
+
+ frame = frame->cont ();
+ }
+
+ return 0;
+}
+
+void
+Receiver_Callback::dump_samples (const char* file)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ "Dumping Stats.....\n"));
+
+ FILE* stats_file = ACE_OS::fopen (file, "w");
+
+ if (stats_file == 0)
+ {
+ ACE_ERROR ((LM_ERROR,
+ "Stats.dat cannot be opened\n"));
+ }
+
+ int i;
+ for (i = 0; i < stats_index; i++)
+ {
+ stats_.sample ((ACE_UINT64)stats [i]);
+ }
+
+ stats_.dump_results (ACE_TEXT("Inter Frame Arrival Time Statistics "),
+ stats_file,
+ 1);
+ // ACE_High_Res_Timer::global_scale_factor ());
+
+ for (i = 0; i < stats_index; i++)
+ {
+ ACE_OS::fprintf (stats_file, "%d\n",stats [i]);
+ }
+
+ ACE_OS::fclose (stats_file);
+
+ ACE_DEBUG ((LM_DEBUG,
+ "Done\n"));
+}
+
+int
+Receiver_Callback::handle_destroy (void)
+{
+ // Called when the distributer requests the stream to be shutdown.
+ ACE_DEBUG ((LM_DEBUG,
+ "Receiver_Callback::end_stream\n"));
+
+ dump_samples (ACE_TEXT_ALWAYS_CHAR (stats_file_name));
+
+ try
+ {
+ TAO_AV_CORE::instance ()->orb ()->shutdown (0);
+ }
+ catch (const CORBA::Exception& ex)
+ {
+ ex._tao_print_exception ("Receiver_Callback::handle_destroy Failed\n");
+ return -1;
+
+ }
+
+ return 0;
+}
+
+Receiver::Receiver (void)
+ : mmdevice_ (0)
+{
+}
+
+Receiver::~Receiver (void)
+{
+}
+
+int
+Receiver::init (int,
+ ACE_TCHAR *[])
+{
+ // Initialize the endpoint strategy with the orb and poa.
+ int result =
+ this->reactive_strategy_.init (TAO_AV_CORE::instance ()->orb (),
+ TAO_AV_CORE::instance ()->poa ());
+ if (result != 0)
+ return result;
+
+ // Register the receiver mmdevice object with the ORB
+ ACE_NEW_RETURN (this->mmdevice_,
+ TAO_MMDevice (&this->reactive_strategy_),
+ -1);
+
+ // Servant Reference Counting to manage lifetime
+ PortableServer::ServantBase_var safe_mmdevice =
+ this->mmdevice_;
+
+ CORBA::Object_var mmdevice =
+ this->mmdevice_->_this ();
+
+ // Register the mmdevice with the naming service.
+ CosNaming::Name name (1);
+ name.length (1);
+ name [0].id =
+ CORBA::string_dup ("Receiver");
+
+ // Initialize the naming services
+ if (this->naming_client_.init (TAO_AV_CORE::instance ()->orb ()) != 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Unable to initialize "
+ "the TAO_Naming_Client\n"),
+ -1);
+
+ // Register the receiver object with the naming server.
+ this->naming_client_->rebind (name,
+ mmdevice.in ());
+
+ return 0;
+}
+
+
+int
+parse_args (int argc,
+ ACE_TCHAR *argv[])
+{
+ // Parse the command line arguments
+ ACE_Get_Opt opts (argc,
+ argv,
+ "f:s:");
+
+ int c;
+ while ((c = opts ()) != -1)
+ {
+ switch (c)
+ {
+ case 'f':
+ output_file_name = opts.opt_arg ();
+ break;
+ case 's':
+ stats_file_name = opts.opt_arg ();
+ break;
+ default:
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Usage: receiver -f filename"),
+ -1);
+ }
+ }
+
+ return 0;
+}
+
+int
+ACE_TMAIN (int argc, ACE_TCHAR *argv[])
+{
+ try
+ {
+ // Initialize the ORB first.
+ CORBA::ORB_var orb =
+ CORBA::ORB_init (argc, argv);
+
+ CORBA::Object_var obj
+ = orb->resolve_initial_references ("RootPOA");
+
+ // Get the POA_var object from Object_var.
+ PortableServer::POA_var root_poa =
+ PortableServer::POA::_narrow (obj.in ());
+
+ PortableServer::POAManager_var mgr
+ = root_poa->the_POAManager ();
+
+ mgr->activate ();
+
+ // Initialize the AVStreams components.
+ TAO_AV_CORE::instance ()->init (orb.in (),
+ root_poa.in ());
+
+ int result =
+ parse_args (argc,
+ argv);
+
+ if (result == -1)
+ return -1;
+
+ // Make sure we have a valid <output_file>
+ output_file = ACE_OS::fopen (output_file_name,
+ "w");
+ if (output_file == 0)
+ ACE_ERROR_RETURN ((LM_DEBUG,
+ "Cannot open output file %s\n",
+ output_file_name),
+ -1);
+
+ else
+ ACE_DEBUG ((LM_DEBUG,
+ "File Opened Successfully\n"));
+
+ Receiver receiver;
+ result =
+ receiver.init (argc,
+ argv);
+
+ if (result != 0)
+ return result;
+
+ orb->run ();
+
+ // Hack for now....
+ ACE_OS::sleep (1);
+
+ orb->destroy ();
+
+
+ }
+ catch (const CORBA::Exception& ex)
+ {
+ ex._tao_print_exception ("receiver::init");
+ return -1;
+ }
+
+ ACE_OS::fclose (output_file);
+
+ return 0;
+}