summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2007-11-08 17:45:54 +0000
committerAndrew Stitcher <astitcher@apache.org>2007-11-08 17:45:54 +0000
commit10a794c9d2fede2a0db9cf80e95f19d56e931196 (patch)
treed958e38ed4a14d992029548107a17e726ff20402 /cpp/src
parent36af925c0645e87661df4ba8448244b69492a0c1 (diff)
downloadqpid-python-10a794c9d2fede2a0db9cf80e95f19d56e931196.tar.gz
Improved Fix for the race condition where you've got a competing read and write
for the same handle. This can happen if we've just got a read event then before handling it we watch for write events and get one immediately. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@593237 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/sys/Dispatcher.cpp67
-rw-r--r--cpp/src/qpid/sys/Dispatcher.h22
2 files changed, 15 insertions, 74 deletions
diff --git a/cpp/src/qpid/sys/Dispatcher.cpp b/cpp/src/qpid/sys/Dispatcher.cpp
index 6218840cfa..7e47ce3d3a 100644
--- a/cpp/src/qpid/sys/Dispatcher.cpp
+++ b/cpp/src/qpid/sys/Dispatcher.cpp
@@ -46,7 +46,7 @@ void Dispatcher::run() {
//TODO: this is a temporary fix to ensure that if two
//events are being processed concurrently, the first thread
//will call dispatchCallbacks serially for each one
- h->handle(event.type);
+ h->dispatchCallbacks(event.type);
} else {
// Handle shutdown
switch (event.type) {
@@ -363,6 +363,18 @@ void DispatchHandle::dispatchCallbacks(Poller::EventType type) {
case ACTIVE_RW:
state = DELAYED_RW;
break;
+ // Can only get here in a DELAYED_* state in the rare case
+ // that we're already here for reading and we get activated for
+ // writing and we can write (it might be possible the other way
+ // round too). In this case we're already processing the handle
+ // in a different thread in this function so return right away
+ case DELAYED_R:
+ case DELAYED_W:
+ case DELAYED_RW:
+ case DELAYED_INACTIVE:
+ case DELAYED_IDLE:
+ case DELAYED_DELETE:
+ return;
default:
assert(false);
}
@@ -426,58 +438,7 @@ void DispatchHandle::dispatchCallbacks(Poller::EventType type) {
break;
}
}
- //TODO: this is a temporary fix to mark the handle as deleted,
- //but delay deletion until the handle loop below ends
- deleted = true;
-}
-
-/**
- * TODO: The following are part of a temporary fix to ensure that
- * where a new event is generated for the same handle while an
- * earlier one is still being processed (due to an interest in
- * writeability being declared) the events are processed serially
- * by the first thread.
- */
-void DispatchHandle::handle(Poller::EventType type)
-{
- if (start(type)) {
- dispatchCallbacks(type);
- drain();
- if (deleted) delete this;
- }
-}
-
-bool DispatchHandle::start(Poller::EventType type)
-{
- Mutex::ScopedLock l(processLock);
- if (processing) {
- events.push(type);
- return false;
- } else {
- processing = true;
- return true;
- }
-}
-
-void DispatchHandle::drain()
-{
- Poller::EventType type;
- while (next(type)) {
- dispatchCallbacks(type);
- }
-}
-
-bool DispatchHandle::next(Poller::EventType& type)
-{
- Mutex::ScopedLock l(processLock);
- if (events.empty()) {
- processing = false;
- return false;
- } else {
- type = events.front();
- events.pop();
- return true;
- }
+ delete this;
}
}}
diff --git a/cpp/src/qpid/sys/Dispatcher.h b/cpp/src/qpid/sys/Dispatcher.h
index 0cfaf8b5d4..7cc4873068 100644
--- a/cpp/src/qpid/sys/Dispatcher.h
+++ b/cpp/src/qpid/sys/Dispatcher.h
@@ -54,33 +54,13 @@ private:
DELAYED_DELETE
} state;
- /**
- * TODO: The following are part of a temporary fix to ensure that
- * where a new event is generated for the same handle while an
- * earlier one is still being processed (due to an interest in
- * writeability being declared) the events are processed serially
- * by the first thread.
- */
- Mutex processLock;
- bool processing;
- bool deleted;
- std::queue<Poller::EventType> events;
-
- bool start(Poller::EventType type);
- void handle(Poller::EventType type);
- void drain();
- bool next(Poller::EventType& type);
- /**************************************************************/
-
public:
DispatchHandle(const Socket& s, Callback rCb, Callback wCb, Callback dCb) :
PollerHandle(s),
readableCallback(rCb),
writableCallback(wCb),
disconnectedCallback(dCb),
- state(IDLE),
- processing(false),
- deleted(false)
+ state(IDLE)
{}
~DispatchHandle();