summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorpoberlin <poberlin@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2007-08-04 19:27:22 +0000
committerpoberlin <poberlin@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2007-08-04 19:27:22 +0000
commit0f5b2520349cfb8903ad12df6e09c2ec7b756504 (patch)
tree42912264366b8e70e8ff888ceabd5b06e4b4aaa6
parent1caf6e6050a12bc5a822aacfeae71eee6c8b0a69 (diff)
downloadATCD-reactor_deadlock_protocol.tar.gz
improving tests, bug fixingreactor_deadlock_protocol
-rw-r--r--ACE/ace/DA_Strategy_Base.h2
-rw-r--r--ACE/ace/DA_Strategy_Base.inl9
-rw-r--r--ACE/ace/Deadlock_Free_TP_Reactor.h1
-rw-r--r--ACE/ace/Deadlock_Free_TP_Reactor.inl6
-rw-r--r--ACE/tests/DA_Reactor_Test.cpp35
5 files changed, 46 insertions, 7 deletions
diff --git a/ACE/ace/DA_Strategy_Base.h b/ACE/ace/DA_Strategy_Base.h
index c2cc5b32c8e..e2361b59196 100644
--- a/ACE/ace/DA_Strategy_Base.h
+++ b/ACE/ace/DA_Strategy_Base.h
@@ -81,7 +81,7 @@ public:
private:
HASH_ANNOTATIONS_MAP annotations_repo_;
ACE_RW_Thread_Mutex lock_;
- ACE_Atomic_Op<ACE_Thread_Mutex, long> num_avail_threads_;
+ ACE_Atomic_Op<ACE_Thread_Mutex, int> num_avail_threads_;
};
diff --git a/ACE/ace/DA_Strategy_Base.inl b/ACE/ace/DA_Strategy_Base.inl
index ad62a9f585a..be3e999e798 100644
--- a/ACE/ace/DA_Strategy_Base.inl
+++ b/ACE/ace/DA_Strategy_Base.inl
@@ -43,7 +43,14 @@ ACE_INLINE int
DA_Strategy_Base<AnnotationId>::add_annotation (AnnotationId id, int annotation)
{
int rc;
- rc = annotations_repo_.bind (id, annotation);
+ if (annotation > num_avail_threads_.value()) {
+ rc = -1;
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("%p.\n"),
+ ACE_TEXT ("DA_Strategy_Base annotation may not exceed number of threads")));
+ } else {
+ rc = annotations_repo_.bind (id, annotation);
+ }
/*
ACE_DEBUG ((LM_DEBUG, "In add_annotation\n"));
HASH_ANNOTATIONS_CONST_ITER iter(annotations_repo_);
diff --git a/ACE/ace/Deadlock_Free_TP_Reactor.h b/ACE/ace/Deadlock_Free_TP_Reactor.h
index 2a2d999b0e5..5f586be3343 100644
--- a/ACE/ace/Deadlock_Free_TP_Reactor.h
+++ b/ACE/ace/Deadlock_Free_TP_Reactor.h
@@ -101,6 +101,7 @@ public:
virtual void pre_upcall_hook (ACE_HANDLE upcall_handle);
virtual void post_upcall_hook (ACE_HANDLE upcall_handle);
virtual int get_annotation (ACE_Event_Handler* eh);
+ virtual int get_annotation (ACE_HANDLE h);
virtual int add_annotation (ACE_Event_Handler* eh, int annotation);
virtual int remove_annotation (ACE_Event_Handler* eh);
diff --git a/ACE/ace/Deadlock_Free_TP_Reactor.inl b/ACE/ace/Deadlock_Free_TP_Reactor.inl
index 573ac5eb38a..6441d3f7f31 100644
--- a/ACE/ace/Deadlock_Free_TP_Reactor.inl
+++ b/ACE/ace/Deadlock_Free_TP_Reactor.inl
@@ -92,6 +92,12 @@ ACE_Deadlock_Free_TP_Reactor_Impl::get_annotation (ACE_Event_Handler* eh)
}
ACE_INLINE int
+ACE_Deadlock_Free_TP_Reactor_Impl::get_annotation (ACE_HANDLE h)
+{
+ return da_strategy_->get_annotation(h);
+}
+
+ACE_INLINE int
ACE_Deadlock_Free_TP_Reactor_Impl::add_annotation (ACE_Event_Handler* eh, int annotation)
{
return da_strategy_->add_annotation(eh->get_handle(), annotation);
diff --git a/ACE/tests/DA_Reactor_Test.cpp b/ACE/tests/DA_Reactor_Test.cpp
index c943305f277..6a37f7bfcc3 100644
--- a/ACE/tests/DA_Reactor_Test.cpp
+++ b/ACE/tests/DA_Reactor_Test.cpp
@@ -437,11 +437,21 @@ Receiver::open (void *)
flg_mask_ = ACE_Event_Handler::NULL_MASK ;
+ int annotation = (rand()%threads) + 1;
+ if (ACE_Deadlock_Free_TP_Reactor::instance()->add_annotation(this, annotation) == -1) {
+ return -1;
+ } else {
+ ACE_DEBUG ((LM_DEBUG, "%s = %d ", "adding handle", this->get_handle()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "with annotation", annotation));
+ }
if (ACE_Reactor::instance()->register_handler (this, flg_mask_) == -1)
+ {
+ ACE_Deadlock_Free_TP_Reactor::instance()->remove_annotation(this);
return -1;
- //randomize annotation between 1 and 10
- ACE_Deadlock_Free_TP_Reactor::instance()->add_annotation(this, (rand()%9) + 1);
+ }
+
+
initiate_io (ACE_Event_Handler::READ_MASK);
return check_destroy ();
@@ -581,7 +591,7 @@ Receiver::handle_output (ACE_HANDLE h)
int err = 0;
ssize_t res = 0;
size_t bytes = 0;
-
+ int annotation = ACE_Deadlock_Free_TP_Reactor::instance()->get_annotation(h);
int qcount = this->getq (mb, &tv);
if (mb != 0) // qcount >= 0)
@@ -604,6 +614,7 @@ Receiver::handle_output (ACE_HANDLE h)
ACE_DEBUG ((LM_DEBUG, "**** Receiver::handle_output () SessionId=%d****\n", index_));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", bytes));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", h));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "annotation", annotation));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transferred", res));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", err));
ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", mb->rd_ptr ()));
@@ -814,10 +825,20 @@ int Sender::open (void *)
flg_mask_ = ACE_Event_Handler::NULL_MASK ;
-
+ int annotation = (rand()%threads) + 1;
+ if (ACE_Deadlock_Free_TP_Reactor::instance()->add_annotation(this, annotation) == -1)
+ {
+ return -1;
+ } else {
+ ACE_DEBUG ((LM_DEBUG, "%s = %d ", "adding handle", this->get_handle()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "with annotation", annotation));
+ }
if (ACE_Reactor::instance()->register_handler (this,flg_mask_) == -1)
+ {
+ ACE_Deadlock_Free_TP_Reactor::instance()->remove_annotation(this);
return -1;
- ACE_Deadlock_Free_TP_Reactor::instance()->add_annotation(this, (rand()%9) + 1);
+ }
+
if (this->initiate_write () == -1)
return -1;
@@ -907,6 +928,7 @@ Sender::handle_input (ACE_HANDLE h)
-1);
int err = 0;
+ int annotation = ACE_Deadlock_Free_TP_Reactor::instance()->get_annotation(h);
ssize_t res = this->peer ().recv (mb->rd_ptr (),
BUFSIZ-1);
this->total_r_++;
@@ -928,6 +950,7 @@ Sender::handle_input (ACE_HANDLE h)
ACE_DEBUG ((LM_DEBUG, "**** Sender::handle_input () SessionId=%d****\n", index_));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", BUFSIZ));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", h));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "annotation", annotation));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transferred", res));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", err));
ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", mb->rd_ptr ()));
@@ -974,6 +997,7 @@ Sender::handle_output (ACE_HANDLE h)
int err=0;
ssize_t res=0;
size_t bytes=0;
+ int annotation = ACE_Deadlock_Free_TP_Reactor::instance()->get_annotation(h);
int qcount = this->getq (mb , & tv);
@@ -996,6 +1020,7 @@ Sender::handle_output (ACE_HANDLE h)
ACE_DEBUG ((LM_DEBUG, "**** Sender::handle_output () SessionId=%d****\n", index_));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", bytes));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", h));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "annotation", annotation));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transferred", res));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", err));
ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", mb->rd_ptr ()));