diff options
Diffstat (limited to 'cpp/src/qpid/legacystore/jrnl/fcntl.cpp')
-rw-r--r-- | cpp/src/qpid/legacystore/jrnl/fcntl.cpp | 375 |
1 files changed, 375 insertions, 0 deletions
diff --git a/cpp/src/qpid/legacystore/jrnl/fcntl.cpp b/cpp/src/qpid/legacystore/jrnl/fcntl.cpp new file mode 100644 index 0000000000..fbb176667e --- /dev/null +++ b/cpp/src/qpid/legacystore/jrnl/fcntl.cpp @@ -0,0 +1,375 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/** + * \file fcntl.cpp + * + * Qpid asynchronous store plugin library + * + * File containing code for class mrg::journal::fcntl (non-logging file + * handle), used for controlling journal log files. See comments in file + * fcntl.h for details. + */ + +#include "qpid/legacystore/jrnl/fcntl.h" + +#include <cerrno> +#include <cstdlib> +#include <cstring> +#include <fcntl.h> +#include <iomanip> +#include "qpid/legacystore/jrnl/jerrno.h" +#include "qpid/legacystore/jrnl/jexception.h" +#include <sstream> +#include <unistd.h> + +namespace mrg +{ +namespace journal +{ + +fcntl::fcntl(const std::string& fbasename, const u_int16_t pfid, const u_int16_t lfid, const u_int32_t jfsize_sblks, + const rcvdat* const ro): + _fname(), + _pfid(pfid), + _lfid(lfid), + _ffull_dblks(JRNL_SBLK_SIZE * (jfsize_sblks + 1)), + _wr_fh(-1), + _rec_enqcnt(0), + _rd_subm_cnt_dblks(0), + _rd_cmpl_cnt_dblks(0), + _wr_subm_cnt_dblks(0), + _wr_cmpl_cnt_dblks(0), + _aio_cnt(0), + _fhdr_wr_aio_outstanding(false) +{ + initialize(fbasename, pfid, lfid, jfsize_sblks, ro); + open_wr_fh(); +} + +fcntl::~fcntl() +{ + close_wr_fh(); +} + +bool +fcntl::reset(const rcvdat* const ro) +{ + rd_reset(); + return wr_reset(ro); +} + +void +fcntl::rd_reset() +{ + _rd_subm_cnt_dblks = 0; + _rd_cmpl_cnt_dblks = 0; +} + +bool +fcntl::wr_reset(const rcvdat* const ro) +{ + if (ro) + { + if (!ro->_jempty) + { + if (ro->_lfid == _pfid) + { + _wr_subm_cnt_dblks = ro->_eo/JRNL_DBLK_SIZE; + _wr_cmpl_cnt_dblks = ro->_eo/JRNL_DBLK_SIZE; + } + else + { + _wr_subm_cnt_dblks = _ffull_dblks; + _wr_cmpl_cnt_dblks = _ffull_dblks; + } + _rec_enqcnt = ro->_enq_cnt_list[_pfid]; + return true; + } + } + // Journal overflow test - checks if the file to be reset still contains enqueued records + // or outstanding aios + if (_rec_enqcnt || _aio_cnt) + return false; + _wr_subm_cnt_dblks = 0; + _wr_cmpl_cnt_dblks = 0; + return true; +} + +int +fcntl::open_wr_fh() +{ + if (_wr_fh < 0) + { + _wr_fh = ::open(_fname.c_str(), O_WRONLY | O_DIRECT, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); // 0644 -rw-r--r-- + if (_wr_fh < 0) + { + std::ostringstream oss; + oss << "pfid=" << _pfid << " lfid=" << _lfid << " file=\"" << _fname << "\"" << FORMAT_SYSERR(errno); + throw jexception(jerrno::JERR_FCNTL_OPENWR, oss.str(), "fcntl", "open_fh"); + } + } + return _wr_fh; +} + +void +fcntl::close_wr_fh() +{ + if (_wr_fh >= 0) + { + ::close(_wr_fh); + _wr_fh = -1; + } +} + +u_int32_t +fcntl::add_enqcnt(u_int32_t a) +{ + _rec_enqcnt += a; + return _rec_enqcnt; +} + +u_int32_t +fcntl::decr_enqcnt() +{ + if (_rec_enqcnt == 0) + { + std::ostringstream oss; + oss << "pfid=" << _pfid << " lfid=" << _lfid; + throw jexception(jerrno::JERR__UNDERFLOW, oss.str(), "fcntl", "decr_enqcnt"); + } + return --_rec_enqcnt; +} + +u_int32_t +fcntl::subtr_enqcnt(u_int32_t s) +{ + if (_rec_enqcnt < s) + { + std::ostringstream oss; + oss << "pfid=" << _pfid << " lfid=" << _lfid << " rec_enqcnt=" << _rec_enqcnt << " decr=" << s; + throw jexception(jerrno::JERR__UNDERFLOW, oss.str(), "fcntl", "subtr_enqcnt"); + } + _rec_enqcnt -= s; + return _rec_enqcnt; +} + +u_int32_t +fcntl::add_rd_subm_cnt_dblks(u_int32_t a) +{ + if (_rd_subm_cnt_dblks + a > _wr_subm_cnt_dblks) + { + std::ostringstream oss; + oss << "pfid=" << _pfid << " lfid=" << _lfid << " rd_subm_cnt_dblks=" << _rd_subm_cnt_dblks << " incr=" << a; + oss << " wr_subm_cnt_dblks=" << _wr_subm_cnt_dblks; + throw jexception(jerrno::JERR_FCNTL_RDOFFSOVFL, oss.str(), "fcntl", "add_rd_subm_cnt_dblks"); + } + _rd_subm_cnt_dblks += a; + return _rd_subm_cnt_dblks; +} + +u_int32_t +fcntl::add_rd_cmpl_cnt_dblks(u_int32_t a) +{ + if (_rd_cmpl_cnt_dblks + a > _rd_subm_cnt_dblks) + { + std::ostringstream oss; + oss << "pfid=" << _pfid << " lfid=" << _lfid << " rd_cmpl_cnt_dblks=" << _rd_cmpl_cnt_dblks << " incr=" << a; + oss << " rd_subm_cnt_dblks=" << _rd_subm_cnt_dblks; + throw jexception(jerrno::JERR_FCNTL_CMPLOFFSOVFL, oss.str(), "fcntl", "add_rd_cmpl_cnt_dblks"); + } + _rd_cmpl_cnt_dblks += a; + return _rd_cmpl_cnt_dblks; +} + +u_int32_t +fcntl::add_wr_subm_cnt_dblks(u_int32_t a) +{ + if (_wr_subm_cnt_dblks + a > _ffull_dblks) // Allow for file header + { + std::ostringstream oss; + oss << "pfid=" << _pfid << " lfid=" << _lfid << " wr_subm_cnt_dblks=" << _wr_subm_cnt_dblks << " incr=" << a; + oss << " fsize=" << _ffull_dblks << " dblks"; + throw jexception(jerrno::JERR_FCNTL_FILEOFFSOVFL, oss.str(), "fcntl", "add_wr_subm_cnt_dblks"); + } + _wr_subm_cnt_dblks += a; + return _wr_subm_cnt_dblks; +} + +u_int32_t +fcntl::add_wr_cmpl_cnt_dblks(u_int32_t a) +{ + if (_wr_cmpl_cnt_dblks + a > _wr_subm_cnt_dblks) + { + std::ostringstream oss; + oss << "pfid=" << _pfid << " lfid=" << _lfid << " wr_cmpl_cnt_dblks=" << _wr_cmpl_cnt_dblks << " incr=" << a; + oss << " wr_subm_cnt_dblks=" << _wr_subm_cnt_dblks; + throw jexception(jerrno::JERR_FCNTL_CMPLOFFSOVFL, oss.str(), "fcntl", "add_wr_cmpl_cnt_dblks"); + } + _wr_cmpl_cnt_dblks += a; + return _wr_cmpl_cnt_dblks; +} + +u_int16_t +fcntl::decr_aio_cnt() +{ + if(_aio_cnt == 0) + { + std::ostringstream oss; + oss << "pfid=" << _pfid << " lfid=" << _lfid << " Decremented aio_cnt to below zero"; + throw jexception(jerrno::JERR__UNDERFLOW, oss.str(), "fcntl", "decr_aio_cnt"); + } + return --_aio_cnt; +} + +// Debug function +const std::string +fcntl::status_str() const +{ + std::ostringstream oss; + oss << "pfid=" << _pfid << " ws=" << _wr_subm_cnt_dblks << " wc=" << _wr_cmpl_cnt_dblks; + oss << " rs=" << _rd_subm_cnt_dblks << " rc=" << _rd_cmpl_cnt_dblks; + oss << " ec=" << _rec_enqcnt << " ac=" << _aio_cnt; + return oss.str(); +} + +// Protected functions + +void +fcntl::initialize(const std::string& fbasename, const u_int16_t pfid, const u_int16_t lfid, const u_int32_t jfsize_sblks, + const rcvdat* const ro) +{ + _pfid = pfid; + _lfid = lfid; + _fname = filename(fbasename, pfid); + +#ifdef RHM_JOWRITE + // In test mode, only create file if it does not exist + struct stat s; + if (::stat(_fname.c_str(), &s)) + { +#endif + if (ro) // Recovery initialization: set counters only + { + if (!ro->_jempty) + { + // For last file only, set write counters to end of last record (the + // continuation point); for all others, set to eof. + if (ro->_lfid == _pfid) + { + _wr_subm_cnt_dblks = ro->_eo/JRNL_DBLK_SIZE; + _wr_cmpl_cnt_dblks = ro->_eo/JRNL_DBLK_SIZE; + } + else + { + _wr_subm_cnt_dblks = _ffull_dblks; + _wr_cmpl_cnt_dblks = _ffull_dblks; + } + // Set the number of enqueued records for this file. + _rec_enqcnt = ro->_enq_cnt_list[_pfid]; + } + } + else // Normal initialization: create empty journal files + create_jfile(jfsize_sblks); +#ifdef RHM_JOWRITE + } +#endif +} + +std::string +fcntl::filename(const std::string& fbasename, const u_int16_t pfid) +{ + std::ostringstream oss; + oss << fbasename << "."; + oss << std::setw(4) << std::setfill('0') << std::hex << pfid; + oss << "." << JRNL_DATA_EXTENSION; + return oss.str(); +} + +void +fcntl::clean_file(const u_int32_t jfsize_sblks) +{ + // NOTE: The journal file size is always one sblock bigger than the specified journal + // file size, which is the data content size. The extra block is for the journal file + // header which precedes all data on each file and is exactly one sblock in size. + u_int32_t nsblks = jfsize_sblks + 1; + + // TODO - look at more efficient alternatives to allocating a null block: + // 1. mmap() against /dev/zero, but can alignment for O_DIRECT be assured? + // 2. ftruncate(), but does this result in a sparse file? If so, then this is no good. + + // Create temp null block for writing + const std::size_t sblksize = JRNL_DBLK_SIZE * JRNL_SBLK_SIZE; + void* nullbuf = 0; + // Allocate no more than 2MB (4096 sblks) as a null buffer + const u_int32_t nullbuffsize_sblks = nsblks > 4096 ? 4096 : nsblks; + const std::size_t nullbuffsize = nullbuffsize_sblks * sblksize; + if (::posix_memalign(&nullbuf, sblksize, nullbuffsize)) + { + std::ostringstream oss; + oss << "posix_memalign() failed: size=" << nullbuffsize << " blk_size=" << sblksize; + oss << FORMAT_SYSERR(errno); + throw jexception(jerrno::JERR__MALLOC, oss.str(), "fcntl", "clean_file"); + } + std::memset(nullbuf, 0, nullbuffsize); + + int fh = ::open(_fname.c_str(), O_WRONLY | O_CREAT | O_DIRECT, + S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); // 0644 -rw-r--r-- + if (fh < 0) + { + std::free(nullbuf); + std::ostringstream oss; + oss << "open() failed:" << FORMAT_SYSERR(errno); + throw jexception(jerrno::JERR_FCNTL_OPENWR, oss.str(), "fcntl", "clean_file"); + } + + while (nsblks > 0) + { + u_int32_t this_write_sblks = nsblks >= nullbuffsize_sblks ? nullbuffsize_sblks : nsblks; + if (::write(fh, nullbuf, this_write_sblks * sblksize) == -1) + { + ::close(fh); + std::free(nullbuf); + std::ostringstream oss; + oss << "wr_size=" << (this_write_sblks * sblksize) << FORMAT_SYSERR(errno); + throw jexception(jerrno::JERR_FCNTL_WRITE, oss.str(), "fcntl", "clean_file"); + } + nsblks -= this_write_sblks; + } + + // Clean up + std::free(nullbuf); + if (::close(fh)) + { + std::ostringstream oss; + oss << FORMAT_SYSERR(errno); + throw jexception(jerrno::JERR_FCNTL_CLOSE, oss.str(), "fcntl", "clean_file"); + } +} + +void +fcntl::create_jfile(const u_int32_t jfsize_sblks) +{ + clean_file(jfsize_sblks); +} + +} // namespace journal +} // namespace mrg |