summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Connection.cpp
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2011-05-27 15:44:23 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2011-05-27 15:44:23 +0000
commit66765100f4257159622cefe57bed50125a5ad017 (patch)
treea88ee23bb194eb91f0ebb2d9b23ff423e3ea8e37 /cpp/src/qpid/cluster/Connection.cpp
parent1aeaa7b16e5ce54f10c901d75c4d40f9f88b9db6 (diff)
parent88b98b2f4152ef59a671fad55a0d08338b6b78ca (diff)
downloadqpid-python-rajith_jms_client.tar.gz
Creating a branch for experimenting with some ideas for JMS client.rajith_jms_client
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rajith_jms_client@1128369 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp728
1 files changed, 0 insertions, 728 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
deleted file mode 100644
index b9895290e9..0000000000
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ /dev/null
@@ -1,728 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "qpid/amqp_0_10/Codecs.h"
-#include "Connection.h"
-#include "UpdateClient.h"
-#include "Cluster.h"
-#include "UpdateReceiver.h"
-#include "qpid/assert.h"
-#include "qpid/broker/SessionState.h"
-#include "qpid/broker/SemanticState.h"
-#include "qpid/broker/TxBuffer.h"
-#include "qpid/broker/TxPublish.h"
-#include "qpid/broker/TxAccept.h"
-#include "qpid/broker/RecoveredEnqueue.h"
-#include "qpid/broker/RecoveredDequeue.h"
-#include "qpid/broker/Exchange.h"
-#include "qpid/broker/Fairshare.h"
-#include "qpid/broker/Link.h"
-#include "qpid/broker/Bridge.h"
-#include "qpid/broker/StatefulQueueObserver.h"
-#include "qpid/broker/Queue.h"
-#include "qpid/framing/enum.h"
-#include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/AllInvoker.h"
-#include "qpid/framing/DeliveryProperties.h"
-#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
-#include "qpid/framing/ClusterConnectionAnnounceBody.h"
-#include "qpid/framing/ConnectionCloseBody.h"
-#include "qpid/framing/ConnectionCloseOkBody.h"
-#include "qpid/log/Statement.h"
-#include "qpid/sys/ClusterSafe.h"
-#include "qpid/types/Variant.h"
-#include "qpid/management/ManagementAgent.h"
-#include <boost/current_function.hpp>
-
-
-namespace qpid {
-namespace cluster {
-
-using namespace framing;
-using namespace framing::cluster;
-using amqp_0_10::ListCodec;
-using types::Variant;
-
-qpid::sys::AtomicValue<uint64_t> Connection::catchUpId(0x5000000000000000LL);
-
-Connection::NullFrameHandler Connection::nullFrameHandler;
-
-struct NullFrameHandler : public framing::FrameHandler {
- void handle(framing::AMQFrame&) {}
-};
-
-
-namespace {
-sys::AtomicValue<uint64_t> idCounter;
-const std::string shadowPrefix("[shadow]");
-}
-
-
-// Shadow connection
-Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
- const std::string& mgmtId,
- const ConnectionId& id, const qpid::sys::SecuritySettings& external)
- : cluster(c), self(id), catchUp(false), announced(false), output(*this, out),
- connectionCtor(&output, cluster.getBroker(), mgmtId, external, false, 0, true),
- expectProtocolHeader(false),
- mcastFrameHandler(cluster.getMulticast(), self),
- updateIn(c.getUpdateReceiver()),
- secureConnection(0)
-{}
-
-// Local connection
-Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
- const std::string& mgmtId, MemberId member,
- bool isCatchUp, bool isLink, const qpid::sys::SecuritySettings& external
-) : cluster(c), self(member, ++idCounter), catchUp(isCatchUp), announced(false), output(*this, out),
- connectionCtor(&output, cluster.getBroker(),
- mgmtId,
- external,
- isLink,
- isCatchUp ? ++catchUpId : 0,
- isCatchUp), // isCatchUp => shadow
- expectProtocolHeader(isLink),
- mcastFrameHandler(cluster.getMulticast(), self),
- updateIn(c.getUpdateReceiver()),
- secureConnection(0)
-{
- if (isLocalClient()) {
- giveReadCredit(cluster.getSettings().readMax); // Flow control
- // Delay adding the connection to the management map until announce()
- connectionCtor.delayManagement = true;
- }
- else {
- // Catch-up shadow connections initialized using nextShadow id.
- assert(catchUp);
- if (!updateIn.nextShadowMgmtId.empty())
- connectionCtor.mgmtId = updateIn.nextShadowMgmtId;
- updateIn.nextShadowMgmtId.clear();
- }
- init();
- QPID_LOG(debug, cluster << " local connection " << *this);
-}
-
-void Connection::setSecureConnection(broker::SecureConnection* sc) {
- secureConnection = sc;
- if (connection.get()) connection->setSecureConnection(sc);
-}
-
-void Connection::init() {
- connection = connectionCtor.construct();
- if (isLocalClient()) {
- if (secureConnection) connection->setSecureConnection(secureConnection);
- // Actively send cluster-order frames from local node
- connection->setClusterOrderOutput(mcastFrameHandler);
- }
- else { // Shadow or catch-up connection
- // Passive, discard cluster-order frames
- connection->setClusterOrderOutput(nullFrameHandler);
- // Disable client throttling, done by active node.
- connection->setClientThrottling(false);
- }
- if (!isCatchUp())
- connection->setErrorListener(this);
-}
-
-// Called when we have consumed a read buffer to give credit to the
-// connection layer to continue reading.
-void Connection::giveReadCredit(int credit) {
- if (cluster.getSettings().readMax && credit)
- output.giveReadCredit(credit);
-}
-
-void Connection::announce(
- const std::string& mgmtId, uint32_t ssf, const std::string& authid, bool nodict,
- const std::string& username, const std::string& initialFrames)
-{
- QPID_ASSERT(mgmtId == connectionCtor.mgmtId);
- QPID_ASSERT(ssf == connectionCtor.external.ssf);
- QPID_ASSERT(authid == connectionCtor.external.authid);
- QPID_ASSERT(nodict == connectionCtor.external.nodict);
- // Local connections are already initialized but with management delayed.
- if (isLocalClient()) {
- connection->addManagementObject();
- }
- else if (isShadow()) {
- init();
- // Play initial frames into the connection.
- Buffer buf(const_cast<char*>(initialFrames.data()), initialFrames.size());
- AMQFrame frame;
- while (frame.decode(buf))
- connection->received(frame);
- connection->setUserId(username);
- }
- // Do managment actions now that the connection is replicated.
- connection->raiseConnectEvent();
- QPID_LOG(debug, cluster << " replicated connection " << *this);
-}
-
-Connection::~Connection() {
- if (connection.get()) connection->setErrorListener(0);
- // Don't trigger cluster-safe asserts in broker:: ~Connection as
- // it may be called in an IO thread context during broker
- // shutdown.
- sys::ClusterSafeScope css;
- connection.reset();
-}
-
-bool Connection::doOutput() {
- return output.doOutput();
-}
-
-// Received from a directly connected client.
-void Connection::received(framing::AMQFrame& f) {
- if (!connection.get()) {
- QPID_LOG(warning, cluster << " ignoring frame on closed connection "
- << *this << ": " << f);
- return;
- }
- QPID_LOG(trace, cluster << " RECV " << *this << ": " << f);
- if (isLocal()) { // Local catch-up connection.
- currentChannel = f.getChannel();
- if (!framing::invoke(*this, *f.getBody()).wasHandled())
- connection->received(f);
- }
- else { // Shadow or updated catch-up connection.
- if (f.getMethod() && f.getMethod()->isA<ConnectionCloseBody>()) {
- if (isShadow())
- cluster.addShadowConnection(this);
- AMQFrame ok((ConnectionCloseOkBody()));
- connection->getOutput().send(ok);
- output.closeOutput();
- catchUp = false;
- }
- else
- QPID_LOG(warning, cluster << " ignoring unexpected frame " << *this << ": " << f);
- }
-}
-
-bool Connection::checkUnsupported(const AMQBody& body) {
- std::string message;
- if (body.getMethod()) {
- switch (body.getMethod()->amqpClassId()) {
- case DTX_CLASS_ID: message = "DTX transactions are not currently supported by cluster."; break;
- }
- }
- if (!message.empty())
- connection->close(connection::CLOSE_CODE_FRAMING_ERROR, message);
- return !message.empty();
-}
-
-struct GiveReadCreditOnExit {
- Connection& connection;
- int credit;
- GiveReadCreditOnExit(Connection& connection_, int credit_) :
- connection(connection_), credit(credit_) {}
- ~GiveReadCreditOnExit() { if (credit) connection.giveReadCredit(credit); }
-};
-
-void Connection::deliverDoOutput(uint32_t limit) {
- output.deliverDoOutput(limit);
-}
-
-// Called in delivery thread, in cluster order.
-void Connection::deliveredFrame(const EventFrame& f) {
- GiveReadCreditOnExit gc(*this, f.readCredit);
- assert(!catchUp);
- currentChannel = f.frame.getChannel();
- if (f.frame.getBody() // frame can be emtpy with just readCredit
- && !framing::invoke(*this, *f.frame.getBody()).wasHandled() // Connection contol.
- && !checkUnsupported(*f.frame.getBody())) // Unsupported operation.
- {
- if (f.type == DATA) // incoming data frames to broker::Connection
- connection->received(const_cast<AMQFrame&>(f.frame));
- else { // frame control, send frame via SessionState
- broker::SessionState* ss = connection->getChannel(currentChannel).getSession();
- if (ss) ss->out(const_cast<AMQFrame&>(f.frame));
- }
- }
-}
-
-// A local connection is closed by the network layer. Called in the connection thread.
-void Connection::closed() {
- try {
- if (isUpdated()) {
- QPID_LOG(debug, cluster << " update connection closed " << *this);
- close();
- cluster.updateInClosed();
- }
- else if (catchUp && cluster.isExpectingUpdate()) {
- QPID_LOG(critical, cluster << " catch-up connection closed prematurely " << *this);
- cluster.leave();
- }
- else if (isLocal()) {
- // This was a local replicated connection. Multicast a deliver
- // closed and process any outstanding frames from the cluster
- // until self-delivery of deliver-close.
- output.closeOutput();
- if (announced)
- cluster.getMulticast().mcastControl(
- ClusterConnectionDeliverCloseBody(), self);
- }
- }
- catch (const std::exception& e) {
- QPID_LOG(error, cluster << " error closing connection " << *this << ": " << e.what());
- }
-}
-
-// Self-delivery of close message, close the connection.
-void Connection::deliverClose () {
- close();
- cluster.erase(self);
-}
-
-// Close the connection
-void Connection::close() {
- if (connection.get()) {
- QPID_LOG(debug, cluster << " closed connection " << *this);
- connection->closed();
- connection.reset();
- }
-}
-
-// The connection has sent invalid data and should be aborted.
-// All members will get the same abort since they all process the same data.
-void Connection::abort() {
- connection->abort();
- // Aborting the connection will result in a call to ::closed()
- // and allow the connection to close in an orderly manner.
-}
-
-// ConnectionCodec::decode receives read buffers from directly-connected clients.
-size_t Connection::decode(const char* data, size_t size) {
- GiveReadCreditOnExit grc(*this, 1); // Give a read credit by default.
- const char* ptr = data;
- const char* end = data + size;
- if (catchUp) { // Handle catch-up locally.
- if (!cluster.isExpectingUpdate()) {
- QPID_LOG(error, "Rejecting unexpected catch-up connection.");
- abort(); // Cluster is not expecting catch-up connections.
- }
- bool wasOpen = connection->isOpen();
- Buffer buf(const_cast<char*>(ptr), size);
- ptr += size;
- while (localDecoder.decode(buf))
- received(localDecoder.getFrame());
- if (!wasOpen && connection->isOpen()) {
- // Connections marked as federation links are allowed to proxy
- // messages with user-ID that doesn't match the connection's
- // authenticated ID. This is important for updates.
- connection->setFederationLink(isCatchUp());
- }
- }
- else { // Multicast local connections.
- assert(isLocalClient());
- assert(connection.get());
- if (!checkProtocolHeader(ptr, size)) // Updates ptr
- return 0; // Incomplete header
-
- if (!connection->isOpen())
- processInitialFrames(ptr, end-ptr); // Updates ptr
-
- if (connection->isOpen() && end - ptr > 0) {
- // We're multi-casting, we will give read credit on delivery.
- grc.credit = 0;
- cluster.getMulticast().mcastBuffer(ptr, end - ptr, self);
- ptr = end;
- }
- }
- return ptr - data;
-}
-
-// Decode the protocol header if needed. Updates data and size
-// returns true if the header is complete or already read.
-bool Connection::checkProtocolHeader(const char*& data, size_t size) {
- if (expectProtocolHeader) {
- // This is an outgoing link connection, we will receive a protocol
- // header which needs to be decoded first
- framing::ProtocolInitiation pi;
- Buffer buf(const_cast<char*&>(data), size);
- if (pi.decode(buf)) {
- //TODO: check the version is correct
- expectProtocolHeader = false;
- data += pi.encodedSize();
- } else {
- return false;
- }
- }
- return true;
-}
-
-void Connection::processInitialFrames(const char*& ptr, size_t size) {
- // Process the initial negotiation locally and store it so
- // it can be replayed on other brokers in announce()
- Buffer buf(const_cast<char*>(ptr), size);
- framing::AMQFrame frame;
- while (!connection->isOpen() && frame.decode(buf))
- received(frame);
- initialFrames.append(ptr, buf.getPosition());
- ptr += buf.getPosition();
- if (connection->isOpen()) { // initial negotiation complete
- cluster.getMulticast().mcastControl(
- ClusterConnectionAnnounceBody(
- ProtocolVersion(),
- connectionCtor.mgmtId,
- connectionCtor.external.ssf,
- connectionCtor.external.authid,
- connectionCtor.external.nodict,
- connection->getUserId(),
- initialFrames),
- getId());
- announced = true;
- initialFrames.clear();
- }
-}
-
-broker::SessionState& Connection::sessionState() {
- return *connection->getChannel(currentChannel).getSession();
-}
-
-broker::SemanticState& Connection::semanticState() {
- return sessionState().getSemanticState();
-}
-
-void Connection::shadowPrepare(const std::string& mgmtId) {
- updateIn.nextShadowMgmtId = mgmtId;
-}
-
-void Connection::shadowSetUser(const std::string& userId) {
- connection->setUserId(userId);
-}
-
-void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled, const SequenceNumber& position)
-{
- broker::SemanticState::ConsumerImpl& c = semanticState().find(name);
- c.position = position;
- c.setBlocked(blocked);
- if (notifyEnabled) c.enableNotify(); else c.disableNotify();
- updateIn.consumerNumbering.add(c.shared_from_this());
-}
-
-
-void Connection::sessionState(
- const SequenceNumber& replayStart,
- const SequenceNumber& sendCommandPoint,
- const SequenceSet& sentIncomplete,
- const SequenceNumber& expected,
- const SequenceNumber& received,
- const SequenceSet& unknownCompleted,
- const SequenceSet& receivedIncomplete)
-{
- sessionState().setState(
- replayStart,
- sendCommandPoint,
- sentIncomplete,
- expected,
- received,
- unknownCompleted,
- receivedIncomplete);
- QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId());
- // The output tasks will be added later in the update process.
- connection->getOutputTasks().removeAll();
-}
-
-void Connection::outputTask(uint16_t channel, const std::string& name) {
- broker::SessionState* session = connection->getChannel(channel).getSession();
- if (!session)
- throw Exception(QPID_MSG(cluster << " channel not attached " << *this
- << "[" << channel << "] "));
- OutputTask* task = &session->getSemanticState().find(name);
- connection->getOutputTasks().addOutputTask(task);
-}
-
-void Connection::shadowReady(
- uint64_t memberId, uint64_t connectionId, const string& mgmtId,
- const string& username, const string& fragment, uint32_t sendMax)
-{
- QPID_ASSERT(mgmtId == getBrokerConnection()->getMgmtId());
- ConnectionId shadowId = ConnectionId(memberId, connectionId);
- QPID_LOG(debug, cluster << " catch-up connection " << *this
- << " becomes shadow " << shadowId);
- self = shadowId;
- connection->setUserId(username);
- // OK to use decoder here because cluster is stalled for update.
- cluster.getDecoder().get(self).setFragment(fragment.data(), fragment.size());
- connection->setErrorListener(this);
- output.setSendMax(sendMax);
-}
-
-void Connection::membership(const FieldTable& joiners, const FieldTable& members,
- const framing::SequenceNumber& frameSeq)
-{
- QPID_LOG(debug, cluster << " incoming update complete on connection " << *this);
- updateIn.consumerNumbering.clear();
- closeUpdated();
- cluster.updateInDone(ClusterMap(joiners, members, frameSeq));
-}
-
-void Connection::retractOffer() {
- QPID_LOG(info, cluster << " incoming update retracted on connection " << *this);
- closeUpdated();
- cluster.updateInRetracted();
-}
-
-void Connection::closeUpdated() {
- self.second = 0; // Mark this as completed update connection.
- if (connection.get())
- connection->close(connection::CLOSE_CODE_NORMAL, "OK");
-}
-
-bool Connection::isLocal() const {
- return self.first == cluster.getId() && self.second;
-}
-
-bool Connection::isShadow() const {
- return self.first != cluster.getId();
-}
-
-bool Connection::isUpdated() const {
- return self.first == cluster.getId() && self.second == 0;
-}
-
-
-boost::shared_ptr<broker::Queue> Connection::findQueue(const std::string& qname) {
- boost::shared_ptr<broker::Queue> queue = cluster.getBroker().getQueues().find(qname);
- if (!queue) throw Exception(QPID_MSG(cluster << " can't find queue " << qname));
- return queue;
-}
-
-broker::QueuedMessage Connection::getUpdateMessage() {
- boost::shared_ptr<broker::Queue> updateq = findQueue(UpdateClient::UPDATE);
- assert(!updateq->isDurable());
- broker::QueuedMessage m = updateq->get();
- if (!m.payload) throw Exception(QPID_MSG(cluster << " empty update queue"));
- return m;
-}
-
-void Connection::deliveryRecord(const string& qname,
- const SequenceNumber& position,
- const string& tag,
- const SequenceNumber& id,
- bool acquired,
- bool accepted,
- bool cancelled,
- bool completed,
- bool ended,
- bool windowing,
- bool enqueued,
- uint32_t credit)
-{
- broker::QueuedMessage m;
- broker::Queue::shared_ptr queue = findQueue(qname);
- if (!ended) { // Has a message
- if (acquired) { // Message is on the update queue
- m = getUpdateMessage();
- m.queue = queue.get();
- m.position = position;
- if (enqueued) queue->updateEnqueued(m); //inform queue of the message
- } else { // Message at original position in original queue
- m = queue->find(position);
- }
- if (!m.payload)
- throw Exception(QPID_MSG("deliveryRecord no update message"));
- }
-
- broker::DeliveryRecord dr(m, queue, tag, acquired, accepted, windowing, credit);
- dr.setId(id);
- if (cancelled) dr.cancel(dr.getTag());
- if (completed) dr.complete();
- if (ended) dr.setEnded(); // Exsitance of message
- semanticState().record(dr); // Part of the session's unacked list.
-}
-
-void Connection::queuePosition(const string& qname, const SequenceNumber& position) {
- findQueue(qname)->setPosition(position);
-}
-
-void Connection::queueFairshareState(const std::string& qname, const uint8_t priority, const uint8_t count)
-{
- if (!qpid::broker::Fairshare::setState(findQueue(qname)->getMessages(), priority, count)) {
- QPID_LOG(error, "Failed to set fair share state on queue " << qname << "; this will result in inconsistencies.");
- }
-}
-
-
-namespace {
- // find a StatefulQueueObserver that matches a given identifier
- class ObserverFinder {
- const std::string id;
- boost::shared_ptr<broker::QueueObserver> target;
- ObserverFinder(const ObserverFinder&) {}
- public:
- ObserverFinder(const std::string& _id) : id(_id) {}
- broker::StatefulQueueObserver *getObserver()
- {
- if (target)
- return dynamic_cast<broker::StatefulQueueObserver *>(target.get());
- return 0;
- }
- void operator() (boost::shared_ptr<broker::QueueObserver> o)
- {
- if (!target) {
- broker::StatefulQueueObserver *p = dynamic_cast<broker::StatefulQueueObserver *>(o.get());
- if (p && p->getId() == id) {
- target = o;
- }
- }
- }
- };
-}
-
-
-void Connection::queueObserverState(const std::string& qname, const std::string& observerId, const FieldTable& state)
-{
- boost::shared_ptr<broker::Queue> queue(findQueue(qname));
- ObserverFinder finder(observerId); // find this observer
- queue->eachObserver<ObserverFinder &>(finder);
- broker::StatefulQueueObserver *so = finder.getObserver();
- if (so) {
- so->setState( state );
- QPID_LOG(debug, "updated queue observer " << observerId << "'s state on queue " << qname << "; ...");
- return;
- }
- QPID_LOG(error, "Failed to find observer " << observerId << " state on queue " << qname << "; this will result in inconsistencies.");
-}
-
-void Connection::expiryId(uint64_t id) {
- cluster.getExpiryPolicy().setId(id);
-}
-
-std::ostream& operator<<(std::ostream& o, const Connection& c) {
- const char* type="unknown";
- if (c.isLocal()) type = "local";
- else if (c.isShadow()) type = "shadow";
- else if (c.isUpdated()) type = "updated";
- const broker::Connection* bc = c.getBrokerConnection();
- if (bc) o << bc->getMgmtId();
- else o << "<disconnected>";
- return o << "(" << c.getId() << " " << type << (c.isCatchUp() ? ",catchup":"") << ")";
-}
-
-void Connection::txStart() {
- txBuffer.reset(new broker::TxBuffer());
-}
-void Connection::txAccept(const framing::SequenceSet& acked) {
- txBuffer->enlist(boost::shared_ptr<broker::TxAccept>(
- new broker::TxAccept(acked, semanticState().getUnacked())));
-}
-
-void Connection::txDequeue(const std::string& queue) {
- txBuffer->enlist(boost::shared_ptr<broker::RecoveredDequeue>(
- new broker::RecoveredDequeue(findQueue(queue), getUpdateMessage().payload)));
-}
-
-void Connection::txEnqueue(const std::string& queue) {
- txBuffer->enlist(boost::shared_ptr<broker::RecoveredEnqueue>(
- new broker::RecoveredEnqueue(findQueue(queue), getUpdateMessage().payload)));
-}
-
-void Connection::txPublish(const framing::Array& queues, bool delivered) {
- boost::shared_ptr<broker::TxPublish> txPub(new broker::TxPublish(getUpdateMessage().payload));
- for (framing::Array::const_iterator i = queues.begin(); i != queues.end(); ++i)
- txPub->deliverTo(findQueue((*i)->get<std::string>()));
- txPub->delivered = delivered;
- txBuffer->enlist(txPub);
-}
-
-void Connection::txEnd() {
- semanticState().setTxBuffer(txBuffer);
-}
-
-void Connection::accumulatedAck(const qpid::framing::SequenceSet& s) {
- semanticState().setAccumulatedAck(s);
-}
-
-void Connection::exchange(const std::string& encoded) {
- Buffer buf(const_cast<char*>(encoded.data()), encoded.size());
- broker::Exchange::shared_ptr ex = broker::Exchange::decode(cluster.getBroker().getExchanges(), buf);
- if(ex.get() && ex->isDurable() && !ex->getName().find("amq.") == 0 && !ex->getName().find("qpid.") == 0) {
- cluster.getBroker().getStore().create(*(ex.get()), ex->getArgs());
- }
- QPID_LOG(debug, cluster << " updated exchange " << ex->getName());
-}
-
-void Connection::sessionError(uint16_t , const std::string& msg) {
- // Ignore errors before isOpen(), we're not multicasting yet.
- if (connection->isOpen())
- cluster.flagError(*this, ERROR_TYPE_SESSION, msg);
-}
-
-void Connection::connectionError(const std::string& msg) {
- // Ignore errors before isOpen(), we're not multicasting yet.
- if (connection->isOpen())
- cluster.flagError(*this, ERROR_TYPE_CONNECTION, msg);
-}
-
-void Connection::addQueueListener(const std::string& q, uint32_t listener) {
- if (listener >= updateIn.consumerNumbering.size())
- throw Exception(QPID_MSG("Invalid listener ID: " << listener));
- findQueue(q)->getListeners().addListener(updateIn.consumerNumbering[listener]);
-}
-
-//
-// This is the handler for incoming managementsetup messages.
-//
-void Connection::managementSetupState(
- uint64_t objectNum, uint16_t bootSequence, const framing::Uuid& id,
- const std::string& vendor, const std::string& product, const std::string& instance)
-{
- QPID_LOG(debug, cluster << " updated management: object number="
- << objectNum << " boot sequence=" << bootSequence
- << " broker-id=" << id
- << " vendor=" << vendor
- << " product=" << product
- << " instance=" << instance);
- management::ManagementAgent* agent = cluster.getBroker().getManagementAgent();
- if (!agent)
- throw Exception(QPID_MSG("Management schema update but management not enabled."));
- agent->setNextObjectId(objectNum);
- agent->setBootSequence(bootSequence);
- agent->setUuid(id);
- agent->setName(vendor, product, instance);
-}
-
-void Connection::config(const std::string& encoded) {
- Buffer buf(const_cast<char*>(encoded.data()), encoded.size());
- string kind;
- buf.getShortString (kind);
- if (kind == "link") {
- broker::Link::shared_ptr link =
- broker::Link::decode(cluster.getBroker().getLinks(), buf);
- QPID_LOG(debug, cluster << " updated link "
- << link->getHost() << ":" << link->getPort());
- }
- else if (kind == "bridge") {
- broker::Bridge::shared_ptr bridge =
- broker::Bridge::decode(cluster.getBroker().getLinks(), buf);
- QPID_LOG(debug, cluster << " updated bridge " << bridge->getName());
- }
- else throw Exception(QPID_MSG("Update failed, invalid kind of config: " << kind));
-}
-
-void Connection::doCatchupIoCallbacks() {
- // We need to process IO callbacks during the catch-up phase in
- // order to service asynchronous completions for messages
- // transferred during catch-up.
-
- if (catchUp) getBrokerConnection()->doIoCallbacks();
-}
-}} // Namespace qpid::cluster
-