From 6c24486bbc4ffcf8c70e7d0fbac846512c83b440 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Thu, 11 Sep 2008 13:26:10 +0000 Subject: Moved PollableCondition, PollableQueue and to sys. Fixed cluster shutdown issues. sys/PollableCondition: is a generic mechansim to poll for non-IO events in the Poller. sys/PollableQueue: is a thread-safe queue template that can be dispatched from the Poller when there are items on the queue. It uses PollableCondition. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@694243 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/Makefile.am | 3 + cpp/src/cluster.mk | 3 - cpp/src/qpid/broker/SessionState.cpp | 2 - cpp/src/qpid/client/SessionImpl.cpp | 2 +- cpp/src/qpid/cluster/Cluster.cpp | 66 ++++++++++------- cpp/src/qpid/cluster/Cluster.h | 19 +++-- cpp/src/qpid/cluster/ClusterPlugin.cpp | 9 +-- cpp/src/qpid/cluster/Connection.cpp | 16 ++++ cpp/src/qpid/cluster/Connection.h | 19 ++--- cpp/src/qpid/cluster/ConnectionCodec.cpp | 8 +- cpp/src/qpid/cluster/ConnectionCodec.h | 3 +- cpp/src/qpid/cluster/PollableCondition.cpp | 100 ------------------------- cpp/src/qpid/cluster/PollableCondition.h | 60 --------------- cpp/src/qpid/cluster/PollableQueue.h | 113 ----------------------------- cpp/src/qpid/sys/PollableCondition.cpp | 100 +++++++++++++++++++++++++ cpp/src/qpid/sys/PollableCondition.h | 60 +++++++++++++++ cpp/src/qpid/sys/PollableQueue.h | 113 +++++++++++++++++++++++++++++ cpp/src/tests/BrokerFixture.h | 3 +- cpp/src/tests/cluster_test.cpp | 73 +++++++++++++------ 19 files changed, 410 insertions(+), 362 deletions(-) delete mode 100644 cpp/src/qpid/cluster/PollableCondition.cpp delete mode 100644 cpp/src/qpid/cluster/PollableCondition.h delete mode 100644 cpp/src/qpid/cluster/PollableQueue.h create mode 100644 cpp/src/qpid/sys/PollableCondition.cpp create mode 100644 cpp/src/qpid/sys/PollableCondition.h create mode 100644 cpp/src/qpid/sys/PollableQueue.h (limited to 'cpp/src') diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 7c02516575..98dec2b12d 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -264,6 +264,9 @@ libqpidcommon_la_SOURCES = \ qpid/sys/AggregateOutput.cpp \ qpid/sys/AsynchIOHandler.cpp \ qpid/sys/Dispatcher.cpp \ + qpid/sys/PollableCondition.h \ + qpid/sys/PollableCondition.cpp \ + qpid/sys/PollableQueue.h \ qpid/sys/Runnable.cpp \ qpid/sys/SystemInfo.cpp \ qpid/sys/Shlib.cpp \ diff --git a/cpp/src/cluster.mk b/cpp/src/cluster.mk index bb9546f387..f02e5e1644 100644 --- a/cpp/src/cluster.mk +++ b/cpp/src/cluster.mk @@ -18,9 +18,6 @@ cluster_la_SOURCES = \ qpid/cluster/Connection.h \ qpid/cluster/Connection.cpp \ qpid/cluster/NoOpConnectionOutputHandler.h \ - qpid/cluster/PollableCondition.h \ - qpid/cluster/PollableCondition.cpp \ - qpid/cluster/PollableQueue.h \ qpid/cluster/WriteEstimate.h \ qpid/cluster/WriteEstimate.cpp \ qpid/cluster/OutputInterceptor.h \ diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index 913188845f..027f8a212d 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -70,8 +70,6 @@ SessionState::SessionState( } SessionState::~SessionState() { - // Remove ID from active session list. - broker.getSessionManager().forget(getId()); if (mgmtObject != 0) mgmtObject->resourceDestroy (); } diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp index b736d116e1..7b1cacb640 100644 --- a/cpp/src/qpid/client/SessionImpl.cpp +++ b/cpp/src/qpid/client/SessionImpl.cpp @@ -649,7 +649,7 @@ void SessionImpl::checkOpen() const //call with lock held. { check(); if (state != ATTACHED) { - throw NotAttachedException("Session isn't attached"); + throw NotAttachedException(QPID_MSG("Session " << getId() << " isn't attached")); } } diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index ce156e85e4..07ed4596e0 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -61,7 +61,7 @@ struct ClusterOperations : public AMQP_AllOperations::ClusterHandler { }; Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : - broker(&b), + broker(b), poller(b.getPoller()), cpg(*this), name(name_), @@ -74,15 +74,17 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : ), deliverQueue(EventQueue::forEach(boost::bind(&Cluster::deliverEvent, this, _1))) { - broker->addFinalizer(boost::bind(&Cluster::leave, this)); - QPID_LOG(notice, "Joining cluster: " << name.str() << " as " << self); + QPID_LOG(notice, "Cluster member " << self << " joining cluster " << name.str()); + broker.addFinalizer(boost::bind(&Cluster::shutdown, this)); cpg.join(name); deliverQueue.start(poller); cpgDispatchHandle.startWatch(poller); } -Cluster::~Cluster() {} +Cluster::~Cluster() { + QPID_LOG(debug, "~Cluster()"); +} void Cluster::insert(const boost::intrusive_ptr& c) { Mutex::ScopedLock l(lock); @@ -94,20 +96,13 @@ void Cluster::erase(ConnectionId id) { connections.erase(id); } +// FIXME aconway 2008-09-10: leave is currently not called, +// It should be called if we are shut down by a cluster admin command. +// Any other type of exit is caught in disconnect(). +// void Cluster::leave() { - Mutex::ScopedLock l(lock); - if (!broker) return; // Already left. - // Leave is called by from Broker destructor after the poller has - // been shut down. No dispatches can occur. - - QPID_LOG(notice, "Leaving cluster " << name.str()); + QPID_LOG(notice, "Cluster member " << self << " leaving cluster " << name.str()); cpg.leave(name); - // broker= is set to 0 when the final config-change is delivered. - while(broker) { - Mutex::ScopedUnlock u(lock); - cpg.dispatchAll(); - } - cpg.shutdown(); } template void decodePtr(Buffer& buf, T*& ptr) { @@ -177,6 +172,7 @@ void Cluster::deliver( { try { MemberId from(nodeid, pid); + QPID_LOG(debug, "Cluster::deliver from " << from << " to " << self); // FIXME aconway 2008-09-10: deliverQueue.push(Event::delivered(from, msg, msg_len)); } catch (const std::exception& e) { @@ -238,7 +234,7 @@ void Cluster::configChange( cpg_address *left, int nLeft, cpg_address *joined, int nJoined) { - QPID_LOG(notice, "Cluster of " << nCurrent << ": " << AddrList(current, nCurrent) << ".\n Changes: " + QPID_LOG(info, "Cluster of " << nCurrent << ": " << AddrList(current, nCurrent) << ".\n Changes: " << AddrList(joined, nJoined) << AddrList(left, nLeft)); if (nJoined) // Notfiy new members of my URL. @@ -246,13 +242,14 @@ void Cluster::configChange( AMQFrame(in_place(ProtocolVersion(), url.str())), ConnectionId(self,0)); - + if (find(left, left+nLeft, self) != left+nLeft) { + // We have left the group, this is the final config change. + QPID_LOG(notice, "Cluster member " << self << " left cluster " << name.str()); + broker.shutdown(); + } Mutex::ScopedLock l(lock); for (int i = 0; i < nLeft; ++i) urls.erase(left[i]); // Add new members when their URL notice arraives. - - if (find(left, left+nLeft, self) != left+nLeft) - broker = 0; // We have left the group, this is the final config change. lock.notifyAll(); // Threads waiting for membership changes. } @@ -261,22 +258,35 @@ void Cluster::dispatch(sys::DispatchHandle& h) { h.rewatch(); } -void Cluster::disconnect(sys::DispatchHandle& h) { - h.stopWatch(); - QPID_LOG(critical, "Disconnected from cluster, shutting down"); - broker->shutdown(); +void Cluster::disconnect(sys::DispatchHandle& ) { + // FIXME aconway 2008-09-11: this should be logged as critical, + // when we provide admin option to shut down cluster and let + // members leave cleanly. + QPID_LOG(notice, "Cluster member " << self << " disconnected from cluster " << name.str()); + broker.shutdown(); } void Cluster::joining(const MemberId& m, const string& url) { - QPID_LOG(notice, "Cluster member " << m << " has URL " << url); + QPID_LOG(info, "Cluster member " << m << " has URL " << url); urls.insert(UrlMap::value_type(m,Url(url))); } void Cluster::ready(const MemberId& ) { // FIXME aconway 2008-09-08: TODO } - -}} // namespace qpid::cluster +// Called from Broker::~Broker when broker is shut down. At this +// point we know the poller has stopped so no poller callbacks will be +// invoked. We must ensure that CPG has also shut down so no CPG +// callbacks will be invoked. +// +void Cluster::shutdown() { + QPID_LOG(notice, "Cluster member " << self << " shutting down."); + try { cpg.shutdown(); } + catch (const std::exception& e) { QPID_LOG(error, "During CPG shutdown: " << e.what()); } + delete this; +} +broker::Broker& Cluster::getBroker(){ return broker; } +}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index a25b62ea12..3a254684ad 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -21,7 +21,7 @@ #include "qpid/cluster/Cpg.h" #include "qpid/cluster/Event.h" -#include "qpid/cluster/PollableQueue.h" +#include "qpid/sys/PollableQueue.h" #include "qpid/cluster/NoOpConnectionOutputHandler.h" #include "qpid/broker/Broker.h" @@ -43,7 +43,7 @@ class Connection; * Connection to the cluster. * Keeps cluster membership data. */ -class Cluster : public RefCounted, private Cpg::Handler +class Cluster : private Cpg::Handler { public: @@ -78,17 +78,16 @@ class Cluster : public RefCounted, private Cpg::Handler void joining(const MemberId&, const std::string& url); void ready(const MemberId&); - broker::Broker& getBroker() { assert(broker); return *broker; } - MemberId getSelf() const { return self; } + void shutdown(); + + broker::Broker& getBroker(); + private: typedef std::map UrlMap; typedef std::map > ConnectionMap; - - /** Message sent over the cluster. */ - typedef std::pair Message; - typedef PollableQueue EventQueue; + typedef sys::PollableQueue EventQueue; boost::function shutdownNext; @@ -127,7 +126,7 @@ class Cluster : public RefCounted, private Cpg::Handler boost::intrusive_ptr getConnection(const ConnectionId&); mutable sys::Monitor lock; // Protect access to members. - broker::Broker* broker; + broker::Broker& broker; boost::shared_ptr poller; Cpg cpg; Cpg::Name name; @@ -137,7 +136,7 @@ class Cluster : public RefCounted, private Cpg::Handler ConnectionMap connections; NoOpConnectionOutputHandler shadowOut; sys::DispatchHandle cpgDispatchHandle; - PollableQueue deliverQueue; + EventQueue deliverQueue; friend std::ostream& operator <<(std::ostream&, const Cluster&); friend std::ostream& operator <<(std::ostream&, const UrlMap::value_type&); diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp index 31447f2fd0..f4128634a6 100644 --- a/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -66,10 +66,10 @@ struct ClusterPlugin : public Plugin { ClusterValues values; ClusterOptions options; - boost::intrusive_ptr cluster; + Cluster* cluster; boost::scoped_ptr factory; - ClusterPlugin() : options(values) {} + ClusterPlugin() : options(values), cluster(0) {} Options* getOptions() { return &options; } @@ -78,20 +78,17 @@ struct ClusterPlugin : public Plugin { if (!broker || values.name.empty()) return; // Only if --cluster-name option was specified. QPID_LOG_IF(warning, cluster, "Ignoring multiple initialization of cluster plugin."); cluster = new Cluster(values.name, values.getUrl(broker->getPort()), *broker); - broker->addFinalizer(boost::bind(&ClusterPlugin::shutdown, this)); broker->setConnectionFactory( boost::shared_ptr( new ConnectionCodec::Factory(broker->getConnectionFactory(), *cluster))); } void earlyInitialize(Plugin::Target&) {} - - void shutdown() { cluster = 0; } }; static ClusterPlugin instance; // Static initialization. // For test purposes. -boost::intrusive_ptr getGlobalCluster() { return instance.cluster; } +Cluster& getGlobalCluster() { assert(instance.cluster); return *instance.cluster; } }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 506e982ffd..68d1b16dfa 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -106,5 +106,21 @@ void Connection::deliverBuffer(Buffer& buf) { deliver(decoder.frame); // FIXME aconway 2008-09-01: Queue frames for delivery in separate thread. } + +void Connection::sessionState(const SequenceNumber& /*replayStart*/, + const SequenceSet& /*sentIncomplete*/, + const SequenceNumber& /*expected*/, + const SequenceNumber& /*received*/, + const SequenceSet& /*unknownCompleted*/, + const SequenceSet& /*receivedIncomplete*/) +{ + // FIXME aconway 2008-09-10: TODO +} + +void Connection::shadowReady(uint64_t /*memberId*/, uint64_t /*connectionId*/) +{ + // FIXME aconway 2008-09-10: TODO +} + }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index b3e151ce51..a30350585f 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -40,9 +40,7 @@ namespace framing { class AMQFrame; } namespace cluster { -/** - * Plug-in associated with broker::Connections, both local and shadow. - */ +/** Intercept broker::Connection calls for shadow and local cluster connections. */ class Connection : public RefCounted, public sys::ConnectionInputHandler, @@ -90,16 +88,13 @@ class Connection : sys::ConnectionInputHandler* create(sys::ConnectionOutputHandler* out, const std::string& id, bool isClient); // State dump methods. - virtual void sessionState(const framing::SequenceNumber& /*replayId*/, - const framing::SequenceNumber& /*sendId*/, - const framing::SequenceSet& /*sentIncomplete*/, - const framing::SequenceNumber& /*expectedId*/, - const framing::SequenceNumber& /*receivedId*/, - const framing::SequenceSet& /*unknownCompleted*/, - const framing::SequenceSet& /*receivedIncomplete*/) {} + virtual void sessionState(const SequenceNumber& replayStart, + const SequenceSet& sentIncomplete, + const SequenceNumber& expected, + const SequenceNumber& received, + const SequenceSet& unknownCompleted, const SequenceSet& receivedIncomplete); - virtual void shadowReady(uint64_t /*clusterId*/, - const std::string& /*userId*/) {} + virtual void shadowReady(uint64_t memberId, uint64_t connectionId); private: void sendDoOutput(); diff --git a/cpp/src/qpid/cluster/ConnectionCodec.cpp b/cpp/src/qpid/cluster/ConnectionCodec.cpp index f093a0cc1c..6179eab724 100644 --- a/cpp/src/qpid/cluster/ConnectionCodec.cpp +++ b/cpp/src/qpid/cluster/ConnectionCodec.cpp @@ -30,16 +30,16 @@ namespace cluster { sys::ConnectionCodec* ConnectionCodec::Factory::create(framing::ProtocolVersion v, sys::OutputControl& out, const std::string& id) { - if (v == framing::ProtocolVersion(0, 10)) + if (v == framing::ProtocolVersion(0, 10)) return new ConnectionCodec(out, id, cluster); return 0; } +// FIXME aconway 2008-08-27: outbound connections need to be made +// with proper qpid::client code for failover, get rid of this +// broker-side hack. sys::ConnectionCodec* ConnectionCodec::Factory::create(sys::OutputControl& out, const std::string& id) { - // FIXME aconway 2008-08-27: outbound connections need to be made - // with proper qpid::client code for failover, get rid of this - // broker-side hack. return next->create(out, id); } diff --git a/cpp/src/qpid/cluster/ConnectionCodec.h b/cpp/src/qpid/cluster/ConnectionCodec.h index 59ce20d821..22d752d174 100644 --- a/cpp/src/qpid/cluster/ConnectionCodec.h +++ b/cpp/src/qpid/cluster/ConnectionCodec.h @@ -50,7 +50,8 @@ class ConnectionCodec : public sys::ConnectionCodec { struct Factory : public sys::ConnectionCodec::Factory { boost::shared_ptr next; Cluster& cluster; - Factory(boost::shared_ptr f, Cluster& c) : next(f), cluster(c) {} + Factory(boost::shared_ptr f, Cluster& c) + : next(f), cluster(c) {} sys::ConnectionCodec* create(framing::ProtocolVersion, sys::OutputControl&, const std::string& id); sys::ConnectionCodec* create(sys::OutputControl&, const std::string& id); }; diff --git a/cpp/src/qpid/cluster/PollableCondition.cpp b/cpp/src/qpid/cluster/PollableCondition.cpp deleted file mode 100644 index eecf95ff8d..0000000000 --- a/cpp/src/qpid/cluster/PollableCondition.cpp +++ /dev/null @@ -1,100 +0,0 @@ -#ifndef QPID_SYS_LINUX_POLLABLECONDITION_CPP -#define QPID_SYS_LINUX_POLLABLECONDITION_CPP - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -// FIXME aconway 2008-08-11: this could be of more general interest, -// move to common lib. -// - -#include "qpid/sys/posix/PrivatePosix.h" -#include "qpid/cluster/PollableCondition.h" -#include "qpid/Exception.h" - -#include -#include - -namespace qpid { -namespace cluster { - -PollableCondition::PollableCondition() : IOHandle(new sys::IOHandlePrivate) { - int fds[2]; - if (::pipe(fds) == -1) - throw ErrnoException(QPID_MSG("Can't create PollableCondition")); - impl->fd = fds[0]; - writeFd = fds[1]; - if (::fcntl(impl->fd, F_SETFL, O_NONBLOCK) == -1) - throw ErrnoException(QPID_MSG("Can't create PollableCondition")); - if (::fcntl(writeFd, F_SETFL, O_NONBLOCK) == -1) - throw ErrnoException(QPID_MSG("Can't create PollableCondition")); -} - -bool PollableCondition::clear() { - char buf[256]; - ssize_t n; - bool wasSet = false; - while ((n = ::read(impl->fd, buf, sizeof(buf))) > 0) - wasSet = true; - if (n == -1 && errno != EAGAIN) throw ErrnoException(QPID_MSG("Error clearing PollableCondition")); - return wasSet; -} - -void PollableCondition::set() { - static const char dummy=0; - ssize_t n = ::write(writeFd, &dummy, 1); - if (n == -1 && errno != EAGAIN) throw ErrnoException("Error setting PollableCondition"); -} - - -#if 0 -// FIXME aconway 2008-08-12: More efficient Linux implementation using -// eventfd system call. Do a configure.ac test to enable this when -// eventfd is available. - -#include - -namespace qpid { -namespace cluster { - -PollableCondition::PollableCondition() : IOHandle(new sys::IOHandlePrivate) { - impl->fd = ::eventfd(0, 0); - if (impl->fd < 0) throw ErrnoException("conditionfd() failed"); -} - -bool PollableCondition::clear() { - char buf[8]; - ssize_t n = ::read(impl->fd, buf, 8); - if (n != 8) throw ErrnoException("read failed on conditionfd"); - return *reinterpret_cast(buf); -} - -void PollableCondition::set() { - static const uint64_t value=1; - ssize_t n = ::write(impl->fd, reinterpret_cast(&value), 8); - if (n != 8) throw ErrnoException("write failed on conditionfd"); -} - -#endif - -}} // namespace qpid::cluster - -#endif /*!QPID_SYS_LINUX_POLLABLECONDITION_CPP*/ diff --git a/cpp/src/qpid/cluster/PollableCondition.h b/cpp/src/qpid/cluster/PollableCondition.h deleted file mode 100644 index 6bfca6cabe..0000000000 --- a/cpp/src/qpid/cluster/PollableCondition.h +++ /dev/null @@ -1,60 +0,0 @@ -#ifndef QPID_SYS_POLLABLECONDITION_H -#define QPID_SYS_POLLABLECONDITION_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/sys/IOHandle.h" - -// FIXME aconway 2008-08-11: this could be of more general interest, -// move to sys namespace in common lib. -// - -namespace qpid { -namespace cluster { - -/** - * A pollable condition to integrate in-process conditions with IO - * conditions in a polling loop. - * - * Setting the condition makes it readable for a poller. - * - * Writable/disconnected conditions are undefined and should not be - * polled for. - */ -class PollableCondition : public sys::IOHandle { - public: - PollableCondition(); - - /** Set the condition, triggers readable in a poller. */ - void set(); - - /** Get the current state of the condition, then clear it. - *@return The state of the condition before it was cleared. - */ - bool clear(); - - private: - int writeFd; -}; -}} // namespace qpid::cluster - -#endif /*!QPID_SYS_POLLABLECONDITION_H*/ diff --git a/cpp/src/qpid/cluster/PollableQueue.h b/cpp/src/qpid/cluster/PollableQueue.h deleted file mode 100644 index 1c7720f5c6..0000000000 --- a/cpp/src/qpid/cluster/PollableQueue.h +++ /dev/null @@ -1,113 +0,0 @@ -#ifndef QPID_CLUSTER_POLLABLEQUEUE_H -#define QPID_CLUSTER_POLLABLEQUEUE_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/cluster/PollableCondition.h" -#include "qpid/sys/Dispatcher.h" -#include "qpid/sys/Mutex.h" -#include -#include -#include -#include - -namespace qpid { - -namespace sys { class Poller; } - -namespace cluster { - -// FIXME aconway 2008-08-11: this could be of more general interest, -// move to common lib. - -/** - * A queue that can be polled by sys::Poller. Any thread can push to - * the queue, on wakeup the poller thread processes all items on the - * queue by passing them to a callback in a batch. - */ -template -class PollableQueue { - typedef std::deque Queue; - - public: - typedef typename Queue::iterator iterator; - - /** Callback to process a range of items. */ - typedef boost::function Callback; - - /** Functor tempalate to create a Callback from a functor that handles a single item. */ - template struct ForEach { - F handleOne; - ForEach(const F& f) : handleOne(f) {} - void operator()(const iterator& i, const iterator& j) const { std::for_each(i, j, handleOne); } - }; - /** Function to create ForEach instances */ - template static ForEach forEach(const F& f) { return ForEach(f); } - - /** When the queue is selected by the poller, values are passed to callback cb. */ - explicit PollableQueue(const Callback& cb); - - /** Push a value onto the queue. Thread safe */ - void push(const T& t) { ScopedLock l(lock); queue.push_back(t); condition.set(); } - - /** Start polling. */ - void start(const boost::shared_ptr& poller) { handle.startWatch(poller); } - - /** Stop polling. */ - void stop() { handle.stopWatch(); } - - private: - typedef sys::Mutex::ScopedLock ScopedLock; - typedef sys::Mutex::ScopedUnlock ScopedUnlock; - - void dispatch(sys::DispatchHandle&); - - sys::Mutex lock; - Callback callback; - PollableCondition condition; - sys::DispatchHandle handle; - Queue queue; - Queue batch; -}; - -template PollableQueue::PollableQueue(const Callback& cb) // FIXME aconway 2008-08-12: - : callback(cb), - handle(condition, boost::bind(&PollableQueue::dispatch, this, _1), 0, 0) -{} - -template void PollableQueue::dispatch(sys::DispatchHandle& h) { - ScopedLock l(lock); // Lock for concurrent push() - batch.clear(); - batch.swap(queue); - condition.clear(); - { - // Process outside the lock to allow concurrent push. - ScopedUnlock u(lock); - callback(batch.begin(), batch.end()); - h.rewatch(); - } - batch.clear(); -} - -}} // namespace qpid::cluster - -#endif /*!QPID_CLUSTER_POLLABLEQUEUE_H*/ diff --git a/cpp/src/qpid/sys/PollableCondition.cpp b/cpp/src/qpid/sys/PollableCondition.cpp new file mode 100644 index 0000000000..5a3bd583cf --- /dev/null +++ b/cpp/src/qpid/sys/PollableCondition.cpp @@ -0,0 +1,100 @@ +#ifndef QPID_SYS_LINUX_POLLABLECONDITION_CPP +#define QPID_SYS_LINUX_POLLABLECONDITION_CPP + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +// FIXME aconway 2008-08-11: this could be of more general interest, +// move to common lib. +// + +#include "qpid/sys/posix/PrivatePosix.h" +#include "qpid/sys/PollableCondition.h" +#include "qpid/Exception.h" + +#include +#include + +namespace qpid { +namespace sys { + +PollableCondition::PollableCondition() : IOHandle(new sys::IOHandlePrivate) { + int fds[2]; + if (::pipe(fds) == -1) + throw ErrnoException(QPID_MSG("Can't create PollableCondition")); + impl->fd = fds[0]; + writeFd = fds[1]; + if (::fcntl(impl->fd, F_SETFL, O_NONBLOCK) == -1) + throw ErrnoException(QPID_MSG("Can't create PollableCondition")); + if (::fcntl(writeFd, F_SETFL, O_NONBLOCK) == -1) + throw ErrnoException(QPID_MSG("Can't create PollableCondition")); +} + +bool PollableCondition::clear() { + char buf[256]; + ssize_t n; + bool wasSet = false; + while ((n = ::read(impl->fd, buf, sizeof(buf))) > 0) + wasSet = true; + if (n == -1 && errno != EAGAIN) throw ErrnoException(QPID_MSG("Error clearing PollableCondition")); + return wasSet; +} + +void PollableCondition::set() { + static const char dummy=0; + ssize_t n = ::write(writeFd, &dummy, 1); + if (n == -1 && errno != EAGAIN) throw ErrnoException("Error setting PollableCondition"); +} + + +#if 0 +// FIXME aconway 2008-08-12: More efficient Linux implementation using +// eventfd system call. Move to separate file & do configure.ac test +// to enable this when ::eventfd() is available. + +#include + +namespace qpid { +namespace sys { + +PollableCondition::PollableCondition() : IOHandle(new sys::IOHandlePrivate) { + impl->fd = ::eventfd(0, 0); + if (impl->fd < 0) throw ErrnoException("conditionfd() failed"); +} + +bool PollableCondition::clear() { + char buf[8]; + ssize_t n = ::read(impl->fd, buf, 8); + if (n != 8) throw ErrnoException("read failed on conditionfd"); + return *reinterpret_cast(buf); +} + +void PollableCondition::set() { + static const uint64_t value=1; + ssize_t n = ::write(impl->fd, reinterpret_cast(&value), 8); + if (n != 8) throw ErrnoException("write failed on conditionfd"); +} + +#endif + +}} // namespace qpid::sys + +#endif /*!QPID_SYS_LINUX_POLLABLECONDITION_CPP*/ diff --git a/cpp/src/qpid/sys/PollableCondition.h b/cpp/src/qpid/sys/PollableCondition.h new file mode 100644 index 0000000000..6f0e12a474 --- /dev/null +++ b/cpp/src/qpid/sys/PollableCondition.h @@ -0,0 +1,60 @@ +#ifndef QPID_SYS_POLLABLECONDITION_H +#define QPID_SYS_POLLABLECONDITION_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/IOHandle.h" + +// FIXME aconway 2008-08-11: this could be of more general interest, +// move to sys namespace in common lib. +// + +namespace qpid { +namespace sys { + +/** + * A pollable condition to integrate in-process conditions with IO + * conditions in a polling loop. + * + * Setting the condition makes it readable for a poller. + * + * Writable/disconnected conditions are undefined and should not be + * polled for. + */ +class PollableCondition : public sys::IOHandle { + public: + PollableCondition(); + + /** Set the condition, triggers readable in a poller. */ + void set(); + + /** Get the current state of the condition, then clear it. + *@return The state of the condition before it was cleared. + */ + bool clear(); + + private: + int writeFd; +}; +}} // namespace qpid::sys + +#endif /*!QPID_SYS_POLLABLECONDITION_H*/ diff --git a/cpp/src/qpid/sys/PollableQueue.h b/cpp/src/qpid/sys/PollableQueue.h new file mode 100644 index 0000000000..2e5d3a0d3d --- /dev/null +++ b/cpp/src/qpid/sys/PollableQueue.h @@ -0,0 +1,113 @@ +#ifndef QPID_SYS_POLLABLEQUEUE_H +#define QPID_SYS_POLLABLEQUEUE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/PollableCondition.h" +#include "qpid/sys/Dispatcher.h" +#include "qpid/sys/Mutex.h" +#include +#include +#include +#include + +namespace qpid { + +namespace sys { class Poller; } + +namespace sys { + +// FIXME aconway 2008-08-11: this could be of more general interest, +// move to common lib. + +/** + * A queue that can be polled by sys::Poller. Any thread can push to + * the queue, on wakeup the poller thread processes all items on the + * queue by passing them to a callback in a batch. + */ +template +class PollableQueue { + typedef std::deque Queue; + + public: + typedef typename Queue::iterator iterator; + + /** Callback to process a range of items. */ + typedef boost::function Callback; + + /** Functor tempalate to create a Callback from a functor that handles a single item. */ + template struct ForEach { + F handleOne; + ForEach(const F& f) : handleOne(f) {} + void operator()(const iterator& i, const iterator& j) const { std::for_each(i, j, handleOne); } + }; + /** Function to create ForEach instances */ + template static ForEach forEach(const F& f) { return ForEach(f); } + + /** When the queue is selected by the poller, values are passed to callback cb. */ + explicit PollableQueue(const Callback& cb); + + /** Push a value onto the queue. Thread safe */ + void push(const T& t) { ScopedLock l(lock); queue.push_back(t); condition.set(); } + + /** Start polling. */ + void start(const boost::shared_ptr& poller) { handle.startWatch(poller); } + + /** Stop polling. */ + void stop() { handle.stopWatch(); } + + private: + typedef sys::Mutex::ScopedLock ScopedLock; + typedef sys::Mutex::ScopedUnlock ScopedUnlock; + + void dispatch(sys::DispatchHandle&); + + sys::Mutex lock; + Callback callback; + PollableCondition condition; + sys::DispatchHandle handle; + Queue queue; + Queue batch; +}; + +template PollableQueue::PollableQueue(const Callback& cb) // FIXME aconway 2008-08-12: + : callback(cb), + handle(condition, boost::bind(&PollableQueue::dispatch, this, _1), 0, 0) +{} + +template void PollableQueue::dispatch(sys::DispatchHandle& h) { + ScopedLock l(lock); // Lock for concurrent push() + batch.clear(); + batch.swap(queue); + condition.clear(); + { + // Process outside the lock to allow concurrent push. + ScopedUnlock u(lock); + callback(batch.begin(), batch.end()); + h.rewatch(); + } + batch.clear(); +} + +}} // namespace qpid::sys + +#endif /*!QPID_SYS_POLLABLEQUEUE_H*/ diff --git a/cpp/src/tests/BrokerFixture.h b/cpp/src/tests/BrokerFixture.h index 09cca066ef..4e10f82809 100644 --- a/cpp/src/tests/BrokerFixture.h +++ b/cpp/src/tests/BrokerFixture.h @@ -92,7 +92,8 @@ struct ClientT { SessionType session; qpid::client::SubscriptionManager subs; qpid::client::LocalQueue lq; - ClientT(uint16_t port) : connection(port), session(connection.newSession()), subs(session) {} + ClientT(uint16_t port, const std::string& name=std::string()) + : connection(port), session(connection.newSession(name)), subs(session) {} ~ClientT() { connection.close(); } }; diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp index d082d74367..871aa0c657 100644 --- a/cpp/src/tests/cluster_test.cpp +++ b/cpp/src/tests/cluster_test.cpp @@ -21,13 +21,14 @@ #include "ForkedBroker.h" #include "BrokerFixture.h" -#include "qpid/cluster/Cpg.h" +#include "qpid/client/Connection.h" +#include "qpid/client/Session.h" #include "qpid/cluster/Cluster.h" +#include "qpid/cluster/Cpg.h" #include "qpid/cluster/DumpClient.h" #include "qpid/framing/AMQBody.h" -#include "qpid/client/Connection.h" -#include "qpid/client/Session.h" #include "qpid/framing/Uuid.h" +#include "qpid/framing/reply_exceptions.h" #include "qpid/log/Logger.h" #include @@ -41,7 +42,7 @@ namespace qpid { namespace cluster { -boost::intrusive_ptr getGlobalCluster(); // Defined in qpid/cluster/ClusterPlugin.cpp +Cluster& getGlobalCluster(); // Defined in qpid/cluster/ClusterPlugin.cpp }} // namespace qpid::cluster @@ -81,11 +82,11 @@ ClusterFixture::ClusterFixture(size_t n) : name(Uuid(true).str()) { add(n); // Wait for all n members to join the cluster int retry=20; // TODO aconway 2008-07-16: nasty sleeps, clean this up. - while (retry && getGlobalCluster()->size() != n) { + while (retry && getGlobalCluster().size() != n) { ::sleep(1); --retry; } - BOOST_REQUIRE_EQUAL(n, getGlobalCluster()->size()); + BOOST_REQUIRE_EQUAL(n, getGlobalCluster().size()); } void ClusterFixture::add() { @@ -135,7 +136,37 @@ ostream& operator<<(ostream& o, const pair& array) { return o; } -QPID_AUTO_TEST_CASE(testDumpClient) { +#if 0 // FIXME aconway 2008-09-10: finish & enable +QPID_AUTO_TEST_CASE(testDumpConsumers) { + ClusterFixture cluster(1); + Client a(cluster[0]); + a.session.queueDeclare("q"); + a.subs.subscribe(a.lq, "q"); + + cluster.add(); + Client b(cluster[1]); + try { + b.connection.newSession(a.session.getId().getName()); + BOOST_FAIL("Expected SessionBusyException for " << a.session.getId().getName()); + } catch (const SessionBusyException&) {} + + // Transfer some messages to the subscription by client a. + Message m; + a.session.messageTransfer(arg::bindingKey="q", arg::content=Message("aaa", "q")); + BOOST_CHECK(a.lq.get(m, TIME_SEC)); + BOOST_CHECK_EQUAL(m.getData(), "aaa"); + + b.session.messageTransfer(arg::bindingKey="q", arg::content=Message("bbb", "q")); + BOOST_CHECK(a.lq.get(m, TIME_SEC)); + BOOST_CHECK_EQUAL(m.getData(), "bbb"); + + // Verify that the queue has been drained on both brokers. + // This proves that the consumer was replicated when the second broker joined. + BOOST_CHECK_EQUAL(a.session.queueQuery("q").getMessageCount(), 0); +} +#endif + +QPID_AUTO_TEST_CASE(testDumpClientSharedState) { BrokerFixture donor, receiver; { Client c(donor.getPort()); @@ -146,13 +177,13 @@ QPID_AUTO_TEST_CASE(testDumpClient) { c.session.exchangeDeclare(arg::exchange="exd", arg::type="direct", arg::arguments=args); c.session.exchangeBind(arg::exchange="exd", arg::queue="qa", arg::bindingKey="foo"); - c.session.messageTransfer(arg::destination="exd", arg::content=TransferContent("one", "foo")); + c.session.messageTransfer(arg::destination="exd", arg::content=Message("one", "foo")); c.session.exchangeDeclare("ext", arg::type="topic"); c.session.exchangeBind(arg::exchange="ext", arg::queue="qb", arg::bindingKey="bar"); c.subs.subscribe(c.lq, "qa", FlowControl::messageCredit(0)); - c.session.messageTransfer(arg::destination="ext", arg::content=TransferContent("one", "bar")); - c.session.messageTransfer(arg::destination="ext", arg::content=TransferContent("two", "bar")); + c.session.messageTransfer(arg::destination="ext", arg::content=Message("one", "bar")); + c.session.messageTransfer(arg::destination="ext", arg::content=Message("two", "bar")); c.session.close(); c.connection.close(); @@ -202,11 +233,11 @@ QPID_AUTO_TEST_CASE(testDumpClient) { BOOST_CHECK_EQUAL(m.getDeliveryProperties().getRoutingKey(), "bar"); // Verify bindings - r.session.messageTransfer(arg::destination="exd", arg::content=TransferContent("xxx", "foo")); + r.session.messageTransfer(arg::destination="exd", arg::content=Message("xxx", "foo")); BOOST_CHECK(r.subs.get(m, "qa")); BOOST_CHECK_EQUAL(m.getData(), "xxx"); - r.session.messageTransfer(arg::destination="ext", arg::content=TransferContent("yyy", "bar")); + r.session.messageTransfer(arg::destination="ext", arg::content=Message("yyy", "bar")); BOOST_CHECK(r.subs.get(m, "qb")); BOOST_CHECK_EQUAL(m.getData(), "yyy"); @@ -254,8 +285,8 @@ QPID_AUTO_TEST_CASE(testMessageEnqueue) { ClusterFixture cluster(2); Client c0(cluster[0]); c0.session.queueDeclare("q"); - c0.session.messageTransfer(arg::content=TransferContent("foo", "q")); - c0.session.messageTransfer(arg::content=TransferContent("bar", "q")); + c0.session.messageTransfer(arg::content=Message("foo", "q")); + c0.session.messageTransfer(arg::content=Message("bar", "q")); c0.session.close(); Client c1(cluster[1]); Message msg; @@ -268,19 +299,19 @@ QPID_AUTO_TEST_CASE(testMessageEnqueue) { QPID_AUTO_TEST_CASE(testMessageDequeue) { // Enqueue on one broker, dequeue on two others. ClusterFixture cluster (3); - Client c0(cluster[0]); + Client c0(cluster[0], "c0"); c0.session.queueDeclare("q"); - c0.session.messageTransfer(arg::content=TransferContent("foo", "q")); - c0.session.messageTransfer(arg::content=TransferContent("bar", "q")); + c0.session.messageTransfer(arg::content=Message("foo", "q")); + c0.session.messageTransfer(arg::content=Message("bar", "q")); Message msg; // Dequeue on 2 others, ensure correct order. - Client c1(cluster[1]); + Client c1(cluster[1], "c1"); BOOST_CHECK(c1.subs.get(msg, "q")); BOOST_CHECK_EQUAL("foo", msg.getData()); - Client c2(cluster[2]); + Client c2(cluster[2], "c2"); BOOST_CHECK(c1.subs.get(msg, "q")); BOOST_CHECK_EQUAL("bar", msg.getData()); @@ -298,8 +329,8 @@ QPID_AUTO_TEST_CASE(testDequeueWaitingSubscription) { c0.subs.subscribe(c0.lq, "q", FlowControl::messageCredit(2)); // Now send messages Client c1(cluster[1]); - c1.session.messageTransfer(arg::content=TransferContent("foo", "q")); - c1.session.messageTransfer(arg::content=TransferContent("bar", "q")); + c1.session.messageTransfer(arg::content=Message("foo", "q")); + c1.session.messageTransfer(arg::content=Message("bar", "q")); // Check they arrived Message m; -- cgit v1.2.1