diff options
Diffstat (limited to 'qpid/python')
-rwxr-xr-x | qpid/python/examples/api/server | 87 | ||||
-rwxr-xr-x | qpid/python/examples/api/spout (renamed from qpid/python/examples/api/ping) | 49 | ||||
-rw-r--r-- | qpid/python/qpid/driver.py | 18 | ||||
-rw-r--r-- | qpid/python/qpid/tests/messaging.py | 8 |
4 files changed, 147 insertions, 15 deletions
diff --git a/qpid/python/examples/api/server b/qpid/python/examples/api/server new file mode 100755 index 0000000000..adb2dcf792 --- /dev/null +++ b/qpid/python/examples/api/server @@ -0,0 +1,87 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import optparse, sys, traceback +from qpid.messaging import * +from qpid.util import URL +from subprocess import Popen, STDOUT, PIPE +from qpid.log import enable, DEBUG, WARN + +parser = optparse.OptionParser(usage="usage: %prog [options] ADDRESS ...", + description="handle requests from the supplied address.") +parser.add_option("-b", "--broker", default="localhost", + help="connect to specified BROKER (default %default)") +parser.add_option("-v", dest="verbose", action="store_true", help="enable logging") + +opts, args = parser.parse_args() + +if opts.verbose: + enable("qpid", DEBUG) +else: + enable("qpid", WARN) + +url = URL(opts.broker) +if args: + addr = args.pop(0) +else: + parser.error("address is required") + +# XXX: should make URL default the port for us +conn = Connection.open(url.host, url.port or AMQP_PORT, + username=url.user, password=url.password) +conn.reconnect = True +ssn = conn.session() +rcv = ssn.receiver(addr) + +def dispatch(msg): + msg_type = msg.properties.get("type") + if msg_type == "shell": + proc = Popen(msg.content, shell=True, stderr=STDOUT, stdin=PIPE, stdout=PIPE) + output, _ = proc.communicate() + result = Message(output) + result.properties["exit"] = proc.returncode + elif msg_type == "eval": + try: + content = eval(msg.content) + except: + content = traceback.format_exc() + result = Message(content) + else: + result = Message("unrecognized message type: %s" % msg_type) + return result + +while True: + try: + msg = rcv.fetch() + response = dispatch(msg) + snd = ssn.sender(msg.reply_to) + try: + snd.send(response) + except SendError, e: + print e + snd.close() + ssn.acknowledge() + except Empty: + break + except ReceiveError, e: + print e + break + +conn.close() diff --git a/qpid/python/examples/api/ping b/qpid/python/examples/api/spout index 59b367cca6..6a9b2b6e3d 100755 --- a/qpid/python/examples/api/ping +++ b/qpid/python/examples/api/spout @@ -22,35 +22,59 @@ import optparse, time from qpid.messaging import * from qpid.util import URL +def nameval(st): + idx = st.find("=") + if idx >= 0: + name = st[0:idx] + value = st[idx+1:] + else: + name = st + value = None + return name, value + parser = optparse.OptionParser(usage="usage: %prog [options] ADDRESS [ CONTENT ... ]", - description="Drain messages from the supplied address.") + description="Send messages to the supplied address.") parser.add_option("-b", "--broker", default="localhost", help="connect to specified BROKER (default %default)") parser.add_option("-c", "--count", type=int, default=1, help="stop after count messages have been sent, zero disables (default %default)") parser.add_option("-t", "--timeout", type=float, default=None, help="exit after the specified time") -parser.add_option("-m", "--map", action="store_true", - help="interpret content as map") parser.add_option("-i", "--id", help="use the supplied id instead of generating one") +parser.add_option("-r", "--reply-to", help="specify reply-to address") +parser.add_option("-P", "--property", dest="properties", action="append", default=[], + help="specify message property") +parser.add_option("-M", "--map", dest="entries", action="append", default=[], + help="specify map entry for message body") opts, args = parser.parse_args() url = URL(opts.broker) if opts.id is None: - ping_id = str(uuid4()) + spout_id = str(uuid4()) else: - ping_id = opts.id + spout_id = opts.id if args: addr = args.pop(0) else: parser.error("address is required") + +content = None + if args: - content = " ".join(args) - if opts.map: - content = eval(content) + text = " ".join(args) +else: + text = None + +if opts.entries: + content = {} + if text: + content["text"] = text + for e in opts.entries: + name, val = nameval(e) + content[name] = val else: - content = None + content = text # XXX: should make URL default the port for us conn = Connection.open(url.host, url.port or AMQP_PORT, @@ -62,8 +86,11 @@ count = 0 start = time.time() while (opts.count == 0 or count < opts.count) and \ (opts.timeout is None or time.time() - start < opts.timeout): - msg = Message(content) - msg.properties["ping-id"] = "%s:%s" % (ping_id, count) + msg = Message(content, reply_to=opts.reply_to) + msg.properties["spout-id"] = "%s:%s" % (spout_id, count) + for p in opts.properties: + name, val = nameval(p) + msg.properties[name] = val try: snd.send(msg) diff --git a/qpid/python/qpid/driver.py b/qpid/python/qpid/driver.py index 7c293fe146..588b46064c 100644 --- a/qpid/python/qpid/driver.py +++ b/qpid/python/qpid/driver.py @@ -439,14 +439,19 @@ class Driver: if _snd is None and not snd.closing and not snd.closed: _snd = Attachment(snd) + if snd.target is None: + snd.error = ("target is None",) + snd.closed = True + return + try: _snd.name, _snd.subject, _snd.options = address.parse(snd.target) except address.LexError, e: - snd.error = e + snd.error = (e,) snd.closed = True return except address.ParseError, e: - snd.error = e + snd.error = (e,) snd.closed = True return @@ -502,14 +507,19 @@ class Driver: _rcv.canceled = False _rcv.draining = False + if rcv.source is None: + rcv.error = ("source is None",) + rcv.closed = True + return + try: _rcv.name, _rcv.subject, _rcv.options = address.parse(rcv.source) except address.LexError, e: - rcv.error = e + rcv.error = (e,) rcv.closed = True return except address.ParseError, e: - rcv.error = e + rcv.error = (e,) rcv.closed = True return diff --git a/qpid/python/qpid/tests/messaging.py b/qpid/python/qpid/tests/messaging.py index 2e4c0ca1ab..860c3660d1 100644 --- a/qpid/python/qpid/tests/messaging.py +++ b/qpid/python/qpid/tests/messaging.py @@ -597,6 +597,14 @@ class AddressErrorTests(Base): assert check(e), "unexpected error: %s" % e rcv.close() + def testNoneTarget(self): + # XXX: should have specific exception for this + self.sendErrorTest(None, SendError) + + def testNoneSource(self): + # XXX: should have specific exception for this + self.fetchErrorTest(None, ReceiveError) + def testNoTarget(self): # XXX: should have specific exception for this self.sendErrorTest(NOSUCH_Q, SendError, lambda e: NOSUCH_Q in str(e)) |