diff options
author | cdgill <cdgill@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1998-09-19 17:43:28 +0000 |
---|---|---|
committer | cdgill <cdgill@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1998-09-19 17:43:28 +0000 |
commit | cdde8a7f7c6bbce8b96a14ece5658c67479b28d7 (patch) | |
tree | c1d174189337967c938ed5501e8e5c8df894a990 /examples/Bounded_Packet_Relay | |
parent | 2dac98850bf0d2b56b3f1a6ab75d276835a589cc (diff) | |
download | ATCD-cdde8a7f7c6bbce8b96a14ece5658c67479b28d7.tar.gz |
first (very rough) checkin
Diffstat (limited to 'examples/Bounded_Packet_Relay')
-rw-r--r-- | examples/Bounded_Packet_Relay/BPR_Drivers.cpp | 606 | ||||
-rw-r--r-- | examples/Bounded_Packet_Relay/BPR_Drivers.h | 315 | ||||
-rw-r--r-- | examples/Bounded_Packet_Relay/Makefile | 294 | ||||
-rw-r--r-- | examples/Bounded_Packet_Relay/README | 201 | ||||
-rw-r--r-- | examples/Bounded_Packet_Relay/Thread_Bounded_Packet_Relay.cpp | 672 | ||||
-rw-r--r-- | examples/Bounded_Packet_Relay/Thread_Bounded_Packet_Relay.h | 359 | ||||
-rw-r--r-- | examples/Bounded_Packet_Relay/bpr_thread.cpp | 51 |
7 files changed, 2498 insertions, 0 deletions
diff --git a/examples/Bounded_Packet_Relay/BPR_Drivers.cpp b/examples/Bounded_Packet_Relay/BPR_Drivers.cpp new file mode 100644 index 00000000000..542fe67aa5a --- /dev/null +++ b/examples/Bounded_Packet_Relay/BPR_Drivers.cpp @@ -0,0 +1,606 @@ +// $Id$ + +// ============================================================================ +// = LIBRARY +// examples +// +// = FILENAME +// BPR_Driver.cpp +// +// = DESCRIPTION +// This code builds an abstraction to factor out common code for +// the different implementations of the Timer_Queue. +// +// = AUTHOR +// Chris Gill <cdgill@cs.wustl.edu> +// +// Based on examples/Timer_Queue/Driver.cpp, written by +// Douglas Schmidt <schmidt@cs.wustl.edu> && +// Sergio Flores-Gaitan <sergio@cs.wustl.edu> +// +// ============================================================================ + +#if !defined (_BPR_DRIVER_CPP_) +#define _BPR_DRIVER_CPP_ + +#include "ace/Auto_Ptr.h" +#include "BPR_Driver.h" + +ACE_RCSID(Bounded_Packet_Relay, BPR_Driver, "$Id$") + +Input_Device_Wrapper_Base::Input_Device_Wrapper_Base (ACE_Thread_Manager * input_task_mgr) + : ACE_Task_Base (input_task_mgr) + , send_input_msg_cmd_ (0) + , input_rate_ (ACE_ONE_SECOND_IN_USECS) + , reactor_ + , is_active_ (0) +{ +} + // ctor + +Input_Device_Wrapper_Base::Input_Device_Wrapper_Base () +{ + delete send_input_msg_cmd_; +} + // dtor + +int +Input_Device_Wrapper_Base::set_send_input_msg_cmd (Command_Base *send_input_msg_cmd) +{ + // delete the old command (if any) first + delete send_input_msg_cmd_; + + send_input_msg_cmd_ = send_input_msg_cmd; + return 0; +} + // sets send input message command in the input device driver object + +int +Input_Device_Wrapper_Base::set_input_period (u_long input_period) +{ + input_period_ = input_period; + return 0; +} + // sets period between when input messages are produced + +int +Input_Device_Wrapper_Base::set_send_count (long count) +{ + send_count_ = count; +} + // sets count of messages to send + +int +Input_Device_Wrapper_Base::request_stop (void) +{ + is_active_ = 0; +} + // request that the input device stop sending messages + // and terminate its thread. Should return 1 if it will do so, 0 + // if it has already done so, or -1 if there is a problem doing so. + + +int +Input_Device_Wrapper_Base::svc (void) +{ + long count; + ACE_Time_Value timeout; + ACE_Message_Block *message; + + // set a flag ti indicate we're active + is_active_ = 1; + + // start with the total count of messages to send + count = send_count_; + + // while we're still marked active, and there are packets to send + while ((is_active_) && (count != 0)) + { + // make sure there is a send command object + if (! send_input_msg_cmd_) + { + is_active_ = 0; + ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n", + "send message command object not instantiated"), + -1); + } + + // create an input message to send + message = create_input_message (); + if (! message) + { + is_active_ = 0; + ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n", + "Failed to create input message object"), + -1); + } + + // send the input message + if (send_input_msg_cmd_->execute ((void *) message) < 0) + { + is_active_ = 0; + delete message; + ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n", + "Failed executing send message command object"), + -1); + } + + // if all went well, decrement count of messages to send, + // and run the reactor event loop unti we get a timeout + // or something happens in a registered upcall. + if (count > 0) + { + --count; + } + timeout = ACE_Time_Value (0, period_); + reactor_.run_event_loop (timeout); + } + + is_active_ = 0; + + return 0; +} + // This method runs the input device loop in the new thread. + +int +Input_Device_Wrapper_Base::send_input_message (ACE_Message_Block *amb) +{ + if (send_input_msg_cmd_) + { + return send_input_msg_cmd_->execute ((void *) amb); + } + else + { + ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n", + "Input_Device_Wrapper_Base::send_input_message: " + "command object not instantiated"), + -1); + } +} + // sends a newly created message block, carrying data + // read from the underlying input device, by passing a + // pointer to the message block to its command execution + +template <class SYNCH> +Bounded_Packet_Relay<SYNCH>::Bounded_Packet_Relay ( + ACE_Thread_Manager * input_task_mgr, + Input_Device_Wrapper_Base * input_wrapper, + Output_Device_Wrapper_Base * output_wrapper) + : input_task_mgr_ (input_task_mgr) + , input_wrapper_ (input_wrapper) + , input_thread_handle_ (0) + , output_wrapper_ (output_wrapper) + , queue_ + , is_active_ (0) + , transmission_lock_ + , queue_lock_ + , transmission_number_ (0) + , packets_sent_ (0) + , status_ (Bounded_Packet_Relay<SYNCH>::UN_INITIALIZED) + , elapsed_duration_ (0) +{ + if (input_task_mgr_ == 0) + { + input_task_mgr_ = ACE_Thread_Manager::instance (); + } +} + // ctor + +template <class SYNCH> +Bounded_Packet_Relay<SYNCH>::~Bounded_Packet_Relay (void) +{ + delete input_wrapper_; + delete output_wrapper_; +} + // dtor + +template <class SYNCH> int +Bounded_Packet_Relay<SYNCH>::send_input (void) +{ + ACE_Message_Block *item; + + // don't block, return immediately if queue is empty + if (queue_.dequeue_head (item, ACE_OS::gettimeofday ()) < 0) + { + return 1; + } + + // if a message block was dequeued, send it to the output device + if (output_wrapper_->write_output_message ((void *) item) < 0) + { + ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n", + "failed to write to output device object"), + -1); + } + + // if all went OK, increase count of packets sent + ++packets_sent_; + return 0; +} + // Requests output be sent to output device. + +template <class SYNCH> int +Bounded_Packet_Relay<SYNCH>::start_transmission (u_long packet_count, + u_long arrival_period, + u_long logging_level) +{ + // serialize access to start and end transmission calls, + // statistics reporting calls + ACE_GUARD_RETURN (SYNCH, ace_mon, this->transmission_lock_, -1); + + // if a transmission is already in progress, just return + if (status_ == STARTED) + { + return 1; + } + + // Update statistics for a new transmission + ++transmission_number; + packets_sent_ = 0; + status_ = STARTED; + transmission_start_ = ACE_OS::gettimeofday (); + + // initialize the output device + if (output_wrapper_->modify_device_settings ((void *) &logging level) < 0) + { + status_ = ERROR; + transmission_end_ = ACE_OS::gettimeofday (); + ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n", + "failed to initialize output device object"), + -1); + } + + // initialize the input device + if (input_wrapper_->set_input_period (u_long input_period) < 0) + { + status_ = ERROR; + transmission_end_ = ACE_OS::gettimeofday (); + ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n", + "failed to initialize input device object"), + -1); + } + + // activate the input device and save a handle to the new thread + if (input_wrapper_->activate () < 0) + { + status_ = ERROR; + transmission_end_ = ACE_OS::gettimeofday (); + ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n", + "failed to activate input device object"), + -1); + } + + // If all went well, return success + return 0; +} + // Requests a transmission be started. + +template <class SYNCH> int +Bounded_Packet_Relay<SYNCH>::end_transmission (Transmission_Status status) +{ + // serialize access to start and end transmission calls, + // statistics reporting calls + ACE_GUARD_RETURN (SYNCH, ace_mon, this->transmission_lock_, -1); + + // if a transmission is not already in progress, just return + if (status_ != STARTED) + { + return 1; + } + + // ask the the input thread to stop + if (input_wrapper_->request_stop () < 0) + { + status_ = ERROR; + transmission_end_ = ACE_OS::gettimeofday (); + ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n", + "failed asking input device thread to stop"), + -1); + } + + // wait for input thread to stop + if (input_task_mgr_->wait_task (input_wrapper_) < 0) + { + status_ = ERROR; + transmission_end_ = ACE_OS::gettimeofday (); + ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n", + "failed waiting for input device thread to stop"), + -1); + } + + // if all went well, set passed status, stamp end time, return success + status_ = status; + transmission_end_ = ACE_OS::gettimeofday (); + return 0; +} + // Requests a transmission be ended. + +template <class SYNCH> int +Bounded_Packet_Relay<SYNCH>::report_statistics (void) +{ + // serialize access to start and end transmission calls, + // statistics reporting calls + ACE_GUARD_RETURN (SYNCH, ace_mon, this->transmission_lock_, -1); + + // if a transmission is already in progress, just return + if (status_ == STARTED) + { + return 1; + } + + const char *status_msg; + switch (status_) + { + case UN_INITIALIZED: + status_msg = "Uninitialized"; + break; + + case STARTED: + // NOT REACHED: user should never see this ;-) + status_msg = "In progress"; + break; + + case COMPLETED: + status_msg = "Completed with all packets sent"; + break; + + case TIMED_OUT: + status_msg = "Terminated by transmission duration timer"; + break; + + case CANCELLED: + status_msg = "Cancelled by external control"; + break; + + case ERROR: + status_msg = "Error was detected"; + break; + + default: + status_msg = "Unknown"; + break; + } + + // calculate duration of trasmission + ACE_Time_Value duration (transmission_end_); + duration -= transmission_start_; + + // report transmission statistics + if (ACE_OS::fprintf (ACE_STDOUT, + "\n\nStatisics for transmission %lu:\n\n" + "Transmission status: %s\n" + "Start time: %ld (sec) %ld usec\n" + "End time: %ld (sec) %ld usec\n" + "Duration: %ld (sec) %ld usec\n" + "Packets relayed: %lu\n\n", + transmission_number_, status_msg, + transmission_start_.sec (), + transmission_start_.usec (), + transmission_end_.sec (), + transmission_end_.usec (), + duration.sec (), + duration.usec (), + packets_sent_) < 0) + { + ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n", + "Bounded_Packet_Relay<SYNCH>::report_statistics" + "ACE_OS::fprintf failed"), -1); + } + + if (ACE_OS::fflush (ACE_STDOUT) < 0) + { + ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n", + "Bounded_Packet_Relay<SYNCH>::report_statistics" + "ACE_OS::fflush failed"), -1); + } + + return 0; +} + // Requests a report of statistics from the last transmission. + +template <class SYNCH> int +Bounded_Packet_Relay<SYNCH>::receive_input (void * arg) +{ + ACE_Message_Block *message; + + message = ACE_static_cast (ACE_Message_Block *, arg); + + if (queue_.enqueue_tail (message) < 0) + { + ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n", + "Bounded_Packet_Relay<SYNCH>::receive_input failed"), + -1); + } + + return 0; +} + // public entry point to which to push input. + +// Parse the input and execute the corresponding command + +template <class TQ, class RECEIVER, class ACTION> int +Bounded_Packet_Relay_Driver<TQ, RECEIVER, ACTION>::parse_commands (const char *buf) +{ + int option; + + if (::sscanf (buf, "%d", &option) <= 0) + // If there was an error reading the option simply try on the next line. + return 0; + + switch (option) + { + case 1: // set packet count + { + u_long count; + + // We just reread the option, this simplies parsing + // (since sscanf can do it for us.) + if (::sscanf (buf, "%d %lu", &option, &count) < 2) + { + // if there was not enough information on the line, + // ignore option and try the next line. + return 0; + } + + if (packet_count_cmd_->execute ((void *) &count) == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n", "set packet count failed"), -1); + } + + break; + } + + case 2: // set arrival period + { + u_long usec; + + // We just reread the option, this simplies parsing + // (since sscanf can do it for us.) + if (::sscanf (buf, "%d %lu", &option, &usec) < 2) + { + // if there was not enough information on the line, + // ignore option and try the next line. + return 0; + } + + if (arrival_period_cmd_->execute ((void *) &usec) == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n", "set arrival period failed"), -1); + } + + break; + } + + case 3: // set transmit period + { + u_long usec; + + // We just reread the option, this simplies parsing + // (since sscanf can do it for us.) + if (::sscanf (buf, "%d %lu", &option, &usec) < 2) + { + // if there was not enough information on the line, + // ignore option and try the next line. + return 0; + } + + if (transmit_period_cmd_->execute ((void *) &usec) == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n", "set transmit period failed"), -1); + } + + break; + } + + case 4: // set duration limit + { + u_long usec; + + // We just reread the option, this simplies parsing + // (since sscanf can do it for us.) + if (::sscanf (buf, "%d %lu", &option, &usec) < 2) + { + // if there was not enough information on the line, + // ignore option and try the next line. + return 0; + } + + if (duration_limit_cmd_->execute ((void *) &usec) == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n", "set duration limit failed"), -1); + } + + break; + } + + case 5: // set logging level + { + u_long level; + + // We just reread the option, this simplies parsing + // (since sscanf can do it for us.) + if (::sscanf (buf, "%d %lu", &option, &level) < 2) + { + // if there was not enough information on the line, + // ignore option and try the next line. + return 0; + } + + if (logging_level_cmd_->execute ((void *) &level) == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n", "set logging level failed"), -1); + } + + break; + } + + case 6: // run one transmission + return run_transmission_cmd_->execute (0); + /* NOTREACHED */ + + case 7: // report statistics + return report_stats_cmd_->execute (0); + /* NOTREACHED */ + + case 8: // shut down the driver + return shutdown_cmd_->execute (0); + /* NOTREACHED */ + + default: + // Display an error message. + ACE_ERROR_RETURN ((LM_ERROR, "invalid input %s\n", buf), 0); + ACE_NOTREACHED (break); + /* NOTREACHED */ + + } /* ENDSWITCH */ + + return 0; +} + +// Runs the test. + +template <class TQ, class RECEIVER, class ACTION> int +Bounded_Packet_Relay_Driver<TQ, RECEIVER, ACTION>::run (void) +{ + this->init (); + + for (;;) + if (this->get_next_request () == -1) + return -1; + + ACE_NOTREACHED (return 0); +} + +// gets the next request from the user input. + +template <class TQ, class RECEIVER, class ACTION> int +Bounded_Packet_Relay_Driver<TQ, RECEIVER, ACTION>::get_next_request (void) +{ + char buf[BUFSIZ]; + + this->display_menu (); + + ACE_OS::fprintf (ACE_STDERR, "Please enter your choice: "); + ACE_OS::fflush (ACE_STDERR); + + // reads input from the user + if (this->read_input (buf, sizeof buf) <= 0) + return -1; + + // Parse and run the command. + return this->parse_commands (buf); +} + +// Reads input from the user from ACE_STDIN into the buffer specified. + +template <class TQ, class RECEIVER, class ACTION> ssize_t +Bounded_Packet_Relay_Driver<TQ, RECEIVER, ACTION>::read_input (char *buf, size_t bufsiz) +{ + ACE_OS::memset (buf, 0, bufsiz); + + // Wait for user to type commands. This call is automatically + // restarted when SIGINT or SIGALRM signals occur. + return ACE_OS::read (ACE_STDIN, buf, bufsiz); +} + +#endif /* _BPR_DRIVER_CPP_ */ diff --git a/examples/Bounded_Packet_Relay/BPR_Drivers.h b/examples/Bounded_Packet_Relay/BPR_Drivers.h new file mode 100644 index 00000000000..b583dc8edcf --- /dev/null +++ b/examples/Bounded_Packet_Relay/BPR_Drivers.h @@ -0,0 +1,315 @@ +/* -*- C++ -*- */ + +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// examples +// +// = FILENAME +// BPR_Driver.h +// +// = DESCRIPTION +// This code builds an abstraction to factor out common code from +// the different implementations of the Timer_Queue based bounded +// packet relay example. +// +// = AUTHORS +// Chris Gill <cdgill@cs.wustl.edu> +// +// Based on examples/Timer_Queue/Driver.h written by +// Sergio Flores-Gaitan <sergio@cs.wustl.edu> +// +// ============================================================================ + +#if !defined (_BPR_DRIVER_H_) +#define _BPR_DRIVER_H_ + +#include "ace/Task.h" +// #include "ace/Timer_Heap_T.h" +// #include "ace/Timer_Queue_Adapters.h" + +class Command_Base + // = TITLE + // Defines an abstract class that allows us to invoke commands + // without knowing anything about the implementation. + // + // = DESCRIPTION + // This class declares an interface to execute a command independent + // of the effect of the command, or the objects used to implement it. +{ +public: + + virtual int execute (void *arg) = 0; + // Invokes the method <action_> from the object <receiver_>. + +}; + + +class Input_Device_Wrapper_Base : public ACE_Task_Base + // = TITLE + // Defines an abstract base class for an input device wrapper that hides + // the details of the specific device and provides a consistent message + // passing interface without knowing anything about the implementation + // of the input device or the message receiver. + // + // The abstract base class ctor takes a command template object that is + // instantiated with the correct receiver and action types. This command + // object is used to send newly created input messages to the receiver. + // + // The abstract base class is designed to operate in an active "push" + // mode, sending input data to the receiver whenever the data is ready. + // The underlying device may be active, notifying the wrapper when + // data is ready, or may be passive in which case the wrapper must + // rely on a reactive and/or polling mechanism. + // + // = DESCRIPTION + // Derived classes are responsible for filling in concrete definitions + // for the abstract message creation method and the svc method. +{ +public: + + Input_Device_Wrapper_Base (ACE_Thread_Manager * input_task_mgr); + // ctor + + virtual ~Input_Device_Wrapper_Base (); + // dtor + + int set_send_input_msg_cmd (Command_Base *send_input_msg_cmd); + // sets send input message command in the input device driver object + + int set_input_period (u_long input_period); + // sets period (in usecs) between when inputs are created + + int set_send_count (long count); + // sets count of messages to send + + int request_stop (void); + // request that the input device stop sending messages + // and terminate its thread. Should return 1 if it will do so, 0 + // if it has already done so, or -1 if there is a problem doing so. + + virtual int svc (void); + // This method runs the input device loop in the new thread. + +protected: + + virtual ACE_Message_Block *create_input_message () = 0; + // creates a new message block, carrying data + // read from the underlying input device + + virtual int send_input_message (ACE_Message_Block *); + // sends a newly created message block, carrying data + // read from the underlying input device, by passing a + // pointer to the message block to its command execution + + Command_Base *send_input_msg_cmd_; + // send newly created input message. + + u_long input_period_; + // period between when input values are produced (usecs) + + ACE_Reactor reactor_; + // reactor used to multiplex input streams, timeouts + + int is_active_; + // flag to indicate whether or not input object + // is and should remain active + + long send_count_; + // count of messages to send before stopping + // (-1 indicates the device should not stop) +}; + + +class Output_Device_Wrapper_Base + // = TITLE + // Defines an abstract base class for an output device wrapper that hides + // the details of the specific device and provides a consistent write + // method interface without knowing anything about the implementation. + // + // = DESCRIPTION + // The abstract methods write_output_message () and modify_device_settings () + // are defined in derived classes to write the contents of the passed message + // out the underlying output device, and update device settings, respectively. +{ +public: + + virtual int write_output_message (void *) = 0; + // writes contents of the passed message block + // out to the underlying output device + + virtual int modify_device_settings (void *) = 0; + // provides an abstract interface to allow modifying device settings +}; + + +template <class SYNCH> +class Bounded_Packet_Relay +{ +public: + + typedef int (Input_Task::*ACTION) (void *); + // Command entry point type definition + + enum Transmission_Status {UN_INITIALIZED, STARTED, COMPLETED, TIMED_OUT, CANCELLED, ERROR}; + // enumerates possible status values at the end of a transmission + + Bounded_Packet_Relay (ACE_Thread_Manager * input_task_mgr, + Input_Device_Wrapper_Base * input_wrapper, + Output_Device_Wrapper_Base * output_wrapper); + // ctor + + virtual ~Bounded_Packet_Relay (void); + // dtor + + int send_input (void); + // Requests output be sent to output device. + + int start_transmission (u_long packet_count, + u_long arrival_period, + u_long logging_level); + // Requests a transmission be started. + + int end_transmission (Transmission_Status status); + // Requests a transmission be ended. + + int report_statistics (void); + // Requests a report of statistics from the last transmission. + + ///////////////////////////////////// + // Command Accessible Entry Points // + ///////////////////////////////////// + + int receive_input (void *); + // public entry point to which to push input. + +private: + + //////////////////////////// + // Concurrency Management // + //////////////////////////// + + ACE_Thread_Manager * input_task_mgr_; + // Thread manager for the input device task + + Input_Device_Wrapper_Base * input_wrapper_; + // pointer to the input device wrapper + + Output_Device_Wrapper_Base * output_wrapper_; + // pointer to the output device wrapper + + ACE_Message_Queue<SYNCH> queue_; + + SYNCH transmission_lock_; + // lock for safe thread synchronization + // of transmission startup and termination + + ///////////////////////////// + // Transmission Statistics // + ///////////////////////////// + + u_long transmission_number_; + // number of transmissions sent + + u_long packets_sent_; + // count of packets sent in the most recent transmission + + Transmission_Status status_; + // status of the current or most recent transmission + + ACE_Time_Value transmission_start_; + // start time of the most recent transmission + + ACE_Time_Value transmission_end_; + // ending time of the most recent transmission + +}; + +template <class TQ> +class Bounded_Packet_Relay_Driver + // = TITLE + // Defines a class that provides a simmple implementation for + // a test driver for timer queues. + // + // = DESCRIPTION + // This is the place where the common code to test the different + // implementations of the timer queue resides. This class has + // the logic for the parse_commands () method, the run (), + // read_input () and get_next_request () methods. Subclasses can + // override these methods if there is some logic that is specific + // to that implementation. +{ +public: + + virtual int parse_commands (const char *buf); + // Breaks up the input string buffer into pieces and executes + // the appropriate method to handle that operation. + + virtual int run (void); + // This is the main entry point for the driver. The user + // of the class should normally invoke this method. + // Returns 0 when successful, or 0 otherwise. + + virtual int get_next_request (void); + // This internal method gets the next request from the user. + // Returns -1 when user wants to exit. Returns 0 otherwise. + + virtual ssize_t read_input (char *buf, size_t bufsiz); + // Reads input from the user into the buffer <buf> with a maximum + // of <bufsiz> bytes. Returns the amount of bytes actually read + // Otherwise, a -1 is returned and errno is set to indicate the error. + + virtual int display_menu (void); + // Prints the user interface for the driver to STDOUT. + + virtual int init (void)=0; + // Initializes values and operations for the driver. + +protected: + + // = Major Driver Mechanisms + + TQ timer_queue_; + // timer queue for transmission timeouts + + // = Set of <Command>s to be executed. + + Command_Base *packet_count_cmd_; + // set packet count command + + Command_Base *arrival_period_cmd_; + // set arrival period command. + + Command_Base *transmit_period_cmd_; + // set transmit period command. + + Command_Base *duration_limit_cmd_; + // set duration limit command. + + Command_Base *logging_level_cmd_; + // set logging level command. + + Command_Base *run_transmission_cmd_; + // run transmission command. + + Command_Base *cancel_transmission_cmd_; + // cancel transmission command. + + Command_Base *report_stats_cmd_; + // report statistics command. + + Command_Base *shutdown_cmd_; + // shutdown the driver. +}; + +#if defined (ACE_TEMPLATES_REQUIRE_SOURCE) +#include "Driver.cpp" +#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */ + +#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA) +#pragma implementation ("Driver.cpp") +#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */ + +#endif /* _BPR_DRIVER_H_ */ diff --git a/examples/Bounded_Packet_Relay/Makefile b/examples/Bounded_Packet_Relay/Makefile new file mode 100644 index 00000000000..9291ba4b2f9 --- /dev/null +++ b/examples/Bounded_Packet_Relay/Makefile @@ -0,0 +1,294 @@ +#---------------------------------------------------------------------------- +# $Id$ +#---------------------------------------------------------------------------- + +#---------------------------------------------------------------------------- +# Local macros +#---------------------------------------------------------------------------- + +INFO = README + +LIB = libBPRD.a +SHLIB = libBPRD.$(SOEXT) + +BIN = bpr_thread + +PSRC = $(addsuffix .cpp,$(BIN)) + +LSRC = Thread_Bounded_Packet_Relay.cpp \ + BPR_Drivers.cpp + +LDLIBS = -lBPRD + +VLDLIBS = $(LDLIBS:%=%$(VAR)) + +BUILD = $(VLIB) $(VSHLIB) $(SHLIBA) $(VBIN) + +#---------------------------------------------------------------------------- +# Include macros and targets +#---------------------------------------------------------------------------- + +include $(ACE_ROOT)/include/makeinclude/wrapper_macros.GNU +include $(ACE_ROOT)/include/makeinclude/macros.GNU +include $(ACE_ROOT)/include/makeinclude/rules.common.GNU +include $(ACE_ROOT)/include/makeinclude/rules.nonested.GNU +include $(ACE_ROOT)/include/makeinclude/rules.lib.GNU +include $(ACE_ROOT)/include/makeinclude/rules.bin.GNU +include $(ACE_ROOT)/include/makeinclude/rules.local.GNU + +#---------------------------------------------------------------------------- +# Local targets +#---------------------------------------------------------------------------- + +#---------------------------------------------------------------------------- +# Dependencies +#---------------------------------------------------------------------------- + +# DO NOT DELETE THIS LINE -- g++dep uses it. +# DO NOT PUT ANYTHING AFTER THIS LINE, IT WILL GO AWAY. + +.obj/Thread_Bounded_Packet_Relay.o .obj/Thread_Bounded_Packet_Relay.so .shobj/Thread_Bounded_Packet_Relay.o .shobj/Thread_Bounded_Packet_Relay.so: Thread_Bounded_Packet_Relay.cpp \ + $(ACE_ROOT)/ace/Task.h \ + $(ACE_ROOT)/ace/Service_Object.h \ + $(ACE_ROOT)/ace/Shared_Object.h \ + $(ACE_ROOT)/ace/ACE.h \ + $(ACE_ROOT)/ace/OS.h \ + $(ACE_ROOT)/ace/inc_user_config.h \ + $(ACE_ROOT)/ace/config.h \ + $(ACE_ROOT)/ace/streams.h \ + $(ACE_ROOT)/ace/Basic_Types.h \ + $(ACE_ROOT)/ace/Basic_Types.i \ + $(ACE_ROOT)/ace/OS.i \ + $(ACE_ROOT)/ace/Trace.h \ + $(ACE_ROOT)/ace/Log_Msg.h \ + $(ACE_ROOT)/ace/Log_Record.h \ + $(ACE_ROOT)/ace/ACE.i \ + $(ACE_ROOT)/ace/Log_Priority.h \ + $(ACE_ROOT)/ace/Log_Record.i \ + $(ACE_ROOT)/ace/Shared_Object.i \ + $(ACE_ROOT)/ace/Event_Handler.h \ + $(ACE_ROOT)/ace/Event_Handler.i \ + $(ACE_ROOT)/ace/Service_Object.i \ + $(ACE_ROOT)/ace/Thread_Manager.h \ + $(ACE_ROOT)/ace/Thread.h \ + $(ACE_ROOT)/ace/Thread.i \ + $(ACE_ROOT)/ace/Synch.h \ + $(ACE_ROOT)/ace/SV_Semaphore_Complex.h \ + $(ACE_ROOT)/ace/SV_Semaphore_Simple.h \ + $(ACE_ROOT)/ace/SV_Semaphore_Simple.i \ + $(ACE_ROOT)/ace/SV_Semaphore_Complex.i \ + $(ACE_ROOT)/ace/Synch.i \ + $(ACE_ROOT)/ace/Synch_T.h \ + $(ACE_ROOT)/ace/Synch_T.i \ + $(ACE_ROOT)/ace/Atomic_Op.i \ + $(ACE_ROOT)/ace/Containers.h \ + $(ACE_ROOT)/ace/Containers.i \ + $(ACE_ROOT)/ace/Thread_Manager.i \ + $(ACE_ROOT)/ace/Task.i \ + $(ACE_ROOT)/ace/Task_T.h \ + $(ACE_ROOT)/ace/Message_Queue.h \ + $(ACE_ROOT)/ace/Message_Block.h \ + $(ACE_ROOT)/ace/Malloc.h \ + $(ACE_ROOT)/ace/Malloc.i \ + $(ACE_ROOT)/ace/Malloc_T.h \ + $(ACE_ROOT)/ace/Free_List.h \ + $(ACE_ROOT)/ace/Free_List.i \ + $(ACE_ROOT)/ace/Malloc_T.i \ + $(ACE_ROOT)/ace/Memory_Pool.h \ + $(ACE_ROOT)/ace/Signal.h \ + $(ACE_ROOT)/ace/Signal.i \ + $(ACE_ROOT)/ace/Object_Manager.h \ + $(ACE_ROOT)/ace/Object_Manager.i \ + $(ACE_ROOT)/ace/Managed_Object.h \ + $(ACE_ROOT)/ace/Managed_Object.i \ + $(ACE_ROOT)/ace/Mem_Map.h \ + $(ACE_ROOT)/ace/Mem_Map.i \ + $(ACE_ROOT)/ace/Memory_Pool.i \ + $(ACE_ROOT)/ace/Message_Block.i \ + $(ACE_ROOT)/ace/IO_Cntl_Msg.h \ + $(ACE_ROOT)/ace/Strategies.h \ + $(ACE_ROOT)/ace/Strategies_T.h \ + $(ACE_ROOT)/ace/Service_Config.h \ + $(ACE_ROOT)/ace/Service_Config.i \ + $(ACE_ROOT)/ace/Reactor.h \ + $(ACE_ROOT)/ace/Handle_Set.h \ + $(ACE_ROOT)/ace/Handle_Set.i \ + $(ACE_ROOT)/ace/Timer_Queue.h \ + $(ACE_ROOT)/ace/Timer_Queue_T.h \ + $(ACE_ROOT)/ace/Timer_Queue_T.i \ + $(ACE_ROOT)/ace/Reactor.i \ + $(ACE_ROOT)/ace/Reactor_Impl.h \ + $(ACE_ROOT)/ace/Svc_Conf_Tokens.h \ + $(ACE_ROOT)/ace/Synch_Options.h \ + $(ACE_ROOT)/ace/Hash_Map_Manager.h \ + $(ACE_ROOT)/ace/Message_Queue.i \ + $(ACE_ROOT)/ace/Task_T.i \ + $(ACE_ROOT)/ace/Timer_Heap_T.h \ + $(ACE_ROOT)/ace/Timer_Queue_Adapters.h \ + $(ACE_ROOT)/ace/Timer_Queue_Adapters.i \ + Thread_Bounded_Packet_Relay.h BPR_Drivers.h +.obj/BPR_Drivers.o .obj/BPR_Drivers.so .shobj/BPR_Drivers.o .shobj/BPR_Drivers.so: BPR_Drivers.cpp \ + $(ACE_ROOT)/ace/Auto_Ptr.h \ + $(ACE_ROOT)/ace/ACE.h \ + $(ACE_ROOT)/ace/OS.h \ + $(ACE_ROOT)/ace/inc_user_config.h \ + $(ACE_ROOT)/ace/config.h \ + $(ACE_ROOT)/ace/streams.h \ + $(ACE_ROOT)/ace/Basic_Types.h \ + $(ACE_ROOT)/ace/Basic_Types.i \ + $(ACE_ROOT)/ace/OS.i \ + $(ACE_ROOT)/ace/Trace.h \ + $(ACE_ROOT)/ace/Log_Msg.h \ + $(ACE_ROOT)/ace/Log_Record.h \ + $(ACE_ROOT)/ace/ACE.i \ + $(ACE_ROOT)/ace/Log_Priority.h \ + $(ACE_ROOT)/ace/Log_Record.i \ + $(ACE_ROOT)/ace/Auto_Ptr.i BPR_Drivers.h \ + $(ACE_ROOT)/ace/Task.h \ + $(ACE_ROOT)/ace/Service_Object.h \ + $(ACE_ROOT)/ace/Shared_Object.h \ + $(ACE_ROOT)/ace/Shared_Object.i \ + $(ACE_ROOT)/ace/Event_Handler.h \ + $(ACE_ROOT)/ace/Event_Handler.i \ + $(ACE_ROOT)/ace/Service_Object.i \ + $(ACE_ROOT)/ace/Thread_Manager.h \ + $(ACE_ROOT)/ace/Thread.h \ + $(ACE_ROOT)/ace/Thread.i \ + $(ACE_ROOT)/ace/Synch.h \ + $(ACE_ROOT)/ace/SV_Semaphore_Complex.h \ + $(ACE_ROOT)/ace/SV_Semaphore_Simple.h \ + $(ACE_ROOT)/ace/SV_Semaphore_Simple.i \ + $(ACE_ROOT)/ace/SV_Semaphore_Complex.i \ + $(ACE_ROOT)/ace/Synch.i \ + $(ACE_ROOT)/ace/Synch_T.h \ + $(ACE_ROOT)/ace/Synch_T.i \ + $(ACE_ROOT)/ace/Atomic_Op.i \ + $(ACE_ROOT)/ace/Containers.h \ + $(ACE_ROOT)/ace/Containers.i \ + $(ACE_ROOT)/ace/Thread_Manager.i \ + $(ACE_ROOT)/ace/Task.i \ + $(ACE_ROOT)/ace/Task_T.h \ + $(ACE_ROOT)/ace/Message_Queue.h \ + $(ACE_ROOT)/ace/Message_Block.h \ + $(ACE_ROOT)/ace/Malloc.h \ + $(ACE_ROOT)/ace/Malloc.i \ + $(ACE_ROOT)/ace/Malloc_T.h \ + $(ACE_ROOT)/ace/Free_List.h \ + $(ACE_ROOT)/ace/Free_List.i \ + $(ACE_ROOT)/ace/Malloc_T.i \ + $(ACE_ROOT)/ace/Memory_Pool.h \ + $(ACE_ROOT)/ace/Signal.h \ + $(ACE_ROOT)/ace/Signal.i \ + $(ACE_ROOT)/ace/Object_Manager.h \ + $(ACE_ROOT)/ace/Object_Manager.i \ + $(ACE_ROOT)/ace/Managed_Object.h \ + $(ACE_ROOT)/ace/Managed_Object.i \ + $(ACE_ROOT)/ace/Mem_Map.h \ + $(ACE_ROOT)/ace/Mem_Map.i \ + $(ACE_ROOT)/ace/Memory_Pool.i \ + $(ACE_ROOT)/ace/Message_Block.i \ + $(ACE_ROOT)/ace/IO_Cntl_Msg.h \ + $(ACE_ROOT)/ace/Strategies.h \ + $(ACE_ROOT)/ace/Strategies_T.h \ + $(ACE_ROOT)/ace/Service_Config.h \ + $(ACE_ROOT)/ace/Service_Config.i \ + $(ACE_ROOT)/ace/Reactor.h \ + $(ACE_ROOT)/ace/Handle_Set.h \ + $(ACE_ROOT)/ace/Handle_Set.i \ + $(ACE_ROOT)/ace/Timer_Queue.h \ + $(ACE_ROOT)/ace/Timer_Queue_T.h \ + $(ACE_ROOT)/ace/Timer_Queue_T.i \ + $(ACE_ROOT)/ace/Reactor.i \ + $(ACE_ROOT)/ace/Reactor_Impl.h \ + $(ACE_ROOT)/ace/Svc_Conf_Tokens.h \ + $(ACE_ROOT)/ace/Synch_Options.h \ + $(ACE_ROOT)/ace/Hash_Map_Manager.h \ + $(ACE_ROOT)/ace/Message_Queue.i \ + $(ACE_ROOT)/ace/Task_T.i \ + $(ACE_ROOT)/ace/Timer_Heap_T.h \ + $(ACE_ROOT)/ace/Timer_Queue_Adapters.h \ + $(ACE_ROOT)/ace/Timer_Queue_Adapters.i +.obj/bpr_thread.o .obj/bpr_thread.so .shobj/bpr_thread.o .shobj/bpr_thread.so: bpr_thread.cpp \ + $(ACE_ROOT)/ace/Auto_Ptr.h \ + $(ACE_ROOT)/ace/ACE.h \ + $(ACE_ROOT)/ace/OS.h \ + $(ACE_ROOT)/ace/inc_user_config.h \ + $(ACE_ROOT)/ace/config.h \ + $(ACE_ROOT)/ace/streams.h \ + $(ACE_ROOT)/ace/Basic_Types.h \ + $(ACE_ROOT)/ace/Basic_Types.i \ + $(ACE_ROOT)/ace/OS.i \ + $(ACE_ROOT)/ace/Trace.h \ + $(ACE_ROOT)/ace/Log_Msg.h \ + $(ACE_ROOT)/ace/Log_Record.h \ + $(ACE_ROOT)/ace/ACE.i \ + $(ACE_ROOT)/ace/Log_Priority.h \ + $(ACE_ROOT)/ace/Log_Record.i \ + $(ACE_ROOT)/ace/Auto_Ptr.i BPR_Drivers.h \ + $(ACE_ROOT)/ace/Task.h \ + $(ACE_ROOT)/ace/Service_Object.h \ + $(ACE_ROOT)/ace/Shared_Object.h \ + $(ACE_ROOT)/ace/Shared_Object.i \ + $(ACE_ROOT)/ace/Event_Handler.h \ + $(ACE_ROOT)/ace/Event_Handler.i \ + $(ACE_ROOT)/ace/Service_Object.i \ + $(ACE_ROOT)/ace/Thread_Manager.h \ + $(ACE_ROOT)/ace/Thread.h \ + $(ACE_ROOT)/ace/Thread.i \ + $(ACE_ROOT)/ace/Synch.h \ + $(ACE_ROOT)/ace/SV_Semaphore_Complex.h \ + $(ACE_ROOT)/ace/SV_Semaphore_Simple.h \ + $(ACE_ROOT)/ace/SV_Semaphore_Simple.i \ + $(ACE_ROOT)/ace/SV_Semaphore_Complex.i \ + $(ACE_ROOT)/ace/Synch.i \ + $(ACE_ROOT)/ace/Synch_T.h \ + $(ACE_ROOT)/ace/Synch_T.i \ + $(ACE_ROOT)/ace/Atomic_Op.i \ + $(ACE_ROOT)/ace/Containers.h \ + $(ACE_ROOT)/ace/Containers.i \ + $(ACE_ROOT)/ace/Thread_Manager.i \ + $(ACE_ROOT)/ace/Task.i \ + $(ACE_ROOT)/ace/Task_T.h \ + $(ACE_ROOT)/ace/Message_Queue.h \ + $(ACE_ROOT)/ace/Message_Block.h \ + $(ACE_ROOT)/ace/Malloc.h \ + $(ACE_ROOT)/ace/Malloc.i \ + $(ACE_ROOT)/ace/Malloc_T.h \ + $(ACE_ROOT)/ace/Free_List.h \ + $(ACE_ROOT)/ace/Free_List.i \ + $(ACE_ROOT)/ace/Malloc_T.i \ + $(ACE_ROOT)/ace/Memory_Pool.h \ + $(ACE_ROOT)/ace/Signal.h \ + $(ACE_ROOT)/ace/Signal.i \ + $(ACE_ROOT)/ace/Object_Manager.h \ + $(ACE_ROOT)/ace/Object_Manager.i \ + $(ACE_ROOT)/ace/Managed_Object.h \ + $(ACE_ROOT)/ace/Managed_Object.i \ + $(ACE_ROOT)/ace/Mem_Map.h \ + $(ACE_ROOT)/ace/Mem_Map.i \ + $(ACE_ROOT)/ace/Memory_Pool.i \ + $(ACE_ROOT)/ace/Message_Block.i \ + $(ACE_ROOT)/ace/IO_Cntl_Msg.h \ + $(ACE_ROOT)/ace/Strategies.h \ + $(ACE_ROOT)/ace/Strategies_T.h \ + $(ACE_ROOT)/ace/Service_Config.h \ + $(ACE_ROOT)/ace/Service_Config.i \ + $(ACE_ROOT)/ace/Reactor.h \ + $(ACE_ROOT)/ace/Handle_Set.h \ + $(ACE_ROOT)/ace/Handle_Set.i \ + $(ACE_ROOT)/ace/Timer_Queue.h \ + $(ACE_ROOT)/ace/Timer_Queue_T.h \ + $(ACE_ROOT)/ace/Timer_Queue_T.i \ + $(ACE_ROOT)/ace/Reactor.i \ + $(ACE_ROOT)/ace/Reactor_Impl.h \ + $(ACE_ROOT)/ace/Svc_Conf_Tokens.h \ + $(ACE_ROOT)/ace/Synch_Options.h \ + $(ACE_ROOT)/ace/Hash_Map_Manager.h \ + $(ACE_ROOT)/ace/Message_Queue.i \ + $(ACE_ROOT)/ace/Task_T.i \ + $(ACE_ROOT)/ace/Timer_Heap_T.h \ + $(ACE_ROOT)/ace/Timer_Queue_Adapters.h \ + $(ACE_ROOT)/ace/Timer_Queue_Adapters.i \ + Thread_Bounded_Packet_Relay.h + +# IF YOU PUT ANYTHING HERE IT WILL GO AWAY diff --git a/examples/Bounded_Packet_Relay/README b/examples/Bounded_Packet_Relay/README new file mode 100644 index 00000000000..c067ffc215f --- /dev/null +++ b/examples/Bounded_Packet_Relay/README @@ -0,0 +1,201 @@ +1. INTRODUCTION + +This directory contains an example that illustrates how to use both +threaded and reactive concurrency mechanisms in ACE. The example +application schedules and processes heterogenerous user input and +timer-based events in the context of a bounded packet relay mechanism. + +This example is based on the notion of a time-bounded packet relay. +In this example, a transmission begins, packets arrive from an input +device object, and are transferred to an output device object by a +relay object at a specified pace. The transfer continues until all +packets have been relayed or a duration limit expires, at which point +the transmission ends. + +User input is handled concurrently with a running transmission. You +can run a transmission, cancel a transmission, change transmission +parameters, view statistics from the most recent transmission, or exit +the program, using selections from an interactive text-based menu. + +In addition, the example program can be run in batch mode, with the +appropriate commands piped to the program in place of its standard +input stream. Commands sent in batch mode must be separated by new +lines. Menu prompts and error messages are directed to the standard +error stream. The results of the transmission run and statistics +reporting commands are directed to the standard output stream, so the +results can be isolated and directed to a file. + +Transmission parameters are intialized to default values. +Transmission parameter values persist until/unless they are +subsequently modified by an appropriate command. If an invalid value +for a command is given, or a run or report command is issued while a +transmission is in progress, the offending command has no effect, and +an error message is generated. + +2. USER INTERFACE + +Commands that can be given to the program include the following: + +Settings commands: + + 1 <number of packets to relay in one transmission> + + Minimum value is 1 packet, defaults to 1000 packets. + + 2 <input packet arrival period (in usec)> + + Minimum value is 1 usec, defaults to 500 usec (1/2 msec). + + 3 <output packet transmission period (in usec)> + + Minimum value is 1 usec, defaults to 1000 usec (1 msec). + + 4 <limit on duration of transmission (in usec)> + + Minimum value is 1 usec, defaults to 1500000 usec (1 1/2 sec). + A value of 0 is also a valid input, in which case no limit + will be placed on the duration of the transmission (it will + end when all packets have been relayed from the input device + to the output device). + + 5 <logging level> + + 0 - does no logging + 1 - reports time each packet arrived at the output device + as well as the contents of the message. + +Action commands: + + 6 - runs a transmission using the current settings + + 7 - cancels a transmission if there is one running + + 8 - reports statistics from the most recent transmission + + 9 - quits the program + +3. APPLICATION DESIGN + +3.1. KEY COMPONENTS + +The design of this example application consists of four main +components: the driver object, the relay object, the input device +object, and the output device object. + +The driver object is responsible for receiving user input and overall +handling of user input commands. The driver object is active, with +separate threads for receiving user input and managing its +transmission timer queue. This allows the user to issue commands +while a transmission is in progress. The driver object uses an +ACE_Thread_Timer_Queue_Adapter, which is derived from ACE_Task_Base. +The driver object runs inside another active object, called +User_Input_Task, which is also derived from ACE_Task_Base. This +allows both the timer queue and the user input object to be made +active, running in their own threads of control. + +The relay object is passive, managing a message queue and necessary +locks to allow safe access from multiple threads. It provides methods +to receive and enqueue a mesage from the input device, dequeue a +message and send it to the output device, and to start or end a +transmission. It uses ACE_Message_Queue (which contains ACE_Message_Block +objects) and ACE_Thread_Mutex objects to implement this functionality. + +The input object is active, managing timeouts and input events in its +own thread. The input object is also reactive, using an ACE_Reactor to +allow response to multiple input handles as well as to do polling at +specific timeouts. The input pseudo-device wrapper in this example +does not make use of input handles and only does timed polling, but +extending this only requires registering the appropriate input handles +and handlers with the reactor. The input object is derived from +ACE_Task_Base, and is activated by the relay object when a new +transmission starts. The input object packages each data packet in an +ACE_Message_Block before sending it to the relay object. + +The output object is passive. If logging is turned on, it will report +the arrival time, relative to the start of the transmission, of each +output message, and the contents of that message. The output object +will also "consume" each ACE_Message_Block passed to it, calling +delete on the passed pointer. + +3.2. RUN-TIME CHARACTERSITICS + +When the user initiates a transmission, the appropriate settings are +passed by the driver to the relay object's start transmission method. +The relay object tries to acquire a lock for a new transmission. If +it fails to do so, another transmission is in progress, and the +methods returns an error. Otherwise, the relay object's start +transmission method initializes itself and the input and output device +objects, activates the input device object, and stores the handle for +the new input device thread. + +The driver then constructs a timeout handler with a count of the +number of messages to relay and a send timeout value, and pushes a +timer with this handler onto the timer queue. If there is a limit on +the duration of the transmission, the driver constructs a different +handler for end-of- transmission, and pushes a timer for the end of +the transmission with this handler onto the timer queue as well. When +the user issues a cancel transmission command, the driver calls the +relay's end transmission method. + +When a send timeout expires, the handler (running in the timer queue +thread) calls the send method of the relay. If there are any enqueued +messages from the input device object in its queue, the relay object's +send method will dequeue the message, pass it to the output device +object, and return success. If there are no messages in the queue, +the relay object's send method will return failure. If the send was +successful, the handler will decrement its count of remaining +messages. If the count is greater than zero, the handler will +register a new send timer for itself and exit. If the count is zero, +then the handler will call the relay's end transmission method, clear +the timer queue, and release the transmission-in-progress semaphore +before exiting. + +Similarly, if the end-of-transmission timer expires before all packets +have been sent, that handler will call the relay's end transmission +method, clear the timer queue, release the semaphore, and then exit. + +When the relay's end transmission method is called, it marks the relay +itself inactive, and calls the input device's deactivate method, which +sets the input device's activity flag to inactive (see below). The +relay's end transmission method then waits on the stored input thread +handle until that thread exits, before returning. + +While it is still active, the input device thread blocks on a reactor +timeout for the duration it was given by the relay. When the timeout +expires, the input device checks a flag to see if it is still active. +If the flag says the input device is inactive, the thread simply +exits. Tis allows cancellation of the input device when a +transmission has ended. If the flag says it is still active, the +thread builds a new character buffer, and wraps this with a new +ACE_Message_Block. The input device passes this message block to the +execution method of the send input message command object with which +it was constructed. This level of indirection allows the input device +to be used with arbitrary types, so that it could for example be +connected directly to an output device. The input device also +maintains a count of the number of messages it has sent. Once the +input device has sent all its messages, it marks itself inactive, and +its thread simply exits. + +4. ACCESSING THE SOURCE CODE + +The files for this example are located in +$ACE_ROOT/examples/Bounded_Packet_Relay in the latest release of ACE, +which will be located at + +http://www.cs.wustl.edu/~schmidt/ACE_wrappers/ACE.tar.gz + +when the example application is complete. + +Source Files: Thread_Bounded_Packet_Relay.h + Thread_Bounded_Packet_Relay.cpp + BPR_Driver.h + BPR_Driver.cpp + bpr_thread.cpp + +Make file: Makefile + +Doc file: README (this file) + +Executable: bpr_thread + + diff --git a/examples/Bounded_Packet_Relay/Thread_Bounded_Packet_Relay.cpp b/examples/Bounded_Packet_Relay/Thread_Bounded_Packet_Relay.cpp new file mode 100644 index 00000000000..17d9f2d947e --- /dev/null +++ b/examples/Bounded_Packet_Relay/Thread_Bounded_Packet_Relay.cpp @@ -0,0 +1,672 @@ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// examples +// +// = FILENAME +// Thread_Bounded_Packet_Relay.cpp +// +// = DESCRIPTION +// Method definitions for a threaded bounded packet relay +// +// = AUTHORS +// Chris Gill <cdgill@cs.wustl.edu> +// +// Based on the threaded timer queue driver by +// +// Carlos O'Ryan <coryan@cs.wustl.edu> +// Douglas C. Schmidt <schmidt@cs.wustl.edu> +// +// ============================================================================ + +ACE_RCSID(Bounded_Packet_Relay, Thread_Bounded_Packet_Relay, "$Id$") + +#include "ace/Task.h" +#include "ace/Timer_Heap_T.h" +#include "ace/Timer_Queue_Adapters.h" + +#include "Thread_Bounded_Packet_Relay.h" + +// constructor + +template <class RECEIVER, class ACTION> +Command<RECEIVER, ACTION>::Command (RECEIVER &recvr, + ACTION action) + : receiver_ (recvr), + action_ (action) +{ +} + +// invokes an operation. + +template <class RECEIVER, class ACTION> int +Command<RECEIVER, ACTION>::execute (void *arg) +{ + return (receiver_.*action_) (arg); +} + + +Text_Input_Device_Wrapper::Text_Input_Device_Wrapper (ACE_Thread_Manager * input_task_mgr, + size_t read_length, const char* text) + : Input_Device_Wrapper_Base (input_task_mgr) + , read_length_ (read_length) + , text_ (text) + , index_ (0) +{ +} + // ctor + +Text_Input_Device_Wrapper::~Text_Input_Device_Wrapper () +{ +} + // dtor + + +ACE_Message_Block * +Text_Input_Device_Wrapper::create_input_message () +{ + // construct a new message block to send + ACE_Message_Block *message; + ACE_NEW_RETURN (message, + ACE_Message_Block (read_length_), + 0); + + // zero out a "read" buffer to hold data + char read_buf [BUFSIZ]; + ACE_OS::memset (read_buf, 0, BUFSIZ); + + // loop through the text, filling in data to copy into + // the read buffer (leaving room for a terminating zero) + for (size_t i = 0; i < read_length_ - 1; ++i) + { + read_buf [i] = text_ [index_]; + index_ = (index_ + 1) % ACE_OS::strlen (text_); + } + + // Copy buf into the Message_Block and update the wr_ptr (). + if (mb->copy (read_buf, read_length_) < 0) + { + delete message; + ACE_ERROR_RETURN ((LM_ERROR, "read buffer copy failed"), 0); + } + + return message; +} + // creates a new message block, carrying data + // read from the underlying input device + + +Text_Output_Driver_Wrapper::Text_Output_Driver_Wrapper (int logging) + : logging_ (logging) +{ +} + // default ctor + +int +Text_Output_Driver_Wrapper::write_output_message (void *message) +{ + ACE_Message_Block *message; +} + // consume and possibly print out the passed message + +int +Text_Output_Driver_Wrapper::modify_device_settings (void *logging) +{ + if (logging) + { + logging_ = * ACE_static_cast (int *, logging); + } + else + { + ACE_ERROR_RETURN ((LM_ERROR, "null logging level pointer"), -1); + } +} + // modifies device settings based on passed pointer to a u_long + // turns logging on if u_long is non-zero, off if u_long is zero, + // and does nothing if the pointer is null. + + +User_Input_Task::User_Input_Task (Thread_Timer_Queue *queue, + Thread_Bounded_Packet_Relay_Driver &tbprd) + : ACE_Task_Base (ACE_Thread_Manager::instance ()), + queue_ (queue), + usecs_ (ACE_ONE_SECOND_IN_USECS), + driver_ (tbprd) +{ +} + // ctor + +int +User_Input_Task::svc (void) +{ + for (;;) + // call back to the driver's implementation on how to read and + // parse input. + if (this->driver_.get_next_request () == -1) + break; + + // we are done. + this->relay_->end_transmission (Bounded_Packet_Relay_Driver::CANCELLED); + this->queue_->deactivate (); + ACE_DEBUG ((LM_DEBUG, "terminating input thread\n")); + return 0; +} + // This method runs the event loop in the new thread. + + // = Some helper methods. + +int +User_Input_Task::set_packet_count (void *argument) +{ + if (argument) + { + packet_count_ = * ACE_static_cast (int *, argument); + return 0; + } + + ACE_ERROR_RETURN ((LM_ERROR, + "User_Input_Task::set_packet_count: null argument"), + -1); +} + // set the number of packets for the next transmission. + +int +User_Input_Task::set_arrival_period (void *argument) +{ + if (argument) + { + arrival_period_ = * ACE_static_cast (int *, argument); + return 0; + } + + ACE_ERROR_RETURN ((LM_ERROR, + "User_Input_Task::set_arrival_period: null argument"), + -1); +} + // sets the input device packet arrival period (usecs) + // for the next transmission. + +int +User_Input_Task::set_send_period (void *argument) +{ + if (argument) + { + send_period_ = * ACE_static_cast (int *, argument); + return 0; + } + + ACE_ERROR_RETURN ((LM_ERROR, + "User_Input_Task::set_send_period: null argument"), + -1); +} + // sets the period between output device sends (usecs) + // for the next transmission. + +int +User_Input_Task::set_duration_limit (void *argument) +{ + if (argument) + { + duration_limit_ = * ACE_static_cast (int *, argument); + return 0; + } + + ACE_ERROR_RETURN ((LM_ERROR, + "User_Input_Task::set_duration_limit: null argument"), + -1); +} + // sets a limit on the transmission duration (usecs) + +int +User_Input_Task::set_logging_level (void *argument) +{ + if (argument) + { + logging_level_ = * ACE_static_cast (int *, argument); + return 0; + } + + ACE_ERROR_RETURN ((LM_ERROR, + "User_Input_Task::set_logging_level: null argument"), + -1); +} + // sets logging level (0 or 1) for output device + // for the next transmission + +int +User_Input_Task::run_transmission (void *argument) +{ + // Macro to avoid "warning: unused parameter" type warning. + ACE_UNUSED_ARG (argument); + + if (relay_) + { + switch (relay_->start_transmission (packet_count_, + arrival_period_, + logging_level_)) + { + case 1: + ACE_OS::fprintf (ACE_STDERR, + "\nRun transmission: " + "transmission already in progress\n"); + return 0; + /* not reached */ + + case 0: + { + ACE_Time_Value now = ACE_OS::gettimeofday (); + ACE_Time_Value send_every (0, send_period_); + ACE_Time_Value send_at (send_every + now); + + Send_Handler *send_handler; + + ACE_NEW_RETURN (send_handler, + Send_Handler (packet_count_, + send_every, + *relay_, + *queue_), + -1); + + if (queue_->schedule (send_handler, 0, send_at) < 0) + { + ACE_ERROR_RETURN ((LM_ERROR, + "User_Input_Task::run_transmission: " + "failed to schedule send handler"), + -1); + } + + if (duration_limit_) + { + ACE_Time_Value terminate_at (0, duration_limit_); + terminate_at += now; + + Termination_Handler *termination_handler; + + ACE_NEW_RETURN (termination_handler, + Termination_Handler (*relay_, + *queue_), + -1); + + if (queue_->schedule (termination_handler, 0, terminate_at) < 0) + { + ACE_ERROR_RETURN ((LM_ERROR, + "User_Input_Task::run_transmission: " + "failed to schedule termination handler"), + -1); + } + } + + return 0; + } + /* not reached */ + + default: + return -1; + /* not reached */ + } + } + + ACE_ERROR_RETURN ((LM_ERROR, + "User_Input_Task::run_transmission: " + "relay not instantiated"), + -1); +} + // runs the next transmission (if one is not in progress) + +int +User_Input_Task::end_transmission (void *argument) +{ + // Macro to avoid "warning: unused parameter" type warning. + ACE_UNUSED_ARG (argument); + + if (relay_) + { + Bounded_Packet_Relay::Transmission_Status *status; + + status = + ACE_static_cast (Bounded_Packet_Relay::Transmission_Status *, + argument); + + if (status) + { + switch (relay_->end_transmission (*status) + { + case 1: + ACE_OS::fprintf (ACE_STDERR, + "\nEnd transmission: " + "no transmission in progress\n"); + return 0; + /* not reached */ + + case 0: + // cancel any remaining timers + ACE_Timer_Node_T <ACE_Event_Handler *> *node; + while ((node = queue_->timer_queue ().get_first ())) + { + queue->cancel (node->get_timer_id (), 0); + } + return 0; + /* not reached */ + + default: + return -1; + /* not reached */ + } + } + + ACE_ERROR_RETURN ((LM_ERROR, + "User_Input_Task::end_transmission: " + "null argument"), + -1); + } + + ACE_ERROR_RETURN ((LM_ERROR, + "User_Input_Task::end_transmission: " + "relay not instantiated"), + -1); +} + // ends the current transmission (if one is in progress) + +int +User_Input_Task::report_stats (void *argument) +{ + // Macro to avoid "warning: unused parameter" type warning. + ACE_UNUSED_ARG (argument); + + if (relay_) + { + switch (relay_->report_statistics ()) + { + case 1: + ACE_OS::fprintf (ACE_STDERR, + "\nRun transmission: " + "\transmission already in progress\n"); + /* fall through to next case */ + + case 0: + return 0; + /* not reached */ + + default: + return -1; + /* not reached */ + } + } + + ACE_ERROR_RETURN ((LM_ERROR, + "User_Input_Task::report_stats: " + "relay not instantiated"), + -1); +} + // reports statistics for the previous transmission + // (if one is not in progress) + + int +User_Input_Task::shutdown (void *argument) +{ + // Macro to avoid "warning: unused parameter" type warning. + ACE_UNUSED_ARG (argument); + +#if !defined (ACE_LACKS_PTHREAD_CANCEL) + // Cancel the thread timer queue task "preemptively." + ACE_Thread::cancel (this->queue_->thr_id ()); +#else + // Cancel the thread timer queue task "voluntarily." + this->queue_->deactivate (); +#endif /* ACE_LACKS_PTHREAD_CANCEL */ + + // -1 indicates we are shutting down the application. + return -1; +} + // Shutdown task. + + +Send_Handler::Send_Handler (u_long send_count, + const ACE_Time_Value &duration, + Bounded_Packet_Relay<ACE_Thread_Mutex> &relay, + Thread_Timer_Queue &queue) + : send_count_ (send_count) + , duration_ (duration) + , relay_ (relay) + , queue_ (queue) +{ +} + // ctor + +Send_Handler::~Send_Handler (void) +{ +} + // dtor + +int +Send_Handler::handle_timeout (const ACE_Time_Value ¤t_time, + const void *arg) +{ + switch (relay_->send_input ()) + { + case 0: + // decrement count of packets to relay + --send_count_; + /* fall through to next case */ + case 1: + if (send_count_ > 0) + { + // re-register the handler for a new timeout + if (queue_->schedule (this, 0, + duration_ + ACE_OS::gettimeofday ()) < 0) + { + ACE_ERROR_RETURN ((LM_ERROR, + "Send_Handler::handle_timeout: " + "failed to reschedule send handler"), + -1); + } + return 0; + } + else + { + // all packets are sent, time to cancel any other + // timers, end the transmission, and go away + ACE_Timer_Node_T <ACE_Event_Handler *> *node; + while ((node = queue_->timer_queue ().get_first ())) + { + queue->cancel (node->get_timer_id (), 0); + } + relay_->end_transmission (Bounded_Packet_Relay::COMPLETED); + delete this; + return 0; + } + /* not reached */ + default: + return -1; + } +} + // Call back hook. + +int +Send_Handler::cancelled (void) +{ + delete this; + return 0; +} + // Cancellation hook + + +Termination_Handler::Termination_Handler (Bounded_Packet_Relay<ACE_Thread_Mutex> &relay, + Thread_Timer_Queue &queue) + : relay_ (relay) + , queue_ (queue) +{ +} + // ctor + +Termination_Handler::~Termination_Handler (void) +{ +} + // dtor + +int +Termination_Handler::handle_timeout (const ACE_Time_Value ¤t_time, + const void *arg) +{ + // transmission timed out, cancel any other + // timers, end the transmission, and go away + ACE_Timer_Node_T <ACE_Event_Handler *> *node; + while ((node = queue_->timer_queue ().get_first ())) + { + queue->cancel (node->get_timer_id (), 0); + } + relay_->end_transmission (Bounded_Packet_Relay::TIMED_OUT); + delete this; + return 0; +} + // Call back hook. + +int +Termination_Handler::cancelled (void) +{ + delete this; + return 0; +} + // Cancellation hook + +Thread_Bounded_Packet_Relay_Driver::Thread_Bounded_Packet_Relay_Driver (void); + : input_task_ (&timer_queue_, *this) +{ +} +// ctor + +Thread_Bounded_Packet_Relay_Driver::~Thread_Bounded_Packet_Relay_Driver (void); +{ +} +// dtor + +int +Thread_Bounded_Packet_Relay_Driver::display_menu (void); +{ + static char menu[] = + "\n\n Options:\n" + " -------\n" + " 1 <number of packets to relay in one transmission>\n" + " min = 1 packet, default = 1000.\n" + " 2 <input packet arrival period (in usec)>\n" + " min = 1, default = 500.\n" + " 3 <output packet transmission period (in usec)>\n" + " min = 1, default = 1000.\n" + " 4 <limit on duration of transmission (in usec)>\n" + " min = 1, default = 1500000, no limit = 0.\n" + " 5 <logging level>\n" + " no logging = 0 (default), logging = 1.\n" + " 6 - run a transmission using the current settings\n" + " 7 - cancel transmission (if there is one running)\n" + " 8 - report statistics from the most recent transmission\n" + " 9 - quit the program\n"; + + ACE_OS::fprintf(ACE_STDERR, "%s", menu); + + return 0; +} +// display the user menu + +int +Thread_Bounded_Packet_Relay_Driver::init (void); +{ + typedef Command<Input_Task, Input_Task::ACTION> CMD; + + // initialize the <Command> objects with their corresponding + // methods from <Input_Task> + ACE_NEW_RETURN (packet_count_cmd_, + CMD (input_task_, &User_Input_Task::set_packet_count), + -1); + + ACE_NEW_RETURN (arrival_period_cmd_, + CMD (input_task_, &User_Input_Task::set_arrival_period), + -1); + + ACE_NEW_RETURN (transmit_period_cmd_, + CMD (input_task_, &User_Input_Task::set_send_period), + -1); + + ACE_NEW_RETURN (duration_limit_cmd_, + CMD (input_task_, &User_Input_Task::set_duration_limit), + -1); + + ACE_NEW_RETURN (logging_level_cmd_, + CMD (input_task_, &User_Input_Task::set_logging_level), + -1); + + ACE_NEW_RETURN (run_transmission_cmd_, + CMD (input_task_, &User_Input_Task::run_transmission), + -1); + + ACE_NEW_RETURN (cancel_transmission_cmd_, + CMD (input_task_, &User_Input_Task::end_transmission), + -1); + + ACE_NEW_RETURN (report_stats_cmd_, + CMD (input_task_, &User_Input_Task::report_stats), + -1); + + ACE_NEW_RETURN (shutdown_cmd_, + CMD (input_task_, &User_Input_Task::shutdown), + -1); + + if (this->input_task_.activate () == -1) + ACE_ERROR_RETURN ((LM_ERROR, "cannot activate input task"), -1); + + if (this->timer_queue_.activate () == -1) + ACE_ERROR_RETURN ((LM_ERROR, "cannot activate timer queue"), -1); + + if (ACE_Thread_Manager::instance ()->wait () == -1) + ACE_ERROR_RETURN ((LM_ERROR, "wait on Thread_Manager failed"),-1); + + return 0; +} +// initialize the driver + +int +Thread_Bounded_Packet_Relay_Driver::run (void); +{ + this->init (); + return 0; +} +// run the driver + + +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) +template class ACE_Thread_Timer_Queue_Adapter<Timer_Heap>; +template class Bounded_Packet_Relay_Driver<Thread_Timer_Queue, + Input_Task, + Input_Task::ACTION>; +template class Command<Input_Task, Input_Task::ACTION>; +#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) +#pragma instantiate ACE_Thread_Timer_Queue_Adapter<Timer_Heap> +#pragma instantiate Bounded_Packet_Relay_Driver<Thread_Timer_Queue, \ + Input_Task, \ + Input_Task::ACTION> +#pragma instantiate Command<Input_Task, Input_Task::ACTION> +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ + +#if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0) +// These templates will specialized in libACE.* if the platforms does +// not define ACE_MT_SAFE. + +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) +template class ACE_Thread_Condition<ACE_Thread_Mutex>; +template class ACE_Condition<ACE_Thread_Mutex>; +template class ACE_Event_Handler_Handle_Timeout_Upcall<ACE_Null_Mutex>; +template class ACE_Timer_Queue_T<ACE_Event_Handler *, ACE_Event_Handler_Handle_Timeout_Upcall<ACE_Null_Mutex>, ACE_Null_Mutex>; +template class ACE_Timer_Heap_T<ACE_Event_Handler *, ACE_Event_Handler_Handle_Timeout_Upcall<ACE_Null_Mutex>, ACE_Null_Mutex>; +template class ACE_Timer_Heap_Iterator_T<ACE_Event_Handler *, ACE_Event_Handler_Handle_Timeout_Upcall<ACE_Null_Mutex>, ACE_Null_Mutex>; +template class ACE_Timer_Queue_Iterator_T<ACE_Event_Handler *, ACE_Event_Handler_Handle_Timeout_Upcall<ACE_Null_Mutex>, ACE_Null_Mutex>; +#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) +#pragma instantiate ACE_Thread_Condition<ACE_Thread_Mutex> +#pragma instantiate ACE_Condition<ACE_Thread_Mutex> +#pragma instantiate ACE_Event_Handler_Handle_Timeout_Upcall<ACE_Null_Mutex> +#pragma instantiate ACE_Timer_Queue_T<ACE_Event_Handler *, ACE_Event_Handler_Handle_Timeout_Upcall<ACE_Null_Mutex>, ACE_Null_Mutex> +#pragma instantiate ACE_Timer_Heap_T<ACE_Event_Handler *, ACE_Event_Handler_Handle_Timeout_Upcall<ACE_Null_Mutex>, ACE_Null_Mutex> +#pragma instantiate ACE_Timer_Heap_Iterator_T<ACE_Event_Handler *, ACE_Event_Handler_Handle_Timeout_Upcall<ACE_Null_Mutex>, ACE_Null_Mutex> +#pragma instantiate ACE_Timer_Queue_Iterator_T<ACE_Event_Handler *, ACE_Event_Handler_Handle_Timeout_Upcall<ACE_Null_Mutex>, ACE_Null_Mutex> +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ +#endif /* ACE_MT_SAFE */ diff --git a/examples/Bounded_Packet_Relay/Thread_Bounded_Packet_Relay.h b/examples/Bounded_Packet_Relay/Thread_Bounded_Packet_Relay.h new file mode 100644 index 00000000000..32bff6f8990 --- /dev/null +++ b/examples/Bounded_Packet_Relay/Thread_Bounded_Packet_Relay.h @@ -0,0 +1,359 @@ +/* -*- C++ -*- */ + +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// examples +// +// = FILENAME +// Thread_Bounded_Packet_Relay.h +// +// = DESCRIPTION +// This code exercises the <ACE_Thread_Timer_Queue_Adapter> using +// an <ACE_Timer_Heap_T>. +// +// = AUTHORS +// Carlos O'Ryan <coryan@cs.wustl.edu> and +// Sergio Flores-Gaitan <sergio@cs.wustl.edu> +// +// ============================================================================ + +#if !defined (_THREAD_BOUNDED_PACKET_RELAY_H_) +#define _THREAD_BOUNDED_PACKET_RELAY_H_ + +#include "ace/Task.h" +#include "ace/Timer_Heap_T.h" +#include "ace/Timer_Queue_Adapters.h" +#include "BPR_Drivers.h" + +// These typedefs ensure that we use the minimal amount of locking +// necessary. +typedef ACE_Event_Handler_Handle_Timeout_Upcall<ACE_Null_Mutex> + Upcall; +typedef ACE_Timer_Heap_T<ACE_Event_Handler *, + Upcall, + ACE_Null_Mutex> + Timer_Heap; +typedef ACE_Timer_Heap_Iterator_T<ACE_Event_Handler *, + Upcall, + ACE_Null_Mutex> + Timer_Heap_Iterator; +typedef ACE_Thread_Timer_Queue_Adapter<Timer_Heap> + Thread_Timer_Queue; + +// Forward declaration. +class Thread_Bounded_Packet_Relay_Driver; + +template <class RECEIVER, class ACTION> +class Command : public Command_Base + // = TITLE + // Defines an abstract class that allows us to invoke commands + // without knowing anything about the implementation. This class + // is used in the <Bounded_Packet_Relay_Driver> to invoke operations + // of the driver. + // + // = DESCRIPTION + // This class declares an interface to execute operations, + // binding a RECEIVER object with an ACTION. The RECEIVER knows + // how to implement the operation. A class can invoke operations + // without knowing anything about it, or how it was implemented. +{ +public: + + Command (RECEIVER &recvr, ACTION action); + // Sets the <receiver_> of the Command to recvr, and the + // <action_> of the Command to <action>. + + virtual int execute (void *arg); + // Invokes the method <action_> from the object <receiver_>. + +private: + RECEIVER &receiver_; + // object where the method resides. + + ACTION action_; + // method that is going to be invoked. +}; + + +class Text_Input_Device_Wrapper : public Input_Device_Wrapper_Base + // = TITLE + // Defines a wrapper for a simple active looping text input + // pseudo-device. + // + // = DESCRIPTION + // The wrapper is an active object, running in its own thread, and uses a + // reactor to generate timeouts. When a timeout occurs, the wrapper + // calls its concrete message creation method. The wrapper then calls + // its base class message send method to forward the message to the + // receiver. + // + // A more sophisticated version of this class would use the reactive + // capabilities as well as the timeout generating capabilities of the + // reactor, multiplexing several input streams. Comments to this + // effect appear in the definition of the event loop method. +{ + public: + + Text_Input_Device_Wrapper (ACE_Thread_Manager * input_task_mgr, size_t read_length, const char* text); + // ctor + + ~Text_Input_Device_Wrapper (); + // dtor + +protected: + + virtual ACE_Message_Block *create_input_message (); + // creates a new message block, carrying data + // read from the underlying input device + +private: + + size_t read_length_; + // length of the buffer into which to "read" + + const char *text_; + // text to "read" into the buffer + + size_t index_; + // index into the string +}; + + +class Text_Output_Driver_Wrapper : public Output_Device_Wrapper_Base + // = TITLE + // Implements a simple wrapper for a output pseudo-device. + // + // = DESCRIPTION + // Data from the passed output message is printed to the standard + // output stream, if logging is turned on. +{ +public: + + Text_Output_Driver_Wrapper (int logging = 0); + // default ctor + + // = Command Accessible Entry Points + + virtual int write_output_message (void *message); + // consume and possibly print out the passed message + + virtual int modify_device_settings (void *logging); + // modifies device settings based on passed pointer to a u_long + // turns logging on if u_long is non-zero, off if u_long is zero, + // and does nothing if the pointer is null. + +private: + + int logging_; + // 0 if logging is turned off, non-zero otherwise +}; + + +class User_Input_Task : public ACE_Task_Base +{ + // = TITLE + // Read user actions on the Timer_Queue from stdin. + // + // = DESCRIPTION + // This class reads user input from stdin; those commands permit + // the control of a Timer_Queue, which is dispatched by another + // thread. +public: + + typedef int (User_Input_Task::*ACTION) (void *); + // trait for command accessible entry points + + User_Input_Task (Thread_Timer_Queue *queue, + Thread_Bounded_Packet_Relay_Driver &timer_queue_driver); + // ctor + + virtual int svc (void); + // This method runs the event loop in the new thread. + + // = Some helper methods. + + int set_packet_count (void *); + // set the number of packets for the next transmission. + + int set_arrival_period (void *); + // sets the input device packet arrival period (usecs) + // for the next transmission. + + int set_send_period (void *); + // sets the period between output device sends (usecs) + // for the next transmission. + + int set_duration_limit (void *); + // sets a limit on the transmission duration (usecs) + + int set_logging_level (void *); + // sets logging level (0 or 1) for output device + // for the next transmission + + int run_transmission (void *); + // runs the next transmission (if one is not in progress) + + int end_transmission (void *); + // ends the current transmission (if one is in progress) + + int report_stats (void *); + // reports statistics for the previous transmission + // (if one is not in progress) + + int shutdown (void *); + // Shutdown task. + +private: + + const int usecs_; + // How many microseconds are in a second. + + Bounded_Packet_Relay<ACE_Thread_Mutex> *relay_; + // the bounded packet relay + + Thread_Timer_Queue *queue_; + // The timer queue implementation. + + Thread_Bounded_Packet_Relay_Driver &driver_; + // The thread timer queue test driver. + + u_long packet_count_; + // count of packets to send in a transmission + + u_long arrival_period_; + // rate at which input packets are to arrive + + u_long send_period_; + // rate at which packets are to be relayed (usec) + + u_long duration_limit_; + // limit on the duration of the transmission (usec) + + u_long logging_level_; + // logging level + +}; + + + +class Send_Handler : public ACE_Event_Handler +{ + // = TITLE + // Event handler for message send timeout events. + // + // = DESCRIPTION + // The <handle_timeout> hook method calls the relay's send + // method and decrements its count of messages to send. + // If there are still messages to send, it re-registers itself + // with the timer queue. Otherwise it calls the relay's end + // transmission method, clears the timer queue, and then deletes "this". +public: + Send_Handler (u_long send_count, + const ACE_Time_Value &duration, + Bounded_Packet_Relay<ACE_Thread_Mutex> &relay, + Thread_Timer_Queue &queue); + // ctor + + ~Send_Handler (void); + // dtor + + virtual int handle_timeout (const ACE_Time_Value ¤t_time, + const void *arg); + // Call back hook. + + virtual int cancelled (void); + // Cancellation hook + +private: + + u_long send_count_; + // Count of the number of messages to send from the + // relay object to the output device object. + + ACE_Time_Value duration_; + // Store the expected duration until expiration, it is used to + // re-register the handler if there are still sends to perform. + + Bounded_Packet_Relay<ACE_Thread_Mutex> &relay_; + // Store a reference to the relay object on which to invoke + // the appropritate calls when the timer expires + + Thread_Timer_Queue &queue_; + // Store a reference to the timer queue, in which we'll re-register + // ourselves if there are still sends to perform + +}; + +class Termination_Handler : public ACE_Event_Handler +{ + // = TITLE + // Event handler for end transmission timeout events. + // + // = DESCRIPTION + // The <handle_timeout> hook method calls the relay's end + // transmission method, and then deletes "this". +public: + Termination_Handler (Bounded_Packet_Relay<ACE_Thread_Mutex> &relay, + Thread_Timer_Queue &queue); + // ctor + + ~Termination_Handler (void); + // dtor + + virtual int handle_timeout (const ACE_Time_Value ¤t_time, + const void *arg); + // Call back hook. + + virtual int cancelled (void); + // Cancellation hook + +private: + + Bounded_Packet_Relay<ACE_Thread_Mutex> &relay_; + // Store a reference to the relay object on which to invoke + // the end transmission call when the timer expires + + Thread_Timer_Queue &queue_; + // Store a reference to the timer queue, which we'll + // clear of all timers when this one expires. + +}; + +class Thread_Bounded_Packet_Relay_Driver : public Bounded_Packet_Relay_Driver <Thread_Timer_Queue> +{ + // = TITLE + // Implements an example application that exercises + // <Thread_Timer_Queue> timer queue. + // + // = DESCRIPTION + // This class implements a simple test driver for the + // <Thread_Timer_Queue>. The <display_menu> hook method is + // called from the base class to print a menu specific to the + // thread implementation of the timer queue. +public: + + Thread_Bounded_Packet_Relay_Driver (void); + // ctor + + ~Thread_Bounded_Packet_Relay_Driver (void); + // dtor + + virtual int display_menu (void); + // display the user menu + + virtual int init (void); + // initialize the driver + + virtual int run (void); + // run the driver + +private: + + User_Input_Task input_task_; + // Subclassed from ACE_Task. +}; + +#endif /* _THREAD_BOUNDED_PACKET_RELAY_H_ */ diff --git a/examples/Bounded_Packet_Relay/bpr_thread.cpp b/examples/Bounded_Packet_Relay/bpr_thread.cpp new file mode 100644 index 00000000000..dbc01b5e74c --- /dev/null +++ b/examples/Bounded_Packet_Relay/bpr_thread.cpp @@ -0,0 +1,51 @@ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// examples +// +// = FILENAME +// bpr_thread.cpp +// +// = DESCRIPTION +// Exercises drivers for a bounded packet relay, based on threaded timer queues. +// +// = AUTHORS +// Chris Gill <cdgill@cs.wustl.edu> +// +// Based on the Timer_Queue_Test example written by: +// +// Douglas Schmidt <schmidt@cs.wustl.edu> && +// Sergio Flores-Gaitan <sergio@cs.wustl.edu> +// +// ============================================================================ + +#include "ace/Auto_Ptr.h" +#include "Thread_Bounded_Packet_Relay.h" + +ACE_RCSID(Bounded_Packet_Relay, bpr_thread, "$Id$") + +typedef Bounded_Packet_Relay_Driver<Thread_Timer_Queue> + THREAD_BOUNDED_PACKET_RELAY_DRIVER; + +int +main (int, char *[]) +{ + // Auto ptr ensures that the driver memory is released + // automatically. + THREAD_BOUNDED_PACKET_RELAY_DRIVER *tbprd; + ACE_NEW_RETURN (tbprd, Thread_Bounded_Packet_Relay_Driver, -1); + + auto_ptr <THREAD_BOUNDED_PACKET_RELAY_DRIVER> driver (tbprd); + + return driver->run (); +} + +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) +template class auto_ptr <THREAD_BOUNDED_PACKET_RELAY_DRIVER>; +template class ACE_Auto_Basic_Ptr <THREAD_BOUNDED_PACKET_RELAY_DRIVER>; +#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) +#pragma instantiate auto_ptr <THREAD_BOUNDED_PACKET_RELAY_DRIVER> +#pragma instantiate ACE_Auto_Basic_Ptr <THREAD_BOUNDED_PACKET_RELAY_DRIVER> +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ |