diff options
Diffstat (limited to 'cpp/src/qpid/broker/SessionState.cpp')
-rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 45 |
1 files changed, 29 insertions, 16 deletions
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index 571b8848ae..ceaa70db18 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -168,16 +168,16 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId, return status; } -void SessionState::handleCommand(framing::AMQMethodBody* method) +void SessionState::handleCommand(framing::AMQMethodBody* method, SequenceNumber& id) { - SequenceNumber id = nextIn++; + id = nextIn++; Invoker::Result invocation = invoke(adapter, *method); completed.add(id); if (!invocation.wasHandled()) { throw NotImplementedException("Not implemented"); } else if (invocation.hasResult()) { - getProxy().getExecution().result(id.getValue(), invocation.getResult()); + getProxy().getExecution010().result(id, invocation.getResult()); } if (method->isSync()) { sendCompletion(); @@ -185,16 +185,18 @@ void SessionState::handleCommand(framing::AMQMethodBody* method) //TODO: if window gets too large send unsolicited completion } -void SessionState::handleContent(AMQFrame& frame) +void SessionState::handleContent(AMQFrame& frame, SequenceNumber& id) { intrusive_ptr<Message> msg(msgBuilder.getMessage()); - if (!msg) {//start of frameset will be indicated by frame flags - SequenceNumber id = nextIn++; + if (frame.getBof() && frame.getBos()) {//start of frameset + id = nextIn++; msgBuilder.start(id); msg = msgBuilder.getMessage(); + } else { + id = msg->getCommandId(); } msgBuilder.handle(frame); - if (frame.getEof() && frame.getEos()) {//end of frameset will be indicated by frame flags + if (frame.getEof() && frame.getEos()) {//end of frameset msg->setPublisher(&getConnection()); semanticState.handle(msg); msgBuilder.end(); @@ -210,16 +212,27 @@ void SessionState::handle(AMQFrame& frame) { received(frame); - //TODO: make command handling more uniform, regardless of whether - //commands carry content. (For now, assume all single frame - //assmblies are non-content bearing and all content-bearing - //assmeblies will have more than one frame): - if (frame.getBof() && frame.getEof()) { - handleCommand(frame.getMethod()); - } else { - handleContent(frame); + SequenceNumber commandId; + try { + //TODO: make command handling more uniform, regardless of whether + //commands carry content. (For now, assume all single frame + //assemblies are non-content bearing and all content-bearing + //assemblies will have more than one frame): + if (frame.getBof() && frame.getEof()) { + handleCommand(frame.getMethod(), commandId); + } else { + handleContent(frame, commandId); + } + } catch(const ChannelException& e) { + //TODO: better implementation of new exception handling mechanism + AMQMethodBody* m = frame.getMethod(); + if (m) { + getProxy().getExecution010().exception(e.code, commandId, m->amqpClassId(), m->amqpMethodId(), 0, e.what(), FieldTable()); + } else { + getProxy().getExecution010().exception(e.code, commandId, 0, 0, 0, e.what(), FieldTable()); + } + handler->destroy(); } - } DeliveryId SessionState::deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token) |