summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/Channel.cpp2
-rw-r--r--cpp/src/qpid/broker/DeliveryRecord.cpp2
-rw-r--r--cpp/src/qpid/broker/SessionHandlerImpl.cpp27
-rw-r--r--cpp/src/qpid/broker/TxBuffer.cpp2
4 files changed, 22 insertions, 11 deletions
diff --git a/cpp/src/qpid/broker/Channel.cpp b/cpp/src/qpid/broker/Channel.cpp
index f52d9ab8c7..f3624b4f3d 100644
--- a/cpp/src/qpid/broker/Channel.cpp
+++ b/cpp/src/qpid/broker/Channel.cpp
@@ -87,6 +87,8 @@ void Channel::close(){
for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin() ){
cancel(i);
}
+ //requeue:
+ recover(true);
}
void Channel::begin(){
diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp
index acfd2dccf6..a1efc17d19 100644
--- a/cpp/src/qpid/broker/DeliveryRecord.cpp
+++ b/cpp/src/qpid/broker/DeliveryRecord.cpp
@@ -64,7 +64,7 @@ void DeliveryRecord::redeliver(Channel* const channel) const{
void DeliveryRecord::requeue() const{
msg->redeliver();
- queue->deliver(msg);
+ queue->process(msg);
}
void DeliveryRecord::addTo(Prefetch* const prefetch) const{
diff --git a/cpp/src/qpid/broker/SessionHandlerImpl.cpp b/cpp/src/qpid/broker/SessionHandlerImpl.cpp
index b1ac30d493..bbb5d22c8d 100644
--- a/cpp/src/qpid/broker/SessionHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/SessionHandlerImpl.cpp
@@ -97,6 +97,9 @@ void SessionHandlerImpl::received(qpid::framing::AMQFrame* frame){
client.getChannel().close(channel, e.code, e.text, method->amqpClassId(), method->amqpMethodId());
}catch(ConnectionException& e){
client.getConnection().close(0, e.code, e.text, method->amqpClassId(), method->amqpMethodId());
+ }catch(std::exception& e){
+ string error(e.what());
+ client.getConnection().close(0, 541/*internal error*/, error, method->amqpClassId(), method->amqpMethodId());
}
break;
@@ -132,16 +135,20 @@ void SessionHandlerImpl::idleIn(){
}
void SessionHandlerImpl::closed(){
- for(channel_iterator i = channels.begin(); i != channels.end(); i = channels.begin()){
- Channel* c = i->second;
- channels.erase(i);
- c->close();
- delete c;
- }
- for(queue_iterator i = exclusiveQueues.begin(); i < exclusiveQueues.end(); i = exclusiveQueues.begin()){
- string name = (*i)->getName();
- queues->destroy(name);
- exclusiveQueues.erase(i);
+ try {
+ for(channel_iterator i = channels.begin(); i != channels.end(); i = channels.begin()){
+ Channel* c = i->second;
+ channels.erase(i);
+ c->close();
+ delete c;
+ }
+ for(queue_iterator i = exclusiveQueues.begin(); i < exclusiveQueues.end(); i = exclusiveQueues.begin()){
+ string name = (*i)->getName();
+ queues->destroy(name);
+ exclusiveQueues.erase(i);
+ }
+ } catch(std::exception& e) {
+ std::cout << "Caught unhandled exception while closing session: " << e.what() << std::endl;
}
}
diff --git a/cpp/src/qpid/broker/TxBuffer.cpp b/cpp/src/qpid/broker/TxBuffer.cpp
index f344f89bf3..2bbe59b0e1 100644
--- a/cpp/src/qpid/broker/TxBuffer.cpp
+++ b/cpp/src/qpid/broker/TxBuffer.cpp
@@ -40,11 +40,13 @@ bool TxBuffer::prepare(TransactionalStore* const store)
void TxBuffer::commit()
{
for_each(ops.begin(), ops.end(), mem_fun(&TxOp::commit));
+ ops.clear();
}
void TxBuffer::rollback()
{
for_each(ops.begin(), ops.end(), mem_fun(&TxOp::rollback));
+ ops.clear();
}
void TxBuffer::enlist(TxOp* const op)