summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/tests/AVStreams/Pluggable_Flow_Protocol
diff options
context:
space:
mode:
authorWilliam R. Otte <wotte@dre.vanderbilt.edu>2006-07-24 15:50:21 +0000
committerWilliam R. Otte <wotte@dre.vanderbilt.edu>2006-07-24 15:50:21 +0000
commit3aff90f4a822fcf5d902bbfbcc9fa931d6191a8c (patch)
tree197c810e5f5bce17b1233a7cb8d7b50c0bcd25e2 /TAO/orbsvcs/tests/AVStreams/Pluggable_Flow_Protocol
parent6b846cf03c0bcbd8c276cb0af61a181e5f98eaae (diff)
downloadATCD-3aff90f4a822fcf5d902bbfbcc9fa931d6191a8c.tar.gz
Repo restructuring
Diffstat (limited to 'TAO/orbsvcs/tests/AVStreams/Pluggable_Flow_Protocol')
-rw-r--r--TAO/orbsvcs/tests/AVStreams/Pluggable_Flow_Protocol/AVS_Pluggable_Flow_Protocol.mpc28
-rw-r--r--TAO/orbsvcs/tests/AVStreams/Pluggable_Flow_Protocol/Makefile.am109
-rw-r--r--TAO/orbsvcs/tests/AVStreams/Pluggable_Flow_Protocol/README47
-rw-r--r--TAO/orbsvcs/tests/AVStreams/Pluggable_Flow_Protocol/TimeStamp.cpp241
-rw-r--r--TAO/orbsvcs/tests/AVStreams/Pluggable_Flow_Protocol/TimeStamp.h61
-rw-r--r--TAO/orbsvcs/tests/AVStreams/Pluggable_Flow_Protocol/export.h40
-rw-r--r--TAO/orbsvcs/tests/AVStreams/Pluggable_Flow_Protocol/receiver.cpp261
-rw-r--r--TAO/orbsvcs/tests/AVStreams/Pluggable_Flow_Protocol/receiver.h100
-rwxr-xr-xTAO/orbsvcs/tests/AVStreams/Pluggable_Flow_Protocol/run_test.pl71
-rw-r--r--TAO/orbsvcs/tests/AVStreams/Pluggable_Flow_Protocol/sender.cpp417
-rw-r--r--TAO/orbsvcs/tests/AVStreams/Pluggable_Flow_Protocol/sender.h117
-rw-r--r--TAO/orbsvcs/tests/AVStreams/Pluggable_Flow_Protocol/svc.conf5
-rw-r--r--TAO/orbsvcs/tests/AVStreams/Pluggable_Flow_Protocol/svc.conf.xml15
13 files changed, 1512 insertions, 0 deletions
diff --git a/TAO/orbsvcs/tests/AVStreams/Pluggable_Flow_Protocol/AVS_Pluggable_Flow_Protocol.mpc b/TAO/orbsvcs/tests/AVStreams/Pluggable_Flow_Protocol/AVS_Pluggable_Flow_Protocol.mpc
new file mode 100644
index 00000000000..71dc6fc745f
--- /dev/null
+++ b/TAO/orbsvcs/tests/AVStreams/Pluggable_Flow_Protocol/AVS_Pluggable_Flow_Protocol.mpc
@@ -0,0 +1,28 @@
+// -*- MPC -*-
+// $Id$
+
+project(*ts) : orbsvcslib, portableserver, avstreams, naming {
+ sharedname = TAO_TS
+ dynamicflags = TAO_TS_BUILD_DLL
+
+ Source_Files {
+ TimeStamp.cpp
+ }
+}
+
+project(*send): avstreamsexe {
+ exename = sender
+
+ Source_Files {
+ sender.cpp
+ }
+}
+
+project(*recv): avstreamsexe {
+ exename = receiver
+
+ Source_Files {
+ receiver.cpp
+ }
+}
+
diff --git a/TAO/orbsvcs/tests/AVStreams/Pluggable_Flow_Protocol/Makefile.am b/TAO/orbsvcs/tests/AVStreams/Pluggable_Flow_Protocol/Makefile.am
new file mode 100644
index 00000000000..b97da09530e
--- /dev/null
+++ b/TAO/orbsvcs/tests/AVStreams/Pluggable_Flow_Protocol/Makefile.am
@@ -0,0 +1,109 @@
+## Process this file with automake to create Makefile.in
+##
+## $Id$
+##
+## This file was generated by MPC. Any changes made directly to
+## this file will be lost the next time it is generated.
+##
+## MPC Command:
+## ../bin/mwc.pl -type automake -noreldefs TAO.mwc
+
+ACE_BUILDDIR = $(top_builddir)/..
+ACE_ROOT = $(top_srcdir)/..
+TAO_BUILDDIR = $(top_builddir)
+TAO_ROOT = $(top_srcdir)
+
+noinst_PROGRAMS =
+
+## Makefile.AVS_Pluggable_Flow_Protocol_Recv.am
+
+if !BUILD_ACE_FOR_TAO
+
+noinst_PROGRAMS += receiver
+
+receiver_CPPFLAGS = \
+ -I$(ACE_ROOT) \
+ -I$(ACE_BUILDDIR) \
+ -I$(TAO_ROOT) \
+ -I$(TAO_BUILDDIR) \
+ -I$(TAO_ROOT)/orbsvcs \
+ -I$(TAO_BUILDDIR)/orbsvcs
+
+receiver_SOURCES = \
+ receiver.cpp \
+ receiver.h
+
+receiver_LDADD = \
+ $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_AV.la \
+ $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosProperty_Serv.la \
+ $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosProperty_Skel.la \
+ $(TAO_BUILDDIR)/tao/libTAO_PortableServer.la \
+ $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosProperty.la \
+ $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosNaming.la \
+ $(TAO_BUILDDIR)/tao/libTAO_AnyTypeCode.la \
+ $(TAO_BUILDDIR)/tao/libTAO.la \
+ $(ACE_BUILDDIR)/ace/libACE.la
+
+endif !BUILD_ACE_FOR_TAO
+
+## Makefile.AVS_Pluggable_Flow_Protocol_Send.am
+
+if !BUILD_ACE_FOR_TAO
+
+noinst_PROGRAMS += sender
+
+sender_CPPFLAGS = \
+ -I$(ACE_ROOT) \
+ -I$(ACE_BUILDDIR) \
+ -I$(TAO_ROOT) \
+ -I$(TAO_BUILDDIR) \
+ -I$(TAO_ROOT)/orbsvcs \
+ -I$(TAO_BUILDDIR)/orbsvcs
+
+sender_SOURCES = \
+ sender.cpp \
+ sender.h
+
+sender_LDADD = \
+ $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_AV.la \
+ $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosProperty_Serv.la \
+ $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosProperty_Skel.la \
+ $(TAO_BUILDDIR)/tao/libTAO_PortableServer.la \
+ $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosProperty.la \
+ $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosNaming.la \
+ $(TAO_BUILDDIR)/tao/libTAO_AnyTypeCode.la \
+ $(TAO_BUILDDIR)/tao/libTAO.la \
+ $(ACE_BUILDDIR)/ace/libACE.la
+
+endif !BUILD_ACE_FOR_TAO
+
+## Makefile.AVS_Pluggable_Flow_Protocol_Ts.am
+
+if !BUILD_ACE_FOR_TAO
+
+noinst_LTLIBRARIES = libTAO_TS.la
+
+libTAO_TS_la_CPPFLAGS = \
+ -I$(ACE_ROOT) \
+ -I$(ACE_BUILDDIR) \
+ -I$(TAO_ROOT) \
+ -I$(TAO_BUILDDIR) \
+ -I$(TAO_ROOT)/orbsvcs \
+ -I$(TAO_BUILDDIR)/orbsvcs \
+ -DTAO_TS_BUILD_DLL
+
+libTAO_TS_la_SOURCES = \
+ TimeStamp.cpp
+
+noinst_HEADERS = \
+ TimeStamp.h
+
+endif !BUILD_ACE_FOR_TAO
+
+## Clean up template repositories, etc.
+clean-local:
+ -rm -f *~ *.bak *.rpo *.sym lib*.*_pure_* core core.*
+ -rm -f gcctemp.c gcctemp so_locations *.ics
+ -rm -rf cxx_repository ptrepository ti_files
+ -rm -rf templateregistry ir.out
+ -rm -rf ptrepository SunWS_cache Templates.DB
diff --git a/TAO/orbsvcs/tests/AVStreams/Pluggable_Flow_Protocol/README b/TAO/orbsvcs/tests/AVStreams/Pluggable_Flow_Protocol/README
new file mode 100644
index 00000000000..20365125206
--- /dev/null
+++ b/TAO/orbsvcs/tests/AVStreams/Pluggable_Flow_Protocol/README
@@ -0,0 +1,47 @@
+// $Id$
+
+Description
+-----------
+
+This directory contains a simple test in the form of a sender and a
+receiver. The test has the following features:
+
+1. It tests the AVStreams Pluggable Protocol Framework
+2. Shows a mechanism to pace data.
+
+A new flow protocol called Time Stamp Flow Protocol is plugged into the AV Streams Framework
+using the Pluggable Protocol Framework and the ACE Service Configurator. This new protocol
+is compiled into a library and loaded when the AVStreams framework is initialized. When a
+frame is sent this protocol appends a timestamp to the data being sent.
+
+Running the test
+----------------
+
+receiver
+--------
+
+receiver -f <output_filename>
+
+-f <output_filename> -> The name of the file under which the received stream
+ data has to be stored (defaults to "output")
+
+sender
+------
+
+sender [-f <filename>] [-p <protocol>] [-r <frame rate>] [-d]
+
+
+-f filename --> The file to be streamed to the receiver (defaults to
+ "input").
+
+-p protocol --> The protocol string could be UDP or TCP (defaults to
+ UDP). But with the multicast address it should be UDP.
+
+-r framerate--> The rate at which tha data frames need to be sent
+ (defaults to 30 frames per second).
+
+-d --> Increament the TAO_debug_level for debug messages.
+
+
+The test must be run with the naming service. Check the run_test.pl
+to see how to configure the test case.
diff --git a/TAO/orbsvcs/tests/AVStreams/Pluggable_Flow_Protocol/TimeStamp.cpp b/TAO/orbsvcs/tests/AVStreams/Pluggable_Flow_Protocol/TimeStamp.cpp
new file mode 100644
index 00000000000..d5ad5524c69
--- /dev/null
+++ b/TAO/orbsvcs/tests/AVStreams/Pluggable_Flow_Protocol/TimeStamp.cpp
@@ -0,0 +1,241 @@
+#include "TimeStamp.h"
+#include "orbsvcs/AV/AVStreams_i.h"
+#include "ace/High_Res_Timer.h"
+
+// $Id$
+//TimeStamp Protocol Object
+
+TimeStamp_Protocol_Object::TimeStamp_Protocol_Object (TAO_AV_Callback *callback,
+ TAO_AV_Transport *transport)
+ :TAO_AV_Protocol_Object (callback,transport)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ "TimeStamp_Protocol_Object::TimeStamp_Protocol_Object\n"));
+ ACE_NEW (this->frame_,
+ ACE_Message_Block);
+
+ this->frame_->size (4 * this->transport_->mtu ());
+}
+
+
+int
+TimeStamp_Protocol_Object::handle_input (void)
+{
+ ssize_t n = this->transport_->recv (this->frame_->rd_ptr (),
+ this->frame_->size ());
+ if (n == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_UDP_Flow_Handler::handle_input recv failed\n"),-1);
+ if (n == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_UDP_Flow_Handler::handle_input connection closed\n"),-1);
+ this->frame_->wr_ptr (this->frame_->rd_ptr () + n);
+
+ return this->callback_->receive_frame (this->frame_);
+}
+
+// int
+// TimeStamp_Protocol_Object::handle_input (void)
+// {
+// iovec iov;
+// int iovcnt;
+// int n = this->transport_->recv (&iov, iovcnt);
+
+// int frame_size = BUFSIZ;
+
+// int begin = 1;
+// ACE_Message_Block* prev;
+// int iov_base = 0;
+// while (n >= frame_size)
+// {
+// ACE_DEBUG ((LM_DEBUG,
+// "(%N|%l) Frame Size %d %d\n",
+// n,
+// frame_size));
+
+// ACE_Message_Block* mb;
+// ACE_NEW_RETURN (mb,
+// ACE_Message_Block(frame_size),
+// -1);
+
+// ACE_OS_String::memmove (mb->rd_ptr (), iov.iov_base, frame_size);
+// mb->wr_ptr (mb->rd_ptr () + frame_size);
+
+// // iov_base += frame_size;
+
+// n -= frame_size;
+
+// if (begin)
+// {
+// prev = mb;
+// this->frame_ = mb;
+// begin = 0;
+// }
+// else
+// {
+// prev->cont (mb);
+// prev = mb;
+
+// }
+// }
+
+// if (n > 0)
+// if (begin)
+// {
+// ACE_DEBUG ((LM_DEBUG,
+// "(%N|%l) Frame Size %d\n",
+// n));
+// ACE_OS_String::memmove (this->frame_->rd_ptr (), iov.iov_base, n);
+// this->frame_->wr_ptr (this->frame_->rd_ptr () + n);
+// }
+// else
+// {
+
+// ACE_DEBUG ((LM_DEBUG,
+// "(%N|%l) Frame Size %d\n",
+// n));
+
+// ACE_Message_Block* mb;
+// ACE_NEW_RETURN (mb,
+// ACE_Message_Block (frame_size),
+// -1);
+
+// ACE_OS_String::memmove (mb->rd_ptr (), iov.iov_base, n);
+// mb->wr_ptr (mb->rd_ptr () + n);
+// prev->cont (mb);
+// }
+
+// ACE_DEBUG ((LM_DEBUG,
+// "IOVEC SIZE %d %d\n",
+// n,
+// iov.iov_len));
+
+// if (n == -1)
+// ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_TCP_Flow_Handler::handle_input recv failed\n"),-1);
+// if (n == 0)
+// ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_TCP_Flow_Handler::handle_input connection closed\n"),-1);
+
+// return this->callback_->receive_frame (this->frame_);
+// }
+
+/// send a data frame.
+int
+TimeStamp_Protocol_Object::send_frame (ACE_Message_Block *frame,
+ TAO_AV_frame_info *)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ "TimeStamp_Protocol_Object::send_frame\n"));
+
+ ACE_Message_Block* timestamp;
+ ACE_NEW_RETURN (timestamp,
+ ACE_Message_Block (BUFSIZ),
+ -1);
+
+ ACE_hrtime_t now = ACE_OS::gethrtime ();
+
+ ACE_UINT64 usec = now;
+ ACE_UINT32 val_1 = ACE_CU64_TO_CU32 (usec);
+ ACE_DEBUG ((LM_DEBUG,
+ "Time Stamp %u usecs\n",
+ val_1));
+
+ ACE_OS_String::memcpy (timestamp->wr_ptr (), &now, sizeof (now));
+ timestamp->wr_ptr (sizeof (now));
+
+ frame->cont (timestamp);
+
+ ssize_t result = this->transport_->send (frame);
+ if (result < 0)
+ return result;
+ return 0;
+}
+
+int
+TimeStamp_Protocol_Object::send_frame (iovec const* iov,
+ int iovcnt,
+ TAO_AV_frame_info *)
+{
+ return this->transport_->send (iov,iovcnt);
+}
+
+int
+TimeStamp_Protocol_Object::send_frame (const char* buf,
+ size_t len)
+{
+ int result = this->transport_->send (buf, len, 0);
+ if (result < 0)
+ return result;
+ return 0;
+}
+
+/// end the stream.
+int
+TimeStamp_Protocol_Object::destroy (void)
+{
+ this->callback_->handle_destroy ();
+ return 0;
+}
+
+
+TimeStamp_Protocol_Factory::TimeStamp_Protocol_Factory (void)
+{
+}
+
+TimeStamp_Protocol_Factory::~TimeStamp_Protocol_Factory (void)
+{
+}
+
+int
+TimeStamp_Protocol_Factory::init (int, char **)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ "TimeStamp_Protocol_Factory::init\n"));
+ return 0;
+}
+
+int
+TimeStamp_Protocol_Factory::match_protocol (const char *flow_string)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ "TimeStamp_Protocol_Factory::match_protocol\n"));
+ if (ACE_OS::strcasecmp (flow_string,"TS") == 0)
+ return 1;
+ return 0;
+}
+
+TAO_AV_Protocol_Object*
+TimeStamp_Protocol_Factory::make_protocol_object (TAO_FlowSpec_Entry *entry,
+ TAO_Base_StreamEndPoint *endpoint,
+ TAO_AV_Flow_Handler *handler,
+ TAO_AV_Transport *transport)
+{
+ TAO_AV_Callback *callback = 0;
+ endpoint->get_callback (entry->flowname (),
+ callback);
+
+
+ TimeStamp_Protocol_Object *object = 0;
+ ACE_NEW_RETURN (object,
+ TimeStamp_Protocol_Object (callback,
+ transport),
+ 0);
+ callback->open (object,
+ handler);
+ endpoint->set_protocol_object (entry->flowname (),
+ object);
+ return object;
+}
+
+ACE_FACTORY_DEFINE (TAO_TS, TimeStamp_Protocol_Factory)
+ACE_STATIC_SVC_DEFINE (TimeStamp_Protocol_Factory,
+ ACE_TEXT ("TimeStamp_Protocol_Factory"),
+ ACE_SVC_OBJ_T,
+ &ACE_SVC_NAME (TimeStamp_Protocol_Factory),
+ ACE_Service_Type::DELETE_THIS |
+ ACE_Service_Type::DELETE_OBJ,
+ 0)
+
+
+
+
+
+
+
diff --git a/TAO/orbsvcs/tests/AVStreams/Pluggable_Flow_Protocol/TimeStamp.h b/TAO/orbsvcs/tests/AVStreams/Pluggable_Flow_Protocol/TimeStamp.h
new file mode 100644
index 00000000000..dcc578e1756
--- /dev/null
+++ b/TAO/orbsvcs/tests/AVStreams/Pluggable_Flow_Protocol/TimeStamp.h
@@ -0,0 +1,61 @@
+#ifndef TIMESTAMP_H
+#define TIMESTAMP_H
+#include /**/ "ace/pre.h"
+
+#include "export.h"
+
+#include "orbsvcs/AV/Protocol_Factory.h"
+
+#include "ace/Dynamic_Service.h"
+#include "ace/Service_Config.h"
+
+
+// $Id$
+
+class TAO_TS_Export TimeStamp_Protocol_Object : public TAO_AV_Protocol_Object
+{
+ public:
+ TimeStamp_Protocol_Object (TAO_AV_Callback *callback,
+ TAO_AV_Transport *transport);
+
+ virtual int handle_input (void);
+
+ /// send a data frame.
+ virtual int send_frame (ACE_Message_Block *frame,
+ TAO_AV_frame_info *frame_info = 0);
+
+ virtual int send_frame (iovec const* iov,
+ int iovcnt,
+ TAO_AV_frame_info *frame_info = 0);
+
+ virtual int send_frame (const char*buf,
+ size_t len);
+
+ /// end the stream.
+ virtual int destroy (void);
+
+ private:
+ /// Pre-allocated memory to receive the data...
+ ACE_Message_Block* frame_;
+};
+
+class TAO_TS_Export TimeStamp_Protocol_Factory : public TAO_AV_Flow_Protocol_Factory
+{
+ public:
+ /// Initialization hook.
+ TimeStamp_Protocol_Factory (void);
+ virtual ~TimeStamp_Protocol_Factory (void);
+ virtual int init (int argc, char *argv[]);
+ virtual int match_protocol (const char *flow_string);
+ // Note : Some platforms still don't support Covariant returns
+ virtual TAO_AV_Protocol_Object* make_protocol_object (TAO_FlowSpec_Entry *entry,
+ TAO_Base_StreamEndPoint *endpoint,
+ TAO_AV_Flow_Handler *handler,
+ TAO_AV_Transport *transport);
+};
+
+ACE_STATIC_SVC_DECLARE (TimeStamp_Protocol_Factory)
+ACE_FACTORY_DECLARE (TAO_TS, TimeStamp_Protocol_Factory)
+
+#include /**/ "ace/post.h"
+#endif /*TIMESTAMP_H*/
diff --git a/TAO/orbsvcs/tests/AVStreams/Pluggable_Flow_Protocol/export.h b/TAO/orbsvcs/tests/AVStreams/Pluggable_Flow_Protocol/export.h
new file mode 100644
index 00000000000..13156aa6e0a
--- /dev/null
+++ b/TAO/orbsvcs/tests/AVStreams/Pluggable_Flow_Protocol/export.h
@@ -0,0 +1,40 @@
+
+// -*- C++ -*-
+// $Id$
+// Definition for Win32 Export directives.
+// This file is generated automatically by generate_export_file.pl
+// ------------------------------
+#ifndef EXPORT_H
+#define EXPORT_H
+
+#include "ace/config-all.h"
+
+#if defined (TAO_AS_STATIC_LIBS)
+# if !defined (TAO_TS_HAS_DLL)
+# define TAO_TS_HAS_DLL 0
+# endif /* ! TAO_TS_HAS_DLL */
+#else
+# if !defined (TAO_TS_HAS_DLL)
+# define TAO_TS_HAS_DLL 1
+# endif /* ! TAO_TS_HAS_DLL */
+#endif
+
+#if defined (TAO_TS_HAS_DLL) && (TAO_TS_HAS_DLL == 1)
+# if defined (TAO_TS_BUILD_DLL)
+# define TAO_TS_Export ACE_Proper_Export_Flag
+# define TAO_TS_SINGLETON_DECLARATION(T) ACE_EXPORT_SINGLETON_DECLARATION (T)
+# define TAO_TS_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) ACE_EXPORT_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK)
+# else /* TAO_TS_BUILD_DLL */
+# define TAO_TS_Export ACE_Proper_Import_Flag
+# define TAO_TS_SINGLETON_DECLARATION(T) ACE_IMPORT_SINGLETON_DECLARATION (T)
+# define TAO_TS_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) ACE_IMPORT_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK)
+# endif /* TAO_TS_BUILD_DLL */
+#else /* TAO_TS_HAS_DLL == 1 */
+# define TAO_TS_Export
+# define TAO_TS_SINGLETON_DECLARATION(T)
+# define TAO_TS_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK)
+#endif /* TAO_TS_HAS_DLL == 1 */
+
+#endif /* TAO_TS_EXPORT_H */
+
+// End of auto generated file.
diff --git a/TAO/orbsvcs/tests/AVStreams/Pluggable_Flow_Protocol/receiver.cpp b/TAO/orbsvcs/tests/AVStreams/Pluggable_Flow_Protocol/receiver.cpp
new file mode 100644
index 00000000000..d24c92414ff
--- /dev/null
+++ b/TAO/orbsvcs/tests/AVStreams/Pluggable_Flow_Protocol/receiver.cpp
@@ -0,0 +1,261 @@
+// $Id$
+
+#include "receiver.h"
+#include "ace/Get_Opt.h"
+#include "ace/OS_String.h"
+#include "ace/High_Res_Timer.h"
+
+static FILE *output_file = 0;
+// File handle of the file into which received data is written.
+
+static const char *output_file_name = "output";
+// File name of the file into which received data is written.
+
+int endstream = 0;
+
+int
+Receiver_StreamEndPoint::get_callback (const char *,
+ TAO_AV_Callback *&callback)
+{
+ // Return the receiver application callback to the AVStreams for
+ // future upcalls.
+ callback = &this->callback_;
+ return 0;
+}
+
+Receiver_Callback::Receiver_Callback (void)
+ : frame_count_ (1)
+{
+}
+
+int
+Receiver_Callback::receive_frame (ACE_Message_Block *frame,
+ TAO_AV_frame_info *,
+ const ACE_Addr &)
+{
+ //
+ // Upcall from the AVStreams when there is data to be received from
+ // the sender.
+ //
+ ACE_DEBUG ((LM_DEBUG,
+ "Receiver_Callback::receive_frame for frame %d\n",
+ this->frame_count_++));
+
+ int frame_size = BUFSIZ;
+
+ // Write the received data to the file.
+ size_t result =
+ ACE_OS::fwrite (frame->rd_ptr (),
+ frame_size,
+ 1,
+ output_file);
+
+ if (result == frame->length ())
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Receiver_Callback::fwrite failed\n"),
+ -1);
+ frame->rd_ptr (frame_size);
+
+ ACE_hrtime_t stamp;
+ ACE_OS_String::memcpy (&stamp, frame->rd_ptr (), sizeof (stamp));
+
+ ACE_UINT64 usec = stamp;
+ ACE_UINT32 val_1 = ACE_CU64_TO_CU32 (usec);
+
+ ACE_DEBUG ((LM_DEBUG,
+ "Time Stamp %u\n",
+ val_1));
+
+ frame->reset ();
+ return 0;
+}
+
+int
+Receiver_Callback::handle_destroy (void)
+{
+ // Called when the distributer requests the stream to be shutdown.
+ ACE_DEBUG ((LM_DEBUG,
+ "Receiver_Callback::end_stream\n"));
+
+ endstream = 1;
+ return 0;
+}
+
+Receiver::Receiver (void)
+ : mmdevice_ (0)
+{
+}
+
+Receiver::~Receiver (void)
+{
+}
+
+int
+Receiver::init (int,
+ char **
+ ACE_ENV_ARG_DECL)
+{
+ // Initialize the endpoint strategy with the orb and poa.
+ int result =
+ this->reactive_strategy_.init (TAO_AV_CORE::instance ()->orb (),
+ TAO_AV_CORE::instance ()->poa ());
+ if (result != 0)
+ return result;
+
+ // Register the receiver mmdevice object with the ORB
+ ACE_NEW_RETURN (this->mmdevice_,
+ TAO_MMDevice (&this->reactive_strategy_),
+ -1);
+
+ // Servant Reference Counting to manage lifetime
+ PortableServer::ServantBase_var safe_mmdevice =
+ this->mmdevice_;
+
+ CORBA::Object_var mmdevice =
+ this->mmdevice_->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+
+ // Register the mmdevice with the naming service.
+ CosNaming::Name name (1);
+ name.length (1);
+ name [0].id =
+ CORBA::string_dup ("Receiver");
+
+ // Initialize the naming services
+ if (this->naming_client_.init (TAO_AV_CORE::instance ()->orb ()) != 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Unable to initialize "
+ "the TAO_Naming_Client\n"),
+ -1);
+
+ // Register the receiver object with the naming server.
+ this->naming_client_->rebind (name,
+ mmdevice.in ()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+
+ return 0;
+}
+
+int
+parse_args (int argc,
+ char **argv)
+{
+ // Parse the command line arguments
+ ACE_Get_Opt opts (argc,
+ argv,
+ "f:");
+
+ int c;
+ while ((c = opts ()) != -1)
+ {
+ switch (c)
+ {
+ case 'f':
+ output_file_name = opts.opt_arg ();
+ break;
+ default:
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Usage: receiver -f filename"),
+ -1);
+ }
+ }
+
+ return 0;
+}
+
+int
+main (int argc,
+ char **argv)
+{
+ ACE_DECLARE_NEW_CORBA_ENV;
+ ACE_TRY
+ {
+ ACE_High_Res_Timer::global_scale_factor ();
+
+ // Initialize the ORB first.
+ CORBA::ORB_var orb =
+ CORBA::ORB_init (argc,
+ argv,
+ 0
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ CORBA::Object_var obj
+ = orb->resolve_initial_references ("RootPOA"
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ // Get the POA_var object from Object_var.
+ PortableServer::POA_var root_poa =
+ PortableServer::POA::_narrow (obj.in ()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ PortableServer::POAManager_var mgr
+ = root_poa->the_POAManager (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ mgr->activate (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ // Initialize the AVStreams components.
+ TAO_AV_CORE::instance ()->init (orb.in (),
+ root_poa.in ()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ int result =
+ parse_args (argc,
+ argv);
+
+ if (result == -1)
+ return -1;
+
+ // Make sure we have a valid <output_file>
+ output_file = ACE_OS::fopen (output_file_name,
+ "w");
+ if (output_file == 0)
+ ACE_ERROR_RETURN ((LM_DEBUG,
+ "Cannot open output file %s\n",
+ output_file_name),
+ -1);
+
+ else
+ ACE_DEBUG ((LM_DEBUG,
+ "File Opened Successfully\n"));
+
+ Receiver receiver;
+ result =
+ receiver.init (argc,
+ argv
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ if (result != 0)
+ return result;
+
+ while (!endstream)
+ {
+ orb->perform_work (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ }
+
+ // Hack for now....
+ ACE_OS::sleep (1);
+
+ orb->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"receiver::init");
+ return -1;
+ }
+ ACE_ENDTRY;
+ ACE_CHECK_RETURN (-1);
+
+ ACE_OS::fclose (output_file);
+
+ return 0;
+}
diff --git a/TAO/orbsvcs/tests/AVStreams/Pluggable_Flow_Protocol/receiver.h b/TAO/orbsvcs/tests/AVStreams/Pluggable_Flow_Protocol/receiver.h
new file mode 100644
index 00000000000..39c4ac3b39e
--- /dev/null
+++ b/TAO/orbsvcs/tests/AVStreams/Pluggable_Flow_Protocol/receiver.h
@@ -0,0 +1,100 @@
+/* -*- C++ -*- */
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// TAO/orbsvcs/tests/AVStreams/Simple
+//
+// = FILENAME
+// receiver.h
+//
+// = DESCRIPTION
+// This application receives data from a AV sender and writes it to
+// a file.
+//
+// = AUTHOR
+// Yamuna Krishnamurthy <yamuna@cs.wustl.edu>
+//
+// ============================================================================
+
+#include "orbsvcs/Naming/Naming_Client.h"
+#include "orbsvcs/AV/AVStreams_i.h"
+#include "orbsvcs/AV/Endpoint_Strategy.h"
+#include "orbsvcs/AV/Policy.h"
+
+class Receiver_Callback : public TAO_AV_Callback
+{
+ // = TITLE
+ // Application defined callback object.
+ //
+ // = DESCRIPTION
+ // AVStreams calls this class when data shows up from a sender.
+public:
+
+ Receiver_Callback (void);
+ // Constructor.
+
+ // Method that is called when there is data to be received from a
+ // sender.
+ int receive_frame (ACE_Message_Block *frame,
+ TAO_AV_frame_info *frame_info,
+ const ACE_Addr &peer_address);
+
+ // Called when the sender is done sending data and wants to close
+ // down the connection.
+ int handle_destroy (void);
+
+private:
+ int frame_count_;
+ // Keeping a count of the incoming frames.
+};
+
+class Receiver_StreamEndPoint : public TAO_Server_StreamEndPoint
+{
+ // = TITLE
+ // Application defined stream endpoint object.
+ //
+ // = DESCRIPTION
+ // AVStreams calls this class during connection setup.
+public:
+ // Create a receiver application callback.
+ int get_callback (const char *flowname,
+ TAO_AV_Callback *&callback);
+
+private:
+ Receiver_Callback callback_;
+ // Receiver application callback.
+};
+
+class Receiver
+{
+ // = TITLE
+ // Receiver application class.
+ //
+ // = DESCRIPTION
+ // This class receives data from a AV sender and writes it to
+ // a file.
+public:
+ Receiver (void);
+ // Constructor
+
+ ~Receiver (void);
+ // Destructor.
+
+ int init (int argc,
+ char **argv
+ ACE_ENV_ARG_DECL);
+ // Initialize data components.
+
+protected:
+ TAO_Naming_Client naming_client_;
+ // The Naming Service Client.
+
+ TAO_AV_Endpoint_Reactive_Strategy_B
+ <Receiver_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl> reactive_strategy_;
+ // The endpoint reactive strategy.
+
+ TAO_MMDevice *mmdevice_;
+ // Receiver MMDevice.
+};
diff --git a/TAO/orbsvcs/tests/AVStreams/Pluggable_Flow_Protocol/run_test.pl b/TAO/orbsvcs/tests/AVStreams/Pluggable_Flow_Protocol/run_test.pl
new file mode 100755
index 00000000000..c04de62dfd6
--- /dev/null
+++ b/TAO/orbsvcs/tests/AVStreams/Pluggable_Flow_Protocol/run_test.pl
@@ -0,0 +1,71 @@
+eval '(exit $?0)' && eval 'exec perl -S $0 ${1+"$@"}'
+ & eval 'exec perl -S $0 $argv:q'
+ if 0;
+
+# $Id$
+# -*- perl -*-
+
+use lib $ENV{"ACE_ROOT"}.'/bin';
+use PerlACE::Run_Test;
+use File::stat;
+
+# amount of delay between running the servers
+
+$sleeptime = 2;
+$status = 0;
+
+$nsior = PerlACE::LocalFile ("ns.ior");
+$outfile = PerlACE::LocalFile ("output");
+
+# generate test stream data
+$input = PerlACE::generate_test_file("test_input", 102400);
+
+unlink $nsior;
+
+$NS = new PerlACE::Process ($ENV{"TAO_ROOT"}."/orbsvcs/Naming_Service/Naming_Service", "-o $nsior");
+$SV = new PerlACE::Process ("receiver", "-ORBInitRef NameService=file://$nsior -f output");
+$CL = new PerlACE::Process ("sender", "-ORBDebugLevel 2 -ORBInitRef NameService=file://$nsior -f $input -r 2");
+
+print STDERR "Starting Naming Service\n";
+
+$NS->Spawn ();
+
+if (PerlACE::waitforfile_timed ($nsior, 5) == -1) {
+ print STDERR "ERROR: cannot find naming service IOR file\n";
+ $NS->Kill ();
+ exit 1;
+}
+
+print STDERR "Starting Receiver\n";
+
+$SV->Spawn ();
+
+sleep $sleeptime;
+
+print STDERR "Starting Sender\n";
+
+$sender = $CL->SpawnWaitKill (200);
+
+if ($sender != 0) {
+ print STDERR "ERROR: sender returned $sender\n";
+ $status = 1;
+}
+
+$receiver = $SV->TerminateWaitKill (5);
+
+if ($receiver != 0) {
+ print STDERR "ERROR: receiver returned $receiver\n";
+ $status = 1;
+}
+
+$nserver = $NS->TerminateWaitKill (5);
+
+if ($nserver != 0) {
+ print STDERR "ERROR: Naming Service returned $nserver\n";
+ $status = 1;
+}
+
+unlink $nsior;
+unlink $output, $input;
+
+exit $status;
diff --git a/TAO/orbsvcs/tests/AVStreams/Pluggable_Flow_Protocol/sender.cpp b/TAO/orbsvcs/tests/AVStreams/Pluggable_Flow_Protocol/sender.cpp
new file mode 100644
index 00000000000..9af1f2ba80b
--- /dev/null
+++ b/TAO/orbsvcs/tests/AVStreams/Pluggable_Flow_Protocol/sender.cpp
@@ -0,0 +1,417 @@
+// $Id$
+
+#include "sender.h"
+#include "tao/debug.h"
+#include "ace/Get_Opt.h"
+#include "ace/High_Res_Timer.h"
+
+// Create a singleton instance of the Sender.
+
+// An Unmanaged_Singleton is used to avoid static object destruction
+// order related problems since the underlying singleton object
+// contains references to static TypeCodes.
+typedef ACE_Unmanaged_Singleton<Sender, ACE_Null_Mutex> SENDER;
+
+
+int
+Sender_StreamEndPoint::get_callback (const char *,
+ TAO_AV_Callback *&callback)
+{
+ // Create and return the sender application callback to AVStreams
+ // for further upcalls.
+ callback = &this->callback_;
+ return 0;
+}
+
+int
+Sender_StreamEndPoint::set_protocol_object (const char *,
+ TAO_AV_Protocol_Object *object)
+{
+ // Set the sender protocol object corresponding to the transport
+ // protocol selected.
+ SENDER::instance ()->protocol_object (object);
+ return 0;
+}
+
+Sender::Sender (void)
+ : sender_mmdevice_ (0),
+ streamctrl_ (0),
+ frame_count_ (0),
+ filename_ ("input"),
+ input_file_ (0),
+ protocol_ ("UDP"),
+ frame_rate_ (30),
+ mb_ (BUFSIZ)
+{
+}
+
+void
+Sender::protocol_object (TAO_AV_Protocol_Object *object)
+{
+ // Set the sender protocol object corresponding to the transport
+ // protocol selected.
+ this->protocol_object_ = object;
+}
+
+int
+Sender::parse_args (int argc,
+ char **argv)
+{
+ // Parse command line arguments
+ ACE_Get_Opt opts (argc, argv, "f:p:r:d");
+
+ int c;
+ while ((c= opts ()) != -1)
+ {
+ switch (c)
+ {
+ case 'f':
+ this->filename_ = opts.opt_arg ();
+ break;
+ case 'p':
+ this->protocol_ = opts.opt_arg ();
+ break;
+ case 'r':
+ this->frame_rate_ = ACE_OS::atoi (opts.opt_arg ());
+ break;
+ case 'd':
+ TAO_debug_level++;
+ break;
+ default:
+ ACE_DEBUG ((LM_DEBUG, "Unknown Option\n"));
+ return -1;
+ }
+ }
+ return 0;
+}
+
+// Method to get the object reference of the receiver
+int
+Sender::bind_to_receiver (ACE_ENV_SINGLE_ARG_DECL)
+{
+ CosNaming::Name name (1);
+ name.length (1);
+ name [0].id =
+ CORBA::string_dup ("Receiver");
+
+ // Resolve the receiver object reference from the Naming Service
+ CORBA::Object_var receiver_mmdevice_obj =
+ this->naming_client_->resolve (name
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+
+ this->receiver_mmdevice_ =
+ AVStreams::MMDevice::_narrow (receiver_mmdevice_obj.in ()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+
+ if (CORBA::is_nil (this->receiver_mmdevice_.in ()))
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Could not resolve Receiver_MMdevice in Naming service <%s>\n"),
+ -1);
+
+ return 0;
+}
+
+int
+Sender::init (int argc,
+ char **argv
+ ACE_ENV_ARG_DECL)
+{
+ // Initialize the endpoint strategy with the orb and poa.
+ int result =
+ this->endpoint_strategy_.init (TAO_AV_CORE::instance ()->orb (),
+ TAO_AV_CORE::instance ()->poa ());
+ if (result != 0)
+ return result;
+
+ // Initialize the naming services
+ result =
+ this->naming_client_.init (TAO_AV_CORE::instance ()->orb ());
+ if (result != 0)
+ return result;
+
+ // Parse the command line arguments
+ result =
+ this->parse_args (argc,
+ argv);
+ if (result != 0)
+ return result;
+
+ // Open file to read.
+ this->input_file_ =
+ ACE_OS::fopen (this->filename_.c_str (),
+ "r");
+
+ if (this->input_file_ == 0)
+ ACE_ERROR_RETURN ((LM_DEBUG,
+ "Cannot open input file %s\n",
+ this->filename_.c_str ()),
+ -1);
+ else
+ ACE_DEBUG ((LM_DEBUG,
+ "File opened successfully\n"));
+
+ // Resolve the object reference of the receiver from the Naming Service.
+ result = this->bind_to_receiver (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+
+ if (result != 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%P|%t) Error binding to the naming service\n"),
+ -1);
+
+
+ // Initialize the QoS
+ AVStreams::streamQoS_var the_qos (new AVStreams::streamQoS);
+
+ // Create the forward flow specification to describe the flow.
+ TAO_Forward_FlowSpec_Entry entry ("Data_Receiver",
+ "IN",
+ "USER_DEFINED",
+ "TS",
+ this->protocol_.c_str (),
+ 0);
+
+ AVStreams::flowSpec flow_spec (1);
+ flow_spec.length (1);
+ flow_spec [0] = CORBA::string_dup (entry.entry_to_string ());
+
+ // Register the sender mmdevice object with the ORB
+ ACE_NEW_RETURN (this->sender_mmdevice_,
+ TAO_MMDevice (&this->endpoint_strategy_),
+ -1);
+
+ // Servant Reference Counting to manage lifetime
+ PortableServer::ServantBase_var safe_mmdevice =
+ this->sender_mmdevice_;
+
+ AVStreams::MMDevice_var mmdevice =
+ this->sender_mmdevice_->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+
+ ACE_NEW_RETURN (this->streamctrl_,
+ TAO_StreamCtrl,
+ -1);
+
+ PortableServer::ServantBase_var safe_streamctrl =
+ this->streamctrl_;
+
+ // Bind/Connect the sender and receiver MMDevices.
+ CORBA::Boolean bind_result =
+ this->streamctrl_->bind_devs (mmdevice.in (),
+ this->receiver_mmdevice_.in (),
+ the_qos.inout (),
+ flow_spec
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+
+ if (bind_result == 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "streamctrl::bind_devs failed\n"),
+ -1);
+
+ return 0;
+}
+
+// Method to send data at the specified rate
+int
+Sender::pace_data (ACE_ENV_SINGLE_ARG_DECL)
+{
+ // The time that should lapse between two consecutive frames sent.
+ ACE_Time_Value inter_frame_time;
+
+ // The time between two consecutive frames.
+ inter_frame_time.set (1 / (double) this->frame_rate_);
+
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "Frame Rate = %d / second\n"
+ "Inter Frame Time = %d (msec)\n",
+ this->frame_rate_,
+ inter_frame_time.msec ()));
+
+ ACE_TRY
+ {
+ // The time taken for sending a frame and preparing for the next frame
+ ACE_High_Res_Timer elapsed_timer;
+
+ // Continue to send data till the file is read to the end.
+ while (1)
+ {
+ // Read from the file into a message block.
+ int n = ACE_OS::fread (this->mb_.wr_ptr (),
+ 1,
+ this->mb_.size (),
+ this->input_file_);
+
+ if (n < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Sender::pace_data fread failed\n"),
+ -1);
+
+ if (n == 0)
+ {
+ // At end of file break the loop and end the sender.
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,"Handle_Start:End of file\n"));
+ break;
+ }
+
+ this->mb_.wr_ptr (n);
+
+ if (this->frame_count_ > 1)
+ {
+ //
+ // Second frame and beyond
+ //
+
+ // Stop the timer that was started just before the previous frame was sent.
+ elapsed_timer.stop ();
+
+ // Get the time elapsed after sending the previous frame.
+ ACE_Time_Value elapsed_time;
+ elapsed_timer.elapsed_time (elapsed_time);
+
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "Elapsed Time = %d\n",
+ elapsed_time.msec ()));
+
+ // Check to see if the inter frame time has elapsed.
+ if (elapsed_time < inter_frame_time)
+ {
+ // Inter frame time has not elapsed.
+
+ // Calculate the time to wait before the next frame needs to be sent.
+ ACE_Time_Value wait_time (inter_frame_time - elapsed_time);
+
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "Wait Time = %d\n",
+ wait_time.msec ()));
+
+ // Run the orb for the wait time so the sender can
+ // continue other orb requests.
+ TAO_AV_CORE::instance ()->orb ()->run (wait_time
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ }
+ }
+
+ // Start timer before sending the frame.
+ elapsed_timer.start ();
+
+ // Send frame.
+ int result =
+ this->protocol_object_->send_frame (&this->mb_);
+
+ if (result < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "send failed:%p",
+ "Sender::pace_data send\n"),
+ -1);
+
+ ACE_DEBUG ((LM_DEBUG,
+ "Sender::pace_data frame %d was sent succesfully\n",
+ ++this->frame_count_));
+
+ // Reset the message block.
+ this->mb_.reset ();
+
+ } // end while
+
+ // File reading is complete, destroy the stream.
+ AVStreams::flowSpec stop_spec;
+ this->streamctrl_->destroy (stop_spec
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ // Shut the orb down.
+ TAO_AV_CORE::instance ()->orb ()->shutdown (0
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
+ "Sender::pace_data Failed\n");
+ return -1;
+ }
+ ACE_ENDTRY;
+ return 0;
+}
+
+int
+main (int argc,
+ char **argv)
+{
+ ACE_DECLARE_NEW_CORBA_ENV;
+ ACE_TRY
+ {
+
+ ACE_High_Res_Timer::global_scale_factor ();
+
+ CORBA::ORB_var orb =
+ CORBA::ORB_init (argc,
+ argv,
+ 0
+ ACE_ENV_ARG_PARAMETER);
+
+ CORBA::Object_var obj
+ = orb->resolve_initial_references ("RootPOA"
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ // Get the POA_var object from Object_var
+ PortableServer::POA_var root_poa
+ = PortableServer::POA::_narrow (obj.in ()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ PortableServer::POAManager_var mgr
+ = root_poa->the_POAManager (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ mgr->activate (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ // Initialize the AV Stream components.
+ TAO_AV_CORE::instance ()->init (orb.in (),
+ root_poa.in ()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ // Initialize the Sender.
+ int result = 0;
+ result = SENDER::instance ()->init (argc,
+ argv
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ if (result < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Sender::init failed\n"),
+ -1);
+
+ // Start sending data.
+ result = SENDER::instance ()->pace_data (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
+ "Sender Failed\n");
+ return -1;
+ }
+ ACE_ENDTRY;
+ ACE_CHECK_RETURN (-1);
+
+ SENDER::close (); // Explicitly finalize the Unmanaged_Singleton.
+
+ return 0;
+}
+
+#if defined (ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION)
+template ACE_Unmanaged_Singleton<Sender, ACE_Null_Mutex> *ACE_Unmanaged_Singleton<Sender, ACE_Null_Mutex>::singleton_;
+#endif /* ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION */
diff --git a/TAO/orbsvcs/tests/AVStreams/Pluggable_Flow_Protocol/sender.h b/TAO/orbsvcs/tests/AVStreams/Pluggable_Flow_Protocol/sender.h
new file mode 100644
index 00000000000..20e9dc6ef50
--- /dev/null
+++ b/TAO/orbsvcs/tests/AVStreams/Pluggable_Flow_Protocol/sender.h
@@ -0,0 +1,117 @@
+/* -*- C++ -*- */
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// TAO/orbsvcs/tests/AVStreams/Simple
+//
+// = FILENAME
+// sender.h
+//
+// = DESCRIPTION
+// This application reads data from a file and sends it to s
+// receiver.
+//
+// = AUTHOR
+// Yamuna Krishnamurthy <yamuna@cs.wustl.edu>
+//
+// ============================================================================
+
+#include "orbsvcs/Naming/Naming_Client.h"
+#include "orbsvcs/AV/AVStreams_i.h"
+#include "orbsvcs/AV/Endpoint_Strategy.h"
+#include "orbsvcs/AV/Protocol_Factory.h"
+
+class Sender_StreamEndPoint : public TAO_Client_StreamEndPoint
+{
+ // = TITLE
+ // Defines a sender stream endpoint.
+public:
+ int get_callback (const char *flowname,
+ TAO_AV_Callback *&callback);
+ // Create the application callback and return its handle to
+ // AVStreams for further application callbacks.
+
+ int set_protocol_object (const char *flowname,
+ TAO_AV_Protocol_Object *object);
+ // Set protocol object corresponding to the transport protocol
+ // chosen.
+
+protected:
+ TAO_AV_Callback callback_;
+ // Application callback.
+};
+
+typedef TAO_AV_Endpoint_Reactive_Strategy_A
+ <Sender_StreamEndPoint,
+ TAO_VDev,
+ AV_Null_MediaCtrl>
+ SENDER_ENDPOINT_STRATEGY;
+
+class Sender
+{
+ // = TITLE
+ // Sender Application.
+ //
+ // = DESCRIPTION
+ // Class is responsible for streaming (and pacing) data to a
+ // receiver.
+public:
+ Sender (void);
+ // Constructor
+
+ int init (int argc,
+ char **argv
+ ACE_ENV_ARG_DECL);
+ // Method to initialize the various data components.
+
+ int pace_data (ACE_ENV_SINGLE_ARG_DECL);
+ // Method to pace and send data from a file.
+
+ void protocol_object (TAO_AV_Protocol_Object *protocol_object);
+ // Set the protocol object corresponding to the transport protocol chosen.
+
+private:
+ int parse_args (int argc, char **argv);
+ // Method to parse the command line arguments.
+
+ int bind_to_receiver (ACE_ENV_SINGLE_ARG_DECL);
+ // Method that binds the sender to the receiver.
+
+ SENDER_ENDPOINT_STRATEGY endpoint_strategy_;
+ // The endpoint strategy used by the sender.
+
+ AVStreams::MMDevice_var receiver_mmdevice_;
+ // The receiver MMDevice that the sender connects to.
+
+ TAO_MMDevice *sender_mmdevice_;
+ // The sender MMDevice.
+
+ TAO_StreamCtrl *streamctrl_;
+ // Stream controller
+
+ int frame_count_;
+ // Number of frames sent.
+
+ ACE_CString filename_;
+ // File from which data is read.
+
+ TAO_Naming_Client naming_client_;
+ // The Naming Service client.
+
+ FILE *input_file_;
+ // File handle of the file read from.
+
+ ACE_CString protocol_;
+ // Selected protocol - default is UDP
+
+ int frame_rate_;
+ // Rate at which the data will be sent.
+
+ ACE_Message_Block mb_;
+ // Message block into which data is read from a file and then sent.
+
+ TAO_AV_Protocol_Object *protocol_object_;
+ // Protocol object corresponding to the transport protocol selected.
+};
diff --git a/TAO/orbsvcs/tests/AVStreams/Pluggable_Flow_Protocol/svc.conf b/TAO/orbsvcs/tests/AVStreams/Pluggable_Flow_Protocol/svc.conf
new file mode 100644
index 00000000000..d4f2cd01350
--- /dev/null
+++ b/TAO/orbsvcs/tests/AVStreams/Pluggable_Flow_Protocol/svc.conf
@@ -0,0 +1,5 @@
+dynamic AV_Default_Resource_Factory Service_Object * TAO_AV:_make_TAO_AV_Default_Resource_Factory() ""
+dynamic TimeStamp_Protocol_Factory Service_Object * TAO_TS:_make_TimeStamp_Protocol_Factory() ""
+static AV_Default_Resource_Factory "-AVFlowProtocolFactory TimeStamp_Protocol_Factory"
+dynamic UDP_Factory Service_Object * TAO_AV:_make_TAO_AV_UDP_Factory() ""
+static AV_Default_Resource_Factory "-AVTransportFactory UDP_Factory"
diff --git a/TAO/orbsvcs/tests/AVStreams/Pluggable_Flow_Protocol/svc.conf.xml b/TAO/orbsvcs/tests/AVStreams/Pluggable_Flow_Protocol/svc.conf.xml
new file mode 100644
index 00000000000..ef8e8645e5b
--- /dev/null
+++ b/TAO/orbsvcs/tests/AVStreams/Pluggable_Flow_Protocol/svc.conf.xml
@@ -0,0 +1,15 @@
+<?xml version='1.0'?>
+<!-- Converted from ./orbsvcs/tests/AVStreams/Pluggable_Flow_Protocol/svc.conf by svcconf-convert.pl -->
+<ACE_Svc_Conf>
+ <dynamic id="AV_Default_Resource_Factory" type="Service_Object">
+ <initializer path="TAO_AV" init="_make_TAO_AV_Default_Resource_Factory"/>
+ </dynamic>
+ <dynamic id="TimeStamp_Protocol_Factory" type="Service_Object">
+ <initializer path="TAO_TS" init="_make_TimeStamp_Protocol_Factory"/>
+ </dynamic>
+ <static id="AV_Default_Resource_Factory" params="-AVFlowProtocolFactory TimeStamp_Protocol_Factory"/>
+ <dynamic id="UDP_Factory" type="Service_Object">
+ <initializer path="TAO_AV" init="_make_TAO_AV_UDP_Factory"/>
+ </dynamic>
+ <static id="AV_Default_Resource_Factory" params="-AVTransportFactory UDP_Factory"/>
+</ACE_Svc_Conf>