#!/usr/bin/env python

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

# Functions for comparing broker log files, used by cluster_tests.py.

import glob
import os
import os.path
import re

try:
    from itertools import izip
except ImportError:
    # Python 3 has no itertools.izip; the builtin zip is already lazy.
    izip = zip


# Lines matching any of these patterns are dropped entirely: they are
# expected to differ between brokers in a cluster.
_SKIP = "|".join([
    'local connection',         # Only on local broker
    'UPDATER|UPDATEE',          # Ignore update process
    'stall for update|unstall, ignore update|cancelled offer .* unstall',
    'caught up',
    'active for links|Passivating links|Activating links',
    'info Connecting: .*',                          # UpdateClient connection
    'info Connection.* connected to',               # UpdateClient connection
    'warning Connection \\[[-0-9.: ]+\\] closed',   # UpdateClient connection
    'warning Broker closed connection: 200, OK',
    'task late',
    'task overran',
    'warning CLOSING .* unsent data',
    'Inter-broker link ',
    'Running in a cluster, marking store',
    'debug Sending keepalive signal to watchdog',   # Watchdog timer thread
    'last broker standing joined by 1 replicas, updating queue policies.',
    'Connection .* timed out: closing'              # heartbeat connection close
])

# Regex to match a UUID
_UUID = r'\w\w\w\w\w\w\w\w-\w\w\w\w-\w\w\w\w-\w\w\w\w-\w\w\w\w\w\w\w\w\w\w\w\w'

# Substitutions to remove expected differences; applied in this order.
_SUBS = [
    (r'\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d ', ''),    # Remove timestamp
    (r'cluster\([0-9.: ]*', 'cluster('),            # Remove cluster node id
    (r' local\)| shadow\)', ')'),       # Remove local/shadow indication
    (r'CATCHUP', 'READY'),      # Treat catchup as equivalent to ready.
    (r'OFFER', 'READY'),        # Treat offer as equivalent to ready.
    # System UUID expected to be different
    (r'(org.apache.qpid.broker:system[:(])%s(\)?)' % (_UUID), r'\1UUID\2'),
    # TODO aconway 2010-12-20: review if these should be expected:
    (r' len=\d+', ' len=NN'),   # buffer lengths
    (r' map={.*_object_name:([^,}]*)[,}].*', r' \1'),  # V2 map - just keep name
    (r'\d+-\d+-\d+--\d+', 'X-X-X--X'),  # V1 Object IDs
]

# Substitutions to mask known issue: durable test shows inconsistent
# "changed stats for com.redhat.rhm.store:journal" messages.
_SKIP += '|Changed V[12] statistics com.redhat.rhm.store:journal'
_SUBS += [(r'to=console.obj.1.0.com.redhat.rhm.store.journal props=\d+ stats=\d+',
           'to=console.obj.1.0.com.redhat.rhm.store.journal props=NN stats=NN')]

# Compiled once at import time so filter_log does not rebuild them per file.
_SKIP_RE = re.compile(_SKIP)
_SUBS_RE = [(re.compile(pattern), subst) for pattern, subst in _SUBS]


def split_log(log):
    """Split a broker log at checkpoints where a member joins.

    Every line from a "Member joined" checkpoint line up to (but not
    including) the next checkpoint line is written to a file named
    "<log>.<frameSeq>". Lines before the first checkpoint are not written
    anywhere.

    Returns the list of checkpoint ids (frameSeq values, as strings) in the
    order they were found in the log.
    """
    checkpoint_re = re.compile("Member joined, frameSeq=([0-9]+), queue snapshot:")
    outfile = None
    checkpoints = []
    try:
        with open(log) as infile:
            for line in infile:
                match = checkpoint_re.search(line)
                if match:
                    checkpoint = match.group(1)
                    checkpoints.append(checkpoint)
                    # A new checkpoint starts a new fragment file.
                    if outfile:
                        outfile.close()
                    outfile = open("%s.%s" % (log, checkpoint), 'w')
                if outfile:
                    outfile.write(line)
    finally:
        # Close the last fragment even if reading or writing failed.
        if outfile:
            outfile.close()
    return checkpoints


def filter_log(log):
    """Filter the contents of a log file to remove data that is expected
    to differ between brokers in a cluster.

    The result is written to "<log>.filter": lines matching the skip
    patterns are dropped, and the substitutions are applied in order to
    every remaining line.

    Filtered log contents between the same checkpoints should match
    across the cluster.
    """
    with open(log) as infile:
        with open("%s.filter" % (log), 'w') as out:
            for line in infile:
                if _SKIP_RE.search(line):
                    continue
                for pattern, subst in _SUBS_RE:
                    line = pattern.sub(subst, line)
                out.write(line)


def _fragments_differ(a, b):
    """Return True if files a and b differ on any line they both have.

    Lines are compared pairwise and the comparison stops at the end of the
    shorter file, so a file that is a strict prefix of the other is NOT
    reported as different.
    NOTE(review): presumably deliberate, since brokers may stop logging at
    different points - confirm before tightening.
    """
    with open(a) as file_a:
        with open(b) as file_b:
            for line_a, line_b in izip(file_a, file_b):
                if line_a != line_b:
                    return True
    return False


def verify_logs():
    """Compare log files from cluster brokers, verify that they correspond
    correctly.

    Works on the current directory: filters every "*.log" file, splits
    every resulting "*.filter" file at its checkpoints, then for each
    checkpoint compares the fragments from the different logs.

    Raises Exception listing each differing pair of fragments if any differ.
    """
    for log in glob.glob("*.log"):
        filter_log(log)
    checkpoints = set()
    for filtered in glob.glob("*.filter"):
        checkpoints.update(split_log(filtered))
    errors = []
    # Checkpoint ids are digit strings; sort numerically so that the error
    # report is in a stable order.
    for c in sorted(checkpoints, key=int):
        # Largest fragment first; the name sort makes ties deterministic
        # because list.sort is stable.
        fragments = sorted(glob.glob("*.filter.%s" % (c)))
        fragments.sort(reverse=True, key=os.path.getsize)
        # Compare each fragment with the next smaller one.
        for a, b in izip(fragments, fragments[1:]):
            if _fragments_differ(a, b):
                errors.append("\n %s %s" % (a, b))
    if errors:
        raise Exception("Files differ in %s" % (os.getcwd()) + "".join(errors))


# Can be run as a script.
if __name__ == "__main__":
    verify_logs()