diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/cluster.mk | 5 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 178 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 96 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 33 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.h | 10 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ConnectionMap.cpp | 84 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ConnectionMap.h | 84 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Decoder.cpp | 60 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Decoder.h | 57 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Event.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Event.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/EventFrame.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/EventFrame.h | 8 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/LockedConnectionMap.h | 62 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.cpp | 11 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.h | 8 | ||||
-rw-r--r-- | cpp/src/qpid/framing/AMQFrame.cpp | 3 |
17 files changed, 362 insertions, 352 deletions
diff --git a/cpp/src/cluster.mk b/cpp/src/cluster.mk index 925b1d1e68..e2054d75e9 100644 --- a/cpp/src/cluster.mk +++ b/cpp/src/cluster.mk @@ -40,6 +40,8 @@ cluster_la_SOURCES = \ $(CMAN_SOURCES) \ qpid/cluster/Cluster.cpp \ qpid/cluster/Cluster.h \ + qpid/cluster/Decoder.cpp \ + qpid/cluster/Decoder.h \ qpid/cluster/PollableQueue.h \ qpid/cluster/ClusterMap.cpp \ qpid/cluster/ClusterMap.h \ @@ -49,8 +51,6 @@ cluster_la_SOURCES = \ qpid/cluster/Connection.h \ qpid/cluster/ConnectionCodec.cpp \ qpid/cluster/ConnectionCodec.h \ - qpid/cluster/ConnectionMap.h \ - qpid/cluster/ConnectionMap.cpp \ qpid/cluster/Cpg.cpp \ qpid/cluster/Cpg.h \ qpid/cluster/Dispatchable.h \ @@ -65,6 +65,7 @@ cluster_la_SOURCES = \ qpid/cluster/FailoverExchange.cpp \ qpid/cluster/FailoverExchange.h \ qpid/cluster/UpdateExchange.h \ + qpid/cluster/LockedConnectionMap.h \ qpid/cluster/Multicaster.cpp \ qpid/cluster/Multicaster.h \ qpid/cluster/McastFrameHandler.h \ diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 8946a71446..169d0fb1af 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -107,11 +107,11 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : boost::bind(&Cluster::leave, this), "Error delivering frames", poller), - connections(*this), - frameId(0), initialized(false), + decoder(boost::bind(&Cluster::deliverFrame, this, _1)), + discarding(true), state(INIT), - eventId(0), + frameId(0), lastSize(0), lastBroker(false) { @@ -156,14 +156,19 @@ void Cluster::initialize() { // Called in connection thread to insert a client connection. void Cluster::addLocalConnection(const boost::intrusive_ptr<Connection>& c) { - connections.insert(c); + localConnections.insert(c); } // Called in connection thread to insert an updated shadow connection. void Cluster::addShadowConnection(const boost::intrusive_ptr<Connection>& c) { - connections.insert(c); + // Safe to use connections here because we're pre-catchup, either + // discarding or stalled, so deliveredFrame is not processing any + // connection events. + assert(discarding); + connections.insert(ConnectionMap::value_type(c->getId(), c)); } +// Called by Connection::deliverClose() in deliverFrameQueue thread. void Cluster::erase(const ConnectionId& id) { connections.erase(id); } @@ -195,7 +200,6 @@ void Cluster::leave(Lock&) { if (state != LEFT) { state = LEFT; QPID_LOG(notice, *this << " leaving cluster " << name); - connections.clear(); try { broker.shutdown(); } catch (const std::exception& e) { QPID_LOG(critical, *this << " error during broker shutdown: " << e.what()); @@ -217,58 +221,89 @@ void Cluster::deliver( Event e(Event::decodeCopy(from, buf)); if (from == self) // Record self-deliveries for flow control. mcast.selfDeliver(e); - deliver(e); + deliverEvent(e); } -void Cluster::deliver(const Event& e) { +void Cluster::deliverEvent(const Event& e) { deliverEventQueue.push(e); } +void Cluster::deliverFrame(const EventFrame& e) { + deliverFrameQueue.push(e); +} + // Handler for deliverEventQueue. -// This thread executes cluster controls and decodes connection data events. -void Cluster::deliveredEvent(const Event& event) { - Event e(event); - Mutex::ScopedLock l(lock); - if (state >= CATCHUP) { - e.setId(++eventId); +// This thread decodes frames from events. +void Cluster::deliveredEvent(const Event& e) { QPID_LOG(trace, *this << " DLVR: " << e); - } - if (e.isCluster()) { // Cluster control, process in this thread. + if (e.isCluster()) { EventFrame ef(e, e.getFrame()); - QPID_LOG(trace, *this << " DLVR: " << ef); - ClusterDispatcher dispatch(*this, e.getConnectionId().getMember(), l); - if (!framing::invoke(dispatch, *ef.frame.getBody()).wasHandled()) - throw Exception(QPID_MSG("Invalid cluster control")); + // Stop the deliverEventQueue on update offers. + // This preserves the connection decoder fragments for an update. + ClusterUpdateOfferBody* offer = dynamic_cast<ClusterUpdateOfferBody*>(ef.frame.getBody()); + if (offer) + deliverEventQueue.stop(); + deliverFrame(ef); } - else if (state >= CATCHUP) { // Handle connection frames - if (e.getType() == CONTROL) - connectionFrame(EventFrame(e, e.getFrame())); + else if(!discarding) { + if (e.isControl()) + deliverFrame(EventFrame(e, e.getFrame())); else - connections.decode(e, e.getData()); - } - // Drop connection frames while state < CATCHUP + decoder.decode(e, e.getData()); } - -void Cluster::connectionFrame(const EventFrame& frame) { - deliverFrameQueue.push(frame); + else // Discard connection events if discarding is set. + QPID_LOG(trace, *this << " DROP: " << e); } // Handler for deliverFrameQueue. -// This thread executes connection control and data frames. -void Cluster::deliveredFrame(const EventFrame& event) { - // No lock, only use connections, not Cluster state. - EventFrame e(event); - if(!e.frame.getBody()) { // marks the stall point, start the update task. - updateThread=Thread(*updateTask); +// This thread executes the main logic. +void Cluster::deliveredFrame(const EventFrame& e) { + Mutex::ScopedLock l(lock); + if (e.isCluster()) { + QPID_LOG(trace, *this << " DLVR: " << e); + ClusterDispatcher dispatch(*this, e.connectionId.getMember(), l); + if (!framing::invoke(dispatch, *e.frame.getBody()).wasHandled()) + throw Exception(QPID_MSG("Invalid cluster control")); } - else { + else if (state >= CATCHUP) { QPID_LOG(trace, *this << " DLVR: " << e); - if (e.type == DATA) // Add cluster-id to to data frames. - e.frame.setClusterId(frameId++); - boost::intrusive_ptr<Connection> connection = connections.get(e.connectionId); + EventFrame ef(e); // Non-const copy + if (ef.type == DATA) // Add cluster-id to to data frames. + ef.frame.setClusterId(frameId++); + ConnectionPtr connection = getConnection(e.connectionId, l); if (connection) connection->deliveredFrame(e); } + else // Drop connection frames while state < CATCHUP + QPID_LOG(trace, *this << " DROP: " << e); +} + +// Called in deliverFrameQueue thread +ConnectionPtr Cluster::getConnection(const ConnectionId& id, Lock&) { + ConnectionPtr cp; + ConnectionMap::iterator i = connections.find(id); + if (i != connections.end()) + cp = i->second; + else { + if(id.getMember() == self) + cp = localConnections.getErase(id); + else { + // New remote connection, create a shadow. + std::ostringstream mgmtId; + mgmtId << id; + cp = new Connection(*this, shadowOut, mgmtId.str(), id); + } + if (cp) + connections.insert(ConnectionMap::value_type(id, cp)); + } + return cp; +} + +Cluster::ConnectionVector Cluster::getConnections(Lock&) { + ConnectionVector result(connections.size()); + std::transform(connections.begin(), connections.end(), result.begin(), + boost::bind(&ConnectionMap::value_type::second, _1)); + return result; } struct AddrList { @@ -316,7 +351,7 @@ void Cluster::configChange ( std::string addresses; for (cpg_address* p = current; p < current+nCurrent; ++p) addresses.append(MemberId(*p).str()); - deliver(Event::control(ClusterConfigChangeBody(ProtocolVersion(), addresses), self)); + deliverEvent(Event::control(ClusterConfigChangeBody(ProtocolVersion(), addresses), self)); } void Cluster::setReady(Lock&) { @@ -337,6 +372,7 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& if (state == INIT) { // First configChange if (map.aliveCount() == 1) { setClusterId(true, l); + discarding = false; setReady(l); map = ClusterMap(self, myUrl, true); memberUpdate(l); @@ -396,28 +432,18 @@ void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) { } } -void Cluster::stall(Lock&) { - // Stop processing the deliveredEventQueue in order to send or - // recieve an update. - deliverEventQueue.stop(); -} - -void Cluster::unstall(Lock&) { - // Stop processing the deliveredEventQueue in order to send or - // recieve an update. - deliverEventQueue.start(); -} - void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uuid& uuid, Lock& l) { + // NOTE: deliverEventQueue has been stopped at the update offer by + // deliveredEvent in case an update is required. if (state == LEFT) return; MemberId updatee(updateeInt); boost::optional<Url> url = map.updateOffer(updater, updatee); if (updater == self) { assert(state == OFFER); - if (url) { // My offer was first. + if (url) // My offer was first. updateStart(updatee, *url, l); - } else { // Another offer was first. + deliverEventQueue.start(); // Don't need to update setReady(l); QPID_LOG(info, *this << " cancelled update offer to " << updatee); makeOffer(map.firstJoiner(), l); // Maybe make another offer. @@ -428,50 +454,48 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uu setClusterId(uuid, l); state = UPDATEE; QPID_LOG(info, *this << " receiving update from " << updater); - stall(l); checkUpdateIn(l); } + else + deliverEventQueue.start(); // Don't need to update } void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock& l) { + // NOTE: deliverEventQueue is already stopped at the stall point by deliveredEvent. if (state == LEFT) return; assert(state == OFFER); state = UPDATER; - QPID_LOG(info, *this << " stall for update to " << updatee << " at " << url); - stall(l); - + QPID_LOG(info, *this << " sending update to " << updatee << " at " << url); if (updateThread.id()) updateThread.join(); // Join the previous updateThread to avoid leaks. client::ConnectionSettings cs; cs.username = settings.username; cs.password = settings.password; cs.mechanism = settings.mechanism; - updateTask = new UpdateClient(self, updatee, url, broker, map, eventId, frameId, connections.values(), + updateThread = Thread( + new UpdateClient(self, updatee, url, broker, map, frameId, getConnections(l), decoder, boost::bind(&Cluster::updateOutDone, this), boost::bind(&Cluster::updateOutError, this, _1), - cs); - // Push an empty frame onto the deliverFrameQueue to mark the stall point. - // The deliverFrameQueue thread will start the update at that point. - deliverFrameQueue.push(EventFrame(EventHeader(), AMQFrame())); + cs)); } // Called in update thread. -void Cluster::updateInDone(const ClusterMap& m, uint64_t eventId_, uint64_t frameId_) { +void Cluster::updateInDone(const ClusterMap& m, uint64_t frameId_) { Lock l(lock); updatedMap = m; - eventId = eventId_; - // Safe to use frameId here because we are stalled: deliveredFrame cannot be called concurrently. + // Safe to set frameId here because we are stalled: deliveredFrame cannot be called concurrently. frameId = frameId_; checkUpdateIn(l); } -void Cluster::checkUpdateIn(Lock& l) { +void Cluster::checkUpdateIn(Lock&) { if (state == UPDATEE && updatedMap) { map = *updatedMap; mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self); state = CATCHUP; + discarding = false; // ok to set, we're stalled for update. QPID_LOG(info, *this << " received update, starting catch-up"); - unstall(l); + deliverEventQueue.start(); } } @@ -485,7 +509,7 @@ void Cluster::updateOutDone(Lock& l) { assert(state == UPDATER); state = READY; mcast.release(); - unstall(l); + deliverEventQueue.start(); // Start processing events again. makeOffer(map.firstJoiner(), l); // Try another offer } @@ -569,15 +593,13 @@ void Cluster::memberUpdate(Lock& l) { mgmtObject->set_memberIDs(idstr); } - // Generate a deliver-close control frame for connections - // belonging to defunct members, so they will be erased in the - // deliverFrameQueue thread. - ConnectionMap::Vector c = connections.values(); - for (ConnectionMap::Vector::iterator i = c.begin(); i != c.end(); ++i) { - ConnectionId cid = (*i)->getId(); - MemberId mid = cid.getMember(); - if (mid != self && !map.isMember(mid)) - connectionFrame(EventFrame(EventHeader(CONTROL, cid), AMQFrame(ClusterConnectionDeliverCloseBody()))); + // Erase connections belonging to members that have left the cluster. + ConnectionMap::iterator i = connections.begin(); + while (i != connections.end()) { + ConnectionMap::iterator j = i++; + MemberId m = j->second->getId().getMember(); + if (m != self && !map.isMember(m)) + connections.erase(j); } } diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index 3e25815b8e..5e66db0097 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -19,33 +19,34 @@ * */ -#include "ClusterSettings.h" #include "ClusterMap.h" -#include "ConnectionMap.h" +#include "ClusterSettings.h" #include "Cpg.h" +#include "Decoder.h" #include "Event.h" +#include "EventFrame.h" +#include "ExpiryPolicy.h" #include "FailoverExchange.h" +#include "LockedConnectionMap.h" #include "Multicaster.h" -#include "EventFrame.h" #include "NoOpConnectionOutputHandler.h" +#include "PollableQueue.h" #include "PollerDispatch.h" #include "Quorum.h" -#include "PollableQueue.h" -#include "ExpiryPolicy.h" +#include "qmf/org/apache/qpid/cluster/Cluster.h" +#include "qpid/Url.h" #include "qpid/broker/Broker.h" -#include "qpid/sys/Monitor.h" #include "qpid/management/Manageable.h" -#include "qpid/Url.h" -#include "qmf/org/apache/qpid/cluster/Cluster.h" +#include "qpid/sys/Monitor.h" -#include <boost/intrusive_ptr.hpp> #include <boost/bind.hpp> +#include <boost/intrusive_ptr.hpp> #include <boost/optional.hpp> #include <algorithm> -#include <vector> #include <map> +#include <vector> namespace qpid { @@ -57,6 +58,7 @@ class Uuid; namespace cluster { class Connection; +class EventFrame; /** * Connection to the cluster @@ -64,7 +66,7 @@ class Connection; class Cluster : private Cpg::Handler, public management::Manageable { public: typedef boost::intrusive_ptr<Connection> ConnectionPtr; - typedef std::vector<ConnectionPtr> Connections; + typedef std::vector<ConnectionPtr> ConnectionVector; // Public functions are thread safe unless otherwise mentioned in a comment. @@ -90,7 +92,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { void leave(); // Update completed - called in update thread - void updateInDone(const ClusterMap&, uint64_t eventId, uint64_t frameId); + void updateInDone(const ClusterMap&, uint64_t frameId); MemberId getId() const; broker::Broker& getBroker() const; @@ -101,15 +103,19 @@ class Cluster : private Cpg::Handler, public management::Manageable { size_t getReadMax() { return readMax; } size_t getWriteEstimate() { return writeEstimate; } - // Process a connection frame. Called by Connection with decoded frames. - // Thread safety: only called in deliverEventQueue thread. - void connectionFrame(const EventFrame&); + void deliverFrame(const EventFrame&); + + // Called only during update by Connection::shadowReady + Decoder& getDecoder() { return decoder; } private: typedef sys::Monitor::ScopedLock Lock; typedef PollableQueue<Event> PollableEventQueue; typedef PollableQueue<EventFrame> PollableFrameQueue; + typedef std::map<ConnectionId, ConnectionPtr> ConnectionMap; + + // FIXME aconway 2009-03-07: sort functions by thread // NB: A dummy Lock& parameter marks functions that must only be // called with Cluster::lock locked. @@ -118,33 +124,33 @@ class Cluster : private Cpg::Handler, public management::Manageable { std::vector<std::string> getIds(Lock&) const; std::vector<Url> getUrls(Lock&) const; - // Make an offer if we can - called in deliver thread. - void makeOffer(const MemberId&, Lock&); - - // Called in main thread from Broker destructor. + // == Called in main thread from Broker destructor. void brokerShutdown(); + // == Called in deliverEventQueue thread + void deliveredEvent(const Event&); + + // == Called in deliverFrameQueue thread + void deliveredFrame(const EventFrame&); + // Cluster controls implement XML methods from cluster.xml. - // Called in deliverEventQueue thread. void updateRequest(const MemberId&, const std::string&, Lock&); void updateOffer(const MemberId& updater, uint64_t updatee, const framing::Uuid&, Lock&); void ready(const MemberId&, const std::string&, Lock&); void configChange(const MemberId&, const std::string& addresses, Lock& l); void messageExpired(const MemberId&, uint64_t, Lock& l); void shutdown(const MemberId&, Lock&); - // Helper, called by updateOffer. - void updateStart(const MemberId& updatee, const Url& url, Lock&); - - // Used by cluster controls. - void stall(Lock&); - void unstall(Lock&); - - // Handlers for pollable queues. - void deliveredEvent(const Event&); - void deliveredFrame(const EventFrame&); + // Helper functions + ConnectionPtr getConnection(const ConnectionId&, Lock&); + ConnectionVector getConnections(Lock&); + void updateStart(const MemberId& updatee, const Url& url, Lock&); + void makeOffer(const MemberId&, Lock&); void setReady(Lock&); + void memberUpdate(Lock&); + void setClusterId(const framing::Uuid&, Lock&); + // == Called in CPG dispatch thread void deliver( // CPG deliver callback. cpg_handle_t /*handle*/, struct cpg_name *group, @@ -153,7 +159,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { void* /*msg*/, int /*msg_len*/); - void deliver(const Event&); + void deliverEvent(const Event&); void configChange( // CPG config change callback. cpg_handle_t /*handle*/, @@ -163,23 +169,21 @@ class Cluster : private Cpg::Handler, public management::Manageable { struct cpg_address */*joined*/, int /*nJoined*/ ); + // == Called in management threads. virtual qpid::management::ManagementObject* GetManagementObject() const; virtual management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text); void stopClusterNode(Lock&); void stopFullCluster(Lock&); - void memberUpdate(Lock&); - // Called in connection IO threads . + // == Called in connection IO threads . void checkUpdateIn(Lock&); - // Called in UpdateClient thread. + // == Called in UpdateClient thread. void updateOutDone(); void updateOutError(const std::exception&); void updateOutDone(Lock&); - void setClusterId(const framing::Uuid&, Lock&); - // Immutable members set on construction, never changed. ClusterSettings settings; broker::Broker& broker; @@ -203,17 +207,23 @@ class Cluster : private Cpg::Handler, public management::Manageable { PollableFrameQueue deliverFrameQueue; boost::shared_ptr<FailoverExchange> failoverExchange; Quorum quorum; - ConnectionMap connections; - - // Used only in deliverFrameQueue thread or in deliverEventQueue thread when stalled. - uint64_t frameId; + LockedConnectionMap localConnections; // Used only during initialization bool initialized; - // Remaining members are protected by lock + // Used only in deliverEventQueue thread or when stalled for update. + Decoder decoder; + bool discarding; + + // Remaining members are protected by lock. + // FIXME aconway 2009-03-06: Most of these members are also only used in + // deliverFrameQueue thread or during stall. Review and separate members + // that require a lock, drop lock when not needed. + // mutable sys::Monitor lock; + // Local cluster state, cluster map enum { INIT, ///< Initial state, no CPG messages received. @@ -226,13 +236,13 @@ class Cluster : private Cpg::Handler, public management::Manageable { LEFT ///< Final state, left the cluster. } state; - uint64_t eventId; + ConnectionMap connections; + uint64_t frameId; ClusterMap map; ClusterMap::Set elders; size_t lastSize; bool lastBroker; sys::Thread updateThread; - sys::Runnable* updateTask; boost::optional<ClusterMap> updatedMap; diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 4391b3eccb..1889b37e9f 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -150,7 +150,8 @@ bool Connection::checkUnsupported(const AMQBody& body) { void Connection::deliveredFrame(const EventFrame& f) { assert(!catchUp); currentChannel = f.frame.getChannel(); - if (!framing::invoke(*this, *f.frame.getBody()).wasHandled() // Connection contol. + if (f.frame.getBody() // frame can be emtpy with just readCredit + && !framing::invoke(*this, *f.frame.getBody()).wasHandled() // Connection contol. && !checkUnsupported(*f.frame.getBody())) // Unsupported operation. { if (f.type == DATA) // incoming data frames to broker::Connection @@ -234,29 +235,6 @@ size_t Connection::decode(const char* buffer, size_t size) { return size; } -// Decode a data event, a read buffer that has been delivered by the cluster. -void Connection::decode(const EventHeader& eh, const void* data) { - assert(eh.getType() == DATA); // Only handle connection data events. - const char* cp = static_cast<const char*>(data); - Buffer buf(const_cast<char*>(cp), eh.getSize()); - if (clusterDecoder.decode(buf)) { // Decoded a frame - AMQFrame frame(clusterDecoder.getFrame()); - while (clusterDecoder.decode(buf)) { - cluster.connectionFrame(EventFrame(eh, frame)); - frame = clusterDecoder.getFrame(); - } - // Set read-credit on the last frame ending in this event. - // Credit will be given when this frame is processed. - cluster.connectionFrame(EventFrame(eh, frame, 1)); - } - else { - // We must give 1 unit read credit per event. - // This event does not complete any frames so - // we give read credit directly. - giveReadCredit(1); - } -} - broker::SessionState& Connection::sessionState() { return *connection.getChannel(currentChannel).getSession(); } @@ -297,12 +275,13 @@ void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const str QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadowId); self = shadowId; connection.setUserId(username); - clusterDecoder.setFragment(fragment.data(), fragment.size()); + // OK to use decoder here because we are stalled for update. + cluster.getDecoder().get(self).setFragment(fragment.data(), fragment.size()); } -void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t eventId, uint64_t frameId) { +void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t frameId) { QPID_LOG(debug, cluster << " incoming update complete on connection " << *this); - cluster.updateInDone(ClusterMap(joiners, members), eventId, frameId); + cluster.updateInDone(ClusterMap(joiners, members), frameId); self.second = 0; // Mark this as completed update connection. } diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index 9f126d68c4..0659015672 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -34,8 +34,8 @@ #include "qpid/sys/AtomicValue.h" #include "qpid/sys/ConnectionInputHandler.h" #include "qpid/sys/ConnectionOutputHandler.h" -#include "qpid/framing/FrameDecoder.h" #include "qpid/framing/SequenceNumber.h" +#include "qpid/framing/FrameDecoder.h" #include <iosfwd> @@ -103,9 +103,6 @@ class Connection : // ConnectionCodec methods - called by IO layer with a read buffer. size_t decode(const char* buffer, size_t size); - // Decode a data event, a read buffer that has been delivered by the cluster. - void decode(const EventHeader& eh, const void* data); - // Called for data delivered from the cluster. void deliveredFrame(const EventFrame&); @@ -123,7 +120,7 @@ class Connection : void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username, const std::string& fragment); - void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t eventId, uint64_t frameId); + void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t frameId); void deliveryRecord(const std::string& queue, const framing::SequenceNumber& position, @@ -153,8 +150,6 @@ class Connection : void giveReadCredit(int credit); - framing::FrameDecoder& getDecoder() { return clusterDecoder; } - private: struct NullFrameHandler : public framing::FrameHandler { void handle(framing::AMQFrame&) {} @@ -179,7 +174,6 @@ class Connection : WriteEstimate writeEstimate; OutputInterceptor output; framing::FrameDecoder localDecoder; - framing::FrameDecoder clusterDecoder; broker::Connection connection; framing::SequenceNumber deliverSeq; framing::ChannelId currentChannel; diff --git a/cpp/src/qpid/cluster/ConnectionMap.cpp b/cpp/src/qpid/cluster/ConnectionMap.cpp deleted file mode 100644 index d4b2aa6675..0000000000 --- a/cpp/src/qpid/cluster/ConnectionMap.cpp +++ /dev/null @@ -1,84 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "ConnectionMap.h" -#include "Cluster.h" -#include "qpid/framing/reply_exceptions.h" -#include "qpid/log/Statement.h" -#include "qpid/assert.h" - -namespace qpid { -namespace cluster { - -using framing::InternalErrorException; -typedef sys::Mutex::ScopedLock Lock; - -void ConnectionMap::insert(ConnectionPtr p) { - Lock l(lock); - std::pair<Map::iterator, bool> ib = map.insert(Map::value_type(p->getId(), p)); - QPID_ASSERT(ib.second); -} - -void ConnectionMap::erase(const ConnectionId& id) { - Lock l(lock); - size_t erased = map.erase(id); - assert(erased); - (void)erased; // Avoid unused variable warnings. -} - -ConnectionMap::ConnectionPtr ConnectionMap::get(const ConnectionId& id) { - Lock l(lock); - Map::const_iterator i = map.find(id); - if (i == map.end()) { - // Deleted local connection. - if(id.getMember() == cluster.getId()) - return 0; - // New remote connection, create a shadow. - std::ostringstream mgmtId; - mgmtId << id; - ConnectionPtr cp = new Connection(cluster, shadowOut, mgmtId.str(), id); - std::pair<Map::iterator, bool> ib = map.insert(Map::value_type(id, cp)); - QPID_ASSERT(ib.second); - i = ib.first; - } - return i->second; -} - -ConnectionMap::Vector ConnectionMap::values() const { - Lock l(lock); - Vector result(map.size()); - std::transform(map.begin(), map.end(), result.begin(), - boost::bind(&Map::value_type::second, _1)); - return result; -} - -void ConnectionMap::clear() { - Lock l(lock); - map.clear(); -} - -void ConnectionMap::decode(const EventHeader& eh, const void* data) { - ConnectionPtr connection = get(eh.getConnectionId()); - if (connection) - connection->decode(eh, data); -} - - -}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/ConnectionMap.h b/cpp/src/qpid/cluster/ConnectionMap.h deleted file mode 100644 index b449f329b1..0000000000 --- a/cpp/src/qpid/cluster/ConnectionMap.h +++ /dev/null @@ -1,84 +0,0 @@ -#ifndef QPID_CLUSTER_CONNECTIONMAP_H -#define QPID_CLUSTER_CONNECTIONMAP_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "types.h" -#include "Connection.h" -#include "ClusterMap.h" -#include "NoOpConnectionOutputHandler.h" -#include "qpid/sys/Mutex.h" -#include <boost/intrusive_ptr.hpp> -#include <map> - -namespace qpid { -namespace cluster { - -class Cluster; - -/** - * Thread safe map of connections. The map is used in: - * - deliver thread to look connections and create new shadow connections. - * - local catch-up connection threads to add a caught-up shadow connections. - * - local client connection threads when local connections are created. - */ -class ConnectionMap { - public: - typedef boost::intrusive_ptr<cluster::Connection> ConnectionPtr; - typedef std::vector<ConnectionPtr> Vector; - - ConnectionMap(Cluster& c) : cluster(c) {} - - /** Insert a local connection or a caught up shadow connection. - * Called in local connection thread. - */ - void insert(ConnectionPtr p); - - /** Erase a closed connection. Called in deliver thread. */ - void erase(const ConnectionId& id); - - /** Get an existing connection. Returns 0 if id is a closed local - * connections, frames for closed connections should be ignored. - */ - ConnectionPtr get(const ConnectionId& id); - - /** Get connections for sending an update. */ - Vector values() const; - - /** Decode a connection data event. */ - void decode(const EventHeader& eh, const void* data); - - void clear(); - size_t size() const; - - private: - typedef std::map<ConnectionId, ConnectionPtr> Map; - - mutable sys::Mutex lock; - Cluster& cluster; - NoOpConnectionOutputHandler shadowOut; - Map map; -}; - - -}} // namespace qpid::cluster - -#endif /*!QPID_CLUSTER_CONNECTIONMAP_H*/ diff --git a/cpp/src/qpid/cluster/Decoder.cpp b/cpp/src/qpid/cluster/Decoder.cpp new file mode 100644 index 0000000000..4de586d89f --- /dev/null +++ b/cpp/src/qpid/cluster/Decoder.cpp @@ -0,0 +1,60 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Decoder.h" +#include "EventFrame.h" +#include "qpid/framing/ClusterConnectionDeliverCloseBody.h" +#include "qpid/framing/Buffer.h" +#include "qpid/framing/AMQFrame.h" + + +namespace qpid { +namespace cluster { + +void Decoder::decode(const EventHeader& eh, const char* data) { + assert(eh.getType() == DATA); // Only handle connection data events. + const char* cp = static_cast<const char*>(data); + framing::Buffer buf(const_cast<char*>(cp), eh.getSize()); + framing::FrameDecoder& decoder = map[eh.getConnectionId()]; + if (decoder.decode(buf)) { // Decoded a frame + framing::AMQFrame frame(decoder.getFrame()); + while (decoder.decode(buf)) { + process(EventFrame(eh, frame)); + frame = decoder.getFrame(); + } + // Set read-credit on the last frame ending in this event. + // Credit will be given when this frame is processed. + process(EventFrame(eh, frame, 1)); + } + else { + // We must give 1 unit read credit per event. + // This event does not complete any frames so + // send an empty frame with the read credit. + process(EventFrame(EventHeader(), framing::AMQFrame(), 1)); + } +} + +void Decoder::process(const EventFrame& ef) { + if (ef.frame.getMethod() && ef.frame.getMethod()->isA<framing::ClusterConnectionDeliverCloseBody>()) + map.erase(ef.connectionId); + callback(ef); +} + +}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Decoder.h b/cpp/src/qpid/cluster/Decoder.h new file mode 100644 index 0000000000..acde4258a2 --- /dev/null +++ b/cpp/src/qpid/cluster/Decoder.h @@ -0,0 +1,57 @@ +#ifndef QPID_CLUSTER_DECODER_H +#define QPID_CLUSTER_DECODER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "types.h" +#include "qpid/framing/FrameDecoder.h" +#include <boost/function.hpp> +#include <map> + +namespace qpid { +namespace cluster { + +class EventFrame; +class EventHeader; + +/** + * A map of decoders for connections. + */ +class Decoder +{ + public: + typedef boost::function<void(const EventFrame&)> FrameHandler; + + Decoder(FrameHandler fh) : callback(fh) {} + void decode(const EventHeader& eh, const char* data); + void erase(const ConnectionId&); + framing::FrameDecoder& get(const ConnectionId& c) { return map[c]; } + + private: + typedef std::map<ConnectionId, framing::FrameDecoder> Map; + Map map; + void process(const EventFrame&); + FrameHandler callback; +}; +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_DECODER_H*/ diff --git a/cpp/src/qpid/cluster/Event.cpp b/cpp/src/qpid/cluster/Event.cpp index ccb4d3ede8..1cb010c266 100644 --- a/cpp/src/qpid/cluster/Event.cpp +++ b/cpp/src/qpid/cluster/Event.cpp @@ -44,7 +44,7 @@ const size_t EventHeader::HEADER_SIZE = ; EventHeader::EventHeader(EventType t, const ConnectionId& c, size_t s) - : type(t), connectionId(c), size(s), id(0) {} + : type(t), connectionId(c), size(s) {} Event::Event() {} @@ -128,7 +128,7 @@ std::ostream& operator << (std::ostream& o, EventType t) { } std::ostream& operator << (std::ostream& o, const EventHeader& e) { - o << "Event[id=" << e.getId() << " connection=" << e.getConnectionId() << " " << e.getType() << " " << e.getSize() << " bytes]"; + o << "Event[" << e.getConnectionId() << " " << e.getType() << " " << e.getSize() << " bytes]"; return o; } diff --git a/cpp/src/qpid/cluster/Event.h b/cpp/src/qpid/cluster/Event.h index 382a550015..e05ad60bcf 100644 --- a/cpp/src/qpid/cluster/Event.h +++ b/cpp/src/qpid/cluster/Event.h @@ -57,11 +57,9 @@ class EventHeader : public ::qpid::sys::LatencyMetricTimestamp { /** Size of header + payload. */ size_t getStoreSize() { return size + HEADER_SIZE; } - uint64_t getId() const { return id; } - void setId(uint64_t n) { id = n; } - bool isCluster() const { return connectionId.getNumber() == 0; } bool isConnection() const { return connectionId.getNumber() != 0; } + bool isControl() const { return type == CONTROL; } protected: static const size_t HEADER_SIZE; @@ -69,7 +67,6 @@ class EventHeader : public ::qpid::sys::LatencyMetricTimestamp { EventType type; ConnectionId connectionId; size_t size; - uint64_t id; }; /** diff --git a/cpp/src/qpid/cluster/EventFrame.cpp b/cpp/src/qpid/cluster/EventFrame.cpp index 4de76eafbe..1a57528c3a 100644 --- a/cpp/src/qpid/cluster/EventFrame.cpp +++ b/cpp/src/qpid/cluster/EventFrame.cpp @@ -24,16 +24,16 @@ namespace qpid { namespace cluster { -EventFrame::EventFrame() : eventId(0) {} +EventFrame::EventFrame() {} EventFrame::EventFrame(const EventHeader& e, const framing::AMQFrame& f, int rc) - : connectionId(e.getConnectionId()), frame(f), eventId(e.getId()), readCredit(rc), type(e.getType()) + : connectionId(e.getConnectionId()), frame(f), readCredit(rc), type(e.getType()) { QPID_LATENCY_INIT(frame); } std::ostream& operator<<(std::ostream& o, const EventFrame& e) { - return o << e.frame << "(from event " << e.eventId << " read-credit=" << e.readCredit << ")"; + return o << e.frame << " " << e.type << " " << e.connectionId << " read-credit=" << e.readCredit; } }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/EventFrame.h b/cpp/src/qpid/cluster/EventFrame.h index bb2d9d5493..d6ff58dd38 100644 --- a/cpp/src/qpid/cluster/EventFrame.h +++ b/cpp/src/qpid/cluster/EventFrame.h @@ -47,16 +47,8 @@ struct EventFrame bool isLastInEvent() const { return readCredit; } - // True if this frame follows immediately after frame e. - bool follows(const EventFrame& e) const { - return eventId == e.eventId || (eventId == e.eventId+1 && e.readCredit); - } - - bool operator<(const EventFrame& e) const { return eventId < e.eventId; } - ConnectionId connectionId; framing::AMQFrame frame; - uint64_t eventId; int readCredit; ///< last frame in an event, give credit when processed. EventType type; }; diff --git a/cpp/src/qpid/cluster/LockedConnectionMap.h b/cpp/src/qpid/cluster/LockedConnectionMap.h new file mode 100644 index 0000000000..8b2f6dae8e --- /dev/null +++ b/cpp/src/qpid/cluster/LockedConnectionMap.h @@ -0,0 +1,62 @@ +#ifndef QPID_CLUSTER_LOCKEDCONNECTIONMAP_H +#define QPID_CLUSTER_LOCKEDCONNECTIONMAP_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "types.h" +#include "qpid/sys/Mutex.h" +#include "Connection.h" + +namespace qpid { +namespace cluster { + +/** + * Thread safe map of connections. + */ +class LockedConnectionMap +{ + public: + void insert(const ConnectionPtr& c) { + sys::Mutex::ScopedLock l(lock); + map[c->getId()] = c; + } + + ConnectionPtr getErase(const ConnectionId& c) { + sys::Mutex::ScopedLock l(lock); + Map::iterator i = map.find(c); + if (i != map.end()) { + ConnectionPtr cp = i->second; + map.erase(i); + return cp; + } + else + return 0; + } + + private: + typedef std::map<ConnectionId, ConnectionPtr> Map; + mutable sys::Mutex lock; + Map map; +}; +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_LOCKEDCONNECTIONMAP_H*/ diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index d5e6635c45..cf1633e40b 100644 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -22,6 +22,7 @@ #include "Cluster.h" #include "ClusterMap.h" #include "Connection.h" +#include "Decoder.h" #include "qpid/client/SessionBase_0_10Access.h" #include "qpid/client/ConnectionAccess.h" #include "qpid/broker/Broker.h" @@ -86,14 +87,14 @@ void send(client::AsyncSession& s, const AMQBody& body) { // TODO aconway 2008-09-24: optimization: update connections/sessions in parallel. UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, const Url& url, - broker::Broker& broker, const ClusterMap& m, uint64_t eventId_, uint64_t frameId_, - const Cluster::Connections& cons, + broker::Broker& broker, const ClusterMap& m, uint64_t frameId_, + const Cluster::ConnectionVector& cons, Decoder& decoder_, const boost::function<void()>& ok, const boost::function<void(const std::exception&)>& fail, const client::ConnectionSettings& cs ) : updaterId(updater), updateeId(updatee), updateeUrl(url), updaterBroker(broker), map(m), - eventId(eventId_), frameId(frameId_), connections(cons), + frameId(frameId_), connections(cons), decoder(decoder_), connection(catchUpConnection()), shadowConnection(catchUpConnection()), done(ok), failed(fail), connectionSettings(cs) { @@ -130,7 +131,6 @@ void UpdateClient::update() { ClusterConnectionMembershipBody membership; map.toMethodBody(membership); - membership.setEventId(eventId); membership.setFrameId(frameId); AMQFrame frame(membership); client::ConnectionAccess::getImpl(connection)->handle(frame); @@ -232,7 +232,8 @@ void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& upda connectionSettings.maxFrameSize = bc.getFrameMax(); shadowConnection.open(updateeUrl, connectionSettings); bc.eachSessionHandler(boost::bind(&UpdateClient::updateSession, this, _1)); - std::pair<const char*, size_t> fragment = updateConnection->getDecoder().getFragment(); + // Safe to use decoder here because we are stalled for update. + std::pair<const char*, size_t> fragment = decoder.get(updateConnection->getId()).getFragment(); ClusterConnectionProxy(shadowConnection).shadowReady( updateConnection->getId().getMember(), updateConnection->getId().getNumber(), diff --git a/cpp/src/qpid/cluster/UpdateClient.h b/cpp/src/qpid/cluster/UpdateClient.h index d6b821904f..a0813d0a17 100644 --- a/cpp/src/qpid/cluster/UpdateClient.h +++ b/cpp/src/qpid/cluster/UpdateClient.h @@ -46,6 +46,7 @@ class SessionHandler; class DeliveryRecord; class SessionState; class SemanticState; +class Decoder; } // namespace broker @@ -54,6 +55,7 @@ namespace cluster { class Cluster; class Connection; class ClusterMap; +class Decoder; /** * A client that updates the contents of a local broker to a remote one using AMQP. @@ -63,8 +65,8 @@ class UpdateClient : public sys::Runnable { static const std::string UPDATE; // Name for special update queue and exchange. UpdateClient(const MemberId& updater, const MemberId& updatee, const Url&, - broker::Broker& donor, const ClusterMap& map, uint64_t eventId, uint64_t frameId, - const std::vector<boost::intrusive_ptr<Connection> >& , + broker::Broker& donor, const ClusterMap& map, uint64_t frameId, + const std::vector<boost::intrusive_ptr<Connection> >&, Decoder&, const boost::function<void()>& done, const boost::function<void(const std::exception&)>& fail, const client::ConnectionSettings& @@ -92,9 +94,9 @@ class UpdateClient : public sys::Runnable { Url updateeUrl; broker::Broker& updaterBroker; ClusterMap map; - uint64_t eventId; uint64_t frameId; std::vector<boost::intrusive_ptr<Connection> > connections; + Decoder& decoder; client::Connection connection, shadowConnection; client::AsyncSession session, shadowSession; boost::function<void()> done; diff --git a/cpp/src/qpid/framing/AMQFrame.cpp b/cpp/src/qpid/framing/AMQFrame.cpp index 80c8e0b56d..e56cf5c546 100644 --- a/cpp/src/qpid/framing/AMQFrame.cpp +++ b/cpp/src/qpid/framing/AMQFrame.cpp @@ -35,13 +35,14 @@ void AMQFrame::init() { subchannel=0; channel=0; encodedSizeCache = 0; + clusterId = 0; } AMQFrame::AMQFrame(const boost::intrusive_ptr<AMQBody>& b) : body(b) { init(); } AMQFrame::AMQFrame(const AMQBody& b) : body(b.clone()) { init(); } -AMQFrame::~AMQFrame() {} +AMQFrame::~AMQFrame() { init(); } AMQBody* AMQFrame::getBody() { // Non-const AMQBody* may be used to modify the body. |