summaryrefslogtreecommitdiff
path: root/cpp/lib/client
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-03-27 15:36:39 +0000
committerAlan Conway <aconway@apache.org>2007-03-27 15:36:39 +0000
commit847ee577e23fbdd2175709a08a7160e8b2c1f464 (patch)
treee4962c5246c91a08ef635f2c68e06b82cfb100ee /cpp/lib/client
parentfb14a2042dd5bdae5a5c79b8cd4f1ad87e59bee1 (diff)
downloadqpid-python-847ee577e23fbdd2175709a08a7160e8b2c1f464.tar.gz
Refactored client::Message to be independent of all Basic class concepts
and client::IncomingMessage to handle 0-9 style references and appends. * cpp/lib/client/ClientMessage.cpp: Made independent of Basic class. * cpp/lib/client/IncomingMessage.cpp: Refactored to handle references/appends. * cpp/lib/client/BasicMessageChannel.cpp: Refactored to use new IncomingMessage Thread safety fixes: * cpp/lib/client/ResponseHandler.h: Remove stateful functions. * cpp/lib/client/ClientChannel.cpp: use new ResponseHandler interface. Minor cleanup: * cpp/lib/common/framing/BasicHeaderProperties.cpp: use DeliveryMode enum. * cpp/tests/HeaderTest.cpp: use DeliveryMode enum. * cpp/tests/MessageTest.cpp: use DeliveryMode enum. * cpp/lib/common/shared_ptr.h: #include <boost/cast.hpp> for convenience. * cpp/lib/common/sys/ThreadSafeQueue.h: Changed "stop" "shutdown" * cpp/lib/common/sys/ProducerConsumer.h: Changed "stop" "shutdown" * cpp/tests/ClientChannelTest.cpp (TestCase): Removed debug couts. * cpp/tests/setup: valgrind --demangle=yes by default. * cpp/tests/topictest: sleep to hack around startup race. * cpp/lib/broker/BrokerQueue.cpp (configure): Fixed memory leak. Removed/updated FIXME comments in: * cpp/lib/broker/BrokerMessage.cpp: * cpp/lib/broker/BrokerMessageBase.h: * cpp/lib/broker/InMemoryContent.cpp: * cpp/lib/common/framing/MethodContext.h: git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@522956 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/client')
-rw-r--r--cpp/lib/client/BasicMessageChannel.cpp276
-rw-r--r--cpp/lib/client/BasicMessageChannel.h8
-rw-r--r--cpp/lib/client/ClientChannel.cpp73
-rw-r--r--cpp/lib/client/ClientChannel.h15
-rw-r--r--cpp/lib/client/ClientMessage.cpp161
-rw-r--r--cpp/lib/client/ClientMessage.h75
-rw-r--r--cpp/lib/client/IncomingMessage.cpp204
-rw-r--r--cpp/lib/client/IncomingMessage.h131
-rw-r--r--cpp/lib/client/Makefile.am1
-rw-r--r--cpp/lib/client/ResponseHandler.cpp57
-rw-r--r--cpp/lib/client/ResponseHandler.h51
11 files changed, 474 insertions, 578 deletions
diff --git a/cpp/lib/client/BasicMessageChannel.cpp b/cpp/lib/client/BasicMessageChannel.cpp
index 012a55b9ea..d6929965ee 100644
--- a/cpp/lib/client/BasicMessageChannel.cpp
+++ b/cpp/lib/client/BasicMessageChannel.cpp
@@ -15,7 +15,6 @@
* limitations under the License.
*
*/
-#include <iostream>
#include "BasicMessageChannel.h"
#include "AMQMethodBody.h"
#include "ClientChannel.h"
@@ -23,39 +22,93 @@
#include "MessageListener.h"
#include "framing/FieldTable.h"
#include "Connection.h"
-
-using namespace std;
+#include <queue>
+#include <iostream>
+#include <boost/format.hpp>
+#include <boost/variant.hpp>
namespace qpid {
namespace client {
+using namespace std;
using namespace sys;
using namespace framing;
+using boost::format;
+
+namespace {
+
+// Destination name constants
+const std::string BASIC_GET("__basic_get__");
+const std::string BASIC_RETURN("__basic_return__");
+
+// Reference name constant
+const std::string BASIC_REF("__basic_reference__");
+}
+
+class BasicMessageChannel::WaitableDestination :
+ public IncomingMessage::Destination
+{
+ public:
+ WaitableDestination() : shutdownFlag(false) {}
+ void message(const Message& msg) {
+ Mutex::ScopedLock l(monitor);
+ queue.push(msg);
+ monitor.notify();
+ }
+
+ void empty() {
+ Mutex::ScopedLock l(monitor);
+ queue.push(Empty());
+ monitor.notify();
+ }
+
+ bool wait(Message& msgOut) {
+ Mutex::ScopedLock l(monitor);
+ while (queue.empty() && !shutdownFlag)
+ monitor.wait();
+ if (shutdownFlag)
+ return false;
+ Message* msg = boost::get<Message>(&queue.front());
+ bool success = msg;
+ if (success)
+ msgOut=*msg;
+ queue.pop();
+ if (!queue.empty())
+ monitor.notify(); // Wake another waiter.
+ return success;
+ }
+
+ void shutdown() {
+ Mutex::ScopedLock l(monitor);
+ shutdownFlag = true;
+ monitor.notifyAll();
+ }
+
+ private:
+ struct Empty {};
+ typedef boost::variant<Message,Empty> Item;
+ sys::Monitor monitor;
+ std::queue<Item> queue;
+ bool shutdownFlag;
+};
+
BasicMessageChannel::BasicMessageChannel(Channel& ch)
- : channel(ch), returnsHandler(0) {}
+ : channel(ch), returnsHandler(0),
+ destGet(new WaitableDestination()),
+ destDispatch(new WaitableDestination())
+{
+ incoming.addDestination(BASIC_RETURN, *destDispatch);
+}
void BasicMessageChannel::consume(
Queue& queue, std::string& tag, MessageListener* listener,
AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields)
{
- channel.sendAndReceiveSync<BasicConsumeOkBody>(
- synch,
- new BasicConsumeBody(
- channel.version, 0, queue.getName(), tag, noLocal,
- ackMode == NO_ACK, false, !synch,
- fields ? *fields : FieldTable()));
- if (synch) {
- BasicConsumeOkBody::shared_ptr response =
- boost::shared_polymorphic_downcast<BasicConsumeOkBody>(
- channel.responses.getResponse());
- tag = response->getConsumerTag();
- }
- // FIXME aconway 2007-02-20: Race condition!
- // We could receive the first message for the consumer
- // before we create the consumer below.
- // Move consumer creation to handler for BasicConsumeOkBody
{
+ // Note we create a consumer even if tag="". In that case
+ // It will be renamed when we handle BasicConsumeOkBody.
+ //
Mutex::ScopedLock l(lock);
ConsumerMap::iterator i = consumers.find(tag);
if (i != consumers.end())
@@ -66,6 +119,23 @@ void BasicMessageChannel::consume(
c.ackMode = ackMode;
c.lastDeliveryTag = 0;
}
+
+ // FIXME aconway 2007-03-23: get processed in both.
+
+ // BasicConsumeOkBody is really processed in handle(), here
+ // we just pick up the tag to return to the user.
+ //
+ // We can't process it here because messages for the consumer may
+ // already be arriving.
+ //
+ BasicConsumeOkBody::shared_ptr ok =
+ channel.sendAndReceiveSync<BasicConsumeOkBody>(
+ synch,
+ new BasicConsumeBody(
+ channel.version, 0, queue.getName(), tag, noLocal,
+ ackMode == NO_ACK, false, !synch,
+ fields ? *fields : FieldTable()));
+ tag = ok->getConsumerTag();
}
@@ -92,6 +162,8 @@ void BasicMessageChannel::close(){
consumersCopy = consumers;
consumers.clear();
}
+ destGet->shutdown();
+ destDispatch->shutdown();
for (ConsumerMap::iterator i=consumersCopy.begin();
i != consumersCopy.end(); ++i)
{
@@ -102,28 +174,37 @@ void BasicMessageChannel::close(){
channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true));
}
}
- incoming.shutdown();
}
-
-bool BasicMessageChannel::get(Message& msg, const Queue& queue, AckMode ackMode) {
- // Expect a message starting with a BasicGetOk
- incoming.startGet();
- channel.send(new BasicGetBody(channel.version, 0, queue.getName(), ackMode));
- return incoming.waitGet(msg);
+bool BasicMessageChannel::get(
+ Message& msg, const Queue& queue, AckMode ackMode)
+{
+ // Prepare for incoming response
+ incoming.addDestination(BASIC_GET, *destGet);
+ channel.send(
+ new BasicGetBody(channel.version, 0, queue.getName(), ackMode));
+ bool got = destGet->wait(msg);
+ return got;
}
void BasicMessageChannel::publish(
const Message& msg, const Exchange& exchange,
const std::string& routingKey, bool mandatory, bool immediate)
{
- msg.getHeader()->setContentSize(msg.getData().size());
const string e = exchange.getName();
string key = routingKey;
- channel.send(new BasicPublishBody(channel.version, 0, e, key, mandatory, immediate));
- //break msg up into header frame and content frame(s) and send these
- channel.send(msg.getHeader());
+
+ // Make a header for the message
+ AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
+ BasicHeaderProperties::copy(
+ *static_cast<BasicHeaderProperties*>(header->getProperties()), msg);
+ header->setContentSize(msg.getData().size());
+
+ channel.send(
+ new BasicPublishBody(
+ channel.version, 0, e, key, mandatory, immediate));
+ channel.send(header);
string data = msg.getData();
u_int64_t data_length = data.length();
if(data_length > 0){
@@ -149,22 +230,76 @@ void BasicMessageChannel::publish(
void BasicMessageChannel::handle(boost::shared_ptr<AMQMethodBody> method) {
assert(method->amqpClassId() ==BasicGetBody::CLASS_ID);
switch(method->amqpMethodId()) {
- case BasicDeliverBody::METHOD_ID:
- case BasicReturnBody::METHOD_ID:
- case BasicGetOkBody::METHOD_ID:
- case BasicGetEmptyBody::METHOD_ID:
- incoming.add(method);
- return;
+ case BasicGetOkBody::METHOD_ID: {
+ incoming.openReference(BASIC_REF);
+ incoming.createMessage(BASIC_GET, BASIC_REF);
+ return;
+ }
+ case BasicGetEmptyBody::METHOD_ID: {
+ incoming.getDestination(BASIC_GET).empty();
+ incoming.removeDestination(BASIC_GET);
+ return;
+ }
+ case BasicDeliverBody::METHOD_ID: {
+ BasicDeliverBody::shared_ptr deliver=
+ boost::shared_polymorphic_downcast<BasicDeliverBody>(method);
+ incoming.openReference(BASIC_REF);
+ Message& msg = incoming.createMessage(
+ deliver->getConsumerTag(), BASIC_REF);
+ msg.setDestination(deliver->getConsumerTag());
+ msg.setDeliveryTag(deliver->getDeliveryTag());
+ msg.setRedelivered(deliver->getRedelivered());
+ return;
+ }
+ case BasicReturnBody::METHOD_ID: {
+ incoming.openReference(BASIC_REF);
+ incoming.createMessage(BASIC_RETURN, BASIC_REF);
+ return;
+ }
+ case BasicConsumeOkBody::METHOD_ID: {
+ Mutex::ScopedLock l(lock);
+ BasicConsumeOkBody::shared_ptr consumeOk =
+ boost::shared_polymorphic_downcast<BasicConsumeOkBody>(method);
+ std::string tag = consumeOk->getConsumerTag();
+ ConsumerMap::iterator i = consumers.find(std::string());
+ if (i != consumers.end()) {
+ // Need to rename the un-named consumer.
+ if (consumers.find(tag) == consumers.end()) {
+ consumers[tag] = i->second;
+ consumers.erase(i);
+ }
+ else // Tag already exists.
+ throw ChannelException(404, "Tag already exists: "+tag);
+ }
+ // FIXME aconway 2007-03-23: Integrate consumer & destination
+ // maps.
+ incoming.addDestination(tag, *destDispatch);
+ return;
+ }
}
throw Channel::UnknownMethod();
}
-void BasicMessageChannel::handle(AMQHeaderBody::shared_ptr body){
- incoming.add(body);
+void BasicMessageChannel::handle(AMQHeaderBody::shared_ptr header) {
+ BasicHeaderProperties* props =
+ boost::polymorphic_downcast<BasicHeaderProperties*>(
+ header->getProperties());
+ IncomingMessage::Reference& ref = incoming.getReference(BASIC_REF);
+ assert (ref.messages.size() == 1);
+ ref.messages.front().BasicHeaderProperties::operator=(*props);
+ incoming_size = header->getContentSize();
+ if (incoming_size==0)
+ incoming.closeReference(BASIC_REF);
}
-void BasicMessageChannel::handle(AMQContentBody::shared_ptr body){
- incoming.add(body);
+void BasicMessageChannel::handle(AMQContentBody::shared_ptr content){
+ incoming.appendReference(BASIC_REF, content->getData());
+ size_t size = incoming.getReference(BASIC_REF).data.size();
+ if (size >= incoming_size) {
+ incoming.closeReference(BASIC_REF);
+ if (size > incoming_size)
+ throw ChannelException(502, "Content exceeded declared size");
+ }
}
void BasicMessageChannel::deliver(Consumer& consumer, Message& msg){
@@ -186,7 +321,9 @@ void BasicMessageChannel::deliver(Consumer& consumer, Message& msg){
consumer.lastDeliveryTag = 0;
channel.send(
new BasicAckBody(
- channel.version, msg.getDeliveryTag(), multiple));
+ channel.version,
+ msg.getDeliveryTag(),
+ multiple));
case NO_ACK: // Nothing to do
case CLIENT_ACK: // User code must ack.
break;
@@ -204,35 +341,32 @@ void BasicMessageChannel::deliver(Consumer& consumer, Message& msg){
void BasicMessageChannel::run() {
while(channel.isOpen()) {
try {
- Message msg = incoming.waitDispatch();
- if(msg.getMethod()->isA<BasicReturnBody>()) {
- ReturnedMessageHandler* handler=0;
- {
- Mutex::ScopedLock l(lock);
- handler=returnsHandler;
- }
- if(handler == 0) {
- // TODO aconway 2007-02-20: proper logging.
- cout << "Message returned: " << msg.getData() << endl;
+ Message msg;
+ bool gotMessge = destDispatch->wait(msg);
+ if (gotMessge) {
+ if(msg.getDestination() == BASIC_RETURN) {
+ ReturnedMessageHandler* handler=0;
+ {
+ Mutex::ScopedLock l(lock);
+ handler=returnsHandler;
+ }
+ if(handler != 0)
+ handler->returned(msg);
}
- else
- handler->returned(msg);
- }
- else {
- BasicDeliverBody::shared_ptr deliverBody =
- boost::shared_polymorphic_downcast<BasicDeliverBody>(
- msg.getMethod());
- std::string tag = deliverBody->getConsumerTag();
- Consumer consumer;
- {
- Mutex::ScopedLock l(lock);
- ConsumerMap::iterator i = consumers.find(tag);
- if(i == consumers.end())
- THROW_QPID_ERROR(PROTOCOL_ERROR+504,
- "Unknown consumer tag=" + tag);
- consumer = i->second;
+ else {
+ Consumer consumer;
+ {
+ Mutex::ScopedLock l(lock);
+ ConsumerMap::iterator i = consumers.find(
+ msg.getDestination());
+ if(i == consumers.end())
+ THROW_QPID_ERROR(PROTOCOL_ERROR+504,
+ "Unknown consumer tag=" +
+ msg.getDestination());
+ consumer = i->second;
+ }
+ deliver(consumer, msg);
}
- deliver(consumer, msg);
}
}
catch (const ShutdownException&) {
@@ -240,8 +374,8 @@ void BasicMessageChannel::run() {
}
catch (const Exception& e) {
// FIXME aconway 2007-02-20: Report exception to user.
- cout << "client::Basic::run() terminated by: " << e.toString()
- << "(" << typeid(e).name() << ")" << endl;
+ cout << "client::BasicMessageChannel::run() terminated by: "
+ << e.toString() << endl;
}
}
}
diff --git a/cpp/lib/client/BasicMessageChannel.h b/cpp/lib/client/BasicMessageChannel.h
index b921ec24d9..aaedfd6bf1 100644
--- a/cpp/lib/client/BasicMessageChannel.h
+++ b/cpp/lib/client/BasicMessageChannel.h
@@ -21,6 +21,7 @@
#include "MessageChannel.h"
#include "IncomingMessage.h"
+#include <boost/scoped_ptr.hpp>
namespace qpid {
namespace client {
@@ -62,13 +63,13 @@ class BasicMessageChannel : public MessageChannel
private:
+ class WaitableDestination;
struct Consumer{
MessageListener* listener;
AckMode ackMode;
int count;
u_int64_t lastDeliveryTag;
};
-
typedef std::map<std::string, Consumer> ConsumerMap;
void deliver(Consumer& consumer, Message& msg);
@@ -76,8 +77,11 @@ class BasicMessageChannel : public MessageChannel
sys::Mutex lock;
Channel& channel;
IncomingMessage incoming;
- ConsumerMap consumers;
+ uint64_t incoming_size;
+ ConsumerMap consumers ;
ReturnedMessageHandler* returnsHandler;
+ boost::scoped_ptr<WaitableDestination> destGet;
+ boost::scoped_ptr<WaitableDestination> destDispatch;
};
}} // namespace qpid::client
diff --git a/cpp/lib/client/ClientChannel.cpp b/cpp/lib/client/ClientChannel.cpp
index 97e0a394d2..98feff9389 100644
--- a/cpp/lib/client/ClientChannel.cpp
+++ b/cpp/lib/client/ClientChannel.cpp
@@ -68,7 +68,8 @@ void Channel::protocolInit(
assert(connection);
responses.expect();
connection->connector->init(); // Send ProtocolInit block.
- responses.receive<ConnectionStartBody>();
+ ConnectionStartBody::shared_ptr connectionStart =
+ responses.receive<ConnectionStartBody>();
FieldTable props;
string mechanism("PLAIN");
@@ -77,7 +78,8 @@ void Channel::protocolInit(
ConnectionTuneBody::shared_ptr proposal =
sendAndReceive<ConnectionTuneBody>(
new ConnectionStartOkBody(
- version, responses.getRequestId(), props, mechanism,
+ version, connectionStart->getRequestId(),
+ props, mechanism,
response, locale));
/**
@@ -89,7 +91,8 @@ void Channel::protocolInit(
**/
send(new ConnectionTuneOkBody(
- version, responses.getRequestId(), proposal->getChannelMax(), connection->getMaxFrameSize(),
+ version, proposal->getRequestId(),
+ proposal->getChannelMax(), connection->getMaxFrameSize(),
proposal->getHeartbeat()));
uint16_t heartbeat = proposal->getHeartbeat();
@@ -102,18 +105,17 @@ void Channel::protocolInit(
send(new ConnectionOpenBody(version, vhost, capabilities, true));
//receive connection.open-ok (or redirect, but ignore that for now
//esp. as using force=true).
- responses.waitForResponse();
- if(responses.validate<ConnectionOpenOkBody>()) {
+ AMQMethodBody::shared_ptr openResponse = responses.receive();
+ if(openResponse->isA<ConnectionOpenOkBody>()) {
//ok
- }else if(responses.validate<ConnectionRedirectBody>()){
+ }else if(openResponse->isA<ConnectionRedirectBody>()){
//ignore for now
ConnectionRedirectBody::shared_ptr redirect(
- shared_polymorphic_downcast<ConnectionRedirectBody>(
- responses.getResponse()));
+ shared_polymorphic_downcast<ConnectionRedirectBody>(openResponse));
cout << "Received redirection to " << redirect->getHost()
<< endl;
} else {
- THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response");
+ THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response to Connection.open");
}
}
@@ -121,7 +123,6 @@ bool Channel::isOpen() const { return connection; }
void Channel::setQos() {
messaging->setQos();
- // FIXME aconway 2007-02-22: message
}
void Channel::setPrefetch(uint16_t _prefetch){
@@ -149,18 +150,15 @@ void Channel::deleteExchange(Exchange& exchange, bool synch){
void Channel::declareQueue(Queue& queue, bool synch){
string name = queue.getName();
FieldTable args;
- sendAndReceiveSync<QueueDeclareOkBody>(
- synch,
- new QueueDeclareBody(
- version, 0, name, false/*passive*/, queue.isDurable(),
- queue.isExclusive(), queue.isAutoDelete(), !synch, args));
- if(synch){
- if(queue.getName().length() == 0){
- QueueDeclareOkBody::shared_ptr response =
- shared_polymorphic_downcast<QueueDeclareOkBody>(
- responses.getResponse());
+ QueueDeclareOkBody::shared_ptr response =
+ sendAndReceiveSync<QueueDeclareOkBody>(
+ synch,
+ new QueueDeclareBody(
+ version, 0, name, false/*passive*/, queue.isDurable(),
+ queue.isExclusive(), queue.isAutoDelete(), !synch, args));
+ if(synch) {
+ if(queue.getName().length() == 0)
queue.setName(response->getQueue());
- }
}
}
@@ -191,6 +189,14 @@ void Channel::rollback(){
void Channel::handleMethodInContext(
AMQMethodBody::shared_ptr method, const MethodContext&)
{
+ // TODO aconway 2007-03-23: Special case for consume OK as it
+ // is both an expected response and needs handling in this thread.
+ // Need to review & reationalize the client-side processing model.
+ if (method->isA<BasicConsumeOkBody>()) {
+ messaging->handle(method);
+ responses.signalResponse(method);
+ return;
+ }
if(responses.isWaiting()) {
responses.signalResponse(method);
return;
@@ -204,11 +210,11 @@ void Channel::handleMethodInContext(
}
}
catch (const UnknownMethod&) {
- connection->close(
- 504, "Unknown method",
- method->amqpClassId(), method->amqpMethodId());
- }
-}
+ connection->close(
+ 504, "Unknown method",
+ method->amqpClassId(), method->amqpMethodId());
+ }
+ }
void Channel::handleChannel(AMQMethodBody::shared_ptr method) {
switch (method->amqpMethodId()) {
@@ -272,8 +278,6 @@ void Channel::close(
void Channel::peerClose(ChannelCloseBody::shared_ptr) {
assert(isOpen());
closeInternal();
- // FIXME aconway 2007-01-26: How to throw the proper exception
- // to the application thread?
}
void Channel::closeInternal() {
@@ -287,20 +291,23 @@ void Channel::closeInternal() {
dispatcher.join();
}
-void Channel::sendAndReceive(AMQMethodBody* toSend, ClassId c, MethodId m)
+AMQMethodBody::shared_ptr Channel::sendAndReceive(
+ AMQMethodBody* toSend, ClassId c, MethodId m)
{
responses.expect();
send(toSend);
- responses.receive(c, m);
+ return responses.receive(c, m);
}
-void Channel::sendAndReceiveSync(
+AMQMethodBody::shared_ptr Channel::sendAndReceiveSync(
bool sync, AMQMethodBody* body, ClassId c, MethodId m)
{
if(sync)
- sendAndReceive(body, c, m);
- else
+ return sendAndReceive(body, c, m);
+ else {
send(body);
+ return AMQMethodBody::shared_ptr();
+ }
}
void Channel::consume(
diff --git a/cpp/lib/client/ClientChannel.h b/cpp/lib/client/ClientChannel.h
index 58a007977d..f6fe18da20 100644
--- a/cpp/lib/client/ClientChannel.h
+++ b/cpp/lib/client/ClientChannel.h
@@ -81,24 +81,25 @@ class Channel : public framing::ChannelAdapter
const std::string& uid, const std::string& pwd,
const std::string& vhost);
- void sendAndReceive(
+ framing::AMQMethodBody::shared_ptr sendAndReceive(
framing::AMQMethodBody*, framing::ClassId, framing::MethodId);
- void sendAndReceiveSync(
+ framing::AMQMethodBody::shared_ptr sendAndReceiveSync(
bool sync,
framing::AMQMethodBody*, framing::ClassId, framing::MethodId);
template <class BodyType>
boost::shared_ptr<BodyType> sendAndReceive(framing::AMQMethodBody* body) {
- sendAndReceive(body, BodyType::CLASS_ID, BodyType::METHOD_ID);
return boost::shared_polymorphic_downcast<BodyType>(
- responses.getResponse());
+ sendAndReceive(body, BodyType::CLASS_ID, BodyType::METHOD_ID));
}
- template <class BodyType> void sendAndReceiveSync(
+ template <class BodyType>
+ boost::shared_ptr<BodyType> sendAndReceiveSync(
bool sync, framing::AMQMethodBody* body) {
- sendAndReceiveSync(
- sync, body, BodyType::CLASS_ID, BodyType::METHOD_ID);
+ return boost::shared_polymorphic_downcast<BodyType>(
+ sendAndReceiveSync(
+ sync, body, BodyType::CLASS_ID, BodyType::METHOD_ID));
}
void open(framing::ChannelId, Connection&);
diff --git a/cpp/lib/client/ClientMessage.cpp b/cpp/lib/client/ClientMessage.cpp
deleted file mode 100644
index 8edd0a474d..0000000000
--- a/cpp/lib/client/ClientMessage.cpp
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <ClientMessage.h>
-using namespace qpid::client;
-using namespace qpid::framing;
-
-Message::Message(const std::string& d)
- : header(new AMQHeaderBody(BASIC))
-{
- setData(d);
-}
-
-void Message::setData(const std::string& d) {
- data = d;
-}
-
-Message::Message(AMQHeaderBody::shared_ptr& _header) : header(_header){
-}
-
-Message::~Message(){
-}
-
-BasicHeaderProperties* Message::getHeaderProperties() const {
- return dynamic_cast<BasicHeaderProperties*>(header->getProperties());
-}
-
-std::string Message::getContentType() const {
- return getHeaderProperties()->getContentType();
-}
-
-std::string Message::getContentEncoding() const {
- return getHeaderProperties()->getContentEncoding();
-}
-
-FieldTable& Message::getHeaders() const {
- return getHeaderProperties()->getHeaders();
-}
-
-uint8_t Message::getDeliveryMode() const {
- return getHeaderProperties()->getDeliveryMode();
-}
-
-uint8_t Message::getPriority() const {
- return getHeaderProperties()->getPriority();
-}
-
-std::string Message::getCorrelationId() const {
- return getHeaderProperties()->getCorrelationId();
-}
-
-std::string Message::getReplyTo() const {
- return getHeaderProperties()->getReplyTo();
-}
-
-std::string Message::getExpiration() const {
- return getHeaderProperties()->getExpiration();
-}
-
-std::string Message::getMessageId() const {
- return getHeaderProperties()->getMessageId();
-}
-
-uint64_t Message::getTimestamp() const {
- return getHeaderProperties()->getTimestamp();
-}
-
-std::string Message::getType() const {
- return getHeaderProperties()->getType();
-}
-
-std::string Message::getUserId() const {
- return getHeaderProperties()->getUserId();
-}
-
-std::string Message::getAppId() const {
- return getHeaderProperties()->getAppId();
-}
-
-std::string Message::getClusterId() const {
- return getHeaderProperties()->getClusterId();
-}
-
-void Message::setContentType(const std::string& type){
- getHeaderProperties()->setContentType(type);
-}
-
-void Message::setContentEncoding(const std::string& encoding){
- getHeaderProperties()->setContentEncoding(encoding);
-}
-
-void Message::setHeaders(const FieldTable& headers){
- getHeaderProperties()->setHeaders(headers);
-}
-
-void Message::setDeliveryMode(DeliveryMode mode){
- getHeaderProperties()->setDeliveryMode(mode);
-}
-
-void Message::setPriority(uint8_t priority){
- getHeaderProperties()->setPriority(priority);
-}
-
-void Message::setCorrelationId(const std::string& correlationId){
- getHeaderProperties()->setCorrelationId(correlationId);
-}
-
-void Message::setReplyTo(const std::string& replyTo){
- getHeaderProperties()->setReplyTo(replyTo);
-}
-
-void Message::setExpiration(const std::string& expiration){
- getHeaderProperties()->setExpiration(expiration);
-}
-
-void Message::setMessageId(const std::string& messageId){
- getHeaderProperties()->setMessageId(messageId);
-}
-
-void Message::setTimestamp(uint64_t timestamp){
- getHeaderProperties()->setTimestamp(timestamp);
-}
-
-void Message::setType(const std::string& type){
- getHeaderProperties()->setType(type);
-}
-
-void Message::setUserId(const std::string& userId){
- getHeaderProperties()->setUserId(userId);
-}
-
-void Message::setAppId(const std::string& appId){
- getHeaderProperties()->setAppId(appId);
-}
-
-void Message::setClusterId(const std::string& clusterId){
- getHeaderProperties()->setClusterId(clusterId);
-}
-
-
-uint64_t Message::getDeliveryTag() const {
- BasicDeliverBody* deliver=dynamic_cast<BasicDeliverBody*>(method.get());
- return deliver ? deliver->getDeliveryTag() : 0;
-}
diff --git a/cpp/lib/client/ClientMessage.h b/cpp/lib/client/ClientMessage.h
index c89eeb1a0d..a326c24aed 100644
--- a/cpp/lib/client/ClientMessage.h
+++ b/cpp/lib/client/ClientMessage.h
@@ -22,12 +22,10 @@
*
*/
#include <string>
-#include <framing/amqp_framing.h>
+#include "framing/BasicHeaderProperties.h"
namespace qpid {
-
namespace client {
-class IncomingMessage;
/**
* A representation of messages for sent or recived through the
@@ -35,67 +33,28 @@ class IncomingMessage;
*
* \ingroup clientapi
*/
-class Message {
- framing::AMQMethodBody::shared_ptr method;
- framing::AMQHeaderBody::shared_ptr header;
- std::string data;
- bool redelivered;
-
- // FIXME aconway 2007-02-20: const incorrect, needs const return type.
- framing::BasicHeaderProperties* getHeaderProperties() const;
- Message(qpid::framing::AMQHeaderBody::shared_ptr& header);
-
+class Message : public framing::BasicHeaderProperties {
public:
- enum DeliveryMode { DURABLE=1, NON_DURABLE=2 };
- Message(const std::string& data=std::string());
- ~Message();
+ Message(const std::string& data_=std::string()) : data(data_) {}
std::string getData() const { return data; }
- bool isRedelivered() const { return redelivered; }
- uint64_t getDeliveryTag() const;
- std::string getContentType() const;
- std::string getContentEncoding() const;
- qpid::framing::FieldTable& getHeaders() const;
- uint8_t getDeliveryMode() const;
- uint8_t getPriority() const;
- std::string getCorrelationId() const;
- std::string getReplyTo() const;
- std::string getExpiration() const;
- std::string getMessageId() const;
- uint64_t getTimestamp() const;
- std::string getType() const;
- std::string getUserId() const;
- std::string getAppId() const;
- std::string getClusterId() const;
+ void setData(const std::string& _data) { data = _data; }
- void setData(const std::string& _data);
- void setRedelivered(bool _redelivered){ redelivered = _redelivered; }
- void setContentType(const std::string& type);
- void setContentEncoding(const std::string& encoding);
- void setHeaders(const qpid::framing::FieldTable& headers);
- void setDeliveryMode(DeliveryMode mode);
- void setPriority(uint8_t priority);
- void setCorrelationId(const std::string& correlationId);
- void setReplyTo(const std::string& replyTo);
- void setExpiration(const std::string& expiration);
- void setMessageId(const std::string& messageId);
- void setTimestamp(uint64_t timestamp);
- void setType(const std::string& type);
- void setUserId(const std::string& userId);
- void setAppId(const std::string& appId);
- void setClusterId(const std::string& clusterId);
+ std::string getDestination() const { return destination; }
+ void setDestination(const std::string& dest) { destination = dest; }
- /** Get the method used to deliver this message */
- boost::shared_ptr<framing::AMQMethodBody> getMethod() const
- { return method; }
-
- void setMethod(framing::AMQMethodBody::shared_ptr m) { method=m; }
- boost::shared_ptr<framing::AMQHeaderBody> getHeader() const
- { return header; }
+ // TODO aconway 2007-03-22: only needed for Basic.deliver support.
+ uint64_t getDeliveryTag() const { return deliveryTag; }
+ void setDeliveryTag(uint64_t dt) { deliveryTag = dt; }
- // TODO aconway 2007-02-15: remove friendships.
- friend class IncomingMessage;
- friend class Channel;
+ bool isRedelivered() const { return redelivered; }
+ void setRedelivered(bool _redelivered){ redelivered = _redelivered; }
+
+ private:
+ std::string data;
+ std::string destination;
+ bool redelivered;
+ uint64_t deliveryTag;
};
}}
diff --git a/cpp/lib/client/IncomingMessage.cpp b/cpp/lib/client/IncomingMessage.cpp
index 07f94ceb64..8f69f8c3ef 100644
--- a/cpp/lib/client/IncomingMessage.cpp
+++ b/cpp/lib/client/IncomingMessage.cpp
@@ -18,155 +18,113 @@
* under the License.
*
*/
-#include <IncomingMessage.h>
-#include "framing/AMQHeaderBody.h"
-#include "framing/AMQContentBody.h"
-#include "BasicGetOkBody.h"
-#include "BasicReturnBody.h"
-#include "BasicDeliverBody.h"
-#include <QpidError.h>
-#include <iostream>
+
+#include "IncomingMessage.h"
+#include "Exception.h"
+#include "ClientMessage.h"
+#include <boost/format.hpp>
namespace qpid {
namespace client {
-using namespace sys;
-using namespace framing;
-
-struct IncomingMessage::Guard: public Mutex::ScopedLock {
- Guard(IncomingMessage* im) : Mutex::ScopedLock(im->lock) {
- im->shutdownError.throwIf();
- }
-};
-
-IncomingMessage::IncomingMessage() { reset(); }
+using boost::format;
+using sys::Mutex;
-void IncomingMessage::reset() {
- state = &IncomingMessage::expectRequest;
- endFn= &IncomingMessage::endRequest;
- buildMessage = Message();
-}
-
-void IncomingMessage::startGet() {
- Guard g(this);
- if (state != &IncomingMessage::expectRequest) {
- endGet(new QPID_ERROR(CLIENT_ERROR, "Message already in progress."));
- }
- else {
- state = &IncomingMessage::expectGetOk;
- endFn = &IncomingMessage::endGet;
- getError.reset();
- getState = GETTING;
- }
-}
-
-bool IncomingMessage::waitGet(Message& msg) {
- Guard g(this);
- while (getState == GETTING && !shutdownError && !getError)
- getReady.wait(lock);
- shutdownError.throwIf();
- getError.throwIf();
- msg = getMessage;
- return getState==GOT;
-}
-
-Message IncomingMessage::waitDispatch() {
- Guard g(this);
- while(dispatchQueue.empty() && !shutdownError)
- dispatchReady.wait(lock);
- shutdownError.throwIf();
-
- Message msg(dispatchQueue.front());
- dispatchQueue.pop();
- return msg;
-}
+IncomingMessage::Destination::~Destination() {}
-void IncomingMessage::add(BodyPtr body) {
- Guard g(this);
- shutdownError.throwIf();
- // Call the current state function.
- (this->*state)(body);
-}
-
-void IncomingMessage::shutdown() {
+void IncomingMessage::openReference(const std::string& name) {
Mutex::ScopedLock l(lock);
- shutdownError.reset(new ShutdownException());
- getReady.notify();
- dispatchReady.notify();
+ if (references.find(name) != references.end())
+ throw ChannelException(
+ 406, format("Attempt to open existing reference %s.") % name);
+ references[name];
+ return;
}
-bool IncomingMessage::isShutdown() const {
+void IncomingMessage::appendReference(
+ const std::string& name, const std::string& data)
+{
Mutex::ScopedLock l(lock);
- return shutdownError;
+ getRefUnlocked(name).data += data;
}
-// Common check for all the expect functions. Called in network thread.
-template<class T>
-boost::shared_ptr<T> IncomingMessage::expectCheck(BodyPtr body) {
- boost::shared_ptr<T> ptr = boost::dynamic_pointer_cast<T>(body);
- if (!ptr)
- throw QPID_ERROR(PROTOCOL_ERROR+504, "Unexpected frame type");
- return ptr;
+Message& IncomingMessage::createMessage(
+ const std::string& destination, const std::string& reference)
+{
+ Mutex::ScopedLock l(lock);
+ getDestUnlocked(destination); // Verify destination.
+ Reference& ref = getRefUnlocked(reference);
+ ref.messages.resize(ref.messages.size() +1);
+ ref.messages.back().setDestination(destination);
+ return ref.messages.back();
}
-void IncomingMessage::expectGetOk(BodyPtr body) {
- if (dynamic_cast<BasicGetOkBody*>(body.get()))
- state = &IncomingMessage::expectHeader;
- else if (dynamic_cast<BasicGetEmptyBody*>(body.get())) {
- getState = EMPTY;
- endGet();
+void IncomingMessage::closeReference(const std::string& name) {
+ Reference refCopy;
+ {
+ Mutex::ScopedLock l(lock);
+ refCopy = getRefUnlocked(name);
+ references.erase(name);
+ }
+ for (std::vector<Message>::iterator i = refCopy.messages.begin();
+ i != refCopy.messages.end();
+ ++i)
+ {
+ i->setData(refCopy.data);
+ // TODO aconway 2007-03-23: Thread safety,
+ // can a destination be removed while we're doing this?
+ getDestination(i->getDestination()).message(*i);
}
- else
- throw QPID_ERROR(PROTOCOL_ERROR+504, "Unexpected frame type");
}
-void IncomingMessage::expectHeader(BodyPtr body) {
- AMQHeaderBody::shared_ptr header = expectCheck<AMQHeaderBody>(body);
- buildMessage.header = header;
- state = &IncomingMessage::expectContent;
- checkComplete();
+
+void IncomingMessage::addDestination(std::string name, Destination& dest) {
+ Mutex::ScopedLock l(lock);
+ DestinationMap::iterator i = destinations.find(name);
+ if (i == destinations.end())
+ destinations[name]=&dest;
+ else if (i->second != &dest)
+ throw ChannelException(
+ 404, format("Destination already exists: %s.") % name);
}
-void IncomingMessage::expectContent(BodyPtr body) {
- AMQContentBody::shared_ptr content = expectCheck<AMQContentBody>(body);
- buildMessage.setData(buildMessage.getData() + content->getData());
- checkComplete();
+void IncomingMessage::removeDestination(std::string name) {
+ Mutex::ScopedLock l(lock);
+ DestinationMap::iterator i = destinations.find(name);
+ if (i == destinations.end())
+ throw ChannelException(
+ 406, format("No such destination: %s.") % name);
+ destinations.erase(i);
}
-void IncomingMessage::checkComplete() {
- size_t declaredSize = buildMessage.header->getContentSize();
- size_t currentSize = buildMessage.getData().size();
- if (declaredSize == currentSize)
- (this->*endFn)(0);
- else if (declaredSize < currentSize)
- (this->*endFn)(new QPID_ERROR(
- PROTOCOL_ERROR, "Message content exceeds declared size."));
+IncomingMessage::Destination& IncomingMessage::getDestination(
+ const std::string& name) {
+ return getDestUnlocked(name);
}
-void IncomingMessage::expectRequest(BodyPtr body) {
- AMQMethodBody::shared_ptr method = expectCheck<AMQMethodBody>(body);
- buildMessage.setMethod(method);
- state = &IncomingMessage::expectHeader;
+IncomingMessage::Reference& IncomingMessage::getReference(
+ const std::string& name) {
+ return getRefUnlocked(name);
}
-
-void IncomingMessage::endGet(Exception* ex) {
- getError.reset(ex);
- if (getState == GETTING) {
- getMessage = buildMessage;
- getState = GOT;
- }
- reset();
- getReady.notify();
+
+IncomingMessage::Reference& IncomingMessage::getRefUnlocked(
+ const std::string& name) {
+ Mutex::ScopedLock l(lock);
+ ReferenceMap::iterator i = references.find(name);
+ if (i == references.end())
+ throw ChannelException(
+ 404, format("No such reference: %s.") % name);
+ return i->second;
}
-void IncomingMessage::endRequest(Exception* ex) {
- ExceptionHolder eh(ex);
- if (!eh) {
- dispatchQueue.push(buildMessage);
- reset();
- dispatchReady.notify();
- }
- eh.throwIf();
+IncomingMessage::Destination& IncomingMessage::getDestUnlocked(
+ const std::string& name) {
+ Mutex::ScopedLock l(lock);
+ DestinationMap::iterator i = destinations.find(name);
+ if (i == destinations.end())
+ throw ChannelException(
+ 404, format("No such destination: %s.") % name);
+ return *i->second;
}
}} // namespace qpid::client
diff --git a/cpp/lib/client/IncomingMessage.h b/cpp/lib/client/IncomingMessage.h
index 6ec949028d..d78a90327d 100644
--- a/cpp/lib/client/IncomingMessage.h
+++ b/cpp/lib/client/IncomingMessage.h
@@ -21,96 +21,91 @@
* under the License.
*
*/
-#include <string>
-#include <queue>
-#include <framing/amqp_framing.h>
-#include "ExceptionHolder.h"
-#include "ClientMessage.h"
#include "sys/Mutex.h"
-#include "sys/Condition.h"
+#include <map>
+#include <vector>
+
namespace qpid {
+namespace client {
-namespace framing {
-class AMQBody;
-}
+class Message;
-namespace client {
/**
- * Accumulates incoming message frames into messages.
- * Client-initiated messages (basic.get) are initiated and made
- * available to the user thread one at a time.
- *
- * Broker initiated messages (basic.return, basic.deliver) are
- * queued for handling by the user dispatch thread.
- *
+ * Manage incoming messages.
+ *
+ * Uses reference and destination concepts from 0-9 Messsage class.
+ *
+ * Basic messages use special destination and reference names to indicate
+ * get-ok, return etc. messages.
+ *
*/
class IncomingMessage {
public:
- typedef boost::shared_ptr<framing::AMQBody> BodyPtr;
- IncomingMessage();
-
- /** Expect a new message starting with getOk. Called in user thread.*/
- void startGet();
+ /** Accumulate data associated with a set of messages. */
+ struct Reference {
+ std::string data;
+ std::vector<Message> messages;
+ };
- /** Wait for the message to complete, return the message.
- * Called in user thread.
- *@raises QpidError if there was an error.
- */
- bool waitGet(Message&);
+ /** Interface to a destination for messages. */
+ class Destination {
+ public:
+ virtual ~Destination();
- /** Wait for the next broker-initiated message. */
- Message waitDispatch();
+ /** Pass a message to the destination */
+ virtual void message(const Message&) = 0;
- /** Add a frame body to the message. Called in network thread. */
- void add(BodyPtr);
+ /** Notify destination of queue-empty contition */
+ virtual void empty() = 0;
+ };
- /** Shut down: all further calls to any function throw ex. */
- void shutdown();
- /** Check if shutdown */
- bool isShutdown() const;
+ /** Add a reference. Throws if already open. */
+ void openReference(const std::string& name);
- private:
+ /** Get a reference. Throws if not already open. */
+ void appendReference(const std::string& name,
+ const std::string& data);
- typedef void (IncomingMessage::* ExpectFn)(BodyPtr);
- typedef void (IncomingMessage::* EndFn)(Exception*);
- typedef std::queue<Message> MessageQueue;
- struct Guard;
- friend struct Guard;
+ /** Create a message to destination associated with reference
+ *@exception if destination or reference non-existent.
+ */
+ Message& createMessage(const std::string& destination,
+ const std::string& reference);
- void reset();
- template <class T> boost::shared_ptr<T> expectCheck(BodyPtr);
+ /** Get a reference.
+ *@exception if non-existent.
+ */
+ Reference& getReference(const std::string& name);
+
+ /** Close a reference and deliver all its messages.
+ * Throws if not open or a message has an invalid destination.
+ */
+ void closeReference(const std::string& name);
- // State functions - a state machine where each state is
- // a member function that processes a frame body.
- void expectGetOk(BodyPtr);
- void expectHeader(BodyPtr);
- void expectContent(BodyPtr);
- void expectRequest(BodyPtr);
+ /** Add a destination.
+ *@exception if a different Destination is already registered
+ * under name.
+ */
+ void addDestination(std::string name, Destination&);
+
+ /** Remove a destination. Throws if does not exist */
+ void removeDestination(std::string name);
- // End functions.
- void endGet(Exception* ex = 0);
- void endRequest(Exception* ex);
+ /** Get a destination. Throws if does not exist */
+ Destination& getDestination(const std::string& name);
+ private:
- // Check for complete message.
- void checkComplete();
+ typedef std::map<std::string, Reference> ReferenceMap;
+ typedef std::map<std::string, Destination*> DestinationMap;
+ Reference& getRefUnlocked(const std::string& name);
+ Destination& getDestUnlocked(const std::string& name);
+
mutable sys::Mutex lock;
- ExpectFn state;
- EndFn endFn;
- Message buildMessage;
- ExceptionHolder shutdownError;
-
- // For basic.get messages.
- sys::Condition getReady;
- ExceptionHolder getError;
- Message getMessage;
- enum { GETTING, GOT, EMPTY } getState;
-
- // For broker-initiated messages
- sys::Condition dispatchReady;
- MessageQueue dispatchQueue;
+ ReferenceMap references;
+ DestinationMap destinations;
};
}}
diff --git a/cpp/lib/client/Makefile.am b/cpp/lib/client/Makefile.am
index 46d8775072..6dbe195232 100644
--- a/cpp/lib/client/Makefile.am
+++ b/cpp/lib/client/Makefile.am
@@ -12,7 +12,6 @@ libqpidclient_la_LDFLAGS = -version-info $(LIBTOOL_VERSION_INFO_ARG)
libqpidclient_la_SOURCES = \
ClientChannel.cpp \
ClientExchange.cpp \
- ClientMessage.cpp \
ClientQueue.cpp \
BasicMessageChannel.cpp \
Connection.cpp \
diff --git a/cpp/lib/client/ResponseHandler.cpp b/cpp/lib/client/ResponseHandler.cpp
index 4498de41ae..926a9ce336 100644
--- a/cpp/lib/client/ResponseHandler.cpp
+++ b/cpp/lib/client/ResponseHandler.cpp
@@ -18,12 +18,10 @@
* under the License.
*
*/
-#include <boost/format.hpp>
-
-#include <ResponseHandler.h>
-#include <sys/Monitor.h>
#include <QpidError.h>
-#include "amqp_types.h"
+#include <boost/format.hpp>
+#include "ResponseHandler.h"
+#include "AMQMethodBody.h"
using namespace qpid::sys;
using namespace qpid::framing;
@@ -31,56 +29,51 @@ using namespace qpid::framing;
namespace qpid {
namespace client {
-ResponseHandler::ResponseHandler() : waiting(false){}
+ResponseHandler::ResponseHandler() : waiting(false), shutdownFlag(false) {}
ResponseHandler::~ResponseHandler(){}
-bool ResponseHandler::validate(ClassId c, MethodId m) {
- return response != 0 &&
- response->amqpClassId() ==c && response->amqpMethodId() == m;
+bool ResponseHandler::isWaiting() {
+ Monitor::ScopedLock l(monitor);
+ return waiting;
}
-void ResponseHandler::waitForResponse(){
+void ResponseHandler::expect(){
Monitor::ScopedLock l(monitor);
- while (waiting)
- monitor.wait();
+ waiting = true;
}
-void ResponseHandler::signalResponse(
- qpid::framing::AMQMethodBody::shared_ptr _response)
+void ResponseHandler::signalResponse(MethodPtr _response)
{
Monitor::ScopedLock l(monitor);
response = _response;
+ if (!response)
+ shutdownFlag=true;
waiting = false;
monitor.notify();
}
-void ResponseHandler::receive(ClassId c, MethodId m) {
+ResponseHandler::MethodPtr ResponseHandler::receive() {
Monitor::ScopedLock l(monitor);
- while (waiting)
+ while (!response && !shutdownFlag)
monitor.wait();
- getResponse(); // Check for closed.
- if(!validate(response->amqpClassId(), response->amqpMethodId())) {
+ if (shutdownFlag)
+ THROW_QPID_ERROR(
+ PROTOCOL_ERROR, "Channel closed unexpectedly.");
+ MethodPtr result = response;
+ response.reset();
+ return result;
+}
+
+ResponseHandler::MethodPtr ResponseHandler::receive(ClassId c, MethodId m) {
+ MethodPtr response = receive();
+ if(c != response->amqpClassId() || m != response->amqpMethodId()) {
THROW_QPID_ERROR(
PROTOCOL_ERROR,
boost::format("Expected class:method %d:%d, got %d:%d")
% c % m % response->amqpClassId() % response->amqpMethodId());
}
-}
-
-framing::AMQMethodBody::shared_ptr ResponseHandler::getResponse() {
- if (!response)
- THROW_QPID_ERROR(
- PROTOCOL_ERROR, "Channel closed unexpectedly.");
return response;
}
-RequestId ResponseHandler::getRequestId() {
- assert(response->getRequestId());
- return response->getRequestId();
-}
-void ResponseHandler::expect(){
- waiting = true;
-}
-
}} // namespace qpid::client
diff --git a/cpp/lib/client/ResponseHandler.h b/cpp/lib/client/ResponseHandler.h
index d28048c3d3..500166144d 100644
--- a/cpp/lib/client/ResponseHandler.h
+++ b/cpp/lib/client/ResponseHandler.h
@@ -18,51 +18,58 @@
* under the License.
*
*/
-#include <string>
-
-#include <framing/amqp_framing.h> // FIXME aconway 2007-02-01: #include cleanup.
+#include "shared_ptr.h"
#include <sys/Monitor.h>
#ifndef _ResponseHandler_
#define _ResponseHandler_
namespace qpid {
+
+namespace framing {
+class AMQMethodBody;
+}
+
namespace client {
/**
* Holds a response from the broker peer for the client.
*/
class ResponseHandler{
+ typedef shared_ptr<framing::AMQMethodBody> MethodPtr;
bool waiting;
- qpid::framing::AMQMethodBody::shared_ptr response;
- qpid::sys::Monitor monitor;
+ bool shutdownFlag;
+ MethodPtr response;
+ sys::Monitor monitor;
public:
ResponseHandler();
~ResponseHandler();
-
- bool isWaiting(){ return waiting; }
- framing::AMQMethodBody::shared_ptr getResponse();
- void waitForResponse();
-
- void signalResponse(framing::AMQMethodBody::shared_ptr response);
- void expect();//must be called before calling receive
- bool validate(framing::ClassId, framing::MethodId);
- void receive(framing::ClassId, framing::MethodId);
+ /** Is a response expected? */
+ bool isWaiting();
- framing::RequestId getRequestId();
+ /** Provide a response to the waiting thread */
+ void signalResponse(MethodPtr response);
- template <class BodyType> bool validate() {
- return validate(BodyType::CLASS_ID, BodyType::METHOD_ID);
- }
- template <class BodyType> void receive() {
- receive(BodyType::CLASS_ID, BodyType::METHOD_ID);
+ /** Indicate a message is expected. */
+ void expect();
+
+ /** Wait for a response. */
+ MethodPtr receive();
+
+ /** Wait for a specific response. */
+ MethodPtr receive(framing::ClassId, framing::MethodId);
+
+ /** Template version of receive returns typed pointer. */
+ template <class BodyType>
+ shared_ptr<BodyType> receive() {
+ return shared_polymorphic_downcast<BodyType>(
+ receive(BodyType::CLASS_ID, BodyType::METHOD_ID));
}
};
-}
-}
+}}
#endif