summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/PagedQueue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/PagedQueue.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/PagedQueue.cpp460
1 files changed, 460 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/broker/PagedQueue.cpp b/qpid/cpp/src/qpid/broker/PagedQueue.cpp
new file mode 100644
index 0000000000..b5edfb89c0
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/PagedQueue.cpp
@@ -0,0 +1,460 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/PagedQueue.h"
+#include "qpid/broker/Protocol.h"
+#include "qpid/broker/QueueCursor.h"
+#include "qpid/broker/Message.h"
+#include "qpid/log/Statement.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/sys/Time.h"
+#include <string.h>
+
+namespace qpid {
+namespace broker {
+namespace {
+using qpid::sys::AbsTime;
+using qpid::sys::Duration;
+using qpid::sys::ZERO;
+using qpid::sys::FAR_FUTURE;
+using qpid::sys::MemoryMappedFile;
+const uint32_t OVERHEAD(4/*content-size*/ + 4/*sequence-number*/ + 8/*persistence-id*/ + 8/*expiration*/);
+
+size_t encodedSize(const Message& msg)
+{
+ return msg.getPersistentContext()->encodedSize() + OVERHEAD;
+}
+
+size_t encode(const Message& msg, char* data, size_t size)
+{
+ uint32_t encoded = msg.getPersistentContext()->encodedSize();
+ uint32_t required = encoded + OVERHEAD;
+ if (required > size) return 0;
+ qpid::framing::Buffer buffer(data, required);
+ buffer.putLong(encoded);
+ buffer.putLong(msg.getSequence());
+ buffer.putLongLong(msg.getPersistentContext()->getPersistenceId());
+ sys::AbsTime expiration = msg.getExpiration();
+ int64_t t(0);
+ if (expiration < FAR_FUTURE) {
+ // Just need an integer that will round trip
+ t = Duration(ZERO, expiration);
+ }
+ buffer.putLongLong(t);
+ msg.getPersistentContext()->encode(buffer);
+ assert(buffer.getPosition() == required);
+ return required;
+}
+
+size_t decode(ProtocolRegistry& protocols, Message& msg, const char* data, size_t size)
+{
+ qpid::framing::Buffer metadata(const_cast<char*>(data), size);
+ uint32_t encoded = metadata.getLong();
+ uint32_t sequence = metadata.getLong();
+ uint64_t persistenceId = metadata.getLongLong();
+ int64_t t = metadata.getLongLong();
+ assert(metadata.available() >= encoded);
+ qpid::framing::Buffer buffer(const_cast<char*>(data) + metadata.getPosition(), encoded);
+ msg = protocols.decode(buffer);
+ assert(buffer.getPosition() == encoded);
+ msg.setSequence(qpid::framing::SequenceNumber(sequence));
+ msg.getPersistentContext()->setPersistenceId(persistenceId);
+ if (t) {
+ sys::AbsTime expiration(ZERO, t);
+ msg.getSharedState().setExpiration(expiration);
+ }
+ return encoded + metadata.getPosition();
+}
+
+}
+
+PagedQueue::PagedQueue(const std::string& name_, const std::string& directory, uint m, uint factor, ProtocolRegistry& p)
+ : name(name_), pageSize(file.getPageSize()*factor), maxLoaded(m), protocols(p), offset(0), loaded(0), version(0)
+{
+ if (directory.empty()) {
+ throw qpid::Exception(QPID_MSG("Cannot create paged queue: No paged queue directory specified"));
+ }
+ file.open(name, directory);
+ QPID_LOG(debug, "PagedQueue[" << name << "]");
+}
+
+PagedQueue::~PagedQueue()
+{
+ file.close();
+}
+
+size_t PagedQueue::size()
+{
+ size_t total(0);
+ for (Used::const_iterator i = used.begin(); i != used.end(); ++i) {
+ total += i->second.available();
+ }
+ return total;
+}
+
+bool PagedQueue::deleted(const QueueCursor& cursor)
+{
+ if (cursor.valid) {
+ Used::iterator page = findPage(cursor.position, false);
+ if (page == used.end()) {
+ return false;
+ }
+ page->second.deleted(cursor.position);
+ if (page->second.empty()) {
+ //move page to free list
+ --loaded;
+ page->second.clear(file);
+ free.push_back(page->second);
+ used.erase(page);
+ }
+ return true;
+ } else {
+ return false;
+ }
+}
+
+void PagedQueue::publish(const Message& added)
+{
+ if (encodedSize(added) > pageSize) {
+ QPID_LOG(error, "Message is larger than page size for queue " << name);
+ throw qpid::framing::PreconditionFailedException(QPID_MSG("Message is larger than page size for queue " << name));
+ }
+ Used::reverse_iterator i = used.rbegin();
+ if (i != used.rend()) {
+ if (!i->second.isLoaded()) load(i->second);
+ if (i->second.add(added)) return;
+ }
+ //used is empty or last page is full, need to add a new page
+ if (!newPage(added.getSequence()).add(added)) {
+ QPID_LOG(error, "Could not add message to paged queue " << name);
+ throw qpid::Exception(QPID_MSG("Could not add message to paged queue " << name));
+ }
+}
+
+Message* PagedQueue::next(QueueCursor& cursor)
+{
+ Used::iterator i = used.begin();
+ if (cursor.valid) {
+ qpid::framing::SequenceNumber position(cursor.position);
+ ++position;
+ i = findPage(position, true);
+ if (i == used.end() && !used.empty() && used.begin()->first > position) i = used.begin();
+ }
+ while (i != used.end()) {
+ if (!i->second.isLoaded()) load(i->second);
+ Message* m = i->second.next(version, cursor);
+ QPID_LOG(debug, "PagedQueue::next(" << cursor.valid << ":" << cursor.position << "): " << m);
+ if (m) return m;
+ ++i;
+ }
+ QPID_LOG(debug, "PagedQueue::next(" << cursor.valid << ":" << cursor.position << ") returning 0 ");
+ return 0;
+}
+
+Message* PagedQueue::release(const QueueCursor& cursor)
+{
+ if (cursor.valid) {
+ Used::iterator i = findPage(cursor.position, true);
+ if (i == used.end()) return 0;
+ return i->second.release(cursor.position);
+ } else {
+ return 0;
+ }
+}
+
+Message* PagedQueue::find(const framing::SequenceNumber& position, QueueCursor* cursor)
+{
+ Used::iterator i = findPage(position, true);
+ if (i != used.end()) {
+ Message* m = i->second.find(position);
+ if (cursor) {
+ cursor->setPosition(version, m ? m->getSequence() : position);
+ }
+ return m;
+ } else {
+ return 0;
+ }
+}
+
+void PagedQueue::foreach(Functor)
+{
+ //TODO:
+}
+
+Message* PagedQueue::find(const QueueCursor& cursor)
+{
+ if (cursor.valid) return find(cursor.position, 0);
+ else return 0;
+}
+
+PagedQueue::Page::Page(size_t s, size_t o) : size(s), offset(o), region(0), used(0)
+{
+ QPID_LOG(debug, "Created Page[" << offset << "], size=" << size);
+}
+
+void PagedQueue::Page::deleted(qpid::framing::SequenceNumber s)
+{
+ if (isLoaded()) {
+ Message* message = find(s);
+ assert(message);//could this ever legitimately be 0?
+ message->setState(DELETED);
+ }
+ contents.remove(s);
+ acquired.remove(s);
+}
+
+Message* PagedQueue::Page::release(qpid::framing::SequenceNumber s)
+{
+ Message* m = find(s);
+ if (m) {
+ m->setState(AVAILABLE);
+ }
+ acquired.remove(s);
+ return m;
+}
+
+bool PagedQueue::Page::add(const Message& message)
+{
+ assert(region);
+ assert (size >= used);
+ size_t encoded = encode(message, region + used, size - used);
+ QPID_LOG(debug, "Calling Page[" << offset << "]::add() used=" << used << ", size=" << size << ", encoded=" << encoded << ")");
+ if (encoded) {
+ used += encoded;
+ messages.push_back(message);
+ messages.back().setState(AVAILABLE);
+ contents.add(message.getSequence());
+ return true;
+ } else {
+ return false;
+ }
+}
+
+bool PagedQueue::Page::empty() const
+{
+ return contents.empty();
+}
+
+bool PagedQueue::Page::isLoaded() const
+{
+ return region;
+}
+
+Message* PagedQueue::Page::next(uint32_t version, QueueCursor& cursor)
+{
+ if (messages.empty()) return 0;
+
+ qpid::framing::SequenceNumber position;
+ if (cursor.valid) {
+ position = cursor.position + 1;
+ if (position < messages.front().getSequence()) {
+ position = messages.front().getSequence();
+ cursor.setPosition(position, version);
+ }
+ } else {
+ position = messages.front().getSequence();
+ cursor.setPosition(position, version);
+ }
+
+ Message* m;
+ do {
+ m = find(position);
+ if (m) cursor.setPosition(position, version);
+ ++position;
+ } while (m != 0 && !cursor.check(*m));
+ return m;
+ //if it is the first in the page, increment the hint count of the page
+ //if it is the last in the page, decrement the hint count of the page
+}
+
+/**
+ * Called before adding to the free list
+ */
+void PagedQueue::Page::clear(MemoryMappedFile& file)
+{
+ if (region) file.unmap(region, size);
+ region = 0;
+ used = 0;
+ contents.clear();
+ messages.clear();
+}
+
+size_t PagedQueue::Page::available() const
+{
+ return contents.size() - acquired.size();
+}
+
+Message* PagedQueue::Page::find(qpid::framing::SequenceNumber position)
+{
+ if (messages.size()) {
+ assert(position >= messages.front().getSequence());
+
+ size_t index = position - messages.front().getSequence();
+ if (index < messages.size()) return &(messages[index]);
+ else return 0;
+ } else {
+ //page is empty, is this an error?
+ QPID_LOG(warning, "Could not find message at " << position << "; empty page.");
+ return 0;
+ }
+
+ //if it is the first in the page, increment the hint count of the page
+ //if it is the last in the page, decrement the hint count of the page
+}
+
+void PagedQueue::Page::load(MemoryMappedFile& file, ProtocolRegistry& protocols)
+{
+ QPID_LOG(debug, "Page[" << offset << "]::load" << " used=" << used << ", size=" << size);
+ assert(region == 0);
+ region = file.map(offset, size);
+ assert(region != 0);
+ bool haveData = used > 0;
+ used = 4;//first 4 bytes are the count
+ if (haveData) {
+ qpid::framing::Buffer buffer(region, sizeof(uint32_t));
+ uint32_t count = buffer.getLong();
+ //decode messages into Page::messages
+ for (size_t i = 0; i < count; ++i) {
+ Message message;
+ used += decode(protocols, message, region + used, size - used);
+ if (!contents.contains(message.getSequence())) {
+ message.setState(DELETED);
+ QPID_LOG(debug, "Setting state to deleted for message loaded at " << message.getSequence());
+ } else if (acquired.contains(message.getSequence())) {
+ message.setState(ACQUIRED);
+ } else {
+ message.setState(AVAILABLE);
+ }
+ messages.push_back(message);
+ }
+ if (messages.size()) {
+ QPID_LOG(debug, "Page[" << offset << "]::load " << messages.size() << " messages loaded from "
+ << messages.front().getSequence() << " to " << messages.back().getSequence());
+ } else {
+ QPID_LOG(debug, "Page[" << offset << "]::load no messages loaded");
+ }
+ }//else there is nothing we need to explicitly load, just needed to map region
+}
+
+void PagedQueue::Page::unload(MemoryMappedFile& file)
+{
+ if (messages.size()) {
+ QPID_LOG(debug, "Page[" << offset << "]::unload " << messages.size() << " messages to unload from "
+ << messages.front().getSequence() << " to " << messages.back().getSequence());
+ } else {
+ QPID_LOG(debug, "Page[" << offset << "]::unload no messages to unload");
+ }
+ for (std::deque<Message>::iterator i = messages.begin(); i != messages.end(); ++i) {
+ if (i->getState() == ACQUIRED) acquired.add(i->getSequence());
+ }
+ uint32_t count = messages.size();
+ qpid::framing::Buffer buffer(region, sizeof(uint32_t));
+ buffer.putLong(count);
+ file.flush(region, size);
+ file.unmap(region, size);
+ //remove messages from memory
+ messages.clear();
+ region = 0;
+}
+
+void PagedQueue::load(Page& page)
+{
+ //if needed, release another page
+ if (loaded == maxLoaded) {
+ //which page to select?
+ Used::reverse_iterator i = used.rbegin();
+ while (i != used.rend() && !i->second.isLoaded()) {
+ ++i;
+ }
+ assert(i != used.rend());
+ unload(i->second);
+ }
+ page.load(file, protocols);
+ ++loaded;
+ QPID_LOG(debug, "PagedQueue[" << name << "] loaded page, " << loaded << " pages now loaded");
+}
+
+void PagedQueue::unload(Page& page)
+{
+ page.unload(file);
+ --loaded;
+ QPID_LOG(debug, "PagedQueue[" << name << "] unloaded page, " << loaded << " pages now loaded");
+}
+
+
+PagedQueue::Page& PagedQueue::newPage(qpid::framing::SequenceNumber id)
+{
+ if (loaded == maxLoaded) {
+ //need to release a page from memory to make way for a new one
+
+ //choose last one?
+ Used::reverse_iterator i = used.rbegin();
+ while (!i->second.isLoaded() && i != used.rend()) {
+ ++i;
+ }
+ assert(i != used.rend());
+ unload(i->second);
+ }
+ if (free.empty()) {
+ //need to extend file and add some pages to the free list
+ addPages(4/*arbitrary number, should this be config item?*/);
+ }
+ std::pair<Used::iterator, bool> result = used.insert(Used::value_type(id, free.front()));
+ QPID_LOG(debug, "Added page for sequence starting from " << id);
+ assert(result.second);
+ free.pop_front();
+ load(result.first->second);
+ return result.first->second;
+}
+
+void PagedQueue::addPages(size_t count)
+{
+ for (size_t i = 0; i < count; ++i) {
+ free.push_back(Page(pageSize, offset));
+ offset += pageSize;
+ file.expand(offset);
+ }
+ QPID_LOG(debug, "Added " << count << " pages to free list; now have " << used.size() << " used, and " << free.size() << " free");
+}
+
+PagedQueue::Used::iterator PagedQueue::findPage(const QueueCursor& cursor)
+{
+ Used::iterator i = used.begin();
+ if (cursor.valid) {
+ i = findPage(cursor.position, true);
+ } else if (i != used.end() && !i->second.isLoaded()) {
+ load(i->second);
+ }
+ return i;
+}
+
+PagedQueue::Used::iterator PagedQueue::findPage(qpid::framing::SequenceNumber n, bool loadIfRequired)
+{
+ Used::iterator i = used.end();
+ for (Used::iterator j = used.begin(); j != used.end() && j->first <= n; ++j) {
+ i = j;
+ }
+ if (loadIfRequired && i != used.end() && !i->second.isLoaded()) {
+ load(i->second);
+ }
+ return i;
+}
+
+}} // namespace qpid::broker