summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-02-20 15:26:05 +0000
committerAlan Conway <aconway@apache.org>2008-02-20 15:26:05 +0000
commitb9c71c852e2e7bbc1635ff62fabf2469e2175399 (patch)
treeee3ce9dcd6dc45e76270f6f4d696eb09bfc63895 /cpp/src
parent615beb132f725ebc4a88d58dc5f3b8af8419f932 (diff)
downloadqpid-python-b9c71c852e2e7bbc1635ff62fabf2469e2175399.tar.gz
Added non-optional enum { SYNC, ASYNC } parameter to newSession.
Updated API doc in client/SessionBase.h git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@629503 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/client/Connection.cpp7
-rw-r--r--cpp/src/qpid/client/Connection.h3
-rw-r--r--cpp/src/qpid/client/SessionBase.cpp6
-rw-r--r--cpp/src/qpid/client/SessionBase.h46
-rw-r--r--cpp/src/tests/BrokerFixture.h4
-rw-r--r--cpp/src/tests/ClientSessionTest.cpp14
-rw-r--r--cpp/src/tests/client_test.cpp2
-rw-r--r--cpp/src/tests/cluster_client.cpp4
-rw-r--r--cpp/src/tests/latencytest.cpp2
-rw-r--r--cpp/src/tests/perftest.cpp2
-rw-r--r--cpp/src/tests/topic_listener.cpp2
-rw-r--r--cpp/src/tests/topic_publisher.cpp2
-rw-r--r--cpp/src/tests/txtest.cpp2
13 files changed, 66 insertions, 30 deletions
diff --git a/cpp/src/qpid/client/Connection.cpp b/cpp/src/qpid/client/Connection.cpp
index b2508f4a24..26113c1254 100644
--- a/cpp/src/qpid/client/Connection.cpp
+++ b/cpp/src/qpid/client/Connection.cpp
@@ -70,12 +70,15 @@ void Connection::open(
}
void Connection::openChannel(Channel& channel) {
- channel.open(newSession());
+ channel.open(newSession(ASYNC));
}
-Session_0_10 Connection::newSession(uint32_t detachedLifetime) {
+Session_0_10 Connection::newSession(SynchronousMode sync,
+ uint32_t detachedLifetime)
+{
shared_ptr<SessionCore> core(
new SessionCore(impl, ++channelIdCounter, max_frame_size));
+ core->setSync(sync);
impl->addSession(core);
core->open(detachedLifetime);
return Session_0_10(core);
diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h
index 053949a51b..e6bfbddef6 100644
--- a/cpp/src/qpid/client/Connection.h
+++ b/cpp/src/qpid/client/Connection.h
@@ -39,7 +39,6 @@ namespace qpid {
*/
namespace client {
-
/**
* \defgroup clientapi Application API for an AMQP client
*/
@@ -135,7 +134,7 @@ class Connection
* that the broker may discard the session state. Default is 0,
* meaning the session cannot be resumed.
*/
- Session_0_10 newSession(uint32_t detachedLifetime=0);
+ Session_0_10 newSession(SynchronousMode sync, uint32_t detachedLifetime=0);
/**
* Resume a suspendded session. A session may be resumed
diff --git a/cpp/src/qpid/client/SessionBase.cpp b/cpp/src/qpid/client/SessionBase.cpp
index 9b6123cbc4..0e1fa67bda 100644
--- a/cpp/src/qpid/client/SessionBase.cpp
+++ b/cpp/src/qpid/client/SessionBase.cpp
@@ -29,8 +29,14 @@ SessionBase::~SessionBase() {}
SessionBase::SessionBase(shared_ptr<SessionCore> core) : impl(core) {}
void SessionBase::suspend() { impl->suspend(); }
void SessionBase::close() { impl->close(); }
+
void SessionBase::setSynchronous(bool isSync) { impl->setSync(isSync); }
+void SessionBase::setSynchronous(SynchronousMode m) { impl->setSync(m); }
bool SessionBase::isSynchronous() const { return impl->isSync(); }
+SynchronousMode SessionBase::getSynchronous() const {
+ return SynchronousMode(impl->isSync());
+}
+
Execution& SessionBase::getExecution() { return impl->getExecution(); }
Uuid SessionBase::getId() const { return impl->getId(); }
framing::FrameSet::shared_ptr SessionBase::get() { return impl->get(); }
diff --git a/cpp/src/qpid/client/SessionBase.h b/cpp/src/qpid/client/SessionBase.h
index 87c0892b61..3565145bb9 100644
--- a/cpp/src/qpid/client/SessionBase.h
+++ b/cpp/src/qpid/client/SessionBase.h
@@ -45,6 +45,32 @@ using framing::MethodContent;
using framing::SequenceNumberSet;
using framing::Uuid;
+/** \defgroup clientapi Synchronous mode of a session.
+ *
+ * SYNC means that Session functions do not return until the remote
+ * broker has confirmed that the command was executed.
+ *
+ * ASYNC means that the client sends commands asynchronously, Session
+ * functions return immediately.
+ *
+ * ASYNC mode gives better performance for high-volume traffic, but
+ * requires some additional caution:
+ *
+ * Session functions return immediately. If the command causes an
+ * exception on the broker, the exception will be thrown on a
+ * <em>later</em> function call.
+ *
+ * If you need to notify some extenal agent that some actions have
+ * been taken (e.g. binding queues to exchanages), you must call
+ * Session::sync() first, to ensure that all the commands are complete.
+ *
+ * You can freely switch between modes by calling Session::setSynchronous()
+ *
+ * @see Session::sync(), Session::setSynchronous()
+ */
+enum SynchronousMode { SYNC=true, ASYNC=false };
+
+
/**
* Basic session operations that are not derived from AMQP XML methods.
*/
@@ -61,20 +87,20 @@ class SessionBase
Uuid getId() const;
/**
- * In synchronous mode, the session sets the sync bit on every
- * command and waits for the broker's response before returning.
- * Note this gives lower throughput than non-synchronous mode.
+ * In synchronous mode, wait for the broker's response before
+ * returning. Note this gives lower throughput than asynchronous
+ * mode.
*
- * In non-synchronous mode commands are sent without waiting
+ * In asynchronous mode commands are sent without waiting
* for a respose (you can use the returned Completion object
* to wait for completion.)
*
- *@param if true set the session to synchronous mode, else
- * set it to non-synchronous mode.
+ * @see SynchronousMode
*/
- void setSynchronous(bool isSync);
-
+ void setSynchronous(SynchronousMode mode);
+ void setSynchronous(bool set);
bool isSynchronous() const;
+ SynchronousMode getSynchronous() const;
/**
* Suspend the session, can be resumed on a different connection.
@@ -85,8 +111,10 @@ class SessionBase
/** Close the session */
void close();
- /** Synchronize with the broker. Wait for all commands issued so far in
+ /**
+ * Synchronize with the broker. Wait for all commands issued so far in
* the session to complete.
+ * @see SynchronousMode
*/
void sync();
diff --git a/cpp/src/tests/BrokerFixture.h b/cpp/src/tests/BrokerFixture.h
index b4e4a2082b..9fcdb57a99 100644
--- a/cpp/src/tests/BrokerFixture.h
+++ b/cpp/src/tests/BrokerFixture.h
@@ -91,8 +91,8 @@ struct SessionFixtureT : BrokerFixture {
qpid::client::LocalQueue lq;
SessionFixtureT() : connection(broker->getPort()),
- session(connection.newSession()),
- subs(session)
+ session(connection.newSession(qpid::client::ASYNC)),
+ subs(session)
{}
~SessionFixtureT() {
diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp
index 87a4f59999..c299837f86 100644
--- a/cpp/src/tests/ClientSessionTest.cpp
+++ b/cpp/src/tests/ClientSessionTest.cpp
@@ -103,7 +103,7 @@ struct ClientSessionFixture : public ProxySessionFixture
};
BOOST_FIXTURE_TEST_CASE(testQueueQuery, ClientSessionFixture) {
- session =connection.newSession();
+ session =connection.newSession(ASYNC);
session.queueDeclare(queue="my-queue", alternateExchange="amq.fanout", exclusive=true, autoDelete=true);
TypedResult<QueueQueryResult> result = session.queueQuery(string("my-queue"));
BOOST_CHECK_EQUAL(false, result.get().getDurable());
@@ -114,7 +114,7 @@ BOOST_FIXTURE_TEST_CASE(testQueueQuery, ClientSessionFixture) {
BOOST_FIXTURE_TEST_CASE(testTransfer, ClientSessionFixture)
{
- session=connection.newSession();
+ session=connection.newSession(ASYNC);
declareSubscribe();
session.messageTransfer(content=TransferContent("my-message", "my-queue"));
//get & test the message:
@@ -127,7 +127,7 @@ BOOST_FIXTURE_TEST_CASE(testTransfer, ClientSessionFixture)
BOOST_FIXTURE_TEST_CASE(testDispatcher, ClientSessionFixture)
{
- session =connection.newSession();
+ session =connection.newSession(ASYNC);
declareSubscribe();
size_t count = 100;
for (size_t i = 0; i < count; ++i)
@@ -142,7 +142,7 @@ BOOST_FIXTURE_TEST_CASE(testDispatcher, ClientSessionFixture)
/* FIXME aconway 2008-01-28: hangs
BOOST_FIXTURE_TEST_CASE(testDispatcherThread, ClientSessionFixture)
{
- session =connection.newSession();
+ session =connection.newSession(ASYNC);
declareSubscribe();
size_t count = 10000;
DummyListener listener(session, "my-dest", count);
@@ -160,7 +160,7 @@ BOOST_FIXTURE_TEST_CASE(testDispatcherThread, ClientSessionFixture)
BOOST_FIXTURE_TEST_CASE(_FIXTURE, ClientSessionFixture)
{
- session =connection.newSession(0);
+ session =connection.newSession(ASYNC, 0);
session.suspend(); // session has 0 timeout.
try {
connection.resume(session);
@@ -170,7 +170,7 @@ BOOST_FIXTURE_TEST_CASE(_FIXTURE, ClientSessionFixture)
BOOST_FIXTURE_TEST_CASE(testUseSuspendedError, ClientSessionFixture)
{
- session =connection.newSession(60);
+ session =connection.newSession(ASYNC, 60);
session.suspend();
try {
session.exchangeQuery(name="amq.fanout");
@@ -180,7 +180,7 @@ BOOST_FIXTURE_TEST_CASE(testUseSuspendedError, ClientSessionFixture)
BOOST_FIXTURE_TEST_CASE(testSuspendResume, ClientSessionFixture)
{
- session =connection.newSession(60);
+ session =connection.newSession(ASYNC, 60);
declareSubscribe();
session.suspend();
// Make sure we are still subscribed after resume.
diff --git a/cpp/src/tests/client_test.cpp b/cpp/src/tests/client_test.cpp
index eb145272ca..84fc9434de 100644
--- a/cpp/src/tests/client_test.cpp
+++ b/cpp/src/tests/client_test.cpp
@@ -92,7 +92,7 @@ int main(int argc, char** argv)
//Create and open a session on the connection through which
//most functionality is exposed:
- Session_0_10 session = connection.newSession();
+ Session_0_10 session = connection.newSession(ASYNC);
if (opts.trace) std::cout << "Opened session." << std::endl;
diff --git a/cpp/src/tests/cluster_client.cpp b/cpp/src/tests/cluster_client.cpp
index 30b7e38801..cd048d1651 100644
--- a/cpp/src/tests/cluster_client.cpp
+++ b/cpp/src/tests/cluster_client.cpp
@@ -62,14 +62,14 @@ BOOST_AUTO_TEST_CASE(testWiringReplication) {
ClusterConnections cluster;
BOOST_REQUIRE(cluster.size() > 1);
- Session broker0 = cluster[0]->newSession();
+ Session broker0 = cluster[0]->newSession(ASYNC);
broker0.exchangeDeclare(exchange="ex");
broker0.queueDeclare(queue="q");
broker0.queueBind(exchange="ex", queue="q", routingKey="key");
broker0.close();
for (size_t i = 1; i < cluster.size(); ++i) {
- Session s = cluster[i]->newSession();
+ Session s = cluster[i]->newSession(ASYNC);
s.messageTransfer(content=TransferContent("data", "key", "ex"));
s.messageSubscribe(queue="q", destination="q");
s.messageFlow(destination="q", unit=0, value=1);//messages
diff --git a/cpp/src/tests/latencytest.cpp b/cpp/src/tests/latencytest.cpp
index ddafe0f49b..86200054d8 100644
--- a/cpp/src/tests/latencytest.cpp
+++ b/cpp/src/tests/latencytest.cpp
@@ -156,7 +156,7 @@ public:
Client::Client(const string& q) : queue(q)
{
opts.open(connection);
- session = connection.newSession();
+ session = connection.newSession(ASYNC);
}
void Client::start()
diff --git a/cpp/src/tests/perftest.cpp b/cpp/src/tests/perftest.cpp
index e67acd4465..bc638635da 100644
--- a/cpp/src/tests/perftest.cpp
+++ b/cpp/src/tests/perftest.cpp
@@ -196,7 +196,7 @@ struct Client : public Runnable {
Client() {
opts.open(connection);
- session = connection.newSession();
+ session = connection.newSession(ASYNC);
}
~Client() {
diff --git a/cpp/src/tests/topic_listener.cpp b/cpp/src/tests/topic_listener.cpp
index c78bc4d73b..ec73f3cbe0 100644
--- a/cpp/src/tests/topic_listener.cpp
+++ b/cpp/src/tests/topic_listener.cpp
@@ -101,7 +101,7 @@ int main(int argc, char** argv){
else {
Connection connection(args.trace);
args.open(connection);
- Session_0_10 session = connection.newSession();
+ Session_0_10 session = connection.newSession(ASYNC);
if (args.transactional) {
session.txSelect();
}
diff --git a/cpp/src/tests/topic_publisher.cpp b/cpp/src/tests/topic_publisher.cpp
index 80c9bf6607..24a4fc6752 100644
--- a/cpp/src/tests/topic_publisher.cpp
+++ b/cpp/src/tests/topic_publisher.cpp
@@ -107,7 +107,7 @@ int main(int argc, char** argv) {
else {
Connection connection(args.trace);
args.open(connection);
- Session_0_10 session = connection.newSession();
+ Session_0_10 session = connection.newSession(ASYNC);
if (args.transactional) {
session.txSelect();
}
diff --git a/cpp/src/tests/txtest.cpp b/cpp/src/tests/txtest.cpp
index 4373a9c1e7..f7776dee8d 100644
--- a/cpp/src/tests/txtest.cpp
+++ b/cpp/src/tests/txtest.cpp
@@ -101,7 +101,7 @@ struct Client
Client()
{
opts.open(connection);
- session = connection.newSession();
+ session = connection.newSession(ASYNC);
}
~Client()