summaryrefslogtreecommitdiff
path: root/qpid/python/examples/reservations/reserve
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/python/examples/reservations/reserve')
-rwxr-xr-xqpid/python/examples/reservations/reserve197
1 files changed, 197 insertions, 0 deletions
diff --git a/qpid/python/examples/reservations/reserve b/qpid/python/examples/reservations/reserve
new file mode 100755
index 0000000000..68e7fee912
--- /dev/null
+++ b/qpid/python/examples/reservations/reserve
@@ -0,0 +1,197 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import optparse, os, sys, time
+from uuid import uuid4
+from qpid.messaging import *
+from qpid.log import enable, DEBUG, WARN
+from common import *
+
+parser = optparse.OptionParser(usage="usage: %prog [options] PATTERN ...",
+ description="reserve a machine")
+parser.add_option("-b", "--broker", default="localhost",
+ help="connect to specified BROKER (default %default)")
+parser.add_option("-a", "--address", default="reservations",
+ help="address for reservation requests")
+parser.add_option("-r", "--release", action="store_true",
+ help="release any machines matching the pattern")
+parser.add_option("-s", "--status", action="store_true",
+ help="list machine status")
+parser.add_option("-d", "--discover", action="store_true",
+ help="use discovery instead of inventory")
+parser.add_option("-o", "--owner", default=os.environ["USER"],
+ help="the holder of the reservation")
+parser.add_option("-n", "--number", type=int, default=1,
+ help="the number of machines to reserve")
+parser.add_option("-t", "--timeout", type=float, default=10,
+ help="timeout in seconds to wait for resources")
+parser.add_option("-v", dest="verbose", action="store_true",
+ help="enable verbose logging")
+
+opts, args = parser.parse_args()
+
+if opts.verbose:
+ enable("qpid", DEBUG)
+else:
+ enable("qpid", WARN)
+
+if args:
+ patterns = args
+else:
+ patterns = ["*"]
+
+conn = Connection.establish(opts.broker)
+
+if opts.release:
+ request_type = "release"
+ candidate_status = BUSY
+ candidate_owner = opts.owner
+else:
+ request_type = "reserve"
+ candidate_status = FREE
+ candidate_owner = None
+
+class Requester(Dispatcher):
+
+ def __init__(self):
+ self.agents = {}
+ self.requests = set()
+ self.outstanding = set()
+
+ def agent_status(self, id):
+ status, owner = self.agents[id]
+ if owner:
+ return "%s %s(%s)" % (id, status, owner)
+ else:
+ return "%s %s" % (id, status)
+
+ def correlation(self, cid):
+ self.requests.add(cid)
+ self.outstanding.add(cid)
+
+ def ignored(self, msg):
+ return msg.properties.get("type") not in ("status", "empty") or \
+ msg.correlation_id not in self.requests
+
+ def do_status(self, msg):
+ id, status, owner = get_status(msg)
+ self.agents[id] = (status, owner)
+
+ if opts.status:
+ print self.agent_status(id)
+
+ def do_empty(self, msg):
+ print "no matching resources"
+
+ def candidates(self, candidate_status, candidate_owner):
+ for id, (status, owner) in self.agents.items():
+ if status == candidate_status and owner == candidate_owner:
+ yield id
+
+ def dispatch(self, msg):
+ result = Dispatcher.dispatch(self, msg)
+ count = msg.properties.get("count")
+ sequence = msg.properties.get("sequence")
+ if count and sequence == count:
+ self.outstanding.discard(msg.correlation_id)
+ return result
+
+try:
+ ssn = conn.session()
+ rcv = ssn.receiver(opts.address, capacity=10)
+ snd = ssn.sender(opts.address)
+
+ correlation_id = str(uuid4())
+
+ if opts.discover:
+ properties = {"type": "discover", "identity": patterns}
+ content = None
+ else:
+ properties = {"type": "query"}
+ content = {"identity": patterns}
+
+ snd.send(Message(reply_to = opts.address,
+ correlation_id = correlation_id,
+ properties = properties,
+ content = content))
+
+ req = Requester()
+ req.correlation(correlation_id)
+
+ start = time.time()
+ ellapsed = 0
+ requested = set()
+ discovering = opts.discover
+
+ while ellapsed <= opts.timeout and (discovering or req.outstanding):
+ try:
+ msg = rcv.fetch(opts.timeout - ellapsed)
+ ssn.acknowledge(msg)
+ except Empty:
+ continue
+ finally:
+ ellapsed = time.time() - start
+
+ req.dispatch(msg)
+ if not opts.status:
+ if len(requested) < opts.number:
+ for cid in req.candidates(candidate_status, candidate_owner):
+ if cid in requested: continue
+ req_msg = Message(reply_to = opts.address,
+ correlation_id = str(uuid4()),
+ properties = {"type": request_type,
+ "identity": [cid]},
+ content = {"owner": opts.owner})
+ if not requested:
+ print "requesting %s:" % request_type,
+ print cid,
+ sys.stdout.flush()
+ req.correlation(req_msg.correlation_id)
+ snd.send(req_msg)
+ requested.add(cid)
+ else:
+ discovering = False
+
+ if requested:
+ print
+ owners = {}
+ for id in requested:
+ st, ow = req.agents[id]
+ if not owners.has_key(ow):
+ owners[ow] = []
+ owners[ow].append(id)
+ keys = list(owners.keys())
+ keys.sort()
+ for k in keys:
+ owners[k].sort()
+ v = ", ".join(owners[k])
+ if k is None:
+ print "free: %s" % v
+ else:
+ print "owner %s: %s" % (k, v)
+ elif req.agents and not opts.status:
+ print "no available resources"
+
+ if req.outstanding:
+ print "request timed out"
+except KeyboardInterrupt:
+ pass
+finally:
+ conn.close()