summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/cluster/Connection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.cpp53
1 files changed, 36 insertions, 17 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp
index aa7d082720..97cafbabaa 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.cpp
+++ b/qpid/cpp/src/qpid/cluster/Connection.cpp
@@ -39,8 +39,6 @@
#include "qpid/framing/ConnectionCloseBody.h"
#include "qpid/framing/ConnectionCloseOkBody.h"
#include "qpid/log/Statement.h"
-#include "qpid/sys/LatencyMetric.h"
-#include "qpid/sys/AtomicValue.h"
#include <boost/current_function.hpp>
@@ -56,8 +54,16 @@ namespace qpid {
namespace cluster {
using namespace framing;
+using namespace framing::cluster;
+
+qpid::sys::AtomicValue<uint64_t> Connection::catchUpId(0x5000000000000000LL);
+
+Connection::NullFrameHandler Connection::nullFrameHandler;
+
+struct NullFrameHandler : public framing::FrameHandler {
+ void handle(framing::AMQFrame&) {}
+};
-NoOpConnectionOutputHandler Connection::discardHandler;
namespace {
sys::AtomicValue<uint64_t> idCounter;
@@ -89,6 +95,8 @@ void Connection::init() {
connection.setClusterOrderOutput(nullFrameHandler); // Passive, discard cluster-order frames
connection.setClientThrottling(false); // Disable client throttling, done by active node.
}
+ if (!isCatchUp())
+ connection.setErrorListener(this);
}
void Connection::giveReadCredit(int credit) {
@@ -97,6 +105,7 @@ void Connection::giveReadCredit(int credit) {
}
Connection::~Connection() {
+ connection.setErrorListener(0);
QPID_LOG(debug, cluster << " deleted connection: " << *this);
}
@@ -126,7 +135,7 @@ void Connection::received(framing::AMQFrame& f) {
cluster.addShadowConnection(this);
AMQFrame ok((ConnectionCloseOkBody()));
connection.getOutput().send(ok);
- output.closeOutput(discardHandler);
+ output.closeOutput();
catchUp = false;
}
else
@@ -156,8 +165,8 @@ void Connection::deliveredFrame(const EventFrame& f) {
{
if (f.type == DATA) // incoming data frames to broker::Connection
connection.received(const_cast<AMQFrame&>(f.frame));
- else { // frame control, send frame via SessionState
- broker::SessionState* ss = connection.getChannel(f.frame.getChannel()).getSession();
+ else { // frame control, send frame via SessionState
+ broker::SessionState* ss = connection.getChannel(currentChannel).getSession();
if (ss) ss->out(const_cast<AMQFrame&>(f.frame));
}
}
@@ -180,7 +189,7 @@ void Connection::closed() {
// This was a local replicated connection. Multicast a deliver
// closed and process any outstanding frames from the cluster
// until self-delivery of deliver-close.
- output.closeOutput(discardHandler);
+ output.closeOutput();
cluster.getMulticast().mcastControl(ClusterConnectionDeliverCloseBody(), self);
}
}
@@ -275,13 +284,14 @@ void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const str
QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadowId);
self = shadowId;
connection.setUserId(username);
- // OK to use decoder here because we are stalled for update.
+ // OK to use decoder here because cluster is stalled for update.
cluster.getDecoder().get(self).setFragment(fragment.data(), fragment.size());
+ connection.setErrorListener(this);
}
-void Connection::membership(const FieldTable& joiners, const FieldTable& members) {
+void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t frameSeq) {
QPID_LOG(debug, cluster << " incoming update complete on connection " << *this);
- cluster.updateInDone(ClusterMap(joiners, members));
+ cluster.updateInDone(ClusterMap(joiners, members, frameSeq));
self.second = 0; // Mark this as completed update connection.
}
@@ -305,7 +315,9 @@ shared_ptr<broker::Queue> Connection::findQueue(const std::string& qname) {
}
broker::QueuedMessage Connection::getUpdateMessage() {
- broker::QueuedMessage m = findQueue(UpdateClient::UPDATE)->get();
+ shared_ptr<broker::Queue> updateq = findQueue(UpdateClient::UPDATE);
+ assert(!updateq->isDurable());
+ broker::QueuedMessage m = updateq->get();
if (!m.payload) throw Exception(QPID_MSG(cluster << " empty update queue"));
return m;
}
@@ -342,15 +354,15 @@ void Connection::deliveryRecord(const string& qname,
// If the message was unacked, the newbie broker must place
// it in its messageStore.
- if ( m.payload && m.payload->isPersistent() && !completed && !ended && !accepted && !cancelled )
+ if ( m.payload && m.payload->isPersistent() && acquired && !ended)
queue->enqueue ( 0, m.payload );
}
void Connection::queuePosition(const string& qname, const SequenceNumber& position) {
- shared_ptr<broker::Queue> q = cluster.getBroker().getQueues().find(qname);
- if (!q) throw InvalidArgumentException(QPID_MSG("Invalid queue name " << qname));
- q->setPosition(position);
-}
+ shared_ptr<broker::Queue> q = cluster.getBroker().getQueues().find(qname);
+ if (!q) throw InvalidArgumentException(QPID_MSG("Invalid queue name " << qname));
+ q->setPosition(position);
+ }
void Connection::expiryId(uint64_t id) {
cluster.getExpiryPolicy().setId(id);
@@ -407,7 +419,14 @@ void Connection::queue(const std::string& encoded) {
QPID_LOG(debug, cluster << " decoded queue " << q->getName());
}
-qpid::sys::AtomicValue<uint64_t> Connection::catchUpId(0x5000000000000000LL);
+void Connection::sessionError(uint16_t , const std::string& ) {
+ cluster.flagError(*this, ERROR_TYPE_SESSION);
+
+}
+
+void Connection::connectionError(const std::string& ) {
+ cluster.flagError(*this, ERROR_TYPE_CONNECTION);
+}
}} // namespace qpid::cluster