diff options
Diffstat (limited to 'cpp/src/qpid/broker/SemanticHandler.cpp')
-rw-r--r-- | cpp/src/qpid/broker/SemanticHandler.cpp | 44 |
1 files changed, 39 insertions, 5 deletions
diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp index b7aa2aad25..f65e450e82 100644 --- a/cpp/src/qpid/broker/SemanticHandler.cpp +++ b/cpp/src/qpid/broker/SemanticHandler.cpp @@ -22,8 +22,10 @@ #include "SemanticHandler.h" #include "BrokerAdapter.h" #include "qpid/framing/ChannelAdapter.h" -#include "qpid/framing/ExecutionCompleteBody.h" #include "qpid/framing/ChannelCloseOkBody.h" +#include "qpid/framing/ExecutionCompleteBody.h" +#include "qpid/framing/ExecutionResultBody.h" +#include "qpid/framing/InvocationVisitor.h" using namespace qpid::broker; using namespace qpid::framing; @@ -66,6 +68,11 @@ void SemanticHandler::handleMethod(framing::AMQMethodBody* method) { try { if (!method->invoke(this)) { + //temporary hack until channel management is moved to its own handler: + if (method->amqpClassId() != ChannelOpenBody::CLASS_ID) { + ++(incoming.lwm); + } + //else do the usual: handleL4(method); //(if the frameset is complete) we can move the execution-mark @@ -73,7 +80,9 @@ void SemanticHandler::handleMethod(framing::AMQMethodBody* method) //temporary hack until channel management is moved to its own handler: if (method->amqpClassId() != ChannelOpenBody::CLASS_ID) { - ++(incoming.hwm); + //TODO: need to account for async store opreations + //when this command is a message publication + ++(incoming.hwm); } //note: need to be more sophisticated than this if we execute @@ -85,7 +94,7 @@ void SemanticHandler::handleMethod(framing::AMQMethodBody* method) } } -void SemanticHandler::complete(uint32_t cumulative, SequenceNumberSet range) +void SemanticHandler::complete(uint32_t cumulative, const SequenceNumberSet& range) { //record: SequenceNumber mark(cumulative); @@ -98,7 +107,7 @@ void SemanticHandler::complete(uint32_t cumulative, SequenceNumberSet range) if (range.size() % 2) { //must be even number throw ConnectionException(530, "Received odd number of elements in ranged mark"); } else { - for (SequenceNumberSet::iterator i = range.begin(); i != range.end(); i++) { + for (SequenceNumberSet::const_iterator i = range.begin(); i != range.end(); i++) { channel.ackRange((uint64_t) i->getValue(), (uint64_t) (++i)->getValue()); } } @@ -113,6 +122,25 @@ void SemanticHandler::flush() ChannelAdapter::send(ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), SequenceNumberSet())); } } +void SemanticHandler::sync() +{ + //for now, just treat as flush; will need to get more clever when we deal with async publication + flush(); +} + +void SemanticHandler::noop() +{ + //Do nothing... + // + //is this an L3 control? or is it an L4 command? + //if the former, of what use is it? + //if the latter it may contain a synch request... but its odd to have it in this class +} + +void SemanticHandler::result(uint32_t /*command*/, const std::string& /*data*/) +{ + //never actually sent by client at present +} void SemanticHandler::handleL4(framing::AMQMethodBody* method) { @@ -124,7 +152,13 @@ void SemanticHandler::handleL4(framing::AMQMethodBody* method) throw ConnectionException(504, out.str()); } } else { - method->invoke(*adapter); + InvocationVisitor v(adapter.get()); + method->accept(v); + if (!v.wasHandled()) { + throw ConnectionException(540, "Not implemented"); + } else if (v.hasResult()) { + ChannelAdapter::send(ExecutionResultBody(getVersion(), incoming.lwm.getValue(), v.getResult())); + } } }catch(const ChannelException& e){ adapter->getProxy().getChannel().close( |