summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/tests
diff options
context:
space:
mode:
authoryamuna <yamuna@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2001-03-13 16:26:42 +0000
committeryamuna <yamuna@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2001-03-13 16:26:42 +0000
commitbdb060be5e7a9e1a8efa18f33a853942c791b5d3 (patch)
tree34e92dd08922ec9cd30bc5a8dcfd89f23914d5b3 /TAO/orbsvcs/tests
parentfa5e2f45b37b7736b9be7e5eed15391b05aedb6f (diff)
downloadATCD-bdb060be5e7a9e1a8efa18f33a853942c791b5d3.tar.gz
*** empty log message ***
Diffstat (limited to 'TAO/orbsvcs/tests')
-rw-r--r--TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/README56
-rw-r--r--TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/distributer.cpp405
-rw-r--r--TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/distributer.h202
-rw-r--r--TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/receiver.cpp244
-rw-r--r--TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/receiver.h100
-rwxr-xr-xTAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/run_test.pl83
-rw-r--r--TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/sender.cpp419
-rw-r--r--TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/sender.h179
8 files changed, 1688 insertions, 0 deletions
diff --git a/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/README b/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/README
new file mode 100644
index 00000000000..c11faefa8e8
--- /dev/null
+++ b/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/README
@@ -0,0 +1,56 @@
+// $Id$
+
+Description
+-----------
+
+This directory contains a simple three stage test with 3 processes
+a. Sender
+b. Distributer
+c. Receiver
+
+This test has a sender process that paces the data read from a file and sends it to
+the distributer process. The distributer acts as a conduit. It receives data from
+the sender and sends it immediately to the receiver process.
+
+The sender and receiver register their references with the Naming Service.
+The distributer gets the sender and receiver references from the Naming Service.
+It then binds to the receiver and sender. When the connection between the distributer
+and sender is set up the sender starts sending to the distrubuter which in turn sends
+data to the receiver. When the sender has finished reading the file it tells the ditri-
+buter which in turn tells the recveiver to destroy the streams set up.
+
+
+Running the test
+----------------
+
+Start the Naming Service
+
+Start the following processes in the same order.
+
+sender
+------
+
+sender [-f <filename>] [-r <data_rate>]
+
+-f filename --> The name of the file from which data needs to be sent
+
+-r data_rate--> The rate at which the data needs to be paced.
+
+
+receiver
+--------
+
+receiver [-f <filename>]
+
+-f filename --> File into which the data received from the distributer is stored.
+
+-s --> flag to use SFP. This option cannot be used with -p
+ TCP since SFP currently runs only over UDP.
+
+
+distributer:
+-----------
+
+distributer
+
+
diff --git a/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/distributer.cpp b/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/distributer.cpp
new file mode 100644
index 00000000000..32f556b4aa0
--- /dev/null
+++ b/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/distributer.cpp
@@ -0,0 +1,405 @@
+// $Id$
+
+#include "distributer.h"
+#include "ace/Get_Opt.h"
+#include "orbsvcs/AV/Protocol_Factory.h"
+
+int first = 1;
+
+int
+Distributer_StreamEndPoint::set_protocol_object (const char *flow_name,
+ TAO_AV_Protocol_Object *object)
+{
+ // Set the protocol object corresponding to the transport protocol selected.
+ DISTRIBUTER::instance ()->set_protocol_object (flow_name, object);
+
+ // Store the flowname of the stream that this callback belongs to.
+ ACE_CString fname = flow_name;
+ this->callback_.flowname (fname);
+
+ // Increment the stream count.
+ DISTRIBUTER::instance ()->stream_created ();
+ return 0;
+}
+
+
+int
+Distributer_StreamEndPoint::get_callback (const char *,
+ TAO_AV_Callback *&callback)
+{
+ // Create and return the application callback and return to the AVStreams
+ // for further upcalls.
+ callback = &this->callback_;
+ return 0;
+}
+
+void
+Distributer_Callback::flowname (ACE_CString flowname)
+{
+ this->flowname_ = flowname;
+}
+
+
+ACE_CString
+Distributer_Callback::flowname (void)
+{
+ return this->flowname_;
+}
+
+int
+Distributer_Callback::receive_frame (ACE_Message_Block *frame,
+ TAO_AV_frame_info *,
+ const ACE_Addr &)
+{
+ // Upcall from the AVStreams when there is data to be received from the
+ // sender.
+ ACE_DEBUG ((LM_DEBUG,
+ "Distributer_Callback::receive_frame\n"));
+
+ ACE_CString flowname = "Data_Receiver";
+
+ // Get the protocol object corresponding to the receiver stream
+ // send the data received from sender to the receiver.
+ DISTRIBUTER::instance ()->get_protocol_object (flowname.c_str())->send_frame (frame);
+
+ return 0;
+}
+
+int
+Distributer_Callback::handle_destroy (void)
+{
+ // Called when the sender requests the stream to be shutdown.
+ ACE_DECLARE_NEW_CORBA_ENV;
+
+ ACE_DEBUG ((LM_DEBUG,
+ "Distributer_Callback::end_stream\n"));
+
+ if (ACE_OS::strcmp (this->flowname_.c_str (), "Data_Sender") == 0)
+ {
+ // Destroy the sender receiver stream as the sender has requested a stream destroy
+ AVStreams::flowSpec stop_spec;
+ DISTRIBUTER::instance ()->get_receiver_streamctrl ()->destroy (stop_spec,
+ ACE_TRY_ENV);
+ }
+
+ // Decrement the stream count.
+ DISTRIBUTER::instance ()->stream_destroyed ();
+ return 0;
+}
+
+int
+Distributer::set_protocol_object (const char* flowname, TAO_AV_Protocol_Object* object)
+{
+ // Set the corresponding protocol objects for the different streams created.
+ if (ACE_OS::strcmp ("Data_Sender", flowname) == 0)
+ this->protocol_object_ [0] = object;
+ else if (ACE_OS::strcmp ("Data_Receiver", flowname) == 0)
+ this->protocol_object_ [1] = object;
+
+ return 0;
+}
+
+TAO_AV_Protocol_Object*
+Distributer::get_protocol_object (const char* flowname)
+{
+ if (ACE_OS::strcmp ("Data_Sender", flowname) == 0)
+ return this->protocol_object_ [0];
+ else if (ACE_OS::strcmp ("Data_Receiver", flowname) == 0)
+ return this->protocol_object_ [1];
+
+ return *(this->protocol_object_);
+}
+
+
+Distributer::Distributer (void)
+ : distributer_mmdevice_ (&a_endpoint_strategy_),
+ count_ (0),
+ protocol_ ("UDP"),
+ stream_count_ (0),
+ done_ (0)
+{
+
+ // Get the local host name
+ char buf [BUFSIZ];
+ ACE_OS::hostname (buf,
+ BUFSIZ);
+
+ // Set the address to the local host and port.
+ this->address_ = buf;
+ this->address_ += ":8000";
+
+ protocol_object_ [0] = 0;
+ protocol_object_ [1] = 0;
+ this->mb.size (BUFSIZ);
+}
+
+Distributer::~Distributer (void)
+{
+}
+
+
+// Method to bind the sender reference to the Naming Service.
+int
+Distributer::bind_to_mmdevice (AVStreams::MMDevice_ptr &mmdevice,
+ ACE_CString mmdevice_name,
+ CORBA::Environment &ACE_TRY_ENV)
+{
+ // Initialize the naming services
+ if (my_naming_client_.init (TAO_AV_CORE::instance ()->orb ()) != 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ " (%P|%t) Unable to initialize "
+ "the TAO_Naming_Client. \n"),
+ -1);
+
+ CosNaming::Name mmdevice_naming_name (1);
+ mmdevice_naming_name.length (1);
+ mmdevice_naming_name [0].id = CORBA::string_dup (mmdevice_name.c_str ());
+
+ // Resolve the mmdevice object reference from the Naming Service
+ CORBA::Object_var mmdevice_obj =
+ my_naming_client_->resolve (mmdevice_naming_name,
+ ACE_TRY_ENV);
+ ACE_CHECK_RETURN (-1);
+
+ mmdevice =
+ AVStreams::MMDevice::_narrow (mmdevice_obj.in (),
+ ACE_TRY_ENV);
+ ACE_CHECK_RETURN (-1);
+
+ if (CORBA::is_nil (mmdevice))
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Could not resolve MMdevice in Naming service <%s>\n"),
+ -1);
+ return 0;
+}
+
+
+int
+Distributer::init (int,
+ char **,
+ CORBA::Environment &ACE_TRY_ENV)
+{
+ // Initialize the endpoint strategy with the orb and poa.
+ int result =
+ this->a_endpoint_strategy_.init (TAO_AV_CORE::instance ()->orb (),
+ TAO_AV_CORE::instance ()->poa ());
+ if (result != 0)
+ return result;
+
+ // Bind to the receiver mmdevice
+ ACE_CString mmdevice_name ("Receiver");
+ result = bind_to_mmdevice (this->receiver_mmdevice_.inout (),
+ mmdevice_name,
+ ACE_TRY_ENV);
+ ACE_CHECK_RETURN (-1);
+
+ if (result != 0)
+ return result;
+
+ // Bind to the sender mmdevice
+ mmdevice_name = "Sender";
+ result = bind_to_mmdevice (this->sender_mmdevice_.inout (),
+ mmdevice_name,
+ ACE_TRY_ENV);
+ ACE_CHECK_RETURN (-1);
+
+ if (result != 0)
+ return result;
+
+ // Create the Flow protocol name
+ ACE_CString flow_protocol_str;
+ if (this->use_sfp_)
+ flow_protocol_str = "sfp:1.0";
+ else
+ flow_protocol_str = "";
+
+ // Initialize the QoS
+ AVStreams::streamQoS_var the_qos (new AVStreams::streamQoS);
+
+ // Set the address of the of the distributer receiver endpoint.
+ ACE_INET_Addr receiver_addr (this->address_.c_str ());
+
+ this->receiver_flowname_ = "Data_Receiver";
+
+ // Create the forward flow specification to describe the flow.
+ TAO_Forward_FlowSpec_Entry receiver_entry (this->receiver_flowname_.c_str (),
+ "IN",
+ "USER_DEFINED",
+ flow_protocol_str.c_str (),
+ this->protocol_.c_str (),
+ &receiver_addr);
+
+ // Set the flow specification for the stream between receiver and distributer
+ AVStreams::flowSpec flow_spec (1);
+ flow_spec.length (1);
+ flow_spec [0] = CORBA::string_dup (receiver_entry.entry_to_string ());
+
+ AVStreams::MMDevice_var distributer_mmdevice = this->distributer_mmdevice_._this (ACE_TRY_ENV);
+ ACE_CHECK_RETURN (-1);
+
+ // Bind/Connect the distributer and receiver MMDevices.
+ result =
+ this->receiver_streamctrl_.bind_devs (distributer_mmdevice.in (),
+ this->receiver_mmdevice_.in (),
+ the_qos.inout (),
+ flow_spec,
+ ACE_TRY_ENV);
+ ACE_CHECK_RETURN (-1);
+
+ // Flowname for the stream bewteen the sender and distributer
+ this->sender_flowname_ = "Data_Sender";
+
+ // Get the local host name
+ char buf [BUFSIZ];
+ ACE_OS::hostname (buf,
+ BUFSIZ);
+
+ // Set the address to the local host and port
+ this->address_ = buf;
+ this->address_ += ":8001";
+
+ // Set the address of the sender.
+ ACE_INET_Addr sender_addr (this->address_.c_str ());
+
+ // Create the forward flow specification to describe the flow.
+ TAO_Forward_FlowSpec_Entry sender_entry (this->sender_flowname_.c_str (),
+ "OUT",
+ "USER_DEFINED",
+ flow_protocol_str.c_str (),
+ this->protocol_.c_str (),
+ &sender_addr);
+
+ // Set the flow specification for the stream between sender and distributer
+ flow_spec [0] = CORBA::string_dup (sender_entry.entry_to_string ());
+
+ // Bind/Connect the sender and sitributer MMDevices.
+ CORBA::Boolean res =
+ this->sender_streamctrl_.bind_devs (distributer_mmdevice.in (),
+ this->sender_mmdevice_.in (),
+ the_qos.inout (),
+ flow_spec,
+ ACE_TRY_ENV);
+ ACE_CHECK_RETURN (-1);
+
+ if (res == 0)
+ ACE_ERROR_RETURN ((LM_ERROR,"Streamctrl::bind_devs failed\n"),-1);
+
+ AVStreams::flowSpec start_spec;
+ this->sender_streamctrl_.start (start_spec,ACE_TRY_ENV);
+ ACE_CHECK_RETURN (-1);
+
+ return 0;
+}
+
+TAO_StreamCtrl*
+Distributer::get_receiver_streamctrl (void)
+{
+ return &this->receiver_streamctrl_;
+}
+
+TAO_StreamCtrl*
+Distributer::get_sender_streamctrl (void)
+{
+ return &this->sender_streamctrl_;
+}
+
+
+void
+Distributer::stream_created (void)
+{
+ this->stream_count_++;
+}
+
+void
+Distributer::stream_destroyed (void)
+{
+ this->stream_count_--;
+
+ if (this->stream_count_ == 0)
+ this->done_ = 1;
+}
+
+int
+Distributer::done (void)
+{
+ return this->done_;
+}
+
+
+int
+main (int argc,
+ char **argv)
+{
+ ACE_DECLARE_NEW_CORBA_ENV;
+ ACE_TRY
+ {
+ // Initialize the ORB first.
+ CORBA::ORB_var orb = CORBA::ORB_init (argc,
+ argv,
+ 0,
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+
+ CORBA::Object_var obj
+ = orb->resolve_initial_references ("RootPOA",
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ // Get the POA_var object from Object_var.
+ PortableServer::POA_var root_poa =
+ PortableServer::POA::_narrow (obj.in (),
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ PortableServer::POAManager_var mgr
+ = root_poa->the_POAManager (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ mgr->activate (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ // Initialize the AVStreams components.
+ TAO_AV_CORE::instance ()->init (orb.in (),
+ root_poa.in (),
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ // Initialize the Distributer
+ int result = DISTRIBUTER::instance ()->init (argc,
+ argv,
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ if (result != 0)
+ return result;
+
+ // run the orb till the streams are not destroyed.
+ while (!DISTRIBUTER::instance ()->done () && orb->work_pending ())
+ orb->perform_work (ACE_TRY_ENV);
+
+ }
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"main");
+ return -1;
+ }
+ ACE_ENDTRY;
+ ACE_CHECK_RETURN (-1);
+
+ return 0;
+}
+
+#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
+template class ACE_Singleton <Distributer,ACE_Null_Mutex>;
+template class TAO_AV_Endpoint_Reactive_Strategy_A
+<Distributer_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl>;
+template class TAO_AV_Endpoint_Reactive_Strategy
+<Distributer_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl>;
+#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
+#pragma instantiate ACE_Singleton <Client,ACE_Null_Mutex>
+#pragma instantiate TAO_AV_Endpoint_Reactive_Strategy_A
+<Distributer_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl>
+#pragma instantiate TAO_AV_Endpoint_Reactive_Strategy
+<Distributer_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl>
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/distributer.h b/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/distributer.h
new file mode 100644
index 00000000000..6441221e572
--- /dev/null
+++ b/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/distributer.h
@@ -0,0 +1,202 @@
+/* -*- C++ -*- */
+// $Id$
+// ============================================================================
+//
+// = LIBRARY
+// TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage
+//
+// = FILENAME
+// distributer.h
+//
+// = DESCRIPTION
+// Process to receive data from the sender and send it to the receiver
+//
+// = AUTHOR
+// Yamuna Krishnamurthy <yamuna@cs.wustl.edu>
+//
+// ============================================================================
+
+#ifndef DISTRIBUTER_H
+#define DISTRIBUTER_H
+
+#include "orbsvcs/Naming/Naming_Utils.h"
+#include "orbsvcs/AV/AVStreams_i.h"
+#include "orbsvcs/AV/Endpoint_Strategy.h"
+#include "orbsvcs/AV/Policy.h"
+
+#define AV_MAX_STREAM_COUNT 2
+// The distributer currently handles only two connections, one from
+// the sender and one from the receiver.
+
+class Distributer;
+
+class Distributer_Callback : public TAO_AV_Callback
+{
+ // = TITLE
+ // Defines a class for the distributer application callback
+ // for receiving data.
+ //
+ // = DESCRIPTION
+ // This class overides the methods of the TAO_AV_Callback so the
+ // AVStreams can make upcalls to the application.
+
+public:
+
+ int receive_frame (ACE_Message_Block *frame,
+ TAO_AV_frame_info *frame_info,
+ const ACE_Addr &peer_address);
+ // Method that is called when there is data to be received from the sender
+
+ int handle_destroy (void);
+ // Called when the sender has finished reading the file and wants
+ // to close down the connection.
+
+ void flowname (ACE_CString);
+ ACE_CString flowname (void);
+ // Accessor methods to set and get the flowname corresponding
+ // to the callback
+
+
+protected:
+ ACE_CString flowname_;
+
+};
+
+class Distributer_StreamEndPoint : public TAO_Client_StreamEndPoint
+{
+ // = TITLE
+ // Defines the distributer aplication stream endpoint
+ //
+ // = DESCRIPTION
+ // This is the class that overrides the tao_server_endpoint to handle
+ // pre and post connect processing.
+public:
+
+
+ int set_protocol_object (const char *,
+ TAO_AV_Protocol_Object *object);
+ // Store the reference to the protocol object corresponding
+ // to the transport
+
+ int get_callback (const char *flowname,
+ TAO_AV_Callback *&callback);
+ // Create the distributer application sender callback.
+
+
+private:
+ Distributer_Callback callback_;
+ // Reference to the application callback.
+};
+
+class Distributer
+{
+ // = TITLE
+ // Defines the distributer application class.
+ //
+ // = DESCRIPTION
+ // The distributer progarm that acts as intermediate receiver
+ // that receives data from the sender process.
+public:
+ Distributer (void);
+ // Constructor
+
+ ~Distributer (void);
+ // Destructor.
+
+ int init (int argc,
+ char **argv,
+ CORBA::Environment &);
+ // Initialize data components.
+
+ int set_protocol_object (const char*,
+ TAO_AV_Protocol_Object *object);
+ TAO_AV_Protocol_Object* get_protocol_object (const char*);
+ // Accessor methods to set/get the protocol object of the receiver/sender
+ // process
+
+ int bind_to_mmdevice (AVStreams::MMDevice_ptr &mmdevice,
+ ACE_CString mmdevice_name,
+ CORBA::Environment &ACE_TRY_ENV);
+ // Resolve the reference of the mmdevice from the naming service.
+
+ TAO_StreamCtrl* get_receiver_streamctrl (void);
+
+ TAO_StreamCtrl* get_sender_streamctrl (void);
+
+ void stream_created (void);
+ // Called when stream created
+
+ void stream_destroyed (void);
+ // Called when stream destroyed
+
+ int done (void);
+ // Return the flag that suggests orb shutdown.
+
+protected:
+ TAO_Naming_Client my_naming_client_;
+ // The Naming Service Client.
+
+ TAO_AV_Endpoint_Reactive_Strategy_A
+ <Distributer_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl> a_endpoint_strategy_;
+ // The reactive strategy of the distributer receiver.
+
+ TAO_MMDevice distributer_mmdevice_;
+ // The distributer receiver multimedia device
+
+ AVStreams::MMDevice_var receiver_mmdevice_;
+ // The receiver MMDevice.
+
+ AVStreams::MMDevice_var sender_mmdevice_;
+ // The sender MMDevice.
+
+ TAO_AV_Protocol_Object *protocol_object_ [2];
+
+ int count_;
+ // Number of frames sent.
+
+ int argc_;
+ char **argv_;
+
+ ACE_CString filename_;
+ // File from which data is read.
+
+ ACE_CString address_;
+ // Address of the distributer host machine or a multicast address - Default is
+ // UDP multicast addess
+
+ ACE_CString protocol_;
+ // Selected protocol - default is UDP
+
+ ACE_CString sender_flowname_;
+ // The flow name of the stream set up between the
+ // sender and the distributer.
+
+ ACE_CString receiver_flowname_;
+ // The flow name of the stream set up between the
+ // receiver and the distributer.
+
+ TAO_StreamCtrl sender_streamctrl_;
+ // Video stream controller for the stream between sender and distributer
+
+ TAO_StreamCtrl receiver_streamctrl_;
+ // Video stream controller for the stream between receivers and distributer
+
+ int use_sfp_;
+ // If set to 1 then use sfp as the flow carrier protocol.
+ ACE_Message_Block mb;
+ // Message block into which data is read from a file and then sent.
+
+ int stream_count_;
+ // Number of active streams. When a stream is disconnected this
+ // count is decremented.
+
+ int done_;
+ // Flag to indicate orb shutdown
+};
+
+typedef ACE_Singleton<Distributer, ACE_Null_Mutex> DISTRIBUTER;
+
+#endif /*DISTRIBUTER_H*/
+
+
+
diff --git a/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/receiver.cpp b/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/receiver.cpp
new file mode 100644
index 00000000000..85ffc3e41dd
--- /dev/null
+++ b/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/receiver.cpp
@@ -0,0 +1,244 @@
+// $Id$
+
+#include "receiver.h"
+#include "ace/Get_Opt.h"
+
+static FILE *output_file = 0;
+// File into which the received data is written.
+
+static const char *output_file_name = "output";
+// File handle of the file into which data is written.
+
+int
+Receiver_StreamEndPoint::get_callback (const char *,
+ TAO_AV_Callback *&callback)
+{
+ // Return the application callback to the AVStreams for further upcalls
+ callback = &this->callback_;
+ return 0;
+}
+
+int
+Receiver_Callback::receive_frame (ACE_Message_Block *frame,
+ TAO_AV_frame_info *,
+ const ACE_Addr &)
+{
+ // Upcall from the AVStreams when there is data to be received from the
+ // distributer.
+
+ ACE_DEBUG ((LM_DEBUG,
+ "Receiver_Callback::receive_frame\n"));
+
+ while (frame != 0)
+ {
+ // Write the received data to the file.
+ unsigned int result = ACE_OS::fwrite (frame->rd_ptr (),
+ frame->length (),
+ 1,
+ output_file);
+
+ if (result == frame->length ())
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "FTP_Server_Flow_Handler::fwrite failed\n"),
+ -1);
+
+ frame = frame->cont ();
+ }
+ return 0;
+}
+
+int
+Receiver_Callback::handle_destroy (void)
+{
+ // Called when the distributer requests the stream to be shutdown.
+ ACE_DEBUG ((LM_DEBUG,
+ "Receiver_Callback::end_stream\n"));
+ TAO_AV_CORE::instance ()->orb ()->shutdown ();
+ return 0;
+}
+
+
+int
+Receiver_Callback::handle_stop (void)
+{
+ // Called when the distributer requests the stream to be shutdown.
+ ACE_DEBUG ((LM_DEBUG,
+ "Receiver_Callback::end_stream\n"));
+ TAO_AV_CORE::instance ()->orb ()->shutdown ();
+ return 0;
+}
+
+Receiver::Receiver (void)
+ : mmdevice_ (0)
+{
+}
+
+Receiver::~Receiver (void)
+{
+ delete this->mmdevice_;
+}
+
+int
+Receiver::init (int,
+ char **,
+ CORBA::Environment &ACE_TRY_ENV)
+{
+ // Initialize the endpoint strategy with the orb and poa.
+ int result =
+ this->reactive_strategy_.init (TAO_AV_CORE::instance ()->orb (),
+ TAO_AV_CORE::instance ()->poa ());
+
+ if (result != 0)
+ return result;
+
+ // Register the receiver mmdevice object with the ORB
+ ACE_NEW_RETURN (this->mmdevice_,
+ TAO_MMDevice (&this->reactive_strategy_),
+ -1);
+
+ // Register the receiver mmdevice with the naming service.
+ CosNaming::Name receiver_mmdevice_name (1);
+ receiver_mmdevice_name.length (1);
+ receiver_mmdevice_name [0].id = CORBA::string_dup ("Receiver");
+
+ CORBA::Object_var mmdevice =
+ this->mmdevice_->_this (ACE_TRY_ENV);
+ ACE_CHECK_RETURN(-1);
+
+ // Initialize the naming services
+ if (this->my_naming_client_.init (TAO_AV_CORE::instance ()->orb ()) != 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Unable to initialize "
+ "the TAO_Naming_Client\n"),
+ -1);
+
+ // Register the server object with the naming server.
+ this->my_naming_client_->rebind (receiver_mmdevice_name,
+ mmdevice.in (),
+ ACE_TRY_ENV);
+ ACE_CHECK_RETURN (-1);
+
+ return 0;
+}
+
+int
+parse_args (int argc,
+ char **argv)
+{
+ // Parse the command line arguments
+ ACE_Get_Opt opts (argc,
+ argv,
+ "f:");
+
+ int c;
+ while ((c = opts ()) != -1)
+ {
+ switch (c)
+ {
+ case 'f':
+ output_file_name = opts.optarg;
+ break;
+ default:
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Usage: server -f filename"),
+ -1);
+ }
+ }
+
+ return 0;
+}
+
+int
+main (int argc,
+ char **argv)
+{
+ ACE_DECLARE_NEW_CORBA_ENV;
+ ACE_TRY
+ {
+ // Initialize the ORB first.
+ CORBA::ORB_var orb = CORBA::ORB_init (argc,
+ argv,
+ 0,
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ int result =
+ parse_args (argc,
+ argv);
+
+ if (result == -1)
+ return -1;
+
+ // Make sure we have a valid <output_file>
+ output_file = ACE_OS::fopen (output_file_name,
+ "w");
+ if (output_file == 0)
+ ACE_ERROR_RETURN ((LM_DEBUG,
+ "Cannot open output file %s\n",
+ output_file_name),
+ -1);
+
+ else ACE_DEBUG ((LM_DEBUG,
+ "File Opened Successfull\n"));
+
+ CORBA::Object_var obj
+ = orb->resolve_initial_references ("RootPOA",
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ // Get the POA_var object from Object_var.
+ PortableServer::POA_var root_poa =
+ PortableServer::POA::_narrow (obj.in (),
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ PortableServer::POAManager_var mgr
+ = root_poa->the_POAManager (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ mgr->activate (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ // Initialize the AVStreams components.
+ TAO_AV_CORE::instance ()->init (orb.in (),
+ root_poa.in (),
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ Receiver receiver;
+ result =
+ receiver.init (argc,
+ argv,
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ if (result != 0)
+ return result;
+
+ orb->run (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"receiver::init");
+ return -1;
+ }
+ ACE_ENDTRY;
+ ACE_CHECK_RETURN (-1);
+
+ ACE_OS::fclose (output_file);
+
+ return 0;
+}
+
+#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
+template class TAO_AV_Endpoint_Reactive_Strategy_B
+<Receiver_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl>;
+template class TAO_AV_Endpoint_Reactive_Strategy
+<Receiver_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl>;
+#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
+#pragma instantiate TAO_AV_Endpoint_Reactive_Strategy_B
+<Receiver_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl>
+#pragma instantiate TAO_AV_Endpoint_Reactive_Strategy
+<Receiver_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl>
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/receiver.h b/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/receiver.h
new file mode 100644
index 00000000000..ece363d2dd4
--- /dev/null
+++ b/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/receiver.h
@@ -0,0 +1,100 @@
+/* -*- C++ -*- */
+// $Id$
+// ============================================================================
+//
+// = LIBRARY
+// TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage
+//
+// = FILENAME
+// receiver.h
+//
+// = DESCRIPTION
+// Receiver to receive data from the distributer
+//
+// = AUTHOR
+// Yamuna Krishnamurthy <yamuna@cs.wustl.edu>
+//
+// ============================================================================
+
+#ifndef RECEIVER_H
+#define RECEIVER_H
+
+#include "orbsvcs/Naming/Naming_Utils.h"
+#include "orbsvcs/AV/AVStreams_i.h"
+#include "orbsvcs/AV/Endpoint_Strategy.h"
+#include "orbsvcs/AV/Policy.h"
+
+class Receiver_Callback : public TAO_AV_Callback
+{
+ // = TITLE
+ // Defines a class for the application callback.
+ //
+ // = DESCRIPTION
+ // This class overides the methods of the TAO_AV_Callback so the
+ // AVStreams can make upcalls to the application.
+
+public:
+
+ // Method that is called when there is data to be received from the distributer
+ int receive_frame (ACE_Message_Block *frame,
+ TAO_AV_frame_info *frame_info,
+ const ACE_Addr &peer_address);
+
+ // Called when the distributer has finished sending the data and wants
+ // to close down the connection.
+ int handle_destroy (void);
+
+ int handle_stop (void);
+};
+
+class Receiver_StreamEndPoint : public TAO_Server_StreamEndPoint
+{
+ // = TITLE
+ // Defines the aplication stream endpoint
+ //
+ // = DESCRIPTION
+ // This is the class that overrides the tao_server_enpodint to handle
+ // pre and post connect processing.
+public:
+ // Create the application callback.
+ int get_callback (const char *flowname,
+ TAO_AV_Callback *&callback);
+
+private:
+ Receiver_Callback callback_;
+ // reference to the server application callback.
+};
+
+class Receiver
+{
+ // = TITLE
+ // Defines the receiver application class.
+ //
+ // = DESCRIPTION
+ // The receiver progarm that receives data
+ // sent by the distributer.
+public:
+ Receiver (void);
+ // Constructor
+
+ ~Receiver (void);
+ // Destructor.
+
+ int init (int argc,
+ char **argv,
+ CORBA::Environment &);
+ // Initialize data components.
+
+protected:
+ TAO_Naming_Client my_naming_client_;
+ // The Naming Service Client.
+
+ TAO_AV_Endpoint_Reactive_Strategy_B
+ <Receiver_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl> reactive_strategy_;
+ // The endpoint reactive strategy.
+
+ TAO_MMDevice *mmdevice_;
+ // The receiver MMDevice.
+};
+
+#endif /*RECEIVER_H*/
diff --git a/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/run_test.pl b/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/run_test.pl
new file mode 100755
index 00000000000..63e8a672ec1
--- /dev/null
+++ b/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/run_test.pl
@@ -0,0 +1,83 @@
+eval '(exit $?0)' && eval 'exec perl -S $0 ${1+"$@"}'
+ & eval 'exec perl -S $0 $argv:q'
+ if 0;
+
+# $Id$
+# -*- perl -*-
+
+use lib '../../../../../bin';
+use PerlACE::Run_Test;
+
+# amount of delay between running the servers
+
+$sleeptime = 6;
+$status = 0;
+
+$nsior = PerlACE::LocalFile ("ns.ior");
+$testfile = PerlACE::LocalFile ("test");
+$makefile = PerlACE::LocalFile ("Makefile");
+
+unlink $nsior;
+
+$NS = new PerlACE::Process ("../../../Naming_Service/Naming_Service", "-o $nsior");
+$SV = new PerlACE::Process ("sender", "-ORBInitRef NameService=file://$nsior -f $makefile");
+$RE = new PerlACE::Process ("receiver", "-ORBInitRef NameService=file://$nsior -f $testfile");
+$DI = new PerlACE::Process ("distributer", "-ORBInitRef NameService=file://$nsior");
+
+print STDERR "Starting Naming Service\n";
+
+$NS->Spawn ();
+
+if (PerlACE::waitforfile_timed ($nsior, 5) == -1) {
+ print STDERR "ERROR: cannot find naming service IOR file\n";
+ $NS->Kill ();
+ exit 1;
+}
+
+print STDERR "Starting Sender\n";
+
+$SV->Spawn ();
+
+sleep $sleeptime;
+
+print STDERR "Starting Receiver\n";
+
+$RE->Spawn ();
+
+sleep $sleeptime;
+
+print STDERR "Starting Distributer\n";
+
+$distributer = $DI->SpawnWaitKill (60);
+
+if ($distributer != 0) {
+ print STDERR "ERROR: distributer returned $distributer\n";
+ $status = 1;
+}
+
+$sender = $SV->TerminateWaitKill (5);
+
+if ($sender != 0) {
+ print STDERR "ERROR: sender returned $sender\n";
+ $status = 1;
+}
+
+$receiver = $RE->TerminateWaitKill (5);
+
+if ($receiver != 0) {
+ print STDERR "ERROR: sender returned $sender\n";
+ $status = 1;
+}
+
+
+$nserver = $NS->TerminateWaitKill (5);
+
+if ($nserver != 0) {
+ print STDERR "ERROR: Naming Service returned $nserver\n";
+ $status = 1;
+}
+
+unlink $nsior;
+unlink $testfile;
+
+exit $status;
diff --git a/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/sender.cpp b/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/sender.cpp
new file mode 100644
index 00000000000..0636a01a6e5
--- /dev/null
+++ b/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/sender.cpp
@@ -0,0 +1,419 @@
+// $Id$
+
+#include "sender.h"
+#include "ace/Get_Opt.h"
+#include "ace/High_Res_Timer.h"
+
+ACE_High_Res_Timer last_frame_sent_time;
+// The time taken for sending a frmae and preparing for the next frame
+
+ACE_Time_Value inter_frame_time;
+// The time that should lapse between two consecutive frames sent.
+
+int
+Sender_Callback::handle_start (void)
+{
+ // Connection setup, start sending data.
+
+ ACE_DECLARE_NEW_CORBA_ENV;
+ SENDER::instance ()->pace_data (ACE_TRY_ENV);
+ return 0;
+}
+
+Sender_StreamEndPoint::Sender_StreamEndPoint (void)
+{
+}
+
+int
+Sender_StreamEndPoint::get_callback (const char *,
+ TAO_AV_Callback *&callback)
+{
+ // Create and return the client application callback and return to the AVStreams
+ // for further upcalls.
+ callback = &this->callback_;
+
+ // Store reference to the sender stream endpoint
+ SENDER::instance ()->set_endpoint (this);
+ return 0;
+}
+
+int
+Sender_StreamEndPoint::set_protocol_object (const char *,
+ TAO_AV_Protocol_Object *object)
+{
+ // Set the client protocol object corresponding to the transport protocol selected.
+ SENDER::instance ()->set_protocol_object (object);
+ return 0;
+}
+
+Sender::Sender (void)
+ :count_ (0),
+ fp_ (0),
+ frame_rate_ (30)
+{
+ this->mb.size (BUFSIZ);
+}
+
+void
+Sender::set_protocol_object (TAO_AV_Protocol_Object *object)
+{
+ // Set the client protocol object corresponding to the transport protocol selected.
+ this->protocol_object_ = object;
+}
+
+void
+Sender::set_endpoint (Sender_StreamEndPoint* endpoint)
+{
+ // Store the sender stream endpoint
+ this->endpoint_ = endpoint;
+}
+
+Sender_StreamEndPoint*
+Sender::get_endpoint (void)
+{
+ // Return the sender stream endpoint reference.
+ return this->endpoint_;
+}
+
+int
+Sender::parse_args (int argc,
+ char **argv)
+{
+ // Parse command line arguments
+ ACE_Get_Opt opts (argc,argv,"f:r:s");
+
+ int c;
+ while ((c= opts ()) != -1)
+ {
+ switch (c)
+ {
+ case 'f':
+ this->filename_ = opts.optarg;
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "Input File Name %s\n",
+ this->filename_.c_str ()));
+ break;
+ case 'r':
+ this->frame_rate_ = ACE_OS::atoi (opts.optarg);
+ break;
+ default:
+ ACE_DEBUG ((LM_DEBUG,"Unknown Option\n"));
+ return -1;
+ }
+ }
+ return 0;
+}
+
+FILE *
+Sender::file (void)
+{
+ return this->fp_;
+}
+
+TAO_StreamCtrl*
+Sender::streamctrl (void)
+{
+ return &this->streamctrl_;
+}
+
+
+int
+Sender::frame_rate (void)
+{
+ return this->frame_rate_;
+}
+
+
+// Method to bind the sender reference to the Naming Service.
+int
+Sender::register_sender (CORBA::Environment &ACE_TRY_ENV)
+{
+ // Initialize the naming services
+ if (my_naming_client_.init (TAO_AV_CORE::instance ()->orb ()) != 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ " (%P|%t) Unable to initialize "
+ "the TAO_Naming_Client. \n"),
+ -1);
+
+ // Register the sender mmdevice object with the ORB
+ ACE_NEW_RETURN (this->sender_mmdevice_,
+ TAO_MMDevice (&this->endpoint_strategy_),
+ -1);
+
+ CosNaming::Name sender_mmdevice_name (1);
+ sender_mmdevice_name.length (1);
+ sender_mmdevice_name [0].id = CORBA::string_dup ("Sender");
+
+ CORBA::Object_var mmdevice =
+ this->sender_mmdevice_->_this (ACE_TRY_ENV);
+ ACE_CHECK_RETURN(-1);
+
+ // Register the server object with the naming server.
+ this->my_naming_client_->rebind (sender_mmdevice_name,
+ mmdevice.in (),
+ ACE_TRY_ENV);
+ ACE_CHECK_RETURN (-1);
+
+ return 0;
+}
+
+int
+Sender::init (int argc,
+ char **argv,
+ CORBA::Environment& ACE_TRY_ENV)
+{
+ this->argc_ = argc;
+ this->argv_ = argv;
+
+ CORBA::String_var ior;
+
+ // Initialize the endpoint strategy with the orb and poa.
+ this->endpoint_strategy_.init(TAO_AV_CORE::instance ()->orb (),
+ TAO_AV_CORE::instance ()->poa ());
+
+ // Parse the command line arguments
+ int result = this->parse_args (argc,
+ argv);
+
+ if (result < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ " (%P|%t) Error in Parse Args \n"),
+ -1);
+
+
+ // Open file to read.
+ this->fp_ = ACE_OS::fopen (this->filename_.c_str (),
+ "r");
+ if (this->fp_ == 0)
+ ACE_ERROR_RETURN ((LM_DEBUG,
+ "Cannot open output file %s\n",
+ this->filename_.c_str ()),
+ -1);
+ else ACE_DEBUG ((LM_DEBUG,
+ "File opened successfully\n"));
+
+
+ // Register the object reference with the Naming Service.
+ if (this->register_sender (ACE_TRY_ENV) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%P|%t) Error binding to the naming service\n"),
+ -1);
+
+ return 0;
+}
+
+// Method to send data at the specified rate
+int
+Sender::pace_data (CORBA::Environment& ACE_TRY_ENV)
+{
+
+ // Time within which a frame should be sent.
+ double frame_time = 1/ (double) this->frame_rate_;
+
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "Frame Time ONE = %f\n Frame Rate = %d\n",
+ frame_time,
+ this->frame_rate_));
+
+ // The time between two consecutive frames.
+ inter_frame_time.set (frame_time);
+
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "Inter Frame Time = %d\n",
+ inter_frame_time.msec ()));
+
+ ACE_TRY
+ {
+
+ // Continue to send data till the file is read to the end.
+ while (1)
+ {
+
+ // Count the frames sent.
+ count_++;
+
+ // Reset the message block.
+ this->mb.reset ();
+
+ // Read from the file into a message block.
+ int n = ACE_OS::fread (this->mb.wr_ptr (),
+ 1,
+ this->mb.size (),
+ SENDER::instance ()->file ());
+
+ this->mb.wr_ptr (n);
+
+ if (n < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "FTP_Client_Flow_Handler::fread end of file\n"),
+ -1);
+
+ if (n == 0)
+ {
+ if (feof (SENDER::instance ()->file ()))
+ {
+ // At end of file break the loop and end the client.
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,"Handle_Start:End of file\n"));
+ break;
+ }
+
+ }
+
+ if (this->count_ > 1)
+ {
+ // Second frame and beyond
+
+ // Stop the timer that was started just before the previous frame was sent.
+ last_frame_sent_time.stop ();
+
+ // Get the time elapsed after sending the previous frame.
+ ACE_Time_Value tv;
+ last_frame_sent_time.elapsed_time (tv);
+
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "Elapsed Time = %d\n",
+ tv.msec ()));
+
+ // Check to see if the inter frame time has elapsed.
+ if (tv < inter_frame_time)
+ {
+ // Inter frame time has not elapsed.
+
+ // Claculate the time to wait before the next frame needs to be sent.
+ ACE_Time_Value wait_time (inter_frame_time - tv);
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "Wait Time = %d\n",
+ wait_time.msec ()));
+
+ // run the orb for the wait time so the client can continue other orb requests.
+ TAO_AV_CORE::instance ()->orb ()->run (wait_time,
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
+ }
+
+ // Start timer before sending the frame.
+ last_frame_sent_time.start ();
+
+ // Send frame.
+ int result = this->protocol_object_->send_frame (&this->mb);
+
+ if (result < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "send failed:%p","FTP_Client_Flow_Handler::send\n"),
+ -1);
+ ACE_DEBUG ((LM_DEBUG,"Client::pace_data buffer sent succesfully\n"));
+
+ } // end while
+
+
+ AVStreams::flowSpec stop_spec;
+
+ // Get the strem controoler for this stream.
+ CORBA::Any_ptr streamctrl_any = SENDER::instance ()->get_endpoint ()->get_property_value ("Related_StreamCtrl",
+ ACE_TRY_ENV);
+
+ AVStreams::StreamCtrl_ptr streamctrl;
+
+ *streamctrl_any >>= streamctrl;
+
+ // Destroy the stream
+ streamctrl->destroy (stop_spec,
+ ACE_TRY_ENV);
+
+ // Shut the orb down.
+ TAO_AV_CORE::instance ()->orb ()->shutdown (0);
+ ACE_TRY_CHECK;
+
+ }
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
+ "Client::pace_data Failed");
+ return -1;
+ }
+ ACE_ENDTRY;
+ return 0;
+}
+
+int
+main (int argc,
+ char **argv)
+{
+ ACE_DECLARE_NEW_CORBA_ENV;
+ ACE_TRY
+ {
+ CORBA::ORB_var orb = CORBA::ORB_init (argc,
+ argv,
+ 0,
+ ACE_TRY_ENV);
+ CORBA::Object_var obj
+ = orb->resolve_initial_references ("RootPOA",
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+
+ //Get the POA_var object from Object_var
+ PortableServer::POA_var root_poa
+ = PortableServer::POA::_narrow (obj.in (),
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ PortableServer::POAManager_var mgr
+ = root_poa->the_POAManager (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ mgr->activate (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ // Initialize the AV Stream components.
+ TAO_AV_CORE::instance ()->init (orb.in (),
+ root_poa.in (),
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ // Initialize the Client.
+ int result = 0;
+ result = SENDER::instance ()->init (argc,
+ argv,
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ if (result < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "client::init failed\n"), -1);
+
+ orb->run (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ }
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"Sender Failed\n");
+ return -1;
+ }
+ ACE_ENDTRY;
+ ACE_CHECK_RETURN (-1);
+ return 0;
+}
+
+#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
+template class ACE_Singleton <Sender,ACE_Null_Mutex>;
+template class
+TAO_AV_Endpoint_Reactive_Strategy_B<Sender_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl>;
+template class
+TAO_AV_Endpoint_Reactive_Strategy<Sender_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl>;
+#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
+#pragma instantiate ACE_Singleton <Client,ACE_Null_Mutex>
+#pragma instantiate
+TAO_AV_Endpoint_Reactive_Strategy_B<Sender_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl>
+#pragma instantiate
+TAO_AV_Endpoint_Reactive_Strategy<Sender_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl>
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
+
diff --git a/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/sender.h b/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/sender.h
new file mode 100644
index 00000000000..9d6d95a7603
--- /dev/null
+++ b/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/sender.h
@@ -0,0 +1,179 @@
+/* -*- C++ -*- */
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage
+//
+// = FILENAME
+// sender.h
+//
+// = DESCRIPTION
+// Sender that reads data from a file and sends it
+// at a requested data rate.
+//
+// = AUTHOR
+// Yamuna Krishnamurthy <yamuna@cs.wustl.edu>
+//
+// ============================================================================
+
+
+#ifndef TAO_AV_SENDER_H
+#define TAO_AV_SENDER_H
+
+#include "ace/Get_Opt.h"
+#include "orbsvcs/Naming/Naming_Utils.h"
+#include "orbsvcs/AV/AVStreams_i.h"
+#include "orbsvcs/AV/Endpoint_Strategy.h"
+#include "orbsvcs/AV/Policy.h"
+#include "orbsvcs/AV/Protocol_Factory.h"
+
+class Sender_Callback : public TAO_AV_Callback
+{
+ // = TITLE
+ // Defines a class for the sender application callback.
+ //
+ // = DESCRIPTION
+ // This class overides the methods of the TAO_AV_Callback so the
+ // AVStreams can make upcalls to the application.
+
+public:
+ int handle_start (void);
+ // Method called when the device is ready to start sending data
+};
+
+class Sender_StreamEndPoint : public TAO_Server_StreamEndPoint
+{
+ // = TITLE
+ // Defines the sender stream endpoint.
+ //
+ // = DESCRIPTION
+ // This class overrides the methods of TAO_ClientStreamendpoint
+ // so the application can perform its processing during post and pre
+ // connection set up.
+
+public:
+ Sender_StreamEndPoint (void);
+ //Contructor
+
+ virtual int get_callback (const char *flowname,
+ TAO_AV_Callback *&callback);
+ // Create the application client callback and return its handle to the
+ // AVStreams for further application callbacks
+
+ virtual int set_protocol_object (const char *flowname,
+ TAO_AV_Protocol_Object *object);
+ // Set protocol object corresponding to the transport protocol chosen.
+
+protected:
+ Sender_Callback callback_;
+ // Reference to the client application callback.
+};
+
+typedef TAO_AV_Endpoint_Reactive_Strategy_B <Sender_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl> ENDPOINT_STRATEGY;
+
+
+class Sender
+{
+ // = TITLE
+ // Defines the Client Application
+ //
+ // = DESCRIPTION
+ // The actual client program that streams data
+ // to the distributers that are waiting for data.
+public:
+ Sender (void);
+ // Constructor
+
+ int init (int argc,
+ char **argv,
+ CORBA::Environment&);
+ // Method to initialize the various data components.
+
+ void set_protocol_object (TAO_AV_Protocol_Object *protocol_object);
+ // Set the protocol object corresponding to the transport protocol chosen.
+
+ void set_endpoint (Sender_StreamEndPoint* endpoint);
+ Sender_StreamEndPoint* get_endpoint (void);
+ // Accessor methods for setting/getting the sender stream endpoint
+
+ int pace_data (CORBA::Environment&);
+ // Method to pace and send data from a file.
+
+ FILE *file (void);
+ // File handle from which data is read to be sent.
+
+ TAO_StreamCtrl* streamctrl (void);
+ // The stream control interface that manages the stream set up
+
+ int frame_rate (void);
+ // The requested frame rate for sending each frame of data read from the file.
+
+private:
+ int parse_args (int argc, char **argv);
+ // Method to parse the command line arguments.
+
+ int register_sender (CORBA::Environment& ACE_TRY_ENV);
+ // Method that binds the client to the Naming Service
+
+ ENDPOINT_STRATEGY endpoint_strategy_;
+ // The reactive strategy of the client.
+
+ AVStreams::MMDevice_var receiver_mmdevice_;
+ // The receiver MMDevice that the sender connects to
+
+ TAO_MMDevice* sender_mmdevice_;
+ // The sender MMDevice.
+
+ TAO_StreamCtrl streamctrl_;
+ // Video stream controller
+
+ Sender_StreamEndPoint* endpoint_;
+ // Reference to the sender streamendpoint
+
+ int count_;
+ // Number of frames sent.
+
+ int argc_;
+ char **argv_;
+
+ ACE_CString filename_;
+ // File from which data is read.
+
+ ACE_CString address_;
+ // Address of the sender host machine or a multicast address - Default is
+ // UDP multicast addess
+
+ TAO_Naming_Client my_naming_client_;
+ // The Naming Service client.
+
+ FILE *fp_;
+ // File handle of the file read from.
+
+ ACE_CString protocol_;
+ // Selected protocol - default is UDP
+
+ ACE_CString flowname_;
+ // Teh flowname_ of the stream set up between the sender and receiver.
+
+ int use_sfp_;
+ // If set to 1 then use sfp as the flow carrier protocol.
+
+ int frame_rate_;
+ // The sepcified data frame rate
+
+ ACE_Message_Block mb;
+ // Message block into which data is read from a file and then sent.
+
+ TAO_AV_Protocol_Object* protocol_object_;
+ // Protocol object corresponding to the transport protocol selected.
+
+};
+
+typedef ACE_Singleton<Sender,ACE_Null_Mutex> SENDER;
+// Create a singleton instance of the Sender.
+
+#endif /* TAO_AV_FTP_H */
+
+