summaryrefslogtreecommitdiff
path: root/qpid/cpp/lib/client/ClientChannel.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/lib/client/ClientChannel.cpp')
-rw-r--r--qpid/cpp/lib/client/ClientChannel.cpp51
1 files changed, 41 insertions, 10 deletions
diff --git a/qpid/cpp/lib/client/ClientChannel.cpp b/qpid/cpp/lib/client/ClientChannel.cpp
index 84aa73e6bc..97e0a394d2 100644
--- a/qpid/cpp/lib/client/ClientChannel.cpp
+++ b/qpid/cpp/lib/client/ClientChannel.cpp
@@ -25,6 +25,9 @@
#include <QpidError.h>
#include <MethodBodyInstances.h>
#include "Connection.h"
+#include "BasicMessageChannel.h"
+// FIXME aconway 2007-03-21:
+//#include "MessageMessageChannel.h"
// FIXME aconway 2007-01-26: Evaluate all throws, ensure consistent
// handling of errors that should close the connection or the channel.
@@ -36,8 +39,10 @@ using namespace qpid::client;
using namespace qpid::framing;
using namespace qpid::sys;
-Channel::Channel(bool _transactional, uint16_t _prefetch) :
- basic(*this),
+Channel::Channel(bool _transactional, u_int16_t _prefetch,
+ MessageChannel* impl) :
+ // FIXME aconway 2007-03-21: MessageMessageChannel
+ messaging(impl ? impl : new BasicMessageChannel(*this)),
connection(0),
prefetch(_prefetch),
transactional(_transactional)
@@ -115,7 +120,7 @@ void Channel::protocolInit(
bool Channel::isOpen() const { return connection; }
void Channel::setQos() {
- basic.setQos();
+ messaging->setQos();
// FIXME aconway 2007-02-22: message
}
@@ -192,7 +197,7 @@ void Channel::handleMethodInContext(
}
try {
switch (method->amqpClassId()) {
- case BasicDeliverBody::CLASS_ID: basic.handle(method); break;
+ case BasicDeliverBody::CLASS_ID: messaging->handle(method); break;
case ChannelCloseBody::CLASS_ID: handleChannel(method); break;
case ConnectionCloseBody::CLASS_ID: handleConnection(method); break;
default: throw UnknownMethod();
@@ -226,11 +231,11 @@ void Channel::handleConnection(AMQMethodBody::shared_ptr method) {
}
void Channel::handleHeader(AMQHeaderBody::shared_ptr body){
- basic.incoming.add(body);
+ messaging->handle(body);
}
void Channel::handleContent(AMQContentBody::shared_ptr body){
- basic.incoming.add(body);
+ messaging->handle(body);
}
void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){
@@ -238,7 +243,7 @@ void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){
}
void Channel::start(){
- basicDispatcher = Thread(basic);
+ dispatcher = Thread(*messaging);
}
// Close called by local application.
@@ -274,13 +279,12 @@ void Channel::peerClose(ChannelCloseBody::shared_ptr) {
void Channel::closeInternal() {
if (isOpen());
{
- basic.cancelAll();
- basic.incoming.shutdown();
+ messaging->close();
connection = 0;
// A 0 response means we are closed.
responses.signalResponse(AMQMethodBody::shared_ptr());
}
- basicDispatcher.join();
+ dispatcher.join();
}
void Channel::sendAndReceive(AMQMethodBody* toSend, ClassId c, MethodId m)
@@ -299,4 +303,31 @@ void Channel::sendAndReceiveSync(
send(body);
}
+void Channel::consume(
+ Queue& queue, std::string& tag, MessageListener* listener,
+ AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields) {
+ messaging->consume(queue, tag, listener, ackMode, noLocal, synch, fields);
+}
+
+void Channel::cancel(const std::string& tag, bool synch) {
+ messaging->cancel(tag, synch);
+}
+
+bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) {
+ return messaging->get(msg, queue, ackMode);
+}
+
+void Channel::publish(const Message& msg, const Exchange& exchange,
+ const std::string& routingKey,
+ bool mandatory, bool immediate) {
+ messaging->publish(msg, exchange, routingKey, mandatory, immediate);
+}
+
+void Channel::setReturnedMessageHandler(ReturnedMessageHandler* handler) {
+ messaging->setReturnedMessageHandler(handler);
+}
+
+void Channel::run() {
+ messaging->run();
+}