summaryrefslogtreecommitdiff
path: root/ACE/apps/JAWS3/jaws3/TPR_Concurrency.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ACE/apps/JAWS3/jaws3/TPR_Concurrency.cpp')
-rw-r--r--ACE/apps/JAWS3/jaws3/TPR_Concurrency.cpp89
1 files changed, 89 insertions, 0 deletions
diff --git a/ACE/apps/JAWS3/jaws3/TPR_Concurrency.cpp b/ACE/apps/JAWS3/jaws3/TPR_Concurrency.cpp
new file mode 100644
index 00000000000..6e15def5cb8
--- /dev/null
+++ b/ACE/apps/JAWS3/jaws3/TPR_Concurrency.cpp
@@ -0,0 +1,89 @@
+// $Id$
+
+#include "ace/Message_Block.h"
+
+#ifndef JAWS_BUILD_DLL
+#define JAWS_BUILD_DLL
+#endif
+
+#include "jaws3/Concurrency.h"
+#include "jaws3/TPR_Concurrency.h"
+#include "jaws3/Protocol_Handler.h"
+#include "jaws3/Options.h"
+
+
+JAWS_TPR_Concurrency::JAWS_TPR_Concurrency (void)
+ : getting_ (0)
+ , shutdown_task_ (0)
+ , error_ (0)
+{
+ int r;
+ r = this->activate (THR_BOUND | THR_JOINABLE);
+ if (r < 0)
+ {
+ this->error_ = 1;
+ this->shutdown_task_ = 1;
+ }
+}
+
+int
+JAWS_TPR_Concurrency::putq (JAWS_Protocol_Handler *ph)
+{
+ if (this->error_)
+ return -1;
+
+ JAWS_CONCURRENCY_TASK *task = this;
+ int result = task->putq (& ph->mb_);
+
+ if (result != -1)
+ {
+ int r;
+ r = this->activate ( THR_BOUND | THR_JOINABLE
+ , 1 // number of threads
+ , 1 // force active
+ );
+ if (r < 0)
+ {
+ // ACE_ERROR
+ return -1;
+ }
+ }
+
+ return result;
+}
+
+int
+JAWS_TPR_Concurrency::getq (JAWS_Protocol_Handler *&ph)
+{
+ ph = 0;
+
+ JAWS_CONCURRENCY_TASK *task = this;
+
+ if (this->shutdown_task_ && task->msg_queue ()->message_count () == 0)
+ return -1;
+
+ int getting = ++(this->getting_);
+
+ if (getting > 1 && task->msg_queue ()->message_count () == 0)
+ {
+ --(this->getting_);
+ return -1;
+ }
+
+ ACE_Message_Block *mb = 0;
+ int result = task->getq (mb);
+
+ if (result != -1)
+ {
+ ph = (JAWS_Protocol_Handler *) mb->base ();
+
+ if (ph == 0)
+ {
+ // Shutdown this task;
+ this->shutdown_task_ = 1;
+ }
+ }
+
+ --(this->getting_);
+ return result;
+}