path: root/qpid/java/perftests/bin/processing/
diff options
Diffstat (limited to 'qpid/java/perftests/bin/processing/')
1 files changed, 0 insertions, 850 deletions
diff --git a/qpid/java/perftests/bin/processing/ b/qpid/java/perftests/bin/processing/
deleted file mode 100755
index 8db44eda79..0000000000
--- a/qpid/java/perftests/bin/processing/
+++ /dev/null
@@ -1,850 +0,0 @@
-#!/usr/bin/env python
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-import os
-import re
-import datetime
-import sys
-import string
-from optparse import OptionParser
-from datetime import datetime, timedelta
-import shutil
-def showUsage():
- log("./ [-b|--broker-log-dir] <dir> [-t|--test-dir] <dir>")
-_verbose = False
-_debug = False
-_brokerLogs = ""
-def exitError(message):
- log(message)
- sys.exit(1)
-def main():
- global _log, _verbose, _debug, _brokerLogs
- # Load the
- parser = OptionParser()
- parser.add_option("-v", "--verbose", dest="verbose",
- action="store_true", default=False, help="enable verbose output")
- parser.add_option("-d", "--debug", dest="debug",
- action="store_true", default=False, help="enable debug output")
- parser.add_option("-b", "--broker-log-dir", dest="brokerLogs",
- action="store", default=True, help="Broker Logs")
- parser.add_option("-t", "--test-dir", dest="testDir",
- action="store", default="", help="Test Results")
- (options, args) = parser.parse_args()
- _verbose = options.verbose
- _debug = options.debug
- testDir = options.testDir
- _brokerLogs = options.brokerLogs
- if testDir == "" or _brokerLogs == "" :
- log("Broker Log Dir and Test Dir are both requried.")
- showUsage()
- if not os.path.exists(testDir):
- exitError("Test directory does not exist:" + testDir)
- if not os.path.exists(_brokerLogs):
- exitError("Broker log directory does not exist:" + _brokerLogs)
- # Standardize the format of the broker logs
- preProcessBrokerLogs(_brokerLogs)
- # Get list of test results from test_dir
- processTestResults(testDir)
-# Process the log files we know of
-def preProcessBrokerLogs(resultDir):
- print "Pre Processing Broker Logs"
- # Pre-Process GC - no pre processing required
- # Process Log4j - no processing required as file is already time stamped.
- # Pre-Process broker_cpu
- processCPUUsage(resultDir)
-# Process the broker CPU log file and create an output file of format
-# <Date Time> <CPU Usage>
-def processCPUUsage(resultDir):
- logfile=resultDir+os.sep+BROKER_CPU
- datedFile=resultDir+os.sep+BROKER_CPU_DATED
- start = extractTime(ACCESS, logfile+".stat")
- pid = getPID(BROKER_PID)
- topRate = getFirstLine(_brokerLogs+os.sep+"top.rate")
- #
- # Calulate addition required per process line output
- #
- if topRate.find(".") == -1:
- seconds = topRate
- millis = 0
- else:
- split = topRate.split('.')
- seconds = split[0]
- # Convert
- millis = float("0."+split[1]) * 1000
- offset = timedelta(seconds=int(seconds),milliseconds=int(millis))
- #
- # Process the CPU log file and make a file of format:
- # datetime <CPU% usage> <MEM% usage>
- #
- # Open log CPU file for reading
- logfile = open(logfile, "r")
- # Open the output file, erasing any existing version
- # Keep track of the min/max sum and entries,.
- minCPU=float(sys.maxint)
- maxCPU=0.0
- minMem=float(sys.maxint)
- maxMem=0.0
- entries=0
- sumCPU=0.0
- sumMem=0.0
- output= open(datedFile, "w")
- for line in logfile:
- #
- # Data format
- # 0 1 2 3 4 5 6 7 8 9 10 11
- #
- # %CPU and %MEM are vary, probably based on os/version of top.
- # lets auto-detect where it is.
- #
- # Index is 0 based for array usage.
- index = 0
- if line.find("PID") != -1:
- for key in line.split(" "):
- strippedKey = key.lstrip()
- if len(strippedKey) > 0:
- # Record the key index
- if (strippedKey == "%CPU"):
- cpuIndex=index
- if (strippedKey == "%MEM"):
- memIndex=index
- # Increase count for next key
- index = index + 1
- # Find lines that contain our broker process
- if line.find("QPBRKR") != -1:
- # Split line on whitespace
- data = line.split()
- #Write out the date time (ISO-8601 format)
- output.write(str(start))
- # Output the %CPU value
- output.write(" "+str(data[cpuIndex]))
- # Output the %MEM value
- output.write(" "+str(data[memIndex]))
- output.write('\n')
- # Add the offset based on the logging rate
- start = start + offset
- # Record entires
- entries = entries + 1
- # Record Metrics
- # Record CPU data
- cpu = float(data[cpuIndex])
- if (cpu < minCPU):
- minCPU = cpu
- if (cpu > maxCPU):
- maxCPU = cpu
- sumCPU = sumCPU + cpu
- # Record Mem data
- mem = float(data[memIndex])
- if (mem < minMem):
- minMem = mem
- if (mem > maxMem):
- maxMem = mem
- sumMem = sumMem + mem
- #end for
- # Close the files
- logfile.close
- output.close
- # Output stats file
- statFile=resultDir+os.sep+BROKER_CPU+".stats"
- output= open(statFile, "w")
- output.write("#type:min/max/avg")
- output.write('\n')
- output.write("CPU:"+str(minCPU)+"/"+str(maxCPU)+"/"+str(float(sumCPU)/float(entries)))
- output.write('\n')
- output.write("MEM:"+str(minMem)+"/"+str(maxMem)+"/"+str(float(sumMem)/float(entries)))
- output.write('\n')
- output.close
- log("Pre Process of CPU Log file '"+BROKER_CPU+"' complete")
-# Give an known process type get the recorded PID.
-def getPID(process):
- return getFirstLine(_brokerLogs+os.sep+process)
-# Get the first line of the file without EOL chars.
-# NOTE: this will load the entire file into memory to do it.
-def getFirstLine(fileName):
- f = open(fileName,"r")
- line =[0]
- f.close
- return line
-# Walk the directory given and process all csv test results
-def processTestResults(resultDir):
- for root, dirs, files in os.walk(resultDir, topdown=False):
- if len(files) == 0:
- exitError("Test result directory is empty:" + resultDir)
- for file in files:
- if file.endswith(".csv"):
- processTestResult(root , file)
-def processTestResult(root, resultFile):
- # Open stat file and extract test times, we determine:
- # -start time based on the 'Access' value
- # -end time based on the 'Modify' value 'Change' would also work
- statFile=root+os.sep+resultFile+".stat"
- if not os.path.exists(statFile):
- log("Unable to process : Unable to open stat file:" + statFile)
- return
- createResultSetPackage(root, resultFile)
-def extractTime(field, statFile):
- stats = open(statFile, "r")
- for line in stats:
- if line.startswith(field):
- if line.find("(") == -1:
- dt = lineToDate(" ".join(line.split()[1:]))
- #
- # TODO We need to handle time time zone issues as I'm sure we will have issues with the
- # log4j matching.
- stats.close
- return dt
-# Given a text line in ISO format convert it to a date object
-def lineToDate(line):
- #2009-06-22 17:04:44,320
- #2009-06-22 17:04:44.320
- pattern = re.compile(r'(?P<year>^[0-9][0-9][0-9][0-9])-(?P<month>[0-9][0-9])-(?P<day>[0-9][0-9]) (?P<hour>[0-9][0-9]):(?P<minute>[0-9][0-9]):(?P<seconds>[0-9][0-9])')
- m = pattern.match(line)
- if m:
- year = int('year'))
- month = int('month'))
- day = int('day'))
- hour = int('hour'))
- minute = int('minute'))
- seconds = int('seconds'))
- pattern = re.compile(r'(?P<year>^[0-9][0-9][0-9][0-9])-(?P<month>[0-9][0-9])-(?P<day>[0-9][0-9]) (?P<hour>[0-9][0-9]):(?P<minute>[0-9][0-9]):(?P<seconds>[0-9][0-9])[.|,](?P<micro>[0-9]+)')
- m = pattern.match(line)
- micro = None
- if m:
- micro ='micro')
- if micro == None:
- micro = 0
- # Correct issue where micros are actually nanos
- if int(micro) > 999999:
- micro = int(micro) / 1000
- return datetime(year,month,day,hour,minute,seconds,int(micro))
- else:
- # Error we shouldn't get here
- return null
-def createResultSetPackage(root, resultFile):
- # Get the Name of the test to make a directory with said name
- testName = resultFile.split(".csv")[0]
- resultDir = root+ os.sep + testName
- log("Processing Result set for:"+ testName)
- mkdir(resultDir)
- # Move result file to new directory
- shutil.move(root + os.sep + resultFile, resultDir)
- # Move stat file to new directory
- shutil.move(root + os.sep + resultFile + ".stat", resultDir)
- statFile=resultDir + os.sep + resultFile + ".stat"
- #
- # Get start and end time for test run
- #
- start = extractTime(ACCESS, statFile)
- end = extractTime(MODIFY, statFile)
- sliceBrokerLogs(resultDir, start, end)
- createGraphData(resultDir, testName)
- createTestStatData(resultDir, testName)
- log("Created Result Package for:"+ testName)
-def sliceBrokerLogs(resultDir, start, end):
- sliceCPULog(resultDir, start, end)
- sliceLog4j(resultDir, start, end)
- sliceGCLog(resultDir, start, end)
-def sliceCPULog(resultDir, start, end):
- global _brokerLogs
- logfilePath=_brokerLogs+os.sep+BROKER_CPU_DATED
- cpuSliceFile=resultDir+os.sep+BROKER_CPU
- # Process the CPU log file and make a file of format:
- # datetime <CPU% usage> <MEM% usage>
- #
- # Open log CPU file for reading
- logFile = open(logfilePath, "r")
- # Open the output file, erasing any existing version
- # Keep track of the min/max sum and entries,.
- minCPU=float(sys.maxint)
- maxCPU=0.0
- minMem=float(sys.maxint)
- maxMem=0.0
- entries=0
- sumCPU=0.0
- sumMem=0.0
- #
- # Create outputfile
- #
- cpuslice = open(cpuSliceFile,"w")
- for line in logFile:
- data = line.split()
- #
- # //fixme remove tz addition.
- #
- lineTime = lineToDate(" ".join(data[0:2])+" +0000")
- if lineTime > start:
- if lineTime < end:
- # Write the data though to the new file
- cpuslice.writelines(line)
- # Perform stat processing for the min/max/avg
- data = line.split()
- #
- # Data format is
- # <Date> <Time> <%CPU> <%MEM>
- # 2010-02-19 10:16:17 157 28.1
- #
- cpuIndex = 2
- memIndex = 3
- # Record entires
- entries = entries + 1
- # Record Metrics
- # Record CPU data
- cpu = float(data[cpuIndex])
- if (cpu < minCPU):
- minCPU = cpu
- if (cpu > maxCPU):
- maxCPU = cpu
- sumCPU = sumCPU + cpu
- # Record Mem data
- mem = float(data[memIndex])
- if (mem < minMem):
- minMem = mem
- if (mem > maxMem):
- maxMem = mem
- sumMem = sumMem + mem
- logFile.close()
- cpuslice.close()
- log("Sliced CPU log")
- # Output stats file
- statFile=cpuSliceFile+".stats"
- output= open(statFile, "w")
- output.write("#type:min/max/avg")
- output.write('\n')
- output.write("CPU:"+str(minCPU)+"/"+str(maxCPU)+"/"+str(float(sumCPU)/float(entries)))
- output.write('\n')
- output.write("MEM:"+str(minMem)+"/"+str(maxMem)+"/"+str(float(sumMem)/float(entries)))
- output.write('\n')
- output.close
- log("Generated stat data from CPU Log file")
-def sliceGCLog(resultDir, start, end):
- global _brokerLogs
- logfilePath=_brokerLogs+os.sep+BROKER_GC
- sliceFile=resultDir+os.sep+BROKER_GC
- gcstart = extractTime(ACCESS, logfilePath+".stat")
- # Open the output file, erasing any existing version
- # Keep track of the min/max sum and entries,.
- minGCDuration=float(sys.maxint)
- maxGCDuration=0.0
- sumGCDuration=0.0
- entriesGCDuration = 0
- # Open log GC file for reading
- logFile = open(logfilePath, "r")
- # Open the output file, erasing any existing version
- output= open(sliceFile, "w")
- # Use a regular expression to pull out the Seconds.Millis values from the
- # Start of the gc log line.
- pattern = re.compile(r'(?P<seconds>^[0-9]+)\.(?P<millis>[0-9]+):')
- for line in logFile:
- m = pattern.match(line)
- if m:
- seconds ='seconds');
- millis ='millis');
- offset = timedelta(seconds=int(seconds),milliseconds=int(millis))
- lineTime = gcstart + offset
- if lineTime > start:
- if lineTime < end:
- output.writelines(line)
- # Perform stat processing for the min/max/avg
- # Process GC Duration lines in ParNew gc ,
- # ensure we do not have CMS printed as that means the line line has been corrupted
- if line.find("ParNew") != -1 & line.find("CMS") == -1:
- #
- # Example data line
- # 7.646: [GC 7.646: [ParNew: 14778K->461K(14784K), 0.0026610 secs] 49879K->36609K(73288K), 0.0027560 secs] [Times: user=0.01 sys=0.00, real=0.01 secs]
- #
- # So entry 5 is the ParNew time and 8 is the whole GC cycle. 14 entries total
- data = line.split()
- gcTime = 0
- # Check we have a valid ParNew Line
- if (len(data) == 15):
- # Record entires
- # Record GC Duration data
- entriesGCDuration = entriesGCDuration + 1
- gcTime = float(data[8])
- if (gcTime < minGCDuration):
- minGCDuration = gcTime
- if (gcTime > maxGCDuration):
- maxGCDuration = gcTime
- sumGCDuration = sumGCDuration + gcTime
- # Close the files
- logFile.close
- output.close()
- log("Sliced gc log")
- # Output stats file
- statFile=sliceFile+".stats"
- output= open(statFile, "w")
- output.write("#type:min/max/avg")
- output.write('\n')
- #
- # Only provide GCDuration if it was processed
- #
- output.write("GC_DUR:%.14f/%.14f/%.14f" % (minGCDuration, maxGCDuration , (sumGCDuration/float(entriesGCDuration))))
- output.write('\n')
- output.close
- log("Generated stat data from CPU Log file")
-def sliceLog4j(resultDir, start, end):
- global _brokerLogs
- logfilePath=_brokerLogs+os.sep+BROKER_LOG
- log4jSliceFile=resultDir+os.sep+BROKER_LOG
- log4jstart = extractTime(ACCESS, logfilePath+".stat")
- #
- # Say that first line is the start of the file,
- # This value will give a time value to the initial
- # logging before Log4j kicks in.
- #
- lineTime = log4jstart
- # Process the broker log4j file
- # Open log CPU file for reading
- logFile = open(logfilePath, "r")
- #
- # Create outputfile
- #
- log4jslice = open(log4jSliceFile,"w")
- for line in logFile:
- data = line.split()
- #
- # If the line has a time at the start then process it
- # otherwise use the previous time. This means if there is
- # a stack trace in the middle of the log file then it will
- # be copied over to the split file as long as it is in the
- # split time.
- #
- if (hasTime(data)):
- #
- # //fixme remove tz addition.
- #
- lineTime = lineToDate(" ".join(data[0:2])+" +0000")
- if lineTime > start:
- if lineTime < end:
- print line
- log4jslice.writelines(line)
- logFile.close()
- log4jslice.close()
- log("Sliced broker log")
-# Check the first two entries of data can make a datetime object
-def hasTime(data):
- date = data[0]
- time = data[1]
- # Examples:
- # 2009-06-22 17:04:44,246
- # 2009-06-22 17:04:44.2464
- # 2009-06-22 17:04:44
- # ISO-8601 '-' format date
- dateRE = re.compile('[0-9][0-9][0-9][0-9]-[0-9][0-9]-[0-9][0-9]')
- #
- # Check for times with/out millis
- # e.g.
- # 10:00:00,000 - log4j
- # 10:00:00.0000 - generated in script for cpu time
- #
- timeRE = re.compile('[0-9][0-9]:[0-9][0-9]:[0-9][0-9]?[0-9]*')
- return dateRE.match(date) and timeRE.match(time)
-def createGraphData(resultDir, testName):
- # Create file for
- # Format two lines : Title and filename
- # $version $type : $volume% volume
- # $version-$brokerState-$type-$volume
- version=getBrokerVersion()
- test= extractTestValue("n",resultDir, testName)
- volume = int(float(extractTestResult("Test * Size Throughput", resultDir, testName)) * 1000)
- messageSize = extractTestValue("messageSize",resultDir, testName)
- ackMode = ackModeToString(extractTestValue("consAckMode",resultDir, testName))
- graphDataFile=resultDir+os.sep+GRAPH_DATA
- graphData = open(graphDataFile, "w")
- #
- # Write Title
- graphData.write(version+":"+test+":"+str(messageSize)+"kb x "+str(volume)+" msg/sec using "+ackMode)
- graphData.write('\n')
- #
- # Write FileName
- graphData.writelines(version+"-"+testName)
- graphData.write('\n')
- graphData.close
- log("Created")
-def getBrokerVersion():
- global _brokerLogs
- READY = "Qpid Broker Ready"
- brokerLogFile = _brokerLogs + os.sep + BROKER_LOG
- log = open(brokerLogFile, "r")
- dataLine = ""
- for line in log:
- if line.find(READY) != -1:
- dataLine = line
- break
- # Log Entry
- #2009-06-19 17:04:02,493 INFO [main] server.Main ( - Qpid Broker Ready : build: 727403M
- # Split on READY
- data = dataLine.split(READY)
- # So [1] should be
- # : build: 727403M
- readyEntries = data[1].split()
- # so spliting on white space should give us ':version'
- # and a quick split on ':' will give us the version
- version = readyEntries[0].split(':')[1]
- # Strip to ensure we have no whitespace
- return version.strip()
-def extractTestValue(property,resultDir,testName):
- return extractTestData(property,resultDir,testName," =")
-def extractTestResult(property,resultDir,testName):
- return extractTestData(property,resultDir,testName,":")
-def extractTestData(property,resultDir,testName,type):
- resultFile = resultDir + os.sep + testName+".csv"
- results = open(resultFile, "r")
- dataLine = ""
- for line in results:
- if line.find("Total Tests:") == 0:
- dataLine = line
- results.close()
- # Data is CSV
- data = dataLine.split(',')
- found = False
- result = ""
- searchProperty = property+type
- for entry in data:
- if found:
- result = entry
- break
- if entry.strip() == searchProperty:
- found=True
- return result.strip()
-def createTestStatData(resultDir, testName):
- csvFilePath=resultDir + os.sep + testName + ".csv"
- # Open the output file, erasing any existing version
- # Keep track of the min/max sum and entries,.
- minLatency=float(sys.maxint)
- maxLatency=0.0
- minThroughput=float(sys.maxint)
- maxThroughput=0.0
- entries=0
- sumLatency=0.0
- sumThroughput=0.0
- #
- # Open csv File
- #
- csvFile = open(csvFilePath,"r")
- for line in csvFile:
- # The PingAsyncTestPerf test class outputs the latency and throughput data.
- if line.find("PingAsyncTestPerf") != -1:
- #
- # Data format is
- # <Test> <TestName> <Thread> <Status> <Time> <Latency> <Concurrency> <Thread> <TestSize>
-, testAsyncPingOk, Dispatcher-Channel-1, Pass, 209.074, 219.706, 0, 1, 10
- LatencyIndex = 5
- ThroughputIndex = 4
- # The PingLatencyTestPerf test class just outputs the latency data.
- if line.find("PingLatencyTestPerf") != -1:
- #
- # Data format is
- # <Test> <TestName> <Thread> <Status> <Time> <Latency> <Concurrency> <Thread> <TestSize>
- #, testPingLatency, Dispatcher-Channel-1, Pass, 397.05502, 0, 2, 1000
- LatencyIndex = 4
- ThroughputIndex = -1
- # Only process the test lines that have '', i.e. skip header and footer.
- if line.find("") != -1:
- # Perform stat processing for the min/max/avg
- data = line.split(",")
- # Record entires
- entries = entries + 1
- # Record Metrics
- # Record Latency data
- latency = float(data[LatencyIndex])
- if (latency < minLatency):
- minLatency = latency
- if (latency > maxLatency):
- maxLatency = latency
- sumLatency = sumLatency + latency
- if (ThroughputIndex != -1):
- # Record Latency data
- throughput = float(data[ThroughputIndex])
- if (throughput < minThroughput):
- minThroughput = throughput
- if (throughput > maxThroughput):
- maxThroughput = throughput
- sumThroughput = sumThroughput + throughput
- csvFile.close()
- # Output stats file
- statFile=resultDir + os.sep + testName+".stats"
- output= open(statFile, "w")
- output.write("#type:min/max/avg")
- output.write('\n')
- output.write("LATENCY:"+str(minLatency)+"/"+str(maxLatency)+"/"+str(float(sumLatency)/float(entries)))
- output.write('\n')
- if (ThroughputIndex != -1):
- # Output msgs/sec based on time for a batch of msgs
- output.write("THROUGHPUT:"+str(float(1000)/maxThroughput)+"/"+str(float(1000)/minThroughput)+"/"+str(float(1000)/(float(sumThroughput)/float(entries))))
- output.write('\n')
- output.close
- log("Generated stat data from test "+testName+" CSV file")
-def ackModeToString(ackMode):
- if ackMode == '0':
- return "Transacted"
- elif ackMode == '1':
- return "AutoAck"
- elif ackMode == '2':
- return "ClientAck"
- elif ackMode == '3':
- return "DupsOK"
- elif ackMode == '257':
- return "NoAck"
- elif ackMode == '258':
- return "PreAck"
- else:
- return str(ackMode)
-def debug(msg):
- global _debug
- if _debug:
- log(msg)
-def log(msg):
- print msg
-def mkdir(dir):
- if not os.path.exists(dir):
- os.mkdir(dir)
-if __name__ == "__main__":
- main() \ No newline at end of file