diff options
author | Gordon Sim <gsim@apache.org> | 2006-11-07 11:26:50 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2006-11-07 11:26:50 +0000 |
commit | c2a8f493b4d87c71dc04a432e686e2d9151a393f (patch) | |
tree | 1bb137e50aa711e6ead9e70e9117f04916b32b07 /cpp/src | |
parent | e4ec69544d05f04c64b92d85905978495c1aee77 (diff) | |
download | qpid-python-c2a8f493b4d87c71dc04a432e686e2d9151a393f.tar.gz |
Made passing of transaction context in message store explicit (to avoid thread local storage in case this doesn't fit with new io design).
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@472067 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/Channel.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DeletingTxOp.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DeletingTxOp.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DeliveryRecord.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DeliveryRecord.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageStore.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/NullMessageStore.cpp | 55 | ||||
-rw-r--r-- | cpp/src/qpid/broker/NullMessageStore.h | 51 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 19 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.h | 8 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TransactionalStore.h | 11 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxAck.cpp | 9 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxAck.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxBuffer.cpp | 18 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxBuffer.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxOp.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxPublish.cpp | 9 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxPublish.h | 5 |
19 files changed, 174 insertions, 50 deletions
diff --git a/cpp/src/qpid/broker/Channel.cpp b/cpp/src/qpid/broker/Channel.cpp index 45375a9fd3..967c5855fa 100644 --- a/cpp/src/qpid/broker/Channel.cpp +++ b/cpp/src/qpid/broker/Channel.cpp @@ -198,7 +198,7 @@ void Channel::ack(u_int64_t deliveryTag, bool multiple){ throw InvalidAckException(); }else if(multiple){ ack_iterator end = ++i; - for_each(unacked.begin(), end, mem_fun_ref(&DeliveryRecord::discard)); + for_each(unacked.begin(), end, bind2nd(mem_fun_ref(&DeliveryRecord::discard), 0)); unacked.erase(unacked.begin(), end); //recalculate the prefetch: diff --git a/cpp/src/qpid/broker/DeletingTxOp.cpp b/cpp/src/qpid/broker/DeletingTxOp.cpp index e9b9f30326..ea3735f361 100644 --- a/cpp/src/qpid/broker/DeletingTxOp.cpp +++ b/cpp/src/qpid/broker/DeletingTxOp.cpp @@ -21,8 +21,8 @@ using namespace qpid::broker; DeletingTxOp::DeletingTxOp(TxOp* const _delegate) : delegate(_delegate){} -bool DeletingTxOp::prepare() throw(){ - return delegate && delegate->prepare(); +bool DeletingTxOp::prepare(TransactionContext* ctxt) throw(){ + return delegate && delegate->prepare(ctxt); } void DeletingTxOp::commit() throw(){ diff --git a/cpp/src/qpid/broker/DeletingTxOp.h b/cpp/src/qpid/broker/DeletingTxOp.h index 7e43717f17..f450807256 100644 --- a/cpp/src/qpid/broker/DeletingTxOp.h +++ b/cpp/src/qpid/broker/DeletingTxOp.h @@ -30,7 +30,7 @@ namespace qpid { TxOp* delegate; public: DeletingTxOp(TxOp* const delegate); - virtual bool prepare() throw(); + virtual bool prepare(TransactionContext* ctxt) throw(); virtual void commit() throw(); virtual void rollback() throw(); virtual ~DeletingTxOp(){} diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp index 19b786a8d3..a49c5fbc02 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -38,8 +38,8 @@ DeliveryRecord::DeliveryRecord(Message::shared_ptr _msg, pull(true){} -void DeliveryRecord::discard() const{ - queue->dequeue(msg, 0); +void DeliveryRecord::discard(TransactionContext* ctxt) const{ + queue->dequeue(ctxt, msg, 0); } bool DeliveryRecord::matches(u_int64_t tag) const{ @@ -50,10 +50,6 @@ bool DeliveryRecord::coveredBy(const AccumulatedAck* const range) const{ return range->covers(deliveryTag); } -void DeliveryRecord::discardIfCoveredBy(const AccumulatedAck* const range) const{ - if(coveredBy(range)) discard(); -} - void DeliveryRecord::redeliver(Channel* const channel) const{ if(pull){ //if message was originally sent as response to get, we must requeue it diff --git a/cpp/src/qpid/broker/DeliveryRecord.h b/cpp/src/qpid/broker/DeliveryRecord.h index da74156000..5ec7be77cb 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.h +++ b/cpp/src/qpid/broker/DeliveryRecord.h @@ -43,10 +43,9 @@ namespace qpid { DeliveryRecord(Message::shared_ptr msg, Queue::shared_ptr queue, const string consumerTag, const u_int64_t deliveryTag); DeliveryRecord(Message::shared_ptr msg, Queue::shared_ptr queue, const u_int64_t deliveryTag); - void discard() const; + void discard(TransactionContext* ctxt = 0) const; bool matches(u_int64_t tag) const; bool coveredBy(const AccumulatedAck* const range) const; - void discardIfCoveredBy(const AccumulatedAck* const range) const; void requeue() const; void redeliver(Channel* const) const; void addTo(Prefetch* const prefetch) const; diff --git a/cpp/src/qpid/broker/MessageStore.h b/cpp/src/qpid/broker/MessageStore.h index 9db7e81ed7..276f33eacd 100644 --- a/cpp/src/qpid/broker/MessageStore.h +++ b/cpp/src/qpid/broker/MessageStore.h @@ -57,7 +57,7 @@ namespace qpid { * distributed transaction in which the operation takes * place or null for 'local' transactions */ - virtual void enqueue(Message::shared_ptr& msg, const Queue& queue, const string * const xid) = 0; + virtual void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid) = 0; /** * Dequeues a message, recording that the given message is * no longer on the given queue and deleting the message @@ -69,7 +69,7 @@ namespace qpid { * distributed transaction in which the operation takes * place or null for 'local' transactions */ - virtual void dequeue(Message::shared_ptr& msg, const Queue& queue, const string * const xid) = 0; + virtual void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid) = 0; /** * Treat all enqueue/dequeues where this xid was specified as being committed. */ diff --git a/cpp/src/qpid/broker/NullMessageStore.cpp b/cpp/src/qpid/broker/NullMessageStore.cpp new file mode 100644 index 0000000000..3a07961670 --- /dev/null +++ b/cpp/src/qpid/broker/NullMessageStore.cpp @@ -0,0 +1,55 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "qpid/broker/NullMessageStore.h" + +#include "qpid/broker/Queue.h" +#include "qpid/broker/QueueRegistry.h" + +#include <iostream> + +using namespace qpid::broker; + +void NullMessageStore::create(const Queue& queue){ + std::cout << "WARNING: Can't create durable queue '" << queue.getName() << "'. Persistence not enabled." << std::endl; +} +void NullMessageStore::destroy(const Queue& queue){ + std::cout << "WARNING: Can't destroy durable queue '" << queue.getName() << "'. Persistence not enabled." << std::endl; +} +void NullMessageStore::recover(QueueRegistry&){ + std::cout << "WARNING: Persistence not enabled, no recovery of queues or messages." << std::endl; +} +void NullMessageStore::enqueue(TransactionContext*, Message::shared_ptr&, const Queue& queue, const string * const){ + std::cout << "WARNING: Can't enqueue message onto '" << queue.getName() << "'. Persistence not enabled." << std::endl; +} +void NullMessageStore::dequeue(TransactionContext*, Message::shared_ptr&, const Queue& queue, const string * const){ + std::cout << "WARNING: Can't dequeue message from '" << queue.getName() << "'. Persistence not enabled." << std::endl; +} +void NullMessageStore::committed(const string * const){ + std::cout << "WARNING: Persistence not enabled." << std::endl; +} +void NullMessageStore::aborted(const string * const){ + std::cout << "WARNING: Persistence not enabled." << std::endl; +} +TransactionContext* NullMessageStore::begin(){ + return 0; +} +void NullMessageStore::commit(TransactionContext*){ +} +void NullMessageStore::abort(TransactionContext*){ +} diff --git a/cpp/src/qpid/broker/NullMessageStore.h b/cpp/src/qpid/broker/NullMessageStore.h new file mode 100644 index 0000000000..9b89920416 --- /dev/null +++ b/cpp/src/qpid/broker/NullMessageStore.h @@ -0,0 +1,51 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _NullMessageStore_ +#define _NullMessageStore_ + +#include "qpid/broker/Message.h" +#include "qpid/broker/MessageStore.h" +#include "qpid/broker/Queue.h" + +namespace qpid { + namespace broker { + class Queue; + class QueueRegistry; + + /** + * A null implementation of the MessageStore interface + */ + class NullMessageStore : public MessageStore{ + public: + void create(const Queue& queue); + void destroy(const Queue& queue); + void recover(QueueRegistry& queues); + void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid); + void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid); + void committed(const string * const xid); + void aborted(const string * const xid); + TransactionContext* begin(); + void commit(TransactionContext* ctxt); + void abort(TransactionContext* ctxt); + ~NullMessageStore(){} + }; + } +} + + +#endif diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 7f3cfdc470..000552715b 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -35,7 +35,8 @@ Queue::Queue(const string& _name, u_int32_t _autodelete, dispatching(false), next(0), lastUsed(0), - exclusive(0) + exclusive(0), + persistenceId(0) { if(autodelete) lastUsed = Time::now().msecs(); } @@ -52,7 +53,7 @@ void Queue::bound(Binding* b){ } void Queue::deliver(Message::shared_ptr& msg){ - enqueue(msg, 0); + enqueue(0, msg, 0); process(msg); } @@ -163,15 +164,17 @@ bool Queue::canAutoDelete() const{ return lastUsed && (Time::now().msecs() - lastUsed > autodelete); } -void Queue::enqueue(Message::shared_ptr& msg, const string * const xid){ - if(store){ - store->enqueue(msg, *this, xid); +void Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const string * const xid) +{ + if(msg->isPersistent() && store){ + store->enqueue(ctxt, msg, *this, xid); } } -void Queue::dequeue(Message::shared_ptr& msg, const string * const xid){ - if(store){ - store->dequeue(msg, *this, xid); +void Queue::dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const string * const xid) +{ + if(msg->isPersistent() && store){ + store->dequeue(ctxt, msg, *this, xid); } } diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 8473465cab..fd0bad43ff 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -58,6 +58,7 @@ namespace qpid { mutable qpid::sys::Monitor lock; int64_t lastUsed; Consumer* exclusive; + u_int64_t persistenceId; bool startDispatching(); bool dispatch(Message::shared_ptr& msg); @@ -106,10 +107,13 @@ namespace qpid { inline const string& getName() const { return name; } inline const bool isExclusiveOwner(const ConnectionToken* const o) const { return o == owner; } inline bool hasExclusiveConsumer() const { return exclusive; } + inline u_int64_t getPersistenceId() const { return persistenceId; } + inline void setPersistenceId(u_int64_t _persistenceId) { persistenceId = _persistenceId; } + bool canAutoDelete() const; - void enqueue(Message::shared_ptr& msg, const string * const xid); - void dequeue(Message::shared_ptr& msg, const string * const xid); + void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const string * const xid); + void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const string * const xid); }; } } diff --git a/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp b/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp index 8d4f955270..707881b6bb 100644 --- a/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp +++ b/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp @@ -16,10 +16,12 @@ * */ #include "qpid/broker/SessionHandlerFactoryImpl.h" -#include "qpid/broker/SessionHandlerImpl.h" + +#include "qpid/broker/DirectExchange.h" #include "qpid/broker/FanOutExchange.h" #include "qpid/broker/HeadersExchange.h" -#include "qpid/broker/DirectExchange.h" +#include "qpid/broker/NullMessageStore.h" +#include "qpid/broker/SessionHandlerImpl.h" using namespace qpid::broker; using namespace qpid::sys; @@ -34,7 +36,7 @@ const std::string amq_match("amq.match"); } SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(u_int32_t _timeout) : - queues(store.get()), timeout(_timeout), cleaner(&queues, timeout/10) + store(new NullMessageStore()), queues(store.get()), timeout(_timeout), cleaner(&queues, timeout/10) { exchanges.declare(empty, DirectExchange::typeName); // Default exchange. exchanges.declare(amq_direct, DirectExchange::typeName); diff --git a/cpp/src/qpid/broker/TransactionalStore.h b/cpp/src/qpid/broker/TransactionalStore.h index 3976edd7b9..57dc411cd8 100644 --- a/cpp/src/qpid/broker/TransactionalStore.h +++ b/cpp/src/qpid/broker/TransactionalStore.h @@ -20,11 +20,16 @@ namespace qpid { namespace broker { + class TransactionContext{ + public: + virtual ~TransactionContext(){} + }; + class TransactionalStore{ public: - virtual void begin() = 0; - virtual void commit() = 0; - virtual void abort() = 0; + virtual TransactionContext* begin() = 0; + virtual void commit(TransactionContext*) = 0; + virtual void abort(TransactionContext*) = 0; virtual ~TransactionalStore(){} }; diff --git a/cpp/src/qpid/broker/TxAck.cpp b/cpp/src/qpid/broker/TxAck.cpp index 6dba6fd79d..0bd525cc68 100644 --- a/cpp/src/qpid/broker/TxAck.cpp +++ b/cpp/src/qpid/broker/TxAck.cpp @@ -26,10 +26,15 @@ TxAck::TxAck(AccumulatedAck& _acked, std::list<DeliveryRecord>& _unacked) : acke } -bool TxAck::prepare() throw(){ +bool TxAck::prepare(TransactionContext* ctxt) throw(){ try{ //dequeue all acked messages from their queues - for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::discardIfCoveredBy), &acked)); + for (ack_iterator i = unacked.begin(); i != unacked.end(); i++) { + if (i->coveredBy(&acked)) { + i->discard(ctxt); + } + } + //for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::discardIfCoveredBy), &acked)); return true; }catch(...){ std::cout << "TxAck::prepare() - Failed to prepare" << std::endl; diff --git a/cpp/src/qpid/broker/TxAck.h b/cpp/src/qpid/broker/TxAck.h index 9046a89384..f284f8827d 100644 --- a/cpp/src/qpid/broker/TxAck.h +++ b/cpp/src/qpid/broker/TxAck.h @@ -41,7 +41,7 @@ namespace qpid { * @param unacked the record of delivered messages */ TxAck(AccumulatedAck& acked, std::list<DeliveryRecord>& unacked); - virtual bool prepare() throw(); + virtual bool prepare(TransactionContext* ctxt) throw(); virtual void commit() throw(); virtual void rollback() throw(); virtual ~TxAck(){} diff --git a/cpp/src/qpid/broker/TxBuffer.cpp b/cpp/src/qpid/broker/TxBuffer.cpp index 0529892930..fe2ea8fbb1 100644 --- a/cpp/src/qpid/broker/TxBuffer.cpp +++ b/cpp/src/qpid/broker/TxBuffer.cpp @@ -17,30 +17,28 @@ */ #include "qpid/broker/TxBuffer.h" +using std::mem_fun; using namespace qpid::broker; bool TxBuffer::prepare(TransactionalStore* const store){ - if(store) store->begin(); + TransactionContext* ctxt(0); + if(store) ctxt = store->begin(); for(op_iterator i = ops.begin(); i < ops.end(); i++){ - if(!(*i)->prepare()){ - if(store) store->abort(); + if(!(*i)->prepare(ctxt)){ + if(store) store->abort(ctxt); return false; } } - if(store) store->commit(); + if(store) store->commit(ctxt); return true; } void TxBuffer::commit(){ - for(op_iterator i = ops.begin(); i < ops.end(); i++){ - (*i)->commit(); - } + for_each(ops.begin(), ops.end(), mem_fun(&TxOp::commit)); } void TxBuffer::rollback(){ - for(op_iterator i = ops.begin(); i < ops.end(); i++){ - (*i)->rollback(); - } + for_each(ops.begin(), ops.end(), mem_fun(&TxOp::rollback)); } void TxBuffer::enlist(TxOp* const op){ diff --git a/cpp/src/qpid/broker/TxBuffer.h b/cpp/src/qpid/broker/TxBuffer.h index 0963c7472a..dbf77304ed 100644 --- a/cpp/src/qpid/broker/TxBuffer.h +++ b/cpp/src/qpid/broker/TxBuffer.h @@ -18,6 +18,8 @@ #ifndef _TxBuffer_ #define _TxBuffer_ +#include <algorithm> +#include <functional> #include <vector> #include "qpid/broker/TransactionalStore.h" #include "qpid/broker/TxOp.h" diff --git a/cpp/src/qpid/broker/TxOp.h b/cpp/src/qpid/broker/TxOp.h index 37934dbec6..af6eaded23 100644 --- a/cpp/src/qpid/broker/TxOp.h +++ b/cpp/src/qpid/broker/TxOp.h @@ -18,11 +18,13 @@ #ifndef _TxOp_ #define _TxOp_ +#include "qpid/broker/TransactionalStore.h" + namespace qpid { namespace broker { class TxOp{ public: - virtual bool prepare() throw() = 0; + virtual bool prepare(TransactionContext*) throw() = 0; virtual void commit() throw() = 0; virtual void rollback() throw() = 0; virtual ~TxOp(){} diff --git a/cpp/src/qpid/broker/TxPublish.cpp b/cpp/src/qpid/broker/TxPublish.cpp index 93250dbb20..461ddfe052 100644 --- a/cpp/src/qpid/broker/TxPublish.cpp +++ b/cpp/src/qpid/broker/TxPublish.cpp @@ -21,9 +21,9 @@ using namespace qpid::broker; TxPublish::TxPublish(Message::shared_ptr _msg) : msg(_msg) {} -bool TxPublish::prepare() throw(){ +bool TxPublish::prepare(TransactionContext* ctxt) throw(){ try{ - for_each(queues.begin(), queues.end(), Prepare(msg, 0)); + for_each(queues.begin(), queues.end(), Prepare(ctxt, msg, 0)); return true; }catch(...){ std::cout << "TxPublish::prepare() - Failed to prepare" << std::endl; @@ -42,10 +42,11 @@ void TxPublish::deliverTo(Queue::shared_ptr& queue){ queues.push_back(queue); } -TxPublish::Prepare::Prepare(Message::shared_ptr& _msg, const string* const _xid) : msg(_msg), xid(_xid){} +TxPublish::Prepare::Prepare(TransactionContext* _ctxt, Message::shared_ptr& _msg, const string* const _xid) + : ctxt(_ctxt), msg(_msg), xid(_xid){} void TxPublish::Prepare::operator()(Queue::shared_ptr& queue){ - queue->enqueue(msg, xid); + queue->enqueue(ctxt, msg, xid); } TxPublish::Commit::Commit(Message::shared_ptr& _msg) : msg(_msg){} diff --git a/cpp/src/qpid/broker/TxPublish.h b/cpp/src/qpid/broker/TxPublish.h index 5076e0f56f..f955ab4bc0 100644 --- a/cpp/src/qpid/broker/TxPublish.h +++ b/cpp/src/qpid/broker/TxPublish.h @@ -41,10 +41,11 @@ namespace qpid { */ class TxPublish : public TxOp, public Deliverable{ class Prepare{ + TransactionContext* ctxt; Message::shared_ptr& msg; const string* const xid; public: - Prepare(Message::shared_ptr& msg, const string* const xid); + Prepare(TransactionContext* ctxt, Message::shared_ptr& msg, const string* const xid); void operator()(Queue::shared_ptr& queue); }; @@ -60,7 +61,7 @@ namespace qpid { public: TxPublish(Message::shared_ptr msg); - virtual bool prepare() throw(); + virtual bool prepare(TransactionContext* ctxt) throw(); virtual void commit() throw(); virtual void rollback() throw(); |