diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Makefile.am | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Connection.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/ConnectionHandler.cpp (renamed from cpp/src/qpid/broker/ConnectionAdapter.cpp) | 38 | ||||
-rw-r--r-- | cpp/src/qpid/broker/ConnectionHandler.h (renamed from cpp/src/qpid/broker/ConnectionAdapter.h) | 23 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticHandler.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionHandler.cpp | 57 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionHandler.h | 21 | ||||
-rwxr-xr-x | cpp/src/tests/python_tests | 2 |
8 files changed, 24 insertions, 126 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 7f25c194d4..cf7029dabc 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -157,7 +157,7 @@ libqpidbroker_la_SOURCES = \ qpid/broker/BrokerExchange.cpp \ qpid/broker/BrokerQueue.cpp \ qpid/broker/Connection.cpp \ - qpid/broker/ConnectionAdapter.cpp \ + qpid/broker/ConnectionHandler.cpp \ qpid/broker/ConnectionFactory.cpp \ qpid/broker/Daemon.cpp \ qpid/broker/DeliverableMessage.cpp \ @@ -268,7 +268,7 @@ nobase_include_HEADERS = \ qpid/broker/BrokerAdapter.h \ qpid/broker/BrokerSingleton.h \ qpid/broker/Connection.h \ - qpid/broker/ConnectionAdapter.h \ + qpid/broker/ConnectionHandler.h \ qpid/broker/ConnectionFactory.h \ qpid/broker/ConnectionToken.h \ qpid/broker/Daemon.h \ diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h index 94651701dd..2723ac9acc 100644 --- a/cpp/src/qpid/broker/Connection.h +++ b/cpp/src/qpid/broker/Connection.h @@ -36,7 +36,7 @@ #include "Broker.h" #include "qpid/Exception.h" #include "Session.h" -#include "ConnectionAdapter.h" +#include "ConnectionHandler.h" #include "SessionHandler.h" #include <boost/optional.hpp> @@ -95,7 +95,7 @@ class Connection : public sys::ConnectionInputHandler, uint16_t heartbeat; framing::AMQP_ClientProxy::Connection* client; uint64_t stagingThreshold; - ConnectionAdapter adapter; + ConnectionHandler adapter; }; }} diff --git a/cpp/src/qpid/broker/ConnectionAdapter.cpp b/cpp/src/qpid/broker/ConnectionHandler.cpp index e33aeda8c7..a769d05470 100644 --- a/cpp/src/qpid/broker/ConnectionAdapter.cpp +++ b/cpp/src/qpid/broker/ConnectionHandler.cpp @@ -20,7 +20,7 @@ * */ -#include "ConnectionAdapter.h" +#include "ConnectionHandler.h" #include "Connection.h" #include "qpid/framing/ConnectionStartBody.h" @@ -28,34 +28,24 @@ using namespace qpid; using namespace qpid::broker; using namespace qpid::framing; -void ConnectionAdapter::init(const framing::ProtocolInitiation& header) { +void ConnectionHandler::init(const framing::ProtocolInitiation& header) { FieldTable properties; string mechanisms("PLAIN"); string locales("en_US"); handler->client.start(header.getMajor(), header.getMinor(), properties, mechanisms, locales); } -void ConnectionAdapter::close(ReplyCode code, const string& text, ClassId classId, MethodId methodId) +void ConnectionHandler::close(ReplyCode code, const string& text, ClassId classId, MethodId methodId) { handler->client.close(code, text, classId, methodId); } - -framing::AMQP_ServerOperations::ConnectionHandler* ConnectionAdapter::getConnectionHandler() -{ - return handler.get(); -} - -framing::ProtocolVersion ConnectionAdapter::getVersion() const -{ - return handler->connection.getVersion(); -} - -void ConnectionAdapter::handle(framing::AMQFrame& frame) +void ConnectionHandler::handle(framing::AMQFrame& frame) { AMQMethodBody* method=frame.getBody()->getMethod(); try{ - method->invoke(*this); + if (!method->invoke(handler.get())) + throw ConnectionException(503, "Class can't be accessed over channel 0"); }catch(ConnectionException& e){ handler->client.close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId()); }catch(std::exception& e){ @@ -63,27 +53,27 @@ void ConnectionAdapter::handle(framing::AMQFrame& frame) } } -ConnectionAdapter::ConnectionAdapter(Connection& connection) : handler(new Handler(connection)) {} +ConnectionHandler::ConnectionHandler(Connection& connection) : handler(new Handler(connection)) {} -ConnectionAdapter::Handler:: Handler(Connection& c) : client(c.getOutput()), connection(c) {} +ConnectionHandler::Handler:: Handler(Connection& c) : client(c.getOutput()), connection(c) {} -void ConnectionAdapter::Handler::startOk(const FieldTable& /*clientProperties*/, +void ConnectionHandler::Handler::startOk(const FieldTable& /*clientProperties*/, const string& /*mechanism*/, const string& /*response*/, const string& /*locale*/) { client.tune(framing::CHANNEL_MAX, connection.getFrameMax(), connection.getHeartbeat()); } -void ConnectionAdapter::Handler::secureOk(const string& /*response*/){} +void ConnectionHandler::Handler::secureOk(const string& /*response*/){} -void ConnectionAdapter::Handler::tuneOk(uint16_t /*channelmax*/, +void ConnectionHandler::Handler::tuneOk(uint16_t /*channelmax*/, uint32_t framemax, uint16_t heartbeat) { connection.setFrameMax(framemax); connection.setHeartbeat(heartbeat); } -void ConnectionAdapter::Handler::open(const string& /*virtualHost*/, +void ConnectionHandler::Handler::open(const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/) { string knownhosts; @@ -91,13 +81,13 @@ void ConnectionAdapter::Handler::open(const string& /*virtualHost*/, } -void ConnectionAdapter::Handler::close(uint16_t /*replyCode*/, const string& /*replyText*/, +void ConnectionHandler::Handler::close(uint16_t /*replyCode*/, const string& /*replyText*/, uint16_t /*classId*/, uint16_t /*methodId*/) { client.closeOk(); connection.getOutput().close(); } -void ConnectionAdapter::Handler::closeOk(){ +void ConnectionHandler::Handler::closeOk(){ connection.getOutput().close(); } diff --git a/cpp/src/qpid/broker/ConnectionAdapter.h b/cpp/src/qpid/broker/ConnectionHandler.h index eb96575c9d..aa8c9366cd 100644 --- a/cpp/src/qpid/broker/ConnectionAdapter.h +++ b/cpp/src/qpid/broker/ConnectionHandler.h @@ -37,7 +37,7 @@ namespace broker { class Connection; // TODO aconway 2007-09-18: Rename to ConnectionHandler -class ConnectionAdapter : public framing::FrameHandler, public framing::AMQP_ServerOperations +class ConnectionHandler : public framing::FrameHandler { struct Handler : public framing::AMQP_ServerOperations::ConnectionHandler { @@ -58,29 +58,10 @@ class ConnectionAdapter : public framing::FrameHandler, public framing::AMQP_Ser }; std::auto_ptr<Handler> handler; public: - ConnectionAdapter(Connection& connection); + ConnectionHandler(Connection& connection); void init(const framing::ProtocolInitiation& header); void close(framing::ReplyCode code, const std::string& text, framing::ClassId classId, framing::MethodId methodId); void handle(framing::AMQFrame& frame); - - //AMQP_ServerOperations: - ConnectionHandler* getConnectionHandler(); - ChannelHandler* getChannelHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); } - BasicHandler* getBasicHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); } - ExchangeHandler* getExchangeHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); } - BindingHandler* getBindingHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); } - QueueHandler* getQueueHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); } - TxHandler* getTxHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); } - MessageHandler* getMessageHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); } - AccessHandler* getAccessHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); } - FileHandler* getFileHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); } - StreamHandler* getStreamHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); } - TunnelHandler* getTunnelHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); } - DtxCoordinationHandler* getDtxCoordinationHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); } - DtxDemarcationHandler* getDtxDemarcationHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); } - ExecutionHandler* getExecutionHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); } - SessionHandler* getSessionHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); } - framing::ProtocolVersion getVersion() const; }; diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp index fc878ca346..f8d76c3b5f 100644 --- a/cpp/src/qpid/broker/SemanticHandler.cpp +++ b/cpp/src/qpid/broker/SemanticHandler.cpp @@ -28,7 +28,6 @@ #include "Session.h" #include "qpid/framing/ExecutionCompleteBody.h" #include "qpid/framing/ExecutionResultBody.h" -#include "qpid/framing/ChannelOpenBody.h" #include "qpid/framing/InvocationVisitor.h" #include <boost/format.hpp> diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp index e7ef6fdb87..01ce88059a 100644 --- a/cpp/src/qpid/broker/SessionHandler.cpp +++ b/cpp/src/qpid/broker/SessionHandler.cpp @@ -33,8 +33,7 @@ using namespace std; SessionHandler::SessionHandler(Connection& c, ChannelId ch) : InOutHandler(0, &c.getOutput()), connection(c), channel(ch), proxy(out), - ignoring(false), channelHandler(*this), - useChannelClose(false) {} + ignoring(false) {} SessionHandler::~SessionHandler() {} @@ -52,7 +51,7 @@ void SessionHandler::handleIn(AMQFrame& f) { // AMQMethodBody* m=f.getMethod(); try { - if (m && (m->invoke(this) || m->invoke(&channelHandler))) + if (m && m->invoke(this)) return; else if (session) session->in(f); @@ -62,12 +61,7 @@ void SessionHandler::handleIn(AMQFrame& f) { } catch(const ChannelException& e) { ignoring=true; // Ignore trailing frames sent by client. session.reset(); - // FIXME aconway 2007-09-19: Dual-mode hack. - if (useChannelClose) - getProxy().getChannel().close( - e.code, e.toString(), classId(m), methodId(m)); - else - getProxy().getSession().closed(e.code, e.toString()); + getProxy().getSession().closed(e.code, e.toString()); }catch(const ConnectionException& e){ connection.close(e.code, e.what(), classId(m), methodId(m)); }catch(const std::exception& e){ @@ -98,51 +92,6 @@ void SessionHandler::assertClosed(const char* method) { << getChannel())); } -void SessionHandler::ChannelMethods::open(const string& /*outOfBand*/){ - parent.useChannelClose=true; - parent.assertClosed("open"); - parent.session.reset(new Session(parent, 0)); - parent.getProxy().getChannel().openOk(); -} - -// FIXME aconway 2007-08-31: flow is no longer in the spec. -void SessionHandler::ChannelMethods::flow(bool active){ - parent.session->flow(active); - parent.getProxy().getChannel().flowOk(active); -} - -void SessionHandler::ChannelMethods::flowOk(bool /*active*/){} - -void SessionHandler::ChannelMethods::close(uint16_t replyCode, - const string& replyText, - uint16_t classId, uint16_t methodId) -{ - // FIXME aconway 2007-08-31: Extend constants.h to map codes & ids - // to text names. - QPID_LOG(warning, "Received channel.close("<<replyCode<<"," - <<replyText << "," - << "classid=" <<classId<< "," - << "methodid=" <<methodId); - parent.ignoring=false; - parent.getProxy().getChannel().closeOk(); - // FIXME aconway 2007-08-31: sould reset session BEFORE - // sending closeOK to avoid races. SessionHandler - // needs its own private proxy, see getProxy() above. - parent.session.reset(); - // No need to remove from connection map, will be re-used - // if channel is re-opened. -} - -void SessionHandler::ChannelMethods::closeOk(){ - parent.ignoring=false; -} - -void SessionHandler::ChannelMethods::ok() -{ - //no specific action required, generic response handling should be - //sufficient -} - void SessionHandler::open(uint32_t detachedLifetime) { assertClosed("open"); session.reset(new Session(*this, detachedLifetime)); diff --git a/cpp/src/qpid/broker/SessionHandler.h b/cpp/src/qpid/broker/SessionHandler.h index 5962ab77a8..a9c0f69985 100644 --- a/cpp/src/qpid/broker/SessionHandler.h +++ b/cpp/src/qpid/broker/SessionHandler.h @@ -64,25 +64,6 @@ class SessionHandler : public framing::FrameHandler::InOutHandler, void handleOut(framing::AMQFrame&); private: - // FIXME aconway 2007-08-31: Drop channel. - struct ChannelMethods : public framing::AMQP_ServerOperations::ChannelHandler { - SessionHandler& parent; - - ChannelMethods(SessionHandler& p) : parent(p) {} - void open(const std::string& outOfBand); - void flow(bool active); - void flowOk(bool active); - void ok( ); - void ping( ); - void pong( ); - void resume( const std::string& channelId ); - void close(uint16_t replyCode, - const std::string& replyText, - uint16_t classId, uint16_t methodId); - void closeOk(); - }; - friend class ChannelMethods; - /// Session methods void open(uint32_t detachedLifetime); void flow(bool active); @@ -105,8 +86,6 @@ class SessionHandler : public framing::FrameHandler::InOutHandler, framing::AMQP_ClientProxy proxy; shared_ptr<Session> session; bool ignoring; - ChannelMethods channelHandler; - bool useChannelClose; // FIXME aconway 2007-09-19: remove with channel. }; }} // namespace qpid::broker diff --git a/cpp/src/tests/python_tests b/cpp/src/tests/python_tests index 33d60fcf09..d9754ed0fb 100755 --- a/cpp/src/tests/python_tests +++ b/cpp/src/tests/python_tests @@ -1,7 +1,7 @@ #!/bin/sh # Run the python tests. if test -d ../../../python ; then - cd ../../../python && ./run-tests -v -s ../specs/amqp-transitional.0-10.xml -I cpp_failing_0-10.txt -b localhost:$QPID_PORT $PYTHON_TESTS + cd ../../../python && ./run-tests -v -s ../specs/amqp.0-10-preview.xml -I cpp_failing_0-10.txt -b localhost:$QPID_PORT $PYTHON_TESTS else echo Warning: python tests not found. fi |