From a20e61cd40e6236a61bde1d6d0af165e6b0b12d1 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Tue, 12 Aug 2008 17:31:53 +0000 Subject: Move frame processing out of CPG dispatch queue for cluster. PollableQueue is a pollable in-memory queue, will probably move it to sys. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@685237 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/cluster.mk | 5 +- cpp/src/qpid/cluster/Cluster.cpp | 59 ++++++++++++------ cpp/src/qpid/cluster/Cluster.h | 14 +++++ cpp/src/qpid/cluster/PollableCondition.cpp | 59 ++++++++++++++++++ cpp/src/qpid/cluster/PollableCondition.h | 57 +++++++++++++++++ cpp/src/qpid/cluster/PollableQueue.h | 99 ++++++++++++++++++++++++++++++ 6 files changed, 273 insertions(+), 20 deletions(-) create mode 100644 cpp/src/qpid/cluster/PollableCondition.cpp create mode 100644 cpp/src/qpid/cluster/PollableCondition.h create mode 100644 cpp/src/qpid/cluster/PollableQueue.h diff --git a/cpp/src/cluster.mk b/cpp/src/cluster.mk index 718dffff38..aa3644785b 100644 --- a/cpp/src/cluster.mk +++ b/cpp/src/cluster.mk @@ -16,7 +16,10 @@ libqpidcluster_la_SOURCES = \ qpid/cluster/ConnectionInterceptor.cpp \ qpid/cluster/ClassifierHandler.h \ qpid/cluster/ClassifierHandler.cpp \ - qpid/cluster/ShadowConnectionOutputHandler.h + qpid/cluster/ShadowConnectionOutputHandler.h \ + qpid/cluster/PollableCondition.h \ + qpid/cluster/PollableCondition.cpp \ + qpid/cluster/PollableQueue.h libqpidcluster_la_LIBADD= -lcpg libqpidbroker.la diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 3a0d11b5d1..37b126f5a9 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -66,7 +66,9 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : cpgDispatchHandle(cpg, boost::bind(&Cluster::dispatch, this, _1), // read 0, // write - boost::bind(&Cluster::disconnect, this, _1)) // disconnect + boost::bind(&Cluster::disconnect, this, _1) // disconnect + ), + deliverQueue(boost::bind(&Cluster::deliverFrames, this, _1, _2)) { broker->addFinalizer(boost::bind(&Cluster::leave, this)); QPID_LOG(trace, "Joining cluster: " << name_); @@ -80,10 +82,10 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : // Start dispatching from the poller. cpgDispatchHandle.startWatch(poller); + deliverQueue.start(poller); } -Cluster::~Cluster() { -} +Cluster::~Cluster() {} // local connection initializes plugins void Cluster::initialize(broker::Connection& c) { @@ -181,31 +183,50 @@ void Cluster::deliver( Buffer buf(static_cast(msg), msg_len); AMQFrame frame; if (!frame.decode(buf)) // Not enough data. - throw Exception("Received incomplete cluster event."); // FIXME aconway 2008-08-05: cluster error handling. - ConnectionInterceptor* connection; + throw Exception("Received incomplete cluster event."); // FIXME aconway 2008-08-05: error handling. + void* connection; decodePtr(buf, connection); - QPID_LOG(trace, "DLVR [" << from << " " << connection << "] " << frame); - - if (!broker) { - QPID_LOG(warning, "Unexpected DLVR, already left the cluster."); - return; - } - if (connection && from != self) // Look up shadow for remote connections - connection = getShadowConnection(from, connection); - - if (frame.getMethod() && frame.getMethod()->amqpClassId() == CLUSTER_CLASS_ID) - handleMethod(from, connection, *frame.getMethod()); - else - connection->deliver(frame); + deliverQueue.push(DeliveredFrame(frame, from, connection)); } catch (const std::exception& e) { // FIXME aconway 2008-01-30: exception handling. - QPID_LOG(critical, "Error in cluster delivery: " << e.what()); + QPID_LOG(critical, "Error in cluster deliver: " << e.what()); assert(0); throw; } } +void Cluster::deliverFrames(const PollableQueue::iterator& begin, + const PollableQueue::iterator& end) +{ + for (PollableQueue::iterator i = begin; i != end; ++i) { + AMQFrame& frame(i->frame); + Id from(i->from); + ConnectionInterceptor* connection = reinterpret_cast(i->connection); + try { + QPID_LOG(trace, "DLVR [" << from << " " << connection << "] " << frame); + + if (!broker) { + QPID_LOG(warning, "Unexpected DLVR, already left the cluster."); + return; + } + if (connection && from != self) // Look up shadow for remote connections + connection = getShadowConnection(from, connection); + + if (frame.getMethod() && frame.getMethod()->amqpClassId() == CLUSTER_CLASS_ID) + handleMethod(from, connection, *frame.getMethod()); + else + connection->deliver(frame); + } + catch (const std::exception& e) { + // FIXME aconway 2008-01-30: exception handling. + QPID_LOG(critical, "Error in cluster deliverFrame: " << e.what()); + assert(0); + throw; + } + } +} + // Handle cluster methods // FIXME aconway 2008-07-11: Generate/template a better dispatch mechanism. void Cluster::handleMethod(Id from, ConnectionInterceptor* connection, AMQMethodBody& method) { diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index 1018684e7e..6f5e6d9cfb 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -21,6 +21,7 @@ #include "qpid/cluster/Cpg.h" #include "qpid/cluster/ShadowConnectionOutputHandler.h" +#include "qpid/cluster/PollableQueue.h" #include "qpid/broker/Broker.h" #include "qpid/broker/Connection.h" @@ -96,10 +97,17 @@ class Cluster : private Cpg::Handler, public RefCounted typedef std::map MemberMap; typedef std::map ShadowConnectionMap; + struct DeliveredFrame { + framing::AMQFrame frame; Id from; void* connection; + DeliveredFrame(const framing::AMQFrame& f, const Id i, void* c) + : frame(f), from(i), connection(c) {} + }; + boost::function shutdownNext; void notify(); ///< Notify cluster of my details. + /** CPG deliver callback. */ void deliver( cpg_handle_t /*handle*/, struct cpg_name *group, @@ -108,6 +116,7 @@ class Cluster : private Cpg::Handler, public RefCounted void* /*msg*/, int /*msg_len*/); + /** CPG config change callback */ void configChange( cpg_handle_t /*handle*/, struct cpg_name */*group*/, @@ -116,6 +125,10 @@ class Cluster : private Cpg::Handler, public RefCounted struct cpg_address */*joined*/, int /*nJoined*/ ); + /** Callback to handle delivered frames from the deliverQueue. */ + void deliverFrames(const PollableQueue::iterator& begin, + const PollableQueue::iterator& end); + void dispatch(sys::DispatchHandle&); void disconnect(sys::DispatchHandle&); @@ -134,6 +147,7 @@ class Cluster : private Cpg::Handler, public RefCounted ShadowConnectionMap shadowConnectionMap; ShadowConnectionOutputHandler shadowOut; sys::DispatchHandle cpgDispatchHandle; + PollableQueue deliverQueue; friend std::ostream& operator <<(std::ostream&, const Cluster&); friend std::ostream& operator <<(std::ostream&, const MemberMap::value_type&); diff --git a/cpp/src/qpid/cluster/PollableCondition.cpp b/cpp/src/qpid/cluster/PollableCondition.cpp new file mode 100644 index 0000000000..1970420297 --- /dev/null +++ b/cpp/src/qpid/cluster/PollableCondition.cpp @@ -0,0 +1,59 @@ +#ifndef QPID_SYS_LINUX_POLLABLECONDITION_CPP +#define QPID_SYS_LINUX_POLLABLECONDITION_CPP + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +// Linux implementation of PollableCondition using the conditionfd(2) system call. + +// FIXME aconway 2008-08-11: this could be of more general interest, +// move to common lib. +// + +#include "qpid/sys/posix/PrivatePosix.h" +#include "qpid/cluster/PollableCondition.h" +#include "qpid/Exception.h" +#include + +namespace qpid { +namespace cluster { + +PollableCondition::PollableCondition() : IOHandle(new sys::IOHandlePrivate) { + impl->fd = ::eventfd(0, 0); + if (impl->fd < 0) throw ErrnoException("conditionfd() failed"); +} + +bool PollableCondition::clear() { + char buf[8]; + ssize_t n = ::read(impl->fd, buf, 8); + if (n != 8) throw ErrnoException("read failed on conditionfd"); + return *reinterpret_cast(buf); +} + +void PollableCondition::set() { + static const uint64_t value=1; + ssize_t n = ::write(impl->fd, reinterpret_cast(&value), 8); + if (n != 8) throw ErrnoException("write failed on conditionfd"); +} + +}} // namespace qpid::cluster + +#endif /*!QPID_SYS_LINUX_POLLABLECONDITION_CPP*/ diff --git a/cpp/src/qpid/cluster/PollableCondition.h b/cpp/src/qpid/cluster/PollableCondition.h new file mode 100644 index 0000000000..affcae580a --- /dev/null +++ b/cpp/src/qpid/cluster/PollableCondition.h @@ -0,0 +1,57 @@ +#ifndef QPID_SYS_POLLABLECONDITION_H +#define QPID_SYS_POLLABLECONDITION_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/IOHandle.h" + +// FIXME aconway 2008-08-11: this could be of more general interest, +// move to sys namespace in common lib. +// + +namespace qpid { +namespace cluster { + +/** + * A pollable condition to integrate in-process conditions with IO + * conditions in a polling loop. + * + * Setting the condition makes it readable for a poller. + * + * Writable/disconnected conditions are undefined and should not be + * polled for. + */ +class PollableCondition : public sys::IOHandle { + public: + PollableCondition(); + + /** Set the condition, triggers readable in a poller. */ + void set(); + + /** Get the current state of the condition, then clear it. + *@return The state of the condition before it was cleared. + */ + bool clear(); +}; +}} // namespace qpid::cluster + +#endif /*!QPID_SYS_POLLABLECONDITION_H*/ diff --git a/cpp/src/qpid/cluster/PollableQueue.h b/cpp/src/qpid/cluster/PollableQueue.h new file mode 100644 index 0000000000..0bba2ba790 --- /dev/null +++ b/cpp/src/qpid/cluster/PollableQueue.h @@ -0,0 +1,99 @@ +#ifndef QPID_CLUSTER_POLLABLEQUEUE_H +#define QPID_CLUSTER_POLLABLEQUEUE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/cluster/PollableCondition.h" +#include "qpid/sys/Dispatcher.h" +#include "qpid/sys/Mutex.h" +#include +#include +#include + +namespace qpid { + +namespace sys { class Poller; } + +namespace cluster { + +// FIXME aconway 2008-08-11: this could be of more general interest, +// move to common lib. + +/** + * A queue that can be polled by sys::Poller. Any thread can push to + * the queue, on wakeup the poller thread processes all items on the + * queue by passing them to a callback in a batch. + */ +template +class PollableQueue { + typedef std::deque Queue; + + public: + typedef typename Queue::iterator iterator; + + /** Callback to process a range of items. */ + typedef boost::function Callback; + + /** When the queue is selected by the poller, values are passed to callback cb. */ + explicit PollableQueue(const Callback& cb); + + /** Push a value onto the queue. Thread safe */ + void push(const T& t) { ScopedLock l(lock); queue.push_back(t); condition.set(); } + + /** Start polling. */ + void start(const boost::shared_ptr& poller) { handle.startWatch(poller); } + + /** Stop polling. */ + void stop() { handle.stopWatch(); } + + private: + typedef sys::Mutex::ScopedLock ScopedLock; + typedef sys::Mutex::ScopedUnlock ScopedUnlock; + + void dispatch(sys::DispatchHandle&); + + sys::Mutex lock; + Callback callback; + PollableCondition condition; + sys::DispatchHandle handle; + Queue queue; + Queue batch; +}; + +template PollableQueue::PollableQueue(const Callback& cb) // FIXME aconway 2008-08-12: + : callback(cb), + handle(condition, boost::bind(&PollableQueue::dispatch, this, _1), 0, 0) +{} + +template void PollableQueue::dispatch(sys::DispatchHandle& h) { + ScopedLock l(lock); // Lock for concurrent push() + batch.clear(); + batch.swap(queue); + condition.clear(); + ScopedUnlock u(lock); + callback(batch.begin(), batch.end()); // Process the batch outside the lock. + h.rewatch(); +} + +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_POLLABLEQUEUE_H*/ -- cgit v1.2.1