summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.h
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Queue.h')
-rw-r--r--cpp/src/qpid/broker/Queue.h47
1 files changed, 32 insertions, 15 deletions
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index 1294f813aa..bb713eba2b 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -38,7 +38,6 @@
#include "qpid/framing/SequenceNumber.h"
#include "qpid/sys/AtomicValue.h"
#include "qpid/sys/Monitor.h"
-#include "qpid/sys/Timer.h"
#include "qpid/management/Manageable.h"
#include "qmf/org/apache/qpid/broker/Queue.h"
#include "qmf/org/apache/qpid/broker/Broker.h"
@@ -56,6 +55,9 @@
#include <algorithm>
namespace qpid {
+namespace sys {
+class TimerTask;
+}
namespace broker {
class Broker;
class Exchange;
@@ -83,6 +85,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
{
Queue& parent;
uint count;
+ qpid::sys::Monitor usageLock;
UsageBarrier(Queue&);
bool acquire();
@@ -142,18 +145,18 @@ class Queue : public boost::enable_shared_from_this<Queue>,
* o consumerCount (TBD: move under separate lock)
* o Queue::UsageBarrier (TBD: move under separate lock)
*/
- mutable qpid::sys::Monitor messageLock;
+ mutable qpid::sys::Mutex messageLock;
mutable qpid::sys::Mutex ownershipLock;
mutable uint64_t persistenceId;
- const QueueSettings settings;
+ QueueSettings settings;
qpid::framing::FieldTable encodableSettings;
QueueDepth current;
QueueBindings bindings;
std::string alternateExchangeName;
boost::shared_ptr<Exchange> alternateExchange;
framing::SequenceNumber sequence;
- qmf::org::apache::qpid::broker::Queue* mgmtObject;
- qmf::org::apache::qpid::broker::Broker* brokerMgmtObject;
+ qmf::org::apache::qpid::broker::Queue::shared_ptr mgmtObject;
+ qmf::org::apache::qpid::broker::Broker::shared_ptr brokerMgmtObject;
sys::AtomicValue<uint32_t> dequeueSincePurge; // Count dequeues since last purge.
int eventMode;
Observers observers;
@@ -189,7 +192,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
int getEventMode();
void dequeueFromStore(boost::intrusive_ptr<PersistableMessage>);
void abandoned(const Message& message);
- void checkNotDeleted(const Consumer::shared_ptr&);
+ bool checkNotDeleted(const Consumer::shared_ptr&);
void notifyDeleted();
uint32_t remove(uint32_t maxCount, MessagePredicate, MessageFunctor, SubscriptionType);
virtual bool checkDepth(const QueueDepth& increment, const Message&);
@@ -338,7 +341,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
* exclusive owner
*/
static Queue::shared_ptr restore(QueueRegistry& queues, framing::Buffer& buffer);
- static void tryAutoDelete(Broker& broker, Queue::shared_ptr, const std::string& connectionId, const std::string& userId);
+ QPID_BROKER_EXTERN static void tryAutoDelete(Broker& broker, Queue::shared_ptr, const std::string& connectionId, const std::string& userId);
virtual void setExternalQueueStore(ExternalQueueStore* inst);
@@ -352,7 +355,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
QPID_BROKER_EXTERN void countLoadedFromDisk(uint64_t size) const;
// Manageable entry points
- QPID_BROKER_EXTERN management::ManagementObject* GetManagementObject (void) const;
+ QPID_BROKER_EXTERN management::ManagementObject::shared_ptr GetManagementObject(void) const;
management::Manageable::status_t
QPID_BROKER_EXTERN ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
QPID_BROKER_EXTERN void query(::qpid::types::Variant::Map&) const;
@@ -382,15 +385,30 @@ class Queue : public boost::enable_shared_from_this<Queue>,
*
* The _caller_ must ensure that any messages after pos have been dequeued.
*
- * Used by HA/cluster code for queue replication.
+ * Used by HA code for queue replication.
*/
QPID_BROKER_EXTERN void setPosition(framing::SequenceNumber pos);
/**
*@return sequence number for the back of the queue. The next message pushed
- * will be at getPosition+1
+ * will be at getPosition()+1
*/
QPID_BROKER_EXTERN framing::SequenceNumber getPosition();
+
+ /**
+ * Set front and back.
+ * If the queue is empty then front = back+1 (the first message to
+ * consume will be the next message pushed.)
+ *
+ *@param front = Position of first message to consume.
+ *@param back = getPosition(), next message pushed will be getPosition()+1
+ *@param type Subscription type to use to determine the front.
+ */
+ QPID_BROKER_EXTERN void getRange(
+ framing::SequenceNumber& front, framing::SequenceNumber& back,
+ SubscriptionType type=CONSUMER
+ );
+
QPID_BROKER_EXTERN void addObserver(boost::shared_ptr<QueueObserver>);
QPID_BROKER_EXTERN void removeObserver(boost::shared_ptr<QueueObserver>);
QPID_BROKER_EXTERN void insertSequenceNumbers(const std::string& key);
@@ -399,11 +417,6 @@ class Queue : public boost::enable_shared_from_this<Queue>,
*/
QPID_BROKER_EXTERN void recoveryComplete(ExchangeRegistry& exchanges);
- // For cluster update
- QPID_BROKER_EXTERN QueueListeners& getListeners();
- QPID_BROKER_EXTERN Messages& getMessages();
- QPID_BROKER_EXTERN const Messages& getMessages() const;
-
/**
* Reserve space in policy for an enqueued message that
* has been recovered in the prepared state (dtx only)
@@ -420,6 +433,10 @@ class Queue : public boost::enable_shared_from_this<Queue>,
uint32_t getDequeueSincePurge() { return dequeueSincePurge.get(); }
QPID_BROKER_EXTERN void setDequeueSincePurge(uint32_t value);
+
+ /** Add an argument to be included in management messages about this queue. */
+ QPID_BROKER_EXTERN void addArgument(const std::string& key, const types::Variant& value);
+
friend class QueueFactory;
};
}