diff options
Diffstat (limited to 'cpp/src/qpid/broker/SemanticHandler.cpp')
-rw-r--r-- | cpp/src/qpid/broker/SemanticHandler.cpp | 42 |
1 files changed, 21 insertions, 21 deletions
diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp index ead2fad379..dab41dd92f 100644 --- a/cpp/src/qpid/broker/SemanticHandler.cpp +++ b/cpp/src/qpid/broker/SemanticHandler.cpp @@ -97,27 +97,29 @@ void SemanticHandler::complete(uint32_t cumulative, const SequenceNumberSet& ran } } -void SemanticHandler::flush() +void SemanticHandler::sendCompletion() { - //flush doubles as a sync to begin with - send an execution.complete if (isOpen()) { + SequenceNumber mark = incoming.getMark(); + SequenceNumberSet range = incoming.getRange(); Mutex::ScopedLock l(outLock); - ChannelAdapter::send(ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), SequenceNumberSet())); + ChannelAdapter::send(ExecutionCompleteBody(getVersion(), mark.getValue(), range)); } } +void SemanticHandler::flush() +{ + incoming.flush(); + sendCompletion(); +} void SemanticHandler::sync() { - //for now, just treat as flush; will need to get more clever when we deal with async publication - flush(); + incoming.sync(); + sendCompletion(); } void SemanticHandler::noop() { - //Do nothing... - // - //is this an L3 control? or is it an L4 command? - //if the former, of what use is it? - //if the latter it may contain a synch request... but its odd to have it in this class + incoming.noop(); } void SemanticHandler::result(uint32_t /*command*/, const std::string& /*data*/) @@ -127,17 +129,18 @@ void SemanticHandler::result(uint32_t /*command*/, const std::string& /*data*/) void SemanticHandler::handleCommand(framing::AMQMethodBody* method) { - ++(incoming.lwm); + SequenceNumber id = incoming.next(); InvocationVisitor v(&adapter); method->accept(v); - //TODO: need to account for async store operations and interleaving - ++(incoming.hwm); + incoming.complete(id); if (!v.wasHandled()) { throw ConnectionException(540, "Not implemented"); } else if (v.hasResult()) { - ChannelAdapter::send(ExecutionResultBody(getVersion(), incoming.lwm.getValue(), v.getResult())); + ChannelAdapter::send(ExecutionResultBody(getVersion(), id.getValue(), v.getResult())); } + //TODO: if (method->isSync()) { incoming.synch(id); sendCompletion(); } + //TODO: if window gets too large send unsolicited completion } void SemanticHandler::handleL3(framing::AMQMethodBody* method) @@ -151,16 +154,16 @@ void SemanticHandler::handleContent(AMQFrame& frame) { Message::shared_ptr msg(msgBuilder.getMessage()); if (!msg) {//start of frameset will be indicated by frame flags - msgBuilder.start(++(incoming.lwm)); + msgBuilder.start(incoming.next()); msg = msgBuilder.getMessage(); } msgBuilder.handle(frame); if (msg->getFrames().isComplete()) {//end of frameset will be indicated by frame flags msg->setPublisher(&connection); - session.handle(msg); + session.handle(msg); msgBuilder.end(); - //TODO: need to account for async store operations and interleaving - ++(incoming.hwm); + incoming.track(msg); + //TODO: if (msg.getMethod().isSync()) { incoming.synch(msg.getCommandId()); sendCompletion(); } } } @@ -172,11 +175,8 @@ bool SemanticHandler::isOpen() const { DeliveryId SemanticHandler::deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token) { Mutex::ScopedLock l(outLock); - //SequenceNumber copy(outgoing.hwm); - //++copy; MessageDelivery::deliver(msg, *this, ++outgoing.hwm, token, connection.getFrameMax()); return outgoing.hwm; - //return outgoing.hwm.getValue(); } void SemanticHandler::redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag) |