diff options
author | Gordon Sim <gsim@apache.org> | 2007-07-23 12:29:17 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-07-23 12:29:17 +0000 |
commit | 0db1af31320aa010c8e97da80000f7548d889068 (patch) | |
tree | ce2cd8dba8cf46b685dcb626b31e25c17702c1a0 | |
parent | 747ac26509e78ac9aa9120be02cd446ac99d21cd (diff) | |
download | qpid-python-0db1af31320aa010c8e97da80000f7548d889068.tar.gz |
Added initial 'execution-layer' to try out methods form the 0-10 execution class.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@558700 13f79535-47bb-0310-9956-ffa450edef68
26 files changed, 322 insertions, 103 deletions
diff --git a/cpp/gentools/src/org/apache/qpid/gentools/CppGenerator.java b/cpp/gentools/src/org/apache/qpid/gentools/CppGenerator.java index edcf9e09e3..fd3684125c 100644 --- a/cpp/gentools/src/org/apache/qpid/gentools/CppGenerator.java +++ b/cpp/gentools/src/org/apache/qpid/gentools/CppGenerator.java @@ -348,6 +348,8 @@ public class CppGenerator extends Generator return generateConstructor(thisClass, method, version, 4, 4); if (token.equals("${mb_server_operation_invoke}")) return generateServerOperationsInvoke(thisClass, method, version, 4, 4); + if (token.equals("${mb_server_operation_invoke2}")) + return generateServerOperationsInvoke2(thisClass, method, version, 4, 4); if (token.equals("${mb_buffer_param}")) return method.fieldMap.size() > 0 ? " buffer" : "/*buffer*/"; if (token.equals("${hv_latest_major}")) @@ -719,7 +721,7 @@ public class CppGenerator extends Generator sb.append(cr); sb.append(indent + "// ==================== class " + handlerClassName + " ====================" + cr); - sb.append(indent + "class " + handlerClassName); + sb.append(indent + "class " + handlerClassName + " : public virtual Invocable"); if (thisClass.versionSet.size() != globalVersionSet.size()) sb.append(" // AMQP Version(s) " + thisClass.versionSet + cr); else @@ -1085,11 +1087,11 @@ public class CppGenerator extends Generator String tab = Utils.createSpaces(tabSize); String namespace = version != null ? version.namespace() + "::" : ""; StringBuffer sb = new StringBuffer(); - sb.append(indent+tab+(method.isResponse(version) ? "" : "return ")+"channel.send(new "); + sb.append(indent+tab+(method.isResponse(version) ? "" : "return ")+"channel.send(make_shared_ptr(new "); sb.append(namespace + methodBodyClassName + "( channel.getVersion()"); if (method.isResponse(version)) sb.append(", responseTo"); sb.append(generateMethodParameterList(fieldMap, indentSize + (5*tabSize), true, false, true)); - sb.append("));\n"); + sb.append(")));\n"); return sb.toString(); } @@ -1505,6 +1507,50 @@ public class CppGenerator extends Generator return sb.toString(); } + protected String generateServerOperationsInvoke2(AmqpClass thisClass, AmqpMethod method, + AmqpVersion version, int indentSize, int tabSize) + throws AmqpTypeMappingException + { + String indent = Utils.createSpaces(indentSize); + String tab = Utils.createSpaces(tabSize); + StringBuffer sb = new StringBuffer(); + String ptrType = "AMQP_ServerOperations:: " + thisClass.name + "Handler*"; + if (isSpecialCase(thisClass.name, method.name)) { + sb.append(indent + "bool invoke(Invocable*)" + cr); + sb.append(indent + "{" + cr); + sb.append(indent + tab + "return false;" + cr); + sb.append(indent + "}" + cr); + } else if (method.serverMethodFlagMap.size() > 0) { // At least one AMQP version defines this method as a server method + + Iterator<Boolean> bItr = method.serverMethodFlagMap.keySet().iterator(); + while (bItr.hasNext()) + { + if (bItr.next()) // This is a server operation + { + boolean fieldMapNotEmptyFlag = method.fieldMap.size() > 0; + sb.append(indent + "bool invoke(Invocable* target)" + cr); + sb.append(indent + "{" + cr); + sb.append(indent + tab + ptrType + " ptr = dynamic_cast<" + ptrType + ">(target);" + cr); + sb.append(indent + tab + "if (ptr) {" + cr); + sb.append(indent + tab + tab + "ptr->"); + sb.append(parseForReservedWords(Utils.firstLower(method.name),null) + "(" + cr); + if (fieldMapNotEmptyFlag) + { + sb.append(generateFieldList(method.fieldMap, version, false, false, indentSize + 4*tabSize)); + sb.append(indent + tab + tab + tab + tab); + } + sb.append(");" + cr); + sb.append(indent + tab + tab + "return true;" + cr); + sb.append(indent + tab + "} else {" + cr); + sb.append(indent + tab + tab + "return false;" + cr); + sb.append(indent + tab + "}" + cr); + sb.append(indent + "}" + cr); + } + } + } + return sb.toString(); + } + // Methods for generation of code snippets for amqp_methods.h/cpp files protected String generateMethodBodyIncludeList(AmqpModel model, int indentSize) diff --git a/cpp/gentools/templ.cpp/AMQP_ServerOperations.h.tmpl b/cpp/gentools/templ.cpp/AMQP_ServerOperations.h.tmpl index be8dc2a6b3..06ef88c84e 100644 --- a/cpp/gentools/templ.cpp/AMQP_ServerOperations.h.tmpl +++ b/cpp/gentools/templ.cpp/AMQP_ServerOperations.h.tmpl @@ -36,6 +36,13 @@ namespace framing { class MethodContext; +class Invocable +{ +protected: + Invocable() {} + virtual ~Invocable() {} +}; + class AMQP_ServerOperations { protected: diff --git a/cpp/gentools/templ.cpp/MethodBodyClass.h.tmpl b/cpp/gentools/templ.cpp/MethodBodyClass.h.tmpl index 2e1b2ed482..093a5ffe90 100644 --- a/cpp/gentools/templ.cpp/MethodBodyClass.h.tmpl +++ b/cpp/gentools/templ.cpp/MethodBodyClass.h.tmpl @@ -100,6 +100,8 @@ ${mb_constructor_with_initializers} ${mb_server_operation_invoke} +${mb_server_operation_invoke2} + }; // class ${CLASS}${METHOD}Body ${version_namespace_end} diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp index be43dacb27..9bf148bcf0 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.cpp +++ b/cpp/src/qpid/broker/BrokerAdapter.cpp @@ -56,12 +56,12 @@ ProtocolVersion BrokerAdapter::getVersion() const { void BrokerAdapter::ChannelHandlerImpl::open(const string& /*outOfBand*/){ channel.open(); // FIXME aconway 2007-01-04: provide valid ID as per ampq 0-9 - client.openOk(std::string()/* ID */);//GRS, context.getRequestId()); + client.openOk(std::string()/* ID */); } void BrokerAdapter::ChannelHandlerImpl::flow(bool active){ channel.flow(active); - client.flowOk(active);//GRS, context.getRequestId()); + client.flowOk(active); } void BrokerAdapter::ChannelHandlerImpl::flowOk(bool /*active*/){} @@ -70,7 +70,7 @@ void BrokerAdapter::ChannelHandlerImpl::close(uint16_t /*replyCode*/, const string& /*replyText*/, uint16_t /*classId*/, uint16_t /*methodId*/) { - client.closeOk();//GRS context.getRequestId()); + client.closeOk(); // FIXME aconway 2007-01-18: Following line will "delete this". Ugly. connection.closeChannel(channel.getId()); } @@ -104,7 +104,7 @@ void BrokerAdapter::ExchangeHandlerImpl::declare(uint16_t /*ticket*/, const stri } } if(!nowait){ - client.declareOk();//GRS context.getRequestId()); + client.declareOk(); } } @@ -114,16 +114,16 @@ void BrokerAdapter::ExchangeHandlerImpl::delete_(uint16_t /*ticket*/, Exchange::shared_ptr exchange(broker.getExchanges().get(name)); if (exchange->isDurable()) broker.getStore().destroy(*exchange); broker.getExchanges().destroy(name); - if(!nowait) client.deleteOk();//GRS context.getRequestId()); + if(!nowait) client.deleteOk(); } void BrokerAdapter::ExchangeHandlerImpl::query(u_int16_t /*ticket*/, const string& name) { try { Exchange::shared_ptr exchange(broker.getExchanges().get(name)); - client.queryOk(exchange->getType(), exchange->isDurable(), false, exchange->getArgs());//GRS, context.getRequestId()); + client.queryOk(exchange->getType(), exchange->isDurable(), false, exchange->getArgs()); } catch (const ChannelException& e) { - client.queryOk("", false, true, FieldTable());//GRS, context.getRequestId()); + client.queryOk("", false, true, FieldTable()); } } @@ -144,18 +144,18 @@ void BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/, } if (!exchange) { - client.queryOk(true, false, false, false, false);//GRS, context.getRequestId()); + client.queryOk(true, false, false, false, false); } else if (!queueName.empty() && !queue) { - client.queryOk(false, true, false, false, false);//GRS, context.getRequestId()); + client.queryOk(false, true, false, false, false); } else if (exchange->isBound(queue, key.empty() ? 0 : &key, args.count() > 0 ? &args : &args)) { - client.queryOk(false, false, false, false, false);//GRS, context.getRequestId()); + client.queryOk(false, false, false, false, false); } else { //need to test each specified option individually bool queueMatched = queueName.empty() || exchange->isBound(queue, 0, 0); bool keyMatched = key.empty() || exchange->isBound(Queue::shared_ptr(), &key, 0); bool argsMatched = args.count() == 0 || exchange->isBound(Queue::shared_ptr(), 0, &args); - client.queryOk(false, false, !queueMatched, !keyMatched, !argsMatched);//GRS, context.getRequestId()); + client.queryOk(false, false, !queueMatched, !keyMatched, !argsMatched); } } @@ -196,7 +196,7 @@ void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string& if (!nowait) { string queueName = queue->getName(); client.declareOk( - queueName, queue->getMessageCount(), queue->getConsumerCount());//GRS, context.getRequestId()); + queueName, queue->getMessageCount(), queue->getConsumerCount()); } } @@ -214,7 +214,7 @@ void BrokerAdapter::QueueHandlerImpl::bind(uint16_t /*ticket*/, const string& qu broker.getStore().bind(*exchange, *queue, routingKey, arguments); } } - if(!nowait) client.bindOk();//GRS context.getRequestId()); + if(!nowait) client.bindOk(); }else{ throw ChannelException( 404, "Bind failed. No such exchange: " + exchangeName); @@ -238,14 +238,14 @@ BrokerAdapter::QueueHandlerImpl::unbind(uint16_t /*ticket*/, broker.getStore().unbind(*exchange, *queue, routingKey, arguments); } - client.unbindOk();//GRS context.getRequestId()); + client.unbindOk(); } void BrokerAdapter::QueueHandlerImpl::purge(uint16_t /*ticket*/, const string& queueName, bool nowait){ Queue::shared_ptr queue = getQueue(queueName); int count = queue->purge(); - if(!nowait) client.purgeOk( count);//GRS, context.getRequestId()); + if(!nowait) client.purgeOk( count); } void BrokerAdapter::QueueHandlerImpl::delete_(uint16_t /*ticket*/, const string& queue, @@ -270,7 +270,7 @@ void BrokerAdapter::QueueHandlerImpl::delete_(uint16_t /*ticket*/, const string& } if(!nowait) - client.deleteOk(count);//GRS, context.getRequestId()); + client.deleteOk(count); } @@ -280,7 +280,7 @@ void BrokerAdapter::BasicHandlerImpl::qos(uint32_t prefetchSize, uint16_t prefet //TODO: handle global channel.setPrefetchSize(prefetchSize); channel.setPrefetchCount(prefetchCount); - client.qosOk();//GRS context.getRequestId()); + client.qosOk(); } void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/, @@ -300,7 +300,7 @@ void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/, channel.consume(std::auto_ptr<DeliveryAdapter>(new ConsumeAdapter(adapter, newTag, connection.getFrameMax())), newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &fields); - if(!nowait) client.consumeOk(newTag);//GRS, context.getRequestId()); + if(!nowait) client.consumeOk(newTag); //allow messages to be dispatched if required as there is now a consumer: queue->requestDispatch(); @@ -309,7 +309,7 @@ void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/, void BrokerAdapter::BasicHandlerImpl::cancel(const string& consumerTag, bool nowait){ channel.cancel(consumerTag); - if(!nowait) client.cancelOk(consumerTag);//GRS, context.getRequestId()); + if(!nowait) client.cancelOk(consumerTag); } void BrokerAdapter::BasicHandlerImpl::publish(uint16_t /*ticket*/, @@ -333,7 +333,7 @@ void BrokerAdapter::BasicHandlerImpl::get(uint16_t /*ticket*/, const string& que if(!channel.get(out, queue, !noAck)){ string clusterId;//not used, part of an imatix hack - client.getEmpty(clusterId);//GRS, context.getRequestId()); + client.getEmpty(clusterId); } } @@ -351,19 +351,19 @@ void BrokerAdapter::BasicHandlerImpl::recover(bool requeue) void BrokerAdapter::TxHandlerImpl::select() { channel.startTx(); - client.selectOk();//GRS context.getRequestId()); + client.selectOk(); } void BrokerAdapter::TxHandlerImpl::commit() { channel.commit(); - client.commitOk();//GRS context.getRequestId()); + client.commitOk(); } void BrokerAdapter::TxHandlerImpl::rollback() { channel.rollback(); - client.rollbackOk();//GRS context.getRequestId()); + client.rollbackOk(); channel.recover(false); } @@ -378,7 +378,7 @@ void BrokerAdapter::ChannelHandlerImpl::ok() // void BrokerAdapter::ChannelHandlerImpl::ping() { - client.ok();//GRS context.getRequestId()); + client.ok(); client.pong(); } @@ -386,7 +386,7 @@ void BrokerAdapter::ChannelHandlerImpl::ping() void BrokerAdapter::ChannelHandlerImpl::pong() { - client.ok();//GRS context.getRequestId()); + client.ok(); } void BrokerAdapter::ChannelHandlerImpl::resume(const string& /*channel*/) diff --git a/cpp/src/qpid/broker/BrokerAdapter.h b/cpp/src/qpid/broker/BrokerAdapter.h index 41013e8bf6..a7e27a0ee6 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.h +++ b/cpp/src/qpid/broker/BrokerAdapter.h @@ -79,6 +79,7 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations DtxCoordinationHandler* getDtxCoordinationHandler() { return &dtxHandler; } DtxDemarcationHandler* getDtxDemarcationHandler() { return &dtxHandler; } + ExecutionHandler* getExecutionHandler() { throw ConnectionException(531, "Wrong adapter for execution layer method!"); } ConnectionHandler* getConnectionHandler() { throw ConnectionException(503, "Can't access connection class on non-zero channel!"); diff --git a/cpp/src/qpid/broker/BrokerMessage.cpp b/cpp/src/qpid/broker/BrokerMessage.cpp index 21b56869d4..d192b09a63 100644 --- a/cpp/src/qpid/broker/BrokerMessage.cpp +++ b/cpp/src/qpid/broker/BrokerMessage.cpp @@ -78,10 +78,10 @@ void BasicMessage::deliver(ChannelAdapter& channel, const string& consumerTag, uint64_t deliveryTag, uint32_t framesize) { - channel.send( + channel.send(make_shared_ptr( new BasicDeliverBody( channel.getVersion(), consumerTag, deliveryTag, - getRedelivered(), getExchange(), getRoutingKey())); + getRedelivered(), getExchange(), getRoutingKey()))); sendContent(channel, framesize); } @@ -92,12 +92,12 @@ void BasicMessage::sendGetOk(ChannelAdapter& channel, uint64_t deliveryTag, uint32_t framesize) { - channel.send( + channel.send(make_shared_ptr( new BasicGetOkBody( channel.getVersion(), responseTo, deliveryTag, getRedelivered(), getExchange(), - getRoutingKey(), messageCount)); + getRoutingKey(), messageCount))); sendContent(channel, framesize); } diff --git a/cpp/src/qpid/broker/BrokerMessageMessage.cpp b/cpp/src/qpid/broker/BrokerMessageMessage.cpp index b23ebaf50b..01f8250b84 100644 --- a/cpp/src/qpid/broker/BrokerMessageMessage.cpp +++ b/cpp/src/qpid/broker/BrokerMessageMessage.cpp @@ -85,7 +85,7 @@ void MessageMessage::transferMessage( if (ref){ // Open - channel.send(new MessageOpenBody(channel.getVersion(), ref->getId())); + channel.send(make_shared_ptr(new MessageOpenBody(channel.getVersion(), ref->getId()))); // Appends for(Reference::Appends::const_iterator a = ref->getAppends().begin(); a != ref->getAppends().end(); @@ -98,8 +98,8 @@ void MessageMessage::transferMessage( string::size_type contentStart = 0; while (sizeleft) { string::size_type contentSize = sizeleft <= framesize ? sizeleft : framesize-overhead; - channel.send(new MessageAppendBody(channel.getVersion(), ref->getId(), - string(content, contentStart, contentSize))); + channel.send(make_shared_ptr(new MessageAppendBody(channel.getVersion(), ref->getId(), + string(content, contentStart, contentSize)))); sizeleft -= contentSize; contentStart += contentSize; } @@ -108,7 +108,7 @@ void MessageMessage::transferMessage( // The transfer if ( transfer->size()<=framesize ) { - channel.send( + channel.send(make_shared_ptr( new MessageTransferBody(channel.getVersion(), transfer->getTicket(), consumerTag, @@ -132,7 +132,7 @@ void MessageMessage::transferMessage( transfer->getSecurityToken(), transfer->getApplicationHeaders(), body, - transfer->getMandatory())); + transfer->getMandatory()))); } else { // Thing to do here is to construct a simple reference message then deliver that instead // fragmentation will be taken care of in the delivery if necessary; @@ -172,7 +172,7 @@ void MessageMessage::transferMessage( } // Close any reference data if (ref) - channel.send(new MessageCloseBody(channel.getVersion(), ref->getId())); + channel.send(make_shared_ptr(new MessageCloseBody(channel.getVersion(), ref->getId()))); } void MessageMessage::deliver( diff --git a/cpp/src/qpid/broker/ConnectionAdapter.h b/cpp/src/qpid/broker/ConnectionAdapter.h index 2e27abd333..b624102cd2 100644 --- a/cpp/src/qpid/broker/ConnectionAdapter.h +++ b/cpp/src/qpid/broker/ConnectionAdapter.h @@ -71,6 +71,7 @@ public: TunnelHandler* getTunnelHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); } DtxCoordinationHandler* getDtxCoordinationHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); } DtxDemarcationHandler* getDtxDemarcationHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); } + ExecutionHandler* getExecutionHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); } framing::ProtocolVersion getVersion() const; }; diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.cpp b/cpp/src/qpid/broker/DtxHandlerImpl.cpp index 2aa53e66bd..d1f925e40c 100644 --- a/cpp/src/qpid/broker/DtxHandlerImpl.cpp +++ b/cpp/src/qpid/broker/DtxHandlerImpl.cpp @@ -52,7 +52,7 @@ const int XA_OK(8); void DtxHandlerImpl::select() { channel.selectDtx(); - dClient.selectOk();//GRS context.getRequestId()); + dClient.selectOk(); } void DtxHandlerImpl::end(u_int16_t /*ticket*/, @@ -66,7 +66,7 @@ void DtxHandlerImpl::end(u_int16_t /*ticket*/, if (suspend) { throw ConnectionException(503, "End and suspend cannot both be set."); } else { - dClient.endOk(XA_RBROLLBACK);//GRS, context.getRequestId()); + dClient.endOk(XA_RBROLLBACK); } } else { if (suspend) { @@ -74,10 +74,10 @@ void DtxHandlerImpl::end(u_int16_t /*ticket*/, } else { channel.endDtx(xid, false); } - dClient.endOk(XA_OK);//GRS, context.getRequestId()); + dClient.endOk(XA_OK); } } catch (const DtxTimeoutException& e) { - dClient.endOk(XA_RBTIMEOUT);//GRS, context.getRequestId()); + dClient.endOk(XA_RBTIMEOUT); } } @@ -95,9 +95,9 @@ void DtxHandlerImpl::start(u_int16_t /*ticket*/, } else { channel.startDtx(xid, broker.getDtxManager(), join); } - dClient.startOk(XA_OK);//GRS, context.getRequestId()); + dClient.startOk(XA_OK); } catch (const DtxTimeoutException& e) { - dClient.startOk(XA_RBTIMEOUT);//GRS, context.getRequestId()); + dClient.startOk(XA_RBTIMEOUT); } } @@ -108,9 +108,9 @@ void DtxHandlerImpl::prepare(u_int16_t /*ticket*/, { try { bool ok = broker.getDtxManager().prepare(xid); - cClient.prepareOk(ok ? XA_OK : XA_RBROLLBACK);//GRS, context.getRequestId()); + cClient.prepareOk(ok ? XA_OK : XA_RBROLLBACK); } catch (const DtxTimeoutException& e) { - cClient.prepareOk(XA_RBTIMEOUT);//GRS, context.getRequestId()); + cClient.prepareOk(XA_RBTIMEOUT); } } @@ -120,9 +120,9 @@ void DtxHandlerImpl::commit(u_int16_t /*ticket*/, { try { bool ok = broker.getDtxManager().commit(xid, onePhase); - cClient.commitOk(ok ? XA_OK : XA_RBROLLBACK);//GRS, context.getRequestId()); + cClient.commitOk(ok ? XA_OK : XA_RBROLLBACK); } catch (const DtxTimeoutException& e) { - cClient.commitOk(XA_RBTIMEOUT);//GRS, context.getRequestId()); + cClient.commitOk(XA_RBTIMEOUT); } } @@ -132,9 +132,9 @@ void DtxHandlerImpl::rollback(u_int16_t /*ticket*/, { try { broker.getDtxManager().rollback(xid); - cClient.rollbackOk(XA_OK);//GRS, context.getRequestId()); + cClient.rollbackOk(XA_OK); } catch (const DtxTimeoutException& e) { - cClient.rollbackOk(XA_RBTIMEOUT);//GRS, context.getRequestId()); + cClient.rollbackOk(XA_RBTIMEOUT); } } @@ -171,7 +171,7 @@ void DtxHandlerImpl::recover(u_int16_t /*ticket*/, FieldTable response; response.setString("xids", data); - cClient.recoverOk(response);//GRS, context.getRequestId()); + cClient.recoverOk(response); } void DtxHandlerImpl::forget(u_int16_t /*ticket*/, @@ -184,7 +184,7 @@ void DtxHandlerImpl::forget(u_int16_t /*ticket*/, void DtxHandlerImpl::getTimeout(const string& xid) { uint32_t timeout = broker.getDtxManager().getTimeout(xid); - cClient.getTimeoutOk(timeout);//GRS, context.getRequestId()); + cClient.getTimeoutOk(timeout); } @@ -193,7 +193,7 @@ void DtxHandlerImpl::setTimeout(u_int16_t /*ticket*/, u_int32_t timeout) { broker.getDtxManager().setTimeout(xid, timeout); - cClient.setTimeoutOk();//GRS context.getRequestId()); + cClient.setTimeoutOk(); } void DtxHandlerImpl::setResponseTo(framing::RequestId r) diff --git a/cpp/src/qpid/broker/InMemoryContent.cpp b/cpp/src/qpid/broker/InMemoryContent.cpp index 96563b0329..a6ce820f7e 100644 --- a/cpp/src/qpid/broker/InMemoryContent.cpp +++ b/cpp/src/qpid/broker/InMemoryContent.cpp @@ -47,13 +47,13 @@ void InMemoryContent::send(ChannelAdapter& channel, uint32_t framesize) uint32_t offset = 0; for (int chunk = (*i)->size() / framesize; chunk > 0; chunk--) { string data = (*i)->getData().substr(offset, framesize); - channel.send(new AMQContentBody(data)); + channel.send(make_shared_ptr(new AMQContentBody(data))); offset += framesize; } uint32_t remainder = (*i)->size() % framesize; if (remainder) { string data = (*i)->getData().substr(offset, remainder); - channel.send(new AMQContentBody(data)); + channel.send(make_shared_ptr(new AMQContentBody(data))); } } else { AMQBody::shared_ptr contentBody = diff --git a/cpp/src/qpid/broker/LazyLoadedContent.cpp b/cpp/src/qpid/broker/LazyLoadedContent.cpp index c11d049317..80d06ebf2b 100644 --- a/cpp/src/qpid/broker/LazyLoadedContent.cpp +++ b/cpp/src/qpid/broker/LazyLoadedContent.cpp @@ -52,12 +52,12 @@ void LazyLoadedContent::send(ChannelAdapter& channel, uint32_t framesize) string data; store->loadContent(*msg, data, offset, remaining > framesize ? framesize : remaining); - channel.send(new AMQContentBody(data)); + channel.send(make_shared_ptr(new AMQContentBody(data))); } } else { string data; store->loadContent(*msg, data, 0, expectedSize); - channel.send(new AMQContentBody(data)); + channel.send(make_shared_ptr(new AMQContentBody(data))); } } diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp index c9fbc2b95d..de32368158 100644 --- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp +++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp @@ -45,14 +45,14 @@ void MessageHandlerImpl::cancel(const string& destination ) { channel.cancel(destination); - client.ok();//GRS context.getRequestId()); + client.ok(); } void MessageHandlerImpl::open(const string& reference) { references.open(reference); - client.ok();//GRS context.getRequestId()); + client.ok(); } void @@ -60,14 +60,14 @@ MessageHandlerImpl::append(const framing::MethodContext& context) { MessageAppendBody::shared_ptr body(boost::shared_polymorphic_downcast<MessageAppendBody>(context.methodBody)); references.get(body->getReference())->append(body); - client.ok();//GRS context.getRequestId()); + client.ok(); } void MessageHandlerImpl::close(const string& reference) { Reference::shared_ptr ref = references.get(reference); - client.ok();//GRS context.getRequestId()); + client.ok(); // Send any transfer messages to their correct exchanges and okay them const Reference::Messages& msgs = ref->getMessages(); @@ -85,7 +85,7 @@ MessageHandlerImpl::checkpoint(const string& /*reference*/, { // Initial implementation (which is conforming) is to do nothing here // and return offset zero for the resume - client.ok();//GRS context.getRequestId()); + client.ok(); } void @@ -95,7 +95,7 @@ MessageHandlerImpl::resume(const string& reference, // Initial (null) implementation // open reference and return 0 offset references.open(reference); - client.offset(0);//GRS, );//GRS, context.getRequestId()); + client.offset(0); } void @@ -123,7 +123,7 @@ MessageHandlerImpl::consume(uint16_t /*ticket*/, channel.consume(std::auto_ptr<DeliveryAdapter>(new ConsumeAdapter(adapter, destination, connection.getFrameMax())), tag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter); - client.ok();//GRS context.getRequestId()); + client.ok(); // Dispatch messages as there is now a consumer. queue->requestDispatch(); } @@ -138,9 +138,9 @@ MessageHandlerImpl::get(uint16_t /*ticket*/, GetAdapter out(adapter, queue, destination, connection.getFrameMax()); if(channel.get(out, queue, !noAck)) - client.ok();//GRS context.getRequestId()); + client.ok(); else - client.empty();//GRS context.getRequestId()); + client.empty(); } void @@ -166,22 +166,19 @@ MessageHandlerImpl::qos(uint32_t prefetchSize, //TODO: handle global channel.setPrefetchSize(prefetchSize); channel.setPrefetchCount(prefetchCount); - client.ok();//GRS context.getRequestId()); + client.ok(); } void MessageHandlerImpl::recover(bool requeue) { channel.recover(requeue); - client.ok();//GRS context.getRequestId()); + client.ok(); } void -MessageHandlerImpl::reject(uint16_t /*code*/, - const string& /*text*/ ) +MessageHandlerImpl::reject(uint16_t /*code*/, const string& /*text*/ ) { - //channel.ack(); - // channel.requeue(); } void diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp index a96a8c5cde..f616ec2db8 100644 --- a/cpp/src/qpid/broker/SemanticHandler.cpp +++ b/cpp/src/qpid/broker/SemanticHandler.cpp @@ -36,7 +36,7 @@ SemanticHandler::SemanticHandler(ChannelId id, Connection& c) : void SemanticHandler::handle(framing::AMQFrame& frame) -{ +{ //TODO: assembly etc when move to 0-10 framing // //have potentially three separate tracks at this point: @@ -56,16 +56,41 @@ void SemanticHandler::handle(framing::AMQFrame& frame) //if ready to execute (i.e. if segment is complete or frame is //message content): handleBody(frame.getBody()); - //if the frameset is complete, we can move the execution-mark - //forward (not for execution controls) - //note: need to be more sophisticated than this if we execute - //commands that arrive within an active message frameset } //ChannelAdapter virtual methods: void SemanticHandler::handleMethodInContext(boost::shared_ptr<qpid::framing::AMQMethodBody> method, const qpid::framing::MethodContext& context) { + if (!method->invoke(this)) { + //else do the usual: + handleL4(method, context); + //(if the frameset is complete) we can move the execution-mark + //forward + ++(incoming.hwm); + + //note: need to be more sophisticated than this if we execute + //commands that arrive within an active message frameset (that + //can't happen until 0-10 framing is implemented) + } +} + +void SemanticHandler::complete(u_int32_t mark) +{ + //just record it for now (will eventually need to use it to ack messages): + outgoing.lwm = SequenceNumber(mark); +} + +void SemanticHandler::flush() +{ + //flush doubles as a sync to begin with - send an execution.complete + incoming.lwm = incoming.hwm; + send(make_shared_ptr(new ExecutionCompleteBody(getVersion(), incoming.hwm.getValue()))); +} + +void SemanticHandler::handleL4(boost::shared_ptr<qpid::framing::AMQMethodBody> method, + const qpid::framing::MethodContext& context) +{ try{ if(getId() != 0 && !method->isA<ChannelOpenBody>() && !isOpen()) { if (!method->isA<ChannelCloseOkBody>()) { diff --git a/cpp/src/qpid/broker/SemanticHandler.h b/cpp/src/qpid/broker/SemanticHandler.h index eecf61466b..6003bbec0c 100644 --- a/cpp/src/qpid/broker/SemanticHandler.h +++ b/cpp/src/qpid/broker/SemanticHandler.h @@ -25,6 +25,7 @@ #include "BrokerChannel.h" #include "Connection.h" #include "qpid/framing/amqp_types.h" +#include "qpid/framing/AMQP_ServerOperations.h" #include "qpid/framing/FrameHandler.h" #include "qpid/framing/SequenceNumber.h" @@ -34,11 +35,18 @@ namespace broker { class BrokerAdapter; class framing::ChannelAdapter; -class SemanticHandler : private framing::ChannelAdapter, public framing::FrameHandler { +class SemanticHandler : private framing::ChannelAdapter, + public framing::FrameHandler, + public framing::AMQP_ServerOperations::ExecutionHandler +{ Connection& connection; Channel channel; std::auto_ptr<BrokerAdapter> adapter; - framing::SequenceNumber executionMark; + framing::Window incoming; + framing::Window outgoing; + + void handleL4(boost::shared_ptr<qpid::framing::AMQMethodBody> method, + const qpid::framing::MethodContext& context); //ChannelAdapter virtual methods: void handleMethodInContext(boost::shared_ptr<qpid::framing::AMQMethodBody> method, @@ -50,6 +58,10 @@ class SemanticHandler : private framing::ChannelAdapter, public framing::FrameHa public: SemanticHandler(framing::ChannelId id, Connection& c); void handle(framing::AMQFrame& frame); + + //execution class method handlers: + void complete(u_int32_t cumulativeExecutionMark); + void flush(); }; }} diff --git a/cpp/src/qpid/client/BasicMessageChannel.cpp b/cpp/src/qpid/client/BasicMessageChannel.cpp index 60368268c0..a1aacdee4e 100644 --- a/cpp/src/qpid/client/BasicMessageChannel.cpp +++ b/cpp/src/qpid/client/BasicMessageChannel.cpp @@ -101,7 +101,7 @@ void BasicMessageChannel::cancel(const std::string& tag, bool synch) { consumers.erase(i); } if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) { - channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true)); + channel.send(make_shared_ptr(new BasicAckBody(channel.version, c.lastDeliveryTag, true))); } channel.sendAndReceiveSync<BasicCancelOkBody>( synch, make_shared_ptr(new BasicCancelBody(channel.version, tag, !synch))); @@ -119,9 +119,9 @@ void BasicMessageChannel::cancelAll(){ Consumer& c = i->second; if (c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) { - channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true)); + channel.send(make_shared_ptr(new BasicAckBody(channel.version, c.lastDeliveryTag, true))); } - channel.send(new BasicCancelBody(channel.version, i->first, true)); + channel.send(make_shared_ptr(new BasicCancelBody(channel.version, i->first, true))); } consumers.clear(); } @@ -131,8 +131,7 @@ bool BasicMessageChannel::get( { // Prepare for incoming response incoming.addDestination(BASIC_GET, destGet); - channel.send( - new BasicGetBody(channel.version, 0, queue.getName(), ackMode)); + channel.send(make_shared_ptr(new BasicGetBody(channel.version, 0, queue.getName(), ackMode))); bool got = destGet.wait(msg); return got; } @@ -150,9 +149,7 @@ void BasicMessageChannel::publish( *static_cast<BasicHeaderProperties*>(header->getProperties()), msg); header->setContentSize(msg.getData().size()); - channel.send( - new BasicPublishBody( - channel.version, 0, e, key, mandatory, immediate)); + channel.send(make_shared_ptr(new BasicPublishBody(channel.version, 0, e, key, mandatory, immediate))); channel.send(header); string data = msg.getData(); u_int64_t data_length = data.length(); @@ -160,14 +157,14 @@ void BasicMessageChannel::publish( //frame itself uses 8 bytes u_int32_t frag_size = channel.connection->getMaxFrameSize() - 8; if(data_length < frag_size){ - channel.send(new AMQContentBody(data)); + channel.send(make_shared_ptr(new AMQContentBody(data))); }else{ u_int32_t offset = 0; u_int32_t remaining = data_length - offset; while (remaining > 0) { u_int32_t length = remaining > frag_size ? frag_size : remaining; string frag(data.substr(offset, length)); - channel.send(new AMQContentBody(frag)); + channel.send(make_shared_ptr(new AMQContentBody(frag))); offset += length; remaining = data_length - offset; @@ -268,11 +265,11 @@ void BasicMessageChannel::deliver(Consumer& consumer, Message& msg){ //else drop-through case AUTO_ACK: consumer.lastDeliveryTag = 0; - channel.send( + channel.send(make_shared_ptr( new BasicAckBody( channel.version, msg.getDeliveryTag(), - multiple)); + multiple))); case NO_ACK: // Nothing to do case CLIENT_ACK: // User code must ack. break; diff --git a/cpp/src/qpid/client/ClientChannel.cpp b/cpp/src/qpid/client/ClientChannel.cpp index ab6b9a41c3..816ff05e85 100644 --- a/cpp/src/qpid/client/ClientChannel.cpp +++ b/cpp/src/qpid/client/ClientChannel.cpp @@ -92,10 +92,10 @@ void Channel::protocolInit( connection->send(new AMQFrame(0, new ConnectionSecureOkBody(response))); **/ - send(new ConnectionTuneOkBody( + sendCommand(make_shared_ptr(new ConnectionTuneOkBody( version, proposal->getRequestId(), proposal->getChannelMax(), connection->getMaxFrameSize(), - proposal->getHeartbeat())); + proposal->getHeartbeat()))); uint16_t heartbeat = proposal->getHeartbeat(); connection->connector->setReadTimeout(heartbeat * 2); @@ -104,7 +104,7 @@ void Channel::protocolInit( // Send connection open. std::string capabilities; responses.expect(); - send(new ConnectionOpenBody(version, vhost, capabilities, true)); + sendCommand(make_shared_ptr(new ConnectionOpenBody(version, vhost, capabilities, true))); //receive connection.open-ok (or redirect, but ignore that for now //esp. as using force=true). AMQMethodBody::shared_ptr openResponse = responses.receive(); @@ -210,6 +210,7 @@ AMQMethodBody::shared_ptr method, const MethodContext& ctxt) case BasicGetOkBody::CLASS_ID: messaging->handle(method); break; case ChannelCloseBody::CLASS_ID: handleChannel(method, ctxt); break; case ConnectionCloseBody::CLASS_ID: handleConnection(method); break; + case ExecutionCompleteBody::CLASS_ID: handleExecution(method); break; default: throw UnknownMethod(); } } @@ -223,7 +224,7 @@ AMQMethodBody::shared_ptr method, const MethodContext& ctxt) void Channel::handleChannel(AMQMethodBody::shared_ptr method, const MethodContext& ctxt) { switch (method->amqpMethodId()) { case ChannelCloseBody::METHOD_ID: - send(new ChannelCloseOkBody(version, ctxt.getRequestId())); + sendCommand(make_shared_ptr(new ChannelCloseOkBody(version, ctxt.getRequestId()))); peerClose(shared_polymorphic_downcast<ChannelCloseBody>(method)); return; case ChannelFlowBody::METHOD_ID: @@ -241,6 +242,18 @@ void Channel::handleConnection(AMQMethodBody::shared_ptr method) { throw UnknownMethod(); } +void Channel::handleExecution(AMQMethodBody::shared_ptr method) { + if (method->amqpMethodId() == ExecutionCompleteBody::METHOD_ID) { + Monitor::ScopedLock l(outgoingMonitor); + //record the completion mark: + outgoing.lwm = shared_polymorphic_downcast<ExecutionCompleteBody>(method)->getCumulativeExecutionMark(); + //TODO: notify anyone waiting for completion notification: + outgoingMonitor.notifyAll(); + } else{ + throw UnknownMethod(); + } +} + void Channel::handleHeader(AMQHeaderBody::shared_ptr body){ messaging->handle(body); } @@ -315,7 +328,7 @@ AMQMethodBody::shared_ptr Channel::sendAndReceive( AMQMethodBody::shared_ptr toSend, ClassId c, MethodId m) { responses.expect(); - send(toSend); + sendCommand(toSend); return responses.receive(c, m); } @@ -325,7 +338,7 @@ AMQMethodBody::shared_ptr Channel::sendAndReceiveSync( if(sync) return sendAndReceive(body, c, m); else { - send(body); + sendCommand(body); return AMQMethodBody::shared_ptr(); } } @@ -362,3 +375,31 @@ void Channel::run() { messaging->run(); } +void Channel::sendCommand(AMQBody::shared_ptr body) +{ + ++(outgoing.hwm); + send(body); +} + +bool Channel::waitForCompletion(SequenceNumber poi, Duration timeout) +{ + AbsTime end; + if (timeout == 0) { + end = AbsTime::FarFuture(); + } else { + end = AbsTime(AbsTime::now(), timeout); + } + + Monitor::ScopedLock l(outgoingMonitor); + while (end > AbsTime::now() && outgoing.lwm < poi) { + outgoingMonitor.wait(end); + } + return !(outgoing.lwm < poi); +} + +bool Channel::synchWithServer(Duration timeout) +{ + send(make_shared_ptr(new ExecutionFlushBody(version))); + return waitForCompletion(outgoing.hwm, timeout); +} + diff --git a/cpp/src/qpid/client/ClientChannel.h b/cpp/src/qpid/client/ClientChannel.h index cea1245e6a..fc82fb41ff 100644 --- a/cpp/src/qpid/client/ClientChannel.h +++ b/cpp/src/qpid/client/ClientChannel.h @@ -29,6 +29,7 @@ #include "ResponseHandler.h" #include "qpid/Exception.h" #include "qpid/framing/ChannelAdapter.h" +#include "qpid/framing/SequenceNumber.h" #include "qpid/sys/Thread.h" #include "AckMode.h" @@ -64,6 +65,8 @@ class Channel : public framing::ChannelAdapter Connection* connection; sys::Thread dispatcher; ResponseHandler responses; + sys::Monitor outgoingMonitor; + framing::Window outgoing; uint16_t prefetch; const bool transactional; @@ -84,6 +87,7 @@ class Channel : public framing::ChannelAdapter framing::AMQMethodBody::shared_ptr, const framing::MethodContext&); void handleChannel(framing::AMQMethodBody::shared_ptr method, const framing::MethodContext& ctxt); void handleConnection(framing::AMQMethodBody::shared_ptr method); + void handleExecution(framing::AMQMethodBody::shared_ptr method); void setQos(); @@ -114,9 +118,12 @@ class Channel : public framing::ChannelAdapter sync, body, BodyType::CLASS_ID, BodyType::METHOD_ID)); } + void sendCommand(framing::AMQBody::shared_ptr body); + void open(framing::ChannelId, Connection&); void closeInternal(); void peerClose(boost::shared_ptr<framing::ChannelCloseBody>); + bool waitForCompletion(framing::SequenceNumber, sys::Duration); // FIXME aconway 2007-02-23: Get rid of friendships. friend class Connection; @@ -358,7 +365,10 @@ class Channel : public framing::ChannelAdapter */ void run(); - + /** + * TESTING ONLY FOR NOW! + */ + bool synchWithServer(sys::Duration timeout = 0); }; }} diff --git a/cpp/src/qpid/framing/AMQMethodBody.cpp b/cpp/src/qpid/framing/AMQMethodBody.cpp index a7df0578c2..04941eaa58 100644 --- a/cpp/src/qpid/framing/AMQMethodBody.cpp +++ b/cpp/src/qpid/framing/AMQMethodBody.cpp @@ -36,6 +36,10 @@ void AMQMethodBody::invoke(AMQP_ServerOperations&, const MethodContext&){ THROW_QPID_ERROR(PROTOCOL_ERROR, "Method not supported by AMQP Server."); } +bool AMQMethodBody::invoke(Invocable*) { + return false; +} + AMQMethodBody::shared_ptr AMQMethodBody::create( AMQP_MethodVersionMap& versionMap, ProtocolVersion version, Buffer& buffer) diff --git a/cpp/src/qpid/framing/AMQMethodBody.h b/cpp/src/qpid/framing/AMQMethodBody.h index 306c26cd27..55cf5cb864 100644 --- a/cpp/src/qpid/framing/AMQMethodBody.h +++ b/cpp/src/qpid/framing/AMQMethodBody.h @@ -52,6 +52,7 @@ class AMQMethodBody : public AMQBody virtual ClassId amqpClassId() const = 0; virtual void invoke(AMQP_ServerOperations&, const MethodContext&); + virtual bool invoke(Invocable* target); template <class T> bool isA() { return amqpClassId()==T::CLASS_ID && amqpMethodId()==T::METHOD_ID; diff --git a/cpp/src/qpid/framing/ChannelAdapter.h b/cpp/src/qpid/framing/ChannelAdapter.h index 1c3f29d762..50b1c9ff7e 100644 --- a/cpp/src/qpid/framing/ChannelAdapter.h +++ b/cpp/src/qpid/framing/ChannelAdapter.h @@ -78,10 +78,6 @@ class ChannelAdapter : protected BodyHandler { RequestId send(shared_ptr<AMQBody> body, Correlator::Action action=Correlator::Action()); - // TODO aconway 2007-04-05: remove and use make_shared_ptr at call sites. - /**@deprecated Use make_shared_ptr with the other send() override */ - RequestId send(AMQBody* body) { return send(AMQBody::shared_ptr(body)); } - virtual bool isOpen() const = 0; RequestId getFirstAckRequest() { return requester.getFirstAckRequest(); } diff --git a/cpp/src/qpid/framing/SequenceNumber.h b/cpp/src/qpid/framing/SequenceNumber.h index bf9d133cef..3e0dfea2af 100644 --- a/cpp/src/qpid/framing/SequenceNumber.h +++ b/cpp/src/qpid/framing/SequenceNumber.h @@ -48,6 +48,12 @@ class SequenceNumber friend int32_t operator-(const SequenceNumber& a, const SequenceNumber& b); }; +struct Window +{ + SequenceNumber hwm; + SequenceNumber lwm; +}; + }} // namespace qpid::framing diff --git a/cpp/src/tests/client_test.cpp b/cpp/src/tests/client_test.cpp index 3b8a3a2ee6..cefc4338eb 100644 --- a/cpp/src/tests/client_test.cpp +++ b/cpp/src/tests/client_test.cpp @@ -35,6 +35,7 @@ #include "qpid/client/ClientMessage.h" #include "qpid/client/MessageListener.h" #include "qpid/sys/Monitor.h" +#include "qpid/sys/Time.h" using namespace qpid::client; using namespace qpid::sys; @@ -117,6 +118,11 @@ int main(int argc, char** argv) msg.setData(data); channel.publish(msg, exchange, "MyTopic"); if (opts.trace) std::cout << "Published message: " << data << std::endl; + if (opts.trace) { + std::cout << "Publication " + << (channel.synchWithServer(qpid::sys::TIME_SEC * 1) ? " DID " : " did NOT ") + << "complete" << std::endl; + } { Monitor::ScopedLock l(monitor); diff --git a/python/qpid/client.py b/python/qpid/client.py index cdceb87bdf..f1800204db 100644 --- a/python/qpid/client.py +++ b/python/qpid/client.py @@ -140,6 +140,9 @@ class ClientDelegate(Delegate): def connection_close(self, ch, msg): self.client.peer.close(msg) + def execution_complete(self, ch, msg): + ch.completion.complete(msg.cumulative_execution_mark) + def close(self, reason): self.client.closed = True self.client.reason = reason diff --git a/python/qpid/peer.py b/python/qpid/peer.py index 72e6a19bc7..9880eea19b 100644 --- a/python/qpid/peer.py +++ b/python/qpid/peer.py @@ -186,6 +186,8 @@ class Channel: self.requester = Requester(self.write) self.responder = Responder(self.write) + self.completion = ExecutionCompletion() + # Use reliable framing if version == 0-9. self.reliable = (spec.major == 0 and spec.minor == 9) self.synchronous = True @@ -247,6 +249,7 @@ class Channel: self.responder.respond(method, batch, request) def invoke(self, type, args, kwargs): + self.completion.next_command(type) content = kwargs.pop("content", None) frame = Method(type, type.arguments(*args, **kwargs)) if self.reliable: @@ -337,3 +340,30 @@ class Future: def is_complete(self): return self.completed.isSet() + +class ExecutionCompletion: + def __init__(self): + self.completed = threading.Event() + self.sequence = Sequence(0) + self.command_id = 0 + self.mark = 0 + + def next_command(self, method): + #the following test is a hack until the track/sub-channel is available + if method.klass.name != "execution": + self.command_id = self.sequence.next() + + def complete(self, mark): + self.mark = mark + self.completed.set() + self.completed.clear() + + def wait(self, point_of_interest=-1, timeout=None): + """ + todo: really want to allow different threads to call this with + different points of interest on the same channel, this is a quick + hack for now + """ + if point_of_interest == -1: point_of_interest = self.command_id + self.completed.wait(timeout) + return point_of_interest <= self.mark diff --git a/python/tests_0-9/basic.py b/python/tests_0-9/basic.py index 75633908cd..e7d22e00da 100644 --- a/python/tests_0-9/basic.py +++ b/python/tests_0-9/basic.py @@ -127,7 +127,7 @@ class BasicTests(TestBase): channel.basic_publish(routing_key="test-queue-4", content=Content("One")) myqueue = self.client.queue("my-consumer") - msg = myqueue.get(timeout=5) + msg = myqueue.get(timeout=1) self.assertEqual("One", msg.content.body) #cancel should stop messages being delivered diff --git a/specs/amqp-dtx-preview.0-9.xml b/specs/amqp-dtx-preview.0-9.xml index defbdd067e..dd70e91d1d 100644 --- a/specs/amqp-dtx-preview.0-9.xml +++ b/specs/amqp-dtx-preview.0-9.xml @@ -1040,4 +1040,38 @@ </method> </class> + <class name="execution" handler="execution" index="140"> + <doc> + This class allows for efficiently communicating information + about completion of processing. + </doc> + + <chassis name="server" implement="MUST"/> + <chassis name="client" implement="MUST"/> + + <method name="flush" index="10" label="request an execution.complete return method"> + <chassis name="server" implement="MUST"/> + <chassis name="client" implement="MUST"/> + </method> + + <method name="complete" index="20"> + <chassis name="server" implement="MUST"/> + <chassis name="client" implement="MUST"/> + + + <field name="cumulative-execution-mark" domain="long" label="Low-water mark for command ids"> + <doc> + The low-water mark for executed command-ids. All ids below this mark have been executed; + above this mark, there are gaps containing unexecuted command ids (i.e. discontinuous). By + definition, the first id above this mark (if it exists) is an unexecuted command-id. + </doc> + </field> + + + <!-- The ranged mark on the complete method has been temporarily removed --> + </method> + + </class> + + </amqp> |