diff options
Diffstat (limited to 'cpp/src/qpid/broker')
34 files changed, 165 insertions, 106 deletions
diff --git a/cpp/src/qpid/broker/Consumer.h b/cpp/src/qpid/broker/Consumer.h index 65c60182b8..00eb41a428 100644 --- a/cpp/src/qpid/broker/Consumer.h +++ b/cpp/src/qpid/broker/Consumer.h @@ -33,11 +33,11 @@ namespace qpid { struct QueuedMessage { - intrusive_ptr<Message> payload; + boost::intrusive_ptr<Message> payload; framing::SequenceNumber position; Queue* queue; - QueuedMessage(Queue* q, intrusive_ptr<Message> msg, framing::SequenceNumber sn) : + QueuedMessage(Queue* q, boost::intrusive_ptr<Message> msg, framing::SequenceNumber sn) : payload(msg), position(sn), queue(q) {} QueuedMessage(Queue* q) : queue(q) {} }; @@ -54,8 +54,8 @@ namespace qpid { bool preAcquires() const { return acquires; } virtual bool deliver(QueuedMessage& msg) = 0; virtual void notify() = 0; - virtual bool filter(intrusive_ptr<Message>) { return true; } - virtual bool accept(intrusive_ptr<Message>) { return true; } + virtual bool filter(boost::intrusive_ptr<Message>) { return true; } + virtual bool accept(boost::intrusive_ptr<Message>) { return true; } virtual ~Consumer(){} }; } diff --git a/cpp/src/qpid/broker/DeliverableMessage.cpp b/cpp/src/qpid/broker/DeliverableMessage.cpp index e79a3aa773..fd15acf464 100644 --- a/cpp/src/qpid/broker/DeliverableMessage.cpp +++ b/cpp/src/qpid/broker/DeliverableMessage.cpp @@ -22,7 +22,7 @@ using namespace qpid::broker; -DeliverableMessage::DeliverableMessage(intrusive_ptr<Message>& _msg) : msg(_msg) +DeliverableMessage::DeliverableMessage(boost::intrusive_ptr<Message>& _msg) : msg(_msg) { } diff --git a/cpp/src/qpid/broker/DeliverableMessage.h b/cpp/src/qpid/broker/DeliverableMessage.h index 440d1184eb..18e1ec5e29 100644 --- a/cpp/src/qpid/broker/DeliverableMessage.h +++ b/cpp/src/qpid/broker/DeliverableMessage.h @@ -25,12 +25,14 @@ #include "Queue.h" #include "Message.h" +#include <boost/intrusive_ptr.hpp> + namespace qpid { namespace broker { class DeliverableMessage : public Deliverable{ - intrusive_ptr<Message> msg; + boost::intrusive_ptr<Message> msg; public: - DeliverableMessage(intrusive_ptr<Message>& msg); + DeliverableMessage(boost::intrusive_ptr<Message>& msg); virtual void deliverTo(Queue::shared_ptr& queue); Message& getMessage(); uint64_t contentSize(); diff --git a/cpp/src/qpid/broker/DtxManager.cpp b/cpp/src/qpid/broker/DtxManager.cpp index 6070b17b24..cef3a4b02b 100644 --- a/cpp/src/qpid/broker/DtxManager.cpp +++ b/cpp/src/qpid/broker/DtxManager.cpp @@ -27,6 +27,7 @@ #include <boost/format.hpp> #include <iostream> +using boost::intrusive_ptr; using qpid::sys::Mutex; using namespace qpid::ptr_map; using namespace qpid::broker; diff --git a/cpp/src/qpid/broker/DtxWorkRecord.h b/cpp/src/qpid/broker/DtxWorkRecord.h index cd0e13e991..6677784c32 100644 --- a/cpp/src/qpid/broker/DtxWorkRecord.h +++ b/cpp/src/qpid/broker/DtxWorkRecord.h @@ -21,15 +21,19 @@ #ifndef _DtxWorkRecord_ #define _DtxWorkRecord_ -#include <algorithm> -#include <functional> -#include <vector> #include "DtxBuffer.h" #include "DtxTimeout.h" #include "TransactionalStore.h" + #include "qpid/framing/amqp_types.h" #include "qpid/sys/Mutex.h" +#include <algorithm> +#include <functional> +#include <vector> + +#include <boost/intrusive_ptr.hpp> + namespace qpid { namespace broker { @@ -48,7 +52,7 @@ class DtxWorkRecord bool rolledback; bool prepared; bool expired; - intrusive_ptr<DtxTimeout> timeout; + boost::intrusive_ptr<DtxTimeout> timeout; Work work; std::auto_ptr<TPCTransactionContext> txn; qpid::sys::Mutex lock; @@ -65,8 +69,8 @@ public: void add(DtxBuffer::shared_ptr ops); void recover(std::auto_ptr<TPCTransactionContext> txn, DtxBuffer::shared_ptr ops); void timedout(); - void setTimeout(intrusive_ptr<DtxTimeout> t) { timeout = t; } - intrusive_ptr<DtxTimeout> getTimeout() { return timeout; } + void setTimeout(boost::intrusive_ptr<DtxTimeout> t) { timeout = t; } + boost::intrusive_ptr<DtxTimeout> getTimeout() { return timeout; } }; } diff --git a/cpp/src/qpid/broker/IncomingExecutionContext.cpp b/cpp/src/qpid/broker/IncomingExecutionContext.cpp index f2c2965ecd..6c6cae6740 100644 --- a/cpp/src/qpid/broker/IncomingExecutionContext.cpp +++ b/cpp/src/qpid/broker/IncomingExecutionContext.cpp @@ -25,6 +25,7 @@ namespace qpid { namespace broker { +using boost::intrusive_ptr; using qpid::framing::AccumulatedAck; using qpid::framing::SequenceNumber; using qpid::framing::SequenceNumberSet; diff --git a/cpp/src/qpid/broker/IncomingExecutionContext.h b/cpp/src/qpid/broker/IncomingExecutionContext.h index 3056c2b4bc..7380e9ae64 100644 --- a/cpp/src/qpid/broker/IncomingExecutionContext.h +++ b/cpp/src/qpid/broker/IncomingExecutionContext.h @@ -21,16 +21,19 @@ #ifndef _IncomingExecutionContext_ #define _IncomingExecutionContext_ +#include "Message.h" + #include "qpid/framing/AccumulatedAck.h" #include "qpid/framing/SequenceNumber.h" -#include "Message.h" + +#include <boost/intrusive_ptr.hpp> namespace qpid { namespace broker { class IncomingExecutionContext { - typedef std::list<intrusive_ptr<Message> > Messages; + typedef std::list<boost::intrusive_ptr<Message> > Messages; framing::Window window; framing::AccumulatedAck completed; Messages incomplete; @@ -45,7 +48,7 @@ public: void sync(const framing::SequenceNumber& point); framing::SequenceNumber next(); void complete(const framing::SequenceNumber& command); - void track(intrusive_ptr<Message>); + void track(boost::intrusive_ptr<Message>); const framing::SequenceNumber& getMark(); framing::SequenceNumberSet getRange(); diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index a2c78daa7c..21908256a1 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -30,6 +30,7 @@ #include "qpid/framing/TypeFilter.h" #include "qpid/log/Statement.h" +using boost::intrusive_ptr; using namespace qpid::broker; using namespace qpid::framing; using std::string; diff --git a/cpp/src/qpid/broker/MessageBuilder.cpp b/cpp/src/qpid/broker/MessageBuilder.cpp index d00a474aee..eda71ed3da 100644 --- a/cpp/src/qpid/broker/MessageBuilder.cpp +++ b/cpp/src/qpid/broker/MessageBuilder.cpp @@ -25,6 +25,7 @@ #include "qpid/framing/AMQFrame.h" #include "qpid/framing/reply_exceptions.h" +using boost::intrusive_ptr; using namespace qpid::broker; using namespace qpid::framing; diff --git a/cpp/src/qpid/broker/MessageBuilder.h b/cpp/src/qpid/broker/MessageBuilder.h index 5bf6b1939a..395de024ab 100644 --- a/cpp/src/qpid/broker/MessageBuilder.h +++ b/cpp/src/qpid/broker/MessageBuilder.h @@ -25,6 +25,8 @@ #include "qpid/framing/SequenceNumber.h" #include "qpid/RefCounted.h" +#include <boost/intrusive_ptr.hpp> + namespace qpid { namespace broker { class Message; @@ -34,13 +36,13 @@ namespace qpid { public: MessageBuilder(MessageStore* const store, uint64_t stagingThreshold); void handle(framing::AMQFrame& frame); - intrusive_ptr<Message> getMessage() { return message; } + boost::intrusive_ptr<Message> getMessage() { return message; } void start(const framing::SequenceNumber& id); void end(); private: enum State {DORMANT, METHOD, HEADER, CONTENT}; State state; - intrusive_ptr<Message> message; + boost::intrusive_ptr<Message> message; MessageStore* const store; const uint64_t stagingThreshold; bool staging; diff --git a/cpp/src/qpid/broker/MessageStore.h b/cpp/src/qpid/broker/MessageStore.h index eaf9f1688f..72633b04a5 100644 --- a/cpp/src/qpid/broker/MessageStore.h +++ b/cpp/src/qpid/broker/MessageStore.h @@ -21,14 +21,17 @@ #ifndef _MessageStore_ #define _MessageStore_ -#include <boost/shared_ptr.hpp> -#include <qpid/Options.h> #include "PersistableExchange.h" #include "PersistableMessage.h" #include "PersistableQueue.h" #include "RecoveryManager.h" #include "TransactionalStore.h" +#include <qpid/Options.h> + +#include <boost/shared_ptr.hpp> +#include <boost/intrusive_ptr.hpp> + namespace qpid { namespace broker { @@ -96,7 +99,7 @@ public: * for that queue and avoid searching based on id. Set queue = 0 for * large message staging when the queue is not known. */ - virtual void stage(intrusive_ptr<PersistableMessage>& msg) = 0; + virtual void stage(boost::intrusive_ptr<PersistableMessage>& msg) = 0; /** * Destroys a previously staged message. This only needs @@ -109,7 +112,7 @@ public: /** * Appends content to a previously staged message */ - virtual void appendContent(intrusive_ptr<const PersistableMessage>& msg, + virtual void appendContent(boost::intrusive_ptr<const PersistableMessage>& msg, const std::string& data) = 0; /** @@ -121,7 +124,7 @@ public: * meta-data). */ virtual void loadContent(const qpid::broker::PersistableQueue& queue, - intrusive_ptr<const PersistableMessage>& msg, + boost::intrusive_ptr<const PersistableMessage>& msg, std::string& data, uint64_t offset, uint32_t length) = 0; /** @@ -138,7 +141,7 @@ public: * distributed transaction in which the operation takes * place or null for 'local' transactions */ - virtual void enqueue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg, + virtual void enqueue(TransactionContext* ctxt, boost::intrusive_ptr<PersistableMessage>& msg, const PersistableQueue& queue) = 0; /** @@ -155,7 +158,7 @@ public: * distributed transaction in which the operation takes * place or null for 'local' transactions */ - virtual void dequeue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg, + virtual void dequeue(TransactionContext* ctxt, boost::intrusive_ptr<PersistableMessage>& msg, const PersistableQueue& queue) = 0; /** diff --git a/cpp/src/qpid/broker/MessageStoreModule.cpp b/cpp/src/qpid/broker/MessageStoreModule.cpp index 094983e3fb..5d684ce6d7 100644 --- a/cpp/src/qpid/broker/MessageStoreModule.cpp +++ b/cpp/src/qpid/broker/MessageStoreModule.cpp @@ -25,6 +25,7 @@ // This transfer protects against the unloading of the store lib prior to the handling of the exception #define TRANSFER_EXCEPTION(fn) try { fn; } catch (std::exception& e) { throw Exception(e.what()); } +using boost::intrusive_ptr; using namespace qpid::broker; MessageStoreModule::MessageStoreModule(MessageStore* _store) : store(_store) {} diff --git a/cpp/src/qpid/broker/MessageStoreModule.h b/cpp/src/qpid/broker/MessageStoreModule.h index 160d681fab..abc0fbfd7a 100644 --- a/cpp/src/qpid/broker/MessageStoreModule.h +++ b/cpp/src/qpid/broker/MessageStoreModule.h @@ -24,8 +24,11 @@ #include "MessageStore.h" #include "Queue.h" #include "RecoveryManager.h" + #include "qpid/sys/Module.h" +#include <boost/intrusive_ptr.hpp> + namespace qpid { namespace broker { @@ -55,16 +58,16 @@ public: void unbind(const PersistableExchange& exchange, const PersistableQueue& queue, const std::string& key, const framing::FieldTable& args); void recover(RecoveryManager& queues); - void stage(intrusive_ptr<PersistableMessage>& msg); + void stage(boost::intrusive_ptr<PersistableMessage>& msg); void destroy(PersistableMessage& msg); - void appendContent(intrusive_ptr<const PersistableMessage>& msg, const std::string& data); + void appendContent(boost::intrusive_ptr<const PersistableMessage>& msg, const std::string& data); void loadContent(const qpid::broker::PersistableQueue& queue, - intrusive_ptr<const PersistableMessage>& msg, std::string& data, + boost::intrusive_ptr<const PersistableMessage>& msg, std::string& data, uint64_t offset, uint32_t length); - void enqueue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg, + void enqueue(TransactionContext* ctxt, boost::intrusive_ptr<PersistableMessage>& msg, const PersistableQueue& queue); - void dequeue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg, + void dequeue(TransactionContext* ctxt, boost::intrusive_ptr<PersistableMessage>& msg, const PersistableQueue& queue); u_int32_t outstandingQueueAIO(const PersistableQueue& queue); void flush(const qpid::broker::PersistableQueue& queue); diff --git a/cpp/src/qpid/broker/NullMessageStore.cpp b/cpp/src/qpid/broker/NullMessageStore.cpp index 5dbd3379fa..0dc7dbb82d 100644 --- a/cpp/src/qpid/broker/NullMessageStore.cpp +++ b/cpp/src/qpid/broker/NullMessageStore.cpp @@ -25,6 +25,8 @@ #include <iostream> +using boost::intrusive_ptr; + namespace qpid{ namespace broker{ diff --git a/cpp/src/qpid/broker/NullMessageStore.h b/cpp/src/qpid/broker/NullMessageStore.h index 9c5631b75d..7cde4db70b 100644 --- a/cpp/src/qpid/broker/NullMessageStore.h +++ b/cpp/src/qpid/broker/NullMessageStore.h @@ -25,6 +25,8 @@ #include "MessageStore.h" #include "Queue.h" +#include <boost/intrusive_ptr.hpp> + namespace qpid { namespace broker { @@ -56,16 +58,16 @@ public: virtual void unbind(const PersistableExchange& exchange, const PersistableQueue& queue, const std::string& key, const framing::FieldTable& args); virtual void recover(RecoveryManager& queues); - virtual void stage(intrusive_ptr<PersistableMessage>& msg); + virtual void stage(boost::intrusive_ptr<PersistableMessage>& msg); virtual void destroy(PersistableMessage& msg); - virtual void appendContent(intrusive_ptr<const PersistableMessage>& msg, + virtual void appendContent(boost::intrusive_ptr<const PersistableMessage>& msg, const std::string& data); virtual void loadContent(const qpid::broker::PersistableQueue& queue, - intrusive_ptr<const PersistableMessage>& msg, std::string& data, + boost::intrusive_ptr<const PersistableMessage>& msg, std::string& data, uint64_t offset, uint32_t length); - virtual void enqueue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg, + virtual void enqueue(TransactionContext* ctxt, boost::intrusive_ptr<PersistableMessage>& msg, const PersistableQueue& queue); - virtual void dequeue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg, + virtual void dequeue(TransactionContext* ctxt, boost::intrusive_ptr<PersistableMessage>& msg, const PersistableQueue& queue); virtual u_int32_t outstandingQueueAIO(const PersistableQueue& queue); virtual void flush(const qpid::broker::PersistableQueue& queue); diff --git a/cpp/src/qpid/broker/PreviewSessionManager.cpp b/cpp/src/qpid/broker/PreviewSessionManager.cpp index ec73082817..97a7c87e34 100644 --- a/cpp/src/qpid/broker/PreviewSessionManager.cpp +++ b/cpp/src/qpid/broker/PreviewSessionManager.cpp @@ -28,6 +28,7 @@ #include <boost/bind.hpp> #include <boost/range.hpp> +#include <boost/intrusive_ptr.hpp> #include <algorithm> #include <functional> @@ -105,7 +106,7 @@ void PreviewSessionManager::eraseExpired() { } } -void PreviewSessionManager::add(const intrusive_ptr<Observer>& o) { +void PreviewSessionManager::add(const boost::intrusive_ptr<Observer>& o) { observers.push_back(o); } diff --git a/cpp/src/qpid/broker/PreviewSessionManager.h b/cpp/src/qpid/broker/PreviewSessionManager.h index 65ca49ec89..9bc6bc5bbc 100644 --- a/cpp/src/qpid/broker/PreviewSessionManager.h +++ b/cpp/src/qpid/broker/PreviewSessionManager.h @@ -27,13 +27,14 @@ #include <qpid/sys/Mutex.h> #include <qpid/RefCounted.h> -#include <boost/noncopyable.hpp> -#include <boost/ptr_container/ptr_vector.hpp> - #include <set> #include <vector> #include <memory> +#include <boost/noncopyable.hpp> +#include <boost/ptr_container/ptr_vector.hpp> +#include <boost/intrusive_ptr.hpp> + namespace qpid { namespace broker { @@ -70,12 +71,12 @@ class PreviewSessionManager : private boost::noncopyable { std::auto_ptr<PreviewSessionState> resume(const framing::Uuid&); /** Add an Observer. */ - void add(const intrusive_ptr<Observer>&); + void add(const boost::intrusive_ptr<Observer>&); private: typedef boost::ptr_vector<PreviewSessionState> Suspended; typedef std::set<framing::Uuid> Active; - typedef std::vector<intrusive_ptr<Observer> > Observers; + typedef std::vector<boost::intrusive_ptr<Observer> > Observers; void erase(const framing::Uuid&); void eraseExpired(); diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index c4094a117b..92e87cc9d8 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -19,21 +19,25 @@ * */ -#include "qpid/log/Statement.h" -#include "qpid/framing/reply_exceptions.h" #include "Broker.h" #include "Queue.h" #include "Exchange.h" #include "DeliverableMessage.h" #include "MessageStore.h" +#include "QueueRegistry.h" + +#include "qpid/log/Statement.h" +#include "qpid/framing/reply_exceptions.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Time.h" + #include <iostream> -#include <boost/bind.hpp> -#include "QueueRegistry.h" #include <algorithm> #include <functional> +#include <boost/bind.hpp> +#include <boost/intrusive_ptr.hpp> + using namespace qpid::broker; using namespace qpid::sys; using namespace qpid::framing; @@ -87,7 +91,7 @@ void Queue::notifyDurableIOComplete() } -void Queue::deliver(intrusive_ptr<Message>& msg){ +void Queue::deliver(boost::intrusive_ptr<Message>& msg){ if (msg->isImmediate() && getConsumerCount() == 0) { if (alternateExchange) { DeliverableMessage deliverable(msg); @@ -124,7 +128,7 @@ void Queue::deliver(intrusive_ptr<Message>& msg){ } -void Queue::recover(intrusive_ptr<Message>& msg){ +void Queue::recover(boost::intrusive_ptr<Message>& msg){ push(msg); msg->enqueueComplete(); // mark the message as enqueued if (mgmtObject.get() != 0) { @@ -144,7 +148,7 @@ void Queue::recover(intrusive_ptr<Message>& msg){ } } -void Queue::process(intrusive_ptr<Message>& msg){ +void Queue::process(boost::intrusive_ptr<Message>& msg){ push(msg); if (mgmtObject.get() != 0) { Mutex::ScopedLock alock(mgmtObject->accessorLock); @@ -393,7 +397,7 @@ void Queue::pop(){ messages.pop_front(); } -void Queue::push(intrusive_ptr<Message>& msg){ +void Queue::push(boost::intrusive_ptr<Message>& msg){ Mutex::ScopedLock locker(messageLock); messages.push_back(QueuedMessage(this, msg, ++sequence)); if (policy.get()) { @@ -434,11 +438,11 @@ bool Queue::canAutoDelete() const{ } // return true if store exists, -bool Queue::enqueue(TransactionContext* ctxt, intrusive_ptr<Message> msg) +bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg) { if (msg->isPersistent() && store) { msg->enqueueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue - intrusive_ptr<PersistableMessage> pmsg = static_pointer_cast<PersistableMessage>(msg); + boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg); store->enqueue(ctxt, pmsg, *this); return true; } @@ -447,11 +451,11 @@ bool Queue::enqueue(TransactionContext* ctxt, intrusive_ptr<Message> msg) } // return true if store exists, -bool Queue::dequeue(TransactionContext* ctxt, intrusive_ptr<Message> msg) +bool Queue::dequeue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg) { if (msg->isPersistent() && store) { msg->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue - intrusive_ptr<PersistableMessage> pmsg = static_pointer_cast<PersistableMessage>(msg); + boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg); store->dequeue(ctxt, pmsg, *this); return true; } diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index bd6f1fe2c7..deaa0d58a6 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -21,24 +21,28 @@ * under the License. * */ -#include <vector> -#include <memory> -#include <deque> -#include <set> -#include <boost/shared_ptr.hpp> -#include <boost/enable_shared_from_this.hpp> -#include "qpid/framing/amqp_types.h" #include "OwnershipToken.h" #include "Consumer.h" #include "Message.h" -#include "qpid/framing/FieldTable.h" -#include "qpid/sys/Serializer.h" -#include "qpid/sys/Monitor.h" #include "PersistableQueue.h" #include "QueuePolicy.h" #include "QueueBindings.h" + +#include "qpid/framing/FieldTable.h" +#include "qpid/sys/Serializer.h" +#include "qpid/sys/Monitor.h" #include "qpid/management/Manageable.h" #include "qpid/management/Queue.h" +#include "qpid/framing/amqp_types.h" + +#include <vector> +#include <memory> +#include <deque> +#include <set> + +#include <boost/shared_ptr.hpp> +#include <boost/enable_shared_from_this.hpp> +#include <boost/intrusive_ptr.hpp> namespace qpid { namespace broker { @@ -80,7 +84,7 @@ namespace qpid { management::Queue::shared_ptr mgmtObject; void pop(); - void push(intrusive_ptr<Message>& msg); + void push(boost::intrusive_ptr<Message>& msg); void setPolicy(std::auto_ptr<QueuePolicy> policy); bool seek(QueuedMessage& msg, Consumer& position); bool getNextMessage(QueuedMessage& msg, Consumer& c); @@ -118,12 +122,12 @@ namespace qpid { * Delivers a message to the queue. Will record it as * enqueued if persistent then process it. */ - void deliver(intrusive_ptr<Message>& msg); + void deliver(boost::intrusive_ptr<Message>& msg); /** * Dispatches the messages immediately to a consumer if * one is available or stores it for later if not. */ - void process(intrusive_ptr<Message>& msg); + void process(boost::intrusive_ptr<Message>& msg); /** * Returns a message to the in-memory queue (due to lack * of acknowledegement from a receiver). If a consumer is @@ -134,7 +138,7 @@ namespace qpid { /** * Used during recovery to add stored messages back to the queue */ - void recover(intrusive_ptr<Message>& msg); + void recover(boost::intrusive_ptr<Message>& msg); void consume(Consumer& c, bool exclusive = false); void cancel(Consumer& c); @@ -153,11 +157,11 @@ namespace qpid { inline bool isAutoDelete() const { return autodelete; } bool canAutoDelete() const; - bool enqueue(TransactionContext* ctxt, intrusive_ptr<Message> msg); + bool enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg); /** * dequeue from store (only done once messages is acknowledged) */ - bool dequeue(TransactionContext* ctxt, intrusive_ptr<Message> msg); + bool dequeue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg); /** * dequeues from memory only */ diff --git a/cpp/src/qpid/broker/RecoveredDequeue.cpp b/cpp/src/qpid/broker/RecoveredDequeue.cpp index 9b5e23884e..e2d70964fb 100644 --- a/cpp/src/qpid/broker/RecoveredDequeue.cpp +++ b/cpp/src/qpid/broker/RecoveredDequeue.cpp @@ -20,6 +20,7 @@ */ #include "RecoveredDequeue.h" +using boost::intrusive_ptr; using namespace qpid::broker; RecoveredDequeue::RecoveredDequeue(Queue::shared_ptr _queue, intrusive_ptr<Message> _msg) : queue(_queue), msg(_msg) {} diff --git a/cpp/src/qpid/broker/RecoveredDequeue.h b/cpp/src/qpid/broker/RecoveredDequeue.h index 82668adb67..276e1f4c5c 100644 --- a/cpp/src/qpid/broker/RecoveredDequeue.h +++ b/cpp/src/qpid/broker/RecoveredDequeue.h @@ -21,23 +21,26 @@ #ifndef _RecoveredDequeue_ #define _RecoveredDequeue_ -#include <algorithm> -#include <functional> -#include <list> #include "Deliverable.h" #include "Message.h" #include "MessageStore.h" #include "Queue.h" #include "TxOp.h" +#include <boost/intrusive_ptr.hpp> + +#include <algorithm> +#include <functional> +#include <list> + namespace qpid { namespace broker { class RecoveredDequeue : public TxOp{ Queue::shared_ptr queue; - intrusive_ptr<Message> msg; + boost::intrusive_ptr<Message> msg; public: - RecoveredDequeue(Queue::shared_ptr queue, intrusive_ptr<Message> msg); + RecoveredDequeue(Queue::shared_ptr queue, boost::intrusive_ptr<Message> msg); virtual bool prepare(TransactionContext* ctxt) throw(); virtual void commit() throw(); virtual void rollback() throw(); diff --git a/cpp/src/qpid/broker/RecoveredEnqueue.cpp b/cpp/src/qpid/broker/RecoveredEnqueue.cpp index 5eeab7a435..1984a5d4a8 100644 --- a/cpp/src/qpid/broker/RecoveredEnqueue.cpp +++ b/cpp/src/qpid/broker/RecoveredEnqueue.cpp @@ -20,6 +20,7 @@ */ #include "RecoveredEnqueue.h" +using boost::intrusive_ptr; using namespace qpid::broker; RecoveredEnqueue::RecoveredEnqueue(Queue::shared_ptr _queue, intrusive_ptr<Message> _msg) : queue(_queue), msg(_msg) {} diff --git a/cpp/src/qpid/broker/RecoveredEnqueue.h b/cpp/src/qpid/broker/RecoveredEnqueue.h index 25b55e3e0c..6525179769 100644 --- a/cpp/src/qpid/broker/RecoveredEnqueue.h +++ b/cpp/src/qpid/broker/RecoveredEnqueue.h @@ -21,23 +21,26 @@ #ifndef _RecoveredEnqueue_ #define _RecoveredEnqueue_ -#include <algorithm> -#include <functional> -#include <list> #include "Deliverable.h" #include "Message.h" #include "MessageStore.h" #include "Queue.h" #include "TxOp.h" +#include <boost/intrusive_ptr.hpp> + +#include <algorithm> +#include <functional> +#include <list> + namespace qpid { namespace broker { class RecoveredEnqueue : public TxOp{ Queue::shared_ptr queue; - intrusive_ptr<Message> msg; + boost::intrusive_ptr<Message> msg; public: - RecoveredEnqueue(Queue::shared_ptr queue, intrusive_ptr<Message> msg); + RecoveredEnqueue(Queue::shared_ptr queue, boost::intrusive_ptr<Message> msg); virtual bool prepare(TransactionContext* ctxt) throw(); virtual void commit() throw(); virtual void rollback() throw(); diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp index 65583f1964..97226ebc22 100644 --- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp +++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp @@ -28,7 +28,7 @@ using namespace qpid; using namespace qpid::broker; using boost::dynamic_pointer_cast; - +using boost::intrusive_ptr; static const uint8_t BASIC = 1; static const uint8_t MESSAGE = 2; diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp index fdde7ec18c..eb45ff1492 100644 --- a/cpp/src/qpid/broker/SemanticHandler.cpp +++ b/cpp/src/qpid/broker/SemanticHandler.cpp @@ -32,6 +32,7 @@ #include <boost/format.hpp> #include <boost/bind.hpp> +using boost::intrusive_ptr; using namespace qpid::broker; using namespace qpid::framing; using namespace qpid::sys; diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index f372c60044..8c29a4a4ec 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -53,6 +53,7 @@ namespace broker { using std::mem_fun_ref; using std::bind2nd; +using boost::intrusive_ptr; using namespace qpid::broker; using namespace qpid::framing; using namespace qpid::sys; diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h index 3d31d5a5a2..f9f343de38 100644 --- a/cpp/src/qpid/broker/SemanticState.h +++ b/cpp/src/qpid/broker/SemanticState.h @@ -32,6 +32,7 @@ #include "NameGenerator.h" #include "Prefetch.h" #include "TxBuffer.h" + #include "qpid/framing/FrameHandler.h" #include "qpid/framing/AccumulatedAck.h" #include "qpid/framing/Uuid.h" @@ -42,6 +43,8 @@ #include <map> #include <vector> +#include <boost/intrusive_ptr.hpp> + namespace qpid { namespace broker { @@ -69,8 +72,8 @@ class SemanticState : public framing::FrameHandler::Chains, uint32_t msgCredit; uint32_t byteCredit; - bool checkCredit(intrusive_ptr<Message>& msg); - void allocateCredit(intrusive_ptr<Message>& msg); + bool checkCredit(boost::intrusive_ptr<Message>& msg); + void allocateCredit(boost::intrusive_ptr<Message>& msg); public: ConsumerImpl(SemanticState* parent, DeliveryToken::shared_ptr token, @@ -78,8 +81,8 @@ class SemanticState : public framing::FrameHandler::Chains, bool ack, bool nolocal, bool acquire); ~ConsumerImpl(); bool deliver(QueuedMessage& msg); - bool filter(intrusive_ptr<Message> msg); - bool accept(intrusive_ptr<Message> msg); + bool filter(boost::intrusive_ptr<Message> msg); + bool accept(boost::intrusive_ptr<Message> msg); void notify(); void setWindowMode(); @@ -116,9 +119,9 @@ class SemanticState : public framing::FrameHandler::Chains, boost::shared_ptr<Exchange> cacheExchange; sys::AggregateOutput outputTasks; - void route(intrusive_ptr<Message> msg, Deliverable& strategy); + void route(boost::intrusive_ptr<Message> msg, Deliverable& strategy); void record(const DeliveryRecord& delivery); - bool checkPrefetch(intrusive_ptr<Message>& msg); + bool checkPrefetch(boost::intrusive_ptr<Message>& msg); void checkDtxTimeout(); ConsumerImpl& find(const std::string& destination); void ack(DeliveryId deliveryTag, DeliveryId endTag, bool cumulative); @@ -177,7 +180,7 @@ class SemanticState : public framing::FrameHandler::Chains, void acquire(DeliveryId first, DeliveryId last, DeliveryIds& acquired); void release(DeliveryId first, DeliveryId last); void reject(DeliveryId first, DeliveryId last); - void handle(intrusive_ptr<Message> msg); + void handle(boost::intrusive_ptr<Message> msg); bool doOutput() { return outputTasks.doOutput(); } //preview only (completed == ack): diff --git a/cpp/src/qpid/broker/SessionManager.cpp b/cpp/src/qpid/broker/SessionManager.cpp index 571d3365db..6e235e32c3 100644 --- a/cpp/src/qpid/broker/SessionManager.cpp +++ b/cpp/src/qpid/broker/SessionManager.cpp @@ -36,6 +36,7 @@ namespace qpid { namespace broker { +using boost::intrusive_ptr; using namespace sys; using namespace framing; diff --git a/cpp/src/qpid/broker/SessionManager.h b/cpp/src/qpid/broker/SessionManager.h index 7e8bd18f57..cc2190c2d1 100644 --- a/cpp/src/qpid/broker/SessionManager.h +++ b/cpp/src/qpid/broker/SessionManager.h @@ -27,13 +27,14 @@ #include <qpid/sys/Mutex.h> #include <qpid/RefCounted.h> -#include <boost/noncopyable.hpp> -#include <boost/ptr_container/ptr_vector.hpp> - #include <set> #include <vector> #include <memory> +#include <boost/noncopyable.hpp> +#include <boost/ptr_container/ptr_vector.hpp> +#include <boost/intrusive_ptr.hpp> + namespace qpid { namespace broker { @@ -70,12 +71,12 @@ class SessionManager : private boost::noncopyable { std::auto_ptr<SessionState> resume(const framing::Uuid&); /** Add an Observer. */ - void add(const intrusive_ptr<Observer>&); + void add(const boost::intrusive_ptr<Observer>&); private: typedef boost::ptr_vector<SessionState> Suspended; typedef std::set<framing::Uuid> Active; - typedef std::vector<intrusive_ptr<Observer> > Observers; + typedef std::vector<boost::intrusive_ptr<Observer> > Observers; void erase(const framing::Uuid&); void eraseExpired(); diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index e2f18bc927..105f946dcb 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -35,6 +35,7 @@ namespace broker { using namespace framing; using sys::Mutex; +using boost::intrusive_ptr; using qpid::management::ManagementAgent; using qpid::management::ManagementObject; using qpid::management::Manageable; diff --git a/cpp/src/qpid/broker/Timer.cpp b/cpp/src/qpid/broker/Timer.cpp index 173f350cde..33261dad6e 100644 --- a/cpp/src/qpid/broker/Timer.cpp +++ b/cpp/src/qpid/broker/Timer.cpp @@ -21,6 +21,7 @@ #include "Timer.h" #include <iostream> +using boost::intrusive_ptr; using qpid::sys::AbsTime; using qpid::sys::Duration; using qpid::sys::Monitor; diff --git a/cpp/src/qpid/broker/Timer.h b/cpp/src/qpid/broker/Timer.h index dcb02a5e0a..f702f0f32d 100644 --- a/cpp/src/qpid/broker/Timer.h +++ b/cpp/src/qpid/broker/Timer.h @@ -29,6 +29,8 @@ #include <memory> #include <queue> +#include <boost/intrusive_ptr.hpp> + namespace qpid { namespace broker { @@ -45,15 +47,15 @@ struct TimerTask : public RefCounted { }; struct Later { - bool operator()(const intrusive_ptr<TimerTask>& a, - const intrusive_ptr<TimerTask>& b) const; + bool operator()(const boost::intrusive_ptr<TimerTask>& a, + const boost::intrusive_ptr<TimerTask>& b) const; }; class Timer : private qpid::sys::Runnable { protected: qpid::sys::Monitor monitor; - std::priority_queue<intrusive_ptr<TimerTask>, - std::vector<intrusive_ptr<TimerTask> >, + std::priority_queue<boost::intrusive_ptr<TimerTask>, + std::vector<boost::intrusive_ptr<TimerTask> >, Later> tasks; qpid::sys::Thread runner; bool active; @@ -64,7 +66,7 @@ class Timer : private qpid::sys::Runnable { Timer(); virtual ~Timer(); - void add(intrusive_ptr<TimerTask> task); + void add(boost::intrusive_ptr<TimerTask> task); void start(); void stop(); diff --git a/cpp/src/qpid/broker/TxPublish.cpp b/cpp/src/qpid/broker/TxPublish.cpp index 0ad2eac080..46ec308bd2 100644 --- a/cpp/src/qpid/broker/TxPublish.cpp +++ b/cpp/src/qpid/broker/TxPublish.cpp @@ -21,6 +21,7 @@ #include "qpid/log/Statement.h" #include "TxPublish.h" +using boost::intrusive_ptr; using namespace qpid::broker; TxPublish::TxPublish(intrusive_ptr<Message> _msg) : msg(_msg) {} diff --git a/cpp/src/qpid/broker/TxPublish.h b/cpp/src/qpid/broker/TxPublish.h index 085dd28316..680e0c7546 100644 --- a/cpp/src/qpid/broker/TxPublish.h +++ b/cpp/src/qpid/broker/TxPublish.h @@ -21,15 +21,18 @@ #ifndef _TxPublish_ #define _TxPublish_ -#include <algorithm> -#include <functional> -#include <list> #include "Queue.h" #include "Deliverable.h" #include "Message.h" #include "MessageStore.h" #include "TxOp.h" +#include <algorithm> +#include <functional> +#include <list> + +#include <boost/intrusive_ptr.hpp> + namespace qpid { namespace broker { /** @@ -45,24 +48,24 @@ namespace qpid { class TxPublish : public TxOp, public Deliverable{ class Prepare{ TransactionContext* ctxt; - intrusive_ptr<Message>& msg; + boost::intrusive_ptr<Message>& msg; public: - Prepare(TransactionContext* ctxt, intrusive_ptr<Message>& msg); + Prepare(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg); void operator()(Queue::shared_ptr& queue); }; class Commit{ - intrusive_ptr<Message>& msg; + boost::intrusive_ptr<Message>& msg; public: - Commit(intrusive_ptr<Message>& msg); + Commit(boost::intrusive_ptr<Message>& msg); void operator()(Queue::shared_ptr& queue); }; - intrusive_ptr<Message> msg; + boost::intrusive_ptr<Message> msg; std::list<Queue::shared_ptr> queues; public: - TxPublish(intrusive_ptr<Message> msg); + TxPublish(boost::intrusive_ptr<Message> msg); virtual bool prepare(TransactionContext* ctxt) throw(); virtual void commit() throw(); virtual void rollback() throw(); |