summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoryamuna <yamuna@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2001-03-21 00:18:14 +0000
committeryamuna <yamuna@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2001-03-21 00:18:14 +0000
commitabfe587ae9f02e039ba1e1e5830690facaa12053 (patch)
tree6269e654492a93917fd2252f5b23c136ab836088
parente8a5df9320e95eb22f111f47d2cda1a96815060f (diff)
downloadATCD-abfe587ae9f02e039ba1e1e5830690facaa12053.tar.gz
*** empty log message ***
-rw-r--r--TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/README6
-rw-r--r--TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/distributer.cpp223
-rw-r--r--TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/distributer.h57
-rw-r--r--TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/receiver.cpp82
-rw-r--r--TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/receiver.h1
-rw-r--r--TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/sender.cpp154
-rw-r--r--TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/sender.h19
7 files changed, 278 insertions, 264 deletions
diff --git a/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/README b/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/README
index c11faefa8e8..9280d519f5a 100644
--- a/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/README
+++ b/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/README
@@ -51,6 +51,10 @@ receiver [-f <filename>]
distributer:
-----------
-distributer
+distributer [-s <sender_port_number>] [-r <receiver_port_number]
+
+-s sender port number --> The port at which the sender stream endpoint is opened
+
+--r receiver port number --> The port at which the receiver stream endpoint is opened
diff --git a/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/distributer.cpp b/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/distributer.cpp
index 6ebe4ed39ce..d4a1db4000d 100644
--- a/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/distributer.cpp
+++ b/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/distributer.cpp
@@ -10,22 +10,25 @@ int
Distributer_StreamEndPoint::set_protocol_object (const char *flow_name,
TAO_AV_Protocol_Object *object)
{
- // Set the protocol object corresponding to the transport protocol selected.
- DISTRIBUTER::instance ()->set_protocol_object (flow_name, object);
-
+ if (ACE_OS::strcmp (flow_name, "Data_Sender") == 0)
+ {
+ // Set the protocol object corresponding to the transport protocol selected.
+ DISTRIBUTER::instance ()->set_sender_protocol_object (object);
+ }
+
// Store the flowname of the stream that this callback belongs to.
- ACE_CString fname = flow_name;
- this->callback_.flowname (fname);
+ this->callback_.flowname (flow_name);
// Increment the stream count.
DISTRIBUTER::instance ()->stream_created ();
+
return 0;
}
int
Distributer_StreamEndPoint::get_callback (const char *,
- TAO_AV_Callback *&callback)
+ TAO_AV_Callback *&callback)
{
// Create and return the application callback and return to the AVStreams
// for further upcalls.
@@ -34,7 +37,7 @@ Distributer_StreamEndPoint::get_callback (const char *,
}
void
-Distributer_Callback::flowname (ACE_CString flowname)
+Distributer_Callback::flowname (const ACE_CString &flowname)
{
this->flowname_ = flowname;
}
@@ -56,11 +59,15 @@ Distributer_Callback::receive_frame (ACE_Message_Block *frame,
ACE_DEBUG ((LM_DEBUG,
"Distributer_Callback::receive_frame\n"));
- ACE_CString flowname = "Data_Receiver";
-
// Get the protocol object corresponding to the receiver stream
// send the data received from sender to the receiver.
- DISTRIBUTER::instance ()->get_protocol_object (flowname.c_str())->send_frame (frame);
+
+ int result = DISTRIBUTER::instance ()->get_sender_protocol_object ()->send_frame (frame);
+
+ if (result < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Distributer_Callback::receive_frame send failed\n"),
+ -1);
return 0;
}
@@ -69,81 +76,115 @@ int
Distributer_Callback::handle_destroy (void)
{
// Called when the sender requests the stream to be shutdown.
- ACE_DECLARE_NEW_CORBA_ENV;
-
- ACE_DEBUG ((LM_DEBUG,
- "Distributer_Callback::end_stream\n"));
-
- if (ACE_OS::strcmp (this->flowname_.c_str (), "Data_Sender") == 0)
+ ACE_TRY_NEW_ENV
{
- // Destroy the sender receiver stream as the sender has requested a stream destroy
- AVStreams::flowSpec stop_spec;
- DISTRIBUTER::instance ()->get_receiver_streamctrl ()->destroy (stop_spec,
- ACE_TRY_ENV);
+
+ ACE_DEBUG ((LM_DEBUG,
+ "Distributer_Callback::end_stream\n"));
+
+ if (ACE_OS::strcmp (this->flowname_.c_str (), "Data_Sender") == 0)
+ {
+ // Destroy the sender receiver stream as the sender has requested a stream destroy
+ AVStreams::flowSpec stop_spec;
+ DISTRIBUTER::instance ()->get_receiver_streamctrl ()->destroy (stop_spec,
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
+
+ // Decrement the stream count.
+ DISTRIBUTER::instance ()->stream_destroyed ();
}
-
- // Decrement the stream count.
- DISTRIBUTER::instance ()->stream_destroyed ();
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
+ "Distributer_Callback::handle_destroy Failed\n");
+ return -1;
+ }
+ ACE_ENDTRY;
+
return 0;
}
int
-Distributer::set_protocol_object (const char* flowname, TAO_AV_Protocol_Object* object)
+Distributer::set_sender_protocol_object (TAO_AV_Protocol_Object* object)
{
// Set the corresponding protocol objects for the different streams created.
- if (ACE_OS::strcmp ("Data_Sender", flowname) == 0)
- this->protocol_object_ [0] = object;
- else if (ACE_OS::strcmp ("Data_Receiver", flowname) == 0)
- this->protocol_object_ [1] = object;
+ this->sender_protocol_object_ = object;
return 0;
}
TAO_AV_Protocol_Object*
-Distributer::get_protocol_object (const char* flowname)
+Distributer::get_sender_protocol_object (void)
{
- if (ACE_OS::strcmp ("Data_Sender", flowname) == 0)
- return this->protocol_object_ [0];
- else if (ACE_OS::strcmp ("Data_Receiver", flowname) == 0)
- return this->protocol_object_ [1];
-
- return *(this->protocol_object_);
+ return this->sender_protocol_object_;
}
Distributer::Distributer (void)
:distributer_mmdevice_ (0),
+ sender_protocol_object_ (0),
count_ (0),
protocol_ ("UDP"),
- sender_streamctrl_ (0),
receiver_streamctrl_ (0),
stream_count_ (0),
- done_ (0)
+ done_ (0),
+ host_sender_port_ ("8000"),
+ host_receiver_port_ ("8010")
{
-
+
// Get the local host name
char buf [BUFSIZ];
ACE_OS::hostname (buf,
BUFSIZ);
// Set the address to the local host and port.
- this->address_ = buf;
- this->address_ += ":8000";
+ this->sender_address_ = buf;
+ this->sender_address_ += ":";
+ this->sender_address_ += host_sender_port_.c_str ();
+
+ // Set the address to the local host and port.
+ this->receiver_address_ = buf;
+ this->receiver_address_ += ":";
+ this->receiver_address_ += host_receiver_port_.c_str ();
- protocol_object_ [0] = 0;
- protocol_object_ [1] = 0;
- this->mb.size (BUFSIZ);
}
Distributer::~Distributer (void)
{
}
+int
+Distributer::parse_args (int argc,
+ char **argv)
+{
+ // Parse command line arguments
+ ACE_Get_Opt opts (argc,argv,"r:s:");
+
+ int c;
+ while ((c= opts ()) != -1)
+ {
+ switch (c)
+ {
+ case 'r':
+ this->host_receiver_port_ = opts.optarg;
+ break;
+ case 's':
+ this->host_sender_port_ = opts.optarg;
+ break;
+ default:
+ ACE_DEBUG ((LM_DEBUG,"Unknown Option\n"));
+ return -1;
+ }
+ }
+ return 0;
+}
+
// Method to bind the sender reference to the Naming Service.
int
Distributer::bind_to_mmdevice (AVStreams::MMDevice_ptr &mmdevice,
- ACE_CString mmdevice_name,
+ const ACE_CString &mmdevice_name,
CORBA::Environment &ACE_TRY_ENV)
{
// Initialize the naming services
@@ -177,10 +218,11 @@ Distributer::bind_to_mmdevice (AVStreams::MMDevice_ptr &mmdevice,
int
-Distributer::init (int,
- char **,
- CORBA::Environment &ACE_TRY_ENV)
+Distributer::init (int argc,
+ char **argv,
+ CORBA::Environment &ACE_TRY_ENV)
{
+
// Initialize the endpoint strategy with the orb and poa.
int result =
this->a_endpoint_strategy_.init (TAO_AV_CORE::instance ()->orb (),
@@ -188,6 +230,15 @@ Distributer::init (int,
if (result != 0)
return result;
+ result = this->parse_args (argc,
+ argv);
+
+ if (result < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ " (%P|%t) Error in Parse Args \n"),
+ -1);
+
+
// Bind to the receiver mmdevice
ACE_CString mmdevice_name ("Receiver");
result = bind_to_mmdevice (this->receiver_mmdevice_.inout (),
@@ -208,24 +259,17 @@ Distributer::init (int,
if (result != 0)
return result;
- // Create the Flow protocol name
- ACE_CString flow_protocol_str;
-
- flow_protocol_str = "";
-
// Initialize the QoS
AVStreams::streamQoS_var the_qos (new AVStreams::streamQoS);
// Set the address of the of the distributer receiver endpoint.
- ACE_INET_Addr receiver_addr (this->address_.c_str ());
+ ACE_INET_Addr receiver_addr (this->receiver_address_.c_str ());
- this->receiver_flowname_ = "Data_Receiver";
-
// Create the forward flow specification to describe the flow.
- TAO_Forward_FlowSpec_Entry receiver_entry (this->receiver_flowname_.c_str (),
+ TAO_Forward_FlowSpec_Entry receiver_entry ("Data_Receiver",
"IN",
"USER_DEFINED",
- flow_protocol_str.c_str (),
+ "", // Flowname
this->protocol_.c_str (),
&receiver_addr);
@@ -263,54 +307,51 @@ Distributer::init (int,
ACE_TRY_ENV);
ACE_CHECK_RETURN (-1);
- // Flowname for the stream bewteen the sender and distributer
- this->sender_flowname_ = "Data_Sender";
-
// Get the local host name
char buf [BUFSIZ];
ACE_OS::hostname (buf,
BUFSIZ);
- // Set the address to the local host and port
- this->address_ = buf;
- this->address_ += ":8001";
-
// Set the address of the sender.
- ACE_INET_Addr sender_addr (this->address_.c_str ());
+ ACE_INET_Addr sender_addr (this->sender_address_.c_str ());
// Create the forward flow specification to describe the flow.
- TAO_Forward_FlowSpec_Entry sender_entry (this->sender_flowname_.c_str (),
+ TAO_Forward_FlowSpec_Entry sender_entry ("Data_Sender",
"OUT",
"USER_DEFINED",
- flow_protocol_str.c_str (),
+ "", // Flowname
this->protocol_.c_str (),
&sender_addr);
- // Set the flow specification for the stream between sender and distributer
- flow_spec [0] = CORBA::string_dup (sender_entry.entry_to_string ());
-
- ACE_NEW_RETURN (this->sender_streamctrl_,
+ TAO_StreamCtrl* sender_streamctrl_;
+ // Video stream controller for the stream between sender and distributer
+
+ ACE_NEW_RETURN (sender_streamctrl_,
TAO_StreamCtrl,
-1);
// Servant Reference Counting to manage lifetime
PortableServer::ServantBase_var safe_sender_streamctrl =
- this->sender_streamctrl_;
+ sender_streamctrl_;
+
+ // Set the flow specification for the stream between sender and distributer
+ flow_spec [0] = CORBA::string_dup (sender_entry.entry_to_string ());
// Bind/Connect the sender and sitributer MMDevices.
CORBA::Boolean res =
- this->sender_streamctrl_->bind_devs (distributer_mmdevice.in (),
- this->sender_mmdevice_.in (),
- the_qos.inout (),
- flow_spec,
- ACE_TRY_ENV);
+ sender_streamctrl_->bind_devs (distributer_mmdevice.in (),
+ sender_mmdevice_.in (),
+ the_qos.inout (),
+ flow_spec,
+ ACE_TRY_ENV);
ACE_CHECK_RETURN (-1);
if (res == 0)
ACE_ERROR_RETURN ((LM_ERROR,"Streamctrl::bind_devs failed\n"),-1);
AVStreams::flowSpec start_spec;
- this->sender_streamctrl_->start (start_spec,ACE_TRY_ENV);
+ sender_streamctrl_->start (start_spec,
+ ACE_TRY_ENV);
ACE_CHECK_RETURN (-1);
return 0;
@@ -322,12 +363,6 @@ Distributer::get_receiver_streamctrl (void)
return this->receiver_streamctrl_;
}
-TAO_StreamCtrl*
-Distributer::get_sender_streamctrl (void)
-{
- return this->sender_streamctrl_;
-}
-
void
Distributer::stream_created (void)
@@ -369,7 +404,7 @@ main (int argc,
CORBA::Object_var obj
= orb->resolve_initial_references ("RootPOA",
ACE_TRY_ENV);
- ACE_TRY_CHECK;
+ ACE_TRY_CHECK;
// Get the POA_var object from Object_var.
PortableServer::POA_var root_poa =
@@ -392,15 +427,15 @@ main (int argc,
// Initialize the Distributer
int result = DISTRIBUTER::instance ()->init (argc,
- argv,
- ACE_TRY_ENV);
+ argv,
+ ACE_TRY_ENV);
ACE_TRY_CHECK;
if (result != 0)
return result;
// run the orb till the streams are not destroyed.
- while (!DISTRIBUTER::instance ()->done () && orb->work_pending ())
+ while (!DISTRIBUTER::instance ()->done ())
{
orb->perform_work (ACE_TRY_ENV);
ACE_TRY_CHECK;
@@ -420,14 +455,10 @@ main (int argc,
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
template class ACE_Singleton <Distributer,ACE_Null_Mutex>;
-template class TAO_AV_Endpoint_Reactive_Strategy_A
-<Distributer_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl>;
-template class TAO_AV_Endpoint_Reactive_Strategy
-<Distributer_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl>;
+template class TAO_AV_Endpoint_Reactive_Strategy_A<Distributer_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl>;
+template class TAO_AV_Endpoint_Reactive_Strategy<Distributer_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl>;
#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
#pragma instantiate ACE_Singleton <Client,ACE_Null_Mutex>
-#pragma instantiate TAO_AV_Endpoint_Reactive_Strategy_A
-<Distributer_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl>
-#pragma instantiate TAO_AV_Endpoint_Reactive_Strategy
-<Distributer_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl>
+#pragma instantiate TAO_AV_Endpoint_Reactive_Strategy_A<Distributer_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl>
+#pragma instantiate TAO_AV_Endpoint_Reactive_Strategy<Distributer_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl>
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/distributer.h b/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/distributer.h
index 03d2c5a4df8..9e77f1fcab8 100644
--- a/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/distributer.h
+++ b/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/distributer.h
@@ -24,12 +24,6 @@
#include "orbsvcs/AV/Endpoint_Strategy.h"
#include "orbsvcs/AV/Policy.h"
-#define AV_MAX_STREAM_COUNT 2
-// The distributer currently handles only two connections, one from
-// the sender and one from the receiver.
-
-class Distributer;
-
class Distributer_Callback : public TAO_AV_Callback
{
// = TITLE
@@ -51,7 +45,7 @@ public:
// Called when the sender has finished reading the file and wants
// to close down the connection.
- void flowname (ACE_CString);
+ void flowname (const ACE_CString &flowname);
ACE_CString flowname (void);
// Accessor methods to set and get the flowname corresponding
// to the callback
@@ -108,27 +102,28 @@ public:
CORBA::Environment &);
// Initialize data components.
- int set_protocol_object (const char*,
- TAO_AV_Protocol_Object *object);
- TAO_AV_Protocol_Object* get_protocol_object (const char*);
+ int set_sender_protocol_object (TAO_AV_Protocol_Object *object);
+ TAO_AV_Protocol_Object* get_sender_protocol_object (void);
// Accessor methods to set/get the protocol object of the receiver/sender
// process
int bind_to_mmdevice (AVStreams::MMDevice_ptr &mmdevice,
- ACE_CString mmdevice_name,
+ const ACE_CString &mmdevice_name,
CORBA::Environment &ACE_TRY_ENV);
// Resolve the reference of the mmdevice from the naming service.
TAO_StreamCtrl* get_receiver_streamctrl (void);
-
- TAO_StreamCtrl* get_sender_streamctrl (void);
+ // Get the stream control of the receiver
void stream_created (void);
// Called when stream created
void stream_destroyed (void);
// Called when stream destroyed
-
+
+ int parse_args (int argc,
+ char **argv);
+
int done (void);
// Return the flag that suggests orb shutdown.
@@ -149,44 +144,36 @@ protected:
AVStreams::MMDevice_var sender_mmdevice_;
// The sender MMDevice.
- TAO_AV_Protocol_Object *protocol_object_ [2];
-
+ TAO_AV_Protocol_Object *sender_protocol_object_;
+ // The sender protocol object
+
int count_;
// Number of frames sent.
- int argc_;
- char **argv_;
+ ACE_CString sender_address_;
+ // Address of the distributer host machine or a multicast address
- ACE_CString address_;
- // Address of the distributer host machine or a multicast address - Default is
- // UDP multicast addess
+ ACE_CString receiver_address_;
+ // Address of the distributer host machine or a multicast address
ACE_CString protocol_;
// Selected protocol - default is UDP
- ACE_CString sender_flowname_;
- // The flow name of the stream set up between the
- // sender and the distributer.
-
- ACE_CString receiver_flowname_;
- // The flow name of the stream set up between the
- // receiver and the distributer.
-
- TAO_StreamCtrl* sender_streamctrl_;
- // Video stream controller for the stream between sender and distributer
-
TAO_StreamCtrl* receiver_streamctrl_;
// Video stream controller for the stream between receivers and distributer
- ACE_Message_Block mb;
- // Message block into which data is read from a file and then sent.
-
int stream_count_;
// Number of active streams. When a stream is disconnected this
// count is decremented.
int done_;
// Flag to indicate orb shutdown
+
+ ACE_CString host_sender_port_;
+ //Distributer sender streamendpoint port
+
+ ACE_CString host_receiver_port_;
+ //Distributer receiver streamendpoint port
};
typedef ACE_Singleton<Distributer, ACE_Null_Mutex> DISTRIBUTER;
diff --git a/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/receiver.cpp b/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/receiver.cpp
index 5e180b0c668..513f6d9a542 100644
--- a/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/receiver.cpp
+++ b/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/receiver.cpp
@@ -20,8 +20,8 @@ Receiver_StreamEndPoint::get_callback (const char *,
int
Receiver_Callback::receive_frame (ACE_Message_Block *frame,
- TAO_AV_frame_info *,
- const ACE_Addr &)
+ TAO_AV_frame_info *,
+ const ACE_Addr &)
{
// Upcall from the AVStreams when there is data to be received from the
// distributer.
@@ -53,18 +53,22 @@ Receiver_Callback::handle_destroy (void)
// Called when the distributer requests the stream to be shutdown.
ACE_DEBUG ((LM_DEBUG,
"Receiver_Callback::end_stream\n"));
- TAO_AV_CORE::instance ()->orb ()->shutdown ();
- return 0;
-}
-
+
+ ACE_TRY_NEW_ENV
+ {
+ TAO_AV_CORE::instance ()->orb ()->shutdown (0,
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
+ "Receiver_Callback::handle_destroy Failed\n");
+ return -1;
+
+ }
+ ACE_ENDTRY;
-int
-Receiver_Callback::handle_stop (void)
-{
- // Called when the distributer requests the stream to be shutdown.
- ACE_DEBUG ((LM_DEBUG,
- "Receiver_Callback::end_stream\n"));
- TAO_AV_CORE::instance ()->orb ()->shutdown ();
return 0;
}
@@ -79,8 +83,8 @@ Receiver::~Receiver (void)
int
Receiver::init (int,
- char **,
- CORBA::Environment &ACE_TRY_ENV)
+ char **,
+ CORBA::Environment &ACE_TRY_ENV)
{
// Initialize the endpoint strategy with the orb and poa.
int result =
@@ -94,20 +98,20 @@ Receiver::init (int,
ACE_NEW_RETURN (this->mmdevice_,
TAO_MMDevice (&this->reactive_strategy_),
-1);
-
+
// Servant Reference Counting to manage lifetime
PortableServer::ServantBase_var safe_mmdevice =
this->mmdevice_;
+ CORBA::Object_var mmdevice =
+ this->mmdevice_->_this (ACE_TRY_ENV);
+ ACE_CHECK_RETURN(-1);
+
// Register the receiver mmdevice with the naming service.
CosNaming::Name receiver_mmdevice_name (1);
receiver_mmdevice_name.length (1);
receiver_mmdevice_name [0].id = CORBA::string_dup ("Receiver");
- CORBA::Object_var mmdevice =
- this->mmdevice_->_this (ACE_TRY_ENV);
- ACE_CHECK_RETURN(-1);
-
// Initialize the naming services
if (this->my_naming_client_.init (TAO_AV_CORE::instance ()->orb ()) != 0)
ACE_ERROR_RETURN ((LM_ERROR,
@@ -165,25 +169,6 @@ main (int argc,
ACE_TRY_ENV);
ACE_TRY_CHECK;
- int result =
- parse_args (argc,
- argv);
-
- if (result == -1)
- return -1;
-
- // Make sure we have a valid <output_file>
- output_file = ACE_OS::fopen (output_file_name,
- "w");
- if (output_file == 0)
- ACE_ERROR_RETURN ((LM_DEBUG,
- "Cannot open output file %s\n",
- output_file_name),
- -1);
-
- else ACE_DEBUG ((LM_DEBUG,
- "File Opened Successfull\n"));
-
CORBA::Object_var obj
= orb->resolve_initial_references ("RootPOA",
ACE_TRY_ENV);
@@ -208,6 +193,25 @@ main (int argc,
ACE_TRY_ENV);
ACE_TRY_CHECK;
+ int result =
+ parse_args (argc,
+ argv);
+
+ if (result == -1)
+ return -1;
+
+ // Make sure we have a valid <output_file>
+ output_file = ACE_OS::fopen (output_file_name,
+ "w");
+ if (output_file == 0)
+ ACE_ERROR_RETURN ((LM_DEBUG,
+ "Cannot open output file %s\n",
+ output_file_name),
+ -1);
+
+ else ACE_DEBUG ((LM_DEBUG,
+ "File Opened Successfull\n"));
+
Receiver receiver;
result =
receiver.init (argc,
diff --git a/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/receiver.h b/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/receiver.h
index ece363d2dd4..dc0eeb47472 100644
--- a/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/receiver.h
+++ b/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/receiver.h
@@ -44,7 +44,6 @@ public:
// to close down the connection.
int handle_destroy (void);
- int handle_stop (void);
};
class Receiver_StreamEndPoint : public TAO_Server_StreamEndPoint
diff --git a/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/sender.cpp b/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/sender.cpp
index da4516db7a6..e40154e7ccd 100644
--- a/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/sender.cpp
+++ b/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/sender.cpp
@@ -4,20 +4,23 @@
#include "ace/Get_Opt.h"
#include "ace/High_Res_Timer.h"
-ACE_High_Res_Timer last_frame_sent_time;
-// The time taken for sending a frmae and preparing for the next frame
-
-ACE_Time_Value inter_frame_time;
-// The time that should lapse between two consecutive frames sent.
-
int
Sender_Callback::handle_start (void)
{
// Connection setup, start sending data.
-
- ACE_DECLARE_NEW_CORBA_ENV;
- SENDER::instance ()->pace_data (ACE_TRY_ENV);
- ACE_CHECK_RETURN (-1);
+ ACE_TRY_NEW_ENV
+ {
+ SENDER::instance ()->pace_data (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
+ "Sender_Callback::handle_start pace data failed");
+
+ return -1;
+ }
+ ACE_ENDTRY;
return 0;
}
@@ -36,6 +39,7 @@ Sender_StreamEndPoint::get_callback (const char *,
// Store reference to the sender stream endpoint
SENDER::instance ()->set_endpoint (this);
+
return 0;
}
@@ -49,15 +53,15 @@ Sender_StreamEndPoint::set_protocol_object (const char *,
}
Sender::Sender (void)
- :sender_mmdevice_ (0),
- endpoint_ (0),
- count_ (0),
- filename_ ("Makefile"),
- fp_ (0),
- frame_rate_ (30),
- protocol_object_ (0)
+ : sender_mmdevice_ (0),
+ endpoint_ (0),
+ count_ (0),
+ filename_ ("Makefile"),
+ fp_ (0),
+ frame_rate_ (30),
+ protocol_object_ (0)
{
- this->mb.size (BUFSIZ);
+ this->mb_.size (BUFSIZ);
}
void
@@ -117,13 +121,6 @@ Sender::file (void)
return this->fp_;
}
-int
-Sender::frame_rate (void)
-{
- return this->frame_rate_;
-}
-
-
// Method to bind the sender reference to the Naming Service.
int
Sender::register_sender (CORBA::Environment &ACE_TRY_ENV)
@@ -140,11 +137,6 @@ Sender::register_sender (CORBA::Environment &ACE_TRY_ENV)
TAO_MMDevice (&this->endpoint_strategy_),
-1);
-
- CosNaming::Name sender_mmdevice_name (1);
- sender_mmdevice_name.length (1);
- sender_mmdevice_name [0].id = CORBA::string_dup ("Sender");
-
// Servant Reference Counting to manage lifetime
PortableServer::ServantBase_var safe_sender_mmdevice =
this->sender_mmdevice_;
@@ -153,6 +145,10 @@ Sender::register_sender (CORBA::Environment &ACE_TRY_ENV)
this->sender_mmdevice_->_this (ACE_TRY_ENV);
ACE_CHECK_RETURN(-1);
+ CosNaming::Name sender_mmdevice_name (1);
+ sender_mmdevice_name.length (1);
+ sender_mmdevice_name [0].id = CORBA::string_dup ("Sender");
+
// Register the server object with the naming server.
this->my_naming_client_->rebind (sender_mmdevice_name,
mmdevice.in (),
@@ -165,26 +161,21 @@ Sender::register_sender (CORBA::Environment &ACE_TRY_ENV)
int
Sender::init (int argc,
char **argv,
- CORBA::Environment& ACE_TRY_ENV)
+ CORBA::Environment &ACE_TRY_ENV)
{
- this->argc_ = argc;
- this->argv_ = argv;
-
- CORBA::String_var ior;
-
// Initialize the endpoint strategy with the orb and poa.
- this->endpoint_strategy_.init(TAO_AV_CORE::instance ()->orb (),
- TAO_AV_CORE::instance ()->poa ());
-
+ int result =
+ this->endpoint_strategy_.init (TAO_AV_CORE::instance ()->orb (),
+ TAO_AV_CORE::instance ()->poa ());
+
// Parse the command line arguments
- int result = this->parse_args (argc,
+ result = this->parse_args (argc,
argv);
if (result < 0)
ACE_ERROR_RETURN ((LM_ERROR,
" (%P|%t) Error in Parse Args \n"),
-1);
-
// Open file to read.
this->fp_ = ACE_OS::fopen (this->filename_.c_str (),
@@ -194,8 +185,9 @@ Sender::init (int argc,
"Cannot open output file %s\n",
this->filename_.c_str ()),
-1);
- else ACE_DEBUG ((LM_DEBUG,
- "File opened successfully\n"));
+ else
+ ACE_DEBUG ((LM_DEBUG,
+ "File opened successfully\n"));
// Register the object reference with the Naming Service.
result = this->register_sender (ACE_TRY_ENV);
@@ -211,7 +203,7 @@ Sender::init (int argc,
// Method to send data at the specified rate
int
-Sender::pace_data (CORBA::Environment& ACE_TRY_ENV)
+Sender::pace_data (CORBA::Environment &ACE_TRY_ENV)
{
// Time within which a frame should be sent.
@@ -222,7 +214,11 @@ Sender::pace_data (CORBA::Environment& ACE_TRY_ENV)
"Frame Time ONE = %f\n Frame Rate = %d\n",
frame_time,
this->frame_rate_));
-
+
+ // The time that should lapse between two consecutive frames sent.
+ ACE_Time_Value inter_frame_time;
+
+
// The time between two consecutive frames.
inter_frame_time.set (frame_time);
@@ -233,65 +229,63 @@ Sender::pace_data (CORBA::Environment& ACE_TRY_ENV)
ACE_TRY
{
-
+
+ // The time taken for sending a frame and preparing for the next frame
+ ACE_High_Res_Timer elapsed_timer;
+
// Continue to send data till the file is read to the end.
while (1)
{
-
// Count the frames sent.
count_++;
// Reset the message block.
- this->mb.reset ();
+ this->mb_.reset ();
// Read from the file into a message block.
- int n = ACE_OS::fread (this->mb.wr_ptr (),
+ int n = ACE_OS::fread (this->mb_.wr_ptr (),
1,
- this->mb.size (),
+ this->mb_.size (),
SENDER::instance ()->file ());
- this->mb.wr_ptr (n);
-
if (n < 0)
ACE_ERROR_RETURN ((LM_ERROR,
- "FTP_Client_Flow_Handler::fread end of file\n"),
+ "Sender::pace_data fread failed\n"),
-1);
if (n == 0)
{
- if (feof (SENDER::instance ()->file ()))
- {
- // At end of file break the loop and end the client.
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,"Handle_Start:End of file\n"));
- break;
- }
-
- }
+ // At end of file break the loop and end the client.
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,"Handle_Start:End of file\n"));
+ break;
+ }
+
+ this->mb_.wr_ptr (n);
if (this->count_ > 1)
{
// Second frame and beyond
// Stop the timer that was started just before the previous frame was sent.
- last_frame_sent_time.stop ();
+ elapsed_timer.stop ();
// Get the time elapsed after sending the previous frame.
- ACE_Time_Value tv;
- last_frame_sent_time.elapsed_time (tv);
+ ACE_Time_Value elapsed_time;
+ elapsed_timer.elapsed_time (elapsed_time);
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG,
"Elapsed Time = %d\n",
- tv.msec ()));
+ elapsed_time.msec ()));
// Check to see if the inter frame time has elapsed.
- if (tv < inter_frame_time)
+ if (elapsed_time < inter_frame_time)
{
// Inter frame time has not elapsed.
- // Claculate the time to wait before the next frame needs to be sent.
- ACE_Time_Value wait_time (inter_frame_time - tv);
+ // Calculate the time to wait before the next frame needs to be sent.
+ ACE_Time_Value wait_time (inter_frame_time - elapsed_time);
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG,
"Wait Time = %d\n",
@@ -305,16 +299,16 @@ Sender::pace_data (CORBA::Environment& ACE_TRY_ENV)
}
// Start timer before sending the frame.
- last_frame_sent_time.start ();
+ elapsed_timer.start ();
// Send frame.
- int result = this->protocol_object_->send_frame (&this->mb);
+ int result = this->protocol_object_->send_frame (&this->mb_);
if (result < 0)
ACE_ERROR_RETURN ((LM_ERROR,
- "send failed:%p","FTP_Client_Flow_Handler::send\n"),
+ "send failed:%p","Sender::pace_data send\n"),
-1);
- ACE_DEBUG ((LM_DEBUG,"Client::pace_data buffer sent succesfully\n"));
+ ACE_DEBUG ((LM_DEBUG,"Sender::pace_data buffer sent succesfully\n"));
} // end while
@@ -322,8 +316,10 @@ Sender::pace_data (CORBA::Environment& ACE_TRY_ENV)
AVStreams::flowSpec stop_spec;
// Get the strem controoler for this stream.
- CORBA::Any_ptr streamctrl_any = SENDER::instance ()->get_endpoint ()->get_property_value ("Related_StreamCtrl",
- ACE_TRY_ENV);
+ CORBA::Any_ptr streamctrl_any =
+ SENDER::instance ()->get_endpoint ()->get_property_value ("Related_StreamCtrl",
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
AVStreams::StreamCtrl_ptr streamctrl;
@@ -332,16 +328,18 @@ Sender::pace_data (CORBA::Environment& ACE_TRY_ENV)
// Destroy the stream
streamctrl->destroy (stop_spec,
ACE_TRY_ENV);
+ ACE_TRY_CHECK;
// Shut the orb down.
- TAO_AV_CORE::instance ()->orb ()->shutdown (0);
+ TAO_AV_CORE::instance ()->orb ()->shutdown (0,
+ ACE_TRY_ENV);
ACE_TRY_CHECK;
}
ACE_CATCHANY
{
ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
- "Client::pace_data Failed");
+ "Client::pace_data Failed\n");
return -1;
}
ACE_ENDTRY;
@@ -393,7 +391,7 @@ main (int argc,
if (result < 0)
ACE_ERROR_RETURN ((LM_ERROR,
- "client::init failed\n"), -1);
+ "Sender::init failed\n"), -1);
orb->run (ACE_TRY_ENV);
ACE_TRY_CHECK;
diff --git a/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/sender.h b/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/sender.h
index 1b4a7ef940a..31e68c686b4 100644
--- a/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/sender.h
+++ b/TAO/orbsvcs/tests/AVStreams/Simple_Three_Stage/sender.h
@@ -104,12 +104,6 @@ public:
FILE *file (void);
// File handle from which data is read to be sent.
- TAO_StreamCtrl* streamctrl (void);
- // The stream control interface that manages the stream set up
-
- int frame_rate (void);
- // The requested frame rate for sending each frame of data read from the file.
-
private:
int parse_args (int argc, char **argv);
// Method to parse the command line arguments.
@@ -119,22 +113,19 @@ private:
ENDPOINT_STRATEGY endpoint_strategy_;
// The reactive strategy of the client.
-
+
AVStreams::MMDevice_var receiver_mmdevice_;
// The receiver MMDevice that the sender connects to
-
+
TAO_MMDevice* sender_mmdevice_;
// The sender MMDevice.
-
+
Sender_StreamEndPoint* endpoint_;
// Reference to the sender streamendpoint
-
+
int count_;
// Number of frames sent.
- int argc_;
- char **argv_;
-
ACE_CString filename_;
// File from which data is read.
@@ -147,7 +138,7 @@ private:
int frame_rate_;
// The sepcified data frame rate
- ACE_Message_Block mb;
+ ACE_Message_Block mb_;
// Message block into which data is read from a file and then sent.
TAO_AV_Protocol_Object* protocol_object_;