diff options
Diffstat (limited to 'qpid/cpp/src/qpid/client/ExecutionHandler.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/client/ExecutionHandler.cpp | 267 |
1 files changed, 0 insertions, 267 deletions
diff --git a/qpid/cpp/src/qpid/client/ExecutionHandler.cpp b/qpid/cpp/src/qpid/client/ExecutionHandler.cpp deleted file mode 100644 index afdd13c9e9..0000000000 --- a/qpid/cpp/src/qpid/client/ExecutionHandler.cpp +++ /dev/null @@ -1,267 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "ExecutionHandler.h" -#include "qpid/Exception.h" -#include "qpid/framing/BasicDeliverBody.h" -#include "qpid/framing/MessageTransferBody.h" -#include "qpid/framing/AMQP_HighestVersion.h" -#include "qpid/framing/all_method_bodies.h" -#include "qpid/framing/ServerInvoker.h" -#include "qpid/client/FutureCompletion.h" -#include <boost/bind.hpp> - -using namespace qpid::client; -using namespace qpid::framing; -using namespace boost; -using qpid::sys::Mutex; - -bool isMessageMethod(AMQMethodBody* method) -{ - return method->isA<BasicDeliverBody>() || method->isA<MessageTransferBody>() || method->isA<BasicGetOkBody>(); -} - -bool isMessageMethod(AMQBody* body) -{ - AMQMethodBody* method=body->getMethod(); - return method && isMessageMethod(method); -} - -bool isContentFrame(AMQFrame& frame) -{ - AMQBody* body = frame.getBody(); - uint8_t type = body->type(); - return type == HEADER_BODY || type == CONTENT_BODY || isMessageMethod(body); -} - -ExecutionHandler::ExecutionHandler(uint64_t _maxFrameSize) : - version(framing::highestProtocolVersion), maxFrameSize(_maxFrameSize) {} - -//incoming: -void ExecutionHandler::handle(AMQFrame& frame) -{ - if (!invoke(*this, *frame.getBody())) { - if (!arriving) { - arriving = FrameSet::shared_ptr(new FrameSet(++incomingCounter)); - } - arriving->append(frame); - if (arriving->isComplete()) { - if (arriving->isContentBearing() || !correlation.receive(arriving->getMethod())) { - demux.handle(arriving); - } - arriving.reset(); - } - } -} - -void ExecutionHandler::complete(uint32_t cumulative, const SequenceNumberSet& range) -{ - if (range.size() % 2) { //must be even number - throw NotAllowedException(QPID_MSG("Received odd number of elements in ranged mark")); - } else { - SequenceNumber mark(cumulative); - { - Mutex::ScopedLock l(lock); - outgoingCompletionStatus.update(mark, range); - } - if (completionListener) completionListener(); - completion.completed(outgoingCompletionStatus.mark); - //TODO: signal listeners of early notification? - } -} - -void ExecutionHandler::flush() -{ - sendCompletion(); -} - -void ExecutionHandler::noop() -{ - //do nothing -} - -void ExecutionHandler::result(uint32_t command, const std::string& data) -{ - completion.received(command, data); -} - -void ExecutionHandler::sync() -{ - //TODO: implement - need to note the mark requested and then - //remember to send a response when that point is reached -} - -void ExecutionHandler::flushTo(const framing::SequenceNumber& point) -{ - Mutex::ScopedLock l(lock); - if (point > outgoingCompletionStatus.mark) { - sendFlushRequest(); - } -} - -void ExecutionHandler::sendFlushRequest() -{ - Mutex::ScopedLock l(lock); - AMQFrame frame(in_place<ExecutionFlushBody>()); - out(frame); -} - -void ExecutionHandler::syncTo(const framing::SequenceNumber& point) -{ - Mutex::ScopedLock l(lock); - if (point > outgoingCompletionStatus.mark) { - sendSyncRequest(); - } -} - - -void ExecutionHandler::sendSyncRequest() -{ - Mutex::ScopedLock l(lock); - AMQFrame frame(in_place<ExecutionSyncBody>()); - out(frame); -} - -void ExecutionHandler::completed(const SequenceNumber& id, bool cumulative, bool send) -{ - { - Mutex::ScopedLock l(lock); - if (id > incomingCompletionStatus.mark) { - if (cumulative) { - incomingCompletionStatus.update(incomingCompletionStatus.mark, id); - } else { - incomingCompletionStatus.update(id, id); - } - } - } - if (send) { - sendCompletion(); - } -} - - -void ExecutionHandler::sendCompletion() -{ - Mutex::ScopedLock l(lock); - SequenceNumberSet range; - incomingCompletionStatus.collectRanges(range); - AMQFrame frame( - in_place<ExecutionCompleteBody>( - version, incomingCompletionStatus.mark.getValue(), range)); - out(frame); -} - -SequenceNumber ExecutionHandler::lastSent() const { return outgoingCounter; } - -SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener listener) -{ - Mutex::ScopedLock l(lock); - return send(command, listener, false); -} - -SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener l, bool hasContent) -{ - SequenceNumber id = ++outgoingCounter; - if(l) { - completion.listenForResult(id, l); - } - AMQFrame frame(command); - if (hasContent) { - frame.setEof(false); - } - out(frame); - return id; -} - -SequenceNumber ExecutionHandler::send(const AMQBody& command, const MethodContent& content, - CompletionTracker::ResultListener listener) -{ - Mutex::ScopedLock l(lock); - SequenceNumber id = send(command, listener, true); - sendContent(content); - return id; -} - -void ExecutionHandler::sendContent(const MethodContent& content) -{ - AMQFrame header(content.getHeader()); - header.setBof(false); - u_int64_t data_length = content.getData().length(); - if(data_length > 0){ - header.setEof(false); - out(header); - const u_int32_t frag_size = maxFrameSize - (AMQFrame::frameOverhead() - 1 /*end of frame marker included in overhead but not in size*/); - if(data_length < frag_size){ - AMQFrame frame(in_place<AMQContentBody>(content.getData())); - frame.setBof(false); - out(frame); - }else{ - u_int32_t offset = 0; - u_int32_t remaining = data_length - offset; - while (remaining > 0) { - u_int32_t length = remaining > frag_size ? frag_size : remaining; - string frag(content.getData().substr(offset, length)); - AMQFrame frame(in_place<AMQContentBody>(frag)); - frame.setBof(false); - if (offset > 0) { - frame.setBos(false); - } - offset += length; - remaining = data_length - offset; - if (remaining) { - frame.setEos(false); - frame.setEof(false); - } - out(frame); - } - } - } else { - out(header); - } -} - -bool ExecutionHandler::isComplete(const SequenceNumber& id) -{ - Mutex::ScopedLock l(lock); - return outgoingCompletionStatus.covers(id); -} - -bool ExecutionHandler::isCompleteUpTo(const SequenceNumber& id) -{ - Mutex::ScopedLock l(lock); - return outgoingCompletionStatus.mark >= id; -} - -void ExecutionHandler::setCompletionListener(boost::function<void()> l) -{ - completionListener = l; -} - - -void ExecutionHandler::syncWait(const SequenceNumber& id) { - syncTo(id); - FutureCompletion fc; - completion.listenForCompletion( - id, boost::bind(&FutureCompletion::completed, &fc) - ); - fc.waitForCompletion(); - assert(isCompleteUpTo(id)); -} |