summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2015-01-02 17:43:14 +0000
committerKim van der Riet <kpvdr@apache.org>2015-01-02 17:43:14 +0000
commit81fd3962fe649940eb64262a260997a2da064dc6 (patch)
treea6a929788da25bb2c4551f9ab921f93a7d7ac63c
parentd3e81de7862f78b15d47df841a1b22ae8cafd37c (diff)
downloadqpid-python-81fd3962fe649940eb64262a260997a2da064dc6.tar.gz
QPID-5671 [linearstore] Add ability to use disk partitions and select per-queue EFPs: WIP, but mostly complete. Needs additional testing. It is now possible to add queues which use a partition other than the broker default by using qpid-config --durable together with --efp--partition-num and/or --efp-pool-file-size
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1649081 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/linearstore/ISSUES16
-rw-r--r--qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp12
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp99
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h13
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp7
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp69
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h4
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp34
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp14
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/jerrno.h4
-rw-r--r--qpid/cpp/src/qpid/linearstore/management-schema.xml49
-rwxr-xr-xqpid/tools/src/py/qpid-config23
12 files changed, 198 insertions, 146 deletions
diff --git a/qpid/cpp/src/qpid/linearstore/ISSUES b/qpid/cpp/src/qpid/linearstore/ISSUES
index 2d5a389615..26e62482a5 100644
--- a/qpid/cpp/src/qpid/linearstore/ISSUES
+++ b/qpid/cpp/src/qpid/linearstore/ISSUES
@@ -25,7 +25,7 @@ Current/pending:
------ ------- ----------------------
5359 - Linearstore: Implement new management schema and wire into store
5360 - Linearstore: Evaluate and rework logging to produce a consistent log output
- 5361 1145359 Linearstore: No tests for linearstore functionality currently exist
+* 5361 1145359 Linearstore: No tests for linearstore functionality currently exist
svn r.1564893 2014-02-05: Added tx-test-soak.sh
svn r.1564935 2014-02-05: Added license text to tx-test-soak.sh
svn r.1625283 2014-09-16: Basic python tests from legacystore ported over to linearstore
@@ -41,11 +41,12 @@ Current/pending:
- 1067480 [LinearStore] Provide a way to limit max count/size of empty files in EFP
- 1067429 [LinearStore] last file from deleted queue is not moved to EFP <queue delete issue>
- 1067482 [LinearStore] Provide a way to prealocate empty pages in EFP
-* 5671 [linearstore] Add ability to use disk partitions and select per-queue EFPs
+* 5671 1160367 [linearstore] Add ability to use disk partitions and select per-queue EFPs
svn r.1636598 2014-11-04: WIP: New EFP and journal dir structure using symlinks
svn r.1637985 2014-11-10: WIP: Auto-upgrade from old dir structure to new
- 6230 1165200 [linearstore] qpid-qls-analyze fails when analyzing empty journal
- svn r.1643053 2014-11-18: Proposed fix
+* 1148807 [linearstore] Restarting broker with empty journal raises confusing warning
+* 1066256 [linearstore] Changing EFP size after using store breaks durable queue creation
+
@@ -147,7 +148,9 @@ NO-JIRA - Added missing Apache copyright/license text
svn r.1631360 2014-10-13 Proposed solution
6157 1150397 linearstore: segfault when 2 journals request new journal file from empty EFP
svn r.1632504 2014-10-17 Proposed solution by pmoravec
- 6248 1167911 [linearstore] Symlink creation fails if store dir path is not absolute
+ 6230 1165200 [linearstore] qpid-qls-analyze fails when analyzing empty journal
+ svn r.1643053 2014-11-18: Proposed fix
+ 6248 1167911 [linearstore] Symlink creation fails if store dir path is not absolute
svn r.1641689 2014-11-25 Proposed solution
@@ -194,7 +197,8 @@ no. svn r Q-JIRA RHBZ Date Alt Committer
34. 1632504 6157 1150397 2014-10-17 (pmoravec)
35. 1636598 5671 2014-11-04
36. 1637985 5671 2014-11-10
-37. 1641689 6248 1167911 2014-11-25
+37. 1643053 6230 1165200 2014-11-18
+38. 1641689 6248 1167911 2014-11-25
See above sections for details on these checkins.
diff --git a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
index 77d5703636..68810936e1 100644
--- a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
+++ b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
@@ -148,7 +148,7 @@ void MessageStoreImpl::initManagement ()
mgmtObject = qmf::org::apache::qpid::linearstore::Store::shared_ptr (
new qmf::org::apache::qpid::linearstore::Store(agent, this, broker));
- mgmtObject->set_location(storeDir);
+ mgmtObject->set_storeDir(storeDir);
mgmtObject->set_tplIsInitialized(false);
mgmtObject->set_tplDirectory(getTplBaseDir());
mgmtObject->set_tplWritePageSize(tplWCachePgSizeSblks * QLS_SBLK_SIZE_BYTES);
@@ -406,7 +406,7 @@ void MessageStoreImpl::create(qpid::broker::PersistableQueue& queue_,
if (queue_.getName().size() == 0)
{
- QLS_LOG(error, "Cannot create store for empty (null) queue name - ignoring and attempting to continue.");
+ QLS_LOG(error, "Cannot create store for empty (null) queue name - queue create ignored.");
return;
}
@@ -449,15 +449,15 @@ qpid::linearstore::journal::EmptyFilePool*
MessageStoreImpl::getEmptyFilePool(const qpid::framing::FieldTable& args_) {
qpid::framing::FieldTable::ValuePtr value;
qpid::linearstore::journal::efpPartitionNumber_t localEfpPartition = defaultEfpPartitionNumber;
- value = args_.get("qpid.efp_partition");
+ value = args_.get("qpid.efp_partition_num");
if (value.get() != 0 && !value->empty() && value->convertsTo<int>()) {
- localEfpPartition = chkEfpPartition((uint32_t)value->get<int>(), "qpid.efp_partition");
+ localEfpPartition = chkEfpPartition((uint32_t)value->get<int>(), "qpid.efp_partition_num");
}
qpid::linearstore::journal::efpDataSize_kib_t localEfpFileSizeKib = defaultEfpFileSize_kib;
- value = args_.get("qpid.efp_file_size");
+ value = args_.get("qpid.efp_pool_file_size");
if (value.get() != 0 && !value->empty() && value->convertsTo<int>()) {
- localEfpFileSizeKib = chkEfpFileSizeKiB((uint32_t)value->get<int>(),"qpid.efp_file_size" );
+ localEfpFileSizeKib = chkEfpFileSizeKiB((uint32_t)value->get<int>(), "qpid.efp_pool_file_size");
}
return getEmptyFilePool(localEfpPartition, localEfpFileSizeKib);
}
diff --git a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp
index 18f4d3afc3..08db3f75bd 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp
@@ -59,10 +59,16 @@ EmptyFilePool::EmptyFilePool(const std::string& efpDirectory,
EmptyFilePool::~EmptyFilePool() {}
void EmptyFilePool::initialize() {
-//std::cout << "*** Initializing EFP " << efpDataSize_kib_ << "k in partition " << partitionPtr_->getPartitionNumber() << "; efpDirectory=" << efpDirectory_ << std::endl; // DEBUG
- std::vector<std::string> dirList;
+ if (::mkdir(efpDirectory_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH)) { // Create EFP dir if it does not yet exist
+ if (errno != EEXIST) {
+ std::ostringstream oss;
+ oss << "directory=" << efpDirectory_ << " " << FORMAT_SYSERR(errno);
+ throw jexception(jerrno::JERR_EFP_MKDIR, oss.str(), "EmptyFilePool", "initialize");
+ }
+ }
// Process empty files in main dir
+ std::vector<std::string> dirList;
jdir::read_dir(efpDirectory_, dirList, false, true, false, false);
for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) {
size_t dotPos = i->rfind(".");
@@ -122,14 +128,14 @@ const efpIdentity_t EmptyFilePool::getIdentity() const {
std::string EmptyFilePool::takeEmptyFile(const std::string& destDirectory) {
std::string emptyFileName = popEmptyFile();
- std::string newFileName = efpDirectory_ + "/" + s_inuseFileDirectory_ + emptyFileName.substr(emptyFileName.rfind('/'));
+ std::string newFileName = efpDirectory_ + "/" + s_inuseFileDirectory_ + emptyFileName.substr(emptyFileName.rfind('/')); // NOTE: substr() includes leading '/'
std::string symlinkName = destDirectory + emptyFileName.substr(emptyFileName.rfind('/')); // NOTE: substr() includes leading '/'
- if (moveFile(emptyFileName, newFileName)) {
+ if (!moveFile(emptyFileName, newFileName)) {
// Try again with new UUID for file name
newFileName = efpDirectory_ + "/" + s_inuseFileDirectory_ + "/" + getEfpFileName();
- if (moveFile(emptyFileName, newFileName)) {
+ if (!moveFile(emptyFileName, newFileName)) {
//std::cerr << "*** DEBUG: pushEmptyFile " << emptyFileName << "from EmptyFilePool::takeEmptyFile()" << std::endl; // DEBUG
- pushEmptyFile(emptyFileName);
+ pushEmptyFile(emptyFileName); // Return empty file to pool
std::ostringstream oss;
oss << "file=\"" << emptyFileName << "\" dest=\"" << newFileName << "\"" << FORMAT_SYSERR(errno);
throw jexception(jerrno::JERR_JDIR_FMOVE, oss.str(), "EmptyFilePool", "takeEmptyFile");
@@ -138,7 +144,7 @@ std::string EmptyFilePool::takeEmptyFile(const std::string& destDirectory) {
if (createSymLink(newFileName, symlinkName)) {
std::ostringstream oss;
oss << "file=\"" << emptyFileName << "\" dest=\"" << newFileName << "\" symlink=\"" << symlinkName << "\"" << FORMAT_SYSERR(errno);
- throw jexception(jerrno::JERR_EFP_SYMLINK, oss.str(), "EmptyFilePool", "takeEmptyFile");
+ throw jexception(jerrno::JERR__SYMLINK, oss.str(), "EmptyFilePool", "takeEmptyFile");
}
return symlinkName;
}
@@ -189,12 +195,27 @@ efpDataSize_kib_t EmptyFilePool::dataSizeFromDirName_kib(const std::string& dirN
}
// --- protected functions ---
+void EmptyFilePool::checkIosState(std::ofstream& ofs,
+ const uint32_t jerrno,
+ const std::string& fqFileName,
+ const std::string& operation,
+ const std::string& errorMessage,
+ const std::string& className,
+ const std::string& fnName) {
+ if (!ofs.good()) {
+ if (ofs.is_open()) {
+ ofs.close();
+ }
+ std::ostringstream oss;
+ oss << "IO failure: eofbit=" << (ofs.eof()?"T":"F") << " failbit=" << (ofs.fail()?"T":"F") << " badbit="
+ << (ofs.bad()?"T":"F") << " file=" << fqFileName << " operation=" << operation << ": " << errorMessage;
+ throw jexception(jerrno, oss.str(), className, fnName);
+ }
+}
std::string EmptyFilePool::createEmptyFile() {
std::string efpfn = getEfpFileName();
- if (!overwriteFileContents(efpfn)) {
- // TODO: handle failure to prepare new file here
- }
+ overwriteFileContents(efpfn);
return efpfn;
}
@@ -226,24 +247,20 @@ void EmptyFilePool::initializeSubDirectory(const std::string& fqDirName) {
}
}
-bool EmptyFilePool::overwriteFileContents(const std::string& fqFileName) {
+void EmptyFilePool::overwriteFileContents(const std::string& fqFileName) {
::file_hdr_t fh;
::file_hdr_create(&fh, QLS_FILE_MAGIC, QLS_JRNL_VERSION, QLS_JRNL_FHDR_RES_SIZE_SBLKS, partitionPtr_->getPartitionNumber(), efpDataSize_kib_);
std::ofstream ofs(fqFileName.c_str(), std::ofstream::out | std::ofstream::binary);
- if (ofs.good()) {
- ofs.write((char*)&fh, sizeof(::file_hdr_t));
- uint64_t rem = ((efpDataSize_kib_ + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB)) * 1024) - sizeof(::file_hdr_t);
- while (rem--)
- ofs.put('\0');
- ofs.close();
- return true;
-//std::cout << "*** WARNING: EFP " << efpDirectory_ << " is empty - created new journal file " << fqFileName.substr(fqFileName.rfind('/') + 1) << " on the fly" << std::endl; // DEBUG
- } else {
- std::ostringstream oss;
- oss << "std::ofstream ofs: file=\"" << fqFileName.c_str() << "\"" << " failed to be open";
- throw jexception(jerrno::JERR_EFP_FOPEN, oss.str(), "EmptyFilePool", "overwriteFileContents");
+ checkIosState(ofs, jerrno::JERR_EFP_FOPEN, fqFileName, "constructor", "Failed to create file", "EmptyFilePool", "overwriteFileContents");
+ ofs.write((char*)&fh, sizeof(::file_hdr_t));
+ checkIosState(ofs, jerrno::JERR_EFP_FWRITE, fqFileName, "write()", "Failed to write header", "EmptyFilePool", "overwriteFileContents");
+ uint64_t rem = ((efpDataSize_kib_ + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB)) * 1024) - sizeof(::file_hdr_t);
+ while (rem--) {
+ ofs.put('\0');
+ checkIosState(ofs, jerrno::JERR_EFP_FWRITE, fqFileName, "put()", "Failed to put \0", "EmptyFilePool", "overwriteFileContents");
}
- return false;
+ ofs.close();
+//std::cout << "*** WARNING: EFP " << efpDirectory_ << " is empty - created new journal file " << fqFileName.substr(fqFileName.rfind('/') + 1) << " on the fly" << std::endl; // DEBUG
}
std::string EmptyFilePool::popEmptyFile() {
@@ -271,7 +288,7 @@ void EmptyFilePool::pushEmptyFile(const std::string fqFileName) {
void EmptyFilePool::returnEmptyFile(const std::string& emptyFileName) {
std::string returnedFileName = efpDirectory_ + "/" + s_returnedFileDirectory_ + emptyFileName.substr(emptyFileName.rfind('/')); // NOTE: substr() includes leading '/'
- if (moveFile(emptyFileName, returnedFileName)) {
+ if (!moveFile(emptyFileName, returnedFileName)) {
::unlink(emptyFileName.c_str());
//std::cerr << "*** WARNING: Unable to move file " << emptyFileName << " to " << returnedFileName << "; deleted." << std::endl; // DEBUG
}
@@ -283,7 +300,7 @@ void EmptyFilePool::returnEmptyFile(const std::string& emptyFileName) {
overwriteFileContents(returnedFileName);
}
std::string sanitizedEmptyFileName = efpDirectory_ + returnedFileName.substr(returnedFileName.rfind('/')); // NOTE: substr() includes leading '/'
- if (moveFile(returnedFileName, sanitizedEmptyFileName)) {
+ if (!moveFile(returnedFileName, sanitizedEmptyFileName)) {
::unlink(returnedFileName.c_str());
//std::cerr << "*** WARNING: Unable to move file " << returnedFileName << " to " << sanitizedEmptyFileName << "; deleted." << std::endl; // DEBUG
} else {
@@ -395,18 +412,6 @@ bool EmptyFilePool::validateEmptyFile(const std::string& emptyFileName) const {
return true;
}
-// static
-int EmptyFilePool::moveFile(const std::string& from,
- const std::string& to) {
- if (::rename(from.c_str(), to.c_str())) {
- if (errno == EEXIST) return errno; // File name exists
- std::ostringstream oss;
- oss << "file=\"" << from << "\" dest=\"" << to << "\"" << FORMAT_SYSERR(errno);
- throw jexception(jerrno::JERR_JDIR_FMOVE, oss.str(), "EmptyFilePool", "returnEmptyFile");
- }
- return 0;
-}
-
//static
int EmptyFilePool::createSymLink(const std::string& fqFileName,
const std::string& fqLinkName) {
@@ -414,7 +419,7 @@ int EmptyFilePool::createSymLink(const std::string& fqFileName,
if (errno == EEXIST) return errno; // File name exists
std::ostringstream oss;
oss << "file=\"" << fqFileName << "\" symlink=\"" << fqLinkName << "\"" << FORMAT_SYSERR(errno);
- throw jexception(jerrno::JERR_EFP_SYMLINK, oss.str(), "EmptyFilePool", "createSymLink");
+ throw jexception(jerrno::JERR__SYMLINK, oss.str(), "EmptyFilePool", "createSymLink");
}
return 0;
}
@@ -426,7 +431,7 @@ std::string EmptyFilePool::deleteSymlink(const std::string& fqLinkName) {
if (len < 0) {
std::ostringstream oss;
oss << "symlink=\"" << fqLinkName << "\"" << FORMAT_SYSERR(errno);
- throw jexception(jerrno::JERR_EFP_SYMLINK, oss.str(), "EmptyFilePool", "deleteSymlink");
+ throw jexception(jerrno::JERR__SYMLINK, oss.str(), "EmptyFilePool", "deleteSymlink");
}
::unlink(fqLinkName.c_str());
return std::string(buff, len);
@@ -455,4 +460,18 @@ bool EmptyFilePool::isSymlink(const std::string& fqName) {
}
+// static
+bool EmptyFilePool::moveFile(const std::string& from,
+ const std::string& to) {
+ if (::rename(from.c_str(), to.c_str())) {
+ if (errno == EEXIST) {
+ return false; // File name exists
+ }
+ std::ostringstream oss;
+ oss << "file=\"" << from << "\" dest=\"" << to << "\"" << FORMAT_SYSERR(errno);
+ throw jexception(jerrno::JERR_JDIR_FMOVE, oss.str(), "EmptyFilePool", "returnEmptyFile");
+ }
+ return true;
+}
+
}}}
diff --git a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h
index 1a1264fa26..dc567ff917 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h
@@ -87,23 +87,30 @@ public:
const efpPartitionNumber_t partitionNumber);
protected:
+ void checkIosState(std::ofstream& ofs,
+ const uint32_t jerrno,
+ const std::string& fqFileName,
+ const std::string& operation,
+ const std::string& errorMessage,
+ const std::string& className,
+ const std::string& fnName);
std::string createEmptyFile();
std::string getEfpFileName();
void initializeSubDirectory(const std::string& fqDirName);
- bool overwriteFileContents(const std::string& fqFileName);
+ void overwriteFileContents(const std::string& fqFileName);
std::string popEmptyFile();
void pushEmptyFile(const std::string fqFileName);
void returnEmptyFile(const std::string& emptyFileName);
void resetEmptyFileHeader(const std::string& fqFileName);
bool validateEmptyFile(const std::string& emptyFileName) const;
- static int moveFile(const std::string& fromFqPath,
- const std::string& toFqPath);
static int createSymLink(const std::string& fqFileName,
const std::string& fqLinkName);
static std::string deleteSymlink(const std::string& fqLinkName);
static bool isFile(const std::string& fqName);
static bool isSymlink(const std::string& fqName);
+ static bool moveFile(const std::string& fromFqPath,
+ const std::string& toFqPath);
};
}}}
diff --git a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp
index 28e1b0b56e..33707578bf 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp
@@ -165,9 +165,10 @@ EmptyFilePool* EmptyFilePoolManager::getEmptyFilePool(const efpIdentity_t efpIde
EmptyFilePool* EmptyFilePoolManager::getEmptyFilePool(const efpPartitionNumber_t partitionNumber,
const efpDataSize_kib_t efpDataSize_kib) {
EmptyFilePoolPartition* efppp = getEfpPartition(partitionNumber > 0 ? partitionNumber : defaultPartitionNumber_);
- if (efppp != 0)
- return efppp->getEmptyFilePool(efpDataSize_kib > 0 ? efpDataSize_kib : defaultEfpDataSize_kib_);
- return 0;
+ if (efppp == 0) {
+ return 0;
+ }
+ return efppp->getEmptyFilePool(efpDataSize_kib > 0 ? efpDataSize_kib : defaultEfpDataSize_kib_, true);
}
void EmptyFilePoolManager::getEmptyFilePools(std::vector<EmptyFilePool*>& emptyFilePoolList,
diff --git a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp
index a31855e0d8..9b58e8c4ff 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp
@@ -85,26 +85,7 @@ EmptyFilePoolPartition::findEmptyFilePools() {
}
}
}
- EmptyFilePool* efpp = 0;
- try {
- efpp = new EmptyFilePool(fqFileName, this, overwriteBeforeReturnFlag_, truncateFlag_, journalLogRef_);
- {
- slock l(efpMapMutex_);
- efpMap_[efpp->dataSize_kib()] = efpp;
- }
- }
- catch (const std::exception& e) {
- if (efpp != 0) {
- delete efpp;
- efpp = 0;
- }
- std::ostringstream oss;
- oss << "EmptyFilePool create failed: " << e.what();
- journalLogRef_.log(JournalLog::LOG_WARN, oss.str());
- }
- if (efpp != 0) {
- efpp->initialize();
- }
+ createEmptyFilePool(fqFileName);
}
if (upgradeDirStructureFlag) {
std::string oldEfpDir(partitionDir_ + "/efp");
@@ -117,12 +98,17 @@ EmptyFilePoolPartition::findEmptyFilePools() {
}
}
-EmptyFilePool* EmptyFilePoolPartition::getEmptyFilePool(const efpDataSize_kib_t efpDataSize_kib) {
- slock l(efpMapMutex_);
- efpMapItr_t i = efpMap_.find(efpDataSize_kib);
- if (i == efpMap_.end())
- return 0;
- return i->second;
+EmptyFilePool* EmptyFilePoolPartition::getEmptyFilePool(const efpDataSize_kib_t efpDataSize_kib, const bool createIfNonExistent) {
+ {
+ slock l(efpMapMutex_);
+ efpMapItr_t i = efpMap_.find(efpDataSize_kib);
+ if (i != efpMap_.end())
+ return i->second;
+ }
+ if (createIfNonExistent) {
+ return createEmptyFilePool(efpDataSize_kib);
+ }
+ return 0;
}
void EmptyFilePoolPartition::getEmptyFilePools(std::vector<EmptyFilePool*>& efpList) {
@@ -183,7 +169,7 @@ std::string EmptyFilePoolPartition::getPartionDirectoryName(const efpPartitionNu
//static
efpPartitionNumber_t EmptyFilePoolPartition::getPartitionNumber(const std::string& name) {
if (name.length() == 4 && name[0] == 'p' && ::isdigit(name[1]) && ::isdigit(name[2]) && ::isdigit(name[3])) {
- long pn = ::strtol(name.c_str() + 1, 0, 0);
+ long pn = ::strtol(name.c_str() + 1, 0, 10);
if (pn == 0 && errno) {
return 0;
} else {
@@ -195,6 +181,35 @@ efpPartitionNumber_t EmptyFilePoolPartition::getPartitionNumber(const std::strin
// --- protected functions ---
+EmptyFilePool* EmptyFilePoolPartition::createEmptyFilePool(const efpDataSize_kib_t efpDataSize_kib) {
+ std::string fqEfpDirectoryName(partitionDir_ + "/" + EmptyFilePool::dirNameFromDataSize(efpDataSize_kib));
+ return createEmptyFilePool(fqEfpDirectoryName);
+}
+
+EmptyFilePool* EmptyFilePoolPartition::createEmptyFilePool(const std::string fqEfpDirectoryName) {
+ EmptyFilePool* efpp = 0;
+ try {
+ efpp = new EmptyFilePool(fqEfpDirectoryName, this, overwriteBeforeReturnFlag_, truncateFlag_, journalLogRef_);
+ {
+ slock l(efpMapMutex_);
+ efpMap_[efpp->dataSize_kib()] = efpp;
+ }
+ }
+ catch (const std::exception& e) {
+ if (efpp != 0) {
+ delete efpp;
+ efpp = 0;
+ }
+ std::ostringstream oss;
+ oss << "EmptyFilePool create failed: " << e.what();
+ journalLogRef_.log(JournalLog::LOG_WARN, oss.str());
+ }
+ if (efpp != 0) {
+ efpp->initialize();
+ }
+ return efpp;
+}
+
void EmptyFilePoolPartition::validatePartitionDir() {
if (!jdir::is_dir(partitionDir_)) {
std::ostringstream ss;
diff --git a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h
index c653c6be6a..23a541f8f4 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h
@@ -59,7 +59,7 @@ public:
virtual ~EmptyFilePoolPartition();
void findEmptyFilePools();
- EmptyFilePool* getEmptyFilePool(const efpDataSize_kib_t efpDataSize_kib);
+ EmptyFilePool* getEmptyFilePool(const efpDataSize_kib_t efpDataSize_kib, const bool createIfNonExistent);
void getEmptyFilePools(std::vector<EmptyFilePool*>& efpList);
void getEmptyFilePoolSizes_kib(std::vector<efpDataSize_kib_t>& efpDataSizesList) const;
std::string getPartitionDirectory() const;
@@ -70,6 +70,8 @@ public:
static efpPartitionNumber_t getPartitionNumber(const std::string& name);
protected:
+ EmptyFilePool* createEmptyFilePool(const efpDataSize_kib_t efpDataSize_kib);
+ EmptyFilePool* createEmptyFilePool(const std::string fqEfpDirectoryName);
void validatePartitionDir();
};
diff --git a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
index 3f39913422..198b39857c 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
@@ -43,6 +43,7 @@
#include "qpid/linearstore/journal/utils/file_hdr.h"
#include <sstream>
#include <string>
+#include <unistd.h>
#include <vector>
namespace qpid {
@@ -101,7 +102,11 @@ void RecoveryManager::analyzeJournals(const std::vector<std::string>* preparedTr
analyzeJournalFileHeaders(efpIdentity);
if (journalEmptyFlag_) {
- *emptyFilePoolPtrPtr = emptyFilePoolManager->getEmptyFilePool(0, 0); // Use default EFP
+ if (uninitFileList_.empty()) {
+ *emptyFilePoolPtrPtr = emptyFilePoolManager->getEmptyFilePool(0, 0); // Use default EFP
+ } else {
+ *emptyFilePoolPtrPtr = emptyFilePoolManager->getEmptyFilePool(efpIdentity);
+ }
} else {
*emptyFilePoolPtrPtr = emptyFilePoolManager->getEmptyFilePool(efpIdentity);
if (! *emptyFilePoolPtrPtr) {
@@ -294,6 +299,7 @@ void RecoveryManager::setLinearFileControllerJournals(lfcAddJournalFileFn fnPtr,
LinearFileController* lfcPtr) {
if (journalEmptyFlag_) {
if (uninitFileList_.size() > 0) {
+ // TODO: Handle case if uninitFileList_.size() > 1, but this should not happen in normal operation. Here we assume only one item in the list.
std::string uninitFile = uninitFileList_.back();
uninitFileList_.pop_back();
lfcPtr->restoreEmptyFile(uninitFile);
@@ -377,11 +383,28 @@ void RecoveryManager::analyzeJournalFileHeaders(efpIdentity_t& efpIdentity) {
jdir::read_dir(journalDirectory_, directoryList, false, true, false, true);
for (stringListConstItr_t i = directoryList.begin(); i != directoryList.end(); ++i) {
bool hdrOk = readJournalFileHeader(*i, fileHeader, headerQueueName);
- if (!hdrOk || headerQueueName.empty()) {
+ bool hdrEmpty = ::is_file_hdr_reset(&fileHeader);
+ if (!hdrOk) {
std::ostringstream oss;
- oss << "Journal file " << (*i) << " is uninitialized or corrupted";
+ oss << "Journal file " << (*i) << " is corrupted or invalid";
journalLogRef_.log(JournalLog::LOG_WARN, queueName_, oss.str());
+ } else if (hdrEmpty) {
+ // Read symlink, find efp directory name which is efp size in KiB
+ // TODO: place this bit into a common function as it is also used in EmptyFilePool.cpp::deleteSymlink()
+ char buff[1024];
+ ssize_t len = ::readlink((*i).c_str(), buff, 1024);
+ if (len < 0) {
+ std::ostringstream oss;
+ oss << "symlink=\"" << (*i) << "\"" << FORMAT_SYSERR(errno);
+ throw jexception(jerrno::JERR__SYMLINK, oss.str(), "RecoveryManager", "analyzeJournalFileHeaders");
+ }
+ // Find second and third '/' from back of string, which contains the EFP directory name
+ *(::strrchr(buff, '/')) = '\0';
+ *(::strrchr(buff, '/')) = '\0';
+ int efpDataSize_kib = atoi(::strrchr(buff, '/') + 1);
uninitFileList_.push_back(*i);
+ efpIdentity.pn_ = fileHeader._efp_partition;
+ efpIdentity.ds_ = efpDataSize_kib;
} else if (headerQueueName.compare(queueName_) != 0) {
std::ostringstream oss;
oss << "Journal file " << (*i) << " belongs to queue \"" << headerQueueName << "\": ignoring";
@@ -406,6 +429,7 @@ void RecoveryManager::analyzeJournalFileHeaders(efpIdentity_t& efpIdentity) {
}
}
+std::cerr << "*** RecoveryManager::analyzeJournalFileHeaders() fileNumberMap_.size()=" << fileNumberMap_.size() << std::endl; // DEBUG
if (fileNumberMap_.empty()) {
journalEmptyFlag_ = true;
} else {
@@ -905,7 +929,9 @@ bool RecoveryManager::readJournalFileHeader(const std::string& journalFileName,
}
ifs.close();
::memcpy(&fileHeaderRef, buffer, sizeof(::file_hdr_t));
- if (::file_hdr_check(&fileHeaderRef, QLS_FILE_MAGIC, QLS_JRNL_VERSION, 0, QLS_MAX_QUEUE_NAME_LEN)) return false;
+ if (::file_hdr_check(&fileHeaderRef, QLS_FILE_MAGIC, QLS_JRNL_VERSION, 0, QLS_MAX_QUEUE_NAME_LEN)) {
+ return false;
+ }
queueName.assign(buffer + sizeof(::file_hdr_t), fileHeaderRef._queue_name_len);
return true;
}
diff --git a/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp b/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp
index 9d59039220..ce88e7809c 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp
@@ -42,6 +42,7 @@ const uint32_t jerrno::JERR__UNEXPRESPONSE = 0x0108;
const uint32_t jerrno::JERR__RECNFOUND = 0x0109;
const uint32_t jerrno::JERR__NOTIMPL = 0x010a;
const uint32_t jerrno::JERR__NULL = 0x010b;
+const uint32_t jerrno::JERR__SYMLINK = 0x010c;
// class jcntl
const uint32_t jerrno::JERR_JCNTL_STOPPED = 0x0200;
@@ -112,10 +113,11 @@ const uint32_t jerrno::JERR_EFP_BADPARTITIONDIR = 0x0d02;
const uint32_t jerrno::JERR_EFP_BADEFPDIRNAME = 0x0d03;
const uint32_t jerrno::JERR_EFP_NOEFP = 0x0d04;
const uint32_t jerrno::JERR_EFP_EMPTY = 0x0d05;
-const uint32_t jerrno::JERR_EFP_SYMLINK = 0x0d06;
-const uint32_t jerrno::JERR_EFP_LSTAT = 0x0d07;
-const uint32_t jerrno::JERR_EFP_BADFILETYPE = 0x0d08;
-const uint32_t jerrno::JERR_EFP_FOPEN = 0x0d09;
+const uint32_t jerrno::JERR_EFP_LSTAT = 0x0d06;
+const uint32_t jerrno::JERR_EFP_BADFILETYPE = 0x0d07;
+const uint32_t jerrno::JERR_EFP_FOPEN = 0x0d08;
+const uint32_t jerrno::JERR_EFP_FWRITE = 0x0d09;
+const uint32_t jerrno::JERR_EFP_MKDIR = 0x0d0a;
// Negative returns for some functions
const int32_t jerrno::AIO_TIMEOUT = -1;
@@ -140,6 +142,7 @@ jerrno::__init()
_err_map[JERR__RECNFOUND] = "JERR__RECNFOUND: Record not found.";
_err_map[JERR__NOTIMPL] = "JERR__NOTIMPL: Not implemented";
_err_map[JERR__NULL] = "JERR__NULL: Operation on null pointer";
+ _err_map[JERR__SYMLINK] = "JERR__SYMLINK: Symbolic link operation failed";
// class jcntl
_err_map[JERR_JCNTL_STOPPED] = "JERR_JCNTL_STOPPED: Operation on stopped journal.";
@@ -210,10 +213,11 @@ jerrno::__init()
_err_map[JERR_EFP_BADPARTITIONDIR] = "JERR_EFP_BADPARTITIONDIR: Invalid partition directory";
_err_map[JERR_EFP_NOEFP] = "JERR_EFP_NOEFP: No Empty File Pool found for given partition and empty file size";
_err_map[JERR_EFP_EMPTY] = "JERR_EFP_EMPTY: Empty File Pool is empty";
- _err_map[JERR_EFP_SYMLINK] = "JERR_EFP_SYMLINK: Symbolic link operation failed";
_err_map[JERR_EFP_LSTAT] = "JERR_EFP_LSTAT: lstat() operation failed";
_err_map[JERR_EFP_BADFILETYPE] = "JERR_EFP_BADFILETYPE: File type incorrect for operation";
_err_map[JERR_EFP_FOPEN] = "JERR_EFP_FOPEN: Unable to fopen file for write";
+ _err_map[JERR_EFP_FWRITE] = "JERR_EFP_FWRITE: Write failed";
+ _err_map[JERR_EFP_MKDIR] = "JERR_EFP_MKDIR: Directory creation failed";
//_err_map[] = "";
diff --git a/qpid/cpp/src/qpid/linearstore/journal/jerrno.h b/qpid/cpp/src/qpid/linearstore/journal/jerrno.h
index 77b18b17e8..6e817682ca 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/jerrno.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/jerrno.h
@@ -60,6 +60,7 @@ namespace journal {
static const uint32_t JERR__RECNFOUND; ///< Record not found
static const uint32_t JERR__NOTIMPL; ///< Not implemented
static const uint32_t JERR__NULL; ///< Operation on null pointer
+ static const uint32_t JERR__SYMLINK; ///< Symbolic Link operation failed
// class jcntl
static const uint32_t JERR_JCNTL_STOPPED; ///< Operation on stopped journal
@@ -130,10 +131,11 @@ namespace journal {
static const uint32_t JERR_EFP_BADPARTITIONDIR; ///< Invalid partition directory
static const uint32_t JERR_EFP_NOEFP; ///< No EFP found for given partition and file size
static const uint32_t JERR_EFP_EMPTY; ///< EFP empty
- static const uint32_t JERR_EFP_SYMLINK; ///< Symbolic Link operation failed
static const uint32_t JERR_EFP_LSTAT; ///< lstat operation failed
static const uint32_t JERR_EFP_BADFILETYPE; ///< Bad file type
static const uint32_t JERR_EFP_FOPEN; ///< Unable to fopen file for write
+ static const uint32_t JERR_EFP_FWRITE; ///< Write failed
+ static const uint32_t JERR_EFP_MKDIR; ///< Directory creation failed
// Negative returns for some functions
static const int32_t AIO_TIMEOUT; ///< Timeout waiting for AIO return
diff --git a/qpid/cpp/src/qpid/linearstore/management-schema.xml b/qpid/cpp/src/qpid/linearstore/management-schema.xml
index a55883a255..ebd388593e 100644
--- a/qpid/cpp/src/qpid/linearstore/management-schema.xml
+++ b/qpid/cpp/src/qpid/linearstore/management-schema.xml
@@ -21,38 +21,24 @@
<class name="Store">
<property name="brokerRef" type="objId" access="RO" references="qpid.Broker" index="y" parentRef="y"/>
- <property name="location" type="sstr" access="RO" desc="Logical directory on disk"/>
- <!--property name="defaultInitialFileCount" type="uint16" access="RO" unit="file" desc="Default number of files initially allocated to each journal"/-->
- <!--property name="defaultDataFileSize" type="uint32" access="RO" unit="RdPg" desc="Default size of each journal data file"/-->
+ <property name="storeDir" type="sstr" access="RO" desc="Logical directory on disk"/>
<property name="tplIsInitialized" type="bool" access="RO" desc="Transaction prepared list has been initialized by a transactional prepare"/>
<property name="tplDirectory" type="sstr" access="RO" desc="Transaction prepared list directory"/>
<property name="tplWritePageSize" type="uint32" access="RO" unit="byte" desc="Page size in transaction prepared list write-page-cache"/>
<property name="tplWritePages" type="uint32" access="RO" unit="wpage" desc="Number of pages in transaction prepared list write-page-cache"/>
- <!--property name="tplInitialFileCount" type="uint16" access="RO" unit="file" desc="Number of files initially allocated to transaction prepared list journal"/-->
- <!--property name="tplDataFileSize" type="uint32" access="RO" unit="byte" desc="Size of each journal data file in transaction prepared list journal"/-->
- <!--property name="tplCurrentFileCount" type="uint32" access="RO" unit="file" desc="Number of files currently allocated to transaction prepared list journal"/-->
<statistic name="tplTransactionDepth" type="hilo32" unit="txn" desc="Number of currently enqueued prepared transactions"/>
<statistic name="tplTxnPrepares" type="count64" unit="record" desc="Total transaction prepares on transaction prepared list"/>
<statistic name="tplTxnCommits" type="count64" unit="record" desc="Total transaction commits on transaction prepared list"/>
<statistic name="tplTxnAborts" type="count64" unit="record" desc="Total transaction aborts on transaction prepared list"/>
- <statistic name="tplOutstandingAIOs" type="hilo32" unit="aio_op" desc="Deprecated"/>
</class>
<class name="Journal">
<property name="queueRef" type="objId" access="RO" references="qpid.Queue" isGeneralReference="y"/>
- <property name="name" type="sstr" access="RC" index="y"/>
+ <property name="queueName" type="sstr" access="RC" index="y"/>
<property name="directory" type="sstr" access="RO" desc="Directory containing journal files"/>
- <property name="baseFileName" type="sstr" access="RO" desc="Deprecated"/>
<property name="writePageSize" type="uint32" access="RO" unit="byte" desc="Deprecated"/>
<property name="writePages" type="uint32" access="RO" unit="wpage" desc="Deprecated"/>
- <property name="readPageSize" type="uint32" access="RO" unit="byte" desc="Deprecated"/>
- <property name="readPages" type="uint32" access="RO" unit="rpage" desc="Deprecated"/>
- <!--property name="initialFileCount" type="uint16" access="RO" unit="file" desc="Number of files initially allocated to this journal"/-->
- <!--property name="autoExpand" type="bool" access="RO" desc="Auto-expand enabled"/-->
- <!--property name="currentFileCount" type="uint16" access="RO" unit="file" desc="Number of files currently allocated to this journal"/-->
- <!--property name="maxFileCount" type="uint16" access="RO" unit="file" desc="Max number of files allowed for this journal"/-->
- <!--property name="dataFileSize" type="uint32" access="RO" unit="byte" desc="Size of each journal data file"/-->
<statistic name="recordDepth" type="hilo32" unit="record" desc="Number of currently enqueued records (durable messages)"/>
<statistic name="enqueues" type="count64" unit="record" desc="Total enqueued records on journal"/>
@@ -64,36 +50,5 @@
<statistic name="txnAborts" type="count64" unit="record" desc="Total transactional abort records on journal"/>
<statistic name="outstandingAIOs" type="hilo32" unit="aio_op" desc="Number of currently outstanding AIO requests in Async IO system"/>
-<!--
- The following are not yet "wired up" in JournalImpl.cpp
--->
- <statistic name="freeFileCount" type="hilo32" unit="file" desc="Deprecated"/>
- <statistic name="availableFileCount" type="hilo32" unit="file" desc="Deprecated"/>
- <statistic name="writeWaitFailures" type="count64" unit="record" desc="Deprecated"/>
- <statistic name="writeBusyFailures" type="count64" unit="record" desc="Deprecated"/>
- <statistic name="readRecordCount" type="count64" unit="record" desc="Deprecated"/>
- <statistic name="readBusyFailures" type="count64" unit="record" desc="Deprecated"/>
- <statistic name="writePageCacheDepth" type="hilo32" unit="wpage" desc="Deprecated"/>
- <statistic name="readPageCacheDepth" type="hilo32" unit="rpage" desc="Deprecated"/>
-
- <!--method name="expand" desc="Increase number of files allocated for this journal">
- <arg name="by" type="uint32" dir="I" desc="Number of files to increase journal size by"/>
- </method-->
</class>
-
- <eventArguments>
- <!--arg name="autoExpand" type="bool" desc="Journal auto-expand enabled"/-->
- <arg name="fileSize" type="uint32" desc="Journal file size in bytes"/>
- <arg name="jrnlId" type="sstr" desc="Journal Id"/>
- <arg name="numEnq" type="uint32" desc="Number of recovered enqueues"/>
- <arg name="numFiles" type="uint16" desc="Number of journal files"/>
- <arg name="numTxn" type="uint32" desc="Number of recovered transactions"/>
- <arg name="numTxnDeq" type="uint32" desc="Number of recovered transactional dequeues"/>
- <arg name="numTxnEnq" type="uint32" desc="Number of recovered transactional enqueues"/>
- <arg name="what" type="sstr" desc="Description of event"/>
- </eventArguments>
- <event name="enqThresholdExceeded" sev="warn" args="jrnlId, what"/>
- <event name="created" sev="notice" args="jrnlId, fileSize, numFiles"/>
- <event name="full" sev="error" args="jrnlId, what"/>
- <event name="recovered" sev="notice" args="jrnlId, fileSize, numFiles, numEnq, numTxn, numTxnEnq, numTxnDeq"/>
</schema>
diff --git a/qpid/tools/src/py/qpid-config b/qpid/tools/src/py/qpid-config
index ac93954484..8d38b1a342 100755
--- a/qpid/tools/src/py/qpid-config
+++ b/qpid/tools/src/py/qpid-config
@@ -105,6 +105,8 @@ class Config:
self._if_unused = True
self._fileCount = None
self._fileSize = None
+ self._efp_partition_num = None
+ self._efp_pool_file_size = None
self._maxQueueSize = None
self._maxQueueCount = None
self._limitPolicy = None
@@ -137,6 +139,8 @@ conn_options = {}
FILECOUNT = "qpid.file_count"
FILESIZE = "qpid.file_size"
+EFP_PARTITION_NUM = "qpid.efp_partition_num"
+EFP_POOL_FILE_SIZE = "qpid.efp_pool_file_size"
MAX_QUEUE_SIZE = "qpid.max_size"
MAX_QUEUE_COUNT = "qpid.max_count"
POLICY_TYPE = "qpid.policy_type"
@@ -158,7 +162,8 @@ REPLICATE = "qpid.replicate"
#i.e. the arguments for which there is special processing on add and
#list
SPECIAL_ARGS=[
- FILECOUNT,FILESIZE,MAX_QUEUE_SIZE,MAX_QUEUE_COUNT,POLICY_TYPE,
+ FILECOUNT,FILESIZE,EFP_PARTITION_NUM,EFP_POOL_FILE_SIZE,
+ MAX_QUEUE_SIZE,MAX_QUEUE_COUNT,POLICY_TYPE,
LVQ_KEY,MSG_SEQUENCE,IVE,QUEUE_EVENT_GENERATION,
FLOW_STOP_COUNT,FLOW_RESUME_COUNT,FLOW_STOP_SIZE,FLOW_RESUME_SIZE,
MSG_GROUP_HDR_KEY,SHARED_MSG_GROUP,REPLICATE]
@@ -213,8 +218,10 @@ def OptionsAndArguments(argv):
parser.add_option_group(group2)
group3 = OptionGroup(parser, "Options for Adding Queues")
- group3.add_option("--file-count", action="store", type="int", metavar="<n>", help="Number of files in queue's persistence journal")
- group3.add_option("--file-size", action="store", type="int", metavar="<n>", help="File size in pages (64KiB/page)")
+ group3.add_option("--file-count", action="store", type="int", metavar="<n>", help="[legacystore] Number of files in queue's persistence journal")
+ group3.add_option("--file-size", action="store", type="int", metavar="<n>", help="[legactystore] File size in pages (64KiB/page)")
+ group3.add_option("--efp-partition-num", action="store", type="int", metavar="<n>", help="[linearstore] EFP partition number")
+ group3.add_option("--efp-pool-file-size", action="store", type="int", metavar="<n>", help="[linearstore] EFP file size (KiB)")
group3.add_option("--max-queue-size", action="store", type="int", metavar="<n>", help="Maximum in-memory queue size as bytes")
group3.add_option("--max-queue-count", action="store", type="int", metavar="<n>", help="Maximum in-memory queue size as a number of messages")
group3.add_option("--limit-policy", action="store", choices=["none", "reject", "ring", "ring-strict"], metavar="<policy>", help="Action to take when queue limit is reached")
@@ -294,6 +301,10 @@ def OptionsAndArguments(argv):
config._fileCount = opts.file_count
if opts.file_size is not None:
config._fileSize = opts.file_size
+ if opts.efp_partition_num is not None:
+ config._efp_partition_num = opts.efp_partition_num
+ if opts.efp_pool_file_size is not None:
+ config._efp_pool_file_size = opts.efp_pool_file_size
if opts.max_queue_size is not None:
config._maxQueueSize = opts.max_queue_size
if opts.max_queue_count is not None:
@@ -524,6 +535,8 @@ class BrokerManager:
if q.exclusive: print "excl",
if FILESIZE in args: print "--file-size=%s" % args[FILESIZE],
if FILECOUNT in args: print "--file-count=%s" % args[FILECOUNT],
+ if EFP_PARTITION_NUM in args: print "--efp-partition-num=%s" % args[EFP_PARTITION_NUM],
+ if EFP_POOL_FILE_SIZE in args: print "--efp-pool-file-size=%s" % args[EFP_POOL_FILE_SIZE],
if MAX_QUEUE_SIZE in args: print "--max-queue-size=%s" % args[MAX_QUEUE_SIZE],
if MAX_QUEUE_COUNT in args: print "--max-queue-count=%s" % args[MAX_QUEUE_COUNT],
if POLICY_TYPE in args: print "--limit-policy=%s" % args[POLICY_TYPE].replace("_", "-"),
@@ -606,6 +619,10 @@ class BrokerManager:
declArgs[FILECOUNT] = config._fileCount
if config._fileSize:
declArgs[FILESIZE] = config._fileSize
+ if config._efp_partition_num:
+ declArgs[EFP_PARTITION_NUM] = config._efp_partition_num
+ if config._efp_pool_file_size:
+ declArgs[EFP_POOL_FILE_SIZE] = config._efp_pool_file_size
if config._maxQueueSize is not None:
declArgs[MAX_QUEUE_SIZE] = config._maxQueueSize