summaryrefslogtreecommitdiff
path: root/trunk/ACE/examples/Bounded_Packet_Relay/Thread_Bounded_Packet_Relay.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'trunk/ACE/examples/Bounded_Packet_Relay/Thread_Bounded_Packet_Relay.cpp')
-rw-r--r--trunk/ACE/examples/Bounded_Packet_Relay/Thread_Bounded_Packet_Relay.cpp770
1 files changed, 770 insertions, 0 deletions
diff --git a/trunk/ACE/examples/Bounded_Packet_Relay/Thread_Bounded_Packet_Relay.cpp b/trunk/ACE/examples/Bounded_Packet_Relay/Thread_Bounded_Packet_Relay.cpp
new file mode 100644
index 00000000000..a2f179a5348
--- /dev/null
+++ b/trunk/ACE/examples/Bounded_Packet_Relay/Thread_Bounded_Packet_Relay.cpp
@@ -0,0 +1,770 @@
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// examples
+//
+// = FILENAME
+// Thread_Bounded_Packet_Relay.cpp
+//
+// = DESCRIPTION
+// Method definitions for the threaded-bounded packet relay class.
+//
+// = AUTHORS
+// Chris Gill <cdgill@cs.wustl.edu> and
+// Douglas C. Schmidt <schmidt@cs.wustl.edu>
+//
+// Based on the Timer Queue Test example written by
+//
+// Carlos O'Ryan <coryan@cs.wustl.edu> and
+// Douglas C. Schmidt <schmidt@cs.wustl.edu> and
+// Sergio Flores-Gaitan <sergio@cs.wustl.edu>
+//
+// ============================================================================
+
+#include "ace/OS_NS_string.h"
+#include "ace/OS_NS_sys_time.h"
+#include "ace/Condition_T.h"
+#include "ace/Null_Mutex.h"
+
+#include "Thread_Bounded_Packet_Relay.h"
+
+typedef Thread_Bounded_Packet_Relay_Driver::MYCOMMAND DRIVER_CMD;
+typedef ACE_Command_Callback<BPR_Handler_Base, BPR_Handler_Base::ACTION> HANDLER_CMD;
+typedef ACE_Command_Callback<Send_Handler, Send_Handler::ACTION> SEND_HANDLER_CMD;
+
+ACE_RCSID(Bounded_Packet_Relay, Thread_Bounded_Packet_Relay, "$Id$")
+
+// Constructor.
+
+Text_Input_Device_Wrapper::Text_Input_Device_Wrapper (ACE_Thread_Manager *input_task_mgr,
+ size_t read_length,
+ const char* text,
+ int logging)
+ : Input_Device_Wrapper_Base (input_task_mgr),
+ read_length_ (read_length),
+ text_ (text),
+ index_ (0),
+ logging_ (logging),
+ packet_count_ (0)
+
+{
+}
+
+// Destructor.
+
+Text_Input_Device_Wrapper::~Text_Input_Device_Wrapper (void)
+{
+}
+
+// Modifies device settings based on passed pointer to a u_long.
+
+int
+Text_Input_Device_Wrapper::modify_device_settings (void *logging)
+{
+ packet_count_ = 0;
+
+ if (logging)
+ logging_ = *static_cast<int *> (logging);
+ else
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Text_Input_Device_Wrapper::modify_device_settings: "
+ "null argument"),
+ -1);
+ return 0;
+}
+
+
+// Creates a new message block, carrying data
+// read from the underlying input device.
+
+ACE_Message_Block *
+Text_Input_Device_Wrapper::create_input_message (void)
+{
+
+ // Construct a new message block to send.
+ ACE_Message_Block *mb;
+ ACE_NEW_RETURN (mb,
+ ACE_Message_Block (read_length_),
+ 0);
+
+ // Zero out a "read" buffer to hold data.
+ char read_buf [BUFSIZ];
+ ACE_OS::memset (read_buf, 0, BUFSIZ);
+
+ // Loop through the text, filling in data to copy into the read
+ // buffer (leaving room for a terminating zero).
+ for (size_t i = 0; i < read_length_ - 1; ++i)
+ {
+ read_buf [i] = text_ [index_];
+ index_ = (index_ + 1) % ACE_OS::strlen (text_);
+ }
+
+ // Copy buf into the Message_Block and update the wr_ptr ().
+ if (mb->copy (read_buf, read_length_) < 0)
+ {
+ delete mb;
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "read buffer copy failed"),
+ 0);
+ }
+
+ // log packet creation if logging is turned on
+ if (logging_ & Text_Input_Device_Wrapper::LOG_MSGS_CREATED)
+ {
+ ++packet_count_;
+ ACE_DEBUG ((LM_DEBUG, "input message %d created\n",
+ packet_count_));
+ }
+
+ return mb;
+}
+
+// Constructor.
+
+Text_Output_Device_Wrapper::Text_Output_Device_Wrapper (int logging)
+ : logging_ (logging)
+{
+}
+
+// Consume and possibly print out the passed message.
+
+int
+Text_Output_Device_Wrapper::write_output_message (void *message)
+{
+ if (message)
+ {
+ ++packet_count_;
+
+ if (logging_ & Text_Output_Device_Wrapper::LOG_MSGS_RCVD)
+ ACE_DEBUG ((LM_DEBUG, "output message %d received\n",
+ packet_count_));
+
+ if (logging_ & Text_Output_Device_Wrapper::PRINT_MSGS_RCVD)
+ ACE_DEBUG ((LM_DEBUG, "output message %d:\n[%s]\n",
+ packet_count_,
+ static_cast<ACE_Message_Block *> (message)->
+ rd_ptr ()));
+
+ delete static_cast<ACE_Message_Block *> (message);
+ return 0;
+ }
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Text_Output_Device_Wrapper::"
+ "write_output_message: null argument"), -1);
+}
+
+// Modifies device settings based on passed pointer to a u_long.
+
+int
+Text_Output_Device_Wrapper::modify_device_settings (void *logging)
+{
+ packet_count_ = 0;
+
+ if (logging)
+ logging_ = *static_cast<int *> (logging);
+ else
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Text_Output_Device_Wrapper::modify_device_settings: "
+ "null argument"),
+ -1);
+ return 0;
+}
+
+// Constructor.
+
+User_Input_Task::User_Input_Task (Bounded_Packet_Relay *relay,
+ Thread_Timer_Queue *queue,
+ Thread_Bounded_Packet_Relay_Driver &tbprd)
+ : ACE_Task_Base (ACE_Thread_Manager::instance ()),
+ usecs_ (ACE_ONE_SECOND_IN_USECS),
+ relay_ (relay),
+ queue_ (queue),
+ driver_ (tbprd)
+{
+}
+
+// Destructor.
+
+User_Input_Task::~User_Input_Task (void)
+{
+ this->clear_all_timers ();
+}
+
+// Runs the main event loop.
+
+int
+User_Input_Task::svc (void)
+{
+ for (;;)
+ // Call back to the driver's implementation of how to read and
+ // parse input.
+ if (this->driver_.get_next_request () == -1)
+ break;
+
+ // We are done.
+ this->relay_->end_transmission (Bounded_Packet_Relay::CANCELLED);
+ this->queue_->deactivate ();
+ ACE_DEBUG ((LM_DEBUG,
+ "terminating user input thread\n"));
+ this->clear_all_timers ();
+ return 0;
+}
+
+// Sets the number of packets for the next transmission.
+
+int
+User_Input_Task::set_packet_count (void *argument)
+{
+ if (argument)
+ {
+ driver_.packet_count (*static_cast<int *> (argument));
+ return 0;
+ }
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "User_Input_Task::set_packet_count: null argument"),
+ -1);
+}
+
+// Sets the input device packet arrival period (usecs) for the next
+// transmission.
+
+int
+User_Input_Task::set_arrival_period (void *argument)
+{
+ if (argument)
+ {
+ driver_.arrival_period (*static_cast<int *> (argument));
+ return 0;
+ }
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "User_Input_Task::set_arrival_period: null argument"),
+ -1);
+}
+
+// Sets the period between output device sends (usecs) for the next
+// transmission.
+
+int
+User_Input_Task::set_send_period (void *argument)
+{
+ if (argument)
+ {
+ driver_.send_period (*static_cast<int *> (argument));
+ return 0;
+ }
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "User_Input_Task::set_send_period: null argument"),
+ -1);
+}
+
+// Sets a limit on the transmission duration (usecs).
+
+int
+User_Input_Task::set_duration_limit (void *argument)
+{
+ if (argument)
+ {
+ driver_.duration_limit (*static_cast<int *> (argument));
+ return 0;
+ }
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "User_Input_Task::set_duration_limit: null argument"),
+ -1);
+}
+
+// Sets logging level (0 or 1) for output device for the next
+// transmission.
+
+int
+User_Input_Task::set_logging_level (void *argument)
+{
+ if (argument)
+ {
+ driver_.logging_level (*static_cast<int *> (argument));
+ return 0;
+ }
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "User_Input_Task::set_logging_level: null argument"),
+ -1);
+}
+
+// Runs the next transmission (if one is not in progress).
+
+int
+User_Input_Task::run_transmission (void *)
+{
+ if (relay_)
+ {
+ switch (relay_->start_transmission (driver_.packet_count (),
+ driver_.arrival_period (),
+ driver_.logging_level ()))
+ {
+ case 1:
+ ACE_DEBUG ((LM_DEBUG,
+ "\nRun transmission: "
+ " Transmission already in progress\n"));
+ return 0;
+ /* NOTREACHED */
+ case 0:
+ {
+ ACE_Time_Value now = ACE_OS::gettimeofday ();
+ ACE_Time_Value send_every (0, driver_.send_period ());
+ ACE_Time_Value send_at (send_every + now);
+
+ Send_Handler *send_handler;
+ ACE_NEW_RETURN (send_handler,
+ Send_Handler (driver_.packet_count (),
+ send_every,
+ *relay_,
+ *queue_,
+ driver_),
+ -1);
+ if (queue_->schedule (send_handler, 0, send_at) < 0)
+ {
+ delete send_handler;
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "User_Input_Task::run_transmission: "
+ "failed to schedule send handler"),
+ -1);
+ }
+ if (driver_.duration_limit ())
+ {
+ ACE_Time_Value terminate_at (0, driver_.duration_limit ());
+ terminate_at += now;
+
+ Termination_Handler *termination_handler;
+
+ termination_handler =
+ new Termination_Handler (*relay_,
+ *queue_,
+ driver_);
+
+ if (! termination_handler)
+ {
+ this->clear_all_timers ();
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "User_Input_Task::run_transmission: "
+ "failed to allocate termination "
+ "handler"),
+ -1);
+ }
+ if (queue_->schedule (termination_handler,
+ 0, terminate_at) < 0)
+ {
+ delete termination_handler;
+ this->clear_all_timers ();
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "User_Input_Task::run_transmission: "
+ "failed to schedule termination "
+ "handler"),
+ -1);
+ }
+ }
+ return 0;
+ }
+ /* NOTREACHED */
+ default:
+ return -1;
+ /* NOTREACHED */
+ }
+ }
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "User_Input_Task::run_transmission: "
+ "relay not instantiated"),
+ -1);
+}
+
+// Ends the current transmission (if one is in progress).
+
+int
+User_Input_Task::end_transmission (void *)
+{
+ if (relay_)
+ {
+ switch (relay_->end_transmission (Bounded_Packet_Relay::CANCELLED))
+ {
+ case 1:
+ ACE_DEBUG ((LM_DEBUG,
+ "\nEnd transmission: "
+ "no transmission in progress\n"));
+ /* Fall through to next case */
+ case 0:
+ // Cancel any remaining timers.
+ this->clear_all_timers ();
+ return 0;
+ /* NOTREACHED */
+ default:
+ return -1;
+ /* NOTREACHED */
+ }
+ }
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "User_Input_Task::end_transmission: "
+ "relay not instantiated"),
+ -1);
+}
+
+// Reports statistics for the previous transmission
+// (if one is not in progress).
+
+int
+User_Input_Task::report_stats (void *)
+{
+ if (relay_)
+ {
+ switch (relay_->report_statistics ())
+ {
+ case 1:
+ ACE_DEBUG ((LM_DEBUG,
+ "\nRun transmission: "
+ "\ntransmission already in progress\n"));
+ return 0;
+ /* NOTREACHED */
+
+ case 0:
+ this->clear_all_timers ();
+ return 0;
+ /* NOTREACHED */
+
+ default:
+ return -1;
+ /* NOTREACHED */
+ }
+ }
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "User_Input_Task::report_stats: "
+ "relay not instantiated"),
+ -1);
+}
+
+// Shut down the task.
+
+int
+User_Input_Task::shutdown (void *)
+{
+ // Clear any outstanding timers.
+ this->clear_all_timers ();
+
+#if !defined (ACE_LACKS_PTHREAD_CANCEL)
+ // Cancel the thread timer queue task "preemptively."
+ ACE_Thread::cancel (this->queue_->thr_id ());
+#else
+ // Cancel the thread timer queue task "voluntarily."
+ this->queue_->deactivate ();
+#endif /* ACE_LACKS_PTHREAD_CANCEL */
+
+ // -1 indicates we are shutting down the application.
+ return -1;
+}
+
+// Helper method: clears all timers.
+
+int
+User_Input_Task::clear_all_timers (void)
+{
+ // loop through the timers in the queue, cancelling each one
+ for (ACE_Timer_Node_T <ACE_Event_Handler *> *node;
+ (node = queue_->timer_queue ()->get_first ()) != 0;
+ )
+ queue_->timer_queue ()->cancel (node->get_timer_id (), 0, 0);
+
+ return 0;
+}
+
+// Constructor.
+
+BPR_Handler_Base::BPR_Handler_Base (Bounded_Packet_Relay &relay,
+ Thread_Timer_Queue &queue)
+ : relay_ (relay),
+ queue_ (queue)
+{
+}
+
+// Destructor.
+
+BPR_Handler_Base::~BPR_Handler_Base (void)
+{
+}
+
+// Helper method: clears all timers.
+
+int
+BPR_Handler_Base::clear_all_timers (void *)
+{
+ // Loop through the timers in the queue, cancelling each one.
+
+ for (ACE_Timer_Node_T <ACE_Event_Handler *> *node;
+ (node = queue_.timer_queue ()->get_first ()) != 0;
+ )
+ queue_.timer_queue ()->cancel (node->get_timer_id (), 0, 0);
+ // queue_.cancel (node->get_timer_id (), 0);
+
+ // Invoke the handler's (virtual) destructor
+ delete this;
+
+ return 0;
+}
+
+// Constructor.
+
+Send_Handler::Send_Handler (u_long send_count,
+ const ACE_Time_Value &duration,
+ Bounded_Packet_Relay &relay,
+ Thread_Timer_Queue &queue,
+ Thread_Bounded_Packet_Relay_Driver &driver)
+ : BPR_Handler_Base (relay, queue),
+ send_count_ (send_count),
+ duration_ (duration),
+ driver_ (driver)
+{
+}
+
+// Destructor.
+
+Send_Handler::~Send_Handler (void)
+{
+}
+
+// Call back hook.
+
+int
+Send_Handler::handle_timeout (const ACE_Time_Value &,
+ const void *)
+{
+ switch (relay_.send_input ())
+ {
+ case 0:
+ // Decrement count of packets to relay.
+ --send_count_;
+ /* Fall through to next case. */
+ case 1:
+ if (send_count_ > 0)
+ {
+ // Enqueue a deferred callback to the reregister command.
+ SEND_HANDLER_CMD *re_register_callback_;
+ ACE_NEW_RETURN (re_register_callback_,
+ SEND_HANDLER_CMD (*this,
+ &Send_Handler::reregister),
+ -1);
+ return queue_.enqueue_command (re_register_callback_);
+ }
+ else
+ {
+ // All packets are sent, time to end the transmission, redisplay
+ // the user menu, cancel any other timers, and go away.
+ relay_.end_transmission (Bounded_Packet_Relay::COMPLETED);
+ driver_.display_menu ();
+
+ // Enqueue a deferred callback to the clear_all_timers command.
+ HANDLER_CMD *clear_timers_callback_;
+ ACE_NEW_RETURN (clear_timers_callback_,
+ HANDLER_CMD (*this,
+ &BPR_Handler_Base::clear_all_timers),
+ -1);
+ return queue_.enqueue_command (clear_timers_callback_);
+ }
+ /* NOTREACHED */
+ default:
+ return -1;
+ }
+}
+
+// Cancellation hook.
+
+int
+Send_Handler::cancelled (void)
+{
+ delete this;
+ return 0;
+}
+
+// Helper method: re-registers this timer
+
+int
+Send_Handler::reregister (void *)
+{
+ // Re-register the handler for a new timeout.
+ if (queue_.schedule (this,
+ 0,
+ duration_ + ACE_OS::gettimeofday ()) < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Send_Handler::reregister: "
+ "failed to reschedule send handler"),
+ -1);
+
+ return 0;
+}
+
+
+// Constructor.
+
+Termination_Handler::Termination_Handler (Bounded_Packet_Relay &relay,
+ Thread_Timer_Queue &queue,
+ Thread_Bounded_Packet_Relay_Driver &driver)
+ : BPR_Handler_Base (relay, queue),
+ driver_ (driver)
+{
+}
+
+// Destructor.
+
+Termination_Handler::~Termination_Handler (void)
+{
+}
+
+// Call back hook.
+
+int
+Termination_Handler::handle_timeout (const ACE_Time_Value &,
+ const void *)
+{
+ // Transmission timed out, so end the transmission, display the user
+ // menu, and register a callback to clear the timer queue and then
+ // make this object go away.
+ relay_.end_transmission (Bounded_Packet_Relay::TIMED_OUT);
+ driver_.display_menu ();
+
+ // Enqueue a deferred callback to the clear_all_timers command.
+ HANDLER_CMD *clear_timers_callback_;
+ ACE_NEW_RETURN (clear_timers_callback_,
+ HANDLER_CMD (*this,
+ &BPR_Handler_Base::clear_all_timers),
+ -1);
+ return queue_.enqueue_command (clear_timers_callback_);
+}
+
+// Cancellation hook
+
+int
+Termination_Handler::cancelled (void)
+{
+ delete this;
+ return 0;
+}
+
+// Constructor.
+
+Thread_Bounded_Packet_Relay_Driver::Thread_Bounded_Packet_Relay_Driver (Bounded_Packet_Relay *relay)
+ : input_task_ (relay, &timer_queue_, *this)
+{
+}
+
+// Destructor.
+
+Thread_Bounded_Packet_Relay_Driver::~Thread_Bounded_Packet_Relay_Driver (void)
+{
+}
+
+// Display the user menu.
+
+int
+Thread_Bounded_Packet_Relay_Driver::display_menu (void)
+{
+ static char menu[] =
+ "\n\n Options:\n"
+ " ----------------------------------------------------------------------\n"
+ " 1 <number of packets to relay in one transmission = %d>\n"
+ " min = 1 packet.\n"
+ " 2 <input packet arrival period (in usec) = %d>\n"
+ " min = 1.\n"
+ " 3 <output packet transmission period (in usec) = %d>\n"
+ " min = 1.\n"
+ " 4 <limit on duration of transmission (in usec) = %d>\n"
+ " min = 1, no limit = 0.\n"
+ " 5 <logging level flags = %d>\n"
+ " no logging = 0,\n"
+ " log packets created by input device = 1,\n"
+ " log packets consumed by output device = 2,\n"
+ " logging options 1,2 = 3,\n"
+ " print contents of packets consumed by output put device = 4,\n"
+ " logging options 1,4 = 5,\n"
+ " logging options 2,4 = 6,\n"
+ " logging options 1,2,4 = 7.\n"
+ " ----------------------------------------------------------------------\n"
+ " 6 - runs a transmission using the current settings\n"
+ " 7 - cancels a transmission (if there is one running)\n"
+ " 8 - reports statistics from the most recent transmission\n"
+ " 9 - quits the program\n"
+ " ----------------------------------------------------------------------\n"
+ " Please enter your choice: ";
+
+ ACE_DEBUG ((LM_DEBUG,
+ menu,
+ this->packet_count (),
+ this->arrival_period (),
+ this->send_period (),
+ this->duration_limit (),
+ this->logging_level ()));
+
+ return 0;
+}
+
+// Initialize the driver.
+
+int
+Thread_Bounded_Packet_Relay_Driver::init (void)
+{
+ // Initialize the <Command> objects with their corresponding
+ // methods from <User_Input_Task>.
+ ACE_NEW_RETURN (packet_count_cmd_,
+ DRIVER_CMD (input_task_,
+ &User_Input_Task::set_packet_count),
+ -1);
+ ACE_NEW_RETURN (arrival_period_cmd_,
+ DRIVER_CMD (input_task_,
+ &User_Input_Task::set_arrival_period),
+ -1);
+ ACE_NEW_RETURN (transmit_period_cmd_,
+ DRIVER_CMD (input_task_,
+ &User_Input_Task::set_send_period),
+ -1);
+ ACE_NEW_RETURN (duration_limit_cmd_,
+ DRIVER_CMD (input_task_,
+ &User_Input_Task::set_duration_limit),
+ -1);
+ ACE_NEW_RETURN (logging_level_cmd_,
+ DRIVER_CMD (input_task_,
+ &User_Input_Task::set_logging_level),
+ -1);
+ ACE_NEW_RETURN (run_transmission_cmd_,
+ DRIVER_CMD (input_task_,
+ &User_Input_Task::run_transmission),
+ -1);
+ ACE_NEW_RETURN (cancel_transmission_cmd_,
+ DRIVER_CMD (input_task_,
+ &User_Input_Task::end_transmission),
+ -1);
+ ACE_NEW_RETURN (report_stats_cmd_,
+ DRIVER_CMD (input_task_,
+ &User_Input_Task::report_stats),
+ -1);
+ ACE_NEW_RETURN (shutdown_cmd_,
+ DRIVER_CMD (input_task_,
+ &User_Input_Task::shutdown),
+ -1);
+ if (this->input_task_.activate () == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "cannot activate input task"),
+ -1);
+ else if (this->timer_queue_.activate () == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "cannot activate timer queue"),
+ -1);
+ else if (ACE_Thread_Manager::instance ()->wait () == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "wait on Thread_Manager failed"),
+ -1);
+ return 0;
+}
+
+// Run the driver
+
+int
+Thread_Bounded_Packet_Relay_Driver::run (void)
+{
+ this->init ();
+ return 0;
+}
+