diff options
author | Gordon Sim <gsim@apache.org> | 2007-08-02 18:09:48 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-08-02 18:09:48 +0000 |
commit | 89aa36d093182e9e191c000504c174663932458f (patch) | |
tree | 06d7e9a3feb4abdaab74b79c94e4352dfa40adaa /cpp/src/qpid/client/ClientChannel.cpp | |
parent | 2290d4ed915f1202bcd6cd50b1a85f27f3eb6cd2 (diff) | |
download | qpid-python-89aa36d093182e9e191c000504c174663932458f.tar.gz |
Some restructuring of the client code:
* Introduced three separate 'handlers' for the connection, channel and execution 'layers'.
* Support for asynchronous retrieval of response or completion status.
* Channel methods no longer included in execution layers command id count.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@562212 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/ClientChannel.cpp')
-rw-r--r-- | cpp/src/qpid/client/ClientChannel.cpp | 390 |
1 files changed, 160 insertions, 230 deletions
diff --git a/cpp/src/qpid/client/ClientChannel.cpp b/cpp/src/qpid/client/ClientChannel.cpp index 19b4726a72..8b85017ba0 100644 --- a/cpp/src/qpid/client/ClientChannel.cpp +++ b/cpp/src/qpid/client/ClientChannel.cpp @@ -24,9 +24,12 @@ #include "qpid/sys/Monitor.h" #include "ClientMessage.h" #include "qpid/QpidError.h" -#include "MethodBodyInstances.h" #include "Connection.h" -#include "BasicMessageChannel.h" +#include "ConnectionHandler.h" +#include "FutureResponse.h" +#include "MessageListener.h" +#include <boost/format.hpp> +#include <boost/bind.hpp> // FIXME aconway 2007-01-26: Evaluate all throws, ensure consistent // handling of errors that should close the connection or the channel. @@ -45,18 +48,13 @@ const std::string empty; }} -Channel::Channel(bool _transactional, u_int16_t _prefetch, InteropMode mode) : +Channel::Channel(bool _transactional, u_int16_t _prefetch) : connection(0), prefetch(_prefetch), transactional(_transactional), errorCode(200), errorText("Ok"), running(false) { - switch (mode) { - case AMQP_08: messaging.reset(new BasicMessageChannel(*this)); break; - default: assert(0); QPID_ERROR(INTERNAL_ERROR, "Invalid interop-mode."); - } } Channel::~Channel(){ closeInternal(); - stop(); } void Channel::open(ChannelId id, Connection& con) @@ -64,65 +62,15 @@ void Channel::open(ChannelId id, Connection& con) if (isOpen()) THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel "+id); connection = &con; - init(id, con, con.getVersion()); // ChannelAdapter initialization. - string oob; - if (id != 0) - sendAndReceive<ChannelOpenOkBody>(make_shared_ptr(new ChannelOpenBody(version, oob))); -} + channelId = id; + //link up handlers: + channelHandler.out = boost::bind(&ConnectionHandler::outgoing, &(connection->handler), _1); + channelHandler.in = boost::bind(&ExecutionHandler::handle, &executionHandler, _1); + executionHandler.out = boost::bind(&ChannelHandler::outgoing, &channelHandler, _1); + //set up close notification: + channelHandler.onClose = boost::bind(&Channel::peerClose, this, _1, _2); -void Channel::protocolInit( - const std::string& uid, const std::string& pwd, const std::string& vhost) { - assert(connection); - responses.expect(); - connection->connector->init(); // Send ProtocolInit block. - ConnectionStartBody::shared_ptr connectionStart = - responses.receive<ConnectionStartBody>(); - - FieldTable props; - string mechanism("PLAIN"); - string response = ((char)0) + uid + ((char)0) + pwd; - string locale("en_US"); - ConnectionTuneBody::shared_ptr proposal = - sendAndReceive<ConnectionTuneBody>( - make_shared_ptr(new ConnectionStartOkBody( - version, //connectionStart->getRequestId(), - props, mechanism, - response, locale))); - - /** - * Assume for now that further challenges will not be required - //receive connection.secure - responses.receive(connection_secure)); - //send connection.secure-ok - connection->send(new AMQFrame(0, new ConnectionSecureOkBody(response))); - **/ - - sendCommand(make_shared_ptr(new ConnectionTuneOkBody( - version, //proposal->getRequestId(), - proposal->getChannelMax(), connection->getMaxFrameSize(), - proposal->getHeartbeat()))); - - uint16_t heartbeat = proposal->getHeartbeat(); - connection->connector->setReadTimeout(heartbeat * 2); - connection->connector->setWriteTimeout(heartbeat); - - // Send connection open. - std::string capabilities; - responses.expect(); - sendCommand(make_shared_ptr(new ConnectionOpenBody(version, vhost, capabilities, true))); - //receive connection.open-ok (or redirect, but ignore that for now - //esp. as using force=true). - AMQMethodBody::shared_ptr openResponse = responses.receive(); - if(openResponse->isA<ConnectionOpenOkBody>()) { - //ok - }else if(openResponse->isA<ConnectionRedirectBody>()){ - //ignore for now - ConnectionRedirectBody::shared_ptr redirect( - shared_polymorphic_downcast<ConnectionRedirectBody>(openResponse)); - QPID_LOG(error, "Ignoring redirect to " << redirect->getHost()); - } else { - THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response to Connection.open"); - } + channelHandler.open(id); } bool Channel::isOpen() const { @@ -131,7 +79,11 @@ bool Channel::isOpen() const { } void Channel::setQos() { - messaging->setQos(); + executionHandler.send(make_shared_ptr(new BasicQosBody(version, 0, getPrefetch(), false))); + if(isTransactional()) { + //I think this is wrong! should only send TxSelect once... + executionHandler.send(make_shared_ptr(new TxSelectBody(version))); + } } void Channel::setPrefetch(uint16_t _prefetch){ @@ -143,14 +95,12 @@ void Channel::declareExchange(Exchange& exchange, bool synch){ string name = exchange.getName(); string type = exchange.getType(); FieldTable args; - send(make_shared_ptr(new ExchangeDeclareBody(version, 0, name, type, empty, false, false, false, args))); - if (synch) synchWithServer(); + sendSync(synch, make_shared_ptr(new ExchangeDeclareBody(version, 0, name, type, empty, false, false, false, args))); } void Channel::deleteExchange(Exchange& exchange, bool synch){ string name = exchange.getName(); - send(make_shared_ptr(new ExchangeDeleteBody(version, 0, name, false))); - if (synch) synchWithServer(); + sendSync(synch, make_shared_ptr(new ExchangeDeleteBody(version, 0, name, false))); } void Channel::declareQueue(Queue& queue, bool synch){ @@ -179,131 +129,41 @@ void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch) void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){ string e = exchange.getName(); string q = queue.getName(); - send(make_shared_ptr(new QueueBindBody(version, 0, q, e, key, args))); - if (synch) synchWithServer(); + sendSync(synch, make_shared_ptr(new QueueBindBody(version, 0, q, e, key, args))); } void Channel::commit(){ - send(make_shared_ptr(new TxCommitBody(version))); + executionHandler.send(make_shared_ptr(new TxCommitBody(version))); } void Channel::rollback(){ - send(make_shared_ptr(new TxRollbackBody(version))); + executionHandler.send(make_shared_ptr(new TxRollbackBody(version))); } -void Channel::handleMethodInContext( -AMQMethodBody::shared_ptr method, const MethodContext& ctxt) +void Channel::close() { - // Special case for consume OK as it is both an expected response - // and needs handling in this thread. - if (method->isA<BasicConsumeOkBody>()) { - messaging->handle(method); - responses.signalResponse(method); - return; - } - if(responses.isWaiting()) { - responses.signalResponse(method); - return; - } - try { - switch (method->amqpClassId()) { - case MessageTransferBody::CLASS_ID: - case BasicGetOkBody::CLASS_ID: messaging->handle(method); break; - case ChannelCloseBody::CLASS_ID: handleChannel(method, ctxt); break; - case ConnectionCloseBody::CLASS_ID: handleConnection(method); break; - case ExecutionCompleteBody::CLASS_ID: handleExecution(method); break; - default: throw UnknownMethod(); - } - } - catch (const UnknownMethod&) { - connection->close( - 504, "Unknown method", - method->amqpClassId(), method->amqpMethodId()); - } - } - -void Channel::handleChannel(AMQMethodBody::shared_ptr method, const MethodContext& /*ctxt*/) { - switch (method->amqpMethodId()) { - case ChannelCloseBody::METHOD_ID: - sendCommand(make_shared_ptr(new ChannelCloseOkBody(version/*, ctxt.getRequestId()*/))); - peerClose(shared_polymorphic_downcast<ChannelCloseBody>(method)); - return; - case ChannelFlowBody::METHOD_ID: - // FIXME aconway 2007-02-22: Not yet implemented. - return; - } - throw UnknownMethod(); -} - -void Channel::handleConnection(AMQMethodBody::shared_ptr method) { - if (method->amqpMethodId() == ConnectionCloseBody::METHOD_ID) { - connection->close(); - return; - } - throw UnknownMethod(); -} - -void Channel::handleExecution(AMQMethodBody::shared_ptr method) { - if (method->amqpMethodId() == ExecutionCompleteBody::METHOD_ID) { - Monitor::ScopedLock l(outgoingMonitor); - //record the completion mark: - outgoing.lwm = shared_polymorphic_downcast<ExecutionCompleteBody>(method)->getCumulativeExecutionMark(); - //TODO: notify anyone waiting for completion notification: - outgoingMonitor.notifyAll(); - } else{ - throw UnknownMethod(); - } -} - -void Channel::handleHeader(AMQHeaderBody::shared_ptr body){ - messaging->handle(body); -} - -void Channel::handleContent(AMQContentBody::shared_ptr body){ - messaging->handle(body); -} - -void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){ - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Channel received heartbeat"); -} - -void Channel::start(){ - running = true; - dispatcher = Thread(*messaging); -} - -// Close called by local application. -void Channel::close( - uint16_t code, const std::string& text, - ClassId classId, MethodId methodId) -{ - if (isOpen()) { - try { - if (getId() != 0) { - if (code == 200) messaging->cancelAll(); - - sendAndReceive<ChannelCloseOkBody>( - make_shared_ptr(new ChannelCloseBody( - version, code, text, classId, methodId))); - } - static_cast<ConnectionForChannel*>(connection)->erase(getId()); - closeInternal(); - } catch (...) { - static_cast<ConnectionForChannel*>(connection)->erase(getId()); - closeInternal(); - throw; + channelHandler.close(); + { + Mutex::ScopedLock l(lock); + if (connection); + { + connection->erase(channelId); + connection = 0; } } stop(); } + // Channel closed by peer. -void Channel::peerClose(ChannelCloseBody::shared_ptr reason) { +void Channel::peerClose(uint16_t code, const std::string& message) { assert(isOpen()); //record reason: - errorCode = reason->getReplyCode(); - errorText = reason->getReplyText(); + errorCode = code; + errorText = message; closeInternal(); + stop(); + futures.close(code, message); } void Channel::closeInternal() { @@ -311,26 +171,26 @@ void Channel::closeInternal() { if (connection); { connection = 0; - messaging->close(); - // A 0 response means we are closed. - responses.signalResponse(AMQMethodBody::shared_ptr()); } } -void Channel::stop() { - Mutex::ScopedLock l(stopLock); - if(running) { - dispatcher.join(); - running = false; - } +AMQMethodBody::shared_ptr Channel::sendAndReceive(AMQMethodBody::shared_ptr toSend, ClassId /*c*/, MethodId /*m*/) +{ + + boost::shared_ptr<FutureResponse> fr(futures.createResponse()); + executionHandler.send(toSend, boost::bind(&FutureResponse::completed, fr), boost::bind(&FutureResponse::received, fr, _1)); + return fr->getResponse(); } -AMQMethodBody::shared_ptr Channel::sendAndReceive( - AMQMethodBody::shared_ptr toSend, ClassId c, MethodId m) +void Channel::sendSync(bool sync, AMQMethodBody::shared_ptr command) { - responses.expect(); - sendCommand(toSend); - return responses.receive(c, m); + if(sync) { + boost::shared_ptr<FutureCompletion> fc(futures.createCompletion()); + executionHandler.send(command, boost::bind(&FutureCompletion::completed, fc)); + fc->waitForCompletion(); + } else { + executionHandler.send(command); + } } AMQMethodBody::shared_ptr Channel::sendAndReceiveSync( @@ -339,68 +199,138 @@ AMQMethodBody::shared_ptr Channel::sendAndReceiveSync( if(sync) return sendAndReceive(body, c, m); else { - sendCommand(body); + executionHandler.send(body); return AMQMethodBody::shared_ptr(); } } void Channel::consume( - Queue& queue, std::string& tag, MessageListener* listener, + Queue& queue, const std::string& tag, MessageListener* listener, AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields) { - messaging->consume(queue, tag, listener, ackMode, noLocal, synch, fields); + + if (tag.empty()) { + throw Exception("A tag must be specified for a consumer."); + } + { + Mutex::ScopedLock l(lock); + ConsumerMap::iterator i = consumers.find(tag); + if (i != consumers.end()) + throw Exception(boost::format("Consumer already exists with tag: '%1%'") % tag); + Consumer& c = consumers[tag]; + c.listener = listener; + c.ackMode = ackMode; + c.lastDeliveryTag = 0; + } + sendAndReceiveSync<BasicConsumeOkBody>( + synch, + make_shared_ptr(new BasicConsumeBody( + version, 0, queue.getName(), tag, noLocal, + ackMode == NO_ACK, false, !synch, + fields ? *fields : FieldTable()))); } void Channel::cancel(const std::string& tag, bool synch) { - messaging->cancel(tag, synch); + Consumer c; + { + Mutex::ScopedLock l(lock); + ConsumerMap::iterator i = consumers.find(tag); + if (i == consumers.end()) + return; + c = i->second; + consumers.erase(i); + } + sendAndReceiveSync<BasicCancelOkBody>( + synch, make_shared_ptr(new BasicCancelBody(version, tag, !synch))); } bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) { - bool result = messaging->get(msg, queue, ackMode); - if (!isOpen()) { - throw ChannelException(errorCode, errorText); + + AMQMethodBody::shared_ptr request(new BasicGetBody(version, 0, queue.getName(), ackMode)); + AMQMethodBody::shared_ptr response = sendAndReceive(request); + if (response && response->isA<BasicGetEmptyBody>()) { + return false; + } else { + ReceivedContent::shared_ptr content = gets.pop(); + content->populate(msg); + return true; } - return result; } void Channel::publish(const Message& msg, const Exchange& exchange, const std::string& routingKey, bool mandatory, bool immediate) { - messaging->publish(msg, exchange, routingKey, mandatory, immediate); -} - -void Channel::setReturnedMessageHandler(ReturnedMessageHandler* handler) { - messaging->setReturnedMessageHandler(handler); -} -void Channel::run() { - messaging->run(); + const string e = exchange.getName(); + string key = routingKey; + + executionHandler.sendContent(make_shared_ptr(new BasicPublishBody(version, 0, e, key, mandatory, immediate)), + msg, msg.getData(), connection->getMaxFrameSize());//sending framesize here is horrible, fix this! + /* + // Make a header for the message + AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); + BasicHeaderProperties::copy( + *static_cast<BasicHeaderProperties*>(header->getProperties()), msg); + header->setContentSize(msg.getData().size()); + + executionHandler.send(make_shared_ptr(new BasicPublishBody(version, 0, e, key, mandatory, immediate))); + executionHandler.sendContent(header); + string data = msg.getData(); + u_int64_t data_length = data.length(); + if(data_length > 0){ + //frame itself uses 8 bytes + u_int32_t frag_size = connection->getMaxFrameSize() - 8; + if(data_length < frag_size){ + executionHandler.sendContent(make_shared_ptr(new AMQContentBody(data))); + }else{ + u_int32_t offset = 0; + u_int32_t remaining = data_length - offset; + while (remaining > 0) { + u_int32_t length = remaining > frag_size ? frag_size : remaining; + string frag(data.substr(offset, length)); + executionHandler.sendContent(make_shared_ptr(new AMQContentBody(frag))); + + offset += length; + remaining = data_length - offset; + } + } + } + */ } -void Channel::sendCommand(AMQBody::shared_ptr body) -{ - ++(outgoing.hwm); - send(body); +void Channel::start(){ + running = true; + dispatcher = Thread(*this); } -bool Channel::waitForCompletion(SequenceNumber poi, Duration timeout) -{ - AbsTime end; - if (timeout == 0) { - end = AbsTime::FarFuture(); - } else { - end = AbsTime(AbsTime::now(), timeout); - } - - Monitor::ScopedLock l(outgoingMonitor); - while (end > AbsTime::now() && outgoing.lwm < poi) { - outgoingMonitor.wait(end); +void Channel::stop() { + executionHandler.received.close(); + gets.close(); + Mutex::ScopedLock l(stopLock); + if(running) { + dispatcher.join(); + running = false; } - return !(outgoing.lwm < poi); } -bool Channel::synchWithServer(Duration timeout) -{ - send(make_shared_ptr(new ExecutionFlushBody(version))); - return waitForCompletion(outgoing.hwm, timeout); +void Channel::run() { + try { + while (true) { + ReceivedContent::shared_ptr content = executionHandler.received.pop(); + //need to dispatch this to the relevant listener: + if (content->isA<BasicDeliverBody>()) { + ConsumerMap::iterator i = consumers.find(content->as<BasicDeliverBody>()->getConsumerTag()); + if (i != consumers.end()) { + Message msg; + content->populate(msg); + i->second.listener->received(msg); + } else { + QPID_LOG(warning, "Dropping message for unrecognised consumer: " << content->getMethod()); + } + } else if (content->isA<BasicGetOkBody>()) { + gets.push(content); + } else { + QPID_LOG(warning, "Dropping unsupported message type: " << content->getMethod()); + } + } + } catch (const QueueClosed&) {} } - |