diff options
Diffstat (limited to 'cpp/src')
21 files changed, 312 insertions, 155 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 98e38cf89a..70c43188d7 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -14,37 +14,23 @@ force: # AMQP_XML is defined in ../configure.ac specs=@AMQP_XML@ $(top_srcdir)/xml/cluster.xml -EXTRA_DIST += generate.mk generate.sh $(generated_cpp) $(generated_h) $(rgen_srcs) +EXTRA_DIST += $(rgen_h) -if GENERATE - -# Java code generator. -# Must generate into a separate gen directory because otherwise -# there's no way to figure out which files are generated. - -gentools_dir=$(top_srcdir)/gentools -$(srcdir)/generate.mk $(generated_cpp) $(generated_h): generate.timestamp -generate.timestamp: generate.sh $(specs) $(generator) - env gentools_dir=$(gentools_dir) specs="$(specs)" $(srcdir)/generate.sh - touch $@ - -# Empty rule in case a generator file is renamed/removed. -$(generator): +if GENERATE maintainer-clean-local: rm -rf gen # Ruby generator. rgen_dir=$(top_srcdir)/rubygen -rgen_cmd=ruby -I $(rgen_dir) $(rgen_dir)/generate . $(specs) all $(srcdir)/rubygen.mk +rgen_cmd=ruby -I $(rgen_dir) $(rgen_dir)/generate $(srcdir)/gen $(specs) all $(srcdir)/rubygen.mk endif # GENERATE -include $(srcdir)/generate.mk include $(srcdir)/rubygen.mk -DISTCLEANFILES=generate.mk rubygen.mk +DISTCLEANFILES=rubygen.mk # Code generated by C++ noinst_PROGRAMS=generate_MethodHolderMaxSize_h @@ -110,6 +96,7 @@ libqpidcommon_la_LIBADD = \ $(LIB_CLOCK_GETTIME) libqpidcommon_la_SOURCES = \ + $(rgen_common_cpp) \ $(platform_src) \ qpid/framing/AMQBody.cpp \ qpid/framing/AMQMethodBody.cpp \ @@ -137,13 +124,10 @@ libqpidcommon_la_SOURCES = \ qpid/framing/FrameHandler.h \ qpid/framing/HandlerUpdater.h \ qpid/framing/Blob.h \ - qpid/framing/AMQP_ClientProxy.cpp \ - qpid/framing/AMQP_ServerProxy.cpp \ qpid/framing/variant.h \ - gen/qpid/framing/AMQP_HighestVersion.h \ + qpid/framing/AMQP_HighestVersion.h \ qpid/framing/Blob.cpp \ qpid/framing/MethodHolder.h qpid/framing/MethodHolder.cpp \ - qpid/framing/MethodHolder_construct.cpp \ qpid/framing/MethodHolderMaxSize.h \ qpid/Exception.cpp \ qpid/Plugin.h \ @@ -217,6 +201,7 @@ libqpidbroker_la_SOURCES = \ libqpidclient_la_LIBADD = libqpidcommon.la libqpidclient_la_SOURCES = \ + $(rgen_client_cpp) \ qpid/client/ClientConnection.cpp \ qpid/client/ClientChannel.cpp \ qpid/client/ClientExchange.cpp \ @@ -233,12 +218,12 @@ libqpidclient_la_SOURCES = \ qpid/client/FutureResponse.cpp \ qpid/client/FutureFactory.cpp \ qpid/client/ReceivedContent.cpp \ - qpid/client/Session.cpp \ qpid/client/SessionCore.cpp \ qpid/client/StateManager.cpp nobase_include_HEADERS = \ + $(rgen_h) \ $(platform_hdr) \ qpid/broker/AccumulatedAck.h \ qpid/broker/BrokerChannel.h \ @@ -329,7 +314,6 @@ nobase_include_HEADERS = \ qpid/client/FutureFactory.h \ qpid/client/ReceivedContent.h \ qpid/client/Response.h \ - qpid/client/Session.h \ qpid/client/SessionCore.h \ qpid/client/StateManager.h \ qpid/framing/AMQBody.h \ @@ -357,6 +341,7 @@ nobase_include_HEADERS = \ qpid/framing/SerializeHandler.h \ qpid/framing/SequenceNumber.h \ qpid/framing/SequenceNumberSet.h \ + qpid/framing/StructHelper.h \ qpid/framing/Value.h \ qpid/framing/Visitor.h \ qpid/framing/Uuid.h \ diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp index 77030855ff..024516fb7b 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.cpp +++ b/cpp/src/qpid/broker/BrokerAdapter.cpp @@ -137,17 +137,17 @@ void BrokerAdapter::ExchangeHandlerImpl::delete_(uint16_t /*ticket*/, const stri broker.getExchanges().destroy(name); } -void BrokerAdapter::ExchangeHandlerImpl::query(u_int16_t /*ticket*/, const string& name) +ExchangeQueryResult BrokerAdapter::ExchangeHandlerImpl::query(u_int16_t /*ticket*/, const string& name) { try { Exchange::shared_ptr exchange(broker.getExchanges().get(name)); - client.queryOk(exchange->getType(), exchange->isDurable(), false, exchange->getArgs()); + return ExchangeQueryResult(exchange->getType(), exchange->isDurable(), false, exchange->getArgs()); } catch (const ChannelException& e) { - client.queryOk("", false, true, FieldTable()); + return ExchangeQueryResult("", false, true, FieldTable()); } } -void BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/, +BindingQueryResult BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/, const std::string& exchangeName, const std::string& queueName, const std::string& key, @@ -164,24 +164,40 @@ void BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/, } if (!exchange) { - client.queryOk(true, false, false, false, false); + return BindingQueryResult(true, false, false, false, false); } else if (!queueName.empty() && !queue) { - client.queryOk(false, true, false, false, false); + return BindingQueryResult(false, true, false, false, false); } else if (exchange->isBound(queue, key.empty() ? 0 : &key, args.count() > 0 ? &args : &args)) { - client.queryOk(false, false, false, false, false); + return BindingQueryResult(false, false, false, false, false); } else { //need to test each specified option individually bool queueMatched = queueName.empty() || exchange->isBound(queue, 0, 0); bool keyMatched = key.empty() || exchange->isBound(Queue::shared_ptr(), &key, 0); bool argsMatched = args.count() == 0 || exchange->isBound(Queue::shared_ptr(), 0, &args); - client.queryOk(false, false, !queueMatched, !keyMatched, !argsMatched); + return BindingQueryResult(false, false, !queueMatched, !keyMatched, !argsMatched); } } +QueueQueryResult BrokerAdapter::QueueHandlerImpl::query(const string& name) +{ + Queue::shared_ptr queue = getQueue(name); + Exchange::shared_ptr alternateExchange = queue->getAlternateExchange(); + + return QueueQueryResult(queue->getName(), + alternateExchange ? alternateExchange->getName() : "", + queue->isDurable(), + queue->hasExclusiveOwner(), + queue->isAutoDelete(), + queue->getSettings(), + queue->getMessageCount(), + queue->getConsumerCount()); + +} + void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string& name, const string& alternateExchange, bool passive, bool durable, bool exclusive, - bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){ + bool autoDelete, const qpid::framing::FieldTable& arguments){ Exchange::shared_ptr alternate; if (!alternateExchange.empty()) { alternate = broker.getExchanges().get(alternateExchange); @@ -223,11 +239,6 @@ void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string& 405, format("Cannot grant exclusive access to queue '%s'") % queue->getName()); - if (!nowait) { - string queueName = queue->getName(); - client.declareOk( - queueName, queue->getMessageCount(), queue->getConsumerCount()); - } } void BrokerAdapter::QueueHandlerImpl::bind(uint16_t /*ticket*/, const string& queueName, @@ -269,17 +280,13 @@ BrokerAdapter::QueueHandlerImpl::unbind(uint16_t /*ticket*/, } -void BrokerAdapter::QueueHandlerImpl::purge(uint16_t /*ticket*/, const string& queueName, bool nowait){ - - Queue::shared_ptr queue = getQueue(queueName); - int count = queue->purge(); - if(!nowait) client.purgeOk( count); +void BrokerAdapter::QueueHandlerImpl::purge(uint16_t /*ticket*/, const string& queue){ + getQueue(queue)->purge(); } void BrokerAdapter::QueueHandlerImpl::delete_(uint16_t /*ticket*/, const string& queue, - bool ifUnused, bool ifEmpty, bool nowait){ + bool ifUnused, bool ifEmpty){ ChannelException error(0, ""); - int count(0); Queue::shared_ptr q = getQueue(queue); if(ifEmpty && q->getMessageCount() > 0){ throw ChannelException(406, "Queue not empty."); @@ -291,14 +298,10 @@ void BrokerAdapter::QueueHandlerImpl::delete_(uint16_t /*ticket*/, const string& QueueVector::iterator i = find(connection.exclusiveQueues.begin(), connection.exclusiveQueues.end(), q); if(i < connection.exclusiveQueues.end()) connection.exclusiveQueues.erase(i); } - count = q->getMessageCount(); q->destroy(); broker.getQueues().destroy(queue); q->unbind(broker.getExchanges(), q); } - - if(!nowait) - client.deleteOk(count); } @@ -333,10 +336,8 @@ void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/, queue->requestDispatch(); } -void BrokerAdapter::BasicHandlerImpl::cancel(const string& consumerTag, bool nowait){ +void BrokerAdapter::BasicHandlerImpl::cancel(const string& consumerTag){ channel.cancel(consumerTag); - - if(!nowait) client.cancelOk(consumerTag); } void BrokerAdapter::BasicHandlerImpl::publish(uint16_t /*ticket*/, diff --git a/cpp/src/qpid/broker/BrokerAdapter.h b/cpp/src/qpid/broker/BrokerAdapter.h index 3fe2eb9eba..99b7f14525 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.h +++ b/cpp/src/qpid/broker/BrokerAdapter.h @@ -121,7 +121,7 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations const qpid::framing::FieldTable& arguments); void delete_(uint16_t ticket, const std::string& exchange, bool ifUnused); - void query(u_int16_t ticket, const string& name); + framing::ExchangeQueryResult query(u_int16_t ticket, const string& name); private: void checkType(Exchange::shared_ptr exchange, const std::string& type); void checkAlternate(Exchange::shared_ptr exchange, Exchange::shared_ptr alternate); @@ -134,11 +134,11 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations public: BindingHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {} - void query(u_int16_t ticket, - const std::string& exchange, - const std::string& queue, - const std::string& routingKey, - const framing::FieldTable& arguments); + framing::BindingQueryResult query(u_int16_t ticket, + const std::string& exchange, + const std::string& queue, + const std::string& routingKey, + const framing::FieldTable& arguments); }; class QueueHandlerImpl : @@ -151,7 +151,7 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations void declare(uint16_t ticket, const std::string& queue, const std::string& alternateExchange, bool passive, bool durable, bool exclusive, - bool autoDelete, bool nowait, + bool autoDelete, const qpid::framing::FieldTable& arguments); void bind(uint16_t ticket, const std::string& queue, const std::string& exchange, const std::string& routingKey, @@ -161,11 +161,10 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations const std::string& exchange, const std::string& routingKey, const qpid::framing::FieldTable& arguments ); - void purge(uint16_t ticket, const std::string& queue, - bool nowait); + framing::QueueQueryResult query(const string& queue); + void purge(uint16_t ticket, const std::string& queue); void delete_(uint16_t ticket, const std::string& queue, - bool ifUnused, bool ifEmpty, - bool nowait); + bool ifUnused, bool ifEmpty); }; class BasicHandlerImpl : @@ -179,18 +178,15 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations void qos(uint32_t prefetchSize, uint16_t prefetchCount, bool global); - void consume( - uint16_t ticket, const std::string& queue, - const std::string& consumerTag, bool noLocal, bool noAck, - bool exclusive, bool nowait, - const qpid::framing::FieldTable& fields); - void cancel(const std::string& consumerTag, - bool nowait); + void consume(uint16_t ticket, const std::string& queue, + const std::string& consumerTag, + bool noLocal, bool noAck, bool exclusive, bool nowait, + const qpid::framing::FieldTable& fields); + void cancel(const std::string& consumerTag); void publish(uint16_t ticket, const std::string& exchange, const std::string& routingKey, bool rejectUnroutable, bool immediate); - void get(uint16_t ticket, const std::string& queue, - bool noAck); + void get(uint16_t ticket, const std::string& queue, bool noAck); void ack(uint64_t deliveryTag, bool multiple); void reject(uint64_t deliveryTag, bool requeue); void recover(bool requeue); diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp index e135e960c4..0dc4bed661 100644 --- a/cpp/src/qpid/broker/BrokerChannel.cpp +++ b/cpp/src/qpid/broker/BrokerChannel.cpp @@ -374,7 +374,7 @@ void Channel::ack(DeliveryId first, DeliveryId last, bool cumulative) //just acked single element (move end past it) ++end; } - + for_each(start, end, boost::bind(&Channel::acknowledged, this, _1)); if (txBuffer.get()) { diff --git a/cpp/src/qpid/broker/BrokerQueue.h b/cpp/src/qpid/broker/BrokerQueue.h index 857a7adfc2..35aa954c1e 100644 --- a/cpp/src/qpid/broker/BrokerQueue.h +++ b/cpp/src/qpid/broker/BrokerQueue.h @@ -145,8 +145,10 @@ namespace qpid { inline const string& getName() const { return name; } inline const bool isExclusiveOwner(const ConnectionToken* const o) const { return o == owner; } inline bool hasExclusiveConsumer() const { return exclusive; } + inline bool hasExclusiveOwner() const { return owner != 0; } inline bool isDurable() const { return store != 0; } - + inline const framing::FieldTable& getSettings() const { return settings; } + inline bool isAutoDelete() const { return autodelete; } bool canAutoDelete() const; bool enqueue(TransactionContext* ctxt, Message::shared_ptr& msg); diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.cpp b/cpp/src/qpid/broker/DtxHandlerImpl.cpp index 8b3629dff9..5a69ff0d65 100644 --- a/cpp/src/qpid/broker/DtxHandlerImpl.cpp +++ b/cpp/src/qpid/broker/DtxHandlerImpl.cpp @@ -22,19 +22,9 @@ #include "BrokerChannel.h" using namespace qpid::broker; -using qpid::framing::AMQP_ClientProxy; -using qpid::framing::Buffer; -using qpid::framing::FieldTable; +using namespace qpid::framing; using std::string; -DtxHandlerImpl::DtxHandlerImpl(CoreRefs& parent) : - CoreRefs(parent), - dClient(AMQP_ClientProxy::DtxDemarcation::get(proxy)), - cClient(AMQP_ClientProxy::DtxCoordination::get(proxy)) - -{ -} - const int XA_RBROLLBACK(1); const int XA_RBTIMEOUT(2); const int XA_HEURHAZ(3); @@ -44,6 +34,7 @@ const int XA_HEURMIX(6); const int XA_RDONLY(7); const int XA_OK(8); +DtxHandlerImpl::DtxHandlerImpl(CoreRefs& parent) : CoreRefs(parent) {} // DtxDemarcationHandler: @@ -53,10 +44,10 @@ void DtxHandlerImpl::select() channel.selectDtx(); } -void DtxHandlerImpl::end(u_int16_t /*ticket*/, - const string& xid, - bool fail, - bool suspend) +DtxDemarcationEndResult DtxHandlerImpl::end(u_int16_t /*ticket*/, + const string& xid, + bool fail, + bool suspend) { try { if (fail) { @@ -64,7 +55,7 @@ void DtxHandlerImpl::end(u_int16_t /*ticket*/, if (suspend) { throw ConnectionException(503, "End and suspend cannot both be set."); } else { - dClient.endOk(XA_RBROLLBACK); + return DtxDemarcationEndResult(XA_RBROLLBACK); } } else { if (suspend) { @@ -72,14 +63,14 @@ void DtxHandlerImpl::end(u_int16_t /*ticket*/, } else { channel.endDtx(xid, false); } - dClient.endOk(XA_OK); + return DtxDemarcationEndResult(XA_OK); } } catch (const DtxTimeoutException& e) { - dClient.endOk(XA_RBTIMEOUT); + return DtxDemarcationEndResult(XA_RBTIMEOUT); } } -void DtxHandlerImpl::start(u_int16_t /*ticket*/, +DtxDemarcationStartResult DtxHandlerImpl::start(u_int16_t /*ticket*/, const string& xid, bool join, bool resume) @@ -93,52 +84,52 @@ void DtxHandlerImpl::start(u_int16_t /*ticket*/, } else { channel.startDtx(xid, broker.getDtxManager(), join); } - dClient.startOk(XA_OK); + return DtxDemarcationStartResult(XA_OK); } catch (const DtxTimeoutException& e) { - dClient.startOk(XA_RBTIMEOUT); + return DtxDemarcationStartResult(XA_RBTIMEOUT); } } // DtxCoordinationHandler: -void DtxHandlerImpl::prepare(u_int16_t /*ticket*/, +DtxCoordinationPrepareResult DtxHandlerImpl::prepare(u_int16_t /*ticket*/, const string& xid) { try { bool ok = broker.getDtxManager().prepare(xid); - cClient.prepareOk(ok ? XA_OK : XA_RBROLLBACK); + return DtxCoordinationPrepareResult(ok ? XA_OK : XA_RBROLLBACK); } catch (const DtxTimeoutException& e) { - cClient.prepareOk(XA_RBTIMEOUT); + return DtxCoordinationPrepareResult(XA_RBTIMEOUT); } } -void DtxHandlerImpl::commit(u_int16_t /*ticket*/, +DtxCoordinationCommitResult DtxHandlerImpl::commit(u_int16_t /*ticket*/, const string& xid, bool onePhase) { try { bool ok = broker.getDtxManager().commit(xid, onePhase); - cClient.commitOk(ok ? XA_OK : XA_RBROLLBACK); + return DtxCoordinationCommitResult(ok ? XA_OK : XA_RBROLLBACK); } catch (const DtxTimeoutException& e) { - cClient.commitOk(XA_RBTIMEOUT); + return DtxCoordinationCommitResult(XA_RBTIMEOUT); } } -void DtxHandlerImpl::rollback(u_int16_t /*ticket*/, +DtxCoordinationRollbackResult DtxHandlerImpl::rollback(u_int16_t /*ticket*/, const string& xid ) { try { broker.getDtxManager().rollback(xid); - cClient.rollbackOk(XA_OK); + return DtxCoordinationRollbackResult(XA_OK); } catch (const DtxTimeoutException& e) { - cClient.rollbackOk(XA_RBTIMEOUT); + return DtxCoordinationRollbackResult(XA_RBTIMEOUT); } } -void DtxHandlerImpl::recover(u_int16_t /*ticket*/, - bool /*startscan*/, - bool /*endscan*/ ) +DtxCoordinationRecoverResult DtxHandlerImpl::recover(u_int16_t /*ticket*/, + bool /*startscan*/, + bool /*endscan*/ ) { //TODO: what do startscan and endscan actually mean? @@ -169,7 +160,7 @@ void DtxHandlerImpl::recover(u_int16_t /*ticket*/, FieldTable response; response.setString("xids", data); - cClient.recoverOk(response); + return DtxCoordinationRecoverResult(response); } void DtxHandlerImpl::forget(u_int16_t /*ticket*/, @@ -179,10 +170,10 @@ void DtxHandlerImpl::forget(u_int16_t /*ticket*/, throw ConnectionException(503, boost::format("Forget is invalid. Branch with xid %1% not heuristically completed!") % xid); } -void DtxHandlerImpl::getTimeout(const string& xid) +DtxCoordinationGetTimeoutResult DtxHandlerImpl::getTimeout(const string& xid) { uint32_t timeout = broker.getDtxManager().getTimeout(xid); - cClient.getTimeoutOk(timeout); + return DtxCoordinationGetTimeoutResult(timeout); } diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.h b/cpp/src/qpid/broker/DtxHandlerImpl.h index 067ba47fb5..da6379b26c 100644 --- a/cpp/src/qpid/broker/DtxHandlerImpl.h +++ b/cpp/src/qpid/broker/DtxHandlerImpl.h @@ -31,34 +31,34 @@ class DtxHandlerImpl public framing::AMQP_ServerOperations::DtxCoordinationHandler, public framing::AMQP_ServerOperations::DtxDemarcationHandler { - framing::AMQP_ClientProxy::DtxDemarcation dClient; - framing::AMQP_ClientProxy::DtxCoordination cClient; public: DtxHandlerImpl(CoreRefs& parent); // DtxCoordinationHandler: - void commit(u_int16_t ticket, const std::string& xid, bool onePhase); + framing::DtxCoordinationCommitResult commit(u_int16_t ticket, const std::string& xid, bool onePhase); void forget(u_int16_t ticket, const std::string& xid); - void getTimeout(const std::string& xid); + framing::DtxCoordinationGetTimeoutResult getTimeout(const std::string& xid); - void prepare(u_int16_t ticket, const std::string& xid); + framing::DtxCoordinationPrepareResult prepare(u_int16_t ticket, const std::string& xid); - void recover(u_int16_t ticket, bool startscan, bool endscan); + framing::DtxCoordinationRecoverResult recover(u_int16_t ticket, bool startscan, bool endscan); - void rollback(u_int16_t ticket, const std::string& xid); + framing::DtxCoordinationRollbackResult rollback(u_int16_t ticket, const std::string& xid); void setTimeout(u_int16_t ticket, const std::string& xid, u_int32_t timeout); // DtxDemarcationHandler: - - void end(u_int16_t ticket, const std::string& xid, bool fail, bool suspend); - + + framing::DtxDemarcationEndResult end(u_int16_t ticket, const std::string& xid, bool fail, bool suspend); + void select(); + + framing::DtxDemarcationStartResult start(u_int16_t ticket, const std::string& xid, bool join, bool resume); + - void start(u_int16_t ticket, const std::string& xid, bool join, bool resume); }; diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp index 3f407c11f7..ce1fa1e028 100644 --- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp +++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp @@ -87,19 +87,21 @@ MessageHandlerImpl::offset(uint64_t /*value*/ ) } void -MessageHandlerImpl::consume(uint16_t /*ticket*/, +MessageHandlerImpl::subscribe(uint16_t /*ticket*/, const string& queueName, const string& destination, bool noLocal, - bool noAck, + u_int8_t confirmMode, + u_int8_t /*acquireMode*/,//TODO: implement acquire modes bool exclusive, const framing::FieldTable& filter ) { Queue::shared_ptr queue = getQueue(queueName); if(!destination.empty() && channel.exists(destination)) throw ConnectionException(530, "Consumer tags must be unique"); + string tag = destination; - channel.consume(MessageMessage::getToken(destination), tag, queue, noLocal, !noAck, exclusive, &filter); + channel.consume(MessageMessage::getToken(destination), tag, queue, noLocal, confirmMode == 1, exclusive, &filter); // Dispatch messages as there is now a consumer. queue->requestDispatch(); } @@ -153,8 +155,9 @@ MessageHandlerImpl::recover(bool requeue) } void -MessageHandlerImpl::reject(uint16_t /*code*/, const string& /*text*/ ) +MessageHandlerImpl::reject(const SequenceNumberSet& /*transfers*/, uint16_t /*code*/, const string& /*text*/ ) { + //TODO: implement } void @@ -210,5 +213,14 @@ void MessageHandlerImpl::stop(const std::string& destination) channel.stop(destination); } +void MessageHandlerImpl::acquire(const SequenceNumberSet& /*transfers*/, u_int8_t /*mode*/) +{ + throw ConnectionException(540, "Not yet implemented"); +} + +void MessageHandlerImpl::release(const SequenceNumberSet& /*transfers*/) +{ + throw ConnectionException(540, "Not yet implemented"); +} }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.h b/cpp/src/qpid/broker/MessageHandlerImpl.h index 20cae46da4..f4d9fa0c76 100644 --- a/cpp/src/qpid/broker/MessageHandlerImpl.h +++ b/cpp/src/qpid/broker/MessageHandlerImpl.h @@ -49,14 +49,6 @@ class MessageHandlerImpl : void close(const std::string& reference ); - void consume(uint16_t ticket, - const std::string& queue, - const std::string& destination, - bool noLocal, - bool noAck, - bool exclusive, - const framing::FieldTable& filter ); - void empty(); void get(uint16_t ticket, @@ -76,8 +68,9 @@ class MessageHandlerImpl : void recover(bool requeue ); - void reject(uint16_t code, - const std::string& text ); + void reject(const framing::SequenceNumberSet& transfers, + uint16_t code, + const std::string& text ); void resume(const std::string& reference, const std::string& identifier ); @@ -92,6 +85,19 @@ class MessageHandlerImpl : void stop(const std::string& destination); + void acquire(const framing::SequenceNumberSet& transfers, u_int8_t mode); + + void release(const framing::SequenceNumberSet& transfers); + + void subscribe(u_int16_t ticket, + const string& queue, + const string& destination, + bool noLocal, + u_int8_t confirmMode, + u_int8_t acquireMode, + bool exclusive, + const framing::FieldTable& filter); + private: ReferenceRegistry references; }; diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp index b7aa2aad25..f65e450e82 100644 --- a/cpp/src/qpid/broker/SemanticHandler.cpp +++ b/cpp/src/qpid/broker/SemanticHandler.cpp @@ -22,8 +22,10 @@ #include "SemanticHandler.h" #include "BrokerAdapter.h" #include "qpid/framing/ChannelAdapter.h" -#include "qpid/framing/ExecutionCompleteBody.h" #include "qpid/framing/ChannelCloseOkBody.h" +#include "qpid/framing/ExecutionCompleteBody.h" +#include "qpid/framing/ExecutionResultBody.h" +#include "qpid/framing/InvocationVisitor.h" using namespace qpid::broker; using namespace qpid::framing; @@ -66,6 +68,11 @@ void SemanticHandler::handleMethod(framing::AMQMethodBody* method) { try { if (!method->invoke(this)) { + //temporary hack until channel management is moved to its own handler: + if (method->amqpClassId() != ChannelOpenBody::CLASS_ID) { + ++(incoming.lwm); + } + //else do the usual: handleL4(method); //(if the frameset is complete) we can move the execution-mark @@ -73,7 +80,9 @@ void SemanticHandler::handleMethod(framing::AMQMethodBody* method) //temporary hack until channel management is moved to its own handler: if (method->amqpClassId() != ChannelOpenBody::CLASS_ID) { - ++(incoming.hwm); + //TODO: need to account for async store opreations + //when this command is a message publication + ++(incoming.hwm); } //note: need to be more sophisticated than this if we execute @@ -85,7 +94,7 @@ void SemanticHandler::handleMethod(framing::AMQMethodBody* method) } } -void SemanticHandler::complete(uint32_t cumulative, SequenceNumberSet range) +void SemanticHandler::complete(uint32_t cumulative, const SequenceNumberSet& range) { //record: SequenceNumber mark(cumulative); @@ -98,7 +107,7 @@ void SemanticHandler::complete(uint32_t cumulative, SequenceNumberSet range) if (range.size() % 2) { //must be even number throw ConnectionException(530, "Received odd number of elements in ranged mark"); } else { - for (SequenceNumberSet::iterator i = range.begin(); i != range.end(); i++) { + for (SequenceNumberSet::const_iterator i = range.begin(); i != range.end(); i++) { channel.ackRange((uint64_t) i->getValue(), (uint64_t) (++i)->getValue()); } } @@ -113,6 +122,25 @@ void SemanticHandler::flush() ChannelAdapter::send(ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), SequenceNumberSet())); } } +void SemanticHandler::sync() +{ + //for now, just treat as flush; will need to get more clever when we deal with async publication + flush(); +} + +void SemanticHandler::noop() +{ + //Do nothing... + // + //is this an L3 control? or is it an L4 command? + //if the former, of what use is it? + //if the latter it may contain a synch request... but its odd to have it in this class +} + +void SemanticHandler::result(uint32_t /*command*/, const std::string& /*data*/) +{ + //never actually sent by client at present +} void SemanticHandler::handleL4(framing::AMQMethodBody* method) { @@ -124,7 +152,13 @@ void SemanticHandler::handleL4(framing::AMQMethodBody* method) throw ConnectionException(504, out.str()); } } else { - method->invoke(*adapter); + InvocationVisitor v(adapter.get()); + method->accept(v); + if (!v.wasHandled()) { + throw ConnectionException(540, "Not implemented"); + } else if (v.hasResult()) { + ChannelAdapter::send(ExecutionResultBody(getVersion(), incoming.lwm.getValue(), v.getResult())); + } } }catch(const ChannelException& e){ adapter->getProxy().getChannel().close( diff --git a/cpp/src/qpid/broker/SemanticHandler.h b/cpp/src/qpid/broker/SemanticHandler.h index 6748da8500..672c6ad929 100644 --- a/cpp/src/qpid/broker/SemanticHandler.h +++ b/cpp/src/qpid/broker/SemanticHandler.h @@ -79,8 +79,11 @@ public: void handle(framing::AMQFrame& frame); //execution class method handlers: - void complete(uint32_t cumulativeExecutionMark, framing::SequenceNumberSet range); + void complete(uint32_t cumulativeExecutionMark, const framing::SequenceNumberSet& range); void flush(); + void noop(); + void result(uint32_t command, const std::string& data); + void sync(); }; }} diff --git a/cpp/src/qpid/client/ClientChannel.cpp b/cpp/src/qpid/client/ClientChannel.cpp index aa73e83328..d1cc4734eb 100644 --- a/cpp/src/qpid/client/ClientChannel.cpp +++ b/cpp/src/qpid/client/ClientChannel.cpp @@ -20,6 +20,7 @@ */ #include "qpid/log/Statement.h" #include <iostream> +#include <sstream> #include "ClientChannel.h" #include "qpid/sys/Monitor.h" #include "ClientMessage.h" @@ -54,7 +55,8 @@ class ScopedSync }; Channel::Channel(bool _transactional, u_int16_t _prefetch) : - prefetch(_prefetch), transactional(_transactional), running(false) + prefetch(_prefetch), transactional(_transactional), running(false), + uniqueId(true)/*could eventually be the session id*/, nameCounter(0) { } @@ -103,20 +105,22 @@ void Channel::deleteExchange(Exchange& exchange, bool synch){ } void Channel::declareQueue(Queue& queue, bool synch){ + if (queue.getName().empty()) { + stringstream uniqueName; + uniqueName << uniqueId << "-queue-" << ++nameCounter; + queue.setName(uniqueName.str()); + } + FieldTable args; ScopedSync s(*session, synch); - Response r = session->queueDeclare(0, queue.getName(), empty, false/*passive*/, queue.isDurable(), - queue.isExclusive(), queue.isAutoDelete(), !synch, args); + session->queueDeclare(0, queue.getName(), empty, false/*passive*/, queue.isDurable(), + queue.isExclusive(), queue.isAutoDelete(), args); - if(synch) { - if(queue.getName().length() == 0) - queue.setName(r.as<QueueDeclareOkBody>().getQueue()); - } } void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch){ ScopedSync s(*session, synch); - session->queueDelete(0, queue.getName(), ifunused, ifempty, !synch); + session->queueDelete(0, queue.getName(), ifunused, ifempty); } void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){ @@ -168,7 +172,7 @@ void Channel::cancel(const std::string& tag, bool synch) { consumers.erase(i); } ScopedSync s(*session, synch); - session->basicCancel(tag, !synch); + session->basicCancel(tag); } bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) { diff --git a/cpp/src/qpid/client/ClientChannel.h b/cpp/src/qpid/client/ClientChannel.h index 98e04db109..d73addc950 100644 --- a/cpp/src/qpid/client/ClientChannel.h +++ b/cpp/src/qpid/client/ClientChannel.h @@ -24,11 +24,12 @@ #include <memory> #include <boost/scoped_ptr.hpp> #include "qpid/framing/amqp_framing.h" +#include "qpid/framing/Uuid.h" #include "ClientExchange.h" #include "ClientMessage.h" #include "ClientQueue.h" #include "ConnectionImpl.h" -#include "Session.h" +#include "qpid/client/Session.h" #include "qpid/Exception.h" #include "qpid/sys/Mutex.h" #include "qpid/sys/Runnable.h" @@ -83,6 +84,8 @@ class Channel : private sys::Runnable SessionCore::shared_ptr sessionCore; framing::ChannelId channelId; BlockingQueue<ReceivedContent::shared_ptr> gets; + framing::Uuid uniqueId; + uint32_t nameCounter; void stop(); diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h index e41ab363b5..e309b5c63e 100644 --- a/cpp/src/qpid/client/Connection.h +++ b/cpp/src/qpid/client/Connection.h @@ -26,7 +26,7 @@ #include "qpid/QpidError.h" #include "ClientChannel.h" #include "ConnectionImpl.h" -#include "Session.h" +#include "qpid/client/Session.h" #include "qpid/framing/AMQP_HighestVersion.h" diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp index 6ee6429b6b..6c2600d00b 100644 --- a/cpp/src/qpid/client/ExecutionHandler.cpp +++ b/cpp/src/qpid/client/ExecutionHandler.cpp @@ -78,7 +78,7 @@ void ExecutionHandler::handle(AMQFrame& frame) } } -void ExecutionHandler::complete(uint32_t cumulative, SequenceNumberSet range) +void ExecutionHandler::complete(uint32_t cumulative, const SequenceNumberSet& range) { SequenceNumber mark(cumulative); if (outgoing.lwm < mark) { @@ -101,6 +101,26 @@ void ExecutionHandler::flush() incoming.lwm = incoming.hwm; } +void ExecutionHandler::noop() +{ + //do nothing +} + +void ExecutionHandler::result(uint32_t /*command*/, const std::string& /*data*/) +{ + //TODO: need to signal the result to the appropriate listener +} + +void ExecutionHandler::sync() +{ + //TODO: implement (the application is in charge of completion of + //some commands, so need to track completion for them). + + //This shouldn't ever need to be called by the server (in my + //opinion) as the server never needs to synchronise with the + //clients execution +} + void ExecutionHandler::sendFlush() { AMQFrame frame(version, 0, ExecutionFlushBody()); diff --git a/cpp/src/qpid/client/ExecutionHandler.h b/cpp/src/qpid/client/ExecutionHandler.h index 21613df779..b409d5df7b 100644 --- a/cpp/src/qpid/client/ExecutionHandler.h +++ b/cpp/src/qpid/client/ExecutionHandler.h @@ -45,8 +45,11 @@ class ExecutionHandler : framing::ProtocolVersion version; uint64_t maxFrameSize; - void complete(uint32_t mark, framing::SequenceNumberSet range); + void complete(uint32_t mark, const framing::SequenceNumberSet& range); void flush(); + void noop(); + void result(uint32_t command, const std::string& data); + void sync(); public: BlockingQueue<ReceivedContent::shared_ptr> received; diff --git a/cpp/src/qpid/framing/AMQP_HighestVersion.h b/cpp/src/qpid/framing/AMQP_HighestVersion.h new file mode 100644 index 0000000000..42139c7937 --- /dev/null +++ b/cpp/src/qpid/framing/AMQP_HighestVersion.h @@ -0,0 +1,40 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/* + * This file used to be auto-generated by Qpid Gentools v.0.1 + * its here temporarily until we get a full solution to multi-version support + */ +#ifndef qpid_framing_highestProtocolVersion__ +#define qpid_framing_highestProtocolVersion__ + +#include "qpid/framing/ProtocolVersion.h" + + +namespace qpid { +namespace framing { + +static ProtocolVersion highestProtocolVersion(0, 10); + +} /* namespace framing */ +} /* namespace qpid */ + +#endif diff --git a/cpp/src/qpid/framing/StructHelper.h b/cpp/src/qpid/framing/StructHelper.h new file mode 100644 index 0000000000..b5d1b1e78c --- /dev/null +++ b/cpp/src/qpid/framing/StructHelper.h @@ -0,0 +1,56 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _StructHelper_ +#define _StructHelper_ + +#include "qpid/Exception.h" + +namespace qpid { +namespace framing { + +class StructHelper +{ +public: + + template <class T> void encode(const T t, std::string& data) { + uint32_t size = t.size() + 2/*type*/; + Buffer buffer(size); + buffer.putShort(T::TYPE); + t.encode(buffer); + buffer.flip(); + buffer.getRawData(data, size); + } + + template <class T> void decode(T t, std::string& data) { + Buffer buffer(data.length()); + buffer.putRawData(data); + buffer.flip(); + uint16_t type = buffer.getShort(); + if (type == T::TYPE) { + t.decode(buffer); + } else { + throw Exception("Type code does not match"); + } + } +}; + +}} +#endif diff --git a/cpp/src/qpid/framing/amqp_types_full.h b/cpp/src/qpid/framing/amqp_types_full.h index 6a24a99d38..027b563caf 100644 --- a/cpp/src/qpid/framing/amqp_types_full.h +++ b/cpp/src/qpid/framing/amqp_types_full.h @@ -32,5 +32,6 @@ #include "amqp_types.h" #include "FramingContent.h" #include "FieldTable.h" +#include "SequenceNumberSet.h" #endif /*!_framing_amqp_types_decl_h*/ diff --git a/cpp/src/tests/Blob b/cpp/src/tests/Blob Binary files differdeleted file mode 100755 index 05e67dc01f..0000000000 --- a/cpp/src/tests/Blob +++ /dev/null diff --git a/cpp/src/tests/Cluster_child.cpp b/cpp/src/tests/Cluster_child.cpp index 62ccb9bd72..7789ada614 100644 --- a/cpp/src/tests/Cluster_child.cpp +++ b/cpp/src/tests/Cluster_child.cpp @@ -32,7 +32,7 @@ using namespace qpid::log; static const ProtocolVersion VER; -/** Chlid part of Cluster::clusterTwo test */ +/** Child part of Cluster::clusterTwo test */ void clusterTwo() { TestCluster cluster("clusterTwo", "amqp:child:2"); AMQFrame frame; |