diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/BrokerAdapter.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionAdapter.cpp | 25 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionAdapter.h | 19 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionHandler.cpp | 12 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionHandler.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 45 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionState.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/framing/AMQFrame.cpp | 5 | ||||
-rwxr-xr-x | cpp/src/tests/python_tests | 2 |
9 files changed, 88 insertions, 28 deletions
diff --git a/cpp/src/qpid/broker/BrokerAdapter.h b/cpp/src/qpid/broker/BrokerAdapter.h index 6e92f89706..3314ec6be3 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.h +++ b/cpp/src/qpid/broker/BrokerAdapter.h @@ -83,6 +83,7 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations Exchange010Handler* getExchange010Handler() { throw framing::NotImplementedException("Class not implemented"); } Queue010Handler* getQueue010Handler() { throw framing::NotImplementedException("Class not implemented"); } Message010Handler* getMessage010Handler() { throw framing::NotImplementedException("Class not implemented"); } + Execution010Handler* getExecution010Handler() { throw framing::NotImplementedException("Class not implemented"); } // Handlers no longer implemented in BrokerAdapter: #define BADHANDLER() assert(0); throw framing::NotImplementedException("") diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp index 15c29ed482..5ab0dd84a9 100644 --- a/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/cpp/src/qpid/broker/SessionAdapter.cpp @@ -37,7 +37,8 @@ SessionAdapter::SessionAdapter(SemanticState& s) : HandlerImpl(s), exchangeImpl(s), queueImpl(s), - messageImpl(s) + messageImpl(s), + executionImpl(s) {} @@ -377,6 +378,28 @@ void SessionAdapter::MessageHandlerImpl::acquire(const SequenceSet& transfers) } */ + +void SessionAdapter::ExecutionHandlerImpl::sync() +{ + //TODO +} + +void SessionAdapter::ExecutionHandlerImpl::result(uint32_t /*commandId*/, const string& /*value*/) +{ + //TODO +} + +void SessionAdapter::ExecutionHandlerImpl::exception(uint16_t /*errorCode*/, + uint32_t /*commandId*/, + uint8_t /*classCode*/, + uint8_t /*commandCode*/, + uint8_t /*fieldIndex*/, + const std::string& /*description*/, + const framing::FieldTable& /*errorInfo*/) +{ + //TODO +} + }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionAdapter.h b/cpp/src/qpid/broker/SessionAdapter.h index 0dd3529359..c2d61392d7 100644 --- a/cpp/src/qpid/broker/SessionAdapter.h +++ b/cpp/src/qpid/broker/SessionAdapter.h @@ -55,6 +55,7 @@ class SessionAdapter : public HandlerImpl, public framing::AMQP_ServerOperations Message010Handler* getMessage010Handler(){ return &messageImpl; } Exchange010Handler* getExchange010Handler(){ return &exchangeImpl; } Queue010Handler* getQueue010Handler(){ return &queueImpl; } + Execution010Handler* getExecution010Handler(){ return &executionImpl; } BasicHandler* getBasicHandler() { throw framing::NotImplementedException("Class not implemented"); } @@ -172,9 +173,27 @@ class SessionAdapter : public HandlerImpl, public framing::AMQP_ServerOperations }; + class ExecutionHandlerImpl : public Execution010Handler, public HandlerImpl + { + public: + ExecutionHandlerImpl(SemanticState& session) : HandlerImpl(session) {} + + void sync(); + void result(uint32_t commandId, const string& value); + void exception(uint16_t errorCode, + uint32_t commandId, + uint8_t classCode, + uint8_t commandCode, + uint8_t fieldIndex, + const std::string& description, + const framing::FieldTable& errorInfo); + + }; + ExchangeHandlerImpl exchangeImpl; QueueHandlerImpl queueImpl; MessageHandlerImpl messageImpl; + ExecutionHandlerImpl executionImpl; }; }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp index 919a3e6ee8..3baa3a89a7 100644 --- a/cpp/src/qpid/broker/SessionHandler.cpp +++ b/cpp/src/qpid/broker/SessionHandler.cpp @@ -69,12 +69,6 @@ void SessionHandler::handleIn(AMQFrame& f) { QPID_MSG("Channel " << channel.get() << " is not open")); } } - } catch(const ChannelException& e) { - ignoring=true; // Ignore trailing frames sent by client. - session->detach(); - session.reset(); - //TODO: implement new exception handling mechanism - //peerSession.closed(e.code, e.what()); }catch(const ConnectionException& e){ connection.close(e.code, e.what(), classId(m), methodId(m)); }catch(const std::exception& e){ @@ -83,6 +77,12 @@ void SessionHandler::handleIn(AMQFrame& f) { } } +void SessionHandler::destroy() { + ignoring=true; // Ignore trailing frames sent by client. + session->detach(); + session.reset(); +} + void SessionHandler::handleOut(AMQFrame& f) { channel.handle(f); // Send it. if (session->sent(f)) diff --git a/cpp/src/qpid/broker/SessionHandler.h b/cpp/src/qpid/broker/SessionHandler.h index 4b031f2951..fa013a1c15 100644 --- a/cpp/src/qpid/broker/SessionHandler.h +++ b/cpp/src/qpid/broker/SessionHandler.h @@ -70,6 +70,7 @@ class SessionHandler : public framing::AMQP_ServerOperations::Session010Handler, void localSuspend(); void detach() { localSuspend(); } void sendCompletion(); + void destroy(); protected: void handleIn(framing::AMQFrame&); diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index 571b8848ae..ceaa70db18 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -168,16 +168,16 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId, return status; } -void SessionState::handleCommand(framing::AMQMethodBody* method) +void SessionState::handleCommand(framing::AMQMethodBody* method, SequenceNumber& id) { - SequenceNumber id = nextIn++; + id = nextIn++; Invoker::Result invocation = invoke(adapter, *method); completed.add(id); if (!invocation.wasHandled()) { throw NotImplementedException("Not implemented"); } else if (invocation.hasResult()) { - getProxy().getExecution().result(id.getValue(), invocation.getResult()); + getProxy().getExecution010().result(id, invocation.getResult()); } if (method->isSync()) { sendCompletion(); @@ -185,16 +185,18 @@ void SessionState::handleCommand(framing::AMQMethodBody* method) //TODO: if window gets too large send unsolicited completion } -void SessionState::handleContent(AMQFrame& frame) +void SessionState::handleContent(AMQFrame& frame, SequenceNumber& id) { intrusive_ptr<Message> msg(msgBuilder.getMessage()); - if (!msg) {//start of frameset will be indicated by frame flags - SequenceNumber id = nextIn++; + if (frame.getBof() && frame.getBos()) {//start of frameset + id = nextIn++; msgBuilder.start(id); msg = msgBuilder.getMessage(); + } else { + id = msg->getCommandId(); } msgBuilder.handle(frame); - if (frame.getEof() && frame.getEos()) {//end of frameset will be indicated by frame flags + if (frame.getEof() && frame.getEos()) {//end of frameset msg->setPublisher(&getConnection()); semanticState.handle(msg); msgBuilder.end(); @@ -210,16 +212,27 @@ void SessionState::handle(AMQFrame& frame) { received(frame); - //TODO: make command handling more uniform, regardless of whether - //commands carry content. (For now, assume all single frame - //assmblies are non-content bearing and all content-bearing - //assmeblies will have more than one frame): - if (frame.getBof() && frame.getEof()) { - handleCommand(frame.getMethod()); - } else { - handleContent(frame); + SequenceNumber commandId; + try { + //TODO: make command handling more uniform, regardless of whether + //commands carry content. (For now, assume all single frame + //assemblies are non-content bearing and all content-bearing + //assemblies will have more than one frame): + if (frame.getBof() && frame.getEof()) { + handleCommand(frame.getMethod(), commandId); + } else { + handleContent(frame, commandId); + } + } catch(const ChannelException& e) { + //TODO: better implementation of new exception handling mechanism + AMQMethodBody* m = frame.getMethod(); + if (m) { + getProxy().getExecution010().exception(e.code, commandId, m->amqpClassId(), m->amqpMethodId(), 0, e.what(), FieldTable()); + } else { + getProxy().getExecution010().exception(e.code, commandId, 0, 0, 0, e.what(), FieldTable()); + } + handler->destroy(); } - } DeliveryId SessionState::deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token) diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h index c936edee21..ecf0b41a7a 100644 --- a/cpp/src/qpid/broker/SessionState.h +++ b/cpp/src/qpid/broker/SessionState.h @@ -93,8 +93,6 @@ class SessionState : public framing::SessionState, void activateOutput(); void handle(framing::AMQFrame& frame); - void handleCommand(framing::AMQMethodBody* method); - void handleContent(framing::AMQFrame& frame); void complete(const framing::SequenceSet& ranges); void sendCompletion(); @@ -138,8 +136,10 @@ class SessionState : public framing::SessionState, RangedOperation ackOp; management::Session::shared_ptr mgmtObject; + void handleCommand(framing::AMQMethodBody* method, framing::SequenceNumber& id); + void handleContent(framing::AMQFrame& frame, framing::SequenceNumber& id); - friend class SessionManager; + friend class SessionManager; }; diff --git a/cpp/src/qpid/framing/AMQFrame.cpp b/cpp/src/qpid/framing/AMQFrame.cpp index 3750f4a9a8..eeb658600d 100644 --- a/cpp/src/qpid/framing/AMQFrame.cpp +++ b/cpp/src/qpid/framing/AMQFrame.cpp @@ -49,12 +49,15 @@ uint32_t AMQFrame::frameOverhead() { void AMQFrame::encode(Buffer& buffer) const { + //set track first (controls on track 0, everything else on 1): + uint8_t track = getBody()->type() ? 1 : 0; + uint8_t flags = (bof ? 0x08 : 0) | (eof ? 0x04 : 0) | (bos ? 0x02 : 0) | (eos ? 0x01 : 0); buffer.putOctet(flags); buffer.putOctet(getBody()->type()); buffer.putShort(size() - 1); // Don't include end marker (it's not part of the frame itself) buffer.putOctet(0); - buffer.putOctet(0x0f & subchannel); + buffer.putOctet(0x0f & track); buffer.putShort(channel); buffer.putLong(0); body->encode(buffer); diff --git a/cpp/src/tests/python_tests b/cpp/src/tests/python_tests index d9754ed0fb..73b3b970c1 100755 --- a/cpp/src/tests/python_tests +++ b/cpp/src/tests/python_tests @@ -1,7 +1,7 @@ #!/bin/sh # Run the python tests. if test -d ../../../python ; then - cd ../../../python && ./run-tests -v -s ../specs/amqp.0-10-preview.xml -I cpp_failing_0-10.txt -b localhost:$QPID_PORT $PYTHON_TESTS + cd ../../../python && ./run-tests -v -s ../specs/amqp.0-10-preview.xml -I cpp_failing_0-10_preview.txt -b localhost:$QPID_PORT $PYTHON_TESTS && ./run-tests --skip-self-test -v -s "0-10" -I cpp_failing_0-10.txt -b localhost:$QPID_PORT $PYTHON_TESTS else echo Warning: python tests not found. fi |