diff options
author | Alan Conway <aconway@apache.org> | 2008-02-20 15:26:05 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-02-20 15:26:05 +0000 |
commit | b9c71c852e2e7bbc1635ff62fabf2469e2175399 (patch) | |
tree | ee3ce9dcd6dc45e76270f6f4d696eb09bfc63895 /cpp/src | |
parent | 615beb132f725ebc4a88d58dc5f3b8af8419f932 (diff) | |
download | qpid-python-b9c71c852e2e7bbc1635ff62fabf2469e2175399.tar.gz |
Added non-optional enum { SYNC, ASYNC } parameter to newSession.
Updated API doc in client/SessionBase.h
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@629503 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/client/Connection.cpp | 7 | ||||
-rw-r--r-- | cpp/src/qpid/client/Connection.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/client/SessionBase.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/client/SessionBase.h | 46 | ||||
-rw-r--r-- | cpp/src/tests/BrokerFixture.h | 4 | ||||
-rw-r--r-- | cpp/src/tests/ClientSessionTest.cpp | 14 | ||||
-rw-r--r-- | cpp/src/tests/client_test.cpp | 2 | ||||
-rw-r--r-- | cpp/src/tests/cluster_client.cpp | 4 | ||||
-rw-r--r-- | cpp/src/tests/latencytest.cpp | 2 | ||||
-rw-r--r-- | cpp/src/tests/perftest.cpp | 2 | ||||
-rw-r--r-- | cpp/src/tests/topic_listener.cpp | 2 | ||||
-rw-r--r-- | cpp/src/tests/topic_publisher.cpp | 2 | ||||
-rw-r--r-- | cpp/src/tests/txtest.cpp | 2 |
13 files changed, 66 insertions, 30 deletions
diff --git a/cpp/src/qpid/client/Connection.cpp b/cpp/src/qpid/client/Connection.cpp index b2508f4a24..26113c1254 100644 --- a/cpp/src/qpid/client/Connection.cpp +++ b/cpp/src/qpid/client/Connection.cpp @@ -70,12 +70,15 @@ void Connection::open( } void Connection::openChannel(Channel& channel) { - channel.open(newSession()); + channel.open(newSession(ASYNC)); } -Session_0_10 Connection::newSession(uint32_t detachedLifetime) { +Session_0_10 Connection::newSession(SynchronousMode sync, + uint32_t detachedLifetime) +{ shared_ptr<SessionCore> core( new SessionCore(impl, ++channelIdCounter, max_frame_size)); + core->setSync(sync); impl->addSession(core); core->open(detachedLifetime); return Session_0_10(core); diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h index 053949a51b..e6bfbddef6 100644 --- a/cpp/src/qpid/client/Connection.h +++ b/cpp/src/qpid/client/Connection.h @@ -39,7 +39,6 @@ namespace qpid { */ namespace client { - /** * \defgroup clientapi Application API for an AMQP client */ @@ -135,7 +134,7 @@ class Connection * that the broker may discard the session state. Default is 0, * meaning the session cannot be resumed. */ - Session_0_10 newSession(uint32_t detachedLifetime=0); + Session_0_10 newSession(SynchronousMode sync, uint32_t detachedLifetime=0); /** * Resume a suspendded session. A session may be resumed diff --git a/cpp/src/qpid/client/SessionBase.cpp b/cpp/src/qpid/client/SessionBase.cpp index 9b6123cbc4..0e1fa67bda 100644 --- a/cpp/src/qpid/client/SessionBase.cpp +++ b/cpp/src/qpid/client/SessionBase.cpp @@ -29,8 +29,14 @@ SessionBase::~SessionBase() {} SessionBase::SessionBase(shared_ptr<SessionCore> core) : impl(core) {} void SessionBase::suspend() { impl->suspend(); } void SessionBase::close() { impl->close(); } + void SessionBase::setSynchronous(bool isSync) { impl->setSync(isSync); } +void SessionBase::setSynchronous(SynchronousMode m) { impl->setSync(m); } bool SessionBase::isSynchronous() const { return impl->isSync(); } +SynchronousMode SessionBase::getSynchronous() const { + return SynchronousMode(impl->isSync()); +} + Execution& SessionBase::getExecution() { return impl->getExecution(); } Uuid SessionBase::getId() const { return impl->getId(); } framing::FrameSet::shared_ptr SessionBase::get() { return impl->get(); } diff --git a/cpp/src/qpid/client/SessionBase.h b/cpp/src/qpid/client/SessionBase.h index 87c0892b61..3565145bb9 100644 --- a/cpp/src/qpid/client/SessionBase.h +++ b/cpp/src/qpid/client/SessionBase.h @@ -45,6 +45,32 @@ using framing::MethodContent; using framing::SequenceNumberSet; using framing::Uuid; +/** \defgroup clientapi Synchronous mode of a session. + * + * SYNC means that Session functions do not return until the remote + * broker has confirmed that the command was executed. + * + * ASYNC means that the client sends commands asynchronously, Session + * functions return immediately. + * + * ASYNC mode gives better performance for high-volume traffic, but + * requires some additional caution: + * + * Session functions return immediately. If the command causes an + * exception on the broker, the exception will be thrown on a + * <em>later</em> function call. + * + * If you need to notify some extenal agent that some actions have + * been taken (e.g. binding queues to exchanages), you must call + * Session::sync() first, to ensure that all the commands are complete. + * + * You can freely switch between modes by calling Session::setSynchronous() + * + * @see Session::sync(), Session::setSynchronous() + */ +enum SynchronousMode { SYNC=true, ASYNC=false }; + + /** * Basic session operations that are not derived from AMQP XML methods. */ @@ -61,20 +87,20 @@ class SessionBase Uuid getId() const; /** - * In synchronous mode, the session sets the sync bit on every - * command and waits for the broker's response before returning. - * Note this gives lower throughput than non-synchronous mode. + * In synchronous mode, wait for the broker's response before + * returning. Note this gives lower throughput than asynchronous + * mode. * - * In non-synchronous mode commands are sent without waiting + * In asynchronous mode commands are sent without waiting * for a respose (you can use the returned Completion object * to wait for completion.) * - *@param if true set the session to synchronous mode, else - * set it to non-synchronous mode. + * @see SynchronousMode */ - void setSynchronous(bool isSync); - + void setSynchronous(SynchronousMode mode); + void setSynchronous(bool set); bool isSynchronous() const; + SynchronousMode getSynchronous() const; /** * Suspend the session, can be resumed on a different connection. @@ -85,8 +111,10 @@ class SessionBase /** Close the session */ void close(); - /** Synchronize with the broker. Wait for all commands issued so far in + /** + * Synchronize with the broker. Wait for all commands issued so far in * the session to complete. + * @see SynchronousMode */ void sync(); diff --git a/cpp/src/tests/BrokerFixture.h b/cpp/src/tests/BrokerFixture.h index b4e4a2082b..9fcdb57a99 100644 --- a/cpp/src/tests/BrokerFixture.h +++ b/cpp/src/tests/BrokerFixture.h @@ -91,8 +91,8 @@ struct SessionFixtureT : BrokerFixture { qpid::client::LocalQueue lq; SessionFixtureT() : connection(broker->getPort()), - session(connection.newSession()), - subs(session) + session(connection.newSession(qpid::client::ASYNC)), + subs(session) {} ~SessionFixtureT() { diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp index 87a4f59999..c299837f86 100644 --- a/cpp/src/tests/ClientSessionTest.cpp +++ b/cpp/src/tests/ClientSessionTest.cpp @@ -103,7 +103,7 @@ struct ClientSessionFixture : public ProxySessionFixture }; BOOST_FIXTURE_TEST_CASE(testQueueQuery, ClientSessionFixture) { - session =connection.newSession(); + session =connection.newSession(ASYNC); session.queueDeclare(queue="my-queue", alternateExchange="amq.fanout", exclusive=true, autoDelete=true); TypedResult<QueueQueryResult> result = session.queueQuery(string("my-queue")); BOOST_CHECK_EQUAL(false, result.get().getDurable()); @@ -114,7 +114,7 @@ BOOST_FIXTURE_TEST_CASE(testQueueQuery, ClientSessionFixture) { BOOST_FIXTURE_TEST_CASE(testTransfer, ClientSessionFixture) { - session=connection.newSession(); + session=connection.newSession(ASYNC); declareSubscribe(); session.messageTransfer(content=TransferContent("my-message", "my-queue")); //get & test the message: @@ -127,7 +127,7 @@ BOOST_FIXTURE_TEST_CASE(testTransfer, ClientSessionFixture) BOOST_FIXTURE_TEST_CASE(testDispatcher, ClientSessionFixture) { - session =connection.newSession(); + session =connection.newSession(ASYNC); declareSubscribe(); size_t count = 100; for (size_t i = 0; i < count; ++i) @@ -142,7 +142,7 @@ BOOST_FIXTURE_TEST_CASE(testDispatcher, ClientSessionFixture) /* FIXME aconway 2008-01-28: hangs BOOST_FIXTURE_TEST_CASE(testDispatcherThread, ClientSessionFixture) { - session =connection.newSession(); + session =connection.newSession(ASYNC); declareSubscribe(); size_t count = 10000; DummyListener listener(session, "my-dest", count); @@ -160,7 +160,7 @@ BOOST_FIXTURE_TEST_CASE(testDispatcherThread, ClientSessionFixture) BOOST_FIXTURE_TEST_CASE(_FIXTURE, ClientSessionFixture) { - session =connection.newSession(0); + session =connection.newSession(ASYNC, 0); session.suspend(); // session has 0 timeout. try { connection.resume(session); @@ -170,7 +170,7 @@ BOOST_FIXTURE_TEST_CASE(_FIXTURE, ClientSessionFixture) BOOST_FIXTURE_TEST_CASE(testUseSuspendedError, ClientSessionFixture) { - session =connection.newSession(60); + session =connection.newSession(ASYNC, 60); session.suspend(); try { session.exchangeQuery(name="amq.fanout"); @@ -180,7 +180,7 @@ BOOST_FIXTURE_TEST_CASE(testUseSuspendedError, ClientSessionFixture) BOOST_FIXTURE_TEST_CASE(testSuspendResume, ClientSessionFixture) { - session =connection.newSession(60); + session =connection.newSession(ASYNC, 60); declareSubscribe(); session.suspend(); // Make sure we are still subscribed after resume. diff --git a/cpp/src/tests/client_test.cpp b/cpp/src/tests/client_test.cpp index eb145272ca..84fc9434de 100644 --- a/cpp/src/tests/client_test.cpp +++ b/cpp/src/tests/client_test.cpp @@ -92,7 +92,7 @@ int main(int argc, char** argv) //Create and open a session on the connection through which //most functionality is exposed: - Session_0_10 session = connection.newSession(); + Session_0_10 session = connection.newSession(ASYNC); if (opts.trace) std::cout << "Opened session." << std::endl; diff --git a/cpp/src/tests/cluster_client.cpp b/cpp/src/tests/cluster_client.cpp index 30b7e38801..cd048d1651 100644 --- a/cpp/src/tests/cluster_client.cpp +++ b/cpp/src/tests/cluster_client.cpp @@ -62,14 +62,14 @@ BOOST_AUTO_TEST_CASE(testWiringReplication) { ClusterConnections cluster; BOOST_REQUIRE(cluster.size() > 1); - Session broker0 = cluster[0]->newSession(); + Session broker0 = cluster[0]->newSession(ASYNC); broker0.exchangeDeclare(exchange="ex"); broker0.queueDeclare(queue="q"); broker0.queueBind(exchange="ex", queue="q", routingKey="key"); broker0.close(); for (size_t i = 1; i < cluster.size(); ++i) { - Session s = cluster[i]->newSession(); + Session s = cluster[i]->newSession(ASYNC); s.messageTransfer(content=TransferContent("data", "key", "ex")); s.messageSubscribe(queue="q", destination="q"); s.messageFlow(destination="q", unit=0, value=1);//messages diff --git a/cpp/src/tests/latencytest.cpp b/cpp/src/tests/latencytest.cpp index ddafe0f49b..86200054d8 100644 --- a/cpp/src/tests/latencytest.cpp +++ b/cpp/src/tests/latencytest.cpp @@ -156,7 +156,7 @@ public: Client::Client(const string& q) : queue(q) { opts.open(connection); - session = connection.newSession(); + session = connection.newSession(ASYNC); } void Client::start() diff --git a/cpp/src/tests/perftest.cpp b/cpp/src/tests/perftest.cpp index e67acd4465..bc638635da 100644 --- a/cpp/src/tests/perftest.cpp +++ b/cpp/src/tests/perftest.cpp @@ -196,7 +196,7 @@ struct Client : public Runnable { Client() { opts.open(connection); - session = connection.newSession(); + session = connection.newSession(ASYNC); } ~Client() { diff --git a/cpp/src/tests/topic_listener.cpp b/cpp/src/tests/topic_listener.cpp index c78bc4d73b..ec73f3cbe0 100644 --- a/cpp/src/tests/topic_listener.cpp +++ b/cpp/src/tests/topic_listener.cpp @@ -101,7 +101,7 @@ int main(int argc, char** argv){ else { Connection connection(args.trace); args.open(connection); - Session_0_10 session = connection.newSession(); + Session_0_10 session = connection.newSession(ASYNC); if (args.transactional) { session.txSelect(); } diff --git a/cpp/src/tests/topic_publisher.cpp b/cpp/src/tests/topic_publisher.cpp index 80c9bf6607..24a4fc6752 100644 --- a/cpp/src/tests/topic_publisher.cpp +++ b/cpp/src/tests/topic_publisher.cpp @@ -107,7 +107,7 @@ int main(int argc, char** argv) { else { Connection connection(args.trace); args.open(connection); - Session_0_10 session = connection.newSession(); + Session_0_10 session = connection.newSession(ASYNC); if (args.transactional) { session.txSelect(); } diff --git a/cpp/src/tests/txtest.cpp b/cpp/src/tests/txtest.cpp index 4373a9c1e7..f7776dee8d 100644 --- a/cpp/src/tests/txtest.cpp +++ b/cpp/src/tests/txtest.cpp @@ -101,7 +101,7 @@ struct Client Client() { opts.open(connection); - session = connection.newSession(); + session = connection.newSession(ASYNC); } ~Client() |