summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cpp/lib/broker/BrokerMessage.cpp6
-rw-r--r--cpp/lib/broker/BrokerMessage.h8
-rw-r--r--cpp/lib/broker/BrokerQueue.cpp51
-rw-r--r--cpp/lib/broker/BrokerQueue.h10
-rw-r--r--cpp/lib/broker/Makefile.am2
-rw-r--r--cpp/lib/broker/QueuePolicy.cpp49
-rw-r--r--cpp/lib/broker/QueuePolicy.h46
-rw-r--r--cpp/lib/broker/SessionHandlerImpl.cpp6
8 files changed, 163 insertions, 15 deletions
diff --git a/cpp/lib/broker/BrokerMessage.cpp b/cpp/lib/broker/BrokerMessage.cpp
index 50bf319797..d30cd12bc3 100644
--- a/cpp/lib/broker/BrokerMessage.cpp
+++ b/cpp/lib/broker/BrokerMessage.cpp
@@ -32,6 +32,7 @@
using namespace boost;
using namespace qpid::broker;
using namespace qpid::framing;
+using namespace qpid::sys;
Message::Message(const ConnectionToken* const _publisher,
const string& _exchange, const string& _routingKey,
@@ -100,6 +101,7 @@ void Message::sendContent(OutputHandler* out, int channel, u_int32_t framesize){
AMQBody::shared_ptr headerBody = static_pointer_cast<AMQBody, AMQHeaderBody>(header);
out->send(new AMQFrame(channel, headerBody));
+ Mutex::ScopedLock locker(contentLock);
if (content.get()) content->send(out, channel, framesize);
}
@@ -173,6 +175,7 @@ void Message::encodeHeader(Buffer& buffer)
void Message::encodeContent(Buffer& buffer)
{
+ Mutex::ScopedLock locker(contentLock);
if (content.get()) content->encode(buffer);
}
@@ -183,6 +186,7 @@ u_int32_t Message::encodedSize()
u_int32_t Message::encodedContentSize()
{
+ Mutex::ScopedLock locker(contentLock);
return content.get() ? content->size() : 0;
}
@@ -200,6 +204,7 @@ u_int64_t Message::expectedContentSize()
void Message::releaseContent(MessageStore* store)
{
+ Mutex::ScopedLock locker(contentLock);
if (!content.get() || content->size() > 0) {
//set content to lazy loading mode (but only if there is stored content):
@@ -212,5 +217,6 @@ void Message::releaseContent(MessageStore* store)
void Message::setContent(std::auto_ptr<Content>& _content)
{
+ Mutex::ScopedLock locker(contentLock);
content = _content;
}
diff --git a/cpp/lib/broker/BrokerMessage.h b/cpp/lib/broker/BrokerMessage.h
index 59e146959d..3bf70551d3 100644
--- a/cpp/lib/broker/BrokerMessage.h
+++ b/cpp/lib/broker/BrokerMessage.h
@@ -23,13 +23,14 @@
#include <memory>
#include <boost/shared_ptr.hpp>
-#include <ConnectionToken.h>
-#include <Content.h>
-#include <TxBuffer.h>
#include <AMQContentBody.h>
#include <AMQHeaderBody.h>
#include <BasicHeaderProperties.h>
+#include <ConnectionToken.h>
+#include <Content.h>
#include <OutputHandler.h>
+#include <Mutex.h>
+#include <TxBuffer.h>
namespace qpid {
namespace broker {
@@ -52,6 +53,7 @@ namespace qpid {
std::auto_ptr<Content> content;
u_int64_t size;
u_int64_t persistenceId;
+ qpid::sys::Mutex contentLock;
void sendContent(qpid::framing::OutputHandler* out,
int channel, u_int32_t framesize);
diff --git a/cpp/lib/broker/BrokerQueue.cpp b/cpp/lib/broker/BrokerQueue.cpp
index 4eabfdec50..26857b6d31 100644
--- a/cpp/lib/broker/BrokerQueue.cpp
+++ b/cpp/lib/broker/BrokerQueue.cpp
@@ -26,6 +26,7 @@
using namespace qpid::broker;
using namespace qpid::sys;
+using namespace qpid::framing;
Queue::Queue(const string& _name, u_int32_t _autodelete,
MessageStore* const _store,
@@ -62,8 +63,7 @@ void Queue::deliver(Message::shared_ptr& msg){
}
void Queue::recover(Message::shared_ptr& msg){
- queueing = true;
- messages.push(msg);
+ push(msg);
if (store && msg->expectedContentSize() != msg->encodedContentSize()) {
msg->releaseContent(store);
}
@@ -72,8 +72,7 @@ void Queue::recover(Message::shared_ptr& msg){
void Queue::process(Message::shared_ptr& msg){
Mutex::ScopedLock locker(lock);
if(queueing || !dispatch(msg)){
- queueing = true;
- messages.push(msg);
+ push(msg);
}
}
@@ -116,7 +115,7 @@ void Queue::dispatch(){
while(proceed){
Mutex::ScopedLock locker(lock);
if(!messages.empty() && dispatch(messages.front())){
- messages.pop();
+ pop();
}else{
dispatching = false;
proceed = false;
@@ -149,7 +148,7 @@ Message::shared_ptr Queue::dequeue(){
Message::shared_ptr msg;
if(!messages.empty()){
msg = messages.front();
- messages.pop();
+ pop();
}
return msg;
}
@@ -157,10 +156,19 @@ Message::shared_ptr Queue::dequeue(){
u_int32_t Queue::purge(){
Mutex::ScopedLock locker(lock);
int count = messages.size();
- while(!messages.empty()) messages.pop();
+ while(!messages.empty()) pop();
return count;
}
+void Queue::pop(){
+ messages.pop();
+}
+
+void Queue::push(Message::shared_ptr& msg){
+ queueing = true;
+ messages.push(msg);
+}
+
u_int32_t Queue::getMessageCount() const{
Mutex::ScopedLock locker(lock);
return messages.size();
@@ -190,8 +198,30 @@ void Queue::dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const st
}
}
-void Queue::create()
+namespace
{
+ const std::string qpidMaxSize("qpid.max_size");
+ const std::string qpidMaxCount("qpid.max_count");
+}
+
+void Queue::create(const FieldTable& settings)
+{
+ //Note: currently field table only contain signed 32 bit ints, which
+ // restricts the values that can be set on the queue policy.
+ u_int32_t maxCount(0);
+ try {
+ maxCount = settings.getInt(qpidMaxSize);
+ } catch (FieldNotFoundException& ignore) {
+ }
+ u_int32_t maxSize(0);
+ try {
+ maxSize = settings.getInt(qpidMaxCount);
+ } catch (FieldNotFoundException& ignore) {
+ }
+ if (maxCount || maxSize) {
+ setPolicy(std::auto_ptr<QueuePolicy>(new QueuePolicy(maxCount, maxSize)));
+ }
+
if (store) {
store->create(*this);
}
@@ -203,3 +233,8 @@ void Queue::destroy()
store->destroy(*this);
}
}
+
+void Queue::setPolicy(std::auto_ptr<QueuePolicy> _policy)
+{
+ policy = _policy;
+}
diff --git a/cpp/lib/broker/BrokerQueue.h b/cpp/lib/broker/BrokerQueue.h
index 13f4bf2de0..18befbee03 100644
--- a/cpp/lib/broker/BrokerQueue.h
+++ b/cpp/lib/broker/BrokerQueue.h
@@ -22,6 +22,7 @@
#define _Queue_
#include <vector>
+#include <memory>
#include <queue>
#include <boost/shared_ptr.hpp>
#include <amqp_types.h>
@@ -29,7 +30,9 @@
#include <ConnectionToken.h>
#include <Consumer.h>
#include <BrokerMessage.h>
+#include <FieldTable.h>
#include <sys/Monitor.h>
+#include <QueuePolicy.h>
namespace qpid {
namespace broker {
@@ -41,6 +44,7 @@ namespace qpid {
struct ExclusiveAccessException{};
using std::string;
+
/**
* The brokers representation of an amqp queue. Messages are
* delivered to a queue from where they can be dispatched to
@@ -62,9 +66,13 @@ namespace qpid {
int64_t lastUsed;
Consumer* exclusive;
mutable u_int64_t persistenceId;
+ std::auto_ptr<QueuePolicy> policy;
+ void pop();
+ void push(Message::shared_ptr& msg);
bool startDispatching();
bool dispatch(Message::shared_ptr& msg);
+ void setPolicy(std::auto_ptr<QueuePolicy> policy);
public:
@@ -77,7 +85,7 @@ namespace qpid {
const ConnectionToken* const owner = 0);
~Queue();
- void create();
+ void create(const qpid::framing::FieldTable& settings);
void destroy();
/**
* Informs the queue of a binding that should be cancelled on
diff --git a/cpp/lib/broker/Makefile.am b/cpp/lib/broker/Makefile.am
index 58da2e562c..f02dc4f545 100644
--- a/cpp/lib/broker/Makefile.am
+++ b/cpp/lib/broker/Makefile.am
@@ -59,6 +59,8 @@ libbroker_la_SOURCES = \
NullMessageStore.cpp \
NullMessageStore.h \
Prefetch.h \
+ QueuePolicy.cpp \
+ QueuePolicy.h \
QueueRegistry.cpp \
QueueRegistry.h \
RecoveryManager.cpp \
diff --git a/cpp/lib/broker/QueuePolicy.cpp b/cpp/lib/broker/QueuePolicy.cpp
new file mode 100644
index 0000000000..3cf0882695
--- /dev/null
+++ b/cpp/lib/broker/QueuePolicy.cpp
@@ -0,0 +1,49 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <QueuePolicy.h>
+
+using namespace qpid::broker;
+
+QueuePolicy::QueuePolicy(u_int32_t _maxCount, u_int64_t _maxSize) : maxCount(_maxCount), maxSize(_maxSize) {}
+
+void QueuePolicy::enqueued(Message::shared_ptr& msg, MessageStore* store)
+{
+ if (checkCount(msg) || checkSize(msg)) {
+ msg->releaseContent(store);
+ }
+}
+
+void QueuePolicy::dequeued(Message::shared_ptr& msg, MessageStore* /*store*/)
+{
+ if (maxCount) count--;
+ if (maxSize) size -= msg->contentSize();
+}
+
+bool QueuePolicy::checkCount(Message::shared_ptr& /*msg*/)
+{
+ return maxCount && ++count > maxCount;
+}
+
+bool QueuePolicy::checkSize(Message::shared_ptr& msg)
+{
+ return maxSize && (size += msg->contentSize()) > maxSize;
+}
+
diff --git a/cpp/lib/broker/QueuePolicy.h b/cpp/lib/broker/QueuePolicy.h
new file mode 100644
index 0000000000..399c67d837
--- /dev/null
+++ b/cpp/lib/broker/QueuePolicy.h
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _QueuePolicy_
+#define _QueuePolicy_
+
+#include <BrokerMessage.h>
+
+namespace qpid {
+ namespace broker {
+ class QueuePolicy
+ {
+ const u_int32_t maxCount;
+ const u_int64_t maxSize;
+ u_int32_t count;
+ u_int64_t size;
+
+ bool checkCount(Message::shared_ptr& msg);
+ bool checkSize(Message::shared_ptr& msg);
+ public:
+ QueuePolicy(u_int32_t maxCount, u_int64_t maxSize);
+ void enqueued(Message::shared_ptr& msg, MessageStore* store);
+ void dequeued(Message::shared_ptr& msg, MessageStore* store);
+ };
+ }
+}
+
+
+#endif
diff --git a/cpp/lib/broker/SessionHandlerImpl.cpp b/cpp/lib/broker/SessionHandlerImpl.cpp
index 0dddd957fd..6d7f5048ea 100644
--- a/cpp/lib/broker/SessionHandlerImpl.cpp
+++ b/cpp/lib/broker/SessionHandlerImpl.cpp
@@ -256,7 +256,7 @@ void SessionHandlerImpl::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16
void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& name,
bool passive, bool durable, bool exclusive,
- bool autoDelete, bool nowait, const qpid::framing::FieldTable& /*arguments*/){
+ bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){
Queue::shared_ptr queue;
if (passive && !name.empty()) {
queue = parent->getQueue(name, channel);
@@ -268,8 +268,8 @@ void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t
if (queue_created.second) { // This is a new queue
parent->getChannel(channel)->setDefaultQueue(queue);
- //create persistent record if required
- queue_created.first->create();
+ //apply settings & create persistent record if required
+ queue_created.first->create(arguments);
//add default binding:
parent->exchanges->getDefault()->bind(queue, name, 0);