summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-07-27 15:44:52 +0000
committerGordon Sim <gsim@apache.org>2007-07-27 15:44:52 +0000
commita2f1ddbe4175c6136b1188faeccaf1f8e561e3b2 (patch)
treefec659c6eb749cf6eca5b790a439962dd930281d
parent6054ada715929a82afa55601f1c4b5f226cf45b8 (diff)
downloadqpid-python-a2f1ddbe4175c6136b1188faeccaf1f8e561e3b2.tar.gz
Use execution layer to acknowledge messages.
Turn off 0-9 framing of requests and responses. Some refactoring around message delivery. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@560285 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/gentools/src/org/apache/qpid/gentools/AmqpMethod.java1
-rw-r--r--qpid/cpp/gentools/src/org/apache/qpid/gentools/CppGenerator.java22
-rw-r--r--qpid/cpp/gentools/templ.cpp/MethodBodyClass.h.tmpl1
-rw-r--r--qpid/cpp/src/Makefile.am7
-rw-r--r--qpid/cpp/src/qpid/broker/BrokerAdapter.cpp11
-rw-r--r--qpid/cpp/src/qpid/broker/BrokerChannel.cpp86
-rw-r--r--qpid/cpp/src/qpid/broker/BrokerChannel.h12
-rw-r--r--qpid/cpp/src/qpid/broker/BrokerMessage.cpp61
-rw-r--r--qpid/cpp/src/qpid/broker/BrokerMessage.h6
-rw-r--r--qpid/cpp/src/qpid/broker/BrokerMessageBase.h21
-rw-r--r--qpid/cpp/src/qpid/broker/BrokerMessageMessage.cpp29
-rw-r--r--qpid/cpp/src/qpid/broker/BrokerMessageMessage.h15
-rw-r--r--qpid/cpp/src/qpid/broker/ConsumeAdapter.cpp37
-rw-r--r--qpid/cpp/src/qpid/broker/DeliveryAdapter.h6
-rw-r--r--qpid/cpp/src/qpid/broker/DeliveryToken.h (renamed from qpid/cpp/src/qpid/broker/ConsumeAdapter.h)24
-rw-r--r--qpid/cpp/src/qpid/broker/GetAdapter.cpp40
-rw-r--r--qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp65
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticHandler.cpp52
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticHandler.h15
-rw-r--r--qpid/cpp/src/qpid/client/ClientChannel.cpp10
-rw-r--r--qpid/cpp/src/qpid/cluster/SessionManager.cpp8
-rw-r--r--qpid/cpp/src/qpid/framing/AMQMethodBody.cpp5
-rw-r--r--qpid/cpp/src/qpid/framing/AMQMethodBody.h6
-rw-r--r--qpid/cpp/src/qpid/framing/SequenceNumberSet.cpp61
-rw-r--r--qpid/cpp/src/qpid/framing/SequenceNumberSet.h (renamed from qpid/cpp/src/qpid/broker/GetAdapter.h)44
-rw-r--r--qpid/cpp/src/qpid/framing/amqp_types.h1
-rw-r--r--qpid/cpp/src/tests/BrokerChannelTest.cpp48
-rw-r--r--qpid/cpp/src/tests/FramingTest.cpp38
-rw-r--r--qpid/python/cpp_failing_0-10.txt13
-rw-r--r--qpid/python/qpid/codec.py25
-rw-r--r--qpid/python/qpid/message.py8
-rw-r--r--qpid/python/qpid/peer.py82
-rw-r--r--qpid/python/qpid/spec.py2
-rw-r--r--qpid/python/tests_0-10/broker.py2
-rw-r--r--qpid/python/tests_0-10/dtx.py26
-rw-r--r--qpid/python/tests_0-10/example.py2
-rw-r--r--qpid/python/tests_0-10/message.py24
-rw-r--r--qpid/python/tests_0-10/tx.py56
-rw-r--r--qpid/specs/amqp-transitional.0-10.xml13
39 files changed, 586 insertions, 399 deletions
diff --git a/qpid/cpp/gentools/src/org/apache/qpid/gentools/AmqpMethod.java b/qpid/cpp/gentools/src/org/apache/qpid/gentools/AmqpMethod.java
index e78eec112f..66c19532c6 100644
--- a/qpid/cpp/gentools/src/org/apache/qpid/gentools/AmqpMethod.java
+++ b/qpid/cpp/gentools/src/org/apache/qpid/gentools/AmqpMethod.java
@@ -51,6 +51,7 @@ public class AmqpMethod implements Printable, NodeAware, VersionConsistencyCheck
}
public boolean isResponse(AmqpVersion version) {
+ if (!CppGenerator.USE_RELIABLE_FRAMING) return false;
return (version == null) ? isResponseFlagMap.isSet() : isResponseFlagMap.isSet(version);
}
diff --git a/qpid/cpp/gentools/src/org/apache/qpid/gentools/CppGenerator.java b/qpid/cpp/gentools/src/org/apache/qpid/gentools/CppGenerator.java
index f31f9615fc..4fbda4e0fb 100644
--- a/qpid/cpp/gentools/src/org/apache/qpid/gentools/CppGenerator.java
+++ b/qpid/cpp/gentools/src/org/apache/qpid/gentools/CppGenerator.java
@@ -29,6 +29,8 @@ import java.util.TreeMap;
public class CppGenerator extends Generator
{
+ protected static final boolean USE_RELIABLE_FRAMING = false;
+
protected static final String versionNamespaceStartToken = "${version_namespace_start}";
protected static final String versionNamespaceEndToken = "${version_namespace_end}";
@@ -150,12 +152,11 @@ public class CppGenerator extends Generator
"buffer.putLongString(#)", // encodeExpression
"buffer.getLongString(#)")); // decodeExpression
- //NB: this is WRONG! but is here as a transitional aid
typeMap.put("rfc1982-long-set", new DomainInfo(
- "u_int16_t", // type
- "2", // size
- "buffer.putShort(#)", // encodeExpression
- "# = buffer.getShort()")); // decodeExpression
+ "SequenceNumberSet", // type
+ "#.encodedSize()", // size
+ "#.encode(buffer)", // encodeExpression
+ "#.decode(buffer)")); // decodeExpression
}
public boolean isQuietFlag()
@@ -378,6 +379,7 @@ public class CppGenerator extends Generator
}
private String baseClass(AmqpMethod method, AmqpVersion version) {
+ if (!USE_RELIABLE_FRAMING) return "AMQMethodBody";
String base = method.isResponse(version) ? "AMQResponseBody":"AMQRequestBody";
return base;
}
@@ -787,16 +789,6 @@ public class CppGenerator extends Generator
sb.append(generateMethodParameterList(thisFieldMap, indentSize + (5*tabSize), false, true, true));
}
- //if (abstractMethodFlag) sb.append("const MethodContext& context");
- //boolean leadingComma = abstractMethodFlag;
- //int paramIndent = indentSize + (5*tabSize);
- // sb.append(generateMethodParameterList(thisFieldMap, paramIndent, leadingComma, true, true));
- /*
- if (!abstractMethodFlag && method.isResponse(null)) {
- if (!thisFieldMap.isEmpty()) sb.append(", \n"+Utils.createSpaces(paramIndent));
- sb.append(" RequestId responseTo");
- }
- */
sb.append(" )");
if (abstractMethodFlag)
sb.append(" = 0");
diff --git a/qpid/cpp/gentools/templ.cpp/MethodBodyClass.h.tmpl b/qpid/cpp/gentools/templ.cpp/MethodBodyClass.h.tmpl
index 093a5ffe90..aeabd8a256 100644
--- a/qpid/cpp/gentools/templ.cpp/MethodBodyClass.h.tmpl
+++ b/qpid/cpp/gentools/templ.cpp/MethodBodyClass.h.tmpl
@@ -37,6 +37,7 @@
#include "qpid/framing/Buffer.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/FramingContent.h"
+#include "qpid/framing/SequenceNumberSet.h"
namespace qpid
{
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index f4e807cf66..cf1598bcca 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -155,6 +155,7 @@ libqpidcommon_la_SOURCES = \
qpid/framing/Requester.cpp \
qpid/framing/Responder.cpp \
qpid/framing/SequenceNumber.cpp \
+ qpid/framing/SequenceNumberSet.cpp \
qpid/framing/Correlator.cpp \
qpid/framing/Value.cpp \
qpid/framing/Proxy.cpp \
@@ -200,7 +201,6 @@ libqpidbroker_la_SOURCES = \
qpid/broker/Connection.cpp \
qpid/broker/ConnectionAdapter.cpp \
qpid/broker/ConnectionFactory.cpp \
- qpid/broker/ConsumeAdapter.cpp \
qpid/broker/Daemon.cpp \
qpid/broker/DeliverableMessage.cpp \
qpid/broker/DeliveryRecord.cpp \
@@ -213,7 +213,6 @@ libqpidbroker_la_SOURCES = \
qpid/broker/DtxWorkRecord.cpp \
qpid/broker/ExchangeRegistry.cpp \
qpid/broker/FanOutExchange.cpp \
- qpid/broker/GetAdapter.cpp \
qpid/broker/HeadersExchange.cpp \
qpid/broker/InMemoryContent.cpp \
qpid/broker/LazyLoadedContent.cpp \
@@ -258,11 +257,11 @@ nobase_include_HEADERS = \
qpid/broker/BrokerMessageBase.h \
qpid/broker/BrokerQueue.h \
qpid/broker/CompletionHandler.h \
- qpid/broker/ConsumeAdapter.h \
qpid/broker/Consumer.h \
qpid/broker/Deliverable.h \
qpid/broker/DeliverableMessage.h \
qpid/broker/DeliveryAdapter.h \
+ qpid/broker/DeliveryToken.h \
qpid/broker/DirectExchange.h \
qpid/broker/DtxAck.h \
qpid/broker/DtxBuffer.h \
@@ -272,7 +271,6 @@ nobase_include_HEADERS = \
qpid/broker/DtxWorkRecord.h \
qpid/broker/ExchangeRegistry.h \
qpid/broker/FanOutExchange.h \
- qpid/broker/GetAdapter.h \
qpid/broker/HandlerImpl.h \
qpid/broker/InMemoryContent.h \
qpid/broker/MessageBuilder.h \
@@ -362,6 +360,7 @@ nobase_include_HEADERS = \
qpid/framing/Responder.h \
qpid/framing/SerializeHandler.h \
qpid/framing/SequenceNumber.h \
+ qpid/framing/SequenceNumberSet.h \
qpid/framing/Value.h \
qpid/framing/Uuid.h \
qpid/framing/amqp_framing.h \
diff --git a/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp b/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
index 376108193a..8edf448bc4 100644
--- a/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
+++ b/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
@@ -20,8 +20,7 @@
#include "BrokerAdapter.h"
#include "BrokerChannel.h"
#include "Connection.h"
-#include "ConsumeAdapter.h"
-#include "GetAdapter.h"
+#include "DeliveryToken.h"
#include "qpid/framing/AMQMethodBody.h"
#include "qpid/Exception.h"
@@ -325,8 +324,8 @@ void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/,
//need to generate name here, so we have it for the adapter (it is
//also version specific behaviour now)
if (newTag.empty()) newTag = tagGenerator.generate();
- channel.consume(std::auto_ptr<DeliveryAdapter>(new ConsumeAdapter(adapter, newTag, connection.getFrameMax())),
- newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &fields);
+ DeliveryToken::shared_ptr token(BasicMessage::createConsumeToken(newTag));
+ channel.consume(token, newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &fields);
if(!nowait) client.consumeOk(newTag);
@@ -357,8 +356,8 @@ void BrokerAdapter::BasicHandlerImpl::publish(uint16_t /*ticket*/,
void BrokerAdapter::BasicHandlerImpl::get(uint16_t /*ticket*/, const string& queueName, bool noAck){
Queue::shared_ptr queue = getQueue(queueName);
- GetAdapter out(adapter, queue, "", connection.getFrameMax());
- if(!channel.get(out, queue, !noAck)){
+ DeliveryToken::shared_ptr token(BasicMessage::createGetToken(queue));
+ if(!channel.get(token, queue, !noAck)){
string clusterId;//not used, part of an imatix hack
client.getEmpty(clusterId);
diff --git a/qpid/cpp/src/qpid/broker/BrokerChannel.cpp b/qpid/cpp/src/qpid/broker/BrokerChannel.cpp
index a598717c5d..c50fbd5559 100644
--- a/qpid/cpp/src/qpid/broker/BrokerChannel.cpp
+++ b/qpid/cpp/src/qpid/broker/BrokerChannel.cpp
@@ -49,9 +49,10 @@ using namespace qpid::framing;
using namespace qpid::sys;
-Channel::Channel(Connection& con, ChannelId _id, MessageStore* const _store) :
+Channel::Channel(Connection& con, DeliveryAdapter& _out, ChannelId _id, MessageStore* const _store) :
id(_id),
connection(con),
+ out(_out),
currentDeliveryTag(1),
prefetchSize(0),
prefetchCount(0),
@@ -76,7 +77,7 @@ bool Channel::exists(const string& consumerTag){
// TODO aconway 2007-02-12: Why is connection token passed in instead
// of using the channel's parent connection?
-void Channel::consume(std::auto_ptr<DeliveryAdapter> adapter, string& tagInOut,
+void Channel::consume(DeliveryToken::shared_ptr token, string& tagInOut,
Queue::shared_ptr queue, bool acks,
bool exclusive, ConnectionToken* const connection,
const FieldTable*)
@@ -84,7 +85,7 @@ void Channel::consume(std::auto_ptr<DeliveryAdapter> adapter, string& tagInOut,
if(tagInOut.empty())
tagInOut = tagGenerator.generate();
std::auto_ptr<ConsumerImpl> c(
- new ConsumerImpl(this, adapter, tagInOut, queue, connection, acks));
+ new ConsumerImpl(this, token, tagInOut, queue, connection, acks));
queue->consume(c.get(), exclusive);//may throw exception
consumers.insert(tagInOut, c.release());
}
@@ -97,7 +98,8 @@ void Channel::cancel(const string& tag){
consumers.erase(i);
}
-void Channel::close(){
+void Channel::close()
+{
opened = false;
consumers.clear();
if (dtxBuffer.get()) {
@@ -106,11 +108,15 @@ void Channel::close(){
recover(true);
}
-void Channel::startTx(){
+void Channel::startTx()
+{
txBuffer = TxBuffer::shared_ptr(new TxBuffer());
}
-void Channel::commit(){
+void Channel::commit()
+{
+ if (!txBuffer) throw ConnectionException(503, "Channel has not been selected for use with transactions");
+
TxOp::shared_ptr txAck(new TxAck(accumulatedAck, unacked));
txBuffer->enlist(txAck);
if (txBuffer->commitLocal(store)) {
@@ -118,16 +124,21 @@ void Channel::commit(){
}
}
-void Channel::rollback(){
+void Channel::rollback()
+{
+ if (!txBuffer) throw ConnectionException(503, "Channel has not been selected for use with transactions");
+
txBuffer->rollback();
accumulatedAck.clear();
}
-void Channel::selectDtx(){
+void Channel::selectDtx()
+{
dtxSelected = true;
}
-void Channel::startDtx(const std::string& xid, DtxManager& mgr, bool join){
+void Channel::startDtx(const std::string& xid, DtxManager& mgr, bool join)
+{
if (!dtxSelected) {
throw ConnectionException(503, "Channel has not been selected for use with dtx");
}
@@ -140,7 +151,8 @@ void Channel::startDtx(const std::string& xid, DtxManager& mgr, bool join){
}
}
-void Channel::endDtx(const std::string& xid, bool fail){
+void Channel::endDtx(const std::string& xid, bool fail)
+{
if (!dtxBuffer) {
throw ConnectionException(503, boost::format("xid %1% not associated with this channel") % xid);
}
@@ -160,7 +172,8 @@ void Channel::endDtx(const std::string& xid, bool fail){
dtxBuffer.reset();
}
-void Channel::suspendDtx(const std::string& xid){
+void Channel::suspendDtx(const std::string& xid)
+{
if (dtxBuffer->getXid() != xid) {
throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on suspend")
% dtxBuffer->getXid() % xid);
@@ -171,7 +184,8 @@ void Channel::suspendDtx(const std::string& xid){
dtxBuffer->setSuspended(true);
}
-void Channel::resumeDtx(const std::string& xid){
+void Channel::resumeDtx(const std::string& xid)
+{
if (dtxBuffer->getXid() != xid) {
throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on resume")
% dtxBuffer->getXid() % xid);
@@ -199,20 +213,22 @@ void Channel::record(const DeliveryRecord& delivery)
delivery.addTo(&outstanding);
}
-bool Channel::checkPrefetch(Message::shared_ptr& msg){
+bool Channel::checkPrefetch(Message::shared_ptr& msg)
+{
Mutex::ScopedLock locker(deliveryLock);
bool countOk = !prefetchCount || prefetchCount > unacked.size();
bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstanding.size || unacked.empty();
return countOk && sizeOk;
}
-Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, std::auto_ptr<DeliveryAdapter> _adapter,
+Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, DeliveryToken::shared_ptr _token,
const string& _tag, Queue::shared_ptr _queue,
ConnectionToken* const _connection, bool ack
- ) : parent(_parent), adapter(_adapter), tag(_tag), queue(_queue), connection(_connection),
+ ) : parent(_parent), token(_token), tag(_tag), queue(_queue), connection(_connection),
ackExpected(ack), blocked(false) {}
-bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){
+bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg)
+{
if(!connection || connection != msg->getPublisher()){//check for no_local
if(!parent->flowActive || (ackExpected && !parent->checkPrefetch(msg))){
blocked = true;
@@ -220,11 +236,10 @@ bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){
blocked = false;
Mutex::ScopedLock locker(parent->deliveryLock);
- uint64_t deliveryTag = adapter->getNextDeliveryTag();
+ uint64_t deliveryTag = parent->out.deliver(msg, token);
if(ackExpected){
parent->record(DeliveryRecord(msg, queue, tag, deliveryTag));
}
- adapter->deliver(msg, deliveryTag);
return true;
}
@@ -234,14 +249,15 @@ bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){
void Channel::ConsumerImpl::redeliver(Message::shared_ptr& msg, uint64_t deliveryTag) {
Mutex::ScopedLock locker(parent->deliveryLock);
- adapter->deliver(msg, deliveryTag);
+ parent->out.redeliver(msg, token, deliveryTag);
}
Channel::ConsumerImpl::~ConsumerImpl() {
cancel();
}
-void Channel::ConsumerImpl::cancel(){
+void Channel::ConsumerImpl::cancel()
+{
if(queue) {
queue->cancel(this);
if (queue->canAutoDelete()) {
@@ -251,27 +267,32 @@ void Channel::ConsumerImpl::cancel(){
}
}
-void Channel::ConsumerImpl::requestDispatch(){
+void Channel::ConsumerImpl::requestDispatch()
+{
if(blocked)
queue->requestDispatch();
}
-void Channel::handleInlineTransfer(Message::shared_ptr msg){
+void Channel::handleInlineTransfer(Message::shared_ptr msg)
+{
complete(msg);
}
-void Channel::handlePublish(Message* _message){
+void Channel::handlePublish(Message* _message)
+{
Message::shared_ptr message(_message);
messageBuilder.initialise(message);
}
-void Channel::handleHeader(AMQHeaderBody::shared_ptr header){
+void Channel::handleHeader(AMQHeaderBody::shared_ptr header)
+{
messageBuilder.setHeader(header);
//at this point, decide based on the size of the message whether we want
//to stage it by saving content directly to disk as it arrives
}
-void Channel::handleContent(AMQContentBody::shared_ptr content){
+void Channel::handleContent(AMQContentBody::shared_ptr content)
+{
messageBuilder.addContent(content);
}
@@ -306,14 +327,16 @@ void Channel::route(Message::shared_ptr msg, Deliverable& strategy) {
}
// Used by Basic
-void Channel::ack(uint64_t deliveryTag, bool multiple){
+void Channel::ack(uint64_t deliveryTag, bool multiple)
+{
if (multiple)
ack(0, deliveryTag);
else
ack(deliveryTag, deliveryTag);
}
-void Channel::ack(uint64_t firstTag, uint64_t lastTag){
+void Channel::ack(uint64_t firstTag, uint64_t lastTag)
+{
if (txBuffer.get()) {
accumulatedAck.update(firstTag, lastTag);
//TODO: I think the outstanding prefetch size & count should be updated at this point...
@@ -355,7 +378,8 @@ void Channel::ack(uint64_t firstTag, uint64_t lastTag){
}
}
-void Channel::recover(bool requeue){
+void Channel::recover(bool requeue)
+{
Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
if(requeue){
@@ -368,12 +392,12 @@ void Channel::recover(bool requeue){
}
}
-bool Channel::get(DeliveryAdapter& adapter, Queue::shared_ptr queue, bool ackExpected){
+bool Channel::get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected)
+{
Message::shared_ptr msg = queue->dequeue();
if(msg){
Mutex::ScopedLock locker(deliveryLock);
- uint64_t myDeliveryTag = adapter.getNextDeliveryTag();
- adapter.deliver(msg, myDeliveryTag);
+ uint64_t myDeliveryTag = out.deliver(msg, token);
if(ackExpected){
unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag));
}
diff --git a/qpid/cpp/src/qpid/broker/BrokerChannel.h b/qpid/cpp/src/qpid/broker/BrokerChannel.h
index a70dce0ce8..e9672c96d7 100644
--- a/qpid/cpp/src/qpid/broker/BrokerChannel.h
+++ b/qpid/cpp/src/qpid/broker/BrokerChannel.h
@@ -33,6 +33,7 @@
#include "Consumer.h"
#include "DeliveryAdapter.h"
#include "DeliveryRecord.h"
+#include "DeliveryToken.h"
#include "Deliverable.h"
#include "DtxBuffer.h"
#include "DtxManager.h"
@@ -64,7 +65,7 @@ class Channel : public CompletionHandler
class ConsumerImpl : public Consumer
{
Channel* parent;
- std::auto_ptr<DeliveryAdapter> adapter;
+ DeliveryToken::shared_ptr token;
const string tag;
Queue::shared_ptr queue;
ConnectionToken* const connection;
@@ -72,7 +73,7 @@ class Channel : public CompletionHandler
bool blocked;
public:
- ConsumerImpl(Channel* parent, std::auto_ptr<DeliveryAdapter> adapter,
+ ConsumerImpl(Channel* parent, DeliveryToken::shared_ptr token,
const string& tag, Queue::shared_ptr queue,
ConnectionToken* const connection, bool ack);
~ConsumerImpl();
@@ -86,6 +87,7 @@ class Channel : public CompletionHandler
framing::ChannelId id;
Connection& connection;
+ DeliveryAdapter& out;
uint64_t currentDeliveryTag;
Queue::shared_ptr defaultQueue;
ConsumerImplMap consumers;
@@ -110,7 +112,7 @@ class Channel : public CompletionHandler
void checkDtxTimeout();
public:
- Channel(Connection& parent, framing::ChannelId id, MessageStore* const store = 0);
+ Channel(Connection& parent, DeliveryAdapter& out, framing::ChannelId id, MessageStore* const store = 0);
~Channel();
bool isOpen() const { return opened; }
@@ -127,11 +129,11 @@ class Channel : public CompletionHandler
/**
*@param tagInOut - if empty it is updated with the generated token.
*/
- void consume(std::auto_ptr<DeliveryAdapter> adapter, string& tagInOut, Queue::shared_ptr queue, bool acks,
+ void consume(DeliveryToken::shared_ptr token, string& tagInOut, Queue::shared_ptr queue, bool acks,
bool exclusive, ConnectionToken* const connection = 0,
const framing::FieldTable* = 0);
void cancel(const string& tag);
- bool get(DeliveryAdapter& adapter, Queue::shared_ptr queue, bool ackExpected);
+ bool get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected);
void close();
void startTx();
void commit();
diff --git a/qpid/cpp/src/qpid/broker/BrokerMessage.cpp b/qpid/cpp/src/qpid/broker/BrokerMessage.cpp
index d192b09a63..bf0e37e8e3 100644
--- a/qpid/cpp/src/qpid/broker/BrokerMessage.cpp
+++ b/qpid/cpp/src/qpid/broker/BrokerMessage.cpp
@@ -26,6 +26,7 @@
#include "InMemoryContent.h"
#include "LazyLoadedContent.h"
#include "MessageStore.h"
+#include "BrokerQueue.h"
#include "qpid/log/Statement.h"
#include "qpid/framing/BasicDeliverBody.h"
#include "qpid/framing/BasicGetOkBody.h"
@@ -37,6 +38,30 @@
#include "qpid/framing/ChannelAdapter.h"
#include "RecoveryManagerImpl.h"
+namespace qpid{
+namespace broker{
+
+struct BasicGetToken : DeliveryToken
+{
+ typedef boost::shared_ptr<BasicGetToken> shared_ptr;
+
+ Queue::shared_ptr queue;
+
+ BasicGetToken(Queue::shared_ptr q) : queue(q) {}
+};
+
+struct BasicConsumeToken : DeliveryToken
+{
+ typedef boost::shared_ptr<BasicConsumeToken> shared_ptr;
+
+ const string consumer;
+
+ BasicConsumeToken(const string c) : consumer(c) {}
+};
+
+}
+}
+
using namespace boost;
using namespace qpid::broker;
using namespace qpid::framing;
@@ -74,6 +99,16 @@ bool BasicMessage::isComplete(){
return header.get() && (header->getContentSize() == contentSize());
}
+DeliveryToken::shared_ptr BasicMessage::createGetToken(Queue::shared_ptr queue)
+{
+ return DeliveryToken::shared_ptr(new BasicGetToken(queue));
+}
+
+DeliveryToken::shared_ptr BasicMessage::createConsumeToken(const string& consumer)
+{
+ return DeliveryToken::shared_ptr(new BasicConsumeToken(consumer));
+}
+
void BasicMessage::deliver(ChannelAdapter& channel,
const string& consumerTag, uint64_t deliveryTag,
uint32_t framesize)
@@ -86,23 +121,39 @@ void BasicMessage::deliver(ChannelAdapter& channel,
}
void BasicMessage::sendGetOk(ChannelAdapter& channel,
- const std::string& /*destination*/,
uint32_t messageCount,
- uint64_t responseTo,
+ uint64_t /*responseTo*/,
uint64_t deliveryTag,
uint32_t framesize)
{
channel.send(make_shared_ptr(
new BasicGetOkBody(
channel.getVersion(),
- responseTo,
+ //responseTo,
deliveryTag, getRedelivered(), getExchange(),
getRoutingKey(), messageCount)));
sendContent(channel, framesize);
}
-void BasicMessage::sendContent(
- ChannelAdapter& channel, uint32_t framesize)
+void BasicMessage::deliver(framing::ChannelAdapter& channel, uint64_t deliveryTag, DeliveryToken::shared_ptr token, uint32_t framesize)
+{
+ BasicConsumeToken::shared_ptr consume = dynamic_pointer_cast<BasicConsumeToken>(token);
+ if (consume) {
+ deliver(channel, consume->consumer, deliveryTag, framesize);
+ } else {
+ BasicGetToken::shared_ptr get = dynamic_pointer_cast<BasicGetToken>(token);
+ if (get) {
+ uint64_t request(1/*actual value doesn't affect anything at present*/);
+ sendGetOk(channel, get->queue->getMessageCount(), request, deliveryTag, framesize);
+ } else {
+ //TODO:
+ //either need to be able to convert to a message transfer or
+ //throw error of some kind to allow this to be handled higher up
+ }
+ }
+}
+
+void BasicMessage::sendContent(ChannelAdapter& channel, uint32_t framesize)
{
channel.send(header);
Mutex::ScopedLock locker(contentLock);
diff --git a/qpid/cpp/src/qpid/broker/BrokerMessage.h b/qpid/cpp/src/qpid/broker/BrokerMessage.h
index 2e031d0bb2..e6483b4733 100644
--- a/qpid/cpp/src/qpid/broker/BrokerMessage.h
+++ b/qpid/cpp/src/qpid/broker/BrokerMessage.h
@@ -43,6 +43,7 @@ class AMQHeaderBody;
namespace broker {
class MessageStore;
+class Queue;
using framing::string;
/**
@@ -70,13 +71,16 @@ class BasicMessage : public Message {
void addContent(framing::AMQContentBody::shared_ptr data);
bool isComplete();
+ static DeliveryToken::shared_ptr createGetToken(boost::shared_ptr<Queue> queue);
+ static DeliveryToken::shared_ptr createConsumeToken(const string& consumer);
+ void deliver(framing::ChannelAdapter& channel, uint64_t deliveryTag, DeliveryToken::shared_ptr token, uint32_t framesize);
+
void deliver(framing::ChannelAdapter&,
const string& consumerTag,
uint64_t deliveryTag,
uint32_t framesize);
void sendGetOk(framing::ChannelAdapter& channel,
- const std::string& destination,
uint32_t messageCount,
uint64_t responseTo,
uint64_t deliveryTag,
diff --git a/qpid/cpp/src/qpid/broker/BrokerMessageBase.h b/qpid/cpp/src/qpid/broker/BrokerMessageBase.h
index 73af3935a8..d9269fa94f 100644
--- a/qpid/cpp/src/qpid/broker/BrokerMessageBase.h
+++ b/qpid/cpp/src/qpid/broker/BrokerMessageBase.h
@@ -25,6 +25,7 @@
#include <string>
#include <boost/shared_ptr.hpp>
#include "Content.h"
+#include "DeliveryToken.h"
#include "PersistableMessage.h"
#include "qpid/framing/amqp_types.h"
@@ -91,23 +92,9 @@ class Message : public PersistableMessage{
void setPersistenceId(uint64_t _persistenceId) const { persistenceId = _persistenceId; }
void redeliver() { redelivered = true; }
- /**
- * Used to deliver the message from the queue
- */
- virtual void deliver(framing::ChannelAdapter& channel,
- const std::string& consumerTag,
- uint64_t deliveryTag,
- uint32_t framesize) = 0;
- /**
- * Used to return a message in response to a get from a queue
- */
- virtual void sendGetOk(framing::ChannelAdapter& channel,
- const std::string& destination,
- uint32_t messageCount,
- uint64_t responseTo,
- uint64_t deliveryTag,
- uint32_t framesize) = 0;
-
+ virtual void deliver(framing::ChannelAdapter& channel, uint64_t deliveryTag/*only needed for basic class*/,
+ DeliveryToken::shared_ptr token, uint32_t framesize) = 0;
+
virtual bool isComplete() = 0;
virtual uint64_t contentSize() const = 0;
diff --git a/qpid/cpp/src/qpid/broker/BrokerMessageMessage.cpp b/qpid/cpp/src/qpid/broker/BrokerMessageMessage.cpp
index efa295e44f..8e8eaf23f0 100644
--- a/qpid/cpp/src/qpid/broker/BrokerMessageMessage.cpp
+++ b/qpid/cpp/src/qpid/broker/BrokerMessageMessage.cpp
@@ -34,10 +34,18 @@
#include <algorithm>
using namespace std;
+using namespace boost;
using namespace qpid::framing;
namespace qpid {
namespace broker {
+
+struct MessageDeliveryToken : public DeliveryToken
+{
+ const std::string destination;
+
+ MessageDeliveryToken(const std::string& d) : destination(d) {}
+};
MessageMessage::MessageMessage(
ConnectionToken* publisher, RequestId requestId_, TransferPtr transfer_
@@ -179,22 +187,13 @@ void MessageMessage::transferMessage(
channel.send(make_shared_ptr(new MessageCloseBody(channel.getVersion(), ref->getId())));
}
-void MessageMessage::deliver(
- framing::ChannelAdapter& channel,
- const std::string& consumerTag,
- uint64_t /*deliveryTag*/,
- uint32_t framesize)
+
+void MessageMessage::deliver(ChannelAdapter& channel, uint64_t, DeliveryToken::shared_ptr token, uint32_t framesize)
{
- transferMessage(channel, consumerTag, framesize);
+ transferMessage(channel, shared_polymorphic_cast<MessageDeliveryToken>(token)->destination, framesize);
}
-void MessageMessage::sendGetOk(
- framing::ChannelAdapter& channel,
- const std::string& destination,
- uint32_t /*messageCount*/,
- uint64_t /*responseTo*/,
- uint64_t /*deliveryTag*/,
- uint32_t framesize)
+void MessageMessage::deliver(ChannelAdapter& channel, const std::string& destination, uint32_t framesize)
{
transferMessage(channel, destination, framesize);
}
@@ -321,6 +320,10 @@ MessageMessage::ReferencePtr MessageMessage::getReference() const {
return reference;
}
+DeliveryToken::shared_ptr MessageMessage::getToken(const std::string& destination)
+{
+ return DeliveryToken::shared_ptr(new MessageDeliveryToken(destination));
+}
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/BrokerMessageMessage.h b/qpid/cpp/src/qpid/broker/BrokerMessageMessage.h
index c2d4b7f20b..612f457ae4 100644
--- a/qpid/cpp/src/qpid/broker/BrokerMessageMessage.h
+++ b/qpid/cpp/src/qpid/broker/BrokerMessageMessage.h
@@ -53,17 +53,8 @@ class MessageMessage: public Message{
TransferPtr getTransfer() const { return transfer; }
ReferencePtr getReference() const ;
- void deliver(framing::ChannelAdapter& channel,
- const std::string& consumerTag,
- uint64_t deliveryTag,
- uint32_t framesize);
-
- void sendGetOk(framing::ChannelAdapter& channel,
- const std::string& destination,
- uint32_t messageCount,
- uint64_t responseTo,
- uint64_t deliveryTag,
- uint32_t framesize);
+ void deliver(framing::ChannelAdapter& channel, uint64_t deliveryTag, DeliveryToken::shared_ptr token, uint32_t framesize);
+ void deliver(framing::ChannelAdapter&, const std::string& destination, uint32_t framesize);
bool isComplete();
@@ -81,6 +72,8 @@ class MessageMessage: public Message{
void decodeHeader(framing::Buffer& buffer);
void decodeContent(framing::Buffer& buffer, uint32_t contentChunkSize = 0);
+ static DeliveryToken::shared_ptr getToken(const std::string& destination);
+
private:
void transferMessage(
framing::ChannelAdapter& channel,
diff --git a/qpid/cpp/src/qpid/broker/ConsumeAdapter.cpp b/qpid/cpp/src/qpid/broker/ConsumeAdapter.cpp
deleted file mode 100644
index 59b6795a77..0000000000
--- a/qpid/cpp/src/qpid/broker/ConsumeAdapter.cpp
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "ConsumeAdapter.h"
-
-using namespace qpid::broker;
-using qpid::framing::ChannelAdapter;
-using qpid::framing::RequestId;
-
-ConsumeAdapter::ConsumeAdapter(ChannelAdapter& a, const std::string t, uint32_t f) : adapter(a), tag(t), framesize(f) {}
-
-RequestId ConsumeAdapter::getNextDeliveryTag()
-{
- return adapter.getNextSendRequestId();
-}
-
-void ConsumeAdapter::deliver(Message::shared_ptr& msg, RequestId deliveryTag)
-{
- msg->deliver(adapter, tag, deliveryTag, framesize);
-}
diff --git a/qpid/cpp/src/qpid/broker/DeliveryAdapter.h b/qpid/cpp/src/qpid/broker/DeliveryAdapter.h
index 45b103bd68..971f4095cf 100644
--- a/qpid/cpp/src/qpid/broker/DeliveryAdapter.h
+++ b/qpid/cpp/src/qpid/broker/DeliveryAdapter.h
@@ -22,11 +22,13 @@
#define _DeliveryAdapter_
#include "BrokerMessageBase.h"
+#include "DeliveryToken.h"
#include "qpid/framing/amqp_types.h"
namespace qpid {
namespace broker {
+ typedef framing::RequestId DeliveryId;
/**
* The intention behind this interface is to separate the generic
* handling of some form of message delivery to clients that is
@@ -40,8 +42,8 @@ namespace broker {
class DeliveryAdapter
{
public:
- virtual framing::RequestId getNextDeliveryTag() = 0;
- virtual void deliver(Message::shared_ptr& msg, framing::RequestId tag) = 0;
+ virtual DeliveryId deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token) = 0;
+ virtual void redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag) = 0;
virtual ~DeliveryAdapter(){}
};
diff --git a/qpid/cpp/src/qpid/broker/ConsumeAdapter.h b/qpid/cpp/src/qpid/broker/DeliveryToken.h
index 43cda7753e..8bdf5e6359 100644
--- a/qpid/cpp/src/qpid/broker/ConsumeAdapter.h
+++ b/qpid/cpp/src/qpid/broker/DeliveryToken.h
@@ -18,23 +18,25 @@
* under the License.
*
*/
-#ifndef _ConsumeAdapter_
-#define _ConsumeAdapter_
+#ifndef _DeliveryToken_
+#define _DeliveryToken_
-#include "DeliveryAdapter.h"
-#include "qpid/framing/ChannelAdapter.h"
+#include <boost/shared_ptr.hpp>
namespace qpid {
namespace broker {
- class ConsumeAdapter : public DeliveryAdapter
+
+ /**
+ * A DeliveryToken allows the delivery of a message to be
+ * associated with whatever mechanism caused it to be
+ * delivered. (i.e. its a form of Memento).
+ */
+ class DeliveryToken
{
- framing::ChannelAdapter& adapter;
- const std::string tag;
- const uint32_t framesize;
public:
- ConsumeAdapter(framing::ChannelAdapter& adapter, const std::string tag, uint32_t framesize);
- framing::RequestId getNextDeliveryTag();
- void deliver(Message::shared_ptr& msg, framing::RequestId tag);
+ typedef boost::shared_ptr<DeliveryToken> shared_ptr;
+
+ virtual ~DeliveryToken(){}
};
}}
diff --git a/qpid/cpp/src/qpid/broker/GetAdapter.cpp b/qpid/cpp/src/qpid/broker/GetAdapter.cpp
deleted file mode 100644
index bbffade712..0000000000
--- a/qpid/cpp/src/qpid/broker/GetAdapter.cpp
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "GetAdapter.h"
-#include "qpid/framing/MethodContext.h"
-
-using namespace qpid::broker;
-using qpid::framing::ChannelAdapter;
-using qpid::framing::RequestId;
-using qpid::framing::MethodContext;
-
-GetAdapter::GetAdapter(ChannelAdapter& a, Queue::shared_ptr q, const std::string d, uint32_t f)
- : adapter(a), queue(q), destination(d), framesize(f) {}
-
-RequestId GetAdapter::getNextDeliveryTag()
-{
- return adapter.getNextSendRequestId();
-}
-
-void GetAdapter::deliver(Message::shared_ptr& msg, framing::RequestId deliveryTag)
-{
- msg->sendGetOk(adapter, destination, queue->getMessageCount(), 1, deliveryTag, framesize);
-}
diff --git a/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
index 41dd8cc145..da57439e21 100644
--- a/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
+++ b/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
@@ -21,8 +21,6 @@
#include "BrokerChannel.h"
#include "qpid/framing/FramingContent.h"
#include "Connection.h"
-#include "ConsumeAdapter.h"
-#include "GetAdapter.h"
#include "Broker.h"
#include "BrokerMessageMessage.h"
#include "qpid/framing/MessageAppendBody.h"
@@ -45,66 +43,44 @@ void
MessageHandlerImpl::cancel(const string& destination )
{
channel.cancel(destination);
- //client.ok();
}
void
-MessageHandlerImpl::open(const string& reference)
+MessageHandlerImpl::open(const string& /*reference*/)
{
- references.open(reference);
- //client.ok();
+ throw ConnectionException(540, "References no longer supported");
}
void
-MessageHandlerImpl::append(const framing::MethodContext& context)
+MessageHandlerImpl::append(const framing::MethodContext& /*context*/)
{
- MessageAppendBody::shared_ptr body(boost::shared_polymorphic_downcast<MessageAppendBody>(context.methodBody));
- references.get(body->getReference())->append(body);
- //client.ok();
+ throw ConnectionException(540, "References no longer supported");
}
void
-MessageHandlerImpl::close(const string& reference)
+MessageHandlerImpl::close(const string& /*reference*/)
{
- Reference::shared_ptr ref = references.get(reference);
- //client.ok();
-
- // Send any transfer messages to their correct exchanges and okay them
- const Reference::Messages& msgs = ref->getMessages();
- for (Reference::Messages::const_iterator m = msgs.begin(); m != msgs.end(); ++m) {
- channel.handleInlineTransfer(*m);
- client.setResponseTo((*m)->getRequestId());
- client.ok();
- }
- ref->close();
+ throw ConnectionException(540, "References no longer supported");
}
void
MessageHandlerImpl::checkpoint(const string& /*reference*/,
const string& /*identifier*/ )
{
- // Initial implementation (which is conforming) is to do nothing here
- // and return offset zero for the resume
- //client.ok();
+ throw ConnectionException(540, "References no longer supported");
}
void
-MessageHandlerImpl::resume(const string& reference,
+MessageHandlerImpl::resume(const string& /*reference*/,
const string& /*identifier*/ )
{
- // Initial (null) implementation
- // open reference and return 0 offset
- references.open(reference);
- client.offset(0);
+ throw ConnectionException(540, "References no longer supported");
}
void
MessageHandlerImpl::offset(uint64_t /*value*/ )
{
- // Shouldn't ever receive this as it is reponse to resume
- // which is never sent
- // TODO astitcher 2007-02-16 What is the correct exception to throw here?
- THROW_QPID_ERROR(INTERNAL_ERROR, "impossible");
+ throw ConnectionException(540, "References no longer supported");
}
void
@@ -120,14 +96,12 @@ MessageHandlerImpl::consume(uint16_t /*ticket*/,
if(!destination.empty() && channel.exists(destination))
throw ConnectionException(530, "Consumer tags must be unique");
string tag = destination;
- channel.consume(std::auto_ptr<DeliveryAdapter>(new ConsumeAdapter(adapter, destination, connection.getFrameMax())),
- tag, queue, !noAck, exclusive,
- noLocal ? &connection : 0, &filter);
- //client.ok();
+ channel.consume(MessageMessage::getToken(destination), tag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter);
// Dispatch messages as there is now a consumer.
queue->requestDispatch();
}
+
void
MessageHandlerImpl::get(uint16_t /*ticket*/,
const string& queueName,
@@ -136,11 +110,11 @@ MessageHandlerImpl::get(uint16_t /*ticket*/,
{
Queue::shared_ptr queue = getQueue(queueName);
- GetAdapter out(adapter, queue, destination, connection.getFrameMax());
- if(channel.get(out, queue, !noAck)) {
- client.ok();
+ if (channel.get(MessageMessage::getToken(destination), queue, !noAck)){
+ //don't send any response... rely on execution completion
} else {
- client.empty();
+ //temporarily disabled:
+ //client.empty();
}
}
@@ -167,14 +141,12 @@ MessageHandlerImpl::qos(uint32_t prefetchSize,
//TODO: handle global
channel.setPrefetchSize(prefetchSize);
channel.setPrefetchCount(prefetchCount);
- //client.ok();
}
void
MessageHandlerImpl::recover(bool requeue)
{
channel.recover(requeue);
- //client.ok();
}
void
@@ -193,11 +165,8 @@ MessageHandlerImpl::transfer(const framing::MethodContext& context)
if (transfer->getBody().isInline()) {
MessageMessage::shared_ptr message(new MessageMessage(&connection, requestId, transfer));
channel.handleInlineTransfer(message);
- client.ok();
} else {
- Reference::shared_ptr ref(references.get(transfer->getBody().getValue()));
- MessageMessage::shared_ptr message(new MessageMessage(&connection, requestId, transfer, ref));
- ref->addMessage(message);
+ throw ConnectionException(540, "References no longer supported");
}
}
diff --git a/qpid/cpp/src/qpid/broker/SemanticHandler.cpp b/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
index 2b1de1bbc0..e9ec698400 100644
--- a/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
@@ -25,10 +25,11 @@
using namespace qpid::broker;
using namespace qpid::framing;
+using namespace qpid::sys;
SemanticHandler::SemanticHandler(ChannelId id, Connection& c) :
connection(c),
- channel(c, id, &c.broker.getStore())
+ channel(c, *this, id, &c.broker.getStore())
{
init(id, connection.getOutput(), connection.getVersion());
adapter = std::auto_ptr<BrokerAdapter>(new BrokerAdapter(channel, connection, connection.broker, *this));
@@ -75,10 +76,24 @@ void SemanticHandler::handleMethodInContext(boost::shared_ptr<qpid::framing::AMQ
}
}
-void SemanticHandler::complete(uint32_t mark, uint16_t /*range- not decoded correctly yet*/)
+void SemanticHandler::complete(uint32_t cumulative, SequenceNumberSet range)
{
- //just record it for now (will eventually need to use it to ack messages):
- outgoing.lwm = SequenceNumber(mark);
+ //record:
+ SequenceNumber mark(cumulative);
+ if (outgoing.lwm < mark) {
+ outgoing.lwm = mark;
+ //ack messages:
+ channel.ack(mark.getValue(), true);
+ //std::cout << "[" << this << "] acknowledged: " << mark << std::endl;
+ }
+ if (range.size() % 2) { //must be even number
+ throw ConnectionException(530, "Received odd number of elements in ranged mark");
+ } else {
+ //TODO: need to keep a record of the full range previously acked
+ for (SequenceNumberSet::iterator i = range.begin(); i != range.end(); i++) {
+ channel.ack((uint64_t) i->getValue(), (uint64_t) (++i)->getValue());
+ }
+ }
}
void SemanticHandler::flush()
@@ -86,8 +101,8 @@ void SemanticHandler::flush()
//flush doubles as a sync to begin with - send an execution.complete
incoming.lwm = incoming.hwm;
if (isOpen()) {
- /*use dummy value for range which is not yet encoded correctly*/
- send(make_shared_ptr(new ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), 0)));
+ Mutex::ScopedLock l(outLock);
+ ChannelAdapter::send(make_shared_ptr(new ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), SequenceNumberSet())));
}
}
@@ -140,3 +155,28 @@ void SemanticHandler::handleHeartbeat(boost::shared_ptr<qpid::framing::AMQHeartb
channel.handleHeartbeat(body);
}
+DeliveryId SemanticHandler::deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token)
+{
+ Mutex::ScopedLock l(outLock);
+ SequenceNumber copy(outgoing.hwm);
+ ++copy;
+ msg->deliver(*this, copy.getValue(), token, connection.getFrameMax());
+ //std::cout << "[" << this << "] delivered: " << outgoing.hwm.getValue() << std::endl;
+ return outgoing.hwm.getValue();
+}
+
+void SemanticHandler::redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag)
+{
+ msg->deliver(*this, tag, token, connection.getFrameMax());
+}
+
+RequestId SemanticHandler::send(shared_ptr<AMQBody> body, Correlator::Action action)
+{
+ Mutex::ScopedLock l(outLock);
+ uint8_t type(body->type());
+ if (type == REQUEST_BODY || type == RESPONSE_BODY || type == METHOD_BODY) {
+ ++outgoing.hwm;
+ //std::cout << "[" << this << "] allocated: " << outgoing.hwm.getValue() << " to " << *body << std::endl;
+ }
+ return ChannelAdapter::send(body, action);
+}
diff --git a/qpid/cpp/src/qpid/broker/SemanticHandler.h b/qpid/cpp/src/qpid/broker/SemanticHandler.h
index a57559d043..b863b3486e 100644
--- a/qpid/cpp/src/qpid/broker/SemanticHandler.h
+++ b/qpid/cpp/src/qpid/broker/SemanticHandler.h
@@ -24,6 +24,7 @@
#include <memory>
#include "BrokerChannel.h"
#include "Connection.h"
+#include "DeliveryAdapter.h"
#include "qpid/framing/amqp_types.h"
#include "qpid/framing/AMQP_ServerOperations.h"
#include "qpid/framing/FrameHandler.h"
@@ -36,6 +37,7 @@ class BrokerAdapter;
class framing::ChannelAdapter;
class SemanticHandler : private framing::ChannelAdapter,
+ private DeliveryAdapter,
public framing::FrameHandler,
public framing::AMQP_ServerOperations::ExecutionHandler
{
@@ -44,6 +46,7 @@ class SemanticHandler : private framing::ChannelAdapter,
std::auto_ptr<BrokerAdapter> adapter;
framing::Window incoming;
framing::Window outgoing;
+ sys::Mutex outLock;
void handleL4(boost::shared_ptr<qpid::framing::AMQMethodBody> method,
const qpid::framing::MethodContext& context);
@@ -55,12 +58,22 @@ class SemanticHandler : private framing::ChannelAdapter,
void handleHeader(boost::shared_ptr<qpid::framing::AMQHeaderBody>);
void handleContent(boost::shared_ptr<qpid::framing::AMQContentBody>);
void handleHeartbeat(boost::shared_ptr<qpid::framing::AMQHeartbeatBody>);
+
+ framing::RequestId send(shared_ptr<framing::AMQBody> body, framing::Correlator::Action action=framing::Correlator::Action());
+
+
+ //delivery adapter methods:
+ DeliveryId deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token);
+ void redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag);
+
public:
SemanticHandler(framing::ChannelId id, Connection& c);
+
+ //frame handler:
void handle(framing::AMQFrame& frame);
//execution class method handlers:
- void complete(uint32_t cumulativeExecutionMark, uint16_t);
+ void complete(uint32_t cumulativeExecutionMark, framing::SequenceNumberSet range);
void flush();
};
diff --git a/qpid/cpp/src/qpid/client/ClientChannel.cpp b/qpid/cpp/src/qpid/client/ClientChannel.cpp
index 0033cbdbe4..19b4726a72 100644
--- a/qpid/cpp/src/qpid/client/ClientChannel.cpp
+++ b/qpid/cpp/src/qpid/client/ClientChannel.cpp
@@ -77,7 +77,7 @@ void Channel::protocolInit(
connection->connector->init(); // Send ProtocolInit block.
ConnectionStartBody::shared_ptr connectionStart =
responses.receive<ConnectionStartBody>();
-
+
FieldTable props;
string mechanism("PLAIN");
string response = ((char)0) + uid + ((char)0) + pwd;
@@ -85,7 +85,7 @@ void Channel::protocolInit(
ConnectionTuneBody::shared_ptr proposal =
sendAndReceive<ConnectionTuneBody>(
make_shared_ptr(new ConnectionStartOkBody(
- version, connectionStart->getRequestId(),
+ version, //connectionStart->getRequestId(),
props, mechanism,
response, locale)));
@@ -98,7 +98,7 @@ void Channel::protocolInit(
**/
sendCommand(make_shared_ptr(new ConnectionTuneOkBody(
- version, proposal->getRequestId(),
+ version, //proposal->getRequestId(),
proposal->getChannelMax(), connection->getMaxFrameSize(),
proposal->getHeartbeat())));
@@ -222,10 +222,10 @@ AMQMethodBody::shared_ptr method, const MethodContext& ctxt)
}
}
-void Channel::handleChannel(AMQMethodBody::shared_ptr method, const MethodContext& ctxt) {
+void Channel::handleChannel(AMQMethodBody::shared_ptr method, const MethodContext& /*ctxt*/) {
switch (method->amqpMethodId()) {
case ChannelCloseBody::METHOD_ID:
- sendCommand(make_shared_ptr(new ChannelCloseOkBody(version, ctxt.getRequestId())));
+ sendCommand(make_shared_ptr(new ChannelCloseOkBody(version/*, ctxt.getRequestId()*/)));
peerClose(shared_polymorphic_downcast<ChannelCloseBody>(method));
return;
case ChannelFlowBody::METHOD_ID:
diff --git a/qpid/cpp/src/qpid/cluster/SessionManager.cpp b/qpid/cpp/src/qpid/cluster/SessionManager.cpp
index 9f6438cf92..88ddfe843f 100644
--- a/qpid/cpp/src/qpid/cluster/SessionManager.cpp
+++ b/qpid/cpp/src/qpid/cluster/SessionManager.cpp
@@ -36,7 +36,7 @@ using namespace sys;
using namespace broker;
/** Handler to send frames direct to local broker (bypass correlation etc.) */
-struct BrokerHandler : public FrameHandler, private ChannelAdapter {
+ struct BrokerHandler : public FrameHandler, private ChannelAdapter, private DeliveryAdapter {
Connection connection;
Channel channel;
BrokerAdapter adapter;
@@ -51,7 +51,7 @@ struct BrokerHandler : public FrameHandler, private ChannelAdapter {
//
BrokerHandler(Broker& broker) :
connection(0, broker),
- channel(connection, 1, 0),
+ channel(connection, *this, 1, 0),
adapter(channel, connection, broker, *this) {}
void handle(AMQFrame& frame) {
@@ -68,6 +68,10 @@ struct BrokerHandler : public FrameHandler, private ChannelAdapter {
virtual void handleMethodInContext(shared_ptr<AMQMethodBody>, const MethodContext&){}
// No-op send.
virtual RequestId send(shared_ptr<AMQBody>, Correlator::Action) { return 0; }
+
+ //delivery adapter methods, also no-ops:
+ virtual DeliveryId deliver(Message::shared_ptr&, DeliveryToken::shared_ptr) { return 0; }
+ virtual void redeliver(Message::shared_ptr&, DeliveryToken::shared_ptr, DeliveryId) {}
};
/** Wrap plain AMQFrames in SessionFrames */
diff --git a/qpid/cpp/src/qpid/framing/AMQMethodBody.cpp b/qpid/cpp/src/qpid/framing/AMQMethodBody.cpp
index 04941eaa58..eb34d48c5f 100644
--- a/qpid/cpp/src/qpid/framing/AMQMethodBody.cpp
+++ b/qpid/cpp/src/qpid/framing/AMQMethodBody.cpp
@@ -60,4 +60,9 @@ void AMQMethodBody::decode(Buffer& buffer, uint32_t /*size*/) {
decodeContent(buffer);
}
+void AMQMethodBody::encode(Buffer& buffer) const {
+ encodeId(buffer);
+ encodeContent(buffer);
+}
+
}} // namespace qpid::framing
diff --git a/qpid/cpp/src/qpid/framing/AMQMethodBody.h b/qpid/cpp/src/qpid/framing/AMQMethodBody.h
index 55cf5cb864..2b46c6ea00 100644
--- a/qpid/cpp/src/qpid/framing/AMQMethodBody.h
+++ b/qpid/cpp/src/qpid/framing/AMQMethodBody.h
@@ -47,6 +47,7 @@ class AMQMethodBody : public AMQBody
AMQMethodBody(ProtocolVersion ver) : version(ver) {}
virtual ~AMQMethodBody() {}
void decode(Buffer&, uint32_t);
+ virtual void encode(Buffer& buffer) const;
virtual MethodId amqpMethodId() const = 0;
virtual ClassId amqpClassId() const = 0;
@@ -64,8 +65,8 @@ class AMQMethodBody : public AMQBody
virtual bool isRequest() const { return false; }
virtual bool isResponse() const { return false; }
- protected:
static uint32_t baseSize() { return 4; }
+ protected:
struct ClassMethodId {
uint16_t classId;
@@ -76,6 +77,9 @@ class AMQMethodBody : public AMQBody
void encodeId(Buffer& buffer) const;
virtual void encodeContent(Buffer& buffer) const = 0;
virtual void decodeContent(Buffer& buffer) = 0;
+
+ virtual void printPrefix(std::ostream&) const {}
+
};
diff --git a/qpid/cpp/src/qpid/framing/SequenceNumberSet.cpp b/qpid/cpp/src/qpid/framing/SequenceNumberSet.cpp
new file mode 100644
index 0000000000..357b5dabd7
--- /dev/null
+++ b/qpid/cpp/src/qpid/framing/SequenceNumberSet.cpp
@@ -0,0 +1,61 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "SequenceNumberSet.h"
+
+using namespace qpid::framing;
+
+void SequenceNumberSet::encode(Buffer& buffer) const
+{
+ buffer.putShort(size());
+ for (const_iterator i = begin(); i != end(); i++) {
+ buffer.putLong(i->getValue());
+ }
+}
+
+void SequenceNumberSet::decode(Buffer& buffer)
+{
+ uint16_t count = buffer.getShort();
+ for (uint16_t i = 0; i < count; i++) {
+ push_back(SequenceNumber(buffer.getLong()));
+ }
+}
+
+uint32_t SequenceNumberSet::encodedSize() const
+{
+ return 2 /*count*/ + (size() * 4);
+}
+
+namespace qpid{
+namespace framing{
+
+std::ostream& operator<<(std::ostream& out, const SequenceNumberSet& set) {
+ out << "{";
+ for (SequenceNumberSet::const_iterator i = set.begin(); i != set.end(); i++) {
+ if (i != set.begin()) out << ", ";
+ out << (i->getValue());
+ }
+ out << "}";
+ return out;
+}
+
+}
+}
diff --git a/qpid/cpp/src/qpid/broker/GetAdapter.h b/qpid/cpp/src/qpid/framing/SequenceNumberSet.h
index e90619a5f3..bcf78d4f22 100644
--- a/qpid/cpp/src/qpid/broker/GetAdapter.h
+++ b/qpid/cpp/src/qpid/framing/SequenceNumberSet.h
@@ -18,30 +18,32 @@
* under the License.
*
*/
-#ifndef _GetAdapter_
-#define _GetAdapter_
+#ifndef _framing_SequenceNumberSet_h
+#define _framing_SequenceNumberSet_h
-#include "BrokerQueue.h"
-#include "DeliveryAdapter.h"
-#include "qpid/framing/ChannelAdapter.h"
+#include <ostream>
+#include <vector>
+#include "amqp_types.h"
+#include "Buffer.h"
+#include "SequenceNumber.h"
namespace qpid {
-namespace broker {
-
- class GetAdapter : public DeliveryAdapter
- {
- framing::ChannelAdapter& adapter;
- Queue::shared_ptr queue;
- const std::string destination;
- const uint32_t framesize;
- public:
- GetAdapter(framing::ChannelAdapter& adapter, Queue::shared_ptr queue, const std::string destination, uint32_t framesize);
- ~GetAdapter(){}
- framing::RequestId getNextDeliveryTag();
- void deliver(Message::shared_ptr& msg, framing::RequestId tag);
- };
-
-}}
+namespace framing {
+
+class SequenceNumberSet : public std::vector<SequenceNumber>
+{
+public:
+ typedef std::vector<SequenceNumber>::const_iterator const_iterator;
+ typedef std::vector<SequenceNumber>::iterator iterator;
+
+ void encode(Buffer& buffer) const;
+ void decode(Buffer& buffer);
+ uint32_t encodedSize() const;
+
+ friend std::ostream& operator<<(std::ostream&, const SequenceNumberSet&);
+};
+
+}} // namespace qpid::framing
#endif
diff --git a/qpid/cpp/src/qpid/framing/amqp_types.h b/qpid/cpp/src/qpid/framing/amqp_types.h
index efb720f047..ff75b28468 100644
--- a/qpid/cpp/src/qpid/framing/amqp_types.h
+++ b/qpid/cpp/src/qpid/framing/amqp_types.h
@@ -53,6 +53,7 @@ typedef uint16_t ReplyCode;
// Types represented by classes.
class Content;
class FieldTable;
+class SequenceNumberSet;
// Useful constants
diff --git a/qpid/cpp/src/tests/BrokerChannelTest.cpp b/qpid/cpp/src/tests/BrokerChannelTest.cpp
index 05bdb7b3f0..eb67601875 100644
--- a/qpid/cpp/src/tests/BrokerChannelTest.cpp
+++ b/qpid/cpp/src/tests/BrokerChannelTest.cpp
@@ -48,30 +48,21 @@ struct MockHandler : ConnectionOutputHandler{
void close() {};
};
-struct DeliveryRecorder
+struct DeliveryRecorder : DeliveryAdapter
{
- typedef std::pair<Message::shared_ptr, RequestId> Delivery;
+ DeliveryId id;
+ typedef std::pair<Message::shared_ptr, DeliveryToken::shared_ptr> Delivery;
std::vector<Delivery> delivered;
- struct Adapter : DeliveryAdapter
+ DeliveryId deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token)
{
- RequestId id;
- DeliveryRecorder& recorder;
-
- Adapter(DeliveryRecorder& r) : recorder(r) {}
-
- RequestId getNextDeliveryTag() { return id + 1; }
- void deliver(Message::shared_ptr& msg, RequestId tag)
- {
- recorder.delivered.push_back(Delivery(msg, tag));
- id++;
- }
-
- };
+ delivered.push_back(Delivery(msg, token));
+ return ++id;
+ }
- std::auto_ptr<DeliveryAdapter> createAdapter()
+ void redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId /*tag*/)
{
- return std::auto_ptr<DeliveryAdapter>(new Adapter(*this));
+ delivered.push_back(Delivery(msg, token));
}
};
@@ -166,6 +157,7 @@ class BrokerChannelTest : public CppUnit::TestCase
}
};
+ DeliveryRecorder recorder;
public:
@@ -179,13 +171,13 @@ class BrokerChannelTest : public CppUnit::TestCase
void testConsumerMgmt(){
Queue::shared_ptr queue(new Queue("my_queue"));
- Channel channel(connection, 0, 0);
+ Channel channel(connection, recorder, 0, 0);
channel.open();
CPPUNIT_ASSERT(!channel.exists("my_consumer"));
ConnectionToken* owner = 0;
string tag("my_consumer");
- std::auto_ptr<DeliveryAdapter> unused;
+ DeliveryToken::shared_ptr unused;
channel.consume(unused, tag, queue, false, false, owner);
string tagA;
string tagB;
@@ -205,24 +197,25 @@ class BrokerChannelTest : public CppUnit::TestCase
}
void testDeliveryNoAck(){
- Channel channel(connection, 7);
+ Channel channel(connection, recorder, 7);
Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14));
Queue::shared_ptr queue(new Queue("my_queue"));
- DeliveryRecorder recorder;
string tag("test");
- channel.consume(recorder.createAdapter(), tag, queue, false, false, 0);
+ DeliveryToken::shared_ptr token(BasicMessage::createConsumeToken("my-token"));
+ channel.consume(token, tag, queue, false, false, 0);
queue->deliver(msg);
sleep(2);
CPPUNIT_ASSERT_EQUAL((size_t) 1, recorder.delivered.size());
CPPUNIT_ASSERT_EQUAL(msg, recorder.delivered.front().first);
+ CPPUNIT_ASSERT_EQUAL(token, recorder.delivered.front().second);
}
void testStaging(){
MockMessageStore store;
connection.setFrameMax(1000);
connection.setStagingThreshold(10);
- Channel channel(connection, 1, &store);
+ Channel channel(connection, recorder, 1, &store);
const string data[] = {"abcde", "fghij", "klmno"};
Message* msg = new BasicMessage(0, "my_exchange", "my_routing_key", false, false);
@@ -314,7 +307,7 @@ class BrokerChannelTest : public CppUnit::TestCase
}
void testFlow(){
- Channel channel(connection, 7);
+ Channel channel(connection, recorder, 7);
channel.open();
//there will always be a connection-start frame
CPPUNIT_ASSERT_EQUAL((size_t) 1, handler.frames.size());
@@ -327,9 +320,9 @@ class BrokerChannelTest : public CppUnit::TestCase
Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14));
addContent(msg, data);
Queue::shared_ptr queue(new Queue("my_queue"));
- DeliveryRecorder recorder;
string tag("test");
- channel.consume(recorder.createAdapter(), tag, queue, false, false, 0);
+ DeliveryToken::shared_ptr token(BasicMessage::createConsumeToken("my-token"));
+ channel.consume(token, tag, queue, false, false, 0);
channel.flow(false);
queue->deliver(msg);
//ensure no messages have been delivered
@@ -340,6 +333,7 @@ class BrokerChannelTest : public CppUnit::TestCase
//ensure no messages have been delivered
CPPUNIT_ASSERT_EQUAL((size_t) 1, recorder.delivered.size());
CPPUNIT_ASSERT_EQUAL(msg, recorder.delivered.front().first);
+ CPPUNIT_ASSERT_EQUAL(token, recorder.delivered.front().second);
}
Message* createMessage(const string& exchange, const string& routingKey, const string& messageId, uint64_t contentSize)
diff --git a/qpid/cpp/src/tests/FramingTest.cpp b/qpid/cpp/src/tests/FramingTest.cpp
index 134acb94f9..98f89b59be 100644
--- a/qpid/cpp/src/tests/FramingTest.cpp
+++ b/qpid/cpp/src/tests/FramingTest.cpp
@@ -108,7 +108,7 @@ class FramingTest : public CppUnit::TestCase
{
std::string a = "hostA";
std::string b = "hostB";
- ConnectionRedirectBody in(version, 0, a, b);
+ ConnectionRedirectBody in(version, a, b);
in.encodeContent(buffer);
buffer.flip();
ConnectionRedirectBody out(version);
@@ -146,7 +146,7 @@ class FramingTest : public CppUnit::TestCase
std::string a = "hostA";
std::string b = "hostB";
AMQFrame in(version, 999,
- new ConnectionRedirectBody(version, 0, a, b));
+ new ConnectionRedirectBody(version, a, b));
in.encode(buffer);
buffer.flip();
AMQFrame out;
@@ -157,7 +157,7 @@ class FramingTest : public CppUnit::TestCase
void testBasicConsumeOkBodyFrame()
{
std::string s = "hostA";
- AMQFrame in(version, 999, new BasicConsumeOkBody(version, 0, s));
+ AMQFrame in(version, 999, new BasicConsumeOkBody(version, s));
in.encode(buffer);
buffer.flip();
AMQFrame out;
@@ -400,22 +400,22 @@ class FramingTest : public CppUnit::TestCase
c.declareQueue(queue);
c.bind(exchange, queue, "MyTopic", framing::FieldTable());
broker::InProcessBroker::Conversation::const_iterator i = ibroker.conversation.begin();
- ASSERT_FRAME("BROKER: Frame[channel=0; request(id=1,mark=0): ConnectionStart: versionMajor=0; versionMinor=10; serverProperties={}; mechanisms=PLAIN; locales=en_US]", *i++);
- ASSERT_FRAME("CLIENT: Frame[channel=0; response(id=1,request=1,batch=0): ConnectionStartOk: clientProperties={}; mechanism=PLAIN; response=\000guest\000guest; locale=en_US]", *i++);
- ASSERT_FRAME("BROKER: Frame[channel=0; request(id=2,mark=1): ConnectionTune: channelMax=32767; frameMax=65536; heartbeat=0]", *i++);
- ASSERT_FRAME("CLIENT: Frame[channel=0; response(id=2,request=2,batch=0): ConnectionTuneOk: channelMax=32767; frameMax=65536; heartbeat=0]", *i++);
- ASSERT_FRAME("CLIENT: Frame[channel=0; request(id=1,mark=0): ConnectionOpen: virtualHost=/; capabilities=; insist=1]", *i++);
- ASSERT_FRAME("BROKER: Frame[channel=0; response(id=1,request=1,batch=0): ConnectionOpenOk: knownHosts=]", *i++);
- ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=1,mark=0): ChannelOpen: outOfBand=]", *i++);
- ASSERT_FRAME("BROKER: Frame[channel=1; response(id=1,request=1,batch=0): ChannelOpenOk: ]", *i++);
- ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=2,mark=1): ExchangeDeclare: ticket=0; exchange=MyExchange; type=topic; alternateExchange=; passive=0; durable=0; autoDelete=0; arguments={}]", *i++);
- ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=3,mark=1): ExecutionFlush: ]", *i++);
- ASSERT_FRAME("BROKER: Frame[channel=1; request(id=1,mark=0): ExecutionComplete: cumulativeExecutionMark=2; rangedExecutionSet=0]", *i++);
- ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=4,mark=1): QueueDeclare: ticket=0; queue=MyQueue; alternateExchange=; passive=0; durable=0; exclusive=1; autoDelete=1; nowait=0; arguments={}]", *i++);
- ASSERT_FRAME("BROKER: Frame[channel=1; response(id=2,request=4,batch=0): QueueDeclareOk: queue=MyQueue; messageCount=0; consumerCount=0]", *i++);
- ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=5,mark=2): QueueBind: ticket=0; queue=MyQueue; exchange=MyExchange; routingKey=MyTopic; arguments={}]", *i++);
- ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=6,mark=2): ExecutionFlush: ]", *i++);
- ASSERT_FRAME("BROKER: Frame[channel=1; request(id=2,mark=0): ExecutionComplete: cumulativeExecutionMark=4; rangedExecutionSet=0]", *i++);
+ ASSERT_FRAME("BROKER: Frame[channel=0; ConnectionStart: versionMajor=0; versionMinor=10; serverProperties={}; mechanisms=PLAIN; locales=en_US]", *i++);
+ ASSERT_FRAME("CLIENT: Frame[channel=0; ConnectionStartOk: clientProperties={}; mechanism=PLAIN; response=\000guest\000guest; locale=en_US]", *i++);
+ ASSERT_FRAME("BROKER: Frame[channel=0; ConnectionTune: channelMax=32767; frameMax=65536; heartbeat=0]", *i++);
+ ASSERT_FRAME("CLIENT: Frame[channel=0; ConnectionTuneOk: channelMax=32767; frameMax=65536; heartbeat=0]", *i++);
+ ASSERT_FRAME("CLIENT: Frame[channel=0; ConnectionOpen: virtualHost=/; capabilities=; insist=1]", *i++);
+ ASSERT_FRAME("BROKER: Frame[channel=0; ConnectionOpenOk: knownHosts=]", *i++);
+ ASSERT_FRAME("CLIENT: Frame[channel=1; ChannelOpen: outOfBand=]", *i++);
+ ASSERT_FRAME("BROKER: Frame[channel=1; ChannelOpenOk: ]", *i++);
+ ASSERT_FRAME("CLIENT: Frame[channel=1; ExchangeDeclare: ticket=0; exchange=MyExchange; type=topic; alternateExchange=; passive=0; durable=0; autoDelete=0; arguments={}]", *i++);
+ ASSERT_FRAME("CLIENT: Frame[channel=1; ExecutionFlush: ]", *i++);
+ ASSERT_FRAME("BROKER: Frame[channel=1; ExecutionComplete: cumulativeExecutionMark=2; rangedExecutionSet={}]", *i++);
+ ASSERT_FRAME("CLIENT: Frame[channel=1; QueueDeclare: ticket=0; queue=MyQueue; alternateExchange=; passive=0; durable=0; exclusive=1; autoDelete=1; nowait=0; arguments={}]", *i++);
+ ASSERT_FRAME("BROKER: Frame[channel=1; QueueDeclareOk: queue=MyQueue; messageCount=0; consumerCount=0]", *i++);
+ ASSERT_FRAME("CLIENT: Frame[channel=1; QueueBind: ticket=0; queue=MyQueue; exchange=MyExchange; routingKey=MyTopic; arguments={}]", *i++);
+ ASSERT_FRAME("CLIENT: Frame[channel=1; ExecutionFlush: ]", *i++);
+ ASSERT_FRAME("BROKER: Frame[channel=1; ExecutionComplete: cumulativeExecutionMark=4; rangedExecutionSet={}]", *i++);
}
};
diff --git a/qpid/python/cpp_failing_0-10.txt b/qpid/python/cpp_failing_0-10.txt
index 798d1769eb..e68f942d67 100644
--- a/qpid/python/cpp_failing_0-10.txt
+++ b/qpid/python/cpp_failing_0-10.txt
@@ -1,4 +1,15 @@
-tests_0-10.message.MessageTests.test_checkpoint
tests_0-10.message.MessageTests.test_reject
tests_0-10.basic.BasicTests.test_get
+tests_0-10.message.MessageTests.test_get
+tests_0-10.message.MessageTests.test_checkpoint
+tests_0-10.message.MessageTests.test_empty_reference
+tests_0-10.message.MessageTests.test_reference_already_opened_error
+tests_0-10.message.MessageTests.test_reference_completion
+tests_0-10.message.MessageTests.test_reference_large
+tests_0-10.message.MessageTests.test_reference_multi_transfer
+tests_0-10.message.MessageTests.test_reference_simple
+tests_0-10.message.MessageTests.test_reference_unopened_on_append_error
+tests_0-10.message.MessageTests.test_reference_unopened_on_close_error
+tests_0-10.message.MessageTests.test_reference_unopened_on_transfer_error
+
diff --git a/qpid/python/qpid/codec.py b/qpid/python/qpid/codec.py
index a5228e8003..a0d9696c8b 100644
--- a/qpid/python/qpid/codec.py
+++ b/qpid/python/qpid/codec.py
@@ -329,12 +329,6 @@ class Codec:
return ReferenceId(self.decode_longstr())
# new domains for 0-10:
-
- def encode_uuid(self, s):
- self.encode_longstr(s)
-
- def decode_uuid(self):
- return self.decode_longstr()
def encode_rfc1982_long(self, s):
self.encode_long(s)
@@ -342,10 +336,21 @@ class Codec:
def decode_rfc1982_long(self):
return self.decode_long()
- #Not done yet
def encode_rfc1982_long_set(self, s):
- self.encode_short(0)
+ self.encode_short(len(s))
+ for i in s:
+ self.encode_long(i)
def decode_rfc1982_long_set(self):
- self.decode_short()
- return 0;
+ count = self.decode_short()
+ set = []
+ for i in range(0, count):
+ set.append(self.decode_long())
+ return set;
+
+ #not correct for 0-10 yet
+ def encode_uuid(self, s):
+ self.encode_longstr(s)
+
+ def decode_uuid(self):
+ return self.decode_longstr()
diff --git a/qpid/python/qpid/message.py b/qpid/python/qpid/message.py
index f80293180e..970ab9d974 100644
--- a/qpid/python/qpid/message.py
+++ b/qpid/python/qpid/message.py
@@ -26,7 +26,10 @@ class Message:
self.frame = frame
self.method = frame.method_type
self.content = content
-
+ if self.method.klass.name != "execution":
+ self.command_id = self.channel.incoming_completion.sequence.next()
+ #print "allocated: ", self.command_id, "to ", self.method.klass.name, "_", self.method.name
+
def __len__(self):
return len(self.frame.args)
@@ -66,3 +69,6 @@ class Message:
def __repr__(self):
return Message.REPR % (self.method, self.frame.args, self.content)
+
+ def complete(self, cumulative=True):
+ self.channel.incoming_completion.complete(mark=self.command_id, cumulative=cumulative)
diff --git a/qpid/python/qpid/peer.py b/qpid/python/qpid/peer.py
index 3927f20667..bedc96895b 100644
--- a/qpid/python/qpid/peer.py
+++ b/qpid/python/qpid/peer.py
@@ -30,6 +30,7 @@ from message import Message
from queue import Queue, Closed as QueueClosed
from content import Content
from cStringIO import StringIO
+from time import time
class Sequence:
@@ -186,11 +187,11 @@ class Channel:
self.requester = Requester(self.write)
self.responder = Responder(self.write)
- self.completion = ExecutionCompletion()
+ self.completion = OutgoingCompletion()
+ self.incoming_completion = IncomingCompletion(self)
# Use reliable framing if version == 0-9.
- # (also for 0-10 while transitioning...)
- self.reliable = (spec.major == 0 and (spec.minor == 9 or spec.minor == 10))
+ self.reliable = (spec.major == 0 and spec.minor == 9)
self.use_execution_layer = (spec.major == 0 and spec.minor == 10)
self.synchronous = True
@@ -202,6 +203,7 @@ class Channel:
self.incoming.close()
self.responses.close()
self.completion.close()
+ self.incoming_completion.reset()
def write(self, frame, content = None):
if self.closed:
@@ -252,6 +254,9 @@ class Channel:
self.responder.respond(method, batch, request)
def invoke(self, type, args, kwargs):
+ if type.klass.name == "channel" and (type.name == "close" or type.name == "open"):
+ self.completion.reset()
+ self.incoming_completion.reset()
self.completion.next_command(type)
content = kwargs.pop("content", None)
frame = Method(type, type.arguments(*args, **kwargs))
@@ -306,6 +311,13 @@ class Channel:
return Message(self, resp, content)
else:
raise ValueError(resp)
+ elif self.synchronous and not frame.method.response \
+ and self.use_execution_layer and frame.method.klass.name != "execution":
+ self.execution_flush()
+ self.completion.wait()
+ if self.closed:
+ raise Closed(self.reason)
+
except QueueClosed, e:
if self.closed:
raise Closed(self.reason)
@@ -349,21 +361,32 @@ class Future:
def is_complete(self):
return self.completed.isSet()
-class ExecutionCompletion:
+class OutgoingCompletion:
+ """
+ Manages completion of outgoing commands i.e. command sent by this peer
+ """
+
def __init__(self):
self.condition = threading.Condition()
- self.sequence = Sequence(1)
- self.command_id = 0
- self.mark = 0
+
+ self.sequence = Sequence(1) #issues ids for outgoing commands
+ self.command_id = 0 #last issued id
+ self.mark = 0 #commands up to this mark are known to be complete
+ self.closed = False
def next_command(self, method):
#the following test is a hack until the track/sub-channel is available
if method.klass.name != "execution":
self.command_id = self.sequence.next()
+ def reset(self):
+ self.sequence = Sequence(1) #reset counter
+
def close(self):
+ self.reset()
self.condition.acquire()
try:
+ self.closed = True
self.condition.notifyAll()
finally:
self.condition.release()
@@ -378,11 +401,50 @@ class ExecutionCompletion:
def wait(self, point_of_interest=-1, timeout=None):
if point_of_interest == -1: point_of_interest = self.command_id
+ start_time = time()
+ remaining = timeout
self.condition.acquire()
try:
- if point_of_interest > self.mark:
- self.condition.wait(timeout)
+ while not self.closed and point_of_interest > self.mark:
+ #print "waiting for ", point_of_interest, " mark is currently at ", self.mark
+ self.condition.wait(remaining)
+ if timeout:
+ if start_time + timeout > time(): break
+ else: remaining = timeout - (time() - start_time)
finally:
self.condition.release()
- #todo: retry until timed out or closed
return point_of_interest <= self.mark
+
+class IncomingCompletion:
+ """
+ Manages completion of incoming commands i.e. command received by this peer
+ """
+
+ def __init__(self, channel):
+ self.sequence = Sequence(1) #issues ids for incoming commands
+ self.mark = 0 #id of last command of whose completion notification was sent to the other peer
+ self.channel = channel
+
+ def next_id(self, method):
+ #the following test is a hack until the track/sub-channel is available
+ if method.klass.name != "execution":
+ return self.sequence.next()
+ else:
+ return 0
+
+ def reset(self):
+ self.sequence = Sequence(1) #reset counter
+
+ def complete(self, mark, cumulative=True):
+ if cumulative:
+ if mark > self.mark:
+ self.mark = mark
+ self.channel.execution_complete(cumulative_execution_mark=self.mark)
+ else:
+ #TODO: record and manage the ranges properly
+ range = [mark, mark]
+ self.channel.execution_complete(cumulative_execution_mark=self.mark, ranged_execution_set=range)
+
+
+
+
diff --git a/qpid/python/qpid/spec.py b/qpid/python/qpid/spec.py
index c537401164..09e7dc9d0b 100644
--- a/qpid/python/qpid/spec.py
+++ b/qpid/python/qpid/spec.py
@@ -240,7 +240,7 @@ class Method(Metadata):
"content": None,
"uuid": "",
"rfc1982_long": 0,
- "rfc1982_long_set": 0
+ "rfc1982_long_set": []
}
def define_method(self, name):
diff --git a/qpid/python/tests_0-10/broker.py b/qpid/python/tests_0-10/broker.py
index 684b36597e..6bc2f7ceb8 100644
--- a/qpid/python/tests_0-10/broker.py
+++ b/qpid/python/tests_0-10/broker.py
@@ -48,7 +48,7 @@ class BrokerTests(TestBase):
body = "test ack"
ch.message_transfer(routing_key = "otherqueue", body = body)
msg = self.client.queue(ctag).get(timeout = 5)
- msg.ok()
+ msg.complete()
self.assert_(msg.body == body)
def test_simple_delivery_immediate(self):
diff --git a/qpid/python/tests_0-10/dtx.py b/qpid/python/tests_0-10/dtx.py
index c0d1bd2b74..2835d703ae 100644
--- a/qpid/python/tests_0-10/dtx.py
+++ b/qpid/python/tests_0-10/dtx.py
@@ -40,6 +40,11 @@ class DtxTests(TestBase):
XA_RBROLLBACK = 1
XA_RBTIMEOUT = 2
XA_OK = 8
+ tx_counter = 0
+
+ def reset_channel(self):
+ self.channel.channel_close()
+ self.channel.channel_open()
def test_simple_commit(self):
"""
@@ -56,6 +61,9 @@ class DtxTests(TestBase):
#commit
self.assertEqual(self.XA_OK, channel.dtx_coordination_commit(xid=tx, one_phase=True).status)
+ #should close and reopen channel to ensure no unacked messages are held
+ self.reset_channel()
+
#check result
self.assertMessageCount(0, "queue-a")
self.assertMessageCount(1, "queue-b")
@@ -79,6 +87,8 @@ class DtxTests(TestBase):
#commit
self.assertEqual(self.XA_OK, channel.dtx_coordination_commit(xid=tx, one_phase=False).status)
+ self.reset_channel()
+
#check result
self.assertMessageCount(0, "queue-a")
self.assertMessageCount(1, "queue-b")
@@ -100,6 +110,8 @@ class DtxTests(TestBase):
#rollback
self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).status)
+ self.reset_channel()
+
#check result
self.assertMessageCount(1, "queue-a")
self.assertMessageCount(0, "queue-b")
@@ -123,6 +135,8 @@ class DtxTests(TestBase):
#rollback
self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).status)
+ self.reset_channel()
+
#check result
self.assertMessageCount(1, "queue-a")
self.assertMessageCount(0, "queue-b")
@@ -191,6 +205,8 @@ class DtxTests(TestBase):
channel = self.channel
#do some transactional work & complete the transaction
self.test_simple_commit()
+ # channel has been reset, so reselect for use with dtx
+ channel.dtx_demarcation_select()
#start association for the same xid as the previously completed txn
tx = self.xid("my-xid")
@@ -355,7 +371,7 @@ class DtxTests(TestBase):
self.assertEqual("two", msg.message_id)
channel.message_cancel(destination="results")
#ack the message then close the channel
- msg.ok()
+ msg.complete()
channel.channel_close()
channel = self.channel
@@ -446,7 +462,7 @@ class DtxTests(TestBase):
channel2.dtx_demarcation_select()
channel2.dtx_demarcation_start(xid=tx)
channel2.message_get(queue="dummy", destination="dummy")
- self.client.queue("dummy").get(timeout=1).ok()
+ self.client.queue("dummy").get(timeout=1).complete()
channel2.message_transfer(routing_key="dummy", body="whatever")
channel2.channel_close()
@@ -548,7 +564,9 @@ class DtxTests(TestBase):
channel.dtx_coordination_rollback(xid=x)
self.fail("Recovered xids not as expected. missing: %s; extra: %s" % (missing, extra))
- def xid(self, txid, branchqual = ''):
+ def xid(self, txid):
+ DtxTests.tx_counter += 1
+ branchqual = "v%s" % DtxTests.tx_counter
return pack('LBB', 0, len(txid), len(branchqual)) + txid + branchqual
def txswap(self, tx, id):
@@ -573,7 +591,7 @@ class DtxTests(TestBase):
#consume from src:
channel.message_get(destination="temp-swap", queue=src)
msg = self.client.queue("temp-swap").get(timeout=1)
- msg.ok();
+ msg.complete();
#re-publish to dest
channel.message_transfer(routing_key=dest, message_id=msg.message_id, body=msg.body)
diff --git a/qpid/python/tests_0-10/example.py b/qpid/python/tests_0-10/example.py
index 7ab4cc7d0a..dc71b0590b 100644
--- a/qpid/python/tests_0-10/example.py
+++ b/qpid/python/tests_0-10/example.py
@@ -90,5 +90,5 @@ class ExampleTest (TestBase):
self.assertEqual(body, msg.body)
# Now acknowledge the message.
- msg.ok()
+ msg.complete()
diff --git a/qpid/python/tests_0-10/message.py b/qpid/python/tests_0-10/message.py
index b25016e680..74e2b6416f 100644
--- a/qpid/python/tests_0-10/message.py
+++ b/qpid/python/tests_0-10/message.py
@@ -171,8 +171,8 @@ class MessageTests(TestBase):
self.assertEqual("Four", msg4.body)
self.assertEqual("Five", msg5.body)
- msg1.ok(batchoffset=1)#One and Two
- msg4.ok()
+ msg2.complete(cumulative=True)#One and Two
+ msg4.complete(cumulative=False)
channel.message_recover(requeue=False)
@@ -215,8 +215,8 @@ class MessageTests(TestBase):
self.assertEqual("Four", msg4.body)
self.assertEqual("Five", msg5.body)
- msg1.ok(batchoffset=1) #One and Two
- msg4.ok() #Four
+ msg2.complete(cumulative=True) #One and Two
+ msg4.complete(cumulative=False) #Four
channel.message_cancel(destination="consumer_tag")
@@ -276,14 +276,13 @@ class MessageTests(TestBase):
except Empty: None
#ack messages and check that the next set arrive ok:
- #todo: once batching is implmented, send a single response for all messages
- msg.ok(batchoffset=-4)#1-5
+ msg.complete()
for i in range(6, 11):
msg = queue.get(timeout=1)
self.assertEqual("Message %d" % i, msg.body)
- msg.ok(batchoffset=-4)#6-10
+ msg.complete()
try:
extra = queue.get(timeout=1)
@@ -320,13 +319,13 @@ class MessageTests(TestBase):
except Empty: None
#ack messages and check that the next set arrive ok:
- msg.ok(batchoffset=-4)#1-5
+ msg.complete()
for i in range(6, 11):
msg = queue.get(timeout=1)
self.assertEqual("Message %d" % i, msg.body)
- msg.ok(batchoffset=-4)#6-10
+ msg.complete()
try:
extra = queue.get(timeout=1)
@@ -376,9 +375,9 @@ class MessageTests(TestBase):
self.assertEqual("Message %d" % i, msg.body)
if (i==13):
- msg.ok(batchoffset=-2)#11, 12 & 13
+ msg.complete()#11, 12 & 13
if(i in [15, 17, 19]):
- msg.ok()
+ msg.complete(cumulative=False)
reply = channel.message_get(no_ack=True, queue="test-get")
self.assertEqual(reply.method.klass.name, "message")
@@ -395,8 +394,7 @@ class MessageTests(TestBase):
self.assertEqual(reply.method.name, "ok")
msg = self.client.queue(tag).get(timeout=1)
self.assertEqual("Message %d" % i, msg.body)
- msg.ok()
- #channel.message_ack(delivery_tag=reply.delivery_tag)
+ msg.complete()
reply = channel.message_get(no_ack=True, queue="test-get")
self.assertEqual(reply.method.klass.name, "message")
diff --git a/qpid/python/tests_0-10/tx.py b/qpid/python/tests_0-10/tx.py
index 0f6b4f5cd1..b499c2d1f9 100644
--- a/qpid/python/tests_0-10/tx.py
+++ b/qpid/python/tests_0-10/tx.py
@@ -30,23 +30,39 @@ class TxTests(TestBase):
"""
Test that commited publishes are delivered and commited acks are not re-delivered
"""
+ channel2 = self.client.channel(2)
+ channel2.channel_open()
+ self.perform_txn_work(channel2, "tx-commit-a", "tx-commit-b", "tx-commit-c")
+ channel2.tx_commit()
+ channel2.channel_close()
+
+ #use a different channel with new subscriptions to ensure
+ #there is no redelivery of acked messages:
channel = self.channel
- queue_a, queue_b, queue_c = self.perform_txn_work(channel, "tx-commit-a", "tx-commit-b", "tx-commit-c")
- channel.tx_commit()
+ channel.tx_select()
+
+ channel.message_consume(queue="tx-commit-a", destination="qa", no_ack=False)
+ queue_a = self.client.queue("qa")
+
+ channel.message_consume(queue="tx-commit-b", destination="qb", no_ack=False)
+ queue_b = self.client.queue("qb")
+
+ channel.message_consume(queue="tx-commit-c", destination="qc", no_ack=False)
+ queue_c = self.client.queue("qc")
#check results
for i in range(1, 5):
msg = queue_c.get(timeout=1)
self.assertEqual("TxMessage %d" % i, msg.body)
- msg.ok()
+ msg.complete()
msg = queue_b.get(timeout=1)
self.assertEqual("TxMessage 6", msg.body)
- msg.ok()
+ msg.complete()
msg = queue_a.get(timeout=1)
self.assertEqual("TxMessage 7", msg.body)
- msg.ok()
+ msg.complete()
for q in [queue_a, queue_b, queue_c]:
try:
@@ -76,15 +92,15 @@ class TxTests(TestBase):
for i in range(1, 5):
msg = queue_a.get(timeout=1)
self.assertEqual("Message %d" % i, msg.body)
- msg.ok()
+ msg.complete()
msg = queue_b.get(timeout=1)
self.assertEqual("Message 6", msg.body)
- msg.ok()
+ msg.complete()
msg = queue_c.get(timeout=1)
self.assertEqual("Message 7", msg.body)
- msg.ok()
+ msg.complete()
for q in [queue_a, queue_b, queue_c]:
try:
@@ -114,15 +130,15 @@ class TxTests(TestBase):
for i in range(1, 5):
msg = queue_a.get(timeout=1)
self.assertEqual("Message %d" % i, msg.body)
- msg.ok()
+ msg.complete()
msg = queue_b.get(timeout=1)
self.assertEqual("Message 6", msg.body)
- msg.ok()
+ msg.complete()
msg = queue_c.get(timeout=1)
self.assertEqual("Message 7", msg.body)
- msg.ok()
+ msg.complete()
for q in [queue_a, queue_b, queue_c]:
try:
@@ -150,10 +166,10 @@ class TxTests(TestBase):
channel.queue_bind(queue=name_c, exchange="amq.topic", routing_key=topic)
for i in range(1, 5):
- channel.message_transfer(routing_key=name_a, body="Message %d" % i)
+ channel.message_transfer(routing_key=name_a, message_id="msg%d" % i, body="Message %d" % i)
- channel.message_transfer(routing_key=key, destination="amq.direct", body="Message 6")
- channel.message_transfer(routing_key=topic, destination="amq.topic", body="Message 7")
+ channel.message_transfer(routing_key=key, destination="amq.direct", message_id="msg6", body="Message 6")
+ channel.message_transfer(routing_key=topic, destination="amq.topic", message_id="msg7", body="Message 7")
channel.tx_select()
@@ -164,25 +180,25 @@ class TxTests(TestBase):
msg = queue_a.get(timeout=1)
self.assertEqual("Message %d" % i, msg.body)
- msg.ok(batchoffset=-3)
+ msg.complete()
channel.message_consume(queue=name_b, destination="sub_b", no_ack=False)
queue_b = self.client.queue("sub_b")
msg = queue_b.get(timeout=1)
self.assertEqual("Message 6", msg.body)
- msg.ok()
+ msg.complete()
sub_c = channel.message_consume(queue=name_c, destination="sub_c", no_ack=False)
queue_c = self.client.queue("sub_c")
msg = queue_c.get(timeout=1)
self.assertEqual("Message 7", msg.body)
- msg.ok()
+ msg.complete()
#publish messages
for i in range(1, 5):
- channel.message_transfer(routing_key=topic, destination="amq.topic", body="TxMessage %d" % i)
+ channel.message_transfer(routing_key=topic, destination="amq.topic", message_id="tx-msg%d" % i, body="TxMessage %d" % i)
- channel.message_transfer(routing_key=key, destination="amq.direct", body="TxMessage 6")
- channel.message_transfer(routing_key=name_a, body="TxMessage 7")
+ channel.message_transfer(routing_key=key, destination="amq.direct", message_id="tx-msg6", body="TxMessage 6")
+ channel.message_transfer(routing_key=name_a, message_id="tx-msg7", body="TxMessage 7")
return queue_a, queue_b, queue_c
diff --git a/qpid/specs/amqp-transitional.0-10.xml b/qpid/specs/amqp-transitional.0-10.xml
index eb2d745c35..53912d0c2e 100644
--- a/qpid/specs/amqp-transitional.0-10.xml
+++ b/qpid/specs/amqp-transitional.0-10.xml
@@ -6184,13 +6184,6 @@
<chassis name="server" implement="MUST" />
<chassis name="client" implement="MUST" />
- <!-- TRANSITIONAL: this is included as a temporary measure to allow easier evolution from 0-9 to 0-10 -->
- <method name = "ok" index = "500" label = "old means of signalling completion">
- <chassis name = "server" implement = "MUST" />
- <chassis name = "client" implement = "MUST" />
- </method>
-
-
<!-- - Method: message.transfer - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -->
<method name="transfer" index="10" label="transfer a message">
@@ -6220,8 +6213,9 @@
<chassis name="server" implement="MUST" />
<chassis name="client" implement="MUST" />
- <response name="ok" />
+ <!-- commented out to ease transition from 0-9 to 0-10
<response name="reject" />
+ -->
<field name="ticket" domain="access-ticket" label="access ticket">
<rule name="validity" on-failure="access-refused">
@@ -6594,8 +6588,9 @@
<chassis name="server" implement="MUST" />
- <response name="ok" /><!-- added in just to ease transition -->
+ <!-- commented out to aid transition to 0-10
<response name="empty" />
+ -->
<field name="ticket" domain="access-ticket">
<rule name="ticket-required" on-failure="access-refused">