diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-02-24 22:35:53 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-02-24 22:35:53 +0000 |
commit | 09f3f9617416e1a9bf11efff0ea4df4e746f3592 (patch) | |
tree | 1113d1a75fb194bb316af61270ecd04d200d653f | |
parent | 0315f575a14370dcff19d91cc1695dc751d4f420 (diff) | |
download | qpid-python-09f3f9617416e1a9bf11efff0ea4df4e746f3592.tar.gz |
QPID-3084: apply Alan's fix to allow io callbacks to run during a cluster update.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1074332 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/Connection.cpp | 26 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Connection.h | 7 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Connection.cpp | 30 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Connection.h | 20 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp | 17 |
5 files changed, 59 insertions, 41 deletions
diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp index 67713a6eb7..f138a2d55e 100644 --- a/qpid/cpp/src/qpid/broker/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/Connection.cpp @@ -345,17 +345,21 @@ void Connection::closed(){ // Physically closed, suspend open sessions. } } +void Connection::doIoCallbacks() { + { + ScopedLock<Mutex> l(ioCallbackLock); + while (!ioCallbacks.empty()) { + boost::function0<void> cb = ioCallbacks.front(); + ioCallbacks.pop(); + ScopedUnlock<Mutex> ul(ioCallbackLock); + cb(); // Lend the IO thread for management processing + } + } +} + bool Connection::doOutput() { try { - { - ScopedLock<Mutex> l(ioCallbackLock); - while (!ioCallbacks.empty()) { - boost::function0<void> cb = ioCallbacks.front(); - ioCallbacks.pop(); - ScopedUnlock<Mutex> ul(ioCallbackLock); - cb(); // Lend the IO thread for management processing - } - } + doIoCallbacks(); if (mgmtClosing) { closed(); close(connection::CLOSE_CODE_CONNECTION_FORCED, "Closed by Management Request"); @@ -475,8 +479,8 @@ void Connection::OutboundFrameTracker::abort() { next->abort(); } void Connection::OutboundFrameTracker::activateOutput() { next->activateOutput(); } void Connection::OutboundFrameTracker::giveReadCredit(int32_t credit) { next->giveReadCredit(credit); } void Connection::OutboundFrameTracker::send(framing::AMQFrame& f) -{ - next->send(f); +{ + next->send(f); con.sent(f); } void Connection::OutboundFrameTracker::wrap(sys::ConnectionOutputHandlerPtr& p) diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h index b751848d73..8f1aa701ef 100644 --- a/qpid/cpp/src/qpid/broker/Connection.h +++ b/qpid/cpp/src/qpid/broker/Connection.h @@ -153,13 +153,16 @@ class Connection : public sys::ConnectionInputHandler, void addManagementObject(); const qpid::sys::SecuritySettings& getExternalSecuritySettings() const - { + { return securitySettings; } /** @return true if the initial connection negotiation is complete. */ bool isOpen(); + // Used by cluster during catch-up, see cluster::OutputInterceptor + void doIoCallbacks(); + private: typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap; typedef std::vector<boost::shared_ptr<Queue> >::iterator queue_iterator; @@ -201,7 +204,7 @@ class Connection : public sys::ConnectionInputHandler, sys::ConnectionOutputHandler* next; }; OutboundFrameTracker outboundTracker; - + void sent(const framing::AMQFrame& f); public: diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp index e9b718e6de..2532adab84 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.cpp +++ b/qpid/cpp/src/qpid/cluster/Connection.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -143,7 +143,7 @@ void Connection::init() { // Called when we have consumed a read buffer to give credit to the // connection layer to continue reading. void Connection::giveReadCredit(int credit) { - if (cluster.getSettings().readMax && credit) + if (cluster.getSettings().readMax && credit) output.giveReadCredit(credit); } @@ -201,7 +201,7 @@ void Connection::received(framing::AMQFrame& f) { } else { // Shadow or updated catch-up connection. if (f.getMethod() && f.getMethod()->isA<ConnectionCloseBody>()) { - if (isShadow()) + if (isShadow()) cluster.addShadowConnection(this); AMQFrame ok((ConnectionCloseOkBody())); connection->getOutput().send(ok); @@ -241,7 +241,7 @@ void Connection::deliverDoOutput(uint32_t limit) { void Connection::deliveredFrame(const EventFrame& f) { GiveReadCreditOnExit gc(*this, f.readCredit); assert(!catchUp); - currentChannel = f.frame.getChannel(); + currentChannel = f.frame.getChannel(); if (f.frame.getBody() // frame can be emtpy with just readCredit && !framing::invoke(*this, *f.frame.getBody()).wasHandled() // Connection contol. && !checkUnsupported(*f.frame.getBody())) // Unsupported operation. @@ -287,7 +287,7 @@ void Connection::deliverClose () { cluster.erase(self); } -// Close the connection +// Close the connection void Connection::close() { if (connection.get()) { QPID_LOG(debug, cluster << " closed connection " << *this); @@ -332,9 +332,9 @@ size_t Connection::decode(const char* data, size_t size) { if (!checkProtocolHeader(ptr, size)) // Updates ptr return 0; // Incomplete header - if (!connection->isOpen()) + if (!connection->isOpen()) processInitialFrames(ptr, end-ptr); // Updates ptr - + if (connection->isOpen() && end - ptr > 0) { // We're multi-casting, we will give read credit on delivery. grc.credit = 0; @@ -432,7 +432,7 @@ void Connection::sessionState( unknownCompleted, receivedIncomplete); QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId()); - // The output tasks will be added later in the update process. + // The output tasks will be added later in the update process. connection->getOutputTasks().removeAll(); } @@ -478,7 +478,7 @@ void Connection::retractOffer() { void Connection::closeUpdated() { self.second = 0; // Mark this as completed update connection. - if (connection.get()) + if (connection.get()) connection->close(connection::CLOSE_CODE_NORMAL, "OK"); } @@ -529,7 +529,7 @@ void Connection::deliveryRecord(const string& qname, m = getUpdateMessage(); m.queue = queue.get(); m.position = position; - if (enqueued) queue->updateEnqueued(m); //inform queue of the message + if (enqueued) queue->updateEnqueued(m); //inform queue of the message } else { // Message at original position in original queue m = queue->find(position); } @@ -591,7 +591,7 @@ void Connection::txEnqueue(const std::string& queue) { void Connection::txPublish(const framing::Array& queues, bool delivered) { boost::shared_ptr<broker::TxPublish> txPub(new broker::TxPublish(getUpdateMessage().payload)); - for (framing::Array::const_iterator i = queues.begin(); i != queues.end(); ++i) + for (framing::Array::const_iterator i = queues.begin(); i != queues.end(); ++i) txPub->deliverTo(findQueue((*i)->get<std::string>())); txPub->delivered = delivered; txBuffer->enlist(txPub); @@ -678,6 +678,12 @@ void Connection::config(const std::string& encoded) { else throw Exception(QPID_MSG("Update failed, invalid kind of config: " << kind)); } +void Connection::doCatchupIoCallbacks() { + // We need to process IO callbacks during the catch-up phase in + // order to service asynchronous completions for messages + // transferred during catch-up. + if (catchUp) getBrokerConnection()->doIoCallbacks(); +} }} // Namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h index 7ee85bf1aa..b96fa73072 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.h +++ b/qpid/cpp/src/qpid/cluster/Connection.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -62,7 +62,7 @@ class Connection : public sys::ConnectionInputHandler, public framing::AMQP_AllOperations::ClusterConnectionHandler, private broker::Connection::ErrorListener - + { public: @@ -73,7 +73,7 @@ class Connection : Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& mgmtId, const ConnectionId& id, const qpid::sys::SecuritySettings& external); ~Connection(); - + ConnectionId getId() const { return self; } broker::Connection* getBrokerConnection() { return connection.get(); } const broker::Connection* getBrokerConnection() const { return connection.get(); } @@ -108,9 +108,9 @@ class Connection : void deliveredFrame(const EventFrame&); void consumerState(const std::string& name, bool blocked, bool notifyEnabled, const qpid::framing::SequenceNumber& position); - + // ==== Used in catch-up mode to build initial state. - // + // // State update methods. void shadowPrepare(const std::string&); @@ -123,9 +123,9 @@ class Connection : const framing::SequenceNumber& received, const framing::SequenceSet& unknownCompleted, const SequenceSet& receivedIncomplete); - + void outputTask(uint16_t channel, const std::string& name); - + void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& managementId, @@ -189,6 +189,8 @@ class Connection : void setSecureConnection ( broker::SecureConnection * sc ); + void doCatchupIoCallbacks(); + private: struct NullFrameHandler : public framing::FrameHandler { void handle(framing::AMQFrame&) {} @@ -233,7 +235,7 @@ class Connection : // Error listener functions void connectionError(const std::string&); void sessionError(uint16_t channel, const std::string&); - + void init(); bool checkUnsupported(const framing::AMQBody& body); void deliverDoOutput(uint32_t limit); diff --git a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp index 1354dab17b..13e95d1ec2 100644 --- a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp +++ b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -66,11 +66,14 @@ void OutputInterceptor::giveReadCredit(int32_t credit) { } // Called in write thread when the IO layer has no more data to write. -// We do nothing in the write thread, we run doOutput only on delivery -// of doOutput requests. -bool OutputInterceptor::doOutput() { return false; } +// We only process IO callbacks in the write thread during catch-up. +// Normally we run doOutput only on delivery of doOutput requests. +bool OutputInterceptor::doOutput() { + parent.doCatchupIoCallbacks(); + return false; +} -// Send output up to limit, calculate new limit. +// Send output up to limit, calculate new limit. void OutputInterceptor::deliverDoOutput(uint32_t limit) { sentDoOutput = false; sendMax = limit; @@ -78,7 +81,7 @@ void OutputInterceptor::deliverDoOutput(uint32_t limit) { if (parent.isLocal()) { size_t buffered = getBuffered(); if (buffered == 0 && sent == sendMax) // Could have sent more, increase the limit. - newLimit = sendMax*2; + newLimit = sendMax*2; else if (buffered > 0 && sent > 1) // Data left unsent, reduce the limit. newLimit = (sendMax + sent) / 2; } |