summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/Makefile.am29
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.cpp13
-rw-r--r--cpp/src/qpid/client/BasicMessageChannel.cpp335
-rw-r--r--cpp/src/qpid/client/BasicMessageChannel.h92
-rw-r--r--cpp/src/qpid/client/BlockingQueue.h87
-rw-r--r--cpp/src/qpid/client/ChainableFrameHandler.h47
-rw-r--r--cpp/src/qpid/client/ChannelHandler.cpp123
-rw-r--r--cpp/src/qpid/client/ChannelHandler.h64
-rw-r--r--cpp/src/qpid/client/ClientChannel.cpp390
-rw-r--r--cpp/src/qpid/client/ClientChannel.h97
-rw-r--r--cpp/src/qpid/client/ClientConnection.cpp52
-rw-r--r--cpp/src/qpid/client/CompletionTracker.cpp64
-rw-r--r--cpp/src/qpid/client/CompletionTracker.h56
-rw-r--r--cpp/src/qpid/client/Connection.h7
-rw-r--r--cpp/src/qpid/client/ConnectionHandler.cpp196
-rw-r--r--cpp/src/qpid/client/ConnectionHandler.h80
-rw-r--r--cpp/src/qpid/client/Correlator.cpp44
-rw-r--r--cpp/src/qpid/client/Correlator.h52
-rw-r--r--cpp/src/qpid/client/ExecutionHandler.cpp159
-rw-r--r--cpp/src/qpid/client/ExecutionHandler.h70
-rw-r--r--cpp/src/qpid/client/FutureCompletion.cpp61
-rw-r--r--cpp/src/qpid/client/FutureCompletion.h52
-rw-r--r--cpp/src/qpid/client/FutureFactory.cpp51
-rw-r--r--cpp/src/qpid/client/FutureFactory.h48
-rw-r--r--cpp/src/qpid/client/FutureResponse.cpp42
-rw-r--r--cpp/src/qpid/client/FutureResponse.h44
-rw-r--r--cpp/src/qpid/client/IncomingMessage.cpp168
-rw-r--r--cpp/src/qpid/client/IncomingMessage.h136
-rw-r--r--cpp/src/qpid/client/ReceivedContent.cpp104
-rw-r--r--cpp/src/qpid/client/ReceivedContent.h83
-rw-r--r--cpp/src/qpid/client/StateManager.cpp68
-rw-r--r--cpp/src/qpid/client/StateManager.h46
-rw-r--r--cpp/src/tests/FramingTest.cpp6
-rw-r--r--cpp/src/tests/Serializer.cpp13
-rw-r--r--cpp/src/tests/client_test.cpp11
-rw-r--r--cpp/src/tests/topic_listener.cpp3
-rw-r--r--cpp/src/tests/topic_publisher.cpp3
37 files changed, 1914 insertions, 1082 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index d4293b70fd..0f8ec224cf 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -229,12 +229,21 @@ libqpidclient_la_SOURCES = \
qpid/client/ClientChannel.cpp \
qpid/client/ClientExchange.cpp \
qpid/client/ClientQueue.cpp \
- qpid/client/BasicMessageChannel.cpp \
qpid/client/Connector.cpp \
- qpid/client/IncomingMessage.cpp \
qpid/client/MessageListener.cpp \
qpid/client/ResponseHandler.cpp \
- qpid/client/ReturnedMessageHandler.cpp
+ qpid/client/ReturnedMessageHandler.cpp \
+ qpid/client/Correlator.cpp \
+ qpid/client/CompletionTracker.cpp \
+ qpid/client/ChannelHandler.cpp \
+ qpid/client/ConnectionHandler.cpp \
+ qpid/client/ExecutionHandler.cpp \
+ qpid/client/FutureCompletion.cpp \
+ qpid/client/FutureResponse.cpp \
+ qpid/client/FutureFactory.cpp \
+ qpid/client/ReceivedContent.cpp \
+ qpid/client/StateManager.cpp
+
nobase_include_HEADERS = \
$(platform_hdr) \
@@ -306,19 +315,29 @@ nobase_include_HEADERS = \
qpid/broker/TransactionalStore.h \
qpid/broker/TxAck.h \
qpid/client/AckMode.h \
- qpid/client/BasicMessageChannel.h \
qpid/client/ClientChannel.h \
qpid/client/ClientExchange.h \
qpid/client/ClientMessage.h \
qpid/client/ClientQueue.h \
qpid/client/Connection.h \
qpid/client/Connector.h \
- qpid/client/IncomingMessage.h \
qpid/client/MessageChannel.h \
qpid/client/MessageListener.h \
qpid/client/MethodBodyInstances.h \
qpid/client/ResponseHandler.h \
qpid/client/ReturnedMessageHandler.h \
+ qpid/client/BlockingQueue.h \
+ qpid/client/Correlator.h \
+ qpid/client/CompletionTracker.h \
+ qpid/client/ChannelHandler.h \
+ qpid/client/ChainableFrameHandler.h \
+ qpid/client/ConnectionHandler.h \
+ qpid/client/ExecutionHandler.h \
+ qpid/client/FutureCompletion.h \
+ qpid/client/FutureResponse.h \
+ qpid/client/FutureFactory.h \
+ qpid/client/ReceivedContent.h \
+ qpid/client/StateManager.h \
qpid/framing/AMQBody.h \
qpid/framing/AMQContentBody.h \
qpid/framing/AMQDataBlock.h \
diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp
index e9ec698400..27f484cfcb 100644
--- a/cpp/src/qpid/broker/SemanticHandler.cpp
+++ b/cpp/src/qpid/broker/SemanticHandler.cpp
@@ -68,7 +68,11 @@ void SemanticHandler::handleMethodInContext(boost::shared_ptr<qpid::framing::AMQ
handleL4(method, context);
//(if the frameset is complete) we can move the execution-mark
//forward
- ++(incoming.hwm);
+
+ //temporary hack until channel management is moved to its own handler:
+ if (method->amqpClassId() != ChannelOpenBody::CLASS_ID) {
+ ++(incoming.hwm);
+ }
//note: need to be more sophisticated than this if we execute
//commands that arrive within an active message frameset (that
@@ -175,8 +179,11 @@ RequestId SemanticHandler::send(shared_ptr<AMQBody> body, Correlator::Action act
Mutex::ScopedLock l(outLock);
uint8_t type(body->type());
if (type == REQUEST_BODY || type == RESPONSE_BODY || type == METHOD_BODY) {
- ++outgoing.hwm;
- //std::cout << "[" << this << "] allocated: " << outgoing.hwm.getValue() << " to " << *body << std::endl;
+ //temporary hack until channel management is moved to its own handler:
+ if (dynamic_pointer_cast<AMQMethodBody>(body)->amqpClassId() != ChannelOpenBody::CLASS_ID) {
+ ++outgoing.hwm;
+ //std::cout << "[" << this << "] allocated: " << outgoing.hwm.getValue() << " to " << *body << std::endl;
+ }
}
return ChannelAdapter::send(body, action);
}
diff --git a/cpp/src/qpid/client/BasicMessageChannel.cpp b/cpp/src/qpid/client/BasicMessageChannel.cpp
deleted file mode 100644
index 70cb473426..0000000000
--- a/cpp/src/qpid/client/BasicMessageChannel.cpp
+++ /dev/null
@@ -1,335 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "qpid/log/Statement.h"
-#include "BasicMessageChannel.h"
-#include "qpid/framing/AMQMethodBody.h"
-#include "ClientChannel.h"
-#include "ReturnedMessageHandler.h"
-#include "MessageListener.h"
-#include "qpid/framing/FieldTable.h"
-#include "Connection.h"
-#include <queue>
-#include <iostream>
-#include <boost/format.hpp>
-#include <boost/variant.hpp>
-
-namespace qpid {
-namespace client {
-
-using namespace std;
-using namespace sys;
-using namespace framing;
-using boost::format;
-
-namespace {
-
-// Destination name constants
-const std::string BASIC_GET("__basic_get__");
-const std::string BASIC_RETURN("__basic_return__");
-
-// Reference name constant
-const std::string BASIC_REF("__basic_reference__");
-}
-
-BasicMessageChannel::BasicMessageChannel(Channel& ch)
- : channel(ch), returnsHandler(0)
-{
- incoming.addDestination(BASIC_RETURN, destDispatch);
-}
-
-void BasicMessageChannel::consume(
- Queue& queue, std::string& tag, MessageListener* listener,
- AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields)
-{
- {
- // Note we create a consumer even if tag="". In that case
- // It will be renamed when we handle BasicConsumeOkBody.
- //
- Mutex::ScopedLock l(lock);
- ConsumerMap::iterator i = consumers.find(tag);
- if (i != consumers.end())
- THROW_QPID_ERROR(CLIENT_ERROR,
- "Consumer already exists with tag="+tag);
- Consumer& c = consumers[tag];
- c.listener = listener;
- c.ackMode = ackMode;
- c.lastDeliveryTag = 0;
- }
-
- // FIXME aconway 2007-03-23: get processed in both.
-
- // BasicConsumeOkBody is really processed in handle(), here
- // we just pick up the tag to return to the user.
- //
- // We can't process it here because messages for the consumer may
- // already be arriving.
- //
- BasicConsumeOkBody::shared_ptr ok =
- channel.sendAndReceiveSync<BasicConsumeOkBody>(
- synch,
- make_shared_ptr(new BasicConsumeBody(
- channel.version, 0, queue.getName(), tag, noLocal,
- ackMode == NO_ACK, false, !synch,
- fields ? *fields : FieldTable())));
- tag = ok->getConsumerTag();
-}
-
-
-void BasicMessageChannel::cancel(const std::string& tag, bool synch) {
- Consumer c;
- {
- Mutex::ScopedLock l(lock);
- ConsumerMap::iterator i = consumers.find(tag);
- if (i == consumers.end())
- return;
- c = i->second;
- consumers.erase(i);
- }
- if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) {
- channel.send(make_shared_ptr(new BasicAckBody(channel.version, c.lastDeliveryTag, true)));
- }
- channel.sendAndReceiveSync<BasicCancelOkBody>(
- synch, make_shared_ptr(new BasicCancelBody(channel.version, tag, !synch)));
-}
-
-void BasicMessageChannel::close(){
- destGet.shutdown();
- destDispatch.shutdown();
-}
-
-void BasicMessageChannel::cancelAll(){
- Mutex::ScopedLock l(lock);
- for (ConsumerMap::iterator i = consumers.begin(); i != consumers.end(); i++)
- {
- Consumer& c = i->second;
- if (c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0)
- {
- channel.send(make_shared_ptr(new BasicAckBody(channel.version, c.lastDeliveryTag, true)));
- }
- channel.send(make_shared_ptr(new BasicCancelBody(channel.version, i->first, true)));
- }
- consumers.clear();
-}
-
-bool BasicMessageChannel::get(
- Message& msg, const Queue& queue, AckMode ackMode)
-{
- // Prepare for incoming response
- incoming.addDestination(BASIC_GET, destGet);
- channel.send(make_shared_ptr(new BasicGetBody(channel.version, 0, queue.getName(), ackMode)));
- bool got = destGet.wait(msg);
- return got;
-}
-
-void BasicMessageChannel::publish(
- const Message& msg, const Exchange& exchange,
- const std::string& routingKey, bool mandatory, bool immediate)
-{
- const string e = exchange.getName();
- string key = routingKey;
-
- // Make a header for the message
- AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
- BasicHeaderProperties::copy(
- *static_cast<BasicHeaderProperties*>(header->getProperties()), msg);
- header->setContentSize(msg.getData().size());
-
- channel.send(make_shared_ptr(new BasicPublishBody(channel.version, 0, e, key, mandatory, immediate)));
- channel.send(header);
- string data = msg.getData();
- u_int64_t data_length = data.length();
- if(data_length > 0){
- //frame itself uses 8 bytes
- u_int32_t frag_size = channel.connection->getMaxFrameSize() - 8;
- if(data_length < frag_size){
- channel.send(make_shared_ptr(new AMQContentBody(data)));
- }else{
- u_int32_t offset = 0;
- u_int32_t remaining = data_length - offset;
- while (remaining > 0) {
- u_int32_t length = remaining > frag_size ? frag_size : remaining;
- string frag(data.substr(offset, length));
- channel.send(make_shared_ptr(new AMQContentBody(frag)));
-
- offset += length;
- remaining = data_length - offset;
- }
- }
- }
-}
-
-void BasicMessageChannel::handle(boost::shared_ptr<AMQMethodBody> method) {
- assert(method->amqpClassId() ==BasicGetBody::CLASS_ID);
- switch(method->amqpMethodId()) {
- case BasicGetOkBody::METHOD_ID: {
- incoming.openReference(BASIC_REF);
- incoming.createMessage(BASIC_GET, BASIC_REF);
- return;
- }
- case BasicGetEmptyBody::METHOD_ID: {
- incoming.getDestination(BASIC_GET).empty();
- incoming.removeDestination(BASIC_GET);
- return;
- }
- case BasicDeliverBody::METHOD_ID: {
- BasicDeliverBody::shared_ptr deliver=
- boost::shared_polymorphic_downcast<BasicDeliverBody>(method);
- incoming.openReference(BASIC_REF);
- Message& msg = incoming.createMessage(
- deliver->getConsumerTag(), BASIC_REF);
- msg.setDestination(deliver->getConsumerTag());
- msg.setDeliveryTag(deliver->getDeliveryTag());
- msg.setRedelivered(deliver->getRedelivered());
- return;
- }
- case BasicConsumeOkBody::METHOD_ID: {
- Mutex::ScopedLock l(lock);
- BasicConsumeOkBody::shared_ptr consumeOk =
- boost::shared_polymorphic_downcast<BasicConsumeOkBody>(method);
- std::string tag = consumeOk->getConsumerTag();
- ConsumerMap::iterator i = consumers.find(std::string());
- if (i != consumers.end()) {
- // Need to rename the un-named consumer.
- if (consumers.find(tag) == consumers.end()) {
- consumers[tag] = i->second;
- consumers.erase(i);
- }
- else // Tag already exists.
- throw ChannelException(404, "Tag already exists: "+tag);
- }
- // FIXME aconway 2007-03-23: Integrate consumer & destination
- // maps.
- incoming.addDestination(tag, destDispatch);
- return;
- }
- }
- throw Channel::UnknownMethod();
-}
-
-void BasicMessageChannel::handle(AMQHeaderBody::shared_ptr header) {
- BasicHeaderProperties* props =
- boost::polymorphic_downcast<BasicHeaderProperties*>(
- header->getProperties());
- IncomingMessage::Reference& ref = incoming.getReference(BASIC_REF);
- assert (ref.messages.size() == 1);
- ref.messages.front().BasicHeaderProperties::operator=(*props);
- incoming_size = header->getContentSize();
- if (incoming_size==0)
- incoming.closeReference(BASIC_REF);
-}
-
-void BasicMessageChannel::handle(AMQContentBody::shared_ptr content){
- incoming.appendReference(BASIC_REF, content->getData());
- size_t size = incoming.getReference(BASIC_REF).data.size();
- if (size >= incoming_size) {
- incoming.closeReference(BASIC_REF);
- if (size > incoming_size)
- throw ChannelException(502, "Content exceeded declared size");
- }
-}
-
-void BasicMessageChannel::deliver(Consumer& consumer, Message& msg){
- //record delivery tag:
- consumer.lastDeliveryTag = msg.getDeliveryTag();
-
- //allow registered listener to handle the message
- consumer.listener->received(msg);
-
- if(channel.isOpen()){
- bool multiple(false);
- switch(consumer.ackMode){
- case LAZY_ACK:
- multiple = true;
- if(++(consumer.count) < channel.getPrefetch())
- break;
- //else drop-through
- case AUTO_ACK:
- consumer.lastDeliveryTag = 0;
- channel.send(make_shared_ptr(
- new BasicAckBody(
- channel.version,
- msg.getDeliveryTag(),
- multiple)));
- case NO_ACK: // Nothing to do
- case CLIENT_ACK: // User code must ack.
- break;
- // TODO aconway 2007-02-22: Provide a way for user
- // to ack!
- }
- }
-
- //as it stands, transactionality is entirely orthogonal to ack
- //mode, though the acks will not be processed by the broker under
- //a transaction until it commits.
-}
-
-
-void BasicMessageChannel::run() {
- while(channel.isOpen()) {
- try {
- Message msg;
- bool gotMessge = destDispatch.wait(msg);
- if (gotMessge) {
- if(msg.getDestination() == BASIC_RETURN) {
- ReturnedMessageHandler* handler=0;
- {
- Mutex::ScopedLock l(lock);
- handler=returnsHandler;
- }
- if(handler != 0)
- handler->returned(msg);
- }
- else {
- Consumer consumer;
- {
- Mutex::ScopedLock l(lock);
- ConsumerMap::iterator i = consumers.find(
- msg.getDestination());
- if(i == consumers.end())
- THROW_QPID_ERROR(PROTOCOL_ERROR+504,
- "Unknown consumer tag=" +
- msg.getDestination());
- consumer = i->second;
- }
- deliver(consumer, msg);
- }
- }
- }
- catch (const ShutdownException&) {
- // Orderly shutdown.
- }
- catch (const Exception& e) {
- std::cout << "Error caught by dispatch thread: " << e.what() << std::endl;
- // FIXME aconway 2007-02-20: Report exception to user.
- QPID_LOG(error, e.what());
- }
- }
-}
-
-void BasicMessageChannel::setReturnedMessageHandler(ReturnedMessageHandler* handler){
- Mutex::ScopedLock l(lock);
- returnsHandler = handler;
-}
-
-void BasicMessageChannel::setQos(){
- channel.send(make_shared_ptr(new BasicQosBody(channel.version, 0, channel.getPrefetch(), false)));
- if(channel.isTransactional())
- channel.send(make_shared_ptr(new TxSelectBody(channel.version)));
-}
-
-}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/BasicMessageChannel.h b/cpp/src/qpid/client/BasicMessageChannel.h
deleted file mode 100644
index 99838321ae..0000000000
--- a/cpp/src/qpid/client/BasicMessageChannel.h
+++ /dev/null
@@ -1,92 +0,0 @@
-#ifndef _client_BasicMessageChannel_h
-#define _client_BasicMessageChannel_h
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "MessageChannel.h"
-#include "IncomingMessage.h"
-#include <boost/scoped_ptr.hpp>
-
-namespace qpid {
-namespace client {
-/**
- * Messaging implementation using AMQP 0-8 BasicMessageChannel class
- * to send and receiving messages.
- */
-class BasicMessageChannel : public MessageChannel
-{
- public:
- BasicMessageChannel(Channel& parent);
-
- void consume(
- Queue& queue, std::string& tag, MessageListener* listener,
- AckMode ackMode = NO_ACK, bool noLocal = false, bool synch = true,
- const framing::FieldTable* fields = 0);
-
- void cancel(const std::string& tag, bool synch = true);
-
- bool get(Message& msg, const Queue& queue, AckMode ackMode = NO_ACK);
-
- void publish(const Message& msg, const Exchange& exchange,
- const std::string& routingKey,
- bool mandatory = false, bool immediate = false);
-
- void setReturnedMessageHandler(ReturnedMessageHandler* handler);
-
- void run();
-
- void handle(boost::shared_ptr<framing::AMQMethodBody>);
-
- void handle(shared_ptr<framing::AMQHeaderBody>);
-
- void handle(shared_ptr<framing::AMQContentBody>);
-
- void setQos();
-
- void close();
-
- void cancelAll();
-
- private:
-
- struct Consumer{
- MessageListener* listener;
- AckMode ackMode;
- int count;
- u_int64_t lastDeliveryTag;
- };
- typedef std::map<std::string, Consumer> ConsumerMap;
-
- void deliver(Consumer& consumer, Message& msg);
-
- sys::Mutex lock;
- Channel& channel;
- IncomingMessage incoming;
- uint64_t incoming_size;
- ConsumerMap consumers ;
- ReturnedMessageHandler* returnsHandler;
- IncomingMessage::WaitableDestination destGet;
- IncomingMessage::WaitableDestination destDispatch;
-};
-
-}} // namespace qpid::client
-
-
-
-#endif /*!_client_BasicMessageChannel_h*/
diff --git a/cpp/src/qpid/client/BlockingQueue.h b/cpp/src/qpid/client/BlockingQueue.h
new file mode 100644
index 0000000000..7081b76b68
--- /dev/null
+++ b/cpp/src/qpid/client/BlockingQueue.h
@@ -0,0 +1,87 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef _BlockingQueue_
+#define _BlockingQueue_
+
+#include <queue>
+#include "qpid/sys/Monitor.h"
+
+namespace qpid {
+namespace client {
+
+struct QueueClosed {};
+
+template <class T>
+class BlockingQueue
+{
+ sys::Monitor lock;
+ std::queue<T> queue;
+ bool closed;
+
+public:
+
+ BlockingQueue() : closed(false) {}
+
+ void reset()
+ {
+ sys::Monitor::ScopedLock l(lock);
+ closed = true;
+ }
+
+ T pop()
+ {
+ sys::Monitor::ScopedLock l(lock);
+ while (!closed && queue.empty()) {
+ lock.wait();
+ }
+ if (closed) {
+ throw QueueClosed();
+ } else {
+ T t = queue.front();
+ queue.pop();
+ return t;
+ }
+ }
+
+ void push(T t)
+ {
+ sys::Monitor::ScopedLock l(lock);
+ bool wasEmpty = queue.empty();
+ queue.push(t);
+ if (wasEmpty) {
+ lock.notifyAll();
+ }
+ }
+
+ void close()
+ {
+ sys::Monitor::ScopedLock l(lock);
+ closed = true;
+ lock.notifyAll();
+ }
+};
+
+}}
+
+
+
+#endif
diff --git a/cpp/src/qpid/client/ChainableFrameHandler.h b/cpp/src/qpid/client/ChainableFrameHandler.h
new file mode 100644
index 0000000000..29e16d53dc
--- /dev/null
+++ b/cpp/src/qpid/client/ChainableFrameHandler.h
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef _ChainableFrameHandler_
+#define _ChainableFrameHandler_
+
+#include <boost/function.hpp>
+#include "qpid/framing/AMQFrame.h"
+
+namespace qpid {
+namespace client {
+
+struct ChainableFrameHandler
+{
+ typedef boost::function<void(framing::AMQFrame&)> FrameDelegate;
+
+ FrameDelegate in;
+ FrameDelegate out;
+
+ ChainableFrameHandler() {}
+ ChainableFrameHandler(FrameDelegate i, FrameDelegate o): in(i), out(o) {}
+ virtual ~ChainableFrameHandler() {}
+};
+
+}}
+
+
+
+#endif
diff --git a/cpp/src/qpid/client/ChannelHandler.cpp b/cpp/src/qpid/client/ChannelHandler.cpp
new file mode 100644
index 0000000000..a6aea438f0
--- /dev/null
+++ b/cpp/src/qpid/client/ChannelHandler.cpp
@@ -0,0 +1,123 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ChannelHandler.h"
+#include "qpid/framing/amqp_framing.h"
+
+using namespace qpid::client;
+using namespace qpid::framing;
+using namespace boost;
+
+ChannelHandler::ChannelHandler() : StateManager(CLOSED), id(0) {}
+
+void ChannelHandler::incoming(AMQFrame& frame)
+{
+ AMQBody::shared_ptr body = frame.getBody();
+ if (getState() == OPEN) {
+ if (isA<ChannelCloseBody>(body)) {
+ ChannelCloseBody::shared_ptr method(shared_polymorphic_cast<ChannelCloseBody>(body));
+ setState(CLOSED);
+ if (onClose) {
+ onClose(method->getReplyCode(), method->getReplyText());
+ }
+ } else {
+ try {
+ in(frame);
+ }catch(ChannelException& e){
+ if (body->type() == METHOD_BODY) {
+ AMQMethodBody::shared_ptr method(shared_polymorphic_cast<AMQMethodBody>(body));
+ close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId());
+ } else {
+ close(e.code, e.toString(), 0, 0);
+ }
+ }
+ }
+ } else {
+ if (body->type() == METHOD_BODY) {
+ handleMethod(shared_polymorphic_cast<AMQMethodBody>(body));
+ } else {
+ throw new ConnectionException(504, "Channel not open.");
+ }
+
+ }
+}
+
+void ChannelHandler::outgoing(AMQFrame& frame)
+{
+ if (getState() == OPEN) {
+ frame.channel = id;
+ out(frame);
+ } else {
+ throw Exception("Channel not open");
+ }
+}
+
+void ChannelHandler::open(uint16_t _id)
+{
+ id = _id;
+
+ setState(OPENING);
+ AMQFrame f(version, id, make_shared_ptr(new ChannelOpenBody(version)));
+ out(f);
+
+ std::set<int> states;
+ states.insert(OPEN);
+ states.insert(CLOSED);
+ waitFor(states);
+ if (getState() != OPEN) {
+ throw Exception("Failed to open channel.");
+ }
+}
+
+void ChannelHandler::close(uint16_t code, const std::string& message, uint16_t classId, uint16_t methodId)
+{
+ setState(CLOSING);
+ AMQFrame f(version, id, make_shared_ptr(new ChannelCloseBody(version, code, message, classId, methodId)));
+ out(f);
+}
+
+void ChannelHandler::close()
+{
+ close(200, "OK", 0, 0);
+ waitFor(CLOSED);
+}
+
+void ChannelHandler::handleMethod(AMQMethodBody::shared_ptr method)
+{
+ switch (getState()) {
+ case OPENING:
+ if (method->isA<ChannelOpenOkBody>()) {
+ setState(OPEN);
+ } else {
+ throw ConnectionException(504, "Channel not opened.");
+ }
+ break;
+ case CLOSING:
+ if (method->isA<ChannelCloseOkBody>()) {
+ setState(CLOSED);
+ } //else just ignore it
+ break;
+ case CLOSED:
+ throw ConnectionException(504, "Channel not opened.");
+ default:
+ throw Exception("Unexpected state encountered in ChannelHandler!");
+ }
+}
diff --git a/cpp/src/qpid/client/ChannelHandler.h b/cpp/src/qpid/client/ChannelHandler.h
new file mode 100644
index 0000000000..eaa7e7cc72
--- /dev/null
+++ b/cpp/src/qpid/client/ChannelHandler.h
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _ChannelHandler_
+#define _ChannelHandler_
+
+#include "StateManager.h"
+#include "ChainableFrameHandler.h"
+#include "qpid/framing/amqp_framing.h"
+
+namespace qpid {
+namespace client {
+
+class ChannelHandler : private StateManager, public ChainableFrameHandler
+{
+ enum STATES {OPENING, OPEN, CLOSING, CLOSED};
+ framing::ProtocolVersion version;
+ uint16_t id;
+
+ void handleMethod(framing::AMQMethodBody::shared_ptr method);
+
+ template <class T> bool isA(framing::AMQBody::shared_ptr body) {
+ return body->type() == framing::METHOD_BODY &&
+ boost::shared_polymorphic_cast<framing::AMQMethodBody>(body)->isA<T>();
+ }
+
+
+ void close(uint16_t code, const std::string& message, uint16_t classId, uint16_t methodId);
+
+
+public:
+ typedef boost::function<void(uint16_t, const std::string&)> CloseListener;
+
+ ChannelHandler();
+
+ void incoming(framing::AMQFrame& frame);
+ void outgoing(framing::AMQFrame& frame);
+
+ void open(uint16_t id);
+ void close();
+
+ CloseListener onClose;
+};
+
+}}
+
+#endif
diff --git a/cpp/src/qpid/client/ClientChannel.cpp b/cpp/src/qpid/client/ClientChannel.cpp
index 19b4726a72..8b85017ba0 100644
--- a/cpp/src/qpid/client/ClientChannel.cpp
+++ b/cpp/src/qpid/client/ClientChannel.cpp
@@ -24,9 +24,12 @@
#include "qpid/sys/Monitor.h"
#include "ClientMessage.h"
#include "qpid/QpidError.h"
-#include "MethodBodyInstances.h"
#include "Connection.h"
-#include "BasicMessageChannel.h"
+#include "ConnectionHandler.h"
+#include "FutureResponse.h"
+#include "MessageListener.h"
+#include <boost/format.hpp>
+#include <boost/bind.hpp>
// FIXME aconway 2007-01-26: Evaluate all throws, ensure consistent
// handling of errors that should close the connection or the channel.
@@ -45,18 +48,13 @@ const std::string empty;
}}
-Channel::Channel(bool _transactional, u_int16_t _prefetch, InteropMode mode) :
+Channel::Channel(bool _transactional, u_int16_t _prefetch) :
connection(0), prefetch(_prefetch), transactional(_transactional), errorCode(200), errorText("Ok"), running(false)
{
- switch (mode) {
- case AMQP_08: messaging.reset(new BasicMessageChannel(*this)); break;
- default: assert(0); QPID_ERROR(INTERNAL_ERROR, "Invalid interop-mode.");
- }
}
Channel::~Channel(){
closeInternal();
- stop();
}
void Channel::open(ChannelId id, Connection& con)
@@ -64,65 +62,15 @@ void Channel::open(ChannelId id, Connection& con)
if (isOpen())
THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel "+id);
connection = &con;
- init(id, con, con.getVersion()); // ChannelAdapter initialization.
- string oob;
- if (id != 0)
- sendAndReceive<ChannelOpenOkBody>(make_shared_ptr(new ChannelOpenBody(version, oob)));
-}
+ channelId = id;
+ //link up handlers:
+ channelHandler.out = boost::bind(&ConnectionHandler::outgoing, &(connection->handler), _1);
+ channelHandler.in = boost::bind(&ExecutionHandler::handle, &executionHandler, _1);
+ executionHandler.out = boost::bind(&ChannelHandler::outgoing, &channelHandler, _1);
+ //set up close notification:
+ channelHandler.onClose = boost::bind(&Channel::peerClose, this, _1, _2);
-void Channel::protocolInit(
- const std::string& uid, const std::string& pwd, const std::string& vhost) {
- assert(connection);
- responses.expect();
- connection->connector->init(); // Send ProtocolInit block.
- ConnectionStartBody::shared_ptr connectionStart =
- responses.receive<ConnectionStartBody>();
-
- FieldTable props;
- string mechanism("PLAIN");
- string response = ((char)0) + uid + ((char)0) + pwd;
- string locale("en_US");
- ConnectionTuneBody::shared_ptr proposal =
- sendAndReceive<ConnectionTuneBody>(
- make_shared_ptr(new ConnectionStartOkBody(
- version, //connectionStart->getRequestId(),
- props, mechanism,
- response, locale)));
-
- /**
- * Assume for now that further challenges will not be required
- //receive connection.secure
- responses.receive(connection_secure));
- //send connection.secure-ok
- connection->send(new AMQFrame(0, new ConnectionSecureOkBody(response)));
- **/
-
- sendCommand(make_shared_ptr(new ConnectionTuneOkBody(
- version, //proposal->getRequestId(),
- proposal->getChannelMax(), connection->getMaxFrameSize(),
- proposal->getHeartbeat())));
-
- uint16_t heartbeat = proposal->getHeartbeat();
- connection->connector->setReadTimeout(heartbeat * 2);
- connection->connector->setWriteTimeout(heartbeat);
-
- // Send connection open.
- std::string capabilities;
- responses.expect();
- sendCommand(make_shared_ptr(new ConnectionOpenBody(version, vhost, capabilities, true)));
- //receive connection.open-ok (or redirect, but ignore that for now
- //esp. as using force=true).
- AMQMethodBody::shared_ptr openResponse = responses.receive();
- if(openResponse->isA<ConnectionOpenOkBody>()) {
- //ok
- }else if(openResponse->isA<ConnectionRedirectBody>()){
- //ignore for now
- ConnectionRedirectBody::shared_ptr redirect(
- shared_polymorphic_downcast<ConnectionRedirectBody>(openResponse));
- QPID_LOG(error, "Ignoring redirect to " << redirect->getHost());
- } else {
- THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response to Connection.open");
- }
+ channelHandler.open(id);
}
bool Channel::isOpen() const {
@@ -131,7 +79,11 @@ bool Channel::isOpen() const {
}
void Channel::setQos() {
- messaging->setQos();
+ executionHandler.send(make_shared_ptr(new BasicQosBody(version, 0, getPrefetch(), false)));
+ if(isTransactional()) {
+ //I think this is wrong! should only send TxSelect once...
+ executionHandler.send(make_shared_ptr(new TxSelectBody(version)));
+ }
}
void Channel::setPrefetch(uint16_t _prefetch){
@@ -143,14 +95,12 @@ void Channel::declareExchange(Exchange& exchange, bool synch){
string name = exchange.getName();
string type = exchange.getType();
FieldTable args;
- send(make_shared_ptr(new ExchangeDeclareBody(version, 0, name, type, empty, false, false, false, args)));
- if (synch) synchWithServer();
+ sendSync(synch, make_shared_ptr(new ExchangeDeclareBody(version, 0, name, type, empty, false, false, false, args)));
}
void Channel::deleteExchange(Exchange& exchange, bool synch){
string name = exchange.getName();
- send(make_shared_ptr(new ExchangeDeleteBody(version, 0, name, false)));
- if (synch) synchWithServer();
+ sendSync(synch, make_shared_ptr(new ExchangeDeleteBody(version, 0, name, false)));
}
void Channel::declareQueue(Queue& queue, bool synch){
@@ -179,131 +129,41 @@ void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch)
void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){
string e = exchange.getName();
string q = queue.getName();
- send(make_shared_ptr(new QueueBindBody(version, 0, q, e, key, args)));
- if (synch) synchWithServer();
+ sendSync(synch, make_shared_ptr(new QueueBindBody(version, 0, q, e, key, args)));
}
void Channel::commit(){
- send(make_shared_ptr(new TxCommitBody(version)));
+ executionHandler.send(make_shared_ptr(new TxCommitBody(version)));
}
void Channel::rollback(){
- send(make_shared_ptr(new TxRollbackBody(version)));
+ executionHandler.send(make_shared_ptr(new TxRollbackBody(version)));
}
-void Channel::handleMethodInContext(
-AMQMethodBody::shared_ptr method, const MethodContext& ctxt)
+void Channel::close()
{
- // Special case for consume OK as it is both an expected response
- // and needs handling in this thread.
- if (method->isA<BasicConsumeOkBody>()) {
- messaging->handle(method);
- responses.signalResponse(method);
- return;
- }
- if(responses.isWaiting()) {
- responses.signalResponse(method);
- return;
- }
- try {
- switch (method->amqpClassId()) {
- case MessageTransferBody::CLASS_ID:
- case BasicGetOkBody::CLASS_ID: messaging->handle(method); break;
- case ChannelCloseBody::CLASS_ID: handleChannel(method, ctxt); break;
- case ConnectionCloseBody::CLASS_ID: handleConnection(method); break;
- case ExecutionCompleteBody::CLASS_ID: handleExecution(method); break;
- default: throw UnknownMethod();
- }
- }
- catch (const UnknownMethod&) {
- connection->close(
- 504, "Unknown method",
- method->amqpClassId(), method->amqpMethodId());
- }
- }
-
-void Channel::handleChannel(AMQMethodBody::shared_ptr method, const MethodContext& /*ctxt*/) {
- switch (method->amqpMethodId()) {
- case ChannelCloseBody::METHOD_ID:
- sendCommand(make_shared_ptr(new ChannelCloseOkBody(version/*, ctxt.getRequestId()*/)));
- peerClose(shared_polymorphic_downcast<ChannelCloseBody>(method));
- return;
- case ChannelFlowBody::METHOD_ID:
- // FIXME aconway 2007-02-22: Not yet implemented.
- return;
- }
- throw UnknownMethod();
-}
-
-void Channel::handleConnection(AMQMethodBody::shared_ptr method) {
- if (method->amqpMethodId() == ConnectionCloseBody::METHOD_ID) {
- connection->close();
- return;
- }
- throw UnknownMethod();
-}
-
-void Channel::handleExecution(AMQMethodBody::shared_ptr method) {
- if (method->amqpMethodId() == ExecutionCompleteBody::METHOD_ID) {
- Monitor::ScopedLock l(outgoingMonitor);
- //record the completion mark:
- outgoing.lwm = shared_polymorphic_downcast<ExecutionCompleteBody>(method)->getCumulativeExecutionMark();
- //TODO: notify anyone waiting for completion notification:
- outgoingMonitor.notifyAll();
- } else{
- throw UnknownMethod();
- }
-}
-
-void Channel::handleHeader(AMQHeaderBody::shared_ptr body){
- messaging->handle(body);
-}
-
-void Channel::handleContent(AMQContentBody::shared_ptr body){
- messaging->handle(body);
-}
-
-void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Channel received heartbeat");
-}
-
-void Channel::start(){
- running = true;
- dispatcher = Thread(*messaging);
-}
-
-// Close called by local application.
-void Channel::close(
- uint16_t code, const std::string& text,
- ClassId classId, MethodId methodId)
-{
- if (isOpen()) {
- try {
- if (getId() != 0) {
- if (code == 200) messaging->cancelAll();
-
- sendAndReceive<ChannelCloseOkBody>(
- make_shared_ptr(new ChannelCloseBody(
- version, code, text, classId, methodId)));
- }
- static_cast<ConnectionForChannel*>(connection)->erase(getId());
- closeInternal();
- } catch (...) {
- static_cast<ConnectionForChannel*>(connection)->erase(getId());
- closeInternal();
- throw;
+ channelHandler.close();
+ {
+ Mutex::ScopedLock l(lock);
+ if (connection);
+ {
+ connection->erase(channelId);
+ connection = 0;
}
}
stop();
}
+
// Channel closed by peer.
-void Channel::peerClose(ChannelCloseBody::shared_ptr reason) {
+void Channel::peerClose(uint16_t code, const std::string& message) {
assert(isOpen());
//record reason:
- errorCode = reason->getReplyCode();
- errorText = reason->getReplyText();
+ errorCode = code;
+ errorText = message;
closeInternal();
+ stop();
+ futures.close(code, message);
}
void Channel::closeInternal() {
@@ -311,26 +171,26 @@ void Channel::closeInternal() {
if (connection);
{
connection = 0;
- messaging->close();
- // A 0 response means we are closed.
- responses.signalResponse(AMQMethodBody::shared_ptr());
}
}
-void Channel::stop() {
- Mutex::ScopedLock l(stopLock);
- if(running) {
- dispatcher.join();
- running = false;
- }
+AMQMethodBody::shared_ptr Channel::sendAndReceive(AMQMethodBody::shared_ptr toSend, ClassId /*c*/, MethodId /*m*/)
+{
+
+ boost::shared_ptr<FutureResponse> fr(futures.createResponse());
+ executionHandler.send(toSend, boost::bind(&FutureResponse::completed, fr), boost::bind(&FutureResponse::received, fr, _1));
+ return fr->getResponse();
}
-AMQMethodBody::shared_ptr Channel::sendAndReceive(
- AMQMethodBody::shared_ptr toSend, ClassId c, MethodId m)
+void Channel::sendSync(bool sync, AMQMethodBody::shared_ptr command)
{
- responses.expect();
- sendCommand(toSend);
- return responses.receive(c, m);
+ if(sync) {
+ boost::shared_ptr<FutureCompletion> fc(futures.createCompletion());
+ executionHandler.send(command, boost::bind(&FutureCompletion::completed, fc));
+ fc->waitForCompletion();
+ } else {
+ executionHandler.send(command);
+ }
}
AMQMethodBody::shared_ptr Channel::sendAndReceiveSync(
@@ -339,68 +199,138 @@ AMQMethodBody::shared_ptr Channel::sendAndReceiveSync(
if(sync)
return sendAndReceive(body, c, m);
else {
- sendCommand(body);
+ executionHandler.send(body);
return AMQMethodBody::shared_ptr();
}
}
void Channel::consume(
- Queue& queue, std::string& tag, MessageListener* listener,
+ Queue& queue, const std::string& tag, MessageListener* listener,
AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields) {
- messaging->consume(queue, tag, listener, ackMode, noLocal, synch, fields);
+
+ if (tag.empty()) {
+ throw Exception("A tag must be specified for a consumer.");
+ }
+ {
+ Mutex::ScopedLock l(lock);
+ ConsumerMap::iterator i = consumers.find(tag);
+ if (i != consumers.end())
+ throw Exception(boost::format("Consumer already exists with tag: '%1%'") % tag);
+ Consumer& c = consumers[tag];
+ c.listener = listener;
+ c.ackMode = ackMode;
+ c.lastDeliveryTag = 0;
+ }
+ sendAndReceiveSync<BasicConsumeOkBody>(
+ synch,
+ make_shared_ptr(new BasicConsumeBody(
+ version, 0, queue.getName(), tag, noLocal,
+ ackMode == NO_ACK, false, !synch,
+ fields ? *fields : FieldTable())));
}
void Channel::cancel(const std::string& tag, bool synch) {
- messaging->cancel(tag, synch);
+ Consumer c;
+ {
+ Mutex::ScopedLock l(lock);
+ ConsumerMap::iterator i = consumers.find(tag);
+ if (i == consumers.end())
+ return;
+ c = i->second;
+ consumers.erase(i);
+ }
+ sendAndReceiveSync<BasicCancelOkBody>(
+ synch, make_shared_ptr(new BasicCancelBody(version, tag, !synch)));
}
bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) {
- bool result = messaging->get(msg, queue, ackMode);
- if (!isOpen()) {
- throw ChannelException(errorCode, errorText);
+
+ AMQMethodBody::shared_ptr request(new BasicGetBody(version, 0, queue.getName(), ackMode));
+ AMQMethodBody::shared_ptr response = sendAndReceive(request);
+ if (response && response->isA<BasicGetEmptyBody>()) {
+ return false;
+ } else {
+ ReceivedContent::shared_ptr content = gets.pop();
+ content->populate(msg);
+ return true;
}
- return result;
}
void Channel::publish(const Message& msg, const Exchange& exchange,
const std::string& routingKey,
bool mandatory, bool immediate) {
- messaging->publish(msg, exchange, routingKey, mandatory, immediate);
-}
-
-void Channel::setReturnedMessageHandler(ReturnedMessageHandler* handler) {
- messaging->setReturnedMessageHandler(handler);
-}
-void Channel::run() {
- messaging->run();
+ const string e = exchange.getName();
+ string key = routingKey;
+
+ executionHandler.sendContent(make_shared_ptr(new BasicPublishBody(version, 0, e, key, mandatory, immediate)),
+ msg, msg.getData(), connection->getMaxFrameSize());//sending framesize here is horrible, fix this!
+ /*
+ // Make a header for the message
+ AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
+ BasicHeaderProperties::copy(
+ *static_cast<BasicHeaderProperties*>(header->getProperties()), msg);
+ header->setContentSize(msg.getData().size());
+
+ executionHandler.send(make_shared_ptr(new BasicPublishBody(version, 0, e, key, mandatory, immediate)));
+ executionHandler.sendContent(header);
+ string data = msg.getData();
+ u_int64_t data_length = data.length();
+ if(data_length > 0){
+ //frame itself uses 8 bytes
+ u_int32_t frag_size = connection->getMaxFrameSize() - 8;
+ if(data_length < frag_size){
+ executionHandler.sendContent(make_shared_ptr(new AMQContentBody(data)));
+ }else{
+ u_int32_t offset = 0;
+ u_int32_t remaining = data_length - offset;
+ while (remaining > 0) {
+ u_int32_t length = remaining > frag_size ? frag_size : remaining;
+ string frag(data.substr(offset, length));
+ executionHandler.sendContent(make_shared_ptr(new AMQContentBody(frag)));
+
+ offset += length;
+ remaining = data_length - offset;
+ }
+ }
+ }
+ */
}
-void Channel::sendCommand(AMQBody::shared_ptr body)
-{
- ++(outgoing.hwm);
- send(body);
+void Channel::start(){
+ running = true;
+ dispatcher = Thread(*this);
}
-bool Channel::waitForCompletion(SequenceNumber poi, Duration timeout)
-{
- AbsTime end;
- if (timeout == 0) {
- end = AbsTime::FarFuture();
- } else {
- end = AbsTime(AbsTime::now(), timeout);
- }
-
- Monitor::ScopedLock l(outgoingMonitor);
- while (end > AbsTime::now() && outgoing.lwm < poi) {
- outgoingMonitor.wait(end);
+void Channel::stop() {
+ executionHandler.received.close();
+ gets.close();
+ Mutex::ScopedLock l(stopLock);
+ if(running) {
+ dispatcher.join();
+ running = false;
}
- return !(outgoing.lwm < poi);
}
-bool Channel::synchWithServer(Duration timeout)
-{
- send(make_shared_ptr(new ExecutionFlushBody(version)));
- return waitForCompletion(outgoing.hwm, timeout);
+void Channel::run() {
+ try {
+ while (true) {
+ ReceivedContent::shared_ptr content = executionHandler.received.pop();
+ //need to dispatch this to the relevant listener:
+ if (content->isA<BasicDeliverBody>()) {
+ ConsumerMap::iterator i = consumers.find(content->as<BasicDeliverBody>()->getConsumerTag());
+ if (i != consumers.end()) {
+ Message msg;
+ content->populate(msg);
+ i->second.listener->received(msg);
+ } else {
+ QPID_LOG(warning, "Dropping message for unrecognised consumer: " << content->getMethod());
+ }
+ } else if (content->isA<BasicGetOkBody>()) {
+ gets.push(content);
+ } else {
+ QPID_LOG(warning, "Dropping unsupported message type: " << content->getMethod());
+ }
+ }
+ } catch (const QueueClosed&) {}
}
-
diff --git a/cpp/src/qpid/client/ClientChannel.h b/cpp/src/qpid/client/ClientChannel.h
index fc82fb41ff..4853603281 100644
--- a/cpp/src/qpid/client/ClientChannel.h
+++ b/cpp/src/qpid/client/ClientChannel.h
@@ -26,10 +26,12 @@
#include "ClientExchange.h"
#include "ClientMessage.h"
#include "ClientQueue.h"
-#include "ResponseHandler.h"
+#include "ChannelHandler.h"
+#include "ExecutionHandler.h"
+#include "FutureFactory.h"
#include "qpid/Exception.h"
-#include "qpid/framing/ChannelAdapter.h"
-#include "qpid/framing/SequenceNumber.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Runnable.h"
#include "qpid/sys/Thread.h"
#include "AckMode.h"
@@ -54,19 +56,23 @@ class ReturnedMessageHandler;
*
* \ingroup clientapi
*/
-class Channel : public framing::ChannelAdapter
+class Channel : private sys::Runnable
{
private:
struct UnknownMethod {};
typedef shared_ptr<framing::AMQMethodBody> MethodPtr;
+
+ struct Consumer{
+ MessageListener* listener;
+ AckMode ackMode;
+ int count;
+ u_int64_t lastDeliveryTag;
+ };
+ typedef std::map<std::string, Consumer> ConsumerMap;
mutable sys::Mutex lock;
- boost::scoped_ptr<MessageChannel> messaging;
Connection* connection;
sys::Thread dispatcher;
- ResponseHandler responses;
- sys::Monitor outgoingMonitor;
- framing::Window outgoing;
uint16_t prefetch;
const bool transactional;
@@ -78,32 +84,29 @@ class Channel : public framing::ChannelAdapter
sys::Mutex stopLock;
bool running;
- void stop();
+ ConsumerMap consumers;
+ ExecutionHandler executionHandler;
+ ChannelHandler channelHandler;
+ framing::ChannelId channelId;
+ BlockingQueue<ReceivedContent::shared_ptr> gets;
+ FutureFactory futures;
- void handleHeader(framing::AMQHeaderBody::shared_ptr body);
- void handleContent(framing::AMQContentBody::shared_ptr body);
- void handleHeartbeat(framing::AMQHeartbeatBody::shared_ptr body);
- void handleMethodInContext(
- framing::AMQMethodBody::shared_ptr, const framing::MethodContext&);
- void handleChannel(framing::AMQMethodBody::shared_ptr method, const framing::MethodContext& ctxt);
- void handleConnection(framing::AMQMethodBody::shared_ptr method);
- void handleExecution(framing::AMQMethodBody::shared_ptr method);
+ void stop();
void setQos();
-
- void protocolInit(
- const std::string& uid, const std::string& pwd,
- const std::string& vhost);
framing::AMQMethodBody::shared_ptr sendAndReceive(
framing::AMQMethodBody::shared_ptr,
- framing::ClassId, framing::MethodId);
+ framing::ClassId = 0, framing::MethodId = 0);
framing::AMQMethodBody::shared_ptr sendAndReceiveSync(
bool sync,
framing::AMQMethodBody::shared_ptr,
framing::ClassId, framing::MethodId);
+ void sendSync(bool sync, framing::AMQMethodBody::shared_ptr body);
+
+
template <class BodyType>
boost::shared_ptr<BodyType> sendAndReceive(framing::AMQMethodBody::shared_ptr body) {
return boost::shared_polymorphic_downcast<BodyType>(
@@ -118,21 +121,16 @@ class Channel : public framing::ChannelAdapter
sync, body, BodyType::CLASS_ID, BodyType::METHOD_ID));
}
- void sendCommand(framing::AMQBody::shared_ptr body);
-
void open(framing::ChannelId, Connection&);
void closeInternal();
- void peerClose(boost::shared_ptr<framing::ChannelCloseBody>);
- bool waitForCompletion(framing::SequenceNumber, sys::Duration);
-
+ void peerClose(uint16_t, const std::string&);
+
// FIXME aconway 2007-02-23: Get rid of friendships.
- friend class Connection;
- friend class BasicMessageChannel; // for sendAndReceive.
- friend class MessageMessageChannel; // for sendAndReceive.
+ friend class Connection;
+ friend class BasicMessageChannel; // for sendAndReceive.
+ friend class MessageMessageChannel; // for sendAndReceive.
public:
- enum InteropMode { AMQP_08, AMQP_09 };
-
/**
* Creates a channel object.
*
@@ -143,16 +141,10 @@ class Channel : public framing::ChannelAdapter
* @param prefetch specifies the number of unacknowledged
* messages the channel is willing to have sent to it
* asynchronously
- *
- * @param messageImpl Alternate messaging implementation class to
- * allow alternate protocol implementations of messaging
- * operations. Takes ownership.
*/
- Channel(
- bool transactional = false, u_int16_t prefetch = 500,
- InteropMode=AMQP_08);
+ Channel(bool transactional = false, u_int16_t prefetch = 500);
- ~Channel();
+ ~Channel();
/**
* Declares an exchange.
@@ -254,12 +246,10 @@ class Channel : public framing::ChannelAdapter
void start();
/**
- * Close the channel with optional error information.
- * Closing a channel that is not open has no effect.
+ * Close the channel. Closing a channel that is not open has no
+ * effect.
*/
- void close(
- framing::ReplyCode = 200, const std::string& ="OK",
- framing::ClassId = 0, framing::MethodId = 0);
+ void close();
/** True if the channel is transactional */
bool isTransactional() { return transactional; }
@@ -301,7 +291,7 @@ class Channel : public framing::ChannelAdapter
* is received from the broker
*/
void consume(
- Queue& queue, std::string& tag, MessageListener* listener,
+ Queue& queue, const std::string& tag, MessageListener* listener,
AckMode ackMode = NO_ACK, bool noLocal = false, bool synch = true,
const framing::FieldTable* fields = 0);
@@ -353,22 +343,9 @@ class Channel : public framing::ChannelAdapter
bool mandatory = false, bool immediate = false);
/**
- * Set a handler for this channel that will process any
- * returned messages
- *
- * @see publish()
- */
- void setReturnedMessageHandler(ReturnedMessageHandler* handler);
-
- /**
- * Deliver messages from the broker to the appropriate MessageListener.
+ * Deliver incoming messages to the appropriate MessageListener.
*/
void run();
-
- /**
- * TESTING ONLY FOR NOW!
- */
- bool synchWithServer(sys::Duration timeout = 0);
};
}}
diff --git a/cpp/src/qpid/client/ClientConnection.cpp b/cpp/src/qpid/client/ClientConnection.cpp
index 4b8f32a26f..c998ec30df 100644
--- a/cpp/src/qpid/client/ClientConnection.cpp
+++ b/cpp/src/qpid/client/ClientConnection.cpp
@@ -46,11 +46,13 @@ const std::string Connection::OK("OK");
Connection::Connection(
bool _debug, uint32_t _max_frame_size,
framing::ProtocolVersion _version
-) : channelIdCounter(0), version(_version), max_frame_size(_max_frame_size),
+ ) : channelIdCounter(0), version(_version), max_frame_size(_max_frame_size),
defaultConnector(version, _debug, _max_frame_size),
isOpen(false), debug(_debug)
{
setConnector(defaultConnector);
+
+ handler.maxFrameSize = _max_frame_size;
}
Connection::~Connection(){}
@@ -58,7 +60,7 @@ Connection::~Connection(){}
void Connection::setConnector(Connector& con)
{
connector = &con;
- connector->setInputHandler(this);
+ connector->setInputHandler(&handler);
connector->setTimeoutHandler(this);
connector->setShutdownHandler(this);
out = connector->getOutputHandler();
@@ -70,10 +72,19 @@ void Connection::open(
{
if (isOpen)
THROW_QPID_ERROR(INTERNAL_ERROR, "Channel object is already open");
+
+ //wire up the handler:
+ handler.in = boost::bind(&Connection::received, this, _1);
+ handler.out = boost::bind(&Connector::send, connector, _1);
+ handler.onClose = boost::bind(&Connection::closeChannels, this);
+
+ handler.uid = uid;
+ handler.pwd = pwd;
+ handler.vhost = vhost;
+
connector->connect(host, port);
- channels[0] = &channel0;
- channel0.open(0, *this);
- channel0.protocolInit(uid, pwd, vhost);
+ connector->init();
+ handler.waitForOpen();
isOpen = true;
}
@@ -87,14 +98,12 @@ void Connection::shutdown() {
}
void Connection::close(
- ReplyCode code, const string& msg, ClassId classId, MethodId methodId
+ ReplyCode /*code*/, const string& /*msg*/, ClassId /*classId*/, MethodId /*methodId*/
)
{
if(markClosed()) {
try {
- channel0.sendAndReceive<ConnectionCloseOkBody>(
- make_shared_ptr(new ConnectionCloseBody(
- getVersion(), code, msg, classId, methodId)));
+ handler.close();
} catch (const std::exception& e) {
QPID_LOG(error, "Exception closing channel: " << e.what());
}
@@ -138,35 +147,16 @@ void Connection::erase(ChannelId id) {
void Connection::received(AMQFrame& frame){
ChannelId id = frame.getChannel();
Channel* channel = channels[id];
- if (channel == 0)
- THROW_QPID_ERROR(
- PROTOCOL_ERROR+504,
- (boost::format("Invalid channel number %g") % id).str());
- try{
- channel->getHandlers().in->handle(frame);
- }catch(const qpid::QpidError& e){
- std::cout << "Caught error while handling " << frame << ": " << e.what() <<std::endl;
- channelException(
- *channel, dynamic_cast<AMQMethodBody*>(frame.getBody().get()), e);
+ if (channel == 0) {
+ throw ConnectionException(504, (boost::format("Invalid channel number %g") % id).str());
}
+ channel->channelHandler.incoming(frame);
}
void Connection::send(AMQFrame& frame) {
out->send(frame);
}
-void Connection::channelException(
- Channel& channel, AMQMethodBody* method, const QpidError& e)
-{
- int code = (e.code >= PROTOCOL_ERROR) ? e.code - PROTOCOL_ERROR : 500;
- string msg = e.msg;
- if(method == 0)
- channel.close(code, msg);
- else
- channel.close(
- code, msg, method->amqpClassId(), method->amqpMethodId());
-}
-
void Connection::idleIn(){
connector->close();
}
diff --git a/cpp/src/qpid/client/CompletionTracker.cpp b/cpp/src/qpid/client/CompletionTracker.cpp
new file mode 100644
index 0000000000..996971dbd2
--- /dev/null
+++ b/cpp/src/qpid/client/CompletionTracker.cpp
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "CompletionTracker.h"
+
+using qpid::client::CompletionTracker;
+using namespace qpid::framing;
+using namespace boost;
+
+CompletionTracker::CompletionTracker() {}
+CompletionTracker::CompletionTracker(const SequenceNumber& m) : mark(m) {}
+
+
+void CompletionTracker::completed(const SequenceNumber& _mark)
+{
+ sys::Mutex::ScopedLock l(lock);
+ mark = _mark;
+ while (!listeners.empty() && !(listeners.front().first > mark)) {
+ Listener f(listeners.front().second);
+ {
+ sys::Mutex::ScopedUnlock u(lock);
+ f();
+ }
+ listeners.pop();
+ }
+}
+
+void CompletionTracker::listen(const SequenceNumber& point, Listener listener)
+{
+ if (!add(point, listener)) {
+ listener();
+ }
+}
+
+bool CompletionTracker::add(const SequenceNumber& point, Listener listener)
+{
+ sys::Mutex::ScopedLock l(lock);
+ if (point < mark) {
+ return false;
+ } else {
+ listeners.push(make_pair(point, listener));
+ return true;
+ }
+}
+
+
diff --git a/cpp/src/qpid/client/CompletionTracker.h b/cpp/src/qpid/client/CompletionTracker.h
new file mode 100644
index 0000000000..30999b4184
--- /dev/null
+++ b/cpp/src/qpid/client/CompletionTracker.h
@@ -0,0 +1,56 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <queue>
+#include <boost/function.hpp>
+#include "qpid/framing/amqp_framing.h"
+#include "qpid/framing/SequenceNumber.h"
+#include "qpid/sys/Mutex.h"
+
+#ifndef _CompletionTracker_
+#define _CompletionTracker_
+
+namespace qpid {
+namespace client {
+
+class CompletionTracker
+{
+public:
+ typedef boost::function<void()> Listener;
+
+ CompletionTracker();
+ CompletionTracker(const framing::SequenceNumber& mark);
+ void completed(const framing::SequenceNumber& mark);
+ void listen(const framing::SequenceNumber& point, Listener l);
+
+private:
+ sys::Mutex lock;
+ framing::SequenceNumber mark;
+ std::queue< std::pair<framing::SequenceNumber, Listener> > listeners;
+
+ bool add(const framing::SequenceNumber& point, Listener l);
+};
+
+}
+}
+
+
+#endif
diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h
index 51434fcefd..4d32456c40 100644
--- a/cpp/src/qpid/client/Connection.h
+++ b/cpp/src/qpid/client/Connection.h
@@ -26,6 +26,7 @@
#include "qpid/QpidError.h"
#include "ClientChannel.h"
#include "Connector.h"
+#include "ConnectionHandler.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/ShutdownHandler.h"
#include "qpid/sys/TimeoutHandler.h"
@@ -79,17 +80,15 @@ class Connection : public ConnectionForChannel
framing::ProtocolVersion version;
const uint32_t max_frame_size;
ChannelMap channels;
+ ConnectionHandler handler;
Connector defaultConnector;
Connector* connector;
framing::OutputHandler* out;
bool isOpen;
sys::Mutex shutdownLock;
- Channel channel0;
bool debug;
void erase(framing::ChannelId);
- void channelException(
- Channel&, framing::AMQMethodBody*, const QpidError&);
void closeChannels();
bool markClosed();
@@ -174,7 +173,7 @@ class Connection : public ConnectionForChannel
inline uint32_t getMaxFrameSize(){ return max_frame_size; }
/** @return protocol version in use on this connection. */
- framing::ProtocolVersion getVersion() const { return version; }
+ //framing::ProtocolVersion getVersion() const { return version; }
};
}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/ConnectionHandler.cpp b/cpp/src/qpid/client/ConnectionHandler.cpp
new file mode 100644
index 0000000000..ada3fa4fb0
--- /dev/null
+++ b/cpp/src/qpid/client/ConnectionHandler.cpp
@@ -0,0 +1,196 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ConnectionHandler.h"
+#include "qpid/log/Statement.h"
+#include "qpid/framing/amqp_framing.h"
+
+using namespace qpid::client;
+using namespace qpid::framing;
+using namespace boost;
+
+namespace {
+const std::string OK("OK");
+}
+
+ConnectionHandler::ConnectionHandler()
+ : StateManager(NOT_STARTED)
+{
+
+ mechanism = "PLAIN";
+ locale = "en_US";
+ heartbeat = 0;
+ maxChannels = 32767;
+ maxFrameSize = 65536;
+ insist = true;
+ version = framing::highestProtocolVersion;
+
+ ESTABLISHED.insert(FAILED);
+ ESTABLISHED.insert(OPEN);
+}
+
+void ConnectionHandler::incoming(AMQFrame& frame)
+{
+ if (getState() == CLOSED) {
+ throw Exception("Connection is closed.");
+ }
+
+ AMQBody::shared_ptr body = frame.getBody();
+ if (frame.getChannel() == 0) {
+ if (body->type() == METHOD_BODY) {
+ handle(shared_polymorphic_cast<AMQMethodBody>(body));
+ } else {
+ error(503, "Cannot send content on channel zero.");
+ }
+ } else {
+ switch(getState()) {
+ case OPEN:
+ try {
+ in(frame);
+ }catch(ConnectionException& e){
+ error(e.code, e.toString(), body);
+ }catch(std::exception& e){
+ error(541/*internal error*/, e.what(), body);
+ }
+ break;
+ case CLOSING:
+ QPID_LOG(warning, "Received frame on non-zero channel while closing connection; frame ignored.");
+ break;
+ default:
+ //must be in connection initialisation:
+ fail("Cannot receive frames on non-zero channel until connection is established.");
+ }
+ }
+}
+
+void ConnectionHandler::outgoing(AMQFrame& frame)
+{
+ if (getState() == OPEN) {
+ out(frame);
+ } else {
+ throw Exception("Connection is not open.");
+ }
+}
+
+void ConnectionHandler::waitForOpen()
+{
+ waitFor(ESTABLISHED);
+ if (getState() == FAILED) {
+ throw Exception("Failed to establish connection.");
+ }
+}
+
+void ConnectionHandler::close()
+{
+ setState(CLOSING);
+ send(make_shared_ptr(new ConnectionCloseBody(version, 200, OK, 0, 0)));
+
+ waitFor(CLOSED);
+}
+
+void ConnectionHandler::send(framing::AMQBody::shared_ptr body)
+{
+ AMQFrame f;
+ f.setBody(body);
+ out(f);
+}
+
+void ConnectionHandler::error(uint16_t code, const std::string& message, uint16_t classId, uint16_t methodId)
+{
+ setState(CLOSING);
+ send(make_shared_ptr(new ConnectionCloseBody(version, code, message, classId, methodId)));
+}
+
+void ConnectionHandler::error(uint16_t code, const std::string& message, AMQBody::shared_ptr body)
+{
+ if (body->type() == METHOD_BODY) {
+ AMQMethodBody::shared_ptr method(shared_polymorphic_cast<AMQMethodBody>(body));
+ error(code, message, method->amqpClassId(), method->amqpMethodId());
+ } else {
+ error(code, message);
+ }
+}
+
+
+void ConnectionHandler::fail(const std::string& message)
+{
+ QPID_LOG(error, message);
+ setState(FAILED);
+}
+
+void ConnectionHandler::handle(AMQMethodBody::shared_ptr method)
+{
+ switch (getState()) {
+ case NOT_STARTED:
+ if (method->isA<ConnectionStartBody>()) {
+ setState(NEGOTIATING);
+ string response = ((char)0) + uid + ((char)0) + pwd;
+ send(make_shared_ptr(new ConnectionStartOkBody(version, properties, mechanism, response, locale)));
+ } else {
+ fail("Bad method sequence, expected connection-start.");
+ }
+ break;
+ case NEGOTIATING:
+ if (method->isA<ConnectionTuneBody>()) {
+ ConnectionTuneBody::shared_ptr proposal(shared_polymorphic_cast<ConnectionTuneBody>(method));
+ heartbeat = proposal->getHeartbeat();
+ maxChannels = proposal->getChannelMax();
+ send(make_shared_ptr(new ConnectionTuneOkBody(version, maxChannels, maxFrameSize, heartbeat)));
+ setState(OPENING);
+ send(make_shared_ptr(new ConnectionOpenBody(version, vhost, capabilities, insist)));
+ //TODO: support for further security challenges
+ //} else if (method->isA<ConnectionSecureBody>()) {
+ } else {
+ fail("Unexpected method sequence, expected connection-tune.");
+ }
+ break;
+ case OPENING:
+ if (method->isA<ConnectionOpenOkBody>()) {
+ setState(OPEN);
+ //TODO: support for redirection
+ //} else if (method->isA<ConnectionRedirectBody>()) {
+ } else {
+ fail("Unexpected method sequence, expected connection-open-ok.");
+ }
+ break;
+ case OPEN:
+ if (method->isA<ConnectionCloseBody>()) {
+ send(make_shared_ptr(new ConnectionCloseOkBody(version)));
+ setState(CLOSED);
+ if (onClose) {
+ onClose();
+ }
+ } else {
+ error(503, "Unexpected method on channel zero.", method->amqpClassId(), method->amqpMethodId());
+ }
+ break;
+ case CLOSING:
+ if (method->isA<ConnectionCloseOkBody>()) {
+ setState(CLOSED);
+ if (onClose) {
+ onClose();
+ }
+ } else {
+ QPID_LOG(warning, "Received frame on channel zero while closing connection; frame ignored.");
+ }
+ break;
+ }
+}
diff --git a/cpp/src/qpid/client/ConnectionHandler.h b/cpp/src/qpid/client/ConnectionHandler.h
new file mode 100644
index 0000000000..50618b50b1
--- /dev/null
+++ b/cpp/src/qpid/client/ConnectionHandler.h
@@ -0,0 +1,80 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _ConnectionHandler_
+#define _ConnectionHandler_
+
+#include "Connector.h"
+#include "StateManager.h"
+#include "ChainableFrameHandler.h"
+#include "qpid/framing/InputHandler.h"
+
+namespace qpid {
+namespace client {
+
+struct ConnectionProperties
+{
+ std::string uid;
+ std::string pwd;
+ std::string vhost;
+ framing::FieldTable properties;
+ std::string mechanism;
+ std::string locale;
+ std::string capabilities;
+ uint16_t heartbeat;
+ uint16_t maxChannels;
+ uint64_t maxFrameSize;
+ bool insist;
+ framing::ProtocolVersion version;
+};
+
+class ConnectionHandler : private StateManager,
+ public ConnectionProperties,
+ public ChainableFrameHandler,
+ public framing::InputHandler
+{
+ enum STATES {NOT_STARTED, NEGOTIATING, OPENING, OPEN, CLOSING, CLOSED, FAILED};
+ std::set<int> ESTABLISHED;
+
+ void handle(framing::AMQMethodBody::shared_ptr method);
+ void send(framing::AMQBody::shared_ptr body);
+ void error(uint16_t code, const std::string& message, uint16_t classId = 0, uint16_t methodId = 0);
+ void error(uint16_t code, const std::string& message, framing::AMQBody::shared_ptr body);
+ void fail(const std::string& message);
+
+public:
+ typedef boost::function<void()> CloseListener;
+
+ ConnectionHandler();
+
+ void received(framing::AMQFrame& f) { incoming(f); }
+
+ void incoming(framing::AMQFrame& frame);
+ void outgoing(framing::AMQFrame& frame);
+
+ void waitForOpen();
+ void close();
+
+ CloseListener onClose;
+};
+
+}}
+
+#endif
diff --git a/cpp/src/qpid/client/Correlator.cpp b/cpp/src/qpid/client/Correlator.cpp
new file mode 100644
index 0000000000..edb16bbcee
--- /dev/null
+++ b/cpp/src/qpid/client/Correlator.cpp
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Correlator.h"
+
+using qpid::client::Correlator;
+using namespace qpid::framing;
+using namespace boost;
+
+void Correlator::receive(AMQMethodBody::shared_ptr response)
+{
+ if (listeners.empty()) {
+ throw ConnectionException(503, "Unexpected method!");//TODO: include the method & class name
+ } else {
+ Listener l = listeners.front();
+ if (l) l(response);
+ listeners.pop();
+ }
+}
+
+void Correlator::listen(Listener l)
+{
+ listeners.push(l);
+}
+
+
diff --git a/cpp/src/qpid/client/Correlator.h b/cpp/src/qpid/client/Correlator.h
new file mode 100644
index 0000000000..339c9bd0c4
--- /dev/null
+++ b/cpp/src/qpid/client/Correlator.h
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <memory>
+#include <queue>
+#include <set>
+#include <boost/function.hpp>
+#include "qpid/framing/AMQMethodBody.h"
+#include "qpid/sys/Monitor.h"
+
+#ifndef _Correlator_
+#define _Correlator_
+
+namespace qpid {
+namespace client {
+
+
+class Correlator
+{
+public:
+ typedef boost::function<void(framing::AMQMethodBody::shared_ptr)> Listener;
+
+ void receive(framing::AMQMethodBody::shared_ptr);
+ void listen(Listener l);
+
+private:
+ std::queue<Listener> listeners;
+};
+
+}
+}
+
+
+#endif
diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp
new file mode 100644
index 0000000000..e4270f4e98
--- /dev/null
+++ b/cpp/src/qpid/client/ExecutionHandler.cpp
@@ -0,0 +1,159 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ExecutionHandler.h"
+#include "qpid/Exception.h"
+#include "qpid/framing/BasicDeliverBody.h"
+#include "qpid/framing/MessageTransferBody.h"
+
+using namespace qpid::client;
+using namespace qpid::framing;
+using namespace boost;
+
+bool isMessageMethod(AMQMethodBody::shared_ptr method)
+{
+ return method->isA<BasicDeliverBody>() || method->isA<MessageTransferBody>() || method->isA<BasicGetOkBody>();
+}
+
+bool isMessageMethod(AMQBody::shared_ptr body)
+{
+ return body->type() == METHOD_BODY && isMessageMethod(shared_polymorphic_cast<AMQMethodBody>(body));
+}
+
+bool isContentFrame(AMQFrame& frame)
+{
+ AMQBody::shared_ptr body = frame.getBody();
+ uint8_t type = body->type();
+ return type == HEADER_BODY || type == CONTENT_BODY || isMessageMethod(body);
+}
+
+bool invoke(AMQBody::shared_ptr body, Invocable* target)
+{
+ return body->type() == METHOD_BODY && shared_polymorphic_cast<AMQMethodBody>(body)->invoke(target);
+}
+
+ExecutionHandler::ExecutionHandler() : version(framing::highestProtocolVersion) {}
+
+//incoming:
+void ExecutionHandler::handle(AMQFrame& frame)
+{
+ AMQBody::shared_ptr body = frame.getBody();
+ if (!invoke(body, this)) {
+ if (isContentFrame(frame)) {
+ if (!arriving) {
+ arriving = ReceivedContent::shared_ptr(new ReceivedContent(++incoming.hwm));
+ }
+ arriving->append(body);
+ if (arriving->isComplete()) {
+ received.push(arriving);
+ arriving.reset();
+ }
+ } else {
+ ++incoming.hwm;
+ correlation.receive(shared_polymorphic_cast<AMQMethodBody>(body));
+ }
+ }
+}
+
+void ExecutionHandler::complete(uint32_t cumulative, SequenceNumberSet range)
+{
+ SequenceNumber mark(cumulative);
+ if (outgoing.lwm < mark) {
+ outgoing.lwm = mark;
+ completion.completed(outgoing.lwm);
+ }
+ if (range.size() % 2) { //must be even number
+ throw ConnectionException(530, "Received odd number of elements in ranged mark");
+ } else {
+ //TODO: need to manage (record and accumulate) ranges such
+ //that we can implictly move the mark when appropriate
+
+ //TODO: signal listeners of early notification?
+ }
+}
+
+void ExecutionHandler::flush()
+{
+ //send completion
+ incoming.lwm = incoming.hwm;
+ //make_shared_ptr(new ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), SequenceNumberSet())));
+}
+
+void ExecutionHandler::send(AMQBody::shared_ptr command, CompletionTracker::Listener f, Correlator::Listener g)
+{
+ //allocate id:
+ ++outgoing.hwm;
+ //register listeners if necessary:
+ if (f) {
+ completion.listen(outgoing.hwm, f);
+ }
+ if (g) {
+ correlation.listen(g);
+ }
+
+ AMQFrame frame(version, 0/*id will be filled in be channel handler*/, command);
+ out(frame);
+
+ if (f) {
+ AMQFrame frame(version, 0, make_shared_ptr(new ExecutionFlushBody(version)));
+ out(frame);
+ }
+}
+
+void ExecutionHandler::sendContent(framing::AMQBody::shared_ptr content)
+{
+ AMQFrame frame(version, 0/*id will be filled in be channel handler*/, content);
+ out(frame);
+}
+
+void ExecutionHandler::sendContent(AMQBody::shared_ptr command, const BasicHeaderProperties& headers, const std::string& data,
+ uint64_t frameSize,
+ CompletionTracker::Listener f, Correlator::Listener g)
+{
+ send(command, f, g);
+
+ AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
+ BasicHeaderProperties::copy(*static_cast<BasicHeaderProperties*>(header->getProperties()), headers);
+ header->setContentSize(data.size());
+ AMQFrame h(version, 0, header);
+ out(h);
+
+ u_int64_t data_length = data.length();
+ if(data_length > 0){
+ //frame itself uses 8 bytes
+ u_int32_t frag_size = frameSize - 8;
+ if(data_length < frag_size){
+ AMQFrame frame(version, 0, make_shared_ptr(new AMQContentBody(data)));
+ out(frame);
+ }else{
+ u_int32_t offset = 0;
+ u_int32_t remaining = data_length - offset;
+ while (remaining > 0) {
+ u_int32_t length = remaining > frag_size ? frag_size : remaining;
+ string frag(data.substr(offset, length));
+ AMQFrame frame(version, 0, make_shared_ptr(new AMQContentBody(frag)));
+ out(frame);
+ offset += length;
+ remaining = data_length - offset;
+ }
+ }
+ }
+}
diff --git a/cpp/src/qpid/client/ExecutionHandler.h b/cpp/src/qpid/client/ExecutionHandler.h
new file mode 100644
index 0000000000..99b0f4b915
--- /dev/null
+++ b/cpp/src/qpid/client/ExecutionHandler.h
@@ -0,0 +1,70 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _ExecutionHandler_
+#define _ExecutionHandler_
+
+#include <queue>
+#include "qpid/framing/AMQP_ServerOperations.h"
+#include "qpid/framing/SequenceNumber.h"
+#include "BlockingQueue.h"
+#include "ChainableFrameHandler.h"
+#include "CompletionTracker.h"
+#include "Correlator.h"
+#include "ReceivedContent.h"
+
+namespace qpid {
+namespace client {
+
+class ExecutionHandler :
+ private framing::AMQP_ServerOperations::ExecutionHandler,
+ public ChainableFrameHandler
+{
+ framing::Window incoming;
+ framing::Window outgoing;
+ ReceivedContent::shared_ptr arriving;
+ Correlator correlation;
+ CompletionTracker completion;
+ framing::ProtocolVersion version;
+
+ void complete(uint32_t mark, framing::SequenceNumberSet range);
+ void flush();
+
+public:
+ BlockingQueue<ReceivedContent::shared_ptr> received;
+
+ ExecutionHandler();
+
+ void handle(framing::AMQFrame& frame);
+ void send(framing::AMQBody::shared_ptr command,
+ CompletionTracker::Listener f = CompletionTracker::Listener(),
+ Correlator::Listener g = Correlator::Listener());
+ void sendContent(framing::AMQBody::shared_ptr command,
+ const framing::BasicHeaderProperties& headers, const std::string& data,
+ uint64_t frameSize,
+ CompletionTracker::Listener f = CompletionTracker::Listener(),
+ Correlator::Listener g = Correlator::Listener());
+
+ void sendContent(framing::AMQBody::shared_ptr content);
+};
+
+}}
+
+#endif
diff --git a/cpp/src/qpid/client/FutureCompletion.cpp b/cpp/src/qpid/client/FutureCompletion.cpp
new file mode 100644
index 0000000000..6fc3d5f088
--- /dev/null
+++ b/cpp/src/qpid/client/FutureCompletion.cpp
@@ -0,0 +1,61 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "FutureCompletion.h"
+
+using namespace qpid::client;
+using namespace qpid::sys;
+
+FutureCompletion::FutureCompletion() : complete(false), closed(false), code(0) {}
+
+bool FutureCompletion::isComplete()
+{
+ Monitor::ScopedLock l(lock);
+ return complete;
+}
+
+void FutureCompletion::completed()
+{
+ Monitor::ScopedLock l(lock);
+ complete = true;
+ lock.notifyAll();
+}
+
+void FutureCompletion::waitForCompletion()
+{
+ Monitor::ScopedLock l(lock);
+ while (!complete && !closed) {
+ lock.wait();
+ }
+ if (closed) {
+ throw ChannelException(code, text);
+ }
+}
+
+void FutureCompletion::close(uint16_t _code, const std::string& _text)
+{
+ Monitor::ScopedLock l(lock);
+ complete = true;
+ closed = true;
+ code = _code;
+ text = _text;
+ lock.notifyAll();
+}
diff --git a/cpp/src/qpid/client/FutureCompletion.h b/cpp/src/qpid/client/FutureCompletion.h
new file mode 100644
index 0000000000..3487a0910a
--- /dev/null
+++ b/cpp/src/qpid/client/FutureCompletion.h
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef _FutureCompletion_
+#define _FutureCompletion_
+
+#include "qpid/framing/amqp_framing.h"
+#include "qpid/sys/Monitor.h"
+
+namespace qpid {
+namespace client {
+
+class FutureCompletion
+{
+protected:
+ sys::Monitor lock;
+ bool complete;
+ bool closed;
+ uint16_t code;
+ std::string text;
+
+public:
+ FutureCompletion();
+ virtual ~FutureCompletion(){}
+ bool isComplete();
+ void waitForCompletion();
+ void completed();
+ void close(uint16_t code, const std::string& text);
+};
+
+}}
+
+
+#endif
diff --git a/cpp/src/qpid/client/FutureFactory.cpp b/cpp/src/qpid/client/FutureFactory.cpp
new file mode 100644
index 0000000000..7f9d51e77f
--- /dev/null
+++ b/cpp/src/qpid/client/FutureFactory.cpp
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "FutureFactory.h"
+
+using namespace qpid::client;
+using namespace boost;
+
+shared_ptr<FutureCompletion> FutureFactory::createCompletion()
+{
+ shared_ptr<FutureCompletion> f(new FutureCompletion());
+ weak_ptr<FutureCompletion> w(f);
+ set.push_back(w);
+ return f;
+}
+
+shared_ptr<FutureResponse> FutureFactory::createResponse()
+{
+ shared_ptr<FutureResponse> f(new FutureResponse());
+ weak_ptr<FutureCompletion> w(static_pointer_cast<FutureCompletion>(f));
+ set.push_back(w);
+ return f;
+}
+
+void FutureFactory::close(uint16_t code, const std::string& text)
+{
+ for (WeakPtrSet::iterator i = set.begin(); i != set.end(); i++) {
+ shared_ptr<FutureCompletion> p = i->lock();
+ if (p) {
+ p->close(code, text);
+ }
+ }
+}
diff --git a/cpp/src/qpid/client/FutureFactory.h b/cpp/src/qpid/client/FutureFactory.h
new file mode 100644
index 0000000000..b126e296fd
--- /dev/null
+++ b/cpp/src/qpid/client/FutureFactory.h
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef _FutureFactory_
+#define _FutureFactory_
+
+#include <vector>
+#include <boost/shared_ptr.hpp>
+#include <boost/weak_ptr.hpp>
+#include "FutureCompletion.h"
+#include "FutureResponse.h"
+
+namespace qpid {
+namespace client {
+
+class FutureFactory
+{
+ typedef std::vector< boost::weak_ptr<FutureCompletion> > WeakPtrSet;
+ WeakPtrSet set;
+
+public:
+ boost::shared_ptr<FutureCompletion> createCompletion();
+ boost::shared_ptr<FutureResponse> createResponse();
+ void close(uint16_t code, const std::string& text);
+};
+
+}}
+
+
+#endif
diff --git a/cpp/src/qpid/client/FutureResponse.cpp b/cpp/src/qpid/client/FutureResponse.cpp
new file mode 100644
index 0000000000..6b1246a449
--- /dev/null
+++ b/cpp/src/qpid/client/FutureResponse.cpp
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "FutureResponse.h"
+
+using namespace qpid::client;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+
+AMQMethodBody::shared_ptr FutureResponse::getResponse()
+{
+ waitForCompletion();
+ return response;
+}
+
+void FutureResponse::received(AMQMethodBody::shared_ptr r)
+{
+ Monitor::ScopedLock l(lock);
+ response = r;
+ complete = true;
+ lock.notifyAll();
+}
+
diff --git a/cpp/src/qpid/client/FutureResponse.h b/cpp/src/qpid/client/FutureResponse.h
new file mode 100644
index 0000000000..ccc6fb5894
--- /dev/null
+++ b/cpp/src/qpid/client/FutureResponse.h
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef _FutureResponse_
+#define _FutureResponse_
+
+#include "qpid/framing/amqp_framing.h"
+#include "FutureCompletion.h"
+
+namespace qpid {
+namespace client {
+
+class FutureResponse : public FutureCompletion
+{
+ framing::AMQMethodBody::shared_ptr response;
+
+public:
+ framing::AMQMethodBody::shared_ptr getResponse();
+ void received(framing::AMQMethodBody::shared_ptr response);
+};
+
+}}
+
+
+
+#endif
diff --git a/cpp/src/qpid/client/IncomingMessage.cpp b/cpp/src/qpid/client/IncomingMessage.cpp
deleted file mode 100644
index 059e644464..0000000000
--- a/cpp/src/qpid/client/IncomingMessage.cpp
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "IncomingMessage.h"
-#include "qpid/Exception.h"
-#include "ClientMessage.h"
-#include <boost/format.hpp>
-
-namespace qpid {
-namespace client {
-
-using boost::format;
-using sys::Mutex;
-
-IncomingMessage::Destination::~Destination() {}
-
-
-IncomingMessage::WaitableDestination::WaitableDestination()
- : shutdownFlag(false) {}
-
-void IncomingMessage::WaitableDestination::message(const Message& msg) {
- Mutex::ScopedLock l(monitor);
- queue.push(msg);
- monitor.notify();
-}
-
-void IncomingMessage::WaitableDestination::empty() {
- Mutex::ScopedLock l(monitor);
- queue.push(Empty());
- monitor.notify();
-}
-
-bool IncomingMessage::WaitableDestination::wait(Message& msgOut) {
- Mutex::ScopedLock l(monitor);
- while (queue.empty() && !shutdownFlag)
- monitor.wait();
- if (shutdownFlag)
- return false;
- Message* msg = boost::get<Message>(&queue.front());
- bool success = msg;
- if (success)
- msgOut=*msg;
- queue.pop();
- if (!queue.empty())
- monitor.notify(); // Wake another waiter.
- return success;
-}
-
-void IncomingMessage::WaitableDestination::shutdown() {
- Mutex::ScopedLock l(monitor);
- shutdownFlag = true;
- monitor.notifyAll();
-}
-
-void IncomingMessage::openReference(const std::string& name) {
- Mutex::ScopedLock l(lock);
- if (references.find(name) != references.end())
- throw ConnectionException(
- 503, format("Attempt to open existing reference %s.") % name);
- references[name];
- return;
-}
-
-void IncomingMessage::appendReference(
- const std::string& name, const std::string& data)
-{
- Mutex::ScopedLock l(lock);
- getRefUnlocked(name).data += data;
-}
-
-Message& IncomingMessage::createMessage(
- const std::string& destination, const std::string& reference)
-{
- Mutex::ScopedLock l(lock);
- getDestUnlocked(destination); // Verify destination.
- Reference& ref = getRefUnlocked(reference);
- ref.messages.resize(ref.messages.size() +1);
- ref.messages.back().setDestination(destination);
- return ref.messages.back();
-}
-
-void IncomingMessage::closeReference(const std::string& name) {
- Reference refCopy;
- {
- Mutex::ScopedLock l(lock);
- refCopy = getRefUnlocked(name);
- references.erase(name);
- }
- for (std::vector<Message>::iterator i = refCopy.messages.begin();
- i != refCopy.messages.end();
- ++i)
- {
- i->setData(refCopy.data);
- // TODO aconway 2007-03-23: Thread safety,
- // can a destination be removed while we're doing this?
- getDestination(i->getDestination()).message(*i);
- }
-}
-
-
-void IncomingMessage::addDestination(std::string name, Destination& dest) {
- Mutex::ScopedLock l(lock);
- DestinationMap::iterator i = destinations.find(name);
- if (i == destinations.end())
- destinations[name]=&dest;
- else if (i->second != &dest)
- throw ConnectionException(
- 503, format("Destination already exists: %s.") % name);
-}
-
-void IncomingMessage::removeDestination(std::string name) {
- Mutex::ScopedLock l(lock);
- DestinationMap::iterator i = destinations.find(name);
- if (i == destinations.end())
- throw ConnectionException(
- 503, format("No such destination: %s.") % name);
- destinations.erase(i);
-}
-
-IncomingMessage::Destination& IncomingMessage::getDestination(
- const std::string& name) {
- return getDestUnlocked(name);
-}
-
-IncomingMessage::Reference& IncomingMessage::getReference(
- const std::string& name) {
- return getRefUnlocked(name);
-}
-
-IncomingMessage::Reference& IncomingMessage::getRefUnlocked(
- const std::string& name) {
- Mutex::ScopedLock l(lock);
- ReferenceMap::iterator i = references.find(name);
- if (i == references.end())
- throw ConnectionException(
- 503, format("No such reference: %s.") % name);
- return i->second;
-}
-
-IncomingMessage::Destination& IncomingMessage::getDestUnlocked(
- const std::string& name) {
- Mutex::ScopedLock l(lock);
- DestinationMap::iterator i = destinations.find(name);
- if (i == destinations.end())
- throw ConnectionException(
- 503, format("No such destination: %s.") % name);
- return *i->second;
-}
-
-}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/IncomingMessage.h b/cpp/src/qpid/client/IncomingMessage.h
deleted file mode 100644
index 7aa8e33df2..0000000000
--- a/cpp/src/qpid/client/IncomingMessage.h
+++ /dev/null
@@ -1,136 +0,0 @@
-#ifndef _IncomingMessage_
-#define _IncomingMessage_
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "qpid/sys/Monitor.h"
-#include <map>
-#include <queue>
-#include <vector>
-#include <boost/variant.hpp>
-
-namespace qpid {
-namespace client {
-
-class Message;
-
-/**
- * Manage incoming messages.
- *
- * Uses reference and destination concepts from 0-9 Messsage class.
- *
- * Basic messages use special destination and reference names to indicate
- * get-ok, return etc. messages.
- *
- */
-class IncomingMessage {
- public:
- /** Accumulate data associated with a set of messages. */
- struct Reference {
- std::string data;
- std::vector<Message> messages;
- };
-
- /** Interface to a destination for messages. */
- class Destination {
- public:
- virtual ~Destination();
-
- /** Pass a message to the destination */
- virtual void message(const Message&) = 0;
-
- /** Notify destination of queue-empty contition */
- virtual void empty() = 0;
- };
-
-
- /** A destination that a thread can wait on till a message arrives. */
- class WaitableDestination : public Destination
- {
- public:
- WaitableDestination();
- void message(const Message& msg);
- void empty();
- /** Wait till message() or empty() is called. True for message() */
- bool wait(Message& msgOut);
- void shutdown();
-
- private:
- struct Empty {};
- typedef boost::variant<Message,Empty> Item;
- sys::Monitor monitor;
- std::queue<Item> queue;
- bool shutdownFlag;
- };
-
-
-
- /** Add a reference. Throws if already open. */
- void openReference(const std::string& name);
-
- /** Get a reference. Throws if not already open. */
- void appendReference(const std::string& name,
- const std::string& data);
-
- /** Create a message to destination associated with reference
- *@exception if destination or reference non-existent.
- */
- Message& createMessage(const std::string& destination,
- const std::string& reference);
-
- /** Get a reference.
- *@exception if non-existent.
- */
- Reference& getReference(const std::string& name);
-
- /** Close a reference and deliver all its messages.
- * Throws if not open or a message has an invalid destination.
- */
- void closeReference(const std::string& name);
-
- /** Add a destination.
- *@exception if a different Destination is already registered
- * under name.
- */
- void addDestination(std::string name, Destination&);
-
- /** Remove a destination. Throws if does not exist */
- void removeDestination(std::string name);
-
- /** Get a destination. Throws if does not exist */
- Destination& getDestination(const std::string& name);
- private:
-
- typedef std::map<std::string, Reference> ReferenceMap;
- typedef std::map<std::string, Destination*> DestinationMap;
-
- Reference& getRefUnlocked(const std::string& name);
- Destination& getDestUnlocked(const std::string& name);
-
- mutable sys::Mutex lock;
- ReferenceMap references;
- DestinationMap destinations;
-};
-
-}}
-
-
-#endif
diff --git a/cpp/src/qpid/client/ReceivedContent.cpp b/cpp/src/qpid/client/ReceivedContent.cpp
new file mode 100644
index 0000000000..9cfee21c3c
--- /dev/null
+++ b/cpp/src/qpid/client/ReceivedContent.cpp
@@ -0,0 +1,104 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ReceivedContent.h"
+
+using qpid::client::ReceivedContent;
+using namespace qpid::framing;
+using namespace boost;
+
+ReceivedContent::ReceivedContent(const SequenceNumber& _id) : id(_id) {}
+
+void ReceivedContent::append(AMQBody::shared_ptr part)
+{
+ parts.push_back(part);
+}
+
+bool ReceivedContent::isComplete() const
+{
+ if (parts.empty()) {
+ return false;
+ } else if (isA<BasicDeliverBody>() || isA<BasicGetOkBody>()) {
+ AMQHeaderBody::shared_ptr headers(getHeaders());
+ return headers && headers->getContentSize() == getContentSize();
+ } else if (isA<MessageTransferBody>()) {
+ //no longer support references, headers and data are still method fields
+ return true;
+ } else {
+ throw Exception("Unknown content class");
+ }
+}
+
+
+AMQMethodBody::shared_ptr ReceivedContent::getMethod() const
+{
+ return parts.empty() ? AMQMethodBody::shared_ptr() : dynamic_pointer_cast<AMQMethodBody>(parts[0]);
+}
+
+AMQHeaderBody::shared_ptr ReceivedContent::getHeaders() const
+{
+ return parts.size() < 2 ? AMQHeaderBody::shared_ptr() : dynamic_pointer_cast<AMQHeaderBody>(parts[1]);
+}
+
+uint64_t ReceivedContent::getContentSize() const
+{
+ if (isA<BasicDeliverBody>() || isA<BasicGetOkBody>()) {
+ uint64_t size(0);
+ for (uint i = 2; i < parts.size(); i++) {
+ size += parts[i]->size();
+ }
+ return size;
+ } else if (isA<MessageTransferBody>()) {
+ return as<MessageTransferBody>()->getBody().getValue().size();
+ } else {
+ throw Exception("Unknown content class");
+ }
+}
+
+std::string ReceivedContent::getContent() const
+{
+ if (isA<BasicDeliverBody>() || isA<BasicGetOkBody>()) {
+ string data;
+ for (uint i = 2; i < parts.size(); i++) {
+ data += dynamic_pointer_cast<AMQContentBody>(parts[i])->getData();
+ }
+ return data;
+ } else if (isA<MessageTransferBody>()) {
+ return as<MessageTransferBody>()->getBody().getValue();
+ } else {
+ throw Exception("Unknown content class");
+ }
+}
+
+void ReceivedContent::populate(Message& msg)
+{
+ if (!isComplete()) throw Exception("Incomplete message");
+
+ if (isA<BasicDeliverBody>() || isA<BasicGetOkBody>()) {
+ BasicHeaderProperties* properties = dynamic_cast<BasicHeaderProperties*>(getHeaders()->getProperties());
+ BasicHeaderProperties::copy<Message, BasicHeaderProperties>(msg, *properties);
+ msg.setData(getContent());
+ } else if (isA<MessageTransferBody>()) {
+ throw Exception("Transfer not yet supported");
+ } else {
+ throw Exception("Unknown content class");
+ }
+}
diff --git a/cpp/src/qpid/client/ReceivedContent.h b/cpp/src/qpid/client/ReceivedContent.h
new file mode 100644
index 0000000000..1886034f9b
--- /dev/null
+++ b/cpp/src/qpid/client/ReceivedContent.h
@@ -0,0 +1,83 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+#include <vector>
+#include <boost/shared_ptr.hpp>
+#include "qpid/framing/amqp_framing.h"
+#include "qpid/framing/SequenceNumber.h"
+#include "ClientMessage.h"
+
+#ifndef _ReceivedContent_
+#define _ReceivedContent_
+
+namespace qpid {
+namespace client {
+
+/**
+ * Collects the frames representing some received 'content'. This
+ * provides a raw interface to 'message' data and attributes.
+ */
+class ReceivedContent
+{
+ const framing::SequenceNumber id;
+ std::vector<framing::AMQBody::shared_ptr> parts;
+
+public:
+ typedef boost::shared_ptr<ReceivedContent> shared_ptr;
+
+ ReceivedContent(const framing::SequenceNumber& id);
+ void append(framing::AMQBody::shared_ptr part);
+ bool isComplete() const;
+
+ uint64_t getContentSize() const;
+ std::string getContent() const;
+
+ framing::AMQMethodBody::shared_ptr getMethod() const;
+ framing::AMQHeaderBody::shared_ptr getHeaders() const;
+
+ template <class T> bool isA() const {
+ framing::AMQMethodBody::shared_ptr method = getMethod();
+ if (!method) {
+ return false;
+ } else {
+ return method->isA<T>();
+ }
+ }
+
+ template <class T> boost::shared_ptr<T> as() const {
+ framing::AMQMethodBody::shared_ptr method = getMethod();
+ if (method && method->isA<T>()) {
+ return boost::dynamic_pointer_cast<T>(method);
+ } else {
+ return boost::shared_ptr<T>();
+ }
+ }
+
+ const framing::SequenceNumber& getId() const { return id; }
+
+ void populate(Message& msg);
+};
+
+}
+}
+
+
+#endif
diff --git a/cpp/src/qpid/client/StateManager.cpp b/cpp/src/qpid/client/StateManager.cpp
new file mode 100644
index 0000000000..b72967c098
--- /dev/null
+++ b/cpp/src/qpid/client/StateManager.cpp
@@ -0,0 +1,68 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "StateManager.h"
+#include "qpid/framing/amqp_framing.h"
+
+using namespace qpid::client;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+StateManager::StateManager(int s) : state(s) {}
+
+void StateManager::waitForStateChange(int current)
+{
+ Monitor::ScopedLock l(stateLock);
+ while (state == current) {
+ stateLock.wait();
+ }
+}
+
+void StateManager::waitFor(int desired)
+{
+ Monitor::ScopedLock l(stateLock);
+ while (state != desired) {
+ stateLock.wait();
+ }
+}
+
+void StateManager::waitFor(std::set<int> desired)
+{
+ Monitor::ScopedLock l(stateLock);
+ while (desired.find(state) == desired.end()) {
+ stateLock.wait();
+ }
+}
+
+
+void StateManager::setState(int s)
+{
+ Monitor::ScopedLock l(stateLock);
+ state = s;
+ stateLock.notifyAll();
+}
+
+int StateManager::getState()
+{
+ Monitor::ScopedLock l(stateLock);
+ return state;
+}
+
diff --git a/cpp/src/qpid/client/StateManager.h b/cpp/src/qpid/client/StateManager.h
new file mode 100644
index 0000000000..fd0c1b7f86
--- /dev/null
+++ b/cpp/src/qpid/client/StateManager.h
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _StateManager_
+#define _StateManager_
+
+#include <set>
+#include "qpid/sys/Monitor.h"
+
+namespace qpid {
+namespace client {
+
+class StateManager
+{
+ int state;
+ sys::Monitor stateLock;
+
+public:
+ StateManager(int initial);
+ void setState(int state);
+ int getState();
+ void waitForStateChange(int current);
+ void waitFor(std::set<int> states);
+ void waitFor(int state);
+};
+
+}}
+
+#endif
diff --git a/cpp/src/tests/FramingTest.cpp b/cpp/src/tests/FramingTest.cpp
index 98f89b59be..582c7d6e55 100644
--- a/cpp/src/tests/FramingTest.cpp
+++ b/cpp/src/tests/FramingTest.cpp
@@ -410,12 +410,14 @@ class FramingTest : public CppUnit::TestCase
ASSERT_FRAME("BROKER: Frame[channel=1; ChannelOpenOk: ]", *i++);
ASSERT_FRAME("CLIENT: Frame[channel=1; ExchangeDeclare: ticket=0; exchange=MyExchange; type=topic; alternateExchange=; passive=0; durable=0; autoDelete=0; arguments={}]", *i++);
ASSERT_FRAME("CLIENT: Frame[channel=1; ExecutionFlush: ]", *i++);
- ASSERT_FRAME("BROKER: Frame[channel=1; ExecutionComplete: cumulativeExecutionMark=2; rangedExecutionSet={}]", *i++);
+ ASSERT_FRAME("BROKER: Frame[channel=1; ExecutionComplete: cumulativeExecutionMark=1; rangedExecutionSet={}]", *i++);
ASSERT_FRAME("CLIENT: Frame[channel=1; QueueDeclare: ticket=0; queue=MyQueue; alternateExchange=; passive=0; durable=0; exclusive=1; autoDelete=1; nowait=0; arguments={}]", *i++);
ASSERT_FRAME("BROKER: Frame[channel=1; QueueDeclareOk: queue=MyQueue; messageCount=0; consumerCount=0]", *i++);
+ ASSERT_FRAME("CLIENT: Frame[channel=1; ExecutionFlush: ]", *i++);
+ ASSERT_FRAME("BROKER: Frame[channel=1; ExecutionComplete: cumulativeExecutionMark=2; rangedExecutionSet={}]", *i++);
ASSERT_FRAME("CLIENT: Frame[channel=1; QueueBind: ticket=0; queue=MyQueue; exchange=MyExchange; routingKey=MyTopic; arguments={}]", *i++);
ASSERT_FRAME("CLIENT: Frame[channel=1; ExecutionFlush: ]", *i++);
- ASSERT_FRAME("BROKER: Frame[channel=1; ExecutionComplete: cumulativeExecutionMark=4; rangedExecutionSet={}]", *i++);
+ ASSERT_FRAME("BROKER: Frame[channel=1; ExecutionComplete: cumulativeExecutionMark=3; rangedExecutionSet={}]", *i++);
}
};
diff --git a/cpp/src/tests/Serializer.cpp b/cpp/src/tests/Serializer.cpp
index 8c0ee7b85c..d7345acf06 100644
--- a/cpp/src/tests/Serializer.cpp
+++ b/cpp/src/tests/Serializer.cpp
@@ -61,12 +61,17 @@ struct Tester {
}
};
+void execute(Serializer& s, Serializer::Task t)
+{
+ s.execute(t);
+}
+
BOOST_AUTO_TEST_CASE(testSingleThread) {
// Verify that we call in the same thread by default.
Tester tester;
Serializer s;
for (int i = 0; i < 100; ++i)
- s.execute(boost::bind(&Tester::test, &tester));
+ execute(s, boost::bind(&Tester::test, &tester));
// All should be executed in this thread.
BOOST_CHECK_EQUAL(0u, tester.collisions);
BOOST_CHECK_EQUAL(100u, tester.count);
@@ -80,7 +85,7 @@ BOOST_AUTO_TEST_CASE(testSingleThreadNoImmediate) {
Tester tester;
Serializer s(false);
for (int i = 0; i < 100; ++i)
- s.execute(boost::bind(&Tester::test, &tester));
+ execute(s, boost::bind(&Tester::test, &tester));
{
// Wait for dispatch thread to complete.
Mutex::ScopedLock l(tester.lock);
@@ -95,7 +100,7 @@ BOOST_AUTO_TEST_CASE(testSingleThreadNoImmediate) {
struct Caller : public Runnable, public Tester {
Caller(Serializer& s) : serializer(s) {}
- void run() { serializer.execute(boost::bind(&Tester::test, this)); }
+ void run() { execute(serializer, boost::bind(&Tester::test, this)); }
Serializer& serializer;
};
@@ -134,7 +139,7 @@ BOOST_AUTO_TEST_CASE(testExternalDispatch) {
serializer.reset(new Serializer(false, &notifyDispatch));
Tester tester;
for (int i = 0; i < 100; ++i)
- serializer->execute(boost::bind(&Tester::test, &tester));
+ execute(*serializer, boost::bind(&Tester::test, &tester));
{
// Wait for dispatch thread to complete.
Mutex::ScopedLock l(tester.lock);
diff --git a/cpp/src/tests/client_test.cpp b/cpp/src/tests/client_test.cpp
index cefc4338eb..4903312cd7 100644
--- a/cpp/src/tests/client_test.cpp
+++ b/cpp/src/tests/client_test.cpp
@@ -41,7 +41,6 @@ using namespace qpid::client;
using namespace qpid::sys;
using std::string;
-bool verbose = false;
/**
* A simple message listener implementation that prints out the
@@ -50,9 +49,10 @@ bool verbose = false;
*/
class SimpleListener : public virtual MessageListener{
Monitor* monitor;
+ bool verbose;
public:
- inline SimpleListener(Monitor* _monitor) : monitor(_monitor){}
+ inline SimpleListener(Monitor* _monitor, bool debug) : monitor(_monitor), verbose(debug) {}
inline virtual void received(Message& msg){
if (verbose)
@@ -101,7 +101,7 @@ int main(int argc, char** argv)
//montior to use to notify the main thread when that message
//is received.
Monitor monitor;
- SimpleListener listener(&monitor);
+ SimpleListener listener(&monitor, opts.trace);
string tag("MyTag");
channel.consume(queue, tag, &listener);
if (opts.trace) std::cout << "Registered consumer." << std::endl;
@@ -118,11 +118,6 @@ int main(int argc, char** argv)
msg.setData(data);
channel.publish(msg, exchange, "MyTopic");
if (opts.trace) std::cout << "Published message: " << data << std::endl;
- if (opts.trace) {
- std::cout << "Publication "
- << (channel.synchWithServer(qpid::sys::TIME_SEC * 1) ? " DID " : " did NOT ")
- << "complete" << std::endl;
- }
{
Monitor::ScopedLock l(monitor);
diff --git a/cpp/src/tests/topic_listener.cpp b/cpp/src/tests/topic_listener.cpp
index cb6bafcd8e..cddf3cb92a 100644
--- a/cpp/src/tests/topic_listener.cpp
+++ b/cpp/src/tests/topic_listener.cpp
@@ -111,8 +111,7 @@ int main(int argc, char** argv){
channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, control, "topic_control", bindArgs);
//set up listener
Listener listener(&channel, response.getName(), args.transactional);
- string tag;
- channel.consume(control, tag, &listener, AckMode(args.ackmode));
+ channel.consume(control, "c1", &listener, AckMode(args.ackmode));
cout << "topic_listener: Consuming." << endl;
channel.run();
connection.close();
diff --git a/cpp/src/tests/topic_publisher.cpp b/cpp/src/tests/topic_publisher.cpp
index f792540c09..5800f9225d 100644
--- a/cpp/src/tests/topic_publisher.cpp
+++ b/cpp/src/tests/topic_publisher.cpp
@@ -121,8 +121,7 @@ int main(int argc, char** argv) {
//set up listener
Publisher publisher(&channel, "topic_control", args.transactional);
- string tag("mytag");
- channel.consume(response, tag, &publisher, AckMode(args.ackmode));
+ channel.consume(response, "mytag", &publisher, AckMode(args.ackmode));
channel.start();
int batchSize(args.batches);