diff options
author | Gordon Sim <gsim@apache.org> | 2010-04-07 19:41:44 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2010-04-07 19:41:44 +0000 |
commit | 00e14b01033ae8c33399bc6ebfd28930498b2533 (patch) | |
tree | 3615da0715bfe4f09bb930f108611f47d3ae794c /cpp | |
parent | 736179f004fbc65bb82f0ceade570375197e29e7 (diff) | |
download | qpid-python-00e14b01033ae8c33399bc6ebfd28930498b2533.tar.gz |
QPID-664: removed flush, added option to make sync non-blocking if so desired
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@931651 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/include/qpid/client/SessionBase_0_10.h | 2 | ||||
-rw-r--r-- | cpp/include/qpid/messaging/Connection.h | 3 | ||||
-rw-r--r-- | cpp/include/qpid/messaging/Session.h | 11 | ||||
-rw-r--r-- | cpp/src/qpid/client/SessionBase_0_10.cpp | 7 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/SenderImpl.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/SessionImpl.cpp | 23 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/SessionImpl.h | 14 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/Session.cpp | 9 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/SessionImpl.h | 3 |
9 files changed, 35 insertions, 39 deletions
diff --git a/cpp/include/qpid/client/SessionBase_0_10.h b/cpp/include/qpid/client/SessionBase_0_10.h index e76019dd4d..6b7ed97df4 100644 --- a/cpp/include/qpid/client/SessionBase_0_10.h +++ b/cpp/include/qpid/client/SessionBase_0_10.h @@ -101,6 +101,8 @@ class SessionBase_0_10 { QPID_CLIENT_EXTERN Connection getConnection(); + /** Send sync request without actually blocking for it**/ + QPID_CLIENT_EXTERN void sendSyncRequest(); protected: boost::shared_ptr<SessionImpl> impl; friend class SessionBase_0_10Access; diff --git a/cpp/include/qpid/messaging/Connection.h b/cpp/include/qpid/messaging/Connection.h index 34a37ab776..933c503635 100644 --- a/cpp/include/qpid/messaging/Connection.h +++ b/cpp/include/qpid/messaging/Connection.h @@ -55,9 +55,6 @@ class Connection : public qpid::messaging::Handle<ConnectionImpl> * sasl-max-ssf * protocol * - * (note also bounds, locale, max-channels and max-framesize, but - * not sure whether those should be documented here) - * * Reconnect behaviour can be controlled through the following options: * * reconnect: true/false (enables/disables reconnect entirely) diff --git a/cpp/include/qpid/messaging/Session.h b/cpp/include/qpid/messaging/Session.h index c20de9079b..95f9832576 100644 --- a/cpp/include/qpid/messaging/Session.h +++ b/cpp/include/qpid/messaging/Session.h @@ -89,8 +89,15 @@ class Session : public qpid::messaging::Handle<SessionImpl> */ QPID_CLIENT_EXTERN void release(Message&); - QPID_CLIENT_EXTERN void sync(); - QPID_CLIENT_EXTERN void flush(); + /** + * Request synchronisation with the server. + * + * @param block if true, this call will block until the server + * confirms completion of all pending operations; if false the + * call will request notifcation from the server but will return + * before receiving it. + */ + QPID_CLIENT_EXTERN void sync(bool block=true); /** * Returns the total number of messages received and waiting to be diff --git a/cpp/src/qpid/client/SessionBase_0_10.cpp b/cpp/src/qpid/client/SessionBase_0_10.cpp index e114b7aacc..6aa13bb579 100644 --- a/cpp/src/qpid/client/SessionBase_0_10.cpp +++ b/cpp/src/qpid/client/SessionBase_0_10.cpp @@ -65,6 +65,13 @@ void SessionBase_0_10::sendCompletion() impl->sendCompletion(); } +void SessionBase_0_10::sendSyncRequest() +{ + ExecutionSyncBody b; + b.setSync(true); + impl->send(b); +} + uint16_t SessionBase_0_10::getChannel() const { return impl->getChannel(); } void SessionBase_0_10::suspend() { impl->suspend(); } diff --git a/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp b/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp index e8c106976f..522c93a552 100644 --- a/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp @@ -43,7 +43,7 @@ void SenderImpl::send(const qpid::messaging::Message& message, bool sync) Send f(*this, &message); while (f.repeat) parent->execute(f); } - if (sync) parent->sync(); + if (sync) parent->sync(true); } void SenderImpl::close() diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp index 9efafb1d16..8f9751a967 100644 --- a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp @@ -51,14 +51,10 @@ namespace amqp0_10 { SessionImpl::SessionImpl(ConnectionImpl& c, bool t) : connection(&c), transactional(t) {} -void SessionImpl::sync() +void SessionImpl::sync(bool block) { - retry<Sync>(); -} - -void SessionImpl::flush() -{ - retry<Flush>(); + if (block) retry<Sync>(); + else execute<NonBlockingSync>(); } void SessionImpl::commit() @@ -82,7 +78,7 @@ void SessionImpl::acknowledge(bool sync_) //message may be redelivered; i.e. the application cannot delete //any state necessary for preventing reprocessing of the message execute<Acknowledge>(); - if (sync_) sync(); + if (sync_) sync(true); } void SessionImpl::reject(qpid::messaging::Message& m) @@ -378,17 +374,12 @@ uint32_t SessionImpl::pendingAckImpl(const std::string* destination) } } -void SessionImpl::syncImpl() -{ - session.sync(); -} - -void SessionImpl::flushImpl() +void SessionImpl::syncImpl(bool block) { - session.flush(); + if (block) session.sync(); + else session.sendSyncRequest(); } - void SessionImpl::commitImpl() { incoming.accept(); diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/cpp/src/qpid/client/amqp0_10/SessionImpl.h index 5f9b23fb14..8b098e65d6 100644 --- a/cpp/src/qpid/client/amqp0_10/SessionImpl.h +++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.h @@ -62,8 +62,7 @@ class SessionImpl : public qpid::messaging::SessionImpl void reject(qpid::messaging::Message&); void release(qpid::messaging::Message&); void close(); - void sync(); - void flush(); + void sync(bool block); qpid::messaging::Sender createSender(const qpid::messaging::Address& address); qpid::messaging::Receiver createReceiver(const qpid::messaging::Address& address); @@ -126,8 +125,7 @@ class SessionImpl : public qpid::messaging::SessionImpl void rejectImpl(qpid::messaging::Message&); void releaseImpl(qpid::messaging::Message&); void closeImpl(); - void syncImpl(); - void flushImpl(); + void syncImpl(bool block); qpid::messaging::Sender createSenderImpl(const qpid::messaging::Address& address); qpid::messaging::Receiver createReceiverImpl(const qpid::messaging::Address& address); uint32_t availableImpl(const std::string* destination); @@ -163,13 +161,13 @@ class SessionImpl : public qpid::messaging::SessionImpl struct Sync : Command { Sync(SessionImpl& i) : Command(i) {} - void operator()() { impl.syncImpl(); } + void operator()() { impl.syncImpl(true); } }; - struct Flush : Command + struct NonBlockingSync : Command { - Flush(SessionImpl& i) : Command(i) {} - void operator()() { impl.flushImpl(); } + NonBlockingSync(SessionImpl& i) : Command(i) {} + void operator()() { impl.syncImpl(false); } }; struct Reject : Command diff --git a/cpp/src/qpid/messaging/Session.cpp b/cpp/src/qpid/messaging/Session.cpp index cc0b528777..eb5e3766b8 100644 --- a/cpp/src/qpid/messaging/Session.cpp +++ b/cpp/src/qpid/messaging/Session.cpp @@ -61,14 +61,9 @@ Receiver Session::createReceiver(const std::string& address) return impl->createReceiver(Address(address)); } -void Session::sync() +void Session::sync(bool block) { - impl->sync(); -} - -void Session::flush() -{ - impl->flush(); + impl->sync(block); } bool Session::nextReceiver(Receiver& receiver, Duration timeout) diff --git a/cpp/src/qpid/messaging/SessionImpl.h b/cpp/src/qpid/messaging/SessionImpl.h index 7f76f8556f..7acead5b04 100644 --- a/cpp/src/qpid/messaging/SessionImpl.h +++ b/cpp/src/qpid/messaging/SessionImpl.h @@ -44,8 +44,7 @@ class SessionImpl : public virtual qpid::RefCounted virtual void reject(Message&) = 0; virtual void release(Message&) = 0; virtual void close() = 0; - virtual void sync() = 0; - virtual void flush() = 0; + virtual void sync(bool block) = 0; virtual Sender createSender(const Address& address) = 0; virtual Receiver createReceiver(const Address& address) = 0; virtual bool nextReceiver(Receiver& receiver, Duration timeout) = 0; |