summaryrefslogtreecommitdiff
path: root/qpid/cpp/include/qpid/client
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/include/qpid/client')
-rw-r--r--qpid/cpp/include/qpid/client/AsyncSession.h38
-rw-r--r--qpid/cpp/include/qpid/client/ClientImportExport.h35
-rw-r--r--qpid/cpp/include/qpid/client/Completion.h71
-rw-r--r--qpid/cpp/include/qpid/client/Connection.h228
-rw-r--r--qpid/cpp/include/qpid/client/ConnectionSettings.h134
-rw-r--r--qpid/cpp/include/qpid/client/FailoverListener.h88
-rw-r--r--qpid/cpp/include/qpid/client/FailoverManager.h137
-rw-r--r--qpid/cpp/include/qpid/client/FlowControl.h75
-rw-r--r--qpid/cpp/include/qpid/client/Future.h59
-rw-r--r--qpid/cpp/include/qpid/client/FutureCompletion.h49
-rw-r--r--qpid/cpp/include/qpid/client/FutureResult.h49
-rw-r--r--qpid/cpp/include/qpid/client/Handle.h71
-rw-r--r--qpid/cpp/include/qpid/client/LocalQueue.h120
-rw-r--r--qpid/cpp/include/qpid/client/Message.h175
-rw-r--r--qpid/cpp/include/qpid/client/MessageListener.h101
-rw-r--r--qpid/cpp/include/qpid/client/MessageReplayTracker.h73
-rw-r--r--qpid/cpp/include/qpid/client/QueueOptions.h129
-rw-r--r--qpid/cpp/include/qpid/client/Session.h39
-rw-r--r--qpid/cpp/include/qpid/client/SessionBase_0_10.h110
-rw-r--r--qpid/cpp/include/qpid/client/Subscription.h123
-rw-r--r--qpid/cpp/include/qpid/client/SubscriptionManager.h292
-rw-r--r--qpid/cpp/include/qpid/client/SubscriptionSettings.h123
-rw-r--r--qpid/cpp/include/qpid/client/TypedResult.h65
23 files changed, 2384 insertions, 0 deletions
diff --git a/qpid/cpp/include/qpid/client/AsyncSession.h b/qpid/cpp/include/qpid/client/AsyncSession.h
new file mode 100644
index 0000000000..d91efeb4f1
--- /dev/null
+++ b/qpid/cpp/include/qpid/client/AsyncSession.h
@@ -0,0 +1,38 @@
+#ifndef QPID_CLIENT_ASYNCSESSION_H
+#define QPID_CLIENT_ASYNCSESSION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/client/AsyncSession_0_10.h"
+
+namespace qpid {
+namespace client {
+
+/**
+ * AsyncSession is an alias for Session_0_10
+ *
+ * \ingroup clientapi
+ */
+typedef AsyncSession_0_10 AsyncSession;
+
+}} // namespace qpid::client
+
+#endif /*!QPID_CLIENT_ASYNCSESSION_H*/
diff --git a/qpid/cpp/include/qpid/client/ClientImportExport.h b/qpid/cpp/include/qpid/client/ClientImportExport.h
new file mode 100644
index 0000000000..2a3a5a52e9
--- /dev/null
+++ b/qpid/cpp/include/qpid/client/ClientImportExport.h
@@ -0,0 +1,35 @@
+#ifndef QPID_CLIENT_IMPORT_EXPORT_H
+#define QPID_CLIENT_IMPORT_EXPORT_H
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qpid/ImportExport.h"
+
+#if defined(CLIENT_EXPORT) || defined (qpidclient_EXPORTS)
+# define QPID_CLIENT_EXTERN QPID_EXPORT
+# define QPID_CLIENT_CLASS_EXTERN QPID_CLASS_EXPORT
+# define QPID_CLIENT_INLINE_EXTERN QPID_INLINE_EXPORT
+#else
+# define QPID_CLIENT_EXTERN QPID_IMPORT
+# define QPID_CLIENT_CLASS_EXTERN QPID_CLASS_IMPORT
+# define QPID_CLIENT_INLINE_EXTERN QPID_INLINE_IMPORT
+#endif
+
+#endif
diff --git a/qpid/cpp/include/qpid/client/Completion.h b/qpid/cpp/include/qpid/client/Completion.h
new file mode 100644
index 0000000000..9546db9258
--- /dev/null
+++ b/qpid/cpp/include/qpid/client/Completion.h
@@ -0,0 +1,71 @@
+#ifndef QPID_CLIENT_COMPLETION_H
+#define QPID_CLIENT_COMPLETION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/client/Handle.h"
+#include "qpid/client/ClientImportExport.h"
+#include <string>
+
+namespace qpid {
+namespace client {
+
+class CompletionImpl;
+template <class T> class PrivateImplRef;
+
+/**
+ * Asynchronous commands that do not return a result will return a
+ * Completion. You can use the completion to wait for that specific
+ * command to complete.
+ *
+ *@see TypedResult
+ *
+ *\ingroup clientapi
+ */
+class QPID_CLIENT_CLASS_EXTERN Completion : public Handle<CompletionImpl>
+{
+public:
+ QPID_CLIENT_EXTERN Completion(CompletionImpl* = 0);
+ QPID_CLIENT_EXTERN Completion(const Completion&);
+ QPID_CLIENT_EXTERN ~Completion();
+ QPID_CLIENT_EXTERN Completion& operator=(const Completion&);
+
+ /** Wait for the asynchronous command that returned this
+ *Completion to complete.
+ *
+ *@exception If the command returns an error.
+ */
+ QPID_CLIENT_EXTERN void wait();
+ QPID_CLIENT_EXTERN bool isComplete();
+
+ protected:
+ QPID_CLIENT_EXTERN std::string getResult();
+
+ private:
+ typedef CompletionImpl Impl;
+ friend class PrivateImplRef<Completion>;
+};
+
+}}
+
+
+#endif /*!QPID_CLIENT_COMPLETION_H*/
diff --git a/qpid/cpp/include/qpid/client/Connection.h b/qpid/cpp/include/qpid/client/Connection.h
new file mode 100644
index 0000000000..2477bf4800
--- /dev/null
+++ b/qpid/cpp/include/qpid/client/Connection.h
@@ -0,0 +1,228 @@
+#ifndef QPID_CLIENT_CONNECTION_H
+#define QPID_CLIENT_CONNECTION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <map>
+#include <string>
+#include "qpid/client/Session.h"
+#include "qpid/client/ClientImportExport.h"
+#include "qpid/client/ConnectionSettings.h"
+#include "qpid/framing/ProtocolVersion.h"
+
+#include "boost/function.hpp"
+
+namespace qpid {
+
+struct Url;
+
+namespace client {
+
+class ConnectionImpl;
+
+/**
+ * Represents a connection to an AMQP broker. All communication is
+ * initiated by establishing a connection, then creating one or more
+ * Session objects using the connection. @see newSession()
+ *
+ * \ingroup clientapi
+ *
+ * Some methods use an AMQP 0-10 URL to specify connection parameters.
+ * This is defined in the AMQP 0-10 specification (http://jira.amqp.org/confluence/display/AMQP/AMQP+Specification).
+ *
+ * amqp_url = "amqp:" prot_addr_list
+ * prot_addr_list = [prot_addr ","]* prot_addr
+ * prot_addr = tcp_prot_addr | tls_prot_addr
+ *
+ * tcp_prot_addr = tcp_id tcp_addr
+ * tcp_id = "tcp:" | ""
+ * tcp_addr = [host [":" port] ]
+ * host = <as per http://www.ietf.org/rfc/rfc3986.txt>
+ * port = number]]>
+ *
+ */
+
+class QPID_CLIENT_CLASS_EXTERN Connection
+{
+ framing::ProtocolVersion version;
+
+ boost::function<void ()> failureCallback;
+
+
+ protected:
+ boost::shared_ptr<ConnectionImpl> impl;
+
+
+ public:
+ /**
+ * Creates a Connection object, but does not open the connection.
+ * @see open()
+ */
+ QPID_CLIENT_EXTERN Connection();
+
+ /**
+ * Destroys a Connection object but does not close the connection if it
+ * was open. @see close()
+ */
+ QPID_CLIENT_EXTERN ~Connection();
+
+ /**
+ * Opens a connection to a broker.
+ *
+ * @param host the host on which the broker is running.
+ *
+ * @param port the port on the which the broker is listening.
+ *
+ * @param uid the userid to connect with.
+ *
+ * @param pwd the password to connect with (currently SASL
+ * PLAIN is the only authentication method supported so this
+ * is sent in clear text).
+ *
+ * @param virtualhost the AMQP virtual host to use (virtual
+ * hosts, where implemented(!), provide namespace partitioning
+ * within a single broker).
+ */
+ QPID_CLIENT_EXTERN void open(const std::string& host, int port = 5672,
+ const std::string& uid = "guest",
+ const std::string& pwd = "guest",
+ const std::string& virtualhost = "/", uint16_t maxFrameSize=65535);
+
+ /**
+ * Opens a connection to a broker using a URL.
+ * If the URL contains multiple addresses, try each in turn
+ * till connection is successful.
+ *
+ * @url address of the broker to connect to.
+ *
+ * @param uid the userid to connect with.
+ *
+ * @param pwd the password to connect with (currently SASL
+ * PLAIN is the only authentication method supported so this
+ * is sent in clear text).
+ *
+ * @param virtualhost the AMQP virtual host to use (virtual
+ * hosts, where implemented(!), provide namespace partitioning
+ * within a single broker).
+ */
+ QPID_CLIENT_EXTERN void open(const Url& url,
+ const std::string& uid = "guest",
+ const std::string& pwd = "guest",
+ const std::string& virtualhost = "/", uint16_t maxFrameSize=65535);
+
+ /**
+ * Opens a connection to a broker using a URL.
+ * If the URL contains multiple addresses, try each in turn
+ * till connection is successful.
+ *
+ * @url address of the broker to connect to.
+ *
+ * @param settings used for any settings not provided by the URL.
+ * Settings provided by the url (e.g. host, port) are ignored.
+ */
+ QPID_CLIENT_EXTERN void open(const Url& url, const ConnectionSettings& settings);
+
+ /**
+ * Opens a connection to a broker.
+ *
+ * @param the settings to use (host, port etc). @see ConnectionSettings.
+ */
+ QPID_CLIENT_EXTERN void open(const ConnectionSettings& settings);
+
+ /**
+ * Close the connection.
+ *
+ * Any further use of this connection (without reopening it) will
+ * not succeed.
+ */
+ QPID_CLIENT_EXTERN void close();
+
+ /**
+ * Create a new session on this connection. Sessions allow
+ * multiple streams of work to be multiplexed over the same
+ * connection. The returned Session provides functions to send
+ * commands to the broker.
+ *
+ * Session functions are synchronous. In other words, a Session
+ * function will send a command to the broker and does not return
+ * until it receives the broker's response confirming the command
+ * was executed.
+ *
+ * AsyncSession provides asynchronous versions of the same
+ * functions. These functions send a command to the broker but do
+ * not wait for a response.
+ *
+ * You can convert a Session s into an AsyncSession as follows:
+ * @code
+ * #include <qpid/client/AsyncSession.h>
+ * AsyncSession as = async(s);
+ * @endcode
+ *
+ * You can execute a single command asynchronously will a Session s
+ * like ths:
+ * @code
+ * async(s).messageTransfer(...);
+ * @endcode
+ *
+ * Using an AsyncSession is faster for sending large numbers of
+ * commands, since each command is sent as soon as possible
+ * without waiting for the previous command to be confirmed.
+ *
+ * However with AsyncSession you cannot assume that a command has
+ * completed until you explicitly synchronize. The simplest way to
+ * do this is to call Session::sync() or AsyncSession::sync().
+ * Both of these functions wait for the broker to confirm all
+ * commands issued so far on the session.
+ *
+ *@param name: A name to identify the session. @see qpid::SessionId
+ * If the name is empty (the default) then a unique name will be
+ * chosen using a Universally-unique identifier (UUID) algorithm.
+ */
+ QPID_CLIENT_EXTERN Session newSession(const std::string& name=std::string(), uint32_t timeoutSeconds = 0);
+
+ /**
+ * Resume a suspended session. A session may be resumed
+ * on a different connection to the one that created it.
+ */
+ QPID_CLIENT_EXTERN void resume(Session& session);
+
+ QPID_CLIENT_EXTERN bool isOpen() const;
+
+ /** In a cluster, returns the initial set of known broker URLs
+ * at the time of connection.
+ */
+ QPID_CLIENT_EXTERN std::vector<Url> getInitialBrokers();
+
+ QPID_CLIENT_EXTERN void registerFailureCallback ( boost::function<void ()> fn );
+
+ /**
+ * Return the set of client negotiated settings
+ */
+ QPID_CLIENT_EXTERN const ConnectionSettings& getNegotiatedSettings();
+
+ friend struct ConnectionAccess; ///<@internal
+ friend class SessionBase_0_10; ///<@internal
+};
+
+}} // namespace qpid::client
+
+
+#endif /*!QPID_CLIENT_CONNECTION_H*/
diff --git a/qpid/cpp/include/qpid/client/ConnectionSettings.h b/qpid/cpp/include/qpid/client/ConnectionSettings.h
new file mode 100644
index 0000000000..2b6b86f891
--- /dev/null
+++ b/qpid/cpp/include/qpid/client/ConnectionSettings.h
@@ -0,0 +1,134 @@
+#ifndef QPID_CLIENT_CONNECTIONSETTINGS_H
+#define QPID_CLIENT_CONNECTIONSETTINGS_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/client/ClientImportExport.h"
+#include "qpid/sys/IntegerTypes.h"
+#include <string>
+
+namespace qpid {
+
+namespace sys {
+class Socket;
+}
+
+namespace client {
+
+/**
+ * Settings for a Connection.
+ */
+struct QPID_CLIENT_CLASS_EXTERN ConnectionSettings {
+
+ QPID_CLIENT_EXTERN ConnectionSettings();
+ QPID_CLIENT_EXTERN virtual ~ConnectionSettings();
+
+ /**
+ * Allows socket to be configured; default only sets tcp-nodelay
+ * based on the flag set. Can be overridden.
+ */
+ QPID_CLIENT_EXTERN virtual void configureSocket(qpid::sys::Socket&) const;
+
+ /**
+ * The protocol used for the connection (defaults to 'tcp')
+ */
+ std::string protocol;
+
+ /**
+ * The host (or ip address) to connect to (defaults to 'localhost').
+ */
+ std::string host;
+ /**
+ * The port to connect to (defaults to 5672).
+ */
+ uint16_t port;
+ /**
+ * Allows an AMQP 'virtual host' to be specified for the
+ * connection.
+ */
+ std::string virtualhost;
+
+ /**
+ * The username to use when authenticating the connection. If not
+ * specified the current users login is used if available.
+ */
+ std::string username;
+ /**
+ * The password to use when authenticating the connection.
+ */
+ std::string password;
+ /**
+ * The SASL mechanism to use when authenticating the connection;
+ * the options are currently PLAIN or ANONYMOUS.
+ */
+ std::string mechanism;
+ /**
+ * Allows a locale to be specified for the connection.
+ */
+ std::string locale;
+ /**
+ * Allows a heartbeat frequency to be specified
+ */
+ uint16_t heartbeat;
+ /**
+ * The maximum number of channels that the client will request for
+ * use on this connection.
+ */
+ uint16_t maxChannels;
+ /**
+ * The maximum frame size that the client will request for this
+ * connection.
+ */
+ uint16_t maxFrameSize;
+ /**
+ * Limit the size of the connections send buffer . The buffer
+ * is limited to bounds * maxFrameSize.
+ */
+ unsigned int bounds;
+ /**
+ * If true, TCP_NODELAY will be set for the connection.
+ */
+ bool tcpNoDelay;
+ /**
+ * SASL service name
+ */
+ std::string service;
+ /**
+ * Minimum acceptable strength of any SASL negotiated security
+ * layer. 0 means no security layer required.
+ */
+ unsigned int minSsf;
+ /**
+ * Maximum acceptable strength of any SASL negotiated security
+ * layer. 0 means no security layer allowed.
+ */
+ unsigned int maxSsf;
+ /**
+ * SSL cert-name for the connection. Overrides global SSL
+ * settings. Used only when a client connects to the broker.
+ */
+ std::string sslCertName;
+};
+
+}} // namespace qpid::client
+
+#endif /*!QPID_CLIENT_CONNECTIONSETTINGS_H*/
diff --git a/qpid/cpp/include/qpid/client/FailoverListener.h b/qpid/cpp/include/qpid/client/FailoverListener.h
new file mode 100644
index 0000000000..53c7c26211
--- /dev/null
+++ b/qpid/cpp/include/qpid/client/FailoverListener.h
@@ -0,0 +1,88 @@
+#ifndef QPID_CLIENT_FAILOVERLISTENER_H
+#define QPID_CLIENT_FAILOVERLISTENER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/client/ClientImportExport.h"
+#include "qpid/client/MessageListener.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/Session.h"
+#include "qpid/client/SubscriptionManager.h"
+#include "qpid/Url.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Runnable.h"
+#include "qpid/sys/Thread.h"
+#include <vector>
+
+namespace qpid {
+namespace client {
+
+
+/**
+ * Listen for updates from the amq.failover exchange.
+ *
+ * In a cluster, the amq.failover exchange provides updates whenever
+ * the cluster membership changes. This class subscribes to the
+ * failover exchange and providees the latest list of known brokers.
+ *
+ * You can also subscribe to amq.failover yourself and use
+ * FailoverListener::decode to extract a list of broker URLs from a
+ * failover exchange message.
+ */
+class QPID_CLIENT_CLASS_EXTERN FailoverListener : private MessageListener, private qpid::sys::Runnable
+{
+ public:
+ /** The name of the standard failover exchange amq.failover */
+ static QPID_CLIENT_EXTERN const std::string AMQ_FAILOVER;
+
+ /** Extract the broker list from a failover exchange message */
+ static QPID_CLIENT_EXTERN std::vector<Url> getKnownBrokers(const Message& m);
+
+ /** Subscribe to amq.failover exchange. */
+ QPID_CLIENT_EXTERN FailoverListener(Connection);
+
+ /** Subscribe to amq.failover exchange.
+ *@param useInitial If true use the connection's initial brokers as
+ * the initial value of getKnownBrokers
+ */
+ QPID_CLIENT_EXTERN FailoverListener(Connection, bool useInitial);
+
+ QPID_CLIENT_EXTERN ~FailoverListener();
+
+ /** Returns the latest list of known broker URLs. */
+ QPID_CLIENT_EXTERN std::vector<Url> getKnownBrokers() const;
+
+ private:
+ void received(Message& msg);
+ void run();
+ void init(bool);
+
+ mutable sys::Mutex lock;
+ Connection connection;
+ Session session;
+ SubscriptionManager subscriptions;
+ sys::Thread thread;
+ std::vector<Url> knownBrokers;
+};
+}} // namespace qpid::client
+
+#endif /*!QPID_CLIENT_FAILOVERLISTENER_H*/
diff --git a/qpid/cpp/include/qpid/client/FailoverManager.h b/qpid/cpp/include/qpid/client/FailoverManager.h
new file mode 100644
index 0000000000..d3a0dbc976
--- /dev/null
+++ b/qpid/cpp/include/qpid/client/FailoverManager.h
@@ -0,0 +1,137 @@
+#ifndef QPID_CLIENT_FAILOVERMANAGER_H
+#define QPID_CLIENT_FAILOVERMANAGER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/Exception.h"
+#include "qpid/client/AsyncSession.h"
+#include "qpid/client/ClientImportExport.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/ConnectionSettings.h"
+#include "qpid/client/FailoverListener.h"
+#include "qpid/sys/Monitor.h"
+#include <vector>
+
+namespace qpid {
+namespace client {
+
+struct CannotConnectException : qpid::Exception
+{
+ CannotConnectException(const std::string& m) : qpid::Exception(m) {}
+};
+
+/**
+ * Utility to manage failover.
+ */
+class QPID_CLIENT_CLASS_EXTERN FailoverManager
+{
+ public:
+ /**
+ * Interface to implement for doing work that can be resumed on
+ * failover
+ */
+ struct Command
+ {
+ /**
+ * This method will be called with isRetry=false when the
+ * command is first executed. The session to use for the work
+ * will be passed to the implementing class. If the connection
+ * fails while the execute call is in progress, the
+ * FailoverManager controlling the execution will re-establish
+ * a connection, open a new session and call back to the
+ * Command implementations execute method with the new session
+ * and isRetry=true.
+ */
+ virtual void execute(AsyncSession& session, bool isRetry) = 0;
+ virtual ~Command() {}
+ };
+
+ struct ReconnectionStrategy
+ {
+ /**
+ * This method is called by the FailoverManager prior to
+ * establishing a connection (or re-connection) and can be
+ * used if the application wishes to edit or re-order the list
+ * which will default to the list of known brokers for the
+ * last connection.
+ */
+ virtual void editUrlList(std::vector<Url>& urls) = 0;
+ virtual ~ReconnectionStrategy() {}
+ };
+
+ /**
+ * Create a manager to control failover for a logical connection.
+ *
+ * @param settings the initial connection settings
+ * @param strategy optional stratgey callback allowing application
+ * to edit or reorder the list of urls to which reconnection is
+ * attempted
+ */
+ QPID_CLIENT_EXTERN FailoverManager(const ConnectionSettings& settings, ReconnectionStrategy* strategy = 0);
+ /**
+ * Return the current connection if open or attept to reconnect to
+ * the specified list of urls. If no list is specified the list of
+ * known brokers from the last connection will be used. If no list
+ * is specified and this is the first connect attempt, the host
+ * and port from the initial settings will be used.
+ *
+ * If the full list is tried and all attempts fail,
+ * CannotConnectException is thrown.
+ */
+ QPID_CLIENT_EXTERN Connection& connect(std::vector<Url> brokers = std::vector<Url>());
+ /**
+ * Return the current connection whether open or not
+ */
+ QPID_CLIENT_EXTERN Connection& getConnection();
+ /**
+ * Close the current connection
+ */
+ QPID_CLIENT_EXTERN void close();
+ /**
+ * Reliably execute the specified command. This involves creating
+ * a session on which to carry out the work of the command,
+ * handling failover occuring while exeuting that command and
+ * re-starting the work.
+ *
+ * Multiple concurrent threads can call execute with different
+ * commands; each thread will be allocated its own
+ * session. FailoverManager will coordinate the different threads
+ * on failover to ensure they continue to use the same logical
+ * connection.
+ */
+ QPID_CLIENT_EXTERN void execute(Command&);
+ private:
+ enum State {IDLE, CONNECTING, CANT_CONNECT};
+
+ qpid::sys::Monitor lock;
+ Connection connection;
+ std::auto_ptr<FailoverListener> failoverListener;
+ ConnectionSettings settings;
+ ReconnectionStrategy* strategy;
+ State state;
+
+ void attempt(Connection&, ConnectionSettings settings, std::vector<Url> urls);
+ void attempt(Connection&, ConnectionSettings settings);
+};
+}} // namespace qpid::client
+
+#endif /*!QPID_CLIENT_FAILOVERMANAGER_H*/
diff --git a/qpid/cpp/include/qpid/client/FlowControl.h b/qpid/cpp/include/qpid/client/FlowControl.h
new file mode 100644
index 0000000000..bff7071b3b
--- /dev/null
+++ b/qpid/cpp/include/qpid/client/FlowControl.h
@@ -0,0 +1,75 @@
+#ifndef QPID_CLIENT_FLOWCONTROL_H
+#define QPID_CLIENT_FLOWCONTROL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/sys/IntegerTypes.h>
+
+namespace qpid {
+namespace client {
+
+/**
+ * Flow control works by associating a finite amount of "credit"
+ * with a subscription.
+ *
+ * Credit includes a message count and a byte count. Each message
+ * received decreases the message count by one, and the byte count by
+ * the size of the message. Either count can have the special value
+ * UNLIMITED which is never decreased.
+ *
+ * A subscription's credit is exhausted when the message count is 0 or
+ * the byte count is too small for the next available message. The
+ * subscription will not receive any further messages until is credit
+ * is renewed.
+ *
+ * In "window mode" credit is automatically renewed when a message is
+ * completed (which by default happens when it is accepted). In
+ * non-window mode credit is not automatically renewed, it must be
+ * explicitly re-set (@see Subscription)
+ */
+struct FlowControl {
+ static const uint32_t UNLIMITED=0xFFFFFFFF;
+ FlowControl(uint32_t messages_=0, uint32_t bytes_=0, bool window_=false)
+ : messages(messages_), bytes(bytes_), window(window_) {}
+
+ static FlowControl messageCredit(uint32_t messages_) { return FlowControl(messages_,UNLIMITED,false); }
+ static FlowControl messageWindow(uint32_t messages_) { return FlowControl(messages_,UNLIMITED,true); }
+ static FlowControl byteCredit(uint32_t bytes_) { return FlowControl(UNLIMITED,bytes_,false); }
+ static FlowControl byteWindow(uint32_t bytes_) { return FlowControl(UNLIMITED,bytes_,true); }
+ static FlowControl unlimited() { return FlowControl(UNLIMITED, UNLIMITED, false); }
+ static FlowControl zero() { return FlowControl(0, 0, false); }
+
+ /** Message credit: subscription can accept up to this many messages. */
+ uint32_t messages;
+ /** Byte credit: subscription can accept up to this many bytes of message content. */
+ uint32_t bytes;
+ /** Window mode. If true credit is automatically renewed as messages are acknowledged. */
+ bool window;
+
+ bool operator==(const FlowControl& x) {
+ return messages == x.messages && bytes == x.bytes && window == x.window;
+ };
+};
+
+}} // namespace qpid::client
+
+#endif /*!QPID_CLIENT_FLOWCONTROL_H*/
diff --git a/qpid/cpp/include/qpid/client/Future.h b/qpid/cpp/include/qpid/client/Future.h
new file mode 100644
index 0000000000..630a7e03c0
--- /dev/null
+++ b/qpid/cpp/include/qpid/client/Future.h
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef _Future_
+#define _Future_
+
+#include <boost/bind.hpp>
+#include <boost/shared_ptr.hpp>
+#include "qpid/Exception.h"
+#include "qpid/framing/SequenceNumber.h"
+#include "qpid/client/FutureCompletion.h"
+#include "qpid/client/FutureResult.h"
+#include "qpid/client/ClientImportExport.h"
+
+namespace qpid {
+namespace client {
+
+/**@internal */
+class QPID_CLIENT_CLASS_EXTERN Future
+{
+ framing::SequenceNumber command;
+ boost::shared_ptr<FutureResult> result;
+ bool complete;
+
+public:
+ Future() : complete(false) {}
+ Future(const framing::SequenceNumber& id) : command(id), complete(false) {}
+
+ std::string getResult(SessionImpl& session) {
+ if (result) return result->getResult(session);
+ else throw Exception("Result not expected");
+ }
+
+ QPID_CLIENT_EXTERN void wait(SessionImpl& session);
+ QPID_CLIENT_EXTERN bool isComplete(SessionImpl& session);
+ QPID_CLIENT_EXTERN void setFutureResult(boost::shared_ptr<FutureResult> r);
+};
+
+}}
+
+#endif
diff --git a/qpid/cpp/include/qpid/client/FutureCompletion.h b/qpid/cpp/include/qpid/client/FutureCompletion.h
new file mode 100644
index 0000000000..0970f494b7
--- /dev/null
+++ b/qpid/cpp/include/qpid/client/FutureCompletion.h
@@ -0,0 +1,49 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef _FutureCompletion_
+#define _FutureCompletion_
+
+#include "qpid/framing/amqp_framing.h"
+#include "qpid/sys/Monitor.h"
+
+namespace qpid {
+namespace client {
+
+///@internal
+class FutureCompletion
+{
+protected:
+ mutable sys::Monitor lock;
+ bool complete;
+
+public:
+ FutureCompletion();
+ virtual ~FutureCompletion(){}
+ bool isComplete() const;
+ void waitForCompletion() const;
+ void completed();
+};
+
+}}
+
+
+#endif
diff --git a/qpid/cpp/include/qpid/client/FutureResult.h b/qpid/cpp/include/qpid/client/FutureResult.h
new file mode 100644
index 0000000000..ead4929571
--- /dev/null
+++ b/qpid/cpp/include/qpid/client/FutureResult.h
@@ -0,0 +1,49 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef _FutureResult_
+#define _FutureResult_
+
+#include <string>
+
+#include "qpid/client/ClientImportExport.h"
+#include "qpid/framing/amqp_framing.h"
+#include "qpid/client/FutureCompletion.h"
+
+namespace qpid {
+namespace client {
+
+class SessionImpl;
+
+///@internal
+class QPID_CLIENT_CLASS_EXTERN FutureResult : public FutureCompletion
+{
+ std::string result;
+public:
+ QPID_CLIENT_EXTERN const std::string& getResult(SessionImpl& session) const;
+ void received(const std::string& result);
+};
+
+}}
+
+
+
+#endif
diff --git a/qpid/cpp/include/qpid/client/Handle.h b/qpid/cpp/include/qpid/client/Handle.h
new file mode 100644
index 0000000000..b8315481a9
--- /dev/null
+++ b/qpid/cpp/include/qpid/client/Handle.h
@@ -0,0 +1,71 @@
+#ifndef QPID_CLIENT_HANDLE_H
+#define QPID_CLIENT_HANDLE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/client/ClientImportExport.h"
+
+namespace qpid {
+namespace client {
+
+template <class> class PrivateImplRef;
+
+/**
+ * A handle is like a pointer: refers to an underlying implementation object.
+ * Copying the handle does not copy the object.
+ *
+ * Handles can be null, like a 0 pointer. Use isValid(), isNull() or the
+ * conversion to bool to test for a null handle.
+ */
+template <class T> class Handle {
+ public:
+
+ /**@return true if handle is valid, i.e. not null. */
+ QPID_CLIENT_INLINE_EXTERN bool isValid() const { return impl; }
+
+ /**@return true if handle is null. It is an error to call any function on a null handle. */
+ QPID_CLIENT_INLINE_EXTERN bool isNull() const { return !impl; }
+
+ /** Conversion to bool supports idiom if (handle) { handle->... } */
+ QPID_CLIENT_INLINE_EXTERN operator bool() const { return impl; }
+
+ /** Operator ! supports idiom if (!handle) { do_if_handle_is_null(); } */
+ QPID_CLIENT_INLINE_EXTERN bool operator !() const { return !impl; }
+
+ void swap(Handle<T>& h) { T* t = h.impl; h.impl = impl; impl = t; }
+
+ protected:
+ typedef T Impl;
+ QPID_CLIENT_INLINE_EXTERN Handle() :impl() {}
+
+ // Not implemented,subclasses must implement.
+ QPID_CLIENT_EXTERN Handle(const Handle&);
+ QPID_CLIENT_EXTERN Handle& operator=(const Handle&);
+
+ Impl* impl;
+
+ friend class PrivateImplRef<T>; // FIXME aconway 2009-04-30: Specify
+};
+
+}} // namespace qpid::client
+
+#endif /*!QPID_CLIENT_HANDLE_H*/
diff --git a/qpid/cpp/include/qpid/client/LocalQueue.h b/qpid/cpp/include/qpid/client/LocalQueue.h
new file mode 100644
index 0000000000..1a19a8499d
--- /dev/null
+++ b/qpid/cpp/include/qpid/client/LocalQueue.h
@@ -0,0 +1,120 @@
+#ifndef QPID_CLIENT_LOCALQUEUE_H
+#define QPID_CLIENT_LOCALQUEUE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/client/ClientImportExport.h"
+#include "qpid/client/Handle.h"
+#include "qpid/client/Message.h"
+#include "qpid/sys/Time.h"
+
+namespace qpid {
+namespace client {
+
+class LocalQueueImpl;
+template <class T> class PrivateImplRef;
+
+/**
+ * A local queue to collect messages retrieved from a remote broker
+ * queue. Create a queue and subscribe it using the SubscriptionManager.
+ * Messages from the remote queue on the broker will be stored in the
+ * local queue until you retrieve them.
+ *
+ * \ingroup clientapi
+ *
+ * \details Using a Local Queue
+ *
+ * <pre>
+ * LocalQueue local_queue;
+ * subscriptions.subscribe(local_queue, string("message_queue"));
+ * for (int i=0; i&lt;10; i++) {
+ * Message message = local_queue.get();
+ * std::cout &lt;&lt; message.getData() &lt;&lt; std::endl;
+ * }
+ * </pre>
+ *
+ * <h2>Getting Messages</h2>
+ *
+ * <ul><li>
+ * <p>get()</p>
+ * <pre>Message message = local_queue.get();</pre>
+ * <pre>// Specifying timeouts (TIME_SEC, TIME_MSEC, TIME_USEC, TIME_NSEC)
+ *#include <qpid/sys/Time.h>
+ *Message message;
+ *local_queue.get(message, 5*sys::TIME_SEC);</pre></li></ul>
+ *
+ * <h2>Checking size</h2>
+ * <ul><li>
+ * <p>empty()</p>
+ * <pre>if (local_queue.empty()) { ... }</pre></li>
+ * <li><p>size()</p>
+ * <pre>std::cout &lt;&lt; local_queue.size();</pre></li>
+ * </ul>
+ */
+
+class QPID_CLIENT_CLASS_EXTERN LocalQueue : public Handle<LocalQueueImpl> {
+ public:
+ /** Create a local queue. Subscribe the local queue to a remote broker
+ * queue with a SubscriptionManager.
+ *
+ * LocalQueue is an alternative to implementing a MessageListener.
+ */
+ QPID_CLIENT_EXTERN LocalQueue();
+ QPID_CLIENT_EXTERN LocalQueue(const LocalQueue&);
+ QPID_CLIENT_EXTERN ~LocalQueue();
+ QPID_CLIENT_EXTERN LocalQueue& operator=(const LocalQueue&);
+
+ /** Wait up to timeout for the next message from the local queue.
+ *@param result Set to the message from the queue.
+ *@param timeout wait up this timeout for a message to appear.
+ *@return true if result was set, false if queue was empty after timeout.
+ */
+ QPID_CLIENT_EXTERN bool get(Message& result, sys::Duration timeout=0);
+
+ /** Get the next message off the local queue, or wait up to the timeout
+ * for message from the broker queue.
+ *@param timeout wait up this timeout for a message to appear.
+ *@return message from the queue.
+ *@throw ClosedException if subscription is closed or timeout exceeded.
+ */
+ QPID_CLIENT_EXTERN Message get(sys::Duration timeout=sys::TIME_INFINITE);
+
+ /** Synonym for get() */
+ QPID_CLIENT_EXTERN Message pop(sys::Duration timeout=sys::TIME_INFINITE);
+
+ /** Return true if local queue is empty. */
+ QPID_CLIENT_EXTERN bool empty() const;
+
+ /** Number of messages on the local queue */
+ QPID_CLIENT_EXTERN size_t size() const;
+
+ LocalQueue(LocalQueueImpl*); ///<@internal
+
+
+ private:
+ typedef LocalQueueImpl Impl;
+ friend class PrivateImplRef<LocalQueue>;
+};
+
+}} // namespace qpid::client
+
+#endif /*!QPID_CLIENT_LOCALQUEUE_H*/
diff --git a/qpid/cpp/include/qpid/client/Message.h b/qpid/cpp/include/qpid/client/Message.h
new file mode 100644
index 0000000000..ba50dda9ba
--- /dev/null
+++ b/qpid/cpp/include/qpid/client/Message.h
@@ -0,0 +1,175 @@
+#ifndef QPID_CLIENT_MESSAGE_H
+#define QPID_CLIENT_MESSAGE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/client/ClientImportExport.h"
+#include "qpid/framing/MessageProperties.h"
+#include "qpid/framing/DeliveryProperties.h"
+#include <string>
+
+namespace qpid {
+
+namespace framing {
+class FieldTable;
+class SequenceNumber; // FIXME aconway 2009-04-17: remove with getID?
+}
+
+namespace client {
+
+class MessageImpl;
+
+/**
+ * A message sent to or received from the broker.
+ *
+ * \ingroup clientapi
+ * \details
+ *
+ * <h2>Getting and setting message contents</h2>
+ *
+ * <ul>
+ * <li>
+ * <p>getData()</p>
+ * <pre>std::cout &lt;&lt; "Response: " &lt;&lt; message.getData() &lt;&lt; std::endl;</pre>
+ * </li>
+ * <li>
+ * <p>setData()</p>
+ * <pre>message.setData("That's all, folks!");</pre></li>
+ * <li>
+ * <p>appendData()</p>
+ * <pre>message.appendData(" ... let's add a bit more ...");</pre></li>
+ * </ul>
+ *
+ * <h2>Getting and Setting Delivery Properties</h2>
+ *
+ * <ul>
+ * <li>
+ * <p>getDeliveryProperties()</p>
+ * <pre>message.getDeliveryProperties().setRoutingKey("control");</pre>
+ * <pre>message.getDeliveryProperties().setDeliveryMode(PERSISTENT);</pre>
+ * <pre>message.getDeliveryProperties().setPriority(9);</pre>
+ * <pre>message.getDeliveryProperties().setTtl(100);</pre></li>
+ *
+ * <li>
+ * <p>hasDeliveryProperties()</p>
+ * <pre>if (! message.hasDeliveryProperties()) {
+ * ...
+ *}</pre></li>
+ * </ul>
+ *
+ * <h2>Getting and Setting Message Properties</h2>
+ *
+ * <ul>
+ * <li>
+ * <p>getMessageProperties()</p>
+ * <pre>
+ *request.getMessageProperties().setReplyTo(ReplyTo("amq.direct", response_queue.str()));
+ * </pre>
+ * <pre>
+ *routingKey = request.getMessageProperties().getReplyTo().getRoutingKey();
+ *exchange = request.getMessageProperties().getReplyTo().getExchange();
+ * </pre>
+ * <pre>message.getMessageProperties().setContentType("text/plain");</pre>
+ * <pre>message.getMessageProperties().setContentEncoding("text/plain");</pre>
+ * </li>
+ * <li>
+ * <p>hasMessageProperties()</p>
+ * <pre>request.getMessageProperties().hasReplyTo();</pre>
+ * </li>
+ * </ul>
+ *
+ * <h2>Getting and Setting Application Headers</h2>
+ *
+ * <ul>
+ * <li>
+ * <p>getHeaders()</p>
+ * <pre>
+ *message.getHeaders().getString("control");
+ * </pre>
+ * <pre>
+ *message.getHeaders().setString("control","continue");
+ * </pre></li>
+ * </ul>
+ *
+ *
+ */
+class QPID_CLIENT_CLASS_EXTERN Message
+{
+public:
+ /** Create a Message.
+ *@param data Data for the message body.
+ *@param routingKey Passed to the exchange that routes the message.
+ */
+ QPID_CLIENT_EXTERN Message(
+ const std::string& data=std::string(),
+ const std::string& routingKey=std::string());
+ Message(MessageImpl*); ///< @internal
+ QPID_CLIENT_EXTERN Message(const Message&);
+ QPID_CLIENT_EXTERN ~Message();
+ QPID_CLIENT_EXTERN Message& operator=(const Message&);
+ QPID_CLIENT_EXTERN void swap(Message&);
+
+ QPID_CLIENT_EXTERN void setData(const std::string&);
+ QPID_CLIENT_EXTERN const std::string& getData() const;
+ QPID_CLIENT_EXTERN std::string& getData();
+
+ QPID_CLIENT_EXTERN void appendData(const std::string&);
+
+ QPID_CLIENT_EXTERN bool hasMessageProperties() const;
+ QPID_CLIENT_EXTERN framing::MessageProperties& getMessageProperties();
+ QPID_CLIENT_EXTERN const framing::MessageProperties& getMessageProperties() const;
+
+ QPID_CLIENT_EXTERN bool hasDeliveryProperties() const;
+ QPID_CLIENT_EXTERN framing::DeliveryProperties& getDeliveryProperties();
+ QPID_CLIENT_EXTERN const framing::DeliveryProperties& getDeliveryProperties() const;
+
+
+ /** The destination of messages sent to the broker is the exchange
+ * name. The destination of messages received from the broker is
+ * the delivery tag identifyig the local subscription (often this
+ * is the name of the subscribed queue.)
+ */
+ QPID_CLIENT_EXTERN std::string getDestination() const;
+
+ /** Check the redelivered flag. */
+ QPID_CLIENT_EXTERN bool isRedelivered() const;
+ /** Set the redelivered flag. */
+ QPID_CLIENT_EXTERN void setRedelivered(bool redelivered);
+
+ /** Get a modifyable reference to the message headers. */
+ QPID_CLIENT_EXTERN framing::FieldTable& getHeaders();
+
+ /** Get a non-modifyable reference to the message headers. */
+ QPID_CLIENT_EXTERN const framing::FieldTable& getHeaders() const;
+
+ // FIXME aconway 2009-04-17: does this need to be in public API?
+ ///@internal
+ QPID_CLIENT_EXTERN const framing::SequenceNumber& getId() const;
+
+ private:
+ MessageImpl* impl;
+ friend class MessageImpl; // Helper template for implementation
+};
+
+}}
+
+#endif /*!QPID_CLIENT_MESSAGE_H*/
diff --git a/qpid/cpp/include/qpid/client/MessageListener.h b/qpid/cpp/include/qpid/client/MessageListener.h
new file mode 100644
index 0000000000..3ca2fa964a
--- /dev/null
+++ b/qpid/cpp/include/qpid/client/MessageListener.h
@@ -0,0 +1,101 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+#include "qpid/client/ClientImportExport.h"
+
+#ifndef _MessageListener_
+#define _MessageListener_
+
+#include "qpid/client/Message.h"
+
+namespace qpid {
+namespace client {
+
+ /**
+ * Implement a subclass of MessageListener and subscribe it using
+ * the SubscriptionManager to receive messages.
+ *
+ * Another way to receive messages is by using a LocalQueue.
+ *
+ * \ingroup clientapi
+ * \details
+ *
+ * <h2>Using a MessageListener</h2>
+ *
+ * <ul>
+ * <li>
+ * <p>The received() function is called when a message arrives:</p>
+ * <pre>virtual void received(Message&amp; message)=0;</pre>
+ * </li>
+ * <li>
+ * <p>Derive your own listener, implement the received() function:</p>
+ * <pre>
+ * class Listener : public MessageListener {
+ * private:
+ * SubscriptionManager&amp; subscriptions;
+ * public:
+ * Listener(SubscriptionManager&amp; subscriptions);
+ * virtual void received(Message&amp; message);
+ * };
+ *
+ * Listener::Listener(SubscriptionManager&amp; subs) : subscriptions(subs)
+ * {}
+ *
+ * void Listener::received(Message&amp; message) {
+ * std::cout &lt;&lt; "Message: " &lt;&lt; message.getData() &lt;&lt; std::endl;
+ * if (message.getData() == "That's all, folks!") {
+ * std::cout &lt;&lt; "Shutting down listener for " &lt;&lt; message.getDestination()
+ * &lt;&lt; std::endl;
+ * subscriptions.cancel(message.getDestination());
+ * }
+ * }
+ *</pre>
+ * <pre>
+ * SubscriptionManager subscriptions(session);
+ *
+ * // Create a listener and subscribe it to the queue named "message_queue"
+ * Listener listener(subscriptions);
+ * subscriptions.subscribe(listener, "message_queue");
+ *
+ * // Receive messages until the subscription is cancelled
+ * // by Listener::received()
+ * subscriptions.run();
+ * </pre>
+ * </li>
+ * </ul>
+ *
+ */
+
+ class QPID_CLIENT_CLASS_EXTERN MessageListener{
+ public:
+ QPID_CLIENT_EXTERN virtual ~MessageListener();
+
+ /** Called for each message arriving from the broker. Override
+ * in your own subclass to process messages.
+ */
+ virtual void received(Message& msg) = 0;
+ };
+
+}
+}
+
+
+#endif
diff --git a/qpid/cpp/include/qpid/client/MessageReplayTracker.h b/qpid/cpp/include/qpid/client/MessageReplayTracker.h
new file mode 100644
index 0000000000..06a3f29c7d
--- /dev/null
+++ b/qpid/cpp/include/qpid/client/MessageReplayTracker.h
@@ -0,0 +1,73 @@
+#ifndef QPID_CLIENT_MESSAGEREPLAYTRACKER_H
+#define QPID_CLIENT_MESSAGEREPLAYTRACKER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/client/AsyncSession.h"
+#include "qpid/client/Message.h"
+#include "qpid/client/ClientImportExport.h"
+#include <list>
+#include <string>
+
+namespace qpid {
+namespace client {
+
+/**
+ * Utility to track messages sent asynchronously, allowing those that
+ * are indoubt to be replayed over a new session.
+ */
+class QPID_CLIENT_CLASS_EXTERN MessageReplayTracker
+{
+ public:
+ QPID_CLIENT_EXTERN MessageReplayTracker(uint flushInterval);
+ QPID_CLIENT_EXTERN void send(const Message& message, const std::string& destination = "");
+ QPID_CLIENT_EXTERN void init(AsyncSession session);
+ QPID_CLIENT_EXTERN void replay(AsyncSession session);
+ QPID_CLIENT_EXTERN void setFlushInterval(uint interval);
+ QPID_CLIENT_EXTERN uint getFlushInterval();
+ QPID_CLIENT_EXTERN void checkCompletion();
+
+ template <class F> void foreach(F& f) {
+ for (std::list<ReplayRecord>::const_iterator i = buffer.begin(); i != buffer.end(); i++) {
+ f(i->message);
+ }
+ }
+
+ private:
+ struct ReplayRecord
+ {
+ Completion status;
+ Message message;
+ std::string destination;
+
+ ReplayRecord(const Message& message, const std::string& destination);
+ void send(MessageReplayTracker&);
+ bool isComplete();
+ };
+
+ AsyncSession session;
+ uint flushInterval;
+ uint count;
+ std::list<ReplayRecord> buffer;
+};
+}} // namespace qpid::client
+
+#endif /*!QPID_CLIENT_MESSAGEREPLAYTRACKER_H*/
diff --git a/qpid/cpp/include/qpid/client/QueueOptions.h b/qpid/cpp/include/qpid/client/QueueOptions.h
new file mode 100644
index 0000000000..3984b63fdd
--- /dev/null
+++ b/qpid/cpp/include/qpid/client/QueueOptions.h
@@ -0,0 +1,129 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/client/ClientImportExport.h"
+#include "qpid/framing/FieldTable.h"
+
+#ifndef _QueueOptions_
+#define _QueueOptions_
+
+namespace qpid {
+namespace client {
+
+enum QueueSizePolicy {NONE, REJECT, FLOW_TO_DISK, RING, RING_STRICT};
+enum QueueOrderingPolicy {FIFO, LVQ, LVQ_NO_BROWSE};
+
+/**
+ * A help class to set options on the Queue. Create a configured args while
+ * still allowing any custom configuration via the FieldTable base class
+ */
+class QPID_CLIENT_CLASS_EXTERN QueueOptions: public framing::FieldTable
+{
+ public:
+ QPID_CLIENT_EXTERN QueueOptions();
+ QPID_CLIENT_EXTERN virtual ~QueueOptions();
+
+ /**
+ * Sets the queue sizing policy
+ *
+ * @param sp SizePolicy
+ * REJECT - reject if queue greater than size/count
+ * FLOW_TO_DISK - page messages to disk from this point is greater than size/count
+ * RING - limit the queue to size/count and over-write old messages round a ring
+ * RING_STRICT - limit the queue to size/count and reject is head == tail
+ * NONE - Use default broker sizing policy
+ * @param maxSize Set the max number of bytes for the sizing policies
+ * @param setMaxCount Set the max number of messages for the sizing policies
+ */
+ QPID_CLIENT_EXTERN void setSizePolicy(QueueSizePolicy sp, uint64_t maxSize, uint32_t maxCount );
+
+ /**
+ * Enables the persisting of a queue to the store module when a cluster fails down to it's last
+ * node. Does so optimistically. Will start persisting when cluster count >1 again.
+ */
+ QPID_CLIENT_EXTERN void setPersistLastNode();
+
+ /**
+ * Sets the odering policy on the Queue, default ordering is FIFO.
+ */
+ QPID_CLIENT_EXTERN void setOrdering(QueueOrderingPolicy op);
+
+ /**
+ * Use broker defualt sizing ploicy
+ */
+ QPID_CLIENT_EXTERN void clearSizePolicy();
+
+ /**
+ * Clear Persist Last Node Policy
+ */
+ QPID_CLIENT_EXTERN void clearPersistLastNode();
+
+ /**
+ * get the key used match LVQ in args for message transfer
+ */
+ QPID_CLIENT_EXTERN void getLVQKey(std::string& key);
+
+ /**
+ * Use default odering policy
+ */
+ QPID_CLIENT_EXTERN void clearOrdering();
+
+ /**
+ * Turns on event generation for this queue (either enqueue only
+ * or for enqueue and dequeue events); the events can then be
+ * processed by a regsitered broker plugin.
+ *
+ * DEPRECATED
+ *
+ * This is confusing to anyone who sees only the function call
+ * and not the variable name / doxygen. Consider the following call:
+ *
+ * options.enableQueueEvents(false);
+ *
+ * It looks like it disables queue events, but what it really does is
+ * enable both enqueue and dequeue events.
+ *
+ * Use setInt() instead:
+ *
+ * options.setInt("qpid.queue_event_generation", 2);
+ */
+
+ QPID_CLIENT_EXTERN void enableQueueEvents(bool enqueueOnly);
+
+ static QPID_CLIENT_EXTERN const std::string strMaxCountKey;
+ static QPID_CLIENT_EXTERN const std::string strMaxSizeKey;
+ static QPID_CLIENT_EXTERN const std::string strTypeKey;
+ static QPID_CLIENT_EXTERN const std::string strREJECT;
+ static QPID_CLIENT_EXTERN const std::string strFLOW_TO_DISK;
+ static QPID_CLIENT_EXTERN const std::string strRING;
+ static QPID_CLIENT_EXTERN const std::string strRING_STRICT;
+ static QPID_CLIENT_EXTERN const std::string strLastValueQueue;
+ static QPID_CLIENT_EXTERN const std::string strPersistLastNode;
+ static QPID_CLIENT_EXTERN const std::string strLVQMatchProperty;
+ static QPID_CLIENT_EXTERN const std::string strLastValueQueueNoBrowse;
+ static QPID_CLIENT_EXTERN const std::string strQueueEventMode;
+};
+
+}
+}
+
+
+#endif
diff --git a/qpid/cpp/include/qpid/client/Session.h b/qpid/cpp/include/qpid/client/Session.h
new file mode 100644
index 0000000000..c40549bbc5
--- /dev/null
+++ b/qpid/cpp/include/qpid/client/Session.h
@@ -0,0 +1,39 @@
+#ifndef QPID_CLIENT_SESSION_H
+#define QPID_CLIENT_SESSION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/client/Session_0_10.h"
+
+namespace qpid {
+namespace client {
+
+/**
+ * Session is an alias for Session_0_10
+ *
+ * \ingroup clientapi
+ */
+typedef Session_0_10 Session;
+
+
+}} // namespace qpid::client
+
+#endif /*!QPID_CLIENT_SESSION_H*/
diff --git a/qpid/cpp/include/qpid/client/SessionBase_0_10.h b/qpid/cpp/include/qpid/client/SessionBase_0_10.h
new file mode 100644
index 0000000000..ea50ab32f7
--- /dev/null
+++ b/qpid/cpp/include/qpid/client/SessionBase_0_10.h
@@ -0,0 +1,110 @@
+#ifndef QPID_CLIENT_SESSIONBASE_H
+#define QPID_CLIENT_SESSIONBASE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/SessionId.h"
+#include "qpid/framing/amqp_structs.h"
+#include "qpid/client/Message.h"
+#include "qpid/client/Completion.h"
+#include "qpid/client/TypedResult.h"
+#include "qpid/client/ClientImportExport.h"
+#include <string>
+
+namespace qpid {
+namespace client {
+
+class Connection;
+class SessionImpl;
+
+using std::string;
+using framing::Content;
+using framing::FieldTable;
+using framing::SequenceNumber;
+using framing::SequenceSet;
+using framing::SequenceNumberSet;
+using qpid::SessionId;
+using framing::Xid;
+
+/** Unit of message credit: messages or bytes */
+enum CreditUnit { MESSAGE_CREDIT=0, BYTE_CREDIT=1, UNLIMITED_CREDIT=0xFFFFFFFF };
+
+/**
+ * Base class for handles to an AMQP session.
+ *
+ * Subclasses provide the AMQP commands for a given
+ * version of the protocol.
+ */
+class QPID_CLIENT_CLASS_EXTERN SessionBase_0_10 {
+ public:
+
+ ///@internal
+ QPID_CLIENT_EXTERN SessionBase_0_10();
+ QPID_CLIENT_EXTERN ~SessionBase_0_10();
+
+ /** Get the session ID */
+ QPID_CLIENT_EXTERN SessionId getId() const;
+
+ /** Close the session.
+ * A session is automatically closed when all handles to it are destroyed.
+ */
+ QPID_CLIENT_EXTERN void close();
+
+ /**
+ * Synchronize the session: sync() waits until all commands issued
+ * on this session so far have been completed by the broker.
+ *
+ * Note sync() is always synchronous, even on an AsyncSession object
+ * because that's almost always what you want. You can call
+ * AsyncSession::executionSync() directly in the unusual event
+ * that you want to do an asynchronous sync.
+ */
+ QPID_CLIENT_EXTERN void sync();
+
+ /** Set the timeout for this session. */
+ QPID_CLIENT_EXTERN uint32_t timeout(uint32_t seconds);
+
+ /** Suspend the session - detach it from its connection */
+ QPID_CLIENT_EXTERN void suspend();
+
+ /** Resume a suspended session with a new connection */
+ QPID_CLIENT_EXTERN void resume(Connection);
+
+ /** Get the channel associated with this session */
+ QPID_CLIENT_EXTERN uint16_t getChannel() const;
+
+ QPID_CLIENT_EXTERN void flush();
+ QPID_CLIENT_EXTERN void markCompleted(const framing::SequenceSet& ids, bool notifyPeer);
+ QPID_CLIENT_EXTERN void markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer);
+ QPID_CLIENT_EXTERN void sendCompletion();
+
+ QPID_CLIENT_EXTERN bool isValid() const;
+
+ QPID_CLIENT_EXTERN Connection getConnection();
+ protected:
+ boost::shared_ptr<SessionImpl> impl;
+ friend class SessionBase_0_10Access;
+};
+
+}} // namespace qpid::client
+
+#endif /*!QPID_CLIENT_SESSIONBASE_H*/
diff --git a/qpid/cpp/include/qpid/client/Subscription.h b/qpid/cpp/include/qpid/client/Subscription.h
new file mode 100644
index 0000000000..bb9b98e8ff
--- /dev/null
+++ b/qpid/cpp/include/qpid/client/Subscription.h
@@ -0,0 +1,123 @@
+#ifndef QPID_CLIENT_SUBSCRIPTION_H
+#define QPID_CLIENT_SUBSCRIPTION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/client/Handle.h"
+#include "qpid/client/Session.h"
+#include "qpid/client/SubscriptionSettings.h"
+#include "qpid/client/Message.h"
+#include "qpid/client/ClientImportExport.h"
+
+namespace qpid {
+namespace client {
+
+template <class> class PrivateImplRef;
+class SubscriptionImpl;
+class SubscriptionManager;
+
+/**
+ * A handle to an active subscription. Provides methods to query the subscription status
+ * and control acknowledgement (acquire and accept) of messages.
+ */
+class QPID_CLIENT_CLASS_EXTERN Subscription : public Handle<SubscriptionImpl> {
+ public:
+ QPID_CLIENT_EXTERN Subscription(SubscriptionImpl* = 0);
+ QPID_CLIENT_EXTERN Subscription(const Subscription&);
+ QPID_CLIENT_EXTERN ~Subscription();
+ QPID_CLIENT_EXTERN Subscription& operator=(const Subscription&);
+
+
+ /** The name of the subscription, used as the "destination" for messages from the broker.
+ * Usually the same as the queue name but can be set differently.
+ */
+ QPID_CLIENT_EXTERN std::string getName() const;
+
+ /** Name of the queue this subscription subscribes to */
+ QPID_CLIENT_EXTERN std::string getQueue() const;
+
+ /** Get the flow control and acknowledgement settings for this subscription */
+ QPID_CLIENT_EXTERN const SubscriptionSettings& getSettings() const;
+
+ /** Set the flow control parameters */
+ QPID_CLIENT_EXTERN void setFlowControl(const FlowControl&);
+
+ /** Automatically acknowledge (acquire and accept) batches of n messages.
+ * You can disable auto-acknowledgement by setting n=0, and use acquire() and accept()
+ * to manually acquire and accept messages.
+ */
+ QPID_CLIENT_EXTERN void setAutoAck(unsigned int n);
+
+ /** Get the set of ID's for messages received by this subscription but not yet acquired.
+ * This will always be empty if getSettings().acquireMode=ACQUIRE_MODE_PRE_ACQUIRED
+ */
+ QPID_CLIENT_EXTERN SequenceSet getUnacquired() const;
+
+ /** Get the set of ID's for messages received by this subscription but not yet accepted. */
+ QPID_CLIENT_EXTERN SequenceSet getUnaccepted() const;
+
+ /** Acquire messageIds and remove them from the unacquired set.
+ * oAdd them to the unaccepted set if getSettings().acceptMode == ACCEPT_MODE_EXPLICIT.
+ */
+ QPID_CLIENT_EXTERN void acquire(const SequenceSet& messageIds);
+
+ /** Accept messageIds and remove them from the unaccepted set.
+ *@pre messageIds is a subset of getUnaccepted()
+ */
+ QPID_CLIENT_EXTERN void accept(const SequenceSet& messageIds);
+
+ /** Release messageIds and remove them from the unaccepted set.
+ *@pre messageIds is a subset of getUnaccepted()
+ */
+ QPID_CLIENT_EXTERN void release(const SequenceSet& messageIds);
+
+ /* Acquire a single message */
+ QPID_CLIENT_INLINE_EXTERN void acquire(const Message& m) { acquire(SequenceSet(m.getId())); }
+
+ /* Accept a single message */
+ QPID_CLIENT_INLINE_EXTERN void accept(const Message& m) { accept(SequenceSet(m.getId())); }
+
+ /* Release a single message */
+ QPID_CLIENT_INLINE_EXTERN void release(const Message& m) { release(SequenceSet(m.getId())); }
+
+ /** Get the session associated with this subscription */
+ QPID_CLIENT_EXTERN Session getSession() const;
+
+ /** Get the subscription manager associated with this subscription */
+ QPID_CLIENT_EXTERN SubscriptionManager getSubscriptionManager();
+
+ /** Cancel the subscription. */
+ QPID_CLIENT_EXTERN void cancel();
+
+ /** Grant the specified amount of message credit */
+ QPID_CLIENT_EXTERN void grantMessageCredit(uint32_t);
+
+ /** Grant the specified amount of byte credit */
+ QPID_CLIENT_EXTERN void grantByteCredit(uint32_t);
+
+ private:
+ friend class PrivateImplRef<Subscription>;
+ friend class SubscriptionManager;
+};
+}} // namespace qpid::client
+
+#endif /*!QPID_CLIENT_SUBSCRIPTION_H*/
diff --git a/qpid/cpp/include/qpid/client/SubscriptionManager.h b/qpid/cpp/include/qpid/client/SubscriptionManager.h
new file mode 100644
index 0000000000..b69819a8ff
--- /dev/null
+++ b/qpid/cpp/include/qpid/client/SubscriptionManager.h
@@ -0,0 +1,292 @@
+#ifndef QPID_CLIENT_SUBSCRIPTIONMANAGER_H
+#define QPID_CLIENT_SUBSCRIPTIONMANAGER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/client/Session.h"
+#include "qpid/client/Subscription.h"
+#include "qpid/sys/Runnable.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/client/ClientImportExport.h"
+#include "qpid/client/MessageListener.h"
+#include "qpid/client/LocalQueue.h"
+#include "qpid/client/Handle.h"
+#include <string>
+
+namespace qpid {
+namespace client {
+
+class SubscriptionManagerImpl;
+
+/**
+ * A class to help create and manage subscriptions.
+ *
+ * Set up your subscriptions, then call run() to have messages
+ * delivered.
+ *
+ * \ingroup clientapi
+ *
+ * \details
+ *
+ * <h2>Subscribing and canceling subscriptions</h2>
+ *
+ * <ul>
+ * <li>
+ * <p>subscribe()</p>
+ * <pre> SubscriptionManager subscriptions(session);
+ * Listener listener(subscriptions);
+ * subscriptions.subscribe(listener, myQueue);</pre>
+ * <pre> SubscriptionManager subscriptions(session);
+ * LocalQueue local_queue;
+ * subscriptions.subscribe(local_queue, string("message_queue"));</pre></li>
+ * <li>
+ * <p>cancel()</p>
+ * <pre>subscriptions.cancel();</pre></li>
+ * </ul>
+ *
+ * <h2>Waiting for messages (and returning)</h2>
+ *
+ * <ul>
+ * <li>
+ * <p>run()</p>
+ * <pre> // Give up control to receive messages
+ * subscriptions.run();</pre></li>
+ * <li>
+ * <p>stop()</p>
+ * <pre>.// Use this code in a listener to return from run()
+ * subscriptions.stop();</pre></li>
+ * <li>
+ * <p>setAutoStop()</p>
+ * <pre>.// Return from subscriptions.run() when last subscription is cancelled
+ *.subscriptions.setAutoStop(true);
+ *.subscriptons.run();
+ * </pre></li>
+ * <li>
+ * <p>Ending a subscription in a listener</p>
+ * <pre>
+ * void Listener::received(Message&amp; message) {
+ *
+ * if (message.getData() == "That's all, folks!") {
+ * subscriptions.cancel(message.getDestination());
+ * }
+ * }
+ * </pre>
+ * </li>
+ * </ul>
+ *
+ */
+class QPID_CLIENT_CLASS_EXTERN SubscriptionManager : public sys::Runnable, public Handle<SubscriptionManagerImpl>
+{
+ public:
+ /** Create a new SubscriptionManager associated with a session */
+ QPID_CLIENT_EXTERN SubscriptionManager(const Session& session);
+ QPID_CLIENT_EXTERN SubscriptionManager(const SubscriptionManager&);
+ QPID_CLIENT_EXTERN ~SubscriptionManager();
+ QPID_CLIENT_EXTERN SubscriptionManager& operator=(const SubscriptionManager&);
+
+ /**
+ * Subscribe a MessagesListener to receive messages from queue.
+ *
+ * Provide your own subclass of MessagesListener to process
+ * incoming messages. It will be called for each message received.
+ *
+ *@param listener Listener object to receive messages.
+ *@param queue Name of the queue to subscribe to.
+ *@param settings settings for the subscription.
+ *@param name unique destination name for the subscription, defaults to queue name.
+ */
+ QPID_CLIENT_EXTERN Subscription subscribe(MessageListener& listener,
+ const std::string& queue,
+ const SubscriptionSettings& settings,
+ const std::string& name=std::string());
+
+ /**
+ * Subscribe a LocalQueue to receive messages from queue.
+ *
+ * Incoming messages are stored in the queue for you to retrieve.
+ *
+ *@param queue Name of the queue to subscribe to.
+ *@param flow initial FlowControl for the subscription.
+ *@param name unique destination name for the subscription, defaults to queue name.
+ * If not specified, the queue name is used.
+ */
+ QPID_CLIENT_EXTERN Subscription subscribe(LocalQueue& localQueue,
+ const std::string& queue,
+ const SubscriptionSettings& settings,
+ const std::string& name=std::string());
+
+ /**
+ * Subscribe a MessagesListener to receive messages from queue.
+ *
+ * Provide your own subclass of MessagesListener to process
+ * incoming messages. It will be called for each message received.
+ *
+ *@param listener Listener object to receive messages.
+ *@param queue Name of the queue to subscribe to.
+ *@param name unique destination name for the subscription, defaults to queue name.
+ * If not specified, the queue name is used.
+ */
+ QPID_CLIENT_EXTERN Subscription subscribe(MessageListener& listener,
+ const std::string& queue,
+ const std::string& name=std::string());
+
+ /**
+ * Subscribe a LocalQueue to receive messages from queue.
+ *
+ * Incoming messages are stored in the queue for you to retrieve.
+ *
+ *@param queue Name of the queue to subscribe to.
+ *@param name unique destination name for the subscription, defaults to queue name.
+ * If not specified, the queue name is used.
+ */
+ QPID_CLIENT_EXTERN Subscription subscribe(LocalQueue& localQueue,
+ const std::string& queue,
+ const std::string& name=std::string());
+
+
+ /** Get a single message from a queue.
+ * (Note: this currently uses a subscription per invocation and is
+ * thus relatively expensive. The subscription is cancelled as
+ * part of each call which can trigger auto-deletion).
+ *@param result is set to the message from the queue.
+ *@param timeout wait up this timeout for a message to appear.
+ *@return true if result was set, false if no message available after timeout.
+ */
+ QPID_CLIENT_EXTERN bool get(Message& result, const std::string& queue, sys::Duration timeout=0);
+
+ /** Get a single message from a queue.
+ * (Note: this currently uses a subscription per invocation and is
+ * thus relatively expensive. The subscription is cancelled as
+ * part of each call which can trigger auto-deletion).
+ *@param timeout wait up this timeout for a message to appear.
+ *@return message from the queue.
+ *@throw Exception if the timeout is exceeded.
+ */
+ QPID_CLIENT_EXTERN Message get(const std::string& queue, sys::Duration timeout=sys::TIME_INFINITE);
+
+ /** Get a subscription by name.
+ *@throw Exception if not found.
+ */
+ QPID_CLIENT_EXTERN Subscription getSubscription(const std::string& name) const;
+
+ /** Cancel a subscription. See also: Subscription.cancel() */
+ QPID_CLIENT_EXTERN void cancel(const std::string& name);
+
+ /** Deliver messages in the current thread until stop() is called.
+ * Only one thread may be running in a SubscriptionManager at a time.
+ * @see run
+ */
+ QPID_CLIENT_EXTERN void run();
+
+ /** Start a new thread to deliver messages.
+ * Only one thread may be running in a SubscriptionManager at a time.
+ * @see start
+ */
+ QPID_CLIENT_EXTERN void start();
+
+ /**
+ * Wait for the thread started by a call to start() to complete.
+ */
+ QPID_CLIENT_EXTERN void wait();
+
+ /** If set true, run() will stop when all subscriptions
+ * are cancelled. If false, run will only stop when stop()
+ * is called. True by default.
+ */
+ QPID_CLIENT_EXTERN void setAutoStop(bool set=true);
+
+ /** Stop delivery. Causes run() to return, or the thread started with start() to exit. */
+ QPID_CLIENT_EXTERN void stop();
+
+ static const uint32_t UNLIMITED=0xFFFFFFFF;
+
+ /** Set the flow control for a subscription. */
+ QPID_CLIENT_EXTERN void setFlowControl(const std::string& name, const FlowControl& flow);
+
+ /** Set the flow control for a subscription.
+ *@param name: name of the subscription.
+ *@param messages: message credit.
+ *@param bytes: byte credit.
+ *@param window: if true use window-based flow control.
+ */
+ QPID_CLIENT_EXTERN void setFlowControl(const std::string& name, uint32_t messages, uint32_t bytes, bool window=true);
+
+ /** Set the default settings for subscribe() calls that don't
+ * include a SubscriptionSettings parameter.
+ */
+ QPID_CLIENT_EXTERN void setDefaultSettings(const SubscriptionSettings& s);
+
+ /** Get the default settings for subscribe() calls that don't
+ * include a SubscriptionSettings parameter.
+ */
+ QPID_CLIENT_EXTERN const SubscriptionSettings& getDefaultSettings() const;
+
+ /** Get the default settings for subscribe() calls that don't
+ * include a SubscriptionSettings parameter.
+ */
+ QPID_CLIENT_EXTERN SubscriptionSettings& getDefaultSettings();
+
+ /**
+ * Set the default flow control settings for subscribe() calls
+ * that don't include a SubscriptionSettings parameter.
+ *
+ *@param messages: message credit.
+ *@param bytes: byte credit.
+ *@param window: if true use window-based flow control.
+ */
+ QPID_CLIENT_EXTERN void setFlowControl(uint32_t messages, uint32_t bytes, bool window=true);
+
+ /**
+ *Set the default accept-mode for subscribe() calls that don't
+ *include a SubscriptionSettings parameter.
+ */
+ QPID_CLIENT_EXTERN void setAcceptMode(AcceptMode mode);
+
+ /**
+ * Set the default acquire-mode subscribe()s that don't specify SubscriptionSettings.
+ */
+ QPID_CLIENT_EXTERN void setAcquireMode(AcquireMode mode);
+
+ QPID_CLIENT_EXTERN void registerFailoverHandler ( boost::function<void ()> fh );
+
+ QPID_CLIENT_EXTERN Session getSession() const;
+
+ SubscriptionManager(SubscriptionManagerImpl*); ///<@internal
+
+ private:
+ typedef SubscriptionManagerImpl Impl;
+ friend class PrivateImplRef<SubscriptionManager>;
+};
+
+/** AutoCancel cancels a subscription in its destructor */
+class AutoCancel {
+ public:
+ AutoCancel(SubscriptionManager& sm_, const std::string& tag_) : sm(sm_), tag(tag_) {}
+ ~AutoCancel() { sm.cancel(tag); }
+ private:
+ SubscriptionManager& sm;
+ std::string tag;
+};
+
+}} // namespace qpid::client
+
+#endif /*!QPID_CLIENT_SUBSCRIPTIONMANAGER_H*/
diff --git a/qpid/cpp/include/qpid/client/SubscriptionSettings.h b/qpid/cpp/include/qpid/client/SubscriptionSettings.h
new file mode 100644
index 0000000000..b4cb302b56
--- /dev/null
+++ b/qpid/cpp/include/qpid/client/SubscriptionSettings.h
@@ -0,0 +1,123 @@
+#ifndef QPID_CLIENT_SUBSCRIPTIONSETTINGS_H
+#define QPID_CLIENT_SUBSCRIPTIONSETTINGS_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/client/FlowControl.h"
+#include "qpid/framing/enum.h"
+
+namespace qpid {
+namespace client {
+
+/** Bring AMQP enum definitions for message class into this namespace. */
+using namespace qpid::framing::message;
+
+enum CompletionMode {
+ MANUAL_COMPLETION = 0,
+ COMPLETE_ON_DELIVERY = 1,
+ COMPLETE_ON_ACCEPT = 2
+};
+/**
+ * Settings for a subscription.
+ */
+struct SubscriptionSettings
+{
+ SubscriptionSettings(
+ FlowControl flow=FlowControl::unlimited(),
+ AcceptMode accept=ACCEPT_MODE_EXPLICIT,
+ AcquireMode acquire=ACQUIRE_MODE_PRE_ACQUIRED,
+ unsigned int autoAck_=1,
+ CompletionMode completion=COMPLETE_ON_DELIVERY
+ ) : flowControl(flow), acceptMode(accept), acquireMode(acquire), autoAck(autoAck_), completionMode(completion), exclusive(false) {}
+
+ FlowControl flowControl; ///@< Flow control settings. @see FlowControl
+ /**
+ * The acceptMode determines whether the broker should expect
+ * delivery of messages to be acknowledged by the client
+ * indicating that it accepts them. A value of
+ * ACCEPT_MODE_EXPLICIT means that messages must be accepted
+ * (note: this may be done automatically by the library - see
+ * autoAck - or through an explicit call be the application - see
+ * Subscription::accept()) before they can be dequeued. A value of
+ * ACCEPT_MODE_NONE means that the broker can dequeue a message as
+ * soon as it is acquired.
+ */
+ AcceptMode acceptMode; ///@< ACCEPT_MODE_EXPLICIT or ACCEPT_MODE_NONE
+ /**
+ * The acquireMode determines whether messages are locked for the
+ * subscriber when delivered, and thus are not delivered to any
+ * other subscriber unless this subscriber releases them.
+ *
+ * The default is ACQUIRE_MODE_PRE_ACQUIRED meaning that the
+ * subscriber expects to have been given that message exclusively
+ * (i.e. the message will not be given to any other subscriber
+ * unless released explicitly or by this subscribers session
+ * failing without having accepted the message).
+ *
+ * Delivery of message in ACQUIRE_MODE_NOT_ACQUIRED mode means the
+ * message will still be available for other subscribers to
+ * receive. The application can if desired acquire a (set of)
+ * messages through an explicit acquire call - see
+ * Subscription::acquire().
+ */
+ AcquireMode acquireMode; ///@< ACQUIRE_MODE_PRE_ACQUIRED or ACQUIRE_MODE_NOT_ACQUIRED
+
+ /**
+ * Configures the frequency at which messages are automatically
+ * accepted (e.g. a value of 5 means that messages are accepted in
+ * batches of 5). A value of 0 means no automatic acknowledgement
+ * will occur and the application will itself be responsible for
+ * accepting messages.
+ */
+ unsigned int autoAck;
+ /**
+ * In windowing mode, completion of a message will cause the
+ * credit used up by that message to be reallocated. The
+ * subscriptions completion mode controls how completion is
+ * managed.
+ *
+ * If set to COMPLETE_ON_DELIVERY (which is the default), messages
+ * will be marked as completed once they have been received. The
+ * server will be explicitly notified of all completed messages
+ * for the session when the next accept is sent through the
+ * subscription (either explictly or through autAck). However the
+ * server may also periodically request information on the
+ * completed messages.
+ *
+ * If set to COMPLETE_ON_ACCEPT, messages will be marked as
+ * completed once they are accepted (via the Subscription class)
+ * and the server will also be notified of all completed messages
+ * for the session.
+ *
+ * If set to MANUAL_COMPLETION the application is responsible for
+ * completing messages (@see Session::markCompleted()).
+ */
+ CompletionMode completionMode;
+ /**
+ * If set, requests that no other subscriber be allowed to access
+ * the queue while this subscription is active.
+ */
+ bool exclusive;
+};
+
+}} // namespace qpid::client
+
+#endif /*!QPID_CLIENT_SUBSCRIPTIONSETTINGS_H*/
diff --git a/qpid/cpp/include/qpid/client/TypedResult.h b/qpid/cpp/include/qpid/client/TypedResult.h
new file mode 100644
index 0000000000..8e1a16580c
--- /dev/null
+++ b/qpid/cpp/include/qpid/client/TypedResult.h
@@ -0,0 +1,65 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef _TypedResult_
+#define _TypedResult_
+
+#include "qpid/client/Completion.h"
+#include "qpid/framing/StructHelper.h"
+
+namespace qpid {
+namespace client {
+
+/**
+ * Returned by asynchronous commands that return a result.
+ * You can use get() to wait for completion and get the result value.
+ * \ingroup clientapi
+ */
+template <class T> class TypedResult : public Completion
+{
+ T result;
+ bool decoded;
+
+public:
+ ///@internal
+ TypedResult(const Completion& c) : Completion(c), decoded(false) {}
+
+ /**
+ * Wait for the asynchronous command that returned this TypedResult to complete
+ * and return its result.
+ *
+ *@return The result returned by the command.
+ *@exception If the command returns an error, get() throws an exception.
+ *
+ */
+ T& get() {
+ if (!decoded) {
+ framing::StructHelper helper;
+ helper.decode(result, getResult());
+ decoded = true;
+ }
+ return result;
+ }
+};
+
+}}
+
+#endif