summaryrefslogtreecommitdiff
path: root/qpid/cpp/lib/client/ClientChannel.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-03-27 15:36:39 +0000
committerAlan Conway <aconway@apache.org>2007-03-27 15:36:39 +0000
commit1d40f897b6850d6c91b807235c4105b815291a49 (patch)
tree129575dfa31d64a649030097ecf38deb786fd95f /qpid/cpp/lib/client/ClientChannel.cpp
parentab77fcc2ed974e8d4ac2a56be62cc2cb3f8e2c11 (diff)
downloadqpid-python-1d40f897b6850d6c91b807235c4105b815291a49.tar.gz
Refactored client::Message to be independent of all Basic class concepts
and client::IncomingMessage to handle 0-9 style references and appends. * cpp/lib/client/ClientMessage.cpp: Made independent of Basic class. * cpp/lib/client/IncomingMessage.cpp: Refactored to handle references/appends. * cpp/lib/client/BasicMessageChannel.cpp: Refactored to use new IncomingMessage Thread safety fixes: * cpp/lib/client/ResponseHandler.h: Remove stateful functions. * cpp/lib/client/ClientChannel.cpp: use new ResponseHandler interface. Minor cleanup: * cpp/lib/common/framing/BasicHeaderProperties.cpp: use DeliveryMode enum. * cpp/tests/HeaderTest.cpp: use DeliveryMode enum. * cpp/tests/MessageTest.cpp: use DeliveryMode enum. * cpp/lib/common/shared_ptr.h: #include <boost/cast.hpp> for convenience. * cpp/lib/common/sys/ThreadSafeQueue.h: Changed "stop" "shutdown" * cpp/lib/common/sys/ProducerConsumer.h: Changed "stop" "shutdown" * cpp/tests/ClientChannelTest.cpp (TestCase): Removed debug couts. * cpp/tests/setup: valgrind --demangle=yes by default. * cpp/tests/topictest: sleep to hack around startup race. * cpp/lib/broker/BrokerQueue.cpp (configure): Fixed memory leak. Removed/updated FIXME comments in: * cpp/lib/broker/BrokerMessage.cpp: * cpp/lib/broker/BrokerMessageBase.h: * cpp/lib/broker/InMemoryContent.cpp: * cpp/lib/common/framing/MethodContext.h: git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@522956 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/lib/client/ClientChannel.cpp')
-rw-r--r--qpid/cpp/lib/client/ClientChannel.cpp73
1 files changed, 40 insertions, 33 deletions
diff --git a/qpid/cpp/lib/client/ClientChannel.cpp b/qpid/cpp/lib/client/ClientChannel.cpp
index 97e0a394d2..98feff9389 100644
--- a/qpid/cpp/lib/client/ClientChannel.cpp
+++ b/qpid/cpp/lib/client/ClientChannel.cpp
@@ -68,7 +68,8 @@ void Channel::protocolInit(
assert(connection);
responses.expect();
connection->connector->init(); // Send ProtocolInit block.
- responses.receive<ConnectionStartBody>();
+ ConnectionStartBody::shared_ptr connectionStart =
+ responses.receive<ConnectionStartBody>();
FieldTable props;
string mechanism("PLAIN");
@@ -77,7 +78,8 @@ void Channel::protocolInit(
ConnectionTuneBody::shared_ptr proposal =
sendAndReceive<ConnectionTuneBody>(
new ConnectionStartOkBody(
- version, responses.getRequestId(), props, mechanism,
+ version, connectionStart->getRequestId(),
+ props, mechanism,
response, locale));
/**
@@ -89,7 +91,8 @@ void Channel::protocolInit(
**/
send(new ConnectionTuneOkBody(
- version, responses.getRequestId(), proposal->getChannelMax(), connection->getMaxFrameSize(),
+ version, proposal->getRequestId(),
+ proposal->getChannelMax(), connection->getMaxFrameSize(),
proposal->getHeartbeat()));
uint16_t heartbeat = proposal->getHeartbeat();
@@ -102,18 +105,17 @@ void Channel::protocolInit(
send(new ConnectionOpenBody(version, vhost, capabilities, true));
//receive connection.open-ok (or redirect, but ignore that for now
//esp. as using force=true).
- responses.waitForResponse();
- if(responses.validate<ConnectionOpenOkBody>()) {
+ AMQMethodBody::shared_ptr openResponse = responses.receive();
+ if(openResponse->isA<ConnectionOpenOkBody>()) {
//ok
- }else if(responses.validate<ConnectionRedirectBody>()){
+ }else if(openResponse->isA<ConnectionRedirectBody>()){
//ignore for now
ConnectionRedirectBody::shared_ptr redirect(
- shared_polymorphic_downcast<ConnectionRedirectBody>(
- responses.getResponse()));
+ shared_polymorphic_downcast<ConnectionRedirectBody>(openResponse));
cout << "Received redirection to " << redirect->getHost()
<< endl;
} else {
- THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response");
+ THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response to Connection.open");
}
}
@@ -121,7 +123,6 @@ bool Channel::isOpen() const { return connection; }
void Channel::setQos() {
messaging->setQos();
- // FIXME aconway 2007-02-22: message
}
void Channel::setPrefetch(uint16_t _prefetch){
@@ -149,18 +150,15 @@ void Channel::deleteExchange(Exchange& exchange, bool synch){
void Channel::declareQueue(Queue& queue, bool synch){
string name = queue.getName();
FieldTable args;
- sendAndReceiveSync<QueueDeclareOkBody>(
- synch,
- new QueueDeclareBody(
- version, 0, name, false/*passive*/, queue.isDurable(),
- queue.isExclusive(), queue.isAutoDelete(), !synch, args));
- if(synch){
- if(queue.getName().length() == 0){
- QueueDeclareOkBody::shared_ptr response =
- shared_polymorphic_downcast<QueueDeclareOkBody>(
- responses.getResponse());
+ QueueDeclareOkBody::shared_ptr response =
+ sendAndReceiveSync<QueueDeclareOkBody>(
+ synch,
+ new QueueDeclareBody(
+ version, 0, name, false/*passive*/, queue.isDurable(),
+ queue.isExclusive(), queue.isAutoDelete(), !synch, args));
+ if(synch) {
+ if(queue.getName().length() == 0)
queue.setName(response->getQueue());
- }
}
}
@@ -191,6 +189,14 @@ void Channel::rollback(){
void Channel::handleMethodInContext(
AMQMethodBody::shared_ptr method, const MethodContext&)
{
+ // TODO aconway 2007-03-23: Special case for consume OK as it
+ // is both an expected response and needs handling in this thread.
+ // Need to review & reationalize the client-side processing model.
+ if (method->isA<BasicConsumeOkBody>()) {
+ messaging->handle(method);
+ responses.signalResponse(method);
+ return;
+ }
if(responses.isWaiting()) {
responses.signalResponse(method);
return;
@@ -204,11 +210,11 @@ void Channel::handleMethodInContext(
}
}
catch (const UnknownMethod&) {
- connection->close(
- 504, "Unknown method",
- method->amqpClassId(), method->amqpMethodId());
- }
-}
+ connection->close(
+ 504, "Unknown method",
+ method->amqpClassId(), method->amqpMethodId());
+ }
+ }
void Channel::handleChannel(AMQMethodBody::shared_ptr method) {
switch (method->amqpMethodId()) {
@@ -272,8 +278,6 @@ void Channel::close(
void Channel::peerClose(ChannelCloseBody::shared_ptr) {
assert(isOpen());
closeInternal();
- // FIXME aconway 2007-01-26: How to throw the proper exception
- // to the application thread?
}
void Channel::closeInternal() {
@@ -287,20 +291,23 @@ void Channel::closeInternal() {
dispatcher.join();
}
-void Channel::sendAndReceive(AMQMethodBody* toSend, ClassId c, MethodId m)
+AMQMethodBody::shared_ptr Channel::sendAndReceive(
+ AMQMethodBody* toSend, ClassId c, MethodId m)
{
responses.expect();
send(toSend);
- responses.receive(c, m);
+ return responses.receive(c, m);
}
-void Channel::sendAndReceiveSync(
+AMQMethodBody::shared_ptr Channel::sendAndReceiveSync(
bool sync, AMQMethodBody* body, ClassId c, MethodId m)
{
if(sync)
- sendAndReceive(body, c, m);
- else
+ return sendAndReceive(body, c, m);
+ else {
send(body);
+ return AMQMethodBody::shared_ptr();
+ }
}
void Channel::consume(