diff options
Diffstat (limited to 'qpid/cpp/lib/client/IncomingMessage.cpp')
-rw-r--r-- | qpid/cpp/lib/client/IncomingMessage.cpp | 172 |
1 files changed, 172 insertions, 0 deletions
diff --git a/qpid/cpp/lib/client/IncomingMessage.cpp b/qpid/cpp/lib/client/IncomingMessage.cpp new file mode 100644 index 0000000000..07f94ceb64 --- /dev/null +++ b/qpid/cpp/lib/client/IncomingMessage.cpp @@ -0,0 +1,172 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <IncomingMessage.h> +#include "framing/AMQHeaderBody.h" +#include "framing/AMQContentBody.h" +#include "BasicGetOkBody.h" +#include "BasicReturnBody.h" +#include "BasicDeliverBody.h" +#include <QpidError.h> +#include <iostream> + +namespace qpid { +namespace client { + +using namespace sys; +using namespace framing; + +struct IncomingMessage::Guard: public Mutex::ScopedLock { + Guard(IncomingMessage* im) : Mutex::ScopedLock(im->lock) { + im->shutdownError.throwIf(); + } +}; + +IncomingMessage::IncomingMessage() { reset(); } + +void IncomingMessage::reset() { + state = &IncomingMessage::expectRequest; + endFn= &IncomingMessage::endRequest; + buildMessage = Message(); +} + +void IncomingMessage::startGet() { + Guard g(this); + if (state != &IncomingMessage::expectRequest) { + endGet(new QPID_ERROR(CLIENT_ERROR, "Message already in progress.")); + } + else { + state = &IncomingMessage::expectGetOk; + endFn = &IncomingMessage::endGet; + getError.reset(); + getState = GETTING; + } +} + +bool IncomingMessage::waitGet(Message& msg) { + Guard g(this); + while (getState == GETTING && !shutdownError && !getError) + getReady.wait(lock); + shutdownError.throwIf(); + getError.throwIf(); + msg = getMessage; + return getState==GOT; +} + +Message IncomingMessage::waitDispatch() { + Guard g(this); + while(dispatchQueue.empty() && !shutdownError) + dispatchReady.wait(lock); + shutdownError.throwIf(); + + Message msg(dispatchQueue.front()); + dispatchQueue.pop(); + return msg; +} + +void IncomingMessage::add(BodyPtr body) { + Guard g(this); + shutdownError.throwIf(); + // Call the current state function. + (this->*state)(body); +} + +void IncomingMessage::shutdown() { + Mutex::ScopedLock l(lock); + shutdownError.reset(new ShutdownException()); + getReady.notify(); + dispatchReady.notify(); +} + +bool IncomingMessage::isShutdown() const { + Mutex::ScopedLock l(lock); + return shutdownError; +} + +// Common check for all the expect functions. Called in network thread. +template<class T> +boost::shared_ptr<T> IncomingMessage::expectCheck(BodyPtr body) { + boost::shared_ptr<T> ptr = boost::dynamic_pointer_cast<T>(body); + if (!ptr) + throw QPID_ERROR(PROTOCOL_ERROR+504, "Unexpected frame type"); + return ptr; +} + +void IncomingMessage::expectGetOk(BodyPtr body) { + if (dynamic_cast<BasicGetOkBody*>(body.get())) + state = &IncomingMessage::expectHeader; + else if (dynamic_cast<BasicGetEmptyBody*>(body.get())) { + getState = EMPTY; + endGet(); + } + else + throw QPID_ERROR(PROTOCOL_ERROR+504, "Unexpected frame type"); +} + +void IncomingMessage::expectHeader(BodyPtr body) { + AMQHeaderBody::shared_ptr header = expectCheck<AMQHeaderBody>(body); + buildMessage.header = header; + state = &IncomingMessage::expectContent; + checkComplete(); +} + +void IncomingMessage::expectContent(BodyPtr body) { + AMQContentBody::shared_ptr content = expectCheck<AMQContentBody>(body); + buildMessage.setData(buildMessage.getData() + content->getData()); + checkComplete(); +} + +void IncomingMessage::checkComplete() { + size_t declaredSize = buildMessage.header->getContentSize(); + size_t currentSize = buildMessage.getData().size(); + if (declaredSize == currentSize) + (this->*endFn)(0); + else if (declaredSize < currentSize) + (this->*endFn)(new QPID_ERROR( + PROTOCOL_ERROR, "Message content exceeds declared size.")); +} + +void IncomingMessage::expectRequest(BodyPtr body) { + AMQMethodBody::shared_ptr method = expectCheck<AMQMethodBody>(body); + buildMessage.setMethod(method); + state = &IncomingMessage::expectHeader; +} + +void IncomingMessage::endGet(Exception* ex) { + getError.reset(ex); + if (getState == GETTING) { + getMessage = buildMessage; + getState = GOT; + } + reset(); + getReady.notify(); +} + +void IncomingMessage::endRequest(Exception* ex) { + ExceptionHolder eh(ex); + if (!eh) { + dispatchQueue.push(buildMessage); + reset(); + dispatchReady.notify(); + } + eh.throwIf(); +} + +}} // namespace qpid::client |