summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp26
1 files changed, 25 insertions, 1 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index ee9bff4513..14a89f7a66 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -16,16 +16,21 @@
*
*/
#include "qpid/broker/Queue.h"
+#include "qpid/broker/MessageStore.h"
#include "qpid/concurrent/MonitorImpl.h"
#include <iostream>
using namespace qpid::broker;
using namespace qpid::concurrent;
-Queue::Queue(const string& _name, bool _durable, u_int32_t _autodelete, const ConnectionToken* const _owner) :
+Queue::Queue(const string& _name, bool _durable, u_int32_t _autodelete,
+ MessageStore* const _store,
+ const ConnectionToken* const _owner) :
+
name(_name),
autodelete(_autodelete),
durable(_durable),
+ store(_store),
owner(_owner),
queueing(false),
dispatching(false),
@@ -48,6 +53,11 @@ void Queue::bound(Binding* b){
}
void Queue::deliver(Message::shared_ptr& msg){
+ enqueue(msg, 0);
+ process(msg);
+}
+
+void Queue::process(Message::shared_ptr& msg){
Locker locker(lock);
if(queueing || !dispatch(msg)){
queueing = true;
@@ -153,3 +163,17 @@ bool Queue::canAutoDelete() const{
Locker locker(lock);
return lastUsed && ((apr_time_as_msec(apr_time_now()) - lastUsed) > autodelete);
}
+
+void Queue::enqueue(Message::shared_ptr& msg, const string * const xid){
+ bool persistent(false);//TODO: pull this from headers
+ if(persistent){
+ store->enqueue(msg, name, xid);
+ }
+}
+
+void Queue::dequeue(Message::shared_ptr& msg, const string * const xid){
+ bool persistent(false);//TODO: pull this from headers
+ if(persistent){
+ store->dequeue(msg, name, xid);
+ }
+}