summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Connection.cpp
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-02-24 22:35:53 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-02-24 22:35:53 +0000
commitb5027a43f535995f37472672583dbb87c0a02aff (patch)
treedb8ba67b7f11ed359b286d6cb307b9d678707790 /cpp/src/qpid/cluster/Connection.cpp
parentfb0db96f52a38bc8f185d23abc7908361bc4dd68 (diff)
downloadqpid-python-b5027a43f535995f37472672583dbb87c0a02aff.tar.gz
QPID-3084: apply Alan's fix to allow io callbacks to run during a cluster update.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1074332 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp30
1 files changed, 18 insertions, 12 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index e9b718e6de..2532adab84 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -143,7 +143,7 @@ void Connection::init() {
// Called when we have consumed a read buffer to give credit to the
// connection layer to continue reading.
void Connection::giveReadCredit(int credit) {
- if (cluster.getSettings().readMax && credit)
+ if (cluster.getSettings().readMax && credit)
output.giveReadCredit(credit);
}
@@ -201,7 +201,7 @@ void Connection::received(framing::AMQFrame& f) {
}
else { // Shadow or updated catch-up connection.
if (f.getMethod() && f.getMethod()->isA<ConnectionCloseBody>()) {
- if (isShadow())
+ if (isShadow())
cluster.addShadowConnection(this);
AMQFrame ok((ConnectionCloseOkBody()));
connection->getOutput().send(ok);
@@ -241,7 +241,7 @@ void Connection::deliverDoOutput(uint32_t limit) {
void Connection::deliveredFrame(const EventFrame& f) {
GiveReadCreditOnExit gc(*this, f.readCredit);
assert(!catchUp);
- currentChannel = f.frame.getChannel();
+ currentChannel = f.frame.getChannel();
if (f.frame.getBody() // frame can be emtpy with just readCredit
&& !framing::invoke(*this, *f.frame.getBody()).wasHandled() // Connection contol.
&& !checkUnsupported(*f.frame.getBody())) // Unsupported operation.
@@ -287,7 +287,7 @@ void Connection::deliverClose () {
cluster.erase(self);
}
-// Close the connection
+// Close the connection
void Connection::close() {
if (connection.get()) {
QPID_LOG(debug, cluster << " closed connection " << *this);
@@ -332,9 +332,9 @@ size_t Connection::decode(const char* data, size_t size) {
if (!checkProtocolHeader(ptr, size)) // Updates ptr
return 0; // Incomplete header
- if (!connection->isOpen())
+ if (!connection->isOpen())
processInitialFrames(ptr, end-ptr); // Updates ptr
-
+
if (connection->isOpen() && end - ptr > 0) {
// We're multi-casting, we will give read credit on delivery.
grc.credit = 0;
@@ -432,7 +432,7 @@ void Connection::sessionState(
unknownCompleted,
receivedIncomplete);
QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId());
- // The output tasks will be added later in the update process.
+ // The output tasks will be added later in the update process.
connection->getOutputTasks().removeAll();
}
@@ -478,7 +478,7 @@ void Connection::retractOffer() {
void Connection::closeUpdated() {
self.second = 0; // Mark this as completed update connection.
- if (connection.get())
+ if (connection.get())
connection->close(connection::CLOSE_CODE_NORMAL, "OK");
}
@@ -529,7 +529,7 @@ void Connection::deliveryRecord(const string& qname,
m = getUpdateMessage();
m.queue = queue.get();
m.position = position;
- if (enqueued) queue->updateEnqueued(m); //inform queue of the message
+ if (enqueued) queue->updateEnqueued(m); //inform queue of the message
} else { // Message at original position in original queue
m = queue->find(position);
}
@@ -591,7 +591,7 @@ void Connection::txEnqueue(const std::string& queue) {
void Connection::txPublish(const framing::Array& queues, bool delivered) {
boost::shared_ptr<broker::TxPublish> txPub(new broker::TxPublish(getUpdateMessage().payload));
- for (framing::Array::const_iterator i = queues.begin(); i != queues.end(); ++i)
+ for (framing::Array::const_iterator i = queues.begin(); i != queues.end(); ++i)
txPub->deliverTo(findQueue((*i)->get<std::string>()));
txPub->delivered = delivered;
txBuffer->enlist(txPub);
@@ -678,6 +678,12 @@ void Connection::config(const std::string& encoded) {
else throw Exception(QPID_MSG("Update failed, invalid kind of config: " << kind));
}
+void Connection::doCatchupIoCallbacks() {
+ // We need to process IO callbacks during the catch-up phase in
+ // order to service asynchronous completions for messages
+ // transferred during catch-up.
+ if (catchUp) getBrokerConnection()->doIoCallbacks();
+}
}} // Namespace qpid::cluster