diff options
author | Gordon Sim <gsim@apache.org> | 2008-02-21 17:40:42 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2008-02-21 17:40:42 +0000 |
commit | 3767d7e49e80c268c60ee247b3526b986eb7fc17 (patch) | |
tree | 36f0c9487e54705530be26271de7e52676bad524 | |
parent | 5f06a953368f7f41dd8ab94a6775fcd9b5c99792 (diff) | |
download | qpid-python-3767d7e49e80c268c60ee247b3526b986eb7fc17.tar.gz |
Start moving towards final 0-10 spec:
* marked preview spec as 99-0 to distinguish it from 0-10 (which will now be used for the final version)
* modified python client to treat 99-0 as 0-10 for now
* modified broker to have two paths for the two different versions: 99-0 uses PreviewConnection, PreviewConnectionHandler
and PreviewSessionHandler which are straight copy & pastes of the Connection, ConnectionHandler and SessionHandler now
associated with 0-10 (so we can migrate the 0-10 path to the final spec without affecting clients working with the preview
version)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@629883 13f79535-47bb-0310-9956-ffa450edef68
50 files changed, 1428 insertions, 147 deletions
diff --git a/cpp/rubygen/templates/Session.rb b/cpp/rubygen/templates/Session.rb index 21534b78fb..6a50fdb462 100644 --- a/cpp/rubygen/templates/Session.rb +++ b/cpp/rubygen/templates/Session.rb @@ -85,8 +85,8 @@ class SessionNoKeywordGen < CppGen } cpp_class(@classname, "public SessionBase") { public - genl "Session_0_10() {}" - genl "Session_0_10(shared_ptr<SessionCore> core) : SessionBase(core) {}" + genl "Session_#{@amqp.version.bars}() {}" + genl "Session_#{@amqp.version.bars}(shared_ptr<SessionCore> core) : SessionBase(core) {}" session_methods.each { |m| genl doxygen(m) diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 37f8eb8c85..b44aaa7e5e 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -163,6 +163,9 @@ libqpidbroker_la_SOURCES = \ qpid/broker/Queue.cpp \ qpid/broker/PersistableMessage.cpp \ qpid/broker/Bridge.cpp \ + qpid/broker/PreviewConnection.cpp \ + qpid/broker/PreviewConnectionHandler.cpp \ + qpid/broker/PreviewSessionHandler.cpp \ qpid/broker/Connection.cpp \ qpid/broker/ConnectionHandler.cpp \ qpid/broker/ConnectionFactory.cpp \ @@ -186,6 +189,7 @@ libqpidbroker_la_SOURCES = \ qpid/broker/MessageDelivery.cpp \ qpid/broker/MessageHandlerImpl.cpp \ qpid/broker/MessageStoreModule.cpp \ + qpid/broker/MultiVersionConnectionInputHandler.cpp \ qpid/broker/NameGenerator.cpp \ qpid/broker/NullMessageStore.cpp \ qpid/broker/QueueBindings.cpp \ @@ -201,6 +205,7 @@ libqpidbroker_la_SOURCES = \ qpid/broker/SessionManager.h \ qpid/broker/SessionManager.cpp \ qpid/broker/SessionHandler.h \ + qpid/broker/SessionContext.h \ qpid/broker/SessionHandler.cpp \ qpid/broker/SemanticHandler.cpp \ qpid/broker/Timer.cpp \ @@ -262,7 +267,11 @@ nobase_include_HEADERS = \ qpid/broker/Queue.h \ qpid/broker/BrokerSingleton.h \ qpid/broker/Bridge.h \ + qpid/broker/PreviewConnection.h \ + qpid/broker/PreviewConnectionHandler.h \ + qpid/broker/PreviewSessionHandler.h \ qpid/broker/Connection.h \ + qpid/broker/ConnectionState.h \ qpid/broker/ConnectionFactory.h \ qpid/broker/ConnectionHandler.h \ qpid/broker/ConnectionToken.h \ @@ -293,6 +302,7 @@ nobase_include_HEADERS = \ qpid/broker/MessageHandlerImpl.h \ qpid/broker/MessageStore.h \ qpid/broker/MessageStoreModule.h \ + qpid/broker/MultiVersionConnectionInputHandler.h \ qpid/broker/NameGenerator.h \ qpid/broker/NullMessageStore.h \ qpid/broker/Persistable.h \ diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp index 6c1d8e7ca5..566b9cc197 100644 --- a/cpp/src/qpid/broker/Bridge.cpp +++ b/cpp/src/qpid/broker/Bridge.cpp @@ -19,7 +19,7 @@ * */ #include "Bridge.h" -#include "Connection.h" +#include "ConnectionState.h" #include "qpid/management/ManagementAgent.h" #include "qpid/framing/FieldTable.h" @@ -31,7 +31,7 @@ using qpid::framing::Uuid; namespace qpid { namespace broker { -Bridge::Bridge(framing::ChannelId id, Connection& c, CancellationListener l, const management::ArgsLinkBridge& _args) : +Bridge::Bridge(framing::ChannelId id, ConnectionState& c, CancellationListener l, const management::ArgsLinkBridge& _args) : args(_args), channel(id, &(c.getOutput())), peer(channel), mgmtObject(new management::Bridge(this, &c, id, args.i_src, args.i_dest, args.i_key, args.i_src_is_queue, args.i_src_is_local)), connection(c), listener(l) diff --git a/cpp/src/qpid/broker/Bridge.h b/cpp/src/qpid/broker/Bridge.h index 8506325ddb..1198285c93 100644 --- a/cpp/src/qpid/broker/Bridge.h +++ b/cpp/src/qpid/broker/Bridge.h @@ -32,14 +32,14 @@ namespace qpid { namespace broker { -class Connection; +class ConnectionState; class Bridge : public management::Manageable { public: typedef boost::function<void(Bridge*)> CancellationListener; - Bridge(framing::ChannelId id, Connection& c, CancellationListener l, + Bridge(framing::ChannelId id, ConnectionState& c, CancellationListener l, const management::ArgsLinkBridge& args); ~Bridge(); @@ -54,7 +54,7 @@ private: framing::ChannelHandler channel; framing::AMQP_ServerProxy peer; management::Bridge::shared_ptr mgmtObject; - Connection& connection; + ConnectionState& connection; CancellationListener listener; }; diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index d73a249184..822890ae76 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -86,13 +86,7 @@ public: Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_) : - broker(broker_), - outputTasks(*out_), - out(out_), - framemax(65535), - heartbeat(0), - client(0), - stagingThreshold(broker.getStagingThreshold()), + ConnectionState(out_, broker_), adapter(*this), mgmtClosing(0), mgmtId(mgmtId_) @@ -228,17 +222,6 @@ Manageable::status_t Connection::ManagementMethod (uint32_t methodId, return status; } -void Connection::setUserId(const string& uid) -{ - userId = uid; - QPID_LOG (debug, "UserId is " << userId); -} - -const string& Connection::getUserId() const -{ - return userId; -} - Connection::MgmtLink::MgmtLink(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId) : channelCounter(1) { diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h index 99b394dda0..8719a9dfcd 100644 --- a/cpp/src/qpid/broker/Connection.h +++ b/cpp/src/qpid/broker/Connection.h @@ -39,6 +39,7 @@ #include "qpid/sys/Socket.h" #include "qpid/Exception.h" #include "ConnectionHandler.h" +#include "ConnectionState.h" #include "SessionHandler.h" #include "qpid/management/Manageable.h" #include "qpid/management/Client.h" @@ -50,8 +51,7 @@ namespace qpid { namespace broker { class Connection : public sys::ConnectionInputHandler, - public ConnectionToken, - public management::Manageable + public ConnectionState { public: Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId); @@ -63,25 +63,6 @@ class Connection : public sys::ConnectionInputHandler, /** Close the connection */ void close(framing::ReplyCode code, const string& text, framing::ClassId classId, framing::MethodId methodId); - sys::ConnectionOutputHandler& getOutput() const { return *out; } - framing::ProtocolVersion getVersion() const { return version; } - - uint32_t getFrameMax() const { return framemax; } - uint16_t getHeartbeat() const { return heartbeat; } - uint64_t getStagingThreshold() const { return stagingThreshold; } - - void setFrameMax(uint32_t fm) { framemax = fm; } - void setHeartbeat(uint16_t hb) { heartbeat = hb; } - void setStagingThreshold(uint64_t st) { stagingThreshold = st; } - - Broker& getBroker() { return broker; } - - Broker& broker; - std::vector<Queue::shared_ptr> exclusiveQueues; - - //contained output tasks - sys::AggregateOutput outputTasks; - // ConnectionInputHandler methods void received(framing::AMQFrame& frame); void initiated(const framing::ProtocolInitiation& header); @@ -98,9 +79,6 @@ class Connection : public sys::ConnectionInputHandler, management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args); - void setUserId(const string& uid); - const string& getUserId() const; - void initMgmt(bool asLink = false); private: @@ -126,17 +104,11 @@ class Connection : public sys::ConnectionInputHandler, class MgmtClient; class MgmtLink; - framing::ProtocolVersion version; ChannelMap channels; - sys::ConnectionOutputHandler* out; - uint32_t framemax; - uint16_t heartbeat; framing::AMQP_ClientProxy::Connection* client; - uint64_t stagingThreshold; ConnectionHandler adapter; std::auto_ptr<MgmtWrapper> mgmtWrapper; bool mgmtClosing; - string userId; const std::string mgmtId; }; diff --git a/cpp/src/qpid/broker/ConnectionFactory.cpp b/cpp/src/qpid/broker/ConnectionFactory.cpp index 9577853de4..a0cd4e35d7 100644 --- a/cpp/src/qpid/broker/ConnectionFactory.cpp +++ b/cpp/src/qpid/broker/ConnectionFactory.cpp @@ -20,6 +20,7 @@ */ #include "ConnectionFactory.h" #include "Connection.h" +#include "MultiVersionConnectionInputHandler.h" namespace qpid { namespace broker { @@ -38,7 +39,7 @@ qpid::sys::ConnectionInputHandler* ConnectionFactory::create(qpid::sys::ConnectionOutputHandler* out, const std::string& id) { - return new Connection(out, broker, id); + return new MultiVersionConnectionInputHandler(out, broker, id); } }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/ConnectionState.h b/cpp/src/qpid/broker/ConnectionState.h new file mode 100644 index 0000000000..691d47d866 --- /dev/null +++ b/cpp/src/qpid/broker/ConnectionState.h @@ -0,0 +1,84 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _ConnectionState_ +#define _ConnectionState_ + +#include <vector> + +#include "qpid/sys/AggregateOutput.h" +#include "qpid/sys/ConnectionOutputHandler.h" +#include "qpid/framing/ProtocolVersion.h" +#include "qpid/management/Manageable.h" +#include "Broker.h" + +namespace qpid { +namespace broker { + +class ConnectionState : public ConnectionToken, public management::Manageable +{ + public: + ConnectionState(qpid::sys::ConnectionOutputHandler* o, Broker& b) : + broker(b), + outputTasks(*o), + out(o), + framemax(65535), + heartbeat(0), + stagingThreshold(broker.getStagingThreshold()) + {} + + + + virtual ~ConnectionState () {} + + uint32_t getFrameMax() const { return framemax; } + uint16_t getHeartbeat() const { return heartbeat; } + uint64_t getStagingThreshold() const { return stagingThreshold; } + + void setFrameMax(uint32_t fm) { framemax = fm; } + void setHeartbeat(uint16_t hb) { heartbeat = hb; } + void setStagingThreshold(uint64_t st) { stagingThreshold = st; } + + void setUserId(const string& uid) { userId = uid; } + const string& getUserId() const { return userId; } + + Broker& getBroker() { return broker; } + + Broker& broker; + std::vector<Queue::shared_ptr> exclusiveQueues; + + //contained output tasks + sys::AggregateOutput outputTasks; + + sys::ConnectionOutputHandler& getOutput() const { return *out; } + framing::ProtocolVersion getVersion() const { return version; } + + protected: + framing::ProtocolVersion version; + sys::ConnectionOutputHandler* out; + uint32_t framemax; + uint16_t heartbeat; + uint64_t stagingThreshold; + string userId; +}; + +}} + +#endif diff --git a/cpp/src/qpid/broker/HandlerImpl.h b/cpp/src/qpid/broker/HandlerImpl.h index 0250805f52..410d400c9d 100644 --- a/cpp/src/qpid/broker/HandlerImpl.h +++ b/cpp/src/qpid/broker/HandlerImpl.h @@ -21,6 +21,7 @@ #include "SemanticState.h" #include "SessionState.h" +#include "ConnectionState.h" namespace qpid { namespace broker { @@ -39,7 +40,7 @@ class HandlerImpl { HandlerImpl(SemanticState& s) : state(s), session(s.getSession()) {} framing::AMQP_ClientProxy& getProxy() { return session.getProxy(); } - Connection& getConnection() { return session.getConnection(); } + ConnectionState& getConnection() { return session.getConnection(); } Broker& getBroker() { return session.getBroker(); } }; diff --git a/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp b/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp new file mode 100644 index 0000000000..676f9e4b3d --- /dev/null +++ b/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp @@ -0,0 +1,92 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "MultiVersionConnectionInputHandler.h" +#include "Connection.h" +#include "PreviewConnection.h" +#include "qpid/framing/reply_exceptions.h" + +namespace qpid { +namespace broker { + +MultiVersionConnectionInputHandler::MultiVersionConnectionInputHandler( + qpid::sys::ConnectionOutputHandler* _out, + Broker& _broker, + const std::string& _id) : linkVersion(99,0), out(_out), broker(_broker), id(_id) {} + + +void MultiVersionConnectionInputHandler::initiated(const qpid::framing::ProtocolInitiation& i) +{ + if (i.getMajor() == 99 && i.getMinor() == 0) { + handler = std::auto_ptr<ConnectionInputHandler>(new PreviewConnection(out, broker, id)); + } else if (i.getMajor() == 0 && i.getMinor() == 10) { + handler = std::auto_ptr<ConnectionInputHandler>(new Connection(out, broker, id)); + } else { + throw qpid::framing::InternalErrorException("Unsupported version: " + i.getVersion().toString()); + } + handler->initiated(i); +} + +void MultiVersionConnectionInputHandler::received(qpid::framing::AMQFrame& f) +{ + check(); + handler->received(f); +} + +void MultiVersionConnectionInputHandler::idleOut() +{ + check(); + handler->idleOut(); +} + +void MultiVersionConnectionInputHandler::idleIn() +{ + check(); + handler->idleIn(); +} + +bool MultiVersionConnectionInputHandler::doOutput() +{ + return check(false) && handler->doOutput(); +} + +qpid::framing::ProtocolInitiation MultiVersionConnectionInputHandler::getInitiation() +{ + return qpid::framing::ProtocolInitiation(linkVersion); +} + +void MultiVersionConnectionInputHandler::closed() +{ + check(); + handler->closed(); +} + +bool MultiVersionConnectionInputHandler::check(bool fail) +{ + if (!handler.get()) { + if (fail) throw qpid::framing::InternalErrorException("Handler not initialised!"); + else return false; + } else { + return true; + } +} + +} +} diff --git a/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h b/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h new file mode 100644 index 0000000000..4301eba57c --- /dev/null +++ b/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h @@ -0,0 +1,60 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _MultiVersionConnectionInputHandler_ +#define _MultiVersionConnectionInputHandler_ + +#include <memory> +#include <string> +#include "qpid/sys/ConnectionInputHandler.h" +#include "qpid/sys/ConnectionOutputHandler.h" +#include "qpid/broker/Broker.h" + +namespace qpid { +namespace broker { + +class MultiVersionConnectionInputHandler : public qpid::sys::ConnectionInputHandler +{ + qpid::framing::ProtocolVersion linkVersion;//version used for inter-broker links + std::auto_ptr<qpid::sys::ConnectionInputHandler> handler; + qpid::sys::ConnectionOutputHandler* out; + Broker& broker; + const std::string id; + + bool check(bool fail = true); + +public: + MultiVersionConnectionInputHandler(qpid::sys::ConnectionOutputHandler* out, Broker& broker, const std::string& id); + virtual ~MultiVersionConnectionInputHandler() {} + + void initiated(const qpid::framing::ProtocolInitiation&); + void received(qpid::framing::AMQFrame&); + void idleOut(); + void idleIn(); + bool doOutput(); + qpid::framing::ProtocolInitiation getInitiation(); + void closed(); +}; + +} +} + + +#endif diff --git a/cpp/src/qpid/broker/PreviewConnection.cpp b/cpp/src/qpid/broker/PreviewConnection.cpp new file mode 100644 index 0000000000..05879a0329 --- /dev/null +++ b/cpp/src/qpid/broker/PreviewConnection.cpp @@ -0,0 +1,327 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "PreviewConnection.h" +#include "SessionState.h" +#include "BrokerAdapter.h" +#include "Bridge.h" +#include "SemanticHandler.h" + +#include "qpid/log/Statement.h" +#include "qpid/ptr_map.h" +#include "qpid/framing/AMQP_ClientProxy.h" +#include "qpid/management/ManagementAgent.h" + +#include <boost/bind.hpp> +#include <boost/ptr_container/ptr_vector.hpp> + +#include <algorithm> +#include <iostream> +#include <assert.h> + +using namespace boost; +using namespace qpid::sys; +using namespace qpid::framing; +using namespace qpid::sys; +using namespace qpid::ptr_map; +using qpid::management::ManagementAgent; +using qpid::management::ManagementObject; +using qpid::management::Manageable; +using qpid::management::Args; + +namespace qpid { +namespace broker { + +class PreviewConnection::MgmtClient : public PreviewConnection::MgmtWrapper +{ + management::Client::shared_ptr mgmtClient; + +public: + MgmtClient(PreviewConnection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId); + ~MgmtClient(); + void received(framing::AMQFrame& frame); + management::ManagementObject::shared_ptr getManagementObject() const; + void closing(); +}; + +class PreviewConnection::MgmtLink : public PreviewConnection::MgmtWrapper +{ + typedef boost::ptr_vector<Bridge> Bridges; + + management::Link::shared_ptr mgmtLink; + Bridges created;//holds list of bridges pending creation + Bridges cancelled;//holds list of bridges pending cancellation + Bridges active;//holds active bridges + uint channelCounter; + sys::Mutex lock; + + void cancel(Bridge*); + +public: + MgmtLink(PreviewConnection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId); + ~MgmtLink(); + void received(framing::AMQFrame& frame); + management::ManagementObject::shared_ptr getManagementObject() const; + void closing(); + void processPending(); + void process(PreviewConnection& connection, const management::Args& args); +}; + + +PreviewConnection::PreviewConnection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_) : + ConnectionState(out_, broker_), + adapter(*this), + mgmtClosing(0), + mgmtId(mgmtId_) +{} + +void PreviewConnection::initMgmt(bool asLink) +{ + Manageable* parent = broker.GetVhostObject (); + + if (parent != 0) + { + ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); + + if (agent.get () != 0) + { + if (asLink) { + mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtLink(this, parent, agent, mgmtId)); + } else { + mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtClient(this, parent, agent, mgmtId)); + } + } + } +} + +PreviewConnection::~PreviewConnection () {} + +void PreviewConnection::received(framing::AMQFrame& frame){ + if (mgmtClosing) + close (403, "Closed by Management Request", 0, 0); + + if (frame.getChannel() == 0) { + adapter.handle(frame); + } else { + getChannel(frame.getChannel()).in(frame); + } + + if (mgmtWrapper.get()) mgmtWrapper->received(frame); +} + +void PreviewConnection::close( + ReplyCode code, const string& text, ClassId classId, MethodId methodId) +{ + adapter.close(code, text, classId, methodId); + channels.clear(); + getOutput().close(); +} + +void PreviewConnection::initiated(const framing::ProtocolInitiation& header) { + version = ProtocolVersion(header.getMajor(), header.getMinor()); + adapter.init(header); + initMgmt(); +} + +void PreviewConnection::idleOut(){} + +void PreviewConnection::idleIn(){} + +void PreviewConnection::closed(){ // Physically closed, suspend open sessions. + try { + for (ChannelMap::iterator i = channels.begin(); i != channels.end(); ++i) + get_pointer(i)->localSuspend(); + while (!exclusiveQueues.empty()) { + Queue::shared_ptr q(exclusiveQueues.front()); + q->releaseExclusiveOwnership(); + if (q->canAutoDelete()) { + Queue::tryAutoDelete(broker, q); + } + exclusiveQueues.erase(exclusiveQueues.begin()); + } + } catch(std::exception& e) { + QPID_LOG(error, " Unhandled exception while closing session: " << + e.what()); + assert(0); + } +} + +bool PreviewConnection::doOutput() +{ + try{ + //process any pending mgmt commands: + if (mgmtWrapper.get()) mgmtWrapper->processPending(); + + //then do other output as needed: + return outputTasks.doOutput(); + }catch(ConnectionException& e){ + close(e.code, e.what(), 0, 0); + }catch(std::exception& e){ + close(541/*internal error*/, e.what(), 0, 0); + } + return false; +} + +void PreviewConnection::closeChannel(uint16_t id) { + ChannelMap::iterator i = channels.find(id); + if (i != channels.end()) channels.erase(i); +} + +PreviewSessionHandler& PreviewConnection::getChannel(ChannelId id) { + ChannelMap::iterator i=channels.find(id); + if (i == channels.end()) { + i = channels.insert(id, new PreviewSessionHandler(*this, id)).first; + } + return *get_pointer(i); +} + +ManagementObject::shared_ptr PreviewConnection::GetManagementObject (void) const +{ + return mgmtWrapper.get() ? mgmtWrapper->getManagementObject() : ManagementObject::shared_ptr(); +} + +Manageable::status_t PreviewConnection::ManagementMethod (uint32_t methodId, + Args& args) +{ + Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; + + QPID_LOG (debug, "PreviewConnection::ManagementMethod [id=" << methodId << "]"); + + switch (methodId) + { + case management::Client::METHOD_CLOSE : + mgmtClosing = 1; + if (mgmtWrapper.get()) mgmtWrapper->closing(); + status = Manageable::STATUS_OK; + break; + case management::Link::METHOD_BRIDGE : + //queue this up and request chance to do output (i.e. get connections thread of control): + mgmtWrapper->process(*this, args); + out->activateOutput(); + status = Manageable::STATUS_OK; + break; + } + + return status; +} + +PreviewConnection::MgmtLink::MgmtLink(PreviewConnection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId) + : channelCounter(1) +{ + mgmtLink = management::Link::shared_ptr + (new management::Link(conn, parent, mgmtId)); + agent->addObject (mgmtLink); +} + +PreviewConnection::MgmtLink::~MgmtLink() +{ + if (mgmtLink.get () != 0) + mgmtLink->resourceDestroy (); +} + +void PreviewConnection::MgmtLink::received(framing::AMQFrame& frame) +{ + if (mgmtLink.get () != 0) + { + mgmtLink->inc_framesFromPeer (); + mgmtLink->inc_bytesFromPeer (frame.size ()); + } +} + +management::ManagementObject::shared_ptr PreviewConnection::MgmtLink::getManagementObject() const +{ + return dynamic_pointer_cast<ManagementObject>(mgmtLink); +} + +void PreviewConnection::MgmtLink::closing() +{ + if (mgmtLink) mgmtLink->set_closing (1); +} + +void PreviewConnection::MgmtLink::processPending() +{ + //process any pending creates + if (!created.empty()) { + for (Bridges::iterator i = created.begin(); i != created.end(); ++i) { + i->create(); + } + active.transfer(active.end(), created.begin(), created.end(), created); + } + if (!cancelled.empty()) { + //process any pending cancellations + for (Bridges::iterator i = cancelled.begin(); i != cancelled.end(); ++i) { + i->cancel(); + } + cancelled.clear(); + } +} + +void PreviewConnection::MgmtLink::process(PreviewConnection& connection, const management::Args& args) +{ + created.push_back(new Bridge(channelCounter++, connection, + boost::bind(&MgmtLink::cancel, this, _1), + dynamic_cast<const management::ArgsLinkBridge&>(args))); +} + +void PreviewConnection::MgmtLink::cancel(Bridge* b) +{ + //need to take this out the active map and add it to the cancelled map + for (Bridges::iterator i = active.begin(); i != active.end(); i++) { + if (&(*i) == b) { + cancelled.transfer(cancelled.end(), i, active); + break; + } + } +} + +PreviewConnection::MgmtClient::MgmtClient(PreviewConnection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId) +{ + mgmtClient = management::Client::shared_ptr + (new management::Client (conn, parent, mgmtId)); + agent->addObject (mgmtClient); +} + +PreviewConnection::MgmtClient::~MgmtClient() +{ + if (mgmtClient.get () != 0) + mgmtClient->resourceDestroy (); +} + +void PreviewConnection::MgmtClient::received(framing::AMQFrame& frame) +{ + if (mgmtClient.get () != 0) + { + mgmtClient->inc_framesFromClient (); + mgmtClient->inc_bytesFromClient (frame.size ()); + } +} + +management::ManagementObject::shared_ptr PreviewConnection::MgmtClient::getManagementObject() const +{ + return dynamic_pointer_cast<ManagementObject>(mgmtClient); +} + +void PreviewConnection::MgmtClient::closing() +{ + if (mgmtClient) mgmtClient->set_closing (1); +} + +}} + diff --git a/cpp/src/qpid/broker/PreviewConnection.h b/cpp/src/qpid/broker/PreviewConnection.h new file mode 100644 index 0000000000..d6a945c26c --- /dev/null +++ b/cpp/src/qpid/broker/PreviewConnection.h @@ -0,0 +1,118 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _PreviewConnection_ +#define _PreviewConnection_ + +#include <memory> +#include <sstream> +#include <vector> + +#include <boost/ptr_container/ptr_map.hpp> + +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/AMQP_ServerOperations.h" +#include "qpid/framing/AMQP_ClientProxy.h" +#include "qpid/sys/AggregateOutput.h" +#include "qpid/sys/ConnectionOutputHandler.h" +#include "qpid/sys/ConnectionInputHandler.h" +#include "qpid/sys/TimeoutHandler.h" +#include "qpid/framing/ProtocolVersion.h" +#include "Broker.h" +#include "qpid/sys/Socket.h" +#include "qpid/Exception.h" +#include "PreviewConnectionHandler.h" +#include "ConnectionState.h" +#include "PreviewSessionHandler.h" +#include "qpid/management/Manageable.h" +#include "qpid/management/Client.h" +#include "qpid/management/Link.h" + +#include <boost/ptr_container/ptr_map.hpp> + +namespace qpid { +namespace broker { + +class PreviewConnection : public sys::ConnectionInputHandler, + public ConnectionState +{ + public: + PreviewConnection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId); + ~PreviewConnection (); + + /** Get the PreviewSessionHandler for channel. Create if it does not already exist */ + PreviewSessionHandler& getChannel(framing::ChannelId channel); + + /** Close the connection */ + void close(framing::ReplyCode code, const string& text, framing::ClassId classId, framing::MethodId methodId); + + // ConnectionInputHandler methods + void received(framing::AMQFrame& frame); + void initiated(const framing::ProtocolInitiation& header); + void idleOut(); + void idleIn(); + void closed(); + bool doOutput(); + framing::ProtocolInitiation getInitiation() { return framing::ProtocolInitiation(version); } + + void closeChannel(framing::ChannelId channel); + + // Manageable entry points + management::ManagementObject::shared_ptr GetManagementObject (void) const; + management::Manageable::status_t + ManagementMethod (uint32_t methodId, management::Args& args); + + void initMgmt(bool asLink = false); + + private: + typedef boost::ptr_map<framing::ChannelId, PreviewSessionHandler> ChannelMap; + typedef std::vector<Queue::shared_ptr>::iterator queue_iterator; + + /** + * Connection may appear, for the purposes of management, as a + * normal client initiated connection or as an agent initiated + * inter-broker link. This wrapper abstracts the common interface + * for both. + */ + class MgmtWrapper + { + public: + virtual ~MgmtWrapper(){} + virtual void received(framing::AMQFrame& frame) = 0; + virtual management::ManagementObject::shared_ptr getManagementObject() const = 0; + virtual void closing() = 0; + virtual void processPending(){} + virtual void process(PreviewConnection&, const management::Args&){} + }; + class MgmtClient; + class MgmtLink; + + ChannelMap channels; + framing::AMQP_ClientProxy::Connection* client; + uint64_t stagingThreshold; + PreviewConnectionHandler adapter; + std::auto_ptr<MgmtWrapper> mgmtWrapper; + bool mgmtClosing; + const std::string mgmtId; +}; + +}} + +#endif diff --git a/cpp/src/qpid/broker/PreviewConnectionHandler.cpp b/cpp/src/qpid/broker/PreviewConnectionHandler.cpp new file mode 100644 index 0000000000..c0f0d9f5e0 --- /dev/null +++ b/cpp/src/qpid/broker/PreviewConnectionHandler.cpp @@ -0,0 +1,158 @@ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "PreviewConnectionHandler.h" +#include "PreviewConnection.h" +#include "qpid/framing/ConnectionStartBody.h" +#include "qpid/framing/ClientInvoker.h" +#include "qpid/framing/ServerInvoker.h" + +using namespace qpid; +using namespace qpid::broker; +using namespace qpid::framing; + + +namespace +{ +const std::string PLAIN = "PLAIN"; +const std::string en_US = "en_US"; +} + +void PreviewConnectionHandler::init(const framing::ProtocolInitiation& header) { + FieldTable properties; + string mechanisms(PLAIN); + string locales(en_US); + handler->serverMode = true; + handler->client.start(header.getMajor(), header.getMinor(), properties, mechanisms, locales); +} + +void PreviewConnectionHandler::close(ReplyCode code, const string& text, ClassId classId, MethodId methodId) +{ + handler->client.close(code, text, classId, methodId); +} + +void PreviewConnectionHandler::handle(framing::AMQFrame& frame) +{ + AMQMethodBody* method=frame.getBody()->getMethod(); + try{ + if (handler->serverMode) { + if (!invoke(static_cast<AMQP_ServerOperations::ConnectionHandler&>(*handler.get()), *method)) + throw ChannelErrorException(QPID_MSG("Class can't be accessed over channel 0")); + } else { + if (!invoke(static_cast<AMQP_ClientOperations::ConnectionHandler&>(*handler.get()), *method)) + throw ChannelErrorException(QPID_MSG("Class can't be accessed over channel 0")); + } + }catch(ConnectionException& e){ + handler->client.close(e.code, e.what(), method->amqpClassId(), method->amqpMethodId()); + }catch(std::exception& e){ + handler->client.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId()); + } +} + +PreviewConnectionHandler::PreviewConnectionHandler(PreviewConnection& connection) : handler(new Handler(connection)) {} + +PreviewConnectionHandler::Handler:: Handler(PreviewConnection& c) : client(c.getOutput()), server(c.getOutput()), + connection(c), serverMode(false) {} + +void PreviewConnectionHandler::Handler::startOk(const framing::FieldTable& /*clientProperties*/, + const string& mechanism, + const string& response, const string& /*locale*/) +{ + //TODO: handle SASL mechanisms more cleverly + if (mechanism == PLAIN) { + if (response.size() > 0 && response[0] == (char) 0) { + string temp = response.substr(1); + string::size_type i = temp.find((char)0); + string uid = temp.substr(0, i); + string pwd = temp.substr(i + 1); + //TODO: authentication + connection.setUserId(uid); + } + } + client.tune(framing::CHANNEL_MAX, connection.getFrameMax(), connection.getHeartbeat()); +} + +void PreviewConnectionHandler::Handler::secureOk(const string& /*response*/){} + +void PreviewConnectionHandler::Handler::tuneOk(uint16_t /*channelmax*/, + uint32_t framemax, uint16_t heartbeat) +{ + connection.setFrameMax(framemax); + connection.setHeartbeat(heartbeat); +} + +void PreviewConnectionHandler::Handler::open(const string& /*virtualHost*/, + const string& /*capabilities*/, bool /*insist*/) +{ + string knownhosts; + client.openOk(knownhosts); +} + + +void PreviewConnectionHandler::Handler::close(uint16_t /*replyCode*/, const string& /*replyText*/, + uint16_t /*classId*/, uint16_t /*methodId*/) +{ + client.closeOk(); + connection.getOutput().close(); +} + +void PreviewConnectionHandler::Handler::closeOk(){ + connection.getOutput().close(); +} + + +void PreviewConnectionHandler::Handler::start(uint8_t /*versionMajor*/, + uint8_t /*versionMinor*/, + const FieldTable& /*serverProperties*/, + const string& /*mechanisms*/, + const string& /*locales*/) +{ + string uid = "qpidd"; + string pwd = "qpidd"; + string response = ((char)0) + uid + ((char)0) + pwd; + server.startOk(FieldTable(), PLAIN, response, en_US); + connection.initMgmt(true); +} + +void PreviewConnectionHandler::Handler::secure(const string& /*challenge*/) +{ + server.secureOk(""); +} + +void PreviewConnectionHandler::Handler::tune(uint16_t channelMax, + uint32_t frameMax, + uint16_t heartbeat) +{ + connection.setFrameMax(frameMax); + connection.setHeartbeat(heartbeat); + server.tuneOk(channelMax, frameMax, heartbeat); + server.open("/", "", true); +} + +void PreviewConnectionHandler::Handler::openOk(const string& /*knownHosts*/) +{ +} + +void PreviewConnectionHandler::Handler::redirect(const string& /*host*/, const string& /*knownHosts*/) +{ + +} diff --git a/cpp/src/qpid/broker/PreviewConnectionHandler.h b/cpp/src/qpid/broker/PreviewConnectionHandler.h new file mode 100644 index 0000000000..93901dd492 --- /dev/null +++ b/cpp/src/qpid/broker/PreviewConnectionHandler.h @@ -0,0 +1,92 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _PreviewConnectionAdapter_ +#define _PreviewConnectionAdapter_ + +#include <memory> +#include "qpid/framing/amqp_types.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/AMQP_ClientOperations.h" +#include "qpid/framing/AMQP_ClientProxy.h" +#include "qpid/framing/AMQP_ServerOperations.h" +#include "qpid/framing/AMQP_ServerProxy.h" +#include "qpid/framing/FrameHandler.h" +#include "qpid/framing/ProtocolInitiation.h" +#include "qpid/framing/ProtocolVersion.h" +#include "qpid/Exception.h" + +namespace qpid { +namespace broker { + +class PreviewConnection; + +// TODO aconway 2007-09-18: Rename to ConnectionHandler +class PreviewConnectionHandler : public framing::FrameHandler +{ + struct Handler : public framing::AMQP_ServerOperations::ConnectionHandler, + public framing::AMQP_ClientOperations::ConnectionHandler + { + framing::AMQP_ClientProxy::Connection client; + framing::AMQP_ServerProxy::Connection server; + PreviewConnection& connection; + bool serverMode; + + Handler(PreviewConnection& connection); + void startOk(const qpid::framing::FieldTable& clientProperties, + const std::string& mechanism, const std::string& response, + const std::string& locale); + void secureOk(const std::string& response); + void tuneOk(uint16_t channelMax, uint32_t frameMax, uint16_t heartbeat); + void open(const std::string& virtualHost, + const std::string& capabilities, bool insist); + void close(uint16_t replyCode, const std::string& replyText, + uint16_t classId, uint16_t methodId); + void closeOk(); + + + void start(uint8_t versionMajor, + uint8_t versionMinor, + const qpid::framing::FieldTable& serverProperties, + const std::string& mechanisms, + const std::string& locales); + + void secure(const std::string& challenge); + + void tune(uint16_t channelMax, + uint32_t frameMax, + uint16_t heartbeat); + + void openOk(const std::string& knownHosts); + + void redirect(const std::string& host, const std::string& knownHosts); + }; + std::auto_ptr<Handler> handler; + public: + PreviewConnectionHandler(PreviewConnection& connection); + void init(const framing::ProtocolInitiation& header); + void close(framing::ReplyCode code, const std::string& text, framing::ClassId classId, framing::MethodId methodId); + void handle(framing::AMQFrame& frame); +}; + + +}} + +#endif diff --git a/cpp/src/qpid/broker/PreviewSessionHandler.cpp b/cpp/src/qpid/broker/PreviewSessionHandler.cpp new file mode 100644 index 0000000000..19e6a235c4 --- /dev/null +++ b/cpp/src/qpid/broker/PreviewSessionHandler.cpp @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "PreviewSessionHandler.h" +#include "SessionState.h" +#include "PreviewConnection.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/framing/constants.h" +#include "qpid/framing/ClientInvoker.h" +#include "qpid/framing/ServerInvoker.h" +#include "qpid/log/Statement.h" + +#include <boost/bind.hpp> + +namespace qpid { +namespace broker { +using namespace framing; +using namespace std; +using namespace qpid::sys; + +PreviewSessionHandler::PreviewSessionHandler(PreviewConnection& c, ChannelId ch) + : SessionContext(c.getOutput()), + connection(c), channel(ch, &c.getOutput()), + proxy(out), // Via my own handleOut() for L2 data. + peerSession(channel), // Direct to channel for L2 commands. + ignoring(false) {} + +PreviewSessionHandler::~PreviewSessionHandler() {} + +namespace { +ClassId classId(AMQMethodBody* m) { return m ? m->amqpMethodId() : 0; } +MethodId methodId(AMQMethodBody* m) { return m ? m->amqpClassId() : 0; } +} // namespace + +void PreviewSessionHandler::handleIn(AMQFrame& f) { + // Note on channel states: a channel is open if session != 0. A + // channel that is closed (session == 0) can be in the "ignoring" + // state. This is a temporary state after we have sent a channel + // exception, where extra frames might arrive that should be + // ignored. + // + AMQMethodBody* m = f.getBody()->getMethod(); + try { + if (m && invoke(static_cast<AMQP_ServerOperations::SessionHandler&>(*this), *m)) { + return; + } else if (session.get()) { + boost::optional<SequenceNumber> ack=session->received(f); + session->in.handle(f); + if (ack) + peerSession.ack(*ack, SequenceNumberSet()); + } else if (m && invoke(static_cast<AMQP_ClientOperations::SessionHandler&>(*this), *m)) { + return; + } else if (!ignoring) { + throw ChannelErrorException( + QPID_MSG("Channel " << channel.get() << " is not open")); + } + } catch(const ChannelException& e) { + ignoring=true; // Ignore trailing frames sent by client. + session->detach(); + session.reset(); + peerSession.closed(e.code, e.what()); + }catch(const ConnectionException& e){ + connection.close(e.code, e.what(), classId(m), methodId(m)); + }catch(const std::exception& e){ + connection.close( + framing::INTERNAL_ERROR, e.what(), classId(m), methodId(m)); + } +} + +void PreviewSessionHandler::handleOut(AMQFrame& f) { + channel.handle(f); // Send it. + if (session->sent(f)) + peerSession.solicitAck(); +} + +void PreviewSessionHandler::assertAttached(const char* method) const { + if (!session.get()) + throw ChannelErrorException( + QPID_MSG(method << " failed: No session for channel " + << getChannel())); +} + +void PreviewSessionHandler::assertClosed(const char* method) const { + if (session.get()) + throw ChannelBusyException( + QPID_MSG(method << " failed: channel " << channel.get() + << " is already open.")); +} + +void PreviewSessionHandler::open(uint32_t detachedLifetime) { + assertClosed("open"); + std::auto_ptr<SessionState> state( + connection.broker.getSessionManager().open(*this, detachedLifetime)); + session.reset(state.release()); + peerSession.attached(session->getId(), session->getTimeout()); +} + +void PreviewSessionHandler::resume(const Uuid& id) { + assertClosed("resume"); + session = connection.broker.getSessionManager().resume(id); + session->attach(*this); + SequenceNumber seq = session->resuming(); + peerSession.attached(session->getId(), session->getTimeout()); + proxy.getSession().ack(seq, SequenceNumberSet()); +} + +void PreviewSessionHandler::flow(bool /*active*/) { + assertAttached("flow"); + // TODO aconway 2007-09-19: Removed in 0-10, remove + assert(0); throw NotImplementedException("session.flow"); +} + +void PreviewSessionHandler::flowOk(bool /*active*/) { + assertAttached("flowOk"); + // TODO aconway 2007-09-19: Removed in 0-10, remove + assert(0); throw NotImplementedException("session.flowOk"); +} + +void PreviewSessionHandler::close() { + assertAttached("close"); + QPID_LOG(info, "Received session.close"); + ignoring=false; + session->detach(); + session.reset(); + peerSession.closed(REPLY_SUCCESS, "ok"); + assert(&connection.getChannel(channel.get()) == this); + connection.closeChannel(channel.get()); +} + +void PreviewSessionHandler::closed(uint16_t replyCode, const string& replyText) { + QPID_LOG(warning, "Received session.closed: "<<replyCode<<" "<<replyText); + ignoring=false; + session->detach(); + session.reset(); +} + +void PreviewSessionHandler::localSuspend() { + if (session.get() && session->isAttached()) { + session->detach(); + connection.broker.getSessionManager().suspend(session); + session.reset(); + } +} + +void PreviewSessionHandler::suspend() { + assertAttached("suspend"); + localSuspend(); + peerSession.detached(); + assert(&connection.getChannel(channel.get()) == this); + connection.closeChannel(channel.get()); +} + +void PreviewSessionHandler::ack(uint32_t cumulativeSeenMark, + const SequenceNumberSet& /*seenFrameSet*/) +{ + assertAttached("ack"); + if (session->getState() == SessionState::RESUMING) { + session->receivedAck(cumulativeSeenMark); + framing::SessionState::Replay replay=session->replay(); + std::for_each(replay.begin(), replay.end(), + boost::bind(&PreviewSessionHandler::handleOut, this, _1)); + } + else + session->receivedAck(cumulativeSeenMark); +} + +void PreviewSessionHandler::highWaterMark(uint32_t /*lastSentMark*/) { + // TODO aconway 2007-10-02: may be removed from spec. + assert(0); throw NotImplementedException("session.high-water-mark"); +} + +void PreviewSessionHandler::solicitAck() { + assertAttached("solicit-ack"); + peerSession.ack(session->sendingAck(), SequenceNumberSet()); +} + +void PreviewSessionHandler::attached(const Uuid& /*sessionId*/, uint32_t detachedLifetime) +{ + std::auto_ptr<SessionState> state( + connection.broker.getSessionManager().open(*this, detachedLifetime)); + session.reset(state.release()); +} + +void PreviewSessionHandler::detached() +{ + connection.broker.getSessionManager().suspend(session); + session.reset(); +} + +ConnectionState& PreviewSessionHandler::getConnection() { return connection; } +const ConnectionState& PreviewSessionHandler::getConnection() const { return connection; } + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/PreviewSessionHandler.h b/cpp/src/qpid/broker/PreviewSessionHandler.h new file mode 100644 index 0000000000..e1096ebf9f --- /dev/null +++ b/cpp/src/qpid/broker/PreviewSessionHandler.h @@ -0,0 +1,111 @@ +#ifndef QPID_BROKER_PREVIEWSESSIONHANDLER_H +#define QPID_BROKER_PREVIEWSESSIONHANDLER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/FrameHandler.h" +#include "qpid/framing/AMQP_ClientOperations.h" +#include "qpid/framing/AMQP_ServerOperations.h" +#include "qpid/framing/AMQP_ClientProxy.h" +#include "qpid/framing/amqp_types.h" +#include "qpid/framing/ChannelHandler.h" +#include "SessionContext.h" + +#include <boost/noncopyable.hpp> + +namespace qpid { +namespace broker { + +class PreviewConnection; +class SessionState; + +/** + * A SessionHandler is associated with each active channel. It + * receives incoming frames, handles session commands and manages the + * association between the channel and a session. + */ +class PreviewSessionHandler : public framing::AMQP_ServerOperations::SessionHandler, + public framing::AMQP_ClientOperations::SessionHandler, + public SessionContext, + private boost::noncopyable +{ + public: + PreviewSessionHandler(PreviewConnection&, framing::ChannelId); + ~PreviewSessionHandler(); + + /** Returns 0 if not attached to a session */ + SessionState* getSession() { return session.get(); } + const SessionState* getSession() const { return session.get(); } + + framing::ChannelId getChannel() const { return channel.get(); } + + ConnectionState& getConnection(); + const ConnectionState& getConnection() const; + + framing::AMQP_ClientProxy& getProxy() { return proxy; } + const framing::AMQP_ClientProxy& getProxy() const { return proxy; } + + // Called by closing connection. + void localSuspend(); + void detach() { localSuspend(); } + + protected: + void handleIn(framing::AMQFrame&); + void handleOut(framing::AMQFrame&); + + private: + /// Session methods + void open(uint32_t detachedLifetime); + void flow(bool active); + void flowOk(bool active); + void close(); + void closed(uint16_t replyCode, const std::string& replyText); + void resume(const framing::Uuid& sessionId); + void suspend(); + void ack(uint32_t cumulativeSeenMark, + const framing::SequenceNumberSet& seenFrameSet); + void highWaterMark(uint32_t lastSentMark); + void solicitAck(); + + //extra methods required for assuming client role + void attached(const framing::Uuid& sessionId, uint32_t detachedLifetime); + void detached(); + + + void assertAttached(const char* method) const; + void assertActive(const char* method) const; + void assertClosed(const char* method) const; + + + PreviewConnection& connection; + framing::ChannelHandler channel; + framing::AMQP_ClientProxy proxy; + framing::AMQP_ClientProxy::Session peerSession; + bool ignoring; + std::auto_ptr<SessionState> session; +}; + +}} // namespace qpid::broker + + + +#endif /*!QPID_BROKER_SESSIONHANDLER_H*/ diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp index 32c032e701..2a79496144 100644 --- a/cpp/src/qpid/broker/SemanticHandler.cpp +++ b/cpp/src/qpid/broker/SemanticHandler.cpp @@ -21,11 +21,10 @@ #include "SemanticHandler.h" #include "SemanticState.h" -#include "SessionHandler.h" +#include "SessionContext.h" #include "SessionState.h" #include "BrokerAdapter.h" #include "MessageDelivery.h" -#include "Connection.h" #include "qpid/framing/ExecutionCompleteBody.h" #include "qpid/framing/ExecutionResultBody.h" #include "qpid/framing/ServerInvoker.h" @@ -165,7 +164,7 @@ void SemanticHandler::handleContent(AMQFrame& frame) DeliveryId SemanticHandler::deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token) { - SessionHandler* handler = session.getHandler(); + SessionContext* handler = session.getHandler(); if (handler) { uint32_t maxFrameSize = handler->getConnection().getFrameMax(); MessageDelivery::deliver(msg, handler->out, ++outgoing.hwm, token, maxFrameSize); diff --git a/cpp/src/qpid/broker/SemanticHandler.h b/cpp/src/qpid/broker/SemanticHandler.h index 52dfa4dcf9..d7f3ec8799 100644 --- a/cpp/src/qpid/broker/SemanticHandler.h +++ b/cpp/src/qpid/broker/SemanticHandler.h @@ -77,7 +77,7 @@ class SemanticHandler : public DeliveryAdapter, DeliveryId deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token); framing::AMQP_ClientProxy& getProxy() { return session.getProxy(); } - Connection& getConnection() { return session.getConnection(); } + //Connection& getConnection() { return session.getConnection(); } Broker& getBroker() { return session.getBroker(); } public: diff --git a/cpp/src/qpid/broker/SessionContext.h b/cpp/src/qpid/broker/SessionContext.h new file mode 100644 index 0000000000..a27b43cf65 --- /dev/null +++ b/cpp/src/qpid/broker/SessionContext.h @@ -0,0 +1,53 @@ +#ifndef QPID_BROKER_SESSIONCONTEXT_H +#define QPID_BROKER_SESSIONCONTEXT_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/FrameHandler.h" +#include "qpid/framing/AMQP_ClientProxy.h" +#include "qpid/framing/amqp_types.h" +#include "ConnectionState.h" + + +#include <boost/noncopyable.hpp> + +namespace qpid { +namespace broker { + +class SessionContext : public framing::FrameHandler::InOutHandler +{ + public: + SessionContext(qpid::framing::OutputHandler& out) : InOutHandler(0, &out) {} + virtual ~SessionContext(){} + virtual ConnectionState& getConnection() = 0; + virtual const ConnectionState& getConnection() const = 0; + virtual framing::AMQP_ClientProxy& getProxy() = 0; + virtual const framing::AMQP_ClientProxy& getProxy() const = 0; + virtual void detach() = 0; + virtual framing::ChannelId getChannel() const = 0; +}; + +}} // namespace qpid::broker + + + +#endif /*!QPID_BROKER_SESSIONCONTEXT_H*/ diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp index fb46cb522d..1cb10d0c19 100644 --- a/cpp/src/qpid/broker/SessionHandler.cpp +++ b/cpp/src/qpid/broker/SessionHandler.cpp @@ -36,7 +36,7 @@ using namespace std; using namespace qpid::sys; SessionHandler::SessionHandler(Connection& c, ChannelId ch) - : InOutHandler(0, &c.getOutput()), + : SessionContext(c.getOutput()), connection(c), channel(ch, &c.getOutput()), proxy(out), // Via my own handleOut() for L2 data. peerSession(channel), // Direct to channel for L2 commands. @@ -204,4 +204,8 @@ void SessionHandler::detached() session.reset(); } + +ConnectionState& SessionHandler::getConnection() { return connection; } +const ConnectionState& SessionHandler::getConnection() const { return connection; } + }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionHandler.h b/cpp/src/qpid/broker/SessionHandler.h index 6f6f5e941f..5a72bfb12d 100644 --- a/cpp/src/qpid/broker/SessionHandler.h +++ b/cpp/src/qpid/broker/SessionHandler.h @@ -28,6 +28,7 @@ #include "qpid/framing/AMQP_ClientProxy.h" #include "qpid/framing/amqp_types.h" #include "qpid/framing/ChannelHandler.h" +#include "SessionContext.h" #include <boost/noncopyable.hpp> @@ -42,9 +43,9 @@ class SessionState; * receives incoming frames, handles session commands and manages the * association between the channel and a session. */ -class SessionHandler : public framing::FrameHandler::InOutHandler, - public framing::AMQP_ServerOperations::SessionHandler, +class SessionHandler : public framing::AMQP_ServerOperations::SessionHandler, public framing::AMQP_ClientOperations::SessionHandler, + public SessionContext, private boost::noncopyable { public: @@ -57,14 +58,15 @@ class SessionHandler : public framing::FrameHandler::InOutHandler, framing::ChannelId getChannel() const { return channel.get(); } - Connection& getConnection() { return connection; } - const Connection& getConnection() const { return connection; } + ConnectionState& getConnection(); + const ConnectionState& getConnection() const; framing::AMQP_ClientProxy& getProxy() { return proxy; } const framing::AMQP_ClientProxy& getProxy() const { return proxy; } // Called by closing connection. void localSuspend(); + void detach() { localSuspend(); } protected: void handleIn(framing::AMQFrame&); diff --git a/cpp/src/qpid/broker/SessionManager.cpp b/cpp/src/qpid/broker/SessionManager.cpp index 571d3365db..aa7ac9a8bb 100644 --- a/cpp/src/qpid/broker/SessionManager.cpp +++ b/cpp/src/qpid/broker/SessionManager.cpp @@ -45,7 +45,7 @@ SessionManager::~SessionManager() {} // FIXME aconway 2008-02-01: pass handler*, allow open unattached. std::auto_ptr<SessionState> SessionManager::open( - SessionHandler& h, uint32_t timeout_) + SessionContext& h, uint32_t timeout_) { Mutex::ScopedLock l(lock); std::auto_ptr<SessionState> session( diff --git a/cpp/src/qpid/broker/SessionManager.h b/cpp/src/qpid/broker/SessionManager.h index bb61f5a8be..94956a83ed 100644 --- a/cpp/src/qpid/broker/SessionManager.h +++ b/cpp/src/qpid/broker/SessionManager.h @@ -38,7 +38,7 @@ namespace qpid { namespace broker { class SessionState; -class SessionHandler; +class SessionContext; /** * Create and manage SessionState objects. @@ -57,7 +57,7 @@ class SessionManager : private boost::noncopyable { ~SessionManager(); /** Open a new active session, caller takes ownership */ - std::auto_ptr<SessionState> open(SessionHandler& h, uint32_t timeout_); + std::auto_ptr<SessionState> open(SessionContext& c, uint32_t timeout_); /** Suspend a session, start it's timeout counter. * The factory takes ownership. diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index 80fafe0386..b6c59cfb3b 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -20,8 +20,8 @@ */ #include "SessionState.h" #include "SessionManager.h" -#include "SessionHandler.h" -#include "Connection.h" +#include "SessionContext.h" +#include "ConnectionState.h" #include "Broker.h" #include "SemanticHandler.h" #include "qpid/framing/reply_exceptions.h" @@ -37,7 +37,7 @@ using qpid::management::Manageable; using qpid::management::Args; SessionState::SessionState( - SessionManager* f, SessionHandler* h, uint32_t timeout_, uint32_t ack) + SessionManager* f, SessionContext* h, uint32_t timeout_, uint32_t ack) : framing::SessionState(ack, timeout_ > 0), factory(f), handler(h), id(true), timeout(timeout_), broker(h->getConnection().broker), @@ -76,7 +76,7 @@ SessionState::~SessionState() { mgmtObject->resourceDestroy (); } -SessionHandler* SessionState::getHandler() { +SessionContext* SessionState::getHandler() { return handler; } @@ -85,7 +85,7 @@ AMQP_ClientProxy& SessionState::getProxy() { return getHandler()->getProxy(); } -Connection& SessionState::getConnection() { +ConnectionState& SessionState::getConnection() { assert(isAttached()); return getHandler()->getConnection(); } @@ -100,7 +100,7 @@ void SessionState::detach() { } } -void SessionState::attach(SessionHandler& h) { +void SessionState::attach(SessionContext& h) { { Mutex::ScopedLock l(lock); handler = &h; @@ -141,7 +141,7 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId, case management::Session::METHOD_DETACH : if (handler != 0) { - handler->localSuspend (); + handler->detach(); } status = Manageable::STATUS_OK; break; diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h index bc1b974eaa..8a12e580b7 100644 --- a/cpp/src/qpid/broker/SessionState.h +++ b/cpp/src/qpid/broker/SessionState.h @@ -48,10 +48,10 @@ class AMQP_ClientProxy; namespace broker { class SemanticHandler; -class SessionHandler; +class SessionContext; class SessionManager; class Broker; -class Connection; +class ConnectionState; /** * Broker-side session state includes sessions handler chains, which may @@ -67,16 +67,16 @@ class SessionState : public framing::SessionState, bool isAttached() { return handler; } void detach(); - void attach(SessionHandler& handler); + void attach(SessionContext& handler); - SessionHandler* getHandler(); + SessionContext* getHandler(); /** @pre isAttached() */ framing::AMQP_ClientProxy& getProxy(); /** @pre isAttached() */ - Connection& getConnection(); + ConnectionState& getConnection(); uint32_t getTimeout() const { return timeout; } Broker& getBroker() { return broker; } @@ -92,14 +92,14 @@ class SessionState : public framing::SessionState, // Normally SessionManager creates sessions. SessionState(SessionManager*, - SessionHandler* out, + SessionContext* out, uint32_t timeout, uint32_t ackInterval); private: SessionManager* factory; - SessionHandler* handler; + SessionContext* handler; framing::Uuid id; uint32_t timeout; sys::AbsTime expiry; // Used by SessionManager. diff --git a/cpp/src/qpid/client/Channel.cpp b/cpp/src/qpid/client/Channel.cpp index a6875fcb63..4af69c8552 100644 --- a/cpp/src/qpid/client/Channel.cpp +++ b/cpp/src/qpid/client/Channel.cpp @@ -46,9 +46,9 @@ const std::string empty; class ScopedSync { - Session_0_10& session; + Session& session; public: - ScopedSync(Session_0_10& s, bool enabled = true) : session(s) { session.setSynchronous(enabled); } + ScopedSync(Session& s, bool enabled = true) : session(s) { session.setSynchronous(enabled); } ~ScopedSync() { session.setSynchronous(false); } }; @@ -63,7 +63,7 @@ Channel::~Channel() join(); } -void Channel::open(const Session_0_10& s) +void Channel::open(const Session& s) { Mutex::ScopedLock l(stopLock); if (isOpen()) diff --git a/cpp/src/qpid/client/Channel.h b/cpp/src/qpid/client/Channel.h index 35e306e9b5..2cda97dc63 100644 --- a/cpp/src/qpid/client/Channel.h +++ b/cpp/src/qpid/client/Channel.h @@ -29,7 +29,7 @@ #include "Message.h" #include "Queue.h" #include "ConnectionImpl.h" -#include "qpid/client/Session_0_10.h" +#include "qpid/client/Session.h" #include "qpid/Exception.h" #include "qpid/sys/Mutex.h" #include "qpid/sys/Runnable.h" @@ -79,7 +79,7 @@ class Channel : private sys::Runnable bool running; ConsumerMap consumers; - Session_0_10 session; + Session session; framing::ChannelId channelId; sys::BlockingQueue<framing::FrameSet::shared_ptr> gets; framing::Uuid uniqueId; @@ -88,7 +88,7 @@ class Channel : private sys::Runnable void stop(); - void open(const Session_0_10& session); + void open(const Session& session); void closeInternal(); void join(); diff --git a/cpp/src/qpid/client/Connection.cpp b/cpp/src/qpid/client/Connection.cpp index 26113c1254..872e04b3b5 100644 --- a/cpp/src/qpid/client/Connection.cpp +++ b/cpp/src/qpid/client/Connection.cpp @@ -73,7 +73,7 @@ void Connection::openChannel(Channel& channel) { channel.open(newSession(ASYNC)); } -Session_0_10 Connection::newSession(SynchronousMode sync, +Session Connection::newSession(SynchronousMode sync, uint32_t detachedLifetime) { shared_ptr<SessionCore> core( @@ -81,10 +81,10 @@ Session_0_10 Connection::newSession(SynchronousMode sync, core->setSync(sync); impl->addSession(core); core->open(detachedLifetime); - return Session_0_10(core); + return Session(core); } -void Connection::resume(Session_0_10& session) { +void Connection::resume(Session& session) { session.impl->setChannel(++channelIdCounter); impl->addSession(session.impl); session.impl->resume(impl); diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h index e6bfbddef6..81d9b972b6 100644 --- a/cpp/src/qpid/client/Connection.h +++ b/cpp/src/qpid/client/Connection.h @@ -25,7 +25,7 @@ #include <string> #include "Channel.h" #include "ConnectionImpl.h" -#include "qpid/client/Session_0_10.h" +#include "qpid/client/Session.h" #include "qpid/framing/AMQP_HighestVersion.h" #include "qpid/framing/Uuid.h" @@ -134,13 +134,13 @@ class Connection * that the broker may discard the session state. Default is 0, * meaning the session cannot be resumed. */ - Session_0_10 newSession(SynchronousMode sync, uint32_t detachedLifetime=0); + Session newSession(SynchronousMode sync, uint32_t detachedLifetime=0); /** * Resume a suspendded session. A session may be resumed * on a different connection to the one that created it. */ - void resume(Session_0_10& session); + void resume(Session& session); }; }} // namespace qpid::client diff --git a/cpp/src/qpid/client/Dispatcher.cpp b/cpp/src/qpid/client/Dispatcher.cpp index 8df4637c88..2484dabf1f 100644 --- a/cpp/src/qpid/client/Dispatcher.cpp +++ b/cpp/src/qpid/client/Dispatcher.cpp @@ -20,7 +20,6 @@ */ #include "Dispatcher.h" -#include "qpid/client/Session_0_10.h" #include "qpid/framing/FrameSet.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/log/Statement.h" @@ -38,7 +37,7 @@ using qpid::sys::Thread; namespace qpid { namespace client { -Subscriber::Subscriber(Session_0_10& s, MessageListener* l, AckPolicy a) : session(s), listener(l), autoAck(a) {} +Subscriber::Subscriber(Session& s, MessageListener* l, AckPolicy a) : session(s), listener(l), autoAck(a) {} void Subscriber::received(Message& msg) { @@ -48,7 +47,7 @@ void Subscriber::received(Message& msg) } } -Dispatcher::Dispatcher(Session_0_10& s, const std::string& q) +Dispatcher::Dispatcher(Session& s, const std::string& q) : session(s), running(false), autoStop(true) { queue = q.empty() ? diff --git a/cpp/src/qpid/client/Dispatcher.h b/cpp/src/qpid/client/Dispatcher.h index ae67e61299..e23d0c198c 100644 --- a/cpp/src/qpid/client/Dispatcher.h +++ b/cpp/src/qpid/client/Dispatcher.h @@ -25,6 +25,7 @@ #include <memory> #include <string> #include <boost/shared_ptr.hpp> +#include "qpid/client/Session.h" #include "qpid/sys/Mutex.h" #include "qpid/sys/Runnable.h" #include "qpid/sys/Thread.h" @@ -34,17 +35,15 @@ namespace qpid { namespace client { -class Session_0_10; - class Subscriber : public MessageListener { - Session_0_10& session; + Session& session; MessageListener* const listener; AckPolicy autoAck; public: typedef boost::shared_ptr<Subscriber> shared_ptr; - Subscriber(Session_0_10& session, MessageListener* listener, AckPolicy); + Subscriber(Session& session, MessageListener* listener, AckPolicy); void received(Message& msg); }; @@ -56,7 +55,7 @@ class Dispatcher : public sys::Runnable typedef std::map<std::string, Subscriber::shared_ptr> Listeners; sys::Mutex lock; sys::Thread worker; - Session_0_10& session; + Session& session; Demux::QueuePtr queue; bool running; bool autoStop; @@ -68,7 +67,7 @@ class Dispatcher : public sys::Runnable bool isStopped(); public: - Dispatcher(Session_0_10& session, const std::string& queue = ""); + Dispatcher(Session& session, const std::string& queue = ""); void start(); void run(); diff --git a/cpp/src/qpid/client/LocalQueue.h b/cpp/src/qpid/client/LocalQueue.h index eba28f6599..f8b2c2e0b3 100644 --- a/cpp/src/qpid/client/LocalQueue.h +++ b/cpp/src/qpid/client/LocalQueue.h @@ -50,7 +50,7 @@ class LocalQueue private: friend class SubscriptionManager; - Session_0_10 session; + Session session; Demux::QueuePtr queue; AckPolicy autoAck; }; diff --git a/cpp/src/qpid/client/Message.h b/cpp/src/qpid/client/Message.h index 86404ac792..daac30ba36 100644 --- a/cpp/src/qpid/client/Message.h +++ b/cpp/src/qpid/client/Message.h @@ -22,7 +22,7 @@ * */ #include <string> -#include "qpid/client/Session_0_10.h" +#include "qpid/client/Session.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/framing/TransferContent.h" @@ -63,18 +63,18 @@ public: return getMessageProperties().getApplicationHeaders(); } - void acknowledge(Session_0_10& session, bool cumulative = true, bool send = true) const + void acknowledge(Session& session, bool cumulative = true, bool send = true) const { session.getExecution().completed(id, cumulative, send); } void acknowledge(bool cumulative = true, bool send = true) const { - const_cast<Session_0_10&>(session).getExecution().completed(id, cumulative, send); + const_cast<Session&>(session).getExecution().completed(id, cumulative, send); } /**@internal for incoming messages */ - Message(const framing::FrameSet& frameset, Session_0_10 s) : + Message(const framing::FrameSet& frameset, Session s) : method(*frameset.as<framing::MessageTransferBody>()), id(frameset.getId()), session(s) { populate(frameset); @@ -91,12 +91,12 @@ public: } /**@internal use for incoming messages. */ - void setSession(Session_0_10 s) { session=s; } + void setSession(Session s) { session=s; } private: //method and id are only set for received messages: framing::MessageTransferBody method; framing::SequenceNumber id; - Session_0_10 session; + Session session; }; }} diff --git a/cpp/src/qpid/client/Session.h b/cpp/src/qpid/client/Session.h index 11105dcd36..3293af60fe 100644 --- a/cpp/src/qpid/client/Session.h +++ b/cpp/src/qpid/client/Session.h @@ -21,7 +21,7 @@ * under the License. * */ -#include "qpid/client/Session_0_10.h" +#include "qpid/client/Session_99_0.h" namespace qpid { namespace client { @@ -31,7 +31,7 @@ namespace client { * * \ingroup clientapi */ -typedef Session_0_10 Session; +typedef Session_99_0 Session; }} // namespace qpid::client diff --git a/cpp/src/qpid/client/SubscriptionManager.cpp b/cpp/src/qpid/client/SubscriptionManager.cpp index 7289997a69..f14344225c 100644 --- a/cpp/src/qpid/client/SubscriptionManager.cpp +++ b/cpp/src/qpid/client/SubscriptionManager.cpp @@ -23,7 +23,7 @@ #include "SubscriptionManager.h" #include <qpid/client/Dispatcher.h> -#include <qpid/client/Session_0_10.h> +#include <qpid/client/Session.h> #include <qpid/client/MessageListener.h> #include <set> #include <sstream> @@ -32,7 +32,7 @@ namespace qpid { namespace client { -SubscriptionManager::SubscriptionManager(Session_0_10& s) +SubscriptionManager::SubscriptionManager(Session& s) : dispatcher(s), session(s), messages(UNLIMITED), bytes(UNLIMITED), window(true), confirmMode(true), acquireMode(false), diff --git a/cpp/src/qpid/client/SubscriptionManager.h b/cpp/src/qpid/client/SubscriptionManager.h index 73331450cf..1741796f4f 100644 --- a/cpp/src/qpid/client/SubscriptionManager.h +++ b/cpp/src/qpid/client/SubscriptionManager.h @@ -24,7 +24,7 @@ #include "qpid/sys/Mutex.h" #include <qpid/client/Dispatcher.h> #include <qpid/client/Completion.h> -#include <qpid/client/Session_0_10.h> +#include <qpid/client/Session.h> #include <qpid/client/MessageListener.h> #include <qpid/client/LocalQueue.h> #include <qpid/sys/Runnable.h> @@ -48,7 +48,7 @@ class SubscriptionManager : public sys::Runnable Completion subscribeInternal(const std::string& q, const std::string& dest); qpid::client::Dispatcher dispatcher; - qpid::client::Session_0_10& session; + qpid::client::Session& session; uint32_t messages; uint32_t bytes; bool window; @@ -58,7 +58,7 @@ class SubscriptionManager : public sys::Runnable bool autoStop; public: - SubscriptionManager(Session_0_10& session); + SubscriptionManager(Session& session); /** * Subscribe a MessagesListener to receive messages from queue. diff --git a/cpp/src/qpid/framing/AMQP_HighestVersion.h b/cpp/src/qpid/framing/AMQP_HighestVersion.h index 42139c7937..1be8856c13 100644 --- a/cpp/src/qpid/framing/AMQP_HighestVersion.h +++ b/cpp/src/qpid/framing/AMQP_HighestVersion.h @@ -32,7 +32,7 @@ namespace qpid { namespace framing { -static ProtocolVersion highestProtocolVersion(0, 10); +static ProtocolVersion highestProtocolVersion(99, 0); } /* namespace framing */ } /* namespace qpid */ diff --git a/cpp/src/tests/BrokerFixture.h b/cpp/src/tests/BrokerFixture.h index 9fcdb57a99..fd350b77fe 100644 --- a/cpp/src/tests/BrokerFixture.h +++ b/cpp/src/tests/BrokerFixture.h @@ -27,7 +27,7 @@ #include "qpid/broker/Broker.h" #include "qpid/client/Connection.h" #include "qpid/client/ConnectionImpl.h" -#include "qpid/client/Session_0_10.h" +#include "qpid/client/Session.h" #include "qpid/client/SubscriptionManager.h" /** @@ -86,7 +86,7 @@ struct ProxyConnection : public qpid::client::Connection { template <class ConnectionType> struct SessionFixtureT : BrokerFixture { ConnectionType connection; - qpid::client::Session_0_10 session; + qpid::client::Session session; qpid::client::SubscriptionManager subs; qpid::client::LocalQueue lq; diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp index c299837f86..7a997db327 100644 --- a/cpp/src/tests/ClientSessionTest.cpp +++ b/cpp/src/tests/ClientSessionTest.cpp @@ -24,7 +24,7 @@ #include "qpid/sys/Monitor.h" #include "qpid/sys/Thread.h" #include "qpid/sys/Runnable.h" -#include "qpid/client/Session_0_10.h" +#include "qpid/client/Session.h" #include "qpid/framing/TransferContent.h" #include "qpid/framing/reply_exceptions.h" @@ -52,7 +52,7 @@ struct DummyListener : public sys::Runnable, public MessageListener { uint expected; Dispatcher dispatcher; - DummyListener(Session_0_10& session, const string& n, uint ex) : + DummyListener(Session& session, const string& n, uint ex) : name(n), expected(ex), dispatcher(session) {} void run() diff --git a/cpp/src/tests/client_test.cpp b/cpp/src/tests/client_test.cpp index 84fc9434de..bd2a541c92 100644 --- a/cpp/src/tests/client_test.cpp +++ b/cpp/src/tests/client_test.cpp @@ -31,7 +31,7 @@ #include "TestOptions.h" #include "qpid/client/Connection.h" #include "qpid/client/Message.h" -#include "qpid/client/Session_0_10.h" +#include "qpid/client/Session.h" #include "qpid/framing/FrameSet.h" #include "qpid/framing/MessageTransferBody.h" @@ -92,7 +92,7 @@ int main(int argc, char** argv) //Create and open a session on the connection through which //most functionality is exposed: - Session_0_10 session = connection.newSession(ASYNC); + Session session = connection.newSession(ASYNC); if (opts.trace) std::cout << "Opened session." << std::endl; diff --git a/cpp/src/tests/latencytest.cpp b/cpp/src/tests/latencytest.cpp index 86200054d8..2b44a5477a 100644 --- a/cpp/src/tests/latencytest.cpp +++ b/cpp/src/tests/latencytest.cpp @@ -30,7 +30,7 @@ #include "TestOptions.h" #include "qpid/client/Connection.h" #include "qpid/client/Message.h" -#include "qpid/client/Session_0_10.h" +#include "qpid/client/Session.h" #include "qpid/client/SubscriptionManager.h" using namespace qpid; @@ -98,7 +98,7 @@ class Client : public Runnable { protected: Connection connection; - Session_0_10 session; + Session session; Thread thread; string queue; diff --git a/cpp/src/tests/perftest.cpp b/cpp/src/tests/perftest.cpp index bc638635da..b950e432f5 100644 --- a/cpp/src/tests/perftest.cpp +++ b/cpp/src/tests/perftest.cpp @@ -21,7 +21,7 @@ #include "TestOptions.h" -#include "qpid/client/Session_0_10.h" +#include "qpid/client/Session.h" #include "qpid/client/SubscriptionManager.h" #include "qpid/client/Connection.h" #include "qpid/client/Completion.h" @@ -191,7 +191,7 @@ Opts opts; struct Client : public Runnable { Connection connection; - Session_0_10 session; + Session session; Thread thread; Client() { diff --git a/cpp/src/tests/topic_listener.cpp b/cpp/src/tests/topic_listener.cpp index ec73f3cbe0..e5e7d24112 100644 --- a/cpp/src/tests/topic_listener.cpp +++ b/cpp/src/tests/topic_listener.cpp @@ -35,7 +35,7 @@ #include "TestOptions.h" #include "qpid/client/Connection.h" #include "qpid/client/MessageListener.h" -#include "qpid/client/Session_0_10.h" +#include "qpid/client/Session.h" #include "qpid/client/SubscriptionManager.h" #include "qpid/sys/Time.h" #include "qpid/framing/FieldValue.h" @@ -53,7 +53,7 @@ using namespace std; * defined. */ class Listener : public MessageListener{ - Session_0_10& session; + Session& session; SubscriptionManager& mgr; const string responseQueue; const bool transactional; @@ -64,7 +64,7 @@ class Listener : public MessageListener{ void shutdown(); void report(); public: - Listener(Session_0_10& session, SubscriptionManager& mgr, const string& reponseQueue, bool tx); + Listener(Session& session, SubscriptionManager& mgr, const string& reponseQueue, bool tx); virtual void received(Message& msg); }; @@ -101,7 +101,7 @@ int main(int argc, char** argv){ else { Connection connection(args.trace); args.open(connection); - Session_0_10 session = connection.newSession(ASYNC); + Session session = connection.newSession(ASYNC); if (args.transactional) { session.txSelect(); } @@ -144,7 +144,7 @@ int main(int argc, char** argv){ return 1; } -Listener::Listener(Session_0_10& s, SubscriptionManager& m, const string& _responseq, bool tx) : +Listener::Listener(Session& s, SubscriptionManager& m, const string& _responseq, bool tx) : session(s), mgr(m), responseQueue(_responseq), transactional(tx), init(false), count(0){} void Listener::received(Message& message){ diff --git a/cpp/src/tests/topic_publisher.cpp b/cpp/src/tests/topic_publisher.cpp index 24a4fc6752..2271849c35 100644 --- a/cpp/src/tests/topic_publisher.cpp +++ b/cpp/src/tests/topic_publisher.cpp @@ -37,7 +37,7 @@ #include "TestOptions.h" #include "qpid/client/Connection.h" #include "qpid/client/MessageListener.h" -#include "qpid/client/Session_0_10.h" +#include "qpid/client/Session.h" #include "qpid/client/SubscriptionManager.h" #include "qpid/sys/Monitor.h" #include <unistd.h> @@ -56,7 +56,7 @@ using namespace std; * back by the subscribers. */ class Publisher { - Session_0_10& session; + Session& session; SubscriptionManager mgr; LocalQueue queue; const string controlTopic; @@ -66,7 +66,7 @@ class Publisher { string generateData(int size); public: - Publisher(Session_0_10& session, const string& controlTopic, bool tx, bool durable); + Publisher(Session& session, const string& controlTopic, bool tx, bool durable); int64_t publish(int msgs, int listeners, int size); void terminate(); }; @@ -107,7 +107,7 @@ int main(int argc, char** argv) { else { Connection connection(args.trace); args.open(connection); - Session_0_10 session = connection.newSession(ASYNC); + Session session = connection.newSession(ASYNC); if (args.transactional) { session.txSelect(); } @@ -150,7 +150,7 @@ int main(int argc, char** argv) { return 1; } -Publisher::Publisher(Session_0_10& _session, const string& _controlTopic, bool tx, bool d) : +Publisher::Publisher(Session& _session, const string& _controlTopic, bool tx, bool d) : session(_session), mgr(session), controlTopic(_controlTopic), transactional(tx), durable(d) { mgr.subscribe(queue, "response"); diff --git a/cpp/src/tests/txtest.cpp b/cpp/src/tests/txtest.cpp index f7776dee8d..4c5814986c 100644 --- a/cpp/src/tests/txtest.cpp +++ b/cpp/src/tests/txtest.cpp @@ -28,7 +28,7 @@ #include "TestOptions.h" #include "qpid/client/Connection.h" #include "qpid/client/Message.h" -#include "qpid/client/Session_0_10.h" +#include "qpid/client/Session.h" #include "qpid/client/SubscriptionManager.h" using namespace qpid; @@ -96,7 +96,7 @@ Args opts; struct Client { Connection connection; - Session_0_10 session; + Session session; Client() { diff --git a/python/qpid/connection.py b/python/qpid/connection.py index ecbce295b7..eafad7067a 100644 --- a/python/qpid/connection.py +++ b/python/qpid/connection.py @@ -178,6 +178,12 @@ class Connection: raise "frame error: expected %r, got %r" % (self.FRAME_END, garbage) return frame + def write_99_0(self, frame): + self.write_0_10(frame) + + def read_99_0(self): + return self.read_0_10() + class Frame: DECODERS = {} @@ -233,7 +239,7 @@ class Method(Frame): def encode(self, c): version = (c.spec.major, c.spec.minor) - if version == (0, 10): + if version == (0, 10) or version == (99, 0): c.encode_octet(self.method.klass.id) c.encode_octet(self.method.id) else: @@ -244,7 +250,7 @@ class Method(Frame): def decode(spec, c, size): version = (c.spec.major, c.spec.minor) - if version == (0, 10): + if version == (0, 10) or version == (99, 0): klass = spec.classes.byid[c.decode_octet()] meth = klass.methods.byid[c.decode_octet()] else: @@ -315,7 +321,7 @@ class Response(Frame): return "[%s] Response(%s,%s,%s) %s" % (self.channel, self.id, self.request_id, self.batch_offset, self.method) def uses_struct_encoding(spec): - return (spec.major == 0 and spec.minor == 10) + return (spec.major == 0 and spec.minor == 10) or (spec.major == 99 and spec.minor == 0) class Header(Frame): diff --git a/python/qpid/peer.py b/python/qpid/peer.py index c7f56b49b2..a464e95593 100644 --- a/python/qpid/peer.py +++ b/python/qpid/peer.py @@ -198,7 +198,7 @@ class Channel: self.invoker = self.invoke_reliable else: self.invoker = self.invoke_method - self.use_execution_layer = (spec.major == 0 and spec.minor == 10) + self.use_execution_layer = (spec.major == 0 and spec.minor == 10) or (spec.major == 99 and spec.minor == 0) self.synchronous = True def closed(self, reason): diff --git a/python/qpid/testlib.py b/python/qpid/testlib.py index c4f55be18a..5174fe10f4 100644 --- a/python/qpid/testlib.py +++ b/python/qpid/testlib.py @@ -141,7 +141,7 @@ Options: self.tests=findmodules("tests") if self.use08spec(): self.tests+=findmodules("tests_0-8") - elif self.spec.major == 0 and self.spec.minor == 10: + elif (self.spec.major == 0 and self.spec.minor == 10) or (self.spec.major == 99 and self.spec.minor == 0): self.tests+=findmodules("tests_0-10") else: self.tests+=findmodules("tests_0-9") diff --git a/specs/amqp.0-10-preview.xml b/specs/amqp.0-10-preview.xml index 6ba6bfc5ed..5af956e75d 100644 --- a/specs/amqp.0-10-preview.xml +++ b/specs/amqp.0-10-preview.xml @@ -137,7 +137,7 @@ --> <amqp xmlns="http://www.amqp.org/schema/amqp.xsd" - major="0" minor="10" port="5672" comment="AMQ Protocol (Working version)"> + major="99" minor="0" port="5672" comment="AMQ Protocol (Working version)"> <!-- ====================================================== |