diff options
author | Alan Conway <aconway@apache.org> | 2007-03-27 15:36:39 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-03-27 15:36:39 +0000 |
commit | 847ee577e23fbdd2175709a08a7160e8b2c1f464 (patch) | |
tree | e4962c5246c91a08ef635f2c68e06b82cfb100ee /cpp/lib/client/IncomingMessage.cpp | |
parent | fb14a2042dd5bdae5a5c79b8cd4f1ad87e59bee1 (diff) | |
download | qpid-python-847ee577e23fbdd2175709a08a7160e8b2c1f464.tar.gz |
Refactored client::Message to be independent of all Basic class concepts
and client::IncomingMessage to handle 0-9 style references and appends.
* cpp/lib/client/ClientMessage.cpp: Made independent of Basic class.
* cpp/lib/client/IncomingMessage.cpp: Refactored to handle references/appends.
* cpp/lib/client/BasicMessageChannel.cpp: Refactored to use new IncomingMessage
Thread safety fixes:
* cpp/lib/client/ResponseHandler.h: Remove stateful functions.
* cpp/lib/client/ClientChannel.cpp: use new ResponseHandler interface.
Minor cleanup:
* cpp/lib/common/framing/BasicHeaderProperties.cpp: use DeliveryMode enum.
* cpp/tests/HeaderTest.cpp: use DeliveryMode enum.
* cpp/tests/MessageTest.cpp: use DeliveryMode enum.
* cpp/lib/common/shared_ptr.h: #include <boost/cast.hpp> for convenience.
* cpp/lib/common/sys/ThreadSafeQueue.h: Changed "stop" "shutdown"
* cpp/lib/common/sys/ProducerConsumer.h: Changed "stop" "shutdown"
* cpp/tests/ClientChannelTest.cpp (TestCase): Removed debug couts.
* cpp/tests/setup: valgrind --demangle=yes by default.
* cpp/tests/topictest: sleep to hack around startup race.
* cpp/lib/broker/BrokerQueue.cpp (configure): Fixed memory leak.
Removed/updated FIXME comments in:
* cpp/lib/broker/BrokerMessage.cpp:
* cpp/lib/broker/BrokerMessageBase.h:
* cpp/lib/broker/InMemoryContent.cpp:
* cpp/lib/common/framing/MethodContext.h:
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@522956 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/client/IncomingMessage.cpp')
-rw-r--r-- | cpp/lib/client/IncomingMessage.cpp | 204 |
1 files changed, 81 insertions, 123 deletions
diff --git a/cpp/lib/client/IncomingMessage.cpp b/cpp/lib/client/IncomingMessage.cpp index 07f94ceb64..8f69f8c3ef 100644 --- a/cpp/lib/client/IncomingMessage.cpp +++ b/cpp/lib/client/IncomingMessage.cpp @@ -18,155 +18,113 @@ * under the License. * */ -#include <IncomingMessage.h> -#include "framing/AMQHeaderBody.h" -#include "framing/AMQContentBody.h" -#include "BasicGetOkBody.h" -#include "BasicReturnBody.h" -#include "BasicDeliverBody.h" -#include <QpidError.h> -#include <iostream> + +#include "IncomingMessage.h" +#include "Exception.h" +#include "ClientMessage.h" +#include <boost/format.hpp> namespace qpid { namespace client { -using namespace sys; -using namespace framing; - -struct IncomingMessage::Guard: public Mutex::ScopedLock { - Guard(IncomingMessage* im) : Mutex::ScopedLock(im->lock) { - im->shutdownError.throwIf(); - } -}; - -IncomingMessage::IncomingMessage() { reset(); } +using boost::format; +using sys::Mutex; -void IncomingMessage::reset() { - state = &IncomingMessage::expectRequest; - endFn= &IncomingMessage::endRequest; - buildMessage = Message(); -} - -void IncomingMessage::startGet() { - Guard g(this); - if (state != &IncomingMessage::expectRequest) { - endGet(new QPID_ERROR(CLIENT_ERROR, "Message already in progress.")); - } - else { - state = &IncomingMessage::expectGetOk; - endFn = &IncomingMessage::endGet; - getError.reset(); - getState = GETTING; - } -} - -bool IncomingMessage::waitGet(Message& msg) { - Guard g(this); - while (getState == GETTING && !shutdownError && !getError) - getReady.wait(lock); - shutdownError.throwIf(); - getError.throwIf(); - msg = getMessage; - return getState==GOT; -} - -Message IncomingMessage::waitDispatch() { - Guard g(this); - while(dispatchQueue.empty() && !shutdownError) - dispatchReady.wait(lock); - shutdownError.throwIf(); - - Message msg(dispatchQueue.front()); - dispatchQueue.pop(); - return msg; -} +IncomingMessage::Destination::~Destination() {} -void IncomingMessage::add(BodyPtr body) { - Guard g(this); - shutdownError.throwIf(); - // Call the current state function. - (this->*state)(body); -} - -void IncomingMessage::shutdown() { +void IncomingMessage::openReference(const std::string& name) { Mutex::ScopedLock l(lock); - shutdownError.reset(new ShutdownException()); - getReady.notify(); - dispatchReady.notify(); + if (references.find(name) != references.end()) + throw ChannelException( + 406, format("Attempt to open existing reference %s.") % name); + references[name]; + return; } -bool IncomingMessage::isShutdown() const { +void IncomingMessage::appendReference( + const std::string& name, const std::string& data) +{ Mutex::ScopedLock l(lock); - return shutdownError; + getRefUnlocked(name).data += data; } -// Common check for all the expect functions. Called in network thread. -template<class T> -boost::shared_ptr<T> IncomingMessage::expectCheck(BodyPtr body) { - boost::shared_ptr<T> ptr = boost::dynamic_pointer_cast<T>(body); - if (!ptr) - throw QPID_ERROR(PROTOCOL_ERROR+504, "Unexpected frame type"); - return ptr; +Message& IncomingMessage::createMessage( + const std::string& destination, const std::string& reference) +{ + Mutex::ScopedLock l(lock); + getDestUnlocked(destination); // Verify destination. + Reference& ref = getRefUnlocked(reference); + ref.messages.resize(ref.messages.size() +1); + ref.messages.back().setDestination(destination); + return ref.messages.back(); } -void IncomingMessage::expectGetOk(BodyPtr body) { - if (dynamic_cast<BasicGetOkBody*>(body.get())) - state = &IncomingMessage::expectHeader; - else if (dynamic_cast<BasicGetEmptyBody*>(body.get())) { - getState = EMPTY; - endGet(); +void IncomingMessage::closeReference(const std::string& name) { + Reference refCopy; + { + Mutex::ScopedLock l(lock); + refCopy = getRefUnlocked(name); + references.erase(name); + } + for (std::vector<Message>::iterator i = refCopy.messages.begin(); + i != refCopy.messages.end(); + ++i) + { + i->setData(refCopy.data); + // TODO aconway 2007-03-23: Thread safety, + // can a destination be removed while we're doing this? + getDestination(i->getDestination()).message(*i); } - else - throw QPID_ERROR(PROTOCOL_ERROR+504, "Unexpected frame type"); } -void IncomingMessage::expectHeader(BodyPtr body) { - AMQHeaderBody::shared_ptr header = expectCheck<AMQHeaderBody>(body); - buildMessage.header = header; - state = &IncomingMessage::expectContent; - checkComplete(); + +void IncomingMessage::addDestination(std::string name, Destination& dest) { + Mutex::ScopedLock l(lock); + DestinationMap::iterator i = destinations.find(name); + if (i == destinations.end()) + destinations[name]=&dest; + else if (i->second != &dest) + throw ChannelException( + 404, format("Destination already exists: %s.") % name); } -void IncomingMessage::expectContent(BodyPtr body) { - AMQContentBody::shared_ptr content = expectCheck<AMQContentBody>(body); - buildMessage.setData(buildMessage.getData() + content->getData()); - checkComplete(); +void IncomingMessage::removeDestination(std::string name) { + Mutex::ScopedLock l(lock); + DestinationMap::iterator i = destinations.find(name); + if (i == destinations.end()) + throw ChannelException( + 406, format("No such destination: %s.") % name); + destinations.erase(i); } -void IncomingMessage::checkComplete() { - size_t declaredSize = buildMessage.header->getContentSize(); - size_t currentSize = buildMessage.getData().size(); - if (declaredSize == currentSize) - (this->*endFn)(0); - else if (declaredSize < currentSize) - (this->*endFn)(new QPID_ERROR( - PROTOCOL_ERROR, "Message content exceeds declared size.")); +IncomingMessage::Destination& IncomingMessage::getDestination( + const std::string& name) { + return getDestUnlocked(name); } -void IncomingMessage::expectRequest(BodyPtr body) { - AMQMethodBody::shared_ptr method = expectCheck<AMQMethodBody>(body); - buildMessage.setMethod(method); - state = &IncomingMessage::expectHeader; +IncomingMessage::Reference& IncomingMessage::getReference( + const std::string& name) { + return getRefUnlocked(name); } - -void IncomingMessage::endGet(Exception* ex) { - getError.reset(ex); - if (getState == GETTING) { - getMessage = buildMessage; - getState = GOT; - } - reset(); - getReady.notify(); + +IncomingMessage::Reference& IncomingMessage::getRefUnlocked( + const std::string& name) { + Mutex::ScopedLock l(lock); + ReferenceMap::iterator i = references.find(name); + if (i == references.end()) + throw ChannelException( + 404, format("No such reference: %s.") % name); + return i->second; } -void IncomingMessage::endRequest(Exception* ex) { - ExceptionHolder eh(ex); - if (!eh) { - dispatchQueue.push(buildMessage); - reset(); - dispatchReady.notify(); - } - eh.throwIf(); +IncomingMessage::Destination& IncomingMessage::getDestUnlocked( + const std::string& name) { + Mutex::ScopedLock l(lock); + DestinationMap::iterator i = destinations.find(name); + if (i == destinations.end()) + throw ChannelException( + 404, format("No such destination: %s.") % name); + return *i->second; } }} // namespace qpid::client |