summaryrefslogtreecommitdiff
path: root/cpp/client/inc
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2006-09-19 22:06:50 +0000
committerRafael H. Schloming <rhs@apache.org>2006-09-19 22:06:50 +0000
commit913489deb2ee9dbf44455de5f407ddaf4bd8c540 (patch)
tree7ea442d6867d0076f1c9ea4f4265664059e7aff5 /cpp/client/inc
downloadqpid-python-913489deb2ee9dbf44455de5f407ddaf4bd8c540.tar.gz
Import of qpid from etp:
URL: https://etp.108.redhat.com/svn/etp/trunk/blaze Repository Root: https://etp.108.redhat.com/svn/etp Repository UUID: 06e15bec-b515-0410-bef0-cc27a458cf48 Revision: 608 git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@447994 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/client/inc')
-rw-r--r--cpp/client/inc/Channel.h127
-rw-r--r--cpp/client/inc/Connection.h105
-rw-r--r--cpp/client/inc/Exchange.h49
-rw-r--r--cpp/client/inc/IncomingMessage.h60
-rw-r--r--cpp/client/inc/Message.h86
-rw-r--r--cpp/client/inc/MessageListener.h37
-rw-r--r--cpp/client/inc/Queue.h47
-rw-r--r--cpp/client/inc/ResponseHandler.h49
-rw-r--r--cpp/client/inc/ReturnedMessageHandler.h37
9 files changed, 597 insertions, 0 deletions
diff --git a/cpp/client/inc/Channel.h b/cpp/client/inc/Channel.h
new file mode 100644
index 0000000000..debecf922e
--- /dev/null
+++ b/cpp/client/inc/Channel.h
@@ -0,0 +1,127 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <map>
+#include <string>
+#include <queue>
+#include "sys/types.h"
+
+#ifndef _Channel_
+#define _Channel_
+
+#include "amqp_framing.h"
+
+#include "ThreadFactory.h"
+
+#include "Connection.h"
+#include "Exchange.h"
+#include "IncomingMessage.h"
+#include "Message.h"
+#include "MessageListener.h"
+#include "Queue.h"
+#include "ResponseHandler.h"
+#include "ReturnedMessageHandler.h"
+
+namespace qpid {
+namespace client {
+ enum ack_modes {NO_ACK=0, AUTO_ACK=1, LAZY_ACK=2, CLIENT_ACK=3};
+
+ class Channel : private virtual qpid::framing::BodyHandler, public virtual qpid::concurrent::Runnable{
+ struct Consumer{
+ MessageListener* listener;
+ int ackMode;
+ int count;
+ u_int64_t lastDeliveryTag;
+ };
+ typedef std::map<string,Consumer*>::iterator consumer_iterator;
+
+ u_int16_t id;
+ Connection* con;
+ qpid::concurrent::ThreadFactory* threadFactory;
+ qpid::concurrent::Thread* dispatcher;
+ qpid::framing::OutputHandler* out;
+ IncomingMessage* incoming;
+ ResponseHandler responses;
+ std::queue<IncomingMessage*> messages;//holds returned messages or those delivered for a consume
+ IncomingMessage* retrieved;//holds response to basic.get
+ qpid::concurrent::Monitor* dispatchMonitor;
+ qpid::concurrent::Monitor* retrievalMonitor;
+ std::map<std::string, Consumer*> consumers;
+ ReturnedMessageHandler* returnsHandler;
+ bool closed;
+
+ u_int16_t prefetch;
+ const bool transactional;
+
+ void enqueue();
+ void retrieve(Message& msg);
+ IncomingMessage* dequeue();
+ void dispatch();
+ void stop();
+ void sendAndReceive(qpid::framing::AMQFrame* frame, const qpid::framing::AMQMethodBody& body);
+ void deliver(Consumer* consumer, Message& msg);
+ void setQos();
+ void cancelAll();
+
+ virtual void handleMethod(qpid::framing::AMQMethodBody::shared_ptr body);
+ virtual void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr body);
+ virtual void handleContent(qpid::framing::AMQContentBody::shared_ptr body);
+ virtual void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body);
+
+ public:
+ Channel(bool transactional = false, u_int16_t prefetch = 500);
+ ~Channel();
+
+ void declareExchange(Exchange& exchange, bool synch = true);
+ void deleteExchange(Exchange& exchange, bool synch = true);
+ void declareQueue(Queue& queue, bool synch = true);
+ void deleteQueue(Queue& queue, bool ifunused = false, bool ifempty = false, bool synch = true);
+ void bind(const Exchange& exchange, const Queue& queue, const std::string& key,
+ const qpid::framing::FieldTable& args, bool synch = true);
+ void consume(Queue& queue, std::string& tag, MessageListener* listener,
+ int ackMode = NO_ACK, bool noLocal = false, bool synch = true);
+ void cancel(std::string& tag, bool synch = true);
+ bool get(Message& msg, const Queue& queue, int ackMode = NO_ACK);
+ void publish(Message& msg, const Exchange& exchange, const std::string& routingKey,
+ bool mandatory = false, bool immediate = false);
+
+ void commit();
+ void rollback();
+
+ void setPrefetch(u_int16_t prefetch);
+
+ /**
+ * Start message dispatching on a new thread
+ */
+ void start();
+ /**
+ * Do message dispatching on this thread
+ */
+ void run();
+
+ void close();
+
+ void setReturnedMessageHandler(ReturnedMessageHandler* handler);
+
+ friend class Connection;
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/client/inc/Connection.h b/cpp/client/inc/Connection.h
new file mode 100644
index 0000000000..89169e92b1
--- /dev/null
+++ b/cpp/client/inc/Connection.h
@@ -0,0 +1,105 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <map>
+#include <string>
+
+#ifndef _Connection_
+#define _Connection_
+
+#include "QpidError.h"
+#include "Connector.h"
+#include "ShutdownHandler.h"
+#include "TimeoutHandler.h"
+
+#include "amqp_framing.h"
+#include "Exchange.h"
+#include "IncomingMessage.h"
+#include "Message.h"
+#include "MessageListener.h"
+#include "Queue.h"
+#include "ResponseHandler.h"
+
+namespace qpid {
+namespace client {
+
+ class Channel;
+
+ class Connection : public virtual qpid::framing::InputHandler,
+ public virtual qpid::io::TimeoutHandler,
+ public virtual qpid::io::ShutdownHandler,
+ private virtual qpid::framing::BodyHandler{
+
+ typedef std::map<int, Channel*>::iterator iterator;
+
+ static u_int16_t channelIdCounter;
+
+ std::string host;
+ int port;
+ const u_int32_t max_frame_size;
+ std::map<int, Channel*> channels;
+ qpid::io::Connector* connector;
+ qpid::framing::OutputHandler* out;
+ ResponseHandler responses;
+ volatile bool closed;
+
+ void channelException(Channel* channel, qpid::framing::AMQMethodBody* body, QpidError& e);
+ void error(int code, const string& msg, int classid = 0, int methodid = 0);
+ void closeChannel(Channel* channel, u_int16_t code, string& text, u_int16_t classId = 0, u_int16_t methodId = 0);
+ void sendAndReceive(qpid::framing::AMQFrame* frame, const qpid::framing::AMQMethodBody& body);
+
+ virtual void handleMethod(qpid::framing::AMQMethodBody::shared_ptr body);
+ virtual void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr body);
+ virtual void handleContent(qpid::framing::AMQContentBody::shared_ptr body);
+ virtual void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body);
+
+ public:
+
+ Connection(bool debug = false, u_int32_t max_frame_size = 65536);
+ ~Connection();
+ void open(const std::string& host, int port = 5672,
+ const std::string& uid = "guest", const std::string& pwd = "guest",
+ const std::string& virtualhost = "/");
+ void close();
+ void openChannel(Channel* channel);
+ /*
+ * Requests that the server close this channel, then removes
+ * the association to the channel from this connection
+ */
+ void closeChannel(Channel* channel);
+ /*
+ * Removes the channel from association with this connection,
+ * without sending a close request to the server.
+ */
+ void removeChannel(Channel* channel);
+
+ virtual void received(qpid::framing::AMQFrame* frame);
+
+ virtual void idleOut();
+ virtual void idleIn();
+
+ virtual void shutdown();
+
+ inline u_int32_t getMaxFrameSize(){ return max_frame_size; }
+ };
+
+
+}
+}
+
+
+#endif
diff --git a/cpp/client/inc/Exchange.h b/cpp/client/inc/Exchange.h
new file mode 100644
index 0000000000..66593a41cc
--- /dev/null
+++ b/cpp/client/inc/Exchange.h
@@ -0,0 +1,49 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <string>
+
+#ifndef _Exchange_
+#define _Exchange_
+
+namespace qpid {
+namespace client {
+
+ class Exchange{
+ const std::string name;
+ const std::string type;
+
+ public:
+
+ static const std::string DIRECT_EXCHANGE;
+ static const std::string TOPIC_EXCHANGE;
+ static const std::string HEADERS_EXCHANGE;
+
+ static const Exchange DEFAULT_DIRECT_EXCHANGE;
+ static const Exchange DEFAULT_TOPIC_EXCHANGE;
+ static const Exchange DEFAULT_HEADERS_EXCHANGE;
+
+ Exchange(std::string name, std::string type = DIRECT_EXCHANGE);
+ const std::string& getName() const;
+ const std::string& getType() const;
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/client/inc/IncomingMessage.h b/cpp/client/inc/IncomingMessage.h
new file mode 100644
index 0000000000..1fee6af433
--- /dev/null
+++ b/cpp/client/inc/IncomingMessage.h
@@ -0,0 +1,60 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <string>
+#include <vector>
+#include "amqp_framing.h"
+
+#ifndef _IncomingMessage_
+#define _IncomingMessage_
+
+#include "Message.h"
+
+namespace qpid {
+namespace client {
+
+ class IncomingMessage{
+ //content will be preceded by one of these method frames
+ qpid::framing::BasicDeliverBody::shared_ptr delivered;
+ qpid::framing::BasicReturnBody::shared_ptr returned;
+ qpid::framing::BasicGetOkBody::shared_ptr response;
+ qpid::framing::AMQHeaderBody::shared_ptr header;
+ std::vector<qpid::framing::AMQContentBody::shared_ptr> content;
+
+ long contentSize();
+ public:
+ IncomingMessage(qpid::framing::BasicDeliverBody::shared_ptr intro);
+ IncomingMessage(qpid::framing::BasicReturnBody::shared_ptr intro);
+ IncomingMessage(qpid::framing::BasicGetOkBody::shared_ptr intro);
+ ~IncomingMessage();
+ void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header);
+ void addContent(qpid::framing::AMQContentBody::shared_ptr content);
+ bool isComplete();
+ bool isReturn();
+ bool isDelivery();
+ bool isResponse();
+ string& getConsumerTag();//only relevant if isDelivery()
+ qpid::framing::AMQHeaderBody::shared_ptr& getHeader();
+ u_int64_t getDeliveryTag();
+ void getData(string& data);
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/client/inc/Message.h b/cpp/client/inc/Message.h
new file mode 100644
index 0000000000..f8a5aef565
--- /dev/null
+++ b/cpp/client/inc/Message.h
@@ -0,0 +1,86 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <string>
+#include "amqp_framing.h"
+
+#ifndef _Message_
+#define _Message_
+
+
+namespace qpid {
+namespace client {
+
+ class Message{
+ qpid::framing::AMQHeaderBody::shared_ptr header;
+ string data;
+ bool redelivered;
+ u_int64_t deliveryTag;
+
+ qpid::framing::BasicHeaderProperties* getHeaderProperties();
+ Message(qpid::framing::AMQHeaderBody::shared_ptr& header);
+ public:
+ Message();
+ ~Message();
+
+ inline std::string getData(){ return data; }
+ inline void setData(const std::string& data){ this->data = data; }
+
+ inline bool isRedelivered(){ return redelivered; }
+ inline void setRedelivered(bool redelivered){ this->redelivered = redelivered; }
+
+ inline u_int64_t getDeliveryTag(){ return deliveryTag; }
+
+ std::string& getContentType();
+ std::string& getContentEncoding();
+ qpid::framing::FieldTable& getHeaders();
+ u_int8_t getDeliveryMode();
+ u_int8_t getPriority();
+ std::string& getCorrelationId();
+ std::string& getReplyTo();
+ std::string& getExpiration();
+ std::string& getMessageId();
+ u_int64_t getTimestamp();
+ std::string& getType();
+ std::string& getUserId();
+ std::string& getAppId();
+ std::string& getClusterId();
+
+ void setContentType(std::string& type);
+ void setContentEncoding(std::string& encoding);
+ void setHeaders(qpid::framing::FieldTable& headers);
+ void setDeliveryMode(u_int8_t mode);
+ void setPriority(u_int8_t priority);
+ void setCorrelationId(std::string& correlationId);
+ void setReplyTo(std::string& replyTo);
+ void setExpiration(std::string& expiration);
+ void setMessageId(std::string& messageId);
+ void setTimestamp(u_int64_t timestamp);
+ void setType(std::string& type);
+ void setUserId(std::string& userId);
+ void setAppId(std::string& appId);
+ void setClusterId(std::string& clusterId);
+
+
+ friend class Channel;
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/client/inc/MessageListener.h b/cpp/client/inc/MessageListener.h
new file mode 100644
index 0000000000..47307a4df5
--- /dev/null
+++ b/cpp/client/inc/MessageListener.h
@@ -0,0 +1,37 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <string>
+
+#ifndef _MessageListener_
+#define _MessageListener_
+
+#include "Message.h"
+
+namespace qpid {
+namespace client {
+
+ class MessageListener{
+ public:
+ virtual void received(Message& msg) = 0;
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/client/inc/Queue.h b/cpp/client/inc/Queue.h
new file mode 100644
index 0000000000..e0964af774
--- /dev/null
+++ b/cpp/client/inc/Queue.h
@@ -0,0 +1,47 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <string>
+
+#ifndef _Queue_
+#define _Queue_
+
+namespace qpid {
+namespace client {
+
+ class Queue{
+ std::string name;
+ const bool autodelete;
+ const bool exclusive;
+
+ public:
+
+ Queue();
+ Queue(std::string name);
+ Queue(std::string name, bool temp);
+ Queue(std::string name, bool autodelete, bool exclusive);
+ const std::string& getName() const;
+ void setName(const std::string&);
+ bool isAutoDelete() const;
+ bool isExclusive() const;
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/client/inc/ResponseHandler.h b/cpp/client/inc/ResponseHandler.h
new file mode 100644
index 0000000000..f5392c954d
--- /dev/null
+++ b/cpp/client/inc/ResponseHandler.h
@@ -0,0 +1,49 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <string>
+#include "amqp_framing.h"
+#include "Monitor.h"
+
+#ifndef _ResponseHandler_
+#define _ResponseHandler_
+
+namespace qpid {
+ namespace client {
+
+ class ResponseHandler{
+ bool waiting;
+ qpid::framing::AMQMethodBody::shared_ptr response;
+ qpid::concurrent::Monitor* monitor;
+
+ public:
+ ResponseHandler();
+ ~ResponseHandler();
+ inline bool isWaiting(){ return waiting; }
+ inline qpid::framing::AMQMethodBody::shared_ptr getResponse(){ return response; }
+ bool validate(const qpid::framing::AMQMethodBody& expected);
+ void waitForResponse();
+ void signalResponse(qpid::framing::AMQMethodBody::shared_ptr response);
+ void receive(const qpid::framing::AMQMethodBody& expected);
+ void expect();//must be called before calling receive
+ };
+
+ }
+}
+
+
+#endif
diff --git a/cpp/client/inc/ReturnedMessageHandler.h b/cpp/client/inc/ReturnedMessageHandler.h
new file mode 100644
index 0000000000..0117778fde
--- /dev/null
+++ b/cpp/client/inc/ReturnedMessageHandler.h
@@ -0,0 +1,37 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <string>
+
+#ifndef _ReturnedMessageHandler_
+#define _ReturnedMessageHandler_
+
+#include "Message.h"
+
+namespace qpid {
+namespace client {
+
+ class ReturnedMessageHandler{
+ public:
+ virtual void returned(Message& msg) = 0;
+ };
+
+}
+}
+
+
+#endif