summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2010-02-18 20:22:23 +0000
committerRafael H. Schloming <rhs@apache.org>2010-02-18 20:22:23 +0000
commit1d2ab4dbafd09fd0ae959b48810c2ef93f8d7af4 (patch)
tree62518b17d00a16d0d253922af7916987a2c5df42
parent6dc4db12c7055ff40d43ed020a847517cd56033f (diff)
downloadqpid-python-1d2ab4dbafd09fd0ae959b48810c2ef93f8d7af4.tar.gz
split messaging.py into multiple files and made qpid.messaging a package
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@911550 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/python/qpid/messaging/__init__.py35
-rw-r--r--qpid/python/qpid/messaging/address.py (renamed from qpid/python/qpid/address.py)4
-rw-r--r--qpid/python/qpid/messaging/constants.py32
-rw-r--r--qpid/python/qpid/messaging/driver.py (renamed from qpid/python/qpid/driver.py)28
-rw-r--r--qpid/python/qpid/messaging/endpoints.py (renamed from qpid/python/qpid/messaging.py)199
-rw-r--r--qpid/python/qpid/messaging/exceptions.py67
-rw-r--r--qpid/python/qpid/messaging/message.py141
-rw-r--r--qpid/python/qpid/tests/__init__.py32
-rw-r--r--qpid/python/qpid/tests/messaging/__init__.py106
-rw-r--r--qpid/python/qpid/tests/messaging/address.py (renamed from qpid/python/qpid/tests/address.py)4
-rw-r--r--qpid/python/qpid/tests/messaging/endpoints.py (renamed from qpid/python/qpid/tests/messaging.py)205
-rw-r--r--qpid/python/qpid/tests/messaging/message.py116
12 files changed, 559 insertions, 410 deletions
diff --git a/qpid/python/qpid/messaging/__init__.py b/qpid/python/qpid/messaging/__init__.py
new file mode 100644
index 0000000000..f9ddda2e80
--- /dev/null
+++ b/qpid/python/qpid/messaging/__init__.py
@@ -0,0 +1,35 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+A candidate high level messaging API for python.
+
+Areas that still need work:
+
+ - definition of the arguments for L{Session.sender} and L{Session.receiver}
+ - standard L{Message} properties
+ - L{Message} content encoding
+ - protocol negotiation/multiprotocol impl
+"""
+
+from qpid.datatypes import timestamp, uuid4, Serial
+from qpid.messaging.constants import *
+from qpid.messaging.endpoints import *
+from qpid.messaging.exceptions import *
+from qpid.messaging.message import *
diff --git a/qpid/python/qpid/address.py b/qpid/python/qpid/messaging/address.py
index ab0fe8221a..bf494433e4 100644
--- a/qpid/python/qpid/address.py
+++ b/qpid/python/qpid/messaging/address.py
@@ -17,8 +17,8 @@
# under the License.
#
import re
-from lexer import Lexicon, LexError
-from parser import Parser, ParseError
+from qpid.lexer import Lexicon, LexError
+from qpid.parser import Parser, ParseError
l = Lexicon()
diff --git a/qpid/python/qpid/messaging/constants.py b/qpid/python/qpid/messaging/constants.py
new file mode 100644
index 0000000000..cad47bd52a
--- /dev/null
+++ b/qpid/python/qpid/messaging/constants.py
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+class Constant:
+
+ def __init__(self, name, value=None):
+ self.name = name
+ self.value = value
+
+ def __repr__(self):
+ return self.name
+
+AMQP_PORT = 5672
+AMQPS_PORT = 5671
+
+UNLIMITED = Constant("UNLIMITED", 0xFFFFFFFFL)
diff --git a/qpid/python/qpid/driver.py b/qpid/python/qpid/messaging/driver.py
index ed6fbc3b6a..9d58616804 100644
--- a/qpid/python/qpid/driver.py
+++ b/qpid/python/qpid/messaging/driver.py
@@ -17,18 +17,25 @@
# under the License.
#
-import address, compat, connection, sasl, socket, struct, sys, time
-from concurrency import synchronized
-from datatypes import RangedSet, Serial
-from exceptions import Timeout, VersionError
-from framing import OpEncoder, SegmentEncoder, FrameEncoder, FrameDecoder, SegmentDecoder, OpDecoder
+import socket, struct, sys, time
from logging import getLogger
-from messaging import get_codec, ConnectError, Message, Pattern, UNLIMITED
-from ops import *
-from selector import Selector
+from qpid import compat
+from qpid import sasl
+from qpid.concurrency import synchronized
+from qpid.datatypes import RangedSet, Serial
+from qpid.exceptions import Timeout, VersionError
+from qpid.framing import OpEncoder, SegmentEncoder, FrameEncoder, \
+ FrameDecoder, SegmentDecoder, OpDecoder
+from qpid.messaging import address
+from qpid.messaging.constants import UNLIMITED
+from qpid.messaging.endpoints import Pattern
+from qpid.messaging.exceptions import ConnectError
+from qpid.messaging.message import get_codec, Message
+from qpid.ops import *
+from qpid.selector import Selector
+from qpid.util import connect
+from qpid.validator import And, Context, Map, Types, Values
from threading import Condition, Thread
-from util import connect
-from validator import And, Context, Map, Types, Values
log = getLogger("qpid.messaging")
rawlog = getLogger("qpid.messaging.io.raw")
@@ -190,7 +197,6 @@ class LinkIn:
raise Exception("can't supply both subject and filter")
elif _rcv.subject:
# XXX
- from messaging import Pattern
f = Pattern(_rcv.subject)
else:
f = filter
diff --git a/qpid/python/qpid/messaging.py b/qpid/python/qpid/messaging/endpoints.py
index ed67879a4b..2337986ecb 100644
--- a/qpid/python/qpid/messaging.py
+++ b/qpid/python/qpid/messaging/endpoints.py
@@ -28,45 +28,21 @@ Areas that still need work:
- protocol negotiation/multiprotocol impl
"""
-from codec010 import StringCodec
-from concurrency import synchronized, Waiter, Condition
-from datatypes import timestamp, uuid4, Serial
from logging import getLogger
-from ops import PRIMITIVE
+from qpid.codec010 import StringCodec
+from qpid.concurrency import synchronized, Waiter, Condition
+from qpid.datatypes import Serial, uuid4
+from qpid.messaging.constants import *
+from qpid.messaging.exceptions import *
+from qpid.messaging.message import *
+from qpid.ops import PRIMITIVE
+from qpid.util import default
from threading import Thread, RLock
-from util import default
log = getLogger("qpid.messaging")
static = staticmethod
-AMQP_PORT = 5672
-AMQPS_PORT = 5671
-
-class Constant:
-
- def __init__(self, name, value=None):
- self.name = name
- self.value = value
-
- def __repr__(self):
- return self.name
-
-UNLIMITED = Constant("UNLIMITED", 0xFFFFFFFFL)
-
-class ConnectionError(Exception):
- """
- The base class for all connection related exceptions.
- """
- pass
-
-class ConnectError(ConnectionError):
- """
- Exception raised when there is an error connecting to the remote
- peer.
- """
- pass
-
class Connection:
"""
@@ -230,26 +206,6 @@ class Pattern:
sst.write_cmd(ExchangeBind(exchange=exchange, queue=queue,
binding_key=self.value.replace("*", "#")))
-class SessionError(Exception):
- pass
-
-class Disconnected(SessionError):
- """
- Exception raised when an operation is attempted that is illegal when
- disconnected.
- """
- pass
-
-class NontransactionalSession(SessionError):
- """
- Exception raised when commit or rollback is attempted on a non
- transactional session.
- """
- pass
-
-class TransactionAborted(SessionError):
- pass
-
class Session:
"""
@@ -639,12 +595,6 @@ class Session:
self._ewait(lambda: self.closed)
self.connection._remove_session(self)
-class SendError(SessionError):
- pass
-
-class InsufficientCapacity(SendError):
- pass
-
class Sender:
"""
@@ -757,16 +707,6 @@ class Sender:
finally:
self.session.senders.remove(self)
-class ReceiveError(SessionError):
- pass
-
-class Empty(ReceiveError):
- """
- Exception raised by L{Receiver.fetch} when there is no message
- available within the alloted time.
- """
- pass
-
class Receiver(object):
"""
@@ -889,125 +829,4 @@ class Receiver(object):
finally:
self.session.receivers.remove(self)
-def codec(name):
- type = PRIMITIVE[name]
-
- def encode(x):
- sc = StringCodec()
- sc.write_primitive(type, x)
- return sc.encoded
-
- def decode(x):
- sc = StringCodec(x)
- return sc.read_primitive(type)
-
- return encode, decode
-
-# XXX: need to correctly parse the mime type and deal with
-# content-encoding header
-
-TYPE_MAPPINGS={
- dict: "amqp/map",
- list: "amqp/list",
- unicode: "text/plain; charset=utf8",
- unicode: "text/plain",
- buffer: None,
- str: None,
- None.__class__: None
- }
-
-TYPE_CODEC={
- "amqp/map": codec("map"),
- "amqp/list": codec("list"),
- "text/plain; charset=utf8": (lambda x: x.encode("utf8"), lambda x: x.decode("utf8")),
- "text/plain": (lambda x: x.encode("utf8"), lambda x: x.decode("utf8")),
- "": (lambda x: x, lambda x: x),
- None: (lambda x: x, lambda x: x)
- }
-
-def get_type(content):
- return TYPE_MAPPINGS[content.__class__]
-
-def get_codec(content_type):
- return TYPE_CODEC[content_type]
-
-UNSPECIFIED = object()
-
-class Message:
-
- """
- A message consists of a standard set of fields, an application
- defined set of properties, and some content.
-
- @type id: str
- @ivar id: the message id
- @type user_id: str
- @ivar user_id: the user-id of the message producer
- @type to: str
- @ivar to: the destination address
- @type reply_to: str
- @ivar reply_to: the address to send replies
- @type correlation_id: str
- @ivar correlation_id: a correlation-id for the message
- @type properties: dict
- @ivar properties: application specific message properties
- @type content_type: str
- @ivar content_type: the content-type of the message
- @type content: str, unicode, buffer, dict, list
- @ivar content: the message content
- """
-
- def __init__(self, content=None, content_type=UNSPECIFIED, id=None,
- subject=None, to=None, user_id=None, reply_to=None,
- correlation_id=None, durable=None, properties=None):
- """
- Construct a new message with the supplied content. The
- content-type of the message will be automatically inferred from
- type of the content parameter.
-
- @type content: str, unicode, buffer, dict, list
- @param content: the message content
-
- @type content_type: str
- @param content_type: the content-type of the message
- """
- self.id = id
- self.subject = subject
- self.to = to
- self.user_id = user_id
- self.reply_to = reply_to
- self.correlation_id = correlation_id
- self.durable = durable
- self.redelivered = False
- if properties is None:
- self.properties = {}
- else:
- self.properties = properties
- if content_type is UNSPECIFIED:
- self.content_type = get_type(content)
- else:
- self.content_type = content_type
- self.content = content
-
- def __repr__(self):
- args = []
- for name in ["id", "subject", "to", "user_id", "reply_to",
- "correlation_id"]:
- value = self.__dict__[name]
- if value is not None: args.append("%s=%r" % (name, value))
- for name in ["durable", "properties"]:
- value = self.__dict__[name]
- if value: args.append("%s=%r" % (name, value))
- if self.content_type != get_type(self.content):
- args.append("content_type=%r" % self.content_type)
- if self.content is not None:
- if args:
- args.append("content=%r" % self.content)
- else:
- args.append(repr(self.content))
- return "Message(%s)" % ", ".join(args)
-
-__all__ = ["Connection", "Session", "Sender", "Receiver", "Pattern", "Message",
- "ConnectionError", "ConnectError", "SessionError", "Disconnected",
- "SendError", "InsufficientCapacity", "ReceiveError", "Empty",
- "timestamp", "uuid4", "UNLIMITED", "AMQP_PORT", "AMQPS_PORT"]
+__all__ = ["Connection", "Session", "Sender", "Receiver"]
diff --git a/qpid/python/qpid/messaging/exceptions.py b/qpid/python/qpid/messaging/exceptions.py
new file mode 100644
index 0000000000..5c8bdedc26
--- /dev/null
+++ b/qpid/python/qpid/messaging/exceptions.py
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+class ConnectionError(Exception):
+ """
+ The base class for all connection related exceptions.
+ """
+ pass
+
+class ConnectError(ConnectionError):
+ """
+ Exception raised when there is an error connecting to the remote
+ peer.
+ """
+ pass
+
+class SessionError(Exception):
+ pass
+
+class Disconnected(SessionError):
+ """
+ Exception raised when an operation is attempted that is illegal when
+ disconnected.
+ """
+ pass
+
+class NontransactionalSession(SessionError):
+ """
+ Exception raised when commit or rollback is attempted on a non
+ transactional session.
+ """
+ pass
+
+class TransactionAborted(SessionError):
+ pass
+
+class SendError(SessionError):
+ pass
+
+class InsufficientCapacity(SendError):
+ pass
+
+class ReceiveError(SessionError):
+ pass
+
+class Empty(ReceiveError):
+ """
+ Exception raised by L{Receiver.fetch} when there is no message
+ available within the alloted time.
+ """
+ pass
diff --git a/qpid/python/qpid/messaging/message.py b/qpid/python/qpid/messaging/message.py
new file mode 100644
index 0000000000..1c7c7beb81
--- /dev/null
+++ b/qpid/python/qpid/messaging/message.py
@@ -0,0 +1,141 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpid.codec010 import StringCodec
+from qpid.ops import PRIMITIVE
+
+def codec(name):
+ type = PRIMITIVE[name]
+
+ def encode(x):
+ sc = StringCodec()
+ sc.write_primitive(type, x)
+ return sc.encoded
+
+ def decode(x):
+ sc = StringCodec(x)
+ return sc.read_primitive(type)
+
+ return encode, decode
+
+# XXX: need to correctly parse the mime type and deal with
+# content-encoding header
+
+TYPE_MAPPINGS={
+ dict: "amqp/map",
+ list: "amqp/list",
+ unicode: "text/plain; charset=utf8",
+ unicode: "text/plain",
+ buffer: None,
+ str: None,
+ None.__class__: None
+ }
+
+TYPE_CODEC={
+ "amqp/map": codec("map"),
+ "amqp/list": codec("list"),
+ "text/plain; charset=utf8": (lambda x: x.encode("utf8"), lambda x: x.decode("utf8")),
+ "text/plain": (lambda x: x.encode("utf8"), lambda x: x.decode("utf8")),
+ "": (lambda x: x, lambda x: x),
+ None: (lambda x: x, lambda x: x)
+ }
+
+def get_type(content):
+ return TYPE_MAPPINGS[content.__class__]
+
+def get_codec(content_type):
+ return TYPE_CODEC[content_type]
+
+UNSPECIFIED = object()
+
+class Message:
+
+ """
+ A message consists of a standard set of fields, an application
+ defined set of properties, and some content.
+
+ @type id: str
+ @ivar id: the message id
+ @type user_id: str
+ @ivar user_id: the user-id of the message producer
+ @type to: str
+ @ivar to: the destination address
+ @type reply_to: str
+ @ivar reply_to: the address to send replies
+ @type correlation_id: str
+ @ivar correlation_id: a correlation-id for the message
+ @type properties: dict
+ @ivar properties: application specific message properties
+ @type content_type: str
+ @ivar content_type: the content-type of the message
+ @type content: str, unicode, buffer, dict, list
+ @ivar content: the message content
+ """
+
+ def __init__(self, content=None, content_type=UNSPECIFIED, id=None,
+ subject=None, to=None, user_id=None, reply_to=None,
+ correlation_id=None, durable=None, properties=None):
+ """
+ Construct a new message with the supplied content. The
+ content-type of the message will be automatically inferred from
+ type of the content parameter.
+
+ @type content: str, unicode, buffer, dict, list
+ @param content: the message content
+
+ @type content_type: str
+ @param content_type: the content-type of the message
+ """
+ self.id = id
+ self.subject = subject
+ self.to = to
+ self.user_id = user_id
+ self.reply_to = reply_to
+ self.correlation_id = correlation_id
+ self.durable = durable
+ self.redelivered = False
+ if properties is None:
+ self.properties = {}
+ else:
+ self.properties = properties
+ if content_type is UNSPECIFIED:
+ self.content_type = get_type(content)
+ else:
+ self.content_type = content_type
+ self.content = content
+
+ def __repr__(self):
+ args = []
+ for name in ["id", "subject", "to", "user_id", "reply_to",
+ "correlation_id"]:
+ value = self.__dict__[name]
+ if value is not None: args.append("%s=%r" % (name, value))
+ for name in ["durable", "properties"]:
+ value = self.__dict__[name]
+ if value: args.append("%s=%r" % (name, value))
+ if self.content_type != get_type(self.content):
+ args.append("content_type=%r" % self.content_type)
+ if self.content is not None:
+ if args:
+ args.append("content=%r" % self.content)
+ else:
+ args.append(repr(self.content))
+ return "Message(%s)" % ", ".join(args)
+
+__all__ = ["Message"]
diff --git a/qpid/python/qpid/tests/__init__.py b/qpid/python/qpid/tests/__init__.py
index 039214ca42..101a0c3759 100644
--- a/qpid/python/qpid/tests/__init__.py
+++ b/qpid/python/qpid/tests/__init__.py
@@ -26,7 +26,35 @@ class Test:
self.config = config
# API Tests
-import address, framing, mimetype, messaging
+import qpid.tests.framing
+import qpid.tests.mimetype
+import qpid.tests.messaging
# Legacy Tests
-import codec, queue, datatypes, connection, spec010, codec010
+import qpid.tests.codec
+import qpid.tests.queue
+import qpid.tests.datatypes
+import qpid.tests.connection
+import qpid.tests.spec010
+import qpid.tests.codec010
+
+class TestTestsXXX(Test):
+
+ def testFoo(self):
+ print "this test has output"
+
+ def testBar(self):
+ print "this test "*8
+ print "has"*10
+ print "a"*75
+ print "lot of"*10
+ print "output"*10
+
+ def testQux(self):
+ import sys
+ sys.stdout.write("this test has output with no newline")
+
+ def testQuxFail(self):
+ import sys
+ sys.stdout.write("this test has output with no newline")
+ fdsa
diff --git a/qpid/python/qpid/tests/messaging/__init__.py b/qpid/python/qpid/tests/messaging/__init__.py
new file mode 100644
index 0000000000..6785614f41
--- /dev/null
+++ b/qpid/python/qpid/tests/messaging/__init__.py
@@ -0,0 +1,106 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import time
+from qpid.messaging import *
+from qpid.tests import Test
+
+class Base(Test):
+
+ def setup_connection(self):
+ return None
+
+ def setup_session(self):
+ return None
+
+ def setup_sender(self):
+ return None
+
+ def setup_receiver(self):
+ return None
+
+ def setup(self):
+ self.test_id = uuid4()
+ self.broker = self.config.broker
+ try:
+ self.conn = self.setup_connection()
+ except ConnectError, e:
+ raise Skipped(e)
+ self.ssn = self.setup_session()
+ self.snd = self.setup_sender()
+ if self.snd is not None:
+ self.snd.durable = self.durable()
+ self.rcv = self.setup_receiver()
+
+ def teardown(self):
+ if self.conn is not None and self.conn.connected():
+ self.conn.close()
+
+ def content(self, base, count = None):
+ if count is None:
+ return "%s[%s]" % (base, self.test_id)
+ else:
+ return "%s[%s, %s]" % (base, count, self.test_id)
+
+ def ping(self, ssn):
+ PING_Q = 'ping-queue; {create: always, delete: always}'
+ # send a message
+ sender = ssn.sender(PING_Q, durable=self.durable())
+ content = self.content("ping")
+ sender.send(content)
+ receiver = ssn.receiver(PING_Q)
+ msg = receiver.fetch(0)
+ ssn.acknowledge()
+ assert msg.content == content, "expected %r, got %r" % (content, msg.content)
+
+ def drain(self, rcv, limit=None, timeout=0, expected=None):
+ contents = []
+ try:
+ while limit is None or len(contents) < limit:
+ contents.append(rcv.fetch(timeout=timeout).content)
+ except Empty:
+ pass
+ if expected is not None:
+ assert expected == contents, "expected %s, got %s" % (expected, contents)
+ return contents
+
+ def assertEmpty(self, rcv):
+ contents = self.drain(rcv)
+ assert len(contents) == 0, "%s is supposed to be empty: %s" % (rcv, contents)
+
+ def assertPending(self, rcv, expected):
+ p = rcv.pending()
+ assert p == expected, "expected %s, got %s" % (expected, p)
+
+ def sleep(self):
+ time.sleep(self.delay())
+
+ def delay(self):
+ return float(self.config.defines.get("delay", "2"))
+
+ def get_bool(self, name):
+ return self.config.defines.get(name, "false").lower() in ("true", "yes", "1")
+
+ def durable(self):
+ return self.get_bool("durable")
+
+ def reconnect(self):
+ return self.get_bool("reconnect")
+
+import address, endpoints, message
diff --git a/qpid/python/qpid/tests/address.py b/qpid/python/qpid/tests/messaging/address.py
index 7e6c6a5ee5..7adbc0c6f7 100644
--- a/qpid/python/qpid/tests/address.py
+++ b/qpid/python/qpid/tests/messaging/address.py
@@ -19,11 +19,11 @@
from qpid.tests import Test
-from qpid.address import lex, parse, ParseError, EOF, ID, NUMBER, SYM, WSPACE, \
+from qpid.messaging.address import lex, parse, ParseError, EOF, ID, NUMBER, SYM, WSPACE, \
LEXER
from qpid.lexer import Token
from qpid.harness import Skipped
-from parser import ParserBase
+from qpid.tests.parser import ParserBase
def indent(st):
return " " + st.replace("\n", "\n ")
diff --git a/qpid/python/qpid/tests/messaging.py b/qpid/python/qpid/tests/messaging/endpoints.py
index 125f1b7157..2e70f13f3a 100644
--- a/qpid/python/qpid/tests/messaging.py
+++ b/qpid/python/qpid/tests/messaging/endpoints.py
@@ -22,94 +22,9 @@
import time
from qpid import compat
-from qpid.tests import Test
from qpid.harness import Skipped
-from qpid.messaging import Connection, ConnectError, Disconnected, Empty, \
- InsufficientCapacity, Message, ReceiveError, SendError, SessionError, \
- UNLIMITED, uuid4
-from Queue import Queue, Empty as QueueEmpty
-
-class Base(Test):
-
- def setup_connection(self):
- return None
-
- def setup_session(self):
- return None
-
- def setup_sender(self):
- return None
-
- def setup_receiver(self):
- return None
-
- def setup(self):
- self.test_id = uuid4()
- self.broker = self.config.broker
- try:
- self.conn = self.setup_connection()
- except ConnectError, e:
- raise Skipped(e)
- self.ssn = self.setup_session()
- self.snd = self.setup_sender()
- if self.snd is not None:
- self.snd.durable = self.durable()
- self.rcv = self.setup_receiver()
-
- def teardown(self):
- if self.conn is not None and self.conn.connected():
- self.conn.close()
-
- def content(self, base, count = None):
- if count is None:
- return "%s[%s]" % (base, self.test_id)
- else:
- return "%s[%s, %s]" % (base, count, self.test_id)
-
- def ping(self, ssn):
- PING_Q = 'ping-queue; {create: always, delete: always}'
- # send a message
- sender = ssn.sender(PING_Q, durable=self.durable())
- content = self.content("ping")
- sender.send(content)
- receiver = ssn.receiver(PING_Q)
- msg = receiver.fetch(0)
- ssn.acknowledge()
- assert msg.content == content, "expected %r, got %r" % (content, msg.content)
-
- def drain(self, rcv, limit=None, timeout=0, expected=None):
- contents = []
- try:
- while limit is None or len(contents) < limit:
- contents.append(rcv.fetch(timeout=timeout).content)
- except Empty:
- pass
- if expected is not None:
- assert expected == contents, "expected %s, got %s" % (expected, contents)
- return contents
-
- def assertEmpty(self, rcv):
- contents = self.drain(rcv)
- assert len(contents) == 0, "%s is supposed to be empty: %s" % (rcv, contents)
-
- def assertPending(self, rcv, expected):
- p = rcv.pending()
- assert p == expected, "expected %s, got %s" % (expected, p)
-
- def sleep(self):
- time.sleep(self.delay())
-
- def delay(self):
- return float(self.config.defines.get("delay", "2"))
-
- def get_bool(self, name):
- return self.config.defines.get(name, "false").lower() in ("true", "yes", "1")
-
- def durable(self):
- return self.get_bool("durable")
-
- def reconnect(self):
- return self.get_bool("reconnect")
+from qpid.messaging import *
+from qpid.tests.messaging import Base
class SetupTests(Base):
@@ -961,119 +876,3 @@ class SenderTests(Base):
self.drain(self.rcv, expected=msgs)
self.ssn.acknowledge()
assert caught, "did not exceed capacity"
-
-class MessageTests(Base):
-
- def testCreateString(self):
- m = Message("string")
- assert m.content == "string"
- assert m.content_type is None
-
- def testCreateUnicode(self):
- m = Message(u"unicode")
- assert m.content == u"unicode"
- assert m.content_type == "text/plain"
-
- def testCreateMap(self):
- m = Message({})
- assert m.content == {}
- assert m.content_type == "amqp/map"
-
- def testCreateList(self):
- m = Message([])
- assert m.content == []
- assert m.content_type == "amqp/list"
-
- def testContentTypeOverride(self):
- m = Message()
- m.content_type = "text/html; charset=utf8"
- m.content = u"<html/>"
- assert m.content_type == "text/html; charset=utf8"
-
-ECHO_Q = 'test-message-echo-queue; {create: always, delete: always}'
-
-class MessageEchoTests(Base):
-
- def setup_connection(self):
- return Connection.open(self.broker.host, self.broker.port,
- reconnect=self.reconnect())
-
- def setup_session(self):
- return self.conn.session()
-
- def setup_sender(self):
- return self.ssn.sender(ECHO_Q)
-
- def setup_receiver(self):
- return self.ssn.receiver(ECHO_Q)
-
- def check(self, msg):
- self.snd.send(msg)
- echo = self.rcv.fetch(0)
-
- assert msg.id == echo.id
- assert msg.subject == echo.subject
- assert msg.user_id == echo.user_id
- assert msg.to == echo.to
- assert msg.reply_to == echo.reply_to
- assert msg.correlation_id == echo.correlation_id
- assert msg.properties == echo.properties
- assert msg.content_type == echo.content_type
- assert msg.content == echo.content, "%s, %s" % (msg, echo)
-
- self.ssn.acknowledge(echo)
-
- def testStringContent(self):
- self.check(Message("string"))
-
- def testUnicodeContent(self):
- self.check(Message(u"unicode"))
-
-
- TEST_MAP = {"key1": "string",
- "key2": u"unicode",
- "key3": 3,
- "key4": -3,
- "key5": 3.14,
- "key6": -3.14,
- "key7": ["one", 2, 3.14],
- "key8": [],
- "key9": {"sub-key0": 3}}
-
- def testMapContent(self):
- self.check(Message(MessageEchoTests.TEST_MAP))
-
- def testListContent(self):
- self.check(Message([]))
- self.check(Message([1, 2, 3]))
- self.check(Message(["one", 2, 3.14, {"four": 4}]))
-
- def testProperties(self):
- msg = Message()
- msg.to = "to-address"
- msg.subject = "subject"
- msg.correlation_id = str(self.test_id)
- msg.properties = MessageEchoTests.TEST_MAP
- msg.reply_to = "reply-address"
- self.check(msg)
-
-class TestTestsXXX(Test):
-
- def testFoo(self):
- print "this test has output"
-
- def testBar(self):
- print "this test "*8
- print "has"*10
- print "a"*75
- print "lot of"*10
- print "output"*10
-
- def testQux(self):
- import sys
- sys.stdout.write("this test has output with no newline")
-
- def testQuxFail(self):
- import sys
- sys.stdout.write("this test has output with no newline")
- fdsa
diff --git a/qpid/python/qpid/tests/messaging/message.py b/qpid/python/qpid/tests/messaging/message.py
new file mode 100644
index 0000000000..ef2ec1aac4
--- /dev/null
+++ b/qpid/python/qpid/tests/messaging/message.py
@@ -0,0 +1,116 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpid.messaging import *
+from qpid.tests.messaging import Base
+
+class MessageTests(Base):
+
+ def testCreateString(self):
+ m = Message("string")
+ assert m.content == "string"
+ assert m.content_type is None
+
+ def testCreateUnicode(self):
+ m = Message(u"unicode")
+ assert m.content == u"unicode"
+ assert m.content_type == "text/plain"
+
+ def testCreateMap(self):
+ m = Message({})
+ assert m.content == {}
+ assert m.content_type == "amqp/map"
+
+ def testCreateList(self):
+ m = Message([])
+ assert m.content == []
+ assert m.content_type == "amqp/list"
+
+ def testContentTypeOverride(self):
+ m = Message()
+ m.content_type = "text/html; charset=utf8"
+ m.content = u"<html/>"
+ assert m.content_type == "text/html; charset=utf8"
+
+ECHO_Q = 'test-message-echo-queue; {create: always, delete: always}'
+
+class MessageEchoTests(Base):
+
+ def setup_connection(self):
+ return Connection.open(self.broker.host, self.broker.port,
+ reconnect=self.reconnect())
+
+ def setup_session(self):
+ return self.conn.session()
+
+ def setup_sender(self):
+ return self.ssn.sender(ECHO_Q)
+
+ def setup_receiver(self):
+ return self.ssn.receiver(ECHO_Q)
+
+ def check(self, msg):
+ self.snd.send(msg)
+ echo = self.rcv.fetch(0)
+
+ assert msg.id == echo.id
+ assert msg.subject == echo.subject
+ assert msg.user_id == echo.user_id
+ assert msg.to == echo.to
+ assert msg.reply_to == echo.reply_to
+ assert msg.correlation_id == echo.correlation_id
+ assert msg.properties == echo.properties
+ assert msg.content_type == echo.content_type
+ assert msg.content == echo.content, "%s, %s" % (msg, echo)
+
+ self.ssn.acknowledge(echo)
+
+ def testStringContent(self):
+ self.check(Message("string"))
+
+ def testUnicodeContent(self):
+ self.check(Message(u"unicode"))
+
+
+ TEST_MAP = {"key1": "string",
+ "key2": u"unicode",
+ "key3": 3,
+ "key4": -3,
+ "key5": 3.14,
+ "key6": -3.14,
+ "key7": ["one", 2, 3.14],
+ "key8": [],
+ "key9": {"sub-key0": 3}}
+
+ def testMapContent(self):
+ self.check(Message(MessageEchoTests.TEST_MAP))
+
+ def testListContent(self):
+ self.check(Message([]))
+ self.check(Message([1, 2, 3]))
+ self.check(Message(["one", 2, 3.14, {"four": 4}]))
+
+ def testProperties(self):
+ msg = Message()
+ msg.to = "to-address"
+ msg.subject = "subject"
+ msg.correlation_id = str(self.test_id)
+ msg.properties = MessageEchoTests.TEST_MAP
+ msg.reply_to = "reply-address"
+ self.check(msg)