diff options
author | Kim van der Riet <kpvdr@apache.org> | 2012-08-03 12:13:32 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2012-08-03 12:13:32 +0000 |
commit | d43d1912b376322e27fdcda551a73f9ff5487972 (patch) | |
tree | ce493e10baa95f44be8beb5778ce51783463196d /cpp/src/qpid/cluster/Connection.cpp | |
parent | 04877fec0c6346edec67072d7f2d247740cf2af5 (diff) | |
download | qpid-python-d43d1912b376322e27fdcda551a73f9ff5487972.tar.gz |
QPID-3858: Updated branch - merged from trunk r.1368650
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1368910 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 29 |
1 files changed, 20 insertions, 9 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 512e0f03cb..ff855eef18 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -58,6 +58,8 @@ namespace qpid { namespace cluster { +using std::string; + using namespace framing; using namespace framing::cluster; using amqp_0_10::ListCodec; @@ -83,7 +85,9 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& mgmtId, const ConnectionId& id, const qpid::sys::SecuritySettings& external) : cluster(c), self(id), catchUp(false), announced(false), output(*this, out), - connectionCtor(&output, cluster.getBroker(), mgmtId, external, false, 0, true), + connectionCtor(&output, cluster.getBroker(), mgmtId, external, + false/*isLink*/, 0/*objectId*/, true/*shadow*/, false/*delayManagement*/, + false/*authenticated*/), expectProtocolHeader(false), mcastFrameHandler(cluster.getMulticast(), self), updateIn(c.getUpdateReceiver()), @@ -100,9 +104,10 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, external, isLink, isCatchUp ? ++catchUpId : 0, - // The first catch-up connection is not considered a shadow - // as it needs to be authenticated. - isCatchUp && self.second > 1), + // The first catch-up connection is not a shadow + isCatchUp && self.second > 1, + false, // delayManagement + true), // catch up connecytions are authenticated expectProtocolHeader(isLink), mcastFrameHandler(cluster.getMulticast(), self), updateIn(c.getUpdateReceiver()), @@ -272,6 +277,8 @@ void Connection::closed() { if (announced) cluster.getMulticast().mcastControl( ClusterConnectionDeliverCloseBody(), self); + else + close(); } } catch (const std::exception& e) { @@ -404,11 +411,12 @@ void Connection::shadowSetUser(const std::string& userId) { } void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled, const SequenceNumber& position, - uint32_t usedMsgCredit, uint32_t usedByteCredit) + uint32_t usedMsgCredit, uint32_t usedByteCredit, const uint32_t deliveryCount) { broker::SemanticState::ConsumerImpl::shared_ptr c = semanticState().find(name); c->setPosition(position); c->setBlocked(blocked); + c->setDeliveryCount(deliveryCount); if (c->getCredit().isWindowMode()) c->getCredit().consume(usedMsgCredit, usedByteCredit); if (notifyEnabled) c->enableNotify(); else c->disableNotify(); updateIn.consumerNumbering.add(c); @@ -522,6 +530,7 @@ broker::QueuedMessage Connection::getUpdateMessage() { boost::shared_ptr<broker::Queue> updateq = findQueue(UpdateClient::UPDATE); assert(!updateq->isDurable()); broker::QueuedMessage m = updateq->get(); + updateq->dequeue(0, m); if (!m.payload) throw Exception(QPID_MSG(cluster << " empty update queue")); return m; } @@ -782,16 +791,18 @@ void Connection::managementSetupState( void Connection::config(const std::string& encoded) { Buffer buf(const_cast<char*>(encoded.data()), encoded.size()); string kind; + uint32_t p = buf.getPosition(); buf.getShortString (kind); - if (kind == "link") { + buf.setPosition(p); + if (broker::Link::isEncodedLink(kind)) { broker::Link::shared_ptr link = - broker::Link::decode(cluster.getBroker().getLinks(), buf); + broker::Link::decode(cluster.getBroker().getLinks(), buf); QPID_LOG(debug, cluster << " updated link " << link->getHost() << ":" << link->getPort()); } - else if (kind == "bridge") { + else if (broker::Bridge::isEncodedBridge(kind)) { broker::Bridge::shared_ptr bridge = - broker::Bridge::decode(cluster.getBroker().getLinks(), buf); + broker::Bridge::decode(cluster.getBroker().getLinks(), buf); QPID_LOG(debug, cluster << " updated bridge " << bridge->getName()); } else throw Exception(QPID_MSG("Update failed, invalid kind of config: " << kind)); |