summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2010-02-17 16:13:12 +0000
committerRafael H. Schloming <rhs@apache.org>2010-02-17 16:13:12 +0000
commit6c2a7ea80b9cabbd9c9051e2d5c9f4274451ed7f (patch)
tree23b47299599cba71bb48e4a9ec3e83909ade0839
parent2c38527ac3039332ce613ee05e1f6cdb5fa2f871 (diff)
downloadqpid-python-6c2a7ea80b9cabbd9c9051e2d5c9f4274451ed7f.tar.gz
added reservations to examples
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@911048 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/python/examples/reservations/common.py86
-rwxr-xr-xqpid/python/examples/reservations/inventory100
-rwxr-xr-xqpid/python/examples/reservations/machine-agent109
-rwxr-xr-xqpid/python/examples/reservations/reserve200
4 files changed, 495 insertions, 0 deletions
diff --git a/qpid/python/examples/reservations/common.py b/qpid/python/examples/reservations/common.py
new file mode 100644
index 0000000000..4f9efd0227
--- /dev/null
+++ b/qpid/python/examples/reservations/common.py
@@ -0,0 +1,86 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import traceback
+from fnmatch import fnmatch
+from qpid.messaging import *
+
+class Dispatcher:
+
+ def unhandled(self, msg):
+ print "UNHANDLED MESSAGE: %s" % msg
+
+ def ignored(self, msg):
+ return False
+
+ def dispatch(self, msg):
+ try:
+ if self.ignored(msg):
+ return ()
+ else:
+ type = msg.properties.get("type")
+ replies = getattr(self, "do_%s" % type, self.unhandled)(msg)
+ if replies is None:
+ return ()
+ else:
+ return replies
+ except:
+ traceback.print_exc()
+ return ()
+
+ def run(self, session):
+ senders = {}
+ while self.running():
+ msg = session.next_receiver().fetch()
+ replies = self.dispatch(msg)
+
+ count = len(replies)
+ sequence = 1
+ for r in replies:
+ if senders.has_key(r.to):
+ rsnd = senders[r.to]
+ else:
+ rsnd = session.sender(r.to)
+ senders[r.to] = rsnd
+
+ r.correlation_id = msg.correlation_id
+ r.properties["count"] = count
+ r.properties["sequence"] = sequence
+ sequence += 1
+ try:
+ rsnd.send(r)
+ except SendError, e:
+ print e
+ del senders[r.to]
+ rsnd.close()
+
+ session.acknowledge(msg)
+
+def get_status(msg):
+ return msg.content["identity"], msg.content["status"], msg.content["owner"]
+
+FREE = "free"
+BUSY = "busy"
+
+def match(value, patterns):
+ for p in patterns:
+ if fnmatch(value, p):
+ return True
+ return False
diff --git a/qpid/python/examples/reservations/inventory b/qpid/python/examples/reservations/inventory
new file mode 100755
index 0000000000..10c2034efc
--- /dev/null
+++ b/qpid/python/examples/reservations/inventory
@@ -0,0 +1,100 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import optparse, traceback
+from qpid.messaging import *
+from qpid.util import URL
+from qpid.log import enable, DEBUG, WARN
+from common import *
+
+parser = optparse.OptionParser(usage="usage: %prog [options]",
+ description="machine inventory agent")
+parser.add_option("-b", "--broker", default="localhost",
+ help="connect to specified BROKER (default %default)")
+parser.add_option("-d", "--database",
+ help="database file for persistent machine status")
+parser.add_option("-a", "--address", default="reservations",
+ help="address for reservation requests")
+parser.add_option("-v", dest="verbose", action="store_true",
+ help="enable verbose logging")
+
+opts, args = parser.parse_args()
+
+if opts.verbose:
+ enable("qpid", DEBUG)
+else:
+ enable("qpid", WARN)
+
+url = URL(opts.broker)
+conn = Connection.open(url.host, url.port or AMQP_PORT,
+ username=url.user, password=url.password,
+ reconnect=True,
+ reconnect_delay=1)
+
+class Inventory(Dispatcher):
+
+ def __init__(self):
+ self.agents = {}
+
+ def running(self):
+ return True
+
+ def do_status(self, msg):
+ id, status, owner = get_status(msg)
+ self.agents[id] = (status, owner)
+
+ def do_query(self, msg):
+ patterns = msg.content["identity"]
+ result = []
+ for id, (status, owner) in self.agents.items():
+ if match(id, patterns):
+ r = Message(to = msg.reply_to,
+ properties = {
+ "type": "status"
+ },
+ content = {
+ "identity": id,
+ "status": status,
+ "owner": owner
+ })
+ result.append(r)
+ continue
+ if not result:
+ result.append(Message(to = msg.reply_to,
+ properties = {"type": "empty"}))
+ return result
+
+ def ignored(self, msg):
+ type = msg.properties.get("type")
+ return type not in ("status", "query")
+
+try:
+ ssn = conn.session()
+ rcv = ssn.receiver(opts.address, capacity = 10)
+ snd = ssn.sender(opts.address)
+ snd.send(Message(reply_to = opts.address,
+ properties = {"type": "discover", "identity": ["*"]}))
+
+ inv = Inventory()
+ inv.run(ssn)
+except KeyboardInterrupt:
+ pass
+finally:
+ conn.close()
diff --git a/qpid/python/examples/reservations/machine-agent b/qpid/python/examples/reservations/machine-agent
new file mode 100755
index 0000000000..9df663bdf5
--- /dev/null
+++ b/qpid/python/examples/reservations/machine-agent
@@ -0,0 +1,109 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import optparse, socket
+from qpid.messaging import *
+from qpid.util import URL
+from qpid.log import enable, DEBUG, WARN
+from common import *
+
+host = socket.gethostname()
+
+parser = optparse.OptionParser(usage="usage: %prog [options]",
+ description="machine reservation agent")
+parser.add_option("-b", "--broker", default="localhost",
+ help="connect to specified BROKER (default %default)")
+parser.add_option("-d", "--database",
+ help="database file for persistent machine status")
+parser.add_option("-a", "--address", default="reservations",
+ help="address for reservation requests")
+parser.add_option("-i", "--identity", default=host,
+ help="resource id (default %default)")
+parser.add_option("-v", dest="verbose", action="store_true",
+ help="enable verbose logging")
+
+opts, args = parser.parse_args()
+
+if opts.verbose:
+ enable("qpid", DEBUG)
+else:
+ enable("qpid", WARN)
+
+url = URL(opts.broker)
+conn = Connection.open(url.host, url.port or AMQP_PORT,
+ username=url.user, password=url.password,
+ reconnect=True,
+ reconnect_delay=1)
+
+
+class Agent(Dispatcher):
+
+ def __init__(self, identity):
+ self.identity = identity
+ self.status = FREE
+ self.owner = None
+
+ def running(self):
+ return True
+
+ def get_status(self):
+ msg = Message(properties = {"type": "status"},
+ content = {"identity": self.identity,
+ "status": self.status,
+ "owner": self.owner})
+ return msg
+
+ def do_discover(self, msg):
+ r = self.get_status()
+ r.to = msg.reply_to
+ return [r]
+
+ def do_reserve(self, msg):
+ if self.status == FREE:
+ self.owner = msg.content["owner"]
+ self.status = BUSY
+ return self.do_discover(msg)
+
+ def do_release(self, msg):
+ if self.owner == msg.content["owner"]:
+ self.status = FREE
+ self.owner = None
+ return self.do_discover(msg)
+
+ def ignored(self, msg):
+ patterns = msg.properties.get("identity")
+ type = msg.properties.get("type")
+ if patterns and match(self.identity, patterns):
+ return type == "status"
+ else:
+ return True
+
+try:
+ ssn = conn.session()
+ rcv = ssn.receiver(opts.address)
+ rcv.capacity = 10
+ snd = ssn.sender(opts.address)
+ agent = Agent(opts.identity)
+ snd.send(agent.get_status())
+ agent.run(ssn)
+except KeyboardInterrupt:
+ pass
+finally:
+ conn.close()
diff --git a/qpid/python/examples/reservations/reserve b/qpid/python/examples/reservations/reserve
new file mode 100755
index 0000000000..cabc103c77
--- /dev/null
+++ b/qpid/python/examples/reservations/reserve
@@ -0,0 +1,200 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import optparse, os, sys, time
+from uuid import uuid4
+from qpid.messaging import *
+from qpid.util import URL
+from qpid.log import enable, DEBUG, WARN
+from common import *
+
+parser = optparse.OptionParser(usage="usage: %prog [options] PATTERN ...",
+ description="reserve a machine")
+parser.add_option("-b", "--broker", default="localhost",
+ help="connect to specified BROKER (default %default)")
+parser.add_option("-a", "--address", default="reservations",
+ help="address for reservation requests")
+parser.add_option("-r", "--release", action="store_true",
+ help="release any machines matching the pattern")
+parser.add_option("-s", "--status", action="store_true",
+ help="list machine status")
+parser.add_option("-d", "--discover", action="store_true",
+ help="use discovery instead of inventory")
+parser.add_option("-o", "--owner", default=os.environ["USER"],
+ help="the holder of the reservation")
+parser.add_option("-n", "--number", type=int, default=1,
+ help="the number of machines to reserve")
+parser.add_option("-t", "--timeout", type=float, default=10,
+ help="timeout in seconds to wait for resources")
+parser.add_option("-v", dest="verbose", action="store_true",
+ help="enable verbose logging")
+
+opts, args = parser.parse_args()
+
+if opts.verbose:
+ enable("qpid", DEBUG)
+else:
+ enable("qpid", WARN)
+
+if args:
+ patterns = args
+else:
+ patterns = ["*"]
+
+url = URL(opts.broker)
+conn = Connection.open(url.host, url.port or AMQP_PORT,
+ username=url.user, password=url.password)
+
+if opts.release:
+ request_type = "release"
+ candidate_status = BUSY
+ candidate_owner = opts.owner
+else:
+ request_type = "reserve"
+ candidate_status = FREE
+ candidate_owner = None
+
+class Requester(Dispatcher):
+
+ def __init__(self):
+ self.agents = {}
+ self.requests = set()
+ self.outstanding = set()
+
+ def agent_status(self, id):
+ status, owner = self.agents[id]
+ if owner:
+ return "%s %s(%s)" % (id, status, owner)
+ else:
+ return "%s %s" % (id, status)
+
+ def correlation(self, cid):
+ self.requests.add(cid)
+ self.outstanding.add(cid)
+
+ def ignored(self, msg):
+ return msg.properties.get("type") not in ("status", "empty") or \
+ msg.correlation_id not in self.requests
+
+ def do_status(self, msg):
+ id, status, owner = get_status(msg)
+ self.agents[id] = (status, owner)
+
+ if opts.status:
+ print self.agent_status(id)
+
+ def do_empty(self, msg):
+ print "no matching resources"
+
+ def candidates(self, candidate_status, candidate_owner):
+ for id, (status, owner) in self.agents.items():
+ if status == candidate_status and owner == candidate_owner:
+ yield id
+
+ def dispatch(self, msg):
+ result = Dispatcher.dispatch(self, msg)
+ count = msg.properties.get("count")
+ sequence = msg.properties.get("sequence")
+ if count and sequence == count:
+ self.outstanding.discard(msg.correlation_id)
+ return result
+
+try:
+ ssn = conn.session()
+ rcv = ssn.receiver(opts.address, capacity=10)
+ snd = ssn.sender(opts.address)
+
+ correlation_id = str(uuid4())
+
+ if opts.discover:
+ properties = {"type": "discover", "identity": patterns}
+ content = None
+ else:
+ properties = {"type": "query"}
+ content = {"identity": patterns}
+
+ snd.send(Message(reply_to = opts.address,
+ correlation_id = correlation_id,
+ properties = properties,
+ content = content))
+
+ req = Requester()
+ req.correlation(correlation_id)
+
+ start = time.time()
+ ellapsed = 0
+ requested = set()
+ discovering = opts.discover
+
+ while ellapsed <= opts.timeout and (discovering or req.outstanding):
+ try:
+ msg = rcv.fetch(opts.timeout - ellapsed)
+ ssn.acknowledge(msg)
+ except Empty:
+ continue
+ finally:
+ ellapsed = time.time() - start
+
+ req.dispatch(msg)
+ if not opts.status:
+ if len(requested) < opts.number:
+ for cid in req.candidates(candidate_status, candidate_owner):
+ if cid in requested: continue
+ req_msg = Message(reply_to = opts.address,
+ correlation_id = str(uuid4()),
+ properties = {"type": request_type,
+ "identity": [cid]},
+ content = {"owner": opts.owner})
+ if not requested:
+ print "requesting %s:" % request_type,
+ print cid,
+ sys.stdout.flush()
+ req.correlation(req_msg.correlation_id)
+ snd.send(req_msg)
+ requested.add(cid)
+ else:
+ discovering = False
+
+ if requested:
+ print
+ owners = {}
+ for id in requested:
+ st, ow = req.agents[id]
+ if not owners.has_key(ow):
+ owners[ow] = []
+ owners[ow].append(id)
+ keys = list(owners.keys())
+ keys.sort()
+ for k in keys:
+ owners[k].sort()
+ v = ", ".join(owners[k])
+ if k is None:
+ print "free: %s" % v
+ else:
+ print "owner %s: %s" % (k, v)
+ elif req.agents and not opts.status:
+ print "no available resources"
+
+ if req.outstanding:
+ print "request timed out"
+except KeyboardInterrupt:
+ pass
+finally:
+ conn.close()