1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
|
#!/usr/bin/env python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
"""%prog [options] broker...
Check for brokers that lag behind other brokers in a cluster."""
import os, os.path, sys, socket, time, re
from qpid.messaging import *
from optparse import OptionParser
from threading import Thread
class Browser(Thread):
def __init__(self, broker, queue, timeout):
Thread.__init__(self)
self.broker = broker
self.queue = queue
self.timeout = timeout
self.error = None
self.time = None
def run(self):
try:
self.connection = Connection(self.broker)
self.connection.open()
self.session = self.connection.session()
self.receiver = self.session.receiver("%s;{mode:browse}"%self.queue)
self.msg = self.receiver.fetch(timeout=self.timeout)
self.time = time.time()
if (self.msg.content != self.queue):
raise Exception("Wrong message content, expected '%s' found '%s'"%
(self.queue, self.msg.content))
except Empty:
self.error = "No message on queue %s"%self.queue
except Exception, e:
self.error = "Error: %s"%e
def main(argv):
op = OptionParser(usage=__doc__)
op.add_option("--timeout", type="float", default=None, metavar="TIMEOUT",
help="Give up after TIMEOUT milliseconds, default never timeout")
(opts, args) = op.parse_args(argv)
if (len(args) <= 1): op.error("No brokers were specified")
brokers = args[1:]
# Put a message on a uniquely named queue.
queue = "%s:%s:%s"%(os.path.basename(args[0]), socket.gethostname(), os.getpid())
connection = Connection(brokers[0])
connection.open()
session = connection.session()
sender = session.sender(
"%s;{create:always,delete:always,node:{durable:False}}"%queue)
sender.send(Message(content=queue))
start = time.time()
# Browse for the message on each broker
if opts.timeout: opts.timeout
threads = [Browser(b, queue, opts.timeout) for b in brokers]
for t in threads: t.start()
delays=[]
for t in threads:
t.join()
if t.error:
delay=t.error
else:
delay = t.time-start
delays.append([delay, t.broker])
print "%s: %s"%(t.broker,delay)
if delays:
delays.sort()
print "lag: %s (%s-%s)"%(delays[-1][0] - delays[0][0], delays[-1][1], delays[0][1])
# Clean up
sender.close()
session.close()
connection.close()
if __name__ == "__main__": sys.exit(main(sys.argv))
|