summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2006-12-05 17:43:00 +0000
committerGordon Sim <gsim@apache.org>2006-12-05 17:43:00 +0000
commit722b16a1a7bbde82a4cd82c99a0e29d31d0545ca (patch)
tree4866c7dc7e3e85a9f62ce6512ee1a569a4882e20
parent96aa6c6e76ffa946192778d69d36d4c372a0de7b (diff)
downloadqpid-python-722b16a1a7bbde82a4cd82c99a0e29d31d0545ca.tar.gz
Allow settings to be set and persisted for queues.
Define policy based on these settings. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@482723 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/lib/broker/BrokerQueue.cpp36
-rw-r--r--cpp/lib/broker/BrokerQueue.h5
-rw-r--r--cpp/lib/broker/MessageStore.h3
-rw-r--r--cpp/lib/broker/MessageStoreModule.cpp4
-rw-r--r--cpp/lib/broker/MessageStoreModule.h2
-rw-r--r--cpp/lib/broker/NullMessageStore.cpp2
-rw-r--r--cpp/lib/broker/NullMessageStore.h2
-rw-r--r--cpp/lib/broker/QueuePolicy.cpp28
-rw-r--r--cpp/lib/broker/QueuePolicy.h9
9 files changed, 65 insertions, 26 deletions
diff --git a/cpp/lib/broker/BrokerQueue.cpp b/cpp/lib/broker/BrokerQueue.cpp
index 26857b6d31..b0e1f20b01 100644
--- a/cpp/lib/broker/BrokerQueue.cpp
+++ b/cpp/lib/broker/BrokerQueue.cpp
@@ -161,12 +161,14 @@ u_int32_t Queue::purge(){
}
void Queue::pop(){
- messages.pop();
+ if (policy.get()) policy->dequeued(messages.front(), store);
+ messages.pop();
}
void Queue::push(Message::shared_ptr& msg){
queueing = true;
messages.push(msg);
+ if (policy.get()) policy->enqueued(messages.front(), store);
}
u_int32_t Queue::getMessageCount() const{
@@ -206,24 +208,17 @@ namespace
void Queue::create(const FieldTable& settings)
{
- //Note: currently field table only contain signed 32 bit ints, which
- // restricts the values that can be set on the queue policy.
- u_int32_t maxCount(0);
- try {
- maxCount = settings.getInt(qpidMaxSize);
- } catch (FieldNotFoundException& ignore) {
- }
- u_int32_t maxSize(0);
- try {
- maxSize = settings.getInt(qpidMaxCount);
- } catch (FieldNotFoundException& ignore) {
- }
- if (maxCount || maxSize) {
- setPolicy(std::auto_ptr<QueuePolicy>(new QueuePolicy(maxCount, maxSize)));
- }
-
if (store) {
- store->create(*this);
+ store->create(*this, settings);
+ }
+ configure(settings);
+}
+
+void Queue::configure(const FieldTable& settings)
+{
+ QueuePolicy* _policy = new QueuePolicy(settings);
+ if (_policy->getMaxCount() || _policy->getMaxSize()) {
+ setPolicy(std::auto_ptr<QueuePolicy>(_policy));
}
}
@@ -238,3 +233,8 @@ void Queue::setPolicy(std::auto_ptr<QueuePolicy> _policy)
{
policy = _policy;
}
+
+const QueuePolicy* const Queue::getPolicy()
+{
+ return policy.get();
+}
diff --git a/cpp/lib/broker/BrokerQueue.h b/cpp/lib/broker/BrokerQueue.h
index 18befbee03..41611bebe9 100644
--- a/cpp/lib/broker/BrokerQueue.h
+++ b/cpp/lib/broker/BrokerQueue.h
@@ -66,7 +66,7 @@ namespace qpid {
int64_t lastUsed;
Consumer* exclusive;
mutable u_int64_t persistenceId;
- std::auto_ptr<QueuePolicy> policy;
+ std::auto_ptr<QueuePolicy> policy;
void pop();
void push(Message::shared_ptr& msg);
@@ -86,6 +86,7 @@ namespace qpid {
~Queue();
void create(const qpid::framing::FieldTable& settings);
+ void configure(const qpid::framing::FieldTable& settings);
void destroy();
/**
* Informs the queue of a binding that should be cancelled on
@@ -135,6 +136,8 @@ namespace qpid {
* dequeues from memory only
*/
Message::shared_ptr dequeue();
+
+ const QueuePolicy* const getPolicy();
};
}
}
diff --git a/cpp/lib/broker/MessageStore.h b/cpp/lib/broker/MessageStore.h
index ac74155e64..be9172e383 100644
--- a/cpp/lib/broker/MessageStore.h
+++ b/cpp/lib/broker/MessageStore.h
@@ -22,6 +22,7 @@
#define _MessageStore_
#include <BrokerMessage.h>
+#include <FieldTable.h>
#include <RecoveryManager.h>
#include <TransactionalStore.h>
@@ -45,7 +46,7 @@ namespace qpid {
/**
* Record the existance of a durable queue
*/
- virtual void create(const Queue& queue) = 0;
+ virtual void create(const Queue& queue, const qpid::framing::FieldTable& settings) = 0;
/**
* Destroy a durable queue
*/
diff --git a/cpp/lib/broker/MessageStoreModule.cpp b/cpp/lib/broker/MessageStoreModule.cpp
index 7b0335df68..b3f5d6e63c 100644
--- a/cpp/lib/broker/MessageStoreModule.cpp
+++ b/cpp/lib/broker/MessageStoreModule.cpp
@@ -28,9 +28,9 @@ MessageStoreModule::MessageStoreModule(const std::string& name) : store(name)
{
}
-void MessageStoreModule::create(const Queue& queue)
+void MessageStoreModule::create(const Queue& queue, const qpid::framing::FieldTable& settings)
{
- store->create(queue);
+ store->create(queue, settings);
}
void MessageStoreModule::destroy(const Queue& queue)
diff --git a/cpp/lib/broker/MessageStoreModule.h b/cpp/lib/broker/MessageStoreModule.h
index 045abc3a1a..d70aab6d13 100644
--- a/cpp/lib/broker/MessageStoreModule.h
+++ b/cpp/lib/broker/MessageStoreModule.h
@@ -36,7 +36,7 @@ namespace qpid {
qpid::sys::Module<MessageStore> store;
public:
MessageStoreModule(const std::string& name);
- void create(const Queue& queue);
+ void create(const Queue& queue, const qpid::framing::FieldTable& settings);
void destroy(const Queue& queue);
void recover(RecoveryManager& queues, const MessageStoreSettings* const settings = 0);
void stage(Message::shared_ptr& msg);
diff --git a/cpp/lib/broker/NullMessageStore.cpp b/cpp/lib/broker/NullMessageStore.cpp
index 57c297c063..3c29994aac 100644
--- a/cpp/lib/broker/NullMessageStore.cpp
+++ b/cpp/lib/broker/NullMessageStore.cpp
@@ -30,7 +30,7 @@ using namespace qpid::broker;
NullMessageStore::NullMessageStore(bool _warn) : warn(_warn){}
-void NullMessageStore::create(const Queue& queue)
+void NullMessageStore::create(const Queue& queue, const qpid::framing::FieldTable&)
{
if (warn) std::cout << "WARNING: Can't create durable queue '" << queue.getName() << "'. Persistence not enabled." << std::endl;
}
diff --git a/cpp/lib/broker/NullMessageStore.h b/cpp/lib/broker/NullMessageStore.h
index e427cc723f..61afe36281 100644
--- a/cpp/lib/broker/NullMessageStore.h
+++ b/cpp/lib/broker/NullMessageStore.h
@@ -35,7 +35,7 @@ namespace qpid {
const bool warn;
public:
NullMessageStore(bool warn = true);
- virtual void create(const Queue& queue);
+ virtual void create(const Queue& queue, const qpid::framing::FieldTable& settings);
virtual void destroy(const Queue& queue);
virtual void recover(RecoveryManager& queues, const MessageStoreSettings* const settings = 0);
virtual void stage(Message::shared_ptr& msg);
diff --git a/cpp/lib/broker/QueuePolicy.cpp b/cpp/lib/broker/QueuePolicy.cpp
index 3cf0882695..055d415226 100644
--- a/cpp/lib/broker/QueuePolicy.cpp
+++ b/cpp/lib/broker/QueuePolicy.cpp
@@ -21,8 +21,14 @@
#include <QueuePolicy.h>
using namespace qpid::broker;
+using namespace qpid::framing;
-QueuePolicy::QueuePolicy(u_int32_t _maxCount, u_int64_t _maxSize) : maxCount(_maxCount), maxSize(_maxSize) {}
+QueuePolicy::QueuePolicy(u_int32_t _maxCount, u_int64_t _maxSize) :
+ maxCount(_maxCount), maxSize(_maxSize) {}
+
+QueuePolicy::QueuePolicy(const FieldTable& settings) :
+ maxCount(getInt(settings, maxCountKey, 0)),
+ maxSize(getInt(settings, maxSizeKey, 0)) {}
void QueuePolicy::enqueued(Message::shared_ptr& msg, MessageStore* store)
{
@@ -47,3 +53,23 @@ bool QueuePolicy::checkSize(Message::shared_ptr& msg)
return maxSize && (size += msg->contentSize()) > maxSize;
}
+void QueuePolicy::update(FieldTable& settings)
+{
+ if (maxCount) settings.setInt(maxCountKey, maxCount);
+ if (maxSize) settings.setInt(maxSizeKey, maxSize);
+}
+
+
+int QueuePolicy::getInt(const FieldTable& settings, const std::string& key, int defaultValue)
+{
+ //Note: currently field table only contain signed 32 bit ints, which
+ // restricts the values that can be set on the queue policy.
+ try {
+ return settings.getInt(key);
+ } catch (FieldNotFoundException& ignore) {
+ return defaultValue;
+ }
+}
+
+const std::string QueuePolicy::maxCountKey("qpid.max_count");
+const std::string QueuePolicy::maxSizeKey("qpid.max_size");
diff --git a/cpp/lib/broker/QueuePolicy.h b/cpp/lib/broker/QueuePolicy.h
index 399c67d837..c31e9ec968 100644
--- a/cpp/lib/broker/QueuePolicy.h
+++ b/cpp/lib/broker/QueuePolicy.h
@@ -22,22 +22,31 @@
#define _QueuePolicy_
#include <BrokerMessage.h>
+#include <FieldTable.h>
namespace qpid {
namespace broker {
class QueuePolicy
{
+ static const std::string maxCountKey;
+ static const std::string maxSizeKey;
+
const u_int32_t maxCount;
const u_int64_t maxSize;
u_int32_t count;
u_int64_t size;
+ static int getInt(const qpid::framing::FieldTable& settings, const std::string& key, int defaultValue);
bool checkCount(Message::shared_ptr& msg);
bool checkSize(Message::shared_ptr& msg);
public:
QueuePolicy(u_int32_t maxCount, u_int64_t maxSize);
+ QueuePolicy(const qpid::framing::FieldTable& settings);
void enqueued(Message::shared_ptr& msg, MessageStore* store);
void dequeued(Message::shared_ptr& msg, MessageStore* store);
+ void update(qpid::framing::FieldTable& settings);
+ u_int32_t getMaxCount() const { return maxCount; }
+ u_int64_t getMaxSize() const { return maxSize; }
};
}
}