diff options
author | Kim van der Riet <kpvdr@apache.org> | 2009-09-14 15:20:13 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2009-09-14 15:20:13 +0000 |
commit | 4403eeede5a7b7493031eaca767c35eb093766cb (patch) | |
tree | 9bc332238e8ebd343407a0b172bb07cdf82352d9 | |
parent | 3bff347ac137aaa284004216c7b857d89668a086 (diff) | |
download | qpid-python-4403eeede5a7b7493031eaca767c35eb093766cb.tar.gz |
Reversed checkin of r.813825 until its problems can be resolved
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@814692 13f79535-47bb-0310-9956-ffa450edef68
23 files changed, 411 insertions, 526 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index daae675749..4988f3f031 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -536,7 +536,6 @@ libqpidbroker_la_SOURCES = \ qpid/broker/MessageAdapter.h \ qpid/broker/MessageBuilder.cpp \ qpid/broker/MessageBuilder.h \ - qpid/broker/MessageReleaseManager.h \ qpid/broker/MessageStore.h \ qpid/broker/MessageStoreModule.cpp \ qpid/broker/MessageStoreModule.h \ diff --git a/qpid/cpp/src/qpid/broker/DirectExchange.cpp b/qpid/cpp/src/qpid/broker/DirectExchange.cpp index 6ffa7e2f8e..b9f24dee5f 100644 --- a/qpid/cpp/src/qpid/broker/DirectExchange.cpp +++ b/qpid/cpp/src/qpid/broker/DirectExchange.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -28,7 +28,7 @@ using namespace qpid::sys; using qpid::management::Manageable; namespace _qmf = qmf::org::apache::qpid::broker; -namespace +namespace { const std::string qpidFedOp("qpid.fed.op"); const std::string qpidFedTags("qpid.fed.tags"); @@ -98,7 +98,7 @@ bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, con */ std::vector<std::string> keys2prop; { - Mutex::ScopedLock l(lock); + Mutex::ScopedLock l(lock); for (Bindings::iterator iter = bindings.begin(); iter != bindings.end(); iter++) { const BoundKey& bk = iter->second; @@ -150,7 +150,34 @@ void DirectExchange::route(Deliverable& msg, const string& routingKey, const Fie Mutex::ScopedLock l(lock); p = bindings[routingKey].queues.snapshot(); } - doRoute(msg, p); + int count(0); + + if (p) { + for(std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); i++, count++) { + msg.deliverTo((*i)->queue); + if ((*i)->mgmtBinding != 0) + (*i)->mgmtBinding->inc_msgMatched(); + } + } + + if(!count){ + QPID_LOG(info, "DirectExchange " << getName() << " could not route message with key " << routingKey + << "; no matching binding found"); + if (mgmtExchange != 0) { + mgmtExchange->inc_msgDrops(); + mgmtExchange->inc_byteDrops(msg.contentSize()); + } + } else { + if (mgmtExchange != 0) { + mgmtExchange->inc_msgRoutes(count); + mgmtExchange->inc_byteRoutes(count * msg.contentSize()); + } + } + + if (mgmtExchange != 0) { + mgmtExchange->inc_msgReceives(); + mgmtExchange->inc_byteReceives(msg.contentSize()); + } } diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp index 9b5796bde3..90d81b81c6 100644 --- a/qpid/cpp/src/qpid/broker/Exchange.cpp +++ b/qpid/cpp/src/qpid/broker/Exchange.cpp @@ -76,49 +76,6 @@ Exchange::PreRoute::~PreRoute(){ } } -void Exchange::blockContentReleaseCheck(Deliverable& msg, qpid::sys::CopyOnWriteArray<Binding::shared_ptr>::ConstPtr p) -{ - bool allQueuesPersistent = true; - for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); allQueuesPersistent && i!=p->end(); i++) { - allQueuesPersistent = (*i)->queue->getPersistenceId() > 0; - } - if (msg.getMessage().contentSize() && (!allQueuesPersistent || (p->size() > 1 && !msg.getMessage().isPersistent()))) { - msg.getMessage().blockRelease(); - } -} - -void Exchange::doRoute(Deliverable& msg, qpid::sys::CopyOnWriteArray<Binding::shared_ptr>::ConstPtr p) -{ - int count = 0; - - if (p.get()) { - blockContentReleaseCheck(msg, p); - - for(std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); i++, count++) { - msg.deliverTo((*i)->queue); - if ((*i)->mgmtBinding != 0) - (*i)->mgmtBinding->inc_msgMatched(); - } - } - - if (mgmtExchange != 0) - { - mgmtExchange->inc_msgReceives (); - mgmtExchange->inc_byteReceives (msg.contentSize ()); - if (count == 0) - { - //QPID_LOG(warning, "Exchange " << getName() << " could not route message; no matching binding found"); - mgmtExchange->inc_msgDrops (); - mgmtExchange->inc_byteDrops (msg.contentSize ()); - } - else - { - mgmtExchange->inc_msgRoutes (count); - mgmtExchange->inc_byteRoutes (count * msg.contentSize ()); - } - } -} - void Exchange::routeIVE(){ if (ive && lastMsg.get()){ DeliverableMessage dmsg(lastMsg); diff --git a/qpid/cpp/src/qpid/broker/Exchange.h b/qpid/cpp/src/qpid/broker/Exchange.h index c2393d0850..c1e878200f 100644 --- a/qpid/cpp/src/qpid/broker/Exchange.h +++ b/qpid/cpp/src/qpid/broker/Exchange.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -29,7 +29,6 @@ #include "qpid/broker/MessageStore.h" #include "qpid/broker/PersistableExchange.h" #include "qpid/framing/FieldTable.h" -#include "qpid/sys/CopyOnWriteArray.h" #include "qpid/sys/Mutex.h" #include "qpid/management/Manageable.h" #include "qmf/org/apache/qpid/broker/Exchange.h" @@ -79,14 +78,12 @@ protected: private: Exchange* parent; }; - - void blockContentReleaseCheck(Deliverable& msg, qpid::sys::CopyOnWriteArray<Binding::shared_ptr>::ConstPtr b); - void doRoute(Deliverable& msg, qpid::sys::CopyOnWriteArray<Binding::shared_ptr>::ConstPtr b); + void routeIVE(); - + struct MatchQueue { - const Queue::shared_ptr queue; + const Queue::shared_ptr queue; MatchQueue(Queue::shared_ptr q); bool operator()(Exchange::Binding::shared_ptr b); }; @@ -146,7 +143,7 @@ public: virtual bool isBound(Queue::shared_ptr queue, const std::string* const routingKey, const qpid::framing::FieldTable* const args) = 0; QPID_BROKER_EXTERN virtual void setProperties(const boost::intrusive_ptr<Message>&); virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0; - + //PersistableExchange: QPID_BROKER_EXTERN void setPersistenceId(uint64_t id) const; uint64_t getPersistenceId() const { return persistenceId; } diff --git a/qpid/cpp/src/qpid/broker/FanOutExchange.cpp b/qpid/cpp/src/qpid/broker/FanOutExchange.cpp index 972244c942..e9007ba682 100644 --- a/qpid/cpp/src/qpid/broker/FanOutExchange.cpp +++ b/qpid/cpp/src/qpid/broker/FanOutExchange.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -26,7 +26,7 @@ using namespace qpid::framing; using namespace qpid::sys; namespace _qmf = qmf::org::apache::qpid::broker; -namespace +namespace { const std::string qpidFedOp("qpid.fed.op"); const std::string qpidFedTags("qpid.fed.tags"); @@ -106,11 +106,34 @@ bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*key*/, cons return true; } -void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/) -{ +void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/){ PreRoute pr(msg, this); + uint32_t count(0); + BindingsArray::ConstPtr p = bindings.snapshot(); - doRoute(msg, p); + if (p.get()){ + for(std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i, count++){ + msg.deliverTo((*i)->queue); + if ((*i)->mgmtBinding != 0) + (*i)->mgmtBinding->inc_msgMatched (); + } + } + + if (mgmtExchange != 0) + { + mgmtExchange->inc_msgReceives (); + mgmtExchange->inc_byteReceives (msg.contentSize ()); + if (count == 0) + { + mgmtExchange->inc_msgDrops (); + mgmtExchange->inc_byteDrops (msg.contentSize ()); + } + else + { + mgmtExchange->inc_msgRoutes (count); + mgmtExchange->inc_byteRoutes (count * msg.contentSize ()); + } + } } bool FanOutExchange::isBound(Queue::shared_ptr queue, const string* const, const FieldTable* const) diff --git a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp index e4825344a0..c628c44909 100644 --- a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp +++ b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -118,17 +118,31 @@ void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, cons PreRoute pr(msg, this); + uint32_t count(0); + Bindings::ConstPtr p = bindings.snapshot(); - Bindings::Ptr b(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >); - if (p.get()) - { + if (p.get()){ for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i) { if (match((*i)->args, *args)) { - b->push_back(*i); + msg.deliverTo((*i)->queue); + count++; + if ((*i)->mgmtBinding != 0) + (*i)->mgmtBinding->inc_msgMatched(); } } } - doRoute(msg, b); + + if (mgmtExchange != 0) { + mgmtExchange->inc_msgReceives(); + mgmtExchange->inc_byteReceives(msg.contentSize()); + if (count == 0) { + mgmtExchange->inc_msgDrops(); + mgmtExchange->inc_byteDrops(msg.contentSize()); + } else { + mgmtExchange->inc_msgRoutes(count); + mgmtExchange->inc_byteRoutes(count * msg.contentSize()); + } + } } @@ -149,7 +163,7 @@ HeadersExchange::~HeadersExchange() {} const std::string HeadersExchange::typeName("headers"); -namespace +namespace { bool match_values(const FieldValue& bind, const FieldValue& msg) { @@ -167,7 +181,7 @@ bool HeadersExchange::match(const FieldTable& bind, const FieldTable& msg) { i != bind.end(); ++i) { - if (i->first != x_match) + if (i->first != x_match) { Map::const_iterator j = msg.find(i->first); if (j == msg.end()) return false; @@ -180,7 +194,7 @@ bool HeadersExchange::match(const FieldTable& bind, const FieldTable& msg) { i != bind.end(); ++i) { - if (i->first != x_match) + if (i->first != x_match) { Map::const_iterator j = msg.find(i->first); if (j != msg.end()) { diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp index 8731a29d24..7360010192 100644 --- a/qpid/cpp/src/qpid/broker/Message.cpp +++ b/qpid/cpp/src/qpid/broker/Message.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -22,7 +22,6 @@ #include "qpid/broker/Message.h" #include "qpid/broker/ExchangeRegistry.h" #include "qpid/broker/ExpiryPolicy.h" -#include "qpid/broker/NullMessageStore.h" #include "qpid/StringUtils.h" #include "qpid/framing/frame_functors.h" #include "qpid/framing/FieldTable.h" @@ -49,7 +48,7 @@ TransferAdapter Message::TRANSFER; Message::Message(const framing::SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), loaded(false), - staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), + staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), expiration(FAR_FUTURE), enqueueCallback(0), dequeueCallback(0) {} Message::~Message() @@ -76,7 +75,7 @@ std::string Message::getRoutingKey() const return getAdapter().getRoutingKey(frames); } -std::string Message::getExchangeName() const +std::string Message::getExchangeName() const { return getAdapter().getExchange(frames); } @@ -85,7 +84,7 @@ const boost::shared_ptr<Exchange> Message::getExchange(ExchangeRegistry& registr { if (!exchange) { exchange = registry.get(getExchangeName()); - } + } return exchange; } @@ -99,7 +98,7 @@ const FieldTable* Message::getApplicationHeaders() const return getAdapter().getApplicationHeaders(frames); } -bool Message::isPersistent() const +bool Message::isPersistent() { return (getAdapter().isPersistent(frames) || forcePersistentPolicy); } @@ -176,25 +175,26 @@ void Message::decodeContent(framing::Buffer& buffer) } else { //adjust header flags MarkLastSegment f; - frames.map_if(f, TypeFilter<HEADER_BODY>()); + frames.map_if(f, TypeFilter<HEADER_BODY>()); } //mark content loaded loaded = true; } -void Message::releaseContent(bool immediate) +void Message::releaseContent(MessageStore* _store) { - if (store && !NullMessageStore::isNullStore(store) && (immediate || releaseMgr.canRelease())) { + if (!store) { + store = _store; + } + if (store) { if (!getPersistenceId()) { intrusive_ptr<PersistableMessage> pmsg(this); store->stage(pmsg); staged = true; - frames.remove(TypeFilter<CONTENT_BODY>()); - setContentReleased(); - } else if (immediate || releaseMgr.canRelease()) { - frames.remove(TypeFilter<CONTENT_BODY>()); - setContentReleased(); - } + } + //remove any content frames from the frameset + frames.remove(TypeFilter<CONTENT_BODY>()); + setContentReleased(); } } @@ -213,7 +213,7 @@ bool Message::getContentFrame(const Queue& queue, AMQFrame& frame, uint16_t maxC { if (isContentReleased()) { intrusive_ptr<const PersistableMessage> pmsg(this); - + bool done = false; string& data = frame.castBody<AMQContentBody>()->getData(); store->loadContent(queue, pmsg, data, offset, maxContentSize); @@ -239,7 +239,7 @@ void Message::sendContent(const Queue& queue, framing::FrameHandler& out, uint16 uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead(); bool morecontent = true; for (uint64_t offset = 0; morecontent; offset += maxContentSize) - { + { AMQFrame frame((AMQContentBody())); morecontent = getContentFrame(queue, frame, maxContentSize, offset); out.handle(frame); @@ -257,7 +257,7 @@ void Message::sendHeader(framing::FrameHandler& out, uint16_t /*maxFrameSize*/) { sys::Mutex::ScopedLock l(lock); Relay f(out); - frames.map_if(f, TypeFilter<HEADER_BODY>()); + frames.map_if(f, TypeFilter<HEADER_BODY>()); } // TODO aconway 2007-11-09: Obsolete, remove. Was used to cover over @@ -287,7 +287,7 @@ bool Message::isContentLoaded() const } -namespace +namespace { const std::string X_QPID_TRACE("x-qpid.trace"); } @@ -324,13 +324,13 @@ void Message::addTraceId(const std::string& id) trace += ","; trace += id; headers.setString(X_QPID_TRACE, trace); - } + } } } -void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e) +void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e) { - DeliveryProperties* props = getProperties<DeliveryProperties>(); + DeliveryProperties* props = getProperties<DeliveryProperties>(); if (props->getTtl()) { // AMQP requires setting the expiration property to be posix // time_t in seconds. TTL is in milliseconds @@ -347,7 +347,7 @@ void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e) void Message::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) { expiryPolicy = e; - if (expiryPolicy) + if (expiryPolicy) expiryPolicy->willExpire(*this); } @@ -362,7 +362,7 @@ boost::intrusive_ptr<Message>& Message::getReplacementMessage(const Queue* qfor) Replacement::iterator i = replacement.find(qfor); if (i != replacement.end()){ return i->second; - } + } return empty; } @@ -410,7 +410,7 @@ void Message::setUpdateDestination(const std::string& d) bool Message::isUpdateMessage() { - return updateDestination.size() && isA<MessageTransferBody>() + return updateDestination.size() && isA<MessageTransferBody>() && getMethod<MessageTransferBody>()->getDestination() == updateDestination; } diff --git a/qpid/cpp/src/qpid/broker/Message.h b/qpid/cpp/src/qpid/broker/Message.h index 92fc3df7ec..e4d09b1042 100644 --- a/qpid/cpp/src/qpid/broker/Message.h +++ b/qpid/cpp/src/qpid/broker/Message.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -34,12 +34,12 @@ #include <vector> namespace qpid { - + namespace framing { class FieldTable; class SequenceNumber; } - + namespace broker { class ConnectionToken; class Exchange; @@ -51,10 +51,10 @@ class ExpiryPolicy; class Message : public PersistableMessage { public: typedef boost::function<void (const boost::intrusive_ptr<Message>&)> MessageCallback; - + QPID_BROKER_EXTERN Message(const framing::SequenceNumber& id = framing::SequenceNumber()); QPID_BROKER_EXTERN ~Message(); - + uint64_t getPersistenceId() const { return persistenceId; } void setPersistenceId(uint64_t _persistenceId) const { persistenceId = _persistenceId; } @@ -74,7 +74,7 @@ public: bool isImmediate() const; QPID_BROKER_EXTERN const framing::FieldTable* getApplicationHeaders() const; framing::FieldTable& getOrInsertHeaders(); - QPID_BROKER_EXTERN bool isPersistent() const; + QPID_BROKER_EXTERN bool isPersistent(); bool requiresAccept(); QPID_BROKER_EXTERN void setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e); @@ -82,8 +82,8 @@ public: bool hasExpired(); sys::AbsTime getExpiration() const { return expiration; } - framing::FrameSet& getFrames() { return frames; } - const framing::FrameSet& getFrames() const { return frames; } + framing::FrameSet& getFrames() { return frames; } + const framing::FrameSet& getFrames() const { return frames; } template <class T> T* getProperties() { qpid::framing::AMQHeaderBody* p = frames.getHeaders(); @@ -128,13 +128,13 @@ public: QPID_BROKER_EXTERN void decodeHeader(framing::Buffer& buffer); QPID_BROKER_EXTERN void decodeContent(framing::Buffer& buffer); - + /** * Releases the in-memory content data held by this * message. Must pass in a store from which the data can * be reloaded. */ - void releaseContent(bool immediate = false); + void releaseContent(MessageStore* store); void destroy(); bool getContentFrame(const Queue& queue, framing::AMQFrame& frame, uint16_t maxContentSize, uint64_t offset) const; @@ -145,11 +145,10 @@ public: bool isExcluded(const std::vector<std::string>& excludes) const; void addTraceId(const std::string& id); - - void forcePersistent(); - bool isForcedPersistent(); - void setStore(MessageStore* s) { store = s; } - + + void forcePersistent(); + bool isForcedPersistent(); + boost::intrusive_ptr<Message>& getReplacementMessage(const Queue* qfor) const; void setReplacementMessage(boost::intrusive_ptr<Message> msg, const Queue* qfor); diff --git a/qpid/cpp/src/qpid/broker/MessageBuilder.cpp b/qpid/cpp/src/qpid/broker/MessageBuilder.cpp index e886cc08a0..14b233fd6c 100644 --- a/qpid/cpp/src/qpid/broker/MessageBuilder.cpp +++ b/qpid/cpp/src/qpid/broker/MessageBuilder.cpp @@ -35,6 +35,7 @@ namespace std::string type_str(uint8_t type); const std::string QPID_MANAGEMENT("qpid.management"); } + MessageBuilder::MessageBuilder(MessageStore* const _store, uint64_t _stagingThreshold) : state(DORMANT), store(_store), stagingThreshold(_stagingThreshold), staging(false) {} @@ -79,7 +80,7 @@ void MessageBuilder::handle(AMQFrame& frame) && !NullMessageStore::isNullStore(store) && message->getExchangeName() != QPID_MANAGEMENT /* don't stage mgnt messages */) { - message->releaseContent(true); + message->releaseContent(store); staging = true; } } @@ -95,7 +96,6 @@ void MessageBuilder::end() void MessageBuilder::start(const SequenceNumber& id) { message = intrusive_ptr<Message>(new Message(id)); - message->setStore(store); state = METHOD; staging = false; } diff --git a/qpid/cpp/src/qpid/broker/MessageReleaseManager.h b/qpid/cpp/src/qpid/broker/MessageReleaseManager.h deleted file mode 100644 index 1c3f615f59..0000000000 --- a/qpid/cpp/src/qpid/broker/MessageReleaseManager.h +++ /dev/null @@ -1,54 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _broker_MessageReleaseManager_h -#define _broker_MessageReleaseManager_h - -namespace qpid { - namespace broker { - - class MessageReleaseManager - { - private: - bool releaseBlocked; - bool releaseRequested; - bool released; - - public: - MessageReleaseManager(): releaseBlocked(false), releaseRequested(false), released(false) {} - virtual ~MessageReleaseManager() {} - - bool isReleaseBlocked() const { return releaseBlocked; } - void blockRelease() { if (!released) releaseBlocked = true; } - - bool isReleaseRequested() const { return releaseRequested; } - void setReleaseRequested() { if (!released) releaseRequested = true; } - - bool isReleased() const { return released; } - void setReleased() { released = true; } - - bool canRelease() { return !releaseBlocked && releaseRequested; } - }; - - } -} - - -#endif /*_broker_MessageReleaseManager_h*/ diff --git a/qpid/cpp/src/qpid/broker/PersistableMessage.cpp b/qpid/cpp/src/qpid/broker/PersistableMessage.cpp index dc855315db..2ef223aa81 100644 --- a/qpid/cpp/src/qpid/broker/PersistableMessage.cpp +++ b/qpid/cpp/src/qpid/broker/PersistableMessage.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -22,7 +22,6 @@ #include "qpid/broker/PersistableMessage.h" #include "qpid/broker/MessageStore.h" -#include "qpid/broker/NullMessageStore.h" #include <iostream> using namespace qpid::broker; @@ -35,8 +34,9 @@ class MessageStore; PersistableMessage::~PersistableMessage() {} PersistableMessage::PersistableMessage() : - asyncEnqueueCounter(0), + asyncEnqueueCounter(0), asyncDequeueCounter(0), + contentReleased(false), store(0) {} @@ -56,22 +56,13 @@ void PersistableMessage::flush() if (q) { store->flush(*q); } - } + } } -void PersistableMessage::setContentReleased() { releaseMgr.setReleased(); } - -void PersistableMessage::blockRelease() { releaseMgr.blockRelease(); } - -bool PersistableMessage::requestContentRelease() -{ - if (!store || NullMessageStore::isNullStore(store) || releaseMgr.isReleaseBlocked()) return false; - releaseMgr.setReleaseRequested(); - return true; -} - -bool PersistableMessage::isContentReleased()const { return releaseMgr.isReleased(); } +void PersistableMessage::setContentReleased() {contentReleased = true; } +bool PersistableMessage::isContentReleased()const { return contentReleased; } + bool PersistableMessage::isEnqueueComplete() { sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock); return asyncEnqueueCounter == 0; @@ -94,8 +85,8 @@ void PersistableMessage::enqueueComplete() { for (syncList::iterator i = synclist.begin(); i != synclist.end(); ++i) { PersistableQueue::shared_ptr q(i->lock()); if (q) q->notifyDurableIOComplete(); - } - } + } + } } } @@ -104,13 +95,13 @@ bool PersistableMessage::isStoredOnQueue(PersistableQueue::shared_ptr queue){ for (syncList::iterator i = synclist.begin(); i != synclist.end(); ++i) { PersistableQueue::shared_ptr q(i->lock()); if (q && q->getPersistenceId() == queue->getPersistenceId()) return true; - } - } + } + } return false; } -void PersistableMessage::addToSyncList(PersistableQueue::shared_ptr queue, MessageStore* _store) { +void PersistableMessage::addToSyncList(PersistableQueue::shared_ptr queue, MessageStore* _store) { if (_store){ sys::ScopedLock<sys::Mutex> l(storeLock); store = _store; @@ -119,22 +110,22 @@ void PersistableMessage::addToSyncList(PersistableQueue::shared_ptr queue, Messa } } -void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) { +void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) { addToSyncList(queue, _store); enqueueAsync(); } -void PersistableMessage::enqueueAsync() { +void PersistableMessage::enqueueAsync() { sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock); - asyncEnqueueCounter++; + asyncEnqueueCounter++; } -bool PersistableMessage::isDequeueComplete() { +bool PersistableMessage::isDequeueComplete() { sys::ScopedLock<sys::Mutex> l(asyncDequeueLock); return asyncDequeueCounter == 0; } - -void PersistableMessage::dequeueComplete() { + +void PersistableMessage::dequeueComplete() { bool notify = false; { sys::ScopedLock<sys::Mutex> l(asyncDequeueLock); @@ -147,7 +138,7 @@ void PersistableMessage::dequeueComplete() { if (notify) allDequeuesComplete(); } -void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) { +void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) { if (_store){ sys::ScopedLock<sys::Mutex> l(storeLock); store = _store; @@ -157,9 +148,9 @@ void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue, Messag dequeueAsync(); } -void PersistableMessage::dequeueAsync() { +void PersistableMessage::dequeueAsync() { sys::ScopedLock<sys::Mutex> l(asyncDequeueLock); - asyncDequeueCounter++; + asyncDequeueCounter++; } }} diff --git a/qpid/cpp/src/qpid/broker/PersistableMessage.h b/qpid/cpp/src/qpid/broker/PersistableMessage.h index df645493c9..0274b41375 100644 --- a/qpid/cpp/src/qpid/broker/PersistableMessage.h +++ b/qpid/cpp/src/qpid/broker/PersistableMessage.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -31,7 +31,6 @@ #include "qpid/framing/amqp_types.h" #include "qpid/sys/Mutex.h" #include "qpid/broker/PersistableQueue.h" -#include "qpid/broker/MessageReleaseManager.h" namespace qpid { namespace broker { @@ -47,7 +46,7 @@ class PersistableMessage : public Persistable sys::Mutex asyncEnqueueLock; sys::Mutex asyncDequeueLock; sys::Mutex storeLock; - + /** * Tracks the number of outstanding asynchronous enqueue * operations. When the message is enqueued asynchronously the @@ -69,6 +68,7 @@ class PersistableMessage : public Persistable void enqueueAsync(); void dequeueAsync(); + bool contentReleased; syncList synclist; protected: @@ -81,8 +81,6 @@ class PersistableMessage : public Persistable MessageStore* store; - MessageReleaseManager releaseMgr; - public: typedef boost::shared_ptr<PersistableMessage> shared_ptr; @@ -97,15 +95,9 @@ class PersistableMessage : public Persistable PersistableMessage(); void flush(); - - bool requestContentRelease(); - + bool isContentReleased() const; - - void blockRelease(); - - virtual QPID_BROKER_EXTERN bool isPersistent() const = 0; - + QPID_BROKER_EXTERN bool isEnqueueComplete(); QPID_BROKER_EXTERN void enqueueComplete(); @@ -115,16 +107,16 @@ class PersistableMessage : public Persistable QPID_BROKER_EXTERN bool isDequeueComplete(); - + QPID_BROKER_EXTERN void dequeueComplete(); QPID_BROKER_EXTERN void dequeueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store); bool isStoredOnQueue(PersistableQueue::shared_ptr queue); - + void addToSyncList(PersistableQueue::shared_ptr queue, MessageStore* _store); - + }; }} diff --git a/qpid/cpp/src/qpid/broker/PersistableQueue.h b/qpid/cpp/src/qpid/broker/PersistableQueue.h index aa6a751f61..8d85d36fef 100644 --- a/qpid/cpp/src/qpid/broker/PersistableQueue.h +++ b/qpid/cpp/src/qpid/broker/PersistableQueue.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -55,18 +55,18 @@ public: virtual const std::string& getName() const = 0; virtual ~PersistableQueue() { - if (externalQueueStore) + if (externalQueueStore) delete externalQueueStore; }; virtual void setExternalQueueStore(ExternalQueueStore* inst) = 0; - + inline ExternalQueueStore* getExternalQueueStore() const {return externalQueueStore;}; - + PersistableQueue():externalQueueStore(NULL){ }; - - + + /** * call back to signal async AIO writes have * completed (enqueue/dequeue etc) @@ -76,9 +76,9 @@ public: */ virtual void notifyDurableIOComplete() = 0; protected: - + ExternalQueueStore* externalQueueStore; - + }; }} diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index c3b14688d6..b2a8e223c5 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -56,7 +56,7 @@ using std::mem_fun; namespace _qmf = qmf::org::apache::qpid::broker; -namespace +namespace { const std::string qpidMaxSize("qpid.max_size"); const std::string qpidMaxCount("qpid.max_count"); @@ -76,16 +76,16 @@ const int ENQUEUE_ONLY=1; const int ENQUEUE_AND_DEQUEUE=2; } -Queue::Queue(const string& _name, bool _autodelete, +Queue::Queue(const string& _name, bool _autodelete, MessageStore* const _store, const OwnershipToken* const _owner, Manageable* parent, Broker* b) : - name(_name), + name(_name), autodelete(_autodelete), store(_store), - owner(_owner), + owner(_owner), consumerCount(0), exclusive(0), noLocal(false), @@ -182,9 +182,9 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){ void Queue::recover(boost::intrusive_ptr<Message>& msg){ push(msg, true); - if (store){ + if (store){ // setup synclist for recovered messages, so they don't get re-stored on lastNodeFailure - msg->addToSyncList(shared_from_this(), store); + msg->addToSyncList(shared_from_this(), store); } msg->enqueueComplete(); // mark the message as enqueued mgntEnqStats(msg); @@ -192,7 +192,7 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){ if (store && !msg->isContentLoaded()) { //content has not been loaded, need to ensure that lazy loading mode is set: //TODO: find a nicer way to do this - msg->releaseContent(true); + msg->releaseContent(store); } } @@ -209,13 +209,13 @@ void Queue::requeue(const QueuedMessage& msg){ if (!isEnqueued(msg)) return; QueueListeners::NotificationSet copy; - { + { Mutex::ScopedLock locker(messageLock); msg.payload->enqueueComplete(); // mark the message as enqueued messages.push_front(msg); listeners.populate(copy); - // for persistLastNode - don't force a message twice to disk, but force it if no force before + // for persistLastNode - don't force a message twice to disk, but force it if no force before if(inLastNodeFailure && persistLastNode && !msg.payload->isStoredOnQueue(shared_from_this())) { msg.payload->forcePersistent(); if (msg.payload->isForcedPersistent() ){ @@ -234,7 +234,7 @@ void Queue::clearLVQIndex(const QueuedMessage& msg){ } } -bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message) +bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message) { Mutex::ScopedLock locker(messageLock); QPID_LOG(debug, "Attempting to acquire message at " << position); @@ -258,7 +258,7 @@ bool Queue::acquire(const QueuedMessage& msg) { QPID_LOG(debug, "attempting to acquire " << msg.position); for (Messages::iterator i = messages.begin(); i != messages.end(); i++) { if ((i->position == msg.position && !lastValueQueue) // note that in some cases payload not be set - || (lastValueQueue && (i->position == msg.position) && + || (lastValueQueue && (i->position == msg.position) && msg.payload.get() == checkLvqReplace(*i).payload.get()) ) { clearLVQIndex(msg); @@ -296,7 +296,7 @@ bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c) case NO_MESSAGES: default: return false; - } + } } else { return browseNextMessage(m, c); } @@ -317,7 +317,7 @@ bool Queue::checkForMessages(Consumer::shared_ptr c) //enqueued and so is not available for consumption yet, //register consumer for notification when this changes listeners.addListener(c); - return false; + return false; } else { //check that consumer has sufficient credit for the //message (if it does not, no need to register it for @@ -332,7 +332,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ { while (true) { Mutex::ScopedLock locker(messageLock); - if (messages.empty()) { + if (messages.empty()) { QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'"); listeners.addListener(c); return NO_MESSAGES; @@ -345,7 +345,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ } if (c->filter(msg.payload)) { - if (c->accept(msg.payload)) { + if (c->accept(msg.payload)) { m = msg; popMsg(msg); return CONSUMED; @@ -358,7 +358,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ //consumer will never want this message QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'"); return CANT_CONSUME; - } + } } } } @@ -423,7 +423,7 @@ bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) { if (c->position < getFront().position) { msg = getFront(); return true; - } else { + } else { //TODO: can improve performance of this search, for now just searching linearly from end Messages::reverse_iterator pos; for (Messages::reverse_iterator i = messages.rbegin(); i != messages.rend() && i->position > c->position; i++) { @@ -524,7 +524,7 @@ void Queue::purgeExpired() */ uint32_t Queue::purge(const uint32_t purge_request){ Mutex::ScopedLock locker(messageLock); - uint32_t purge_count = purge_request; // only comes into play if >0 + uint32_t purge_count = purge_request; // only comes into play if >0 uint32_t count = 0; // Either purge them all or just the some (purge_count) while the queue isn't empty. @@ -537,7 +537,7 @@ uint32_t Queue::purge(const uint32_t purge_request){ uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty) { Mutex::ScopedLock locker(messageLock); - uint32_t move_count = qty; // only comes into play if qty >0 + uint32_t move_count = qty; // only comes into play if qty >0 uint32_t count = 0; // count how many were moved for returning while((!qty || move_count--) && !messages.empty()) { @@ -566,16 +566,15 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ Messages dequeues; QueueListeners::NotificationSet copy; { - Mutex::ScopedLock locker(messageLock); + Mutex::ScopedLock locker(messageLock); QueuedMessage qm(this, msg, ++sequence); - msg->setStore(store); if (policy.get()) { policy->tryEnqueue(qm); //depending on policy, may have some dequeues if (!isRecovery) pendingDequeues.swap(dequeues); } if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence); - + LVQ::iterator i; const framing::FieldTable* ft = msg->getApplicationHeaders(); if (lastValueQueue && ft){ @@ -585,7 +584,7 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ if (i == lvq.end() || msg->isUpdateMessage()){ messages.push_back(qm); listeners.populate(copy); - lvq[key] = msg; + lvq[key] = msg; }else { boost::intrusive_ptr<Message> old = i->second->getReplacementMessage(this); if (!old) old = i->second; @@ -595,10 +594,10 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ //recovery is complete pendingDequeues.push_back(QueuedMessage(qm.queue, old, qm.position)); } else { - Mutex::ScopedUnlock u(messageLock); + Mutex::ScopedUnlock u(messageLock); dequeue(0, QueuedMessage(qm.queue, old, qm.position)); } - } + } }else { messages.push_back(qm); listeners.populate(copy); @@ -633,8 +632,8 @@ QueuedMessage& Queue::checkLvqReplace(QueuedMessage& msg) if (ft) { string key = ft->getAsString(qpidVQMatchProperty); if (lvq.find(key) != lvq.end()){ - lvq[key] = replacement; - } + lvq[key] = replacement; + } } msg.payload = replacement; } @@ -645,7 +644,7 @@ QueuedMessage& Queue::checkLvqReplace(QueuedMessage& msg) uint32_t Queue::getMessageCount() const { Mutex::ScopedLock locker(messageLock); - + uint32_t count = 0; for ( Messages::const_iterator i = messages.begin(); i != messages.end(); ++i ) { //NOTE: don't need to use checkLvqReplace() here as it @@ -653,7 +652,7 @@ uint32_t Queue::getMessageCount() const //so the enqueueComplete check has no effect if ( i->payload->isEnqueueComplete() ) count ++; } - + return count; } @@ -697,13 +696,13 @@ void Queue::setLastNodeFailure() } } -// return true if store exists, +// return true if store exists, bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg) { if (inLastNodeFailure && persistLastNode){ msg->forcePersistent(); } - + if (traceId.size()) { msg->addTraceId(traceId); } @@ -717,13 +716,13 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg) return false; } -// return true if store exists, +// return true if store exists, bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) { { Mutex::ScopedLock locker(messageLock); if (!isEnqueued(msg)) return false; - if (!ctxt) { + if (!ctxt) { dequeued(msg); } } @@ -739,7 +738,7 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) void Queue::dequeueCommitted(const QueuedMessage& msg) { Mutex::ScopedLock locker(messageLock); - dequeued(msg); + dequeued(msg); if (mgmtObject != 0) { mgmtObject->inc_msgTxnDequeues(); mgmtObject->inc_byteTxnDequeues(msg.payload->contentSize()); @@ -795,7 +794,7 @@ void Queue::configure(const FieldTable& _settings, bool recovering) QPID_LOG(debug, "Configured queue as Last Value Queue No Browse"); lastValueQueue = lastValueQueueNoBrowse; } - + persistLastNode= _settings.get(qpidPersistLastNode); if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node"); @@ -804,7 +803,7 @@ void Queue::configure(const FieldTable& _settings, bool recovering) if (excludeList.size()) { split(traceExclude, excludeList, ", "); } - QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId + QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId << "' and qpid.trace.exclude='"<< excludeList << "' i.e. " << traceExclude.size() << " elements"); eventMode = _settings.getAsInt(qpidQueueEventGeneration); @@ -860,9 +859,9 @@ const QueuePolicy* Queue::getPolicy() return policy.get(); } -uint64_t Queue::getPersistenceId() const -{ - return persistenceId; +uint64_t Queue::getPersistenceId() const +{ + return persistenceId; } void Queue::setPersistenceId(uint64_t _persistenceId) const @@ -881,18 +880,18 @@ void Queue::setPersistenceId(uint64_t _persistenceId) const persistenceId = _persistenceId; } -void Queue::encode(Buffer& buffer) const +void Queue::encode(Buffer& buffer) const { buffer.putShortString(name); buffer.put(settings); - if (policy.get()) { + if (policy.get()) { buffer.put(*policy); } } uint32_t Queue::encodedSize() const { - return name.size() + 1/*short string size octet*/ + settings.encodedSize() + return name.size() + 1/*short string size octet*/ + settings.encodedSize() + (policy.get() ? (*policy).encodedSize() : 0); } @@ -923,50 +922,50 @@ boost::shared_ptr<Exchange> Queue::getAlternateExchange() void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue) { - if (broker.getQueues().destroyIf(queue->getName(), + if (broker.getQueues().destroyIf(queue->getName(), boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) { queue->unbind(broker.getExchanges(), queue); queue->destroy(); } } -bool Queue::isExclusiveOwner(const OwnershipToken* const o) const -{ +bool Queue::isExclusiveOwner(const OwnershipToken* const o) const +{ Mutex::ScopedLock locker(ownershipLock); - return o == owner; + return o == owner; } -void Queue::releaseExclusiveOwnership() -{ +void Queue::releaseExclusiveOwnership() +{ Mutex::ScopedLock locker(ownershipLock); - owner = 0; + owner = 0; } -bool Queue::setExclusiveOwner(const OwnershipToken* const o) -{ +bool Queue::setExclusiveOwner(const OwnershipToken* const o) +{ Mutex::ScopedLock locker(ownershipLock); if (owner) { return false; } else { - owner = o; + owner = o; return true; } } -bool Queue::hasExclusiveOwner() const -{ +bool Queue::hasExclusiveOwner() const +{ Mutex::ScopedLock locker(ownershipLock); - return owner != 0; + return owner != 0; } -bool Queue::hasExclusiveConsumer() const -{ - return exclusive; +bool Queue::hasExclusiveConsumer() const +{ + return exclusive; } void Queue::setExternalQueueStore(ExternalQueueStore* inst) { - if (externalQueueStore!=inst && externalQueueStore) - delete externalQueueStore; + if (externalQueueStore!=inst && externalQueueStore) + delete externalQueueStore; externalQueueStore = inst; if (inst) { @@ -976,6 +975,19 @@ void Queue::setExternalQueueStore(ExternalQueueStore* inst) { } } +bool Queue::releaseMessageContent(const QueuedMessage& m) +{ + if (store && !NullMessageStore::isNullStore(store)) { + QPID_LOG(debug, "Message " << m.position << " on " << name << " released from memory"); + m.payload->releaseContent(store); + return true; + } else { + QPID_LOG(warning, "Message " << m.position << " on " << name + << " cannot be released from memory as the queue is not durable"); + return false; + } +} + ManagementObject* Queue::GetManagementObject (void) const { return (ManagementObject*) mgmtObject; @@ -1050,7 +1062,7 @@ bool Queue::isEnqueued(const QueuedMessage& msg) void Queue::addPendingDequeue(const QueuedMessage& msg) { //assumes lock is held - true at present but rather nasty as this is a public method - pendingDequeues.push_back(msg); + pendingDequeues.push_back(msg); } QueueListeners& Queue::getListeners() { return listeners; } diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 286ac67124..77799fd967 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -156,8 +156,8 @@ namespace qpid { typedef std::vector<shared_ptr> vector; QPID_BROKER_EXTERN Queue(const string& name, - bool autodelete = false, - MessageStore* const store = 0, + bool autodelete = false, + MessageStore* const store = 0, const OwnershipToken* const owner = 0, management::Manageable* parent = 0, Broker* broker = 0); @@ -213,11 +213,11 @@ namespace qpid { bool exclusive = false); QPID_BROKER_EXTERN void cancel(Consumer::shared_ptr c); - uint32_t purge(const uint32_t purge_request = 0); //defaults to all messages + uint32_t purge(const uint32_t purge_request = 0); //defaults to all messages QPID_BROKER_EXTERN void purgeExpired(); //move qty # of messages to destination Queue destq - uint32_t move(const Queue::shared_ptr destq, uint32_t qty); + uint32_t move(const Queue::shared_ptr destq, uint32_t qty); QPID_BROKER_EXTERN uint32_t getMessageCount() const; QPID_BROKER_EXTERN uint32_t getConsumerCount() const; @@ -254,8 +254,8 @@ namespace qpid { * Inform queue of messages that were enqueued, have since * been acquired but not yet accepted or released (and * thus are still logically on the queue) - used in - * clustered broker. - */ + * clustered broker. + */ void enqueued(const QueuedMessage& msg); /** @@ -266,9 +266,9 @@ namespace qpid { * accepted it). */ bool isEnqueued(const QueuedMessage& msg); - + /** - * Gets the next available message + * Gets the next available message */ QPID_BROKER_EXTERN QueuedMessage get(); @@ -315,6 +315,8 @@ namespace qpid { bindings.eachBinding(f); } + bool releaseMessageContent(const QueuedMessage&); + void popMsg(QueuedMessage& qmsg); /** Set the position sequence number for the next message on the queue. @@ -338,7 +340,7 @@ namespace qpid { * queues. It is used for dequeueing messages in response * to an enqueue while avoid holding lock over call to * store. - * + * * Assumes messageLock is held - true for curent use case * (QueuePolicy::tryEnqueue()) but rather nasty as this is a public * method diff --git a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp index 03a7951237..39afe90134 100644 --- a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp +++ b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -28,7 +28,7 @@ using namespace qpid::broker; using namespace qpid::framing; -QueuePolicy::QueuePolicy(uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) : +QueuePolicy::QueuePolicy(uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) : maxCount(_maxCount), maxSize(_maxSize), type(_type), count(0), size(0), policyExceeded(false) {} void QueuePolicy::enqueued(uint64_t _size) @@ -89,7 +89,7 @@ void QueuePolicy::tryEnqueue(const QueuedMessage& m) } else { std::string queue = m.queue ? m.queue->getName() : std::string("unknown queue"); throw ResourceLimitExceededException( - QPID_MSG("Policy exceeded on " << queue << " by message " << m.position + QPID_MSG("Policy exceeded on " << queue << " by message " << m.position << " of size " << m.payload->contentSize() << " , policy: " << *this)); } } @@ -129,7 +129,7 @@ std::string QueuePolicy::getType(const FieldTable& settings) FieldTable::ValuePtr v = settings.get(typeKey); if (v && v->convertsTo<std::string>()) { std::string t = v->get<std::string>(); - std::transform(t.begin(), t.end(), t.begin(), tolower); + std::transform(t.begin(), t.end(), t.begin(), tolower); if (t == REJECT || t == FLOW_TO_DISK || t == RING || t == RING_STRICT) return t; } return FLOW_TO_DISK; @@ -152,7 +152,7 @@ void QueuePolicy::encode(Buffer& buffer) const buffer.putLongLong(size.get()); } -void QueuePolicy::decode ( Buffer& buffer ) +void QueuePolicy::decode ( Buffer& buffer ) { maxCount = buffer.getLong(); maxSize = buffer.getLongLong(); @@ -179,15 +179,15 @@ const std::string QueuePolicy::RING("ring"); const std::string QueuePolicy::RING_STRICT("ring_strict"); uint64_t QueuePolicy::defaultMaxSize(0); -FlowToDiskPolicy::FlowToDiskPolicy(uint32_t _maxCount, uint64_t _maxSize) : +FlowToDiskPolicy::FlowToDiskPolicy(uint32_t _maxCount, uint64_t _maxSize) : QueuePolicy(_maxCount, _maxSize, FLOW_TO_DISK) {} bool FlowToDiskPolicy::checkLimit(const QueuedMessage& m) { - return QueuePolicy::checkLimit(m) || (m.queue->getPersistenceId() && m.payload->requestContentRelease()); + return QueuePolicy::checkLimit(m) || m.queue->releaseMessageContent(m); } -RingQueuePolicy::RingQueuePolicy(uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) : +RingQueuePolicy::RingQueuePolicy(uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) : QueuePolicy(_maxCount, _maxSize, _type), strict(_type == RING_STRICT) {} bool before(const QueuedMessage& a, const QueuedMessage& b) @@ -219,19 +219,19 @@ bool RingQueuePolicy::isEnqueued(const QueuedMessage& m) //for non-strict ring policy, a message can be replaced (and //therefore dequeued) before it is accepted or released by //subscriber; need to detect this - return find(m, pendingDequeues, false) || find(m, queue, false); + return find(m, pendingDequeues, false) || find(m, queue, false); } bool RingQueuePolicy::checkLimit(const QueuedMessage& m) { if (QueuePolicy::checkLimit(m)) return true;//if haven't hit limit, ok to accept - + QueuedMessage oldest; { qpid::sys::Mutex::ScopedLock l(lock); if (queue.empty()) { - QPID_LOG(debug, "Message too large for ring queue " - << (m.queue ? m.queue->getName() : std::string("unknown queue")) + QPID_LOG(debug, "Message too large for ring queue " + << (m.queue ? m.queue->getName() : std::string("unknown queue")) << " [" << *this << "] " << ": message size = " << m.payload->contentSize() << " bytes"); return false; @@ -251,13 +251,13 @@ bool RingQueuePolicy::checkLimit(const QueuedMessage& m) pendingDequeues.push_back(oldest); } oldest.queue->addPendingDequeue(oldest); - QPID_LOG(debug, "Ring policy triggered in queue " + QPID_LOG(debug, "Ring policy triggered in queue " << (m.queue ? m.queue->getName() : std::string("unknown queue")) << ": removed message " << oldest.position << " to make way for " << m.position); return true; } else { - QPID_LOG(debug, "Ring policy could not be triggered in queue " - << (m.queue ? m.queue->getName() : std::string("unknown queue")) + QPID_LOG(debug, "Ring policy could not be triggered in queue " + << (m.queue ? m.queue->getName() : std::string("unknown queue")) << ": oldest message (seq-no=" << oldest.position << ") has been delivered but not yet acknowledged or requeued"); //in strict mode, if oldest message has been delivered (hence //cannot be acquired) but not yet acked, it should not be @@ -299,7 +299,7 @@ std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(uint32_t maxCount, uin } } - + namespace qpid { namespace broker { @@ -309,7 +309,7 @@ std::ostream& operator<<(std::ostream& out, const QueuePolicy& p) else out << "size: unlimited"; out << "; "; if (p.maxCount) out << "count: max=" << p.maxCount << ", current=" << p.count.get(); - else out << "count: unlimited"; + else out << "count: unlimited"; out << "; type=" << p.type; return out; } diff --git a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp index e340209a44..5bc4cdf960 100644 --- a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp +++ b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -35,7 +35,7 @@ namespace qpid { namespace broker { RecoveryManagerImpl::RecoveryManagerImpl(QueueRegistry& _queues, ExchangeRegistry& _exchanges, LinkRegistry& _links, - DtxManager& _dtxMgr, uint64_t _stagingThreshold) + DtxManager& _dtxMgr, uint64_t _stagingThreshold) : queues(_queues), exchanges(_exchanges), links(_links), dtxMgr(_dtxMgr), stagingThreshold(_stagingThreshold) {} RecoveryManagerImpl::~RecoveryManagerImpl() {} @@ -45,7 +45,7 @@ class RecoverableMessageImpl : public RecoverableMessage intrusive_ptr<Message> msg; const uint64_t stagingThreshold; public: - RecoverableMessageImpl(const intrusive_ptr<Message>& _msg, uint64_t _stagingThreshold); + RecoverableMessageImpl(const intrusive_ptr<Message>& _msg, uint64_t _stagingThreshold); ~RecoverableMessageImpl() {}; void setPersistenceId(uint64_t id); bool loadContent(uint64_t available); @@ -61,7 +61,7 @@ class RecoverableQueueImpl : public RecoverableQueue public: RecoverableQueueImpl(const boost::shared_ptr<Queue>& _queue) : queue(_queue) {} ~RecoverableQueueImpl() {}; - void setPersistenceId(uint64_t id); + void setPersistenceId(uint64_t id); uint64_t getPersistenceId() const; const std::string& getName() const; void setExternalQueueStore(ExternalQueueStore* inst); @@ -129,10 +129,10 @@ RecoverableMessage::shared_ptr RecoveryManagerImpl::recoverMessage(framing::Buff { boost::intrusive_ptr<Message> message(new Message()); message->decodeHeader(buffer); - return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(message, stagingThreshold)); + return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(message, stagingThreshold)); } -RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const std::string& xid, +RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const std::string& xid, std::auto_ptr<TPCTransactionContext> txn) { DtxBuffer::shared_ptr buffer(new DtxBuffer()); @@ -159,7 +159,7 @@ void RecoveryManagerImpl::recoveryComplete() queues.eachQueue(boost::bind(&Queue::recoveryComplete, _1)); } -RecoverableMessageImpl:: RecoverableMessageImpl(const intrusive_ptr<Message>& _msg, uint64_t _stagingThreshold) : msg(_msg), stagingThreshold(_stagingThreshold) +RecoverableMessageImpl:: RecoverableMessageImpl(const intrusive_ptr<Message>& _msg, uint64_t _stagingThreshold) : msg(_msg), stagingThreshold(_stagingThreshold) { if (!msg->isPersistent()) { msg->forcePersistent(); // set so that message will get dequeued from store. @@ -195,7 +195,7 @@ void RecoverableQueueImpl::setPersistenceId(uint64_t id) { queue->setPersistenceId(id); } - + uint64_t RecoverableQueueImpl::getPersistenceId() const { return queue->getPersistenceId(); @@ -205,7 +205,7 @@ const std::string& RecoverableQueueImpl::getName() const { return queue->getName(); } - + void RecoverableQueueImpl::setExternalQueueStore(ExternalQueueStore* inst) { queue->setExternalQueueStore(inst); diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index 87d7f6f97b..bdd5f33601 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -86,7 +86,7 @@ bool SemanticState::exists(const string& consumerTag){ return consumers.find(consumerTag) != consumers.end(); } -void SemanticState::consume(const string& tag, +void SemanticState::consume(const string& tag, Queue::shared_ptr queue, bool ackRequired, bool acquire, bool exclusive, const string& resumeId, uint64_t resumeTtl, const FieldTable& arguments) { @@ -103,7 +103,7 @@ void SemanticState::cancel(const string& tag){ //should cancel all unacked messages for this consumer so that //they are not redelivered on recovery for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::cancel, _1, tag)); - + } } @@ -173,7 +173,7 @@ void SemanticState::endDtx(const std::string& xid, bool fail) dtxBuffer->fail(); } else { dtxBuffer->markEnded(); - } + } dtxBuffer.reset(); } @@ -233,9 +233,9 @@ void SemanticState::record(const DeliveryRecord& delivery) const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency"); -SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, - const string& _name, - Queue::shared_ptr _queue, +SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, + const string& _name, + Queue::shared_ptr _queue, bool ack, bool _acquire, bool _exclusive, @@ -244,20 +244,20 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, const framing::FieldTable& _arguments -) : +) : Consumer(_acquire), - parent(_parent), - name(_name), - queue(_queue), - ackExpected(ack), + parent(_parent), + name(_name), + queue(_queue), + ackExpected(ack), acquire(_acquire), - blocked(true), + blocked(true), windowing(true), exclusive(_exclusive), resumeId(_resumeId), resumeTtl(_resumeTtl), arguments(_arguments), - msgCredit(0), + msgCredit(0), byteCredit(0), notifyEnabled(true), syncFrequency(_arguments.getAsInt(QPID_SYNC_FREQUENCY)), @@ -279,7 +279,7 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg) if (!ackExpected) record.setEnded();//allows message to be released now its been delivered if (windowing || ackExpected || !acquire) { parent->record(record); - } + } if (acquire && !ackExpected) { queue->dequeue(0, msg); } @@ -297,7 +297,7 @@ bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg) // checkCredit fails because the message is to big, we should // remain on queue's listener list for possible smaller messages // in future. - // + // blocked = !(filter(msg) && checkCredit(msg)); return !blocked; } @@ -305,7 +305,7 @@ bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg) void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg) { uint32_t originalMsgCredit = msgCredit; - uint32_t originalByteCredit = byteCredit; + uint32_t originalByteCredit = byteCredit; if (msgCredit != 0xFFFFFFFF) { msgCredit--; } @@ -315,13 +315,13 @@ void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg) QPID_LOG(debug, "Credit allocated for '" << name << "' on " << parent << ", was " << " bytes: " << originalByteCredit << " msgs: " << originalMsgCredit << " now bytes: " << byteCredit << " msgs: " << msgCredit); - + } bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg) { if (msgCredit == 0 || (byteCredit != 0xFFFFFFFF && byteCredit < msg->getRequiredCredit())) { - QPID_LOG(debug, "Not enough credit for '" << name << "' on " << parent + QPID_LOG(debug, "Not enough credit for '" << name << "' on " << parent << ", bytes: " << byteCredit << " msgs: " << msgCredit); return false; } else { @@ -341,7 +341,7 @@ void SemanticState::cancel(ConsumerImpl::shared_ptr c) Queue::shared_ptr queue = c->getQueue(); if(queue) { queue->cancel(c); - if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) { + if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) { Queue::tryAutoDelete(session.getBroker(), queue); } } @@ -366,7 +366,7 @@ const std::string nullstring; void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { msg->setTimestamp(getSession().getBroker().getExpiryPolicy()); - + std::string exchangeName = msg->getExchangeName(); if (!cacheExchange || cacheExchange->getName() != exchangeName) cacheExchange = session.getBroker().getExchanges().get(exchangeName); @@ -393,7 +393,7 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { if (!strategy.delivered) { //TODO:if discard-unroutable, just drop it - //TODO:else if accept-mode is explicit, reject it + //TODO:else if accept-mode is explicit, reject it //else route it to alternate exchange if (cacheExchange->getAlternate()) { cacheExchange->getAlternate()->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders()); @@ -402,7 +402,7 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { msg->destroy(); } } - msg->releaseContent(); // release frames if release has been requested + } void SemanticState::requestDispatch() @@ -421,7 +421,7 @@ void SemanticState::ConsumerImpl::requestDispatch() } bool SemanticState::complete(DeliveryRecord& delivery) -{ +{ ConsumerImplMap::iterator i = consumers.find(delivery.getTag()); if (i != consumers.end()) { i->second->complete(delivery); @@ -449,7 +449,7 @@ void SemanticState::recover(bool requeue) unacked.clear(); for_each(copy.rbegin(), copy.rend(), mem_fun_ref(&DeliveryRecord::requeue)); }else{ - for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::redeliver, _1, this)); + for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::redeliver, _1, this)); //unconfirmed messages re redelivered and therefore have their //id adjusted, confirmed messages are not and so the ordering //w.r.t id is lost @@ -570,7 +570,7 @@ Queue::shared_ptr SemanticState::getQueue(const string& name) const { } AckRange SemanticState::findRange(DeliveryId first, DeliveryId last) -{ +{ return DeliveryRecord::findRange(unacked, first, last); } @@ -655,13 +655,13 @@ void SemanticState::accepted(const SequenceSet& commands) { //in transactional mode, don't dequeue or remove, just //maintain set of acknowledged messages: accumulatedAck.add(commands); - + if (dtxBuffer.get()) { //if enlisted in a dtx, copy the relevant slice from //unacked and record it against that transaction TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked)); accumulatedAck.clear(); - dtxBuffer->enlist(txAck); + dtxBuffer->enlist(txAck); //mark the relevant messages as 'ended' in unacked //if the messages are already completed, they can be diff --git a/qpid/cpp/src/qpid/broker/TopicExchange.cpp b/qpid/cpp/src/qpid/broker/TopicExchange.cpp index 99d6c1cb8d..6bf0b104ea 100644 --- a/qpid/cpp/src/qpid/broker/TopicExchange.cpp +++ b/qpid/cpp/src/qpid/broker/TopicExchange.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -36,7 +36,7 @@ namespace _qmf = qmf::org::apache::qpid::broker; // - excessive string copying: should be 0 copy, match from original buffer. // - match/lookup: use descision tree or other more efficient structure. -namespace +namespace { const std::string qpidFedOp("qpid.fed.op"); const std::string qpidFedTags("qpid.fed.tags"); @@ -53,7 +53,7 @@ namespace { // Iterate over a string of '.'-separated tokens. struct TokenIterator { typedef pair<const char*,const char*> Token; - + TokenIterator(const char* b, const char* e) : token(make_pair(b, find(b,e,'.'))), end(e) {} bool finished() const { return !token.first; } @@ -122,7 +122,7 @@ class Matcher { Matcher(const string& p, const string& k) : matched(), pattern(&p[0], &p[0]+p.size()), key(&k[0], &k[0]+k.size()) { matched = match(); } - + operator bool() const { return matched; } private: @@ -158,7 +158,7 @@ class Matcher { } if (!pattern.finished() && pattern.match1('#')) pattern.next(); // Trailing # matches empty. - return pattern.finished() && key.finished(); + return pattern.finished() && key.finished(); } bool matched; @@ -173,7 +173,7 @@ string TopicExchange::normalize(const string& pattern) { return normal; } -bool TopicExchange::match(const string& pattern, const string& key) +bool TopicExchange::match(const string& pattern, const string& key) { return Matcher(pattern, key); } @@ -231,11 +231,11 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons */ std::vector<std::string> keys2prop; { - RWlock::ScopedRlock l(lock); + RWlock::ScopedRlock l(lock); for (BindingMap::iterator iter = bindings.begin(); iter != bindings.end(); iter++) { const BoundKey& bk = iter->second; - + if (bk.fedBinding.hasLocal()) { keys2prop.push_back(iter->first); } @@ -293,24 +293,44 @@ bool TopicExchange::isBound(Queue::shared_ptr queue, const string& pattern) return q != qv.end(); } -void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/) -{ - qpid::sys::CopyOnWriteArray<Binding::shared_ptr>::Ptr b(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >); +void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){ + Binding::vector mb; PreRoute pr(msg, this); + uint32_t count(0); { - RWlock::ScopedRlock l(lock); - for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) { - if (match(i->first, routingKey)) { - Binding::vector& qv(i->second.bindingVector); - for(Binding::vector::iterator j = qv.begin(); j != qv.end(); j++){ - b->push_back(*j); - } + RWlock::ScopedRlock l(lock); + for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) { + if (match(i->first, routingKey)) { + Binding::vector& qv(i->second.bindingVector); + for(Binding::vector::iterator j = qv.begin(); j != qv.end(); j++, count++){ + mb.push_back(*j); } } } + } + + for (Binding::vector::iterator j = mb.begin(); j != mb.end(); ++j) { + msg.deliverTo((*j)->queue); + if ((*j)->mgmtBinding != 0) + (*j)->mgmtBinding->inc_msgMatched (); + } - doRoute(msg, b); + if (mgmtExchange != 0) + { + mgmtExchange->inc_msgReceives (); + mgmtExchange->inc_byteReceives (msg.contentSize ()); + if (count == 0) + { + mgmtExchange->inc_msgDrops (); + mgmtExchange->inc_byteDrops (msg.contentSize ()); + } + else + { + mgmtExchange->inc_msgRoutes (count); + mgmtExchange->inc_byteRoutes (count * msg.contentSize ()); + } + } } bool TopicExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const) @@ -323,7 +343,7 @@ bool TopicExchange::isBound(Queue::shared_ptr queue, const string* const routing return bindings.size() > 0; } else if (routingKey) { for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) { - if (match(i->first, *routingKey)) + if (match(i->first, *routingKey)) return true; } } else { diff --git a/qpid/cpp/src/qpid/sys/CopyOnWriteArray.h b/qpid/cpp/src/qpid/sys/CopyOnWriteArray.h index b7111f2b17..e4ae3a6094 100644 --- a/qpid/cpp/src/qpid/sys/CopyOnWriteArray.h +++ b/qpid/cpp/src/qpid/sys/CopyOnWriteArray.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -39,7 +39,6 @@ class CopyOnWriteArray { public: typedef boost::shared_ptr<const std::vector<T> > ConstPtr; - typedef boost::shared_ptr<std::vector<T> > Ptr; CopyOnWriteArray() {} CopyOnWriteArray(const CopyOnWriteArray& c) : array(c.array) {} diff --git a/qpid/cpp/src/qpid/xml/XmlExchange.cpp b/qpid/cpp/src/qpid/xml/XmlExchange.cpp index 253b9ff8d0..8a1ef6149e 100644 --- a/qpid/cpp/src/qpid/xml/XmlExchange.cpp +++ b/qpid/cpp/src/qpid/xml/XmlExchange.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -69,7 +69,7 @@ XmlExchange::XmlExchange(const std::string& _name, bool _durable, // #### TODO: The Binding should take the query text // #### only. Consider encapsulating the entire block, including // #### the if condition. - + bool XmlExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* bindingArguments) { @@ -97,7 +97,7 @@ bool XmlExchange::bind(Queue::shared_ptr queue, const string& routingKey, const if ((*it)->getStaticAnalysis().areContextFlagsUsed()) { binding->parse_message_content = true; break; - } + } } } @@ -129,11 +129,11 @@ bool XmlExchange::unbind(Queue::shared_ptr queue, const string& routingKey, cons } return true; } else { - return false; + return false; } } -bool XmlExchange::matches(Query& query, Deliverable& msg, const qpid::framing::FieldTable* args, bool parse_message_content) +bool XmlExchange::matches(Query& query, Deliverable& msg, const qpid::framing::FieldTable* args, bool parse_message_content) { string msgContent; @@ -151,7 +151,7 @@ bool XmlExchange::matches(Query& query, Deliverable& msg, const qpid::framing::F QPID_LOG(trace, "matches: message content is [" << msgContent << "]"); - XERCES_CPP_NAMESPACE::MemBufInputSource xml((const XMLByte*) msgContent.c_str(), + XERCES_CPP_NAMESPACE::MemBufInputSource xml((const XMLByte*) msgContent.c_str(), msgContent.length(), "input" ); // This will parse the document using either Xerces or FastXDM, depending @@ -206,26 +206,49 @@ void XmlExchange::route(Deliverable& msg, const string& routingKey, const FieldT PreRoute pr(msg, this); try { XmlBinding::vector::ConstPtr p; - qpid::sys::CopyOnWriteArray<Binding::shared_ptr>::Ptr b(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >); - { + { RWlock::ScopedRlock l(lock); - p = bindingsMap[routingKey].snapshot(); - if (!p.get()) return; - } + p = bindingsMap[routingKey].snapshot(); + if (!p) return; + } + int count(0); for (std::vector<XmlBinding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); i++) { - if (matches((*i)->xquery, msg, args, (*i)->parse_message_content)) { - b->push_back(*i); + if (matches((*i)->xquery, msg, args, (*i)->parse_message_content)) { + msg.deliverTo((*i)->queue); + count++; + QPID_LOG(trace, "Delivered to queue" ); + + if ((*i)->mgmtBinding != 0) + (*i)->mgmtBinding->inc_msgMatched (); } - } - doRoute(msg, b); + } + if (!count) { + QPID_LOG(warning, "XMLExchange " << getName() << ": could not route message with query " << routingKey); + if (mgmtExchange != 0) { + mgmtExchange->inc_msgDrops (); + mgmtExchange->inc_byteDrops (msg.contentSize ()); + } + } else { + if (mgmtExchange != 0) { + mgmtExchange->inc_msgRoutes (count); + mgmtExchange->inc_byteRoutes (count * msg.contentSize ()); + } + } + + if (mgmtExchange != 0) { + mgmtExchange->inc_msgReceives (); + mgmtExchange->inc_byteReceives (msg.contentSize ()); + } } catch (...) { QPID_LOG(warning, "XMLExchange " << getName() << ": exception routing message with query " << routingKey); } + + } -bool XmlExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const) +bool XmlExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const) { RWlock::ScopedRlock l(lock); if (routingKey) { @@ -251,12 +274,12 @@ bool XmlExchange::isBound(Queue::shared_ptr queue, const string* const routingKe } -XmlExchange::~XmlExchange() +XmlExchange::~XmlExchange() { bindingsMap.clear(); } const std::string XmlExchange::typeName("xml"); - + } } diff --git a/qpid/cpp/src/qpid/xml/XmlExchange.h b/qpid/cpp/src/qpid/xml/XmlExchange.h index 389bfebfd0..38cb7699b6 100644 --- a/qpid/cpp/src/qpid/xml/XmlExchange.h +++ b/qpid/cpp/src/qpid/xml/XmlExchange.h @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -53,7 +53,7 @@ class XmlExchange : public virtual Exchange { Binding(key, queue, parent), xquery(query), parse_message_content(true) {} }; - + typedef std::map<string, XmlBinding::vector > XmlBindingsMap; XmlBindingsMap bindingsMap; @@ -64,13 +64,13 @@ class XmlExchange : public virtual Exchange { public: static const std::string typeName; - + XmlExchange(const std::string& name, management::Manageable* parent = 0, Broker* broker = 0); XmlExchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args, management::Manageable* parent = 0, Broker* broker = 0); virtual std::string getType() const { return typeName; } - + virtual bool bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp index 378c0471da..841a19f7c1 100644 --- a/qpid/cpp/src/tests/QueueTest.cpp +++ b/qpid/cpp/src/tests/QueueTest.cpp @@ -22,8 +22,6 @@ #include "test_tools.h" #include "qpid/Exception.h" #include "qpid/broker/Broker.h" -#include "qpid/broker/DeliverableMessage.h" -#include "qpid/broker/FanOutExchange.h" #include "qpid/broker/Queue.h" #include "qpid/broker/Deliverable.h" #include "qpid/broker/ExchangeRegistry.h" @@ -32,14 +30,12 @@ #include "qpid/broker/ExpiryPolicy.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/client/QueueOptions.h" -#include "qpid/framing/reply_exceptions.h" #include <iostream> #include "boost/format.hpp" using boost::intrusive_ptr; using namespace qpid; using namespace qpid::broker; -using namespace qpid::client; using namespace qpid::framing; using namespace qpid::sys; @@ -277,35 +273,14 @@ QPID_AUTO_TEST_CASE(testPersistLastNodeStanding){ } -const std::string nullxid = ""; - -class SimpleDummyCtxt : public TransactionContext {}; - -class DummyCtxt : public TPCTransactionContext -{ - const std::string xid; -public: - DummyCtxt(const std::string& _xid) : xid(_xid) {} - static std::string getXid(TransactionContext& ctxt) - { - DummyCtxt* c(dynamic_cast<DummyCtxt*>(&ctxt)); - return c ? c->xid : nullxid; - } -}; - -class TestMessageStoreOC : public MessageStore +class TestMessageStoreOC : public NullMessageStore { - std::set<std::string> prepared; - uint64_t nextPersistenceId; public: uint enqCnt; uint deqCnt; bool error; - TestMessageStoreOC() : MessageStore(),nextPersistenceId(1),enqCnt(0),deqCnt(0),error(false) {} - ~TestMessageStoreOC(){} - virtual void dequeue(TransactionContext*, const boost::intrusive_ptr<PersistableMessage>& /*msg*/, const PersistableQueue& /*queue*/) @@ -327,32 +302,8 @@ class TestMessageStoreOC : public MessageStore error=true; } - bool init(const Options*) { return true; } - void truncateInit(const bool) {} - void create(PersistableQueue& queue, const framing::FieldTable&) { queue.setPersistenceId(nextPersistenceId++); } - void destroy(PersistableQueue&) {} - void create(const PersistableExchange& exchange, const framing::FieldTable&) { exchange.setPersistenceId(nextPersistenceId++); } - void destroy(const PersistableExchange&) {} - void bind(const PersistableExchange&, const PersistableQueue&, const std::string&, const framing::FieldTable&) {} - void unbind(const PersistableExchange&, const PersistableQueue&, const std::string&, const framing::FieldTable&) {} - void create(const PersistableConfig& config) { config.setPersistenceId(nextPersistenceId++); } - void destroy(const PersistableConfig&) {} - void stage(const boost::intrusive_ptr<PersistableMessage>&) {} - void destroy(PersistableMessage&) {} - void appendContent(const boost::intrusive_ptr<const PersistableMessage>&, const std::string&) {} - void loadContent(const qpid::broker::PersistableQueue&, const boost::intrusive_ptr<const PersistableMessage>&, - std::string&, uint64_t, uint32_t) { throw qpid::framing::InternalErrorException("Can't load content; persistence not enabled"); } - void flush(const qpid::broker::PersistableQueue&) {} - uint32_t outstandingQueueAIO(const PersistableQueue&) { return 0; } - - std::auto_ptr<TransactionContext> begin() { return std::auto_ptr<TransactionContext>(new SimpleDummyCtxt()); } - std::auto_ptr<TPCTransactionContext> begin(const std::string& xid) { return std::auto_ptr<TPCTransactionContext>(new DummyCtxt(xid)); } - void prepare(TPCTransactionContext& ctxt) { prepared.insert(DummyCtxt::getXid(ctxt)); } - void commit(TransactionContext& ctxt) { prepared.erase(DummyCtxt::getXid(ctxt)); } - void abort(TransactionContext& ctxt) { prepared.erase(DummyCtxt::getXid(ctxt)); } - void collectPreparedXids(std::set<std::string>& out) { out.insert(prepared.begin(), prepared.end()); } - - void recover(RecoveryManager&) {} + TestMessageStoreOC() : NullMessageStore(),enqCnt(0),deqCnt(0),error(false) {} + ~TestMessageStoreOC(){} }; @@ -752,7 +703,7 @@ not requeued to the store. QPID_AUTO_TEST_CASE(testLastNodeJournalError){ /* -simulate store exception going into last node standing +simulate store excption going into last node standing */ TestMessageStoreOC testStore; @@ -786,73 +737,6 @@ intrusive_ptr<Message> mkMsg(std::string exchange, std::string routingKey) { return msg; } -QPID_AUTO_TEST_CASE(testFlowToDiskMsgProperties){ - - TestMessageStoreOC testStore; - client::QueueOptions args; - args.setSizePolicy(FLOW_TO_DISK, 0, 1); - - intrusive_ptr<Message> msg1 = mkMsg("e", "A"); - intrusive_ptr<Message> msg2 = mkMsg("e", "B"); - intrusive_ptr<Message> msg3 = mkMsg("e", "C"); - intrusive_ptr<Message> msg4 = mkMsg("e", "D"); - intrusive_ptr<Message> msg5 = mkMsg("e", "E"); - intrusive_ptr<Message> msg6 = mkMsg("e", "F"); - intrusive_ptr<Message> msg7 = mkMsg("e", "G"); - msg4->forcePersistent(); - msg5->forcePersistent(); - msg7->forcePersistent(); - - DeliverableMessage dmsg1(msg1); - DeliverableMessage dmsg2(msg2); - DeliverableMessage dmsg3(msg3); - DeliverableMessage dmsg4(msg4); - DeliverableMessage dmsg5(msg5); - DeliverableMessage dmsg6(msg6); - DeliverableMessage dmsg7(msg7); - - FanOutExchange fanout1("fanout1", false, args); - FanOutExchange fanout2("fanout2", false, args); - - Queue::shared_ptr queue1(new Queue("queue1", true, &testStore )); - queue1->configure(args); - Queue::shared_ptr queue2(new Queue("queue2", true, &testStore )); - queue2->configure(args); - Queue::shared_ptr queue3(new Queue("queue3", true)); - fanout1.bind(queue1, "", 0); - fanout1.bind(queue2, "", 0); - fanout1.route(dmsg1, "", 0); - msg1->releaseContent(); - fanout1.route(dmsg2, "", 0); - msg2->releaseContent(); - fanout1.route(dmsg3, "", 0); - msg3->releaseContent(); - - BOOST_CHECK_EQUAL(3u, queue1->getMessageCount()); - BOOST_CHECK_EQUAL(3u, queue2->getMessageCount()); - BOOST_CHECK_EQUAL(msg1->isContentReleased(), false); - BOOST_CHECK_EQUAL(msg2->isContentReleased(), true); - BOOST_CHECK_EQUAL(msg3->isContentReleased(), true); - - fanout1.bind(queue3, "", 0); - fanout1.route(dmsg4, "", 0); - msg4->releaseContent(); - BOOST_CHECK_EQUAL(msg4->isContentReleased(), false); - fanout1.route(dmsg5, "", 0); - msg5->releaseContent(); - BOOST_CHECK_EQUAL(msg5->isContentReleased(), false); - - fanout2.bind(queue3, "", 0); - fanout2.route(dmsg6, "", 0); - fanout2.route(dmsg7, "", 0); - msg6->releaseContent(); - BOOST_CHECK_EQUAL(msg6->isContentReleased(), false); - msg7->releaseContent(); - BOOST_CHECK_EQUAL(msg7->isContentReleased(), false); - -} - - QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests |