diff options
Diffstat (limited to 'cpp/src/qpid/cluster')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 61 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ClusterPlugin.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 5 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Event.cpp | 5 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Event.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/EventFrame.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/EventFrame.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/OutputInterceptor.cpp | 11 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.cpp | 12 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/UpdateExchange.h | 45 |
13 files changed, 111 insertions, 56 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index bea336644f..69a63ad83c 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -21,6 +21,7 @@ #include "Connection.h" #include "UpdateClient.h" #include "FailoverExchange.h" +#include "UpdateExchange.h" #include "qpid/assert.h" #include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h" @@ -106,13 +107,13 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : "Error delivering frames", poller), expiryPolicy(new ExpiryPolicy(boost::bind(&Cluster::isLeader, this), mcast, self, broker.getTimer())), + eventId(0), frameId(0), initialized(false), state(INIT), connections(*this), lastSize(0), - lastBroker(false), - sequence(0) + lastBroker(false) { mAgent = ManagementAgent::Singleton::getInstance(); if (mAgent != 0){ @@ -122,7 +123,13 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : mgmtObject->set_status("JOINING"); } + // Failover exchange provides membership updates to clients. failoverExchange.reset(new FailoverExchange(this)); + broker.getExchanges().registerExchange(failoverExchange); + + // Update exchange is used during updates to replicate messages without modifying delivery-properties.exchange. + broker.getExchanges().registerExchange(boost::shared_ptr<broker::Exchange>(new UpdateExchange(this))); + if (settings.quorum) quorum.init(); cpg.join(name); // pump the CPG dispatch manually till we get initialized. @@ -212,7 +219,6 @@ void Cluster::deliver( MemberId from(nodeid, pid); framing::Buffer buf(static_cast<char*>(msg), msg_len); Event e(Event::decodeCopy(from, buf)); - e.setSequence(sequence++); if (from == self) // Record self-deliveries for flow control. mcast.selfDeliver(e); deliver(e); @@ -225,34 +231,40 @@ void Cluster::deliver(const Event& e) { } // Handler for deliverEventQueue -void Cluster::deliveredEvent(const Event& e) { - QPID_LATENCY_RECORD("delivered event queue", e); +void Cluster::deliveredEvent(const Event& event) { + Event e(event); Mutex::ScopedLock l(lock); + if (state >= CATCHUP) { + e.setId(++eventId); + QPID_LOG(trace, *this << " DLVR: " << e); + } if (e.isCluster()) { // Cluster control, process in this thread. - AMQFrame frame(e.getFrame()); + EventFrame ef(e, e.getFrame()); + QPID_LOG(trace, *this << " DLVR: " << ef); ClusterDispatcher dispatch(*this, e.getConnectionId().getMember(), l); - if (!framing::invoke(dispatch, *frame.getBody()).wasHandled()) + if (!framing::invoke(dispatch, *ef.frame.getBody()).wasHandled()) throw Exception(QPID_MSG("Invalid cluster control")); } - else if (state >= CATCHUP) { // Connection frame, push onto deliver queue. - if (e.getType() == CONTROL) + else if (state >= CATCHUP) { // Handle connection frames + if (e.getType() == CONTROL) { connectionFrame(EventFrame(e, e.getFrame())); + } else connections.decode(e, e.getData()); } - else // connection frame && state < CATCHUP. Drop. - QPID_LOG(trace, *this << " DROP: " << e); + // Drop connection frames while state < CATCHUP } // Handler for deliverFrameQueue -void Cluster::deliveredFrame(const EventFrame& e) { +void Cluster::deliveredFrame(const EventFrame& event) { Mutex::ScopedLock l(lock); // TODO aconway 2009-03-02: don't need this lock? + EventFrame e(event); assert(!e.isCluster()); // Only connection frames on this queue. - QPID_LOG(trace, *this << " DLVR: " << e); - if (e.type == DATA) // Sequence number to identify data frames. - const_cast<AMQFrame&>(e.frame).setClusterId(frameId++); + QPID_LOG(trace, *this << " DLVR: " << e); + if (e.type == DATA) // Add cluster-id to to data frames. + e.frame.setClusterId(frameId++); boost::intrusive_ptr<Connection> connection = connections.get(e.connectionId); - if (connection) // Ignore frames to closed local connections. + if (connection) // Ignore frames to closed local connections. connection->deliveredFrame(e); } @@ -389,6 +401,10 @@ void Cluster::stall(Lock&) { // Stop processing the deliveredEventQueue in order to send or // recieve an update. deliverEventQueue.stop(); + + // FIXME aconway 2009-03-04: if frame queue is re-enabled, we must + // also wait for it to be empty before we are stalled, so that + // our local model is up-to-date to give an update. } void Cluster::unstall(Lock&) { @@ -434,17 +450,18 @@ void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock& l) { cs.password = settings.password; cs.mechanism = settings.mechanism; updateThread = Thread( - new UpdateClient(self, updatee, url, broker, map, frameId, connections.values(), + new UpdateClient(self, updatee, url, broker, map, eventId, frameId, connections.values(), boost::bind(&Cluster::updateOutDone, this), boost::bind(&Cluster::updateOutError, this, _1), cs)); } // Called in update thread. -void Cluster::updateInDone(const ClusterMap& m, uint64_t fid) { +void Cluster::updateInDone(const ClusterMap& m, uint64_t eventId_, uint64_t frameId_) { Lock l(lock); updatedMap = m; - frameId = fid; + eventId = eventId_; + frameId = frameId_; checkUpdateIn(l); } @@ -601,9 +618,11 @@ void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) { } void Cluster::connectionFrame(const EventFrame& frame) { - // FIXME aconway 2009-03-02: bypassing deliverFrameQueue to avoid race condition. - // Measure performance impact, restore with better locking. + // FIXME aconway 2009-03-02: bypass deliverFrameQueue to avoid race condition. + // Measure performance impact & review. + // // deliverFrameQueue.push(frame); + // deliveredFrame(frame); } diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index 4d358cf495..898ec2879f 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -88,7 +88,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { void leave(); // Update completed - called in update thread - void updateInDone(const ClusterMap&, uint64_t frameId); + void updateInDone(const ClusterMap&, uint64_t eventId, uint64_t frameId); MemberId getId() const; broker::Broker& getBroker() const; @@ -214,6 +214,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { // Used only in deliveredFrame thread ClusterMap::Set elders; boost::intrusive_ptr<ExpiryPolicy> expiryPolicy; + uint64_t eventId; // FIXME aconway 2009-03-04: review use for thread safety frame-q thread re-enabled. uint64_t frameId; // Used only during initialization @@ -238,7 +239,6 @@ class Cluster : private Cpg::Handler, public management::Manageable { ClusterMap map; size_t lastSize; bool lastBroker; - uint64_t sequence; // Update related sys::Thread updateThread; diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp index 132043f91a..adb6621caf 100644 --- a/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -138,7 +138,6 @@ struct ClusterPlugin : public Plugin { broker->setConnectionFactory( boost::shared_ptr<sys::ConnectionCodec::Factory>( new ConnectionCodec::Factory(broker->getConnectionFactory(), *cluster))); - broker->getExchanges().registerExchange(cluster->getFailoverExchange()); ManagementBroker* mgmt = dynamic_cast<ManagementBroker*>(ManagementAgent::Singleton::getInstance()); if (mgmt) { std::auto_ptr<IdAllocator> allocator(new UpdateClientIdAllocator()); diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 0f71a91293..4391b3eccb 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -280,6 +280,7 @@ void Connection::sessionState( const SequenceSet& unknownCompleted, const SequenceSet& receivedIncomplete) { + sessionState().setState( replayStart, sendCommandPoint, @@ -299,9 +300,9 @@ void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const str clusterDecoder.setFragment(fragment.data(), fragment.size()); } -void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t frameId) { +void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t eventId, uint64_t frameId) { QPID_LOG(debug, cluster << " incoming update complete on connection " << *this); - cluster.updateInDone(ClusterMap(joiners, members), frameId); + cluster.updateInDone(ClusterMap(joiners, members), eventId, frameId); self.second = 0; // Mark this as completed update connection. } diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index 048008f2a5..9f126d68c4 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -123,7 +123,7 @@ class Connection : void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username, const std::string& fragment); - void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t frameId); + void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t eventId, uint64_t frameId); void deliveryRecord(const std::string& queue, const framing::SequenceNumber& position, diff --git a/cpp/src/qpid/cluster/Event.cpp b/cpp/src/qpid/cluster/Event.cpp index 749fbf240f..ccb4d3ede8 100644 --- a/cpp/src/qpid/cluster/Event.cpp +++ b/cpp/src/qpid/cluster/Event.cpp @@ -44,7 +44,7 @@ const size_t EventHeader::HEADER_SIZE = ; EventHeader::EventHeader(EventType t, const ConnectionId& c, size_t s) - : type(t), connectionId(c), size(s), sequence(0) {} + : type(t), connectionId(c), size(s), id(0) {} Event::Event() {} @@ -128,8 +128,7 @@ std::ostream& operator << (std::ostream& o, EventType t) { } std::ostream& operator << (std::ostream& o, const EventHeader& e) { - o << "[event " << e.getConnectionId() << "/" << e.getSequence() - << " " << e.getType() << " " << e.getSize() << " bytes]"; + o << "Event[id=" << e.getId() << " connection=" << e.getConnectionId() << " " << e.getType() << " " << e.getSize() << " bytes]"; return o; } diff --git a/cpp/src/qpid/cluster/Event.h b/cpp/src/qpid/cluster/Event.h index c9f44725df..382a550015 100644 --- a/cpp/src/qpid/cluster/Event.h +++ b/cpp/src/qpid/cluster/Event.h @@ -57,8 +57,8 @@ class EventHeader : public ::qpid::sys::LatencyMetricTimestamp { /** Size of header + payload. */ size_t getStoreSize() { return size + HEADER_SIZE; } - uint64_t getSequence() const { return sequence; } - void setSequence(uint64_t n) { sequence = n; } + uint64_t getId() const { return id; } + void setId(uint64_t n) { id = n; } bool isCluster() const { return connectionId.getNumber() == 0; } bool isConnection() const { return connectionId.getNumber() != 0; } @@ -69,7 +69,7 @@ class EventHeader : public ::qpid::sys::LatencyMetricTimestamp { EventType type; ConnectionId connectionId; size_t size; - uint64_t sequence; + uint64_t id; }; /** diff --git a/cpp/src/qpid/cluster/EventFrame.cpp b/cpp/src/qpid/cluster/EventFrame.cpp index 48c9eab958..4de76eafbe 100644 --- a/cpp/src/qpid/cluster/EventFrame.cpp +++ b/cpp/src/qpid/cluster/EventFrame.cpp @@ -24,16 +24,16 @@ namespace qpid { namespace cluster { -EventFrame::EventFrame() : sequence(0) {} +EventFrame::EventFrame() : eventId(0) {} EventFrame::EventFrame(const EventHeader& e, const framing::AMQFrame& f, int rc) - : connectionId(e.getConnectionId()), frame(f), sequence(e.getSequence()), readCredit(rc), type(e.getType()) + : connectionId(e.getConnectionId()), frame(f), eventId(e.getId()), readCredit(rc), type(e.getType()) { QPID_LATENCY_INIT(frame); } std::ostream& operator<<(std::ostream& o, const EventFrame& e) { - return o << e.connectionId << "/" << e.sequence << " " << e.frame << " rc=" << e.readCredit << " type=" << e.type; + return o << e.frame << "(from event " << e.eventId << " read-credit=" << e.readCredit << ")"; } }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/EventFrame.h b/cpp/src/qpid/cluster/EventFrame.h index abeea3ef16..bb2d9d5493 100644 --- a/cpp/src/qpid/cluster/EventFrame.h +++ b/cpp/src/qpid/cluster/EventFrame.h @@ -49,14 +49,14 @@ struct EventFrame // True if this frame follows immediately after frame e. bool follows(const EventFrame& e) const { - return sequence == e.sequence || (sequence == e.sequence+1 && e.readCredit); + return eventId == e.eventId || (eventId == e.eventId+1 && e.readCredit); } - bool operator<(const EventFrame& e) const { return sequence < e.sequence; } + bool operator<(const EventFrame& e) const { return eventId < e.eventId; } ConnectionId connectionId; framing::AMQFrame frame; - uint64_t sequence; + uint64_t eventId; int readCredit; ///< last frame in an event, give credit when processed. EventType type; }; diff --git a/cpp/src/qpid/cluster/OutputInterceptor.cpp b/cpp/src/qpid/cluster/OutputInterceptor.cpp index 45a369eea9..cd42446016 100644 --- a/cpp/src/qpid/cluster/OutputInterceptor.cpp +++ b/cpp/src/qpid/cluster/OutputInterceptor.cpp @@ -70,17 +70,12 @@ void OutputInterceptor::giveReadCredit(int32_t credit) { // Called in write thread when the IO layer has no more data to write. // We do nothing in the write thread, we run doOutput only on delivery // of doOutput requests. -bool OutputInterceptor::doOutput() { - QPID_LOG(trace, parent << " write idle."); - return false; -} +bool OutputInterceptor::doOutput() { return false; } // Delivery of doOutput allows us to run the real connection doOutput() // which tranfers frames to the codec for writing. // void OutputInterceptor::deliverDoOutput(size_t requested) { - QPID_LATENCY_RECORD("deliver do-output", *this); - QPID_LATENCY_CLEAR(*this); size_t buf = getBuffered(); if (parent.isLocal()) writeEstimate.delivered(requested, sent, buf); // Update the estimate. @@ -91,9 +86,7 @@ void OutputInterceptor::deliverDoOutput(size_t requested) { moreOutput = parent.getBrokerConnection().doOutput(); } while (sent < requested && moreOutput); sent += buf; // Include buffered data in the sent total. - - QPID_LOG(trace, "Delivered doOutput: requested=" << requested << " output=" << sent << " more=" << moreOutput); - + QPID_LOG(trace, parent << " delivereDoOutput: requested=" << requested << " sent=" << sent << " more=" << moreOutput); if (parent.isLocal() && moreOutput) { QPID_LOG(trace, parent << " deliverDoOutput - sending doOutput, more output available."); sendDoOutput(); diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index 9cba377122..7e349905ab 100644 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -86,14 +86,14 @@ void send(client::AsyncSession& s, const AMQBody& body) { // TODO aconway 2008-09-24: optimization: update connections/sessions in parallel. UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, const Url& url, - broker::Broker& broker, const ClusterMap& m, uint64_t frameId_, + broker::Broker& broker, const ClusterMap& m, uint64_t eventId_, uint64_t frameId_, const Cluster::Connections& cons, const boost::function<void()>& ok, const boost::function<void(const std::exception&)>& fail, const client::ConnectionSettings& cs ) : updaterId(updater), updateeId(updatee), updateeUrl(url), updaterBroker(broker), map(m), - frameId(frameId_), connections(cons), + eventId(eventId_), frameId(frameId_), connections(cons), connection(catchUpConnection()), shadowConnection(catchUpConnection()), done(ok), failed(fail), connectionSettings(cs) { @@ -104,7 +104,7 @@ UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, con UpdateClient::~UpdateClient() {} // Reserved exchange/queue name for catch-up, avoid clashes with user queues/exchanges. -const std::string UpdateClient::UPDATE("qpid.qpid-update"); +const std::string UpdateClient::UPDATE("qpid.cluster-update"); void UpdateClient::run() { try { @@ -120,9 +120,6 @@ void UpdateClient::update() { QPID_LOG(debug, updaterId << " updating state to " << updateeId << " at " << updateeUrl); Broker& b = updaterBroker; b.getExchanges().eachExchange(boost::bind(&UpdateClient::updateExchange, this, _1)); - - // Update exchange is used to route messages to the proper queue without modifying routing key. - session.exchangeDeclare(arg::exchange=UPDATE, arg::type="fanout", arg::autoDelete=true); b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueue, this, _1)); // Update queue is used to transfer acquired messages that are no longer on their original queue. session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true); @@ -133,6 +130,7 @@ void UpdateClient::update() { ClusterConnectionMembershipBody membership; map.toMethodBody(membership); + membership.setEventId(eventId); membership.setFrameId(frameId); AMQFrame frame(membership); client::ConnectionAccess::getImpl(connection)->handle(frame); @@ -274,7 +272,7 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) { SequenceNumber received = ss->receiverGetReceived().command; if (inProgress) --received; - + // Reset command-sequence state. proxy.sessionState( ss->senderGetReplayPoint().command, diff --git a/cpp/src/qpid/cluster/UpdateClient.h b/cpp/src/qpid/cluster/UpdateClient.h index 08267392f4..d6b821904f 100644 --- a/cpp/src/qpid/cluster/UpdateClient.h +++ b/cpp/src/qpid/cluster/UpdateClient.h @@ -63,7 +63,7 @@ class UpdateClient : public sys::Runnable { static const std::string UPDATE; // Name for special update queue and exchange. UpdateClient(const MemberId& updater, const MemberId& updatee, const Url&, - broker::Broker& donor, const ClusterMap& map, uint64_t sequence, + broker::Broker& donor, const ClusterMap& map, uint64_t eventId, uint64_t frameId, const std::vector<boost::intrusive_ptr<Connection> >& , const boost::function<void()>& done, const boost::function<void(const std::exception&)>& fail, @@ -92,6 +92,7 @@ class UpdateClient : public sys::Runnable { Url updateeUrl; broker::Broker& updaterBroker; ClusterMap map; + uint64_t eventId; uint64_t frameId; std::vector<boost::intrusive_ptr<Connection> > connections; client::Connection connection, shadowConnection; diff --git a/cpp/src/qpid/cluster/UpdateExchange.h b/cpp/src/qpid/cluster/UpdateExchange.h new file mode 100644 index 0000000000..7a4a484c8a --- /dev/null +++ b/cpp/src/qpid/cluster/UpdateExchange.h @@ -0,0 +1,45 @@ +#ifndef QPID_CLUSTER_UPDATEEXCHANGE_H +#define QPID_CLUSTER_UPDATEEXCHANGE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "UpdateClient.h" +#include "qpid/broker/FanOutExchange.h" + + +namespace qpid { +namespace cluster { + +/** + * A keyless exchange (like fanout exchange) that does not modify deliver-properties.exchange + * on messages. + */ +class UpdateExchange : public broker::FanOutExchange +{ + public: + UpdateExchange(management::Manageable* parent) : broker::Exchange(UpdateClient::UPDATE, parent), broker::FanOutExchange(UpdateClient::UPDATE, parent) {} + void setProperties(const boost::intrusive_ptr<broker::Message>&) {} +}; + +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_UPDATEEXCHANGE_H*/ |