summaryrefslogtreecommitdiff
path: root/cpp/lib
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/lib')
-rw-r--r--cpp/lib/broker/BrokerMessage.cpp19
-rw-r--r--cpp/lib/broker/BrokerMessageBase.h4
-rw-r--r--cpp/lib/broker/BrokerQueue.cpp7
-rw-r--r--cpp/lib/broker/InMemoryContent.cpp1
-rw-r--r--cpp/lib/client/BasicMessageChannel.cpp276
-rw-r--r--cpp/lib/client/BasicMessageChannel.h8
-rw-r--r--cpp/lib/client/ClientChannel.cpp73
-rw-r--r--cpp/lib/client/ClientChannel.h15
-rw-r--r--cpp/lib/client/ClientMessage.cpp161
-rw-r--r--cpp/lib/client/ClientMessage.h75
-rw-r--r--cpp/lib/client/IncomingMessage.cpp204
-rw-r--r--cpp/lib/client/IncomingMessage.h131
-rw-r--r--cpp/lib/client/Makefile.am1
-rw-r--r--cpp/lib/client/ResponseHandler.cpp57
-rw-r--r--cpp/lib/client/ResponseHandler.h51
-rw-r--r--cpp/lib/common/framing/BasicHeaderProperties.cpp4
-rw-r--r--cpp/lib/common/framing/BasicHeaderProperties.h136
-rw-r--r--cpp/lib/common/framing/MethodContext.h5
-rw-r--r--cpp/lib/common/shared_ptr.h7
-rw-r--r--cpp/lib/common/sys/ProducerConsumer.cpp20
-rw-r--r--cpp/lib/common/sys/ProducerConsumer.h20
-rw-r--r--cpp/lib/common/sys/ThreadSafeQueue.h8
22 files changed, 592 insertions, 691 deletions
diff --git a/cpp/lib/broker/BrokerMessage.cpp b/cpp/lib/broker/BrokerMessage.cpp
index 91ba3dfec0..fc61cd2296 100644
--- a/cpp/lib/broker/BrokerMessage.cpp
+++ b/cpp/lib/broker/BrokerMessage.cpp
@@ -49,14 +49,6 @@ BasicMessage::BasicMessage(
size(0)
{}
-// FIXME aconway 2007-02-01: remove.
-// BasicMessage::BasicMessage(Buffer& buffer, bool headersOnly, uint32_t contentChunkSize) :
-// publisher(0), size(0)
-// {
-
-// decode(buffer, headersOnly, contentChunkSize);
-// }
-
// For tests only.
BasicMessage::BasicMessage() : size(0)
{}
@@ -227,12 +219,13 @@ void BasicMessage::releaseContent(MessageStore* store)
store->stage(this);
}
if (!content.get() || content->size() > 0) {
- // FIXME aconway 2007-02-07: handle MessageMessage.
- //set content to lazy loading mode (but only if there is stored content):
+ //set content to lazy loading mode (but only if there is
+ //stored content):
- //Note: the LazyLoadedContent instance contains a raw pointer to the message, however it is
- // then set as a member of that message so its lifetime is guaranteed to be no longer than
- // that of the message itself
+ //Note: the LazyLoadedContent instance contains a raw pointer
+ //to the message, however it is then set as a member of that
+ //message so its lifetime is guaranteed to be no longer than
+ //that of the message itself
content = std::auto_ptr<Content>(
new LazyLoadedContent(store, this, expectedContentSize()));
}
diff --git a/cpp/lib/broker/BrokerMessageBase.h b/cpp/lib/broker/BrokerMessageBase.h
index 7739ab19e0..4989cccdd3 100644
--- a/cpp/lib/broker/BrokerMessageBase.h
+++ b/cpp/lib/broker/BrokerMessageBase.h
@@ -110,10 +110,6 @@ class Message {
virtual bool isComplete() = 0;
virtual uint64_t contentSize() const = 0;
- // FIXME aconway 2007-02-06: Get rid of BasicHeaderProperties
- // at this level. Expose only generic properties available from both
- // message types (e.g. getApplicationHeaders below).
- //
virtual framing::BasicHeaderProperties* getHeaderProperties() = 0;
virtual const framing::FieldTable& getApplicationHeaders() = 0;
virtual bool isPersistent() = 0;
diff --git a/cpp/lib/broker/BrokerQueue.cpp b/cpp/lib/broker/BrokerQueue.cpp
index 31309bd6c5..b65e8e3a9a 100644
--- a/cpp/lib/broker/BrokerQueue.cpp
+++ b/cpp/lib/broker/BrokerQueue.cpp
@@ -234,10 +234,9 @@ void Queue::create(const FieldTable& settings)
void Queue::configure(const FieldTable& settings)
{
- QueuePolicy* _policy = new QueuePolicy(settings);
- if (_policy->getMaxCount() || _policy->getMaxSize()) {
- setPolicy(std::auto_ptr<QueuePolicy>(_policy));
- }
+ std::auto_ptr<QueuePolicy> _policy(new QueuePolicy(settings));
+ if (_policy->getMaxCount() || _policy->getMaxSize())
+ setPolicy(_policy);
}
void Queue::destroy()
diff --git a/cpp/lib/broker/InMemoryContent.cpp b/cpp/lib/broker/InMemoryContent.cpp
index 3e4ac29486..237375e860 100644
--- a/cpp/lib/broker/InMemoryContent.cpp
+++ b/cpp/lib/broker/InMemoryContent.cpp
@@ -40,7 +40,6 @@ uint32_t InMemoryContent::size()
return sum;
}
-// FIXME aconway 2007-02-01: Remove version parameter.
void InMemoryContent::send(ChannelAdapter& channel, uint32_t framesize)
{
for (content_iterator i = content.begin(); i != content.end(); i++) {
diff --git a/cpp/lib/client/BasicMessageChannel.cpp b/cpp/lib/client/BasicMessageChannel.cpp
index 012a55b9ea..d6929965ee 100644
--- a/cpp/lib/client/BasicMessageChannel.cpp
+++ b/cpp/lib/client/BasicMessageChannel.cpp
@@ -15,7 +15,6 @@
* limitations under the License.
*
*/
-#include <iostream>
#include "BasicMessageChannel.h"
#include "AMQMethodBody.h"
#include "ClientChannel.h"
@@ -23,39 +22,93 @@
#include "MessageListener.h"
#include "framing/FieldTable.h"
#include "Connection.h"
-
-using namespace std;
+#include <queue>
+#include <iostream>
+#include <boost/format.hpp>
+#include <boost/variant.hpp>
namespace qpid {
namespace client {
+using namespace std;
using namespace sys;
using namespace framing;
+using boost::format;
+
+namespace {
+
+// Destination name constants
+const std::string BASIC_GET("__basic_get__");
+const std::string BASIC_RETURN("__basic_return__");
+
+// Reference name constant
+const std::string BASIC_REF("__basic_reference__");
+}
+
+class BasicMessageChannel::WaitableDestination :
+ public IncomingMessage::Destination
+{
+ public:
+ WaitableDestination() : shutdownFlag(false) {}
+ void message(const Message& msg) {
+ Mutex::ScopedLock l(monitor);
+ queue.push(msg);
+ monitor.notify();
+ }
+
+ void empty() {
+ Mutex::ScopedLock l(monitor);
+ queue.push(Empty());
+ monitor.notify();
+ }
+
+ bool wait(Message& msgOut) {
+ Mutex::ScopedLock l(monitor);
+ while (queue.empty() && !shutdownFlag)
+ monitor.wait();
+ if (shutdownFlag)
+ return false;
+ Message* msg = boost::get<Message>(&queue.front());
+ bool success = msg;
+ if (success)
+ msgOut=*msg;
+ queue.pop();
+ if (!queue.empty())
+ monitor.notify(); // Wake another waiter.
+ return success;
+ }
+
+ void shutdown() {
+ Mutex::ScopedLock l(monitor);
+ shutdownFlag = true;
+ monitor.notifyAll();
+ }
+
+ private:
+ struct Empty {};
+ typedef boost::variant<Message,Empty> Item;
+ sys::Monitor monitor;
+ std::queue<Item> queue;
+ bool shutdownFlag;
+};
+
BasicMessageChannel::BasicMessageChannel(Channel& ch)
- : channel(ch), returnsHandler(0) {}
+ : channel(ch), returnsHandler(0),
+ destGet(new WaitableDestination()),
+ destDispatch(new WaitableDestination())
+{
+ incoming.addDestination(BASIC_RETURN, *destDispatch);
+}
void BasicMessageChannel::consume(
Queue& queue, std::string& tag, MessageListener* listener,
AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields)
{
- channel.sendAndReceiveSync<BasicConsumeOkBody>(
- synch,
- new BasicConsumeBody(
- channel.version, 0, queue.getName(), tag, noLocal,
- ackMode == NO_ACK, false, !synch,
- fields ? *fields : FieldTable()));
- if (synch) {
- BasicConsumeOkBody::shared_ptr response =
- boost::shared_polymorphic_downcast<BasicConsumeOkBody>(
- channel.responses.getResponse());
- tag = response->getConsumerTag();
- }
- // FIXME aconway 2007-02-20: Race condition!
- // We could receive the first message for the consumer
- // before we create the consumer below.
- // Move consumer creation to handler for BasicConsumeOkBody
{
+ // Note we create a consumer even if tag="". In that case
+ // It will be renamed when we handle BasicConsumeOkBody.
+ //
Mutex::ScopedLock l(lock);
ConsumerMap::iterator i = consumers.find(tag);
if (i != consumers.end())
@@ -66,6 +119,23 @@ void BasicMessageChannel::consume(
c.ackMode = ackMode;
c.lastDeliveryTag = 0;
}
+
+ // FIXME aconway 2007-03-23: get processed in both.
+
+ // BasicConsumeOkBody is really processed in handle(), here
+ // we just pick up the tag to return to the user.
+ //
+ // We can't process it here because messages for the consumer may
+ // already be arriving.
+ //
+ BasicConsumeOkBody::shared_ptr ok =
+ channel.sendAndReceiveSync<BasicConsumeOkBody>(
+ synch,
+ new BasicConsumeBody(
+ channel.version, 0, queue.getName(), tag, noLocal,
+ ackMode == NO_ACK, false, !synch,
+ fields ? *fields : FieldTable()));
+ tag = ok->getConsumerTag();
}
@@ -92,6 +162,8 @@ void BasicMessageChannel::close(){
consumersCopy = consumers;
consumers.clear();
}
+ destGet->shutdown();
+ destDispatch->shutdown();
for (ConsumerMap::iterator i=consumersCopy.begin();
i != consumersCopy.end(); ++i)
{
@@ -102,28 +174,37 @@ void BasicMessageChannel::close(){
channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true));
}
}
- incoming.shutdown();
}
-
-bool BasicMessageChannel::get(Message& msg, const Queue& queue, AckMode ackMode) {
- // Expect a message starting with a BasicGetOk
- incoming.startGet();
- channel.send(new BasicGetBody(channel.version, 0, queue.getName(), ackMode));
- return incoming.waitGet(msg);
+bool BasicMessageChannel::get(
+ Message& msg, const Queue& queue, AckMode ackMode)
+{
+ // Prepare for incoming response
+ incoming.addDestination(BASIC_GET, *destGet);
+ channel.send(
+ new BasicGetBody(channel.version, 0, queue.getName(), ackMode));
+ bool got = destGet->wait(msg);
+ return got;
}
void BasicMessageChannel::publish(
const Message& msg, const Exchange& exchange,
const std::string& routingKey, bool mandatory, bool immediate)
{
- msg.getHeader()->setContentSize(msg.getData().size());
const string e = exchange.getName();
string key = routingKey;
- channel.send(new BasicPublishBody(channel.version, 0, e, key, mandatory, immediate));
- //break msg up into header frame and content frame(s) and send these
- channel.send(msg.getHeader());
+
+ // Make a header for the message
+ AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
+ BasicHeaderProperties::copy(
+ *static_cast<BasicHeaderProperties*>(header->getProperties()), msg);
+ header->setContentSize(msg.getData().size());
+
+ channel.send(
+ new BasicPublishBody(
+ channel.version, 0, e, key, mandatory, immediate));
+ channel.send(header);
string data = msg.getData();
u_int64_t data_length = data.length();
if(data_length > 0){
@@ -149,22 +230,76 @@ void BasicMessageChannel::publish(
void BasicMessageChannel::handle(boost::shared_ptr<AMQMethodBody> method) {
assert(method->amqpClassId() ==BasicGetBody::CLASS_ID);
switch(method->amqpMethodId()) {
- case BasicDeliverBody::METHOD_ID:
- case BasicReturnBody::METHOD_ID:
- case BasicGetOkBody::METHOD_ID:
- case BasicGetEmptyBody::METHOD_ID:
- incoming.add(method);
- return;
+ case BasicGetOkBody::METHOD_ID: {
+ incoming.openReference(BASIC_REF);
+ incoming.createMessage(BASIC_GET, BASIC_REF);
+ return;
+ }
+ case BasicGetEmptyBody::METHOD_ID: {
+ incoming.getDestination(BASIC_GET).empty();
+ incoming.removeDestination(BASIC_GET);
+ return;
+ }
+ case BasicDeliverBody::METHOD_ID: {
+ BasicDeliverBody::shared_ptr deliver=
+ boost::shared_polymorphic_downcast<BasicDeliverBody>(method);
+ incoming.openReference(BASIC_REF);
+ Message& msg = incoming.createMessage(
+ deliver->getConsumerTag(), BASIC_REF);
+ msg.setDestination(deliver->getConsumerTag());
+ msg.setDeliveryTag(deliver->getDeliveryTag());
+ msg.setRedelivered(deliver->getRedelivered());
+ return;
+ }
+ case BasicReturnBody::METHOD_ID: {
+ incoming.openReference(BASIC_REF);
+ incoming.createMessage(BASIC_RETURN, BASIC_REF);
+ return;
+ }
+ case BasicConsumeOkBody::METHOD_ID: {
+ Mutex::ScopedLock l(lock);
+ BasicConsumeOkBody::shared_ptr consumeOk =
+ boost::shared_polymorphic_downcast<BasicConsumeOkBody>(method);
+ std::string tag = consumeOk->getConsumerTag();
+ ConsumerMap::iterator i = consumers.find(std::string());
+ if (i != consumers.end()) {
+ // Need to rename the un-named consumer.
+ if (consumers.find(tag) == consumers.end()) {
+ consumers[tag] = i->second;
+ consumers.erase(i);
+ }
+ else // Tag already exists.
+ throw ChannelException(404, "Tag already exists: "+tag);
+ }
+ // FIXME aconway 2007-03-23: Integrate consumer & destination
+ // maps.
+ incoming.addDestination(tag, *destDispatch);
+ return;
+ }
}
throw Channel::UnknownMethod();
}
-void BasicMessageChannel::handle(AMQHeaderBody::shared_ptr body){
- incoming.add(body);
+void BasicMessageChannel::handle(AMQHeaderBody::shared_ptr header) {
+ BasicHeaderProperties* props =
+ boost::polymorphic_downcast<BasicHeaderProperties*>(
+ header->getProperties());
+ IncomingMessage::Reference& ref = incoming.getReference(BASIC_REF);
+ assert (ref.messages.size() == 1);
+ ref.messages.front().BasicHeaderProperties::operator=(*props);
+ incoming_size = header->getContentSize();
+ if (incoming_size==0)
+ incoming.closeReference(BASIC_REF);
}
-void BasicMessageChannel::handle(AMQContentBody::shared_ptr body){
- incoming.add(body);
+void BasicMessageChannel::handle(AMQContentBody::shared_ptr content){
+ incoming.appendReference(BASIC_REF, content->getData());
+ size_t size = incoming.getReference(BASIC_REF).data.size();
+ if (size >= incoming_size) {
+ incoming.closeReference(BASIC_REF);
+ if (size > incoming_size)
+ throw ChannelException(502, "Content exceeded declared size");
+ }
}
void BasicMessageChannel::deliver(Consumer& consumer, Message& msg){
@@ -186,7 +321,9 @@ void BasicMessageChannel::deliver(Consumer& consumer, Message& msg){
consumer.lastDeliveryTag = 0;
channel.send(
new BasicAckBody(
- channel.version, msg.getDeliveryTag(), multiple));
+ channel.version,
+ msg.getDeliveryTag(),
+ multiple));
case NO_ACK: // Nothing to do
case CLIENT_ACK: // User code must ack.
break;
@@ -204,35 +341,32 @@ void BasicMessageChannel::deliver(Consumer& consumer, Message& msg){
void BasicMessageChannel::run() {
while(channel.isOpen()) {
try {
- Message msg = incoming.waitDispatch();
- if(msg.getMethod()->isA<BasicReturnBody>()) {
- ReturnedMessageHandler* handler=0;
- {
- Mutex::ScopedLock l(lock);
- handler=returnsHandler;
- }
- if(handler == 0) {
- // TODO aconway 2007-02-20: proper logging.
- cout << "Message returned: " << msg.getData() << endl;
+ Message msg;
+ bool gotMessge = destDispatch->wait(msg);
+ if (gotMessge) {
+ if(msg.getDestination() == BASIC_RETURN) {
+ ReturnedMessageHandler* handler=0;
+ {
+ Mutex::ScopedLock l(lock);
+ handler=returnsHandler;
+ }
+ if(handler != 0)
+ handler->returned(msg);
}
- else
- handler->returned(msg);
- }
- else {
- BasicDeliverBody::shared_ptr deliverBody =
- boost::shared_polymorphic_downcast<BasicDeliverBody>(
- msg.getMethod());
- std::string tag = deliverBody->getConsumerTag();
- Consumer consumer;
- {
- Mutex::ScopedLock l(lock);
- ConsumerMap::iterator i = consumers.find(tag);
- if(i == consumers.end())
- THROW_QPID_ERROR(PROTOCOL_ERROR+504,
- "Unknown consumer tag=" + tag);
- consumer = i->second;
+ else {
+ Consumer consumer;
+ {
+ Mutex::ScopedLock l(lock);
+ ConsumerMap::iterator i = consumers.find(
+ msg.getDestination());
+ if(i == consumers.end())
+ THROW_QPID_ERROR(PROTOCOL_ERROR+504,
+ "Unknown consumer tag=" +
+ msg.getDestination());
+ consumer = i->second;
+ }
+ deliver(consumer, msg);
}
- deliver(consumer, msg);
}
}
catch (const ShutdownException&) {
@@ -240,8 +374,8 @@ void BasicMessageChannel::run() {
}
catch (const Exception& e) {
// FIXME aconway 2007-02-20: Report exception to user.
- cout << "client::Basic::run() terminated by: " << e.toString()
- << "(" << typeid(e).name() << ")" << endl;
+ cout << "client::BasicMessageChannel::run() terminated by: "
+ << e.toString() << endl;
}
}
}
diff --git a/cpp/lib/client/BasicMessageChannel.h b/cpp/lib/client/BasicMessageChannel.h
index b921ec24d9..aaedfd6bf1 100644
--- a/cpp/lib/client/BasicMessageChannel.h
+++ b/cpp/lib/client/BasicMessageChannel.h
@@ -21,6 +21,7 @@
#include "MessageChannel.h"
#include "IncomingMessage.h"
+#include <boost/scoped_ptr.hpp>
namespace qpid {
namespace client {
@@ -62,13 +63,13 @@ class BasicMessageChannel : public MessageChannel
private:
+ class WaitableDestination;
struct Consumer{
MessageListener* listener;
AckMode ackMode;
int count;
u_int64_t lastDeliveryTag;
};
-
typedef std::map<std::string, Consumer> ConsumerMap;
void deliver(Consumer& consumer, Message& msg);
@@ -76,8 +77,11 @@ class BasicMessageChannel : public MessageChannel
sys::Mutex lock;
Channel& channel;
IncomingMessage incoming;
- ConsumerMap consumers;
+ uint64_t incoming_size;
+ ConsumerMap consumers ;
ReturnedMessageHandler* returnsHandler;
+ boost::scoped_ptr<WaitableDestination> destGet;
+ boost::scoped_ptr<WaitableDestination> destDispatch;
};
}} // namespace qpid::client
diff --git a/cpp/lib/client/ClientChannel.cpp b/cpp/lib/client/ClientChannel.cpp
index 97e0a394d2..98feff9389 100644
--- a/cpp/lib/client/ClientChannel.cpp
+++ b/cpp/lib/client/ClientChannel.cpp
@@ -68,7 +68,8 @@ void Channel::protocolInit(
assert(connection);
responses.expect();
connection->connector->init(); // Send ProtocolInit block.
- responses.receive<ConnectionStartBody>();
+ ConnectionStartBody::shared_ptr connectionStart =
+ responses.receive<ConnectionStartBody>();
FieldTable props;
string mechanism("PLAIN");
@@ -77,7 +78,8 @@ void Channel::protocolInit(
ConnectionTuneBody::shared_ptr proposal =
sendAndReceive<ConnectionTuneBody>(
new ConnectionStartOkBody(
- version, responses.getRequestId(), props, mechanism,
+ version, connectionStart->getRequestId(),
+ props, mechanism,
response, locale));
/**
@@ -89,7 +91,8 @@ void Channel::protocolInit(
**/
send(new ConnectionTuneOkBody(
- version, responses.getRequestId(), proposal->getChannelMax(), connection->getMaxFrameSize(),
+ version, proposal->getRequestId(),
+ proposal->getChannelMax(), connection->getMaxFrameSize(),
proposal->getHeartbeat()));
uint16_t heartbeat = proposal->getHeartbeat();
@@ -102,18 +105,17 @@ void Channel::protocolInit(
send(new ConnectionOpenBody(version, vhost, capabilities, true));
//receive connection.open-ok (or redirect, but ignore that for now
//esp. as using force=true).
- responses.waitForResponse();
- if(responses.validate<ConnectionOpenOkBody>()) {
+ AMQMethodBody::shared_ptr openResponse = responses.receive();
+ if(openResponse->isA<ConnectionOpenOkBody>()) {
//ok
- }else if(responses.validate<ConnectionRedirectBody>()){
+ }else if(openResponse->isA<ConnectionRedirectBody>()){
//ignore for now
ConnectionRedirectBody::shared_ptr redirect(
- shared_polymorphic_downcast<ConnectionRedirectBody>(
- responses.getResponse()));
+ shared_polymorphic_downcast<ConnectionRedirectBody>(openResponse));
cout << "Received redirection to " << redirect->getHost()
<< endl;
} else {
- THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response");
+ THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response to Connection.open");
}
}
@@ -121,7 +123,6 @@ bool Channel::isOpen() const { return connection; }
void Channel::setQos() {
messaging->setQos();
- // FIXME aconway 2007-02-22: message
}
void Channel::setPrefetch(uint16_t _prefetch){
@@ -149,18 +150,15 @@ void Channel::deleteExchange(Exchange& exchange, bool synch){
void Channel::declareQueue(Queue& queue, bool synch){
string name = queue.getName();
FieldTable args;
- sendAndReceiveSync<QueueDeclareOkBody>(
- synch,
- new QueueDeclareBody(
- version, 0, name, false/*passive*/, queue.isDurable(),
- queue.isExclusive(), queue.isAutoDelete(), !synch, args));
- if(synch){
- if(queue.getName().length() == 0){
- QueueDeclareOkBody::shared_ptr response =
- shared_polymorphic_downcast<QueueDeclareOkBody>(
- responses.getResponse());
+ QueueDeclareOkBody::shared_ptr response =
+ sendAndReceiveSync<QueueDeclareOkBody>(
+ synch,
+ new QueueDeclareBody(
+ version, 0, name, false/*passive*/, queue.isDurable(),
+ queue.isExclusive(), queue.isAutoDelete(), !synch, args));
+ if(synch) {
+ if(queue.getName().length() == 0)
queue.setName(response->getQueue());
- }
}
}
@@ -191,6 +189,14 @@ void Channel::rollback(){
void Channel::handleMethodInContext(
AMQMethodBody::shared_ptr method, const MethodContext&)
{
+ // TODO aconway 2007-03-23: Special case for consume OK as it
+ // is both an expected response and needs handling in this thread.
+ // Need to review & reationalize the client-side processing model.
+ if (method->isA<BasicConsumeOkBody>()) {
+ messaging->handle(method);
+ responses.signalResponse(method);
+ return;
+ }
if(responses.isWaiting()) {
responses.signalResponse(method);
return;
@@ -204,11 +210,11 @@ void Channel::handleMethodInContext(
}
}
catch (const UnknownMethod&) {
- connection->close(
- 504, "Unknown method",
- method->amqpClassId(), method->amqpMethodId());
- }
-}
+ connection->close(
+ 504, "Unknown method",
+ method->amqpClassId(), method->amqpMethodId());
+ }
+ }
void Channel::handleChannel(AMQMethodBody::shared_ptr method) {
switch (method->amqpMethodId()) {
@@ -272,8 +278,6 @@ void Channel::close(
void Channel::peerClose(ChannelCloseBody::shared_ptr) {
assert(isOpen());
closeInternal();
- // FIXME aconway 2007-01-26: How to throw the proper exception
- // to the application thread?
}
void Channel::closeInternal() {
@@ -287,20 +291,23 @@ void Channel::closeInternal() {
dispatcher.join();
}
-void Channel::sendAndReceive(AMQMethodBody* toSend, ClassId c, MethodId m)
+AMQMethodBody::shared_ptr Channel::sendAndReceive(
+ AMQMethodBody* toSend, ClassId c, MethodId m)
{
responses.expect();
send(toSend);
- responses.receive(c, m);
+ return responses.receive(c, m);
}
-void Channel::sendAndReceiveSync(
+AMQMethodBody::shared_ptr Channel::sendAndReceiveSync(
bool sync, AMQMethodBody* body, ClassId c, MethodId m)
{
if(sync)
- sendAndReceive(body, c, m);
- else
+ return sendAndReceive(body, c, m);
+ else {
send(body);
+ return AMQMethodBody::shared_ptr();
+ }
}
void Channel::consume(
diff --git a/cpp/lib/client/ClientChannel.h b/cpp/lib/client/ClientChannel.h
index 58a007977d..f6fe18da20 100644
--- a/cpp/lib/client/ClientChannel.h
+++ b/cpp/lib/client/ClientChannel.h
@@ -81,24 +81,25 @@ class Channel : public framing::ChannelAdapter
const std::string& uid, const std::string& pwd,
const std::string& vhost);
- void sendAndReceive(
+ framing::AMQMethodBody::shared_ptr sendAndReceive(
framing::AMQMethodBody*, framing::ClassId, framing::MethodId);
- void sendAndReceiveSync(
+ framing::AMQMethodBody::shared_ptr sendAndReceiveSync(
bool sync,
framing::AMQMethodBody*, framing::ClassId, framing::MethodId);
template <class BodyType>
boost::shared_ptr<BodyType> sendAndReceive(framing::AMQMethodBody* body) {
- sendAndReceive(body, BodyType::CLASS_ID, BodyType::METHOD_ID);
return boost::shared_polymorphic_downcast<BodyType>(
- responses.getResponse());
+ sendAndReceive(body, BodyType::CLASS_ID, BodyType::METHOD_ID));
}
- template <class BodyType> void sendAndReceiveSync(
+ template <class BodyType>
+ boost::shared_ptr<BodyType> sendAndReceiveSync(
bool sync, framing::AMQMethodBody* body) {
- sendAndReceiveSync(
- sync, body, BodyType::CLASS_ID, BodyType::METHOD_ID);
+ return boost::shared_polymorphic_downcast<BodyType>(
+ sendAndReceiveSync(
+ sync, body, BodyType::CLASS_ID, BodyType::METHOD_ID));
}
void open(framing::ChannelId, Connection&);
diff --git a/cpp/lib/client/ClientMessage.cpp b/cpp/lib/client/ClientMessage.cpp
deleted file mode 100644
index 8edd0a474d..0000000000
--- a/cpp/lib/client/ClientMessage.cpp
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <ClientMessage.h>
-using namespace qpid::client;
-using namespace qpid::framing;
-
-Message::Message(const std::string& d)
- : header(new AMQHeaderBody(BASIC))
-{
- setData(d);
-}
-
-void Message::setData(const std::string& d) {
- data = d;
-}
-
-Message::Message(AMQHeaderBody::shared_ptr& _header) : header(_header){
-}
-
-Message::~Message(){
-}
-
-BasicHeaderProperties* Message::getHeaderProperties() const {
- return dynamic_cast<BasicHeaderProperties*>(header->getProperties());
-}
-
-std::string Message::getContentType() const {
- return getHeaderProperties()->getContentType();
-}
-
-std::string Message::getContentEncoding() const {
- return getHeaderProperties()->getContentEncoding();
-}
-
-FieldTable& Message::getHeaders() const {
- return getHeaderProperties()->getHeaders();
-}
-
-uint8_t Message::getDeliveryMode() const {
- return getHeaderProperties()->getDeliveryMode();
-}
-
-uint8_t Message::getPriority() const {
- return getHeaderProperties()->getPriority();
-}
-
-std::string Message::getCorrelationId() const {
- return getHeaderProperties()->getCorrelationId();
-}
-
-std::string Message::getReplyTo() const {
- return getHeaderProperties()->getReplyTo();
-}
-
-std::string Message::getExpiration() const {
- return getHeaderProperties()->getExpiration();
-}
-
-std::string Message::getMessageId() const {
- return getHeaderProperties()->getMessageId();
-}
-
-uint64_t Message::getTimestamp() const {
- return getHeaderProperties()->getTimestamp();
-}
-
-std::string Message::getType() const {
- return getHeaderProperties()->getType();
-}
-
-std::string Message::getUserId() const {
- return getHeaderProperties()->getUserId();
-}
-
-std::string Message::getAppId() const {
- return getHeaderProperties()->getAppId();
-}
-
-std::string Message::getClusterId() const {
- return getHeaderProperties()->getClusterId();
-}
-
-void Message::setContentType(const std::string& type){
- getHeaderProperties()->setContentType(type);
-}
-
-void Message::setContentEncoding(const std::string& encoding){
- getHeaderProperties()->setContentEncoding(encoding);
-}
-
-void Message::setHeaders(const FieldTable& headers){
- getHeaderProperties()->setHeaders(headers);
-}
-
-void Message::setDeliveryMode(DeliveryMode mode){
- getHeaderProperties()->setDeliveryMode(mode);
-}
-
-void Message::setPriority(uint8_t priority){
- getHeaderProperties()->setPriority(priority);
-}
-
-void Message::setCorrelationId(const std::string& correlationId){
- getHeaderProperties()->setCorrelationId(correlationId);
-}
-
-void Message::setReplyTo(const std::string& replyTo){
- getHeaderProperties()->setReplyTo(replyTo);
-}
-
-void Message::setExpiration(const std::string& expiration){
- getHeaderProperties()->setExpiration(expiration);
-}
-
-void Message::setMessageId(const std::string& messageId){
- getHeaderProperties()->setMessageId(messageId);
-}
-
-void Message::setTimestamp(uint64_t timestamp){
- getHeaderProperties()->setTimestamp(timestamp);
-}
-
-void Message::setType(const std::string& type){
- getHeaderProperties()->setType(type);
-}
-
-void Message::setUserId(const std::string& userId){
- getHeaderProperties()->setUserId(userId);
-}
-
-void Message::setAppId(const std::string& appId){
- getHeaderProperties()->setAppId(appId);
-}
-
-void Message::setClusterId(const std::string& clusterId){
- getHeaderProperties()->setClusterId(clusterId);
-}
-
-
-uint64_t Message::getDeliveryTag() const {
- BasicDeliverBody* deliver=dynamic_cast<BasicDeliverBody*>(method.get());
- return deliver ? deliver->getDeliveryTag() : 0;
-}
diff --git a/cpp/lib/client/ClientMessage.h b/cpp/lib/client/ClientMessage.h
index c89eeb1a0d..a326c24aed 100644
--- a/cpp/lib/client/ClientMessage.h
+++ b/cpp/lib/client/ClientMessage.h
@@ -22,12 +22,10 @@
*
*/
#include <string>
-#include <framing/amqp_framing.h>
+#include "framing/BasicHeaderProperties.h"
namespace qpid {
-
namespace client {
-class IncomingMessage;
/**
* A representation of messages for sent or recived through the
@@ -35,67 +33,28 @@ class IncomingMessage;
*
* \ingroup clientapi
*/
-class Message {
- framing::AMQMethodBody::shared_ptr method;
- framing::AMQHeaderBody::shared_ptr header;
- std::string data;
- bool redelivered;
-
- // FIXME aconway 2007-02-20: const incorrect, needs const return type.
- framing::BasicHeaderProperties* getHeaderProperties() const;
- Message(qpid::framing::AMQHeaderBody::shared_ptr& header);
-
+class Message : public framing::BasicHeaderProperties {
public:
- enum DeliveryMode { DURABLE=1, NON_DURABLE=2 };
- Message(const std::string& data=std::string());
- ~Message();
+ Message(const std::string& data_=std::string()) : data(data_) {}
std::string getData() const { return data; }
- bool isRedelivered() const { return redelivered; }
- uint64_t getDeliveryTag() const;
- std::string getContentType() const;
- std::string getContentEncoding() const;
- qpid::framing::FieldTable& getHeaders() const;
- uint8_t getDeliveryMode() const;
- uint8_t getPriority() const;
- std::string getCorrelationId() const;
- std::string getReplyTo() const;
- std::string getExpiration() const;
- std::string getMessageId() const;
- uint64_t getTimestamp() const;
- std::string getType() const;
- std::string getUserId() const;
- std::string getAppId() const;
- std::string getClusterId() const;
+ void setData(const std::string& _data) { data = _data; }
- void setData(const std::string& _data);
- void setRedelivered(bool _redelivered){ redelivered = _redelivered; }
- void setContentType(const std::string& type);
- void setContentEncoding(const std::string& encoding);
- void setHeaders(const qpid::framing::FieldTable& headers);
- void setDeliveryMode(DeliveryMode mode);
- void setPriority(uint8_t priority);
- void setCorrelationId(const std::string& correlationId);
- void setReplyTo(const std::string& replyTo);
- void setExpiration(const std::string& expiration);
- void setMessageId(const std::string& messageId);
- void setTimestamp(uint64_t timestamp);
- void setType(const std::string& type);
- void setUserId(const std::string& userId);
- void setAppId(const std::string& appId);
- void setClusterId(const std::string& clusterId);
+ std::string getDestination() const { return destination; }
+ void setDestination(const std::string& dest) { destination = dest; }
- /** Get the method used to deliver this message */
- boost::shared_ptr<framing::AMQMethodBody> getMethod() const
- { return method; }
-
- void setMethod(framing::AMQMethodBody::shared_ptr m) { method=m; }
- boost::shared_ptr<framing::AMQHeaderBody> getHeader() const
- { return header; }
+ // TODO aconway 2007-03-22: only needed for Basic.deliver support.
+ uint64_t getDeliveryTag() const { return deliveryTag; }
+ void setDeliveryTag(uint64_t dt) { deliveryTag = dt; }
- // TODO aconway 2007-02-15: remove friendships.
- friend class IncomingMessage;
- friend class Channel;
+ bool isRedelivered() const { return redelivered; }
+ void setRedelivered(bool _redelivered){ redelivered = _redelivered; }
+
+ private:
+ std::string data;
+ std::string destination;
+ bool redelivered;
+ uint64_t deliveryTag;
};
}}
diff --git a/cpp/lib/client/IncomingMessage.cpp b/cpp/lib/client/IncomingMessage.cpp
index 07f94ceb64..8f69f8c3ef 100644
--- a/cpp/lib/client/IncomingMessage.cpp
+++ b/cpp/lib/client/IncomingMessage.cpp
@@ -18,155 +18,113 @@
* under the License.
*
*/
-#include <IncomingMessage.h>
-#include "framing/AMQHeaderBody.h"
-#include "framing/AMQContentBody.h"
-#include "BasicGetOkBody.h"
-#include "BasicReturnBody.h"
-#include "BasicDeliverBody.h"
-#include <QpidError.h>
-#include <iostream>
+
+#include "IncomingMessage.h"
+#include "Exception.h"
+#include "ClientMessage.h"
+#include <boost/format.hpp>
namespace qpid {
namespace client {
-using namespace sys;
-using namespace framing;
-
-struct IncomingMessage::Guard: public Mutex::ScopedLock {
- Guard(IncomingMessage* im) : Mutex::ScopedLock(im->lock) {
- im->shutdownError.throwIf();
- }
-};
-
-IncomingMessage::IncomingMessage() { reset(); }
+using boost::format;
+using sys::Mutex;
-void IncomingMessage::reset() {
- state = &IncomingMessage::expectRequest;
- endFn= &IncomingMessage::endRequest;
- buildMessage = Message();
-}
-
-void IncomingMessage::startGet() {
- Guard g(this);
- if (state != &IncomingMessage::expectRequest) {
- endGet(new QPID_ERROR(CLIENT_ERROR, "Message already in progress."));
- }
- else {
- state = &IncomingMessage::expectGetOk;
- endFn = &IncomingMessage::endGet;
- getError.reset();
- getState = GETTING;
- }
-}
-
-bool IncomingMessage::waitGet(Message& msg) {
- Guard g(this);
- while (getState == GETTING && !shutdownError && !getError)
- getReady.wait(lock);
- shutdownError.throwIf();
- getError.throwIf();
- msg = getMessage;
- return getState==GOT;
-}
-
-Message IncomingMessage::waitDispatch() {
- Guard g(this);
- while(dispatchQueue.empty() && !shutdownError)
- dispatchReady.wait(lock);
- shutdownError.throwIf();
-
- Message msg(dispatchQueue.front());
- dispatchQueue.pop();
- return msg;
-}
+IncomingMessage::Destination::~Destination() {}
-void IncomingMessage::add(BodyPtr body) {
- Guard g(this);
- shutdownError.throwIf();
- // Call the current state function.
- (this->*state)(body);
-}
-
-void IncomingMessage::shutdown() {
+void IncomingMessage::openReference(const std::string& name) {
Mutex::ScopedLock l(lock);
- shutdownError.reset(new ShutdownException());
- getReady.notify();
- dispatchReady.notify();
+ if (references.find(name) != references.end())
+ throw ChannelException(
+ 406, format("Attempt to open existing reference %s.") % name);
+ references[name];
+ return;
}
-bool IncomingMessage::isShutdown() const {
+void IncomingMessage::appendReference(
+ const std::string& name, const std::string& data)
+{
Mutex::ScopedLock l(lock);
- return shutdownError;
+ getRefUnlocked(name).data += data;
}
-// Common check for all the expect functions. Called in network thread.
-template<class T>
-boost::shared_ptr<T> IncomingMessage::expectCheck(BodyPtr body) {
- boost::shared_ptr<T> ptr = boost::dynamic_pointer_cast<T>(body);
- if (!ptr)
- throw QPID_ERROR(PROTOCOL_ERROR+504, "Unexpected frame type");
- return ptr;
+Message& IncomingMessage::createMessage(
+ const std::string& destination, const std::string& reference)
+{
+ Mutex::ScopedLock l(lock);
+ getDestUnlocked(destination); // Verify destination.
+ Reference& ref = getRefUnlocked(reference);
+ ref.messages.resize(ref.messages.size() +1);
+ ref.messages.back().setDestination(destination);
+ return ref.messages.back();
}
-void IncomingMessage::expectGetOk(BodyPtr body) {
- if (dynamic_cast<BasicGetOkBody*>(body.get()))
- state = &IncomingMessage::expectHeader;
- else if (dynamic_cast<BasicGetEmptyBody*>(body.get())) {
- getState = EMPTY;
- endGet();
+void IncomingMessage::closeReference(const std::string& name) {
+ Reference refCopy;
+ {
+ Mutex::ScopedLock l(lock);
+ refCopy = getRefUnlocked(name);
+ references.erase(name);
+ }
+ for (std::vector<Message>::iterator i = refCopy.messages.begin();
+ i != refCopy.messages.end();
+ ++i)
+ {
+ i->setData(refCopy.data);
+ // TODO aconway 2007-03-23: Thread safety,
+ // can a destination be removed while we're doing this?
+ getDestination(i->getDestination()).message(*i);
}
- else
- throw QPID_ERROR(PROTOCOL_ERROR+504, "Unexpected frame type");
}
-void IncomingMessage::expectHeader(BodyPtr body) {
- AMQHeaderBody::shared_ptr header = expectCheck<AMQHeaderBody>(body);
- buildMessage.header = header;
- state = &IncomingMessage::expectContent;
- checkComplete();
+
+void IncomingMessage::addDestination(std::string name, Destination& dest) {
+ Mutex::ScopedLock l(lock);
+ DestinationMap::iterator i = destinations.find(name);
+ if (i == destinations.end())
+ destinations[name]=&dest;
+ else if (i->second != &dest)
+ throw ChannelException(
+ 404, format("Destination already exists: %s.") % name);
}
-void IncomingMessage::expectContent(BodyPtr body) {
- AMQContentBody::shared_ptr content = expectCheck<AMQContentBody>(body);
- buildMessage.setData(buildMessage.getData() + content->getData());
- checkComplete();
+void IncomingMessage::removeDestination(std::string name) {
+ Mutex::ScopedLock l(lock);
+ DestinationMap::iterator i = destinations.find(name);
+ if (i == destinations.end())
+ throw ChannelException(
+ 406, format("No such destination: %s.") % name);
+ destinations.erase(i);
}
-void IncomingMessage::checkComplete() {
- size_t declaredSize = buildMessage.header->getContentSize();
- size_t currentSize = buildMessage.getData().size();
- if (declaredSize == currentSize)
- (this->*endFn)(0);
- else if (declaredSize < currentSize)
- (this->*endFn)(new QPID_ERROR(
- PROTOCOL_ERROR, "Message content exceeds declared size."));
+IncomingMessage::Destination& IncomingMessage::getDestination(
+ const std::string& name) {
+ return getDestUnlocked(name);
}
-void IncomingMessage::expectRequest(BodyPtr body) {
- AMQMethodBody::shared_ptr method = expectCheck<AMQMethodBody>(body);
- buildMessage.setMethod(method);
- state = &IncomingMessage::expectHeader;
+IncomingMessage::Reference& IncomingMessage::getReference(
+ const std::string& name) {
+ return getRefUnlocked(name);
}
-
-void IncomingMessage::endGet(Exception* ex) {
- getError.reset(ex);
- if (getState == GETTING) {
- getMessage = buildMessage;
- getState = GOT;
- }
- reset();
- getReady.notify();
+
+IncomingMessage::Reference& IncomingMessage::getRefUnlocked(
+ const std::string& name) {
+ Mutex::ScopedLock l(lock);
+ ReferenceMap::iterator i = references.find(name);
+ if (i == references.end())
+ throw ChannelException(
+ 404, format("No such reference: %s.") % name);
+ return i->second;
}
-void IncomingMessage::endRequest(Exception* ex) {
- ExceptionHolder eh(ex);
- if (!eh) {
- dispatchQueue.push(buildMessage);
- reset();
- dispatchReady.notify();
- }
- eh.throwIf();
+IncomingMessage::Destination& IncomingMessage::getDestUnlocked(
+ const std::string& name) {
+ Mutex::ScopedLock l(lock);
+ DestinationMap::iterator i = destinations.find(name);
+ if (i == destinations.end())
+ throw ChannelException(
+ 404, format("No such destination: %s.") % name);
+ return *i->second;
}
}} // namespace qpid::client
diff --git a/cpp/lib/client/IncomingMessage.h b/cpp/lib/client/IncomingMessage.h
index 6ec949028d..d78a90327d 100644
--- a/cpp/lib/client/IncomingMessage.h
+++ b/cpp/lib/client/IncomingMessage.h
@@ -21,96 +21,91 @@
* under the License.
*
*/
-#include <string>
-#include <queue>
-#include <framing/amqp_framing.h>
-#include "ExceptionHolder.h"
-#include "ClientMessage.h"
#include "sys/Mutex.h"
-#include "sys/Condition.h"
+#include <map>
+#include <vector>
+
namespace qpid {
+namespace client {
-namespace framing {
-class AMQBody;
-}
+class Message;
-namespace client {
/**
- * Accumulates incoming message frames into messages.
- * Client-initiated messages (basic.get) are initiated and made
- * available to the user thread one at a time.
- *
- * Broker initiated messages (basic.return, basic.deliver) are
- * queued for handling by the user dispatch thread.
- *
+ * Manage incoming messages.
+ *
+ * Uses reference and destination concepts from 0-9 Messsage class.
+ *
+ * Basic messages use special destination and reference names to indicate
+ * get-ok, return etc. messages.
+ *
*/
class IncomingMessage {
public:
- typedef boost::shared_ptr<framing::AMQBody> BodyPtr;
- IncomingMessage();
-
- /** Expect a new message starting with getOk. Called in user thread.*/
- void startGet();
+ /** Accumulate data associated with a set of messages. */
+ struct Reference {
+ std::string data;
+ std::vector<Message> messages;
+ };
- /** Wait for the message to complete, return the message.
- * Called in user thread.
- *@raises QpidError if there was an error.
- */
- bool waitGet(Message&);
+ /** Interface to a destination for messages. */
+ class Destination {
+ public:
+ virtual ~Destination();
- /** Wait for the next broker-initiated message. */
- Message waitDispatch();
+ /** Pass a message to the destination */
+ virtual void message(const Message&) = 0;
- /** Add a frame body to the message. Called in network thread. */
- void add(BodyPtr);
+ /** Notify destination of queue-empty contition */
+ virtual void empty() = 0;
+ };
- /** Shut down: all further calls to any function throw ex. */
- void shutdown();
- /** Check if shutdown */
- bool isShutdown() const;
+ /** Add a reference. Throws if already open. */
+ void openReference(const std::string& name);
- private:
+ /** Get a reference. Throws if not already open. */
+ void appendReference(const std::string& name,
+ const std::string& data);
- typedef void (IncomingMessage::* ExpectFn)(BodyPtr);
- typedef void (IncomingMessage::* EndFn)(Exception*);
- typedef std::queue<Message> MessageQueue;
- struct Guard;
- friend struct Guard;
+ /** Create a message to destination associated with reference
+ *@exception if destination or reference non-existent.
+ */
+ Message& createMessage(const std::string& destination,
+ const std::string& reference);
- void reset();
- template <class T> boost::shared_ptr<T> expectCheck(BodyPtr);
+ /** Get a reference.
+ *@exception if non-existent.
+ */
+ Reference& getReference(const std::string& name);
+
+ /** Close a reference and deliver all its messages.
+ * Throws if not open or a message has an invalid destination.
+ */
+ void closeReference(const std::string& name);
- // State functions - a state machine where each state is
- // a member function that processes a frame body.
- void expectGetOk(BodyPtr);
- void expectHeader(BodyPtr);
- void expectContent(BodyPtr);
- void expectRequest(BodyPtr);
+ /** Add a destination.
+ *@exception if a different Destination is already registered
+ * under name.
+ */
+ void addDestination(std::string name, Destination&);
+
+ /** Remove a destination. Throws if does not exist */
+ void removeDestination(std::string name);
- // End functions.
- void endGet(Exception* ex = 0);
- void endRequest(Exception* ex);
+ /** Get a destination. Throws if does not exist */
+ Destination& getDestination(const std::string& name);
+ private:
- // Check for complete message.
- void checkComplete();
+ typedef std::map<std::string, Reference> ReferenceMap;
+ typedef std::map<std::string, Destination*> DestinationMap;
+ Reference& getRefUnlocked(const std::string& name);
+ Destination& getDestUnlocked(const std::string& name);
+
mutable sys::Mutex lock;
- ExpectFn state;
- EndFn endFn;
- Message buildMessage;
- ExceptionHolder shutdownError;
-
- // For basic.get messages.
- sys::Condition getReady;
- ExceptionHolder getError;
- Message getMessage;
- enum { GETTING, GOT, EMPTY } getState;
-
- // For broker-initiated messages
- sys::Condition dispatchReady;
- MessageQueue dispatchQueue;
+ ReferenceMap references;
+ DestinationMap destinations;
};
}}
diff --git a/cpp/lib/client/Makefile.am b/cpp/lib/client/Makefile.am
index 46d8775072..6dbe195232 100644
--- a/cpp/lib/client/Makefile.am
+++ b/cpp/lib/client/Makefile.am
@@ -12,7 +12,6 @@ libqpidclient_la_LDFLAGS = -version-info $(LIBTOOL_VERSION_INFO_ARG)
libqpidclient_la_SOURCES = \
ClientChannel.cpp \
ClientExchange.cpp \
- ClientMessage.cpp \
ClientQueue.cpp \
BasicMessageChannel.cpp \
Connection.cpp \
diff --git a/cpp/lib/client/ResponseHandler.cpp b/cpp/lib/client/ResponseHandler.cpp
index 4498de41ae..926a9ce336 100644
--- a/cpp/lib/client/ResponseHandler.cpp
+++ b/cpp/lib/client/ResponseHandler.cpp
@@ -18,12 +18,10 @@
* under the License.
*
*/
-#include <boost/format.hpp>
-
-#include <ResponseHandler.h>
-#include <sys/Monitor.h>
#include <QpidError.h>
-#include "amqp_types.h"
+#include <boost/format.hpp>
+#include "ResponseHandler.h"
+#include "AMQMethodBody.h"
using namespace qpid::sys;
using namespace qpid::framing;
@@ -31,56 +29,51 @@ using namespace qpid::framing;
namespace qpid {
namespace client {
-ResponseHandler::ResponseHandler() : waiting(false){}
+ResponseHandler::ResponseHandler() : waiting(false), shutdownFlag(false) {}
ResponseHandler::~ResponseHandler(){}
-bool ResponseHandler::validate(ClassId c, MethodId m) {
- return response != 0 &&
- response->amqpClassId() ==c && response->amqpMethodId() == m;
+bool ResponseHandler::isWaiting() {
+ Monitor::ScopedLock l(monitor);
+ return waiting;
}
-void ResponseHandler::waitForResponse(){
+void ResponseHandler::expect(){
Monitor::ScopedLock l(monitor);
- while (waiting)
- monitor.wait();
+ waiting = true;
}
-void ResponseHandler::signalResponse(
- qpid::framing::AMQMethodBody::shared_ptr _response)
+void ResponseHandler::signalResponse(MethodPtr _response)
{
Monitor::ScopedLock l(monitor);
response = _response;
+ if (!response)
+ shutdownFlag=true;
waiting = false;
monitor.notify();
}
-void ResponseHandler::receive(ClassId c, MethodId m) {
+ResponseHandler::MethodPtr ResponseHandler::receive() {
Monitor::ScopedLock l(monitor);
- while (waiting)
+ while (!response && !shutdownFlag)
monitor.wait();
- getResponse(); // Check for closed.
- if(!validate(response->amqpClassId(), response->amqpMethodId())) {
+ if (shutdownFlag)
+ THROW_QPID_ERROR(
+ PROTOCOL_ERROR, "Channel closed unexpectedly.");
+ MethodPtr result = response;
+ response.reset();
+ return result;
+}
+
+ResponseHandler::MethodPtr ResponseHandler::receive(ClassId c, MethodId m) {
+ MethodPtr response = receive();
+ if(c != response->amqpClassId() || m != response->amqpMethodId()) {
THROW_QPID_ERROR(
PROTOCOL_ERROR,
boost::format("Expected class:method %d:%d, got %d:%d")
% c % m % response->amqpClassId() % response->amqpMethodId());
}
-}
-
-framing::AMQMethodBody::shared_ptr ResponseHandler::getResponse() {
- if (!response)
- THROW_QPID_ERROR(
- PROTOCOL_ERROR, "Channel closed unexpectedly.");
return response;
}
-RequestId ResponseHandler::getRequestId() {
- assert(response->getRequestId());
- return response->getRequestId();
-}
-void ResponseHandler::expect(){
- waiting = true;
-}
-
}} // namespace qpid::client
diff --git a/cpp/lib/client/ResponseHandler.h b/cpp/lib/client/ResponseHandler.h
index d28048c3d3..500166144d 100644
--- a/cpp/lib/client/ResponseHandler.h
+++ b/cpp/lib/client/ResponseHandler.h
@@ -18,51 +18,58 @@
* under the License.
*
*/
-#include <string>
-
-#include <framing/amqp_framing.h> // FIXME aconway 2007-02-01: #include cleanup.
+#include "shared_ptr.h"
#include <sys/Monitor.h>
#ifndef _ResponseHandler_
#define _ResponseHandler_
namespace qpid {
+
+namespace framing {
+class AMQMethodBody;
+}
+
namespace client {
/**
* Holds a response from the broker peer for the client.
*/
class ResponseHandler{
+ typedef shared_ptr<framing::AMQMethodBody> MethodPtr;
bool waiting;
- qpid::framing::AMQMethodBody::shared_ptr response;
- qpid::sys::Monitor monitor;
+ bool shutdownFlag;
+ MethodPtr response;
+ sys::Monitor monitor;
public:
ResponseHandler();
~ResponseHandler();
-
- bool isWaiting(){ return waiting; }
- framing::AMQMethodBody::shared_ptr getResponse();
- void waitForResponse();
-
- void signalResponse(framing::AMQMethodBody::shared_ptr response);
- void expect();//must be called before calling receive
- bool validate(framing::ClassId, framing::MethodId);
- void receive(framing::ClassId, framing::MethodId);
+ /** Is a response expected? */
+ bool isWaiting();
- framing::RequestId getRequestId();
+ /** Provide a response to the waiting thread */
+ void signalResponse(MethodPtr response);
- template <class BodyType> bool validate() {
- return validate(BodyType::CLASS_ID, BodyType::METHOD_ID);
- }
- template <class BodyType> void receive() {
- receive(BodyType::CLASS_ID, BodyType::METHOD_ID);
+ /** Indicate a message is expected. */
+ void expect();
+
+ /** Wait for a response. */
+ MethodPtr receive();
+
+ /** Wait for a specific response. */
+ MethodPtr receive(framing::ClassId, framing::MethodId);
+
+ /** Template version of receive returns typed pointer. */
+ template <class BodyType>
+ shared_ptr<BodyType> receive() {
+ return shared_polymorphic_downcast<BodyType>(
+ receive(BodyType::CLASS_ID, BodyType::METHOD_ID));
}
};
-}
-}
+}}
#endif
diff --git a/cpp/lib/common/framing/BasicHeaderProperties.cpp b/cpp/lib/common/framing/BasicHeaderProperties.cpp
index 930ec9f4dd..d815d1e62f 100644
--- a/cpp/lib/common/framing/BasicHeaderProperties.cpp
+++ b/cpp/lib/common/framing/BasicHeaderProperties.cpp
@@ -22,7 +22,7 @@
//TODO: This could be easily generated from the spec
-qpid::framing::BasicHeaderProperties::BasicHeaderProperties() : deliveryMode(0), priority(0), timestamp(0){}
+qpid::framing::BasicHeaderProperties::BasicHeaderProperties() : deliveryMode(DeliveryMode(0)), priority(0), timestamp(0){}
qpid::framing::BasicHeaderProperties::~BasicHeaderProperties(){}
uint32_t qpid::framing::BasicHeaderProperties::size() const{
@@ -70,7 +70,7 @@ void qpid::framing::BasicHeaderProperties::decode(qpid::framing::Buffer& buffer,
if(flags & (1 << 15)) buffer.getShortString(contentType);
if(flags & (1 << 14)) buffer.getShortString(contentEncoding);
if(flags & (1 << 13)) buffer.getFieldTable(headers);
- if(flags & (1 << 12)) deliveryMode = buffer.getOctet();
+ if(flags & (1 << 12)) deliveryMode = DeliveryMode(buffer.getOctet());
if(flags & (1 << 11)) priority = buffer.getOctet();
if(flags & (1 << 10)) buffer.getShortString(correlationId);
if(flags & (1 << 9)) buffer.getShortString(replyTo);
diff --git a/cpp/lib/common/framing/BasicHeaderProperties.h b/cpp/lib/common/framing/BasicHeaderProperties.h
index 316e67b82c..248014aefb 100644
--- a/cpp/lib/common/framing/BasicHeaderProperties.h
+++ b/cpp/lib/common/framing/BasicHeaderProperties.h
@@ -28,70 +28,88 @@
namespace qpid {
namespace framing {
- enum delivery_mode {TRANSIENT = 1, PERSISTENT = 2};
- //TODO: This could be easily generated from the spec
- class BasicHeaderProperties : public HeaderProperties
- {
- string contentType;
- string contentEncoding;
- FieldTable headers;
- uint8_t deliveryMode;
- uint8_t priority;
- string correlationId;
- string replyTo;
- string expiration;
- string messageId;
- uint64_t timestamp;
- string type;
- string userId;
- string appId;
- string clusterId;
-
- uint16_t getFlags() const;
-
- public:
- BasicHeaderProperties();
- virtual ~BasicHeaderProperties();
- virtual uint32_t size() const;
- virtual void encode(Buffer& buffer) const;
- virtual void decode(Buffer& buffer, uint32_t size);
+enum DeliveryMode { TRANSIENT = 1, PERSISTENT = 2};
- virtual uint8_t classId() { return BASIC; }
+class BasicHeaderProperties : public HeaderProperties
+{
+ string contentType;
+ string contentEncoding;
+ FieldTable headers;
+ DeliveryMode deliveryMode;
+ uint8_t priority;
+ string correlationId;
+ string replyTo;
+ string expiration;
+ string messageId;
+ uint64_t timestamp;
+ string type;
+ string userId;
+ string appId;
+ string clusterId;
+
+ uint16_t getFlags() const;
- string getContentType() const { return contentType; }
- string getContentEncoding() const { return contentEncoding; }
- FieldTable& getHeaders() { return headers; }
- uint8_t getDeliveryMode() const { return deliveryMode; }
- uint8_t getPriority() const { return priority; }
- string getCorrelationId() const {return correlationId; }
- string getReplyTo() const { return replyTo; }
- string getExpiration() const { return expiration; }
- string getMessageId() const {return messageId; }
- uint64_t getTimestamp() const { return timestamp; }
- string getType() const { return type; }
- string getUserId() const { return userId; }
- string getAppId() const { return appId; }
- string getClusterId() const { return clusterId; }
+ public:
+ BasicHeaderProperties();
+ virtual ~BasicHeaderProperties();
+ virtual uint32_t size() const;
+ virtual void encode(Buffer& buffer) const;
+ virtual void decode(Buffer& buffer, uint32_t size);
- void setContentType(const string& _type){ contentType = _type; }
- void setContentEncoding(const string& encoding){ contentEncoding = encoding; }
- void setHeaders(const FieldTable& _headers){ headers = _headers; }
- void setDeliveryMode(uint8_t mode){ deliveryMode = mode; }
- void setPriority(uint8_t _priority){ priority = _priority; }
- void setCorrelationId(const string& _correlationId){ correlationId = _correlationId; }
- void setReplyTo(const string& _replyTo){ replyTo = _replyTo;}
- void setExpiration(const string& _expiration){ expiration = _expiration; }
- void setMessageId(const string& _messageId){ messageId = _messageId; }
- void setTimestamp(uint64_t _timestamp){ timestamp = _timestamp; }
- void setType(const string& _type){ type = _type; }
- void setUserId(const string& _userId){ userId = _userId; }
- void setAppId(const string& _appId){appId = _appId; }
- void setClusterId(const string& _clusterId){ clusterId = _clusterId; }
- };
+ virtual uint8_t classId() { return BASIC; }
-}
-}
+ string getContentType() const { return contentType; }
+ string getContentEncoding() const { return contentEncoding; }
+ FieldTable& getHeaders() { return headers; }
+ const FieldTable& getHeaders() const { return headers; }
+ DeliveryMode getDeliveryMode() const { return deliveryMode; }
+ uint8_t getPriority() const { return priority; }
+ string getCorrelationId() const {return correlationId; }
+ string getReplyTo() const { return replyTo; }
+ string getExpiration() const { return expiration; }
+ string getMessageId() const {return messageId; }
+ uint64_t getTimestamp() const { return timestamp; }
+ string getType() const { return type; }
+ string getUserId() const { return userId; }
+ string getAppId() const { return appId; }
+ string getClusterId() const { return clusterId; }
+ void setContentType(const string& _type){ contentType = _type; }
+ void setContentEncoding(const string& encoding){ contentEncoding = encoding; }
+ void setHeaders(const FieldTable& _headers){ headers = _headers; }
+ void setDeliveryMode(DeliveryMode mode){ deliveryMode = mode; }
+ void setPriority(uint8_t _priority){ priority = _priority; }
+ void setCorrelationId(const string& _correlationId){ correlationId = _correlationId; }
+ void setReplyTo(const string& _replyTo){ replyTo = _replyTo;}
+ void setExpiration(const string& _expiration){ expiration = _expiration; }
+ void setMessageId(const string& _messageId){ messageId = _messageId; }
+ void setTimestamp(uint64_t _timestamp){ timestamp = _timestamp; }
+ void setType(const string& _type){ type = _type; }
+ void setUserId(const string& _userId){ userId = _userId; }
+ void setAppId(const string& _appId){appId = _appId; }
+ void setClusterId(const string& _clusterId){ clusterId = _clusterId; }
+ /** \internal
+ * Template to copy between types like BasicHeaderProperties.
+ */
+ template <class T, class U>
+ static void copy(T& to, const U& from) {
+ to.setContentType(from.getContentType());
+ to.setContentEncoding(from.getContentEncoding());
+ to.setHeaders(from.getHeaders());
+ to.setDeliveryMode(from.getDeliveryMode());
+ to.setPriority(from.getPriority());
+ to.setCorrelationId(from.getCorrelationId());
+ to.setReplyTo(from.getReplyTo());
+ to.setExpiration(from.getExpiration());
+ to.setMessageId(from.getMessageId());
+ to.setTimestamp(from.getTimestamp());
+ to.setType(from.getType());
+ to.setUserId(from.getUserId());
+ to.setAppId(from.getAppId());
+ to.setClusterId(from.getClusterId());
+ }
+};
+}}
#endif
diff --git a/cpp/lib/common/framing/MethodContext.h b/cpp/lib/common/framing/MethodContext.h
index 3493924bf6..80e4c55d7e 100644
--- a/cpp/lib/common/framing/MethodContext.h
+++ b/cpp/lib/common/framing/MethodContext.h
@@ -67,11 +67,6 @@ struct MethodContext
RequestId getRequestId() const;
};
-// FIXME aconway 2007-02-01: Method context only required on Handler
-// functions, not on Proxy functions. If we add set/getChannel(ChannelAdapter*)
-// on AMQBody and set it during decodeing then we could get rid of the context.
-
-
}} // namespace qpid::framing
diff --git a/cpp/lib/common/shared_ptr.h b/cpp/lib/common/shared_ptr.h
index 6725f7acb3..c4d547e5bb 100644
--- a/cpp/lib/common/shared_ptr.h
+++ b/cpp/lib/common/shared_ptr.h
@@ -20,10 +20,15 @@
*/
#include <boost/shared_ptr.hpp>
+#include <boost/cast.hpp>
namespace qpid {
-/// Import shared_ptr into qpid namespace.
+/// Import shared_ptr definitions into qpid namespace.
using boost::shared_ptr;
+using boost::dynamic_pointer_cast;
+using boost::static_pointer_cast;
+using boost::const_pointer_cast;
+using boost::shared_polymorphic_downcast;
} // namespace qpid
diff --git a/cpp/lib/common/sys/ProducerConsumer.cpp b/cpp/lib/common/sys/ProducerConsumer.cpp
index 3f6156f230..7a0249f666 100644
--- a/cpp/lib/common/sys/ProducerConsumer.cpp
+++ b/cpp/lib/common/sys/ProducerConsumer.cpp
@@ -27,12 +27,12 @@ namespace sys {
// // ================ ProducerConsumer
ProducerConsumer::ProducerConsumer(size_t init_items)
- : items(init_items), waiters(0), stopped(false)
+ : items(init_items), waiters(0), shutdownFlag(false)
{}
-void ProducerConsumer::stop() {
+void ProducerConsumer::shutdown() {
Mutex::ScopedLock l(monitor);
- stopped = true;
+ shutdownFlag = true;
monitor.notifyAll();
// Wait for waiting consumers to wake up.
while (waiters > 0)
@@ -55,16 +55,16 @@ ProducerConsumer::Lock::Lock(ProducerConsumer& p)
: pc(p), lock(p.monitor), status(INCOMPLETE) {}
bool ProducerConsumer::Lock::isOk() const {
- return !pc.isStopped() && status==INCOMPLETE;
+ return !pc.isShutdown() && status==INCOMPLETE;
}
void ProducerConsumer::Lock::checkOk() const {
- assert(!pc.isStopped());
+ assert(!pc.isShutdown());
assert(status == INCOMPLETE);
}
ProducerConsumer::Lock::~Lock() {
- assert(status != INCOMPLETE || pc.isStopped());
+ assert(status != INCOMPLETE || pc.isShutdown());
}
void ProducerConsumer::Lock::confirm() {
@@ -96,7 +96,7 @@ ProducerConsumer::ConsumerLock::ConsumerLock(ProducerConsumer& p) : Lock(p)
{
if (isOk()) {
ScopedIncrement<size_t> inc(pc.waiters);
- while (pc.items == 0 && !pc.stopped) {
+ while (pc.items == 0 && !pc.shutdownFlag) {
pc.monitor.wait();
}
}
@@ -115,7 +115,7 @@ ProducerConsumer::ConsumerLock::ConsumerLock(
else {
Time deadline = now() + timeout;
ScopedIncrement<size_t> inc(pc.waiters);
- while (pc.items == 0 && !pc.stopped) {
+ while (pc.items == 0 && !pc.shutdownFlag) {
if (!pc.monitor.wait(deadline)) {
status = TIMEOUT;
return;
@@ -126,9 +126,9 @@ ProducerConsumer::ConsumerLock::ConsumerLock(
}
ProducerConsumer::ConsumerLock::~ConsumerLock() {
- if (pc.isStopped()) {
+ if (pc.isShutdown()) {
if (pc.waiters == 0)
- pc.monitor.notifyAll(); // All waiters woken, notify stop thread(s)
+ pc.monitor.notifyAll(); // Notify shutdown thread(s)
}
else if (status==CONFIRMED) {
pc.items--;
diff --git a/cpp/lib/common/sys/ProducerConsumer.h b/cpp/lib/common/sys/ProducerConsumer.h
index 742639323b..c7f42f266d 100644
--- a/cpp/lib/common/sys/ProducerConsumer.h
+++ b/cpp/lib/common/sys/ProducerConsumer.h
@@ -30,7 +30,7 @@ namespace sys {
*
* Producers increase the number of available items, consumers reduce it.
* Consumers wait till an item is available. Waiting threads can be
- * woken for shutdown using stop().
+ * woken for shutdown using shutdown().
*
* Note: Currently implements unbounded producer-consumer, i.e. no limit
* to available items, producers never block. Can be extended to support
@@ -43,16 +43,16 @@ class ProducerConsumer
public:
ProducerConsumer(size_t init_items=0);
- ~ProducerConsumer() { stop(); }
+ ~ProducerConsumer() { shutdown(); }
/**
* Wake any threads waiting for ProducerLock or ConsumerLock.
*@post No threads are waiting in Producer or Consumer locks.
*/
- void stop();
+ void shutdown();
- /** True if queue is stopped */
- bool isStopped() { return stopped; }
+ /** True if queue is shutdown */
+ bool isShutdown() { return shutdownFlag; }
/** Number of items available for consumers */
size_t available() const;
@@ -76,7 +76,7 @@ class ProducerConsumer
*confirm() or cancel() before the lock goes out of scope.
*
* false means the lock failed - timed out or the
- * ProducerConsumer is stopped. You should not do anything in
+ * ProducerConsumer is shutdown. You should not do anything in
* the scope of the lock.
*/
bool isOk() const;
@@ -98,8 +98,8 @@ class ProducerConsumer
/** True if this lock experienced a timeout */
bool isTimedOut() const { return status == TIMEOUT; }
- /** True if we have been stopped */
- bool isStopped() const { return pc.isStopped(); }
+ /** True if we have been shutdown */
+ bool isShutdown() const { return pc.isShutdown(); }
ProducerConsumer& pc;
@@ -141,7 +141,7 @@ class ProducerConsumer
* Wait up to timeout to acquire lock.
*@post If isOk() caller has a producer lock.
* If isTimedOut() there was a timeout.
- * If neither then we were stopped.
+ * If neither then we were shutdown.
*/
ConsumerLock(ProducerConsumer& p, const Time& timeout);
@@ -153,7 +153,7 @@ class ProducerConsumer
mutable Monitor monitor;
size_t items;
size_t waiters;
- bool stopped;
+ bool shutdownFlag;
friend class Lock;
friend class ProducerLock;
diff --git a/cpp/lib/common/sys/ThreadSafeQueue.h b/cpp/lib/common/sys/ThreadSafeQueue.h
index ff949a3e16..80ea92da0e 100644
--- a/cpp/lib/common/sys/ThreadSafeQueue.h
+++ b/cpp/lib/common/sys/ThreadSafeQueue.h
@@ -46,7 +46,7 @@ class ThreadSafeQueue
}
/** Pop a value from the front of the queue. Waits till value is available.
- *@throw ShutdownException if queue is stopped while waiting.
+ *@throw ShutdownException if queue is shutdown while waiting.
*/
T pop() {
ProducerConsumer::ConsumerLock consumer(pc);
@@ -75,10 +75,10 @@ class ThreadSafeQueue
}
/** Interrupt threads waiting in pop() */
- void stop() { pc.stop(); }
+ void shutdown() { pc.shutdown(); }
- /** True if queue is stopped */
- bool isStopped() { return pc.isStopped(); }
+ /** True if queue is shutdown */
+ bool isShutdown() { return pc.isShutdown(); }
/** Size of the queue */
size_t size() { ProducerConsumer::Lock l(pc); return container.size(); }