diff options
Diffstat (limited to 'qpid/cpp/src/qpid/client/Channel.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/client/Channel.cpp | 45 |
1 files changed, 31 insertions, 14 deletions
diff --git a/qpid/cpp/src/qpid/client/Channel.cpp b/qpid/cpp/src/qpid/client/Channel.cpp index 4af69c8552..ae9f78483d 100644 --- a/qpid/cpp/src/qpid/client/Channel.cpp +++ b/qpid/cpp/src/qpid/client/Channel.cpp @@ -26,7 +26,6 @@ #include "Message.h" #include "Connection.h" #include "Demux.h" -#include "FutureResponse.h" #include "MessageListener.h" #include "MessageQueue.h" #include <boost/format.hpp> @@ -47,9 +46,17 @@ const std::string empty; class ScopedSync { Session& session; + const bool change; + const bool value; public: - ScopedSync(Session& s, bool enabled = true) : session(s) { session.setSynchronous(enabled); } - ~ScopedSync() { session.setSynchronous(false); } + ScopedSync(Session& s, bool desired = true) : session(s), change(s.isSynchronous() != desired), value(desired) + { + if (change) session.setSynchronous(value); + } + ~ScopedSync() + { + if (change) session.setSynchronous(!value); + } }; Channel::Channel(bool _transactional, u_int16_t _prefetch) : @@ -116,7 +123,7 @@ void Channel::bind(const Exchange& exchange, const Queue& queue, const std::stri string e = exchange.getName(); string q = queue.getName(); ScopedSync s(session, synch); - session.queueBind(0, q, e, key, args); + session.exchangeBind(q, e, key, args); } void Channel::commit(){ @@ -129,7 +136,7 @@ void Channel::rollback(){ void Channel::consume( Queue& _queue, const std::string& tag, MessageListener* listener, - AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields) { + AckMode ackMode, bool noLocal, bool synch, FieldTable* fields) { if (tag.empty()) { throw Exception("A tag must be specified for a consumer."); @@ -144,13 +151,18 @@ void Channel::consume( c.ackMode = ackMode; c.count = 0; } - uint8_t confirmMode = ackMode == NO_ACK ? 0 : 1; + uint8_t confirmMode = ackMode == NO_ACK ? 1 : 0; ScopedSync s(session, synch); - session.messageSubscribe(0, _queue.getName(), tag, noLocal, + FieldTable ft; + FieldTable* ftptr = fields ? fields : &ft; + if (noLocal) { + ftptr->setString("qpid.no-local","yes"); + } + session.messageSubscribe(_queue.getName(), tag, confirmMode, 0/*pre-acquire*/, - false, fields ? *fields : FieldTable()); + false, "", 0, *ftptr); if (!prefetch) { - session.messageFlowMode(tag, 0/*credit based*/); + session.messageSetFlowMode(tag, 0/*credit based*/); } //allocate some credit: @@ -177,17 +189,22 @@ bool Channel::get(Message& msg, const Queue& _queue, AckMode ackMode) { ScopedDivert handler(tag, session.getExecution().getDemux()); Demux::QueuePtr incoming = handler.getQueue(); - session.messageSubscribe(destination=tag, queue=_queue.getName(), confirmMode=(ackMode == NO_ACK ? 0 : 1)); + session.messageSubscribe(destination=tag, queue=_queue.getName(), acceptMode=(ackMode == NO_ACK ? 1 : 0)); session.messageFlow(tag, 1/*BYTES*/, 0xFFFFFFFF); session.messageFlow(tag, 0/*MESSAGES*/, 1); - Completion status = session.messageFlush(tag); - status.sync(); + { + ScopedSync s(session); + session.messageFlush(tag); + } session.messageCancel(tag); FrameSet::shared_ptr p; if (incoming->tryPop(p)) { msg.populate(*p); - if (ackMode == AUTO_ACK) msg.acknowledge(session, false, true); + if (ackMode == AUTO_ACK) { + msg.setSession(session); + msg.acknowledge(false, true); + } return true; } else @@ -243,7 +260,7 @@ void Channel::dispatch(FrameSet& content, const std::string& destination) bool send = i->second.ackMode == AUTO_ACK || (prefetch && ++(i->second.count) > (prefetch / 2)); if (send) i->second.count = 0; - session.getExecution().completed(content.getId(), true, send); + session.getExecution().markCompleted(content.getId(), true, send); } } else { QPID_LOG(warning, "Dropping message for unrecognised consumer: " << destination); |