summaryrefslogtreecommitdiff
path: root/qpid/cpp/lib/client/ClientChannel.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/lib/client/ClientChannel.cpp')
-rw-r--r--qpid/cpp/lib/client/ClientChannel.cpp73
1 files changed, 40 insertions, 33 deletions
diff --git a/qpid/cpp/lib/client/ClientChannel.cpp b/qpid/cpp/lib/client/ClientChannel.cpp
index 97e0a394d2..98feff9389 100644
--- a/qpid/cpp/lib/client/ClientChannel.cpp
+++ b/qpid/cpp/lib/client/ClientChannel.cpp
@@ -68,7 +68,8 @@ void Channel::protocolInit(
assert(connection);
responses.expect();
connection->connector->init(); // Send ProtocolInit block.
- responses.receive<ConnectionStartBody>();
+ ConnectionStartBody::shared_ptr connectionStart =
+ responses.receive<ConnectionStartBody>();
FieldTable props;
string mechanism("PLAIN");
@@ -77,7 +78,8 @@ void Channel::protocolInit(
ConnectionTuneBody::shared_ptr proposal =
sendAndReceive<ConnectionTuneBody>(
new ConnectionStartOkBody(
- version, responses.getRequestId(), props, mechanism,
+ version, connectionStart->getRequestId(),
+ props, mechanism,
response, locale));
/**
@@ -89,7 +91,8 @@ void Channel::protocolInit(
**/
send(new ConnectionTuneOkBody(
- version, responses.getRequestId(), proposal->getChannelMax(), connection->getMaxFrameSize(),
+ version, proposal->getRequestId(),
+ proposal->getChannelMax(), connection->getMaxFrameSize(),
proposal->getHeartbeat()));
uint16_t heartbeat = proposal->getHeartbeat();
@@ -102,18 +105,17 @@ void Channel::protocolInit(
send(new ConnectionOpenBody(version, vhost, capabilities, true));
//receive connection.open-ok (or redirect, but ignore that for now
//esp. as using force=true).
- responses.waitForResponse();
- if(responses.validate<ConnectionOpenOkBody>()) {
+ AMQMethodBody::shared_ptr openResponse = responses.receive();
+ if(openResponse->isA<ConnectionOpenOkBody>()) {
//ok
- }else if(responses.validate<ConnectionRedirectBody>()){
+ }else if(openResponse->isA<ConnectionRedirectBody>()){
//ignore for now
ConnectionRedirectBody::shared_ptr redirect(
- shared_polymorphic_downcast<ConnectionRedirectBody>(
- responses.getResponse()));
+ shared_polymorphic_downcast<ConnectionRedirectBody>(openResponse));
cout << "Received redirection to " << redirect->getHost()
<< endl;
} else {
- THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response");
+ THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response to Connection.open");
}
}
@@ -121,7 +123,6 @@ bool Channel::isOpen() const { return connection; }
void Channel::setQos() {
messaging->setQos();
- // FIXME aconway 2007-02-22: message
}
void Channel::setPrefetch(uint16_t _prefetch){
@@ -149,18 +150,15 @@ void Channel::deleteExchange(Exchange& exchange, bool synch){
void Channel::declareQueue(Queue& queue, bool synch){
string name = queue.getName();
FieldTable args;
- sendAndReceiveSync<QueueDeclareOkBody>(
- synch,
- new QueueDeclareBody(
- version, 0, name, false/*passive*/, queue.isDurable(),
- queue.isExclusive(), queue.isAutoDelete(), !synch, args));
- if(synch){
- if(queue.getName().length() == 0){
- QueueDeclareOkBody::shared_ptr response =
- shared_polymorphic_downcast<QueueDeclareOkBody>(
- responses.getResponse());
+ QueueDeclareOkBody::shared_ptr response =
+ sendAndReceiveSync<QueueDeclareOkBody>(
+ synch,
+ new QueueDeclareBody(
+ version, 0, name, false/*passive*/, queue.isDurable(),
+ queue.isExclusive(), queue.isAutoDelete(), !synch, args));
+ if(synch) {
+ if(queue.getName().length() == 0)
queue.setName(response->getQueue());
- }
}
}
@@ -191,6 +189,14 @@ void Channel::rollback(){
void Channel::handleMethodInContext(
AMQMethodBody::shared_ptr method, const MethodContext&)
{
+ // TODO aconway 2007-03-23: Special case for consume OK as it
+ // is both an expected response and needs handling in this thread.
+ // Need to review & reationalize the client-side processing model.
+ if (method->isA<BasicConsumeOkBody>()) {
+ messaging->handle(method);
+ responses.signalResponse(method);
+ return;
+ }
if(responses.isWaiting()) {
responses.signalResponse(method);
return;
@@ -204,11 +210,11 @@ void Channel::handleMethodInContext(
}
}
catch (const UnknownMethod&) {
- connection->close(
- 504, "Unknown method",
- method->amqpClassId(), method->amqpMethodId());
- }
-}
+ connection->close(
+ 504, "Unknown method",
+ method->amqpClassId(), method->amqpMethodId());
+ }
+ }
void Channel::handleChannel(AMQMethodBody::shared_ptr method) {
switch (method->amqpMethodId()) {
@@ -272,8 +278,6 @@ void Channel::close(
void Channel::peerClose(ChannelCloseBody::shared_ptr) {
assert(isOpen());
closeInternal();
- // FIXME aconway 2007-01-26: How to throw the proper exception
- // to the application thread?
}
void Channel::closeInternal() {
@@ -287,20 +291,23 @@ void Channel::closeInternal() {
dispatcher.join();
}
-void Channel::sendAndReceive(AMQMethodBody* toSend, ClassId c, MethodId m)
+AMQMethodBody::shared_ptr Channel::sendAndReceive(
+ AMQMethodBody* toSend, ClassId c, MethodId m)
{
responses.expect();
send(toSend);
- responses.receive(c, m);
+ return responses.receive(c, m);
}
-void Channel::sendAndReceiveSync(
+AMQMethodBody::shared_ptr Channel::sendAndReceiveSync(
bool sync, AMQMethodBody* body, ClassId c, MethodId m)
{
if(sync)
- sendAndReceive(body, c, m);
- else
+ return sendAndReceive(body, c, m);
+ else {
send(body);
+ return AMQMethodBody::shared_ptr();
+ }
}
void Channel::consume(