summaryrefslogtreecommitdiff
path: root/qpid/cpp/management/python/bin/qpid-ha
blob: 1c07658d347961f31a0b1d8a6e2dee82b0eef6dd (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
#!/usr/bin/env python

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

import optparse, sys, time, os, re, math
from qpid.messaging import Connection
from qpid.messaging import Message as QpidMessage
from qpid.util import URL
from qpidtoollibs.broker import BrokerAgent
from qpidtoollibs.config import parse_qpidd_conf
try:
    from uuid import uuid4
except ImportError:
    from qpid.datatypes import uuid4

# QMF address for the HA broker object.
HA_BROKER = "org.apache.qpid.ha:habroker:ha-broker"

# Connection defaults. These are defined here rather than via add_option() so
# that options the user did not set remain unset and can first be filled from
# qpidd.conf when a config file is in use; see Command.connect for exactly
# which values fall back to these.
DEFAULTS = {
    "broker": "0.0.0.0",
    "timeout": 10.0,
}

class ExitStatus(Exception):
    """Raised if a command wants a non-0 exit status from the script.

    @param status: the exit status that main() returns for the script.
    """
    def __init__(self, status):
        Exception.__init__(self, status)
        self.status = status

def find_qpidd_conf():
    """Return the path to the local qpidd.conf file or None if it is not found.

    Only an installed layout is recognised: if this script lives in
    <prefix>/bin, the result is <prefix>/etc/qpid/qpidd.conf (when that file
    exists).
    """
    p = os.path
    # Use the absolute path so that a relative __file__ (e.g. "./qpid-ha" run
    # from inside the bin directory) still yields the real parent directory.
    prefix, bindir = p.split(p.dirname(p.abspath(__file__)))
    if bindir == "bin":            # Installed in a standard place.
        conf = p.join(prefix, "etc", "qpid", "qpidd.conf")
        if p.isfile(conf): return conf
    return None

class Command(object):
    """
    Common options and logic for all commands. Subclasses provide additional
    options and execution logic by overriding do_execute().

    Creating an instance registers it in Command.commands.
    """

    commands = []                       # Registered commands, in creation order.

    def __init__(self, name, help, arg_names=None, connect_agent=True):
        """
        @param name: the command name, as given on the command line.
        @param help: help text shown in the command's usage message.
        @param arg_names: names of the positional arguments that follow the
            command name; used in the usage message and to check the argument
            count in execute().
        @param connect_agent: true if we should establish a QMF agent connection.
        """
        Command.commands.append(self)
        self.name = name
        self.connect_agent = connect_agent
        # Copy so that instances never share a mutable default list.
        self.arg_names = list(arg_names) if arg_names else []
        usage="%s [options] %s\n\n%s"%(name, " ".join(self.arg_names), help)
        self.help = help
        self.op=optparse.OptionParser(usage)
        common = optparse.OptionGroup(self.op, "Broker connection options")
        def help_default(what): return " (Default %s)"%DEFAULTS[what]
        common.add_option("-b", "--broker", metavar="<address>", help="Address of qpidd broker with syntax: [username/password@] hostname | ip-address [:<port>]"+help_default("broker"))
        common.add_option("--timeout", type="float", metavar="<seconds>", help="Give up if the broker does not respond within the timeout. 0 means wait forever"+help_default("timeout"))
        common.add_option("--sasl-mechanism", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override")
        common.add_option("--sasl-service-name", action="store", type="string", help="SASL service name to use")
        common.add_option("--ssl-certificate", metavar="<cert>", help="Client SSL certificate (PEM Format)")
        common.add_option("--ssl-key", metavar="<key>", help="Client SSL private key (PEM Format)")
        common.add_option("--config", metavar="<path/to/qpidd.conf>", help="Read default connection configuration from the qpidd.conf broker configuration file. Defaults are overridden by command-line options.")
        self.op.add_option_group(common)

    def connect(self, opts):
        """
        Connect to opts.broker, filling unset options from a qpidd.conf file
        (opts.config, or the local one found by find_qpidd_conf() when no
        broker address was given) or otherwise from DEFAULTS.

        Note: updates opts in place (broker, config, sasl_mechanism, timeout).

        @return: (connection, qmf_broker, ha_broker). qmf_broker and ha_broker
            are falsy when self.connect_agent is falsy.
        """
        conn_options = {}
        if not opts.broker:
            opts.broker = DEFAULTS["broker"]
            # If we are connecting locally, use local qpidd.conf by default
            if not opts.config: opts.config = find_qpidd_conf()
        url = URL(opts.broker)
        # NOTE(review): "not opts.timeout" also treats an explicit --timeout 0
        # as unset, although the option help says 0 means wait forever. The
        # timeout is also passed to QMF method calls, so confirm how those
        # handle 0 before changing this.
        if opts.config:         # Use broker config file for defaults
            config = parse_qpidd_conf(opts.config)
            if not url.user: url.user = config.get("ha-username")
            if not url.password: url.password = config.get("ha-password")
            if not url.port: url.port = config.get("port")
            opts.broker = str(url)
            if not opts.sasl_mechanism: opts.sasl_mechanism = config.get("ha-mechanism")
            if  not opts.timeout:
                timeout = config.get("ha-heartbeat-interval") or config.get("link-heartbeat-interval")
                if timeout: opts.timeout = float(timeout)
        else:                   # Use DEFAULTS
            if not opts.timeout: opts.timeout = DEFAULTS["timeout"]
        if opts.sasl_mechanism: conn_options['sasl_mechanisms'] = opts.sasl_mechanism
        if opts.sasl_service_name:
            conn_options['sasl_service'] = opts.sasl_service_name
        if opts.ssl_certificate: conn_options['ssl_certfile'] = opts.ssl_certificate
        if opts.ssl_key:
            if not opts.ssl_certificate:
                self.op.error("missing '--ssl-certificate' (required by '--ssl-key')")
            conn_options['ssl_keyfile'] = opts.ssl_key
        conn_options['client_properties'] = {'qpid.ha-admin' : 1}
        if opts.timeout:
            conn_options['timeout'] = opts.timeout
            # Heartbeat at half the timeout (rounded up to whole seconds).
            conn_options['heartbeat'] = int(math.ceil(opts.timeout/2))
        connection = Connection.establish(opts.broker, **conn_options)
        qmf_broker = self.connect_agent and BrokerAgent(connection)
        ha_broker = self.connect_agent and qmf_broker.getHaBroker()
        return (connection, qmf_broker, ha_broker)

    def all_brokers(self, ha_broker, opts, func):
        """
        Apply func to every broker in the cluster, or only to ha_broker.

        If opts.all is set and ha_broker.brokersUrl lists any brokers, connect
        to each listed broker in turn and call func(hb, address). The
        user/password from opts.broker is reused when the listed address has
        none. If connecting or func raises, call func(hb, address, exception)
        instead; hb is None when the connection itself failed. Each connection
        is closed afterwards. Otherwise, call func(ha_broker) once.

        Note: modifies opts.broker while iterating.
        """
        # The brokersUrl setting is not in python URL format, simpler parsing here.
        brokers = [b for b in re.sub(r'(^amqps?:)|(tcp:)', "", ha_broker.brokersUrl).split(",") if b]
        if brokers and opts.all:
            if "@" in opts.broker: userpass = opts.broker.split("@")[0]
            else: userpass = None
            for b in brokers:
                if userpass and not "@" in b: opts.broker = userpass+"@"+b
                else: opts.broker = b
                connection, hb = None, None
                try:
                    connection, qmf_broker, hb = self.connect(opts)
                    func(hb, b)
                except Exception as e:
                    func(hb, b, e)
                finally:
                    if connection is not None:
                        # Best effort: the result for this broker has already
                        # been reported, so a failed close must not stop the
                        # loop over the remaining brokers.
                        try: connection.close()
                        except Exception: pass
        else:
            func(ha_broker)

    def execute(self, args):
        """
        Parse args (the command name plus its options and positional
        arguments), connect, and run do_execute(). The connection is always
        closed afterwards.

        @raise Exception: on a wrong number of arguments, or if a QMF agent
            is required but the HA module is not loaded on the broker.
        """
        opts, args = self.op.parse_args(args)
        if len(args) != len(self.arg_names)+1:   # +1 for the command name itself.
            self.op.print_help()
            raise Exception("Wrong number of arguments")
        self.connection, qmf_broker, ha_broker = self.connect(opts)
        try:
            if self.connect_agent and not ha_broker:
                raise Exception("HA module is not loaded on broker at %s" % opts.broker)
            self.do_execute(qmf_broker, ha_broker, opts, args)
        finally:
            self.connection.close()

    def do_execute(self, qmf_broker, ha_broker, opts, args):
        """Run the command; subclasses override this. args[0] is the command name."""
        raise Exception("Command '%s' is not yet implemented"%self.name)

class ManagerCommand(Command):
    """
    Base for commands that should only be used by a cluster manager tool that ensures
    cluster consistency.

    Instances are moved from Command.commands to ManagerCommand.manager_commands,
    so main_except only executes them when --cluster-manager is given.
    """

    manager_commands = []           # Cluster manager commands

    def __init__(self, name, help, arg_names=None, connect_agent=True):
        """
        @param name: the command name, as given on the command line.
        @param help: help text; prefixed with "[Cluster manager only] ".
        @param arg_names: names of the positional arguments after the command name.
        @param connect_agent: true if we should establish a QMF agent connection.
        """
        if arg_names is None: arg_names = []   # Avoid a shared mutable default.
        super(ManagerCommand, self).__init__(name, "[Cluster manager only] "+help, arg_names, connect_agent)
        # Command.__init__ registered us in Command.commands; undo that.
        Command.commands.remove(self)   # Not a user command
        ManagerCommand.manager_commands.append(self)


class PingCmd(Command):
    """'ping': succeed only if a connection and a session can be opened."""

    def __init__(self):
        super(PingCmd, self).__init__(
            "ping", "Check if the broker is alive and responding",
            connect_agent=False)

    def do_execute(self, qmf_broker, ha_broker, opts, args):
        # Opening a session proves the broker is actually responding.
        self.connection.session()

PingCmd()

class PromoteCmd(ManagerCommand):
    """Cluster-manager-only 'promote': make this backup broker the primary."""

    def __init__(self):
        ManagerCommand.__init__(
            self, "promote",
            "Promote a backup broker to primary. "
            "This command should *only* be used by a cluster manager (such as rgmanager) "
            "that ensures only one broker is primary at a time. "
            "Promoting more than one broker to primary at the same time will make the "
            "cluster inconsistent and will cause data loss and unexpected behavior.")

    def do_execute(self, qmf_broker, ha_broker, opts, args):
        # Invoke the argument-less "promote" QMF method on the HA broker object.
        qmf_broker._method("promote", {}, HA_BROKER, timeout=opts.timeout)

PromoteCmd()


class StatusCmd(Command):
    """'status': print (or test) the HA status of one broker, or all with --all."""

    def __init__(self):
        Command.__init__(self, "status", "Print HA status")
        add = self.op.add_option
        add("--expect", metavar="<status>",
            help="Don't print status. Return 0 if it matches <status>, 1 otherwise")
        add("--is-primary", action="store_true", default=False,
            help="Don't print status. Return 0 if the broker is primary, 1 otherwise")
        add("--all", action="store_true", default=False,
            help="Print status for all brokers in the cluster")

    def do_execute(self, qmf_broker, ha_broker, opts, args):
        # --is-primary and --expect only set the exit status; they print nothing.
        if opts.is_primary:
            if ha_broker.status not in ("active", "recovering"):
                raise ExitStatus(1)
            return
        if opts.expect:
            if ha_broker.status != opts.expect:
                raise ExitStatus(1)
            return

        def show(hb, addr=None, err=None):
            if err:
                print("%s %s" % (addr, err))
            elif addr:
                print("%s %s" % (addr, hb.status))
            else:
                print(hb.status)
        self.all_brokers(ha_broker, opts, show)

StatusCmd()

class ReplicateCmd(Command):
    """'replicate': replicate a queue from a remote broker to the current broker."""
    def __init__(self):
        # do_execute sends args[1] as "broker" and args[2] as "queue", so the
        # usage must list <remote-broker> before <queue>.
        Command.__init__(self, "replicate", "Set up replication from <queue> on <remote-broker> to <queue> on the current broker.", ["<remote-broker>", "<queue>"])
    def do_execute(self, qmf_broker, ha_broker, opts, args):
        # args[0] is the command name itself.
        qmf_broker._method("replicate", {"broker":args[1], "queue":args[2]}, HA_BROKER, timeout=opts.timeout)
ReplicateCmd()

class QueryCmd(Command):
    """'query': print HA configuration and status of one broker, or all with --all."""

    def __init__(self):
        Command.__init__(self, "query", "Print HA configuration and status")
        self.op.add_option("--all", action="store_true", default=False,
                           help="Print configuration and status for all brokers in the cluster")

    def do_execute(self, qmf_broker, ha_broker, opts, args):
        def query(hb, b=None, ex=None):
            # A broker that could not be queried: address, error, blank line.
            if ex:
                print("%s %s\n" % (b, ex))
                return
            if b:
                print("%-20s %s" % ("Address:", b))
            rows = (("Status:", hb.status),
                    ("Broker ID:", hb.systemId),
                    ("Brokers URL:", hb.brokersUrl),
                    ("Public URL:", hb.publicUrl),
                    ("Replicate: ", hb.replicateDefault))
            for label_and_value in rows:
                print("%-20s %s" % label_and_value)
            if b:
                print("")   # Blank line between brokers.
        self.all_brokers(ha_broker, opts, query)


QueryCmd()

def print_usage(prog):
    """Print the script usage: syntax, a one-line summary per command, a help hint."""
    print("usage: %s <command> [<arguments>]\n\nCommands are:\n" % prog)
    for command in Command.commands:
        summary = command.help.split(".")[0]    # First sentence only.
        print("  %-12s %s." % (command.name, summary))
    print("\nFor help with a command type: %s <command> --help\n" % prog)

def find_command(args, commands):
    """Return the first command whose name appears in args, or None.

    args are scanned in order; for the first arg that names any command, the
    first matching entry of commands is returned.
    """
    matches = (command for arg in args for command in commands
               if command.name == arg)
    return next(matches, None)

def main_except(argv):
    """
    Run the command named in argv[1:]; failures are raised as exceptions.

    "--cluster-manager" enables the cluster-manager-only commands. A leading
    help argument prints the usage summary, or every command's full help for
    the help-all variants.
    """
    args = argv[1:]
    if "--cluster-manager" in args:
        # Extends the shared Command.commands list in place, so print_usage()
        # also lists the manager commands for the rest of this process.
        Command.commands.extend(ManagerCommand.manager_commands)
        args.remove("--cluster-manager")
    commands = Command.commands
    help_words = ['help', '--help', '-help', '-h', 'help-all', '--help-all']
    if args and args[0] in help_words:
        if 'help-all' in args[0]:
            for cmd in commands:
                cmd.op.print_help()
                print("")
        else:
            print_usage(os.path.basename(argv[0]))
        return

    command = find_command(args, commands)
    if command:
        command.execute(args)
        return

    # Check for attempt to use a manager command without --cluster-manager
    command = find_command(args, ManagerCommand.manager_commands)
    if not command:
        print_usage(os.path.basename(argv[0]))
        raise Exception("No valid command")
    message = """'%s' should only be called by the cluster manager.
Incorrect use of '%s' will cause cluster malfunction.
To call from a cluster manager use '%s --cluster-manager'. """
    raise Exception(message % ((command.name,) * 3))

def main(argv):
    """Run the script and return its exit status (0 on success)."""
    try:
        main_except(argv)
    except ExitStatus as err:
        return err.status
    except Exception as err:
        # Report any other failure as "<ExceptionType>: <message>".
        print("%s: %s" % (type(err).__name__, err))
        return 1
    return 0

if __name__ == "__main__":
    sys.exit(main(sys.argv))