diff options
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Connection.cpp | 53 |
1 files changed, 36 insertions, 17 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp index aa7d082720..97cafbabaa 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.cpp +++ b/qpid/cpp/src/qpid/cluster/Connection.cpp @@ -39,8 +39,6 @@ #include "qpid/framing/ConnectionCloseBody.h" #include "qpid/framing/ConnectionCloseOkBody.h" #include "qpid/log/Statement.h" -#include "qpid/sys/LatencyMetric.h" -#include "qpid/sys/AtomicValue.h" #include <boost/current_function.hpp> @@ -56,8 +54,16 @@ namespace qpid { namespace cluster { using namespace framing; +using namespace framing::cluster; + +qpid::sys::AtomicValue<uint64_t> Connection::catchUpId(0x5000000000000000LL); + +Connection::NullFrameHandler Connection::nullFrameHandler; + +struct NullFrameHandler : public framing::FrameHandler { + void handle(framing::AMQFrame&) {} +}; -NoOpConnectionOutputHandler Connection::discardHandler; namespace { sys::AtomicValue<uint64_t> idCounter; @@ -89,6 +95,8 @@ void Connection::init() { connection.setClusterOrderOutput(nullFrameHandler); // Passive, discard cluster-order frames connection.setClientThrottling(false); // Disable client throttling, done by active node. } + if (!isCatchUp()) + connection.setErrorListener(this); } void Connection::giveReadCredit(int credit) { @@ -97,6 +105,7 @@ void Connection::giveReadCredit(int credit) { } Connection::~Connection() { + connection.setErrorListener(0); QPID_LOG(debug, cluster << " deleted connection: " << *this); } @@ -126,7 +135,7 @@ void Connection::received(framing::AMQFrame& f) { cluster.addShadowConnection(this); AMQFrame ok((ConnectionCloseOkBody())); connection.getOutput().send(ok); - output.closeOutput(discardHandler); + output.closeOutput(); catchUp = false; } else @@ -156,8 +165,8 @@ void Connection::deliveredFrame(const EventFrame& f) { { if (f.type == DATA) // incoming data frames to broker::Connection connection.received(const_cast<AMQFrame&>(f.frame)); - else { // frame control, send frame via SessionState - broker::SessionState* ss = connection.getChannel(f.frame.getChannel()).getSession(); + else { // frame control, send frame via SessionState + broker::SessionState* ss = connection.getChannel(currentChannel).getSession(); if (ss) ss->out(const_cast<AMQFrame&>(f.frame)); } } @@ -180,7 +189,7 @@ void Connection::closed() { // This was a local replicated connection. Multicast a deliver // closed and process any outstanding frames from the cluster // until self-delivery of deliver-close. - output.closeOutput(discardHandler); + output.closeOutput(); cluster.getMulticast().mcastControl(ClusterConnectionDeliverCloseBody(), self); } } @@ -275,13 +284,14 @@ void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const str QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadowId); self = shadowId; connection.setUserId(username); - // OK to use decoder here because we are stalled for update. + // OK to use decoder here because cluster is stalled for update. cluster.getDecoder().get(self).setFragment(fragment.data(), fragment.size()); + connection.setErrorListener(this); } -void Connection::membership(const FieldTable& joiners, const FieldTable& members) { +void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t frameSeq) { QPID_LOG(debug, cluster << " incoming update complete on connection " << *this); - cluster.updateInDone(ClusterMap(joiners, members)); + cluster.updateInDone(ClusterMap(joiners, members, frameSeq)); self.second = 0; // Mark this as completed update connection. } @@ -305,7 +315,9 @@ shared_ptr<broker::Queue> Connection::findQueue(const std::string& qname) { } broker::QueuedMessage Connection::getUpdateMessage() { - broker::QueuedMessage m = findQueue(UpdateClient::UPDATE)->get(); + shared_ptr<broker::Queue> updateq = findQueue(UpdateClient::UPDATE); + assert(!updateq->isDurable()); + broker::QueuedMessage m = updateq->get(); if (!m.payload) throw Exception(QPID_MSG(cluster << " empty update queue")); return m; } @@ -342,15 +354,15 @@ void Connection::deliveryRecord(const string& qname, // If the message was unacked, the newbie broker must place // it in its messageStore. - if ( m.payload && m.payload->isPersistent() && !completed && !ended && !accepted && !cancelled ) + if ( m.payload && m.payload->isPersistent() && acquired && !ended) queue->enqueue ( 0, m.payload ); } void Connection::queuePosition(const string& qname, const SequenceNumber& position) { - shared_ptr<broker::Queue> q = cluster.getBroker().getQueues().find(qname); - if (!q) throw InvalidArgumentException(QPID_MSG("Invalid queue name " << qname)); - q->setPosition(position); -} + shared_ptr<broker::Queue> q = cluster.getBroker().getQueues().find(qname); + if (!q) throw InvalidArgumentException(QPID_MSG("Invalid queue name " << qname)); + q->setPosition(position); + } void Connection::expiryId(uint64_t id) { cluster.getExpiryPolicy().setId(id); @@ -407,7 +419,14 @@ void Connection::queue(const std::string& encoded) { QPID_LOG(debug, cluster << " decoded queue " << q->getName()); } -qpid::sys::AtomicValue<uint64_t> Connection::catchUpId(0x5000000000000000LL); +void Connection::sessionError(uint16_t , const std::string& ) { + cluster.flagError(*this, ERROR_TYPE_SESSION); + +} + +void Connection::connectionError(const std::string& ) { + cluster.flagError(*this, ERROR_TYPE_CONNECTION); +} }} // namespace qpid::cluster |