summaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
authorcdgill <cdgill@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1998-09-19 17:43:28 +0000
committercdgill <cdgill@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1998-09-19 17:43:28 +0000
commit49574b611ce2df265ab36a29b32e85945aebb515 (patch)
treec1d174189337967c938ed5501e8e5c8df894a990 /examples
parentdae6838c87f4dfaad57283fafd3185ea2402cc22 (diff)
downloadATCD-49574b611ce2df265ab36a29b32e85945aebb515.tar.gz
first (very rough) checkin
Diffstat (limited to 'examples')
-rw-r--r--examples/Bounded_Packet_Relay/BPR_Drivers.cpp606
-rw-r--r--examples/Bounded_Packet_Relay/BPR_Drivers.h315
-rw-r--r--examples/Bounded_Packet_Relay/Makefile294
-rw-r--r--examples/Bounded_Packet_Relay/README201
-rw-r--r--examples/Bounded_Packet_Relay/Thread_Bounded_Packet_Relay.cpp672
-rw-r--r--examples/Bounded_Packet_Relay/Thread_Bounded_Packet_Relay.h359
-rw-r--r--examples/Bounded_Packet_Relay/bpr_thread.cpp51
7 files changed, 2498 insertions, 0 deletions
diff --git a/examples/Bounded_Packet_Relay/BPR_Drivers.cpp b/examples/Bounded_Packet_Relay/BPR_Drivers.cpp
new file mode 100644
index 00000000000..542fe67aa5a
--- /dev/null
+++ b/examples/Bounded_Packet_Relay/BPR_Drivers.cpp
@@ -0,0 +1,606 @@
+// $Id$
+
+// ============================================================================
+// = LIBRARY
+// examples
+//
+// = FILENAME
+// BPR_Driver.cpp
+//
+// = DESCRIPTION
+// This code builds an abstraction to factor out common code for
+// the different implementations of the Timer_Queue.
+//
+// = AUTHOR
+// Chris Gill <cdgill@cs.wustl.edu>
+//
+// Based on examples/Timer_Queue/Driver.cpp, written by
+// Douglas Schmidt <schmidt@cs.wustl.edu> &&
+// Sergio Flores-Gaitan <sergio@cs.wustl.edu>
+//
+// ============================================================================
+
+#if !defined (_BPR_DRIVER_CPP_)
+#define _BPR_DRIVER_CPP_
+
+#include "ace/Auto_Ptr.h"
+#include "BPR_Driver.h"
+
+ACE_RCSID(Bounded_Packet_Relay, BPR_Driver, "$Id$")
+
+Input_Device_Wrapper_Base::Input_Device_Wrapper_Base (ACE_Thread_Manager * input_task_mgr)
+ : ACE_Task_Base (input_task_mgr)
+ , send_input_msg_cmd_ (0)
+ , input_rate_ (ACE_ONE_SECOND_IN_USECS)
+ , reactor_
+ , is_active_ (0)
+{
+}
+ // ctor
+
+Input_Device_Wrapper_Base::Input_Device_Wrapper_Base ()
+{
+ delete send_input_msg_cmd_;
+}
+ // dtor
+
+int
+Input_Device_Wrapper_Base::set_send_input_msg_cmd (Command_Base *send_input_msg_cmd)
+{
+ // delete the old command (if any) first
+ delete send_input_msg_cmd_;
+
+ send_input_msg_cmd_ = send_input_msg_cmd;
+ return 0;
+}
+ // sets send input message command in the input device driver object
+
+int
+Input_Device_Wrapper_Base::set_input_period (u_long input_period)
+{
+ input_period_ = input_period;
+ return 0;
+}
+ // sets period between when input messages are produced
+
+int
+Input_Device_Wrapper_Base::set_send_count (long count)
+{
+ send_count_ = count;
+}
+ // sets count of messages to send
+
+int
+Input_Device_Wrapper_Base::request_stop (void)
+{
+ is_active_ = 0;
+}
+ // request that the input device stop sending messages
+ // and terminate its thread. Should return 1 if it will do so, 0
+ // if it has already done so, or -1 if there is a problem doing so.
+
+
+int
+Input_Device_Wrapper_Base::svc (void)
+{
+ long count;
+ ACE_Time_Value timeout;
+ ACE_Message_Block *message;
+
+ // set a flag ti indicate we're active
+ is_active_ = 1;
+
+ // start with the total count of messages to send
+ count = send_count_;
+
+ // while we're still marked active, and there are packets to send
+ while ((is_active_) && (count != 0))
+ {
+ // make sure there is a send command object
+ if (! send_input_msg_cmd_)
+ {
+ is_active_ = 0;
+ ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n",
+ "send message command object not instantiated"),
+ -1);
+ }
+
+ // create an input message to send
+ message = create_input_message ();
+ if (! message)
+ {
+ is_active_ = 0;
+ ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n",
+ "Failed to create input message object"),
+ -1);
+ }
+
+ // send the input message
+ if (send_input_msg_cmd_->execute ((void *) message) < 0)
+ {
+ is_active_ = 0;
+ delete message;
+ ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n",
+ "Failed executing send message command object"),
+ -1);
+ }
+
+ // if all went well, decrement count of messages to send,
+ // and run the reactor event loop unti we get a timeout
+ // or something happens in a registered upcall.
+ if (count > 0)
+ {
+ --count;
+ }
+ timeout = ACE_Time_Value (0, period_);
+ reactor_.run_event_loop (timeout);
+ }
+
+ is_active_ = 0;
+
+ return 0;
+}
+ // This method runs the input device loop in the new thread.
+
+int
+Input_Device_Wrapper_Base::send_input_message (ACE_Message_Block *amb)
+{
+ if (send_input_msg_cmd_)
+ {
+ return send_input_msg_cmd_->execute ((void *) amb);
+ }
+ else
+ {
+ ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n",
+ "Input_Device_Wrapper_Base::send_input_message: "
+ "command object not instantiated"),
+ -1);
+ }
+}
+ // sends a newly created message block, carrying data
+ // read from the underlying input device, by passing a
+ // pointer to the message block to its command execution
+
+template <class SYNCH>
+Bounded_Packet_Relay<SYNCH>::Bounded_Packet_Relay (
+ ACE_Thread_Manager * input_task_mgr,
+ Input_Device_Wrapper_Base * input_wrapper,
+ Output_Device_Wrapper_Base * output_wrapper)
+ : input_task_mgr_ (input_task_mgr)
+ , input_wrapper_ (input_wrapper)
+ , input_thread_handle_ (0)
+ , output_wrapper_ (output_wrapper)
+ , queue_
+ , is_active_ (0)
+ , transmission_lock_
+ , queue_lock_
+ , transmission_number_ (0)
+ , packets_sent_ (0)
+ , status_ (Bounded_Packet_Relay<SYNCH>::UN_INITIALIZED)
+ , elapsed_duration_ (0)
+{
+ if (input_task_mgr_ == 0)
+ {
+ input_task_mgr_ = ACE_Thread_Manager::instance ();
+ }
+}
+ // ctor
+
+template <class SYNCH>
+Bounded_Packet_Relay<SYNCH>::~Bounded_Packet_Relay (void)
+{
+ delete input_wrapper_;
+ delete output_wrapper_;
+}
+ // dtor
+
+template <class SYNCH> int
+Bounded_Packet_Relay<SYNCH>::send_input (void)
+{
+ ACE_Message_Block *item;
+
+ // don't block, return immediately if queue is empty
+ if (queue_.dequeue_head (item, ACE_OS::gettimeofday ()) < 0)
+ {
+ return 1;
+ }
+
+ // if a message block was dequeued, send it to the output device
+ if (output_wrapper_->write_output_message ((void *) item) < 0)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n",
+ "failed to write to output device object"),
+ -1);
+ }
+
+ // if all went OK, increase count of packets sent
+ ++packets_sent_;
+ return 0;
+}
+ // Requests output be sent to output device.
+
+template <class SYNCH> int
+Bounded_Packet_Relay<SYNCH>::start_transmission (u_long packet_count,
+ u_long arrival_period,
+ u_long logging_level)
+{
+ // serialize access to start and end transmission calls,
+ // statistics reporting calls
+ ACE_GUARD_RETURN (SYNCH, ace_mon, this->transmission_lock_, -1);
+
+ // if a transmission is already in progress, just return
+ if (status_ == STARTED)
+ {
+ return 1;
+ }
+
+ // Update statistics for a new transmission
+ ++transmission_number;
+ packets_sent_ = 0;
+ status_ = STARTED;
+ transmission_start_ = ACE_OS::gettimeofday ();
+
+ // initialize the output device
+ if (output_wrapper_->modify_device_settings ((void *) &logging level) < 0)
+ {
+ status_ = ERROR;
+ transmission_end_ = ACE_OS::gettimeofday ();
+ ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n",
+ "failed to initialize output device object"),
+ -1);
+ }
+
+ // initialize the input device
+ if (input_wrapper_->set_input_period (u_long input_period) < 0)
+ {
+ status_ = ERROR;
+ transmission_end_ = ACE_OS::gettimeofday ();
+ ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n",
+ "failed to initialize input device object"),
+ -1);
+ }
+
+ // activate the input device and save a handle to the new thread
+ if (input_wrapper_->activate () < 0)
+ {
+ status_ = ERROR;
+ transmission_end_ = ACE_OS::gettimeofday ();
+ ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n",
+ "failed to activate input device object"),
+ -1);
+ }
+
+ // If all went well, return success
+ return 0;
+}
+ // Requests a transmission be started.
+
+template <class SYNCH> int
+Bounded_Packet_Relay<SYNCH>::end_transmission (Transmission_Status status)
+{
+ // serialize access to start and end transmission calls,
+ // statistics reporting calls
+ ACE_GUARD_RETURN (SYNCH, ace_mon, this->transmission_lock_, -1);
+
+ // if a transmission is not already in progress, just return
+ if (status_ != STARTED)
+ {
+ return 1;
+ }
+
+ // ask the the input thread to stop
+ if (input_wrapper_->request_stop () < 0)
+ {
+ status_ = ERROR;
+ transmission_end_ = ACE_OS::gettimeofday ();
+ ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n",
+ "failed asking input device thread to stop"),
+ -1);
+ }
+
+ // wait for input thread to stop
+ if (input_task_mgr_->wait_task (input_wrapper_) < 0)
+ {
+ status_ = ERROR;
+ transmission_end_ = ACE_OS::gettimeofday ();
+ ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n",
+ "failed waiting for input device thread to stop"),
+ -1);
+ }
+
+ // if all went well, set passed status, stamp end time, return success
+ status_ = status;
+ transmission_end_ = ACE_OS::gettimeofday ();
+ return 0;
+}
+ // Requests a transmission be ended.
+
+template <class SYNCH> int
+Bounded_Packet_Relay<SYNCH>::report_statistics (void)
+{
+ // serialize access to start and end transmission calls,
+ // statistics reporting calls
+ ACE_GUARD_RETURN (SYNCH, ace_mon, this->transmission_lock_, -1);
+
+ // if a transmission is already in progress, just return
+ if (status_ == STARTED)
+ {
+ return 1;
+ }
+
+ const char *status_msg;
+ switch (status_)
+ {
+ case UN_INITIALIZED:
+ status_msg = "Uninitialized";
+ break;
+
+ case STARTED:
+ // NOT REACHED: user should never see this ;-)
+ status_msg = "In progress";
+ break;
+
+ case COMPLETED:
+ status_msg = "Completed with all packets sent";
+ break;
+
+ case TIMED_OUT:
+ status_msg = "Terminated by transmission duration timer";
+ break;
+
+ case CANCELLED:
+ status_msg = "Cancelled by external control";
+ break;
+
+ case ERROR:
+ status_msg = "Error was detected";
+ break;
+
+ default:
+ status_msg = "Unknown";
+ break;
+ }
+
+ // calculate duration of trasmission
+ ACE_Time_Value duration (transmission_end_);
+ duration -= transmission_start_;
+
+ // report transmission statistics
+ if (ACE_OS::fprintf (ACE_STDOUT,
+ "\n\nStatisics for transmission %lu:\n\n"
+ "Transmission status: %s\n"
+ "Start time: %ld (sec) %ld usec\n"
+ "End time: %ld (sec) %ld usec\n"
+ "Duration: %ld (sec) %ld usec\n"
+ "Packets relayed: %lu\n\n",
+ transmission_number_, status_msg,
+ transmission_start_.sec (),
+ transmission_start_.usec (),
+ transmission_end_.sec (),
+ transmission_end_.usec (),
+ duration.sec (),
+ duration.usec (),
+ packets_sent_) < 0)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n",
+ "Bounded_Packet_Relay<SYNCH>::report_statistics"
+ "ACE_OS::fprintf failed"), -1);
+ }
+
+ if (ACE_OS::fflush (ACE_STDOUT) < 0)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n",
+ "Bounded_Packet_Relay<SYNCH>::report_statistics"
+ "ACE_OS::fflush failed"), -1);
+ }
+
+ return 0;
+}
+ // Requests a report of statistics from the last transmission.
+
+template <class SYNCH> int
+Bounded_Packet_Relay<SYNCH>::receive_input (void * arg)
+{
+ ACE_Message_Block *message;
+
+ message = ACE_static_cast (ACE_Message_Block *, arg);
+
+ if (queue_.enqueue_tail (message) < 0)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n",
+ "Bounded_Packet_Relay<SYNCH>::receive_input failed"),
+ -1);
+ }
+
+ return 0;
+}
+ // public entry point to which to push input.
+
+// Parse the input and execute the corresponding command
+
+template <class TQ, class RECEIVER, class ACTION> int
+Bounded_Packet_Relay_Driver<TQ, RECEIVER, ACTION>::parse_commands (const char *buf)
+{
+ int option;
+
+ if (::sscanf (buf, "%d", &option) <= 0)
+ // If there was an error reading the option simply try on the next line.
+ return 0;
+
+ switch (option)
+ {
+ case 1: // set packet count
+ {
+ u_long count;
+
+ // We just reread the option, this simplies parsing
+ // (since sscanf can do it for us.)
+ if (::sscanf (buf, "%d %lu", &option, &count) < 2)
+ {
+ // if there was not enough information on the line,
+ // ignore option and try the next line.
+ return 0;
+ }
+
+ if (packet_count_cmd_->execute ((void *) &count) == -1)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n", "set packet count failed"), -1);
+ }
+
+ break;
+ }
+
+ case 2: // set arrival period
+ {
+ u_long usec;
+
+ // We just reread the option, this simplies parsing
+ // (since sscanf can do it for us.)
+ if (::sscanf (buf, "%d %lu", &option, &usec) < 2)
+ {
+ // if there was not enough information on the line,
+ // ignore option and try the next line.
+ return 0;
+ }
+
+ if (arrival_period_cmd_->execute ((void *) &usec) == -1)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n", "set arrival period failed"), -1);
+ }
+
+ break;
+ }
+
+ case 3: // set transmit period
+ {
+ u_long usec;
+
+ // We just reread the option, this simplies parsing
+ // (since sscanf can do it for us.)
+ if (::sscanf (buf, "%d %lu", &option, &usec) < 2)
+ {
+ // if there was not enough information on the line,
+ // ignore option and try the next line.
+ return 0;
+ }
+
+ if (transmit_period_cmd_->execute ((void *) &usec) == -1)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n", "set transmit period failed"), -1);
+ }
+
+ break;
+ }
+
+ case 4: // set duration limit
+ {
+ u_long usec;
+
+ // We just reread the option, this simplies parsing
+ // (since sscanf can do it for us.)
+ if (::sscanf (buf, "%d %lu", &option, &usec) < 2)
+ {
+ // if there was not enough information on the line,
+ // ignore option and try the next line.
+ return 0;
+ }
+
+ if (duration_limit_cmd_->execute ((void *) &usec) == -1)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n", "set duration limit failed"), -1);
+ }
+
+ break;
+ }
+
+ case 5: // set logging level
+ {
+ u_long level;
+
+ // We just reread the option, this simplies parsing
+ // (since sscanf can do it for us.)
+ if (::sscanf (buf, "%d %lu", &option, &level) < 2)
+ {
+ // if there was not enough information on the line,
+ // ignore option and try the next line.
+ return 0;
+ }
+
+ if (logging_level_cmd_->execute ((void *) &level) == -1)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n", "set logging level failed"), -1);
+ }
+
+ break;
+ }
+
+ case 6: // run one transmission
+ return run_transmission_cmd_->execute (0);
+ /* NOTREACHED */
+
+ case 7: // report statistics
+ return report_stats_cmd_->execute (0);
+ /* NOTREACHED */
+
+ case 8: // shut down the driver
+ return shutdown_cmd_->execute (0);
+ /* NOTREACHED */
+
+ default:
+ // Display an error message.
+ ACE_ERROR_RETURN ((LM_ERROR, "invalid input %s\n", buf), 0);
+ ACE_NOTREACHED (break);
+ /* NOTREACHED */
+
+ } /* ENDSWITCH */
+
+ return 0;
+}
+
+// Runs the test.
+
+template <class TQ, class RECEIVER, class ACTION> int
+Bounded_Packet_Relay_Driver<TQ, RECEIVER, ACTION>::run (void)
+{
+ this->init ();
+
+ for (;;)
+ if (this->get_next_request () == -1)
+ return -1;
+
+ ACE_NOTREACHED (return 0);
+}
+
+// gets the next request from the user input.
+
+template <class TQ, class RECEIVER, class ACTION> int
+Bounded_Packet_Relay_Driver<TQ, RECEIVER, ACTION>::get_next_request (void)
+{
+ char buf[BUFSIZ];
+
+ this->display_menu ();
+
+ ACE_OS::fprintf (ACE_STDERR, "Please enter your choice: ");
+ ACE_OS::fflush (ACE_STDERR);
+
+ // reads input from the user
+ if (this->read_input (buf, sizeof buf) <= 0)
+ return -1;
+
+ // Parse and run the command.
+ return this->parse_commands (buf);
+}
+
+// Reads input from the user from ACE_STDIN into the buffer specified.
+
+template <class TQ, class RECEIVER, class ACTION> ssize_t
+Bounded_Packet_Relay_Driver<TQ, RECEIVER, ACTION>::read_input (char *buf, size_t bufsiz)
+{
+ ACE_OS::memset (buf, 0, bufsiz);
+
+ // Wait for user to type commands. This call is automatically
+ // restarted when SIGINT or SIGALRM signals occur.
+ return ACE_OS::read (ACE_STDIN, buf, bufsiz);
+}
+
+#endif /* _BPR_DRIVER_CPP_ */
diff --git a/examples/Bounded_Packet_Relay/BPR_Drivers.h b/examples/Bounded_Packet_Relay/BPR_Drivers.h
new file mode 100644
index 00000000000..b583dc8edcf
--- /dev/null
+++ b/examples/Bounded_Packet_Relay/BPR_Drivers.h
@@ -0,0 +1,315 @@
+/* -*- C++ -*- */
+
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// examples
+//
+// = FILENAME
+// BPR_Driver.h
+//
+// = DESCRIPTION
+// This code builds an abstraction to factor out common code from
+// the different implementations of the Timer_Queue based bounded
+// packet relay example.
+//
+// = AUTHORS
+// Chris Gill <cdgill@cs.wustl.edu>
+//
+// Based on examples/Timer_Queue/Driver.h written by
+// Sergio Flores-Gaitan <sergio@cs.wustl.edu>
+//
+// ============================================================================
+
+#if !defined (_BPR_DRIVER_H_)
+#define _BPR_DRIVER_H_
+
+#include "ace/Task.h"
+// #include "ace/Timer_Heap_T.h"
+// #include "ace/Timer_Queue_Adapters.h"
+
+class Command_Base
+ // = TITLE
+ // Defines an abstract class that allows us to invoke commands
+ // without knowing anything about the implementation.
+ //
+ // = DESCRIPTION
+ // This class declares an interface to execute a command independent
+ // of the effect of the command, or the objects used to implement it.
+{
+public:
+
+ virtual int execute (void *arg) = 0;
+ // Invokes the method <action_> from the object <receiver_>.
+
+};
+
+
+class Input_Device_Wrapper_Base : public ACE_Task_Base
+ // = TITLE
+ // Defines an abstract base class for an input device wrapper that hides
+ // the details of the specific device and provides a consistent message
+ // passing interface without knowing anything about the implementation
+ // of the input device or the message receiver.
+ //
+ // The abstract base class ctor takes a command template object that is
+ // instantiated with the correct receiver and action types. This command
+ // object is used to send newly created input messages to the receiver.
+ //
+ // The abstract base class is designed to operate in an active "push"
+ // mode, sending input data to the receiver whenever the data is ready.
+ // The underlying device may be active, notifying the wrapper when
+ // data is ready, or may be passive in which case the wrapper must
+ // rely on a reactive and/or polling mechanism.
+ //
+ // = DESCRIPTION
+ // Derived classes are responsible for filling in concrete definitions
+ // for the abstract message creation method and the svc method.
+{
+public:
+
+ Input_Device_Wrapper_Base (ACE_Thread_Manager * input_task_mgr);
+ // ctor
+
+ virtual ~Input_Device_Wrapper_Base ();
+ // dtor
+
+ int set_send_input_msg_cmd (Command_Base *send_input_msg_cmd);
+ // sets send input message command in the input device driver object
+
+ int set_input_period (u_long input_period);
+ // sets period (in usecs) between when inputs are created
+
+ int set_send_count (long count);
+ // sets count of messages to send
+
+ int request_stop (void);
+ // request that the input device stop sending messages
+ // and terminate its thread. Should return 1 if it will do so, 0
+ // if it has already done so, or -1 if there is a problem doing so.
+
+ virtual int svc (void);
+ // This method runs the input device loop in the new thread.
+
+protected:
+
+ virtual ACE_Message_Block *create_input_message () = 0;
+ // creates a new message block, carrying data
+ // read from the underlying input device
+
+ virtual int send_input_message (ACE_Message_Block *);
+ // sends a newly created message block, carrying data
+ // read from the underlying input device, by passing a
+ // pointer to the message block to its command execution
+
+ Command_Base *send_input_msg_cmd_;
+ // send newly created input message.
+
+ u_long input_period_;
+ // period between when input values are produced (usecs)
+
+ ACE_Reactor reactor_;
+ // reactor used to multiplex input streams, timeouts
+
+ int is_active_;
+ // flag to indicate whether or not input object
+ // is and should remain active
+
+ long send_count_;
+ // count of messages to send before stopping
+ // (-1 indicates the device should not stop)
+};
+
+
+class Output_Device_Wrapper_Base
+ // = TITLE
+ // Defines an abstract base class for an output device wrapper that hides
+ // the details of the specific device and provides a consistent write
+ // method interface without knowing anything about the implementation.
+ //
+ // = DESCRIPTION
+ // The abstract methods write_output_message () and modify_device_settings ()
+ // are defined in derived classes to write the contents of the passed message
+ // out the underlying output device, and update device settings, respectively.
+{
+public:
+
+ virtual int write_output_message (void *) = 0;
+ // writes contents of the passed message block
+ // out to the underlying output device
+
+ virtual int modify_device_settings (void *) = 0;
+ // provides an abstract interface to allow modifying device settings
+};
+
+
+template <class SYNCH>
+class Bounded_Packet_Relay
+{
+public:
+
+ typedef int (Input_Task::*ACTION) (void *);
+ // Command entry point type definition
+
+ enum Transmission_Status {UN_INITIALIZED, STARTED, COMPLETED, TIMED_OUT, CANCELLED, ERROR};
+ // enumerates possible status values at the end of a transmission
+
+ Bounded_Packet_Relay (ACE_Thread_Manager * input_task_mgr,
+ Input_Device_Wrapper_Base * input_wrapper,
+ Output_Device_Wrapper_Base * output_wrapper);
+ // ctor
+
+ virtual ~Bounded_Packet_Relay (void);
+ // dtor
+
+ int send_input (void);
+ // Requests output be sent to output device.
+
+ int start_transmission (u_long packet_count,
+ u_long arrival_period,
+ u_long logging_level);
+ // Requests a transmission be started.
+
+ int end_transmission (Transmission_Status status);
+ // Requests a transmission be ended.
+
+ int report_statistics (void);
+ // Requests a report of statistics from the last transmission.
+
+ /////////////////////////////////////
+ // Command Accessible Entry Points //
+ /////////////////////////////////////
+
+ int receive_input (void *);
+ // public entry point to which to push input.
+
+private:
+
+ ////////////////////////////
+ // Concurrency Management //
+ ////////////////////////////
+
+ ACE_Thread_Manager * input_task_mgr_;
+ // Thread manager for the input device task
+
+ Input_Device_Wrapper_Base * input_wrapper_;
+ // pointer to the input device wrapper
+
+ Output_Device_Wrapper_Base * output_wrapper_;
+ // pointer to the output device wrapper
+
+ ACE_Message_Queue<SYNCH> queue_;
+
+ SYNCH transmission_lock_;
+ // lock for safe thread synchronization
+ // of transmission startup and termination
+
+ /////////////////////////////
+ // Transmission Statistics //
+ /////////////////////////////
+
+ u_long transmission_number_;
+ // number of transmissions sent
+
+ u_long packets_sent_;
+ // count of packets sent in the most recent transmission
+
+ Transmission_Status status_;
+ // status of the current or most recent transmission
+
+ ACE_Time_Value transmission_start_;
+ // start time of the most recent transmission
+
+ ACE_Time_Value transmission_end_;
+ // ending time of the most recent transmission
+
+};
+
+template <class TQ>
+class Bounded_Packet_Relay_Driver
+ // = TITLE
+ // Defines a class that provides a simmple implementation for
+ // a test driver for timer queues.
+ //
+ // = DESCRIPTION
+ // This is the place where the common code to test the different
+ // implementations of the timer queue resides. This class has
+ // the logic for the parse_commands () method, the run (),
+ // read_input () and get_next_request () methods. Subclasses can
+ // override these methods if there is some logic that is specific
+ // to that implementation.
+{
+public:
+
+ virtual int parse_commands (const char *buf);
+ // Breaks up the input string buffer into pieces and executes
+ // the appropriate method to handle that operation.
+
+ virtual int run (void);
+ // This is the main entry point for the driver. The user
+ // of the class should normally invoke this method.
+ // Returns 0 when successful, or 0 otherwise.
+
+ virtual int get_next_request (void);
+ // This internal method gets the next request from the user.
+ // Returns -1 when user wants to exit. Returns 0 otherwise.
+
+ virtual ssize_t read_input (char *buf, size_t bufsiz);
+ // Reads input from the user into the buffer <buf> with a maximum
+ // of <bufsiz> bytes. Returns the amount of bytes actually read
+ // Otherwise, a -1 is returned and errno is set to indicate the error.
+
+ virtual int display_menu (void);
+ // Prints the user interface for the driver to STDOUT.
+
+ virtual int init (void)=0;
+ // Initializes values and operations for the driver.
+
+protected:
+
+ // = Major Driver Mechanisms
+
+ TQ timer_queue_;
+ // timer queue for transmission timeouts
+
+ // = Set of <Command>s to be executed.
+
+ Command_Base *packet_count_cmd_;
+ // set packet count command
+
+ Command_Base *arrival_period_cmd_;
+ // set arrival period command.
+
+ Command_Base *transmit_period_cmd_;
+ // set transmit period command.
+
+ Command_Base *duration_limit_cmd_;
+ // set duration limit command.
+
+ Command_Base *logging_level_cmd_;
+ // set logging level command.
+
+ Command_Base *run_transmission_cmd_;
+ // run transmission command.
+
+ Command_Base *cancel_transmission_cmd_;
+ // cancel transmission command.
+
+ Command_Base *report_stats_cmd_;
+ // report statistics command.
+
+ Command_Base *shutdown_cmd_;
+ // shutdown the driver.
+};
+
+#if defined (ACE_TEMPLATES_REQUIRE_SOURCE)
+#include "Driver.cpp"
+#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */
+
+#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA)
+#pragma implementation ("Driver.cpp")
+#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */
+
+#endif /* _BPR_DRIVER_H_ */
diff --git a/examples/Bounded_Packet_Relay/Makefile b/examples/Bounded_Packet_Relay/Makefile
new file mode 100644
index 00000000000..9291ba4b2f9
--- /dev/null
+++ b/examples/Bounded_Packet_Relay/Makefile
@@ -0,0 +1,294 @@
+#----------------------------------------------------------------------------
+# $Id$
+#----------------------------------------------------------------------------
+
+#----------------------------------------------------------------------------
+# Local macros
+#----------------------------------------------------------------------------
+
+INFO = README
+
+LIB = libBPRD.a
+SHLIB = libBPRD.$(SOEXT)
+
+BIN = bpr_thread
+
+PSRC = $(addsuffix .cpp,$(BIN))
+
+LSRC = Thread_Bounded_Packet_Relay.cpp \
+ BPR_Drivers.cpp
+
+LDLIBS = -lBPRD
+
+VLDLIBS = $(LDLIBS:%=%$(VAR))
+
+BUILD = $(VLIB) $(VSHLIB) $(SHLIBA) $(VBIN)
+
+#----------------------------------------------------------------------------
+# Include macros and targets
+#----------------------------------------------------------------------------
+
+include $(ACE_ROOT)/include/makeinclude/wrapper_macros.GNU
+include $(ACE_ROOT)/include/makeinclude/macros.GNU
+include $(ACE_ROOT)/include/makeinclude/rules.common.GNU
+include $(ACE_ROOT)/include/makeinclude/rules.nonested.GNU
+include $(ACE_ROOT)/include/makeinclude/rules.lib.GNU
+include $(ACE_ROOT)/include/makeinclude/rules.bin.GNU
+include $(ACE_ROOT)/include/makeinclude/rules.local.GNU
+
+#----------------------------------------------------------------------------
+# Local targets
+#----------------------------------------------------------------------------
+
+#----------------------------------------------------------------------------
+# Dependencies
+#----------------------------------------------------------------------------
+
+# DO NOT DELETE THIS LINE -- g++dep uses it.
+# DO NOT PUT ANYTHING AFTER THIS LINE, IT WILL GO AWAY.
+
+.obj/Thread_Bounded_Packet_Relay.o .obj/Thread_Bounded_Packet_Relay.so .shobj/Thread_Bounded_Packet_Relay.o .shobj/Thread_Bounded_Packet_Relay.so: Thread_Bounded_Packet_Relay.cpp \
+ $(ACE_ROOT)/ace/Task.h \
+ $(ACE_ROOT)/ace/Service_Object.h \
+ $(ACE_ROOT)/ace/Shared_Object.h \
+ $(ACE_ROOT)/ace/ACE.h \
+ $(ACE_ROOT)/ace/OS.h \
+ $(ACE_ROOT)/ace/inc_user_config.h \
+ $(ACE_ROOT)/ace/config.h \
+ $(ACE_ROOT)/ace/streams.h \
+ $(ACE_ROOT)/ace/Basic_Types.h \
+ $(ACE_ROOT)/ace/Basic_Types.i \
+ $(ACE_ROOT)/ace/OS.i \
+ $(ACE_ROOT)/ace/Trace.h \
+ $(ACE_ROOT)/ace/Log_Msg.h \
+ $(ACE_ROOT)/ace/Log_Record.h \
+ $(ACE_ROOT)/ace/ACE.i \
+ $(ACE_ROOT)/ace/Log_Priority.h \
+ $(ACE_ROOT)/ace/Log_Record.i \
+ $(ACE_ROOT)/ace/Shared_Object.i \
+ $(ACE_ROOT)/ace/Event_Handler.h \
+ $(ACE_ROOT)/ace/Event_Handler.i \
+ $(ACE_ROOT)/ace/Service_Object.i \
+ $(ACE_ROOT)/ace/Thread_Manager.h \
+ $(ACE_ROOT)/ace/Thread.h \
+ $(ACE_ROOT)/ace/Thread.i \
+ $(ACE_ROOT)/ace/Synch.h \
+ $(ACE_ROOT)/ace/SV_Semaphore_Complex.h \
+ $(ACE_ROOT)/ace/SV_Semaphore_Simple.h \
+ $(ACE_ROOT)/ace/SV_Semaphore_Simple.i \
+ $(ACE_ROOT)/ace/SV_Semaphore_Complex.i \
+ $(ACE_ROOT)/ace/Synch.i \
+ $(ACE_ROOT)/ace/Synch_T.h \
+ $(ACE_ROOT)/ace/Synch_T.i \
+ $(ACE_ROOT)/ace/Atomic_Op.i \
+ $(ACE_ROOT)/ace/Containers.h \
+ $(ACE_ROOT)/ace/Containers.i \
+ $(ACE_ROOT)/ace/Thread_Manager.i \
+ $(ACE_ROOT)/ace/Task.i \
+ $(ACE_ROOT)/ace/Task_T.h \
+ $(ACE_ROOT)/ace/Message_Queue.h \
+ $(ACE_ROOT)/ace/Message_Block.h \
+ $(ACE_ROOT)/ace/Malloc.h \
+ $(ACE_ROOT)/ace/Malloc.i \
+ $(ACE_ROOT)/ace/Malloc_T.h \
+ $(ACE_ROOT)/ace/Free_List.h \
+ $(ACE_ROOT)/ace/Free_List.i \
+ $(ACE_ROOT)/ace/Malloc_T.i \
+ $(ACE_ROOT)/ace/Memory_Pool.h \
+ $(ACE_ROOT)/ace/Signal.h \
+ $(ACE_ROOT)/ace/Signal.i \
+ $(ACE_ROOT)/ace/Object_Manager.h \
+ $(ACE_ROOT)/ace/Object_Manager.i \
+ $(ACE_ROOT)/ace/Managed_Object.h \
+ $(ACE_ROOT)/ace/Managed_Object.i \
+ $(ACE_ROOT)/ace/Mem_Map.h \
+ $(ACE_ROOT)/ace/Mem_Map.i \
+ $(ACE_ROOT)/ace/Memory_Pool.i \
+ $(ACE_ROOT)/ace/Message_Block.i \
+ $(ACE_ROOT)/ace/IO_Cntl_Msg.h \
+ $(ACE_ROOT)/ace/Strategies.h \
+ $(ACE_ROOT)/ace/Strategies_T.h \
+ $(ACE_ROOT)/ace/Service_Config.h \
+ $(ACE_ROOT)/ace/Service_Config.i \
+ $(ACE_ROOT)/ace/Reactor.h \
+ $(ACE_ROOT)/ace/Handle_Set.h \
+ $(ACE_ROOT)/ace/Handle_Set.i \
+ $(ACE_ROOT)/ace/Timer_Queue.h \
+ $(ACE_ROOT)/ace/Timer_Queue_T.h \
+ $(ACE_ROOT)/ace/Timer_Queue_T.i \
+ $(ACE_ROOT)/ace/Reactor.i \
+ $(ACE_ROOT)/ace/Reactor_Impl.h \
+ $(ACE_ROOT)/ace/Svc_Conf_Tokens.h \
+ $(ACE_ROOT)/ace/Synch_Options.h \
+ $(ACE_ROOT)/ace/Hash_Map_Manager.h \
+ $(ACE_ROOT)/ace/Message_Queue.i \
+ $(ACE_ROOT)/ace/Task_T.i \
+ $(ACE_ROOT)/ace/Timer_Heap_T.h \
+ $(ACE_ROOT)/ace/Timer_Queue_Adapters.h \
+ $(ACE_ROOT)/ace/Timer_Queue_Adapters.i \
+ Thread_Bounded_Packet_Relay.h BPR_Drivers.h
+.obj/BPR_Drivers.o .obj/BPR_Drivers.so .shobj/BPR_Drivers.o .shobj/BPR_Drivers.so: BPR_Drivers.cpp \
+ $(ACE_ROOT)/ace/Auto_Ptr.h \
+ $(ACE_ROOT)/ace/ACE.h \
+ $(ACE_ROOT)/ace/OS.h \
+ $(ACE_ROOT)/ace/inc_user_config.h \
+ $(ACE_ROOT)/ace/config.h \
+ $(ACE_ROOT)/ace/streams.h \
+ $(ACE_ROOT)/ace/Basic_Types.h \
+ $(ACE_ROOT)/ace/Basic_Types.i \
+ $(ACE_ROOT)/ace/OS.i \
+ $(ACE_ROOT)/ace/Trace.h \
+ $(ACE_ROOT)/ace/Log_Msg.h \
+ $(ACE_ROOT)/ace/Log_Record.h \
+ $(ACE_ROOT)/ace/ACE.i \
+ $(ACE_ROOT)/ace/Log_Priority.h \
+ $(ACE_ROOT)/ace/Log_Record.i \
+ $(ACE_ROOT)/ace/Auto_Ptr.i BPR_Drivers.h \
+ $(ACE_ROOT)/ace/Task.h \
+ $(ACE_ROOT)/ace/Service_Object.h \
+ $(ACE_ROOT)/ace/Shared_Object.h \
+ $(ACE_ROOT)/ace/Shared_Object.i \
+ $(ACE_ROOT)/ace/Event_Handler.h \
+ $(ACE_ROOT)/ace/Event_Handler.i \
+ $(ACE_ROOT)/ace/Service_Object.i \
+ $(ACE_ROOT)/ace/Thread_Manager.h \
+ $(ACE_ROOT)/ace/Thread.h \
+ $(ACE_ROOT)/ace/Thread.i \
+ $(ACE_ROOT)/ace/Synch.h \
+ $(ACE_ROOT)/ace/SV_Semaphore_Complex.h \
+ $(ACE_ROOT)/ace/SV_Semaphore_Simple.h \
+ $(ACE_ROOT)/ace/SV_Semaphore_Simple.i \
+ $(ACE_ROOT)/ace/SV_Semaphore_Complex.i \
+ $(ACE_ROOT)/ace/Synch.i \
+ $(ACE_ROOT)/ace/Synch_T.h \
+ $(ACE_ROOT)/ace/Synch_T.i \
+ $(ACE_ROOT)/ace/Atomic_Op.i \
+ $(ACE_ROOT)/ace/Containers.h \
+ $(ACE_ROOT)/ace/Containers.i \
+ $(ACE_ROOT)/ace/Thread_Manager.i \
+ $(ACE_ROOT)/ace/Task.i \
+ $(ACE_ROOT)/ace/Task_T.h \
+ $(ACE_ROOT)/ace/Message_Queue.h \
+ $(ACE_ROOT)/ace/Message_Block.h \
+ $(ACE_ROOT)/ace/Malloc.h \
+ $(ACE_ROOT)/ace/Malloc.i \
+ $(ACE_ROOT)/ace/Malloc_T.h \
+ $(ACE_ROOT)/ace/Free_List.h \
+ $(ACE_ROOT)/ace/Free_List.i \
+ $(ACE_ROOT)/ace/Malloc_T.i \
+ $(ACE_ROOT)/ace/Memory_Pool.h \
+ $(ACE_ROOT)/ace/Signal.h \
+ $(ACE_ROOT)/ace/Signal.i \
+ $(ACE_ROOT)/ace/Object_Manager.h \
+ $(ACE_ROOT)/ace/Object_Manager.i \
+ $(ACE_ROOT)/ace/Managed_Object.h \
+ $(ACE_ROOT)/ace/Managed_Object.i \
+ $(ACE_ROOT)/ace/Mem_Map.h \
+ $(ACE_ROOT)/ace/Mem_Map.i \
+ $(ACE_ROOT)/ace/Memory_Pool.i \
+ $(ACE_ROOT)/ace/Message_Block.i \
+ $(ACE_ROOT)/ace/IO_Cntl_Msg.h \
+ $(ACE_ROOT)/ace/Strategies.h \
+ $(ACE_ROOT)/ace/Strategies_T.h \
+ $(ACE_ROOT)/ace/Service_Config.h \
+ $(ACE_ROOT)/ace/Service_Config.i \
+ $(ACE_ROOT)/ace/Reactor.h \
+ $(ACE_ROOT)/ace/Handle_Set.h \
+ $(ACE_ROOT)/ace/Handle_Set.i \
+ $(ACE_ROOT)/ace/Timer_Queue.h \
+ $(ACE_ROOT)/ace/Timer_Queue_T.h \
+ $(ACE_ROOT)/ace/Timer_Queue_T.i \
+ $(ACE_ROOT)/ace/Reactor.i \
+ $(ACE_ROOT)/ace/Reactor_Impl.h \
+ $(ACE_ROOT)/ace/Svc_Conf_Tokens.h \
+ $(ACE_ROOT)/ace/Synch_Options.h \
+ $(ACE_ROOT)/ace/Hash_Map_Manager.h \
+ $(ACE_ROOT)/ace/Message_Queue.i \
+ $(ACE_ROOT)/ace/Task_T.i \
+ $(ACE_ROOT)/ace/Timer_Heap_T.h \
+ $(ACE_ROOT)/ace/Timer_Queue_Adapters.h \
+ $(ACE_ROOT)/ace/Timer_Queue_Adapters.i
+.obj/bpr_thread.o .obj/bpr_thread.so .shobj/bpr_thread.o .shobj/bpr_thread.so: bpr_thread.cpp \
+ $(ACE_ROOT)/ace/Auto_Ptr.h \
+ $(ACE_ROOT)/ace/ACE.h \
+ $(ACE_ROOT)/ace/OS.h \
+ $(ACE_ROOT)/ace/inc_user_config.h \
+ $(ACE_ROOT)/ace/config.h \
+ $(ACE_ROOT)/ace/streams.h \
+ $(ACE_ROOT)/ace/Basic_Types.h \
+ $(ACE_ROOT)/ace/Basic_Types.i \
+ $(ACE_ROOT)/ace/OS.i \
+ $(ACE_ROOT)/ace/Trace.h \
+ $(ACE_ROOT)/ace/Log_Msg.h \
+ $(ACE_ROOT)/ace/Log_Record.h \
+ $(ACE_ROOT)/ace/ACE.i \
+ $(ACE_ROOT)/ace/Log_Priority.h \
+ $(ACE_ROOT)/ace/Log_Record.i \
+ $(ACE_ROOT)/ace/Auto_Ptr.i BPR_Drivers.h \
+ $(ACE_ROOT)/ace/Task.h \
+ $(ACE_ROOT)/ace/Service_Object.h \
+ $(ACE_ROOT)/ace/Shared_Object.h \
+ $(ACE_ROOT)/ace/Shared_Object.i \
+ $(ACE_ROOT)/ace/Event_Handler.h \
+ $(ACE_ROOT)/ace/Event_Handler.i \
+ $(ACE_ROOT)/ace/Service_Object.i \
+ $(ACE_ROOT)/ace/Thread_Manager.h \
+ $(ACE_ROOT)/ace/Thread.h \
+ $(ACE_ROOT)/ace/Thread.i \
+ $(ACE_ROOT)/ace/Synch.h \
+ $(ACE_ROOT)/ace/SV_Semaphore_Complex.h \
+ $(ACE_ROOT)/ace/SV_Semaphore_Simple.h \
+ $(ACE_ROOT)/ace/SV_Semaphore_Simple.i \
+ $(ACE_ROOT)/ace/SV_Semaphore_Complex.i \
+ $(ACE_ROOT)/ace/Synch.i \
+ $(ACE_ROOT)/ace/Synch_T.h \
+ $(ACE_ROOT)/ace/Synch_T.i \
+ $(ACE_ROOT)/ace/Atomic_Op.i \
+ $(ACE_ROOT)/ace/Containers.h \
+ $(ACE_ROOT)/ace/Containers.i \
+ $(ACE_ROOT)/ace/Thread_Manager.i \
+ $(ACE_ROOT)/ace/Task.i \
+ $(ACE_ROOT)/ace/Task_T.h \
+ $(ACE_ROOT)/ace/Message_Queue.h \
+ $(ACE_ROOT)/ace/Message_Block.h \
+ $(ACE_ROOT)/ace/Malloc.h \
+ $(ACE_ROOT)/ace/Malloc.i \
+ $(ACE_ROOT)/ace/Malloc_T.h \
+ $(ACE_ROOT)/ace/Free_List.h \
+ $(ACE_ROOT)/ace/Free_List.i \
+ $(ACE_ROOT)/ace/Malloc_T.i \
+ $(ACE_ROOT)/ace/Memory_Pool.h \
+ $(ACE_ROOT)/ace/Signal.h \
+ $(ACE_ROOT)/ace/Signal.i \
+ $(ACE_ROOT)/ace/Object_Manager.h \
+ $(ACE_ROOT)/ace/Object_Manager.i \
+ $(ACE_ROOT)/ace/Managed_Object.h \
+ $(ACE_ROOT)/ace/Managed_Object.i \
+ $(ACE_ROOT)/ace/Mem_Map.h \
+ $(ACE_ROOT)/ace/Mem_Map.i \
+ $(ACE_ROOT)/ace/Memory_Pool.i \
+ $(ACE_ROOT)/ace/Message_Block.i \
+ $(ACE_ROOT)/ace/IO_Cntl_Msg.h \
+ $(ACE_ROOT)/ace/Strategies.h \
+ $(ACE_ROOT)/ace/Strategies_T.h \
+ $(ACE_ROOT)/ace/Service_Config.h \
+ $(ACE_ROOT)/ace/Service_Config.i \
+ $(ACE_ROOT)/ace/Reactor.h \
+ $(ACE_ROOT)/ace/Handle_Set.h \
+ $(ACE_ROOT)/ace/Handle_Set.i \
+ $(ACE_ROOT)/ace/Timer_Queue.h \
+ $(ACE_ROOT)/ace/Timer_Queue_T.h \
+ $(ACE_ROOT)/ace/Timer_Queue_T.i \
+ $(ACE_ROOT)/ace/Reactor.i \
+ $(ACE_ROOT)/ace/Reactor_Impl.h \
+ $(ACE_ROOT)/ace/Svc_Conf_Tokens.h \
+ $(ACE_ROOT)/ace/Synch_Options.h \
+ $(ACE_ROOT)/ace/Hash_Map_Manager.h \
+ $(ACE_ROOT)/ace/Message_Queue.i \
+ $(ACE_ROOT)/ace/Task_T.i \
+ $(ACE_ROOT)/ace/Timer_Heap_T.h \
+ $(ACE_ROOT)/ace/Timer_Queue_Adapters.h \
+ $(ACE_ROOT)/ace/Timer_Queue_Adapters.i \
+ Thread_Bounded_Packet_Relay.h
+
+# IF YOU PUT ANYTHING HERE IT WILL GO AWAY
diff --git a/examples/Bounded_Packet_Relay/README b/examples/Bounded_Packet_Relay/README
new file mode 100644
index 00000000000..c067ffc215f
--- /dev/null
+++ b/examples/Bounded_Packet_Relay/README
@@ -0,0 +1,201 @@
+1. INTRODUCTION
+
+This directory contains an example that illustrates how to use both
+threaded and reactive concurrency mechanisms in ACE. The example
+application schedules and processes heterogenerous user input and
+timer-based events in the context of a bounded packet relay mechanism.
+
+This example is based on the notion of a time-bounded packet relay.
+In this example, a transmission begins, packets arrive from an input
+device object, and are transferred to an output device object by a
+relay object at a specified pace. The transfer continues until all
+packets have been relayed or a duration limit expires, at which point
+the transmission ends.
+
+User input is handled concurrently with a running transmission. You
+can run a transmission, cancel a transmission, change transmission
+parameters, view statistics from the most recent transmission, or exit
+the program, using selections from an interactive text-based menu.
+
+In addition, the example program can be run in batch mode, with the
+appropriate commands piped to the program in place of its standard
+input stream. Commands sent in batch mode must be separated by new
+lines. Menu prompts and error messages are directed to the standard
+error stream. The results of the transmission run and statistics
+reporting commands are directed to the standard output stream, so the
+results can be isolated and directed to a file.
+
+Transmission parameters are intialized to default values.
+Transmission parameter values persist until/unless they are
+subsequently modified by an appropriate command. If an invalid value
+for a command is given, or a run or report command is issued while a
+transmission is in progress, the offending command has no effect, and
+an error message is generated.
+
+2. USER INTERFACE
+
+Commands that can be given to the program include the following:
+
+Settings commands:
+
+ 1 <number of packets to relay in one transmission>
+
+ Minimum value is 1 packet, defaults to 1000 packets.
+
+ 2 <input packet arrival period (in usec)>
+
+ Minimum value is 1 usec, defaults to 500 usec (1/2 msec).
+
+ 3 <output packet transmission period (in usec)>
+
+ Minimum value is 1 usec, defaults to 1000 usec (1 msec).
+
+ 4 <limit on duration of transmission (in usec)>
+
+ Minimum value is 1 usec, defaults to 1500000 usec (1 1/2 sec).
+ A value of 0 is also a valid input, in which case no limit
+ will be placed on the duration of the transmission (it will
+ end when all packets have been relayed from the input device
+ to the output device).
+
+ 5 <logging level>
+
+ 0 - does no logging
+ 1 - reports time each packet arrived at the output device
+ as well as the contents of the message.
+
+Action commands:
+
+ 6 - runs a transmission using the current settings
+
+ 7 - cancels a transmission if there is one running
+
+ 8 - reports statistics from the most recent transmission
+
+ 9 - quits the program
+
+3. APPLICATION DESIGN
+
+3.1. KEY COMPONENTS
+
+The design of this example application consists of four main
+components: the driver object, the relay object, the input device
+object, and the output device object.
+
+The driver object is responsible for receiving user input and overall
+handling of user input commands. The driver object is active, with
+separate threads for receiving user input and managing its
+transmission timer queue. This allows the user to issue commands
+while a transmission is in progress. The driver object uses an
+ACE_Thread_Timer_Queue_Adapter, which is derived from ACE_Task_Base.
+The driver object runs inside another active object, called
+User_Input_Task, which is also derived from ACE_Task_Base. This
+allows both the timer queue and the user input object to be made
+active, running in their own threads of control.
+
+The relay object is passive, managing a message queue and necessary
+locks to allow safe access from multiple threads. It provides methods
+to receive and enqueue a mesage from the input device, dequeue a
+message and send it to the output device, and to start or end a
+transmission. It uses ACE_Message_Queue (which contains ACE_Message_Block
+objects) and ACE_Thread_Mutex objects to implement this functionality.
+
+The input object is active, managing timeouts and input events in its
+own thread. The input object is also reactive, using an ACE_Reactor to
+allow response to multiple input handles as well as to do polling at
+specific timeouts. The input pseudo-device wrapper in this example
+does not make use of input handles and only does timed polling, but
+extending this only requires registering the appropriate input handles
+and handlers with the reactor. The input object is derived from
+ACE_Task_Base, and is activated by the relay object when a new
+transmission starts. The input object packages each data packet in an
+ACE_Message_Block before sending it to the relay object.
+
+The output object is passive. If logging is turned on, it will report
+the arrival time, relative to the start of the transmission, of each
+output message, and the contents of that message. The output object
+will also "consume" each ACE_Message_Block passed to it, calling
+delete on the passed pointer.
+
+3.2. RUN-TIME CHARACTERSITICS
+
+When the user initiates a transmission, the appropriate settings are
+passed by the driver to the relay object's start transmission method.
+The relay object tries to acquire a lock for a new transmission. If
+it fails to do so, another transmission is in progress, and the
+methods returns an error. Otherwise, the relay object's start
+transmission method initializes itself and the input and output device
+objects, activates the input device object, and stores the handle for
+the new input device thread.
+
+The driver then constructs a timeout handler with a count of the
+number of messages to relay and a send timeout value, and pushes a
+timer with this handler onto the timer queue. If there is a limit on
+the duration of the transmission, the driver constructs a different
+handler for end-of- transmission, and pushes a timer for the end of
+the transmission with this handler onto the timer queue as well. When
+the user issues a cancel transmission command, the driver calls the
+relay's end transmission method.
+
+When a send timeout expires, the handler (running in the timer queue
+thread) calls the send method of the relay. If there are any enqueued
+messages from the input device object in its queue, the relay object's
+send method will dequeue the message, pass it to the output device
+object, and return success. If there are no messages in the queue,
+the relay object's send method will return failure. If the send was
+successful, the handler will decrement its count of remaining
+messages. If the count is greater than zero, the handler will
+register a new send timer for itself and exit. If the count is zero,
+then the handler will call the relay's end transmission method, clear
+the timer queue, and release the transmission-in-progress semaphore
+before exiting.
+
+Similarly, if the end-of-transmission timer expires before all packets
+have been sent, that handler will call the relay's end transmission
+method, clear the timer queue, release the semaphore, and then exit.
+
+When the relay's end transmission method is called, it marks the relay
+itself inactive, and calls the input device's deactivate method, which
+sets the input device's activity flag to inactive (see below). The
+relay's end transmission method then waits on the stored input thread
+handle until that thread exits, before returning.
+
+While it is still active, the input device thread blocks on a reactor
+timeout for the duration it was given by the relay. When the timeout
+expires, the input device checks a flag to see if it is still active.
+If the flag says the input device is inactive, the thread simply
+exits. Tis allows cancellation of the input device when a
+transmission has ended. If the flag says it is still active, the
+thread builds a new character buffer, and wraps this with a new
+ACE_Message_Block. The input device passes this message block to the
+execution method of the send input message command object with which
+it was constructed. This level of indirection allows the input device
+to be used with arbitrary types, so that it could for example be
+connected directly to an output device. The input device also
+maintains a count of the number of messages it has sent. Once the
+input device has sent all its messages, it marks itself inactive, and
+its thread simply exits.
+
+4. ACCESSING THE SOURCE CODE
+
+The files for this example are located in
+$ACE_ROOT/examples/Bounded_Packet_Relay in the latest release of ACE,
+which will be located at
+
+http://www.cs.wustl.edu/~schmidt/ACE_wrappers/ACE.tar.gz
+
+when the example application is complete.
+
+Source Files: Thread_Bounded_Packet_Relay.h
+ Thread_Bounded_Packet_Relay.cpp
+ BPR_Driver.h
+ BPR_Driver.cpp
+ bpr_thread.cpp
+
+Make file: Makefile
+
+Doc file: README (this file)
+
+Executable: bpr_thread
+
+
diff --git a/examples/Bounded_Packet_Relay/Thread_Bounded_Packet_Relay.cpp b/examples/Bounded_Packet_Relay/Thread_Bounded_Packet_Relay.cpp
new file mode 100644
index 00000000000..17d9f2d947e
--- /dev/null
+++ b/examples/Bounded_Packet_Relay/Thread_Bounded_Packet_Relay.cpp
@@ -0,0 +1,672 @@
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// examples
+//
+// = FILENAME
+// Thread_Bounded_Packet_Relay.cpp
+//
+// = DESCRIPTION
+// Method definitions for a threaded bounded packet relay
+//
+// = AUTHORS
+// Chris Gill <cdgill@cs.wustl.edu>
+//
+// Based on the threaded timer queue driver by
+//
+// Carlos O'Ryan <coryan@cs.wustl.edu>
+// Douglas C. Schmidt <schmidt@cs.wustl.edu>
+//
+// ============================================================================
+
+ACE_RCSID(Bounded_Packet_Relay, Thread_Bounded_Packet_Relay, "$Id$")
+
+#include "ace/Task.h"
+#include "ace/Timer_Heap_T.h"
+#include "ace/Timer_Queue_Adapters.h"
+
+#include "Thread_Bounded_Packet_Relay.h"
+
+// constructor
+
+template <class RECEIVER, class ACTION>
+Command<RECEIVER, ACTION>::Command (RECEIVER &recvr,
+ ACTION action)
+ : receiver_ (recvr),
+ action_ (action)
+{
+}
+
+// invokes an operation.
+
+template <class RECEIVER, class ACTION> int
+Command<RECEIVER, ACTION>::execute (void *arg)
+{
+ return (receiver_.*action_) (arg);
+}
+
+
+Text_Input_Device_Wrapper::Text_Input_Device_Wrapper (ACE_Thread_Manager * input_task_mgr,
+ size_t read_length, const char* text)
+ : Input_Device_Wrapper_Base (input_task_mgr)
+ , read_length_ (read_length)
+ , text_ (text)
+ , index_ (0)
+{
+}
+ // ctor
+
+Text_Input_Device_Wrapper::~Text_Input_Device_Wrapper ()
+{
+}
+ // dtor
+
+
+ACE_Message_Block *
+Text_Input_Device_Wrapper::create_input_message ()
+{
+ // construct a new message block to send
+ ACE_Message_Block *message;
+ ACE_NEW_RETURN (message,
+ ACE_Message_Block (read_length_),
+ 0);
+
+ // zero out a "read" buffer to hold data
+ char read_buf [BUFSIZ];
+ ACE_OS::memset (read_buf, 0, BUFSIZ);
+
+ // loop through the text, filling in data to copy into
+ // the read buffer (leaving room for a terminating zero)
+ for (size_t i = 0; i < read_length_ - 1; ++i)
+ {
+ read_buf [i] = text_ [index_];
+ index_ = (index_ + 1) % ACE_OS::strlen (text_);
+ }
+
+ // Copy buf into the Message_Block and update the wr_ptr ().
+ if (mb->copy (read_buf, read_length_) < 0)
+ {
+ delete message;
+ ACE_ERROR_RETURN ((LM_ERROR, "read buffer copy failed"), 0);
+ }
+
+ return message;
+}
+ // creates a new message block, carrying data
+ // read from the underlying input device
+
+
+Text_Output_Driver_Wrapper::Text_Output_Driver_Wrapper (int logging)
+ : logging_ (logging)
+{
+}
+ // default ctor
+
+int
+Text_Output_Driver_Wrapper::write_output_message (void *message)
+{
+ ACE_Message_Block *message;
+}
+ // consume and possibly print out the passed message
+
+int
+Text_Output_Driver_Wrapper::modify_device_settings (void *logging)
+{
+ if (logging)
+ {
+ logging_ = * ACE_static_cast (int *, logging);
+ }
+ else
+ {
+ ACE_ERROR_RETURN ((LM_ERROR, "null logging level pointer"), -1);
+ }
+}
+ // modifies device settings based on passed pointer to a u_long
+ // turns logging on if u_long is non-zero, off if u_long is zero,
+ // and does nothing if the pointer is null.
+
+
+User_Input_Task::User_Input_Task (Thread_Timer_Queue *queue,
+ Thread_Bounded_Packet_Relay_Driver &tbprd)
+ : ACE_Task_Base (ACE_Thread_Manager::instance ()),
+ queue_ (queue),
+ usecs_ (ACE_ONE_SECOND_IN_USECS),
+ driver_ (tbprd)
+{
+}
+ // ctor
+
+int
+User_Input_Task::svc (void)
+{
+ for (;;)
+ // call back to the driver's implementation on how to read and
+ // parse input.
+ if (this->driver_.get_next_request () == -1)
+ break;
+
+ // we are done.
+ this->relay_->end_transmission (Bounded_Packet_Relay_Driver::CANCELLED);
+ this->queue_->deactivate ();
+ ACE_DEBUG ((LM_DEBUG, "terminating input thread\n"));
+ return 0;
+}
+ // This method runs the event loop in the new thread.
+
+ // = Some helper methods.
+
+int
+User_Input_Task::set_packet_count (void *argument)
+{
+ if (argument)
+ {
+ packet_count_ = * ACE_static_cast (int *, argument);
+ return 0;
+ }
+
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "User_Input_Task::set_packet_count: null argument"),
+ -1);
+}
+ // set the number of packets for the next transmission.
+
+int
+User_Input_Task::set_arrival_period (void *argument)
+{
+ if (argument)
+ {
+ arrival_period_ = * ACE_static_cast (int *, argument);
+ return 0;
+ }
+
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "User_Input_Task::set_arrival_period: null argument"),
+ -1);
+}
+ // sets the input device packet arrival period (usecs)
+ // for the next transmission.
+
+int
+User_Input_Task::set_send_period (void *argument)
+{
+ if (argument)
+ {
+ send_period_ = * ACE_static_cast (int *, argument);
+ return 0;
+ }
+
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "User_Input_Task::set_send_period: null argument"),
+ -1);
+}
+ // sets the period between output device sends (usecs)
+ // for the next transmission.
+
+int
+User_Input_Task::set_duration_limit (void *argument)
+{
+ if (argument)
+ {
+ duration_limit_ = * ACE_static_cast (int *, argument);
+ return 0;
+ }
+
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "User_Input_Task::set_duration_limit: null argument"),
+ -1);
+}
+ // sets a limit on the transmission duration (usecs)
+
+int
+User_Input_Task::set_logging_level (void *argument)
+{
+ if (argument)
+ {
+ logging_level_ = * ACE_static_cast (int *, argument);
+ return 0;
+ }
+
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "User_Input_Task::set_logging_level: null argument"),
+ -1);
+}
+ // sets logging level (0 or 1) for output device
+ // for the next transmission
+
+int
+User_Input_Task::run_transmission (void *argument)
+{
+ // Macro to avoid "warning: unused parameter" type warning.
+ ACE_UNUSED_ARG (argument);
+
+ if (relay_)
+ {
+ switch (relay_->start_transmission (packet_count_,
+ arrival_period_,
+ logging_level_))
+ {
+ case 1:
+ ACE_OS::fprintf (ACE_STDERR,
+ "\nRun transmission: "
+ "transmission already in progress\n");
+ return 0;
+ /* not reached */
+
+ case 0:
+ {
+ ACE_Time_Value now = ACE_OS::gettimeofday ();
+ ACE_Time_Value send_every (0, send_period_);
+ ACE_Time_Value send_at (send_every + now);
+
+ Send_Handler *send_handler;
+
+ ACE_NEW_RETURN (send_handler,
+ Send_Handler (packet_count_,
+ send_every,
+ *relay_,
+ *queue_),
+ -1);
+
+ if (queue_->schedule (send_handler, 0, send_at) < 0)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "User_Input_Task::run_transmission: "
+ "failed to schedule send handler"),
+ -1);
+ }
+
+ if (duration_limit_)
+ {
+ ACE_Time_Value terminate_at (0, duration_limit_);
+ terminate_at += now;
+
+ Termination_Handler *termination_handler;
+
+ ACE_NEW_RETURN (termination_handler,
+ Termination_Handler (*relay_,
+ *queue_),
+ -1);
+
+ if (queue_->schedule (termination_handler, 0, terminate_at) < 0)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "User_Input_Task::run_transmission: "
+ "failed to schedule termination handler"),
+ -1);
+ }
+ }
+
+ return 0;
+ }
+ /* not reached */
+
+ default:
+ return -1;
+ /* not reached */
+ }
+ }
+
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "User_Input_Task::run_transmission: "
+ "relay not instantiated"),
+ -1);
+}
+ // runs the next transmission (if one is not in progress)
+
+int
+User_Input_Task::end_transmission (void *argument)
+{
+ // Macro to avoid "warning: unused parameter" type warning.
+ ACE_UNUSED_ARG (argument);
+
+ if (relay_)
+ {
+ Bounded_Packet_Relay::Transmission_Status *status;
+
+ status =
+ ACE_static_cast (Bounded_Packet_Relay::Transmission_Status *,
+ argument);
+
+ if (status)
+ {
+ switch (relay_->end_transmission (*status)
+ {
+ case 1:
+ ACE_OS::fprintf (ACE_STDERR,
+ "\nEnd transmission: "
+ "no transmission in progress\n");
+ return 0;
+ /* not reached */
+
+ case 0:
+ // cancel any remaining timers
+ ACE_Timer_Node_T <ACE_Event_Handler *> *node;
+ while ((node = queue_->timer_queue ().get_first ()))
+ {
+ queue->cancel (node->get_timer_id (), 0);
+ }
+ return 0;
+ /* not reached */
+
+ default:
+ return -1;
+ /* not reached */
+ }
+ }
+
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "User_Input_Task::end_transmission: "
+ "null argument"),
+ -1);
+ }
+
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "User_Input_Task::end_transmission: "
+ "relay not instantiated"),
+ -1);
+}
+ // ends the current transmission (if one is in progress)
+
+int
+User_Input_Task::report_stats (void *argument)
+{
+ // Macro to avoid "warning: unused parameter" type warning.
+ ACE_UNUSED_ARG (argument);
+
+ if (relay_)
+ {
+ switch (relay_->report_statistics ())
+ {
+ case 1:
+ ACE_OS::fprintf (ACE_STDERR,
+ "\nRun transmission: "
+ "\transmission already in progress\n");
+ /* fall through to next case */
+
+ case 0:
+ return 0;
+ /* not reached */
+
+ default:
+ return -1;
+ /* not reached */
+ }
+ }
+
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "User_Input_Task::report_stats: "
+ "relay not instantiated"),
+ -1);
+}
+ // reports statistics for the previous transmission
+ // (if one is not in progress)
+
+ int
+User_Input_Task::shutdown (void *argument)
+{
+ // Macro to avoid "warning: unused parameter" type warning.
+ ACE_UNUSED_ARG (argument);
+
+#if !defined (ACE_LACKS_PTHREAD_CANCEL)
+ // Cancel the thread timer queue task "preemptively."
+ ACE_Thread::cancel (this->queue_->thr_id ());
+#else
+ // Cancel the thread timer queue task "voluntarily."
+ this->queue_->deactivate ();
+#endif /* ACE_LACKS_PTHREAD_CANCEL */
+
+ // -1 indicates we are shutting down the application.
+ return -1;
+}
+ // Shutdown task.
+
+
+Send_Handler::Send_Handler (u_long send_count,
+ const ACE_Time_Value &duration,
+ Bounded_Packet_Relay<ACE_Thread_Mutex> &relay,
+ Thread_Timer_Queue &queue)
+ : send_count_ (send_count)
+ , duration_ (duration)
+ , relay_ (relay)
+ , queue_ (queue)
+{
+}
+ // ctor
+
+Send_Handler::~Send_Handler (void)
+{
+}
+ // dtor
+
+int
+Send_Handler::handle_timeout (const ACE_Time_Value &current_time,
+ const void *arg)
+{
+ switch (relay_->send_input ())
+ {
+ case 0:
+ // decrement count of packets to relay
+ --send_count_;
+ /* fall through to next case */
+ case 1:
+ if (send_count_ > 0)
+ {
+ // re-register the handler for a new timeout
+ if (queue_->schedule (this, 0,
+ duration_ + ACE_OS::gettimeofday ()) < 0)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Send_Handler::handle_timeout: "
+ "failed to reschedule send handler"),
+ -1);
+ }
+ return 0;
+ }
+ else
+ {
+ // all packets are sent, time to cancel any other
+ // timers, end the transmission, and go away
+ ACE_Timer_Node_T <ACE_Event_Handler *> *node;
+ while ((node = queue_->timer_queue ().get_first ()))
+ {
+ queue->cancel (node->get_timer_id (), 0);
+ }
+ relay_->end_transmission (Bounded_Packet_Relay::COMPLETED);
+ delete this;
+ return 0;
+ }
+ /* not reached */
+ default:
+ return -1;
+ }
+}
+ // Call back hook.
+
+int
+Send_Handler::cancelled (void)
+{
+ delete this;
+ return 0;
+}
+ // Cancellation hook
+
+
+Termination_Handler::Termination_Handler (Bounded_Packet_Relay<ACE_Thread_Mutex> &relay,
+ Thread_Timer_Queue &queue)
+ : relay_ (relay)
+ , queue_ (queue)
+{
+}
+ // ctor
+
+Termination_Handler::~Termination_Handler (void)
+{
+}
+ // dtor
+
+int
+Termination_Handler::handle_timeout (const ACE_Time_Value &current_time,
+ const void *arg)
+{
+ // transmission timed out, cancel any other
+ // timers, end the transmission, and go away
+ ACE_Timer_Node_T <ACE_Event_Handler *> *node;
+ while ((node = queue_->timer_queue ().get_first ()))
+ {
+ queue->cancel (node->get_timer_id (), 0);
+ }
+ relay_->end_transmission (Bounded_Packet_Relay::TIMED_OUT);
+ delete this;
+ return 0;
+}
+ // Call back hook.
+
+int
+Termination_Handler::cancelled (void)
+{
+ delete this;
+ return 0;
+}
+ // Cancellation hook
+
+Thread_Bounded_Packet_Relay_Driver::Thread_Bounded_Packet_Relay_Driver (void);
+ : input_task_ (&timer_queue_, *this)
+{
+}
+// ctor
+
+Thread_Bounded_Packet_Relay_Driver::~Thread_Bounded_Packet_Relay_Driver (void);
+{
+}
+// dtor
+
+int
+Thread_Bounded_Packet_Relay_Driver::display_menu (void);
+{
+ static char menu[] =
+ "\n\n Options:\n"
+ " -------\n"
+ " 1 <number of packets to relay in one transmission>\n"
+ " min = 1 packet, default = 1000.\n"
+ " 2 <input packet arrival period (in usec)>\n"
+ " min = 1, default = 500.\n"
+ " 3 <output packet transmission period (in usec)>\n"
+ " min = 1, default = 1000.\n"
+ " 4 <limit on duration of transmission (in usec)>\n"
+ " min = 1, default = 1500000, no limit = 0.\n"
+ " 5 <logging level>\n"
+ " no logging = 0 (default), logging = 1.\n"
+ " 6 - run a transmission using the current settings\n"
+ " 7 - cancel transmission (if there is one running)\n"
+ " 8 - report statistics from the most recent transmission\n"
+ " 9 - quit the program\n";
+
+ ACE_OS::fprintf(ACE_STDERR, "%s", menu);
+
+ return 0;
+}
+// display the user menu
+
+int
+Thread_Bounded_Packet_Relay_Driver::init (void);
+{
+ typedef Command<Input_Task, Input_Task::ACTION> CMD;
+
+ // initialize the <Command> objects with their corresponding
+ // methods from <Input_Task>
+ ACE_NEW_RETURN (packet_count_cmd_,
+ CMD (input_task_, &User_Input_Task::set_packet_count),
+ -1);
+
+ ACE_NEW_RETURN (arrival_period_cmd_,
+ CMD (input_task_, &User_Input_Task::set_arrival_period),
+ -1);
+
+ ACE_NEW_RETURN (transmit_period_cmd_,
+ CMD (input_task_, &User_Input_Task::set_send_period),
+ -1);
+
+ ACE_NEW_RETURN (duration_limit_cmd_,
+ CMD (input_task_, &User_Input_Task::set_duration_limit),
+ -1);
+
+ ACE_NEW_RETURN (logging_level_cmd_,
+ CMD (input_task_, &User_Input_Task::set_logging_level),
+ -1);
+
+ ACE_NEW_RETURN (run_transmission_cmd_,
+ CMD (input_task_, &User_Input_Task::run_transmission),
+ -1);
+
+ ACE_NEW_RETURN (cancel_transmission_cmd_,
+ CMD (input_task_, &User_Input_Task::end_transmission),
+ -1);
+
+ ACE_NEW_RETURN (report_stats_cmd_,
+ CMD (input_task_, &User_Input_Task::report_stats),
+ -1);
+
+ ACE_NEW_RETURN (shutdown_cmd_,
+ CMD (input_task_, &User_Input_Task::shutdown),
+ -1);
+
+ if (this->input_task_.activate () == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "cannot activate input task"), -1);
+
+ if (this->timer_queue_.activate () == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "cannot activate timer queue"), -1);
+
+ if (ACE_Thread_Manager::instance ()->wait () == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "wait on Thread_Manager failed"),-1);
+
+ return 0;
+}
+// initialize the driver
+
+int
+Thread_Bounded_Packet_Relay_Driver::run (void);
+{
+ this->init ();
+ return 0;
+}
+// run the driver
+
+
+#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
+template class ACE_Thread_Timer_Queue_Adapter<Timer_Heap>;
+template class Bounded_Packet_Relay_Driver<Thread_Timer_Queue,
+ Input_Task,
+ Input_Task::ACTION>;
+template class Command<Input_Task, Input_Task::ACTION>;
+#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
+#pragma instantiate ACE_Thread_Timer_Queue_Adapter<Timer_Heap>
+#pragma instantiate Bounded_Packet_Relay_Driver<Thread_Timer_Queue, \
+ Input_Task, \
+ Input_Task::ACTION>
+#pragma instantiate Command<Input_Task, Input_Task::ACTION>
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
+
+#if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0)
+// These templates will specialized in libACE.* if the platforms does
+// not define ACE_MT_SAFE.
+
+#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
+template class ACE_Thread_Condition<ACE_Thread_Mutex>;
+template class ACE_Condition<ACE_Thread_Mutex>;
+template class ACE_Event_Handler_Handle_Timeout_Upcall<ACE_Null_Mutex>;
+template class ACE_Timer_Queue_T<ACE_Event_Handler *, ACE_Event_Handler_Handle_Timeout_Upcall<ACE_Null_Mutex>, ACE_Null_Mutex>;
+template class ACE_Timer_Heap_T<ACE_Event_Handler *, ACE_Event_Handler_Handle_Timeout_Upcall<ACE_Null_Mutex>, ACE_Null_Mutex>;
+template class ACE_Timer_Heap_Iterator_T<ACE_Event_Handler *, ACE_Event_Handler_Handle_Timeout_Upcall<ACE_Null_Mutex>, ACE_Null_Mutex>;
+template class ACE_Timer_Queue_Iterator_T<ACE_Event_Handler *, ACE_Event_Handler_Handle_Timeout_Upcall<ACE_Null_Mutex>, ACE_Null_Mutex>;
+#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
+#pragma instantiate ACE_Thread_Condition<ACE_Thread_Mutex>
+#pragma instantiate ACE_Condition<ACE_Thread_Mutex>
+#pragma instantiate ACE_Event_Handler_Handle_Timeout_Upcall<ACE_Null_Mutex>
+#pragma instantiate ACE_Timer_Queue_T<ACE_Event_Handler *, ACE_Event_Handler_Handle_Timeout_Upcall<ACE_Null_Mutex>, ACE_Null_Mutex>
+#pragma instantiate ACE_Timer_Heap_T<ACE_Event_Handler *, ACE_Event_Handler_Handle_Timeout_Upcall<ACE_Null_Mutex>, ACE_Null_Mutex>
+#pragma instantiate ACE_Timer_Heap_Iterator_T<ACE_Event_Handler *, ACE_Event_Handler_Handle_Timeout_Upcall<ACE_Null_Mutex>, ACE_Null_Mutex>
+#pragma instantiate ACE_Timer_Queue_Iterator_T<ACE_Event_Handler *, ACE_Event_Handler_Handle_Timeout_Upcall<ACE_Null_Mutex>, ACE_Null_Mutex>
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
+#endif /* ACE_MT_SAFE */
diff --git a/examples/Bounded_Packet_Relay/Thread_Bounded_Packet_Relay.h b/examples/Bounded_Packet_Relay/Thread_Bounded_Packet_Relay.h
new file mode 100644
index 00000000000..32bff6f8990
--- /dev/null
+++ b/examples/Bounded_Packet_Relay/Thread_Bounded_Packet_Relay.h
@@ -0,0 +1,359 @@
+/* -*- C++ -*- */
+
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// examples
+//
+// = FILENAME
+// Thread_Bounded_Packet_Relay.h
+//
+// = DESCRIPTION
+// This code exercises the <ACE_Thread_Timer_Queue_Adapter> using
+// an <ACE_Timer_Heap_T>.
+//
+// = AUTHORS
+// Carlos O'Ryan <coryan@cs.wustl.edu> and
+// Sergio Flores-Gaitan <sergio@cs.wustl.edu>
+//
+// ============================================================================
+
+#if !defined (_THREAD_BOUNDED_PACKET_RELAY_H_)
+#define _THREAD_BOUNDED_PACKET_RELAY_H_
+
+#include "ace/Task.h"
+#include "ace/Timer_Heap_T.h"
+#include "ace/Timer_Queue_Adapters.h"
+#include "BPR_Drivers.h"
+
+// These typedefs ensure that we use the minimal amount of locking
+// necessary.
+typedef ACE_Event_Handler_Handle_Timeout_Upcall<ACE_Null_Mutex>
+ Upcall;
+typedef ACE_Timer_Heap_T<ACE_Event_Handler *,
+ Upcall,
+ ACE_Null_Mutex>
+ Timer_Heap;
+typedef ACE_Timer_Heap_Iterator_T<ACE_Event_Handler *,
+ Upcall,
+ ACE_Null_Mutex>
+ Timer_Heap_Iterator;
+typedef ACE_Thread_Timer_Queue_Adapter<Timer_Heap>
+ Thread_Timer_Queue;
+
+// Forward declaration.
+class Thread_Bounded_Packet_Relay_Driver;
+
+template <class RECEIVER, class ACTION>
+class Command : public Command_Base
+ // = TITLE
+ // Defines an abstract class that allows us to invoke commands
+ // without knowing anything about the implementation. This class
+ // is used in the <Bounded_Packet_Relay_Driver> to invoke operations
+ // of the driver.
+ //
+ // = DESCRIPTION
+ // This class declares an interface to execute operations,
+ // binding a RECEIVER object with an ACTION. The RECEIVER knows
+ // how to implement the operation. A class can invoke operations
+ // without knowing anything about it, or how it was implemented.
+{
+public:
+
+ Command (RECEIVER &recvr, ACTION action);
+ // Sets the <receiver_> of the Command to recvr, and the
+ // <action_> of the Command to <action>.
+
+ virtual int execute (void *arg);
+ // Invokes the method <action_> from the object <receiver_>.
+
+private:
+ RECEIVER &receiver_;
+ // object where the method resides.
+
+ ACTION action_;
+ // method that is going to be invoked.
+};
+
+
+class Text_Input_Device_Wrapper : public Input_Device_Wrapper_Base
+ // = TITLE
+ // Defines a wrapper for a simple active looping text input
+ // pseudo-device.
+ //
+ // = DESCRIPTION
+ // The wrapper is an active object, running in its own thread, and uses a
+ // reactor to generate timeouts. When a timeout occurs, the wrapper
+ // calls its concrete message creation method. The wrapper then calls
+ // its base class message send method to forward the message to the
+ // receiver.
+ //
+ // A more sophisticated version of this class would use the reactive
+ // capabilities as well as the timeout generating capabilities of the
+ // reactor, multiplexing several input streams. Comments to this
+ // effect appear in the definition of the event loop method.
+{
+ public:
+
+ Text_Input_Device_Wrapper (ACE_Thread_Manager * input_task_mgr, size_t read_length, const char* text);
+ // ctor
+
+ ~Text_Input_Device_Wrapper ();
+ // dtor
+
+protected:
+
+ virtual ACE_Message_Block *create_input_message ();
+ // creates a new message block, carrying data
+ // read from the underlying input device
+
+private:
+
+ size_t read_length_;
+ // length of the buffer into which to "read"
+
+ const char *text_;
+ // text to "read" into the buffer
+
+ size_t index_;
+ // index into the string
+};
+
+
+class Text_Output_Driver_Wrapper : public Output_Device_Wrapper_Base
+ // = TITLE
+ // Implements a simple wrapper for a output pseudo-device.
+ //
+ // = DESCRIPTION
+ // Data from the passed output message is printed to the standard
+ // output stream, if logging is turned on.
+{
+public:
+
+ Text_Output_Driver_Wrapper (int logging = 0);
+ // default ctor
+
+ // = Command Accessible Entry Points
+
+ virtual int write_output_message (void *message);
+ // consume and possibly print out the passed message
+
+ virtual int modify_device_settings (void *logging);
+ // modifies device settings based on passed pointer to a u_long
+ // turns logging on if u_long is non-zero, off if u_long is zero,
+ // and does nothing if the pointer is null.
+
+private:
+
+ int logging_;
+ // 0 if logging is turned off, non-zero otherwise
+};
+
+
+class User_Input_Task : public ACE_Task_Base
+{
+ // = TITLE
+ // Read user actions on the Timer_Queue from stdin.
+ //
+ // = DESCRIPTION
+ // This class reads user input from stdin; those commands permit
+ // the control of a Timer_Queue, which is dispatched by another
+ // thread.
+public:
+
+ typedef int (User_Input_Task::*ACTION) (void *);
+ // trait for command accessible entry points
+
+ User_Input_Task (Thread_Timer_Queue *queue,
+ Thread_Bounded_Packet_Relay_Driver &timer_queue_driver);
+ // ctor
+
+ virtual int svc (void);
+ // This method runs the event loop in the new thread.
+
+ // = Some helper methods.
+
+ int set_packet_count (void *);
+ // set the number of packets for the next transmission.
+
+ int set_arrival_period (void *);
+ // sets the input device packet arrival period (usecs)
+ // for the next transmission.
+
+ int set_send_period (void *);
+ // sets the period between output device sends (usecs)
+ // for the next transmission.
+
+ int set_duration_limit (void *);
+ // sets a limit on the transmission duration (usecs)
+
+ int set_logging_level (void *);
+ // sets logging level (0 or 1) for output device
+ // for the next transmission
+
+ int run_transmission (void *);
+ // runs the next transmission (if one is not in progress)
+
+ int end_transmission (void *);
+ // ends the current transmission (if one is in progress)
+
+ int report_stats (void *);
+ // reports statistics for the previous transmission
+ // (if one is not in progress)
+
+ int shutdown (void *);
+ // Shutdown task.
+
+private:
+
+ const int usecs_;
+ // How many microseconds are in a second.
+
+ Bounded_Packet_Relay<ACE_Thread_Mutex> *relay_;
+ // the bounded packet relay
+
+ Thread_Timer_Queue *queue_;
+ // The timer queue implementation.
+
+ Thread_Bounded_Packet_Relay_Driver &driver_;
+ // The thread timer queue test driver.
+
+ u_long packet_count_;
+ // count of packets to send in a transmission
+
+ u_long arrival_period_;
+ // rate at which input packets are to arrive
+
+ u_long send_period_;
+ // rate at which packets are to be relayed (usec)
+
+ u_long duration_limit_;
+ // limit on the duration of the transmission (usec)
+
+ u_long logging_level_;
+ // logging level
+
+};
+
+
+
+class Send_Handler : public ACE_Event_Handler
+{
+ // = TITLE
+ // Event handler for message send timeout events.
+ //
+ // = DESCRIPTION
+ // The <handle_timeout> hook method calls the relay's send
+ // method and decrements its count of messages to send.
+ // If there are still messages to send, it re-registers itself
+ // with the timer queue. Otherwise it calls the relay's end
+ // transmission method, clears the timer queue, and then deletes "this".
+public:
+ Send_Handler (u_long send_count,
+ const ACE_Time_Value &duration,
+ Bounded_Packet_Relay<ACE_Thread_Mutex> &relay,
+ Thread_Timer_Queue &queue);
+ // ctor
+
+ ~Send_Handler (void);
+ // dtor
+
+ virtual int handle_timeout (const ACE_Time_Value &current_time,
+ const void *arg);
+ // Call back hook.
+
+ virtual int cancelled (void);
+ // Cancellation hook
+
+private:
+
+ u_long send_count_;
+ // Count of the number of messages to send from the
+ // relay object to the output device object.
+
+ ACE_Time_Value duration_;
+ // Store the expected duration until expiration, it is used to
+ // re-register the handler if there are still sends to perform.
+
+ Bounded_Packet_Relay<ACE_Thread_Mutex> &relay_;
+ // Store a reference to the relay object on which to invoke
+ // the appropritate calls when the timer expires
+
+ Thread_Timer_Queue &queue_;
+ // Store a reference to the timer queue, in which we'll re-register
+ // ourselves if there are still sends to perform
+
+};
+
+class Termination_Handler : public ACE_Event_Handler
+{
+ // = TITLE
+ // Event handler for end transmission timeout events.
+ //
+ // = DESCRIPTION
+ // The <handle_timeout> hook method calls the relay's end
+ // transmission method, and then deletes "this".
+public:
+ Termination_Handler (Bounded_Packet_Relay<ACE_Thread_Mutex> &relay,
+ Thread_Timer_Queue &queue);
+ // ctor
+
+ ~Termination_Handler (void);
+ // dtor
+
+ virtual int handle_timeout (const ACE_Time_Value &current_time,
+ const void *arg);
+ // Call back hook.
+
+ virtual int cancelled (void);
+ // Cancellation hook
+
+private:
+
+ Bounded_Packet_Relay<ACE_Thread_Mutex> &relay_;
+ // Store a reference to the relay object on which to invoke
+ // the end transmission call when the timer expires
+
+ Thread_Timer_Queue &queue_;
+ // Store a reference to the timer queue, which we'll
+ // clear of all timers when this one expires.
+
+};
+
+class Thread_Bounded_Packet_Relay_Driver : public Bounded_Packet_Relay_Driver <Thread_Timer_Queue>
+{
+ // = TITLE
+ // Implements an example application that exercises
+ // <Thread_Timer_Queue> timer queue.
+ //
+ // = DESCRIPTION
+ // This class implements a simple test driver for the
+ // <Thread_Timer_Queue>. The <display_menu> hook method is
+ // called from the base class to print a menu specific to the
+ // thread implementation of the timer queue.
+public:
+
+ Thread_Bounded_Packet_Relay_Driver (void);
+ // ctor
+
+ ~Thread_Bounded_Packet_Relay_Driver (void);
+ // dtor
+
+ virtual int display_menu (void);
+ // display the user menu
+
+ virtual int init (void);
+ // initialize the driver
+
+ virtual int run (void);
+ // run the driver
+
+private:
+
+ User_Input_Task input_task_;
+ // Subclassed from ACE_Task.
+};
+
+#endif /* _THREAD_BOUNDED_PACKET_RELAY_H_ */
diff --git a/examples/Bounded_Packet_Relay/bpr_thread.cpp b/examples/Bounded_Packet_Relay/bpr_thread.cpp
new file mode 100644
index 00000000000..dbc01b5e74c
--- /dev/null
+++ b/examples/Bounded_Packet_Relay/bpr_thread.cpp
@@ -0,0 +1,51 @@
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// examples
+//
+// = FILENAME
+// bpr_thread.cpp
+//
+// = DESCRIPTION
+// Exercises drivers for a bounded packet relay, based on threaded timer queues.
+//
+// = AUTHORS
+// Chris Gill <cdgill@cs.wustl.edu>
+//
+// Based on the Timer_Queue_Test example written by:
+//
+// Douglas Schmidt <schmidt@cs.wustl.edu> &&
+// Sergio Flores-Gaitan <sergio@cs.wustl.edu>
+//
+// ============================================================================
+
+#include "ace/Auto_Ptr.h"
+#include "Thread_Bounded_Packet_Relay.h"
+
+ACE_RCSID(Bounded_Packet_Relay, bpr_thread, "$Id$")
+
+typedef Bounded_Packet_Relay_Driver<Thread_Timer_Queue>
+ THREAD_BOUNDED_PACKET_RELAY_DRIVER;
+
+int
+main (int, char *[])
+{
+ // Auto ptr ensures that the driver memory is released
+ // automatically.
+ THREAD_BOUNDED_PACKET_RELAY_DRIVER *tbprd;
+ ACE_NEW_RETURN (tbprd, Thread_Bounded_Packet_Relay_Driver, -1);
+
+ auto_ptr <THREAD_BOUNDED_PACKET_RELAY_DRIVER> driver (tbprd);
+
+ return driver->run ();
+}
+
+#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
+template class auto_ptr <THREAD_BOUNDED_PACKET_RELAY_DRIVER>;
+template class ACE_Auto_Basic_Ptr <THREAD_BOUNDED_PACKET_RELAY_DRIVER>;
+#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
+#pragma instantiate auto_ptr <THREAD_BOUNDED_PACKET_RELAY_DRIVER>
+#pragma instantiate ACE_Auto_Basic_Ptr <THREAD_BOUNDED_PACKET_RELAY_DRIVER>
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */