summaryrefslogtreecommitdiff
path: root/ACE/examples/Bounded_Packet_Relay
diff options
context:
space:
mode:
Diffstat (limited to 'ACE/examples/Bounded_Packet_Relay')
-rw-r--r--ACE/examples/Bounded_Packet_Relay/.cvsignore2
-rw-r--r--ACE/examples/Bounded_Packet_Relay/BPR_Drivers.cpp535
-rw-r--r--ACE/examples/Bounded_Packet_Relay/BPR_Drivers.h290
-rw-r--r--ACE/examples/Bounded_Packet_Relay/BPR_Drivers_T.cpp322
-rw-r--r--ACE/examples/Bounded_Packet_Relay/BPR_Drivers_T.h178
-rw-r--r--ACE/examples/Bounded_Packet_Relay/Bounded_Packet_Relay.mpc7
-rw-r--r--ACE/examples/Bounded_Packet_Relay/Makefile.am39
-rw-r--r--ACE/examples/Bounded_Packet_Relay/README194
-rw-r--r--ACE/examples/Bounded_Packet_Relay/Thread_Bounded_Packet_Relay.cpp770
-rw-r--r--ACE/examples/Bounded_Packet_Relay/Thread_Bounded_Packet_Relay.h400
-rw-r--r--ACE/examples/Bounded_Packet_Relay/bpr_thread.cpp122
11 files changed, 2859 insertions, 0 deletions
diff --git a/ACE/examples/Bounded_Packet_Relay/.cvsignore b/ACE/examples/Bounded_Packet_Relay/.cvsignore
new file mode 100644
index 00000000000..2f50e8efaee
--- /dev/null
+++ b/ACE/examples/Bounded_Packet_Relay/.cvsignore
@@ -0,0 +1,2 @@
+bpr_thread
+bpr_thread
diff --git a/ACE/examples/Bounded_Packet_Relay/BPR_Drivers.cpp b/ACE/examples/Bounded_Packet_Relay/BPR_Drivers.cpp
new file mode 100644
index 00000000000..fb646962004
--- /dev/null
+++ b/ACE/examples/Bounded_Packet_Relay/BPR_Drivers.cpp
@@ -0,0 +1,535 @@
+// $Id$
+
+// ============================================================================
+// = LIBRARY
+// examples
+//
+// = FILENAME
+// BPR_Driver.cpp
+//
+// = DESCRIPTION
+// This code builds an abstraction to factor out common code for
+// the different implementations of the Timer_Queue.
+//
+// = AUTHORS
+// Chris Gill <cdgill@cs.wustl.edu> and
+// Douglas C. Schmidt <schmidt@cs.wustl.edu>
+//
+// Based on the Timer Queue Test example written by
+//
+// Carlos O'Ryan <coryan@cs.wustl.edu> and
+// Douglas C. Schmidt <schmidt@cs.wustl.edu> and
+// Sergio Flores-Gaitan <sergio@cs.wustl.edu>
+//
+// ============================================================================
+
+#include "ace/OS_NS_sys_time.h"
+#include "BPR_Drivers.h"
+
+ACE_RCSID(Bounded_Packet_Relay, BPR_Drivers, "$Id$")
+
+// Constructor.
+
+Input_Device_Wrapper_Base::Input_Device_Wrapper_Base (ACE_Thread_Manager *input_task_mgr)
+ : ACE_Task_Base (input_task_mgr),
+ send_input_msg_cmd_ (0),
+ input_period_ (ACE_ONE_SECOND_IN_USECS),
+ is_active_ (0),
+ send_count_ (0)
+{
+}
+
+// Destructor.
+
+Input_Device_Wrapper_Base::~Input_Device_Wrapper_Base (void)
+{
+}
+
+// Sets send input message command in the input device driver object.
+
+int
+Input_Device_Wrapper_Base::set_send_input_msg_cmd (ACE_Command_Base *send_input_msg_cmd)
+{
+ // Set the new command. Input device is not responsible
+ // for deleting the old command, if any.
+ send_input_msg_cmd_ = send_input_msg_cmd;
+ return 0;
+}
+
+// Sets period between when input messages are produced.
+
+int
+Input_Device_Wrapper_Base::set_input_period (u_long input_period)
+{
+ input_period_ = input_period;
+ return 0;
+}
+
+// Sets count of messages to send.
+
+int
+Input_Device_Wrapper_Base::set_send_count (long count)
+{
+ send_count_ = count;
+ return 0;
+}
+
+// Request that the input device stop sending messages
+// and terminate its thread. Should return 1 if it will do so, 0
+// if it has already done so, or -1 if there is a problem doing so.
+
+int
+Input_Device_Wrapper_Base::request_stop (void)
+{
+ if (is_active_)
+ {
+ is_active_ = 0;
+ return 1;
+ }
+
+ return 0;
+}
+
+// This method runs the input device loop in the new thread.
+
+int
+Input_Device_Wrapper_Base::svc (void)
+{
+ ACE_Time_Value timeout;
+ ACE_Message_Block *message;
+
+ // Set a flag to indicate we're active.
+ is_active_ = 1;
+
+ // Start with the total count of messages to send.
+ for (current_count_ = send_count_;
+ // While we're still marked active, and there are packets to send.
+ (is_active_) && (current_count_ != 0);
+ )
+ {
+ // Create an input message to send.
+ message = create_input_message ();
+ if (message == 0)
+ {
+ if (is_active_)
+ {
+ is_active_ = 0;
+ ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n",
+ "Failed to create input message object"),
+ -1);
+ }
+
+ break;
+ }
+
+ // Make sure there is a send command object.
+ if (send_input_msg_cmd_ == 0)
+ {
+ delete message;
+ if (is_active_)
+ {
+ is_active_ = 0;
+ ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n",
+ "send message command object not instantiated"),
+ -1);
+ }
+
+ break;
+ }
+
+ // Send the input message.
+ if (send_input_msg_cmd_->execute ((void *) message) < 0)
+ {
+ delete message;
+ if (is_active_)
+ {
+ is_active_ = 0;
+ ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n",
+ "Failed executing send message command object"),
+ -1);
+ }
+
+ break;
+ }
+
+ // If all went well, decrement count of messages to send, and
+ // run the reactor event loop unti we get a timeout or something
+ // happens in a registered upcall.
+ if (current_count_ > 0)
+ --current_count_;
+
+ timeout = ACE_Time_Value (0, input_period_);
+ reactor_.run_event_loop (timeout);
+ }
+
+ is_active_ = 0;
+
+ return 0;
+}
+
+// Sends a newly created message block, carrying data read from the
+// underlying input device, by passing a pointer to the message block
+// to its command execution.
+
+int
+Input_Device_Wrapper_Base::send_input_message (ACE_Message_Block *amb)
+{
+ if (send_input_msg_cmd_)
+ return send_input_msg_cmd_->execute ((void *) amb);
+ else
+ {
+ if (is_active_)
+ ACE_ERROR ((LM_ERROR, "%t %p\n",
+ "Input_Device_Wrapper_Base::send_input_message: "
+ "command object not instantiated"));
+
+ return -1;
+ }
+}
+
+Output_Device_Wrapper_Base::~Output_Device_Wrapper_Base (void)
+{
+}
+
+// Constructor.
+
+Bounded_Packet_Relay::Bounded_Packet_Relay (ACE_Thread_Manager *input_task_mgr,
+ Input_Device_Wrapper_Base *input_wrapper,
+ Output_Device_Wrapper_Base *output_wrapper)
+ : is_active_ (0),
+ input_task_mgr_ (input_task_mgr),
+ input_wrapper_ (input_wrapper),
+ output_wrapper_ (output_wrapper),
+ queue_ (Bounded_Packet_Relay::DEFAULT_HWM,
+ Bounded_Packet_Relay::DEFAULT_LWM),
+ queue_hwm_ (Bounded_Packet_Relay::DEFAULT_HWM),
+ queue_lwm_ (Bounded_Packet_Relay::DEFAULT_LWM),
+ transmission_number_ (0),
+ packets_sent_ (0),
+ status_ (Bounded_Packet_Relay::UN_INITIALIZED),
+ transmission_start_ (ACE_Time_Value::zero),
+ transmission_end_ (ACE_Time_Value::zero)
+{
+ if (input_task_mgr_ == 0)
+ input_task_mgr_ = ACE_Thread_Manager::instance ();
+}
+
+// Destructor.
+
+Bounded_Packet_Relay::~Bounded_Packet_Relay (void)
+{
+ // Reactivate the queue, and then clear it.
+ queue_.activate ();
+ while (! queue_.is_empty ())
+ {
+ ACE_Message_Block *msg;
+ queue_.dequeue_head (msg);
+ delete msg;
+ }
+
+}
+
+// Requests output be sent to output device.
+
+int
+Bounded_Packet_Relay::send_input (void)
+{
+ // Don't block, return immediately if queue is empty.
+ ACE_Message_Block *item;
+
+ // Using a separate (non-const) time value
+ // is necessary on some platforms
+ ACE_Time_Value immediate (ACE_Time_Value::zero);
+
+ if (queue_.dequeue_head (item,
+ &immediate) < 0)
+ return 1;
+
+ // If a message block was dequeued, send it to the output device.
+
+ if (output_wrapper_->write_output_message ((void *) item) < 0)
+ {
+ if (is_active_)
+ ACE_ERROR ((LM_ERROR,
+ "%t %p\n",
+ "failed to write to output device object"));
+
+ return -1;
+ }
+
+ // If all went OK, increase count of packets sent.
+ ++packets_sent_;
+ return 0;
+}
+
+// Requests a transmission be started.
+
+int
+Bounded_Packet_Relay::start_transmission (u_long packet_count,
+ u_long arrival_period,
+ int logging_level)
+{
+ // Serialize access to start and end transmission calls, statistics
+ // reporting calls.
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->transmission_lock_, -1);
+
+ // If a transmission is already in progress, just return.
+ if (is_active_)
+ return 1;
+
+ // Set transmission in progress flag true.
+ is_active_ = 1;
+
+ // Update statistics for a new transmission.
+ ++transmission_number_;
+ packets_sent_ = 0;
+ status_ = STARTED;
+ transmission_start_ = ACE_OS::gettimeofday ();
+
+ // Reactivate the queue, and then clear it.
+ queue_.activate ();
+ while (! queue_.is_empty ())
+ {
+ ACE_Message_Block *msg;
+ queue_.dequeue_head (msg);
+ delete msg;
+ }
+
+ // Initialize the output device.
+ if (output_wrapper_->modify_device_settings ((void *) &logging_level) < 0)
+ {
+ status_ = ERROR_DETECTED;
+ transmission_end_ = ACE_OS::gettimeofday ();
+ is_active_ = 0;
+ ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n",
+ "failed to initialize output device object"),
+ -1);
+ }
+ // Initialize the input device.
+ if (input_wrapper_->modify_device_settings ((void *) &logging_level) < 0)
+ {
+ status_ = ERROR_DETECTED;
+ transmission_end_ = ACE_OS::gettimeofday ();
+ is_active_ = 0;
+ ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n",
+ "failed to initialize output device object"),
+ -1);
+ }
+ else if (input_wrapper_->set_input_period (arrival_period) < 0)
+ {
+ status_ = ERROR_DETECTED;
+ transmission_end_ = ACE_OS::gettimeofday ();
+ is_active_ = 0;
+ ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n",
+ "failed to initialize input device object"),
+ -1);
+ }
+ else if (input_wrapper_->set_send_count (packet_count) < 0)
+ {
+ status_ = ERROR_DETECTED;
+ transmission_end_ = ACE_OS::gettimeofday ();
+ is_active_ = 0;
+ ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n",
+ "failed to initialize input device object"),
+ -1);
+ }
+ // Activate the input device.
+ else if (input_wrapper_->activate () < 0)
+ {
+ status_ = ERROR_DETECTED;
+ transmission_end_ = ACE_OS::gettimeofday ();
+ is_active_ = 0;
+ ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n",
+ "failed to activate input device object"),
+ -1);
+ }
+
+ // If all went well, print a startup message and return success.
+ ACE_DEBUG ((LM_DEBUG,
+ "\n\nTransmission %u started\n\n",
+ transmission_number_));
+ return 0;
+}
+
+// Requests a transmission be ended.
+
+int
+Bounded_Packet_Relay::end_transmission (Transmission_Status status)
+{
+ // Serialize access to start and end transmission calls,
+ // statistics reporting calls.
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->transmission_lock_, -1);
+
+ // If a transmission is not already in progress, just return.
+ if (! is_active_)
+ return 1;
+
+ // Set transmission in progress flag false.
+ is_active_ = 0;
+
+ // Ask the the input thread to stop.
+ if (input_wrapper_->request_stop () < 0)
+ {
+ status_ = ERROR_DETECTED;
+ transmission_end_ = ACE_OS::gettimeofday ();
+ ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n",
+ "failed asking input device thread to stop"),
+ -1);
+ }
+
+ // Deactivate the queue, allowing all waiting threads to continue.
+ queue_.deactivate ();
+
+ // Wait for input thread to stop.
+ input_task_mgr_->wait_task (input_wrapper_);
+
+ // Reactivate the queue, and then clear it.
+ queue_.activate ();
+ while (! queue_.is_empty ())
+ {
+ ACE_Message_Block *msg;
+ queue_.dequeue_head (msg);
+ delete msg;
+ }
+
+ // If all went well, set passed status, stamp end time, print a
+ // termination message, and return success.
+ status_ = status;
+ transmission_end_ = ACE_OS::gettimeofday ();
+ ACE_DEBUG ((LM_DEBUG,
+ "\n\nTransmission %u ended with status: %s\n\n",
+ transmission_number_, status_msg ()));
+ return 0;
+}
+
+// Requests a report of statistics from the last transmission.
+
+int
+Bounded_Packet_Relay::report_statistics (void)
+{
+ // Serialize access to start and end transmission calls,
+ // statistics reporting calls.
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->transmission_lock_, -1);
+
+ // If a transmission is already in progress, just return.
+ if (is_active_)
+ return 1;
+
+ // Calculate duration of trasmission.
+ ACE_Time_Value duration (transmission_end_);
+ duration -= transmission_start_;
+
+ // Report transmission statistics.
+ ACE_DEBUG ((LM_DEBUG,
+ "\n\nStatisics for transmission %u:\n\n"
+ "Transmission status: %s\n"
+ "Start time: %d (sec) %d (usec)\n"
+ "End time: %d (sec) %d (usec)\n"
+ "Duration: %d (sec) %d (usec)\n"
+ "Packets relayed: %u\n\n",
+ transmission_number_, status_msg (),
+ transmission_start_.sec (),
+ transmission_start_.usec (),
+ transmission_end_.sec (),
+ transmission_end_.usec (),
+ duration.sec (),
+ duration.usec (),
+ packets_sent_));
+ return 0;
+}
+
+// Public entry point to which to push input.
+
+int
+Bounded_Packet_Relay::receive_input (void * arg)
+{
+ if (! arg)
+ {
+ if (is_active_)
+ ACE_ERROR ((LM_ERROR, "%t %p\n",
+ "Bounded_Packet_Relay::receive_input: "
+ "null argument"));
+
+ return -1;
+ }
+ ACE_Message_Block *message = static_cast<ACE_Message_Block *> (arg);
+ if (queue_.enqueue_tail (message) < 0)
+ {
+ if (is_active_)
+ ACE_ERROR ((LM_ERROR, "%t %p\n",
+ "Bounded_Packet_Relay::receive_input failed"));
+
+ return -1;
+ }
+
+ return 0;
+}
+
+// Get high water mark for relay queue.
+
+ACE_UINT32
+Bounded_Packet_Relay::queue_hwm (void)
+{
+ return queue_lwm_;
+}
+
+
+// Set high water mark for relay queue.
+
+void
+Bounded_Packet_Relay::queue_hwm (ACE_UINT32 hwm)
+{
+ queue_hwm_ = hwm;
+}
+
+// Get low water mark for relay queue.
+
+ACE_UINT32
+Bounded_Packet_Relay::queue_lwm (void)
+{
+ return queue_lwm_;
+}
+
+// Set low water mark for relay queue.
+
+void
+Bounded_Packet_Relay::queue_lwm (ACE_UINT32 lwm)
+{
+ queue_lwm_ = lwm;
+}
+
+
+
+// Returns string corresponding to current status.
+
+const char *
+Bounded_Packet_Relay::status_msg (void)
+{
+ const char *status_msg;
+ switch (status_)
+ {
+ case UN_INITIALIZED:
+ status_msg = "uninitialized";
+ break;
+ case STARTED:
+ status_msg = "in progress";
+ break;
+ case COMPLETED:
+ status_msg = "completed with all packets sent";
+ break;
+ case TIMED_OUT:
+ status_msg = "terminated by transmission duration timer";
+ break;
+ case CANCELLED:
+ status_msg = "cancelled by external control";
+ break;
+ case ERROR_DETECTED:
+ status_msg = "error was detected";
+ break;
+ default:
+ status_msg = "unknown transmission status";
+ break;
+ }
+
+ return status_msg;
+}
diff --git a/ACE/examples/Bounded_Packet_Relay/BPR_Drivers.h b/ACE/examples/Bounded_Packet_Relay/BPR_Drivers.h
new file mode 100644
index 00000000000..f39a544c512
--- /dev/null
+++ b/ACE/examples/Bounded_Packet_Relay/BPR_Drivers.h
@@ -0,0 +1,290 @@
+/* -*- C++ -*- */
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// examples
+//
+// = FILENAME
+// BPR_Drivers.h
+//
+// = DESCRIPTION
+// This code builds abstractions to factor out common code from
+// the different possible implementations of the Timer_Queue based
+// bounded packet relay example.
+//
+// = AUTHORS
+// Chris Gill <cdgill@cs.wustl.edu> and
+// Douglas C. Schmidt <schmidt@cs.wustl.edu>
+//
+// Based on the Timer Queue Test example written by
+//
+// Carlos O'Ryan <coryan@cs.wustl.edu> and
+// Douglas C. Schmidt <schmidt@cs.wustl.edu> and
+// Sergio Flores-Gaitan <sergio@cs.wustl.edu>
+//
+// ============================================================================
+
+#ifndef _BPR_DRIVERS_H_
+#define _BPR_DRIVERS_H_
+
+#include "ace/Functor.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include "ace/Reactor.h"
+#include "ace/Task.h"
+
+// forward declarations
+class Input_Device_Wrapper_Base;
+class Output_Device_Wrapper_Base;
+
+class Bounded_Packet_Relay
+{
+ // = TITLE
+ // This class defines a packet relay abstraction for a
+ // transmission bounded external commands to start and end the
+ // transmission. The transmission may be bounded by the number
+ // of packets to send, the dration of the transmission, or any
+ // other factors.
+ //
+ // = DESCRIPTION
+ // The relay abstraction implemented by this class registers a
+ // callback command with an input device wrapper, and relays
+ // input to an output device at a pace specified in the start
+ // transmission call.
+public:
+ // = Enumerates possible status values for a transmission.
+ enum Transmission_Status
+ {
+ UN_INITIALIZED,
+ STARTED,
+ COMPLETED,
+ TIMED_OUT,
+ CANCELLED,
+ ERROR_DETECTED
+ };
+
+ enum Queue_Defaults
+ {
+ DEFAULT_HWM = 0x7FFFFFFF,
+ DEFAULT_LWM = 0x7FFFFFFF
+ };
+
+ typedef int (Bounded_Packet_Relay::*ACTION) (void *);
+ // Command entry point type definition.
+
+ // = Initialization method
+
+ Bounded_Packet_Relay (ACE_Thread_Manager *input_task_mgr,
+ Input_Device_Wrapper_Base *input_wrapper,
+ Output_Device_Wrapper_Base *output_wrapper);
+ // Constructor.
+
+ virtual ~Bounded_Packet_Relay (void);
+ // Destructor.
+
+ int send_input (void);
+ // Requests output be sent to output device.
+
+ int start_transmission (u_long packet_count,
+ u_long arrival_period,
+ int logging_level);
+ // Requests a transmission be started.
+
+ int end_transmission (Transmission_Status status);
+ // Requests a transmission be ended.
+
+ int report_statistics (void);
+ // Requests a report of statistics from the last transmission.
+
+ // = Command accessible entry points.
+
+ int receive_input (void *);
+ // Public entry point to which to push input.
+
+ // = Accessors and mutators for relay settings
+
+ ACE_UINT32 queue_hwm (void);
+ // Get high water mark for relay queue.
+
+ void queue_hwm (ACE_UINT32 hwm);
+ // Set high water mark for relay queue.
+
+ ACE_UINT32 queue_lwm (void);
+ // Get low water mark for relay queue.
+
+ void queue_lwm (ACE_UINT32 lwm);
+ // Set low water mark for relay queue.
+
+private:
+ // = Concurrency Management.
+
+ int is_active_;
+ // flag for whether or not a transmission is active
+
+ ACE_Thread_Manager * input_task_mgr_;
+ // Thread manager for the input device task.
+
+ Input_Device_Wrapper_Base * input_wrapper_;
+ // Pointer to the input device wrapper.
+
+ Output_Device_Wrapper_Base * output_wrapper_;
+ // Pointer to the output device wrapper.
+
+ ACE_Message_Queue<ACE_SYNCH> queue_;
+ // Queue used to buffer input messages.
+
+ ACE_UINT32 queue_hwm_;
+ // High water mark for relay queue.
+
+ ACE_UINT32 queue_lwm_;
+ // Low water mark for relay queue.
+
+ ACE_SYNCH_MUTEX transmission_lock_;
+ // Lock for thread-safe synchronization of transmission startup and
+ // termination.
+
+ // = Transmission Statistics
+
+ const char *status_msg (void);
+ // Returns string corresponding to current status.
+
+ u_long transmission_number_;
+ // Number of transmissions sent.
+
+ u_long packets_sent_;
+ // Count of packets sent in the most recent transmission.
+
+ Transmission_Status status_;
+ // Status of the current or most recent transmission.
+
+ ACE_Time_Value transmission_start_;
+ // Start time of the most recent transmission.
+
+ ACE_Time_Value transmission_end_;
+ // Ending time of the most recent transmission.
+
+};
+
+class Input_Device_Wrapper_Base : public ACE_Task_Base
+{
+ // = TITLE
+ // This class defines an abstract base class for an input device
+ // wrapper that hides the details of the specific device and
+ // provides a consistent message passing interface without
+ // knowing anything about the implementation of the input device
+ // or the message receiver.
+ //
+ // The abstract base class ctor takes a command template object
+ // that is instantiated with the correct receiver and action
+ // types. This command object is used to send newly created input
+ // messages to the receiver.
+ //
+ // The abstract base class is designed to operate in an active
+ // "push" mode, sending input data to the receiver whenever the
+ // data is ready. The underlying device may be active, notifying
+ // the wrapper when data is ready, or may be passive in which
+ // case the wrapper must rely on a reactive and/or polling
+ // mechanism.
+ //
+ // = DESCRIPTION
+ // Derived classes are responsible for filling in concrete
+ // definitions for the abstract message creation method and the
+ // svc method.
+public:
+ // = Initialization and termination methods.
+ Input_Device_Wrapper_Base (ACE_Thread_Manager *input_task_mgr);
+ // Constructor.
+
+ virtual ~Input_Device_Wrapper_Base ();
+ // Destructor.
+
+ int set_send_input_msg_cmd (ACE_Command_Base *send_input_msg_cmd);
+ // Sets send input message command in the input device driver
+ // object.
+
+ int set_input_period (u_long input_period);
+ // Sets period (in usecs) between when inputs are created.
+
+ int set_send_count (long count);
+ // Sets count of messages to send.
+
+ int request_stop (void);
+ // Requests that the input device stop sending messages and
+ // terminate its thread. Should return 1 if it will do so, 0 if it
+ // has already done so, or -1 if there is a problem doing so.
+
+ virtual int svc (void);
+ // This method runs the input device loop in the new thread.
+
+ virtual int modify_device_settings (void *) = 0;
+ // Provides an abstract interface to allow modifying device
+ // settings.
+
+protected:
+ virtual ACE_Message_Block *create_input_message (void) = 0;
+ // Creates a new message block, carrying data read from the
+ // underlying input device.
+
+ virtual int send_input_message (ACE_Message_Block *);
+ // Sends a newly created message block, carrying data read from the
+ // underlying input device, by passing a pointer to the message
+ // block to its command execution.
+
+ ACE_Command_Base *send_input_msg_cmd_;
+ // Send newly created input message.
+
+ u_long input_period_;
+ // Period between when input values are produced (usecs).
+
+ ACE_Reactor reactor_;
+ // Reactor used to multiplex input streams, timeouts.
+
+ int is_active_;
+ // Flag to indicate whether or not input object is
+ // (and should remain) active.
+
+ long send_count_;
+ // Count of messages to send before stopping (-1 indicates the
+ // device should not stop).
+
+ long current_count_;
+ // Currently remaining count of messages to send before stopping
+ // (-1 indicates the device should not stop).
+
+};
+
+class Output_Device_Wrapper_Base
+{
+ // = TITLE
+ // This class defines an abstract base class for an output device
+ // wrapper that hides the details of the specific device and
+ // provides a consistent write method interface without knowing
+ // anything about the implementation.
+ //
+ // = DESCRIPTION
+ // The abstract methods write_output_message () and
+ // modify_device_settings () are defined in derived classes to
+ // write the contents of the passed message out the underlying
+ // output device, and update device settings, respectively.
+public:
+
+ virtual ~Output_Device_Wrapper_Base (void);
+
+ virtual int write_output_message (void *) = 0;
+ // Writes contents of the passed message block out to the underlying
+ // output device.
+
+ virtual int modify_device_settings (void *) = 0;
+ // Provides an abstract interface to allow modifying device
+ // settings.
+};
+
+// include the templates
+#include "BPR_Drivers_T.h"
+
+#endif /* _BPR_DRIVERS_H_ */
diff --git a/ACE/examples/Bounded_Packet_Relay/BPR_Drivers_T.cpp b/ACE/examples/Bounded_Packet_Relay/BPR_Drivers_T.cpp
new file mode 100644
index 00000000000..6e597dfb9e5
--- /dev/null
+++ b/ACE/examples/Bounded_Packet_Relay/BPR_Drivers_T.cpp
@@ -0,0 +1,322 @@
+// $Id$
+
+// ============================================================================
+// = LIBRARY
+// examples
+//
+// = FILENAME
+// BPR_Driver.cpp
+//
+// = DESCRIPTION
+// This code builds an abstraction to factor out common code for
+// the different implementations of the Timer_Queue.
+//
+// = AUTHORS
+// Chris Gill <cdgill@cs.wustl.edu> and
+// Douglas C. Schmidt <schmidt@cs.wustl.edu>
+//
+// Based on the Timer Queue Test example written by
+//
+// Carlos O'Ryan <coryan@cs.wustl.edu> and
+// Douglas C. Schmidt <schmidt@cs.wustl.edu> and
+// Sergio Flores-Gaitan <sergio@cs.wustl.edu>
+//
+// ============================================================================
+
+#ifndef _BPR_DRIVER_T_CPP_
+#define _BPR_DRIVER_T_CPP_
+
+// #include BPR_Drivers.h instead of BPR_Drivers_T.h
+// to avoid problems with circular includes
+#include "BPR_Drivers.h"
+#include "ace/OS_NS_string.h"
+#include "ace/OS_NS_unistd.h"
+
+ACE_RCSID(Bounded_Packet_Relay, BPR_Drivers_T, "$Id$")
+
+// Constructor.
+
+template <class TQ>
+Bounded_Packet_Relay_Driver<TQ>::Bounded_Packet_Relay_Driver (void)
+ : packet_count_cmd_ (0),
+ arrival_period_cmd_ (0),
+ transmit_period_cmd_ (0),
+ duration_limit_cmd_ (0),
+ logging_level_cmd_ (0),
+ run_transmission_cmd_ (0),
+ cancel_transmission_cmd_ (0),
+ report_stats_cmd_ (0),
+ shutdown_cmd_ (0),
+ packet_count_ (1000),
+ arrival_period_ (10000),
+ send_period_ (10000),
+ duration_limit_ (20000000),
+ logging_level_ (0)
+{
+}
+
+// Destructor.
+
+template <class TQ>
+Bounded_Packet_Relay_Driver<TQ>::~Bounded_Packet_Relay_Driver (void)
+{
+ // delete all instantiated command objects
+ delete packet_count_cmd_;
+ delete arrival_period_cmd_;
+ delete transmit_period_cmd_;
+ delete duration_limit_cmd_;
+ delete logging_level_cmd_;
+ delete run_transmission_cmd_;
+ delete cancel_transmission_cmd_;
+ delete report_stats_cmd_;
+ delete shutdown_cmd_;
+}
+
+// Parse the input and execute the corresponding command.
+
+template <class TQ> int
+Bounded_Packet_Relay_Driver<TQ>::parse_commands (const char *buf)
+{
+ int option;
+
+ if (::sscanf (buf, "%d", &option) <= 0)
+ // If there was an error reading the option simply try on the next
+ // line.
+ return 0;
+
+ switch (option)
+ {
+ case 1: // set packet count
+ {
+ u_long count;
+
+ // We just reread the option, this simplies parsing (since
+ // sscanf can do it for us).
+ if (::sscanf (buf, "%d %lu", &option, &count) < 2)
+ // If there was not enough information on the line, ignore
+ // option and try the next line.
+ return 0;
+ if (packet_count_cmd_->execute ((void *) &count) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%t %p\n",
+ "set packet count failed"),
+ -1);
+ break;
+ }
+ case 2: // Set the arrival period.
+ {
+ u_long usec;
+
+ // We just reread the option, this simplies parsing (since
+ // sscanf can do it for us).
+ if (::sscanf (buf, "%d %lu", &option, &usec) < 2)
+ // If there was not enough information on the line, ignore
+ // option and try the next line.
+ return 0;
+ if (arrival_period_cmd_->execute ((void *) &usec) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%t %p\n",
+ "set arrival period failed"),
+ -1);
+ break;
+ }
+ case 3: // Set transmit period.
+ {
+ u_long usec;
+
+ // We just reread the option, this simplies parsing (since
+ // sscanf can do it for us).
+ if (::sscanf (buf, "%d %lu", &option, &usec) < 2)
+ // If there was not enough information on the line, ignore
+ // option and try the next line.
+ return 0;
+ if (transmit_period_cmd_->execute ((void *) &usec) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%t %p\n",
+ "set transmit period failed"),
+ -1);
+ break;
+ }
+ case 4: // Set duration limit.
+ {
+ u_long usec;
+
+ // We just reread the option, this simplies parsing (since
+ // sscanf can do it for us).
+ if (::sscanf (buf, "%d %lu", &option, &usec) < 2)
+ // If there was not enough information on the line, ignore
+ // option and try the next line.
+ return 0;
+ if (duration_limit_cmd_->execute ((void *) &usec) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%t %p\n",
+ "\nSet duration limit failed."),
+ -1);
+ break;
+ }
+ case 5: // Set logging level.
+ {
+ int level;
+
+ // We just reread the option, this simplies parsing (since
+ // sscanf can do it for us).
+ if ((::sscanf (buf, "%d %d", &option, &level) < 2) ||
+ (level < 0) || (level > 7))
+ {
+ // If there was not enough information on the line, or the
+ // passed value was invalid, ignore and try again.
+ return 0;
+ }
+
+ if (logging_level_cmd_->execute ((void *) &level) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%t %p\n",
+ "set logging level failed"),
+ -1);
+ break;
+ }
+ case 6: // Run one transmission.
+ return run_transmission_cmd_->execute (0);
+ /* NOTREACHED */
+ case 7: // Cancel current transmission.
+ return cancel_transmission_cmd_->execute (0);
+ /* NOTREACHED */
+ case 8: // Report statistics.
+ return report_stats_cmd_->execute (0);
+ /* NOTREACHED */
+ case 9: // Shut down the driver.
+ return shutdown_cmd_->execute (0);
+ /* NOTREACHED */
+ default:
+ // Display an error message.
+ ACE_ERROR_RETURN ((LM_ERROR, "invalid input %s\n", buf), 0);
+ ACE_NOTREACHED (break);
+ /* NOTREACHED */
+ } /* ENDSWITCH */
+ return 0;
+}
+
+// Runs the test.
+
+template <class TQ> int
+Bounded_Packet_Relay_Driver<TQ>::run (void)
+{
+ this->init ();
+
+ // Process all the incoming events.
+
+ for (;;)
+ if (this->get_next_request () == -1)
+ return -1;
+
+ ACE_NOTREACHED (return 0);
+}
+
+// Gets the next request from the user input.
+
+template <class TQ> int
+Bounded_Packet_Relay_Driver<TQ>::get_next_request (void)
+{
+ char buf[BUFSIZ];
+
+ this->display_menu ();
+
+ // Reads input from the user.
+ if (this->read_input (buf, sizeof buf) <= 0)
+ return -1;
+
+ // Parse and run the command.
+ return this->parse_commands (buf);
+}
+
+// Reads input from the user from ACE_STDIN into the buffer specified.
+
+template <class TQ> ssize_t
+Bounded_Packet_Relay_Driver<TQ>::read_input (char *buf, size_t bufsiz)
+{
+ ACE_OS::memset (buf, 0, bufsiz);
+
+ // Wait for user to type commands. This call is automatically
+ // restarted when SIGINT or SIGALRM signals occur.
+ return ACE_OS::read (ACE_STDIN, buf, bufsiz);
+}
+
+// Get count of packets to send in a transmission.
+
+template <class TQ> u_long
+Bounded_Packet_Relay_Driver<TQ>::packet_count (void)
+{
+ return packet_count_;
+}
+
+// Set count of packets to send in a transmission.
+
+template <class TQ> void
+Bounded_Packet_Relay_Driver<TQ>::packet_count (u_long pc)
+{
+ packet_count_ = pc;
+}
+
+// Get rate at which input packets are to arrive.
+
+template <class TQ> u_long
+Bounded_Packet_Relay_Driver<TQ>::arrival_period (void)
+{
+ return arrival_period_;
+}
+
+// Set rate at which input packets are to arrive.
+
+template <class TQ> void
+Bounded_Packet_Relay_Driver<TQ>::arrival_period (u_long ap)
+{
+ arrival_period_ = ap;
+}
+
+// Get rate at which packets are to be relayed (usec).
+
+template <class TQ> u_long
+Bounded_Packet_Relay_Driver<TQ>::send_period (void)
+{
+ return send_period_;
+}
+
+// Set rate at which packets are to be relayed (usec).
+
+template <class TQ> void
+Bounded_Packet_Relay_Driver<TQ>::send_period (u_long sp)
+{
+ send_period_ = sp;
+}
+
+// Get limit on the duration of the transmission (usec).
+
+template <class TQ> u_long
+Bounded_Packet_Relay_Driver<TQ>::duration_limit (void)
+{
+ return duration_limit_;
+}
+
+// Set limit on the duration of the transmission (usec).
+
+template <class TQ> void
+Bounded_Packet_Relay_Driver<TQ>::duration_limit (u_long dl)
+{
+ duration_limit_ = dl;
+}
+// Get logging level.
+
+template <class TQ> int
+Bounded_Packet_Relay_Driver<TQ>::logging_level (void)
+{
+ return logging_level_;
+}
+
+// Set logging level.
+
+template <class TQ> void
+Bounded_Packet_Relay_Driver<TQ>::logging_level (int ll)
+{
+ logging_level_ = ll;
+}
+#endif /* _BPR_DRIVER_T_CPP_ */
diff --git a/ACE/examples/Bounded_Packet_Relay/BPR_Drivers_T.h b/ACE/examples/Bounded_Packet_Relay/BPR_Drivers_T.h
new file mode 100644
index 00000000000..4d0ca6bdfb4
--- /dev/null
+++ b/ACE/examples/Bounded_Packet_Relay/BPR_Drivers_T.h
@@ -0,0 +1,178 @@
+/* -*- C++ -*- */
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// examples
+//
+// = FILENAME
+// BPR_Drivers_T.h
+//
+// = DESCRIPTION
+// This code factors out common class templates for use in
+// the different possible implementations of the Timer_Queue
+// based bounded packet relay example.
+//
+// = AUTHORS
+// Chris Gill <cdgill@cs.wustl.edu> and
+// Douglas C. Schmidt <schmidt@cs.wustl.edu>
+//
+// Based on the Timer Queue Test example written by
+//
+// Carlos O'Ryan <coryan@cs.wustl.edu> and
+// Douglas C. Schmidt <schmidt@cs.wustl.edu> and
+// Sergio Flores-Gaitan <sergio@cs.wustl.edu>
+//
+// ============================================================================
+
+#ifndef _BPR_DRIVERS_T_H_
+#define _BPR_DRIVERS_T_H_
+
+#include "ace/Functor.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+// Forward declarations.
+class Input_Device_Wrapper_Base;
+class Output_Device_Wrapper_Base;
+
+template <class TQ>
+class Bounded_Packet_Relay_Driver
+{
+ // = TITLE
+ // This abstract base class provides a simple abstraction for a
+ // test driver for the bounded packet relay example.
+ //
+ // = DESCRIPTION
+ // This is the place where the common code to test the different
+ // implementations of the timer queue resides. This class has
+ // the logic for the parse_commands () method, the run (),
+ // read_input () and get_next_request () methods. Subclasses can
+ // override these methods if there is some logic that is specific
+ // to that implementation.
+public:
+ Bounded_Packet_Relay_Driver (void);
+ // Constructor.
+
+ virtual ~Bounded_Packet_Relay_Driver (void);
+ // Destructor.
+
+ virtual int parse_commands (const char *buf);
+ // Breaks up the input string buffer into pieces and executes the
+ // appropriate method to handle that operation.
+
+ virtual int run (void);
+ // This is the main entry point for the driver. The user of the
+ // class should normally invoke this method. Returns 0 when
+ // successful, or 0 otherwise.
+
+ virtual int get_next_request (void);
+ // This internal method gets the next request from the user.
+ // Returns -1 when user wants to exit. Returns 0 otherwise.
+
+ virtual ssize_t read_input (char *buf, size_t bufsiz);
+ // Reads input from the user into the buffer <buf> with a maximum of
+ // <bufsiz> bytes. Returns the amount of bytes actually read
+ // Otherwise, a -1 is returned and errno is set to indicate the
+ // error.
+
+ virtual int display_menu (void)=0;
+ // Prints the user interface for the driver to STDERR.
+
+ virtual int init (void)=0;
+ // Initializes values and operations for the driver.
+
+ u_long packet_count (void);
+ // Get count of packets to send in a transmission.
+
+ void packet_count (u_long pc);
+ // Set count of packets to send in a transmission.
+
+ u_long arrival_period (void);
+ // Get rate at which input packets are to arrive.
+
+ void arrival_period (u_long ap);
+ // Set rate at which input packets are to arrive.
+
+ u_long send_period (void);
+ // Get rate at which packets are to be relayed (usec).
+
+ void send_period (u_long sp);
+ // Set rate at which packets are to be relayed (usec).
+
+ u_long duration_limit (void);
+ // Get limit on the duration of the transmission (usec).
+
+ void duration_limit (u_long dl);
+ // Set limit on the duration of the transmission (usec).
+
+ int logging_level (void);
+ // Get logging level.
+
+ void logging_level (int ll);
+ // Set logging level.
+
+protected:
+ // = Major Driver Mechanisms
+
+ TQ timer_queue_;
+ // Timer queue for transmission timeouts.
+
+ // = Set of commands to be executed.
+
+ ACE_Command_Base *packet_count_cmd_;
+ // Set packet count command.
+
+ ACE_Command_Base *arrival_period_cmd_;
+ // Set arrival period command.
+
+ ACE_Command_Base *transmit_period_cmd_;
+ // Set transmit period command.
+
+ ACE_Command_Base *duration_limit_cmd_;
+ // Set duration limit command.
+
+ ACE_Command_Base *logging_level_cmd_;
+ // Set logging level command.
+
+ ACE_Command_Base *run_transmission_cmd_;
+ // Run transmission command.
+
+ ACE_Command_Base *cancel_transmission_cmd_;
+ // Cancel transmission command.
+
+ ACE_Command_Base *report_stats_cmd_;
+ // Report statistics command.
+
+ ACE_Command_Base *shutdown_cmd_;
+ // Shut down the driver.
+
+private:
+ u_long packet_count_;
+ // Count of packets to send in a transmission.
+
+ u_long arrival_period_;
+ // Rate at which input packets are to arrive.
+
+ u_long send_period_;
+ // Rate at which packets are to be relayed (usec).
+
+ u_long duration_limit_;
+ // Limit on the duration of the transmission (usec).
+
+ int logging_level_;
+ // Logging level.
+};
+
+#if defined (ACE_TEMPLATES_REQUIRE_SOURCE)
+#include "BPR_Drivers_T.cpp"
+#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */
+
+#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA)
+#pragma implementation ("BPR_Drivers_T.cpp")
+#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */
+
+#endif /* _BPR_DRIVERS_T_H_ */
diff --git a/ACE/examples/Bounded_Packet_Relay/Bounded_Packet_Relay.mpc b/ACE/examples/Bounded_Packet_Relay/Bounded_Packet_Relay.mpc
new file mode 100644
index 00000000000..087cc5ac3be
--- /dev/null
+++ b/ACE/examples/Bounded_Packet_Relay/Bounded_Packet_Relay.mpc
@@ -0,0 +1,7 @@
+// -*- MPC -*-
+// $Id$
+
+project : aceexe {
+ exename = bpr_thread
+ macros += ACE_HAS_DEFERRED_TIMER_COMMANDS
+}
diff --git a/ACE/examples/Bounded_Packet_Relay/Makefile.am b/ACE/examples/Bounded_Packet_Relay/Makefile.am
new file mode 100644
index 00000000000..5fb14ba2d05
--- /dev/null
+++ b/ACE/examples/Bounded_Packet_Relay/Makefile.am
@@ -0,0 +1,39 @@
+## Process this file with automake to create Makefile.in
+##
+## $Id$
+##
+## This file was generated by MPC. Any changes made directly to
+## this file will be lost the next time it is generated.
+##
+## MPC Command:
+## /acebuilds/ACE_wrappers-repository/bin/mwc.pl -include /acebuilds/MPC/config -include /acebuilds/MPC/templates -feature_file /acebuilds/ACE_wrappers-repository/local.features -noreldefs -type automake -exclude build,Kokyu
+
+ACE_BUILDDIR = $(top_builddir)
+ACE_ROOT = $(top_srcdir)
+
+## Makefile.Bounded_Packet_Relay.am
+noinst_PROGRAMS = bpr_thread
+
+bpr_thread_CPPFLAGS = \
+ -I$(ACE_ROOT) \
+ -I$(ACE_BUILDDIR) \
+ -DACE_HAS_DEFERRED_TIMER_COMMANDS
+
+bpr_thread_SOURCES = \
+ BPR_Drivers.cpp \
+ Thread_Bounded_Packet_Relay.cpp \
+ bpr_thread.cpp \
+ BPR_Drivers.h \
+ BPR_Drivers_T.h \
+ Thread_Bounded_Packet_Relay.h
+
+bpr_thread_LDADD = \
+ $(ACE_BUILDDIR)/ace/libACE.la
+
+## Clean up template repositories, etc.
+clean-local:
+ -rm -f *~ *.bak *.rpo *.sym lib*.*_pure_* core core.*
+ -rm -f gcctemp.c gcctemp so_locations *.ics
+ -rm -rf cxx_repository ptrepository ti_files
+ -rm -rf templateregistry ir.out
+ -rm -rf ptrepository SunWS_cache Templates.DB
diff --git a/ACE/examples/Bounded_Packet_Relay/README b/ACE/examples/Bounded_Packet_Relay/README
new file mode 100644
index 00000000000..5757224c510
--- /dev/null
+++ b/ACE/examples/Bounded_Packet_Relay/README
@@ -0,0 +1,194 @@
+1. INTRODUCTION
+
+This directory contains an example that illustrates how to use both
+threaded and reactive concurrency mechanisms in ACE. The example
+application schedules and processes heterogenerous user input and
+timer-based events in the context of a bounded packet relay mechanism.
+
+In this example, a transmission begins, packets arrive from an input device
+object, and are transferred to an output device object by a relay object at
+a specified pace. The transfer continues until all packets have been
+relayed, a duration limit expires, or the transmission is cancelled.
+
+User input is handled concurrently with a running transmission. You
+can run a transmission, cancel a transmission, change transmission
+parameters, view statistics from the most recent transmission, or exit
+the program, using selections from an interactive text-based menu.
+In addition, the example program can be run in batch mode, with the
+appropriate commands piped to the program's standard input stream.
+
+Transmission parameters are intialized to default values. Transmission
+parameter values persist until/unless they are subsequently modified by an
+appropriate command. If an invalid value for a command is given, or a run
+or report command is issued while a transmission is in progress, the
+offending command has no effect, and an error message is generated.
+
+2. USER INTERFACE
+
+Commands that can be given to the program include the following:
+
+Settings commands:
+
+ 1 <number of packets to relay in one transmission>
+
+ Minimum value is 1 packet, defaults to 1000 packets.
+
+ 2 <input packet arrival period (in usec)>
+
+ Minimum value is 1 usec, defaults to 10000 usec (10 msec).
+
+ 3 <output packet transmission period (in usec)>
+
+ Minimum value is 1 usec, defaults to 10000 usec (10 msec).
+
+ 4 <limit on duration of transmission (in usec)>
+
+ Minimum value is 1 usec, defaults to 20000000 usec (20 sec).
+ A value of 0 is also a valid input, in which case no limit
+ will be placed on the duration of the transmission (it will
+ end when all packets have been relayed from the input device
+ to the output device).
+
+ 5 <logging level>
+
+ 0 - does no logging
+ 1 - logs packets created by the input device
+ 2 - logs packets consumed by the output device
+ 4 - prints contents of packets consumed by the output device
+
+ To set several flags, pass their sum as the logging level value:
+ e.g., a logging level value of 7 turns on all possible logging.
+
+Action commands:
+
+ 6 - runs a transmission using the current settings
+
+ 7 - cancels a transmission if there is one running
+
+ 8 - reports statistics from the most recent transmission
+
+ 9 - quits the program
+
+3. APPLICATION DESIGN
+
+3.1. KEY COMPONENTS
+
+The design of this example application consists of four main
+components: the driver object, the relay object, the input device
+object, and the output device object.
+
+The driver object is responsible for receiving user input and overall handling
+of user input commands. The driver object is active, with separate threads
+for receiving user input and managing its transmission timer queue. This
+allows the user to issue commands while a transmission is in progress. The
+driver object uses an ACE_Thread_Timer_Queue_Adapter, which is derived from
+ACE_Task_Base. The driver object starts another active object, called
+User_Input_Task, which is also derived from ACE_Task_Base. This allows both
+the timer queue and the user input object to be made active, running in their
+own threads of control.
+
+The relay object is passive, managing a message queue and necessary
+locks to allow safe access from multiple threads. It provides methods
+to receive and enqueue a mesage from the input device, dequeue a
+message and send it to the output device, and to start or end a
+transmission. It uses ACE_Message_Queue (which contains ACE_Message_Block
+objects) and ACE_Thread_Mutex objects to implement this functionality.
+
+The input object is active, managing timeouts and input events in its
+own thread. The input object is also reactive, using an ACE_Reactor to
+allow response to multiple input handles as well as to do polling at
+specific timeouts. The input pseudo-device wrapper in this example
+does not make use of input handles and only does timed polling, but
+extending this only requires registering the appropriate input handles
+and handlers with the reactor. The input object is derived from
+ACE_Task_Base, and is activated by the relay object when a new
+transmission starts. The input object packages each data packet in an
+ACE_Message_Block before sending it to the relay object.
+
+The output object is passive. If logging is turned on, it will report
+the arrival time, relative to the start of the transmission, of each
+output message, and the contents of that message. The output object
+will also "consume" each ACE_Message_Block passed to it, calling
+delete on the passed pointer.
+
+3.2. RUN-TIME CHARACTERSITICS
+
+When the user initiates a transmission, the appropriate settings are passed
+by the driver to the relay object's start transmission method. The relay
+object tries to start a new transmission. If another transmission is in
+progress, the method returns an error. Otherwise, the relay object's start
+transmission method initializes itself and the input and output device
+objects, activates the input device object, and stores the handle for
+the new input device thread.
+
+The driver then constructs a timeout handler with a count of the
+number of messages to relay and a send timeout value, and pushes a
+timer with this handler onto the timer queue. If there is a limit on
+the duration of the transmission, the driver constructs a different
+handler for end-of-transmission, and pushes a timer for the end of
+the transmission with this handler onto the timer queue as well. When
+the user issues a cancel transmission command, the driver calls the
+relay's end transmission method.
+
+When a send timeout expires, the handler (running in the timer queue
+thread) calls the send method of the relay. If there are any enqueued
+messages from the input device object in its queue, the relay object's
+send method will dequeue a message, pass it to the output device
+object, and return success. If there are no messages in the queue,
+the relay object's send method will return failure. If the send was
+successful, the handler will decrement its count of remaining
+messages. If the count is greater than zero, the handler will
+register a new send timer for itself and exit. If the count is zero,
+then the handler will call the relay's end transmission method, clear
+the timer queue, and mark itself inactive before exiting.
+
+Similarly, if the end-of-transmission timer expires before all packets
+have been sent, that handler will call the relay's end transmission
+method, clear the timer queue, release the semaphore, and then exit.
+
+When the relay's end transmission method is called, it marks the relay
+itself inactive, and calls the input device's deactivate method, which
+sets the input device's activity flag to inactive (see below). The
+relay's end transmission method then waits until the input device thread
+exits, before returning.
+
+While it is still active, the input device thread blocks on a reactor
+timeout for the duration it was given by the relay. When the timeout
+expires, the input device checks a flag to see if it is still active.
+If the flag says the input device is inactive, the thread simply
+exits. This allows cancellation of the input device when a
+transmission has ended. If the flag says it is still active, the
+thread builds a new character buffer, and wraps this with a new
+ACE_Message_Block. The input device passes this message block to the
+execution method of the send input message command object with which
+it was constructed. This level of indirection allows the input device
+to be used with arbitrary types, so that it could for example be
+connected directly to an output device. The input device also
+maintains a count of the number of messages it has sent. Once the
+input device has sent all its messages, it marks itself inactive, and
+its thread simply exits.
+
+4. ACCESSING THE SOURCE CODE
+
+The files for this example are located in
+$ACE_ROOT/examples/Bounded_Packet_Relay in the latest release of ACE,
+which is located at
+
+http://www.cs.wustl.edu/~schmidt/ACE_wrappers/ACE.tar.gz
+
+Source Files: Thread_Bounded_Packet_Relay.h
+ Thread_Bounded_Packet_Relay.cpp
+ BPR_Driver.h
+ BPR_Driver.cpp
+ BPR_Driver_T.h
+ BPR_Driver_T.cpp
+ bpr_thread.cpp
+
+Make file: Makefile
+
+Doc file: README (this file)
+
+Executable: bpr_thread
+
+
+
diff --git a/ACE/examples/Bounded_Packet_Relay/Thread_Bounded_Packet_Relay.cpp b/ACE/examples/Bounded_Packet_Relay/Thread_Bounded_Packet_Relay.cpp
new file mode 100644
index 00000000000..a2f179a5348
--- /dev/null
+++ b/ACE/examples/Bounded_Packet_Relay/Thread_Bounded_Packet_Relay.cpp
@@ -0,0 +1,770 @@
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// examples
+//
+// = FILENAME
+// Thread_Bounded_Packet_Relay.cpp
+//
+// = DESCRIPTION
+// Method definitions for the threaded-bounded packet relay class.
+//
+// = AUTHORS
+// Chris Gill <cdgill@cs.wustl.edu> and
+// Douglas C. Schmidt <schmidt@cs.wustl.edu>
+//
+// Based on the Timer Queue Test example written by
+//
+// Carlos O'Ryan <coryan@cs.wustl.edu> and
+// Douglas C. Schmidt <schmidt@cs.wustl.edu> and
+// Sergio Flores-Gaitan <sergio@cs.wustl.edu>
+//
+// ============================================================================
+
+#include "ace/OS_NS_string.h"
+#include "ace/OS_NS_sys_time.h"
+#include "ace/Condition_T.h"
+#include "ace/Null_Mutex.h"
+
+#include "Thread_Bounded_Packet_Relay.h"
+
+typedef Thread_Bounded_Packet_Relay_Driver::MYCOMMAND DRIVER_CMD;
+typedef ACE_Command_Callback<BPR_Handler_Base, BPR_Handler_Base::ACTION> HANDLER_CMD;
+typedef ACE_Command_Callback<Send_Handler, Send_Handler::ACTION> SEND_HANDLER_CMD;
+
+ACE_RCSID(Bounded_Packet_Relay, Thread_Bounded_Packet_Relay, "$Id$")
+
+// Constructor.
+
+Text_Input_Device_Wrapper::Text_Input_Device_Wrapper (ACE_Thread_Manager *input_task_mgr,
+ size_t read_length,
+ const char* text,
+ int logging)
+ : Input_Device_Wrapper_Base (input_task_mgr),
+ read_length_ (read_length),
+ text_ (text),
+ index_ (0),
+ logging_ (logging),
+ packet_count_ (0)
+
+{
+}
+
+// Destructor.
+
+Text_Input_Device_Wrapper::~Text_Input_Device_Wrapper (void)
+{
+}
+
+// Modifies device settings based on passed pointer to a u_long.
+
+int
+Text_Input_Device_Wrapper::modify_device_settings (void *logging)
+{
+ packet_count_ = 0;
+
+ if (logging)
+ logging_ = *static_cast<int *> (logging);
+ else
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Text_Input_Device_Wrapper::modify_device_settings: "
+ "null argument"),
+ -1);
+ return 0;
+}
+
+
+// Creates a new message block, carrying data
+// read from the underlying input device.
+
+ACE_Message_Block *
+Text_Input_Device_Wrapper::create_input_message (void)
+{
+
+ // Construct a new message block to send.
+ ACE_Message_Block *mb;
+ ACE_NEW_RETURN (mb,
+ ACE_Message_Block (read_length_),
+ 0);
+
+ // Zero out a "read" buffer to hold data.
+ char read_buf [BUFSIZ];
+ ACE_OS::memset (read_buf, 0, BUFSIZ);
+
+ // Loop through the text, filling in data to copy into the read
+ // buffer (leaving room for a terminating zero).
+ for (size_t i = 0; i < read_length_ - 1; ++i)
+ {
+ read_buf [i] = text_ [index_];
+ index_ = (index_ + 1) % ACE_OS::strlen (text_);
+ }
+
+ // Copy buf into the Message_Block and update the wr_ptr ().
+ if (mb->copy (read_buf, read_length_) < 0)
+ {
+ delete mb;
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "read buffer copy failed"),
+ 0);
+ }
+
+ // log packet creation if logging is turned on
+ if (logging_ & Text_Input_Device_Wrapper::LOG_MSGS_CREATED)
+ {
+ ++packet_count_;
+ ACE_DEBUG ((LM_DEBUG, "input message %d created\n",
+ packet_count_));
+ }
+
+ return mb;
+}
+
+// Constructor.
+
+Text_Output_Device_Wrapper::Text_Output_Device_Wrapper (int logging)
+ : logging_ (logging)
+{
+}
+
+// Consume and possibly print out the passed message.
+
+int
+Text_Output_Device_Wrapper::write_output_message (void *message)
+{
+ if (message)
+ {
+ ++packet_count_;
+
+ if (logging_ & Text_Output_Device_Wrapper::LOG_MSGS_RCVD)
+ ACE_DEBUG ((LM_DEBUG, "output message %d received\n",
+ packet_count_));
+
+ if (logging_ & Text_Output_Device_Wrapper::PRINT_MSGS_RCVD)
+ ACE_DEBUG ((LM_DEBUG, "output message %d:\n[%s]\n",
+ packet_count_,
+ static_cast<ACE_Message_Block *> (message)->
+ rd_ptr ()));
+
+ delete static_cast<ACE_Message_Block *> (message);
+ return 0;
+ }
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Text_Output_Device_Wrapper::"
+ "write_output_message: null argument"), -1);
+}
+
+// Modifies device settings based on passed pointer to a u_long.
+
+int
+Text_Output_Device_Wrapper::modify_device_settings (void *logging)
+{
+ packet_count_ = 0;
+
+ if (logging)
+ logging_ = *static_cast<int *> (logging);
+ else
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Text_Output_Device_Wrapper::modify_device_settings: "
+ "null argument"),
+ -1);
+ return 0;
+}
+
+// Constructor.
+
+User_Input_Task::User_Input_Task (Bounded_Packet_Relay *relay,
+ Thread_Timer_Queue *queue,
+ Thread_Bounded_Packet_Relay_Driver &tbprd)
+ : ACE_Task_Base (ACE_Thread_Manager::instance ()),
+ usecs_ (ACE_ONE_SECOND_IN_USECS),
+ relay_ (relay),
+ queue_ (queue),
+ driver_ (tbprd)
+{
+}
+
+// Destructor.
+
+User_Input_Task::~User_Input_Task (void)
+{
+ this->clear_all_timers ();
+}
+
+// Runs the main event loop.
+
+int
+User_Input_Task::svc (void)
+{
+ for (;;)
+ // Call back to the driver's implementation of how to read and
+ // parse input.
+ if (this->driver_.get_next_request () == -1)
+ break;
+
+ // We are done.
+ this->relay_->end_transmission (Bounded_Packet_Relay::CANCELLED);
+ this->queue_->deactivate ();
+ ACE_DEBUG ((LM_DEBUG,
+ "terminating user input thread\n"));
+ this->clear_all_timers ();
+ return 0;
+}
+
+// Sets the number of packets for the next transmission.
+
+int
+User_Input_Task::set_packet_count (void *argument)
+{
+ if (argument)
+ {
+ driver_.packet_count (*static_cast<int *> (argument));
+ return 0;
+ }
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "User_Input_Task::set_packet_count: null argument"),
+ -1);
+}
+
+// Sets the input device packet arrival period (usecs) for the next
+// transmission.
+
+int
+User_Input_Task::set_arrival_period (void *argument)
+{
+ if (argument)
+ {
+ driver_.arrival_period (*static_cast<int *> (argument));
+ return 0;
+ }
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "User_Input_Task::set_arrival_period: null argument"),
+ -1);
+}
+
+// Sets the period between output device sends (usecs) for the next
+// transmission.
+
+int
+User_Input_Task::set_send_period (void *argument)
+{
+ if (argument)
+ {
+ driver_.send_period (*static_cast<int *> (argument));
+ return 0;
+ }
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "User_Input_Task::set_send_period: null argument"),
+ -1);
+}
+
+// Sets a limit on the transmission duration (usecs).
+
+int
+User_Input_Task::set_duration_limit (void *argument)
+{
+ if (argument)
+ {
+ driver_.duration_limit (*static_cast<int *> (argument));
+ return 0;
+ }
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "User_Input_Task::set_duration_limit: null argument"),
+ -1);
+}
+
+// Sets logging level (0 or 1) for output device for the next
+// transmission.
+
+int
+User_Input_Task::set_logging_level (void *argument)
+{
+ if (argument)
+ {
+ driver_.logging_level (*static_cast<int *> (argument));
+ return 0;
+ }
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "User_Input_Task::set_logging_level: null argument"),
+ -1);
+}
+
+// Runs the next transmission (if one is not in progress).
+
+int
+User_Input_Task::run_transmission (void *)
+{
+ if (relay_)
+ {
+ switch (relay_->start_transmission (driver_.packet_count (),
+ driver_.arrival_period (),
+ driver_.logging_level ()))
+ {
+ case 1:
+ ACE_DEBUG ((LM_DEBUG,
+ "\nRun transmission: "
+ " Transmission already in progress\n"));
+ return 0;
+ /* NOTREACHED */
+ case 0:
+ {
+ ACE_Time_Value now = ACE_OS::gettimeofday ();
+ ACE_Time_Value send_every (0, driver_.send_period ());
+ ACE_Time_Value send_at (send_every + now);
+
+ Send_Handler *send_handler;
+ ACE_NEW_RETURN (send_handler,
+ Send_Handler (driver_.packet_count (),
+ send_every,
+ *relay_,
+ *queue_,
+ driver_),
+ -1);
+ if (queue_->schedule (send_handler, 0, send_at) < 0)
+ {
+ delete send_handler;
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "User_Input_Task::run_transmission: "
+ "failed to schedule send handler"),
+ -1);
+ }
+ if (driver_.duration_limit ())
+ {
+ ACE_Time_Value terminate_at (0, driver_.duration_limit ());
+ terminate_at += now;
+
+ Termination_Handler *termination_handler;
+
+ termination_handler =
+ new Termination_Handler (*relay_,
+ *queue_,
+ driver_);
+
+ if (! termination_handler)
+ {
+ this->clear_all_timers ();
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "User_Input_Task::run_transmission: "
+ "failed to allocate termination "
+ "handler"),
+ -1);
+ }
+ if (queue_->schedule (termination_handler,
+ 0, terminate_at) < 0)
+ {
+ delete termination_handler;
+ this->clear_all_timers ();
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "User_Input_Task::run_transmission: "
+ "failed to schedule termination "
+ "handler"),
+ -1);
+ }
+ }
+ return 0;
+ }
+ /* NOTREACHED */
+ default:
+ return -1;
+ /* NOTREACHED */
+ }
+ }
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "User_Input_Task::run_transmission: "
+ "relay not instantiated"),
+ -1);
+}
+
+// Ends the current transmission (if one is in progress).
+
+int
+User_Input_Task::end_transmission (void *)
+{
+ if (relay_)
+ {
+ switch (relay_->end_transmission (Bounded_Packet_Relay::CANCELLED))
+ {
+ case 1:
+ ACE_DEBUG ((LM_DEBUG,
+ "\nEnd transmission: "
+ "no transmission in progress\n"));
+ /* Fall through to next case */
+ case 0:
+ // Cancel any remaining timers.
+ this->clear_all_timers ();
+ return 0;
+ /* NOTREACHED */
+ default:
+ return -1;
+ /* NOTREACHED */
+ }
+ }
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "User_Input_Task::end_transmission: "
+ "relay not instantiated"),
+ -1);
+}
+
+// Reports statistics for the previous transmission
+// (if one is not in progress).
+
+int
+User_Input_Task::report_stats (void *)
+{
+ if (relay_)
+ {
+ switch (relay_->report_statistics ())
+ {
+ case 1:
+ ACE_DEBUG ((LM_DEBUG,
+ "\nRun transmission: "
+ "\ntransmission already in progress\n"));
+ return 0;
+ /* NOTREACHED */
+
+ case 0:
+ this->clear_all_timers ();
+ return 0;
+ /* NOTREACHED */
+
+ default:
+ return -1;
+ /* NOTREACHED */
+ }
+ }
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "User_Input_Task::report_stats: "
+ "relay not instantiated"),
+ -1);
+}
+
+// Shut down the task.
+
+int
+User_Input_Task::shutdown (void *)
+{
+ // Clear any outstanding timers.
+ this->clear_all_timers ();
+
+#if !defined (ACE_LACKS_PTHREAD_CANCEL)
+ // Cancel the thread timer queue task "preemptively."
+ ACE_Thread::cancel (this->queue_->thr_id ());
+#else
+ // Cancel the thread timer queue task "voluntarily."
+ this->queue_->deactivate ();
+#endif /* ACE_LACKS_PTHREAD_CANCEL */
+
+ // -1 indicates we are shutting down the application.
+ return -1;
+}
+
+// Helper method: clears all timers.
+
+int
+User_Input_Task::clear_all_timers (void)
+{
+ // loop through the timers in the queue, cancelling each one
+ for (ACE_Timer_Node_T <ACE_Event_Handler *> *node;
+ (node = queue_->timer_queue ()->get_first ()) != 0;
+ )
+ queue_->timer_queue ()->cancel (node->get_timer_id (), 0, 0);
+
+ return 0;
+}
+
+// Constructor.
+
+BPR_Handler_Base::BPR_Handler_Base (Bounded_Packet_Relay &relay,
+ Thread_Timer_Queue &queue)
+ : relay_ (relay),
+ queue_ (queue)
+{
+}
+
+// Destructor.
+
+BPR_Handler_Base::~BPR_Handler_Base (void)
+{
+}
+
+// Helper method: clears all timers.
+
+int
+BPR_Handler_Base::clear_all_timers (void *)
+{
+ // Loop through the timers in the queue, cancelling each one.
+
+ for (ACE_Timer_Node_T <ACE_Event_Handler *> *node;
+ (node = queue_.timer_queue ()->get_first ()) != 0;
+ )
+ queue_.timer_queue ()->cancel (node->get_timer_id (), 0, 0);
+ // queue_.cancel (node->get_timer_id (), 0);
+
+ // Invoke the handler's (virtual) destructor
+ delete this;
+
+ return 0;
+}
+
+// Constructor.
+
+Send_Handler::Send_Handler (u_long send_count,
+ const ACE_Time_Value &duration,
+ Bounded_Packet_Relay &relay,
+ Thread_Timer_Queue &queue,
+ Thread_Bounded_Packet_Relay_Driver &driver)
+ : BPR_Handler_Base (relay, queue),
+ send_count_ (send_count),
+ duration_ (duration),
+ driver_ (driver)
+{
+}
+
+// Destructor.
+
+Send_Handler::~Send_Handler (void)
+{
+}
+
+// Call back hook.
+
+int
+Send_Handler::handle_timeout (const ACE_Time_Value &,
+ const void *)
+{
+ switch (relay_.send_input ())
+ {
+ case 0:
+ // Decrement count of packets to relay.
+ --send_count_;
+ /* Fall through to next case. */
+ case 1:
+ if (send_count_ > 0)
+ {
+ // Enqueue a deferred callback to the reregister command.
+ SEND_HANDLER_CMD *re_register_callback_;
+ ACE_NEW_RETURN (re_register_callback_,
+ SEND_HANDLER_CMD (*this,
+ &Send_Handler::reregister),
+ -1);
+ return queue_.enqueue_command (re_register_callback_);
+ }
+ else
+ {
+ // All packets are sent, time to end the transmission, redisplay
+ // the user menu, cancel any other timers, and go away.
+ relay_.end_transmission (Bounded_Packet_Relay::COMPLETED);
+ driver_.display_menu ();
+
+ // Enqueue a deferred callback to the clear_all_timers command.
+ HANDLER_CMD *clear_timers_callback_;
+ ACE_NEW_RETURN (clear_timers_callback_,
+ HANDLER_CMD (*this,
+ &BPR_Handler_Base::clear_all_timers),
+ -1);
+ return queue_.enqueue_command (clear_timers_callback_);
+ }
+ /* NOTREACHED */
+ default:
+ return -1;
+ }
+}
+
+// Cancellation hook.
+
+int
+Send_Handler::cancelled (void)
+{
+ delete this;
+ return 0;
+}
+
+// Helper method: re-registers this timer
+
+int
+Send_Handler::reregister (void *)
+{
+ // Re-register the handler for a new timeout.
+ if (queue_.schedule (this,
+ 0,
+ duration_ + ACE_OS::gettimeofday ()) < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Send_Handler::reregister: "
+ "failed to reschedule send handler"),
+ -1);
+
+ return 0;
+}
+
+
+// Constructor.
+
+Termination_Handler::Termination_Handler (Bounded_Packet_Relay &relay,
+ Thread_Timer_Queue &queue,
+ Thread_Bounded_Packet_Relay_Driver &driver)
+ : BPR_Handler_Base (relay, queue),
+ driver_ (driver)
+{
+}
+
+// Destructor.
+
+Termination_Handler::~Termination_Handler (void)
+{
+}
+
+// Call back hook.
+
+int
+Termination_Handler::handle_timeout (const ACE_Time_Value &,
+ const void *)
+{
+ // Transmission timed out, so end the transmission, display the user
+ // menu, and register a callback to clear the timer queue and then
+ // make this object go away.
+ relay_.end_transmission (Bounded_Packet_Relay::TIMED_OUT);
+ driver_.display_menu ();
+
+ // Enqueue a deferred callback to the clear_all_timers command.
+ HANDLER_CMD *clear_timers_callback_;
+ ACE_NEW_RETURN (clear_timers_callback_,
+ HANDLER_CMD (*this,
+ &BPR_Handler_Base::clear_all_timers),
+ -1);
+ return queue_.enqueue_command (clear_timers_callback_);
+}
+
+// Cancellation hook
+
+int
+Termination_Handler::cancelled (void)
+{
+ delete this;
+ return 0;
+}
+
+// Constructor.
+
+Thread_Bounded_Packet_Relay_Driver::Thread_Bounded_Packet_Relay_Driver (Bounded_Packet_Relay *relay)
+ : input_task_ (relay, &timer_queue_, *this)
+{
+}
+
+// Destructor.
+
+Thread_Bounded_Packet_Relay_Driver::~Thread_Bounded_Packet_Relay_Driver (void)
+{
+}
+
+// Display the user menu.
+
+int
+Thread_Bounded_Packet_Relay_Driver::display_menu (void)
+{
+ static char menu[] =
+ "\n\n Options:\n"
+ " ----------------------------------------------------------------------\n"
+ " 1 <number of packets to relay in one transmission = %d>\n"
+ " min = 1 packet.\n"
+ " 2 <input packet arrival period (in usec) = %d>\n"
+ " min = 1.\n"
+ " 3 <output packet transmission period (in usec) = %d>\n"
+ " min = 1.\n"
+ " 4 <limit on duration of transmission (in usec) = %d>\n"
+ " min = 1, no limit = 0.\n"
+ " 5 <logging level flags = %d>\n"
+ " no logging = 0,\n"
+ " log packets created by input device = 1,\n"
+ " log packets consumed by output device = 2,\n"
+ " logging options 1,2 = 3,\n"
+ " print contents of packets consumed by output put device = 4,\n"
+ " logging options 1,4 = 5,\n"
+ " logging options 2,4 = 6,\n"
+ " logging options 1,2,4 = 7.\n"
+ " ----------------------------------------------------------------------\n"
+ " 6 - runs a transmission using the current settings\n"
+ " 7 - cancels a transmission (if there is one running)\n"
+ " 8 - reports statistics from the most recent transmission\n"
+ " 9 - quits the program\n"
+ " ----------------------------------------------------------------------\n"
+ " Please enter your choice: ";
+
+ ACE_DEBUG ((LM_DEBUG,
+ menu,
+ this->packet_count (),
+ this->arrival_period (),
+ this->send_period (),
+ this->duration_limit (),
+ this->logging_level ()));
+
+ return 0;
+}
+
+// Initialize the driver.
+
+int
+Thread_Bounded_Packet_Relay_Driver::init (void)
+{
+ // Initialize the <Command> objects with their corresponding
+ // methods from <User_Input_Task>.
+ ACE_NEW_RETURN (packet_count_cmd_,
+ DRIVER_CMD (input_task_,
+ &User_Input_Task::set_packet_count),
+ -1);
+ ACE_NEW_RETURN (arrival_period_cmd_,
+ DRIVER_CMD (input_task_,
+ &User_Input_Task::set_arrival_period),
+ -1);
+ ACE_NEW_RETURN (transmit_period_cmd_,
+ DRIVER_CMD (input_task_,
+ &User_Input_Task::set_send_period),
+ -1);
+ ACE_NEW_RETURN (duration_limit_cmd_,
+ DRIVER_CMD (input_task_,
+ &User_Input_Task::set_duration_limit),
+ -1);
+ ACE_NEW_RETURN (logging_level_cmd_,
+ DRIVER_CMD (input_task_,
+ &User_Input_Task::set_logging_level),
+ -1);
+ ACE_NEW_RETURN (run_transmission_cmd_,
+ DRIVER_CMD (input_task_,
+ &User_Input_Task::run_transmission),
+ -1);
+ ACE_NEW_RETURN (cancel_transmission_cmd_,
+ DRIVER_CMD (input_task_,
+ &User_Input_Task::end_transmission),
+ -1);
+ ACE_NEW_RETURN (report_stats_cmd_,
+ DRIVER_CMD (input_task_,
+ &User_Input_Task::report_stats),
+ -1);
+ ACE_NEW_RETURN (shutdown_cmd_,
+ DRIVER_CMD (input_task_,
+ &User_Input_Task::shutdown),
+ -1);
+ if (this->input_task_.activate () == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "cannot activate input task"),
+ -1);
+ else if (this->timer_queue_.activate () == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "cannot activate timer queue"),
+ -1);
+ else if (ACE_Thread_Manager::instance ()->wait () == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "wait on Thread_Manager failed"),
+ -1);
+ return 0;
+}
+
+// Run the driver
+
+int
+Thread_Bounded_Packet_Relay_Driver::run (void)
+{
+ this->init ();
+ return 0;
+}
+
diff --git a/ACE/examples/Bounded_Packet_Relay/Thread_Bounded_Packet_Relay.h b/ACE/examples/Bounded_Packet_Relay/Thread_Bounded_Packet_Relay.h
new file mode 100644
index 00000000000..ccd5782b487
--- /dev/null
+++ b/ACE/examples/Bounded_Packet_Relay/Thread_Bounded_Packet_Relay.h
@@ -0,0 +1,400 @@
+/* -*- C++ -*- */
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// examples
+//
+// = FILENAME
+// Thread_Bounded_Packet_Relay.h
+//
+// = DESCRIPTION
+// This code provides a thread based implementation
+// of the bounded packet relay example.
+//
+// = AUTHORS
+// Chris Gill <cdgill@cs.wustl.edu> and
+// Douglas C. Schmidt <schmidt@cs.wustl.edu>
+//
+// Based on the Timer Queue Test example written by
+//
+// Carlos O'Ryan <coryan@cs.wustl.edu> and
+// Douglas C. Schmidt <schmidt@cs.wustl.edu> and
+// Sergio Flores-Gaitan <sergio@cs.wustl.edu>
+//
+// ============================================================================
+
+#ifndef _THREAD_BOUNDED_PACKET_RELAY_H_
+#define _THREAD_BOUNDED_PACKET_RELAY_H_
+
+#include "ace/Functor_T.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include "ace/Task.h"
+#include "ace/Timer_Heap_T.h"
+#include "ace/Timer_Queue_Adapters.h"
+#include "BPR_Drivers.h"
+
+// These typedefs ensure that we use the minimal amount of locking
+// necessary.
+typedef ACE_Event_Handler_Handle_Timeout_Upcall<ACE_Null_Mutex>
+ Upcall;
+typedef ACE_Timer_Heap_T<ACE_Event_Handler *,
+ Upcall,
+ ACE_Null_Mutex>
+ Timer_Heap;
+typedef ACE_Timer_Heap_Iterator_T<ACE_Event_Handler *,
+ Upcall,
+ ACE_Null_Mutex>
+ Timer_Heap_Iterator;
+typedef ACE_Thread_Timer_Queue_Adapter<Timer_Heap>
+ Thread_Timer_Queue;
+
+// Forward declaration.
+class Thread_Bounded_Packet_Relay_Driver;
+
+class Text_Input_Device_Wrapper : public Input_Device_Wrapper_Base
+{
+ // = TITLE
+ // Defines a wrapper for a simple active looping text input
+ // pseudo-device.
+ //
+ // = DESCRIPTION
+ // The wrapper is an active object, running in its own thread,
+ // and uses a reactor to generate timeouts. When a timeout
+ // occurs, the wrapper calls its concrete message creation
+ // method. The wrapper then calls its base class message send
+ // method to forward the message to the receiver.
+ //
+ // A more sophisticated version of this class would use the
+ // reactive capabilities as well as the timeout generating
+ // capabilities of the reactor, multiplexing several input
+ // streams. Comments to this effect appear in the definition of
+ // the event loop method.
+public:
+
+ // = Enumerated logging level flags
+ enum Logging_Flags {NO_LOGGING = 0,
+ LOG_MSGS_CREATED = 1};
+
+ // = Initialization and termination methods.
+ Text_Input_Device_Wrapper (ACE_Thread_Manager *input_task_mgr,
+ size_t read_length,
+ const char* text,
+ int logging = 0);
+ // Constructor.
+
+ virtual ~Text_Input_Device_Wrapper (void);
+ // Destructor.
+
+ virtual int modify_device_settings (void *logging);
+ // Modifies device settings based on passed pointer to a u_long.
+ // Turns logging on if u_long is non-zero, off if u_long is zero,
+ // and does nothing if the pointer is null.
+
+protected:
+ virtual ACE_Message_Block *create_input_message (void);
+ // Creates a new message block, carrying data read from the
+ // underlying input device.
+
+private:
+ size_t read_length_;
+ // Length of the buffer into which to "read".
+
+ const char *text_;
+ // Text to "read" into the buffer.
+
+ size_t index_;
+ // Index into the string.
+
+ int logging_;
+ // This value is 0 if logging is turned off, non-zero otherwise
+
+ u_long packet_count_;
+ // This value holds a count of packets created.
+
+};
+
+class Text_Output_Device_Wrapper : public Output_Device_Wrapper_Base
+{
+ // = TITLE
+ // Implements a simple wrapper for a output pseudo-device.
+ //
+ // = DESCRIPTION
+ // Data from the passed output message is printed to the standard
+ // output stream, if logging is turned on.
+public:
+
+ // = Enumerated logging level flags
+ enum Logging_Flags {NO_LOGGING = 0,
+ LOG_MSGS_RCVD = 2,
+ PRINT_MSGS_RCVD = 4};
+
+ Text_Output_Device_Wrapper (int logging = 0);
+ // Default constructor.
+
+ // = Command Accessible Entry Points
+
+ virtual int write_output_message (void *message);
+ // Consumes and possibly prints out the passed message.
+
+ virtual int modify_device_settings (void *logging);
+ // Modifies device settings based on passed pointer to a u_long.
+ // Turns logging on if u_long is non-zero, off if u_long is zero,
+ // and does nothing if the pointer is null.
+
+private:
+
+ int logging_;
+ // This value holds the logging level.
+
+ u_long packet_count_;
+ // This value holds a count of packets received.
+
+};
+
+class User_Input_Task : public ACE_Task_Base
+{
+ // = TITLE
+ // Read user actions on the Timer_Queue from stdin.
+ //
+ // = DESCRIPTION
+ // This class reads user input from stdin. The commands allow
+ // the control of a Timer_Queue, which is dispatched by another
+ // thread.
+public:
+
+ // = Trait for command accessible entry points.
+
+ typedef int (User_Input_Task::*ACTION) (void *);
+
+ User_Input_Task (Bounded_Packet_Relay *relay,
+ Thread_Timer_Queue *queue,
+ Thread_Bounded_Packet_Relay_Driver &timer_queue_driver);
+ // Constructor.
+
+ virtual ~User_Input_Task (void);
+ // Destructor.
+
+ virtual int svc (void);
+ // This method runs the event loop in the new thread.
+
+ // = Some helper methods.
+
+ int set_packet_count (void *);
+ // Sets the number of packets for the next transmission.
+
+ int set_arrival_period (void *);
+ // Sets the input device packet arrival period (usecs) for the next
+ // transmission.
+
+ int set_send_period (void *);
+ // Sets the period between output device sends (usecs) for the next
+ // transmission.
+
+ int set_duration_limit (void *);
+ // Sets a limit on the transmission duration (usecs).
+
+ int set_logging_level (void *);
+ // Sets logging level (0 or 1) for output device for the next
+ // transmission.
+
+ int run_transmission (void *);
+ // Runs the next transmission (if one is not in progress).
+
+ int end_transmission (void *);
+ // Ends the current transmission (if one is in progress).
+
+ int report_stats (void *);
+ // Reports statistics for the previous transmission (if one is not
+ // in progress).
+
+ int shutdown (void *);
+ // Shuts down the task.
+
+ int clear_all_timers (void);
+ // Helper method: clears all timers.
+
+private:
+ const int usecs_;
+ // How many microseconds are in a second.
+
+ Bounded_Packet_Relay *relay_;
+ // The bounded packet relay.
+
+ Thread_Timer_Queue *queue_;
+ // The timer queue implementation.
+
+ Thread_Bounded_Packet_Relay_Driver &driver_;
+ // The thread timer queue test driver.
+};
+
+class BPR_Handler_Base : public ACE_Event_Handler
+{
+ // = TITLE
+ // Base event handler class for bounded packet relay example.
+ //
+ // = DESCRIPTION
+ // The base class provides a helper method that derived classes
+ // can register as a deferred execution callback that will cancel
+ // all timers in the underlying timer queue, and then delete "this".
+ //
+public:
+
+ // = Trait for command accessible entry points.
+
+ typedef int (BPR_Handler_Base::*ACTION) (void *);
+
+
+ BPR_Handler_Base (Bounded_Packet_Relay &relay,
+ Thread_Timer_Queue &queue);
+ // Constructor.
+
+ virtual ~BPR_Handler_Base (void);
+ // Destructor.
+
+ // = Command accessible entry points.
+
+ virtual int clear_all_timers (void *);
+ // Helper method: clears all timers.
+
+protected:
+ Bounded_Packet_Relay &relay_;
+ // Stores a reference to the relay object on which to invoke
+ // the appropritate calls when the timer expires.
+
+ Thread_Timer_Queue &queue_;
+ // Store a reference to the timer queue, in which to re-register
+ // the send timer and handler if there are still sends to perform.
+};
+
+class Send_Handler;
+
+class Send_Handler : public BPR_Handler_Base
+{
+ // = TITLE
+ // Event handler for message send timeout events.
+ //
+ // = DESCRIPTION
+ // The <handle_timeout> hook method calls the relay's send
+ // method and decrements its count of messages to send.
+ // If there are still messages to send, it re-registers itself
+ // with the timer queue. Otherwise it calls the relay's end
+ // transmission method, and registers a deferred execution
+ // callback to clear the timer queue, and then delete "this".
+public:
+
+ // = Trait for command accessible entry points.
+
+ typedef int (Send_Handler::*ACTION) (void *);
+
+ Send_Handler (u_long send_count,
+ const ACE_Time_Value &duration,
+ Bounded_Packet_Relay &relay,
+ Thread_Timer_Queue &queue,
+ Thread_Bounded_Packet_Relay_Driver &driver);
+ // Constructor.
+
+ virtual ~Send_Handler (void);
+ // Destructor.
+
+ virtual int handle_timeout (const ACE_Time_Value &current_time,
+ const void *arg);
+ // Call back hook.
+
+ virtual int cancelled (void);
+ // Cancellation hook.
+
+ // = Command accessible entry points.
+
+ virtual int reregister (void *timeout);
+ // Helper method: re-registers this handler.
+
+private:
+
+ u_long send_count_;
+ // Count of the number of messages to send from the
+ // relay object to the output device object.
+
+ ACE_Time_Value duration_;
+ // Stores the expected duration until expiration, and is used to
+ // re-register the handler if there are still sends to perform.
+
+ Thread_Bounded_Packet_Relay_Driver &driver_;
+ // Reference to the driver that will redisplay the user input menu.
+};
+
+class Termination_Handler : public BPR_Handler_Base
+{
+ // = TITLE
+ // Event handler for end transmission timeout events.
+ //
+ // = DESCRIPTION
+ // The <handle_timeout> hook method calls the relay's end
+ // transmission method, then registers a deferred execution
+ // callback to clear all timers and then delete "this".
+public:
+ Termination_Handler (Bounded_Packet_Relay &relay,
+ Thread_Timer_Queue &queue,
+ Thread_Bounded_Packet_Relay_Driver &driver);
+ // Constructor.
+
+ virtual ~Termination_Handler (void);
+ // Destructor.
+
+ virtual int handle_timeout (const ACE_Time_Value &current_time,
+ const void *arg);
+ // Call back hook.
+
+ virtual int cancelled (void);
+ // Cancellation hook.
+
+private:
+ Thread_Bounded_Packet_Relay_Driver &driver_;
+ // Reference to the driver that will redisplay the user input menu.
+};
+
+class Thread_Bounded_Packet_Relay_Driver : public Bounded_Packet_Relay_Driver <Thread_Timer_Queue>
+{
+ // = TITLE
+ // Implements an example application that exercises
+ // <Thread_Timer_Queue> timer queue.
+ //
+ // = DESCRIPTION
+ // This class implements a simple test driver for the
+ // <Thread_Timer_Queue>. The <display_menu> hook method is
+ // called from the base class to print a menu specific to the
+ // thread implementation of the timer queue.
+public:
+
+ // = Trait for commands issued from this driver
+
+ typedef ACE_Command_Callback<User_Input_Task, User_Input_Task::ACTION> MYCOMMAND;
+
+ // = Initialization and termination methods.
+
+ Thread_Bounded_Packet_Relay_Driver (Bounded_Packet_Relay *relay);
+ // Constructor.
+
+ virtual ~Thread_Bounded_Packet_Relay_Driver (void);
+ // Destructor.
+
+ virtual int display_menu (void);
+ // Displays the user menu.
+
+ virtual int init (void);
+ // Initializes the driver.
+
+ virtual int run (void);
+ // Run the driver.
+
+private:
+ User_Input_Task input_task_;
+ // User input task, subclassed from ACE_Task.
+};
+
+#endif /* _THREAD_BOUNDED_PACKET_RELAY_H_ */
diff --git a/ACE/examples/Bounded_Packet_Relay/bpr_thread.cpp b/ACE/examples/Bounded_Packet_Relay/bpr_thread.cpp
new file mode 100644
index 00000000000..0a564ecd2a5
--- /dev/null
+++ b/ACE/examples/Bounded_Packet_Relay/bpr_thread.cpp
@@ -0,0 +1,122 @@
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// examples
+//
+// = FILENAME
+// bpr_thread.cpp
+//
+// = DESCRIPTION
+// Exercises drivers for a bounded packet relay, based on threaded timer queues.
+//
+// = AUTHORS
+// Chris Gill <cdgill@cs.wustl.edu> and
+// Douglas C. Schmidt <schmidt@cs.wustl.edu>
+//
+// Based on the Timer Queue Test example written by
+//
+// Carlos O'Ryan <coryan@cs.wustl.edu> and
+// Douglas C. Schmidt <schmidt@cs.wustl.edu> and
+// Sergio Flores-Gaitan <sergio@cs.wustl.edu>
+//
+// ============================================================================
+
+// The following #pragma is needed to disable a warning that occurs
+// in MSVC 6 due to the overly long debugging symbols generated for
+// the std::auto_ptr<Timer_Queue_Test_Driver<...> > template
+// instance used by some of the methods in this file.
+#ifdef _MSC_VER
+# pragma warning(disable: 4786) /* identifier was truncated to '255'
+ characters in the browser
+ information */
+#endif /* _MSC_VER */
+
+#include "ace/Auto_Ptr.h"
+#include "Thread_Bounded_Packet_Relay.h"
+
+ACE_RCSID (Bounded_Packet_Relay,
+ bpr_thread,
+ "$Id$")
+
+typedef Bounded_Packet_Relay_Driver<Thread_Timer_Queue>
+ THREAD_BOUNDED_PACKET_RELAY_DRIVER;
+
+typedef ACE_Command_Callback<Bounded_Packet_Relay,Bounded_Packet_Relay::ACTION>
+ INPUT_CALLBACK;
+
+// A snippet from Andrew Marvell (Oliver Cromwell's poet laureate)
+static const char input_text [] =
+"But ever at my back I hear\n"
+" Time's winged chariot hurrying near.";
+
+int
+ACE_TMAIN (int, ACE_TCHAR *[])
+{
+ // Construct a new thread manager for the input device task. Auto
+ // ptr ensures memory is freed when we exit this scope.
+ ACE_Thread_Manager *input_task_mgr;
+ ACE_NEW_RETURN (input_task_mgr,
+ ACE_Thread_Manager,
+ -1);
+ auto_ptr <ACE_Thread_Manager> mgr (input_task_mgr);
+
+ // Construct a new input device wrapper. Auto ptr ensures memory is
+ // freed when we exit this scope.
+ Text_Input_Device_Wrapper *input_device;
+ ACE_NEW_RETURN (input_device,
+ Text_Input_Device_Wrapper (input_task_mgr,
+ sizeof (input_text),
+ input_text),
+ -1);
+ auto_ptr <Text_Input_Device_Wrapper> input (input_device);
+
+ // Construct a new output device wrapper. Auto ptr ensures memory
+ // is freed when we exit this scope.
+ Text_Output_Device_Wrapper *output_device;
+ ACE_NEW_RETURN (output_device,
+ Text_Output_Device_Wrapper,
+ -1);
+ auto_ptr <Text_Output_Device_Wrapper> output (output_device);
+
+ // Construct a new bounded packet relay. Auto ptr ensures memory is
+ // freed when we exit this scope.
+ Bounded_Packet_Relay *packet_relay;
+ ACE_NEW_RETURN (packet_relay,
+ Bounded_Packet_Relay (input_task_mgr,
+ input_device,
+ output_device),
+ -1);
+ auto_ptr <Bounded_Packet_Relay> relay (packet_relay);
+
+ // Construct a receive input callback command for the relay, and register
+ // it with the input device. Auto ptr ensures memory is freed when we exit
+ // this scope.
+ INPUT_CALLBACK *input_callback;
+ ACE_NEW_RETURN (input_callback,
+ INPUT_CALLBACK (*packet_relay,
+ &Bounded_Packet_Relay::receive_input),
+ -1);
+ auto_ptr <INPUT_CALLBACK> callback (input_callback);
+ if (input_device->set_send_input_msg_cmd (input_callback) < 0)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "failed to register input callback"),
+ -1);
+ }
+
+ // Construct a new bounded packet relay driver. Auto ptr ensures
+ // memory is freed when we exit this scope.
+ THREAD_BOUNDED_PACKET_RELAY_DRIVER *tbprd;
+
+ ACE_NEW_RETURN (tbprd,
+ Thread_Bounded_Packet_Relay_Driver (packet_relay),
+ -1);
+
+ auto_ptr <THREAD_BOUNDED_PACKET_RELAY_DRIVER> driver (tbprd);
+
+ return driver->run ();
+ // All dynamically allocated memory is released when main() returns.
+}
+