summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/Makefile.am33
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.cpp57
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.h36
-rw-r--r--cpp/src/qpid/broker/BrokerChannel.cpp2
-rw-r--r--cpp/src/qpid/broker/BrokerQueue.h4
-rw-r--r--cpp/src/qpid/broker/DtxHandlerImpl.cpp63
-rw-r--r--cpp/src/qpid/broker/DtxHandlerImpl.h22
-rw-r--r--cpp/src/qpid/broker/MessageHandlerImpl.cpp20
-rw-r--r--cpp/src/qpid/broker/MessageHandlerImpl.h26
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.cpp44
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.h5
-rw-r--r--cpp/src/qpid/client/ClientChannel.cpp22
-rw-r--r--cpp/src/qpid/client/ClientChannel.h5
-rw-r--r--cpp/src/qpid/client/Connection.h2
-rw-r--r--cpp/src/qpid/client/ExecutionHandler.cpp22
-rw-r--r--cpp/src/qpid/client/ExecutionHandler.h5
-rw-r--r--cpp/src/qpid/framing/AMQP_HighestVersion.h40
-rw-r--r--cpp/src/qpid/framing/StructHelper.h56
-rw-r--r--cpp/src/qpid/framing/amqp_types_full.h1
-rwxr-xr-xcpp/src/tests/Blobbin223166 -> 0 bytes
-rw-r--r--cpp/src/tests/Cluster_child.cpp2
21 files changed, 312 insertions, 155 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 98e38cf89a..70c43188d7 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -14,37 +14,23 @@ force:
# AMQP_XML is defined in ../configure.ac
specs=@AMQP_XML@ $(top_srcdir)/xml/cluster.xml
-EXTRA_DIST += generate.mk generate.sh $(generated_cpp) $(generated_h) $(rgen_srcs)
+EXTRA_DIST += $(rgen_h)
-if GENERATE
-
-# Java code generator.
-# Must generate into a separate gen directory because otherwise
-# there's no way to figure out which files are generated.
-
-gentools_dir=$(top_srcdir)/gentools
-$(srcdir)/generate.mk $(generated_cpp) $(generated_h): generate.timestamp
-generate.timestamp: generate.sh $(specs) $(generator)
- env gentools_dir=$(gentools_dir) specs="$(specs)" $(srcdir)/generate.sh
- touch $@
-
-# Empty rule in case a generator file is renamed/removed.
-$(generator):
+if GENERATE
maintainer-clean-local:
rm -rf gen
# Ruby generator.
rgen_dir=$(top_srcdir)/rubygen
-rgen_cmd=ruby -I $(rgen_dir) $(rgen_dir)/generate . $(specs) all $(srcdir)/rubygen.mk
+rgen_cmd=ruby -I $(rgen_dir) $(rgen_dir)/generate $(srcdir)/gen $(specs) all $(srcdir)/rubygen.mk
endif # GENERATE
-include $(srcdir)/generate.mk
include $(srcdir)/rubygen.mk
-DISTCLEANFILES=generate.mk rubygen.mk
+DISTCLEANFILES=rubygen.mk
# Code generated by C++
noinst_PROGRAMS=generate_MethodHolderMaxSize_h
@@ -110,6 +96,7 @@ libqpidcommon_la_LIBADD = \
$(LIB_CLOCK_GETTIME)
libqpidcommon_la_SOURCES = \
+ $(rgen_common_cpp) \
$(platform_src) \
qpid/framing/AMQBody.cpp \
qpid/framing/AMQMethodBody.cpp \
@@ -137,13 +124,10 @@ libqpidcommon_la_SOURCES = \
qpid/framing/FrameHandler.h \
qpid/framing/HandlerUpdater.h \
qpid/framing/Blob.h \
- qpid/framing/AMQP_ClientProxy.cpp \
- qpid/framing/AMQP_ServerProxy.cpp \
qpid/framing/variant.h \
- gen/qpid/framing/AMQP_HighestVersion.h \
+ qpid/framing/AMQP_HighestVersion.h \
qpid/framing/Blob.cpp \
qpid/framing/MethodHolder.h qpid/framing/MethodHolder.cpp \
- qpid/framing/MethodHolder_construct.cpp \
qpid/framing/MethodHolderMaxSize.h \
qpid/Exception.cpp \
qpid/Plugin.h \
@@ -217,6 +201,7 @@ libqpidbroker_la_SOURCES = \
libqpidclient_la_LIBADD = libqpidcommon.la
libqpidclient_la_SOURCES = \
+ $(rgen_client_cpp) \
qpid/client/ClientConnection.cpp \
qpid/client/ClientChannel.cpp \
qpid/client/ClientExchange.cpp \
@@ -233,12 +218,12 @@ libqpidclient_la_SOURCES = \
qpid/client/FutureResponse.cpp \
qpid/client/FutureFactory.cpp \
qpid/client/ReceivedContent.cpp \
- qpid/client/Session.cpp \
qpid/client/SessionCore.cpp \
qpid/client/StateManager.cpp
nobase_include_HEADERS = \
+ $(rgen_h) \
$(platform_hdr) \
qpid/broker/AccumulatedAck.h \
qpid/broker/BrokerChannel.h \
@@ -329,7 +314,6 @@ nobase_include_HEADERS = \
qpid/client/FutureFactory.h \
qpid/client/ReceivedContent.h \
qpid/client/Response.h \
- qpid/client/Session.h \
qpid/client/SessionCore.h \
qpid/client/StateManager.h \
qpid/framing/AMQBody.h \
@@ -357,6 +341,7 @@ nobase_include_HEADERS = \
qpid/framing/SerializeHandler.h \
qpid/framing/SequenceNumber.h \
qpid/framing/SequenceNumberSet.h \
+ qpid/framing/StructHelper.h \
qpid/framing/Value.h \
qpid/framing/Visitor.h \
qpid/framing/Uuid.h \
diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp
index 77030855ff..024516fb7b 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.cpp
+++ b/cpp/src/qpid/broker/BrokerAdapter.cpp
@@ -137,17 +137,17 @@ void BrokerAdapter::ExchangeHandlerImpl::delete_(uint16_t /*ticket*/, const stri
broker.getExchanges().destroy(name);
}
-void BrokerAdapter::ExchangeHandlerImpl::query(u_int16_t /*ticket*/, const string& name)
+ExchangeQueryResult BrokerAdapter::ExchangeHandlerImpl::query(u_int16_t /*ticket*/, const string& name)
{
try {
Exchange::shared_ptr exchange(broker.getExchanges().get(name));
- client.queryOk(exchange->getType(), exchange->isDurable(), false, exchange->getArgs());
+ return ExchangeQueryResult(exchange->getType(), exchange->isDurable(), false, exchange->getArgs());
} catch (const ChannelException& e) {
- client.queryOk("", false, true, FieldTable());
+ return ExchangeQueryResult("", false, true, FieldTable());
}
}
-void BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/,
+BindingQueryResult BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/,
const std::string& exchangeName,
const std::string& queueName,
const std::string& key,
@@ -164,24 +164,40 @@ void BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/,
}
if (!exchange) {
- client.queryOk(true, false, false, false, false);
+ return BindingQueryResult(true, false, false, false, false);
} else if (!queueName.empty() && !queue) {
- client.queryOk(false, true, false, false, false);
+ return BindingQueryResult(false, true, false, false, false);
} else if (exchange->isBound(queue, key.empty() ? 0 : &key, args.count() > 0 ? &args : &args)) {
- client.queryOk(false, false, false, false, false);
+ return BindingQueryResult(false, false, false, false, false);
} else {
//need to test each specified option individually
bool queueMatched = queueName.empty() || exchange->isBound(queue, 0, 0);
bool keyMatched = key.empty() || exchange->isBound(Queue::shared_ptr(), &key, 0);
bool argsMatched = args.count() == 0 || exchange->isBound(Queue::shared_ptr(), 0, &args);
- client.queryOk(false, false, !queueMatched, !keyMatched, !argsMatched);
+ return BindingQueryResult(false, false, !queueMatched, !keyMatched, !argsMatched);
}
}
+QueueQueryResult BrokerAdapter::QueueHandlerImpl::query(const string& name)
+{
+ Queue::shared_ptr queue = getQueue(name);
+ Exchange::shared_ptr alternateExchange = queue->getAlternateExchange();
+
+ return QueueQueryResult(queue->getName(),
+ alternateExchange ? alternateExchange->getName() : "",
+ queue->isDurable(),
+ queue->hasExclusiveOwner(),
+ queue->isAutoDelete(),
+ queue->getSettings(),
+ queue->getMessageCount(),
+ queue->getConsumerCount());
+
+}
+
void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string& name, const string& alternateExchange,
bool passive, bool durable, bool exclusive,
- bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){
+ bool autoDelete, const qpid::framing::FieldTable& arguments){
Exchange::shared_ptr alternate;
if (!alternateExchange.empty()) {
alternate = broker.getExchanges().get(alternateExchange);
@@ -223,11 +239,6 @@ void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string&
405,
format("Cannot grant exclusive access to queue '%s'")
% queue->getName());
- if (!nowait) {
- string queueName = queue->getName();
- client.declareOk(
- queueName, queue->getMessageCount(), queue->getConsumerCount());
- }
}
void BrokerAdapter::QueueHandlerImpl::bind(uint16_t /*ticket*/, const string& queueName,
@@ -269,17 +280,13 @@ BrokerAdapter::QueueHandlerImpl::unbind(uint16_t /*ticket*/,
}
-void BrokerAdapter::QueueHandlerImpl::purge(uint16_t /*ticket*/, const string& queueName, bool nowait){
-
- Queue::shared_ptr queue = getQueue(queueName);
- int count = queue->purge();
- if(!nowait) client.purgeOk( count);
+void BrokerAdapter::QueueHandlerImpl::purge(uint16_t /*ticket*/, const string& queue){
+ getQueue(queue)->purge();
}
void BrokerAdapter::QueueHandlerImpl::delete_(uint16_t /*ticket*/, const string& queue,
- bool ifUnused, bool ifEmpty, bool nowait){
+ bool ifUnused, bool ifEmpty){
ChannelException error(0, "");
- int count(0);
Queue::shared_ptr q = getQueue(queue);
if(ifEmpty && q->getMessageCount() > 0){
throw ChannelException(406, "Queue not empty.");
@@ -291,14 +298,10 @@ void BrokerAdapter::QueueHandlerImpl::delete_(uint16_t /*ticket*/, const string&
QueueVector::iterator i = find(connection.exclusiveQueues.begin(), connection.exclusiveQueues.end(), q);
if(i < connection.exclusiveQueues.end()) connection.exclusiveQueues.erase(i);
}
- count = q->getMessageCount();
q->destroy();
broker.getQueues().destroy(queue);
q->unbind(broker.getExchanges(), q);
}
-
- if(!nowait)
- client.deleteOk(count);
}
@@ -333,10 +336,8 @@ void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/,
queue->requestDispatch();
}
-void BrokerAdapter::BasicHandlerImpl::cancel(const string& consumerTag, bool nowait){
+void BrokerAdapter::BasicHandlerImpl::cancel(const string& consumerTag){
channel.cancel(consumerTag);
-
- if(!nowait) client.cancelOk(consumerTag);
}
void BrokerAdapter::BasicHandlerImpl::publish(uint16_t /*ticket*/,
diff --git a/cpp/src/qpid/broker/BrokerAdapter.h b/cpp/src/qpid/broker/BrokerAdapter.h
index 3fe2eb9eba..99b7f14525 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.h
+++ b/cpp/src/qpid/broker/BrokerAdapter.h
@@ -121,7 +121,7 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations
const qpid::framing::FieldTable& arguments);
void delete_(uint16_t ticket,
const std::string& exchange, bool ifUnused);
- void query(u_int16_t ticket, const string& name);
+ framing::ExchangeQueryResult query(u_int16_t ticket, const string& name);
private:
void checkType(Exchange::shared_ptr exchange, const std::string& type);
void checkAlternate(Exchange::shared_ptr exchange, Exchange::shared_ptr alternate);
@@ -134,11 +134,11 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations
public:
BindingHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {}
- void query(u_int16_t ticket,
- const std::string& exchange,
- const std::string& queue,
- const std::string& routingKey,
- const framing::FieldTable& arguments);
+ framing::BindingQueryResult query(u_int16_t ticket,
+ const std::string& exchange,
+ const std::string& queue,
+ const std::string& routingKey,
+ const framing::FieldTable& arguments);
};
class QueueHandlerImpl :
@@ -151,7 +151,7 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations
void declare(uint16_t ticket, const std::string& queue,
const std::string& alternateExchange,
bool passive, bool durable, bool exclusive,
- bool autoDelete, bool nowait,
+ bool autoDelete,
const qpid::framing::FieldTable& arguments);
void bind(uint16_t ticket, const std::string& queue,
const std::string& exchange, const std::string& routingKey,
@@ -161,11 +161,10 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations
const std::string& exchange,
const std::string& routingKey,
const qpid::framing::FieldTable& arguments );
- void purge(uint16_t ticket, const std::string& queue,
- bool nowait);
+ framing::QueueQueryResult query(const string& queue);
+ void purge(uint16_t ticket, const std::string& queue);
void delete_(uint16_t ticket, const std::string& queue,
- bool ifUnused, bool ifEmpty,
- bool nowait);
+ bool ifUnused, bool ifEmpty);
};
class BasicHandlerImpl :
@@ -179,18 +178,15 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations
void qos(uint32_t prefetchSize,
uint16_t prefetchCount, bool global);
- void consume(
- uint16_t ticket, const std::string& queue,
- const std::string& consumerTag, bool noLocal, bool noAck,
- bool exclusive, bool nowait,
- const qpid::framing::FieldTable& fields);
- void cancel(const std::string& consumerTag,
- bool nowait);
+ void consume(uint16_t ticket, const std::string& queue,
+ const std::string& consumerTag,
+ bool noLocal, bool noAck, bool exclusive, bool nowait,
+ const qpid::framing::FieldTable& fields);
+ void cancel(const std::string& consumerTag);
void publish(uint16_t ticket,
const std::string& exchange, const std::string& routingKey,
bool rejectUnroutable, bool immediate);
- void get(uint16_t ticket, const std::string& queue,
- bool noAck);
+ void get(uint16_t ticket, const std::string& queue, bool noAck);
void ack(uint64_t deliveryTag, bool multiple);
void reject(uint64_t deliveryTag, bool requeue);
void recover(bool requeue);
diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp
index e135e960c4..0dc4bed661 100644
--- a/cpp/src/qpid/broker/BrokerChannel.cpp
+++ b/cpp/src/qpid/broker/BrokerChannel.cpp
@@ -374,7 +374,7 @@ void Channel::ack(DeliveryId first, DeliveryId last, bool cumulative)
//just acked single element (move end past it)
++end;
}
-
+
for_each(start, end, boost::bind(&Channel::acknowledged, this, _1));
if (txBuffer.get()) {
diff --git a/cpp/src/qpid/broker/BrokerQueue.h b/cpp/src/qpid/broker/BrokerQueue.h
index 857a7adfc2..35aa954c1e 100644
--- a/cpp/src/qpid/broker/BrokerQueue.h
+++ b/cpp/src/qpid/broker/BrokerQueue.h
@@ -145,8 +145,10 @@ namespace qpid {
inline const string& getName() const { return name; }
inline const bool isExclusiveOwner(const ConnectionToken* const o) const { return o == owner; }
inline bool hasExclusiveConsumer() const { return exclusive; }
+ inline bool hasExclusiveOwner() const { return owner != 0; }
inline bool isDurable() const { return store != 0; }
-
+ inline const framing::FieldTable& getSettings() const { return settings; }
+ inline bool isAutoDelete() const { return autodelete; }
bool canAutoDelete() const;
bool enqueue(TransactionContext* ctxt, Message::shared_ptr& msg);
diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.cpp b/cpp/src/qpid/broker/DtxHandlerImpl.cpp
index 8b3629dff9..5a69ff0d65 100644
--- a/cpp/src/qpid/broker/DtxHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/DtxHandlerImpl.cpp
@@ -22,19 +22,9 @@
#include "BrokerChannel.h"
using namespace qpid::broker;
-using qpid::framing::AMQP_ClientProxy;
-using qpid::framing::Buffer;
-using qpid::framing::FieldTable;
+using namespace qpid::framing;
using std::string;
-DtxHandlerImpl::DtxHandlerImpl(CoreRefs& parent) :
- CoreRefs(parent),
- dClient(AMQP_ClientProxy::DtxDemarcation::get(proxy)),
- cClient(AMQP_ClientProxy::DtxCoordination::get(proxy))
-
-{
-}
-
const int XA_RBROLLBACK(1);
const int XA_RBTIMEOUT(2);
const int XA_HEURHAZ(3);
@@ -44,6 +34,7 @@ const int XA_HEURMIX(6);
const int XA_RDONLY(7);
const int XA_OK(8);
+DtxHandlerImpl::DtxHandlerImpl(CoreRefs& parent) : CoreRefs(parent) {}
// DtxDemarcationHandler:
@@ -53,10 +44,10 @@ void DtxHandlerImpl::select()
channel.selectDtx();
}
-void DtxHandlerImpl::end(u_int16_t /*ticket*/,
- const string& xid,
- bool fail,
- bool suspend)
+DtxDemarcationEndResult DtxHandlerImpl::end(u_int16_t /*ticket*/,
+ const string& xid,
+ bool fail,
+ bool suspend)
{
try {
if (fail) {
@@ -64,7 +55,7 @@ void DtxHandlerImpl::end(u_int16_t /*ticket*/,
if (suspend) {
throw ConnectionException(503, "End and suspend cannot both be set.");
} else {
- dClient.endOk(XA_RBROLLBACK);
+ return DtxDemarcationEndResult(XA_RBROLLBACK);
}
} else {
if (suspend) {
@@ -72,14 +63,14 @@ void DtxHandlerImpl::end(u_int16_t /*ticket*/,
} else {
channel.endDtx(xid, false);
}
- dClient.endOk(XA_OK);
+ return DtxDemarcationEndResult(XA_OK);
}
} catch (const DtxTimeoutException& e) {
- dClient.endOk(XA_RBTIMEOUT);
+ return DtxDemarcationEndResult(XA_RBTIMEOUT);
}
}
-void DtxHandlerImpl::start(u_int16_t /*ticket*/,
+DtxDemarcationStartResult DtxHandlerImpl::start(u_int16_t /*ticket*/,
const string& xid,
bool join,
bool resume)
@@ -93,52 +84,52 @@ void DtxHandlerImpl::start(u_int16_t /*ticket*/,
} else {
channel.startDtx(xid, broker.getDtxManager(), join);
}
- dClient.startOk(XA_OK);
+ return DtxDemarcationStartResult(XA_OK);
} catch (const DtxTimeoutException& e) {
- dClient.startOk(XA_RBTIMEOUT);
+ return DtxDemarcationStartResult(XA_RBTIMEOUT);
}
}
// DtxCoordinationHandler:
-void DtxHandlerImpl::prepare(u_int16_t /*ticket*/,
+DtxCoordinationPrepareResult DtxHandlerImpl::prepare(u_int16_t /*ticket*/,
const string& xid)
{
try {
bool ok = broker.getDtxManager().prepare(xid);
- cClient.prepareOk(ok ? XA_OK : XA_RBROLLBACK);
+ return DtxCoordinationPrepareResult(ok ? XA_OK : XA_RBROLLBACK);
} catch (const DtxTimeoutException& e) {
- cClient.prepareOk(XA_RBTIMEOUT);
+ return DtxCoordinationPrepareResult(XA_RBTIMEOUT);
}
}
-void DtxHandlerImpl::commit(u_int16_t /*ticket*/,
+DtxCoordinationCommitResult DtxHandlerImpl::commit(u_int16_t /*ticket*/,
const string& xid,
bool onePhase)
{
try {
bool ok = broker.getDtxManager().commit(xid, onePhase);
- cClient.commitOk(ok ? XA_OK : XA_RBROLLBACK);
+ return DtxCoordinationCommitResult(ok ? XA_OK : XA_RBROLLBACK);
} catch (const DtxTimeoutException& e) {
- cClient.commitOk(XA_RBTIMEOUT);
+ return DtxCoordinationCommitResult(XA_RBTIMEOUT);
}
}
-void DtxHandlerImpl::rollback(u_int16_t /*ticket*/,
+DtxCoordinationRollbackResult DtxHandlerImpl::rollback(u_int16_t /*ticket*/,
const string& xid )
{
try {
broker.getDtxManager().rollback(xid);
- cClient.rollbackOk(XA_OK);
+ return DtxCoordinationRollbackResult(XA_OK);
} catch (const DtxTimeoutException& e) {
- cClient.rollbackOk(XA_RBTIMEOUT);
+ return DtxCoordinationRollbackResult(XA_RBTIMEOUT);
}
}
-void DtxHandlerImpl::recover(u_int16_t /*ticket*/,
- bool /*startscan*/,
- bool /*endscan*/ )
+DtxCoordinationRecoverResult DtxHandlerImpl::recover(u_int16_t /*ticket*/,
+ bool /*startscan*/,
+ bool /*endscan*/ )
{
//TODO: what do startscan and endscan actually mean?
@@ -169,7 +160,7 @@ void DtxHandlerImpl::recover(u_int16_t /*ticket*/,
FieldTable response;
response.setString("xids", data);
- cClient.recoverOk(response);
+ return DtxCoordinationRecoverResult(response);
}
void DtxHandlerImpl::forget(u_int16_t /*ticket*/,
@@ -179,10 +170,10 @@ void DtxHandlerImpl::forget(u_int16_t /*ticket*/,
throw ConnectionException(503, boost::format("Forget is invalid. Branch with xid %1% not heuristically completed!") % xid);
}
-void DtxHandlerImpl::getTimeout(const string& xid)
+DtxCoordinationGetTimeoutResult DtxHandlerImpl::getTimeout(const string& xid)
{
uint32_t timeout = broker.getDtxManager().getTimeout(xid);
- cClient.getTimeoutOk(timeout);
+ return DtxCoordinationGetTimeoutResult(timeout);
}
diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.h b/cpp/src/qpid/broker/DtxHandlerImpl.h
index 067ba47fb5..da6379b26c 100644
--- a/cpp/src/qpid/broker/DtxHandlerImpl.h
+++ b/cpp/src/qpid/broker/DtxHandlerImpl.h
@@ -31,34 +31,34 @@ class DtxHandlerImpl
public framing::AMQP_ServerOperations::DtxCoordinationHandler,
public framing::AMQP_ServerOperations::DtxDemarcationHandler
{
- framing::AMQP_ClientProxy::DtxDemarcation dClient;
- framing::AMQP_ClientProxy::DtxCoordination cClient;
public:
DtxHandlerImpl(CoreRefs& parent);
// DtxCoordinationHandler:
- void commit(u_int16_t ticket, const std::string& xid, bool onePhase);
+ framing::DtxCoordinationCommitResult commit(u_int16_t ticket, const std::string& xid, bool onePhase);
void forget(u_int16_t ticket, const std::string& xid);
- void getTimeout(const std::string& xid);
+ framing::DtxCoordinationGetTimeoutResult getTimeout(const std::string& xid);
- void prepare(u_int16_t ticket, const std::string& xid);
+ framing::DtxCoordinationPrepareResult prepare(u_int16_t ticket, const std::string& xid);
- void recover(u_int16_t ticket, bool startscan, bool endscan);
+ framing::DtxCoordinationRecoverResult recover(u_int16_t ticket, bool startscan, bool endscan);
- void rollback(u_int16_t ticket, const std::string& xid);
+ framing::DtxCoordinationRollbackResult rollback(u_int16_t ticket, const std::string& xid);
void setTimeout(u_int16_t ticket, const std::string& xid, u_int32_t timeout);
// DtxDemarcationHandler:
-
- void end(u_int16_t ticket, const std::string& xid, bool fail, bool suspend);
-
+
+ framing::DtxDemarcationEndResult end(u_int16_t ticket, const std::string& xid, bool fail, bool suspend);
+
void select();
+
+ framing::DtxDemarcationStartResult start(u_int16_t ticket, const std::string& xid, bool join, bool resume);
+
- void start(u_int16_t ticket, const std::string& xid, bool join, bool resume);
};
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
index 3f407c11f7..ce1fa1e028 100644
--- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
@@ -87,19 +87,21 @@ MessageHandlerImpl::offset(uint64_t /*value*/ )
}
void
-MessageHandlerImpl::consume(uint16_t /*ticket*/,
+MessageHandlerImpl::subscribe(uint16_t /*ticket*/,
const string& queueName,
const string& destination,
bool noLocal,
- bool noAck,
+ u_int8_t confirmMode,
+ u_int8_t /*acquireMode*/,//TODO: implement acquire modes
bool exclusive,
const framing::FieldTable& filter )
{
Queue::shared_ptr queue = getQueue(queueName);
if(!destination.empty() && channel.exists(destination))
throw ConnectionException(530, "Consumer tags must be unique");
+
string tag = destination;
- channel.consume(MessageMessage::getToken(destination), tag, queue, noLocal, !noAck, exclusive, &filter);
+ channel.consume(MessageMessage::getToken(destination), tag, queue, noLocal, confirmMode == 1, exclusive, &filter);
// Dispatch messages as there is now a consumer.
queue->requestDispatch();
}
@@ -153,8 +155,9 @@ MessageHandlerImpl::recover(bool requeue)
}
void
-MessageHandlerImpl::reject(uint16_t /*code*/, const string& /*text*/ )
+MessageHandlerImpl::reject(const SequenceNumberSet& /*transfers*/, uint16_t /*code*/, const string& /*text*/ )
{
+ //TODO: implement
}
void
@@ -210,5 +213,14 @@ void MessageHandlerImpl::stop(const std::string& destination)
channel.stop(destination);
}
+void MessageHandlerImpl::acquire(const SequenceNumberSet& /*transfers*/, u_int8_t /*mode*/)
+{
+ throw ConnectionException(540, "Not yet implemented");
+}
+
+void MessageHandlerImpl::release(const SequenceNumberSet& /*transfers*/)
+{
+ throw ConnectionException(540, "Not yet implemented");
+}
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.h b/cpp/src/qpid/broker/MessageHandlerImpl.h
index 20cae46da4..f4d9fa0c76 100644
--- a/cpp/src/qpid/broker/MessageHandlerImpl.h
+++ b/cpp/src/qpid/broker/MessageHandlerImpl.h
@@ -49,14 +49,6 @@ class MessageHandlerImpl :
void close(const std::string& reference );
- void consume(uint16_t ticket,
- const std::string& queue,
- const std::string& destination,
- bool noLocal,
- bool noAck,
- bool exclusive,
- const framing::FieldTable& filter );
-
void empty();
void get(uint16_t ticket,
@@ -76,8 +68,9 @@ class MessageHandlerImpl :
void recover(bool requeue );
- void reject(uint16_t code,
- const std::string& text );
+ void reject(const framing::SequenceNumberSet& transfers,
+ uint16_t code,
+ const std::string& text );
void resume(const std::string& reference,
const std::string& identifier );
@@ -92,6 +85,19 @@ class MessageHandlerImpl :
void stop(const std::string& destination);
+ void acquire(const framing::SequenceNumberSet& transfers, u_int8_t mode);
+
+ void release(const framing::SequenceNumberSet& transfers);
+
+ void subscribe(u_int16_t ticket,
+ const string& queue,
+ const string& destination,
+ bool noLocal,
+ u_int8_t confirmMode,
+ u_int8_t acquireMode,
+ bool exclusive,
+ const framing::FieldTable& filter);
+
private:
ReferenceRegistry references;
};
diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp
index b7aa2aad25..f65e450e82 100644
--- a/cpp/src/qpid/broker/SemanticHandler.cpp
+++ b/cpp/src/qpid/broker/SemanticHandler.cpp
@@ -22,8 +22,10 @@
#include "SemanticHandler.h"
#include "BrokerAdapter.h"
#include "qpid/framing/ChannelAdapter.h"
-#include "qpid/framing/ExecutionCompleteBody.h"
#include "qpid/framing/ChannelCloseOkBody.h"
+#include "qpid/framing/ExecutionCompleteBody.h"
+#include "qpid/framing/ExecutionResultBody.h"
+#include "qpid/framing/InvocationVisitor.h"
using namespace qpid::broker;
using namespace qpid::framing;
@@ -66,6 +68,11 @@ void SemanticHandler::handleMethod(framing::AMQMethodBody* method)
{
try {
if (!method->invoke(this)) {
+ //temporary hack until channel management is moved to its own handler:
+ if (method->amqpClassId() != ChannelOpenBody::CLASS_ID) {
+ ++(incoming.lwm);
+ }
+
//else do the usual:
handleL4(method);
//(if the frameset is complete) we can move the execution-mark
@@ -73,7 +80,9 @@ void SemanticHandler::handleMethod(framing::AMQMethodBody* method)
//temporary hack until channel management is moved to its own handler:
if (method->amqpClassId() != ChannelOpenBody::CLASS_ID) {
- ++(incoming.hwm);
+ //TODO: need to account for async store opreations
+ //when this command is a message publication
+ ++(incoming.hwm);
}
//note: need to be more sophisticated than this if we execute
@@ -85,7 +94,7 @@ void SemanticHandler::handleMethod(framing::AMQMethodBody* method)
}
}
-void SemanticHandler::complete(uint32_t cumulative, SequenceNumberSet range)
+void SemanticHandler::complete(uint32_t cumulative, const SequenceNumberSet& range)
{
//record:
SequenceNumber mark(cumulative);
@@ -98,7 +107,7 @@ void SemanticHandler::complete(uint32_t cumulative, SequenceNumberSet range)
if (range.size() % 2) { //must be even number
throw ConnectionException(530, "Received odd number of elements in ranged mark");
} else {
- for (SequenceNumberSet::iterator i = range.begin(); i != range.end(); i++) {
+ for (SequenceNumberSet::const_iterator i = range.begin(); i != range.end(); i++) {
channel.ackRange((uint64_t) i->getValue(), (uint64_t) (++i)->getValue());
}
}
@@ -113,6 +122,25 @@ void SemanticHandler::flush()
ChannelAdapter::send(ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), SequenceNumberSet()));
}
}
+void SemanticHandler::sync()
+{
+ //for now, just treat as flush; will need to get more clever when we deal with async publication
+ flush();
+}
+
+void SemanticHandler::noop()
+{
+ //Do nothing...
+ //
+ //is this an L3 control? or is it an L4 command?
+ //if the former, of what use is it?
+ //if the latter it may contain a synch request... but its odd to have it in this class
+}
+
+void SemanticHandler::result(uint32_t /*command*/, const std::string& /*data*/)
+{
+ //never actually sent by client at present
+}
void SemanticHandler::handleL4(framing::AMQMethodBody* method)
{
@@ -124,7 +152,13 @@ void SemanticHandler::handleL4(framing::AMQMethodBody* method)
throw ConnectionException(504, out.str());
}
} else {
- method->invoke(*adapter);
+ InvocationVisitor v(adapter.get());
+ method->accept(v);
+ if (!v.wasHandled()) {
+ throw ConnectionException(540, "Not implemented");
+ } else if (v.hasResult()) {
+ ChannelAdapter::send(ExecutionResultBody(getVersion(), incoming.lwm.getValue(), v.getResult()));
+ }
}
}catch(const ChannelException& e){
adapter->getProxy().getChannel().close(
diff --git a/cpp/src/qpid/broker/SemanticHandler.h b/cpp/src/qpid/broker/SemanticHandler.h
index 6748da8500..672c6ad929 100644
--- a/cpp/src/qpid/broker/SemanticHandler.h
+++ b/cpp/src/qpid/broker/SemanticHandler.h
@@ -79,8 +79,11 @@ public:
void handle(framing::AMQFrame& frame);
//execution class method handlers:
- void complete(uint32_t cumulativeExecutionMark, framing::SequenceNumberSet range);
+ void complete(uint32_t cumulativeExecutionMark, const framing::SequenceNumberSet& range);
void flush();
+ void noop();
+ void result(uint32_t command, const std::string& data);
+ void sync();
};
}}
diff --git a/cpp/src/qpid/client/ClientChannel.cpp b/cpp/src/qpid/client/ClientChannel.cpp
index aa73e83328..d1cc4734eb 100644
--- a/cpp/src/qpid/client/ClientChannel.cpp
+++ b/cpp/src/qpid/client/ClientChannel.cpp
@@ -20,6 +20,7 @@
*/
#include "qpid/log/Statement.h"
#include <iostream>
+#include <sstream>
#include "ClientChannel.h"
#include "qpid/sys/Monitor.h"
#include "ClientMessage.h"
@@ -54,7 +55,8 @@ class ScopedSync
};
Channel::Channel(bool _transactional, u_int16_t _prefetch) :
- prefetch(_prefetch), transactional(_transactional), running(false)
+ prefetch(_prefetch), transactional(_transactional), running(false),
+ uniqueId(true)/*could eventually be the session id*/, nameCounter(0)
{
}
@@ -103,20 +105,22 @@ void Channel::deleteExchange(Exchange& exchange, bool synch){
}
void Channel::declareQueue(Queue& queue, bool synch){
+ if (queue.getName().empty()) {
+ stringstream uniqueName;
+ uniqueName << uniqueId << "-queue-" << ++nameCounter;
+ queue.setName(uniqueName.str());
+ }
+
FieldTable args;
ScopedSync s(*session, synch);
- Response r = session->queueDeclare(0, queue.getName(), empty, false/*passive*/, queue.isDurable(),
- queue.isExclusive(), queue.isAutoDelete(), !synch, args);
+ session->queueDeclare(0, queue.getName(), empty, false/*passive*/, queue.isDurable(),
+ queue.isExclusive(), queue.isAutoDelete(), args);
- if(synch) {
- if(queue.getName().length() == 0)
- queue.setName(r.as<QueueDeclareOkBody>().getQueue());
- }
}
void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch){
ScopedSync s(*session, synch);
- session->queueDelete(0, queue.getName(), ifunused, ifempty, !synch);
+ session->queueDelete(0, queue.getName(), ifunused, ifempty);
}
void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){
@@ -168,7 +172,7 @@ void Channel::cancel(const std::string& tag, bool synch) {
consumers.erase(i);
}
ScopedSync s(*session, synch);
- session->basicCancel(tag, !synch);
+ session->basicCancel(tag);
}
bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) {
diff --git a/cpp/src/qpid/client/ClientChannel.h b/cpp/src/qpid/client/ClientChannel.h
index 98e04db109..d73addc950 100644
--- a/cpp/src/qpid/client/ClientChannel.h
+++ b/cpp/src/qpid/client/ClientChannel.h
@@ -24,11 +24,12 @@
#include <memory>
#include <boost/scoped_ptr.hpp>
#include "qpid/framing/amqp_framing.h"
+#include "qpid/framing/Uuid.h"
#include "ClientExchange.h"
#include "ClientMessage.h"
#include "ClientQueue.h"
#include "ConnectionImpl.h"
-#include "Session.h"
+#include "qpid/client/Session.h"
#include "qpid/Exception.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/Runnable.h"
@@ -83,6 +84,8 @@ class Channel : private sys::Runnable
SessionCore::shared_ptr sessionCore;
framing::ChannelId channelId;
BlockingQueue<ReceivedContent::shared_ptr> gets;
+ framing::Uuid uniqueId;
+ uint32_t nameCounter;
void stop();
diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h
index e41ab363b5..e309b5c63e 100644
--- a/cpp/src/qpid/client/Connection.h
+++ b/cpp/src/qpid/client/Connection.h
@@ -26,7 +26,7 @@
#include "qpid/QpidError.h"
#include "ClientChannel.h"
#include "ConnectionImpl.h"
-#include "Session.h"
+#include "qpid/client/Session.h"
#include "qpid/framing/AMQP_HighestVersion.h"
diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp
index 6ee6429b6b..6c2600d00b 100644
--- a/cpp/src/qpid/client/ExecutionHandler.cpp
+++ b/cpp/src/qpid/client/ExecutionHandler.cpp
@@ -78,7 +78,7 @@ void ExecutionHandler::handle(AMQFrame& frame)
}
}
-void ExecutionHandler::complete(uint32_t cumulative, SequenceNumberSet range)
+void ExecutionHandler::complete(uint32_t cumulative, const SequenceNumberSet& range)
{
SequenceNumber mark(cumulative);
if (outgoing.lwm < mark) {
@@ -101,6 +101,26 @@ void ExecutionHandler::flush()
incoming.lwm = incoming.hwm;
}
+void ExecutionHandler::noop()
+{
+ //do nothing
+}
+
+void ExecutionHandler::result(uint32_t /*command*/, const std::string& /*data*/)
+{
+ //TODO: need to signal the result to the appropriate listener
+}
+
+void ExecutionHandler::sync()
+{
+ //TODO: implement (the application is in charge of completion of
+ //some commands, so need to track completion for them).
+
+ //This shouldn't ever need to be called by the server (in my
+ //opinion) as the server never needs to synchronise with the
+ //clients execution
+}
+
void ExecutionHandler::sendFlush()
{
AMQFrame frame(version, 0, ExecutionFlushBody());
diff --git a/cpp/src/qpid/client/ExecutionHandler.h b/cpp/src/qpid/client/ExecutionHandler.h
index 21613df779..b409d5df7b 100644
--- a/cpp/src/qpid/client/ExecutionHandler.h
+++ b/cpp/src/qpid/client/ExecutionHandler.h
@@ -45,8 +45,11 @@ class ExecutionHandler :
framing::ProtocolVersion version;
uint64_t maxFrameSize;
- void complete(uint32_t mark, framing::SequenceNumberSet range);
+ void complete(uint32_t mark, const framing::SequenceNumberSet& range);
void flush();
+ void noop();
+ void result(uint32_t command, const std::string& data);
+ void sync();
public:
BlockingQueue<ReceivedContent::shared_ptr> received;
diff --git a/cpp/src/qpid/framing/AMQP_HighestVersion.h b/cpp/src/qpid/framing/AMQP_HighestVersion.h
new file mode 100644
index 0000000000..42139c7937
--- /dev/null
+++ b/cpp/src/qpid/framing/AMQP_HighestVersion.h
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * This file used to be auto-generated by Qpid Gentools v.0.1
+ * its here temporarily until we get a full solution to multi-version support
+ */
+#ifndef qpid_framing_highestProtocolVersion__
+#define qpid_framing_highestProtocolVersion__
+
+#include "qpid/framing/ProtocolVersion.h"
+
+
+namespace qpid {
+namespace framing {
+
+static ProtocolVersion highestProtocolVersion(0, 10);
+
+} /* namespace framing */
+} /* namespace qpid */
+
+#endif
diff --git a/cpp/src/qpid/framing/StructHelper.h b/cpp/src/qpid/framing/StructHelper.h
new file mode 100644
index 0000000000..b5d1b1e78c
--- /dev/null
+++ b/cpp/src/qpid/framing/StructHelper.h
@@ -0,0 +1,56 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _StructHelper_
+#define _StructHelper_
+
+#include "qpid/Exception.h"
+
+namespace qpid {
+namespace framing {
+
+class StructHelper
+{
+public:
+
+ template <class T> void encode(const T t, std::string& data) {
+ uint32_t size = t.size() + 2/*type*/;
+ Buffer buffer(size);
+ buffer.putShort(T::TYPE);
+ t.encode(buffer);
+ buffer.flip();
+ buffer.getRawData(data, size);
+ }
+
+ template <class T> void decode(T t, std::string& data) {
+ Buffer buffer(data.length());
+ buffer.putRawData(data);
+ buffer.flip();
+ uint16_t type = buffer.getShort();
+ if (type == T::TYPE) {
+ t.decode(buffer);
+ } else {
+ throw Exception("Type code does not match");
+ }
+ }
+};
+
+}}
+#endif
diff --git a/cpp/src/qpid/framing/amqp_types_full.h b/cpp/src/qpid/framing/amqp_types_full.h
index 6a24a99d38..027b563caf 100644
--- a/cpp/src/qpid/framing/amqp_types_full.h
+++ b/cpp/src/qpid/framing/amqp_types_full.h
@@ -32,5 +32,6 @@
#include "amqp_types.h"
#include "FramingContent.h"
#include "FieldTable.h"
+#include "SequenceNumberSet.h"
#endif /*!_framing_amqp_types_decl_h*/
diff --git a/cpp/src/tests/Blob b/cpp/src/tests/Blob
deleted file mode 100755
index 05e67dc01f..0000000000
--- a/cpp/src/tests/Blob
+++ /dev/null
Binary files differ
diff --git a/cpp/src/tests/Cluster_child.cpp b/cpp/src/tests/Cluster_child.cpp
index 62ccb9bd72..7789ada614 100644
--- a/cpp/src/tests/Cluster_child.cpp
+++ b/cpp/src/tests/Cluster_child.cpp
@@ -32,7 +32,7 @@ using namespace qpid::log;
static const ProtocolVersion VER;
-/** Chlid part of Cluster::clusterTwo test */
+/** Child part of Cluster::clusterTwo test */
void clusterTwo() {
TestCluster cluster("clusterTwo", "amqp:child:2");
AMQFrame frame;