author     Kim van der Riet <kpvdr@apache.org>  2014-03-18 13:54:46 +0000
committer  Kim van der Riet <kpvdr@apache.org>  2014-03-18 13:54:46 +0000
commit     1d8697cfcfa2d292d5b303797a0f1266cd3bb1d7 (patch)
tree       c92d248684b6607d4deb370edf396c543e00022d
parent     eba5294974fb2a73b4e765c74196ba4a63079f03 (diff)
download   qpid-python-1d8697cfcfa2d292d5b303797a0f1266cd3bb1d7.tar.gz
QPID-5362: No store tools exist for examining the journals - Bugfix and reorganization of qls python modules.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1578899 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--   qpid/cpp/src/qpid/linearstore/ISSUES                     30
-rw-r--r--   qpid/cpp/src/qpid/linearstore/journal/jdir.cpp          152
-rw-r--r--   qpid/cpp/src/qpid/linearstore/journal/jdir.h              2
-rwxr-xr-x   qpid/cpp/src/tests/linearstore/linearstoredirsetup.sh    51
-rwxr-xr-x   qpid/cpp/src/tests/linearstore/tx-test-soak.sh           31
-rw-r--r--   qpid/tools/src/py/qls/anal.py                           593
-rw-r--r--   qpid/tools/src/py/qls/efp.py                            275
-rw-r--r--   qpid/tools/src/py/qls/err.py                             62
-rw-r--r--   qpid/tools/src/py/qls/jrnl.py                           873
-rw-r--r--   qpid/tools/src/py/qls/utils.py                          206
-rwxr-xr-x   qpid/tools/src/py/qpid_qls_analyze.py                    67
11 files changed, 1366 insertions, 976 deletions
diff --git a/qpid/cpp/src/qpid/linearstore/ISSUES b/qpid/cpp/src/qpid/linearstore/ISSUES
index a9908e882e..ccadefc20c 100644
--- a/qpid/cpp/src/qpid/linearstore/ISSUES
+++ b/qpid/cpp/src/qpid/linearstore/ISSUES
@@ -47,8 +47,6 @@ Current/pending:
svn r.1558592 2014-01-15 fixes an issue with using /dev/random as a source of random numbers for Journal serial numbers.
svn r.1558913 2014-01-16 replaces use of /dev/urandom with several calls to rand() to construct a 64-bit random number.
* Recommend rebuilding and testing for performance again with these two fixes. Marked POST.
-# - 1036026 [LinearStore] Qpid linear store unable to create durable queue - framing-error: Queue <q-name>: create() failed: jexception 0x0000
- UNABLE TO REPRODUCE - but Frantizek has additional info
- 1039522 Qpid crashes while recovering from linear store around qpid::linearstore::journal::JournalFile::getFqFileName() including enq_rec::decode() threw JERR_JREC_BAD_RECTAIL
* Possible dup of 1039525
* May be fixed by QPID-5483 - waiting for needinfo, recommend rebuilding with QPID-5483 fix and re-testing. Marked POST.
@@ -56,18 +54,6 @@ Current/pending:
* Possible dup of 1039522
* May be fixed by QPID-5483 - waiting for needinfo, recommend rebuilding with QPID-5483 fix and re-testing. Marked POST.
# - 1049870 [LinearStore] auto-delete property does not survive restart
-# 5480 1053749 [linearstore] Recovery of store failure with "JERR_MAP_NOTFOUND: Key not found in map." error message
- svn r.1564877 2014-02-05: Proposed fix
- * Probability: 6 of 600 (1.0%) using tx-test-soak.sh
- * If broker is started a second time after failure, it starts correctly and test completes ok.
- * Problem: File is being recycled to EFP with still-locked enqueues in it (ie dequeued transactionally).
- * Problem: Record alignment check writes filler records to wrong file when decoding bad record moves across a file boundary
- * Test of fix failed on RHEL-7
-# - 1064181 [linearstore] Qpidd closes transactional client session&connection with async_dequeue() failed
- * jexception 0x010b LinearFileController::getCurrentSerial() threw JERR_NULL
-# - 1064230 [linearstore] Qpidd linearstore recovery sometimes fail to recover messages with recoverMessages() failed
- * jexception 0x0701 RecoveryManager::readNextRemainingRecord() threw JERR_JREC_BADRECTAIL
- * possible dup of 1063700
Fixed/closed (in commit order):
===============================
@@ -104,9 +90,24 @@ NO-JIRA - Added missing Apache copyright/license text
5479 1053701 [linearstore] Using recovered store results in "JERR_JNLF_FILEOFFSOVFL: Attempted to increase submitted offset past file size. (JournalFile::submittedDblkCount)" error message
* Probability: 2 of 600 (0.3%) using tx-test-soak.sh
* Fixed by checkin for QPID-5480, no longer able to reproduce. VERIFIED
+ 5480 1053749 [linearstore] Recovery of store failure with "JERR_MAP_NOTFOUND: Key not found in map." error message
+ svn r.1564877 2014-02-05: Proposed fix
+ * Probability: 6 of 600 (1.0%) using tx-test-soak.sh
+ * If broker is started a second time after failure, it starts correctly and test completes ok.
+ * Problem: File is being recycled to EFP with still-locked enqueues in it (ie dequeued transactionally).
+ * Problem: Record alignment check writes filler records to wrong file when decoding bad record moves across a file boundary
5603 1063700 [linearstore] broker restart fails under stress test
svn r.1574513 2014-03-05: Proposed fix. POST
* jexception 0x0701 RecoveryManager::readNextRemainingRecord() threw JERR_JREC_BADRECTAIL
+ 5607 1064181 [linearstore] Qpidd closes transactional client session&connection with async_dequeue() failed
+ svn r.1575009 2014-03-06 Proposed fix. POST
+ * jexception 0x010b LinearFileController::getCurrentSerial() threw JERR_NULL
+ - 1064230 [linearstore] Qpidd linearstore recovery sometimes fail to recover messages with recoverMessages() failed
+ * jexception 0x0701 RecoveryManager::readNextRemainingRecord() threw JERR_JREC_BADRECTAIL
+ * possible dup of 1063700
+ - 1036026 [LinearStore] Qpid linear store unable to create durable queue - framing-error: Queue <q-name>: create() failed: jexception 0x0000
+ * UNABLE TO REPRODUCE - but Frantizek has additional info
+ * Retested after checkin 1575009, problem solved. VERIFIED
Ordered checkin list:
=====================
@@ -135,6 +136,7 @@ no. svn r Q-JIRA RHBZ Date
19. 1564893 5361 - 2014-02-05
20. 1564935 5361 - 2014-02-05
21. 1574513 5603 1063700 2014-03-05
+22. 1575009 5607 1064181 2014-03-06
See above sections for details on these checkins.
diff --git a/qpid/cpp/src/qpid/linearstore/journal/jdir.cpp b/qpid/cpp/src/qpid/linearstore/journal/jdir.cpp
index 896f44ceff..36f180c21f 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/jdir.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/jdir.cpp
@@ -101,17 +101,9 @@ jdir::clear_dir(const std::string& dirname/*, const std::string&
*/
, const bool create_flag)
{
- DIR* dir = ::opendir(dirname.c_str());
- if (!dir)
- {
- if (errno == 2 && create_flag) // ENOENT (No such file or dir)
- {
- create_dir(dirname);
- return;
- }
- std::ostringstream oss;
- oss << "dir=\"" << dirname << "\"" << FORMAT_SYSERR(errno);
- throw jexception(jerrno::JERR_JDIR_OPENDIR, oss.str(), "jdir", "clear_dir");
+ DIR* dir = open_dir(dirname, "clear_dir", create_flag); // ENOENT tolerated only when create_flag is set
+ if (!dir && create_flag) {
+ create_dir(dirname);
+ return; // a freshly created directory is already empty
}
//#ifndef RHM_JOWRITE
struct dirent* entry;
@@ -161,13 +153,7 @@ jdir::push_down(const std::string& dirname, const std::string& target_dir/*, con
{
std::string bak_dir_name = create_bak_dir(dirname/*, bak_dir_base*/);
- DIR* dir = ::opendir(dirname.c_str());
- if (!dir)
- {
- std::ostringstream oss;
- oss << "dir=\"" << dirname << "\"" << FORMAT_SYSERR(errno);
- throw jexception(jerrno::JERR_JDIR_OPENDIR, oss.str(), "jdir", "push_down");
- }
+ DIR* dir = open_dir(dirname, "push_down", false);
// Copy contents of targetDirName into bak dir
struct dirent* entry;
while ((entry = ::readdir(dir)) != 0)
@@ -251,60 +237,49 @@ jdir::delete_dir(const std::string& dirname, bool children_only)
{
struct dirent* entry;
struct stat s;
- DIR* dir = ::opendir(dirname.c_str());
- if (!dir)
- {
- if (errno == ENOENT) // dir does not exist.
- return;
-
- std::ostringstream oss;
- oss << "dir=\"" << dirname << "\"" << FORMAT_SYSERR(errno);
- throw jexception(jerrno::JERR_JDIR_OPENDIR, oss.str(), "jdir", "delete_dir");
- }
- else
+ DIR* dir = open_dir(dirname, "delete_dir", true); // true = allow dir does not exist, return 0
+ if (!dir) return;
+ while ((entry = ::readdir(dir)) != 0)
{
- while ((entry = ::readdir(dir)) != 0)
+ // Ignore . and ..
+ if (std::strcmp(entry->d_name, ".") != 0 && std::strcmp(entry->d_name, "..") != 0)
{
- // Ignore . and ..
- if (std::strcmp(entry->d_name, ".") != 0 && std::strcmp(entry->d_name, "..") != 0)
+ std::string full_name(dirname + "/" + entry->d_name);
+ if (::lstat(full_name.c_str(), &s))
{
- std::string full_name(dirname + "/" + entry->d_name);
- if (::lstat(full_name.c_str(), &s))
- {
- ::closedir(dir);
- std::ostringstream oss;
- oss << "stat: file=\"" << full_name << "\"" << FORMAT_SYSERR(errno);
- throw jexception(jerrno::JERR_JDIR_STAT, oss.str(), "jdir", "delete_dir");
- }
- if (S_ISREG(s.st_mode) || S_ISLNK(s.st_mode)) // This is a file or slink
- {
- if(::unlink(full_name.c_str()))
- {
- ::closedir(dir);
- std::ostringstream oss;
- oss << "unlink: file=\"" << entry->d_name << "\"" << FORMAT_SYSERR(errno);
- throw jexception(jerrno::JERR_JDIR_UNLINK, oss.str(), "jdir", "delete_dir");
- }
- }
- else if (S_ISDIR(s.st_mode)) // This is a dir
- {
- delete_dir(full_name);
- }
- else // all other types, throw up!
+ ::closedir(dir);
+ std::ostringstream oss;
+ oss << "stat: file=\"" << full_name << "\"" << FORMAT_SYSERR(errno);
+ throw jexception(jerrno::JERR_JDIR_STAT, oss.str(), "jdir", "delete_dir");
+ }
+ if (S_ISREG(s.st_mode) || S_ISLNK(s.st_mode)) // This is a file or slink
+ {
+ if(::unlink(full_name.c_str()))
{
::closedir(dir);
std::ostringstream oss;
- oss << "file=\"" << entry->d_name << "\" is not a dir, file or slink.";
- oss << " (mode=0x" << std::hex << s.st_mode << std::dec << ")";
- throw jexception(jerrno::JERR_JDIR_BADFTYPE, oss.str(), "jdir", "delete_dir");
+ oss << "unlink: file=\"" << entry->d_name << "\"" << FORMAT_SYSERR(errno);
+ throw jexception(jerrno::JERR_JDIR_UNLINK, oss.str(), "jdir", "delete_dir");
}
}
+ else if (S_ISDIR(s.st_mode)) // This is a dir
+ {
+ delete_dir(full_name);
+ }
+ else // all other types, throw up!
+ {
+ ::closedir(dir);
+ std::ostringstream oss;
+ oss << "file=\"" << entry->d_name << "\" is not a dir, file or slink.";
+ oss << " (mode=0x" << std::hex << s.st_mode << std::dec << ")";
+ throw jexception(jerrno::JERR_JDIR_BADFTYPE, oss.str(), "jdir", "delete_dir");
+ }
}
+ }
// FIXME: Find out why this fails with false alarms/errors from time to time...
// While commented out, there is no error capture from reading dir entries.
// check_err(errno, dir, dirname, "delete_dir");
- }
// Now dir is empty, close and delete it
close_dir(dir, dirname, "delete_dir");
@@ -321,14 +296,8 @@ jdir::delete_dir(const std::string& dirname, bool children_only)
std::string
jdir::create_bak_dir(const std::string& dirname)
{
- DIR* dir = ::opendir(dirname.c_str());
+ DIR* dir = open_dir(dirname, "create_bak_dir", false);
long dir_num = 0L;
- if (!dir)
- {
- std::ostringstream oss;
- oss << "dir=\"" << dirname << "\"" << FORMAT_SYSERR(errno);
- throw jexception(jerrno::JERR_JDIR_OPENDIR, oss.str(), "jdir", "create_bak_dir");
- }
struct dirent* entry;
while ((entry = ::readdir(dir)) != 0)
{
@@ -407,25 +376,23 @@ void
jdir::read_dir(const std::string& name, std::vector<std::string>& dir_list, const bool incl_dirs, const bool incl_files, const bool incl_links, const bool return_fqfn) {
struct stat s;
if (is_dir(name)) {
- DIR* dir = ::opendir(name.c_str());
- if (dir != 0) {
- struct dirent* entry;
- while ((entry = ::readdir(dir)) != 0) {
- if (std::strcmp(entry->d_name, ".") != 0 && std::strcmp(entry->d_name, "..") != 0) { // Ignore . and ..
- std::string full_name(name + "/" + entry->d_name);
- if (::stat(full_name.c_str(), &s))
- {
- ::closedir(dir);
- std::ostringstream oss;
- oss << "stat: file=\"" << full_name << "\"" << FORMAT_SYSERR(errno);
- throw jexception(jerrno::JERR_JDIR_STAT, oss.str(), "jdir", "delete_dir");
- }
- if ((S_ISREG(s.st_mode) && incl_files) || (S_ISDIR(s.st_mode) && incl_dirs) || (S_ISLNK(s.st_mode) && incl_links)) {
- if (return_fqfn) {
- dir_list.push_back(name + "/" + entry->d_name);
- } else {
- dir_list.push_back(entry->d_name);
- }
+ DIR* dir = open_dir(name, "read_dir", false);
+ struct dirent* entry;
+ while ((entry = ::readdir(dir)) != 0) {
+ if (std::strcmp(entry->d_name, ".") != 0 && std::strcmp(entry->d_name, "..") != 0) { // Ignore . and ..
+ std::string full_name(name + "/" + entry->d_name);
+ if (::stat(full_name.c_str(), &s))
+ {
+ ::closedir(dir);
+ std::ostringstream oss;
+ oss << "stat: file=\"" << full_name << "\"" << FORMAT_SYSERR(errno);
+ throw jexception(jerrno::JERR_JDIR_STAT, oss.str(), "jdir", "delete_dir");
+ }
+ if ((S_ISREG(s.st_mode) && incl_files) || (S_ISDIR(s.st_mode) && incl_dirs) || (S_ISLNK(s.st_mode) && incl_links)) {
+ if (return_fqfn) {
+ dir_list.push_back(name + "/" + entry->d_name);
+ } else {
+ dir_list.push_back(entry->d_name);
}
}
}
@@ -457,6 +424,21 @@ jdir::close_dir(DIR* dir, const std::string& dir_name, const std::string& fn_nam
}
}
+DIR*
+jdir::open_dir(const std::string& dir_name, const std::string& fn_name, const bool test_enoent)
+{
+ DIR* dir = ::opendir(dir_name.c_str());
+ if (!dir) {
+ if (test_enoent && errno == ENOENT) {
+ return 0;
+ }
+ std::ostringstream oss;
+ oss << "dir=\"" << dir_name << "\"" << FORMAT_SYSERR(errno);
+ throw jexception(jerrno::JERR_JDIR_OPENDIR, oss.str(), "jdir", fn_name);
+ }
+ return dir;
+}
+
std::ostream&
operator<<(std::ostream& os, const jdir& jdir)
{
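
Note: this change consolidates four near-identical ::opendir() error blocks in
clear_dir(), push_down(), delete_dir() and create_bak_dir() into the single
open_dir() helper shown in the last hunk; its test_enoent flag selects between
returning 0 for a missing directory and throwing JERR_JDIR_OPENDIR. A minimal
sketch of the same pattern in Python, for illustration only (the helper name and
behaviour mirror the C++ code; this is not part of any qls module):

import errno
import os

def open_dir(dir_name, fn_name, test_enoent):
    """List dir_name; return None when a missing directory is tolerated."""
    try:
        return os.listdir(dir_name)
    except OSError as err:
        if test_enoent and err.errno == errno.ENOENT:
            return None  # caller treats a missing directory as a no-op
        # equivalent of throwing jexception(JERR_JDIR_OPENDIR, ...) in C++
        raise OSError('%s: dir="%s": %s' % (fn_name, dir_name, err.strerror))
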
diff --git a/qpid/cpp/src/qpid/linearstore/journal/jdir.h b/qpid/cpp/src/qpid/linearstore/journal/jdir.h
index 86b16f8545..59f21ce499 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/jdir.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/jdir.h
@@ -353,6 +353,8 @@ namespace journal {
* \exception jerrno::JERR_JDIR_CLOSEDIR The directory handle could not be closed.
*/
static void close_dir(DIR* dir, const std::string& dir_name, const std::string& fn_name);
+
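+    /**
+    * \brief Open directory dir_name for reading. If test_enoent is true and the
+    * directory does not exist, 0 is returned instead of throwing.
+    *
+    * \exception jerrno::JERR_JDIR_OPENDIR The directory could not be opened.
+    */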
+ static DIR* open_dir(const std::string& dir_name, const std::string& fn_name, const bool test_enoent);
};
}}}
diff --git a/qpid/cpp/src/tests/linearstore/linearstoredirsetup.sh b/qpid/cpp/src/tests/linearstore/linearstoredirsetup.sh
index 3cad50b1c5..ef39767e9b 100755
--- a/qpid/cpp/src/tests/linearstore/linearstoredirsetup.sh
+++ b/qpid/cpp/src/tests/linearstore/linearstoredirsetup.sh
@@ -19,26 +19,37 @@
# under the License.
#
-
-STORE_DIR=/tmp
-LINEARSTOREDIR=~/RedHat/linearstore
-
-rm -rf $STORE_DIR/qls
-rm -rf $STORE_DIR/p002
-rm $STORE_DIR/p004
-
-mkdir $STORE_DIR/qls
-mkdir $STORE_DIR/p002
-touch $STORE_DIR/p004
-mkdir $STORE_DIR/qls/p001
-touch $STORE_DIR/qls/p003
-ln -s $STORE_DIR/p002 $STORE_DIR/qls/p002
-ln -s $STORE_DIR/p004 $STORE_DIR/qls/p004
-
-${LINEARSTOREDIR}/tools/src/py/linearstore/efptool.py $STORE_DIR/qls/ -a -p 1 -s 2048 -n 25
-${LINEARSTOREDIR}/tools/src/py/linearstore/efptool.py $STORE_DIR/qls/ -a -p 1 -s 512 -n 25
-${LINEARSTOREDIR}/tools/src/py/linearstore/efptool.py $STORE_DIR/qls/ -a -p 2 -s 2048 -n 25
-
+# This script sets up a test directory which contains both
+# recoverable and non-recoverable files and directories for
+# the empty file pool (EFP).
+
+# NOTE: The following is based on typical development tree paths, not installed paths
+
+BASE_DIR=${HOME}/RedHat
+STORE_DIR=${BASE_DIR}
+PYTHON_TOOLS_DIR=${BASE_DIR}/qpid/tools/src/linearstore
+export PYTHONPATH=${BASE_DIR}/qpid/python:${BASE_DIR}/qpid/extras/qmf/src/py:${BASE_DIR}/qpid/tools/src/py
+
+# Remove old dirs (if present)
+rm -rf ${STORE_DIR}/qls
+rm -rf ${STORE_DIR}/p002
+rm ${STORE_DIR}/p004
+
+# Create new dir tree and links
+mkdir ${STORE_DIR}/p002_ext
+touch ${STORE_DIR}/p004_ext
+mkdir ${STORE_DIR}/qls
+mkdir ${STORE_DIR}/qls/p001
+touch ${STORE_DIR}/qls/p003
+ln -s ${STORE_DIR}/p002_ext ${STORE_DIR}/qls/p002
+ln -s ${STORE_DIR}/p004_ext ${STORE_DIR}/qls/p004
+
+# Populate efp dirs with empty files
+${PYTHON_TOOLS_DIR}/efptool.py $STORE_DIR/qls/ -a -p 1 -s 2048 -n 25
+${PYTHON_TOOLS_DIR}/efptool.py $STORE_DIR/qls/ -a -p 1 -s 512 -n 25
+${PYTHON_TOOLS_DIR}/efptool.py $STORE_DIR/qls/ -a -p 2 -s 2048 -n 25
+
+# Show the result for information
${PYTHON_TOOLS_DIR}/efptool.py $STORE_DIR/qls/ -l
tree -la $STORE_DIR/qls
diff --git a/qpid/cpp/src/tests/linearstore/tx-test-soak.sh b/qpid/cpp/src/tests/linearstore/tx-test-soak.sh
index fa05e0a4a8..7d5581961f 100755
--- a/qpid/cpp/src/tests/linearstore/tx-test-soak.sh
+++ b/qpid/cpp/src/tests/linearstore/tx-test-soak.sh
@@ -19,7 +19,6 @@
# under the License.
#
-
# tx-test-soak
#
# Basic test methodology:
@@ -30,6 +29,8 @@
# 5. Run qpid-txtest against broker in check mode, which checks that all expected messages are present.
# 6. Wash, rinse, repeat... The number of runs is determined by ${NUM_RUNS}
+# NOTE: The following is based on typical development tree paths, not installed paths
+
NUM_RUNS=1000
BASE_DIR=${HOME}/RedHat
CMAKE_BUILD_DIR=${BASE_DIR}/q.cm
@@ -43,13 +44,18 @@ BROKER_MANAGEMENT="no" # "no" or "yes"
TRUNCATE_INTERVAL=10
MAX_DISK_PERC_USED=90
-# Consts (don't adjust these...)
+# Constants (don't adjust these)
export BASE_DIR
RELATIVE_BASE_DIR=`python -c "import os,os.path; print os.path.relpath(os.environ['BASE_DIR'], os.environ['PWD'])"`
+export PYTHONPATH=${BASE_DIR}/qpid/python:${BASE_DIR}/qpid/extras/qmf/src/py:${BASE_DIR}/qpid/tools/src/py
LOG_FILE_NAME=log.txt
QPIDD_FN=qpidd
QPIDD=${CMAKE_BUILD_DIR}/src/${QPIDD_FN}
-TXTEST=${CMAKE_BUILD_DIR}/src/tests/qpid-txtest
+TXTEST_FN=qpid-txtest
+TXTEST=${CMAKE_BUILD_DIR}/src/tests/${TXTEST_FN}
+ANALYZE_FN=qpid_qls_analyze.py
+ANALYZE=${BASE_DIR}/qpid/tools/src/py/${ANALYZE_FN}
+ANALYZE_ARGS="--efp --show-recs --stats"
QPIDD_BASE_ARGS="--load-module ${STORE_MODULE} -m ${BROKER_MANAGEMENT} --auth no --default-flow-stop-threshold 0 --default-flow-resume-threshold 0 --default-queue-limit 0 --store-dir ${BASE_DIR} --log-enable ${BROKER_LOG_LEVEL} --log-to-stderr no --log-to-stdout no"
TXTEST_INIT_STR="--init yes --transfer no --check no"
TXTEST_RUN_STR="--init no --transfer yes --check no"
@@ -181,6 +187,17 @@ check_ready_to_run() {
fi
}
+# Analyze store files
+# $1: Log suffix: "A" (store analyzed after the initial soak run) or "B" (analyzed after recovery and check).
+analyze_store() {
+ ${ANALYZE} ${ANALYZE_ARGS} ${BASE_DIR}/qls &> ${RESULT_DIR}/qls_analysis.$1.log
+ echo >> ${RESULT_DIR}/qls_analysis.$1.log
+ echo "----------------------------------------------------------" >> ${RESULT_DIR}/qls_analysis.$1.log
+ echo "With transactional reconsiliation:" >> ${RESULT_DIR}/qls_analysis.$1.log
+ echo >> ${RESULT_DIR}/qls_analysis.$1.log
+ ${ANALYZE} ${ANALYZE_ARGS} --txn ${BASE_DIR}/qls &>> ${RESULT_DIR}/qls_analysis.$1.log
+}
+
ulimit -c unlimited # Allow core files to be created
RESULT_BASE_DIR_SUFFIX=`date "${TIMESTAMP_FORMAT}"`
@@ -219,7 +236,8 @@ for rn in `seq ${NUM_RUNS}`; do
sleep ${RUN_TIME}
kill_process ${SIG_KILL} ${QPIDD_PID}
sleep 2
- tar -czf ${RESULT_DIR}/qls_B.tar.gz ${RELATIVE_BASE_DIR}/qls
+ analyze_store "A"
+ tar -czf ${RESULT_DIR}/qls_A.tar.gz ${RELATIVE_BASE_DIR}/qls
# === PART B: Recovery and check ===
start_broker "B"
@@ -234,11 +252,14 @@ for rn in `seq ${NUM_RUNS}`; do
kill_process ${SIG_KILL} ${PID}
sleep 2
fi
- tar -czf ${RESULT_DIR}/qls_C.tar.gz ${RELATIVE_BASE_DIR}/qls
+ analyze_store "B"
+ tar -czf ${RESULT_DIR}/qls_B.tar.gz ${RELATIVE_BASE_DIR}/qls
# === Check for errors, cores and exceptions in logs ===
grep -Hn "jexception" ${RESULT_DIR}/qpidd.A.log | tee -a ${LOG_FILE}
grep -Hn "jexception" ${RESULT_DIR}/qpidd.B.log | tee -a ${LOG_FILE}
+ grep -Hn "Traceback (most recent call last):" ${RESULT_DIR}/qls_analysis.A.log | tee -a ${LOG_FILE}
+ grep -Hn "Traceback (most recent call last):" ${RESULT_DIR}/qls_analysis.B.log | tee -a ${LOG_FILE}
grep "${SUCCESS_MSG}" ${RESULT_DIR}/txtest.B.log &> /dev/null
if [[ "$?" != "0" ]]; then
echo "ERROR in run ${rn}" >> ${LOG_FILE}
diff --git a/qpid/tools/src/py/qls/anal.py b/qpid/tools/src/py/qls/anal.py
new file mode 100644
index 0000000000..865dfa16c7
--- /dev/null
+++ b/qpid/tools/src/py/qls/anal.py
@@ -0,0 +1,593 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+Module: qls.anal
+
+Classes for recovery and analysis of a Qpid Linear Store (QLS).
+"""
+
+import os.path
+import qls.err
+import qls.jrnl
+import qls.utils
+
+class HighCounter(object):
+ def __init__(self):
+ self.num = 0
+ def check(self, num):
+ if self.num < num:
+ self.num = num
+ def get(self):
+ return self.num
+ def get_next(self):
+ self.num += 1
+ return self.num
+
+class JournalRecoveryManager(object):
+ TPL_DIR_NAME = 'tpl'
+ JRNL_DIR_NAME = 'jrnl'
+ def __init__(self, directory, args):
+ if not os.path.exists(directory):
+ raise qls.err.InvalidQlsDirectoryNameError(directory)
+ self.directory = directory
+ self.args = args
+ self.tpl = None
+ self.journals = {}
+ self.high_rid_counter = HighCounter()
+ self.prepared_list = None
+ def report(self, print_stats_flag):
+ self._reconcile_transactions(self.prepared_list, self.args.txn)
+ if self.tpl is not None:
+ self.tpl.report(print_stats_flag)
+ for queue_name in sorted(self.journals.keys()):
+ self.journals[queue_name].report(print_stats_flag)
+ def run(self):
+ tpl_dir = os.path.join(self.directory, JournalRecoveryManager.TPL_DIR_NAME)
+ if os.path.exists(tpl_dir):
+ self.tpl = Journal(tpl_dir, None, self.args)
+ self.tpl.recover(self.high_rid_counter)
+ if self.args.show_recs or self.args.show_all_recs:
+ print
+ jrnl_dir = os.path.join(self.directory, JournalRecoveryManager.JRNL_DIR_NAME)
+ self.prepared_list = self.tpl.txn_map.get_prepared_list() if self.tpl is not None else {}
+ if os.path.exists(jrnl_dir):
+ for dir_entry in sorted(os.listdir(jrnl_dir)):
+ jrnl = Journal(os.path.join(jrnl_dir, dir_entry), self.prepared_list, self.args)
+ jrnl.recover(self.high_rid_counter)
+ self.journals[jrnl.get_queue_name()] = jrnl
+ if self.args.show_recs or self.args.show_all_recs:
+ print
+ def _reconcile_transactions(self, prepared_list, txn_flag):
+ print 'Transaction reconciliation report:'
+ print '=================================='
+ print len(prepared_list), 'open transaction(s) found in prepared transaction list:'
+ for xid in prepared_list.keys():
+ commit_flag = prepared_list[xid]
+ if commit_flag is None:
+ status = '[Prepared, neither committed nor aborted - assuming commit]'
+ elif commit_flag:
+ status = '[Prepared, but interrupted during commit phase]'
+ else:
+ status = '[Prepared, but interrupted during abort phase]'
+ print ' ', qls.utils.format_xid(xid), status
+ if prepared_list[xid] is None: # Prepared, but not committed or aborted
+ enqueue_record = self.tpl.get_txn_map_record(xid)[0][1]
+ dequeue_record = qls.utils.create_record(qls.jrnl.DequeueRecord.MAGIC, \
+ qls.jrnl.DequeueRecord.TXN_COMPLETE_COMMIT_FLAG, \
+ self.tpl.current_journal_file, \
+ self.high_rid_counter.get_next(), \
+ enqueue_record.record_id, xid, None)
+ if txn_flag:
+ self.tpl.add_record(dequeue_record)
+ for queue_name in sorted(self.journals.keys()):
+ self.journals[queue_name].reconcile_transactions(prepared_list, txn_flag)
+ if len(prepared_list) > 0:
+ print 'Completing prepared transactions in prepared transaction list:'
+ for xid in prepared_list.keys():
+ print ' ', qls.utils.format_xid(xid)
+ transaction_record = qls.utils.create_record(qls.jrnl.TransactionRecord.MAGIC_COMMIT, 0, \
+ self.tpl.current_journal_file, \
+ self.high_rid_counter.get_next(), None, xid, None)
+ if txn_flag:
+ self.tpl.add_record(transaction_record)
+ print
+
+class EnqueueMap(object):
+ """
+ Map of enqueued records in a QLS journal
+ """
+ def __init__(self, journal):
+ self.journal = journal
+ self.enq_map = {}
+ def add(self, journal_file, enq_record, locked_flag):
+ if enq_record.record_id in self.enq_map:
+ raise qls.err.DuplicateRecordIdError(self.journal.current_file_header, enq_record)
+ self.enq_map[enq_record.record_id] = [journal_file, enq_record, locked_flag]
+ def contains(self, rid):
+ """Return True if the map contains the given rid"""
+ return rid in self.enq_map
+ def delete(self, journal_file, deq_record):
+ if deq_record.dequeue_record_id in self.enq_map:
+ enq_list = self.enq_map[deq_record.dequeue_record_id]
+ del self.enq_map[deq_record.dequeue_record_id]
+ return enq_list
+ else:
+ raise qls.err.RecordIdNotFoundError(journal_file.file_header, deq_record)
+ def get(self, record_id):
+ if record_id in self.enq_map:
+ return self.enq_map[record_id]
+ return None
+ def lock(self, journal_file, dequeue_record):
+ if dequeue_record.dequeue_record_id not in self.enq_map:
+ raise qls.err.RecordIdNotFoundError(journal_file.file_header, dequeue_record)
+ self.enq_map[dequeue_record.dequeue_record_id][2] = True
+ def report_str(self, _, show_records):
+ """Return a string containing a text report for all records in the map"""
+ if len(self.enq_map) == 0:
+ return 'No enqueued records found.'
+ rstr = '%d enqueued records found' % len(self.enq_map)
+ if show_records:
+ rstr += ":"
+ rid_list = self.enq_map.keys()
+ rid_list.sort()
+ for rid in rid_list:
+ journal_file, record, locked_flag = self.enq_map[rid]
+ if locked_flag:
+ lock_str = '[LOCKED]'
+ else:
+ lock_str = ''
+ rstr += '\n 0x%x:%s %s' % (journal_file.file_header.file_num, record, lock_str)
+ else:
+ rstr += '.'
+ return rstr
+ def unlock(self, journal_file, dequeue_record):
+ """Set the transaction lock for a given record_id to False"""
+ if dequeue_record.dequeue_record_id in self.enq_map:
+ if self.enq_map[dequeue_record.dequeue_record_id][2]:
+ self.enq_map[dequeue_record.dequeue_record_id][2] = False
+ else:
+ raise qls.err.RecordNotLockedError(journal_file.file_header, dequeue_record)
+ else:
+ raise qls.err.RecordIdNotFoundError(journal_file.file_header, dequeue_record)
+
+class TransactionMap(object):
+ """
+ Map of open transactions used while recovering a QLS journal
+ """
+ def __init__(self, enq_map):
+ self.txn_map = {}
+ self.enq_map = enq_map
+ def abort(self, xid):
+ """Perform an abort operation for the given xid record"""
+ for journal_file, record, _ in self.txn_map[xid]:
+ if isinstance(record, qls.jrnl.DequeueRecord):
+ if self.enq_map.contains(record.dequeue_record_id):
+ self.enq_map.unlock(journal_file, record)
+ else:
+ journal_file.decr_enq_cnt(record)
+ del self.txn_map[xid]
+ def add(self, journal_file, record):
+ if record.xid is None:
+ raise qls.err.NonTransactionalRecordError(journal_file.file_header, record, 'TransactionMap.add()')
+ if isinstance(record, qls.jrnl.DequeueRecord):
+ try:
+ self.enq_map.lock(journal_file, record)
+ except qls.err.RecordIdNotFoundError:
+ # Not in emap, look for rid in tmap - should not happen in practice
+ txn_op = self._find_record_id(record.xid, record.dequeue_record_id)
+ if txn_op != None:
+ if txn_op[2]:
+ raise qls.err.AlreadyLockedError(journal_file.file_header, record)
+ txn_op[2] = True
+ if record.xid in self.txn_map:
+ self.txn_map[record.xid].append([journal_file, record, False]) # append to existing list
+ else:
+ self.txn_map[record.xid] = [[journal_file, record, False]] # create new list
+ def commit(self, xid):
+ """Perform a commit operation for the given xid record"""
+ mismatch_list = []
+ for journal_file, record, lock in self.txn_map[xid]:
+ if isinstance(record, qls.jrnl.EnqueueRecord):
+ self.enq_map.add(journal_file, record, lock) # Transfer enq to emap
+ else:
+ if self.enq_map.contains(record.dequeue_record_id):
+ self.enq_map.unlock(journal_file, record)
+ self.enq_map.delete(journal_file, record)[0].decr_enq_cnt(record)
+ else:
+ mismatch_list.append('0x%x' % record.dequeue_record_id)
+ del self.txn_map[xid]
+ return mismatch_list
+ def contains(self, xid):
+ """Return True if the xid exists in the map; False otherwise"""
+ return xid in self.txn_map
+ def delete(self, journal_file, transaction_record):
+ """Remove a transaction record from the map using either a commit or abort header"""
+ if transaction_record.magic[-1] == 'c':
+ return self.commit(transaction_record.xid)
+ if transaction_record.magic[-1] == 'a':
+ self.abort(transaction_record.xid)
+ else:
+ raise qls.err.InvalidRecordTypeError(journal_file.file_header, transaction_record,
+ 'delete from Transaction Map')
+ def get(self, xid):
+ if xid in self.txn_map:
+ return self.txn_map[xid]
+ return None
+ def get_prepared_list(self):
+ """
+ Prepared list is a map of xid(key) to one of None, True or False. These represent respectively:
+ None: prepared, but neither committed nor aborted (interrupted before commit or abort)
+ False: prepared and aborted (interrupted before abort complete)
+ True: prepared and committed (interrupted before commit complete)
+ """
+ prepared_list = {}
+ for xid in self.get_xid_list():
+ for _, record, _ in self.txn_map[xid]:
+ if isinstance(record, qls.jrnl.EnqueueRecord):
+ prepared_list[xid] = None
+ else:
+ prepared_list[xid] = record.is_transaction_complete_commit()
+ return prepared_list
+ def get_xid_list(self):
+ return self.txn_map.keys()
+ def report_str(self, _, show_records):
+ """Return a string containing a text report for all records in the map"""
+ if len(self.txn_map) == 0:
+ return 'No outstanding transactions found.'
+ rstr = '%d outstanding transaction(s)' % len(self.txn_map)
+ if show_records:
+ rstr += ':'
+ for xid, op_list in self.txn_map.iteritems():
+ rstr += '\n %s containing %d operations:' % (qls.utils.format_xid(xid), len(op_list))
+ for journal_file, record, _ in op_list:
+ rstr += '\n 0x%x:%s' % (journal_file.file_header.file_num, record)
+ else:
+ rstr += '.'
+ return rstr
+ def _find_record_id(self, xid, record_id):
+ """ Search for and return map list with supplied rid."""
+ if xid in self.txn_map:
+ for txn_op in self.txn_map[xid]:
+ if txn_op[1].record_id == record_id:
+ return txn_op
+ for this_xid in self.txn_map.iterkeys():
+ for txn_op in self.txn_map[this_xid]:
+ if txn_op[1].record_id == record_id:
+ return txn_op
+ return None
+
+class JournalStatistics(object):
+ """Journal statistics"""
+ def __init__(self):
+ self.total_record_count = 0
+ self.transient_record_count = 0
+ self.filler_record_count = 0
+ self.enqueue_count = 0
+ self.dequeue_count = 0
+ self.transaction_record_count = 0
+ self.transaction_enqueue_count = 0
+ self.transaction_dequeue_count = 0
+ self.transaction_commit_count = 0
+ self.transaction_abort_count = 0
+ self.transaction_operation_count = 0
+ def __str__(self):
+ fstr = 'Total record count: %d\n' + \
+ 'Transient record count: %d\n' + \
+ 'Filler_record_count: %d\n' + \
+ 'Enqueue_count: %d\n' + \
+ 'Dequeue_count: %d\n' + \
+ 'Transaction_record_count: %d\n' + \
+ 'Transaction_enqueue_count: %d\n' + \
+ 'Transaction_dequeue_count: %d\n' + \
+ 'Transaction_commit_count: %d\n' + \
+ 'Transaction_abort_count: %d\n' + \
+ 'Transaction_operation_count: %d\n'
+ return fstr % (self.total_record_count,
+ self.transient_record_count,
+ self.filler_record_count,
+ self.enqueue_count,
+ self.dequeue_count,
+ self.transaction_record_count,
+ self.transaction_enqueue_count,
+ self.transaction_dequeue_count,
+ self.transaction_commit_count,
+ self.transaction_abort_count,
+ self.transaction_operation_count)
+
+class Journal(object):
+ """
+ Instance of a Qpid Linear Store (QLS) journal.
+ """
+ def __init__(self, directory, xid_prepared_list, args):
+ self.directory = directory
+ self.queue_name = os.path.basename(directory)
+ self.files = {}
+ self.file_num_list = None
+ self.file_num_itr = None
+ self.enq_map = EnqueueMap(self)
+ self.txn_map = TransactionMap(self.enq_map)
+ self.current_journal_file = None
+ self.first_rec_flag = None
+ self.statistics = JournalStatistics()
+ self.xid_prepared_list = xid_prepared_list # This is None for the TPL instance only
+ self.args = args
+ self.last_record_offset = None # TODO: Move into JournalFile
+ self.num_filler_records_required = None # TODO: Move into JournalFile
+ self.fill_to_offset = None
+ def add_record(self, record):
+ if isinstance(record, qls.jrnl.EnqueueRecord) or isinstance(record, qls.jrnl.DequeueRecord):
+ if record.xid_size > 0:
+ self.txn_map.add(self.current_journal_file, record)
+ else:
+ self.enq_map.add(self.current_journal_file, record, False)
+ elif isinstance(record, qls.jrnl.TransactionRecord):
+ self.txn_map.delete(self.current_journal_file, record)
+ else:
+ raise qls.err.InvalidRecordTypeError(self.current_journal_file, record, 'add to Journal')
+ def get_enq_map_record(self, rid):
+ return self.enq_map.get(rid)
+ def get_txn_map_record(self, xid):
+ return self.txn_map.get(xid)
+ def get_outstanding_txn_list(self):
+ return self.txn_map.get_xid_list()
+ def get_queue_name(self):
+ return self.queue_name
+ def recover(self, high_rid_counter):
+ print 'Recovering', self.queue_name
+ self._analyze_files()
+ try:
+ while self._get_next_record(high_rid_counter):
+ pass
+ self._check_alignment()
+ except qls.err.NoMoreFilesInJournalError:
+ print 'No more files in journal'
+ except qls.err.FirstRecordOffsetMismatchError as err:
+ print '0x%08x: **** FRO ERROR: queue=\"%s\" fid=0x%x fro actual=0x%08x expected=0x%08x' % \
+ (err.get_expected_fro(), err.get_queue_name(), err.get_file_number(), err.get_record_offset(),
+ err.get_expected_fro())
+ def reconcile_transactions(self, prepared_list, txn_flag):
+ xid_list = self.txn_map.get_xid_list()
+ if len(xid_list) > 0:
+ print self.queue_name, 'contains', len(xid_list), 'open transaction(s):'
+ for xid in xid_list:
+ if xid in prepared_list.keys():
+ commit_flag = prepared_list[xid]
+ if commit_flag is None:
+ print ' ', qls.utils.format_xid(xid), '- Assuming commit after prepare'
+ if txn_flag:
+ self.txn_map.commit(xid)
+ elif commit_flag:
+ print ' ', qls.utils.format_xid(xid), '- Completing interrupted commit operation'
+ if txn_flag:
+ self.txn_map.commit(xid)
+ else:
+ print ' ', qls.utils.format_xid(xid), '- Completing interrupted abort operation'
+ if txn_flag:
+ self.txn_map.abort(xid)
+ else:
+ print ' ', qls.utils.format_xid(xid), '- Ignoring, not in prepared transaction list'
+ if txn_flag:
+ self.txn_map.abort(xid)
+ def report(self, print_stats_flag):
+ print 'Journal "%s":' % self.queue_name
+ print '=' * (11 + len(self.queue_name))
+ if print_stats_flag:
+ print str(self.statistics)
+ print self.enq_map.report_str(True, True)
+ print self.txn_map.report_str(True, True)
+ JournalFile.report_header()
+ for file_num in sorted(self.files.keys()):
+ self.files[file_num].report()
+ #TODO: move this to JournalFile, append to file info
+ if self.num_filler_records_required is not None and self.fill_to_offset is not None:
+ print '0x%x:0x%08x: %d filler records required for DBLK alignment to 0x%08x' % \
+ (self.current_journal_file.file_header.file_num, self.last_record_offset,
+ self.num_filler_records_required, self.fill_to_offset)
+ print
+ #--- protected functions ---
+ def _analyze_files(self):
+ for dir_entry in os.listdir(self.directory):
+ dir_entry_bits = dir_entry.split('.')
+ if len(dir_entry_bits) == 2 and dir_entry_bits[1] == JournalRecoveryManager.JRNL_DIR_NAME:
+ fq_file_name = os.path.join(self.directory, dir_entry)
+ file_handle = open(fq_file_name)
+ args = qls.utils.load_args(file_handle, qls.jrnl.RecordHeader)
+ file_hdr = qls.jrnl.FileHeader(*args)
+ file_hdr.init(file_handle, *qls.utils.load_args(file_handle, qls.jrnl.FileHeader))
+ if file_hdr.is_header_valid(file_hdr):
+ file_hdr.load(file_handle)
+ if file_hdr.is_valid():
+ qls.utils.skip(file_handle, file_hdr.file_header_size_sblks * qls.utils.DEFAULT_SBLK_SIZE)
+ self.files[file_hdr.file_num] = JournalFile(file_hdr)
+ self.file_num_list = sorted(self.files.keys())
+ self.file_num_itr = iter(self.file_num_list)
+ def _check_alignment(self): # TODO: Move into JournalFile
+ remaining_sblks = self.last_record_offset % qls.utils.DEFAULT_SBLK_SIZE
+ if remaining_sblks == 0:
+ self.num_filler_records_required = 0
+ else:
+ self.num_filler_records_required = (qls.utils.DEFAULT_SBLK_SIZE - remaining_sblks) / \
+ qls.utils.DEFAULT_DBLK_SIZE
+ self.fill_to_offset = self.last_record_offset + \
+ (self.num_filler_records_required * qls.utils.DEFAULT_DBLK_SIZE)
+ if self.args.show_recs or self.args.show_all_recs:
+ print '0x%x:0x%08x: %d filler records required for DBLK alignment to 0x%08x' % \
+ (self.current_journal_file.file_header.file_num, self.last_record_offset,
+ self.num_filler_records_required, self.fill_to_offset)
+ def _check_file(self):
+ if self.current_journal_file is not None:
+ if not self.current_journal_file.file_header.is_end_of_file():
+ return True
+ if self.current_journal_file.file_header.is_end_of_file():
+ self.last_record_offset = self.current_journal_file.file_header.file_handle.tell()
+ if not self._get_next_file():
+ return False
+ fhdr = self.current_journal_file.file_header
+ fhdr.file_handle.seek(fhdr.first_record_offset)
+ return True
+ def _get_next_file(self):
+ if self.current_journal_file is not None:
+ file_handle = self.current_journal_file.file_header.file_handle
+ if not file_handle.closed: # sanity check, should not be necessary
+ file_handle.close()
+ file_num = 0
+ try:
+ while file_num == 0:
+ file_num = self.file_num_itr.next()
+ except StopIteration:
+ pass
+ if file_num == 0:
+ return False
+ self.current_journal_file = self.files[file_num]
+ self.first_rec_flag = True
+ if self.args.show_recs or self.args.show_all_recs:
+ file_header = self.current_journal_file.file_header
+ print '0x%x:%s' % (file_header.file_num, file_header.to_string())
+ return True
+ def _get_next_record(self, high_rid_counter):
+ if not self._check_file():
+ return False
+ self.last_record_offset = self.current_journal_file.file_header.file_handle.tell()
+ this_record = qls.utils.load(self.current_journal_file.file_header.file_handle, qls.jrnl.RecordHeader)
+ if not this_record.is_header_valid(self.current_journal_file.file_header):
+ return False
+ if self.first_rec_flag:
+ if this_record.file_offset != self.current_journal_file.file_header.first_record_offset:
+ raise qls.err.FirstRecordOffsetMismatchError(self.current_journal_file.file_header, this_record)
+ self.first_rec_flag = False
+ self.statistics.total_record_count += 1
+ start_journal_file = self.current_journal_file
+ if isinstance(this_record, qls.jrnl.EnqueueRecord):
+ ok_flag = self._handle_enqueue_record(this_record, start_journal_file)
+ high_rid_counter.check(this_record.record_id)
+ if self.args.show_recs or self.args.show_all_recs:
+ print '0x%x:%s' % (start_journal_file.file_header.file_num, \
+ this_record.to_string(self.args.show_xids, self.args.show_data))
+ elif isinstance(this_record, qls.jrnl.DequeueRecord):
+ ok_flag = self._handle_dequeue_record(this_record, start_journal_file)
+ high_rid_counter.check(this_record.record_id)
+ if self.args.show_recs or self.args.show_all_recs:
+ print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record.to_string(self.args.show_xids))
+ elif isinstance(this_record, qls.jrnl.TransactionRecord):
+ ok_flag = self._handle_transaction_record(this_record, start_journal_file)
+ high_rid_counter.check(this_record.record_id)
+ if self.args.show_recs or self.args.show_all_recs:
+ print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record.to_string(self.args.show_xids))
+ else:
+ self.statistics.filler_record_count += 1
+ ok_flag = True
+ if self.args.show_all_recs:
+ print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record)
+ qls.utils.skip(self.current_journal_file.file_header.file_handle, qls.utils.DEFAULT_DBLK_SIZE)
+ return ok_flag
+ def _handle_enqueue_record(self, enqueue_record, start_journal_file):
+ while enqueue_record.load(self.current_journal_file.file_header.file_handle):
+ if not self._get_next_file():
+ enqueue_record.truncated_flag = True
+ return False
+ if not enqueue_record.is_valid(start_journal_file):
+ return False
+ if enqueue_record.is_external() and enqueue_record.data != None:
+ raise qls.err.ExternalDataError(self.current_journal_file.file_header, enqueue_record)
+ if enqueue_record.is_transient():
+ self.statistics.transient_record_count += 1
+ return True
+ if enqueue_record.xid_size > 0:
+ self.txn_map.add(start_journal_file, enqueue_record)
+ self.statistics.transaction_operation_count += 1
+ self.statistics.transaction_record_count += 1
+ self.statistics.transaction_enqueue_count += 1
+ else:
+ self.enq_map.add(start_journal_file, enqueue_record, False)
+ start_journal_file.incr_enq_cnt()
+ self.statistics.enqueue_count += 1
+ return True
+ def _handle_dequeue_record(self, dequeue_record, start_journal_file):
+ while dequeue_record.load(self.current_journal_file.file_header.file_handle):
+ if not self._get_next_file():
+ dequeue_record.truncated_flag = True
+ return False
+ if not dequeue_record.is_valid(start_journal_file):
+ return False
+ if dequeue_record.xid_size > 0:
+ if self.xid_prepared_list is None: # ie this is the TPL
+ dequeue_record.transaction_prepared_list_flag = True
+ elif not self.enq_map.contains(dequeue_record.dequeue_record_id):
+ dequeue_record.warnings.append('NOT IN EMAP') # Only for non-TPL records
+ self.txn_map.add(start_journal_file, dequeue_record)
+ self.statistics.transaction_operation_count += 1
+ self.statistics.transaction_record_count += 1
+ self.statistics.transaction_dequeue_count += 1
+ else:
+ try:
+ self.enq_map.delete(start_journal_file, dequeue_record)[0].decr_enq_cnt(dequeue_record)
+ except qls.err.RecordIdNotFoundError:
+ dequeue_record.warnings.append('NOT IN EMAP')
+ self.statistics.dequeue_count += 1
+ return True
+ def _handle_transaction_record(self, transaction_record, start_journal_file):
+ while transaction_record.load(self.current_journal_file.file_header.file_handle):
+ if not self._get_next_file():
+ transaction_record.truncated_flag = True
+ return False
+ if not transaction_record.is_valid(start_journal_file):
+ return False
+ if transaction_record.magic[-1] == 'a':
+ self.statistics.transaction_abort_count += 1
+ else:
+ self.statistics.transaction_commit_count += 1
+ if self.txn_map.contains(transaction_record.xid):
+ self.txn_map.delete(self.current_journal_file, transaction_record)
+ else:
+ transaction_record.warnings.append('NOT IN TMAP')
+# if transaction_record.magic[-1] == 'c': # commits only
+# self._txn_obj_list[hdr.xid] = hdr
+ self.statistics.transaction_record_count += 1
+ return True
+ def _load_data(self, record):
+ while not record.is_complete:
+ record.load(self.current_journal_file.file_handle)
+
+class JournalFile(object):
+ def __init__(self, file_header):
+ self.file_header = file_header
+ self.enq_cnt = 0
+ self.deq_cnt = 0
+ self.num_filler_records_required = None
+ def incr_enq_cnt(self):
+ self.enq_cnt += 1
+ def decr_enq_cnt(self, record):
+ if self.enq_cnt <= self.deq_cnt:
+ raise qls.err.EnqueueCountUnderflowError(self.file_header, record)
+ self.deq_cnt += 1
+ def get_enq_cnt(self):
+ return self.enq_cnt - self.deq_cnt
+ def is_outstanding_enq(self):
+ return self.enq_cnt > self.deq_cnt
+ @staticmethod
+ def report_header():
+ print 'file_num enq_cnt p_no efp journal_file'
+ print '-------- ------- ---- ----- ------------'
+ def report(self):
+ comment = '<uninitialized>' if self.file_header.file_num == 0 else ''
+ file_num_str = '0x%x' % self.file_header.file_num
+ print '%8s %7d %4d %4dk %s %s' % (file_num_str, self.get_enq_cnt(), self.file_header.partition_num,
+ self.file_header.efp_data_size_kb,
+ os.path.basename(self.file_header.file_handle.name), comment)
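
Note: a minimal driver sketch for qls.anal, standing in for what
qpid_qls_analyze.py presumably does. The attribute names on the args object are
exactly those the module reads (args.txn, args.show_recs, args.show_all_recs,
args.show_xids, args.show_data); the store path is an assumption:

import argparse
import qls.anal

args = argparse.Namespace(txn=False, show_recs=True, show_all_recs=False,
                          show_xids=True, show_data=False)
# JournalRecoveryManager recovers <qls>/tpl first (building the prepared
# transaction list, a map of xid -> None/True/False), then each
# <qls>/jrnl/<queue> journal in turn.
manager = qls.anal.JournalRecoveryManager('/path/to/qls', args)
manager.run()         # replay all records into the enqueue and transaction maps
manager.report(True)  # True = include per-journal statistics in the report
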
diff --git a/qpid/tools/src/py/qls/efp.py b/qpid/tools/src/py/qls/efp.py
index abf289dc12..93b77eea93 100644
--- a/qpid/tools/src/py/qls/efp.py
+++ b/qpid/tools/src/py/qls/efp.py
@@ -17,38 +17,137 @@
# under the License.
#
+"""
+Module: qls.efp
+
+Contains empty file pool (EFP) classes.
+"""
+
import os
import os.path
import qls.err
+import qls.jrnl
+import qls.utils
+import shutil
+import uuid
class EfpManager(object):
"""
Top level class to analyze the Qpid Linear Store (QLS) directory for the partitions that make up the
Empty File Pool (EFP).
"""
- def __init__(self, directory, args):
+ def __init__(self, directory, disk_space_required_kb):
if not os.path.exists(directory):
raise qls.err.InvalidQlsDirectoryNameError(directory)
self.directory = directory
- self.args = args
- self.partitions = []
+ self.disk_space_required_kb = disk_space_required_kb
+ self.efp_partitions = []
+ self.efp_pools = {}
+ self.total_num_files = 0
+ self.total_cum_file_size_kb = 0
+ self.current_efp_partition = None
+ def add_file_pool(self, file_size_kb, num_files):
+ """ Add an EFP in the specified partition of the specified size containing the specified number of files """
+ dir_name = EmptyFilePool.get_directory_name(file_size_kb)
+ print 'Adding pool \'%s\' to partition %s' % (dir_name, self.current_efp_partition.partition_number)
+ self.total_cum_file_size_kb += self.current_efp_partition.create_new_efp_files(file_size_kb, num_files)
+ self.total_num_files += num_files
+ def freshen_file_pool(self, file_size_kb, num_files):
+ """ Freshen an EFP in the specified partition and of the specified size to the specified number of files """
+ if self.current_efp_partition is None:
+ partition_list = self.efp_partitions
+ partition_str = 'all partitions'
+ else:
+ partition_list = [self.current_efp_partition]
+ partition_str = 'partition %d' % self.current_efp_partition.partition_number
+ if file_size_kb is None:
+ pool_str = 'all pools'
+ else:
+ pool_str = 'pool \'%s\'' % EmptyFilePool.get_directory_name(int(file_size_kb))
+ print 'Freshening %s in %s to %d files' % (pool_str, partition_str, num_files)
+ for self.current_efp_partition in partition_list: # Partition objects
+ if file_size_kb is None:
+ file_size_list = self.current_efp_partition.efp_pools.keys()
+ else:
+ file_size_list = ['%sk' % file_size_kb]
+ for file_size in file_size_list:
+ efp = self.current_efp_partition.efp_pools[file_size]
+ num_files_needed = num_files - efp.get_tot_file_count()
+ if num_files_needed > 0:
+ self.current_efp_partition.create_new_efp_files(qls.utils.efp_directory_size(file_size),
+ num_files_needed)
+ else:
+ print ' WARNING: Pool %s in partition %s already contains %d files: no action taken' % \
+ (self.current_efp_partition.efp_pools[file_size].base_dir_name,
+ self.current_efp_partition.partition_number, efp.get_tot_file_count())
+ def remove_file_pool(self, file_size_kb):
+ """ Remove an existing EFP from the specified partition and of the specified size """
+ dir_name = EmptyFilePool.get_directory_name(file_size_kb)
+ print 'Removing pool \'%s\' from partition %s' % (dir_name, self.current_efp_partition.partition_number)
+ self.efp_partitions.remove(self.current_efp_partition)
+ shutil.rmtree(os.path.join(self.current_efp_partition.efp_directory, dir_name))
def report(self):
- print 'Found', len(self.partitions), 'partition(s).'
- if (len(self.partitions)) > 0:
+ print 'Empty File Pool (EFP) report:'
+ print '============================='
+ print 'Found', len(self.efp_partitions), 'partition(s).'
+ if (len(self.efp_partitions)) > 0:
EfpPartition.print_report_table_header()
- for ptn in self.partitions:
+ for ptn in self.efp_partitions:
ptn.print_report_table_line()
print
- for ptn in self.partitions:
+ for ptn in self.efp_partitions:
ptn.report()
- def run(self, _):
+ def run(self, arg_tup):
+ self._analyze_efp()
+ if arg_tup is not None:
+ _, arg_file_size, arg_num_files, arg_add, arg_remove, arg_freshen, arg_list = arg_tup
+ self._check_args(arg_tup)
+ if arg_add:
+ self.add_file_pool(int(arg_file_size), int(arg_num_files))
+ if arg_remove:
+ self.remove_file_pool(int(arg_file_size))
+ if arg_freshen:
+ self.freshen_file_pool(arg_file_size, int(arg_num_files))
+ if arg_list:
+ self.report()
+ def _analyze_efp(self):
for dir_entry in os.listdir(self.directory):
try:
- efpp = EfpPartition(os.path.join(self.directory, dir_entry))
- efpp.scan()
- self.partitions.append(efpp)
+ efp_partition = EfpPartition(os.path.join(self.directory, dir_entry), self.disk_space_required_kb)
+ efp_partition.scan()
+ self.efp_partitions.append(efp_partition)
+ for efpl in efp_partition.efp_pools.iterkeys():
+ if efpl not in self.efp_pools:
+ self.efp_pools[efpl] = []
+ self.efp_pools[efpl].append(efp_partition.efp_pools[efpl])
+ self.total_num_files += efp_partition.tot_file_count
+ self.total_cum_file_size_kb += efp_partition.tot_file_size_kb
except qls.err.InvalidPartitionDirectoryNameError:
pass
+ def _check_args(self, arg_tup):
+ """ Value check of args. The names of partitions and pools are validated against the discovered instances """
+ arg_partition, arg_file_size, _, arg_add, arg_remove, arg_freshen, _ = arg_tup
+ if arg_partition is not None:
+ try:
+ if arg_partition[0] == 'p': # string partition name, eg 'p001'
+ partition_num = int(arg_partition[1:])
+ else: # numeric partition, eg '1'
+ partition_num = int(arg_partition)
+ found = False
+ for partition in self.efp_partitions:
+ if partition.partition_number == partition_num:
+ self.current_efp_partition = partition
+ found = True
+ break
+ if not found:
+ raise qls.err.PartitionDoesNotExistError(arg_partition)
+ except ValueError:
+ raise qls.err.InvalidPartitionDirectoryNameError(arg_partition)
+ if self.current_efp_partition is not None:
+ pool_list = self.current_efp_partition.efp_pools.keys()
+ efp_directory_name = EmptyFilePool.get_directory_name(int(arg_file_size))
+ if arg_add and efp_directory_name in pool_list:
+ raise qls.err.PoolDirectoryAlreadyExistsError(efp_directory_name)
+ if (arg_remove or arg_freshen) and efp_directory_name not in pool_list:
+ raise qls.err.PoolDirectoryDoesNotExistError(efp_directory_name)
class EfpPartition(object):
"""
@@ -56,55 +155,59 @@ class EfpPartition(object):
"""
PTN_DIR_PREFIX = 'p'
EFP_DIR_NAME = 'efp'
- def __init__(self, directory):
- self.base_dir = os.path.basename(directory)
- if self.base_dir[0] is not EfpPartition.PTN_DIR_PREFIX:
- raise qls.err.InvalidPartitionDirectoryNameError(directory)
- try:
- self.partition_number = int(self.base_dir[1:])
- except ValueError:
- raise qls.err.InvalidPartitionDirectoryNameError(directory)
+ def __init__(self, directory, disk_space_required_kb):
self.directory = directory
- self.pools = []
- self.efp_count = 0
+ self.partition_number = None
+ self.efp_pools = {}
self.tot_file_count = 0
self.tot_file_size_kb = 0
- def get_directory(self):
- return self.directory
- def get_efp_count(self):
- return self.efp_count
- def get_name(self):
- return self.base_dir
- def get_number(self):
- return self.partition_number
- def get_number_pools(self):
- return len(self.pools)
- def get_tot_file_count(self):
- return self.tot_file_count
- def get_tot_file_size_kb(self):
- return self.tot_file_size_kb
+ self._validate_partition_directory(disk_space_required_kb)
+ def create_new_efp_files(self, file_size_kb, num_files):
+ """ Create new EFP files in this partition """
+ dir_name = EmptyFilePool.get_directory_name(file_size_kb)
+ if dir_name in self.efp_pools.keys():
+ efp = self.efp_pools[dir_name]
+ else:
+ efp = EmptyFilePool(os.path.join(self.directory, EfpPartition.EFP_DIR_NAME, dir_name), self.partition_number)
+ this_tot_file_size_kb = efp.create_new_efp_files(num_files)
+ self.tot_file_size_kb += this_tot_file_size_kb
+ self.tot_file_count += num_files
+ return this_tot_file_size_kb
@staticmethod
def print_report_table_header():
print 'p_no no_efp tot_files tot_size_kb directory'
print '---- ------ --------- ----------- ---------'
def print_report_table_line(self):
- print '%4d %6d %9d %11d %s' % (self.get_number(), self.get_efp_count(), self.get_tot_file_count(),
- self.get_tot_file_size_kb(), self.get_directory())
+ print '%4d %6d %9d %11d %s' % (self.partition_number, len(self.efp_pools), self.tot_file_count,
+ self.tot_file_size_kb, self.directory)
def report(self):
- print 'Partition %s:' % self.base_dir
+ print 'Partition %s:' % os.path.basename(self.directory)
EmptyFilePool.print_report_table_header()
- for pool in self.pools:
- pool.print_report_table_line()
+ for dir_name in self.efp_pools.keys():
+ self.efp_pools[dir_name].print_report_table_line()
print
def scan(self):
if os.path.exists(self.directory):
efp_dir = os.path.join(self.directory, EfpPartition.EFP_DIR_NAME)
for dir_entry in os.listdir(efp_dir):
- efp = EmptyFilePool(os.path.join(efp_dir, dir_entry))
- self.efp_count += 1
+ efp = EmptyFilePool(os.path.join(efp_dir, dir_entry), self.partition_number)
+ efp.scan()
self.tot_file_count += efp.get_tot_file_count()
self.tot_file_size_kb += efp.get_tot_file_size_kb()
- self.pools.append(efp)
+ self.efp_pools[dir_entry] = efp
+ def _validate_partition_directory(self, disk_space_required_kb):
+ if os.path.basename(self.directory)[0] is not EfpPartition.PTN_DIR_PREFIX:
+ raise qls.err.InvalidPartitionDirectoryNameError(self.directory)
+ try:
+ self.partition_number = int(os.path.basename(self.directory)[1:])
+ except ValueError:
+ raise qls.err.InvalidPartitionDirectoryNameError(self.directory)
+ if not qls.utils.has_write_permission(self.directory):
+ raise qls.err.WritePermissionError(self.directory)
+ if disk_space_required_kb is not None:
+ space_avail = qls.utils.get_avail_disk_space(self.directory)
+ if space_avail < (disk_space_required_kb * 1024):
+ raise qls.err.InsufficientSpaceOnDiskError(self.directory, space_avail, disk_space_required_kb * 1024)
class EmptyFilePool(object):
"""
@@ -112,33 +215,89 @@ class EmptyFilePool(object):
journal files (but it may also be empty).
"""
EFP_DIR_SUFFIX = 'k'
- def __init__(self, directory):
- self.base_dir = os.path.basename(directory)
- if self.base_dir[-1] is not EmptyFilePool.EFP_DIR_SUFFIX:
- raise qls.err.InvalidEfpDirectoryNameError(directory)
- try:
- self.data_size_kb = int(os.path.basename(self.base_dir)[:-1])
- except ValueError:
- raise qls.err.InvalidEfpDirectoryNameError(directory)
+ EFP_JRNL_EXTENTION = '.jrnl'
+ def __init__(self, directory, partition_number):
+ self.base_dir_name = os.path.basename(directory)
self.directory = directory
- self.files = os.listdir(directory)
+ self.partition_number = partition_number
+ self.data_size_kb = None
+ self.files = []
+ self.tot_file_size_kb = 0
+ self._validate_efp_directory()
+ def create_new_efp_files(self, num_files):
+ """ Create one or more new empty journal files of the prescribed size for this EFP """
+ this_total_file_size = 0
+ for _ in range(num_files):
+ this_total_file_size += self._create_new_efp_file()
+ return this_total_file_size
def get_directory(self):
return self.directory
- def get_file_data_size_kb(self):
- return self.data_size_kb
- def get_name(self):
- return self.base_dir
+ @staticmethod
+ def get_directory_name(file_size_kb):
+ """ Static function to create an EFP directory name from the size of the files it contains """
+ return '%dk' % file_size_kb
def get_tot_file_count(self):
return len(self.files)
def get_tot_file_size_kb(self):
return self.data_size_kb * len(self.files)
@staticmethod
def print_report_table_header():
- print 'file_size_kb file_count tot_file_size_kb efp_directory'
+ print 'data_size_kb file_count tot_file_size_kb efp_directory'
print '------------ ---------- ---------------- -------------'
def print_report_table_line(self):
- print '%12d %10d %16d %s' % (self.get_file_data_size_kb(), self.get_tot_file_count(),
+ print '%12d %10d %16d %s' % (self.data_size_kb, self.get_tot_file_count(),
self.get_tot_file_size_kb(), self.get_directory())
+ def scan(self):
+ for efp_file in os.listdir(self.directory):
+ if self._validate_efp_file(os.path.join(self.directory, efp_file)):
+ self.files.append(efp_file)
+ def _add_efp_file(self, efp_file_name):
+ """ Add a single journal file of the appropriate size to this EFP. No file size check is made here. """
+ self.files.append(efp_file_name)
+ self.tot_file_size_kb += os.path.getsize(efp_file_name)
+ def _create_new_efp_file(self):
+ """ Create a single new empty journal file of the prescribed size for this EFP """
+ file_name = str(uuid.uuid4()) + EmptyFilePool.EFP_JRNL_EXTENTION
+ file_header = qls.jrnl.FileHeader(0, qls.jrnl.FileHeader.MAGIC, qls.utils.DEFAULT_RECORD_VERSION, 0, 0, 0)
+ file_header.init(None, None, qls.utils.DEFAULT_HEADER_SIZE_SBLKS, self.partition_number, self.data_size_kb,
+ 0, 0, 0, 0, 0)
+ efh = file_header.encode()
+ efh_bytes = len(efh)
+ file_handle = open(os.path.join(self.directory, file_name), 'wb')
+ file_handle.write(efh)
+ file_handle.write('\xff' * (qls.utils.DEFAULT_SBLK_SIZE - efh_bytes))
+ file_handle.write('\x00' * (int(self.data_size_kb) * 1024))
+ file_handle.close()
+ fqfn = os.path.join(self.directory, file_name)
+ self._add_efp_file(fqfn)
+ return os.path.getsize(fqfn)
+ def _validate_efp_directory(self):
+ if self.base_dir_name[-1] != EmptyFilePool.EFP_DIR_SUFFIX:
+ raise qls.err.InvalidEfpDirectoryNameError(self.directory)
+ try:
+ self.data_size_kb = int(self.base_dir_name[:-1])
+ except ValueError:
+ raise qls.err.InvalidEfpDirectoryNameError(self.directory)
+ def _validate_efp_file(self, efp_file):
+ file_size = os.path.getsize(efp_file)
+ expected_file_size = (self.data_size_kb * 1024) + qls.utils.DEFAULT_SBLK_SIZE
+ if file_size != expected_file_size:
+ print 'WARNING: File %s not of correct size (size=%d, expected=%d): Ignoring' % (efp_file, file_size,
+ expected_file_size)
+ return False
+ file_handle = open(efp_file, 'rb')
+ args = qls.utils.load_args(file_handle, qls.jrnl.RecordHeader)
+ file_hdr = qls.jrnl.FileHeader(*args)
+ file_hdr.init(file_handle, *qls.utils.load_args(file_handle, qls.jrnl.FileHeader))
+ if not file_hdr.is_header_valid(file_hdr):
+ file_handle.close()
+ return False
+ file_hdr.load(file_handle)
+ file_handle.close()
+ if not file_hdr.is_valid():
+ return False
+ return True
+
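+# A minimal usage sketch, assuming an EFP directory holding 2048k files under
+# partition 1 (the path below is illustrative only):
+#   efp = EmptyFilePool('/var/lib/qpidd/qls/p001/efp/2048k', 1)
+#   efp.scan()
+#   EmptyFilePool.print_report_table_header()
+#   efp.print_report_table_line()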
# =============================================================================
diff --git a/qpid/tools/src/py/qls/err.py b/qpid/tools/src/py/qls/err.py
index 702fbb9520..bceaf041c9 100644
--- a/qpid/tools/src/py/qls/err.py
+++ b/qpid/tools/src/py/qls/err.py
@@ -17,12 +17,20 @@
# under the License.
#
+"""
+Module: qls.err
+
+Contains error classes.
+"""
+
# --- Parent classes
class QlsError(Exception):
"""Base error class for QLS errors and exceptions"""
def __init__(self):
Exception.__init__(self)
+ def __str__(self):
+ return ''
class QlsRecordError(QlsError):
"""Base error class for individual records"""
@@ -92,10 +100,22 @@ class FirstRecordOffsetMismatchError(QlsRecordError):
def __str__(self):
return 'First record offset mismatch: ' + QlsRecordError.__str__(self) + ' expected_offset=0x%x' % \
self.file_header.first_record_offset
-
+
+class InsufficientSpaceOnDiskError(QlsError):
+ """Insufficient space on disk"""
+ def __init__(self, directory, space_avail, space_required):
+ QlsError.__init__(self)
+ self.directory = directory
+ self.space_avail = space_avail
+ self.space_required = space_required
+ def __str__(self):
+ return 'Insufficient space on disk: directory=%s; avail_space=%d required_space=%d' % \
+ (self.directory, self.space_avail, self.space_required)
+
class InvalidClassError(QlsError):
"""Invalid class name or type"""
def __init__(self, class_name):
+ QlsError.__init__(self)
self.class_name = class_name
def __str__(self):
return 'Invalid class name "%s"' % self.class_name
@@ -108,6 +128,14 @@ class InvalidEfpDirectoryNameError(QlsError):
def __str__(self):
return 'Invalid EFP directory name "%s"' % self.directory_name
+#class InvalidFileSizeString(QlsError):
+# """Invalid file size string"""
+# def __init__(self, file_size_string):
+# QlsError.__init__(self)
+# self.file_size_string = file_size_string
+# def __str__(self):
+# return 'Invalid file size string "%s"' % self.file_size_string
+
class InvalidPartitionDirectoryNameError(QlsError):
"""Invalid EFP partition name - should be pNNN, where NNN is a 3-digit partition number"""
def __init__(self, directory_name):
@@ -158,6 +186,30 @@ class NonTransactionalRecordError(QlsRecordError):
return 'Transactional operation on non-transactional record: ' + QlsRecordError.__str__(self) + \
' operation=%s' % self.operation
+class PartitionDoesNotExistError(QlsError):
+ """Partition name does not exist on disk"""
+ def __init__(self, partition_directory):
+ QlsError.__init__(self)
+ self.partition_directory = partition_directory
+ def __str__(self):
+ return 'Partition %s does not exist' % self.partition_directory
+
+class PoolDirectoryAlreadyExistsError(QlsError):
+ """Pool directory already exists"""
+ def __init__(self, pool_directory):
+ QlsError.__init__(self)
+ self.pool_directory = pool_directory
+ def __str__(self):
+ return 'Pool directory %s already exists' % self.pool_directory
+
+class PoolDirectoryDoesNotExistError(QlsError):
+ """Pool directory does not exist"""
+ def __init__(self, pool_directory):
+ QlsError.__init__(self)
+ self.pool_directory = pool_directory
+ def __str__(self):
+ return 'Pool directory %s does not exist' % self.pool_directory
+
class RecordIdNotFoundError(QlsRecordError):
"""Record Id not found in enqueue map"""
def __init__(self, file_header, record):
@@ -184,6 +236,14 @@ class UnexpectedEndOfFileError(QlsError):
return 'Tried to read %d at offset %d in file "%s"; only read %d' % \
(self.size_expected, self.file_offset, self.file_name, self.size_read)
+class WritePermissionError(QlsError):
+ """No write permission"""
+ def __init__(self, directory):
+ QlsError.__init__(self)
+ self.directory = directory
+ def __str__(self):
+ return 'No write permission in directory %s' % self.directory
+
class XidSizeError(QlsError):
"""Error class for Xid size mismatch"""
def __init__(self, expected_size, actual_size, xid_str):
diff --git a/qpid/tools/src/py/qls/jrnl.py b/qpid/tools/src/py/qls/jrnl.py
index 5bce78bfad..f4fb16ef9f 100644
--- a/qpid/tools/src/py/qls/jrnl.py
+++ b/qpid/tools/src/py/qls/jrnl.py
@@ -17,567 +17,17 @@
# under the License.
#
-import os
-import os.path
+"""
+Module: qls.jrnl
+
+Contains journal record classes.
+"""
+
import qls.err
+import qls.utils
import string
import struct
-from time import gmtime, strftime
-import zlib
-
-class HighCounter(object):
- def __init__(self):
- self.num = 0
- def check(self, num):
- if self.num < num:
- self.num = num
- def get(self):
- return self.num
- def get_next(self):
- self.num += 1
- return self.num
-
-class JournalRecoveryManager(object):
- TPL_DIR_NAME = 'tpl'
- JRNL_DIR_NAME = 'jrnl'
- def __init__(self, directory, args):
- if not os.path.exists(directory):
- raise qls.err.InvalidQlsDirectoryNameError(directory)
- self.directory = directory
- self.args = args
- self.tpl = None
- self.journals = {}
- self.high_rid_counter = HighCounter()
- def report(self, print_stats_flag):
- if self.tpl is not None:
- self.tpl.report(print_stats_flag)
- for queue_name in sorted(self.journals.keys()):
- self.journals[queue_name].report(print_stats_flag)
- def run(self, args):
- tpl_dir = os.path.join(self.directory, JournalRecoveryManager.TPL_DIR_NAME)
- if os.path.exists(tpl_dir):
- self.tpl = Journal(tpl_dir, None, self.args)
- self.tpl.recover(self.high_rid_counter)
- print
- jrnl_dir = os.path.join(self.directory, JournalRecoveryManager.JRNL_DIR_NAME)
- prepared_list = self.tpl.txn_map.get_prepared_list() if self.tpl is not None else {}
- if os.path.exists(jrnl_dir):
- for dir_entry in sorted(os.listdir(jrnl_dir)):
- jrnl = Journal(os.path.join(jrnl_dir, dir_entry), prepared_list, self.args)
- jrnl.recover(self.high_rid_counter)
- self.journals[jrnl.get_queue_name()] = jrnl
- print
- self._reconcile_transactions(prepared_list, args.txn)
- def _reconcile_transactions(self, prepared_list, txn_flag):
- print 'Transaction reconciliation report:'
- print '=================================='
- print len(prepared_list), 'open transaction(s) found in prepared transaction list:'
- for xid in prepared_list.keys():
- commit_flag = prepared_list[xid]
- if commit_flag is None:
- status = '[Prepared, neither committed nor aborted - assuming commit]'
- elif commit_flag:
- status = '[Prepared, but interrupted during commit phase]'
- else:
- status = '[Prepared, but interrupted during abort phase]'
- print ' ', Utils.format_xid(xid), status
- if prepared_list[xid] is None: # Prepared, but not committed or aborted
- enqueue_record = self.tpl.get_txn_map_record(xid)[0][1]
- dequeue_record = Utils.create_record('QLSd', DequeueRecord.TXN_COMPLETE_COMMIT_FLAG, \
- self.tpl.current_journal_file, self.high_rid_counter.get_next(), \
- enqueue_record.record_id, xid, None)
- if txn_flag:
- self.tpl.add_record(dequeue_record)
- for queue_name in sorted(self.journals.keys()):
- self.journals[queue_name].reconcile_transactions(prepared_list, txn_flag)
- if len(prepared_list) > 0:
- print 'Completing prepared transactions in prepared transaction list:'
- for xid in prepared_list.keys():
- print ' ', Utils.format_xid(xid)
- transaction_record = Utils.create_record('QLSc', 0, self.tpl.current_journal_file, \
- self.high_rid_counter.get_next(), None, xid, None)
- if txn_flag:
- self.tpl.add_record(transaction_record)
- print
-
-class EnqueueMap(object):
- """
- Map of enqueued records in a QLS journal
- """
- def __init__(self, journal):
- self.journal = journal
- self.enq_map = {}
- def add(self, journal_file, enq_record, locked_flag):
- if enq_record.record_id in self.enq_map:
- raise qls.err.DuplicateRecordIdError(self.journal.current_file_header, enq_record)
- self.enq_map[enq_record.record_id] = [journal_file, enq_record, locked_flag]
- def contains(self, rid):
- """Return True if the map contains the given rid"""
- return rid in self.enq_map
- def delete(self, journal_file, deq_record):
- if deq_record.dequeue_record_id in self.enq_map:
- enq_list = self.enq_map[deq_record.dequeue_record_id]
- del self.enq_map[deq_record.dequeue_record_id]
- return enq_list
- else:
- raise qls.err.RecordIdNotFoundError(journal_file.file_header, deq_record)
- def get(self, record_id):
- if record_id in self.enq_map:
- return self.enq_map[record_id]
- return None
- def lock(self, journal_file, dequeue_record):
- if dequeue_record.dequeue_record_id not in self.enq_map:
- raise qls.err.RecordIdNotFoundError(journal_file.file_header, dequeue_record)
- self.enq_map[dequeue_record.dequeue_record_id][2] = True
- def report_str(self, _, show_records):
- """Return a string containing a text report for all records in the map"""
- if len(self.enq_map) == 0:
- return 'No enqueued records found.'
- rstr = '%d enqueued records found' % len(self.enq_map)
- if show_records:
- rstr += ":"
- rid_list = self.enq_map.keys()
- rid_list.sort()
- for rid in rid_list:
- journal_file, record, locked_flag = self.enq_map[rid]
- if locked_flag:
- lock_str = '[LOCKED]'
- else:
- lock_str = ''
- rstr += '\n 0x%x:%s %s' % (journal_file.file_header.file_num, record, lock_str)
- else:
- rstr += '.'
- return rstr
- def unlock(self, journal_file, dequeue_record):
- """Set the transaction lock for a given record_id to False"""
- if dequeue_record.dequeue_record_id in self.enq_map:
- if self.enq_map[dequeue_record.dequeue_record_id][2]:
- self.enq_map[dequeue_record.dequeue_record_id][2] = False
- else:
- raise qls.err.RecordNotLockedError(journal_file.file_header, dequeue_record)
- else:
- raise qls.err.RecordIdNotFoundError(journal_file.file_header, dequeue_record)
-
-class TransactionMap(object):
- """
- Map of open transactions used while recovering a QLS journal
- """
- def __init__(self, enq_map):
- self.txn_map = {}
- self.enq_map = enq_map
- def abort(self, xid):
- """Perform an abort operation for the given xid record"""
- for journal_file, record, _ in self.txn_map[xid]:
- if isinstance(record, DequeueRecord):
- if self.enq_map.contains(record.dequeue_record_id):
- self.enq_map.unlock(journal_file, record)
- else:
- journal_file.decr_enq_cnt(record)
- del self.txn_map[xid]
- def add(self, journal_file, record):
- if record.xid is None:
- raise qls.err.NonTransactionalRecordError(journal_file.file_header, record, 'TransactionMap.add()')
- if isinstance(record, DequeueRecord):
- try:
- self.enq_map.lock(journal_file, record)
- except qls.err.RecordIdNotFoundError:
- # Not in emap, look for rid in tmap - should not happen in practice
- txn_op = self._find_record_id(record.xid, record.dequeue_record_id)
- if txn_op != None:
- if txn_op[2]:
- raise qls.err.AlreadyLockedError(journal_file.file_header, record)
- txn_op[2] = True
- if record.xid in self.txn_map:
- self.txn_map[record.xid].append([journal_file, record, False]) # append to existing list
- else:
- self.txn_map[record.xid] = [[journal_file, record, False]] # create new list
- def commit(self, xid):
- """Perform a commit operation for the given xid record"""
- mismatch_list = []
- for journal_file, record, lock in self.txn_map[xid]:
- if isinstance(record, EnqueueRecord):
- self.enq_map.add(journal_file, record, lock) # Transfer enq to emap
- else:
- if self.enq_map.contains(record.dequeue_record_id):
- self.enq_map.unlock(journal_file, record)
- self.enq_map.delete(journal_file, record)[0].decr_enq_cnt(record)
- else:
- mismatch_list.append('0x%x' % record.dequeue_record_id)
- del self.txn_map[xid]
- return mismatch_list
- def contains(self, xid):
- """Return True if the xid exists in the map; False otherwise"""
- return xid in self.txn_map
- def delete(self, journal_file, transaction_record):
- """Remove a transaction record from the map using either a commit or abort header"""
- if transaction_record.magic[-1] == 'c':
- return self.commit(transaction_record.xid)
- if transaction_record.magic[-1] == 'a':
- self.abort(transaction_record.xid)
- else:
- raise qls.err.InvalidRecordTypeError(journal_file.file_header, transaction_record,
- 'delete from Transaction Map')
- def get(self, xid):
- if xid in self.txn_map:
- return self.txn_map[xid]
- return None
- def get_prepared_list(self):
- """
- Prepared list is a map of xid(key) to one of None, True or False. These represent respectively:
- None: prepared, but neither committed or aborted (interrupted before commit or abort)
- False: prepared and aborted (interrupted before abort complete)
- True: prepared and committed (interrupted before commit complete)
- """
- prepared_list = {}
- for xid in self.get_xid_list():
- for _, record, _ in self.txn_map[xid]:
- if isinstance(record, EnqueueRecord):
- prepared_list[xid] = None
- else:
- prepared_list[xid] = record.is_transaction_complete_commit()
- return prepared_list
- def get_xid_list(self):
- return self.txn_map.keys()
- def report_str(self, _, show_records):
- """Return a string containing a text report for all records in the map"""
- if len(self.txn_map) == 0:
- return 'No outstanding transactions found.'
- rstr = '%d outstanding transaction(s)' % len(self.txn_map)
- if show_records:
- rstr += ':'
- for xid, op_list in self.txn_map.iteritems():
- rstr += '\n %s containing %d operations:' % (Utils.format_xid(xid), len(op_list))
- for journal_file, record, _ in op_list:
- rstr += '\n 0x%x:%s' % (journal_file.file_header.file_num, record)
- else:
- rstr += '.'
- return rstr
- def _find_record_id(self, xid, record_id):
- """ Search for and return map list with supplied rid."""
- if xid in self.txn_map:
- for txn_op in self.txn_map[xid]:
- if txn_op[1].record_id == record_id:
- return txn_op
- for this_xid in self.txn_map.iterkeys():
- for txn_op in self.txn_map[this_xid]:
- if txn_op[1].record_id == record_id:
- return txn_op
- return None
-
-class JournalStatistics(object):
- """Journal statistics"""
- def __init__(self):
- self.total_record_count = 0
- self.transient_record_count = 0
- self.filler_record_count = 0
- self.enqueue_count = 0
- self.dequeue_count = 0
- self.transaction_record_count = 0
- self.transaction_enqueue_count = 0
- self.transaction_dequeue_count = 0
- self.transaction_commit_count = 0
- self.transaction_abort_count = 0
- self.transaction_operation_count = 0
- def __str__(self):
- fstr = 'Total record count: %d\n' + \
- 'Transient record count: %d\n' + \
- 'Filler_record_count: %d\n' + \
- 'Enqueue_count: %d\n' + \
- 'Dequeue_count: %d\n' + \
- 'Transaction_record_count: %d\n' + \
- 'Transaction_enqueue_count: %d\n' + \
- 'Transaction_dequeue_count: %d\n' + \
- 'Transaction_commit_count: %d\n' + \
- 'Transaction_abort_count: %d\n' + \
- 'Transaction_operation_count: %d\n'
- return fstr % (self.total_record_count,
- self.transient_record_count,
- self.filler_record_count,
- self.enqueue_count,
- self.dequeue_count,
- self.transaction_record_count,
- self.transaction_enqueue_count,
- self.transaction_dequeue_count,
- self.transaction_commit_count,
- self.transaction_abort_count,
- self.transaction_operation_count)
-
-class Journal(object):
- """
- Instance of a Qpid Linear Store (QLS) journal.
- """
- def __init__(self, directory, xid_prepared_list, args):
- self.directory = directory
- self.queue_name = os.path.basename(directory)
- self.files = {}
- self.file_num_list = None
- self.file_num_itr = None
- self.enq_map = EnqueueMap(self)
- self.txn_map = TransactionMap(self.enq_map)
- self.current_journal_file = None
- self.first_rec_flag = None
- self.statistics = JournalStatistics()
- self.xid_prepared_list = xid_prepared_list # This is None for the TPL instance only
- self.args = args
- self.last_record_offset = None # TODO: Move into JournalFile
- self.num_filler_records_required = None # TODO: Move into JournalFile
- self.fill_to_offset = None
- def add_record(self, record):
- if isinstance(record, EnqueueRecord) or isinstance(record, DequeueRecord):
- if record.xid_size > 0:
- self.txn_map.add(self.current_journal_file, record)
- else:
- self.enq_map.add(self.current_journal_file, record, False)
- elif isinstance(record, TransactionRecord):
- self.txn_map.delete(self.current_journal_file, record)
- else:
- raise qls.err.InvalidRecordTypeError(self.current_journal_file, record, 'add to Journal')
- def get_enq_map_record(self, rid):
- return self.enq_map.get(rid)
- def get_txn_map_record(self, xid):
- return self.txn_map.get(xid)
- def get_outstanding_txn_list(self):
- return self.txn_map.get_xid_list()
- def get_queue_name(self):
- return self.queue_name
- def recover(self, high_rid_counter):
- print 'Recovering %s' % self.queue_name
- self._analyze_files()
- try:
- while self._get_next_record(high_rid_counter):
- pass
- self._check_alignment()
- except qls.err.NoMoreFilesInJournalError:
- print 'No more files in journal'
- except qls.err.FirstRecordOffsetMismatchError as err:
- print '0x%08x: **** FRO ERROR: queue=\"%s\" fid=0x%x fro actual=0x%08x expected=0x%08x' % \
- (err.get_expected_fro(), err.get_queue_name(), err.get_file_number(), err.get_record_offset(),
- err.get_expected_fro())
- def reconcile_transactions(self, prepared_list, txn_flag):
- xid_list = self.txn_map.get_xid_list()
- if len(xid_list) > 0:
- print self.queue_name, 'contains', len(xid_list), 'open transaction(s):'
- for xid in xid_list:
- if xid in prepared_list.keys():
- commit_flag = prepared_list[xid]
- if commit_flag is None:
- print ' ', Utils.format_xid(xid), '- Assuming commit after prepare'
- if txn_flag:
- self.txn_map.commit(xid)
- elif commit_flag:
- print ' ', Utils.format_xid(xid), '- Completing interrupted commit operation'
- if txn_flag:
- self.txn_map.commit(xid)
- else:
- print ' ', Utils.format_xid(xid), '- Completing interrupted abort operation'
- if txn_flag:
- self.txn_map.abort(xid)
- else:
- print ' ', Utils.format_xid(xid), '- Ignoring, not in prepared transaction list'
- if txn_flag:
- self.txn_map.abort(xid)
- def report(self, print_stats_flag):
- print 'Journal "%s":' % self.queue_name
- print '=' * (11 + len(self.queue_name))
- if print_stats_flag:
- print str(self.statistics)
- print self.enq_map.report_str(True, True)
- print self.txn_map.report_str(True, True)
- JournalFile.report_header()
- for file_num in sorted(self.files.keys()):
- self.files[file_num].report()
- #TODO: move this to JournalFile, append to file info
- if self.num_filler_records_required is not None and self.fill_to_offset is not None:
- print '0x%x:0x%08x: %d filler records required for DBLK alignment to 0x%08x' % \
- (self.current_journal_file.file_header.file_num, self.last_record_offset,
- self.num_filler_records_required, self.fill_to_offset)
- print
- #--- protected functions ---
- def _analyze_files(self):
- for dir_entry in os.listdir(self.directory):
- dir_entry_bits = dir_entry.split('.')
- if len(dir_entry_bits) == 2 and dir_entry_bits[1] == JournalRecoveryManager.JRNL_DIR_NAME:
- fq_file_name = os.path.join(self.directory, dir_entry)
- file_handle = open(fq_file_name)
- args = Utils.load_args(file_handle, RecordHeader)
- file_hdr = FileHeader(*args)
- file_hdr.init(file_handle, *Utils.load_args(file_handle, FileHeader))
- if file_hdr.is_header_valid(file_hdr):
- file_hdr.load(file_handle)
- if file_hdr.is_valid():
- Utils.skip(file_handle, file_hdr.file_header_size_sblks * Utils.SBLK_SIZE)
- self.files[file_hdr.file_num] = JournalFile(file_hdr)
- self.file_num_list = sorted(self.files.keys())
- self.file_num_itr = iter(self.file_num_list)
- def _check_alignment(self): # TODO: Move into JournalFile
- remaining_sblks = self.last_record_offset % Utils.SBLK_SIZE
- if remaining_sblks == 0:
- self.num_filler_records_required = 0
- else:
- self.num_filler_records_required = (Utils.SBLK_SIZE - remaining_sblks) / Utils.DBLK_SIZE
- self.fill_to_offset = self.last_record_offset + (self.num_filler_records_required * Utils.DBLK_SIZE)
- if self.args.show_recs or self.args.show_all_recs:
- print '0x%x:0x%08x: %d filler records required for DBLK alignment to 0x%08x' % \
- (self.current_journal_file.file_header.file_num, self.last_record_offset,
- self.num_filler_records_required, self.fill_to_offset)
- def _check_file(self):
- if self.current_journal_file is not None:
- if not self.current_journal_file.file_header.is_end_of_file():
- return True
- if self.current_journal_file.file_header.is_end_of_file():
- self.last_record_offset = self.current_journal_file.file_header.file_handle.tell()
- if not self._get_next_file():
- return False
- fhdr = self.current_journal_file.file_header
- fhdr.file_handle.seek(fhdr.first_record_offset)
- return True
- def _get_next_file(self):
- if self.current_journal_file is not None:
- file_handle = self.current_journal_file.file_header.file_handle
- if not file_handle.closed: # sanity check, should not be necessary
- file_handle.close()
- file_num = 0
- try:
- while file_num == 0:
- file_num = self.file_num_itr.next()
- except StopIteration:
- pass
- if file_num == 0:
- return False
- self.current_journal_file = self.files[file_num]
- self.first_rec_flag = True
- if self.args.show_recs or self.args.show_all_recs:
- print '0x%x:%s' % (self.current_journal_file.file_header.file_num, self.current_journal_file.file_header)
- return True
- def _get_next_record(self, high_rid_counter):
- if not self._check_file():
- return False
- self.last_record_offset = self.current_journal_file.file_header.file_handle.tell()
- this_record = Utils.load(self.current_journal_file.file_header.file_handle, RecordHeader)
- if not this_record.is_header_valid(self.current_journal_file.file_header):
- return False
- if self.first_rec_flag:
- if this_record.file_offset != self.current_journal_file.file_header.first_record_offset:
- raise qls.err.FirstRecordOffsetMismatchError(self.current_journal_file.file_header, this_record)
- self.first_rec_flag = False
- self.statistics.total_record_count += 1
- start_journal_file = self.current_journal_file
- if isinstance(this_record, EnqueueRecord):
- ok_flag = self._handle_enqueue_record(this_record, start_journal_file)
- high_rid_counter.check(this_record.record_id)
- if self.args.show_recs or self.args.show_all_recs:
- print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record)
- elif isinstance(this_record, DequeueRecord):
- ok_flag = self._handle_dequeue_record(this_record, start_journal_file)
- high_rid_counter.check(this_record.record_id)
- if self.args.show_recs or self.args.show_all_recs:
- print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record)
- elif isinstance(this_record, TransactionRecord):
- ok_flag = self._handle_transaction_record(this_record, start_journal_file)
- high_rid_counter.check(this_record.record_id)
- if self.args.show_recs or self.args.show_all_recs:
- print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record)
- else:
- self.statistics.filler_record_count += 1
- ok_flag = True
- if self.args.show_all_recs:
- print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record)
- Utils.skip(self.current_journal_file.file_header.file_handle, Utils.DBLK_SIZE)
- return ok_flag
- def _handle_enqueue_record(self, enqueue_record, start_journal_file):
- while enqueue_record.load(self.current_journal_file.file_header.file_handle):
- if not self._get_next_file():
- enqueue_record.truncated_flag = True
- return False
- if not enqueue_record.is_valid(start_journal_file):
- return False
- if enqueue_record.is_external() and enqueue_record.data != None:
- raise qls.err.ExternalDataError(self.current_journal_file.file_header, enqueue_record)
- if enqueue_record.is_transient():
- self.statistics.transient_record_count += 1
- return True
- if enqueue_record.xid_size > 0:
- self.txn_map.add(start_journal_file, enqueue_record)
- self.statistics.transaction_operation_count += 1
- self.statistics.transaction_record_count += 1
- self.statistics.transaction_enqueue_count += 1
- else:
- self.enq_map.add(start_journal_file, enqueue_record, False)
- start_journal_file.incr_enq_cnt()
- self.statistics.enqueue_count += 1
- return True
- def _handle_dequeue_record(self, dequeue_record, start_journal_file):
- while dequeue_record.load(self.current_journal_file.file_header.file_handle):
- if not self._get_next_file():
- dequeue_record.truncated_flag = True
- return False
- if not dequeue_record.is_valid(start_journal_file):
- return False
- if dequeue_record.xid_size > 0:
- if self.xid_prepared_list is None: # ie this is the TPL
- dequeue_record.transaction_prepared_list_flag = True
- elif not self.enq_map.contains(dequeue_record.dequeue_record_id):
- dequeue_record.warnings.append('NOT IN EMAP') # Only for non-TPL records
- self.txn_map.add(start_journal_file, dequeue_record)
- self.statistics.transaction_operation_count += 1
- self.statistics.transaction_record_count += 1
- self.statistics.transaction_dequeue_count += 1
- else:
- try:
- self.enq_map.delete(start_journal_file, dequeue_record)[0].decr_enq_cnt(dequeue_record)
- except qls.err.RecordIdNotFoundError:
- dequeue_record.warnings.append('NOT IN EMAP')
- self.statistics.dequeue_count += 1
- return True
- def _handle_transaction_record(self, transaction_record, start_journal_file):
- while transaction_record.load(self.current_journal_file.file_header.file_handle):
- if not self._get_next_file():
- transaction_record.truncated_flag = True
- return False
- if not transaction_record.is_valid(start_journal_file):
- return False
- if transaction_record.magic[-1] == 'a':
- self.statistics.transaction_abort_count += 1
- else:
- self.statistics.transaction_commit_count += 1
- if self.txn_map.contains(transaction_record.xid):
- self.txn_map.delete(self.current_journal_file, transaction_record)
- else:
- transaction_record.warnings.append('NOT IN TMAP')
-# if transaction_record.magic[-1] == 'c': # commits only
-# self._txn_obj_list[hdr.xid] = hdr
- self.statistics.transaction_record_count += 1
- return True
- def _load_data(self, record):
- while not record.is_complete:
- record.load(self.current_journal_file.file_handle)
-
-class JournalFile(object):
- def __init__(self, file_header):
- self.file_header = file_header
- self.enq_cnt = 0
- self.deq_cnt = 0
- self.num_filler_records_required = None
- def incr_enq_cnt(self):
- self.enq_cnt += 1
- def decr_enq_cnt(self, record):
- if self.enq_cnt <= self.deq_cnt:
- raise qls.err.EnqueueCountUnderflowError(self.file_header, record)
- self.deq_cnt += 1
- def get_enq_cnt(self):
- return self.enq_cnt - self.deq_cnt
- def is_outstanding_enq(self):
- return self.enq_cnt > self.deq_cnt
- @staticmethod
- def report_header():
- print 'file_num enq_cnt p_no efp journal_file'
- print '-------- ------- ---- ----- ------------'
- def report(self):
- comment = '<uninitialized>' if self.file_header.file_num == 0 else ''
- file_num_str = '0x%x' % self.file_header.file_num
- print '%8s %7d %4d %4dk %s %s' % (file_num_str, self.get_enq_cnt(), self.file_header.partition_num,
- self.file_header.efp_data_size_kb,
- os.path.basename(self.file_header.file_handle.name), comment)
+import time
class RecordHeader(object):
FORMAT = '<4s2H2Q'
@@ -590,14 +40,14 @@ class RecordHeader(object):
self.record_id = record_id
self.warnings = []
self.truncated_flag = False
- def checksum_encode(self):
+ def encode(self):
return struct.pack(RecordHeader.FORMAT, self.magic, self.version, self.user_flags, self.serial, self.record_id)
def load(self, file_handle):
pass
@staticmethod
def discriminate(args):
"""Use the last char in the header magic to determine the header type"""
- return _CLASSES.get(args[1][-1], RecordHeader)
+ return CLASSES.get(args[1][-1], RecordHeader)
def is_empty(self):
"""Return True if this record is empty (ie has a magic of 0x0000"""
return self.magic == '\x00'*4
@@ -608,17 +58,12 @@ class RecordHeader(object):
if self.magic[:3] != 'QLS' or self.magic[3] not in ['a', 'c', 'd', 'e', 'f', 'x']:
return False
if self.magic[-1] != 'x':
- if self.version != Utils.RECORD_VERSION:
- raise qls.err.InvalidRecordVersionError(file_header, self, Utils.RECORD_VERSION)
+ if self.version != qls.utils.DEFAULT_RECORD_VERSION:
+ raise qls.err.InvalidRecordVersionError(file_header, self, qls.utils.DEFAULT_RECORD_VERSION)
if self.serial != file_header.serial:
return False
return True
- def _get_warnings(self):
- warn_str = ''
- for warn in self.warnings:
- warn_str += '<%s>' % warn
- return warn_str
- def __str__(self):
+ def to_string(self):
"""Return string representation of this header"""
if self.is_empty():
return '0x%08x: <empty>' % (self.file_offset)
@@ -628,6 +73,14 @@ class RecordHeader(object):
return '0x%08x: [%c v=%d f=0x%04x rid=0x%x]' % \
(self.file_offset, self.magic[-1].upper(), self.version, self.user_flags, self.record_id)
return '0x%08x: <error, unknown magic "%s" (possible overwrite boundary?)>' % (self.file_offset, self.magic)
+ def _get_warnings(self):
+ warn_str = ''
+ for warn in self.warnings:
+ warn_str += '<%s>' % warn
+ return warn_str
+ def __str__(self):
+ """Return string representation of this header"""
+ return RecordHeader.to_string(self)
class RecordTail(object):
FORMAT = '<4sL2Q'
@@ -653,24 +106,28 @@ class RecordTail(object):
if self.valid_flag is None:
if not self.complete:
return False
- self.valid_flag = Utils.inv_str(self.xmagic) == record.magic and \
+ self.valid_flag = qls.utils.inv_str(self.xmagic) == record.magic and \
self.serial == record.serial and \
self.record_id == record.record_id and \
- Utils.adler32(record.checksum_encode()) == self.checksum
+ qls.utils.adler32(record.checksum_encode()) == self.checksum
return self.valid_flag
- def __str__(self):
+ def to_string(self):
"""Return a string representation of the this RecordTail instance"""
if self.valid_flag is not None:
if not self.valid_flag:
return '[INVALID RECORD TAIL]'
- magic = Utils.inv_str(self.xmagic)
+ magic = qls.utils.inv_str(self.xmagic)
magic_char = magic[-1].upper() if magic[-1] in string.printable else '?'
return '[%c cs=0x%08x rid=0x%x]' % (magic_char, self.checksum, self.record_id)
+ def __str__(self):
+ """Return a string representation of the this RecordTail instance"""
+ return RecordTail.to_string(self)
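+# A sketch of the tail integrity scheme used by is_valid() above: the tail
+# stores the bitwise inverse of the record's magic and an Adler-32 checksum
+# over the record excluding its tail, so a valid record/tail pair satisfies:
+#   qls.utils.inv_str(tail.xmagic) == record.magic
+#   qls.utils.adler32(record.checksum_encode()) == tail.checksum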
class FileHeader(RecordHeader):
FORMAT = '<2H4x5QH'
- def init(self, file_handle, _, file_header_size_sblks, partition_num, efp_data_size_kb,
- first_record_offset, timestamp_sec, timestamp_ns, file_num, queue_name_len):
+ MAGIC = 'QLSf'
+ def init(self, file_handle, _, file_header_size_sblks, partition_num, efp_data_size_kb, first_record_offset,
+ timestamp_sec, timestamp_ns, file_num, queue_name_len):
self.file_handle = file_handle
self.file_header_size_sblks = file_header_size_sblks
self.partition_num = partition_num
@@ -681,11 +138,21 @@ class FileHeader(RecordHeader):
self.file_num = file_num
self.queue_name_len = queue_name_len
self.queue_name = None
- def load(self, file_handle):
- self.queue_name = file_handle.read(self.queue_name_len)
+ def encode(self):
+ if self.queue_name is None:
+ return RecordHeader.encode(self) + struct.pack(self.FORMAT, self.file_header_size_sblks, \
+ self.partition_num, self.efp_data_size_kb, \
+ self.first_record_offset, self.timestamp_sec, \
+ self.timestamp_ns, self.file_num, 0)
+ return RecordHeader.encode(self) + struct.pack(self.FORMAT, self.file_header_size_sblks, self.partition_num, \
+ self.efp_data_size_kb, self.first_record_offset, \
+ self.timestamp_sec, self.timestamp_ns, self.file_num, \
+ self.queue_name_len) + self.queue_name
def get_file_size(self):
"""Sum of file header size and data size"""
- return (self.file_header_size_sblks * Utils.SBLK_SIZE) + (self.efp_data_size_kb * 1024)
+ return (self.file_header_size_sblks * qls.utils.DEFAULT_SBLK_SIZE) + (self.efp_data_size_kb * 1024)
+ def load(self, file_handle):
+ self.queue_name = file_handle.read(self.queue_name_len)
def is_end_of_file(self):
return self.file_handle.tell() >= self.get_file_size()
def is_valid(self):
@@ -704,18 +171,22 @@ class FileHeader(RecordHeader):
return True
def timestamp_str(self):
"""Get the timestamp of this record in string format"""
- time = gmtime(self.timestamp_sec)
+ now = time.gmtime(self.timestamp_sec)
fstr = '%%a %%b %%d %%H:%%M:%%S.%09d %%Y' % (self.timestamp_ns)
- return strftime(fstr, time)
- def __str__(self):
+ return time.strftime(fstr, now)
+ def to_string(self):
"""Return a string representation of the this FileHeader instance"""
- return '%s fnum=0x%x fro=0x%08x p=%d s=%dk t=%s %s' % (RecordHeader.__str__(self), self.file_num,
+ return '%s fnum=0x%x fro=0x%08x p=%d s=%dk t=%s %s' % (RecordHeader.to_string(self), self.file_num,
self.first_record_offset, self.partition_num,
self.efp_data_size_kb, self.timestamp_str(),
self._get_warnings())
+ def __str__(self):
+ """Return a string representation of the this FileHeader instance"""
+ return FileHeader.to_string(self)
class EnqueueRecord(RecordHeader):
FORMAT = '<2Q'
+ MAGIC = 'QLSe'
EXTERNAL_FLAG_MASK = 0x20
TRANSIENT_FLAG_MASK = 0x10
def init(self, _, xid_size, data_size):
@@ -726,9 +197,13 @@ class EnqueueRecord(RecordHeader):
self.data = None
self.data_complete = False
self.record_tail = None
- def checksum_encode(self):
- return RecordHeader.checksum_encode(self) + struct.pack(self.FORMAT, self.xid_size, self.data_size) + \
- self.xid + self.data
+ def checksum_encode(self): # encode excluding record tail
+ buf = RecordHeader.encode(self) + struct.pack(self.FORMAT, self.xid_size, self.data_size)
+ if self.xid is not None:
+ buf += self.xid
+ if self.data is not None:
+ buf += self.data
+ return buf
def is_external(self):
return self.user_flags & EnqueueRecord.EXTERNAL_FLAG_MASK > 0
def is_transient(self):
@@ -750,13 +225,13 @@ class EnqueueRecord(RecordHeader):
return True
def load(self, file_handle):
"""Return True when load is incomplete and must be called again with new file handle"""
- self.xid, self.xid_complete = Utils.load_data(file_handle, self.xid, self.xid_size)
+ self.xid, self.xid_complete = qls.utils.load_data(file_handle, self.xid, self.xid_size)
if not self.xid_complete:
return True
if self.is_external():
self.data_complete = True
else:
- self.data, self.data_complete = Utils.load_data(file_handle, self.data, self.data_size)
+ self.data, self.data_complete = qls.utils.load_data(file_handle, self.data, self.data_size)
if not self.data_complete:
return True
if self.xid_size > 0 or self.data_size > 0:
@@ -769,6 +244,19 @@ class EnqueueRecord(RecordHeader):
else:
return True
return False
+ def to_string(self, show_xid_flag, show_data_flag):
+ """Return a string representation of the this EnqueueRecord instance"""
+ if self.truncated_flag:
+ return '%s xid(%d) data(%d) [Truncated, no more files in journal]' % (RecordHeader.__str__(self),
+ self.xid_size, self.data_size)
+ if self.record_tail is None:
+ record_tail_str = ''
+ else:
+ record_tail_str = self.record_tail.to_string()
+ return '%s %s %s %s %s %s' % (RecordHeader.to_string(self),
+ qls.utils.format_xid(self.xid, self.xid_size, show_xid_flag),
+ qls.utils.format_data(self.data, self.data_size, show_data_flag),
+ record_tail_str, self._print_flags(), self._get_warnings())
def _print_flags(self):
"""Utility function to decode the flags field in the header and print a string representation"""
fstr = ''
@@ -784,19 +272,11 @@ class EnqueueRecord(RecordHeader):
return fstr
def __str__(self):
"""Return a string representation of the this EnqueueRecord instance"""
- if self.truncated_flag:
- return '%s xid(%d) data(%d) [Truncated, no more files in journal]' % (RecordHeader.__str__(self),
- self.xid_size, self.data_size)
- if self.record_tail is None:
- record_tail_str = ''
- else:
- record_tail_str = str(self.record_tail)
- return '%s %s %s %s %s %s' % (RecordHeader.__str__(self), Utils.format_xid(self.xid, self.xid_size),
- Utils.format_data(self.data_size, self.data), record_tail_str,
- self._print_flags(), self._get_warnings())
+ return EnqueueRecord.to_string(self, False, False)
class DequeueRecord(RecordHeader):
FORMAT = '<2Q'
+ MAGIC = 'QLSd'
TXN_COMPLETE_COMMIT_FLAG = 0x10
def init(self, _, dequeue_record_id, xid_size):
self.dequeue_record_id = dequeue_record_id
@@ -805,8 +285,8 @@ class DequeueRecord(RecordHeader):
self.xid = None
self.xid_complete = False
self.record_tail = None
- def checksum_encode(self):
- return RecordHeader.checksum_encode(self) + struct.pack(self.FORMAT, self.dequeue_record_id, self.xid_size) + \
+ def checksum_encode(self): # encode excluding record tail
+ return RecordHeader.encode(self) + struct.pack(self.FORMAT, self.dequeue_record_id, self.xid_size) + \
self.xid
def is_transaction_complete_commit(self):
return self.user_flags & DequeueRecord.TXN_COMPLETE_COMMIT_FLAG > 0
@@ -825,7 +305,7 @@ class DequeueRecord(RecordHeader):
return True
def load(self, file_handle):
"""Return True when load is incomplete and must be called again with new file handle"""
- self.xid, self.xid_complete = Utils.load_data(file_handle, self.xid, self.xid_size)
+ self.xid, self.xid_complete = qls.utils.load_data(file_handle, self.xid, self.xid_size)
if not self.xid_complete:
return True
if self.xid_size > 0:
@@ -838,6 +318,19 @@ class DequeueRecord(RecordHeader):
else:
return True
return False
+ def to_string(self, show_xid_flag):
+ """Return a string representation of the this DequeueRecord instance"""
+ if self.truncated_flag:
+ return '%s xid(%d) drid=0x%x [Truncated, no more files in journal]' % (RecordHeader.__str__(self),
+ self.xid_size,
+ self.dequeue_record_id)
+ if self.record_tail is None:
+ record_tail_str = ''
+ else:
+ record_tail_str = self.record_tail.to_string()
+ return '%s drid=0x%x %s %s %s %s' % (RecordHeader.to_string(self), self.dequeue_record_id,
+ qls.utils.format_xid(self.xid, self.xid_size, show_xid_flag),
+ record_tail_str, self._print_flags(), self._get_warnings())
def _print_flags(self):
"""Utility function to decode the flags field in the header and print a string representation"""
if self.transaction_prepared_list_flag:
@@ -848,27 +341,19 @@ class DequeueRecord(RecordHeader):
return ''
def __str__(self):
"""Return a string representation of the this DequeueRecord instance"""
- if self.truncated_flag:
- return '%s xid(%d) drid=0x%x [Truncated, no more files in journal]' % (RecordHeader.__str__(self),
- self.xid_size,
- self.dequeue_record_id)
- if self.record_tail is None:
- record_tail_str = ''
- else:
- record_tail_str = str(self.record_tail)
- return '%s %s drid=0x%x %s %s %s' % (RecordHeader.__str__(self), Utils.format_xid(self.xid, self.xid_size),
- self.dequeue_record_id, record_tail_str, self._print_flags(),
- self._get_warnings())
+ return DequeueRecord.to_string(self, False)
class TransactionRecord(RecordHeader):
FORMAT = '<Q'
+ MAGIC_ABORT = 'QLSa'
+ MAGIC_COMMIT = 'QLSc'
def init(self, _, xid_size):
self.xid_size = xid_size
self.xid = None
self.xid_complete = False
self.record_tail = None
- def checksum_encode(self):
- return RecordHeader.checksum_encode(self) + struct.pack(self.FORMAT, self.xid_size) + self.xid
+ def checksum_encode(self): # encode excluding record tail
+ return RecordHeader.encode(self) + struct.pack(self.FORMAT, self.xid_size) + self.xid
def is_valid(self, journal_file):
if not RecordHeader.is_header_valid(self, journal_file.file_header):
return False
@@ -881,7 +366,7 @@ class TransactionRecord(RecordHeader):
return True
def load(self, file_handle):
"""Return True when load is incomplete and must be called again with new file handle"""
- self.xid, self.xid_complete = Utils.load_data(file_handle, self.xid, self.xid_size)
+ self.xid, self.xid_complete = qls.utils.load_data(file_handle, self.xid, self.xid_size)
if not self.xid_complete:
return True
if self.xid_size > 0:
@@ -894,168 +379,24 @@ class TransactionRecord(RecordHeader):
else:
return True
return False
- def __str__(self):
+ def to_string(self, show_xid_flag):
"""Return a string representation of the this TransactionRecord instance"""
if self.truncated_flag:
return '%s xid(%d) [Truncated, no more files in journal]' % (RecordHeader.__str__(self), self.xid_size)
if self.record_tail is None:
record_tail_str = ''
else:
- record_tail_str = str(self.record_tail)
- return '%s %s %s %s' % (RecordHeader.__str__(self), Utils.format_xid(self.xid, self.xid_size), record_tail_str,
- self._get_warnings())
-
-class Utils(object):
- """Class containing utility functions for dealing with the journal"""
- DBLK_SIZE = 128
- RECORD_VERSION = 2
- SBLK_SIZE = 4096
- @staticmethod
- def adler32(data):
- return zlib.adler32(data) & 0xffffffff
- @staticmethod
- def create_record(magic, uflags, journal_file, record_id, dequeue_record_id, xid, data):
- record_class = _CLASSES.get(magic[-1])
- record = record_class(0, magic, Utils.RECORD_VERSION, uflags, journal_file.file_header.serial, record_id)
- xid_length = len(xid) if xid is not None else 0
- if isinstance(record, EnqueueRecord):
- data_length = len(data) if data is not None else 0
- record.init(None, xid_length, data_length)
- elif isinstance(record, DequeueRecord):
- record.init(None, dequeue_record_id, xid_length)
- elif isinstance(record, TransactionRecord):
- record.init(None, xid_length)
- else:
- raise qls.err.InvalidClassError(record.__class__.__name__)
- if xid is not None:
- record.xid = xid
- record.xid_complete = True
- if data is not None:
- record.data = data
- record.data_complete = True
- record.record_tail = RecordTail(None)
- record.record_tail.xmagic = Utils.inv_str(magic)
- record.record_tail.checksum = Utils.adler32(record.checksum_encode())
- record.record_tail.serial = record.serial
- record.record_tail.record_id = record.record_id
- return record
- @staticmethod
- def format_data(dsize, data):
- """Format binary data for printing"""
- if data == None:
- return ''
- # << DEBUG >>
- begin = data.find('msg')
- end = data.find('\0', begin)
- return 'data(%d)="%s"' % (dsize, data[begin:end])
- # << END DEBUG
- if Utils._is_printable(data):
- datastr = Utils._split_str(data)
- else:
- datastr = Utils._hex_split_str(data)
- if dsize != len(data):
- raise qls.err.DataSizeError(dsize, len(data), datastr)
- return 'data(%d)="%s"' % (dsize, datastr)
- @staticmethod
- def format_xid(xid, xidsize=None):
- """Format binary XID for printing"""
- if xid == None and xidsize != None:
- if xidsize > 0:
- raise qls.err.XidSizeError(xidsize, 0, None)
- return ''
- if Utils._is_printable(xid):
- xidstr = '"%s"' % Utils._split_str(xid)
- else:
- xidstr = '0x%s' % Utils._hex_split_str(xid)
- if xidsize == None:
- xidsize = len(xid)
- elif xidsize != len(xid):
- raise qls.err.XidSizeError(xidsize, len(xid), xidstr)
- return 'xid(%d)=%s' % (xidsize, xidstr)
- @staticmethod
- def inv_str(in_string):
- """Perform a binary 1's compliment (invert all bits) on a binary string"""
- istr = ''
- for index in range(0, len(in_string)):
- istr += chr(~ord(in_string[index]) & 0xff)
- return istr
- @staticmethod
- def load(file_handle, klass):
- """Load a record of class klass from a file"""
- args = Utils.load_args(file_handle, klass)
- subclass = klass.discriminate(args)
- result = subclass(*args) # create instance of record
- if subclass != klass:
- result.init(*Utils.load_args(file_handle, subclass))
- return result
- @staticmethod
- def load_args(file_handle, klass):
- """Load the arguments from class klass"""
- size = struct.calcsize(klass.FORMAT)
- foffs = file_handle.tell(),
- fbin = file_handle.read(size)
- if len(fbin) != size:
- raise qls.err.UnexpectedEndOfFileError(len(fbin), size, foffs, file_handle.name)
- return foffs + struct.unpack(klass.FORMAT, fbin)
- @staticmethod
- def load_data(file_handle, element, element_size):
- if element_size == 0:
- return element, True
- if element is None:
- element = file_handle.read(element_size)
- else:
- read_size = element_size - len(element)
- element += file_handle.read(read_size)
- return element, len(element) == element_size
- @staticmethod
- def skip(file_handle, boundary):
- """Read and discard disk bytes until the next multiple of boundary"""
- if not file_handle.closed:
- file_handle.read(Utils._rem_bytes_in_block(file_handle, boundary))
- #--- protected functions ---
- @staticmethod
- def _hex_str(in_str, begin, end):
- """Return a binary string as a hex string"""
- hstr = ''
- for index in range(begin, end):
- if Utils._is_printable(in_str[index]):
- hstr += in_str[index]
- else:
- hstr += '\\%02x' % ord(in_str[index])
- return hstr
- @staticmethod
- def _hex_split_str(in_str):#, split_size = 50):
- """Split a hex string into two parts separated by an ellipsis"""
-# if len(in_str) <= split_size:
-# return Utils._hex_str(in_str, 0, len(in_str))
-# return Utils._hex_str(in_str, 0, 10) + ' ... ' + Utils._hex_str(in_str, len(in_str)-10, len(in_str))
- return ''.join(x.encode('hex') for x in reversed(in_str))
- @staticmethod
- def _is_printable(in_str):
- """Return True if in_str in printable; False otherwise."""
- for this_char in in_str:
- if this_char not in string.printable:
- return False
- return True
- @staticmethod
- def _rem_bytes_in_block(file_handle, block_size):
- """Return the remaining bytes in a block"""
- foffs = file_handle.tell()
- return (Utils._size_in_blocks(foffs, block_size) * block_size) - foffs
- @staticmethod
- def _size_in_blocks(size, block_size):
- """Return the size in terms of data blocks"""
- return int((size + block_size - 1) / block_size)
- @staticmethod
- def _split_str(in_str, split_size = 50):
- """Split a string into two parts separated by an ellipsis if it is longer than split_size"""
- if len(in_str) < split_size:
- return in_str
- return in_str[:25] + ' ... ' + in_str[-25:]
+ record_tail_str = self.record_tail.to_string()
+ return '%s %s %s %s' % (RecordHeader.to_string(self),
+ qls.utils.format_xid(self.xid, self.xid_size, show_xid_flag),
+ record_tail_str, self._get_warnings())
+ def __str__(self):
+ """Return a string representation of the this TransactionRecord instance"""
+ return TransactionRecord.to_string(self, False)
# =============================================================================
-_CLASSES = {
+CLASSES = {
'a': TransactionRecord,
'c': TransactionRecord,
'd': DequeueRecord,
diff --git a/qpid/tools/src/py/qls/utils.py b/qpid/tools/src/py/qls/utils.py
new file mode 100644
index 0000000000..758dc446c0
--- /dev/null
+++ b/qpid/tools/src/py/qls/utils.py
@@ -0,0 +1,206 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+Module: qls.utils
+
+Contains helper functions for qpid_qls_analyze.
+"""
+
+import os
+import qls.err
+import qls.jrnl
+import stat
+import string
+import struct
+import subprocess
+import zlib
+
+DEFAULT_DBLK_SIZE = 128
+DEFAULT_SBLK_SIZE = 4096 # 32 dblks
+DEFAULT_SBLK_SIZE_KB = DEFAULT_SBLK_SIZE / 1024
+DEFAULT_RECORD_VERSION = 2
+DEFAULT_HEADER_SIZE_SBLKS = 1
+
+def adler32(data):
+ """return the adler32 checksum of data"""
+ return zlib.adler32(data) & 0xffffffff
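+# A minimal usage sketch (Python 2, as used throughout these modules):
+#   >>> '0x%08x' % adler32('hello')
+#   '0x062c0215'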
+
+def create_record(magic, uflags, journal_file, record_id, dequeue_record_id, xid, data):
+ """Helper function to construct a record with xid, data (where applicable) and consistent tail with checksum"""
+ record_class = qls.jrnl.CLASSES.get(magic[-1])
+ record = record_class(0, magic, DEFAULT_RECORD_VERSION, uflags, journal_file.file_header.serial, record_id)
+ xid_length = len(xid) if xid is not None else 0
+ if isinstance(record, qls.jrnl.EnqueueRecord):
+ data_length = len(data) if data is not None else 0
+ record.init(None, xid_length, data_length)
+ elif isinstance(record, qls.jrnl.DequeueRecord):
+ record.init(None, dequeue_record_id, xid_length)
+ elif isinstance(record, qls.jrnl.TransactionRecord):
+ record.init(None, xid_length)
+ else:
+ raise qls.err.InvalidClassError(record.__class__.__name__)
+ if xid is not None:
+ record.xid = xid
+ record.xid_complete = True
+ if data is not None:
+ record.data = data
+ record.data_complete = True
+ record.record_tail = _mk_record_tail(record)
+ return record
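+# A minimal usage sketch, mirroring how recovery completes an interrupted
+# transaction (journal_file, rid, enq_rid and xid come from recovered state):
+#   deq = create_record('QLSd', qls.jrnl.DequeueRecord.TXN_COMPLETE_COMMIT_FLAG,
+#                       journal_file, rid, enq_rid, xid, None)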
+
+def efp_directory_size(directory_name):
+ """"Decode the directory name in the format NNNk to a numeric size, where NNN is a number string"""
+ try:
+ if directory_name[-1] == 'k':
+ return int(directory_name[:-1])
+ except ValueError:
+ pass
+ return 0
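+# e.g. efp_directory_size('2048k') == 2048, while a name that does not follow
+# the NNNk pattern, such as 'tpl', yields 0.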
+
+def format_data(data, data_size=None, show_data_flag=True):
+ """Format binary data for printing"""
+ return _format_binary(data, data_size, show_data_flag, 'data', qls.err.DataSizeError, False)
+
+def format_xid(xid, xid_size=None, show_xid_flag=True):
+ """Format binary XID for printing"""
+ return _format_binary(xid, xid_size, show_xid_flag, 'xid', qls.err.XidSizeError, True)
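+# A minimal usage sketch of the xid formatter:
+#   >>> format_xid('TX:1', 4, True)
+#   'xid(4)="TX:1"'
+#   >>> format_xid('TX:1', 4, False)
+#   'xid(4)'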
+
+def get_avail_disk_space(path):
+ df_proc = subprocess.Popen(["df", path], stdout=subprocess.PIPE)
+ output = df_proc.communicate()[0]
+ return int(output.split('\n')[1].split()[3])
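+# Note: this returns the 'Available' column of df(1) output, which GNU df
+# reports in 1024-byte blocks by default.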
+
+def has_write_permission(path):
+ """Return True if path is writable by its owner"""
+ stat_info = os.stat(path)
+ return bool(stat_info.st_mode & stat.S_IWUSR)
+
+def inv_str(in_string):
+ """Perform a binary 1's compliment (invert all bits) on a binary string"""
+ istr = ''
+ for index in range(0, len(in_string)):
+ istr += chr(~ord(in_string[index]) & 0xff)
+ return istr
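+# A minimal usage sketch:
+#   >>> inv_str('QLSe')
+#   '\xae\xb3\xac\x9a'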
+
+def load(file_handle, klass):
+ """Load a record of class klass from a file"""
+ args = load_args(file_handle, klass)
+ subclass = klass.discriminate(args)
+ result = subclass(*args) # create instance of record
+ if subclass != klass:
+ result.init(*load_args(file_handle, subclass))
+ return result
+
+def load_args(file_handle, klass):
+ """Load the arguments from class klass"""
+ size = struct.calcsize(klass.FORMAT)
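+ # note the trailing comma below: foffs becomes a one-element tuple so it
+ # can be prepended to the tuple struct.unpack() returns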
+ foffs = file_handle.tell(),
+ fbin = file_handle.read(size)
+ if len(fbin) != size:
+ raise qls.err.UnexpectedEndOfFileError(len(fbin), size, foffs, file_handle.name)
+ return foffs + struct.unpack(klass.FORMAT, fbin)
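+# A sketch of how load() and load_args() combine when scanning a journal:
+#   record = load(file_handle, qls.jrnl.RecordHeader)
+# load_args() reads the fixed-size header fields, then discriminate() picks
+# the concrete record class from the last character of the magic (e.g. 'e'
+# for EnqueueRecord, 'd' for DequeueRecord, 'a'/'c' for TransactionRecord).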
+
+def load_data(file_handle, element, element_size):
+ """Read element_size bytes of binary data from file_handle into element"""
+ if element_size == 0:
+ return element, True
+ if element is None:
+ element = file_handle.read(element_size)
+ else:
+ read_size = element_size - len(element)
+ element += file_handle.read(read_size)
+ return element, len(element) == element_size
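+# A minimal usage sketch: the element accumulates across calls, so a record
+# straddling a file boundary can be resumed with the next file handle:
+#   xid, complete = load_data(fh1, None, 16)  # partial read near end of file
+#   xid, complete = load_data(fh2, xid, 16)   # completes from the next file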
+
+def skip(file_handle, boundary):
+ """Read and discard disk bytes until the next multiple of boundary"""
+ if not file_handle.closed:
+ file_handle.read(_rem_bytes_in_block(file_handle, boundary))
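+# e.g. with file_handle at offset 4100, skip(file_handle, 4096) reads and
+# discards 4092 bytes, leaving the handle at the next boundary (offset 8192).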
+
+#--- protected functions ---
+
+def _format_binary(bin_str, bin_size, show_bin_flag, prefix, err_class, hex_num_flag):
+ """Format a binary string (xid or data) for printing"""
+ if bin_str is None:
+ if bin_size is not None and bin_size > 0:
+ raise err_class(bin_size, 0, None)
+ return ''
+ if bin_size is None:
+ bin_size = len(bin_str)
+ elif bin_size != len(bin_str):
+ raise err_class(bin_size, len(bin_str), bin_str)
+ out_str = '%s(%d)' % (prefix, bin_size)
+ if show_bin_flag:
+ if _is_printable(bin_str):
+ binstr = '"%s"' % _split_str(bin_str)
+ elif hex_num_flag:
+ binstr = '0x%s' % _str_to_hex_num(bin_str)
+ else:
+ binstr = _hex_split_str(bin_str)
+ out_str += '=%s' % binstr
+ return out_str
+
+def _hex_str(in_str, begin, end):
+ """Return a binary string as a hex string"""
+ hstr = ''
+ for index in range(begin, end):
+ if _is_printable(in_str[index]):
+ hstr += in_str[index]
+ else:
+ hstr += '\\%02x' % ord(in_str[index])
+ return hstr
+
+def _hex_split_str(in_str, split_size = 50):
+ """Split a hex string into two parts separated by an ellipsis"""
+ if len(in_str) <= split_size:
+ return _hex_str(in_str, 0, len(in_str))
+ return _hex_str(in_str, 0, 10) + ' ... ' + _hex_str(in_str, len(in_str)-10, len(in_str))
+ #return ''.join(x.encode('hex') for x in reversed(in_str))
+
+def _is_printable(in_str):
+ """Return True if in_str in printable; False otherwise."""
+ for this_char in in_str:
+ if this_char not in string.letters and this_char not in string.digits and this_char not in string.punctuation:
+ return False
+ return True
+
+def _mk_record_tail(record):
+ record_tail = qls.jrnl.RecordTail(None)
+ record_tail.xmagic = inv_str(record.magic)
+ record_tail.checksum = adler32(record.checksum_encode())
+ record_tail.serial = record.serial
+ record_tail.record_id = record.record_id
+ return record_tail
+
+def _rem_bytes_in_block(file_handle, block_size):
+ """Return the remaining bytes in a block"""
+ foffs = file_handle.tell()
+ return (_size_in_blocks(foffs, block_size) * block_size) - foffs
+
+def _size_in_blocks(size, block_size):
+ """Return the size in terms of data blocks"""
+ return int((size + block_size - 1) / block_size)
+
+def _split_str(in_str, split_size = 50):
+ """Split a string into two parts separated by an ellipsis if it is longer than split_size"""
+ if len(in_str) < split_size:
+ return in_str
+ return in_str[:25] + ' ... ' + in_str[-25:]
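+# e.g. _split_str('a' * 60) == ('a' * 25) + ' ... ' + ('a' * 25)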
+
+def _str_to_hex_num(in_str):
+ """Turn a string into a hex number representation, little endian assumed (ie LSB is first, MSB is last)"""
+ return ''.join(x.encode('hex') for x in reversed(in_str))
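+# e.g. _str_to_hex_num('\x01\x02') == '0201': the input is read as a
+# little-endian byte string, so the last byte becomes the most significant.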
diff --git a/qpid/tools/src/py/qpid_qls_analyze.py b/qpid/tools/src/py/qpid_qls_analyze.py
index a540587547..1b2655896c 100755
--- a/qpid/tools/src/py/qpid_qls_analyze.py
+++ b/qpid/tools/src/py/qpid_qls_analyze.py
@@ -1,5 +1,5 @@
#!/usr/bin/env python
-#
+
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -16,13 +16,44 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-#
+
+"""
+qpid-qls-analyze
+
+Reads and analyzes a Qpid Linear Store (QLS) store directory.
+"""
import argparse
import os
import os.path
-from qls import efp
-from qls import jrnl
+import qls.anal
+import qls.efp
+
+class QlsAnalyzerArgParser(argparse.ArgumentParser):
+ def __init__(self):
+ argparse.ArgumentParser.__init__(self, description = 'Qpid Linear Store Analyzer', prog = 'qpid-qls-analyze')
+ self.add_argument('qls_dir', metavar='DIR',
+ help='Qpid Linear Store (QLS) directory to be analyzed')
+ self.add_argument('--efp', action='store_true',
+ help='Analyze the Empty File Pool (EFP) and show stats')
+ self.add_argument('--show-recs', action='store_true',
+ help='Show material records found during recovery')
+ self.add_argument('--show-all-recs', action='store_true',
+ help='Show all records (including fillers) found during recovery')
+ self.add_argument('--show-xids', action='store_true',
+ help='Show xid as hex number, otherwise show only xid length')
+ self.add_argument('--show-data', action='store_true',
+ help='Show data, otherwise show only data length')
+ self.add_argument('--stats', action='store_true',
+ help='Print journal record stats')
+ self.add_argument('--txn', action='store_true',
+ help='Reconcile incomplete transactions')
+ self.add_argument('--version', action='version',
+ version='%(prog)s ' + QqpdLinearStoreAnalyzer.QLS_ANALYZE_VERSION)
+ def parse_args(self, args=None, namespace=None):
+ args = argparse.ArgumentParser.parse_args(self, args, namespace)
+ # If required, perform additional validity checks here, raise errors if req'd
+ return args
class QqpdLinearStoreAnalyzer(object):
"""
@@ -37,28 +68,10 @@ class QqpdLinearStoreAnalyzer(object):
self.args = None
self._process_args()
self.qls_dir = os.path.abspath(self.args.qls_dir)
- self.efp_manager = efp.EfpManager(self.qls_dir, self.args)
- self.jrnl_recovery_mgr = jrnl.JournalRecoveryManager(self.qls_dir, self.args)
- def _analyze_efp(self):
- self.efp_manager.run(self.args)
- def _analyze_journals(self):
- self.jrnl_recovery_mgr.run(self.args)
+ self.efp_manager = qls.efp.EfpManager(self.qls_dir, None)
+ self.jrnl_recovery_mgr = qls.anal.JournalRecoveryManager(self.qls_dir, self.args)
def _process_args(self):
- parser = argparse.ArgumentParser(description = 'Qpid Linear Store Analyzer')
- parser.add_argument('qls_dir', metavar='DIR',
- help='Qpid Linear Store (QLS) directory to be analyzed')
- parser.add_argument('--efp', action='store_true',
- help='Analyze the Emtpy File Pool (EFP) and show stats')
- parser.add_argument('--show-recs', action='store_true',
- help='Show material records found during recovery')
- parser.add_argument('--show-all-recs', action='store_true',
- help='Show all records (including fillers) found during recovery')
- parser.add_argument('--stats', action='store_true',
- help='Print journal record stats')
- parser.add_argument('--txn', action='store_true',
- help='Reconcile incomplete transactions')
- parser.add_argument('--version', action='version', version='%(prog)s ' +
- QqpdLinearStoreAnalyzer.QLS_ANALYZE_VERSION)
+ parser = QlsAnalyzerArgParser()
self.args = parser.parse_args()
if not os.path.exists(self.args.qls_dir):
parser.error('Journal path "%s" does not exist' % self.args.qls_dir)
@@ -68,8 +81,8 @@ class QqpdLinearStoreAnalyzer(object):
self.jrnl_recovery_mgr.report(self.args.stats)
def run(self):
if self.args.efp:
- self._analyze_efp()
- self._analyze_journals()
+ self.efp_manager.run(None)
+ self.jrnl_recovery_mgr.run()
#==============================================================================
# main program
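+# A typical invocation of this tool (path illustrative):
+#   python qpid_qls_analyze.py /var/lib/qpidd/qls --efp --stats --show-recs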