summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-08-12 17:31:53 +0000
committerAlan Conway <aconway@apache.org>2008-08-12 17:31:53 +0000
commita1f525cb64b851a5177b7564d36c08f4d5c8898d (patch)
tree1b4aa57113579dd58cddb394d7194aa023b1f90d
parentc2c4435546670b6010205f82418caac7297a3e23 (diff)
downloadqpid-python-a1f525cb64b851a5177b7564d36c08f4d5c8898d.tar.gz
Move frame processing out of CPG dispatch queue for cluster.
PollableQueue is a pollable in-memory queue, will probably move it to sys. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@685237 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/cluster.mk5
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp59
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.h14
-rw-r--r--qpid/cpp/src/qpid/cluster/PollableCondition.cpp59
-rw-r--r--qpid/cpp/src/qpid/cluster/PollableCondition.h57
-rw-r--r--qpid/cpp/src/qpid/cluster/PollableQueue.h99
6 files changed, 273 insertions, 20 deletions
diff --git a/qpid/cpp/src/cluster.mk b/qpid/cpp/src/cluster.mk
index 718dffff38..aa3644785b 100644
--- a/qpid/cpp/src/cluster.mk
+++ b/qpid/cpp/src/cluster.mk
@@ -16,7 +16,10 @@ libqpidcluster_la_SOURCES = \
qpid/cluster/ConnectionInterceptor.cpp \
qpid/cluster/ClassifierHandler.h \
qpid/cluster/ClassifierHandler.cpp \
- qpid/cluster/ShadowConnectionOutputHandler.h
+ qpid/cluster/ShadowConnectionOutputHandler.h \
+ qpid/cluster/PollableCondition.h \
+ qpid/cluster/PollableCondition.cpp \
+ qpid/cluster/PollableQueue.h
libqpidcluster_la_LIBADD= -lcpg libqpidbroker.la
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index 3a0d11b5d1..37b126f5a9 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -66,7 +66,9 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
cpgDispatchHandle(cpg,
boost::bind(&Cluster::dispatch, this, _1), // read
0, // write
- boost::bind(&Cluster::disconnect, this, _1)) // disconnect
+ boost::bind(&Cluster::disconnect, this, _1) // disconnect
+ ),
+ deliverQueue(boost::bind(&Cluster::deliverFrames, this, _1, _2))
{
broker->addFinalizer(boost::bind(&Cluster::leave, this));
QPID_LOG(trace, "Joining cluster: " << name_);
@@ -80,10 +82,10 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
// Start dispatching from the poller.
cpgDispatchHandle.startWatch(poller);
+ deliverQueue.start(poller);
}
-Cluster::~Cluster() {
-}
+Cluster::~Cluster() {}
// local connection initializes plugins
void Cluster::initialize(broker::Connection& c) {
@@ -181,31 +183,50 @@ void Cluster::deliver(
Buffer buf(static_cast<char*>(msg), msg_len);
AMQFrame frame;
if (!frame.decode(buf)) // Not enough data.
- throw Exception("Received incomplete cluster event."); // FIXME aconway 2008-08-05: cluster error handling.
- ConnectionInterceptor* connection;
+ throw Exception("Received incomplete cluster event."); // FIXME aconway 2008-08-05: error handling.
+ void* connection;
decodePtr(buf, connection);
- QPID_LOG(trace, "DLVR [" << from << " " << connection << "] " << frame);
-
- if (!broker) {
- QPID_LOG(warning, "Unexpected DLVR, already left the cluster.");
- return;
- }
- if (connection && from != self) // Look up shadow for remote connections
- connection = getShadowConnection(from, connection);
-
- if (frame.getMethod() && frame.getMethod()->amqpClassId() == CLUSTER_CLASS_ID)
- handleMethod(from, connection, *frame.getMethod());
- else
- connection->deliver(frame);
+ deliverQueue.push(DeliveredFrame(frame, from, connection));
}
catch (const std::exception& e) {
// FIXME aconway 2008-01-30: exception handling.
- QPID_LOG(critical, "Error in cluster delivery: " << e.what());
+ QPID_LOG(critical, "Error in cluster deliver: " << e.what());
assert(0);
throw;
}
}
+void Cluster::deliverFrames(const PollableQueue<DeliveredFrame>::iterator& begin,
+ const PollableQueue<DeliveredFrame>::iterator& end)
+{
+ for (PollableQueue<DeliveredFrame>::iterator i = begin; i != end; ++i) {
+ AMQFrame& frame(i->frame);
+ Id from(i->from);
+ ConnectionInterceptor* connection = reinterpret_cast<ConnectionInterceptor*>(i->connection);
+ try {
+ QPID_LOG(trace, "DLVR [" << from << " " << connection << "] " << frame);
+
+ if (!broker) {
+ QPID_LOG(warning, "Unexpected DLVR, already left the cluster.");
+ return;
+ }
+ if (connection && from != self) // Look up shadow for remote connections
+ connection = getShadowConnection(from, connection);
+
+ if (frame.getMethod() && frame.getMethod()->amqpClassId() == CLUSTER_CLASS_ID)
+ handleMethod(from, connection, *frame.getMethod());
+ else
+ connection->deliver(frame);
+ }
+ catch (const std::exception& e) {
+ // FIXME aconway 2008-01-30: exception handling.
+ QPID_LOG(critical, "Error in cluster deliverFrame: " << e.what());
+ assert(0);
+ throw;
+ }
+ }
+}
+
// Handle cluster methods
// FIXME aconway 2008-07-11: Generate/template a better dispatch mechanism.
void Cluster::handleMethod(Id from, ConnectionInterceptor* connection, AMQMethodBody& method) {
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h
index 1018684e7e..6f5e6d9cfb 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.h
+++ b/qpid/cpp/src/qpid/cluster/Cluster.h
@@ -21,6 +21,7 @@
#include "qpid/cluster/Cpg.h"
#include "qpid/cluster/ShadowConnectionOutputHandler.h"
+#include "qpid/cluster/PollableQueue.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Connection.h"
@@ -96,10 +97,17 @@ class Cluster : private Cpg::Handler, public RefCounted
typedef std::map<Id, Member> MemberMap;
typedef std::map<ShadowConnectionId, ConnectionInterceptor*> ShadowConnectionMap;
+ struct DeliveredFrame {
+ framing::AMQFrame frame; Id from; void* connection;
+ DeliveredFrame(const framing::AMQFrame& f, const Id i, void* c)
+ : frame(f), from(i), connection(c) {}
+ };
+
boost::function<void()> shutdownNext;
void notify(); ///< Notify cluster of my details.
+ /** CPG deliver callback. */
void deliver(
cpg_handle_t /*handle*/,
struct cpg_name *group,
@@ -108,6 +116,7 @@ class Cluster : private Cpg::Handler, public RefCounted
void* /*msg*/,
int /*msg_len*/);
+ /** CPG config change callback */
void configChange(
cpg_handle_t /*handle*/,
struct cpg_name */*group*/,
@@ -116,6 +125,10 @@ class Cluster : private Cpg::Handler, public RefCounted
struct cpg_address */*joined*/, int /*nJoined*/
);
+ /** Callback to handle delivered frames from the deliverQueue. */
+ void deliverFrames(const PollableQueue<DeliveredFrame>::iterator& begin,
+ const PollableQueue<DeliveredFrame>::iterator& end);
+
void dispatch(sys::DispatchHandle&);
void disconnect(sys::DispatchHandle&);
@@ -134,6 +147,7 @@ class Cluster : private Cpg::Handler, public RefCounted
ShadowConnectionMap shadowConnectionMap;
ShadowConnectionOutputHandler shadowOut;
sys::DispatchHandle cpgDispatchHandle;
+ PollableQueue<DeliveredFrame> deliverQueue;
friend std::ostream& operator <<(std::ostream&, const Cluster&);
friend std::ostream& operator <<(std::ostream&, const MemberMap::value_type&);
diff --git a/qpid/cpp/src/qpid/cluster/PollableCondition.cpp b/qpid/cpp/src/qpid/cluster/PollableCondition.cpp
new file mode 100644
index 0000000000..1970420297
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/PollableCondition.cpp
@@ -0,0 +1,59 @@
+#ifndef QPID_SYS_LINUX_POLLABLECONDITION_CPP
+#define QPID_SYS_LINUX_POLLABLECONDITION_CPP
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+// Linux implementation of PollableCondition using the conditionfd(2) system call.
+
+// FIXME aconway 2008-08-11: this could be of more general interest,
+// move to common lib.
+//
+
+#include "qpid/sys/posix/PrivatePosix.h"
+#include "qpid/cluster/PollableCondition.h"
+#include "qpid/Exception.h"
+#include <sys/eventfd.h>
+
+namespace qpid {
+namespace cluster {
+
+PollableCondition::PollableCondition() : IOHandle(new sys::IOHandlePrivate) {
+ impl->fd = ::eventfd(0, 0);
+ if (impl->fd < 0) throw ErrnoException("conditionfd() failed");
+}
+
+bool PollableCondition::clear() {
+ char buf[8];
+ ssize_t n = ::read(impl->fd, buf, 8);
+ if (n != 8) throw ErrnoException("read failed on conditionfd");
+ return *reinterpret_cast<uint64_t*>(buf);
+}
+
+void PollableCondition::set() {
+ static const uint64_t value=1;
+ ssize_t n = ::write(impl->fd, reinterpret_cast<const void*>(&value), 8);
+ if (n != 8) throw ErrnoException("write failed on conditionfd");
+}
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_SYS_LINUX_POLLABLECONDITION_CPP*/
diff --git a/qpid/cpp/src/qpid/cluster/PollableCondition.h b/qpid/cpp/src/qpid/cluster/PollableCondition.h
new file mode 100644
index 0000000000..affcae580a
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/PollableCondition.h
@@ -0,0 +1,57 @@
+#ifndef QPID_SYS_POLLABLECONDITION_H
+#define QPID_SYS_POLLABLECONDITION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/IOHandle.h"
+
+// FIXME aconway 2008-08-11: this could be of more general interest,
+// move to sys namespace in common lib.
+//
+
+namespace qpid {
+namespace cluster {
+
+/**
+ * A pollable condition to integrate in-process conditions with IO
+ * conditions in a polling loop.
+ *
+ * Setting the condition makes it readable for a poller.
+ *
+ * Writable/disconnected conditions are undefined and should not be
+ * polled for.
+ */
+class PollableCondition : public sys::IOHandle {
+ public:
+ PollableCondition();
+
+ /** Set the condition, triggers readable in a poller. */
+ void set();
+
+ /** Get the current state of the condition, then clear it.
+ *@return The state of the condition before it was cleared.
+ */
+ bool clear();
+};
+}} // namespace qpid::cluster
+
+#endif /*!QPID_SYS_POLLABLECONDITION_H*/
diff --git a/qpid/cpp/src/qpid/cluster/PollableQueue.h b/qpid/cpp/src/qpid/cluster/PollableQueue.h
new file mode 100644
index 0000000000..0bba2ba790
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/PollableQueue.h
@@ -0,0 +1,99 @@
+#ifndef QPID_CLUSTER_POLLABLEQUEUE_H
+#define QPID_CLUSTER_POLLABLEQUEUE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/cluster/PollableCondition.h"
+#include "qpid/sys/Dispatcher.h"
+#include "qpid/sys/Mutex.h"
+#include <boost/function.hpp>
+#include <boost/bind.hpp>
+#include <deque>
+
+namespace qpid {
+
+namespace sys { class Poller; }
+
+namespace cluster {
+
+// FIXME aconway 2008-08-11: this could be of more general interest,
+// move to common lib.
+
+/**
+ * A queue that can be polled by sys::Poller. Any thread can push to
+ * the queue, on wakeup the poller thread processes all items on the
+ * queue by passing them to a callback in a batch.
+ */
+template <class T>
+class PollableQueue {
+ typedef std::deque<T> Queue;
+
+ public:
+ typedef typename Queue::iterator iterator;
+
+ /** Callback to process a range of items. */
+ typedef boost::function<void (const iterator&, const iterator&)> Callback;
+
+ /** When the queue is selected by the poller, values are passed to callback cb. */
+ explicit PollableQueue(const Callback& cb);
+
+ /** Push a value onto the queue. Thread safe */
+ void push(const T& t) { ScopedLock l(lock); queue.push_back(t); condition.set(); }
+
+ /** Start polling. */
+ void start(const boost::shared_ptr<sys::Poller>& poller) { handle.startWatch(poller); }
+
+ /** Stop polling. */
+ void stop() { handle.stopWatch(); }
+
+ private:
+ typedef sys::Mutex::ScopedLock ScopedLock;
+ typedef sys::Mutex::ScopedUnlock ScopedUnlock;
+
+ void dispatch(sys::DispatchHandle&);
+
+ sys::Mutex lock;
+ Callback callback;
+ PollableCondition condition;
+ sys::DispatchHandle handle;
+ Queue queue;
+ Queue batch;
+};
+
+template <class T> PollableQueue<T>::PollableQueue(const Callback& cb) // FIXME aconway 2008-08-12:
+ : callback(cb),
+ handle(condition, boost::bind(&PollableQueue<T>::dispatch, this, _1), 0, 0)
+{}
+
+template <class T> void PollableQueue<T>::dispatch(sys::DispatchHandle& h) {
+ ScopedLock l(lock); // Lock for concurrent push()
+ batch.clear();
+ batch.swap(queue);
+ condition.clear();
+ ScopedUnlock u(lock);
+ callback(batch.begin(), batch.end()); // Process the batch outside the lock.
+ h.rewatch();
+}
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_POLLABLEQUEUE_H*/