summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorCarl C. Trieloff <cctrieloff@apache.org>2009-09-23 20:34:48 +0000
committerCarl C. Trieloff <cctrieloff@apache.org>2009-09-23 20:34:48 +0000
commitf022060ada6a14a91a762bbe3cd351244f8c06a5 (patch)
tree12064e0f37c03d8170b16fd69e5cfdbff62936d8 /cpp/src
parente22a66333c43313a480273577d8834a1a04fabdc (diff)
downloadqpid-python-f022060ada6a14a91a762bbe3cd351244f8c06a5.tar.gz
This patch requires svn 817742, corrects the lock issue for ring queue in 817742, and protects replication when used together with flow to disk
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@818244 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/Broker.cpp8
-rw-r--r--cpp/src/qpid/broker/Broker.h1
-rw-r--r--cpp/src/qpid/broker/Queue.cpp28
-rw-r--r--cpp/src/qpid/broker/QueueEvents.cpp41
-rw-r--r--cpp/src/qpid/broker/QueueEvents.h4
5 files changed, 58 insertions, 24 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 13cf88fb11..427226f77e 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -91,7 +91,8 @@ Broker::Options::Options(const std::string& name) :
queueLimit(100*1048576/*100M default limit*/),
tcpNoDelay(false),
requireEncrypted(false),
- maxSessionRate(0)
+ maxSessionRate(0),
+ asyncQueueEvents(true)
{
int c = sys::SystemInfo::concurrency();
workerThreads=c+1;
@@ -121,7 +122,8 @@ Broker::Options::Options(const std::string& name) :
("tcp-nodelay", optValue(tcpNoDelay), "Set TCP_NODELAY on TCP connections")
("require-encryption", optValue(requireEncrypted), "Only accept connections that are encrypted")
("known-hosts-url", optValue(knownHosts, "URL or 'none'"), "URL to send as 'known-hosts' to clients ('none' implies empty list)")
- ("max-session-rate", optValue(maxSessionRate, "MESSAGES/S"), "Sets the maximum message rate per session (0=unlimited)");
+ ("max-session-rate", optValue(maxSessionRate, "MESSAGES/S"), "Sets the maximum message rate per session (0=unlimited)")
+ ("async-queue-events", optValue(asyncQueueEvents, "yes|no"), "Set Queue Events async, used for services like replication");
}
const std::string empty;
@@ -150,7 +152,7 @@ Broker::Broker(const Broker::Options& conf) :
*this),
managementAgent(conf.enableMgmt ? new ManagementAgent() : 0),
queueCleaner(queues, timer),
- queueEvents(poller),
+ queueEvents(poller,!conf.asyncQueueEvents),
recovery(true),
expiryPolicy(new ExpiryPolicy),
getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this))
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index 0517ceca95..87fa45d947 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -111,6 +111,7 @@ public:
bool requireEncrypted;
std::string knownHosts;
uint32_t maxSessionRate;
+ bool asyncQueueEvents;
private:
std::string getHome();
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 1cc48a949e..80794f791f 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -569,9 +569,6 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
{
Mutex::ScopedLock locker(messageLock);
QueuedMessage qm(this, msg, ++sequence);
- if (policy.get()) {
- policy->enqueued(qm);
- }
if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence);
LVQ::iterator i;
@@ -605,6 +602,10 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
if (eventMgr) eventMgr->enqueued(qm);
else QPID_LOG(warning, "Enqueue manager not set, events not generated for " << getName());
}
+ if (policy.get()) {
+ Mutex::ScopedUnlock locker(messageLock);
+ policy->enqueued(qm);
+ }
}
copy.notify();
}
@@ -792,9 +793,16 @@ void Queue::create(const FieldTable& _settings)
void Queue::configure(const FieldTable& _settings, bool recovering)
{
+
+ eventMode = _settings.getAsInt(qpidQueueEventGeneration);
+
if (QueuePolicy::getType(_settings) == QueuePolicy::FLOW_TO_DISK &&
- (!store || NullMessageStore::isNullStore(store))) {
- QPID_LOG(warning, "Flow to disk not valid for non-persisted queue");
+ (!store || NullMessageStore::isNullStore(store) || (eventMode && eventMgr && !eventMgr->isSync()) )) {
+ if ( NullMessageStore::isNullStore(store)) {
+ QPID_LOG(warning, "Flow to disk not valid for non-persisted queue:" << getName());
+ } else if (eventMgr && !eventMgr->isSync() ) {
+ QPID_LOG(warning, "Flow to disk not valid with async Queue Events:" << getName());
+ }
FieldTable copy(_settings);
copy.erase(QueuePolicy::typeKey);
setPolicy(QueuePolicy::createQueuePolicy(getName(), copy));
@@ -803,19 +811,19 @@ void Queue::configure(const FieldTable& _settings, bool recovering)
}
//set this regardless of owner to allow use of no-local with exclusive consumers also
noLocal = _settings.get(qpidNoLocal);
- QPID_LOG(debug, "Configured queue with no-local=" << noLocal);
+ QPID_LOG(debug, "Configured queue " << getName() << " with no-local=" << noLocal);
lastValueQueue= _settings.get(qpidLastValueQueue);
- if (lastValueQueue) QPID_LOG(debug, "Configured queue as Last Value Queue");
+ if (lastValueQueue) QPID_LOG(debug, "Configured queue as Last Value Queue for: " << getName());
lastValueQueueNoBrowse = _settings.get(qpidLastValueQueueNoBrowse);
if (lastValueQueueNoBrowse){
- QPID_LOG(debug, "Configured queue as Last Value Queue No Browse");
+ QPID_LOG(debug, "Configured queue as Last Value Queue No Browse for: " << getName());
lastValueQueue = lastValueQueueNoBrowse;
}
persistLastNode= _settings.get(qpidPersistLastNode);
- if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node");
+ if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node for: " << getName());
traceId = _settings.getAsString(qpidTraceIdentity);
std::string excludeList = _settings.getAsString(qpidTraceExclude);
@@ -825,8 +833,6 @@ void Queue::configure(const FieldTable& _settings, bool recovering)
QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId
<< "' and qpid.trace.exclude='"<< excludeList << "' i.e. " << traceExclude.size() << " elements");
- eventMode = _settings.getAsInt(qpidQueueEventGeneration);
-
FieldTable::ValuePtr p =_settings.get(qpidInsertSequenceNumbers);
if (p && p->convertsTo<std::string>()) insertSequenceNumbers(p->get<std::string>());
diff --git a/cpp/src/qpid/broker/QueueEvents.cpp b/cpp/src/qpid/broker/QueueEvents.cpp
index 6df869673d..bba054b0b8 100644
--- a/cpp/src/qpid/broker/QueueEvents.cpp
+++ b/cpp/src/qpid/broker/QueueEvents.cpp
@@ -25,25 +25,41 @@
namespace qpid {
namespace broker {
-QueueEvents::QueueEvents(const boost::shared_ptr<sys::Poller>& poller) :
- eventQueue(boost::bind(&QueueEvents::handle, this, _1), poller), enabled(true)
+QueueEvents::QueueEvents(const boost::shared_ptr<sys::Poller>& poller, bool isSync) :
+ eventQueue(boost::bind(&QueueEvents::handle, this, _1), poller), enabled(true), sync(isSync)
{
- eventQueue.start();
+ if (!sync) eventQueue.start();
}
QueueEvents::~QueueEvents()
{
- eventQueue.stop();
+ if (!sync) eventQueue.stop();
}
void QueueEvents::enqueued(const QueuedMessage& m)
{
- if (enabled) eventQueue.push(Event(ENQUEUE, m));
+ if (enabled) {
+ Event enq(ENQUEUE, m);
+ if (sync) {
+ for (Listeners::iterator j = listeners.begin(); j != listeners.end(); j++)
+ j->second(enq);
+ } else {
+ eventQueue.push(enq);
+ }
+ }
}
void QueueEvents::dequeued(const QueuedMessage& m)
{
- if (enabled) eventQueue.push(Event(DEQUEUE, m));
+ if (enabled) {
+ Event deq(DEQUEUE, m);
+ if (sync) {
+ for (Listeners::iterator j = listeners.begin(); j != listeners.end(); j++)
+ j->second(deq);
+ } else {
+ eventQueue.push(Event(DEQUEUE, m));
+ }
+ }
}
void QueueEvents::registerListener(const std::string& id, const EventListener& listener)
@@ -70,15 +86,16 @@ QueueEvents::EventQueue::Batch::const_iterator
QueueEvents::handle(const EventQueue::Batch& events) {
qpid::sys::Mutex::ScopedLock l(lock);
for (EventQueue::Batch::const_iterator i = events.begin(); i != events.end(); ++i) {
- for (Listeners::iterator j = listeners.begin(); j != listeners.end(); j++)
- j->second(*i);
+ for (Listeners::iterator j = listeners.begin(); j != listeners.end(); j++) {
+ j->second(*i);
+ }
}
return events.end();
}
void QueueEvents::shutdown()
{
- if (!eventQueue.empty() && !listeners.empty()) eventQueue.shutdown();
+ if (!sync && !eventQueue.empty() && !listeners.empty()) eventQueue.shutdown();
}
void QueueEvents::enable()
@@ -93,6 +110,12 @@ void QueueEvents::disable()
QPID_LOG(debug, "Queue events disabled");
}
+bool QueueEvents::isSync()
+{
+ return sync;
+}
+
+
QueueEvents::Event::Event(EventType t, const QueuedMessage& m) : type(t), msg(m) {}
diff --git a/cpp/src/qpid/broker/QueueEvents.h b/cpp/src/qpid/broker/QueueEvents.h
index 6826c6e79a..c42752133e 100644
--- a/cpp/src/qpid/broker/QueueEvents.h
+++ b/cpp/src/qpid/broker/QueueEvents.h
@@ -54,7 +54,7 @@ class QueueEvents
typedef boost::function<void (Event)> EventListener;
- QPID_BROKER_EXTERN QueueEvents(const boost::shared_ptr<sys::Poller>& poller);
+ QPID_BROKER_EXTERN QueueEvents(const boost::shared_ptr<sys::Poller>& poller, bool isSync = false);
QPID_BROKER_EXTERN ~QueueEvents();
QPID_BROKER_EXTERN void enqueued(const QueuedMessage&);
QPID_BROKER_EXTERN void dequeued(const QueuedMessage&);
@@ -65,6 +65,7 @@ class QueueEvents
void disable();
//process all outstanding events
QPID_BROKER_EXTERN void shutdown();
+ QPID_BROKER_EXTERN bool isSync();
private:
typedef qpid::sys::PollableQueue<Event> EventQueue;
typedef std::map<std::string, EventListener> Listeners;
@@ -73,6 +74,7 @@ class QueueEvents
Listeners listeners;
volatile bool enabled;
qpid::sys::Mutex lock;//protect listeners from concurrent access
+ bool sync;
EventQueue::Batch::const_iterator handle(const EventQueue::Batch& e);