summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-02-21 17:40:42 +0000
committerGordon Sim <gsim@apache.org>2008-02-21 17:40:42 +0000
commit3767d7e49e80c268c60ee247b3526b986eb7fc17 (patch)
tree36f0c9487e54705530be26271de7e52676bad524
parent5f06a953368f7f41dd8ab94a6775fcd9b5c99792 (diff)
downloadqpid-python-3767d7e49e80c268c60ee247b3526b986eb7fc17.tar.gz
Start moving towards final 0-10 spec:
* marked preview spec as 99-0 to distinguish it from 0-10 (which will now be used for the final version) * modified python client to treat 99-0 as 0-10 for now * modified broker to have two paths for the two different versions: 99-0 uses PreviewConnection, PreviewConnectionHandler and PreviewSessionHandler which are straight copy & pastes of the Connection, ConnectionHandler and SessionHandler now associated with 0-10 (so we can migrate the 0-10 path to the final spec without affecting clients working with the preview version) git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@629883 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/rubygen/templates/Session.rb4
-rw-r--r--cpp/src/Makefile.am10
-rw-r--r--cpp/src/qpid/broker/Bridge.cpp4
-rw-r--r--cpp/src/qpid/broker/Bridge.h6
-rw-r--r--cpp/src/qpid/broker/Connection.cpp19
-rw-r--r--cpp/src/qpid/broker/Connection.h32
-rw-r--r--cpp/src/qpid/broker/ConnectionFactory.cpp3
-rw-r--r--cpp/src/qpid/broker/ConnectionState.h84
-rw-r--r--cpp/src/qpid/broker/HandlerImpl.h3
-rw-r--r--cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp92
-rw-r--r--cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h60
-rw-r--r--cpp/src/qpid/broker/PreviewConnection.cpp327
-rw-r--r--cpp/src/qpid/broker/PreviewConnection.h118
-rw-r--r--cpp/src/qpid/broker/PreviewConnectionHandler.cpp158
-rw-r--r--cpp/src/qpid/broker/PreviewConnectionHandler.h92
-rw-r--r--cpp/src/qpid/broker/PreviewSessionHandler.cpp210
-rw-r--r--cpp/src/qpid/broker/PreviewSessionHandler.h111
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.cpp5
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.h2
-rw-r--r--cpp/src/qpid/broker/SessionContext.h53
-rw-r--r--cpp/src/qpid/broker/SessionHandler.cpp6
-rw-r--r--cpp/src/qpid/broker/SessionHandler.h10
-rw-r--r--cpp/src/qpid/broker/SessionManager.cpp2
-rw-r--r--cpp/src/qpid/broker/SessionManager.h4
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp14
-rw-r--r--cpp/src/qpid/broker/SessionState.h14
-rw-r--r--cpp/src/qpid/client/Channel.cpp6
-rw-r--r--cpp/src/qpid/client/Channel.h6
-rw-r--r--cpp/src/qpid/client/Connection.cpp6
-rw-r--r--cpp/src/qpid/client/Connection.h6
-rw-r--r--cpp/src/qpid/client/Dispatcher.cpp5
-rw-r--r--cpp/src/qpid/client/Dispatcher.h11
-rw-r--r--cpp/src/qpid/client/LocalQueue.h2
-rw-r--r--cpp/src/qpid/client/Message.h12
-rw-r--r--cpp/src/qpid/client/Session.h4
-rw-r--r--cpp/src/qpid/client/SubscriptionManager.cpp4
-rw-r--r--cpp/src/qpid/client/SubscriptionManager.h6
-rw-r--r--cpp/src/qpid/framing/AMQP_HighestVersion.h2
-rw-r--r--cpp/src/tests/BrokerFixture.h4
-rw-r--r--cpp/src/tests/ClientSessionTest.cpp4
-rw-r--r--cpp/src/tests/client_test.cpp4
-rw-r--r--cpp/src/tests/latencytest.cpp4
-rw-r--r--cpp/src/tests/perftest.cpp4
-rw-r--r--cpp/src/tests/topic_listener.cpp10
-rw-r--r--cpp/src/tests/topic_publisher.cpp10
-rw-r--r--cpp/src/tests/txtest.cpp4
-rw-r--r--python/qpid/connection.py12
-rw-r--r--python/qpid/peer.py2
-rw-r--r--python/qpid/testlib.py2
-rw-r--r--specs/amqp.0-10-preview.xml2
50 files changed, 1428 insertions, 147 deletions
diff --git a/cpp/rubygen/templates/Session.rb b/cpp/rubygen/templates/Session.rb
index 21534b78fb..6a50fdb462 100644
--- a/cpp/rubygen/templates/Session.rb
+++ b/cpp/rubygen/templates/Session.rb
@@ -85,8 +85,8 @@ class SessionNoKeywordGen < CppGen
}
cpp_class(@classname, "public SessionBase") {
public
- genl "Session_0_10() {}"
- genl "Session_0_10(shared_ptr<SessionCore> core) : SessionBase(core) {}"
+ genl "Session_#{@amqp.version.bars}() {}"
+ genl "Session_#{@amqp.version.bars}(shared_ptr<SessionCore> core) : SessionBase(core) {}"
session_methods.each { |m|
genl
doxygen(m)
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 37f8eb8c85..b44aaa7e5e 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -163,6 +163,9 @@ libqpidbroker_la_SOURCES = \
qpid/broker/Queue.cpp \
qpid/broker/PersistableMessage.cpp \
qpid/broker/Bridge.cpp \
+ qpid/broker/PreviewConnection.cpp \
+ qpid/broker/PreviewConnectionHandler.cpp \
+ qpid/broker/PreviewSessionHandler.cpp \
qpid/broker/Connection.cpp \
qpid/broker/ConnectionHandler.cpp \
qpid/broker/ConnectionFactory.cpp \
@@ -186,6 +189,7 @@ libqpidbroker_la_SOURCES = \
qpid/broker/MessageDelivery.cpp \
qpid/broker/MessageHandlerImpl.cpp \
qpid/broker/MessageStoreModule.cpp \
+ qpid/broker/MultiVersionConnectionInputHandler.cpp \
qpid/broker/NameGenerator.cpp \
qpid/broker/NullMessageStore.cpp \
qpid/broker/QueueBindings.cpp \
@@ -201,6 +205,7 @@ libqpidbroker_la_SOURCES = \
qpid/broker/SessionManager.h \
qpid/broker/SessionManager.cpp \
qpid/broker/SessionHandler.h \
+ qpid/broker/SessionContext.h \
qpid/broker/SessionHandler.cpp \
qpid/broker/SemanticHandler.cpp \
qpid/broker/Timer.cpp \
@@ -262,7 +267,11 @@ nobase_include_HEADERS = \
qpid/broker/Queue.h \
qpid/broker/BrokerSingleton.h \
qpid/broker/Bridge.h \
+ qpid/broker/PreviewConnection.h \
+ qpid/broker/PreviewConnectionHandler.h \
+ qpid/broker/PreviewSessionHandler.h \
qpid/broker/Connection.h \
+ qpid/broker/ConnectionState.h \
qpid/broker/ConnectionFactory.h \
qpid/broker/ConnectionHandler.h \
qpid/broker/ConnectionToken.h \
@@ -293,6 +302,7 @@ nobase_include_HEADERS = \
qpid/broker/MessageHandlerImpl.h \
qpid/broker/MessageStore.h \
qpid/broker/MessageStoreModule.h \
+ qpid/broker/MultiVersionConnectionInputHandler.h \
qpid/broker/NameGenerator.h \
qpid/broker/NullMessageStore.h \
qpid/broker/Persistable.h \
diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp
index 6c1d8e7ca5..566b9cc197 100644
--- a/cpp/src/qpid/broker/Bridge.cpp
+++ b/cpp/src/qpid/broker/Bridge.cpp
@@ -19,7 +19,7 @@
*
*/
#include "Bridge.h"
-#include "Connection.h"
+#include "ConnectionState.h"
#include "qpid/management/ManagementAgent.h"
#include "qpid/framing/FieldTable.h"
@@ -31,7 +31,7 @@ using qpid::framing::Uuid;
namespace qpid {
namespace broker {
-Bridge::Bridge(framing::ChannelId id, Connection& c, CancellationListener l, const management::ArgsLinkBridge& _args) :
+Bridge::Bridge(framing::ChannelId id, ConnectionState& c, CancellationListener l, const management::ArgsLinkBridge& _args) :
args(_args), channel(id, &(c.getOutput())), peer(channel),
mgmtObject(new management::Bridge(this, &c, id, args.i_src, args.i_dest, args.i_key, args.i_src_is_queue, args.i_src_is_local)),
connection(c), listener(l)
diff --git a/cpp/src/qpid/broker/Bridge.h b/cpp/src/qpid/broker/Bridge.h
index 8506325ddb..1198285c93 100644
--- a/cpp/src/qpid/broker/Bridge.h
+++ b/cpp/src/qpid/broker/Bridge.h
@@ -32,14 +32,14 @@
namespace qpid {
namespace broker {
-class Connection;
+class ConnectionState;
class Bridge : public management::Manageable
{
public:
typedef boost::function<void(Bridge*)> CancellationListener;
- Bridge(framing::ChannelId id, Connection& c, CancellationListener l,
+ Bridge(framing::ChannelId id, ConnectionState& c, CancellationListener l,
const management::ArgsLinkBridge& args);
~Bridge();
@@ -54,7 +54,7 @@ private:
framing::ChannelHandler channel;
framing::AMQP_ServerProxy peer;
management::Bridge::shared_ptr mgmtObject;
- Connection& connection;
+ ConnectionState& connection;
CancellationListener listener;
};
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index d73a249184..822890ae76 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -86,13 +86,7 @@ public:
Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_) :
- broker(broker_),
- outputTasks(*out_),
- out(out_),
- framemax(65535),
- heartbeat(0),
- client(0),
- stagingThreshold(broker.getStagingThreshold()),
+ ConnectionState(out_, broker_),
adapter(*this),
mgmtClosing(0),
mgmtId(mgmtId_)
@@ -228,17 +222,6 @@ Manageable::status_t Connection::ManagementMethod (uint32_t methodId,
return status;
}
-void Connection::setUserId(const string& uid)
-{
- userId = uid;
- QPID_LOG (debug, "UserId is " << userId);
-}
-
-const string& Connection::getUserId() const
-{
- return userId;
-}
-
Connection::MgmtLink::MgmtLink(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId)
: channelCounter(1)
{
diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h
index 99b394dda0..8719a9dfcd 100644
--- a/cpp/src/qpid/broker/Connection.h
+++ b/cpp/src/qpid/broker/Connection.h
@@ -39,6 +39,7 @@
#include "qpid/sys/Socket.h"
#include "qpid/Exception.h"
#include "ConnectionHandler.h"
+#include "ConnectionState.h"
#include "SessionHandler.h"
#include "qpid/management/Manageable.h"
#include "qpid/management/Client.h"
@@ -50,8 +51,7 @@ namespace qpid {
namespace broker {
class Connection : public sys::ConnectionInputHandler,
- public ConnectionToken,
- public management::Manageable
+ public ConnectionState
{
public:
Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId);
@@ -63,25 +63,6 @@ class Connection : public sys::ConnectionInputHandler,
/** Close the connection */
void close(framing::ReplyCode code, const string& text, framing::ClassId classId, framing::MethodId methodId);
- sys::ConnectionOutputHandler& getOutput() const { return *out; }
- framing::ProtocolVersion getVersion() const { return version; }
-
- uint32_t getFrameMax() const { return framemax; }
- uint16_t getHeartbeat() const { return heartbeat; }
- uint64_t getStagingThreshold() const { return stagingThreshold; }
-
- void setFrameMax(uint32_t fm) { framemax = fm; }
- void setHeartbeat(uint16_t hb) { heartbeat = hb; }
- void setStagingThreshold(uint64_t st) { stagingThreshold = st; }
-
- Broker& getBroker() { return broker; }
-
- Broker& broker;
- std::vector<Queue::shared_ptr> exclusiveQueues;
-
- //contained output tasks
- sys::AggregateOutput outputTasks;
-
// ConnectionInputHandler methods
void received(framing::AMQFrame& frame);
void initiated(const framing::ProtocolInitiation& header);
@@ -98,9 +79,6 @@ class Connection : public sys::ConnectionInputHandler,
management::Manageable::status_t
ManagementMethod (uint32_t methodId, management::Args& args);
- void setUserId(const string& uid);
- const string& getUserId() const;
-
void initMgmt(bool asLink = false);
private:
@@ -126,17 +104,11 @@ class Connection : public sys::ConnectionInputHandler,
class MgmtClient;
class MgmtLink;
- framing::ProtocolVersion version;
ChannelMap channels;
- sys::ConnectionOutputHandler* out;
- uint32_t framemax;
- uint16_t heartbeat;
framing::AMQP_ClientProxy::Connection* client;
- uint64_t stagingThreshold;
ConnectionHandler adapter;
std::auto_ptr<MgmtWrapper> mgmtWrapper;
bool mgmtClosing;
- string userId;
const std::string mgmtId;
};
diff --git a/cpp/src/qpid/broker/ConnectionFactory.cpp b/cpp/src/qpid/broker/ConnectionFactory.cpp
index 9577853de4..a0cd4e35d7 100644
--- a/cpp/src/qpid/broker/ConnectionFactory.cpp
+++ b/cpp/src/qpid/broker/ConnectionFactory.cpp
@@ -20,6 +20,7 @@
*/
#include "ConnectionFactory.h"
#include "Connection.h"
+#include "MultiVersionConnectionInputHandler.h"
namespace qpid {
namespace broker {
@@ -38,7 +39,7 @@ qpid::sys::ConnectionInputHandler*
ConnectionFactory::create(qpid::sys::ConnectionOutputHandler* out,
const std::string& id)
{
- return new Connection(out, broker, id);
+ return new MultiVersionConnectionInputHandler(out, broker, id);
}
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/ConnectionState.h b/cpp/src/qpid/broker/ConnectionState.h
new file mode 100644
index 0000000000..691d47d866
--- /dev/null
+++ b/cpp/src/qpid/broker/ConnectionState.h
@@ -0,0 +1,84 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _ConnectionState_
+#define _ConnectionState_
+
+#include <vector>
+
+#include "qpid/sys/AggregateOutput.h"
+#include "qpid/sys/ConnectionOutputHandler.h"
+#include "qpid/framing/ProtocolVersion.h"
+#include "qpid/management/Manageable.h"
+#include "Broker.h"
+
+namespace qpid {
+namespace broker {
+
+class ConnectionState : public ConnectionToken, public management::Manageable
+{
+ public:
+ ConnectionState(qpid::sys::ConnectionOutputHandler* o, Broker& b) :
+ broker(b),
+ outputTasks(*o),
+ out(o),
+ framemax(65535),
+ heartbeat(0),
+ stagingThreshold(broker.getStagingThreshold())
+ {}
+
+
+
+ virtual ~ConnectionState () {}
+
+ uint32_t getFrameMax() const { return framemax; }
+ uint16_t getHeartbeat() const { return heartbeat; }
+ uint64_t getStagingThreshold() const { return stagingThreshold; }
+
+ void setFrameMax(uint32_t fm) { framemax = fm; }
+ void setHeartbeat(uint16_t hb) { heartbeat = hb; }
+ void setStagingThreshold(uint64_t st) { stagingThreshold = st; }
+
+ void setUserId(const string& uid) { userId = uid; }
+ const string& getUserId() const { return userId; }
+
+ Broker& getBroker() { return broker; }
+
+ Broker& broker;
+ std::vector<Queue::shared_ptr> exclusiveQueues;
+
+ //contained output tasks
+ sys::AggregateOutput outputTasks;
+
+ sys::ConnectionOutputHandler& getOutput() const { return *out; }
+ framing::ProtocolVersion getVersion() const { return version; }
+
+ protected:
+ framing::ProtocolVersion version;
+ sys::ConnectionOutputHandler* out;
+ uint32_t framemax;
+ uint16_t heartbeat;
+ uint64_t stagingThreshold;
+ string userId;
+};
+
+}}
+
+#endif
diff --git a/cpp/src/qpid/broker/HandlerImpl.h b/cpp/src/qpid/broker/HandlerImpl.h
index 0250805f52..410d400c9d 100644
--- a/cpp/src/qpid/broker/HandlerImpl.h
+++ b/cpp/src/qpid/broker/HandlerImpl.h
@@ -21,6 +21,7 @@
#include "SemanticState.h"
#include "SessionState.h"
+#include "ConnectionState.h"
namespace qpid {
namespace broker {
@@ -39,7 +40,7 @@ class HandlerImpl {
HandlerImpl(SemanticState& s) : state(s), session(s.getSession()) {}
framing::AMQP_ClientProxy& getProxy() { return session.getProxy(); }
- Connection& getConnection() { return session.getConnection(); }
+ ConnectionState& getConnection() { return session.getConnection(); }
Broker& getBroker() { return session.getBroker(); }
};
diff --git a/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp b/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp
new file mode 100644
index 0000000000..676f9e4b3d
--- /dev/null
+++ b/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp
@@ -0,0 +1,92 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "MultiVersionConnectionInputHandler.h"
+#include "Connection.h"
+#include "PreviewConnection.h"
+#include "qpid/framing/reply_exceptions.h"
+
+namespace qpid {
+namespace broker {
+
+MultiVersionConnectionInputHandler::MultiVersionConnectionInputHandler(
+ qpid::sys::ConnectionOutputHandler* _out,
+ Broker& _broker,
+ const std::string& _id) : linkVersion(99,0), out(_out), broker(_broker), id(_id) {}
+
+
+void MultiVersionConnectionInputHandler::initiated(const qpid::framing::ProtocolInitiation& i)
+{
+ if (i.getMajor() == 99 && i.getMinor() == 0) {
+ handler = std::auto_ptr<ConnectionInputHandler>(new PreviewConnection(out, broker, id));
+ } else if (i.getMajor() == 0 && i.getMinor() == 10) {
+ handler = std::auto_ptr<ConnectionInputHandler>(new Connection(out, broker, id));
+ } else {
+ throw qpid::framing::InternalErrorException("Unsupported version: " + i.getVersion().toString());
+ }
+ handler->initiated(i);
+}
+
+void MultiVersionConnectionInputHandler::received(qpid::framing::AMQFrame& f)
+{
+ check();
+ handler->received(f);
+}
+
+void MultiVersionConnectionInputHandler::idleOut()
+{
+ check();
+ handler->idleOut();
+}
+
+void MultiVersionConnectionInputHandler::idleIn()
+{
+ check();
+ handler->idleIn();
+}
+
+bool MultiVersionConnectionInputHandler::doOutput()
+{
+ return check(false) && handler->doOutput();
+}
+
+qpid::framing::ProtocolInitiation MultiVersionConnectionInputHandler::getInitiation()
+{
+ return qpid::framing::ProtocolInitiation(linkVersion);
+}
+
+void MultiVersionConnectionInputHandler::closed()
+{
+ check();
+ handler->closed();
+}
+
+bool MultiVersionConnectionInputHandler::check(bool fail)
+{
+ if (!handler.get()) {
+ if (fail) throw qpid::framing::InternalErrorException("Handler not initialised!");
+ else return false;
+ } else {
+ return true;
+ }
+}
+
+}
+}
diff --git a/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h b/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h
new file mode 100644
index 0000000000..4301eba57c
--- /dev/null
+++ b/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _MultiVersionConnectionInputHandler_
+#define _MultiVersionConnectionInputHandler_
+
+#include <memory>
+#include <string>
+#include "qpid/sys/ConnectionInputHandler.h"
+#include "qpid/sys/ConnectionOutputHandler.h"
+#include "qpid/broker/Broker.h"
+
+namespace qpid {
+namespace broker {
+
+class MultiVersionConnectionInputHandler : public qpid::sys::ConnectionInputHandler
+{
+ qpid::framing::ProtocolVersion linkVersion;//version used for inter-broker links
+ std::auto_ptr<qpid::sys::ConnectionInputHandler> handler;
+ qpid::sys::ConnectionOutputHandler* out;
+ Broker& broker;
+ const std::string id;
+
+ bool check(bool fail = true);
+
+public:
+ MultiVersionConnectionInputHandler(qpid::sys::ConnectionOutputHandler* out, Broker& broker, const std::string& id);
+ virtual ~MultiVersionConnectionInputHandler() {}
+
+ void initiated(const qpid::framing::ProtocolInitiation&);
+ void received(qpid::framing::AMQFrame&);
+ void idleOut();
+ void idleIn();
+ bool doOutput();
+ qpid::framing::ProtocolInitiation getInitiation();
+ void closed();
+};
+
+}
+}
+
+
+#endif
diff --git a/cpp/src/qpid/broker/PreviewConnection.cpp b/cpp/src/qpid/broker/PreviewConnection.cpp
new file mode 100644
index 0000000000..05879a0329
--- /dev/null
+++ b/cpp/src/qpid/broker/PreviewConnection.cpp
@@ -0,0 +1,327 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "PreviewConnection.h"
+#include "SessionState.h"
+#include "BrokerAdapter.h"
+#include "Bridge.h"
+#include "SemanticHandler.h"
+
+#include "qpid/log/Statement.h"
+#include "qpid/ptr_map.h"
+#include "qpid/framing/AMQP_ClientProxy.h"
+#include "qpid/management/ManagementAgent.h"
+
+#include <boost/bind.hpp>
+#include <boost/ptr_container/ptr_vector.hpp>
+
+#include <algorithm>
+#include <iostream>
+#include <assert.h>
+
+using namespace boost;
+using namespace qpid::sys;
+using namespace qpid::framing;
+using namespace qpid::sys;
+using namespace qpid::ptr_map;
+using qpid::management::ManagementAgent;
+using qpid::management::ManagementObject;
+using qpid::management::Manageable;
+using qpid::management::Args;
+
+namespace qpid {
+namespace broker {
+
+class PreviewConnection::MgmtClient : public PreviewConnection::MgmtWrapper
+{
+ management::Client::shared_ptr mgmtClient;
+
+public:
+ MgmtClient(PreviewConnection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId);
+ ~MgmtClient();
+ void received(framing::AMQFrame& frame);
+ management::ManagementObject::shared_ptr getManagementObject() const;
+ void closing();
+};
+
+class PreviewConnection::MgmtLink : public PreviewConnection::MgmtWrapper
+{
+ typedef boost::ptr_vector<Bridge> Bridges;
+
+ management::Link::shared_ptr mgmtLink;
+ Bridges created;//holds list of bridges pending creation
+ Bridges cancelled;//holds list of bridges pending cancellation
+ Bridges active;//holds active bridges
+ uint channelCounter;
+ sys::Mutex lock;
+
+ void cancel(Bridge*);
+
+public:
+ MgmtLink(PreviewConnection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId);
+ ~MgmtLink();
+ void received(framing::AMQFrame& frame);
+ management::ManagementObject::shared_ptr getManagementObject() const;
+ void closing();
+ void processPending();
+ void process(PreviewConnection& connection, const management::Args& args);
+};
+
+
+PreviewConnection::PreviewConnection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_) :
+ ConnectionState(out_, broker_),
+ adapter(*this),
+ mgmtClosing(0),
+ mgmtId(mgmtId_)
+{}
+
+void PreviewConnection::initMgmt(bool asLink)
+{
+ Manageable* parent = broker.GetVhostObject ();
+
+ if (parent != 0)
+ {
+ ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
+
+ if (agent.get () != 0)
+ {
+ if (asLink) {
+ mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtLink(this, parent, agent, mgmtId));
+ } else {
+ mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtClient(this, parent, agent, mgmtId));
+ }
+ }
+ }
+}
+
+PreviewConnection::~PreviewConnection () {}
+
+void PreviewConnection::received(framing::AMQFrame& frame){
+ if (mgmtClosing)
+ close (403, "Closed by Management Request", 0, 0);
+
+ if (frame.getChannel() == 0) {
+ adapter.handle(frame);
+ } else {
+ getChannel(frame.getChannel()).in(frame);
+ }
+
+ if (mgmtWrapper.get()) mgmtWrapper->received(frame);
+}
+
+void PreviewConnection::close(
+ ReplyCode code, const string& text, ClassId classId, MethodId methodId)
+{
+ adapter.close(code, text, classId, methodId);
+ channels.clear();
+ getOutput().close();
+}
+
+void PreviewConnection::initiated(const framing::ProtocolInitiation& header) {
+ version = ProtocolVersion(header.getMajor(), header.getMinor());
+ adapter.init(header);
+ initMgmt();
+}
+
+void PreviewConnection::idleOut(){}
+
+void PreviewConnection::idleIn(){}
+
+void PreviewConnection::closed(){ // Physically closed, suspend open sessions.
+ try {
+ for (ChannelMap::iterator i = channels.begin(); i != channels.end(); ++i)
+ get_pointer(i)->localSuspend();
+ while (!exclusiveQueues.empty()) {
+ Queue::shared_ptr q(exclusiveQueues.front());
+ q->releaseExclusiveOwnership();
+ if (q->canAutoDelete()) {
+ Queue::tryAutoDelete(broker, q);
+ }
+ exclusiveQueues.erase(exclusiveQueues.begin());
+ }
+ } catch(std::exception& e) {
+ QPID_LOG(error, " Unhandled exception while closing session: " <<
+ e.what());
+ assert(0);
+ }
+}
+
+bool PreviewConnection::doOutput()
+{
+ try{
+ //process any pending mgmt commands:
+ if (mgmtWrapper.get()) mgmtWrapper->processPending();
+
+ //then do other output as needed:
+ return outputTasks.doOutput();
+ }catch(ConnectionException& e){
+ close(e.code, e.what(), 0, 0);
+ }catch(std::exception& e){
+ close(541/*internal error*/, e.what(), 0, 0);
+ }
+ return false;
+}
+
+void PreviewConnection::closeChannel(uint16_t id) {
+ ChannelMap::iterator i = channels.find(id);
+ if (i != channels.end()) channels.erase(i);
+}
+
+PreviewSessionHandler& PreviewConnection::getChannel(ChannelId id) {
+ ChannelMap::iterator i=channels.find(id);
+ if (i == channels.end()) {
+ i = channels.insert(id, new PreviewSessionHandler(*this, id)).first;
+ }
+ return *get_pointer(i);
+}
+
+ManagementObject::shared_ptr PreviewConnection::GetManagementObject (void) const
+{
+ return mgmtWrapper.get() ? mgmtWrapper->getManagementObject() : ManagementObject::shared_ptr();
+}
+
+Manageable::status_t PreviewConnection::ManagementMethod (uint32_t methodId,
+ Args& args)
+{
+ Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
+
+ QPID_LOG (debug, "PreviewConnection::ManagementMethod [id=" << methodId << "]");
+
+ switch (methodId)
+ {
+ case management::Client::METHOD_CLOSE :
+ mgmtClosing = 1;
+ if (mgmtWrapper.get()) mgmtWrapper->closing();
+ status = Manageable::STATUS_OK;
+ break;
+ case management::Link::METHOD_BRIDGE :
+ //queue this up and request chance to do output (i.e. get connections thread of control):
+ mgmtWrapper->process(*this, args);
+ out->activateOutput();
+ status = Manageable::STATUS_OK;
+ break;
+ }
+
+ return status;
+}
+
+PreviewConnection::MgmtLink::MgmtLink(PreviewConnection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId)
+ : channelCounter(1)
+{
+ mgmtLink = management::Link::shared_ptr
+ (new management::Link(conn, parent, mgmtId));
+ agent->addObject (mgmtLink);
+}
+
+PreviewConnection::MgmtLink::~MgmtLink()
+{
+ if (mgmtLink.get () != 0)
+ mgmtLink->resourceDestroy ();
+}
+
+void PreviewConnection::MgmtLink::received(framing::AMQFrame& frame)
+{
+ if (mgmtLink.get () != 0)
+ {
+ mgmtLink->inc_framesFromPeer ();
+ mgmtLink->inc_bytesFromPeer (frame.size ());
+ }
+}
+
+management::ManagementObject::shared_ptr PreviewConnection::MgmtLink::getManagementObject() const
+{
+ return dynamic_pointer_cast<ManagementObject>(mgmtLink);
+}
+
+void PreviewConnection::MgmtLink::closing()
+{
+ if (mgmtLink) mgmtLink->set_closing (1);
+}
+
+void PreviewConnection::MgmtLink::processPending()
+{
+ //process any pending creates
+ if (!created.empty()) {
+ for (Bridges::iterator i = created.begin(); i != created.end(); ++i) {
+ i->create();
+ }
+ active.transfer(active.end(), created.begin(), created.end(), created);
+ }
+ if (!cancelled.empty()) {
+ //process any pending cancellations
+ for (Bridges::iterator i = cancelled.begin(); i != cancelled.end(); ++i) {
+ i->cancel();
+ }
+ cancelled.clear();
+ }
+}
+
+void PreviewConnection::MgmtLink::process(PreviewConnection& connection, const management::Args& args)
+{
+ created.push_back(new Bridge(channelCounter++, connection,
+ boost::bind(&MgmtLink::cancel, this, _1),
+ dynamic_cast<const management::ArgsLinkBridge&>(args)));
+}
+
+void PreviewConnection::MgmtLink::cancel(Bridge* b)
+{
+ //need to take this out the active map and add it to the cancelled map
+ for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
+ if (&(*i) == b) {
+ cancelled.transfer(cancelled.end(), i, active);
+ break;
+ }
+ }
+}
+
+PreviewConnection::MgmtClient::MgmtClient(PreviewConnection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId)
+{
+ mgmtClient = management::Client::shared_ptr
+ (new management::Client (conn, parent, mgmtId));
+ agent->addObject (mgmtClient);
+}
+
+PreviewConnection::MgmtClient::~MgmtClient()
+{
+ if (mgmtClient.get () != 0)
+ mgmtClient->resourceDestroy ();
+}
+
+void PreviewConnection::MgmtClient::received(framing::AMQFrame& frame)
+{
+ if (mgmtClient.get () != 0)
+ {
+ mgmtClient->inc_framesFromClient ();
+ mgmtClient->inc_bytesFromClient (frame.size ());
+ }
+}
+
+management::ManagementObject::shared_ptr PreviewConnection::MgmtClient::getManagementObject() const
+{
+ return dynamic_pointer_cast<ManagementObject>(mgmtClient);
+}
+
+void PreviewConnection::MgmtClient::closing()
+{
+ if (mgmtClient) mgmtClient->set_closing (1);
+}
+
+}}
+
diff --git a/cpp/src/qpid/broker/PreviewConnection.h b/cpp/src/qpid/broker/PreviewConnection.h
new file mode 100644
index 0000000000..d6a945c26c
--- /dev/null
+++ b/cpp/src/qpid/broker/PreviewConnection.h
@@ -0,0 +1,118 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _PreviewConnection_
+#define _PreviewConnection_
+
+#include <memory>
+#include <sstream>
+#include <vector>
+
+#include <boost/ptr_container/ptr_map.hpp>
+
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/AMQP_ServerOperations.h"
+#include "qpid/framing/AMQP_ClientProxy.h"
+#include "qpid/sys/AggregateOutput.h"
+#include "qpid/sys/ConnectionOutputHandler.h"
+#include "qpid/sys/ConnectionInputHandler.h"
+#include "qpid/sys/TimeoutHandler.h"
+#include "qpid/framing/ProtocolVersion.h"
+#include "Broker.h"
+#include "qpid/sys/Socket.h"
+#include "qpid/Exception.h"
+#include "PreviewConnectionHandler.h"
+#include "ConnectionState.h"
+#include "PreviewSessionHandler.h"
+#include "qpid/management/Manageable.h"
+#include "qpid/management/Client.h"
+#include "qpid/management/Link.h"
+
+#include <boost/ptr_container/ptr_map.hpp>
+
+namespace qpid {
+namespace broker {
+
+class PreviewConnection : public sys::ConnectionInputHandler,
+ public ConnectionState
+{
+ public:
+ PreviewConnection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId);
+ ~PreviewConnection ();
+
+ /** Get the PreviewSessionHandler for channel. Create if it does not already exist */
+ PreviewSessionHandler& getChannel(framing::ChannelId channel);
+
+ /** Close the connection */
+ void close(framing::ReplyCode code, const string& text, framing::ClassId classId, framing::MethodId methodId);
+
+ // ConnectionInputHandler methods
+ void received(framing::AMQFrame& frame);
+ void initiated(const framing::ProtocolInitiation& header);
+ void idleOut();
+ void idleIn();
+ void closed();
+ bool doOutput();
+ framing::ProtocolInitiation getInitiation() { return framing::ProtocolInitiation(version); }
+
+ void closeChannel(framing::ChannelId channel);
+
+ // Manageable entry points
+ management::ManagementObject::shared_ptr GetManagementObject (void) const;
+ management::Manageable::status_t
+ ManagementMethod (uint32_t methodId, management::Args& args);
+
+ void initMgmt(bool asLink = false);
+
+ private:
+ typedef boost::ptr_map<framing::ChannelId, PreviewSessionHandler> ChannelMap;
+ typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
+
+ /**
+ * Connection may appear, for the purposes of management, as a
+ * normal client initiated connection or as an agent initiated
+ * inter-broker link. This wrapper abstracts the common interface
+ * for both.
+ */
+ class MgmtWrapper
+ {
+ public:
+ virtual ~MgmtWrapper(){}
+ virtual void received(framing::AMQFrame& frame) = 0;
+ virtual management::ManagementObject::shared_ptr getManagementObject() const = 0;
+ virtual void closing() = 0;
+ virtual void processPending(){}
+ virtual void process(PreviewConnection&, const management::Args&){}
+ };
+ class MgmtClient;
+ class MgmtLink;
+
+ ChannelMap channels;
+ framing::AMQP_ClientProxy::Connection* client;
+ uint64_t stagingThreshold;
+ PreviewConnectionHandler adapter;
+ std::auto_ptr<MgmtWrapper> mgmtWrapper;
+ bool mgmtClosing;
+ const std::string mgmtId;
+};
+
+}}
+
+#endif
diff --git a/cpp/src/qpid/broker/PreviewConnectionHandler.cpp b/cpp/src/qpid/broker/PreviewConnectionHandler.cpp
new file mode 100644
index 0000000000..c0f0d9f5e0
--- /dev/null
+++ b/cpp/src/qpid/broker/PreviewConnectionHandler.cpp
@@ -0,0 +1,158 @@
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "PreviewConnectionHandler.h"
+#include "PreviewConnection.h"
+#include "qpid/framing/ConnectionStartBody.h"
+#include "qpid/framing/ClientInvoker.h"
+#include "qpid/framing/ServerInvoker.h"
+
+using namespace qpid;
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+
+namespace
+{
+const std::string PLAIN = "PLAIN";
+const std::string en_US = "en_US";
+}
+
+void PreviewConnectionHandler::init(const framing::ProtocolInitiation& header) {
+ FieldTable properties;
+ string mechanisms(PLAIN);
+ string locales(en_US);
+ handler->serverMode = true;
+ handler->client.start(header.getMajor(), header.getMinor(), properties, mechanisms, locales);
+}
+
+void PreviewConnectionHandler::close(ReplyCode code, const string& text, ClassId classId, MethodId methodId)
+{
+ handler->client.close(code, text, classId, methodId);
+}
+
+void PreviewConnectionHandler::handle(framing::AMQFrame& frame)
+{
+ AMQMethodBody* method=frame.getBody()->getMethod();
+ try{
+ if (handler->serverMode) {
+ if (!invoke(static_cast<AMQP_ServerOperations::ConnectionHandler&>(*handler.get()), *method))
+ throw ChannelErrorException(QPID_MSG("Class can't be accessed over channel 0"));
+ } else {
+ if (!invoke(static_cast<AMQP_ClientOperations::ConnectionHandler&>(*handler.get()), *method))
+ throw ChannelErrorException(QPID_MSG("Class can't be accessed over channel 0"));
+ }
+ }catch(ConnectionException& e){
+ handler->client.close(e.code, e.what(), method->amqpClassId(), method->amqpMethodId());
+ }catch(std::exception& e){
+ handler->client.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId());
+ }
+}
+
+PreviewConnectionHandler::PreviewConnectionHandler(PreviewConnection& connection) : handler(new Handler(connection)) {}
+
+PreviewConnectionHandler::Handler:: Handler(PreviewConnection& c) : client(c.getOutput()), server(c.getOutput()),
+ connection(c), serverMode(false) {}
+
+void PreviewConnectionHandler::Handler::startOk(const framing::FieldTable& /*clientProperties*/,
+ const string& mechanism,
+ const string& response, const string& /*locale*/)
+{
+ //TODO: handle SASL mechanisms more cleverly
+ if (mechanism == PLAIN) {
+ if (response.size() > 0 && response[0] == (char) 0) {
+ string temp = response.substr(1);
+ string::size_type i = temp.find((char)0);
+ string uid = temp.substr(0, i);
+ string pwd = temp.substr(i + 1);
+ //TODO: authentication
+ connection.setUserId(uid);
+ }
+ }
+ client.tune(framing::CHANNEL_MAX, connection.getFrameMax(), connection.getHeartbeat());
+}
+
+void PreviewConnectionHandler::Handler::secureOk(const string& /*response*/){}
+
+void PreviewConnectionHandler::Handler::tuneOk(uint16_t /*channelmax*/,
+ uint32_t framemax, uint16_t heartbeat)
+{
+ connection.setFrameMax(framemax);
+ connection.setHeartbeat(heartbeat);
+}
+
+void PreviewConnectionHandler::Handler::open(const string& /*virtualHost*/,
+ const string& /*capabilities*/, bool /*insist*/)
+{
+ string knownhosts;
+ client.openOk(knownhosts);
+}
+
+
+void PreviewConnectionHandler::Handler::close(uint16_t /*replyCode*/, const string& /*replyText*/,
+ uint16_t /*classId*/, uint16_t /*methodId*/)
+{
+ client.closeOk();
+ connection.getOutput().close();
+}
+
+void PreviewConnectionHandler::Handler::closeOk(){
+ connection.getOutput().close();
+}
+
+
+void PreviewConnectionHandler::Handler::start(uint8_t /*versionMajor*/,
+ uint8_t /*versionMinor*/,
+ const FieldTable& /*serverProperties*/,
+ const string& /*mechanisms*/,
+ const string& /*locales*/)
+{
+ string uid = "qpidd";
+ string pwd = "qpidd";
+ string response = ((char)0) + uid + ((char)0) + pwd;
+ server.startOk(FieldTable(), PLAIN, response, en_US);
+ connection.initMgmt(true);
+}
+
+void PreviewConnectionHandler::Handler::secure(const string& /*challenge*/)
+{
+ server.secureOk("");
+}
+
+void PreviewConnectionHandler::Handler::tune(uint16_t channelMax,
+ uint32_t frameMax,
+ uint16_t heartbeat)
+{
+ connection.setFrameMax(frameMax);
+ connection.setHeartbeat(heartbeat);
+ server.tuneOk(channelMax, frameMax, heartbeat);
+ server.open("/", "", true);
+}
+
+void PreviewConnectionHandler::Handler::openOk(const string& /*knownHosts*/)
+{
+}
+
+void PreviewConnectionHandler::Handler::redirect(const string& /*host*/, const string& /*knownHosts*/)
+{
+
+}
diff --git a/cpp/src/qpid/broker/PreviewConnectionHandler.h b/cpp/src/qpid/broker/PreviewConnectionHandler.h
new file mode 100644
index 0000000000..93901dd492
--- /dev/null
+++ b/cpp/src/qpid/broker/PreviewConnectionHandler.h
@@ -0,0 +1,92 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _PreviewConnectionAdapter_
+#define _PreviewConnectionAdapter_
+
+#include <memory>
+#include "qpid/framing/amqp_types.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/AMQP_ClientOperations.h"
+#include "qpid/framing/AMQP_ClientProxy.h"
+#include "qpid/framing/AMQP_ServerOperations.h"
+#include "qpid/framing/AMQP_ServerProxy.h"
+#include "qpid/framing/FrameHandler.h"
+#include "qpid/framing/ProtocolInitiation.h"
+#include "qpid/framing/ProtocolVersion.h"
+#include "qpid/Exception.h"
+
+namespace qpid {
+namespace broker {
+
+class PreviewConnection;
+
+// TODO aconway 2007-09-18: Rename to ConnectionHandler
+class PreviewConnectionHandler : public framing::FrameHandler
+{
+ struct Handler : public framing::AMQP_ServerOperations::ConnectionHandler,
+ public framing::AMQP_ClientOperations::ConnectionHandler
+ {
+ framing::AMQP_ClientProxy::Connection client;
+ framing::AMQP_ServerProxy::Connection server;
+ PreviewConnection& connection;
+ bool serverMode;
+
+ Handler(PreviewConnection& connection);
+ void startOk(const qpid::framing::FieldTable& clientProperties,
+ const std::string& mechanism, const std::string& response,
+ const std::string& locale);
+ void secureOk(const std::string& response);
+ void tuneOk(uint16_t channelMax, uint32_t frameMax, uint16_t heartbeat);
+ void open(const std::string& virtualHost,
+ const std::string& capabilities, bool insist);
+ void close(uint16_t replyCode, const std::string& replyText,
+ uint16_t classId, uint16_t methodId);
+ void closeOk();
+
+
+ void start(uint8_t versionMajor,
+ uint8_t versionMinor,
+ const qpid::framing::FieldTable& serverProperties,
+ const std::string& mechanisms,
+ const std::string& locales);
+
+ void secure(const std::string& challenge);
+
+ void tune(uint16_t channelMax,
+ uint32_t frameMax,
+ uint16_t heartbeat);
+
+ void openOk(const std::string& knownHosts);
+
+ void redirect(const std::string& host, const std::string& knownHosts);
+ };
+ std::auto_ptr<Handler> handler;
+ public:
+ PreviewConnectionHandler(PreviewConnection& connection);
+ void init(const framing::ProtocolInitiation& header);
+ void close(framing::ReplyCode code, const std::string& text, framing::ClassId classId, framing::MethodId methodId);
+ void handle(framing::AMQFrame& frame);
+};
+
+
+}}
+
+#endif
diff --git a/cpp/src/qpid/broker/PreviewSessionHandler.cpp b/cpp/src/qpid/broker/PreviewSessionHandler.cpp
new file mode 100644
index 0000000000..19e6a235c4
--- /dev/null
+++ b/cpp/src/qpid/broker/PreviewSessionHandler.cpp
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "PreviewSessionHandler.h"
+#include "SessionState.h"
+#include "PreviewConnection.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/constants.h"
+#include "qpid/framing/ClientInvoker.h"
+#include "qpid/framing/ServerInvoker.h"
+#include "qpid/log/Statement.h"
+
+#include <boost/bind.hpp>
+
+namespace qpid {
+namespace broker {
+using namespace framing;
+using namespace std;
+using namespace qpid::sys;
+
+PreviewSessionHandler::PreviewSessionHandler(PreviewConnection& c, ChannelId ch)
+ : SessionContext(c.getOutput()),
+ connection(c), channel(ch, &c.getOutput()),
+ proxy(out), // Via my own handleOut() for L2 data.
+ peerSession(channel), // Direct to channel for L2 commands.
+ ignoring(false) {}
+
+PreviewSessionHandler::~PreviewSessionHandler() {}
+
+namespace {
+ClassId classId(AMQMethodBody* m) { return m ? m->amqpMethodId() : 0; }
+MethodId methodId(AMQMethodBody* m) { return m ? m->amqpClassId() : 0; }
+} // namespace
+
+void PreviewSessionHandler::handleIn(AMQFrame& f) {
+ // Note on channel states: a channel is open if session != 0. A
+ // channel that is closed (session == 0) can be in the "ignoring"
+ // state. This is a temporary state after we have sent a channel
+ // exception, where extra frames might arrive that should be
+ // ignored.
+ //
+ AMQMethodBody* m = f.getBody()->getMethod();
+ try {
+ if (m && invoke(static_cast<AMQP_ServerOperations::SessionHandler&>(*this), *m)) {
+ return;
+ } else if (session.get()) {
+ boost::optional<SequenceNumber> ack=session->received(f);
+ session->in.handle(f);
+ if (ack)
+ peerSession.ack(*ack, SequenceNumberSet());
+ } else if (m && invoke(static_cast<AMQP_ClientOperations::SessionHandler&>(*this), *m)) {
+ return;
+ } else if (!ignoring) {
+ throw ChannelErrorException(
+ QPID_MSG("Channel " << channel.get() << " is not open"));
+ }
+ } catch(const ChannelException& e) {
+ ignoring=true; // Ignore trailing frames sent by client.
+ session->detach();
+ session.reset();
+ peerSession.closed(e.code, e.what());
+ }catch(const ConnectionException& e){
+ connection.close(e.code, e.what(), classId(m), methodId(m));
+ }catch(const std::exception& e){
+ connection.close(
+ framing::INTERNAL_ERROR, e.what(), classId(m), methodId(m));
+ }
+}
+
+void PreviewSessionHandler::handleOut(AMQFrame& f) {
+ channel.handle(f); // Send it.
+ if (session->sent(f))
+ peerSession.solicitAck();
+}
+
+void PreviewSessionHandler::assertAttached(const char* method) const {
+ if (!session.get())
+ throw ChannelErrorException(
+ QPID_MSG(method << " failed: No session for channel "
+ << getChannel()));
+}
+
+void PreviewSessionHandler::assertClosed(const char* method) const {
+ if (session.get())
+ throw ChannelBusyException(
+ QPID_MSG(method << " failed: channel " << channel.get()
+ << " is already open."));
+}
+
+void PreviewSessionHandler::open(uint32_t detachedLifetime) {
+ assertClosed("open");
+ std::auto_ptr<SessionState> state(
+ connection.broker.getSessionManager().open(*this, detachedLifetime));
+ session.reset(state.release());
+ peerSession.attached(session->getId(), session->getTimeout());
+}
+
+void PreviewSessionHandler::resume(const Uuid& id) {
+ assertClosed("resume");
+ session = connection.broker.getSessionManager().resume(id);
+ session->attach(*this);
+ SequenceNumber seq = session->resuming();
+ peerSession.attached(session->getId(), session->getTimeout());
+ proxy.getSession().ack(seq, SequenceNumberSet());
+}
+
+void PreviewSessionHandler::flow(bool /*active*/) {
+ assertAttached("flow");
+ // TODO aconway 2007-09-19: Removed in 0-10, remove
+ assert(0); throw NotImplementedException("session.flow");
+}
+
+void PreviewSessionHandler::flowOk(bool /*active*/) {
+ assertAttached("flowOk");
+ // TODO aconway 2007-09-19: Removed in 0-10, remove
+ assert(0); throw NotImplementedException("session.flowOk");
+}
+
+void PreviewSessionHandler::close() {
+ assertAttached("close");
+ QPID_LOG(info, "Received session.close");
+ ignoring=false;
+ session->detach();
+ session.reset();
+ peerSession.closed(REPLY_SUCCESS, "ok");
+ assert(&connection.getChannel(channel.get()) == this);
+ connection.closeChannel(channel.get());
+}
+
+void PreviewSessionHandler::closed(uint16_t replyCode, const string& replyText) {
+ QPID_LOG(warning, "Received session.closed: "<<replyCode<<" "<<replyText);
+ ignoring=false;
+ session->detach();
+ session.reset();
+}
+
+void PreviewSessionHandler::localSuspend() {
+ if (session.get() && session->isAttached()) {
+ session->detach();
+ connection.broker.getSessionManager().suspend(session);
+ session.reset();
+ }
+}
+
+void PreviewSessionHandler::suspend() {
+ assertAttached("suspend");
+ localSuspend();
+ peerSession.detached();
+ assert(&connection.getChannel(channel.get()) == this);
+ connection.closeChannel(channel.get());
+}
+
+void PreviewSessionHandler::ack(uint32_t cumulativeSeenMark,
+ const SequenceNumberSet& /*seenFrameSet*/)
+{
+ assertAttached("ack");
+ if (session->getState() == SessionState::RESUMING) {
+ session->receivedAck(cumulativeSeenMark);
+ framing::SessionState::Replay replay=session->replay();
+ std::for_each(replay.begin(), replay.end(),
+ boost::bind(&PreviewSessionHandler::handleOut, this, _1));
+ }
+ else
+ session->receivedAck(cumulativeSeenMark);
+}
+
+void PreviewSessionHandler::highWaterMark(uint32_t /*lastSentMark*/) {
+ // TODO aconway 2007-10-02: may be removed from spec.
+ assert(0); throw NotImplementedException("session.high-water-mark");
+}
+
+void PreviewSessionHandler::solicitAck() {
+ assertAttached("solicit-ack");
+ peerSession.ack(session->sendingAck(), SequenceNumberSet());
+}
+
+void PreviewSessionHandler::attached(const Uuid& /*sessionId*/, uint32_t detachedLifetime)
+{
+ std::auto_ptr<SessionState> state(
+ connection.broker.getSessionManager().open(*this, detachedLifetime));
+ session.reset(state.release());
+}
+
+void PreviewSessionHandler::detached()
+{
+ connection.broker.getSessionManager().suspend(session);
+ session.reset();
+}
+
+ConnectionState& PreviewSessionHandler::getConnection() { return connection; }
+const ConnectionState& PreviewSessionHandler::getConnection() const { return connection; }
+
+}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/PreviewSessionHandler.h b/cpp/src/qpid/broker/PreviewSessionHandler.h
new file mode 100644
index 0000000000..e1096ebf9f
--- /dev/null
+++ b/cpp/src/qpid/broker/PreviewSessionHandler.h
@@ -0,0 +1,111 @@
+#ifndef QPID_BROKER_PREVIEWSESSIONHANDLER_H
+#define QPID_BROKER_PREVIEWSESSIONHANDLER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/framing/FrameHandler.h"
+#include "qpid/framing/AMQP_ClientOperations.h"
+#include "qpid/framing/AMQP_ServerOperations.h"
+#include "qpid/framing/AMQP_ClientProxy.h"
+#include "qpid/framing/amqp_types.h"
+#include "qpid/framing/ChannelHandler.h"
+#include "SessionContext.h"
+
+#include <boost/noncopyable.hpp>
+
+namespace qpid {
+namespace broker {
+
+class PreviewConnection;
+class SessionState;
+
+/**
+ * A SessionHandler is associated with each active channel. It
+ * receives incoming frames, handles session commands and manages the
+ * association between the channel and a session.
+ */
+class PreviewSessionHandler : public framing::AMQP_ServerOperations::SessionHandler,
+ public framing::AMQP_ClientOperations::SessionHandler,
+ public SessionContext,
+ private boost::noncopyable
+{
+ public:
+ PreviewSessionHandler(PreviewConnection&, framing::ChannelId);
+ ~PreviewSessionHandler();
+
+ /** Returns 0 if not attached to a session */
+ SessionState* getSession() { return session.get(); }
+ const SessionState* getSession() const { return session.get(); }
+
+ framing::ChannelId getChannel() const { return channel.get(); }
+
+ ConnectionState& getConnection();
+ const ConnectionState& getConnection() const;
+
+ framing::AMQP_ClientProxy& getProxy() { return proxy; }
+ const framing::AMQP_ClientProxy& getProxy() const { return proxy; }
+
+ // Called by closing connection.
+ void localSuspend();
+ void detach() { localSuspend(); }
+
+ protected:
+ void handleIn(framing::AMQFrame&);
+ void handleOut(framing::AMQFrame&);
+
+ private:
+ /// Session methods
+ void open(uint32_t detachedLifetime);
+ void flow(bool active);
+ void flowOk(bool active);
+ void close();
+ void closed(uint16_t replyCode, const std::string& replyText);
+ void resume(const framing::Uuid& sessionId);
+ void suspend();
+ void ack(uint32_t cumulativeSeenMark,
+ const framing::SequenceNumberSet& seenFrameSet);
+ void highWaterMark(uint32_t lastSentMark);
+ void solicitAck();
+
+ //extra methods required for assuming client role
+ void attached(const framing::Uuid& sessionId, uint32_t detachedLifetime);
+ void detached();
+
+
+ void assertAttached(const char* method) const;
+ void assertActive(const char* method) const;
+ void assertClosed(const char* method) const;
+
+
+ PreviewConnection& connection;
+ framing::ChannelHandler channel;
+ framing::AMQP_ClientProxy proxy;
+ framing::AMQP_ClientProxy::Session peerSession;
+ bool ignoring;
+ std::auto_ptr<SessionState> session;
+};
+
+}} // namespace qpid::broker
+
+
+
+#endif /*!QPID_BROKER_SESSIONHANDLER_H*/
diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp
index 32c032e701..2a79496144 100644
--- a/cpp/src/qpid/broker/SemanticHandler.cpp
+++ b/cpp/src/qpid/broker/SemanticHandler.cpp
@@ -21,11 +21,10 @@
#include "SemanticHandler.h"
#include "SemanticState.h"
-#include "SessionHandler.h"
+#include "SessionContext.h"
#include "SessionState.h"
#include "BrokerAdapter.h"
#include "MessageDelivery.h"
-#include "Connection.h"
#include "qpid/framing/ExecutionCompleteBody.h"
#include "qpid/framing/ExecutionResultBody.h"
#include "qpid/framing/ServerInvoker.h"
@@ -165,7 +164,7 @@ void SemanticHandler::handleContent(AMQFrame& frame)
DeliveryId SemanticHandler::deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token)
{
- SessionHandler* handler = session.getHandler();
+ SessionContext* handler = session.getHandler();
if (handler) {
uint32_t maxFrameSize = handler->getConnection().getFrameMax();
MessageDelivery::deliver(msg, handler->out, ++outgoing.hwm, token, maxFrameSize);
diff --git a/cpp/src/qpid/broker/SemanticHandler.h b/cpp/src/qpid/broker/SemanticHandler.h
index 52dfa4dcf9..d7f3ec8799 100644
--- a/cpp/src/qpid/broker/SemanticHandler.h
+++ b/cpp/src/qpid/broker/SemanticHandler.h
@@ -77,7 +77,7 @@ class SemanticHandler : public DeliveryAdapter,
DeliveryId deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token);
framing::AMQP_ClientProxy& getProxy() { return session.getProxy(); }
- Connection& getConnection() { return session.getConnection(); }
+ //Connection& getConnection() { return session.getConnection(); }
Broker& getBroker() { return session.getBroker(); }
public:
diff --git a/cpp/src/qpid/broker/SessionContext.h b/cpp/src/qpid/broker/SessionContext.h
new file mode 100644
index 0000000000..a27b43cf65
--- /dev/null
+++ b/cpp/src/qpid/broker/SessionContext.h
@@ -0,0 +1,53 @@
+#ifndef QPID_BROKER_SESSIONCONTEXT_H
+#define QPID_BROKER_SESSIONCONTEXT_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/framing/FrameHandler.h"
+#include "qpid/framing/AMQP_ClientProxy.h"
+#include "qpid/framing/amqp_types.h"
+#include "ConnectionState.h"
+
+
+#include <boost/noncopyable.hpp>
+
+namespace qpid {
+namespace broker {
+
+class SessionContext : public framing::FrameHandler::InOutHandler
+{
+ public:
+ SessionContext(qpid::framing::OutputHandler& out) : InOutHandler(0, &out) {}
+ virtual ~SessionContext(){}
+ virtual ConnectionState& getConnection() = 0;
+ virtual const ConnectionState& getConnection() const = 0;
+ virtual framing::AMQP_ClientProxy& getProxy() = 0;
+ virtual const framing::AMQP_ClientProxy& getProxy() const = 0;
+ virtual void detach() = 0;
+ virtual framing::ChannelId getChannel() const = 0;
+};
+
+}} // namespace qpid::broker
+
+
+
+#endif /*!QPID_BROKER_SESSIONCONTEXT_H*/
diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp
index fb46cb522d..1cb10d0c19 100644
--- a/cpp/src/qpid/broker/SessionHandler.cpp
+++ b/cpp/src/qpid/broker/SessionHandler.cpp
@@ -36,7 +36,7 @@ using namespace std;
using namespace qpid::sys;
SessionHandler::SessionHandler(Connection& c, ChannelId ch)
- : InOutHandler(0, &c.getOutput()),
+ : SessionContext(c.getOutput()),
connection(c), channel(ch, &c.getOutput()),
proxy(out), // Via my own handleOut() for L2 data.
peerSession(channel), // Direct to channel for L2 commands.
@@ -204,4 +204,8 @@ void SessionHandler::detached()
session.reset();
}
+
+ConnectionState& SessionHandler::getConnection() { return connection; }
+const ConnectionState& SessionHandler::getConnection() const { return connection; }
+
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SessionHandler.h b/cpp/src/qpid/broker/SessionHandler.h
index 6f6f5e941f..5a72bfb12d 100644
--- a/cpp/src/qpid/broker/SessionHandler.h
+++ b/cpp/src/qpid/broker/SessionHandler.h
@@ -28,6 +28,7 @@
#include "qpid/framing/AMQP_ClientProxy.h"
#include "qpid/framing/amqp_types.h"
#include "qpid/framing/ChannelHandler.h"
+#include "SessionContext.h"
#include <boost/noncopyable.hpp>
@@ -42,9 +43,9 @@ class SessionState;
* receives incoming frames, handles session commands and manages the
* association between the channel and a session.
*/
-class SessionHandler : public framing::FrameHandler::InOutHandler,
- public framing::AMQP_ServerOperations::SessionHandler,
+class SessionHandler : public framing::AMQP_ServerOperations::SessionHandler,
public framing::AMQP_ClientOperations::SessionHandler,
+ public SessionContext,
private boost::noncopyable
{
public:
@@ -57,14 +58,15 @@ class SessionHandler : public framing::FrameHandler::InOutHandler,
framing::ChannelId getChannel() const { return channel.get(); }
- Connection& getConnection() { return connection; }
- const Connection& getConnection() const { return connection; }
+ ConnectionState& getConnection();
+ const ConnectionState& getConnection() const;
framing::AMQP_ClientProxy& getProxy() { return proxy; }
const framing::AMQP_ClientProxy& getProxy() const { return proxy; }
// Called by closing connection.
void localSuspend();
+ void detach() { localSuspend(); }
protected:
void handleIn(framing::AMQFrame&);
diff --git a/cpp/src/qpid/broker/SessionManager.cpp b/cpp/src/qpid/broker/SessionManager.cpp
index 571d3365db..aa7ac9a8bb 100644
--- a/cpp/src/qpid/broker/SessionManager.cpp
+++ b/cpp/src/qpid/broker/SessionManager.cpp
@@ -45,7 +45,7 @@ SessionManager::~SessionManager() {}
// FIXME aconway 2008-02-01: pass handler*, allow open unattached.
std::auto_ptr<SessionState> SessionManager::open(
- SessionHandler& h, uint32_t timeout_)
+ SessionContext& h, uint32_t timeout_)
{
Mutex::ScopedLock l(lock);
std::auto_ptr<SessionState> session(
diff --git a/cpp/src/qpid/broker/SessionManager.h b/cpp/src/qpid/broker/SessionManager.h
index bb61f5a8be..94956a83ed 100644
--- a/cpp/src/qpid/broker/SessionManager.h
+++ b/cpp/src/qpid/broker/SessionManager.h
@@ -38,7 +38,7 @@ namespace qpid {
namespace broker {
class SessionState;
-class SessionHandler;
+class SessionContext;
/**
* Create and manage SessionState objects.
@@ -57,7 +57,7 @@ class SessionManager : private boost::noncopyable {
~SessionManager();
/** Open a new active session, caller takes ownership */
- std::auto_ptr<SessionState> open(SessionHandler& h, uint32_t timeout_);
+ std::auto_ptr<SessionState> open(SessionContext& c, uint32_t timeout_);
/** Suspend a session, start it's timeout counter.
* The factory takes ownership.
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index 80fafe0386..b6c59cfb3b 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -20,8 +20,8 @@
*/
#include "SessionState.h"
#include "SessionManager.h"
-#include "SessionHandler.h"
-#include "Connection.h"
+#include "SessionContext.h"
+#include "ConnectionState.h"
#include "Broker.h"
#include "SemanticHandler.h"
#include "qpid/framing/reply_exceptions.h"
@@ -37,7 +37,7 @@ using qpid::management::Manageable;
using qpid::management::Args;
SessionState::SessionState(
- SessionManager* f, SessionHandler* h, uint32_t timeout_, uint32_t ack)
+ SessionManager* f, SessionContext* h, uint32_t timeout_, uint32_t ack)
: framing::SessionState(ack, timeout_ > 0),
factory(f), handler(h), id(true), timeout(timeout_),
broker(h->getConnection().broker),
@@ -76,7 +76,7 @@ SessionState::~SessionState() {
mgmtObject->resourceDestroy ();
}
-SessionHandler* SessionState::getHandler() {
+SessionContext* SessionState::getHandler() {
return handler;
}
@@ -85,7 +85,7 @@ AMQP_ClientProxy& SessionState::getProxy() {
return getHandler()->getProxy();
}
-Connection& SessionState::getConnection() {
+ConnectionState& SessionState::getConnection() {
assert(isAttached());
return getHandler()->getConnection();
}
@@ -100,7 +100,7 @@ void SessionState::detach() {
}
}
-void SessionState::attach(SessionHandler& h) {
+void SessionState::attach(SessionContext& h) {
{
Mutex::ScopedLock l(lock);
handler = &h;
@@ -141,7 +141,7 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId,
case management::Session::METHOD_DETACH :
if (handler != 0)
{
- handler->localSuspend ();
+ handler->detach();
}
status = Manageable::STATUS_OK;
break;
diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h
index bc1b974eaa..8a12e580b7 100644
--- a/cpp/src/qpid/broker/SessionState.h
+++ b/cpp/src/qpid/broker/SessionState.h
@@ -48,10 +48,10 @@ class AMQP_ClientProxy;
namespace broker {
class SemanticHandler;
-class SessionHandler;
+class SessionContext;
class SessionManager;
class Broker;
-class Connection;
+class ConnectionState;
/**
* Broker-side session state includes sessions handler chains, which may
@@ -67,16 +67,16 @@ class SessionState : public framing::SessionState,
bool isAttached() { return handler; }
void detach();
- void attach(SessionHandler& handler);
+ void attach(SessionContext& handler);
- SessionHandler* getHandler();
+ SessionContext* getHandler();
/** @pre isAttached() */
framing::AMQP_ClientProxy& getProxy();
/** @pre isAttached() */
- Connection& getConnection();
+ ConnectionState& getConnection();
uint32_t getTimeout() const { return timeout; }
Broker& getBroker() { return broker; }
@@ -92,14 +92,14 @@ class SessionState : public framing::SessionState,
// Normally SessionManager creates sessions.
SessionState(SessionManager*,
- SessionHandler* out,
+ SessionContext* out,
uint32_t timeout,
uint32_t ackInterval);
private:
SessionManager* factory;
- SessionHandler* handler;
+ SessionContext* handler;
framing::Uuid id;
uint32_t timeout;
sys::AbsTime expiry; // Used by SessionManager.
diff --git a/cpp/src/qpid/client/Channel.cpp b/cpp/src/qpid/client/Channel.cpp
index a6875fcb63..4af69c8552 100644
--- a/cpp/src/qpid/client/Channel.cpp
+++ b/cpp/src/qpid/client/Channel.cpp
@@ -46,9 +46,9 @@ const std::string empty;
class ScopedSync
{
- Session_0_10& session;
+ Session& session;
public:
- ScopedSync(Session_0_10& s, bool enabled = true) : session(s) { session.setSynchronous(enabled); }
+ ScopedSync(Session& s, bool enabled = true) : session(s) { session.setSynchronous(enabled); }
~ScopedSync() { session.setSynchronous(false); }
};
@@ -63,7 +63,7 @@ Channel::~Channel()
join();
}
-void Channel::open(const Session_0_10& s)
+void Channel::open(const Session& s)
{
Mutex::ScopedLock l(stopLock);
if (isOpen())
diff --git a/cpp/src/qpid/client/Channel.h b/cpp/src/qpid/client/Channel.h
index 35e306e9b5..2cda97dc63 100644
--- a/cpp/src/qpid/client/Channel.h
+++ b/cpp/src/qpid/client/Channel.h
@@ -29,7 +29,7 @@
#include "Message.h"
#include "Queue.h"
#include "ConnectionImpl.h"
-#include "qpid/client/Session_0_10.h"
+#include "qpid/client/Session.h"
#include "qpid/Exception.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/Runnable.h"
@@ -79,7 +79,7 @@ class Channel : private sys::Runnable
bool running;
ConsumerMap consumers;
- Session_0_10 session;
+ Session session;
framing::ChannelId channelId;
sys::BlockingQueue<framing::FrameSet::shared_ptr> gets;
framing::Uuid uniqueId;
@@ -88,7 +88,7 @@ class Channel : private sys::Runnable
void stop();
- void open(const Session_0_10& session);
+ void open(const Session& session);
void closeInternal();
void join();
diff --git a/cpp/src/qpid/client/Connection.cpp b/cpp/src/qpid/client/Connection.cpp
index 26113c1254..872e04b3b5 100644
--- a/cpp/src/qpid/client/Connection.cpp
+++ b/cpp/src/qpid/client/Connection.cpp
@@ -73,7 +73,7 @@ void Connection::openChannel(Channel& channel) {
channel.open(newSession(ASYNC));
}
-Session_0_10 Connection::newSession(SynchronousMode sync,
+Session Connection::newSession(SynchronousMode sync,
uint32_t detachedLifetime)
{
shared_ptr<SessionCore> core(
@@ -81,10 +81,10 @@ Session_0_10 Connection::newSession(SynchronousMode sync,
core->setSync(sync);
impl->addSession(core);
core->open(detachedLifetime);
- return Session_0_10(core);
+ return Session(core);
}
-void Connection::resume(Session_0_10& session) {
+void Connection::resume(Session& session) {
session.impl->setChannel(++channelIdCounter);
impl->addSession(session.impl);
session.impl->resume(impl);
diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h
index e6bfbddef6..81d9b972b6 100644
--- a/cpp/src/qpid/client/Connection.h
+++ b/cpp/src/qpid/client/Connection.h
@@ -25,7 +25,7 @@
#include <string>
#include "Channel.h"
#include "ConnectionImpl.h"
-#include "qpid/client/Session_0_10.h"
+#include "qpid/client/Session.h"
#include "qpid/framing/AMQP_HighestVersion.h"
#include "qpid/framing/Uuid.h"
@@ -134,13 +134,13 @@ class Connection
* that the broker may discard the session state. Default is 0,
* meaning the session cannot be resumed.
*/
- Session_0_10 newSession(SynchronousMode sync, uint32_t detachedLifetime=0);
+ Session newSession(SynchronousMode sync, uint32_t detachedLifetime=0);
/**
* Resume a suspendded session. A session may be resumed
* on a different connection to the one that created it.
*/
- void resume(Session_0_10& session);
+ void resume(Session& session);
};
}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/Dispatcher.cpp b/cpp/src/qpid/client/Dispatcher.cpp
index 8df4637c88..2484dabf1f 100644
--- a/cpp/src/qpid/client/Dispatcher.cpp
+++ b/cpp/src/qpid/client/Dispatcher.cpp
@@ -20,7 +20,6 @@
*/
#include "Dispatcher.h"
-#include "qpid/client/Session_0_10.h"
#include "qpid/framing/FrameSet.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/log/Statement.h"
@@ -38,7 +37,7 @@ using qpid::sys::Thread;
namespace qpid {
namespace client {
-Subscriber::Subscriber(Session_0_10& s, MessageListener* l, AckPolicy a) : session(s), listener(l), autoAck(a) {}
+Subscriber::Subscriber(Session& s, MessageListener* l, AckPolicy a) : session(s), listener(l), autoAck(a) {}
void Subscriber::received(Message& msg)
{
@@ -48,7 +47,7 @@ void Subscriber::received(Message& msg)
}
}
-Dispatcher::Dispatcher(Session_0_10& s, const std::string& q)
+Dispatcher::Dispatcher(Session& s, const std::string& q)
: session(s), running(false), autoStop(true)
{
queue = q.empty() ?
diff --git a/cpp/src/qpid/client/Dispatcher.h b/cpp/src/qpid/client/Dispatcher.h
index ae67e61299..e23d0c198c 100644
--- a/cpp/src/qpid/client/Dispatcher.h
+++ b/cpp/src/qpid/client/Dispatcher.h
@@ -25,6 +25,7 @@
#include <memory>
#include <string>
#include <boost/shared_ptr.hpp>
+#include "qpid/client/Session.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/Runnable.h"
#include "qpid/sys/Thread.h"
@@ -34,17 +35,15 @@
namespace qpid {
namespace client {
-class Session_0_10;
-
class Subscriber : public MessageListener
{
- Session_0_10& session;
+ Session& session;
MessageListener* const listener;
AckPolicy autoAck;
public:
typedef boost::shared_ptr<Subscriber> shared_ptr;
- Subscriber(Session_0_10& session, MessageListener* listener, AckPolicy);
+ Subscriber(Session& session, MessageListener* listener, AckPolicy);
void received(Message& msg);
};
@@ -56,7 +55,7 @@ class Dispatcher : public sys::Runnable
typedef std::map<std::string, Subscriber::shared_ptr> Listeners;
sys::Mutex lock;
sys::Thread worker;
- Session_0_10& session;
+ Session& session;
Demux::QueuePtr queue;
bool running;
bool autoStop;
@@ -68,7 +67,7 @@ class Dispatcher : public sys::Runnable
bool isStopped();
public:
- Dispatcher(Session_0_10& session, const std::string& queue = "");
+ Dispatcher(Session& session, const std::string& queue = "");
void start();
void run();
diff --git a/cpp/src/qpid/client/LocalQueue.h b/cpp/src/qpid/client/LocalQueue.h
index eba28f6599..f8b2c2e0b3 100644
--- a/cpp/src/qpid/client/LocalQueue.h
+++ b/cpp/src/qpid/client/LocalQueue.h
@@ -50,7 +50,7 @@ class LocalQueue
private:
friend class SubscriptionManager;
- Session_0_10 session;
+ Session session;
Demux::QueuePtr queue;
AckPolicy autoAck;
};
diff --git a/cpp/src/qpid/client/Message.h b/cpp/src/qpid/client/Message.h
index 86404ac792..daac30ba36 100644
--- a/cpp/src/qpid/client/Message.h
+++ b/cpp/src/qpid/client/Message.h
@@ -22,7 +22,7 @@
*
*/
#include <string>
-#include "qpid/client/Session_0_10.h"
+#include "qpid/client/Session.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/TransferContent.h"
@@ -63,18 +63,18 @@ public:
return getMessageProperties().getApplicationHeaders();
}
- void acknowledge(Session_0_10& session, bool cumulative = true, bool send = true) const
+ void acknowledge(Session& session, bool cumulative = true, bool send = true) const
{
session.getExecution().completed(id, cumulative, send);
}
void acknowledge(bool cumulative = true, bool send = true) const
{
- const_cast<Session_0_10&>(session).getExecution().completed(id, cumulative, send);
+ const_cast<Session&>(session).getExecution().completed(id, cumulative, send);
}
/**@internal for incoming messages */
- Message(const framing::FrameSet& frameset, Session_0_10 s) :
+ Message(const framing::FrameSet& frameset, Session s) :
method(*frameset.as<framing::MessageTransferBody>()), id(frameset.getId()), session(s)
{
populate(frameset);
@@ -91,12 +91,12 @@ public:
}
/**@internal use for incoming messages. */
- void setSession(Session_0_10 s) { session=s; }
+ void setSession(Session s) { session=s; }
private:
//method and id are only set for received messages:
framing::MessageTransferBody method;
framing::SequenceNumber id;
- Session_0_10 session;
+ Session session;
};
}}
diff --git a/cpp/src/qpid/client/Session.h b/cpp/src/qpid/client/Session.h
index 11105dcd36..3293af60fe 100644
--- a/cpp/src/qpid/client/Session.h
+++ b/cpp/src/qpid/client/Session.h
@@ -21,7 +21,7 @@
* under the License.
*
*/
-#include "qpid/client/Session_0_10.h"
+#include "qpid/client/Session_99_0.h"
namespace qpid {
namespace client {
@@ -31,7 +31,7 @@ namespace client {
*
* \ingroup clientapi
*/
-typedef Session_0_10 Session;
+typedef Session_99_0 Session;
}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/SubscriptionManager.cpp b/cpp/src/qpid/client/SubscriptionManager.cpp
index 7289997a69..f14344225c 100644
--- a/cpp/src/qpid/client/SubscriptionManager.cpp
+++ b/cpp/src/qpid/client/SubscriptionManager.cpp
@@ -23,7 +23,7 @@
#include "SubscriptionManager.h"
#include <qpid/client/Dispatcher.h>
-#include <qpid/client/Session_0_10.h>
+#include <qpid/client/Session.h>
#include <qpid/client/MessageListener.h>
#include <set>
#include <sstream>
@@ -32,7 +32,7 @@
namespace qpid {
namespace client {
-SubscriptionManager::SubscriptionManager(Session_0_10& s)
+SubscriptionManager::SubscriptionManager(Session& s)
: dispatcher(s), session(s),
messages(UNLIMITED), bytes(UNLIMITED), window(true),
confirmMode(true), acquireMode(false),
diff --git a/cpp/src/qpid/client/SubscriptionManager.h b/cpp/src/qpid/client/SubscriptionManager.h
index 73331450cf..1741796f4f 100644
--- a/cpp/src/qpid/client/SubscriptionManager.h
+++ b/cpp/src/qpid/client/SubscriptionManager.h
@@ -24,7 +24,7 @@
#include "qpid/sys/Mutex.h"
#include <qpid/client/Dispatcher.h>
#include <qpid/client/Completion.h>
-#include <qpid/client/Session_0_10.h>
+#include <qpid/client/Session.h>
#include <qpid/client/MessageListener.h>
#include <qpid/client/LocalQueue.h>
#include <qpid/sys/Runnable.h>
@@ -48,7 +48,7 @@ class SubscriptionManager : public sys::Runnable
Completion subscribeInternal(const std::string& q, const std::string& dest);
qpid::client::Dispatcher dispatcher;
- qpid::client::Session_0_10& session;
+ qpid::client::Session& session;
uint32_t messages;
uint32_t bytes;
bool window;
@@ -58,7 +58,7 @@ class SubscriptionManager : public sys::Runnable
bool autoStop;
public:
- SubscriptionManager(Session_0_10& session);
+ SubscriptionManager(Session& session);
/**
* Subscribe a MessagesListener to receive messages from queue.
diff --git a/cpp/src/qpid/framing/AMQP_HighestVersion.h b/cpp/src/qpid/framing/AMQP_HighestVersion.h
index 42139c7937..1be8856c13 100644
--- a/cpp/src/qpid/framing/AMQP_HighestVersion.h
+++ b/cpp/src/qpid/framing/AMQP_HighestVersion.h
@@ -32,7 +32,7 @@
namespace qpid {
namespace framing {
-static ProtocolVersion highestProtocolVersion(0, 10);
+static ProtocolVersion highestProtocolVersion(99, 0);
} /* namespace framing */
} /* namespace qpid */
diff --git a/cpp/src/tests/BrokerFixture.h b/cpp/src/tests/BrokerFixture.h
index 9fcdb57a99..fd350b77fe 100644
--- a/cpp/src/tests/BrokerFixture.h
+++ b/cpp/src/tests/BrokerFixture.h
@@ -27,7 +27,7 @@
#include "qpid/broker/Broker.h"
#include "qpid/client/Connection.h"
#include "qpid/client/ConnectionImpl.h"
-#include "qpid/client/Session_0_10.h"
+#include "qpid/client/Session.h"
#include "qpid/client/SubscriptionManager.h"
/**
@@ -86,7 +86,7 @@ struct ProxyConnection : public qpid::client::Connection {
template <class ConnectionType>
struct SessionFixtureT : BrokerFixture {
ConnectionType connection;
- qpid::client::Session_0_10 session;
+ qpid::client::Session session;
qpid::client::SubscriptionManager subs;
qpid::client::LocalQueue lq;
diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp
index c299837f86..7a997db327 100644
--- a/cpp/src/tests/ClientSessionTest.cpp
+++ b/cpp/src/tests/ClientSessionTest.cpp
@@ -24,7 +24,7 @@
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Thread.h"
#include "qpid/sys/Runnable.h"
-#include "qpid/client/Session_0_10.h"
+#include "qpid/client/Session.h"
#include "qpid/framing/TransferContent.h"
#include "qpid/framing/reply_exceptions.h"
@@ -52,7 +52,7 @@ struct DummyListener : public sys::Runnable, public MessageListener {
uint expected;
Dispatcher dispatcher;
- DummyListener(Session_0_10& session, const string& n, uint ex) :
+ DummyListener(Session& session, const string& n, uint ex) :
name(n), expected(ex), dispatcher(session) {}
void run()
diff --git a/cpp/src/tests/client_test.cpp b/cpp/src/tests/client_test.cpp
index 84fc9434de..bd2a541c92 100644
--- a/cpp/src/tests/client_test.cpp
+++ b/cpp/src/tests/client_test.cpp
@@ -31,7 +31,7 @@
#include "TestOptions.h"
#include "qpid/client/Connection.h"
#include "qpid/client/Message.h"
-#include "qpid/client/Session_0_10.h"
+#include "qpid/client/Session.h"
#include "qpid/framing/FrameSet.h"
#include "qpid/framing/MessageTransferBody.h"
@@ -92,7 +92,7 @@ int main(int argc, char** argv)
//Create and open a session on the connection through which
//most functionality is exposed:
- Session_0_10 session = connection.newSession(ASYNC);
+ Session session = connection.newSession(ASYNC);
if (opts.trace) std::cout << "Opened session." << std::endl;
diff --git a/cpp/src/tests/latencytest.cpp b/cpp/src/tests/latencytest.cpp
index 86200054d8..2b44a5477a 100644
--- a/cpp/src/tests/latencytest.cpp
+++ b/cpp/src/tests/latencytest.cpp
@@ -30,7 +30,7 @@
#include "TestOptions.h"
#include "qpid/client/Connection.h"
#include "qpid/client/Message.h"
-#include "qpid/client/Session_0_10.h"
+#include "qpid/client/Session.h"
#include "qpid/client/SubscriptionManager.h"
using namespace qpid;
@@ -98,7 +98,7 @@ class Client : public Runnable
{
protected:
Connection connection;
- Session_0_10 session;
+ Session session;
Thread thread;
string queue;
diff --git a/cpp/src/tests/perftest.cpp b/cpp/src/tests/perftest.cpp
index bc638635da..b950e432f5 100644
--- a/cpp/src/tests/perftest.cpp
+++ b/cpp/src/tests/perftest.cpp
@@ -21,7 +21,7 @@
#include "TestOptions.h"
-#include "qpid/client/Session_0_10.h"
+#include "qpid/client/Session.h"
#include "qpid/client/SubscriptionManager.h"
#include "qpid/client/Connection.h"
#include "qpid/client/Completion.h"
@@ -191,7 +191,7 @@ Opts opts;
struct Client : public Runnable {
Connection connection;
- Session_0_10 session;
+ Session session;
Thread thread;
Client() {
diff --git a/cpp/src/tests/topic_listener.cpp b/cpp/src/tests/topic_listener.cpp
index ec73f3cbe0..e5e7d24112 100644
--- a/cpp/src/tests/topic_listener.cpp
+++ b/cpp/src/tests/topic_listener.cpp
@@ -35,7 +35,7 @@
#include "TestOptions.h"
#include "qpid/client/Connection.h"
#include "qpid/client/MessageListener.h"
-#include "qpid/client/Session_0_10.h"
+#include "qpid/client/Session.h"
#include "qpid/client/SubscriptionManager.h"
#include "qpid/sys/Time.h"
#include "qpid/framing/FieldValue.h"
@@ -53,7 +53,7 @@ using namespace std;
* defined.
*/
class Listener : public MessageListener{
- Session_0_10& session;
+ Session& session;
SubscriptionManager& mgr;
const string responseQueue;
const bool transactional;
@@ -64,7 +64,7 @@ class Listener : public MessageListener{
void shutdown();
void report();
public:
- Listener(Session_0_10& session, SubscriptionManager& mgr, const string& reponseQueue, bool tx);
+ Listener(Session& session, SubscriptionManager& mgr, const string& reponseQueue, bool tx);
virtual void received(Message& msg);
};
@@ -101,7 +101,7 @@ int main(int argc, char** argv){
else {
Connection connection(args.trace);
args.open(connection);
- Session_0_10 session = connection.newSession(ASYNC);
+ Session session = connection.newSession(ASYNC);
if (args.transactional) {
session.txSelect();
}
@@ -144,7 +144,7 @@ int main(int argc, char** argv){
return 1;
}
-Listener::Listener(Session_0_10& s, SubscriptionManager& m, const string& _responseq, bool tx) :
+Listener::Listener(Session& s, SubscriptionManager& m, const string& _responseq, bool tx) :
session(s), mgr(m), responseQueue(_responseq), transactional(tx), init(false), count(0){}
void Listener::received(Message& message){
diff --git a/cpp/src/tests/topic_publisher.cpp b/cpp/src/tests/topic_publisher.cpp
index 24a4fc6752..2271849c35 100644
--- a/cpp/src/tests/topic_publisher.cpp
+++ b/cpp/src/tests/topic_publisher.cpp
@@ -37,7 +37,7 @@
#include "TestOptions.h"
#include "qpid/client/Connection.h"
#include "qpid/client/MessageListener.h"
-#include "qpid/client/Session_0_10.h"
+#include "qpid/client/Session.h"
#include "qpid/client/SubscriptionManager.h"
#include "qpid/sys/Monitor.h"
#include <unistd.h>
@@ -56,7 +56,7 @@ using namespace std;
* back by the subscribers.
*/
class Publisher {
- Session_0_10& session;
+ Session& session;
SubscriptionManager mgr;
LocalQueue queue;
const string controlTopic;
@@ -66,7 +66,7 @@ class Publisher {
string generateData(int size);
public:
- Publisher(Session_0_10& session, const string& controlTopic, bool tx, bool durable);
+ Publisher(Session& session, const string& controlTopic, bool tx, bool durable);
int64_t publish(int msgs, int listeners, int size);
void terminate();
};
@@ -107,7 +107,7 @@ int main(int argc, char** argv) {
else {
Connection connection(args.trace);
args.open(connection);
- Session_0_10 session = connection.newSession(ASYNC);
+ Session session = connection.newSession(ASYNC);
if (args.transactional) {
session.txSelect();
}
@@ -150,7 +150,7 @@ int main(int argc, char** argv) {
return 1;
}
-Publisher::Publisher(Session_0_10& _session, const string& _controlTopic, bool tx, bool d) :
+Publisher::Publisher(Session& _session, const string& _controlTopic, bool tx, bool d) :
session(_session), mgr(session), controlTopic(_controlTopic), transactional(tx), durable(d)
{
mgr.subscribe(queue, "response");
diff --git a/cpp/src/tests/txtest.cpp b/cpp/src/tests/txtest.cpp
index f7776dee8d..4c5814986c 100644
--- a/cpp/src/tests/txtest.cpp
+++ b/cpp/src/tests/txtest.cpp
@@ -28,7 +28,7 @@
#include "TestOptions.h"
#include "qpid/client/Connection.h"
#include "qpid/client/Message.h"
-#include "qpid/client/Session_0_10.h"
+#include "qpid/client/Session.h"
#include "qpid/client/SubscriptionManager.h"
using namespace qpid;
@@ -96,7 +96,7 @@ Args opts;
struct Client
{
Connection connection;
- Session_0_10 session;
+ Session session;
Client()
{
diff --git a/python/qpid/connection.py b/python/qpid/connection.py
index ecbce295b7..eafad7067a 100644
--- a/python/qpid/connection.py
+++ b/python/qpid/connection.py
@@ -178,6 +178,12 @@ class Connection:
raise "frame error: expected %r, got %r" % (self.FRAME_END, garbage)
return frame
+ def write_99_0(self, frame):
+ self.write_0_10(frame)
+
+ def read_99_0(self):
+ return self.read_0_10()
+
class Frame:
DECODERS = {}
@@ -233,7 +239,7 @@ class Method(Frame):
def encode(self, c):
version = (c.spec.major, c.spec.minor)
- if version == (0, 10):
+ if version == (0, 10) or version == (99, 0):
c.encode_octet(self.method.klass.id)
c.encode_octet(self.method.id)
else:
@@ -244,7 +250,7 @@ class Method(Frame):
def decode(spec, c, size):
version = (c.spec.major, c.spec.minor)
- if version == (0, 10):
+ if version == (0, 10) or version == (99, 0):
klass = spec.classes.byid[c.decode_octet()]
meth = klass.methods.byid[c.decode_octet()]
else:
@@ -315,7 +321,7 @@ class Response(Frame):
return "[%s] Response(%s,%s,%s) %s" % (self.channel, self.id, self.request_id, self.batch_offset, self.method)
def uses_struct_encoding(spec):
- return (spec.major == 0 and spec.minor == 10)
+ return (spec.major == 0 and spec.minor == 10) or (spec.major == 99 and spec.minor == 0)
class Header(Frame):
diff --git a/python/qpid/peer.py b/python/qpid/peer.py
index c7f56b49b2..a464e95593 100644
--- a/python/qpid/peer.py
+++ b/python/qpid/peer.py
@@ -198,7 +198,7 @@ class Channel:
self.invoker = self.invoke_reliable
else:
self.invoker = self.invoke_method
- self.use_execution_layer = (spec.major == 0 and spec.minor == 10)
+ self.use_execution_layer = (spec.major == 0 and spec.minor == 10) or (spec.major == 99 and spec.minor == 0)
self.synchronous = True
def closed(self, reason):
diff --git a/python/qpid/testlib.py b/python/qpid/testlib.py
index c4f55be18a..5174fe10f4 100644
--- a/python/qpid/testlib.py
+++ b/python/qpid/testlib.py
@@ -141,7 +141,7 @@ Options:
self.tests=findmodules("tests")
if self.use08spec():
self.tests+=findmodules("tests_0-8")
- elif self.spec.major == 0 and self.spec.minor == 10:
+ elif (self.spec.major == 0 and self.spec.minor == 10) or (self.spec.major == 99 and self.spec.minor == 0):
self.tests+=findmodules("tests_0-10")
else:
self.tests+=findmodules("tests_0-9")
diff --git a/specs/amqp.0-10-preview.xml b/specs/amqp.0-10-preview.xml
index 6ba6bfc5ed..5af956e75d 100644
--- a/specs/amqp.0-10-preview.xml
+++ b/specs/amqp.0-10-preview.xml
@@ -137,7 +137,7 @@
-->
<amqp xmlns="http://www.amqp.org/schema/amqp.xsd"
- major="0" minor="10" port="5672" comment="AMQ Protocol (Working version)">
+ major="99" minor="0" port="5672" comment="AMQ Protocol (Working version)">
<!--
======================================================