summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp70
-rw-r--r--cpp/src/qpid/broker/Queue.h5
-rw-r--r--cpp/src/qpid/client/QueueOptions.cpp11
-rw-r--r--cpp/src/qpid/client/QueueOptions.h7
4 files changed, 68 insertions, 25 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index c899a5befa..fb8bd1288f 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -51,6 +51,21 @@ using std::for_each;
using std::mem_fun;
namespace _qmf = qmf::org::apache::qpid::broker;
+
+namespace
+{
+ const std::string qpidMaxSize("qpid.max_size");
+ const std::string qpidMaxCount("qpid.max_count");
+ const std::string qpidNoLocal("no-local");
+ const std::string qpidTraceIdentity("qpid.trace.id");
+ const std::string qpidTraceExclude("qpid.trace.exclude");
+ const std::string qpidLastValueQueue("qpid.last_value_queue");
+ const std::string qpidOptimisticConsume("qpid.optimistic_consume");
+ const std::string qpidPersistLastNode("qpid.persist_last_node");
+ const std::string qpidVQMatchProperty("qpid.LVQ_key");
+}
+
+
Queue::Queue(const string& _name, bool _autodelete,
MessageStore* const _store,
const OwnershipToken* const _owner,
@@ -253,7 +268,7 @@ bool Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
if (c->filter(msg.payload)) {
if (c->accept(msg.payload)) {
m = msg;
- messages.pop_front();
+ popMsg(msg);
return true;
} else {
//message(s) are available but consumer hasn't got enough credit
@@ -371,7 +386,7 @@ QueuedMessage Queue::get(){
if(!messages.empty()){
msg = messages.front();
- messages.pop_front();
+ popMsg(msg);
}
return msg;
}
@@ -406,22 +421,49 @@ uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty) {
QueuedMessage qmsg = messages.front();
boost::intrusive_ptr<Message> msg = qmsg.payload;
destq->deliver(msg); // deliver message to the destination queue
- messages.pop_front();
+ popMsg(qmsg);
dequeue(0, qmsg);
count++;
}
return count;
}
+void Queue::popMsg(QueuedMessage& qmsg)
+{
+ if (lastValueQueue){
+ const framing::FieldTable* ft = qmsg.payload->getApplicationHeaders();
+ string key = ft->getString(qpidVQMatchProperty);
+ lvq.erase(key);
+ }
+ messages.pop_front();
+}
+
void Queue::push(boost::intrusive_ptr<Message>& msg){
Listeners copy;
{
Mutex::ScopedLock locker(messageLock);
QueuedMessage qm(this, msg, ++sequence);
if (policy.get()) policy->tryEnqueue(qm);
-
- messages.push_back(qm);
- listeners.swap(copy);
+
+ //if (lastValueQueue && LVQinsert(qm) ) return; // LVQ update of existing message
+ LVQ::iterator i;
+ if (lastValueQueue){
+ const framing::FieldTable* ft = msg->getApplicationHeaders();
+ string key = ft->getString(qpidVQMatchProperty);
+
+ i = lvq.find(key);
+ if (i == lvq.end()){
+ messages.push_back(qm);
+ listeners.swap(copy);
+ lvq[key] = &messages.back();
+ }else {
+ i->second->payload = msg;
+ }
+ }else {
+
+ messages.push_back(qm);
+ listeners.swap(copy);
+ }
}
for_each(copy.begin(), copy.end(), boost::mem_fn(&Consumer::notify));
}
@@ -514,8 +556,8 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
void Queue::popAndDequeue()
{
QueuedMessage msg = messages.front();
- messages.pop_front();
- dequeue(0, msg);
+ popMsg(msg);
+ dequeue(0, msg);
}
/**
@@ -529,18 +571,6 @@ void Queue::dequeued(const QueuedMessage& msg)
}
-namespace
-{
- const std::string qpidMaxSize("qpid.max_size");
- const std::string qpidMaxCount("qpid.max_count");
- const std::string qpidNoLocal("no-local");
- const std::string qpidTraceIdentity("qpid.trace.id");
- const std::string qpidTraceExclude("qpid.trace.exclude");
- const std::string qpidLastValueQueue("qpid.last_value_queue");
- const std::string qpidOptimisticConsume("qpid.optimistic_consume");
- const std::string qpidPersistLastNode("qpid.persist_last_node");
-}
-
void Queue::create(const FieldTable& _settings)
{
settings = _settings;
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index 3bde07c4d6..213a36d59d 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -65,6 +65,7 @@ namespace qpid {
typedef std::list<Consumer::shared_ptr> Listeners;
typedef std::deque<QueuedMessage> Messages;
+ typedef std::map<string,QueuedMessage*> LVQ;
const string name;
const bool autodelete;
@@ -81,6 +82,7 @@ namespace qpid {
std::vector<std::string> traceExclude;
Listeners listeners;
Messages messages;
+ LVQ lvq;
mutable qpid::sys::Mutex consumerLock;
mutable qpid::sys::Mutex messageLock;
mutable qpid::sys::Mutex ownershipLock;
@@ -253,6 +255,9 @@ namespace qpid {
}
bool releaseMessageContent(const QueuedMessage&);
+
+ void popMsg(QueuedMessage& qmsg);
+
};
}
}
diff --git a/cpp/src/qpid/client/QueueOptions.cpp b/cpp/src/qpid/client/QueueOptions.cpp
index d0fd6f1e5c..5d1cb74efd 100644
--- a/cpp/src/qpid/client/QueueOptions.cpp
+++ b/cpp/src/qpid/client/QueueOptions.cpp
@@ -38,6 +38,7 @@ const std::string QueueOptions::strRING_STRICT("ring_strict");
const std::string QueueOptions::strLastValueQueue("qpid.last_value_queue");
const std::string QueueOptions::strOptimisticConsume("qpid.optimistic_consume");
const std::string QueueOptions::strPersistLastNode("qpid.persist_last_node");
+const std::string QueueOptions::strLVQMatchProperty("qpid.LVQ_key");
QueueOptions::~QueueOptions()
@@ -83,15 +84,17 @@ void QueueOptions::setPersistLastNode()
void QueueOptions::setOrdering(QueueOrderingPolicy op)
{
if (op == LVQ){
- // TODO, add and test options with LVQ patch.
- // also set the key match for LVQ
- //setString(LastValueQueue, 1);
-
+ setInt(strLastValueQueue, 1);
}else{
clearOrdering();
}
}
+void QueueOptions::getLVQKey(std::string& key)
+{
+ key.assign(strLVQMatchProperty);
+}
+
void QueueOptions::clearSizePolicy()
{
erase(strMaxCountKey);
diff --git a/cpp/src/qpid/client/QueueOptions.h b/cpp/src/qpid/client/QueueOptions.h
index 21333794ac..37cb8616e3 100644
--- a/cpp/src/qpid/client/QueueOptions.h
+++ b/cpp/src/qpid/client/QueueOptions.h
@@ -86,6 +86,11 @@ class QueueOptions: public framing::FieldTable
void clearPersistLastNode();
/**
+ * get the key used match LVQ in args for message transfer
+ */
+ void getLVQKey(std::string& key);
+
+ /**
* Use default odering policy
*/
void clearOrdering();
@@ -100,7 +105,7 @@ class QueueOptions: public framing::FieldTable
static const std::string strLastValueQueue;
static const std::string strOptimisticConsume;
static const std::string strPersistLastNode;
- private:
+ static const std::string strLVQMatchProperty;