diff options
Diffstat (limited to 'cpp/src/qpid/broker/SemanticHandler.cpp')
-rw-r--r-- | cpp/src/qpid/broker/SemanticHandler.cpp | 198 |
1 files changed, 123 insertions, 75 deletions
diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp index f65e450e82..5e9106c1dd 100644 --- a/cpp/src/qpid/broker/SemanticHandler.cpp +++ b/cpp/src/qpid/broker/SemanticHandler.cpp @@ -20,7 +20,10 @@ */ #include "SemanticHandler.h" + +#include "boost/format.hpp" #include "BrokerAdapter.h" +#include "MessageDelivery.h" #include "qpid/framing/ChannelAdapter.h" #include "qpid/framing/ChannelCloseOkBody.h" #include "qpid/framing/ExecutionCompleteBody.h" @@ -32,18 +35,16 @@ using namespace qpid::framing; using namespace qpid::sys; SemanticHandler::SemanticHandler(ChannelId id, Connection& c) : - connection(c), - channel(c, *this, id, &c.broker.getStore()) + connection(c), channel(c, *this, id) { init(id, connection.getOutput(), connection.getVersion()); adapter = std::auto_ptr<BrokerAdapter>(new BrokerAdapter(channel, connection, connection.broker, *this)); } - void SemanticHandler::handle(framing::AMQFrame& frame) { - //TODO: assembly etc when move to 0-10 framing - // + //TODO: assembly for method and headers + //have potentially three separate tracks at this point: // // (1) execution controls @@ -51,46 +52,43 @@ void SemanticHandler::handle(framing::AMQFrame& frame) // (3) data i.e. content-bearing commands // //framesets on each can be interleaved. framesets on the latter - //two share a command-id sequence. + //two share a command-id sequence. controls on the first track are + //used to communicate details about that command-id sequence. // //need to decide what to do if a frame on the command track //arrives while a frameset on the data track is still //open. execute it (i.e. out-of order execution with respect to - //the command id sequence) or queue it up. + //the command id sequence) or queue it up? - //if ready to execute (i.e. if segment is complete or frame is - //message content): - handleBody(frame.getBody()); -} - -//ChannelAdapter virtual methods: -void SemanticHandler::handleMethod(framing::AMQMethodBody* method) -{ - try { - if (!method->invoke(this)) { - //temporary hack until channel management is moved to its own handler: - if (method->amqpClassId() != ChannelOpenBody::CLASS_ID) { - ++(incoming.lwm); - } + try{ - //else do the usual: - handleL4(method); - //(if the frameset is complete) we can move the execution-mark - //forward - - //temporary hack until channel management is moved to its own handler: - if (method->amqpClassId() != ChannelOpenBody::CLASS_ID) { - //TODO: need to account for async store opreations - //when this command is a message publication - ++(incoming.hwm); + TrackId track = getTrack(frame);//will be replaced by field in 0-10 frame header + + switch(track) { + case SESSION_CONTROL_TRACK://TODO: L2 should be handled by separate handler + handleL2(frame.castBody<AMQMethodBody>()); + break; + case EXECUTION_CONTROL_TRACK: + handleL3(frame.castBody<AMQMethodBody>()); + break; + case MODEL_COMMAND_TRACK: + if (!isOpen()) { + throw ConnectionException(504, (boost::format("Attempt to use unopened channel: %g") % getId()).str()); } - - //note: need to be more sophisticated than this if we execute - //commands that arrive within an active message frameset (that - //can't happen until 0-10 framing is implemented) + handleCommand(frame.castBody<AMQMethodBody>()); + break; + case MODEL_CONTENT_TRACK: + handleContent(frame); + break; } + + }catch(const ChannelException& e){ + adapter->getProxy().getChannel().close(e.code, e.toString(), getClassId(frame), getMethodId(frame)); + connection.closeChannel(getId()); + }catch(const ConnectionException& e){ + connection.close(e.code, e.toString(), getClassId(frame), getMethodId(frame)); }catch(const std::exception& e){ - connection.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId()); + connection.close(541/*internal error*/, e.what(), getClassId(frame), getMethodId(frame)); } } @@ -102,7 +100,6 @@ void SemanticHandler::complete(uint32_t cumulative, const SequenceNumberSet& ran outgoing.lwm = mark; //ack messages: channel.ackCumulative(mark.getValue()); - //std::cout << "[" << this << "] acknowledged: " << mark << std::endl; } if (range.size() % 2) { //must be even number throw ConnectionException(530, "Received odd number of elements in ranged mark"); @@ -116,7 +113,6 @@ void SemanticHandler::complete(uint32_t cumulative, const SequenceNumberSet& ran void SemanticHandler::flush() { //flush doubles as a sync to begin with - send an execution.complete - incoming.lwm = incoming.hwm; if (isOpen()) { Mutex::ScopedLock l(outLock); ChannelAdapter::send(ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), SequenceNumberSet())); @@ -142,52 +138,59 @@ void SemanticHandler::result(uint32_t /*command*/, const std::string& /*data*/) //never actually sent by client at present } -void SemanticHandler::handleL4(framing::AMQMethodBody* method) +void SemanticHandler::handleCommand(framing::AMQMethodBody* method) { - try{ - if(getId() != 0 && !method->isA<ChannelOpenBody>() && !isOpen()) { - if (!method->isA<ChannelCloseOkBody>()) { - std::stringstream out; - out << "Attempt to use unopened channel: " << getId(); - throw ConnectionException(504, out.str()); - } - } else { - InvocationVisitor v(adapter.get()); - method->accept(v); - if (!v.wasHandled()) { - throw ConnectionException(540, "Not implemented"); - } else if (v.hasResult()) { - ChannelAdapter::send(ExecutionResultBody(getVersion(), incoming.lwm.getValue(), v.getResult())); - } - } - }catch(const ChannelException& e){ - adapter->getProxy().getChannel().close( - e.code, e.toString(), - method->amqpClassId(), method->amqpMethodId()); - connection.closeChannel(getId()); - }catch(const ConnectionException& e){ - connection.close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId()); + ++(incoming.lwm); + InvocationVisitor v(adapter.get()); + method->accept(v); + //TODO: need to account for async store operations and interleaving + ++(incoming.hwm); + + if (!v.wasHandled()) { + throw ConnectionException(540, "Not implemented"); + } else if (v.hasResult()) { + ChannelAdapter::send(ExecutionResultBody(getVersion(), incoming.lwm.getValue(), v.getResult())); } } -bool SemanticHandler::isOpen() const -{ - return channel.isOpen(); +void SemanticHandler::handleL2(framing::AMQMethodBody* method) +{ + if(!method->isA<ChannelOpenBody>() && !isOpen()) { + if (!method->isA<ChannelCloseOkBody>()) { + throw ConnectionException(504, (boost::format("Attempt to use unopened channel: %g") % getId()).str()); + } + } else { + method->invoke(adapter->getChannelHandler()); + } } -void SemanticHandler::handleHeader(qpid::framing::AMQHeaderBody* body) +void SemanticHandler::handleL3(framing::AMQMethodBody* method) { - channel.handleHeader(body); + if (!method->invoke(this)) { + throw ConnectionException(540, "Not implemented"); + } } -void SemanticHandler::handleContent(qpid::framing::AMQContentBody* body) +void SemanticHandler::handleContent(AMQFrame& frame) { - channel.handleContent(body); + Message::shared_ptr msg(msgBuilder.getMessage()); + if (!msg) {//start of frameset will be indicated by frame flags + msgBuilder.start(++(incoming.lwm)); + msg = msgBuilder.getMessage(); + } + msgBuilder.handle(frame); + if (msg->getFrames().isComplete()) {//end of frameset will be indicated by frame flags + msg->setPublisher(&connection); + channel.handle(msg); + msgBuilder.end(); + //TODO: need to account for async store operations and interleaving + ++(incoming.hwm); + } } -void SemanticHandler::handleHeartbeat(qpid::framing::AMQHeartbeatBody* body) -{ - channel.handleHeartbeat(body); +bool SemanticHandler::isOpen() const +{ + return channel.isOpen(); } DeliveryId SemanticHandler::deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token) @@ -195,14 +198,13 @@ DeliveryId SemanticHandler::deliver(Message::shared_ptr& msg, DeliveryToken::sha Mutex::ScopedLock l(outLock); SequenceNumber copy(outgoing.hwm); ++copy; - msg->deliver(*this, copy.getValue(), token, connection.getFrameMax()); - //std::cout << "[" << this << "] delivered: " << outgoing.hwm.getValue() << std::endl; + MessageDelivery::deliver(msg, *this, copy.getValue(), token, connection.getFrameMax()); return outgoing.hwm.getValue(); } void SemanticHandler::redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag) { - msg->deliver(*this, tag, token, connection.getFrameMax()); + MessageDelivery::deliver(msg, *this, tag, token, connection.getFrameMax()); } void SemanticHandler::send(const AMQBody& body) @@ -214,3 +216,49 @@ void SemanticHandler::send(const AMQBody& body) } ChannelAdapter::send(body); } + +uint16_t SemanticHandler::getClassId(const AMQFrame& frame) +{ + return frame.getBody()->type() == METHOD_BODY ? frame.castBody<AMQMethodBody>()->amqpClassId() : 0; +} + +uint16_t SemanticHandler::getMethodId(const AMQFrame& frame) +{ + return frame.getBody()->type() == METHOD_BODY ? frame.castBody<AMQMethodBody>()->amqpMethodId() : 0; +} + +SemanticHandler::TrackId SemanticHandler::getTrack(const AMQFrame& frame) +{ + //will be replaced by field in 0-10 frame header + uint8_t type = frame.getBody()->type(); + uint16_t classId; + switch(type) { + case METHOD_BODY: + if (frame.castBody<AMQMethodBody>()->isContentBearing()) { + return MODEL_CONTENT_TRACK; + } + + classId = frame.castBody<AMQMethodBody>()->amqpClassId(); + switch (classId) { + case ChannelOpenBody::CLASS_ID: + return SESSION_CONTROL_TRACK; + case ExecutionCompleteBody::CLASS_ID: + return EXECUTION_CONTROL_TRACK; + } + + return MODEL_COMMAND_TRACK; + case HEADER_BODY: + case CONTENT_BODY: + return MODEL_CONTENT_TRACK; + } + throw Exception("Could not determine track"); +} + +//ChannelAdapter virtual methods, no longer used: +void SemanticHandler::handleMethod(framing::AMQMethodBody*){} + +void SemanticHandler::handleHeader(qpid::framing::AMQHeaderBody*) {} + +void SemanticHandler::handleContent(qpid::framing::AMQContentBody*) {} + +void SemanticHandler::handleHeartbeat(qpid::framing::AMQHeartbeatBody*) {} |