summaryrefslogtreecommitdiff
path: root/ACE/apps/JAWS3/jaws3/THYBRID_Concurrency.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ACE/apps/JAWS3/jaws3/THYBRID_Concurrency.cpp')
-rw-r--r--ACE/apps/JAWS3/jaws3/THYBRID_Concurrency.cpp130
1 files changed, 130 insertions, 0 deletions
diff --git a/ACE/apps/JAWS3/jaws3/THYBRID_Concurrency.cpp b/ACE/apps/JAWS3/jaws3/THYBRID_Concurrency.cpp
new file mode 100644
index 00000000000..83c3da28bc7
--- /dev/null
+++ b/ACE/apps/JAWS3/jaws3/THYBRID_Concurrency.cpp
@@ -0,0 +1,130 @@
+// $Id$
+
+#include "ace/OS_NS_stdlib.h"
+#include "ace/Message_Block.h"
+
+#ifndef JAWS_BUILD_DLL
+#define JAWS_BUILD_DLL
+#endif
+
+#include "jaws3/Concurrency.h"
+#include "jaws3/THYBRID_Concurrency.h"
+#include "jaws3/Protocol_Handler.h"
+#include "jaws3/Options.h"
+
+
+JAWS_THYBRID_Concurrency::JAWS_THYBRID_Concurrency (void)
+ : getting_ (0)
+ , min_number_of_threads_ (1)
+ , max_number_of_threads_ (-1)
+ , shutdown_task_ (0)
+ , error_ (0)
+{
+ const char *value;
+
+ value = JAWS_Options::instance ()->getenv ("JAWS_MIN_THYBRID_THREADS");
+ if (value != 0)
+ this->min_number_of_threads_ = ACE_OS::atoi (value);
+ else
+ this->min_number_of_threads_ =
+ ACE_OS::atoi (JAWS_DEFAULT_MIN_THYBRID_THREADS);
+
+ if (this->min_number_of_threads_ <= 0)
+ this->min_number_of_threads_ = 1;
+
+ value = JAWS_Options::instance ()->getenv ("JAWS_MAX_THYBRID_THREADS");
+ if (value != 0)
+ this->max_number_of_threads_ = ACE_OS::atoi (value);
+ else
+ this->max_number_of_threads_ =
+ ACE_OS::atoi (JAWS_DEFAULT_MAX_THYBRID_THREADS);
+
+ if (this->max_number_of_threads_ <= 0)
+ this->max_number_of_threads_ = -1;
+ else if (this->max_number_of_threads_ < this->min_number_of_threads_)
+ this->max_number_of_threads_ = this->min_number_of_threads_;
+
+ int r;
+ r = this->activate (THR_BOUND | THR_JOINABLE, this->min_number_of_threads_);
+ if (r < 0)
+ {
+ this->shutdown_task_ = 1;
+ this->error_ = 1;
+ }
+}
+
+int
+JAWS_THYBRID_Concurrency::putq (JAWS_Protocol_Handler *ph)
+{
+ if (this->error_)
+ return -1;
+
+ JAWS_CONCURRENCY_TASK *task = this;
+ int result = task->putq (& ph->mb_);
+
+ if (result != -1)
+ {
+ if (this->getting_ < this->min_number_of_threads_
+ && (this->max_number_of_threads_ < 0
+ || this->thr_count () < (size_t) this->max_number_of_threads_))
+ {
+ int r;
+ r = this->activate ( THR_BOUND | THR_JOINABLE
+ , 1 // number of threads
+ , 1 // force active
+ );
+ if (r < 0)
+ {
+ // ACE_ERROR
+ return -1;
+ }
+ }
+ }
+
+ return result;
+}
+
+int
+JAWS_THYBRID_Concurrency::getq (JAWS_Protocol_Handler *&ph)
+{
+ ph = 0;
+
+ JAWS_CONCURRENCY_TASK *task = this;
+
+ if (this->shutdown_task_ && task->msg_queue ()->message_count () == 0)
+ return -1;
+
+ int getting = ++(this->getting_);
+
+ if (getting > this->min_number_of_threads_)
+ {
+ if (task->msg_queue ()->message_count () == 0)
+ {
+ --(this->getting_);
+ return -1;
+ }
+ }
+
+ ACE_Message_Block *mb = 0;
+ int result = task->getq (mb);
+
+ if (result != -1)
+ {
+ ph = (JAWS_Protocol_Handler *) mb->base ();
+
+ if (ph == 0)
+ {
+ // Shutdown this task;
+ this->shutdown_task_ = 1;
+ if (this->getting_ > 1)
+ {
+ task->putq (mb);
+ result = -1;
+ }
+
+ }
+ }
+
+ --(this->getting_);
+ return result;
+}