diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2013-10-22 19:09:56 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2013-10-22 19:09:56 +0000 |
| commit | 10450e2bc10d56fe3696bc5284e0ec77c63647ee (patch) | |
| tree | cd676b8ad920aca643a149cde3397688c4bd1964 /cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp | |
| parent | f7cbe401aa405da618e39e9e77aa95d9375d23eb (diff) | |
| download | qpid-python-10450e2bc10d56fe3696bc5284e0ec77c63647ee.tar.gz | |
QPID-4984: WIP: Copy of work over from branch to trunk. Non-tx works with some known bugs, tx not yet operational.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1534736 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp')
| -rw-r--r-- | cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp | 326 |
1 files changed, 326 insertions, 0 deletions
diff --git a/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp b/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp new file mode 100644 index 0000000000..c78028362a --- /dev/null +++ b/cpp/src/qpid/linearstore/jrnl/EmptyFilePool.cpp @@ -0,0 +1,326 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "EmptyFilePool.h" + +#include <cctype> +#include <fstream> +#include "qpid/linearstore/jrnl/EmptyFilePoolPartition.h" +#include "qpid/linearstore/jrnl/jcfg.h" +#include "qpid/linearstore/jrnl/jdir.h" +#include "qpid/linearstore/jrnl/JournalFile.h" +#include "qpid/linearstore/jrnl/JournalLog.h" +#include "qpid/linearstore/jrnl/slock.h" +#include "qpid/linearstore/jrnl/utils/file_hdr.h" +#include <sys/stat.h> +#include <unistd.h> +#include <uuid/uuid.h> +#include <vector> + +namespace qpid { +namespace qls_jrnl { + +EmptyFilePool::EmptyFilePool(const std::string& efpDirectory, + const EmptyFilePoolPartition* partitionPtr, + JournalLog& journalLogRef) : + efpDirectory_(efpDirectory), + efpDataSize_kib_(fileSizeKbFromDirName(efpDirectory, partitionPtr->getPartitionNumber())), + partitionPtr_(partitionPtr), + journalLogRef_(journalLogRef) +{} + +EmptyFilePool::~EmptyFilePool() {} + +void EmptyFilePool::initialize() { + std::vector<std::string> dirList; + jdir::read_dir(efpDirectory_, dirList, false, true, false, false); + for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) { + size_t dotPos = i->rfind("."); + if (dotPos != std::string::npos) { + if (i->substr(dotPos).compare(".jrnl") == 0 && i->length() == 41) { + std::string emptyFile(efpDirectory_ + "/" + (*i)); + if (validateEmptyFile(emptyFile)) { + pushEmptyFile(emptyFile); + } + } + } + } +} + +efpDataSize_kib_t EmptyFilePool::dataSize_kib() const { + return efpDataSize_kib_; +} + +efpFileSize_kib_t EmptyFilePool::fileSize_kib() const { + return efpDataSize_kib_ + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB); +} + +efpDataSize_sblks_t EmptyFilePool::dataSize_sblks() const { + return efpDataSize_kib_ / QLS_SBLK_SIZE_KIB; +} + +efpFileSize_sblks_t EmptyFilePool::fileSize_sblks() const { + return (efpDataSize_kib_ / QLS_SBLK_SIZE_KIB) + QLS_JRNL_FHDR_RES_SIZE_SBLKS; +} + +efpFileCount_t EmptyFilePool::numEmptyFiles() const { + slock l(emptyFileListMutex_); + return efpFileCount_t(emptyFileList_.size()); +} + +efpDataSize_kib_t EmptyFilePool::cumFileSize_kib() const { + slock l(emptyFileListMutex_); + return efpDataSize_kib_t(emptyFileList_.size()) * efpDataSize_kib_; +} + +efpPartitionNumber_t EmptyFilePool::getPartitionNumber() const { + return partitionPtr_->getPartitionNumber(); +} + +const EmptyFilePoolPartition* EmptyFilePool::getPartition() const { + return partitionPtr_; +} + +const efpIdentity_t EmptyFilePool::getIdentity() const { + return efpIdentity_t(partitionPtr_->getPartitionNumber(), efpDataSize_kib_); +} + +std::string EmptyFilePool::takeEmptyFile(const std::string& destDirectory) { + std::string emptyFileName = popEmptyFile(); + std::string newFileName = destDirectory + emptyFileName.substr(emptyFileName.rfind('/')); // NOTE: substr() includes leading '/' + if (moveEmptyFile(emptyFileName.c_str(), newFileName.c_str())) { + // Try again with new UUID for file name + newFileName = destDirectory + "/" + getEfpFileName(); + if (moveEmptyFile(emptyFileName.c_str(), newFileName.c_str())) { + pushEmptyFile(emptyFileName); + std::ostringstream oss; + oss << "file=\"" << emptyFileName << "\" dest=\"" << newFileName << "\"" << FORMAT_SYSERR(errno); + throw jexception(jerrno::JERR_JDIR_FMOVE, oss.str(), "EmptyFilePool", "takeEmptyFile"); + } + } + return newFileName; +} + +void EmptyFilePool::returnEmptyFile(const std::string& fqSrcFile) { + std::string emptyFileName(efpDirectory_ + fqSrcFile.substr(fqSrcFile.rfind('/'))); // NOTE: substr() includes leading '/' + if (moveEmptyFile(fqSrcFile.c_str(), emptyFileName.c_str())) { + // Try again with new UUID for file name + emptyFileName = efpDirectory_ + "/" + getEfpFileName(); + if (moveEmptyFile(fqSrcFile.c_str(), emptyFileName.c_str())) { + // Failed twice in a row - delete file + ::unlink(fqSrcFile.c_str()); + return; + } + } + resetEmptyFileHeader(emptyFileName); + pushEmptyFile(emptyFileName); +} + +// --- protected functions --- + +void EmptyFilePool::createEmptyFile() { + ::file_hdr_t fh; + ::file_hdr_create(&fh, QLS_FILE_MAGIC, QLS_JRNL_VERSION, QLS_JRNL_FHDR_RES_SIZE_SBLKS, partitionPtr_->getPartitionNumber(), efpDataSize_kib_); + std::string efpfn = getEfpFileName(); + std::ofstream ofs(efpfn.c_str(), std::ofstream::out | std::ofstream::binary); + if (ofs.good()) { + ofs.write((char*)&fh, sizeof(::file_hdr_t)); + uint64_t rem = ((efpDataSize_kib_ + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB)) * 1024) - sizeof(::file_hdr_t); + while (rem--) + ofs.put('\0'); + ofs.close(); + pushEmptyFile(efpfn); +//std::cout << "WARNING: EFP " << efpDirectory << " is empty - created new journal file " << efpfn.substr(efpfn.rfind('/') + 1) << " on the fly" << std::endl; + } else { +//std::cerr << "ERROR: Unable to open file \"" << efpfn << "\"" << std::endl; // DEBUG + } +} + +std::string EmptyFilePool::getEfpFileName() { + uuid_t uuid; + ::uuid_generate(uuid); // NOTE: uuid_generate() is not thread safe + char uuid_str[37]; // 36 char uuid + trailing \0 + ::uuid_unparse(uuid, uuid_str); + std::ostringstream oss; + oss << efpDirectory_ << "/" << uuid_str << QLS_JRNL_FILE_EXTENSION; + return oss.str(); +} + +std::string EmptyFilePool::popEmptyFile() { + std::string emptyFileName; + bool isEmpty = false; + { + slock l(emptyFileListMutex_); + isEmpty = emptyFileList_.empty(); + } + if (isEmpty) { + createEmptyFile(); + } + { + slock l(emptyFileListMutex_); + emptyFileName = emptyFileList_.front(); + emptyFileList_.pop_front(); + } + return emptyFileName; +} + +void EmptyFilePool::pushEmptyFile(const std::string fqFileName) { + slock l(emptyFileListMutex_); + emptyFileList_.push_back(fqFileName); +} + +void EmptyFilePool::resetEmptyFileHeader(const std::string& fqFileName) { + std::fstream fs(fqFileName.c_str(), std::fstream::in | std::fstream::out | std::fstream::binary); + if (fs.good()) { + const std::streamsize buffsize = QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES; + char buff[buffsize]; + fs.read((char*)buff, buffsize); + std::streampos bytesRead = fs.tellg(); + if (bytesRead == buffsize) { + ::file_hdr_reset((::file_hdr_t*)buff); + ::memset(buff + sizeof(::file_hdr_t), 0, MAX_FILE_HDR_LEN - sizeof(::file_hdr_t)); // set rest of buffer to 0 + fs.seekp(0, std::fstream::beg); + fs.write(buff, buffsize); + std::streampos bytesWritten = fs.tellp(); + if (bytesWritten != buffsize) { +//std::cerr << "ERROR: Unable to write file header of file \"" << fqFileName_ << "\": tried to write " << buffsize << " bytes; wrote " << bytesWritten << " bytes." << std::endl; + } + } else { +//std::cerr << "ERROR: Unable to read file header of file \"" << fqFileName_ << "\": tried to read " << sizeof(::file_hdr_t) << " bytes; read " << bytesRead << " bytes." << std::endl; + } + fs.close(); + } else { +//std::cerr << "ERROR: Unable to open file \"" << fqFileName_ << "\" for reading" << std::endl; // DEBUG + } +} + +bool EmptyFilePool::validateEmptyFile(const std::string& emptyFileName) const { + std::ostringstream oss; + struct stat s; + if (::stat(emptyFileName.c_str(), &s)) + { + oss << "stat: file=\"" << emptyFileName << "\"" << FORMAT_SYSERR(errno); + throw jexception(jerrno::JERR_JDIR_STAT, oss.str(), "EmptyFilePool", "validateEmptyFile"); + } + + // Size matches pool + efpDataSize_kib_t expectedSize = (QLS_SBLK_SIZE_KIB + efpDataSize_kib_) * 1024; + if ((efpDataSize_kib_t)s.st_size != expectedSize) { + oss << "ERROR: File " << emptyFileName << ": Incorrect size: Expected=" << expectedSize << "; actual=" << s.st_size; + journalLogRef_.log(JournalLog::LOG_ERROR, oss.str()); + return false; + } + + // Open file and read header + std::fstream fs(emptyFileName.c_str(), std::fstream::in | std::fstream::out | std::fstream::binary); + if (!fs) { + oss << "ERROR: File " << emptyFileName << ": Unable to open for reading"; + journalLogRef_.log(JournalLog::LOG_ERROR, oss.str()); + return false; + } + const std::streamsize buffsize = QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES; + char buff[buffsize]; + fs.read((char*)buff, buffsize); + std::streampos bytesRead = fs.tellg(); + if (bytesRead != buffsize) { + oss << "ERROR: Unable to read file header of file \"" << emptyFileName << "\": tried to read " << buffsize << " bytes; read " << bytesRead << " bytes"; + journalLogRef_.log(JournalLog::LOG_ERROR, oss.str()); + fs.close(); + return false; + } + + // Check file header + const bool jrnlMagicError = ((::file_hdr_t*)buff)->_rhdr._magic != QLS_FILE_MAGIC; + const bool jrnlVersionError = ((::file_hdr_t*)buff)->_rhdr._version != QLS_JRNL_VERSION; + const bool jrnlPartitionError = ((::file_hdr_t*)buff)->_efp_partition != partitionPtr_->getPartitionNumber(); + const bool jrnlFileSizeError = ((::file_hdr_t*)buff)->_file_size_kib != efpDataSize_kib_; + if (jrnlMagicError || jrnlVersionError || jrnlPartitionError || jrnlFileSizeError) + { + oss << "ERROR: File " << emptyFileName << ": Invalid file header - mismatched header fields: " << + (jrnlMagicError ? "magic " : "") << + (jrnlVersionError ? "version " : "") << + (jrnlPartitionError ? "partition" : "") << + (jrnlFileSizeError ? "file-size" : ""); + journalLogRef_.log(JournalLog::LOG_ERROR, oss.str()); + fs.close(); + return false; + } + + // Check file header is reset + if (!::is_file_hdr_reset((::file_hdr_t*)buff)) { + ::file_hdr_reset((::file_hdr_t*)buff); + ::memset(buff + sizeof(::file_hdr_t), 0, MAX_FILE_HDR_LEN - sizeof(::file_hdr_t)); // set rest of buffer to 0 + fs.seekp(0, std::fstream::beg); + fs.write(buff, buffsize); + std::streampos bytesWritten = fs.tellp(); + if (bytesWritten != buffsize) { + oss << "ERROR: Unable to write file header of file \"" << emptyFileName << "\": tried to write " << buffsize << " bytes; wrote " << bytesWritten << " bytes"; + journalLogRef_.log(JournalLog::LOG_ERROR, oss.str()); + fs.close(); + return false; + } + oss << "WARNING: File " << emptyFileName << ": File header not reset"; + journalLogRef_.log(JournalLog::LOG_WARN, oss.str()); + } + + // Close file + fs.close(); + return true; +} + +// static +efpDataSize_kib_t EmptyFilePool::fileSizeKbFromDirName(const std::string& dirName, + const efpPartitionNumber_t partitionNumber) { + // Check for dirName format 'NNNk', where NNN is a number, convert NNN into an integer. NNN cannot be 0. + std::string n(dirName.substr(dirName.rfind('/')+1)); + bool valid = true; + for (uint16_t charNum = 0; charNum < n.length(); ++charNum) { + if (charNum < n.length()-1) { + if (!::isdigit((int)n[charNum])) { + valid = false; + break; + } + } else { + valid = n[charNum] == 'k'; + } + } + efpDataSize_kib_t s = ::atol(n.c_str()); + if (!valid || s == 0 || s % QLS_SBLK_SIZE_KIB != 0) { + std::ostringstream oss; + oss << "Partition: " << partitionNumber << "; EFP directory: \'" << n << "\'"; + throw jexception(jerrno::JERR_EFP_BADEFPDIRNAME, oss.str(), "EmptyFilePool", "fileSizeKbFromDirName"); + } + return s; +} + +// static +int EmptyFilePool::moveEmptyFile(const std::string& from, + const std::string& to) { + if (::rename(from.c_str(), to.c_str())) { + if (errno == EEXIST) return errno; // File name exists + std::ostringstream oss; + oss << "file=\"" << from << "\" dest=\"" << to << "\"" << FORMAT_SYSERR(errno); + throw jexception(jerrno::JERR_JDIR_FMOVE, oss.str(), "EmptyFilePool", "returnEmptyFile"); + } + return 0; +} + +}} // namespace qpid::qls_jrnl |
