summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SemanticHandler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/SemanticHandler.cpp')
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.cpp198
1 files changed, 123 insertions, 75 deletions
diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp
index f65e450e82..5e9106c1dd 100644
--- a/cpp/src/qpid/broker/SemanticHandler.cpp
+++ b/cpp/src/qpid/broker/SemanticHandler.cpp
@@ -20,7 +20,10 @@
*/
#include "SemanticHandler.h"
+
+#include "boost/format.hpp"
#include "BrokerAdapter.h"
+#include "MessageDelivery.h"
#include "qpid/framing/ChannelAdapter.h"
#include "qpid/framing/ChannelCloseOkBody.h"
#include "qpid/framing/ExecutionCompleteBody.h"
@@ -32,18 +35,16 @@ using namespace qpid::framing;
using namespace qpid::sys;
SemanticHandler::SemanticHandler(ChannelId id, Connection& c) :
- connection(c),
- channel(c, *this, id, &c.broker.getStore())
+ connection(c), channel(c, *this, id)
{
init(id, connection.getOutput(), connection.getVersion());
adapter = std::auto_ptr<BrokerAdapter>(new BrokerAdapter(channel, connection, connection.broker, *this));
}
-
void SemanticHandler::handle(framing::AMQFrame& frame)
{
- //TODO: assembly etc when move to 0-10 framing
- //
+ //TODO: assembly for method and headers
+
//have potentially three separate tracks at this point:
//
// (1) execution controls
@@ -51,46 +52,43 @@ void SemanticHandler::handle(framing::AMQFrame& frame)
// (3) data i.e. content-bearing commands
//
//framesets on each can be interleaved. framesets on the latter
- //two share a command-id sequence.
+ //two share a command-id sequence. controls on the first track are
+ //used to communicate details about that command-id sequence.
//
//need to decide what to do if a frame on the command track
//arrives while a frameset on the data track is still
//open. execute it (i.e. out-of order execution with respect to
- //the command id sequence) or queue it up.
+ //the command id sequence) or queue it up?
- //if ready to execute (i.e. if segment is complete or frame is
- //message content):
- handleBody(frame.getBody());
-}
-
-//ChannelAdapter virtual methods:
-void SemanticHandler::handleMethod(framing::AMQMethodBody* method)
-{
- try {
- if (!method->invoke(this)) {
- //temporary hack until channel management is moved to its own handler:
- if (method->amqpClassId() != ChannelOpenBody::CLASS_ID) {
- ++(incoming.lwm);
- }
+ try{
- //else do the usual:
- handleL4(method);
- //(if the frameset is complete) we can move the execution-mark
- //forward
-
- //temporary hack until channel management is moved to its own handler:
- if (method->amqpClassId() != ChannelOpenBody::CLASS_ID) {
- //TODO: need to account for async store opreations
- //when this command is a message publication
- ++(incoming.hwm);
+ TrackId track = getTrack(frame);//will be replaced by field in 0-10 frame header
+
+ switch(track) {
+ case SESSION_CONTROL_TRACK://TODO: L2 should be handled by separate handler
+ handleL2(frame.castBody<AMQMethodBody>());
+ break;
+ case EXECUTION_CONTROL_TRACK:
+ handleL3(frame.castBody<AMQMethodBody>());
+ break;
+ case MODEL_COMMAND_TRACK:
+ if (!isOpen()) {
+ throw ConnectionException(504, (boost::format("Attempt to use unopened channel: %g") % getId()).str());
}
-
- //note: need to be more sophisticated than this if we execute
- //commands that arrive within an active message frameset (that
- //can't happen until 0-10 framing is implemented)
+ handleCommand(frame.castBody<AMQMethodBody>());
+ break;
+ case MODEL_CONTENT_TRACK:
+ handleContent(frame);
+ break;
}
+
+ }catch(const ChannelException& e){
+ adapter->getProxy().getChannel().close(e.code, e.toString(), getClassId(frame), getMethodId(frame));
+ connection.closeChannel(getId());
+ }catch(const ConnectionException& e){
+ connection.close(e.code, e.toString(), getClassId(frame), getMethodId(frame));
}catch(const std::exception& e){
- connection.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId());
+ connection.close(541/*internal error*/, e.what(), getClassId(frame), getMethodId(frame));
}
}
@@ -102,7 +100,6 @@ void SemanticHandler::complete(uint32_t cumulative, const SequenceNumberSet& ran
outgoing.lwm = mark;
//ack messages:
channel.ackCumulative(mark.getValue());
- //std::cout << "[" << this << "] acknowledged: " << mark << std::endl;
}
if (range.size() % 2) { //must be even number
throw ConnectionException(530, "Received odd number of elements in ranged mark");
@@ -116,7 +113,6 @@ void SemanticHandler::complete(uint32_t cumulative, const SequenceNumberSet& ran
void SemanticHandler::flush()
{
//flush doubles as a sync to begin with - send an execution.complete
- incoming.lwm = incoming.hwm;
if (isOpen()) {
Mutex::ScopedLock l(outLock);
ChannelAdapter::send(ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), SequenceNumberSet()));
@@ -142,52 +138,59 @@ void SemanticHandler::result(uint32_t /*command*/, const std::string& /*data*/)
//never actually sent by client at present
}
-void SemanticHandler::handleL4(framing::AMQMethodBody* method)
+void SemanticHandler::handleCommand(framing::AMQMethodBody* method)
{
- try{
- if(getId() != 0 && !method->isA<ChannelOpenBody>() && !isOpen()) {
- if (!method->isA<ChannelCloseOkBody>()) {
- std::stringstream out;
- out << "Attempt to use unopened channel: " << getId();
- throw ConnectionException(504, out.str());
- }
- } else {
- InvocationVisitor v(adapter.get());
- method->accept(v);
- if (!v.wasHandled()) {
- throw ConnectionException(540, "Not implemented");
- } else if (v.hasResult()) {
- ChannelAdapter::send(ExecutionResultBody(getVersion(), incoming.lwm.getValue(), v.getResult()));
- }
- }
- }catch(const ChannelException& e){
- adapter->getProxy().getChannel().close(
- e.code, e.toString(),
- method->amqpClassId(), method->amqpMethodId());
- connection.closeChannel(getId());
- }catch(const ConnectionException& e){
- connection.close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId());
+ ++(incoming.lwm);
+ InvocationVisitor v(adapter.get());
+ method->accept(v);
+ //TODO: need to account for async store operations and interleaving
+ ++(incoming.hwm);
+
+ if (!v.wasHandled()) {
+ throw ConnectionException(540, "Not implemented");
+ } else if (v.hasResult()) {
+ ChannelAdapter::send(ExecutionResultBody(getVersion(), incoming.lwm.getValue(), v.getResult()));
}
}
-bool SemanticHandler::isOpen() const
-{
- return channel.isOpen();
+void SemanticHandler::handleL2(framing::AMQMethodBody* method)
+{
+ if(!method->isA<ChannelOpenBody>() && !isOpen()) {
+ if (!method->isA<ChannelCloseOkBody>()) {
+ throw ConnectionException(504, (boost::format("Attempt to use unopened channel: %g") % getId()).str());
+ }
+ } else {
+ method->invoke(adapter->getChannelHandler());
+ }
}
-void SemanticHandler::handleHeader(qpid::framing::AMQHeaderBody* body)
+void SemanticHandler::handleL3(framing::AMQMethodBody* method)
{
- channel.handleHeader(body);
+ if (!method->invoke(this)) {
+ throw ConnectionException(540, "Not implemented");
+ }
}
-void SemanticHandler::handleContent(qpid::framing::AMQContentBody* body)
+void SemanticHandler::handleContent(AMQFrame& frame)
{
- channel.handleContent(body);
+ Message::shared_ptr msg(msgBuilder.getMessage());
+ if (!msg) {//start of frameset will be indicated by frame flags
+ msgBuilder.start(++(incoming.lwm));
+ msg = msgBuilder.getMessage();
+ }
+ msgBuilder.handle(frame);
+ if (msg->getFrames().isComplete()) {//end of frameset will be indicated by frame flags
+ msg->setPublisher(&connection);
+ channel.handle(msg);
+ msgBuilder.end();
+ //TODO: need to account for async store operations and interleaving
+ ++(incoming.hwm);
+ }
}
-void SemanticHandler::handleHeartbeat(qpid::framing::AMQHeartbeatBody* body)
-{
- channel.handleHeartbeat(body);
+bool SemanticHandler::isOpen() const
+{
+ return channel.isOpen();
}
DeliveryId SemanticHandler::deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token)
@@ -195,14 +198,13 @@ DeliveryId SemanticHandler::deliver(Message::shared_ptr& msg, DeliveryToken::sha
Mutex::ScopedLock l(outLock);
SequenceNumber copy(outgoing.hwm);
++copy;
- msg->deliver(*this, copy.getValue(), token, connection.getFrameMax());
- //std::cout << "[" << this << "] delivered: " << outgoing.hwm.getValue() << std::endl;
+ MessageDelivery::deliver(msg, *this, copy.getValue(), token, connection.getFrameMax());
return outgoing.hwm.getValue();
}
void SemanticHandler::redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag)
{
- msg->deliver(*this, tag, token, connection.getFrameMax());
+ MessageDelivery::deliver(msg, *this, tag, token, connection.getFrameMax());
}
void SemanticHandler::send(const AMQBody& body)
@@ -214,3 +216,49 @@ void SemanticHandler::send(const AMQBody& body)
}
ChannelAdapter::send(body);
}
+
+uint16_t SemanticHandler::getClassId(const AMQFrame& frame)
+{
+ return frame.getBody()->type() == METHOD_BODY ? frame.castBody<AMQMethodBody>()->amqpClassId() : 0;
+}
+
+uint16_t SemanticHandler::getMethodId(const AMQFrame& frame)
+{
+ return frame.getBody()->type() == METHOD_BODY ? frame.castBody<AMQMethodBody>()->amqpMethodId() : 0;
+}
+
+SemanticHandler::TrackId SemanticHandler::getTrack(const AMQFrame& frame)
+{
+ //will be replaced by field in 0-10 frame header
+ uint8_t type = frame.getBody()->type();
+ uint16_t classId;
+ switch(type) {
+ case METHOD_BODY:
+ if (frame.castBody<AMQMethodBody>()->isContentBearing()) {
+ return MODEL_CONTENT_TRACK;
+ }
+
+ classId = frame.castBody<AMQMethodBody>()->amqpClassId();
+ switch (classId) {
+ case ChannelOpenBody::CLASS_ID:
+ return SESSION_CONTROL_TRACK;
+ case ExecutionCompleteBody::CLASS_ID:
+ return EXECUTION_CONTROL_TRACK;
+ }
+
+ return MODEL_COMMAND_TRACK;
+ case HEADER_BODY:
+ case CONTENT_BODY:
+ return MODEL_CONTENT_TRACK;
+ }
+ throw Exception("Could not determine track");
+}
+
+//ChannelAdapter virtual methods, no longer used:
+void SemanticHandler::handleMethod(framing::AMQMethodBody*){}
+
+void SemanticHandler::handleHeader(qpid::framing::AMQHeaderBody*) {}
+
+void SemanticHandler::handleContent(qpid::framing::AMQContentBody*) {}
+
+void SemanticHandler::handleHeartbeat(qpid::framing::AMQHeartbeatBody*) {}