summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker')
-rw-r--r--qpid/cpp/src/qpid/broker/AclModule.h22
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp16
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h3
-rw-r--r--qpid/cpp/src/qpid/broker/DirectExchange.cpp33
-rw-r--r--qpid/cpp/src/qpid/broker/DtxAck.cpp22
-rw-r--r--qpid/cpp/src/qpid/broker/Exchange.cpp34
-rw-r--r--qpid/cpp/src/qpid/broker/Exchange.h3
-rw-r--r--qpid/cpp/src/qpid/broker/FanOutExchange.cpp32
-rw-r--r--qpid/cpp/src/qpid/broker/HeadersExchange.cpp29
-rw-r--r--qpid/cpp/src/qpid/broker/Message.cpp110
-rw-r--r--qpid/cpp/src/qpid/broker/Message.h17
-rw-r--r--qpid/cpp/src/qpid/broker/MessageBuilder.cpp3
-rw-r--r--qpid/cpp/src/qpid/broker/MessageStoreModule.cpp6
-rw-r--r--qpid/cpp/src/qpid/broker/MessageStoreModule.h5
-rw-r--r--qpid/cpp/src/qpid/broker/PersistableMessage.cpp31
-rw-r--r--qpid/cpp/src/qpid/broker/PersistableMessage.h21
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp102
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h17
-rw-r--r--qpid/cpp/src/qpid/broker/QueueEvents.cpp41
-rw-r--r--qpid/cpp/src/qpid/broker/QueueEvents.h4
-rw-r--r--qpid/cpp/src/qpid/broker/QueuePolicy.cpp144
-rw-r--r--qpid/cpp/src/qpid/broker/QueuePolicy.h34
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp35
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.h2
-rw-r--r--qpid/cpp/src/qpid/broker/SessionAdapter.cpp7
-rw-r--r--qpid/cpp/src/qpid/broker/SessionContext.h3
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.h2
-rw-r--r--qpid/cpp/src/qpid/broker/SignalHandler.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/SignalHandler.h3
-rw-r--r--qpid/cpp/src/qpid/broker/TopicExchange.cpp43
-rw-r--r--qpid/cpp/src/qpid/broker/TxAccept.cpp8
-rw-r--r--qpid/cpp/src/qpid/broker/TxPublish.cpp54
-rw-r--r--qpid/cpp/src/qpid/broker/TxPublish.h14
33 files changed, 519 insertions, 383 deletions
diff --git a/qpid/cpp/src/qpid/broker/AclModule.h b/qpid/cpp/src/qpid/broker/AclModule.h
index 536fa21b2b..2f4f7eaacc 100644
--- a/qpid/cpp/src/qpid/broker/AclModule.h
+++ b/qpid/cpp/src/qpid/broker/AclModule.h
@@ -40,7 +40,8 @@ enum Action {ACT_CONSUME, ACT_PUBLISH, ACT_CREATE, ACT_ACCESS, ACT_BIND,
enum Property {PROP_NAME, PROP_DURABLE, PROP_OWNER, PROP_ROUTINGKEY,
PROP_PASSIVE, PROP_AUTODELETE, PROP_EXCLUSIVE, PROP_TYPE,
PROP_ALTERNATE, PROP_QUEUENAME, PROP_SCHEMAPACKAGE,
- PROP_SCHEMACLASS};
+ PROP_SCHEMACLASS, PROP_POLICYTYPE, PROP_MAXQUEUESIZE,
+ PROP_MAXQUEUECOUNT};
enum AclResult {ALLOW, ALLOWLOG, DENY, DENYLOG};
} // namespace acl
@@ -132,6 +133,9 @@ class AclHelper {
if (str.compare("queuename") == 0) return PROP_QUEUENAME;
if (str.compare("schemapackage") == 0) return PROP_SCHEMAPACKAGE;
if (str.compare("schemaclass") == 0) return PROP_SCHEMACLASS;
+ if (str.compare("policytype") == 0) return PROP_POLICYTYPE;
+ if (str.compare("maxqueuesize") == 0) return PROP_MAXQUEUESIZE;
+ if (str.compare("maxqueuecount") == 0) return PROP_MAXQUEUECOUNT;
throw str;
}
static inline std::string getPropertyStr(const Property p) {
@@ -148,6 +152,9 @@ class AclHelper {
case PROP_QUEUENAME: return "queuename";
case PROP_SCHEMAPACKAGE: return "schemapackage";
case PROP_SCHEMACLASS: return "schemaclass";
+ case PROP_POLICYTYPE: return "policytype";
+ case PROP_MAXQUEUESIZE: return "maxqueuesize";
+ case PROP_MAXQUEUECOUNT: return "maxqueuecount";
default: assert(false); // should never get here
}
return "";
@@ -217,11 +224,14 @@ class AclHelper {
// == Queues ==
propSetPtr p4(new propSet);
- p3->insert(PROP_ALTERNATE);
- p3->insert(PROP_PASSIVE);
- p3->insert(PROP_DURABLE);
- p3->insert(PROP_EXCLUSIVE);
- p3->insert(PROP_AUTODELETE);
+ p4->insert(PROP_ALTERNATE);
+ p4->insert(PROP_PASSIVE);
+ p4->insert(PROP_DURABLE);
+ p4->insert(PROP_EXCLUSIVE);
+ p4->insert(PROP_AUTODELETE);
+ p4->insert(PROP_POLICYTYPE);
+ p4->insert(PROP_MAXQUEUESIZE);
+ p4->insert(PROP_MAXQUEUECOUNT);
actionMapPtr a1(new actionMap);
a1->insert(actionPair(ACT_ACCESS, p0));
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index 13cf88fb11..4259bb2f31 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -91,7 +91,8 @@ Broker::Options::Options(const std::string& name) :
queueLimit(100*1048576/*100M default limit*/),
tcpNoDelay(false),
requireEncrypted(false),
- maxSessionRate(0)
+ maxSessionRate(0),
+ asyncQueueEvents(true)
{
int c = sys::SystemInfo::concurrency();
workerThreads=c+1;
@@ -121,7 +122,8 @@ Broker::Options::Options(const std::string& name) :
("tcp-nodelay", optValue(tcpNoDelay), "Set TCP_NODELAY on TCP connections")
("require-encryption", optValue(requireEncrypted), "Only accept connections that are encrypted")
("known-hosts-url", optValue(knownHosts, "URL or 'none'"), "URL to send as 'known-hosts' to clients ('none' implies empty list)")
- ("max-session-rate", optValue(maxSessionRate, "MESSAGES/S"), "Sets the maximum message rate per session (0=unlimited)");
+ ("max-session-rate", optValue(maxSessionRate, "MESSAGES/S"), "Sets the maximum message rate per session (0=unlimited)")
+ ("async-queue-events", optValue(asyncQueueEvents, "yes|no"), "Set Queue Events async, used for services like replication");
}
const std::string empty;
@@ -150,7 +152,7 @@ Broker::Broker(const Broker::Options& conf) :
*this),
managementAgent(conf.enableMgmt ? new ManagementAgent() : 0),
queueCleaner(queues, timer),
- queueEvents(poller),
+ queueEvents(poller,!conf.asyncQueueEvents),
recovery(true),
expiryPolicy(new ExpiryPolicy),
getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this))
@@ -208,8 +210,10 @@ Broker::Broker(const Broker::Options& conf) :
(*i)->earlyInitialize(*this);
// If no plugin store module registered itself, set up the null store.
- if (store.get() == 0)
- setStore (new NullMessageStore());
+ if (store.get() == 0) {
+ boost::shared_ptr<MessageStore> p(new NullMessageStore());
+ setStore (p);
+ }
exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
@@ -296,7 +300,7 @@ boost::intrusive_ptr<Broker> Broker::create(const Options& opts)
return boost::intrusive_ptr<Broker>(new Broker(opts));
}
-void Broker::setStore (MessageStore* _store)
+void Broker::setStore (boost::shared_ptr<MessageStore>& _store)
{
store.reset(new MessageStoreModule (_store));
queues.setStore (store.get());
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index 0517ceca95..5ca01e0867 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -111,6 +111,7 @@ public:
bool requireEncrypted;
std::string knownHosts;
uint32_t maxSessionRate;
+ bool asyncQueueEvents;
private:
std::string getHome();
@@ -171,7 +172,7 @@ public:
/** Shut down the broker */
virtual void shutdown();
- QPID_BROKER_EXTERN void setStore (MessageStore*);
+ QPID_BROKER_EXTERN void setStore (boost::shared_ptr<MessageStore>& store);
MessageStore& getStore() { return *store; }
void setAcl (AclModule* _acl) {acl = _acl;}
AclModule* getAcl() { return acl; }
diff --git a/qpid/cpp/src/qpid/broker/DirectExchange.cpp b/qpid/cpp/src/qpid/broker/DirectExchange.cpp
index b9f24dee5f..29fe47beac 100644
--- a/qpid/cpp/src/qpid/broker/DirectExchange.cpp
+++ b/qpid/cpp/src/qpid/broker/DirectExchange.cpp
@@ -145,39 +145,12 @@ bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, c
void DirectExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/)
{
PreRoute pr(msg, this);
- Queues::ConstPtr p;
+ ConstBindingList b;
{
Mutex::ScopedLock l(lock);
- p = bindings[routingKey].queues.snapshot();
- }
- int count(0);
-
- if (p) {
- for(std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); i++, count++) {
- msg.deliverTo((*i)->queue);
- if ((*i)->mgmtBinding != 0)
- (*i)->mgmtBinding->inc_msgMatched();
- }
- }
-
- if(!count){
- QPID_LOG(info, "DirectExchange " << getName() << " could not route message with key " << routingKey
- << "; no matching binding found");
- if (mgmtExchange != 0) {
- mgmtExchange->inc_msgDrops();
- mgmtExchange->inc_byteDrops(msg.contentSize());
- }
- } else {
- if (mgmtExchange != 0) {
- mgmtExchange->inc_msgRoutes(count);
- mgmtExchange->inc_byteRoutes(count * msg.contentSize());
- }
- }
-
- if (mgmtExchange != 0) {
- mgmtExchange->inc_msgReceives();
- mgmtExchange->inc_byteReceives(msg.contentSize());
+ b = bindings[routingKey].queues.snapshot();
}
+ doRoute(msg, b);
}
diff --git a/qpid/cpp/src/qpid/broker/DtxAck.cpp b/qpid/cpp/src/qpid/broker/DtxAck.cpp
index b189ef4cdb..bca3f90bbe 100644
--- a/qpid/cpp/src/qpid/broker/DtxAck.cpp
+++ b/qpid/cpp/src/qpid/broker/DtxAck.cpp
@@ -48,12 +48,26 @@ bool DtxAck::prepare(TransactionContext* ctxt) throw()
void DtxAck::commit() throw()
{
- for_each(pending.begin(), pending.end(), mem_fun_ref(&DeliveryRecord::committed));
- pending.clear();
+ try {
+ for_each(pending.begin(), pending.end(), mem_fun_ref(&DeliveryRecord::committed));
+ pending.clear();
+ } catch (const std::exception& e) {
+ QPID_LOG(error, "Failed to commit: " << e.what());
+ } catch(...) {
+ QPID_LOG(error, "Failed to commit (unknown error)");
+ }
+
}
void DtxAck::rollback() throw()
{
- for_each(pending.begin(), pending.end(), mem_fun_ref(&DeliveryRecord::requeue));
- pending.clear();
+ try {
+ for_each(pending.begin(), pending.end(), mem_fun_ref(&DeliveryRecord::requeue));
+ pending.clear();
+ } catch (const std::exception& e) {
+ QPID_LOG(error, "Failed to complete rollback: " << e.what());
+ } catch(...) {
+ QPID_LOG(error, "Failed to complete rollback (unknown error)");
+ }
+
}
diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp
index 90d81b81c6..757127eef2 100644
--- a/qpid/cpp/src/qpid/broker/Exchange.cpp
+++ b/qpid/cpp/src/qpid/broker/Exchange.cpp
@@ -76,6 +76,40 @@ Exchange::PreRoute::~PreRoute(){
}
}
+void Exchange::doRoute(Deliverable& msg, ConstBindingList b)
+{
+ int count = 0;
+
+ if (b.get()) {
+ // Block the content release if the message is transient AND there is more than one binding
+ if (!msg.getMessage().isPersistent() && b->size() > 1)
+ msg.getMessage().blockContentRelease();
+
+ for(std::vector<Binding::shared_ptr>::const_iterator i = b->begin(); i != b->end(); i++, count++) {
+ msg.deliverTo((*i)->queue);
+ if ((*i)->mgmtBinding != 0)
+ (*i)->mgmtBinding->inc_msgMatched();
+ }
+ }
+
+ if (mgmtExchange != 0)
+ {
+ mgmtExchange->inc_msgReceives ();
+ mgmtExchange->inc_byteReceives (msg.contentSize ());
+ if (count == 0)
+ {
+ //QPID_LOG(warning, "Exchange " << getName() << " could not route message; no matching binding found");
+ mgmtExchange->inc_msgDrops ();
+ mgmtExchange->inc_byteDrops (msg.contentSize ());
+ }
+ else
+ {
+ mgmtExchange->inc_msgRoutes (count);
+ mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
+ }
+ }
+}
+
void Exchange::routeIVE(){
if (ive && lastMsg.get()){
DeliverableMessage dmsg(lastMsg);
diff --git a/qpid/cpp/src/qpid/broker/Exchange.h b/qpid/cpp/src/qpid/broker/Exchange.h
index c1e878200f..9bea376c28 100644
--- a/qpid/cpp/src/qpid/broker/Exchange.h
+++ b/qpid/cpp/src/qpid/broker/Exchange.h
@@ -79,6 +79,9 @@ protected:
Exchange* parent;
};
+ typedef boost::shared_ptr<const std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> > > ConstBindingList;
+ typedef boost::shared_ptr< std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> > > BindingList;
+ void doRoute(Deliverable& msg, ConstBindingList b);
void routeIVE();
diff --git a/qpid/cpp/src/qpid/broker/FanOutExchange.cpp b/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
index e9007ba682..b7d46a33fe 100644
--- a/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
+++ b/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
@@ -106,36 +106,12 @@ bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*key*/, cons
return true;
}
-void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/){
+void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/)
+{
PreRoute pr(msg, this);
- uint32_t count(0);
-
- BindingsArray::ConstPtr p = bindings.snapshot();
- if (p.get()){
- for(std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i, count++){
- msg.deliverTo((*i)->queue);
- if ((*i)->mgmtBinding != 0)
- (*i)->mgmtBinding->inc_msgMatched ();
- }
- }
-
- if (mgmtExchange != 0)
- {
- mgmtExchange->inc_msgReceives ();
- mgmtExchange->inc_byteReceives (msg.contentSize ());
- if (count == 0)
- {
- mgmtExchange->inc_msgDrops ();
- mgmtExchange->inc_byteDrops (msg.contentSize ());
- }
- else
- {
- mgmtExchange->inc_msgRoutes (count);
- mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
- }
- }
+ doRoute(msg, bindings.snapshot());
}
-
+
bool FanOutExchange::isBound(Queue::shared_ptr queue, const string* const, const FieldTable* const)
{
BindingsArray::ConstPtr ptr = bindings.snapshot();
diff --git a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
index c628c44909..a7c90156e1 100644
--- a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
+++ b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
@@ -104,7 +104,8 @@ bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& bindingKey,
}
-void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* args){
+void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* args)
+{
if (!args) {
//can't match if there were no headers passed in
if (mgmtExchange != 0) {
@@ -118,31 +119,17 @@ void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, cons
PreRoute pr(msg, this);
- uint32_t count(0);
-
- Bindings::ConstPtr p = bindings.snapshot();
- if (p.get()){
+ ConstBindingList p = bindings.snapshot();
+ BindingList b(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >);
+ if (p.get())
+ {
for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i) {
if (match((*i)->args, *args)) {
- msg.deliverTo((*i)->queue);
- count++;
- if ((*i)->mgmtBinding != 0)
- (*i)->mgmtBinding->inc_msgMatched();
+ b->push_back(*i);
}
}
}
-
- if (mgmtExchange != 0) {
- mgmtExchange->inc_msgReceives();
- mgmtExchange->inc_byteReceives(msg.contentSize());
- if (count == 0) {
- mgmtExchange->inc_msgDrops();
- mgmtExchange->inc_byteDrops(msg.contentSize());
- } else {
- mgmtExchange->inc_msgRoutes(count);
- mgmtExchange->inc_byteRoutes(count * msg.contentSize());
- }
- }
+ doRoute(msg, b);
}
diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp
index 7360010192..e2799b0bff 100644
--- a/qpid/cpp/src/qpid/broker/Message.cpp
+++ b/qpid/cpp/src/qpid/broker/Message.cpp
@@ -49,7 +49,7 @@ TransferAdapter Message::TRANSFER;
Message::Message(const framing::SequenceNumber& id) :
frames(id), persistenceId(0), redelivered(false), loaded(false),
staged(false), forcePersistentPolicy(false), publisher(0), adapter(0),
- expiration(FAR_FUTURE), enqueueCallback(0), dequeueCallback(0) {}
+ expiration(FAR_FUTURE), enqueueCallback(0), dequeueCallback(0), requiredCredit(0) {}
Message::~Message()
{
@@ -98,7 +98,7 @@ const FieldTable* Message::getApplicationHeaders() const
return getAdapter().getApplicationHeaders(frames);
}
-bool Message::isPersistent()
+bool Message::isPersistent() const
{
return (getAdapter().isPersistent(frames) || forcePersistentPolicy);
}
@@ -108,12 +108,16 @@ bool Message::requiresAccept()
return getAdapter().requiresAccept(frames);
}
-uint32_t Message::getRequiredCredit() const
+uint32_t Message::getRequiredCredit()
{
- //add up payload for all header and content frames in the frameset
- SumBodySize sum;
- frames.map_if(sum, TypeFilter2<HEADER_BODY, CONTENT_BODY>());
- return sum.getSize();
+ sys::Mutex::ScopedLock l(lock);
+ if (!requiredCredit) {
+ //add up payload for all header and content frames in the frameset
+ SumBodySize sum;
+ frames.map_if(sum, TypeFilter2<HEADER_BODY, CONTENT_BODY>());
+ requiredCredit = sum.getSize();
+ }
+ return requiredCredit;
}
void Message::encode(framing::Buffer& buffer) const
@@ -181,17 +185,31 @@ void Message::decodeContent(framing::Buffer& buffer)
loaded = true;
}
-void Message::releaseContent(MessageStore* _store)
+void Message::tryReleaseContent()
{
- if (!store) {
- store = _store;
+ if (checkContentReleasable()) {
+ releaseContent();
}
+}
+
+void Message::releaseContent(MessageStore* s)
+{
+ //deprecated, use setStore(store); releaseContent(); instead
+ if (!store) setStore(s);
+ releaseContent();
+}
+
+void Message::releaseContent()
+{
+ sys::Mutex::ScopedLock l(lock);
if (store) {
if (!getPersistenceId()) {
intrusive_ptr<PersistableMessage> pmsg(this);
store->stage(pmsg);
staged = true;
}
+ //ensure required credit is cached before content frames are released
+ getRequiredCredit();
//remove any content frames from the frameset
frames.remove(TypeFilter<CONTENT_BODY>());
setContentReleased();
@@ -211,31 +229,29 @@ void Message::destroy()
bool Message::getContentFrame(const Queue& queue, AMQFrame& frame, uint16_t maxContentSize, uint64_t offset) const
{
- if (isContentReleased()) {
- intrusive_ptr<const PersistableMessage> pmsg(this);
-
- bool done = false;
- string& data = frame.castBody<AMQContentBody>()->getData();
- store->loadContent(queue, pmsg, data, offset, maxContentSize);
- done = data.size() < maxContentSize;
- frame.setBof(false);
- frame.setEof(true);
- QPID_LOG(debug, "loaded frame" << frame);
- if (offset > 0) {
- frame.setBos(false);
- }
- if (!done) {
- frame.setEos(false);
- } else return false;
- return true;
+ intrusive_ptr<const PersistableMessage> pmsg(this);
+
+ bool done = false;
+ string& data = frame.castBody<AMQContentBody>()->getData();
+ store->loadContent(queue, pmsg, data, offset, maxContentSize);
+ done = data.size() < maxContentSize;
+ frame.setBof(false);
+ frame.setEof(true);
+ QPID_LOG(debug, "loaded frame" << frame);
+ if (offset > 0) {
+ frame.setBos(false);
}
- else return false;
+ if (!done) {
+ frame.setEos(false);
+ } else return false;
+ return true;
}
void Message::sendContent(const Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const
{
+ sys::Mutex::ScopedLock l(lock);
if (isContentReleased() && !frames.isComplete()) {
-
+ sys::Mutex::ScopedUnlock u(lock);
uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
bool morecontent = true;
for (uint64_t offset = 0; morecontent; offset += maxContentSize)
@@ -373,28 +389,36 @@ void Message::setReplacementMessage(boost::intrusive_ptr<Message> msg, const Que
}
void Message::allEnqueuesComplete() {
- MessageCallback* cb = 0;
- {
- sys::Mutex::ScopedLock l(lock);
- std::swap(cb, enqueueCallback);
- }
+ sys::Mutex::ScopedLock l(callbackLock);
+ MessageCallback* cb = enqueueCallback;
if (cb && *cb) (*cb)(intrusive_ptr<Message>(this));
}
void Message::allDequeuesComplete() {
- MessageCallback* cb = 0;
- {
- sys::Mutex::ScopedLock l(lock);
- std::swap(cb, dequeueCallback);
- }
+ sys::Mutex::ScopedLock l(callbackLock);
+ MessageCallback* cb = dequeueCallback;
if (cb && *cb) (*cb)(intrusive_ptr<Message>(this));
}
-void Message::setEnqueueCompleteCallback(MessageCallback& cb) { enqueueCallback = &cb; }
-void Message::resetEnqueueCompleteCallback() { enqueueCallback = 0; }
+void Message::setEnqueueCompleteCallback(MessageCallback& cb) {
+ sys::Mutex::ScopedLock l(callbackLock);
+ enqueueCallback = &cb;
+}
+
+void Message::resetEnqueueCompleteCallback() {
+ sys::Mutex::ScopedLock l(callbackLock);
+ enqueueCallback = 0;
+}
-void Message::setDequeueCompleteCallback(MessageCallback& cb) { dequeueCallback = &cb; }
-void Message::resetDequeueCompleteCallback() { dequeueCallback = 0; }
+void Message::setDequeueCompleteCallback(MessageCallback& cb) {
+ sys::Mutex::ScopedLock l(callbackLock);
+ dequeueCallback = &cb;
+}
+
+void Message::resetDequeueCompleteCallback() {
+ sys::Mutex::ScopedLock l(callbackLock);
+ dequeueCallback = 0;
+}
framing::FieldTable& Message::getOrInsertHeaders()
{
diff --git a/qpid/cpp/src/qpid/broker/Message.h b/qpid/cpp/src/qpid/broker/Message.h
index e4d09b1042..3894960c95 100644
--- a/qpid/cpp/src/qpid/broker/Message.h
+++ b/qpid/cpp/src/qpid/broker/Message.h
@@ -74,7 +74,7 @@ public:
bool isImmediate() const;
QPID_BROKER_EXTERN const framing::FieldTable* getApplicationHeaders() const;
framing::FieldTable& getOrInsertHeaders();
- QPID_BROKER_EXTERN bool isPersistent();
+ QPID_BROKER_EXTERN bool isPersistent() const;
bool requiresAccept();
QPID_BROKER_EXTERN void setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e);
@@ -108,7 +108,7 @@ public:
return frames.isA<T>();
}
- uint32_t getRequiredCredit() const;
+ uint32_t getRequiredCredit();
void encode(framing::Buffer& buffer) const;
void encodeContent(framing::Buffer& buffer) const;
@@ -129,12 +129,9 @@ public:
QPID_BROKER_EXTERN void decodeHeader(framing::Buffer& buffer);
QPID_BROKER_EXTERN void decodeContent(framing::Buffer& buffer);
- /**
- * Releases the in-memory content data held by this
- * message. Must pass in a store from which the data can
- * be reloaded.
- */
- void releaseContent(MessageStore* store);
+ void QPID_BROKER_EXTERN tryReleaseContent();
+ void releaseContent();
+ void releaseContent(MessageStore* s);//deprecated, use 'setStore(store); releaseContent();' instead
void destroy();
bool getContentFrame(const Queue& queue, framing::AMQFrame& frame, uint16_t maxContentSize, uint64_t offset) const;
@@ -187,8 +184,12 @@ public:
mutable Replacement replacement;
mutable boost::intrusive_ptr<Message> empty;
+
+ sys::Mutex callbackLock;
MessageCallback* enqueueCallback;
MessageCallback* dequeueCallback;
+
+ uint32_t requiredCredit;
static std::string updateDestination;
};
diff --git a/qpid/cpp/src/qpid/broker/MessageBuilder.cpp b/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
index 14b233fd6c..b1a2b77b05 100644
--- a/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
+++ b/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
@@ -80,7 +80,7 @@ void MessageBuilder::handle(AMQFrame& frame)
&& !NullMessageStore::isNullStore(store)
&& message->getExchangeName() != QPID_MANAGEMENT /* don't stage mgnt messages */)
{
- message->releaseContent(store);
+ message->releaseContent();
staging = true;
}
}
@@ -96,6 +96,7 @@ void MessageBuilder::end()
void MessageBuilder::start(const SequenceNumber& id)
{
message = intrusive_ptr<Message>(new Message(id));
+ message->setStore(store);
state = METHOD;
staging = false;
}
diff --git a/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp b/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
index 0b8a5db1c7..5f7cceebd3 100644
--- a/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
+++ b/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
@@ -32,11 +32,11 @@ using qpid::framing::FieldTable;
namespace qpid {
namespace broker {
-MessageStoreModule::MessageStoreModule(MessageStore* _store) : store(_store) {}
+MessageStoreModule::MessageStoreModule(boost::shared_ptr<MessageStore>& _store)
+ : store(_store) {}
MessageStoreModule::~MessageStoreModule()
{
- delete store;
}
bool MessageStoreModule::init(const Options*) { return true; }
@@ -173,7 +173,7 @@ void MessageStoreModule::collectPreparedXids(std::set<std::string>& xids)
bool MessageStoreModule::isNull() const
{
- return NullMessageStore::isNullStore(store);
+ return NullMessageStore::isNullStore(store.get());
}
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/MessageStoreModule.h b/qpid/cpp/src/qpid/broker/MessageStoreModule.h
index 02cbd13cf1..56b5a3c1ae 100644
--- a/qpid/cpp/src/qpid/broker/MessageStoreModule.h
+++ b/qpid/cpp/src/qpid/broker/MessageStoreModule.h
@@ -26,6 +26,7 @@
#include "qpid/broker/RecoveryManager.h"
#include <boost/intrusive_ptr.hpp>
+#include <boost/shared_ptr.hpp>
namespace qpid {
namespace broker {
@@ -35,9 +36,9 @@ namespace broker {
*/
class MessageStoreModule : public MessageStore
{
- MessageStore* store;
+ boost::shared_ptr<MessageStore> store;
public:
- MessageStoreModule(MessageStore* store);
+ MessageStoreModule(boost::shared_ptr<MessageStore>& store);
bool init(const Options* options);
void truncateInit(const bool pushDownStoreFiles = false);
diff --git a/qpid/cpp/src/qpid/broker/PersistableMessage.cpp b/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
index 2ef223aa81..303a0501f4 100644
--- a/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
+++ b/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
@@ -36,7 +36,6 @@ PersistableMessage::~PersistableMessage() {}
PersistableMessage::PersistableMessage() :
asyncEnqueueCounter(0),
asyncDequeueCounter(0),
- contentReleased(false),
store(0)
{}
@@ -59,9 +58,15 @@ void PersistableMessage::flush()
}
}
-void PersistableMessage::setContentReleased() {contentReleased = true; }
+void PersistableMessage::setContentReleased()
+{
+ contentReleaseState.released = true;
+}
-bool PersistableMessage::isContentReleased()const { return contentReleased; }
+bool PersistableMessage::isContentReleased() const
+{
+ return contentReleaseState.released;
+}
bool PersistableMessage::isEnqueueComplete() {
sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock);
@@ -153,6 +158,26 @@ void PersistableMessage::dequeueAsync() {
asyncDequeueCounter++;
}
+PersistableMessage::ContentReleaseState::ContentReleaseState() : blocked(false), requested(false), released(false) {}
+
+void PersistableMessage::setStore(MessageStore* s)
+{
+ store = s;
+}
+
+void PersistableMessage::requestContentRelease()
+{
+ contentReleaseState.requested = true;
+}
+void PersistableMessage::blockContentRelease()
+{
+ contentReleaseState.blocked = true;
+}
+bool PersistableMessage::checkContentReleasable()
+{
+ return contentReleaseState.requested && !contentReleaseState.blocked;
+}
+
}}
diff --git a/qpid/cpp/src/qpid/broker/PersistableMessage.h b/qpid/cpp/src/qpid/broker/PersistableMessage.h
index 0274b41375..2576e266d2 100644
--- a/qpid/cpp/src/qpid/broker/PersistableMessage.h
+++ b/qpid/cpp/src/qpid/broker/PersistableMessage.h
@@ -68,8 +68,16 @@ class PersistableMessage : public Persistable
void enqueueAsync();
void dequeueAsync();
- bool contentReleased;
syncList synclist;
+ struct ContentReleaseState
+ {
+ bool blocked;
+ bool requested;
+ bool released;
+
+ ContentReleaseState();
+ };
+ ContentReleaseState contentReleaseState;
protected:
/** Called when all enqueues are complete for this message. */
@@ -96,8 +104,15 @@ class PersistableMessage : public Persistable
void flush();
- bool isContentReleased() const;
-
+ bool QPID_BROKER_EXTERN isContentReleased() const;
+
+ void QPID_BROKER_EXTERN setStore(MessageStore*);
+ void requestContentRelease();
+ void blockContentRelease();
+ bool checkContentReleasable();
+
+ virtual QPID_BROKER_EXTERN bool isPersistent() const = 0;
+
QPID_BROKER_EXTERN bool isEnqueueComplete();
QPID_BROKER_EXTERN void enqueueComplete();
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index b2a8e223c5..86de96468d 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -181,6 +181,8 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){
void Queue::recover(boost::intrusive_ptr<Message>& msg){
+ if (policy.get()) policy->recoverEnqueued(msg);
+
push(msg, true);
if (store){
// setup synclist for recovered messages, so they don't get re-stored on lastNodeFailure
@@ -206,11 +208,10 @@ void Queue::process(boost::intrusive_ptr<Message>& msg){
}
void Queue::requeue(const QueuedMessage& msg){
- if (!isEnqueued(msg)) return;
-
QueueListeners::NotificationSet copy;
{
Mutex::ScopedLock locker(messageLock);
+ if (!isEnqueued(msg)) return;
msg.payload->enqueueComplete(); // mark the message as enqueued
messages.push_front(msg);
listeners.populate(copy);
@@ -563,16 +564,10 @@ void Queue::popMsg(QueuedMessage& qmsg)
}
void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
- Messages dequeues;
QueueListeners::NotificationSet copy;
{
Mutex::ScopedLock locker(messageLock);
QueuedMessage qm(this, msg, ++sequence);
- if (policy.get()) {
- policy->tryEnqueue(qm);
- //depending on policy, may have some dequeues
- if (!isRecovery) pendingDequeues.swap(dequeues);
- }
if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence);
LVQ::iterator i;
@@ -606,12 +601,11 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
if (eventMgr) eventMgr->enqueued(qm);
else QPID_LOG(warning, "Enqueue manager not set, events not generated for " << getName());
}
+ if (policy.get()) {
+ policy->enqueued(qm);
+ }
}
copy.notify();
- if (!dequeues.empty()) {
- //depending on policy, may have some dequeues
- for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
- }
}
QueuedMessage Queue::getFront()
@@ -697,8 +691,19 @@ void Queue::setLastNodeFailure()
}
// return true if store exists,
-bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg)
+bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg, bool suppressPolicyCheck)
{
+ if (policy.get() && !suppressPolicyCheck) {
+ Messages dequeues;
+ {
+ Mutex::ScopedLock locker(messageLock);
+ policy->tryEnqueue(msg);
+ policy->getPendingDequeues(dequeues);
+ }
+ //depending on policy, may have some dequeues that need to performed without holding the lock
+ for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
+ }
+
if (inLastNodeFailure && persistLastNode){
msg->forcePersistent();
}
@@ -707,15 +712,27 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg)
msg->addTraceId(traceId);
}
- if (msg->isPersistent() && store) {
+ if ((msg->isPersistent() || msg->checkContentReleasable()) && store) {
msg->enqueueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg);
store->enqueue(ctxt, pmsg, *this);
return true;
}
+ if (!store) {
+ //Messages enqueued on a transient queue should be prevented
+ //from having their content released as it may not be
+ //recoverable by these queue for delivery
+ msg->blockContentRelease();
+ }
return false;
}
+void Queue::enqueueAborted(boost::intrusive_ptr<Message> msg)
+{
+ Mutex::ScopedLock locker(messageLock);
+ if (policy.get()) policy->enqueueAborted(msg);
+}
+
// return true if store exists,
bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
{
@@ -726,7 +743,7 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
dequeued(msg);
}
}
- if (msg.payload->isPersistent() && store) {
+ if ((msg.payload->isPersistent() || msg.payload->checkContentReleasable()) && store) {
msg.payload->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg.payload);
store->dequeue(ctxt, pmsg, *this);
@@ -781,22 +798,37 @@ void Queue::create(const FieldTable& _settings)
void Queue::configure(const FieldTable& _settings, bool recovering)
{
- setPolicy(QueuePolicy::createQueuePolicy(_settings));
+
+ eventMode = _settings.getAsInt(qpidQueueEventGeneration);
+
+ if (QueuePolicy::getType(_settings) == QueuePolicy::FLOW_TO_DISK &&
+ (!store || NullMessageStore::isNullStore(store) || (eventMode && eventMgr && !eventMgr->isSync()) )) {
+ if ( NullMessageStore::isNullStore(store)) {
+ QPID_LOG(warning, "Flow to disk not valid for non-persisted queue:" << getName());
+ } else if (eventMgr && !eventMgr->isSync() ) {
+ QPID_LOG(warning, "Flow to disk not valid with async Queue Events:" << getName());
+ }
+ FieldTable copy(_settings);
+ copy.erase(QueuePolicy::typeKey);
+ setPolicy(QueuePolicy::createQueuePolicy(getName(), copy));
+ } else {
+ setPolicy(QueuePolicy::createQueuePolicy(getName(), _settings));
+ }
//set this regardless of owner to allow use of no-local with exclusive consumers also
noLocal = _settings.get(qpidNoLocal);
- QPID_LOG(debug, "Configured queue with no-local=" << noLocal);
+ QPID_LOG(debug, "Configured queue " << getName() << " with no-local=" << noLocal);
lastValueQueue= _settings.get(qpidLastValueQueue);
- if (lastValueQueue) QPID_LOG(debug, "Configured queue as Last Value Queue");
+ if (lastValueQueue) QPID_LOG(debug, "Configured queue as Last Value Queue for: " << getName());
lastValueQueueNoBrowse = _settings.get(qpidLastValueQueueNoBrowse);
if (lastValueQueueNoBrowse){
- QPID_LOG(debug, "Configured queue as Last Value Queue No Browse");
+ QPID_LOG(debug, "Configured queue as Last Value Queue No Browse for: " << getName());
lastValueQueue = lastValueQueueNoBrowse;
}
persistLastNode= _settings.get(qpidPersistLastNode);
- if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node");
+ if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node for: " << getName());
traceId = _settings.getAsString(qpidTraceIdentity);
std::string excludeList = _settings.getAsString(qpidTraceExclude);
@@ -806,8 +838,6 @@ void Queue::configure(const FieldTable& _settings, bool recovering)
QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId
<< "' and qpid.trace.exclude='"<< excludeList << "' i.e. " << traceExclude.size() << " elements");
- eventMode = _settings.getAsInt(qpidQueueEventGeneration);
-
FieldTable::ValuePtr p =_settings.get(qpidInsertSequenceNumbers);
if (p && p->convertsTo<std::string>()) insertSequenceNumbers(p->get<std::string>());
@@ -975,19 +1005,6 @@ void Queue::setExternalQueueStore(ExternalQueueStore* inst) {
}
}
-bool Queue::releaseMessageContent(const QueuedMessage& m)
-{
- if (store && !NullMessageStore::isNullStore(store)) {
- QPID_LOG(debug, "Message " << m.position << " on " << name << " released from memory");
- m.payload->releaseContent(store);
- return true;
- } else {
- QPID_LOG(warning, "Message " << m.position << " on " << name
- << " cannot be released from memory as the queue is not durable");
- return false;
- }
-}
-
ManagementObject* Queue::GetManagementObject (void) const
{
return (ManagementObject*) mgmtObject;
@@ -1044,11 +1061,12 @@ void Queue::insertSequenceNumbers(const std::string& key)
void Queue::enqueued(const QueuedMessage& m)
{
if (m.payload) {
- if (policy.get()) policy->tryEnqueue(m);
- mgntEnqStats(m.payload);
- if (m.payload->isPersistent()) {
- enqueue ( 0, m.payload );
+ if (policy.get()) {
+ policy->recoverEnqueued(m.payload);
+ policy->enqueued(m);
}
+ mgntEnqStats(m.payload);
+ enqueue ( 0, m.payload, true );
} else {
QPID_LOG(warning, "Queue informed of enqueued message that has no payload");
}
@@ -1059,10 +1077,4 @@ bool Queue::isEnqueued(const QueuedMessage& msg)
return !policy.get() || policy->isEnqueued(msg);
}
-void Queue::addPendingDequeue(const QueuedMessage& msg)
-{
- //assumes lock is held - true at present but rather nasty as this is a public method
- pendingDequeues.push_back(msg);
-}
-
QueueListeners& Queue::getListeners() { return listeners; }
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index 77799fd967..661e46f619 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -239,7 +239,8 @@ namespace qpid {
QPID_BROKER_EXTERN void setLastNodeFailure();
QPID_BROKER_EXTERN void clearLastNodeFailure();
- bool enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg);
+ bool enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg, bool suppressPolicyCheck = false);
+ void enqueueAborted(boost::intrusive_ptr<Message> msg);
/**
* dequeue from store (only done once messages is acknowledged)
*/
@@ -315,8 +316,6 @@ namespace qpid {
bindings.eachBinding(f);
}
- bool releaseMessageContent(const QueuedMessage&);
-
void popMsg(QueuedMessage& qmsg);
/** Set the position sequence number for the next message on the queue.
@@ -335,18 +334,6 @@ namespace qpid {
*/
void recoveryComplete();
- /**
- * This is a hack to avoid deadlocks in durable ring
- * queues. It is used for dequeueing messages in response
- * to an enqueue while avoid holding lock over call to
- * store.
- *
- * Assumes messageLock is held - true for curent use case
- * (QueuePolicy::tryEnqueue()) but rather nasty as this is a public
- * method
- **/
- void addPendingDequeue(const QueuedMessage &msg);
-
// For cluster update
QueueListeners& getListeners();
};
diff --git a/qpid/cpp/src/qpid/broker/QueueEvents.cpp b/qpid/cpp/src/qpid/broker/QueueEvents.cpp
index 6df869673d..bba054b0b8 100644
--- a/qpid/cpp/src/qpid/broker/QueueEvents.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueEvents.cpp
@@ -25,25 +25,41 @@
namespace qpid {
namespace broker {
-QueueEvents::QueueEvents(const boost::shared_ptr<sys::Poller>& poller) :
- eventQueue(boost::bind(&QueueEvents::handle, this, _1), poller), enabled(true)
+QueueEvents::QueueEvents(const boost::shared_ptr<sys::Poller>& poller, bool isSync) :
+ eventQueue(boost::bind(&QueueEvents::handle, this, _1), poller), enabled(true), sync(isSync)
{
- eventQueue.start();
+ if (!sync) eventQueue.start();
}
QueueEvents::~QueueEvents()
{
- eventQueue.stop();
+ if (!sync) eventQueue.stop();
}
void QueueEvents::enqueued(const QueuedMessage& m)
{
- if (enabled) eventQueue.push(Event(ENQUEUE, m));
+ if (enabled) {
+ Event enq(ENQUEUE, m);
+ if (sync) {
+ for (Listeners::iterator j = listeners.begin(); j != listeners.end(); j++)
+ j->second(enq);
+ } else {
+ eventQueue.push(enq);
+ }
+ }
}
void QueueEvents::dequeued(const QueuedMessage& m)
{
- if (enabled) eventQueue.push(Event(DEQUEUE, m));
+ if (enabled) {
+ Event deq(DEQUEUE, m);
+ if (sync) {
+ for (Listeners::iterator j = listeners.begin(); j != listeners.end(); j++)
+ j->second(deq);
+ } else {
+ eventQueue.push(Event(DEQUEUE, m));
+ }
+ }
}
void QueueEvents::registerListener(const std::string& id, const EventListener& listener)
@@ -70,15 +86,16 @@ QueueEvents::EventQueue::Batch::const_iterator
QueueEvents::handle(const EventQueue::Batch& events) {
qpid::sys::Mutex::ScopedLock l(lock);
for (EventQueue::Batch::const_iterator i = events.begin(); i != events.end(); ++i) {
- for (Listeners::iterator j = listeners.begin(); j != listeners.end(); j++)
- j->second(*i);
+ for (Listeners::iterator j = listeners.begin(); j != listeners.end(); j++) {
+ j->second(*i);
+ }
}
return events.end();
}
void QueueEvents::shutdown()
{
- if (!eventQueue.empty() && !listeners.empty()) eventQueue.shutdown();
+ if (!sync && !eventQueue.empty() && !listeners.empty()) eventQueue.shutdown();
}
void QueueEvents::enable()
@@ -93,6 +110,12 @@ void QueueEvents::disable()
QPID_LOG(debug, "Queue events disabled");
}
+bool QueueEvents::isSync()
+{
+ return sync;
+}
+
+
QueueEvents::Event::Event(EventType t, const QueuedMessage& m) : type(t), msg(m) {}
diff --git a/qpid/cpp/src/qpid/broker/QueueEvents.h b/qpid/cpp/src/qpid/broker/QueueEvents.h
index 6826c6e79a..c42752133e 100644
--- a/qpid/cpp/src/qpid/broker/QueueEvents.h
+++ b/qpid/cpp/src/qpid/broker/QueueEvents.h
@@ -54,7 +54,7 @@ class QueueEvents
typedef boost::function<void (Event)> EventListener;
- QPID_BROKER_EXTERN QueueEvents(const boost::shared_ptr<sys::Poller>& poller);
+ QPID_BROKER_EXTERN QueueEvents(const boost::shared_ptr<sys::Poller>& poller, bool isSync = false);
QPID_BROKER_EXTERN ~QueueEvents();
QPID_BROKER_EXTERN void enqueued(const QueuedMessage&);
QPID_BROKER_EXTERN void dequeued(const QueuedMessage&);
@@ -65,6 +65,7 @@ class QueueEvents
void disable();
//process all outstanding events
QPID_BROKER_EXTERN void shutdown();
+ QPID_BROKER_EXTERN bool isSync();
private:
typedef qpid::sys::PollableQueue<Event> EventQueue;
typedef std::map<std::string, EventListener> Listeners;
@@ -73,6 +74,7 @@ class QueueEvents
Listeners listeners;
volatile bool enabled;
qpid::sys::Mutex lock;//protect listeners from concurrent access
+ bool sync;
EventQueue::Batch::const_iterator handle(const EventQueue::Batch& e);
diff --git a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
index 39afe90134..a8aa674c53 100644
--- a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
+++ b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
@@ -28,8 +28,8 @@
using namespace qpid::broker;
using namespace qpid::framing;
-QueuePolicy::QueuePolicy(uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) :
- maxCount(_maxCount), maxSize(_maxSize), type(_type), count(0), size(0), policyExceeded(false) {}
+QueuePolicy::QueuePolicy(const std::string& _name, uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) :
+ maxCount(_maxCount), maxSize(_maxSize), type(_type), count(0), size(0), policyExceeded(false), name(_name) {}
void QueuePolicy::enqueued(uint64_t _size)
{
@@ -39,18 +39,15 @@ void QueuePolicy::enqueued(uint64_t _size)
void QueuePolicy::dequeued(uint64_t _size)
{
- //Note: underflow detection is not reliable in the face of
- //concurrent updates (at present locking in Queue.cpp prevents
- //these anyway); updates are atomic and are safe regardless.
if (maxCount) {
- if (count.get() > 0) {
+ if (count > 0) {
--count;
} else {
throw Exception(QPID_MSG("Attempted count underflow on dequeue(" << _size << "): " << *this));
}
}
if (maxSize) {
- if (_size > size.get()) {
+ if (_size > size) {
throw Exception(QPID_MSG("Attempted size underflow on dequeue(" << _size << "): " << *this));
} else {
size -= _size;
@@ -58,47 +55,47 @@ void QueuePolicy::dequeued(uint64_t _size)
}
}
-bool QueuePolicy::checkLimit(const QueuedMessage& m)
+bool QueuePolicy::checkLimit(boost::intrusive_ptr<Message> m)
{
- bool sizeExceeded = maxSize && (size.get() + m.payload->contentSize()) > maxSize;
- bool countExceeded = maxCount && (count.get() + 1) > maxCount;
+ bool sizeExceeded = maxSize && (size + m->contentSize()) > maxSize;
+ bool countExceeded = maxCount && (count + 1) > maxCount;
bool exceeded = sizeExceeded || countExceeded;
if (exceeded) {
if (!policyExceeded) {
- policyExceeded = true;
- if (m.queue) {
- if (sizeExceeded) QPID_LOG(info, "Queue cumulative message size exceeded policy for " << m.queue->getName());
- if (countExceeded) QPID_LOG(info, "Queue message count exceeded policy for " << m.queue->getName());
- }
+ policyExceeded = true;
+ if (sizeExceeded) QPID_LOG(info, "Queue cumulative message size exceeded policy for " << name);
+ if (countExceeded) QPID_LOG(info, "Queue message count exceeded policy for " << name);
}
} else {
if (policyExceeded) {
policyExceeded = false;
- if (m.queue) {
- QPID_LOG(info, "Queue cumulative message size and message count within policy for " << m.queue->getName());
- }
+ QPID_LOG(info, "Queue cumulative message size and message count within policy for " << name);
}
}
return !exceeded;
}
-void QueuePolicy::tryEnqueue(const QueuedMessage& m)
+void QueuePolicy::tryEnqueue(boost::intrusive_ptr<Message> m)
{
if (checkLimit(m)) {
- enqueued(m);
+ enqueued(m->contentSize());
} else {
- std::string queue = m.queue ? m.queue->getName() : std::string("unknown queue");
- throw ResourceLimitExceededException(
- QPID_MSG("Policy exceeded on " << queue << " by message " << m.position
- << " of size " << m.payload->contentSize() << " , policy: " << *this));
+ throw ResourceLimitExceededException(QPID_MSG("Policy exceeded on " << name << ", policy: " << *this));
}
}
-void QueuePolicy::enqueued(const QueuedMessage& m)
+void QueuePolicy::recoverEnqueued(boost::intrusive_ptr<Message> m)
{
- enqueued(m.payload->contentSize());
+ enqueued(m->contentSize());
}
+void QueuePolicy::enqueueAborted(boost::intrusive_ptr<Message> m)
+{
+ dequeued(m->contentSize());
+}
+
+void QueuePolicy::enqueued(const QueuedMessage&) {}
+
void QueuePolicy::dequeued(const QueuedMessage& m)
{
dequeued(m.payload->contentSize());
@@ -132,7 +129,7 @@ std::string QueuePolicy::getType(const FieldTable& settings)
std::transform(t.begin(), t.end(), t.begin(), tolower);
if (t == REJECT || t == FLOW_TO_DISK || t == RING || t == RING_STRICT) return t;
}
- return FLOW_TO_DISK;
+ return REJECT;
}
void QueuePolicy::setDefaultMaxSize(uint64_t s)
@@ -140,6 +137,7 @@ void QueuePolicy::setDefaultMaxSize(uint64_t s)
defaultMaxSize = s;
}
+void QueuePolicy::getPendingDequeues(Messages&) {}
@@ -148,8 +146,8 @@ void QueuePolicy::encode(Buffer& buffer) const
{
buffer.putLong(maxCount);
buffer.putLongLong(maxSize);
- buffer.putLong(count.get());
- buffer.putLongLong(size.get());
+ buffer.putLong(count);
+ buffer.putLongLong(size);
}
void QueuePolicy::decode ( Buffer& buffer )
@@ -179,16 +177,18 @@ const std::string QueuePolicy::RING("ring");
const std::string QueuePolicy::RING_STRICT("ring_strict");
uint64_t QueuePolicy::defaultMaxSize(0);
-FlowToDiskPolicy::FlowToDiskPolicy(uint32_t _maxCount, uint64_t _maxSize) :
- QueuePolicy(_maxCount, _maxSize, FLOW_TO_DISK) {}
+FlowToDiskPolicy::FlowToDiskPolicy(const std::string& _name, uint32_t _maxCount, uint64_t _maxSize) :
+ QueuePolicy(_name, _maxCount, _maxSize, FLOW_TO_DISK) {}
-bool FlowToDiskPolicy::checkLimit(const QueuedMessage& m)
+bool FlowToDiskPolicy::checkLimit(boost::intrusive_ptr<Message> m)
{
- return QueuePolicy::checkLimit(m) || m.queue->releaseMessageContent(m);
+ if (!QueuePolicy::checkLimit(m)) m->requestContentRelease();
+ return true;
}
-RingQueuePolicy::RingQueuePolicy(uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) :
- QueuePolicy(_maxCount, _maxSize, _type), strict(_type == RING_STRICT) {}
+RingQueuePolicy::RingQueuePolicy(const std::string& _name,
+ uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) :
+ QueuePolicy(_name, _maxCount, _maxSize, _type), strict(_type == RING_STRICT) {}
bool before(const QueuedMessage& a, const QueuedMessage& b)
{
@@ -197,15 +197,12 @@ bool before(const QueuedMessage& a, const QueuedMessage& b)
void RingQueuePolicy::enqueued(const QueuedMessage& m)
{
- QueuePolicy::enqueued(m);
- qpid::sys::Mutex::ScopedLock l(lock);
//need to insert in correct location based on position
queue.insert(lower_bound(queue.begin(), queue.end(), m, before), m);
}
void RingQueuePolicy::dequeued(const QueuedMessage& m)
{
- qpid::sys::Mutex::ScopedLock l(lock);
//find and remove m from queue
if (find(m, pendingDequeues, true) || find(m, queue, true)) {
//now update count and size
@@ -215,49 +212,32 @@ void RingQueuePolicy::dequeued(const QueuedMessage& m)
bool RingQueuePolicy::isEnqueued(const QueuedMessage& m)
{
- qpid::sys::Mutex::ScopedLock l(lock);
//for non-strict ring policy, a message can be replaced (and
//therefore dequeued) before it is accepted or released by
//subscriber; need to detect this
return find(m, pendingDequeues, false) || find(m, queue, false);
}
-bool RingQueuePolicy::checkLimit(const QueuedMessage& m)
+bool RingQueuePolicy::checkLimit(boost::intrusive_ptr<Message> m)
{
if (QueuePolicy::checkLimit(m)) return true;//if haven't hit limit, ok to accept
QueuedMessage oldest;
- {
- qpid::sys::Mutex::ScopedLock l(lock);
- if (queue.empty()) {
- QPID_LOG(debug, "Message too large for ring queue "
- << (m.queue ? m.queue->getName() : std::string("unknown queue"))
- << " [" << *this << "] "
- << ": message size = " << m.payload->contentSize() << " bytes");
- return false;
- }
- oldest = queue.front();
+ if (queue.empty()) {
+ QPID_LOG(debug, "Message too large for ring queue " << name
+ << " [" << *this << "] "
+ << ": message size = " << m->contentSize() << " bytes");
+ return false;
}
+ oldest = queue.front();
if (oldest.queue->acquire(oldest) || !strict) {
- {
- //TODO: fix this! In the current code, this method is
- //only ever called with the Queue lock already taken. This
- //should not be relied upon going forward however and
- //clearly the locking in this class is insufficient as
- //there is no guarantee that the message previously atthe
- //front is still there.
- qpid::sys::Mutex::ScopedLock l(lock);
- queue.pop_front();
- pendingDequeues.push_back(oldest);
- }
- oldest.queue->addPendingDequeue(oldest);
- QPID_LOG(debug, "Ring policy triggered in queue "
- << (m.queue ? m.queue->getName() : std::string("unknown queue"))
- << ": removed message " << oldest.position << " to make way for " << m.position);
+ queue.pop_front();
+ pendingDequeues.push_back(oldest);
+ QPID_LOG(debug, "Ring policy triggered in " << name
+ << ": removed message " << oldest.position << " to make way for new message");
return true;
} else {
- QPID_LOG(debug, "Ring policy could not be triggered in queue "
- << (m.queue ? m.queue->getName() : std::string("unknown queue"))
+ QPID_LOG(debug, "Ring policy could not be triggered in " << name
<< ": oldest message (seq-no=" << oldest.position << ") has been delivered but not yet acknowledged or requeued");
//in strict mode, if oldest message has been delivered (hence
//cannot be acquired) but not yet acked, it should not be
@@ -266,6 +246,11 @@ bool RingQueuePolicy::checkLimit(const QueuedMessage& m)
}
}
+void RingQueuePolicy::getPendingDequeues(Messages& result)
+{
+ result = pendingDequeues;
+}
+
bool RingQueuePolicy::find(const QueuedMessage& m, Messages& q, bool remove)
{
for (Messages::iterator i = q.begin(); i != q.end(); i++) {
@@ -277,25 +262,36 @@ bool RingQueuePolicy::find(const QueuedMessage& m, Messages& q, bool remove)
return false;
}
+std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type)
+{
+ return createQueuePolicy("<unspecified>", maxCount, maxSize, type);
+}
+
std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const qpid::framing::FieldTable& settings)
{
+ return createQueuePolicy("<unspecified>", settings);
+}
+
+std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const std::string& name, const qpid::framing::FieldTable& settings)
+{
uint32_t maxCount = getInt(settings, maxCountKey, 0);
uint32_t maxSize = getInt(settings, maxSizeKey, defaultMaxSize);
if (maxCount || maxSize) {
- return createQueuePolicy(maxCount, maxSize, getType(settings));
+ return createQueuePolicy(name, maxCount, maxSize, getType(settings));
} else {
return std::auto_ptr<QueuePolicy>();
}
}
-std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type)
+std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const std::string& name,
+ uint32_t maxCount, uint64_t maxSize, const std::string& type)
{
if (type == RING || type == RING_STRICT) {
- return std::auto_ptr<QueuePolicy>(new RingQueuePolicy(maxCount, maxSize, type));
+ return std::auto_ptr<QueuePolicy>(new RingQueuePolicy(name, maxCount, maxSize, type));
} else if (type == FLOW_TO_DISK) {
- return std::auto_ptr<QueuePolicy>(new FlowToDiskPolicy(maxCount, maxSize));
+ return std::auto_ptr<QueuePolicy>(new FlowToDiskPolicy(name, maxCount, maxSize));
} else {
- return std::auto_ptr<QueuePolicy>(new QueuePolicy(maxCount, maxSize, type));
+ return std::auto_ptr<QueuePolicy>(new QueuePolicy(name, maxCount, maxSize, type));
}
}
@@ -305,10 +301,10 @@ namespace qpid {
std::ostream& operator<<(std::ostream& out, const QueuePolicy& p)
{
- if (p.maxSize) out << "size: max=" << p.maxSize << ", current=" << p.size.get();
+ if (p.maxSize) out << "size: max=" << p.maxSize << ", current=" << p.size;
else out << "size: unlimited";
out << "; ";
- if (p.maxCount) out << "count: max=" << p.maxCount << ", current=" << p.count.get();
+ if (p.maxCount) out << "count: max=" << p.maxCount << ", current=" << p.count;
else out << "count: unlimited";
out << "; type=" << p.type;
return out;
diff --git a/qpid/cpp/src/qpid/broker/QueuePolicy.h b/qpid/cpp/src/qpid/broker/QueuePolicy.h
index 54745876d5..b2937e94c7 100644
--- a/qpid/cpp/src/qpid/broker/QueuePolicy.h
+++ b/qpid/cpp/src/qpid/broker/QueuePolicy.h
@@ -40,14 +40,14 @@ class QueuePolicy
uint32_t maxCount;
uint64_t maxSize;
const std::string type;
- qpid::sys::AtomicValue<uint32_t> count;
- qpid::sys::AtomicValue<uint64_t> size;
+ uint32_t count;
+ uint64_t size;
bool policyExceeded;
static int getInt(const qpid::framing::FieldTable& settings, const std::string& key, int defaultValue);
- static std::string getType(const qpid::framing::FieldTable& settings);
public:
+ typedef std::deque<QueuedMessage> Messages;
static QPID_BROKER_EXTERN const std::string maxCountKey;
static QPID_BROKER_EXTERN const std::string maxSizeKey;
static QPID_BROKER_EXTERN const std::string typeKey;
@@ -57,27 +57,34 @@ class QueuePolicy
static QPID_BROKER_EXTERN const std::string RING_STRICT;
virtual ~QueuePolicy() {}
- QPID_BROKER_EXTERN void tryEnqueue(const QueuedMessage&);
+ QPID_BROKER_EXTERN void tryEnqueue(boost::intrusive_ptr<Message> msg);
+ QPID_BROKER_EXTERN void recoverEnqueued(boost::intrusive_ptr<Message> msg);
+ QPID_BROKER_EXTERN void enqueueAborted(boost::intrusive_ptr<Message> msg);
+ virtual void enqueued(const QueuedMessage&);
virtual void dequeued(const QueuedMessage&);
virtual bool isEnqueued(const QueuedMessage&);
- virtual bool checkLimit(const QueuedMessage&);
QPID_BROKER_EXTERN void update(qpid::framing::FieldTable& settings);
uint32_t getMaxCount() const { return maxCount; }
uint64_t getMaxSize() const { return maxSize; }
void encode(framing::Buffer& buffer) const;
void decode ( framing::Buffer& buffer );
uint32_t encodedSize() const;
+ virtual void getPendingDequeues(Messages& result);
-
+ static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const std::string& name, const qpid::framing::FieldTable& settings);
+ static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT);
static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const qpid::framing::FieldTable& settings);
static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT);
+ static std::string getType(const qpid::framing::FieldTable& settings);
static void setDefaultMaxSize(uint64_t);
friend QPID_BROKER_EXTERN std::ostream& operator<<(std::ostream&,
const QueuePolicy&);
protected:
- QueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT);
+ const std::string name;
- virtual void enqueued(const QueuedMessage&);
+ QueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT);
+
+ virtual bool checkLimit(boost::intrusive_ptr<Message> msg);
void enqueued(uint64_t size);
void dequeued(uint64_t size);
};
@@ -86,21 +93,20 @@ class QueuePolicy
class FlowToDiskPolicy : public QueuePolicy
{
public:
- FlowToDiskPolicy(uint32_t maxCount, uint64_t maxSize);
- bool checkLimit(const QueuedMessage&);
+ FlowToDiskPolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize);
+ bool checkLimit(boost::intrusive_ptr<Message> msg);
};
class RingQueuePolicy : public QueuePolicy
{
public:
- RingQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type = RING);
+ RingQueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = RING);
void enqueued(const QueuedMessage&);
void dequeued(const QueuedMessage&);
bool isEnqueued(const QueuedMessage&);
- bool checkLimit(const QueuedMessage&);
+ bool checkLimit(boost::intrusive_ptr<Message> msg);
+ void getPendingDequeues(Messages& result);
private:
- typedef std::deque<QueuedMessage> Messages;
- qpid::sys::Mutex lock;
Messages pendingDequeues;
Messages queue;
const bool strict;
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index bdd5f33601..7e3090bf17 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -65,7 +65,7 @@ SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss)
tagGenerator("sgen"),
dtxSelected(false),
authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isFederationLink()),
- userID(getSession().getConnection().getUserId().substr(0,getSession().getConnection().getUserId().find('@')))
+ userID(getSession().getConnection().getUserId())
{
acl = getSession().getBroker().getAcl();
}
@@ -302,6 +302,18 @@ bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg)
return !blocked;
}
+namespace {
+struct ConsumerName {
+ const SemanticState::ConsumerImpl& consumer;
+ ConsumerName(const SemanticState::ConsumerImpl& ci) : consumer(ci) {}
+};
+
+ostream& operator<<(ostream& o, const ConsumerName& pc) {
+ return o << pc.consumer.getName() << " on "
+ << pc.consumer.getParent().getSession().getSessionId();
+}
+}
+
void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg)
{
uint32_t originalMsgCredit = msgCredit;
@@ -312,7 +324,7 @@ void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg)
if (byteCredit != 0xFFFFFFFF) {
byteCredit -= msg->getRequiredCredit();
}
- QPID_LOG(debug, "Credit allocated for '" << name << "' on " << parent
+ QPID_LOG(debug, "Credit allocated for " << ConsumerName(*this)
<< ", was " << " bytes: " << originalByteCredit << " msgs: " << originalMsgCredit
<< " now bytes: " << byteCredit << " msgs: " << msgCredit);
@@ -320,15 +332,13 @@ void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg)
bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg)
{
- if (msgCredit == 0 || (byteCredit != 0xFFFFFFFF && byteCredit < msg->getRequiredCredit())) {
- QPID_LOG(debug, "Not enough credit for '" << name << "' on " << parent
- << ", bytes: " << byteCredit << " msgs: " << msgCredit);
- return false;
- } else {
- QPID_LOG(debug, "Credit available for '" << name << "' on " << parent
- << " bytes: " << byteCredit << " msgs: " << msgCredit);
- return true;
- }
+ bool enoughCredit = msgCredit > 0 &&
+ (byteCredit == 0xFFFFFFFF || byteCredit >= msg->getRequiredCredit());
+ QPID_LOG(debug, (enoughCredit ? "Sufficient credit for " : "Insufficient credit for ")
+ << ConsumerName(*this)
+ << ", have bytes: " << byteCredit << " msgs: " << msgCredit
+ << ", need " << msg->getRequiredCredit() << " bytes");
+ return enoughCredit;
}
SemanticState::ConsumerImpl::~ConsumerImpl() {}
@@ -356,6 +366,9 @@ void SemanticState::handle(intrusive_ptr<Message> msg) {
} else {
DeliverableMessage deliverable(msg);
route(msg, deliverable);
+ if (msg->checkContentReleasable()) {
+ msg->releaseContent();
+ }
}
}
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h
index da8383fc12..89fe7b83dd 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.h
+++ b/qpid/cpp/src/qpid/broker/SemanticState.h
@@ -129,6 +129,7 @@ class SemanticState : private boost::noncopyable {
const framing::FieldTable& getArguments() const { return arguments; }
SemanticState& getParent() { return *parent; }
+ const SemanticState& getParent() const { return *parent; }
};
private:
@@ -163,6 +164,7 @@ class SemanticState : private boost::noncopyable {
~SemanticState();
SessionContext& getSession() { return session; }
+ const SessionContext& getSession() const { return session; }
ConsumerImpl& find(const std::string& destination);
diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
index a1ad5a0a30..2ac6d66e62 100644
--- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -337,6 +337,10 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string&
params.insert(make_pair(acl::PROP_DURABLE, std::string(durable ? _TRUE : _FALSE)));
params.insert(make_pair(acl::PROP_EXCLUSIVE, std::string(exclusive ? _TRUE : _FALSE)));
params.insert(make_pair(acl::PROP_AUTODELETE, std::string(autoDelete ? _TRUE : _FALSE)));
+ params.insert(make_pair(acl::PROP_POLICYTYPE, arguments.getAsString("qpid.policy_type")));
+ params.insert(make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast<string>(arguments.getAsInt("qpid.max_count"))));
+ params.insert(make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast<string>(arguments.getAsInt64("qpid.max_size"))));
+
if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_QUEUE,name,&params) )
throw NotAllowedException(QPID_MSG("ACL denied queue create request from " << getConnection().getUserId()));
}
@@ -472,8 +476,7 @@ SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName,
AclModule* acl = getBroker().getAcl();
if (acl)
- {
- // add flags as needed
+ {
if (!acl->authorise(getConnection().getUserId(),acl::ACT_CONSUME,acl::OBJ_QUEUE,queueName,NULL) )
throw NotAllowedException(QPID_MSG("ACL denied Queue subscribe request from " << getConnection().getUserId()));
}
diff --git a/qpid/cpp/src/qpid/broker/SessionContext.h b/qpid/cpp/src/qpid/broker/SessionContext.h
index cfdbd100c3..afbbb2cc22 100644
--- a/qpid/cpp/src/qpid/broker/SessionContext.h
+++ b/qpid/cpp/src/qpid/broker/SessionContext.h
@@ -28,7 +28,7 @@
#include "qpid/sys/OutputControl.h"
#include "qpid/broker/ConnectionState.h"
#include "qpid/broker/OwnershipToken.h"
-
+#include "qpid/SessionId.h"
#include <boost/noncopyable.hpp>
@@ -45,6 +45,7 @@ class SessionContext : public OwnershipToken, public sys::OutputControl
virtual framing::AMQP_ClientProxy& getProxy() = 0;
virtual Broker& getBroker() = 0;
virtual uint16_t getChannel() const = 0;
+ virtual const SessionId& getSessionId() const = 0;
};
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h
index 67fd4f4f38..eade93ddaa 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.h
+++ b/qpid/cpp/src/qpid/broker/SessionState.h
@@ -118,6 +118,8 @@ class SessionState : public qpid::SessionState,
bool processSendCredit(uint32_t msgs);
+ const SessionId& getSessionId() const { return getId(); }
+
private:
void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id);
diff --git a/qpid/cpp/src/qpid/broker/SignalHandler.cpp b/qpid/cpp/src/qpid/broker/SignalHandler.cpp
index f4a3822554..b565cfd419 100644
--- a/qpid/cpp/src/qpid/broker/SignalHandler.cpp
+++ b/qpid/cpp/src/qpid/broker/SignalHandler.cpp
@@ -38,6 +38,8 @@ void SignalHandler::setBroker(const boost::intrusive_ptr<Broker>& b) {
signal(SIGCHLD,SIG_IGN);
}
+void SignalHandler::shutdown() { shutdownHandler(0); }
+
void SignalHandler::shutdownHandler(int) {
if (broker.get()) {
broker->shutdown();
diff --git a/qpid/cpp/src/qpid/broker/SignalHandler.h b/qpid/cpp/src/qpid/broker/SignalHandler.h
index d2cdfae07c..bbe831b61d 100644
--- a/qpid/cpp/src/qpid/broker/SignalHandler.h
+++ b/qpid/cpp/src/qpid/broker/SignalHandler.h
@@ -38,6 +38,9 @@ class SignalHandler
/** Set the broker to be shutdown on signals */
static void setBroker(const boost::intrusive_ptr<Broker>& broker);
+ /** Initiate shut-down of broker */
+ static void shutdown();
+
private:
static void shutdownHandler(int);
static boost::intrusive_ptr<Broker> broker;
diff --git a/qpid/cpp/src/qpid/broker/TopicExchange.cpp b/qpid/cpp/src/qpid/broker/TopicExchange.cpp
index 6bf0b104ea..cb04742677 100644
--- a/qpid/cpp/src/qpid/broker/TopicExchange.cpp
+++ b/qpid/cpp/src/qpid/broker/TopicExchange.cpp
@@ -293,44 +293,23 @@ bool TopicExchange::isBound(Queue::shared_ptr queue, const string& pattern)
return q != qv.end();
}
-void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){
+void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/)
+{
Binding::vector mb;
+ BindingList b(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >);
PreRoute pr(msg, this);
- uint32_t count(0);
-
{
- RWlock::ScopedRlock l(lock);
- for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
- if (match(i->first, routingKey)) {
- Binding::vector& qv(i->second.bindingVector);
- for(Binding::vector::iterator j = qv.begin(); j != qv.end(); j++, count++){
- mb.push_back(*j);
+ RWlock::ScopedRlock l(lock);
+ for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
+ if (match(i->first, routingKey)) {
+ Binding::vector& qv(i->second.bindingVector);
+ for(Binding::vector::iterator j = qv.begin(); j != qv.end(); j++){
+ b->push_back(*j);
+ }
}
}
}
- }
-
- for (Binding::vector::iterator j = mb.begin(); j != mb.end(); ++j) {
- msg.deliverTo((*j)->queue);
- if ((*j)->mgmtBinding != 0)
- (*j)->mgmtBinding->inc_msgMatched ();
- }
-
- if (mgmtExchange != 0)
- {
- mgmtExchange->inc_msgReceives ();
- mgmtExchange->inc_byteReceives (msg.contentSize ());
- if (count == 0)
- {
- mgmtExchange->inc_msgDrops ();
- mgmtExchange->inc_byteDrops (msg.contentSize ());
- }
- else
- {
- mgmtExchange->inc_msgRoutes (count);
- mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
- }
- }
+ doRoute(msg, b);
}
bool TopicExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const)
diff --git a/qpid/cpp/src/qpid/broker/TxAccept.cpp b/qpid/cpp/src/qpid/broker/TxAccept.cpp
index e47ac84990..928ac12c10 100644
--- a/qpid/cpp/src/qpid/broker/TxAccept.cpp
+++ b/qpid/cpp/src/qpid/broker/TxAccept.cpp
@@ -88,7 +88,13 @@ bool TxAccept::prepare(TransactionContext* ctxt) throw()
void TxAccept::commit() throw()
{
- ops.commit();
+ try {
+ ops.commit();
+ } catch (const std::exception& e) {
+ QPID_LOG(error, "Failed to commit: " << e.what());
+ } catch(...) {
+ QPID_LOG(error, "Failed to commit (unknown error)");
+ }
}
void TxAccept::rollback() throw() {}
diff --git a/qpid/cpp/src/qpid/broker/TxPublish.cpp b/qpid/cpp/src/qpid/broker/TxPublish.cpp
index 17b99fd883..4b083033ea 100644
--- a/qpid/cpp/src/qpid/broker/TxPublish.cpp
+++ b/qpid/cpp/src/qpid/broker/TxPublish.cpp
@@ -26,9 +26,14 @@ using namespace qpid::broker;
TxPublish::TxPublish(intrusive_ptr<Message> _msg) : msg(_msg) {}
-bool TxPublish::prepare(TransactionContext* ctxt) throw(){
+bool TxPublish::prepare(TransactionContext* ctxt) throw()
+{
try{
- for_each(queues.begin(), queues.end(), Prepare(ctxt, msg));
+ while (!queues.empty()) {
+ prepare(ctxt, queues.front());
+ prepared.push_back(queues.front());
+ queues.pop_front();
+ }
return true;
}catch(const std::exception& e){
QPID_LOG(error, "Failed to prepare: " << e.what());
@@ -38,11 +43,30 @@ bool TxPublish::prepare(TransactionContext* ctxt) throw(){
return false;
}
-void TxPublish::commit() throw(){
- for_each(queues.begin(), queues.end(), Commit(msg));
+void TxPublish::commit() throw()
+{
+ try {
+ for_each(prepared.begin(), prepared.end(), Commit(msg));
+ if (msg->checkContentReleasable()) {
+ msg->releaseContent();
+ }
+ } catch (const std::exception& e) {
+ QPID_LOG(error, "Failed to commit: " << e.what());
+ } catch(...) {
+ QPID_LOG(error, "Failed to commit (unknown error)");
+ }
}
-void TxPublish::rollback() throw(){
+void TxPublish::rollback() throw()
+{
+ try {
+ for_each(prepared.begin(), prepared.end(), Rollback(msg));
+ } catch (const std::exception& e) {
+ QPID_LOG(error, "Failed to complete rollback: " << e.what());
+ } catch(...) {
+ QPID_LOG(error, "Failed to complete rollback (unknown error)");
+ }
+
}
void TxPublish::deliverTo(const boost::shared_ptr<Queue>& queue){
@@ -54,16 +78,14 @@ void TxPublish::deliverTo(const boost::shared_ptr<Queue>& queue){
}
}
-TxPublish::Prepare::Prepare(TransactionContext* _ctxt, intrusive_ptr<Message>& _msg)
- : ctxt(_ctxt), msg(_msg){}
-
-void TxPublish::Prepare::operator()(const boost::shared_ptr<Queue>& queue){
+void TxPublish::prepare(TransactionContext* ctxt, const boost::shared_ptr<Queue> queue)
+{
if (!queue->enqueue(ctxt, msg)){
/**
- * if not store then mark message for ack and deleivery once
- * commit happens, as async IO will never set it when no store
- * exists
- */
+ * if not store then mark message for ack and deleivery once
+ * commit happens, as async IO will never set it when no store
+ * exists
+ */
msg->enqueueComplete();
}
}
@@ -74,6 +96,12 @@ void TxPublish::Commit::operator()(const boost::shared_ptr<Queue>& queue){
queue->process(msg);
}
+TxPublish::Rollback::Rollback(intrusive_ptr<Message>& _msg) : msg(_msg){}
+
+void TxPublish::Rollback::operator()(const boost::shared_ptr<Queue>& queue){
+ queue->enqueueAborted(msg);
+}
+
uint64_t TxPublish::contentSize ()
{
return msg->contentSize ();
diff --git a/qpid/cpp/src/qpid/broker/TxPublish.h b/qpid/cpp/src/qpid/broker/TxPublish.h
index d5cf5639c4..b6ab9767ab 100644
--- a/qpid/cpp/src/qpid/broker/TxPublish.h
+++ b/qpid/cpp/src/qpid/broker/TxPublish.h
@@ -47,23 +47,25 @@ namespace qpid {
* dispatch or to be added to the in-memory queue.
*/
class TxPublish : public TxOp, public Deliverable{
- class Prepare{
- TransactionContext* ctxt;
+
+ class Commit{
boost::intrusive_ptr<Message>& msg;
public:
- Prepare(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg);
+ Commit(boost::intrusive_ptr<Message>& msg);
void operator()(const boost::shared_ptr<Queue>& queue);
};
-
- class Commit{
+ class Rollback{
boost::intrusive_ptr<Message>& msg;
public:
- Commit(boost::intrusive_ptr<Message>& msg);
+ Rollback(boost::intrusive_ptr<Message>& msg);
void operator()(const boost::shared_ptr<Queue>& queue);
};
boost::intrusive_ptr<Message> msg;
std::list<Queue::shared_ptr> queues;
+ std::list<Queue::shared_ptr> prepared;
+
+ void prepare(TransactionContext* ctxt, boost::shared_ptr<Queue>);
public:
QPID_BROKER_EXTERN TxPublish(boost::intrusive_ptr<Message> msg);