diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/Broker.h | 16 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Connection.cpp | 51 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Connection.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DtxManager.cpp | 5 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DtxManager.h | 8 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DtxTimeout.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/broker/LinkRegistry.cpp | 10 | ||||
-rw-r--r-- | cpp/src/qpid/broker/LinkRegistry.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/broker/NullMessageStore.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/QueueCleaner.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/broker/QueueCleaner.h | 12 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 17 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionState.h | 7 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ExpiryPolicy.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ExpiryPolicy.h | 9 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementAgent.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementAgent.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/sys/DispatchHandle.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/sys/Timer.cpp | 12 | ||||
-rw-r--r-- | cpp/src/qpid/sys/Timer.h | 3 |
20 files changed, 103 insertions, 99 deletions
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 1e0ac64e01..8f4621bb39 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -36,6 +36,7 @@ #include "QueueEvents.h" #include "Vhost.h" #include "System.h" +#include "Timer.h" #include "ExpiryPolicy.h" #include "qpid/management/Manageable.h" #include "qpid/management/ManagementAgent.h" @@ -48,7 +49,6 @@ #include "qpid/framing/OutputHandler.h" #include "qpid/framing/ProtocolInitiation.h" #include "qpid/sys/Runnable.h" -#include "qpid/sys/Timer.h" #include "qpid/RefCounted.h" #include "AclModule.h" @@ -112,14 +112,13 @@ public: std::string knownHosts; uint32_t maxSessionRate; }; - + private: typedef std::map<std::string, boost::shared_ptr<sys::ProtocolFactory> > ProtocolFactoryMap; void declareStandardExchange(const std::string& name, const std::string& type); boost::shared_ptr<sys::Poller> poller; - sys::Timer timer; Options config; ProtocolFactoryMap protocolFactories; std::auto_ptr<MessageStore> store; @@ -130,6 +129,7 @@ public: ExchangeRegistry exchanges; LinkRegistry links; boost::shared_ptr<sys::ConnectionCodec::Factory> factory; + Timer timer; DtxManager dtxManager; SessionManager sessionManager; management::ManagementAgent* managementAgent; @@ -145,6 +145,8 @@ public: boost::intrusive_ptr<ExpiryPolicy> expiryPolicy; public: + + virtual ~Broker(); QPID_BROKER_EXTERN Broker(const Options& configuration); @@ -183,7 +185,7 @@ public: void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) { expiryPolicy = e; } boost::intrusive_ptr<ExpiryPolicy> getExpiryPolicy() { return expiryPolicy; } - + SessionManager& getSessionManager() { return sessionManager; } const std::string& getFederationTag() const { return federationTag; } @@ -192,7 +194,7 @@ public: management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text); - + /** Add to the broker's protocolFactorys */ void registerProtocolFactory(const std::string& name, boost::shared_ptr<sys::ProtocolFactory>); @@ -224,7 +226,7 @@ public: boost::shared_ptr<sys::ConnectionCodec::Factory> getConnectionFactory() { return factory; } void setConnectionFactory(boost::shared_ptr<sys::ConnectionCodec::Factory> f) { factory = f; } - sys::Timer& getTimer() { return timer; } + Timer& getTimer() { return timer; } boost::function<std::vector<Url> ()> getKnownBrokers; @@ -237,5 +239,7 @@ public: }; }} + + #endif /*!_Broker_*/ diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index c0e9429ba9..a54bcc6db9 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -49,25 +49,35 @@ namespace _qmf = qmf::org::apache::qpid::broker; namespace qpid { namespace broker { -struct ConnectionTimeoutTask : public sys::TimerTask { - sys::Timer& timer; +struct ConnectionTimeoutTask : public TimerTask { + Timer& timer; Connection& connection; + AbsTime expires; - ConnectionTimeoutTask(uint16_t hb, sys::Timer& t, Connection& c) : + ConnectionTimeoutTask(uint16_t hb, Timer& t, Connection& c) : TimerTask(Duration(hb*2*TIME_SEC)), timer(t), - connection(c) + connection(c), + expires(AbsTime::now(), duration) {} - void touch() { - restart(); + void touch() + { + expires = AbsTime(AbsTime::now(), duration); } void fire() { - // If we get here then we've not received any traffic in the timeout period - // Schedule closing the connection for the io thread - QPID_LOG(error, "Connection timed out: closing"); - connection.abort(); + // This is the best we can currently do to avoid a destruction/fire race + if (isCancelled()) return; + if (expires < AbsTime::now()) { + // If we get here then we've not received any traffic in the timeout period + // Schedule closing the connection for the io thread + QPID_LOG(error, "Connection timed out: closing"); + connection.abort(); + } else { + reset(); + timer.add(this); + } } }; @@ -328,22 +338,25 @@ void Connection::setSecureConnection(SecureConnection* s) adapter.setSecureConnection(s); } -struct ConnectionHeartbeatTask : public sys::TimerTask { - sys::Timer& timer; +struct ConnectionHeartbeatTask : public TimerTask { + Timer& timer; Connection& connection; - ConnectionHeartbeatTask(uint16_t hb, sys::Timer& t, Connection& c) : + ConnectionHeartbeatTask(uint16_t hb, Timer& t, Connection& c) : TimerTask(Duration(hb*TIME_SEC)), timer(t), connection(c) {} void fire() { - // Setup next firing - setupNextFire(); - timer.add(this); - - // Send Heartbeat - connection.sendHeartbeat(); + // This is the best we can currently do to avoid a destruction/fire race + if (!isCancelled()) { + // Setup next firing + reset(); + timer.add(this); + + // Send Heartbeat + connection.sendHeartbeat(); + } } }; diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h index f3a6cb2b7a..17bc8f0970 100644 --- a/cpp/src/qpid/broker/Connection.h +++ b/cpp/src/qpid/broker/Connection.h @@ -152,8 +152,8 @@ class Connection : public sys::ConnectionInputHandler, qmf::org::apache::qpid::broker::Connection* mgmtObject; LinkRegistry& links; management::ManagementAgent* agent; - sys::Timer& timer; - boost::intrusive_ptr<sys::TimerTask> heartbeatTimer; + Timer& timer; + boost::intrusive_ptr<TimerTask> heartbeatTimer; boost::intrusive_ptr<ConnectionTimeoutTask> timeoutTimer; ErrorListener* errorListener; bool shadow; diff --git a/cpp/src/qpid/broker/DtxManager.cpp b/cpp/src/qpid/broker/DtxManager.cpp index a9bdb5e152..11e16ec837 100644 --- a/cpp/src/qpid/broker/DtxManager.cpp +++ b/cpp/src/qpid/broker/DtxManager.cpp @@ -33,7 +33,7 @@ using qpid::ptr_map_ptr; using namespace qpid::broker; using namespace qpid::framing; -DtxManager::DtxManager(sys::Timer& t) : store(0), timer(t) {} +DtxManager::DtxManager(Timer& t) : store(0), timer(t) {} DtxManager::~DtxManager() {} @@ -130,7 +130,8 @@ void DtxManager::setTimeout(const std::string& xid, uint32_t secs) } timeout = intrusive_ptr<DtxTimeout>(new DtxTimeout(secs, *this, xid)); record->setTimeout(timeout); - timer.add(timeout); + timer.add(boost::static_pointer_cast<TimerTask>(timeout)); + } uint32_t DtxManager::getTimeout(const std::string& xid) diff --git a/cpp/src/qpid/broker/DtxManager.h b/cpp/src/qpid/broker/DtxManager.h index bfea5e6daf..a61e8610f0 100644 --- a/cpp/src/qpid/broker/DtxManager.h +++ b/cpp/src/qpid/broker/DtxManager.h @@ -24,9 +24,9 @@ #include <boost/ptr_container/ptr_map.hpp> #include "DtxBuffer.h" #include "DtxWorkRecord.h" +#include "Timer.h" #include "TransactionalStore.h" #include "qpid/framing/amqp_types.h" -#include "qpid/sys/Timer.h" #include "qpid/sys/Mutex.h" namespace qpid { @@ -35,7 +35,7 @@ namespace broker { class DtxManager{ typedef boost::ptr_map<std::string, DtxWorkRecord> WorkMap; - struct DtxCleanup : public sys::TimerTask + struct DtxCleanup : public TimerTask { DtxManager& mgr; const std::string& xid; @@ -47,14 +47,14 @@ class DtxManager{ WorkMap work; TransactionalStore* store; qpid::sys::Mutex lock; - qpid::sys::Timer& timer; + Timer& timer; void remove(const std::string& xid); DtxWorkRecord* getWork(const std::string& xid); DtxWorkRecord* createWork(std::string xid); public: - DtxManager(qpid::sys::Timer&); + DtxManager(Timer&); ~DtxManager(); void start(const std::string& xid, DtxBuffer::shared_ptr work); void join(const std::string& xid, DtxBuffer::shared_ptr work); diff --git a/cpp/src/qpid/broker/DtxTimeout.h b/cpp/src/qpid/broker/DtxTimeout.h index 680a210e4f..6e949eab0d 100644 --- a/cpp/src/qpid/broker/DtxTimeout.h +++ b/cpp/src/qpid/broker/DtxTimeout.h @@ -22,7 +22,7 @@ #define _DtxTimeout_ #include "qpid/Exception.h" -#include "qpid/sys/Timer.h" +#include "Timer.h" namespace qpid { namespace broker { @@ -31,12 +31,12 @@ class DtxManager; struct DtxTimeoutException : public Exception {}; -struct DtxTimeout : public sys::TimerTask +struct DtxTimeout : public TimerTask { const uint32_t timeout; DtxManager& mgr; const std::string xid; - + DtxTimeout(uint32_t timeout, DtxManager& mgr, const std::string& xid); void fire(); }; diff --git a/cpp/src/qpid/broker/LinkRegistry.cpp b/cpp/src/qpid/broker/LinkRegistry.cpp index a82e828138..c6e10f0f8c 100644 --- a/cpp/src/qpid/broker/LinkRegistry.cpp +++ b/cpp/src/qpid/broker/LinkRegistry.cpp @@ -36,12 +36,10 @@ namespace _qmf = qmf::org::apache::qpid::broker; #define LINK_MAINT_INTERVAL 2 -LinkRegistry::LinkRegistry (Broker* _broker) : - broker(_broker), timer(broker->getTimer()), - parent(0), store(0), passive(false), passiveChanged(false), - realm(broker ? broker->getOptions().realm : "") +LinkRegistry::LinkRegistry (Broker* _broker) : broker(_broker), parent(0), store(0), passive(false), passiveChanged(false), + realm(broker ? broker->getOptions().realm : "") { - timer.add (new Periodic(*this)); + timer.add (intrusive_ptr<TimerTask> (new Periodic(*this))); } LinkRegistry::Periodic::Periodic (LinkRegistry& _links) : @@ -50,7 +48,7 @@ LinkRegistry::Periodic::Periodic (LinkRegistry& _links) : void LinkRegistry::Periodic::fire () { links.periodicMaintenance (); - links.timer.add (new Periodic(links)); + links.timer.add (intrusive_ptr<TimerTask> (new Periodic(links))); } void LinkRegistry::periodicMaintenance () diff --git a/cpp/src/qpid/broker/LinkRegistry.h b/cpp/src/qpid/broker/LinkRegistry.h index 1caffb9232..07fff5b979 100644 --- a/cpp/src/qpid/broker/LinkRegistry.h +++ b/cpp/src/qpid/broker/LinkRegistry.h @@ -25,9 +25,9 @@ #include <map> #include "Bridge.h" #include "MessageStore.h" +#include "Timer.h" #include "qpid/Address.h" #include "qpid/sys/Mutex.h" -#include "qpid/sys/Timer.h" #include "qpid/management/Manageable.h" #include <boost/shared_ptr.hpp> @@ -41,7 +41,7 @@ namespace broker { // Declare a timer task to manage the establishment of link connections and the // re-establishment of lost link connections. - struct Periodic : public sys::TimerTask + struct Periodic : public TimerTask { LinkRegistry& links; @@ -62,7 +62,7 @@ namespace broker { qpid::sys::Mutex lock; Broker* broker; - sys::Timer& timer; + Timer timer; management::Manageable* parent; MessageStore* store; bool passive; diff --git a/cpp/src/qpid/broker/NullMessageStore.cpp b/cpp/src/qpid/broker/NullMessageStore.cpp index f5b0163c0f..62b546b3eb 100644 --- a/cpp/src/qpid/broker/NullMessageStore.cpp +++ b/cpp/src/qpid/broker/NullMessageStore.cpp @@ -49,7 +49,7 @@ public: }; NullMessageStore::NullMessageStore() : nextPersistenceId(1) { - QPID_LOG(info, "No message store configured, persistence is disabled."); + QPID_LOG(info, "No message store configured, persistence is disabled.") } bool NullMessageStore::init(const Options* /*options*/) {return true;} diff --git a/cpp/src/qpid/broker/QueueCleaner.cpp b/cpp/src/qpid/broker/QueueCleaner.cpp index 814eca6751..0774dce2b7 100644 --- a/cpp/src/qpid/broker/QueueCleaner.cpp +++ b/cpp/src/qpid/broker/QueueCleaner.cpp @@ -26,15 +26,15 @@ namespace qpid { namespace broker { -QueueCleaner::QueueCleaner(QueueRegistry& q, sys::Timer& t) : queues(q), timer(t) {} +QueueCleaner::QueueCleaner(QueueRegistry& q, Timer& t) : queues(q), timer(t) {} void QueueCleaner::start(qpid::sys::Duration p) { - task = new Task(*this, p); + task = boost::intrusive_ptr<TimerTask>(new Task(*this, p)); timer.add(task); } -QueueCleaner::Task::Task(QueueCleaner& p, qpid::sys::Duration d) : sys::TimerTask(d), parent(p) {} +QueueCleaner::Task::Task(QueueCleaner& p, qpid::sys::Duration d) : TimerTask(d), parent(p) {} void QueueCleaner::Task::fire() { @@ -44,7 +44,7 @@ void QueueCleaner::Task::fire() void QueueCleaner::fired() { queues.eachQueue(boost::bind(&Queue::purgeExpired, _1)); - task->setupNextFire(); + task->reset(); timer.add(task); } diff --git a/cpp/src/qpid/broker/QueueCleaner.h b/cpp/src/qpid/broker/QueueCleaner.h index 0fbb12a5d4..007826f33e 100644 --- a/cpp/src/qpid/broker/QueueCleaner.h +++ b/cpp/src/qpid/broker/QueueCleaner.h @@ -23,7 +23,7 @@ */ #include "BrokerImportExport.h" -#include "qpid/sys/Timer.h" +#include "Timer.h" namespace qpid { namespace broker { @@ -35,10 +35,10 @@ class QueueRegistry; class QueueCleaner { public: - QPID_BROKER_EXTERN QueueCleaner(QueueRegistry& queues, sys::Timer& timer); + QPID_BROKER_EXTERN QueueCleaner(QueueRegistry& queues, Timer& timer); QPID_BROKER_EXTERN void start(qpid::sys::Duration period); private: - class Task : public sys::TimerTask + class Task : public TimerTask { public: Task(QueueCleaner& parent, qpid::sys::Duration duration); @@ -46,10 +46,10 @@ class QueueCleaner private: QueueCleaner& parent; }; - - boost::intrusive_ptr<sys::TimerTask> task; + + boost::intrusive_ptr<TimerTask> task; QueueRegistry& queues; - sys::Timer& timer; + Timer& timer; void fired(); }; diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index 2b8048ea3d..b465a65bd3 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -25,7 +25,7 @@ #include "SessionManager.h" #include "SessionHandler.h" #include "RateFlowcontrol.h" -#include "qpid/sys/Timer.h" +#include "Timer.h" #include "qpid/framing/AMQContentBody.h" #include "qpid/framing/AMQHeaderBody.h" #include "qpid/framing/AMQMethodBody.h" @@ -49,7 +49,6 @@ using qpid::management::ManagementObject; using qpid::management::Manageable; using qpid::management::Args; using qpid::sys::AbsTime; -//using qpid::sys::Timer; namespace _qmf = qmf::org::apache::qpid::broker; SessionState::SessionState( @@ -207,10 +206,10 @@ void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceN } } -struct ScheduledCreditTask : public sys::TimerTask { - sys::Timer& timer; +struct ScheduledCreditTask : public TimerTask { + Timer& timer; SessionState& sessionState; - ScheduledCreditTask(const qpid::sys::Duration& d, sys::Timer& t, + ScheduledCreditTask(const qpid::sys::Duration& d, Timer& t, SessionState& s) : TimerTask(d), timer(t), @@ -219,13 +218,15 @@ struct ScheduledCreditTask : public sys::TimerTask { void fire() { // This is the best we can currently do to avoid a destruction/fire race - sessionState.getConnection().requestIOProcessing(boost::bind(&ScheduledCreditTask::sendCredit, this)); + if (!isCancelled()) { + sessionState.getConnection().requestIOProcessing(boost::bind(&ScheduledCreditTask::sendCredit, this)); + } } void sendCredit() { if ( !sessionState.processSendCredit(0) ) { QPID_LOG(warning, sessionState.getId() << ": Reschedule sending credit"); - setupNextFire(); + reset(); timer.add(this); } } @@ -268,7 +269,7 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id) if (rateFlowcontrol && frame.getBof() && frame.getBos()) { if ( !processSendCredit(1) ) { QPID_LOG(debug, getId() << ": Schedule sending credit"); - sys::Timer& timer = getBroker().getTimer(); + Timer& timer = getBroker().getTimer(); // Use heuristic for scheduled credit of time for 50 messages, but not longer than 500ms sys::Duration d = std::min(sys::TIME_SEC * 50 / rateFlowcontrol->getRate(), 500 * sys::TIME_MSEC); flowControlTimer = new ScheduledCreditTask(d, timer, *this); diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h index 1d000fca5f..f9d35e2aac 100644 --- a/cpp/src/qpid/broker/SessionState.h +++ b/cpp/src/qpid/broker/SessionState.h @@ -48,10 +48,6 @@ namespace framing { class AMQP_ClientProxy; } -namespace sys { -struct TimerTask; -} - namespace broker { class Broker; @@ -60,6 +56,7 @@ class Message; class SessionHandler; class SessionManager; class RateFlowcontrol; +struct TimerTask; /** * Broker-side session state includes session's handler chains, which @@ -156,7 +153,7 @@ class SessionState : public qpid::SessionState, // State used for producer flow control (rate limited) qpid::sys::Mutex rateLock; boost::scoped_ptr<RateFlowcontrol> rateFlowcontrol; - boost::intrusive_ptr<sys::TimerTask> flowControlTimer; + boost::intrusive_ptr<TimerTask> flowControlTimer; friend class SessionManager; }; diff --git a/cpp/src/qpid/cluster/ExpiryPolicy.cpp b/cpp/src/qpid/cluster/ExpiryPolicy.cpp index 06bd3d3343..348963f901 100644 --- a/cpp/src/qpid/cluster/ExpiryPolicy.cpp +++ b/cpp/src/qpid/cluster/ExpiryPolicy.cpp @@ -21,19 +21,19 @@ #include "ExpiryPolicy.h" #include "Multicaster.h" -#include "qpid/broker/Message.h" #include "qpid/framing/ClusterMessageExpiredBody.h" #include "qpid/sys/Time.h" -#include "qpid/sys/Timer.h" +#include "qpid/broker/Message.h" +#include "qpid/broker/Timer.h" #include "qpid/log/Statement.h" namespace qpid { namespace cluster { -ExpiryPolicy::ExpiryPolicy(Multicaster& m, const MemberId& id, sys::Timer& t) +ExpiryPolicy::ExpiryPolicy(Multicaster& m, const MemberId& id, broker::Timer& t) : expiryId(0), expiredPolicy(new Expired), mcast(m), memberId(id), timer(t) {} -struct ExpiryTask : public sys::TimerTask { +struct ExpiryTask : public broker::TimerTask { ExpiryTask(const boost::intrusive_ptr<ExpiryPolicy>& policy, uint64_t id, sys::AbsTime when) : TimerTask(when), expiryPolicy(policy), expiryId(id) {} void fire() { expiryPolicy->sendExpire(expiryId); } diff --git a/cpp/src/qpid/cluster/ExpiryPolicy.h b/cpp/src/qpid/cluster/ExpiryPolicy.h index bf35f4ca6d..c147e54796 100644 --- a/cpp/src/qpid/cluster/ExpiryPolicy.h +++ b/cpp/src/qpid/cluster/ExpiryPolicy.h @@ -33,11 +33,8 @@ namespace qpid { namespace broker { -class Message; -} - -namespace sys { class Timer; +class Message; } namespace cluster { @@ -49,7 +46,7 @@ class Multicaster; class ExpiryPolicy : public broker::ExpiryPolicy { public: - ExpiryPolicy(Multicaster&, const MemberId&, sys::Timer&); + ExpiryPolicy(Multicaster&, const MemberId&, broker::Timer&); void willExpire(broker::Message&); bool hasExpired(broker::Message&); @@ -81,7 +78,7 @@ class ExpiryPolicy : public broker::ExpiryPolicy boost::intrusive_ptr<Expired> expiredPolicy; Multicaster& mcast; MemberId memberId; - sys::Timer& timer; + broker::Timer& timer; }; }} // namespace qpid::cluster diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp index 76859dd77f..75f7453058 100644 --- a/cpp/src/qpid/management/ManagementAgent.cpp +++ b/cpp/src/qpid/management/ManagementAgent.cpp @@ -61,6 +61,7 @@ ManagementAgent::ManagementAgent () : ManagementAgent::~ManagementAgent () { + timer.stop(); { Mutex::ScopedLock lock (userLock); @@ -88,10 +89,9 @@ void ManagementAgent::configure(const string& _dataDir, uint16_t _interval, dataDir = _dataDir; interval = _interval; broker = _broker; - timer = &_broker->getTimer(); threadPoolSize = _threads; ManagementObject::maxThreads = threadPoolSize; - timer->add (new Periodic(*this, interval)); + timer.add (intrusive_ptr<TimerTask> (new Periodic(*this, interval))); // Get from file or generate and save to file. if (dataDir.empty()) @@ -218,7 +218,7 @@ ManagementAgent::Periodic::~Periodic () {} void ManagementAgent::Periodic::fire () { - agent.timer->add (new Periodic (agent, agent.interval)); + agent.timer.add (intrusive_ptr<TimerTask> (new Periodic (agent, agent.interval))); agent.periodicProcessing (); } diff --git a/cpp/src/qpid/management/ManagementAgent.h b/cpp/src/qpid/management/ManagementAgent.h index ca89c1f8fb..34d53f778d 100644 --- a/cpp/src/qpid/management/ManagementAgent.h +++ b/cpp/src/qpid/management/ManagementAgent.h @@ -24,9 +24,9 @@ #include "qpid/broker/BrokerImportExport.h" #include "qpid/Options.h" #include "qpid/broker/Exchange.h" +#include "qpid/broker/Timer.h" #include "qpid/framing/Uuid.h" #include "qpid/sys/Mutex.h" -#include "qpid/sys/Timer.h" #include "qpid/broker/ConnectionToken.h" #include "ManagementObject.h" #include "ManagementEvent.h" @@ -98,7 +98,7 @@ public: void disallow(const std::string& className, const std::string& methodName, const std::string& message); private: - struct Periodic : public qpid::sys::TimerTask + struct Periodic : public qpid::broker::TimerTask { ManagementAgent& agent; @@ -183,12 +183,12 @@ private: framing::Uuid uuid; sys::Mutex addLock; sys::Mutex userLock; + qpid::broker::Timer timer; qpid::broker::Exchange::shared_ptr mExchange; qpid::broker::Exchange::shared_ptr dExchange; std::string dataDir; uint16_t interval; qpid::broker::Broker* broker; - qpid::sys::Timer* timer; uint16_t bootSequence; uint32_t nextObjectId; uint32_t brokerBank; diff --git a/cpp/src/qpid/sys/DispatchHandle.h b/cpp/src/qpid/sys/DispatchHandle.h index 860665877c..916d4c641a 100644 --- a/cpp/src/qpid/sys/DispatchHandle.h +++ b/cpp/src/qpid/sys/DispatchHandle.h @@ -38,14 +38,14 @@ class DispatchHandleRef; * you need to: * * - Subclass IOHandle, in the constructor supply an appropriate - * IOHandlerPrivate object for the platform. - * + * IOHandlerPrivate object for the platform. + * * - Construct a DispatchHandle passing it your IOHandle and * callback functions for read, write and disconnect events. * * - Ensure the DispatchHandle is not deleted until the poller is no longer using it. * TODO: astitcher document DispatchHandleRef to simplify this. - * + * * When an event occurs on the handle, the poller calls the relevant callback and * stops watching that handle. Your callback can call rewatch() or related functions * to re-enable polling. diff --git a/cpp/src/qpid/sys/Timer.cpp b/cpp/src/qpid/sys/Timer.cpp index fd42d7d62e..6967d812ae 100644 --- a/cpp/src/qpid/sys/Timer.cpp +++ b/cpp/src/qpid/sys/Timer.cpp @@ -30,14 +30,12 @@ namespace qpid { namespace sys { TimerTask::TimerTask(Duration timeout) : - sortTime(AbsTime::FarFuture()), period(timeout), nextFireTime(AbsTime::now(), timeout), cancelled(false) {} TimerTask::TimerTask(AbsTime time) : - sortTime(AbsTime::FarFuture()), period(0), nextFireTime(time), cancelled(false) @@ -62,7 +60,7 @@ void TimerTask::setupNextFire() { } // Only allow tasks to be delayed -void TimerTask::restart() { nextFireTime = max(nextFireTime, AbsTime(AbsTime::now(), period)); } +void TimerTask::restart() { nextFireTime = AbsTime(AbsTime::now(), period); } void TimerTask::delayTill(AbsTime time) { period = 0; nextFireTime = max(nextFireTime, time); } void TimerTask::cancel() { @@ -93,7 +91,7 @@ void Timer::run() tasks.pop(); { ScopedLock<Mutex> l(t->callbackLock); - if (t->cancelled) { + if (t->isCancelled()) { continue; } else if(t->readyToFire()) { Monitor::ScopedUnlock u(monitor); @@ -102,9 +100,6 @@ void Timer::run() } else { // If the timer was adjusted into the future it might no longer // be the next event, so push and then get top to make sure - // You can only push events into the future - assert(!(t->nextFireTime < t->sortTime)); - t->sortTime = t->nextFireTime; tasks.push(t); } } @@ -116,7 +111,6 @@ void Timer::run() void Timer::add(intrusive_ptr<TimerTask> task) { Monitor::ScopedLock l(monitor); - task->sortTime = task->nextFireTime; tasks.push(task); monitor.notify(); } @@ -145,7 +139,7 @@ bool operator<(const intrusive_ptr<TimerTask>& a, const intrusive_ptr<TimerTask>& b) { // Lower priority if time is later - return a.get() && b.get() && a->sortTime > b->sortTime; + return a.get() && b.get() && a->nextFireTime > b->nextFireTime; } }} diff --git a/cpp/src/qpid/sys/Timer.h b/cpp/src/qpid/sys/Timer.h index fc7491d5ed..b5bf5d8a4c 100644 --- a/cpp/src/qpid/sys/Timer.h +++ b/cpp/src/qpid/sys/Timer.h @@ -42,7 +42,6 @@ class TimerTask : public RefCounted { friend bool operator<(const boost::intrusive_ptr<TimerTask>&, const boost::intrusive_ptr<TimerTask>&); - AbsTime sortTime; Duration period; AbsTime nextFireTime; Mutex callbackLock; @@ -72,7 +71,7 @@ bool operator<(const boost::intrusive_ptr<TimerTask>& a, const boost::intrusive_ptr<TimerTask>& b); class Timer : private Runnable { - qpid::sys::Monitor monitor; + qpid::sys::Monitor monitor; std::priority_queue<boost::intrusive_ptr<TimerTask> > tasks; qpid::sys::Thread runner; bool active; |