summaryrefslogtreecommitdiff
path: root/cpp/lib/client/IncomingMessage.h
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-03-27 15:36:39 +0000
committerAlan Conway <aconway@apache.org>2007-03-27 15:36:39 +0000
commit847ee577e23fbdd2175709a08a7160e8b2c1f464 (patch)
treee4962c5246c91a08ef635f2c68e06b82cfb100ee /cpp/lib/client/IncomingMessage.h
parentfb14a2042dd5bdae5a5c79b8cd4f1ad87e59bee1 (diff)
downloadqpid-python-847ee577e23fbdd2175709a08a7160e8b2c1f464.tar.gz
Refactored client::Message to be independent of all Basic class concepts
and client::IncomingMessage to handle 0-9 style references and appends. * cpp/lib/client/ClientMessage.cpp: Made independent of Basic class. * cpp/lib/client/IncomingMessage.cpp: Refactored to handle references/appends. * cpp/lib/client/BasicMessageChannel.cpp: Refactored to use new IncomingMessage Thread safety fixes: * cpp/lib/client/ResponseHandler.h: Remove stateful functions. * cpp/lib/client/ClientChannel.cpp: use new ResponseHandler interface. Minor cleanup: * cpp/lib/common/framing/BasicHeaderProperties.cpp: use DeliveryMode enum. * cpp/tests/HeaderTest.cpp: use DeliveryMode enum. * cpp/tests/MessageTest.cpp: use DeliveryMode enum. * cpp/lib/common/shared_ptr.h: #include <boost/cast.hpp> for convenience. * cpp/lib/common/sys/ThreadSafeQueue.h: Changed "stop" "shutdown" * cpp/lib/common/sys/ProducerConsumer.h: Changed "stop" "shutdown" * cpp/tests/ClientChannelTest.cpp (TestCase): Removed debug couts. * cpp/tests/setup: valgrind --demangle=yes by default. * cpp/tests/topictest: sleep to hack around startup race. * cpp/lib/broker/BrokerQueue.cpp (configure): Fixed memory leak. Removed/updated FIXME comments in: * cpp/lib/broker/BrokerMessage.cpp: * cpp/lib/broker/BrokerMessageBase.h: * cpp/lib/broker/InMemoryContent.cpp: * cpp/lib/common/framing/MethodContext.h: git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@522956 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/client/IncomingMessage.h')
-rw-r--r--cpp/lib/client/IncomingMessage.h131
1 files changed, 63 insertions, 68 deletions
diff --git a/cpp/lib/client/IncomingMessage.h b/cpp/lib/client/IncomingMessage.h
index 6ec949028d..d78a90327d 100644
--- a/cpp/lib/client/IncomingMessage.h
+++ b/cpp/lib/client/IncomingMessage.h
@@ -21,96 +21,91 @@
* under the License.
*
*/
-#include <string>
-#include <queue>
-#include <framing/amqp_framing.h>
-#include "ExceptionHolder.h"
-#include "ClientMessage.h"
#include "sys/Mutex.h"
-#include "sys/Condition.h"
+#include <map>
+#include <vector>
+
namespace qpid {
+namespace client {
-namespace framing {
-class AMQBody;
-}
+class Message;
-namespace client {
/**
- * Accumulates incoming message frames into messages.
- * Client-initiated messages (basic.get) are initiated and made
- * available to the user thread one at a time.
- *
- * Broker initiated messages (basic.return, basic.deliver) are
- * queued for handling by the user dispatch thread.
- *
+ * Manage incoming messages.
+ *
+ * Uses reference and destination concepts from 0-9 Messsage class.
+ *
+ * Basic messages use special destination and reference names to indicate
+ * get-ok, return etc. messages.
+ *
*/
class IncomingMessage {
public:
- typedef boost::shared_ptr<framing::AMQBody> BodyPtr;
- IncomingMessage();
-
- /** Expect a new message starting with getOk. Called in user thread.*/
- void startGet();
+ /** Accumulate data associated with a set of messages. */
+ struct Reference {
+ std::string data;
+ std::vector<Message> messages;
+ };
- /** Wait for the message to complete, return the message.
- * Called in user thread.
- *@raises QpidError if there was an error.
- */
- bool waitGet(Message&);
+ /** Interface to a destination for messages. */
+ class Destination {
+ public:
+ virtual ~Destination();
- /** Wait for the next broker-initiated message. */
- Message waitDispatch();
+ /** Pass a message to the destination */
+ virtual void message(const Message&) = 0;
- /** Add a frame body to the message. Called in network thread. */
- void add(BodyPtr);
+ /** Notify destination of queue-empty contition */
+ virtual void empty() = 0;
+ };
- /** Shut down: all further calls to any function throw ex. */
- void shutdown();
- /** Check if shutdown */
- bool isShutdown() const;
+ /** Add a reference. Throws if already open. */
+ void openReference(const std::string& name);
- private:
+ /** Get a reference. Throws if not already open. */
+ void appendReference(const std::string& name,
+ const std::string& data);
- typedef void (IncomingMessage::* ExpectFn)(BodyPtr);
- typedef void (IncomingMessage::* EndFn)(Exception*);
- typedef std::queue<Message> MessageQueue;
- struct Guard;
- friend struct Guard;
+ /** Create a message to destination associated with reference
+ *@exception if destination or reference non-existent.
+ */
+ Message& createMessage(const std::string& destination,
+ const std::string& reference);
- void reset();
- template <class T> boost::shared_ptr<T> expectCheck(BodyPtr);
+ /** Get a reference.
+ *@exception if non-existent.
+ */
+ Reference& getReference(const std::string& name);
+
+ /** Close a reference and deliver all its messages.
+ * Throws if not open or a message has an invalid destination.
+ */
+ void closeReference(const std::string& name);
- // State functions - a state machine where each state is
- // a member function that processes a frame body.
- void expectGetOk(BodyPtr);
- void expectHeader(BodyPtr);
- void expectContent(BodyPtr);
- void expectRequest(BodyPtr);
+ /** Add a destination.
+ *@exception if a different Destination is already registered
+ * under name.
+ */
+ void addDestination(std::string name, Destination&);
+
+ /** Remove a destination. Throws if does not exist */
+ void removeDestination(std::string name);
- // End functions.
- void endGet(Exception* ex = 0);
- void endRequest(Exception* ex);
+ /** Get a destination. Throws if does not exist */
+ Destination& getDestination(const std::string& name);
+ private:
- // Check for complete message.
- void checkComplete();
+ typedef std::map<std::string, Reference> ReferenceMap;
+ typedef std::map<std::string, Destination*> DestinationMap;
+ Reference& getRefUnlocked(const std::string& name);
+ Destination& getDestUnlocked(const std::string& name);
+
mutable sys::Mutex lock;
- ExpectFn state;
- EndFn endFn;
- Message buildMessage;
- ExceptionHolder shutdownError;
-
- // For basic.get messages.
- sys::Condition getReady;
- ExceptionHolder getError;
- Message getMessage;
- enum { GETTING, GOT, EMPTY } getState;
-
- // For broker-initiated messages
- sys::Condition dispatchReady;
- MessageQueue dispatchQueue;
+ ReferenceMap references;
+ DestinationMap destinations;
};
}}