diff options
Diffstat (limited to 'cpp/client')
-rw-r--r-- | cpp/client/Makefile | 43 | ||||
-rw-r--r-- | cpp/client/inc/Channel.h | 127 | ||||
-rw-r--r-- | cpp/client/inc/Connection.h | 105 | ||||
-rw-r--r-- | cpp/client/inc/Exchange.h | 49 | ||||
-rw-r--r-- | cpp/client/inc/IncomingMessage.h | 60 | ||||
-rw-r--r-- | cpp/client/inc/Message.h | 86 | ||||
-rw-r--r-- | cpp/client/inc/MessageListener.h | 37 | ||||
-rw-r--r-- | cpp/client/inc/Queue.h | 47 | ||||
-rw-r--r-- | cpp/client/inc/ResponseHandler.h | 49 | ||||
-rw-r--r-- | cpp/client/inc/ReturnedMessageHandler.h | 37 | ||||
-rw-r--r-- | cpp/client/src/Channel.cpp | 432 | ||||
-rw-r--r-- | cpp/client/src/Connection.cpp | 237 | ||||
-rw-r--r-- | cpp/client/src/Exchange.cpp | 30 | ||||
-rw-r--r-- | cpp/client/src/IncomingMessage.cpp | 85 | ||||
-rw-r--r-- | cpp/client/src/Message.cpp | 147 | ||||
-rw-r--r-- | cpp/client/src/Queue.cpp | 47 | ||||
-rw-r--r-- | cpp/client/src/ResponseHandler.cpp | 63 | ||||
-rw-r--r-- | cpp/client/test/Makefile | 45 | ||||
-rw-r--r-- | cpp/client/test/client_test.cpp | 97 | ||||
-rw-r--r-- | cpp/client/test/topic_listener.cpp | 180 | ||||
-rw-r--r-- | cpp/client/test/topic_publisher.cpp | 253 |
21 files changed, 2256 insertions, 0 deletions
diff --git a/cpp/client/Makefile b/cpp/client/Makefile new file mode 100644 index 0000000000..d08b92fe2b --- /dev/null +++ b/cpp/client/Makefile @@ -0,0 +1,43 @@ +# +# Copyright (c) 2006 The Apache Software Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# +# Build client library. +# + +QPID_HOME = ../.. +include ${QPID_HOME}/cpp/options.mk + +SOURCES := $(wildcard src/*.cpp) +OBJECTS := $(subst .cpp,.o,$(SOURCES)) +CLIENT_LIB=$(LIB_DIR)/libqpid_client.so.1.0 + +.PHONY: all test clean + +all: $(CLIENT_LIB) + +test: + @$(MAKE) -C test all + +clean: + -@rm -f $(CLIENT_LIB) $(OBJECTS) src/*.d + $(MAKE) -C test clean + +$(CLIENT_LIB): $(OBJECTS) + $(CXX) -shared -o $@ $^ $(LDFLAGS) $(COMMON_LIB) + +# Dependencies +-include $(SOURCES:.cpp=.d) diff --git a/cpp/client/inc/Channel.h b/cpp/client/inc/Channel.h new file mode 100644 index 0000000000..debecf922e --- /dev/null +++ b/cpp/client/inc/Channel.h @@ -0,0 +1,127 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <map> +#include <string> +#include <queue> +#include "sys/types.h" + +#ifndef _Channel_ +#define _Channel_ + +#include "amqp_framing.h" + +#include "ThreadFactory.h" + +#include "Connection.h" +#include "Exchange.h" +#include "IncomingMessage.h" +#include "Message.h" +#include "MessageListener.h" +#include "Queue.h" +#include "ResponseHandler.h" +#include "ReturnedMessageHandler.h" + +namespace qpid { +namespace client { + enum ack_modes {NO_ACK=0, AUTO_ACK=1, LAZY_ACK=2, CLIENT_ACK=3}; + + class Channel : private virtual qpid::framing::BodyHandler, public virtual qpid::concurrent::Runnable{ + struct Consumer{ + MessageListener* listener; + int ackMode; + int count; + u_int64_t lastDeliveryTag; + }; + typedef std::map<string,Consumer*>::iterator consumer_iterator; + + u_int16_t id; + Connection* con; + qpid::concurrent::ThreadFactory* threadFactory; + qpid::concurrent::Thread* dispatcher; + qpid::framing::OutputHandler* out; + IncomingMessage* incoming; + ResponseHandler responses; + std::queue<IncomingMessage*> messages;//holds returned messages or those delivered for a consume + IncomingMessage* retrieved;//holds response to basic.get + qpid::concurrent::Monitor* dispatchMonitor; + qpid::concurrent::Monitor* retrievalMonitor; + std::map<std::string, Consumer*> consumers; + ReturnedMessageHandler* returnsHandler; + bool closed; + + u_int16_t prefetch; + const bool transactional; + + void enqueue(); + void retrieve(Message& msg); + IncomingMessage* dequeue(); + void dispatch(); + void stop(); + void sendAndReceive(qpid::framing::AMQFrame* frame, const qpid::framing::AMQMethodBody& body); + void deliver(Consumer* consumer, Message& msg); + void setQos(); + void cancelAll(); + + virtual void handleMethod(qpid::framing::AMQMethodBody::shared_ptr body); + virtual void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr body); + virtual void handleContent(qpid::framing::AMQContentBody::shared_ptr body); + virtual void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body); + + public: + Channel(bool transactional = false, u_int16_t prefetch = 500); + ~Channel(); + + void declareExchange(Exchange& exchange, bool synch = true); + void deleteExchange(Exchange& exchange, bool synch = true); + void declareQueue(Queue& queue, bool synch = true); + void deleteQueue(Queue& queue, bool ifunused = false, bool ifempty = false, bool synch = true); + void bind(const Exchange& exchange, const Queue& queue, const std::string& key, + const qpid::framing::FieldTable& args, bool synch = true); + void consume(Queue& queue, std::string& tag, MessageListener* listener, + int ackMode = NO_ACK, bool noLocal = false, bool synch = true); + void cancel(std::string& tag, bool synch = true); + bool get(Message& msg, const Queue& queue, int ackMode = NO_ACK); + void publish(Message& msg, const Exchange& exchange, const std::string& routingKey, + bool mandatory = false, bool immediate = false); + + void commit(); + void rollback(); + + void setPrefetch(u_int16_t prefetch); + + /** + * Start message dispatching on a new thread + */ + void start(); + /** + * Do message dispatching on this thread + */ + void run(); + + void close(); + + void setReturnedMessageHandler(ReturnedMessageHandler* handler); + + friend class Connection; + }; + +} +} + + +#endif diff --git a/cpp/client/inc/Connection.h b/cpp/client/inc/Connection.h new file mode 100644 index 0000000000..89169e92b1 --- /dev/null +++ b/cpp/client/inc/Connection.h @@ -0,0 +1,105 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <map> +#include <string> + +#ifndef _Connection_ +#define _Connection_ + +#include "QpidError.h" +#include "Connector.h" +#include "ShutdownHandler.h" +#include "TimeoutHandler.h" + +#include "amqp_framing.h" +#include "Exchange.h" +#include "IncomingMessage.h" +#include "Message.h" +#include "MessageListener.h" +#include "Queue.h" +#include "ResponseHandler.h" + +namespace qpid { +namespace client { + + class Channel; + + class Connection : public virtual qpid::framing::InputHandler, + public virtual qpid::io::TimeoutHandler, + public virtual qpid::io::ShutdownHandler, + private virtual qpid::framing::BodyHandler{ + + typedef std::map<int, Channel*>::iterator iterator; + + static u_int16_t channelIdCounter; + + std::string host; + int port; + const u_int32_t max_frame_size; + std::map<int, Channel*> channels; + qpid::io::Connector* connector; + qpid::framing::OutputHandler* out; + ResponseHandler responses; + volatile bool closed; + + void channelException(Channel* channel, qpid::framing::AMQMethodBody* body, QpidError& e); + void error(int code, const string& msg, int classid = 0, int methodid = 0); + void closeChannel(Channel* channel, u_int16_t code, string& text, u_int16_t classId = 0, u_int16_t methodId = 0); + void sendAndReceive(qpid::framing::AMQFrame* frame, const qpid::framing::AMQMethodBody& body); + + virtual void handleMethod(qpid::framing::AMQMethodBody::shared_ptr body); + virtual void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr body); + virtual void handleContent(qpid::framing::AMQContentBody::shared_ptr body); + virtual void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body); + + public: + + Connection(bool debug = false, u_int32_t max_frame_size = 65536); + ~Connection(); + void open(const std::string& host, int port = 5672, + const std::string& uid = "guest", const std::string& pwd = "guest", + const std::string& virtualhost = "/"); + void close(); + void openChannel(Channel* channel); + /* + * Requests that the server close this channel, then removes + * the association to the channel from this connection + */ + void closeChannel(Channel* channel); + /* + * Removes the channel from association with this connection, + * without sending a close request to the server. + */ + void removeChannel(Channel* channel); + + virtual void received(qpid::framing::AMQFrame* frame); + + virtual void idleOut(); + virtual void idleIn(); + + virtual void shutdown(); + + inline u_int32_t getMaxFrameSize(){ return max_frame_size; } + }; + + +} +} + + +#endif diff --git a/cpp/client/inc/Exchange.h b/cpp/client/inc/Exchange.h new file mode 100644 index 0000000000..66593a41cc --- /dev/null +++ b/cpp/client/inc/Exchange.h @@ -0,0 +1,49 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <string> + +#ifndef _Exchange_ +#define _Exchange_ + +namespace qpid { +namespace client { + + class Exchange{ + const std::string name; + const std::string type; + + public: + + static const std::string DIRECT_EXCHANGE; + static const std::string TOPIC_EXCHANGE; + static const std::string HEADERS_EXCHANGE; + + static const Exchange DEFAULT_DIRECT_EXCHANGE; + static const Exchange DEFAULT_TOPIC_EXCHANGE; + static const Exchange DEFAULT_HEADERS_EXCHANGE; + + Exchange(std::string name, std::string type = DIRECT_EXCHANGE); + const std::string& getName() const; + const std::string& getType() const; + }; + +} +} + + +#endif diff --git a/cpp/client/inc/IncomingMessage.h b/cpp/client/inc/IncomingMessage.h new file mode 100644 index 0000000000..1fee6af433 --- /dev/null +++ b/cpp/client/inc/IncomingMessage.h @@ -0,0 +1,60 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <string> +#include <vector> +#include "amqp_framing.h" + +#ifndef _IncomingMessage_ +#define _IncomingMessage_ + +#include "Message.h" + +namespace qpid { +namespace client { + + class IncomingMessage{ + //content will be preceded by one of these method frames + qpid::framing::BasicDeliverBody::shared_ptr delivered; + qpid::framing::BasicReturnBody::shared_ptr returned; + qpid::framing::BasicGetOkBody::shared_ptr response; + qpid::framing::AMQHeaderBody::shared_ptr header; + std::vector<qpid::framing::AMQContentBody::shared_ptr> content; + + long contentSize(); + public: + IncomingMessage(qpid::framing::BasicDeliverBody::shared_ptr intro); + IncomingMessage(qpid::framing::BasicReturnBody::shared_ptr intro); + IncomingMessage(qpid::framing::BasicGetOkBody::shared_ptr intro); + ~IncomingMessage(); + void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header); + void addContent(qpid::framing::AMQContentBody::shared_ptr content); + bool isComplete(); + bool isReturn(); + bool isDelivery(); + bool isResponse(); + string& getConsumerTag();//only relevant if isDelivery() + qpid::framing::AMQHeaderBody::shared_ptr& getHeader(); + u_int64_t getDeliveryTag(); + void getData(string& data); + }; + +} +} + + +#endif diff --git a/cpp/client/inc/Message.h b/cpp/client/inc/Message.h new file mode 100644 index 0000000000..f8a5aef565 --- /dev/null +++ b/cpp/client/inc/Message.h @@ -0,0 +1,86 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <string> +#include "amqp_framing.h" + +#ifndef _Message_ +#define _Message_ + + +namespace qpid { +namespace client { + + class Message{ + qpid::framing::AMQHeaderBody::shared_ptr header; + string data; + bool redelivered; + u_int64_t deliveryTag; + + qpid::framing::BasicHeaderProperties* getHeaderProperties(); + Message(qpid::framing::AMQHeaderBody::shared_ptr& header); + public: + Message(); + ~Message(); + + inline std::string getData(){ return data; } + inline void setData(const std::string& data){ this->data = data; } + + inline bool isRedelivered(){ return redelivered; } + inline void setRedelivered(bool redelivered){ this->redelivered = redelivered; } + + inline u_int64_t getDeliveryTag(){ return deliveryTag; } + + std::string& getContentType(); + std::string& getContentEncoding(); + qpid::framing::FieldTable& getHeaders(); + u_int8_t getDeliveryMode(); + u_int8_t getPriority(); + std::string& getCorrelationId(); + std::string& getReplyTo(); + std::string& getExpiration(); + std::string& getMessageId(); + u_int64_t getTimestamp(); + std::string& getType(); + std::string& getUserId(); + std::string& getAppId(); + std::string& getClusterId(); + + void setContentType(std::string& type); + void setContentEncoding(std::string& encoding); + void setHeaders(qpid::framing::FieldTable& headers); + void setDeliveryMode(u_int8_t mode); + void setPriority(u_int8_t priority); + void setCorrelationId(std::string& correlationId); + void setReplyTo(std::string& replyTo); + void setExpiration(std::string& expiration); + void setMessageId(std::string& messageId); + void setTimestamp(u_int64_t timestamp); + void setType(std::string& type); + void setUserId(std::string& userId); + void setAppId(std::string& appId); + void setClusterId(std::string& clusterId); + + + friend class Channel; + }; + +} +} + + +#endif diff --git a/cpp/client/inc/MessageListener.h b/cpp/client/inc/MessageListener.h new file mode 100644 index 0000000000..47307a4df5 --- /dev/null +++ b/cpp/client/inc/MessageListener.h @@ -0,0 +1,37 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <string> + +#ifndef _MessageListener_ +#define _MessageListener_ + +#include "Message.h" + +namespace qpid { +namespace client { + + class MessageListener{ + public: + virtual void received(Message& msg) = 0; + }; + +} +} + + +#endif diff --git a/cpp/client/inc/Queue.h b/cpp/client/inc/Queue.h new file mode 100644 index 0000000000..e0964af774 --- /dev/null +++ b/cpp/client/inc/Queue.h @@ -0,0 +1,47 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <string> + +#ifndef _Queue_ +#define _Queue_ + +namespace qpid { +namespace client { + + class Queue{ + std::string name; + const bool autodelete; + const bool exclusive; + + public: + + Queue(); + Queue(std::string name); + Queue(std::string name, bool temp); + Queue(std::string name, bool autodelete, bool exclusive); + const std::string& getName() const; + void setName(const std::string&); + bool isAutoDelete() const; + bool isExclusive() const; + }; + +} +} + + +#endif diff --git a/cpp/client/inc/ResponseHandler.h b/cpp/client/inc/ResponseHandler.h new file mode 100644 index 0000000000..f5392c954d --- /dev/null +++ b/cpp/client/inc/ResponseHandler.h @@ -0,0 +1,49 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <string> +#include "amqp_framing.h" +#include "Monitor.h" + +#ifndef _ResponseHandler_ +#define _ResponseHandler_ + +namespace qpid { + namespace client { + + class ResponseHandler{ + bool waiting; + qpid::framing::AMQMethodBody::shared_ptr response; + qpid::concurrent::Monitor* monitor; + + public: + ResponseHandler(); + ~ResponseHandler(); + inline bool isWaiting(){ return waiting; } + inline qpid::framing::AMQMethodBody::shared_ptr getResponse(){ return response; } + bool validate(const qpid::framing::AMQMethodBody& expected); + void waitForResponse(); + void signalResponse(qpid::framing::AMQMethodBody::shared_ptr response); + void receive(const qpid::framing::AMQMethodBody& expected); + void expect();//must be called before calling receive + }; + + } +} + + +#endif diff --git a/cpp/client/inc/ReturnedMessageHandler.h b/cpp/client/inc/ReturnedMessageHandler.h new file mode 100644 index 0000000000..0117778fde --- /dev/null +++ b/cpp/client/inc/ReturnedMessageHandler.h @@ -0,0 +1,37 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <string> + +#ifndef _ReturnedMessageHandler_ +#define _ReturnedMessageHandler_ + +#include "Message.h" + +namespace qpid { +namespace client { + + class ReturnedMessageHandler{ + public: + virtual void returned(Message& msg) = 0; + }; + +} +} + + +#endif diff --git a/cpp/client/src/Channel.cpp b/cpp/client/src/Channel.cpp new file mode 100644 index 0000000000..e965f7e5dd --- /dev/null +++ b/cpp/client/src/Channel.cpp @@ -0,0 +1,432 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "Channel.h" +#include "MonitorImpl.h" +#include "ThreadFactoryImpl.h" +#include "Message.h" +#include "QpidError.h" + +using namespace std::tr1;//to use dynamic_pointer_cast +using namespace qpid::client; +using namespace qpid::framing; +using namespace qpid::concurrent; + +Channel::Channel(bool _transactional, u_int16_t _prefetch) : id(0), incoming(0), con(0), out(0), + prefetch(_prefetch), + transactional(_transactional), + dispatcher(0), + closed(true){ + threadFactory = new ThreadFactoryImpl(); + dispatchMonitor = new MonitorImpl(); + retrievalMonitor = new MonitorImpl(); +} + +Channel::~Channel(){ + if(dispatcher){ + stop(); + delete dispatcher; + } + delete retrievalMonitor; + delete dispatchMonitor; + delete threadFactory; +} + +void Channel::setPrefetch(u_int16_t prefetch){ + this->prefetch = prefetch; + if(con != 0 && out != 0){ + setQos(); + } +} + +void Channel::setQos(){ + sendAndReceive(new AMQFrame(id, new BasicQosBody(0, prefetch, false)), basic_qos_ok); + if(transactional){ + sendAndReceive(new AMQFrame(id, new TxSelectBody()), tx_select_ok); + } +} + +void Channel::declareExchange(Exchange& exchange, bool synch){ + string name = exchange.getName(); + string type = exchange.getType(); + FieldTable args; + AMQFrame* frame = new AMQFrame(id, new ExchangeDeclareBody(0, name, type, false, false, false, false, !synch, args)); + if(synch){ + sendAndReceive(frame, exchange_declare_ok); + }else{ + out->send(frame); + } +} + +void Channel::deleteExchange(Exchange& exchange, bool synch){ + string name = exchange.getName(); + AMQFrame* frame = new AMQFrame(id, new ExchangeDeleteBody(0, name, false, !synch)); + if(synch){ + sendAndReceive(frame, exchange_delete_ok); + }else{ + out->send(frame); + } +} + +void Channel::declareQueue(Queue& queue, bool synch){ + string name = queue.getName(); + FieldTable args; + AMQFrame* frame = new AMQFrame(id, new QueueDeclareBody(0, name, false, false, + queue.isExclusive(), + queue.isAutoDelete(), !synch, args)); + if(synch){ + sendAndReceive(frame, queue_declare_ok); + if(queue.getName().length() == 0){ + QueueDeclareOkBody::shared_ptr response = + dynamic_pointer_cast<QueueDeclareOkBody, AMQMethodBody>(responses.getResponse()); + queue.setName(response->getQueue()); + } + }else{ + out->send(frame); + } +} + +void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch){ + //ticket, queue, ifunused, ifempty, nowait + string name = queue.getName(); + AMQFrame* frame = new AMQFrame(id, new QueueDeleteBody(0, name, ifunused, ifempty, !synch)); + if(synch){ + sendAndReceive(frame, queue_delete_ok); + }else{ + out->send(frame); + } +} + +void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){ + string e = exchange.getName(); + string q = queue.getName(); + AMQFrame* frame = new AMQFrame(id, new QueueBindBody(0, q, e, (string&) key,!synch, (FieldTable&) args)); + if(synch){ + sendAndReceive(frame, queue_bind_ok); + }else{ + out->send(frame); + } +} + +void Channel::consume(Queue& queue, std::string& tag, MessageListener* listener, + int ackMode, bool noLocal, bool synch){ + + string q = queue.getName(); + AMQFrame* frame = new AMQFrame(id, new BasicConsumeBody(0, q, (string&) tag, noLocal, ackMode == NO_ACK, false, !synch)); + if(synch){ + sendAndReceive(frame, basic_consume_ok); + BasicConsumeOkBody::shared_ptr response = dynamic_pointer_cast<BasicConsumeOkBody, AMQMethodBody>(responses.getResponse()); + tag = response->getConsumerTag(); + }else{ + out->send(frame); + } + Consumer* c = new Consumer(); + c->listener = listener; + c->ackMode = ackMode; + c->lastDeliveryTag = 0; + consumers[tag] = c; +} + +void Channel::cancel(std::string& tag, bool synch){ + Consumer* c = consumers[tag]; + if(c->ackMode == LAZY_ACK && c->lastDeliveryTag > 0){ + out->send(new AMQFrame(id, new BasicAckBody(c->lastDeliveryTag, true))); + } + + AMQFrame* frame = new AMQFrame(id, new BasicCancelBody((string&) tag, !synch)); + if(synch){ + sendAndReceive(frame, basic_cancel_ok); + }else{ + out->send(frame); + } + consumers.erase(tag); + if(c != 0){ + delete c; + } +} + +void Channel::cancelAll(){ + int count(consumers.size()); + for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin()){ + Consumer* c = i->second; + if((c->ackMode == LAZY_ACK || c->ackMode == AUTO_ACK) && c->lastDeliveryTag > 0){ + out->send(new AMQFrame(id, new BasicAckBody(c->lastDeliveryTag, true))); + } + consumers.erase(i); + delete c; + } +} + +void Channel::retrieve(Message& msg){ + retrievalMonitor->acquire(); + while(retrieved == 0){ + retrievalMonitor->wait(); + } + + msg.header = retrieved->getHeader(); + msg.deliveryTag = retrieved->getDeliveryTag(); + retrieved->getData(msg.data); + delete retrieved; + retrieved = 0; + + retrievalMonitor->release(); +} + +bool Channel::get(Message& msg, const Queue& queue, int ackMode){ + string name = queue.getName(); + AMQFrame* frame = new AMQFrame(id, new BasicGetBody(0, name, ackMode)); + responses.expect(); + out->send(frame); + responses.waitForResponse(); + AMQMethodBody::shared_ptr response = responses.getResponse(); + if(basic_get_ok.match(response.get())){ + if(incoming != 0){ + std::cout << "Existing message not complete" << std::endl; + THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete"); + }else{ + incoming = new IncomingMessage(dynamic_pointer_cast<BasicGetOkBody, AMQMethodBody>(response)); + } + retrieve(msg); + return true; + }if(basic_get_empty.match(response.get())){ + return false; + }else{ + THROW_QPID_ERROR(PROTOCOL_ERROR + 500, "Unexpected response to basic.get."); + } +} + + +void Channel::publish(Message& msg, const Exchange& exchange, const std::string& routingKey, bool mandatory, bool immediate){ + string e = exchange.getName(); + string key = routingKey; + + out->send(new AMQFrame(id, new BasicPublishBody(0, e, key, mandatory, immediate))); + //break msg up into header frame and content frame(s) and send these + string data = msg.getData(); + msg.header->setContentSize(data.length()); + AMQBody::shared_ptr body(static_pointer_cast<AMQBody, AMQHeaderBody>(msg.header)); + out->send(new AMQFrame(id, body)); + + int data_length = data.length(); + if(data_length > 0){ + //TODO fragmentation of messages, need to know max frame size for connection + int frag_size = con->getMaxFrameSize() - 4; + if(data_length < frag_size){ + out->send(new AMQFrame(id, new AMQContentBody(data))); + }else{ + int frag_count = data_length / frag_size; + for(int i = 0; i < frag_count; i++){ + int pos = i*frag_size; + int len = i < frag_count - 1 ? frag_size : data_length - pos; + string frag(data.substr(pos, len)); + out->send(new AMQFrame(id, new AMQContentBody(frag))); + } + } + } +} + +void Channel::commit(){ + AMQFrame* frame = new AMQFrame(id, new TxCommitBody()); + sendAndReceive(frame, tx_commit_ok); +} + +void Channel::rollback(){ + AMQFrame* frame = new AMQFrame(id, new TxRollbackBody()); + sendAndReceive(frame, tx_rollback_ok); +} + +void Channel::handleMethod(AMQMethodBody::shared_ptr body){ + //channel.flow, channel.close, basic.deliver, basic.return or a response to a synchronous request + if(responses.isWaiting()){ + responses.signalResponse(body); + }else if(basic_deliver.match(body.get())){ + if(incoming != 0){ + std::cout << "Existing message not complete [deliveryTag=" << incoming->getDeliveryTag() << "]" << std::endl; + THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete"); + }else{ + incoming = new IncomingMessage(dynamic_pointer_cast<BasicDeliverBody, AMQMethodBody>(body)); + } + }else if(basic_return.match(body.get())){ + if(incoming != 0){ + std::cout << "Existing message not complete" << std::endl; + THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete"); + }else{ + incoming = new IncomingMessage(dynamic_pointer_cast<BasicReturnBody, AMQMethodBody>(body)); + } + }else if(channel_close.match(body.get())){ + con->removeChannel(this); + //need to signal application that channel has been closed through exception + + }else if(channel_flow.match(body.get())){ + + }else{ + //signal error + std::cout << "Unhandled method: " << *body << std::endl; + THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Unhandled method"); + } +} + +void Channel::handleHeader(AMQHeaderBody::shared_ptr body){ + if(incoming == 0){ + //handle invalid frame sequence + std::cout << "Invalid message sequence: got header before return or deliver." << std::endl; + THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got header before return or deliver."); + }else{ + incoming->setHeader(body); + if(incoming->isComplete()){ + enqueue(); + } + } +} + +void Channel::handleContent(AMQContentBody::shared_ptr body){ + if(incoming == 0){ + //handle invalid frame sequence + std::cout << "Invalid message sequence: got content before return or deliver." << std::endl; + THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got content before return or deliver."); + }else{ + incoming->addContent(body); + if(incoming->isComplete()){ + enqueue(); + } + } +} + +void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr body){ + THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Channel received heartbeat"); +} + +void Channel::start(){ + dispatcher = threadFactory->create(this); + dispatcher->start(); +} + +void Channel::stop(){ + closed = true; + dispatchMonitor->acquire(); + dispatchMonitor->notify(); + dispatchMonitor->release(); + if(dispatcher){ + dispatcher->join(); + } +} + +void Channel::run(){ + dispatch(); +} + +void Channel::enqueue(){ + if(incoming->isResponse()){ + retrievalMonitor->acquire(); + retrieved = incoming; + retrievalMonitor->notify(); + retrievalMonitor->release(); + }else{ + dispatchMonitor->acquire(); + messages.push(incoming); + dispatchMonitor->notify(); + dispatchMonitor->release(); + } + incoming = 0; +} + +IncomingMessage* Channel::dequeue(){ + dispatchMonitor->acquire(); + while(messages.empty() && !closed){ + dispatchMonitor->wait(); + } + IncomingMessage* msg = 0; + if(!messages.empty()){ + msg = messages.front(); + messages.pop(); + } + dispatchMonitor->release(); + return msg; +} + +void Channel::deliver(Consumer* consumer, Message& msg){ + //record delivery tag: + consumer->lastDeliveryTag = msg.getDeliveryTag(); + + //allow registered listener to handle the message + consumer->listener->received(msg); + + //if the handler calls close on the channel or connection while + //handling this message, then consumer will now have been deleted. + if(!closed){ + bool multiple(false); + switch(consumer->ackMode){ + case LAZY_ACK: + multiple = true; + if(++(consumer->count) < prefetch) break; + //else drop-through + case AUTO_ACK: + out->send(new AMQFrame(id, new BasicAckBody(msg.getDeliveryTag(), multiple))); + consumer->lastDeliveryTag = 0; + } + } + + //as it stands, transactionality is entirely orthogonal to ack + //mode, though the acks will not be processed by the broker under + //a transaction until it commits. +} + +void Channel::dispatch(){ + while(!closed){ + IncomingMessage* incomingMsg = dequeue(); + if(incomingMsg){ + //Note: msg is currently only valid for duration of this call + Message msg(incomingMsg->getHeader()); + incomingMsg->getData(msg.data); + if(incomingMsg->isReturn()){ + if(returnsHandler == 0){ + //print warning to log/console + std::cout << "Message returned: " << msg.getData() << std::endl; + }else{ + returnsHandler->returned(msg); + } + }else{ + msg.deliveryTag = incomingMsg->getDeliveryTag(); + std::string tag = incomingMsg->getConsumerTag(); + + if(consumers[tag] == 0){ + //signal error + std::cout << "Unknown consumer: " << tag << std::endl; + }else{ + deliver(consumers[tag], msg); + } + } + delete incomingMsg; + } + } +} + +void Channel::setReturnedMessageHandler(ReturnedMessageHandler* handler){ + returnsHandler = handler; +} + +void Channel::sendAndReceive(AMQFrame* frame, const AMQMethodBody& body){ + responses.expect(); + out->send(frame); + responses.receive(body); +} + +void Channel::close(){ + if(con != 0){ + con->closeChannel(this); + } +} diff --git a/cpp/client/src/Connection.cpp b/cpp/client/src/Connection.cpp new file mode 100644 index 0000000000..eeb2330561 --- /dev/null +++ b/cpp/client/src/Connection.cpp @@ -0,0 +1,237 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "Connection.h" +#include "Channel.h" +#include "ConnectorImpl.h" +#include "Message.h" +#include "QpidError.h" +#include <iostream> + +using namespace qpid::client; +using namespace qpid::framing; +using namespace qpid::io; +using namespace qpid::concurrent; + +u_int16_t Connection::channelIdCounter; + +Connection::Connection(bool debug, u_int32_t _max_frame_size) : max_frame_size(_max_frame_size), closed(true){ + connector = new ConnectorImpl(debug, _max_frame_size); +} + +Connection::~Connection(){ + delete connector; +} + +void Connection::open(const std::string& host, int port, const std::string& uid, const std::string& pwd, const std::string& virtualhost){ + this->host = host; + this->port = port; + connector->setInputHandler(this); + connector->setTimeoutHandler(this); + connector->setShutdownHandler(this); + out = connector->getOutputHandler(); + connector->connect(host, port); + + ProtocolInitiation* header = new ProtocolInitiation(8, 0); + responses.expect(); + connector->init(header); + responses.receive(connection_start); + + FieldTable props; + string mechanism("PLAIN"); + string response = ((char)0) + uid + ((char)0) + pwd; + string locale("en_US"); + responses.expect(); + out->send(new AMQFrame(0, new ConnectionStartOkBody(props, mechanism, response, locale))); + + /** + * Assume for now that further challenges will not be required + //receive connection.secure + responses.receive(connection_secure)); + //send connection.secure-ok + out->send(new AMQFrame(0, new ConnectionSecureOkBody(response))); + **/ + + responses.receive(connection_tune); + + ConnectionTuneBody::shared_ptr proposal = std::tr1::dynamic_pointer_cast<ConnectionTuneBody, AMQMethodBody>(responses.getResponse()); + out->send(new AMQFrame(0, new ConnectionTuneOkBody(proposal->getChannelMax(), max_frame_size, proposal->getHeartbeat()))); + + u_int16_t heartbeat = proposal->getHeartbeat(); + connector->setReadTimeout(heartbeat * 2); + connector->setWriteTimeout(heartbeat); + + //send connection.open + string capabilities; + string vhost = virtualhost; + responses.expect(); + out->send(new AMQFrame(0, new ConnectionOpenBody(vhost, capabilities, true))); + //receive connection.open-ok (or redirect, but ignore that for now esp. as using force=true). + responses.waitForResponse(); + if(responses.validate(connection_open_ok)){ + //ok + }else if(responses.validate(connection_redirect)){ + //ignore for now + ConnectionRedirectBody::shared_ptr redirect(std::tr1::dynamic_pointer_cast<ConnectionRedirectBody, AMQMethodBody>(responses.getResponse())); + std::cout << "Received redirection to " << redirect->getHost() << std::endl; + }else{ + THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response"); + } + +} + +void Connection::close(){ + if(!closed){ + u_int16_t code(200); + string text("Ok"); + u_int16_t classId(0); + u_int16_t methodId(0); + + sendAndReceive(new AMQFrame(0, new ConnectionCloseBody(code, text, classId, methodId)), connection_close_ok); + connector->close(); + } +} + +void Connection::openChannel(Channel* channel){ + channel->con = this; + channel->id = ++channelIdCounter; + channel->out = out; + channels[channel->id] = channel; + //now send frame to open channel and wait for response + string oob; + channel->sendAndReceive(new AMQFrame(channel->id, new ChannelOpenBody(oob)), channel_open_ok); + channel->setQos(); + channel->closed = false; +} + +void Connection::closeChannel(Channel* channel){ + //send frame to close channel + u_int16_t code(200); + string text("Ok"); + u_int16_t classId(0); + u_int16_t methodId(0); + closeChannel(channel, code, text, classId, methodId); +} + +void Connection::closeChannel(Channel* channel, u_int16_t code, string& text, u_int16_t classId, u_int16_t methodId){ + //send frame to close channel + channel->cancelAll(); + channel->closed = true; + channel->sendAndReceive(new AMQFrame(channel->id, new ChannelCloseBody(code, text, classId, methodId)), channel_close_ok); + channel->con = 0; + channel->out = 0; + removeChannel(channel); +} + +void Connection::removeChannel(Channel* channel){ + //send frame to close channel + + channels.erase(channel->id); + channel->out = 0; + channel->id = 0; + channel->con = 0; +} + +void Connection::received(AMQFrame* frame){ + u_int16_t channelId = frame->getChannel(); + + if(channelId == 0){ + this->handleBody(frame->getBody()); + }else{ + Channel* channel = channels[channelId]; + if(channel == 0){ + error(504, "Unknown channel"); + }else{ + try{ + channel->handleBody(frame->getBody()); + }catch(qpid::QpidError e){ + channelException(channel, dynamic_cast<AMQMethodBody*>(frame->getBody().get()), e); + } + } + } +} + +void Connection::handleMethod(AMQMethodBody::shared_ptr body){ + //connection.close, basic.deliver, basic.return or a response to a synchronous request + if(responses.isWaiting()){ + responses.signalResponse(body); + }else if(connection_close.match(body.get())){ + //send back close ok + //close socket + ConnectionCloseBody* request = dynamic_cast<ConnectionCloseBody*>(body.get()); + std::cout << "Connection closed by server: " << request->getReplyCode() << ":" << request->getReplyText() << std::endl; + connector->close(); + }else{ + std::cout << "Unhandled method for connection: " << *body << std::endl; + error(504, "Unrecognised method", body->amqpClassId(), body->amqpMethodId()); + } +} + +void Connection::handleHeader(AMQHeaderBody::shared_ptr body){ + error(504, "Channel error: received header body with channel 0."); +} + +void Connection::handleContent(AMQContentBody::shared_ptr body){ + error(504, "Channel error: received content body with channel 0."); +} + +void Connection::handleHeartbeat(AMQHeartbeatBody::shared_ptr body){ +} + +void Connection::sendAndReceive(AMQFrame* frame, const AMQMethodBody& body){ + responses.expect(); + out->send(frame); + responses.receive(body); +} + +void Connection::error(int code, const string& msg, int classid, int methodid){ + std::cout << "Connection exception generated: " << code << msg; + if(classid || methodid){ + std::cout << " [" << methodid << ":" << classid << "]"; + } + std::cout << std::endl; + sendAndReceive(new AMQFrame(0, new ConnectionCloseBody(code, (string&) msg, classid, methodid)), connection_close_ok); + connector->close(); +} + +void Connection::channelException(Channel* channel, AMQMethodBody* method, QpidError& e){ + std::cout << "Caught error from channel [" << e.code << "] " << e.msg << " (" << e.file << ":" << e.line << ")" << std::endl; + int code = e.code == PROTOCOL_ERROR ? e.code - PROTOCOL_ERROR : 500; + string msg = e.msg; + if(method == 0){ + closeChannel(channel, code, msg); + }else{ + closeChannel(channel, code, msg, method->amqpClassId(), method->amqpMethodId()); + } +} + +void Connection::idleIn(){ + std::cout << "Connection timed out due to abscence of heartbeat." << std::endl; + connector->close(); +} + +void Connection::idleOut(){ + out->send(new AMQFrame(0, new AMQHeartbeatBody())); +} + +void Connection::shutdown(){ + closed = true; + //close all channels + for(iterator i = channels.begin(); i != channels.end(); i++){ + i->second->stop(); + } +} diff --git a/cpp/client/src/Exchange.cpp b/cpp/client/src/Exchange.cpp new file mode 100644 index 0000000000..681068dc4c --- /dev/null +++ b/cpp/client/src/Exchange.cpp @@ -0,0 +1,30 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "Exchange.h" + +qpid::client::Exchange::Exchange(std::string _name, std::string _type) : name(_name), type(_type){} +const std::string& qpid::client::Exchange::getName() const { return name; } +const std::string& qpid::client::Exchange::getType() const { return type; } + +const std::string qpid::client::Exchange::DIRECT_EXCHANGE = "direct"; +const std::string qpid::client::Exchange::TOPIC_EXCHANGE = "topic"; +const std::string qpid::client::Exchange::HEADERS_EXCHANGE = "headers"; + +const qpid::client::Exchange qpid::client::Exchange::DEFAULT_DIRECT_EXCHANGE("amq.direct", DIRECT_EXCHANGE); +const qpid::client::Exchange qpid::client::Exchange::DEFAULT_TOPIC_EXCHANGE("amq.topic", TOPIC_EXCHANGE); +const qpid::client::Exchange qpid::client::Exchange::DEFAULT_HEADERS_EXCHANGE("amq.headers", HEADERS_EXCHANGE); diff --git a/cpp/client/src/IncomingMessage.cpp b/cpp/client/src/IncomingMessage.cpp new file mode 100644 index 0000000000..8e2604c4cb --- /dev/null +++ b/cpp/client/src/IncomingMessage.cpp @@ -0,0 +1,85 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "IncomingMessage.h" +#include "QpidError.h" +#include <iostream> + +using namespace qpid::client; +using namespace qpid::framing; + +IncomingMessage::IncomingMessage(BasicDeliverBody::shared_ptr intro) : delivered(intro){} +IncomingMessage::IncomingMessage(BasicReturnBody::shared_ptr intro): returned(intro){} +IncomingMessage::IncomingMessage(BasicGetOkBody::shared_ptr intro): response(intro){} + +IncomingMessage::~IncomingMessage(){ +} + +void IncomingMessage::setHeader(AMQHeaderBody::shared_ptr header){ + this->header = header; +} + +void IncomingMessage::addContent(AMQContentBody::shared_ptr content){ + this->content.push_back(content); +} + +bool IncomingMessage::isComplete(){ + return header != 0 && header->getContentSize() == contentSize(); +} + +bool IncomingMessage::isReturn(){ + return returned; +} + +bool IncomingMessage::isDelivery(){ + return delivered; +} + +bool IncomingMessage::isResponse(){ + return response; +} + +string& IncomingMessage::getConsumerTag(){ + if(!isDelivery()) THROW_QPID_ERROR(CLIENT_ERROR, "Consumer tag only valid for delivery"); + return delivered->getConsumerTag(); +} + +u_int64_t IncomingMessage::getDeliveryTag(){ + if(!isDelivery()) THROW_QPID_ERROR(CLIENT_ERROR, "Delivery tag only valid for delivery"); + return delivered->getDeliveryTag(); +} + +AMQHeaderBody::shared_ptr& IncomingMessage::getHeader(){ + return header; +} + +void IncomingMessage::getData(string& s){ + int count(content.size()); + for(int i = 0; i < count; i++){ + if(i == 0) s = content[i]->getData(); + else s += content[i]->getData(); + } +} + +long IncomingMessage::contentSize(){ + long size(0); + int count(content.size()); + for(int i = 0; i < count; i++){ + size += content[i]->size(); + } + return size; +} diff --git a/cpp/client/src/Message.cpp b/cpp/client/src/Message.cpp new file mode 100644 index 0000000000..71befe57b1 --- /dev/null +++ b/cpp/client/src/Message.cpp @@ -0,0 +1,147 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "Message.h" + +using namespace qpid::client; +using namespace qpid::framing; + +Message::Message(){ + header = AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC)); +} + +Message::Message(AMQHeaderBody::shared_ptr& _header) : header(_header){ +} + +Message::~Message(){ +} + +BasicHeaderProperties* Message::getHeaderProperties(){ + return dynamic_cast<BasicHeaderProperties*>(header->getProperties()); +} + +std::string& Message::getContentType(){ + return getHeaderProperties()->getContentType(); +} + +std::string& Message::getContentEncoding(){ + return getHeaderProperties()->getContentEncoding(); +} + +FieldTable& Message::getHeaders(){ + return getHeaderProperties()->getHeaders(); +} + +u_int8_t Message::getDeliveryMode(){ + return getHeaderProperties()->getDeliveryMode(); +} + +u_int8_t Message::getPriority(){ + return getHeaderProperties()->getPriority(); +} + +std::string& Message::getCorrelationId(){ + return getHeaderProperties()->getCorrelationId(); +} + +std::string& Message::getReplyTo(){ + return getHeaderProperties()->getReplyTo(); +} + +std::string& Message::getExpiration(){ + return getHeaderProperties()->getExpiration(); +} + +std::string& Message::getMessageId(){ + return getHeaderProperties()->getMessageId(); +} + +u_int64_t Message::getTimestamp(){ + return getHeaderProperties()->getTimestamp(); +} + +std::string& Message::getType(){ + return getHeaderProperties()->getType(); +} + +std::string& Message::getUserId(){ + return getHeaderProperties()->getUserId(); +} + +std::string& Message::getAppId(){ + return getHeaderProperties()->getAppId(); +} + +std::string& Message::getClusterId(){ + return getHeaderProperties()->getClusterId(); +} + +void Message::setContentType(std::string& type){ + getHeaderProperties()->setContentType(type); +} + +void Message::setContentEncoding(std::string& encoding){ + getHeaderProperties()->setContentEncoding(encoding); +} + +void Message::setHeaders(FieldTable& headers){ + getHeaderProperties()->setHeaders(headers); +} + +void Message::setDeliveryMode(u_int8_t mode){ + getHeaderProperties()->setDeliveryMode(mode); +} + +void Message::setPriority(u_int8_t priority){ + getHeaderProperties()->setPriority(priority); +} + +void Message::setCorrelationId(std::string& correlationId){ + getHeaderProperties()->setCorrelationId(correlationId); +} + +void Message::setReplyTo(std::string& replyTo){ + getHeaderProperties()->setReplyTo(replyTo); +} + +void Message::setExpiration(std::string& expiration){ + getHeaderProperties()->setExpiration(expiration); +} + +void Message::setMessageId(std::string& messageId){ + getHeaderProperties()->setMessageId(messageId); +} + +void Message::setTimestamp(u_int64_t timestamp){ + getHeaderProperties()->setTimestamp(timestamp); +} + +void Message::setType(std::string& type){ + getHeaderProperties()->setType(type); +} + +void Message::setUserId(std::string& userId){ + getHeaderProperties()->setUserId(userId); +} + +void Message::setAppId(std::string& appId){ + getHeaderProperties()->setAppId(appId); +} + +void Message::setClusterId(std::string& clusterId){ + getHeaderProperties()->setClusterId(clusterId); +} diff --git a/cpp/client/src/Queue.cpp b/cpp/client/src/Queue.cpp new file mode 100644 index 0000000000..cb957dd993 --- /dev/null +++ b/cpp/client/src/Queue.cpp @@ -0,0 +1,47 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "Queue.h" + +qpid::client::Queue::Queue() : name(""), autodelete(true), exclusive(true){} + +qpid::client::Queue::Queue(std::string _name) : name(_name), autodelete(false), exclusive(false){} + +qpid::client::Queue::Queue(std::string _name, bool temp) : name(_name), autodelete(temp), exclusive(temp){} + +qpid::client::Queue::Queue(std::string _name, bool _autodelete, bool _exclusive) + : name(_name), autodelete(_autodelete), exclusive(_exclusive){} + +const std::string& qpid::client::Queue::getName() const{ + return name; +} + +void qpid::client::Queue::setName(const std::string& name){ + this->name = name; +} + +bool qpid::client::Queue::isAutoDelete() const{ + return autodelete; +} + +bool qpid::client::Queue::isExclusive() const{ + return exclusive; +} + + + + diff --git a/cpp/client/src/ResponseHandler.cpp b/cpp/client/src/ResponseHandler.cpp new file mode 100644 index 0000000000..837bba37fd --- /dev/null +++ b/cpp/client/src/ResponseHandler.cpp @@ -0,0 +1,63 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "ResponseHandler.h" +#include "MonitorImpl.h" +#include "QpidError.h" + +qpid::client::ResponseHandler::ResponseHandler() : waiting(false){ + monitor = new qpid::concurrent::MonitorImpl(); +} + +qpid::client::ResponseHandler::~ResponseHandler(){ + delete monitor; +} + +bool qpid::client::ResponseHandler::validate(const qpid::framing::AMQMethodBody& expected){ + return expected.match(response.get()); +} + +void qpid::client::ResponseHandler::waitForResponse(){ + monitor->acquire(); + if(waiting){ + monitor->wait(); + } + monitor->release(); +} + +void qpid::client::ResponseHandler::signalResponse(qpid::framing::AMQMethodBody::shared_ptr response){ + this->response = response; + monitor->acquire(); + waiting = false; + monitor->notify(); + monitor->release(); +} + +void qpid::client::ResponseHandler::receive(const qpid::framing::AMQMethodBody& expected){ + monitor->acquire(); + if(waiting){ + monitor->wait(); + } + monitor->release(); + if(!validate(expected)){ + THROW_QPID_ERROR(PROTOCOL_ERROR, "Protocol Error"); + } +} + +void qpid::client::ResponseHandler::expect(){ + waiting = true; +} diff --git a/cpp/client/test/Makefile b/cpp/client/test/Makefile new file mode 100644 index 0000000000..f35aab3e17 --- /dev/null +++ b/cpp/client/test/Makefile @@ -0,0 +1,45 @@ +# +# Copyright (c) 2006 The Apache Software Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +QPID_HOME = ../../.. +include ${QPID_HOME}/cpp/options.mk + +# TODO aconway 2006-09-12: These are system tests, not unit tests. +# We need client side unit tests. +# We should separate them from the system tets. +# We need an approach to automate the C++ client/server system tests. +# + +SOURCES=$(wildcard *.cpp) +TESTS=$(SOURCES:.cpp=) +DEPS= $(SOURCES:.cpp=.d) + +INCLUDES = $(TEST_INCLUDES) +LDLIBS= -lapr-1 $(COMMON_LIB) $(CLIENT_LIB) + +.PHONY: all clean + +all: $(TESTS) + +clean: + -@rm -f $(TESTS) $(DEPS) + +# Rule to build test programs. +%: %.cpp + $(CXX) -o $@ $< $(CXXFLAGS) $(LDFLAGS) $(LDLIBS) + +# Dependencies +-include $(DEPS) diff --git a/cpp/client/test/client_test.cpp b/cpp/client/test/client_test.cpp new file mode 100644 index 0000000000..e33beb3b67 --- /dev/null +++ b/cpp/client/test/client_test.cpp @@ -0,0 +1,97 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <iostream> + +#include "QpidError.h" +#include "Channel.h" +#include "Connection.h" +#include "FieldTable.h" +#include "Message.h" +#include "MessageListener.h" + +#include "MonitorImpl.h" + + +using namespace qpid::client; +using namespace qpid::concurrent; + +class SimpleListener : public virtual MessageListener{ + Monitor* monitor; + +public: + inline SimpleListener(Monitor* _monitor) : monitor(_monitor){} + + inline virtual void received(Message& msg){ + std::cout << "Received message " /**<< msg **/<< std::endl; + monitor->acquire(); + monitor->notify(); + monitor->release(); + } +}; + +int main(int argc, char** argv) +{ + try{ + Connection con(argc > 1); + Channel channel; + Exchange exchange("MyExchange", Exchange::TOPIC_EXCHANGE); + Queue queue("MyQueue", true); + + string host("localhost"); + + con.open(host); + std::cout << "Opened connection." << std::endl; + con.openChannel(&channel); + std::cout << "Opened channel." << std::endl; + channel.declareExchange(exchange); + std::cout << "Declared exchange." << std::endl; + channel.declareQueue(queue); + std::cout << "Declared queue." << std::endl; + qpid::framing::FieldTable args; + channel.bind(exchange, queue, "MyTopic", args); + std::cout << "Bound queue to exchange." << std::endl; + + //set up a message listener + MonitorImpl monitor; + SimpleListener listener(&monitor); + string tag("MyTag"); + channel.consume(queue, tag, &listener); + channel.start(); + std::cout << "Registered consumer." << std::endl; + + Message msg; + string data("MyMessage"); + msg.setData(data); + channel.publish(msg, exchange, "MyTopic"); + std::cout << "Published message." << std::endl; + + monitor.acquire(); + monitor.wait(); + monitor.release(); + + + con.closeChannel(&channel); + std::cout << "Closed channel." << std::endl; + con.close(); + std::cout << "Closed connection." << std::endl; + }catch(qpid::QpidError error){ + std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; + return 1; + } + return 0; +} diff --git a/cpp/client/test/topic_listener.cpp b/cpp/client/test/topic_listener.cpp new file mode 100644 index 0000000000..707b3443a1 --- /dev/null +++ b/cpp/client/test/topic_listener.cpp @@ -0,0 +1,180 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <iostream> +#include <sstream> +#include "apr_time.h" +#include "QpidError.h" +#include "Channel.h" +#include "Connection.h" +#include "Exchange.h" +#include "MessageListener.h" +#include "Queue.h" + +using namespace qpid::client; + +class Listener : public MessageListener{ + Channel* const channel; + const std::string responseQueue; + const bool transactional; + bool init; + int count; + apr_time_t start; + + void shutdown(); + void report(); +public: + Listener(Channel* channel, const std::string& reponseQueue, bool tx); + virtual void received(Message& msg); +}; + +class Args{ + string host; + int port; + int ackMode; + bool transactional; + int prefetch; + bool trace; + bool help; +public: + inline Args() : host("localhost"), port(5672), ackMode(NO_ACK), transactional(false), prefetch(1000), trace(false), help(false){} + void parse(int argc, char** argv); + void usage(); + + inline const string& getHost() const { return host;} + inline int getPort() const { return port; } + inline int getAckMode(){ return ackMode; } + inline bool getTransactional() const { return transactional; } + inline int getPrefetch(){ return prefetch; } + inline bool getTrace() const { return trace; } + inline bool getHelp() const { return help; } +}; + +int main(int argc, char** argv){ + Args args; + args.parse(argc, argv); + if(args.getHelp()){ + args.usage(); + }else{ + try{ + Connection connection(args.getTrace()); + connection.open(args.getHost(), args.getPort()); + Channel channel(args.getTransactional(), args.getPrefetch()); + connection.openChannel(&channel); + + //declare exchange, queue and bind them: + Queue response("response"); + channel.declareQueue(response); + + Queue control; + channel.declareQueue(control); + qpid::framing::FieldTable bindArgs; + channel.bind(Exchange::DEFAULT_TOPIC_EXCHANGE, control, "topic_control", bindArgs); + //set up listener + Listener listener(&channel, response.getName(), args.getTransactional()); + std::string tag; + channel.consume(control, tag, &listener, args.getAckMode()); + channel.run(); + connection.close(); + }catch(qpid::QpidError error){ + std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; + } + } +} + +Listener::Listener(Channel* _channel, const std::string& _responseq, bool tx) : + channel(_channel), responseQueue(_responseq), transactional(tx), init(false), count(0){} + +void Listener::received(Message& message){ + if(!init){ + start = apr_time_as_msec(apr_time_now()); + count = 0; + init = true; + } + std::string type(message.getHeaders().getString("TYPE")); + + if(type == "TERMINATION_REQUEST"){ + shutdown(); + }else if(type == "REPORT_REQUEST"){ + //send a report: + report(); + init = false; + }else if (++count % 100 == 0){ + std::cout <<"Received " << count << " messages." << std::endl; + } +} + +void Listener::shutdown(){ + channel->close(); +} + +void Listener::report(){ + apr_time_t finish = apr_time_as_msec(apr_time_now()); + apr_time_t time = finish - start; + std::stringstream report; + report << "Received " << count << " messages in " << time << " ms."; + Message msg; + msg.setData(report.str()); + channel->publish(msg, Exchange::DEFAULT_DIRECT_EXCHANGE, responseQueue); + if(transactional){ + channel->commit(); + } +} + + +void Args::parse(int argc, char** argv){ + for(int i = 1; i < argc; i++){ + string name(argv[i]); + if("-help" == name){ + help = true; + break; + }else if("-host" == name){ + host = argv[++i]; + }else if("-port" == name){ + port = atoi(argv[++i]); + }else if("-ack_mode" == name){ + ackMode = atoi(argv[++i]); + }else if("-transactional" == name){ + transactional = true; + }else if("-prefetch" == name){ + prefetch = atoi(argv[++i]); + }else if("-trace" == name){ + trace = true; + }else{ + std::cout << "Warning: unrecognised option " << name << std::endl; + } + } +} + +void Args::usage(){ + std::cout << "Options:" << std::endl; + std::cout << " -help" << std::endl; + std::cout << " Prints this usage message" << std::endl; + std::cout << " -host <host>" << std::endl; + std::cout << " Specifies host to connect to (default is localhost)" << std::endl; + std::cout << " -port <port>" << std::endl; + std::cout << " Specifies port to conect to (default is 5762)" << std::endl; + std::cout << " -ack_mode <mode>" << std::endl; + std::cout << " Sets the acknowledgement mode" << std::endl; + std::cout << " 0=NO_ACK (default), 1=AUTO_ACK, 2=LAZY_ACK" << std::endl; + std::cout << " -transactional" << std::endl; + std::cout << " Indicates the client should use transactions" << std::endl; + std::cout << " -prefetch <count>" << std::endl; + std::cout << " Specifies the prefetch count (default is 1000)" << std::endl; + std::cout << " -trace" << std::endl; + std::cout << " Indicates that the frames sent and received should be logged" << std::endl; +} diff --git a/cpp/client/test/topic_publisher.cpp b/cpp/client/test/topic_publisher.cpp new file mode 100644 index 0000000000..fc6b7f3b30 --- /dev/null +++ b/cpp/client/test/topic_publisher.cpp @@ -0,0 +1,253 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <iostream> +#include <cstdlib> +#include "unistd.h" +#include "apr_time.h" +#include "MonitorImpl.h" +#include "QpidError.h" +#include "Channel.h" +#include "Connection.h" +#include "Exchange.h" +#include "MessageListener.h" +#include "Queue.h" + +using namespace qpid::client; +using namespace qpid::concurrent; + +class Publisher : public MessageListener{ + Channel* const channel; + const std::string controlTopic; + const bool transactional; + MonitorImpl monitor; + int count; + + void waitForCompletion(int msgs); + string generateData(int size); + +public: + Publisher(Channel* channel, const std::string& controlTopic, bool tx); + virtual void received(Message& msg); + apr_time_t publish(int msgs, int listeners, int size); + void terminate(); +}; + +class Args{ + string host; + int port; + int messages; + int subscribers; + int ackMode; + bool transactional; + int prefetch; + int batches; + int delay; + int size; + bool trace; + bool help; +public: + inline Args() : host("localhost"), port(5672), messages(1000), subscribers(1), + ackMode(NO_ACK), transactional(false), prefetch(1000), batches(1), + delay(0), size(256), trace(false), help(false){} + + void parse(int argc, char** argv); + void usage(); + + inline const string& getHost() const { return host;} + inline int getPort() const { return port; } + inline int getMessages() const { return messages; } + inline int getSubscribers() const { return subscribers; } + inline int getAckMode(){ return ackMode; } + inline bool getTransactional() const { return transactional; } + inline int getPrefetch(){ return prefetch; } + inline int getBatches(){ return batches; } + inline int getDelay(){ return delay; } + inline int getSize(){ return size; } + inline bool getTrace() const { return trace; } + inline bool getHelp() const { return help; } +}; + +int main(int argc, char** argv){ + Args args; + args.parse(argc, argv); + if(args.getHelp()){ + args.usage(); + }else{ + try{ + Connection connection(args.getTrace()); + connection.open(args.getHost(), args.getPort()); + Channel channel(args.getTransactional(), args.getPrefetch()); + connection.openChannel(&channel); + + //declare queue (relying on default binding): + Queue response("response"); + channel.declareQueue(response); + + //set up listener + Publisher publisher(&channel, "topic_control", args.getTransactional()); + std::string tag("mytag"); + channel.consume(response, tag, &publisher, args.getAckMode()); + channel.start(); + + int batchSize(args.getBatches()); + apr_time_t max(0); + apr_time_t min(0); + apr_time_t sum(0); + for(int i = 0; i < batchSize; i++){ + if(i > 0 && args.getDelay()) sleep(args.getDelay()); + apr_time_t time = publisher.publish(args.getMessages(), args.getSubscribers(), args.getSize()); + if(!max || time > max) max = time; + if(!min || time < min) min = time; + sum += time; + std::cout << "Completed " << (i+1) << " of " << batchSize << " in " << time << "ms" << std::endl; + } + publisher.terminate(); + apr_time_t avg = sum / batchSize; + if(batchSize > 1){ + std::cout << batchSize << " batches completed. avg=" << avg << + ", max=" << max << ", min=" << min << std::endl; + } + channel.close(); + connection.close(); + }catch(qpid::QpidError error){ + std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; + } + } +} + +Publisher::Publisher(Channel* _channel, const std::string& _controlTopic, bool tx) : + channel(_channel), controlTopic(_controlTopic), transactional(tx){} + +void Publisher::received(Message& msg){ + //count responses and when all are received end the current batch + monitor.acquire(); + if(--count == 0){ + monitor.notify(); + } + std::cout << "Received report: " << msg.getData() << " (" << count << " remaining)." << std::endl; + monitor.release(); +} + +void Publisher::waitForCompletion(int msgs){ + count = msgs; + monitor.wait(); +} + +apr_time_t Publisher::publish(int msgs, int listeners, int size){ + monitor.acquire(); + Message msg; + msg.setData(generateData(size)); + apr_time_t start(apr_time_as_msec(apr_time_now())); + for(int i = 0; i < msgs; i++){ + channel->publish(msg, Exchange::DEFAULT_TOPIC_EXCHANGE, controlTopic); + } + //send report request + Message reportRequest; + reportRequest.getHeaders().setString("TYPE", "REPORT_REQUEST"); + channel->publish(reportRequest, Exchange::DEFAULT_TOPIC_EXCHANGE, controlTopic); + if(transactional){ + channel->commit(); + } + + waitForCompletion(listeners); + monitor.release(); + apr_time_t finish(apr_time_as_msec(apr_time_now())); + + return finish - start; +} + +string Publisher::generateData(int size){ + string data; + for(int i = 0; i < size; i++){ + data += ('A' + (i / 26)); + } + return data; +} + +void Publisher::terminate(){ + //send termination request + Message terminationRequest; + terminationRequest.getHeaders().setString("TYPE", "TERMINATION_REQUEST"); + channel->publish(terminationRequest, Exchange::DEFAULT_TOPIC_EXCHANGE, controlTopic); + if(transactional){ + channel->commit(); + } +} + +void Args::parse(int argc, char** argv){ + for(int i = 1; i < argc; i++){ + string name(argv[i]); + if("-help" == name){ + help = true; + break; + }else if("-host" == name){ + host = argv[++i]; + }else if("-port" == name){ + port = atoi(argv[++i]); + }else if("-messages" == name){ + messages = atoi(argv[++i]); + }else if("-subscribers" == name){ + subscribers = atoi(argv[++i]); + }else if("-ack_mode" == name){ + ackMode = atoi(argv[++i]); + }else if("-transactional" == name){ + transactional = true; + }else if("-prefetch" == name){ + prefetch = atoi(argv[++i]); + }else if("-batches" == name){ + batches = atoi(argv[++i]); + }else if("-delay" == name){ + delay = atoi(argv[++i]); + }else if("-size" == name){ + size = atoi(argv[++i]); + }else if("-trace" == name){ + trace = true; + }else{ + std::cout << "Warning: unrecognised option " << name << std::endl; + } + } +} + +void Args::usage(){ + std::cout << "Options:" << std::endl; + std::cout << " -help" << std::endl; + std::cout << " Prints this usage message" << std::endl; + std::cout << " -host <host>" << std::endl; + std::cout << " Specifies host to connect to (default is localhost)" << std::endl; + std::cout << " -port <port>" << std::endl; + std::cout << " Specifies port to conect to (default is 5762)" << std::endl; + std::cout << " -messages <count>" << std::endl; + std::cout << " Specifies how many messages to send" << std::endl; + std::cout << " -subscribers <count>" << std::endl; + std::cout << " Specifies how many subscribers to expect reports from" << std::endl; + std::cout << " -ack_mode <mode>" << std::endl; + std::cout << " Sets the acknowledgement mode" << std::endl; + std::cout << " 0=NO_ACK (default), 1=AUTO_ACK, 2=LAZY_ACK" << std::endl; + std::cout << " -transactional" << std::endl; + std::cout << " Indicates the client should use transactions" << std::endl; + std::cout << " -prefetch <count>" << std::endl; + std::cout << " Specifies the prefetch count (default is 1000)" << std::endl; + std::cout << " -batches <count>" << std::endl; + std::cout << " Specifies how many batches to run" << std::endl; + std::cout << " -delay <seconds>" << std::endl; + std::cout << " Causes a delay between each batch" << std::endl; + std::cout << " -size <bytes>" << std::endl; + std::cout << " Sets the size of the published messages (default is 256 bytes)" << std::endl; + std::cout << " -trace" << std::endl; + std::cout << " Indicates that the frames sent and received should be logged" << std::endl; +} |