diff options
117 files changed, 2032 insertions, 3746 deletions
diff --git a/qpid/cpp/rubygen/templates/InvocationVisitor.rb b/qpid/cpp/rubygen/templates/InvocationVisitor.rb index befbdd53c7..67a0479bb6 100644 --- a/qpid/cpp/rubygen/templates/InvocationVisitor.rb +++ b/qpid/cpp/rubygen/templates/InvocationVisitor.rb @@ -12,11 +12,7 @@ class InvocationVisitor < CppGen end def invocation_args(m) - if (m.parent.name == "message" && (m.name == "transfer" || m.name == "append")) - "body" - else - m.param_names.collect {|p| "body.get" + p.caps + "()" }.join(",\n") - end + m.param_names.collect {|p| "body.get" + p.caps + "()" }.join(",\n") end def null_visit(m) @@ -95,7 +91,7 @@ EOS } genl "}" genl - @amqp.methods_.each { |m| m.on_server? ? define_visit(m) : null_visit(m) } + @amqp.methods_.each { |m| m.on_server? && !m.content() ? define_visit(m) : null_visit(m) } } } end diff --git a/qpid/cpp/rubygen/templates/Operations.rb b/qpid/cpp/rubygen/templates/Operations.rb index 9f242f5910..fff4f796c3 100755 --- a/qpid/cpp/rubygen/templates/Operations.rb +++ b/qpid/cpp/rubygen/templates/Operations.rb @@ -17,13 +17,9 @@ class OperationsGen < CppGen def handler_method (m) return_type = m.result ? m.result.cpptype.ret : "void" - if (m.parent.name == "message" && (m.name == "transfer" || m.name == "append")) - gen "\nvirtual #{return_type} #{m.cppname}(const framing::AMQMethodBody& context) = 0;\n" - else - gen "\nvirtual #{return_type} #{m.cppname}(" - gen m.signature.join(",\n") - gen ") = 0;\n" - end + gen "\nvirtual #{return_type} #{m.cppname}(" + gen m.signature.join(",\n") + gen ") = 0;\n" end def handler_classname(c) c.name.caps+"Handler"; end @@ -40,7 +36,7 @@ class #{handlerclass} : public virtual Invocable { virtual ~#{handlerclass}() {} // Protocol methods EOS - c.methods_on(@chassis).each { |m| handler_method(m) } + c.methods_on(@chassis).each { |m| handler_method(m) if !m.content() } gen <<EOS }; // class #{handlerclass} diff --git a/qpid/cpp/rubygen/templates/Session.rb b/qpid/cpp/rubygen/templates/Session.rb index f4af2041dd..95b10c6dbf 100644 --- a/qpid/cpp/rubygen/templates/Session.rb +++ b/qpid/cpp/rubygen/templates/Session.rb @@ -88,7 +88,7 @@ public: #{@classname}(ConnectionImpl::shared_ptr, SessionCore::shared_ptr); ~#{@classname}(); - ReceivedContent::shared_ptr get() { return impl->get(); } + framing::FrameSet::shared_ptr get() { return impl->get(); } void setSynchronous(bool sync) { impl->setSync(sync); } void close(); EOS diff --git a/qpid/cpp/rubygen/templates/structs.rb b/qpid/cpp/rubygen/templates/structs.rb index c92f71c777..2543c43717 100644 --- a/qpid/cpp/rubygen/templates/structs.rb +++ b/qpid/cpp/rubygen/templates/structs.rb @@ -33,6 +33,16 @@ class StructGen < CppGen ValueTypes=["octet", "short", "long", "longlong", "timestamp"] + def default_initialisation(s) + params = s.fields.select {|f| ValueTypes.include?(f.domain.type_) || f.domain.type_ == "bit"} + strings = params.collect {|f| "#{f.cppname}(0)"} + if strings.empty? + return "" + else + return " : " + strings.join(", ") + end + end + def printable_form(f) if (f.cpptype.name == "u_int8_t") return "(int) " + f.cppname @@ -108,12 +118,19 @@ class StructGen < CppGen end def methodbody_extra_defs(s) + if (s.content) + content = "true" + else + content = "false" + end + gen <<EOS using AMQMethodBody::accept; void accept(MethodBodyConstVisitor& v) const { v.visit(*this); } inline ClassId amqpClassId() const { return CLASS_ID; } inline MethodId amqpMethodId() const { return METHOD_ID; } + inline bool isContentBearing() const { return #{content}; } EOS end @@ -132,6 +149,9 @@ EOS if (s.kind_of? AmqpMethod) genl "#{name}(ProtocolVersion=ProtocolVersion()) {}" end + if (s.kind_of? AmqpStruct) + genl "#{name}() #{default_initialisation(s)} {}" + end end def define_accessors(f) diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 387d4dce91..f1fee3e61d 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -111,10 +111,12 @@ libqpidcommon_la_SOURCES = \ qpid/framing/Buffer.cpp \ qpid/framing/FieldTable.cpp \ qpid/framing/FramingContent.cpp \ + qpid/framing/FrameSet.cpp \ qpid/framing/InitiationHandler.cpp \ qpid/framing/ProtocolInitiation.cpp \ qpid/framing/ProtocolVersion.cpp \ qpid/framing/ProtocolVersionException.cpp \ + qpid/framing/SendContent.cpp \ qpid/framing/SequenceNumber.cpp \ qpid/framing/SequenceNumberSet.cpp \ qpid/framing/Value.cpp \ @@ -159,8 +161,6 @@ libqpidbroker_la_SOURCES = \ qpid/broker/BrokerSingleton.cpp \ qpid/broker/BrokerChannel.cpp \ qpid/broker/BrokerExchange.cpp \ - qpid/broker/BrokerMessage.cpp \ - qpid/broker/BrokerMessageMessage.cpp \ qpid/broker/BrokerQueue.cpp \ qpid/broker/Connection.cpp \ qpid/broker/ConnectionAdapter.cpp \ @@ -178,9 +178,9 @@ libqpidbroker_la_SOURCES = \ qpid/broker/ExchangeRegistry.cpp \ qpid/broker/FanOutExchange.cpp \ qpid/broker/HeadersExchange.cpp \ - qpid/broker/InMemoryContent.cpp \ - qpid/broker/LazyLoadedContent.cpp \ + qpid/broker/Message.cpp \ qpid/broker/MessageBuilder.cpp \ + qpid/broker/MessageDelivery.cpp \ qpid/broker/MessageHandlerImpl.cpp \ qpid/broker/MessageStoreModule.cpp \ qpid/broker/NameGenerator.cpp \ @@ -191,7 +191,6 @@ libqpidbroker_la_SOURCES = \ qpid/broker/RecoveryManagerImpl.cpp \ qpid/broker/RecoveredEnqueue.cpp \ qpid/broker/RecoveredDequeue.cpp \ - qpid/broker/Reference.cpp \ qpid/broker/SessionState.h \ qpid/broker/SuspendedSessions.h \ qpid/broker/SuspendedSessions.cpp \ @@ -222,7 +221,6 @@ libqpidclient_la_SOURCES = \ qpid/client/FutureCompletion.cpp \ qpid/client/FutureResponse.cpp \ qpid/client/FutureFactory.cpp \ - qpid/client/ReceivedContent.cpp \ qpid/client/SessionCore.cpp \ qpid/client/StateManager.cpp @@ -232,10 +230,7 @@ nobase_include_HEADERS = \ qpid/broker/AccumulatedAck.h \ qpid/broker/BrokerChannel.h \ qpid/broker/BrokerExchange.h \ - qpid/broker/BrokerMessage.h \ - qpid/broker/BrokerMessageBase.h \ qpid/broker/BrokerQueue.h \ - qpid/broker/CompletionHandler.h \ qpid/broker/Consumer.h \ qpid/broker/Deliverable.h \ qpid/broker/DeliverableMessage.h \ @@ -252,8 +247,10 @@ nobase_include_HEADERS = \ qpid/broker/ExchangeRegistry.h \ qpid/broker/FanOutExchange.h \ qpid/broker/HandlerImpl.h \ - qpid/broker/InMemoryContent.h \ + qpid/broker/Message.h \ + qpid/broker/MessageAdapter.h \ qpid/broker/MessageBuilder.h \ + qpid/broker/MessageDelivery.h \ qpid/broker/MessageHandlerImpl.h \ qpid/broker/MessageStoreModule.h \ qpid/broker/NameGenerator.h \ @@ -269,23 +266,19 @@ nobase_include_HEADERS = \ qpid/broker/RecoveryManager.h \ qpid/broker/RecoveredEnqueue.h \ qpid/broker/RecoveredDequeue.h \ - qpid/broker/Reference.h \ qpid/broker/TxBuffer.h \ qpid/broker/TxOp.h \ qpid/broker/TxPublish.h \ qpid/broker/Broker.h \ qpid/broker/BrokerAdapter.h \ - qpid/broker/BrokerMessageMessage.h \ qpid/broker/BrokerSingleton.h \ qpid/broker/Connection.h \ qpid/broker/ConnectionAdapter.h \ qpid/broker/ConnectionFactory.h \ qpid/broker/ConnectionToken.h \ - qpid/broker/Content.h \ qpid/broker/Daemon.h \ qpid/broker/DeliveryRecord.h \ qpid/broker/HeadersExchange.h \ - qpid/broker/LazyLoadedContent.h \ qpid/broker/MessageStore.h \ qpid/broker/PersistableExchange.h \ qpid/broker/PersistableMessage.h \ @@ -316,7 +309,6 @@ nobase_include_HEADERS = \ qpid/client/FutureCompletion.h \ qpid/client/FutureResponse.h \ qpid/client/FutureFactory.h \ - qpid/client/ReceivedContent.h \ qpid/client/Response.h \ qpid/client/SessionCore.h \ qpid/client/StateManager.h \ @@ -334,6 +326,8 @@ nobase_include_HEADERS = \ qpid/framing/FieldTable.h \ qpid/framing/FrameDefaultVisitor.h \ qpid/framing/FramingContent.h \ + qpid/framing/FrameSet.h \ + qpid/framing/frame_functors.h \ qpid/framing/HeaderProperties.h \ qpid/framing/InitiationHandler.h \ qpid/framing/InputHandler.h \ @@ -343,10 +337,12 @@ nobase_include_HEADERS = \ qpid/framing/ProtocolVersion.h \ qpid/framing/ProtocolVersionException.h \ qpid/framing/Proxy.h \ - qpid/framing/SerializeHandler.h \ + qpid/framing/SendContent.h \ qpid/framing/SequenceNumber.h \ qpid/framing/SequenceNumberSet.h \ + qpid/framing/SerializeHandler.h \ qpid/framing/StructHelper.h \ + qpid/framing/TypeFilter.h \ qpid/framing/Value.h \ qpid/framing/Visitor.h \ qpid/framing/Uuid.h \ diff --git a/qpid/cpp/src/qpid/broker/AccumulatedAck.h b/qpid/cpp/src/qpid/broker/AccumulatedAck.h index be01c5e02c..b53f4a8ba5 100644 --- a/qpid/cpp/src/qpid/broker/AccumulatedAck.h +++ b/qpid/cpp/src/qpid/broker/AccumulatedAck.h @@ -48,13 +48,12 @@ namespace qpid { class AccumulatedAck { public: /** - * If not zero, then everything up to this value has been - * acked. + * Everything up to this value has been acked. */ DeliveryId mark; /** - * List of individually acked messages that are not - * included in the range marked by 'range'. + * List of individually acked messages greater than the + * 'mark'. */ std::list<Range> ranges; diff --git a/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp b/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp index b733f77390..07b7b4f638 100644 --- a/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp +++ b/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp @@ -21,6 +21,7 @@ #include "BrokerChannel.h" #include "Connection.h" #include "DeliveryToken.h" +#include "MessageDelivery.h" #include "qpid/framing/AMQMethodBody.h" #include "qpid/Exception.h" @@ -327,7 +328,7 @@ void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/, //need to generate name here, so we have it for the adapter (it is //also version specific behaviour now) if (newTag.empty()) newTag = tagGenerator.generate(); - DeliveryToken::shared_ptr token(BasicMessage::createConsumeToken(newTag)); + DeliveryToken::shared_ptr token(MessageDelivery::getBasicConsumeToken(newTag)); channel.consume(token, newTag, queue, noLocal, !noAck, exclusive, &fields); if(!nowait) client.consumeOk(newTag); @@ -340,21 +341,9 @@ void BrokerAdapter::BasicHandlerImpl::cancel(const string& consumerTag){ channel.cancel(consumerTag); } -void BrokerAdapter::BasicHandlerImpl::publish(uint16_t /*ticket*/, - const string& exchangeName, const string& routingKey, - bool rejectUnroutable, bool immediate) -{ - - // exeption moved to ChannelAdaptor -- TODO this code should be removed once basic is removed - - BasicMessage* msg = new BasicMessage(&connection, exchangeName, routingKey, rejectUnroutable, immediate); - channel.handlePublish(msg); - -} - void BrokerAdapter::BasicHandlerImpl::get(uint16_t /*ticket*/, const string& queueName, bool noAck){ Queue::shared_ptr queue = getQueue(queueName); - DeliveryToken::shared_ptr token(BasicMessage::createGetToken(queue)); + DeliveryToken::shared_ptr token(MessageDelivery::getBasicGetToken(queue)); if(!channel.get(token, queue, !noAck)){ string clusterId;//not used, part of an imatix hack @@ -384,7 +373,7 @@ void BrokerAdapter::TxHandlerImpl::select() void BrokerAdapter::TxHandlerImpl::commit() { - channel.commit(); + channel.commit(&broker.getStore()); } void BrokerAdapter::TxHandlerImpl::rollback() diff --git a/qpid/cpp/src/qpid/broker/BrokerAdapter.h b/qpid/cpp/src/qpid/broker/BrokerAdapter.h index 99b7f14525..9e0cf64b7f 100644 --- a/qpid/cpp/src/qpid/broker/BrokerAdapter.h +++ b/qpid/cpp/src/qpid/broker/BrokerAdapter.h @@ -183,9 +183,6 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations bool noLocal, bool noAck, bool exclusive, bool nowait, const qpid::framing::FieldTable& fields); void cancel(const std::string& consumerTag); - void publish(uint16_t ticket, - const std::string& exchange, const std::string& routingKey, - bool rejectUnroutable, bool immediate); void get(uint16_t ticket, const std::string& queue, bool noAck); void ack(uint64_t deliveryTag, bool multiple); void reject(uint64_t deliveryTag, bool requeue); diff --git a/qpid/cpp/src/qpid/broker/BrokerChannel.cpp b/qpid/cpp/src/qpid/broker/BrokerChannel.cpp index 9712b3903f..615a26beab 100644 --- a/qpid/cpp/src/qpid/broker/BrokerChannel.cpp +++ b/qpid/cpp/src/qpid/broker/BrokerChannel.cpp @@ -32,13 +32,12 @@ #include "BrokerAdapter.h" #include "BrokerChannel.h" -#include "BrokerMessage.h" #include "BrokerQueue.h" #include "Connection.h" #include "DeliverableMessage.h" #include "DtxAck.h" #include "DtxTimeout.h" -#include "MessageStore.h" +#include "Message.h" #include "TxAck.h" #include "TxPublish.h" @@ -49,7 +48,7 @@ using namespace qpid::framing; using namespace qpid::sys; -Channel::Channel(Connection& con, DeliveryAdapter& _out, ChannelId _id, MessageStore* const _store) : +Channel::Channel(Connection& con, DeliveryAdapter& _out, ChannelId _id) : id(_id), connection(con), out(_out), @@ -58,8 +57,6 @@ Channel::Channel(Connection& con, DeliveryAdapter& _out, ChannelId _id, MessageS tagGenerator("sgen"), dtxSelected(false), accumulatedAck(0), - store(_store), - messageBuilder(this, _store, connection.getStagingThreshold()), opened(id == 0),//channel 0 is automatically open, other must be explicitly opened flowActive(true) { @@ -108,7 +105,7 @@ void Channel::startTx() txBuffer = TxBuffer::shared_ptr(new TxBuffer()); } -void Channel::commit() +void Channel::commit(MessageStore* const store) { if (!txBuffer) throw ConnectionException(503, "Channel has not been selected for use with transactions"); @@ -296,34 +293,7 @@ void Channel::ConsumerImpl::requestDispatch() queue->requestDispatch(); } -void Channel::handleInlineTransfer(Message::shared_ptr msg) -{ - complete(msg); -} - -void Channel::handlePublish(Message* _message) -{ - Message::shared_ptr message(_message); - messageBuilder.initialise(message); -} - -void Channel::handleHeader(AMQHeaderBody* header) -{ - messageBuilder.setHeader(header); - //at this point, decide based on the size of the message whether we want - //to stage it by saving content directly to disk as it arrives -} - -void Channel::handleContent(AMQContentBody* content) -{ - messageBuilder.addContent(content); -} - -void Channel::handleHeartbeat(AMQHeartbeatBody*) { - // TODO aconway 2007-01-17: Implement heartbeating. -} - -void Channel::complete(Message::shared_ptr msg) { +void Channel::handle(Message::shared_ptr msg) { if (txBuffer.get()) { TxPublish* deliverable(new TxPublish(msg)); TxOp::shared_ptr op(deliverable); @@ -335,20 +305,12 @@ void Channel::complete(Message::shared_ptr msg) { } } - - void Channel::route(Message::shared_ptr msg, Deliverable& strategy) { - - std::string routeToExchangeName = msg->getExchange(); - // cache the exchange lookup - if (!cacheExchange.get() || cacheExchangeName != routeToExchangeName){ - cacheExchangeName = routeToExchangeName; - cacheExchange = connection.broker.getExchanges().get(routeToExchangeName); + std::string exchangeName = msg->getExchangeName(); + if (!cacheExchange || cacheExchange->getName() != exchangeName){ + cacheExchange = connection.broker.getExchanges().get(exchangeName); } - if (!cacheExchange.get() ) - throw ChannelException(404, "Exchange not found '" + routeToExchangeName + "'"); - cacheExchange->route(strategy, msg->getRoutingKey(), &(msg->getApplicationHeaders())); if (!strategy.delivered) { diff --git a/qpid/cpp/src/qpid/broker/BrokerChannel.h b/qpid/cpp/src/qpid/broker/BrokerChannel.h index fcfcd73679..cdbab37ebc 100644 --- a/qpid/cpp/src/qpid/broker/BrokerChannel.h +++ b/qpid/cpp/src/qpid/broker/BrokerChannel.h @@ -37,14 +37,12 @@ #include "Deliverable.h" #include "DtxBuffer.h" #include "DtxManager.h" -#include "MessageBuilder.h" #include "NameGenerator.h" #include "Prefetch.h" #include "TxBuffer.h" #include "qpid/framing/amqp_types.h" #include "qpid/framing/ChannelAdapter.h" #include "qpid/framing/ChannelOpenBody.h" -#include "CompletionHandler.h" namespace qpid { namespace broker { @@ -60,7 +58,7 @@ using framing::string; * Maintains state for an AMQP channel. Handles incoming and * outgoing messages for that channel. */ -class Channel : public CompletionHandler +class Channel { class ConsumerImpl : public Consumer { @@ -113,25 +111,22 @@ class Channel : public CompletionHandler DtxBuffer::shared_ptr dtxBuffer; bool dtxSelected; AccumulatedAck accumulatedAck; - MessageStore* const store; - MessageBuilder messageBuilder;//builder for in-progress message bool opened; bool flowActive; - std::string cacheExchangeName; // pair holds last exchange used for routing - Exchange::shared_ptr cacheExchange; + boost::shared_ptr<Exchange> cacheExchange; void route(Message::shared_ptr msg, Deliverable& strategy); - void complete(Message::shared_ptr msg);// completion handler for MessageBuilder void record(const DeliveryRecord& delivery); bool checkPrefetch(Message::shared_ptr& msg); void checkDtxTimeout(); ConsumerImpl& find(const std::string& destination); void ack(DeliveryId deliveryTag, DeliveryId endTag, bool cumulative); void acknowledged(const DeliveryRecord&); - + + public: - Channel(Connection& parent, DeliveryAdapter& out, framing::ChannelId id, MessageStore* const store = 0); + Channel(Connection& parent, DeliveryAdapter& out, framing::ChannelId id); ~Channel(); bool isOpen() const { return opened; } @@ -162,7 +157,7 @@ class Channel : public CompletionHandler bool get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected); void close(); void startTx(); - void commit(); + void commit(MessageStore* const store); void rollback(); void selectDtx(); void startDtx(const std::string& xid, DtxManager& mgr, bool join); @@ -174,12 +169,8 @@ class Channel : public CompletionHandler void recover(bool requeue); void flow(bool active); void deliver(Message::shared_ptr& msg, const string& consumerTag, DeliveryId deliveryTag); - void handlePublish(Message* msg); - void handleHeader(framing::AMQHeaderBody*); - void handleContent(framing::AMQContentBody*); - void handleHeartbeat(framing::AMQHeartbeatBody*); - - void handleInlineTransfer(Message::shared_ptr msg); + + void handle(Message::shared_ptr msg); }; }} // namespace broker diff --git a/qpid/cpp/src/qpid/broker/BrokerExchange.h b/qpid/cpp/src/qpid/broker/BrokerExchange.h index 91c295e1b7..c3dd7b998d 100644 --- a/qpid/cpp/src/qpid/broker/BrokerExchange.h +++ b/qpid/cpp/src/qpid/broker/BrokerExchange.h @@ -51,7 +51,7 @@ namespace qpid { : name(_name), durable(_durable), args(_args), alternateUsers(0), persistenceId(0){} virtual ~Exchange(){} - string getName() const { return name; } + const string& getName() const { return name; } bool isDurable() { return durable; } qpid::framing::FieldTable& getArgs() { return args; } diff --git a/qpid/cpp/src/qpid/broker/BrokerMessage.cpp b/qpid/cpp/src/qpid/broker/BrokerMessage.cpp deleted file mode 100644 index bddd5802cf..0000000000 --- a/qpid/cpp/src/qpid/broker/BrokerMessage.cpp +++ /dev/null @@ -1,299 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <boost/cast.hpp> - -#include "BrokerMessage.h" -#include <iostream> - -#include "InMemoryContent.h" -#include "LazyLoadedContent.h" -#include "MessageStore.h" -#include "BrokerQueue.h" -#include "qpid/log/Statement.h" -#include "qpid/framing/BasicDeliverBody.h" -#include "qpid/framing/BasicGetOkBody.h" -#include "qpid/framing/BasicPublishBody.h" -#include "qpid/framing/AMQContentBody.h" -#include "qpid/framing/AMQHeaderBody.h" -#include "qpid/framing/AMQMethodBody.h" -#include "qpid/framing/AMQFrame.h" -#include "qpid/framing/ChannelAdapter.h" -#include "RecoveryManagerImpl.h" - -namespace qpid{ -namespace broker{ - -struct BasicGetToken : DeliveryToken -{ - typedef boost::shared_ptr<BasicGetToken> shared_ptr; - - Queue::shared_ptr queue; - - BasicGetToken(Queue::shared_ptr q) : queue(q) {} -}; - -struct BasicConsumeToken : DeliveryToken -{ - typedef boost::shared_ptr<BasicConsumeToken> shared_ptr; - - const string consumer; - - BasicConsumeToken(const string c) : consumer(c) {} -}; - -} -} - -using namespace boost; -using namespace qpid::broker; -using namespace qpid::framing; -using namespace qpid::sys; - -BasicMessage::BasicMessage( - const ConnectionToken* const _publisher, - const string& _exchange, const string& _routingKey, - bool _mandatory, bool _immediate -) : - Message(_publisher, _exchange, _routingKey, _mandatory, _immediate), - size(0) -{} - -// For tests only. -BasicMessage::BasicMessage() : isHeaderSet(false), size(0) {} - -BasicMessage::~BasicMessage(){} - -void BasicMessage::setHeader(AMQHeaderBody* _header){ - if (_header) { - this->header = *_header; - isHeaderSet = true; - } - else - isHeaderSet = false; -} - -void BasicMessage::addContent(AMQContentBody* data){ - if (!content.get()) { - content = std::auto_ptr<Content>(new InMemoryContent()); - } - content->add(data); - size += data->size(); -} - -bool BasicMessage::isComplete(){ - return isHeaderSet && (header.getContentSize() == contentSize()); -} - -DeliveryToken::shared_ptr BasicMessage::createGetToken(Queue::shared_ptr queue) -{ - return DeliveryToken::shared_ptr(new BasicGetToken(queue)); -} - -DeliveryToken::shared_ptr BasicMessage::createConsumeToken(const string& consumer) -{ - return DeliveryToken::shared_ptr(new BasicConsumeToken(consumer)); -} - -void BasicMessage::deliver(ChannelAdapter& channel, - const string& consumerTag, DeliveryId id, - uint32_t framesize) -{ - channel.send(BasicDeliverBody( - channel.getVersion(), consumerTag, id.getValue(), - getRedelivered(), getExchange(), getRoutingKey())); - sendContent(channel, framesize); -} - -void BasicMessage::sendGetOk(ChannelAdapter& channel, - uint32_t messageCount, - DeliveryId id, - uint32_t framesize) -{ - channel.send( - BasicGetOkBody( - channel.getVersion(), - id.getValue(), getRedelivered(), getExchange(), - getRoutingKey(), messageCount)); - sendContent(channel, framesize); -} - -void BasicMessage::deliver(framing::ChannelAdapter& channel, DeliveryId id, DeliveryToken::shared_ptr token, uint32_t framesize) -{ - BasicConsumeToken::shared_ptr consume = dynamic_pointer_cast<BasicConsumeToken>(token); - if (consume) { - deliver(channel, consume->consumer, id, framesize); - } else { - BasicGetToken::shared_ptr get = dynamic_pointer_cast<BasicGetToken>(token); - if (get) { - sendGetOk(channel, get->queue->getMessageCount(), id.getValue(), framesize); - } else { - //TODO: - //either need to be able to convert to a message transfer or - //throw error of some kind to allow this to be handled higher up - throw Exception("Conversion to BasicMessage not defined!"); - } - } -} - -void BasicMessage::sendContent(ChannelAdapter& channel, uint32_t framesize) -{ - channel.send(header); - Mutex::ScopedLock locker(contentLock); - if (content.get()) - content->send(channel, framesize); -} - -BasicHeaderProperties* BasicMessage::getHeaderProperties(){ - return isHeaderSet ? dynamic_cast<BasicHeaderProperties*>(header.getProperties()) : 0; -} - -const FieldTable& BasicMessage::getApplicationHeaders(){ - return getHeaderProperties()->getHeaders(); -} - -bool BasicMessage::isPersistent() -{ - if(!isHeaderSet) return false; - BasicHeaderProperties* props = getHeaderProperties(); - return props && props->getDeliveryMode() == PERSISTENT; -} - -void BasicMessage::decode(Buffer& buffer, bool headersOnly, uint32_t contentChunkSize) -{ - decodeHeader(buffer); - if (!headersOnly) decodeContent(buffer, contentChunkSize); -} - -void BasicMessage::decodeHeader(Buffer& buffer) -{ - //don't care about the type here, but want encode/decode to be symmetric - RecoveryManagerImpl::decodeMessageType(buffer); - - string exchange; - string routingKey; - - buffer.getShortString(exchange); - buffer.getShortString(routingKey); - setRouting(exchange, routingKey); - - uint32_t headerSize = buffer.getLong(); - AMQHeaderBody headerBody; - headerBody.decode(buffer, headerSize); - setHeader(&headerBody); -} - -void BasicMessage::decodeContent(Buffer& buffer, uint32_t chunkSize) -{ - uint64_t expected = expectedContentSize(); - if (expected != buffer.available()) { - QPID_LOG(error, "Expected " << expectedContentSize() << " bytes, got " << buffer.available()); - throw Exception("Cannot decode content, buffer not large enough."); - } - - if (!chunkSize || chunkSize > expected) { - chunkSize = expected; - } - - uint64_t total = 0; - while (total < expectedContentSize()) { - uint64_t remaining = expected - total; - AMQContentBody contentBody; - contentBody.decode(buffer, remaining < chunkSize ? remaining : chunkSize); - addContent(&contentBody); - total += chunkSize; - } -} - -void BasicMessage::encode(Buffer& buffer) const -{ - encodeHeader(buffer); - encodeContent(buffer); -} - -void BasicMessage::encodeHeader(Buffer& buffer) const -{ - RecoveryManagerImpl::encodeMessageType(*this, buffer); - buffer.putShortString(getExchange()); - buffer.putShortString(getRoutingKey()); - buffer.putLong(header.size()); - header.encode(buffer); -} - -void BasicMessage::encodeContent(Buffer& buffer) const -{ - Mutex::ScopedLock locker(contentLock); - if (content.get()) content->encode(buffer); -} - -uint32_t BasicMessage::encodedSize() const -{ - return encodedHeaderSize() + encodedContentSize(); -} - -uint32_t BasicMessage::encodedContentSize() const -{ - Mutex::ScopedLock locker(contentLock); - return content.get() ? content->size() : 0; -} - -uint32_t BasicMessage::encodedHeaderSize() const -{ - return RecoveryManagerImpl::encodedMessageTypeSize() - +getExchange().size() + 1 - + getRoutingKey().size() + 1 - + header.size() + 4;//4 extra bytes for size -} - -uint64_t BasicMessage::expectedContentSize() -{ - return isHeaderSet ? header.getContentSize() : 0; -} - -void BasicMessage::releaseContent(MessageStore* store) -{ - Mutex::ScopedLock locker(contentLock); - if (!isPersistent() && getPersistenceId() == 0) { - store->stage(*this); - } - if (!content.get() || content->size() > 0) { - //set content to lazy loading mode (but only if there is - //stored content): - - //Note: the LazyLoadedContent instance contains a raw pointer - //to the message, however it is then set as a member of that - //message so its lifetime is guaranteed to be no longer than - //that of the message itself - content = std::auto_ptr<Content>( - new LazyLoadedContent(store, this, expectedContentSize())); - } -} - -void BasicMessage::setContent(std::auto_ptr<Content>& _content) -{ - Mutex::ScopedLock locker(contentLock); - content = _content; -} - - -uint32_t BasicMessage::getRequiredCredit() const -{ - return header.size() + contentSize(); -} diff --git a/qpid/cpp/src/qpid/broker/BrokerMessage.h b/qpid/cpp/src/qpid/broker/BrokerMessage.h deleted file mode 100644 index 0f46ff2e83..0000000000 --- a/qpid/cpp/src/qpid/broker/BrokerMessage.h +++ /dev/null @@ -1,146 +0,0 @@ -#ifndef _broker_BrokerMessage_h -#define _broker_BrokerMessage_h - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <memory> -#include <boost/shared_ptr.hpp> - -#include "BrokerMessageBase.h" -#include "qpid/framing/BasicHeaderProperties.h" -#include "qpid/framing/AMQHeaderBody.h" -#include "ConnectionToken.h" -#include "Content.h" -#include "qpid/sys/Mutex.h" -#include "TxBuffer.h" - -namespace qpid { - -namespace framing { -class ChannelAdapter; -class AMQHeaderBody; -} - -namespace broker { - -class MessageStore; -class Queue; -using framing::string; - -/** - * Represents an AMQP message, i.e. a header body, a list of - * content bodies and some details about the publication - * request. - */ -class BasicMessage : public Message { - framing::AMQHeaderBody header; - bool isHeaderSet; - std::auto_ptr<Content> content; - mutable sys::Mutex contentLock; - uint64_t size; - - void sendContent(framing::ChannelAdapter&, uint32_t framesize); - - public: - typedef boost::shared_ptr<BasicMessage> shared_ptr; - - BasicMessage(const ConnectionToken* const publisher, - const string& exchange, const string& routingKey, - bool mandatory, bool immediate); - BasicMessage(); - ~BasicMessage(); - void setHeader(framing::AMQHeaderBody* header); - void addContent(framing::AMQContentBody* data); - bool isComplete(); - - static DeliveryToken::shared_ptr createGetToken(boost::shared_ptr<Queue> queue); - static DeliveryToken::shared_ptr createConsumeToken(const string& consumer); - void deliver(framing::ChannelAdapter& channel, DeliveryId deliveryTag, DeliveryToken::shared_ptr token, uint32_t framesize); - - void deliver(framing::ChannelAdapter&, - const string& consumerTag, - DeliveryId deliveryTag, - uint32_t framesize); - - void sendGetOk(framing::ChannelAdapter& channel, - uint32_t messageCount, - DeliveryId deliveryTag, - uint32_t framesize); - - framing::BasicHeaderProperties* getHeaderProperties(); - const framing::FieldTable& getApplicationHeaders(); - bool isPersistent(); - uint64_t contentSize() const { return size; } - - void decode(framing::Buffer& buffer, bool headersOnly = false, - uint32_t contentChunkSize = 0); - void decodeHeader(framing::Buffer& buffer); - void decodeContent(framing::Buffer& buffer, uint32_t contentChunkSize = 0); - - void encode(framing::Buffer& buffer) const; - void encodeHeader(framing::Buffer& buffer) const; - void encodeContent(framing::Buffer& buffer) const; - /** - * @returns the size of the buffer needed to encode this - * message in its entirety - */ - uint32_t encodedSize() const; - /** - * @returns the size of the buffer needed to encode the - * 'header' of this message (not just the header frame, - * but other meta data e.g.routing key and exchange) - */ - uint32_t encodedHeaderSize() const; - /** - * @returns the size of the buffer needed to encode the - * (possibly partial) content held by this message - */ - uint32_t encodedContentSize() const; - /** - * Releases the in-memory content data held by this - * message. Must pass in a store from which the data can - * be reloaded. - */ - void releaseContent(MessageStore* store); - /** - * If headers have been received, returns the expected - * content size else returns 0. - */ - uint64_t expectedContentSize(); - /** - * Sets the 'content' implementation of this message (the - * message controls the lifecycle of the content instance - * it uses). - */ - void setContent(std::auto_ptr<Content>& content); - - /** - * Returns the byte credits required to transfer this message. - */ - uint32_t getRequiredCredit() const; -}; - -} -} - - -#endif /*!_broker_BrokerMessage_h*/ diff --git a/qpid/cpp/src/qpid/broker/BrokerMessageBase.h b/qpid/cpp/src/qpid/broker/BrokerMessageBase.h deleted file mode 100644 index bac5dc6386..0000000000 --- a/qpid/cpp/src/qpid/broker/BrokerMessageBase.h +++ /dev/null @@ -1,168 +0,0 @@ -#ifndef _broker_BrokerMessageBase_h -#define _broker_BrokerMessageBase_h - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <string> -#include <boost/shared_ptr.hpp> -#include "Content.h" -#include "DeliveryId.h" -#include "DeliveryToken.h" -#include "PersistableMessage.h" -#include "qpid/framing/amqp_types.h" - -namespace qpid { - -namespace framing { -class ChannelAdapter; -class BasicHeaderProperties; -class FieldTable; -class AMQMethodBody; -class AMQContentBody; -class AMQHeaderBody; -} - - -namespace broker { -class ConnectionToken; -class MessageStore; - -/** - * Base class for all types of internal broker messages - * abstracting away the operations - * TODO; AMS: for the moment this is mostly a placeholder - */ -class Message : public PersistableMessage{ - public: - typedef boost::shared_ptr<Message> shared_ptr; - - Message(const ConnectionToken* publisher_, - const std::string& _exchange, - const std::string& _routingKey, - bool _mandatory, bool _immediate) : - publisher(publisher_), - exchange(_exchange), - routingKey(_routingKey), - mandatory(_mandatory), - immediate(_immediate), - persistenceId(0), - redelivered(false) - {} - - Message() : - mandatory(false), - immediate(false), - persistenceId(0), - redelivered(false) - {} - - virtual ~Message() {}; - - // Accessors - const std::string& getRoutingKey() const { return routingKey; } - const std::string& getExchange() const { return exchange; } - uint64_t getPersistenceId() const { return persistenceId; } - bool getRedelivered() const { return redelivered; } - - void setRouting(const std::string& _exchange, const std::string& _routingKey) - { exchange = _exchange; routingKey = _routingKey; } - void setPersistenceId(uint64_t _persistenceId) const { persistenceId = _persistenceId; } - void redeliver() { redelivered = true; } - - virtual void deliver(framing::ChannelAdapter& channel, DeliveryId deliveryTag/*only needed for basic class*/, - DeliveryToken::shared_ptr token, uint32_t framesize) = 0; - - virtual bool isComplete() = 0; - - virtual uint64_t contentSize() const = 0; - virtual framing::BasicHeaderProperties* getHeaderProperties() = 0; - virtual const framing::FieldTable& getApplicationHeaders() = 0; - virtual bool isPersistent() = 0; - virtual const ConnectionToken* getPublisher() const { - return publisher; - } - - virtual uint32_t getRequiredCredit() const = 0; - - virtual void encode(framing::Buffer& buffer) const = 0; - virtual void encodeHeader(framing::Buffer& buffer) const = 0; - - /** - * @returns the size of the buffer needed to encode this - * message in its entirety - */ - virtual uint32_t encodedSize() const = 0; - /** - * @returns the size of the buffer needed to encode the - * 'header' of this message (not just the header frame, - * but other meta data e.g.routing key and exchange) - */ - virtual uint32_t encodedHeaderSize() const = 0; - /** - * @returns the size of the buffer needed to encode the - * (possibly partial) content held by this message - */ - virtual uint32_t encodedContentSize() const = 0; - /** - * If headers have been received, returns the expected - * content size else returns 0. - */ - virtual uint64_t expectedContentSize() = 0; - - virtual void decodeHeader(framing::Buffer& buffer) = 0; - virtual void decodeContent(framing::Buffer& buffer, uint32_t contentChunkSize = 0) = 0; - - static shared_ptr decode(framing::Buffer& buffer); - - // TODO: AMS 29/1/2007 Don't think these are really part of base class - - /** - * Sets the 'content' implementation of this message (the - * message controls the lifecycle of the content instance - * it uses). - */ - virtual void setContent(std::auto_ptr<Content>& /*content*/) {}; - virtual void setHeader(framing::AMQHeaderBody*) {}; - virtual void addContent(framing::AMQContentBody*) {}; - /** - * Releases the in-memory content data held by this - * message. Must pass in a store from which the data can - * be reloaded. - */ - virtual void releaseContent(MessageStore* /*store*/) {}; - - bool isImmediate() const { return immediate; } - - private: - const ConnectionToken* publisher; - std::string exchange; - std::string routingKey; - const bool mandatory; - const bool immediate; - mutable uint64_t persistenceId; - bool redelivered; -}; - -}} - - -#endif /*!_broker_BrokerMessage_h*/ diff --git a/qpid/cpp/src/qpid/broker/BrokerMessageMessage.cpp b/qpid/cpp/src/qpid/broker/BrokerMessageMessage.cpp deleted file mode 100644 index 1184885aeb..0000000000 --- a/qpid/cpp/src/qpid/broker/BrokerMessageMessage.cpp +++ /dev/null @@ -1,328 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/QpidError.h" -#include "BrokerMessageMessage.h" -#include "qpid/framing/ChannelAdapter.h" -#include "qpid/framing/MessageTransferBody.h" -#include "qpid/framing/MessageOpenBody.h" -#include "qpid/framing/MessageCloseBody.h" -#include "qpid/framing/MessageAppendBody.h" -#include "Reference.h" -#include "qpid/framing/AMQFrame.h" -#include "qpid/framing/FieldTable.h" -#include "qpid/framing/BasicHeaderProperties.h" -#include "RecoveryManagerImpl.h" - -#include <algorithm> - -using namespace std; -using namespace boost; -using namespace qpid::framing; - -namespace qpid { -namespace broker { - -struct MessageDeliveryToken : public DeliveryToken -{ - const std::string destination; - - MessageDeliveryToken(const std::string& d) : destination(d) {} -}; - -MessageMessage::MessageMessage( - ConnectionToken* publisher, const MessageTransferBody* transfer_ -) : Message(publisher, transfer_->getDestination(), - transfer_->getRoutingKey(), - transfer_->getRejectUnroutable(), - transfer_->getImmediate()), - transfer(*transfer_) -{ - assert(transfer.getBody().isInline()); -} - -MessageMessage::MessageMessage( - ConnectionToken* publisher, const MessageTransferBody* transfer_, - ReferencePtr reference_ -) : Message(publisher, transfer_->getDestination(), - transfer_->getRoutingKey(), - transfer_->getRejectUnroutable(), - transfer_->getImmediate()), - transfer(*transfer_), - reference(reference_) -{ - assert(!transfer.getBody().isInline()); - assert(reference_); -} - -/** - * Currently used by message store impls to recover messages - */ -MessageMessage::MessageMessage() {} - -// TODO: astitcher 1-Mar-2007: This code desperately needs better factoring -void MessageMessage::transferMessage( - framing::ChannelAdapter& channel, - const std::string& consumerTag, - uint32_t framesize) -{ - const framing::Content& body = transfer.getBody(); - // Send any reference data - ReferencePtr ref= getReference(); - if (ref){ - - // Open - channel.send(MessageOpenBody(channel.getVersion(), ref->getId())); - // Appends - for(Reference::Appends::const_iterator a = ref->getAppends().begin(); - a != ref->getAppends().end(); - ++a) { - uint32_t sizeleft = a->size(); - const string& content = a->getBytes(); - // Calculate overhead bytes - // Assume that the overhead is constant as the reference name doesn't change - uint32_t overhead = sizeleft - content.size(); - string::size_type contentStart = 0; - while (sizeleft) { - string::size_type contentSize = sizeleft <= framesize ? sizeleft : framesize-overhead; - - channel.send(MessageAppendBody(channel.getVersion(), ref->getId(), - string(content, contentStart, contentSize))); - sizeleft -= contentSize; - contentStart += contentSize; - } - } - } - - // The transfer - if ( transfer.size()<=framesize ) { - channel.send(MessageTransferBody(ProtocolVersion(), - transfer.getTicket(), - consumerTag, - getRedelivered(), - transfer.getRejectUnroutable(), - transfer.getImmediate(), - transfer.getTtl(), - transfer.getPriority(), - transfer.getTimestamp(), - transfer.getDeliveryMode(), - transfer.getExpiration(), - getExchange(), - getRoutingKey(), - transfer.getMessageId(), - transfer.getCorrelationId(), - transfer.getReplyTo(), - transfer.getContentType(), - transfer.getContentEncoding(), - 0, /*content-length*/ - string(), /*type*/ - transfer.getUserId(), - transfer.getAppId(), - transfer.getTransactionId(), - transfer.getSecurityToken(), - transfer.getApplicationHeaders(), - body)); - } else { - // Thing to do here is to construct a simple reference message then deliver that instead - // fragmentation will be taken care of in the delivery if necessary; - string content = body.getValue(); - string refname = "dummy"; - MessageTransferBody newTransfer(channel.getVersion(), - transfer.getTicket(), - consumerTag, - getRedelivered(), - transfer.getRejectUnroutable(), - transfer.getImmediate(), - transfer.getTtl(), - transfer.getPriority(), - transfer.getTimestamp(), - transfer.getDeliveryMode(), - transfer.getExpiration(), - getExchange(), - getRoutingKey(), - transfer.getMessageId(), - transfer.getCorrelationId(), - transfer.getReplyTo(), - transfer.getContentType(), - transfer.getContentEncoding(), - 0, /*content-length*/ - string(), /*type*/ - transfer.getUserId(), - transfer.getAppId(), - transfer.getTransactionId(), - transfer.getSecurityToken(), - transfer.getApplicationHeaders(), - framing::Content(REFERENCE, refname)); - ReferencePtr newRef(new Reference(refname)); - newRef->append(MessageAppendBody(channel.getVersion(), refname, content)); - MessageMessage newMsg(const_cast<ConnectionToken*>(getPublisher()), &newTransfer, newRef); - newMsg.transferMessage(channel, consumerTag, framesize); - return; - } - // Close any reference data - if (ref) - channel.send(MessageCloseBody(ProtocolVersion(), ref->getId())); -} - - -void MessageMessage::deliver(ChannelAdapter& channel, DeliveryId, DeliveryToken::shared_ptr token, uint32_t framesize) -{ - transferMessage(channel, shared_polymorphic_cast<MessageDeliveryToken>(token)->destination, framesize); -} - -void MessageMessage::deliver(ChannelAdapter& channel, const std::string& destination, uint32_t framesize) -{ - transferMessage(channel, destination, framesize); -} - -bool MessageMessage::isComplete() -{ - return true; -} - -uint64_t MessageMessage::contentSize() const -{ - if (transfer.getBody().isInline()) - return transfer.getBody().getValue().size(); - else { - assert(getReference()); - return getReference()->getSize(); - } -} - -qpid::framing::BasicHeaderProperties* MessageMessage::getHeaderProperties() -{ - return 0; // FIXME aconway 2007-02-05: -} - -const FieldTable& MessageMessage::getApplicationHeaders() -{ - return transfer.getApplicationHeaders(); -} -bool MessageMessage::isPersistent() -{ - return transfer.getDeliveryMode() == PERSISTENT; -} - -uint32_t MessageMessage::encodedSize() const -{ - return encodedHeaderSize() + encodedContentSize(); -} - -uint32_t MessageMessage::encodedHeaderSize() const -{ - return RecoveryManagerImpl::encodedMessageTypeSize() + transfer.size(); -} - -uint32_t MessageMessage::encodedContentSize() const -{ - return 0; -} - -uint64_t MessageMessage::expectedContentSize() -{ - return 0; -} - -void MessageMessage::encode(Buffer& buffer) const -{ - encodeHeader(buffer); -} - -void MessageMessage::encodeHeader(Buffer& buffer) const -{ - RecoveryManagerImpl::encodeMessageType(*this, buffer); - if (transfer.getBody().isInline()) { - transfer.encode(buffer); - } else { - assert(getReference()); - string data; - const Reference::Appends& appends = getReference()->getAppends(); - for(Reference::Appends::const_iterator a = appends.begin(); a != appends.end(); ++a) { - data += a->getBytes(); - } - framing::Content body(INLINE, data); - copyTransfer(ProtocolVersion(), transfer.getDestination(), body).encode(buffer); - } -} - -void MessageMessage::decodeHeader(Buffer& buffer) -{ - //don't care about the type here, but want encode/decode to be symmetric - RecoveryManagerImpl::decodeMessageType(buffer); - transfer.decode(buffer); -} - -void MessageMessage::decodeContent(Buffer& /*buffer*/, uint32_t /*chunkSize*/) -{ -} - - -MessageTransferBody MessageMessage::copyTransfer(const ProtocolVersion& version, - const string& destination, - const framing::Content& body) const -{ - return MessageTransferBody(version, - transfer.getTicket(), - destination, - getRedelivered(), - transfer.getRejectUnroutable(), - transfer.getImmediate(), - transfer.getTtl(), - transfer.getPriority(), - transfer.getTimestamp(), - transfer.getDeliveryMode(), - transfer.getExpiration(), - getExchange(), - getRoutingKey(), - transfer.getMessageId(), - transfer.getCorrelationId(), - transfer.getReplyTo(), - transfer.getContentType(), - transfer.getContentEncoding(), - 0, /*content-length*/ - string(), /*type*/ - transfer.getUserId(), - transfer.getAppId(), - transfer.getTransactionId(), - transfer.getSecurityToken(), - transfer.getApplicationHeaders(), - body); -} - -MessageMessage::ReferencePtr MessageMessage::getReference() const { - return reference; -} - -uint32_t MessageMessage::getRequiredCredit() const -{ - //TODO: change when encoding changes. Should be the payload of any - //header & body frames. - return transfer.size(); -} - - -DeliveryToken::shared_ptr MessageMessage::getToken(const std::string& destination) -{ - return DeliveryToken::shared_ptr(new MessageDeliveryToken(destination)); -} - -}} // namespace qpid::broker - diff --git a/qpid/cpp/src/qpid/broker/BrokerMessageMessage.h b/qpid/cpp/src/qpid/broker/BrokerMessageMessage.h deleted file mode 100644 index 6bfd0e045d..0000000000 --- a/qpid/cpp/src/qpid/broker/BrokerMessageMessage.h +++ /dev/null @@ -1,90 +0,0 @@ -#ifndef _broker_BrokerMessageMessage_h -#define _broker_BrokerMessageMessage_h - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "BrokerMessageBase.h" -#include "qpid/framing/MessageTransferBody.h" -#include "qpid/framing/amqp_types.h" -#include <boost/weak_ptr.hpp> -#include <vector> - -namespace qpid { - -namespace broker { -class ConnectionToken; -class Reference; - -class MessageMessage: public Message{ - public: - typedef boost::shared_ptr<MessageMessage> shared_ptr; - typedef boost::shared_ptr<Reference> ReferencePtr; - - MessageMessage(ConnectionToken* publisher, const framing::MessageTransferBody* transfer); - MessageMessage(ConnectionToken* publisher, const framing::MessageTransferBody* transfer, ReferencePtr reference); - MessageMessage(); - - // Default destructor okay - - framing::MessageTransferBody* getTransfer() const { return const_cast<framing::MessageTransferBody*>(&transfer); } - ReferencePtr getReference() const ; - - void deliver(framing::ChannelAdapter& channel, DeliveryId deliveryTag, DeliveryToken::shared_ptr token, uint32_t framesize); - void deliver(framing::ChannelAdapter&, const std::string& destination, uint32_t framesize); - - bool isComplete(); - - uint64_t contentSize() const; - framing::BasicHeaderProperties* getHeaderProperties(); - const framing::FieldTable& getApplicationHeaders(); - bool isPersistent(); - - void encode(framing::Buffer& buffer) const; - void encodeHeader(framing::Buffer& buffer) const; - uint32_t encodedSize() const; - uint32_t encodedHeaderSize() const; - uint32_t encodedContentSize() const; - uint64_t expectedContentSize(); - void decodeHeader(framing::Buffer& buffer); - void decodeContent(framing::Buffer& buffer, uint32_t contentChunkSize = 0); - uint32_t getRequiredCredit() const; - - static DeliveryToken::shared_ptr getToken(const std::string& destination); - - private: - void transferMessage( - framing::ChannelAdapter& channel, - const std::string& consumerTag, - uint32_t framesize); - - framing::MessageTransferBody copyTransfer( - const framing::ProtocolVersion& version, - const std::string& destination, - const framing::Content& body) const; - - framing::MessageTransferBody transfer; - const boost::shared_ptr<Reference> reference; -}; - -}} - - -#endif /*!_broker_BrokerMessage_h*/ diff --git a/qpid/cpp/src/qpid/broker/BrokerQueue.cpp b/qpid/cpp/src/qpid/broker/BrokerQueue.cpp index 5ff9f950eb..7311d043d0 100644 --- a/qpid/cpp/src/qpid/broker/BrokerQueue.cpp +++ b/qpid/cpp/src/qpid/broker/BrokerQueue.cpp @@ -88,7 +88,7 @@ void Queue::deliver(Message::shared_ptr& msg){ void Queue::recover(Message::shared_ptr& msg){ push(msg); msg->enqueueComplete(); // mark the message as enqueued - if (store && msg->expectedContentSize() != msg->encodedContentSize()) { + if (store && !msg->isContentLoaded()) { //content has not been loaded, need to ensure that lazy loading mode is set: //TODO: find a nicer way to do this msg->releaseContent(store); diff --git a/qpid/cpp/src/qpid/broker/BrokerQueue.h b/qpid/cpp/src/qpid/broker/BrokerQueue.h index 962c11d8ee..5ba103d3ed 100644 --- a/qpid/cpp/src/qpid/broker/BrokerQueue.h +++ b/qpid/cpp/src/qpid/broker/BrokerQueue.h @@ -28,7 +28,7 @@ #include "qpid/framing/amqp_types.h" #include "ConnectionToken.h" #include "Consumer.h" -#include "BrokerMessage.h" +#include "Message.h" #include "qpid/framing/FieldTable.h" #include "qpid/sys/Serializer.h" #include "qpid/sys/Monitor.h" @@ -43,6 +43,7 @@ namespace qpid { namespace broker { class MessageStore; class QueueRegistry; + class TransactionContext; class Exchange; /** diff --git a/qpid/cpp/src/qpid/broker/CompletionHandler.h b/qpid/cpp/src/qpid/broker/CompletionHandler.h deleted file mode 100644 index 9d51656282..0000000000 --- a/qpid/cpp/src/qpid/broker/CompletionHandler.h +++ /dev/null @@ -1,39 +0,0 @@ -#ifndef _broker_CompletionHandler_h -#define _broker_CompletionHandler_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -namespace qpid { -namespace broker { - -/** - * Callback interface to handle completion of a message. - */ -class CompletionHandler -{ - public: - virtual ~CompletionHandler(){} - virtual void complete(Message::shared_ptr) = 0; -}; - -}} // namespace qpid::broker - - - -#endif /*!_broker_CompletionHandler_h*/ diff --git a/qpid/cpp/src/qpid/broker/Consumer.h b/qpid/cpp/src/qpid/broker/Consumer.h index d0c397d184..dc229947b9 100644 --- a/qpid/cpp/src/qpid/broker/Consumer.h +++ b/qpid/cpp/src/qpid/broker/Consumer.h @@ -21,7 +21,7 @@ #ifndef _Consumer_ #define _Consumer_ -#include "BrokerMessage.h" +#include "Message.h" namespace qpid { namespace broker { diff --git a/qpid/cpp/src/qpid/broker/Content.h b/qpid/cpp/src/qpid/broker/Content.h deleted file mode 100644 index 97dce0d3f7..0000000000 --- a/qpid/cpp/src/qpid/broker/Content.h +++ /dev/null @@ -1,64 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _Content_ -#define _Content_ - -#include <boost/function.hpp> - -#include "qpid/framing/AMQContentBody.h" -#include "qpid/framing/Buffer.h" -#include "qpid/framing/OutputHandler.h" - -namespace qpid { - -namespace framing { -class ChannelAdapter; -} - -namespace broker { -class Content{ - public: - typedef std::string DataBlock; - typedef boost::function1<void, const DataBlock&> SendFn; - - virtual ~Content(){} - - /** Add a block of data to the content */ - virtual void add(framing::AMQContentBody* data) = 0; - - /** Total size of content in bytes */ - virtual uint32_t size() = 0; - - /** - * Iterate over the content calling SendFn for each block. - * Subdivide blocks if necessary to ensure each block is - * <= framesize bytes long. - */ - virtual void send(framing::ChannelAdapter& channel, uint32_t framesize) = 0; - - //FIXME aconway 2007-02-07: This is inconsistently implemented - //find out what is needed. - virtual void encode(qpid::framing::Buffer& buffer) = 0; -}; -}} - - -#endif diff --git a/qpid/cpp/src/qpid/broker/DeliverableMessage.h b/qpid/cpp/src/qpid/broker/DeliverableMessage.h index e8c4f5ba19..9719d972fc 100644 --- a/qpid/cpp/src/qpid/broker/DeliverableMessage.h +++ b/qpid/cpp/src/qpid/broker/DeliverableMessage.h @@ -22,8 +22,8 @@ #define _DeliverableMessage_ #include "Deliverable.h" -#include "BrokerMessage.h" #include "BrokerQueue.h" +#include "Message.h" namespace qpid { namespace broker { diff --git a/qpid/cpp/src/qpid/broker/DeliveryAdapter.h b/qpid/cpp/src/qpid/broker/DeliveryAdapter.h index d59c4769d7..f645b37c23 100644 --- a/qpid/cpp/src/qpid/broker/DeliveryAdapter.h +++ b/qpid/cpp/src/qpid/broker/DeliveryAdapter.h @@ -21,9 +21,9 @@ #ifndef _DeliveryAdapter_ #define _DeliveryAdapter_ -#include "BrokerMessageBase.h" #include "DeliveryId.h" #include "DeliveryToken.h" +#include "Message.h" #include "qpid/framing/amqp_types.h" namespace qpid { diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.h b/qpid/cpp/src/qpid/broker/DeliveryRecord.h index 745a246c78..a1f82cb757 100644 --- a/qpid/cpp/src/qpid/broker/DeliveryRecord.h +++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.h @@ -25,10 +25,10 @@ #include <list> #include <ostream> #include "AccumulatedAck.h" -#include "BrokerMessage.h" -#include "Prefetch.h" #include "BrokerQueue.h" #include "DeliveryId.h" +#include "Message.h" +#include "Prefetch.h" namespace qpid { namespace broker { diff --git a/qpid/cpp/src/qpid/broker/DirectExchange.h b/qpid/cpp/src/qpid/broker/DirectExchange.h index 554be295bf..7b20bd610c 100644 --- a/qpid/cpp/src/qpid/broker/DirectExchange.h +++ b/qpid/cpp/src/qpid/broker/DirectExchange.h @@ -25,7 +25,6 @@ #include <vector> #include "BrokerExchange.h" #include "qpid/framing/FieldTable.h" -#include "BrokerMessage.h" #include "qpid/sys/Monitor.h" #include "BrokerQueue.h" diff --git a/qpid/cpp/src/qpid/broker/FanOutExchange.h b/qpid/cpp/src/qpid/broker/FanOutExchange.h index 3cbffc6f2f..070e438bcc 100644 --- a/qpid/cpp/src/qpid/broker/FanOutExchange.h +++ b/qpid/cpp/src/qpid/broker/FanOutExchange.h @@ -25,7 +25,6 @@ #include <vector> #include "BrokerExchange.h" #include "qpid/framing/FieldTable.h" -#include "BrokerMessage.h" #include "qpid/sys/Monitor.h" #include "BrokerQueue.h" diff --git a/qpid/cpp/src/qpid/broker/HeadersExchange.h b/qpid/cpp/src/qpid/broker/HeadersExchange.h index a99cc1c92c..48d115c1ec 100644 --- a/qpid/cpp/src/qpid/broker/HeadersExchange.h +++ b/qpid/cpp/src/qpid/broker/HeadersExchange.h @@ -24,7 +24,6 @@ #include <vector> #include "BrokerExchange.h" #include "qpid/framing/FieldTable.h" -#include "BrokerMessage.h" #include "qpid/sys/Monitor.h" #include "BrokerQueue.h" diff --git a/qpid/cpp/src/qpid/broker/InMemoryContent.cpp b/qpid/cpp/src/qpid/broker/InMemoryContent.cpp deleted file mode 100644 index d69dcfafe7..0000000000 --- a/qpid/cpp/src/qpid/broker/InMemoryContent.cpp +++ /dev/null @@ -1,70 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "InMemoryContent.h" -#include "qpid/framing/AMQFrame.h" -#include "qpid/framing/ChannelAdapter.h" - -using namespace qpid::broker; -using namespace qpid::framing; -using boost::static_pointer_cast; - -void InMemoryContent::add(AMQContentBody* data) -{ - content.push_back(*data); -} - -uint32_t InMemoryContent::size() -{ - int sum(0); - for (content_iterator i = content.begin(); i != content.end(); i++) { - sum += i->size(); - } - return sum; -} - -void InMemoryContent::send(ChannelAdapter& channel, uint32_t framesize) -{ - for (content_iterator i = content.begin(); i != content.end(); i++) { - if (i->size() > framesize) { - uint32_t offset = 0; - for (int chunk = i->size() / framesize; chunk > 0; chunk--) { - string data = i->getData().substr(offset, framesize); - channel.send(AMQContentBody(data)); - offset += framesize; - } - uint32_t remainder = i->size() % framesize; - if (remainder) { - string data = i->getData().substr(offset, remainder); - channel.send(AMQContentBody(data)); - } - } else { - channel.send(*i); - } - } -} - -void InMemoryContent::encode(Buffer& buffer) -{ - for (content_iterator i = content.begin(); i != content.end(); i++) { - i->encode(buffer); - } -} - diff --git a/qpid/cpp/src/qpid/broker/LazyLoadedContent.cpp b/qpid/cpp/src/qpid/broker/LazyLoadedContent.cpp deleted file mode 100644 index b8b5b37f45..0000000000 --- a/qpid/cpp/src/qpid/broker/LazyLoadedContent.cpp +++ /dev/null @@ -1,68 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "LazyLoadedContent.h" -#include "qpid/framing/AMQFrame.h" -#include "qpid/framing/ChannelAdapter.h" - -using namespace qpid::broker; -using namespace qpid::framing; - -LazyLoadedContent::~LazyLoadedContent() -{ - store->destroy(*msg); -} - -LazyLoadedContent::LazyLoadedContent(MessageStore* const _store, Message* const _msg, uint64_t _expectedSize) : - store(_store), msg(_msg), expectedSize(_expectedSize) {} - -void LazyLoadedContent::add(AMQContentBody* data) -{ - store->appendContent(*msg, data->getData()); -} - -uint32_t LazyLoadedContent::size() -{ - return 0;//all content is written as soon as it is added -} - -void LazyLoadedContent::send(ChannelAdapter& channel, uint32_t framesize) -{ - if (expectedSize > framesize) { - for (uint64_t offset = 0; offset < expectedSize; offset += framesize) - { - uint64_t remaining = expectedSize - offset; - string data; - store->loadContent(*msg, data, offset, - remaining > framesize ? framesize : remaining); - channel.send(AMQContentBody(data)); - } - } else { - string data; - store->loadContent(*msg, data, 0, expectedSize); - channel.send(AMQContentBody(data)); - } -} - -void LazyLoadedContent::encode(Buffer&) -{ - //do nothing as all content is written as soon as it is added -} - diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp new file mode 100644 index 0000000000..e5f92297b7 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/Message.cpp @@ -0,0 +1,195 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Message.h" +#include "ExchangeRegistry.h" +#include "qpid/framing/frame_functors.h" +#include "qpid/framing/BasicPublishBody.h" +#include "qpid/framing/FieldTable.h" +#include "qpid/framing/MessageTransferBody.h" +#include "qpid/framing/SendContent.h" +#include "qpid/framing/SequenceNumber.h" +#include "qpid/framing/TypeFilter.h" + +using namespace qpid::broker; +using namespace qpid::framing; +using std::string; + +TransferAdapter Message::TRANSFER; +PublishAdapter Message::PUBLISH; + +Message::Message(const SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), publisher(0), store(0), adapter(0) {} + +const std::string& Message::getRoutingKey() const +{ + return getAdapter().getRoutingKey(frames); +} + +const std::string& Message::getExchangeName() const +{ + return getAdapter().getExchange(frames); +} + +const boost::shared_ptr<Exchange> Message::getExchange(ExchangeRegistry& registry) const +{ + if (!exchange) { + exchange = registry.get(getExchangeName()); + } + return exchange; +} + +bool Message::isImmediate() const +{ + return getAdapter().isImmediate(frames); +} + +const FieldTable& Message::getApplicationHeaders() const +{ + return getAdapter().getApplicationHeaders(frames); +} + +bool Message::isPersistent() +{ + return getAdapter().isPersistent(frames); +} + +uint32_t Message::getRequiredCredit() const +{ + //add up payload for all header and content frames in the frameset + SumBodySize sum; + frames.map_if(sum, TypeFilter(METHOD_BODY, HEADER_BODY)); + return sum.getSize(); +} + +void Message::encode(framing::Buffer& buffer) const +{ + //encode method and header frames + EncodeFrame f1(buffer); + frames.map_if(f1, TypeFilter(METHOD_BODY, HEADER_BODY)); + + //then encode the payload of each content frame + EncodeBody f2(buffer); + frames.map_if(f2, TypeFilter(CONTENT_BODY)); +} + +uint32_t Message::encodedSize() const +{ + return encodedHeaderSize() + encodedContentSize(); +} + +uint32_t Message::encodedContentSize() const +{ + return frames.getContentSize(); +} + +uint32_t Message::encodedHeaderSize() const +{ + //add up the size for all method and header frames in the frameset + SumFrameSize sum; + frames.map_if(sum, TypeFilter(METHOD_BODY, HEADER_BODY)); + return sum.getSize(); +} + +void Message::decodeHeader(framing::Buffer& buffer) +{ + AMQFrame method; + method.decode(buffer); + frames.append(method); + + AMQFrame header; + header.decode(buffer); + frames.append(header); +} + +void Message::decodeContent(framing::Buffer& buffer) +{ + //get the data as a string and set that as the content + //body on a frame then add that frame to the frameset + AMQFrame frame; + frame.setBody(AMQContentBody()); + frame.castBody<AMQContentBody>()->decode(buffer, buffer.available()); + frames.append(frame); +} + +void Message::releaseContent(MessageStore* _store) +{ + store = _store; + if (!getPersistenceId()) { + store->stage(*this); + } + //remove any content frames from the frameset + frames.remove(TypeFilter(CONTENT_BODY)); +} + +void Message::sendContent(framing::FrameHandler& out, uint16_t channel, uint16_t maxFrameSize) +{ + if (isContentReleased()) { + //load content from store in chunks of maxContentSize + uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead(); + uint64_t expectedSize(frames.getHeaders()->getContentLength());//TODO: how do we know how much data to load? + for (uint64_t offset = 0; offset < expectedSize; offset += maxContentSize) + { + uint64_t remaining = expectedSize - offset; + AMQFrame frame(channel, AMQContentBody()); + string& data = frame.castBody<AMQContentBody>()->getData(); + + store->loadContent(*this, data, offset, + remaining > maxContentSize ? maxContentSize : remaining); + out.handle(frame); + } + + } else { + SendContent f(out, channel, maxFrameSize); + frames.map_if(f, TypeFilter(CONTENT_BODY)); + } +} + +void Message::sendHeader(framing::FrameHandler& out, uint16_t channel, uint16_t /*maxFrameSize*/) +{ + Relay f(out, channel); + frames.map_if(f, TypeFilter(HEADER_BODY)); +} + +MessageAdapter& Message::getAdapter() const +{ + if (!adapter) { + if (frames.isA<BasicPublishBody>()) { + adapter = &PUBLISH; + } else if(frames.isA<MessageTransferBody>()) { + adapter = &TRANSFER; + } else { + const AMQMethodBody* method = frames.getMethod(); + if (!method) throw Exception("Can't adapt message with no method"); + else throw Exception(QPID_MSG("Can't adapt message based on " << *method)); + } + } + return *adapter; +} + +uint64_t Message::contentSize() const +{ + return frames.getContentSize(); +} + +bool Message::isContentLoaded() const +{ + return contentSize() > 0; +} diff --git a/qpid/cpp/src/qpid/broker/Message.h b/qpid/cpp/src/qpid/broker/Message.h new file mode 100644 index 0000000000..95b3f38b55 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/Message.h @@ -0,0 +1,139 @@ +#ifndef _broker_Message_h +#define _broker_Message_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <string> +#include <boost/shared_ptr.hpp> +#include <boost/variant.hpp> +#include "PersistableMessage.h" +#include "MessageAdapter.h" +#include "qpid/framing/amqp_types.h" + +namespace qpid { + +namespace framing { +class FieldTable; +class SequenceNumber; +} + +namespace broker { +class ConnectionToken; +class Exchange; +class ExchangeRegistry; +class MessageStore; + +class Message : public PersistableMessage { +public: + typedef boost::shared_ptr<Message> shared_ptr; + + Message(const framing::SequenceNumber& id = framing::SequenceNumber()); + + uint64_t getPersistenceId() const { return persistenceId; } + void setPersistenceId(uint64_t _persistenceId) const { persistenceId = _persistenceId; } + + bool getRedelivered() const { return redelivered; } + void redeliver() { redelivered = true; } + + const ConnectionToken* getPublisher() const { return publisher; } + void setPublisher(ConnectionToken* p) { publisher = p; } + + uint64_t contentSize() const; + + const std::string& getRoutingKey() const; + const boost::shared_ptr<Exchange> getExchange(ExchangeRegistry&) const; + const std::string& getExchangeName() const; + bool isImmediate() const; + const framing::FieldTable& getApplicationHeaders() const; + bool isPersistent(); + + framing::FrameSet& getFrames() { return frames; } + const framing::FrameSet& getFrames() const { return frames; } + + template <class T> T* getProperties() { + return frames.getHeaders()->get<T>(true); + } + + template <class T> const T* getProperties() const { + return frames.getHeaders()->get<T>(); + } + + template <class T> const T* getMethod() const { + return frames.as<T>(); + } + + template <class T> bool isA() const { + return frames.isA<T>(); + } + + uint32_t getRequiredCredit() const; + + void encode(framing::Buffer& buffer) const; + + /** + * @returns the size of the buffer needed to encode this + * message in its entirety + */ + uint32_t encodedSize() const; + /** + * @returns the size of the buffer needed to encode the + * 'header' of this message (not just the header frame, + * but other meta data e.g.routing key and exchange) + */ + uint32_t encodedHeaderSize() const; + uint32_t encodedContentSize() const; + + void decodeHeader(framing::Buffer& buffer); + void decodeContent(framing::Buffer& buffer); + + /** + * Releases the in-memory content data held by this + * message. Must pass in a store from which the data can + * be reloaded. + */ + void releaseContent(MessageStore* store); + + void sendContent(framing::FrameHandler& out, uint16_t channel, uint16_t maxFrameSize); + void sendHeader(framing::FrameHandler& out, uint16_t channel, uint16_t maxFrameSize); + + bool isContentLoaded() const; + + private: + framing::FrameSet frames; + mutable boost::shared_ptr<Exchange> exchange; + mutable uint64_t persistenceId; + bool redelivered; + ConnectionToken* publisher; + MessageStore* store; + mutable MessageAdapter* adapter; + + static TransferAdapter TRANSFER; + static PublishAdapter PUBLISH; + + MessageAdapter& getAdapter() const; + bool isContentReleased() { return store; } +}; + +}} + + +#endif diff --git a/qpid/cpp/src/qpid/broker/MessageAdapter.h b/qpid/cpp/src/qpid/broker/MessageAdapter.h new file mode 100644 index 0000000000..0b2dc6307a --- /dev/null +++ b/qpid/cpp/src/qpid/broker/MessageAdapter.h @@ -0,0 +1,108 @@ +#ifndef _broker_MessageAdapter_h +#define _broker_MessageAdapter_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <string> +#include "qpid/framing/BasicPublishBody.h" +#include "qpid/framing/BasicHeaderProperties.h" +#include "qpid/framing/FieldTable.h" +#include "qpid/framing/FrameSet.h" +#include "qpid/framing/DeliveryProperties.h" +#include "qpid/framing/MessageProperties.h" +#include "qpid/framing/MessageTransferBody.h" + +namespace qpid { +namespace broker { + +struct MessageAdapter +{ + virtual ~MessageAdapter() {} + + virtual const std::string& getRoutingKey(const framing::FrameSet& f) = 0; + virtual const std::string& getExchange(const framing::FrameSet& f) = 0; + virtual bool isImmediate(const framing::FrameSet& f) = 0; + virtual const framing::FieldTable& getApplicationHeaders(const framing::FrameSet& f) = 0; + virtual bool isPersistent(const framing::FrameSet& f) = 0; +}; + +struct PublishAdapter : MessageAdapter +{ + const std::string& getRoutingKey(const framing::FrameSet& f) + { + return f.as<framing::BasicPublishBody>()->getRoutingKey(); + } + + const std::string& getExchange(const framing::FrameSet& f) + { + return f.as<framing::BasicPublishBody>()->getExchange(); + } + + bool isImmediate(const framing::FrameSet& f) + { + return f.as<framing::BasicPublishBody>()->getImmediate(); + } + + const framing::FieldTable& getApplicationHeaders(const framing::FrameSet& f) + { + return f.getHeaders()->get<framing::BasicHeaderProperties>()->getHeaders(); + } + + bool isPersistent(const framing::FrameSet& f) + { + return f.getHeaders()->get<framing::BasicHeaderProperties>()->getDeliveryMode() == 2; + } +}; + +struct TransferAdapter : MessageAdapter +{ + const std::string& getRoutingKey(const framing::FrameSet& f) + { + return f.getHeaders()->get<framing::DeliveryProperties>()->getRoutingKey(); + } + + const std::string& getExchange(const framing::FrameSet& f) + { + return f.as<framing::MessageTransferBody>()->getDestination(); + } + + bool isImmediate(const framing::FrameSet&) + { + //TODO: we seem to have lost the immediate flag + return false; + } + + const framing::FieldTable& getApplicationHeaders(const framing::FrameSet& f) + { + return f.getHeaders()->get<framing::MessageProperties>()->getApplicationHeaders(); + } + + bool isPersistent(const framing::FrameSet& f) + { + return f.getHeaders()->get<framing::DeliveryProperties>()->getDeliveryMode() == 2; + } +}; + +}} + + +#endif diff --git a/qpid/cpp/src/qpid/broker/MessageBuilder.cpp b/qpid/cpp/src/qpid/broker/MessageBuilder.cpp index f19927b708..1a84aa9b65 100644 --- a/qpid/cpp/src/qpid/broker/MessageBuilder.cpp +++ b/qpid/cpp/src/qpid/broker/MessageBuilder.cpp @@ -20,55 +20,64 @@ */ #include "MessageBuilder.h" -#include "InMemoryContent.h" -#include "LazyLoadedContent.h" +#include "Message.h" +#include "MessageStore.h" +#include "qpid/Exception.h" +#include "qpid/framing/AMQFrame.h" using namespace qpid::broker; using namespace qpid::framing; -using std::auto_ptr; -MessageBuilder::MessageBuilder(CompletionHandler* _handler, - MessageStore* const _store, - uint64_t _stagingThreshold -) : - handler(_handler), - store(_store), - stagingThreshold(_stagingThreshold) -{} +MessageBuilder::MessageBuilder(MessageStore* const _store, uint64_t _stagingThreshold) : + state(DORMANT), store(_store), stagingThreshold(_stagingThreshold), staging(false) {} -void MessageBuilder::route(){ - if (message->isComplete()) { - if (handler) handler->complete(message); - message.reset(); +void MessageBuilder::handle(AMQFrame& frame) +{ + switch(state) { + case METHOD: + checkType(METHOD_BODY, frame.getBody()->type()); + state = HEADER; + break; + case HEADER: + checkType(HEADER_BODY, frame.getBody()->type()); + state = CONTENT; + break; + case CONTENT: + checkType(CONTENT_BODY, frame.getBody()->type()); + break; + default: + throw ConnectionException(504, "Invalid frame sequence for message."); + } + if (staging) { + store->appendContent(*message, frame.castBody<AMQContentBody>()->getData()); + } else { + message->getFrames().append(frame); + //have we reached the staging limit? if so stage message and release content + if (state == CONTENT && stagingThreshold && message->getFrames().getContentSize() >= stagingThreshold) { + store->stage(*message); + message->releaseContent(store); + staging = true; + } } } -void MessageBuilder::initialise(Message::shared_ptr& msg){ - if(message.get()){ - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got publish before previous content was completed."); +void MessageBuilder::checkType(uint8_t expected, uint8_t actual) +{ + if (expected != actual) { + throw ConnectionException(504, "Invalid frame sequence for message."); } - message = msg; } -void MessageBuilder::setHeader(AMQHeaderBody* header){ - if(!message.get()){ - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got header before publish."); - } - message->setHeader(header); - if (stagingThreshold && header->getContentSize() >= stagingThreshold) { - store->stage(*message); - message->releaseContent(store); - } else { - auto_ptr<Content> content(new InMemoryContent()); - message->setContent(content); - } - route(); +void MessageBuilder::end() +{ + message.reset(); + state = DORMANT; + staging = false; } -void MessageBuilder::addContent(AMQContentBody* content){ - if(!message.get()){ - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got content before publish."); - } - message->addContent(content); - route(); +void MessageBuilder::start(const SequenceNumber& id) +{ + message = Message::shared_ptr(new Message(id)); + state = METHOD; + staging = false; } diff --git a/qpid/cpp/src/qpid/broker/MessageBuilder.h b/qpid/cpp/src/qpid/broker/MessageBuilder.h index 18e85d7383..134f93b68f 100644 --- a/qpid/cpp/src/qpid/broker/MessageBuilder.h +++ b/qpid/cpp/src/qpid/broker/MessageBuilder.h @@ -21,37 +21,35 @@ #ifndef _MessageBuilder_ #define _MessageBuilder_ -#include <memory> -#include "qpid/QpidError.h" -#include "BrokerExchange.h" -#include "BrokerMessage.h" -#include "MessageStore.h" -#include "qpid/framing/AMQContentBody.h" -#include "qpid/framing/AMQHeaderBody.h" -#include "qpid/framing/BasicPublishBody.h" -#include "CompletionHandler.h" +#include "boost/shared_ptr.hpp" +#include "qpid/framing/FrameHandler.h" +#include "qpid/framing/SequenceNumber.h" namespace qpid { namespace broker { - class MessageBuilder{ + class Message; + class MessageStore; + + class MessageBuilder : public framing::FrameHandler{ public: - MessageBuilder(CompletionHandler* _handler, - MessageStore* const store = 0, - uint64_t stagingThreshold = 0); - void initialise(Message::shared_ptr& msg); - void setHeader(framing::AMQHeaderBody* header); - void addContent(framing::AMQContentBody* content); - Message::shared_ptr getMessage() { return message; } + MessageBuilder(MessageStore* const store = 0, uint64_t stagingThreshold = 0); + void handle(framing::AMQFrame& frame); + boost::shared_ptr<Message> getMessage() { return message; } + void start(const framing::SequenceNumber& id); + void end(); private: - Message::shared_ptr message; - CompletionHandler* handler; + enum State {DORMANT, METHOD, HEADER, CONTENT}; + State state; + boost::shared_ptr<Message> message; MessageStore* const store; const uint64_t stagingThreshold; + bool staging; - void route(); + void checkType(uint8_t expected, uint8_t actual); }; } } #endif + diff --git a/qpid/cpp/src/qpid/broker/MessageDelivery.cpp b/qpid/cpp/src/qpid/broker/MessageDelivery.cpp new file mode 100644 index 0000000000..09ab8ec465 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/MessageDelivery.cpp @@ -0,0 +1,140 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "MessageDelivery.h" + +#include "DeliveryToken.h" +#include "Message.h" +#include "BrokerQueue.h" +#include "qpid/framing/ChannelAdapter.h" +#include "qpid/framing/BasicDeliverBody.h" +#include "qpid/framing/BasicGetOkBody.h" +#include "qpid/framing/MessageTransferBody.h" + + +using namespace boost; +using namespace qpid::broker; +using namespace qpid::framing; + +namespace qpid{ +namespace broker{ + +struct BaseToken : DeliveryToken +{ + virtual ~BaseToken() {} + virtual void sendMethod(Message::shared_ptr msg, ChannelAdapter& channel, DeliveryId id) = 0; +}; + +struct BasicGetToken : BaseToken +{ + typedef boost::shared_ptr<BasicGetToken> shared_ptr; + + Queue::shared_ptr queue; + + BasicGetToken(Queue::shared_ptr q) : queue(q) {} + + void sendMethod(Message::shared_ptr msg, ChannelAdapter& channel, DeliveryId id) + { + channel.send(BasicGetOkBody( + channel.getVersion(), id.getValue(), msg->getRedelivered(), msg->getExchangeName(), + msg->getRoutingKey(), queue->getMessageCount())); + + } +}; + +struct BasicConsumeToken : BaseToken +{ + typedef boost::shared_ptr<BasicConsumeToken> shared_ptr; + + const string consumer; + + BasicConsumeToken(const string c) : consumer(c) {} + + void sendMethod(Message::shared_ptr msg, ChannelAdapter& channel, DeliveryId id) + { + channel.send(BasicDeliverBody( + channel.getVersion(), consumer, id.getValue(), + msg->getRedelivered(), msg->getExchangeName(), msg->getRoutingKey())); + } + +}; + +struct MessageDeliveryToken : BaseToken +{ + const std::string destination; + const u_int8_t confirmMode; + const u_int8_t acquireMode; + + MessageDeliveryToken(const std::string& d, u_int8_t c, u_int8_t a) : + destination(d), confirmMode(c), acquireMode(a) {} + + void sendMethod(Message::shared_ptr msg, ChannelAdapter& channel, DeliveryId /*id*/) + { + //TODO; need to figure out how the acquire mode gets + //communicated (this is just a temporary solution) + channel.send(MessageTransferBody(channel.getVersion(), 0, destination, confirmMode, acquireMode)); + + //may need to set the redelivered flag: + if (msg->getRedelivered()){ + msg->getProperties<DeliveryProperties>()->setRedelivered(true); + } + } +}; + +} +} + +DeliveryToken::shared_ptr MessageDelivery::getBasicGetToken(Queue::shared_ptr queue) +{ + return DeliveryToken::shared_ptr(new BasicGetToken(queue)); +} + +DeliveryToken::shared_ptr MessageDelivery::getBasicConsumeToken(const string& consumer) +{ + return DeliveryToken::shared_ptr(new BasicConsumeToken(consumer)); +} + +DeliveryToken::shared_ptr MessageDelivery::getMessageDeliveryToken(const std::string& destination, + u_int8_t confirmMode, u_int8_t acquireMode) +{ + return DeliveryToken::shared_ptr(new MessageDeliveryToken(destination, confirmMode, acquireMode)); +} + +void MessageDelivery::deliver(Message::shared_ptr msg, + framing::ChannelAdapter& channel, + DeliveryId id, + DeliveryToken::shared_ptr token, + uint16_t framesize) +{ + //currently a message published from one class and delivered to + //another may well have the wrong headers; however we will only + //have one content class for 0-10 proper + + //send method + boost::shared_ptr<BaseToken> t = dynamic_pointer_cast<BaseToken>(token); + t->sendMethod(msg, channel, id); + + boost::shared_ptr<FrameHandler> handler = channel.getHandlers().out; + //send header + msg->sendHeader(*handler, channel.getId(), framesize); + + //send content + msg->sendContent(*handler, channel.getId(), framesize); +} diff --git a/qpid/cpp/src/qpid/broker/MessageDelivery.h b/qpid/cpp/src/qpid/broker/MessageDelivery.h new file mode 100644 index 0000000000..b87ef2a5ce --- /dev/null +++ b/qpid/cpp/src/qpid/broker/MessageDelivery.h @@ -0,0 +1,60 @@ +#ifndef _broker_MessageDelivery_h +#define _broker_MessageDelivery_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <boost/shared_ptr.hpp> +#include "DeliveryId.h" + +namespace qpid { + +namespace framing { + +class ChannelAdapter; + +} + +namespace broker { + +class DeliveryToken; +class Message; +class Queue; + +/** + * Encapsulates the different options for message delivery currently supported. + */ +class MessageDelivery { +public: + static boost::shared_ptr<DeliveryToken> getBasicGetToken(boost::shared_ptr<Queue> queue); + static boost::shared_ptr<DeliveryToken> getBasicConsumeToken(const std::string& consumer); + static boost::shared_ptr<DeliveryToken> getMessageDeliveryToken(const std::string& destination, + u_int8_t confirmMode, + u_int8_t acquireMode); + + static void deliver(boost::shared_ptr<Message> msg, framing::ChannelAdapter& channel, + DeliveryId deliveryTag, boost::shared_ptr<DeliveryToken> token, uint16_t framesize); +}; + +} +} + + +#endif /*!_broker_MessageDelivery_h*/ diff --git a/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp index ce1fa1e028..a4ceb77c12 100644 --- a/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp +++ b/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp @@ -22,7 +22,7 @@ #include "qpid/framing/FramingContent.h" #include "Connection.h" #include "Broker.h" -#include "BrokerMessageMessage.h" +#include "MessageDelivery.h" #include "qpid/framing/MessageAppendBody.h" #include "qpid/framing/MessageTransferBody.h" #include "BrokerAdapter.h" @@ -55,7 +55,7 @@ MessageHandlerImpl::open(const string& /*reference*/) } void -MessageHandlerImpl::append(const framing::AMQMethodBody& ) +MessageHandlerImpl::append(const std::string& /*reference*/, const std::string& /*bytes*/) { throw ConnectionException(540, "References no longer supported"); } @@ -92,7 +92,7 @@ MessageHandlerImpl::subscribe(uint16_t /*ticket*/, const string& destination, bool noLocal, u_int8_t confirmMode, - u_int8_t /*acquireMode*/,//TODO: implement acquire modes + u_int8_t acquireMode,//TODO: implement acquire modes bool exclusive, const framing::FieldTable& filter ) { @@ -101,7 +101,8 @@ MessageHandlerImpl::subscribe(uint16_t /*ticket*/, throw ConnectionException(530, "Consumer tags must be unique"); string tag = destination; - channel.consume(MessageMessage::getToken(destination), tag, queue, noLocal, confirmMode == 1, exclusive, &filter); + channel.consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode), + tag, queue, noLocal, confirmMode == 1, exclusive, &filter); // Dispatch messages as there is now a consumer. queue->requestDispatch(); } @@ -115,7 +116,7 @@ MessageHandlerImpl::get(uint16_t /*ticket*/, { Queue::shared_ptr queue = getQueue(queueName); - if (channel.get(MessageMessage::getToken(destination), queue, !noAck)){ + if (channel.get(MessageDelivery::getMessageDeliveryToken(destination, noAck ? 0 : 1, 0), queue, !noAck)){ //don't send any response... rely on execution completion } else { //temporarily disabled: @@ -160,20 +161,6 @@ MessageHandlerImpl::reject(const SequenceNumberSet& /*transfers*/, uint16_t /*co //TODO: implement } -void -MessageHandlerImpl::transfer(const framing::AMQMethodBody& context) -{ - const MessageTransferBody* transfer = boost::polymorphic_downcast<const MessageTransferBody*>(&context); - if (transfer->getBody().isInline()) { - MessageMessage::shared_ptr message(new MessageMessage(&connection, transfer)); - channel.handleInlineTransfer(message); - } else { - throw ConnectionException(540, "References no longer supported"); - } -} - - - void MessageHandlerImpl::flow(const std::string& destination, u_int8_t unit, u_int32_t value) { diff --git a/qpid/cpp/src/qpid/broker/MessageHandlerImpl.h b/qpid/cpp/src/qpid/broker/MessageHandlerImpl.h index f4d9fa0c76..35d34bf94e 100644 --- a/qpid/cpp/src/qpid/broker/MessageHandlerImpl.h +++ b/qpid/cpp/src/qpid/broker/MessageHandlerImpl.h @@ -23,7 +23,6 @@ #include "qpid/framing/AMQP_ServerOperations.h" #include "qpid/framing/AMQP_ClientProxy.h" -#include "Reference.h" #include "HandlerImpl.h" namespace qpid { @@ -40,7 +39,7 @@ class MessageHandlerImpl : public: MessageHandlerImpl(CoreRefs& parent); - void append(const framing::AMQMethodBody& context); + void append(const std::string& reference, const std::string& bytes); void cancel(const std::string& destination ); @@ -75,8 +74,6 @@ class MessageHandlerImpl : void resume(const std::string& reference, const std::string& identifier ); - void transfer(const framing::AMQMethodBody& context); - void flow(const std::string& destination, u_int8_t unit, u_int32_t value); void flowMode(const std::string& destination, u_int8_t mode); @@ -98,8 +95,6 @@ class MessageHandlerImpl : bool exclusive, const framing::FieldTable& filter); - private: - ReferenceRegistry references; }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/MessageStoreModule.h b/qpid/cpp/src/qpid/broker/MessageStoreModule.h index 0da12a1a75..1254c3890b 100644 --- a/qpid/cpp/src/qpid/broker/MessageStoreModule.h +++ b/qpid/cpp/src/qpid/broker/MessageStoreModule.h @@ -21,7 +21,6 @@ #ifndef _MessageStoreModule_ #define _MessageStoreModule_ -#include "BrokerMessage.h" #include "MessageStore.h" #include "BrokerQueue.h" #include "RecoveryManager.h" diff --git a/qpid/cpp/src/qpid/broker/NameGenerator.h b/qpid/cpp/src/qpid/broker/NameGenerator.h index affcedba41..6ea25c9797 100644 --- a/qpid/cpp/src/qpid/broker/NameGenerator.h +++ b/qpid/cpp/src/qpid/broker/NameGenerator.h @@ -21,7 +21,7 @@ #ifndef _NameGenerator_ #define _NameGenerator_ -#include "BrokerMessage.h" +#include <string> namespace qpid { namespace broker { diff --git a/qpid/cpp/src/qpid/broker/NullMessageStore.h b/qpid/cpp/src/qpid/broker/NullMessageStore.h index 0d5a5b55f9..95f55f21b9 100644 --- a/qpid/cpp/src/qpid/broker/NullMessageStore.h +++ b/qpid/cpp/src/qpid/broker/NullMessageStore.h @@ -22,7 +22,6 @@ #define _NullMessageStore_ #include <set> -#include "BrokerMessage.h" #include "MessageStore.h" #include "BrokerQueue.h" diff --git a/qpid/cpp/src/qpid/broker/PersistableExchange.h b/qpid/cpp/src/qpid/broker/PersistableExchange.h index 9ba883cec0..683b740ddc 100644 --- a/qpid/cpp/src/qpid/broker/PersistableExchange.h +++ b/qpid/cpp/src/qpid/broker/PersistableExchange.h @@ -35,7 +35,7 @@ namespace broker { class PersistableExchange : public Persistable { public: - virtual std::string getName() const = 0; + virtual const std::string& getName() const = 0; virtual ~PersistableExchange() {}; }; diff --git a/qpid/cpp/src/qpid/broker/PersistableMessage.h b/qpid/cpp/src/qpid/broker/PersistableMessage.h index e47ca0ae48..06fc59107e 100644 --- a/qpid/cpp/src/qpid/broker/PersistableMessage.h +++ b/qpid/cpp/src/qpid/broker/PersistableMessage.h @@ -34,7 +34,7 @@ namespace broker { * The interface messages must expose to the MessageStore in order to * be persistable. */ - class PersistableMessage : public Persistable +class PersistableMessage : public Persistable { @@ -72,10 +72,11 @@ public: virtual uint32_t encodedHeaderSize() const = 0; virtual ~PersistableMessage() {}; + PersistableMessage(): - enqueueCompleted(false), - asyncCounter(0), - dequeueCompleted(false){}; + enqueueCompleted(false), + asyncCounter(0), + dequeueCompleted(false){}; inline bool isEnqueueComplete() {return enqueueCompleted;}; inline void enqueueComplete() { diff --git a/qpid/cpp/src/qpid/broker/RecoveredDequeue.h b/qpid/cpp/src/qpid/broker/RecoveredDequeue.h index 9e0c334dc3..9dcc9d4233 100644 --- a/qpid/cpp/src/qpid/broker/RecoveredDequeue.h +++ b/qpid/cpp/src/qpid/broker/RecoveredDequeue.h @@ -25,7 +25,7 @@ #include <functional> #include <list> #include "Deliverable.h" -#include "BrokerMessage.h" +#include "Message.h" #include "MessageStore.h" #include "BrokerQueue.h" #include "TxOp.h" diff --git a/qpid/cpp/src/qpid/broker/RecoveredEnqueue.h b/qpid/cpp/src/qpid/broker/RecoveredEnqueue.h index 25c5baf364..a571343e93 100644 --- a/qpid/cpp/src/qpid/broker/RecoveredEnqueue.h +++ b/qpid/cpp/src/qpid/broker/RecoveredEnqueue.h @@ -25,7 +25,7 @@ #include <functional> #include <list> #include "Deliverable.h" -#include "BrokerMessage.h" +#include "Message.h" #include "MessageStore.h" #include "BrokerQueue.h" #include "TxOp.h" diff --git a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp index 954c50faee..29390a6452 100644 --- a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp +++ b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp @@ -20,8 +20,7 @@ */ #include "RecoveryManagerImpl.h" -#include "BrokerMessage.h" -#include "BrokerMessageMessage.h" +#include "Message.h" #include "BrokerQueue.h" #include "RecoveredEnqueue.h" #include "RecoveredDequeue.h" @@ -110,10 +109,7 @@ RecoverableMessage::shared_ptr RecoveryManagerImpl::recoverMessage(framing::Buff { buffer.record(); //peek at type: - Message::shared_ptr message(decodeMessageType(buffer) == MESSAGE ? - ((Message*) new MessageMessage()) : - ((Message*) new BasicMessage())); - buffer.restore(); + Message::shared_ptr message(new Message()); message->decodeHeader(buffer); return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(message, stagingThreshold)); } @@ -131,21 +127,6 @@ void RecoveryManagerImpl::recoveryComplete() //TODO (finalise binding setup etc) } -uint8_t RecoveryManagerImpl::decodeMessageType(framing::Buffer& buffer) -{ - return buffer.getOctet(); -} - -void RecoveryManagerImpl::encodeMessageType(const Message& msg, framing::Buffer& buffer) -{ - buffer.putOctet(dynamic_cast<const MessageMessage*>(&msg) ? MESSAGE : BASIC); -} - -uint32_t RecoveryManagerImpl::encodedMessageTypeSize() -{ - return 1; -} - bool RecoverableMessageImpl::loadContent(uint64_t available) { return !stagingThreshold || available < stagingThreshold; diff --git a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h index bcd71defb1..58ec63926c 100644 --- a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h +++ b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h @@ -45,10 +45,6 @@ namespace broker { RecoverableTransaction::shared_ptr recoverTransaction(const std::string& xid, std::auto_ptr<TPCTransactionContext> txn); void recoveryComplete(); - - static uint8_t decodeMessageType(framing::Buffer& buffer); - static void encodeMessageType(const Message& msg, framing::Buffer& buffer); - static uint32_t encodedMessageTypeSize(); }; diff --git a/qpid/cpp/src/qpid/broker/Reference.cpp b/qpid/cpp/src/qpid/broker/Reference.cpp deleted file mode 100644 index 283b231b60..0000000000 --- a/qpid/cpp/src/qpid/broker/Reference.cpp +++ /dev/null @@ -1,53 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <boost/bind.hpp> -#include "Reference.h" -#include "BrokerMessageMessage.h" -#include "qpid/QpidError.h" -#include "qpid/framing/MessageAppendBody.h" -#include "CompletionHandler.h" - -namespace qpid { -namespace broker { - -Reference::shared_ptr ReferenceRegistry::open(const Reference::Id& id) { - ReferenceMap::iterator i = references.find(id); - if (i != references.end()) - throw ConnectionException(503, "Attempt to re-open reference " +id); - return references[id] = Reference::shared_ptr(new Reference(id, this)); -} - -Reference::shared_ptr ReferenceRegistry::get(const Reference::Id& id) { - ReferenceMap::iterator i = references.find(id); - if (i == references.end()) - throw ConnectionException(503, "Attempt to use non-existent reference "+id); - return i->second; -} - -void Reference::append(const framing::MessageAppendBody& app) { - appends.push_back(app); - size += app.getBytes().length(); -} - -void Reference::close() { - messages.clear(); - registry->references.erase(getId()); -} - -}} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/Reference.h b/qpid/cpp/src/qpid/broker/Reference.h deleted file mode 100644 index 5a373fbeba..0000000000 --- a/qpid/cpp/src/qpid/broker/Reference.h +++ /dev/null @@ -1,115 +0,0 @@ -#ifndef _broker_Reference_h -#define _broker_Reference_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "qpid/framing/MessageAppendBody.h" - -#include <string> -#include <vector> -#include <map> -#include <boost/shared_ptr.hpp> -#include <boost/range.hpp> - -namespace qpid { - -namespace framing { -class MessageAppendBody; -} - -namespace broker { - -class MessageMessage; -class ReferenceRegistry; - -// FIXME aconway 2007-03-27: Merge with client::IncomingMessage -// to common reference handling code. - -/** - * A reference is an accumulation point for data in a multi-frame - * message. A reference can be used by multiple transfer commands to - * create multiple messages, so the reference tracks which commands - * are using it. When the reference is closed, all the associated - * transfers are completed. - * - * THREAD UNSAFE: per-channel resource, access to channels is - * serialized. - */ -class Reference -{ - public: - typedef std::string Id; - typedef boost::shared_ptr<Reference> shared_ptr; - typedef boost::shared_ptr<MessageMessage> MessagePtr; - typedef std::vector<MessagePtr> Messages; - typedef std::vector<framing::MessageAppendBody> Appends; - - Reference(const Id& id_=Id(), ReferenceRegistry* reg=0) - : id(id_), size(0), registry(reg) {} - - const std::string& getId() const { return id; } - uint64_t getSize() const { return size; } - - /** Add a message to be completed with this reference */ - void addMessage(MessagePtr message) { messages.push_back(message); } - - /** Append more data to the reference */ - void append(const framing::MessageAppendBody&); - - /** Close the reference, complete each associated message */ - void close(); - - const Appends& getAppends() const { return appends; } - const Messages& getMessages() const { return messages; } - - private: - Id id; - uint64_t size; - ReferenceRegistry* registry; - Messages messages; - Appends appends; -}; - - -/** - * A registry/factory for references. - * - * THREAD UNSAFE: per-channel resource, access to channels is - * serialized. - */ -class ReferenceRegistry { - public: - ReferenceRegistry() {}; - Reference::shared_ptr open(const Reference::Id& id); - Reference::shared_ptr get(const Reference::Id& id); - - private: - typedef std::map<Reference::Id, Reference::shared_ptr> ReferenceMap; - ReferenceMap references; - - // Reference calls references.erase(). - friend class Reference; -}; - - -}} // namespace qpid::broker - - - -#endif /*!_broker_Reference_h*/ diff --git a/qpid/cpp/src/qpid/broker/SemanticHandler.cpp b/qpid/cpp/src/qpid/broker/SemanticHandler.cpp index f65e450e82..5e9106c1dd 100644 --- a/qpid/cpp/src/qpid/broker/SemanticHandler.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticHandler.cpp @@ -20,7 +20,10 @@ */ #include "SemanticHandler.h" + +#include "boost/format.hpp" #include "BrokerAdapter.h" +#include "MessageDelivery.h" #include "qpid/framing/ChannelAdapter.h" #include "qpid/framing/ChannelCloseOkBody.h" #include "qpid/framing/ExecutionCompleteBody.h" @@ -32,18 +35,16 @@ using namespace qpid::framing; using namespace qpid::sys; SemanticHandler::SemanticHandler(ChannelId id, Connection& c) : - connection(c), - channel(c, *this, id, &c.broker.getStore()) + connection(c), channel(c, *this, id) { init(id, connection.getOutput(), connection.getVersion()); adapter = std::auto_ptr<BrokerAdapter>(new BrokerAdapter(channel, connection, connection.broker, *this)); } - void SemanticHandler::handle(framing::AMQFrame& frame) { - //TODO: assembly etc when move to 0-10 framing - // + //TODO: assembly for method and headers + //have potentially three separate tracks at this point: // // (1) execution controls @@ -51,46 +52,43 @@ void SemanticHandler::handle(framing::AMQFrame& frame) // (3) data i.e. content-bearing commands // //framesets on each can be interleaved. framesets on the latter - //two share a command-id sequence. + //two share a command-id sequence. controls on the first track are + //used to communicate details about that command-id sequence. // //need to decide what to do if a frame on the command track //arrives while a frameset on the data track is still //open. execute it (i.e. out-of order execution with respect to - //the command id sequence) or queue it up. + //the command id sequence) or queue it up? - //if ready to execute (i.e. if segment is complete or frame is - //message content): - handleBody(frame.getBody()); -} - -//ChannelAdapter virtual methods: -void SemanticHandler::handleMethod(framing::AMQMethodBody* method) -{ - try { - if (!method->invoke(this)) { - //temporary hack until channel management is moved to its own handler: - if (method->amqpClassId() != ChannelOpenBody::CLASS_ID) { - ++(incoming.lwm); - } + try{ - //else do the usual: - handleL4(method); - //(if the frameset is complete) we can move the execution-mark - //forward - - //temporary hack until channel management is moved to its own handler: - if (method->amqpClassId() != ChannelOpenBody::CLASS_ID) { - //TODO: need to account for async store opreations - //when this command is a message publication - ++(incoming.hwm); + TrackId track = getTrack(frame);//will be replaced by field in 0-10 frame header + + switch(track) { + case SESSION_CONTROL_TRACK://TODO: L2 should be handled by separate handler + handleL2(frame.castBody<AMQMethodBody>()); + break; + case EXECUTION_CONTROL_TRACK: + handleL3(frame.castBody<AMQMethodBody>()); + break; + case MODEL_COMMAND_TRACK: + if (!isOpen()) { + throw ConnectionException(504, (boost::format("Attempt to use unopened channel: %g") % getId()).str()); } - - //note: need to be more sophisticated than this if we execute - //commands that arrive within an active message frameset (that - //can't happen until 0-10 framing is implemented) + handleCommand(frame.castBody<AMQMethodBody>()); + break; + case MODEL_CONTENT_TRACK: + handleContent(frame); + break; } + + }catch(const ChannelException& e){ + adapter->getProxy().getChannel().close(e.code, e.toString(), getClassId(frame), getMethodId(frame)); + connection.closeChannel(getId()); + }catch(const ConnectionException& e){ + connection.close(e.code, e.toString(), getClassId(frame), getMethodId(frame)); }catch(const std::exception& e){ - connection.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId()); + connection.close(541/*internal error*/, e.what(), getClassId(frame), getMethodId(frame)); } } @@ -102,7 +100,6 @@ void SemanticHandler::complete(uint32_t cumulative, const SequenceNumberSet& ran outgoing.lwm = mark; //ack messages: channel.ackCumulative(mark.getValue()); - //std::cout << "[" << this << "] acknowledged: " << mark << std::endl; } if (range.size() % 2) { //must be even number throw ConnectionException(530, "Received odd number of elements in ranged mark"); @@ -116,7 +113,6 @@ void SemanticHandler::complete(uint32_t cumulative, const SequenceNumberSet& ran void SemanticHandler::flush() { //flush doubles as a sync to begin with - send an execution.complete - incoming.lwm = incoming.hwm; if (isOpen()) { Mutex::ScopedLock l(outLock); ChannelAdapter::send(ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), SequenceNumberSet())); @@ -142,52 +138,59 @@ void SemanticHandler::result(uint32_t /*command*/, const std::string& /*data*/) //never actually sent by client at present } -void SemanticHandler::handleL4(framing::AMQMethodBody* method) +void SemanticHandler::handleCommand(framing::AMQMethodBody* method) { - try{ - if(getId() != 0 && !method->isA<ChannelOpenBody>() && !isOpen()) { - if (!method->isA<ChannelCloseOkBody>()) { - std::stringstream out; - out << "Attempt to use unopened channel: " << getId(); - throw ConnectionException(504, out.str()); - } - } else { - InvocationVisitor v(adapter.get()); - method->accept(v); - if (!v.wasHandled()) { - throw ConnectionException(540, "Not implemented"); - } else if (v.hasResult()) { - ChannelAdapter::send(ExecutionResultBody(getVersion(), incoming.lwm.getValue(), v.getResult())); - } - } - }catch(const ChannelException& e){ - adapter->getProxy().getChannel().close( - e.code, e.toString(), - method->amqpClassId(), method->amqpMethodId()); - connection.closeChannel(getId()); - }catch(const ConnectionException& e){ - connection.close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId()); + ++(incoming.lwm); + InvocationVisitor v(adapter.get()); + method->accept(v); + //TODO: need to account for async store operations and interleaving + ++(incoming.hwm); + + if (!v.wasHandled()) { + throw ConnectionException(540, "Not implemented"); + } else if (v.hasResult()) { + ChannelAdapter::send(ExecutionResultBody(getVersion(), incoming.lwm.getValue(), v.getResult())); } } -bool SemanticHandler::isOpen() const -{ - return channel.isOpen(); +void SemanticHandler::handleL2(framing::AMQMethodBody* method) +{ + if(!method->isA<ChannelOpenBody>() && !isOpen()) { + if (!method->isA<ChannelCloseOkBody>()) { + throw ConnectionException(504, (boost::format("Attempt to use unopened channel: %g") % getId()).str()); + } + } else { + method->invoke(adapter->getChannelHandler()); + } } -void SemanticHandler::handleHeader(qpid::framing::AMQHeaderBody* body) +void SemanticHandler::handleL3(framing::AMQMethodBody* method) { - channel.handleHeader(body); + if (!method->invoke(this)) { + throw ConnectionException(540, "Not implemented"); + } } -void SemanticHandler::handleContent(qpid::framing::AMQContentBody* body) +void SemanticHandler::handleContent(AMQFrame& frame) { - channel.handleContent(body); + Message::shared_ptr msg(msgBuilder.getMessage()); + if (!msg) {//start of frameset will be indicated by frame flags + msgBuilder.start(++(incoming.lwm)); + msg = msgBuilder.getMessage(); + } + msgBuilder.handle(frame); + if (msg->getFrames().isComplete()) {//end of frameset will be indicated by frame flags + msg->setPublisher(&connection); + channel.handle(msg); + msgBuilder.end(); + //TODO: need to account for async store operations and interleaving + ++(incoming.hwm); + } } -void SemanticHandler::handleHeartbeat(qpid::framing::AMQHeartbeatBody* body) -{ - channel.handleHeartbeat(body); +bool SemanticHandler::isOpen() const +{ + return channel.isOpen(); } DeliveryId SemanticHandler::deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token) @@ -195,14 +198,13 @@ DeliveryId SemanticHandler::deliver(Message::shared_ptr& msg, DeliveryToken::sha Mutex::ScopedLock l(outLock); SequenceNumber copy(outgoing.hwm); ++copy; - msg->deliver(*this, copy.getValue(), token, connection.getFrameMax()); - //std::cout << "[" << this << "] delivered: " << outgoing.hwm.getValue() << std::endl; + MessageDelivery::deliver(msg, *this, copy.getValue(), token, connection.getFrameMax()); return outgoing.hwm.getValue(); } void SemanticHandler::redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag) { - msg->deliver(*this, tag, token, connection.getFrameMax()); + MessageDelivery::deliver(msg, *this, tag, token, connection.getFrameMax()); } void SemanticHandler::send(const AMQBody& body) @@ -214,3 +216,49 @@ void SemanticHandler::send(const AMQBody& body) } ChannelAdapter::send(body); } + +uint16_t SemanticHandler::getClassId(const AMQFrame& frame) +{ + return frame.getBody()->type() == METHOD_BODY ? frame.castBody<AMQMethodBody>()->amqpClassId() : 0; +} + +uint16_t SemanticHandler::getMethodId(const AMQFrame& frame) +{ + return frame.getBody()->type() == METHOD_BODY ? frame.castBody<AMQMethodBody>()->amqpMethodId() : 0; +} + +SemanticHandler::TrackId SemanticHandler::getTrack(const AMQFrame& frame) +{ + //will be replaced by field in 0-10 frame header + uint8_t type = frame.getBody()->type(); + uint16_t classId; + switch(type) { + case METHOD_BODY: + if (frame.castBody<AMQMethodBody>()->isContentBearing()) { + return MODEL_CONTENT_TRACK; + } + + classId = frame.castBody<AMQMethodBody>()->amqpClassId(); + switch (classId) { + case ChannelOpenBody::CLASS_ID: + return SESSION_CONTROL_TRACK; + case ExecutionCompleteBody::CLASS_ID: + return EXECUTION_CONTROL_TRACK; + } + + return MODEL_COMMAND_TRACK; + case HEADER_BODY: + case CONTENT_BODY: + return MODEL_CONTENT_TRACK; + } + throw Exception("Could not determine track"); +} + +//ChannelAdapter virtual methods, no longer used: +void SemanticHandler::handleMethod(framing::AMQMethodBody*){} + +void SemanticHandler::handleHeader(qpid::framing::AMQHeaderBody*) {} + +void SemanticHandler::handleContent(qpid::framing::AMQContentBody*) {} + +void SemanticHandler::handleHeartbeat(qpid::framing::AMQHeartbeatBody*) {} diff --git a/qpid/cpp/src/qpid/broker/SemanticHandler.h b/qpid/cpp/src/qpid/broker/SemanticHandler.h index 672c6ad929..611cd3a99b 100644 --- a/qpid/cpp/src/qpid/broker/SemanticHandler.h +++ b/qpid/cpp/src/qpid/broker/SemanticHandler.h @@ -25,6 +25,7 @@ #include "BrokerChannel.h" #include "Connection.h" #include "DeliveryAdapter.h" +#include "MessageBuilder.h" #include "qpid/framing/amqp_types.h" #include "qpid/framing/AMQP_ServerOperations.h" #include "qpid/framing/FrameHandler.h" @@ -55,8 +56,17 @@ class SemanticHandler : private framing::ChannelAdapter, framing::Window incoming; framing::Window outgoing; sys::Mutex outLock; + MessageBuilder msgBuilder; - void handleL4(framing::AMQMethodBody* method); + enum TrackId {SESSION_CONTROL_TRACK, EXECUTION_CONTROL_TRACK, MODEL_COMMAND_TRACK, MODEL_CONTENT_TRACK}; + TrackId getTrack(const framing::AMQFrame& frame); + uint16_t getClassId(const framing::AMQFrame& frame); + uint16_t getMethodId(const framing::AMQFrame& frame); + + void handleL3(framing::AMQMethodBody* method); + void handleL2(framing::AMQMethodBody* method); + void handleCommand(framing::AMQMethodBody* method); + void handleContent(framing::AMQFrame& frame); //ChannelAdapter virtual methods: void handleMethod(framing::AMQMethodBody* method); diff --git a/qpid/cpp/src/qpid/broker/TopicExchange.h b/qpid/cpp/src/qpid/broker/TopicExchange.h index 6536a7c4ce..c411fb1965 100644 --- a/qpid/cpp/src/qpid/broker/TopicExchange.h +++ b/qpid/cpp/src/qpid/broker/TopicExchange.h @@ -25,7 +25,6 @@ #include <vector> #include "BrokerExchange.h" #include "qpid/framing/FieldTable.h" -#include "BrokerMessage.h" #include "qpid/sys/Monitor.h" #include "BrokerQueue.h" diff --git a/qpid/cpp/src/qpid/broker/TxPublish.h b/qpid/cpp/src/qpid/broker/TxPublish.h index 29b1dc38af..564e021c5a 100644 --- a/qpid/cpp/src/qpid/broker/TxPublish.h +++ b/qpid/cpp/src/qpid/broker/TxPublish.h @@ -24,10 +24,10 @@ #include <algorithm> #include <functional> #include <list> +#include "BrokerQueue.h" #include "Deliverable.h" -#include "BrokerMessage.h" +#include "Message.h" #include "MessageStore.h" -#include "BrokerQueue.h" #include "TxOp.h" namespace qpid { diff --git a/qpid/cpp/src/qpid/client/ChannelHandler.cpp b/qpid/cpp/src/qpid/client/ChannelHandler.cpp index b3d720baf0..754b0544c6 100644 --- a/qpid/cpp/src/qpid/client/ChannelHandler.cpp +++ b/qpid/cpp/src/qpid/client/ChannelHandler.cpp @@ -75,7 +75,7 @@ void ChannelHandler::open(uint16_t _id) id = _id; setState(OPENING); - AMQFrame f(version, id, ChannelOpenBody(version)); + AMQFrame f(id, ChannelOpenBody(version)); out(f); std::set<int> states; @@ -90,7 +90,7 @@ void ChannelHandler::open(uint16_t _id) void ChannelHandler::close(uint16_t code, const std::string& message, uint16_t classId, uint16_t methodId) { setState(CLOSING); - AMQFrame f(version, id, ChannelCloseBody(version, code, message, classId, methodId)); + AMQFrame f(id, ChannelCloseBody(version, code, message, classId, methodId)); out(f); } diff --git a/qpid/cpp/src/qpid/client/ClientChannel.cpp b/qpid/cpp/src/qpid/client/ClientChannel.cpp index d1cc4734eb..cc2b7aedc8 100644 --- a/qpid/cpp/src/qpid/client/ClientChannel.cpp +++ b/qpid/cpp/src/qpid/client/ClientChannel.cpp @@ -181,8 +181,8 @@ bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) { if (response.isA<BasicGetEmptyBody>()) { return false; } else { - ReceivedContent::shared_ptr content = gets.pop(); - content->populate(msg); + FrameSet::shared_ptr content = gets.pop(); + msg.populate(*content); return true; } } @@ -232,13 +232,13 @@ void Channel::join() { void Channel::run() { try { while (true) { - ReceivedContent::shared_ptr content = session->get(); + FrameSet::shared_ptr content = session->get(); //need to dispatch this to the relevant listener: if (content->isA<BasicDeliverBody>()) { ConsumerMap::iterator i = consumers.find(content->as<BasicDeliverBody>()->getConsumerTag()); if (i != consumers.end()) { Message msg; - content->populate(msg); + msg.populate(*content); i->second.listener->received(msg); } else { QPID_LOG(warning, "Dropping message for unrecognised consumer: " << content->getMethod()); diff --git a/qpid/cpp/src/qpid/client/ClientChannel.h b/qpid/cpp/src/qpid/client/ClientChannel.h index d73addc950..c355fe007a 100644 --- a/qpid/cpp/src/qpid/client/ClientChannel.h +++ b/qpid/cpp/src/qpid/client/ClientChannel.h @@ -83,7 +83,7 @@ class Channel : private sys::Runnable std::auto_ptr<Session> session; SessionCore::shared_ptr sessionCore; framing::ChannelId channelId; - BlockingQueue<ReceivedContent::shared_ptr> gets; + BlockingQueue<framing::FrameSet::shared_ptr> gets; framing::Uuid uniqueId; uint32_t nameCounter; diff --git a/qpid/cpp/src/qpid/client/ClientMessage.h b/qpid/cpp/src/qpid/client/ClientMessage.h index fd33fbc830..19b0f867bc 100644 --- a/qpid/cpp/src/qpid/client/ClientMessage.h +++ b/qpid/cpp/src/qpid/client/ClientMessage.h @@ -23,8 +23,13 @@ */ #include <string> #include "qpid/framing/BasicHeaderProperties.h" +#include "qpid/framing/FrameSet.h" #include "qpid/framing/MethodContent.h" +#include "qpid/framing/BasicDeliverBody.h" +#include "qpid/framing/BasicGetOkBody.h" +#include "qpid/framing/MessageTransferBody.h" + namespace qpid { namespace client { @@ -55,6 +60,17 @@ class Message : public framing::BasicHeaderProperties, public framing::MethodCon const HeaderProperties& getMethodHeaders() const { return *this; } + + //TODO: move this elsewhere (GRS 24/08/2007) + void populate(framing::FrameSet& frameset) + { + const BasicHeaderProperties* properties = frameset.getHeaders()->get<BasicHeaderProperties>(); + if (properties) { + BasicHeaderProperties::copy<Message, BasicHeaderProperties>(*this, *properties); + } + frameset.getContent(data); + } + private: std::string data; std::string destination; diff --git a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp b/qpid/cpp/src/qpid/client/ConnectionHandler.cpp index 66db9384e2..40e13593ea 100644 --- a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp +++ b/qpid/cpp/src/qpid/client/ConnectionHandler.cpp @@ -109,7 +109,7 @@ void ConnectionHandler::close() void ConnectionHandler::send(const framing::AMQBody& body) { - AMQFrame f(ProtocolVersion(), 0, body); + AMQFrame f(0, body); out(f); } diff --git a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp index e63ac69da6..b4d2156c31 100644 --- a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp @@ -107,7 +107,7 @@ void ConnectionImpl::idleIn() void ConnectionImpl::idleOut() { - AMQFrame frame(version, 0, new AMQHeartbeatBody()); + AMQFrame frame(0, new AMQHeartbeatBody()); connector->send(frame); } diff --git a/qpid/cpp/src/qpid/client/Connector.cpp b/qpid/cpp/src/qpid/client/Connector.cpp index b25f19e4ba..6e12a9c84f 100644 --- a/qpid/cpp/src/qpid/client/Connector.cpp +++ b/qpid/cpp/src/qpid/client/Connector.cpp @@ -180,7 +180,7 @@ void Connector::run(){ inbuf.move(received); inbuf.flip();//position = 0, limit = total data read - AMQFrame frame(version); + AMQFrame frame; while(frame.decode(inbuf)){ QPID_LOG(trace, "RECV: " << frame); input->received(frame); diff --git a/qpid/cpp/src/qpid/client/ExecutionHandler.cpp b/qpid/cpp/src/qpid/client/ExecutionHandler.cpp index 6c2600d00b..d10b3d3fe8 100644 --- a/qpid/cpp/src/qpid/client/ExecutionHandler.cpp +++ b/qpid/cpp/src/qpid/client/ExecutionHandler.cpp @@ -64,9 +64,9 @@ void ExecutionHandler::handle(AMQFrame& frame) if (!invoke(body, this)) { if (isContentFrame(frame)) { if (!arriving) { - arriving = ReceivedContent::shared_ptr(new ReceivedContent(++incoming.hwm)); + arriving = FrameSet::shared_ptr(new FrameSet(++incoming.hwm)); } - arriving->append(body); + arriving->append(frame); if (arriving->isComplete()) { received.push(arriving); arriving.reset(); @@ -123,7 +123,7 @@ void ExecutionHandler::sync() void ExecutionHandler::sendFlush() { - AMQFrame frame(version, 0, ExecutionFlushBody()); + AMQFrame frame(0, ExecutionFlushBody()); out(frame); } @@ -139,8 +139,7 @@ void ExecutionHandler::send(const AMQBody& command, CompletionTracker::Listener correlation.listen(g); } - AMQFrame frame(version, 0/*id will be filled in be channel handler*/, - command); + AMQFrame frame(0/*id will be filled in be channel handler*/, command); out(frame); } @@ -149,10 +148,10 @@ void ExecutionHandler::sendContent(const AMQBody& command, const BasicHeaderProp { send(command, f, g); - AMQHeaderBody header(BASIC); - BasicHeaderProperties::copy(*static_cast<BasicHeaderProperties*>(header.getProperties()), headers); - header.setContentSize(data.size()); - AMQFrame h(version, 0, header); + AMQHeaderBody header; + BasicHeaderProperties::copy(*header.get<BasicHeaderProperties>(true), headers); + header.get<BasicHeaderProperties>(true)->setContentLength(data.size()); + AMQFrame h(0, header); out(h); u_int64_t data_length = data.length(); @@ -160,7 +159,7 @@ void ExecutionHandler::sendContent(const AMQBody& command, const BasicHeaderProp //frame itself uses 8 bytes u_int32_t frag_size = maxFrameSize - 8; if(data_length < frag_size){ - AMQFrame frame(version, 0, AMQContentBody(data)); + AMQFrame frame(0, AMQContentBody(data)); out(frame); }else{ u_int32_t offset = 0; @@ -168,7 +167,7 @@ void ExecutionHandler::sendContent(const AMQBody& command, const BasicHeaderProp while (remaining > 0) { u_int32_t length = remaining > frag_size ? frag_size : remaining; string frag(data.substr(offset, length)); - AMQFrame frame(version, 0, AMQContentBody(frag)); + AMQFrame frame(0, AMQContentBody(frag)); out(frame); offset += length; remaining = data_length - offset; diff --git a/qpid/cpp/src/qpid/client/ExecutionHandler.h b/qpid/cpp/src/qpid/client/ExecutionHandler.h index b409d5df7b..f740e14185 100644 --- a/qpid/cpp/src/qpid/client/ExecutionHandler.h +++ b/qpid/cpp/src/qpid/client/ExecutionHandler.h @@ -23,12 +23,12 @@ #include <queue> #include "qpid/framing/AMQP_ServerOperations.h" +#include "qpid/framing/FrameSet.h" #include "qpid/framing/SequenceNumber.h" #include "BlockingQueue.h" #include "ChainableFrameHandler.h" #include "CompletionTracker.h" #include "Correlator.h" -#include "ReceivedContent.h" namespace qpid { namespace client { @@ -39,7 +39,7 @@ class ExecutionHandler : { framing::Window incoming; framing::Window outgoing; - ReceivedContent::shared_ptr arriving; + framing::FrameSet::shared_ptr arriving; Correlator correlation; CompletionTracker completion; framing::ProtocolVersion version; @@ -52,7 +52,7 @@ class ExecutionHandler : void sync(); public: - BlockingQueue<ReceivedContent::shared_ptr> received; + BlockingQueue<framing::FrameSet::shared_ptr> received; ExecutionHandler(uint64_t maxFrameSize = 65536); diff --git a/qpid/cpp/src/qpid/client/ReceivedContent.cpp b/qpid/cpp/src/qpid/client/ReceivedContent.cpp deleted file mode 100644 index 5a1f48901a..0000000000 --- a/qpid/cpp/src/qpid/client/ReceivedContent.cpp +++ /dev/null @@ -1,105 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "ReceivedContent.h" -#include "qpid/framing/all_method_bodies.h" - -using qpid::client::ReceivedContent; -using namespace qpid::framing; -using namespace boost; - -ReceivedContent::ReceivedContent(const SequenceNumber& _id) : id(_id) {} - -void ReceivedContent::append(AMQBody* part) -{ - parts.push_back(AMQFrame(ProtocolVersion(), 0, part)); -} - -bool ReceivedContent::isComplete() const -{ - if (parts.empty()) { - return false; - } else if (isA<BasicDeliverBody>() || isA<BasicGetOkBody>()) { - const AMQHeaderBody* headers(getHeaders()); - return headers && headers->getContentSize() == getContentSize(); - } else if (isA<MessageTransferBody>()) { - //no longer support references, headers and data are still method fields - return true; - } else { - throw Exception("Unknown content class"); - } -} - - -const AMQMethodBody* ReceivedContent::getMethod() const -{ - return parts.empty() ? 0 : dynamic_cast<const AMQMethodBody*>(parts[0].getBody()); -} - -const AMQHeaderBody* ReceivedContent::getHeaders() const -{ - return parts.size() < 2 ? 0 : dynamic_cast<const AMQHeaderBody*>(parts[1].getBody()); -} - -uint64_t ReceivedContent::getContentSize() const -{ - if (isA<BasicDeliverBody>() || isA<BasicGetOkBody>()) { - uint64_t size(0); - for (uint i = 2; i < parts.size(); i++) { - size += parts[i].getBody()->size(); - } - return size; - } else if (isA<MessageTransferBody>()) { - return as<MessageTransferBody>()->getBody().getValue().size(); - } else { - throw Exception("Unknown content class"); - } -} - -std::string ReceivedContent::getContent() const -{ - if (isA<BasicDeliverBody>() || isA<BasicGetOkBody>()) { - string data; - for (uint i = 2; i < parts.size(); i++) { - data += static_cast<const AMQContentBody*>(parts[i].getBody())->getData(); - } - return data; - } else if (isA<MessageTransferBody>()) { - return as<MessageTransferBody>()->getBody().getValue(); - } else { - throw Exception("Unknown content class"); - } -} - -void ReceivedContent::populate(Message& msg) -{ - if (!isComplete()) throw Exception("Incomplete message"); - - if (isA<BasicDeliverBody>() || isA<BasicGetOkBody>()) { - const BasicHeaderProperties* properties = dynamic_cast<const BasicHeaderProperties*>(getHeaders()->getProperties()); - BasicHeaderProperties::copy<Message, BasicHeaderProperties>(msg, *properties); - msg.setData(getContent()); - } else if (isA<MessageTransferBody>()) { - throw Exception("Transfer not yet supported"); - } else { - throw Exception("Unknown content class"); - } -} diff --git a/qpid/cpp/src/qpid/client/ReceivedContent.h b/qpid/cpp/src/qpid/client/ReceivedContent.h deleted file mode 100644 index 4f84039c10..0000000000 --- a/qpid/cpp/src/qpid/client/ReceivedContent.h +++ /dev/null @@ -1,75 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <string> -#include <vector> -#include "qpid/framing/amqp_framing.h" -#include "qpid/framing/AMQFrame.h" -#include "qpid/framing/SequenceNumber.h" -#include "ClientMessage.h" - -#ifndef _ReceivedContent_ -#define _ReceivedContent_ - -namespace qpid { -namespace client { - -/** - * Collects the frames representing some received 'content'. This - * provides a raw interface to 'message' data and attributes. - */ -class ReceivedContent -{ - const framing::SequenceNumber id; - std::vector<framing::AMQFrame> parts; - -public: - typedef boost::shared_ptr<ReceivedContent> shared_ptr; - - ReceivedContent(const framing::SequenceNumber& id); - void append(framing::AMQBody* part); - bool isComplete() const; - - uint64_t getContentSize() const; - std::string getContent() const; - - const framing::AMQMethodBody* getMethod() const; - const framing::AMQHeaderBody* getHeaders() const; - - template <class T> bool isA() const { - const framing::AMQMethodBody* method=getMethod(); - return method && method->isA<T>(); - } - - template <class T> const T* as() const { - const framing::AMQMethodBody* method=getMethod(); - return (method && method->isA<T>()) ? dynamic_cast<const T*>(method) : 0; - } - - const framing::SequenceNumber& getId() const { return id; } - - void populate(Message& msg); -}; - -} -} - - -#endif diff --git a/qpid/cpp/src/qpid/client/SessionCore.cpp b/qpid/cpp/src/qpid/client/SessionCore.cpp index f7ed7416cd..1b04e74af4 100644 --- a/qpid/cpp/src/qpid/client/SessionCore.cpp +++ b/qpid/cpp/src/qpid/client/SessionCore.cpp @@ -77,7 +77,7 @@ Response SessionCore::send(const AMQMethodBody& method, const MethodContent& con return Response(f); } -ReceivedContent::shared_ptr SessionCore::get() +FrameSet::shared_ptr SessionCore::get() { return l3.received.pop(); } diff --git a/qpid/cpp/src/qpid/client/SessionCore.h b/qpid/cpp/src/qpid/client/SessionCore.h index bcbaf0028d..0febb956b9 100644 --- a/qpid/cpp/src/qpid/client/SessionCore.h +++ b/qpid/cpp/src/qpid/client/SessionCore.h @@ -25,11 +25,11 @@ #include <boost/shared_ptr.hpp> #include "qpid/framing/AMQMethodBody.h" #include "qpid/framing/FrameHandler.h" +#include "qpid/framing/FrameSet.h" #include "qpid/framing/MethodContent.h" #include "ChannelHandler.h" #include "ExecutionHandler.h" #include "FutureFactory.h" -#include "ReceivedContent.h" #include "Response.h" namespace qpid { @@ -49,7 +49,7 @@ public: SessionCore(uint16_t id, boost::shared_ptr<framing::FrameHandler> out, uint64_t maxFrameSize); Response send(const framing::AMQMethodBody& method, bool expectResponse = false); Response send(const framing::AMQMethodBody& method, const framing::MethodContent& content, bool expectResponse = false); - ReceivedContent::shared_ptr get(); + framing::FrameSet::shared_ptr get(); uint16_t getId() const { return id; } void setSync(bool); bool isSync(); diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index e80912a2ea..b8cf568bf7 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -94,8 +94,7 @@ void Cluster::handle(AMQFrame& frame) { } void Cluster::notify() { - AMQFrame frame(ProtocolVersion(), 0, - ClusterNotifyBody(ProtocolVersion(), url)); + AMQFrame frame(0, ClusterNotifyBody(ProtocolVersion(), url)); handle(frame); } diff --git a/qpid/cpp/src/qpid/cluster/SessionManager.cpp b/qpid/cpp/src/qpid/cluster/SessionManager.cpp index 44c5ff24c5..5aa9c3dc21 100644 --- a/qpid/cpp/src/qpid/cluster/SessionManager.cpp +++ b/qpid/cpp/src/qpid/cluster/SessionManager.cpp @@ -51,7 +51,7 @@ struct BrokerHandler : public FrameHandler, private ChannelAdapter, private Deli // BrokerHandler(Broker& broker) : connection(0, broker), - channel(connection, *this, 1, 0), + channel(connection, *this, 1), adapter(channel, connection, broker, *this) {} void handle(AMQFrame& frame) { diff --git a/qpid/cpp/src/qpid/framing/AMQContentBody.h b/qpid/cpp/src/qpid/framing/AMQContentBody.h index dd4ab10d7d..5d530a1b9a 100644 --- a/qpid/cpp/src/qpid/framing/AMQContentBody.h +++ b/qpid/cpp/src/qpid/framing/AMQContentBody.h @@ -38,6 +38,7 @@ public: inline virtual ~AMQContentBody(){} inline uint8_t type() const { return CONTENT_BODY; }; inline const string& getData() const { return data; } + inline string& getData() { return data; } uint32_t size() const; void encode(Buffer& buffer) const; void decode(Buffer& buffer, uint32_t size); diff --git a/qpid/cpp/src/qpid/framing/AMQDataBlock.h b/qpid/cpp/src/qpid/framing/AMQDataBlock.h index 6ff61b185e..9b6fdfd966 100644 --- a/qpid/cpp/src/qpid/framing/AMQDataBlock.h +++ b/qpid/cpp/src/qpid/framing/AMQDataBlock.h @@ -30,7 +30,7 @@ class AMQDataBlock { public: virtual ~AMQDataBlock() {} - virtual void encode(Buffer& buffer) = 0; + virtual void encode(Buffer& buffer) const = 0; virtual bool decode(Buffer& buffer) = 0; virtual uint32_t size() const = 0; }; diff --git a/qpid/cpp/src/qpid/framing/AMQFrame.cpp b/qpid/cpp/src/qpid/framing/AMQFrame.cpp index 780af71be4..a7fd068ee4 100644 --- a/qpid/cpp/src/qpid/framing/AMQFrame.cpp +++ b/qpid/cpp/src/qpid/framing/AMQFrame.cpp @@ -70,7 +70,7 @@ const AMQBody* AMQFrame::getBody() const { return boost::apply_visitor(GetBodyVisitor(), const_cast<Variant&>(body)); } -void AMQFrame::encode(Buffer& buffer) +void AMQFrame::encode(Buffer& buffer) const { buffer.putOctet(getBody()->type()); buffer.putShort(channel); @@ -80,8 +80,11 @@ void AMQFrame::encode(Buffer& buffer) } uint32_t AMQFrame::size() const{ - return 1/*type*/ + 2/*channel*/ + 4/*body size*/ + - boost::apply_visitor(SizeVisitor(), body) + 1/*0xCE*/; + return frameOverhead() + boost::apply_visitor(SizeVisitor(), body); +} + +uint32_t AMQFrame::frameOverhead() { + return 1/*type*/ + 2/*channel*/ + 4/*body size*/ + 1/*0xCE*/; } bool AMQFrame::decode(Buffer& buffer) diff --git a/qpid/cpp/src/qpid/framing/AMQFrame.h b/qpid/cpp/src/qpid/framing/AMQFrame.h index 9e825a9936..84e7660218 100644 --- a/qpid/cpp/src/qpid/framing/AMQFrame.h +++ b/qpid/cpp/src/qpid/framing/AMQFrame.h @@ -37,14 +37,14 @@ namespace framing { class AMQFrame : public AMQDataBlock { public: - AMQFrame(ProtocolVersion=ProtocolVersion()) {} + AMQFrame() : channel(0) {} /** Construct a frame with a copy of b */ - AMQFrame(ProtocolVersion, ChannelId c, const AMQBody* b) : channel(c) { + AMQFrame(ChannelId c, const AMQBody* b) : channel(c) { setBody(*b); } - AMQFrame(ProtocolVersion, ChannelId c, const AMQBody& b) : channel(c) { + AMQFrame(ChannelId c, const AMQBody& b) : channel(c) { setBody(b); } @@ -52,21 +52,26 @@ class AMQFrame : public AMQDataBlock void setChannel(ChannelId c) { channel = c; } AMQBody* getBody(); - const AMQBody* getBody() const; + const AMQBody* getBody() const; /** Copy a body instance to the frame */ void setBody(const AMQBody& b) { CopyVisitor cv(*this); b.accept(cv); } /** Convenience template to cast the body to an expected type. */ template <class T> T* castBody() { - boost::polymorphic_downcast<T*>(getBody()); + return boost::polymorphic_downcast<T*>(getBody()); + } + + template <class T> const T* castBody() const { + return boost::polymorphic_downcast<const T*>(getBody()); } bool empty() { return boost::get<boost::blank>(&body); } - void encode(Buffer& buffer); + void encode(Buffer& buffer) const; bool decode(Buffer& buffer); uint32_t size() const; + static uint32_t frameOverhead(); private: struct CopyVisitor : public AMQBodyConstVisitor { @@ -77,7 +82,7 @@ class AMQFrame : public AMQDataBlock void visit(const AMQHeartbeatBody& x) { frame.body=x; } void visit(const AMQMethodBody& x) { frame.body=MethodHolder(x); } }; - friend struct CopyVisitor; + friend struct CopyVisitor; typedef boost::variant<boost::blank, AMQHeaderBody, diff --git a/qpid/cpp/src/qpid/framing/AMQHeaderBody.cpp b/qpid/cpp/src/qpid/framing/AMQHeaderBody.cpp index 6a3c8f27d1..7083709fde 100644 --- a/qpid/cpp/src/qpid/framing/AMQHeaderBody.cpp +++ b/qpid/cpp/src/qpid/framing/AMQHeaderBody.cpp @@ -19,37 +19,65 @@ * */ #include "AMQHeaderBody.h" -#include "qpid/QpidError.h" -#include "BasicHeaderProperties.h" +#include "qpid/Exception.h" +#include "qpid/log/Statement.h" -qpid::framing::AMQHeaderBody::AMQHeaderBody(int) : weight(0), contentSize(0) {} +qpid::framing::AMQHeaderBody::AMQHeaderBody() {} -qpid::framing::AMQHeaderBody::AMQHeaderBody() : weight(0), contentSize(0){} - -qpid::framing::AMQHeaderBody::~AMQHeaderBody(){} +qpid::framing::AMQHeaderBody::~AMQHeaderBody() {} uint32_t qpid::framing::AMQHeaderBody::size() const{ - return 12 + properties.size(); + CalculateSize visitor; + for_each(properties.begin(), properties.end(), boost::apply_visitor(visitor)); + return visitor.totalSize() + (properties.size() * (2/*type codes*/ + 4/*size*/)); } void qpid::framing::AMQHeaderBody::encode(Buffer& buffer) const { - buffer.putShort(properties.classId()); - buffer.putShort(weight); - buffer.putLongLong(contentSize); - properties.encode(buffer); + Encode visitor(buffer); + for_each(properties.begin(), properties.end(), boost::apply_visitor(visitor)); +} + +void qpid::framing::AMQHeaderBody::decode(Buffer& buffer, uint32_t size){ + uint32_t limit = buffer.available() - size; + while (buffer.available() > limit + 2) { + uint32_t len = buffer.getLong(); + uint16_t type = buffer.getShort(); + //The following switch could be generated as the number of options increases: + switch(type) { + case BasicHeaderProperties::TYPE: + decode(BasicHeaderProperties(), buffer, len - 2); + break; + case MessageProperties::TYPE: + decode(MessageProperties(), buffer, len - 2); + break; + case DeliveryProperties::TYPE: + decode(DeliveryProperties(), buffer, len - 2); + break; + default: + //TODO: should just skip over them keeping them for later dispatch as is + throw Exception(QPID_MSG("Unexpected property type: " << type)); + } + } } -void qpid::framing::AMQHeaderBody::decode(Buffer& buffer, uint32_t bufSize){ - buffer.getShort(); // Ignore classId - weight = buffer.getShort(); - contentSize = buffer.getLongLong(); - properties.decode(buffer, bufSize - 12); +uint64_t qpid::framing::AMQHeaderBody::getContentLength() const +{ + const MessageProperties* mProps = get<MessageProperties>(); + if (mProps) { + return mProps->getContentLength(); + } + const BasicHeaderProperties* bProps = get<BasicHeaderProperties>(); + if (bProps) { + return bProps->getContentLength(); + } + return 0; } void qpid::framing::AMQHeaderBody::print(std::ostream& out) const { - out << "header (" << size() << " bytes)" << " content_size=" << getContentSize(); - out << ", message_id=" << properties.getMessageId(); - out << ", delivery_mode=" << (int) properties.getDeliveryMode(); - out << ", headers=" << properties.getHeaders(); + out << "header (" << size() << " bytes)"; + out << "; properties={"; + Print visitor(out); + for_each(properties.begin(), properties.end(), boost::apply_visitor(visitor)); + out << "}"; } diff --git a/qpid/cpp/src/qpid/framing/AMQHeaderBody.h b/qpid/cpp/src/qpid/framing/AMQHeaderBody.h index 894936060c..76bd60559e 100644 --- a/qpid/cpp/src/qpid/framing/AMQHeaderBody.h +++ b/qpid/cpp/src/qpid/framing/AMQHeaderBody.h @@ -22,6 +22,12 @@ #include "AMQBody.h" #include "Buffer.h" #include "BasicHeaderProperties.h" +#include "qpid/framing/DeliveryProperties.h" +#include "qpid/framing/MessageProperties.h" +#include <iostream> +#include <vector> +#include <boost/variant.hpp> +#include <boost/variant/get.hpp> #ifndef _AMQHeaderBody_ #define _AMQHeaderBody_ @@ -31,24 +37,85 @@ namespace framing { class AMQHeaderBody : public AMQBody { - BasicHeaderProperties properties; - uint16_t weight; - uint64_t contentSize; - public: - AMQHeaderBody(int classId); + typedef std::vector< boost::variant<BasicHeaderProperties, DeliveryProperties, MessageProperties> > PropertyList; + + PropertyList properties; + + template <class T> void decode(T t, Buffer& b, uint32_t size) { + t.decode(b, size); + properties.push_back(t); + } + + class Encode : public boost::static_visitor<> { + Buffer& buffer; + public: + Encode(Buffer& b) : buffer(b) {} + + template <class T> void operator()(T& t) const { + buffer.putLong(t.size() + 2/*typecode*/); + buffer.putShort(T::TYPE); + t.encode(buffer); + } + }; + + class CalculateSize : public boost::static_visitor<> { + uint32_t size; + public: + CalculateSize() : size(0) {} + + template <class T> void operator()(T& t) { + size += t.size(); + } + + uint32_t totalSize() { + return size; + } + }; + + class Print : public boost::static_visitor<> { + std::ostream& out; + public: + Print(std::ostream& o) : out(o) {} + + template <class T> void operator()(T& t) { + out << t; + } + }; + +public: + AMQHeaderBody(); + ~AMQHeaderBody(); inline uint8_t type() const { return HEADER_BODY; } - BasicHeaderProperties* getProperties(){ return &properties; } - const BasicHeaderProperties* getProperties() const { return &properties; } - inline uint64_t getContentSize() const { return contentSize; } - inline void setContentSize(uint64_t _size) { contentSize = _size; } - virtual ~AMQHeaderBody(); - virtual uint32_t size() const; - virtual void encode(Buffer& buffer) const; - virtual void decode(Buffer& buffer, uint32_t size); - virtual void print(std::ostream& out) const; + + uint32_t size() const; + void encode(Buffer& buffer) const; + void decode(Buffer& buffer, uint32_t size); + uint64_t getContentLength() const; + void print(std::ostream& out) const; void accept(AMQBodyConstVisitor& v) const { v.visit(*this); } + + template <class T> T* get(bool create) { + for (PropertyList::iterator i = properties.begin(); i != properties.end(); i++) { + T* p = boost::get<T>(&(*i)); + if (p) return p; + } + if (create) { + properties.push_back(T()); + return boost::get<T>(&(properties.back())); + } else { + return 0; + } + } + + template <class T> const T* get() const { + for (PropertyList::const_iterator i = properties.begin(); i != properties.end(); i++) { + const T* p = boost::get<T>(&(*i)); + if (p) return p; + } + return 0; + } }; } diff --git a/qpid/cpp/src/qpid/framing/AMQMethodBody.h b/qpid/cpp/src/qpid/framing/AMQMethodBody.h index 5acb3a7b66..a5c14a37e9 100644 --- a/qpid/cpp/src/qpid/framing/AMQMethodBody.h +++ b/qpid/cpp/src/qpid/framing/AMQMethodBody.h @@ -49,6 +49,7 @@ class AMQMethodBody : public AMQBody { virtual MethodId amqpMethodId() const = 0; virtual ClassId amqpClassId() const = 0; + virtual bool isContentBearing() const = 0; void invoke(AMQP_ServerOperations&); bool invoke(Invocable*); diff --git a/qpid/cpp/src/qpid/framing/BasicHeaderProperties.cpp b/qpid/cpp/src/qpid/framing/BasicHeaderProperties.cpp index dfa5e1bc3f..7d933d0db8 100644 --- a/qpid/cpp/src/qpid/framing/BasicHeaderProperties.cpp +++ b/qpid/cpp/src/qpid/framing/BasicHeaderProperties.cpp @@ -22,7 +22,10 @@ //TODO: This could be easily generated from the spec -qpid::framing::BasicHeaderProperties::BasicHeaderProperties() : deliveryMode(DeliveryMode(0)), priority(0), timestamp(0){} +qpid::framing::BasicHeaderProperties::BasicHeaderProperties() : deliveryMode(DeliveryMode(0)), + priority(0), + timestamp(0), + contentLength(0){} qpid::framing::BasicHeaderProperties::~BasicHeaderProperties(){} uint32_t qpid::framing::BasicHeaderProperties::size() const{ @@ -41,6 +44,7 @@ uint32_t qpid::framing::BasicHeaderProperties::size() const{ if(userId.length() > 0) bytes += userId.length() + 1; if(appId.length() > 0) bytes += appId.length() + 1; if(clusterId.length() > 0) bytes += clusterId.length() + 1; + if(contentLength != 0) bytes += 8; return bytes; } @@ -63,6 +67,7 @@ void qpid::framing::BasicHeaderProperties::encode(qpid::framing::Buffer& buffer) if(userId.length() > 0) buffer.putShortString(userId); if(appId.length() > 0) buffer.putShortString(appId); if(clusterId.length() > 0) buffer.putShortString(clusterId); + if(contentLength != 0) buffer.putLongLong(contentLength); } void qpid::framing::BasicHeaderProperties::decode(qpid::framing::Buffer& buffer, uint32_t /*size*/){ @@ -81,6 +86,7 @@ void qpid::framing::BasicHeaderProperties::decode(qpid::framing::Buffer& buffer, if(flags & (1 << 4)) buffer.getShortString(userId); if(flags & (1 << 3)) buffer.getShortString(appId); if(flags & (1 << 2)) buffer.getShortString(clusterId); + if(flags & (1 << 1)) contentLength = buffer.getLongLong(); } uint16_t qpid::framing::BasicHeaderProperties::getFlags() const{ @@ -99,5 +105,32 @@ uint16_t qpid::framing::BasicHeaderProperties::getFlags() const{ if(userId.length() > 0) flags |= (1 << 4); if(appId.length() > 0) flags |= (1 << 3); if(clusterId.length() > 0) flags |= (1 << 2); + if(contentLength != 0) flags |= (1 << 1); return flags; } + +namespace qpid{ +namespace framing{ + + std::ostream& operator<<(std::ostream& out, const BasicHeaderProperties& props) + { + if(props.contentType.length() > 0) out << "contentType=" << props.contentType << ";"; + if(props.contentEncoding.length() > 0) out << "contentEncoding=" << props.contentEncoding << ";"; + if(props.headers.count() > 0) out << "headers=" << props.headers << ";"; + if(props.deliveryMode != 0) out << "deliveryMode=" << props.deliveryMode << ";"; + if(props.priority != 0) out << "priority=" << props.priority << ";"; + if(props.correlationId.length() > 0) out << "correlationId=" << props.correlationId << ";"; + if(props.replyTo.length() > 0) out << "replyTo=" << props.replyTo << ";"; + if(props.expiration.length() > 0) out << "expiration=" << props.expiration << ";"; + if(props.messageId.length() > 0) out << "messageId=" << props.messageId << ";"; + if(props.timestamp != 0) out << "timestamp=" << props.timestamp << ";"; + if(props.type.length() > 0) out << "type=" << props.type << ";"; + if(props.userId.length() > 0) out << "userId=" << props.userId << ";"; + if(props.appId.length() > 0) out << "appId=" << props.appId << ";"; + if(props.clusterId.length() > 0) out << "clusterId=" << props.clusterId << ";"; + if(props.contentLength != 0) out << "contentLength=" << props.contentLength << ";"; + + return out; + } + +}} diff --git a/qpid/cpp/src/qpid/framing/BasicHeaderProperties.h b/qpid/cpp/src/qpid/framing/BasicHeaderProperties.h index a8ef401b50..d6c71437fb 100644 --- a/qpid/cpp/src/qpid/framing/BasicHeaderProperties.h +++ b/qpid/cpp/src/qpid/framing/BasicHeaderProperties.h @@ -47,15 +47,18 @@ class BasicHeaderProperties : public HeaderProperties string userId; string appId; string clusterId; + uint64_t contentLength; uint16_t getFlags() const; public: + static const uint16_t TYPE = BASIC; + BasicHeaderProperties(); virtual ~BasicHeaderProperties(); virtual uint32_t size() const; virtual void encode(Buffer& buffer) const; - virtual void decode(Buffer& buffer, uint32_t size); + virtual void decode(Buffer& buffer, uint32_t size = 0); virtual uint8_t classId() const { return BASIC; } @@ -74,6 +77,7 @@ class BasicHeaderProperties : public HeaderProperties string getUserId() const { return userId; } string getAppId() const { return appId; } string getClusterId() const { return clusterId; } + uint64_t getContentLength() const { return contentLength; } void setContentType(const string& _type){ contentType = _type; } void setContentEncoding(const string& encoding){ contentEncoding = encoding; } @@ -89,6 +93,9 @@ class BasicHeaderProperties : public HeaderProperties void setUserId(const string& _userId){ userId = _userId; } void setAppId(const string& _appId){appId = _appId; } void setClusterId(const string& _clusterId){ clusterId = _clusterId; } + void setContentLength(uint64_t _contentLength){ contentLength = _contentLength; } + + friend std::ostream& operator<<(std::ostream&, const BasicHeaderProperties&); /** \internal * Template to copy between types like BasicHeaderProperties. @@ -109,6 +116,7 @@ class BasicHeaderProperties : public HeaderProperties to.setUserId(from.getUserId()); to.setAppId(from.getAppId()); to.setClusterId(from.getClusterId()); + to.setContentLength(from.getContentLength()); } }; }} diff --git a/qpid/cpp/src/qpid/framing/ChannelAdapter.cpp b/qpid/cpp/src/qpid/framing/ChannelAdapter.cpp index 25ff46acdd..86b60d896b 100644 --- a/qpid/cpp/src/qpid/framing/ChannelAdapter.cpp +++ b/qpid/cpp/src/qpid/framing/ChannelAdapter.cpp @@ -51,7 +51,7 @@ void ChannelAdapter::init(ChannelId i, OutputHandler& out, ProtocolVersion v) void ChannelAdapter::send(const AMQBody& body) { assertChannelOpen(); - AMQFrame frame(getVersion(), getId(), body); + AMQFrame frame(getId(), body); handlers.out->handle(frame); } diff --git a/qpid/cpp/src/qpid/framing/FrameSet.cpp b/qpid/cpp/src/qpid/framing/FrameSet.cpp new file mode 100644 index 0000000000..434f1b3aad --- /dev/null +++ b/qpid/cpp/src/qpid/framing/FrameSet.cpp @@ -0,0 +1,83 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "FrameSet.h" +#include "qpid/framing/all_method_bodies.h" +#include "qpid/framing/frame_functors.h" +#include "qpid/framing/BasicHeaderProperties.h" +#include "qpid/framing/MessageProperties.h" +#include "qpid/framing/TypeFilter.h" + +using namespace qpid::framing; +using namespace boost; + +FrameSet::FrameSet(const SequenceNumber& _id) : id(_id) {} + +void FrameSet::append(AMQFrame& part) +{ + parts.push_back(part); +} + +bool FrameSet::isComplete() const +{ + //TODO: should eventually use the 0-10 frame header flags when available + const AMQMethodBody* method = getMethod(); + if (!method) { + return false; + } else if (method->isContentBearing()) { + const AMQHeaderBody* header = getHeaders(); + if (header) { + return header->getContentLength() == getContentSize(); + } else { + return false; + } + } else { + return true; + } +} + +const AMQMethodBody* FrameSet::getMethod() const +{ + return parts.empty() ? 0 : dynamic_cast<const AMQMethodBody*>(parts[0].getBody()); +} + +const AMQHeaderBody* FrameSet::getHeaders() const +{ + return parts.size() < 2 ? 0 : dynamic_cast<const AMQHeaderBody*>(parts[1].getBody()); +} + +AMQHeaderBody* FrameSet::getHeaders() +{ + return parts.size() < 2 ? 0 : dynamic_cast<AMQHeaderBody*>(parts[1].getBody()); +} + +uint64_t FrameSet::getContentSize() const +{ + SumBodySize sum; + map_if(sum, TypeFilter(CONTENT_BODY)); + return sum.getSize(); +} + +void FrameSet::getContent(std::string& out) const +{ + AccumulateContent accumulator(out); + map_if(accumulator, TypeFilter(CONTENT_BODY)); +} diff --git a/qpid/cpp/src/qpid/framing/FrameSet.h b/qpid/cpp/src/qpid/framing/FrameSet.h new file mode 100644 index 0000000000..d6d5cd7a13 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/FrameSet.h @@ -0,0 +1,102 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <string> +#include <vector> +#include "qpid/framing/amqp_framing.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/SequenceNumber.h" + +#ifndef _FrameSet_ +#define _FrameSet_ + +namespace qpid { +namespace framing { + +/** + * Collects the frames representing a message. + */ +class FrameSet +{ + typedef std::vector<AMQFrame> Frames; + const SequenceNumber id; + Frames parts; + +public: + typedef boost::shared_ptr<FrameSet> shared_ptr; + + FrameSet(const SequenceNumber& id); + void append(AMQFrame& part); + bool isComplete() const; + + uint64_t getContentSize() const; + void getContent(std::string&) const; + + const AMQMethodBody* getMethod() const; + const AMQHeaderBody* getHeaders() const; + AMQHeaderBody* getHeaders(); + + template <class T> bool isA() const { + const AMQMethodBody* method = getMethod(); + return method && method->isA<T>(); + } + + template <class T> const T* as() const { + const AMQMethodBody* method = getMethod(); + return (method && method->isA<T>()) ? dynamic_cast<const T*>(method) : 0; + } + + template <class T> const T* getHeaderProperties() const { + const AMQHeaderBody* header = getHeaders(); + return header ? header->get<T>() : 0; + } + + const SequenceNumber& getId() const { return id; } + + template <class P> void remove(P predicate) { + parts.erase(remove_if(parts.begin(), parts.end(), predicate), parts.end()); + } + + template <class F> void map(F& functor) { + for_each(parts.begin(), parts.end(), functor); + } + + template <class F> void map(F& functor) const { + for_each(parts.begin(), parts.end(), functor); + } + + template <class F, class P> void map_if(F& functor, P predicate) { + for(Frames::iterator i = parts.begin(); i != parts.end(); i++) { + if (predicate(*i)) functor(*i); + } + } + + template <class F, class P> void map_if(F& functor, P predicate) const { + for(Frames::const_iterator i = parts.begin(); i != parts.end(); i++) { + if (predicate(*i)) functor(*i); + } + } +}; + +} +} + + +#endif diff --git a/qpid/cpp/src/qpid/framing/ProtocolInitiation.cpp b/qpid/cpp/src/qpid/framing/ProtocolInitiation.cpp index a6d1b17f6e..7164bceb12 100644 --- a/qpid/cpp/src/qpid/framing/ProtocolInitiation.cpp +++ b/qpid/cpp/src/qpid/framing/ProtocolInitiation.cpp @@ -31,7 +31,7 @@ ProtocolInitiation::ProtocolInitiation(ProtocolVersion p) : version(p) {} ProtocolInitiation::~ProtocolInitiation(){} -void ProtocolInitiation::encode(Buffer& buffer){ +void ProtocolInitiation::encode(Buffer& buffer) const { buffer.putOctet('A'); buffer.putOctet('M'); buffer.putOctet('Q'); diff --git a/qpid/cpp/src/qpid/framing/ProtocolInitiation.h b/qpid/cpp/src/qpid/framing/ProtocolInitiation.h index adfdc8215d..31c73eb124 100644 --- a/qpid/cpp/src/qpid/framing/ProtocolInitiation.h +++ b/qpid/cpp/src/qpid/framing/ProtocolInitiation.h @@ -39,7 +39,7 @@ public: ProtocolInitiation(uint8_t major, uint8_t minor); ProtocolInitiation(ProtocolVersion p); virtual ~ProtocolInitiation(); - virtual void encode(Buffer& buffer); + virtual void encode(Buffer& buffer) const; virtual bool decode(Buffer& buffer); inline virtual uint32_t size() const { return 8; } inline uint8_t getMajor() const { return version.getMajor(); } diff --git a/qpid/cpp/src/qpid/framing/SendContent.cpp b/qpid/cpp/src/qpid/framing/SendContent.cpp new file mode 100644 index 0000000000..568cc01665 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/SendContent.cpp @@ -0,0 +1,51 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "SendContent.h" + +qpid::framing::SendContent::SendContent(FrameHandler& h, uint16_t c, uint16_t mfs) : handler(h), channel(c), maxFrameSize(mfs) {} + +void qpid::framing::SendContent::operator()(AMQFrame& f) const +{ + uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead(); + const AMQContentBody* body(f.castBody<AMQContentBody>()); + if (body->size() > maxContentSize) { + uint32_t offset = 0; + for (int chunk = body->size() / maxContentSize; chunk > 0; chunk--) { + sendFragment(*body, offset, maxContentSize); + offset += maxContentSize; + } + uint32_t remainder = body->size() % maxContentSize; + if (remainder) { + sendFragment(*body, offset, remainder); + } + } else { + AMQFrame copy(f); + copy.setChannel(channel); + handler.handle(copy); + } +} + +void qpid::framing::SendContent::sendFragment(const AMQContentBody& body, uint32_t offset, uint16_t size) const +{ + AMQFrame fragment(channel, AMQContentBody(body.getData().substr(offset, size))); + handler.handle(fragment); +} diff --git a/qpid/cpp/src/qpid/broker/LazyLoadedContent.h b/qpid/cpp/src/qpid/framing/SendContent.h index 79a33ed7a9..a88319e2f9 100644 --- a/qpid/cpp/src/qpid/broker/LazyLoadedContent.h +++ b/qpid/cpp/src/qpid/framing/SendContent.h @@ -18,32 +18,35 @@ * under the License. * */ -#ifndef _LazyLoadedContent_ -#define _LazyLoadedContent_ +#include <string> +#include "qpid/framing/amqp_framing.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/FrameHandler.h" -#include "Content.h" -#include "MessageStore.h" -#include "BrokerMessageBase.h" +#ifndef _SendContent_ +#define _SendContent_ namespace qpid { - namespace broker { - class LazyLoadedContent : public Content{ - MessageStore* const store; - Message* const msg; - const uint64_t expectedSize; - public: - LazyLoadedContent( - MessageStore* const store, Message* const msg, - uint64_t expectedSize); - ~LazyLoadedContent(); - void add(qpid::framing::AMQContentBody* data); - uint32_t size(); - void send( - framing::ChannelAdapter&, - uint32_t framesize); - void encode(qpid::framing::Buffer& buffer); - }; - } +namespace framing { + +/** + * Functor that sends frame to handler, refragmenting if + * necessary. Currently only works on content frames but this could be + * changed once we support multi-frame segments in general. + */ +class SendContent +{ + mutable FrameHandler& handler; + const uint16_t channel; + const uint16_t maxFrameSize; + + void sendFragment(const AMQContentBody& body, uint32_t offset, uint16_t size) const; +public: + SendContent(FrameHandler& _handler, uint16_t channel, uint16_t _maxFrameSize); + void operator()(AMQFrame& f) const; +}; + +} } diff --git a/qpid/cpp/src/qpid/broker/InMemoryContent.h b/qpid/cpp/src/qpid/framing/TypeFilter.h index a6fca7ca98..3a607190fd 100644 --- a/qpid/cpp/src/qpid/broker/InMemoryContent.h +++ b/qpid/cpp/src/qpid/framing/TypeFilter.h @@ -18,28 +18,34 @@ * under the License. * */ -#ifndef _InMemoryContent_ -#define _InMemoryContent_ - -#include "Content.h" -#include "qpid/framing/AMQContentBody.h" -#include <vector> +#include <string> +#include "qpid/framing/amqp_framing.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/FrameHandler.h" +#ifndef _TypeFilter_ +#define _TypeFilter_ namespace qpid { - namespace broker { - class InMemoryContent : public Content{ - typedef std::vector<framing::AMQContentBody> content_list; - typedef content_list::iterator content_iterator; +namespace framing { + +/** + * Predicate that selects frames by type + */ +class TypeFilter +{ + std::vector<uint8_t> types; +public: + TypeFilter(uint8_t type) { add(type); } + TypeFilter(uint8_t type1, uint8_t type2) { add(type1); add(type2); } + void add(uint8_t type) { types.push_back(type); } + bool operator()(const AMQFrame& f) const + { + return find(types.begin(), types.end(), f.getBody()->type()) != types.end(); + } +}; - content_list content; - public: - void add(framing::AMQContentBody* data); - uint32_t size(); - void send(framing::ChannelAdapter&, uint32_t framesize); - void encode(framing::Buffer& buffer); - }; - } +} } diff --git a/qpid/cpp/src/qpid/framing/frame_functors.h b/qpid/cpp/src/qpid/framing/frame_functors.h new file mode 100644 index 0000000000..3112da8e24 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/frame_functors.h @@ -0,0 +1,108 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <string> +#include <ostream> +#include <iostream> +#include "qpid/framing/amqp_framing.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/Buffer.h" + +#ifndef _frame_functors_ +#define _frame_functors_ + +namespace qpid { +namespace framing { + +class SumFrameSize +{ + uint64_t size; +public: + SumFrameSize() : size(0) {} + void operator()(const AMQFrame& f) { size += f.size(); } + uint64_t getSize() { return size; } +}; + +class SumBodySize +{ + uint64_t size; +public: + SumBodySize() : size(0) {} + void operator()(const AMQFrame& f) { size += f.getBody()->size(); } + uint64_t getSize() { return size; } +}; + +class EncodeFrame +{ + Buffer& buffer; +public: + EncodeFrame(Buffer& b) : buffer(b) {} + void operator()(const AMQFrame& f) { f.encode(buffer); } +}; + +class EncodeBody +{ + Buffer& buffer; +public: + EncodeBody(Buffer& b) : buffer(b) {} + void operator()(const AMQFrame& f) { f.getBody()->encode(buffer); } +}; + +class AccumulateContent +{ + std::string& content; +public: + AccumulateContent(std::string& c) : content(c) {} + void operator()(const AMQFrame& f) { content += f.castBody<AMQContentBody>()->getData(); } +}; + +class Relay +{ + FrameHandler& handler; + const uint16_t channel; + +public: + Relay(FrameHandler& h, uint16_t c) : handler(h), channel(c) {} + + void operator()(AMQFrame& f) + { + AMQFrame copy(f); + copy.setChannel(channel); + handler.handle(copy); + } +}; + +class Print +{ + std::ostream& out; +public: + Print(std::ostream& o) : out(o) {} + + void operator()(const AMQFrame& f) + { + out << f << std::endl; + } +}; + +} +} + + +#endif diff --git a/qpid/cpp/src/tests/BrokerChannelTest.cpp b/qpid/cpp/src/tests/BrokerChannelTest.cpp index 3253a3d27a..1e5a30f157 100644 --- a/qpid/cpp/src/tests/BrokerChannelTest.cpp +++ b/qpid/cpp/src/tests/BrokerChannelTest.cpp @@ -19,12 +19,14 @@ * */ #include "qpid/broker/BrokerChannel.h" -#include "qpid/broker/BrokerMessage.h" #include "qpid/broker/BrokerQueue.h" #include "qpid/broker/FanOutExchange.h" +#include "qpid/broker/Message.h" +#include "qpid/broker/MessageDelivery.h" #include "qpid/broker/NullMessageStore.h" #include "qpid_test_plugin.h" #include <iostream> +#include <sstream> #include <memory> #include "qpid/framing/AMQP_HighestVersion.h" #include "qpid/framing/AMQFrame.h" @@ -72,7 +74,6 @@ class BrokerChannelTest : public CppUnit::TestCase CPPUNIT_TEST_SUITE(BrokerChannelTest); CPPUNIT_TEST(testConsumerMgmt);; CPPUNIT_TEST(testDeliveryNoAck); - CPPUNIT_TEST(testStaging); CPPUNIT_TEST(testQueuePolicy); CPPUNIT_TEST(testFlow); CPPUNIT_TEST(testAsyncMesgToMoreThanOneQueue); @@ -155,7 +156,16 @@ class BrokerChannelTest : public CppUnit::TestCase void check() { - CPPUNIT_ASSERT(expected.empty()); + if (!expected.empty()) { + std::stringstream error; + error << "Expected: "; + while (!expected.empty()) { + MethodCall& m = expected.front(); + error << m.name << "(" << m.msg << ", '" << m.data << "'); "; + expected.pop(); + } + CPPUNIT_FAIL(error.str()); + } } }; @@ -173,7 +183,7 @@ class BrokerChannelTest : public CppUnit::TestCase void testConsumerMgmt(){ Queue::shared_ptr queue(new Queue("my_queue")); - Channel channel(connection, recorder, 0, 0); + Channel channel(connection, recorder, 0); channel.open(); CPPUNIT_ASSERT(!channel.exists("my_consumer")); @@ -203,7 +213,7 @@ class BrokerChannelTest : public CppUnit::TestCase Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14)); Queue::shared_ptr queue(new Queue("my_queue")); string tag("test"); - DeliveryToken::shared_ptr token(BasicMessage::createConsumeToken("my-token")); + DeliveryToken::shared_ptr token(MessageDelivery::getBasicConsumeToken("my-token")); channel.consume(token, tag, queue, false, false, 0); queue->deliver(msg); sleep(2); @@ -213,48 +223,6 @@ class BrokerChannelTest : public CppUnit::TestCase CPPUNIT_ASSERT_EQUAL(token, recorder.delivered.front().second); } - void testStaging(){ - MockMessageStore store; - connection.setFrameMax(1000); - connection.setStagingThreshold(10); - Channel channel(connection, recorder, 1, &store); - const string data[] = {"abcde", "fghij", "klmno"}; - - Message* msg = new BasicMessage(0, "my_exchange", "my_routing_key", false, false); - - store.expect(); - store.stage(*msg); - for (int i = 0; i < 3; i++) { - store.appendContent(*msg, data[i]); - } - store.destroy(*msg); - store.test(); - - Exchange::shared_ptr exchange = - broker->getExchanges().declare("my_exchange", "fanout").first; - Queue::shared_ptr queue(new Queue("my_queue")); - exchange->bind(queue, "", 0); - - AMQHeaderBody header(BASIC); - uint64_t contentSize(0); - for (int i = 0; i < 3; i++) { - contentSize += data[i].size(); - } - header.setContentSize(contentSize); - channel.handlePublish(msg); - channel.handleHeader(&header); - - for (int i = 0; i < 3; i++) { - AMQContentBody body(data[i]); - channel.handleContent(&body); - } - Message::shared_ptr msg2 = queue->dequeue(); - CPPUNIT_ASSERT_EQUAL(msg, msg2.get()); - msg2.reset();//should trigger destroy call - - store.check(); - } - //NOTE: strictly speaking this should/could be part of QueueTest, //but as it can usefully use the same utility classes as this @@ -279,7 +247,6 @@ class BrokerChannelTest : public CppUnit::TestCase store.expect(); store.stage(*msg3); - store.destroy(*msg3); store.test(); Queue::shared_ptr queue(new Queue("my_queue", false, &store, 0)); @@ -348,16 +315,17 @@ class BrokerChannelTest : public CppUnit::TestCase CPPUNIT_ASSERT_EQUAL(ChannelId(0), handler.frames[0].getChannel()); CPPUNIT_ASSERT(dynamic_cast<ConnectionStartBody*>(handler.frames[0].getBody())); - const string data("abcdefghijklmn"); - - Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14)); - addContent(msg, data); Queue::shared_ptr queue(new Queue("my_queue")); string tag("test"); - DeliveryToken::shared_ptr token(BasicMessage::createConsumeToken("my-token")); + DeliveryToken::shared_ptr token(MessageDelivery::getBasicConsumeToken("my-token")); channel.consume(token, tag, queue, false, false, 0); channel.flow(false); + + //'publish' a message + Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14)); + addContent(msg, "abcdefghijklmn"); queue->deliver(msg); + //ensure no messages have been delivered CPPUNIT_ASSERT_EQUAL((size_t) 0, recorder.delivered.size()); @@ -369,21 +337,26 @@ class BrokerChannelTest : public CppUnit::TestCase CPPUNIT_ASSERT_EQUAL(token, recorder.delivered.front().second); } - Message* createMessage(const string& exchange, const string& routingKey, const string& messageId, uint64_t contentSize) + Message::shared_ptr createMessage(const string& exchange, const string& routingKey, const string& messageId, uint64_t contentSize) { - BasicMessage* msg = new BasicMessage( - 0, exchange, routingKey, false, false); - AMQHeaderBody header(BASIC); - header.setContentSize(contentSize); - msg->setHeader(&header); - msg->getHeaderProperties()->setMessageId(messageId); + Message::shared_ptr msg(new Message()); + + AMQFrame method(0, MessageTransferBody(ProtocolVersion(), 0, exchange, 0, 0)); + AMQFrame header(0, AMQHeaderBody()); + + msg->getFrames().append(method); + msg->getFrames().append(header); + MessageProperties* props = msg->getFrames().getHeaders()->get<MessageProperties>(true); + props->setContentLength(contentSize); + props->setMessageId(messageId); + msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey); return msg; } void addContent(Message::shared_ptr msg, const string& data) { - AMQContentBody body(data); - msg->addContent(&body); + AMQFrame content(0, AMQContentBody(data)); + msg->getFrames().append(content); } }; diff --git a/qpid/cpp/src/tests/Cluster.cpp b/qpid/cpp/src/tests/Cluster.cpp index a9caa89321..b3a6a745b8 100644 --- a/qpid/cpp/src/tests/Cluster.cpp +++ b/qpid/cpp/src/tests/Cluster.cpp @@ -34,7 +34,7 @@ static const ProtocolVersion VER; /** Verify membership in a cluster with one member. */ BOOST_AUTO_TEST_CASE(testClusterOne) { TestCluster cluster("clusterOne", "amqp:one:1"); - AMQFrame send(VER, 1, SessionOpenBody(VER)); + AMQFrame send(1, SessionOpenBody(VER)); cluster.handle(send); AMQFrame received; BOOST_REQUIRE(cluster.received.waitPop(received)); @@ -60,7 +60,7 @@ BOOST_AUTO_TEST_CASE(testClusterTwo) { BOOST_REQUIRE(cluster.waitFor(2)); // Myself and child. // Exchange frames with child. - AMQFrame send(VER, 1, SessionOpenBody(VER)); + AMQFrame send(1, SessionOpenBody(VER)); cluster.handle(send); AMQFrame received; BOOST_REQUIRE(cluster.received.waitPop(received)); @@ -91,8 +91,8 @@ struct CountHandler : public FrameHandler { /** Test the ClassifierHandler */ BOOST_AUTO_TEST_CASE(testClassifierHandlerWiring) { - AMQFrame queueDecl(VER, 0, QueueDeclareBody(VER)); - AMQFrame messageTrans(VER, 0, MessageTransferBody(VER)); + AMQFrame queueDecl(0, QueueDeclareBody(VER)); + AMQFrame messageTrans(0, MessageTransferBody(VER)); shared_ptr<CountHandler> wiring(new CountHandler()); shared_ptr<CountHandler> other(new CountHandler()); diff --git a/qpid/cpp/src/tests/Cluster_child.cpp b/qpid/cpp/src/tests/Cluster_child.cpp index bd76e58127..c03d7396f0 100644 --- a/qpid/cpp/src/tests/Cluster_child.cpp +++ b/qpid/cpp/src/tests/Cluster_child.cpp @@ -40,7 +40,7 @@ void clusterTwo() { BOOST_CHECK_TYPEID_EQUAL(SessionOpenBody, *frame.getBody()); BOOST_CHECK_EQUAL(2u, cluster.size()); // Me and parent - AMQFrame send(VER, 1, SessionAttachedBody(VER)); + AMQFrame send(1, SessionAttachedBody(VER)); cluster.handle(send); BOOST_REQUIRE(cluster.received.waitPop(frame)); BOOST_CHECK_TYPEID_EQUAL(SessionAttachedBody, *frame.getBody()); diff --git a/qpid/cpp/src/tests/ExchangeTest.cpp b/qpid/cpp/src/tests/ExchangeTest.cpp index ef2646519d..59941864e2 100644 --- a/qpid/cpp/src/tests/ExchangeTest.cpp +++ b/qpid/cpp/src/tests/ExchangeTest.cpp @@ -31,6 +31,7 @@ #include "qpid_test_plugin.h" #include <iostream> #include "qpid/framing/BasicGetBody.h" +#include "MessageUtils.h" using namespace qpid::broker; using namespace qpid::framing; @@ -63,7 +64,7 @@ class ExchangeTest : public CppUnit::TestCase queue.reset(); queue2.reset(); - Message::shared_ptr msgPtr(new BasicMessage(0, "e", "A", true, true)); + Message::shared_ptr msgPtr(MessageUtils::createMessage("exchange", "key", "id")); DeliverableMessage msg(msgPtr); topic.route(msg, "abc", 0); direct.route(msg, "abc", 0); diff --git a/qpid/cpp/src/tests/FramingTest.cpp b/qpid/cpp/src/tests/FramingTest.cpp index a0dd8d37f6..1b843defc1 100644 --- a/qpid/cpp/src/tests/FramingTest.cpp +++ b/qpid/cpp/src/tests/FramingTest.cpp @@ -137,8 +137,7 @@ class FramingTest : public CppUnit::TestCase { std::string a = "hostA"; std::string b = "hostB"; - AMQFrame in(version, 999, - ConnectionRedirectBody(version, a, b)); + AMQFrame in(999, ConnectionRedirectBody(version, a, b)); in.encode(buffer); buffer.flip(); AMQFrame out; @@ -149,7 +148,7 @@ class FramingTest : public CppUnit::TestCase void testBasicConsumeOkBodyFrame() { std::string s = "hostA"; - AMQFrame in(version, 999, BasicConsumeOkBody(version, s)); + AMQFrame in(999, BasicConsumeOkBody(version, s)); in.encode(buffer); buffer.flip(); AMQFrame out; diff --git a/qpid/cpp/src/tests/HeaderTest.cpp b/qpid/cpp/src/tests/HeaderTest.cpp index 17381cc868..df2230342c 100644 --- a/qpid/cpp/src/tests/HeaderTest.cpp +++ b/qpid/cpp/src/tests/HeaderTest.cpp @@ -36,8 +36,8 @@ public: void testGenericProperties() { - AMQHeaderBody body(BASIC); - dynamic_cast<BasicHeaderProperties*>(body.getProperties())->getHeaders().setString("A", "BCDE"); + AMQHeaderBody body; + body.get<BasicHeaderProperties>(true)->getHeaders().setString("A", "BCDE"); Buffer buffer(100); body.encode(buffer); @@ -45,7 +45,7 @@ public: AMQHeaderBody body2; body2.decode(buffer, body.size()); BasicHeaderProperties* props = - dynamic_cast<BasicHeaderProperties*>(body2.getProperties()); + body2.get<BasicHeaderProperties>(true); CPPUNIT_ASSERT_EQUAL(std::string("BCDE"), props->getHeaders().getString("A")); } @@ -64,10 +64,11 @@ public: string userId("guest"); string appId("just testing"); string clusterId("no clustering required"); + uint64_t contentLength(54321); - AMQHeaderBody body(BASIC); + AMQFrame out(0, AMQHeaderBody()); BasicHeaderProperties* properties = - dynamic_cast<BasicHeaderProperties*>(body.getProperties()); + out.castBody<AMQHeaderBody>()->get<BasicHeaderProperties>(true); properties->setContentType(contentType); properties->getHeaders().setString("A", "BCDE"); properties->setDeliveryMode(deliveryMode); @@ -81,13 +82,14 @@ public: properties->setUserId(userId); properties->setAppId(appId); properties->setClusterId(clusterId); + properties->setContentLength(contentLength); Buffer buffer(10000); - body.encode(buffer); + out.encode(buffer); buffer.flip(); - AMQHeaderBody temp; - temp.decode(buffer, body.size()); - properties = dynamic_cast<BasicHeaderProperties*>(temp.getProperties()); + AMQFrame in; + in.decode(buffer); + properties = in.castBody<AMQHeaderBody>()->get<BasicHeaderProperties>(true); CPPUNIT_ASSERT_EQUAL(contentType, properties->getContentType()); CPPUNIT_ASSERT_EQUAL(std::string("BCDE"), properties->getHeaders().getString("A")); @@ -102,6 +104,7 @@ public: CPPUNIT_ASSERT_EQUAL(userId, properties->getUserId()); CPPUNIT_ASSERT_EQUAL(appId, properties->getAppId()); CPPUNIT_ASSERT_EQUAL(clusterId, properties->getClusterId()); + CPPUNIT_ASSERT_EQUAL(contentLength, properties->getContentLength()); } void testSomeSpecificProperties(){ @@ -111,9 +114,9 @@ public: string expiration("Z"); uint64_t timestamp(0xabe4a34a); - AMQHeaderBody body(BASIC); + AMQHeaderBody body; BasicHeaderProperties* properties = - dynamic_cast<BasicHeaderProperties*>(body.getProperties()); + body.get<BasicHeaderProperties>(true); properties->setContentType(contentType); properties->setDeliveryMode(deliveryMode); properties->setPriority(priority); @@ -125,7 +128,7 @@ public: buffer.flip(); AMQHeaderBody temp; temp.decode(buffer, body.size()); - properties = dynamic_cast<BasicHeaderProperties*>(temp.getProperties()); + properties = temp.get<BasicHeaderProperties>(true); CPPUNIT_ASSERT_EQUAL(contentType, properties->getContentType()); CPPUNIT_ASSERT_EQUAL((int) deliveryMode, (int) properties->getDeliveryMode()); diff --git a/qpid/cpp/src/tests/InMemoryContentTest.cpp b/qpid/cpp/src/tests/InMemoryContentTest.cpp deleted file mode 100644 index bc95548d45..0000000000 --- a/qpid/cpp/src/tests/InMemoryContentTest.cpp +++ /dev/null @@ -1,91 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/broker/InMemoryContent.h" -#include "qpid_test_plugin.h" -#include "qpid/framing/AMQP_HighestVersion.h" -#include <iostream> -#include <list> -#include "qpid/framing/AMQFrame.h" -#include "MockChannel.h" - -using std::list; -using std::string; -using boost::dynamic_pointer_cast; -using namespace qpid::broker; -using namespace qpid::framing; - - -class InMemoryContentTest : public CppUnit::TestCase -{ - CPPUNIT_TEST_SUITE(InMemoryContentTest); - CPPUNIT_TEST(testRefragmentation); - CPPUNIT_TEST_SUITE_END(); - -public: - void testRefragmentation() - { - {//no remainder - string out[] = {"abcde", "fghij", "klmno", "pqrst"}; - string in[] = {out[0] + out[1], out[2] + out[3]}; - refragment(2, in, 4, out); - } - {//remainder for last frame - string out[] = {"abcde", "fghij", "klmno", "pqrst", "uvw"}; - string in[] = {out[0] + out[1], out[2] + out[3] + out[4]}; - refragment(2, in, 5, out); - } - } - - - void refragment(size_t inCount, string* in, size_t outCount, string* out, uint32_t framesize = 5) - { - InMemoryContent content; - MockChannel channel(3); - - addframes(content, inCount, in); - content.send(channel, framesize); - CPPUNIT_ASSERT_EQUAL(outCount, channel.out.frames.size()); - - for (unsigned int i = 0; i < outCount; i++) { - AMQContentBody* chunk = dynamic_cast<AMQContentBody*>( - channel.out.frames[i].getBody()); - CPPUNIT_ASSERT(chunk); - CPPUNIT_ASSERT_EQUAL(out[i], chunk->getData()); - CPPUNIT_ASSERT_EQUAL( - ChannelId(3), channel.out.frames[i].getChannel()); - } - } - - void addframes(InMemoryContent& content, size_t frameCount, string* frameData) - { - for (unsigned int i = 0; i < frameCount; i++) { - AMQContentBody frame(frameData[i]); - content.add(&frame); - } - } - - -}; - -// Make this test suite a plugin. -CPPUNIT_PLUGIN_IMPLEMENT(); -CPPUNIT_TEST_SUITE_REGISTRATION(InMemoryContentTest); - diff --git a/qpid/cpp/src/tests/LazyLoadedContentTest.cpp b/qpid/cpp/src/tests/LazyLoadedContentTest.cpp deleted file mode 100644 index df46f6b48e..0000000000 --- a/qpid/cpp/src/tests/LazyLoadedContentTest.cpp +++ /dev/null @@ -1,113 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/broker/LazyLoadedContent.h" -#include "qpid/framing/AMQP_HighestVersion.h" -#include "qpid/broker/NullMessageStore.h" -#include "qpid_test_plugin.h" -#include <iostream> -#include <list> -#include <sstream> -#include "qpid/framing/AMQFrame.h" -#include "MockChannel.h" -using std::list; -using std::string; -using boost::dynamic_pointer_cast; -using namespace qpid::broker; -using namespace qpid::framing; - - - -class LazyLoadedContentTest : public CppUnit::TestCase -{ - CPPUNIT_TEST_SUITE(LazyLoadedContentTest); - CPPUNIT_TEST(testFragmented); - CPPUNIT_TEST(testWhole); - CPPUNIT_TEST(testHalved); - CPPUNIT_TEST_SUITE_END(); - - class TestMessageStore : public NullMessageStore - { - const string content; - - public: - TestMessageStore(const string& _content) : content(_content) {} - - void loadContent(PersistableMessage&, string& data, uint64_t offset, uint32_t length) - { - if (offset + length <= content.size()) { - data = content.substr(offset, length); - } else{ - std::stringstream error; - error << "Invalid segment: offset=" << offset << ", length=" << length << ", content_length=" << content.size(); - throw qpid::Exception(error.str()); - } - } - }; - - -public: - void testFragmented() - { - string data = "abcdefghijklmnopqrstuvwxyz"; - uint32_t framesize = 5; - string out[] = {"abcde", "fghij", "klmno", "pqrst", "uvwxy", "z"}; - load(data, 6, out, framesize); - } - - void testWhole() - { - string data = "abcdefghijklmnopqrstuvwxyz"; - uint32_t framesize = 50; - string out[] = {data}; - load(data, 1, out, framesize); - } - - void testHalved() - { - string data = "abcdefghijklmnopqrstuvwxyz"; - uint32_t framesize = 13; - string out[] = {"abcdefghijklm", "nopqrstuvwxyz"}; - load(data, 2, out, framesize); - } - - void load(string& in, size_t outCount, string* out, uint32_t framesize) - { - TestMessageStore store(in); - LazyLoadedContent content(&store, 0, in.size()); - MockChannel channel(3); - content.send(channel, framesize); - CPPUNIT_ASSERT_EQUAL(outCount, channel.out.frames.size()); - - for (unsigned int i = 0; i < outCount; i++) { - AMQContentBody* chunk(dynamic_cast<AMQContentBody*>( - channel.out.frames[i].getBody())); - CPPUNIT_ASSERT(chunk); - CPPUNIT_ASSERT_EQUAL(out[i], chunk->getData()); - CPPUNIT_ASSERT_EQUAL( - ChannelId(3), channel.out.frames[i].getChannel()); - } - } -}; - -// Make this test suite a plugin. -CPPUNIT_PLUGIN_IMPLEMENT(); -CPPUNIT_TEST_SUITE_REGISTRATION(LazyLoadedContentTest); - diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am index 34e7e973ac..7ff6a843a9 100644 --- a/qpid/cpp/src/tests/Makefile.am +++ b/qpid/cpp/src/tests/Makefile.am @@ -82,11 +82,8 @@ broker_unit_tests = \ DtxWorkRecordTest \ ExchangeTest \ HeadersExchangeTest \ - InMemoryContentTest \ - LazyLoadedContentTest \ MessageBuilderTest \ MessageTest \ - ReferenceTest \ QueueRegistryTest \ QueueTest \ QueuePolicyTest \ @@ -142,6 +139,7 @@ EXTRA_DIST += \ .valgrind.supp-default \ .valgrindrc-default \ InProcessBroker.h \ + MessageUtils.h \ MockChannel.h \ MockConnectionInputHandler.h \ TxMocks.h \ diff --git a/qpid/cpp/src/tests/MessageBuilderTest.cpp b/qpid/cpp/src/tests/MessageBuilderTest.cpp index a12fc603ce..341fdf56f5 100644 --- a/qpid/cpp/src/tests/MessageBuilderTest.cpp +++ b/qpid/cpp/src/tests/MessageBuilderTest.cpp @@ -18,15 +18,13 @@ * under the License. * */ -#include "qpid/Exception.h" -#include "qpid/broker/BrokerMessage.h" +#include "qpid/broker/Message.h" #include "qpid/broker/MessageBuilder.h" #include "qpid/broker/NullMessageStore.h" -#include "qpid/framing/Buffer.h" +#include "qpid/framing/frame_functors.h" +#include "qpid/framing/TypeFilter.h" #include "qpid_test_plugin.h" -#include <iostream> -#include <memory> -#include "MockChannel.h" +#include <list> using namespace boost; using namespace qpid::broker; @@ -35,72 +33,55 @@ using namespace qpid::sys; class MessageBuilderTest : public CppUnit::TestCase { - struct MockHandler : CompletionHandler { - Message::shared_ptr msg; + class MockMessageStore : public NullMessageStore + { + enum Op {STAGE=1, APPEND=2}; - virtual void complete(Message::shared_ptr _msg){ - msg = _msg; + uint64_t id; + PersistableMessage* expectedMsg; + string expectedData; + std::list<Op> ops; + + void checkExpectation(Op actual) + { + CPPUNIT_ASSERT_EQUAL(ops.front(), actual); + ops.pop_front(); } - }; - class TestMessageStore : public NullMessageStore - { - Buffer* header; - Buffer* content; - const uint32_t contentBufferSize; - - public: + public: + MockMessageStore() : id(0), expectedMsg(0) {} - void stage(PersistableMessage& msg) - { - if (msg.getPersistenceId() == 0) { - header = new Buffer(msg.encodedSize()); - msg.encode(*header); - content = new Buffer(contentBufferSize); - msg.setPersistenceId(1); - } else { - throw qpid::Exception("Message already staged!"); - } + void expectStage(PersistableMessage& msg) + { + expectedMsg = &msg; + ops.push_back(STAGE); } - void appendContent(PersistableMessage& msg, const string& data) - { - if (msg.getPersistenceId() == 1) { - content->putRawData(data); - } else { - throw qpid::Exception("Invalid message id!"); - } + void expectAppendContent(PersistableMessage& msg, const string& data) + { + expectedMsg = &msg; + expectedData = data; + ops.push_back(APPEND); } - using NullMessageStore::destroy; + void stage(PersistableMessage& msg) + { + checkExpectation(STAGE); + CPPUNIT_ASSERT_EQUAL(expectedMsg, &msg); + msg.setPersistenceId(++id); + } - void destroy(PersistableMessage& msg) + void appendContent(PersistableMessage& msg, const string& data) { - CPPUNIT_ASSERT(msg.getPersistenceId()); + checkExpectation(APPEND); + CPPUNIT_ASSERT_EQUAL(expectedMsg, &msg); + CPPUNIT_ASSERT_EQUAL(expectedData, data); } - BasicMessage::shared_ptr getRestoredMessage() + bool expectationsMet() { - BasicMessage::shared_ptr msg(new BasicMessage()); - if (header) { - header->flip(); - msg->decodeHeader(*header); - delete header; - header = 0; - if (content) { - content->flip(); - msg->decodeContent(*content); - delete content; - content = 0; - } - } - return msg; + return ops.empty(); } - - //dont care about any of the other methods: - TestMessageStore(uint32_t _contentBufferSize) : NullMessageStore(), header(0), content(0), - contentBufferSize(_contentBufferSize) {} - ~TestMessageStore(){} }; CPPUNIT_TEST_SUITE(MessageBuilderTest); @@ -113,106 +94,115 @@ class MessageBuilderTest : public CppUnit::TestCase public: void testHeaderOnly(){ - MockHandler handler; - MessageBuilder builder(&handler); - - Message::shared_ptr message( - new BasicMessage( - 0, "test", "my_routing_key", false, false)); - AMQHeaderBody header(BASIC); - header.setContentSize(0); - - builder.initialise(message); - CPPUNIT_ASSERT(!handler.msg); - builder.setHeader(&header); - CPPUNIT_ASSERT(handler.msg); - CPPUNIT_ASSERT_EQUAL(message, handler.msg); + MessageBuilder builder; + builder.start(SequenceNumber()); + + std::string exchange("builder-exchange"); + std::string key("builder-exchange"); + + AMQFrame method(0, MessageTransferBody(ProtocolVersion(), 0, exchange, 0, 0)); + AMQFrame header(0, AMQHeaderBody()); + + header.castBody<AMQHeaderBody>()->get<MessageProperties>(true)->setContentLength(0); + header.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true)->setRoutingKey(key); + + builder.handle(method); + builder.handle(header); + + CPPUNIT_ASSERT(builder.getMessage()); + CPPUNIT_ASSERT_EQUAL(exchange, builder.getMessage()->getExchangeName()); + CPPUNIT_ASSERT_EQUAL(key, builder.getMessage()->getRoutingKey()); + CPPUNIT_ASSERT(builder.getMessage()->getFrames().isComplete()); } void test1ContentFrame(){ - MockHandler handler; - MessageBuilder builder(&handler); + MessageBuilder builder; + builder.start(SequenceNumber()); - string data1("abcdefg"); + std::string data("abcdefg"); + std::string exchange("builder-exchange"); + std::string key("builder-exchange"); - Message::shared_ptr message( - new BasicMessage(0, "test", "my_routing_key", false, false)); - AMQHeaderBody header(BASIC); - header.setContentSize(7); - AMQContentBody part1(data1); - - builder.initialise(message); - CPPUNIT_ASSERT(!handler.msg); - builder.setHeader(&header); - CPPUNIT_ASSERT(!handler.msg); - builder.addContent(&part1); - CPPUNIT_ASSERT(handler.msg); - CPPUNIT_ASSERT_EQUAL(message, handler.msg); + AMQFrame method(0, MessageTransferBody(ProtocolVersion(), 0, exchange, 0, 0)); + AMQFrame header(0, AMQHeaderBody()); + AMQFrame content(0, AMQContentBody(data)); + + header.castBody<AMQHeaderBody>()->get<MessageProperties>(true)->setContentLength(data.size()); + header.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true)->setRoutingKey(key); + + builder.handle(method); + CPPUNIT_ASSERT(builder.getMessage()); + CPPUNIT_ASSERT(!builder.getMessage()->getFrames().isComplete()); + + builder.handle(header); + CPPUNIT_ASSERT(builder.getMessage()); + CPPUNIT_ASSERT(!builder.getMessage()->getFrames().isComplete()); + + builder.handle(content); + CPPUNIT_ASSERT(builder.getMessage()); + CPPUNIT_ASSERT(builder.getMessage()->getFrames().isComplete()); } void test2ContentFrames(){ - MockHandler handler; - MessageBuilder builder(&handler); - - string data1("abcdefg"); - string data2("hijklmn"); - - Message::shared_ptr message( - new BasicMessage(0, "test", "my_routing_key", false, false)); - AMQHeaderBody header(BASIC); - header.setContentSize(14); - AMQContentBody part1(data1); - AMQContentBody part2(data2); - - builder.initialise(message); - CPPUNIT_ASSERT(!handler.msg); - builder.setHeader(&header); - CPPUNIT_ASSERT(!handler.msg); - builder.addContent(&part1); - CPPUNIT_ASSERT(!handler.msg); - builder.addContent(&part2); - CPPUNIT_ASSERT(handler.msg); - CPPUNIT_ASSERT_EQUAL(message, handler.msg); + MessageBuilder builder; + builder.start(SequenceNumber()); + + std::string data1("abcdefg"); + std::string data2("hijklmn"); + std::string exchange("builder-exchange"); + std::string key("builder-exchange"); + + AMQFrame method(0, MessageTransferBody(ProtocolVersion(), 0, exchange, 0, 0)); + AMQFrame header(0, AMQHeaderBody()); + AMQFrame content1(0, AMQContentBody(data1)); + AMQFrame content2(0, AMQContentBody(data2)); + + header.castBody<AMQHeaderBody>()->get<MessageProperties>(true)->setContentLength(data1.size() + data2.size()); + header.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true)->setRoutingKey(key); + + builder.handle(method); + builder.handle(header); + builder.handle(content1); + CPPUNIT_ASSERT(builder.getMessage()); + CPPUNIT_ASSERT(!builder.getMessage()->getFrames().isComplete()); + + builder.handle(content2); + CPPUNIT_ASSERT(builder.getMessage()); + CPPUNIT_ASSERT(builder.getMessage()->getFrames().isComplete()); } void testStaging(){ - //store must be the last thing to be destroyed or destructor - //of Message fails (it uses the store to call destroy if lazy - //loaded content is in use) - TestMessageStore store(14); - { - MockHandler handler; - MessageBuilder builder(&handler, &store, 5); - - string data1("abcdefg"); - string data2("hijklmn"); - - Message::shared_ptr message( - new BasicMessage(0, "test", "my_routing_key", false, false)); - AMQHeaderBody header(BASIC); - header.setContentSize(14); - BasicHeaderProperties* properties = dynamic_cast<BasicHeaderProperties*>(header.getProperties()); - properties->setMessageId("MyMessage"); - properties->getHeaders().setString("abc", "xyz"); - - AMQContentBody part1(data1); - AMQContentBody part2(data2); - - builder.initialise(message); - builder.setHeader(&header); - builder.addContent(&part1); - builder.addContent(&part2); - CPPUNIT_ASSERT(handler.msg); - CPPUNIT_ASSERT_EQUAL(message, handler.msg); - - BasicMessage::shared_ptr restored = store.getRestoredMessage(); - CPPUNIT_ASSERT_EQUAL(message->getExchange(), restored->getExchange()); - CPPUNIT_ASSERT_EQUAL(message->getRoutingKey(), restored->getRoutingKey()); - CPPUNIT_ASSERT_EQUAL(message->getHeaderProperties()->getMessageId(), restored->getHeaderProperties()->getMessageId()); - CPPUNIT_ASSERT_EQUAL(message->getHeaderProperties()->getHeaders().getString("abc"), - restored->getHeaderProperties()->getHeaders().getString("abc")); - CPPUNIT_ASSERT_EQUAL((uint64_t) 14, restored->contentSize()); - } + MockMessageStore store; + MessageBuilder builder(&store, 5); + builder.start(SequenceNumber()); + + std::string data1("abcdefg"); + std::string data2("hijklmn"); + std::string exchange("builder-exchange"); + std::string key("builder-exchange"); + + AMQFrame method(0, MessageTransferBody(ProtocolVersion(), 0, exchange, 0, 0)); + AMQFrame header(0, AMQHeaderBody()); + AMQFrame content1(0, AMQContentBody(data1)); + AMQFrame content2(0, AMQContentBody(data2)); + + header.castBody<AMQHeaderBody>()->get<MessageProperties>(true)->setContentLength(data1.size() + data2.size()); + header.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true)->setRoutingKey(key); + + builder.handle(method); + builder.handle(header); + + store.expectStage(*builder.getMessage()); + builder.handle(content1); + CPPUNIT_ASSERT(store.expectationsMet()); + CPPUNIT_ASSERT_EQUAL((uint64_t) 1, builder.getMessage()->getPersistenceId()); + + store.expectAppendContent(*builder.getMessage(), data2); + builder.handle(content2); + CPPUNIT_ASSERT(store.expectationsMet()); + + //were the content frames dropped? + CPPUNIT_ASSERT_EQUAL((uint64_t) 0, builder.getMessage()->contentSize()); } }; diff --git a/qpid/cpp/src/tests/MessageTest.cpp b/qpid/cpp/src/tests/MessageTest.cpp index 1fbb18b7d3..3d080ef3dc 100644 --- a/qpid/cpp/src/tests/MessageTest.cpp +++ b/qpid/cpp/src/tests/MessageTest.cpp @@ -18,7 +18,7 @@ * under the License. * */ -#include "qpid/broker/BrokerMessage.h" +#include "qpid/broker/Message.h" #include "qpid_test_plugin.h" #include <iostream> #include "qpid/framing/AMQP_HighestVersion.h" @@ -45,40 +45,45 @@ class MessageTest : public CppUnit::TestCase string data1("abcdefg"); string data2("hijklmn"); - BasicMessage::shared_ptr msg( - new BasicMessage(0, exchange, routingKey, false, false)); - AMQHeaderBody header(BASIC); - header.setContentSize(14); - AMQContentBody part1(data1); - AMQContentBody part2(data2); - msg->setHeader(&header); - msg->addContent(&part1); - msg->addContent(&part2); + Message::shared_ptr msg(new Message()); + + AMQFrame method(0, MessageTransferBody(ProtocolVersion(), 0, exchange, 0, 0)); + AMQFrame header(0, AMQHeaderBody()); + AMQFrame content1(0, AMQContentBody(data1)); + AMQFrame content2(0, AMQContentBody(data2)); + + msg->getFrames().append(method); + msg->getFrames().append(header); + msg->getFrames().append(content1); + msg->getFrames().append(content2); + + MessageProperties* mProps = msg->getFrames().getHeaders()->get<MessageProperties>(true); + mProps->setContentLength(data1.size() + data2.size()); + mProps->setMessageId(messageId); + FieldTable applicationHeaders; + applicationHeaders.setString("abc", "xyz"); + mProps->setApplicationHeaders(applicationHeaders); + DeliveryProperties* dProps = msg->getFrames().getHeaders()->get<DeliveryProperties>(true); + dProps->setRoutingKey(routingKey); + dProps->setDeliveryMode(PERSISTENT); + CPPUNIT_ASSERT(msg->isPersistent()); - msg->getHeaderProperties()->setMessageId(messageId); - msg->getHeaderProperties()->setDeliveryMode(PERSISTENT); - msg->getHeaderProperties()->getHeaders().setString("abc", "xyz"); Buffer buffer(msg->encodedSize()); msg->encode(buffer); - buffer.flip(); - - msg.reset(new BasicMessage()); - msg->decode(buffer); - CPPUNIT_ASSERT_EQUAL(exchange, msg->getExchange()); - CPPUNIT_ASSERT_EQUAL(routingKey, msg->getRoutingKey()); - CPPUNIT_ASSERT_EQUAL(messageId, msg->getHeaderProperties()->getMessageId()); - CPPUNIT_ASSERT_EQUAL(PERSISTENT, msg->getHeaderProperties()->getDeliveryMode()); - CPPUNIT_ASSERT_EQUAL(string("xyz"), msg->getHeaderProperties()->getHeaders().getString("abc")); - CPPUNIT_ASSERT_EQUAL((uint64_t) 14, msg->contentSize()); + buffer.flip(); + msg.reset(new Message()); + msg->decodeHeader(buffer); + msg->decodeContent(buffer); - MockChannel channel(1); - msg->deliver(channel, "ignore", 0, 100); - CPPUNIT_ASSERT_EQUAL((size_t) 3, channel.out.frames.size()); - AMQContentBody* contentBody( - dynamic_cast<AMQContentBody*>(channel.out.frames[2].getBody())); - CPPUNIT_ASSERT(contentBody); - CPPUNIT_ASSERT_EQUAL(data1 + data2, contentBody->getData()); + CPPUNIT_ASSERT_EQUAL(exchange, msg->getExchangeName()); + CPPUNIT_ASSERT_EQUAL(routingKey, msg->getRoutingKey()); + CPPUNIT_ASSERT_EQUAL((uint64_t) data1.size() + data2.size(), msg->contentSize()); + CPPUNIT_ASSERT_EQUAL((uint64_t) data1.size() + data2.size(), msg->getProperties<MessageProperties>()->getContentLength()); + CPPUNIT_ASSERT_EQUAL(messageId, msg->getProperties<MessageProperties>()->getMessageId()); + CPPUNIT_ASSERT_EQUAL(string("xyz"), msg->getProperties<MessageProperties>()->getApplicationHeaders().getString("abc")); + CPPUNIT_ASSERT_EQUAL((uint8_t) PERSISTENT, msg->getProperties<DeliveryProperties>()->getDeliveryMode()); + CPPUNIT_ASSERT(msg->isPersistent()); } }; diff --git a/qpid/cpp/src/tests/MessageUtils.h b/qpid/cpp/src/tests/MessageUtils.h new file mode 100644 index 0000000000..7fb1755c4b --- /dev/null +++ b/qpid/cpp/src/tests/MessageUtils.h @@ -0,0 +1,53 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/broker/Message.h" +#include "qpid/broker/MessageDelivery.h" +#include "qpid/framing/AMQFrame.h" + +using namespace qpid::broker; +using namespace qpid::framing; + +struct MessageUtils +{ + static Message::shared_ptr createMessage(const string& exchange, const string& routingKey, + const string& messageId, uint64_t contentSize = 0) + { + Message::shared_ptr msg(new Message()); + + AMQFrame method(0, MessageTransferBody(ProtocolVersion(), 0, exchange, 0, 0)); + AMQFrame header(0, AMQHeaderBody()); + + msg->getFrames().append(method); + msg->getFrames().append(header); + MessageProperties* props = msg->getFrames().getHeaders()->get<MessageProperties>(true); + props->setContentLength(contentSize); + props->setMessageId(messageId); + msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey); + return msg; + } + + static void addContent(Message::shared_ptr msg, const string& data) + { + AMQFrame content(0, AMQContentBody(data)); + msg->getFrames().append(content); + } +}; diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp index e7ca124631..ef1518af4c 100644 --- a/qpid/cpp/src/tests/QueueTest.cpp +++ b/qpid/cpp/src/tests/QueueTest.cpp @@ -70,8 +70,13 @@ class QueueTest : public CppUnit::TestCase public: Message::shared_ptr message(std::string exchange, std::string routingKey) { - return Message::shared_ptr( - new BasicMessage(0, exchange, routingKey, false, false)); + Message::shared_ptr msg(new Message()); + AMQFrame method(0, MessageTransferBody(ProtocolVersion(), 0, exchange, 0, 0)); + AMQFrame header(0, AMQHeaderBody()); + msg->getFrames().append(method); + msg->getFrames().append(header); + msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey); + return msg; } diff --git a/qpid/cpp/src/tests/ReferenceTest.cpp b/qpid/cpp/src/tests/ReferenceTest.cpp deleted file mode 100644 index 411462564a..0000000000 --- a/qpid/cpp/src/tests/ReferenceTest.cpp +++ /dev/null @@ -1,94 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <iostream> -#include <memory> -#include "qpid_test_plugin.h" -#include "qpid/broker/Reference.h" -#include "qpid/broker/BrokerMessageMessage.h" -#include "qpid/framing/MessageTransferBody.h" -#include "qpid/framing/MessageAppendBody.h" -#include "qpid/broker/CompletionHandler.h" - -using namespace boost; -using namespace qpid; -using namespace qpid::broker; -using namespace qpid::framing; -using namespace std; - -class ReferenceTest : public CppUnit::TestCase -{ - CPPUNIT_TEST_SUITE(ReferenceTest); - CPPUNIT_TEST(testRegistry); - CPPUNIT_TEST(testReference); - CPPUNIT_TEST_SUITE_END(); - - ProtocolVersion v; - ReferenceRegistry registry; - - public: - void testRegistry() { - Reference::shared_ptr ref = registry.open("foo"); - CPPUNIT_ASSERT_EQUAL(string("foo"), ref->getId()); - CPPUNIT_ASSERT(ref == registry.get("foo")); - try { - registry.get("none"); - CPPUNIT_FAIL("Expected exception"); - } catch (...) {} - try { - registry.open("foo"); - CPPUNIT_FAIL("Expected exception"); - } catch(...) {} - ref->close(); - try { - registry.get("foo"); - CPPUNIT_FAIL("Expected exception"); - } catch(...) {} - } - - void testReference() { - - Reference::shared_ptr r1(registry.open("bar")); - - MessageTransferBody t1(v); - // TODO aconway 2007-04-03: hack around lack of generated setters. Clean this up. - const_cast<framing::Content&>(t1.getBody()) = framing::Content(REFERENCE,"bar"); - MessageMessage::shared_ptr m1(new MessageMessage(0, &t1, r1)); - - MessageTransferBody t2(v); - const_cast<framing::Content&>(t2.getBody()) = framing::Content(REFERENCE,"bar"); - MessageMessage::shared_ptr m2(new MessageMessage(0, &t2, r1)); - - MessageAppendBody a1(v); - MessageAppendBody a2(v); - - r1->addMessage(m1); - r1->addMessage(m2); - CPPUNIT_ASSERT_EQUAL(size_t(2), r1->getMessages().size()); - r1->append(a1); - r1->append(a2); - CPPUNIT_ASSERT_EQUAL(size_t(2), r1->getAppends().size()); - r1->close(); - } -}; - -// Make this test suite a plugin. -CPPUNIT_PLUGIN_IMPLEMENT(); -CPPUNIT_TEST_SUITE_REGISTRATION(ReferenceTest); diff --git a/qpid/cpp/src/tests/TxAckTest.cpp b/qpid/cpp/src/tests/TxAckTest.cpp index 24e8aac701..89a907d495 100644 --- a/qpid/cpp/src/tests/TxAckTest.cpp +++ b/qpid/cpp/src/tests/TxAckTest.cpp @@ -68,11 +68,13 @@ public: TxAckTest() : acked(0), queue(new Queue("my_queue", false, &store, 0)), op(acked, deliveries) { for(int i = 0; i < 10; i++){ - Message::shared_ptr msg( - new BasicMessage(0, "exchange", "routing_key", false, false)); - AMQHeaderBody body(BASIC); - msg->setHeader(&body); - msg->getHeaderProperties()->setDeliveryMode(PERSISTENT); + Message::shared_ptr msg(new Message()); + AMQFrame method(0, MessageTransferBody(ProtocolVersion(), 0, "exchange", 0, 0)); + AMQFrame header(0, AMQHeaderBody()); + msg->getFrames().append(method); + msg->getFrames().append(header); + msg->getProperties<DeliveryProperties>()->setDeliveryMode(PERSISTENT); + msg->getProperties<DeliveryProperties>()->setRoutingKey("routing_key"); messages.push_back(msg); deliveries.push_back(DeliveryRecord(msg, queue, "xyz", (i+1))); } diff --git a/qpid/cpp/src/tests/TxPublishTest.cpp b/qpid/cpp/src/tests/TxPublishTest.cpp index d009dd9112..5628cf1d1c 100644 --- a/qpid/cpp/src/tests/TxPublishTest.cpp +++ b/qpid/cpp/src/tests/TxPublishTest.cpp @@ -26,6 +26,7 @@ #include <list> #include <vector> #include "MockChannel.h" +#include "MessageUtils.h" using std::list; using std::pair; @@ -70,12 +71,10 @@ public: TxPublishTest() : queue1(new Queue("queue1", false, &store, 0)), queue2(new Queue("queue2", false, &store, 0)), - msg(new BasicMessage(0, "exchange", "routing_key", false, false)), + msg(MessageUtils::createMessage("exchange", "routing_key", "id")), op(msg) { - AMQHeaderBody body(BASIC); - msg->setHeader(&body); - msg->getHeaderProperties()->setDeliveryMode(PERSISTENT); + msg->getProperties<DeliveryProperties>()->setDeliveryMode(PERSISTENT); op.deliverTo(queue1); op.deliverTo(queue2); } diff --git a/qpid/python/cpp_failing_0-10.txt b/qpid/python/cpp_failing_0-10.txt index e68f942d67..97cf420717 100644 --- a/qpid/python/cpp_failing_0-10.txt +++ b/qpid/python/cpp_failing_0-10.txt @@ -1,15 +1,4 @@ +tests_0-10.alternate-exchange.AlternateExchangeTests.test_immediate tests_0-10.message.MessageTests.test_reject tests_0-10.basic.BasicTests.test_get -tests_0-10.message.MessageTests.test_get -tests_0-10.message.MessageTests.test_checkpoint -tests_0-10.message.MessageTests.test_empty_reference -tests_0-10.message.MessageTests.test_reference_already_opened_error -tests_0-10.message.MessageTests.test_reference_completion -tests_0-10.message.MessageTests.test_reference_large -tests_0-10.message.MessageTests.test_reference_multi_transfer -tests_0-10.message.MessageTests.test_reference_simple -tests_0-10.message.MessageTests.test_reference_unopened_on_append_error -tests_0-10.message.MessageTests.test_reference_unopened_on_close_error -tests_0-10.message.MessageTests.test_reference_unopened_on_transfer_error - diff --git a/qpid/python/qpid/client.py b/qpid/python/qpid/client.py index 3efd79c389..edcd1b8ad2 100644 --- a/qpid/python/qpid/client.py +++ b/qpid/python/qpid/client.py @@ -115,8 +115,6 @@ class ClientDelegate(Delegate): self.client.started.set() def message_transfer(self, ch, msg): - if isinstance(msg.body, ReferenceId): - msg.reference = ch.references.get(msg.body.id) self.client.queue(msg.destination).put(msg) def message_open(self, ch, msg): diff --git a/qpid/python/qpid/connection.py b/qpid/python/qpid/connection.py index 46b58e83b7..39bcde17df 100644 --- a/qpid/python/qpid/connection.py +++ b/qpid/python/qpid/connection.py @@ -239,9 +239,7 @@ class Response(Frame): return "[%s] Response(%s,%s,%s) %s" % (self.channel, self.id, self.request_id, self.batch_offset, self.method) def uses_struct_encoding(spec): - return (spec.major == 0 and - spec.minor == 10 and - "transitional" not in spec.file) + return (spec.major == 0 and spec.minor == 10) class Header(Frame): diff --git a/qpid/python/qpid/testlib.py b/qpid/python/qpid/testlib.py index 0b2a1b78d6..28c07ba43a 100644 --- a/qpid/python/qpid/testlib.py +++ b/qpid/python/qpid/testlib.py @@ -280,18 +280,17 @@ class TestBase(unittest.TestCase): routing_key=routing_key) else: self.channel.message_transfer( - destination=exchange, body=body, - application_headers=properties, - routing_key=routing_key) + destination=exchange, + content=Content(body, properties={'application_headers':properties,'routing_key':routing_key})) msg = queue.get(timeout=1) if testrunner.use08spec(): self.assertEqual(body, msg.content.body) if (properties): self.assertEqual(properties, msg.content.properties) else: - self.assertEqual(body, msg.body) + self.assertEqual(body, msg.content.body) if (properties): - self.assertEqual(properties, msg.application_headers) + self.assertEqual(properties, msg.content['application_headers']) def assertPublishConsume(self, queue="", exchange="", routing_key="", properties=None): """ diff --git a/qpid/python/tests_0-10/alternate-exchange.py b/qpid/python/tests_0-10/alternate-exchange.py index a1c6151fca..d6ac62ccfe 100644 --- a/qpid/python/tests_0-10/alternate-exchange.py +++ b/qpid/python/tests_0-10/alternate-exchange.py @@ -50,17 +50,17 @@ class AlternateExchangeTests(TestBase): #publish to the primary exchange #...one message that makes it to the 'processed' queue: - channel.message_transfer(destination="primary", routing_key="my-key", body="Good") + channel.message_transfer(destination="primary", content=Content("Good", properties={'routing_key':"my-key"})) #...and one that does not: - channel.message_transfer(destination="primary", routing_key="unused-key", body="Bad") + channel.message_transfer(destination="primary", content=Content("Bad", properties={'routing_key':"unused-key"})) #delete the exchanges channel.exchange_delete(exchange="primary") channel.exchange_delete(exchange="secondary") #verify behaviour - self.assertEqual("Good", processed.get(timeout=1).body) - self.assertEqual("Bad", returned.get(timeout=1).body) + self.assertEqual("Good", processed.get(timeout=1).content.body) + self.assertEqual("Bad", returned.get(timeout=1).content.body) self.assertEmpty(processed) self.assertEmpty(returned) @@ -79,18 +79,18 @@ class AlternateExchangeTests(TestBase): #create a queue using the dlq as its alternate exchange: channel.queue_declare(queue="delete-me", alternate_exchange="dlq") #send it some messages: - channel.message_transfer(routing_key="delete-me", body="One") - channel.message_transfer(routing_key="delete-me", body="Two") - channel.message_transfer(routing_key="delete-me", body="Three") + channel.message_transfer(content=Content("One", properties={'routing_key':"delete-me"})) + channel.message_transfer(content=Content("Two", properties={'routing_key':"delete-me"})) + channel.message_transfer(content=Content("Three", properties={'routing_key':"delete-me"})) #delete it: channel.queue_delete(queue="delete-me") #delete the dlq exchange: channel.exchange_delete(exchange="dlq") #check the messages were delivered to the dlq: - self.assertEqual("One", dlq.get(timeout=1).body) - self.assertEqual("Two", dlq.get(timeout=1).body) - self.assertEqual("Three", dlq.get(timeout=1).body) + self.assertEqual("One", dlq.get(timeout=1).content.body) + self.assertEqual("Two", dlq.get(timeout=1).content.body) + self.assertEqual("Three", dlq.get(timeout=1).content.body) self.assertEmpty(dlq) @@ -109,10 +109,11 @@ class AlternateExchangeTests(TestBase): #create a queue using the dlq as its alternate exchange: channel.queue_declare(queue="no-consumers", alternate_exchange="dlq", exclusive=True) #send it some messages: - channel.message_transfer(routing_key="no-consumers", body="no one wants me", immediate=True) + #TODO: WE HAVE LOST THE IMMEDIATE FLAG; FIX THIS ONCE ITS BACK + channel.message_transfer(content=Content("no one wants me", properties={'routing_key':"no-consumers"})) #check the messages were delivered to the dlq: - self.assertEqual("no one wants me", dlq.get(timeout=1).body) + self.assertEqual("no one wants me", dlq.get(timeout=1).content.body) self.assertEmpty(dlq) #cleanup: diff --git a/qpid/python/tests_0-10/basic.py b/qpid/python/tests_0-10/basic.py deleted file mode 100644 index e7d22e00da..0000000000 --- a/qpid/python/tests_0-10/basic.py +++ /dev/null @@ -1,396 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -from qpid.client import Client, Closed -from qpid.queue import Empty -from qpid.content import Content -from qpid.testlib import testrunner, TestBase - -class BasicTests(TestBase): - """Tests for 'methods' on the amqp basic 'class'""" - - def test_consume_no_local(self): - """ - Test that the no_local flag is honoured in the consume method - """ - channel = self.channel - #setup, declare two queues: - channel.queue_declare(queue="test-queue-1a", exclusive=True) - channel.queue_declare(queue="test-queue-1b", exclusive=True) - #establish two consumers one of which excludes delivery of locally sent messages - channel.basic_consume(consumer_tag="local_included", queue="test-queue-1a") - channel.basic_consume(consumer_tag="local_excluded", queue="test-queue-1b", no_local=True) - - #send a message - channel.basic_publish(routing_key="test-queue-1a", content=Content("consume_no_local")) - channel.basic_publish(routing_key="test-queue-1b", content=Content("consume_no_local")) - - #check the queues of the two consumers - excluded = self.client.queue("local_excluded") - included = self.client.queue("local_included") - msg = included.get(timeout=1) - self.assertEqual("consume_no_local", msg.content.body) - try: - excluded.get(timeout=1) - self.fail("Received locally published message though no_local=true") - except Empty: None - - - def test_consume_exclusive(self): - """ - Test that the exclusive flag is honoured in the consume method - """ - channel = self.channel - #setup, declare a queue: - channel.queue_declare(queue="test-queue-2", exclusive=True) - - #check that an exclusive consumer prevents other consumer being created: - channel.basic_consume(consumer_tag="first", queue="test-queue-2", exclusive=True) - try: - channel.basic_consume(consumer_tag="second", queue="test-queue-2") - self.fail("Expected consume request to fail due to previous exclusive consumer") - except Closed, e: - self.assertChannelException(403, e.args[0]) - - #open new channel and cleanup last consumer: - channel = self.client.channel(2) - channel.channel_open() - - #check that an exclusive consumer cannot be created if a consumer already exists: - channel.basic_consume(consumer_tag="first", queue="test-queue-2") - try: - channel.basic_consume(consumer_tag="second", queue="test-queue-2", exclusive=True) - self.fail("Expected exclusive consume request to fail due to previous consumer") - except Closed, e: - self.assertChannelException(403, e.args[0]) - - def test_consume_queue_errors(self): - """ - Test error conditions associated with the queue field of the consume method: - """ - channel = self.channel - try: - #queue specified but doesn't exist: - channel.basic_consume(queue="invalid-queue") - self.fail("Expected failure when consuming from non-existent queue") - except Closed, e: - self.assertChannelException(404, e.args[0]) - - channel = self.client.channel(2) - channel.channel_open() - try: - #queue not specified and none previously declared for channel: - channel.basic_consume(queue="") - self.fail("Expected failure when consuming from unspecified queue") - except Closed, e: - self.assertConnectionException(530, e.args[0]) - - def test_consume_unique_consumers(self): - """ - Ensure unique consumer tags are enforced - """ - channel = self.channel - #setup, declare a queue: - channel.queue_declare(queue="test-queue-3", exclusive=True) - - #check that attempts to use duplicate tags are detected and prevented: - channel.basic_consume(consumer_tag="first", queue="test-queue-3") - try: - channel.basic_consume(consumer_tag="first", queue="test-queue-3") - self.fail("Expected consume request to fail due to non-unique tag") - except Closed, e: - self.assertConnectionException(530, e.args[0]) - - def test_cancel(self): - """ - Test compliance of the basic.cancel method - """ - channel = self.channel - #setup, declare a queue: - channel.queue_declare(queue="test-queue-4", exclusive=True) - channel.basic_consume(consumer_tag="my-consumer", queue="test-queue-4") - channel.basic_publish(routing_key="test-queue-4", content=Content("One")) - - myqueue = self.client.queue("my-consumer") - msg = myqueue.get(timeout=1) - self.assertEqual("One", msg.content.body) - - #cancel should stop messages being delivered - channel.basic_cancel(consumer_tag="my-consumer") - channel.basic_publish(routing_key="test-queue-4", content=Content("Two")) - try: - msg = myqueue.get(timeout=1) - self.fail("Got message after cancellation: " + msg) - except Empty: None - - #cancellation of non-existant consumers should be handled without error - channel.basic_cancel(consumer_tag="my-consumer") - channel.basic_cancel(consumer_tag="this-never-existed") - - - def test_ack(self): - """ - Test basic ack/recover behaviour - """ - channel = self.channel - channel.queue_declare(queue="test-ack-queue", exclusive=True) - - reply = channel.basic_consume(queue="test-ack-queue", no_ack=False) - queue = self.client.queue(reply.consumer_tag) - - channel.basic_publish(routing_key="test-ack-queue", content=Content("One")) - channel.basic_publish(routing_key="test-ack-queue", content=Content("Two")) - channel.basic_publish(routing_key="test-ack-queue", content=Content("Three")) - channel.basic_publish(routing_key="test-ack-queue", content=Content("Four")) - channel.basic_publish(routing_key="test-ack-queue", content=Content("Five")) - - msg1 = queue.get(timeout=1) - msg2 = queue.get(timeout=1) - msg3 = queue.get(timeout=1) - msg4 = queue.get(timeout=1) - msg5 = queue.get(timeout=1) - - self.assertEqual("One", msg1.content.body) - self.assertEqual("Two", msg2.content.body) - self.assertEqual("Three", msg3.content.body) - self.assertEqual("Four", msg4.content.body) - self.assertEqual("Five", msg5.content.body) - - channel.basic_ack(delivery_tag=msg2.delivery_tag, multiple=True) #One & Two - channel.basic_ack(delivery_tag=msg4.delivery_tag, multiple=False) #Four - - channel.basic_recover(requeue=False) - - msg3b = queue.get(timeout=1) - msg5b = queue.get(timeout=1) - - self.assertEqual("Three", msg3b.content.body) - self.assertEqual("Five", msg5b.content.body) - - try: - extra = queue.get(timeout=1) - self.fail("Got unexpected message: " + extra.content.body) - except Empty: None - - def test_recover_requeue(self): - """ - Test requeing on recovery - """ - channel = self.channel - channel.queue_declare(queue="test-requeue", exclusive=True) - - subscription = channel.basic_consume(queue="test-requeue", no_ack=False) - queue = self.client.queue(subscription.consumer_tag) - - channel.basic_publish(routing_key="test-requeue", content=Content("One")) - channel.basic_publish(routing_key="test-requeue", content=Content("Two")) - channel.basic_publish(routing_key="test-requeue", content=Content("Three")) - channel.basic_publish(routing_key="test-requeue", content=Content("Four")) - channel.basic_publish(routing_key="test-requeue", content=Content("Five")) - - msg1 = queue.get(timeout=1) - msg2 = queue.get(timeout=1) - msg3 = queue.get(timeout=1) - msg4 = queue.get(timeout=1) - msg5 = queue.get(timeout=1) - - self.assertEqual("One", msg1.content.body) - self.assertEqual("Two", msg2.content.body) - self.assertEqual("Three", msg3.content.body) - self.assertEqual("Four", msg4.content.body) - self.assertEqual("Five", msg5.content.body) - - channel.basic_ack(delivery_tag=msg2.delivery_tag, multiple=True) #One & Two - channel.basic_ack(delivery_tag=msg4.delivery_tag, multiple=False) #Four - - channel.basic_cancel(consumer_tag=subscription.consumer_tag) - - channel.basic_recover(requeue=True) - - subscription2 = channel.basic_consume(queue="test-requeue") - queue2 = self.client.queue(subscription2.consumer_tag) - - msg3b = queue2.get(timeout=1) - msg5b = queue2.get(timeout=1) - - self.assertEqual("Three", msg3b.content.body) - self.assertEqual("Five", msg5b.content.body) - - self.assertEqual(True, msg3b.redelivered) - self.assertEqual(True, msg5b.redelivered) - - try: - extra = queue2.get(timeout=1) - self.fail("Got unexpected message in second queue: " + extra.content.body) - except Empty: None - try: - extra = queue.get(timeout=1) - self.fail("Got unexpected message in original queue: " + extra.content.body) - except Empty: None - - - def test_qos_prefetch_count(self): - """ - Test that the prefetch count specified is honoured - """ - #setup: declare queue and subscribe - channel = self.channel - channel.queue_declare(queue="test-prefetch-count", exclusive=True) - subscription = channel.basic_consume(queue="test-prefetch-count", no_ack=False) - queue = self.client.queue(subscription.consumer_tag) - - #set prefetch to 5: - channel.basic_qos(prefetch_count=5) - - #publish 10 messages: - for i in range(1, 11): - channel.basic_publish(routing_key="test-prefetch-count", content=Content("Message %d" % i)) - - #only 5 messages should have been delivered: - for i in range(1, 6): - msg = queue.get(timeout=1) - self.assertEqual("Message %d" % i, msg.content.body) - try: - extra = queue.get(timeout=1) - self.fail("Got unexpected 6th message in original queue: " + extra.content.body) - except Empty: None - - #ack messages and check that the next set arrive ok: - channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True) - - for i in range(6, 11): - msg = queue.get(timeout=1) - self.assertEqual("Message %d" % i, msg.content.body) - - channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True) - - try: - extra = queue.get(timeout=1) - self.fail("Got unexpected 11th message in original queue: " + extra.content.body) - except Empty: None - - - - def test_qos_prefetch_size(self): - """ - Test that the prefetch size specified is honoured - """ - #setup: declare queue and subscribe - channel = self.channel - channel.queue_declare(queue="test-prefetch-size", exclusive=True) - subscription = channel.basic_consume(queue="test-prefetch-size", no_ack=False) - queue = self.client.queue(subscription.consumer_tag) - - #set prefetch to 50 bytes (each message is 9 or 10 bytes): - channel.basic_qos(prefetch_size=50) - - #publish 10 messages: - for i in range(1, 11): - channel.basic_publish(routing_key="test-prefetch-size", content=Content("Message %d" % i)) - - #only 5 messages should have been delivered (i.e. 45 bytes worth): - for i in range(1, 6): - msg = queue.get(timeout=1) - self.assertEqual("Message %d" % i, msg.content.body) - - try: - extra = queue.get(timeout=1) - self.fail("Got unexpected 6th message in original queue: " + extra.content.body) - except Empty: None - - #ack messages and check that the next set arrive ok: - channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True) - - for i in range(6, 11): - msg = queue.get(timeout=1) - self.assertEqual("Message %d" % i, msg.content.body) - - channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True) - - try: - extra = queue.get(timeout=1) - self.fail("Got unexpected 11th message in original queue: " + extra.content.body) - except Empty: None - - #make sure that a single oversized message still gets delivered - large = "abcdefghijklmnopqrstuvwxyz" - large = large + "-" + large; - channel.basic_publish(routing_key="test-prefetch-size", content=Content(large)) - msg = queue.get(timeout=1) - self.assertEqual(large, msg.content.body) - - def test_get(self): - """ - Test basic_get method - """ - channel = self.channel - channel.queue_declare(queue="test-get", exclusive=True) - - #publish some messages (no_ack=True) - for i in range(1, 11): - channel.basic_publish(routing_key="test-get", content=Content("Message %d" % i)) - - #use basic_get to read back the messages, and check that we get an empty at the end - for i in range(1, 11): - reply = channel.basic_get(no_ack=True) - self.assertEqual(reply.method.klass.name, "basic") - self.assertEqual(reply.method.name, "get_ok") - self.assertEqual("Message %d" % i, reply.content.body) - - reply = channel.basic_get(no_ack=True) - self.assertEqual(reply.method.klass.name, "basic") - self.assertEqual(reply.method.name, "get_empty") - - #repeat for no_ack=False - for i in range(11, 21): - channel.basic_publish(routing_key="test-get", content=Content("Message %d" % i)) - - for i in range(11, 21): - reply = channel.basic_get(no_ack=False) - self.assertEqual(reply.method.klass.name, "basic") - self.assertEqual(reply.method.name, "get_ok") - self.assertEqual("Message %d" % i, reply.content.body) - if(i == 13): - channel.basic_ack(delivery_tag=reply.delivery_tag, multiple=True) - if(i in [15, 17, 19]): - channel.basic_ack(delivery_tag=reply.delivery_tag) - - reply = channel.basic_get(no_ack=True) - self.assertEqual(reply.method.klass.name, "basic") - self.assertEqual(reply.method.name, "get_empty") - - #recover(requeue=True) - channel.basic_recover(requeue=True) - - #get the unacked messages again (14, 16, 18, 20) - for i in [14, 16, 18, 20]: - reply = channel.basic_get(no_ack=False) - self.assertEqual(reply.method.klass.name, "basic") - self.assertEqual(reply.method.name, "get_ok") - self.assertEqual("Message %d" % i, reply.content.body) - channel.basic_ack(delivery_tag=reply.delivery_tag) - - reply = channel.basic_get(no_ack=True) - self.assertEqual(reply.method.klass.name, "basic") - self.assertEqual(reply.method.name, "get_empty") - - channel.basic_recover(requeue=True) - - reply = channel.basic_get(no_ack=True) - self.assertEqual(reply.method.klass.name, "basic") - self.assertEqual(reply.method.name, "get_empty") diff --git a/qpid/python/tests_0-10/broker.py b/qpid/python/tests_0-10/broker.py index 647f5d4fa5..0eb71287ec 100644 --- a/qpid/python/tests_0-10/broker.py +++ b/qpid/python/tests_0-10/broker.py @@ -37,19 +37,19 @@ class BrokerTests(TestBase): ctag = "tag1" ch.message_subscribe(queue = "myqueue", destination = ctag, confirm_mode = 0) body = "test no-ack" - ch.message_transfer(routing_key = "myqueue", body = body) + ch.message_transfer(content = Content(body, properties = {"routing_key" : "myqueue"})) msg = self.client.queue(ctag).get(timeout = 5) - self.assert_(msg.body == body) + self.assert_(msg.content.body == body) # Acknowledging consumer self.queue_declare(ch, queue = "otherqueue") ctag = "tag2" ch.message_subscribe(queue = "otherqueue", destination = ctag, confirm_mode = 1) body = "test ack" - ch.message_transfer(routing_key = "otherqueue", body = body) + ch.message_transfer(content = Content(body, properties = {"routing_key" : "otherqueue"})) msg = self.client.queue(ctag).get(timeout = 5) msg.complete() - self.assert_(msg.body == body) + self.assert_(msg.content.body == body) def test_simple_delivery_immediate(self): """ @@ -64,9 +64,9 @@ class BrokerTests(TestBase): queue = self.client.queue(consumer_tag) body = "Immediate Delivery" - channel.message_transfer(destination="test-exchange", routing_key="key", body=body, immediate=True) + channel.message_transfer(destination="test-exchange", content = Content(body, properties = {"routing_key" : "key"})) msg = queue.get(timeout=5) - self.assert_(msg.body == body) + self.assert_(msg.content.body == body) # TODO: Ensure we fail if immediate=True and there's no consumer. @@ -81,13 +81,13 @@ class BrokerTests(TestBase): self.queue_declare(channel, queue="test-queue") channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key") body = "Queued Delivery" - channel.message_transfer(destination="test-exchange", routing_key="key", body=body) + channel.message_transfer(destination="test-exchange", content = Content(body, properties = {"routing_key" : "key"})) consumer_tag = "tag1" channel.message_subscribe(queue="test-queue", destination=consumer_tag, confirm_mode = 0) queue = self.client.queue(consumer_tag) msg = queue.get(timeout=5) - self.assert_(msg.body == body) + self.assert_(msg.content.body == body) def test_invalid_channel(self): channel = self.client.channel(200) @@ -114,8 +114,9 @@ class BrokerTests(TestBase): channel.message_subscribe(destination="my-tag", queue="flow_test_queue") incoming = self.client.queue("my-tag") - channel.channel_flow(active=False) - channel.message_transfer(routing_key="flow_test_queue", body="abcdefghijklmnopqrstuvwxyz") + channel.channel_flow(active=False) + c = Content("abcdefghijklmnopqrstuvwxyz", properties = {"routing_key" : "flow_test_queue"}) + channel.message_transfer(content = c) try: incoming.get(timeout=1) self.fail("Received message when flow turned off.") @@ -123,4 +124,4 @@ class BrokerTests(TestBase): channel.channel_flow(active=True) msg = incoming.get(timeout=1) - self.assertEqual("abcdefghijklmnopqrstuvwxyz", msg.body) + self.assertEqual("abcdefghijklmnopqrstuvwxyz", msg.content.body) diff --git a/qpid/python/tests_0-10/dtx.py b/qpid/python/tests_0-10/dtx.py index a5b53ac65b..ea587f5998 100644 --- a/qpid/python/tests_0-10/dtx.py +++ b/qpid/python/tests_0-10/dtx.py @@ -248,8 +248,8 @@ class DtxTests(TestBase): #setup channel1.queue_declare(queue="one", exclusive=True) channel1.queue_declare(queue="two", exclusive=True) - channel1.message_transfer(routing_key="one", message_id="a", body="DtxMessage") - channel1.message_transfer(routing_key="two", message_id="b", body="DtxMessage") + channel1.message_transfer(content=Content(properties={'routing_key':"one", 'message_id':"a"}, body="DtxMessage")) + channel1.message_transfer(content=Content(properties={'routing_key':"two", 'message_id':"b"}, body="DtxMessage")) #create a xid tx = self.xid("dummy") @@ -284,8 +284,8 @@ class DtxTests(TestBase): #setup channel.queue_declare(queue="one", exclusive=True) channel.queue_declare(queue="two", exclusive=True) - channel.message_transfer(routing_key="one", message_id="a", body="DtxMessage") - channel.message_transfer(routing_key="two", message_id="b", body="DtxMessage") + channel.message_transfer(content=Content(properties={'routing_key':"one", 'message_id':"a"}, body="DtxMessage")) + channel.message_transfer(content=Content(properties={'routing_key':"two", 'message_id':"b"}, body="DtxMessage")) tx = self.xid("dummy") @@ -358,17 +358,17 @@ class DtxTests(TestBase): channel.dtx_demarcation_select() tx = self.xid("dummy") channel.dtx_demarcation_start(xid=tx) - channel.message_transfer(routing_key="tx-queue", message_id="one", body="DtxMessage") + channel.message_transfer(content=Content(properties={'routing_key':"tx-queue", 'message_id':"one"}, body="DtxMessage")) channel.dtx_demarcation_end(xid=tx) #now that association with txn is ended, publish another message - channel.message_transfer(routing_key="tx-queue", message_id="two", body="DtxMessage") + channel.message_transfer(content=Content(properties={'routing_key':"tx-queue", 'message_id':"two"}, body="DtxMessage")) #check the second message is available, but not the first self.assertMessageCount(1, "tx-queue") channel.message_subscribe(queue="tx-queue", destination="results", confirm_mode=1) msg = self.client.queue("results").get(timeout=1) - self.assertEqual("two", msg.message_id) + self.assertEqual("two", msg.content['message_id']) channel.message_cancel(destination="results") #ack the message then close the channel msg.complete() @@ -393,7 +393,7 @@ class DtxTests(TestBase): tester.dtx_demarcation_select() tx = self.xid("dummy") tester.dtx_demarcation_start(xid=tx) - tester.message_transfer(routing_key="dummy", body="whatever") + tester.message_transfer(content=Content(properties={'routing_key':"dummy"}, body="whatever")) tester.dtx_demarcation_end(xid=tx) tester.dtx_coordination_prepare(xid=tx) failed = False @@ -427,7 +427,7 @@ class DtxTests(TestBase): tester.dtx_demarcation_select() tx = self.xid("dummy") tester.dtx_demarcation_start(xid=tx) - tester.message_transfer(routing_key="dummy", body="whatever") + tester.message_transfer(content=Content(properties={'routing_key':"dummy"}, body="whatever")) tester.dtx_demarcation_end(xid=tx) failed = False try: @@ -456,14 +456,14 @@ class DtxTests(TestBase): #setup: channel2.queue_declare(queue="dummy", exclusive=True) - channel2.message_transfer(routing_key="dummy", body="whatever") + channel2.message_transfer(content=Content(properties={'routing_key':"dummy"}, body="whatever")) tx = self.xid("dummy") channel2.dtx_demarcation_select() channel2.dtx_demarcation_start(xid=tx) channel2.message_get(queue="dummy", destination="dummy") self.client.queue("dummy").get(timeout=1).complete() - channel2.message_transfer(routing_key="dummy", body="whatever") + channel2.message_transfer(content=Content(properties={'routing_key':"dummy"}, body="whatever")) channel2.channel_close() self.assertEqual(self.XA_RBROLLBACK, channel1.dtx_coordination_prepare(xid=tx).status) @@ -497,7 +497,7 @@ class DtxTests(TestBase): tx = self.xid("dummy") channel.queue_declare(queue="queue-a", exclusive=True) channel.queue_declare(queue="queue-b", exclusive=True) - channel.message_transfer(routing_key="queue-a", message_id="timeout", body="DtxMessage") + channel.message_transfer(content=Content(properties={'routing_key':"queue-a", 'message_id':"timeout"}, body="DtxMessage")) channel.dtx_demarcation_select() channel.dtx_demarcation_start(xid=tx) @@ -527,7 +527,7 @@ class DtxTests(TestBase): for i in range(1, 10): tx = self.xid("tx%s" % (i)) channel.dtx_demarcation_start(xid=tx) - channel.message_transfer(routing_key="dummy", body="message%s" % (i)) + channel.message_transfer(content=Content(properties={'routing_key':"dummy"}, body="message%s" % (i))) channel.dtx_demarcation_end(xid=tx) if i in [2, 5, 6, 8]: channel.dtx_coordination_prepare(xid=tx) @@ -575,7 +575,7 @@ class DtxTests(TestBase): channel.queue_declare(queue="queue-a", exclusive=True) channel.queue_declare(queue="queue-b", exclusive=True) #put message with specified id on one queue: - channel.message_transfer(routing_key="queue-a", message_id=id, body="DtxMessage") + channel.message_transfer(content=Content(properties={'routing_key':"queue-a", 'message_id':id}, body="DtxMessage")) #start the transaction: channel.dtx_demarcation_select() @@ -594,12 +594,13 @@ class DtxTests(TestBase): msg.complete(); #re-publish to dest - channel.message_transfer(routing_key=dest, message_id=msg.message_id, body=msg.body) + channel.message_transfer(content=Content(properties={'routing_key':dest, 'message_id':msg.content['message_id']}, + body=msg.content.body)) def assertMessageCount(self, expected, queue): self.assertEqual(expected, self.channel.queue_query(queue=queue).message_count) def assertMessageId(self, expected, queue): self.channel.message_subscribe(queue=queue, destination="results") - self.assertEqual(expected, self.client.queue("results").get(timeout=1).message_id) + self.assertEqual(expected, self.client.queue("results").get(timeout=1).content['message_id']) self.channel.message_cancel(destination="results") diff --git a/qpid/python/tests_0-10/example.py b/qpid/python/tests_0-10/example.py index e4c80951ac..e3e2c3b095 100644 --- a/qpid/python/tests_0-10/example.py +++ b/qpid/python/tests_0-10/example.py @@ -76,10 +76,9 @@ class ExampleTest (TestBase): # Now lets publish a message and see if our consumer gets it. To do # this we need to import the Content class. - body = "Hello World!" - channel.message_transfer(destination="test", - routing_key="key", - body = body) + sent = Content("Hello World!") + sent["routing_key"] = "key" + channel.message_transfer(destination="test", content=sent) # Now we'll wait for the message to arrive. We can use the timeout # argument in case the server hangs. By default queue.get() will wait @@ -87,7 +86,7 @@ class ExampleTest (TestBase): msg = queue.get(timeout=10) # And check that we got the right response with assertEqual - self.assertEqual(body, msg.body) + self.assertEqual(sent.body, msg.content.body) # Now acknowledge the message. msg.complete() diff --git a/qpid/python/tests_0-10/exchange.py b/qpid/python/tests_0-10/exchange.py index 3a47ffff8c..4137eb7a51 100644 --- a/qpid/python/tests_0-10/exchange.py +++ b/qpid/python/tests_0-10/exchange.py @@ -61,10 +61,10 @@ class StandardExchangeVerifier: self.assertPublishGet(q, ex, "a.x.b.x") self.assertPublishGet(q, ex, "a.x.x.b.x") # Shouldn't match - self.channel.message_transfer(destination=ex, routing_key="a.b", body="") - self.channel.message_transfer(destination=ex, routing_key="a.b.x.y", body="") - self.channel.message_transfer(destination=ex, routing_key="x.a.b.x", body="") - self.channel.message_transfer(destination=ex, routing_key="a.b", body="") + self.channel.message_transfer(destination=ex, content=Content(properties={'routing_key':"a.b"})) + self.channel.message_transfer(destination=ex, content=Content(properties={'routing_key':"a.b.x.y"})) + self.channel.message_transfer(destination=ex, content=Content(properties={'routing_key':"x.a.b.x"})) + self.channel.message_transfer(destination=ex, content=Content(properties={'routing_key':"a.b"})) self.assert_(q.empty()) def verifyHeadersExchange(self, ex): @@ -74,7 +74,7 @@ class StandardExchangeVerifier: q = self.consume("q") headers = {"name":"fred", "age":3} self.assertPublishGet(q, exchange=ex, properties=headers) - self.channel.message_transfer(destination=ex, body="") # No headers, won't deliver + self.channel.message_transfer(destination=ex) # No headers, won't deliver self.assertEmpty(q); @@ -275,7 +275,7 @@ class HeadersExchangeTests(TestBase): self.assertPublishGet(self.q, exchange="amq.match", properties=headers) def myBasicPublish(self, headers): - self.channel.message_transfer(destination="amq.match", body="foobar", application_headers=headers) + self.channel.message_transfer(destination="amq.match", content=Content("foobar", properties={'application_headers':headers})) def testMatchAll(self): self.channel.queue_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'all', "name":"fred", "age":3}) diff --git a/qpid/python/tests_0-10/message.py b/qpid/python/tests_0-10/message.py index 6cf2f3ef89..f08f437a65 100644 --- a/qpid/python/tests_0-10/message.py +++ b/qpid/python/tests_0-10/message.py @@ -38,14 +38,14 @@ class MessageTests(TestBase): channel.message_subscribe(destination="local_excluded", queue="test-queue-1b", no_local=True) #send a message - channel.message_transfer(routing_key="test-queue-1a", body="consume_no_local") - channel.message_transfer(routing_key="test-queue-1b", body="consume_no_local") + channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-1a"}, body="consume_no_local")) + channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-1b"}, body="consume_no_local")) #check the queues of the two consumers excluded = self.client.queue("local_excluded") included = self.client.queue("local_included") msg = included.get(timeout=1) - self.assertEqual("consume_no_local", msg.body) + self.assertEqual("consume_no_local", msg.content.body) try: excluded.get(timeout=1) self.fail("Received locally published message though no_local=true") @@ -125,14 +125,14 @@ class MessageTests(TestBase): #setup, declare a queue: channel.queue_declare(queue="test-queue-4", exclusive=True) channel.message_subscribe(destination="my-consumer", queue="test-queue-4") - channel.message_transfer(routing_key="test-queue-4", body="One") + channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-4"}, body="One")) #cancel should stop messages being delivered channel.message_cancel(destination="my-consumer") - channel.message_transfer(routing_key="test-queue-4", body="Two") + channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-4"}, body="Two")) myqueue = self.client.queue("my-consumer") msg = myqueue.get(timeout=1) - self.assertEqual("One", msg.body) + self.assertEqual("One", msg.content.body) try: msg = myqueue.get(timeout=1) self.fail("Got message after cancellation: " + msg) @@ -153,11 +153,11 @@ class MessageTests(TestBase): channel.message_subscribe(queue="test-ack-queue", destination="consumer_tag", confirm_mode=1) queue = self.client.queue("consumer_tag") - channel.message_transfer(routing_key="test-ack-queue", body="One") - channel.message_transfer(routing_key="test-ack-queue", body="Two") - channel.message_transfer(routing_key="test-ack-queue", body="Three") - channel.message_transfer(routing_key="test-ack-queue", body="Four") - channel.message_transfer(routing_key="test-ack-queue", body="Five") + channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="One")) + channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Two")) + channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Three")) + channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Four")) + channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Five")) msg1 = queue.get(timeout=1) msg2 = queue.get(timeout=1) @@ -165,11 +165,11 @@ class MessageTests(TestBase): msg4 = queue.get(timeout=1) msg5 = queue.get(timeout=1) - self.assertEqual("One", msg1.body) - self.assertEqual("Two", msg2.body) - self.assertEqual("Three", msg3.body) - self.assertEqual("Four", msg4.body) - self.assertEqual("Five", msg5.body) + self.assertEqual("One", msg1.content.body) + self.assertEqual("Two", msg2.content.body) + self.assertEqual("Three", msg3.content.body) + self.assertEqual("Four", msg4.content.body) + self.assertEqual("Five", msg5.content.body) msg2.complete(cumulative=True)#One and Two msg4.complete(cumulative=False) @@ -179,12 +179,12 @@ class MessageTests(TestBase): msg3b = queue.get(timeout=1) msg5b = queue.get(timeout=1) - self.assertEqual("Three", msg3b.body) - self.assertEqual("Five", msg5b.body) + self.assertEqual("Three", msg3b.content.body) + self.assertEqual("Five", msg5b.content.body) try: extra = queue.get(timeout=1) - self.fail("Got unexpected message: " + extra.body) + self.fail("Got unexpected message: " + extra.content.body) except Empty: None def test_recover_requeue(self): @@ -197,11 +197,11 @@ class MessageTests(TestBase): channel.message_subscribe(queue="test-requeue", destination="consumer_tag", confirm_mode=1) queue = self.client.queue("consumer_tag") - channel.message_transfer(routing_key="test-requeue", body="One") - channel.message_transfer(routing_key="test-requeue", body="Two") - channel.message_transfer(routing_key="test-requeue", body="Three") - channel.message_transfer(routing_key="test-requeue", body="Four") - channel.message_transfer(routing_key="test-requeue", body="Five") + channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="One")) + channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Two")) + channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Three")) + channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Four")) + channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Five")) msg1 = queue.get(timeout=1) msg2 = queue.get(timeout=1) @@ -209,11 +209,11 @@ class MessageTests(TestBase): msg4 = queue.get(timeout=1) msg5 = queue.get(timeout=1) - self.assertEqual("One", msg1.body) - self.assertEqual("Two", msg2.body) - self.assertEqual("Three", msg3.body) - self.assertEqual("Four", msg4.body) - self.assertEqual("Five", msg5.body) + self.assertEqual("One", msg1.content.body) + self.assertEqual("Two", msg2.content.body) + self.assertEqual("Three", msg3.content.body) + self.assertEqual("Four", msg4.content.body) + self.assertEqual("Five", msg5.content.body) msg2.complete(cumulative=True) #One and Two msg4.complete(cumulative=False) #Four @@ -221,7 +221,7 @@ class MessageTests(TestBase): channel.message_cancel(destination="consumer_tag") #publish a new message - channel.message_transfer(routing_key="test-requeue", body="Six") + channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Six")) #requeue unacked messages (Three and Five) channel.message_recover(requeue=True) @@ -231,21 +231,21 @@ class MessageTests(TestBase): msg3b = queue2.get(timeout=1) msg5b = queue2.get(timeout=1) - self.assertEqual("Three", msg3b.body) - self.assertEqual("Five", msg5b.body) + self.assertEqual("Three", msg3b.content.body) + self.assertEqual("Five", msg5b.content.body) - self.assertEqual(True, msg3b.redelivered) - self.assertEqual(True, msg5b.redelivered) + self.assertEqual(True, msg3b.content['redelivered']) + self.assertEqual(True, msg5b.content['redelivered']) - self.assertEqual("Six", queue2.get(timeout=1).body) + self.assertEqual("Six", queue2.get(timeout=1).content.body) try: extra = queue2.get(timeout=1) - self.fail("Got unexpected message in second queue: " + extra.body) + self.fail("Got unexpected message in second queue: " + extra.content.body) except Empty: None try: extra = queue.get(timeout=1) - self.fail("Got unexpected message in original queue: " + extra.body) + self.fail("Got unexpected message in original queue: " + extra.content.body) except Empty: None @@ -264,15 +264,15 @@ class MessageTests(TestBase): #publish 10 messages: for i in range(1, 11): - channel.message_transfer(routing_key="test-prefetch-count", body="Message %d" % i) + channel.message_transfer(content=Content(properties={'routing_key' : "test-prefetch-count"}, body="Message %d" % i)) #only 5 messages should have been delivered: for i in range(1, 6): msg = queue.get(timeout=1) - self.assertEqual("Message %d" % i, msg.body) + self.assertEqual("Message %d" % i, msg.content.body) try: extra = queue.get(timeout=1) - self.fail("Got unexpected 6th message in original queue: " + extra.body) + self.fail("Got unexpected 6th message in original queue: " + extra.content.body) except Empty: None #ack messages and check that the next set arrive ok: @@ -280,13 +280,13 @@ class MessageTests(TestBase): for i in range(6, 11): msg = queue.get(timeout=1) - self.assertEqual("Message %d" % i, msg.body) + self.assertEqual("Message %d" % i, msg.content.body) msg.complete() try: extra = queue.get(timeout=1) - self.fail("Got unexpected 11th message in original queue: " + extra.body) + self.fail("Got unexpected 11th message in original queue: " + extra.content.body) except Empty: None @@ -306,16 +306,16 @@ class MessageTests(TestBase): #publish 10 messages: for i in range(1, 11): - channel.message_transfer(routing_key="test-prefetch-size", body="Message %d" % i) + channel.message_transfer(content=Content(properties={'routing_key' : "test-prefetch-size"}, body="Message %d" % i)) #only 5 messages should have been delivered (i.e. 45 bytes worth): for i in range(1, 6): msg = queue.get(timeout=1) - self.assertEqual("Message %d" % i, msg.body) + self.assertEqual("Message %d" % i, msg.content.body) try: extra = queue.get(timeout=1) - self.fail("Got unexpected 6th message in original queue: " + extra.body) + self.fail("Got unexpected 6th message in original queue: " + extra.content.body) except Empty: None #ack messages and check that the next set arrive ok: @@ -323,328 +323,38 @@ class MessageTests(TestBase): for i in range(6, 11): msg = queue.get(timeout=1) - self.assertEqual("Message %d" % i, msg.body) + self.assertEqual("Message %d" % i, msg.content.body) msg.complete() try: extra = queue.get(timeout=1) - self.fail("Got unexpected 11th message in original queue: " + extra.body) + self.fail("Got unexpected 11th message in original queue: " + extra.content.body) except Empty: None #make sure that a single oversized message still gets delivered large = "abcdefghijklmnopqrstuvwxyz" large = large + "-" + large; - channel.message_transfer(routing_key="test-prefetch-size", body=large) + channel.message_transfer(content=Content(properties={'routing_key' : "test-prefetch-size"}, body=large)) msg = queue.get(timeout=1) - self.assertEqual(large, msg.body) + self.assertEqual(large, msg.content.body) - def test_get(self): - """ - Test message_get method - """ - channel = self.channel - channel.queue_declare(queue="test-get", exclusive=True) - - #publish some messages (no_ack=True) - for i in range(1, 11): - channel.message_transfer(routing_key="test-get", body="Message %d" % i) - - #use message_get to read back the messages, and check that we get an empty at the end - for i in range(1, 11): - tag = "queue %d" % i - reply = channel.message_get(no_ack=True, queue="test-get", destination=tag) - self.assertEqual(reply.method.klass.name, "message") - self.assertEqual(reply.method.name, "ok") - self.assertEqual("Message %d" % i, self.client.queue(tag).get(timeout=1).body) - - reply = channel.message_get(no_ack=True, queue="test-get") - self.assertEqual(reply.method.klass.name, "message") - self.assertEqual(reply.method.name, "empty") - - #repeat for confirm_mode=1 - for i in range(11, 21): - channel.message_transfer(routing_key="test-get", body="Message %d" % i) - - for i in range(11, 21): - tag = "queue %d" % i - reply = channel.message_get(confirm_mode=1, queue="test-get", destination=tag) - self.assertEqual(reply.method.klass.name, "message") - self.assertEqual(reply.method.name, "ok") - msg = self.client.queue(tag).get(timeout=1) - self.assertEqual("Message %d" % i, msg.body) - - if (i==13): - msg.complete()#11, 12 & 13 - if(i in [15, 17, 19]): - msg.complete(cumulative=False) - - reply = channel.message_get(no_ack=True, queue="test-get") - self.assertEqual(reply.method.klass.name, "message") - self.assertEqual(reply.method.name, "empty") - - #recover(requeue=True) - channel.message_recover(requeue=True) - - #get the unacked messages again (14, 16, 18, 20) - for i in [14, 16, 18, 20]: - tag = "queue %d" % i - reply = channel.message_get(confirm_mode=1, queue="test-get", destination=tag) - self.assertEqual(reply.method.klass.name, "message") - self.assertEqual(reply.method.name, "ok") - msg = self.client.queue(tag).get(timeout=1) - self.assertEqual("Message %d" % i, msg.body) - msg.complete() - - reply = channel.message_get(no_ack=True, queue="test-get") - self.assertEqual(reply.method.klass.name, "message") - self.assertEqual(reply.method.name, "empty") - - channel.message_recover(requeue=True) - - reply = channel.message_get(no_ack=True, queue="test-get") - self.assertEqual(reply.method.klass.name, "message") - self.assertEqual(reply.method.name, "empty") - - def test_reference_simple(self): - """ - Test basic ability to handle references - """ - channel = self.channel - channel.queue_declare(queue="ref_queue", exclusive=True) - channel.message_subscribe(queue="ref_queue", destination="c1") - queue = self.client.queue("c1") - - refId = "myref" - channel.message_open(reference=refId) - channel.message_append(reference=refId, bytes="abcd") - channel.synchronous = False - ack = channel.message_transfer(routing_key="ref_queue", body=ReferenceId(refId)) - channel.synchronous = True - - channel.message_append(reference=refId, bytes="efgh") - channel.message_append(reference=refId, bytes="ijkl") - channel.message_close(reference=refId) - - #first, wait for the ok for the transfer - ack.get_response(timeout=1) - - self.assertDataEquals(channel, queue.get(timeout=1), "abcdefghijkl") - - - def test_reference_large(self): - """ - Test basic ability to handle references whose content exceeds max frame size - """ - channel = self.channel - self.queue_declare(queue="ref_queue") - - #generate a big data string (> max frame size of consumer): - data = "0123456789" - for i in range(0, 10): - data += data - #send it inline - channel.synchronous = False - ack = channel.message_transfer(routing_key="ref_queue", body=data) - channel.synchronous = True - #first, wait for the ok for the transfer - ack.get_response(timeout=1) - - #create a new connection for consumer, with specific max frame size (< data) - other = self.connect(tune_params={"channel_max":10, "frame_max":5120, "heartbeat":0}) - ch2 = other.channel(1) - ch2.channel_open() - ch2.message_subscribe(queue="ref_queue", destination="c1") - queue = other.queue("c1") - - msg = queue.get(timeout=1) - self.assertTrue(isinstance(msg.body, ReferenceId)) - self.assertTrue(msg.reference) - self.assertEquals(data, msg.reference.get_complete()) - - def test_reference_completion(self): - """ - Test that reference transfer are not deemed complete until - closed (therefore are not acked or routed until that point) - """ - channel = self.channel - channel.queue_declare(queue="ref_queue", exclusive=True) - channel.message_subscribe(queue="ref_queue", destination="c1") - queue = self.client.queue("c1") - - refId = "myref" - channel.message_open(reference=refId) - channel.message_append(reference=refId, bytes="abcd") - channel.synchronous = False - ack = channel.message_transfer(routing_key="ref_queue", body=ReferenceId(refId)) - channel.synchronous = True - - try: - msg = queue.get(timeout=1) - self.fail("Got unexpected message on queue: " + msg) - except Empty: None - - self.assertTrue(not ack.is_complete()) - - channel.message_close(reference=refId) - - #first, wait for the ok for the transfer - ack.get_response(timeout=1) - - self.assertDataEquals(channel, queue.get(timeout=1), "abcd") - def test_reference_multi_transfer(self): - """ - Test that multiple transfer requests for the same reference are - correctly handled. - """ - channel = self.channel - #declare and consume from two queues - channel.queue_declare(queue="q-one", exclusive=True) - channel.queue_declare(queue="q-two", exclusive=True) - channel.message_subscribe(queue="q-one", destination="q-one") - channel.message_subscribe(queue="q-two", destination="q-two") - queue1 = self.client.queue("q-one") - queue2 = self.client.queue("q-two") - - #transfer a single ref to both queues (in separate commands) - channel.message_open(reference="my-ref") - channel.synchronous = False - ack1 = channel.message_transfer(routing_key="q-one", body=ReferenceId("my-ref")) - channel.message_append(reference="my-ref", bytes="my data") - ack2 = channel.message_transfer(routing_key="q-two", body=ReferenceId("my-ref")) - channel.synchronous = True - channel.message_close(reference="my-ref") - - #check that both queues have the message - self.assertDataEquals(channel, queue1.get(timeout=1), "my data") - self.assertDataEquals(channel, queue2.get(timeout=1), "my data") - self.assertEmpty(queue1) - self.assertEmpty(queue2) - - #transfer a single ref to the same queue twice (in separate commands) - channel.message_open(reference="my-ref") - channel.synchronous = False - ack1 = channel.message_transfer(routing_key="q-one", message_id="abc", body=ReferenceId("my-ref")) - channel.message_append(reference="my-ref", bytes="second message") - ack2 = channel.message_transfer(routing_key="q-one", message_id="xyz", body=ReferenceId("my-ref")) - channel.synchronous = True - channel.message_close(reference="my-ref") - - msg1 = queue1.get(timeout=1) - msg2 = queue1.get(timeout=1) - #order is undefined - if msg1.message_id == "abc": - self.assertEquals(msg2.message_id, "xyz") - else: - self.assertEquals(msg1.message_id, "xyz") - self.assertEquals(msg2.message_id, "abc") - - #would be legal for the incoming messages to be transfered - #inline or by reference in any combination - - if isinstance(msg1.body, ReferenceId): - self.assertEquals("second message", msg1.reference.get_complete()) - if isinstance(msg2.body, ReferenceId): - if msg1.body != msg2.body: - self.assertEquals("second message", msg2.reference.get_complete()) - #else ok, as same ref as msg1 - else: - self.assertEquals("second message", msg1.body) - if isinstance(msg2.body, ReferenceId): - self.assertEquals("second message", msg2.reference.get_complete()) - else: - self.assertEquals("second message", msg2.body) - - self.assertEmpty(queue1) - - def test_reference_unopened_on_append_error(self): - channel = self.channel - try: - channel.message_append(reference="unopened") - except Closed, e: - self.assertConnectionException(503, e.args[0]) - - def test_reference_unopened_on_close_error(self): - channel = self.channel - try: - channel.message_close(reference="unopened") - except Closed, e: - self.assertConnectionException(503, e.args[0]) - - def test_reference_unopened_on_transfer_error(self): - channel = self.channel - try: - channel.message_transfer(body=ReferenceId("unopened")) - except Closed, e: - self.assertConnectionException(503, e.args[0]) - - def test_reference_already_opened_error(self): - channel = self.channel - channel.message_open(reference="a") - try: - channel.message_open(reference="a") - except Closed, e: - self.assertConnectionException(503, e.args[0]) - - def test_empty_reference(self): - channel = self.channel - channel.queue_declare(queue="ref_queue", exclusive=True) - channel.message_subscribe(queue="ref_queue", destination="c1") - queue = self.client.queue("c1") - - refId = "myref" - channel.message_open(reference=refId) - channel.synchronous = False - ack = channel.message_transfer(routing_key="ref_queue", message_id="empty-msg", body=ReferenceId(refId)) - channel.synchronous = True - channel.message_close(reference=refId) - - #first, wait for the ok for the transfer - ack.get_response(timeout=1) - - msg = queue.get(timeout=1) - self.assertEquals(msg.message_id, "empty-msg") - self.assertDataEquals(channel, msg, "") def test_reject(self): channel = self.channel channel.queue_declare(queue = "q", exclusive=True) channel.message_subscribe(queue = "q", destination = "consumer") - channel.message_transfer(routing_key = "q", body="blah, blah") + channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body="blah, blah")) msg = self.client.queue("consumer").get(timeout = 1) - self.assertEquals(msg.body, "blah, blah") + self.assertEquals(msg.content.body, "blah, blah") channel.message_cancel(destination = "consumer") msg.reject() channel.message_subscribe(queue = "q", destination = "checker") msg = self.client.queue("checker").get(timeout = 1) - self.assertEquals(msg.body, "blah, blah") - - def test_checkpoint(self): - channel = self.channel - channel.queue_declare(queue = "q", exclusive=True) - - channel.message_open(reference="my-ref") - channel.message_append(reference="my-ref", bytes="abcdefgh") - channel.message_append(reference="my-ref", bytes="ijklmnop") - channel.message_checkpoint(reference="my-ref", identifier="my-checkpoint") - channel.channel_close() - - channel = self.client.channel(2) - channel.channel_open() - channel.message_subscribe(queue = "q", destination = "consumer") - offset = channel.message_resume(reference="my-ref", identifier="my-checkpoint").value - self.assertTrue(offset<=16) - channel.message_append(reference="my-ref", bytes="qrstuvwxyz") - channel.synchronous = False - channel.message_transfer(routing_key="q-one", message_id="abcd", body=ReferenceId("my-ref")) - channel.synchronous = True - channel.message_close(reference="my-ref") - - self.assertDataEquals(channel, self.client.queue("consumer").get(timeout = 1), "abcdefghijklmnopqrstuvwxyz") - self.assertEmpty(self.client.queue("consumer")) + self.assertEquals(msg.content.body, "blah, blah") def test_credit_flow_messages(self): """ @@ -660,7 +370,7 @@ class MessageTests(TestBase): channel.message_stop(destination = "c") #send batch of messages to queue for i in range(1, 11): - channel.message_transfer(routing_key = "q", body = "Message %d" % i) + channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i)) #set message credit to finite amount (less than enough for all messages) channel.message_flow(unit = 0, value = 5, destination = "c") @@ -692,7 +402,7 @@ class MessageTests(TestBase): channel.message_stop(destination = "c") #send batch of messages to queue for i in range(1, 11): - channel.message_transfer(routing_key = "q", body = "abcdefgh") + channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh")) #each message is currently interpreted as requiring 75 bytes of credit #set byte credit to finite amount (less than enough for all messages) @@ -726,7 +436,7 @@ class MessageTests(TestBase): channel.message_stop(destination = "c") #send batch of messages to queue for i in range(1, 11): - channel.message_transfer(routing_key = "q", body = "Message %d" % i) + channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i)) #set message credit to finite amount (less than enough for all messages) channel.message_flow(unit = 0, value = 5, destination = "c") @@ -760,7 +470,7 @@ class MessageTests(TestBase): channel.message_stop(destination = "c") #send batch of messages to queue for i in range(1, 11): - channel.message_transfer(routing_key = "q", body = "abcdefgh") + channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh")) #each message is currently interpreted as requiring 75 bytes of credit #set byte credit to finite amount (less than enough for all messages) @@ -783,11 +493,5 @@ class MessageTests(TestBase): self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh") self.assertEmpty(q) - - def assertDataEquals(self, channel, msg, expected): - if isinstance(msg.body, ReferenceId): - data = msg.reference.get_complete() - else: - data = msg.body - self.assertEquals(expected, data) + self.assertEquals(expected, msg.content.body) diff --git a/qpid/python/tests_0-10/queue.py b/qpid/python/tests_0-10/queue.py index 8d99c50d32..05fa1aebc6 100644 --- a/qpid/python/tests_0-10/queue.py +++ b/qpid/python/tests_0-10/queue.py @@ -33,9 +33,9 @@ class QueueTests(TestBase): channel.exchange_declare(exchange="test-exchange", type="direct") channel.queue_declare(queue="test-queue", exclusive=True) channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key") - channel.message_transfer(destination="test-exchange", routing_key="key", body="one") - channel.message_transfer(destination="test-exchange", routing_key="key", body="two") - channel.message_transfer(destination="test-exchange", routing_key="key", body="three") + channel.message_transfer(destination="test-exchange", content=Content("one", properties={'routing_key':"key"})) + channel.message_transfer(destination="test-exchange", content=Content("two", properties={'routing_key':"key"})) + channel.message_transfer(destination="test-exchange", content=Content("three", properties={'routing_key':"key"})) #check that the queue now reports 3 messages: channel.queue_declare(queue="test-queue") @@ -48,11 +48,11 @@ class QueueTests(TestBase): self.assertEqual(0, reply.message_count) #send a further message and consume it, ensuring that the other messages are really gone - channel.message_transfer(destination="test-exchange", routing_key="key", body="four") + channel.message_transfer(destination="test-exchange", content=Content("four", properties={'routing_key':"key"})) channel.message_subscribe(queue="test-queue", destination="tag") queue = self.client.queue("tag") msg = queue.get(timeout=1) - self.assertEqual("four", msg.body) + self.assertEqual("four", msg.content.body) #check error conditions (use new channels): channel = self.client.channel(2) @@ -179,23 +179,25 @@ class QueueTests(TestBase): channel.queue_bind(exchange=exchange, queue="queue-2", routing_key=routing_key, arguments=args) #send a message that will match both bindings - channel.message_transfer(destination=exchange, routing_key=routing_key, application_headers=headers, body="one") + channel.message_transfer(destination=exchange, + content=Content("one", properties={'routing_key':routing_key, 'application_headers':headers})) #unbind first queue channel.queue_unbind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args) #send another message - channel.message_transfer(destination=exchange, routing_key=routing_key, application_headers=headers, body="two") + channel.message_transfer(destination=exchange, + content=Content("two", properties={'routing_key':routing_key, 'application_headers':headers})) #check one queue has both messages and the other has only one - self.assertEquals("one", queue1.get(timeout=1).body) + self.assertEquals("one", queue1.get(timeout=1).content.body) try: msg = queue1.get(timeout=1) - self.fail("Got extra message: %s" % msg.body) + self.fail("Got extra message: %s" % msg.content.body) except Empty: pass - self.assertEquals("one", queue2.get(timeout=1).body) - self.assertEquals("two", queue2.get(timeout=1).body) + self.assertEquals("one", queue2.get(timeout=1).content.body) + self.assertEquals("two", queue2.get(timeout=1).content.body) try: msg = queue2.get(timeout=1) self.fail("Got extra message: " + msg) @@ -210,9 +212,9 @@ class QueueTests(TestBase): #straight-forward case: channel.queue_declare(queue="delete-me") - channel.message_transfer(routing_key="delete-me", body="a") - channel.message_transfer(routing_key="delete-me", body="b") - channel.message_transfer(routing_key="delete-me", body="c") + channel.message_transfer(content=Content("a", properties={'routing_key':"delete-me"})) + channel.message_transfer(content=Content("b", properties={'routing_key':"delete-me"})) + channel.message_transfer(content=Content("c", properties={'routing_key':"delete-me"})) channel.queue_delete(queue="delete-me") #check that it has gone be declaring passively try: @@ -241,7 +243,7 @@ class QueueTests(TestBase): #create a queue and add a message to it (use default binding): channel.queue_declare(queue="delete-me-2") channel.queue_declare(queue="delete-me-2", passive="True") - channel.message_transfer(routing_key="delete-me-2", body="message") + channel.message_transfer(content=Content("message", properties={'routing_key':"delete-me-2"})) #try to delete, but only if empty: try: @@ -258,7 +260,7 @@ class QueueTests(TestBase): channel.message_subscribe(destination="consumer_tag", queue="delete-me-2") queue = self.client.queue("consumer_tag") msg = queue.get(timeout=1) - self.assertEqual("message", msg.body) + self.assertEqual("message", msg.content.body) channel.message_cancel(destination="consumer_tag") #retry deletion on empty queue: diff --git a/qpid/python/tests_0-10/testlib.py b/qpid/python/tests_0-10/testlib.py index f345fbbd80..a0355c4ce0 100644 --- a/qpid/python/tests_0-10/testlib.py +++ b/qpid/python/tests_0-10/testlib.py @@ -49,7 +49,7 @@ class TestBaseTest(TestBase): def testAssertEmptyFail(self): self.queue_declare(queue="full") q = self.consume("full") - self.channel.message_transfer(routing_key="full", body="") + self.channel.message_transfer(content=Content("", properties={'routing_key':"full"})) try: self.assertEmpty(q); self.fail("assertEmpty did not assert on non-empty queue") diff --git a/qpid/python/tests_0-10/tx.py b/qpid/python/tests_0-10/tx.py index 4c2f75d35e..7c50de4ee2 100644 --- a/qpid/python/tests_0-10/tx.py +++ b/qpid/python/tests_0-10/tx.py @@ -53,21 +53,21 @@ class TxTests(TestBase): #check results for i in range(1, 5): msg = queue_c.get(timeout=1) - self.assertEqual("TxMessage %d" % i, msg.body) + self.assertEqual("TxMessage %d" % i, msg.content.body) msg.complete() msg = queue_b.get(timeout=1) - self.assertEqual("TxMessage 6", msg.body) + self.assertEqual("TxMessage 6", msg.content.body) msg.complete() msg = queue_a.get(timeout=1) - self.assertEqual("TxMessage 7", msg.body) + self.assertEqual("TxMessage 7", msg.content.body) msg.complete() for q in [queue_a, queue_b, queue_c]: try: extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.body) + self.fail("Got unexpected message: " + extra.content.body) except Empty: None #cleanup @@ -83,7 +83,7 @@ class TxTests(TestBase): for q in [queue_a, queue_b, queue_c]: try: extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.body) + self.fail("Got unexpected message: " + extra.content.body) except Empty: None channel.tx_rollback() @@ -91,21 +91,21 @@ class TxTests(TestBase): #check results for i in range(1, 5): msg = queue_a.get(timeout=1) - self.assertEqual("Message %d" % i, msg.body) + self.assertEqual("Message %d" % i, msg.content.body) msg.complete() msg = queue_b.get(timeout=1) - self.assertEqual("Message 6", msg.body) + self.assertEqual("Message 6", msg.content.body) msg.complete() msg = queue_c.get(timeout=1) - self.assertEqual("Message 7", msg.body) + self.assertEqual("Message 7", msg.content.body) msg.complete() for q in [queue_a, queue_b, queue_c]: try: extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.body) + self.fail("Got unexpected message: " + extra.content.body) except Empty: None #cleanup @@ -121,7 +121,7 @@ class TxTests(TestBase): for q in [queue_a, queue_b, queue_c]: try: extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.body) + self.fail("Got unexpected message: " + extra.content.body) except Empty: None channel.tx_rollback() @@ -129,21 +129,21 @@ class TxTests(TestBase): #check results for i in range(1, 5): msg = queue_a.get(timeout=1) - self.assertEqual("Message %d" % i, msg.body) + self.assertEqual("Message %d" % i, msg.content.body) msg.complete() msg = queue_b.get(timeout=1) - self.assertEqual("Message 6", msg.body) + self.assertEqual("Message 6", msg.content.body) msg.complete() msg = queue_c.get(timeout=1) - self.assertEqual("Message 7", msg.body) + self.assertEqual("Message 7", msg.content.body) msg.complete() for q in [queue_a, queue_b, queue_c]: try: extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.body) + self.fail("Got unexpected message: " + extra.content.body) except Empty: None #cleanup @@ -166,10 +166,12 @@ class TxTests(TestBase): channel.queue_bind(queue=name_c, exchange="amq.topic", routing_key=topic) for i in range(1, 5): - channel.message_transfer(routing_key=name_a, message_id="msg%d" % i, body="Message %d" % i) + channel.message_transfer(content=Content(properties={'routing_key':name_a, 'message_id':"msg%d" % i}, body="Message %d" % i)) - channel.message_transfer(routing_key=key, destination="amq.direct", message_id="msg6", body="Message 6") - channel.message_transfer(routing_key=topic, destination="amq.topic", message_id="msg7", body="Message 7") + channel.message_transfer(destination="amq.direct", + content=Content(properties={'routing_key':key, 'message_id':"msg6"}, body="Message 6")) + channel.message_transfer(destination="amq.topic", + content=Content(properties={'routing_key':topic, 'message_id':"msg7"}, body="Message 7")) channel.tx_select() @@ -178,27 +180,31 @@ class TxTests(TestBase): queue_a = self.client.queue("sub_a") for i in range(1, 5): msg = queue_a.get(timeout=1) - self.assertEqual("Message %d" % i, msg.body) + self.assertEqual("Message %d" % i, msg.content.body) msg.complete() channel.message_subscribe(queue=name_b, destination="sub_b", confirm_mode=1) queue_b = self.client.queue("sub_b") msg = queue_b.get(timeout=1) - self.assertEqual("Message 6", msg.body) + self.assertEqual("Message 6", msg.content.body) msg.complete() sub_c = channel.message_subscribe(queue=name_c, destination="sub_c", confirm_mode=1) queue_c = self.client.queue("sub_c") msg = queue_c.get(timeout=1) - self.assertEqual("Message 7", msg.body) + self.assertEqual("Message 7", msg.content.body) msg.complete() #publish messages for i in range(1, 5): - channel.message_transfer(routing_key=topic, destination="amq.topic", message_id="tx-msg%d" % i, body="TxMessage %d" % i) - - channel.message_transfer(routing_key=key, destination="amq.direct", message_id="tx-msg6", body="TxMessage 6") - channel.message_transfer(routing_key=name_a, message_id="tx-msg7", body="TxMessage 7") - + channel.message_transfer(destination="amq.topic", + content=Content(properties={'routing_key':topic, 'message_id':"tx-msg%d" % i}, + body="TxMessage %d" % i)) + + channel.message_transfer(destination="amq.direct", + content=Content(properties={'routing_key':key, 'message_id':"tx-msg6"}, + body="TxMessage 6")) + channel.message_transfer(content=Content(properties={'routing_key':name_a, 'message_id':"tx-msg7"}, + body="TxMessage 7")) return queue_a, queue_b, queue_c diff --git a/qpid/specs/amqp-transitional.0-10.xml b/qpid/specs/amqp-transitional.0-10.xml index b1fe81373a..7949ed7ed9 100644 --- a/qpid/specs/amqp-transitional.0-10.xml +++ b/qpid/specs/amqp-transitional.0-10.xml @@ -6253,252 +6253,6 @@ <chassis name="client" implement="MUST" /> <!-- - Method: message.transfer - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --> - <method name="transfer" index="10" label="transfer a message"> - <doc> - This method transfers a message between two peers. When a client uses this method to publish - a message to a broker, the destination identifies a specific exchange. The message will then - be routed to queues as defined by the exchange configuration and distributed to any active - consumers when the transaction, if any, is committed. - - In the asynchronous message delivery model, the client starts a consumer using the Consume - method and passing in a destination, then the broker responds with transfer methods to the - specified destination as and when messages arrive for that consumer. - - If synchronous message delivery is required, the client may issue a get request which on - success causes a single message to be transferred to the specified destination. - - Message acknowledgement is signalled by the return result of this method. - </doc> - - <rule name="process-before-ok"> - <doc> - The recipient MUST NOT return ok before the message has been processed as defined by the - QoS settings. - </doc> - </rule> - - <chassis name="server" implement="MUST" /> - <chassis name="client" implement="MUST" /> - - <field name="ticket" domain="access-ticket" label="access ticket"> - <rule name="validity" on-failure="access-refused"> - <doc> - The client MUST provide a valid access ticket giving "passive" access rights to the - realm for the exchange and "write" access rights to the realm for the queue to which the - message will be published. - </doc> - </rule> - </field> - - <field name="destination" domain="destination" label="message distination"> - <doc> - Specifies the destination to which the message is to be transferred. The destination can - be empty, meaning the default exchange or consumer. If the destination is specified, and - that exchange or consumer does not exist, the peer must raise a channel exception. - </doc> - - <rule name="blank-destination"> - <doc> - The server MUST accept a blank destination to mean the default exchange. - </doc> - </rule> - - <rule name="internal-exchange"> - <doc> - If the destination refers to an internal exchange, the server MUST raise a channel - exception with a reply code 403 (access refused). - </doc> - </rule> - - <rule name="message-refusal"> - <doc> - A destination MAY refuse message content in which case it MUST raise a channel exception - with reply code 540 (not implemented). - </doc> - </rule> - </field> - - <field name="redelivered" domain="redelivered" label="redelivery flag"> - <doc> - This boolean flag indicates that the message has been previously delivered to this or - another client. - </doc> - </field> - - <field name="reject-unroutable" domain="bit" label="reject message if unroutable flag"> - <doc> - If the reject-unroutable flag is set, then at the time of publishing the broker determines - if the message will be routed to any queues. If it will not be routed to any queue then - the broker responds with a message.reject. - </doc> - </field> - - <field name="immediate" domain="bit" label="request immediate delivery"> - </field> - - <field name="ttl" domain="duration" label="time to live"> - <doc> - If this is set to a non zero value then a message expiration time will be computed based - on the current time plus this value. Messages that live longer than their expiration time - will be discarded (or dead lettered). - </doc> - <rule name="ttl-decrement"> - <doc> - If a message is transfered between brokers before delivery to a final consumer the ttl - should be decremented before peer to peer transfer and both timestamp and expiration - should be cleared. - </doc> - </rule> - </field> - - <!-- begin headers --> - <field name="priority" domain="octet" label="message priority, 0 to 9"> - <doc> - Message priority, which can be between 0 and 9. Messages with higher priorities may be - delivered before those with lower priorities. - </doc> - </field> - - <field name="timestamp" domain="timestamp" label="message timestamp"> - <doc> - The timestamp is set by the broker on arrival of the message. - </doc> - </field> - - <field name="delivery-mode" domain="octet" label="message persistence"> - <doc> - The delivery mode may be non-persistent (1) or persistent (2). A persistent message is one - which must be stored on a persistent medium (usually hard drive) at every stage of - delivery so that it will not be lost in event of failure (other than the medium itself). - This is normally accomplished with some additional overhead. A persistent message may be - delivered more than once if there is uncertainty about the state of its delivery after a - failure and recovery. - - Conversely, a non-persistent message may be lost in event of a failure, but the nature of - the communication is such that an occasional message loss is tolerable. This is the lowest - overhead mode. Non-persistent messages are delivered at most once only. - </doc> - </field> - - <field name="expiration" domain="timestamp" label="message expiration time"> - <doc> - The expiration header assigned by the broker. After receiving the message the broker sets - expiration to the sum of the ttl specified in the publish method and the current time. - (ttl=expiration - timestamp) - </doc> - </field> - - <field name="exchange" domain="exchange-name" label="originating exchange"> - <doc> - The exchange name is a client-selected string that identifies the exchange for transfer - methods. Exchange names may consist of any mixture of digits, letters, and underscores. - Exchange names are scoped by the virtual host. - </doc> - </field> - - <field name="routing-key" domain="shortstr" label="message routing key"> - <doc> - The value of the key determines to which queue the exchange will send the message. The way - in which keys are used to make this routing decision depends on the type of exchange to - which the message is sent. For example, a direct exchange will route a message to a queue - if that queue is bound to the exchange with an identical key to that of the message. - </doc> - </field> - - <field name="message-id" domain="shortstr" label="application message identifier"> - <doc> - This is a unique identifier for the message that is guaranteed to be unique across - multiple instances, sessions and in time. This allows duplicate messages to be detected. - This may be a UUID. Note that this is usually set by the server when it first receives a - message. - - If a client wishes to identify a message, it should use the correlation-id instead. - </doc> - </field> - - <field name="correlation-id" domain="shortstr" label="application correlation identifier"> - <doc> - This is a client-specific id that may be used to mark or identify messages between - clients. The server ignores this field. - </doc> - </field> - - <field name="reply-to" domain="shortstr" label="destination to reply to"> - <doc> - The destination of any message that is sent in reply to this message. - </doc> - </field> - - <field name="content-type" domain="shortstr" label="MIME content type"> - <doc> - The RFC-2046 MIME type for the message content (such as "text/plain"). This is set by the - originating client. - </doc> - </field> - - <field name="content-encoding" domain="shortstr" label="MIME content encoding"> - <doc> - The encoding for character-based message content. This is set by the originating client. - Examples include UTF-8 and ISO-8859-16. - </doc> - </field> - - <field name="content-length" domain="longlong" label="length of content in bytes"> - <doc> - The length of the message content in bytes. - </doc> - </field> - - <field name="type" domain="shortstr" label="message type name"> - <doc> - The JMS message type. - </doc> - </field> - - <field name="user-id" domain="shortstr" label="creating user id"> - <doc> - The identity of the user responsible for producing the message. - </doc> - </field> - - <field name="app-id" domain="shortstr" label="creating application id"> - <doc> - The identity of the client application responsible for producing the message. - </doc> - </field> - - <field name="transaction-id" domain="shortstr" label="distributed transaction id"> - <doc> - An identifier that links this message to a distributed transaction. - </doc> - </field> - - <field name="security-token" domain="security-token" label="security token"> - <doc> - A security token used for authentication, replay prevention, and encrypted message bodies. - </doc> - </field> - - <field name="application-headers" domain="table" label="application specific headers table"> - <doc> - This is a collection of user-defined headers or properties which may be set by the - producing client and retrieved by the consuming client. Similar to JMS Properties. - </doc> - </field> - <!-- end headers --> - - <field name="body" domain="content" label="message body"> - <doc> - Message content. This should be considered opaque data. - </doc> - </field> - - </method> - - <!-- - the above is still the 0-9 definition and will shortly be - removed in favour of the following which is the real 0-10 - defintion: <method name="transfer" content="1" index="10" label="transfer a message"> <doc> @@ -6556,7 +6310,6 @@ <field name="confirm-mode" domain="confirm-mode" /> <field name="acquire-mode" domain="acquire-mode" /> </method> - --> <!-- - Method: message.reject - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --> |