summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SemanticHandler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/SemanticHandler.cpp')
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.cpp42
1 files changed, 21 insertions, 21 deletions
diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp
index ead2fad379..dab41dd92f 100644
--- a/cpp/src/qpid/broker/SemanticHandler.cpp
+++ b/cpp/src/qpid/broker/SemanticHandler.cpp
@@ -97,27 +97,29 @@ void SemanticHandler::complete(uint32_t cumulative, const SequenceNumberSet& ran
}
}
-void SemanticHandler::flush()
+void SemanticHandler::sendCompletion()
{
- //flush doubles as a sync to begin with - send an execution.complete
if (isOpen()) {
+ SequenceNumber mark = incoming.getMark();
+ SequenceNumberSet range = incoming.getRange();
Mutex::ScopedLock l(outLock);
- ChannelAdapter::send(ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), SequenceNumberSet()));
+ ChannelAdapter::send(ExecutionCompleteBody(getVersion(), mark.getValue(), range));
}
}
+void SemanticHandler::flush()
+{
+ incoming.flush();
+ sendCompletion();
+}
void SemanticHandler::sync()
{
- //for now, just treat as flush; will need to get more clever when we deal with async publication
- flush();
+ incoming.sync();
+ sendCompletion();
}
void SemanticHandler::noop()
{
- //Do nothing...
- //
- //is this an L3 control? or is it an L4 command?
- //if the former, of what use is it?
- //if the latter it may contain a synch request... but its odd to have it in this class
+ incoming.noop();
}
void SemanticHandler::result(uint32_t /*command*/, const std::string& /*data*/)
@@ -127,17 +129,18 @@ void SemanticHandler::result(uint32_t /*command*/, const std::string& /*data*/)
void SemanticHandler::handleCommand(framing::AMQMethodBody* method)
{
- ++(incoming.lwm);
+ SequenceNumber id = incoming.next();
InvocationVisitor v(&adapter);
method->accept(v);
- //TODO: need to account for async store operations and interleaving
- ++(incoming.hwm);
+ incoming.complete(id);
if (!v.wasHandled()) {
throw ConnectionException(540, "Not implemented");
} else if (v.hasResult()) {
- ChannelAdapter::send(ExecutionResultBody(getVersion(), incoming.lwm.getValue(), v.getResult()));
+ ChannelAdapter::send(ExecutionResultBody(getVersion(), id.getValue(), v.getResult()));
}
+ //TODO: if (method->isSync()) { incoming.synch(id); sendCompletion(); }
+ //TODO: if window gets too large send unsolicited completion
}
void SemanticHandler::handleL3(framing::AMQMethodBody* method)
@@ -151,16 +154,16 @@ void SemanticHandler::handleContent(AMQFrame& frame)
{
Message::shared_ptr msg(msgBuilder.getMessage());
if (!msg) {//start of frameset will be indicated by frame flags
- msgBuilder.start(++(incoming.lwm));
+ msgBuilder.start(incoming.next());
msg = msgBuilder.getMessage();
}
msgBuilder.handle(frame);
if (msg->getFrames().isComplete()) {//end of frameset will be indicated by frame flags
msg->setPublisher(&connection);
- session.handle(msg);
+ session.handle(msg);
msgBuilder.end();
- //TODO: need to account for async store operations and interleaving
- ++(incoming.hwm);
+ incoming.track(msg);
+ //TODO: if (msg.getMethod().isSync()) { incoming.synch(msg.getCommandId()); sendCompletion(); }
}
}
@@ -172,11 +175,8 @@ bool SemanticHandler::isOpen() const {
DeliveryId SemanticHandler::deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token)
{
Mutex::ScopedLock l(outLock);
- //SequenceNumber copy(outgoing.hwm);
- //++copy;
MessageDelivery::deliver(msg, *this, ++outgoing.hwm, token, connection.getFrameMax());
return outgoing.hwm;
- //return outgoing.hwm.getValue();
}
void SemanticHandler::redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag)