/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */

#include "SemanticHandler.h"

#include "boost/format.hpp"
#include "BrokerAdapter.h"
#include "MessageDelivery.h"
#include "qpid/framing/ChannelAdapter.h"
#include "qpid/framing/ChannelCloseOkBody.h"
#include "qpid/framing/ExecutionCompleteBody.h"
#include "qpid/framing/ExecutionResultBody.h"
#include "qpid/framing/InvocationVisitor.h"

using namespace qpid::broker;
using namespace qpid::framing;
using namespace qpid::sys;

/**
 * Builds the handler for channel `id` on connection `c`.
 *
 * The channel member is constructed with this handler as one of its
 * arguments, the base adapter is initialised with the connection's output
 * and protocol version, and a BrokerAdapter is created to receive the
 * decoded method invocations.
 */
SemanticHandler::SemanticHandler(ChannelId id, Connection& c) :
    connection(c), channel(c, *this, id)
{
    init(id, connection.getOutput(), connection.getVersion());
    adapter = std::auto_ptr<BrokerAdapter>(
        new BrokerAdapter(channel, connection, connection.broker, *this));
}

/**
 * Entry point for a frame arriving on this channel.
 *
 * The frame is classified with getTrack() and dispatched to the matching
 * handler. Exceptions raised while handling are translated as follows:
 *  - ChannelException:    a channel close carrying the exception's code and
 *                         text is issued through the adapter's proxy, then
 *                         the channel is closed on the connection;
 *  - ConnectionException: the connection is closed with the exception's
 *                         code and text;
 *  - any other std::exception: the connection is closed with code 541.
 * In every case the class and method id of the offending frame are passed
 * along (0 for non-method frames, see getClassId()/getMethodId()).
 */
void SemanticHandler::handle(framing::AMQFrame& frame)
{
    //TODO: assembly for method and headers

    //have potentially three separate tracks at this point:
    //
    // (1) execution controls
    // (2) commands
    // (3) data i.e. content-bearing commands
    //
    //framesets on each can be interleaved. framesets on the latter
    //two share a command-id sequence. controls on the first track are
    //used to communicate details about that command-id sequence.
    //
    //need to decide what to do if a frame on the command track
    //arrives while a frameset on the data track is still
    //open. execute it (i.e. out-of-order execution with respect to
    //the command id sequence) or queue it up?

    try {
        TrackId track = getTrack(frame);//will be replaced by field in 0-10 frame header

        switch (track) {
        case SESSION_CONTROL_TRACK://TODO: L2 should be handled by separate handler
            handleL2(frame.castBody<AMQMethodBody>());
            break;
        case EXECUTION_CONTROL_TRACK:
            handleL3(frame.castBody<AMQMethodBody>());
            break;
        case MODEL_COMMAND_TRACK:
            if (!isOpen()) {
                throw ConnectionException(504, (boost::format("Attempt to use unopened channel: %g") % getId()).str());
            }
            handleCommand(frame.castBody<AMQMethodBody>());
            break;
        case MODEL_CONTENT_TRACK:
            handleContent(frame);
            break;
        }
    } catch (const ChannelException& e) {
        adapter->getProxy().getChannel().close(e.code, e.toString(), getClassId(frame), getMethodId(frame));
        connection.closeChannel(getId());
    } catch (const ConnectionException& e) {
        connection.close(e.code, e.toString(), getClassId(frame), getMethodId(frame));
    } catch (const std::exception& e) {
        connection.close(541/*internal error*/, e.what(), getClassId(frame), getMethodId(frame));
    }
}

/**
 * Handles a completion notification for outgoing commands.
 *
 * @param cumulative  mark up to which everything is complete; if it is
 *                    ahead of outgoing.lwm, the low-water mark is advanced
 *                    and the channel is asked to ack cumulatively.
 * @param range       flat list of (first, last) pairs; each pair is passed
 *                    to channel.ackRange().
 * @throws ConnectionException (530) if `range` holds an odd number of
 *         elements. Note that the cumulative mark has already been applied
 *         by the time this is detected.
 */
void SemanticHandler::complete(uint32_t cumulative, const SequenceNumberSet& range)
{
    //record:
    SequenceNumber mark(cumulative);
    if (outgoing.lwm < mark) {
        outgoing.lwm = mark;
        //ack messages:
        channel.ackCumulative(mark.getValue());
    }
    if (range.size() % 2) { //must be even number
        throw ConnectionException(530, "Received odd number of elements in ranged mark");
    }

    // The even-size check above guarantees that the extra increment inside
    // the loop body never steps past end().
    for (SequenceNumberSet::const_iterator i = range.begin(); i != range.end(); ++i) {
        // Read the lower bound *before* advancing. Doing both reads inside
        // the argument list of a single call leaves their relative order
        // unspecified, so the first argument could observe the advanced
        // iterator.
        const uint64_t first = static_cast<uint64_t>(i->getValue());
        ++i;
        const uint64_t last = static_cast<uint64_t>(i->getValue());
        channel.ackRange(first, last);
    }
}

/**
 * Sends an execution.complete carrying incoming.hwm and an empty range set,
 * provided the channel is open. The send is done under outLock and goes
 * through ChannelAdapter::send() directly, so the outgoing.hwm bump done in
 * SemanticHandler::send() is not applied to it.
 */
void SemanticHandler::flush()
{
    //flush doubles as a sync to begin with - send an execution.complete
    if (isOpen()) {
        Mutex::ScopedLock l(outLock);
        ChannelAdapter::send(ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), SequenceNumberSet()));
    }
}

/** Currently identical to flush(). */
void SemanticHandler::sync()
{
    //for now, just treat as flush; will need to get more clever when we deal with async publication
    flush();
}

/** Intentionally does nothing. */
void SemanticHandler::noop()
{
    //Do nothing...
    //
    //is this an L3 control? or is it an L4 command?
    //if the former, of what use is it?
    //if the latter it may contain a synch request... but it's odd to have it in this class
}

/** Intentionally does nothing; both arguments are ignored. */
void SemanticHandler::result(uint32_t /*command*/, const std::string& /*data*/)
{
    //never actually sent by client at present
}

/**
 * Executes one command method.
 *
 * incoming.lwm is incremented before the method is handed to the adapter
 * through an InvocationVisitor, and incoming.hwm is incremented afterwards
 * (before the "handled" check, so it advances even for rejected commands).
 * If the visitor produced a result, an execution.result tagged with
 * incoming.lwm is sent via ChannelAdapter::send().
 *
 * @throws ConnectionException (540) if the visitor did not handle the method.
 */
void SemanticHandler::handleCommand(framing::AMQMethodBody* method)
{
    ++(incoming.lwm);
    InvocationVisitor v(adapter.get());
    method->accept(v);
    //TODO: need to account for async store operations and interleaving
    ++(incoming.hwm);

    if (!v.wasHandled()) {
        throw ConnectionException(540, "Not implemented");
    } else if (v.hasResult()) {
        ChannelAdapter::send(ExecutionResultBody(getVersion(), incoming.lwm.getValue(), v.getResult()));
    }
}

/**
 * Handles a session-control (channel class) method.
 *
 * While the channel is not open only two methods are tolerated: the first
 * one tested below is invoked normally, the second one is silently dropped.
 * Anything else raises ConnectionException (504). Once the channel is open
 * every method is invoked on the adapter's channel handler.
 *
 * NOTE(review): the two body types tested here (ChannelOpenBody, then
 * ChannelCloseOkBody) are inferred from this file's includes and its other
 * uses of ChannelOpenBody - confirm against the intended protocol handling.
 */
void SemanticHandler::handleL2(framing::AMQMethodBody* method)
{
    if (!method->isA<ChannelOpenBody>() && !isOpen()) {
        if (!method->isA<ChannelCloseOkBody>()) {
            throw ConnectionException(504, (boost::format("Attempt to use unopened channel: %g") % getId()).str());
        }
        // otherwise: tolerated on an unopened channel, nothing to do
    } else {
        method->invoke(adapter->getChannelHandler());
    }
}

/**
 * Handles an execution-control method by invoking it on this object.
 *
 * @throws ConnectionException (540) if invoke() reports the method as not
 *         handled.
 */
void SemanticHandler::handleL3(framing::AMQMethodBody* method)
{
    if (!method->invoke(this)) {
        throw ConnectionException(540, "Not implemented");
    }
}

/**
 * Feeds one frame of a content-bearing command into the message builder.
 *
 * If the builder has no message in progress a new one is started with the
 * next incoming.lwm value. When the message's frameset reports complete,
 * the publisher is set to this connection, the message is handed to the
 * channel, the builder is reset and incoming.hwm is incremented.
 */
void SemanticHandler::handleContent(AMQFrame& frame)
{
    Message::shared_ptr msg(msgBuilder.getMessage());
    if (!msg) {//start of frameset will be indicated by frame flags
        msgBuilder.start(++(incoming.lwm));
        msg = msgBuilder.getMessage();
    }
    msgBuilder.handle(frame);
    if (msg->getFrames().isComplete()) {//end of frameset will be indicated by frame flags
        msg->setPublisher(&connection);
        channel.handle(msg);
        msgBuilder.end();
        //TODO: need to account for async store operations and interleaving
        ++(incoming.hwm);
    }
}

/** @return whether the underlying channel reports itself open. */
bool SemanticHandler::isOpen() const
{
    return channel.isOpen();
}

/**
 * Delivers `msg` under outLock, using outgoing.hwm + 1 as the delivery id
 * passed to MessageDelivery::deliver().
 *
 * @return the value of outgoing.hwm after the delivery.
 *
 * NOTE(review): the returned id is read back from outgoing.hwm rather than
 * from the id that was passed in; presumably MessageDelivery::deliver() sends
 * through send(), which advances outgoing.hwm - confirm.
 */
DeliveryId SemanticHandler::deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token)
{
    Mutex::ScopedLock l(outLock);
    SequenceNumber copy(outgoing.hwm);
    ++copy;
    MessageDelivery::deliver(msg, *this, copy.getValue(), token, connection.getFrameMax());
    return outgoing.hwm.getValue();
}

/**
 * Delivers `msg` again, reusing the caller-supplied delivery id `tag`.
 * Unlike deliver(), this function does not take outLock itself.
 */
void SemanticHandler::redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag)
{
    MessageDelivery::deliver(msg, *this, tag, token, connection.getFrameMax());
}

/**
 * Sends `body` under outLock. For method bodies whose class id differs from
 * the channel class, outgoing.hwm is incremented first.
 */
void SemanticHandler::send(const AMQBody& body)
{
    Mutex::ScopedLock l(outLock);
    if (body.getMethod() && body.getMethod()->amqpClassId() != ChannelOpenBody::CLASS_ID) {
        //temporary hack until channel management is moved to its own handler:
        ++outgoing.hwm;
    }
    ChannelAdapter::send(body);
}

/** @return the AMQP class id of a method frame, or 0 for any other frame. */
uint16_t SemanticHandler::getClassId(const AMQFrame& frame)
{
    return frame.getBody()->type() == METHOD_BODY ? frame.castBody<AMQMethodBody>()->amqpClassId() : 0;
}

/** @return the AMQP method id of a method frame, or 0 for any other frame. */
uint16_t SemanticHandler::getMethodId(const AMQFrame& frame)
{
    return frame.getBody()->type() == METHOD_BODY ? frame.castBody<AMQMethodBody>()->amqpMethodId() : 0;
}

/**
 * Classifies a frame into one of the processing tracks.
 *
 *  - method frame, content bearing            -> MODEL_CONTENT_TRACK
 *  - method frame, channel class              -> SESSION_CONTROL_TRACK
 *  - method frame, execution class            -> EXECUTION_CONTROL_TRACK
 *  - any other method frame                   -> MODEL_COMMAND_TRACK
 *  - header or content frame                  -> MODEL_CONTENT_TRACK
 *
 * @throws Exception for any other body type.
 */
SemanticHandler::TrackId SemanticHandler::getTrack(const AMQFrame& frame)
{
    //will be replaced by field in 0-10 frame header
    uint8_t type = frame.getBody()->type();
    switch (type) {
    case METHOD_BODY: {
        if (frame.castBody<AMQMethodBody>()->isContentBearing()) {
            return MODEL_CONTENT_TRACK;
        }

        const uint16_t classId = frame.castBody<AMQMethodBody>()->amqpClassId();
        switch (classId) {
        case ChannelOpenBody::CLASS_ID:
            return SESSION_CONTROL_TRACK;
        case ExecutionCompleteBody::CLASS_ID:
            return EXECUTION_CONTROL_TRACK;
        }

        return MODEL_COMMAND_TRACK;
    }
    case HEADER_BODY:
    case CONTENT_BODY:
        return MODEL_CONTENT_TRACK;
    }
    throw Exception("Could not determine track");
}

//ChannelAdapter virtual methods, no longer used:
void SemanticHandler::handleMethod(framing::AMQMethodBody*) {}
void SemanticHandler::handleHeader(qpid::framing::AMQHeaderBody*) {}
void SemanticHandler::handleContent(qpid::framing::AMQContentBody*) {}
void SemanticHandler::handleHeartbeat(qpid::framing::AMQHeartbeatBody*) {}