diff options
Diffstat (limited to 'qpid/cpp/lib/client/ClientChannel.cpp')
-rw-r--r-- | qpid/cpp/lib/client/ClientChannel.cpp | 51 |
1 files changed, 41 insertions, 10 deletions
diff --git a/qpid/cpp/lib/client/ClientChannel.cpp b/qpid/cpp/lib/client/ClientChannel.cpp index 84aa73e6bc..97e0a394d2 100644 --- a/qpid/cpp/lib/client/ClientChannel.cpp +++ b/qpid/cpp/lib/client/ClientChannel.cpp @@ -25,6 +25,9 @@ #include <QpidError.h> #include <MethodBodyInstances.h> #include "Connection.h" +#include "BasicMessageChannel.h" +// FIXME aconway 2007-03-21: +//#include "MessageMessageChannel.h" // FIXME aconway 2007-01-26: Evaluate all throws, ensure consistent // handling of errors that should close the connection or the channel. @@ -36,8 +39,10 @@ using namespace qpid::client; using namespace qpid::framing; using namespace qpid::sys; -Channel::Channel(bool _transactional, uint16_t _prefetch) : - basic(*this), +Channel::Channel(bool _transactional, u_int16_t _prefetch, + MessageChannel* impl) : + // FIXME aconway 2007-03-21: MessageMessageChannel + messaging(impl ? impl : new BasicMessageChannel(*this)), connection(0), prefetch(_prefetch), transactional(_transactional) @@ -115,7 +120,7 @@ void Channel::protocolInit( bool Channel::isOpen() const { return connection; } void Channel::setQos() { - basic.setQos(); + messaging->setQos(); // FIXME aconway 2007-02-22: message } @@ -192,7 +197,7 @@ void Channel::handleMethodInContext( } try { switch (method->amqpClassId()) { - case BasicDeliverBody::CLASS_ID: basic.handle(method); break; + case BasicDeliverBody::CLASS_ID: messaging->handle(method); break; case ChannelCloseBody::CLASS_ID: handleChannel(method); break; case ConnectionCloseBody::CLASS_ID: handleConnection(method); break; default: throw UnknownMethod(); @@ -226,11 +231,11 @@ void Channel::handleConnection(AMQMethodBody::shared_ptr method) { } void Channel::handleHeader(AMQHeaderBody::shared_ptr body){ - basic.incoming.add(body); + messaging->handle(body); } void Channel::handleContent(AMQContentBody::shared_ptr body){ - basic.incoming.add(body); + messaging->handle(body); } void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){ @@ -238,7 +243,7 @@ void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){ } void Channel::start(){ - basicDispatcher = Thread(basic); + dispatcher = Thread(*messaging); } // Close called by local application. @@ -274,13 +279,12 @@ void Channel::peerClose(ChannelCloseBody::shared_ptr) { void Channel::closeInternal() { if (isOpen()); { - basic.cancelAll(); - basic.incoming.shutdown(); + messaging->close(); connection = 0; // A 0 response means we are closed. responses.signalResponse(AMQMethodBody::shared_ptr()); } - basicDispatcher.join(); + dispatcher.join(); } void Channel::sendAndReceive(AMQMethodBody* toSend, ClassId c, MethodId m) @@ -299,4 +303,31 @@ void Channel::sendAndReceiveSync( send(body); } +void Channel::consume( + Queue& queue, std::string& tag, MessageListener* listener, + AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields) { + messaging->consume(queue, tag, listener, ackMode, noLocal, synch, fields); +} + +void Channel::cancel(const std::string& tag, bool synch) { + messaging->cancel(tag, synch); +} + +bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) { + return messaging->get(msg, queue, ackMode); +} + +void Channel::publish(const Message& msg, const Exchange& exchange, + const std::string& routingKey, + bool mandatory, bool immediate) { + messaging->publish(msg, exchange, routingKey, mandatory, immediate); +} + +void Channel::setReturnedMessageHandler(ReturnedMessageHandler* handler) { + messaging->setReturnedMessageHandler(handler); +} + +void Channel::run() { + messaging->run(); +} |