diff options
Diffstat (limited to 'qpid/python/examples/reservations/reserve')
-rwxr-xr-x | qpid/python/examples/reservations/reserve | 197 |
1 files changed, 197 insertions, 0 deletions
diff --git a/qpid/python/examples/reservations/reserve b/qpid/python/examples/reservations/reserve new file mode 100755 index 0000000000..68e7fee912 --- /dev/null +++ b/qpid/python/examples/reservations/reserve @@ -0,0 +1,197 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import optparse, os, sys, time +from uuid import uuid4 +from qpid.messaging import * +from qpid.log import enable, DEBUG, WARN +from common import * + +parser = optparse.OptionParser(usage="usage: %prog [options] PATTERN ...", + description="reserve a machine") +parser.add_option("-b", "--broker", default="localhost", + help="connect to specified BROKER (default %default)") +parser.add_option("-a", "--address", default="reservations", + help="address for reservation requests") +parser.add_option("-r", "--release", action="store_true", + help="release any machines matching the pattern") +parser.add_option("-s", "--status", action="store_true", + help="list machine status") +parser.add_option("-d", "--discover", action="store_true", + help="use discovery instead of inventory") +parser.add_option("-o", "--owner", default=os.environ["USER"], + help="the holder of the reservation") +parser.add_option("-n", "--number", type=int, default=1, + help="the number of machines to reserve") +parser.add_option("-t", "--timeout", type=float, default=10, + help="timeout in seconds to wait for resources") +parser.add_option("-v", dest="verbose", action="store_true", + help="enable verbose logging") + +opts, args = parser.parse_args() + +if opts.verbose: + enable("qpid", DEBUG) +else: + enable("qpid", WARN) + +if args: + patterns = args +else: + patterns = ["*"] + +conn = Connection.establish(opts.broker) + +if opts.release: + request_type = "release" + candidate_status = BUSY + candidate_owner = opts.owner +else: + request_type = "reserve" + candidate_status = FREE + candidate_owner = None + +class Requester(Dispatcher): + + def __init__(self): + self.agents = {} + self.requests = set() + self.outstanding = set() + + def agent_status(self, id): + status, owner = self.agents[id] + if owner: + return "%s %s(%s)" % (id, status, owner) + else: + return "%s %s" % (id, status) + + def correlation(self, cid): + self.requests.add(cid) + self.outstanding.add(cid) + + def ignored(self, msg): + return msg.properties.get("type") not in ("status", "empty") or \ + msg.correlation_id not in self.requests + + def do_status(self, msg): + id, status, owner = get_status(msg) + self.agents[id] = (status, owner) + + if opts.status: + print self.agent_status(id) + + def do_empty(self, msg): + print "no matching resources" + + def candidates(self, candidate_status, candidate_owner): + for id, (status, owner) in self.agents.items(): + if status == candidate_status and owner == candidate_owner: + yield id + + def dispatch(self, msg): + result = Dispatcher.dispatch(self, msg) + count = msg.properties.get("count") + sequence = msg.properties.get("sequence") + if count and sequence == count: + self.outstanding.discard(msg.correlation_id) + return result + +try: + ssn = conn.session() + rcv = ssn.receiver(opts.address, capacity=10) + snd = ssn.sender(opts.address) + + correlation_id = str(uuid4()) + + if opts.discover: + properties = {"type": "discover", "identity": patterns} + content = None + else: + properties = {"type": "query"} + content = {"identity": patterns} + + snd.send(Message(reply_to = opts.address, + correlation_id = correlation_id, + properties = properties, + content = content)) + + req = Requester() + req.correlation(correlation_id) + + start = time.time() + ellapsed = 0 + requested = set() + discovering = opts.discover + + while ellapsed <= opts.timeout and (discovering or req.outstanding): + try: + msg = rcv.fetch(opts.timeout - ellapsed) + ssn.acknowledge(msg) + except Empty: + continue + finally: + ellapsed = time.time() - start + + req.dispatch(msg) + if not opts.status: + if len(requested) < opts.number: + for cid in req.candidates(candidate_status, candidate_owner): + if cid in requested: continue + req_msg = Message(reply_to = opts.address, + correlation_id = str(uuid4()), + properties = {"type": request_type, + "identity": [cid]}, + content = {"owner": opts.owner}) + if not requested: + print "requesting %s:" % request_type, + print cid, + sys.stdout.flush() + req.correlation(req_msg.correlation_id) + snd.send(req_msg) + requested.add(cid) + else: + discovering = False + + if requested: + print + owners = {} + for id in requested: + st, ow = req.agents[id] + if not owners.has_key(ow): + owners[ow] = [] + owners[ow].append(id) + keys = list(owners.keys()) + keys.sort() + for k in keys: + owners[k].sort() + v = ", ".join(owners[k]) + if k is None: + print "free: %s" % v + else: + print "owner %s: %s" % (k, v) + elif req.agents and not opts.status: + print "no available resources" + + if req.outstanding: + print "request timed out" +except KeyboardInterrupt: + pass +finally: + conn.close() |