diff options
author | Alan Conway <aconway@apache.org> | 2009-02-09 22:25:26 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2009-02-09 22:25:26 +0000 |
commit | 3a60db0672b78a75c52f39f5cefeeb00d3eeba97 (patch) | |
tree | 3f9c211e3649a3ef8a883e95d741387cf402dd17 /cpp/src/qpid | |
parent | c9a654925355a4dd128d5111af862e8be89e0a45 (diff) | |
download | qpid-python-3a60db0672b78a75c52f39f5cefeeb00d3eeba97.tar.gz |
Cluster support for message time-to-live.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@742774 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.h | 13 | ||||
-rw-r--r-- | cpp/src/qpid/broker/ExpiryPolicy.cpp | 36 | ||||
-rw-r--r-- | cpp/src/qpid/broker/ExpiryPolicy.h | 44 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Message.cpp | 22 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Message.h | 9 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 11 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 31 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 12 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ClusterPlugin.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ExpiryPolicy.cpp | 80 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ExpiryPolicy.h | 76 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.cpp | 11 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.h | 8 | ||||
-rw-r--r-- | cpp/src/qpid/framing/AMQFrame.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/framing/FrameSet.h | 4 |
18 files changed, 327 insertions, 48 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 091f67ec58..95f55bb596 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -30,6 +30,7 @@ #include "SecureConnectionFactory.h" #include "TopicExchange.h" #include "Link.h" +#include "ExpiryPolicy.h" #include "qmf/org/apache/qpid/broker/Package.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerEcho.h" @@ -150,6 +151,7 @@ Broker::Broker(const Broker::Options& conf) : queueCleaner(queues, timer), queueEvents(poller), recovery(true), + expiryPolicy(new ExpiryPolicy), getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this)) { if (conf.enableMgmt) { diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 71b69b51aa..a52a0f67e0 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -36,6 +36,7 @@ #include "Vhost.h" #include "System.h" #include "Timer.h" +#include "ExpiryPolicy.h" #include "qpid/management/Manageable.h" #include "qpid/management/ManagementBroker.h" #include "qmf/org/apache/qpid/broker/Broker.h" @@ -65,6 +66,8 @@ struct Url; namespace broker { +class ExpiryPolicy; + static const uint16_t DEFAULT_PORT=5672; struct NoSuchTransportException : qpid::Exception @@ -111,6 +114,8 @@ class Broker : public sys::Runnable, public Plugin::Target, private: typedef std::map<std::string, boost::shared_ptr<sys::ProtocolFactory> > ProtocolFactoryMap; + void declareStandardExchange(const std::string& name, const std::string& type); + boost::shared_ptr<sys::Poller> poller; Options config; management::ManagementAgent::Singleton managementAgentSingleton; @@ -132,14 +137,11 @@ class Broker : public sys::Runnable, public Plugin::Target, System::shared_ptr systemObject; QueueCleaner queueCleaner; QueueEvents queueEvents; - - void declareStandardExchange(const std::string& name, const std::string& type); - std::vector<Url> knownBrokers; std::vector<Url> getKnownBrokersImpl(); std::string federationTag; - bool recovery; + boost::intrusive_ptr<ExpiryPolicy> expiryPolicy; public: @@ -180,6 +182,9 @@ class Broker : public sys::Runnable, public Plugin::Target, Options& getOptions() { return config; } QueueEvents& getQueueEvents() { return queueEvents; } + void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) { expiryPolicy = e; } + boost::intrusive_ptr<ExpiryPolicy> getExpiryPolicy() { return expiryPolicy; } + SessionManager& getSessionManager() { return sessionManager; } const std::string& getFederationTag() const { return federationTag; } diff --git a/cpp/src/qpid/broker/ExpiryPolicy.cpp b/cpp/src/qpid/broker/ExpiryPolicy.cpp new file mode 100644 index 0000000000..907f1e56e1 --- /dev/null +++ b/cpp/src/qpid/broker/ExpiryPolicy.cpp @@ -0,0 +1,36 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ExpiryPolicy.h" +#include "Message.h" +#include "qpid/sys/Time.h" + +namespace qpid { +namespace broker { + +ExpiryPolicy::~ExpiryPolicy() {} + +void ExpiryPolicy::willExpire(Message&) {} + +bool ExpiryPolicy::hasExpired(Message& m) { + return m.getExpiration() < sys::AbsTime::now(); +} + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/ExpiryPolicy.h b/cpp/src/qpid/broker/ExpiryPolicy.h new file mode 100644 index 0000000000..1b7316f6f9 --- /dev/null +++ b/cpp/src/qpid/broker/ExpiryPolicy.h @@ -0,0 +1,44 @@ +#ifndef QPID_BROKER_EXPIRYPOLICY_H +#define QPID_BROKER_EXPIRYPOLICY_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/RefCounted.h" + +namespace qpid { +namespace broker { + +class Message; + +/** + * Default expiry policy. + */ +class ExpiryPolicy : public RefCounted +{ + public: + virtual ~ExpiryPolicy(); + virtual void willExpire(Message&); + virtual bool hasExpired(Message&); +}; +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_EXPIRYPOLICY_H*/ diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index e5a0c3e9e1..ce0477b08c 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -21,6 +21,7 @@ #include "Message.h" #include "ExchangeRegistry.h" +#include "ExpiryPolicy.h" #include "qpid/StringUtils.h" #include "qpid/framing/frame_functors.h" #include "qpid/framing/FieldTable.h" @@ -316,24 +317,29 @@ void Message::addTraceId(const std::string& id) } } -void Message::setTimestamp() +void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e) { DeliveryProperties* props = getProperties<DeliveryProperties>(); - //Spec states that timestamp should be set, evaluate the - //performance impact before re-enabling this: - //time_t now = ::time(0); - //props->setTimestamp(now); if (props->getTtl()) { - //set expiration (nb: ttl is in millisecs, time_t is in secs) + // AMQP requires setting the expiration property to be posix + // time_t in seconds. TTL is in milliseconds time_t now = ::time(0); props->setExpiration(now + (props->getTtl()/1000)); + // Use higher resolution time for the internal expiry calculation. expiration = AbsTime(AbsTime::now(), Duration(props->getTtl() * TIME_MSEC)); + setExpiryPolicy(e); } } -bool Message::hasExpired() const +void Message::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) { + expiryPolicy = e; + if (expiryPolicy) + expiryPolicy->willExpire(*this); +} + +bool Message::hasExpired() { - return expiration < FAR_FUTURE && expiration < AbsTime::now(); + return expiryPolicy && expiryPolicy->hasExpired(*this); } boost::intrusive_ptr<Message>& Message::getReplacementMessage(const Queue* qfor) const diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h index de716e9441..96fcf61dfc 100644 --- a/cpp/src/qpid/broker/Message.h +++ b/cpp/src/qpid/broker/Message.h @@ -45,6 +45,7 @@ class Exchange; class ExchangeRegistry; class MessageStore; class Queue; +class ExpiryPolicy; class Message : public PersistableMessage { public: @@ -73,8 +74,11 @@ public: const framing::FieldTable* getApplicationHeaders() const; bool isPersistent(); bool requiresAccept(); - void setTimestamp(); - bool hasExpired() const; + + void setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e); + void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e); + bool hasExpired(); + sys::AbsTime getExpiration() const { return expiration; } framing::FrameSet& getFrames() { return frames; } const framing::FrameSet& getFrames() const { return frames; } @@ -171,6 +175,7 @@ public: ConnectionToken* publisher; mutable MessageAdapter* adapter; qpid::sys::AbsTime expiration; + boost::intrusive_ptr<ExpiryPolicy> expiryPolicy; static TransferAdapter TRANSFER; diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index f9f75679e5..13a8c649d2 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -358,14 +358,13 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { std::string exchangeName = msg->getExchangeName(); //TODO: the following should be hidden behind message (using MessageAdapter or similar) - // Do not replace the delivery-properties.exchange if it is is already set. - // This is used internally (by the cluster) to force the exchange name on a message. - // The client library ensures this is always empty for messages from normal clients. if (msg->isA<MessageTransferBody>()) { - if (!msg->hasProperties<DeliveryProperties>() || - msg->getProperties<DeliveryProperties>()->getExchange().empty()) + // Do not replace the delivery-properties.exchange if it is is already set. + // This is used internally (by the cluster) to force the exchange name on a message. + // The client library ensures this is always empty for messages from normal clients. + if (!msg->hasProperties<DeliveryProperties>() || msg->getProperties<DeliveryProperties>()->getExchange().empty()) msg->getProperties<DeliveryProperties>()->setExchange(exchangeName); - msg->setTimestamp(); + msg->setTimestamp(getSession().getBroker().getExpiryPolicy()); } if (!cacheExchange || cacheExchange->getName() != exchangeName){ cacheExchange = session.getBroker().getExchanges().get(exchangeName); diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 8e6ece11cc..be04eebc57 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -76,6 +76,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { void ready(const std::string& url) { cluster.ready(member, url, l); } void configChange(const std::string& addresses) { cluster.configChange(member, addresses, l); } void updateOffer(uint64_t updatee, const Uuid& id) { cluster.updateOffer(member, updatee, id, l); } + void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); } void shutdown() { cluster.shutdown(member, l); } bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); } @@ -103,6 +104,8 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b poller), connections(*this), decoder(boost::bind(&PollableFrameQueue::push, &deliverFrameQueue, _1)), + expiryPolicy(new ExpiryPolicy(boost::bind(&Cluster::isLeader, this), mcast, myId, broker.getTimer())), + frameId(0), initialized(false), state(INIT), lastSize(0), @@ -134,6 +137,7 @@ void Cluster::initialize() { myUrl = Url::getIpAddressesUrl(broker.getPort(broker::Broker::TCP_TRANSPORT)); QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl); broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this); + broker.setExpiryPolicy(expiryPolicy); dispatcher.start(); deliverEventQueue.start(); deliverFrameQueue.start(); @@ -238,7 +242,8 @@ void Cluster::deliveredEvent(const Event& e) { // Handler for deliverFrameQueue void Cluster::deliveredFrame(const EventFrame& e) { - Mutex::ScopedLock l(lock); + Mutex::ScopedLock l(lock); + const_cast<AMQFrame&>(e.frame).setClusterId(frameId++); QPID_LOG(trace, *this << " DLVR: " << e); QPID_LATENCY_RECORD("delivered frame queue", e.frame); if (e.isCluster()) { // Cluster control frame @@ -333,22 +338,23 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& state = JOINER; QPID_LOG(info, *this << " joining cluster: " << map); mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), myId); - ClusterMap::Set members = map.getAlive(); - members.erase(myId); - myElders = members; + elders = map.getAlive(); + elders.erase(myId); broker.getLinks().setPassive(true); } } else if (state >= READY && memberChange) { memberUpdate(l); - myElders = ClusterMap::intersection(myElders, map.getAlive()); - if (myElders.empty()) { + elders = ClusterMap::intersection(elders, map.getAlive()); + if (elders.empty()) { //assume we are oldest, reactive links if necessary broker.getLinks().setPassive(false); } } } +bool Cluster::isLeader() const { return elders.empty(); } + void Cluster::tryMakeOffer(const MemberId& id, Lock& ) { if (state == READY && map.isJoiner(id)) { state = OFFER; @@ -420,15 +426,16 @@ void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock&) { deliverFrameQueue.stop(); if (updateThread.id()) updateThread.join(); // Join the previous updatethread. updateThread = Thread( - new UpdateClient(myId, updatee, url, broker, map, connections.values(), - boost::bind(&Cluster::updateOutDone, this), - boost::bind(&Cluster::updateOutError, this, _1))); + new UpdateClient(myId, updatee, url, broker, map, frameId, connections.values(), + boost::bind(&Cluster::updateOutDone, this), + boost::bind(&Cluster::updateOutError, this, _1))); } // Called in update thread. -void Cluster::updateInDone(const ClusterMap& m) { +void Cluster::updateInDone(const ClusterMap& m, uint64_t fid) { Lock l(lock); updatedMap = m; + frameId = fid; checkUpdateIn(l); } @@ -573,4 +580,8 @@ void Cluster::setClusterId(const Uuid& uuid) { QPID_LOG(debug, *this << " cluster-id = " << clusterId); } +void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) { + expiryPolicy->deliverExpire(id); +} + }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index f7955aa743..4d994943f7 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -31,6 +31,7 @@ #include "Quorum.h" #include "Decoder.h" #include "PollableQueue.h" +#include "ExpiryPolicy.h" #include "qpid/broker/Broker.h" #include "qpid/sys/Monitor.h" @@ -89,7 +90,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { void leave(); // Update completed - called in update thread - void updateInDone(const ClusterMap&); + void updateInDone(const ClusterMap&, uint64_t frameId); MemberId getId() const; broker::Broker& getBroker() const; @@ -100,6 +101,8 @@ class Cluster : private Cpg::Handler, public management::Manageable { size_t getReadMax() { return readMax; } size_t getWriteEstimate() { return writeEstimate; } + + bool isLeader() const; // Called in deliver thread. private: typedef sys::Monitor::ScopedLock Lock; @@ -129,6 +132,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { void updateOffer(const MemberId& updater, uint64_t updatee, const framing::Uuid&, Lock&); void ready(const MemberId&, const std::string&, Lock&); void configChange(const MemberId&, const std::string& addresses, Lock& l); + void messageExpired(const MemberId&, uint64_t, Lock& l); void shutdown(const MemberId&, Lock&); void deliveredEvent(const Event&); void deliveredFrame(const EventFrame&); @@ -185,7 +189,6 @@ class Cluster : private Cpg::Handler, public management::Manageable { const size_t writeEstimate; framing::Uuid clusterId; NoOpConnectionOutputHandler shadowOut; - ClusterMap::Set myElders; qpid::management::ManagementAgent* mAgent; // Thread safe members @@ -197,8 +200,11 @@ class Cluster : private Cpg::Handler, public management::Manageable { boost::shared_ptr<FailoverExchange> failoverExchange; Quorum quorum; - // Called only from event delivery thread + // Used only in delivery thread Decoder decoder; + ClusterMap::Set elders; + boost::intrusive_ptr<ExpiryPolicy> expiryPolicy; + uint64_t frameId; // Used only during initialization bool initialized; diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp index 2f7d12dcfe..d54d8389e0 100644 --- a/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -54,7 +54,7 @@ struct ClusterValues { bool quorum; size_t readMax, writeEstimate; - ClusterValues() : quorum(false), readMax(3), writeEstimate(64) {} + ClusterValues() : quorum(false), readMax(10), writeEstimate(64) {} Url getUrl(uint16_t port) const { if (url.empty()) return Url::getIpAddressesUrl(port); diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 9ea79fa2b6..295705e967 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -127,10 +127,6 @@ bool Connection::checkUnsupported(const AMQBody& body) { case DTX_CLASS_ID: message = "DTX transactions are not currently supported by cluster."; break; } } - else if (body.type() == HEADER_BODY) { - const DeliveryProperties* dp = static_cast<const AMQHeaderBody&>(body).get<DeliveryProperties>(); - if (dp && dp->getTtl()) message = "Message TTL is not currently supported by cluster."; - } if (!message.empty()) connection.close(connection::CLOSE_CODE_FRAMING_ERROR, message); return !message.empty(); @@ -259,9 +255,9 @@ void Connection::shadowReady(uint64_t memberId, uint64_t connectionId) { self = shadow; } -void Connection::membership(const FieldTable& joiners, const FieldTable& members) { +void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t frameId) { QPID_LOG(debug, cluster << " incoming update complete on connection " << *this); - cluster.updateInDone(ClusterMap(joiners, members)); + cluster.updateInDone(ClusterMap(joiners, members), frameId); self.second = 0; // Mark this as completed update connection. } diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index 160855dc2d..a6e9aa65f9 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -119,7 +119,7 @@ class Connection : void shadowReady(uint64_t memberId, uint64_t connectionId); - void membership(const framing::FieldTable&, const framing::FieldTable&); + void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t frameId); void deliveryRecord(const std::string& queue, const framing::SequenceNumber& position, diff --git a/cpp/src/qpid/cluster/ExpiryPolicy.cpp b/cpp/src/qpid/cluster/ExpiryPolicy.cpp new file mode 100644 index 0000000000..690acfc3ad --- /dev/null +++ b/cpp/src/qpid/cluster/ExpiryPolicy.cpp @@ -0,0 +1,80 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ExpiryPolicy.h" +#include "Multicaster.h" +#include "qpid/framing/ClusterMessageExpiredBody.h" +#include "qpid/sys/Time.h" +#include "qpid/broker/Message.h" +#include "qpid/broker/Timer.h" +#include "qpid/log/Statement.h" + +namespace qpid { +namespace cluster { + +ExpiryPolicy::ExpiryPolicy(const boost::function<bool()> & f, Multicaster& m, const MemberId& id, broker::Timer& t) + : expiredPolicy(new Expired), isLeader(f), mcast(m), memberId(id), timer(t) {} + +namespace { +uint64_t clusterId(const broker::Message& m) { + assert(m.getFrames().begin() != m.getFrames().end()); + return m.getFrames().begin()->getClusterId(); +} + +struct ExpiryTask : public broker::TimerTask { + ExpiryTask(const boost::intrusive_ptr<ExpiryPolicy>& policy, uint64_t id, sys::AbsTime when) + : TimerTask(when), expiryPolicy(policy), messageId(id) {} + void fire() { expiryPolicy->sendExpire(messageId); } + boost::intrusive_ptr<ExpiryPolicy> expiryPolicy; + const uint64_t messageId; +}; +} + +void ExpiryPolicy::willExpire(broker::Message& m) { + timer.add(new ExpiryTask(this, clusterId(m), m.getExpiration())); +} + +bool ExpiryPolicy::hasExpired(broker::Message& m) { + sys::Mutex::ScopedLock l(lock); + IdSet::iterator i = expired.find(clusterId(m)); + if (i != expired.end()) { + expired.erase(i); + const_cast<broker::Message&>(m).setExpiryPolicy(expiredPolicy); // hasExpired() == true; + return true; + } + return false; +} + +void ExpiryPolicy::sendExpire(uint64_t id) { + sys::Mutex::ScopedLock l(lock); + if (isLeader()) + mcast.mcastControl(framing::ClusterMessageExpiredBody(framing::ProtocolVersion(), id), memberId); +} + +void ExpiryPolicy::deliverExpire(uint64_t id) { + sys::Mutex::ScopedLock l(lock); + expired.insert(id); +} + +bool ExpiryPolicy::Expired::hasExpired(broker::Message&) { return true; } +void ExpiryPolicy::Expired::willExpire(broker::Message&) { } + +}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/ExpiryPolicy.h b/cpp/src/qpid/cluster/ExpiryPolicy.h new file mode 100644 index 0000000000..7fb63c731e --- /dev/null +++ b/cpp/src/qpid/cluster/ExpiryPolicy.h @@ -0,0 +1,76 @@ +#ifndef QPID_CLUSTER_EXPIRYPOLICY_H +#define QPID_CLUSTER_EXPIRYPOLICY_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "types.h" +#include "qpid/broker/ExpiryPolicy.h" +#include "qpid/sys/Mutex.h" +#include <boost/function.hpp> +#include <boost/intrusive_ptr.hpp> +#include <set> + +namespace qpid { + +namespace broker { class Timer; } + +namespace cluster { +class Multicaster; + +/** + * Cluster expiry policy + */ +class ExpiryPolicy : public broker::ExpiryPolicy +{ + public: + ExpiryPolicy(const boost::function<bool()> & isLeader, Multicaster&, const MemberId&, broker::Timer&); + + void willExpire(broker::Message&); + + bool hasExpired(broker::Message&); + + // Send expiration notice to cluster. + void sendExpire(uint64_t); + + // Cluster delivers expiry notice. + void deliverExpire(uint64_t); + + private: + sys::Mutex lock; + typedef std::set<uint64_t> IdSet; + + struct Expired : public broker::ExpiryPolicy { + bool hasExpired(broker::Message&); + void willExpire(broker::Message&); + }; + + IdSet expired; + boost::intrusive_ptr<Expired> expiredPolicy; + boost::function<bool()> isLeader; + Multicaster& mcast; + MemberId memberId; + broker::Timer& timer; +}; + +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_EXPIRYPOLICY_H*/ diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index 91d4c6d3ce..e50c936b50 100644 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -86,10 +86,12 @@ void send(client::AsyncSession& s, const AMQBody& body) { // TODO aconway 2008-09-24: optimization: update connections/sessions in parallel. UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, const Url& url, - broker::Broker& broker, const ClusterMap& m, const Cluster::Connections& cons, - const boost::function<void()>& ok, - const boost::function<void(const std::exception&)>& fail) - : updaterId(updater), updateeId(updatee), updateeUrl(url), updaterBroker(broker), map(m), connections(cons), + broker::Broker& broker, const ClusterMap& m, uint64_t frameId_, + const Cluster::Connections& cons, + const boost::function<void()>& ok, + const boost::function<void(const std::exception&)>& fail) + : updaterId(updater), updateeId(updatee), updateeUrl(url), updaterBroker(broker), map(m), + frameId(frameId_), connections(cons), connection(catchUpConnection()), shadowConnection(catchUpConnection()), done(ok), failed(fail) { @@ -120,6 +122,7 @@ void UpdateClient::update() { ClusterConnectionMembershipBody membership; map.toMethodBody(membership); + membership.setFrameId(frameId); AMQFrame frame(membership); client::ConnectionAccess::getImpl(connection)->handle(frame); connection.close(); diff --git a/cpp/src/qpid/cluster/UpdateClient.h b/cpp/src/qpid/cluster/UpdateClient.h index 93dca9f0c6..0819eb4cdb 100644 --- a/cpp/src/qpid/cluster/UpdateClient.h +++ b/cpp/src/qpid/cluster/UpdateClient.h @@ -63,9 +63,10 @@ class UpdateClient : public sys::Runnable { static const std::string UPDATE; // Name for special update queue and exchange. UpdateClient(const MemberId& updater, const MemberId& updatee, const Url&, - broker::Broker& donor, const ClusterMap& map, const std::vector<boost::intrusive_ptr<Connection> >& , - const boost::function<void()>& done, - const boost::function<void(const std::exception&)>& fail); + broker::Broker& donor, const ClusterMap& map, uint64_t sequence, + const std::vector<boost::intrusive_ptr<Connection> >& , + const boost::function<void()>& done, + const boost::function<void(const std::exception&)>& fail); ~UpdateClient(); void update(); @@ -89,6 +90,7 @@ class UpdateClient : public sys::Runnable { Url updateeUrl; broker::Broker& updaterBroker; ClusterMap map; + uint64_t frameId; std::vector<boost::intrusive_ptr<Connection> > connections; client::Connection connection, shadowConnection; client::AsyncSession session, shadowSession; diff --git a/cpp/src/qpid/framing/AMQFrame.h b/cpp/src/qpid/framing/AMQFrame.h index 02a1ea4622..028d0c1d8a 100644 --- a/cpp/src/qpid/framing/AMQFrame.h +++ b/cpp/src/qpid/framing/AMQFrame.h @@ -92,6 +92,9 @@ class AMQFrame : public AMQDataBlock, public sys::LatencyMetricTimestamp /** Must point to at least DECODE_SIZE_MIN bytes of data */ static uint16_t decodeSize(char* data); + uint64_t getClusterId() const { return clusterId; } + void setClusterId(uint64_t id) { clusterId = id; } + private: void init(); @@ -103,6 +106,7 @@ class AMQFrame : public AMQDataBlock, public sys::LatencyMetricTimestamp bool bos : 1; bool eos : 1; mutable uint32_t encodedSizeCache; + uint64_t clusterId; // Used to identify frames in a clustered broekr. }; std::ostream& operator<<(std::ostream&, const AMQFrame&); diff --git a/cpp/src/qpid/framing/FrameSet.h b/cpp/src/qpid/framing/FrameSet.h index 82987910a7..b13ca16e97 100644 --- a/cpp/src/qpid/framing/FrameSet.h +++ b/cpp/src/qpid/framing/FrameSet.h @@ -49,6 +49,7 @@ public: bool isComplete() const; uint64_t getContentSize() const; + void getContent(std::string&) const; std::string getContent() const; @@ -73,6 +74,9 @@ public: return header ? header->get<T>() : 0; } + Frames::const_iterator begin() const { return parts.begin(); } + Frames::const_iterator end() const { return parts.end(); } + const SequenceNumber& getId() const { return id; } template <class P> void remove(P predicate) { |