/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include "qpid/broker/PagedQueue.h" #include "qpid/broker/Protocol.h" #include "qpid/broker/QueueCursor.h" #include "qpid/broker/Message.h" #include "qpid/log/Statement.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/sys/Time.h" #include namespace qpid { namespace broker { namespace { using qpid::sys::AbsTime; using qpid::sys::Duration; using qpid::sys::ZERO; using qpid::sys::FAR_FUTURE; using qpid::sys::MemoryMappedFile; const uint32_t OVERHEAD(4/*content-size*/ + 4/*sequence-number*/ + 8/*persistence-id*/ + 8/*expiration*/); size_t encodedSize(const Message& msg) { return msg.getPersistentContext()->encodedSize() + OVERHEAD; } size_t encode(const Message& msg, char* data, size_t size) { uint32_t encoded = msg.getPersistentContext()->encodedSize(); uint32_t required = encoded + OVERHEAD; if (required > size) return 0; qpid::framing::Buffer buffer(data, required); buffer.putLong(encoded); buffer.putLong(msg.getSequence()); buffer.putLongLong(msg.getPersistentContext()->getPersistenceId()); sys::AbsTime expiration = msg.getExpiration(); int64_t t(0); if (expiration < FAR_FUTURE) { // Just need an integer that will round trip t = Duration(ZERO, expiration); } buffer.putLongLong(t); msg.getPersistentContext()->encode(buffer); assert(buffer.getPosition() == required); return required; } size_t decode(ProtocolRegistry& protocols, Message& msg, const char* data, size_t size) { qpid::framing::Buffer metadata(const_cast(data), size); uint32_t encoded = metadata.getLong(); uint32_t sequence = metadata.getLong(); uint64_t persistenceId = metadata.getLongLong(); int64_t t = metadata.getLongLong(); assert(metadata.available() >= encoded); qpid::framing::Buffer buffer(const_cast(data) + metadata.getPosition(), encoded); msg = protocols.decode(buffer); assert(buffer.getPosition() == encoded); msg.setSequence(qpid::framing::SequenceNumber(sequence)); msg.getPersistentContext()->setPersistenceId(persistenceId); if (t) { sys::AbsTime expiration(ZERO, t); msg.getSharedState().setExpiration(expiration); } return encoded + metadata.getPosition(); } } PagedQueue::PagedQueue(const std::string& name_, const std::string& directory, uint m, uint factor, ProtocolRegistry& p) : name(name_), pageSize(file.getPageSize()*factor), maxLoaded(m), protocols(p), offset(0), loaded(0), version(0) { if (directory.empty()) { throw qpid::Exception(QPID_MSG("Cannot create paged queue: No paged queue directory specified")); } file.open(name, directory); QPID_LOG(debug, "PagedQueue[" << name << "]"); } PagedQueue::~PagedQueue() { file.close(); } size_t PagedQueue::size() { size_t total(0); for (Used::const_iterator i = used.begin(); i != used.end(); ++i) { total += i->second.available(); } return total; } bool PagedQueue::deleted(const QueueCursor& cursor) { if (cursor.valid) { Used::iterator page = findPage(cursor.position, false); if (page == used.end()) { return false; } page->second.deleted(cursor.position); if (page->second.empty()) { //move page to free list --loaded; page->second.clear(file); free.push_back(page->second); used.erase(page); } return true; } else { return false; } } void PagedQueue::check(const Message& added) { if (encodedSize(added) > pageSize) { QPID_LOG(error, "Message is larger than page size for queue " << name); throw qpid::framing::PreconditionFailedException(QPID_MSG("Message is larger than page size for queue " << name)); } } void PagedQueue::publish(const Message& added) { check(added); Used::reverse_iterator i = used.rbegin(); if (i != used.rend()) { if (!i->second.isLoaded()) load(i->second); if (i->second.add(added)) return; } //used is empty or last page is full, need to add a new page if (!newPage(added.getSequence()).add(added)) { QPID_LOG(error, "Could not add message to paged queue " << name); throw qpid::Exception(QPID_MSG("Could not add message to paged queue " << name)); } } Message* PagedQueue::next(QueueCursor& cursor) { Used::iterator i = used.begin(); if (cursor.valid) { qpid::framing::SequenceNumber position(cursor.position); ++position; i = findPage(position, true); if (i == used.end() && !used.empty() && used.begin()->first > position) i = used.begin(); } while (i != used.end()) { if (!i->second.isLoaded()) load(i->second); Message* m = i->second.next(version, cursor); QPID_LOG(debug, "PagedQueue::next(" << cursor.valid << ":" << cursor.position << "): " << m); if (m) return m; ++i; } QPID_LOG(debug, "PagedQueue::next(" << cursor.valid << ":" << cursor.position << ") returning 0 "); return 0; } Message* PagedQueue::release(const QueueCursor& cursor) { if (cursor.valid) { Used::iterator i = findPage(cursor.position, true); if (i == used.end()) return 0; return i->second.release(cursor.position); } else { return 0; } } Message* PagedQueue::find(const framing::SequenceNumber& position, QueueCursor* cursor) { Used::iterator i = findPage(position, true); if (i != used.end()) { Message* m = i->second.find(position); if (cursor) { cursor->setPosition(version, m ? m->getSequence() : position); } return m; } else { return 0; } } void PagedQueue::foreach(Functor) { //TODO: } Message* PagedQueue::find(const QueueCursor& cursor) { if (cursor.valid) return find(cursor.position, 0); else return 0; } PagedQueue::Page::Page(size_t s, size_t o) : size(s), offset(o), region(0), used(0) { QPID_LOG(debug, "Created Page[" << offset << "], size=" << size); } void PagedQueue::Page::deleted(qpid::framing::SequenceNumber s) { if (isLoaded()) { Message* message = find(s); assert(message);//could this ever legitimately be 0? message->setState(DELETED); } contents.remove(s); acquired.remove(s); } Message* PagedQueue::Page::release(qpid::framing::SequenceNumber s) { Message* m = find(s); if (m) { m->setState(AVAILABLE); } acquired.remove(s); return m; } bool PagedQueue::Page::add(const Message& message) { assert(region); assert (size >= used); size_t encoded = encode(message, region + used, size - used); QPID_LOG(debug, "Calling Page[" << offset << "]::add() used=" << used << ", size=" << size << ", encoded=" << encoded << ")"); if (encoded) { used += encoded; messages.push_back(message); messages.back().setState(AVAILABLE); contents.add(message.getSequence()); return true; } else { return false; } } bool PagedQueue::Page::empty() const { return contents.empty(); } bool PagedQueue::Page::isLoaded() const { return region; } Message* PagedQueue::Page::next(uint32_t version, QueueCursor& cursor) { if (messages.empty()) return 0; qpid::framing::SequenceNumber position; if (cursor.valid) { position = cursor.position + 1; if (position < messages.front().getSequence()) { position = messages.front().getSequence(); cursor.setPosition(position, version); } } else { position = messages.front().getSequence(); cursor.setPosition(position, version); } Message* m; do { m = find(position); if (m) cursor.setPosition(position, version); ++position; } while (m != 0 && !cursor.check(*m)); return m; //if it is the first in the page, increment the hint count of the page //if it is the last in the page, decrement the hint count of the page } /** * Called before adding to the free list */ void PagedQueue::Page::clear(MemoryMappedFile& file) { if (region) file.unmap(region, size); region = 0; used = 0; contents.clear(); messages.clear(); } size_t PagedQueue::Page::available() const { return contents.size() - acquired.size(); } Message* PagedQueue::Page::find(qpid::framing::SequenceNumber position) { if (messages.size()) { assert(position >= messages.front().getSequence()); size_t index = position - messages.front().getSequence(); if (index < messages.size()) return &(messages[index]); else return 0; } else { //page is empty, is this an error? QPID_LOG(warning, "Could not find message at " << position << "; empty page."); return 0; } //if it is the first in the page, increment the hint count of the page //if it is the last in the page, decrement the hint count of the page } void PagedQueue::Page::load(MemoryMappedFile& file, ProtocolRegistry& protocols) { QPID_LOG(debug, "Page[" << offset << "]::load" << " used=" << used << ", size=" << size); assert(region == 0); region = file.map(offset, size); assert(region != 0); bool haveData = used > 0; used = 4;//first 4 bytes are the count if (haveData) { qpid::framing::Buffer buffer(region, sizeof(uint32_t)); uint32_t count = buffer.getLong(); //decode messages into Page::messages for (size_t i = 0; i < count; ++i) { Message message; used += decode(protocols, message, region + used, size - used); if (!contents.contains(message.getSequence())) { message.setState(DELETED); QPID_LOG(debug, "Setting state to deleted for message loaded at " << message.getSequence()); } else if (acquired.contains(message.getSequence())) { message.setState(ACQUIRED); } else { message.setState(AVAILABLE); } messages.push_back(message); } if (messages.size()) { QPID_LOG(debug, "Page[" << offset << "]::load " << messages.size() << " messages loaded from " << messages.front().getSequence() << " to " << messages.back().getSequence()); } else { QPID_LOG(debug, "Page[" << offset << "]::load no messages loaded"); } }//else there is nothing we need to explicitly load, just needed to map region } void PagedQueue::Page::unload(MemoryMappedFile& file) { if (messages.size()) { QPID_LOG(debug, "Page[" << offset << "]::unload " << messages.size() << " messages to unload from " << messages.front().getSequence() << " to " << messages.back().getSequence()); } else { QPID_LOG(debug, "Page[" << offset << "]::unload no messages to unload"); } for (std::deque::iterator i = messages.begin(); i != messages.end(); ++i) { if (i->getState() == ACQUIRED) acquired.add(i->getSequence()); } uint32_t count = messages.size(); qpid::framing::Buffer buffer(region, sizeof(uint32_t)); buffer.putLong(count); file.flush(region, size); file.unmap(region, size); //remove messages from memory messages.clear(); region = 0; } void PagedQueue::load(Page& page) { //if needed, release another page if (loaded == maxLoaded) { //which page to select? Used::reverse_iterator i = used.rbegin(); while (i != used.rend() && !i->second.isLoaded()) { ++i; } assert(i != used.rend()); unload(i->second); } page.load(file, protocols); ++loaded; QPID_LOG(debug, "PagedQueue[" << name << "] loaded page, " << loaded << " pages now loaded"); } void PagedQueue::unload(Page& page) { page.unload(file); --loaded; QPID_LOG(debug, "PagedQueue[" << name << "] unloaded page, " << loaded << " pages now loaded"); } PagedQueue::Page& PagedQueue::newPage(qpid::framing::SequenceNumber id) { if (loaded == maxLoaded) { //need to release a page from memory to make way for a new one //choose last one? Used::reverse_iterator i = used.rbegin(); while (!i->second.isLoaded() && i != used.rend()) { ++i; } assert(i != used.rend()); unload(i->second); } if (free.empty()) { //need to extend file and add some pages to the free list addPages(4/*arbitrary number, should this be config item?*/); } std::pair result = used.insert(Used::value_type(id, free.front())); QPID_LOG(debug, "Added page for sequence starting from " << id); assert(result.second); free.pop_front(); load(result.first->second); return result.first->second; } void PagedQueue::addPages(size_t count) { for (size_t i = 0; i < count; ++i) { free.push_back(Page(pageSize, offset)); offset += pageSize; file.expand(offset); } QPID_LOG(debug, "Added " << count << " pages to free list; now have " << used.size() << " used, and " << free.size() << " free"); } PagedQueue::Used::iterator PagedQueue::findPage(const QueueCursor& cursor) { Used::iterator i = used.begin(); if (cursor.valid) { i = findPage(cursor.position, true); } else if (i != used.end() && !i->second.isLoaded()) { load(i->second); } return i; } PagedQueue::Used::iterator PagedQueue::findPage(qpid::framing::SequenceNumber n, bool loadIfRequired) { Used::iterator i = used.end(); for (Used::iterator j = used.begin(); j != used.end() && j->first <= n; ++j) { i = j; } if (loadIfRequired && i != used.end() && !i->second.isLoaded()) { load(i->second); } return i; } }} // namespace qpid::broker