summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SessionState.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/SessionState.cpp')
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp45
1 files changed, 29 insertions, 16 deletions
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index 571b8848ae..ceaa70db18 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -168,16 +168,16 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId,
return status;
}
-void SessionState::handleCommand(framing::AMQMethodBody* method)
+void SessionState::handleCommand(framing::AMQMethodBody* method, SequenceNumber& id)
{
- SequenceNumber id = nextIn++;
+ id = nextIn++;
Invoker::Result invocation = invoke(adapter, *method);
completed.add(id);
if (!invocation.wasHandled()) {
throw NotImplementedException("Not implemented");
} else if (invocation.hasResult()) {
- getProxy().getExecution().result(id.getValue(), invocation.getResult());
+ getProxy().getExecution010().result(id, invocation.getResult());
}
if (method->isSync()) {
sendCompletion();
@@ -185,16 +185,18 @@ void SessionState::handleCommand(framing::AMQMethodBody* method)
//TODO: if window gets too large send unsolicited completion
}
-void SessionState::handleContent(AMQFrame& frame)
+void SessionState::handleContent(AMQFrame& frame, SequenceNumber& id)
{
intrusive_ptr<Message> msg(msgBuilder.getMessage());
- if (!msg) {//start of frameset will be indicated by frame flags
- SequenceNumber id = nextIn++;
+ if (frame.getBof() && frame.getBos()) {//start of frameset
+ id = nextIn++;
msgBuilder.start(id);
msg = msgBuilder.getMessage();
+ } else {
+ id = msg->getCommandId();
}
msgBuilder.handle(frame);
- if (frame.getEof() && frame.getEos()) {//end of frameset will be indicated by frame flags
+ if (frame.getEof() && frame.getEos()) {//end of frameset
msg->setPublisher(&getConnection());
semanticState.handle(msg);
msgBuilder.end();
@@ -210,16 +212,27 @@ void SessionState::handle(AMQFrame& frame)
{
received(frame);
- //TODO: make command handling more uniform, regardless of whether
- //commands carry content. (For now, assume all single frame
- //assmblies are non-content bearing and all content-bearing
- //assmeblies will have more than one frame):
- if (frame.getBof() && frame.getEof()) {
- handleCommand(frame.getMethod());
- } else {
- handleContent(frame);
+ SequenceNumber commandId;
+ try {
+ //TODO: make command handling more uniform, regardless of whether
+ //commands carry content. (For now, assume all single frame
+ //assemblies are non-content bearing and all content-bearing
+ //assemblies will have more than one frame):
+ if (frame.getBof() && frame.getEof()) {
+ handleCommand(frame.getMethod(), commandId);
+ } else {
+ handleContent(frame, commandId);
+ }
+ } catch(const ChannelException& e) {
+ //TODO: better implementation of new exception handling mechanism
+ AMQMethodBody* m = frame.getMethod();
+ if (m) {
+ getProxy().getExecution010().exception(e.code, commandId, m->amqpClassId(), m->amqpMethodId(), 0, e.what(), FieldTable());
+ } else {
+ getProxy().getExecution010().exception(e.code, commandId, 0, 0, 0, e.what(), FieldTable());
+ }
+ handler->destroy();
}
-
}
DeliveryId SessionState::deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token)