summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/ExecutionHandler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client/ExecutionHandler.cpp')
-rw-r--r--cpp/src/qpid/client/ExecutionHandler.cpp159
1 files changed, 159 insertions, 0 deletions
diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp
new file mode 100644
index 0000000000..e4270f4e98
--- /dev/null
+++ b/cpp/src/qpid/client/ExecutionHandler.cpp
@@ -0,0 +1,159 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ExecutionHandler.h"
+#include "qpid/Exception.h"
+#include "qpid/framing/BasicDeliverBody.h"
+#include "qpid/framing/MessageTransferBody.h"
+
+using namespace qpid::client;
+using namespace qpid::framing;
+using namespace boost;
+
+bool isMessageMethod(AMQMethodBody::shared_ptr method)
+{
+ return method->isA<BasicDeliverBody>() || method->isA<MessageTransferBody>() || method->isA<BasicGetOkBody>();
+}
+
+bool isMessageMethod(AMQBody::shared_ptr body)
+{
+ return body->type() == METHOD_BODY && isMessageMethod(shared_polymorphic_cast<AMQMethodBody>(body));
+}
+
+bool isContentFrame(AMQFrame& frame)
+{
+ AMQBody::shared_ptr body = frame.getBody();
+ uint8_t type = body->type();
+ return type == HEADER_BODY || type == CONTENT_BODY || isMessageMethod(body);
+}
+
+bool invoke(AMQBody::shared_ptr body, Invocable* target)
+{
+ return body->type() == METHOD_BODY && shared_polymorphic_cast<AMQMethodBody>(body)->invoke(target);
+}
+
+ExecutionHandler::ExecutionHandler() : version(framing::highestProtocolVersion) {}
+
+//incoming:
+void ExecutionHandler::handle(AMQFrame& frame)
+{
+ AMQBody::shared_ptr body = frame.getBody();
+ if (!invoke(body, this)) {
+ if (isContentFrame(frame)) {
+ if (!arriving) {
+ arriving = ReceivedContent::shared_ptr(new ReceivedContent(++incoming.hwm));
+ }
+ arriving->append(body);
+ if (arriving->isComplete()) {
+ received.push(arriving);
+ arriving.reset();
+ }
+ } else {
+ ++incoming.hwm;
+ correlation.receive(shared_polymorphic_cast<AMQMethodBody>(body));
+ }
+ }
+}
+
+void ExecutionHandler::complete(uint32_t cumulative, SequenceNumberSet range)
+{
+ SequenceNumber mark(cumulative);
+ if (outgoing.lwm < mark) {
+ outgoing.lwm = mark;
+ completion.completed(outgoing.lwm);
+ }
+ if (range.size() % 2) { //must be even number
+ throw ConnectionException(530, "Received odd number of elements in ranged mark");
+ } else {
+ //TODO: need to manage (record and accumulate) ranges such
+ //that we can implictly move the mark when appropriate
+
+ //TODO: signal listeners of early notification?
+ }
+}
+
+void ExecutionHandler::flush()
+{
+ //send completion
+ incoming.lwm = incoming.hwm;
+ //make_shared_ptr(new ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), SequenceNumberSet())));
+}
+
+void ExecutionHandler::send(AMQBody::shared_ptr command, CompletionTracker::Listener f, Correlator::Listener g)
+{
+ //allocate id:
+ ++outgoing.hwm;
+ //register listeners if necessary:
+ if (f) {
+ completion.listen(outgoing.hwm, f);
+ }
+ if (g) {
+ correlation.listen(g);
+ }
+
+ AMQFrame frame(version, 0/*id will be filled in be channel handler*/, command);
+ out(frame);
+
+ if (f) {
+ AMQFrame frame(version, 0, make_shared_ptr(new ExecutionFlushBody(version)));
+ out(frame);
+ }
+}
+
+void ExecutionHandler::sendContent(framing::AMQBody::shared_ptr content)
+{
+ AMQFrame frame(version, 0/*id will be filled in be channel handler*/, content);
+ out(frame);
+}
+
+void ExecutionHandler::sendContent(AMQBody::shared_ptr command, const BasicHeaderProperties& headers, const std::string& data,
+ uint64_t frameSize,
+ CompletionTracker::Listener f, Correlator::Listener g)
+{
+ send(command, f, g);
+
+ AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
+ BasicHeaderProperties::copy(*static_cast<BasicHeaderProperties*>(header->getProperties()), headers);
+ header->setContentSize(data.size());
+ AMQFrame h(version, 0, header);
+ out(h);
+
+ u_int64_t data_length = data.length();
+ if(data_length > 0){
+ //frame itself uses 8 bytes
+ u_int32_t frag_size = frameSize - 8;
+ if(data_length < frag_size){
+ AMQFrame frame(version, 0, make_shared_ptr(new AMQContentBody(data)));
+ out(frame);
+ }else{
+ u_int32_t offset = 0;
+ u_int32_t remaining = data_length - offset;
+ while (remaining > 0) {
+ u_int32_t length = remaining > frag_size ? frag_size : remaining;
+ string frag(data.substr(offset, length));
+ AMQFrame frame(version, 0, make_shared_ptr(new AMQContentBody(frag)));
+ out(frame);
+ offset += length;
+ remaining = data_length - offset;
+ }
+ }
+ }
+}