diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
commit | 66765100f4257159622cefe57bed50125a5ad017 (patch) | |
tree | a88ee23bb194eb91f0ebb2d9b23ff423e3ea8e37 /cpp/src/qpid/cluster/Connection.cpp | |
parent | 1aeaa7b16e5ce54f10c901d75c4d40f9f88b9db6 (diff) | |
parent | 88b98b2f4152ef59a671fad55a0d08338b6b78ca (diff) | |
download | qpid-python-rajith_jms_client.tar.gz |
Creating a branch for experimenting with some ideas for JMS client.rajith_jms_client
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rajith_jms_client@1128369 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 728 |
1 files changed, 0 insertions, 728 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp deleted file mode 100644 index b9895290e9..0000000000 --- a/cpp/src/qpid/cluster/Connection.cpp +++ /dev/null @@ -1,728 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/amqp_0_10/Codecs.h" -#include "Connection.h" -#include "UpdateClient.h" -#include "Cluster.h" -#include "UpdateReceiver.h" -#include "qpid/assert.h" -#include "qpid/broker/SessionState.h" -#include "qpid/broker/SemanticState.h" -#include "qpid/broker/TxBuffer.h" -#include "qpid/broker/TxPublish.h" -#include "qpid/broker/TxAccept.h" -#include "qpid/broker/RecoveredEnqueue.h" -#include "qpid/broker/RecoveredDequeue.h" -#include "qpid/broker/Exchange.h" -#include "qpid/broker/Fairshare.h" -#include "qpid/broker/Link.h" -#include "qpid/broker/Bridge.h" -#include "qpid/broker/StatefulQueueObserver.h" -#include "qpid/broker/Queue.h" -#include "qpid/framing/enum.h" -#include "qpid/framing/AMQFrame.h" -#include "qpid/framing/AllInvoker.h" -#include "qpid/framing/DeliveryProperties.h" -#include "qpid/framing/ClusterConnectionDeliverCloseBody.h" -#include "qpid/framing/ClusterConnectionAnnounceBody.h" -#include "qpid/framing/ConnectionCloseBody.h" -#include "qpid/framing/ConnectionCloseOkBody.h" -#include "qpid/log/Statement.h" -#include "qpid/sys/ClusterSafe.h" -#include "qpid/types/Variant.h" -#include "qpid/management/ManagementAgent.h" -#include <boost/current_function.hpp> - - -namespace qpid { -namespace cluster { - -using namespace framing; -using namespace framing::cluster; -using amqp_0_10::ListCodec; -using types::Variant; - -qpid::sys::AtomicValue<uint64_t> Connection::catchUpId(0x5000000000000000LL); - -Connection::NullFrameHandler Connection::nullFrameHandler; - -struct NullFrameHandler : public framing::FrameHandler { - void handle(framing::AMQFrame&) {} -}; - - -namespace { -sys::AtomicValue<uint64_t> idCounter; -const std::string shadowPrefix("[shadow]"); -} - - -// Shadow connection -Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, - const std::string& mgmtId, - const ConnectionId& id, const qpid::sys::SecuritySettings& external) - : cluster(c), self(id), catchUp(false), announced(false), output(*this, out), - connectionCtor(&output, cluster.getBroker(), mgmtId, external, false, 0, true), - expectProtocolHeader(false), - mcastFrameHandler(cluster.getMulticast(), self), - updateIn(c.getUpdateReceiver()), - secureConnection(0) -{} - -// Local connection -Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, - const std::string& mgmtId, MemberId member, - bool isCatchUp, bool isLink, const qpid::sys::SecuritySettings& external -) : cluster(c), self(member, ++idCounter), catchUp(isCatchUp), announced(false), output(*this, out), - connectionCtor(&output, cluster.getBroker(), - mgmtId, - external, - isLink, - isCatchUp ? ++catchUpId : 0, - isCatchUp), // isCatchUp => shadow - expectProtocolHeader(isLink), - mcastFrameHandler(cluster.getMulticast(), self), - updateIn(c.getUpdateReceiver()), - secureConnection(0) -{ - if (isLocalClient()) { - giveReadCredit(cluster.getSettings().readMax); // Flow control - // Delay adding the connection to the management map until announce() - connectionCtor.delayManagement = true; - } - else { - // Catch-up shadow connections initialized using nextShadow id. - assert(catchUp); - if (!updateIn.nextShadowMgmtId.empty()) - connectionCtor.mgmtId = updateIn.nextShadowMgmtId; - updateIn.nextShadowMgmtId.clear(); - } - init(); - QPID_LOG(debug, cluster << " local connection " << *this); -} - -void Connection::setSecureConnection(broker::SecureConnection* sc) { - secureConnection = sc; - if (connection.get()) connection->setSecureConnection(sc); -} - -void Connection::init() { - connection = connectionCtor.construct(); - if (isLocalClient()) { - if (secureConnection) connection->setSecureConnection(secureConnection); - // Actively send cluster-order frames from local node - connection->setClusterOrderOutput(mcastFrameHandler); - } - else { // Shadow or catch-up connection - // Passive, discard cluster-order frames - connection->setClusterOrderOutput(nullFrameHandler); - // Disable client throttling, done by active node. - connection->setClientThrottling(false); - } - if (!isCatchUp()) - connection->setErrorListener(this); -} - -// Called when we have consumed a read buffer to give credit to the -// connection layer to continue reading. -void Connection::giveReadCredit(int credit) { - if (cluster.getSettings().readMax && credit) - output.giveReadCredit(credit); -} - -void Connection::announce( - const std::string& mgmtId, uint32_t ssf, const std::string& authid, bool nodict, - const std::string& username, const std::string& initialFrames) -{ - QPID_ASSERT(mgmtId == connectionCtor.mgmtId); - QPID_ASSERT(ssf == connectionCtor.external.ssf); - QPID_ASSERT(authid == connectionCtor.external.authid); - QPID_ASSERT(nodict == connectionCtor.external.nodict); - // Local connections are already initialized but with management delayed. - if (isLocalClient()) { - connection->addManagementObject(); - } - else if (isShadow()) { - init(); - // Play initial frames into the connection. - Buffer buf(const_cast<char*>(initialFrames.data()), initialFrames.size()); - AMQFrame frame; - while (frame.decode(buf)) - connection->received(frame); - connection->setUserId(username); - } - // Do managment actions now that the connection is replicated. - connection->raiseConnectEvent(); - QPID_LOG(debug, cluster << " replicated connection " << *this); -} - -Connection::~Connection() { - if (connection.get()) connection->setErrorListener(0); - // Don't trigger cluster-safe asserts in broker:: ~Connection as - // it may be called in an IO thread context during broker - // shutdown. - sys::ClusterSafeScope css; - connection.reset(); -} - -bool Connection::doOutput() { - return output.doOutput(); -} - -// Received from a directly connected client. -void Connection::received(framing::AMQFrame& f) { - if (!connection.get()) { - QPID_LOG(warning, cluster << " ignoring frame on closed connection " - << *this << ": " << f); - return; - } - QPID_LOG(trace, cluster << " RECV " << *this << ": " << f); - if (isLocal()) { // Local catch-up connection. - currentChannel = f.getChannel(); - if (!framing::invoke(*this, *f.getBody()).wasHandled()) - connection->received(f); - } - else { // Shadow or updated catch-up connection. - if (f.getMethod() && f.getMethod()->isA<ConnectionCloseBody>()) { - if (isShadow()) - cluster.addShadowConnection(this); - AMQFrame ok((ConnectionCloseOkBody())); - connection->getOutput().send(ok); - output.closeOutput(); - catchUp = false; - } - else - QPID_LOG(warning, cluster << " ignoring unexpected frame " << *this << ": " << f); - } -} - -bool Connection::checkUnsupported(const AMQBody& body) { - std::string message; - if (body.getMethod()) { - switch (body.getMethod()->amqpClassId()) { - case DTX_CLASS_ID: message = "DTX transactions are not currently supported by cluster."; break; - } - } - if (!message.empty()) - connection->close(connection::CLOSE_CODE_FRAMING_ERROR, message); - return !message.empty(); -} - -struct GiveReadCreditOnExit { - Connection& connection; - int credit; - GiveReadCreditOnExit(Connection& connection_, int credit_) : - connection(connection_), credit(credit_) {} - ~GiveReadCreditOnExit() { if (credit) connection.giveReadCredit(credit); } -}; - -void Connection::deliverDoOutput(uint32_t limit) { - output.deliverDoOutput(limit); -} - -// Called in delivery thread, in cluster order. -void Connection::deliveredFrame(const EventFrame& f) { - GiveReadCreditOnExit gc(*this, f.readCredit); - assert(!catchUp); - currentChannel = f.frame.getChannel(); - if (f.frame.getBody() // frame can be emtpy with just readCredit - && !framing::invoke(*this, *f.frame.getBody()).wasHandled() // Connection contol. - && !checkUnsupported(*f.frame.getBody())) // Unsupported operation. - { - if (f.type == DATA) // incoming data frames to broker::Connection - connection->received(const_cast<AMQFrame&>(f.frame)); - else { // frame control, send frame via SessionState - broker::SessionState* ss = connection->getChannel(currentChannel).getSession(); - if (ss) ss->out(const_cast<AMQFrame&>(f.frame)); - } - } -} - -// A local connection is closed by the network layer. Called in the connection thread. -void Connection::closed() { - try { - if (isUpdated()) { - QPID_LOG(debug, cluster << " update connection closed " << *this); - close(); - cluster.updateInClosed(); - } - else if (catchUp && cluster.isExpectingUpdate()) { - QPID_LOG(critical, cluster << " catch-up connection closed prematurely " << *this); - cluster.leave(); - } - else if (isLocal()) { - // This was a local replicated connection. Multicast a deliver - // closed and process any outstanding frames from the cluster - // until self-delivery of deliver-close. - output.closeOutput(); - if (announced) - cluster.getMulticast().mcastControl( - ClusterConnectionDeliverCloseBody(), self); - } - } - catch (const std::exception& e) { - QPID_LOG(error, cluster << " error closing connection " << *this << ": " << e.what()); - } -} - -// Self-delivery of close message, close the connection. -void Connection::deliverClose () { - close(); - cluster.erase(self); -} - -// Close the connection -void Connection::close() { - if (connection.get()) { - QPID_LOG(debug, cluster << " closed connection " << *this); - connection->closed(); - connection.reset(); - } -} - -// The connection has sent invalid data and should be aborted. -// All members will get the same abort since they all process the same data. -void Connection::abort() { - connection->abort(); - // Aborting the connection will result in a call to ::closed() - // and allow the connection to close in an orderly manner. -} - -// ConnectionCodec::decode receives read buffers from directly-connected clients. -size_t Connection::decode(const char* data, size_t size) { - GiveReadCreditOnExit grc(*this, 1); // Give a read credit by default. - const char* ptr = data; - const char* end = data + size; - if (catchUp) { // Handle catch-up locally. - if (!cluster.isExpectingUpdate()) { - QPID_LOG(error, "Rejecting unexpected catch-up connection."); - abort(); // Cluster is not expecting catch-up connections. - } - bool wasOpen = connection->isOpen(); - Buffer buf(const_cast<char*>(ptr), size); - ptr += size; - while (localDecoder.decode(buf)) - received(localDecoder.getFrame()); - if (!wasOpen && connection->isOpen()) { - // Connections marked as federation links are allowed to proxy - // messages with user-ID that doesn't match the connection's - // authenticated ID. This is important for updates. - connection->setFederationLink(isCatchUp()); - } - } - else { // Multicast local connections. - assert(isLocalClient()); - assert(connection.get()); - if (!checkProtocolHeader(ptr, size)) // Updates ptr - return 0; // Incomplete header - - if (!connection->isOpen()) - processInitialFrames(ptr, end-ptr); // Updates ptr - - if (connection->isOpen() && end - ptr > 0) { - // We're multi-casting, we will give read credit on delivery. - grc.credit = 0; - cluster.getMulticast().mcastBuffer(ptr, end - ptr, self); - ptr = end; - } - } - return ptr - data; -} - -// Decode the protocol header if needed. Updates data and size -// returns true if the header is complete or already read. -bool Connection::checkProtocolHeader(const char*& data, size_t size) { - if (expectProtocolHeader) { - // This is an outgoing link connection, we will receive a protocol - // header which needs to be decoded first - framing::ProtocolInitiation pi; - Buffer buf(const_cast<char*&>(data), size); - if (pi.decode(buf)) { - //TODO: check the version is correct - expectProtocolHeader = false; - data += pi.encodedSize(); - } else { - return false; - } - } - return true; -} - -void Connection::processInitialFrames(const char*& ptr, size_t size) { - // Process the initial negotiation locally and store it so - // it can be replayed on other brokers in announce() - Buffer buf(const_cast<char*>(ptr), size); - framing::AMQFrame frame; - while (!connection->isOpen() && frame.decode(buf)) - received(frame); - initialFrames.append(ptr, buf.getPosition()); - ptr += buf.getPosition(); - if (connection->isOpen()) { // initial negotiation complete - cluster.getMulticast().mcastControl( - ClusterConnectionAnnounceBody( - ProtocolVersion(), - connectionCtor.mgmtId, - connectionCtor.external.ssf, - connectionCtor.external.authid, - connectionCtor.external.nodict, - connection->getUserId(), - initialFrames), - getId()); - announced = true; - initialFrames.clear(); - } -} - -broker::SessionState& Connection::sessionState() { - return *connection->getChannel(currentChannel).getSession(); -} - -broker::SemanticState& Connection::semanticState() { - return sessionState().getSemanticState(); -} - -void Connection::shadowPrepare(const std::string& mgmtId) { - updateIn.nextShadowMgmtId = mgmtId; -} - -void Connection::shadowSetUser(const std::string& userId) { - connection->setUserId(userId); -} - -void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled, const SequenceNumber& position) -{ - broker::SemanticState::ConsumerImpl& c = semanticState().find(name); - c.position = position; - c.setBlocked(blocked); - if (notifyEnabled) c.enableNotify(); else c.disableNotify(); - updateIn.consumerNumbering.add(c.shared_from_this()); -} - - -void Connection::sessionState( - const SequenceNumber& replayStart, - const SequenceNumber& sendCommandPoint, - const SequenceSet& sentIncomplete, - const SequenceNumber& expected, - const SequenceNumber& received, - const SequenceSet& unknownCompleted, - const SequenceSet& receivedIncomplete) -{ - sessionState().setState( - replayStart, - sendCommandPoint, - sentIncomplete, - expected, - received, - unknownCompleted, - receivedIncomplete); - QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId()); - // The output tasks will be added later in the update process. - connection->getOutputTasks().removeAll(); -} - -void Connection::outputTask(uint16_t channel, const std::string& name) { - broker::SessionState* session = connection->getChannel(channel).getSession(); - if (!session) - throw Exception(QPID_MSG(cluster << " channel not attached " << *this - << "[" << channel << "] ")); - OutputTask* task = &session->getSemanticState().find(name); - connection->getOutputTasks().addOutputTask(task); -} - -void Connection::shadowReady( - uint64_t memberId, uint64_t connectionId, const string& mgmtId, - const string& username, const string& fragment, uint32_t sendMax) -{ - QPID_ASSERT(mgmtId == getBrokerConnection()->getMgmtId()); - ConnectionId shadowId = ConnectionId(memberId, connectionId); - QPID_LOG(debug, cluster << " catch-up connection " << *this - << " becomes shadow " << shadowId); - self = shadowId; - connection->setUserId(username); - // OK to use decoder here because cluster is stalled for update. - cluster.getDecoder().get(self).setFragment(fragment.data(), fragment.size()); - connection->setErrorListener(this); - output.setSendMax(sendMax); -} - -void Connection::membership(const FieldTable& joiners, const FieldTable& members, - const framing::SequenceNumber& frameSeq) -{ - QPID_LOG(debug, cluster << " incoming update complete on connection " << *this); - updateIn.consumerNumbering.clear(); - closeUpdated(); - cluster.updateInDone(ClusterMap(joiners, members, frameSeq)); -} - -void Connection::retractOffer() { - QPID_LOG(info, cluster << " incoming update retracted on connection " << *this); - closeUpdated(); - cluster.updateInRetracted(); -} - -void Connection::closeUpdated() { - self.second = 0; // Mark this as completed update connection. - if (connection.get()) - connection->close(connection::CLOSE_CODE_NORMAL, "OK"); -} - -bool Connection::isLocal() const { - return self.first == cluster.getId() && self.second; -} - -bool Connection::isShadow() const { - return self.first != cluster.getId(); -} - -bool Connection::isUpdated() const { - return self.first == cluster.getId() && self.second == 0; -} - - -boost::shared_ptr<broker::Queue> Connection::findQueue(const std::string& qname) { - boost::shared_ptr<broker::Queue> queue = cluster.getBroker().getQueues().find(qname); - if (!queue) throw Exception(QPID_MSG(cluster << " can't find queue " << qname)); - return queue; -} - -broker::QueuedMessage Connection::getUpdateMessage() { - boost::shared_ptr<broker::Queue> updateq = findQueue(UpdateClient::UPDATE); - assert(!updateq->isDurable()); - broker::QueuedMessage m = updateq->get(); - if (!m.payload) throw Exception(QPID_MSG(cluster << " empty update queue")); - return m; -} - -void Connection::deliveryRecord(const string& qname, - const SequenceNumber& position, - const string& tag, - const SequenceNumber& id, - bool acquired, - bool accepted, - bool cancelled, - bool completed, - bool ended, - bool windowing, - bool enqueued, - uint32_t credit) -{ - broker::QueuedMessage m; - broker::Queue::shared_ptr queue = findQueue(qname); - if (!ended) { // Has a message - if (acquired) { // Message is on the update queue - m = getUpdateMessage(); - m.queue = queue.get(); - m.position = position; - if (enqueued) queue->updateEnqueued(m); //inform queue of the message - } else { // Message at original position in original queue - m = queue->find(position); - } - if (!m.payload) - throw Exception(QPID_MSG("deliveryRecord no update message")); - } - - broker::DeliveryRecord dr(m, queue, tag, acquired, accepted, windowing, credit); - dr.setId(id); - if (cancelled) dr.cancel(dr.getTag()); - if (completed) dr.complete(); - if (ended) dr.setEnded(); // Exsitance of message - semanticState().record(dr); // Part of the session's unacked list. -} - -void Connection::queuePosition(const string& qname, const SequenceNumber& position) { - findQueue(qname)->setPosition(position); -} - -void Connection::queueFairshareState(const std::string& qname, const uint8_t priority, const uint8_t count) -{ - if (!qpid::broker::Fairshare::setState(findQueue(qname)->getMessages(), priority, count)) { - QPID_LOG(error, "Failed to set fair share state on queue " << qname << "; this will result in inconsistencies."); - } -} - - -namespace { - // find a StatefulQueueObserver that matches a given identifier - class ObserverFinder { - const std::string id; - boost::shared_ptr<broker::QueueObserver> target; - ObserverFinder(const ObserverFinder&) {} - public: - ObserverFinder(const std::string& _id) : id(_id) {} - broker::StatefulQueueObserver *getObserver() - { - if (target) - return dynamic_cast<broker::StatefulQueueObserver *>(target.get()); - return 0; - } - void operator() (boost::shared_ptr<broker::QueueObserver> o) - { - if (!target) { - broker::StatefulQueueObserver *p = dynamic_cast<broker::StatefulQueueObserver *>(o.get()); - if (p && p->getId() == id) { - target = o; - } - } - } - }; -} - - -void Connection::queueObserverState(const std::string& qname, const std::string& observerId, const FieldTable& state) -{ - boost::shared_ptr<broker::Queue> queue(findQueue(qname)); - ObserverFinder finder(observerId); // find this observer - queue->eachObserver<ObserverFinder &>(finder); - broker::StatefulQueueObserver *so = finder.getObserver(); - if (so) { - so->setState( state ); - QPID_LOG(debug, "updated queue observer " << observerId << "'s state on queue " << qname << "; ..."); - return; - } - QPID_LOG(error, "Failed to find observer " << observerId << " state on queue " << qname << "; this will result in inconsistencies."); -} - -void Connection::expiryId(uint64_t id) { - cluster.getExpiryPolicy().setId(id); -} - -std::ostream& operator<<(std::ostream& o, const Connection& c) { - const char* type="unknown"; - if (c.isLocal()) type = "local"; - else if (c.isShadow()) type = "shadow"; - else if (c.isUpdated()) type = "updated"; - const broker::Connection* bc = c.getBrokerConnection(); - if (bc) o << bc->getMgmtId(); - else o << "<disconnected>"; - return o << "(" << c.getId() << " " << type << (c.isCatchUp() ? ",catchup":"") << ")"; -} - -void Connection::txStart() { - txBuffer.reset(new broker::TxBuffer()); -} -void Connection::txAccept(const framing::SequenceSet& acked) { - txBuffer->enlist(boost::shared_ptr<broker::TxAccept>( - new broker::TxAccept(acked, semanticState().getUnacked()))); -} - -void Connection::txDequeue(const std::string& queue) { - txBuffer->enlist(boost::shared_ptr<broker::RecoveredDequeue>( - new broker::RecoveredDequeue(findQueue(queue), getUpdateMessage().payload))); -} - -void Connection::txEnqueue(const std::string& queue) { - txBuffer->enlist(boost::shared_ptr<broker::RecoveredEnqueue>( - new broker::RecoveredEnqueue(findQueue(queue), getUpdateMessage().payload))); -} - -void Connection::txPublish(const framing::Array& queues, bool delivered) { - boost::shared_ptr<broker::TxPublish> txPub(new broker::TxPublish(getUpdateMessage().payload)); - for (framing::Array::const_iterator i = queues.begin(); i != queues.end(); ++i) - txPub->deliverTo(findQueue((*i)->get<std::string>())); - txPub->delivered = delivered; - txBuffer->enlist(txPub); -} - -void Connection::txEnd() { - semanticState().setTxBuffer(txBuffer); -} - -void Connection::accumulatedAck(const qpid::framing::SequenceSet& s) { - semanticState().setAccumulatedAck(s); -} - -void Connection::exchange(const std::string& encoded) { - Buffer buf(const_cast<char*>(encoded.data()), encoded.size()); - broker::Exchange::shared_ptr ex = broker::Exchange::decode(cluster.getBroker().getExchanges(), buf); - if(ex.get() && ex->isDurable() && !ex->getName().find("amq.") == 0 && !ex->getName().find("qpid.") == 0) { - cluster.getBroker().getStore().create(*(ex.get()), ex->getArgs()); - } - QPID_LOG(debug, cluster << " updated exchange " << ex->getName()); -} - -void Connection::sessionError(uint16_t , const std::string& msg) { - // Ignore errors before isOpen(), we're not multicasting yet. - if (connection->isOpen()) - cluster.flagError(*this, ERROR_TYPE_SESSION, msg); -} - -void Connection::connectionError(const std::string& msg) { - // Ignore errors before isOpen(), we're not multicasting yet. - if (connection->isOpen()) - cluster.flagError(*this, ERROR_TYPE_CONNECTION, msg); -} - -void Connection::addQueueListener(const std::string& q, uint32_t listener) { - if (listener >= updateIn.consumerNumbering.size()) - throw Exception(QPID_MSG("Invalid listener ID: " << listener)); - findQueue(q)->getListeners().addListener(updateIn.consumerNumbering[listener]); -} - -// -// This is the handler for incoming managementsetup messages. -// -void Connection::managementSetupState( - uint64_t objectNum, uint16_t bootSequence, const framing::Uuid& id, - const std::string& vendor, const std::string& product, const std::string& instance) -{ - QPID_LOG(debug, cluster << " updated management: object number=" - << objectNum << " boot sequence=" << bootSequence - << " broker-id=" << id - << " vendor=" << vendor - << " product=" << product - << " instance=" << instance); - management::ManagementAgent* agent = cluster.getBroker().getManagementAgent(); - if (!agent) - throw Exception(QPID_MSG("Management schema update but management not enabled.")); - agent->setNextObjectId(objectNum); - agent->setBootSequence(bootSequence); - agent->setUuid(id); - agent->setName(vendor, product, instance); -} - -void Connection::config(const std::string& encoded) { - Buffer buf(const_cast<char*>(encoded.data()), encoded.size()); - string kind; - buf.getShortString (kind); - if (kind == "link") { - broker::Link::shared_ptr link = - broker::Link::decode(cluster.getBroker().getLinks(), buf); - QPID_LOG(debug, cluster << " updated link " - << link->getHost() << ":" << link->getPort()); - } - else if (kind == "bridge") { - broker::Bridge::shared_ptr bridge = - broker::Bridge::decode(cluster.getBroker().getLinks(), buf); - QPID_LOG(debug, cluster << " updated bridge " << bridge->getName()); - } - else throw Exception(QPID_MSG("Update failed, invalid kind of config: " << kind)); -} - -void Connection::doCatchupIoCallbacks() { - // We need to process IO callbacks during the catch-up phase in - // order to service asynchronous completions for messages - // transferred during catch-up. - - if (catchUp) getBrokerConnection()->doIoCallbacks(); -} -}} // Namespace qpid::cluster - |