summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SemanticHandler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/SemanticHandler.cpp')
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.cpp45
1 files changed, 21 insertions, 24 deletions
diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp
index 27f484cfcb..08f91bf69a 100644
--- a/cpp/src/qpid/broker/SemanticHandler.cpp
+++ b/cpp/src/qpid/broker/SemanticHandler.cpp
@@ -63,20 +63,24 @@ void SemanticHandler::handle(framing::AMQFrame& frame)
void SemanticHandler::handleMethodInContext(boost::shared_ptr<qpid::framing::AMQMethodBody> method,
const qpid::framing::MethodContext& context)
{
- if (!method->invoke(this)) {
- //else do the usual:
- handleL4(method, context);
- //(if the frameset is complete) we can move the execution-mark
- //forward
-
- //temporary hack until channel management is moved to its own handler:
- if (method->amqpClassId() != ChannelOpenBody::CLASS_ID) {
- ++(incoming.hwm);
+ try {
+ if (!method->invoke(this)) {
+ //else do the usual:
+ handleL4(method, context);
+ //(if the frameset is complete) we can move the execution-mark
+ //forward
+
+ //temporary hack until channel management is moved to its own handler:
+ if (method->amqpClassId() != ChannelOpenBody::CLASS_ID) {
+ ++(incoming.hwm);
+ }
+
+ //note: need to be more sophisticated than this if we execute
+ //commands that arrive within an active message frameset (that
+ //can't happen until 0-10 framing is implemented)
}
-
- //note: need to be more sophisticated than this if we execute
- //commands that arrive within an active message frameset (that
- //can't happen until 0-10 framing is implemented)
+ }catch(const std::exception& e){
+ connection.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId());
}
}
@@ -87,15 +91,14 @@ void SemanticHandler::complete(uint32_t cumulative, SequenceNumberSet range)
if (outgoing.lwm < mark) {
outgoing.lwm = mark;
//ack messages:
- channel.ack(mark.getValue(), true);
+ channel.ackCumulative(mark.getValue());
//std::cout << "[" << this << "] acknowledged: " << mark << std::endl;
}
if (range.size() % 2) { //must be even number
throw ConnectionException(530, "Received odd number of elements in ranged mark");
} else {
- //TODO: need to keep a record of the full range previously acked
for (SequenceNumberSet::iterator i = range.begin(); i != range.end(); i++) {
- channel.ack((uint64_t) i->getValue(), (uint64_t) (++i)->getValue());
+ channel.ackRange((uint64_t) i->getValue(), (uint64_t) (++i)->getValue());
}
}
}
@@ -121,22 +124,16 @@ void SemanticHandler::handleL4(boost::shared_ptr<qpid::framing::AMQMethodBody> m
throw ConnectionException(504, out.str());
}
} else {
- adapter->setResponseTo(context.getRequestId());
method->invoke(*adapter, context);
- adapter->setResponseTo(0);
}
- }catch(ChannelException& e){
- adapter->setResponseTo(0);
+ }catch(const ChannelException& e){
adapter->getProxy().getChannel().close(
e.code, e.toString(),
method->amqpClassId(), method->amqpMethodId());
connection.closeChannel(getId());
- }catch(ConnectionException& e){
+ }catch(const ConnectionException& e){
connection.close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId());
- }catch(std::exception& e){
- connection.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId());
}
-
}
bool SemanticHandler::isOpen() const