summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2011-11-17 19:19:00 +0000
committerKim van der Riet <kpvdr@apache.org>2011-11-17 19:19:00 +0000
commita95a0529d6a02f991c07d7b315ae7228bdca05e2 (patch)
treed8b22e56f0d2a2f4f651ac2beec3e6dbf6f4709f
parentdf9881c3f0bd5e649fbd6d395125222495629dd4 (diff)
downloadqpid-python-a95a0529d6a02f991c07d7b315ae7228bdca05e2.tar.gz
QPID-3579: Tool to aid in analyzing trace messages in c++ broker log files - added from trunk to 0.14 branch
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.14@1203339 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-xqpid/tools/setup.py3
-rwxr-xr-xqpid/tools/src/py/qpid-analyze-trace258
2 files changed, 260 insertions, 1 deletions
diff --git a/qpid/tools/setup.py b/qpid/tools/setup.py
index dd849dc2ec..925a59d017 100755
--- a/qpid/tools/setup.py
+++ b/qpid/tools/setup.py
@@ -23,7 +23,8 @@ setup(name="qpid-tools",
version="0.14",
author="Apache Qpid",
author_email="dev@qpid.apache.org",
- scripts=["src/py/qpid-cluster",
+ scripts=["src/py/qpid-analyze_trace",
+ "src/py/qpid-cluster",
"src/py/qpid-cluster-store",
"src/py/qpid-config",
"src/py/qpid-printevents",
diff --git a/qpid/tools/src/py/qpid-analyze-trace b/qpid/tools/src/py/qpid-analyze-trace
new file mode 100755
index 0000000000..65c673e511
--- /dev/null
+++ b/qpid/tools/src/py/qpid-analyze-trace
@@ -0,0 +1,258 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import sys
+from datetime import datetime
+from optparse import OptionParser
+
+# Version of this tool software
+MAJOR_VERSION = 1
+MINOR_VERSION = 1
+# === Version history ===
+# 2011-11-16 1.1: Bugfixs:
+# QPID-3623 - Incorrect handling of transactions
+# QPID-3624 - Replace argparse lib with optparse so tool can be used on Python 2.6.
+# 2011-11-07 1.0: Initial checkin
+# QPID-3579: Initial version checked in
+
+
+# AMQP 0-10 commands - these increment the command counter
+EXEC_COMMANDS = ["ExecutionSync", "ExecutionResult", "ExecutionException", "MessageTransfer", "MessageAccept",
+ "MessageReject", "MessageRelease", "MessageAcquire", "MessageResume", "MessageSubscribe",
+ "MessageCancel", "MessageSetFlowMode", "MessageFlow", "MessageFlush", "MessageStop", "TxSelect",
+ "TxCommit", "TxRollback", "DtxSelect", "DtxStart", "DtxEnd", "DtxCommit", "DtxForget", "DtxGetTimeout",
+ "DtxPrepare", "DtxRecover", "DtxRollback", "DtxSetTimeout", "ExchangeDeclare", "ExchangeDelete",
+ "ExchangeQuery", "ExchangeBind", "ExchangeUnbind", "ExchangeBound", "QueueDeclare", "QueueDelete",
+ "QueuePurge", "QueueQuery", "FileQos", "FileQosOk", "FileConsume", "FileConsumeOk", "FileCancel",
+ "FileOpen", "FileOpenOk", "FileStage", "FilePublish", "FileReturn", "FileDeliver", "FileAck",
+ "FileReject", "StreamQos", "StreamQosOk", "StreamConsume", "StreamConsumeOk", "StreamCancel",
+ "StreamPublish", "StreamReturn", "StreamDeliver"]
+HEADER_STR = " -line ----------timestamp -----------connection ssn recv send- txn-- operation---------->"
+
+PROGRESS_LINES_PER_DOT = 100000
+
+class LogLevel:
+ CRITICAL = (1, "critical")
+ ERROR = (2, "error")
+ WARNING = (3, "warning")
+ NOTICE = (4, "notice")
+ INFO = (5, "info")
+ DEBUG = (6, "debug")
+ TRACE = (7, "trace")
+ @staticmethod
+ def get_level(level):
+ if level == LogLevel.CRITICAL[1]: return LogLevel.CRITICAL
+ if level == LogLevel.ERROR[1]: return LogLevel.ERROR
+ if level == LogLevel.WARNING[1]: return LogLevel.WARNING
+ if level == LogLevel.NOTICE[1]: return LogLevel.NOTICE
+ if level == LogLevel.INFO[1]: return LogLevel.INFO
+ if level == LogLevel.DEBUG[1]: return LogLevel.DEBUG
+ if level == LogLevel.TRACE[1]: return LogLevel.TRACE
+ raise Exception("Unknown log level: %s" % level)
+
+class LogLine:
+ def __init__(self, line_no, line):
+ self.line_no = line_no
+ self.timestamp = datetime.strptime(line[:19], "%Y-%m-%d %H:%M:%S")
+ self.level = LogLevel.get_level(line[20:].split(" ")[0])
+ self.line = line[21 + len(self.level[1]):].strip()
+ self.cmd_cnt = None
+ self.txn_cnt = None
+ def __str__(self):
+ if self.contains("RECV"): cnt_str = "R"
+ else: cnt_str = " S"
+ if self.cmd_cnt is not None: cnt_str += str(self.cmd_cnt)
+ set_index = self.find("{")
+ header_index = self.find("header")
+ content_index = self.find("content")
+ if self.txn_cnt is None:
+ txn_cnt_str = ""
+ else:
+ txn_cnt_str = "T%d" % self.txn_cnt
+ if header_index != -1 and header_index < set_index: op_str = " + " + self.line[header_index:self.line.rfind("]")]
+ elif content_index != -1 and set_index == -1: op_str = " + " + self.line[content_index:self.line.rfind("]")]
+ else: op_str = self.line[set_index+1:self.line.rfind("}")]
+ return " %7d %19s %22s %3d %-10s %-5s %s" % (self.line_no, self.timestamp.isoformat(" "),
+ self.get_identifier_remote_addr(), self.get_channel(),
+ cnt_str, txn_cnt_str, op_str)
+ def contains(self, string):
+ return self.line.find(string) != -1
+ def find(self, string):
+ return self.line.find(string)
+ def get_channel(self):
+ return int(self.get_named_value("channel"))
+ def get_identifier(self):
+ return self.line.partition("[")[2].partition("]")[0]
+ def get_identifier_remote_addr(self):
+ return self.get_identifier().partition("-")[2]
+ def get_named_value(self, name):
+ return self.line.partition("%s=" % name)[2].partition(";")[0]
+ def get_msg_accept_range(self):
+ str_nums = self.get_named_value("transfers").strip(" {[]}").split(",")
+ return range(int(str_nums[0]), int(str_nums[1]) + 1)
+ def is_log_level(self, level):
+ if self.level is None: return None
+ return level[0] == self.level[0]
+ def is_frame(self):
+ return self.contains("Frame[")
+
+class ConnectionProperty:
+ def __init__(self, line):
+ self.addr = line.get_identifier_remote_addr()
+ self.channel = line.get_channel()
+ self.ops = [line]
+ def add_op(self, line):
+ self.ops.append(line)
+
+class Connection(ConnectionProperty):
+ def __init__(self, line):
+ ConnectionProperty.__init__(self, line)
+ self.session_list = [] # Keeps session creation order
+ self.session_dict = {} # For looking up by channel no.
+ def __str__(self):
+ return "Connection %s (ops=%d; sessions=%d):" % (self.addr, len(self.ops), len(self.session_dict))
+ def add_session(self, session):
+ self.session_list.append(session)
+ self.session_dict[session.channel] = session
+ def get_session(self, channel):
+ return self.session_dict[channel]
+
+class Session(ConnectionProperty):
+ def __init__(self, line):
+ ConnectionProperty.__init__(self, line)
+ self.name = line.get_named_value("name")
+ self.send_cnt = 0
+ self.recv_cnt = 0
+ self.txn_flag = False
+ self.txn_cnt = 0
+ self.recv_cmds = {} # For looking up by cmd no
+ self.send_cmds = {} # For looking up by cmd no
+ def __str__(self):
+ if self.txn_flag:
+ return " + Session %d (name=%s send-cmds=%d recv-cmds=%d txns=%d):" % (self.channel, self.name,
+ self.send_cnt, self.recv_cnt,
+ self.txn_cnt)
+ return " + Session %d (name=%s send-cmds=%d recv-cmds=%d non-txn):" % (self.channel, self.name, self.send_cnt,
+ self.recv_cnt)
+ def incr_recv_cnt(self, line):
+ self.recv_cmds[self.recv_cnt] = line
+ self.recv_cnt += 1
+ def incr_send_cnt(self, line):
+ self.send_cmds[self.send_cnt] = line
+ self.send_cnt += 1
+ def set_send_txn_cnt(self, cmd):
+ self.send_cmds[cmd].txn_cnt = self.txn_cnt
+
+class TraceAnalysis:
+ def __init__(self):
+ self.connection_list = [] # Keeps connection creation order
+ self.connection_dict = {} # For looking up by connection address
+ parser = OptionParser(usage="%prog [options] trace-file", version="%%prog %d.%d" % (MAJOR_VERSION, MINOR_VERSION),
+ description="A tool to structure and display Qpid broker trace logs.")
+ parser.add_option("--connection-summary", action="store_true", default=False, dest="connection_summary",
+ help="Hide connection details, provide one-line summary")
+ parser.add_option("--session-summary", action="store_true", default=False, dest="session_summary",
+ help="Hide session details, provide one-line summary")
+ parser.add_option("--summary", "-s", action="store_true", default=False, dest="summary",
+ help="Hide both connection and session details. Equivalent to --connection-summary and"
+ "--session-summary")
+ self.opts, self.args = parser.parse_args()
+ if len(self.args) == 0: raise Exception("Missing trace-file argument")
+ def analyze_trace(self):
+ lcnt = 0
+ print "Reading trace file %s:" % self.args[0]
+ log_file = open(self.args[0], "r")
+ try:
+ for fline in log_file:
+ lcnt += 1
+ try:
+ lline = LogLine(lcnt, fline)
+ if lline.is_log_level(LogLevel.TRACE) and lline.is_frame():
+ if lline.contains("{ConnectionStartBody"):
+ conn = Connection(lline)
+ self.connection_list.append(conn)
+ self.connection_dict[conn.addr] = conn
+ elif lline.contains("{Connection"):
+ self.connection_dict[lline.get_identifier_remote_addr()].add_op(lline)
+ elif lline.contains("{SessionAttachBody"):
+ ssn = Session(lline)
+ self.connection_dict[ssn.addr].add_session(ssn)
+ else:
+ ssn = self.connection_dict[lline.get_identifier_remote_addr()].get_session(lline.get_channel())
+ ssn.add_op(lline)
+ if lline.line[lline.find("{") + 1 : lline.find("Body")] in EXEC_COMMANDS:
+ if lline.contains("RECV"):
+ lline.cmd_cnt = ssn.recv_cnt
+ if ssn.txn_flag:
+ if lline.contains("MessageAcceptBody"):
+ lline.txn_cnt = ssn.txn_cnt
+ for cmd in lline.get_msg_accept_range():
+ ssn.set_send_txn_cnt(cmd)
+ if lline.contains("MessageTransferBody"): lline.txn_cnt = ssn.txn_cnt
+ ssn.incr_recv_cnt(lline)
+ elif lline.contains("SEND") or lline.contains("SENT"):
+ lline.cmd_cnt = ssn.send_cnt
+ ssn.incr_send_cnt(lline)
+ # TODO: This treatment will probably break down for DTX
+ if lline.contains("xSelectBody"):
+ ssn.txn_flag = True
+ elif lline.contains("xCommitBody") or lline.contains("xRollbackBody"):
+ lline.txn_cnt = ssn.txn_cnt
+ ssn.txn_cnt += 1
+ except KeyboardInterrupt as e: raise e
+ except: pass
+ if (lcnt + 1) % PROGRESS_LINES_PER_DOT == 0:
+ sys.stdout.write(".")
+ sys.stdout.flush()
+ finally: log_file.close()
+ if lcnt > PROGRESS_LINES_PER_DOT: print
+ print "Read and analyzed", lcnt, "lines."
+ def print_analysis(self):
+ if len(self.connection_list) > 0:
+ for c in self.connection_list:
+ print
+ print c
+ if not self.opts.connection_summary and not self.opts.summary:
+ print HEADER_STR
+ for o in c.ops:
+ print o
+ for s in c.session_list:
+ print s
+ if not self.opts.session_summary and not self.opts.summary:
+ print HEADER_STR
+ for o in s.ops:
+ print o
+ else:
+ print "No trace-level entries found in log."
+
+def check_python_version(major, minor, micro):
+ if sys.version_info < (major, minor, micro):
+ print "Incorrect Python version: %s found; >= %d.%d.%d needed." % (sys.version.split()[0], major, minor, micro)
+ sys.exit(-1)
+
+# === Main program ===
+
+if __name__ == '__main__':
+ check_python_version(2, 4, 0)
+ t = TraceAnalysis()
+ t.analyze_trace()
+ t.print_analysis()
+ \ No newline at end of file