diff options
author | Carl C. Trieloff <cctrieloff@apache.org> | 2009-09-23 20:34:48 +0000 |
---|---|---|
committer | Carl C. Trieloff <cctrieloff@apache.org> | 2009-09-23 20:34:48 +0000 |
commit | f022060ada6a14a91a762bbe3cd351244f8c06a5 (patch) | |
tree | 12064e0f37c03d8170b16fd69e5cfdbff62936d8 /cpp/src | |
parent | e22a66333c43313a480273577d8834a1a04fabdc (diff) | |
download | qpid-python-f022060ada6a14a91a762bbe3cd351244f8c06a5.tar.gz |
This patch requires svn 817742, corrects the lock issue for ring queue in 817742, and protects replication when used together with flow to disk
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@818244 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 28 | ||||
-rw-r--r-- | cpp/src/qpid/broker/QueueEvents.cpp | 41 | ||||
-rw-r--r-- | cpp/src/qpid/broker/QueueEvents.h | 4 |
5 files changed, 58 insertions, 24 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 13cf88fb11..427226f77e 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -91,7 +91,8 @@ Broker::Options::Options(const std::string& name) : queueLimit(100*1048576/*100M default limit*/), tcpNoDelay(false), requireEncrypted(false), - maxSessionRate(0) + maxSessionRate(0), + asyncQueueEvents(true) { int c = sys::SystemInfo::concurrency(); workerThreads=c+1; @@ -121,7 +122,8 @@ Broker::Options::Options(const std::string& name) : ("tcp-nodelay", optValue(tcpNoDelay), "Set TCP_NODELAY on TCP connections") ("require-encryption", optValue(requireEncrypted), "Only accept connections that are encrypted") ("known-hosts-url", optValue(knownHosts, "URL or 'none'"), "URL to send as 'known-hosts' to clients ('none' implies empty list)") - ("max-session-rate", optValue(maxSessionRate, "MESSAGES/S"), "Sets the maximum message rate per session (0=unlimited)"); + ("max-session-rate", optValue(maxSessionRate, "MESSAGES/S"), "Sets the maximum message rate per session (0=unlimited)") + ("async-queue-events", optValue(asyncQueueEvents, "yes|no"), "Set Queue Events async, used for services like replication"); } const std::string empty; @@ -150,7 +152,7 @@ Broker::Broker(const Broker::Options& conf) : *this), managementAgent(conf.enableMgmt ? new ManagementAgent() : 0), queueCleaner(queues, timer), - queueEvents(poller), + queueEvents(poller,!conf.asyncQueueEvents), recovery(true), expiryPolicy(new ExpiryPolicy), getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this)) diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 0517ceca95..87fa45d947 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -111,6 +111,7 @@ public: bool requireEncrypted; std::string knownHosts; uint32_t maxSessionRate; + bool asyncQueueEvents; private: std::string getHome(); diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 1cc48a949e..80794f791f 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -569,9 +569,6 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ { Mutex::ScopedLock locker(messageLock); QueuedMessage qm(this, msg, ++sequence); - if (policy.get()) { - policy->enqueued(qm); - } if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence); LVQ::iterator i; @@ -605,6 +602,10 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ if (eventMgr) eventMgr->enqueued(qm); else QPID_LOG(warning, "Enqueue manager not set, events not generated for " << getName()); } + if (policy.get()) { + Mutex::ScopedUnlock locker(messageLock); + policy->enqueued(qm); + } } copy.notify(); } @@ -792,9 +793,16 @@ void Queue::create(const FieldTable& _settings) void Queue::configure(const FieldTable& _settings, bool recovering) { + + eventMode = _settings.getAsInt(qpidQueueEventGeneration); + if (QueuePolicy::getType(_settings) == QueuePolicy::FLOW_TO_DISK && - (!store || NullMessageStore::isNullStore(store))) { - QPID_LOG(warning, "Flow to disk not valid for non-persisted queue"); + (!store || NullMessageStore::isNullStore(store) || (eventMode && eventMgr && !eventMgr->isSync()) )) { + if ( NullMessageStore::isNullStore(store)) { + QPID_LOG(warning, "Flow to disk not valid for non-persisted queue:" << getName()); + } else if (eventMgr && !eventMgr->isSync() ) { + QPID_LOG(warning, "Flow to disk not valid with async Queue Events:" << getName()); + } FieldTable copy(_settings); copy.erase(QueuePolicy::typeKey); setPolicy(QueuePolicy::createQueuePolicy(getName(), copy)); @@ -803,19 +811,19 @@ void Queue::configure(const FieldTable& _settings, bool recovering) } //set this regardless of owner to allow use of no-local with exclusive consumers also noLocal = _settings.get(qpidNoLocal); - QPID_LOG(debug, "Configured queue with no-local=" << noLocal); + QPID_LOG(debug, "Configured queue " << getName() << " with no-local=" << noLocal); lastValueQueue= _settings.get(qpidLastValueQueue); - if (lastValueQueue) QPID_LOG(debug, "Configured queue as Last Value Queue"); + if (lastValueQueue) QPID_LOG(debug, "Configured queue as Last Value Queue for: " << getName()); lastValueQueueNoBrowse = _settings.get(qpidLastValueQueueNoBrowse); if (lastValueQueueNoBrowse){ - QPID_LOG(debug, "Configured queue as Last Value Queue No Browse"); + QPID_LOG(debug, "Configured queue as Last Value Queue No Browse for: " << getName()); lastValueQueue = lastValueQueueNoBrowse; } persistLastNode= _settings.get(qpidPersistLastNode); - if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node"); + if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node for: " << getName()); traceId = _settings.getAsString(qpidTraceIdentity); std::string excludeList = _settings.getAsString(qpidTraceExclude); @@ -825,8 +833,6 @@ void Queue::configure(const FieldTable& _settings, bool recovering) QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId << "' and qpid.trace.exclude='"<< excludeList << "' i.e. " << traceExclude.size() << " elements"); - eventMode = _settings.getAsInt(qpidQueueEventGeneration); - FieldTable::ValuePtr p =_settings.get(qpidInsertSequenceNumbers); if (p && p->convertsTo<std::string>()) insertSequenceNumbers(p->get<std::string>()); diff --git a/cpp/src/qpid/broker/QueueEvents.cpp b/cpp/src/qpid/broker/QueueEvents.cpp index 6df869673d..bba054b0b8 100644 --- a/cpp/src/qpid/broker/QueueEvents.cpp +++ b/cpp/src/qpid/broker/QueueEvents.cpp @@ -25,25 +25,41 @@ namespace qpid { namespace broker { -QueueEvents::QueueEvents(const boost::shared_ptr<sys::Poller>& poller) : - eventQueue(boost::bind(&QueueEvents::handle, this, _1), poller), enabled(true) +QueueEvents::QueueEvents(const boost::shared_ptr<sys::Poller>& poller, bool isSync) : + eventQueue(boost::bind(&QueueEvents::handle, this, _1), poller), enabled(true), sync(isSync) { - eventQueue.start(); + if (!sync) eventQueue.start(); } QueueEvents::~QueueEvents() { - eventQueue.stop(); + if (!sync) eventQueue.stop(); } void QueueEvents::enqueued(const QueuedMessage& m) { - if (enabled) eventQueue.push(Event(ENQUEUE, m)); + if (enabled) { + Event enq(ENQUEUE, m); + if (sync) { + for (Listeners::iterator j = listeners.begin(); j != listeners.end(); j++) + j->second(enq); + } else { + eventQueue.push(enq); + } + } } void QueueEvents::dequeued(const QueuedMessage& m) { - if (enabled) eventQueue.push(Event(DEQUEUE, m)); + if (enabled) { + Event deq(DEQUEUE, m); + if (sync) { + for (Listeners::iterator j = listeners.begin(); j != listeners.end(); j++) + j->second(deq); + } else { + eventQueue.push(Event(DEQUEUE, m)); + } + } } void QueueEvents::registerListener(const std::string& id, const EventListener& listener) @@ -70,15 +86,16 @@ QueueEvents::EventQueue::Batch::const_iterator QueueEvents::handle(const EventQueue::Batch& events) { qpid::sys::Mutex::ScopedLock l(lock); for (EventQueue::Batch::const_iterator i = events.begin(); i != events.end(); ++i) { - for (Listeners::iterator j = listeners.begin(); j != listeners.end(); j++) - j->second(*i); + for (Listeners::iterator j = listeners.begin(); j != listeners.end(); j++) { + j->second(*i); + } } return events.end(); } void QueueEvents::shutdown() { - if (!eventQueue.empty() && !listeners.empty()) eventQueue.shutdown(); + if (!sync && !eventQueue.empty() && !listeners.empty()) eventQueue.shutdown(); } void QueueEvents::enable() @@ -93,6 +110,12 @@ void QueueEvents::disable() QPID_LOG(debug, "Queue events disabled"); } +bool QueueEvents::isSync() +{ + return sync; +} + + QueueEvents::Event::Event(EventType t, const QueuedMessage& m) : type(t), msg(m) {} diff --git a/cpp/src/qpid/broker/QueueEvents.h b/cpp/src/qpid/broker/QueueEvents.h index 6826c6e79a..c42752133e 100644 --- a/cpp/src/qpid/broker/QueueEvents.h +++ b/cpp/src/qpid/broker/QueueEvents.h @@ -54,7 +54,7 @@ class QueueEvents typedef boost::function<void (Event)> EventListener; - QPID_BROKER_EXTERN QueueEvents(const boost::shared_ptr<sys::Poller>& poller); + QPID_BROKER_EXTERN QueueEvents(const boost::shared_ptr<sys::Poller>& poller, bool isSync = false); QPID_BROKER_EXTERN ~QueueEvents(); QPID_BROKER_EXTERN void enqueued(const QueuedMessage&); QPID_BROKER_EXTERN void dequeued(const QueuedMessage&); @@ -65,6 +65,7 @@ class QueueEvents void disable(); //process all outstanding events QPID_BROKER_EXTERN void shutdown(); + QPID_BROKER_EXTERN bool isSync(); private: typedef qpid::sys::PollableQueue<Event> EventQueue; typedef std::map<std::string, EventListener> Listeners; @@ -73,6 +74,7 @@ class QueueEvents Listeners listeners; volatile bool enabled; qpid::sys::Mutex lock;//protect listeners from concurrent access + bool sync; EventQueue::Batch::const_iterator handle(const EventQueue::Batch& e); |