diff options
author | Robert Gemmell <robbie@apache.org> | 2015-06-25 10:22:51 +0000 |
---|---|---|
committer | Robert Gemmell <robbie@apache.org> | 2015-06-25 10:22:51 +0000 |
commit | 32ae758bc2e8fd962b66a4ab6341b14009f1907e (patch) | |
tree | 2f4d8174813284a6ea58bb6b7f6520aa92287476 /qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp | |
parent | 116d91ad7825a98af36a869fc751206fbce0c59f (diff) | |
parent | f7e896076143de4572b4f1f67ef0765125f2498d (diff) | |
download | qpid-python-32ae758bc2e8fd962b66a4ab6341b14009f1907e.tar.gz |
NO-JIRA: create branch for qpid-cpp 0.34 RC process
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-cpp-0.34-rc@1687469 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp | 440 |
1 files changed, 440 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp b/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp new file mode 100644 index 0000000000..cc31f2e1df --- /dev/null +++ b/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp @@ -0,0 +1,440 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/linearstore/journal/jcntl.h" + +#include <iomanip> +#include "qpid/linearstore/journal/data_tok.h" +#include "qpid/linearstore/journal/JournalLog.h" + +namespace qpid { +namespace linearstore { +namespace journal { + +#define AIO_CMPL_TIMEOUT_SEC 5 +#define AIO_CMPL_TIMEOUT_NSEC 0 +#define FINAL_AIO_CMPL_TIMEOUT_SEC 15 +#define FINAL_AIO_CMPL_TIMEOUT_NSEC 0 + +// Static +timespec jcntl::_aio_cmpl_timeout; ///< Timeout for blocking libaio returns +timespec jcntl::_final_aio_cmpl_timeout; ///< Timeout for blocking libaio returns when stopping or finalizing +bool jcntl::_init = init_statics(); +bool jcntl::init_statics() +{ + _aio_cmpl_timeout.tv_sec = AIO_CMPL_TIMEOUT_SEC; + _aio_cmpl_timeout.tv_nsec = AIO_CMPL_TIMEOUT_NSEC; + _final_aio_cmpl_timeout.tv_sec = FINAL_AIO_CMPL_TIMEOUT_SEC; + _final_aio_cmpl_timeout.tv_nsec = FINAL_AIO_CMPL_TIMEOUT_NSEC; + return true; +} + + +// Functions + +jcntl::jcntl(const std::string& jid, + const std::string& jdir, + JournalLog& jrnl_log): + _jid(jid), + _jdir(jdir), + _init_flag(false), + _stop_flag(false), + _readonly_flag(false), + _jrnl_log(jrnl_log), + _linearFileController(*this), + _emptyFilePoolPtr(0), + _emap(), + _tmap(), + _wmgr(this, _emap, _tmap, _linearFileController), + _recoveryManager(_jdir.dirname(), _jid, _emap, _tmap, jrnl_log) +{} + +jcntl::~jcntl() +{ + if (_init_flag && !_stop_flag) + try { stop(true); } + catch (const jexception& e) { std::cerr << e << std::endl; } + _linearFileController.finalize(); +} + +void +jcntl::initialize(EmptyFilePool* efpp, + const uint16_t wcache_num_pages, + const uint32_t wcache_pgsize_sblks, + aio_callback* const cbp) +{ + _init_flag = false; + _stop_flag = false; + _readonly_flag = false; + + _emap.clear(); + _tmap.clear(); + + _linearFileController.finalize(); + _jdir.clear_dir(); // Clear any existing journal files + _linearFileController.initialize(_jdir.dirname(), efpp, 0ULL); + _linearFileController.getNextJournalFile(); + _wmgr.initialize(cbp, wcache_pgsize_sblks, wcache_num_pages, QLS_WMGR_MAXDTOKPP, QLS_WMGR_MAXWAITUS, 0); + _init_flag = true; +} + +void +jcntl::recover(EmptyFilePoolManager* efpmp, + const uint16_t wcache_num_pages, + const uint32_t wcache_pgsize_sblks, + aio_callback* const cbp, + const std::vector<std::string>* prep_txn_list_ptr, + uint64_t& highest_rid) +{ + _init_flag = false; + _stop_flag = false; + _readonly_flag = false; + + _emap.clear(); + _tmap.clear(); + + _linearFileController.finalize(); + + // Verify journal dir and journal files + _jdir.verify_dir(); + _recoveryManager.analyzeJournals(prep_txn_list_ptr, efpmp, &_emptyFilePoolPtr); + assert(_emptyFilePoolPtr != 0); + + highest_rid = _recoveryManager.getHighestRecordId(); + _jrnl_log.log(/*LOG_DEBUG*/JournalLog::LOG_INFO, _jid, _recoveryManager.toString(_jid, 5U)); + _linearFileController.initialize(_jdir.dirname(), _emptyFilePoolPtr, _recoveryManager.getHighestFileNumber()); + _recoveryManager.setLinearFileControllerJournals(&qpid::linearstore::journal::LinearFileController::addJournalFile, &_linearFileController); + if (_recoveryManager.isLastFileFull()) { + _linearFileController.getNextJournalFile(); + } + _wmgr.initialize(cbp, wcache_pgsize_sblks, wcache_num_pages, QLS_WMGR_MAXDTOKPP, QLS_WMGR_MAXWAITUS, + (_recoveryManager.isLastFileFull() ? 0 : _recoveryManager.getEndOffset())); + + _readonly_flag = true; + _init_flag = true; +} + +void +jcntl::recover_complete() +{ + if (!_readonly_flag) + throw jexception(jerrno::JERR_JCNTL_NOTRECOVERED, "jcntl", "recover_complete"); + _recoveryManager.recoveryComplete(); + _readonly_flag = false; +} + +void +jcntl::delete_jrnl_files() +{ + stop(true); // wait for AIO to complete + _linearFileController.purgeEmptyFilesToEfp(); + _jdir.delete_dir(); +} + + +iores +jcntl::enqueue_data_record(const void* const data_buff, + const std::size_t tot_data_len, + const std::size_t this_data_len, + data_tok* dtokp, + const bool transient) +{ + iores r; + check_wstatus("enqueue_data_record"); + { + slock s(_wr_mutex); + while (handle_aio_wait(_wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, 0, 0, false, transient, false), r, + dtokp)) ; + } + return r; +} + +iores +jcntl::enqueue_extern_data_record(const std::size_t tot_data_len, + data_tok* dtokp, + const bool transient) +{ + iores r; + check_wstatus("enqueue_extern_data_record"); + { + slock s(_wr_mutex); + while (handle_aio_wait(_wmgr.enqueue(0, tot_data_len, 0, dtokp, 0, 0, false, transient, true), r, dtokp)) ; + } + return r; +} + +iores +jcntl::enqueue_txn_data_record(const void* const data_buff, + const std::size_t tot_data_len, + const std::size_t this_data_len, + data_tok* dtokp, + const std::string& xid, + const bool tpc_flag, + const bool transient) +{ + iores r; + check_wstatus("enqueue_tx_data_record"); + { + slock s(_wr_mutex); + while (handle_aio_wait(_wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, xid.data(), xid.size(), + tpc_flag, transient, false), r, dtokp)) ; + } + return r; +} + +iores +jcntl::enqueue_extern_txn_data_record(const std::size_t tot_data_len, + data_tok* dtokp, + const std::string& xid, + const bool tpc_flag, + const bool transient) +{ + iores r; + check_wstatus("enqueue_extern_txn_data_record"); + { + slock s(_wr_mutex); + while (handle_aio_wait(_wmgr.enqueue(0, tot_data_len, 0, dtokp, xid.data(), xid.size(), tpc_flag, transient, + true), r, dtokp)) ; + } + return r; +} + +iores +jcntl::read_data_record(void** const datapp, + std::size_t& dsize, + void** const xidpp, + std::size_t& xidsize, + bool& transient, + bool& external, + data_tok* const dtokp, + bool ignore_pending_txns) +{ + check_rstatus("read_data"); + if (_recoveryManager.readNextRemainingRecord(datapp, dsize, xidpp, xidsize, transient, external, dtokp, ignore_pending_txns)) { + return RHM_IORES_SUCCESS; + } + return RHM_IORES_EMPTY; +} + +iores +jcntl::dequeue_data_record(data_tok* const dtokp, + const bool txn_coml_commit) +{ + iores r; + check_wstatus("dequeue_data"); + { + slock s(_wr_mutex); + while (handle_aio_wait(_wmgr.dequeue(dtokp, 0, 0, false, txn_coml_commit), r, dtokp)) ; + } + return r; +} + +iores +jcntl::dequeue_txn_data_record(data_tok* const dtokp, + const std::string& xid, + const bool tpc_flag, + const bool txn_coml_commit) +{ + iores r; + check_wstatus("dequeue_data"); + { + slock s(_wr_mutex); + while (handle_aio_wait(_wmgr.dequeue(dtokp, xid.data(), xid.size(), tpc_flag, txn_coml_commit), r, dtokp)) ; + } + return r; +} + +iores +jcntl::txn_abort(data_tok* const dtokp, + const std::string& xid) +{ + iores r; + check_wstatus("txn_abort"); + { + slock s(_wr_mutex); + while (handle_aio_wait(_wmgr.abort(dtokp, xid.data(), xid.size()), r, dtokp)) ; + } + return r; +} + +iores +jcntl::txn_commit(data_tok* const dtokp, + const std::string& xid) +{ + iores r; + check_wstatus("txn_commit"); + { + slock s(_wr_mutex); + while (handle_aio_wait(_wmgr.commit(dtokp, xid.data(), xid.size()), r, dtokp)) ; + } + return r; +} + +bool +jcntl::is_txn_synced(const std::string& xid) +{ + slock s(_wr_mutex); + bool res = _wmgr.is_txn_synced(xid); + return res; +} + +int32_t +jcntl::get_wr_events(timespec* const timeout) +{ + stlock t(_wr_mutex); + if (!t.locked()) + return jerrno::LOCK_TAKEN; + return _wmgr.get_events(timeout, false); +} + +void +jcntl::stop(const bool block_till_aio_cmpl) +{ + if (_readonly_flag) + check_rstatus("stop"); + else + check_wstatus("stop"); + _stop_flag = true; + if (!_readonly_flag) + flush(block_till_aio_cmpl); +} + +LinearFileController& +jcntl::getLinearFileControllerRef() { + return _linearFileController; +} + +// static +std::string +jcntl::str2hexnum(const std::string& str) { + if (str.empty()) { + return "<null>"; + } + std::ostringstream oss; + oss << "(" << str.size() << ")0x" << std::hex; + for (unsigned i=str.size(); i>0; --i) { + oss << std::setfill('0') << std::setw(2) << (uint16_t)(uint8_t)str[i-1]; + } + return oss.str(); +} + +iores +jcntl::flush(const bool block_till_aio_cmpl) +{ + if (!_init_flag) + return RHM_IORES_SUCCESS; + if (_readonly_flag) + throw jexception(jerrno::JERR_JCNTL_READONLY, "jcntl", "flush"); + iores res; + { + slock s(_wr_mutex); + res = _wmgr.flush(); + } + if (block_till_aio_cmpl) + aio_cmpl_wait(); + return res; +} + +// Protected/Private functions + +void +jcntl::check_wstatus(const char* fn_name) const +{ + if (!_init_flag) + throw jexception(jerrno::JERR__NINIT, "jcntl", fn_name); + if (_readonly_flag) + throw jexception(jerrno::JERR_JCNTL_READONLY, "jcntl", fn_name); + if (_stop_flag) + throw jexception(jerrno::JERR_JCNTL_STOPPED, "jcntl", fn_name); +} + +void +jcntl::check_rstatus(const char* fn_name) const +{ + if (!_init_flag) + throw jexception(jerrno::JERR__NINIT, "jcntl", fn_name); + if (_stop_flag) + throw jexception(jerrno::JERR_JCNTL_STOPPED, "jcntl", fn_name); +} + + +void +jcntl::aio_cmpl_wait() +{ + //while (_wmgr.get_aio_evt_rem()) + while (true) + { + uint32_t aer; + { + slock s(_wr_mutex); + aer = _wmgr.get_aio_evt_rem(); + } + if (aer == 0) break; // no events left + if (get_wr_events(&_aio_cmpl_timeout) == jerrno::AIO_TIMEOUT) + throw jexception(jerrno::JERR_JCNTL_AIOCMPLWAIT, "jcntl", "aio_cmpl_wait"); + } +} + + +bool +jcntl::handle_aio_wait(const iores res, iores& resout, const data_tok* dtp) +{ + resout = res; + if (res == RHM_IORES_PAGE_AIOWAIT) + { + while (_wmgr.curr_pg_blocked()) + { + if (_wmgr.get_aio_evt_rem() == 0) { +//std::cout << "&&&&&& jcntl::handle_aio_wait() " << _wmgr.status_str() << std::endl; // DEBUG + throw jexception("_wmgr.curr_pg_blocked() with no events remaining"); // TODO - complete exception + } + if (_wmgr.get_events(&_aio_cmpl_timeout, false) == jerrno::AIO_TIMEOUT) + { + std::ostringstream oss; + oss << "get_events() returned JERR_JCNTL_AIOCMPLWAIT; wmgr_status: " << _wmgr.status_str(); + _jrnl_log.log(JournalLog::LOG_CRITICAL, _jid, oss.str()); + throw jexception(jerrno::JERR_JCNTL_AIOCMPLWAIT, "jcntl", "handle_aio_wait"); + } + } + return true; + } + else if (res == RHM_IORES_FILE_AIOWAIT) + { +// while (_wmgr.curr_file_blocked()) +// { +// if (_wmgr.get_events(pmgr::UNUSED, &_aio_cmpl_timeout) == jerrno::AIO_TIMEOUT) +// { +// std::ostringstream oss; +// oss << "get_events() returned JERR_JCNTL_AIOCMPLWAIT; wmgr_status: " << _wmgr.status_str(); +// this->log(LOG_CRITICAL, oss.str()); +// throw jexception(jerrno::JERR_JCNTL_AIOCMPLWAIT, "jcntl", "handle_aio_wait"); +// } +// } +// _wrfc.wr_reset(); + resout = RHM_IORES_SUCCESS; + data_tok::write_state ws = dtp->wstate(); + return ws == data_tok::ENQ_PART || ws == data_tok::DEQ_PART || ws == data_tok::ABORT_PART || + ws == data_tok::COMMIT_PART; + } + return false; +} + +}}} |