diff options
author | Alan Conway <aconway@apache.org> | 2007-01-30 18:20:00 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-01-30 18:20:00 +0000 |
commit | 98ccae7574a18f8d0a1f9e28e86ccfde4541c81f (patch) | |
tree | 528fe0c686b9193e66bdd222d0aee6c4705f34e7 /cpp/lib/client/ClientChannel.cpp | |
parent | 53d097bd7e565d08f902671180be58d4b2a9d843 (diff) | |
download | qpid-python-98ccae7574a18f8d0a1f9e28e86ccfde4541c81f.tar.gz |
* client/* framing/*: fixed client-side request ID processing.
* cpp/tests/InProcessBroker.h: For tests: connect to an in-process
broker directly, bypass the network. Keeps log of client/broker
conversation for verification in test code.
* cpp/tests/FramingTest.cpp (testRequestResponseRoundtrip):
Client/broker round-trip test for request/reponse IDs and response mark.
* APRAcceptor.cpp (APRAcceptor): fixed valgrind uninitialized error.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@501502 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/client/ClientChannel.cpp')
-rw-r--r-- | cpp/lib/client/ClientChannel.cpp | 39 |
1 files changed, 19 insertions, 20 deletions
diff --git a/cpp/lib/client/ClientChannel.cpp b/cpp/lib/client/ClientChannel.cpp index b93596ebfc..a207763aac 100644 --- a/cpp/lib/client/ClientChannel.cpp +++ b/cpp/lib/client/ClientChannel.cpp @@ -85,13 +85,9 @@ void Channel::protocolInit( connection->send(new AMQFrame(0, new ConnectionSecureOkBody(response))); **/ - connection->send( - new AMQFrame( - version, 0, - new ConnectionTuneOkBody( - version, proposal->getChannelMax(), - connection->getMaxFrameSize(), - proposal->getHeartbeat()))); + (new ConnectionTuneOkBody( + version, proposal->getChannelMax(), connection->getMaxFrameSize(), + proposal->getHeartbeat()))->send(context); u_int16_t heartbeat = proposal->getHeartbeat(); connection->connector->setReadTimeout(heartbeat * 2); @@ -100,9 +96,8 @@ void Channel::protocolInit( // Send connection open. std::string capabilities; responses.expect(); - send(new AMQFrame( - version, 0, - new ConnectionOpenBody(version, vhost, capabilities, true))); + (new ConnectionOpenBody(version, vhost, capabilities, true)) + ->send(context); //receive connection.open-ok (or redirect, but ignore that for now //esp. as using force=true). responses.waitForResponse(); @@ -213,7 +208,8 @@ void Channel::cancel(const std::string& tag, bool synch) { if (i != consumers.end()) { Consumer& c = i->second; if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) - send(new BasicAckBody(version, c.lastDeliveryTag, true)); + (new BasicAckBody(version, c.lastDeliveryTag, true)) + ->send(context); sendAndReceiveSync<BasicCancelOkBody>( synch, new BasicCancelBody(version, tag, !synch)); consumers.erase(tag); @@ -231,7 +227,8 @@ void Channel::cancelAll(){ // trying the rest. NB no memory leaks if we do, // ConsumerMap holds values, not pointers. // - send(new BasicAckBody(version, c.lastDeliveryTag, true)); + (new BasicAckBody(version, c.lastDeliveryTag, true)) + ->send(context); } } } @@ -251,9 +248,8 @@ void Channel::retrieve(Message& msg){ bool Channel::get(Message& msg, const Queue& queue, int ackMode) { string name = queue.getName(); - AMQBody::shared_ptr body(new BasicGetBody(version, 0, name, ackMode)); responses.expect(); - send(body); + (new BasicGetBody(version, 0, name, ackMode))->send(context); responses.waitForResponse(); AMQMethodBody::shared_ptr response = responses.getResponse(); if(response->isA<BasicGetOkBody>()) { @@ -276,10 +272,12 @@ bool Channel::get(Message& msg, const Queue& queue, int ackMode) { void Channel::publish(Message& msg, const Exchange& exchange, const std::string& routingKey, bool mandatory, bool immediate){ + // FIXME aconway 2007-01-30: Rework for message class. + string e = exchange.getName(); string key = routingKey; - send(new BasicPublishBody(version, 0, e, key, mandatory, immediate)); + (new BasicPublishBody(version, 0, e, key, mandatory, immediate))->send(context); //break msg up into header frame and content frame(s) and send these string data = msg.getData(); msg.header->setContentSize(data.length()); @@ -428,7 +426,8 @@ void Channel::deliver(Consumer& consumer, Message& msg){ if(++(consumer.count) < prefetch) break; //else drop-through case AUTO_ACK: - send(new BasicAckBody(version, msg.getDeliveryTag(), multiple)); + (new BasicAckBody(version, msg.getDeliveryTag(), multiple)) + ->send(context); consumer.lastDeliveryTag = 0; } } @@ -510,20 +509,20 @@ void Channel::closeInternal() { dispatcher.join(); } -void Channel::sendAndReceive(AMQBody* toSend, ClassId c, MethodId m) +void Channel::sendAndReceive(AMQMethodBody* toSend, ClassId c, MethodId m) { responses.expect(); - send(toSend); + toSend->send(context); responses.receive(c, m); } void Channel::sendAndReceiveSync( - bool sync, AMQBody* body, ClassId c, MethodId m) + bool sync, AMQMethodBody* body, ClassId c, MethodId m) { if(sync) sendAndReceive(body, c, m); else - send(body); + body->send(context); } |