diff options
author | Kim van der Riet <kpvdr@apache.org> | 2011-11-17 19:19:00 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2011-11-17 19:19:00 +0000 |
commit | a95a0529d6a02f991c07d7b315ae7228bdca05e2 (patch) | |
tree | d8b22e56f0d2a2f4f651ac2beec3e6dbf6f4709f | |
parent | df9881c3f0bd5e649fbd6d395125222495629dd4 (diff) | |
download | qpid-python-a95a0529d6a02f991c07d7b315ae7228bdca05e2.tar.gz |
QPID-3579: Tool to aid in analyzing trace messages in c++ broker log files - added from trunk to 0.14 branch
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.14@1203339 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-x | qpid/tools/setup.py | 3 | ||||
-rwxr-xr-x | qpid/tools/src/py/qpid-analyze-trace | 258 |
2 files changed, 260 insertions, 1 deletions
diff --git a/qpid/tools/setup.py b/qpid/tools/setup.py index dd849dc2ec..925a59d017 100755 --- a/qpid/tools/setup.py +++ b/qpid/tools/setup.py @@ -23,7 +23,8 @@ setup(name="qpid-tools", version="0.14", author="Apache Qpid", author_email="dev@qpid.apache.org", - scripts=["src/py/qpid-cluster", + scripts=["src/py/qpid-analyze_trace", + "src/py/qpid-cluster", "src/py/qpid-cluster-store", "src/py/qpid-config", "src/py/qpid-printevents", diff --git a/qpid/tools/src/py/qpid-analyze-trace b/qpid/tools/src/py/qpid-analyze-trace new file mode 100755 index 0000000000..65c673e511 --- /dev/null +++ b/qpid/tools/src/py/qpid-analyze-trace @@ -0,0 +1,258 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import sys +from datetime import datetime +from optparse import OptionParser + +# Version of this tool software +MAJOR_VERSION = 1 +MINOR_VERSION = 1 +# === Version history === +# 2011-11-16 1.1: Bugfixs: +# QPID-3623 - Incorrect handling of transactions +# QPID-3624 - Replace argparse lib with optparse so tool can be used on Python 2.6. +# 2011-11-07 1.0: Initial checkin +# QPID-3579: Initial version checked in + + +# AMQP 0-10 commands - these increment the command counter +EXEC_COMMANDS = ["ExecutionSync", "ExecutionResult", "ExecutionException", "MessageTransfer", "MessageAccept", + "MessageReject", "MessageRelease", "MessageAcquire", "MessageResume", "MessageSubscribe", + "MessageCancel", "MessageSetFlowMode", "MessageFlow", "MessageFlush", "MessageStop", "TxSelect", + "TxCommit", "TxRollback", "DtxSelect", "DtxStart", "DtxEnd", "DtxCommit", "DtxForget", "DtxGetTimeout", + "DtxPrepare", "DtxRecover", "DtxRollback", "DtxSetTimeout", "ExchangeDeclare", "ExchangeDelete", + "ExchangeQuery", "ExchangeBind", "ExchangeUnbind", "ExchangeBound", "QueueDeclare", "QueueDelete", + "QueuePurge", "QueueQuery", "FileQos", "FileQosOk", "FileConsume", "FileConsumeOk", "FileCancel", + "FileOpen", "FileOpenOk", "FileStage", "FilePublish", "FileReturn", "FileDeliver", "FileAck", + "FileReject", "StreamQos", "StreamQosOk", "StreamConsume", "StreamConsumeOk", "StreamCancel", + "StreamPublish", "StreamReturn", "StreamDeliver"] +HEADER_STR = " -line ----------timestamp -----------connection ssn recv send- txn-- operation---------->" + +PROGRESS_LINES_PER_DOT = 100000 + +class LogLevel: + CRITICAL = (1, "critical") + ERROR = (2, "error") + WARNING = (3, "warning") + NOTICE = (4, "notice") + INFO = (5, "info") + DEBUG = (6, "debug") + TRACE = (7, "trace") + @staticmethod + def get_level(level): + if level == LogLevel.CRITICAL[1]: return LogLevel.CRITICAL + if level == LogLevel.ERROR[1]: return LogLevel.ERROR + if level == LogLevel.WARNING[1]: return LogLevel.WARNING + if level == LogLevel.NOTICE[1]: return LogLevel.NOTICE + if level == LogLevel.INFO[1]: return LogLevel.INFO + if level == LogLevel.DEBUG[1]: return LogLevel.DEBUG + if level == LogLevel.TRACE[1]: return LogLevel.TRACE + raise Exception("Unknown log level: %s" % level) + +class LogLine: + def __init__(self, line_no, line): + self.line_no = line_no + self.timestamp = datetime.strptime(line[:19], "%Y-%m-%d %H:%M:%S") + self.level = LogLevel.get_level(line[20:].split(" ")[0]) + self.line = line[21 + len(self.level[1]):].strip() + self.cmd_cnt = None + self.txn_cnt = None + def __str__(self): + if self.contains("RECV"): cnt_str = "R" + else: cnt_str = " S" + if self.cmd_cnt is not None: cnt_str += str(self.cmd_cnt) + set_index = self.find("{") + header_index = self.find("header") + content_index = self.find("content") + if self.txn_cnt is None: + txn_cnt_str = "" + else: + txn_cnt_str = "T%d" % self.txn_cnt + if header_index != -1 and header_index < set_index: op_str = " + " + self.line[header_index:self.line.rfind("]")] + elif content_index != -1 and set_index == -1: op_str = " + " + self.line[content_index:self.line.rfind("]")] + else: op_str = self.line[set_index+1:self.line.rfind("}")] + return " %7d %19s %22s %3d %-10s %-5s %s" % (self.line_no, self.timestamp.isoformat(" "), + self.get_identifier_remote_addr(), self.get_channel(), + cnt_str, txn_cnt_str, op_str) + def contains(self, string): + return self.line.find(string) != -1 + def find(self, string): + return self.line.find(string) + def get_channel(self): + return int(self.get_named_value("channel")) + def get_identifier(self): + return self.line.partition("[")[2].partition("]")[0] + def get_identifier_remote_addr(self): + return self.get_identifier().partition("-")[2] + def get_named_value(self, name): + return self.line.partition("%s=" % name)[2].partition(";")[0] + def get_msg_accept_range(self): + str_nums = self.get_named_value("transfers").strip(" {[]}").split(",") + return range(int(str_nums[0]), int(str_nums[1]) + 1) + def is_log_level(self, level): + if self.level is None: return None + return level[0] == self.level[0] + def is_frame(self): + return self.contains("Frame[") + +class ConnectionProperty: + def __init__(self, line): + self.addr = line.get_identifier_remote_addr() + self.channel = line.get_channel() + self.ops = [line] + def add_op(self, line): + self.ops.append(line) + +class Connection(ConnectionProperty): + def __init__(self, line): + ConnectionProperty.__init__(self, line) + self.session_list = [] # Keeps session creation order + self.session_dict = {} # For looking up by channel no. + def __str__(self): + return "Connection %s (ops=%d; sessions=%d):" % (self.addr, len(self.ops), len(self.session_dict)) + def add_session(self, session): + self.session_list.append(session) + self.session_dict[session.channel] = session + def get_session(self, channel): + return self.session_dict[channel] + +class Session(ConnectionProperty): + def __init__(self, line): + ConnectionProperty.__init__(self, line) + self.name = line.get_named_value("name") + self.send_cnt = 0 + self.recv_cnt = 0 + self.txn_flag = False + self.txn_cnt = 0 + self.recv_cmds = {} # For looking up by cmd no + self.send_cmds = {} # For looking up by cmd no + def __str__(self): + if self.txn_flag: + return " + Session %d (name=%s send-cmds=%d recv-cmds=%d txns=%d):" % (self.channel, self.name, + self.send_cnt, self.recv_cnt, + self.txn_cnt) + return " + Session %d (name=%s send-cmds=%d recv-cmds=%d non-txn):" % (self.channel, self.name, self.send_cnt, + self.recv_cnt) + def incr_recv_cnt(self, line): + self.recv_cmds[self.recv_cnt] = line + self.recv_cnt += 1 + def incr_send_cnt(self, line): + self.send_cmds[self.send_cnt] = line + self.send_cnt += 1 + def set_send_txn_cnt(self, cmd): + self.send_cmds[cmd].txn_cnt = self.txn_cnt + +class TraceAnalysis: + def __init__(self): + self.connection_list = [] # Keeps connection creation order + self.connection_dict = {} # For looking up by connection address + parser = OptionParser(usage="%prog [options] trace-file", version="%%prog %d.%d" % (MAJOR_VERSION, MINOR_VERSION), + description="A tool to structure and display Qpid broker trace logs.") + parser.add_option("--connection-summary", action="store_true", default=False, dest="connection_summary", + help="Hide connection details, provide one-line summary") + parser.add_option("--session-summary", action="store_true", default=False, dest="session_summary", + help="Hide session details, provide one-line summary") + parser.add_option("--summary", "-s", action="store_true", default=False, dest="summary", + help="Hide both connection and session details. Equivalent to --connection-summary and" + "--session-summary") + self.opts, self.args = parser.parse_args() + if len(self.args) == 0: raise Exception("Missing trace-file argument") + def analyze_trace(self): + lcnt = 0 + print "Reading trace file %s:" % self.args[0] + log_file = open(self.args[0], "r") + try: + for fline in log_file: + lcnt += 1 + try: + lline = LogLine(lcnt, fline) + if lline.is_log_level(LogLevel.TRACE) and lline.is_frame(): + if lline.contains("{ConnectionStartBody"): + conn = Connection(lline) + self.connection_list.append(conn) + self.connection_dict[conn.addr] = conn + elif lline.contains("{Connection"): + self.connection_dict[lline.get_identifier_remote_addr()].add_op(lline) + elif lline.contains("{SessionAttachBody"): + ssn = Session(lline) + self.connection_dict[ssn.addr].add_session(ssn) + else: + ssn = self.connection_dict[lline.get_identifier_remote_addr()].get_session(lline.get_channel()) + ssn.add_op(lline) + if lline.line[lline.find("{") + 1 : lline.find("Body")] in EXEC_COMMANDS: + if lline.contains("RECV"): + lline.cmd_cnt = ssn.recv_cnt + if ssn.txn_flag: + if lline.contains("MessageAcceptBody"): + lline.txn_cnt = ssn.txn_cnt + for cmd in lline.get_msg_accept_range(): + ssn.set_send_txn_cnt(cmd) + if lline.contains("MessageTransferBody"): lline.txn_cnt = ssn.txn_cnt + ssn.incr_recv_cnt(lline) + elif lline.contains("SEND") or lline.contains("SENT"): + lline.cmd_cnt = ssn.send_cnt + ssn.incr_send_cnt(lline) + # TODO: This treatment will probably break down for DTX + if lline.contains("xSelectBody"): + ssn.txn_flag = True + elif lline.contains("xCommitBody") or lline.contains("xRollbackBody"): + lline.txn_cnt = ssn.txn_cnt + ssn.txn_cnt += 1 + except KeyboardInterrupt as e: raise e + except: pass + if (lcnt + 1) % PROGRESS_LINES_PER_DOT == 0: + sys.stdout.write(".") + sys.stdout.flush() + finally: log_file.close() + if lcnt > PROGRESS_LINES_PER_DOT: print + print "Read and analyzed", lcnt, "lines." + def print_analysis(self): + if len(self.connection_list) > 0: + for c in self.connection_list: + print + print c + if not self.opts.connection_summary and not self.opts.summary: + print HEADER_STR + for o in c.ops: + print o + for s in c.session_list: + print s + if not self.opts.session_summary and not self.opts.summary: + print HEADER_STR + for o in s.ops: + print o + else: + print "No trace-level entries found in log." + +def check_python_version(major, minor, micro): + if sys.version_info < (major, minor, micro): + print "Incorrect Python version: %s found; >= %d.%d.%d needed." % (sys.version.split()[0], major, minor, micro) + sys.exit(-1) + +# === Main program === + +if __name__ == '__main__': + check_python_version(2, 4, 0) + t = TraceAnalysis() + t.analyze_trace() + t.print_analysis() +
\ No newline at end of file |