diff options
| author | Andrew Stitcher <astitcher@apache.org> | 2014-01-30 23:10:08 +0000 |
|---|---|---|
| committer | Andrew Stitcher <astitcher@apache.org> | 2014-01-30 23:10:08 +0000 |
| commit | 8090b6360aede941d76b4e9cf7e983feb2ae2504 (patch) | |
| tree | 5875148dbcaafe0d79bea14050ffac09f39c4ff4 /cpp/src | |
| parent | e36486dd627acf54a07b905792a2dd90fb959dee (diff) | |
| download | qpid-python-8090b6360aede941d76b4e9cf7e983feb2ae2504.tar.gz | |
QPID-5485: By default use a subdirectory of the data dir called "pq" for
page queue files. If neither data dir nor paging dir are set then error
on any attempt to create a paged queue.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1563012 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
| -rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Broker.h | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/PagedQueue.cpp | 17 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/PagedQueue.h | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/posix/BrokerDefaults.cpp | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/windows/BrokerDefaults.cpp | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/MemoryMappedFile.h | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/posix/MemoryMappedFile.cpp | 31 | ||||
| -rwxr-xr-x | cpp/src/tests/run_paged_queue_tests | 2 |
9 files changed, 40 insertions, 22 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 975acfdb87..7741d83665 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -217,7 +217,7 @@ Broker::Broker(const Broker::Options& conf) : store(new NullMessageStore), acl(0), dataDir(conf.noDataDir ? std::string() : conf.dataDir), - pagingDir(conf.pagingDir), + pagingDir(!conf.pagingDir.empty() ? conf.pagingDir : !conf.noDataDir ? dataDir.getPath() + Options::DEFAULT_PAGED_QUEUE_DIR : std::string() ), queues(this), exchanges(this), links(this), @@ -387,7 +387,7 @@ Broker::Broker(const Broker::Options& conf) : std::string Broker::getPagingDirectoryPath() { - return pagingDir.isEnabled() ? pagingDir.getPath() : dataDir.getPath(); + return pagingDir.getPath(); } void Broker::declareStandardExchange(const std::string& name, const std::string& type) diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 4e46b71cc9..5923d6c0e5 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -86,6 +86,7 @@ class Broker : public sys::Runnable, public Plugin::Target, struct Options : public qpid::Options { static const std::string DEFAULT_DATA_DIR_LOCATION; static const std::string DEFAULT_DATA_DIR_NAME; + static const std::string DEFAULT_PAGED_QUEUE_DIR; QPID_BROKER_EXTERN Options(const std::string& name="Broker Options"); diff --git a/cpp/src/qpid/broker/PagedQueue.cpp b/cpp/src/qpid/broker/PagedQueue.cpp index 43208d74ee..afb330489b 100644 --- a/cpp/src/qpid/broker/PagedQueue.cpp +++ b/cpp/src/qpid/broker/PagedQueue.cpp @@ -91,13 +91,16 @@ PagedQueue::PagedQueue(const std::string& name_, const std::string& directory, u : name(name_), pageSize(file.getPageSize()*factor), maxLoaded(m), protocols(p), offset(0), loaded(0), version(0), expiryPolicy(e) { - path = file.open(name, directory); - QPID_LOG(debug, "PagedQueue[" << path << "]"); + if (directory.empty()) { + throw qpid::Exception(QPID_MSG("Cannot create paged queue: No paged queue directory specified")); + } + file.open(name, directory); + QPID_LOG(debug, "PagedQueue[" << name << "]"); } PagedQueue::~PagedQueue() { - file.close(path); + file.close(); } size_t PagedQueue::size() @@ -133,7 +136,7 @@ bool PagedQueue::deleted(const QueueCursor& cursor) void PagedQueue::publish(const Message& added) { if (encodedSize(added) > pageSize) { - QPID_LOG(error, "Message is larger than page size for queue " << name << ", backed by " << path); + QPID_LOG(error, "Message is larger than page size for queue " << name); throw qpid::framing::PreconditionFailedException(QPID_MSG("Message is larger than page size for queue " << name)); } Used::reverse_iterator i = used.rbegin(); @@ -143,7 +146,7 @@ void PagedQueue::publish(const Message& added) } //used is empty or last page is full, need to add a new page if (!newPage(added.getSequence()).add(added)) { - QPID_LOG(error, "Could not add message to paged queue " << name << ", backed by " << path); + QPID_LOG(error, "Could not add message to paged queue " << name); throw qpid::Exception(QPID_MSG("Could not add message to paged queue " << name)); } } @@ -388,14 +391,14 @@ void PagedQueue::load(Page& page) } page.load(file, protocols, expiryPolicy); ++loaded; - QPID_LOG(debug, "PagedQueue[" << path << "] loaded page, " << loaded << " pages now loaded"); + QPID_LOG(debug, "PagedQueue[" << name << "] loaded page, " << loaded << " pages now loaded"); } void PagedQueue::unload(Page& page) { page.unload(file); --loaded; - QPID_LOG(debug, "PagedQueue[" << path << "] unloaded page, " << loaded << " pages now loaded"); + QPID_LOG(debug, "PagedQueue[" << name << "] unloaded page, " << loaded << " pages now loaded"); } diff --git a/cpp/src/qpid/broker/PagedQueue.h b/cpp/src/qpid/broker/PagedQueue.h index cb83fa9f34..e4c98f4119 100644 --- a/cpp/src/qpid/broker/PagedQueue.h +++ b/cpp/src/qpid/broker/PagedQueue.h @@ -79,7 +79,6 @@ class PagedQueue : public Messages { qpid::sys::MemoryMappedFile file; std::string name; - std::string path; const size_t pageSize; const uint maxLoaded; ProtocolRegistry& protocols; diff --git a/cpp/src/qpid/broker/posix/BrokerDefaults.cpp b/cpp/src/qpid/broker/posix/BrokerDefaults.cpp index 9e463fa32d..9e118c19f7 100644 --- a/cpp/src/qpid/broker/posix/BrokerDefaults.cpp +++ b/cpp/src/qpid/broker/posix/BrokerDefaults.cpp @@ -27,6 +27,7 @@ namespace broker { const std::string Broker::Options::DEFAULT_DATA_DIR_LOCATION("/tmp"); const std::string Broker::Options::DEFAULT_DATA_DIR_NAME("/.qpidd"); +const std::string Broker::Options::DEFAULT_PAGED_QUEUE_DIR("/pq"); std::string Broker::Options::getHome() { diff --git a/cpp/src/qpid/broker/windows/BrokerDefaults.cpp b/cpp/src/qpid/broker/windows/BrokerDefaults.cpp index b65440b5ad..260bd7c073 100644 --- a/cpp/src/qpid/broker/windows/BrokerDefaults.cpp +++ b/cpp/src/qpid/broker/windows/BrokerDefaults.cpp @@ -27,6 +27,7 @@ namespace broker { const std::string Broker::Options::DEFAULT_DATA_DIR_LOCATION("\\TEMP"); const std::string Broker::Options::DEFAULT_DATA_DIR_NAME("\\QPIDD.DATA"); +const std::string Broker::Options::DEFAULT_PAGED_QUEUE_DIR("\\PQ"); std::string Broker::Options::getHome() { diff --git a/cpp/src/qpid/sys/MemoryMappedFile.h b/cpp/src/qpid/sys/MemoryMappedFile.h index 43e4c852af..ecf38e98e6 100644 --- a/cpp/src/qpid/sys/MemoryMappedFile.h +++ b/cpp/src/qpid/sys/MemoryMappedFile.h @@ -38,11 +38,11 @@ class MemoryMappedFile { /** * Opens a file that can be mapped by region into memory */ - QPID_COMMON_EXTERN std::string open(const std::string& name, const std::string& directory); + QPID_COMMON_EXTERN void open(const std::string& name, const std::string& directory); /** * Closes and removes the file that can be mapped by region into memory */ - QPID_COMMON_EXTERN void close(const std::string& path); + QPID_COMMON_EXTERN void close(); /** * Returns the page size */ diff --git a/cpp/src/qpid/sys/posix/MemoryMappedFile.cpp b/cpp/src/qpid/sys/posix/MemoryMappedFile.cpp index f647ab3943..b4292aa4bc 100644 --- a/cpp/src/qpid/sys/posix/MemoryMappedFile.cpp +++ b/cpp/src/qpid/sys/posix/MemoryMappedFile.cpp @@ -29,13 +29,14 @@ namespace qpid { namespace sys { namespace { +const std::string PAGEFILE_PREFIX("pf_"); const std::string PATH_SEPARATOR("/"); const std::string ESCAPE("%"); const std::string VALID("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-."); std::string getFileName(const std::string& name, const std::string& dir) { std::stringstream filename; - if (dir.size()) filename << dir << PATH_SEPARATOR; + if (dir.size()) filename << dir << PATH_SEPARATOR << PAGEFILE_PREFIX; size_t start = 0; while (true) { size_t i = name.find_first_not_of(VALID, start); @@ -55,31 +56,39 @@ std::string getFileName(const std::string& name, const std::string& dir) class MemoryMappedFilePrivate { friend class MemoryMappedFile; + std::string path; int fd; MemoryMappedFilePrivate() : fd(0) {} }; MemoryMappedFile::MemoryMappedFile() : state(new MemoryMappedFilePrivate) {} MemoryMappedFile::~MemoryMappedFile() { delete state; } -std::string MemoryMappedFile::open(const std::string& name, const std::string& directory) +void MemoryMappedFile::open(const std::string& name, const std::string& directory) { - std::string path = getFileName(name, directory); + // Ensure directory exists + if ( ::mkdir(directory.c_str(), S_IRWXU | S_IRGRP | S_IXGRP )!=0 && errno!=EEXIST ) { + throw qpid::Exception(QPID_MSG("Failed to create memory mapped file directory " << directory << ": " << qpid::sys::strError(errno))); + } + + state->path = getFileName(name, directory); - int flags = O_CREAT | O_EXCL | O_RDWR; - int fd = ::open(path.c_str(), flags, S_IRUSR | S_IWUSR); - if (fd == -1) throw qpid::Exception(QPID_MSG("Failed to open memory mapped file " << path << ": " << qpid::sys::strError(errno) << " [flags=" << flags << "]")); + int flags = O_CREAT | O_TRUNC | O_RDWR; + int fd = ::open(state->path.c_str(), flags, S_IRUSR | S_IWUSR); + if (fd == -1) throw qpid::Exception(QPID_MSG("Failed to open memory mapped file " << state->path << ": " << qpid::sys::strError(errno) << " [flags=" << flags << "]")); state->fd = fd; - return path; } -void MemoryMappedFile::close(const std::string& path) + +void MemoryMappedFile::close() { ::close(state->fd); - ::unlink(path.c_str()); + ::unlink(state->path.c_str()); } + size_t MemoryMappedFile::getPageSize() { return ::sysconf(_SC_PAGE_SIZE); } + char* MemoryMappedFile::map(size_t offset, size_t size) { int protection = PROT_READ | PROT_WRITE; @@ -90,20 +99,24 @@ char* MemoryMappedFile::map(size_t offset, size_t size) return region; } + void MemoryMappedFile::unmap(char* region, size_t size) { ::munmap(region, size); } + void MemoryMappedFile::flush(char* region, size_t size) { ::msync(region, size, MS_ASYNC); } + void MemoryMappedFile::expand(size_t offset) { if ((::lseek(state->fd, offset - 1, SEEK_SET) == -1) || (::write(state->fd, "", 1) == -1)) { throw qpid::Exception(QPID_MSG("Failed to expand paged queue file: " << qpid::sys::strError(errno))); } } + bool MemoryMappedFile::isSupported() { return true; diff --git a/cpp/src/tests/run_paged_queue_tests b/cpp/src/tests/run_paged_queue_tests index 8f72ddd5b7..cd1dc72641 100755 --- a/cpp/src/tests/run_paged_queue_tests +++ b/cpp/src/tests/run_paged_queue_tests @@ -26,7 +26,7 @@ trap stop_broker INT TERM QUIT export PATH=$PWD:$srcdir:$PYTHON_COMMANDS:$PATH start_broker() { - QPID_PORT=$($QPIDD_EXEC --daemon --port 0 --interface 127.0.0.1 --no-data-dir $MODULES --auth no) || fail "Could not start broker" + QPID_PORT=$($QPIDD_EXEC --daemon --port 0 --interface 127.0.0.1 --no-data-dir --paging-dir=$PWD/pqtest_data $MODULES --auth no) || fail "Could not start broker" } stop_broker() { |
