summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/PagedQueue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/PagedQueue.cpp')
-rw-r--r--cpp/src/qpid/broker/PagedQueue.cpp34
1 files changed, 27 insertions, 7 deletions
diff --git a/cpp/src/qpid/broker/PagedQueue.cpp b/cpp/src/qpid/broker/PagedQueue.cpp
index 3186182735..43208d74ee 100644
--- a/cpp/src/qpid/broker/PagedQueue.cpp
+++ b/cpp/src/qpid/broker/PagedQueue.cpp
@@ -24,13 +24,18 @@
#include "qpid/broker/Message.h"
#include "qpid/log/Statement.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/sys/Time.h"
#include <string.h>
namespace qpid {
namespace broker {
namespace {
+using qpid::sys::AbsTime;
+using qpid::sys::Duration;
+using qpid::sys::EPOCH;
+using qpid::sys::FAR_FUTURE;
using qpid::sys::MemoryMappedFile;
-const uint32_t OVERHEAD(4/*content-size*/ + 4/*sequence-number*/ + 8/*persistence-id*/);
+const uint32_t OVERHEAD(4/*content-size*/ + 4/*sequence-number*/ + 8/*persistence-id*/ + 8/*expiration*/);
size_t encodedSize(const Message& msg)
{
@@ -46,30 +51,45 @@ size_t encode(const Message& msg, char* data, size_t size)
buffer.putLong(encoded);
buffer.putLong(msg.getSequence());
buffer.putLongLong(msg.getPersistentContext()->getPersistenceId());
+ sys::AbsTime expiration = msg.getExpiration();
+ int64_t t(0);
+ if (expiration < FAR_FUTURE) {
+ t = Duration(EPOCH, expiration);
+ }
+ buffer.putLongLong(t);
msg.getPersistentContext()->encode(buffer);
assert(buffer.getPosition() == required);
return required;
}
-size_t decode(ProtocolRegistry& protocols, Message& msg, const char* data, size_t size)
+size_t decode(ProtocolRegistry& protocols, Message& msg, const char* data, size_t size,
+ boost::intrusive_ptr<ExpiryPolicy> expiryPolicy)
{
qpid::framing::Buffer metadata(const_cast<char*>(data), size);
uint32_t encoded = metadata.getLong();
uint32_t sequence = metadata.getLong();
uint64_t persistenceId = metadata.getLongLong();
+ int64_t t = metadata.getLongLong();
assert(metadata.available() >= encoded);
qpid::framing::Buffer buffer(const_cast<char*>(data) + metadata.getPosition(), encoded);
msg = protocols.decode(buffer);
assert(buffer.getPosition() == encoded);
msg.setSequence(qpid::framing::SequenceNumber(sequence));
msg.getPersistentContext()->setPersistenceId(persistenceId);
+ if (t) {
+ sys::AbsTime expiration(EPOCH, t);
+ msg.setExpiryPolicy(expiryPolicy);
+ msg.setExpiration(expiration);
+ }
return encoded + metadata.getPosition();
}
}
-PagedQueue::PagedQueue(const std::string& name_, const std::string& directory, uint m, uint factor, ProtocolRegistry& p)
- : name(name_), pageSize(file.getPageSize()*factor), maxLoaded(m), protocols(p), offset(0), loaded(0), version(0)
+PagedQueue::PagedQueue(const std::string& name_, const std::string& directory, uint m, uint factor, ProtocolRegistry& p,
+ boost::intrusive_ptr<ExpiryPolicy> e)
+ : name(name_), pageSize(file.getPageSize()*factor), maxLoaded(m), protocols(p), offset(0), loaded(0), version(0),
+ expiryPolicy(e)
{
path = file.open(name, directory);
QPID_LOG(debug, "PagedQueue[" << path << "]");
@@ -299,7 +319,7 @@ Message* PagedQueue::Page::find(qpid::framing::SequenceNumber position)
//if it is the last in the page, decrement the hint count of the page
}
-void PagedQueue::Page::load(MemoryMappedFile& file, ProtocolRegistry& protocols)
+void PagedQueue::Page::load(MemoryMappedFile& file, ProtocolRegistry& protocols, boost::intrusive_ptr<ExpiryPolicy> expiryPolicy)
{
QPID_LOG(debug, "Page[" << offset << "]::load" << " used=" << used << ", size=" << size);
assert(region == 0);
@@ -313,7 +333,7 @@ void PagedQueue::Page::load(MemoryMappedFile& file, ProtocolRegistry& protocols)
//decode messages into Page::messages
for (size_t i = 0; i < count; ++i) {
Message message;
- used += decode(protocols, message, region + used, size - used);
+ used += decode(protocols, message, region + used, size - used, expiryPolicy);
if (!contents.contains(message.getSequence())) {
message.setState(DELETED);
QPID_LOG(debug, "Setting state to deleted for message loaded at " << message.getSequence());
@@ -366,7 +386,7 @@ void PagedQueue::load(Page& page)
assert(i != used.rend());
unload(i->second);
}
- page.load(file, protocols);
+ page.load(file, protocols, expiryPolicy);
++loaded;
QPID_LOG(debug, "PagedQueue[" << path << "] loaded page, " << loaded << " pages now loaded");
}