summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/client/Channel.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/client/Channel.cpp')
-rw-r--r--qpid/cpp/src/qpid/client/Channel.cpp45
1 files changed, 31 insertions, 14 deletions
diff --git a/qpid/cpp/src/qpid/client/Channel.cpp b/qpid/cpp/src/qpid/client/Channel.cpp
index 4af69c8552..ae9f78483d 100644
--- a/qpid/cpp/src/qpid/client/Channel.cpp
+++ b/qpid/cpp/src/qpid/client/Channel.cpp
@@ -26,7 +26,6 @@
#include "Message.h"
#include "Connection.h"
#include "Demux.h"
-#include "FutureResponse.h"
#include "MessageListener.h"
#include "MessageQueue.h"
#include <boost/format.hpp>
@@ -47,9 +46,17 @@ const std::string empty;
class ScopedSync
{
Session& session;
+ const bool change;
+ const bool value;
public:
- ScopedSync(Session& s, bool enabled = true) : session(s) { session.setSynchronous(enabled); }
- ~ScopedSync() { session.setSynchronous(false); }
+ ScopedSync(Session& s, bool desired = true) : session(s), change(s.isSynchronous() != desired), value(desired)
+ {
+ if (change) session.setSynchronous(value);
+ }
+ ~ScopedSync()
+ {
+ if (change) session.setSynchronous(!value);
+ }
};
Channel::Channel(bool _transactional, u_int16_t _prefetch) :
@@ -116,7 +123,7 @@ void Channel::bind(const Exchange& exchange, const Queue& queue, const std::stri
string e = exchange.getName();
string q = queue.getName();
ScopedSync s(session, synch);
- session.queueBind(0, q, e, key, args);
+ session.exchangeBind(q, e, key, args);
}
void Channel::commit(){
@@ -129,7 +136,7 @@ void Channel::rollback(){
void Channel::consume(
Queue& _queue, const std::string& tag, MessageListener* listener,
- AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields) {
+ AckMode ackMode, bool noLocal, bool synch, FieldTable* fields) {
if (tag.empty()) {
throw Exception("A tag must be specified for a consumer.");
@@ -144,13 +151,18 @@ void Channel::consume(
c.ackMode = ackMode;
c.count = 0;
}
- uint8_t confirmMode = ackMode == NO_ACK ? 0 : 1;
+ uint8_t confirmMode = ackMode == NO_ACK ? 1 : 0;
ScopedSync s(session, synch);
- session.messageSubscribe(0, _queue.getName(), tag, noLocal,
+ FieldTable ft;
+ FieldTable* ftptr = fields ? fields : &ft;
+ if (noLocal) {
+ ftptr->setString("qpid.no-local","yes");
+ }
+ session.messageSubscribe(_queue.getName(), tag,
confirmMode, 0/*pre-acquire*/,
- false, fields ? *fields : FieldTable());
+ false, "", 0, *ftptr);
if (!prefetch) {
- session.messageFlowMode(tag, 0/*credit based*/);
+ session.messageSetFlowMode(tag, 0/*credit based*/);
}
//allocate some credit:
@@ -177,17 +189,22 @@ bool Channel::get(Message& msg, const Queue& _queue, AckMode ackMode) {
ScopedDivert handler(tag, session.getExecution().getDemux());
Demux::QueuePtr incoming = handler.getQueue();
- session.messageSubscribe(destination=tag, queue=_queue.getName(), confirmMode=(ackMode == NO_ACK ? 0 : 1));
+ session.messageSubscribe(destination=tag, queue=_queue.getName(), acceptMode=(ackMode == NO_ACK ? 1 : 0));
session.messageFlow(tag, 1/*BYTES*/, 0xFFFFFFFF);
session.messageFlow(tag, 0/*MESSAGES*/, 1);
- Completion status = session.messageFlush(tag);
- status.sync();
+ {
+ ScopedSync s(session);
+ session.messageFlush(tag);
+ }
session.messageCancel(tag);
FrameSet::shared_ptr p;
if (incoming->tryPop(p)) {
msg.populate(*p);
- if (ackMode == AUTO_ACK) msg.acknowledge(session, false, true);
+ if (ackMode == AUTO_ACK) {
+ msg.setSession(session);
+ msg.acknowledge(false, true);
+ }
return true;
}
else
@@ -243,7 +260,7 @@ void Channel::dispatch(FrameSet& content, const std::string& destination)
bool send = i->second.ackMode == AUTO_ACK
|| (prefetch && ++(i->second.count) > (prefetch / 2));
if (send) i->second.count = 0;
- session.getExecution().completed(content.getId(), true, send);
+ session.getExecution().markCompleted(content.getId(), true, send);
}
} else {
QPID_LOG(warning, "Dropping message for unrecognised consumer: " << destination);