summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/examples/README.txt5
-rwxr-xr-xpython/examples/api/receive194
-rwxr-xr-xpython/examples/api/send281
-rw-r--r--python/examples/api/statistics.py139
-rw-r--r--python/qpid/client.py10
-rw-r--r--python/qpid/connection08.py41
-rw-r--r--python/qpid/delegates.py20
-rw-r--r--python/qpid/messaging/driver.py19
-rw-r--r--python/qpid/messaging/endpoints.py2
-rw-r--r--python/qpid/messaging/transports.py36
-rw-r--r--python/qpid/testlib.py6
-rw-r--r--python/qpid/tests/__init__.py1
-rw-r--r--python/qpid/tests/util.py46
-rw-r--r--python/qpid/util.py26
-rwxr-xr-xpython/setup.py2
15 files changed, 768 insertions, 60 deletions
diff --git a/python/examples/README.txt b/python/examples/README.txt
index 4395160fec..3a3e421a1e 100644
--- a/python/examples/README.txt
+++ b/python/examples/README.txt
@@ -14,6 +14,11 @@ api/spout -- A simple messaging client that sends
messages to the target specified on the
command line.
+api/send -- Sends messages to a specified queue.
+
+api/receive -- Receives messages from a specified queue.
+ Use with the send example above.
+
api/server -- An example server that process incoming
messages and sends replies.
diff --git a/python/examples/api/receive b/python/examples/api/receive
new file mode 100755
index 0000000000..f14df277ac
--- /dev/null
+++ b/python/examples/api/receive
@@ -0,0 +1,194 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import optparse, sys, time
+import statistics
+from qpid.messaging import *
+
+SECOND = 1000
+TIME_SEC = 1000000000
+
+op = optparse.OptionParser(usage="usage: %prog [options]", description="Drains messages from the specified address")
+op.add_option("-b", "--broker", default="localhost:5672", type="str", help="url of broker to connect to")
+op.add_option("-a", "--address", type="str", help="address to receive from")
+op.add_option("--connection-options", default={}, help="options for the connection")
+op.add_option("-m", "--messages", default=0, type="int", help="stop after N messages have been received, 0 means no limit")
+op.add_option("--timeout", default=0, type="int", help="timeout in seconds to wait before exiting")
+op.add_option("-f", "--forever", default=False, action="store_true", help="ignore timeout and wait forever")
+op.add_option("--ignore-duplicates", default=False, action="store_true", help="Detect and ignore duplicates (by checking 'sn' header)")
+op.add_option("--verify-sequence", default=False, action="store_true", help="Verify there are no gaps in the message sequence (by checking 'sn' header)")
+op.add_option("--check-redelivered", default=False, action="store_true", help="Fails with exception if a duplicate is not marked as redelivered (only relevant when ignore-duplicates is selected)")
+op.add_option("--capacity", default=1000, type="int", help="size of the senders outgoing message queue")
+op.add_option("--ack-frequency", default=100, type="int", help="Ack frequency (0 implies none of the messages will get accepted)")
+op.add_option("--tx", default=0, type="int", help="batch size for transactions (0 implies transaction are not used)")
+op.add_option("--rollback-frequency", default=0, type="int", help="rollback frequency (0 implies no transaction will be rolledback)")
+op.add_option("--print-content", type="str", default="yes", help="print out message content")
+op.add_option("--print-headers", type="str", default="no", help="print out message headers")
+op.add_option("--failover-updates", default=False, action="store_true", help="Listen for membership updates distributed via amq.failover")
+op.add_option("--report-total", default=False, action="store_true", help="Report total throughput statistics")
+op.add_option("--report-every", default=0, type="int", help="Report throughput statistics every N messages")
+op.add_option("--report-header", type="str", default="yes", help="Headers on report")
+op.add_option("--ready-address", type="str", help="send a message to this address when ready to receive")
+op.add_option("--receive-rate", default=0, type="int", help="Receive at rate of N messages/second. 0 means receive as fast as possible")
+#op.add_option("--help", default=False, action="store_true", help="print this usage statement")
+
+def getTimeout(timeout, forever):
+ if forever:
+ return None
+ else:
+ return SECOND*timeout
+
+
+EOS = "eos"
+SN = "sn"
+
+# Check for duplicate or dropped messages by sequence number
+class SequenceTracker:
+ def __init__(self, opts):
+ self.opts = opts
+ self.lastSn = 0
+
+ # Return True if the message should be procesed, false if it should be ignored.
+ def track(self, message):
+ if not(self.opts.verify_sequence) or (self.opts.ignore_duplicates):
+ return True
+ sn = message.properties[SN]
+ duplicate = (sn <= lastSn)
+ dropped = (sn > lastSn+1)
+ if self.opts.verify_sequence and dropped:
+ raise Exception("Gap in sequence numbers %s-%s" %(lastSn, sn))
+ ignore = (duplicate and self.opts.ignore_duplicates)
+ if ignore and self.opts.check_redelivered and (not msg.redelivered):
+ raise Exception("duplicate sequence number received, message not marked as redelivered!")
+ if not duplicate:
+ lastSn = sn
+ return (not(ignore))
+
+
+def main():
+ opts, args = op.parse_args()
+ if not opts.address:
+ raise Exception("Address must be specified!")
+
+ broker = opts.broker
+ address = opts.address
+ connection = Connection(opts.broker, **opts.connection_options)
+
+ try:
+ connection.open()
+ if opts.failover_updates:
+ auto_fetch_reconnect_urls(connection)
+ session = connection.session(transactional=(opts.tx))
+ receiver = session.receiver(opts.address)
+ if opts.capacity > 0:
+ receiver.capacity = opts.capacity
+ msg = Message()
+ count = 0
+ txCount = 0
+ sequenceTracker = SequenceTracker(opts)
+ timeout = getTimeout(opts.timeout, opts.forever)
+ done = False
+ stats = statistics.ThroughputAndLatency()
+ reporter = statistics.Reporter(opts.report_every, opts.report_header == "yes", stats)
+
+ if opts.ready_address is not None:
+ session.sender(opts.ready_address).send(msg)
+ if opts.tx > 0:
+ session.commit()
+ # For receive rate calculation
+ start = time.time()*TIME_SEC
+ interval = 0
+ if opts.receive_rate > 0:
+ interval = TIME_SEC / opts.receive_rate
+
+ replyTo = {} # a dictionary of reply-to address -> sender mapping
+
+ while (not done):
+ try:
+ msg = receiver.fetch(timeout=timeout)
+ reporter.message(msg)
+ if sequenceTracker.track(msg):
+ if msg.content == EOS:
+ done = True
+ else:
+ count+=1
+ if opts.print_headers == "yes":
+ if msg.subject is not None:
+ print "Subject: %s" %msg.subject
+ if msg.reply_to is not None:
+ print "ReplyTo: %s" %msg.reply_to
+ if msg.correlation_id is not None:
+ print "CorrelationId: %s" %msg.correlation_id
+ if msg.user_id is not None:
+ print "UserId: %s" %msg.user_id
+ if msg.ttl is not None:
+ print "TTL: %s" %msg.ttl
+ if msg.priority is not None:
+ print "Priority: %s" %msg.priority
+ if msg.durable:
+ print "Durable: true"
+ if msg.redelivered:
+ print "Redelivered: true"
+ print "Properties: %s" %msg.properties
+ print
+ if opts.print_content == "yes":
+ print msg.content
+ if (opts.messages > 0) and (count >= opts.messages):
+ done = True
+ # end of "if sequenceTracker.track(msg):"
+ if (opts.tx > 0) and (count % opts.tx == 0):
+ txCount+=1
+ if (opts.rollback_frequency > 0) and (txCount % opts.rollback_frequency == 0):
+ session.rollback()
+ else:
+ session.commit()
+ elif (opts.ack_frequency > 0) and (count % opts.ack_frequency == 0):
+ session.acknowledge()
+ if msg.reply_to is not None: # Echo message back to reply-to address.
+ if msg.reply_to not in replyTo:
+ replyTo[msg.reply_to] = session.sender(msg.reply_to)
+ replyTo[msg.reply_to].capacity = opts.capacity
+ replyTo[msg.reply_to].send(msg)
+ if opts.receive_rate > 0:
+ delay = start + count*interval - time.time()*TIME_SEC
+ if delay > 0:
+ time.sleep(delay)
+ # Clear out message properties & content for next iteration.
+ msg = Message()
+ except Empty: # no message fetched => break the while cycle
+ break
+ # end of while cycle
+ if opts.report_total:
+ reporter.report()
+ if opts.tx > 0:
+ txCount+=1
+ if opts.rollback_frequency and (txCount % opts.rollback_frequency == 0):
+ session.rollback()
+ else:
+ session.commit()
+ else:
+ session.acknowledge()
+ session.close()
+ connection.close()
+ except Exception,e:
+ print e
+ connection.close()
+
+if __name__ == "__main__": main()
diff --git a/python/examples/api/send b/python/examples/api/send
new file mode 100755
index 0000000000..b0105e41a6
--- /dev/null
+++ b/python/examples/api/send
@@ -0,0 +1,281 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import optparse, random, os, time, uuid
+from qpid.messaging import *
+import statistics
+
+EOS = "eos"
+SN = "sn"
+TS = "ts"
+
+TIME_SEC = 1000000000
+SECOND = 1000
+
+def nameval(st):
+ idx = st.find("=")
+ if idx >= 0:
+ name = st[0:idx]
+ value = st[idx+1:]
+ else:
+ name = st
+ value = None
+ return name, value
+
+
+op = optparse.OptionParser(usage="usage: %prog [options]", description="Spouts messages to the specified address")
+op.add_option("-b", "--broker", default="localhost:5672", type="str", help="url of broker to connect to")
+op.add_option("-a", "--address", type="str", help="address to send to")
+op.add_option("--connection-options", default={}, help="options for the connection")
+op.add_option("-m", "--messages", default=1, type="int", help="stop after N messages have been sent, 0 means no limit")
+op.add_option("-i", "--id", type="str", help="use the supplied id instead of generating one")
+op.add_option("--reply-to", type="str", help="specify reply-to address")
+op.add_option("--send-eos", default=0, type="int", help="Send N EOS messages to mark end of input")
+op.add_option("--durable", default=False, action="store_true", help="Mark messages as durable")
+op.add_option("--ttl", default=0, type="int", help="Time-to-live for messages, in milliseconds")
+op.add_option("--priority", default=0, type="int", help="Priority for messages (higher value implies higher priority)")
+op.add_option("-P", "--property", default=[], action="append", type="str", help="specify message property")
+op.add_option("--correlation-id", type="str", help="correlation-id for message")
+op.add_option("--user-id", type="str", help="userid for message")
+op.add_option("--content-string", type="str", help="use CONTENT as message content")
+op.add_option("--content-size", default=0, type="int", help="create an N-byte message content")
+op.add_option("-M", "--content-map", default=[], action="append", type="str", help="specify entry for map content")
+op.add_option("--content-stdin", default=False, action="store_true", help="read message content from stdin, one line per message")
+op.add_option("--capacity", default=1000, type="int", help="size of the senders outgoing message queue")
+op.add_option("--tx", default=0, type="int", help="batch size for transactions (0 implies transaction are not used)")
+op.add_option("--rollback-frequency", default=0, type="int", help="rollback frequency (0 implies no transaction will be rolledback)")
+op.add_option("--failover-updates", default=False, action="store_true", help="Listen for membership updates distributed via amq.failover")
+op.add_option("--report-total", default=False, action="store_true", help="Report total throughput statistics")
+op.add_option("--report-every", default=0, type="int", help="Report throughput statistics every N messages")
+op.add_option("--report-header", type="str", default="yes", help="Headers on report")
+op.add_option("--send-rate", default=0, type="int", help="Send at rate of N messages/second. 0 means send as fast as possible")
+op.add_option("--flow-control", default=0, type="int", help="Do end to end flow control to limit queue depth to 2*N. 0 means no flow control.")
+op.add_option("--sequence", type="str", default="yes", help="Add a sequence number messages property (required for duplicate/lost message detection)")
+op.add_option("--timestamp", type="str", default="yes", help="Add a time stamp messages property (required for latency measurement)")
+op.add_option("--group-key", type="str", help="Generate groups of messages using message header 'KEY' to hold the group identifier")
+op.add_option("--group-prefix", default="GROUP-", type="str", help="Generate group identifers with 'STRING' prefix (if group-key specified)")
+op.add_option("--group-size", default=10, type="int", help="Number of messages per a group (if group-key specified)")
+op.add_option("--group-randomize-size", default=False, action="store_true", help="Randomize the number of messages per group to [1...group-size] (if group-key specified)")
+op.add_option("--group-interleave", default=1, type="int", help="Simultaineously interleave messages from N different groups (if group-key specified)")
+
+
+class ContentGenerator:
+ def setContent(self, msg):
+ return
+
+class GetlineContentGenerator(ContentGenerator):
+ def setContent(self, msg):
+ content = sys.stdin.readline()
+ got = (not line)
+ if (got):
+ msg.content = content
+ return got
+
+class FixedContentGenerator(ContentGenerator):
+ def __init__(self, content=None):
+ self.content = content
+
+ def setContent(self, msg):
+ msg.content = self.content
+ return True
+
+class MapContentGenerator(ContentGenerator):
+ def __init__(self, opts=None):
+ self.opts = opts
+
+ def setContent(self, msg):
+ self.content = {}
+ for e in self.opts.content_map:
+ name, val = nameval(p)
+ content[name] = val
+ msg.content = self.content
+ return True
+
+
+# tag each generated message with a group identifer
+class GroupGenerator:
+ def __init__(self, key, prefix, size, randomize, interleave):
+ groupKey = key
+ groupPrefix = prefix
+ groupSize = size
+ randomizeSize = randomize
+ groupSuffix = 0
+ if (randomize > 0):
+ random.seed(os.getpid())
+
+ for i in range(0, interleave):
+ self.newGroup()
+ current = 0
+
+ def setGroupInfo(self, msg):
+ if (current == len(groups)):
+ current = 0
+ my_group = groups[current]
+ msg.properties[groupKey] = my_group[id];
+ # print "SENDING GROUPID=[%s]\n" % my_group[id]
+ my_group[count]=my_group[count]+1
+ if (my_group[count] == my_group[size]):
+ self.newGroup()
+ del groups[current]
+ else:
+ current+=1
+
+ def newGroup(self):
+ groupId = "%s%s" % (groupPrefix, groupSuffix)
+ groupSuffix+=1
+ size = groupSize
+ if (randomizeSize == True):
+ size = random.randint(1,groupSize)
+ # print "New group: GROUPID=["%s] size=%s" % (groupId, size)
+ groups.append({'id':groupId, 'size':size, 'count':0})
+
+
+
+def main():
+ opts, args = op.parse_args()
+ if not opts.address:
+ raise Exception("Address must be specified!")
+
+ broker = opts.broker
+ address = opts.address
+ connection = Connection(opts.broker, **opts.connection_options)
+
+ try:
+ connection.open()
+ if (opts.failover_updates):
+ auto_fetch_reconnect_urls(connection)
+ session = connection.session(transactional=(opts.tx))
+ sender = session.sender(opts.address)
+ if (opts.capacity>0):
+ sender.capacity = opts.capacity
+ sent = 0
+ txCount = 0
+ stats = statistics.Throughput()
+ reporter = statistics.Reporter(opts.report_every, opts.report_header == "yes", stats)
+
+ contentGen = ContentGenerator()
+ content = "" # auxiliary variable for determining content type of message - needs to be changed to {} for Map message
+ if opts.content_stdin:
+ opts.messages = 0 # Don't limit number of messages sent.
+ contentGen = GetlineContentGenerator()
+ elif opts.content_map is not None:
+ contentGen = MapContentGenerator(opts)
+ content = {}
+ elif opts.content_size is not None:
+ contentGen = FixedContentGenerator('X' * opts.content_size)
+ else:
+ contentGen = FixedContentGenerator(opts.content_string)
+ if opts.group_key is not None:
+ groupGen = GroupGenerator(opts.group_key, opts.group_prefix, opts.group_size, opts.group_random_size, opts.group_interleave)
+
+ msg = Message(content=content)
+ msg.durable = opts.durable
+ if opts.ttl:
+ msg.ttl = opts.ttl/1000.0
+ if opts.priority:
+ msg.priority = opts.priority
+ if opts.reply_to is not None:
+ if opts.flow_control > 0:
+ raise Exception("Can't use reply-to and flow-control together")
+ msg.reply_to = opts.reply_to
+ if opts.user_id is not None:
+ msg.user_id = opts.user_id
+ if opts.correlation_id is not None:
+ msg.correlation_id = opts.correlation_id
+ for p in opts.property:
+ name, val = nameval(p)
+ msg.properties[name] = val
+
+ start = time.time()*TIME_SEC
+ interval = 0
+ if opts.send_rate > 0:
+ interval = TIME_SEC/opts.send_rate
+
+ flowControlAddress = "flow-" + str(uuid.uuid1()) + ";{create:always,delete:always}"
+ flowSent = 0
+ if opts.flow_control > 0:
+ flowControlReceiver = session.receiver(flowControlAddress)
+ flowControlReceiver.capacity = 2
+
+ while (contentGen.setContent(msg) == True):
+ sent+=1
+ if opts.sequence == "yes":
+ msg.properties[SN] = sent
+
+ if opts.flow_control > 0:
+ if (sent % opts.flow_control == 0):
+ msg.reply_to = flowControlAddress
+ flowSent+=1
+ else:
+ msg.reply_to = "" # Clear the reply address.
+
+ if 'groupGen' in vars():
+ groupGen.setGroupInfo(msg)
+
+ if (opts.timestamp == "yes"):
+ msg.properties[TS] = int(time.time()*TIME_SEC)
+ sender.send(msg)
+ reporter.message(msg)
+
+ if ((opts.tx > 0) and (sent % opts.tx == 0)):
+ txCount+=1
+ if ((opts.rollbackFrequency > 0) and (txCount % opts.rollbackFrequency == 0)):
+ session.rollback()
+ else:
+ session.commit()
+ if ((opts.messages > 0) and (sent >= opts.messages)):
+ break
+
+ if (opts.flow_control > 0) and (flowSent == 2):
+ flowControlReceiver.fetch(timeout=SECOND)
+ flowSent -= 1
+
+ if (opts.send_rate > 0):
+ delay = start + sent*interval - time.time()*TIME_SEC
+ if (delay > 0):
+ time.sleep(delay)
+ #end of while
+
+ while flowSent > 0:
+ flowControlReceiver.fetch(timeout=SECOND)
+ flowSent -= 1
+
+ if (opts.report_total):
+ reporter.report()
+ for i in reversed(range(1,opts.send_eos+1)):
+ if (opts.sequence == "yes"):
+ sent+=1
+ msg.properties[SN] = sent
+ msg.properties[EOS] = True #TODO (also in C++ client): add in ability to send digest or similar
+ sender.send(msg)
+ if ((opts.tx > 0) and (sent % opts.tx == 0)):
+ txCount+=1
+ if ((opts.rollback_frequency > 0) and (txCount % opts.rollback_frequency == 0)):
+ session.rollback()
+ else:
+ session.commit()
+ session.sync()
+ session.close()
+ connection.close()
+ except Exception,e:
+ print e
+ connection.close()
+
+if __name__ == "__main__": main()
diff --git a/python/examples/api/statistics.py b/python/examples/api/statistics.py
new file mode 100644
index 0000000000..e095920e90
--- /dev/null
+++ b/python/examples/api/statistics.py
@@ -0,0 +1,139 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import time
+
+TS = "ts"
+TIME_SEC = 1000000000
+MILLISECOND = 1000
+
+class Statistic:
+ def message(self, msg):
+ return
+ def report(self):
+ return ""
+ def header(self):
+ return ""
+
+
+class Throughput(Statistic):
+ def __init__(self):
+ self.messages = 0
+ self.started = False
+
+ def message(self, m):
+ self.messages += 1
+ if not self.started:
+ self.start = time.time()
+ self.started = True
+
+ def header(self):
+ return "tp(m/s)"
+
+ def report(self):
+ if self.started:
+ elapsed = time.time() - self.start
+ return str(int(self.messages/elapsed))
+ else:
+ return "0"
+
+
+class ThroughputAndLatency(Throughput):
+ def __init__(self):
+ Throughput.__init__(self)
+ self.total = 0.0
+ self.min = float('inf')
+ self.max = -float('inf')
+ self.samples = 0
+
+ def message(self, m):
+ Throughput.message(self, m)
+ if TS in m.properties:
+ self.samples+=1
+ latency = MILLISECOND * (time.time() - float(m.properties[TS])/TIME_SEC)
+ if latency > 0:
+ self.total += latency
+ if latency < self.min:
+ self.min = latency
+ if latency > self.max:
+ self.max = latency
+
+ def header(self):
+# Throughput.header(self)
+ return "%s\tl-min\tl-max\tl-avg" % Throughput.header(self)
+
+ def report(self):
+ output = Throughput.report(self)
+ if (self.samples > 0):
+ output += "\t%.2f\t%.2f\t%.2f" %(self.min, self.max, self.total/self.samples)
+ return output
+
+
+# Report batch and overall statistics
+class ReporterBase:
+ def __init__(self, batch, wantHeader):
+ self.batchSize = batch
+ self.batchCount = 0
+ self.headerPrinted = not wantHeader
+ self.overall = None
+ self.batch = None
+
+ def create(self):
+ return
+
+ # Count message in the statistics
+ def message(self, m):
+ if self.overall == None:
+ self.overall = self.create()
+ self.overall.message(m)
+ if self.batchSize:
+ if self.batch == None:
+ self.batch = self.create()
+ self.batch.message(m)
+ self.batchCount+=1
+ if self.batchCount == self.batchSize:
+ self.header()
+ print self.batch.report()
+ self.create()
+ self.batchCount = 0
+
+ # Print overall report.
+ def report(self):
+ if self.overall == None:
+ self.overall = self.create()
+ self.header()
+ print self.overall.report()
+
+ def header(self):
+ if not self.headerPrinted:
+ if self.overall == None:
+ self.overall = self.create()
+ print self.overall.header()
+ self.headerPrinted = True
+
+
+class Reporter(ReporterBase):
+ def __init__(self, batchSize, wantHeader, Stats):
+ ReporterBase.__init__(self, batchSize, wantHeader)
+ self.__stats = Stats
+
+ def create(self):
+ ClassName = self.__stats.__class__
+ return ClassName()
diff --git a/python/qpid/client.py b/python/qpid/client.py
index 5a877bb8d6..4d42a8b20f 100644
--- a/python/qpid/client.py
+++ b/python/qpid/client.py
@@ -18,13 +18,14 @@
#
"""
-An AQMP client implementation that uses a custom delegate for
+An AMQP client implementation that uses a custom delegate for
interacting with the server.
"""
import os, threading
from peer import Peer, Channel, Closed
from delegate import Delegate
+from util import get_client_properties_with_defaults
from connection08 import Connection, Frame, connect
from spec08 import load
from queue import Queue
@@ -76,12 +77,12 @@ class Client:
self.lock.release()
return q
- def start(self, response, mechanism="AMQPLAIN", locale="en_US", tune_params=None):
+ def start(self, response, mechanism="AMQPLAIN", locale="en_US", tune_params=None, client_properties=None):
self.mechanism = mechanism
self.response = response
self.locale = locale
self.tune_params = tune_params
-
+ self.client_properties=get_client_properties_with_defaults(provided_client_properties=client_properties)
self.socket = connect(self.host, self.port)
self.conn = Connection(self.socket, self.spec)
self.peer = Peer(self.conn, ClientDelegate(self), Session)
@@ -128,7 +129,8 @@ class ClientDelegate(Delegate):
def connection_start(self, ch, msg):
msg.start_ok(mechanism=self.client.mechanism,
response=self.client.response,
- locale=self.client.locale)
+ locale=self.client.locale,
+ client_properties=self.client.client_properties)
def connection_tune(self, ch, msg):
if self.client.tune_params:
diff --git a/python/qpid/connection08.py b/python/qpid/connection08.py
index 654148dad2..0045e122ea 100644
--- a/python/qpid/connection08.py
+++ b/python/qpid/connection08.py
@@ -28,6 +28,9 @@ from cStringIO import StringIO
from codec import EOF
from compat import SHUT_RDWR
from exceptions import VersionError
+from logging import getLogger, DEBUG
+
+log = getLogger("qpid.connection08")
class SockIO:
@@ -35,7 +38,8 @@ class SockIO:
self.sock = sock
def write(self, buf):
-# print "OUT: %r" % buf
+ if log.isEnabledFor(DEBUG):
+ log.debug("OUT: %r", buf)
self.sock.sendall(buf)
def read(self, n):
@@ -47,8 +51,9 @@ class SockIO:
break
if len(s) == 0:
break
-# print "IN: %r" % s
data += s
+ if log.isEnabledFor(DEBUG):
+ log.debug("IN: %r", data)
return data
def flush(self):
@@ -120,19 +125,25 @@ class Connection:
(self.spec.major, self.spec.minor, major, minor))
else:
raise FramingError("unknown frame type: %s" % tid)
- channel = c.decode_short()
- body = c.decode_longstr()
- dec = codec.Codec(StringIO(body), self.spec)
- frame = Frame.DECODERS[type].decode(self.spec, dec, len(body))
- frame.channel = channel
- end = c.decode_octet()
- if end != self.FRAME_END:
- garbage = ""
- while end != self.FRAME_END:
- garbage += chr(end)
- end = c.decode_octet()
- raise "frame error: expected %r, got %r" % (self.FRAME_END, garbage)
- return frame
+ try:
+ channel = c.decode_short()
+ body = c.decode_longstr()
+ dec = codec.Codec(StringIO(body), self.spec)
+ frame = Frame.DECODERS[type].decode(self.spec, dec, len(body))
+ frame.channel = channel
+ end = c.decode_octet()
+ if end != self.FRAME_END:
+ garbage = ""
+ while end != self.FRAME_END:
+ garbage += chr(end)
+ end = c.decode_octet()
+ raise "frame error: expected %r, got %r" % (self.FRAME_END, garbage)
+ return frame
+ except EOF:
+ # An EOF caught here can indicate an error decoding the frame,
+ # rather than that a disconnection occurred,so it's worth logging it.
+ log.exception("Error occurred when reading frame with tid %s" % tid)
+ raise
def write_0_9(self, frame):
self.write_8_0(frame)
diff --git a/python/qpid/delegates.py b/python/qpid/delegates.py
index 5e44a3a6dc..ae7ed7f988 100644
--- a/python/qpid/delegates.py
+++ b/python/qpid/delegates.py
@@ -18,7 +18,7 @@
#
import os, connection, session
-from util import notify
+from util import notify, get_client_properties_with_defaults
from datatypes import RangedSet
from exceptions import VersionError, Closed
from logging import getLogger
@@ -137,24 +137,12 @@ class Server(Delegate):
class Client(Delegate):
- ppid = 0
- try:
- ppid = os.getppid()
- except:
- pass
-
- PROPERTIES = {"product": "qpid python client",
- "version": "development",
- "platform": os.name,
- "qpid.client_process": os.path.basename(sys.argv[0]),
- "qpid.client_pid": os.getpid(),
- "qpid.client_ppid": ppid}
-
def __init__(self, connection, username=None, password=None,
mechanism=None, heartbeat=None, **kwargs):
Delegate.__init__(self, connection)
- self.client_properties=Client.PROPERTIES.copy()
- self.client_properties.update(kwargs.get("client_properties",{}))
+ provided_client_properties = kwargs.get("client_properties")
+ self.client_properties=get_client_properties_with_defaults(provided_client_properties)
+
##
## self.acceptableMechanisms is the list of SASL mechanisms that the client is willing to
## use. If it's None, then any mechanism is acceptable.
diff --git a/python/qpid/messaging/driver.py b/python/qpid/messaging/driver.py
index 3cb62d75c9..2bd638f327 100644
--- a/python/qpid/messaging/driver.py
+++ b/python/qpid/messaging/driver.py
@@ -31,7 +31,7 @@ from qpid.messaging.exceptions import *
from qpid.messaging.message import get_codec, Disposition, Message
from qpid.ops import *
from qpid.selector import Selector
-from qpid.util import URL, default
+from qpid.util import URL, default,get_client_properties_with_defaults
from qpid.validator import And, Context, List, Map, Types, Values
from threading import Condition, Thread
@@ -90,20 +90,6 @@ SUBJECT_DEFAULTS = {
"topic": "#"
}
-# XXX
-ppid = 0
-try:
- ppid = os.getppid()
-except:
- pass
-
-CLIENT_PROPERTIES = {"product": "qpid python client",
- "version": "development",
- "platform": os.name,
- "qpid.client_process": os.path.basename(sys.argv[0]),
- "qpid.client_pid": os.getpid(),
- "qpid.client_ppid": ppid}
-
def noop(): pass
def sync_noop(): pass
@@ -710,8 +696,7 @@ class Engine:
except sasl.SASLError, e:
raise AuthenticationFailure(text=str(e))
- client_properties = CLIENT_PROPERTIES.copy()
- client_properties.update(self.connection.client_properties)
+ client_properties = get_client_properties_with_defaults(provided_client_properties=self.connection.client_properties);
self.write_op(ConnectionStartOk(client_properties=client_properties,
mechanism=mech, response=initial))
diff --git a/python/qpid/messaging/endpoints.py b/python/qpid/messaging/endpoints.py
index e632c0c5b8..95ff5516d0 100644
--- a/python/qpid/messaging/endpoints.py
+++ b/python/qpid/messaging/endpoints.py
@@ -871,7 +871,7 @@ class Sender(Endpoint):
self.queued += 1
if sync:
- self.sync()
+ self.sync(timeout=timeout)
assert message not in self.session.outgoing
else:
self._wakeup()
diff --git a/python/qpid/messaging/transports.py b/python/qpid/messaging/transports.py
index 532c365884..e901e98258 100644
--- a/python/qpid/messaging/transports.py
+++ b/python/qpid/messaging/transports.py
@@ -55,7 +55,41 @@ try:
from ssl import wrap_socket, SSLError, SSL_ERROR_WANT_READ, \
SSL_ERROR_WANT_WRITE
except ImportError:
- pass
+
+ ## try the older python SSL api:
+ from socket import ssl
+
+ class old_ssl(SocketTransport):
+ def __init__(self, conn, host, port):
+ SocketTransport.__init__(self, conn, host, port)
+ # Bug (QPID-4337): this is the "old" version of python SSL.
+ # The private key is required. If a certificate is given, but no
+ # keyfile, assume the key is contained in the certificate
+ ssl_keyfile = conn.ssl_keyfile
+ ssl_certfile = conn.ssl_certfile
+ if ssl_certfile and not ssl_keyfile:
+ ssl_keyfile = ssl_certfile
+ self.ssl = ssl(self.socket, keyfile=ssl_keyfile, certfile=ssl_certfile)
+ self.socket.setblocking(1)
+
+ def reading(self, reading):
+ return reading
+
+ def writing(self, writing):
+ return writing
+
+ def recv(self, n):
+ return self.ssl.read(n)
+
+ def send(self, s):
+ return self.ssl.write(s)
+
+ def close(self):
+ self.socket.close()
+
+ TRANSPORTS["ssl"] = old_ssl
+ TRANSPORTS["tcp+tls"] = old_ssl
+
else:
class tls(SocketTransport):
diff --git a/python/qpid/testlib.py b/python/qpid/testlib.py
index f9796982f5..2b283f3998 100644
--- a/python/qpid/testlib.py
+++ b/python/qpid/testlib.py
@@ -73,7 +73,7 @@ class TestBase(unittest.TestCase):
else:
self.client.close()
- def connect(self, host=None, port=None, user=None, password=None, tune_params=None):
+ def connect(self, host=None, port=None, user=None, password=None, tune_params=None, client_properties=None):
"""Create a new connction, return the Client object"""
host = host or self.config.broker.host
port = port or self.config.broker.port or 5672
@@ -82,9 +82,9 @@ class TestBase(unittest.TestCase):
client = qpid.client.Client(host, port)
try:
if client.spec.major == 8 and client.spec.minor == 0:
- client.start({"LOGIN": user, "PASSWORD": password}, tune_params=tune_params)
+ client.start({"LOGIN": user, "PASSWORD": password}, tune_params=tune_params, client_properties=client_properties)
else:
- client.start("\x00" + user + "\x00" + password, mechanism="PLAIN", tune_params=tune_params)
+ client.start("\x00" + user + "\x00" + password, mechanism="PLAIN", tune_params=tune_params, client_properties=client_properties)
except qpid.client.Closed, e:
if isinstance(e.args[0], VersionError):
raise Skipped(e.args[0])
diff --git a/python/qpid/tests/__init__.py b/python/qpid/tests/__init__.py
index 101a0c3759..dc9988515e 100644
--- a/python/qpid/tests/__init__.py
+++ b/python/qpid/tests/__init__.py
@@ -37,6 +37,7 @@ import qpid.tests.datatypes
import qpid.tests.connection
import qpid.tests.spec010
import qpid.tests.codec010
+import qpid.tests.util
class TestTestsXXX(Test):
diff --git a/python/qpid/tests/util.py b/python/qpid/tests/util.py
new file mode 100644
index 0000000000..9777443720
--- /dev/null
+++ b/python/qpid/tests/util.py
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from unittest import TestCase
+from qpid.util import get_client_properties_with_defaults
+
+class UtilTest (TestCase):
+
+ def test_get_spec_recommended_client_properties(self):
+ client_properties = get_client_properties_with_defaults(provided_client_properties={"mykey":"myvalue"})
+ self.assertTrue("product" in client_properties)
+ self.assertTrue("version" in client_properties)
+ self.assertTrue("platform" in client_properties)
+
+ def test_get_client_properties_with_provided_value(self):
+ client_properties = get_client_properties_with_defaults(provided_client_properties={"mykey":"myvalue"})
+ self.assertTrue("product" in client_properties)
+ self.assertTrue("mykey" in client_properties)
+ self.assertEqual("myvalue", client_properties["mykey"])
+
+ def test_get_client_properties_with_no_provided_values(self):
+ client_properties = get_client_properties_with_defaults(provided_client_properties=None)
+ self.assertTrue("product" in client_properties)
+
+ client_properties = get_client_properties_with_defaults()
+ self.assertTrue("product" in client_properties)
+
+ def test_get_client_properties_with_provided_value_that_overrides_default(self):
+ client_properties = get_client_properties_with_defaults(provided_client_properties={"version":"myversion"})
+ self.assertEqual("myversion", client_properties["version"])
+
diff --git a/python/qpid/util.py b/python/qpid/util.py
index 39ad1d830e..8da17ce0c6 100644
--- a/python/qpid/util.py
+++ b/python/qpid/util.py
@@ -17,15 +17,19 @@
# under the License.
#
-import os, socket, time, textwrap, re
+import os, socket, time, textwrap, re, sys
try:
from ssl import wrap_socket as ssl
except ImportError:
from socket import ssl as wrap_socket
class ssl:
-
def __init__(self, sock, keyfile=None, certfile=None, trustfile=None):
+ # Bug (QPID-4337): this is the "old" version of python SSL.
+ # The private key is required. If a certificate is given, but no
+ # keyfile, assume the key is contained in the certificate
+ if certfile and not keyfile:
+ keyfile = certfile
self.sock = sock
self.ssl = wrap_socket(sock, keyfile=keyfile, certfile=certfile)
@@ -38,6 +42,24 @@ except ImportError:
def close(self):
self.sock.close()
+def get_client_properties_with_defaults(provided_client_properties={}):
+ ppid = 0
+ try:
+ ppid = os.getppid()
+ except:
+ pass
+
+ client_properties = {"product": "qpid python client",
+ "version": "development",
+ "platform": os.name,
+ "qpid.client_process": os.path.basename(sys.argv[0]),
+ "qpid.client_pid": os.getpid(),
+ "qpid.client_ppid": ppid}
+
+ if provided_client_properties:
+ client_properties.update(provided_client_properties)
+ return client_properties
+
def connect(host, port):
for res in socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM):
af, socktype, proto, canonname, sa = res
diff --git a/python/setup.py b/python/setup.py
index 0b9d99a1af..56af530b43 100755
--- a/python/setup.py
+++ b/python/setup.py
@@ -298,7 +298,7 @@ class install_lib(_install_lib):
return outfiles + extra
setup(name="qpid-python",
- version="0.19",
+ version="0.21",
author="Apache Qpid",
author_email="dev@qpid.apache.org",
packages=["mllib", "qpid", "qpid.messaging", "qpid.tests",