summaryrefslogtreecommitdiff
path: root/ACE/examples/Bounded_Packet_Relay/BPR_Drivers.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ACE/examples/Bounded_Packet_Relay/BPR_Drivers.cpp')
-rw-r--r--ACE/examples/Bounded_Packet_Relay/BPR_Drivers.cpp535
1 files changed, 535 insertions, 0 deletions
diff --git a/ACE/examples/Bounded_Packet_Relay/BPR_Drivers.cpp b/ACE/examples/Bounded_Packet_Relay/BPR_Drivers.cpp
new file mode 100644
index 00000000000..dde26ed3ef9
--- /dev/null
+++ b/ACE/examples/Bounded_Packet_Relay/BPR_Drivers.cpp
@@ -0,0 +1,535 @@
+// $Id$
+
+// ============================================================================
+// = LIBRARY
+// examples
+//
+// = FILENAME
+// BPR_Driver.cpp
+//
+// = DESCRIPTION
+// This code builds an abstraction to factor out common code for
+// the different implementations of the Timer_Queue.
+//
+// = AUTHORS
+// Chris Gill <cdgill@cs.wustl.edu> and
+// Douglas C. Schmidt <schmidt@cs.wustl.edu>
+//
+// Based on the Timer Queue Test example written by
+//
+// Carlos O'Ryan <coryan@cs.wustl.edu> and
+// Douglas C. Schmidt <schmidt@cs.wustl.edu> and
+// Sergio Flores-Gaitan <sergio@cs.wustl.edu>
+//
+// ============================================================================
+
+#include "ace/OS_NS_sys_time.h"
+#include "BPR_Drivers.h"
+
+ACE_RCSID(Bounded_Packet_Relay, BPR_Drivers, "$Id$")
+
+// Constructor.
+
+Input_Device_Wrapper_Base::Input_Device_Wrapper_Base (ACE_Thread_Manager *input_task_mgr)
+ : ACE_Task_Base (input_task_mgr),
+ send_input_msg_cmd_ (0),
+ input_period_ (ACE_ONE_SECOND_IN_USECS),
+ is_active_ (0),
+ send_count_ (0)
+{
+}
+
+// Destructor.
+
+Input_Device_Wrapper_Base::~Input_Device_Wrapper_Base (void)
+{
+}
+
+// Sets send input message command in the input device driver object.
+
+int
+Input_Device_Wrapper_Base::set_send_input_msg_cmd (ACE_Command_Base *send_input_msg_cmd)
+{
+ // Set the new command. Input device is not responsible
+ // for deleting the old command, if any.
+ send_input_msg_cmd_ = send_input_msg_cmd;
+ return 0;
+}
+
+// Sets period between when input messages are produced.
+
+int
+Input_Device_Wrapper_Base::set_input_period (u_long input_period)
+{
+ input_period_ = input_period;
+ return 0;
+}
+
+// Sets count of messages to send.
+
+int
+Input_Device_Wrapper_Base::set_send_count (long count)
+{
+ send_count_ = count;
+ return 0;
+}
+
+// Request that the input device stop sending messages
+// and terminate its thread. Should return 1 if it will do so, 0
+// if it has already done so, or -1 if there is a problem doing so.
+
+int
+Input_Device_Wrapper_Base::request_stop (void)
+{
+ if (is_active_)
+ {
+ is_active_ = 0;
+ return 1;
+ }
+
+ return 0;
+}
+
+// This method runs the input device loop in the new thread.
+
+int
+Input_Device_Wrapper_Base::svc (void)
+{
+ ACE_Time_Value timeout;
+ ACE_Message_Block *message;
+
+ // Set a flag to indicate we're active.
+ is_active_ = 1;
+
+ // Start with the total count of messages to send.
+ for (current_count_ = send_count_;
+ // While we're still marked active, and there are packets to send.
+ (is_active_) && (current_count_ != 0);
+ )
+ {
+ // Create an input message to send.
+ message = create_input_message ();
+ if (message == 0)
+ {
+ if (is_active_)
+ {
+ is_active_ = 0;
+ ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n",
+ "Failed to create input message object"),
+ -1);
+ }
+
+ break;
+ }
+
+ // Make sure there is a send command object.
+ if (send_input_msg_cmd_ == 0)
+ {
+ delete message;
+ if (is_active_)
+ {
+ is_active_ = 0;
+ ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n",
+ "send message command object not instantiated"),
+ -1);
+ }
+
+ break;
+ }
+
+ // Send the input message.
+ if (send_input_msg_cmd_->execute ((void *) message) < 0)
+ {
+ delete message;
+ if (is_active_)
+ {
+ is_active_ = 0;
+ ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n",
+ "Failed executing send message command object"),
+ -1);
+ }
+
+ break;
+ }
+
+ // If all went well, decrement count of messages to send, and
+ // run the reactor event loop unti we get a timeout or something
+ // happens in a registered upcall.
+ if (current_count_ > 0)
+ --current_count_;
+
+ timeout = ACE_Time_Value (0, input_period_);
+ reactor_.run_event_loop (timeout);
+ }
+
+ is_active_ = 0;
+
+ return 0;
+}
+
+// Sends a newly created message block, carrying data read from the
+// underlying input device, by passing a pointer to the message block
+// to its command execution.
+
+int
+Input_Device_Wrapper_Base::send_input_message (ACE_Message_Block *amb)
+{
+ if (send_input_msg_cmd_)
+ return send_input_msg_cmd_->execute ((void *) amb);
+ else
+ {
+ if (is_active_)
+ ACE_ERROR ((LM_ERROR, "%t %p\n",
+ "Input_Device_Wrapper_Base::send_input_message: "
+ "command object not instantiated"));
+
+ return -1;
+ }
+}
+
+Output_Device_Wrapper_Base::~Output_Device_Wrapper_Base (void)
+{
+}
+
+// Constructor.
+
+Bounded_Packet_Relay::Bounded_Packet_Relay (ACE_Thread_Manager *input_task_mgr,
+ Input_Device_Wrapper_Base *input_wrapper,
+ Output_Device_Wrapper_Base *output_wrapper)
+ : is_active_ (0),
+ input_task_mgr_ (input_task_mgr),
+ input_wrapper_ (input_wrapper),
+ output_wrapper_ (output_wrapper),
+ queue_ (Bounded_Packet_Relay::DEFAULT_HWM,
+ Bounded_Packet_Relay::DEFAULT_LWM),
+ queue_hwm_ (Bounded_Packet_Relay::DEFAULT_HWM),
+ queue_lwm_ (Bounded_Packet_Relay::DEFAULT_LWM),
+ transmission_number_ (0),
+ packets_sent_ (0),
+ status_ (Bounded_Packet_Relay::UN_INITIALIZED),
+ transmission_start_ (ACE_Time_Value::zero),
+ transmission_end_ (ACE_Time_Value::zero)
+{
+ if (input_task_mgr_ == 0)
+ input_task_mgr_ = ACE_Thread_Manager::instance ();
+}
+
+// Destructor.
+
+Bounded_Packet_Relay::~Bounded_Packet_Relay (void)
+{
+ // Reactivate the queue, and then clear it.
+ queue_.activate ();
+ while (! queue_.is_empty ())
+ {
+ ACE_Message_Block *msg;
+ queue_.dequeue_head (msg);
+ delete msg;
+ }
+
+}
+
+// Requests output be sent to output device.
+
+int
+Bounded_Packet_Relay::send_input (void)
+{
+ // Don't block, return immediately if queue is empty.
+ ACE_Message_Block *item;
+
+ // Using a separate (non-const) time value
+ // is necessary on some platforms
+ ACE_Time_Value immediate (ACE_Time_Value::zero);
+
+ if (queue_.dequeue_head (item,
+ &immediate) < 0)
+ return 1;
+
+ // If a message block was dequeued, send it to the output device.
+
+ if (output_wrapper_->write_output_message ((void *) item) < 0)
+ {
+ if (is_active_)
+ ACE_ERROR ((LM_ERROR,
+ "%t %p\n",
+ "failed to write to output device object"));
+
+ return -1;
+ }
+
+ // If all went OK, increase count of packets sent.
+ ++packets_sent_;
+ return 0;
+}
+
+// Requests a transmission be started.
+
+int
+Bounded_Packet_Relay::start_transmission (u_long packet_count,
+ u_long arrival_period,
+ int logging_level)
+{
+ // Serialize access to start and end transmission calls, statistics
+ // reporting calls.
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->transmission_lock_, -1);
+
+ // If a transmission is already in progress, just return.
+ if (is_active_)
+ return 1;
+
+ // Set transmission in progress flag true.
+ is_active_ = 1;
+
+ // Update statistics for a new transmission.
+ ++transmission_number_;
+ packets_sent_ = 0;
+ status_ = STARTED;
+ transmission_start_ = ACE_OS::gettimeofday ();
+
+ // Reactivate the queue, and then clear it.
+ queue_.activate ();
+ while (! queue_.is_empty ())
+ {
+ ACE_Message_Block *msg;
+ queue_.dequeue_head (msg);
+ delete msg;
+ }
+
+ // Initialize the output device.
+ if (output_wrapper_->modify_device_settings ((void *) &logging_level) < 0)
+ {
+ status_ = ERROR_DETECTED;
+ transmission_end_ = ACE_OS::gettimeofday ();
+ is_active_ = 0;
+ ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n",
+ "failed to initialize output device object"),
+ -1);
+ }
+ // Initialize the input device.
+ if (input_wrapper_->modify_device_settings ((void *) &logging_level) < 0)
+ {
+ status_ = ERROR_DETECTED;
+ transmission_end_ = ACE_OS::gettimeofday ();
+ is_active_ = 0;
+ ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n",
+ "failed to initialize output device object"),
+ -1);
+ }
+ else if (input_wrapper_->set_input_period (arrival_period) < 0)
+ {
+ status_ = ERROR_DETECTED;
+ transmission_end_ = ACE_OS::gettimeofday ();
+ is_active_ = 0;
+ ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n",
+ "failed to initialize input device object"),
+ -1);
+ }
+ else if (input_wrapper_->set_send_count (packet_count) < 0)
+ {
+ status_ = ERROR_DETECTED;
+ transmission_end_ = ACE_OS::gettimeofday ();
+ is_active_ = 0;
+ ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n",
+ "failed to initialize input device object"),
+ -1);
+ }
+ // Activate the input device.
+ else if (input_wrapper_->activate () < 0)
+ {
+ status_ = ERROR_DETECTED;
+ transmission_end_ = ACE_OS::gettimeofday ();
+ is_active_ = 0;
+ ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n",
+ "failed to activate input device object"),
+ -1);
+ }
+
+ // If all went well, print a startup message and return success.
+ ACE_DEBUG ((LM_DEBUG,
+ "\n\nTransmission %u started\n\n",
+ transmission_number_));
+ return 0;
+}
+
+// Requests a transmission be ended.
+
+int
+Bounded_Packet_Relay::end_transmission (Transmission_Status status)
+{
+ // Serialize access to start and end transmission calls,
+ // statistics reporting calls.
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->transmission_lock_, -1);
+
+ // If a transmission is not already in progress, just return.
+ if (! is_active_)
+ return 1;
+
+ // Set transmission in progress flag false.
+ is_active_ = 0;
+
+ // Ask the the input thread to stop.
+ if (input_wrapper_->request_stop () < 0)
+ {
+ status_ = ERROR_DETECTED;
+ transmission_end_ = ACE_OS::gettimeofday ();
+ ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n",
+ "failed asking input device thread to stop"),
+ -1);
+ }
+
+ // Deactivate the queue, allowing all waiting threads to continue.
+ queue_.deactivate ();
+
+ // Wait for input thread to stop.
+ input_task_mgr_->wait_task (input_wrapper_);
+
+ // Reactivate the queue, and then clear it.
+ queue_.activate ();
+ while (! queue_.is_empty ())
+ {
+ ACE_Message_Block *msg;
+ queue_.dequeue_head (msg);
+ delete msg;
+ }
+
+ // If all went well, set passed status, stamp end time, print a
+ // termination message, and return success.
+ status_ = status;
+ transmission_end_ = ACE_OS::gettimeofday ();
+ ACE_DEBUG ((LM_DEBUG,
+ "\n\nTransmission %u ended with status: %s\n\n",
+ transmission_number_, status_msg ()));
+ return 0;
+}
+
+// Requests a report of statistics from the last transmission.
+
+int
+Bounded_Packet_Relay::report_statistics (void)
+{
+ // Serialize access to start and end transmission calls,
+ // statistics reporting calls.
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->transmission_lock_, -1);
+
+ // If a transmission is already in progress, just return.
+ if (is_active_)
+ return 1;
+
+ // Calculate duration of trasmission.
+ ACE_Time_Value duration (transmission_end_);
+ duration -= transmission_start_;
+
+ // Report transmission statistics.
+ ACE_DEBUG ((LM_DEBUG,
+ "\n\nStatisics for transmission %u:\n\n"
+ "Transmission status: %s\n"
+ "Start time: %d (sec) %d (usec)\n"
+ "End time: %d (sec) %d (usec)\n"
+ "Duration: %d (sec) %d (usec)\n"
+ "Packets relayed: %u\n\n",
+ transmission_number_, status_msg (),
+ transmission_start_.sec (),
+ transmission_start_.usec (),
+ transmission_end_.sec (),
+ transmission_end_.usec (),
+ duration.sec (),
+ duration.usec (),
+ packets_sent_));
+ return 0;
+}
+
+// Public entry point to which to push input.
+
+int
+Bounded_Packet_Relay::receive_input (void * arg)
+{
+ if (! arg)
+ {
+ if (is_active_)
+ ACE_ERROR ((LM_ERROR, "%t %p\n",
+ "Bounded_Packet_Relay::receive_input: "
+ "null argument"));
+
+ return -1;
+ }
+ ACE_Message_Block *message = static_cast<ACE_Message_Block *> (arg);
+ if (queue_.enqueue_tail (message) < 0)
+ {
+ if (is_active_)
+ ACE_ERROR ((LM_ERROR, "%t %p\n",
+ "Bounded_Packet_Relay::receive_input failed"));
+
+ return -1;
+ }
+
+ return 0;
+}
+
+// Get high water mark for relay queue.
+
+ACE_UINT32
+Bounded_Packet_Relay::queue_hwm (void)
+{
+ return queue_lwm_;
+}
+
+
+// Set high water mark for relay queue.
+
+void
+Bounded_Packet_Relay::queue_hwm (ACE_UINT32 hwm)
+{
+ queue_hwm_ = hwm;
+}
+
+// Get low water mark for relay queue.
+
+ACE_UINT32
+Bounded_Packet_Relay::queue_lwm (void)
+{
+ return queue_lwm_;
+}
+
+// Set low water mark for relay queue.
+
+void
+Bounded_Packet_Relay::queue_lwm (ACE_UINT32 lwm)
+{
+ queue_lwm_ = lwm;
+}
+
+
+
+// Returns string corresponding to current status.
+
+const char *
+Bounded_Packet_Relay::status_msg (void)
+{
+ const char *status_msg;
+ switch (status_)
+ {
+ case UN_INITIALIZED:
+ status_msg = "uninitialized";
+ break;
+ case STARTED:
+ status_msg = "in progress";
+ break;
+ case COMPLETED:
+ status_msg = "completed with all packets sent";
+ break;
+ case TIMED_OUT:
+ status_msg = "terminated by transmission duration timer";
+ break;
+ case CANCELLED:
+ status_msg = "cancelled by external control";
+ break;
+ case ERROR_DETECTED:
+ status_msg = "error was detected";
+ break;
+ default:
+ status_msg = "unknown transmission status";
+ break;
+ }
+
+ return status_msg;
+}