summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/Broker.h16
-rw-r--r--cpp/src/qpid/broker/Connection.cpp51
-rw-r--r--cpp/src/qpid/broker/Connection.h4
-rw-r--r--cpp/src/qpid/broker/DtxManager.cpp5
-rw-r--r--cpp/src/qpid/broker/DtxManager.h8
-rw-r--r--cpp/src/qpid/broker/DtxTimeout.h6
-rw-r--r--cpp/src/qpid/broker/LinkRegistry.cpp10
-rw-r--r--cpp/src/qpid/broker/LinkRegistry.h6
-rw-r--r--cpp/src/qpid/broker/NullMessageStore.cpp2
-rw-r--r--cpp/src/qpid/broker/QueueCleaner.cpp8
-rw-r--r--cpp/src/qpid/broker/QueueCleaner.h12
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp17
-rw-r--r--cpp/src/qpid/broker/SessionState.h7
-rw-r--r--cpp/src/qpid/cluster/ExpiryPolicy.cpp8
-rw-r--r--cpp/src/qpid/cluster/ExpiryPolicy.h9
-rw-r--r--cpp/src/qpid/management/ManagementAgent.cpp6
-rw-r--r--cpp/src/qpid/management/ManagementAgent.h6
-rw-r--r--cpp/src/qpid/sys/DispatchHandle.h6
-rw-r--r--cpp/src/qpid/sys/Timer.cpp12
-rw-r--r--cpp/src/qpid/sys/Timer.h3
20 files changed, 103 insertions, 99 deletions
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index 1e0ac64e01..8f4621bb39 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -36,6 +36,7 @@
#include "QueueEvents.h"
#include "Vhost.h"
#include "System.h"
+#include "Timer.h"
#include "ExpiryPolicy.h"
#include "qpid/management/Manageable.h"
#include "qpid/management/ManagementAgent.h"
@@ -48,7 +49,6 @@
#include "qpid/framing/OutputHandler.h"
#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/sys/Runnable.h"
-#include "qpid/sys/Timer.h"
#include "qpid/RefCounted.h"
#include "AclModule.h"
@@ -112,14 +112,13 @@ public:
std::string knownHosts;
uint32_t maxSessionRate;
};
-
+
private:
typedef std::map<std::string, boost::shared_ptr<sys::ProtocolFactory> > ProtocolFactoryMap;
void declareStandardExchange(const std::string& name, const std::string& type);
boost::shared_ptr<sys::Poller> poller;
- sys::Timer timer;
Options config;
ProtocolFactoryMap protocolFactories;
std::auto_ptr<MessageStore> store;
@@ -130,6 +129,7 @@ public:
ExchangeRegistry exchanges;
LinkRegistry links;
boost::shared_ptr<sys::ConnectionCodec::Factory> factory;
+ Timer timer;
DtxManager dtxManager;
SessionManager sessionManager;
management::ManagementAgent* managementAgent;
@@ -145,6 +145,8 @@ public:
boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
public:
+
+
virtual ~Broker();
QPID_BROKER_EXTERN Broker(const Options& configuration);
@@ -183,7 +185,7 @@ public:
void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) { expiryPolicy = e; }
boost::intrusive_ptr<ExpiryPolicy> getExpiryPolicy() { return expiryPolicy; }
-
+
SessionManager& getSessionManager() { return sessionManager; }
const std::string& getFederationTag() const { return federationTag; }
@@ -192,7 +194,7 @@ public:
management::Manageable::status_t ManagementMethod (uint32_t methodId,
management::Args& args,
std::string& text);
-
+
/** Add to the broker's protocolFactorys */
void registerProtocolFactory(const std::string& name, boost::shared_ptr<sys::ProtocolFactory>);
@@ -224,7 +226,7 @@ public:
boost::shared_ptr<sys::ConnectionCodec::Factory> getConnectionFactory() { return factory; }
void setConnectionFactory(boost::shared_ptr<sys::ConnectionCodec::Factory> f) { factory = f; }
- sys::Timer& getTimer() { return timer; }
+ Timer& getTimer() { return timer; }
boost::function<std::vector<Url> ()> getKnownBrokers;
@@ -237,5 +239,7 @@ public:
};
}}
+
+
#endif /*!_Broker_*/
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index c0e9429ba9..a54bcc6db9 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -49,25 +49,35 @@ namespace _qmf = qmf::org::apache::qpid::broker;
namespace qpid {
namespace broker {
-struct ConnectionTimeoutTask : public sys::TimerTask {
- sys::Timer& timer;
+struct ConnectionTimeoutTask : public TimerTask {
+ Timer& timer;
Connection& connection;
+ AbsTime expires;
- ConnectionTimeoutTask(uint16_t hb, sys::Timer& t, Connection& c) :
+ ConnectionTimeoutTask(uint16_t hb, Timer& t, Connection& c) :
TimerTask(Duration(hb*2*TIME_SEC)),
timer(t),
- connection(c)
+ connection(c),
+ expires(AbsTime::now(), duration)
{}
- void touch() {
- restart();
+ void touch()
+ {
+ expires = AbsTime(AbsTime::now(), duration);
}
void fire() {
- // If we get here then we've not received any traffic in the timeout period
- // Schedule closing the connection for the io thread
- QPID_LOG(error, "Connection timed out: closing");
- connection.abort();
+ // This is the best we can currently do to avoid a destruction/fire race
+ if (isCancelled()) return;
+ if (expires < AbsTime::now()) {
+ // If we get here then we've not received any traffic in the timeout period
+ // Schedule closing the connection for the io thread
+ QPID_LOG(error, "Connection timed out: closing");
+ connection.abort();
+ } else {
+ reset();
+ timer.add(this);
+ }
}
};
@@ -328,22 +338,25 @@ void Connection::setSecureConnection(SecureConnection* s)
adapter.setSecureConnection(s);
}
-struct ConnectionHeartbeatTask : public sys::TimerTask {
- sys::Timer& timer;
+struct ConnectionHeartbeatTask : public TimerTask {
+ Timer& timer;
Connection& connection;
- ConnectionHeartbeatTask(uint16_t hb, sys::Timer& t, Connection& c) :
+ ConnectionHeartbeatTask(uint16_t hb, Timer& t, Connection& c) :
TimerTask(Duration(hb*TIME_SEC)),
timer(t),
connection(c)
{}
void fire() {
- // Setup next firing
- setupNextFire();
- timer.add(this);
-
- // Send Heartbeat
- connection.sendHeartbeat();
+ // This is the best we can currently do to avoid a destruction/fire race
+ if (!isCancelled()) {
+ // Setup next firing
+ reset();
+ timer.add(this);
+
+ // Send Heartbeat
+ connection.sendHeartbeat();
+ }
}
};
diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h
index f3a6cb2b7a..17bc8f0970 100644
--- a/cpp/src/qpid/broker/Connection.h
+++ b/cpp/src/qpid/broker/Connection.h
@@ -152,8 +152,8 @@ class Connection : public sys::ConnectionInputHandler,
qmf::org::apache::qpid::broker::Connection* mgmtObject;
LinkRegistry& links;
management::ManagementAgent* agent;
- sys::Timer& timer;
- boost::intrusive_ptr<sys::TimerTask> heartbeatTimer;
+ Timer& timer;
+ boost::intrusive_ptr<TimerTask> heartbeatTimer;
boost::intrusive_ptr<ConnectionTimeoutTask> timeoutTimer;
ErrorListener* errorListener;
bool shadow;
diff --git a/cpp/src/qpid/broker/DtxManager.cpp b/cpp/src/qpid/broker/DtxManager.cpp
index a9bdb5e152..11e16ec837 100644
--- a/cpp/src/qpid/broker/DtxManager.cpp
+++ b/cpp/src/qpid/broker/DtxManager.cpp
@@ -33,7 +33,7 @@ using qpid::ptr_map_ptr;
using namespace qpid::broker;
using namespace qpid::framing;
-DtxManager::DtxManager(sys::Timer& t) : store(0), timer(t) {}
+DtxManager::DtxManager(Timer& t) : store(0), timer(t) {}
DtxManager::~DtxManager() {}
@@ -130,7 +130,8 @@ void DtxManager::setTimeout(const std::string& xid, uint32_t secs)
}
timeout = intrusive_ptr<DtxTimeout>(new DtxTimeout(secs, *this, xid));
record->setTimeout(timeout);
- timer.add(timeout);
+ timer.add(boost::static_pointer_cast<TimerTask>(timeout));
+
}
uint32_t DtxManager::getTimeout(const std::string& xid)
diff --git a/cpp/src/qpid/broker/DtxManager.h b/cpp/src/qpid/broker/DtxManager.h
index bfea5e6daf..a61e8610f0 100644
--- a/cpp/src/qpid/broker/DtxManager.h
+++ b/cpp/src/qpid/broker/DtxManager.h
@@ -24,9 +24,9 @@
#include <boost/ptr_container/ptr_map.hpp>
#include "DtxBuffer.h"
#include "DtxWorkRecord.h"
+#include "Timer.h"
#include "TransactionalStore.h"
#include "qpid/framing/amqp_types.h"
-#include "qpid/sys/Timer.h"
#include "qpid/sys/Mutex.h"
namespace qpid {
@@ -35,7 +35,7 @@ namespace broker {
class DtxManager{
typedef boost::ptr_map<std::string, DtxWorkRecord> WorkMap;
- struct DtxCleanup : public sys::TimerTask
+ struct DtxCleanup : public TimerTask
{
DtxManager& mgr;
const std::string& xid;
@@ -47,14 +47,14 @@ class DtxManager{
WorkMap work;
TransactionalStore* store;
qpid::sys::Mutex lock;
- qpid::sys::Timer& timer;
+ Timer& timer;
void remove(const std::string& xid);
DtxWorkRecord* getWork(const std::string& xid);
DtxWorkRecord* createWork(std::string xid);
public:
- DtxManager(qpid::sys::Timer&);
+ DtxManager(Timer&);
~DtxManager();
void start(const std::string& xid, DtxBuffer::shared_ptr work);
void join(const std::string& xid, DtxBuffer::shared_ptr work);
diff --git a/cpp/src/qpid/broker/DtxTimeout.h b/cpp/src/qpid/broker/DtxTimeout.h
index 680a210e4f..6e949eab0d 100644
--- a/cpp/src/qpid/broker/DtxTimeout.h
+++ b/cpp/src/qpid/broker/DtxTimeout.h
@@ -22,7 +22,7 @@
#define _DtxTimeout_
#include "qpid/Exception.h"
-#include "qpid/sys/Timer.h"
+#include "Timer.h"
namespace qpid {
namespace broker {
@@ -31,12 +31,12 @@ class DtxManager;
struct DtxTimeoutException : public Exception {};
-struct DtxTimeout : public sys::TimerTask
+struct DtxTimeout : public TimerTask
{
const uint32_t timeout;
DtxManager& mgr;
const std::string xid;
-
+
DtxTimeout(uint32_t timeout, DtxManager& mgr, const std::string& xid);
void fire();
};
diff --git a/cpp/src/qpid/broker/LinkRegistry.cpp b/cpp/src/qpid/broker/LinkRegistry.cpp
index a82e828138..c6e10f0f8c 100644
--- a/cpp/src/qpid/broker/LinkRegistry.cpp
+++ b/cpp/src/qpid/broker/LinkRegistry.cpp
@@ -36,12 +36,10 @@ namespace _qmf = qmf::org::apache::qpid::broker;
#define LINK_MAINT_INTERVAL 2
-LinkRegistry::LinkRegistry (Broker* _broker) :
- broker(_broker), timer(broker->getTimer()),
- parent(0), store(0), passive(false), passiveChanged(false),
- realm(broker ? broker->getOptions().realm : "")
+LinkRegistry::LinkRegistry (Broker* _broker) : broker(_broker), parent(0), store(0), passive(false), passiveChanged(false),
+ realm(broker ? broker->getOptions().realm : "")
{
- timer.add (new Periodic(*this));
+ timer.add (intrusive_ptr<TimerTask> (new Periodic(*this)));
}
LinkRegistry::Periodic::Periodic (LinkRegistry& _links) :
@@ -50,7 +48,7 @@ LinkRegistry::Periodic::Periodic (LinkRegistry& _links) :
void LinkRegistry::Periodic::fire ()
{
links.periodicMaintenance ();
- links.timer.add (new Periodic(links));
+ links.timer.add (intrusive_ptr<TimerTask> (new Periodic(links)));
}
void LinkRegistry::periodicMaintenance ()
diff --git a/cpp/src/qpid/broker/LinkRegistry.h b/cpp/src/qpid/broker/LinkRegistry.h
index 1caffb9232..07fff5b979 100644
--- a/cpp/src/qpid/broker/LinkRegistry.h
+++ b/cpp/src/qpid/broker/LinkRegistry.h
@@ -25,9 +25,9 @@
#include <map>
#include "Bridge.h"
#include "MessageStore.h"
+#include "Timer.h"
#include "qpid/Address.h"
#include "qpid/sys/Mutex.h"
-#include "qpid/sys/Timer.h"
#include "qpid/management/Manageable.h"
#include <boost/shared_ptr.hpp>
@@ -41,7 +41,7 @@ namespace broker {
// Declare a timer task to manage the establishment of link connections and the
// re-establishment of lost link connections.
- struct Periodic : public sys::TimerTask
+ struct Periodic : public TimerTask
{
LinkRegistry& links;
@@ -62,7 +62,7 @@ namespace broker {
qpid::sys::Mutex lock;
Broker* broker;
- sys::Timer& timer;
+ Timer timer;
management::Manageable* parent;
MessageStore* store;
bool passive;
diff --git a/cpp/src/qpid/broker/NullMessageStore.cpp b/cpp/src/qpid/broker/NullMessageStore.cpp
index f5b0163c0f..62b546b3eb 100644
--- a/cpp/src/qpid/broker/NullMessageStore.cpp
+++ b/cpp/src/qpid/broker/NullMessageStore.cpp
@@ -49,7 +49,7 @@ public:
};
NullMessageStore::NullMessageStore() : nextPersistenceId(1) {
- QPID_LOG(info, "No message store configured, persistence is disabled.");
+ QPID_LOG(info, "No message store configured, persistence is disabled.")
}
bool NullMessageStore::init(const Options* /*options*/) {return true;}
diff --git a/cpp/src/qpid/broker/QueueCleaner.cpp b/cpp/src/qpid/broker/QueueCleaner.cpp
index 814eca6751..0774dce2b7 100644
--- a/cpp/src/qpid/broker/QueueCleaner.cpp
+++ b/cpp/src/qpid/broker/QueueCleaner.cpp
@@ -26,15 +26,15 @@
namespace qpid {
namespace broker {
-QueueCleaner::QueueCleaner(QueueRegistry& q, sys::Timer& t) : queues(q), timer(t) {}
+QueueCleaner::QueueCleaner(QueueRegistry& q, Timer& t) : queues(q), timer(t) {}
void QueueCleaner::start(qpid::sys::Duration p)
{
- task = new Task(*this, p);
+ task = boost::intrusive_ptr<TimerTask>(new Task(*this, p));
timer.add(task);
}
-QueueCleaner::Task::Task(QueueCleaner& p, qpid::sys::Duration d) : sys::TimerTask(d), parent(p) {}
+QueueCleaner::Task::Task(QueueCleaner& p, qpid::sys::Duration d) : TimerTask(d), parent(p) {}
void QueueCleaner::Task::fire()
{
@@ -44,7 +44,7 @@ void QueueCleaner::Task::fire()
void QueueCleaner::fired()
{
queues.eachQueue(boost::bind(&Queue::purgeExpired, _1));
- task->setupNextFire();
+ task->reset();
timer.add(task);
}
diff --git a/cpp/src/qpid/broker/QueueCleaner.h b/cpp/src/qpid/broker/QueueCleaner.h
index 0fbb12a5d4..007826f33e 100644
--- a/cpp/src/qpid/broker/QueueCleaner.h
+++ b/cpp/src/qpid/broker/QueueCleaner.h
@@ -23,7 +23,7 @@
*/
#include "BrokerImportExport.h"
-#include "qpid/sys/Timer.h"
+#include "Timer.h"
namespace qpid {
namespace broker {
@@ -35,10 +35,10 @@ class QueueRegistry;
class QueueCleaner
{
public:
- QPID_BROKER_EXTERN QueueCleaner(QueueRegistry& queues, sys::Timer& timer);
+ QPID_BROKER_EXTERN QueueCleaner(QueueRegistry& queues, Timer& timer);
QPID_BROKER_EXTERN void start(qpid::sys::Duration period);
private:
- class Task : public sys::TimerTask
+ class Task : public TimerTask
{
public:
Task(QueueCleaner& parent, qpid::sys::Duration duration);
@@ -46,10 +46,10 @@ class QueueCleaner
private:
QueueCleaner& parent;
};
-
- boost::intrusive_ptr<sys::TimerTask> task;
+
+ boost::intrusive_ptr<TimerTask> task;
QueueRegistry& queues;
- sys::Timer& timer;
+ Timer& timer;
void fired();
};
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index 2b8048ea3d..b465a65bd3 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -25,7 +25,7 @@
#include "SessionManager.h"
#include "SessionHandler.h"
#include "RateFlowcontrol.h"
-#include "qpid/sys/Timer.h"
+#include "Timer.h"
#include "qpid/framing/AMQContentBody.h"
#include "qpid/framing/AMQHeaderBody.h"
#include "qpid/framing/AMQMethodBody.h"
@@ -49,7 +49,6 @@ using qpid::management::ManagementObject;
using qpid::management::Manageable;
using qpid::management::Args;
using qpid::sys::AbsTime;
-//using qpid::sys::Timer;
namespace _qmf = qmf::org::apache::qpid::broker;
SessionState::SessionState(
@@ -207,10 +206,10 @@ void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceN
}
}
-struct ScheduledCreditTask : public sys::TimerTask {
- sys::Timer& timer;
+struct ScheduledCreditTask : public TimerTask {
+ Timer& timer;
SessionState& sessionState;
- ScheduledCreditTask(const qpid::sys::Duration& d, sys::Timer& t,
+ ScheduledCreditTask(const qpid::sys::Duration& d, Timer& t,
SessionState& s) :
TimerTask(d),
timer(t),
@@ -219,13 +218,15 @@ struct ScheduledCreditTask : public sys::TimerTask {
void fire() {
// This is the best we can currently do to avoid a destruction/fire race
- sessionState.getConnection().requestIOProcessing(boost::bind(&ScheduledCreditTask::sendCredit, this));
+ if (!isCancelled()) {
+ sessionState.getConnection().requestIOProcessing(boost::bind(&ScheduledCreditTask::sendCredit, this));
+ }
}
void sendCredit() {
if ( !sessionState.processSendCredit(0) ) {
QPID_LOG(warning, sessionState.getId() << ": Reschedule sending credit");
- setupNextFire();
+ reset();
timer.add(this);
}
}
@@ -268,7 +269,7 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id)
if (rateFlowcontrol && frame.getBof() && frame.getBos()) {
if ( !processSendCredit(1) ) {
QPID_LOG(debug, getId() << ": Schedule sending credit");
- sys::Timer& timer = getBroker().getTimer();
+ Timer& timer = getBroker().getTimer();
// Use heuristic for scheduled credit of time for 50 messages, but not longer than 500ms
sys::Duration d = std::min(sys::TIME_SEC * 50 / rateFlowcontrol->getRate(), 500 * sys::TIME_MSEC);
flowControlTimer = new ScheduledCreditTask(d, timer, *this);
diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h
index 1d000fca5f..f9d35e2aac 100644
--- a/cpp/src/qpid/broker/SessionState.h
+++ b/cpp/src/qpid/broker/SessionState.h
@@ -48,10 +48,6 @@ namespace framing {
class AMQP_ClientProxy;
}
-namespace sys {
-struct TimerTask;
-}
-
namespace broker {
class Broker;
@@ -60,6 +56,7 @@ class Message;
class SessionHandler;
class SessionManager;
class RateFlowcontrol;
+struct TimerTask;
/**
* Broker-side session state includes session's handler chains, which
@@ -156,7 +153,7 @@ class SessionState : public qpid::SessionState,
// State used for producer flow control (rate limited)
qpid::sys::Mutex rateLock;
boost::scoped_ptr<RateFlowcontrol> rateFlowcontrol;
- boost::intrusive_ptr<sys::TimerTask> flowControlTimer;
+ boost::intrusive_ptr<TimerTask> flowControlTimer;
friend class SessionManager;
};
diff --git a/cpp/src/qpid/cluster/ExpiryPolicy.cpp b/cpp/src/qpid/cluster/ExpiryPolicy.cpp
index 06bd3d3343..348963f901 100644
--- a/cpp/src/qpid/cluster/ExpiryPolicy.cpp
+++ b/cpp/src/qpid/cluster/ExpiryPolicy.cpp
@@ -21,19 +21,19 @@
#include "ExpiryPolicy.h"
#include "Multicaster.h"
-#include "qpid/broker/Message.h"
#include "qpid/framing/ClusterMessageExpiredBody.h"
#include "qpid/sys/Time.h"
-#include "qpid/sys/Timer.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/Timer.h"
#include "qpid/log/Statement.h"
namespace qpid {
namespace cluster {
-ExpiryPolicy::ExpiryPolicy(Multicaster& m, const MemberId& id, sys::Timer& t)
+ExpiryPolicy::ExpiryPolicy(Multicaster& m, const MemberId& id, broker::Timer& t)
: expiryId(0), expiredPolicy(new Expired), mcast(m), memberId(id), timer(t) {}
-struct ExpiryTask : public sys::TimerTask {
+struct ExpiryTask : public broker::TimerTask {
ExpiryTask(const boost::intrusive_ptr<ExpiryPolicy>& policy, uint64_t id, sys::AbsTime when)
: TimerTask(when), expiryPolicy(policy), expiryId(id) {}
void fire() { expiryPolicy->sendExpire(expiryId); }
diff --git a/cpp/src/qpid/cluster/ExpiryPolicy.h b/cpp/src/qpid/cluster/ExpiryPolicy.h
index bf35f4ca6d..c147e54796 100644
--- a/cpp/src/qpid/cluster/ExpiryPolicy.h
+++ b/cpp/src/qpid/cluster/ExpiryPolicy.h
@@ -33,11 +33,8 @@
namespace qpid {
namespace broker {
-class Message;
-}
-
-namespace sys {
class Timer;
+class Message;
}
namespace cluster {
@@ -49,7 +46,7 @@ class Multicaster;
class ExpiryPolicy : public broker::ExpiryPolicy
{
public:
- ExpiryPolicy(Multicaster&, const MemberId&, sys::Timer&);
+ ExpiryPolicy(Multicaster&, const MemberId&, broker::Timer&);
void willExpire(broker::Message&);
bool hasExpired(broker::Message&);
@@ -81,7 +78,7 @@ class ExpiryPolicy : public broker::ExpiryPolicy
boost::intrusive_ptr<Expired> expiredPolicy;
Multicaster& mcast;
MemberId memberId;
- sys::Timer& timer;
+ broker::Timer& timer;
};
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp
index 76859dd77f..75f7453058 100644
--- a/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/cpp/src/qpid/management/ManagementAgent.cpp
@@ -61,6 +61,7 @@ ManagementAgent::ManagementAgent () :
ManagementAgent::~ManagementAgent ()
{
+ timer.stop();
{
Mutex::ScopedLock lock (userLock);
@@ -88,10 +89,9 @@ void ManagementAgent::configure(const string& _dataDir, uint16_t _interval,
dataDir = _dataDir;
interval = _interval;
broker = _broker;
- timer = &_broker->getTimer();
threadPoolSize = _threads;
ManagementObject::maxThreads = threadPoolSize;
- timer->add (new Periodic(*this, interval));
+ timer.add (intrusive_ptr<TimerTask> (new Periodic(*this, interval)));
// Get from file or generate and save to file.
if (dataDir.empty())
@@ -218,7 +218,7 @@ ManagementAgent::Periodic::~Periodic () {}
void ManagementAgent::Periodic::fire ()
{
- agent.timer->add (new Periodic (agent, agent.interval));
+ agent.timer.add (intrusive_ptr<TimerTask> (new Periodic (agent, agent.interval)));
agent.periodicProcessing ();
}
diff --git a/cpp/src/qpid/management/ManagementAgent.h b/cpp/src/qpid/management/ManagementAgent.h
index ca89c1f8fb..34d53f778d 100644
--- a/cpp/src/qpid/management/ManagementAgent.h
+++ b/cpp/src/qpid/management/ManagementAgent.h
@@ -24,9 +24,9 @@
#include "qpid/broker/BrokerImportExport.h"
#include "qpid/Options.h"
#include "qpid/broker/Exchange.h"
+#include "qpid/broker/Timer.h"
#include "qpid/framing/Uuid.h"
#include "qpid/sys/Mutex.h"
-#include "qpid/sys/Timer.h"
#include "qpid/broker/ConnectionToken.h"
#include "ManagementObject.h"
#include "ManagementEvent.h"
@@ -98,7 +98,7 @@ public:
void disallow(const std::string& className, const std::string& methodName, const std::string& message);
private:
- struct Periodic : public qpid::sys::TimerTask
+ struct Periodic : public qpid::broker::TimerTask
{
ManagementAgent& agent;
@@ -183,12 +183,12 @@ private:
framing::Uuid uuid;
sys::Mutex addLock;
sys::Mutex userLock;
+ qpid::broker::Timer timer;
qpid::broker::Exchange::shared_ptr mExchange;
qpid::broker::Exchange::shared_ptr dExchange;
std::string dataDir;
uint16_t interval;
qpid::broker::Broker* broker;
- qpid::sys::Timer* timer;
uint16_t bootSequence;
uint32_t nextObjectId;
uint32_t brokerBank;
diff --git a/cpp/src/qpid/sys/DispatchHandle.h b/cpp/src/qpid/sys/DispatchHandle.h
index 860665877c..916d4c641a 100644
--- a/cpp/src/qpid/sys/DispatchHandle.h
+++ b/cpp/src/qpid/sys/DispatchHandle.h
@@ -38,14 +38,14 @@ class DispatchHandleRef;
* you need to:
*
* - Subclass IOHandle, in the constructor supply an appropriate
- * IOHandlerPrivate object for the platform.
- *
+ * IOHandlerPrivate object for the platform.
+ *
* - Construct a DispatchHandle passing it your IOHandle and
* callback functions for read, write and disconnect events.
*
* - Ensure the DispatchHandle is not deleted until the poller is no longer using it.
* TODO: astitcher document DispatchHandleRef to simplify this.
- *
+ *
* When an event occurs on the handle, the poller calls the relevant callback and
* stops watching that handle. Your callback can call rewatch() or related functions
* to re-enable polling.
diff --git a/cpp/src/qpid/sys/Timer.cpp b/cpp/src/qpid/sys/Timer.cpp
index fd42d7d62e..6967d812ae 100644
--- a/cpp/src/qpid/sys/Timer.cpp
+++ b/cpp/src/qpid/sys/Timer.cpp
@@ -30,14 +30,12 @@ namespace qpid {
namespace sys {
TimerTask::TimerTask(Duration timeout) :
- sortTime(AbsTime::FarFuture()),
period(timeout),
nextFireTime(AbsTime::now(), timeout),
cancelled(false)
{}
TimerTask::TimerTask(AbsTime time) :
- sortTime(AbsTime::FarFuture()),
period(0),
nextFireTime(time),
cancelled(false)
@@ -62,7 +60,7 @@ void TimerTask::setupNextFire() {
}
// Only allow tasks to be delayed
-void TimerTask::restart() { nextFireTime = max(nextFireTime, AbsTime(AbsTime::now(), period)); }
+void TimerTask::restart() { nextFireTime = AbsTime(AbsTime::now(), period); }
void TimerTask::delayTill(AbsTime time) { period = 0; nextFireTime = max(nextFireTime, time); }
void TimerTask::cancel() {
@@ -93,7 +91,7 @@ void Timer::run()
tasks.pop();
{
ScopedLock<Mutex> l(t->callbackLock);
- if (t->cancelled) {
+ if (t->isCancelled()) {
continue;
} else if(t->readyToFire()) {
Monitor::ScopedUnlock u(monitor);
@@ -102,9 +100,6 @@ void Timer::run()
} else {
// If the timer was adjusted into the future it might no longer
// be the next event, so push and then get top to make sure
- // You can only push events into the future
- assert(!(t->nextFireTime < t->sortTime));
- t->sortTime = t->nextFireTime;
tasks.push(t);
}
}
@@ -116,7 +111,6 @@ void Timer::run()
void Timer::add(intrusive_ptr<TimerTask> task)
{
Monitor::ScopedLock l(monitor);
- task->sortTime = task->nextFireTime;
tasks.push(task);
monitor.notify();
}
@@ -145,7 +139,7 @@ bool operator<(const intrusive_ptr<TimerTask>& a,
const intrusive_ptr<TimerTask>& b)
{
// Lower priority if time is later
- return a.get() && b.get() && a->sortTime > b->sortTime;
+ return a.get() && b.get() && a->nextFireTime > b->nextFireTime;
}
}}
diff --git a/cpp/src/qpid/sys/Timer.h b/cpp/src/qpid/sys/Timer.h
index fc7491d5ed..b5bf5d8a4c 100644
--- a/cpp/src/qpid/sys/Timer.h
+++ b/cpp/src/qpid/sys/Timer.h
@@ -42,7 +42,6 @@ class TimerTask : public RefCounted {
friend bool operator<(const boost::intrusive_ptr<TimerTask>&,
const boost::intrusive_ptr<TimerTask>&);
- AbsTime sortTime;
Duration period;
AbsTime nextFireTime;
Mutex callbackLock;
@@ -72,7 +71,7 @@ bool operator<(const boost::intrusive_ptr<TimerTask>& a,
const boost::intrusive_ptr<TimerTask>& b);
class Timer : private Runnable {
- qpid::sys::Monitor monitor;
+ qpid::sys::Monitor monitor;
std::priority_queue<boost::intrusive_ptr<TimerTask> > tasks;
qpid::sys::Thread runner;
bool active;