summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2006-11-17 11:03:22 +0000
committerGordon Sim <gsim@apache.org>2006-11-17 11:03:22 +0000
commit39afa34d1965ee3d97b8cc488ec33e46ea908699 (patch)
tree40c4919a5ad47bc06042a78bc88ceadeef8787a9 /qpid/cpp
parentf82c31e4317e8eb405db923e40501b2ba85c3abd (diff)
downloadqpid-python-39afa34d1965ee3d97b8cc488ec33e46ea908699.tar.gz
Some fixes and tests for bugs uncovered during testing of persistence.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@476108 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/Channel.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/DeliveryRecord.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/SessionHandlerImpl.cpp27
-rw-r--r--qpid/cpp/src/qpid/broker/TxBuffer.cpp2
-rw-r--r--qpid/cpp/test/unit/qpid/broker/TxBufferTest.cpp34
5 files changed, 56 insertions, 11 deletions
diff --git a/qpid/cpp/src/qpid/broker/Channel.cpp b/qpid/cpp/src/qpid/broker/Channel.cpp
index f52d9ab8c7..f3624b4f3d 100644
--- a/qpid/cpp/src/qpid/broker/Channel.cpp
+++ b/qpid/cpp/src/qpid/broker/Channel.cpp
@@ -87,6 +87,8 @@ void Channel::close(){
for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin() ){
cancel(i);
}
+ //requeue:
+ recover(true);
}
void Channel::begin(){
diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
index acfd2dccf6..a1efc17d19 100644
--- a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
+++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
@@ -64,7 +64,7 @@ void DeliveryRecord::redeliver(Channel* const channel) const{
void DeliveryRecord::requeue() const{
msg->redeliver();
- queue->deliver(msg);
+ queue->process(msg);
}
void DeliveryRecord::addTo(Prefetch* const prefetch) const{
diff --git a/qpid/cpp/src/qpid/broker/SessionHandlerImpl.cpp b/qpid/cpp/src/qpid/broker/SessionHandlerImpl.cpp
index b1ac30d493..bbb5d22c8d 100644
--- a/qpid/cpp/src/qpid/broker/SessionHandlerImpl.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionHandlerImpl.cpp
@@ -97,6 +97,9 @@ void SessionHandlerImpl::received(qpid::framing::AMQFrame* frame){
client.getChannel().close(channel, e.code, e.text, method->amqpClassId(), method->amqpMethodId());
}catch(ConnectionException& e){
client.getConnection().close(0, e.code, e.text, method->amqpClassId(), method->amqpMethodId());
+ }catch(std::exception& e){
+ string error(e.what());
+ client.getConnection().close(0, 541/*internal error*/, error, method->amqpClassId(), method->amqpMethodId());
}
break;
@@ -132,16 +135,20 @@ void SessionHandlerImpl::idleIn(){
}
void SessionHandlerImpl::closed(){
- for(channel_iterator i = channels.begin(); i != channels.end(); i = channels.begin()){
- Channel* c = i->second;
- channels.erase(i);
- c->close();
- delete c;
- }
- for(queue_iterator i = exclusiveQueues.begin(); i < exclusiveQueues.end(); i = exclusiveQueues.begin()){
- string name = (*i)->getName();
- queues->destroy(name);
- exclusiveQueues.erase(i);
+ try {
+ for(channel_iterator i = channels.begin(); i != channels.end(); i = channels.begin()){
+ Channel* c = i->second;
+ channels.erase(i);
+ c->close();
+ delete c;
+ }
+ for(queue_iterator i = exclusiveQueues.begin(); i < exclusiveQueues.end(); i = exclusiveQueues.begin()){
+ string name = (*i)->getName();
+ queues->destroy(name);
+ exclusiveQueues.erase(i);
+ }
+ } catch(std::exception& e) {
+ std::cout << "Caught unhandled exception while closing session: " << e.what() << std::endl;
}
}
diff --git a/qpid/cpp/src/qpid/broker/TxBuffer.cpp b/qpid/cpp/src/qpid/broker/TxBuffer.cpp
index f344f89bf3..2bbe59b0e1 100644
--- a/qpid/cpp/src/qpid/broker/TxBuffer.cpp
+++ b/qpid/cpp/src/qpid/broker/TxBuffer.cpp
@@ -40,11 +40,13 @@ bool TxBuffer::prepare(TransactionalStore* const store)
void TxBuffer::commit()
{
for_each(ops.begin(), ops.end(), mem_fun(&TxOp::commit));
+ ops.clear();
}
void TxBuffer::rollback()
{
for_each(ops.begin(), ops.end(), mem_fun(&TxOp::rollback));
+ ops.clear();
}
void TxBuffer::enlist(TxOp* const op)
diff --git a/qpid/cpp/test/unit/qpid/broker/TxBufferTest.cpp b/qpid/cpp/test/unit/qpid/broker/TxBufferTest.cpp
index ea3cd1e2b7..3789d340f8 100644
--- a/qpid/cpp/test/unit/qpid/broker/TxBufferTest.cpp
+++ b/qpid/cpp/test/unit/qpid/broker/TxBufferTest.cpp
@@ -153,6 +153,8 @@ class TxBufferTest : public CppUnit::TestCase
CPPUNIT_TEST(testPrepareAndCommit);
CPPUNIT_TEST(testFailOnPrepare);
CPPUNIT_TEST(testRollback);
+ CPPUNIT_TEST(testBufferIsClearedAfterRollback);
+ CPPUNIT_TEST(testBufferIsClearedAfterCommit);
CPPUNIT_TEST_SUITE_END();
public:
@@ -224,6 +226,38 @@ class TxBufferTest : public CppUnit::TestCase
opB.check();
opC.check();
}
+
+ void testBufferIsClearedAfterRollback(){
+ MockTxOp opA;
+ opA.expectRollback();
+ MockTxOp opB;
+ opB.expectRollback();
+
+ TxBuffer buffer;
+ buffer.enlist(&opA);
+ buffer.enlist(&opB);
+
+ buffer.rollback();
+ buffer.commit();//second call should not reach ops
+ opA.check();
+ opB.check();
+ }
+
+ void testBufferIsClearedAfterCommit(){
+ MockTxOp opA;
+ opA.expectCommit();
+ MockTxOp opB;
+ opB.expectCommit();
+
+ TxBuffer buffer;
+ buffer.enlist(&opA);
+ buffer.enlist(&opB);
+
+ buffer.commit();
+ buffer.rollback();//second call should not reach ops
+ opA.check();
+ opB.check();
+ }
};
// Make this test suite a plugin.