summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/ClientChannel.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-08-02 18:09:48 +0000
committerGordon Sim <gsim@apache.org>2007-08-02 18:09:48 +0000
commit89aa36d093182e9e191c000504c174663932458f (patch)
tree06d7e9a3feb4abdaab74b79c94e4352dfa40adaa /cpp/src/qpid/client/ClientChannel.cpp
parent2290d4ed915f1202bcd6cd50b1a85f27f3eb6cd2 (diff)
downloadqpid-python-89aa36d093182e9e191c000504c174663932458f.tar.gz
Some restructuring of the client code:
* Introduced three separate 'handlers' for the connection, channel and execution 'layers'. * Support for asynchronous retrieval of response or completion status. * Channel methods no longer included in execution layers command id count. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@562212 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/ClientChannel.cpp')
-rw-r--r--cpp/src/qpid/client/ClientChannel.cpp390
1 files changed, 160 insertions, 230 deletions
diff --git a/cpp/src/qpid/client/ClientChannel.cpp b/cpp/src/qpid/client/ClientChannel.cpp
index 19b4726a72..8b85017ba0 100644
--- a/cpp/src/qpid/client/ClientChannel.cpp
+++ b/cpp/src/qpid/client/ClientChannel.cpp
@@ -24,9 +24,12 @@
#include "qpid/sys/Monitor.h"
#include "ClientMessage.h"
#include "qpid/QpidError.h"
-#include "MethodBodyInstances.h"
#include "Connection.h"
-#include "BasicMessageChannel.h"
+#include "ConnectionHandler.h"
+#include "FutureResponse.h"
+#include "MessageListener.h"
+#include <boost/format.hpp>
+#include <boost/bind.hpp>
// FIXME aconway 2007-01-26: Evaluate all throws, ensure consistent
// handling of errors that should close the connection or the channel.
@@ -45,18 +48,13 @@ const std::string empty;
}}
-Channel::Channel(bool _transactional, u_int16_t _prefetch, InteropMode mode) :
+Channel::Channel(bool _transactional, u_int16_t _prefetch) :
connection(0), prefetch(_prefetch), transactional(_transactional), errorCode(200), errorText("Ok"), running(false)
{
- switch (mode) {
- case AMQP_08: messaging.reset(new BasicMessageChannel(*this)); break;
- default: assert(0); QPID_ERROR(INTERNAL_ERROR, "Invalid interop-mode.");
- }
}
Channel::~Channel(){
closeInternal();
- stop();
}
void Channel::open(ChannelId id, Connection& con)
@@ -64,65 +62,15 @@ void Channel::open(ChannelId id, Connection& con)
if (isOpen())
THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel "+id);
connection = &con;
- init(id, con, con.getVersion()); // ChannelAdapter initialization.
- string oob;
- if (id != 0)
- sendAndReceive<ChannelOpenOkBody>(make_shared_ptr(new ChannelOpenBody(version, oob)));
-}
+ channelId = id;
+ //link up handlers:
+ channelHandler.out = boost::bind(&ConnectionHandler::outgoing, &(connection->handler), _1);
+ channelHandler.in = boost::bind(&ExecutionHandler::handle, &executionHandler, _1);
+ executionHandler.out = boost::bind(&ChannelHandler::outgoing, &channelHandler, _1);
+ //set up close notification:
+ channelHandler.onClose = boost::bind(&Channel::peerClose, this, _1, _2);
-void Channel::protocolInit(
- const std::string& uid, const std::string& pwd, const std::string& vhost) {
- assert(connection);
- responses.expect();
- connection->connector->init(); // Send ProtocolInit block.
- ConnectionStartBody::shared_ptr connectionStart =
- responses.receive<ConnectionStartBody>();
-
- FieldTable props;
- string mechanism("PLAIN");
- string response = ((char)0) + uid + ((char)0) + pwd;
- string locale("en_US");
- ConnectionTuneBody::shared_ptr proposal =
- sendAndReceive<ConnectionTuneBody>(
- make_shared_ptr(new ConnectionStartOkBody(
- version, //connectionStart->getRequestId(),
- props, mechanism,
- response, locale)));
-
- /**
- * Assume for now that further challenges will not be required
- //receive connection.secure
- responses.receive(connection_secure));
- //send connection.secure-ok
- connection->send(new AMQFrame(0, new ConnectionSecureOkBody(response)));
- **/
-
- sendCommand(make_shared_ptr(new ConnectionTuneOkBody(
- version, //proposal->getRequestId(),
- proposal->getChannelMax(), connection->getMaxFrameSize(),
- proposal->getHeartbeat())));
-
- uint16_t heartbeat = proposal->getHeartbeat();
- connection->connector->setReadTimeout(heartbeat * 2);
- connection->connector->setWriteTimeout(heartbeat);
-
- // Send connection open.
- std::string capabilities;
- responses.expect();
- sendCommand(make_shared_ptr(new ConnectionOpenBody(version, vhost, capabilities, true)));
- //receive connection.open-ok (or redirect, but ignore that for now
- //esp. as using force=true).
- AMQMethodBody::shared_ptr openResponse = responses.receive();
- if(openResponse->isA<ConnectionOpenOkBody>()) {
- //ok
- }else if(openResponse->isA<ConnectionRedirectBody>()){
- //ignore for now
- ConnectionRedirectBody::shared_ptr redirect(
- shared_polymorphic_downcast<ConnectionRedirectBody>(openResponse));
- QPID_LOG(error, "Ignoring redirect to " << redirect->getHost());
- } else {
- THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response to Connection.open");
- }
+ channelHandler.open(id);
}
bool Channel::isOpen() const {
@@ -131,7 +79,11 @@ bool Channel::isOpen() const {
}
void Channel::setQos() {
- messaging->setQos();
+ executionHandler.send(make_shared_ptr(new BasicQosBody(version, 0, getPrefetch(), false)));
+ if(isTransactional()) {
+ //I think this is wrong! should only send TxSelect once...
+ executionHandler.send(make_shared_ptr(new TxSelectBody(version)));
+ }
}
void Channel::setPrefetch(uint16_t _prefetch){
@@ -143,14 +95,12 @@ void Channel::declareExchange(Exchange& exchange, bool synch){
string name = exchange.getName();
string type = exchange.getType();
FieldTable args;
- send(make_shared_ptr(new ExchangeDeclareBody(version, 0, name, type, empty, false, false, false, args)));
- if (synch) synchWithServer();
+ sendSync(synch, make_shared_ptr(new ExchangeDeclareBody(version, 0, name, type, empty, false, false, false, args)));
}
void Channel::deleteExchange(Exchange& exchange, bool synch){
string name = exchange.getName();
- send(make_shared_ptr(new ExchangeDeleteBody(version, 0, name, false)));
- if (synch) synchWithServer();
+ sendSync(synch, make_shared_ptr(new ExchangeDeleteBody(version, 0, name, false)));
}
void Channel::declareQueue(Queue& queue, bool synch){
@@ -179,131 +129,41 @@ void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch)
void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){
string e = exchange.getName();
string q = queue.getName();
- send(make_shared_ptr(new QueueBindBody(version, 0, q, e, key, args)));
- if (synch) synchWithServer();
+ sendSync(synch, make_shared_ptr(new QueueBindBody(version, 0, q, e, key, args)));
}
void Channel::commit(){
- send(make_shared_ptr(new TxCommitBody(version)));
+ executionHandler.send(make_shared_ptr(new TxCommitBody(version)));
}
void Channel::rollback(){
- send(make_shared_ptr(new TxRollbackBody(version)));
+ executionHandler.send(make_shared_ptr(new TxRollbackBody(version)));
}
-void Channel::handleMethodInContext(
-AMQMethodBody::shared_ptr method, const MethodContext& ctxt)
+void Channel::close()
{
- // Special case for consume OK as it is both an expected response
- // and needs handling in this thread.
- if (method->isA<BasicConsumeOkBody>()) {
- messaging->handle(method);
- responses.signalResponse(method);
- return;
- }
- if(responses.isWaiting()) {
- responses.signalResponse(method);
- return;
- }
- try {
- switch (method->amqpClassId()) {
- case MessageTransferBody::CLASS_ID:
- case BasicGetOkBody::CLASS_ID: messaging->handle(method); break;
- case ChannelCloseBody::CLASS_ID: handleChannel(method, ctxt); break;
- case ConnectionCloseBody::CLASS_ID: handleConnection(method); break;
- case ExecutionCompleteBody::CLASS_ID: handleExecution(method); break;
- default: throw UnknownMethod();
- }
- }
- catch (const UnknownMethod&) {
- connection->close(
- 504, "Unknown method",
- method->amqpClassId(), method->amqpMethodId());
- }
- }
-
-void Channel::handleChannel(AMQMethodBody::shared_ptr method, const MethodContext& /*ctxt*/) {
- switch (method->amqpMethodId()) {
- case ChannelCloseBody::METHOD_ID:
- sendCommand(make_shared_ptr(new ChannelCloseOkBody(version/*, ctxt.getRequestId()*/)));
- peerClose(shared_polymorphic_downcast<ChannelCloseBody>(method));
- return;
- case ChannelFlowBody::METHOD_ID:
- // FIXME aconway 2007-02-22: Not yet implemented.
- return;
- }
- throw UnknownMethod();
-}
-
-void Channel::handleConnection(AMQMethodBody::shared_ptr method) {
- if (method->amqpMethodId() == ConnectionCloseBody::METHOD_ID) {
- connection->close();
- return;
- }
- throw UnknownMethod();
-}
-
-void Channel::handleExecution(AMQMethodBody::shared_ptr method) {
- if (method->amqpMethodId() == ExecutionCompleteBody::METHOD_ID) {
- Monitor::ScopedLock l(outgoingMonitor);
- //record the completion mark:
- outgoing.lwm = shared_polymorphic_downcast<ExecutionCompleteBody>(method)->getCumulativeExecutionMark();
- //TODO: notify anyone waiting for completion notification:
- outgoingMonitor.notifyAll();
- } else{
- throw UnknownMethod();
- }
-}
-
-void Channel::handleHeader(AMQHeaderBody::shared_ptr body){
- messaging->handle(body);
-}
-
-void Channel::handleContent(AMQContentBody::shared_ptr body){
- messaging->handle(body);
-}
-
-void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Channel received heartbeat");
-}
-
-void Channel::start(){
- running = true;
- dispatcher = Thread(*messaging);
-}
-
-// Close called by local application.
-void Channel::close(
- uint16_t code, const std::string& text,
- ClassId classId, MethodId methodId)
-{
- if (isOpen()) {
- try {
- if (getId() != 0) {
- if (code == 200) messaging->cancelAll();
-
- sendAndReceive<ChannelCloseOkBody>(
- make_shared_ptr(new ChannelCloseBody(
- version, code, text, classId, methodId)));
- }
- static_cast<ConnectionForChannel*>(connection)->erase(getId());
- closeInternal();
- } catch (...) {
- static_cast<ConnectionForChannel*>(connection)->erase(getId());
- closeInternal();
- throw;
+ channelHandler.close();
+ {
+ Mutex::ScopedLock l(lock);
+ if (connection);
+ {
+ connection->erase(channelId);
+ connection = 0;
}
}
stop();
}
+
// Channel closed by peer.
-void Channel::peerClose(ChannelCloseBody::shared_ptr reason) {
+void Channel::peerClose(uint16_t code, const std::string& message) {
assert(isOpen());
//record reason:
- errorCode = reason->getReplyCode();
- errorText = reason->getReplyText();
+ errorCode = code;
+ errorText = message;
closeInternal();
+ stop();
+ futures.close(code, message);
}
void Channel::closeInternal() {
@@ -311,26 +171,26 @@ void Channel::closeInternal() {
if (connection);
{
connection = 0;
- messaging->close();
- // A 0 response means we are closed.
- responses.signalResponse(AMQMethodBody::shared_ptr());
}
}
-void Channel::stop() {
- Mutex::ScopedLock l(stopLock);
- if(running) {
- dispatcher.join();
- running = false;
- }
+AMQMethodBody::shared_ptr Channel::sendAndReceive(AMQMethodBody::shared_ptr toSend, ClassId /*c*/, MethodId /*m*/)
+{
+
+ boost::shared_ptr<FutureResponse> fr(futures.createResponse());
+ executionHandler.send(toSend, boost::bind(&FutureResponse::completed, fr), boost::bind(&FutureResponse::received, fr, _1));
+ return fr->getResponse();
}
-AMQMethodBody::shared_ptr Channel::sendAndReceive(
- AMQMethodBody::shared_ptr toSend, ClassId c, MethodId m)
+void Channel::sendSync(bool sync, AMQMethodBody::shared_ptr command)
{
- responses.expect();
- sendCommand(toSend);
- return responses.receive(c, m);
+ if(sync) {
+ boost::shared_ptr<FutureCompletion> fc(futures.createCompletion());
+ executionHandler.send(command, boost::bind(&FutureCompletion::completed, fc));
+ fc->waitForCompletion();
+ } else {
+ executionHandler.send(command);
+ }
}
AMQMethodBody::shared_ptr Channel::sendAndReceiveSync(
@@ -339,68 +199,138 @@ AMQMethodBody::shared_ptr Channel::sendAndReceiveSync(
if(sync)
return sendAndReceive(body, c, m);
else {
- sendCommand(body);
+ executionHandler.send(body);
return AMQMethodBody::shared_ptr();
}
}
void Channel::consume(
- Queue& queue, std::string& tag, MessageListener* listener,
+ Queue& queue, const std::string& tag, MessageListener* listener,
AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields) {
- messaging->consume(queue, tag, listener, ackMode, noLocal, synch, fields);
+
+ if (tag.empty()) {
+ throw Exception("A tag must be specified for a consumer.");
+ }
+ {
+ Mutex::ScopedLock l(lock);
+ ConsumerMap::iterator i = consumers.find(tag);
+ if (i != consumers.end())
+ throw Exception(boost::format("Consumer already exists with tag: '%1%'") % tag);
+ Consumer& c = consumers[tag];
+ c.listener = listener;
+ c.ackMode = ackMode;
+ c.lastDeliveryTag = 0;
+ }
+ sendAndReceiveSync<BasicConsumeOkBody>(
+ synch,
+ make_shared_ptr(new BasicConsumeBody(
+ version, 0, queue.getName(), tag, noLocal,
+ ackMode == NO_ACK, false, !synch,
+ fields ? *fields : FieldTable())));
}
void Channel::cancel(const std::string& tag, bool synch) {
- messaging->cancel(tag, synch);
+ Consumer c;
+ {
+ Mutex::ScopedLock l(lock);
+ ConsumerMap::iterator i = consumers.find(tag);
+ if (i == consumers.end())
+ return;
+ c = i->second;
+ consumers.erase(i);
+ }
+ sendAndReceiveSync<BasicCancelOkBody>(
+ synch, make_shared_ptr(new BasicCancelBody(version, tag, !synch)));
}
bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) {
- bool result = messaging->get(msg, queue, ackMode);
- if (!isOpen()) {
- throw ChannelException(errorCode, errorText);
+
+ AMQMethodBody::shared_ptr request(new BasicGetBody(version, 0, queue.getName(), ackMode));
+ AMQMethodBody::shared_ptr response = sendAndReceive(request);
+ if (response && response->isA<BasicGetEmptyBody>()) {
+ return false;
+ } else {
+ ReceivedContent::shared_ptr content = gets.pop();
+ content->populate(msg);
+ return true;
}
- return result;
}
void Channel::publish(const Message& msg, const Exchange& exchange,
const std::string& routingKey,
bool mandatory, bool immediate) {
- messaging->publish(msg, exchange, routingKey, mandatory, immediate);
-}
-
-void Channel::setReturnedMessageHandler(ReturnedMessageHandler* handler) {
- messaging->setReturnedMessageHandler(handler);
-}
-void Channel::run() {
- messaging->run();
+ const string e = exchange.getName();
+ string key = routingKey;
+
+ executionHandler.sendContent(make_shared_ptr(new BasicPublishBody(version, 0, e, key, mandatory, immediate)),
+ msg, msg.getData(), connection->getMaxFrameSize());//sending framesize here is horrible, fix this!
+ /*
+ // Make a header for the message
+ AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
+ BasicHeaderProperties::copy(
+ *static_cast<BasicHeaderProperties*>(header->getProperties()), msg);
+ header->setContentSize(msg.getData().size());
+
+ executionHandler.send(make_shared_ptr(new BasicPublishBody(version, 0, e, key, mandatory, immediate)));
+ executionHandler.sendContent(header);
+ string data = msg.getData();
+ u_int64_t data_length = data.length();
+ if(data_length > 0){
+ //frame itself uses 8 bytes
+ u_int32_t frag_size = connection->getMaxFrameSize() - 8;
+ if(data_length < frag_size){
+ executionHandler.sendContent(make_shared_ptr(new AMQContentBody(data)));
+ }else{
+ u_int32_t offset = 0;
+ u_int32_t remaining = data_length - offset;
+ while (remaining > 0) {
+ u_int32_t length = remaining > frag_size ? frag_size : remaining;
+ string frag(data.substr(offset, length));
+ executionHandler.sendContent(make_shared_ptr(new AMQContentBody(frag)));
+
+ offset += length;
+ remaining = data_length - offset;
+ }
+ }
+ }
+ */
}
-void Channel::sendCommand(AMQBody::shared_ptr body)
-{
- ++(outgoing.hwm);
- send(body);
+void Channel::start(){
+ running = true;
+ dispatcher = Thread(*this);
}
-bool Channel::waitForCompletion(SequenceNumber poi, Duration timeout)
-{
- AbsTime end;
- if (timeout == 0) {
- end = AbsTime::FarFuture();
- } else {
- end = AbsTime(AbsTime::now(), timeout);
- }
-
- Monitor::ScopedLock l(outgoingMonitor);
- while (end > AbsTime::now() && outgoing.lwm < poi) {
- outgoingMonitor.wait(end);
+void Channel::stop() {
+ executionHandler.received.close();
+ gets.close();
+ Mutex::ScopedLock l(stopLock);
+ if(running) {
+ dispatcher.join();
+ running = false;
}
- return !(outgoing.lwm < poi);
}
-bool Channel::synchWithServer(Duration timeout)
-{
- send(make_shared_ptr(new ExecutionFlushBody(version)));
- return waitForCompletion(outgoing.hwm, timeout);
+void Channel::run() {
+ try {
+ while (true) {
+ ReceivedContent::shared_ptr content = executionHandler.received.pop();
+ //need to dispatch this to the relevant listener:
+ if (content->isA<BasicDeliverBody>()) {
+ ConsumerMap::iterator i = consumers.find(content->as<BasicDeliverBody>()->getConsumerTag());
+ if (i != consumers.end()) {
+ Message msg;
+ content->populate(msg);
+ i->second.listener->received(msg);
+ } else {
+ QPID_LOG(warning, "Dropping message for unrecognised consumer: " << content->getMethod());
+ }
+ } else if (content->isA<BasicGetOkBody>()) {
+ gets.push(content);
+ } else {
+ QPID_LOG(warning, "Dropping unsupported message type: " << content->getMethod());
+ }
+ }
+ } catch (const QueueClosed&) {}
}
-