summaryrefslogtreecommitdiff
path: root/M4-RCs/qpid/python/tests/connection.py
diff options
context:
space:
mode:
Diffstat (limited to 'M4-RCs/qpid/python/tests/connection.py')
-rw-r--r--M4-RCs/qpid/python/tests/connection.py215
1 files changed, 0 insertions, 215 deletions
diff --git a/M4-RCs/qpid/python/tests/connection.py b/M4-RCs/qpid/python/tests/connection.py
deleted file mode 100644
index 512fa62189..0000000000
--- a/M4-RCs/qpid/python/tests/connection.py
+++ /dev/null
@@ -1,215 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-from threading import *
-from unittest import TestCase
-from qpid.util import connect, listen
-from qpid.connection import *
-from qpid.datatypes import Message
-from qpid.testlib import testrunner
-from qpid.delegates import Server
-from qpid.queue import Queue
-from qpid.spec010 import load
-from qpid.session import Delegate
-
-PORT = 1234
-
-class TestServer:
-
- def __init__(self, queue):
- self.queue = queue
-
- def connection(self, connection):
- return Server(connection, delegate=self.session)
-
- def session(self, session):
- session.auto_sync = False
- return TestSession(session, self.queue)
-
-class TestSession(Delegate):
-
- def __init__(self, session, queue):
- self.session = session
- self.queue = queue
-
- def execution_sync(self, es):
- pass
-
- def queue_query(self, qq):
- return qq._type.result.type.new((qq.queue,), {})
-
- def message_transfer(self, cmd, headers, body):
- if cmd.destination == "echo":
- m = Message(body)
- m.headers = headers
- self.session.message_transfer(cmd.destination, cmd.accept_mode,
- cmd.acquire_mode, m)
- elif cmd.destination == "abort":
- self.session.channel.connection.sock.close()
- else:
- self.queue.put((cmd, headers, body))
-
-class ConnectionTest(TestCase):
-
- def setUp(self):
- self.spec = load(testrunner.get_spec_file("amqp.0-10.xml"))
- self.queue = Queue()
- self.running = True
- started = Event()
-
- def run():
- ts = TestServer(self.queue)
- for s in listen("0.0.0.0", PORT, lambda: self.running, lambda: started.set()):
- conn = Connection(s, self.spec, ts.connection)
- try:
- conn.start(5)
- except Closed:
- pass
-
- self.server = Thread(target=run)
- self.server.setDaemon(True)
- self.server.start()
-
- started.wait(3)
- assert started.isSet()
-
- def tearDown(self):
- self.running = False
- connect("0.0.0.0", PORT).close()
- self.server.join(3)
-
- def connect(self):
- return Connection(connect("0.0.0.0", PORT), self.spec)
-
- def test(self):
- c = self.connect()
- c.start(10)
-
- ssn1 = c.session("test1", timeout=10)
- ssn2 = c.session("test2", timeout=10)
-
- assert ssn1 == c.sessions["test1"]
- assert ssn2 == c.sessions["test2"]
- assert ssn1.channel != None
- assert ssn2.channel != None
- assert ssn1 in c.attached.values()
- assert ssn2 in c.attached.values()
-
- ssn1.close(5)
-
- assert ssn1.channel == None
- assert ssn1 not in c.attached.values()
- assert ssn2 in c.sessions.values()
-
- ssn2.close(5)
-
- assert ssn2.channel == None
- assert ssn2 not in c.attached.values()
- assert ssn2 not in c.sessions.values()
-
- ssn = c.session("session", timeout=10)
-
- assert ssn.channel != None
- assert ssn in c.sessions.values()
-
- destinations = ("one", "two", "three")
-
- for d in destinations:
- ssn.message_transfer(d)
-
- for d in destinations:
- cmd, header, body = self.queue.get(10)
- assert cmd.destination == d
- assert header == None
- assert body == None
-
- msg = Message("this is a test")
- ssn.message_transfer("four", message=msg)
- cmd, header, body = self.queue.get(10)
- assert cmd.destination == "four"
- assert header == None
- assert body == msg.body
-
- qq = ssn.queue_query("asdf")
- assert qq.queue == "asdf"
- c.close(5)
-
- def testCloseGet(self):
- c = self.connect()
- c.start(10)
- ssn = c.session("test", timeout=10)
- echos = ssn.incoming("echo")
-
- for i in range(10):
- ssn.message_transfer("echo", message=Message("test%d" % i))
-
- ssn.auto_sync=False
- ssn.message_transfer("abort")
-
- for i in range(10):
- m = echos.get(timeout=10)
- assert m.body == "test%d" % i
-
- try:
- m = echos.get(timeout=10)
- assert False
- except Closed, e:
- pass
-
- def testCloseListen(self):
- c = self.connect()
- c.start(10)
- ssn = c.session("test", timeout=10)
- echos = ssn.incoming("echo")
-
- messages = []
- exceptions = []
- condition = Condition()
- def listener(m): messages.append(m)
- def exc_listener(e):
- exceptions.append(e)
- condition.acquire()
- condition.notify()
- condition.release()
-
- echos.listen(listener, exc_listener)
-
- for i in range(10):
- ssn.message_transfer("echo", message=Message("test%d" % i))
-
- ssn.auto_sync=False
- ssn.message_transfer("abort")
-
- condition.acquire()
- condition.wait(10)
- condition.release()
-
- for i in range(10):
- m = messages.pop(0)
- assert m.body == "test%d" % i
-
- assert len(exceptions) == 1
-
- def testSync(self):
- c = self.connect()
- c.start(10)
- s = c.session("test")
- s.auto_sync = False
- s.message_transfer("echo", message=Message("test"))
- s.sync(10)