diff options
author | Gordon Sim <gsim@apache.org> | 2007-08-10 14:51:08 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-08-10 14:51:08 +0000 |
commit | 6577b14632d81c15482cb0793e01166cdb28eaff (patch) | |
tree | 8b8dc5e4db5690e9c024b862a1d725764687d6fc /cpp/src/qpid/broker/SemanticHandler.cpp | |
parent | c00a668cbf27d90edf18cc935cc982cab6581cae (diff) | |
download | qpid-python-6577b14632d81c15482cb0793e01166cdb28eaff.tar.gz |
Broker management of message acknowledgements now runs entirely off execution layer.
Flow control support.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@564611 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/SemanticHandler.cpp')
-rw-r--r-- | cpp/src/qpid/broker/SemanticHandler.cpp | 45 |
1 files changed, 21 insertions, 24 deletions
diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp index 27f484cfcb..08f91bf69a 100644 --- a/cpp/src/qpid/broker/SemanticHandler.cpp +++ b/cpp/src/qpid/broker/SemanticHandler.cpp @@ -63,20 +63,24 @@ void SemanticHandler::handle(framing::AMQFrame& frame) void SemanticHandler::handleMethodInContext(boost::shared_ptr<qpid::framing::AMQMethodBody> method, const qpid::framing::MethodContext& context) { - if (!method->invoke(this)) { - //else do the usual: - handleL4(method, context); - //(if the frameset is complete) we can move the execution-mark - //forward - - //temporary hack until channel management is moved to its own handler: - if (method->amqpClassId() != ChannelOpenBody::CLASS_ID) { - ++(incoming.hwm); + try { + if (!method->invoke(this)) { + //else do the usual: + handleL4(method, context); + //(if the frameset is complete) we can move the execution-mark + //forward + + //temporary hack until channel management is moved to its own handler: + if (method->amqpClassId() != ChannelOpenBody::CLASS_ID) { + ++(incoming.hwm); + } + + //note: need to be more sophisticated than this if we execute + //commands that arrive within an active message frameset (that + //can't happen until 0-10 framing is implemented) } - - //note: need to be more sophisticated than this if we execute - //commands that arrive within an active message frameset (that - //can't happen until 0-10 framing is implemented) + }catch(const std::exception& e){ + connection.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId()); } } @@ -87,15 +91,14 @@ void SemanticHandler::complete(uint32_t cumulative, SequenceNumberSet range) if (outgoing.lwm < mark) { outgoing.lwm = mark; //ack messages: - channel.ack(mark.getValue(), true); + channel.ackCumulative(mark.getValue()); //std::cout << "[" << this << "] acknowledged: " << mark << std::endl; } if (range.size() % 2) { //must be even number throw ConnectionException(530, "Received odd number of elements in ranged mark"); } else { - //TODO: need to keep a record of the full range previously acked for (SequenceNumberSet::iterator i = range.begin(); i != range.end(); i++) { - channel.ack((uint64_t) i->getValue(), (uint64_t) (++i)->getValue()); + channel.ackRange((uint64_t) i->getValue(), (uint64_t) (++i)->getValue()); } } } @@ -121,22 +124,16 @@ void SemanticHandler::handleL4(boost::shared_ptr<qpid::framing::AMQMethodBody> m throw ConnectionException(504, out.str()); } } else { - adapter->setResponseTo(context.getRequestId()); method->invoke(*adapter, context); - adapter->setResponseTo(0); } - }catch(ChannelException& e){ - adapter->setResponseTo(0); + }catch(const ChannelException& e){ adapter->getProxy().getChannel().close( e.code, e.toString(), method->amqpClassId(), method->amqpMethodId()); connection.closeChannel(getId()); - }catch(ConnectionException& e){ + }catch(const ConnectionException& e){ connection.close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId()); - }catch(std::exception& e){ - connection.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId()); } - } bool SemanticHandler::isOpen() const |