diff options
author | Gordon Sim <gsim@apache.org> | 2008-10-10 16:54:54 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2008-10-10 16:54:54 +0000 |
commit | d7c677547f17a51f8c97ea10b7516fd44ef8d1d2 (patch) | |
tree | 1f32b621fd6abd250bf074c52f1803c0d826654f /qpid/cpp | |
parent | f042c7b950adcfa1ac9525a94925a7ce741afc50 (diff) | |
download | qpid-python-d7c677547f17a51f8c97ea10b7516fd44ef8d1d2.tar.gz |
Handle ttl in messages transfers received by the broker 7 added test for it
Moved Timer instance from DtxManager to Broker so it can be shared
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@703521 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.cpp | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.h | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/DtxManager.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/DtxManager.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Message.cpp | 26 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Message.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.cpp | 15 |
8 files changed, 56 insertions, 10 deletions
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 9a3925b053..4074723c28 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -136,6 +136,7 @@ Broker::Broker(const Broker::Options& conf) : dataDir(conf.noDataDir ? std::string () : conf.dataDir), links(this), factory(new ConnectionFactory(*this)), + dtxManager(timer), sessionManager( qpid::SessionState::Configuration( conf.replayFlushLimit*1024, // convert kb to bytes. diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index fec32d620d..a15440bc0e 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -33,6 +33,7 @@ #include "SessionManager.h" #include "Vhost.h" #include "System.h" +#include "Timer.h" #include "qpid/management/Manageable.h" #include "qpid/management/ManagementBroker.h" #include "qmf/org/apache/qpid/broker/Broker.h" @@ -97,13 +98,14 @@ class Broker : public sys::Runnable, public Plugin::Target, management::ManagementAgent::Singleton managementAgentSingleton; std::vector< boost::shared_ptr<sys::ProtocolFactory> > protocolFactories; MessageStore* store; - AclModule* acl; + AclModule* acl; DataDir dataDir; QueueRegistry queues; ExchangeRegistry exchanges; LinkRegistry links; boost::shared_ptr<sys::ConnectionCodec::Factory> factory; + Timer timer; DtxManager dtxManager; SessionManager sessionManager; management::ManagementAgent* managementAgent; @@ -195,6 +197,8 @@ class Broker : public sys::Runnable, public Plugin::Target, boost::shared_ptr<sys::ConnectionCodec::Factory> getConnectionFactory() { return factory; } void setConnectionFactory(boost::shared_ptr<sys::ConnectionCodec::Factory> f) { factory = f; } + Timer& getTimer() { return timer; } + boost::function<std::vector<Url> ()> getKnownBrokers; }; diff --git a/qpid/cpp/src/qpid/broker/DtxManager.cpp b/qpid/cpp/src/qpid/broker/DtxManager.cpp index 942dbdcbc6..135e36d84a 100644 --- a/qpid/cpp/src/qpid/broker/DtxManager.cpp +++ b/qpid/cpp/src/qpid/broker/DtxManager.cpp @@ -33,7 +33,7 @@ using qpid::ptr_map_ptr; using namespace qpid::broker; using namespace qpid::framing; -DtxManager::DtxManager() : store(0) {} +DtxManager::DtxManager(Timer& t) : store(0), timer(t) {} DtxManager::~DtxManager() {} diff --git a/qpid/cpp/src/qpid/broker/DtxManager.h b/qpid/cpp/src/qpid/broker/DtxManager.h index fa5c62c233..a61e8610f0 100644 --- a/qpid/cpp/src/qpid/broker/DtxManager.h +++ b/qpid/cpp/src/qpid/broker/DtxManager.h @@ -47,14 +47,14 @@ class DtxManager{ WorkMap work; TransactionalStore* store; qpid::sys::Mutex lock; - Timer timer; + Timer& timer; void remove(const std::string& xid); DtxWorkRecord* getWork(const std::string& xid); DtxWorkRecord* createWork(std::string xid); public: - DtxManager(); + DtxManager(Timer&); ~DtxManager(); void start(const std::string& xid, DtxBuffer::shared_ptr work); void join(const std::string& xid, DtxBuffer::shared_ptr work); diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp index 7d02fb3d3c..e5a167bd4e 100644 --- a/qpid/cpp/src/qpid/broker/Message.cpp +++ b/qpid/cpp/src/qpid/broker/Message.cpp @@ -30,15 +30,22 @@ #include "qpid/framing/TypeFilter.h" #include "qpid/log/Statement.h" +#include <time.h> + using boost::intrusive_ptr; using namespace qpid::broker; using namespace qpid::framing; +using qpid::sys::AbsTime; +using qpid::sys::Duration; +using qpid::sys::TIME_MSEC; +using qpid::sys::FAR_FUTURE; using std::string; TransferAdapter Message::TRANSFER; Message::Message(const SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), loaded(false), -staged(false), forcePersistentPolicy(false), publisher(0), adapter(0) {} + staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), + expiration(FAR_FUTURE) {} Message::~Message() { @@ -297,3 +304,20 @@ void Message::addTraceId(const std::string& id) } } } + +void Message::setTimestamp() +{ + time_t now = ::time(0); + DeliveryProperties* props = getProperties<DeliveryProperties>(); + props->setTimestamp(now); + if (props->getTtl()) { + //set expiration (nb: ttl is in millisecs, time_t is in secs) + props->setExpiration(now + (props->getTtl()/1000)); + expiration = AbsTime(AbsTime::now(), Duration(props->getTtl() * TIME_MSEC)); + } +} + +bool Message::hasExpired() const +{ + return expiration < AbsTime::now(); +} diff --git a/qpid/cpp/src/qpid/broker/Message.h b/qpid/cpp/src/qpid/broker/Message.h index 8eb1b0b31c..f6eec361bb 100644 --- a/qpid/cpp/src/qpid/broker/Message.h +++ b/qpid/cpp/src/qpid/broker/Message.h @@ -30,6 +30,7 @@ #include "MessageAdapter.h" #include "qpid/framing/amqp_types.h" #include "qpid/sys/Mutex.h" +#include "qpid/sys/Time.h" namespace qpid { @@ -70,6 +71,8 @@ public: const framing::FieldTable* getApplicationHeaders() const; bool isPersistent(); bool requiresAccept(); + void setTimestamp(); + bool hasExpired() const; framing::FrameSet& getFrames() { return frames; } const framing::FrameSet& getFrames() const { return frames; } @@ -147,6 +150,7 @@ public: bool forcePersistentPolicy; // used to force message as durable, via a broker policy ConnectionToken* publisher; mutable MessageAdapter* adapter; + qpid::sys::AbsTime expiration; static TransferAdapter TRANSFER; diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index ea5f7a0ba9..4382ac2e57 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -264,6 +264,12 @@ bool Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c) return false; } else { QueuedMessage msg = messages.front(); + if (msg.payload->hasExpired()) { + QPID_LOG(debug, "Message expired from queue '" << name << "'"); + popAndDequeue(); + continue; + } + if (!optimisticConsume && store && !msg.payload->isEnqueueComplete()) { QPID_LOG(debug, "Messages not ready to dispatch on queue '" << name << "'"); addListener(c); @@ -294,7 +300,7 @@ bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr c) { QueuedMessage msg(this); while (seek(msg, c)) { - if (c->filter(msg.payload)) { + if (c->filter(msg.payload) && !msg.payload->hasExpired()) { if (c->accept(msg.payload)) { //consumer wants the message c->position = msg.position; diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index 065e15543b..6fb38eb674 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -341,6 +341,11 @@ void SemanticState::handle(intrusive_ptr<Message> msg) { } } +namespace +{ +const std::string nullstring; +} + void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { std::string exchangeName = msg->getExchangeName(); //TODO: the following should be hidden behind message (using MessageAdapter or similar) @@ -352,6 +357,7 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { if (!msg->hasProperties<DeliveryProperties>() || msg->getProperties<DeliveryProperties>()->getExchange().empty()) msg->getProperties<DeliveryProperties>()->setExchange(exchangeName); + msg->setTimestamp(); } if (!cacheExchange || cacheExchange->getName() != exchangeName){ cacheExchange = session.getBroker().getExchanges().get(exchangeName); @@ -359,18 +365,19 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { /* verify the userid if specified: */ std::string id = - msg->hasProperties<MessageProperties>()? msg->getProperties<MessageProperties>()->getUserId():""; + msg->hasProperties<MessageProperties>() ? msg->getProperties<MessageProperties>()->getUserId() : nullstring; if (authMsg && !id.empty() && id != userID ) { - QPID_LOG(debug, "user id : " << userID << " msgProps.getUserID() " << msg->getProperties<MessageProperties>()->getUserId()); - throw UnauthorizedAccessException("user id in the message is not the same id used to authenticate the connection"); + QPID_LOG(debug, "authorised user id : " << userID << " but user id in message declared as " << id); + throw UnauthorizedAccessException(QPID_MSG("authorised user id : " << userID << " but user id in message declared as " << id)); } if (acl && acl->doTransferAcl()) { if (!acl->authorise(getSession().getConnection().getUserId(),acl::PUBLISH,acl::EXCHANGE,exchangeName, msg->getRoutingKey() )) - throw NotAllowedException("ACL denied exhange publish request"); + throw NotAllowedException(QPID_MSG(getSession().getConnection().getUserId() << " cannot publish to " << + exchangeName << " with routing-key " << msg->getRoutingKey())); } cacheExchange->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders()); |