summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/Makefile.am7
-rw-r--r--cpp/src/qpid/SessionId.h12
-rw-r--r--cpp/src/qpid/client/AckPolicy.h5
-rw-r--r--cpp/src/qpid/client/AsyncSession.h38
-rw-r--r--cpp/src/qpid/client/Connection.cpp19
-rw-r--r--cpp/src/qpid/client/Connection.h12
-rw-r--r--cpp/src/qpid/client/Dispatcher.cpp7
-rw-r--r--cpp/src/qpid/client/Dispatcher.h8
-rw-r--r--cpp/src/qpid/client/Session.h1
-rw-r--r--cpp/src/qpid/client/SessionBase.cpp84
-rw-r--r--cpp/src/qpid/client/SessionBase.h148
-rw-r--r--cpp/src/qpid/client/SessionBase_0_10.cpp64
-rw-r--r--cpp/src/qpid/client/SessionBase_0_10.h104
-rw-r--r--cpp/src/qpid/client/SessionBase_0_10Access.h42
-rw-r--r--cpp/src/qpid/client/SessionImpl.cpp42
-rw-r--r--cpp/src/qpid/client/SessionImpl.h15
-rw-r--r--cpp/src/qpid/client/SubscriptionManager.cpp22
-rw-r--r--cpp/src/qpid/client/SubscriptionManager.h10
-rw-r--r--cpp/src/tests/BrokerFixture.h2
-rw-r--r--cpp/src/tests/ClientSessionTest.cpp84
-rw-r--r--cpp/src/tests/XmlClientSessionTest.cpp2
-rw-r--r--cpp/src/tests/client_test.cpp2
-rw-r--r--cpp/src/tests/consume.cpp2
-rw-r--r--cpp/src/tests/exception_test.cpp1
-rw-r--r--cpp/src/tests/latencytest.cpp10
-rw-r--r--cpp/src/tests/perftest.cpp8
-rw-r--r--cpp/src/tests/publish.cpp6
-rw-r--r--cpp/src/tests/topic_listener.cpp11
-rw-r--r--cpp/src/tests/topic_publisher.cpp10
-rwxr-xr-xcpp/src/tests/topictest2
-rw-r--r--cpp/src/tests/txtest.cpp6
31 files changed, 396 insertions, 390 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 1c8ca9da12..0bd04ec6cb 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -324,7 +324,9 @@ libqpidclient_la_SOURCES = \
qpid/client/MessageListener.cpp \
qpid/client/Queue.cpp \
qpid/client/Results.cpp \
- qpid/client/SessionBase.cpp \
+ qpid/client/SessionBase_0_10.cpp \
+ qpid/client/SessionBase_0_10.h \
+ qpid/client/SessionBase_0_10Access.h \
qpid/client/SessionImpl.cpp \
qpid/client/StateManager.cpp \
qpid/client/SubscriptionManager.cpp
@@ -446,8 +448,9 @@ nobase_include_HEADERS = \
qpid/client/MessageQueue.h \
qpid/client/Queue.h \
qpid/client/Results.h \
- qpid/client/SessionBase.h \
+ qpid/client/SessionBase_0_10.h \
qpid/client/Session.h \
+ qpid/client/AsyncSession.h \
qpid/client/SessionImpl.h \
qpid/client/StateManager.h \
qpid/client/SubscriptionManager.h \
diff --git a/cpp/src/qpid/SessionId.h b/cpp/src/qpid/SessionId.h
index 08553e8b1d..291c42a2bb 100644
--- a/cpp/src/qpid/SessionId.h
+++ b/cpp/src/qpid/SessionId.h
@@ -27,7 +27,17 @@
namespace qpid {
-/** Identifier for a session */
+/** Identifier for a session.
+ * There are two parts to a session identifier:
+ *
+ * getUserId() returns the authentication principal associated with
+ * the session's connection.
+ *
+ * getName() returns the session name.
+ *
+ * The name must be unique among sessions with the same authentication
+ * principal.
+ */
class SessionId : boost::totally_ordered1<SessionId> {
std::string userId;
std::string name;
diff --git a/cpp/src/qpid/client/AckPolicy.h b/cpp/src/qpid/client/AckPolicy.h
index 8d62b6f4f2..d00df1ef26 100644
--- a/cpp/src/qpid/client/AckPolicy.h
+++ b/cpp/src/qpid/client/AckPolicy.h
@@ -22,6 +22,7 @@
*/
#include "qpid/framing/SequenceSet.h"
+#include "qpid/client/AsyncSession.h"
namespace qpid {
namespace client {
@@ -44,7 +45,7 @@ class AckPolicy
*/
AckPolicy(size_t n=1) : interval(n), count(n) {}
- void ack(const Message& msg, Session& session) {
+ void ack(const Message& msg, AsyncSession session) {
accepted.add(msg.getId());
if (!interval) return;
if (--count==0) {
@@ -57,7 +58,7 @@ class AckPolicy
}
}
- void ackOutstanding(Session& session) {
+ void ackOutstanding(AsyncSession session) {
if (!accepted.empty()) {
session.messageAccept(accepted);
accepted.clear();
diff --git a/cpp/src/qpid/client/AsyncSession.h b/cpp/src/qpid/client/AsyncSession.h
new file mode 100644
index 0000000000..150aabe191
--- /dev/null
+++ b/cpp/src/qpid/client/AsyncSession.h
@@ -0,0 +1,38 @@
+#ifndef QPID_CLIENT_ASYNCSESSION_H
+#define QPID_CLIENT_ASYNCSESSION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/client/AsyncSession_0_10.h"
+
+namespace qpid {
+namespace client {
+
+/**
+ * AsyncSession is an alias for Session_0_10
+ *
+ * \ingroup clientapi
+ */
+typedef AsyncSession_0_10 AsyncSession;
+
+}} // namespace qpid::client
+
+#endif /*!QPID_CLIENT_ASYNCSESSION_H*/
diff --git a/cpp/src/qpid/client/Connection.cpp b/cpp/src/qpid/client/Connection.cpp
index 82d1eac8b4..bec2b0345d 100644
--- a/cpp/src/qpid/client/Connection.cpp
+++ b/cpp/src/qpid/client/Connection.cpp
@@ -22,6 +22,7 @@
#include "ConnectionSettings.h"
#include "Message.h"
#include "SessionImpl.h"
+#include "SessionBase_0_10Access.h"
#include "qpid/log/Logger.h"
#include "qpid/log/Options.h"
#include "qpid/log/Statement.h"
@@ -72,18 +73,16 @@ void Connection::open(const ConnectionSettings& settings)
max_frame_size = impl->getNegotiatedSettings().maxFrameSize;
}
-Session Connection::newSession(SynchronousMode sync,
- uint32_t detachedLifetime)
-{
+Session Connection::newSession(const std::string& name) {
if (!impl)
throw Exception(QPID_MSG("Connection has not yet been opened"));
-
- shared_ptr<SessionImpl> core(
- new SessionImpl(impl, ++channelIdCounter, max_frame_size));
- core->setSync(sync);
- impl->addSession(core);
- core->open(detachedLifetime);
- return Session(core);
+ shared_ptr<SessionImpl> simpl(
+ new SessionImpl(name, impl, ++channelIdCounter, max_frame_size));
+ impl->addSession(simpl);
+ simpl->open(0);
+ Session s;
+ SessionBase_0_10Access(s).set(simpl);
+ return s;
}
void Connection::resume(Session& session) {
diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h
index 0c01c77509..5337a20bfa 100644
--- a/cpp/src/qpid/client/Connection.h
+++ b/cpp/src/qpid/client/Connection.h
@@ -26,7 +26,6 @@
#include "ConnectionImpl.h"
#include "qpid/client/Session.h"
#include "qpid/framing/AMQP_HighestVersion.h"
-#include "qpid/framing/Uuid.h"
namespace qpid {
@@ -111,14 +110,11 @@ class Connection
* multiple streams of work to be multiplexed over the same
* connection.
*
- *@param detachedLifetime: A session may be detached from its
- * channel, either by calling Session::suspend() or because of a
- * network failure. The session state is preserved for
- * detachedLifetime seconds to allow a call to resume(). After
- * that the broker may discard the session state. Default is 0,
- * meaning the session cannot be resumed.
+ *@param name: A name to identify the session. @see qpid::SessionId
+ * If the name is empty (the default) then a unique name will be
+ * chosen using a Universally-unique identifier (UUID) algorithm.
*/
- Session newSession(SynchronousMode sync, uint32_t detachedLifetime=0);
+ Session newSession(const std::string& name=std::string());
/**
* Resume a suspended session. A session may be resumed
diff --git a/cpp/src/qpid/client/Dispatcher.cpp b/cpp/src/qpid/client/Dispatcher.cpp
index 2bbe5a122f..0bd0cb9d08 100644
--- a/cpp/src/qpid/client/Dispatcher.cpp
+++ b/cpp/src/qpid/client/Dispatcher.cpp
@@ -37,7 +37,8 @@ using qpid::sys::Thread;
namespace qpid {
namespace client {
-Subscriber::Subscriber(Session& s, MessageListener* l, AckPolicy a) : session(s), listener(l), autoAck(a) {}
+Subscriber::Subscriber(const Session& s, MessageListener* l, AckPolicy a)
+ : session(s), listener(l), autoAck(a) {}
void Subscriber::received(Message& msg)
{
@@ -47,7 +48,7 @@ void Subscriber::received(Message& msg)
}
}
-Dispatcher::Dispatcher(Session& s, const std::string& q)
+Dispatcher::Dispatcher(const Session& s, const std::string& q)
: session(s), running(false), autoStop(true)
{
queue = q.empty() ?
@@ -88,7 +89,7 @@ void Dispatcher::run()
}
}
}
- session.sync(); // Make sure all our acks are received before returning.
+ sync(session).sync(); // Make sure all our acks are received before returning.
}
catch (const ClosedException&) {} //ignore it and return
catch (const std::exception& e) {
diff --git a/cpp/src/qpid/client/Dispatcher.h b/cpp/src/qpid/client/Dispatcher.h
index e23d0c198c..1b31ddf4cf 100644
--- a/cpp/src/qpid/client/Dispatcher.h
+++ b/cpp/src/qpid/client/Dispatcher.h
@@ -37,13 +37,13 @@ namespace client {
class Subscriber : public MessageListener
{
- Session& session;
+ AsyncSession session;
MessageListener* const listener;
AckPolicy autoAck;
public:
typedef boost::shared_ptr<Subscriber> shared_ptr;
- Subscriber(Session& session, MessageListener* listener, AckPolicy);
+ Subscriber(const Session& session, MessageListener* listener, AckPolicy);
void received(Message& msg);
};
@@ -55,7 +55,7 @@ class Dispatcher : public sys::Runnable
typedef std::map<std::string, Subscriber::shared_ptr> Listeners;
sys::Mutex lock;
sys::Thread worker;
- Session& session;
+ Session session;
Demux::QueuePtr queue;
bool running;
bool autoStop;
@@ -67,7 +67,7 @@ class Dispatcher : public sys::Runnable
bool isStopped();
public:
- Dispatcher(Session& session, const std::string& queue = "");
+ Dispatcher(const Session& session, const std::string& queue = "");
void start();
void run();
diff --git a/cpp/src/qpid/client/Session.h b/cpp/src/qpid/client/Session.h
index 84831ec442..bdabd26c82 100644
--- a/cpp/src/qpid/client/Session.h
+++ b/cpp/src/qpid/client/Session.h
@@ -33,6 +33,7 @@ namespace client {
*/
typedef Session_0_10 Session;
+
}} // namespace qpid::client
#endif /*!QPID_CLIENT_SESSION_H*/
diff --git a/cpp/src/qpid/client/SessionBase.cpp b/cpp/src/qpid/client/SessionBase.cpp
deleted file mode 100644
index dfd0f62e7e..0000000000
--- a/cpp/src/qpid/client/SessionBase.cpp
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "SessionBase.h"
-#include "qpid/framing/all_method_bodies.h"
-
-namespace qpid {
-namespace client {
-using namespace framing;
-
-SessionBase::SessionBase() {}
-SessionBase::~SessionBase() {}
-SessionBase::SessionBase(shared_ptr<SessionImpl> core) : impl(core) {}
-void SessionBase::suspend() { impl->suspend(); }
-void SessionBase::close() { impl->close(); }
-
-void SessionBase::setSynchronous(bool isSync) { impl->setSync(isSync); }
-void SessionBase::setSynchronous(SynchronousMode m) { impl->setSync(m); }
-bool SessionBase::isSynchronous() const { return impl->isSync(); }
-SynchronousMode SessionBase::getSynchronous() const {
- return SynchronousMode(impl->isSync());
-}
-
-Execution& SessionBase::getExecution()
-{
- return *impl;
-}
-
-void SessionBase::flush()
-{
- impl->sendFlush();
-}
-
-// FIXME aconway 2008-04-24: do we need to provide a non-synchronous version
-// of sync() or bool paramter to allow setting a sync point for a later wait?
-void SessionBase::sync()
-{
- ExecutionSyncBody b;
- b.setSync(true);
- impl->send(b).wait(*impl);
-}
-
-void SessionBase::markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer)
-{
- impl->markCompleted(id, cumulative, notifyPeer);
-}
-
-void SessionBase::sendCompletion()
-{
- impl->sendCompletion();
-}
-
-Uuid SessionBase::getId() const { return impl->getId(); }
-framing::FrameSet::shared_ptr SessionBase::get() { return impl->get(); }
-
-SessionBase::ScopedSync::ScopedSync(SessionBase& s) : session(s), change(!s.isSynchronous())
-{
- if (change) session.setSynchronous(true);
-}
-
-SessionBase::ScopedSync::~ScopedSync()
-{
- if (change) session.setSynchronous(false);
-}
-
-
-}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/SessionBase.h b/cpp/src/qpid/client/SessionBase.h
deleted file mode 100644
index 7f4a27dc09..0000000000
--- a/cpp/src/qpid/client/SessionBase.h
+++ /dev/null
@@ -1,148 +0,0 @@
-#ifndef QPID_CLIENT_SESSIONBASE_H
-#define QPID_CLIENT_SESSIONBASE_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/framing/Uuid.h"
-#include "qpid/framing/amqp_structs.h"
-#include "qpid/framing/ProtocolVersion.h"
-#include "qpid/framing/MethodContent.h"
-#include "qpid/framing/TransferContent.h"
-#include "qpid/client/Completion.h"
-#include "qpid/client/ConnectionImpl.h"
-#include "qpid/client/Execution.h"
-#include "qpid/client/SessionImpl.h"
-#include "qpid/client/TypedResult.h"
-#include "qpid/shared_ptr.h"
-#include <string>
-
-namespace qpid {
-namespace client {
-
-using std::string;
-using framing::Content;
-using framing::FieldTable;
-using framing::MethodContent;
-using framing::SequenceNumberSet;
-using framing::Uuid;
-
-enum CreditUnit { MESSAGE=0, BYTE=1 };
-
-/** \defgroup clientapi Synchronous mode of a session.
- *
- * SYNC means that Session functions do not return until the remote
- * broker has confirmed that the command was executed.
- *
- * ASYNC means that the client sends commands asynchronously, Session
- * functions return immediately.
- *
- * ASYNC mode gives better performance for high-volume traffic, but
- * requires some additional caution.
- *
- * Session functions return immediately. If the command causes an
- * exception on the broker, the exception will be thrown on a
- * <em>later</em> function call.
- *
- * If you need to notify some extenal agent that some actions have
- * been taken (e.g. binding queues to exchanges), you must call
- * Session::sync() first to ensure that all the commands are complete.
- *
- * You can freely switch between modes by calling Session::setSynchronous().
- *
- * @see Session::sync(), Session::setSynchronous()
- */
-enum SynchronousMode { SYNC=true, ASYNC=false };
-
-
-/**
- * Basic session operations that are not derived from AMQP XML methods.
- */
-class SessionBase
-{
- public:
- /**
- * Instances of this class turn synchronous mode on for the
- * duration of their scope (and revert back to async if required
- * afterwards).
- */
- class ScopedSync
- {
- SessionBase& session;
- const bool change;
- public:
- ScopedSync(SessionBase& s);
- ~ScopedSync();
- };
-
-
- SessionBase();
- ~SessionBase();
-
- /** Get the next message frame-set from the session. */
- framing::FrameSet::shared_ptr get();
-
- /** Get the session ID */
- Uuid getId() const;
-
- /**
- * In synchronous mode, wait for the broker's response before
- * returning. This gives lower throughput than asynchronous
- * mode.
- *
- * In asynchronous mode commands are sent without waiting
- * for a response (you can use the returned Completion object
- * to wait for completion).
- *
- * @see SynchronousMode
- */
- void setSynchronous(SynchronousMode mode);
- void setSynchronous(bool set);
- bool isSynchronous() const;
- SynchronousMode getSynchronous() const;
-
- /**
- * Suspend the session, which can be resumed on a different connection.
- * @see Connection::resume()
- */
- void suspend();
-
- /** Close the session */
- void close();
-
- Execution& getExecution();
- void sync();
- void flush();
- void markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer);
- void sendCompletion();
-
- typedef framing::TransferContent DefaultContent;
-
- protected:
- shared_ptr<SessionImpl> impl;
- framing::ProtocolVersion version;
- friend class Connection;
- SessionBase(shared_ptr<SessionImpl>);
-};
-
-}} // namespace qpid::client
-
-#endif /*!QPID_CLIENT_SESSIONBASE_H*/
diff --git a/cpp/src/qpid/client/SessionBase_0_10.cpp b/cpp/src/qpid/client/SessionBase_0_10.cpp
new file mode 100644
index 0000000000..974acbfcf6
--- /dev/null
+++ b/cpp/src/qpid/client/SessionBase_0_10.cpp
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "SessionBase_0_10.h"
+#include "qpid/framing/all_method_bodies.h"
+
+namespace qpid {
+namespace client {
+using namespace framing;
+
+SessionBase_0_10::SessionBase_0_10() {}
+SessionBase_0_10::~SessionBase_0_10() {}
+
+void SessionBase_0_10::close() { impl->close(); }
+
+Execution& SessionBase_0_10::getExecution()
+{
+ return *impl;
+}
+
+void SessionBase_0_10::flush()
+{
+ impl->sendFlush();
+}
+
+void SessionBase_0_10::sync()
+{
+ ExecutionSyncBody b;
+ b.setSync(true);
+ impl->send(b).wait(*impl);
+}
+
+void SessionBase_0_10::markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer)
+{
+ impl->markCompleted(id, cumulative, notifyPeer);
+}
+
+void SessionBase_0_10::sendCompletion()
+{
+ impl->sendCompletion();
+}
+
+SessionId SessionBase_0_10::getId() const { return impl->getId(); }
+framing::FrameSet::shared_ptr SessionBase_0_10::get() { return impl->get(); }
+
+
+}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/SessionBase_0_10.h b/cpp/src/qpid/client/SessionBase_0_10.h
new file mode 100644
index 0000000000..f9ced049a9
--- /dev/null
+++ b/cpp/src/qpid/client/SessionBase_0_10.h
@@ -0,0 +1,104 @@
+#ifndef QPID_CLIENT_SESSIONBASE_H
+#define QPID_CLIENT_SESSIONBASE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/SessionId.h"
+#include "qpid/framing/amqp_structs.h"
+#include "qpid/framing/ProtocolVersion.h"
+#include "qpid/framing/MethodContent.h"
+#include "qpid/framing/TransferContent.h"
+#include "qpid/client/Completion.h"
+#include "qpid/client/ConnectionImpl.h"
+#include "qpid/client/Execution.h"
+#include "qpid/client/SessionImpl.h"
+#include "qpid/client/TypedResult.h"
+#include "qpid/shared_ptr.h"
+#include <string>
+
+namespace qpid {
+namespace client {
+
+using std::string;
+using framing::Content;
+using framing::FieldTable;
+using framing::MethodContent;
+using framing::SequenceNumber;
+using framing::SequenceSet;
+using framing::SequenceNumberSet;
+using qpid::SessionId;
+using framing::Xid;
+
+enum CreditUnit { MESSAGE=0, BYTE=1 };
+
+/**
+ * Base class for handles to an AMQP session.
+ *
+ * Subclasses provide the AMQP commands for a given
+ * version of the protocol.
+ */
+class SessionBase_0_10 {
+ public:
+
+ typedef framing::TransferContent DefaultContent;
+
+ SessionBase_0_10();
+ ~SessionBase_0_10();
+
+ /** Get the next message frame-set from the session. */
+ framing::FrameSet::shared_ptr get();
+
+ /** Get the session ID */
+ SessionId getId() const;
+
+ /** Close the session.
+ * A session is automatically closed when all handles to it are destroyed.
+ */
+ void close();
+
+ /** Synchronize the session: sync() waits until all commands
+ * issued on this session have been completed. It is equivalent to
+ * calling Session::executionSync()
+ *
+ * Note sync() is always synchronous, even on an AsyncSession object
+ * because that's almost always what you want. You can call
+ * AsyncSession::executionSync() directly in the unusual event
+ * that you want to do an asynchronous sync.
+ */
+ void sync();
+
+ /** Set the timeout for this session. */
+ uint32_t timeout(uint32_t seconds);
+
+ Execution& getExecution();
+ void flush();
+ void markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer);
+ void sendCompletion();
+
+ protected:
+ boost::shared_ptr<SessionImpl> impl;
+ friend class SessionBase_0_10Access;
+};
+
+}} // namespace qpid::client
+
+#endif /*!QPID_CLIENT_SESSIONBASE_H*/
diff --git a/cpp/src/qpid/client/SessionBase_0_10Access.h b/cpp/src/qpid/client/SessionBase_0_10Access.h
new file mode 100644
index 0000000000..e2189a53dd
--- /dev/null
+++ b/cpp/src/qpid/client/SessionBase_0_10Access.h
@@ -0,0 +1,42 @@
+#ifndef QPID_CLIENT_SESSIONBASEACCESS_H
+#define QPID_CLIENT_SESSIONBASEACCESS_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/client/SessionBase_0_10.h"
+
+/**@file @internal Internal use only */
+
+namespace qpid {
+namespace client {
+
+class SessionBase_0_10Access {
+ public:
+ SessionBase_0_10Access(SessionBase_0_10& sb_) : sb(sb_) {}
+ void set(const boost::shared_ptr<SessionImpl>& si) { sb.impl = si; }
+ boost::shared_ptr<SessionImpl> get() { return sb.impl; }
+ private:
+ SessionBase_0_10& sb;
+};
+}} // namespace qpid::client
+
+#endif /*!QPID_CLIENT_SESSIONBASEACCESS_H*/
diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp
index 18b573b464..58f4bc0aa7 100644
--- a/cpp/src/qpid/client/SessionImpl.cpp
+++ b/cpp/src/qpid/client/SessionImpl.cpp
@@ -48,18 +48,16 @@ typedef sys::Monitor::ScopedUnlock UnLock;
typedef sys::ScopedLock<sys::Semaphore> Acquire;
-SessionImpl::SessionImpl(shared_ptr<ConnectionImpl> conn,
+SessionImpl::SessionImpl(const std::string& name,
+ shared_ptr<ConnectionImpl> conn,
uint16_t ch, uint64_t _maxFrameSize)
: error(OK),
code(NORMAL),
text(EMPTY),
state(INACTIVE),
- syncMode(false),
detachedLifetime(0),
maxFrameSize(_maxFrameSize),
- id(true),//generate unique uuid for each session
- name(id.str()),
- //TODO: may want to allow application defined names instead
+ id(conn->getNegotiatedSettings().username, name.empty() ? Uuid(true).str() : name),
connection(conn),
ioHandler(*this),
channel(ch),
@@ -81,15 +79,6 @@ SessionImpl::~SessionImpl() {
connection->erase(channel);
}
-void SessionImpl::setSync(bool s) // user thread
-{
- syncMode = s; //syncMode is volatile
-}
-
-bool SessionImpl::isSync() // user thread
-{
- return syncMode; //syncMode is volatile
-}
FrameSet::shared_ptr SessionImpl::get() // user thread
{
@@ -97,7 +86,7 @@ FrameSet::shared_ptr SessionImpl::get() // user thread
return demux.getDefault()->pop();
}
-const Uuid SessionImpl::getId() const //user thread
+const SessionId SessionImpl::getId() const //user thread
{
return id; //id is immutable
}
@@ -107,7 +96,7 @@ void SessionImpl::open(uint32_t timeout) // user thread
Lock l(state);
if (state == INACTIVE) {
setState(ATTACHING);
- proxy.attach(name, false);
+ proxy.attach(id.getName(), false);
waitFor(ATTACHED);
//TODO: timeout will not be set locally until get response to
//confirm, should we wait for that?
@@ -144,7 +133,7 @@ void SessionImpl::suspend() //user thread
void SessionImpl::detach() //call with lock held
{
if (state == ATTACHED) {
- proxy.detach(name);
+ proxy.detach(id.getName());
setState(DETACHING);
}
}
@@ -285,15 +274,11 @@ Future SessionImpl::sendCommand(const AMQBody& command, const MethodContent* con
{
Acquire a(sendLock);
SequenceNumber id = nextOut++;
- bool sync;
{
Lock l(state);
checkOpen();
incompleteOut.add(id);
- sync = syncMode;
}
-
- if (sync) command.getMethod()->setSync(true);
Future f(id);
if (command.getMethod()->resultExpected()) {
Lock l(state);
@@ -308,9 +293,6 @@ Future SessionImpl::sendCommand(const AMQBody& command, const MethodContent* con
if (content) {
sendContent(*content);
}
- if (sync) {
- waitForCompletion(id);
- }
return f;
}
void SessionImpl::sendContent(const MethodContent& content)
@@ -441,27 +423,27 @@ void SessionImpl::attach(const std::string& /*name*/, bool /*force*/)
void SessionImpl::attached(const std::string& _name)
{
Lock l(state);
- if (name != _name) throw InternalErrorException("Incorrect session name");
+ if (id.getName() != _name) throw InternalErrorException("Incorrect session name");
setState(ATTACHED);
}
void SessionImpl::detach(const std::string& _name)
{
Lock l(state);
- if (name != _name) throw InternalErrorException("Incorrect session name");
+ if (id.getName() != _name) throw InternalErrorException("Incorrect session name");
setState(DETACHED);
- QPID_LOG(info, "Session detached by peer: " << name);
+ QPID_LOG(info, "Session detached by peer: " << id);
}
void SessionImpl::detached(const std::string& _name, uint8_t _code)
{
Lock l(state);
- if (name != _name) throw InternalErrorException("Incorrect session name");
+ if (id.getName() != _name) throw InternalErrorException("Incorrect session name");
setState(DETACHED);
if (_code) {
//TODO: make sure this works with execution.exception - don't
//want to overwrite the code from that
- QPID_LOG(error, "Session detached by peer: " << name << " " << code);
+ QPID_LOG(error, "Session detached by peer: " << id << " " << code);
error = SESSION_DETACH;
code = _code;
text = "Session detached by peer";
@@ -561,8 +543,6 @@ void SessionImpl::gap(const framing::SequenceSet& /*commands*/)
throw NotImplementedException("gap not yet supported");
}
-
-
void SessionImpl::sync() {}
void SessionImpl::result(const framing::SequenceNumber& commandId, const std::string& value)
diff --git a/cpp/src/qpid/client/SessionImpl.h b/cpp/src/qpid/client/SessionImpl.h
index 0bcec4dd0c..7bb7136912 100644
--- a/cpp/src/qpid/client/SessionImpl.h
+++ b/cpp/src/qpid/client/SessionImpl.h
@@ -26,6 +26,7 @@
#include "Execution.h"
#include "Results.h"
+#include "qpid/SessionId.h"
#include "qpid/shared_ptr.h"
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/ChannelHandler.h"
@@ -59,14 +60,14 @@ class SessionImpl : public framing::FrameHandler::InOutHandler,
private framing::AMQP_ClientOperations::ExecutionHandler
{
public:
- SessionImpl(shared_ptr<ConnectionImpl>, uint16_t channel, uint64_t maxFrameSize);
+ SessionImpl(const std::string& name, shared_ptr<ConnectionImpl>, uint16_t channel, uint64_t maxFrameSize);
~SessionImpl();
//NOTE: Public functions called in user thread.
framing::FrameSet::shared_ptr get();
- const framing::Uuid getId() const;
+ const SessionId getId() const;
uint16_t getChannel() const;
void setChannel(uint16_t channel);
@@ -76,8 +77,6 @@ public:
void resume(shared_ptr<ConnectionImpl>);
void suspend();
- void setSync(bool s);
- bool isSync();
void assertOpen() const;
Future send(const framing::AMQBody& command);
@@ -131,7 +130,8 @@ private:
Future sendCommand(const framing::AMQBody&, const framing::MethodContent* = 0);
void sendContent(const framing::MethodContent&);
void waitForCompletionImpl(const framing::SequenceNumber& id);
-
+ void requestTimeout(uint32_t timeout);
+
void sendCompletionImpl();
// Note: Following methods are called by network thread in
@@ -140,7 +140,6 @@ private:
void attached(const std::string& name);
void detach(const std::string& name);
void detached(const std::string& name, uint8_t detachCode);
- void requestTimeout(uint32_t timeout);
void timeout(uint32_t timeout);
void commandPoint(const framing::SequenceNumber& commandId, uint64_t commandOffset);
void expected(const framing::SequenceSet& commands, const framing::Array& fragments);
@@ -167,11 +166,9 @@ private:
std::string text; // Error text
mutable StateMonitor state;
mutable sys::Semaphore sendLock;
- volatile bool syncMode;
uint32_t detachedLifetime;
const uint64_t maxFrameSize;
- const framing::Uuid id;
- const std::string name;
+ const SessionId id;
shared_ptr<ConnectionImpl> connection;
framing::FrameHandler::MemFunRef<SessionImpl, &SessionImpl::proxyOut> ioHandler;
diff --git a/cpp/src/qpid/client/SubscriptionManager.cpp b/cpp/src/qpid/client/SubscriptionManager.cpp
index 2ba3f5fe62..6036f153f6 100644
--- a/cpp/src/qpid/client/SubscriptionManager.cpp
+++ b/cpp/src/qpid/client/SubscriptionManager.cpp
@@ -32,23 +32,23 @@
namespace qpid {
namespace client {
-SubscriptionManager::SubscriptionManager(Session& s)
+SubscriptionManager::SubscriptionManager(const Session& s)
: dispatcher(s), session(s),
messages(UNLIMITED), bytes(UNLIMITED), window(true),
acceptMode(0), acquireMode(0),
autoStop(true)
{}
-Completion SubscriptionManager::subscribeInternal(
+void SubscriptionManager::subscribeInternal(
const std::string& q, const std::string& dest)
{
- Completion c = session.messageSubscribe(arg::queue=q, arg::destination=dest,
- arg::acceptMode=acceptMode, arg::acquireMode=acquireMode);
- setFlowControl(dest, messages, bytes, window);
- return c;
+ async(session).messageSubscribe( // setFlowControl will sync.
+ arg::queue=q, arg::destination=dest,
+ arg::acceptMode=acceptMode, arg::acquireMode=acquireMode);
+ setFlowControl(dest, messages, bytes, window);
}
-Completion SubscriptionManager::subscribe(
+void SubscriptionManager::subscribe(
MessageListener& listener, const std::string& q, const std::string& d)
{
std::string dest=d.empty() ? q:d;
@@ -56,7 +56,7 @@ Completion SubscriptionManager::subscribe(
return subscribeInternal(q, dest);
}
-Completion SubscriptionManager::subscribe(
+void SubscriptionManager::subscribe(
LocalQueue& lq, const std::string& q, const std::string& d)
{
std::string dest=d.empty() ? q:d;
@@ -68,9 +68,9 @@ Completion SubscriptionManager::subscribe(
void SubscriptionManager::setFlowControl(
const std::string& dest, uint32_t messages, uint32_t bytes, bool window)
{
- session.messageSetFlowMode(dest, window);
- session.messageFlow(dest, 0, messages);
- session.messageFlow(dest, 1, bytes);
+ async(session).messageSetFlowMode(dest, window);
+ async(session).messageFlow(dest, 0, messages);
+ session.messageFlow(dest, 1, bytes); // Only need one sync
}
void SubscriptionManager::setFlowControl(
diff --git a/cpp/src/qpid/client/SubscriptionManager.h b/cpp/src/qpid/client/SubscriptionManager.h
index 4ccb95c968..4ff962f67b 100644
--- a/cpp/src/qpid/client/SubscriptionManager.h
+++ b/cpp/src/qpid/client/SubscriptionManager.h
@@ -45,10 +45,10 @@ class SubscriptionManager : public sys::Runnable
typedef sys::Mutex::ScopedLock Lock;
typedef sys::Mutex::ScopedUnlock Unlock;
- Completion subscribeInternal(const std::string& q, const std::string& dest);
+ void subscribeInternal(const std::string& q, const std::string& dest);
qpid::client::Dispatcher dispatcher;
- qpid::client::Session& session;
+ qpid::client::Session session;
uint32_t messages;
uint32_t bytes;
bool window;
@@ -58,7 +58,7 @@ class SubscriptionManager : public sys::Runnable
bool autoStop;
public:
- SubscriptionManager(Session& session);
+ SubscriptionManager(const Session& session);
/**
* Subscribe a MessagesListener to receive messages from queue.
@@ -68,7 +68,7 @@ class SubscriptionManager : public sys::Runnable
*@param tag Unique destination tag for the listener.
* If not specified, the queue name is used.
*/
- Completion subscribe(MessageListener& listener,
+ void subscribe(MessageListener& listener,
const std::string& queue,
const std::string& tag=std::string());
@@ -79,7 +79,7 @@ class SubscriptionManager : public sys::Runnable
*@param tag Unique destination tag for the listener.
* If not specified, the queue name is used.
*/
- Completion subscribe(LocalQueue& localQueue,
+ void subscribe(LocalQueue& localQueue,
const std::string& queue,
const std::string& tag=std::string());
diff --git a/cpp/src/tests/BrokerFixture.h b/cpp/src/tests/BrokerFixture.h
index 83b3f621c7..31f63d71a0 100644
--- a/cpp/src/tests/BrokerFixture.h
+++ b/cpp/src/tests/BrokerFixture.h
@@ -92,7 +92,7 @@ struct SessionFixtureT : BrokerFixture {
qpid::client::LocalQueue lq;
SessionFixtureT() : connection(broker->getPort()),
- session(connection.newSession(qpid::client::ASYNC)),
+ session(connection.newSession("SessionFixture")),
subs(session)
{}
diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp
index 801e33d412..1dade47ee9 100644
--- a/cpp/src/tests/ClientSessionTest.cpp
+++ b/cpp/src/tests/ClientSessionTest.cpp
@@ -106,19 +106,19 @@ struct ClientSessionFixture : public ProxySessionFixture
QPID_AUTO_TEST_CASE(testQueueQuery) {
ClientSessionFixture fix;
- fix.session = fix.connection.newSession(ASYNC);
+ fix.session = fix.connection.newSession();
fix.session.queueDeclare(queue="my-queue", alternateExchange="amq.fanout", exclusive=true, autoDelete=true);
- TypedResult<QueueQueryResult> result = fix.session.queueQuery(string("my-queue"));
- BOOST_CHECK_EQUAL(false, result.get().getDurable());
- BOOST_CHECK_EQUAL(true, result.get().getExclusive());
+ QueueQueryResult result = fix.session.queueQuery(string("my-queue"));
+ BOOST_CHECK_EQUAL(false, result.getDurable());
+ BOOST_CHECK_EQUAL(true, result.getExclusive());
BOOST_CHECK_EQUAL(string("amq.fanout"),
- result.get().getAlternateExchange());
+ result.getAlternateExchange());
}
QPID_AUTO_TEST_CASE(testTransfer)
{
ClientSessionFixture fix;
- fix.session=fix.connection.newSession(ASYNC);
+ fix.session=fix.connection.newSession();
fix.declareSubscribe();
fix.session.messageTransfer(acceptMode=1, content=TransferContent("my-message", "my-queue"));
//get & test the message:
@@ -133,7 +133,7 @@ QPID_AUTO_TEST_CASE(testTransfer)
QPID_AUTO_TEST_CASE(testDispatcher)
{
ClientSessionFixture fix;
- fix.session =fix.connection.newSession(ASYNC);
+ fix.session =fix.connection.newSession();
fix.declareSubscribe();
size_t count = 100;
for (size_t i = 0; i < count; ++i)
@@ -148,7 +148,7 @@ QPID_AUTO_TEST_CASE(testDispatcher)
QPID_AUTO_TEST_CASE(testDispatcherThread)
{
ClientSessionFixture fix;
- fix.session =fix.connection.newSession(ASYNC);
+ fix.session =fix.connection.newSession();
fix.declareSubscribe();
size_t count = 10;
DummyListener listener(fix.session, "my-dest", count);
@@ -162,40 +162,42 @@ QPID_AUTO_TEST_CASE(testDispatcherThread)
BOOST_CHECK_EQUAL(lexical_cast<string>(i), listener.messages[i].getData());
}
-QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testSuspend0Timeout, 1)
-{
- ClientSessionFixture fix;
- fix.session =fix.connection.newSession(ASYNC, 0);
- fix.session.suspend(); // session has 0 timeout.
- try {
- fix.connection.resume(fix.session);
- BOOST_FAIL("Expected InvalidArgumentException.");
- } catch(const InternalErrorException&) {}
-}
-
-QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testUseSuspendedError, 1)
-{
- ClientSessionFixture fix;
- fix.session =fix.connection.newSession(ASYNC, 60);
- fix.session.suspend();
- try {
- fix.session.exchangeQuery(name="amq.fanout");
- BOOST_FAIL("Expected session suspended exception");
- } catch(const CommandInvalidException&) {}
-}
+// FIXME aconway 2008-05-26: Re-enable with final resume implementation.
+//
+// QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testSuspend0Timeout, 1)
+// {
+// ClientSessionFixture fix;
+// fix.session.suspend(); // session has 0 timeout.
+// try {
+// fix.connection.resume(fix.session);
+// BOOST_FAIL("Expected InvalidArgumentException.");
+// } catch(const InternalErrorException&) {}
+// }
+
+// QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testUseSuspendedError, 1)
+// {
+// ClientSessionFixture fix;
+// fix.session =fix.session.timeout(60);
+// fix.session.suspend();
+// try {
+// fix.session.exchangeQuery(name="amq.fanout");
+// BOOST_FAIL("Expected session suspended exception");
+// } catch(const CommandInvalidException&) {}
+// }
+
+// QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testSuspendResume, 1)
+// {
+// ClientSessionFixture fix;
+// fix.session.timeout(60);
+// fix.declareSubscribe();
+// fix.session.suspend();
+// // Make sure we are still subscribed after resume.
+// fix.connection.resume(fix.session);
+// fix.session.messageTransfer(content=TransferContent("my-message", "my-queue"));
+// FrameSet::shared_ptr msg = fix.session.get();
+// BOOST_CHECK_EQUAL(string("my-message"), msg->getContent());
+// }
-QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testSuspendResume, 1)
-{
- ClientSessionFixture fix;
- fix.session =fix.connection.newSession(ASYNC, 60);
- fix.declareSubscribe();
- fix.session.suspend();
- // Make sure we are still subscribed after resume.
- fix.connection.resume(fix.session);
- fix.session.messageTransfer(content=TransferContent("my-message", "my-queue"));
- FrameSet::shared_ptr msg = fix.session.get();
- BOOST_CHECK_EQUAL(string("my-message"), msg->getContent());
-}
QPID_AUTO_TEST_CASE(testSendToSelf) {
ClientSessionFixture fix;
diff --git a/cpp/src/tests/XmlClientSessionTest.cpp b/cpp/src/tests/XmlClientSessionTest.cpp
index dc6cce24fc..9b0c035f37 100644
--- a/cpp/src/tests/XmlClientSessionTest.cpp
+++ b/cpp/src/tests/XmlClientSessionTest.cpp
@@ -121,7 +121,7 @@ struct ClientSessionFixture : public ProxySessionFixture
QPID_AUTO_TEST_CASE(testXmlBinding) {
ClientSessionFixture f;
- Session session = f.connection.newSession(ASYNC);
+ Session session = f.connection.newSession();
SubscriptionManager subscriptions(session);
SubscribedLocalQueue localQueue(subscriptions);
diff --git a/cpp/src/tests/client_test.cpp b/cpp/src/tests/client_test.cpp
index 20e8b21a3a..04269b299d 100644
--- a/cpp/src/tests/client_test.cpp
+++ b/cpp/src/tests/client_test.cpp
@@ -92,7 +92,7 @@ int main(int argc, char** argv)
//Create and open a session on the connection through which
//most functionality is exposed:
- Session session = connection.newSession(ASYNC);
+ Session session = connection.newSession();
if (opts.verbose) std::cout << "Opened session." << std::endl;
diff --git a/cpp/src/tests/consume.cpp b/cpp/src/tests/consume.cpp
index 6d2f0a7413..43e08a80b7 100644
--- a/cpp/src/tests/consume.cpp
+++ b/cpp/src/tests/consume.cpp
@@ -62,7 +62,7 @@ struct Client
Client()
{
opts.open(connection);
- session = connection.newSession(ASYNC);
+ session = connection.newSession();
}
void consume()
diff --git a/cpp/src/tests/exception_test.cpp b/cpp/src/tests/exception_test.cpp
index f75269c959..a656e0cf1a 100644
--- a/cpp/src/tests/exception_test.cpp
+++ b/cpp/src/tests/exception_test.cpp
@@ -96,7 +96,6 @@ QPID_AUTO_TEST_CASE(DisconnectedListen) {
QPID_AUTO_TEST_CASE(NoSuchQueueTest) {
ProxySessionFixture fix;
- fix.session.setSynchronous(true);
BOOST_CHECK_THROW(fix.subs.subscribe(fix.lq, "no such queue"), NotFoundException);
}
diff --git a/cpp/src/tests/latencytest.cpp b/cpp/src/tests/latencytest.cpp
index 0b343d0243..f4cbade36b 100644
--- a/cpp/src/tests/latencytest.cpp
+++ b/cpp/src/tests/latencytest.cpp
@@ -30,7 +30,7 @@
#include "TestOptions.h"
#include "qpid/client/Connection.h"
#include "qpid/client/Message.h"
-#include "qpid/client/Session.h"
+#include "qpid/client/AsyncSession.h"
#include "qpid/client/SubscriptionManager.h"
using namespace qpid;
@@ -99,7 +99,7 @@ class Client : public Runnable
{
protected:
Connection connection;
- Session session;
+ AsyncSession session;
Thread thread;
string queue;
@@ -157,7 +157,7 @@ public:
Client::Client(const string& q) : queue(q)
{
opts.open(connection);
- session = connection.newSession(ASYNC);
+ session = connection.newSession();
}
void Client::start()
@@ -262,7 +262,7 @@ void Sender::sendByCount()
uint64_t sentAt(current_time());
msg.getDeliveryProperties().setTimestamp(sentAt);
//msg.getHeaders().setTimestamp("sent-at", sentAt);//TODO add support for uint64_t to field tables
- session.messageTransfer(arg::content=msg, arg::acceptMode=1);
+ async(session).messageTransfer(arg::content=msg, arg::acceptMode=1);
}
session.sync();
}
@@ -283,7 +283,7 @@ void Sender::sendByRate()
uint64_t sentAt(current_time());
msg.getDeliveryProperties().setTimestamp(sentAt);
//msg.getHeaders().setTimestamp("sent-at", sentAt);//TODO add support for uint64_t to field tables
- session.messageTransfer(arg::content=msg, arg::acceptMode=1);
+ async(session).messageTransfer(arg::content=msg, arg::acceptMode=1);
}
uint64_t timeTaken = (current_time() - start) / TIME_USEC;
if (timeTaken < 1000) {
diff --git a/cpp/src/tests/perftest.cpp b/cpp/src/tests/perftest.cpp
index 2a8a9ec17c..91ecd83f50 100644
--- a/cpp/src/tests/perftest.cpp
+++ b/cpp/src/tests/perftest.cpp
@@ -21,7 +21,7 @@
#include "TestOptions.h"
-#include "qpid/client/Session.h"
+#include "qpid/client/AsyncSession.h"
#include "qpid/client/SubscriptionManager.h"
#include "qpid/client/Connection.h"
#include "qpid/client/Completion.h"
@@ -194,12 +194,12 @@ Opts opts;
struct Client : public Runnable {
Connection connection;
- Session session;
+ AsyncSession session;
Thread thread;
Client() {
opts.open(connection);
- session = connection.newSession(ASYNC);
+ session = connection.newSession();
}
~Client() {
@@ -431,7 +431,7 @@ struct PublishThread : public Client {
offset = 5;
data += "data:";//marker (requested for latency testing tool scripts)
data += string(sizeof(size_t), 'X');//space for seq no
- data += string(reinterpret_cast<const char*>(session.getId().data()), session.getId().size());
+ data += session.getId().str();
if (opts.size > data.size()) {
data += string(opts.size - data.size(), 'X');
} else if(opts.size < data.size()) {
diff --git a/cpp/src/tests/publish.cpp b/cpp/src/tests/publish.cpp
index 17e3d4e104..b78f3fdf6d 100644
--- a/cpp/src/tests/publish.cpp
+++ b/cpp/src/tests/publish.cpp
@@ -28,7 +28,7 @@
#include "TestOptions.h"
#include "qpid/client/Connection.h"
#include "qpid/client/Message.h"
-#include "qpid/client/Session.h"
+#include "qpid/client/AsyncSession.h"
#include "qpid/client/SubscriptionManager.h"
using namespace qpid;
@@ -61,12 +61,12 @@ Args opts;
struct Client
{
Connection connection;
- Session session;
+ AsyncSession session;
Client()
{
opts.open(connection);
- session = connection.newSession(ASYNC);
+ session = connection.newSession();
}
std::string id(uint i)
diff --git a/cpp/src/tests/topic_listener.cpp b/cpp/src/tests/topic_listener.cpp
index 8f0e290070..6daf928401 100644
--- a/cpp/src/tests/topic_listener.cpp
+++ b/cpp/src/tests/topic_listener.cpp
@@ -53,7 +53,7 @@ using namespace std;
* defined.
*/
class Listener : public MessageListener{
- Session& session;
+ Session session;
SubscriptionManager& mgr;
const string responseQueue;
const bool transactional;
@@ -64,7 +64,7 @@ class Listener : public MessageListener{
void shutdown();
void report();
public:
- Listener(Session& session, SubscriptionManager& mgr, const string& reponseQueue, bool tx);
+ Listener(const Session& session, SubscriptionManager& mgr, const string& reponseQueue, bool tx);
virtual void received(Message& msg);
};
@@ -101,7 +101,7 @@ int main(int argc, char** argv){
else {
Connection connection;
args.open(connection);
- Session session = connection.newSession(ASYNC);
+ AsyncSession session = connection.newSession();
if (args.transactional) {
session.txSelect();
}
@@ -127,7 +127,8 @@ int main(int argc, char** argv){
mgr.setFlowControl(SubscriptionManager::UNLIMITED, SubscriptionManager::UNLIMITED, false);
}
mgr.subscribe(listener, control);
-
+ session.sync();
+
cout << "topic_listener: listening..." << endl;
mgr.run();
if (args.durable) {
@@ -144,7 +145,7 @@ int main(int argc, char** argv){
return 1;
}
-Listener::Listener(Session& s, SubscriptionManager& m, const string& _responseq, bool tx) :
+Listener::Listener(const Session& s, SubscriptionManager& m, const string& _responseq, bool tx) :
session(s), mgr(m), responseQueue(_responseq), transactional(tx), init(false), count(0){}
void Listener::received(Message& message){
diff --git a/cpp/src/tests/topic_publisher.cpp b/cpp/src/tests/topic_publisher.cpp
index a6a7b4d80d..c8f0d543ec 100644
--- a/cpp/src/tests/topic_publisher.cpp
+++ b/cpp/src/tests/topic_publisher.cpp
@@ -37,7 +37,7 @@
#include "TestOptions.h"
#include "qpid/client/Connection.h"
#include "qpid/client/MessageListener.h"
-#include "qpid/client/Session.h"
+#include "qpid/client/AsyncSession.h"
#include "qpid/client/SubscriptionManager.h"
#include "qpid/sys/Monitor.h"
#include <unistd.h>
@@ -56,7 +56,7 @@ using namespace std;
* back by the subscribers.
*/
class Publisher {
- Session& session;
+ AsyncSession session;
SubscriptionManager mgr;
LocalQueue queue;
const string controlTopic;
@@ -66,7 +66,7 @@ class Publisher {
string generateData(int size);
public:
- Publisher(Session& session, const string& controlTopic, bool tx, bool durable);
+ Publisher(const AsyncSession& session, const string& controlTopic, bool tx, bool durable);
int64_t publish(int msgs, int listeners, int size);
void terminate();
};
@@ -107,7 +107,7 @@ int main(int argc, char** argv) {
else {
Connection connection;
args.open(connection);
- Session session = connection.newSession(ASYNC);
+ AsyncSession session = connection.newSession();
if (args.transactional) {
session.txSelect();
}
@@ -150,7 +150,7 @@ int main(int argc, char** argv) {
return 1;
}
-Publisher::Publisher(Session& _session, const string& _controlTopic, bool tx, bool d) :
+Publisher::Publisher(const AsyncSession& _session, const string& _controlTopic, bool tx, bool d) :
session(_session), mgr(session), controlTopic(_controlTopic), transactional(tx), durable(d)
{
mgr.subscribe(queue, "response");
diff --git a/cpp/src/tests/topictest b/cpp/src/tests/topictest
index c36aa319ba..ad7c5df693 100755
--- a/cpp/src/tests/topictest
+++ b/cpp/src/tests/topictest
@@ -36,5 +36,5 @@ for ((i=$SUBSCRIBERS ; i--; )); do
subscribe $i &
done
# FIXME aconway 2007-03-27: Hack around startup race. Fix topic test.
-sleep 1
+sleep 2
publish 2>&1 || exit 1
diff --git a/cpp/src/tests/txtest.cpp b/cpp/src/tests/txtest.cpp
index a8369df759..6eb812738d 100644
--- a/cpp/src/tests/txtest.cpp
+++ b/cpp/src/tests/txtest.cpp
@@ -28,7 +28,7 @@
#include "TestOptions.h"
#include "qpid/client/Connection.h"
#include "qpid/client/Message.h"
-#include "qpid/client/Session.h"
+#include "qpid/client/AsyncSession.h"
#include "qpid/client/SubscriptionManager.h"
using namespace qpid;
@@ -96,12 +96,12 @@ Args opts;
struct Client
{
Connection connection;
- Session session;
+ AsyncSession session;
Client()
{
opts.open(connection);
- session = connection.newSession(ASYNC);
+ session = connection.newSession();
}
~Client()