summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2006-11-17 11:03:22 +0000
committerGordon Sim <gsim@apache.org>2006-11-17 11:03:22 +0000
commitbf74286e6a5eba055fd8bf9410c325205b8595d5 (patch)
tree46fafd1086d95b543e5b8c4927ba28daa50e7da9
parentd965a29414762f0b3bbc840485f6327c3d523946 (diff)
downloadqpid-python-bf74286e6a5eba055fd8bf9410c325205b8595d5.tar.gz
Some fixes and tests for bugs uncovered during testing of persistence.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@476108 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/qpid/broker/Channel.cpp2
-rw-r--r--cpp/src/qpid/broker/DeliveryRecord.cpp2
-rw-r--r--cpp/src/qpid/broker/SessionHandlerImpl.cpp27
-rw-r--r--cpp/src/qpid/broker/TxBuffer.cpp2
-rw-r--r--cpp/test/unit/qpid/broker/TxBufferTest.cpp34
-rw-r--r--python/qpid/testlib.py12
-rw-r--r--python/tests/tx.py38
7 files changed, 99 insertions, 18 deletions
diff --git a/cpp/src/qpid/broker/Channel.cpp b/cpp/src/qpid/broker/Channel.cpp
index f52d9ab8c7..f3624b4f3d 100644
--- a/cpp/src/qpid/broker/Channel.cpp
+++ b/cpp/src/qpid/broker/Channel.cpp
@@ -87,6 +87,8 @@ void Channel::close(){
for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin() ){
cancel(i);
}
+ //requeue:
+ recover(true);
}
void Channel::begin(){
diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp
index acfd2dccf6..a1efc17d19 100644
--- a/cpp/src/qpid/broker/DeliveryRecord.cpp
+++ b/cpp/src/qpid/broker/DeliveryRecord.cpp
@@ -64,7 +64,7 @@ void DeliveryRecord::redeliver(Channel* const channel) const{
void DeliveryRecord::requeue() const{
msg->redeliver();
- queue->deliver(msg);
+ queue->process(msg);
}
void DeliveryRecord::addTo(Prefetch* const prefetch) const{
diff --git a/cpp/src/qpid/broker/SessionHandlerImpl.cpp b/cpp/src/qpid/broker/SessionHandlerImpl.cpp
index b1ac30d493..bbb5d22c8d 100644
--- a/cpp/src/qpid/broker/SessionHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/SessionHandlerImpl.cpp
@@ -97,6 +97,9 @@ void SessionHandlerImpl::received(qpid::framing::AMQFrame* frame){
client.getChannel().close(channel, e.code, e.text, method->amqpClassId(), method->amqpMethodId());
}catch(ConnectionException& e){
client.getConnection().close(0, e.code, e.text, method->amqpClassId(), method->amqpMethodId());
+ }catch(std::exception& e){
+ string error(e.what());
+ client.getConnection().close(0, 541/*internal error*/, error, method->amqpClassId(), method->amqpMethodId());
}
break;
@@ -132,16 +135,20 @@ void SessionHandlerImpl::idleIn(){
}
void SessionHandlerImpl::closed(){
- for(channel_iterator i = channels.begin(); i != channels.end(); i = channels.begin()){
- Channel* c = i->second;
- channels.erase(i);
- c->close();
- delete c;
- }
- for(queue_iterator i = exclusiveQueues.begin(); i < exclusiveQueues.end(); i = exclusiveQueues.begin()){
- string name = (*i)->getName();
- queues->destroy(name);
- exclusiveQueues.erase(i);
+ try {
+ for(channel_iterator i = channels.begin(); i != channels.end(); i = channels.begin()){
+ Channel* c = i->second;
+ channels.erase(i);
+ c->close();
+ delete c;
+ }
+ for(queue_iterator i = exclusiveQueues.begin(); i < exclusiveQueues.end(); i = exclusiveQueues.begin()){
+ string name = (*i)->getName();
+ queues->destroy(name);
+ exclusiveQueues.erase(i);
+ }
+ } catch(std::exception& e) {
+ std::cout << "Caught unhandled exception while closing session: " << e.what() << std::endl;
}
}
diff --git a/cpp/src/qpid/broker/TxBuffer.cpp b/cpp/src/qpid/broker/TxBuffer.cpp
index f344f89bf3..2bbe59b0e1 100644
--- a/cpp/src/qpid/broker/TxBuffer.cpp
+++ b/cpp/src/qpid/broker/TxBuffer.cpp
@@ -40,11 +40,13 @@ bool TxBuffer::prepare(TransactionalStore* const store)
void TxBuffer::commit()
{
for_each(ops.begin(), ops.end(), mem_fun(&TxOp::commit));
+ ops.clear();
}
void TxBuffer::rollback()
{
for_each(ops.begin(), ops.end(), mem_fun(&TxOp::rollback));
+ ops.clear();
}
void TxBuffer::enlist(TxOp* const op)
diff --git a/cpp/test/unit/qpid/broker/TxBufferTest.cpp b/cpp/test/unit/qpid/broker/TxBufferTest.cpp
index ea3cd1e2b7..3789d340f8 100644
--- a/cpp/test/unit/qpid/broker/TxBufferTest.cpp
+++ b/cpp/test/unit/qpid/broker/TxBufferTest.cpp
@@ -153,6 +153,8 @@ class TxBufferTest : public CppUnit::TestCase
CPPUNIT_TEST(testPrepareAndCommit);
CPPUNIT_TEST(testFailOnPrepare);
CPPUNIT_TEST(testRollback);
+ CPPUNIT_TEST(testBufferIsClearedAfterRollback);
+ CPPUNIT_TEST(testBufferIsClearedAfterCommit);
CPPUNIT_TEST_SUITE_END();
public:
@@ -224,6 +226,38 @@ class TxBufferTest : public CppUnit::TestCase
opB.check();
opC.check();
}
+
+ void testBufferIsClearedAfterRollback(){
+ MockTxOp opA;
+ opA.expectRollback();
+ MockTxOp opB;
+ opB.expectRollback();
+
+ TxBuffer buffer;
+ buffer.enlist(&opA);
+ buffer.enlist(&opB);
+
+ buffer.rollback();
+ buffer.commit();//second call should not reach ops
+ opA.check();
+ opB.check();
+ }
+
+ void testBufferIsClearedAfterCommit(){
+ MockTxOp opA;
+ opA.expectCommit();
+ MockTxOp opB;
+ opB.expectCommit();
+
+ TxBuffer buffer;
+ buffer.enlist(&opA);
+ buffer.enlist(&opB);
+
+ buffer.commit();
+ buffer.rollback();//second call should not reach ops
+ opA.check();
+ opB.check();
+ }
};
// Make this test suite a plugin.
diff --git a/python/qpid/testlib.py b/python/qpid/testlib.py
index f7a34294be..d35c38e4ad 100644
--- a/python/qpid/testlib.py
+++ b/python/qpid/testlib.py
@@ -225,13 +225,13 @@ class TestBase(unittest.TestCase):
self.assertPublishGet(self.consume(queue), exchange, routing_key, properties)
def assertChannelException(self, expectedCode, message):
- self.assertEqual(message.method.klass.name, "channel")
- self.assertEqual(message.method.name, "close")
- self.assertEqual(message.reply_code, expectedCode)
+ self.assertEqual("channel", message.method.klass.name)
+ self.assertEqual("close", message.method.name)
+ self.assertEqual(expectedCode, message.reply_code)
def assertConnectionException(self, expectedCode, message):
- self.assertEqual(message.method.klass.name, "connection")
- self.assertEqual(message.method.name, "close")
- self.assertEqual(message.reply_code, expectedCode)
+ self.assertEqual("connection", message.method.klass.name)
+ self.assertEqual("close", message.method.name)
+ self.assertEqual(expectedCode, message.reply_code)
diff --git a/python/tests/tx.py b/python/tests/tx.py
index e8d0e99628..054fb8d8b7 100644
--- a/python/tests/tx.py
+++ b/python/tests/tx.py
@@ -55,6 +55,42 @@ class TxTests(TestBase):
channel.basic_ack(delivery_tag=0, multiple=True)
channel.tx_commit()
+ def test_auto_rollback(self):
+ """
+ Test that a channel closed with an open transaction is effectively rolled back
+ """
+ channel = self.channel
+ queue_a, queue_b, queue_c = self.perform_txn_work(channel, "tx-autorollback-a", "tx-autorollback-b", "tx-autorollback-c")
+
+ for q in [queue_a, queue_b, queue_c]:
+ try:
+ extra = q.get(timeout=1)
+ self.fail("Got unexpected message: " + extra.content.body)
+ except Empty: None
+
+ channel.tx_rollback()
+
+ #check results
+ for i in range(1, 5):
+ msg = queue_a.get(timeout=1)
+ self.assertEqual("Message %d" % i, msg.content.body)
+
+ msg = queue_b.get(timeout=1)
+ self.assertEqual("Message 6", msg.content.body)
+
+ msg = queue_c.get(timeout=1)
+ self.assertEqual("Message 7", msg.content.body)
+
+ for q in [queue_a, queue_b, queue_c]:
+ try:
+ extra = q.get(timeout=1)
+ self.fail("Got unexpected message: " + extra.content.body)
+ except Empty: None
+
+ #cleanup
+ channel.basic_ack(delivery_tag=0, multiple=True)
+ channel.tx_commit()
+
def test_rollback(self):
"""
Test that rolled back publishes are not delivered and rolled back acks are re-delivered
@@ -90,7 +126,7 @@ class TxTests(TestBase):
#cleanup
channel.basic_ack(delivery_tag=0, multiple=True)
channel.tx_commit()
-
+
def perform_txn_work(self, channel, name_a, name_b, name_c):
"""
Utility method that does some setup and some work under a transaction. Used for testing both