summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SemanticHandler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/SemanticHandler.cpp')
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.cpp44
1 files changed, 39 insertions, 5 deletions
diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp
index b7aa2aad25..f65e450e82 100644
--- a/cpp/src/qpid/broker/SemanticHandler.cpp
+++ b/cpp/src/qpid/broker/SemanticHandler.cpp
@@ -22,8 +22,10 @@
#include "SemanticHandler.h"
#include "BrokerAdapter.h"
#include "qpid/framing/ChannelAdapter.h"
-#include "qpid/framing/ExecutionCompleteBody.h"
#include "qpid/framing/ChannelCloseOkBody.h"
+#include "qpid/framing/ExecutionCompleteBody.h"
+#include "qpid/framing/ExecutionResultBody.h"
+#include "qpid/framing/InvocationVisitor.h"
using namespace qpid::broker;
using namespace qpid::framing;
@@ -66,6 +68,11 @@ void SemanticHandler::handleMethod(framing::AMQMethodBody* method)
{
try {
if (!method->invoke(this)) {
+ //temporary hack until channel management is moved to its own handler:
+ if (method->amqpClassId() != ChannelOpenBody::CLASS_ID) {
+ ++(incoming.lwm);
+ }
+
//else do the usual:
handleL4(method);
//(if the frameset is complete) we can move the execution-mark
@@ -73,7 +80,9 @@ void SemanticHandler::handleMethod(framing::AMQMethodBody* method)
//temporary hack until channel management is moved to its own handler:
if (method->amqpClassId() != ChannelOpenBody::CLASS_ID) {
- ++(incoming.hwm);
+ //TODO: need to account for async store opreations
+ //when this command is a message publication
+ ++(incoming.hwm);
}
//note: need to be more sophisticated than this if we execute
@@ -85,7 +94,7 @@ void SemanticHandler::handleMethod(framing::AMQMethodBody* method)
}
}
-void SemanticHandler::complete(uint32_t cumulative, SequenceNumberSet range)
+void SemanticHandler::complete(uint32_t cumulative, const SequenceNumberSet& range)
{
//record:
SequenceNumber mark(cumulative);
@@ -98,7 +107,7 @@ void SemanticHandler::complete(uint32_t cumulative, SequenceNumberSet range)
if (range.size() % 2) { //must be even number
throw ConnectionException(530, "Received odd number of elements in ranged mark");
} else {
- for (SequenceNumberSet::iterator i = range.begin(); i != range.end(); i++) {
+ for (SequenceNumberSet::const_iterator i = range.begin(); i != range.end(); i++) {
channel.ackRange((uint64_t) i->getValue(), (uint64_t) (++i)->getValue());
}
}
@@ -113,6 +122,25 @@ void SemanticHandler::flush()
ChannelAdapter::send(ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), SequenceNumberSet()));
}
}
+void SemanticHandler::sync()
+{
+ //for now, just treat as flush; will need to get more clever when we deal with async publication
+ flush();
+}
+
+void SemanticHandler::noop()
+{
+ //Do nothing...
+ //
+ //is this an L3 control? or is it an L4 command?
+ //if the former, of what use is it?
+ //if the latter it may contain a synch request... but its odd to have it in this class
+}
+
+void SemanticHandler::result(uint32_t /*command*/, const std::string& /*data*/)
+{
+ //never actually sent by client at present
+}
void SemanticHandler::handleL4(framing::AMQMethodBody* method)
{
@@ -124,7 +152,13 @@ void SemanticHandler::handleL4(framing::AMQMethodBody* method)
throw ConnectionException(504, out.str());
}
} else {
- method->invoke(*adapter);
+ InvocationVisitor v(adapter.get());
+ method->accept(v);
+ if (!v.wasHandled()) {
+ throw ConnectionException(540, "Not implemented");
+ } else if (v.hasResult()) {
+ ChannelAdapter::send(ExecutionResultBody(getVersion(), incoming.lwm.getValue(), v.getResult()));
+ }
}
}catch(const ChannelException& e){
adapter->getProxy().getChannel().close(