summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-05-23 21:23:07 +0000
committerAlan Conway <aconway@apache.org>2008-05-23 21:23:07 +0000
commit3b4d08876f63637cd9ffb28988eb2ec9a9a7f30e (patch)
tree11206920074aef9a5b9d794b6ed550f72d3a198d /cpp/src
parent52833097fb1737316c76822bf7e6dda31dec3433 (diff)
downloadqpid-python-3b4d08876f63637cd9ffb28988eb2ec9a9a7f30e.tar.gz
Delete obsolete Channel class.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@659663 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/Makefile.am2
-rw-r--r--cpp/src/qpid/client/Channel.cpp289
-rw-r--r--cpp/src/qpid/client/Channel.h316
-rw-r--r--cpp/src/qpid/client/Connection.cpp8
-rw-r--r--cpp/src/qpid/client/Connection.h13
-rw-r--r--cpp/src/tests/ClientChannelTest.cpp220
-rw-r--r--cpp/src/tests/Makefile.am35
-rw-r--r--cpp/src/tests/echo_service.cpp229
8 files changed, 18 insertions, 1094 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 9321e0d855..1c8ca9da12 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -308,7 +308,6 @@ libqpidclient_la_LIBADD = libqpidcommon.la -luuid
libqpidclient_la_SOURCES = \
$(rgen_client_srcs) \
qpid/client/Bounds.cpp \
- qpid/client/Channel.cpp \
qpid/client/ConnectionImpl.cpp \
qpid/client/Connector.cpp \
qpid/client/Connection.cpp \
@@ -434,7 +433,6 @@ nobase_include_HEADERS = \
qpid/client/ConnectionSettings.h \
qpid/client/Connector.h \
qpid/client/ChainableFrameHandler.h \
- qpid/client/Channel.h \
qpid/client/Demux.h \
qpid/client/Dispatcher.h \
qpid/client/Exchange.h \
diff --git a/cpp/src/qpid/client/Channel.cpp b/cpp/src/qpid/client/Channel.cpp
deleted file mode 100644
index 3bcba8983c..0000000000
--- a/cpp/src/qpid/client/Channel.cpp
+++ /dev/null
@@ -1,289 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "qpid/log/Statement.h"
-#include <iostream>
-#include <sstream>
-#include "Channel.h"
-#include "qpid/sys/Monitor.h"
-#include "AckPolicy.h"
-#include "Message.h"
-#include "Connection.h"
-#include "Demux.h"
-#include "MessageListener.h"
-#include "MessageQueue.h"
-#include <boost/format.hpp>
-#include <boost/bind.hpp>
-#include "qpid/framing/all_method_bodies.h"
-#include "qpid/framing/reply_exceptions.h"
-
-using namespace std;
-using namespace boost;
-using namespace qpid::framing;
-using namespace qpid::sys;
-
-namespace qpid{
-namespace client{
-using namespace arg;
-
-const std::string empty;
-
-class ScopedSync
-{
- Session& session;
- const bool change;
- const bool value;
- public:
- ScopedSync(Session& s, bool desired = true) : session(s), change(s.isSynchronous() != desired), value(desired)
- {
- if (change) session.setSynchronous(value);
- }
- ~ScopedSync()
- {
- if (change) session.setSynchronous(!value);
- }
-};
-
-Channel::Channel(bool _transactional, u_int16_t _prefetch) :
- prefetch(_prefetch), transactional(_transactional), running(false),
- uniqueId(true)/*could eventually be the session id*/, nameCounter(0), active(false)
-{
-}
-
-Channel::~Channel()
-{
- join();
-}
-
-void Channel::open(const Session& s)
-{
- Mutex::ScopedLock l(stopLock);
- if (isOpen())
- throw SessionBusyException();
- active = true;
- session = s;
- if(isTransactional()) {
- session.txSelect();
- }
-}
-
-bool Channel::isOpen() const {
- Mutex::ScopedLock l(stopLock);
- return active;
-}
-
-void Channel::setPrefetch(uint32_t _prefetch){
- prefetch = _prefetch;
-}
-
-void Channel::declareExchange(Exchange& _exchange, bool synch){
- ScopedSync s(session, synch);
- session.exchangeDeclare(exchange=_exchange.getName(), type=_exchange.getType());
-}
-
-void Channel::deleteExchange(Exchange& _exchange, bool synch){
- ScopedSync s(session, synch);
- session.exchangeDelete(exchange=_exchange.getName(), ifUnused=false);
-}
-
-void Channel::declareQueue(Queue& _queue, bool synch){
- if (_queue.getName().empty()) {
- stringstream uniqueName;
- uniqueName << uniqueId << "-queue-" << ++nameCounter;
- _queue.setName(uniqueName.str());
- }
-
- ScopedSync s(session, synch);
- session.queueDeclare(queue=_queue.getName(), passive=false/*passive*/, durable=_queue.isDurable(),
- exclusive=_queue.isExclusive(), autoDelete=_queue.isAutoDelete());
-
-}
-
-void Channel::deleteQueue(Queue& _queue, bool ifunused, bool ifempty, bool synch){
- ScopedSync s(session, synch);
- session.queueDelete(queue=_queue.getName(), ifUnused=ifunused, ifEmpty=ifempty);
-}
-
-void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){
- string e = exchange.getName();
- string q = queue.getName();
- ScopedSync s(session, synch);
- session.exchangeBind(q, e, key, args);
-}
-
-void Channel::commit(){
- session.txCommit();
-}
-
-void Channel::rollback(){
- session.txRollback();
-}
-
-void Channel::consume(
- Queue& _queue, const std::string& tag, MessageListener* listener,
- AckMode ackMode, bool noLocal, bool synch, FieldTable* fields) {
-
- if (tag.empty()) {
- throw Exception("A tag must be specified for a consumer.");
- }
- {
- Mutex::ScopedLock l(lock);
- ConsumerMap::iterator i = consumers.find(tag);
- if (i != consumers.end())
- throw PreconditionFailedException(QPID_MSG("Consumer already exists with tag " << tag ));
- Consumer& c = consumers[tag];
- c.listener = listener;
- c.ackMode = ackMode;
- c.count = 0;
- }
- uint8_t confirmMode = ackMode == NO_ACK ? 1 : 0;
- ScopedSync s(session, synch);
- FieldTable ft;
- FieldTable* ftptr = fields ? fields : &ft;
- if (noLocal) {
- ftptr->setString("qpid.no-local","yes");
- }
- session.messageSubscribe(_queue.getName(), tag,
- confirmMode, 0/*pre-acquire*/,
- false, "", 0, *ftptr);
- if (!prefetch) {
- session.messageSetFlowMode(tag, 0/*credit based*/);
- }
-
- //allocate some credit:
- session.messageFlow(tag, 1/*BYTES*/, 0xFFFFFFFF);
- session.messageFlow(tag, 0/*MESSAGES*/, prefetch ? prefetch : 0xFFFFFFFF);
-}
-
-void Channel::cancel(const std::string& tag, bool synch) {
- Consumer c;
- {
- Mutex::ScopedLock l(lock);
- ConsumerMap::iterator i = consumers.find(tag);
- if (i == consumers.end())
- return;
- c = i->second;
- consumers.erase(i);
- }
- ScopedSync s(session, synch);
- session.messageCancel(tag);
-}
-
-bool Channel::get(Message& msg, const Queue& _queue, AckMode ackMode) {
- string tag = "get-handler";
- ScopedDivert handler(tag, session.getExecution().getDemux());
- Demux::QueuePtr incoming = handler.getQueue();
-
- session.messageSubscribe(destination=tag, queue=_queue.getName(), acceptMode=(ackMode == NO_ACK ? 1 : 0));
- session.messageFlow(tag, 1/*BYTES*/, 0xFFFFFFFF);
- session.messageFlow(tag, 0/*MESSAGES*/, 1);
- {
- ScopedSync s(session);
- session.messageFlush(tag);
- }
- session.messageCancel(tag);
-
- FrameSet::shared_ptr p;
- if (incoming->tryPop(p)) {
- msg.populate(*p);
- if (ackMode == AUTO_ACK) {
- AckPolicy acker;
- acker.ack(msg, session);
- } else {
- session.markCompleted(msg.getId(), false, false);
- }
- return true;
- }
- else
- return false;
-}
-
-void Channel::publish(Message& msg, const Exchange& exchange,
- const std::string& routingKey,
- bool mandatory, bool /*?TODO-restore immediate?*/) {
-
- msg.getDeliveryProperties().setRoutingKey(routingKey);
- msg.getDeliveryProperties().setDiscardUnroutable(!mandatory);
- session.messageTransfer(destination=exchange.getName(), content=msg);
-}
-
-void Channel::close()
-{
- session.close();
- {
- Mutex::ScopedLock l(stopLock);
- active = false;
- }
- stop();
-}
-
-void Channel::start(){
- running = true;
- dispatcher = Thread(*this);
-}
-
-void Channel::stop() {
- gets.close();
- join();
-}
-
-void Channel::join() {
- Mutex::ScopedLock l(stopLock);
- if(running && dispatcher.id()) {
- dispatcher.join();
- running = false;
- }
-}
-
-void Channel::dispatch(FrameSet& content, const std::string& destination)
-{
- ConsumerMap::iterator i = consumers.find(destination);
- if (i != consumers.end()) {
- Message msg;
- msg.populate(content);
- MessageListener* listener = i->second.listener;
- listener->received(msg);
- if (isOpen() && i->second.ackMode != CLIENT_ACK) {
- bool send = i->second.ackMode == AUTO_ACK
- || (prefetch && ++(i->second.count) > (prefetch / 2));
- if (send) i->second.count = 0;
- session.markCompleted(content.getId(), true, send);
- }
- } else {
- QPID_LOG(warning, "Dropping message for unrecognised consumer: " << destination);
- }
-}
-
-void Channel::run() {
- try {
- while (true) {
- FrameSet::shared_ptr content = session.get();
- //need to dispatch this to the relevant listener:
- if (content->isA<MessageTransferBody>()) {
- dispatch(*content, content->as<MessageTransferBody>()->getDestination());
- } else {
- QPID_LOG(warning, "Dropping unsupported message type: " << content->getMethod());
- }
- }
- } catch (const ClosedException&) {}
-}
-
-}}
-
diff --git a/cpp/src/qpid/client/Channel.h b/cpp/src/qpid/client/Channel.h
deleted file mode 100644
index 9a22c455c4..0000000000
--- a/cpp/src/qpid/client/Channel.h
+++ /dev/null
@@ -1,316 +0,0 @@
-#ifndef _client_Channel_h
-#define _client_Channel_h
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <memory>
-#include <boost/scoped_ptr.hpp>
-#include "qpid/framing/amqp_framing.h"
-#include "qpid/framing/Uuid.h"
-#include "Exchange.h"
-#include "Message.h"
-#include "Queue.h"
-#include "ConnectionImpl.h"
-#include "qpid/client/Session.h"
-#include "qpid/Exception.h"
-#include "qpid/sys/Mutex.h"
-#include "qpid/sys/Runnable.h"
-#include "qpid/sys/Thread.h"
-#include "AckMode.h"
-
-namespace qpid {
-
-namespace framing {
-class ChannelCloseBody;
-class AMQMethodBody;
-}
-
-namespace client {
-
-class Connection;
-class MessageChannel;
-class MessageListener;
-class ReturnedMessageHandler;
-
-/**
- * THIS CLASS IS DEPRECATED AND WILL BE SHORTLY REMOVED
- *
- * Represents an AMQP channel, i.e. loosely a session of work. It
- * is through a channel that most of the AMQP 'methods' are
- * exposed.
- *
- */
-class Channel : private sys::Runnable
-{
- private:
- struct Consumer{
- MessageListener* listener;
- AckMode ackMode;
- uint32_t count;
- };
- typedef std::map<std::string, Consumer> ConsumerMap;
-
- mutable sys::Mutex lock;
- sys::Thread dispatcher;
-
- uint32_t prefetch;
- const bool transactional;
- framing::ProtocolVersion version;
-
- mutable sys::Mutex stopLock;
- bool running;
-
- ConsumerMap consumers;
- Session session;
- framing::ChannelId channelId;
- sys::BlockingQueue<framing::FrameSet::shared_ptr> gets;
- framing::Uuid uniqueId;
- uint32_t nameCounter;
- bool active;
-
- void stop();
-
- void open(const Session& session);
- void closeInternal();
- void join();
-
- void dispatch(framing::FrameSet& msg, const std::string& destination);
-
- friend class Connection;
-
- public:
- /**
- * Creates a channel object.
- *
- * @param transactional if true, the publishing and acknowledgement
- * of messages will be transactional and can be committed or
- * aborted in atomic units (@see commit(), @see rollback()).
- *
- * @param prefetch specifies the number of unacknowledged
- * messages the channel is willing to have sent to it
- * asynchronously.
- */
- Channel(bool transactional = false, u_int16_t prefetch = 0);
-
- ~Channel();
-
- /**
- * Declares an exchange.
- *
- * In AMQP, Exchanges are the destinations to which messages
- * are published. They have Queues bound to them and route
- * messages they receive to those queues. The routing rules
- * depend on the type of the exchange.
- *
- * @param exchange an Exchange object representing the
- * exchange to declare.
- *
- * @param synch if true this call will block until a response
- * is received from the broker.
- */
- void declareExchange(Exchange& exchange, bool synch = true);
- /**
- * Deletes an exchange.
- *
- * @param exchange an Exchange object representing the exchange to delete.
- *
- * @param synch if true this call will block until a response
- * is received from the broker.
- */
- void deleteExchange(Exchange& exchange, bool synch = true);
- /**
- * Declares a Queue.
- *
- * @param queue a Queue object representing the queue to declare.
- *
- * @param synch if true this call will block until a response
- * is received from the broker.
- */
- void declareQueue(Queue& queue, bool synch = true);
- /**
- * Deletes a Queue.
- *
- * @param queue a Queue object representing the queue to delete.
- *
- * @param synch if true this call will block until a response
- * is received from the broker.
- */
- void deleteQueue(Queue& queue, bool ifunused = false, bool ifempty = false, bool synch = true);
- /**
- * Binds a queue to an exchange. The exact semantics of this
- * (in particular how 'routing keys' and 'binding arguments'
- * are used) depends on the type of the exchange.
- *
- * @param exchange an Exchange object representing the
- * exchange to bind to.
- *
- * @param queue a Queue object representing the queue to be
- * bound.
- *
- * @param key the 'routing key' for the binding.
- *
- * @param args the 'binding arguments' for the binding.
- *
- * @param synch if true this call will block until a response
- * is received from the broker.
- */
- void bind(const Exchange& exchange, const Queue& queue,
- const std::string& key,
- const framing::FieldTable& args=framing::FieldTable(),
- bool synch = true);
-
- /**
- * For a transactional channel this will commit all
- * publications and acknowledgements since the last commit (or
- * the channel that was opened if there has been no previous
- * commit). This will cause published messages to become
- * available to consumers and acknowledged messages to be
- * consumed and removed from the queues they were dispatched
- * from.
- *
- * A channel is specified as transactional or not when the channel
- * object is created (@see Channel()).
- */
- void commit();
-
- /**
- * For a transactional channel, this will rollback any
- * publications or acknowledgements. It will be as if the
- * published messages were never sent and the acknowledged
- * messages were never consumed.
- */
- void rollback();
-
- /**
- * Change the prefetch in use.
- */
- void setPrefetch(uint32_t prefetch);
-
- uint32_t getPrefetch() { return prefetch; }
-
- /**
- * Start message dispatching on a new thread.
- */
- void start();
-
- /**
- * Close the channel. Closing a channel that is not open has no
- * effect.
- */
- void close();
-
- /** True if the channel is transactional. */
- bool isTransactional() { return transactional; }
-
- /** True if the channel is open. */
- bool isOpen() const;
-
- /** Return the protocol version. */
- framing::ProtocolVersion getVersion() const { return version ; }
-
- /**
- * Creates a 'consumer' for a queue. Messages in (or arriving
- * at) that queue will be delivered to consumers
- * asynchronously.
- *
- * @param queue a Queue instance representing the queue to
- * consume from.
- *
- * @param tag an identifier to associate with the consumer
- * that can be used to cancel its subscription (if empty, this
- * will be assigned by the broker).
- *
- * @param listener a pointer to an instance of an
- * implementation of the MessageListener interface. Messages
- * received from this queue for this consumer will result in
- * invocation of the received() method on the listener, with
- * the message itself passed in.
- *
- * @param ackMode the mode of acknowledgement that the broker
- * should assume for this consumer. @see AckMode
- *
- * @param noLocal if true, this consumer will not be sent any
- * message published by this connection.
- *
- * @param synch if true this call will block until a response
- * is received from the broker.
- */
- void consume(
- Queue& queue, const std::string& tag, MessageListener* listener,
- AckMode ackMode = NO_ACK, bool noLocal = false, bool synch = true,
- framing::FieldTable* fields = 0);
-
- /**
- * Cancels a subscription previously set up through a call to consume().
- *
- * @param tag the identifier used (or assigned) in the consume
- * request that set up the subscription to be cancelled.
- *
- * @param synch if true this call will block until a response
- * is received from the broker.
- */
- void cancel(const std::string& tag, bool synch = true);
- /**
- * Synchronous pull of a message from a queue.
- *
- * @param msg a message object that will contain the message
- * headers and content if the call completes.
- *
- * @param queue the queue to consume from.
- *
- * @param ackMode the acknowledgement mode to use (@see
- * AckMode)
- *
- * @return true if a message was succcessfully dequeued from
- * the queue, false if the queue was empty.
- */
- bool get(Message& msg, const Queue& queue, AckMode ackMode = NO_ACK);
-
- /**
- * Publishes (i.e. sends a message to the broker).
- *
- * @param msg the message to publish.
- *
- * @param exchange the exchange to publish the message to.
- *
- * @param routingKey the routing key to publish with.
- *
- * @param mandatory if true and the exchange to which this
- * publish is directed has no matching bindings, the message
- * will be returned (see setReturnedMessageHandler()).
- *
- * @param immediate if true and there is no consumer to
- * receive this message on publication, the message will be
- * returned (see setReturnedMessageHandler()).
- */
- void publish(Message& msg, const Exchange& exchange,
- const std::string& routingKey,
- bool mandatory = false, bool immediate = false);
-
- /**
- * Deliver incoming messages to the appropriate MessageListener.
- */
- void run();
-};
-
-}}
-
-#endif /*!_client_Channel_h*/
diff --git a/cpp/src/qpid/client/Connection.cpp b/cpp/src/qpid/client/Connection.cpp
index 4089ad79ce..82d1eac8b4 100644
--- a/cpp/src/qpid/client/Connection.cpp
+++ b/cpp/src/qpid/client/Connection.cpp
@@ -20,7 +20,6 @@
*/
#include "Connection.h"
#include "ConnectionSettings.h"
-#include "Channel.h"
#include "Message.h"
#include "SessionImpl.h"
#include "qpid/log/Logger.h"
@@ -73,13 +72,6 @@ void Connection::open(const ConnectionSettings& settings)
max_frame_size = impl->getNegotiatedSettings().maxFrameSize;
}
-void Connection::openChannel(Channel& channel)
-{
- if (!impl)
- throw Exception(QPID_MSG("Connection has not yet been opened"));
- channel.open(newSession(ASYNC));
-}
-
Session Connection::newSession(SynchronousMode sync,
uint32_t detachedLifetime)
{
diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h
index 417739fd1d..0c01c77509 100644
--- a/cpp/src/qpid/client/Connection.h
+++ b/cpp/src/qpid/client/Connection.h
@@ -23,7 +23,6 @@
*/
#include <map>
#include <string>
-#include "Channel.h"
#include "ConnectionImpl.h"
#include "qpid/client/Session.h"
#include "qpid/framing/AMQP_HighestVersion.h"
@@ -108,18 +107,6 @@ class Connection
void close();
/**
- * Associate a Channel with this connection and open it for use.
- *
- * In AMQP, channels are like multiplexed 'sessions' of work over
- * a connection. Almost all the interaction with AMQP is done over
- * a channel.
- *
- * @param connection the connection object to be associated with
- * the channel. Call Channel::close() to close the channel.
- */
- void openChannel(Channel&);
-
- /**
* Create a new session on this connection. Sessions allow
* multiple streams of work to be multiplexed over the same
* connection.
diff --git a/cpp/src/tests/ClientChannelTest.cpp b/cpp/src/tests/ClientChannelTest.cpp
deleted file mode 100644
index 605d5e4885..0000000000
--- a/cpp/src/tests/ClientChannelTest.cpp
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <vector>
-#include "qpid_test_plugin.h"
-#include "BrokerFixture.h"
-#include "qpid/client/Channel.h"
-#include "qpid/client/Message.h"
-#include "qpid/client/Queue.h"
-#include "qpid/client/Exchange.h"
-#include "qpid/client/MessageListener.h"
-#include "qpid/client/BasicMessageChannel.h"
-#include "qpid/client/MessageMessageChannel.h"
-
-using namespace std;
-using namespace boost;
-using namespace qpid::client;
-using namespace qpid::sys;
-using namespace qpid::framing;
-
-/// Small frame size so we can create fragmented messages.
-const size_t FRAME_MAX = 256;
-
-
-/**
- * Test base for client API using an in-process broker.
- * The test base defines the tests methods, derived classes
- * instantiate the channel in Basic or Message mode.
- */
-class ChannelTestBase : public CppUnit::TestCase, public SessionFixture
-{
- struct Listener: public qpid::client::MessageListener {
- vector<Message> messages;
- Monitor monitor;
- void received(Message& msg) {
- Mutex::ScopedLock l(monitor);
- messages.push_back(msg);
- monitor.notifyAll();
- }
- };
-
- const std::string qname;
- const std::string data;
- Queue queue;
- Exchange exchange;
- Listener listener;
-
- protected:
- boost::scoped_ptr<Channel> channel;
-
- public:
-
- ChannelTestBase()
- : qname("testq"), data("hello"),
- queue(qname, true), exchange("", Exchange::DIRECT_EXCHANGE)
- {}
-
- void setUp() {
- CPPUNIT_ASSERT(channel);
- connection.openChannel(*channel);
- CPPUNIT_ASSERT(channel->getId() != 0);
- channel->declareQueue(queue);
- }
-
- void testPublishGet() {
- Message pubMsg(data);
- pubMsg.getHeaders().setString("hello", "world");
- channel->publish(pubMsg, exchange, qname);
- Message getMsg;
- CPPUNIT_ASSERT(channel->get(getMsg, queue));
- CPPUNIT_ASSERT_EQUAL(data, getMsg.getData());
- CPPUNIT_ASSERT_EQUAL(string("world"),
- getMsg.getHeaders().getString("hello"));
- CPPUNIT_ASSERT(!channel->get(getMsg, queue)); // Empty queue
- }
-
- void testGetNoContent() {
- Message pubMsg;
- pubMsg.getHeaders().setString("hello", "world");
- channel->publish(pubMsg, exchange, qname);
- Message getMsg;
- CPPUNIT_ASSERT(channel->get(getMsg, queue));
- CPPUNIT_ASSERT(getMsg.getData().empty());
- CPPUNIT_ASSERT_EQUAL(string("world"),
- getMsg.getHeaders().getString("hello"));
- }
-
- void testConsumeCancel() {
- string tag; // Broker assigned
- channel->consume(queue, tag, &listener);
- channel->start();
- CPPUNIT_ASSERT_EQUAL(size_t(0), listener.messages.size());
- channel->publish(Message("a"), exchange, qname);
- {
- Mutex::ScopedLock l(listener.monitor);
- Time deadline(now() + 1*TIME_SEC);
- while (listener.messages.size() != 1) {
- CPPUNIT_ASSERT(listener.monitor.wait(deadline));
- }
- }
- CPPUNIT_ASSERT_EQUAL(size_t(1), listener.messages.size());
- CPPUNIT_ASSERT_EQUAL(string("a"), listener.messages[0].getData());
-
- channel->publish(Message("b"), exchange, qname);
- channel->publish(Message("c"), exchange, qname);
- {
- Mutex::ScopedLock l(listener.monitor);
- while (listener.messages.size() != 3) {
- CPPUNIT_ASSERT(listener.monitor.wait(1*TIME_SEC));
- }
- }
- CPPUNIT_ASSERT_EQUAL(size_t(3), listener.messages.size());
- CPPUNIT_ASSERT_EQUAL(string("b"), listener.messages[1].getData());
- CPPUNIT_ASSERT_EQUAL(string("c"), listener.messages[2].getData());
-
- channel->cancel(tag);
- channel->publish(Message("d"), exchange, qname);
- CPPUNIT_ASSERT_EQUAL(size_t(3), listener.messages.size());
- {
- Mutex::ScopedLock l(listener.monitor);
- CPPUNIT_ASSERT(!listener.monitor.wait(TIME_SEC/2));
- }
- Message msg;
- CPPUNIT_ASSERT(channel->get(msg, queue));
- CPPUNIT_ASSERT_EQUAL(string("d"), msg.getData());
- }
-
- // Consume already-published messages
- void testConsumePublished() {
- Message pubMsg("x");
- pubMsg.getHeaders().setString("y", "z");
- channel->publish(pubMsg, exchange, qname);
- string tag;
- channel->consume(queue, tag, &listener);
- CPPUNIT_ASSERT_EQUAL(size_t(0), listener.messages.size());
- channel->start();
- {
- Mutex::ScopedLock l(listener.monitor);
- while (listener.messages.size() != 1)
- CPPUNIT_ASSERT(listener.monitor.wait(1*TIME_SEC));
- }
- CPPUNIT_ASSERT_EQUAL(string("x"), listener.messages[0].getData());
- CPPUNIT_ASSERT_EQUAL(string("z"),
- listener.messages[0].getHeaders().getString("y"));
- }
-
- void testGetFragmentedMessage() {
- string longStr(FRAME_MAX*2, 'x'); // Longer than max frame size.
- channel->publish(Message(longStr), exchange, qname);
- Message getMsg;
- CPPUNIT_ASSERT(channel->get(getMsg, queue));
- }
-
- void testConsumeFragmentedMessage() {
- string xx(FRAME_MAX*2, 'x');
- channel->publish(Message(xx), exchange, qname);
- channel->start();
- string tag;
- channel->consume(queue, tag, &listener);
- string yy(FRAME_MAX*2, 'y');
- channel->publish(Message(yy), exchange, qname);
- {
- Mutex::ScopedLock l(listener.monitor);
- while (listener.messages.size() != 2)
- CPPUNIT_ASSERT(listener.monitor.wait(1*TIME_SEC));
- }
- CPPUNIT_ASSERT_EQUAL(xx, listener.messages[0].getData());
- CPPUNIT_ASSERT_EQUAL(yy, listener.messages[1].getData());
- }
-};
-
-class BasicChannelTest : public ChannelTestBase {
- CPPUNIT_TEST_SUITE(BasicChannelTest);
- CPPUNIT_TEST(testPublishGet);
- CPPUNIT_TEST(testGetNoContent);
- CPPUNIT_TEST(testConsumeCancel);
- CPPUNIT_TEST(testConsumePublished);
- CPPUNIT_TEST(testGetFragmentedMessage);
- CPPUNIT_TEST(testConsumeFragmentedMessage);
- CPPUNIT_TEST_SUITE_END();
-
- public:
- BasicChannelTest(){
- channel.reset(new Channel(false, 500, Channel::AMQP_08));
- }
-};
-
-class MessageChannelTest : public ChannelTestBase {
- CPPUNIT_TEST_SUITE(MessageChannelTest);
- CPPUNIT_TEST(testPublishGet);
- CPPUNIT_TEST(testGetNoContent);
- CPPUNIT_TEST(testGetFragmentedMessage);
- CPPUNIT_TEST_SUITE_END();
- public:
- MessageChannelTest() {
- channel.reset(new Channel(false, 500, Channel::AMQP_09));
- }
-};
-
-// Make this test suite a plugin.
-CPPUNIT_PLUGIN_IMPLEMENT();
-CPPUNIT_TEST_SUITE_REGISTRATION(BasicChannelTest);
-CPPUNIT_TEST_SUITE_REGISTRATION(MessageChannelTest);
diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am
index 4a1a8d9a66..6c68a9d648 100644
--- a/cpp/src/tests/Makefile.am
+++ b/cpp/src/tests/Makefile.am
@@ -102,9 +102,6 @@ broker_unit_tests = \
TxPublishTest \
MessageBuilderTest
-#client_unit_tests = \
-# ClientChannelTest
-
framing_unit_tests = \
FramingTest \
HeaderTest \
@@ -121,10 +118,9 @@ unit_tests = \
testprogs= \
client_test \
topic_listener \
- topic_publisher
-# echo_service
+ topic_publisher
-check_PROGRAMS += $(testprogs) interop_runner publish consume
+check_PROGRAMS += $(testprogs) publish consume
TESTS_ENVIRONMENT = VALGRIND=$(VALGRIND) srcdir=$(srcdir) QPID_DATA_DIR= $(srcdir)/run_test
@@ -172,17 +168,22 @@ gen.mk: Makefile.am
CLEANFILES+=valgrind.out *.log *.vglog dummy_test $(unit_wrappers)
MAINTAINERCLEANFILES=gen.mk
-interop_runner_SOURCES = \
- interop_runner.cpp \
- SimpleTestCaseBase.cpp \
- BasicP2PTest.cpp \
- BasicPubSubTest.cpp \
- SimpleTestCaseBase.h \
- BasicP2PTest.h \
- BasicPubSubTest.h \
- TestCase.h \
- TestOptions.h
-interop_runner_LDADD = $(lib_client) $(lib_common) $(extra_libs)
+# FIXME aconway 2008-05-23: Disabled interop_runner because it uses
+# the obsolete Channel class. Convert to Session and re-enable.
+#
+# check_PROGRAMS += interop_runner
+
+# interop_runner_SOURCES = \
+# interop_runner.cpp \
+# SimpleTestCaseBase.cpp \
+# BasicP2PTest.cpp \
+# BasicPubSubTest.cpp \
+# SimpleTestCaseBase.h \
+# BasicP2PTest.h \
+# BasicPubSubTest.h \
+# TestCase.h \
+# TestOptions.h
+# interop_runner_LDADD = $(lib_client) $(lib_common) $(extra_libs)
publish_SOURCES = publish.cpp
publish_LDADD = $(lib_client) $(lib_common) $(extra_libs)
diff --git a/cpp/src/tests/echo_service.cpp b/cpp/src/tests/echo_service.cpp
deleted file mode 100644
index c3569d5fd4..0000000000
--- a/cpp/src/tests/echo_service.cpp
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-/**
- * This class provides an example of using AMQP for a request-response
- * style system. 'Requests' are messages sent to a well known
- * destination. A 'service' process consumes these message and
- * responds by echoing the message back to the sender on a
- * sender-specified private queue.
- */
-
-#include "qpid/client/Channel.h"
-#include "qpid/client/Connection.h"
-#include "qpid/client/Exchange.h"
-#include "qpid/client/MessageListener.h"
-#include "qpid/client/Queue.h"
-#include "qpid/sys/Time.h"
-#include <iostream>
-#include <sstream>
-
-using namespace qpid::client;
-using namespace qpid::sys;
-using std::string;
-
-
-/**
- * A message listener implementation representing the 'service', this
- * will 'echo' any requests received.
- */
-class EchoServer : public MessageListener{
- Channel* const channel;
-public:
- EchoServer(Channel* channel);
- virtual void received(Message& msg);
-};
-
-/**
- * A message listener implementation that merely prints received
- * messages to the console. Used to report on 'echo' responses.
- */
-class LoggingListener : public MessageListener{
-public:
- virtual void received(Message& msg);
-};
-
-/**
- * A utility class that manages the command line options needed to run
- * the example confirgurably.
- */
-class Args{
- string host;
- int port;
- bool trace;
- bool help;
- bool client;
-public:
- inline Args() : host("localhost"), port(5672), trace(false), help(false), client(false){}
- void parse(int argc, char** argv);
- void usage();
-
- inline const string& getHost() const { return host;}
- inline int getPort() const { return port; }
- inline bool getTrace() const { return trace; }
- inline bool getHelp() const { return help; }
- inline bool getClient() const { return client; }
-};
-
-/**
- * The main test path. There are two basic modes: 'client' and
- * 'service'. First one or more services are started, then one or more
- * clients are started and messages can be sent.
- */
-int main(int argc, char** argv){
- const std::string echo_service("echo_service");
- Args args;
- args.parse(argc, argv);
- if (args.getHelp()) {
- args.usage();
- } else if (args.getClient()) {
- //we have been started in 'client' mode, i.e. we will send an
- //echo requests and print responses received.
- try {
- //Create connection & open a channel
- Connection connection(args.getTrace());
- connection.open(args.getHost(), args.getPort());
- Channel channel;
- connection.openChannel(channel);
-
- //Setup: declare the private 'response' queue and bind it
- //to the direct exchange by its name which will be
- //generated by the server
- Queue response;
- channel.declareQueue(response);
- qpid::framing::FieldTable emptyArgs;
- channel.bind(Exchange::STANDARD_DIRECT_EXCHANGE, response, response.getName(), emptyArgs);
-
- //Consume from the response queue, logging all echoed message to console:
- LoggingListener listener;
- std::string tag;
- channel.consume(response, tag, &listener);
-
- //Process incoming requests on a new thread
- channel.start();
-
- //get messages from console and send them:
- std::string text;
- std::cout << "Enter text to send:" << std::endl;
- while (std::getline(std::cin, text)) {
- std::cout << "Sending " << text << " to echo server." << std::endl;
- Message msg;
- msg.getHeaders().setString("RESPONSE_QUEUE", response.getName());
- msg.setData(text);
- channel.publish(msg, Exchange::STANDARD_DIRECT_EXCHANGE, echo_service);
-
- std::cout << "Enter text to send:" << std::endl;
- }
-
- connection.close();
- } catch(std::exception& error) {
- std::cout << error.what() << std::endl;
- }
- } else {
- // we are in 'service' mode, i.e. we will consume messages
- // from the request queue and echo each request back to the
- // senders own private response queue.
- try {
- //Create connection & open a channel
- Connection connection(args.getTrace());
- connection.open(args.getHost(), args.getPort());
- Channel channel;
- connection.openChannel(channel);
-
- //Setup: declare the 'request' queue and bind it to the direct exchange with a 'well known' name
- Queue request("request");
- channel.declareQueue(request);
- qpid::framing::FieldTable emptyArgs;
- channel.bind(Exchange::STANDARD_DIRECT_EXCHANGE, request, echo_service, emptyArgs);
-
- //Consume from the request queue, echoing back all messages received to the client that sent them
- EchoServer server(&channel);
- std::string tag = "server_tag";
- channel.consume(request, tag, &server);
-
- //Process incoming requests on the main thread
- channel.run();
-
- connection.close();
- } catch(std::exception& error) {
- std::cout << error.what() << std::endl;
- }
- }
-}
-
-EchoServer::EchoServer(Channel* _channel) : channel(_channel){}
-
-void EchoServer::received(Message& message)
-{
- //get name of response queues binding to the default direct exchange:
- const std::string name = message.getHeaders().getString("RESPONSE_QUEUE");
-
- if (name.empty()) {
- std::cout << "Cannot echo " << message.getData() << ", no response queue specified." << std::endl;
- } else {
- //print message to console:
- std::cout << "Echoing " << message.getData() << " back to " << name << std::endl;
-
- //'echo' the message back:
- channel->publish(message, Exchange::STANDARD_DIRECT_EXCHANGE, name);
- }
-}
-
-void LoggingListener::received(Message& message)
-{
- //print message to console:
- std::cout << "Received echo: " << message.getData() << std::endl;
-}
-
-
-void Args::parse(int argc, char** argv){
- for(int i = 1; i < argc; i++){
- string name(argv[i]);
- if("-help" == name){
- help = true;
- break;
- }else if("-host" == name){
- host = argv[++i];
- }else if("-port" == name){
- port = atoi(argv[++i]);
- }else if("-trace" == name){
- trace = true;
- }else if("-client" == name){
- client = true;
- }else{
- std::cout << "Warning: unrecognised option " << name << std::endl;
- }
- }
-}
-
-void Args::usage(){
- std::cout << "Options:" << std::endl;
- std::cout << " -help" << std::endl;
- std::cout << " Prints this usage message" << std::endl;
- std::cout << " -host <host>" << std::endl;
- std::cout << " Specifies host to connect to (default is localhost)" << std::endl;
- std::cout << " -port <port>" << std::endl;
- std::cout << " Specifies port to conect to (default is 5762)" << std::endl;
- std::cout << " -trace" << std::endl;
- std::cout << " Indicates that the frames sent and received should be logged" << std::endl;
- std::cout << " -client" << std::endl;
- std::cout << " Run as a client (else will run as a server)" << std::endl;
-}