author | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000
---|---|---
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000
commit | 66765100f4257159622cefe57bed50125a5ad017 (patch) |
tree | a88ee23bb194eb91f0ebb2d9b23ff423e3ea8e37 /qpid/java/perftests/bin/processing/processTests.py |
parent | 1aeaa7b16e5ce54f10c901d75c4d40f9f88b9db6 (diff) |
parent | 88b98b2f4152ef59a671fad55a0d08338b6b78ca (diff) |
download | qpid-python-66765100f4257159622cefe57bed50125a5ad017.tar.gz |
Creating a branch for experimenting with some ideas for JMS client. (branch: rajith_jms_client)
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rajith_jms_client@1128369 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/perftests/bin/processing/processTests.py')
-rwxr-xr-x | qpid/java/perftests/bin/processing/processTests.py | 850 |
1 file changed, 850 insertions, 0 deletions
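The script introduced below post-processes a performance-test run: it converts the broker's `top`-based CPU log into timestamped form, slices the broker, GC, and CPU logs down to each test's start/end window, and writes a `graph.data` file plus min/max/avg `.stats` files for later graphing. Everything is keyed off `<file>.stat` companion files whose `Access`/`Modify` lines carry the start and end timestamps; the parser (`extractTime`/`lineToDate` below) accepts the format GNU `stat` prints. As a minimal sketch of how such a companion file could be produced where the harness does not already supply one (the `writeStatFile` helper and the `results/MyTest.csv` path are hypothetical, not part of this commit):

# Hypothetical helper -- not part of this commit. It writes the
# "<file>.stat" companion that extractTime() below parses: lines that
# start with "Access"/"Modify", contain no "(", and carry an
# ISO-8601-style timestamp, as printed by GNU `stat <file>`.
import os
from datetime import datetime

def writeStatFile(path):
    st = os.stat(path)
    with open(path + ".stat", "w") as out:
        for label, secs in (("Access", st.st_atime), ("Modify", st.st_mtime)):
            stamp = datetime.fromtimestamp(secs).strftime("%Y-%m-%d %H:%M:%S.%f")
            out.write("%s: %s +0000\n" % (label, stamp))

writeStatFile("results/MyTest.csv")  # creates results/MyTest.csv.stat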
diff --git a/qpid/java/perftests/bin/processing/processTests.py b/qpid/java/perftests/bin/processing/processTests.py
new file mode 100755
index 0000000000..8db44eda79
--- /dev/null
+++ b/qpid/java/perftests/bin/processing/processTests.py
@@ -0,0 +1,850 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import re
+import datetime
+import sys
+import string
+from optparse import OptionParser
+from datetime import datetime, timedelta
+import shutil
+
+
+def showUsage():
+    log("./processTests.py [-b|--broker-log-dir] <dir> [-t|--test-dir] <dir>")
+
+
+ACCESS="Access"
+MODIFY="Modify"
+
+BROKER_LOG="broker.log"
+BROKER_PID="broker.pid"
+BROKER_CPU="broker_cpu.log"
+BROKER_CPU_DATED="broker_cpu.log.dated"
+BROKER_STATS="broker.stats"
+BROKER_GC="gc.log"
+
+GRAPH_DATA="graph.data"
+
+_verbose = False
+_debug = False
+_brokerLogs = ""
+
+def exitError(message):
+    log(message)
+    sys.exit(1)
+
+def main():
+    global _verbose, _debug, _brokerLogs
+
+    # Load the command line options
+    parser = OptionParser()
+
+    parser.add_option("-v", "--verbose", dest="verbose",
+                      action="store_true", default=False, help="enable verbose output")
+
+    parser.add_option("-d", "--debug", dest="debug",
+                      action="store_true", default=False, help="enable debug output")
+
+    parser.add_option("-b", "--broker-log-dir", dest="brokerLogs",
+                      action="store", default="", help="Broker Logs")
+
+    parser.add_option("-t", "--test-dir", dest="testDir",
+                      action="store", default="", help="Test Results")
+
+    (options, args) = parser.parse_args()
+
+    _verbose = options.verbose
+    _debug = options.debug
+    testDir = options.testDir
+    _brokerLogs = options.brokerLogs
+
+    if testDir == "" or _brokerLogs == "":
+        log("Broker Log Dir and Test Dir are both required.")
+        showUsage()
+        sys.exit(1)
+
+    if not os.path.exists(testDir):
+        exitError("Test directory does not exist:" + testDir)
+
+    if not os.path.exists(_brokerLogs):
+        exitError("Broker log directory does not exist:" + _brokerLogs)
+
+    # Standardize the format of the broker logs
+    preProcessBrokerLogs(_brokerLogs)
+
+    # Get list of test results from test_dir
+    processTestResults(testDir)
+
+#
+# Process the log files we know of
+#
+def preProcessBrokerLogs(resultDir):
+    print "Pre Processing Broker Logs"
+
+    # Pre-Process GC - no pre-processing required
+
+    # Process Log4j - no processing required as the file is already time stamped.
+
+    # Pre-Process broker_cpu
+    processCPUUsage(resultDir)
+
+#
+# Process the broker CPU log file and create an output file of format:
+# <Date Time> <CPU Usage>
+#
+def processCPUUsage(resultDir):
+    logfile=resultDir+os.sep+BROKER_CPU
+    datedFile=resultDir+os.sep+BROKER_CPU_DATED
+
+    start = extractTime(ACCESS, logfile+".stat")
+
+    pid = getPID(BROKER_PID)
+
+    topRate = getFirstLine(_brokerLogs+os.sep+"top.rate")
+
+    #
+    # Calculate the addition required per process line output
+    #
+    if topRate.find(".") == -1:
+        seconds = topRate
+        millis = 0
+    else:
+        split = topRate.split('.')
+        seconds = split[0]
+        # Convert the fractional part to milliseconds
+        millis = float("0."+split[1]) * 1000
+
+    offset = timedelta(seconds=int(seconds),milliseconds=int(millis))
+
+    #
+    # Process the CPU log file and make a file of format:
+    # datetime <CPU% usage> <MEM% usage>
+    #
+    # Open the CPU log file for reading
+    logfile = open(logfile, "r")
+
+    # Open the output file, erasing any existing version.
+    # Keep track of the min/max, sum and entry count.
+    minCPU=float(sys.maxint)
+    maxCPU=0.0
+    minMem=float(sys.maxint)
+    maxMem=0.0
+
+    entries=0
+    sumCPU=0.0
+    sumMem=0.0
+
+    output= open(datedFile, "w")
+    for line in logfile:
+        #
+        # Data format
+        #  0    1    2  3   4     5     6    7   8   9  10    11
+        # PID  USER  PR NI %CPU TIME+ %MEM VIRT RES SHR S  COMMAND
+        # PID  USER  PR NI VIRT RES  SHR  S  %CPU %MEM TIME+ COMMAND
+        #
+        # The positions of %CPU and %MEM vary, probably based on the os/version
+        # of top, so auto-detect where they are.
+        #
+        # Index is 0-based for array usage.
+        index = 0
+        if line.find("PID") != -1:
+            for key in line.split(" "):
+                strippedKey = key.lstrip()
+                if len(strippedKey) > 0:
+                    # Record the key index
+                    if (strippedKey == "%CPU"):
+                        cpuIndex=index
+                    if (strippedKey == "%MEM"):
+                        memIndex=index
+                    # Increase count for next key
+                    index = index + 1
+
+        # Find lines that contain our broker process
+        if line.find("QPBRKR") != -1:
+            # Split line on whitespace
+            data = line.split()
+
+            # Write out the date time (ISO-8601 format)
+            output.write(str(start))
+            # Output the %CPU value
+            output.write(" "+str(data[cpuIndex]))
+            # Output the %MEM value
+            output.write(" "+str(data[memIndex]))
+            output.write('\n')
+
+            # Add the offset based on the logging rate
+            start = start + offset
+
+            # Record entries
+            entries = entries + 1
+
+            # Record Metrics
+            # Record CPU data
+            cpu = float(data[cpuIndex])
+            if (cpu < minCPU):
+                minCPU = cpu
+
+            if (cpu > maxCPU):
+                maxCPU = cpu
+
+            sumCPU = sumCPU + cpu
+
+            # Record Mem data
+            mem = float(data[memIndex])
+
+            if (mem < minMem):
+                minMem = mem
+
+            if (mem > maxMem):
+                maxMem = mem
+
+            sumMem = sumMem + mem
+    #end for
+
+    # Close the files
+    logfile.close()
+    output.close()
+
+    # Output stats file
+    statFile=resultDir+os.sep+BROKER_CPU+".stats"
+    output= open(statFile, "w")
+    output.write("#type:min/max/avg")
+    output.write('\n')
+    output.write("CPU:"+str(minCPU)+"/"+str(maxCPU)+"/"+str(float(sumCPU)/float(entries)))
+    output.write('\n')
+    output.write("MEM:"+str(minMem)+"/"+str(maxMem)+"/"+str(float(sumMem)/float(entries)))
+    output.write('\n')
+    output.close()
+
+    log("Pre Process of CPU Log file '"+BROKER_CPU+"' complete")
+
+#
+# Given a known process type, get the recorded PID.
+#
+def getPID(process):
+    return getFirstLine(_brokerLogs+os.sep+process)
+
+#
+# Get the first line of the file without EOL chars.
+# NOTE: this will load the entire file into memory to do it.
+#
+def getFirstLine(fileName):
+    f = open(fileName,"r")
+    line = f.read().splitlines()[0]
+    f.close()
+    return line
+
+#
+# Walk the directory given and process all csv test results
+#
+def processTestResults(resultDir):
+    for root, dirs, files in os.walk(resultDir, topdown=False):
+        if len(files) == 0:
+            exitError("Test result directory is empty:" + resultDir)
+        for file in files:
+            if file.endswith(".csv"):
+                processTestResult(root, file)
+
+def processTestResult(root, resultFile):
+    # Open stat file and extract test times; we determine:
+    #  - start time based on the 'Access' value
+    #  - end time based on the 'Modify' value ('Change' would also work)
+    statFile=root+os.sep+resultFile+".stat"
+
+    if not os.path.exists(statFile):
+        log("Unable to process : Unable to open stat file:" + statFile)
+        return
+
+    createResultSetPackage(root, resultFile)
+
+def extractTime(field, statFile):
+    stats = open(statFile, "r")
+    for line in stats:
+        if line.startswith(field):
+            if line.find("(") == -1:
+                dt = lineToDate(" ".join(line.split()[1:]))
+
+                #
+                # TODO We need to handle time zone issues as I'm sure we will
+                # have issues with the log4j matching.
+
+                stats.close()
+                return dt
+
+#
+# Given a text line in ISO format convert it to a date object
+#
+def lineToDate(line):
+    #2009-06-22 17:04:44,320
+    #2009-06-22 17:04:44.320
+    pattern = re.compile(r'(?P<year>^[0-9][0-9][0-9][0-9])-(?P<month>[0-9][0-9])-(?P<day>[0-9][0-9]) (?P<hour>[0-9][0-9]):(?P<minute>[0-9][0-9]):(?P<seconds>[0-9][0-9])')
+
+    m = pattern.match(line)
+    if m:
+        year = int(m.group('year'))
+        month = int(m.group('month'))
+        day = int(m.group('day'))
+        hour = int(m.group('hour'))
+        minute = int(m.group('minute'))
+        seconds = int(m.group('seconds'))
+
+        pattern = re.compile(r'(?P<year>^[0-9][0-9][0-9][0-9])-(?P<month>[0-9][0-9])-(?P<day>[0-9][0-9]) (?P<hour>[0-9][0-9]):(?P<minute>[0-9][0-9]):(?P<seconds>[0-9][0-9])[.|,](?P<micro>[0-9]+)')
+        m = pattern.match(line)
+        micro = None
+        if m:
+            micro = m.group('micro')
+
+        if micro == None:
+            micro = 0
+
+        # Correct issue where micros are actually nanos
+        if int(micro) > 999999:
+            micro = int(micro) / 1000
+
+        return datetime(year,month,day,hour,minute,seconds,int(micro))
+    else:
+        # Error: we shouldn't get here
+        return None
+
+def createResultSetPackage(root, resultFile):
+    # Get the name of the test to make a directory with said name
+    testName = resultFile.split(".csv")[0]
+    resultDir = root + os.sep + testName
+
+    log("Processing Result set for:" + testName)
+
+    mkdir(resultDir)
+
+    # Move result file to new directory
+    shutil.move(root + os.sep + resultFile, resultDir)
+
+    # Move stat file to new directory
+    shutil.move(root + os.sep + resultFile + ".stat", resultDir)
+
+    statFile=resultDir + os.sep + resultFile + ".stat"
+
+    #
+    # Get start and end time for test run
+    #
+    start = extractTime(ACCESS, statFile)
+    end = extractTime(MODIFY, statFile)
+
+    sliceBrokerLogs(resultDir, start, end)
+    createGraphData(resultDir, testName)
+    createTestStatData(resultDir, testName)
+
+    log("Created Result Package for:" + testName)
+
+def sliceBrokerLogs(resultDir, start, end):
+    sliceCPULog(resultDir, start, end)
+    sliceLog4j(resultDir, start, end)
+    sliceGCLog(resultDir, start, end)
+
+def sliceCPULog(resultDir, start, end):
+    global _brokerLogs
+    logfilePath=_brokerLogs+os.sep+BROKER_CPU_DATED
+    cpuSliceFile=resultDir+os.sep+BROKER_CPU
+
+    # Process the dated CPU log file, which has the format:
+    # datetime <CPU% usage> <MEM% usage>
+    #
+    # Open the dated CPU log file for reading
+    logFile = open(logfilePath, "r")
+
+    # Keep track of the min/max, sum and entry count.
+    minCPU=float(sys.maxint)
+    maxCPU=0.0
+    minMem=float(sys.maxint)
+    maxMem=0.0
+
+    entries=0
+    sumCPU=0.0
+    sumMem=0.0
+
+    #
+    # Create output file
+    #
+    cpuslice = open(cpuSliceFile,"w")
+    for line in logFile:
+        data = line.split()
+
+        #
+        # //fixme remove tz addition.
+        #
+        lineTime = lineToDate(" ".join(data[0:2])+" +0000")
+
+        if lineTime > start:
+            if lineTime < end:
+                # Write the data through to the new file
+                cpuslice.writelines(line)
+
+                # Perform stat processing for the min/max/avg
+                data = line.split()
+
+                #
+                # Data format is
+                # <Date> <Time> <%CPU> <%MEM>
+                # 2010-02-19 10:16:17 157 28.1
+                #
+                cpuIndex = 2
+                memIndex = 3
+
+                # Record entries
+                entries = entries + 1
+
+                # Record Metrics
+                # Record CPU data
+                cpu = float(data[cpuIndex])
+                if (cpu < minCPU):
+                    minCPU = cpu
+
+                if (cpu > maxCPU):
+                    maxCPU = cpu
+
+                sumCPU = sumCPU + cpu
+
+                # Record Mem data
+                mem = float(data[memIndex])
+
+                if (mem < minMem):
+                    minMem = mem
+
+                if (mem > maxMem):
+                    maxMem = mem
+
+                sumMem = sumMem + mem
+
+    logFile.close()
+    cpuslice.close()
+    log("Sliced CPU log")
+
+    # Output stats file
+    statFile=cpuSliceFile+".stats"
+    output= open(statFile, "w")
+    output.write("#type:min/max/avg")
+    output.write('\n')
+    output.write("CPU:"+str(minCPU)+"/"+str(maxCPU)+"/"+str(float(sumCPU)/float(entries)))
+    output.write('\n')
+    output.write("MEM:"+str(minMem)+"/"+str(maxMem)+"/"+str(float(sumMem)/float(entries)))
+    output.write('\n')
+    output.close()
+
+    log("Generated stat data from CPU Log file")
+
+def sliceGCLog(resultDir, start, end):
+    global _brokerLogs
+    logfilePath=_brokerLogs+os.sep+BROKER_GC
+    sliceFile=resultDir+os.sep+BROKER_GC
+
+    gcstart = extractTime(ACCESS, logfilePath+".stat")
+
+    # Keep track of the min/max, sum and entry count.
+    minGCDuration=float(sys.maxint)
+    maxGCDuration=0.0
+    sumGCDuration=0.0
+    entriesGCDuration = 0
+
+    # Open the GC log file for reading
+    logFile = open(logfilePath, "r")
+
+    # Open the output file, erasing any existing version
+    output= open(sliceFile, "w")
+
+    # Use a regular expression to pull out the Seconds.Millis values from the
+    # start of the gc log line.
+    pattern = re.compile(r'(?P<seconds>^[0-9]+)\.(?P<millis>[0-9]+):')
+
+    for line in logFile:
+        m = pattern.match(line)
+
+        if m:
+            seconds = m.group('seconds')
+            millis = m.group('millis')
+
+            offset = timedelta(seconds=int(seconds),milliseconds=int(millis))
+
+            lineTime = gcstart + offset
+
+            if lineTime > start:
+                if lineTime < end:
+                    output.writelines(line)
+
+                    # Perform stat processing for the min/max/avg
+
+                    # Process GC Duration lines for ParNew gc;
+                    # ensure CMS is not printed, as that means the line has been corrupted
+                    if line.find("ParNew") != -1 and line.find("CMS") == -1:
+                        #
+                        # Example data line
+                        # 7.646: [GC 7.646: [ParNew: 14778K->461K(14784K), 0.0026610 secs] 49879K->36609K(73288K), 0.0027560 secs] [Times: user=0.01 sys=0.00, real=0.01 secs]
+                        #
+                        # So entry 5 is the ParNew time and 8 is the whole GC cycle; 15 entries total.
+                        data = line.split()
+
+                        gcTime = 0
+                        # Check we have a valid ParNew line
+                        if (len(data) == 15):
+                            # Record entries
+                            # Record GC Duration data
+                            entriesGCDuration = entriesGCDuration + 1
+                            gcTime = float(data[8])
+
+                            if (gcTime < minGCDuration):
+                                minGCDuration = gcTime
+
+                            if (gcTime > maxGCDuration):
+                                maxGCDuration = gcTime
+
+                            sumGCDuration = sumGCDuration + gcTime
+
+    # Close the files
+    logFile.close()
+    output.close()
+
+    log("Sliced gc log")
+
+    # Output stats file
+    statFile=sliceFile+".stats"
+    output= open(statFile, "w")
+    output.write("#type:min/max/avg")
+    output.write('\n')
+    #
+    # Only provide GCDuration if it was processed
+    #
+    if entriesGCDuration > 0:
+        output.write("GC_DUR:%.14f/%.14f/%.14f" % (minGCDuration, maxGCDuration, (sumGCDuration/float(entriesGCDuration))))
+        output.write('\n')
+    output.close()
+
+    log("Generated stat data from GC Log file")
+
+def sliceLog4j(resultDir, start, end):
+    global _brokerLogs
+    logfilePath=_brokerLogs+os.sep+BROKER_LOG
+    log4jSliceFile=resultDir+os.sep+BROKER_LOG
+
+    log4jstart = extractTime(ACCESS, logfilePath+".stat")
+
+    #
+    # Say that the first line is the start of the file.
+    # This value will give a time value to the initial
+    # logging before Log4j kicks in.
+    #
+    lineTime = log4jstart
+
+    # Process the broker log4j file
+    # Open the broker log file for reading
+    logFile = open(logfilePath, "r")
+
+    #
+    # Create output file
+    #
+    log4jslice = open(log4jSliceFile,"w")
+
+    for line in logFile:
+        data = line.split()
+
+        #
+        # If the line has a time at the start then process it,
+        # otherwise use the previous time. This means that if there is
+        # a stack trace in the middle of the log file then it will
+        # be copied over to the split file, as long as it is within the
+        # split time.
+        #
+        if (hasTime(data)):
+            #
+            # //fixme remove tz addition.
+            #
+            lineTime = lineToDate(" ".join(data[0:2])+" +0000")
+
+        if lineTime > start:
+            if lineTime < end:
+                log4jslice.writelines(line)
+
+    logFile.close()
+    log4jslice.close()
+    log("Sliced broker log")
+
+#
+# Check that the first two entries of data can make a datetime object
+#
+def hasTime(data):
+    if len(data) < 2:
+        return False
+
+    date = data[0]
+    time = data[1]
+
+    # Examples:
+    # 2009-06-22 17:04:44,246
+    # 2009-06-22 17:04:44.2464
+    # 2009-06-22 17:04:44
+
+    # ISO-8601 '-' format date
+    dateRE = re.compile('[0-9][0-9][0-9][0-9]-[0-9][0-9]-[0-9][0-9]')
+
+    #
+    # Check for times with/without millis
+    # e.g.
+    # 10:00:00,000 - log4j
+    # 10:00:00.0000 - generated in script for cpu time
+    #
+    timeRE = re.compile('[0-9][0-9]:[0-9][0-9]:[0-9][0-9]?[0-9]*')
+
+    return dateRE.match(date) and timeRE.match(time)
+
+def createGraphData(resultDir, testName):
+    # Create graph.data file for process.sh
+    # Format: two lines, title and filename:
+    #   $version $type : $volume% volume
+    #   $version-$brokerState-$type-$volume
+    version = getBrokerVersion()
+
+    test = extractTestValue("n", resultDir, testName)
+    volume = int(float(extractTestResult("Test * Size Throughput", resultDir, testName)) * 1000)
+    messageSize = extractTestValue("messageSize", resultDir, testName)
+    ackMode = ackModeToString(extractTestValue("consAckMode", resultDir, testName))
+
+    graphDataFile=resultDir+os.sep+GRAPH_DATA
+
+    graphData = open(graphDataFile, "w")
+
+    #
+    # Write Title
+    graphData.write(version+":"+test+":"+str(messageSize)+"kb x "+str(volume)+" msg/sec using "+ackMode)
+    graphData.write('\n')
+
+    #
+    # Write FileName
+    graphData.writelines(version+"-"+testName)
+    graphData.write('\n')
+    graphData.close()
+    log("Created graph.data")
+
+def getBrokerVersion():
+    global _brokerLogs
+    READY = "Qpid Broker Ready"
+    brokerLogFile = _brokerLogs + os.sep + BROKER_LOG
+
+    brokerLog = open(brokerLogFile, "r")
+
+    dataLine = ""
+    for line in brokerLog:
+        if line.find(READY) != -1:
+            dataLine = line
+            break
+
+    brokerLog.close()
+
+    # Log Entry
+    # 2009-06-19 17:04:02,493 INFO [main] server.Main (Main.java:456) - Qpid Broker Ready :2.3.0.1 build: 727403M
+    # Split on READY
+    data = dataLine.split(READY)
+
+    # So [1] should be
+    # :2.3.0.1 build: 727403M
+    readyEntries = data[1].split()
+
+    # So splitting on whitespace should give us ':version',
+    # and a quick split on ':' will give us the version
+    version = readyEntries[0].split(':')[1]
+
+    # Strip to ensure we have no whitespace
+    return version.strip()
+
+def extractTestValue(property, resultDir, testName):
+    return extractTestData(property, resultDir, testName, " =")
+
+def extractTestResult(property, resultDir, testName):
+    return extractTestData(property, resultDir, testName, ":")
+
+def extractTestData(property, resultDir, testName, type):
+    resultFile = resultDir + os.sep + testName + ".csv"
+
+    results = open(resultFile, "r")
+
+    dataLine = ""
+    for line in results:
+        if line.find("Total Tests:") == 0:
+            dataLine = line
+
+    results.close()
+
+    # Data is CSV
+    data = dataLine.split(',')
+
+    found = False
+    result = ""
+    searchProperty = property + type
+
+    for entry in data:
+        if found:
+            result = entry
+            break
+        if entry.strip() == searchProperty:
+            found = True
+
+    return result.strip()
+
+def createTestStatData(resultDir, testName):
+    csvFilePath = resultDir + os.sep + testName + ".csv"
+
+    # Keep track of the min/max, sum and entry count.
+    minLatency=float(sys.maxint)
+    maxLatency=0.0
+    minThroughput=float(sys.maxint)
+    maxThroughput=0.0
+
+    entries=0
+    sumLatency=0.0
+    sumThroughput=0.0
+
+    #
+    # Open csv File
+    #
+    csvFile = open(csvFilePath,"r")
+    for line in csvFile:
+
+        # The PingAsyncTestPerf test class outputs the latency and throughput data.
+        if line.find("PingAsyncTestPerf") != -1:
+            #
+            # Data format is
+            # <Test> <TestName> <Thread> <Status> <Time> <Latency> <Concurrency> <Thread> <TestSize>
+            # org.apache.qpid.ping.PingAsyncTestPerf, testAsyncPingOk, Dispatcher-Channel-1, Pass, 209.074, 219.706, 0, 1, 10
+            LatencyIndex = 5
+            ThroughputIndex = 4
+
+        # The PingLatencyTestPerf test class just outputs the latency data.
+        if line.find("PingLatencyTestPerf") != -1:
+            #
+            # Data format is
+            # <Test> <TestName> <Thread> <Status> <Latency> <Concurrency> <Thread> <TestSize>
+            # org.apache.qpid.ping.PingLatencyTestPerf, testPingLatency, Dispatcher-Channel-1, Pass, 397.05502, 0, 2, 1000
+            LatencyIndex = 4
+            ThroughputIndex = -1
+
+        # Only process the test lines that have 'org.apache.qpid.ping', i.e. skip header and footer.
+        if line.find("org.apache.qpid.ping") != -1:
+            # Perform stat processing for the min/max/avg
+            data = line.split(",")
+
+            # Record entries
+            entries = entries + 1
+
+            # Record Metrics
+            # Record Latency data
+            latency = float(data[LatencyIndex])
+            if (latency < minLatency):
+                minLatency = latency
+
+            if (latency > maxLatency):
+                maxLatency = latency
+
+            sumLatency = sumLatency + latency
+
+            if (ThroughputIndex != -1):
+                # Record Throughput data
+                throughput = float(data[ThroughputIndex])
+                if (throughput < minThroughput):
+                    minThroughput = throughput
+
+                if (throughput > maxThroughput):
+                    maxThroughput = throughput
+
+                sumThroughput = sumThroughput + throughput
+
+    csvFile.close()
+
+    # Output stats file
+    statFile = resultDir + os.sep + testName + ".stats"
+    output = open(statFile, "w")
+    output.write("#type:min/max/avg")
+    output.write('\n')
+    output.write("LATENCY:"+str(minLatency)+"/"+str(maxLatency)+"/"+str(float(sumLatency)/float(entries)))
+    output.write('\n')
+
+    if (ThroughputIndex != -1):
+        # Output msgs/sec based on the time for a batch of msgs
+        output.write("THROUGHPUT:"+str(float(1000)/maxThroughput)+"/"+str(float(1000)/minThroughput)+"/"+str(float(1000)/(float(sumThroughput)/float(entries))))
+        output.write('\n')
+
+    output.close()
+
+    log("Generated stat data from test "+testName+" CSV file")
+
+def ackModeToString(ackMode):
+    if ackMode == '0':
+        return "Transacted"
+    elif ackMode == '1':
+        return "AutoAck"
+    elif ackMode == '2':
+        return "ClientAck"
+    elif ackMode == '3':
+        return "DupsOK"
+    elif ackMode == '257':
+        return "NoAck"
+    elif ackMode == '258':
+        return "PreAck"
+    else:
+        return str(ackMode)
+
+def debug(msg):
+    global _debug
+    if _debug:
+        log(msg)
+
+def log(msg):
+    print msg
+
+def mkdir(dir):
+    if not os.path.exists(dir):
+        os.mkdir(dir)
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
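For reference, the per-test artifacts look roughly like the following (all values are hypothetical; the title line is assembled by createGraphData and the `#type:min/max/avg` layout by the stats writers above):

graph.data:
    0.9:1000:1kb x 4780 msg/sec using AutoAck
    0.9-MyTest

MyTest.stats:
    #type:min/max/avg
    LATENCY:12.4/219.7/57.3
    THROUGHPUT:4.55/9.12/6.90

Note that the THROUGHPUT line inverts the recorded per-batch times (1000/t), which is why the minimum and maximum swap position relative to the raw CSV values.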