From 1d8697cfcfa2d292d5b303797a0f1266cd3bb1d7 Mon Sep 17 00:00:00 2001 From: Kim van der Riet Date: Tue, 18 Mar 2014 13:54:46 +0000 Subject: QPID-5362: No store tools exist for examining the journals - Bugfix and reorganization of qls python modules. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1578899 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/linearstore/ISSUES | 30 +- qpid/cpp/src/qpid/linearstore/journal/jdir.cpp | 152 ++-- qpid/cpp/src/qpid/linearstore/journal/jdir.h | 2 + .../src/tests/linearstore/linearstoredirsetup.sh | 51 +- qpid/cpp/src/tests/linearstore/tx-test-soak.sh | 31 +- qpid/tools/src/py/qls/anal.py | 593 ++++++++++++++ qpid/tools/src/py/qls/efp.py | 275 +++++-- qpid/tools/src/py/qls/err.py | 62 +- qpid/tools/src/py/qls/jrnl.py | 873 +++------------------ qpid/tools/src/py/qls/utils.py | 206 +++++ qpid/tools/src/py/qpid_qls_analyze.py | 67 +- 11 files changed, 1366 insertions(+), 976 deletions(-) create mode 100644 qpid/tools/src/py/qls/anal.py create mode 100644 qpid/tools/src/py/qls/utils.py diff --git a/qpid/cpp/src/qpid/linearstore/ISSUES b/qpid/cpp/src/qpid/linearstore/ISSUES index a9908e882e..ccadefc20c 100644 --- a/qpid/cpp/src/qpid/linearstore/ISSUES +++ b/qpid/cpp/src/qpid/linearstore/ISSUES @@ -47,8 +47,6 @@ Current/pending: svn r.1558592 2014-01-15 fixes an issue with using /dev/random as a source of random numbers for Journal serial numbers. svn r.1558913 2014-01-16 replaces use of /dev/urandom with several calls to rand() to construct a 64-bit random number. * Recommend rebuilding and testing for performance again with these two fixes. Marked POST. -# - 1036026 [LinearStore] Qpid linear store unable to create durable queue - framing-error: Queue : create() failed: jexception 0x0000 - UNABLE TO REPRODUCE - but Frantizek has additional info - 1039522 Qpid crashes while recovering from linear store around apid::linearstore::journal::JournalFile::getFqFileName() including enq_rec::decode() threw JERR_JREC_BAD_RECTAIL * Possible dup of 1039525 * May be fixed by QPID-5483 - waiting for needinfo, recommend rebuilding with QPID-5483 fix and re-testing. Marked POST. @@ -56,18 +54,6 @@ Current/pending: * Possible dup of 1039522 * May be fixed by QPID-5483 - waiting for needinfo, recommend rebuilding with QPID-5483 fix and re-testing. Marked POST. # - 1049870 [LinearStore] auto-delete property does not survive restart -# 5480 1053749 [linearstore] Recovery of store failure with "JERR_MAP_NOTFOUND: Key not found in map." error message - svn r.1564877 2014-02-05: Proposed fix - * Probability: 6 of 600 (1.0%) using tx-test-soak.sh - * If broker is started a second time after failure, it starts correctly and test completes ok. - * Problem: File is being recycled to EFP with still-locked enqueues in it (ie dequeued transactionally). 
- * Problem: Record alignment check writes filler records to wrong file when decoding bad record moves across a file boundary - * Test of fix failed on RHEL-7 -# - 1064181 [linearstore] Qpidd closes transactional client session&connection with async_dequeue() failed - * jexception 0x010b LinearFileController::getCurrentSerial() threw JERR_NULL -# - 1064230 [linearstore] Qpidd linearstore recovery sometimes fail to recover messages with recoverMessages() failed - * jexception 0x0701 RecoveryManager::readNextRemainingRecord() threw JERR_JREC_BADRECTAIL - * possible dup of 1063700 Fixed/closed (in commit order): =============================== @@ -104,9 +90,24 @@ NO-JIRA - Added missing Apache copyright/license text 5479 1053701 [linearstore] Using recovered store results in "JERR_JNLF_FILEOFFSOVFL: Attempted to increase submitted offset past file size. (JournalFile::submittedDblkCount)" error message * Probability: 2 of 600 (0.3%) using tx-test-soak.sh * Fixed by checkin for QPID-5480, no longer able to reproduce. VERIFIED + 5480 1053749 [linearstore] Recovery of store failure with "JERR_MAP_NOTFOUND: Key not found in map." error message + svn r.1564877 2014-02-05: Proposed fix + * Probability: 6 of 600 (1.0%) using tx-test-soak.sh + * If broker is started a second time after failure, it starts correctly and test completes ok. + * Problem: File is being recycled to EFP with still-locked enqueues in it (ie dequeued transactionally). + * Problem: Record alignment check writes filler records to wrong file when decoding bad record moves across a file boundary 5603 1063700 [linearstore] broker restart fails under stress test svn r.1574513 2014-03-05: Proposed fix. POST * jexception 0x0701 RecoveryManager::readNextRemainingRecord() threw JERR_JREC_BADRECTAIL + 5607 1064181 [linearstore] Qpidd closes transactional client session&connection with async_dequeue() failed + svn r.1575009 2014-03-06 Proposed fix. POST + * jexception 0x010b LinearFileController::getCurrentSerial() threw JERR_NULL + - 1064230 [linearstore] Qpidd linearstore recovery sometimes fail to recover messages with recoverMessages() failed + * jexception 0x0701 RecoveryManager::readNextRemainingRecord() threw JERR_JREC_BADRECTAIL + * possible dup of 1063700 + - 1036026 [LinearStore] Qpid linear store unable to create durable queue - framing-error: Queue : create() failed: jexception 0x0000 + * UNABLE TO REPRODUCE - but Frantizek has additional info + * Retested after checkin 1575009, problem solved. VERIFIED Ordered checkin list: ===================== @@ -135,6 +136,7 @@ no. svn r Q-JIRA RHBZ Date 19. 1564893 5361 - 2014-02-05 20. 1564935 5361 - 2014-02-05 21. 1574513 5603 1063700 2014-03-05 +22. 1575009 5607 1064181 2014-03-06 See above sections for details on these checkins. 
diff --git a/qpid/cpp/src/qpid/linearstore/journal/jdir.cpp b/qpid/cpp/src/qpid/linearstore/journal/jdir.cpp
index 896f44ceff..36f180c21f 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/jdir.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/jdir.cpp
@@ -101,17 +101,10 @@ jdir::clear_dir(const std::string& dirname/*, const std::string& */
 ,
                 const bool create_flag)
 {
-    DIR* dir = ::opendir(dirname.c_str());
-    if (!dir)
-    {
-        if (errno == 2 && create_flag) // ENOENT (No such file or dir)
-        {
-            create_dir(dirname);
-            return;
-        }
-        std::ostringstream oss;
-        oss << "dir=\"" << dirname << "\"" << FORMAT_SYSERR(errno);
-        throw jexception(jerrno::JERR_JDIR_OPENDIR, oss.str(), "jdir", "clear_dir");
+    DIR* dir = open_dir(dirname, "clear_dir", create_flag); // ENOENT is tolerated only if we may create the dir
+    if (!dir) {
+        create_dir(dirname);
+        return;
     }
 //#ifndef RHM_JOWRITE
     struct dirent* entry;
@@ -161,13 +153,7 @@ jdir::push_down(const std::string& dirname, const std::string& target_dir/*, con
 {
     std::string bak_dir_name = create_bak_dir(dirname/*, bak_dir_base*/);
 
-    DIR* dir = ::opendir(dirname.c_str());
-    if (!dir)
-    {
-        std::ostringstream oss;
-        oss << "dir=\"" << dirname << "\"" << FORMAT_SYSERR(errno);
-        throw jexception(jerrno::JERR_JDIR_OPENDIR, oss.str(), "jdir", "push_down");
-    }
+    DIR* dir = open_dir(dirname, "push_down", false);
     // Copy contents of targetDirName into bak dir
     struct dirent* entry;
     while ((entry = ::readdir(dir)) != 0)
@@ -251,60 +237,49 @@ jdir::delete_dir(const std::string& dirname, bool children_only)
 {
     struct dirent* entry;
     struct stat s;
-    DIR* dir = ::opendir(dirname.c_str());
-    if (!dir)
-    {
-        if (errno == ENOENT) // dir does not exist.
-            return;
-
-        std::ostringstream oss;
-        oss << "dir=\"" << dirname << "\"" << FORMAT_SYSERR(errno);
-        throw jexception(jerrno::JERR_JDIR_OPENDIR, oss.str(), "jdir", "delete_dir");
-    }
-    else
+    DIR* dir = open_dir(dirname, "delete_dir", true); // true = allow dir does not exist, return 0
+    if (!dir) return;
+    while ((entry = ::readdir(dir)) != 0)
     {
-        while ((entry = ::readdir(dir)) != 0)
+        // Ignore . and ..
+        if (std::strcmp(entry->d_name, ".") != 0 && std::strcmp(entry->d_name, "..") != 0)
         {
-            // Ignore . and ..
-            if (std::strcmp(entry->d_name, ".") != 0 && std::strcmp(entry->d_name, "..") != 0)
+            std::string full_name(dirname + "/" + entry->d_name);
+            if (::lstat(full_name.c_str(), &s))
             {
-                std::string full_name(dirname + "/" + entry->d_name);
-                if (::lstat(full_name.c_str(), &s))
-                {
-                    ::closedir(dir);
-                    std::ostringstream oss;
-                    oss << "stat: file=\"" << full_name << "\"" << FORMAT_SYSERR(errno);
-                    throw jexception(jerrno::JERR_JDIR_STAT, oss.str(), "jdir", "delete_dir");
-                }
-                if (S_ISREG(s.st_mode) || S_ISLNK(s.st_mode)) // This is a file or slink
-                {
-                    if(::unlink(full_name.c_str()))
-                    {
-                        ::closedir(dir);
-                        std::ostringstream oss;
-                        oss << "unlink: file=\"" << entry->d_name << "\"" << FORMAT_SYSERR(errno);
-                        throw jexception(jerrno::JERR_JDIR_UNLINK, oss.str(), "jdir", "delete_dir");
-                    }
-                }
-                else if (S_ISDIR(s.st_mode)) // This is a dir
-                {
-                    delete_dir(full_name);
-                }
-                else // all other types, throw up!
+ ::closedir(dir); + std::ostringstream oss; + oss << "stat: file=\"" << full_name << "\"" << FORMAT_SYSERR(errno); + throw jexception(jerrno::JERR_JDIR_STAT, oss.str(), "jdir", "delete_dir"); + } + if (S_ISREG(s.st_mode) || S_ISLNK(s.st_mode)) // This is a file or slink + { + if(::unlink(full_name.c_str())) { ::closedir(dir); std::ostringstream oss; - oss << "file=\"" << entry->d_name << "\" is not a dir, file or slink."; - oss << " (mode=0x" << std::hex << s.st_mode << std::dec << ")"; - throw jexception(jerrno::JERR_JDIR_BADFTYPE, oss.str(), "jdir", "delete_dir"); + oss << "unlink: file=\"" << entry->d_name << "\"" << FORMAT_SYSERR(errno); + throw jexception(jerrno::JERR_JDIR_UNLINK, oss.str(), "jdir", "delete_dir"); } } + else if (S_ISDIR(s.st_mode)) // This is a dir + { + delete_dir(full_name); + } + else // all other types, throw up! + { + ::closedir(dir); + std::ostringstream oss; + oss << "file=\"" << entry->d_name << "\" is not a dir, file or slink."; + oss << " (mode=0x" << std::hex << s.st_mode << std::dec << ")"; + throw jexception(jerrno::JERR_JDIR_BADFTYPE, oss.str(), "jdir", "delete_dir"); + } } + } // FIXME: Find out why this fails with false alarms/errors from time to time... // While commented out, there is no error capture from reading dir entries. // check_err(errno, dir, dirname, "delete_dir"); - } // Now dir is empty, close and delete it close_dir(dir, dirname, "delete_dir"); @@ -321,14 +296,8 @@ jdir::delete_dir(const std::string& dirname, bool children_only) std::string jdir::create_bak_dir(const std::string& dirname) { - DIR* dir = ::opendir(dirname.c_str()); + DIR* dir = open_dir(dirname, "create_bak_dir", false); long dir_num = 0L; - if (!dir) - { - std::ostringstream oss; - oss << "dir=\"" << dirname << "\"" << FORMAT_SYSERR(errno); - throw jexception(jerrno::JERR_JDIR_OPENDIR, oss.str(), "jdir", "create_bak_dir"); - } struct dirent* entry; while ((entry = ::readdir(dir)) != 0) { @@ -407,25 +376,23 @@ void jdir::read_dir(const std::string& name, std::vector& dir_list, const bool incl_dirs, const bool incl_files, const bool incl_links, const bool return_fqfn) { struct stat s; if (is_dir(name)) { - DIR* dir = ::opendir(name.c_str()); - if (dir != 0) { - struct dirent* entry; - while ((entry = ::readdir(dir)) != 0) { - if (std::strcmp(entry->d_name, ".") != 0 && std::strcmp(entry->d_name, "..") != 0) { // Ignore . and .. - std::string full_name(name + "/" + entry->d_name); - if (::stat(full_name.c_str(), &s)) - { - ::closedir(dir); - std::ostringstream oss; - oss << "stat: file=\"" << full_name << "\"" << FORMAT_SYSERR(errno); - throw jexception(jerrno::JERR_JDIR_STAT, oss.str(), "jdir", "delete_dir"); - } - if ((S_ISREG(s.st_mode) && incl_files) || (S_ISDIR(s.st_mode) && incl_dirs) || (S_ISLNK(s.st_mode) && incl_links)) { - if (return_fqfn) { - dir_list.push_back(name + "/" + entry->d_name); - } else { - dir_list.push_back(entry->d_name); - } + DIR* dir = open_dir(name, "read_dir", false); + struct dirent* entry; + while ((entry = ::readdir(dir)) != 0) { + if (std::strcmp(entry->d_name, ".") != 0 && std::strcmp(entry->d_name, "..") != 0) { // Ignore . and .. 
+            std::string full_name(name + "/" + entry->d_name);
+            if (::stat(full_name.c_str(), &s))
+            {
+                ::closedir(dir);
+                std::ostringstream oss;
+                oss << "stat: file=\"" << full_name << "\"" << FORMAT_SYSERR(errno);
+                throw jexception(jerrno::JERR_JDIR_STAT, oss.str(), "jdir", "read_dir");
+            }
+            if ((S_ISREG(s.st_mode) && incl_files) || (S_ISDIR(s.st_mode) && incl_dirs) || (S_ISLNK(s.st_mode) && incl_links)) {
+                if (return_fqfn) {
+                    dir_list.push_back(name + "/" + entry->d_name);
+                } else {
+                    dir_list.push_back(entry->d_name);
                 }
             }
         }
@@ -457,6 +424,21 @@ jdir::close_dir(DIR* dir, const std::string& dir_name, const std::string& fn_nam
     }
 }
 
+DIR*
+jdir::open_dir(const std::string& dir_name, const std::string& fn_name, const bool test_enoent)
+{
+    DIR* dir = ::opendir(dir_name.c_str());
+    if (!dir) {
+        if (test_enoent && errno == ENOENT) {
+            return 0;
+        }
+        std::ostringstream oss;
+        oss << "dir=\"" << dir_name << "\"" << FORMAT_SYSERR(errno);
+        throw jexception(jerrno::JERR_JDIR_OPENDIR, oss.str(), "jdir", fn_name);
+    }
+    return dir;
+}
+
 std::ostream&
 operator<<(std::ostream& os, const jdir& jdir)
 {
diff --git a/qpid/cpp/src/qpid/linearstore/journal/jdir.h b/qpid/cpp/src/qpid/linearstore/journal/jdir.h
index 86b16f8545..59f21ce499 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/jdir.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/jdir.h
@@ -353,6 +353,8 @@ namespace journal {
      * \exception jerrno::JERR_JDIR_CLOSEDIR The directory handle could not be closed.
      */
     static void close_dir(DIR* dir, const std::string& dir_name, const std::string& fn_name);
+
+    static DIR* open_dir(const std::string& dir_name, const std::string& fn_name, const bool test_enoent);
 };
 
 }}}
diff --git a/qpid/cpp/src/tests/linearstore/linearstoredirsetup.sh b/qpid/cpp/src/tests/linearstore/linearstoredirsetup.sh
index 3cad50b1c5..ef39767e9b 100755
--- a/qpid/cpp/src/tests/linearstore/linearstoredirsetup.sh
+++ b/qpid/cpp/src/tests/linearstore/linearstoredirsetup.sh
@@ -19,26 +19,37 @@
 # under the License.
 #
 
-
-STORE_DIR=/tmp
-LINEARSTOREDIR=~/RedHat/linearstore
-
-rm -rf $STORE_DIR/qls
-rm -rf $STORE_DIR/p002
-rm $STORE_DIR/p004
-
-mkdir $STORE_DIR/qls
-mkdir $STORE_DIR/p002
-touch $STORE_DIR/p004
-mkdir $STORE_DIR/qls/p001
-touch $STORE_DIR/qls/p003
-ln -s $STORE_DIR/p002 $STORE_DIR/qls/p002
-ln -s $STORE_DIR/p004 $STORE_DIR/qls/p004
-
-${LINEARSTOREDIR}/tools/src/py/linearstore/efptool.py $STORE_DIR/qls/ -a -p 1 -s 2048 -n 25
-${LINEARSTOREDIR}/tools/src/py/linearstore/efptool.py $STORE_DIR/qls/ -a -p 1 -s 512 -n 25
-${LINEARSTOREDIR}/tools/src/py/linearstore/efptool.py $STORE_DIR/qls/ -a -p 2 -s 2048 -n 25
-
+# This script sets up a test directory which contains both
+# recoverable and non-recoverable files and directories for
+# the empty file pool (EFP).
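+#
+# Resulting layout under ${STORE_DIR} (per the commands below):
+#   qls/p001 - directory (a recoverable partition)
+#   qls/p002 - symlink to directory p002_ext (recoverable)
+#   qls/p003 - plain file (not a valid partition)
+#   qls/p004 - symlink to plain file p004_ext (not a valid partition)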
+ +# NOTE: The following is based on typical development tree paths, not installed paths + +BASE_DIR=${HOME}/RedHat +STORE_DIR=${BASE_DIR} +PYTHON_TOOLS_DIR=${BASE_DIR}/qpid/tools/src/linearstore +export PYTHONPATH=${BASE_DIR}/qpid/python:${BASE_DIR}/qpid/extras/qmf/src/py:${BASE_DIR}/qpid/tools/src/py + +# Remove old dirs (if present) +rm -rf ${STORE_DIR}/qls +rm -rf ${STORE_DIR}/p002 +rm ${STORE_DIR}/p004 + +# Create new dir tree and links +mkdir ${STORE_DIR}/p002_ext +touch ${STORE_DIR}/p004_ext +mkdir ${STORE_DIR}/qls +mkdir ${STORE_DIR}/qls/p001 +touch ${STORE_DIR}/qls/p003 +ln -s ${STORE_DIR}/p002_ext ${STORE_DIR}/qls/p002 +ln -s ${STORE_DIR}/p004_ext ${STORE_DIR}/qls/p004 + +# Populate efp dirs with empty files +${PYTHON_TOOLS_DIR}/efptool.py $STORE_DIR/qls/ -a -p 1 -s 2048 -n 25 +${PYTHON_TOOLS_DIR}/efptool.py $STORE_DIR/qls/ -a -p 1 -s 512 -n 25 +${PYTHON_TOOLS_DIR}/efptool.py $STORE_DIR/qls/ -a -p 2 -s 2048 -n 25 + +# Show the result for information ${LINEARSTOREDIR}/tools/src/py/linearstore/efptool.py $STORE_DIR/qls/ -l tree -la $STORE_DIR/qls diff --git a/qpid/cpp/src/tests/linearstore/tx-test-soak.sh b/qpid/cpp/src/tests/linearstore/tx-test-soak.sh index fa05e0a4a8..7d5581961f 100755 --- a/qpid/cpp/src/tests/linearstore/tx-test-soak.sh +++ b/qpid/cpp/src/tests/linearstore/tx-test-soak.sh @@ -19,7 +19,6 @@ # under the License. # - # tx-test-soak # # Basic test methodology: @@ -30,6 +29,8 @@ # 5. Run qpid-txtest against broker in check mode, which checks that all expected messages are present. # 6. Wash, rinse, repeat... The number of runs is determined by ${NUM_RUNS} +# NOTE: The following is based on typical development tree paths, not installed paths + NUM_RUNS=1000 BASE_DIR=${HOME}/RedHat CMAKE_BUILD_DIR=${BASE_DIR}/q.cm @@ -43,13 +44,18 @@ BROKER_MANAGEMENT="no" # "no" or "yes" TRUNCATE_INTERVAL=10 MAX_DISK_PERC_USED=90 -# Consts (don't adjust these...) +# Constants (don't adjust these) export BASE_DIR RELATIVE_BASE_DIR=`python -c "import os,os.path; print os.path.relpath(os.environ['BASE_DIR'], os.environ['PWD'])"` +export PYTHONPATH=${BASE_DIR}/qpid/python:${BASE_DIR}/qpid/extras/qmf/src/py:${BASE_DIR}/qpid/tools/src/py LOG_FILE_NAME=log.txt QPIDD_FN=qpidd QPIDD=${CMAKE_BUILD_DIR}/src/${QPIDD_FN} -TXTEST=${CMAKE_BUILD_DIR}/src/tests/qpid-txtest +TXTEST_FN=qpid-txtest +TXTEST=${CMAKE_BUILD_DIR}/src/tests/${TXTEST_FN} +ANALYZE_FN=qpid_qls_analyze.py +ANALYZE=${BASE_DIR}/qpid/tools/src/py/${ANALYZE_FN} +ANALYZE_ARGS="--efp --show-recs --stats" QPIDD_BASE_ARGS="--load-module ${STORE_MODULE} -m ${BROKER_MANAGEMENT} --auth no --default-flow-stop-threshold 0 --default-flow-resume-threshold 0 --default-queue-limit 0 --store-dir ${BASE_DIR} --log-enable ${BROKER_LOG_LEVEL} --log-to-stderr no --log-to-stdout no" TXTEST_INIT_STR="--init yes --transfer no --check no" TXTEST_RUN_STR="--init no --transfer yes --check no" @@ -181,6 +187,17 @@ check_ready_to_run() { fi } +# Analyze store files +# $1: Log suffix flag: either "A" or "B". If "A", client is started in test mode, otherwise client evaluates recovery. 
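+# Writes two passes to ${RESULT_DIR}/qls_analysis.$1.log: one as-is, then one
+# with transactional reconciliation (--txn) appended below a separator.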
+analyze_store() {
+    ${ANALYZE} ${ANALYZE_ARGS} ${BASE_DIR}/qls &> ${RESULT_DIR}/qls_analysis.$1.log
+    echo >> ${RESULT_DIR}/qls_analysis.$1.log
+    echo "----------------------------------------------------------" >> ${RESULT_DIR}/qls_analysis.$1.log
+    echo "With transactional reconciliation:" >> ${RESULT_DIR}/qls_analysis.$1.log
+    echo >> ${RESULT_DIR}/qls_analysis.$1.log
+    ${ANALYZE} ${ANALYZE_ARGS} --txn ${BASE_DIR}/qls &>> ${RESULT_DIR}/qls_analysis.$1.log
+}
+
 ulimit -c unlimited # Allow core files to be created
 
 RESULT_BASE_DIR_SUFFIX=`date "${TIMESTAMP_FORMAT}"`
@@ -219,7 +236,8 @@ for rn in `seq ${NUM_RUNS}`; do
     sleep ${RUN_TIME}
     kill_process ${SIG_KILL} ${QPIDD_PID}
    sleep 2
-    tar -czf ${RESULT_DIR}/qls_B.tar.gz ${RELATIVE_BASE_DIR}/qls
+    analyze_store "A"
+    tar -czf ${RESULT_DIR}/qls_A.tar.gz ${RELATIVE_BASE_DIR}/qls
 
     # === PART B: Recovery and check ===
     start_broker "B"
@@ -234,11 +252,14 @@ for rn in `seq ${NUM_RUNS}`; do
         kill_process ${SIG_KILL} ${PID}
         sleep 2
     fi
-    tar -czf ${RESULT_DIR}/qls_C.tar.gz ${RELATIVE_BASE_DIR}/qls
+    analyze_store "B"
+    tar -czf ${RESULT_DIR}/qls_B.tar.gz ${RELATIVE_BASE_DIR}/qls
 
     # === Check for errors, cores and exceptions in logs ===
     grep -Hn "jexception" ${RESULT_DIR}/qpidd.A.log | tee -a ${LOG_FILE}
    grep -Hn "jexception" ${RESULT_DIR}/qpidd.B.log | tee -a ${LOG_FILE}
+    grep -Hn "Traceback (most recent call last):" ${RESULT_DIR}/qls_analysis.A.log | tee -a ${LOG_FILE}
+    grep -Hn "Traceback (most recent call last):" ${RESULT_DIR}/qls_analysis.B.log | tee -a ${LOG_FILE}
     grep "${SUCCESS_MSG}" ${RESULT_DIR}/txtest.B.log &> /dev/null
     if [[ "$?" != "0" ]]; then
         echo "ERROR in run ${rn}" >> ${LOG_FILE}
diff --git a/qpid/tools/src/py/qls/anal.py b/qpid/tools/src/py/qls/anal.py
new file mode 100644
index 0000000000..865dfa16c7
--- /dev/null
+++ b/qpid/tools/src/py/qls/anal.py
@@ -0,0 +1,593 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+Module: qls.anal
+
+Classes for recovery and analysis of a Qpid Linear Store (QLS).
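+
+A minimal usage sketch (this is how a driver such as qpid_qls_analyze.py can
+use these classes; 'args' stands for an argparse-style object carrying the
+flags read below, e.g. txn, show_recs, show_all_recs, show_xids, show_data):
+
+    jrm = JournalRecoveryManager('/path/to/store/qls', args)
+    jrm.run()           # recover the TPL and each queue's journal
+    jrm.report(True)    # print per-journal reports with statistics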
+""" + +import os.path +import qls.err +import qls.jrnl +import qls.utils + +class HighCounter(object): + def __init__(self): + self.num = 0 + def check(self, num): + if self.num < num: + self.num = num + def get(self): + return self.num + def get_next(self): + self.num += 1 + return self.num + +class JournalRecoveryManager(object): + TPL_DIR_NAME = 'tpl' + JRNL_DIR_NAME = 'jrnl' + def __init__(self, directory, args): + if not os.path.exists(directory): + raise qls.err.InvalidQlsDirectoryNameError(directory) + self.directory = directory + self.args = args + self.tpl = None + self.journals = {} + self.high_rid_counter = HighCounter() + self.prepared_list = None + def report(self, print_stats_flag): + self._reconcile_transactions(self.prepared_list, self.args.txn) + if self.tpl is not None: + self.tpl.report(print_stats_flag) + for queue_name in sorted(self.journals.keys()): + self.journals[queue_name].report(print_stats_flag) + def run(self): + tpl_dir = os.path.join(self.directory, JournalRecoveryManager.TPL_DIR_NAME) + if os.path.exists(tpl_dir): + self.tpl = Journal(tpl_dir, None, self.args) + self.tpl.recover(self.high_rid_counter) + if self.args.show_recs or self.args.show_all_recs: + print + jrnl_dir = os.path.join(self.directory, JournalRecoveryManager.JRNL_DIR_NAME) + self.prepared_list = self.tpl.txn_map.get_prepared_list() if self.tpl is not None else {} + if os.path.exists(jrnl_dir): + for dir_entry in sorted(os.listdir(jrnl_dir)): + jrnl = Journal(os.path.join(jrnl_dir, dir_entry), self.prepared_list, self.args) + jrnl.recover(self.high_rid_counter) + self.journals[jrnl.get_queue_name()] = jrnl + if self.args.show_recs or self.args.show_all_recs: + print + def _reconcile_transactions(self, prepared_list, txn_flag): + print 'Transaction reconciliation report:' + print '==================================' + print len(prepared_list), 'open transaction(s) found in prepared transaction list:' + for xid in prepared_list.keys(): + commit_flag = prepared_list[xid] + if commit_flag is None: + status = '[Prepared, neither committed nor aborted - assuming commit]' + elif commit_flag: + status = '[Prepared, but interrupted during commit phase]' + else: + status = '[Prepared, but interrupted during abort phase]' + print ' ', qls.utils.format_xid(xid), status + if prepared_list[xid] is None: # Prepared, but not committed or aborted + enqueue_record = self.tpl.get_txn_map_record(xid)[0][1] + dequeue_record = qls.utils.create_record(qls.jrnl.DequeueRecord.MAGIC, \ + qls.jrnl.DequeueRecord.TXN_COMPLETE_COMMIT_FLAG, \ + self.tpl.current_journal_file, \ + self.high_rid_counter.get_next(), \ + enqueue_record.record_id, xid, None) + if txn_flag: + self.tpl.add_record(dequeue_record) + for queue_name in sorted(self.journals.keys()): + self.journals[queue_name].reconcile_transactions(prepared_list, txn_flag) + if len(prepared_list) > 0: + print 'Completing prepared transactions in prepared transaction list:' + for xid in prepared_list.keys(): + print ' ', qls.utils.format_xid(xid) + transaction_record = qls.utils.create_record(qls.jrnl.TransactionRecord.MAGIC_COMMIT, 0, \ + self.tpl.current_journal_file, \ + self.high_rid_counter.get_next(), None, xid, None) + if txn_flag: + self.tpl.add_record(transaction_record) + print + +class EnqueueMap(object): + """ + Map of enqueued records in a QLS journal + """ + def __init__(self, journal): + self.journal = journal + self.enq_map = {} + def add(self, journal_file, enq_record, locked_flag): + if enq_record.record_id in self.enq_map: + raise 
qls.err.DuplicateRecordIdError(self.journal.current_file_header, enq_record) + self.enq_map[enq_record.record_id] = [journal_file, enq_record, locked_flag] + def contains(self, rid): + """Return True if the map contains the given rid""" + return rid in self.enq_map + def delete(self, journal_file, deq_record): + if deq_record.dequeue_record_id in self.enq_map: + enq_list = self.enq_map[deq_record.dequeue_record_id] + del self.enq_map[deq_record.dequeue_record_id] + return enq_list + else: + raise qls.err.RecordIdNotFoundError(journal_file.file_header, deq_record) + def get(self, record_id): + if record_id in self.enq_map: + return self.enq_map[record_id] + return None + def lock(self, journal_file, dequeue_record): + if dequeue_record.dequeue_record_id not in self.enq_map: + raise qls.err.RecordIdNotFoundError(journal_file.file_header, dequeue_record) + self.enq_map[dequeue_record.dequeue_record_id][2] = True + def report_str(self, _, show_records): + """Return a string containing a text report for all records in the map""" + if len(self.enq_map) == 0: + return 'No enqueued records found.' + rstr = '%d enqueued records found' % len(self.enq_map) + if show_records: + rstr += ":" + rid_list = self.enq_map.keys() + rid_list.sort() + for rid in rid_list: + journal_file, record, locked_flag = self.enq_map[rid] + if locked_flag: + lock_str = '[LOCKED]' + else: + lock_str = '' + rstr += '\n 0x%x:%s %s' % (journal_file.file_header.file_num, record, lock_str) + else: + rstr += '.' + return rstr + def unlock(self, journal_file, dequeue_record): + """Set the transaction lock for a given record_id to False""" + if dequeue_record.dequeue_record_id in self.enq_map: + if self.enq_map[dequeue_record.dequeue_record_id][2]: + self.enq_map[dequeue_record.dequeue_record_id][2] = False + else: + raise qls.err.RecordNotLockedError(journal_file.file_header, dequeue_record) + else: + raise qls.err.RecordIdNotFoundError(journal_file.file_header, dequeue_record) + +class TransactionMap(object): + """ + Map of open transactions used while recovering a QLS journal + """ + def __init__(self, enq_map): + self.txn_map = {} + self.enq_map = enq_map + def abort(self, xid): + """Perform an abort operation for the given xid record""" + for journal_file, record, _ in self.txn_map[xid]: + if isinstance(record, qls.jrnl.DequeueRecord): + if self.enq_map.contains(record.dequeue_record_id): + self.enq_map.unlock(journal_file, record) + else: + journal_file.decr_enq_cnt(record) + del self.txn_map[xid] + def add(self, journal_file, record): + if record.xid is None: + raise qls.err.NonTransactionalRecordError(journal_file.file_header, record, 'TransactionMap.add()') + if isinstance(record, qls.jrnl.DequeueRecord): + try: + self.enq_map.lock(journal_file, record) + except qls.err.RecordIdNotFoundError: + # Not in emap, look for rid in tmap - should not happen in practice + txn_op = self._find_record_id(record.xid, record.dequeue_record_id) + if txn_op != None: + if txn_op[2]: + raise qls.err.AlreadyLockedError(journal_file.file_header, record) + txn_op[2] = True + if record.xid in self.txn_map: + self.txn_map[record.xid].append([journal_file, record, False]) # append to existing list + else: + self.txn_map[record.xid] = [[journal_file, record, False]] # create new list + def commit(self, xid): + """Perform a commit operation for the given xid record""" + mismatch_list = [] + for journal_file, record, lock in self.txn_map[xid]: + if isinstance(record, qls.jrnl.EnqueueRecord): + self.enq_map.add(journal_file, record, lock) # Transfer 
enq to emap + else: + if self.enq_map.contains(record.dequeue_record_id): + self.enq_map.unlock(journal_file, record) + self.enq_map.delete(journal_file, record)[0].decr_enq_cnt(record) + else: + mismatch_list.append('0x%x' % record.dequeue_record_id) + del self.txn_map[xid] + return mismatch_list + def contains(self, xid): + """Return True if the xid exists in the map; False otherwise""" + return xid in self.txn_map + def delete(self, journal_file, transaction_record): + """Remove a transaction record from the map using either a commit or abort header""" + if transaction_record.magic[-1] == 'c': + return self.commit(transaction_record.xid) + if transaction_record.magic[-1] == 'a': + self.abort(transaction_record.xid) + else: + raise qls.err.InvalidRecordTypeError(journal_file.file_header, transaction_record, + 'delete from Transaction Map') + def get(self, xid): + if xid in self.txn_map: + return self.txn_map[xid] + return None + def get_prepared_list(self): + """ + Prepared list is a map of xid(key) to one of None, True or False. These represent respectively: + None: prepared, but neither committed or aborted (interrupted before commit or abort) + False: prepared and aborted (interrupted before abort complete) + True: prepared and committed (interrupted before commit complete) + """ + prepared_list = {} + for xid in self.get_xid_list(): + for _, record, _ in self.txn_map[xid]: + if isinstance(record, qls.jrnl.EnqueueRecord): + prepared_list[xid] = None + else: + prepared_list[xid] = record.is_transaction_complete_commit() + return prepared_list + def get_xid_list(self): + return self.txn_map.keys() + def report_str(self, _, show_records): + """Return a string containing a text report for all records in the map""" + if len(self.txn_map) == 0: + return 'No outstanding transactions found.' + rstr = '%d outstanding transaction(s)' % len(self.txn_map) + if show_records: + rstr += ':' + for xid, op_list in self.txn_map.iteritems(): + rstr += '\n %s containing %d operations:' % (qls.utils.format_xid(xid), len(op_list)) + for journal_file, record, _ in op_list: + rstr += '\n 0x%x:%s' % (journal_file.file_header.file_num, record) + else: + rstr += '.' 
+ return rstr + def _find_record_id(self, xid, record_id): + """ Search for and return map list with supplied rid.""" + if xid in self.txn_map: + for txn_op in self.txn_map[xid]: + if txn_op[1].record_id == record_id: + return txn_op + for this_xid in self.txn_map.iterkeys(): + for txn_op in self.txn_map[this_xid]: + if txn_op[1].record_id == record_id: + return txn_op + return None + +class JournalStatistics(object): + """Journal statistics""" + def __init__(self): + self.total_record_count = 0 + self.transient_record_count = 0 + self.filler_record_count = 0 + self.enqueue_count = 0 + self.dequeue_count = 0 + self.transaction_record_count = 0 + self.transaction_enqueue_count = 0 + self.transaction_dequeue_count = 0 + self.transaction_commit_count = 0 + self.transaction_abort_count = 0 + self.transaction_operation_count = 0 + def __str__(self): + fstr = 'Total record count: %d\n' + \ + 'Transient record count: %d\n' + \ + 'Filler_record_count: %d\n' + \ + 'Enqueue_count: %d\n' + \ + 'Dequeue_count: %d\n' + \ + 'Transaction_record_count: %d\n' + \ + 'Transaction_enqueue_count: %d\n' + \ + 'Transaction_dequeue_count: %d\n' + \ + 'Transaction_commit_count: %d\n' + \ + 'Transaction_abort_count: %d\n' + \ + 'Transaction_operation_count: %d\n' + return fstr % (self.total_record_count, + self.transient_record_count, + self.filler_record_count, + self.enqueue_count, + self.dequeue_count, + self.transaction_record_count, + self.transaction_enqueue_count, + self.transaction_dequeue_count, + self.transaction_commit_count, + self.transaction_abort_count, + self.transaction_operation_count) + +class Journal(object): + """ + Instance of a Qpid Linear Store (QLS) journal. + """ + def __init__(self, directory, xid_prepared_list, args): + self.directory = directory + self.queue_name = os.path.basename(directory) + self.files = {} + self.file_num_list = None + self.file_num_itr = None + self.enq_map = EnqueueMap(self) + self.txn_map = TransactionMap(self.enq_map) + self.current_journal_file = None + self.first_rec_flag = None + self.statistics = JournalStatistics() + self.xid_prepared_list = xid_prepared_list # This is None for the TPL instance only + self.args = args + self.last_record_offset = None # TODO: Move into JournalFile + self.num_filler_records_required = None # TODO: Move into JournalFile + self.fill_to_offset = None + def add_record(self, record): + if isinstance(record, qls.jrnl.EnqueueRecord) or isinstance(record, qls.jrnl.DequeueRecord): + if record.xid_size > 0: + self.txn_map.add(self.current_journal_file, record) + else: + self.enq_map.add(self.current_journal_file, record, False) + elif isinstance(record, qls.jrnl.TransactionRecord): + self.txn_map.delete(self.current_journal_file, record) + else: + raise qls.err.InvalidRecordTypeError(self.current_journal_file, record, 'add to Journal') + def get_enq_map_record(self, rid): + return self.enq_map.get(rid) + def get_txn_map_record(self, xid): + return self.txn_map.get(xid) + def get_outstanding_txn_list(self): + return self.txn_map.get_xid_list() + def get_queue_name(self): + return self.queue_name + def recover(self, high_rid_counter): + print 'Recovering', self.queue_name + self._analyze_files() + try: + while self._get_next_record(high_rid_counter): + pass + self._check_alignment() + except qls.err.NoMoreFilesInJournalError: + print 'No more files in journal' + except qls.err.FirstRecordOffsetMismatchError as err: + print '0x%08x: **** FRO ERROR: queue=\"%s\" fid=0x%x fro actual=0x%08x expected=0x%08x' % \ + (err.get_expected_fro(), 
err.get_queue_name(), err.get_file_number(), err.get_record_offset(), + err.get_expected_fro()) + def reconcile_transactions(self, prepared_list, txn_flag): + xid_list = self.txn_map.get_xid_list() + if len(xid_list) > 0: + print self.queue_name, 'contains', len(xid_list), 'open transaction(s):' + for xid in xid_list: + if xid in prepared_list.keys(): + commit_flag = prepared_list[xid] + if commit_flag is None: + print ' ', qls.utils.format_xid(xid), '- Assuming commit after prepare' + if txn_flag: + self.txn_map.commit(xid) + elif commit_flag: + print ' ', qls.utils.format_xid(xid), '- Completing interrupted commit operation' + if txn_flag: + self.txn_map.commit(xid) + else: + print ' ', qls.utils.format_xid(xid), '- Completing interrupted abort operation' + if txn_flag: + self.txn_map.abort(xid) + else: + print ' ', qls.utils.format_xid(xid), '- Ignoring, not in prepared transaction list' + if txn_flag: + self.txn_map.abort(xid) + def report(self, print_stats_flag): + print 'Journal "%s":' % self.queue_name + print '=' * (11 + len(self.queue_name)) + if print_stats_flag: + print str(self.statistics) + print self.enq_map.report_str(True, True) + print self.txn_map.report_str(True, True) + JournalFile.report_header() + for file_num in sorted(self.files.keys()): + self.files[file_num].report() + #TODO: move this to JournalFile, append to file info + if self.num_filler_records_required is not None and self.fill_to_offset is not None: + print '0x%x:0x%08x: %d filler records required for DBLK alignment to 0x%08x' % \ + (self.current_journal_file.file_header.file_num, self.last_record_offset, + self.num_filler_records_required, self.fill_to_offset) + print + #--- protected functions --- + def _analyze_files(self): + for dir_entry in os.listdir(self.directory): + dir_entry_bits = dir_entry.split('.') + if len(dir_entry_bits) == 2 and dir_entry_bits[1] == JournalRecoveryManager.JRNL_DIR_NAME: + fq_file_name = os.path.join(self.directory, dir_entry) + file_handle = open(fq_file_name) + args = qls.utils.load_args(file_handle, qls.jrnl.RecordHeader) + file_hdr = qls.jrnl.FileHeader(*args) + file_hdr.init(file_handle, *qls.utils.load_args(file_handle, qls.jrnl.FileHeader)) + if file_hdr.is_header_valid(file_hdr): + file_hdr.load(file_handle) + if file_hdr.is_valid(): + qls.utils.skip(file_handle, file_hdr.file_header_size_sblks * qls.utils.DEFAULT_SBLK_SIZE) + self.files[file_hdr.file_num] = JournalFile(file_hdr) + self.file_num_list = sorted(self.files.keys()) + self.file_num_itr = iter(self.file_num_list) + def _check_alignment(self): # TODO: Move into JournalFile + remaining_sblks = self.last_record_offset % qls.utils.DEFAULT_SBLK_SIZE + if remaining_sblks == 0: + self.num_filler_records_required = 0 + else: + self.num_filler_records_required = (qls.utils.DEFAULT_SBLK_SIZE - remaining_sblks) / \ + qls.utils.DEFAULT_DBLK_SIZE + self.fill_to_offset = self.last_record_offset + \ + (self.num_filler_records_required * qls.utils.DEFAULT_DBLK_SIZE) + if self.args.show_recs or self.args.show_all_recs: + print '0x%x:0x%08x: %d filler records required for DBLK alignment to 0x%08x' % \ + (self.current_journal_file.file_header.file_num, self.last_record_offset, + self.num_filler_records_required, self.fill_to_offset) + def _check_file(self): + if self.current_journal_file is not None: + if not self.current_journal_file.file_header.is_end_of_file(): + return True + if self.current_journal_file.file_header.is_end_of_file(): + self.last_record_offset = self.current_journal_file.file_header.file_handle.tell() + 
if not self._get_next_file(): + return False + fhdr = self.current_journal_file.file_header + fhdr.file_handle.seek(fhdr.first_record_offset) + return True + def _get_next_file(self): + if self.current_journal_file is not None: + file_handle = self.current_journal_file.file_header.file_handle + if not file_handle.closed: # sanity check, should not be necessary + file_handle.close() + file_num = 0 + try: + while file_num == 0: + file_num = self.file_num_itr.next() + except StopIteration: + pass + if file_num == 0: + return False + self.current_journal_file = self.files[file_num] + self.first_rec_flag = True + if self.args.show_recs or self.args.show_all_recs: + file_header = self.current_journal_file.file_header + print '0x%x:%s' % (file_header.file_num, file_header.to_string()) + return True + def _get_next_record(self, high_rid_counter): + if not self._check_file(): + return False + self.last_record_offset = self.current_journal_file.file_header.file_handle.tell() + this_record = qls.utils.load(self.current_journal_file.file_header.file_handle, qls.jrnl.RecordHeader) + if not this_record.is_header_valid(self.current_journal_file.file_header): + return False + if self.first_rec_flag: + if this_record.file_offset != self.current_journal_file.file_header.first_record_offset: + raise qls.err.FirstRecordOffsetMismatchError(self.current_journal_file.file_header, this_record) + self.first_rec_flag = False + self.statistics.total_record_count += 1 + start_journal_file = self.current_journal_file + if isinstance(this_record, qls.jrnl.EnqueueRecord): + ok_flag = self._handle_enqueue_record(this_record, start_journal_file) + high_rid_counter.check(this_record.record_id) + if self.args.show_recs or self.args.show_all_recs: + print '0x%x:%s' % (start_journal_file.file_header.file_num, \ + this_record.to_string(self.args.show_xids, self.args.show_data)) + elif isinstance(this_record, qls.jrnl.DequeueRecord): + ok_flag = self._handle_dequeue_record(this_record, start_journal_file) + high_rid_counter.check(this_record.record_id) + if self.args.show_recs or self.args.show_all_recs: + print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record.to_string(self.args.show_xids)) + elif isinstance(this_record, qls.jrnl.TransactionRecord): + ok_flag = self._handle_transaction_record(this_record, start_journal_file) + high_rid_counter.check(this_record.record_id) + if self.args.show_recs or self.args.show_all_recs: + print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record.to_string(self.args.show_xids)) + else: + self.statistics.filler_record_count += 1 + ok_flag = True + if self.args.show_all_recs: + print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record) + qls.utils.skip(self.current_journal_file.file_header.file_handle, qls.utils.DEFAULT_DBLK_SIZE) + return ok_flag + def _handle_enqueue_record(self, enqueue_record, start_journal_file): + while enqueue_record.load(self.current_journal_file.file_header.file_handle): + if not self._get_next_file(): + enqueue_record.truncated_flag = True + return False + if not enqueue_record.is_valid(start_journal_file): + return False + if enqueue_record.is_external() and enqueue_record.data != None: + raise qls.err.ExternalDataError(self.current_journal_file.file_header, enqueue_record) + if enqueue_record.is_transient(): + self.statistics.transient_record_count += 1 + return True + if enqueue_record.xid_size > 0: + self.txn_map.add(start_journal_file, enqueue_record) + self.statistics.transaction_operation_count += 1 + 
self.statistics.transaction_record_count += 1 + self.statistics.transaction_enqueue_count += 1 + else: + self.enq_map.add(start_journal_file, enqueue_record, False) + start_journal_file.incr_enq_cnt() + self.statistics.enqueue_count += 1 + return True + def _handle_dequeue_record(self, dequeue_record, start_journal_file): + while dequeue_record.load(self.current_journal_file.file_header.file_handle): + if not self._get_next_file(): + dequeue_record.truncated_flag = True + return False + if not dequeue_record.is_valid(start_journal_file): + return False + if dequeue_record.xid_size > 0: + if self.xid_prepared_list is None: # ie this is the TPL + dequeue_record.transaction_prepared_list_flag = True + elif not self.enq_map.contains(dequeue_record.dequeue_record_id): + dequeue_record.warnings.append('NOT IN EMAP') # Only for non-TPL records + self.txn_map.add(start_journal_file, dequeue_record) + self.statistics.transaction_operation_count += 1 + self.statistics.transaction_record_count += 1 + self.statistics.transaction_dequeue_count += 1 + else: + try: + self.enq_map.delete(start_journal_file, dequeue_record)[0].decr_enq_cnt(dequeue_record) + except qls.err.RecordIdNotFoundError: + dequeue_record.warnings.append('NOT IN EMAP') + self.statistics.dequeue_count += 1 + return True + def _handle_transaction_record(self, transaction_record, start_journal_file): + while transaction_record.load(self.current_journal_file.file_header.file_handle): + if not self._get_next_file(): + transaction_record.truncated_flag = True + return False + if not transaction_record.is_valid(start_journal_file): + return False + if transaction_record.magic[-1] == 'a': + self.statistics.transaction_abort_count += 1 + else: + self.statistics.transaction_commit_count += 1 + if self.txn_map.contains(transaction_record.xid): + self.txn_map.delete(self.current_journal_file, transaction_record) + else: + transaction_record.warnings.append('NOT IN TMAP') +# if transaction_record.magic[-1] == 'c': # commits only +# self._txn_obj_list[hdr.xid] = hdr + self.statistics.transaction_record_count += 1 + return True + def _load_data(self, record): + while not record.is_complete: + record.load(self.current_journal_file.file_handle) + +class JournalFile(object): + def __init__(self, file_header): + self.file_header = file_header + self.enq_cnt = 0 + self.deq_cnt = 0 + self.num_filler_records_required = None + def incr_enq_cnt(self): + self.enq_cnt += 1 + def decr_enq_cnt(self, record): + if self.enq_cnt <= self.deq_cnt: + raise qls.err.EnqueueCountUnderflowError(self.file_header, record) + self.deq_cnt += 1 + def get_enq_cnt(self): + return self.enq_cnt - self.deq_cnt + def is_outstanding_enq(self): + return self.enq_cnt > self.deq_cnt + @staticmethod + def report_header(): + print 'file_num enq_cnt p_no efp journal_file' + print '-------- ------- ---- ----- ------------' + def report(self): + comment = '' if self.file_header.file_num == 0 else '' + file_num_str = '0x%x' % self.file_header.file_num + print '%8s %7d %4d %4dk %s %s' % (file_num_str, self.get_enq_cnt(), self.file_header.partition_num, + self.file_header.efp_data_size_kb, + os.path.basename(self.file_header.file_handle.name), comment) diff --git a/qpid/tools/src/py/qls/efp.py b/qpid/tools/src/py/qls/efp.py index abf289dc12..93b77eea93 100644 --- a/qpid/tools/src/py/qls/efp.py +++ b/qpid/tools/src/py/qls/efp.py @@ -17,38 +17,137 @@ # under the License. # +""" +Module: qls.efp + +Contains empty file pool (EFP) classes. 
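+
+On disk an EFP has this layout (using the names defined by the classes below):
+
+    <qls-dir>/<partition>/efp/<data_size>k/<file>.jrnl
+
+where <partition> is an EfpPartition directory named 'p' plus a partition
+number (e.g. p001), '<data_size>k' is an EmptyFilePool directory named for the
+data size in KiB of its files, and each '.jrnl' file is one empty journal file.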
+""" + import os import os.path import qls.err +import shutil +import uuid class EfpManager(object): """ Top level class to analyze the Qpid Linear Store (QLS) directory for the partitions that make up the Empty File Pool (EFP). """ - def __init__(self, directory, args): + def __init__(self, directory, disk_space_required_kb): if not os.path.exists(directory): raise qls.err.InvalidQlsDirectoryNameError(directory) self.directory = directory - self.args = args - self.partitions = [] + self.disk_space_required_kb = disk_space_required_kb + self.efp_partitions = [] + self.efp_pools = {} + self.total_num_files = 0 + self.total_cum_file_size_kb = 0 + self.current_efp_partition = None + def add_file_pool(self, file_size_kb, num_files): + """ Add an EFP in the specified partition of the specified size containing the specified number of files """ + dir_name = EmptyFilePool.get_directory_name(file_size_kb) + print 'Adding pool \'%s\' to partition %s' % (dir_name, self.current_efp_partition.partition_number) + self.total_cum_file_size_kb += self.current_efp_partition.create_new_efp(file_size_kb, num_files) + self.total_num_files += num_files + def freshen_file_pool(self, file_size_kb, num_files): + """ Freshen an EFP in the specified partition and of the specified size to the specified number of files """ + if self.current_efp_partition is None: + partition_list = self.efp_partitions + partition_str = 'all partitions' + else: + partition_list = [self.current_efp_partition] + partition_str = 'partition %d' % self.current_efp_partition.partition_number + if file_size_kb is None: + pool_str = 'all pools' + else: + pool_str = 'pool \'%s\'' % EmptyFilePool.get_directory_name(int(file_size_kb)) + print 'Freshening %s in %s to %d files' % (pool_str, partition_str, num_files) + for self.current_efp_partition in partition_list: # Partition objects + if file_size_kb is None: + file_size_list = self.current_efp_partition.efp_pools.keys() + else: + file_size_list = ['%sk' % file_size_kb] + for file_size in file_size_list: + efp = self.current_efp_partition.efp_pools[file_size] + num_files_needed = num_files - efp.get_tot_file_count() + if num_files_needed > 0: + self.current_efp_partition.create_new_efp_files(qls.utils.efp_directory_size(file_size), + num_files_needed) + else: + print ' WARNING: Pool %s in partition %s already contains %d files: no action taken' % \ + (self.current_efp_partition.efp_pools[file_size].size_str, + self.current_efp_partition.partition_number, efp.get_num_files()) + def remove_file_pool(self, file_size_kb): + """ Remove an existing EFP from the specified partition and of the specified size """ + dir_name = EmptyFilePool.get_directory_name(file_size_kb) + print 'Removing pool \'%s\' from partition %s' % (dir_name, self.current_efp_partition.partition_number) + self.efp_partitions.remove(self.current_efp_partition) + shutil.rmtree(os.path.join(self.current_efp_partition.efp_directory, dir_name)) def report(self): - print 'Found', len(self.partitions), 'partition(s).' - if (len(self.partitions)) > 0: + print 'Empty File Pool (EFP) report:' + print '=============================' + print 'Found', len(self.efp_partitions), 'partition(s).' 
+ if (len(self.efp_partitions)) > 0: EfpPartition.print_report_table_header() - for ptn in self.partitions: + for ptn in self.efp_partitions: ptn.print_report_table_line() print - for ptn in self.partitions: + for ptn in self.efp_partitions: ptn.report() - def run(self, _): + def run(self, arg_tup): + self._analyze_efp() + if arg_tup is not None: + _, arg_file_size, arg_num_files, arg_add, arg_remove, arg_freshen, arg_list = arg_tup + self._check_args(arg_tup) + if arg_add: + self.add_file_pool(int(arg_file_size), int(arg_num_files)) + if arg_remove: + self.remove_file_pool(int(arg_file_size)) + if arg_freshen: + self.freshen_file_pool(arg_file_size, int(arg_num_files)) + if arg_list: + self.report() + def _analyze_efp(self): for dir_entry in os.listdir(self.directory): try: - efpp = EfpPartition(os.path.join(self.directory, dir_entry)) - efpp.scan() - self.partitions.append(efpp) + efp_partition = EfpPartition(os.path.join(self.directory, dir_entry), self.disk_space_required_kb) + efp_partition.scan() + self.efp_partitions.append(efp_partition) + for efpl in efp_partition.efp_pools.iterkeys(): + if efpl not in self.efp_pools: + self.efp_pools[efpl] = [] + self.efp_pools[efpl].append(efp_partition.efp_pools[efpl]) + self.total_num_files += efp_partition.tot_file_count + self.total_cum_file_size_kb += efp_partition.tot_file_size_kb except qls.err.InvalidPartitionDirectoryNameError: pass + def _check_args(self, arg_tup): + """ Value check of args. The names of partitions and pools are validated against the discovered instances """ + arg_partition, arg_file_size, _, arg_add, arg_remove, arg_freshen, _ = arg_tup + if arg_partition is not None: + try: + if arg_partition[0] == 'p': # string partition name, eg 'p001' + partition_num = int(arg_partition[1:]) + else: # numeric partition, eg '1' + partition_num = int(arg_partition) + found = False + for partition in self.efp_partitions: + if partition.partition_number == partition_num: + self.current_efp_partition = partition + found = True + break + if not found: + raise qls.err.PartitionDoesNotExistError(arg_partition) + except ValueError: + raise qls.err.InvalidPartitionDirectoryNameError(arg_partition) + if self.current_efp_partition is not None: + pool_list = self.current_efp_partition.efp_pools.keys() + efp_directory_name = EmptyFilePool.get_directory_name(int(arg_file_size)) + if arg_add and efp_directory_name in pool_list: + raise qls.err.PoolDirectoryAlreadyExistsError(efp_directory_name) + if (arg_remove or arg_freshen) and efp_directory_name not in pool_list: + raise qls.err.PoolDirectoryDoesNotExistError(efp_directory_name) class EfpPartition(object): """ @@ -56,55 +155,59 @@ class EfpPartition(object): """ PTN_DIR_PREFIX = 'p' EFP_DIR_NAME = 'efp' - def __init__(self, directory): - self.base_dir = os.path.basename(directory) - if self.base_dir[0] is not EfpPartition.PTN_DIR_PREFIX: - raise qls.err.InvalidPartitionDirectoryNameError(directory) - try: - self.partition_number = int(self.base_dir[1:]) - except ValueError: - raise qls.err.InvalidPartitionDirectoryNameError(directory) + def __init__(self, directory, disk_space_required_kb): self.directory = directory - self.pools = [] - self.efp_count = 0 + self.partition_number = None + self.efp_pools = {} self.tot_file_count = 0 self.tot_file_size_kb = 0 - def get_directory(self): - return self.directory - def get_efp_count(self): - return self.efp_count - def get_name(self): - return self.base_dir - def get_number(self): - return self.partition_number - def get_number_pools(self): - return 
len(self.pools) - def get_tot_file_count(self): - return self.tot_file_count - def get_tot_file_size_kb(self): - return self.tot_file_size_kb + self._validate_partition_directory(disk_space_required_kb) + def create_new_efp_files(self, file_size_kb, num_files): + """ Create new EFP files in this partition """ + dir_name = EmptyFilePool.get_directory_name(file_size_kb) + if dir_name in self.efp_pools.keys(): + efp = self.efp_pools[dir_name] + else: + efp = EmptyFilePool(os.path.join(self.directory, EfpPartition.EFP_DIR_NAME), dir_name) + this_tot_file_size_kb = efp.create_new_efp_files(num_files) + self.tot_file_size_kb += this_tot_file_size_kb + self.tot_file_count += num_files + return this_tot_file_size_kb @staticmethod def print_report_table_header(): print 'p_no no_efp tot_files tot_size_kb directory' print '---- ------ --------- ----------- ---------' def print_report_table_line(self): - print '%4d %6d %9d %11d %s' % (self.get_number(), self.get_efp_count(), self.get_tot_file_count(), - self.get_tot_file_size_kb(), self.get_directory()) + print '%4d %6d %9d %11d %s' % (self.partition_number, len(self.efp_pools), self.tot_file_count, + self.tot_file_size_kb, self.directory) def report(self): - print 'Partition %s:' % self.base_dir + print 'Partition %s:' % os.path.basename(self.directory) EmptyFilePool.print_report_table_header() - for pool in self.pools: - pool.print_report_table_line() + for dir_name in self.efp_pools.keys(): + self.efp_pools[dir_name].print_report_table_line() print def scan(self): if os.path.exists(self.directory): efp_dir = os.path.join(self.directory, EfpPartition.EFP_DIR_NAME) for dir_entry in os.listdir(efp_dir): - efp = EmptyFilePool(os.path.join(efp_dir, dir_entry)) - self.efp_count += 1 + efp = EmptyFilePool(os.path.join(efp_dir, dir_entry), self.partition_number) + efp.scan() self.tot_file_count += efp.get_tot_file_count() self.tot_file_size_kb += efp.get_tot_file_size_kb() - self.pools.append(efp) + self.efp_pools[dir_entry] = efp + def _validate_partition_directory(self, disk_space_required_kb): + if os.path.basename(self.directory)[0] is not EfpPartition.PTN_DIR_PREFIX: + raise qls.err.InvalidPartitionDirectoryNameError(self.directory) + try: + self.partition_number = int(os.path.basename(self.directory)[1:]) + except ValueError: + raise qls.err.InvalidPartitionDirectoryNameError(self.directory) + if not qls.utils.has_write_permission(self.directory): + raise qls.err.WritePermissionError(self.directory) + if disk_space_required_kb is not None: + space_avail = qls.utils.get_avail_disk_space(self.directory) + if space_avail < (disk_space_required_kb * 1024): + raise qls.err.InsufficientSpaceOnDiskError(self.directory, space_avail, disk_space_required_kb * 1024) class EmptyFilePool(object): """ @@ -112,33 +215,89 @@ class EmptyFilePool(object): journal files (but it may also be empty). 
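+    The directory name encodes the data size of the files it holds, e.g. a pool
+    directory named '2048k' contains files with 2048 KiB of data space each;
+    on disk each file is that size plus one sblk-sized file header
+    (see _validate_efp_file() below).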
""" EFP_DIR_SUFFIX = 'k' - def __init__(self, directory): - self.base_dir = os.path.basename(directory) - if self.base_dir[-1] is not EmptyFilePool.EFP_DIR_SUFFIX: - raise qls.err.InvalidEfpDirectoryNameError(directory) - try: - self.data_size_kb = int(os.path.basename(self.base_dir)[:-1]) - except ValueError: - raise qls.err.InvalidEfpDirectoryNameError(directory) + EFP_JRNL_EXTENTION = '.jrnl' + def __init__(self, directory, partition_number): + self.base_dir_name = os.path.basename(directory) self.directory = directory - self.files = os.listdir(directory) + self.partition_number = partition_number + self.data_size_kb = None + self.files = [] + self.tot_file_size_kb = 0 + self._validate_efp_directory() + def create_new_efp_files(self, num_files): + """ Create one or more new empty journal files of the prescribed size for this EFP """ + this_total_file_size = 0 + for _ in range(num_files): + this_total_file_size += self._create_new_efp_file() + return this_total_file_size def get_directory(self): return self.directory - def get_file_data_size_kb(self): - return self.data_size_kb - def get_name(self): - return self.base_dir + @staticmethod + def get_directory_name(file_size_kb): + """ Static function to create an EFP directory name from the size of the files it contains """ + return '%dk' % file_size_kb def get_tot_file_count(self): return len(self.files) def get_tot_file_size_kb(self): return self.data_size_kb * len(self.files) @staticmethod def print_report_table_header(): - print 'file_size_kb file_count tot_file_size_kb efp_directory' + print 'data_size_kb file_count tot_file_size_kb efp_directory' print '------------ ---------- ---------------- -------------' def print_report_table_line(self): - print '%12d %10d %16d %s' % (self.get_file_data_size_kb(), self.get_tot_file_count(), + print '%12d %10d %16d %s' % (self.data_size_kb, self.get_tot_file_count(), self.get_tot_file_size_kb(), self.get_directory()) + def scan(self): + for efp_file in os.listdir(self.directory): + if self._validate_efp_file(os.path.join(self.directory, efp_file)): + self.files.append(efp_file) + def _add_efp_file(self, efp_file_name): + """ Add a single journal file of the appropriate size to this EFP. No file size check is made here. 
""" + self.files.append(efp_file_name) + self.tot_file_size_kb += os.path.getsize(efp_file_name) + def _create_new_efp_file(self): + """ Create a single new empty journal file of the prescribed size for this EFP """ + file_name = str(uuid.uuid4()) + EmptyFilePool.EFP_JRNL_EXTENTION + file_header = qls.jrnl.FileHeader(0, qls.jrnl.FileHeader.MAGIC, qls.utils.DEFAULT_RECORD_VERSION, 0, 0, 0) + file_header.init(None, None, qls.utils.DEFAULT_HEADER_SIZE_SBLKS, self.partition_number, self.data_size_kb, + 0, 0, 0, 0, 0) + efh = file_header.encode() + efh_bytes = len(efh) + file_handle = open(os.path.join(self.directory, file_name), 'wb') + file_handle.write(efh) + file_handle.write('\xff' * (qls.utils.DEFAULT_SBLK_SIZE - efh_bytes)) + file_handle.write('\x00' * (int(self.data_size_kb) * 1024)) + file_handle.close() + fqfn = os.path.join(self.directory, file_name) + self._add_efp_file(fqfn) + return os.path.getsize(fqfn) + def _validate_efp_directory(self): + if self.base_dir_name[-1] is not EmptyFilePool.EFP_DIR_SUFFIX: + raise qls.err.InvalidEfpDirectoryNameError(self.directory) + try: + self.data_size_kb = int(os.path.basename(self.base_dir_name)[:-1]) + except ValueError: + raise qls.err.InvalidEfpDirectoryNameError(self.directory) + def _validate_efp_file(self, efp_file): + file_size = os.path.getsize(efp_file) + expected_file_size = (self.data_size_kb * 1024) + qls.utils.DEFAULT_SBLK_SIZE + if file_size != expected_file_size: + print 'WARNING: File %s not of correct size (size=%d, expected=%d): Ignoring' % (efp_file, file_size, + expected_file_size) + return False + file_handle = open(efp_file) + args = qls.utils.load_args(file_handle, qls.jrnl.RecordHeader) + file_hdr = qls.jrnl.FileHeader(*args) + file_hdr.init(file_handle, *qls.utils.load_args(file_handle, qls.jrnl.FileHeader)) + if not file_hdr.is_header_valid(file_hdr): + file_handle.close() + return False + file_hdr.load(file_handle) + file_handle.close() + if not file_hdr.is_valid(): + return False + return True + # ============================================================================= diff --git a/qpid/tools/src/py/qls/err.py b/qpid/tools/src/py/qls/err.py index 702fbb9520..bceaf041c9 100644 --- a/qpid/tools/src/py/qls/err.py +++ b/qpid/tools/src/py/qls/err.py @@ -17,12 +17,20 @@ # under the License. # +""" +Module: qls.err + +Contains error classes. 
+""" + # --- Parent classes class QlsError(Exception): """Base error class for QLS errors and exceptions""" def __init__(self): Exception.__init__(self) + def __str__(self): + return '' class QlsRecordError(QlsError): """Base error class for individual records""" @@ -92,10 +100,22 @@ class FirstRecordOffsetMismatchError(QlsRecordError): def __str__(self): return 'First record offset mismatch: ' + QlsRecordError.__str__(self) + ' expected_offset=0x%x' % \ self.file_header.first_record_offset - + +class InsufficientSpaceOnDiskError(QlsError): + """Insufficient space on disk""" + def __init__(self, directory, space_avail, space_requried): + QlsError.__init__(self) + self.directory = directory + self.space_avail = space_avail + self.space_required = space_requried + def __str__(self): + return 'Insufficient space on disk: directory=%s; avail_space=%d required_space=%d' % \ + (self.directory, self.space_avail, self.space_required) + class InvalidClassError(QlsError): """Invalid class name or type""" def __init__(self, class_name): + QlsError.__init__(self) self.class_name = class_name def __str__(self): return 'Invalid class name "%s"' % self.class_name @@ -108,6 +128,14 @@ class InvalidEfpDirectoryNameError(QlsError): def __str__(self): return 'Invalid EFP directory name "%s"' % self.directory_name +#class InvalidFileSizeString(QlsError): +# """Invalid file size string""" +# def __init__(self, file_size_string): +# QlsError.__init__(self) +# self.file_size_string = file_size_string +# def __str__(self): +# return 'Invalid file size string "%s"' % self.file_size_string + class InvalidPartitionDirectoryNameError(QlsError): """Invalid EFP partition name - should be pNNN, where NNN is a 3-digit partition number""" def __init__(self, directory_name): @@ -158,6 +186,30 @@ class NonTransactionalRecordError(QlsRecordError): return 'Transactional operation on non-transactional record: ' + QlsRecordError.__str__() + \ ' operation=%s' % self.operation +class PartitionDoesNotExistError(QlsError): + """Partition name does not exist on disk""" + def __init__(self, partition_directory): + QlsError.__init__(self) + self.partition_directory = partition_directory + def __str__(self): + return 'Partition %s does not exist' % self.partition_directory + +class PoolDirectoryAlreadyExistsError(QlsError): + """Pool directory already exists""" + def __init__(self, pool_directory): + QlsError.__init__(self) + self.pool_directory = pool_directory + def __str__(self): + return 'Pool directory %s already exists' % self.pool_directory + +class PoolDirectoryDoesNotExistError(QlsError): + """Pool directory does not exist""" + def __init__(self, pool_directory): + QlsError.__init__(self) + self.pool_directory = pool_directory + def __str__(self): + return 'Pool directory %s does not exist' % self.pool_directory + class RecordIdNotFoundError(QlsRecordError): """Record Id not found in enqueue map""" def __init__(self, file_header, record): @@ -184,6 +236,14 @@ class UnexpectedEndOfFileError(QlsError): return 'Tried to read %d at offset %d in file "%s"; only read %d' % \ (self.size_read, self.file_offset, self.file_name, self.size_expected) +class WritePermissionError(QlsError): + """No write permission""" + def __init__(self, directory): + QlsError.__init__(self) + self.directory = directory + def __str__(self): + return 'No write permission in directory %s' % self.directory + class XidSizeError(QlsError): """Error class for Xid size mismatch""" def __init__(self, expected_size, actual_size, xid_str): diff --git 
a/qpid/tools/src/py/qls/jrnl.py b/qpid/tools/src/py/qls/jrnl.py index 5bce78bfad..f4fb16ef9f 100644 --- a/qpid/tools/src/py/qls/jrnl.py +++ b/qpid/tools/src/py/qls/jrnl.py @@ -17,567 +17,17 @@ # under the License. # -import os -import os.path +""" +Module: qls.jrnl + +Contains journal record classes. +""" + import qls.err +import qls.utils import string import struct -from time import gmtime, strftime -import zlib - -class HighCounter(object): - def __init__(self): - self.num = 0 - def check(self, num): - if self.num < num: - self.num = num - def get(self): - return self.num - def get_next(self): - self.num += 1 - return self.num - -class JournalRecoveryManager(object): - TPL_DIR_NAME = 'tpl' - JRNL_DIR_NAME = 'jrnl' - def __init__(self, directory, args): - if not os.path.exists(directory): - raise qls.err.InvalidQlsDirectoryNameError(directory) - self.directory = directory - self.args = args - self.tpl = None - self.journals = {} - self.high_rid_counter = HighCounter() - def report(self, print_stats_flag): - if self.tpl is not None: - self.tpl.report(print_stats_flag) - for queue_name in sorted(self.journals.keys()): - self.journals[queue_name].report(print_stats_flag) - def run(self, args): - tpl_dir = os.path.join(self.directory, JournalRecoveryManager.TPL_DIR_NAME) - if os.path.exists(tpl_dir): - self.tpl = Journal(tpl_dir, None, self.args) - self.tpl.recover(self.high_rid_counter) - print - jrnl_dir = os.path.join(self.directory, JournalRecoveryManager.JRNL_DIR_NAME) - prepared_list = self.tpl.txn_map.get_prepared_list() if self.tpl is not None else {} - if os.path.exists(jrnl_dir): - for dir_entry in sorted(os.listdir(jrnl_dir)): - jrnl = Journal(os.path.join(jrnl_dir, dir_entry), prepared_list, self.args) - jrnl.recover(self.high_rid_counter) - self.journals[jrnl.get_queue_name()] = jrnl - print - self._reconcile_transactions(prepared_list, args.txn) - def _reconcile_transactions(self, prepared_list, txn_flag): - print 'Transaction reconciliation report:' - print '==================================' - print len(prepared_list), 'open transaction(s) found in prepared transaction list:' - for xid in prepared_list.keys(): - commit_flag = prepared_list[xid] - if commit_flag is None: - status = '[Prepared, neither committed nor aborted - assuming commit]' - elif commit_flag: - status = '[Prepared, but interrupted during commit phase]' - else: - status = '[Prepared, but interrupted during abort phase]' - print ' ', Utils.format_xid(xid), status - if prepared_list[xid] is None: # Prepared, but not committed or aborted - enqueue_record = self.tpl.get_txn_map_record(xid)[0][1] - dequeue_record = Utils.create_record('QLSd', DequeueRecord.TXN_COMPLETE_COMMIT_FLAG, \ - self.tpl.current_journal_file, self.high_rid_counter.get_next(), \ - enqueue_record.record_id, xid, None) - if txn_flag: - self.tpl.add_record(dequeue_record) - for queue_name in sorted(self.journals.keys()): - self.journals[queue_name].reconcile_transactions(prepared_list, txn_flag) - if len(prepared_list) > 0: - print 'Completing prepared transactions in prepared transaction list:' - for xid in prepared_list.keys(): - print ' ', Utils.format_xid(xid) - transaction_record = Utils.create_record('QLSc', 0, self.tpl.current_journal_file, \ - self.high_rid_counter.get_next(), None, xid, None) - if txn_flag: - self.tpl.add_record(transaction_record) - print - -class EnqueueMap(object): - """ - Map of enqueued records in a QLS journal - """ - def __init__(self, journal): - self.journal = journal - self.enq_map = {} - def add(self, 
journal_file, enq_record, locked_flag): - if enq_record.record_id in self.enq_map: - raise qls.err.DuplicateRecordIdError(self.journal.current_file_header, enq_record) - self.enq_map[enq_record.record_id] = [journal_file, enq_record, locked_flag] - def contains(self, rid): - """Return True if the map contains the given rid""" - return rid in self.enq_map - def delete(self, journal_file, deq_record): - if deq_record.dequeue_record_id in self.enq_map: - enq_list = self.enq_map[deq_record.dequeue_record_id] - del self.enq_map[deq_record.dequeue_record_id] - return enq_list - else: - raise qls.err.RecordIdNotFoundError(journal_file.file_header, deq_record) - def get(self, record_id): - if record_id in self.enq_map: - return self.enq_map[record_id] - return None - def lock(self, journal_file, dequeue_record): - if dequeue_record.dequeue_record_id not in self.enq_map: - raise qls.err.RecordIdNotFoundError(journal_file.file_header, dequeue_record) - self.enq_map[dequeue_record.dequeue_record_id][2] = True - def report_str(self, _, show_records): - """Return a string containing a text report for all records in the map""" - if len(self.enq_map) == 0: - return 'No enqueued records found.' - rstr = '%d enqueued records found' % len(self.enq_map) - if show_records: - rstr += ":" - rid_list = self.enq_map.keys() - rid_list.sort() - for rid in rid_list: - journal_file, record, locked_flag = self.enq_map[rid] - if locked_flag: - lock_str = '[LOCKED]' - else: - lock_str = '' - rstr += '\n 0x%x:%s %s' % (journal_file.file_header.file_num, record, lock_str) - else: - rstr += '.' - return rstr - def unlock(self, journal_file, dequeue_record): - """Set the transaction lock for a given record_id to False""" - if dequeue_record.dequeue_record_id in self.enq_map: - if self.enq_map[dequeue_record.dequeue_record_id][2]: - self.enq_map[dequeue_record.dequeue_record_id][2] = False - else: - raise qls.err.RecordNotLockedError(journal_file.file_header, dequeue_record) - else: - raise qls.err.RecordIdNotFoundError(journal_file.file_header, dequeue_record) - -class TransactionMap(object): - """ - Map of open transactions used while recovering a QLS journal - """ - def __init__(self, enq_map): - self.txn_map = {} - self.enq_map = enq_map - def abort(self, xid): - """Perform an abort operation for the given xid record""" - for journal_file, record, _ in self.txn_map[xid]: - if isinstance(record, DequeueRecord): - if self.enq_map.contains(record.dequeue_record_id): - self.enq_map.unlock(journal_file, record) - else: - journal_file.decr_enq_cnt(record) - del self.txn_map[xid] - def add(self, journal_file, record): - if record.xid is None: - raise qls.err.NonTransactionalRecordError(journal_file.file_header, record, 'TransactionMap.add()') - if isinstance(record, DequeueRecord): - try: - self.enq_map.lock(journal_file, record) - except qls.err.RecordIdNotFoundError: - # Not in emap, look for rid in tmap - should not happen in practice - txn_op = self._find_record_id(record.xid, record.dequeue_record_id) - if txn_op != None: - if txn_op[2]: - raise qls.err.AlreadyLockedError(journal_file.file_header, record) - txn_op[2] = True - if record.xid in self.txn_map: - self.txn_map[record.xid].append([journal_file, record, False]) # append to existing list - else: - self.txn_map[record.xid] = [[journal_file, record, False]] # create new list - def commit(self, xid): - """Perform a commit operation for the given xid record""" - mismatch_list = [] - for journal_file, record, lock in self.txn_map[xid]: - if isinstance(record, 
EnqueueRecord): - self.enq_map.add(journal_file, record, lock) # Transfer enq to emap - else: - if self.enq_map.contains(record.dequeue_record_id): - self.enq_map.unlock(journal_file, record) - self.enq_map.delete(journal_file, record)[0].decr_enq_cnt(record) - else: - mismatch_list.append('0x%x' % record.dequeue_record_id) - del self.txn_map[xid] - return mismatch_list - def contains(self, xid): - """Return True if the xid exists in the map; False otherwise""" - return xid in self.txn_map - def delete(self, journal_file, transaction_record): - """Remove a transaction record from the map using either a commit or abort header""" - if transaction_record.magic[-1] == 'c': - return self.commit(transaction_record.xid) - if transaction_record.magic[-1] == 'a': - self.abort(transaction_record.xid) - else: - raise qls.err.InvalidRecordTypeError(journal_file.file_header, transaction_record, - 'delete from Transaction Map') - def get(self, xid): - if xid in self.txn_map: - return self.txn_map[xid] - return None - def get_prepared_list(self): - """ - Prepared list is a map of xid(key) to one of None, True or False. These represent respectively: - None: prepared, but neither committed or aborted (interrupted before commit or abort) - False: prepared and aborted (interrupted before abort complete) - True: prepared and committed (interrupted before commit complete) - """ - prepared_list = {} - for xid in self.get_xid_list(): - for _, record, _ in self.txn_map[xid]: - if isinstance(record, EnqueueRecord): - prepared_list[xid] = None - else: - prepared_list[xid] = record.is_transaction_complete_commit() - return prepared_list - def get_xid_list(self): - return self.txn_map.keys() - def report_str(self, _, show_records): - """Return a string containing a text report for all records in the map""" - if len(self.txn_map) == 0: - return 'No outstanding transactions found.' - rstr = '%d outstanding transaction(s)' % len(self.txn_map) - if show_records: - rstr += ':' - for xid, op_list in self.txn_map.iteritems(): - rstr += '\n %s containing %d operations:' % (Utils.format_xid(xid), len(op_list)) - for journal_file, record, _ in op_list: - rstr += '\n 0x%x:%s' % (journal_file.file_header.file_num, record) - else: - rstr += '.' 
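#
# An illustrative sketch (not part of the patch; helper name hypothetical) of the
# recovery policy encoded by the prepared list above: each xid maps to None, True
# or False, and reconciliation completes the interrupted operation accordingly.
#
# def reconcile(prepared_list, txn_map):
#     for xid, commit_flag in prepared_list.items():
#         if commit_flag is None:    # prepared, neither committed nor aborted
#             txn_map.commit(xid)    # assume commit after prepare
#         elif commit_flag:          # interrupted during commit phase
#             txn_map.commit(xid)    # complete the commit
#         else:                      # interrupted during abort phase
#             txn_map.abort(xid)     # complete the abort
#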
- return rstr - def _find_record_id(self, xid, record_id): - """ Search for and return map list with supplied rid.""" - if xid in self.txn_map: - for txn_op in self.txn_map[xid]: - if txn_op[1].record_id == record_id: - return txn_op - for this_xid in self.txn_map.iterkeys(): - for txn_op in self.txn_map[this_xid]: - if txn_op[1].record_id == record_id: - return txn_op - return None - -class JournalStatistics(object): - """Journal statistics""" - def __init__(self): - self.total_record_count = 0 - self.transient_record_count = 0 - self.filler_record_count = 0 - self.enqueue_count = 0 - self.dequeue_count = 0 - self.transaction_record_count = 0 - self.transaction_enqueue_count = 0 - self.transaction_dequeue_count = 0 - self.transaction_commit_count = 0 - self.transaction_abort_count = 0 - self.transaction_operation_count = 0 - def __str__(self): - fstr = 'Total record count: %d\n' + \ - 'Transient record count: %d\n' + \ - 'Filler_record_count: %d\n' + \ - 'Enqueue_count: %d\n' + \ - 'Dequeue_count: %d\n' + \ - 'Transaction_record_count: %d\n' + \ - 'Transaction_enqueue_count: %d\n' + \ - 'Transaction_dequeue_count: %d\n' + \ - 'Transaction_commit_count: %d\n' + \ - 'Transaction_abort_count: %d\n' + \ - 'Transaction_operation_count: %d\n' - return fstr % (self.total_record_count, - self.transient_record_count, - self.filler_record_count, - self.enqueue_count, - self.dequeue_count, - self.transaction_record_count, - self.transaction_enqueue_count, - self.transaction_dequeue_count, - self.transaction_commit_count, - self.transaction_abort_count, - self.transaction_operation_count) - -class Journal(object): - """ - Instance of a Qpid Linear Store (QLS) journal. - """ - def __init__(self, directory, xid_prepared_list, args): - self.directory = directory - self.queue_name = os.path.basename(directory) - self.files = {} - self.file_num_list = None - self.file_num_itr = None - self.enq_map = EnqueueMap(self) - self.txn_map = TransactionMap(self.enq_map) - self.current_journal_file = None - self.first_rec_flag = None - self.statistics = JournalStatistics() - self.xid_prepared_list = xid_prepared_list # This is None for the TPL instance only - self.args = args - self.last_record_offset = None # TODO: Move into JournalFile - self.num_filler_records_required = None # TODO: Move into JournalFile - self.fill_to_offset = None - def add_record(self, record): - if isinstance(record, EnqueueRecord) or isinstance(record, DequeueRecord): - if record.xid_size > 0: - self.txn_map.add(self.current_journal_file, record) - else: - self.enq_map.add(self.current_journal_file, record, False) - elif isinstance(record, TransactionRecord): - self.txn_map.delete(self.current_journal_file, record) - else: - raise qls.err.InvalidRecordTypeError(self.current_journal_file, record, 'add to Journal') - def get_enq_map_record(self, rid): - return self.enq_map.get(rid) - def get_txn_map_record(self, xid): - return self.txn_map.get(xid) - def get_outstanding_txn_list(self): - return self.txn_map.get_xid_list() - def get_queue_name(self): - return self.queue_name - def recover(self, high_rid_counter): - print 'Recovering %s' % self.queue_name - self._analyze_files() - try: - while self._get_next_record(high_rid_counter): - pass - self._check_alignment() - except qls.err.NoMoreFilesInJournalError: - print 'No more files in journal' - except qls.err.FirstRecordOffsetMismatchError as err: - print '0x%08x: **** FRO ERROR: queue=\"%s\" fid=0x%x fro actual=0x%08x expected=0x%08x' % \ - (err.get_expected_fro(), err.get_queue_name(), 
err.get_file_number(), err.get_record_offset(), - err.get_expected_fro()) - def reconcile_transactions(self, prepared_list, txn_flag): - xid_list = self.txn_map.get_xid_list() - if len(xid_list) > 0: - print self.queue_name, 'contains', len(xid_list), 'open transaction(s):' - for xid in xid_list: - if xid in prepared_list.keys(): - commit_flag = prepared_list[xid] - if commit_flag is None: - print ' ', Utils.format_xid(xid), '- Assuming commit after prepare' - if txn_flag: - self.txn_map.commit(xid) - elif commit_flag: - print ' ', Utils.format_xid(xid), '- Completing interrupted commit operation' - if txn_flag: - self.txn_map.commit(xid) - else: - print ' ', Utils.format_xid(xid), '- Completing interrupted abort operation' - if txn_flag: - self.txn_map.abort(xid) - else: - print ' ', Utils.format_xid(xid), '- Ignoring, not in prepared transaction list' - if txn_flag: - self.txn_map.abort(xid) - def report(self, print_stats_flag): - print 'Journal "%s":' % self.queue_name - print '=' * (11 + len(self.queue_name)) - if print_stats_flag: - print str(self.statistics) - print self.enq_map.report_str(True, True) - print self.txn_map.report_str(True, True) - JournalFile.report_header() - for file_num in sorted(self.files.keys()): - self.files[file_num].report() - #TODO: move this to JournalFile, append to file info - if self.num_filler_records_required is not None and self.fill_to_offset is not None: - print '0x%x:0x%08x: %d filler records required for DBLK alignment to 0x%08x' % \ - (self.current_journal_file.file_header.file_num, self.last_record_offset, - self.num_filler_records_required, self.fill_to_offset) - print - #--- protected functions --- - def _analyze_files(self): - for dir_entry in os.listdir(self.directory): - dir_entry_bits = dir_entry.split('.') - if len(dir_entry_bits) == 2 and dir_entry_bits[1] == JournalRecoveryManager.JRNL_DIR_NAME: - fq_file_name = os.path.join(self.directory, dir_entry) - file_handle = open(fq_file_name) - args = Utils.load_args(file_handle, RecordHeader) - file_hdr = FileHeader(*args) - file_hdr.init(file_handle, *Utils.load_args(file_handle, FileHeader)) - if file_hdr.is_header_valid(file_hdr): - file_hdr.load(file_handle) - if file_hdr.is_valid(): - Utils.skip(file_handle, file_hdr.file_header_size_sblks * Utils.SBLK_SIZE) - self.files[file_hdr.file_num] = JournalFile(file_hdr) - self.file_num_list = sorted(self.files.keys()) - self.file_num_itr = iter(self.file_num_list) - def _check_alignment(self): # TODO: Move into JournalFile - remaining_sblks = self.last_record_offset % Utils.SBLK_SIZE - if remaining_sblks == 0: - self.num_filler_records_required = 0 - else: - self.num_filler_records_required = (Utils.SBLK_SIZE - remaining_sblks) / Utils.DBLK_SIZE - self.fill_to_offset = self.last_record_offset + (self.num_filler_records_required * Utils.DBLK_SIZE) - if self.args.show_recs or self.args.show_all_recs: - print '0x%x:0x%08x: %d filler records required for DBLK alignment to 0x%08x' % \ - (self.current_journal_file.file_header.file_num, self.last_record_offset, - self.num_filler_records_required, self.fill_to_offset) - def _check_file(self): - if self.current_journal_file is not None: - if not self.current_journal_file.file_header.is_end_of_file(): - return True - if self.current_journal_file.file_header.is_end_of_file(): - self.last_record_offset = self.current_journal_file.file_header.file_handle.tell() - if not self._get_next_file(): - return False - fhdr = self.current_journal_file.file_header - fhdr.file_handle.seek(fhdr.first_record_offset) - 
return True - def _get_next_file(self): - if self.current_journal_file is not None: - file_handle = self.current_journal_file.file_header.file_handle - if not file_handle.closed: # sanity check, should not be necessary - file_handle.close() - file_num = 0 - try: - while file_num == 0: - file_num = self.file_num_itr.next() - except StopIteration: - pass - if file_num == 0: - return False - self.current_journal_file = self.files[file_num] - self.first_rec_flag = True - if self.args.show_recs or self.args.show_all_recs: - print '0x%x:%s' % (self.current_journal_file.file_header.file_num, self.current_journal_file.file_header) - return True - def _get_next_record(self, high_rid_counter): - if not self._check_file(): - return False - self.last_record_offset = self.current_journal_file.file_header.file_handle.tell() - this_record = Utils.load(self.current_journal_file.file_header.file_handle, RecordHeader) - if not this_record.is_header_valid(self.current_journal_file.file_header): - return False - if self.first_rec_flag: - if this_record.file_offset != self.current_journal_file.file_header.first_record_offset: - raise qls.err.FirstRecordOffsetMismatchError(self.current_journal_file.file_header, this_record) - self.first_rec_flag = False - self.statistics.total_record_count += 1 - start_journal_file = self.current_journal_file - if isinstance(this_record, EnqueueRecord): - ok_flag = self._handle_enqueue_record(this_record, start_journal_file) - high_rid_counter.check(this_record.record_id) - if self.args.show_recs or self.args.show_all_recs: - print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record) - elif isinstance(this_record, DequeueRecord): - ok_flag = self._handle_dequeue_record(this_record, start_journal_file) - high_rid_counter.check(this_record.record_id) - if self.args.show_recs or self.args.show_all_recs: - print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record) - elif isinstance(this_record, TransactionRecord): - ok_flag = self._handle_transaction_record(this_record, start_journal_file) - high_rid_counter.check(this_record.record_id) - if self.args.show_recs or self.args.show_all_recs: - print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record) - else: - self.statistics.filler_record_count += 1 - ok_flag = True - if self.args.show_all_recs: - print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record) - Utils.skip(self.current_journal_file.file_header.file_handle, Utils.DBLK_SIZE) - return ok_flag - def _handle_enqueue_record(self, enqueue_record, start_journal_file): - while enqueue_record.load(self.current_journal_file.file_header.file_handle): - if not self._get_next_file(): - enqueue_record.truncated_flag = True - return False - if not enqueue_record.is_valid(start_journal_file): - return False - if enqueue_record.is_external() and enqueue_record.data != None: - raise qls.err.ExternalDataError(self.current_journal_file.file_header, enqueue_record) - if enqueue_record.is_transient(): - self.statistics.transient_record_count += 1 - return True - if enqueue_record.xid_size > 0: - self.txn_map.add(start_journal_file, enqueue_record) - self.statistics.transaction_operation_count += 1 - self.statistics.transaction_record_count += 1 - self.statistics.transaction_enqueue_count += 1 - else: - self.enq_map.add(start_journal_file, enqueue_record, False) - start_journal_file.incr_enq_cnt() - self.statistics.enqueue_count += 1 - return True - def _handle_dequeue_record(self, dequeue_record, start_journal_file): - while 
dequeue_record.load(self.current_journal_file.file_header.file_handle): - if not self._get_next_file(): - dequeue_record.truncated_flag = True - return False - if not dequeue_record.is_valid(start_journal_file): - return False - if dequeue_record.xid_size > 0: - if self.xid_prepared_list is None: # ie this is the TPL - dequeue_record.transaction_prepared_list_flag = True - elif not self.enq_map.contains(dequeue_record.dequeue_record_id): - dequeue_record.warnings.append('NOT IN EMAP') # Only for non-TPL records - self.txn_map.add(start_journal_file, dequeue_record) - self.statistics.transaction_operation_count += 1 - self.statistics.transaction_record_count += 1 - self.statistics.transaction_dequeue_count += 1 - else: - try: - self.enq_map.delete(start_journal_file, dequeue_record)[0].decr_enq_cnt(dequeue_record) - except qls.err.RecordIdNotFoundError: - dequeue_record.warnings.append('NOT IN EMAP') - self.statistics.dequeue_count += 1 - return True - def _handle_transaction_record(self, transaction_record, start_journal_file): - while transaction_record.load(self.current_journal_file.file_header.file_handle): - if not self._get_next_file(): - transaction_record.truncated_flag = True - return False - if not transaction_record.is_valid(start_journal_file): - return False - if transaction_record.magic[-1] == 'a': - self.statistics.transaction_abort_count += 1 - else: - self.statistics.transaction_commit_count += 1 - if self.txn_map.contains(transaction_record.xid): - self.txn_map.delete(self.current_journal_file, transaction_record) - else: - transaction_record.warnings.append('NOT IN TMAP') -# if transaction_record.magic[-1] == 'c': # commits only -# self._txn_obj_list[hdr.xid] = hdr - self.statistics.transaction_record_count += 1 - return True - def _load_data(self, record): - while not record.is_complete: - record.load(self.current_journal_file.file_handle) - -class JournalFile(object): - def __init__(self, file_header): - self.file_header = file_header - self.enq_cnt = 0 - self.deq_cnt = 0 - self.num_filler_records_required = None - def incr_enq_cnt(self): - self.enq_cnt += 1 - def decr_enq_cnt(self, record): - if self.enq_cnt <= self.deq_cnt: - raise qls.err.EnqueueCountUnderflowError(self.file_header, record) - self.deq_cnt += 1 - def get_enq_cnt(self): - return self.enq_cnt - self.deq_cnt - def is_outstanding_enq(self): - return self.enq_cnt > self.deq_cnt - @staticmethod - def report_header(): - print 'file_num enq_cnt p_no efp journal_file' - print '-------- ------- ---- ----- ------------' - def report(self): - comment = '' if self.file_header.file_num == 0 else '' - file_num_str = '0x%x' % self.file_header.file_num - print '%8s %7d %4d %4dk %s %s' % (file_num_str, self.get_enq_cnt(), self.file_header.partition_num, - self.file_header.efp_data_size_kb, - os.path.basename(self.file_header.file_handle.name), comment) +import time class RecordHeader(object): FORMAT = '<4s2H2Q' @@ -590,14 +40,14 @@ class RecordHeader(object): self.record_id = record_id self.warnings = [] self.truncated_flag = False - def checksum_encode(self): + def encode(self): return struct.pack(RecordHeader.FORMAT, self.magic, self.version, self.user_flags, self.serial, self.record_id) def load(self, file_handle): pass @staticmethod def discriminate(args): """Use the last char in the header magic to determine the header type""" - return _CLASSES.get(args[1][-1], RecordHeader) + return CLASSES.get(args[1][-1], RecordHeader) def is_empty(self): """Return True if this record is empty (ie has a magic of 0x0000""" 
return self.magic == '\x00'*4 @@ -608,17 +58,12 @@ class RecordHeader(object): if self.magic[:3] != 'QLS' or self.magic[3] not in ['a', 'c', 'd', 'e', 'f', 'x']: return False if self.magic[-1] != 'x': - if self.version != Utils.RECORD_VERSION: - raise qls.err.InvalidRecordVersionError(file_header, self, Utils.RECORD_VERSION) + if self.version != qls.utils.DEFAULT_RECORD_VERSION: + raise qls.err.InvalidRecordVersionError(file_header, self, qls.utils.DEFAULT_RECORD_VERSION) if self.serial != file_header.serial: return False return True - def _get_warnings(self): - warn_str = '' - for warn in self.warnings: - warn_str += '<%s>' % warn - return warn_str - def __str__(self): + def to_string(self): """Return string representation of this header""" if self.is_empty(): return '0x%08x: ' % (self.file_offset) @@ -628,6 +73,14 @@ class RecordHeader(object): return '0x%08x: [%c v=%d f=0x%04x rid=0x%x]' % \ (self.file_offset, self.magic[-1].upper(), self.version, self.user_flags, self.record_id) return '0x%08x: ' % (self.file_offset, self.magic) + def _get_warnings(self): + warn_str = '' + for warn in self.warnings: + warn_str += '<%s>' % warn + return warn_str + def __str__(self): + """Return string representation of this header""" + return RecordHeader.to_string(self) class RecordTail(object): FORMAT = '<4sL2Q' @@ -653,24 +106,28 @@ class RecordTail(object): if self.valid_flag is None: if not self.complete: return False - self.valid_flag = Utils.inv_str(self.xmagic) == record.magic and \ + self.valid_flag = qls.utils.inv_str(self.xmagic) == record.magic and \ self.serial == record.serial and \ self.record_id == record.record_id and \ - Utils.adler32(record.checksum_encode()) == self.checksum + qls.utils.adler32(record.checksum_encode()) == self.checksum return self.valid_flag - def __str__(self): + def to_string(self): """Return a string representation of the this RecordTail instance""" if self.valid_flag is not None: if not self.valid_flag: return '[INVALID RECORD TAIL]' - magic = Utils.inv_str(self.xmagic) + magic = qls.utils.inv_str(self.xmagic) magic_char = magic[-1].upper() if magic[-1] in string.printable else '?' 
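#
# An illustrative sketch (not part of the patch) of the record-tail check above: a
# tail is valid when its bit-inverted magic, serial and record-id match the record
# header, and its Adler-32 matches a checksum over the header, xid and data bytes
# (checksum_encode(), i.e. everything except the tail itself).
#
# import zlib
# def tail_is_valid(record, tail):
#     return (qls.utils.inv_str(tail.xmagic) == record.magic and
#             tail.serial == record.serial and
#             tail.record_id == record.record_id and
#             (zlib.adler32(record.checksum_encode()) & 0xffffffff) == tail.checksum)
#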
return '[%c cs=0x%08x rid=0x%x]' % (magic_char, self.checksum, self.record_id) + def __str__(self): + """Return a string representation of the this RecordTail instance""" + return RecordTail.to_string(self) class FileHeader(RecordHeader): FORMAT = '<2H4x5QH' - def init(self, file_handle, _, file_header_size_sblks, partition_num, efp_data_size_kb, - first_record_offset, timestamp_sec, timestamp_ns, file_num, queue_name_len): + MAGIC = 'QLSf' + def init(self, file_handle, _, file_header_size_sblks, partition_num, efp_data_size_kb, first_record_offset, + timestamp_sec, timestamp_ns, file_num, queue_name_len): self.file_handle = file_handle self.file_header_size_sblks = file_header_size_sblks self.partition_num = partition_num @@ -681,11 +138,21 @@ class FileHeader(RecordHeader): self.file_num = file_num self.queue_name_len = queue_name_len self.queue_name = None - def load(self, file_handle): - self.queue_name = file_handle.read(self.queue_name_len) + def encode(self): + if self.queue_name is None: + return RecordHeader.encode(self) + struct.pack(self.FORMAT, self.file_header_size_sblks, \ + self.partition_num, self.efp_data_size_kb, \ + self.first_record_offset, self.timestamp_sec, \ + self.timestamp_ns, self.file_num, 0) + return RecordHeader.encode(self) + struct.pack(self.FORMAT, self.file_header_size_sblks, self.partition_num, \ + self.efp_data_size_kb, self.first_record_offset, \ + self.timestamp_sec, self.timestamp_ns, self.file_num, \ + self.queue_name_len) + self.queue_name def get_file_size(self): """Sum of file header size and data size""" - return (self.file_header_size_sblks * Utils.SBLK_SIZE) + (self.efp_data_size_kb * 1024) + return (self.file_header_size_sblks * qls.utils.DEFAULT_SBLK_SIZE) + (self.efp_data_size_kb * 1024) + def load(self, file_handle): + self.queue_name = file_handle.read(self.queue_name_len) def is_end_of_file(self): return self.file_handle.tell() >= self.get_file_size() def is_valid(self): @@ -704,18 +171,22 @@ class FileHeader(RecordHeader): return True def timestamp_str(self): """Get the timestamp of this record in string format""" - time = gmtime(self.timestamp_sec) + now = time.gmtime(self.timestamp_sec) fstr = '%%a %%b %%d %%H:%%M:%%S.%09d %%Y' % (self.timestamp_ns) - return strftime(fstr, time) - def __str__(self): + return time.strftime(fstr, now) + def to_string(self): """Return a string representation of the this FileHeader instance""" - return '%s fnum=0x%x fro=0x%08x p=%d s=%dk t=%s %s' % (RecordHeader.__str__(self), self.file_num, + return '%s fnum=0x%x fro=0x%08x p=%d s=%dk t=%s %s' % (RecordHeader.to_string(self), self.file_num, self.first_record_offset, self.partition_num, self.efp_data_size_kb, self.timestamp_str(), self._get_warnings()) + def __str__(self): + """Return a string representation of the this FileHeader instance""" + return FileHeader.to_string(self) class EnqueueRecord(RecordHeader): FORMAT = '<2Q' + MAGIC = 'QLSe' EXTERNAL_FLAG_MASK = 0x20 TRANSIENT_FLAG_MASK = 0x10 def init(self, _, xid_size, data_size): @@ -726,9 +197,13 @@ class EnqueueRecord(RecordHeader): self.data = None self.data_complete = False self.record_tail = None - def checksum_encode(self): - return RecordHeader.checksum_encode(self) + struct.pack(self.FORMAT, self.xid_size, self.data_size) + \ - self.xid + self.data + def checksum_encode(self): # encode excluding record tail + bytes = RecordHeader.encode(self) + struct.pack(self.FORMAT, self.xid_size, self.data_size) + if self.xid is not None: + bytes += self.xid + if self.data is not None: + bytes += self.data 
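#
# Aside (illustrative only): the bytes checksummed by checksum_encode() are the
# packed RecordHeader ('<4s2H2Q': magic, version, user-flags, serial, rid), the
# '<2Q' xid/data sizes, then the xid and data payloads when present. The record
# tail is deliberately excluded, since the tail carries the checksum itself:
#
#     record.record_tail.checksum = qls.utils.adler32(record.checksum_encode())
#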
+ return bytes def is_external(self): return self.user_flags & EnqueueRecord.EXTERNAL_FLAG_MASK > 0 def is_transient(self): @@ -750,13 +225,13 @@ class EnqueueRecord(RecordHeader): return True def load(self, file_handle): """Return True when load is incomplete and must be called again with new file handle""" - self.xid, self.xid_complete = Utils.load_data(file_handle, self.xid, self.xid_size) + self.xid, self.xid_complete = qls.utils.load_data(file_handle, self.xid, self.xid_size) if not self.xid_complete: return True if self.is_external(): self.data_complete = True else: - self.data, self.data_complete = Utils.load_data(file_handle, self.data, self.data_size) + self.data, self.data_complete = qls.utils.load_data(file_handle, self.data, self.data_size) if not self.data_complete: return True if self.xid_size > 0 or self.data_size > 0: @@ -769,6 +244,19 @@ class EnqueueRecord(RecordHeader): else: return True return False + def to_string(self, show_xid_flag, show_data_flag): + """Return a string representation of the this EnqueueRecord instance""" + if self.truncated_flag: + return '%s xid(%d) data(%d) [Truncated, no more files in journal]' % (RecordHeader.__str__(self), + self.xid_size, self.data_size) + if self.record_tail is None: + record_tail_str = '' + else: + record_tail_str = self.record_tail.to_string() + return '%s %s %s %s %s %s' % (RecordHeader.to_string(self), + qls.utils.format_xid(self.xid, self.xid_size, show_xid_flag), + qls.utils.format_data(self.data, self.data_size, show_data_flag), + record_tail_str, self._print_flags(), self._get_warnings()) def _print_flags(self): """Utility function to decode the flags field in the header and print a string representation""" fstr = '' @@ -784,19 +272,11 @@ class EnqueueRecord(RecordHeader): return fstr def __str__(self): """Return a string representation of the this EnqueueRecord instance""" - if self.truncated_flag: - return '%s xid(%d) data(%d) [Truncated, no more files in journal]' % (RecordHeader.__str__(self), - self.xid_size, self.data_size) - if self.record_tail is None: - record_tail_str = '' - else: - record_tail_str = str(self.record_tail) - return '%s %s %s %s %s %s' % (RecordHeader.__str__(self), Utils.format_xid(self.xid, self.xid_size), - Utils.format_data(self.data_size, self.data), record_tail_str, - self._print_flags(), self._get_warnings()) + return EnqueueRecord.to_string(self, False, False) class DequeueRecord(RecordHeader): FORMAT = '<2Q' + MAGIC = 'QLSd' TXN_COMPLETE_COMMIT_FLAG = 0x10 def init(self, _, dequeue_record_id, xid_size): self.dequeue_record_id = dequeue_record_id @@ -805,8 +285,8 @@ class DequeueRecord(RecordHeader): self.xid = None self.xid_complete = False self.record_tail = None - def checksum_encode(self): - return RecordHeader.checksum_encode(self) + struct.pack(self.FORMAT, self.dequeue_record_id, self.xid_size) + \ + def checksum_encode(self): # encode excluding record tail + return RecordHeader.encode(self) + struct.pack(self.FORMAT, self.dequeue_record_id, self.xid_size) + \ self.xid def is_transaction_complete_commit(self): return self.user_flags & DequeueRecord.TXN_COMPLETE_COMMIT_FLAG > 0 @@ -825,7 +305,7 @@ class DequeueRecord(RecordHeader): return True def load(self, file_handle): """Return True when load is incomplete and must be called again with new file handle""" - self.xid, self.xid_complete = Utils.load_data(file_handle, self.xid, self.xid_size) + self.xid, self.xid_complete = qls.utils.load_data(file_handle, self.xid, self.xid_size) if not self.xid_complete: return True if 
self.xid_size > 0: @@ -838,6 +318,19 @@ class DequeueRecord(RecordHeader): else: return True return False + def to_string(self, show_xid_flag): + """Return a string representation of the this DequeueRecord instance""" + if self.truncated_flag: + return '%s xid(%d) drid=0x%x [Truncated, no more files in journal]' % (RecordHeader.__str__(self), + self.xid_size, + self.dequeue_record_id) + if self.record_tail is None: + record_tail_str = '' + else: + record_tail_str = self.record_tail.to_string() + return '%s drid=0x%x %s %s %s %s' % (RecordHeader.to_string(self), self.dequeue_record_id, + qls.utils.format_xid(self.xid, self.xid_size, show_xid_flag), + record_tail_str, self._print_flags(), self._get_warnings()) def _print_flags(self): """Utility function to decode the flags field in the header and print a string representation""" if self.transaction_prepared_list_flag: @@ -848,27 +341,19 @@ class DequeueRecord(RecordHeader): return '' def __str__(self): """Return a string representation of the this DequeueRecord instance""" - if self.truncated_flag: - return '%s xid(%d) drid=0x%x [Truncated, no more files in journal]' % (RecordHeader.__str__(self), - self.xid_size, - self.dequeue_record_id) - if self.record_tail is None: - record_tail_str = '' - else: - record_tail_str = str(self.record_tail) - return '%s %s drid=0x%x %s %s %s' % (RecordHeader.__str__(self), Utils.format_xid(self.xid, self.xid_size), - self.dequeue_record_id, record_tail_str, self._print_flags(), - self._get_warnings()) + return DequeueRecord.to_string(self, False) class TransactionRecord(RecordHeader): FORMAT = ' 0: @@ -894,168 +379,24 @@ class TransactionRecord(RecordHeader): else: return True return False - def __str__(self): + def to_string(self, show_xid_flag): """Return a string representation of the this TransactionRecord instance""" if self.truncated_flag: return '%s xid(%d) [Truncated, no more files in journal]' % (RecordHeader.__str__(self), self.xid_size) if self.record_tail is None: record_tail_str = '' else: - record_tail_str = str(self.record_tail) - return '%s %s %s %s' % (RecordHeader.__str__(self), Utils.format_xid(self.xid, self.xid_size), record_tail_str, - self._get_warnings()) - -class Utils(object): - """Class containing utility functions for dealing with the journal""" - DBLK_SIZE = 128 - RECORD_VERSION = 2 - SBLK_SIZE = 4096 - @staticmethod - def adler32(data): - return zlib.adler32(data) & 0xffffffff - @staticmethod - def create_record(magic, uflags, journal_file, record_id, dequeue_record_id, xid, data): - record_class = _CLASSES.get(magic[-1]) - record = record_class(0, magic, Utils.RECORD_VERSION, uflags, journal_file.file_header.serial, record_id) - xid_length = len(xid) if xid is not None else 0 - if isinstance(record, EnqueueRecord): - data_length = len(data) if data is not None else 0 - record.init(None, xid_length, data_length) - elif isinstance(record, DequeueRecord): - record.init(None, dequeue_record_id, xid_length) - elif isinstance(record, TransactionRecord): - record.init(None, xid_length) - else: - raise qls.err.InvalidClassError(record.__class__.__name__) - if xid is not None: - record.xid = xid - record.xid_complete = True - if data is not None: - record.data = data - record.data_complete = True - record.record_tail = RecordTail(None) - record.record_tail.xmagic = Utils.inv_str(magic) - record.record_tail.checksum = Utils.adler32(record.checksum_encode()) - record.record_tail.serial = record.serial - record.record_tail.record_id = record.record_id - return record - @staticmethod - def 
format_data(dsize, data): - """Format binary data for printing""" - if data == None: - return '' - # << DEBUG >> - begin = data.find('msg') - end = data.find('\0', begin) - return 'data(%d)="%s"' % (dsize, data[begin:end]) - # << END DEBUG - if Utils._is_printable(data): - datastr = Utils._split_str(data) - else: - datastr = Utils._hex_split_str(data) - if dsize != len(data): - raise qls.err.DataSizeError(dsize, len(data), datastr) - return 'data(%d)="%s"' % (dsize, datastr) - @staticmethod - def format_xid(xid, xidsize=None): - """Format binary XID for printing""" - if xid == None and xidsize != None: - if xidsize > 0: - raise qls.err.XidSizeError(xidsize, 0, None) - return '' - if Utils._is_printable(xid): - xidstr = '"%s"' % Utils._split_str(xid) - else: - xidstr = '0x%s' % Utils._hex_split_str(xid) - if xidsize == None: - xidsize = len(xid) - elif xidsize != len(xid): - raise qls.err.XidSizeError(xidsize, len(xid), xidstr) - return 'xid(%d)=%s' % (xidsize, xidstr) - @staticmethod - def inv_str(in_string): - """Perform a binary 1's compliment (invert all bits) on a binary string""" - istr = '' - for index in range(0, len(in_string)): - istr += chr(~ord(in_string[index]) & 0xff) - return istr - @staticmethod - def load(file_handle, klass): - """Load a record of class klass from a file""" - args = Utils.load_args(file_handle, klass) - subclass = klass.discriminate(args) - result = subclass(*args) # create instance of record - if subclass != klass: - result.init(*Utils.load_args(file_handle, subclass)) - return result - @staticmethod - def load_args(file_handle, klass): - """Load the arguments from class klass""" - size = struct.calcsize(klass.FORMAT) - foffs = file_handle.tell(), - fbin = file_handle.read(size) - if len(fbin) != size: - raise qls.err.UnexpectedEndOfFileError(len(fbin), size, foffs, file_handle.name) - return foffs + struct.unpack(klass.FORMAT, fbin) - @staticmethod - def load_data(file_handle, element, element_size): - if element_size == 0: - return element, True - if element is None: - element = file_handle.read(element_size) - else: - read_size = element_size - len(element) - element += file_handle.read(read_size) - return element, len(element) == element_size - @staticmethod - def skip(file_handle, boundary): - """Read and discard disk bytes until the next multiple of boundary""" - if not file_handle.closed: - file_handle.read(Utils._rem_bytes_in_block(file_handle, boundary)) - #--- protected functions --- - @staticmethod - def _hex_str(in_str, begin, end): - """Return a binary string as a hex string""" - hstr = '' - for index in range(begin, end): - if Utils._is_printable(in_str[index]): - hstr += in_str[index] - else: - hstr += '\\%02x' % ord(in_str[index]) - return hstr - @staticmethod - def _hex_split_str(in_str):#, split_size = 50): - """Split a hex string into two parts separated by an ellipsis""" -# if len(in_str) <= split_size: -# return Utils._hex_str(in_str, 0, len(in_str)) -# return Utils._hex_str(in_str, 0, 10) + ' ... 
' + Utils._hex_str(in_str, len(in_str)-10, len(in_str)) - return ''.join(x.encode('hex') for x in reversed(in_str)) - @staticmethod - def _is_printable(in_str): - """Return True if in_str in printable; False otherwise.""" - for this_char in in_str: - if this_char not in string.printable: - return False - return True - @staticmethod - def _rem_bytes_in_block(file_handle, block_size): - """Return the remaining bytes in a block""" - foffs = file_handle.tell() - return (Utils._size_in_blocks(foffs, block_size) * block_size) - foffs - @staticmethod - def _size_in_blocks(size, block_size): - """Return the size in terms of data blocks""" - return int((size + block_size - 1) / block_size) - @staticmethod - def _split_str(in_str, split_size = 50): - """Split a string into two parts separated by an ellipsis if it is longer than split_size""" - if len(in_str) < split_size: - return in_str - return in_str[:25] + ' ... ' + in_str[-25:] + record_tail_str = self.record_tail.to_string() + return '%s %s %s %s' % (RecordHeader.to_string(self), + qls.utils.format_xid(self.xid, self.xid_size, show_xid_flag), + record_tail_str, self._get_warnings()) + def __str__(self): + """Return a string representation of the this TransactionRecord instance""" + return TransactionRecord.to_string(self, False) # ============================================================================= -_CLASSES = { +CLASSES = { 'a': TransactionRecord, 'c': TransactionRecord, 'd': DequeueRecord, diff --git a/qpid/tools/src/py/qls/utils.py b/qpid/tools/src/py/qls/utils.py new file mode 100644 index 0000000000..758dc446c0 --- /dev/null +++ b/qpid/tools/src/py/qls/utils.py @@ -0,0 +1,206 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +Module: qls.utils + +Contains helper functions for qpid_qls_analyze. 
+""" + +import os +import qls.jrnl +import stat +import string +import struct +import subprocess +import zlib + +DEFAULT_DBLK_SIZE = 128 +DEFAULT_SBLK_SIZE = 4096 # 32 dblks +DEFAULT_SBLK_SIZE_KB = DEFAULT_SBLK_SIZE / 1024 +DEFAULT_RECORD_VERSION = 2 +DEFAULT_HEADER_SIZE_SBLKS = 1 + +def adler32(data): + """return the adler32 checksum of data""" + return zlib.adler32(data) & 0xffffffff + +def create_record(magic, uflags, journal_file, record_id, dequeue_record_id, xid, data): + """Helper function to construct a record with xid, data (where applicable) and consistent tail with checksum""" + record_class = qls.jrnl.CLASSES.get(magic[-1]) + record = record_class(0, magic, DEFAULT_RECORD_VERSION, uflags, journal_file.file_header.serial, record_id) + xid_length = len(xid) if xid is not None else 0 + if isinstance(record, qls.jrnl.EnqueueRecord): + data_length = len(data) if data is not None else 0 + record.init(None, xid_length, data_length) + elif isinstance(record, qls.jrnl.DequeueRecord): + record.init(None, dequeue_record_id, xid_length) + elif isinstance(record, qls.jrnl.TransactionRecord): + record.init(None, xid_length) + else: + raise qls.err.InvalidClassError(record.__class__.__name__) + if xid is not None: + record.xid = xid + record.xid_complete = True + if data is not None: + record.data = data + record.data_complete = True + record.record_tail = _mk_record_tail(record) + return record + +def efp_directory_size(directory_name): + """"Decode the directory name in the format NNNk to a numeric size, where NNN is a number string""" + try: + if directory_name[-1] == 'k': + return int(directory_name[:-1]) + except ValueError: + pass + return 0 + +def format_data(data, data_size=None, show_data_flag=True): + """Format binary data for printing""" + return _format_binary(data, data_size, show_data_flag, 'data', qls.err.DataSizeError, False) + +def format_xid(xid, xid_size=None, show_xid_flag=True): + """Format binary XID for printing""" + return _format_binary(xid, xid_size, show_xid_flag, 'xid', qls.err.XidSizeError, True) + +def get_avail_disk_space(path): + df_proc = subprocess.Popen(["df", path], stdout=subprocess.PIPE) + output = df_proc.communicate()[0] + return int(output.split('\n')[1].split()[3]) + +def has_write_permission(path): + stat_info = os.stat(path) + return bool(stat_info.st_mode & stat.S_IRGRP) + +def inv_str(in_string): + """Perform a binary 1's compliment (invert all bits) on a binary string""" + istr = '' + for index in range(0, len(in_string)): + istr += chr(~ord(in_string[index]) & 0xff) + return istr + +def load(file_handle, klass): + """Load a record of class klass from a file""" + args = load_args(file_handle, klass) + subclass = klass.discriminate(args) + result = subclass(*args) # create instance of record + if subclass != klass: + result.init(*load_args(file_handle, subclass)) + return result + +def load_args(file_handle, klass): + """Load the arguments from class klass""" + size = struct.calcsize(klass.FORMAT) + foffs = file_handle.tell(), + fbin = file_handle.read(size) + if len(fbin) != size: + raise qls.err.UnexpectedEndOfFileError(len(fbin), size, foffs, file_handle.name) + return foffs + struct.unpack(klass.FORMAT, fbin) + +def load_data(file_handle, element, element_size): + """Read element_size bytes of binary data from file_handle into element""" + if element_size == 0: + return element, True + if element is None: + element = file_handle.read(element_size) + else: + read_size = element_size - len(element) + element += file_handle.read(read_size) + 
return element, len(element) == element_size + +def skip(file_handle, boundary): + """Read and discard disk bytes until the next multiple of boundary""" + if not file_handle.closed: + file_handle.read(_rem_bytes_in_block(file_handle, boundary)) + +#--- protected functions --- + +def _format_binary(bin_str, bin_size, show_bin_flag, prefix, err_class, hex_num_flag): + """Format binary XID for printing""" + if bin_str is None and bin_size is not None: + if bin_size > 0: + raise err_class(bin_size, len(bin_str), bin_str) + return '' + if bin_size is None: + bin_size = len(bin_str) + elif bin_size != len(bin_str): + raise err_class(bin_size, len(bin_str), bin_str) + out_str = '%s(%d)' % (prefix, bin_size) + if show_bin_flag: + if _is_printable(bin_str): + binstr = '"%s"' % _split_str(bin_str) + elif hex_num_flag: + binstr = '0x%s' % _str_to_hex_num(bin_str) + else: + binstr = _hex_split_str(bin_str) + out_str += '=%s' % binstr + return out_str + +def _hex_str(in_str, begin, end): + """Return a binary string as a hex string""" + hstr = '' + for index in range(begin, end): + if _is_printable(in_str[index]): + hstr += in_str[index] + else: + hstr += '\\%02x' % ord(in_str[index]) + return hstr + +def _hex_split_str(in_str, split_size = 50): + """Split a hex string into two parts separated by an ellipsis""" + if len(in_str) <= split_size: + return _hex_str(in_str, 0, len(in_str)) + return _hex_str(in_str, 0, 10) + ' ... ' + _hex_str(in_str, len(in_str)-10, len(in_str)) + #return ''.join(x.encode('hex') for x in reversed(in_str)) + +def _is_printable(in_str): + """Return True if in_str in printable; False otherwise.""" + for this_char in in_str: + if this_char not in string.letters and this_char not in string.digits and this_char not in string.punctuation: + return False + return True + +def _mk_record_tail(record): + record_tail = qls.jrnl.RecordTail(None) + record_tail.xmagic = inv_str(record.magic) + record_tail.checksum = adler32(record.checksum_encode()) + record_tail.serial = record.serial + record_tail.record_id = record.record_id + return record_tail + +def _rem_bytes_in_block(file_handle, block_size): + """Return the remaining bytes in a block""" + foffs = file_handle.tell() + return (_size_in_blocks(foffs, block_size) * block_size) - foffs + +def _size_in_blocks(size, block_size): + """Return the size in terms of data blocks""" + return int((size + block_size - 1) / block_size) + +def _split_str(in_str, split_size = 50): + """Split a string into two parts separated by an ellipsis if it is longer than split_size""" + if len(in_str) < split_size: + return in_str + return in_str[:25] + ' ... ' + in_str[-25:] + +def _str_to_hex_num(in_str): + """Turn a string into a hex number representation, little endian assumed (ie LSB is first, MSB is last)""" + return ''.join(x.encode('hex') for x in reversed(in_str)) diff --git a/qpid/tools/src/py/qpid_qls_analyze.py b/qpid/tools/src/py/qpid_qls_analyze.py index a540587547..1b2655896c 100755 --- a/qpid/tools/src/py/qpid_qls_analyze.py +++ b/qpid/tools/src/py/qpid_qls_analyze.py @@ -1,5 +1,5 @@ #!/usr/bin/env python -# + # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -16,13 +16,44 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -# + +""" +qpid-qls-analyze + +Reads and analyzes a Qpid Linear Store (QLS) store directory. 
+""" import argparse import os import os.path -from qls import efp -from qls import jrnl +import qls.anal +import qls.efp + +class QlsAnalyzerArgParser(argparse.ArgumentParser): + def __init__(self): + argparse.ArgumentParser.__init__(self, description = 'Qpid Linear Store Analyzer', prog = 'qpid-qls-analyze') + self.add_argument('qls_dir', metavar='DIR', + help='Qpid Linear Store (QLS) directory to be analyzed') + self.add_argument('--efp', action='store_true', + help='Analyze the Emtpy File Pool (EFP) and show stats') + self.add_argument('--show-recs', action='store_true', + help='Show material records found during recovery') + self.add_argument('--show-all-recs', action='store_true', + help='Show all records (including fillers) found during recovery') + self.add_argument('--show-xids', action='store_true', + help='Show xid as hex number, otherwise show only xid length') + self.add_argument('--show-data', action='store_true', + help='Show data, otherwise show only data length') + self.add_argument('--stats', action='store_true', + help='Print journal record stats') + self.add_argument('--txn', action='store_true', + help='Reconcile incomplete transactions') + self.add_argument('--version', action='version', + version='%(prog)s ' + QqpdLinearStoreAnalyzer.QLS_ANALYZE_VERSION) + def parse_args(self, args=None, namespace=None): + args = argparse.ArgumentParser.parse_args(self, args, namespace) + # If required, perform additional validity checks here, raise errors if req'd + return args class QqpdLinearStoreAnalyzer(object): """ @@ -37,28 +68,10 @@ class QqpdLinearStoreAnalyzer(object): self.args = None self._process_args() self.qls_dir = os.path.abspath(self.args.qls_dir) - self.efp_manager = efp.EfpManager(self.qls_dir, self.args) - self.jrnl_recovery_mgr = jrnl.JournalRecoveryManager(self.qls_dir, self.args) - def _analyze_efp(self): - self.efp_manager.run(self.args) - def _analyze_journals(self): - self.jrnl_recovery_mgr.run(self.args) + self.efp_manager = qls.efp.EfpManager(self.qls_dir, None) + self.jrnl_recovery_mgr = qls.anal.JournalRecoveryManager(self.qls_dir, self.args) def _process_args(self): - parser = argparse.ArgumentParser(description = 'Qpid Linear Store Analyzer') - parser.add_argument('qls_dir', metavar='DIR', - help='Qpid Linear Store (QLS) directory to be analyzed') - parser.add_argument('--efp', action='store_true', - help='Analyze the Emtpy File Pool (EFP) and show stats') - parser.add_argument('--show-recs', action='store_true', - help='Show material records found during recovery') - parser.add_argument('--show-all-recs', action='store_true', - help='Show all records (including fillers) found during recovery') - parser.add_argument('--stats', action='store_true', - help='Print journal record stats') - parser.add_argument('--txn', action='store_true', - help='Reconcile incomplete transactions') - parser.add_argument('--version', action='version', version='%(prog)s ' + - QqpdLinearStoreAnalyzer.QLS_ANALYZE_VERSION) + parser = QlsAnalyzerArgParser() self.args = parser.parse_args() if not os.path.exists(self.args.qls_dir): parser.error('Journal path "%s" does not exist' % self.args.qls_dir) @@ -68,8 +81,8 @@ class QqpdLinearStoreAnalyzer(object): self.jrnl_recovery_mgr.report(self.args.stats) def run(self): if self.args.efp: - self._analyze_efp() - self._analyze_journals() + self.efp_manager.run(None) + self.jrnl_recovery_mgr.run() #============================================================================== # main program -- cgit v1.2.1