summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-10-10 16:54:54 +0000
committerGordon Sim <gsim@apache.org>2008-10-10 16:54:54 +0000
commitd7c677547f17a51f8c97ea10b7516fd44ef8d1d2 (patch)
tree1f32b621fd6abd250bf074c52f1803c0d826654f /qpid/cpp
parentf042c7b950adcfa1ac9525a94925a7ce741afc50 (diff)
downloadqpid-python-d7c677547f17a51f8c97ea10b7516fd44ef8d1d2.tar.gz
Handle ttl in messages transfers received by the broker 7 added test for it
Moved Timer instance from DtxManager to Broker so it can be shared git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@703521 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp1
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h6
-rw-r--r--qpid/cpp/src/qpid/broker/DtxManager.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/DtxManager.h4
-rw-r--r--qpid/cpp/src/qpid/broker/Message.cpp26
-rw-r--r--qpid/cpp/src/qpid/broker/Message.h4
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp8
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp15
8 files changed, 56 insertions, 10 deletions
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index 9a3925b053..4074723c28 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -136,6 +136,7 @@ Broker::Broker(const Broker::Options& conf) :
dataDir(conf.noDataDir ? std::string () : conf.dataDir),
links(this),
factory(new ConnectionFactory(*this)),
+ dtxManager(timer),
sessionManager(
qpid::SessionState::Configuration(
conf.replayFlushLimit*1024, // convert kb to bytes.
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index fec32d620d..a15440bc0e 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -33,6 +33,7 @@
#include "SessionManager.h"
#include "Vhost.h"
#include "System.h"
+#include "Timer.h"
#include "qpid/management/Manageable.h"
#include "qpid/management/ManagementBroker.h"
#include "qmf/org/apache/qpid/broker/Broker.h"
@@ -97,13 +98,14 @@ class Broker : public sys::Runnable, public Plugin::Target,
management::ManagementAgent::Singleton managementAgentSingleton;
std::vector< boost::shared_ptr<sys::ProtocolFactory> > protocolFactories;
MessageStore* store;
- AclModule* acl;
+ AclModule* acl;
DataDir dataDir;
QueueRegistry queues;
ExchangeRegistry exchanges;
LinkRegistry links;
boost::shared_ptr<sys::ConnectionCodec::Factory> factory;
+ Timer timer;
DtxManager dtxManager;
SessionManager sessionManager;
management::ManagementAgent* managementAgent;
@@ -195,6 +197,8 @@ class Broker : public sys::Runnable, public Plugin::Target,
boost::shared_ptr<sys::ConnectionCodec::Factory> getConnectionFactory() { return factory; }
void setConnectionFactory(boost::shared_ptr<sys::ConnectionCodec::Factory> f) { factory = f; }
+ Timer& getTimer() { return timer; }
+
boost::function<std::vector<Url> ()> getKnownBrokers;
};
diff --git a/qpid/cpp/src/qpid/broker/DtxManager.cpp b/qpid/cpp/src/qpid/broker/DtxManager.cpp
index 942dbdcbc6..135e36d84a 100644
--- a/qpid/cpp/src/qpid/broker/DtxManager.cpp
+++ b/qpid/cpp/src/qpid/broker/DtxManager.cpp
@@ -33,7 +33,7 @@ using qpid::ptr_map_ptr;
using namespace qpid::broker;
using namespace qpid::framing;
-DtxManager::DtxManager() : store(0) {}
+DtxManager::DtxManager(Timer& t) : store(0), timer(t) {}
DtxManager::~DtxManager() {}
diff --git a/qpid/cpp/src/qpid/broker/DtxManager.h b/qpid/cpp/src/qpid/broker/DtxManager.h
index fa5c62c233..a61e8610f0 100644
--- a/qpid/cpp/src/qpid/broker/DtxManager.h
+++ b/qpid/cpp/src/qpid/broker/DtxManager.h
@@ -47,14 +47,14 @@ class DtxManager{
WorkMap work;
TransactionalStore* store;
qpid::sys::Mutex lock;
- Timer timer;
+ Timer& timer;
void remove(const std::string& xid);
DtxWorkRecord* getWork(const std::string& xid);
DtxWorkRecord* createWork(std::string xid);
public:
- DtxManager();
+ DtxManager(Timer&);
~DtxManager();
void start(const std::string& xid, DtxBuffer::shared_ptr work);
void join(const std::string& xid, DtxBuffer::shared_ptr work);
diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp
index 7d02fb3d3c..e5a167bd4e 100644
--- a/qpid/cpp/src/qpid/broker/Message.cpp
+++ b/qpid/cpp/src/qpid/broker/Message.cpp
@@ -30,15 +30,22 @@
#include "qpid/framing/TypeFilter.h"
#include "qpid/log/Statement.h"
+#include <time.h>
+
using boost::intrusive_ptr;
using namespace qpid::broker;
using namespace qpid::framing;
+using qpid::sys::AbsTime;
+using qpid::sys::Duration;
+using qpid::sys::TIME_MSEC;
+using qpid::sys::FAR_FUTURE;
using std::string;
TransferAdapter Message::TRANSFER;
Message::Message(const SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), loaded(false),
-staged(false), forcePersistentPolicy(false), publisher(0), adapter(0) {}
+ staged(false), forcePersistentPolicy(false), publisher(0), adapter(0),
+ expiration(FAR_FUTURE) {}
Message::~Message()
{
@@ -297,3 +304,20 @@ void Message::addTraceId(const std::string& id)
}
}
}
+
+void Message::setTimestamp()
+{
+ time_t now = ::time(0);
+ DeliveryProperties* props = getProperties<DeliveryProperties>();
+ props->setTimestamp(now);
+ if (props->getTtl()) {
+ //set expiration (nb: ttl is in millisecs, time_t is in secs)
+ props->setExpiration(now + (props->getTtl()/1000));
+ expiration = AbsTime(AbsTime::now(), Duration(props->getTtl() * TIME_MSEC));
+ }
+}
+
+bool Message::hasExpired() const
+{
+ return expiration < AbsTime::now();
+}
diff --git a/qpid/cpp/src/qpid/broker/Message.h b/qpid/cpp/src/qpid/broker/Message.h
index 8eb1b0b31c..f6eec361bb 100644
--- a/qpid/cpp/src/qpid/broker/Message.h
+++ b/qpid/cpp/src/qpid/broker/Message.h
@@ -30,6 +30,7 @@
#include "MessageAdapter.h"
#include "qpid/framing/amqp_types.h"
#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Time.h"
namespace qpid {
@@ -70,6 +71,8 @@ public:
const framing::FieldTable* getApplicationHeaders() const;
bool isPersistent();
bool requiresAccept();
+ void setTimestamp();
+ bool hasExpired() const;
framing::FrameSet& getFrames() { return frames; }
const framing::FrameSet& getFrames() const { return frames; }
@@ -147,6 +150,7 @@ public:
bool forcePersistentPolicy; // used to force message as durable, via a broker policy
ConnectionToken* publisher;
mutable MessageAdapter* adapter;
+ qpid::sys::AbsTime expiration;
static TransferAdapter TRANSFER;
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index ea5f7a0ba9..4382ac2e57 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -264,6 +264,12 @@ bool Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
return false;
} else {
QueuedMessage msg = messages.front();
+ if (msg.payload->hasExpired()) {
+ QPID_LOG(debug, "Message expired from queue '" << name << "'");
+ popAndDequeue();
+ continue;
+ }
+
if (!optimisticConsume && store && !msg.payload->isEnqueueComplete()) {
QPID_LOG(debug, "Messages not ready to dispatch on queue '" << name << "'");
addListener(c);
@@ -294,7 +300,7 @@ bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
{
QueuedMessage msg(this);
while (seek(msg, c)) {
- if (c->filter(msg.payload)) {
+ if (c->filter(msg.payload) && !msg.payload->hasExpired()) {
if (c->accept(msg.payload)) {
//consumer wants the message
c->position = msg.position;
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index 065e15543b..6fb38eb674 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -341,6 +341,11 @@ void SemanticState::handle(intrusive_ptr<Message> msg) {
}
}
+namespace
+{
+const std::string nullstring;
+}
+
void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
std::string exchangeName = msg->getExchangeName();
//TODO: the following should be hidden behind message (using MessageAdapter or similar)
@@ -352,6 +357,7 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
if (!msg->hasProperties<DeliveryProperties>() ||
msg->getProperties<DeliveryProperties>()->getExchange().empty())
msg->getProperties<DeliveryProperties>()->setExchange(exchangeName);
+ msg->setTimestamp();
}
if (!cacheExchange || cacheExchange->getName() != exchangeName){
cacheExchange = session.getBroker().getExchanges().get(exchangeName);
@@ -359,18 +365,19 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
/* verify the userid if specified: */
std::string id =
- msg->hasProperties<MessageProperties>()? msg->getProperties<MessageProperties>()->getUserId():"";
+ msg->hasProperties<MessageProperties>() ? msg->getProperties<MessageProperties>()->getUserId() : nullstring;
if (authMsg && !id.empty() && id != userID )
{
- QPID_LOG(debug, "user id : " << userID << " msgProps.getUserID() " << msg->getProperties<MessageProperties>()->getUserId());
- throw UnauthorizedAccessException("user id in the message is not the same id used to authenticate the connection");
+ QPID_LOG(debug, "authorised user id : " << userID << " but user id in message declared as " << id);
+ throw UnauthorizedAccessException(QPID_MSG("authorised user id : " << userID << " but user id in message declared as " << id));
}
if (acl && acl->doTransferAcl())
{
if (!acl->authorise(getSession().getConnection().getUserId(),acl::PUBLISH,acl::EXCHANGE,exchangeName, msg->getRoutingKey() ))
- throw NotAllowedException("ACL denied exhange publish request");
+ throw NotAllowedException(QPID_MSG(getSession().getConnection().getUserId() << " cannot publish to " <<
+ exchangeName << " with routing-key " << msg->getRoutingKey()));
}
cacheExchange->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders());