summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-07-23 12:29:17 +0000
committerGordon Sim <gsim@apache.org>2007-07-23 12:29:17 +0000
commit0db1af31320aa010c8e97da80000f7548d889068 (patch)
treece2cd8dba8cf46b685dcb626b31e25c17702c1a0
parent747ac26509e78ac9aa9120be02cd446ac99d21cd (diff)
downloadqpid-python-0db1af31320aa010c8e97da80000f7548d889068.tar.gz
Added initial 'execution-layer' to try out methods form the 0-10 execution class.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@558700 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/gentools/src/org/apache/qpid/gentools/CppGenerator.java52
-rw-r--r--cpp/gentools/templ.cpp/AMQP_ServerOperations.h.tmpl7
-rw-r--r--cpp/gentools/templ.cpp/MethodBodyClass.h.tmpl2
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.cpp50
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.h1
-rw-r--r--cpp/src/qpid/broker/BrokerMessage.cpp8
-rw-r--r--cpp/src/qpid/broker/BrokerMessageMessage.cpp12
-rw-r--r--cpp/src/qpid/broker/ConnectionAdapter.h1
-rw-r--r--cpp/src/qpid/broker/DtxHandlerImpl.cpp30
-rw-r--r--cpp/src/qpid/broker/InMemoryContent.cpp4
-rw-r--r--cpp/src/qpid/broker/LazyLoadedContent.cpp4
-rw-r--r--cpp/src/qpid/broker/MessageHandlerImpl.cpp27
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.cpp35
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.h16
-rw-r--r--cpp/src/qpid/client/BasicMessageChannel.cpp21
-rw-r--r--cpp/src/qpid/client/ClientChannel.cpp53
-rw-r--r--cpp/src/qpid/client/ClientChannel.h12
-rw-r--r--cpp/src/qpid/framing/AMQMethodBody.cpp4
-rw-r--r--cpp/src/qpid/framing/AMQMethodBody.h1
-rw-r--r--cpp/src/qpid/framing/ChannelAdapter.h4
-rw-r--r--cpp/src/qpid/framing/SequenceNumber.h6
-rw-r--r--cpp/src/tests/client_test.cpp6
-rw-r--r--python/qpid/client.py3
-rw-r--r--python/qpid/peer.py30
-rw-r--r--python/tests_0-9/basic.py2
-rw-r--r--specs/amqp-dtx-preview.0-9.xml34
26 files changed, 322 insertions, 103 deletions
diff --git a/cpp/gentools/src/org/apache/qpid/gentools/CppGenerator.java b/cpp/gentools/src/org/apache/qpid/gentools/CppGenerator.java
index edcf9e09e3..fd3684125c 100644
--- a/cpp/gentools/src/org/apache/qpid/gentools/CppGenerator.java
+++ b/cpp/gentools/src/org/apache/qpid/gentools/CppGenerator.java
@@ -348,6 +348,8 @@ public class CppGenerator extends Generator
return generateConstructor(thisClass, method, version, 4, 4);
if (token.equals("${mb_server_operation_invoke}"))
return generateServerOperationsInvoke(thisClass, method, version, 4, 4);
+ if (token.equals("${mb_server_operation_invoke2}"))
+ return generateServerOperationsInvoke2(thisClass, method, version, 4, 4);
if (token.equals("${mb_buffer_param}"))
return method.fieldMap.size() > 0 ? " buffer" : "/*buffer*/";
if (token.equals("${hv_latest_major}"))
@@ -719,7 +721,7 @@ public class CppGenerator extends Generator
sb.append(cr);
sb.append(indent + "// ==================== class " + handlerClassName +
" ====================" + cr);
- sb.append(indent + "class " + handlerClassName);
+ sb.append(indent + "class " + handlerClassName + " : public virtual Invocable");
if (thisClass.versionSet.size() != globalVersionSet.size())
sb.append(" // AMQP Version(s) " + thisClass.versionSet + cr);
else
@@ -1085,11 +1087,11 @@ public class CppGenerator extends Generator
String tab = Utils.createSpaces(tabSize);
String namespace = version != null ? version.namespace() + "::" : "";
StringBuffer sb = new StringBuffer();
- sb.append(indent+tab+(method.isResponse(version) ? "" : "return ")+"channel.send(new ");
+ sb.append(indent+tab+(method.isResponse(version) ? "" : "return ")+"channel.send(make_shared_ptr(new ");
sb.append(namespace + methodBodyClassName + "( channel.getVersion()");
if (method.isResponse(version)) sb.append(", responseTo");
sb.append(generateMethodParameterList(fieldMap, indentSize + (5*tabSize), true, false, true));
- sb.append("));\n");
+ sb.append(")));\n");
return sb.toString();
}
@@ -1505,6 +1507,50 @@ public class CppGenerator extends Generator
return sb.toString();
}
+ protected String generateServerOperationsInvoke2(AmqpClass thisClass, AmqpMethod method,
+ AmqpVersion version, int indentSize, int tabSize)
+ throws AmqpTypeMappingException
+ {
+ String indent = Utils.createSpaces(indentSize);
+ String tab = Utils.createSpaces(tabSize);
+ StringBuffer sb = new StringBuffer();
+ String ptrType = "AMQP_ServerOperations:: " + thisClass.name + "Handler*";
+ if (isSpecialCase(thisClass.name, method.name)) {
+ sb.append(indent + "bool invoke(Invocable*)" + cr);
+ sb.append(indent + "{" + cr);
+ sb.append(indent + tab + "return false;" + cr);
+ sb.append(indent + "}" + cr);
+ } else if (method.serverMethodFlagMap.size() > 0) { // At least one AMQP version defines this method as a server method
+
+ Iterator<Boolean> bItr = method.serverMethodFlagMap.keySet().iterator();
+ while (bItr.hasNext())
+ {
+ if (bItr.next()) // This is a server operation
+ {
+ boolean fieldMapNotEmptyFlag = method.fieldMap.size() > 0;
+ sb.append(indent + "bool invoke(Invocable* target)" + cr);
+ sb.append(indent + "{" + cr);
+ sb.append(indent + tab + ptrType + " ptr = dynamic_cast<" + ptrType + ">(target);" + cr);
+ sb.append(indent + tab + "if (ptr) {" + cr);
+ sb.append(indent + tab + tab + "ptr->");
+ sb.append(parseForReservedWords(Utils.firstLower(method.name),null) + "(" + cr);
+ if (fieldMapNotEmptyFlag)
+ {
+ sb.append(generateFieldList(method.fieldMap, version, false, false, indentSize + 4*tabSize));
+ sb.append(indent + tab + tab + tab + tab);
+ }
+ sb.append(");" + cr);
+ sb.append(indent + tab + tab + "return true;" + cr);
+ sb.append(indent + tab + "} else {" + cr);
+ sb.append(indent + tab + tab + "return false;" + cr);
+ sb.append(indent + tab + "}" + cr);
+ sb.append(indent + "}" + cr);
+ }
+ }
+ }
+ return sb.toString();
+ }
+
// Methods for generation of code snippets for amqp_methods.h/cpp files
protected String generateMethodBodyIncludeList(AmqpModel model, int indentSize)
diff --git a/cpp/gentools/templ.cpp/AMQP_ServerOperations.h.tmpl b/cpp/gentools/templ.cpp/AMQP_ServerOperations.h.tmpl
index be8dc2a6b3..06ef88c84e 100644
--- a/cpp/gentools/templ.cpp/AMQP_ServerOperations.h.tmpl
+++ b/cpp/gentools/templ.cpp/AMQP_ServerOperations.h.tmpl
@@ -36,6 +36,13 @@ namespace framing {
class MethodContext;
+class Invocable
+{
+protected:
+ Invocable() {}
+ virtual ~Invocable() {}
+};
+
class AMQP_ServerOperations
{
protected:
diff --git a/cpp/gentools/templ.cpp/MethodBodyClass.h.tmpl b/cpp/gentools/templ.cpp/MethodBodyClass.h.tmpl
index 2e1b2ed482..093a5ffe90 100644
--- a/cpp/gentools/templ.cpp/MethodBodyClass.h.tmpl
+++ b/cpp/gentools/templ.cpp/MethodBodyClass.h.tmpl
@@ -100,6 +100,8 @@ ${mb_constructor_with_initializers}
${mb_server_operation_invoke}
+${mb_server_operation_invoke2}
+
}; // class ${CLASS}${METHOD}Body
${version_namespace_end}
diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp
index be43dacb27..9bf148bcf0 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.cpp
+++ b/cpp/src/qpid/broker/BrokerAdapter.cpp
@@ -56,12 +56,12 @@ ProtocolVersion BrokerAdapter::getVersion() const {
void BrokerAdapter::ChannelHandlerImpl::open(const string& /*outOfBand*/){
channel.open();
// FIXME aconway 2007-01-04: provide valid ID as per ampq 0-9
- client.openOk(std::string()/* ID */);//GRS, context.getRequestId());
+ client.openOk(std::string()/* ID */);
}
void BrokerAdapter::ChannelHandlerImpl::flow(bool active){
channel.flow(active);
- client.flowOk(active);//GRS, context.getRequestId());
+ client.flowOk(active);
}
void BrokerAdapter::ChannelHandlerImpl::flowOk(bool /*active*/){}
@@ -70,7 +70,7 @@ void BrokerAdapter::ChannelHandlerImpl::close(uint16_t /*replyCode*/,
const string& /*replyText*/,
uint16_t /*classId*/, uint16_t /*methodId*/)
{
- client.closeOk();//GRS context.getRequestId());
+ client.closeOk();
// FIXME aconway 2007-01-18: Following line will "delete this". Ugly.
connection.closeChannel(channel.getId());
}
@@ -104,7 +104,7 @@ void BrokerAdapter::ExchangeHandlerImpl::declare(uint16_t /*ticket*/, const stri
}
}
if(!nowait){
- client.declareOk();//GRS context.getRequestId());
+ client.declareOk();
}
}
@@ -114,16 +114,16 @@ void BrokerAdapter::ExchangeHandlerImpl::delete_(uint16_t /*ticket*/,
Exchange::shared_ptr exchange(broker.getExchanges().get(name));
if (exchange->isDurable()) broker.getStore().destroy(*exchange);
broker.getExchanges().destroy(name);
- if(!nowait) client.deleteOk();//GRS context.getRequestId());
+ if(!nowait) client.deleteOk();
}
void BrokerAdapter::ExchangeHandlerImpl::query(u_int16_t /*ticket*/, const string& name)
{
try {
Exchange::shared_ptr exchange(broker.getExchanges().get(name));
- client.queryOk(exchange->getType(), exchange->isDurable(), false, exchange->getArgs());//GRS, context.getRequestId());
+ client.queryOk(exchange->getType(), exchange->isDurable(), false, exchange->getArgs());
} catch (const ChannelException& e) {
- client.queryOk("", false, true, FieldTable());//GRS, context.getRequestId());
+ client.queryOk("", false, true, FieldTable());
}
}
@@ -144,18 +144,18 @@ void BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/,
}
if (!exchange) {
- client.queryOk(true, false, false, false, false);//GRS, context.getRequestId());
+ client.queryOk(true, false, false, false, false);
} else if (!queueName.empty() && !queue) {
- client.queryOk(false, true, false, false, false);//GRS, context.getRequestId());
+ client.queryOk(false, true, false, false, false);
} else if (exchange->isBound(queue, key.empty() ? 0 : &key, args.count() > 0 ? &args : &args)) {
- client.queryOk(false, false, false, false, false);//GRS, context.getRequestId());
+ client.queryOk(false, false, false, false, false);
} else {
//need to test each specified option individually
bool queueMatched = queueName.empty() || exchange->isBound(queue, 0, 0);
bool keyMatched = key.empty() || exchange->isBound(Queue::shared_ptr(), &key, 0);
bool argsMatched = args.count() == 0 || exchange->isBound(Queue::shared_ptr(), 0, &args);
- client.queryOk(false, false, !queueMatched, !keyMatched, !argsMatched);//GRS, context.getRequestId());
+ client.queryOk(false, false, !queueMatched, !keyMatched, !argsMatched);
}
}
@@ -196,7 +196,7 @@ void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string&
if (!nowait) {
string queueName = queue->getName();
client.declareOk(
- queueName, queue->getMessageCount(), queue->getConsumerCount());//GRS, context.getRequestId());
+ queueName, queue->getMessageCount(), queue->getConsumerCount());
}
}
@@ -214,7 +214,7 @@ void BrokerAdapter::QueueHandlerImpl::bind(uint16_t /*ticket*/, const string& qu
broker.getStore().bind(*exchange, *queue, routingKey, arguments);
}
}
- if(!nowait) client.bindOk();//GRS context.getRequestId());
+ if(!nowait) client.bindOk();
}else{
throw ChannelException(
404, "Bind failed. No such exchange: " + exchangeName);
@@ -238,14 +238,14 @@ BrokerAdapter::QueueHandlerImpl::unbind(uint16_t /*ticket*/,
broker.getStore().unbind(*exchange, *queue, routingKey, arguments);
}
- client.unbindOk();//GRS context.getRequestId());
+ client.unbindOk();
}
void BrokerAdapter::QueueHandlerImpl::purge(uint16_t /*ticket*/, const string& queueName, bool nowait){
Queue::shared_ptr queue = getQueue(queueName);
int count = queue->purge();
- if(!nowait) client.purgeOk( count);//GRS, context.getRequestId());
+ if(!nowait) client.purgeOk( count);
}
void BrokerAdapter::QueueHandlerImpl::delete_(uint16_t /*ticket*/, const string& queue,
@@ -270,7 +270,7 @@ void BrokerAdapter::QueueHandlerImpl::delete_(uint16_t /*ticket*/, const string&
}
if(!nowait)
- client.deleteOk(count);//GRS, context.getRequestId());
+ client.deleteOk(count);
}
@@ -280,7 +280,7 @@ void BrokerAdapter::BasicHandlerImpl::qos(uint32_t prefetchSize, uint16_t prefet
//TODO: handle global
channel.setPrefetchSize(prefetchSize);
channel.setPrefetchCount(prefetchCount);
- client.qosOk();//GRS context.getRequestId());
+ client.qosOk();
}
void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/,
@@ -300,7 +300,7 @@ void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/,
channel.consume(std::auto_ptr<DeliveryAdapter>(new ConsumeAdapter(adapter, newTag, connection.getFrameMax())),
newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &fields);
- if(!nowait) client.consumeOk(newTag);//GRS, context.getRequestId());
+ if(!nowait) client.consumeOk(newTag);
//allow messages to be dispatched if required as there is now a consumer:
queue->requestDispatch();
@@ -309,7 +309,7 @@ void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/,
void BrokerAdapter::BasicHandlerImpl::cancel(const string& consumerTag, bool nowait){
channel.cancel(consumerTag);
- if(!nowait) client.cancelOk(consumerTag);//GRS, context.getRequestId());
+ if(!nowait) client.cancelOk(consumerTag);
}
void BrokerAdapter::BasicHandlerImpl::publish(uint16_t /*ticket*/,
@@ -333,7 +333,7 @@ void BrokerAdapter::BasicHandlerImpl::get(uint16_t /*ticket*/, const string& que
if(!channel.get(out, queue, !noAck)){
string clusterId;//not used, part of an imatix hack
- client.getEmpty(clusterId);//GRS, context.getRequestId());
+ client.getEmpty(clusterId);
}
}
@@ -351,19 +351,19 @@ void BrokerAdapter::BasicHandlerImpl::recover(bool requeue)
void BrokerAdapter::TxHandlerImpl::select()
{
channel.startTx();
- client.selectOk();//GRS context.getRequestId());
+ client.selectOk();
}
void BrokerAdapter::TxHandlerImpl::commit()
{
channel.commit();
- client.commitOk();//GRS context.getRequestId());
+ client.commitOk();
}
void BrokerAdapter::TxHandlerImpl::rollback()
{
channel.rollback();
- client.rollbackOk();//GRS context.getRequestId());
+ client.rollbackOk();
channel.recover(false);
}
@@ -378,7 +378,7 @@ void BrokerAdapter::ChannelHandlerImpl::ok()
//
void BrokerAdapter::ChannelHandlerImpl::ping()
{
- client.ok();//GRS context.getRequestId());
+ client.ok();
client.pong();
}
@@ -386,7 +386,7 @@ void BrokerAdapter::ChannelHandlerImpl::ping()
void
BrokerAdapter::ChannelHandlerImpl::pong()
{
- client.ok();//GRS context.getRequestId());
+ client.ok();
}
void BrokerAdapter::ChannelHandlerImpl::resume(const string& /*channel*/)
diff --git a/cpp/src/qpid/broker/BrokerAdapter.h b/cpp/src/qpid/broker/BrokerAdapter.h
index 41013e8bf6..a7e27a0ee6 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.h
+++ b/cpp/src/qpid/broker/BrokerAdapter.h
@@ -79,6 +79,7 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations
DtxCoordinationHandler* getDtxCoordinationHandler() { return &dtxHandler; }
DtxDemarcationHandler* getDtxDemarcationHandler() { return &dtxHandler; }
+ ExecutionHandler* getExecutionHandler() { throw ConnectionException(531, "Wrong adapter for execution layer method!"); }
ConnectionHandler* getConnectionHandler() {
throw ConnectionException(503, "Can't access connection class on non-zero channel!");
diff --git a/cpp/src/qpid/broker/BrokerMessage.cpp b/cpp/src/qpid/broker/BrokerMessage.cpp
index 21b56869d4..d192b09a63 100644
--- a/cpp/src/qpid/broker/BrokerMessage.cpp
+++ b/cpp/src/qpid/broker/BrokerMessage.cpp
@@ -78,10 +78,10 @@ void BasicMessage::deliver(ChannelAdapter& channel,
const string& consumerTag, uint64_t deliveryTag,
uint32_t framesize)
{
- channel.send(
+ channel.send(make_shared_ptr(
new BasicDeliverBody(
channel.getVersion(), consumerTag, deliveryTag,
- getRedelivered(), getExchange(), getRoutingKey()));
+ getRedelivered(), getExchange(), getRoutingKey())));
sendContent(channel, framesize);
}
@@ -92,12 +92,12 @@ void BasicMessage::sendGetOk(ChannelAdapter& channel,
uint64_t deliveryTag,
uint32_t framesize)
{
- channel.send(
+ channel.send(make_shared_ptr(
new BasicGetOkBody(
channel.getVersion(),
responseTo,
deliveryTag, getRedelivered(), getExchange(),
- getRoutingKey(), messageCount));
+ getRoutingKey(), messageCount)));
sendContent(channel, framesize);
}
diff --git a/cpp/src/qpid/broker/BrokerMessageMessage.cpp b/cpp/src/qpid/broker/BrokerMessageMessage.cpp
index b23ebaf50b..01f8250b84 100644
--- a/cpp/src/qpid/broker/BrokerMessageMessage.cpp
+++ b/cpp/src/qpid/broker/BrokerMessageMessage.cpp
@@ -85,7 +85,7 @@ void MessageMessage::transferMessage(
if (ref){
// Open
- channel.send(new MessageOpenBody(channel.getVersion(), ref->getId()));
+ channel.send(make_shared_ptr(new MessageOpenBody(channel.getVersion(), ref->getId())));
// Appends
for(Reference::Appends::const_iterator a = ref->getAppends().begin();
a != ref->getAppends().end();
@@ -98,8 +98,8 @@ void MessageMessage::transferMessage(
string::size_type contentStart = 0;
while (sizeleft) {
string::size_type contentSize = sizeleft <= framesize ? sizeleft : framesize-overhead;
- channel.send(new MessageAppendBody(channel.getVersion(), ref->getId(),
- string(content, contentStart, contentSize)));
+ channel.send(make_shared_ptr(new MessageAppendBody(channel.getVersion(), ref->getId(),
+ string(content, contentStart, contentSize))));
sizeleft -= contentSize;
contentStart += contentSize;
}
@@ -108,7 +108,7 @@ void MessageMessage::transferMessage(
// The transfer
if ( transfer->size()<=framesize ) {
- channel.send(
+ channel.send(make_shared_ptr(
new MessageTransferBody(channel.getVersion(),
transfer->getTicket(),
consumerTag,
@@ -132,7 +132,7 @@ void MessageMessage::transferMessage(
transfer->getSecurityToken(),
transfer->getApplicationHeaders(),
body,
- transfer->getMandatory()));
+ transfer->getMandatory())));
} else {
// Thing to do here is to construct a simple reference message then deliver that instead
// fragmentation will be taken care of in the delivery if necessary;
@@ -172,7 +172,7 @@ void MessageMessage::transferMessage(
}
// Close any reference data
if (ref)
- channel.send(new MessageCloseBody(channel.getVersion(), ref->getId()));
+ channel.send(make_shared_ptr(new MessageCloseBody(channel.getVersion(), ref->getId())));
}
void MessageMessage::deliver(
diff --git a/cpp/src/qpid/broker/ConnectionAdapter.h b/cpp/src/qpid/broker/ConnectionAdapter.h
index 2e27abd333..b624102cd2 100644
--- a/cpp/src/qpid/broker/ConnectionAdapter.h
+++ b/cpp/src/qpid/broker/ConnectionAdapter.h
@@ -71,6 +71,7 @@ public:
TunnelHandler* getTunnelHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); }
DtxCoordinationHandler* getDtxCoordinationHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); }
DtxDemarcationHandler* getDtxDemarcationHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); }
+ ExecutionHandler* getExecutionHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); }
framing::ProtocolVersion getVersion() const;
};
diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.cpp b/cpp/src/qpid/broker/DtxHandlerImpl.cpp
index 2aa53e66bd..d1f925e40c 100644
--- a/cpp/src/qpid/broker/DtxHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/DtxHandlerImpl.cpp
@@ -52,7 +52,7 @@ const int XA_OK(8);
void DtxHandlerImpl::select()
{
channel.selectDtx();
- dClient.selectOk();//GRS context.getRequestId());
+ dClient.selectOk();
}
void DtxHandlerImpl::end(u_int16_t /*ticket*/,
@@ -66,7 +66,7 @@ void DtxHandlerImpl::end(u_int16_t /*ticket*/,
if (suspend) {
throw ConnectionException(503, "End and suspend cannot both be set.");
} else {
- dClient.endOk(XA_RBROLLBACK);//GRS, context.getRequestId());
+ dClient.endOk(XA_RBROLLBACK);
}
} else {
if (suspend) {
@@ -74,10 +74,10 @@ void DtxHandlerImpl::end(u_int16_t /*ticket*/,
} else {
channel.endDtx(xid, false);
}
- dClient.endOk(XA_OK);//GRS, context.getRequestId());
+ dClient.endOk(XA_OK);
}
} catch (const DtxTimeoutException& e) {
- dClient.endOk(XA_RBTIMEOUT);//GRS, context.getRequestId());
+ dClient.endOk(XA_RBTIMEOUT);
}
}
@@ -95,9 +95,9 @@ void DtxHandlerImpl::start(u_int16_t /*ticket*/,
} else {
channel.startDtx(xid, broker.getDtxManager(), join);
}
- dClient.startOk(XA_OK);//GRS, context.getRequestId());
+ dClient.startOk(XA_OK);
} catch (const DtxTimeoutException& e) {
- dClient.startOk(XA_RBTIMEOUT);//GRS, context.getRequestId());
+ dClient.startOk(XA_RBTIMEOUT);
}
}
@@ -108,9 +108,9 @@ void DtxHandlerImpl::prepare(u_int16_t /*ticket*/,
{
try {
bool ok = broker.getDtxManager().prepare(xid);
- cClient.prepareOk(ok ? XA_OK : XA_RBROLLBACK);//GRS, context.getRequestId());
+ cClient.prepareOk(ok ? XA_OK : XA_RBROLLBACK);
} catch (const DtxTimeoutException& e) {
- cClient.prepareOk(XA_RBTIMEOUT);//GRS, context.getRequestId());
+ cClient.prepareOk(XA_RBTIMEOUT);
}
}
@@ -120,9 +120,9 @@ void DtxHandlerImpl::commit(u_int16_t /*ticket*/,
{
try {
bool ok = broker.getDtxManager().commit(xid, onePhase);
- cClient.commitOk(ok ? XA_OK : XA_RBROLLBACK);//GRS, context.getRequestId());
+ cClient.commitOk(ok ? XA_OK : XA_RBROLLBACK);
} catch (const DtxTimeoutException& e) {
- cClient.commitOk(XA_RBTIMEOUT);//GRS, context.getRequestId());
+ cClient.commitOk(XA_RBTIMEOUT);
}
}
@@ -132,9 +132,9 @@ void DtxHandlerImpl::rollback(u_int16_t /*ticket*/,
{
try {
broker.getDtxManager().rollback(xid);
- cClient.rollbackOk(XA_OK);//GRS, context.getRequestId());
+ cClient.rollbackOk(XA_OK);
} catch (const DtxTimeoutException& e) {
- cClient.rollbackOk(XA_RBTIMEOUT);//GRS, context.getRequestId());
+ cClient.rollbackOk(XA_RBTIMEOUT);
}
}
@@ -171,7 +171,7 @@ void DtxHandlerImpl::recover(u_int16_t /*ticket*/,
FieldTable response;
response.setString("xids", data);
- cClient.recoverOk(response);//GRS, context.getRequestId());
+ cClient.recoverOk(response);
}
void DtxHandlerImpl::forget(u_int16_t /*ticket*/,
@@ -184,7 +184,7 @@ void DtxHandlerImpl::forget(u_int16_t /*ticket*/,
void DtxHandlerImpl::getTimeout(const string& xid)
{
uint32_t timeout = broker.getDtxManager().getTimeout(xid);
- cClient.getTimeoutOk(timeout);//GRS, context.getRequestId());
+ cClient.getTimeoutOk(timeout);
}
@@ -193,7 +193,7 @@ void DtxHandlerImpl::setTimeout(u_int16_t /*ticket*/,
u_int32_t timeout)
{
broker.getDtxManager().setTimeout(xid, timeout);
- cClient.setTimeoutOk();//GRS context.getRequestId());
+ cClient.setTimeoutOk();
}
void DtxHandlerImpl::setResponseTo(framing::RequestId r)
diff --git a/cpp/src/qpid/broker/InMemoryContent.cpp b/cpp/src/qpid/broker/InMemoryContent.cpp
index 96563b0329..a6ce820f7e 100644
--- a/cpp/src/qpid/broker/InMemoryContent.cpp
+++ b/cpp/src/qpid/broker/InMemoryContent.cpp
@@ -47,13 +47,13 @@ void InMemoryContent::send(ChannelAdapter& channel, uint32_t framesize)
uint32_t offset = 0;
for (int chunk = (*i)->size() / framesize; chunk > 0; chunk--) {
string data = (*i)->getData().substr(offset, framesize);
- channel.send(new AMQContentBody(data));
+ channel.send(make_shared_ptr(new AMQContentBody(data)));
offset += framesize;
}
uint32_t remainder = (*i)->size() % framesize;
if (remainder) {
string data = (*i)->getData().substr(offset, remainder);
- channel.send(new AMQContentBody(data));
+ channel.send(make_shared_ptr(new AMQContentBody(data)));
}
} else {
AMQBody::shared_ptr contentBody =
diff --git a/cpp/src/qpid/broker/LazyLoadedContent.cpp b/cpp/src/qpid/broker/LazyLoadedContent.cpp
index c11d049317..80d06ebf2b 100644
--- a/cpp/src/qpid/broker/LazyLoadedContent.cpp
+++ b/cpp/src/qpid/broker/LazyLoadedContent.cpp
@@ -52,12 +52,12 @@ void LazyLoadedContent::send(ChannelAdapter& channel, uint32_t framesize)
string data;
store->loadContent(*msg, data, offset,
remaining > framesize ? framesize : remaining);
- channel.send(new AMQContentBody(data));
+ channel.send(make_shared_ptr(new AMQContentBody(data)));
}
} else {
string data;
store->loadContent(*msg, data, 0, expectedSize);
- channel.send(new AMQContentBody(data));
+ channel.send(make_shared_ptr(new AMQContentBody(data)));
}
}
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
index c9fbc2b95d..de32368158 100644
--- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
@@ -45,14 +45,14 @@ void
MessageHandlerImpl::cancel(const string& destination )
{
channel.cancel(destination);
- client.ok();//GRS context.getRequestId());
+ client.ok();
}
void
MessageHandlerImpl::open(const string& reference)
{
references.open(reference);
- client.ok();//GRS context.getRequestId());
+ client.ok();
}
void
@@ -60,14 +60,14 @@ MessageHandlerImpl::append(const framing::MethodContext& context)
{
MessageAppendBody::shared_ptr body(boost::shared_polymorphic_downcast<MessageAppendBody>(context.methodBody));
references.get(body->getReference())->append(body);
- client.ok();//GRS context.getRequestId());
+ client.ok();
}
void
MessageHandlerImpl::close(const string& reference)
{
Reference::shared_ptr ref = references.get(reference);
- client.ok();//GRS context.getRequestId());
+ client.ok();
// Send any transfer messages to their correct exchanges and okay them
const Reference::Messages& msgs = ref->getMessages();
@@ -85,7 +85,7 @@ MessageHandlerImpl::checkpoint(const string& /*reference*/,
{
// Initial implementation (which is conforming) is to do nothing here
// and return offset zero for the resume
- client.ok();//GRS context.getRequestId());
+ client.ok();
}
void
@@ -95,7 +95,7 @@ MessageHandlerImpl::resume(const string& reference,
// Initial (null) implementation
// open reference and return 0 offset
references.open(reference);
- client.offset(0);//GRS, );//GRS, context.getRequestId());
+ client.offset(0);
}
void
@@ -123,7 +123,7 @@ MessageHandlerImpl::consume(uint16_t /*ticket*/,
channel.consume(std::auto_ptr<DeliveryAdapter>(new ConsumeAdapter(adapter, destination, connection.getFrameMax())),
tag, queue, !noAck, exclusive,
noLocal ? &connection : 0, &filter);
- client.ok();//GRS context.getRequestId());
+ client.ok();
// Dispatch messages as there is now a consumer.
queue->requestDispatch();
}
@@ -138,9 +138,9 @@ MessageHandlerImpl::get(uint16_t /*ticket*/,
GetAdapter out(adapter, queue, destination, connection.getFrameMax());
if(channel.get(out, queue, !noAck))
- client.ok();//GRS context.getRequestId());
+ client.ok();
else
- client.empty();//GRS context.getRequestId());
+ client.empty();
}
void
@@ -166,22 +166,19 @@ MessageHandlerImpl::qos(uint32_t prefetchSize,
//TODO: handle global
channel.setPrefetchSize(prefetchSize);
channel.setPrefetchCount(prefetchCount);
- client.ok();//GRS context.getRequestId());
+ client.ok();
}
void
MessageHandlerImpl::recover(bool requeue)
{
channel.recover(requeue);
- client.ok();//GRS context.getRequestId());
+ client.ok();
}
void
-MessageHandlerImpl::reject(uint16_t /*code*/,
- const string& /*text*/ )
+MessageHandlerImpl::reject(uint16_t /*code*/, const string& /*text*/ )
{
- //channel.ack();
- // channel.requeue();
}
void
diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp
index a96a8c5cde..f616ec2db8 100644
--- a/cpp/src/qpid/broker/SemanticHandler.cpp
+++ b/cpp/src/qpid/broker/SemanticHandler.cpp
@@ -36,7 +36,7 @@ SemanticHandler::SemanticHandler(ChannelId id, Connection& c) :
void SemanticHandler::handle(framing::AMQFrame& frame)
-{
+{
//TODO: assembly etc when move to 0-10 framing
//
//have potentially three separate tracks at this point:
@@ -56,16 +56,41 @@ void SemanticHandler::handle(framing::AMQFrame& frame)
//if ready to execute (i.e. if segment is complete or frame is
//message content):
handleBody(frame.getBody());
- //if the frameset is complete, we can move the execution-mark
- //forward (not for execution controls)
- //note: need to be more sophisticated than this if we execute
- //commands that arrive within an active message frameset
}
//ChannelAdapter virtual methods:
void SemanticHandler::handleMethodInContext(boost::shared_ptr<qpid::framing::AMQMethodBody> method,
const qpid::framing::MethodContext& context)
{
+ if (!method->invoke(this)) {
+ //else do the usual:
+ handleL4(method, context);
+ //(if the frameset is complete) we can move the execution-mark
+ //forward
+ ++(incoming.hwm);
+
+ //note: need to be more sophisticated than this if we execute
+ //commands that arrive within an active message frameset (that
+ //can't happen until 0-10 framing is implemented)
+ }
+}
+
+void SemanticHandler::complete(u_int32_t mark)
+{
+ //just record it for now (will eventually need to use it to ack messages):
+ outgoing.lwm = SequenceNumber(mark);
+}
+
+void SemanticHandler::flush()
+{
+ //flush doubles as a sync to begin with - send an execution.complete
+ incoming.lwm = incoming.hwm;
+ send(make_shared_ptr(new ExecutionCompleteBody(getVersion(), incoming.hwm.getValue())));
+}
+
+void SemanticHandler::handleL4(boost::shared_ptr<qpid::framing::AMQMethodBody> method,
+ const qpid::framing::MethodContext& context)
+{
try{
if(getId() != 0 && !method->isA<ChannelOpenBody>() && !isOpen()) {
if (!method->isA<ChannelCloseOkBody>()) {
diff --git a/cpp/src/qpid/broker/SemanticHandler.h b/cpp/src/qpid/broker/SemanticHandler.h
index eecf61466b..6003bbec0c 100644
--- a/cpp/src/qpid/broker/SemanticHandler.h
+++ b/cpp/src/qpid/broker/SemanticHandler.h
@@ -25,6 +25,7 @@
#include "BrokerChannel.h"
#include "Connection.h"
#include "qpid/framing/amqp_types.h"
+#include "qpid/framing/AMQP_ServerOperations.h"
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/SequenceNumber.h"
@@ -34,11 +35,18 @@ namespace broker {
class BrokerAdapter;
class framing::ChannelAdapter;
-class SemanticHandler : private framing::ChannelAdapter, public framing::FrameHandler {
+class SemanticHandler : private framing::ChannelAdapter,
+ public framing::FrameHandler,
+ public framing::AMQP_ServerOperations::ExecutionHandler
+{
Connection& connection;
Channel channel;
std::auto_ptr<BrokerAdapter> adapter;
- framing::SequenceNumber executionMark;
+ framing::Window incoming;
+ framing::Window outgoing;
+
+ void handleL4(boost::shared_ptr<qpid::framing::AMQMethodBody> method,
+ const qpid::framing::MethodContext& context);
//ChannelAdapter virtual methods:
void handleMethodInContext(boost::shared_ptr<qpid::framing::AMQMethodBody> method,
@@ -50,6 +58,10 @@ class SemanticHandler : private framing::ChannelAdapter, public framing::FrameHa
public:
SemanticHandler(framing::ChannelId id, Connection& c);
void handle(framing::AMQFrame& frame);
+
+ //execution class method handlers:
+ void complete(u_int32_t cumulativeExecutionMark);
+ void flush();
};
}}
diff --git a/cpp/src/qpid/client/BasicMessageChannel.cpp b/cpp/src/qpid/client/BasicMessageChannel.cpp
index 60368268c0..a1aacdee4e 100644
--- a/cpp/src/qpid/client/BasicMessageChannel.cpp
+++ b/cpp/src/qpid/client/BasicMessageChannel.cpp
@@ -101,7 +101,7 @@ void BasicMessageChannel::cancel(const std::string& tag, bool synch) {
consumers.erase(i);
}
if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) {
- channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true));
+ channel.send(make_shared_ptr(new BasicAckBody(channel.version, c.lastDeliveryTag, true)));
}
channel.sendAndReceiveSync<BasicCancelOkBody>(
synch, make_shared_ptr(new BasicCancelBody(channel.version, tag, !synch)));
@@ -119,9 +119,9 @@ void BasicMessageChannel::cancelAll(){
Consumer& c = i->second;
if (c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0)
{
- channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true));
+ channel.send(make_shared_ptr(new BasicAckBody(channel.version, c.lastDeliveryTag, true)));
}
- channel.send(new BasicCancelBody(channel.version, i->first, true));
+ channel.send(make_shared_ptr(new BasicCancelBody(channel.version, i->first, true)));
}
consumers.clear();
}
@@ -131,8 +131,7 @@ bool BasicMessageChannel::get(
{
// Prepare for incoming response
incoming.addDestination(BASIC_GET, destGet);
- channel.send(
- new BasicGetBody(channel.version, 0, queue.getName(), ackMode));
+ channel.send(make_shared_ptr(new BasicGetBody(channel.version, 0, queue.getName(), ackMode)));
bool got = destGet.wait(msg);
return got;
}
@@ -150,9 +149,7 @@ void BasicMessageChannel::publish(
*static_cast<BasicHeaderProperties*>(header->getProperties()), msg);
header->setContentSize(msg.getData().size());
- channel.send(
- new BasicPublishBody(
- channel.version, 0, e, key, mandatory, immediate));
+ channel.send(make_shared_ptr(new BasicPublishBody(channel.version, 0, e, key, mandatory, immediate)));
channel.send(header);
string data = msg.getData();
u_int64_t data_length = data.length();
@@ -160,14 +157,14 @@ void BasicMessageChannel::publish(
//frame itself uses 8 bytes
u_int32_t frag_size = channel.connection->getMaxFrameSize() - 8;
if(data_length < frag_size){
- channel.send(new AMQContentBody(data));
+ channel.send(make_shared_ptr(new AMQContentBody(data)));
}else{
u_int32_t offset = 0;
u_int32_t remaining = data_length - offset;
while (remaining > 0) {
u_int32_t length = remaining > frag_size ? frag_size : remaining;
string frag(data.substr(offset, length));
- channel.send(new AMQContentBody(frag));
+ channel.send(make_shared_ptr(new AMQContentBody(frag)));
offset += length;
remaining = data_length - offset;
@@ -268,11 +265,11 @@ void BasicMessageChannel::deliver(Consumer& consumer, Message& msg){
//else drop-through
case AUTO_ACK:
consumer.lastDeliveryTag = 0;
- channel.send(
+ channel.send(make_shared_ptr(
new BasicAckBody(
channel.version,
msg.getDeliveryTag(),
- multiple));
+ multiple)));
case NO_ACK: // Nothing to do
case CLIENT_ACK: // User code must ack.
break;
diff --git a/cpp/src/qpid/client/ClientChannel.cpp b/cpp/src/qpid/client/ClientChannel.cpp
index ab6b9a41c3..816ff05e85 100644
--- a/cpp/src/qpid/client/ClientChannel.cpp
+++ b/cpp/src/qpid/client/ClientChannel.cpp
@@ -92,10 +92,10 @@ void Channel::protocolInit(
connection->send(new AMQFrame(0, new ConnectionSecureOkBody(response)));
**/
- send(new ConnectionTuneOkBody(
+ sendCommand(make_shared_ptr(new ConnectionTuneOkBody(
version, proposal->getRequestId(),
proposal->getChannelMax(), connection->getMaxFrameSize(),
- proposal->getHeartbeat()));
+ proposal->getHeartbeat())));
uint16_t heartbeat = proposal->getHeartbeat();
connection->connector->setReadTimeout(heartbeat * 2);
@@ -104,7 +104,7 @@ void Channel::protocolInit(
// Send connection open.
std::string capabilities;
responses.expect();
- send(new ConnectionOpenBody(version, vhost, capabilities, true));
+ sendCommand(make_shared_ptr(new ConnectionOpenBody(version, vhost, capabilities, true)));
//receive connection.open-ok (or redirect, but ignore that for now
//esp. as using force=true).
AMQMethodBody::shared_ptr openResponse = responses.receive();
@@ -210,6 +210,7 @@ AMQMethodBody::shared_ptr method, const MethodContext& ctxt)
case BasicGetOkBody::CLASS_ID: messaging->handle(method); break;
case ChannelCloseBody::CLASS_ID: handleChannel(method, ctxt); break;
case ConnectionCloseBody::CLASS_ID: handleConnection(method); break;
+ case ExecutionCompleteBody::CLASS_ID: handleExecution(method); break;
default: throw UnknownMethod();
}
}
@@ -223,7 +224,7 @@ AMQMethodBody::shared_ptr method, const MethodContext& ctxt)
void Channel::handleChannel(AMQMethodBody::shared_ptr method, const MethodContext& ctxt) {
switch (method->amqpMethodId()) {
case ChannelCloseBody::METHOD_ID:
- send(new ChannelCloseOkBody(version, ctxt.getRequestId()));
+ sendCommand(make_shared_ptr(new ChannelCloseOkBody(version, ctxt.getRequestId())));
peerClose(shared_polymorphic_downcast<ChannelCloseBody>(method));
return;
case ChannelFlowBody::METHOD_ID:
@@ -241,6 +242,18 @@ void Channel::handleConnection(AMQMethodBody::shared_ptr method) {
throw UnknownMethod();
}
+void Channel::handleExecution(AMQMethodBody::shared_ptr method) {
+ if (method->amqpMethodId() == ExecutionCompleteBody::METHOD_ID) {
+ Monitor::ScopedLock l(outgoingMonitor);
+ //record the completion mark:
+ outgoing.lwm = shared_polymorphic_downcast<ExecutionCompleteBody>(method)->getCumulativeExecutionMark();
+ //TODO: notify anyone waiting for completion notification:
+ outgoingMonitor.notifyAll();
+ } else{
+ throw UnknownMethod();
+ }
+}
+
void Channel::handleHeader(AMQHeaderBody::shared_ptr body){
messaging->handle(body);
}
@@ -315,7 +328,7 @@ AMQMethodBody::shared_ptr Channel::sendAndReceive(
AMQMethodBody::shared_ptr toSend, ClassId c, MethodId m)
{
responses.expect();
- send(toSend);
+ sendCommand(toSend);
return responses.receive(c, m);
}
@@ -325,7 +338,7 @@ AMQMethodBody::shared_ptr Channel::sendAndReceiveSync(
if(sync)
return sendAndReceive(body, c, m);
else {
- send(body);
+ sendCommand(body);
return AMQMethodBody::shared_ptr();
}
}
@@ -362,3 +375,31 @@ void Channel::run() {
messaging->run();
}
+void Channel::sendCommand(AMQBody::shared_ptr body)
+{
+ ++(outgoing.hwm);
+ send(body);
+}
+
+bool Channel::waitForCompletion(SequenceNumber poi, Duration timeout)
+{
+ AbsTime end;
+ if (timeout == 0) {
+ end = AbsTime::FarFuture();
+ } else {
+ end = AbsTime(AbsTime::now(), timeout);
+ }
+
+ Monitor::ScopedLock l(outgoingMonitor);
+ while (end > AbsTime::now() && outgoing.lwm < poi) {
+ outgoingMonitor.wait(end);
+ }
+ return !(outgoing.lwm < poi);
+}
+
+bool Channel::synchWithServer(Duration timeout)
+{
+ send(make_shared_ptr(new ExecutionFlushBody(version)));
+ return waitForCompletion(outgoing.hwm, timeout);
+}
+
diff --git a/cpp/src/qpid/client/ClientChannel.h b/cpp/src/qpid/client/ClientChannel.h
index cea1245e6a..fc82fb41ff 100644
--- a/cpp/src/qpid/client/ClientChannel.h
+++ b/cpp/src/qpid/client/ClientChannel.h
@@ -29,6 +29,7 @@
#include "ResponseHandler.h"
#include "qpid/Exception.h"
#include "qpid/framing/ChannelAdapter.h"
+#include "qpid/framing/SequenceNumber.h"
#include "qpid/sys/Thread.h"
#include "AckMode.h"
@@ -64,6 +65,8 @@ class Channel : public framing::ChannelAdapter
Connection* connection;
sys::Thread dispatcher;
ResponseHandler responses;
+ sys::Monitor outgoingMonitor;
+ framing::Window outgoing;
uint16_t prefetch;
const bool transactional;
@@ -84,6 +87,7 @@ class Channel : public framing::ChannelAdapter
framing::AMQMethodBody::shared_ptr, const framing::MethodContext&);
void handleChannel(framing::AMQMethodBody::shared_ptr method, const framing::MethodContext& ctxt);
void handleConnection(framing::AMQMethodBody::shared_ptr method);
+ void handleExecution(framing::AMQMethodBody::shared_ptr method);
void setQos();
@@ -114,9 +118,12 @@ class Channel : public framing::ChannelAdapter
sync, body, BodyType::CLASS_ID, BodyType::METHOD_ID));
}
+ void sendCommand(framing::AMQBody::shared_ptr body);
+
void open(framing::ChannelId, Connection&);
void closeInternal();
void peerClose(boost::shared_ptr<framing::ChannelCloseBody>);
+ bool waitForCompletion(framing::SequenceNumber, sys::Duration);
// FIXME aconway 2007-02-23: Get rid of friendships.
friend class Connection;
@@ -358,7 +365,10 @@ class Channel : public framing::ChannelAdapter
*/
void run();
-
+ /**
+ * TESTING ONLY FOR NOW!
+ */
+ bool synchWithServer(sys::Duration timeout = 0);
};
}}
diff --git a/cpp/src/qpid/framing/AMQMethodBody.cpp b/cpp/src/qpid/framing/AMQMethodBody.cpp
index a7df0578c2..04941eaa58 100644
--- a/cpp/src/qpid/framing/AMQMethodBody.cpp
+++ b/cpp/src/qpid/framing/AMQMethodBody.cpp
@@ -36,6 +36,10 @@ void AMQMethodBody::invoke(AMQP_ServerOperations&, const MethodContext&){
THROW_QPID_ERROR(PROTOCOL_ERROR, "Method not supported by AMQP Server.");
}
+bool AMQMethodBody::invoke(Invocable*) {
+ return false;
+}
+
AMQMethodBody::shared_ptr AMQMethodBody::create(
AMQP_MethodVersionMap& versionMap, ProtocolVersion version,
Buffer& buffer)
diff --git a/cpp/src/qpid/framing/AMQMethodBody.h b/cpp/src/qpid/framing/AMQMethodBody.h
index 306c26cd27..55cf5cb864 100644
--- a/cpp/src/qpid/framing/AMQMethodBody.h
+++ b/cpp/src/qpid/framing/AMQMethodBody.h
@@ -52,6 +52,7 @@ class AMQMethodBody : public AMQBody
virtual ClassId amqpClassId() const = 0;
virtual void invoke(AMQP_ServerOperations&, const MethodContext&);
+ virtual bool invoke(Invocable* target);
template <class T> bool isA() {
return amqpClassId()==T::CLASS_ID && amqpMethodId()==T::METHOD_ID;
diff --git a/cpp/src/qpid/framing/ChannelAdapter.h b/cpp/src/qpid/framing/ChannelAdapter.h
index 1c3f29d762..50b1c9ff7e 100644
--- a/cpp/src/qpid/framing/ChannelAdapter.h
+++ b/cpp/src/qpid/framing/ChannelAdapter.h
@@ -78,10 +78,6 @@ class ChannelAdapter : protected BodyHandler {
RequestId send(shared_ptr<AMQBody> body,
Correlator::Action action=Correlator::Action());
- // TODO aconway 2007-04-05: remove and use make_shared_ptr at call sites.
- /**@deprecated Use make_shared_ptr with the other send() override */
- RequestId send(AMQBody* body) { return send(AMQBody::shared_ptr(body)); }
-
virtual bool isOpen() const = 0;
RequestId getFirstAckRequest() { return requester.getFirstAckRequest(); }
diff --git a/cpp/src/qpid/framing/SequenceNumber.h b/cpp/src/qpid/framing/SequenceNumber.h
index bf9d133cef..3e0dfea2af 100644
--- a/cpp/src/qpid/framing/SequenceNumber.h
+++ b/cpp/src/qpid/framing/SequenceNumber.h
@@ -48,6 +48,12 @@ class SequenceNumber
friend int32_t operator-(const SequenceNumber& a, const SequenceNumber& b);
};
+struct Window
+{
+ SequenceNumber hwm;
+ SequenceNumber lwm;
+};
+
}} // namespace qpid::framing
diff --git a/cpp/src/tests/client_test.cpp b/cpp/src/tests/client_test.cpp
index 3b8a3a2ee6..cefc4338eb 100644
--- a/cpp/src/tests/client_test.cpp
+++ b/cpp/src/tests/client_test.cpp
@@ -35,6 +35,7 @@
#include "qpid/client/ClientMessage.h"
#include "qpid/client/MessageListener.h"
#include "qpid/sys/Monitor.h"
+#include "qpid/sys/Time.h"
using namespace qpid::client;
using namespace qpid::sys;
@@ -117,6 +118,11 @@ int main(int argc, char** argv)
msg.setData(data);
channel.publish(msg, exchange, "MyTopic");
if (opts.trace) std::cout << "Published message: " << data << std::endl;
+ if (opts.trace) {
+ std::cout << "Publication "
+ << (channel.synchWithServer(qpid::sys::TIME_SEC * 1) ? " DID " : " did NOT ")
+ << "complete" << std::endl;
+ }
{
Monitor::ScopedLock l(monitor);
diff --git a/python/qpid/client.py b/python/qpid/client.py
index cdceb87bdf..f1800204db 100644
--- a/python/qpid/client.py
+++ b/python/qpid/client.py
@@ -140,6 +140,9 @@ class ClientDelegate(Delegate):
def connection_close(self, ch, msg):
self.client.peer.close(msg)
+ def execution_complete(self, ch, msg):
+ ch.completion.complete(msg.cumulative_execution_mark)
+
def close(self, reason):
self.client.closed = True
self.client.reason = reason
diff --git a/python/qpid/peer.py b/python/qpid/peer.py
index 72e6a19bc7..9880eea19b 100644
--- a/python/qpid/peer.py
+++ b/python/qpid/peer.py
@@ -186,6 +186,8 @@ class Channel:
self.requester = Requester(self.write)
self.responder = Responder(self.write)
+ self.completion = ExecutionCompletion()
+
# Use reliable framing if version == 0-9.
self.reliable = (spec.major == 0 and spec.minor == 9)
self.synchronous = True
@@ -247,6 +249,7 @@ class Channel:
self.responder.respond(method, batch, request)
def invoke(self, type, args, kwargs):
+ self.completion.next_command(type)
content = kwargs.pop("content", None)
frame = Method(type, type.arguments(*args, **kwargs))
if self.reliable:
@@ -337,3 +340,30 @@ class Future:
def is_complete(self):
return self.completed.isSet()
+
+class ExecutionCompletion:
+ def __init__(self):
+ self.completed = threading.Event()
+ self.sequence = Sequence(0)
+ self.command_id = 0
+ self.mark = 0
+
+ def next_command(self, method):
+ #the following test is a hack until the track/sub-channel is available
+ if method.klass.name != "execution":
+ self.command_id = self.sequence.next()
+
+ def complete(self, mark):
+ self.mark = mark
+ self.completed.set()
+ self.completed.clear()
+
+ def wait(self, point_of_interest=-1, timeout=None):
+ """
+ todo: really want to allow different threads to call this with
+ different points of interest on the same channel, this is a quick
+ hack for now
+ """
+ if point_of_interest == -1: point_of_interest = self.command_id
+ self.completed.wait(timeout)
+ return point_of_interest <= self.mark
diff --git a/python/tests_0-9/basic.py b/python/tests_0-9/basic.py
index 75633908cd..e7d22e00da 100644
--- a/python/tests_0-9/basic.py
+++ b/python/tests_0-9/basic.py
@@ -127,7 +127,7 @@ class BasicTests(TestBase):
channel.basic_publish(routing_key="test-queue-4", content=Content("One"))
myqueue = self.client.queue("my-consumer")
- msg = myqueue.get(timeout=5)
+ msg = myqueue.get(timeout=1)
self.assertEqual("One", msg.content.body)
#cancel should stop messages being delivered
diff --git a/specs/amqp-dtx-preview.0-9.xml b/specs/amqp-dtx-preview.0-9.xml
index defbdd067e..dd70e91d1d 100644
--- a/specs/amqp-dtx-preview.0-9.xml
+++ b/specs/amqp-dtx-preview.0-9.xml
@@ -1040,4 +1040,38 @@
</method>
</class>
+ <class name="execution" handler="execution" index="140">
+ <doc>
+ This class allows for efficiently communicating information
+ about completion of processing.
+ </doc>
+
+ <chassis name="server" implement="MUST"/>
+ <chassis name="client" implement="MUST"/>
+
+ <method name="flush" index="10" label="request an execution.complete return method">
+ <chassis name="server" implement="MUST"/>
+ <chassis name="client" implement="MUST"/>
+ </method>
+
+ <method name="complete" index="20">
+ <chassis name="server" implement="MUST"/>
+ <chassis name="client" implement="MUST"/>
+
+
+ <field name="cumulative-execution-mark" domain="long" label="Low-water mark for command ids">
+ <doc>
+ The low-water mark for executed command-ids. All ids below this mark have been executed;
+ above this mark, there are gaps containing unexecuted command ids (i.e. discontinuous). By
+ definition, the first id above this mark (if it exists) is an unexecuted command-id.
+ </doc>
+ </field>
+
+
+ <!-- The ranged mark on the complete method has been temporarily removed -->
+ </method>
+
+ </class>
+
+
</amqp>