diff options
Diffstat (limited to 'ACE/examples/Bounded_Packet_Relay')
-rw-r--r-- | ACE/examples/Bounded_Packet_Relay/.cvsignore | 2 | ||||
-rw-r--r-- | ACE/examples/Bounded_Packet_Relay/BPR_Drivers.cpp | 535 | ||||
-rw-r--r-- | ACE/examples/Bounded_Packet_Relay/BPR_Drivers.h | 290 | ||||
-rw-r--r-- | ACE/examples/Bounded_Packet_Relay/BPR_Drivers_T.cpp | 322 | ||||
-rw-r--r-- | ACE/examples/Bounded_Packet_Relay/BPR_Drivers_T.h | 178 | ||||
-rw-r--r-- | ACE/examples/Bounded_Packet_Relay/Bounded_Packet_Relay.mpc | 7 | ||||
-rw-r--r-- | ACE/examples/Bounded_Packet_Relay/Makefile.am | 39 | ||||
-rw-r--r-- | ACE/examples/Bounded_Packet_Relay/README | 194 | ||||
-rw-r--r-- | ACE/examples/Bounded_Packet_Relay/Thread_Bounded_Packet_Relay.cpp | 770 | ||||
-rw-r--r-- | ACE/examples/Bounded_Packet_Relay/Thread_Bounded_Packet_Relay.h | 400 | ||||
-rw-r--r-- | ACE/examples/Bounded_Packet_Relay/bpr_thread.cpp | 122 |
11 files changed, 2859 insertions, 0 deletions
diff --git a/ACE/examples/Bounded_Packet_Relay/.cvsignore b/ACE/examples/Bounded_Packet_Relay/.cvsignore new file mode 100644 index 00000000000..2f50e8efaee --- /dev/null +++ b/ACE/examples/Bounded_Packet_Relay/.cvsignore @@ -0,0 +1,2 @@ +bpr_thread +bpr_thread diff --git a/ACE/examples/Bounded_Packet_Relay/BPR_Drivers.cpp b/ACE/examples/Bounded_Packet_Relay/BPR_Drivers.cpp new file mode 100644 index 00000000000..fb646962004 --- /dev/null +++ b/ACE/examples/Bounded_Packet_Relay/BPR_Drivers.cpp @@ -0,0 +1,535 @@ +// $Id$ + +// ============================================================================ +// = LIBRARY +// examples +// +// = FILENAME +// BPR_Driver.cpp +// +// = DESCRIPTION +// This code builds an abstraction to factor out common code for +// the different implementations of the Timer_Queue. +// +// = AUTHORS +// Chris Gill <cdgill@cs.wustl.edu> and +// Douglas C. Schmidt <schmidt@cs.wustl.edu> +// +// Based on the Timer Queue Test example written by +// +// Carlos O'Ryan <coryan@cs.wustl.edu> and +// Douglas C. Schmidt <schmidt@cs.wustl.edu> and +// Sergio Flores-Gaitan <sergio@cs.wustl.edu> +// +// ============================================================================ + +#include "ace/OS_NS_sys_time.h" +#include "BPR_Drivers.h" + +ACE_RCSID(Bounded_Packet_Relay, BPR_Drivers, "$Id$") + +// Constructor. + +Input_Device_Wrapper_Base::Input_Device_Wrapper_Base (ACE_Thread_Manager *input_task_mgr) + : ACE_Task_Base (input_task_mgr), + send_input_msg_cmd_ (0), + input_period_ (ACE_ONE_SECOND_IN_USECS), + is_active_ (0), + send_count_ (0) +{ +} + +// Destructor. + +Input_Device_Wrapper_Base::~Input_Device_Wrapper_Base (void) +{ +} + +// Sets send input message command in the input device driver object. + +int +Input_Device_Wrapper_Base::set_send_input_msg_cmd (ACE_Command_Base *send_input_msg_cmd) +{ + // Set the new command. Input device is not responsible + // for deleting the old command, if any. + send_input_msg_cmd_ = send_input_msg_cmd; + return 0; +} + +// Sets period between when input messages are produced. + +int +Input_Device_Wrapper_Base::set_input_period (u_long input_period) +{ + input_period_ = input_period; + return 0; +} + +// Sets count of messages to send. + +int +Input_Device_Wrapper_Base::set_send_count (long count) +{ + send_count_ = count; + return 0; +} + +// Request that the input device stop sending messages +// and terminate its thread. Should return 1 if it will do so, 0 +// if it has already done so, or -1 if there is a problem doing so. + +int +Input_Device_Wrapper_Base::request_stop (void) +{ + if (is_active_) + { + is_active_ = 0; + return 1; + } + + return 0; +} + +// This method runs the input device loop in the new thread. + +int +Input_Device_Wrapper_Base::svc (void) +{ + ACE_Time_Value timeout; + ACE_Message_Block *message; + + // Set a flag to indicate we're active. + is_active_ = 1; + + // Start with the total count of messages to send. + for (current_count_ = send_count_; + // While we're still marked active, and there are packets to send. + (is_active_) && (current_count_ != 0); + ) + { + // Create an input message to send. + message = create_input_message (); + if (message == 0) + { + if (is_active_) + { + is_active_ = 0; + ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n", + "Failed to create input message object"), + -1); + } + + break; + } + + // Make sure there is a send command object. + if (send_input_msg_cmd_ == 0) + { + delete message; + if (is_active_) + { + is_active_ = 0; + ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n", + "send message command object not instantiated"), + -1); + } + + break; + } + + // Send the input message. + if (send_input_msg_cmd_->execute ((void *) message) < 0) + { + delete message; + if (is_active_) + { + is_active_ = 0; + ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n", + "Failed executing send message command object"), + -1); + } + + break; + } + + // If all went well, decrement count of messages to send, and + // run the reactor event loop unti we get a timeout or something + // happens in a registered upcall. + if (current_count_ > 0) + --current_count_; + + timeout = ACE_Time_Value (0, input_period_); + reactor_.run_event_loop (timeout); + } + + is_active_ = 0; + + return 0; +} + +// Sends a newly created message block, carrying data read from the +// underlying input device, by passing a pointer to the message block +// to its command execution. + +int +Input_Device_Wrapper_Base::send_input_message (ACE_Message_Block *amb) +{ + if (send_input_msg_cmd_) + return send_input_msg_cmd_->execute ((void *) amb); + else + { + if (is_active_) + ACE_ERROR ((LM_ERROR, "%t %p\n", + "Input_Device_Wrapper_Base::send_input_message: " + "command object not instantiated")); + + return -1; + } +} + +Output_Device_Wrapper_Base::~Output_Device_Wrapper_Base (void) +{ +} + +// Constructor. + +Bounded_Packet_Relay::Bounded_Packet_Relay (ACE_Thread_Manager *input_task_mgr, + Input_Device_Wrapper_Base *input_wrapper, + Output_Device_Wrapper_Base *output_wrapper) + : is_active_ (0), + input_task_mgr_ (input_task_mgr), + input_wrapper_ (input_wrapper), + output_wrapper_ (output_wrapper), + queue_ (Bounded_Packet_Relay::DEFAULT_HWM, + Bounded_Packet_Relay::DEFAULT_LWM), + queue_hwm_ (Bounded_Packet_Relay::DEFAULT_HWM), + queue_lwm_ (Bounded_Packet_Relay::DEFAULT_LWM), + transmission_number_ (0), + packets_sent_ (0), + status_ (Bounded_Packet_Relay::UN_INITIALIZED), + transmission_start_ (ACE_Time_Value::zero), + transmission_end_ (ACE_Time_Value::zero) +{ + if (input_task_mgr_ == 0) + input_task_mgr_ = ACE_Thread_Manager::instance (); +} + +// Destructor. + +Bounded_Packet_Relay::~Bounded_Packet_Relay (void) +{ + // Reactivate the queue, and then clear it. + queue_.activate (); + while (! queue_.is_empty ()) + { + ACE_Message_Block *msg; + queue_.dequeue_head (msg); + delete msg; + } + +} + +// Requests output be sent to output device. + +int +Bounded_Packet_Relay::send_input (void) +{ + // Don't block, return immediately if queue is empty. + ACE_Message_Block *item; + + // Using a separate (non-const) time value + // is necessary on some platforms + ACE_Time_Value immediate (ACE_Time_Value::zero); + + if (queue_.dequeue_head (item, + &immediate) < 0) + return 1; + + // If a message block was dequeued, send it to the output device. + + if (output_wrapper_->write_output_message ((void *) item) < 0) + { + if (is_active_) + ACE_ERROR ((LM_ERROR, + "%t %p\n", + "failed to write to output device object")); + + return -1; + } + + // If all went OK, increase count of packets sent. + ++packets_sent_; + return 0; +} + +// Requests a transmission be started. + +int +Bounded_Packet_Relay::start_transmission (u_long packet_count, + u_long arrival_period, + int logging_level) +{ + // Serialize access to start and end transmission calls, statistics + // reporting calls. + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->transmission_lock_, -1); + + // If a transmission is already in progress, just return. + if (is_active_) + return 1; + + // Set transmission in progress flag true. + is_active_ = 1; + + // Update statistics for a new transmission. + ++transmission_number_; + packets_sent_ = 0; + status_ = STARTED; + transmission_start_ = ACE_OS::gettimeofday (); + + // Reactivate the queue, and then clear it. + queue_.activate (); + while (! queue_.is_empty ()) + { + ACE_Message_Block *msg; + queue_.dequeue_head (msg); + delete msg; + } + + // Initialize the output device. + if (output_wrapper_->modify_device_settings ((void *) &logging_level) < 0) + { + status_ = ERROR_DETECTED; + transmission_end_ = ACE_OS::gettimeofday (); + is_active_ = 0; + ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n", + "failed to initialize output device object"), + -1); + } + // Initialize the input device. + if (input_wrapper_->modify_device_settings ((void *) &logging_level) < 0) + { + status_ = ERROR_DETECTED; + transmission_end_ = ACE_OS::gettimeofday (); + is_active_ = 0; + ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n", + "failed to initialize output device object"), + -1); + } + else if (input_wrapper_->set_input_period (arrival_period) < 0) + { + status_ = ERROR_DETECTED; + transmission_end_ = ACE_OS::gettimeofday (); + is_active_ = 0; + ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n", + "failed to initialize input device object"), + -1); + } + else if (input_wrapper_->set_send_count (packet_count) < 0) + { + status_ = ERROR_DETECTED; + transmission_end_ = ACE_OS::gettimeofday (); + is_active_ = 0; + ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n", + "failed to initialize input device object"), + -1); + } + // Activate the input device. + else if (input_wrapper_->activate () < 0) + { + status_ = ERROR_DETECTED; + transmission_end_ = ACE_OS::gettimeofday (); + is_active_ = 0; + ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n", + "failed to activate input device object"), + -1); + } + + // If all went well, print a startup message and return success. + ACE_DEBUG ((LM_DEBUG, + "\n\nTransmission %u started\n\n", + transmission_number_)); + return 0; +} + +// Requests a transmission be ended. + +int +Bounded_Packet_Relay::end_transmission (Transmission_Status status) +{ + // Serialize access to start and end transmission calls, + // statistics reporting calls. + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->transmission_lock_, -1); + + // If a transmission is not already in progress, just return. + if (! is_active_) + return 1; + + // Set transmission in progress flag false. + is_active_ = 0; + + // Ask the the input thread to stop. + if (input_wrapper_->request_stop () < 0) + { + status_ = ERROR_DETECTED; + transmission_end_ = ACE_OS::gettimeofday (); + ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n", + "failed asking input device thread to stop"), + -1); + } + + // Deactivate the queue, allowing all waiting threads to continue. + queue_.deactivate (); + + // Wait for input thread to stop. + input_task_mgr_->wait_task (input_wrapper_); + + // Reactivate the queue, and then clear it. + queue_.activate (); + while (! queue_.is_empty ()) + { + ACE_Message_Block *msg; + queue_.dequeue_head (msg); + delete msg; + } + + // If all went well, set passed status, stamp end time, print a + // termination message, and return success. + status_ = status; + transmission_end_ = ACE_OS::gettimeofday (); + ACE_DEBUG ((LM_DEBUG, + "\n\nTransmission %u ended with status: %s\n\n", + transmission_number_, status_msg ())); + return 0; +} + +// Requests a report of statistics from the last transmission. + +int +Bounded_Packet_Relay::report_statistics (void) +{ + // Serialize access to start and end transmission calls, + // statistics reporting calls. + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->transmission_lock_, -1); + + // If a transmission is already in progress, just return. + if (is_active_) + return 1; + + // Calculate duration of trasmission. + ACE_Time_Value duration (transmission_end_); + duration -= transmission_start_; + + // Report transmission statistics. + ACE_DEBUG ((LM_DEBUG, + "\n\nStatisics for transmission %u:\n\n" + "Transmission status: %s\n" + "Start time: %d (sec) %d (usec)\n" + "End time: %d (sec) %d (usec)\n" + "Duration: %d (sec) %d (usec)\n" + "Packets relayed: %u\n\n", + transmission_number_, status_msg (), + transmission_start_.sec (), + transmission_start_.usec (), + transmission_end_.sec (), + transmission_end_.usec (), + duration.sec (), + duration.usec (), + packets_sent_)); + return 0; +} + +// Public entry point to which to push input. + +int +Bounded_Packet_Relay::receive_input (void * arg) +{ + if (! arg) + { + if (is_active_) + ACE_ERROR ((LM_ERROR, "%t %p\n", + "Bounded_Packet_Relay::receive_input: " + "null argument")); + + return -1; + } + ACE_Message_Block *message = static_cast<ACE_Message_Block *> (arg); + if (queue_.enqueue_tail (message) < 0) + { + if (is_active_) + ACE_ERROR ((LM_ERROR, "%t %p\n", + "Bounded_Packet_Relay::receive_input failed")); + + return -1; + } + + return 0; +} + +// Get high water mark for relay queue. + +ACE_UINT32 +Bounded_Packet_Relay::queue_hwm (void) +{ + return queue_lwm_; +} + + +// Set high water mark for relay queue. + +void +Bounded_Packet_Relay::queue_hwm (ACE_UINT32 hwm) +{ + queue_hwm_ = hwm; +} + +// Get low water mark for relay queue. + +ACE_UINT32 +Bounded_Packet_Relay::queue_lwm (void) +{ + return queue_lwm_; +} + +// Set low water mark for relay queue. + +void +Bounded_Packet_Relay::queue_lwm (ACE_UINT32 lwm) +{ + queue_lwm_ = lwm; +} + + + +// Returns string corresponding to current status. + +const char * +Bounded_Packet_Relay::status_msg (void) +{ + const char *status_msg; + switch (status_) + { + case UN_INITIALIZED: + status_msg = "uninitialized"; + break; + case STARTED: + status_msg = "in progress"; + break; + case COMPLETED: + status_msg = "completed with all packets sent"; + break; + case TIMED_OUT: + status_msg = "terminated by transmission duration timer"; + break; + case CANCELLED: + status_msg = "cancelled by external control"; + break; + case ERROR_DETECTED: + status_msg = "error was detected"; + break; + default: + status_msg = "unknown transmission status"; + break; + } + + return status_msg; +} diff --git a/ACE/examples/Bounded_Packet_Relay/BPR_Drivers.h b/ACE/examples/Bounded_Packet_Relay/BPR_Drivers.h new file mode 100644 index 00000000000..f39a544c512 --- /dev/null +++ b/ACE/examples/Bounded_Packet_Relay/BPR_Drivers.h @@ -0,0 +1,290 @@ +/* -*- C++ -*- */ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// examples +// +// = FILENAME +// BPR_Drivers.h +// +// = DESCRIPTION +// This code builds abstractions to factor out common code from +// the different possible implementations of the Timer_Queue based +// bounded packet relay example. +// +// = AUTHORS +// Chris Gill <cdgill@cs.wustl.edu> and +// Douglas C. Schmidt <schmidt@cs.wustl.edu> +// +// Based on the Timer Queue Test example written by +// +// Carlos O'Ryan <coryan@cs.wustl.edu> and +// Douglas C. Schmidt <schmidt@cs.wustl.edu> and +// Sergio Flores-Gaitan <sergio@cs.wustl.edu> +// +// ============================================================================ + +#ifndef _BPR_DRIVERS_H_ +#define _BPR_DRIVERS_H_ + +#include "ace/Functor.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "ace/Reactor.h" +#include "ace/Task.h" + +// forward declarations +class Input_Device_Wrapper_Base; +class Output_Device_Wrapper_Base; + +class Bounded_Packet_Relay +{ + // = TITLE + // This class defines a packet relay abstraction for a + // transmission bounded external commands to start and end the + // transmission. The transmission may be bounded by the number + // of packets to send, the dration of the transmission, or any + // other factors. + // + // = DESCRIPTION + // The relay abstraction implemented by this class registers a + // callback command with an input device wrapper, and relays + // input to an output device at a pace specified in the start + // transmission call. +public: + // = Enumerates possible status values for a transmission. + enum Transmission_Status + { + UN_INITIALIZED, + STARTED, + COMPLETED, + TIMED_OUT, + CANCELLED, + ERROR_DETECTED + }; + + enum Queue_Defaults + { + DEFAULT_HWM = 0x7FFFFFFF, + DEFAULT_LWM = 0x7FFFFFFF + }; + + typedef int (Bounded_Packet_Relay::*ACTION) (void *); + // Command entry point type definition. + + // = Initialization method + + Bounded_Packet_Relay (ACE_Thread_Manager *input_task_mgr, + Input_Device_Wrapper_Base *input_wrapper, + Output_Device_Wrapper_Base *output_wrapper); + // Constructor. + + virtual ~Bounded_Packet_Relay (void); + // Destructor. + + int send_input (void); + // Requests output be sent to output device. + + int start_transmission (u_long packet_count, + u_long arrival_period, + int logging_level); + // Requests a transmission be started. + + int end_transmission (Transmission_Status status); + // Requests a transmission be ended. + + int report_statistics (void); + // Requests a report of statistics from the last transmission. + + // = Command accessible entry points. + + int receive_input (void *); + // Public entry point to which to push input. + + // = Accessors and mutators for relay settings + + ACE_UINT32 queue_hwm (void); + // Get high water mark for relay queue. + + void queue_hwm (ACE_UINT32 hwm); + // Set high water mark for relay queue. + + ACE_UINT32 queue_lwm (void); + // Get low water mark for relay queue. + + void queue_lwm (ACE_UINT32 lwm); + // Set low water mark for relay queue. + +private: + // = Concurrency Management. + + int is_active_; + // flag for whether or not a transmission is active + + ACE_Thread_Manager * input_task_mgr_; + // Thread manager for the input device task. + + Input_Device_Wrapper_Base * input_wrapper_; + // Pointer to the input device wrapper. + + Output_Device_Wrapper_Base * output_wrapper_; + // Pointer to the output device wrapper. + + ACE_Message_Queue<ACE_SYNCH> queue_; + // Queue used to buffer input messages. + + ACE_UINT32 queue_hwm_; + // High water mark for relay queue. + + ACE_UINT32 queue_lwm_; + // Low water mark for relay queue. + + ACE_SYNCH_MUTEX transmission_lock_; + // Lock for thread-safe synchronization of transmission startup and + // termination. + + // = Transmission Statistics + + const char *status_msg (void); + // Returns string corresponding to current status. + + u_long transmission_number_; + // Number of transmissions sent. + + u_long packets_sent_; + // Count of packets sent in the most recent transmission. + + Transmission_Status status_; + // Status of the current or most recent transmission. + + ACE_Time_Value transmission_start_; + // Start time of the most recent transmission. + + ACE_Time_Value transmission_end_; + // Ending time of the most recent transmission. + +}; + +class Input_Device_Wrapper_Base : public ACE_Task_Base +{ + // = TITLE + // This class defines an abstract base class for an input device + // wrapper that hides the details of the specific device and + // provides a consistent message passing interface without + // knowing anything about the implementation of the input device + // or the message receiver. + // + // The abstract base class ctor takes a command template object + // that is instantiated with the correct receiver and action + // types. This command object is used to send newly created input + // messages to the receiver. + // + // The abstract base class is designed to operate in an active + // "push" mode, sending input data to the receiver whenever the + // data is ready. The underlying device may be active, notifying + // the wrapper when data is ready, or may be passive in which + // case the wrapper must rely on a reactive and/or polling + // mechanism. + // + // = DESCRIPTION + // Derived classes are responsible for filling in concrete + // definitions for the abstract message creation method and the + // svc method. +public: + // = Initialization and termination methods. + Input_Device_Wrapper_Base (ACE_Thread_Manager *input_task_mgr); + // Constructor. + + virtual ~Input_Device_Wrapper_Base (); + // Destructor. + + int set_send_input_msg_cmd (ACE_Command_Base *send_input_msg_cmd); + // Sets send input message command in the input device driver + // object. + + int set_input_period (u_long input_period); + // Sets period (in usecs) between when inputs are created. + + int set_send_count (long count); + // Sets count of messages to send. + + int request_stop (void); + // Requests that the input device stop sending messages and + // terminate its thread. Should return 1 if it will do so, 0 if it + // has already done so, or -1 if there is a problem doing so. + + virtual int svc (void); + // This method runs the input device loop in the new thread. + + virtual int modify_device_settings (void *) = 0; + // Provides an abstract interface to allow modifying device + // settings. + +protected: + virtual ACE_Message_Block *create_input_message (void) = 0; + // Creates a new message block, carrying data read from the + // underlying input device. + + virtual int send_input_message (ACE_Message_Block *); + // Sends a newly created message block, carrying data read from the + // underlying input device, by passing a pointer to the message + // block to its command execution. + + ACE_Command_Base *send_input_msg_cmd_; + // Send newly created input message. + + u_long input_period_; + // Period between when input values are produced (usecs). + + ACE_Reactor reactor_; + // Reactor used to multiplex input streams, timeouts. + + int is_active_; + // Flag to indicate whether or not input object is + // (and should remain) active. + + long send_count_; + // Count of messages to send before stopping (-1 indicates the + // device should not stop). + + long current_count_; + // Currently remaining count of messages to send before stopping + // (-1 indicates the device should not stop). + +}; + +class Output_Device_Wrapper_Base +{ + // = TITLE + // This class defines an abstract base class for an output device + // wrapper that hides the details of the specific device and + // provides a consistent write method interface without knowing + // anything about the implementation. + // + // = DESCRIPTION + // The abstract methods write_output_message () and + // modify_device_settings () are defined in derived classes to + // write the contents of the passed message out the underlying + // output device, and update device settings, respectively. +public: + + virtual ~Output_Device_Wrapper_Base (void); + + virtual int write_output_message (void *) = 0; + // Writes contents of the passed message block out to the underlying + // output device. + + virtual int modify_device_settings (void *) = 0; + // Provides an abstract interface to allow modifying device + // settings. +}; + +// include the templates +#include "BPR_Drivers_T.h" + +#endif /* _BPR_DRIVERS_H_ */ diff --git a/ACE/examples/Bounded_Packet_Relay/BPR_Drivers_T.cpp b/ACE/examples/Bounded_Packet_Relay/BPR_Drivers_T.cpp new file mode 100644 index 00000000000..6e597dfb9e5 --- /dev/null +++ b/ACE/examples/Bounded_Packet_Relay/BPR_Drivers_T.cpp @@ -0,0 +1,322 @@ +// $Id$ + +// ============================================================================ +// = LIBRARY +// examples +// +// = FILENAME +// BPR_Driver.cpp +// +// = DESCRIPTION +// This code builds an abstraction to factor out common code for +// the different implementations of the Timer_Queue. +// +// = AUTHORS +// Chris Gill <cdgill@cs.wustl.edu> and +// Douglas C. Schmidt <schmidt@cs.wustl.edu> +// +// Based on the Timer Queue Test example written by +// +// Carlos O'Ryan <coryan@cs.wustl.edu> and +// Douglas C. Schmidt <schmidt@cs.wustl.edu> and +// Sergio Flores-Gaitan <sergio@cs.wustl.edu> +// +// ============================================================================ + +#ifndef _BPR_DRIVER_T_CPP_ +#define _BPR_DRIVER_T_CPP_ + +// #include BPR_Drivers.h instead of BPR_Drivers_T.h +// to avoid problems with circular includes +#include "BPR_Drivers.h" +#include "ace/OS_NS_string.h" +#include "ace/OS_NS_unistd.h" + +ACE_RCSID(Bounded_Packet_Relay, BPR_Drivers_T, "$Id$") + +// Constructor. + +template <class TQ> +Bounded_Packet_Relay_Driver<TQ>::Bounded_Packet_Relay_Driver (void) + : packet_count_cmd_ (0), + arrival_period_cmd_ (0), + transmit_period_cmd_ (0), + duration_limit_cmd_ (0), + logging_level_cmd_ (0), + run_transmission_cmd_ (0), + cancel_transmission_cmd_ (0), + report_stats_cmd_ (0), + shutdown_cmd_ (0), + packet_count_ (1000), + arrival_period_ (10000), + send_period_ (10000), + duration_limit_ (20000000), + logging_level_ (0) +{ +} + +// Destructor. + +template <class TQ> +Bounded_Packet_Relay_Driver<TQ>::~Bounded_Packet_Relay_Driver (void) +{ + // delete all instantiated command objects + delete packet_count_cmd_; + delete arrival_period_cmd_; + delete transmit_period_cmd_; + delete duration_limit_cmd_; + delete logging_level_cmd_; + delete run_transmission_cmd_; + delete cancel_transmission_cmd_; + delete report_stats_cmd_; + delete shutdown_cmd_; +} + +// Parse the input and execute the corresponding command. + +template <class TQ> int +Bounded_Packet_Relay_Driver<TQ>::parse_commands (const char *buf) +{ + int option; + + if (::sscanf (buf, "%d", &option) <= 0) + // If there was an error reading the option simply try on the next + // line. + return 0; + + switch (option) + { + case 1: // set packet count + { + u_long count; + + // We just reread the option, this simplies parsing (since + // sscanf can do it for us). + if (::sscanf (buf, "%d %lu", &option, &count) < 2) + // If there was not enough information on the line, ignore + // option and try the next line. + return 0; + if (packet_count_cmd_->execute ((void *) &count) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%t %p\n", + "set packet count failed"), + -1); + break; + } + case 2: // Set the arrival period. + { + u_long usec; + + // We just reread the option, this simplies parsing (since + // sscanf can do it for us). + if (::sscanf (buf, "%d %lu", &option, &usec) < 2) + // If there was not enough information on the line, ignore + // option and try the next line. + return 0; + if (arrival_period_cmd_->execute ((void *) &usec) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%t %p\n", + "set arrival period failed"), + -1); + break; + } + case 3: // Set transmit period. + { + u_long usec; + + // We just reread the option, this simplies parsing (since + // sscanf can do it for us). + if (::sscanf (buf, "%d %lu", &option, &usec) < 2) + // If there was not enough information on the line, ignore + // option and try the next line. + return 0; + if (transmit_period_cmd_->execute ((void *) &usec) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%t %p\n", + "set transmit period failed"), + -1); + break; + } + case 4: // Set duration limit. + { + u_long usec; + + // We just reread the option, this simplies parsing (since + // sscanf can do it for us). + if (::sscanf (buf, "%d %lu", &option, &usec) < 2) + // If there was not enough information on the line, ignore + // option and try the next line. + return 0; + if (duration_limit_cmd_->execute ((void *) &usec) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%t %p\n", + "\nSet duration limit failed."), + -1); + break; + } + case 5: // Set logging level. + { + int level; + + // We just reread the option, this simplies parsing (since + // sscanf can do it for us). + if ((::sscanf (buf, "%d %d", &option, &level) < 2) || + (level < 0) || (level > 7)) + { + // If there was not enough information on the line, or the + // passed value was invalid, ignore and try again. + return 0; + } + + if (logging_level_cmd_->execute ((void *) &level) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%t %p\n", + "set logging level failed"), + -1); + break; + } + case 6: // Run one transmission. + return run_transmission_cmd_->execute (0); + /* NOTREACHED */ + case 7: // Cancel current transmission. + return cancel_transmission_cmd_->execute (0); + /* NOTREACHED */ + case 8: // Report statistics. + return report_stats_cmd_->execute (0); + /* NOTREACHED */ + case 9: // Shut down the driver. + return shutdown_cmd_->execute (0); + /* NOTREACHED */ + default: + // Display an error message. + ACE_ERROR_RETURN ((LM_ERROR, "invalid input %s\n", buf), 0); + ACE_NOTREACHED (break); + /* NOTREACHED */ + } /* ENDSWITCH */ + return 0; +} + +// Runs the test. + +template <class TQ> int +Bounded_Packet_Relay_Driver<TQ>::run (void) +{ + this->init (); + + // Process all the incoming events. + + for (;;) + if (this->get_next_request () == -1) + return -1; + + ACE_NOTREACHED (return 0); +} + +// Gets the next request from the user input. + +template <class TQ> int +Bounded_Packet_Relay_Driver<TQ>::get_next_request (void) +{ + char buf[BUFSIZ]; + + this->display_menu (); + + // Reads input from the user. + if (this->read_input (buf, sizeof buf) <= 0) + return -1; + + // Parse and run the command. + return this->parse_commands (buf); +} + +// Reads input from the user from ACE_STDIN into the buffer specified. + +template <class TQ> ssize_t +Bounded_Packet_Relay_Driver<TQ>::read_input (char *buf, size_t bufsiz) +{ + ACE_OS::memset (buf, 0, bufsiz); + + // Wait for user to type commands. This call is automatically + // restarted when SIGINT or SIGALRM signals occur. + return ACE_OS::read (ACE_STDIN, buf, bufsiz); +} + +// Get count of packets to send in a transmission. + +template <class TQ> u_long +Bounded_Packet_Relay_Driver<TQ>::packet_count (void) +{ + return packet_count_; +} + +// Set count of packets to send in a transmission. + +template <class TQ> void +Bounded_Packet_Relay_Driver<TQ>::packet_count (u_long pc) +{ + packet_count_ = pc; +} + +// Get rate at which input packets are to arrive. + +template <class TQ> u_long +Bounded_Packet_Relay_Driver<TQ>::arrival_period (void) +{ + return arrival_period_; +} + +// Set rate at which input packets are to arrive. + +template <class TQ> void +Bounded_Packet_Relay_Driver<TQ>::arrival_period (u_long ap) +{ + arrival_period_ = ap; +} + +// Get rate at which packets are to be relayed (usec). + +template <class TQ> u_long +Bounded_Packet_Relay_Driver<TQ>::send_period (void) +{ + return send_period_; +} + +// Set rate at which packets are to be relayed (usec). + +template <class TQ> void +Bounded_Packet_Relay_Driver<TQ>::send_period (u_long sp) +{ + send_period_ = sp; +} + +// Get limit on the duration of the transmission (usec). + +template <class TQ> u_long +Bounded_Packet_Relay_Driver<TQ>::duration_limit (void) +{ + return duration_limit_; +} + +// Set limit on the duration of the transmission (usec). + +template <class TQ> void +Bounded_Packet_Relay_Driver<TQ>::duration_limit (u_long dl) +{ + duration_limit_ = dl; +} +// Get logging level. + +template <class TQ> int +Bounded_Packet_Relay_Driver<TQ>::logging_level (void) +{ + return logging_level_; +} + +// Set logging level. + +template <class TQ> void +Bounded_Packet_Relay_Driver<TQ>::logging_level (int ll) +{ + logging_level_ = ll; +} +#endif /* _BPR_DRIVER_T_CPP_ */ diff --git a/ACE/examples/Bounded_Packet_Relay/BPR_Drivers_T.h b/ACE/examples/Bounded_Packet_Relay/BPR_Drivers_T.h new file mode 100644 index 00000000000..4d0ca6bdfb4 --- /dev/null +++ b/ACE/examples/Bounded_Packet_Relay/BPR_Drivers_T.h @@ -0,0 +1,178 @@ +/* -*- C++ -*- */ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// examples +// +// = FILENAME +// BPR_Drivers_T.h +// +// = DESCRIPTION +// This code factors out common class templates for use in +// the different possible implementations of the Timer_Queue +// based bounded packet relay example. +// +// = AUTHORS +// Chris Gill <cdgill@cs.wustl.edu> and +// Douglas C. Schmidt <schmidt@cs.wustl.edu> +// +// Based on the Timer Queue Test example written by +// +// Carlos O'Ryan <coryan@cs.wustl.edu> and +// Douglas C. Schmidt <schmidt@cs.wustl.edu> and +// Sergio Flores-Gaitan <sergio@cs.wustl.edu> +// +// ============================================================================ + +#ifndef _BPR_DRIVERS_T_H_ +#define _BPR_DRIVERS_T_H_ + +#include "ace/Functor.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +// Forward declarations. +class Input_Device_Wrapper_Base; +class Output_Device_Wrapper_Base; + +template <class TQ> +class Bounded_Packet_Relay_Driver +{ + // = TITLE + // This abstract base class provides a simple abstraction for a + // test driver for the bounded packet relay example. + // + // = DESCRIPTION + // This is the place where the common code to test the different + // implementations of the timer queue resides. This class has + // the logic for the parse_commands () method, the run (), + // read_input () and get_next_request () methods. Subclasses can + // override these methods if there is some logic that is specific + // to that implementation. +public: + Bounded_Packet_Relay_Driver (void); + // Constructor. + + virtual ~Bounded_Packet_Relay_Driver (void); + // Destructor. + + virtual int parse_commands (const char *buf); + // Breaks up the input string buffer into pieces and executes the + // appropriate method to handle that operation. + + virtual int run (void); + // This is the main entry point for the driver. The user of the + // class should normally invoke this method. Returns 0 when + // successful, or 0 otherwise. + + virtual int get_next_request (void); + // This internal method gets the next request from the user. + // Returns -1 when user wants to exit. Returns 0 otherwise. + + virtual ssize_t read_input (char *buf, size_t bufsiz); + // Reads input from the user into the buffer <buf> with a maximum of + // <bufsiz> bytes. Returns the amount of bytes actually read + // Otherwise, a -1 is returned and errno is set to indicate the + // error. + + virtual int display_menu (void)=0; + // Prints the user interface for the driver to STDERR. + + virtual int init (void)=0; + // Initializes values and operations for the driver. + + u_long packet_count (void); + // Get count of packets to send in a transmission. + + void packet_count (u_long pc); + // Set count of packets to send in a transmission. + + u_long arrival_period (void); + // Get rate at which input packets are to arrive. + + void arrival_period (u_long ap); + // Set rate at which input packets are to arrive. + + u_long send_period (void); + // Get rate at which packets are to be relayed (usec). + + void send_period (u_long sp); + // Set rate at which packets are to be relayed (usec). + + u_long duration_limit (void); + // Get limit on the duration of the transmission (usec). + + void duration_limit (u_long dl); + // Set limit on the duration of the transmission (usec). + + int logging_level (void); + // Get logging level. + + void logging_level (int ll); + // Set logging level. + +protected: + // = Major Driver Mechanisms + + TQ timer_queue_; + // Timer queue for transmission timeouts. + + // = Set of commands to be executed. + + ACE_Command_Base *packet_count_cmd_; + // Set packet count command. + + ACE_Command_Base *arrival_period_cmd_; + // Set arrival period command. + + ACE_Command_Base *transmit_period_cmd_; + // Set transmit period command. + + ACE_Command_Base *duration_limit_cmd_; + // Set duration limit command. + + ACE_Command_Base *logging_level_cmd_; + // Set logging level command. + + ACE_Command_Base *run_transmission_cmd_; + // Run transmission command. + + ACE_Command_Base *cancel_transmission_cmd_; + // Cancel transmission command. + + ACE_Command_Base *report_stats_cmd_; + // Report statistics command. + + ACE_Command_Base *shutdown_cmd_; + // Shut down the driver. + +private: + u_long packet_count_; + // Count of packets to send in a transmission. + + u_long arrival_period_; + // Rate at which input packets are to arrive. + + u_long send_period_; + // Rate at which packets are to be relayed (usec). + + u_long duration_limit_; + // Limit on the duration of the transmission (usec). + + int logging_level_; + // Logging level. +}; + +#if defined (ACE_TEMPLATES_REQUIRE_SOURCE) +#include "BPR_Drivers_T.cpp" +#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */ + +#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA) +#pragma implementation ("BPR_Drivers_T.cpp") +#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */ + +#endif /* _BPR_DRIVERS_T_H_ */ diff --git a/ACE/examples/Bounded_Packet_Relay/Bounded_Packet_Relay.mpc b/ACE/examples/Bounded_Packet_Relay/Bounded_Packet_Relay.mpc new file mode 100644 index 00000000000..087cc5ac3be --- /dev/null +++ b/ACE/examples/Bounded_Packet_Relay/Bounded_Packet_Relay.mpc @@ -0,0 +1,7 @@ +// -*- MPC -*- +// $Id$ + +project : aceexe { + exename = bpr_thread + macros += ACE_HAS_DEFERRED_TIMER_COMMANDS +} diff --git a/ACE/examples/Bounded_Packet_Relay/Makefile.am b/ACE/examples/Bounded_Packet_Relay/Makefile.am new file mode 100644 index 00000000000..5fb14ba2d05 --- /dev/null +++ b/ACE/examples/Bounded_Packet_Relay/Makefile.am @@ -0,0 +1,39 @@ +## Process this file with automake to create Makefile.in +## +## $Id$ +## +## This file was generated by MPC. Any changes made directly to +## this file will be lost the next time it is generated. +## +## MPC Command: +## /acebuilds/ACE_wrappers-repository/bin/mwc.pl -include /acebuilds/MPC/config -include /acebuilds/MPC/templates -feature_file /acebuilds/ACE_wrappers-repository/local.features -noreldefs -type automake -exclude build,Kokyu + +ACE_BUILDDIR = $(top_builddir) +ACE_ROOT = $(top_srcdir) + +## Makefile.Bounded_Packet_Relay.am +noinst_PROGRAMS = bpr_thread + +bpr_thread_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) \ + -DACE_HAS_DEFERRED_TIMER_COMMANDS + +bpr_thread_SOURCES = \ + BPR_Drivers.cpp \ + Thread_Bounded_Packet_Relay.cpp \ + bpr_thread.cpp \ + BPR_Drivers.h \ + BPR_Drivers_T.h \ + Thread_Bounded_Packet_Relay.h + +bpr_thread_LDADD = \ + $(ACE_BUILDDIR)/ace/libACE.la + +## Clean up template repositories, etc. +clean-local: + -rm -f *~ *.bak *.rpo *.sym lib*.*_pure_* core core.* + -rm -f gcctemp.c gcctemp so_locations *.ics + -rm -rf cxx_repository ptrepository ti_files + -rm -rf templateregistry ir.out + -rm -rf ptrepository SunWS_cache Templates.DB diff --git a/ACE/examples/Bounded_Packet_Relay/README b/ACE/examples/Bounded_Packet_Relay/README new file mode 100644 index 00000000000..5757224c510 --- /dev/null +++ b/ACE/examples/Bounded_Packet_Relay/README @@ -0,0 +1,194 @@ +1. INTRODUCTION + +This directory contains an example that illustrates how to use both +threaded and reactive concurrency mechanisms in ACE. The example +application schedules and processes heterogenerous user input and +timer-based events in the context of a bounded packet relay mechanism. + +In this example, a transmission begins, packets arrive from an input device +object, and are transferred to an output device object by a relay object at +a specified pace. The transfer continues until all packets have been +relayed, a duration limit expires, or the transmission is cancelled. + +User input is handled concurrently with a running transmission. You +can run a transmission, cancel a transmission, change transmission +parameters, view statistics from the most recent transmission, or exit +the program, using selections from an interactive text-based menu. +In addition, the example program can be run in batch mode, with the +appropriate commands piped to the program's standard input stream. + +Transmission parameters are intialized to default values. Transmission +parameter values persist until/unless they are subsequently modified by an +appropriate command. If an invalid value for a command is given, or a run +or report command is issued while a transmission is in progress, the +offending command has no effect, and an error message is generated. + +2. USER INTERFACE + +Commands that can be given to the program include the following: + +Settings commands: + + 1 <number of packets to relay in one transmission> + + Minimum value is 1 packet, defaults to 1000 packets. + + 2 <input packet arrival period (in usec)> + + Minimum value is 1 usec, defaults to 10000 usec (10 msec). + + 3 <output packet transmission period (in usec)> + + Minimum value is 1 usec, defaults to 10000 usec (10 msec). + + 4 <limit on duration of transmission (in usec)> + + Minimum value is 1 usec, defaults to 20000000 usec (20 sec). + A value of 0 is also a valid input, in which case no limit + will be placed on the duration of the transmission (it will + end when all packets have been relayed from the input device + to the output device). + + 5 <logging level> + + 0 - does no logging + 1 - logs packets created by the input device + 2 - logs packets consumed by the output device + 4 - prints contents of packets consumed by the output device + + To set several flags, pass their sum as the logging level value: + e.g., a logging level value of 7 turns on all possible logging. + +Action commands: + + 6 - runs a transmission using the current settings + + 7 - cancels a transmission if there is one running + + 8 - reports statistics from the most recent transmission + + 9 - quits the program + +3. APPLICATION DESIGN + +3.1. KEY COMPONENTS + +The design of this example application consists of four main +components: the driver object, the relay object, the input device +object, and the output device object. + +The driver object is responsible for receiving user input and overall handling +of user input commands. The driver object is active, with separate threads +for receiving user input and managing its transmission timer queue. This +allows the user to issue commands while a transmission is in progress. The +driver object uses an ACE_Thread_Timer_Queue_Adapter, which is derived from +ACE_Task_Base. The driver object starts another active object, called +User_Input_Task, which is also derived from ACE_Task_Base. This allows both +the timer queue and the user input object to be made active, running in their +own threads of control. + +The relay object is passive, managing a message queue and necessary +locks to allow safe access from multiple threads. It provides methods +to receive and enqueue a mesage from the input device, dequeue a +message and send it to the output device, and to start or end a +transmission. It uses ACE_Message_Queue (which contains ACE_Message_Block +objects) and ACE_Thread_Mutex objects to implement this functionality. + +The input object is active, managing timeouts and input events in its +own thread. The input object is also reactive, using an ACE_Reactor to +allow response to multiple input handles as well as to do polling at +specific timeouts. The input pseudo-device wrapper in this example +does not make use of input handles and only does timed polling, but +extending this only requires registering the appropriate input handles +and handlers with the reactor. The input object is derived from +ACE_Task_Base, and is activated by the relay object when a new +transmission starts. The input object packages each data packet in an +ACE_Message_Block before sending it to the relay object. + +The output object is passive. If logging is turned on, it will report +the arrival time, relative to the start of the transmission, of each +output message, and the contents of that message. The output object +will also "consume" each ACE_Message_Block passed to it, calling +delete on the passed pointer. + +3.2. RUN-TIME CHARACTERSITICS + +When the user initiates a transmission, the appropriate settings are passed +by the driver to the relay object's start transmission method. The relay +object tries to start a new transmission. If another transmission is in +progress, the method returns an error. Otherwise, the relay object's start +transmission method initializes itself and the input and output device +objects, activates the input device object, and stores the handle for +the new input device thread. + +The driver then constructs a timeout handler with a count of the +number of messages to relay and a send timeout value, and pushes a +timer with this handler onto the timer queue. If there is a limit on +the duration of the transmission, the driver constructs a different +handler for end-of-transmission, and pushes a timer for the end of +the transmission with this handler onto the timer queue as well. When +the user issues a cancel transmission command, the driver calls the +relay's end transmission method. + +When a send timeout expires, the handler (running in the timer queue +thread) calls the send method of the relay. If there are any enqueued +messages from the input device object in its queue, the relay object's +send method will dequeue a message, pass it to the output device +object, and return success. If there are no messages in the queue, +the relay object's send method will return failure. If the send was +successful, the handler will decrement its count of remaining +messages. If the count is greater than zero, the handler will +register a new send timer for itself and exit. If the count is zero, +then the handler will call the relay's end transmission method, clear +the timer queue, and mark itself inactive before exiting. + +Similarly, if the end-of-transmission timer expires before all packets +have been sent, that handler will call the relay's end transmission +method, clear the timer queue, release the semaphore, and then exit. + +When the relay's end transmission method is called, it marks the relay +itself inactive, and calls the input device's deactivate method, which +sets the input device's activity flag to inactive (see below). The +relay's end transmission method then waits until the input device thread +exits, before returning. + +While it is still active, the input device thread blocks on a reactor +timeout for the duration it was given by the relay. When the timeout +expires, the input device checks a flag to see if it is still active. +If the flag says the input device is inactive, the thread simply +exits. This allows cancellation of the input device when a +transmission has ended. If the flag says it is still active, the +thread builds a new character buffer, and wraps this with a new +ACE_Message_Block. The input device passes this message block to the +execution method of the send input message command object with which +it was constructed. This level of indirection allows the input device +to be used with arbitrary types, so that it could for example be +connected directly to an output device. The input device also +maintains a count of the number of messages it has sent. Once the +input device has sent all its messages, it marks itself inactive, and +its thread simply exits. + +4. ACCESSING THE SOURCE CODE + +The files for this example are located in +$ACE_ROOT/examples/Bounded_Packet_Relay in the latest release of ACE, +which is located at + +http://www.cs.wustl.edu/~schmidt/ACE_wrappers/ACE.tar.gz + +Source Files: Thread_Bounded_Packet_Relay.h + Thread_Bounded_Packet_Relay.cpp + BPR_Driver.h + BPR_Driver.cpp + BPR_Driver_T.h + BPR_Driver_T.cpp + bpr_thread.cpp + +Make file: Makefile + +Doc file: README (this file) + +Executable: bpr_thread + + + diff --git a/ACE/examples/Bounded_Packet_Relay/Thread_Bounded_Packet_Relay.cpp b/ACE/examples/Bounded_Packet_Relay/Thread_Bounded_Packet_Relay.cpp new file mode 100644 index 00000000000..a2f179a5348 --- /dev/null +++ b/ACE/examples/Bounded_Packet_Relay/Thread_Bounded_Packet_Relay.cpp @@ -0,0 +1,770 @@ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// examples +// +// = FILENAME +// Thread_Bounded_Packet_Relay.cpp +// +// = DESCRIPTION +// Method definitions for the threaded-bounded packet relay class. +// +// = AUTHORS +// Chris Gill <cdgill@cs.wustl.edu> and +// Douglas C. Schmidt <schmidt@cs.wustl.edu> +// +// Based on the Timer Queue Test example written by +// +// Carlos O'Ryan <coryan@cs.wustl.edu> and +// Douglas C. Schmidt <schmidt@cs.wustl.edu> and +// Sergio Flores-Gaitan <sergio@cs.wustl.edu> +// +// ============================================================================ + +#include "ace/OS_NS_string.h" +#include "ace/OS_NS_sys_time.h" +#include "ace/Condition_T.h" +#include "ace/Null_Mutex.h" + +#include "Thread_Bounded_Packet_Relay.h" + +typedef Thread_Bounded_Packet_Relay_Driver::MYCOMMAND DRIVER_CMD; +typedef ACE_Command_Callback<BPR_Handler_Base, BPR_Handler_Base::ACTION> HANDLER_CMD; +typedef ACE_Command_Callback<Send_Handler, Send_Handler::ACTION> SEND_HANDLER_CMD; + +ACE_RCSID(Bounded_Packet_Relay, Thread_Bounded_Packet_Relay, "$Id$") + +// Constructor. + +Text_Input_Device_Wrapper::Text_Input_Device_Wrapper (ACE_Thread_Manager *input_task_mgr, + size_t read_length, + const char* text, + int logging) + : Input_Device_Wrapper_Base (input_task_mgr), + read_length_ (read_length), + text_ (text), + index_ (0), + logging_ (logging), + packet_count_ (0) + +{ +} + +// Destructor. + +Text_Input_Device_Wrapper::~Text_Input_Device_Wrapper (void) +{ +} + +// Modifies device settings based on passed pointer to a u_long. + +int +Text_Input_Device_Wrapper::modify_device_settings (void *logging) +{ + packet_count_ = 0; + + if (logging) + logging_ = *static_cast<int *> (logging); + else + ACE_ERROR_RETURN ((LM_ERROR, + "Text_Input_Device_Wrapper::modify_device_settings: " + "null argument"), + -1); + return 0; +} + + +// Creates a new message block, carrying data +// read from the underlying input device. + +ACE_Message_Block * +Text_Input_Device_Wrapper::create_input_message (void) +{ + + // Construct a new message block to send. + ACE_Message_Block *mb; + ACE_NEW_RETURN (mb, + ACE_Message_Block (read_length_), + 0); + + // Zero out a "read" buffer to hold data. + char read_buf [BUFSIZ]; + ACE_OS::memset (read_buf, 0, BUFSIZ); + + // Loop through the text, filling in data to copy into the read + // buffer (leaving room for a terminating zero). + for (size_t i = 0; i < read_length_ - 1; ++i) + { + read_buf [i] = text_ [index_]; + index_ = (index_ + 1) % ACE_OS::strlen (text_); + } + + // Copy buf into the Message_Block and update the wr_ptr (). + if (mb->copy (read_buf, read_length_) < 0) + { + delete mb; + ACE_ERROR_RETURN ((LM_ERROR, + "read buffer copy failed"), + 0); + } + + // log packet creation if logging is turned on + if (logging_ & Text_Input_Device_Wrapper::LOG_MSGS_CREATED) + { + ++packet_count_; + ACE_DEBUG ((LM_DEBUG, "input message %d created\n", + packet_count_)); + } + + return mb; +} + +// Constructor. + +Text_Output_Device_Wrapper::Text_Output_Device_Wrapper (int logging) + : logging_ (logging) +{ +} + +// Consume and possibly print out the passed message. + +int +Text_Output_Device_Wrapper::write_output_message (void *message) +{ + if (message) + { + ++packet_count_; + + if (logging_ & Text_Output_Device_Wrapper::LOG_MSGS_RCVD) + ACE_DEBUG ((LM_DEBUG, "output message %d received\n", + packet_count_)); + + if (logging_ & Text_Output_Device_Wrapper::PRINT_MSGS_RCVD) + ACE_DEBUG ((LM_DEBUG, "output message %d:\n[%s]\n", + packet_count_, + static_cast<ACE_Message_Block *> (message)-> + rd_ptr ())); + + delete static_cast<ACE_Message_Block *> (message); + return 0; + } + ACE_ERROR_RETURN ((LM_ERROR, + "Text_Output_Device_Wrapper::" + "write_output_message: null argument"), -1); +} + +// Modifies device settings based on passed pointer to a u_long. + +int +Text_Output_Device_Wrapper::modify_device_settings (void *logging) +{ + packet_count_ = 0; + + if (logging) + logging_ = *static_cast<int *> (logging); + else + ACE_ERROR_RETURN ((LM_ERROR, + "Text_Output_Device_Wrapper::modify_device_settings: " + "null argument"), + -1); + return 0; +} + +// Constructor. + +User_Input_Task::User_Input_Task (Bounded_Packet_Relay *relay, + Thread_Timer_Queue *queue, + Thread_Bounded_Packet_Relay_Driver &tbprd) + : ACE_Task_Base (ACE_Thread_Manager::instance ()), + usecs_ (ACE_ONE_SECOND_IN_USECS), + relay_ (relay), + queue_ (queue), + driver_ (tbprd) +{ +} + +// Destructor. + +User_Input_Task::~User_Input_Task (void) +{ + this->clear_all_timers (); +} + +// Runs the main event loop. + +int +User_Input_Task::svc (void) +{ + for (;;) + // Call back to the driver's implementation of how to read and + // parse input. + if (this->driver_.get_next_request () == -1) + break; + + // We are done. + this->relay_->end_transmission (Bounded_Packet_Relay::CANCELLED); + this->queue_->deactivate (); + ACE_DEBUG ((LM_DEBUG, + "terminating user input thread\n")); + this->clear_all_timers (); + return 0; +} + +// Sets the number of packets for the next transmission. + +int +User_Input_Task::set_packet_count (void *argument) +{ + if (argument) + { + driver_.packet_count (*static_cast<int *> (argument)); + return 0; + } + ACE_ERROR_RETURN ((LM_ERROR, + "User_Input_Task::set_packet_count: null argument"), + -1); +} + +// Sets the input device packet arrival period (usecs) for the next +// transmission. + +int +User_Input_Task::set_arrival_period (void *argument) +{ + if (argument) + { + driver_.arrival_period (*static_cast<int *> (argument)); + return 0; + } + ACE_ERROR_RETURN ((LM_ERROR, + "User_Input_Task::set_arrival_period: null argument"), + -1); +} + +// Sets the period between output device sends (usecs) for the next +// transmission. + +int +User_Input_Task::set_send_period (void *argument) +{ + if (argument) + { + driver_.send_period (*static_cast<int *> (argument)); + return 0; + } + ACE_ERROR_RETURN ((LM_ERROR, + "User_Input_Task::set_send_period: null argument"), + -1); +} + +// Sets a limit on the transmission duration (usecs). + +int +User_Input_Task::set_duration_limit (void *argument) +{ + if (argument) + { + driver_.duration_limit (*static_cast<int *> (argument)); + return 0; + } + ACE_ERROR_RETURN ((LM_ERROR, + "User_Input_Task::set_duration_limit: null argument"), + -1); +} + +// Sets logging level (0 or 1) for output device for the next +// transmission. + +int +User_Input_Task::set_logging_level (void *argument) +{ + if (argument) + { + driver_.logging_level (*static_cast<int *> (argument)); + return 0; + } + ACE_ERROR_RETURN ((LM_ERROR, + "User_Input_Task::set_logging_level: null argument"), + -1); +} + +// Runs the next transmission (if one is not in progress). + +int +User_Input_Task::run_transmission (void *) +{ + if (relay_) + { + switch (relay_->start_transmission (driver_.packet_count (), + driver_.arrival_period (), + driver_.logging_level ())) + { + case 1: + ACE_DEBUG ((LM_DEBUG, + "\nRun transmission: " + " Transmission already in progress\n")); + return 0; + /* NOTREACHED */ + case 0: + { + ACE_Time_Value now = ACE_OS::gettimeofday (); + ACE_Time_Value send_every (0, driver_.send_period ()); + ACE_Time_Value send_at (send_every + now); + + Send_Handler *send_handler; + ACE_NEW_RETURN (send_handler, + Send_Handler (driver_.packet_count (), + send_every, + *relay_, + *queue_, + driver_), + -1); + if (queue_->schedule (send_handler, 0, send_at) < 0) + { + delete send_handler; + ACE_ERROR_RETURN ((LM_ERROR, + "User_Input_Task::run_transmission: " + "failed to schedule send handler"), + -1); + } + if (driver_.duration_limit ()) + { + ACE_Time_Value terminate_at (0, driver_.duration_limit ()); + terminate_at += now; + + Termination_Handler *termination_handler; + + termination_handler = + new Termination_Handler (*relay_, + *queue_, + driver_); + + if (! termination_handler) + { + this->clear_all_timers (); + ACE_ERROR_RETURN ((LM_ERROR, + "User_Input_Task::run_transmission: " + "failed to allocate termination " + "handler"), + -1); + } + if (queue_->schedule (termination_handler, + 0, terminate_at) < 0) + { + delete termination_handler; + this->clear_all_timers (); + ACE_ERROR_RETURN ((LM_ERROR, + "User_Input_Task::run_transmission: " + "failed to schedule termination " + "handler"), + -1); + } + } + return 0; + } + /* NOTREACHED */ + default: + return -1; + /* NOTREACHED */ + } + } + ACE_ERROR_RETURN ((LM_ERROR, + "User_Input_Task::run_transmission: " + "relay not instantiated"), + -1); +} + +// Ends the current transmission (if one is in progress). + +int +User_Input_Task::end_transmission (void *) +{ + if (relay_) + { + switch (relay_->end_transmission (Bounded_Packet_Relay::CANCELLED)) + { + case 1: + ACE_DEBUG ((LM_DEBUG, + "\nEnd transmission: " + "no transmission in progress\n")); + /* Fall through to next case */ + case 0: + // Cancel any remaining timers. + this->clear_all_timers (); + return 0; + /* NOTREACHED */ + default: + return -1; + /* NOTREACHED */ + } + } + ACE_ERROR_RETURN ((LM_ERROR, + "User_Input_Task::end_transmission: " + "relay not instantiated"), + -1); +} + +// Reports statistics for the previous transmission +// (if one is not in progress). + +int +User_Input_Task::report_stats (void *) +{ + if (relay_) + { + switch (relay_->report_statistics ()) + { + case 1: + ACE_DEBUG ((LM_DEBUG, + "\nRun transmission: " + "\ntransmission already in progress\n")); + return 0; + /* NOTREACHED */ + + case 0: + this->clear_all_timers (); + return 0; + /* NOTREACHED */ + + default: + return -1; + /* NOTREACHED */ + } + } + ACE_ERROR_RETURN ((LM_ERROR, + "User_Input_Task::report_stats: " + "relay not instantiated"), + -1); +} + +// Shut down the task. + +int +User_Input_Task::shutdown (void *) +{ + // Clear any outstanding timers. + this->clear_all_timers (); + +#if !defined (ACE_LACKS_PTHREAD_CANCEL) + // Cancel the thread timer queue task "preemptively." + ACE_Thread::cancel (this->queue_->thr_id ()); +#else + // Cancel the thread timer queue task "voluntarily." + this->queue_->deactivate (); +#endif /* ACE_LACKS_PTHREAD_CANCEL */ + + // -1 indicates we are shutting down the application. + return -1; +} + +// Helper method: clears all timers. + +int +User_Input_Task::clear_all_timers (void) +{ + // loop through the timers in the queue, cancelling each one + for (ACE_Timer_Node_T <ACE_Event_Handler *> *node; + (node = queue_->timer_queue ()->get_first ()) != 0; + ) + queue_->timer_queue ()->cancel (node->get_timer_id (), 0, 0); + + return 0; +} + +// Constructor. + +BPR_Handler_Base::BPR_Handler_Base (Bounded_Packet_Relay &relay, + Thread_Timer_Queue &queue) + : relay_ (relay), + queue_ (queue) +{ +} + +// Destructor. + +BPR_Handler_Base::~BPR_Handler_Base (void) +{ +} + +// Helper method: clears all timers. + +int +BPR_Handler_Base::clear_all_timers (void *) +{ + // Loop through the timers in the queue, cancelling each one. + + for (ACE_Timer_Node_T <ACE_Event_Handler *> *node; + (node = queue_.timer_queue ()->get_first ()) != 0; + ) + queue_.timer_queue ()->cancel (node->get_timer_id (), 0, 0); + // queue_.cancel (node->get_timer_id (), 0); + + // Invoke the handler's (virtual) destructor + delete this; + + return 0; +} + +// Constructor. + +Send_Handler::Send_Handler (u_long send_count, + const ACE_Time_Value &duration, + Bounded_Packet_Relay &relay, + Thread_Timer_Queue &queue, + Thread_Bounded_Packet_Relay_Driver &driver) + : BPR_Handler_Base (relay, queue), + send_count_ (send_count), + duration_ (duration), + driver_ (driver) +{ +} + +// Destructor. + +Send_Handler::~Send_Handler (void) +{ +} + +// Call back hook. + +int +Send_Handler::handle_timeout (const ACE_Time_Value &, + const void *) +{ + switch (relay_.send_input ()) + { + case 0: + // Decrement count of packets to relay. + --send_count_; + /* Fall through to next case. */ + case 1: + if (send_count_ > 0) + { + // Enqueue a deferred callback to the reregister command. + SEND_HANDLER_CMD *re_register_callback_; + ACE_NEW_RETURN (re_register_callback_, + SEND_HANDLER_CMD (*this, + &Send_Handler::reregister), + -1); + return queue_.enqueue_command (re_register_callback_); + } + else + { + // All packets are sent, time to end the transmission, redisplay + // the user menu, cancel any other timers, and go away. + relay_.end_transmission (Bounded_Packet_Relay::COMPLETED); + driver_.display_menu (); + + // Enqueue a deferred callback to the clear_all_timers command. + HANDLER_CMD *clear_timers_callback_; + ACE_NEW_RETURN (clear_timers_callback_, + HANDLER_CMD (*this, + &BPR_Handler_Base::clear_all_timers), + -1); + return queue_.enqueue_command (clear_timers_callback_); + } + /* NOTREACHED */ + default: + return -1; + } +} + +// Cancellation hook. + +int +Send_Handler::cancelled (void) +{ + delete this; + return 0; +} + +// Helper method: re-registers this timer + +int +Send_Handler::reregister (void *) +{ + // Re-register the handler for a new timeout. + if (queue_.schedule (this, + 0, + duration_ + ACE_OS::gettimeofday ()) < 0) + ACE_ERROR_RETURN ((LM_ERROR, + "Send_Handler::reregister: " + "failed to reschedule send handler"), + -1); + + return 0; +} + + +// Constructor. + +Termination_Handler::Termination_Handler (Bounded_Packet_Relay &relay, + Thread_Timer_Queue &queue, + Thread_Bounded_Packet_Relay_Driver &driver) + : BPR_Handler_Base (relay, queue), + driver_ (driver) +{ +} + +// Destructor. + +Termination_Handler::~Termination_Handler (void) +{ +} + +// Call back hook. + +int +Termination_Handler::handle_timeout (const ACE_Time_Value &, + const void *) +{ + // Transmission timed out, so end the transmission, display the user + // menu, and register a callback to clear the timer queue and then + // make this object go away. + relay_.end_transmission (Bounded_Packet_Relay::TIMED_OUT); + driver_.display_menu (); + + // Enqueue a deferred callback to the clear_all_timers command. + HANDLER_CMD *clear_timers_callback_; + ACE_NEW_RETURN (clear_timers_callback_, + HANDLER_CMD (*this, + &BPR_Handler_Base::clear_all_timers), + -1); + return queue_.enqueue_command (clear_timers_callback_); +} + +// Cancellation hook + +int +Termination_Handler::cancelled (void) +{ + delete this; + return 0; +} + +// Constructor. + +Thread_Bounded_Packet_Relay_Driver::Thread_Bounded_Packet_Relay_Driver (Bounded_Packet_Relay *relay) + : input_task_ (relay, &timer_queue_, *this) +{ +} + +// Destructor. + +Thread_Bounded_Packet_Relay_Driver::~Thread_Bounded_Packet_Relay_Driver (void) +{ +} + +// Display the user menu. + +int +Thread_Bounded_Packet_Relay_Driver::display_menu (void) +{ + static char menu[] = + "\n\n Options:\n" + " ----------------------------------------------------------------------\n" + " 1 <number of packets to relay in one transmission = %d>\n" + " min = 1 packet.\n" + " 2 <input packet arrival period (in usec) = %d>\n" + " min = 1.\n" + " 3 <output packet transmission period (in usec) = %d>\n" + " min = 1.\n" + " 4 <limit on duration of transmission (in usec) = %d>\n" + " min = 1, no limit = 0.\n" + " 5 <logging level flags = %d>\n" + " no logging = 0,\n" + " log packets created by input device = 1,\n" + " log packets consumed by output device = 2,\n" + " logging options 1,2 = 3,\n" + " print contents of packets consumed by output put device = 4,\n" + " logging options 1,4 = 5,\n" + " logging options 2,4 = 6,\n" + " logging options 1,2,4 = 7.\n" + " ----------------------------------------------------------------------\n" + " 6 - runs a transmission using the current settings\n" + " 7 - cancels a transmission (if there is one running)\n" + " 8 - reports statistics from the most recent transmission\n" + " 9 - quits the program\n" + " ----------------------------------------------------------------------\n" + " Please enter your choice: "; + + ACE_DEBUG ((LM_DEBUG, + menu, + this->packet_count (), + this->arrival_period (), + this->send_period (), + this->duration_limit (), + this->logging_level ())); + + return 0; +} + +// Initialize the driver. + +int +Thread_Bounded_Packet_Relay_Driver::init (void) +{ + // Initialize the <Command> objects with their corresponding + // methods from <User_Input_Task>. + ACE_NEW_RETURN (packet_count_cmd_, + DRIVER_CMD (input_task_, + &User_Input_Task::set_packet_count), + -1); + ACE_NEW_RETURN (arrival_period_cmd_, + DRIVER_CMD (input_task_, + &User_Input_Task::set_arrival_period), + -1); + ACE_NEW_RETURN (transmit_period_cmd_, + DRIVER_CMD (input_task_, + &User_Input_Task::set_send_period), + -1); + ACE_NEW_RETURN (duration_limit_cmd_, + DRIVER_CMD (input_task_, + &User_Input_Task::set_duration_limit), + -1); + ACE_NEW_RETURN (logging_level_cmd_, + DRIVER_CMD (input_task_, + &User_Input_Task::set_logging_level), + -1); + ACE_NEW_RETURN (run_transmission_cmd_, + DRIVER_CMD (input_task_, + &User_Input_Task::run_transmission), + -1); + ACE_NEW_RETURN (cancel_transmission_cmd_, + DRIVER_CMD (input_task_, + &User_Input_Task::end_transmission), + -1); + ACE_NEW_RETURN (report_stats_cmd_, + DRIVER_CMD (input_task_, + &User_Input_Task::report_stats), + -1); + ACE_NEW_RETURN (shutdown_cmd_, + DRIVER_CMD (input_task_, + &User_Input_Task::shutdown), + -1); + if (this->input_task_.activate () == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "cannot activate input task"), + -1); + else if (this->timer_queue_.activate () == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "cannot activate timer queue"), + -1); + else if (ACE_Thread_Manager::instance ()->wait () == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "wait on Thread_Manager failed"), + -1); + return 0; +} + +// Run the driver + +int +Thread_Bounded_Packet_Relay_Driver::run (void) +{ + this->init (); + return 0; +} + diff --git a/ACE/examples/Bounded_Packet_Relay/Thread_Bounded_Packet_Relay.h b/ACE/examples/Bounded_Packet_Relay/Thread_Bounded_Packet_Relay.h new file mode 100644 index 00000000000..ccd5782b487 --- /dev/null +++ b/ACE/examples/Bounded_Packet_Relay/Thread_Bounded_Packet_Relay.h @@ -0,0 +1,400 @@ +/* -*- C++ -*- */ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// examples +// +// = FILENAME +// Thread_Bounded_Packet_Relay.h +// +// = DESCRIPTION +// This code provides a thread based implementation +// of the bounded packet relay example. +// +// = AUTHORS +// Chris Gill <cdgill@cs.wustl.edu> and +// Douglas C. Schmidt <schmidt@cs.wustl.edu> +// +// Based on the Timer Queue Test example written by +// +// Carlos O'Ryan <coryan@cs.wustl.edu> and +// Douglas C. Schmidt <schmidt@cs.wustl.edu> and +// Sergio Flores-Gaitan <sergio@cs.wustl.edu> +// +// ============================================================================ + +#ifndef _THREAD_BOUNDED_PACKET_RELAY_H_ +#define _THREAD_BOUNDED_PACKET_RELAY_H_ + +#include "ace/Functor_T.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "ace/Task.h" +#include "ace/Timer_Heap_T.h" +#include "ace/Timer_Queue_Adapters.h" +#include "BPR_Drivers.h" + +// These typedefs ensure that we use the minimal amount of locking +// necessary. +typedef ACE_Event_Handler_Handle_Timeout_Upcall<ACE_Null_Mutex> + Upcall; +typedef ACE_Timer_Heap_T<ACE_Event_Handler *, + Upcall, + ACE_Null_Mutex> + Timer_Heap; +typedef ACE_Timer_Heap_Iterator_T<ACE_Event_Handler *, + Upcall, + ACE_Null_Mutex> + Timer_Heap_Iterator; +typedef ACE_Thread_Timer_Queue_Adapter<Timer_Heap> + Thread_Timer_Queue; + +// Forward declaration. +class Thread_Bounded_Packet_Relay_Driver; + +class Text_Input_Device_Wrapper : public Input_Device_Wrapper_Base +{ + // = TITLE + // Defines a wrapper for a simple active looping text input + // pseudo-device. + // + // = DESCRIPTION + // The wrapper is an active object, running in its own thread, + // and uses a reactor to generate timeouts. When a timeout + // occurs, the wrapper calls its concrete message creation + // method. The wrapper then calls its base class message send + // method to forward the message to the receiver. + // + // A more sophisticated version of this class would use the + // reactive capabilities as well as the timeout generating + // capabilities of the reactor, multiplexing several input + // streams. Comments to this effect appear in the definition of + // the event loop method. +public: + + // = Enumerated logging level flags + enum Logging_Flags {NO_LOGGING = 0, + LOG_MSGS_CREATED = 1}; + + // = Initialization and termination methods. + Text_Input_Device_Wrapper (ACE_Thread_Manager *input_task_mgr, + size_t read_length, + const char* text, + int logging = 0); + // Constructor. + + virtual ~Text_Input_Device_Wrapper (void); + // Destructor. + + virtual int modify_device_settings (void *logging); + // Modifies device settings based on passed pointer to a u_long. + // Turns logging on if u_long is non-zero, off if u_long is zero, + // and does nothing if the pointer is null. + +protected: + virtual ACE_Message_Block *create_input_message (void); + // Creates a new message block, carrying data read from the + // underlying input device. + +private: + size_t read_length_; + // Length of the buffer into which to "read". + + const char *text_; + // Text to "read" into the buffer. + + size_t index_; + // Index into the string. + + int logging_; + // This value is 0 if logging is turned off, non-zero otherwise + + u_long packet_count_; + // This value holds a count of packets created. + +}; + +class Text_Output_Device_Wrapper : public Output_Device_Wrapper_Base +{ + // = TITLE + // Implements a simple wrapper for a output pseudo-device. + // + // = DESCRIPTION + // Data from the passed output message is printed to the standard + // output stream, if logging is turned on. +public: + + // = Enumerated logging level flags + enum Logging_Flags {NO_LOGGING = 0, + LOG_MSGS_RCVD = 2, + PRINT_MSGS_RCVD = 4}; + + Text_Output_Device_Wrapper (int logging = 0); + // Default constructor. + + // = Command Accessible Entry Points + + virtual int write_output_message (void *message); + // Consumes and possibly prints out the passed message. + + virtual int modify_device_settings (void *logging); + // Modifies device settings based on passed pointer to a u_long. + // Turns logging on if u_long is non-zero, off if u_long is zero, + // and does nothing if the pointer is null. + +private: + + int logging_; + // This value holds the logging level. + + u_long packet_count_; + // This value holds a count of packets received. + +}; + +class User_Input_Task : public ACE_Task_Base +{ + // = TITLE + // Read user actions on the Timer_Queue from stdin. + // + // = DESCRIPTION + // This class reads user input from stdin. The commands allow + // the control of a Timer_Queue, which is dispatched by another + // thread. +public: + + // = Trait for command accessible entry points. + + typedef int (User_Input_Task::*ACTION) (void *); + + User_Input_Task (Bounded_Packet_Relay *relay, + Thread_Timer_Queue *queue, + Thread_Bounded_Packet_Relay_Driver &timer_queue_driver); + // Constructor. + + virtual ~User_Input_Task (void); + // Destructor. + + virtual int svc (void); + // This method runs the event loop in the new thread. + + // = Some helper methods. + + int set_packet_count (void *); + // Sets the number of packets for the next transmission. + + int set_arrival_period (void *); + // Sets the input device packet arrival period (usecs) for the next + // transmission. + + int set_send_period (void *); + // Sets the period between output device sends (usecs) for the next + // transmission. + + int set_duration_limit (void *); + // Sets a limit on the transmission duration (usecs). + + int set_logging_level (void *); + // Sets logging level (0 or 1) for output device for the next + // transmission. + + int run_transmission (void *); + // Runs the next transmission (if one is not in progress). + + int end_transmission (void *); + // Ends the current transmission (if one is in progress). + + int report_stats (void *); + // Reports statistics for the previous transmission (if one is not + // in progress). + + int shutdown (void *); + // Shuts down the task. + + int clear_all_timers (void); + // Helper method: clears all timers. + +private: + const int usecs_; + // How many microseconds are in a second. + + Bounded_Packet_Relay *relay_; + // The bounded packet relay. + + Thread_Timer_Queue *queue_; + // The timer queue implementation. + + Thread_Bounded_Packet_Relay_Driver &driver_; + // The thread timer queue test driver. +}; + +class BPR_Handler_Base : public ACE_Event_Handler +{ + // = TITLE + // Base event handler class for bounded packet relay example. + // + // = DESCRIPTION + // The base class provides a helper method that derived classes + // can register as a deferred execution callback that will cancel + // all timers in the underlying timer queue, and then delete "this". + // +public: + + // = Trait for command accessible entry points. + + typedef int (BPR_Handler_Base::*ACTION) (void *); + + + BPR_Handler_Base (Bounded_Packet_Relay &relay, + Thread_Timer_Queue &queue); + // Constructor. + + virtual ~BPR_Handler_Base (void); + // Destructor. + + // = Command accessible entry points. + + virtual int clear_all_timers (void *); + // Helper method: clears all timers. + +protected: + Bounded_Packet_Relay &relay_; + // Stores a reference to the relay object on which to invoke + // the appropritate calls when the timer expires. + + Thread_Timer_Queue &queue_; + // Store a reference to the timer queue, in which to re-register + // the send timer and handler if there are still sends to perform. +}; + +class Send_Handler; + +class Send_Handler : public BPR_Handler_Base +{ + // = TITLE + // Event handler for message send timeout events. + // + // = DESCRIPTION + // The <handle_timeout> hook method calls the relay's send + // method and decrements its count of messages to send. + // If there are still messages to send, it re-registers itself + // with the timer queue. Otherwise it calls the relay's end + // transmission method, and registers a deferred execution + // callback to clear the timer queue, and then delete "this". +public: + + // = Trait for command accessible entry points. + + typedef int (Send_Handler::*ACTION) (void *); + + Send_Handler (u_long send_count, + const ACE_Time_Value &duration, + Bounded_Packet_Relay &relay, + Thread_Timer_Queue &queue, + Thread_Bounded_Packet_Relay_Driver &driver); + // Constructor. + + virtual ~Send_Handler (void); + // Destructor. + + virtual int handle_timeout (const ACE_Time_Value ¤t_time, + const void *arg); + // Call back hook. + + virtual int cancelled (void); + // Cancellation hook. + + // = Command accessible entry points. + + virtual int reregister (void *timeout); + // Helper method: re-registers this handler. + +private: + + u_long send_count_; + // Count of the number of messages to send from the + // relay object to the output device object. + + ACE_Time_Value duration_; + // Stores the expected duration until expiration, and is used to + // re-register the handler if there are still sends to perform. + + Thread_Bounded_Packet_Relay_Driver &driver_; + // Reference to the driver that will redisplay the user input menu. +}; + +class Termination_Handler : public BPR_Handler_Base +{ + // = TITLE + // Event handler for end transmission timeout events. + // + // = DESCRIPTION + // The <handle_timeout> hook method calls the relay's end + // transmission method, then registers a deferred execution + // callback to clear all timers and then delete "this". +public: + Termination_Handler (Bounded_Packet_Relay &relay, + Thread_Timer_Queue &queue, + Thread_Bounded_Packet_Relay_Driver &driver); + // Constructor. + + virtual ~Termination_Handler (void); + // Destructor. + + virtual int handle_timeout (const ACE_Time_Value ¤t_time, + const void *arg); + // Call back hook. + + virtual int cancelled (void); + // Cancellation hook. + +private: + Thread_Bounded_Packet_Relay_Driver &driver_; + // Reference to the driver that will redisplay the user input menu. +}; + +class Thread_Bounded_Packet_Relay_Driver : public Bounded_Packet_Relay_Driver <Thread_Timer_Queue> +{ + // = TITLE + // Implements an example application that exercises + // <Thread_Timer_Queue> timer queue. + // + // = DESCRIPTION + // This class implements a simple test driver for the + // <Thread_Timer_Queue>. The <display_menu> hook method is + // called from the base class to print a menu specific to the + // thread implementation of the timer queue. +public: + + // = Trait for commands issued from this driver + + typedef ACE_Command_Callback<User_Input_Task, User_Input_Task::ACTION> MYCOMMAND; + + // = Initialization and termination methods. + + Thread_Bounded_Packet_Relay_Driver (Bounded_Packet_Relay *relay); + // Constructor. + + virtual ~Thread_Bounded_Packet_Relay_Driver (void); + // Destructor. + + virtual int display_menu (void); + // Displays the user menu. + + virtual int init (void); + // Initializes the driver. + + virtual int run (void); + // Run the driver. + +private: + User_Input_Task input_task_; + // User input task, subclassed from ACE_Task. +}; + +#endif /* _THREAD_BOUNDED_PACKET_RELAY_H_ */ diff --git a/ACE/examples/Bounded_Packet_Relay/bpr_thread.cpp b/ACE/examples/Bounded_Packet_Relay/bpr_thread.cpp new file mode 100644 index 00000000000..0a564ecd2a5 --- /dev/null +++ b/ACE/examples/Bounded_Packet_Relay/bpr_thread.cpp @@ -0,0 +1,122 @@ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// examples +// +// = FILENAME +// bpr_thread.cpp +// +// = DESCRIPTION +// Exercises drivers for a bounded packet relay, based on threaded timer queues. +// +// = AUTHORS +// Chris Gill <cdgill@cs.wustl.edu> and +// Douglas C. Schmidt <schmidt@cs.wustl.edu> +// +// Based on the Timer Queue Test example written by +// +// Carlos O'Ryan <coryan@cs.wustl.edu> and +// Douglas C. Schmidt <schmidt@cs.wustl.edu> and +// Sergio Flores-Gaitan <sergio@cs.wustl.edu> +// +// ============================================================================ + +// The following #pragma is needed to disable a warning that occurs +// in MSVC 6 due to the overly long debugging symbols generated for +// the std::auto_ptr<Timer_Queue_Test_Driver<...> > template +// instance used by some of the methods in this file. +#ifdef _MSC_VER +# pragma warning(disable: 4786) /* identifier was truncated to '255' + characters in the browser + information */ +#endif /* _MSC_VER */ + +#include "ace/Auto_Ptr.h" +#include "Thread_Bounded_Packet_Relay.h" + +ACE_RCSID (Bounded_Packet_Relay, + bpr_thread, + "$Id$") + +typedef Bounded_Packet_Relay_Driver<Thread_Timer_Queue> + THREAD_BOUNDED_PACKET_RELAY_DRIVER; + +typedef ACE_Command_Callback<Bounded_Packet_Relay,Bounded_Packet_Relay::ACTION> + INPUT_CALLBACK; + +// A snippet from Andrew Marvell (Oliver Cromwell's poet laureate) +static const char input_text [] = +"But ever at my back I hear\n" +" Time's winged chariot hurrying near."; + +int +ACE_TMAIN (int, ACE_TCHAR *[]) +{ + // Construct a new thread manager for the input device task. Auto + // ptr ensures memory is freed when we exit this scope. + ACE_Thread_Manager *input_task_mgr; + ACE_NEW_RETURN (input_task_mgr, + ACE_Thread_Manager, + -1); + auto_ptr <ACE_Thread_Manager> mgr (input_task_mgr); + + // Construct a new input device wrapper. Auto ptr ensures memory is + // freed when we exit this scope. + Text_Input_Device_Wrapper *input_device; + ACE_NEW_RETURN (input_device, + Text_Input_Device_Wrapper (input_task_mgr, + sizeof (input_text), + input_text), + -1); + auto_ptr <Text_Input_Device_Wrapper> input (input_device); + + // Construct a new output device wrapper. Auto ptr ensures memory + // is freed when we exit this scope. + Text_Output_Device_Wrapper *output_device; + ACE_NEW_RETURN (output_device, + Text_Output_Device_Wrapper, + -1); + auto_ptr <Text_Output_Device_Wrapper> output (output_device); + + // Construct a new bounded packet relay. Auto ptr ensures memory is + // freed when we exit this scope. + Bounded_Packet_Relay *packet_relay; + ACE_NEW_RETURN (packet_relay, + Bounded_Packet_Relay (input_task_mgr, + input_device, + output_device), + -1); + auto_ptr <Bounded_Packet_Relay> relay (packet_relay); + + // Construct a receive input callback command for the relay, and register + // it with the input device. Auto ptr ensures memory is freed when we exit + // this scope. + INPUT_CALLBACK *input_callback; + ACE_NEW_RETURN (input_callback, + INPUT_CALLBACK (*packet_relay, + &Bounded_Packet_Relay::receive_input), + -1); + auto_ptr <INPUT_CALLBACK> callback (input_callback); + if (input_device->set_send_input_msg_cmd (input_callback) < 0) + { + ACE_ERROR_RETURN ((LM_ERROR, + "failed to register input callback"), + -1); + } + + // Construct a new bounded packet relay driver. Auto ptr ensures + // memory is freed when we exit this scope. + THREAD_BOUNDED_PACKET_RELAY_DRIVER *tbprd; + + ACE_NEW_RETURN (tbprd, + Thread_Bounded_Packet_Relay_Driver (packet_relay), + -1); + + auto_ptr <THREAD_BOUNDED_PACKET_RELAY_DRIVER> driver (tbprd); + + return driver->run (); + // All dynamically allocated memory is released when main() returns. +} + |