summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2014-01-30 23:10:08 +0000
committerAndrew Stitcher <astitcher@apache.org>2014-01-30 23:10:08 +0000
commit8090b6360aede941d76b4e9cf7e983feb2ae2504 (patch)
tree5875148dbcaafe0d79bea14050ffac09f39c4ff4 /cpp/src
parente36486dd627acf54a07b905792a2dd90fb959dee (diff)
downloadqpid-python-8090b6360aede941d76b4e9cf7e983feb2ae2504.tar.gz
QPID-5485: By default use a subdirectory of the data dir called "pq" for
page queue files. If neither data dir nor paging dir are set then error on any attempt to create a paged queue. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1563012 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/Broker.cpp4
-rw-r--r--cpp/src/qpid/broker/Broker.h1
-rw-r--r--cpp/src/qpid/broker/PagedQueue.cpp17
-rw-r--r--cpp/src/qpid/broker/PagedQueue.h1
-rw-r--r--cpp/src/qpid/broker/posix/BrokerDefaults.cpp1
-rw-r--r--cpp/src/qpid/broker/windows/BrokerDefaults.cpp1
-rw-r--r--cpp/src/qpid/sys/MemoryMappedFile.h4
-rw-r--r--cpp/src/qpid/sys/posix/MemoryMappedFile.cpp31
-rwxr-xr-xcpp/src/tests/run_paged_queue_tests2
9 files changed, 40 insertions, 22 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 975acfdb87..7741d83665 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -217,7 +217,7 @@ Broker::Broker(const Broker::Options& conf) :
store(new NullMessageStore),
acl(0),
dataDir(conf.noDataDir ? std::string() : conf.dataDir),
- pagingDir(conf.pagingDir),
+ pagingDir(!conf.pagingDir.empty() ? conf.pagingDir : !conf.noDataDir ? dataDir.getPath() + Options::DEFAULT_PAGED_QUEUE_DIR : std::string() ),
queues(this),
exchanges(this),
links(this),
@@ -387,7 +387,7 @@ Broker::Broker(const Broker::Options& conf) :
std::string Broker::getPagingDirectoryPath()
{
- return pagingDir.isEnabled() ? pagingDir.getPath() : dataDir.getPath();
+ return pagingDir.getPath();
}
void Broker::declareStandardExchange(const std::string& name, const std::string& type)
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index 4e46b71cc9..5923d6c0e5 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -86,6 +86,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
struct Options : public qpid::Options {
static const std::string DEFAULT_DATA_DIR_LOCATION;
static const std::string DEFAULT_DATA_DIR_NAME;
+ static const std::string DEFAULT_PAGED_QUEUE_DIR;
QPID_BROKER_EXTERN Options(const std::string& name="Broker Options");
diff --git a/cpp/src/qpid/broker/PagedQueue.cpp b/cpp/src/qpid/broker/PagedQueue.cpp
index 43208d74ee..afb330489b 100644
--- a/cpp/src/qpid/broker/PagedQueue.cpp
+++ b/cpp/src/qpid/broker/PagedQueue.cpp
@@ -91,13 +91,16 @@ PagedQueue::PagedQueue(const std::string& name_, const std::string& directory, u
: name(name_), pageSize(file.getPageSize()*factor), maxLoaded(m), protocols(p), offset(0), loaded(0), version(0),
expiryPolicy(e)
{
- path = file.open(name, directory);
- QPID_LOG(debug, "PagedQueue[" << path << "]");
+ if (directory.empty()) {
+ throw qpid::Exception(QPID_MSG("Cannot create paged queue: No paged queue directory specified"));
+ }
+ file.open(name, directory);
+ QPID_LOG(debug, "PagedQueue[" << name << "]");
}
PagedQueue::~PagedQueue()
{
- file.close(path);
+ file.close();
}
size_t PagedQueue::size()
@@ -133,7 +136,7 @@ bool PagedQueue::deleted(const QueueCursor& cursor)
void PagedQueue::publish(const Message& added)
{
if (encodedSize(added) > pageSize) {
- QPID_LOG(error, "Message is larger than page size for queue " << name << ", backed by " << path);
+ QPID_LOG(error, "Message is larger than page size for queue " << name);
throw qpid::framing::PreconditionFailedException(QPID_MSG("Message is larger than page size for queue " << name));
}
Used::reverse_iterator i = used.rbegin();
@@ -143,7 +146,7 @@ void PagedQueue::publish(const Message& added)
}
//used is empty or last page is full, need to add a new page
if (!newPage(added.getSequence()).add(added)) {
- QPID_LOG(error, "Could not add message to paged queue " << name << ", backed by " << path);
+ QPID_LOG(error, "Could not add message to paged queue " << name);
throw qpid::Exception(QPID_MSG("Could not add message to paged queue " << name));
}
}
@@ -388,14 +391,14 @@ void PagedQueue::load(Page& page)
}
page.load(file, protocols, expiryPolicy);
++loaded;
- QPID_LOG(debug, "PagedQueue[" << path << "] loaded page, " << loaded << " pages now loaded");
+ QPID_LOG(debug, "PagedQueue[" << name << "] loaded page, " << loaded << " pages now loaded");
}
void PagedQueue::unload(Page& page)
{
page.unload(file);
--loaded;
- QPID_LOG(debug, "PagedQueue[" << path << "] unloaded page, " << loaded << " pages now loaded");
+ QPID_LOG(debug, "PagedQueue[" << name << "] unloaded page, " << loaded << " pages now loaded");
}
diff --git a/cpp/src/qpid/broker/PagedQueue.h b/cpp/src/qpid/broker/PagedQueue.h
index cb83fa9f34..e4c98f4119 100644
--- a/cpp/src/qpid/broker/PagedQueue.h
+++ b/cpp/src/qpid/broker/PagedQueue.h
@@ -79,7 +79,6 @@ class PagedQueue : public Messages {
qpid::sys::MemoryMappedFile file;
std::string name;
- std::string path;
const size_t pageSize;
const uint maxLoaded;
ProtocolRegistry& protocols;
diff --git a/cpp/src/qpid/broker/posix/BrokerDefaults.cpp b/cpp/src/qpid/broker/posix/BrokerDefaults.cpp
index 9e463fa32d..9e118c19f7 100644
--- a/cpp/src/qpid/broker/posix/BrokerDefaults.cpp
+++ b/cpp/src/qpid/broker/posix/BrokerDefaults.cpp
@@ -27,6 +27,7 @@ namespace broker {
const std::string Broker::Options::DEFAULT_DATA_DIR_LOCATION("/tmp");
const std::string Broker::Options::DEFAULT_DATA_DIR_NAME("/.qpidd");
+const std::string Broker::Options::DEFAULT_PAGED_QUEUE_DIR("/pq");
std::string
Broker::Options::getHome() {
diff --git a/cpp/src/qpid/broker/windows/BrokerDefaults.cpp b/cpp/src/qpid/broker/windows/BrokerDefaults.cpp
index b65440b5ad..260bd7c073 100644
--- a/cpp/src/qpid/broker/windows/BrokerDefaults.cpp
+++ b/cpp/src/qpid/broker/windows/BrokerDefaults.cpp
@@ -27,6 +27,7 @@ namespace broker {
const std::string Broker::Options::DEFAULT_DATA_DIR_LOCATION("\\TEMP");
const std::string Broker::Options::DEFAULT_DATA_DIR_NAME("\\QPIDD.DATA");
+const std::string Broker::Options::DEFAULT_PAGED_QUEUE_DIR("\\PQ");
std::string
Broker::Options::getHome() {
diff --git a/cpp/src/qpid/sys/MemoryMappedFile.h b/cpp/src/qpid/sys/MemoryMappedFile.h
index 43e4c852af..ecf38e98e6 100644
--- a/cpp/src/qpid/sys/MemoryMappedFile.h
+++ b/cpp/src/qpid/sys/MemoryMappedFile.h
@@ -38,11 +38,11 @@ class MemoryMappedFile {
/**
* Opens a file that can be mapped by region into memory
*/
- QPID_COMMON_EXTERN std::string open(const std::string& name, const std::string& directory);
+ QPID_COMMON_EXTERN void open(const std::string& name, const std::string& directory);
/**
* Closes and removes the file that can be mapped by region into memory
*/
- QPID_COMMON_EXTERN void close(const std::string& path);
+ QPID_COMMON_EXTERN void close();
/**
* Returns the page size
*/
diff --git a/cpp/src/qpid/sys/posix/MemoryMappedFile.cpp b/cpp/src/qpid/sys/posix/MemoryMappedFile.cpp
index f647ab3943..b4292aa4bc 100644
--- a/cpp/src/qpid/sys/posix/MemoryMappedFile.cpp
+++ b/cpp/src/qpid/sys/posix/MemoryMappedFile.cpp
@@ -29,13 +29,14 @@
namespace qpid {
namespace sys {
namespace {
+const std::string PAGEFILE_PREFIX("pf_");
const std::string PATH_SEPARATOR("/");
const std::string ESCAPE("%");
const std::string VALID("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-.");
std::string getFileName(const std::string& name, const std::string& dir)
{
std::stringstream filename;
- if (dir.size()) filename << dir << PATH_SEPARATOR;
+ if (dir.size()) filename << dir << PATH_SEPARATOR << PAGEFILE_PREFIX;
size_t start = 0;
while (true) {
size_t i = name.find_first_not_of(VALID, start);
@@ -55,31 +56,39 @@ std::string getFileName(const std::string& name, const std::string& dir)
class MemoryMappedFilePrivate
{
friend class MemoryMappedFile;
+ std::string path;
int fd;
MemoryMappedFilePrivate() : fd(0) {}
};
MemoryMappedFile::MemoryMappedFile() : state(new MemoryMappedFilePrivate) {}
MemoryMappedFile::~MemoryMappedFile() { delete state; }
-std::string MemoryMappedFile::open(const std::string& name, const std::string& directory)
+void MemoryMappedFile::open(const std::string& name, const std::string& directory)
{
- std::string path = getFileName(name, directory);
+ // Ensure directory exists
+ if ( ::mkdir(directory.c_str(), S_IRWXU | S_IRGRP | S_IXGRP )!=0 && errno!=EEXIST ) {
+ throw qpid::Exception(QPID_MSG("Failed to create memory mapped file directory " << directory << ": " << qpid::sys::strError(errno)));
+ }
+
+ state->path = getFileName(name, directory);
- int flags = O_CREAT | O_EXCL | O_RDWR;
- int fd = ::open(path.c_str(), flags, S_IRUSR | S_IWUSR);
- if (fd == -1) throw qpid::Exception(QPID_MSG("Failed to open memory mapped file " << path << ": " << qpid::sys::strError(errno) << " [flags=" << flags << "]"));
+ int flags = O_CREAT | O_TRUNC | O_RDWR;
+ int fd = ::open(state->path.c_str(), flags, S_IRUSR | S_IWUSR);
+ if (fd == -1) throw qpid::Exception(QPID_MSG("Failed to open memory mapped file " << state->path << ": " << qpid::sys::strError(errno) << " [flags=" << flags << "]"));
state->fd = fd;
- return path;
}
-void MemoryMappedFile::close(const std::string& path)
+
+void MemoryMappedFile::close()
{
::close(state->fd);
- ::unlink(path.c_str());
+ ::unlink(state->path.c_str());
}
+
size_t MemoryMappedFile::getPageSize()
{
return ::sysconf(_SC_PAGE_SIZE);
}
+
char* MemoryMappedFile::map(size_t offset, size_t size)
{
int protection = PROT_READ | PROT_WRITE;
@@ -90,20 +99,24 @@ char* MemoryMappedFile::map(size_t offset, size_t size)
return region;
}
+
void MemoryMappedFile::unmap(char* region, size_t size)
{
::munmap(region, size);
}
+
void MemoryMappedFile::flush(char* region, size_t size)
{
::msync(region, size, MS_ASYNC);
}
+
void MemoryMappedFile::expand(size_t offset)
{
if ((::lseek(state->fd, offset - 1, SEEK_SET) == -1) || (::write(state->fd, "", 1) == -1)) {
throw qpid::Exception(QPID_MSG("Failed to expand paged queue file: " << qpid::sys::strError(errno)));
}
}
+
bool MemoryMappedFile::isSupported()
{
return true;
diff --git a/cpp/src/tests/run_paged_queue_tests b/cpp/src/tests/run_paged_queue_tests
index 8f72ddd5b7..cd1dc72641 100755
--- a/cpp/src/tests/run_paged_queue_tests
+++ b/cpp/src/tests/run_paged_queue_tests
@@ -26,7 +26,7 @@ trap stop_broker INT TERM QUIT
export PATH=$PWD:$srcdir:$PYTHON_COMMANDS:$PATH
start_broker() {
- QPID_PORT=$($QPIDD_EXEC --daemon --port 0 --interface 127.0.0.1 --no-data-dir $MODULES --auth no) || fail "Could not start broker"
+ QPID_PORT=$($QPIDD_EXEC --daemon --port 0 --interface 127.0.0.1 --no-data-dir --paging-dir=$PWD/pqtest_data $MODULES --auth no) || fail "Could not start broker"
}
stop_broker() {