summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/cluster/Connection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.cpp728
1 files changed, 728 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp
new file mode 100644
index 0000000000..b9895290e9
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/Connection.cpp
@@ -0,0 +1,728 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/amqp_0_10/Codecs.h"
+#include "Connection.h"
+#include "UpdateClient.h"
+#include "Cluster.h"
+#include "UpdateReceiver.h"
+#include "qpid/assert.h"
+#include "qpid/broker/SessionState.h"
+#include "qpid/broker/SemanticState.h"
+#include "qpid/broker/TxBuffer.h"
+#include "qpid/broker/TxPublish.h"
+#include "qpid/broker/TxAccept.h"
+#include "qpid/broker/RecoveredEnqueue.h"
+#include "qpid/broker/RecoveredDequeue.h"
+#include "qpid/broker/Exchange.h"
+#include "qpid/broker/Fairshare.h"
+#include "qpid/broker/Link.h"
+#include "qpid/broker/Bridge.h"
+#include "qpid/broker/StatefulQueueObserver.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/framing/enum.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/AllInvoker.h"
+#include "qpid/framing/DeliveryProperties.h"
+#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
+#include "qpid/framing/ClusterConnectionAnnounceBody.h"
+#include "qpid/framing/ConnectionCloseBody.h"
+#include "qpid/framing/ConnectionCloseOkBody.h"
+#include "qpid/log/Statement.h"
+#include "qpid/sys/ClusterSafe.h"
+#include "qpid/types/Variant.h"
+#include "qpid/management/ManagementAgent.h"
+#include <boost/current_function.hpp>
+
+
+namespace qpid {
+namespace cluster {
+
+using namespace framing;
+using namespace framing::cluster;
+using amqp_0_10::ListCodec;
+using types::Variant;
+
+qpid::sys::AtomicValue<uint64_t> Connection::catchUpId(0x5000000000000000LL);
+
+Connection::NullFrameHandler Connection::nullFrameHandler;
+
+struct NullFrameHandler : public framing::FrameHandler {
+ void handle(framing::AMQFrame&) {}
+};
+
+
+namespace {
+sys::AtomicValue<uint64_t> idCounter;
+const std::string shadowPrefix("[shadow]");
+}
+
+
+// Shadow connection
+Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
+ const std::string& mgmtId,
+ const ConnectionId& id, const qpid::sys::SecuritySettings& external)
+ : cluster(c), self(id), catchUp(false), announced(false), output(*this, out),
+ connectionCtor(&output, cluster.getBroker(), mgmtId, external, false, 0, true),
+ expectProtocolHeader(false),
+ mcastFrameHandler(cluster.getMulticast(), self),
+ updateIn(c.getUpdateReceiver()),
+ secureConnection(0)
+{}
+
+// Local connection
+Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
+ const std::string& mgmtId, MemberId member,
+ bool isCatchUp, bool isLink, const qpid::sys::SecuritySettings& external
+) : cluster(c), self(member, ++idCounter), catchUp(isCatchUp), announced(false), output(*this, out),
+ connectionCtor(&output, cluster.getBroker(),
+ mgmtId,
+ external,
+ isLink,
+ isCatchUp ? ++catchUpId : 0,
+ isCatchUp), // isCatchUp => shadow
+ expectProtocolHeader(isLink),
+ mcastFrameHandler(cluster.getMulticast(), self),
+ updateIn(c.getUpdateReceiver()),
+ secureConnection(0)
+{
+ if (isLocalClient()) {
+ giveReadCredit(cluster.getSettings().readMax); // Flow control
+ // Delay adding the connection to the management map until announce()
+ connectionCtor.delayManagement = true;
+ }
+ else {
+ // Catch-up shadow connections initialized using nextShadow id.
+ assert(catchUp);
+ if (!updateIn.nextShadowMgmtId.empty())
+ connectionCtor.mgmtId = updateIn.nextShadowMgmtId;
+ updateIn.nextShadowMgmtId.clear();
+ }
+ init();
+ QPID_LOG(debug, cluster << " local connection " << *this);
+}
+
+void Connection::setSecureConnection(broker::SecureConnection* sc) {
+ secureConnection = sc;
+ if (connection.get()) connection->setSecureConnection(sc);
+}
+
+void Connection::init() {
+ connection = connectionCtor.construct();
+ if (isLocalClient()) {
+ if (secureConnection) connection->setSecureConnection(secureConnection);
+ // Actively send cluster-order frames from local node
+ connection->setClusterOrderOutput(mcastFrameHandler);
+ }
+ else { // Shadow or catch-up connection
+ // Passive, discard cluster-order frames
+ connection->setClusterOrderOutput(nullFrameHandler);
+ // Disable client throttling, done by active node.
+ connection->setClientThrottling(false);
+ }
+ if (!isCatchUp())
+ connection->setErrorListener(this);
+}
+
+// Called when we have consumed a read buffer to give credit to the
+// connection layer to continue reading.
+void Connection::giveReadCredit(int credit) {
+ if (cluster.getSettings().readMax && credit)
+ output.giveReadCredit(credit);
+}
+
+void Connection::announce(
+ const std::string& mgmtId, uint32_t ssf, const std::string& authid, bool nodict,
+ const std::string& username, const std::string& initialFrames)
+{
+ QPID_ASSERT(mgmtId == connectionCtor.mgmtId);
+ QPID_ASSERT(ssf == connectionCtor.external.ssf);
+ QPID_ASSERT(authid == connectionCtor.external.authid);
+ QPID_ASSERT(nodict == connectionCtor.external.nodict);
+ // Local connections are already initialized but with management delayed.
+ if (isLocalClient()) {
+ connection->addManagementObject();
+ }
+ else if (isShadow()) {
+ init();
+ // Play initial frames into the connection.
+ Buffer buf(const_cast<char*>(initialFrames.data()), initialFrames.size());
+ AMQFrame frame;
+ while (frame.decode(buf))
+ connection->received(frame);
+ connection->setUserId(username);
+ }
+ // Do managment actions now that the connection is replicated.
+ connection->raiseConnectEvent();
+ QPID_LOG(debug, cluster << " replicated connection " << *this);
+}
+
+Connection::~Connection() {
+ if (connection.get()) connection->setErrorListener(0);
+ // Don't trigger cluster-safe asserts in broker:: ~Connection as
+ // it may be called in an IO thread context during broker
+ // shutdown.
+ sys::ClusterSafeScope css;
+ connection.reset();
+}
+
+bool Connection::doOutput() {
+ return output.doOutput();
+}
+
+// Received from a directly connected client.
+void Connection::received(framing::AMQFrame& f) {
+ if (!connection.get()) {
+ QPID_LOG(warning, cluster << " ignoring frame on closed connection "
+ << *this << ": " << f);
+ return;
+ }
+ QPID_LOG(trace, cluster << " RECV " << *this << ": " << f);
+ if (isLocal()) { // Local catch-up connection.
+ currentChannel = f.getChannel();
+ if (!framing::invoke(*this, *f.getBody()).wasHandled())
+ connection->received(f);
+ }
+ else { // Shadow or updated catch-up connection.
+ if (f.getMethod() && f.getMethod()->isA<ConnectionCloseBody>()) {
+ if (isShadow())
+ cluster.addShadowConnection(this);
+ AMQFrame ok((ConnectionCloseOkBody()));
+ connection->getOutput().send(ok);
+ output.closeOutput();
+ catchUp = false;
+ }
+ else
+ QPID_LOG(warning, cluster << " ignoring unexpected frame " << *this << ": " << f);
+ }
+}
+
+bool Connection::checkUnsupported(const AMQBody& body) {
+ std::string message;
+ if (body.getMethod()) {
+ switch (body.getMethod()->amqpClassId()) {
+ case DTX_CLASS_ID: message = "DTX transactions are not currently supported by cluster."; break;
+ }
+ }
+ if (!message.empty())
+ connection->close(connection::CLOSE_CODE_FRAMING_ERROR, message);
+ return !message.empty();
+}
+
+struct GiveReadCreditOnExit {
+ Connection& connection;
+ int credit;
+ GiveReadCreditOnExit(Connection& connection_, int credit_) :
+ connection(connection_), credit(credit_) {}
+ ~GiveReadCreditOnExit() { if (credit) connection.giveReadCredit(credit); }
+};
+
+void Connection::deliverDoOutput(uint32_t limit) {
+ output.deliverDoOutput(limit);
+}
+
+// Called in delivery thread, in cluster order.
+void Connection::deliveredFrame(const EventFrame& f) {
+ GiveReadCreditOnExit gc(*this, f.readCredit);
+ assert(!catchUp);
+ currentChannel = f.frame.getChannel();
+ if (f.frame.getBody() // frame can be emtpy with just readCredit
+ && !framing::invoke(*this, *f.frame.getBody()).wasHandled() // Connection contol.
+ && !checkUnsupported(*f.frame.getBody())) // Unsupported operation.
+ {
+ if (f.type == DATA) // incoming data frames to broker::Connection
+ connection->received(const_cast<AMQFrame&>(f.frame));
+ else { // frame control, send frame via SessionState
+ broker::SessionState* ss = connection->getChannel(currentChannel).getSession();
+ if (ss) ss->out(const_cast<AMQFrame&>(f.frame));
+ }
+ }
+}
+
+// A local connection is closed by the network layer. Called in the connection thread.
+void Connection::closed() {
+ try {
+ if (isUpdated()) {
+ QPID_LOG(debug, cluster << " update connection closed " << *this);
+ close();
+ cluster.updateInClosed();
+ }
+ else if (catchUp && cluster.isExpectingUpdate()) {
+ QPID_LOG(critical, cluster << " catch-up connection closed prematurely " << *this);
+ cluster.leave();
+ }
+ else if (isLocal()) {
+ // This was a local replicated connection. Multicast a deliver
+ // closed and process any outstanding frames from the cluster
+ // until self-delivery of deliver-close.
+ output.closeOutput();
+ if (announced)
+ cluster.getMulticast().mcastControl(
+ ClusterConnectionDeliverCloseBody(), self);
+ }
+ }
+ catch (const std::exception& e) {
+ QPID_LOG(error, cluster << " error closing connection " << *this << ": " << e.what());
+ }
+}
+
+// Self-delivery of close message, close the connection.
+void Connection::deliverClose () {
+ close();
+ cluster.erase(self);
+}
+
+// Close the connection
+void Connection::close() {
+ if (connection.get()) {
+ QPID_LOG(debug, cluster << " closed connection " << *this);
+ connection->closed();
+ connection.reset();
+ }
+}
+
+// The connection has sent invalid data and should be aborted.
+// All members will get the same abort since they all process the same data.
+void Connection::abort() {
+ connection->abort();
+ // Aborting the connection will result in a call to ::closed()
+ // and allow the connection to close in an orderly manner.
+}
+
+// ConnectionCodec::decode receives read buffers from directly-connected clients.
+size_t Connection::decode(const char* data, size_t size) {
+ GiveReadCreditOnExit grc(*this, 1); // Give a read credit by default.
+ const char* ptr = data;
+ const char* end = data + size;
+ if (catchUp) { // Handle catch-up locally.
+ if (!cluster.isExpectingUpdate()) {
+ QPID_LOG(error, "Rejecting unexpected catch-up connection.");
+ abort(); // Cluster is not expecting catch-up connections.
+ }
+ bool wasOpen = connection->isOpen();
+ Buffer buf(const_cast<char*>(ptr), size);
+ ptr += size;
+ while (localDecoder.decode(buf))
+ received(localDecoder.getFrame());
+ if (!wasOpen && connection->isOpen()) {
+ // Connections marked as federation links are allowed to proxy
+ // messages with user-ID that doesn't match the connection's
+ // authenticated ID. This is important for updates.
+ connection->setFederationLink(isCatchUp());
+ }
+ }
+ else { // Multicast local connections.
+ assert(isLocalClient());
+ assert(connection.get());
+ if (!checkProtocolHeader(ptr, size)) // Updates ptr
+ return 0; // Incomplete header
+
+ if (!connection->isOpen())
+ processInitialFrames(ptr, end-ptr); // Updates ptr
+
+ if (connection->isOpen() && end - ptr > 0) {
+ // We're multi-casting, we will give read credit on delivery.
+ grc.credit = 0;
+ cluster.getMulticast().mcastBuffer(ptr, end - ptr, self);
+ ptr = end;
+ }
+ }
+ return ptr - data;
+}
+
+// Decode the protocol header if needed. Updates data and size
+// returns true if the header is complete or already read.
+bool Connection::checkProtocolHeader(const char*& data, size_t size) {
+ if (expectProtocolHeader) {
+ // This is an outgoing link connection, we will receive a protocol
+ // header which needs to be decoded first
+ framing::ProtocolInitiation pi;
+ Buffer buf(const_cast<char*&>(data), size);
+ if (pi.decode(buf)) {
+ //TODO: check the version is correct
+ expectProtocolHeader = false;
+ data += pi.encodedSize();
+ } else {
+ return false;
+ }
+ }
+ return true;
+}
+
+void Connection::processInitialFrames(const char*& ptr, size_t size) {
+ // Process the initial negotiation locally and store it so
+ // it can be replayed on other brokers in announce()
+ Buffer buf(const_cast<char*>(ptr), size);
+ framing::AMQFrame frame;
+ while (!connection->isOpen() && frame.decode(buf))
+ received(frame);
+ initialFrames.append(ptr, buf.getPosition());
+ ptr += buf.getPosition();
+ if (connection->isOpen()) { // initial negotiation complete
+ cluster.getMulticast().mcastControl(
+ ClusterConnectionAnnounceBody(
+ ProtocolVersion(),
+ connectionCtor.mgmtId,
+ connectionCtor.external.ssf,
+ connectionCtor.external.authid,
+ connectionCtor.external.nodict,
+ connection->getUserId(),
+ initialFrames),
+ getId());
+ announced = true;
+ initialFrames.clear();
+ }
+}
+
+broker::SessionState& Connection::sessionState() {
+ return *connection->getChannel(currentChannel).getSession();
+}
+
+broker::SemanticState& Connection::semanticState() {
+ return sessionState().getSemanticState();
+}
+
+void Connection::shadowPrepare(const std::string& mgmtId) {
+ updateIn.nextShadowMgmtId = mgmtId;
+}
+
+void Connection::shadowSetUser(const std::string& userId) {
+ connection->setUserId(userId);
+}
+
+void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled, const SequenceNumber& position)
+{
+ broker::SemanticState::ConsumerImpl& c = semanticState().find(name);
+ c.position = position;
+ c.setBlocked(blocked);
+ if (notifyEnabled) c.enableNotify(); else c.disableNotify();
+ updateIn.consumerNumbering.add(c.shared_from_this());
+}
+
+
+void Connection::sessionState(
+ const SequenceNumber& replayStart,
+ const SequenceNumber& sendCommandPoint,
+ const SequenceSet& sentIncomplete,
+ const SequenceNumber& expected,
+ const SequenceNumber& received,
+ const SequenceSet& unknownCompleted,
+ const SequenceSet& receivedIncomplete)
+{
+ sessionState().setState(
+ replayStart,
+ sendCommandPoint,
+ sentIncomplete,
+ expected,
+ received,
+ unknownCompleted,
+ receivedIncomplete);
+ QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId());
+ // The output tasks will be added later in the update process.
+ connection->getOutputTasks().removeAll();
+}
+
+void Connection::outputTask(uint16_t channel, const std::string& name) {
+ broker::SessionState* session = connection->getChannel(channel).getSession();
+ if (!session)
+ throw Exception(QPID_MSG(cluster << " channel not attached " << *this
+ << "[" << channel << "] "));
+ OutputTask* task = &session->getSemanticState().find(name);
+ connection->getOutputTasks().addOutputTask(task);
+}
+
+void Connection::shadowReady(
+ uint64_t memberId, uint64_t connectionId, const string& mgmtId,
+ const string& username, const string& fragment, uint32_t sendMax)
+{
+ QPID_ASSERT(mgmtId == getBrokerConnection()->getMgmtId());
+ ConnectionId shadowId = ConnectionId(memberId, connectionId);
+ QPID_LOG(debug, cluster << " catch-up connection " << *this
+ << " becomes shadow " << shadowId);
+ self = shadowId;
+ connection->setUserId(username);
+ // OK to use decoder here because cluster is stalled for update.
+ cluster.getDecoder().get(self).setFragment(fragment.data(), fragment.size());
+ connection->setErrorListener(this);
+ output.setSendMax(sendMax);
+}
+
+void Connection::membership(const FieldTable& joiners, const FieldTable& members,
+ const framing::SequenceNumber& frameSeq)
+{
+ QPID_LOG(debug, cluster << " incoming update complete on connection " << *this);
+ updateIn.consumerNumbering.clear();
+ closeUpdated();
+ cluster.updateInDone(ClusterMap(joiners, members, frameSeq));
+}
+
+void Connection::retractOffer() {
+ QPID_LOG(info, cluster << " incoming update retracted on connection " << *this);
+ closeUpdated();
+ cluster.updateInRetracted();
+}
+
+void Connection::closeUpdated() {
+ self.second = 0; // Mark this as completed update connection.
+ if (connection.get())
+ connection->close(connection::CLOSE_CODE_NORMAL, "OK");
+}
+
+bool Connection::isLocal() const {
+ return self.first == cluster.getId() && self.second;
+}
+
+bool Connection::isShadow() const {
+ return self.first != cluster.getId();
+}
+
+bool Connection::isUpdated() const {
+ return self.first == cluster.getId() && self.second == 0;
+}
+
+
+boost::shared_ptr<broker::Queue> Connection::findQueue(const std::string& qname) {
+ boost::shared_ptr<broker::Queue> queue = cluster.getBroker().getQueues().find(qname);
+ if (!queue) throw Exception(QPID_MSG(cluster << " can't find queue " << qname));
+ return queue;
+}
+
+broker::QueuedMessage Connection::getUpdateMessage() {
+ boost::shared_ptr<broker::Queue> updateq = findQueue(UpdateClient::UPDATE);
+ assert(!updateq->isDurable());
+ broker::QueuedMessage m = updateq->get();
+ if (!m.payload) throw Exception(QPID_MSG(cluster << " empty update queue"));
+ return m;
+}
+
+void Connection::deliveryRecord(const string& qname,
+ const SequenceNumber& position,
+ const string& tag,
+ const SequenceNumber& id,
+ bool acquired,
+ bool accepted,
+ bool cancelled,
+ bool completed,
+ bool ended,
+ bool windowing,
+ bool enqueued,
+ uint32_t credit)
+{
+ broker::QueuedMessage m;
+ broker::Queue::shared_ptr queue = findQueue(qname);
+ if (!ended) { // Has a message
+ if (acquired) { // Message is on the update queue
+ m = getUpdateMessage();
+ m.queue = queue.get();
+ m.position = position;
+ if (enqueued) queue->updateEnqueued(m); //inform queue of the message
+ } else { // Message at original position in original queue
+ m = queue->find(position);
+ }
+ if (!m.payload)
+ throw Exception(QPID_MSG("deliveryRecord no update message"));
+ }
+
+ broker::DeliveryRecord dr(m, queue, tag, acquired, accepted, windowing, credit);
+ dr.setId(id);
+ if (cancelled) dr.cancel(dr.getTag());
+ if (completed) dr.complete();
+ if (ended) dr.setEnded(); // Exsitance of message
+ semanticState().record(dr); // Part of the session's unacked list.
+}
+
+void Connection::queuePosition(const string& qname, const SequenceNumber& position) {
+ findQueue(qname)->setPosition(position);
+}
+
+void Connection::queueFairshareState(const std::string& qname, const uint8_t priority, const uint8_t count)
+{
+ if (!qpid::broker::Fairshare::setState(findQueue(qname)->getMessages(), priority, count)) {
+ QPID_LOG(error, "Failed to set fair share state on queue " << qname << "; this will result in inconsistencies.");
+ }
+}
+
+
+namespace {
+ // find a StatefulQueueObserver that matches a given identifier
+ class ObserverFinder {
+ const std::string id;
+ boost::shared_ptr<broker::QueueObserver> target;
+ ObserverFinder(const ObserverFinder&) {}
+ public:
+ ObserverFinder(const std::string& _id) : id(_id) {}
+ broker::StatefulQueueObserver *getObserver()
+ {
+ if (target)
+ return dynamic_cast<broker::StatefulQueueObserver *>(target.get());
+ return 0;
+ }
+ void operator() (boost::shared_ptr<broker::QueueObserver> o)
+ {
+ if (!target) {
+ broker::StatefulQueueObserver *p = dynamic_cast<broker::StatefulQueueObserver *>(o.get());
+ if (p && p->getId() == id) {
+ target = o;
+ }
+ }
+ }
+ };
+}
+
+
+void Connection::queueObserverState(const std::string& qname, const std::string& observerId, const FieldTable& state)
+{
+ boost::shared_ptr<broker::Queue> queue(findQueue(qname));
+ ObserverFinder finder(observerId); // find this observer
+ queue->eachObserver<ObserverFinder &>(finder);
+ broker::StatefulQueueObserver *so = finder.getObserver();
+ if (so) {
+ so->setState( state );
+ QPID_LOG(debug, "updated queue observer " << observerId << "'s state on queue " << qname << "; ...");
+ return;
+ }
+ QPID_LOG(error, "Failed to find observer " << observerId << " state on queue " << qname << "; this will result in inconsistencies.");
+}
+
+void Connection::expiryId(uint64_t id) {
+ cluster.getExpiryPolicy().setId(id);
+}
+
+std::ostream& operator<<(std::ostream& o, const Connection& c) {
+ const char* type="unknown";
+ if (c.isLocal()) type = "local";
+ else if (c.isShadow()) type = "shadow";
+ else if (c.isUpdated()) type = "updated";
+ const broker::Connection* bc = c.getBrokerConnection();
+ if (bc) o << bc->getMgmtId();
+ else o << "<disconnected>";
+ return o << "(" << c.getId() << " " << type << (c.isCatchUp() ? ",catchup":"") << ")";
+}
+
+void Connection::txStart() {
+ txBuffer.reset(new broker::TxBuffer());
+}
+void Connection::txAccept(const framing::SequenceSet& acked) {
+ txBuffer->enlist(boost::shared_ptr<broker::TxAccept>(
+ new broker::TxAccept(acked, semanticState().getUnacked())));
+}
+
+void Connection::txDequeue(const std::string& queue) {
+ txBuffer->enlist(boost::shared_ptr<broker::RecoveredDequeue>(
+ new broker::RecoveredDequeue(findQueue(queue), getUpdateMessage().payload)));
+}
+
+void Connection::txEnqueue(const std::string& queue) {
+ txBuffer->enlist(boost::shared_ptr<broker::RecoveredEnqueue>(
+ new broker::RecoveredEnqueue(findQueue(queue), getUpdateMessage().payload)));
+}
+
+void Connection::txPublish(const framing::Array& queues, bool delivered) {
+ boost::shared_ptr<broker::TxPublish> txPub(new broker::TxPublish(getUpdateMessage().payload));
+ for (framing::Array::const_iterator i = queues.begin(); i != queues.end(); ++i)
+ txPub->deliverTo(findQueue((*i)->get<std::string>()));
+ txPub->delivered = delivered;
+ txBuffer->enlist(txPub);
+}
+
+void Connection::txEnd() {
+ semanticState().setTxBuffer(txBuffer);
+}
+
+void Connection::accumulatedAck(const qpid::framing::SequenceSet& s) {
+ semanticState().setAccumulatedAck(s);
+}
+
+void Connection::exchange(const std::string& encoded) {
+ Buffer buf(const_cast<char*>(encoded.data()), encoded.size());
+ broker::Exchange::shared_ptr ex = broker::Exchange::decode(cluster.getBroker().getExchanges(), buf);
+ if(ex.get() && ex->isDurable() && !ex->getName().find("amq.") == 0 && !ex->getName().find("qpid.") == 0) {
+ cluster.getBroker().getStore().create(*(ex.get()), ex->getArgs());
+ }
+ QPID_LOG(debug, cluster << " updated exchange " << ex->getName());
+}
+
+void Connection::sessionError(uint16_t , const std::string& msg) {
+ // Ignore errors before isOpen(), we're not multicasting yet.
+ if (connection->isOpen())
+ cluster.flagError(*this, ERROR_TYPE_SESSION, msg);
+}
+
+void Connection::connectionError(const std::string& msg) {
+ // Ignore errors before isOpen(), we're not multicasting yet.
+ if (connection->isOpen())
+ cluster.flagError(*this, ERROR_TYPE_CONNECTION, msg);
+}
+
+void Connection::addQueueListener(const std::string& q, uint32_t listener) {
+ if (listener >= updateIn.consumerNumbering.size())
+ throw Exception(QPID_MSG("Invalid listener ID: " << listener));
+ findQueue(q)->getListeners().addListener(updateIn.consumerNumbering[listener]);
+}
+
+//
+// This is the handler for incoming managementsetup messages.
+//
+void Connection::managementSetupState(
+ uint64_t objectNum, uint16_t bootSequence, const framing::Uuid& id,
+ const std::string& vendor, const std::string& product, const std::string& instance)
+{
+ QPID_LOG(debug, cluster << " updated management: object number="
+ << objectNum << " boot sequence=" << bootSequence
+ << " broker-id=" << id
+ << " vendor=" << vendor
+ << " product=" << product
+ << " instance=" << instance);
+ management::ManagementAgent* agent = cluster.getBroker().getManagementAgent();
+ if (!agent)
+ throw Exception(QPID_MSG("Management schema update but management not enabled."));
+ agent->setNextObjectId(objectNum);
+ agent->setBootSequence(bootSequence);
+ agent->setUuid(id);
+ agent->setName(vendor, product, instance);
+}
+
+void Connection::config(const std::string& encoded) {
+ Buffer buf(const_cast<char*>(encoded.data()), encoded.size());
+ string kind;
+ buf.getShortString (kind);
+ if (kind == "link") {
+ broker::Link::shared_ptr link =
+ broker::Link::decode(cluster.getBroker().getLinks(), buf);
+ QPID_LOG(debug, cluster << " updated link "
+ << link->getHost() << ":" << link->getPort());
+ }
+ else if (kind == "bridge") {
+ broker::Bridge::shared_ptr bridge =
+ broker::Bridge::decode(cluster.getBroker().getLinks(), buf);
+ QPID_LOG(debug, cluster << " updated bridge " << bridge->getName());
+ }
+ else throw Exception(QPID_MSG("Update failed, invalid kind of config: " << kind));
+}
+
+void Connection::doCatchupIoCallbacks() {
+ // We need to process IO callbacks during the catch-up phase in
+ // order to service asynchronous completions for messages
+ // transferred during catch-up.
+
+ if (catchUp) getBrokerConnection()->doIoCallbacks();
+}
+}} // Namespace qpid::cluster
+