diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-02-24 22:35:53 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-02-24 22:35:53 +0000 |
commit | b5027a43f535995f37472672583dbb87c0a02aff (patch) | |
tree | db8ba67b7f11ed359b286d6cb307b9d678707790 /cpp/src/qpid/cluster/Connection.cpp | |
parent | fb0db96f52a38bc8f185d23abc7908361bc4dd68 (diff) | |
download | qpid-python-b5027a43f535995f37472672583dbb87c0a02aff.tar.gz |
QPID-3084: apply Alan's fix to allow io callbacks to run during a cluster update.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1074332 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 30 |
1 files changed, 18 insertions, 12 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index e9b718e6de..2532adab84 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -143,7 +143,7 @@ void Connection::init() { // Called when we have consumed a read buffer to give credit to the // connection layer to continue reading. void Connection::giveReadCredit(int credit) { - if (cluster.getSettings().readMax && credit) + if (cluster.getSettings().readMax && credit) output.giveReadCredit(credit); } @@ -201,7 +201,7 @@ void Connection::received(framing::AMQFrame& f) { } else { // Shadow or updated catch-up connection. if (f.getMethod() && f.getMethod()->isA<ConnectionCloseBody>()) { - if (isShadow()) + if (isShadow()) cluster.addShadowConnection(this); AMQFrame ok((ConnectionCloseOkBody())); connection->getOutput().send(ok); @@ -241,7 +241,7 @@ void Connection::deliverDoOutput(uint32_t limit) { void Connection::deliveredFrame(const EventFrame& f) { GiveReadCreditOnExit gc(*this, f.readCredit); assert(!catchUp); - currentChannel = f.frame.getChannel(); + currentChannel = f.frame.getChannel(); if (f.frame.getBody() // frame can be emtpy with just readCredit && !framing::invoke(*this, *f.frame.getBody()).wasHandled() // Connection contol. && !checkUnsupported(*f.frame.getBody())) // Unsupported operation. @@ -287,7 +287,7 @@ void Connection::deliverClose () { cluster.erase(self); } -// Close the connection +// Close the connection void Connection::close() { if (connection.get()) { QPID_LOG(debug, cluster << " closed connection " << *this); @@ -332,9 +332,9 @@ size_t Connection::decode(const char* data, size_t size) { if (!checkProtocolHeader(ptr, size)) // Updates ptr return 0; // Incomplete header - if (!connection->isOpen()) + if (!connection->isOpen()) processInitialFrames(ptr, end-ptr); // Updates ptr - + if (connection->isOpen() && end - ptr > 0) { // We're multi-casting, we will give read credit on delivery. grc.credit = 0; @@ -432,7 +432,7 @@ void Connection::sessionState( unknownCompleted, receivedIncomplete); QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId()); - // The output tasks will be added later in the update process. + // The output tasks will be added later in the update process. connection->getOutputTasks().removeAll(); } @@ -478,7 +478,7 @@ void Connection::retractOffer() { void Connection::closeUpdated() { self.second = 0; // Mark this as completed update connection. - if (connection.get()) + if (connection.get()) connection->close(connection::CLOSE_CODE_NORMAL, "OK"); } @@ -529,7 +529,7 @@ void Connection::deliveryRecord(const string& qname, m = getUpdateMessage(); m.queue = queue.get(); m.position = position; - if (enqueued) queue->updateEnqueued(m); //inform queue of the message + if (enqueued) queue->updateEnqueued(m); //inform queue of the message } else { // Message at original position in original queue m = queue->find(position); } @@ -591,7 +591,7 @@ void Connection::txEnqueue(const std::string& queue) { void Connection::txPublish(const framing::Array& queues, bool delivered) { boost::shared_ptr<broker::TxPublish> txPub(new broker::TxPublish(getUpdateMessage().payload)); - for (framing::Array::const_iterator i = queues.begin(); i != queues.end(); ++i) + for (framing::Array::const_iterator i = queues.begin(); i != queues.end(); ++i) txPub->deliverTo(findQueue((*i)->get<std::string>())); txPub->delivered = delivered; txBuffer->enlist(txPub); @@ -678,6 +678,12 @@ void Connection::config(const std::string& encoded) { else throw Exception(QPID_MSG("Update failed, invalid kind of config: " << kind)); } +void Connection::doCatchupIoCallbacks() { + // We need to process IO callbacks during the catch-up phase in + // order to service asynchronous completions for messages + // transferred during catch-up. + if (catchUp) getBrokerConnection()->doIoCallbacks(); +} }} // Namespace qpid::cluster |