summaryrefslogtreecommitdiff
path: root/cpp/client
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/client')
-rw-r--r--cpp/client/Makefile43
-rw-r--r--cpp/client/inc/Channel.h127
-rw-r--r--cpp/client/inc/Connection.h105
-rw-r--r--cpp/client/inc/Exchange.h49
-rw-r--r--cpp/client/inc/IncomingMessage.h60
-rw-r--r--cpp/client/inc/Message.h86
-rw-r--r--cpp/client/inc/MessageListener.h37
-rw-r--r--cpp/client/inc/Queue.h47
-rw-r--r--cpp/client/inc/ResponseHandler.h49
-rw-r--r--cpp/client/inc/ReturnedMessageHandler.h37
-rw-r--r--cpp/client/src/Channel.cpp432
-rw-r--r--cpp/client/src/Connection.cpp237
-rw-r--r--cpp/client/src/Exchange.cpp30
-rw-r--r--cpp/client/src/IncomingMessage.cpp85
-rw-r--r--cpp/client/src/Message.cpp147
-rw-r--r--cpp/client/src/Queue.cpp47
-rw-r--r--cpp/client/src/ResponseHandler.cpp63
-rw-r--r--cpp/client/test/Makefile45
-rw-r--r--cpp/client/test/client_test.cpp97
-rw-r--r--cpp/client/test/topic_listener.cpp180
-rw-r--r--cpp/client/test/topic_publisher.cpp253
21 files changed, 2256 insertions, 0 deletions
diff --git a/cpp/client/Makefile b/cpp/client/Makefile
new file mode 100644
index 0000000000..d08b92fe2b
--- /dev/null
+++ b/cpp/client/Makefile
@@ -0,0 +1,43 @@
+#
+# Copyright (c) 2006 The Apache Software Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#
+# Build client library.
+#
+
+QPID_HOME = ../..
+include ${QPID_HOME}/cpp/options.mk
+
+SOURCES := $(wildcard src/*.cpp)
+OBJECTS := $(subst .cpp,.o,$(SOURCES))
+CLIENT_LIB=$(LIB_DIR)/libqpid_client.so.1.0
+
+.PHONY: all test clean
+
+all: $(CLIENT_LIB)
+
+test:
+ @$(MAKE) -C test all
+
+clean:
+ -@rm -f $(CLIENT_LIB) $(OBJECTS) src/*.d
+ $(MAKE) -C test clean
+
+$(CLIENT_LIB): $(OBJECTS)
+ $(CXX) -shared -o $@ $^ $(LDFLAGS) $(COMMON_LIB)
+
+# Dependencies
+-include $(SOURCES:.cpp=.d)
diff --git a/cpp/client/inc/Channel.h b/cpp/client/inc/Channel.h
new file mode 100644
index 0000000000..debecf922e
--- /dev/null
+++ b/cpp/client/inc/Channel.h
@@ -0,0 +1,127 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <map>
+#include <string>
+#include <queue>
+#include "sys/types.h"
+
+#ifndef _Channel_
+#define _Channel_
+
+#include "amqp_framing.h"
+
+#include "ThreadFactory.h"
+
+#include "Connection.h"
+#include "Exchange.h"
+#include "IncomingMessage.h"
+#include "Message.h"
+#include "MessageListener.h"
+#include "Queue.h"
+#include "ResponseHandler.h"
+#include "ReturnedMessageHandler.h"
+
+namespace qpid {
+namespace client {
+ enum ack_modes {NO_ACK=0, AUTO_ACK=1, LAZY_ACK=2, CLIENT_ACK=3};
+
+ class Channel : private virtual qpid::framing::BodyHandler, public virtual qpid::concurrent::Runnable{
+ struct Consumer{
+ MessageListener* listener;
+ int ackMode;
+ int count;
+ u_int64_t lastDeliveryTag;
+ };
+ typedef std::map<string,Consumer*>::iterator consumer_iterator;
+
+ u_int16_t id;
+ Connection* con;
+ qpid::concurrent::ThreadFactory* threadFactory;
+ qpid::concurrent::Thread* dispatcher;
+ qpid::framing::OutputHandler* out;
+ IncomingMessage* incoming;
+ ResponseHandler responses;
+ std::queue<IncomingMessage*> messages;//holds returned messages or those delivered for a consume
+ IncomingMessage* retrieved;//holds response to basic.get
+ qpid::concurrent::Monitor* dispatchMonitor;
+ qpid::concurrent::Monitor* retrievalMonitor;
+ std::map<std::string, Consumer*> consumers;
+ ReturnedMessageHandler* returnsHandler;
+ bool closed;
+
+ u_int16_t prefetch;
+ const bool transactional;
+
+ void enqueue();
+ void retrieve(Message& msg);
+ IncomingMessage* dequeue();
+ void dispatch();
+ void stop();
+ void sendAndReceive(qpid::framing::AMQFrame* frame, const qpid::framing::AMQMethodBody& body);
+ void deliver(Consumer* consumer, Message& msg);
+ void setQos();
+ void cancelAll();
+
+ virtual void handleMethod(qpid::framing::AMQMethodBody::shared_ptr body);
+ virtual void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr body);
+ virtual void handleContent(qpid::framing::AMQContentBody::shared_ptr body);
+ virtual void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body);
+
+ public:
+ Channel(bool transactional = false, u_int16_t prefetch = 500);
+ ~Channel();
+
+ void declareExchange(Exchange& exchange, bool synch = true);
+ void deleteExchange(Exchange& exchange, bool synch = true);
+ void declareQueue(Queue& queue, bool synch = true);
+ void deleteQueue(Queue& queue, bool ifunused = false, bool ifempty = false, bool synch = true);
+ void bind(const Exchange& exchange, const Queue& queue, const std::string& key,
+ const qpid::framing::FieldTable& args, bool synch = true);
+ void consume(Queue& queue, std::string& tag, MessageListener* listener,
+ int ackMode = NO_ACK, bool noLocal = false, bool synch = true);
+ void cancel(std::string& tag, bool synch = true);
+ bool get(Message& msg, const Queue& queue, int ackMode = NO_ACK);
+ void publish(Message& msg, const Exchange& exchange, const std::string& routingKey,
+ bool mandatory = false, bool immediate = false);
+
+ void commit();
+ void rollback();
+
+ void setPrefetch(u_int16_t prefetch);
+
+ /**
+ * Start message dispatching on a new thread
+ */
+ void start();
+ /**
+ * Do message dispatching on this thread
+ */
+ void run();
+
+ void close();
+
+ void setReturnedMessageHandler(ReturnedMessageHandler* handler);
+
+ friend class Connection;
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/client/inc/Connection.h b/cpp/client/inc/Connection.h
new file mode 100644
index 0000000000..89169e92b1
--- /dev/null
+++ b/cpp/client/inc/Connection.h
@@ -0,0 +1,105 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <map>
+#include <string>
+
+#ifndef _Connection_
+#define _Connection_
+
+#include "QpidError.h"
+#include "Connector.h"
+#include "ShutdownHandler.h"
+#include "TimeoutHandler.h"
+
+#include "amqp_framing.h"
+#include "Exchange.h"
+#include "IncomingMessage.h"
+#include "Message.h"
+#include "MessageListener.h"
+#include "Queue.h"
+#include "ResponseHandler.h"
+
+namespace qpid {
+namespace client {
+
+ class Channel;
+
+ class Connection : public virtual qpid::framing::InputHandler,
+ public virtual qpid::io::TimeoutHandler,
+ public virtual qpid::io::ShutdownHandler,
+ private virtual qpid::framing::BodyHandler{
+
+ typedef std::map<int, Channel*>::iterator iterator;
+
+ static u_int16_t channelIdCounter;
+
+ std::string host;
+ int port;
+ const u_int32_t max_frame_size;
+ std::map<int, Channel*> channels;
+ qpid::io::Connector* connector;
+ qpid::framing::OutputHandler* out;
+ ResponseHandler responses;
+ volatile bool closed;
+
+ void channelException(Channel* channel, qpid::framing::AMQMethodBody* body, QpidError& e);
+ void error(int code, const string& msg, int classid = 0, int methodid = 0);
+ void closeChannel(Channel* channel, u_int16_t code, string& text, u_int16_t classId = 0, u_int16_t methodId = 0);
+ void sendAndReceive(qpid::framing::AMQFrame* frame, const qpid::framing::AMQMethodBody& body);
+
+ virtual void handleMethod(qpid::framing::AMQMethodBody::shared_ptr body);
+ virtual void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr body);
+ virtual void handleContent(qpid::framing::AMQContentBody::shared_ptr body);
+ virtual void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body);
+
+ public:
+
+ Connection(bool debug = false, u_int32_t max_frame_size = 65536);
+ ~Connection();
+ void open(const std::string& host, int port = 5672,
+ const std::string& uid = "guest", const std::string& pwd = "guest",
+ const std::string& virtualhost = "/");
+ void close();
+ void openChannel(Channel* channel);
+ /*
+ * Requests that the server close this channel, then removes
+ * the association to the channel from this connection
+ */
+ void closeChannel(Channel* channel);
+ /*
+ * Removes the channel from association with this connection,
+ * without sending a close request to the server.
+ */
+ void removeChannel(Channel* channel);
+
+ virtual void received(qpid::framing::AMQFrame* frame);
+
+ virtual void idleOut();
+ virtual void idleIn();
+
+ virtual void shutdown();
+
+ inline u_int32_t getMaxFrameSize(){ return max_frame_size; }
+ };
+
+
+}
+}
+
+
+#endif
diff --git a/cpp/client/inc/Exchange.h b/cpp/client/inc/Exchange.h
new file mode 100644
index 0000000000..66593a41cc
--- /dev/null
+++ b/cpp/client/inc/Exchange.h
@@ -0,0 +1,49 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <string>
+
+#ifndef _Exchange_
+#define _Exchange_
+
+namespace qpid {
+namespace client {
+
+ class Exchange{
+ const std::string name;
+ const std::string type;
+
+ public:
+
+ static const std::string DIRECT_EXCHANGE;
+ static const std::string TOPIC_EXCHANGE;
+ static const std::string HEADERS_EXCHANGE;
+
+ static const Exchange DEFAULT_DIRECT_EXCHANGE;
+ static const Exchange DEFAULT_TOPIC_EXCHANGE;
+ static const Exchange DEFAULT_HEADERS_EXCHANGE;
+
+ Exchange(std::string name, std::string type = DIRECT_EXCHANGE);
+ const std::string& getName() const;
+ const std::string& getType() const;
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/client/inc/IncomingMessage.h b/cpp/client/inc/IncomingMessage.h
new file mode 100644
index 0000000000..1fee6af433
--- /dev/null
+++ b/cpp/client/inc/IncomingMessage.h
@@ -0,0 +1,60 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <string>
+#include <vector>
+#include "amqp_framing.h"
+
+#ifndef _IncomingMessage_
+#define _IncomingMessage_
+
+#include "Message.h"
+
+namespace qpid {
+namespace client {
+
+ class IncomingMessage{
+ //content will be preceded by one of these method frames
+ qpid::framing::BasicDeliverBody::shared_ptr delivered;
+ qpid::framing::BasicReturnBody::shared_ptr returned;
+ qpid::framing::BasicGetOkBody::shared_ptr response;
+ qpid::framing::AMQHeaderBody::shared_ptr header;
+ std::vector<qpid::framing::AMQContentBody::shared_ptr> content;
+
+ long contentSize();
+ public:
+ IncomingMessage(qpid::framing::BasicDeliverBody::shared_ptr intro);
+ IncomingMessage(qpid::framing::BasicReturnBody::shared_ptr intro);
+ IncomingMessage(qpid::framing::BasicGetOkBody::shared_ptr intro);
+ ~IncomingMessage();
+ void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header);
+ void addContent(qpid::framing::AMQContentBody::shared_ptr content);
+ bool isComplete();
+ bool isReturn();
+ bool isDelivery();
+ bool isResponse();
+ string& getConsumerTag();//only relevant if isDelivery()
+ qpid::framing::AMQHeaderBody::shared_ptr& getHeader();
+ u_int64_t getDeliveryTag();
+ void getData(string& data);
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/client/inc/Message.h b/cpp/client/inc/Message.h
new file mode 100644
index 0000000000..f8a5aef565
--- /dev/null
+++ b/cpp/client/inc/Message.h
@@ -0,0 +1,86 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <string>
+#include "amqp_framing.h"
+
+#ifndef _Message_
+#define _Message_
+
+
+namespace qpid {
+namespace client {
+
+ class Message{
+ qpid::framing::AMQHeaderBody::shared_ptr header;
+ string data;
+ bool redelivered;
+ u_int64_t deliveryTag;
+
+ qpid::framing::BasicHeaderProperties* getHeaderProperties();
+ Message(qpid::framing::AMQHeaderBody::shared_ptr& header);
+ public:
+ Message();
+ ~Message();
+
+ inline std::string getData(){ return data; }
+ inline void setData(const std::string& data){ this->data = data; }
+
+ inline bool isRedelivered(){ return redelivered; }
+ inline void setRedelivered(bool redelivered){ this->redelivered = redelivered; }
+
+ inline u_int64_t getDeliveryTag(){ return deliveryTag; }
+
+ std::string& getContentType();
+ std::string& getContentEncoding();
+ qpid::framing::FieldTable& getHeaders();
+ u_int8_t getDeliveryMode();
+ u_int8_t getPriority();
+ std::string& getCorrelationId();
+ std::string& getReplyTo();
+ std::string& getExpiration();
+ std::string& getMessageId();
+ u_int64_t getTimestamp();
+ std::string& getType();
+ std::string& getUserId();
+ std::string& getAppId();
+ std::string& getClusterId();
+
+ void setContentType(std::string& type);
+ void setContentEncoding(std::string& encoding);
+ void setHeaders(qpid::framing::FieldTable& headers);
+ void setDeliveryMode(u_int8_t mode);
+ void setPriority(u_int8_t priority);
+ void setCorrelationId(std::string& correlationId);
+ void setReplyTo(std::string& replyTo);
+ void setExpiration(std::string& expiration);
+ void setMessageId(std::string& messageId);
+ void setTimestamp(u_int64_t timestamp);
+ void setType(std::string& type);
+ void setUserId(std::string& userId);
+ void setAppId(std::string& appId);
+ void setClusterId(std::string& clusterId);
+
+
+ friend class Channel;
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/client/inc/MessageListener.h b/cpp/client/inc/MessageListener.h
new file mode 100644
index 0000000000..47307a4df5
--- /dev/null
+++ b/cpp/client/inc/MessageListener.h
@@ -0,0 +1,37 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <string>
+
+#ifndef _MessageListener_
+#define _MessageListener_
+
+#include "Message.h"
+
+namespace qpid {
+namespace client {
+
+ class MessageListener{
+ public:
+ virtual void received(Message& msg) = 0;
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/client/inc/Queue.h b/cpp/client/inc/Queue.h
new file mode 100644
index 0000000000..e0964af774
--- /dev/null
+++ b/cpp/client/inc/Queue.h
@@ -0,0 +1,47 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <string>
+
+#ifndef _Queue_
+#define _Queue_
+
+namespace qpid {
+namespace client {
+
+ class Queue{
+ std::string name;
+ const bool autodelete;
+ const bool exclusive;
+
+ public:
+
+ Queue();
+ Queue(std::string name);
+ Queue(std::string name, bool temp);
+ Queue(std::string name, bool autodelete, bool exclusive);
+ const std::string& getName() const;
+ void setName(const std::string&);
+ bool isAutoDelete() const;
+ bool isExclusive() const;
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/client/inc/ResponseHandler.h b/cpp/client/inc/ResponseHandler.h
new file mode 100644
index 0000000000..f5392c954d
--- /dev/null
+++ b/cpp/client/inc/ResponseHandler.h
@@ -0,0 +1,49 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <string>
+#include "amqp_framing.h"
+#include "Monitor.h"
+
+#ifndef _ResponseHandler_
+#define _ResponseHandler_
+
+namespace qpid {
+ namespace client {
+
+ class ResponseHandler{
+ bool waiting;
+ qpid::framing::AMQMethodBody::shared_ptr response;
+ qpid::concurrent::Monitor* monitor;
+
+ public:
+ ResponseHandler();
+ ~ResponseHandler();
+ inline bool isWaiting(){ return waiting; }
+ inline qpid::framing::AMQMethodBody::shared_ptr getResponse(){ return response; }
+ bool validate(const qpid::framing::AMQMethodBody& expected);
+ void waitForResponse();
+ void signalResponse(qpid::framing::AMQMethodBody::shared_ptr response);
+ void receive(const qpid::framing::AMQMethodBody& expected);
+ void expect();//must be called before calling receive
+ };
+
+ }
+}
+
+
+#endif
diff --git a/cpp/client/inc/ReturnedMessageHandler.h b/cpp/client/inc/ReturnedMessageHandler.h
new file mode 100644
index 0000000000..0117778fde
--- /dev/null
+++ b/cpp/client/inc/ReturnedMessageHandler.h
@@ -0,0 +1,37 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <string>
+
+#ifndef _ReturnedMessageHandler_
+#define _ReturnedMessageHandler_
+
+#include "Message.h"
+
+namespace qpid {
+namespace client {
+
+ class ReturnedMessageHandler{
+ public:
+ virtual void returned(Message& msg) = 0;
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/client/src/Channel.cpp b/cpp/client/src/Channel.cpp
new file mode 100644
index 0000000000..e965f7e5dd
--- /dev/null
+++ b/cpp/client/src/Channel.cpp
@@ -0,0 +1,432 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "Channel.h"
+#include "MonitorImpl.h"
+#include "ThreadFactoryImpl.h"
+#include "Message.h"
+#include "QpidError.h"
+
+using namespace std::tr1;//to use dynamic_pointer_cast
+using namespace qpid::client;
+using namespace qpid::framing;
+using namespace qpid::concurrent;
+
+Channel::Channel(bool _transactional, u_int16_t _prefetch) : id(0), incoming(0), con(0), out(0),
+ prefetch(_prefetch),
+ transactional(_transactional),
+ dispatcher(0),
+ closed(true){
+ threadFactory = new ThreadFactoryImpl();
+ dispatchMonitor = new MonitorImpl();
+ retrievalMonitor = new MonitorImpl();
+}
+
+Channel::~Channel(){
+ if(dispatcher){
+ stop();
+ delete dispatcher;
+ }
+ delete retrievalMonitor;
+ delete dispatchMonitor;
+ delete threadFactory;
+}
+
+void Channel::setPrefetch(u_int16_t prefetch){
+ this->prefetch = prefetch;
+ if(con != 0 && out != 0){
+ setQos();
+ }
+}
+
+void Channel::setQos(){
+ sendAndReceive(new AMQFrame(id, new BasicQosBody(0, prefetch, false)), basic_qos_ok);
+ if(transactional){
+ sendAndReceive(new AMQFrame(id, new TxSelectBody()), tx_select_ok);
+ }
+}
+
+void Channel::declareExchange(Exchange& exchange, bool synch){
+ string name = exchange.getName();
+ string type = exchange.getType();
+ FieldTable args;
+ AMQFrame* frame = new AMQFrame(id, new ExchangeDeclareBody(0, name, type, false, false, false, false, !synch, args));
+ if(synch){
+ sendAndReceive(frame, exchange_declare_ok);
+ }else{
+ out->send(frame);
+ }
+}
+
+void Channel::deleteExchange(Exchange& exchange, bool synch){
+ string name = exchange.getName();
+ AMQFrame* frame = new AMQFrame(id, new ExchangeDeleteBody(0, name, false, !synch));
+ if(synch){
+ sendAndReceive(frame, exchange_delete_ok);
+ }else{
+ out->send(frame);
+ }
+}
+
+void Channel::declareQueue(Queue& queue, bool synch){
+ string name = queue.getName();
+ FieldTable args;
+ AMQFrame* frame = new AMQFrame(id, new QueueDeclareBody(0, name, false, false,
+ queue.isExclusive(),
+ queue.isAutoDelete(), !synch, args));
+ if(synch){
+ sendAndReceive(frame, queue_declare_ok);
+ if(queue.getName().length() == 0){
+ QueueDeclareOkBody::shared_ptr response =
+ dynamic_pointer_cast<QueueDeclareOkBody, AMQMethodBody>(responses.getResponse());
+ queue.setName(response->getQueue());
+ }
+ }else{
+ out->send(frame);
+ }
+}
+
+void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch){
+ //ticket, queue, ifunused, ifempty, nowait
+ string name = queue.getName();
+ AMQFrame* frame = new AMQFrame(id, new QueueDeleteBody(0, name, ifunused, ifempty, !synch));
+ if(synch){
+ sendAndReceive(frame, queue_delete_ok);
+ }else{
+ out->send(frame);
+ }
+}
+
+void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){
+ string e = exchange.getName();
+ string q = queue.getName();
+ AMQFrame* frame = new AMQFrame(id, new QueueBindBody(0, q, e, (string&) key,!synch, (FieldTable&) args));
+ if(synch){
+ sendAndReceive(frame, queue_bind_ok);
+ }else{
+ out->send(frame);
+ }
+}
+
+void Channel::consume(Queue& queue, std::string& tag, MessageListener* listener,
+ int ackMode, bool noLocal, bool synch){
+
+ string q = queue.getName();
+ AMQFrame* frame = new AMQFrame(id, new BasicConsumeBody(0, q, (string&) tag, noLocal, ackMode == NO_ACK, false, !synch));
+ if(synch){
+ sendAndReceive(frame, basic_consume_ok);
+ BasicConsumeOkBody::shared_ptr response = dynamic_pointer_cast<BasicConsumeOkBody, AMQMethodBody>(responses.getResponse());
+ tag = response->getConsumerTag();
+ }else{
+ out->send(frame);
+ }
+ Consumer* c = new Consumer();
+ c->listener = listener;
+ c->ackMode = ackMode;
+ c->lastDeliveryTag = 0;
+ consumers[tag] = c;
+}
+
+void Channel::cancel(std::string& tag, bool synch){
+ Consumer* c = consumers[tag];
+ if(c->ackMode == LAZY_ACK && c->lastDeliveryTag > 0){
+ out->send(new AMQFrame(id, new BasicAckBody(c->lastDeliveryTag, true)));
+ }
+
+ AMQFrame* frame = new AMQFrame(id, new BasicCancelBody((string&) tag, !synch));
+ if(synch){
+ sendAndReceive(frame, basic_cancel_ok);
+ }else{
+ out->send(frame);
+ }
+ consumers.erase(tag);
+ if(c != 0){
+ delete c;
+ }
+}
+
+void Channel::cancelAll(){
+ int count(consumers.size());
+ for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin()){
+ Consumer* c = i->second;
+ if((c->ackMode == LAZY_ACK || c->ackMode == AUTO_ACK) && c->lastDeliveryTag > 0){
+ out->send(new AMQFrame(id, new BasicAckBody(c->lastDeliveryTag, true)));
+ }
+ consumers.erase(i);
+ delete c;
+ }
+}
+
+void Channel::retrieve(Message& msg){
+ retrievalMonitor->acquire();
+ while(retrieved == 0){
+ retrievalMonitor->wait();
+ }
+
+ msg.header = retrieved->getHeader();
+ msg.deliveryTag = retrieved->getDeliveryTag();
+ retrieved->getData(msg.data);
+ delete retrieved;
+ retrieved = 0;
+
+ retrievalMonitor->release();
+}
+
+bool Channel::get(Message& msg, const Queue& queue, int ackMode){
+ string name = queue.getName();
+ AMQFrame* frame = new AMQFrame(id, new BasicGetBody(0, name, ackMode));
+ responses.expect();
+ out->send(frame);
+ responses.waitForResponse();
+ AMQMethodBody::shared_ptr response = responses.getResponse();
+ if(basic_get_ok.match(response.get())){
+ if(incoming != 0){
+ std::cout << "Existing message not complete" << std::endl;
+ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
+ }else{
+ incoming = new IncomingMessage(dynamic_pointer_cast<BasicGetOkBody, AMQMethodBody>(response));
+ }
+ retrieve(msg);
+ return true;
+ }if(basic_get_empty.match(response.get())){
+ return false;
+ }else{
+ THROW_QPID_ERROR(PROTOCOL_ERROR + 500, "Unexpected response to basic.get.");
+ }
+}
+
+
+void Channel::publish(Message& msg, const Exchange& exchange, const std::string& routingKey, bool mandatory, bool immediate){
+ string e = exchange.getName();
+ string key = routingKey;
+
+ out->send(new AMQFrame(id, new BasicPublishBody(0, e, key, mandatory, immediate)));
+ //break msg up into header frame and content frame(s) and send these
+ string data = msg.getData();
+ msg.header->setContentSize(data.length());
+ AMQBody::shared_ptr body(static_pointer_cast<AMQBody, AMQHeaderBody>(msg.header));
+ out->send(new AMQFrame(id, body));
+
+ int data_length = data.length();
+ if(data_length > 0){
+ //TODO fragmentation of messages, need to know max frame size for connection
+ int frag_size = con->getMaxFrameSize() - 4;
+ if(data_length < frag_size){
+ out->send(new AMQFrame(id, new AMQContentBody(data)));
+ }else{
+ int frag_count = data_length / frag_size;
+ for(int i = 0; i < frag_count; i++){
+ int pos = i*frag_size;
+ int len = i < frag_count - 1 ? frag_size : data_length - pos;
+ string frag(data.substr(pos, len));
+ out->send(new AMQFrame(id, new AMQContentBody(frag)));
+ }
+ }
+ }
+}
+
+void Channel::commit(){
+ AMQFrame* frame = new AMQFrame(id, new TxCommitBody());
+ sendAndReceive(frame, tx_commit_ok);
+}
+
+void Channel::rollback(){
+ AMQFrame* frame = new AMQFrame(id, new TxRollbackBody());
+ sendAndReceive(frame, tx_rollback_ok);
+}
+
+void Channel::handleMethod(AMQMethodBody::shared_ptr body){
+ //channel.flow, channel.close, basic.deliver, basic.return or a response to a synchronous request
+ if(responses.isWaiting()){
+ responses.signalResponse(body);
+ }else if(basic_deliver.match(body.get())){
+ if(incoming != 0){
+ std::cout << "Existing message not complete [deliveryTag=" << incoming->getDeliveryTag() << "]" << std::endl;
+ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
+ }else{
+ incoming = new IncomingMessage(dynamic_pointer_cast<BasicDeliverBody, AMQMethodBody>(body));
+ }
+ }else if(basic_return.match(body.get())){
+ if(incoming != 0){
+ std::cout << "Existing message not complete" << std::endl;
+ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
+ }else{
+ incoming = new IncomingMessage(dynamic_pointer_cast<BasicReturnBody, AMQMethodBody>(body));
+ }
+ }else if(channel_close.match(body.get())){
+ con->removeChannel(this);
+ //need to signal application that channel has been closed through exception
+
+ }else if(channel_flow.match(body.get())){
+
+ }else{
+ //signal error
+ std::cout << "Unhandled method: " << *body << std::endl;
+ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Unhandled method");
+ }
+}
+
+void Channel::handleHeader(AMQHeaderBody::shared_ptr body){
+ if(incoming == 0){
+ //handle invalid frame sequence
+ std::cout << "Invalid message sequence: got header before return or deliver." << std::endl;
+ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got header before return or deliver.");
+ }else{
+ incoming->setHeader(body);
+ if(incoming->isComplete()){
+ enqueue();
+ }
+ }
+}
+
+void Channel::handleContent(AMQContentBody::shared_ptr body){
+ if(incoming == 0){
+ //handle invalid frame sequence
+ std::cout << "Invalid message sequence: got content before return or deliver." << std::endl;
+ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got content before return or deliver.");
+ }else{
+ incoming->addContent(body);
+ if(incoming->isComplete()){
+ enqueue();
+ }
+ }
+}
+
+void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr body){
+ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Channel received heartbeat");
+}
+
+void Channel::start(){
+ dispatcher = threadFactory->create(this);
+ dispatcher->start();
+}
+
+void Channel::stop(){
+ closed = true;
+ dispatchMonitor->acquire();
+ dispatchMonitor->notify();
+ dispatchMonitor->release();
+ if(dispatcher){
+ dispatcher->join();
+ }
+}
+
+void Channel::run(){
+ dispatch();
+}
+
+void Channel::enqueue(){
+ if(incoming->isResponse()){
+ retrievalMonitor->acquire();
+ retrieved = incoming;
+ retrievalMonitor->notify();
+ retrievalMonitor->release();
+ }else{
+ dispatchMonitor->acquire();
+ messages.push(incoming);
+ dispatchMonitor->notify();
+ dispatchMonitor->release();
+ }
+ incoming = 0;
+}
+
+IncomingMessage* Channel::dequeue(){
+ dispatchMonitor->acquire();
+ while(messages.empty() && !closed){
+ dispatchMonitor->wait();
+ }
+ IncomingMessage* msg = 0;
+ if(!messages.empty()){
+ msg = messages.front();
+ messages.pop();
+ }
+ dispatchMonitor->release();
+ return msg;
+}
+
+void Channel::deliver(Consumer* consumer, Message& msg){
+ //record delivery tag:
+ consumer->lastDeliveryTag = msg.getDeliveryTag();
+
+ //allow registered listener to handle the message
+ consumer->listener->received(msg);
+
+ //if the handler calls close on the channel or connection while
+ //handling this message, then consumer will now have been deleted.
+ if(!closed){
+ bool multiple(false);
+ switch(consumer->ackMode){
+ case LAZY_ACK:
+ multiple = true;
+ if(++(consumer->count) < prefetch) break;
+ //else drop-through
+ case AUTO_ACK:
+ out->send(new AMQFrame(id, new BasicAckBody(msg.getDeliveryTag(), multiple)));
+ consumer->lastDeliveryTag = 0;
+ }
+ }
+
+ //as it stands, transactionality is entirely orthogonal to ack
+ //mode, though the acks will not be processed by the broker under
+ //a transaction until it commits.
+}
+
+void Channel::dispatch(){
+ while(!closed){
+ IncomingMessage* incomingMsg = dequeue();
+ if(incomingMsg){
+ //Note: msg is currently only valid for duration of this call
+ Message msg(incomingMsg->getHeader());
+ incomingMsg->getData(msg.data);
+ if(incomingMsg->isReturn()){
+ if(returnsHandler == 0){
+ //print warning to log/console
+ std::cout << "Message returned: " << msg.getData() << std::endl;
+ }else{
+ returnsHandler->returned(msg);
+ }
+ }else{
+ msg.deliveryTag = incomingMsg->getDeliveryTag();
+ std::string tag = incomingMsg->getConsumerTag();
+
+ if(consumers[tag] == 0){
+ //signal error
+ std::cout << "Unknown consumer: " << tag << std::endl;
+ }else{
+ deliver(consumers[tag], msg);
+ }
+ }
+ delete incomingMsg;
+ }
+ }
+}
+
+void Channel::setReturnedMessageHandler(ReturnedMessageHandler* handler){
+ returnsHandler = handler;
+}
+
+void Channel::sendAndReceive(AMQFrame* frame, const AMQMethodBody& body){
+ responses.expect();
+ out->send(frame);
+ responses.receive(body);
+}
+
+void Channel::close(){
+ if(con != 0){
+ con->closeChannel(this);
+ }
+}
diff --git a/cpp/client/src/Connection.cpp b/cpp/client/src/Connection.cpp
new file mode 100644
index 0000000000..eeb2330561
--- /dev/null
+++ b/cpp/client/src/Connection.cpp
@@ -0,0 +1,237 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "Connection.h"
+#include "Channel.h"
+#include "ConnectorImpl.h"
+#include "Message.h"
+#include "QpidError.h"
+#include <iostream>
+
+using namespace qpid::client;
+using namespace qpid::framing;
+using namespace qpid::io;
+using namespace qpid::concurrent;
+
+u_int16_t Connection::channelIdCounter;
+
+Connection::Connection(bool debug, u_int32_t _max_frame_size) : max_frame_size(_max_frame_size), closed(true){
+ connector = new ConnectorImpl(debug, _max_frame_size);
+}
+
+Connection::~Connection(){
+ delete connector;
+}
+
+void Connection::open(const std::string& host, int port, const std::string& uid, const std::string& pwd, const std::string& virtualhost){
+ this->host = host;
+ this->port = port;
+ connector->setInputHandler(this);
+ connector->setTimeoutHandler(this);
+ connector->setShutdownHandler(this);
+ out = connector->getOutputHandler();
+ connector->connect(host, port);
+
+ ProtocolInitiation* header = new ProtocolInitiation(8, 0);
+ responses.expect();
+ connector->init(header);
+ responses.receive(connection_start);
+
+ FieldTable props;
+ string mechanism("PLAIN");
+ string response = ((char)0) + uid + ((char)0) + pwd;
+ string locale("en_US");
+ responses.expect();
+ out->send(new AMQFrame(0, new ConnectionStartOkBody(props, mechanism, response, locale)));
+
+ /**
+ * Assume for now that further challenges will not be required
+ //receive connection.secure
+ responses.receive(connection_secure));
+ //send connection.secure-ok
+ out->send(new AMQFrame(0, new ConnectionSecureOkBody(response)));
+ **/
+
+ responses.receive(connection_tune);
+
+ ConnectionTuneBody::shared_ptr proposal = std::tr1::dynamic_pointer_cast<ConnectionTuneBody, AMQMethodBody>(responses.getResponse());
+ out->send(new AMQFrame(0, new ConnectionTuneOkBody(proposal->getChannelMax(), max_frame_size, proposal->getHeartbeat())));
+
+ u_int16_t heartbeat = proposal->getHeartbeat();
+ connector->setReadTimeout(heartbeat * 2);
+ connector->setWriteTimeout(heartbeat);
+
+ //send connection.open
+ string capabilities;
+ string vhost = virtualhost;
+ responses.expect();
+ out->send(new AMQFrame(0, new ConnectionOpenBody(vhost, capabilities, true)));
+ //receive connection.open-ok (or redirect, but ignore that for now esp. as using force=true).
+ responses.waitForResponse();
+ if(responses.validate(connection_open_ok)){
+ //ok
+ }else if(responses.validate(connection_redirect)){
+ //ignore for now
+ ConnectionRedirectBody::shared_ptr redirect(std::tr1::dynamic_pointer_cast<ConnectionRedirectBody, AMQMethodBody>(responses.getResponse()));
+ std::cout << "Received redirection to " << redirect->getHost() << std::endl;
+ }else{
+ THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response");
+ }
+
+}
+
+void Connection::close(){
+ if(!closed){
+ u_int16_t code(200);
+ string text("Ok");
+ u_int16_t classId(0);
+ u_int16_t methodId(0);
+
+ sendAndReceive(new AMQFrame(0, new ConnectionCloseBody(code, text, classId, methodId)), connection_close_ok);
+ connector->close();
+ }
+}
+
+void Connection::openChannel(Channel* channel){
+ channel->con = this;
+ channel->id = ++channelIdCounter;
+ channel->out = out;
+ channels[channel->id] = channel;
+ //now send frame to open channel and wait for response
+ string oob;
+ channel->sendAndReceive(new AMQFrame(channel->id, new ChannelOpenBody(oob)), channel_open_ok);
+ channel->setQos();
+ channel->closed = false;
+}
+
+void Connection::closeChannel(Channel* channel){
+ //send frame to close channel
+ u_int16_t code(200);
+ string text("Ok");
+ u_int16_t classId(0);
+ u_int16_t methodId(0);
+ closeChannel(channel, code, text, classId, methodId);
+}
+
+void Connection::closeChannel(Channel* channel, u_int16_t code, string& text, u_int16_t classId, u_int16_t methodId){
+ //send frame to close channel
+ channel->cancelAll();
+ channel->closed = true;
+ channel->sendAndReceive(new AMQFrame(channel->id, new ChannelCloseBody(code, text, classId, methodId)), channel_close_ok);
+ channel->con = 0;
+ channel->out = 0;
+ removeChannel(channel);
+}
+
+void Connection::removeChannel(Channel* channel){
+ //send frame to close channel
+
+ channels.erase(channel->id);
+ channel->out = 0;
+ channel->id = 0;
+ channel->con = 0;
+}
+
+void Connection::received(AMQFrame* frame){
+ u_int16_t channelId = frame->getChannel();
+
+ if(channelId == 0){
+ this->handleBody(frame->getBody());
+ }else{
+ Channel* channel = channels[channelId];
+ if(channel == 0){
+ error(504, "Unknown channel");
+ }else{
+ try{
+ channel->handleBody(frame->getBody());
+ }catch(qpid::QpidError e){
+ channelException(channel, dynamic_cast<AMQMethodBody*>(frame->getBody().get()), e);
+ }
+ }
+ }
+}
+
+void Connection::handleMethod(AMQMethodBody::shared_ptr body){
+ //connection.close, basic.deliver, basic.return or a response to a synchronous request
+ if(responses.isWaiting()){
+ responses.signalResponse(body);
+ }else if(connection_close.match(body.get())){
+ //send back close ok
+ //close socket
+ ConnectionCloseBody* request = dynamic_cast<ConnectionCloseBody*>(body.get());
+ std::cout << "Connection closed by server: " << request->getReplyCode() << ":" << request->getReplyText() << std::endl;
+ connector->close();
+ }else{
+ std::cout << "Unhandled method for connection: " << *body << std::endl;
+ error(504, "Unrecognised method", body->amqpClassId(), body->amqpMethodId());
+ }
+}
+
+void Connection::handleHeader(AMQHeaderBody::shared_ptr body){
+ error(504, "Channel error: received header body with channel 0.");
+}
+
+void Connection::handleContent(AMQContentBody::shared_ptr body){
+ error(504, "Channel error: received content body with channel 0.");
+}
+
+void Connection::handleHeartbeat(AMQHeartbeatBody::shared_ptr body){
+}
+
+void Connection::sendAndReceive(AMQFrame* frame, const AMQMethodBody& body){
+ responses.expect();
+ out->send(frame);
+ responses.receive(body);
+}
+
+void Connection::error(int code, const string& msg, int classid, int methodid){
+ std::cout << "Connection exception generated: " << code << msg;
+ if(classid || methodid){
+ std::cout << " [" << methodid << ":" << classid << "]";
+ }
+ std::cout << std::endl;
+ sendAndReceive(new AMQFrame(0, new ConnectionCloseBody(code, (string&) msg, classid, methodid)), connection_close_ok);
+ connector->close();
+}
+
+void Connection::channelException(Channel* channel, AMQMethodBody* method, QpidError& e){
+ std::cout << "Caught error from channel [" << e.code << "] " << e.msg << " (" << e.file << ":" << e.line << ")" << std::endl;
+ int code = e.code == PROTOCOL_ERROR ? e.code - PROTOCOL_ERROR : 500;
+ string msg = e.msg;
+ if(method == 0){
+ closeChannel(channel, code, msg);
+ }else{
+ closeChannel(channel, code, msg, method->amqpClassId(), method->amqpMethodId());
+ }
+}
+
+void Connection::idleIn(){
+ std::cout << "Connection timed out due to abscence of heartbeat." << std::endl;
+ connector->close();
+}
+
+void Connection::idleOut(){
+ out->send(new AMQFrame(0, new AMQHeartbeatBody()));
+}
+
+void Connection::shutdown(){
+ closed = true;
+ //close all channels
+ for(iterator i = channels.begin(); i != channels.end(); i++){
+ i->second->stop();
+ }
+}
diff --git a/cpp/client/src/Exchange.cpp b/cpp/client/src/Exchange.cpp
new file mode 100644
index 0000000000..681068dc4c
--- /dev/null
+++ b/cpp/client/src/Exchange.cpp
@@ -0,0 +1,30 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "Exchange.h"
+
+qpid::client::Exchange::Exchange(std::string _name, std::string _type) : name(_name), type(_type){}
+const std::string& qpid::client::Exchange::getName() const { return name; }
+const std::string& qpid::client::Exchange::getType() const { return type; }
+
+const std::string qpid::client::Exchange::DIRECT_EXCHANGE = "direct";
+const std::string qpid::client::Exchange::TOPIC_EXCHANGE = "topic";
+const std::string qpid::client::Exchange::HEADERS_EXCHANGE = "headers";
+
+const qpid::client::Exchange qpid::client::Exchange::DEFAULT_DIRECT_EXCHANGE("amq.direct", DIRECT_EXCHANGE);
+const qpid::client::Exchange qpid::client::Exchange::DEFAULT_TOPIC_EXCHANGE("amq.topic", TOPIC_EXCHANGE);
+const qpid::client::Exchange qpid::client::Exchange::DEFAULT_HEADERS_EXCHANGE("amq.headers", HEADERS_EXCHANGE);
diff --git a/cpp/client/src/IncomingMessage.cpp b/cpp/client/src/IncomingMessage.cpp
new file mode 100644
index 0000000000..8e2604c4cb
--- /dev/null
+++ b/cpp/client/src/IncomingMessage.cpp
@@ -0,0 +1,85 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "IncomingMessage.h"
+#include "QpidError.h"
+#include <iostream>
+
+using namespace qpid::client;
+using namespace qpid::framing;
+
+IncomingMessage::IncomingMessage(BasicDeliverBody::shared_ptr intro) : delivered(intro){}
+IncomingMessage::IncomingMessage(BasicReturnBody::shared_ptr intro): returned(intro){}
+IncomingMessage::IncomingMessage(BasicGetOkBody::shared_ptr intro): response(intro){}
+
+IncomingMessage::~IncomingMessage(){
+}
+
+void IncomingMessage::setHeader(AMQHeaderBody::shared_ptr header){
+ this->header = header;
+}
+
+void IncomingMessage::addContent(AMQContentBody::shared_ptr content){
+ this->content.push_back(content);
+}
+
+bool IncomingMessage::isComplete(){
+ return header != 0 && header->getContentSize() == contentSize();
+}
+
+bool IncomingMessage::isReturn(){
+ return returned;
+}
+
+bool IncomingMessage::isDelivery(){
+ return delivered;
+}
+
+bool IncomingMessage::isResponse(){
+ return response;
+}
+
+string& IncomingMessage::getConsumerTag(){
+ if(!isDelivery()) THROW_QPID_ERROR(CLIENT_ERROR, "Consumer tag only valid for delivery");
+ return delivered->getConsumerTag();
+}
+
+u_int64_t IncomingMessage::getDeliveryTag(){
+ if(!isDelivery()) THROW_QPID_ERROR(CLIENT_ERROR, "Delivery tag only valid for delivery");
+ return delivered->getDeliveryTag();
+}
+
+AMQHeaderBody::shared_ptr& IncomingMessage::getHeader(){
+ return header;
+}
+
+void IncomingMessage::getData(string& s){
+ int count(content.size());
+ for(int i = 0; i < count; i++){
+ if(i == 0) s = content[i]->getData();
+ else s += content[i]->getData();
+ }
+}
+
+long IncomingMessage::contentSize(){
+ long size(0);
+ int count(content.size());
+ for(int i = 0; i < count; i++){
+ size += content[i]->size();
+ }
+ return size;
+}
diff --git a/cpp/client/src/Message.cpp b/cpp/client/src/Message.cpp
new file mode 100644
index 0000000000..71befe57b1
--- /dev/null
+++ b/cpp/client/src/Message.cpp
@@ -0,0 +1,147 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "Message.h"
+
+using namespace qpid::client;
+using namespace qpid::framing;
+
+Message::Message(){
+ header = AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC));
+}
+
+Message::Message(AMQHeaderBody::shared_ptr& _header) : header(_header){
+}
+
+Message::~Message(){
+}
+
+BasicHeaderProperties* Message::getHeaderProperties(){
+ return dynamic_cast<BasicHeaderProperties*>(header->getProperties());
+}
+
+std::string& Message::getContentType(){
+ return getHeaderProperties()->getContentType();
+}
+
+std::string& Message::getContentEncoding(){
+ return getHeaderProperties()->getContentEncoding();
+}
+
+FieldTable& Message::getHeaders(){
+ return getHeaderProperties()->getHeaders();
+}
+
+u_int8_t Message::getDeliveryMode(){
+ return getHeaderProperties()->getDeliveryMode();
+}
+
+u_int8_t Message::getPriority(){
+ return getHeaderProperties()->getPriority();
+}
+
+std::string& Message::getCorrelationId(){
+ return getHeaderProperties()->getCorrelationId();
+}
+
+std::string& Message::getReplyTo(){
+ return getHeaderProperties()->getReplyTo();
+}
+
+std::string& Message::getExpiration(){
+ return getHeaderProperties()->getExpiration();
+}
+
+std::string& Message::getMessageId(){
+ return getHeaderProperties()->getMessageId();
+}
+
+u_int64_t Message::getTimestamp(){
+ return getHeaderProperties()->getTimestamp();
+}
+
+std::string& Message::getType(){
+ return getHeaderProperties()->getType();
+}
+
+std::string& Message::getUserId(){
+ return getHeaderProperties()->getUserId();
+}
+
+std::string& Message::getAppId(){
+ return getHeaderProperties()->getAppId();
+}
+
+std::string& Message::getClusterId(){
+ return getHeaderProperties()->getClusterId();
+}
+
+void Message::setContentType(std::string& type){
+ getHeaderProperties()->setContentType(type);
+}
+
+void Message::setContentEncoding(std::string& encoding){
+ getHeaderProperties()->setContentEncoding(encoding);
+}
+
+void Message::setHeaders(FieldTable& headers){
+ getHeaderProperties()->setHeaders(headers);
+}
+
+void Message::setDeliveryMode(u_int8_t mode){
+ getHeaderProperties()->setDeliveryMode(mode);
+}
+
+void Message::setPriority(u_int8_t priority){
+ getHeaderProperties()->setPriority(priority);
+}
+
+void Message::setCorrelationId(std::string& correlationId){
+ getHeaderProperties()->setCorrelationId(correlationId);
+}
+
+void Message::setReplyTo(std::string& replyTo){
+ getHeaderProperties()->setReplyTo(replyTo);
+}
+
+void Message::setExpiration(std::string& expiration){
+ getHeaderProperties()->setExpiration(expiration);
+}
+
+void Message::setMessageId(std::string& messageId){
+ getHeaderProperties()->setMessageId(messageId);
+}
+
+void Message::setTimestamp(u_int64_t timestamp){
+ getHeaderProperties()->setTimestamp(timestamp);
+}
+
+void Message::setType(std::string& type){
+ getHeaderProperties()->setType(type);
+}
+
+void Message::setUserId(std::string& userId){
+ getHeaderProperties()->setUserId(userId);
+}
+
+void Message::setAppId(std::string& appId){
+ getHeaderProperties()->setAppId(appId);
+}
+
+void Message::setClusterId(std::string& clusterId){
+ getHeaderProperties()->setClusterId(clusterId);
+}
diff --git a/cpp/client/src/Queue.cpp b/cpp/client/src/Queue.cpp
new file mode 100644
index 0000000000..cb957dd993
--- /dev/null
+++ b/cpp/client/src/Queue.cpp
@@ -0,0 +1,47 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "Queue.h"
+
+qpid::client::Queue::Queue() : name(""), autodelete(true), exclusive(true){}
+
+qpid::client::Queue::Queue(std::string _name) : name(_name), autodelete(false), exclusive(false){}
+
+qpid::client::Queue::Queue(std::string _name, bool temp) : name(_name), autodelete(temp), exclusive(temp){}
+
+qpid::client::Queue::Queue(std::string _name, bool _autodelete, bool _exclusive)
+ : name(_name), autodelete(_autodelete), exclusive(_exclusive){}
+
+const std::string& qpid::client::Queue::getName() const{
+ return name;
+}
+
+void qpid::client::Queue::setName(const std::string& name){
+ this->name = name;
+}
+
+bool qpid::client::Queue::isAutoDelete() const{
+ return autodelete;
+}
+
+bool qpid::client::Queue::isExclusive() const{
+ return exclusive;
+}
+
+
+
+
diff --git a/cpp/client/src/ResponseHandler.cpp b/cpp/client/src/ResponseHandler.cpp
new file mode 100644
index 0000000000..837bba37fd
--- /dev/null
+++ b/cpp/client/src/ResponseHandler.cpp
@@ -0,0 +1,63 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "ResponseHandler.h"
+#include "MonitorImpl.h"
+#include "QpidError.h"
+
+qpid::client::ResponseHandler::ResponseHandler() : waiting(false){
+ monitor = new qpid::concurrent::MonitorImpl();
+}
+
+qpid::client::ResponseHandler::~ResponseHandler(){
+ delete monitor;
+}
+
+bool qpid::client::ResponseHandler::validate(const qpid::framing::AMQMethodBody& expected){
+ return expected.match(response.get());
+}
+
+void qpid::client::ResponseHandler::waitForResponse(){
+ monitor->acquire();
+ if(waiting){
+ monitor->wait();
+ }
+ monitor->release();
+}
+
+void qpid::client::ResponseHandler::signalResponse(qpid::framing::AMQMethodBody::shared_ptr response){
+ this->response = response;
+ monitor->acquire();
+ waiting = false;
+ monitor->notify();
+ monitor->release();
+}
+
+void qpid::client::ResponseHandler::receive(const qpid::framing::AMQMethodBody& expected){
+ monitor->acquire();
+ if(waiting){
+ monitor->wait();
+ }
+ monitor->release();
+ if(!validate(expected)){
+ THROW_QPID_ERROR(PROTOCOL_ERROR, "Protocol Error");
+ }
+}
+
+void qpid::client::ResponseHandler::expect(){
+ waiting = true;
+}
diff --git a/cpp/client/test/Makefile b/cpp/client/test/Makefile
new file mode 100644
index 0000000000..f35aab3e17
--- /dev/null
+++ b/cpp/client/test/Makefile
@@ -0,0 +1,45 @@
+#
+# Copyright (c) 2006 The Apache Software Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+QPID_HOME = ../../..
+include ${QPID_HOME}/cpp/options.mk
+
+# TODO aconway 2006-09-12: These are system tests, not unit tests.
+# We need client side unit tests.
+# We should separate them from the system tets.
+# We need an approach to automate the C++ client/server system tests.
+#
+
+SOURCES=$(wildcard *.cpp)
+TESTS=$(SOURCES:.cpp=)
+DEPS= $(SOURCES:.cpp=.d)
+
+INCLUDES = $(TEST_INCLUDES)
+LDLIBS= -lapr-1 $(COMMON_LIB) $(CLIENT_LIB)
+
+.PHONY: all clean
+
+all: $(TESTS)
+
+clean:
+ -@rm -f $(TESTS) $(DEPS)
+
+# Rule to build test programs.
+%: %.cpp
+ $(CXX) -o $@ $< $(CXXFLAGS) $(LDFLAGS) $(LDLIBS)
+
+# Dependencies
+-include $(DEPS)
diff --git a/cpp/client/test/client_test.cpp b/cpp/client/test/client_test.cpp
new file mode 100644
index 0000000000..e33beb3b67
--- /dev/null
+++ b/cpp/client/test/client_test.cpp
@@ -0,0 +1,97 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <iostream>
+
+#include "QpidError.h"
+#include "Channel.h"
+#include "Connection.h"
+#include "FieldTable.h"
+#include "Message.h"
+#include "MessageListener.h"
+
+#include "MonitorImpl.h"
+
+
+using namespace qpid::client;
+using namespace qpid::concurrent;
+
+class SimpleListener : public virtual MessageListener{
+ Monitor* monitor;
+
+public:
+ inline SimpleListener(Monitor* _monitor) : monitor(_monitor){}
+
+ inline virtual void received(Message& msg){
+ std::cout << "Received message " /**<< msg **/<< std::endl;
+ monitor->acquire();
+ monitor->notify();
+ monitor->release();
+ }
+};
+
+int main(int argc, char** argv)
+{
+ try{
+ Connection con(argc > 1);
+ Channel channel;
+ Exchange exchange("MyExchange", Exchange::TOPIC_EXCHANGE);
+ Queue queue("MyQueue", true);
+
+ string host("localhost");
+
+ con.open(host);
+ std::cout << "Opened connection." << std::endl;
+ con.openChannel(&channel);
+ std::cout << "Opened channel." << std::endl;
+ channel.declareExchange(exchange);
+ std::cout << "Declared exchange." << std::endl;
+ channel.declareQueue(queue);
+ std::cout << "Declared queue." << std::endl;
+ qpid::framing::FieldTable args;
+ channel.bind(exchange, queue, "MyTopic", args);
+ std::cout << "Bound queue to exchange." << std::endl;
+
+ //set up a message listener
+ MonitorImpl monitor;
+ SimpleListener listener(&monitor);
+ string tag("MyTag");
+ channel.consume(queue, tag, &listener);
+ channel.start();
+ std::cout << "Registered consumer." << std::endl;
+
+ Message msg;
+ string data("MyMessage");
+ msg.setData(data);
+ channel.publish(msg, exchange, "MyTopic");
+ std::cout << "Published message." << std::endl;
+
+ monitor.acquire();
+ monitor.wait();
+ monitor.release();
+
+
+ con.closeChannel(&channel);
+ std::cout << "Closed channel." << std::endl;
+ con.close();
+ std::cout << "Closed connection." << std::endl;
+ }catch(qpid::QpidError error){
+ std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl;
+ return 1;
+ }
+ return 0;
+}
diff --git a/cpp/client/test/topic_listener.cpp b/cpp/client/test/topic_listener.cpp
new file mode 100644
index 0000000000..707b3443a1
--- /dev/null
+++ b/cpp/client/test/topic_listener.cpp
@@ -0,0 +1,180 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <iostream>
+#include <sstream>
+#include "apr_time.h"
+#include "QpidError.h"
+#include "Channel.h"
+#include "Connection.h"
+#include "Exchange.h"
+#include "MessageListener.h"
+#include "Queue.h"
+
+using namespace qpid::client;
+
+class Listener : public MessageListener{
+ Channel* const channel;
+ const std::string responseQueue;
+ const bool transactional;
+ bool init;
+ int count;
+ apr_time_t start;
+
+ void shutdown();
+ void report();
+public:
+ Listener(Channel* channel, const std::string& reponseQueue, bool tx);
+ virtual void received(Message& msg);
+};
+
+class Args{
+ string host;
+ int port;
+ int ackMode;
+ bool transactional;
+ int prefetch;
+ bool trace;
+ bool help;
+public:
+ inline Args() : host("localhost"), port(5672), ackMode(NO_ACK), transactional(false), prefetch(1000), trace(false), help(false){}
+ void parse(int argc, char** argv);
+ void usage();
+
+ inline const string& getHost() const { return host;}
+ inline int getPort() const { return port; }
+ inline int getAckMode(){ return ackMode; }
+ inline bool getTransactional() const { return transactional; }
+ inline int getPrefetch(){ return prefetch; }
+ inline bool getTrace() const { return trace; }
+ inline bool getHelp() const { return help; }
+};
+
+int main(int argc, char** argv){
+ Args args;
+ args.parse(argc, argv);
+ if(args.getHelp()){
+ args.usage();
+ }else{
+ try{
+ Connection connection(args.getTrace());
+ connection.open(args.getHost(), args.getPort());
+ Channel channel(args.getTransactional(), args.getPrefetch());
+ connection.openChannel(&channel);
+
+ //declare exchange, queue and bind them:
+ Queue response("response");
+ channel.declareQueue(response);
+
+ Queue control;
+ channel.declareQueue(control);
+ qpid::framing::FieldTable bindArgs;
+ channel.bind(Exchange::DEFAULT_TOPIC_EXCHANGE, control, "topic_control", bindArgs);
+ //set up listener
+ Listener listener(&channel, response.getName(), args.getTransactional());
+ std::string tag;
+ channel.consume(control, tag, &listener, args.getAckMode());
+ channel.run();
+ connection.close();
+ }catch(qpid::QpidError error){
+ std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl;
+ }
+ }
+}
+
+Listener::Listener(Channel* _channel, const std::string& _responseq, bool tx) :
+ channel(_channel), responseQueue(_responseq), transactional(tx), init(false), count(0){}
+
+void Listener::received(Message& message){
+ if(!init){
+ start = apr_time_as_msec(apr_time_now());
+ count = 0;
+ init = true;
+ }
+ std::string type(message.getHeaders().getString("TYPE"));
+
+ if(type == "TERMINATION_REQUEST"){
+ shutdown();
+ }else if(type == "REPORT_REQUEST"){
+ //send a report:
+ report();
+ init = false;
+ }else if (++count % 100 == 0){
+ std::cout <<"Received " << count << " messages." << std::endl;
+ }
+}
+
+void Listener::shutdown(){
+ channel->close();
+}
+
+void Listener::report(){
+ apr_time_t finish = apr_time_as_msec(apr_time_now());
+ apr_time_t time = finish - start;
+ std::stringstream report;
+ report << "Received " << count << " messages in " << time << " ms.";
+ Message msg;
+ msg.setData(report.str());
+ channel->publish(msg, Exchange::DEFAULT_DIRECT_EXCHANGE, responseQueue);
+ if(transactional){
+ channel->commit();
+ }
+}
+
+
+void Args::parse(int argc, char** argv){
+ for(int i = 1; i < argc; i++){
+ string name(argv[i]);
+ if("-help" == name){
+ help = true;
+ break;
+ }else if("-host" == name){
+ host = argv[++i];
+ }else if("-port" == name){
+ port = atoi(argv[++i]);
+ }else if("-ack_mode" == name){
+ ackMode = atoi(argv[++i]);
+ }else if("-transactional" == name){
+ transactional = true;
+ }else if("-prefetch" == name){
+ prefetch = atoi(argv[++i]);
+ }else if("-trace" == name){
+ trace = true;
+ }else{
+ std::cout << "Warning: unrecognised option " << name << std::endl;
+ }
+ }
+}
+
+void Args::usage(){
+ std::cout << "Options:" << std::endl;
+ std::cout << " -help" << std::endl;
+ std::cout << " Prints this usage message" << std::endl;
+ std::cout << " -host <host>" << std::endl;
+ std::cout << " Specifies host to connect to (default is localhost)" << std::endl;
+ std::cout << " -port <port>" << std::endl;
+ std::cout << " Specifies port to conect to (default is 5762)" << std::endl;
+ std::cout << " -ack_mode <mode>" << std::endl;
+ std::cout << " Sets the acknowledgement mode" << std::endl;
+ std::cout << " 0=NO_ACK (default), 1=AUTO_ACK, 2=LAZY_ACK" << std::endl;
+ std::cout << " -transactional" << std::endl;
+ std::cout << " Indicates the client should use transactions" << std::endl;
+ std::cout << " -prefetch <count>" << std::endl;
+ std::cout << " Specifies the prefetch count (default is 1000)" << std::endl;
+ std::cout << " -trace" << std::endl;
+ std::cout << " Indicates that the frames sent and received should be logged" << std::endl;
+}
diff --git a/cpp/client/test/topic_publisher.cpp b/cpp/client/test/topic_publisher.cpp
new file mode 100644
index 0000000000..fc6b7f3b30
--- /dev/null
+++ b/cpp/client/test/topic_publisher.cpp
@@ -0,0 +1,253 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <iostream>
+#include <cstdlib>
+#include "unistd.h"
+#include "apr_time.h"
+#include "MonitorImpl.h"
+#include "QpidError.h"
+#include "Channel.h"
+#include "Connection.h"
+#include "Exchange.h"
+#include "MessageListener.h"
+#include "Queue.h"
+
+using namespace qpid::client;
+using namespace qpid::concurrent;
+
+class Publisher : public MessageListener{
+ Channel* const channel;
+ const std::string controlTopic;
+ const bool transactional;
+ MonitorImpl monitor;
+ int count;
+
+ void waitForCompletion(int msgs);
+ string generateData(int size);
+
+public:
+ Publisher(Channel* channel, const std::string& controlTopic, bool tx);
+ virtual void received(Message& msg);
+ apr_time_t publish(int msgs, int listeners, int size);
+ void terminate();
+};
+
+class Args{
+ string host;
+ int port;
+ int messages;
+ int subscribers;
+ int ackMode;
+ bool transactional;
+ int prefetch;
+ int batches;
+ int delay;
+ int size;
+ bool trace;
+ bool help;
+public:
+ inline Args() : host("localhost"), port(5672), messages(1000), subscribers(1),
+ ackMode(NO_ACK), transactional(false), prefetch(1000), batches(1),
+ delay(0), size(256), trace(false), help(false){}
+
+ void parse(int argc, char** argv);
+ void usage();
+
+ inline const string& getHost() const { return host;}
+ inline int getPort() const { return port; }
+ inline int getMessages() const { return messages; }
+ inline int getSubscribers() const { return subscribers; }
+ inline int getAckMode(){ return ackMode; }
+ inline bool getTransactional() const { return transactional; }
+ inline int getPrefetch(){ return prefetch; }
+ inline int getBatches(){ return batches; }
+ inline int getDelay(){ return delay; }
+ inline int getSize(){ return size; }
+ inline bool getTrace() const { return trace; }
+ inline bool getHelp() const { return help; }
+};
+
+int main(int argc, char** argv){
+ Args args;
+ args.parse(argc, argv);
+ if(args.getHelp()){
+ args.usage();
+ }else{
+ try{
+ Connection connection(args.getTrace());
+ connection.open(args.getHost(), args.getPort());
+ Channel channel(args.getTransactional(), args.getPrefetch());
+ connection.openChannel(&channel);
+
+ //declare queue (relying on default binding):
+ Queue response("response");
+ channel.declareQueue(response);
+
+ //set up listener
+ Publisher publisher(&channel, "topic_control", args.getTransactional());
+ std::string tag("mytag");
+ channel.consume(response, tag, &publisher, args.getAckMode());
+ channel.start();
+
+ int batchSize(args.getBatches());
+ apr_time_t max(0);
+ apr_time_t min(0);
+ apr_time_t sum(0);
+ for(int i = 0; i < batchSize; i++){
+ if(i > 0 && args.getDelay()) sleep(args.getDelay());
+ apr_time_t time = publisher.publish(args.getMessages(), args.getSubscribers(), args.getSize());
+ if(!max || time > max) max = time;
+ if(!min || time < min) min = time;
+ sum += time;
+ std::cout << "Completed " << (i+1) << " of " << batchSize << " in " << time << "ms" << std::endl;
+ }
+ publisher.terminate();
+ apr_time_t avg = sum / batchSize;
+ if(batchSize > 1){
+ std::cout << batchSize << " batches completed. avg=" << avg <<
+ ", max=" << max << ", min=" << min << std::endl;
+ }
+ channel.close();
+ connection.close();
+ }catch(qpid::QpidError error){
+ std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl;
+ }
+ }
+}
+
+Publisher::Publisher(Channel* _channel, const std::string& _controlTopic, bool tx) :
+ channel(_channel), controlTopic(_controlTopic), transactional(tx){}
+
+void Publisher::received(Message& msg){
+ //count responses and when all are received end the current batch
+ monitor.acquire();
+ if(--count == 0){
+ monitor.notify();
+ }
+ std::cout << "Received report: " << msg.getData() << " (" << count << " remaining)." << std::endl;
+ monitor.release();
+}
+
+void Publisher::waitForCompletion(int msgs){
+ count = msgs;
+ monitor.wait();
+}
+
+apr_time_t Publisher::publish(int msgs, int listeners, int size){
+ monitor.acquire();
+ Message msg;
+ msg.setData(generateData(size));
+ apr_time_t start(apr_time_as_msec(apr_time_now()));
+ for(int i = 0; i < msgs; i++){
+ channel->publish(msg, Exchange::DEFAULT_TOPIC_EXCHANGE, controlTopic);
+ }
+ //send report request
+ Message reportRequest;
+ reportRequest.getHeaders().setString("TYPE", "REPORT_REQUEST");
+ channel->publish(reportRequest, Exchange::DEFAULT_TOPIC_EXCHANGE, controlTopic);
+ if(transactional){
+ channel->commit();
+ }
+
+ waitForCompletion(listeners);
+ monitor.release();
+ apr_time_t finish(apr_time_as_msec(apr_time_now()));
+
+ return finish - start;
+}
+
+string Publisher::generateData(int size){
+ string data;
+ for(int i = 0; i < size; i++){
+ data += ('A' + (i / 26));
+ }
+ return data;
+}
+
+void Publisher::terminate(){
+ //send termination request
+ Message terminationRequest;
+ terminationRequest.getHeaders().setString("TYPE", "TERMINATION_REQUEST");
+ channel->publish(terminationRequest, Exchange::DEFAULT_TOPIC_EXCHANGE, controlTopic);
+ if(transactional){
+ channel->commit();
+ }
+}
+
+void Args::parse(int argc, char** argv){
+ for(int i = 1; i < argc; i++){
+ string name(argv[i]);
+ if("-help" == name){
+ help = true;
+ break;
+ }else if("-host" == name){
+ host = argv[++i];
+ }else if("-port" == name){
+ port = atoi(argv[++i]);
+ }else if("-messages" == name){
+ messages = atoi(argv[++i]);
+ }else if("-subscribers" == name){
+ subscribers = atoi(argv[++i]);
+ }else if("-ack_mode" == name){
+ ackMode = atoi(argv[++i]);
+ }else if("-transactional" == name){
+ transactional = true;
+ }else if("-prefetch" == name){
+ prefetch = atoi(argv[++i]);
+ }else if("-batches" == name){
+ batches = atoi(argv[++i]);
+ }else if("-delay" == name){
+ delay = atoi(argv[++i]);
+ }else if("-size" == name){
+ size = atoi(argv[++i]);
+ }else if("-trace" == name){
+ trace = true;
+ }else{
+ std::cout << "Warning: unrecognised option " << name << std::endl;
+ }
+ }
+}
+
+void Args::usage(){
+ std::cout << "Options:" << std::endl;
+ std::cout << " -help" << std::endl;
+ std::cout << " Prints this usage message" << std::endl;
+ std::cout << " -host <host>" << std::endl;
+ std::cout << " Specifies host to connect to (default is localhost)" << std::endl;
+ std::cout << " -port <port>" << std::endl;
+ std::cout << " Specifies port to conect to (default is 5762)" << std::endl;
+ std::cout << " -messages <count>" << std::endl;
+ std::cout << " Specifies how many messages to send" << std::endl;
+ std::cout << " -subscribers <count>" << std::endl;
+ std::cout << " Specifies how many subscribers to expect reports from" << std::endl;
+ std::cout << " -ack_mode <mode>" << std::endl;
+ std::cout << " Sets the acknowledgement mode" << std::endl;
+ std::cout << " 0=NO_ACK (default), 1=AUTO_ACK, 2=LAZY_ACK" << std::endl;
+ std::cout << " -transactional" << std::endl;
+ std::cout << " Indicates the client should use transactions" << std::endl;
+ std::cout << " -prefetch <count>" << std::endl;
+ std::cout << " Specifies the prefetch count (default is 1000)" << std::endl;
+ std::cout << " -batches <count>" << std::endl;
+ std::cout << " Specifies how many batches to run" << std::endl;
+ std::cout << " -delay <seconds>" << std::endl;
+ std::cout << " Causes a delay between each batch" << std::endl;
+ std::cout << " -size <bytes>" << std::endl;
+ std::cout << " Sets the size of the published messages (default is 256 bytes)" << std::endl;
+ std::cout << " -trace" << std::endl;
+ std::cout << " Indicates that the frames sent and received should be logged" << std::endl;
+}