summaryrefslogtreecommitdiff
path: root/cpp/src/tests/store.py
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests/store.py')
-rwxr-xr-xcpp/src/tests/store.py197
1 files changed, 0 insertions, 197 deletions
diff --git a/cpp/src/tests/store.py b/cpp/src/tests/store.py
deleted file mode 100755
index 77e8a78e5d..0000000000
--- a/cpp/src/tests/store.py
+++ /dev/null
@@ -1,197 +0,0 @@
-#!/usr/bin/env python
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-import errno, os, time
-from brokertest import *
-from qpid import compat, session
-from qpid.util import connect
-from qpid.connection import Connection
-from qpid.datatypes import Message, uuid4
-from qpid.queue import Empty
-
-class StoreTests(BrokerTest):
-
- XA_RBROLLBACK = 1
- XA_RBTIMEOUT = 2
- XA_OK = 0
- tx_counter = 0
-
- def configure(self, config):
- self.config = config
- self.defines = self.config.defines
- BrokerTest.configure(self, config)
-
- def setup_connection(self):
- socket = connect(self._broker.host(), self._broker.port())
- return Connection(sock=socket)
-
- def setup_session(self):
- self.conn.start()
- return self.conn.session(str(uuid4()))
-
- def start_session(self):
- self.conn = self.setup_connection()
- self.ssn = self.setup_session()
-
- def setUp(self):
- BrokerTest.setUp(self)
- self._broker = self.broker()
- self.start_session()
-
- def cycle_broker(self):
- # tearDown resets working dir; change it back after.
- d = os.getcwd()
- BrokerTest.tearDown(self)
- os.chdir(d)
- self._broker = None
- self._broker = self.broker()
- self.conn = self.setup_connection()
- self.ssn = self.setup_session()
-
- def xid(self, txid):
- StoreTests.tx_counter += 1
- branchqual = "v%s" % StoreTests.tx_counter
- return self.ssn.xid(format=0, global_id=txid, branch_id=branchqual)
-
- def testDurableExchange(self):
- try:
- self.ssn.exchange_delete(exchange="DE1")
- except:
- # restart the session busted from the exception
- self.start_session()
-
- self.ssn.exchange_declare(exchange="DE1", type="direct", durable=True)
- response = self.ssn.exchange_query(name="DE1")
- self.assert_(response.durable)
- self.assert_(not response.not_found)
-
- # Cycle the broker and make sure the exchange recovers
- self.cycle_broker()
- response = self.ssn.exchange_query(name="DE1")
- self.assert_(response.durable)
- self.assert_(not response.not_found)
-
- self.ssn.exchange_delete(exchange="DE1")
-
- def testDurableQueues(self):
- try:
- self.ssn.queue_delete(queue="DQ1")
- except:
- self.start_session()
-
- self.ssn.queue_declare(queue="DQ1", durable=True)
- response = self.ssn.queue_query(queue="DQ1")
- self.assertEqual("DQ1", response.queue)
- self.assert_(response.durable)
-
- # Cycle the broker and make sure the queue recovers
- self.cycle_broker()
- response = self.ssn.queue_query(queue="DQ1")
- self.assertEqual("DQ1", response.queue)
- self.assert_(response.durable)
-
- self.ssn.queue_delete(queue="DQ1")
-
- def testDurableBindings(self):
- try:
- self.ssn.exchange_unbind(queue="DB_Q1", exchange="DB_E1", binding_key="K1")
- except:
- self.start_session()
- try:
- self.ssn.exchange_delete(exchange="DB_E1")
- except:
- self.start_session()
- try:
- self.ssn.queue_delete(queue="DB_Q1")
- except:
- self.start_session()
-
- self.ssn.queue_declare(queue="DB_Q1", durable=True)
- self.ssn.exchange_declare(exchange="DB_E1", type="direct", durable=True)
- self.ssn.exchange_bind(exchange="DB_E1", queue="DB_Q1", binding_key="K1")
-
- # Cycle the broker and make sure the binding recovers
- self.cycle_broker()
- response = self.ssn.exchange_bound(exchange="DB_E1", queue="DB_Q1", binding_key="K1")
- self.assert_(not response.exchange_not_found)
- self.assert_(not response.queue_not_found)
- self.assert_(not response.queue_not_matched)
- self.assert_(not response.key_not_matched)
-
- self.ssn.exchange_unbind(queue="DB_Q1", exchange="DB_E1", binding_key="K1")
- self.ssn.exchange_delete(exchange="DB_E1")
- self.ssn.queue_delete(queue="DB_Q1")
-
- def testDtxRecoverPrepared(self):
- try:
- self.ssn.exchange_unbind(queue="Dtx_Q", exchange="Dtx_E", binding_key="Dtx")
- except:
- self.start_session()
- try:
- self.ssn.exchange_delete(exchange="Dtx_E")
- except:
- self.start_session()
- try:
- self.ssn.queue_delete(queue="Dtx_Q")
- except:
- self.start_session()
-
- self.ssn.queue_declare(queue="Dtx_Q", auto_delete=False, durable=True)
- self.ssn.exchange_declare(exchange="Dtx_E", type="direct", durable=True)
- self.ssn.exchange_bind(exchange="Dtx_E", queue="Dtx_Q", binding_key="Dtx")
- txid = self.xid("DtxRecoverPrepared")
- self.ssn.dtx_select()
- self.ssn.dtx_start(xid=txid)
- # 2 = delivery_mode.persistent
- dp = self.ssn.delivery_properties(routing_key="Dtx_Q", delivery_mode=2)
- self.ssn.message_transfer(message=Message(dp, "transactional message"))
- self.ssn.dtx_end(xid=txid)
- self.assertEqual(self.XA_OK, self.ssn.dtx_prepare(xid=txid).status)
- # Cycle the broker and make sure the xid is there, the message is not
- # queued.
- self.cycle_broker()
- # The txid should be recovered and in doubt
- xids = self.ssn.dtx_recover().in_doubt
- xid_matched = False
- for x in xids:
- self.assertEqual(txid.format, x.format)
- self.assertEqual(txid.global_id, x.global_id)
- self.assertEqual(txid.branch_id, x.branch_id)
- xid_matched = True
- self.assert_(xid_matched)
- self.ssn.message_subscribe(destination="dtx_msgs", queue="Dtx_Q", accept_mode=1, acquire_mode=0)
- self.ssn.message_flow(unit = 1, value = 0xFFFFFFFFL, destination = "dtx_msgs")
- self.ssn.message_flow(unit = 0, value = 10, destination = "dtx_msgs")
- message_arrivals = self.ssn.incoming("dtx_msgs")
- try:
- message_arrivals.get(timeout=1)
- assert False, 'Message present in queue before commit'
- except Empty: pass
- self.ssn.dtx_select()
- self.assertEqual(self.XA_OK, self.ssn.dtx_commit(xid=txid, one_phase=False).status)
- try:
- msg = message_arrivals.get(timeout=1)
- self.assertEqual("transactional message", msg.body)
- except Empty:
- assert False, 'Message should be present after dtx commit but is not'
-
- self.ssn.exchange_unbind(queue="Dtx_Q", exchange="Dtx_E", binding_key="Dtx")
- self.ssn.exchange_delete(exchange="Dtx_E")
- self.ssn.queue_delete(queue="Dtx_Q")