summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-08-17 13:07:26 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-08-17 13:07:26 +0000
commit6833c389115a220fa0a05453bd19d54e5bd8e891 (patch)
treee2ead73531a141fea5d9c8d2f2b39797fcdfadef
parent7762a152b3dabd859089abb388268101b2c72ea0 (diff)
downloadqpid-python-6833c389115a220fa0a05453bd19d54e5bd8e891.tar.gz
QPID-3346: checkpoint - incorporate changes based on review feedback
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3346@1158686 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/DeliveryRecord.cpp5
-rw-r--r--qpid/cpp/src/qpid/broker/LegacyLVQ.cpp5
-rw-r--r--qpid/cpp/src/qpid/broker/LegacyLVQ.h1
-rw-r--r--qpid/cpp/src/qpid/broker/MessageDeque.cpp5
-rw-r--r--qpid/cpp/src/qpid/broker/MessageDeque.h1
-rw-r--r--qpid/cpp/src/qpid/broker/MessageMap.cpp5
-rw-r--r--qpid/cpp/src/qpid/broker/MessageMap.h1
-rw-r--r--qpid/cpp/src/qpid/broker/Messages.h9
-rw-r--r--qpid/cpp/src/qpid/broker/PriorityQueue.cpp19
-rw-r--r--qpid/cpp/src/qpid/broker/PriorityQueue.h1
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp215
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h8
12 files changed, 122 insertions, 153 deletions
diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
index a82a54f077..17ecef60d9 100644
--- a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
+++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
@@ -142,7 +142,7 @@ void DeliveryRecord::reject()
//just drop it
QPID_LOG(info, "Dropping rejected message from " << queue->getName());
}
- queue->dequeue(0, msg);
+ dequeue();
setEnded();
}
}
@@ -152,8 +152,7 @@ uint32_t DeliveryRecord::getCredit() const
return credit;
}
-void DeliveryRecord::acquire(DeliveryIds& results)
-{
+void DeliveryRecord::acquire(DeliveryIds& results) {
if (queue->acquire(msg, tag)) {
acquired = true;
results.push_back(id);
diff --git a/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp b/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp
index bc009f14d1..a811a86492 100644
--- a/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp
+++ b/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp
@@ -44,11 +44,6 @@ bool LegacyLVQ::remove(const framing::SequenceNumber& position, QueuedMessage& m
}
}
-bool LegacyLVQ::next(const QueuedMessage& message, QueuedMessage& next)
-{
- return this->next(message.position, next);
-}
-
bool LegacyLVQ::next(const framing::SequenceNumber& position, QueuedMessage& message)
{
if (MessageMap::next(position, message)) {
diff --git a/qpid/cpp/src/qpid/broker/LegacyLVQ.h b/qpid/cpp/src/qpid/broker/LegacyLVQ.h
index 33d4117845..dd0fd7aaec 100644
--- a/qpid/cpp/src/qpid/broker/LegacyLVQ.h
+++ b/qpid/cpp/src/qpid/broker/LegacyLVQ.h
@@ -42,7 +42,6 @@ class LegacyLVQ : public MessageMap
LegacyLVQ(const std::string& key, bool noBrowse = false, Broker* broker = 0);
bool remove(const framing::SequenceNumber&, QueuedMessage&);
bool next(const framing::SequenceNumber&, QueuedMessage&);
- bool next(const QueuedMessage&, QueuedMessage&);
bool push(const QueuedMessage& added, QueuedMessage& removed);
void removeIf(Predicate);
void setNoBrowse(bool);
diff --git a/qpid/cpp/src/qpid/broker/MessageDeque.cpp b/qpid/cpp/src/qpid/broker/MessageDeque.cpp
index b4fdfb3d6a..24b8f6f895 100644
--- a/qpid/cpp/src/qpid/broker/MessageDeque.cpp
+++ b/qpid/cpp/src/qpid/broker/MessageDeque.cpp
@@ -74,11 +74,6 @@ bool MessageDeque::find(const framing::SequenceNumber& position, QueuedMessage&
return find(position, message, false);
}
-bool MessageDeque::next(const QueuedMessage& message, QueuedMessage& next)
-{
- return this->next(message.position, next);
-}
-
bool MessageDeque::next(const framing::SequenceNumber& position, QueuedMessage& message)
{
if (messages.empty()) {
diff --git a/qpid/cpp/src/qpid/broker/MessageDeque.h b/qpid/cpp/src/qpid/broker/MessageDeque.h
index 0726e1dee6..0e1aef2986 100644
--- a/qpid/cpp/src/qpid/broker/MessageDeque.h
+++ b/qpid/cpp/src/qpid/broker/MessageDeque.h
@@ -41,7 +41,6 @@ class MessageDeque : public Messages
bool remove(const framing::SequenceNumber&, QueuedMessage&);
bool find(const framing::SequenceNumber&, QueuedMessage&);
bool next(const framing::SequenceNumber&, QueuedMessage&);
- bool next(const QueuedMessage&, QueuedMessage&);
QueuedMessage& front();
void pop();
diff --git a/qpid/cpp/src/qpid/broker/MessageMap.cpp b/qpid/cpp/src/qpid/broker/MessageMap.cpp
index 504b2e033c..39e23df533 100644
--- a/qpid/cpp/src/qpid/broker/MessageMap.cpp
+++ b/qpid/cpp/src/qpid/broker/MessageMap.cpp
@@ -77,11 +77,6 @@ bool MessageMap::find(const framing::SequenceNumber& position, QueuedMessage& me
}
}
-bool MessageMap::next(const QueuedMessage& message, QueuedMessage& next)
-{
- return this->next(message.position, next);
-}
-
bool MessageMap::next(const framing::SequenceNumber& position, QueuedMessage& message)
{
if (!messages.empty() && position < front().position) {
diff --git a/qpid/cpp/src/qpid/broker/MessageMap.h b/qpid/cpp/src/qpid/broker/MessageMap.h
index 9cfd9ea9af..1128a1d54a 100644
--- a/qpid/cpp/src/qpid/broker/MessageMap.h
+++ b/qpid/cpp/src/qpid/broker/MessageMap.h
@@ -47,7 +47,6 @@ class MessageMap : public Messages
virtual bool remove(const framing::SequenceNumber&, QueuedMessage&);
bool find(const framing::SequenceNumber&, QueuedMessage&);
virtual bool next(const framing::SequenceNumber&, QueuedMessage&);
- virtual bool next(const QueuedMessage&, QueuedMessage&);
QueuedMessage& front();
void pop();
diff --git a/qpid/cpp/src/qpid/broker/Messages.h b/qpid/cpp/src/qpid/broker/Messages.h
index 4b3a6d764f..448f17432a 100644
--- a/qpid/cpp/src/qpid/broker/Messages.h
+++ b/qpid/cpp/src/qpid/broker/Messages.h
@@ -76,15 +76,6 @@ class Messages
* @return true if there is another message, false otherwise.
*/
virtual bool next(const framing::SequenceNumber&, QueuedMessage&) = 0;
-
- /**
- * Return the next message based on the supplied queued message.
- * The next messages is passed back via the second parameter.
- * @todo replace with queue iterator
- * @return true if there is another message, false otherwise.
- */
- virtual bool next(const QueuedMessage&, QueuedMessage&) = 0;
-
/**
* Note: Caller is responsible for ensuring that there is a front
* (e.g. empty() returns false)
diff --git a/qpid/cpp/src/qpid/broker/PriorityQueue.cpp b/qpid/cpp/src/qpid/broker/PriorityQueue.cpp
index 3305bb4f79..e07e73d323 100644
--- a/qpid/cpp/src/qpid/broker/PriorityQueue.cpp
+++ b/qpid/cpp/src/qpid/broker/PriorityQueue.cpp
@@ -88,25 +88,6 @@ bool PriorityQueue::find(const framing::SequenceNumber& position, QueuedMessage&
return find(position, message, false);
}
-bool PriorityQueue::next(const QueuedMessage& message, QueuedMessage& next)
-{
- uint p = getPriorityLevel(message);
- QueuedMessage match;
- match.position = message.position+1;
- Deque::iterator m = lower_bound(messages[p].begin(), messages[p].end(), match);
- if (m != messages[p].end()) {
- next = *m;
- return true;
- }
- while (p-- > 0) {
- if (!messages[p].empty()) {
- next = messages[p].front();
- return true;
- }
- }
- return false;
-}
-
bool PriorityQueue::next(const framing::SequenceNumber& position, QueuedMessage& message)
{
QueuedMessage match;
diff --git a/qpid/cpp/src/qpid/broker/PriorityQueue.h b/qpid/cpp/src/qpid/broker/PriorityQueue.h
index 7464040e17..4bf9d26a9d 100644
--- a/qpid/cpp/src/qpid/broker/PriorityQueue.h
+++ b/qpid/cpp/src/qpid/broker/PriorityQueue.h
@@ -46,7 +46,6 @@ class PriorityQueue : public Messages
bool remove(const framing::SequenceNumber&, QueuedMessage&);
bool find(const framing::SequenceNumber&, QueuedMessage&);
bool next(const framing::SequenceNumber&, QueuedMessage&);
- bool next(const QueuedMessage&, QueuedMessage&);
QueuedMessage& front();
void pop();
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 551584f9d9..853bf09a9c 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -115,12 +115,6 @@ class MessageAllocator
/** hook to add any interesting management state to the status map (lock held) */
virtual void query(qpid::types::Variant::Map&, const Mutex::ScopedLock&) const {};
-
- /** for move, purge, reroute - check if message matches against a filter,
- * return true if message matches.
- */
- virtual bool match(const qpid::types::Variant::Map* filter,
- const QueuedMessage& message) const;
};
@@ -147,7 +141,6 @@ class MessageGroupManager : public QueueObserver, public MessageAllocator
static const std::string qpidMessageGroupKey;
static const std::string qpidMessageGroupTimestamp;
static const std::string qpidMessageGroupDefault;
- static const std::string qpidMessageGroupFilter; // key for move/purge filter map
const std::string getGroupId( const QueuedMessage& qm ) const;
@@ -174,9 +167,11 @@ class MessageGroupManager : public QueueObserver, public MessageAllocator
const std::string MessageGroupManager::qpidMessageGroupKey("qpid.group_header_key");
const std::string MessageGroupManager::qpidMessageGroupTimestamp("qpid.group_timestamp");
const std::string MessageGroupManager::qpidMessageGroupDefault("qpid.no_group"); /** @todo KAG: make configurable in Broker options */
-const std::string MessageGroupManager::qpidMessageGroupFilter("qpid.group_id");
}}
+// KAG TBD: END find me a home....
+
+
Queue::Queue(const string& _name, bool _autodelete,
MessageStore* const _store,
@@ -203,8 +198,7 @@ Queue::Queue(const string& _name, bool _autodelete,
deleted(false),
barrier(*this),
autoDeleteTimeout(0),
- allocator(new MessageAllocator( this )),
- type(FIFO)
+ allocator(new MessageAllocator( this ))
{
if (parent != 0 && broker != 0) {
ManagementAgent* agent = broker->getManagementAgent();
@@ -602,24 +596,101 @@ void Queue::purgeExpired(qpid::sys::Duration lapse)
namespace {
// for use with purge/move below - collect messages that match a given filter
+ //
+ class MessageFilter
+ {
+ public:
+ static const std::string typeKey;
+ static const std::string paramsKey;
+ static MessageFilter *create( const ::qpid::types::Variant::Map *filter );
+ virtual bool match( const QueuedMessage& ) const { return true; }
+ virtual ~MessageFilter() {}
+ protected:
+ MessageFilter() {};
+ };
+ const std::string MessageFilter::typeKey("filter_type");
+ const std::string MessageFilter::paramsKey("filter_params");
+
+ // filter by message header string value exact match
+ class HeaderMatchFilter : public MessageFilter
+ {
+ public:
+ /* Config:
+ { 'filter_type' : 'header_match_str',
+ 'filter_params' : { 'header_key' : "<header name>",
+ 'header_value' : "<value to match>"
+ }
+ }
+ */
+ static const std::string typeKey;
+ static const std::string headerKey;
+ static const std::string valueKey;
+ HeaderMatchFilter( const std::string& _header, const std::string& _value )
+ : MessageFilter (), header(_header), value(_value) {}
+ bool match( const QueuedMessage& msg ) const
+ {
+ const qpid::framing::FieldTable* headers = msg.payload->getApplicationHeaders();
+ if (!headers) return false;
+ FieldTable::ValuePtr h = headers->get(header);
+ if (!h || !h->convertsTo<std::string>()) return false;
+ return h->get<std::string>() == value;
+ }
+ private:
+ const std::string header;
+ const std::string value;
+ };
+ const std::string HeaderMatchFilter::typeKey("header_match_str");
+ const std::string HeaderMatchFilter::headerKey("header_key");
+ const std::string HeaderMatchFilter::valueKey("header_value");
+
+ // factory to create correct filter based on map
+ MessageFilter* MessageFilter::create( const ::qpid::types::Variant::Map *filter )
+ {
+ using namespace qpid::types;
+ if (filter) {
+ Variant::Map::const_iterator i = filter->find(MessageFilter::typeKey);
+ if (i != filter->end()) {
+
+ if (i->second.asString() == HeaderMatchFilter::typeKey) {
+ Variant::Map::const_iterator p = filter->find(MessageFilter::paramsKey);
+ if (p != filter->end() && p->second.getType() == VAR_MAP) {
+ Variant::Map::const_iterator k = p->second.asMap().find(HeaderMatchFilter::headerKey);
+ Variant::Map::const_iterator v = p->second.asMap().find(HeaderMatchFilter::valueKey);
+ if (k != p->second.asMap().end() && v != p->second.asMap().end()) {
+ std::string headerKey(k->second.asString());
+ std::string value(v->second.asString());
+ QPID_LOG(debug, "Message filtering by header value configured. key: " << headerKey << " value: " << value );
+ return new HeaderMatchFilter( headerKey, value );
+ }
+ }
+ }
+ }
+ QPID_LOG(error, "Ignoring unrecognized message filter: '" << *filter << "'");
+ }
+ return new MessageFilter();
+ }
+
+ // used by removeIf() to collect all messages matching a filter, maximum match count is
+ // optional.
struct Collector {
const uint32_t maxMatches;
- const qpid::types::Variant::Map *filter;
+ MessageFilter& filter;
std::deque<QueuedMessage> matches;
- boost::shared_ptr<MessageAllocator> allocator;
- Collector(boost::shared_ptr<MessageAllocator> a, uint32_t m,
- const qpid::types::Variant::Map *f)
- : maxMatches(m), filter(f), allocator(a) {}
- void operator() (QueuedMessage& qm)
+ Collector(MessageFilter& filter, uint32_t max)
+ : maxMatches(max), filter(filter) {}
+ bool operator() (QueuedMessage& qm)
{
if (maxMatches == 0 || matches.size() < maxMatches) {
- if (allocator->match( filter, qm )) {
+ if (filter.match( qm )) {
matches.push_back(qm);
+ return true;
}
}
+ return false;
}
};
-}
+
+} // end namespace
/**
@@ -640,56 +711,45 @@ namespace {
uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> dest,
const qpid::types::Variant::Map *filter)
{
- Collector c(allocator, purge_request, filter);
+ std::auto_ptr<MessageFilter> mf(MessageFilter::create(filter));
+ Collector c(*mf.get(), purge_request);
Mutex::ScopedLock locker(messageLock);
- messages->foreach( boost::bind<void>(boost::ref(c), _1) );
-
- uint32_t count = c.matches.size();
-
- // first remove all matches
+ messages->removeIf( boost::bind<bool>(boost::ref(c), _1) );
for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin();
- qmsg != c.matches.end(); qmsg++) {
- /** @todo KAG: need a direct remove method here */
- bool ok = acquire(qmsg->position, *qmsg);
- (void) ok; assert(ok);
+ qmsg != c.matches.end(); ++qmsg) {
+ // Update observers and message state:
+ acquired(*qmsg);
dequeue(0, *qmsg);
- }
-
- // now reroute if necessary
- if (dest.get()) {
- while (!c.matches.empty()) {
- QueuedMessage msg = c.matches.front();
- c.matches.pop_front();
- assert(msg.payload);
- DeliverableMessage dmsg(msg.payload);
+ // now reroute if necessary
+ if (dest.get()) {
+ assert(qmsg->payload);
+ DeliverableMessage dmsg(qmsg->payload);
dest->routeWithAlternate(dmsg);
}
}
- return count;
+ return c.matches.size();
}
uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty,
const qpid::types::Variant::Map *filter)
{
- Collector c(allocator, qty, filter);
+ std::auto_ptr<MessageFilter> mf(MessageFilter::create(filter));
+ Collector c(*mf.get(), qty);
Mutex::ScopedLock locker(messageLock);
- messages->foreach( boost::bind<void>(boost::ref(c), _1) );
+ messages->removeIf( boost::bind<bool>(boost::ref(c), _1) );
- uint32_t count = c.matches.size();
-
- while (!c.matches.empty()) {
- QueuedMessage qmsg = c.matches.front();
- c.matches.pop_front();
- /** @todo KAG: need a direct remove method here */
- bool ok = acquire(qmsg.position, qmsg);
- (void) ok; assert(ok);
- dequeue(0, qmsg);
- assert(qmsg.payload);
- destq->deliver(qmsg.payload);
+ for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin();
+ qmsg != c.matches.end(); ++qmsg) {
+ // Update observers and message state:
+ acquired(*qmsg);
+ dequeue(0, *qmsg);
+ // and move to destination Queue.
+ assert(qmsg->payload);
+ destq->deliver(qmsg->payload);
}
- return count;
+ return c.matches.size();
}
/** Acquire the front (oldest) message from the in-memory queue.
@@ -939,7 +999,6 @@ void Queue::acquired(const QueuedMessage& msg)
}
}
-
void Queue::create(const FieldTable& _settings)
{
settings = _settings;
@@ -949,7 +1008,6 @@ void Queue::create(const FieldTable& _settings)
configureImpl(_settings);
}
-
int getIntegerSetting(const qpid::framing::FieldTable& settings, const std::string& key)
{
qpid::framing::FieldTable::ValuePtr v = settings.get(key);
@@ -1009,29 +1067,23 @@ void Queue::configureImpl(const FieldTable& _settings)
if (lvqKey.size()) {
QPID_LOG(debug, "Configured queue " << getName() << " as Last Value Queue with key " << lvqKey);
messages = std::auto_ptr<Messages>(new MessageMap(lvqKey));
- type = LVQ;
} else if (_settings.get(qpidLastValueQueueNoBrowse)) {
QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue with 'no-browse' on");
messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, true, broker);
- type = LVQ;
} else if (_settings.get(qpidLastValueQueue)) {
QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue");
messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, false, broker);
- type = LVQ;
} else {
std::auto_ptr<Messages> m = Fairshare::create(_settings);
if (m.get()) {
messages = m;
QPID_LOG(debug, "Configured queue " << getName() << " as priority queue.");
- type = PRIORITY;
- }
- }
-
- { // override default message allocator if message groups configured.
- boost::shared_ptr<MessageAllocator> ma = boost::static_pointer_cast<MessageAllocator>(MessageGroupManager::create( this, _settings ));
- if (ma) {
- allocator = ma;
- type = GROUP;
+ } else { // default (FIFO) queue type
+ // override default message allocator if message groups configured.
+ boost::shared_ptr<MessageAllocator> ma = boost::static_pointer_cast<MessageAllocator>(MessageGroupManager::create( this, _settings ));
+ if (ma) {
+ allocator = ma;
+ }
}
}
@@ -1593,11 +1645,9 @@ bool MessageGroupManager::nextMessage( Consumer::shared_ptr c, QueuedMessage& ne
if (c->preAcquires()) { // not browsing
next = messages.front();
- QueuedMessage current;
do {
- current = next;
/** @todo KAG: horrifingly suboptimal - optimize */
- std::string group( getGroupId( current ) );
+ std::string group( getGroupId( next ) );
GroupMap::iterator gs = messageGroups.find( group ); /** @todo need to cache this somehow */
assert( gs != messageGroups.end() );
GroupState& state( gs->second );
@@ -1610,7 +1660,7 @@ bool MessageGroupManager::nextMessage( Consumer::shared_ptr c, QueuedMessage& ne
if (state.owner == c->getName()) {
return true;
}
- } while (messages.next( current, next )); /** @todo: .next() is a linear search from front - optimize */
+ } while (messages.next( next.position, next )); /** @todo: .next() is a linear search from front - optimize */
return false;
} else if (messages.next(c->position, next))
return true;
@@ -1681,15 +1731,6 @@ void MessageGroupManager::query(qpid::types::Variant::Map& status,
status[GroupQueryKey] = state;
}
-bool MessageGroupManager::match(const qpid::types::Variant::Map* filter,
- const QueuedMessage& message) const
-{
- if (!filter) return true;
- qpid::types::Variant::Map::const_iterator i = filter->find( qpidMessageGroupFilter );
- if (i == filter->end()) return true;
- if (i->second.asString() == getGroupId(message)) return true;
- return false;
-}
boost::shared_ptr<MessageGroupManager> MessageGroupManager::create( Queue *q,
const qpid::framing::FieldTable& settings )
@@ -1698,16 +1739,6 @@ boost::shared_ptr<MessageGroupManager> MessageGroupManager::create( Queue *q,
if (settings.isSet(qpidMessageGroupKey)) {
- Queue::Disposition qt = q->getDisposition();
-
- if (qt == Queue::LVQ) {
- QPID_LOG( error, "Message Groups cannot be enabled on LVQ Queues, queue=" << q->getName());
- return empty;
- }
- if (qt == Queue::PRIORITY) {
- QPID_LOG( error, "Message Groups cannot be enabled for Priority Queues, queue=" << q->getName());
- return empty;
- }
std::string headerKey = settings.getAsString(qpidMessageGroupKey);
if (headerKey.empty()) {
QPID_LOG( error, "A Message Group header key must be configured, queue=" << q->getName());
@@ -1756,12 +1787,6 @@ bool MessageAllocator::canAcquire(const std::string&, const QueuedMessage&,
}
-// default match - ignore filter and always match.
-bool MessageAllocator::match(const qpid::types::Variant::Map*,
- const QueuedMessage&) const
-{
- return true;
-}
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index 67da0a1403..475da15577 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -70,10 +70,6 @@ class MessageAllocator;
class Queue : public boost::enable_shared_from_this<Queue>,
public PersistableQueue, public management::Manageable {
- public:
- enum Disposition {FIFO, LVQ, PRIORITY, GROUP};
-
- private:
struct UsageBarrier
{
Queue& parent;
@@ -134,8 +130,6 @@ class Queue : public boost::enable_shared_from_this<Queue>,
int autoDeleteTimeout;
boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask;
boost::shared_ptr<MessageAllocator> allocator;
- Disposition type;
-
void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false);
void setPolicy(std::auto_ptr<QueuePolicy> policy);
@@ -412,8 +406,6 @@ class Queue : public boost::enable_shared_from_this<Queue>,
uint32_t getDequeueSincePurge() { return dequeueSincePurge.get(); }
void setDequeueSincePurge(uint32_t value);
-
- Disposition getDisposition() const { return type; }
};
}
}