summaryrefslogtreecommitdiff
path: root/cpp/lib/client/IncomingMessage.h
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/lib/client/IncomingMessage.h')
-rw-r--r--cpp/lib/client/IncomingMessage.h131
1 files changed, 63 insertions, 68 deletions
diff --git a/cpp/lib/client/IncomingMessage.h b/cpp/lib/client/IncomingMessage.h
index 6ec949028d..d78a90327d 100644
--- a/cpp/lib/client/IncomingMessage.h
+++ b/cpp/lib/client/IncomingMessage.h
@@ -21,96 +21,91 @@
* under the License.
*
*/
-#include <string>
-#include <queue>
-#include <framing/amqp_framing.h>
-#include "ExceptionHolder.h"
-#include "ClientMessage.h"
#include "sys/Mutex.h"
-#include "sys/Condition.h"
+#include <map>
+#include <vector>
+
namespace qpid {
+namespace client {
-namespace framing {
-class AMQBody;
-}
+class Message;
-namespace client {
/**
- * Accumulates incoming message frames into messages.
- * Client-initiated messages (basic.get) are initiated and made
- * available to the user thread one at a time.
- *
- * Broker initiated messages (basic.return, basic.deliver) are
- * queued for handling by the user dispatch thread.
- *
+ * Manage incoming messages.
+ *
+ * Uses reference and destination concepts from 0-9 Messsage class.
+ *
+ * Basic messages use special destination and reference names to indicate
+ * get-ok, return etc. messages.
+ *
*/
class IncomingMessage {
public:
- typedef boost::shared_ptr<framing::AMQBody> BodyPtr;
- IncomingMessage();
-
- /** Expect a new message starting with getOk. Called in user thread.*/
- void startGet();
+ /** Accumulate data associated with a set of messages. */
+ struct Reference {
+ std::string data;
+ std::vector<Message> messages;
+ };
- /** Wait for the message to complete, return the message.
- * Called in user thread.
- *@raises QpidError if there was an error.
- */
- bool waitGet(Message&);
+ /** Interface to a destination for messages. */
+ class Destination {
+ public:
+ virtual ~Destination();
- /** Wait for the next broker-initiated message. */
- Message waitDispatch();
+ /** Pass a message to the destination */
+ virtual void message(const Message&) = 0;
- /** Add a frame body to the message. Called in network thread. */
- void add(BodyPtr);
+ /** Notify destination of queue-empty contition */
+ virtual void empty() = 0;
+ };
- /** Shut down: all further calls to any function throw ex. */
- void shutdown();
- /** Check if shutdown */
- bool isShutdown() const;
+ /** Add a reference. Throws if already open. */
+ void openReference(const std::string& name);
- private:
+ /** Get a reference. Throws if not already open. */
+ void appendReference(const std::string& name,
+ const std::string& data);
- typedef void (IncomingMessage::* ExpectFn)(BodyPtr);
- typedef void (IncomingMessage::* EndFn)(Exception*);
- typedef std::queue<Message> MessageQueue;
- struct Guard;
- friend struct Guard;
+ /** Create a message to destination associated with reference
+ *@exception if destination or reference non-existent.
+ */
+ Message& createMessage(const std::string& destination,
+ const std::string& reference);
- void reset();
- template <class T> boost::shared_ptr<T> expectCheck(BodyPtr);
+ /** Get a reference.
+ *@exception if non-existent.
+ */
+ Reference& getReference(const std::string& name);
+
+ /** Close a reference and deliver all its messages.
+ * Throws if not open or a message has an invalid destination.
+ */
+ void closeReference(const std::string& name);
- // State functions - a state machine where each state is
- // a member function that processes a frame body.
- void expectGetOk(BodyPtr);
- void expectHeader(BodyPtr);
- void expectContent(BodyPtr);
- void expectRequest(BodyPtr);
+ /** Add a destination.
+ *@exception if a different Destination is already registered
+ * under name.
+ */
+ void addDestination(std::string name, Destination&);
+
+ /** Remove a destination. Throws if does not exist */
+ void removeDestination(std::string name);
- // End functions.
- void endGet(Exception* ex = 0);
- void endRequest(Exception* ex);
+ /** Get a destination. Throws if does not exist */
+ Destination& getDestination(const std::string& name);
+ private:
- // Check for complete message.
- void checkComplete();
+ typedef std::map<std::string, Reference> ReferenceMap;
+ typedef std::map<std::string, Destination*> DestinationMap;
+ Reference& getRefUnlocked(const std::string& name);
+ Destination& getDestUnlocked(const std::string& name);
+
mutable sys::Mutex lock;
- ExpectFn state;
- EndFn endFn;
- Message buildMessage;
- ExceptionHolder shutdownError;
-
- // For basic.get messages.
- sys::Condition getReady;
- ExceptionHolder getError;
- Message getMessage;
- enum { GETTING, GOT, EMPTY } getState;
-
- // For broker-initiated messages
- sys::Condition dispatchReady;
- MessageQueue dispatchQueue;
+ ReferenceMap references;
+ DestinationMap destinations;
};
}}