diff options
-rw-r--r-- | cpp/lib/broker/BrokerMessage.cpp | 6 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerMessage.h | 8 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerQueue.cpp | 51 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerQueue.h | 10 | ||||
-rw-r--r-- | cpp/lib/broker/Makefile.am | 2 | ||||
-rw-r--r-- | cpp/lib/broker/QueuePolicy.cpp | 49 | ||||
-rw-r--r-- | cpp/lib/broker/QueuePolicy.h | 46 | ||||
-rw-r--r-- | cpp/lib/broker/SessionHandlerImpl.cpp | 6 |
8 files changed, 163 insertions, 15 deletions
diff --git a/cpp/lib/broker/BrokerMessage.cpp b/cpp/lib/broker/BrokerMessage.cpp index 50bf319797..d30cd12bc3 100644 --- a/cpp/lib/broker/BrokerMessage.cpp +++ b/cpp/lib/broker/BrokerMessage.cpp @@ -32,6 +32,7 @@ using namespace boost; using namespace qpid::broker; using namespace qpid::framing; +using namespace qpid::sys; Message::Message(const ConnectionToken* const _publisher, const string& _exchange, const string& _routingKey, @@ -100,6 +101,7 @@ void Message::sendContent(OutputHandler* out, int channel, u_int32_t framesize){ AMQBody::shared_ptr headerBody = static_pointer_cast<AMQBody, AMQHeaderBody>(header); out->send(new AMQFrame(channel, headerBody)); + Mutex::ScopedLock locker(contentLock); if (content.get()) content->send(out, channel, framesize); } @@ -173,6 +175,7 @@ void Message::encodeHeader(Buffer& buffer) void Message::encodeContent(Buffer& buffer) { + Mutex::ScopedLock locker(contentLock); if (content.get()) content->encode(buffer); } @@ -183,6 +186,7 @@ u_int32_t Message::encodedSize() u_int32_t Message::encodedContentSize() { + Mutex::ScopedLock locker(contentLock); return content.get() ? content->size() : 0; } @@ -200,6 +204,7 @@ u_int64_t Message::expectedContentSize() void Message::releaseContent(MessageStore* store) { + Mutex::ScopedLock locker(contentLock); if (!content.get() || content->size() > 0) { //set content to lazy loading mode (but only if there is stored content): @@ -212,5 +217,6 @@ void Message::releaseContent(MessageStore* store) void Message::setContent(std::auto_ptr<Content>& _content) { + Mutex::ScopedLock locker(contentLock); content = _content; } diff --git a/cpp/lib/broker/BrokerMessage.h b/cpp/lib/broker/BrokerMessage.h index 59e146959d..3bf70551d3 100644 --- a/cpp/lib/broker/BrokerMessage.h +++ b/cpp/lib/broker/BrokerMessage.h @@ -23,13 +23,14 @@ #include <memory> #include <boost/shared_ptr.hpp> -#include <ConnectionToken.h> -#include <Content.h> -#include <TxBuffer.h> #include <AMQContentBody.h> #include <AMQHeaderBody.h> #include <BasicHeaderProperties.h> +#include <ConnectionToken.h> +#include <Content.h> #include <OutputHandler.h> +#include <Mutex.h> +#include <TxBuffer.h> namespace qpid { namespace broker { @@ -52,6 +53,7 @@ namespace qpid { std::auto_ptr<Content> content; u_int64_t size; u_int64_t persistenceId; + qpid::sys::Mutex contentLock; void sendContent(qpid::framing::OutputHandler* out, int channel, u_int32_t framesize); diff --git a/cpp/lib/broker/BrokerQueue.cpp b/cpp/lib/broker/BrokerQueue.cpp index 4eabfdec50..26857b6d31 100644 --- a/cpp/lib/broker/BrokerQueue.cpp +++ b/cpp/lib/broker/BrokerQueue.cpp @@ -26,6 +26,7 @@ using namespace qpid::broker; using namespace qpid::sys; +using namespace qpid::framing; Queue::Queue(const string& _name, u_int32_t _autodelete, MessageStore* const _store, @@ -62,8 +63,7 @@ void Queue::deliver(Message::shared_ptr& msg){ } void Queue::recover(Message::shared_ptr& msg){ - queueing = true; - messages.push(msg); + push(msg); if (store && msg->expectedContentSize() != msg->encodedContentSize()) { msg->releaseContent(store); } @@ -72,8 +72,7 @@ void Queue::recover(Message::shared_ptr& msg){ void Queue::process(Message::shared_ptr& msg){ Mutex::ScopedLock locker(lock); if(queueing || !dispatch(msg)){ - queueing = true; - messages.push(msg); + push(msg); } } @@ -116,7 +115,7 @@ void Queue::dispatch(){ while(proceed){ Mutex::ScopedLock locker(lock); if(!messages.empty() && dispatch(messages.front())){ - messages.pop(); + pop(); }else{ dispatching = false; proceed = false; @@ -149,7 +148,7 @@ Message::shared_ptr Queue::dequeue(){ Message::shared_ptr msg; if(!messages.empty()){ msg = messages.front(); - messages.pop(); + pop(); } return msg; } @@ -157,10 +156,19 @@ Message::shared_ptr Queue::dequeue(){ u_int32_t Queue::purge(){ Mutex::ScopedLock locker(lock); int count = messages.size(); - while(!messages.empty()) messages.pop(); + while(!messages.empty()) pop(); return count; } +void Queue::pop(){ + messages.pop(); +} + +void Queue::push(Message::shared_ptr& msg){ + queueing = true; + messages.push(msg); +} + u_int32_t Queue::getMessageCount() const{ Mutex::ScopedLock locker(lock); return messages.size(); @@ -190,8 +198,30 @@ void Queue::dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const st } } -void Queue::create() +namespace { + const std::string qpidMaxSize("qpid.max_size"); + const std::string qpidMaxCount("qpid.max_count"); +} + +void Queue::create(const FieldTable& settings) +{ + //Note: currently field table only contain signed 32 bit ints, which + // restricts the values that can be set on the queue policy. + u_int32_t maxCount(0); + try { + maxCount = settings.getInt(qpidMaxSize); + } catch (FieldNotFoundException& ignore) { + } + u_int32_t maxSize(0); + try { + maxSize = settings.getInt(qpidMaxCount); + } catch (FieldNotFoundException& ignore) { + } + if (maxCount || maxSize) { + setPolicy(std::auto_ptr<QueuePolicy>(new QueuePolicy(maxCount, maxSize))); + } + if (store) { store->create(*this); } @@ -203,3 +233,8 @@ void Queue::destroy() store->destroy(*this); } } + +void Queue::setPolicy(std::auto_ptr<QueuePolicy> _policy) +{ + policy = _policy; +} diff --git a/cpp/lib/broker/BrokerQueue.h b/cpp/lib/broker/BrokerQueue.h index 13f4bf2de0..18befbee03 100644 --- a/cpp/lib/broker/BrokerQueue.h +++ b/cpp/lib/broker/BrokerQueue.h @@ -22,6 +22,7 @@ #define _Queue_ #include <vector> +#include <memory> #include <queue> #include <boost/shared_ptr.hpp> #include <amqp_types.h> @@ -29,7 +30,9 @@ #include <ConnectionToken.h> #include <Consumer.h> #include <BrokerMessage.h> +#include <FieldTable.h> #include <sys/Monitor.h> +#include <QueuePolicy.h> namespace qpid { namespace broker { @@ -41,6 +44,7 @@ namespace qpid { struct ExclusiveAccessException{}; using std::string; + /** * The brokers representation of an amqp queue. Messages are * delivered to a queue from where they can be dispatched to @@ -62,9 +66,13 @@ namespace qpid { int64_t lastUsed; Consumer* exclusive; mutable u_int64_t persistenceId; + std::auto_ptr<QueuePolicy> policy; + void pop(); + void push(Message::shared_ptr& msg); bool startDispatching(); bool dispatch(Message::shared_ptr& msg); + void setPolicy(std::auto_ptr<QueuePolicy> policy); public: @@ -77,7 +85,7 @@ namespace qpid { const ConnectionToken* const owner = 0); ~Queue(); - void create(); + void create(const qpid::framing::FieldTable& settings); void destroy(); /** * Informs the queue of a binding that should be cancelled on diff --git a/cpp/lib/broker/Makefile.am b/cpp/lib/broker/Makefile.am index 58da2e562c..f02dc4f545 100644 --- a/cpp/lib/broker/Makefile.am +++ b/cpp/lib/broker/Makefile.am @@ -59,6 +59,8 @@ libbroker_la_SOURCES = \ NullMessageStore.cpp \ NullMessageStore.h \ Prefetch.h \ + QueuePolicy.cpp \ + QueuePolicy.h \ QueueRegistry.cpp \ QueueRegistry.h \ RecoveryManager.cpp \ diff --git a/cpp/lib/broker/QueuePolicy.cpp b/cpp/lib/broker/QueuePolicy.cpp new file mode 100644 index 0000000000..3cf0882695 --- /dev/null +++ b/cpp/lib/broker/QueuePolicy.cpp @@ -0,0 +1,49 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <QueuePolicy.h> + +using namespace qpid::broker; + +QueuePolicy::QueuePolicy(u_int32_t _maxCount, u_int64_t _maxSize) : maxCount(_maxCount), maxSize(_maxSize) {} + +void QueuePolicy::enqueued(Message::shared_ptr& msg, MessageStore* store) +{ + if (checkCount(msg) || checkSize(msg)) { + msg->releaseContent(store); + } +} + +void QueuePolicy::dequeued(Message::shared_ptr& msg, MessageStore* /*store*/) +{ + if (maxCount) count--; + if (maxSize) size -= msg->contentSize(); +} + +bool QueuePolicy::checkCount(Message::shared_ptr& /*msg*/) +{ + return maxCount && ++count > maxCount; +} + +bool QueuePolicy::checkSize(Message::shared_ptr& msg) +{ + return maxSize && (size += msg->contentSize()) > maxSize; +} + diff --git a/cpp/lib/broker/QueuePolicy.h b/cpp/lib/broker/QueuePolicy.h new file mode 100644 index 0000000000..399c67d837 --- /dev/null +++ b/cpp/lib/broker/QueuePolicy.h @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _QueuePolicy_ +#define _QueuePolicy_ + +#include <BrokerMessage.h> + +namespace qpid { + namespace broker { + class QueuePolicy + { + const u_int32_t maxCount; + const u_int64_t maxSize; + u_int32_t count; + u_int64_t size; + + bool checkCount(Message::shared_ptr& msg); + bool checkSize(Message::shared_ptr& msg); + public: + QueuePolicy(u_int32_t maxCount, u_int64_t maxSize); + void enqueued(Message::shared_ptr& msg, MessageStore* store); + void dequeued(Message::shared_ptr& msg, MessageStore* store); + }; + } +} + + +#endif diff --git a/cpp/lib/broker/SessionHandlerImpl.cpp b/cpp/lib/broker/SessionHandlerImpl.cpp index 0dddd957fd..6d7f5048ea 100644 --- a/cpp/lib/broker/SessionHandlerImpl.cpp +++ b/cpp/lib/broker/SessionHandlerImpl.cpp @@ -256,7 +256,7 @@ void SessionHandlerImpl::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16 void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& name, bool passive, bool durable, bool exclusive, - bool autoDelete, bool nowait, const qpid::framing::FieldTable& /*arguments*/){ + bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){ Queue::shared_ptr queue; if (passive && !name.empty()) { queue = parent->getQueue(name, channel); @@ -268,8 +268,8 @@ void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t if (queue_created.second) { // This is a new queue parent->getChannel(channel)->setDefaultQueue(queue); - //create persistent record if required - queue_created.first->create(); + //apply settings & create persistent record if required + queue_created.first->create(arguments); //add default binding: parent->exchanges->getDefault()->bind(queue, name, 0); |