summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/Channel.cpp2
-rw-r--r--cpp/src/qpid/broker/DeletingTxOp.cpp4
-rw-r--r--cpp/src/qpid/broker/DeletingTxOp.h2
-rw-r--r--cpp/src/qpid/broker/DeliveryRecord.cpp8
-rw-r--r--cpp/src/qpid/broker/DeliveryRecord.h3
-rw-r--r--cpp/src/qpid/broker/MessageStore.h4
-rw-r--r--cpp/src/qpid/broker/NullMessageStore.cpp55
-rw-r--r--cpp/src/qpid/broker/NullMessageStore.h51
-rw-r--r--cpp/src/qpid/broker/Queue.cpp19
-rw-r--r--cpp/src/qpid/broker/Queue.h8
-rw-r--r--cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp8
-rw-r--r--cpp/src/qpid/broker/TransactionalStore.h11
-rw-r--r--cpp/src/qpid/broker/TxAck.cpp9
-rw-r--r--cpp/src/qpid/broker/TxAck.h2
-rw-r--r--cpp/src/qpid/broker/TxBuffer.cpp18
-rw-r--r--cpp/src/qpid/broker/TxBuffer.h2
-rw-r--r--cpp/src/qpid/broker/TxOp.h4
-rw-r--r--cpp/src/qpid/broker/TxPublish.cpp9
-rw-r--r--cpp/src/qpid/broker/TxPublish.h5
19 files changed, 174 insertions, 50 deletions
diff --git a/cpp/src/qpid/broker/Channel.cpp b/cpp/src/qpid/broker/Channel.cpp
index 45375a9fd3..967c5855fa 100644
--- a/cpp/src/qpid/broker/Channel.cpp
+++ b/cpp/src/qpid/broker/Channel.cpp
@@ -198,7 +198,7 @@ void Channel::ack(u_int64_t deliveryTag, bool multiple){
throw InvalidAckException();
}else if(multiple){
ack_iterator end = ++i;
- for_each(unacked.begin(), end, mem_fun_ref(&DeliveryRecord::discard));
+ for_each(unacked.begin(), end, bind2nd(mem_fun_ref(&DeliveryRecord::discard), 0));
unacked.erase(unacked.begin(), end);
//recalculate the prefetch:
diff --git a/cpp/src/qpid/broker/DeletingTxOp.cpp b/cpp/src/qpid/broker/DeletingTxOp.cpp
index e9b9f30326..ea3735f361 100644
--- a/cpp/src/qpid/broker/DeletingTxOp.cpp
+++ b/cpp/src/qpid/broker/DeletingTxOp.cpp
@@ -21,8 +21,8 @@ using namespace qpid::broker;
DeletingTxOp::DeletingTxOp(TxOp* const _delegate) : delegate(_delegate){}
-bool DeletingTxOp::prepare() throw(){
- return delegate && delegate->prepare();
+bool DeletingTxOp::prepare(TransactionContext* ctxt) throw(){
+ return delegate && delegate->prepare(ctxt);
}
void DeletingTxOp::commit() throw(){
diff --git a/cpp/src/qpid/broker/DeletingTxOp.h b/cpp/src/qpid/broker/DeletingTxOp.h
index 7e43717f17..f450807256 100644
--- a/cpp/src/qpid/broker/DeletingTxOp.h
+++ b/cpp/src/qpid/broker/DeletingTxOp.h
@@ -30,7 +30,7 @@ namespace qpid {
TxOp* delegate;
public:
DeletingTxOp(TxOp* const delegate);
- virtual bool prepare() throw();
+ virtual bool prepare(TransactionContext* ctxt) throw();
virtual void commit() throw();
virtual void rollback() throw();
virtual ~DeletingTxOp(){}
diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp
index 19b786a8d3..a49c5fbc02 100644
--- a/cpp/src/qpid/broker/DeliveryRecord.cpp
+++ b/cpp/src/qpid/broker/DeliveryRecord.cpp
@@ -38,8 +38,8 @@ DeliveryRecord::DeliveryRecord(Message::shared_ptr _msg,
pull(true){}
-void DeliveryRecord::discard() const{
- queue->dequeue(msg, 0);
+void DeliveryRecord::discard(TransactionContext* ctxt) const{
+ queue->dequeue(ctxt, msg, 0);
}
bool DeliveryRecord::matches(u_int64_t tag) const{
@@ -50,10 +50,6 @@ bool DeliveryRecord::coveredBy(const AccumulatedAck* const range) const{
return range->covers(deliveryTag);
}
-void DeliveryRecord::discardIfCoveredBy(const AccumulatedAck* const range) const{
- if(coveredBy(range)) discard();
-}
-
void DeliveryRecord::redeliver(Channel* const channel) const{
if(pull){
//if message was originally sent as response to get, we must requeue it
diff --git a/cpp/src/qpid/broker/DeliveryRecord.h b/cpp/src/qpid/broker/DeliveryRecord.h
index da74156000..5ec7be77cb 100644
--- a/cpp/src/qpid/broker/DeliveryRecord.h
+++ b/cpp/src/qpid/broker/DeliveryRecord.h
@@ -43,10 +43,9 @@ namespace qpid {
DeliveryRecord(Message::shared_ptr msg, Queue::shared_ptr queue, const string consumerTag, const u_int64_t deliveryTag);
DeliveryRecord(Message::shared_ptr msg, Queue::shared_ptr queue, const u_int64_t deliveryTag);
- void discard() const;
+ void discard(TransactionContext* ctxt = 0) const;
bool matches(u_int64_t tag) const;
bool coveredBy(const AccumulatedAck* const range) const;
- void discardIfCoveredBy(const AccumulatedAck* const range) const;
void requeue() const;
void redeliver(Channel* const) const;
void addTo(Prefetch* const prefetch) const;
diff --git a/cpp/src/qpid/broker/MessageStore.h b/cpp/src/qpid/broker/MessageStore.h
index 9db7e81ed7..276f33eacd 100644
--- a/cpp/src/qpid/broker/MessageStore.h
+++ b/cpp/src/qpid/broker/MessageStore.h
@@ -57,7 +57,7 @@ namespace qpid {
* distributed transaction in which the operation takes
* place or null for 'local' transactions
*/
- virtual void enqueue(Message::shared_ptr& msg, const Queue& queue, const string * const xid) = 0;
+ virtual void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid) = 0;
/**
* Dequeues a message, recording that the given message is
* no longer on the given queue and deleting the message
@@ -69,7 +69,7 @@ namespace qpid {
* distributed transaction in which the operation takes
* place or null for 'local' transactions
*/
- virtual void dequeue(Message::shared_ptr& msg, const Queue& queue, const string * const xid) = 0;
+ virtual void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid) = 0;
/**
* Treat all enqueue/dequeues where this xid was specified as being committed.
*/
diff --git a/cpp/src/qpid/broker/NullMessageStore.cpp b/cpp/src/qpid/broker/NullMessageStore.cpp
new file mode 100644
index 0000000000..3a07961670
--- /dev/null
+++ b/cpp/src/qpid/broker/NullMessageStore.cpp
@@ -0,0 +1,55 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "qpid/broker/NullMessageStore.h"
+
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/QueueRegistry.h"
+
+#include <iostream>
+
+using namespace qpid::broker;
+
+void NullMessageStore::create(const Queue& queue){
+ std::cout << "WARNING: Can't create durable queue '" << queue.getName() << "'. Persistence not enabled." << std::endl;
+}
+void NullMessageStore::destroy(const Queue& queue){
+ std::cout << "WARNING: Can't destroy durable queue '" << queue.getName() << "'. Persistence not enabled." << std::endl;
+}
+void NullMessageStore::recover(QueueRegistry&){
+ std::cout << "WARNING: Persistence not enabled, no recovery of queues or messages." << std::endl;
+}
+void NullMessageStore::enqueue(TransactionContext*, Message::shared_ptr&, const Queue& queue, const string * const){
+ std::cout << "WARNING: Can't enqueue message onto '" << queue.getName() << "'. Persistence not enabled." << std::endl;
+}
+void NullMessageStore::dequeue(TransactionContext*, Message::shared_ptr&, const Queue& queue, const string * const){
+ std::cout << "WARNING: Can't dequeue message from '" << queue.getName() << "'. Persistence not enabled." << std::endl;
+}
+void NullMessageStore::committed(const string * const){
+ std::cout << "WARNING: Persistence not enabled." << std::endl;
+}
+void NullMessageStore::aborted(const string * const){
+ std::cout << "WARNING: Persistence not enabled." << std::endl;
+}
+TransactionContext* NullMessageStore::begin(){
+ return 0;
+}
+void NullMessageStore::commit(TransactionContext*){
+}
+void NullMessageStore::abort(TransactionContext*){
+}
diff --git a/cpp/src/qpid/broker/NullMessageStore.h b/cpp/src/qpid/broker/NullMessageStore.h
new file mode 100644
index 0000000000..9b89920416
--- /dev/null
+++ b/cpp/src/qpid/broker/NullMessageStore.h
@@ -0,0 +1,51 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _NullMessageStore_
+#define _NullMessageStore_
+
+#include "qpid/broker/Message.h"
+#include "qpid/broker/MessageStore.h"
+#include "qpid/broker/Queue.h"
+
+namespace qpid {
+ namespace broker {
+ class Queue;
+ class QueueRegistry;
+
+ /**
+ * A null implementation of the MessageStore interface
+ */
+ class NullMessageStore : public MessageStore{
+ public:
+ void create(const Queue& queue);
+ void destroy(const Queue& queue);
+ void recover(QueueRegistry& queues);
+ void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid);
+ void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid);
+ void committed(const string * const xid);
+ void aborted(const string * const xid);
+ TransactionContext* begin();
+ void commit(TransactionContext* ctxt);
+ void abort(TransactionContext* ctxt);
+ ~NullMessageStore(){}
+ };
+ }
+}
+
+
+#endif
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 7f3cfdc470..000552715b 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -35,7 +35,8 @@ Queue::Queue(const string& _name, u_int32_t _autodelete,
dispatching(false),
next(0),
lastUsed(0),
- exclusive(0)
+ exclusive(0),
+ persistenceId(0)
{
if(autodelete) lastUsed = Time::now().msecs();
}
@@ -52,7 +53,7 @@ void Queue::bound(Binding* b){
}
void Queue::deliver(Message::shared_ptr& msg){
- enqueue(msg, 0);
+ enqueue(0, msg, 0);
process(msg);
}
@@ -163,15 +164,17 @@ bool Queue::canAutoDelete() const{
return lastUsed && (Time::now().msecs() - lastUsed > autodelete);
}
-void Queue::enqueue(Message::shared_ptr& msg, const string * const xid){
- if(store){
- store->enqueue(msg, *this, xid);
+void Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const string * const xid)
+{
+ if(msg->isPersistent() && store){
+ store->enqueue(ctxt, msg, *this, xid);
}
}
-void Queue::dequeue(Message::shared_ptr& msg, const string * const xid){
- if(store){
- store->dequeue(msg, *this, xid);
+void Queue::dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const string * const xid)
+{
+ if(msg->isPersistent() && store){
+ store->dequeue(ctxt, msg, *this, xid);
}
}
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index 8473465cab..fd0bad43ff 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -58,6 +58,7 @@ namespace qpid {
mutable qpid::sys::Monitor lock;
int64_t lastUsed;
Consumer* exclusive;
+ u_int64_t persistenceId;
bool startDispatching();
bool dispatch(Message::shared_ptr& msg);
@@ -106,10 +107,13 @@ namespace qpid {
inline const string& getName() const { return name; }
inline const bool isExclusiveOwner(const ConnectionToken* const o) const { return o == owner; }
inline bool hasExclusiveConsumer() const { return exclusive; }
+ inline u_int64_t getPersistenceId() const { return persistenceId; }
+ inline void setPersistenceId(u_int64_t _persistenceId) { persistenceId = _persistenceId; }
+
bool canAutoDelete() const;
- void enqueue(Message::shared_ptr& msg, const string * const xid);
- void dequeue(Message::shared_ptr& msg, const string * const xid);
+ void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const string * const xid);
+ void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const string * const xid);
};
}
}
diff --git a/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp b/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp
index 8d4f955270..707881b6bb 100644
--- a/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp
+++ b/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp
@@ -16,10 +16,12 @@
*
*/
#include "qpid/broker/SessionHandlerFactoryImpl.h"
-#include "qpid/broker/SessionHandlerImpl.h"
+
+#include "qpid/broker/DirectExchange.h"
#include "qpid/broker/FanOutExchange.h"
#include "qpid/broker/HeadersExchange.h"
-#include "qpid/broker/DirectExchange.h"
+#include "qpid/broker/NullMessageStore.h"
+#include "qpid/broker/SessionHandlerImpl.h"
using namespace qpid::broker;
using namespace qpid::sys;
@@ -34,7 +36,7 @@ const std::string amq_match("amq.match");
}
SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(u_int32_t _timeout) :
- queues(store.get()), timeout(_timeout), cleaner(&queues, timeout/10)
+ store(new NullMessageStore()), queues(store.get()), timeout(_timeout), cleaner(&queues, timeout/10)
{
exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
exchanges.declare(amq_direct, DirectExchange::typeName);
diff --git a/cpp/src/qpid/broker/TransactionalStore.h b/cpp/src/qpid/broker/TransactionalStore.h
index 3976edd7b9..57dc411cd8 100644
--- a/cpp/src/qpid/broker/TransactionalStore.h
+++ b/cpp/src/qpid/broker/TransactionalStore.h
@@ -20,11 +20,16 @@
namespace qpid {
namespace broker {
+ class TransactionContext{
+ public:
+ virtual ~TransactionContext(){}
+ };
+
class TransactionalStore{
public:
- virtual void begin() = 0;
- virtual void commit() = 0;
- virtual void abort() = 0;
+ virtual TransactionContext* begin() = 0;
+ virtual void commit(TransactionContext*) = 0;
+ virtual void abort(TransactionContext*) = 0;
virtual ~TransactionalStore(){}
};
diff --git a/cpp/src/qpid/broker/TxAck.cpp b/cpp/src/qpid/broker/TxAck.cpp
index 6dba6fd79d..0bd525cc68 100644
--- a/cpp/src/qpid/broker/TxAck.cpp
+++ b/cpp/src/qpid/broker/TxAck.cpp
@@ -26,10 +26,15 @@ TxAck::TxAck(AccumulatedAck& _acked, std::list<DeliveryRecord>& _unacked) : acke
}
-bool TxAck::prepare() throw(){
+bool TxAck::prepare(TransactionContext* ctxt) throw(){
try{
//dequeue all acked messages from their queues
- for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::discardIfCoveredBy), &acked));
+ for (ack_iterator i = unacked.begin(); i != unacked.end(); i++) {
+ if (i->coveredBy(&acked)) {
+ i->discard(ctxt);
+ }
+ }
+ //for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::discardIfCoveredBy), &acked));
return true;
}catch(...){
std::cout << "TxAck::prepare() - Failed to prepare" << std::endl;
diff --git a/cpp/src/qpid/broker/TxAck.h b/cpp/src/qpid/broker/TxAck.h
index 9046a89384..f284f8827d 100644
--- a/cpp/src/qpid/broker/TxAck.h
+++ b/cpp/src/qpid/broker/TxAck.h
@@ -41,7 +41,7 @@ namespace qpid {
* @param unacked the record of delivered messages
*/
TxAck(AccumulatedAck& acked, std::list<DeliveryRecord>& unacked);
- virtual bool prepare() throw();
+ virtual bool prepare(TransactionContext* ctxt) throw();
virtual void commit() throw();
virtual void rollback() throw();
virtual ~TxAck(){}
diff --git a/cpp/src/qpid/broker/TxBuffer.cpp b/cpp/src/qpid/broker/TxBuffer.cpp
index 0529892930..fe2ea8fbb1 100644
--- a/cpp/src/qpid/broker/TxBuffer.cpp
+++ b/cpp/src/qpid/broker/TxBuffer.cpp
@@ -17,30 +17,28 @@
*/
#include "qpid/broker/TxBuffer.h"
+using std::mem_fun;
using namespace qpid::broker;
bool TxBuffer::prepare(TransactionalStore* const store){
- if(store) store->begin();
+ TransactionContext* ctxt(0);
+ if(store) ctxt = store->begin();
for(op_iterator i = ops.begin(); i < ops.end(); i++){
- if(!(*i)->prepare()){
- if(store) store->abort();
+ if(!(*i)->prepare(ctxt)){
+ if(store) store->abort(ctxt);
return false;
}
}
- if(store) store->commit();
+ if(store) store->commit(ctxt);
return true;
}
void TxBuffer::commit(){
- for(op_iterator i = ops.begin(); i < ops.end(); i++){
- (*i)->commit();
- }
+ for_each(ops.begin(), ops.end(), mem_fun(&TxOp::commit));
}
void TxBuffer::rollback(){
- for(op_iterator i = ops.begin(); i < ops.end(); i++){
- (*i)->rollback();
- }
+ for_each(ops.begin(), ops.end(), mem_fun(&TxOp::rollback));
}
void TxBuffer::enlist(TxOp* const op){
diff --git a/cpp/src/qpid/broker/TxBuffer.h b/cpp/src/qpid/broker/TxBuffer.h
index 0963c7472a..dbf77304ed 100644
--- a/cpp/src/qpid/broker/TxBuffer.h
+++ b/cpp/src/qpid/broker/TxBuffer.h
@@ -18,6 +18,8 @@
#ifndef _TxBuffer_
#define _TxBuffer_
+#include <algorithm>
+#include <functional>
#include <vector>
#include "qpid/broker/TransactionalStore.h"
#include "qpid/broker/TxOp.h"
diff --git a/cpp/src/qpid/broker/TxOp.h b/cpp/src/qpid/broker/TxOp.h
index 37934dbec6..af6eaded23 100644
--- a/cpp/src/qpid/broker/TxOp.h
+++ b/cpp/src/qpid/broker/TxOp.h
@@ -18,11 +18,13 @@
#ifndef _TxOp_
#define _TxOp_
+#include "qpid/broker/TransactionalStore.h"
+
namespace qpid {
namespace broker {
class TxOp{
public:
- virtual bool prepare() throw() = 0;
+ virtual bool prepare(TransactionContext*) throw() = 0;
virtual void commit() throw() = 0;
virtual void rollback() throw() = 0;
virtual ~TxOp(){}
diff --git a/cpp/src/qpid/broker/TxPublish.cpp b/cpp/src/qpid/broker/TxPublish.cpp
index 93250dbb20..461ddfe052 100644
--- a/cpp/src/qpid/broker/TxPublish.cpp
+++ b/cpp/src/qpid/broker/TxPublish.cpp
@@ -21,9 +21,9 @@ using namespace qpid::broker;
TxPublish::TxPublish(Message::shared_ptr _msg) : msg(_msg) {}
-bool TxPublish::prepare() throw(){
+bool TxPublish::prepare(TransactionContext* ctxt) throw(){
try{
- for_each(queues.begin(), queues.end(), Prepare(msg, 0));
+ for_each(queues.begin(), queues.end(), Prepare(ctxt, msg, 0));
return true;
}catch(...){
std::cout << "TxPublish::prepare() - Failed to prepare" << std::endl;
@@ -42,10 +42,11 @@ void TxPublish::deliverTo(Queue::shared_ptr& queue){
queues.push_back(queue);
}
-TxPublish::Prepare::Prepare(Message::shared_ptr& _msg, const string* const _xid) : msg(_msg), xid(_xid){}
+TxPublish::Prepare::Prepare(TransactionContext* _ctxt, Message::shared_ptr& _msg, const string* const _xid)
+ : ctxt(_ctxt), msg(_msg), xid(_xid){}
void TxPublish::Prepare::operator()(Queue::shared_ptr& queue){
- queue->enqueue(msg, xid);
+ queue->enqueue(ctxt, msg, xid);
}
TxPublish::Commit::Commit(Message::shared_ptr& _msg) : msg(_msg){}
diff --git a/cpp/src/qpid/broker/TxPublish.h b/cpp/src/qpid/broker/TxPublish.h
index 5076e0f56f..f955ab4bc0 100644
--- a/cpp/src/qpid/broker/TxPublish.h
+++ b/cpp/src/qpid/broker/TxPublish.h
@@ -41,10 +41,11 @@ namespace qpid {
*/
class TxPublish : public TxOp, public Deliverable{
class Prepare{
+ TransactionContext* ctxt;
Message::shared_ptr& msg;
const string* const xid;
public:
- Prepare(Message::shared_ptr& msg, const string* const xid);
+ Prepare(TransactionContext* ctxt, Message::shared_ptr& msg, const string* const xid);
void operator()(Queue::shared_ptr& queue);
};
@@ -60,7 +61,7 @@ namespace qpid {
public:
TxPublish(Message::shared_ptr msg);
- virtual bool prepare() throw();
+ virtual bool prepare(TransactionContext* ctxt) throw();
virtual void commit() throw();
virtual void rollback() throw();