diff options
author | Rafael H. Schloming <rhs@apache.org> | 2010-02-18 20:22:23 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2010-02-18 20:22:23 +0000 |
commit | 1d2ab4dbafd09fd0ae959b48810c2ef93f8d7af4 (patch) | |
tree | 62518b17d00a16d0d253922af7916987a2c5df42 | |
parent | 6dc4db12c7055ff40d43ed020a847517cd56033f (diff) | |
download | qpid-python-1d2ab4dbafd09fd0ae959b48810c2ef93f8d7af4.tar.gz |
split messaging.py into multiple files and made qpid.messaging a package
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@911550 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/python/qpid/messaging/__init__.py | 35 | ||||
-rw-r--r-- | qpid/python/qpid/messaging/address.py (renamed from qpid/python/qpid/address.py) | 4 | ||||
-rw-r--r-- | qpid/python/qpid/messaging/constants.py | 32 | ||||
-rw-r--r-- | qpid/python/qpid/messaging/driver.py (renamed from qpid/python/qpid/driver.py) | 28 | ||||
-rw-r--r-- | qpid/python/qpid/messaging/endpoints.py (renamed from qpid/python/qpid/messaging.py) | 199 | ||||
-rw-r--r-- | qpid/python/qpid/messaging/exceptions.py | 67 | ||||
-rw-r--r-- | qpid/python/qpid/messaging/message.py | 141 | ||||
-rw-r--r-- | qpid/python/qpid/tests/__init__.py | 32 | ||||
-rw-r--r-- | qpid/python/qpid/tests/messaging/__init__.py | 106 | ||||
-rw-r--r-- | qpid/python/qpid/tests/messaging/address.py (renamed from qpid/python/qpid/tests/address.py) | 4 | ||||
-rw-r--r-- | qpid/python/qpid/tests/messaging/endpoints.py (renamed from qpid/python/qpid/tests/messaging.py) | 205 | ||||
-rw-r--r-- | qpid/python/qpid/tests/messaging/message.py | 116 |
12 files changed, 559 insertions, 410 deletions
diff --git a/qpid/python/qpid/messaging/__init__.py b/qpid/python/qpid/messaging/__init__.py new file mode 100644 index 0000000000..f9ddda2e80 --- /dev/null +++ b/qpid/python/qpid/messaging/__init__.py @@ -0,0 +1,35 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +A candidate high level messaging API for python. + +Areas that still need work: + + - definition of the arguments for L{Session.sender} and L{Session.receiver} + - standard L{Message} properties + - L{Message} content encoding + - protocol negotiation/multiprotocol impl +""" + +from qpid.datatypes import timestamp, uuid4, Serial +from qpid.messaging.constants import * +from qpid.messaging.endpoints import * +from qpid.messaging.exceptions import * +from qpid.messaging.message import * diff --git a/qpid/python/qpid/address.py b/qpid/python/qpid/messaging/address.py index ab0fe8221a..bf494433e4 100644 --- a/qpid/python/qpid/address.py +++ b/qpid/python/qpid/messaging/address.py @@ -17,8 +17,8 @@ # under the License. # import re -from lexer import Lexicon, LexError -from parser import Parser, ParseError +from qpid.lexer import Lexicon, LexError +from qpid.parser import Parser, ParseError l = Lexicon() diff --git a/qpid/python/qpid/messaging/constants.py b/qpid/python/qpid/messaging/constants.py new file mode 100644 index 0000000000..cad47bd52a --- /dev/null +++ b/qpid/python/qpid/messaging/constants.py @@ -0,0 +1,32 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +class Constant: + + def __init__(self, name, value=None): + self.name = name + self.value = value + + def __repr__(self): + return self.name + +AMQP_PORT = 5672 +AMQPS_PORT = 5671 + +UNLIMITED = Constant("UNLIMITED", 0xFFFFFFFFL) diff --git a/qpid/python/qpid/driver.py b/qpid/python/qpid/messaging/driver.py index ed6fbc3b6a..9d58616804 100644 --- a/qpid/python/qpid/driver.py +++ b/qpid/python/qpid/messaging/driver.py @@ -17,18 +17,25 @@ # under the License. # -import address, compat, connection, sasl, socket, struct, sys, time -from concurrency import synchronized -from datatypes import RangedSet, Serial -from exceptions import Timeout, VersionError -from framing import OpEncoder, SegmentEncoder, FrameEncoder, FrameDecoder, SegmentDecoder, OpDecoder +import socket, struct, sys, time from logging import getLogger -from messaging import get_codec, ConnectError, Message, Pattern, UNLIMITED -from ops import * -from selector import Selector +from qpid import compat +from qpid import sasl +from qpid.concurrency import synchronized +from qpid.datatypes import RangedSet, Serial +from qpid.exceptions import Timeout, VersionError +from qpid.framing import OpEncoder, SegmentEncoder, FrameEncoder, \ + FrameDecoder, SegmentDecoder, OpDecoder +from qpid.messaging import address +from qpid.messaging.constants import UNLIMITED +from qpid.messaging.endpoints import Pattern +from qpid.messaging.exceptions import ConnectError +from qpid.messaging.message import get_codec, Message +from qpid.ops import * +from qpid.selector import Selector +from qpid.util import connect +from qpid.validator import And, Context, Map, Types, Values from threading import Condition, Thread -from util import connect -from validator import And, Context, Map, Types, Values log = getLogger("qpid.messaging") rawlog = getLogger("qpid.messaging.io.raw") @@ -190,7 +197,6 @@ class LinkIn: raise Exception("can't supply both subject and filter") elif _rcv.subject: # XXX - from messaging import Pattern f = Pattern(_rcv.subject) else: f = filter diff --git a/qpid/python/qpid/messaging.py b/qpid/python/qpid/messaging/endpoints.py index ed67879a4b..2337986ecb 100644 --- a/qpid/python/qpid/messaging.py +++ b/qpid/python/qpid/messaging/endpoints.py @@ -28,45 +28,21 @@ Areas that still need work: - protocol negotiation/multiprotocol impl """ -from codec010 import StringCodec -from concurrency import synchronized, Waiter, Condition -from datatypes import timestamp, uuid4, Serial from logging import getLogger -from ops import PRIMITIVE +from qpid.codec010 import StringCodec +from qpid.concurrency import synchronized, Waiter, Condition +from qpid.datatypes import Serial, uuid4 +from qpid.messaging.constants import * +from qpid.messaging.exceptions import * +from qpid.messaging.message import * +from qpid.ops import PRIMITIVE +from qpid.util import default from threading import Thread, RLock -from util import default log = getLogger("qpid.messaging") static = staticmethod -AMQP_PORT = 5672 -AMQPS_PORT = 5671 - -class Constant: - - def __init__(self, name, value=None): - self.name = name - self.value = value - - def __repr__(self): - return self.name - -UNLIMITED = Constant("UNLIMITED", 0xFFFFFFFFL) - -class ConnectionError(Exception): - """ - The base class for all connection related exceptions. - """ - pass - -class ConnectError(ConnectionError): - """ - Exception raised when there is an error connecting to the remote - peer. - """ - pass - class Connection: """ @@ -230,26 +206,6 @@ class Pattern: sst.write_cmd(ExchangeBind(exchange=exchange, queue=queue, binding_key=self.value.replace("*", "#"))) -class SessionError(Exception): - pass - -class Disconnected(SessionError): - """ - Exception raised when an operation is attempted that is illegal when - disconnected. - """ - pass - -class NontransactionalSession(SessionError): - """ - Exception raised when commit or rollback is attempted on a non - transactional session. - """ - pass - -class TransactionAborted(SessionError): - pass - class Session: """ @@ -639,12 +595,6 @@ class Session: self._ewait(lambda: self.closed) self.connection._remove_session(self) -class SendError(SessionError): - pass - -class InsufficientCapacity(SendError): - pass - class Sender: """ @@ -757,16 +707,6 @@ class Sender: finally: self.session.senders.remove(self) -class ReceiveError(SessionError): - pass - -class Empty(ReceiveError): - """ - Exception raised by L{Receiver.fetch} when there is no message - available within the alloted time. - """ - pass - class Receiver(object): """ @@ -889,125 +829,4 @@ class Receiver(object): finally: self.session.receivers.remove(self) -def codec(name): - type = PRIMITIVE[name] - - def encode(x): - sc = StringCodec() - sc.write_primitive(type, x) - return sc.encoded - - def decode(x): - sc = StringCodec(x) - return sc.read_primitive(type) - - return encode, decode - -# XXX: need to correctly parse the mime type and deal with -# content-encoding header - -TYPE_MAPPINGS={ - dict: "amqp/map", - list: "amqp/list", - unicode: "text/plain; charset=utf8", - unicode: "text/plain", - buffer: None, - str: None, - None.__class__: None - } - -TYPE_CODEC={ - "amqp/map": codec("map"), - "amqp/list": codec("list"), - "text/plain; charset=utf8": (lambda x: x.encode("utf8"), lambda x: x.decode("utf8")), - "text/plain": (lambda x: x.encode("utf8"), lambda x: x.decode("utf8")), - "": (lambda x: x, lambda x: x), - None: (lambda x: x, lambda x: x) - } - -def get_type(content): - return TYPE_MAPPINGS[content.__class__] - -def get_codec(content_type): - return TYPE_CODEC[content_type] - -UNSPECIFIED = object() - -class Message: - - """ - A message consists of a standard set of fields, an application - defined set of properties, and some content. - - @type id: str - @ivar id: the message id - @type user_id: str - @ivar user_id: the user-id of the message producer - @type to: str - @ivar to: the destination address - @type reply_to: str - @ivar reply_to: the address to send replies - @type correlation_id: str - @ivar correlation_id: a correlation-id for the message - @type properties: dict - @ivar properties: application specific message properties - @type content_type: str - @ivar content_type: the content-type of the message - @type content: str, unicode, buffer, dict, list - @ivar content: the message content - """ - - def __init__(self, content=None, content_type=UNSPECIFIED, id=None, - subject=None, to=None, user_id=None, reply_to=None, - correlation_id=None, durable=None, properties=None): - """ - Construct a new message with the supplied content. The - content-type of the message will be automatically inferred from - type of the content parameter. - - @type content: str, unicode, buffer, dict, list - @param content: the message content - - @type content_type: str - @param content_type: the content-type of the message - """ - self.id = id - self.subject = subject - self.to = to - self.user_id = user_id - self.reply_to = reply_to - self.correlation_id = correlation_id - self.durable = durable - self.redelivered = False - if properties is None: - self.properties = {} - else: - self.properties = properties - if content_type is UNSPECIFIED: - self.content_type = get_type(content) - else: - self.content_type = content_type - self.content = content - - def __repr__(self): - args = [] - for name in ["id", "subject", "to", "user_id", "reply_to", - "correlation_id"]: - value = self.__dict__[name] - if value is not None: args.append("%s=%r" % (name, value)) - for name in ["durable", "properties"]: - value = self.__dict__[name] - if value: args.append("%s=%r" % (name, value)) - if self.content_type != get_type(self.content): - args.append("content_type=%r" % self.content_type) - if self.content is not None: - if args: - args.append("content=%r" % self.content) - else: - args.append(repr(self.content)) - return "Message(%s)" % ", ".join(args) - -__all__ = ["Connection", "Session", "Sender", "Receiver", "Pattern", "Message", - "ConnectionError", "ConnectError", "SessionError", "Disconnected", - "SendError", "InsufficientCapacity", "ReceiveError", "Empty", - "timestamp", "uuid4", "UNLIMITED", "AMQP_PORT", "AMQPS_PORT"] +__all__ = ["Connection", "Session", "Sender", "Receiver"] diff --git a/qpid/python/qpid/messaging/exceptions.py b/qpid/python/qpid/messaging/exceptions.py new file mode 100644 index 0000000000..5c8bdedc26 --- /dev/null +++ b/qpid/python/qpid/messaging/exceptions.py @@ -0,0 +1,67 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +class ConnectionError(Exception): + """ + The base class for all connection related exceptions. + """ + pass + +class ConnectError(ConnectionError): + """ + Exception raised when there is an error connecting to the remote + peer. + """ + pass + +class SessionError(Exception): + pass + +class Disconnected(SessionError): + """ + Exception raised when an operation is attempted that is illegal when + disconnected. + """ + pass + +class NontransactionalSession(SessionError): + """ + Exception raised when commit or rollback is attempted on a non + transactional session. + """ + pass + +class TransactionAborted(SessionError): + pass + +class SendError(SessionError): + pass + +class InsufficientCapacity(SendError): + pass + +class ReceiveError(SessionError): + pass + +class Empty(ReceiveError): + """ + Exception raised by L{Receiver.fetch} when there is no message + available within the alloted time. + """ + pass diff --git a/qpid/python/qpid/messaging/message.py b/qpid/python/qpid/messaging/message.py new file mode 100644 index 0000000000..1c7c7beb81 --- /dev/null +++ b/qpid/python/qpid/messaging/message.py @@ -0,0 +1,141 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from qpid.codec010 import StringCodec +from qpid.ops import PRIMITIVE + +def codec(name): + type = PRIMITIVE[name] + + def encode(x): + sc = StringCodec() + sc.write_primitive(type, x) + return sc.encoded + + def decode(x): + sc = StringCodec(x) + return sc.read_primitive(type) + + return encode, decode + +# XXX: need to correctly parse the mime type and deal with +# content-encoding header + +TYPE_MAPPINGS={ + dict: "amqp/map", + list: "amqp/list", + unicode: "text/plain; charset=utf8", + unicode: "text/plain", + buffer: None, + str: None, + None.__class__: None + } + +TYPE_CODEC={ + "amqp/map": codec("map"), + "amqp/list": codec("list"), + "text/plain; charset=utf8": (lambda x: x.encode("utf8"), lambda x: x.decode("utf8")), + "text/plain": (lambda x: x.encode("utf8"), lambda x: x.decode("utf8")), + "": (lambda x: x, lambda x: x), + None: (lambda x: x, lambda x: x) + } + +def get_type(content): + return TYPE_MAPPINGS[content.__class__] + +def get_codec(content_type): + return TYPE_CODEC[content_type] + +UNSPECIFIED = object() + +class Message: + + """ + A message consists of a standard set of fields, an application + defined set of properties, and some content. + + @type id: str + @ivar id: the message id + @type user_id: str + @ivar user_id: the user-id of the message producer + @type to: str + @ivar to: the destination address + @type reply_to: str + @ivar reply_to: the address to send replies + @type correlation_id: str + @ivar correlation_id: a correlation-id for the message + @type properties: dict + @ivar properties: application specific message properties + @type content_type: str + @ivar content_type: the content-type of the message + @type content: str, unicode, buffer, dict, list + @ivar content: the message content + """ + + def __init__(self, content=None, content_type=UNSPECIFIED, id=None, + subject=None, to=None, user_id=None, reply_to=None, + correlation_id=None, durable=None, properties=None): + """ + Construct a new message with the supplied content. The + content-type of the message will be automatically inferred from + type of the content parameter. + + @type content: str, unicode, buffer, dict, list + @param content: the message content + + @type content_type: str + @param content_type: the content-type of the message + """ + self.id = id + self.subject = subject + self.to = to + self.user_id = user_id + self.reply_to = reply_to + self.correlation_id = correlation_id + self.durable = durable + self.redelivered = False + if properties is None: + self.properties = {} + else: + self.properties = properties + if content_type is UNSPECIFIED: + self.content_type = get_type(content) + else: + self.content_type = content_type + self.content = content + + def __repr__(self): + args = [] + for name in ["id", "subject", "to", "user_id", "reply_to", + "correlation_id"]: + value = self.__dict__[name] + if value is not None: args.append("%s=%r" % (name, value)) + for name in ["durable", "properties"]: + value = self.__dict__[name] + if value: args.append("%s=%r" % (name, value)) + if self.content_type != get_type(self.content): + args.append("content_type=%r" % self.content_type) + if self.content is not None: + if args: + args.append("content=%r" % self.content) + else: + args.append(repr(self.content)) + return "Message(%s)" % ", ".join(args) + +__all__ = ["Message"] diff --git a/qpid/python/qpid/tests/__init__.py b/qpid/python/qpid/tests/__init__.py index 039214ca42..101a0c3759 100644 --- a/qpid/python/qpid/tests/__init__.py +++ b/qpid/python/qpid/tests/__init__.py @@ -26,7 +26,35 @@ class Test: self.config = config # API Tests -import address, framing, mimetype, messaging +import qpid.tests.framing +import qpid.tests.mimetype +import qpid.tests.messaging # Legacy Tests -import codec, queue, datatypes, connection, spec010, codec010 +import qpid.tests.codec +import qpid.tests.queue +import qpid.tests.datatypes +import qpid.tests.connection +import qpid.tests.spec010 +import qpid.tests.codec010 + +class TestTestsXXX(Test): + + def testFoo(self): + print "this test has output" + + def testBar(self): + print "this test "*8 + print "has"*10 + print "a"*75 + print "lot of"*10 + print "output"*10 + + def testQux(self): + import sys + sys.stdout.write("this test has output with no newline") + + def testQuxFail(self): + import sys + sys.stdout.write("this test has output with no newline") + fdsa diff --git a/qpid/python/qpid/tests/messaging/__init__.py b/qpid/python/qpid/tests/messaging/__init__.py new file mode 100644 index 0000000000..6785614f41 --- /dev/null +++ b/qpid/python/qpid/tests/messaging/__init__.py @@ -0,0 +1,106 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import time +from qpid.messaging import * +from qpid.tests import Test + +class Base(Test): + + def setup_connection(self): + return None + + def setup_session(self): + return None + + def setup_sender(self): + return None + + def setup_receiver(self): + return None + + def setup(self): + self.test_id = uuid4() + self.broker = self.config.broker + try: + self.conn = self.setup_connection() + except ConnectError, e: + raise Skipped(e) + self.ssn = self.setup_session() + self.snd = self.setup_sender() + if self.snd is not None: + self.snd.durable = self.durable() + self.rcv = self.setup_receiver() + + def teardown(self): + if self.conn is not None and self.conn.connected(): + self.conn.close() + + def content(self, base, count = None): + if count is None: + return "%s[%s]" % (base, self.test_id) + else: + return "%s[%s, %s]" % (base, count, self.test_id) + + def ping(self, ssn): + PING_Q = 'ping-queue; {create: always, delete: always}' + # send a message + sender = ssn.sender(PING_Q, durable=self.durable()) + content = self.content("ping") + sender.send(content) + receiver = ssn.receiver(PING_Q) + msg = receiver.fetch(0) + ssn.acknowledge() + assert msg.content == content, "expected %r, got %r" % (content, msg.content) + + def drain(self, rcv, limit=None, timeout=0, expected=None): + contents = [] + try: + while limit is None or len(contents) < limit: + contents.append(rcv.fetch(timeout=timeout).content) + except Empty: + pass + if expected is not None: + assert expected == contents, "expected %s, got %s" % (expected, contents) + return contents + + def assertEmpty(self, rcv): + contents = self.drain(rcv) + assert len(contents) == 0, "%s is supposed to be empty: %s" % (rcv, contents) + + def assertPending(self, rcv, expected): + p = rcv.pending() + assert p == expected, "expected %s, got %s" % (expected, p) + + def sleep(self): + time.sleep(self.delay()) + + def delay(self): + return float(self.config.defines.get("delay", "2")) + + def get_bool(self, name): + return self.config.defines.get(name, "false").lower() in ("true", "yes", "1") + + def durable(self): + return self.get_bool("durable") + + def reconnect(self): + return self.get_bool("reconnect") + +import address, endpoints, message diff --git a/qpid/python/qpid/tests/address.py b/qpid/python/qpid/tests/messaging/address.py index 7e6c6a5ee5..7adbc0c6f7 100644 --- a/qpid/python/qpid/tests/address.py +++ b/qpid/python/qpid/tests/messaging/address.py @@ -19,11 +19,11 @@ from qpid.tests import Test -from qpid.address import lex, parse, ParseError, EOF, ID, NUMBER, SYM, WSPACE, \ +from qpid.messaging.address import lex, parse, ParseError, EOF, ID, NUMBER, SYM, WSPACE, \ LEXER from qpid.lexer import Token from qpid.harness import Skipped -from parser import ParserBase +from qpid.tests.parser import ParserBase def indent(st): return " " + st.replace("\n", "\n ") diff --git a/qpid/python/qpid/tests/messaging.py b/qpid/python/qpid/tests/messaging/endpoints.py index 125f1b7157..2e70f13f3a 100644 --- a/qpid/python/qpid/tests/messaging.py +++ b/qpid/python/qpid/tests/messaging/endpoints.py @@ -22,94 +22,9 @@ import time from qpid import compat -from qpid.tests import Test from qpid.harness import Skipped -from qpid.messaging import Connection, ConnectError, Disconnected, Empty, \ - InsufficientCapacity, Message, ReceiveError, SendError, SessionError, \ - UNLIMITED, uuid4 -from Queue import Queue, Empty as QueueEmpty - -class Base(Test): - - def setup_connection(self): - return None - - def setup_session(self): - return None - - def setup_sender(self): - return None - - def setup_receiver(self): - return None - - def setup(self): - self.test_id = uuid4() - self.broker = self.config.broker - try: - self.conn = self.setup_connection() - except ConnectError, e: - raise Skipped(e) - self.ssn = self.setup_session() - self.snd = self.setup_sender() - if self.snd is not None: - self.snd.durable = self.durable() - self.rcv = self.setup_receiver() - - def teardown(self): - if self.conn is not None and self.conn.connected(): - self.conn.close() - - def content(self, base, count = None): - if count is None: - return "%s[%s]" % (base, self.test_id) - else: - return "%s[%s, %s]" % (base, count, self.test_id) - - def ping(self, ssn): - PING_Q = 'ping-queue; {create: always, delete: always}' - # send a message - sender = ssn.sender(PING_Q, durable=self.durable()) - content = self.content("ping") - sender.send(content) - receiver = ssn.receiver(PING_Q) - msg = receiver.fetch(0) - ssn.acknowledge() - assert msg.content == content, "expected %r, got %r" % (content, msg.content) - - def drain(self, rcv, limit=None, timeout=0, expected=None): - contents = [] - try: - while limit is None or len(contents) < limit: - contents.append(rcv.fetch(timeout=timeout).content) - except Empty: - pass - if expected is not None: - assert expected == contents, "expected %s, got %s" % (expected, contents) - return contents - - def assertEmpty(self, rcv): - contents = self.drain(rcv) - assert len(contents) == 0, "%s is supposed to be empty: %s" % (rcv, contents) - - def assertPending(self, rcv, expected): - p = rcv.pending() - assert p == expected, "expected %s, got %s" % (expected, p) - - def sleep(self): - time.sleep(self.delay()) - - def delay(self): - return float(self.config.defines.get("delay", "2")) - - def get_bool(self, name): - return self.config.defines.get(name, "false").lower() in ("true", "yes", "1") - - def durable(self): - return self.get_bool("durable") - - def reconnect(self): - return self.get_bool("reconnect") +from qpid.messaging import * +from qpid.tests.messaging import Base class SetupTests(Base): @@ -961,119 +876,3 @@ class SenderTests(Base): self.drain(self.rcv, expected=msgs) self.ssn.acknowledge() assert caught, "did not exceed capacity" - -class MessageTests(Base): - - def testCreateString(self): - m = Message("string") - assert m.content == "string" - assert m.content_type is None - - def testCreateUnicode(self): - m = Message(u"unicode") - assert m.content == u"unicode" - assert m.content_type == "text/plain" - - def testCreateMap(self): - m = Message({}) - assert m.content == {} - assert m.content_type == "amqp/map" - - def testCreateList(self): - m = Message([]) - assert m.content == [] - assert m.content_type == "amqp/list" - - def testContentTypeOverride(self): - m = Message() - m.content_type = "text/html; charset=utf8" - m.content = u"<html/>" - assert m.content_type == "text/html; charset=utf8" - -ECHO_Q = 'test-message-echo-queue; {create: always, delete: always}' - -class MessageEchoTests(Base): - - def setup_connection(self): - return Connection.open(self.broker.host, self.broker.port, - reconnect=self.reconnect()) - - def setup_session(self): - return self.conn.session() - - def setup_sender(self): - return self.ssn.sender(ECHO_Q) - - def setup_receiver(self): - return self.ssn.receiver(ECHO_Q) - - def check(self, msg): - self.snd.send(msg) - echo = self.rcv.fetch(0) - - assert msg.id == echo.id - assert msg.subject == echo.subject - assert msg.user_id == echo.user_id - assert msg.to == echo.to - assert msg.reply_to == echo.reply_to - assert msg.correlation_id == echo.correlation_id - assert msg.properties == echo.properties - assert msg.content_type == echo.content_type - assert msg.content == echo.content, "%s, %s" % (msg, echo) - - self.ssn.acknowledge(echo) - - def testStringContent(self): - self.check(Message("string")) - - def testUnicodeContent(self): - self.check(Message(u"unicode")) - - - TEST_MAP = {"key1": "string", - "key2": u"unicode", - "key3": 3, - "key4": -3, - "key5": 3.14, - "key6": -3.14, - "key7": ["one", 2, 3.14], - "key8": [], - "key9": {"sub-key0": 3}} - - def testMapContent(self): - self.check(Message(MessageEchoTests.TEST_MAP)) - - def testListContent(self): - self.check(Message([])) - self.check(Message([1, 2, 3])) - self.check(Message(["one", 2, 3.14, {"four": 4}])) - - def testProperties(self): - msg = Message() - msg.to = "to-address" - msg.subject = "subject" - msg.correlation_id = str(self.test_id) - msg.properties = MessageEchoTests.TEST_MAP - msg.reply_to = "reply-address" - self.check(msg) - -class TestTestsXXX(Test): - - def testFoo(self): - print "this test has output" - - def testBar(self): - print "this test "*8 - print "has"*10 - print "a"*75 - print "lot of"*10 - print "output"*10 - - def testQux(self): - import sys - sys.stdout.write("this test has output with no newline") - - def testQuxFail(self): - import sys - sys.stdout.write("this test has output with no newline") - fdsa diff --git a/qpid/python/qpid/tests/messaging/message.py b/qpid/python/qpid/tests/messaging/message.py new file mode 100644 index 0000000000..ef2ec1aac4 --- /dev/null +++ b/qpid/python/qpid/tests/messaging/message.py @@ -0,0 +1,116 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from qpid.messaging import * +from qpid.tests.messaging import Base + +class MessageTests(Base): + + def testCreateString(self): + m = Message("string") + assert m.content == "string" + assert m.content_type is None + + def testCreateUnicode(self): + m = Message(u"unicode") + assert m.content == u"unicode" + assert m.content_type == "text/plain" + + def testCreateMap(self): + m = Message({}) + assert m.content == {} + assert m.content_type == "amqp/map" + + def testCreateList(self): + m = Message([]) + assert m.content == [] + assert m.content_type == "amqp/list" + + def testContentTypeOverride(self): + m = Message() + m.content_type = "text/html; charset=utf8" + m.content = u"<html/>" + assert m.content_type == "text/html; charset=utf8" + +ECHO_Q = 'test-message-echo-queue; {create: always, delete: always}' + +class MessageEchoTests(Base): + + def setup_connection(self): + return Connection.open(self.broker.host, self.broker.port, + reconnect=self.reconnect()) + + def setup_session(self): + return self.conn.session() + + def setup_sender(self): + return self.ssn.sender(ECHO_Q) + + def setup_receiver(self): + return self.ssn.receiver(ECHO_Q) + + def check(self, msg): + self.snd.send(msg) + echo = self.rcv.fetch(0) + + assert msg.id == echo.id + assert msg.subject == echo.subject + assert msg.user_id == echo.user_id + assert msg.to == echo.to + assert msg.reply_to == echo.reply_to + assert msg.correlation_id == echo.correlation_id + assert msg.properties == echo.properties + assert msg.content_type == echo.content_type + assert msg.content == echo.content, "%s, %s" % (msg, echo) + + self.ssn.acknowledge(echo) + + def testStringContent(self): + self.check(Message("string")) + + def testUnicodeContent(self): + self.check(Message(u"unicode")) + + + TEST_MAP = {"key1": "string", + "key2": u"unicode", + "key3": 3, + "key4": -3, + "key5": 3.14, + "key6": -3.14, + "key7": ["one", 2, 3.14], + "key8": [], + "key9": {"sub-key0": 3}} + + def testMapContent(self): + self.check(Message(MessageEchoTests.TEST_MAP)) + + def testListContent(self): + self.check(Message([])) + self.check(Message([1, 2, 3])) + self.check(Message(["one", 2, 3.14, {"four": 4}])) + + def testProperties(self): + msg = Message() + msg.to = "to-address" + msg.subject = "subject" + msg.correlation_id = str(self.test_id) + msg.properties = MessageEchoTests.TEST_MAP + msg.reply_to = "reply-address" + self.check(msg) |