summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/sys/DispatchHandle.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/sys/DispatchHandle.cpp')
-rw-r--r--qpid/cpp/src/qpid/sys/DispatchHandle.cpp352
1 files changed, 352 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/sys/DispatchHandle.cpp b/qpid/cpp/src/qpid/sys/DispatchHandle.cpp
new file mode 100644
index 0000000000..5d6fc4e72f
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/DispatchHandle.cpp
@@ -0,0 +1,352 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/DispatchHandle.h"
+#include "qpid/log/Statement.h"
+
+#include <algorithm>
+
+#include <boost/cast.hpp>
+
+#include <assert.h>
+
+namespace qpid {
+namespace sys {
+
+DispatchHandle::DispatchHandle(const IOHandle& h, Callback rCb, Callback wCb, Callback dCb) :
+ PollerHandle(h),
+ readableCallback(rCb),
+ writableCallback(wCb),
+ disconnectedCallback(dCb),
+ state(IDLE)
+{
+}
+
+
+DispatchHandle::~DispatchHandle() {
+}
+
+void DispatchHandle::startWatch(Poller::shared_ptr poller0) {
+ bool r = readableCallback;
+ bool w = writableCallback;
+
+ ScopedLock<Mutex> lock(stateLock);
+ assert(state == IDLE);
+
+ poller = poller0;
+ poller->registerHandle(*this);
+ state = WAITING;
+ Poller::Direction dir = r ?
+ ( w ? Poller::INOUT : Poller::INPUT ) :
+ ( w ? Poller::OUTPUT : Poller::NONE );
+ poller->monitorHandle(*this, dir);
+}
+
+void DispatchHandle::rewatch() {
+ bool r = readableCallback;
+ bool w = writableCallback;
+ if (!r && !w) {
+ return;
+ }
+ Poller::Direction dir = r ?
+ ( w ? Poller::INOUT : Poller::INPUT ) :
+ ( w ? Poller::OUTPUT : Poller::NONE );
+
+ ScopedLock<Mutex> lock(stateLock);
+ switch(state) {
+ case IDLE:
+ case STOPPING:
+ case DELETING:
+ return;
+ default:
+ break;
+ }
+ assert(poller);
+ poller->monitorHandle(*this, dir);
+}
+
+void DispatchHandle::rewatchRead() {
+ if (!readableCallback) {
+ return;
+ }
+
+ ScopedLock<Mutex> lock(stateLock);
+ switch(state) {
+ case IDLE:
+ case STOPPING:
+ case DELETING:
+ return;
+ default:
+ break;
+ }
+ assert(poller);
+ poller->monitorHandle(*this, Poller::INPUT);
+}
+
+void DispatchHandle::rewatchWrite() {
+ if (!writableCallback) {
+ return;
+ }
+
+ ScopedLock<Mutex> lock(stateLock);
+ switch(state) {
+ case IDLE:
+ case STOPPING:
+ case DELETING:
+ return;
+ default:
+ break;
+ }
+ assert(poller);
+ poller->monitorHandle(*this, Poller::OUTPUT);
+}
+
+void DispatchHandle::unwatchRead() {
+ if (!readableCallback) {
+ return;
+ }
+
+ ScopedLock<Mutex> lock(stateLock);
+ switch(state) {
+ case IDLE:
+ case STOPPING:
+ case DELETING:
+ return;
+ default:
+ break;
+ }
+ assert(poller);
+ poller->unmonitorHandle(*this, Poller::INPUT);
+}
+
+void DispatchHandle::unwatchWrite() {
+ if (!writableCallback) {
+ return;
+ }
+
+ ScopedLock<Mutex> lock(stateLock);
+ switch(state) {
+ case IDLE:
+ case STOPPING:
+ case DELETING:
+ return;
+ default:
+ break;
+ }
+ assert(poller);
+ poller->unmonitorHandle(*this, Poller::OUTPUT);
+}
+
+void DispatchHandle::unwatch() {
+ ScopedLock<Mutex> lock(stateLock);
+ switch(state) {
+ case IDLE:
+ case STOPPING:
+ case DELETING:
+ return;
+ default:
+ break;
+ }
+ assert(poller);
+ poller->unmonitorHandle(*this, Poller::INOUT);
+}
+
+void DispatchHandle::stopWatch() {
+ ScopedLock<Mutex> lock(stateLock);
+ switch (state) {
+ case IDLE:
+ assert(state != IDLE);
+ return;
+ case STOPPING:
+ assert(state != STOPPING);
+ return;
+ case CALLING:
+ state = STOPPING;
+ break;
+ case WAITING:
+ state = IDLE;
+ break;
+ case DELETING:
+ return;
+ }
+ assert(poller);
+ poller->unregisterHandle(*this);
+ poller.reset();
+}
+
+// If we are in the IDLE/STOPPING state we can't do the callback as we've
+// not/no longer got the fd registered in any poller
+void DispatchHandle::call(Callback iCb) {
+ assert(iCb);
+ ScopedLock<Mutex> lock(stateLock);
+ switch (state) {
+ case IDLE:
+ case STOPPING:
+ case DELETING:
+ return;
+ default:
+ interruptedCallbacks.push(iCb);
+ assert(poller);
+ (void) poller->interrupt(*this);
+ }
+}
+
+// The slightly strange switch structure
+// is to ensure that the lock is released before
+// we do the delete
+void DispatchHandle::doDelete() {
+ {
+ ScopedLock<Mutex> lock(stateLock);
+ // Ensure that we're no longer watching anything
+ switch (state) {
+ case IDLE:
+ state = DELETING;
+ break;
+ case STOPPING:
+ state = DELETING;
+ return;
+ case WAITING:
+ state = DELETING;
+ assert(poller);
+ (void) poller->interrupt(*this);
+ poller->unregisterHandle(*this);
+ return;
+ case CALLING:
+ state = DELETING;
+ assert(poller);
+ poller->unregisterHandle(*this);
+ return;
+ case DELETING:
+ return;
+ }
+ }
+ // If we're IDLE we can do this right away
+ delete this;
+}
+
+void DispatchHandle::processEvent(Poller::EventType type) {
+
+ // Phase I
+ {
+ ScopedLock<Mutex> lock(stateLock);
+
+ switch(state) {
+ case IDLE:
+ // Can get here if a non connection thread stops watching
+ // whilst we were stuck in the above lock
+ return;
+ case WAITING:
+ state = CALLING;
+ break;
+ case CALLING:
+ assert(state!=CALLING);
+ return;
+ case STOPPING:
+ assert(state!=STOPPING);
+ return;
+ case DELETING:
+ // Need to make sure we clean up any pending callbacks in this case
+ std::swap(callbacks, interruptedCallbacks);
+ goto saybyebye;
+ }
+
+ std::swap(callbacks, interruptedCallbacks);
+ }
+
+ // Do callbacks - whilst we are doing the callbacks we are prevented from processing
+ // the same handle until we re-enable it. To avoid rentering the callbacks for a single
+ // handle re-enabling in the callbacks is actually deferred until they are complete.
+ try {
+ switch (type) {
+ case Poller::READABLE:
+ readableCallback(*this);
+ break;
+ case Poller::WRITABLE:
+ writableCallback(*this);
+ break;
+ case Poller::READ_WRITABLE:
+ readableCallback(*this);
+ writableCallback(*this);
+ break;
+ case Poller::DISCONNECTED:
+ if (disconnectedCallback) {
+ disconnectedCallback(*this);
+ }
+ break;
+ case Poller::INTERRUPTED:
+ {
+ // We'll actually do the interrupt below
+ }
+ break;
+ default:
+ assert(false);
+ }
+
+ // If we have any callbacks do them now -
+ // (because we use a copy from before the previous callbacks we won't
+ // do anything yet that was just added)
+ while (callbacks.size() > 0) {
+ {
+ ScopedLock<Mutex> lock(stateLock);
+ switch (state) {
+ case DELETING:
+ goto finishcallbacks;
+ default:
+ break;
+ }
+ }
+ Callback cb = callbacks.front();
+ assert(cb);
+ cb(*this);
+ callbacks.pop();
+ }
+ } catch (std::exception& e) {
+ // One of the callbacks threw an exception - that's not allowed
+ QPID_LOG(error, "Caught exception in state: " << state << " with event: " << type << ": " << e.what());
+ // It would be nice to clean up and delete ourselves here, but we can't
+ }
+
+finishcallbacks:
+ {
+ ScopedLock<Mutex> lock(stateLock);
+ switch (state) {
+ case IDLE:
+ assert(state!=IDLE);
+ return;
+ case STOPPING:
+ state = IDLE;
+ return;
+ case WAITING:
+ assert(state!=WAITING);
+ return;
+ case CALLING:
+ state = WAITING;
+ return;
+ case DELETING:
+ break;
+ }
+ }
+
+saybyebye:
+ delete this;
+}
+
+}}