diff options
author | Kim van der Riet <kpvdr@apache.org> | 2015-01-02 17:43:14 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2015-01-02 17:43:14 +0000 |
commit | 81fd3962fe649940eb64262a260997a2da064dc6 (patch) | |
tree | a6a929788da25bb2c4551f9ab921f93a7d7ac63c | |
parent | d3e81de7862f78b15d47df841a1b22ae8cafd37c (diff) | |
download | qpid-python-81fd3962fe649940eb64262a260997a2da064dc6.tar.gz |
QPID-5671 [linearstore] Add ability to use disk partitions and select per-queue EFPs: WIP, but mostly complete. Needs additional testing. It is now possible to add queues which use a partition other than the broker default by using qpid-config --durable together with --efp--partition-num and/or --efp-pool-file-size
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1649081 13f79535-47bb-0310-9956-ffa450edef68
12 files changed, 198 insertions, 146 deletions
diff --git a/qpid/cpp/src/qpid/linearstore/ISSUES b/qpid/cpp/src/qpid/linearstore/ISSUES index 2d5a389615..26e62482a5 100644 --- a/qpid/cpp/src/qpid/linearstore/ISSUES +++ b/qpid/cpp/src/qpid/linearstore/ISSUES @@ -25,7 +25,7 @@ Current/pending: ------ ------- ---------------------- 5359 - Linearstore: Implement new management schema and wire into store 5360 - Linearstore: Evaluate and rework logging to produce a consistent log output - 5361 1145359 Linearstore: No tests for linearstore functionality currently exist +* 5361 1145359 Linearstore: No tests for linearstore functionality currently exist svn r.1564893 2014-02-05: Added tx-test-soak.sh svn r.1564935 2014-02-05: Added license text to tx-test-soak.sh svn r.1625283 2014-09-16: Basic python tests from legacystore ported over to linearstore @@ -41,11 +41,12 @@ Current/pending: - 1067480 [LinearStore] Provide a way to limit max count/size of empty files in EFP - 1067429 [LinearStore] last file from deleted queue is not moved to EFP <queue delete issue> - 1067482 [LinearStore] Provide a way to prealocate empty pages in EFP -* 5671 [linearstore] Add ability to use disk partitions and select per-queue EFPs +* 5671 1160367 [linearstore] Add ability to use disk partitions and select per-queue EFPs svn r.1636598 2014-11-04: WIP: New EFP and journal dir structure using symlinks svn r.1637985 2014-11-10: WIP: Auto-upgrade from old dir structure to new - 6230 1165200 [linearstore] qpid-qls-analyze fails when analyzing empty journal - svn r.1643053 2014-11-18: Proposed fix +* 1148807 [linearstore] Restarting broker with empty journal raises confusing warning +* 1066256 [linearstore] Changing EFP size after using store breaks durable queue creation + @@ -147,7 +148,9 @@ NO-JIRA - Added missing Apache copyright/license text svn r.1631360 2014-10-13 Proposed solution 6157 1150397 linearstore: segfault when 2 journals request new journal file from empty EFP svn r.1632504 2014-10-17 Proposed solution by pmoravec - 6248 1167911 [linearstore] Symlink creation fails if store dir path is not absolute + 6230 1165200 [linearstore] qpid-qls-analyze fails when analyzing empty journal + svn r.1643053 2014-11-18: Proposed fix + 6248 1167911 [linearstore] Symlink creation fails if store dir path is not absolute svn r.1641689 2014-11-25 Proposed solution @@ -194,7 +197,8 @@ no. svn r Q-JIRA RHBZ Date Alt Committer 34. 1632504 6157 1150397 2014-10-17 (pmoravec) 35. 1636598 5671 2014-11-04 36. 1637985 5671 2014-11-10 -37. 1641689 6248 1167911 2014-11-25 +37. 1643053 6230 1165200 2014-11-18 +38. 1641689 6248 1167911 2014-11-25 See above sections for details on these checkins. diff --git a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp index 77d5703636..68810936e1 100644 --- a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp +++ b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp @@ -148,7 +148,7 @@ void MessageStoreImpl::initManagement () mgmtObject = qmf::org::apache::qpid::linearstore::Store::shared_ptr ( new qmf::org::apache::qpid::linearstore::Store(agent, this, broker)); - mgmtObject->set_location(storeDir); + mgmtObject->set_storeDir(storeDir); mgmtObject->set_tplIsInitialized(false); mgmtObject->set_tplDirectory(getTplBaseDir()); mgmtObject->set_tplWritePageSize(tplWCachePgSizeSblks * QLS_SBLK_SIZE_BYTES); @@ -406,7 +406,7 @@ void MessageStoreImpl::create(qpid::broker::PersistableQueue& queue_, if (queue_.getName().size() == 0) { - QLS_LOG(error, "Cannot create store for empty (null) queue name - ignoring and attempting to continue."); + QLS_LOG(error, "Cannot create store for empty (null) queue name - queue create ignored."); return; } @@ -449,15 +449,15 @@ qpid::linearstore::journal::EmptyFilePool* MessageStoreImpl::getEmptyFilePool(const qpid::framing::FieldTable& args_) { qpid::framing::FieldTable::ValuePtr value; qpid::linearstore::journal::efpPartitionNumber_t localEfpPartition = defaultEfpPartitionNumber; - value = args_.get("qpid.efp_partition"); + value = args_.get("qpid.efp_partition_num"); if (value.get() != 0 && !value->empty() && value->convertsTo<int>()) { - localEfpPartition = chkEfpPartition((uint32_t)value->get<int>(), "qpid.efp_partition"); + localEfpPartition = chkEfpPartition((uint32_t)value->get<int>(), "qpid.efp_partition_num"); } qpid::linearstore::journal::efpDataSize_kib_t localEfpFileSizeKib = defaultEfpFileSize_kib; - value = args_.get("qpid.efp_file_size"); + value = args_.get("qpid.efp_pool_file_size"); if (value.get() != 0 && !value->empty() && value->convertsTo<int>()) { - localEfpFileSizeKib = chkEfpFileSizeKiB((uint32_t)value->get<int>(),"qpid.efp_file_size" ); + localEfpFileSizeKib = chkEfpFileSizeKiB((uint32_t)value->get<int>(), "qpid.efp_pool_file_size"); } return getEmptyFilePool(localEfpPartition, localEfpFileSizeKib); } diff --git a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp index 18f4d3afc3..08db3f75bd 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp @@ -59,10 +59,16 @@ EmptyFilePool::EmptyFilePool(const std::string& efpDirectory, EmptyFilePool::~EmptyFilePool() {} void EmptyFilePool::initialize() { -//std::cout << "*** Initializing EFP " << efpDataSize_kib_ << "k in partition " << partitionPtr_->getPartitionNumber() << "; efpDirectory=" << efpDirectory_ << std::endl; // DEBUG - std::vector<std::string> dirList; + if (::mkdir(efpDirectory_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH)) { // Create EFP dir if it does not yet exist + if (errno != EEXIST) { + std::ostringstream oss; + oss << "directory=" << efpDirectory_ << " " << FORMAT_SYSERR(errno); + throw jexception(jerrno::JERR_EFP_MKDIR, oss.str(), "EmptyFilePool", "initialize"); + } + } // Process empty files in main dir + std::vector<std::string> dirList; jdir::read_dir(efpDirectory_, dirList, false, true, false, false); for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) { size_t dotPos = i->rfind("."); @@ -122,14 +128,14 @@ const efpIdentity_t EmptyFilePool::getIdentity() const { std::string EmptyFilePool::takeEmptyFile(const std::string& destDirectory) { std::string emptyFileName = popEmptyFile(); - std::string newFileName = efpDirectory_ + "/" + s_inuseFileDirectory_ + emptyFileName.substr(emptyFileName.rfind('/')); + std::string newFileName = efpDirectory_ + "/" + s_inuseFileDirectory_ + emptyFileName.substr(emptyFileName.rfind('/')); // NOTE: substr() includes leading '/' std::string symlinkName = destDirectory + emptyFileName.substr(emptyFileName.rfind('/')); // NOTE: substr() includes leading '/' - if (moveFile(emptyFileName, newFileName)) { + if (!moveFile(emptyFileName, newFileName)) { // Try again with new UUID for file name newFileName = efpDirectory_ + "/" + s_inuseFileDirectory_ + "/" + getEfpFileName(); - if (moveFile(emptyFileName, newFileName)) { + if (!moveFile(emptyFileName, newFileName)) { //std::cerr << "*** DEBUG: pushEmptyFile " << emptyFileName << "from EmptyFilePool::takeEmptyFile()" << std::endl; // DEBUG - pushEmptyFile(emptyFileName); + pushEmptyFile(emptyFileName); // Return empty file to pool std::ostringstream oss; oss << "file=\"" << emptyFileName << "\" dest=\"" << newFileName << "\"" << FORMAT_SYSERR(errno); throw jexception(jerrno::JERR_JDIR_FMOVE, oss.str(), "EmptyFilePool", "takeEmptyFile"); @@ -138,7 +144,7 @@ std::string EmptyFilePool::takeEmptyFile(const std::string& destDirectory) { if (createSymLink(newFileName, symlinkName)) { std::ostringstream oss; oss << "file=\"" << emptyFileName << "\" dest=\"" << newFileName << "\" symlink=\"" << symlinkName << "\"" << FORMAT_SYSERR(errno); - throw jexception(jerrno::JERR_EFP_SYMLINK, oss.str(), "EmptyFilePool", "takeEmptyFile"); + throw jexception(jerrno::JERR__SYMLINK, oss.str(), "EmptyFilePool", "takeEmptyFile"); } return symlinkName; } @@ -189,12 +195,27 @@ efpDataSize_kib_t EmptyFilePool::dataSizeFromDirName_kib(const std::string& dirN } // --- protected functions --- +void EmptyFilePool::checkIosState(std::ofstream& ofs, + const uint32_t jerrno, + const std::string& fqFileName, + const std::string& operation, + const std::string& errorMessage, + const std::string& className, + const std::string& fnName) { + if (!ofs.good()) { + if (ofs.is_open()) { + ofs.close(); + } + std::ostringstream oss; + oss << "IO failure: eofbit=" << (ofs.eof()?"T":"F") << " failbit=" << (ofs.fail()?"T":"F") << " badbit=" + << (ofs.bad()?"T":"F") << " file=" << fqFileName << " operation=" << operation << ": " << errorMessage; + throw jexception(jerrno, oss.str(), className, fnName); + } +} std::string EmptyFilePool::createEmptyFile() { std::string efpfn = getEfpFileName(); - if (!overwriteFileContents(efpfn)) { - // TODO: handle failure to prepare new file here - } + overwriteFileContents(efpfn); return efpfn; } @@ -226,24 +247,20 @@ void EmptyFilePool::initializeSubDirectory(const std::string& fqDirName) { } } -bool EmptyFilePool::overwriteFileContents(const std::string& fqFileName) { +void EmptyFilePool::overwriteFileContents(const std::string& fqFileName) { ::file_hdr_t fh; ::file_hdr_create(&fh, QLS_FILE_MAGIC, QLS_JRNL_VERSION, QLS_JRNL_FHDR_RES_SIZE_SBLKS, partitionPtr_->getPartitionNumber(), efpDataSize_kib_); std::ofstream ofs(fqFileName.c_str(), std::ofstream::out | std::ofstream::binary); - if (ofs.good()) { - ofs.write((char*)&fh, sizeof(::file_hdr_t)); - uint64_t rem = ((efpDataSize_kib_ + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB)) * 1024) - sizeof(::file_hdr_t); - while (rem--) - ofs.put('\0'); - ofs.close(); - return true; -//std::cout << "*** WARNING: EFP " << efpDirectory_ << " is empty - created new journal file " << fqFileName.substr(fqFileName.rfind('/') + 1) << " on the fly" << std::endl; // DEBUG - } else { - std::ostringstream oss; - oss << "std::ofstream ofs: file=\"" << fqFileName.c_str() << "\"" << " failed to be open"; - throw jexception(jerrno::JERR_EFP_FOPEN, oss.str(), "EmptyFilePool", "overwriteFileContents"); + checkIosState(ofs, jerrno::JERR_EFP_FOPEN, fqFileName, "constructor", "Failed to create file", "EmptyFilePool", "overwriteFileContents"); + ofs.write((char*)&fh, sizeof(::file_hdr_t)); + checkIosState(ofs, jerrno::JERR_EFP_FWRITE, fqFileName, "write()", "Failed to write header", "EmptyFilePool", "overwriteFileContents"); + uint64_t rem = ((efpDataSize_kib_ + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB)) * 1024) - sizeof(::file_hdr_t); + while (rem--) { + ofs.put('\0'); + checkIosState(ofs, jerrno::JERR_EFP_FWRITE, fqFileName, "put()", "Failed to put \0", "EmptyFilePool", "overwriteFileContents"); } - return false; + ofs.close(); +//std::cout << "*** WARNING: EFP " << efpDirectory_ << " is empty - created new journal file " << fqFileName.substr(fqFileName.rfind('/') + 1) << " on the fly" << std::endl; // DEBUG } std::string EmptyFilePool::popEmptyFile() { @@ -271,7 +288,7 @@ void EmptyFilePool::pushEmptyFile(const std::string fqFileName) { void EmptyFilePool::returnEmptyFile(const std::string& emptyFileName) { std::string returnedFileName = efpDirectory_ + "/" + s_returnedFileDirectory_ + emptyFileName.substr(emptyFileName.rfind('/')); // NOTE: substr() includes leading '/' - if (moveFile(emptyFileName, returnedFileName)) { + if (!moveFile(emptyFileName, returnedFileName)) { ::unlink(emptyFileName.c_str()); //std::cerr << "*** WARNING: Unable to move file " << emptyFileName << " to " << returnedFileName << "; deleted." << std::endl; // DEBUG } @@ -283,7 +300,7 @@ void EmptyFilePool::returnEmptyFile(const std::string& emptyFileName) { overwriteFileContents(returnedFileName); } std::string sanitizedEmptyFileName = efpDirectory_ + returnedFileName.substr(returnedFileName.rfind('/')); // NOTE: substr() includes leading '/' - if (moveFile(returnedFileName, sanitizedEmptyFileName)) { + if (!moveFile(returnedFileName, sanitizedEmptyFileName)) { ::unlink(returnedFileName.c_str()); //std::cerr << "*** WARNING: Unable to move file " << returnedFileName << " to " << sanitizedEmptyFileName << "; deleted." << std::endl; // DEBUG } else { @@ -395,18 +412,6 @@ bool EmptyFilePool::validateEmptyFile(const std::string& emptyFileName) const { return true; } -// static -int EmptyFilePool::moveFile(const std::string& from, - const std::string& to) { - if (::rename(from.c_str(), to.c_str())) { - if (errno == EEXIST) return errno; // File name exists - std::ostringstream oss; - oss << "file=\"" << from << "\" dest=\"" << to << "\"" << FORMAT_SYSERR(errno); - throw jexception(jerrno::JERR_JDIR_FMOVE, oss.str(), "EmptyFilePool", "returnEmptyFile"); - } - return 0; -} - //static int EmptyFilePool::createSymLink(const std::string& fqFileName, const std::string& fqLinkName) { @@ -414,7 +419,7 @@ int EmptyFilePool::createSymLink(const std::string& fqFileName, if (errno == EEXIST) return errno; // File name exists std::ostringstream oss; oss << "file=\"" << fqFileName << "\" symlink=\"" << fqLinkName << "\"" << FORMAT_SYSERR(errno); - throw jexception(jerrno::JERR_EFP_SYMLINK, oss.str(), "EmptyFilePool", "createSymLink"); + throw jexception(jerrno::JERR__SYMLINK, oss.str(), "EmptyFilePool", "createSymLink"); } return 0; } @@ -426,7 +431,7 @@ std::string EmptyFilePool::deleteSymlink(const std::string& fqLinkName) { if (len < 0) { std::ostringstream oss; oss << "symlink=\"" << fqLinkName << "\"" << FORMAT_SYSERR(errno); - throw jexception(jerrno::JERR_EFP_SYMLINK, oss.str(), "EmptyFilePool", "deleteSymlink"); + throw jexception(jerrno::JERR__SYMLINK, oss.str(), "EmptyFilePool", "deleteSymlink"); } ::unlink(fqLinkName.c_str()); return std::string(buff, len); @@ -455,4 +460,18 @@ bool EmptyFilePool::isSymlink(const std::string& fqName) { } +// static +bool EmptyFilePool::moveFile(const std::string& from, + const std::string& to) { + if (::rename(from.c_str(), to.c_str())) { + if (errno == EEXIST) { + return false; // File name exists + } + std::ostringstream oss; + oss << "file=\"" << from << "\" dest=\"" << to << "\"" << FORMAT_SYSERR(errno); + throw jexception(jerrno::JERR_JDIR_FMOVE, oss.str(), "EmptyFilePool", "returnEmptyFile"); + } + return true; +} + }}} diff --git a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h index 1a1264fa26..dc567ff917 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h +++ b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h @@ -87,23 +87,30 @@ public: const efpPartitionNumber_t partitionNumber); protected: + void checkIosState(std::ofstream& ofs, + const uint32_t jerrno, + const std::string& fqFileName, + const std::string& operation, + const std::string& errorMessage, + const std::string& className, + const std::string& fnName); std::string createEmptyFile(); std::string getEfpFileName(); void initializeSubDirectory(const std::string& fqDirName); - bool overwriteFileContents(const std::string& fqFileName); + void overwriteFileContents(const std::string& fqFileName); std::string popEmptyFile(); void pushEmptyFile(const std::string fqFileName); void returnEmptyFile(const std::string& emptyFileName); void resetEmptyFileHeader(const std::string& fqFileName); bool validateEmptyFile(const std::string& emptyFileName) const; - static int moveFile(const std::string& fromFqPath, - const std::string& toFqPath); static int createSymLink(const std::string& fqFileName, const std::string& fqLinkName); static std::string deleteSymlink(const std::string& fqLinkName); static bool isFile(const std::string& fqName); static bool isSymlink(const std::string& fqName); + static bool moveFile(const std::string& fromFqPath, + const std::string& toFqPath); }; }}} diff --git a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp index 28e1b0b56e..33707578bf 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp @@ -165,9 +165,10 @@ EmptyFilePool* EmptyFilePoolManager::getEmptyFilePool(const efpIdentity_t efpIde EmptyFilePool* EmptyFilePoolManager::getEmptyFilePool(const efpPartitionNumber_t partitionNumber, const efpDataSize_kib_t efpDataSize_kib) { EmptyFilePoolPartition* efppp = getEfpPartition(partitionNumber > 0 ? partitionNumber : defaultPartitionNumber_); - if (efppp != 0) - return efppp->getEmptyFilePool(efpDataSize_kib > 0 ? efpDataSize_kib : defaultEfpDataSize_kib_); - return 0; + if (efppp == 0) { + return 0; + } + return efppp->getEmptyFilePool(efpDataSize_kib > 0 ? efpDataSize_kib : defaultEfpDataSize_kib_, true); } void EmptyFilePoolManager::getEmptyFilePools(std::vector<EmptyFilePool*>& emptyFilePoolList, diff --git a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp index a31855e0d8..9b58e8c4ff 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp @@ -85,26 +85,7 @@ EmptyFilePoolPartition::findEmptyFilePools() { } } } - EmptyFilePool* efpp = 0; - try { - efpp = new EmptyFilePool(fqFileName, this, overwriteBeforeReturnFlag_, truncateFlag_, journalLogRef_); - { - slock l(efpMapMutex_); - efpMap_[efpp->dataSize_kib()] = efpp; - } - } - catch (const std::exception& e) { - if (efpp != 0) { - delete efpp; - efpp = 0; - } - std::ostringstream oss; - oss << "EmptyFilePool create failed: " << e.what(); - journalLogRef_.log(JournalLog::LOG_WARN, oss.str()); - } - if (efpp != 0) { - efpp->initialize(); - } + createEmptyFilePool(fqFileName); } if (upgradeDirStructureFlag) { std::string oldEfpDir(partitionDir_ + "/efp"); @@ -117,12 +98,17 @@ EmptyFilePoolPartition::findEmptyFilePools() { } } -EmptyFilePool* EmptyFilePoolPartition::getEmptyFilePool(const efpDataSize_kib_t efpDataSize_kib) { - slock l(efpMapMutex_); - efpMapItr_t i = efpMap_.find(efpDataSize_kib); - if (i == efpMap_.end()) - return 0; - return i->second; +EmptyFilePool* EmptyFilePoolPartition::getEmptyFilePool(const efpDataSize_kib_t efpDataSize_kib, const bool createIfNonExistent) { + { + slock l(efpMapMutex_); + efpMapItr_t i = efpMap_.find(efpDataSize_kib); + if (i != efpMap_.end()) + return i->second; + } + if (createIfNonExistent) { + return createEmptyFilePool(efpDataSize_kib); + } + return 0; } void EmptyFilePoolPartition::getEmptyFilePools(std::vector<EmptyFilePool*>& efpList) { @@ -183,7 +169,7 @@ std::string EmptyFilePoolPartition::getPartionDirectoryName(const efpPartitionNu //static efpPartitionNumber_t EmptyFilePoolPartition::getPartitionNumber(const std::string& name) { if (name.length() == 4 && name[0] == 'p' && ::isdigit(name[1]) && ::isdigit(name[2]) && ::isdigit(name[3])) { - long pn = ::strtol(name.c_str() + 1, 0, 0); + long pn = ::strtol(name.c_str() + 1, 0, 10); if (pn == 0 && errno) { return 0; } else { @@ -195,6 +181,35 @@ efpPartitionNumber_t EmptyFilePoolPartition::getPartitionNumber(const std::strin // --- protected functions --- +EmptyFilePool* EmptyFilePoolPartition::createEmptyFilePool(const efpDataSize_kib_t efpDataSize_kib) { + std::string fqEfpDirectoryName(partitionDir_ + "/" + EmptyFilePool::dirNameFromDataSize(efpDataSize_kib)); + return createEmptyFilePool(fqEfpDirectoryName); +} + +EmptyFilePool* EmptyFilePoolPartition::createEmptyFilePool(const std::string fqEfpDirectoryName) { + EmptyFilePool* efpp = 0; + try { + efpp = new EmptyFilePool(fqEfpDirectoryName, this, overwriteBeforeReturnFlag_, truncateFlag_, journalLogRef_); + { + slock l(efpMapMutex_); + efpMap_[efpp->dataSize_kib()] = efpp; + } + } + catch (const std::exception& e) { + if (efpp != 0) { + delete efpp; + efpp = 0; + } + std::ostringstream oss; + oss << "EmptyFilePool create failed: " << e.what(); + journalLogRef_.log(JournalLog::LOG_WARN, oss.str()); + } + if (efpp != 0) { + efpp->initialize(); + } + return efpp; +} + void EmptyFilePoolPartition::validatePartitionDir() { if (!jdir::is_dir(partitionDir_)) { std::ostringstream ss; diff --git a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h index c653c6be6a..23a541f8f4 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h +++ b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h @@ -59,7 +59,7 @@ public: virtual ~EmptyFilePoolPartition(); void findEmptyFilePools(); - EmptyFilePool* getEmptyFilePool(const efpDataSize_kib_t efpDataSize_kib); + EmptyFilePool* getEmptyFilePool(const efpDataSize_kib_t efpDataSize_kib, const bool createIfNonExistent); void getEmptyFilePools(std::vector<EmptyFilePool*>& efpList); void getEmptyFilePoolSizes_kib(std::vector<efpDataSize_kib_t>& efpDataSizesList) const; std::string getPartitionDirectory() const; @@ -70,6 +70,8 @@ public: static efpPartitionNumber_t getPartitionNumber(const std::string& name); protected: + EmptyFilePool* createEmptyFilePool(const efpDataSize_kib_t efpDataSize_kib); + EmptyFilePool* createEmptyFilePool(const std::string fqEfpDirectoryName); void validatePartitionDir(); }; diff --git a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp index 3f39913422..198b39857c 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp @@ -43,6 +43,7 @@ #include "qpid/linearstore/journal/utils/file_hdr.h" #include <sstream> #include <string> +#include <unistd.h> #include <vector> namespace qpid { @@ -101,7 +102,11 @@ void RecoveryManager::analyzeJournals(const std::vector<std::string>* preparedTr analyzeJournalFileHeaders(efpIdentity); if (journalEmptyFlag_) { - *emptyFilePoolPtrPtr = emptyFilePoolManager->getEmptyFilePool(0, 0); // Use default EFP + if (uninitFileList_.empty()) { + *emptyFilePoolPtrPtr = emptyFilePoolManager->getEmptyFilePool(0, 0); // Use default EFP + } else { + *emptyFilePoolPtrPtr = emptyFilePoolManager->getEmptyFilePool(efpIdentity); + } } else { *emptyFilePoolPtrPtr = emptyFilePoolManager->getEmptyFilePool(efpIdentity); if (! *emptyFilePoolPtrPtr) { @@ -294,6 +299,7 @@ void RecoveryManager::setLinearFileControllerJournals(lfcAddJournalFileFn fnPtr, LinearFileController* lfcPtr) { if (journalEmptyFlag_) { if (uninitFileList_.size() > 0) { + // TODO: Handle case if uninitFileList_.size() > 1, but this should not happen in normal operation. Here we assume only one item in the list. std::string uninitFile = uninitFileList_.back(); uninitFileList_.pop_back(); lfcPtr->restoreEmptyFile(uninitFile); @@ -377,11 +383,28 @@ void RecoveryManager::analyzeJournalFileHeaders(efpIdentity_t& efpIdentity) { jdir::read_dir(journalDirectory_, directoryList, false, true, false, true); for (stringListConstItr_t i = directoryList.begin(); i != directoryList.end(); ++i) { bool hdrOk = readJournalFileHeader(*i, fileHeader, headerQueueName); - if (!hdrOk || headerQueueName.empty()) { + bool hdrEmpty = ::is_file_hdr_reset(&fileHeader); + if (!hdrOk) { std::ostringstream oss; - oss << "Journal file " << (*i) << " is uninitialized or corrupted"; + oss << "Journal file " << (*i) << " is corrupted or invalid"; journalLogRef_.log(JournalLog::LOG_WARN, queueName_, oss.str()); + } else if (hdrEmpty) { + // Read symlink, find efp directory name which is efp size in KiB + // TODO: place this bit into a common function as it is also used in EmptyFilePool.cpp::deleteSymlink() + char buff[1024]; + ssize_t len = ::readlink((*i).c_str(), buff, 1024); + if (len < 0) { + std::ostringstream oss; + oss << "symlink=\"" << (*i) << "\"" << FORMAT_SYSERR(errno); + throw jexception(jerrno::JERR__SYMLINK, oss.str(), "RecoveryManager", "analyzeJournalFileHeaders"); + } + // Find second and third '/' from back of string, which contains the EFP directory name + *(::strrchr(buff, '/')) = '\0'; + *(::strrchr(buff, '/')) = '\0'; + int efpDataSize_kib = atoi(::strrchr(buff, '/') + 1); uninitFileList_.push_back(*i); + efpIdentity.pn_ = fileHeader._efp_partition; + efpIdentity.ds_ = efpDataSize_kib; } else if (headerQueueName.compare(queueName_) != 0) { std::ostringstream oss; oss << "Journal file " << (*i) << " belongs to queue \"" << headerQueueName << "\": ignoring"; @@ -406,6 +429,7 @@ void RecoveryManager::analyzeJournalFileHeaders(efpIdentity_t& efpIdentity) { } } +std::cerr << "*** RecoveryManager::analyzeJournalFileHeaders() fileNumberMap_.size()=" << fileNumberMap_.size() << std::endl; // DEBUG if (fileNumberMap_.empty()) { journalEmptyFlag_ = true; } else { @@ -905,7 +929,9 @@ bool RecoveryManager::readJournalFileHeader(const std::string& journalFileName, } ifs.close(); ::memcpy(&fileHeaderRef, buffer, sizeof(::file_hdr_t)); - if (::file_hdr_check(&fileHeaderRef, QLS_FILE_MAGIC, QLS_JRNL_VERSION, 0, QLS_MAX_QUEUE_NAME_LEN)) return false; + if (::file_hdr_check(&fileHeaderRef, QLS_FILE_MAGIC, QLS_JRNL_VERSION, 0, QLS_MAX_QUEUE_NAME_LEN)) { + return false; + } queueName.assign(buffer + sizeof(::file_hdr_t), fileHeaderRef._queue_name_len); return true; } diff --git a/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp b/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp index 9d59039220..ce88e7809c 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp @@ -42,6 +42,7 @@ const uint32_t jerrno::JERR__UNEXPRESPONSE = 0x0108; const uint32_t jerrno::JERR__RECNFOUND = 0x0109; const uint32_t jerrno::JERR__NOTIMPL = 0x010a; const uint32_t jerrno::JERR__NULL = 0x010b; +const uint32_t jerrno::JERR__SYMLINK = 0x010c; // class jcntl const uint32_t jerrno::JERR_JCNTL_STOPPED = 0x0200; @@ -112,10 +113,11 @@ const uint32_t jerrno::JERR_EFP_BADPARTITIONDIR = 0x0d02; const uint32_t jerrno::JERR_EFP_BADEFPDIRNAME = 0x0d03; const uint32_t jerrno::JERR_EFP_NOEFP = 0x0d04; const uint32_t jerrno::JERR_EFP_EMPTY = 0x0d05; -const uint32_t jerrno::JERR_EFP_SYMLINK = 0x0d06; -const uint32_t jerrno::JERR_EFP_LSTAT = 0x0d07; -const uint32_t jerrno::JERR_EFP_BADFILETYPE = 0x0d08; -const uint32_t jerrno::JERR_EFP_FOPEN = 0x0d09; +const uint32_t jerrno::JERR_EFP_LSTAT = 0x0d06; +const uint32_t jerrno::JERR_EFP_BADFILETYPE = 0x0d07; +const uint32_t jerrno::JERR_EFP_FOPEN = 0x0d08; +const uint32_t jerrno::JERR_EFP_FWRITE = 0x0d09; +const uint32_t jerrno::JERR_EFP_MKDIR = 0x0d0a; // Negative returns for some functions const int32_t jerrno::AIO_TIMEOUT = -1; @@ -140,6 +142,7 @@ jerrno::__init() _err_map[JERR__RECNFOUND] = "JERR__RECNFOUND: Record not found."; _err_map[JERR__NOTIMPL] = "JERR__NOTIMPL: Not implemented"; _err_map[JERR__NULL] = "JERR__NULL: Operation on null pointer"; + _err_map[JERR__SYMLINK] = "JERR__SYMLINK: Symbolic link operation failed"; // class jcntl _err_map[JERR_JCNTL_STOPPED] = "JERR_JCNTL_STOPPED: Operation on stopped journal."; @@ -210,10 +213,11 @@ jerrno::__init() _err_map[JERR_EFP_BADPARTITIONDIR] = "JERR_EFP_BADPARTITIONDIR: Invalid partition directory"; _err_map[JERR_EFP_NOEFP] = "JERR_EFP_NOEFP: No Empty File Pool found for given partition and empty file size"; _err_map[JERR_EFP_EMPTY] = "JERR_EFP_EMPTY: Empty File Pool is empty"; - _err_map[JERR_EFP_SYMLINK] = "JERR_EFP_SYMLINK: Symbolic link operation failed"; _err_map[JERR_EFP_LSTAT] = "JERR_EFP_LSTAT: lstat() operation failed"; _err_map[JERR_EFP_BADFILETYPE] = "JERR_EFP_BADFILETYPE: File type incorrect for operation"; _err_map[JERR_EFP_FOPEN] = "JERR_EFP_FOPEN: Unable to fopen file for write"; + _err_map[JERR_EFP_FWRITE] = "JERR_EFP_FWRITE: Write failed"; + _err_map[JERR_EFP_MKDIR] = "JERR_EFP_MKDIR: Directory creation failed"; //_err_map[] = ""; diff --git a/qpid/cpp/src/qpid/linearstore/journal/jerrno.h b/qpid/cpp/src/qpid/linearstore/journal/jerrno.h index 77b18b17e8..6e817682ca 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/jerrno.h +++ b/qpid/cpp/src/qpid/linearstore/journal/jerrno.h @@ -60,6 +60,7 @@ namespace journal { static const uint32_t JERR__RECNFOUND; ///< Record not found static const uint32_t JERR__NOTIMPL; ///< Not implemented static const uint32_t JERR__NULL; ///< Operation on null pointer + static const uint32_t JERR__SYMLINK; ///< Symbolic Link operation failed // class jcntl static const uint32_t JERR_JCNTL_STOPPED; ///< Operation on stopped journal @@ -130,10 +131,11 @@ namespace journal { static const uint32_t JERR_EFP_BADPARTITIONDIR; ///< Invalid partition directory static const uint32_t JERR_EFP_NOEFP; ///< No EFP found for given partition and file size static const uint32_t JERR_EFP_EMPTY; ///< EFP empty - static const uint32_t JERR_EFP_SYMLINK; ///< Symbolic Link operation failed static const uint32_t JERR_EFP_LSTAT; ///< lstat operation failed static const uint32_t JERR_EFP_BADFILETYPE; ///< Bad file type static const uint32_t JERR_EFP_FOPEN; ///< Unable to fopen file for write + static const uint32_t JERR_EFP_FWRITE; ///< Write failed + static const uint32_t JERR_EFP_MKDIR; ///< Directory creation failed // Negative returns for some functions static const int32_t AIO_TIMEOUT; ///< Timeout waiting for AIO return diff --git a/qpid/cpp/src/qpid/linearstore/management-schema.xml b/qpid/cpp/src/qpid/linearstore/management-schema.xml index a55883a255..ebd388593e 100644 --- a/qpid/cpp/src/qpid/linearstore/management-schema.xml +++ b/qpid/cpp/src/qpid/linearstore/management-schema.xml @@ -21,38 +21,24 @@ <class name="Store"> <property name="brokerRef" type="objId" access="RO" references="qpid.Broker" index="y" parentRef="y"/> - <property name="location" type="sstr" access="RO" desc="Logical directory on disk"/> - <!--property name="defaultInitialFileCount" type="uint16" access="RO" unit="file" desc="Default number of files initially allocated to each journal"/--> - <!--property name="defaultDataFileSize" type="uint32" access="RO" unit="RdPg" desc="Default size of each journal data file"/--> + <property name="storeDir" type="sstr" access="RO" desc="Logical directory on disk"/> <property name="tplIsInitialized" type="bool" access="RO" desc="Transaction prepared list has been initialized by a transactional prepare"/> <property name="tplDirectory" type="sstr" access="RO" desc="Transaction prepared list directory"/> <property name="tplWritePageSize" type="uint32" access="RO" unit="byte" desc="Page size in transaction prepared list write-page-cache"/> <property name="tplWritePages" type="uint32" access="RO" unit="wpage" desc="Number of pages in transaction prepared list write-page-cache"/> - <!--property name="tplInitialFileCount" type="uint16" access="RO" unit="file" desc="Number of files initially allocated to transaction prepared list journal"/--> - <!--property name="tplDataFileSize" type="uint32" access="RO" unit="byte" desc="Size of each journal data file in transaction prepared list journal"/--> - <!--property name="tplCurrentFileCount" type="uint32" access="RO" unit="file" desc="Number of files currently allocated to transaction prepared list journal"/--> <statistic name="tplTransactionDepth" type="hilo32" unit="txn" desc="Number of currently enqueued prepared transactions"/> <statistic name="tplTxnPrepares" type="count64" unit="record" desc="Total transaction prepares on transaction prepared list"/> <statistic name="tplTxnCommits" type="count64" unit="record" desc="Total transaction commits on transaction prepared list"/> <statistic name="tplTxnAborts" type="count64" unit="record" desc="Total transaction aborts on transaction prepared list"/> - <statistic name="tplOutstandingAIOs" type="hilo32" unit="aio_op" desc="Deprecated"/> </class> <class name="Journal"> <property name="queueRef" type="objId" access="RO" references="qpid.Queue" isGeneralReference="y"/> - <property name="name" type="sstr" access="RC" index="y"/> + <property name="queueName" type="sstr" access="RC" index="y"/> <property name="directory" type="sstr" access="RO" desc="Directory containing journal files"/> - <property name="baseFileName" type="sstr" access="RO" desc="Deprecated"/> <property name="writePageSize" type="uint32" access="RO" unit="byte" desc="Deprecated"/> <property name="writePages" type="uint32" access="RO" unit="wpage" desc="Deprecated"/> - <property name="readPageSize" type="uint32" access="RO" unit="byte" desc="Deprecated"/> - <property name="readPages" type="uint32" access="RO" unit="rpage" desc="Deprecated"/> - <!--property name="initialFileCount" type="uint16" access="RO" unit="file" desc="Number of files initially allocated to this journal"/--> - <!--property name="autoExpand" type="bool" access="RO" desc="Auto-expand enabled"/--> - <!--property name="currentFileCount" type="uint16" access="RO" unit="file" desc="Number of files currently allocated to this journal"/--> - <!--property name="maxFileCount" type="uint16" access="RO" unit="file" desc="Max number of files allowed for this journal"/--> - <!--property name="dataFileSize" type="uint32" access="RO" unit="byte" desc="Size of each journal data file"/--> <statistic name="recordDepth" type="hilo32" unit="record" desc="Number of currently enqueued records (durable messages)"/> <statistic name="enqueues" type="count64" unit="record" desc="Total enqueued records on journal"/> @@ -64,36 +50,5 @@ <statistic name="txnAborts" type="count64" unit="record" desc="Total transactional abort records on journal"/> <statistic name="outstandingAIOs" type="hilo32" unit="aio_op" desc="Number of currently outstanding AIO requests in Async IO system"/> -<!-- - The following are not yet "wired up" in JournalImpl.cpp ---> - <statistic name="freeFileCount" type="hilo32" unit="file" desc="Deprecated"/> - <statistic name="availableFileCount" type="hilo32" unit="file" desc="Deprecated"/> - <statistic name="writeWaitFailures" type="count64" unit="record" desc="Deprecated"/> - <statistic name="writeBusyFailures" type="count64" unit="record" desc="Deprecated"/> - <statistic name="readRecordCount" type="count64" unit="record" desc="Deprecated"/> - <statistic name="readBusyFailures" type="count64" unit="record" desc="Deprecated"/> - <statistic name="writePageCacheDepth" type="hilo32" unit="wpage" desc="Deprecated"/> - <statistic name="readPageCacheDepth" type="hilo32" unit="rpage" desc="Deprecated"/> - - <!--method name="expand" desc="Increase number of files allocated for this journal"> - <arg name="by" type="uint32" dir="I" desc="Number of files to increase journal size by"/> - </method--> </class> - - <eventArguments> - <!--arg name="autoExpand" type="bool" desc="Journal auto-expand enabled"/--> - <arg name="fileSize" type="uint32" desc="Journal file size in bytes"/> - <arg name="jrnlId" type="sstr" desc="Journal Id"/> - <arg name="numEnq" type="uint32" desc="Number of recovered enqueues"/> - <arg name="numFiles" type="uint16" desc="Number of journal files"/> - <arg name="numTxn" type="uint32" desc="Number of recovered transactions"/> - <arg name="numTxnDeq" type="uint32" desc="Number of recovered transactional dequeues"/> - <arg name="numTxnEnq" type="uint32" desc="Number of recovered transactional enqueues"/> - <arg name="what" type="sstr" desc="Description of event"/> - </eventArguments> - <event name="enqThresholdExceeded" sev="warn" args="jrnlId, what"/> - <event name="created" sev="notice" args="jrnlId, fileSize, numFiles"/> - <event name="full" sev="error" args="jrnlId, what"/> - <event name="recovered" sev="notice" args="jrnlId, fileSize, numFiles, numEnq, numTxn, numTxnEnq, numTxnDeq"/> </schema> diff --git a/qpid/tools/src/py/qpid-config b/qpid/tools/src/py/qpid-config index ac93954484..8d38b1a342 100755 --- a/qpid/tools/src/py/qpid-config +++ b/qpid/tools/src/py/qpid-config @@ -105,6 +105,8 @@ class Config: self._if_unused = True self._fileCount = None self._fileSize = None + self._efp_partition_num = None + self._efp_pool_file_size = None self._maxQueueSize = None self._maxQueueCount = None self._limitPolicy = None @@ -137,6 +139,8 @@ conn_options = {} FILECOUNT = "qpid.file_count" FILESIZE = "qpid.file_size" +EFP_PARTITION_NUM = "qpid.efp_partition_num" +EFP_POOL_FILE_SIZE = "qpid.efp_pool_file_size" MAX_QUEUE_SIZE = "qpid.max_size" MAX_QUEUE_COUNT = "qpid.max_count" POLICY_TYPE = "qpid.policy_type" @@ -158,7 +162,8 @@ REPLICATE = "qpid.replicate" #i.e. the arguments for which there is special processing on add and #list SPECIAL_ARGS=[ - FILECOUNT,FILESIZE,MAX_QUEUE_SIZE,MAX_QUEUE_COUNT,POLICY_TYPE, + FILECOUNT,FILESIZE,EFP_PARTITION_NUM,EFP_POOL_FILE_SIZE, + MAX_QUEUE_SIZE,MAX_QUEUE_COUNT,POLICY_TYPE, LVQ_KEY,MSG_SEQUENCE,IVE,QUEUE_EVENT_GENERATION, FLOW_STOP_COUNT,FLOW_RESUME_COUNT,FLOW_STOP_SIZE,FLOW_RESUME_SIZE, MSG_GROUP_HDR_KEY,SHARED_MSG_GROUP,REPLICATE] @@ -213,8 +218,10 @@ def OptionsAndArguments(argv): parser.add_option_group(group2) group3 = OptionGroup(parser, "Options for Adding Queues") - group3.add_option("--file-count", action="store", type="int", metavar="<n>", help="Number of files in queue's persistence journal") - group3.add_option("--file-size", action="store", type="int", metavar="<n>", help="File size in pages (64KiB/page)") + group3.add_option("--file-count", action="store", type="int", metavar="<n>", help="[legacystore] Number of files in queue's persistence journal") + group3.add_option("--file-size", action="store", type="int", metavar="<n>", help="[legactystore] File size in pages (64KiB/page)") + group3.add_option("--efp-partition-num", action="store", type="int", metavar="<n>", help="[linearstore] EFP partition number") + group3.add_option("--efp-pool-file-size", action="store", type="int", metavar="<n>", help="[linearstore] EFP file size (KiB)") group3.add_option("--max-queue-size", action="store", type="int", metavar="<n>", help="Maximum in-memory queue size as bytes") group3.add_option("--max-queue-count", action="store", type="int", metavar="<n>", help="Maximum in-memory queue size as a number of messages") group3.add_option("--limit-policy", action="store", choices=["none", "reject", "ring", "ring-strict"], metavar="<policy>", help="Action to take when queue limit is reached") @@ -294,6 +301,10 @@ def OptionsAndArguments(argv): config._fileCount = opts.file_count if opts.file_size is not None: config._fileSize = opts.file_size + if opts.efp_partition_num is not None: + config._efp_partition_num = opts.efp_partition_num + if opts.efp_pool_file_size is not None: + config._efp_pool_file_size = opts.efp_pool_file_size if opts.max_queue_size is not None: config._maxQueueSize = opts.max_queue_size if opts.max_queue_count is not None: @@ -524,6 +535,8 @@ class BrokerManager: if q.exclusive: print "excl", if FILESIZE in args: print "--file-size=%s" % args[FILESIZE], if FILECOUNT in args: print "--file-count=%s" % args[FILECOUNT], + if EFP_PARTITION_NUM in args: print "--efp-partition-num=%s" % args[EFP_PARTITION_NUM], + if EFP_POOL_FILE_SIZE in args: print "--efp-pool-file-size=%s" % args[EFP_POOL_FILE_SIZE], if MAX_QUEUE_SIZE in args: print "--max-queue-size=%s" % args[MAX_QUEUE_SIZE], if MAX_QUEUE_COUNT in args: print "--max-queue-count=%s" % args[MAX_QUEUE_COUNT], if POLICY_TYPE in args: print "--limit-policy=%s" % args[POLICY_TYPE].replace("_", "-"), @@ -606,6 +619,10 @@ class BrokerManager: declArgs[FILECOUNT] = config._fileCount if config._fileSize: declArgs[FILESIZE] = config._fileSize + if config._efp_partition_num: + declArgs[EFP_PARTITION_NUM] = config._efp_partition_num + if config._efp_pool_file_size: + declArgs[EFP_POOL_FILE_SIZE] = config._efp_pool_file_size if config._maxQueueSize is not None: declArgs[MAX_QUEUE_SIZE] = config._maxQueueSize |