diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Makefile.am | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DeliveryAdapter.h | 35 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DeliveryRecord.cpp | 36 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DeliveryRecord.h | 18 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DeliveryToken.h | 45 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageDelivery.cpp | 89 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageDelivery.h | 55 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 30 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.h | 17 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionAdapter.cpp | 9 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 7 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionState.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementBroker.cpp | 1 | ||||
-rw-r--r-- | cpp/src/tests/DeliveryRecordTest.cpp | 4 | ||||
-rw-r--r-- | cpp/src/tests/MessageUtils.h | 1 | ||||
-rw-r--r-- | cpp/src/tests/QueuePolicyTest.cpp | 1 | ||||
-rwxr-xr-x | cpp/src/tests/run_federation_tests | 2 |
17 files changed, 80 insertions, 275 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 9248ea4b06..ef0af1b4d3 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -328,7 +328,6 @@ libqpidbroker_la_SOURCES = \ qpid/broker/Message.cpp \ qpid/broker/MessageAdapter.cpp \ qpid/broker/MessageBuilder.cpp \ - qpid/broker/MessageDelivery.cpp \ qpid/broker/MessageStoreModule.cpp \ qpid/broker/NameGenerator.cpp \ qpid/broker/NullMessageStore.cpp \ @@ -446,7 +445,6 @@ nobase_include_HEADERS = \ qpid/broker/DeliveryAdapter.h \ qpid/broker/DeliveryId.h \ qpid/broker/DeliveryRecord.h \ - qpid/broker/DeliveryToken.h \ qpid/broker/DirectExchange.h \ qpid/broker/DtxAck.h \ qpid/broker/DtxBuffer.h \ @@ -464,7 +462,6 @@ nobase_include_HEADERS = \ qpid/broker/Message.h \ qpid/broker/MessageAdapter.h \ qpid/broker/MessageBuilder.h \ - qpid/broker/MessageDelivery.h \ qpid/broker/MessageStore.h \ qpid/broker/MessageStoreModule.h \ qpid/broker/NameGenerator.h \ diff --git a/cpp/src/qpid/broker/DeliveryAdapter.h b/cpp/src/qpid/broker/DeliveryAdapter.h index 4c2b2f615f..0e9d7d3929 100644 --- a/cpp/src/qpid/broker/DeliveryAdapter.h +++ b/cpp/src/qpid/broker/DeliveryAdapter.h @@ -22,29 +22,30 @@ #define _DeliveryAdapter_ #include "DeliveryId.h" -#include "DeliveryToken.h" #include "Message.h" #include "qpid/framing/amqp_types.h" namespace qpid { namespace broker { - /** - * The intention behind this interface is to separate the generic - * handling of some form of message delivery to clients that is - * contained in the version independent Channel class from the - * details required for a particular situation or - * version. i.e. where the existing adapters allow (through - * supporting the generated interface for a version of the - * protocol) inputs of a channel to be adapted to the version - * independent part, this does the same for the outputs. - */ - class DeliveryAdapter - { - public: - virtual DeliveryId deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token) = 0; - virtual ~DeliveryAdapter(){} - }; +class DeliveryRecord; + +/** + * The intention behind this interface is to separate the generic + * handling of some form of message delivery to clients that is + * contained in the version independent Channel class from the + * details required for a particular situation or + * version. i.e. where the existing adapters allow (through + * supporting the generated interface for a version of the + * protocol) inputs of a channel to be adapted to the version + * independent part, this does the same for the outputs. + */ +class DeliveryAdapter +{ + public: + virtual void deliver(DeliveryRecord&) = 0; + virtual ~DeliveryAdapter(){} +}; }} diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp index e19a61f992..1d6c60b569 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -23,32 +23,29 @@ #include "SemanticState.h" #include "Exchange.h" #include "qpid/log/Statement.h" +#include "qpid/framing/FrameHandler.h" +#include "qpid/framing/MessageTransferBody.h" using namespace qpid::broker; +using namespace qpid::framing; using std::string; DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg, Queue::shared_ptr _queue, const std::string _tag, - DeliveryToken::shared_ptr _token, - const DeliveryId _id, bool _acquired, bool accepted, bool _windowing) : msg(_msg), queue(_queue), tag(_tag), - token(_token), - id(_id), acquired(_acquired), - pull(false), + acceptExpected(!accepted), cancelled(false), credit(msg.payload ? msg.payload->getRequiredCredit() : 0), size(msg.payload ? msg.payload->contentSize() : 0), completed(false), ended(accepted), windowing(_windowing) -{ - if (accepted) setEnded(); -} +{} void DeliveryRecord::setEnded() { @@ -77,20 +74,31 @@ bool DeliveryRecord::coveredBy(const framing::SequenceSet* const range) const{ void DeliveryRecord::redeliver(SemanticState* const session) { if (!ended) { - if(pull || cancelled){ - //if message was originally sent as response to get, we must requeue it - - //or if subscription was cancelled, requeue it (waiting for + if(cancelled){ + //if subscription was cancelled, requeue it (waiting for //final confirmation for AMQP WG on this case) - requeue(); }else{ msg.payload->redeliver();//mark as redelivered - id = session->redeliver(msg, token); + session->deliver(*this); } } } +void DeliveryRecord::deliver(framing::FrameHandler& h, DeliveryId deliveryId, uint16_t framesize) +{ + id = deliveryId; + if (msg.payload->getRedelivered()){ + msg.payload->getProperties<DeliveryProperties>()->setRedelivered(true); + } + + AMQFrame method(in_place<MessageTransferBody>(ProtocolVersion(), tag, acceptExpected ? 0 : 1, acquired ? 0 : 1)); + method.setEof(false); + h.handle(method); + msg.payload->sendHeader(h, framesize); + msg.payload->sendContent(*queue, h, framesize); +} + void DeliveryRecord::requeue() const { if (acquired && !ended) { diff --git a/cpp/src/qpid/broker/DeliveryRecord.h b/cpp/src/qpid/broker/DeliveryRecord.h index 8fdfaa71e6..d631fe124c 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.h +++ b/cpp/src/qpid/broker/DeliveryRecord.h @@ -27,9 +27,8 @@ #include <ostream> #include "qpid/framing/SequenceSet.h" #include "Queue.h" -#include "Consumer.h" +#include "QueuedMessage.h" #include "DeliveryId.h" -#include "DeliveryToken.h" #include "Message.h" namespace qpid { @@ -43,10 +42,9 @@ class DeliveryRecord{ QueuedMessage msg; mutable Queue::shared_ptr queue; const std::string tag; - DeliveryToken::shared_ptr token; DeliveryId id; bool acquired; - const bool pull; + bool acceptExpected; bool cancelled; const uint32_t credit; const uint64_t size; @@ -56,13 +54,14 @@ class DeliveryRecord{ const bool windowing; public: - DeliveryRecord(const QueuedMessage& msg, Queue::shared_ptr queue, const std::string tag, DeliveryToken::shared_ptr token, - const DeliveryId id, bool acquired, bool confirmed, bool windowing); + DeliveryRecord(const QueuedMessage& msg, Queue::shared_ptr queue, + const std::string tag, + bool acquired, bool confirmed, bool windowing); bool matches(DeliveryId tag) const; bool matchOrAfter(DeliveryId tag) const; bool after(DeliveryId tag) const; bool coveredBy(const framing::SequenceSet* const range) const; - + void dequeue(TransactionContext* ctxt = 0) const; void requeue() const; void release(bool setRedelivered); @@ -80,7 +79,10 @@ class DeliveryRecord{ uint32_t getCredit() const; const std::string& getTag() const { return tag; } - bool isPull() const { return pull; } + + void deliver(framing::FrameHandler& h, DeliveryId deliveryId, uint16_t framesize); + void setId(DeliveryId _id) { id = _id; } + friend bool operator<(const DeliveryRecord&, const DeliveryRecord&); friend std::ostream& operator<<(std::ostream&, const DeliveryRecord&); }; diff --git a/cpp/src/qpid/broker/DeliveryToken.h b/cpp/src/qpid/broker/DeliveryToken.h deleted file mode 100644 index 8bdf5e6359..0000000000 --- a/cpp/src/qpid/broker/DeliveryToken.h +++ /dev/null @@ -1,45 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _DeliveryToken_ -#define _DeliveryToken_ - -#include <boost/shared_ptr.hpp> - -namespace qpid { -namespace broker { - - /** - * A DeliveryToken allows the delivery of a message to be - * associated with whatever mechanism caused it to be - * delivered. (i.e. its a form of Memento). - */ - class DeliveryToken - { - public: - typedef boost::shared_ptr<DeliveryToken> shared_ptr; - - virtual ~DeliveryToken(){} - }; - -}} - - -#endif diff --git a/cpp/src/qpid/broker/MessageDelivery.cpp b/cpp/src/qpid/broker/MessageDelivery.cpp deleted file mode 100644 index a757d191e7..0000000000 --- a/cpp/src/qpid/broker/MessageDelivery.cpp +++ /dev/null @@ -1,89 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "MessageDelivery.h" - -#include "DeliveryToken.h" -#include "Message.h" -#include "Queue.h" -#include "qpid/framing/FrameHandler.h" -#include "qpid/framing/MessageTransferBody.h" - - -using namespace boost; -using namespace qpid::broker; -using namespace qpid::framing; - -namespace qpid{ -namespace broker{ - -struct BaseToken : DeliveryToken -{ - virtual ~BaseToken() {} - virtual AMQFrame sendMethod(intrusive_ptr<Message> msg, DeliveryId id) = 0; -}; - -struct MessageDeliveryToken : BaseToken -{ - const std::string destination; - const uint8_t confirmMode; - const uint8_t acquireMode; - const bool isPreview; - - MessageDeliveryToken(const std::string& d, uint8_t c, uint8_t a, bool p) : - destination(d), confirmMode(c), acquireMode(a), isPreview(p) {} - - AMQFrame sendMethod(intrusive_ptr<Message> msg, DeliveryId /*id*/) - { - //may need to set the redelivered flag: - if (msg->getRedelivered()){ - msg->getProperties<DeliveryProperties>()->setRedelivered(true); - } - return AMQFrame(in_place<MessageTransferBody>( - ProtocolVersion(), destination, confirmMode, acquireMode)); - } -}; - -} -} - -DeliveryToken::shared_ptr MessageDelivery::getMessageDeliveryToken(const std::string& destination, - uint8_t confirmMode, uint8_t acquireMode) -{ - return DeliveryToken::shared_ptr(new MessageDeliveryToken(destination, confirmMode, acquireMode, false)); -} - -void MessageDelivery::deliver(QueuedMessage& msg, - framing::FrameHandler& handler, - DeliveryId id, - DeliveryToken::shared_ptr token, - uint16_t framesize) -{ - //currently a message published from one class and delivered to - //another may well have the wrong headers; however we will only - //have one content class for 0-10 proper - - boost::shared_ptr<BaseToken> t = dynamic_pointer_cast<BaseToken>(token); - AMQFrame method = t->sendMethod(msg.payload, id); - method.setEof(false); - handler.handle(method); - msg.payload->sendHeader(handler, framesize); - msg.payload->sendContent(*(msg.queue), handler, framesize); -} diff --git a/cpp/src/qpid/broker/MessageDelivery.h b/cpp/src/qpid/broker/MessageDelivery.h deleted file mode 100644 index cfde9ee307..0000000000 --- a/cpp/src/qpid/broker/MessageDelivery.h +++ /dev/null @@ -1,55 +0,0 @@ -#ifndef _broker_MessageDelivery_h -#define _broker_MessageDelivery_h - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <boost/shared_ptr.hpp> -#include "DeliveryId.h" -#include "Consumer.h" -#include "qpid/framing/FrameHandler.h" - -namespace qpid { -namespace broker { - -class DeliveryToken; -class Message; -class Queue; - -/** - * TODO: clean this up; we don't need it anymore in its current form - * - * Encapsulates the different options for message delivery currently supported. - */ -class MessageDelivery { -public: - static boost::shared_ptr<DeliveryToken> getMessageDeliveryToken(const std::string& destination, - uint8_t confirmMode, - uint8_t acquireMode); - - static void deliver(QueuedMessage& msg, framing::FrameHandler& out, - DeliveryId deliveryTag, boost::shared_ptr<DeliveryToken> token, uint16_t framesize); -}; - -} -} - - -#endif /*!_broker_MessageDelivery_h*/ diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index 4d735c9abc..915b7e147c 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -84,16 +84,14 @@ bool SemanticState::exists(const string& consumerTag){ return consumers.find(consumerTag) != consumers.end(); } -void SemanticState::consume(DeliveryToken::shared_ptr token, string& tagInOut, - Queue::shared_ptr queue, bool nolocal, bool ackRequired, bool acquire, +void SemanticState::consume(const string& tag, + Queue::shared_ptr queue, bool ackRequired, bool acquire, bool exclusive, const string& resumeId, uint64_t resumeTtl, const FieldTable& arguments) { - if(tagInOut.empty()) - tagInOut = tagGenerator.generate(); - ConsumerImpl::shared_ptr c(new ConsumerImpl(this, token, tagInOut, queue, ackRequired, nolocal, acquire, exclusive, resumeId, resumeTtl, arguments)); + ConsumerImpl::shared_ptr c(new ConsumerImpl(this, tag, queue, ackRequired, acquire, exclusive, resumeId, resumeTtl, arguments)); queue->consume(c, exclusive);//may throw exception outputTasks.addOutputTask(c.get()); - consumers[tagInOut] = c; + consumers[tag] = c; } void SemanticState::cancel(const string& tag){ @@ -233,11 +231,9 @@ void SemanticState::record(const DeliveryRecord& delivery) } SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, - DeliveryToken::shared_ptr _token, const string& _name, Queue::shared_ptr _queue, bool ack, - bool _nolocal, bool _acquire, bool _exclusive, const string& _resumeId, @@ -248,11 +244,9 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, ) : Consumer(_acquire), parent(_parent), - token(_token), name(_name), queue(_queue), ackExpected(ack), - nolocal(_nolocal), acquire(_acquire), blocked(true), windowing(true), @@ -272,10 +266,11 @@ OwnershipToken* SemanticState::ConsumerImpl::getSession() bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg) { allocateCredit(msg.payload); - DeliveryId deliveryTag = - parent->deliveryAdapter.deliver(msg, token); + DeliveryRecord record(msg, queue, name, acquire, !ackExpected, windowing); + parent->deliver(record); + if (!ackExpected) record.setEnded();//allows message to be released now its been delivered if (windowing || ackExpected || !acquire) { - parent->record(DeliveryRecord(msg, queue, name, token, deliveryTag, acquire, !ackExpected, windowing)); + parent->record(record); } if (acquire && !ackExpected) { queue->dequeue(0, msg); @@ -283,10 +278,9 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg) return true; } -bool SemanticState::ConsumerImpl::filter(intrusive_ptr<Message> msg) +bool SemanticState::ConsumerImpl::filter(intrusive_ptr<Message>) { - return !(nolocal && - &parent->getSession().getConnection() == msg->getPublisher()); + return true; } bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg) @@ -454,9 +448,9 @@ void SemanticState::recover(bool requeue) } } -DeliveryId SemanticState::redeliver(QueuedMessage& msg, DeliveryToken::shared_ptr token) +void SemanticState::deliver(DeliveryRecord& msg) { - return deliveryAdapter.deliver(msg, token); + return deliveryAdapter.deliver(msg); } SemanticState::ConsumerImpl& SemanticState::find(const std::string& destination) diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h index c2d8cc7d0b..866ef4c209 100644 --- a/cpp/src/qpid/broker/SemanticState.h +++ b/cpp/src/qpid/broker/SemanticState.h @@ -26,7 +26,6 @@ #include "Deliverable.h" #include "DeliveryAdapter.h" #include "DeliveryRecord.h" -#include "DeliveryToken.h" #include "DtxBuffer.h" #include "DtxManager.h" #include "NameGenerator.h" @@ -65,11 +64,9 @@ class SemanticState : public sys::OutputTask, { qpid::sys::Mutex lock; SemanticState* const parent; - const DeliveryToken::shared_ptr token; const string name; const Queue::shared_ptr queue; const bool ackExpected; - const bool nolocal; const bool acquire; bool blocked; bool windowing; @@ -87,9 +84,9 @@ class SemanticState : public sys::OutputTask, public: typedef boost::shared_ptr<ConsumerImpl> shared_ptr; - ConsumerImpl(SemanticState* parent, DeliveryToken::shared_ptr token, + ConsumerImpl(SemanticState* parent, const string& name, Queue::shared_ptr queue, - bool ack, bool nolocal, bool acquire, bool exclusive, + bool ack, bool acquire, bool exclusive, const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments); ~ConsumerImpl(); OwnershipToken* getSession(); @@ -177,11 +174,9 @@ class SemanticState : public sys::OutputTask, bool exists(const string& consumerTag); - /** - *@param tagInOut - if empty it is updated with the generated token. - */ - void consume(DeliveryToken::shared_ptr token, string& tagInOut, Queue::shared_ptr queue, - bool nolocal, bool ackRequired, bool acquire, bool exclusive, + void consume(const string& destination, + Queue::shared_ptr queue, + bool ackRequired, bool acquire, bool exclusive, const string& resumeId=string(), uint64_t resumeTtl=0, const framing::FieldTable& = framing::FieldTable()); @@ -203,7 +198,7 @@ class SemanticState : public sys::OutputTask, void suspendDtx(const std::string& xid); void resumeDtx(const std::string& xid); void recover(bool requeue); - DeliveryId redeliver(QueuedMessage& msg, DeliveryToken::shared_ptr token); + void deliver(DeliveryRecord& message); void acquire(DeliveryId first, DeliveryId last, DeliveryIds& acquired); void release(DeliveryId first, DeliveryId last, bool setRedelivered); void reject(DeliveryId first, DeliveryId last); diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp index e09c94398b..c5a55b64a3 100644 --- a/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/cpp/src/qpid/broker/SessionAdapter.cpp @@ -17,8 +17,6 @@ */ #include "SessionAdapter.h" #include "Connection.h" -#include "DeliveryToken.h" -#include "MessageDelivery.h" #include "Queue.h" #include "qpid/Exception.h" #include "qpid/framing/reply_exceptions.h" @@ -478,10 +476,9 @@ SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName, if(!destination.empty() && state.exists(destination)) throw NotAllowedException(QPID_MSG("Consumer tags must be unique")); - string tag = destination; - state.consume(MessageDelivery::getMessageDeliveryToken(destination, acceptMode, acquireMode), - tag, queue, false, //TODO get rid of no-local - acceptMode == 0, acquireMode == 0, exclusive, resumeId, resumeTtl, arguments); + state.consume(destination, queue, + acceptMode == 0, acquireMode == 0, exclusive, + resumeId, resumeTtl, arguments); ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); if (agent) diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index 587dcaf724..7910ef3f0c 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -21,7 +21,7 @@ #include "SessionState.h" #include "Broker.h" #include "ConnectionState.h" -#include "MessageDelivery.h" +#include "DeliveryRecord.h" #include "SessionManager.h" #include "SessionHandler.h" #include "qpid/framing/AMQContentBody.h" @@ -230,14 +230,13 @@ void SessionState::handleOut(AMQFrame& frame) { handler->out(frame); } -DeliveryId SessionState::deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token) +void SessionState::deliver(DeliveryRecord& msg) { uint32_t maxFrameSize = getConnection().getFrameMax(); assert(senderGetCommandPoint().offset == 0); SequenceNumber commandId = senderGetCommandPoint().command; - MessageDelivery::deliver(msg, getProxy().getHandler(), commandId, token, maxFrameSize); + msg.deliver(getProxy().getHandler(), commandId, maxFrameSize); assert(senderGetCommandPoint() == SessionPoint(commandId+1, 0)); // Delivery has moved sendPoint. - return commandId; } void SessionState::sendCompletion() { handler->sendCompletion(); } diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h index 4c5a7a66b7..d08cc5fa2c 100644 --- a/cpp/src/qpid/broker/SessionState.h +++ b/cpp/src/qpid/broker/SessionState.h @@ -91,7 +91,7 @@ class SessionState : public qpid::SessionState, void sendCompletion(); //delivery adapter methods: - DeliveryId deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token); + void deliver(DeliveryRecord&); // Manageable entry points management::ManagementObject* GetManagementObject (void) const; diff --git a/cpp/src/qpid/management/ManagementBroker.cpp b/cpp/src/qpid/management/ManagementBroker.cpp index 7ced42f69b..480c63b03a 100644 --- a/cpp/src/qpid/management/ManagementBroker.cpp +++ b/cpp/src/qpid/management/ManagementBroker.cpp @@ -23,7 +23,6 @@ #include "qpid/broker/DeliverableMessage.h" #include "qpid/log/Statement.h" #include <qpid/broker/Message.h> -#include <qpid/broker/MessageDelivery.h> #include "qpid/framing/MessageTransferBody.h" #include "qpid/sys/Time.h" #include "qpid/broker/ConnectionState.h" diff --git a/cpp/src/tests/DeliveryRecordTest.cpp b/cpp/src/tests/DeliveryRecordTest.cpp index 9bdc456993..47c7157749 100644 --- a/cpp/src/tests/DeliveryRecordTest.cpp +++ b/cpp/src/tests/DeliveryRecordTest.cpp @@ -45,7 +45,9 @@ QPID_AUTO_TEST_CASE(testSort) list<DeliveryRecord> records; for (list<SequenceNumber>::iterator i = ids.begin(); i != ids.end(); i++) { - records.push_back(DeliveryRecord(QueuedMessage(0), Queue::shared_ptr(), "tag", DeliveryToken::shared_ptr(), *i, false, false, false)); + DeliveryRecord r(QueuedMessage(0), Queue::shared_ptr(), "tag", false, false, false); + r.setId(*i); + records.push_back(r); } records.sort(); diff --git a/cpp/src/tests/MessageUtils.h b/cpp/src/tests/MessageUtils.h index 21ee834ba7..81508e534e 100644 --- a/cpp/src/tests/MessageUtils.h +++ b/cpp/src/tests/MessageUtils.h @@ -20,7 +20,6 @@ */ #include "qpid/broker/Message.h" -#include "qpid/broker/MessageDelivery.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/framing/Uuid.h" diff --git a/cpp/src/tests/QueuePolicyTest.cpp b/cpp/src/tests/QueuePolicyTest.cpp index cf45f554df..f7fe81a709 100644 --- a/cpp/src/tests/QueuePolicyTest.cpp +++ b/cpp/src/tests/QueuePolicyTest.cpp @@ -23,6 +23,7 @@ #include "qpid/broker/QueuePolicy.h" #include "qpid/sys/Time.h" +#include "qpid/framing/reply_exceptions.h" #include "MessageUtils.h" #include "BrokerFixture.h" diff --git a/cpp/src/tests/run_federation_tests b/cpp/src/tests/run_federation_tests index 6142c1c37c..685467a681 100755 --- a/cpp/src/tests/run_federation_tests +++ b/cpp/src/tests/run_federation_tests @@ -22,7 +22,7 @@ if test -d ${PYTHON_DIR} ; then echo "Running federation tests using brokers on ports $LOCAL_PORT $REMOTE_PORT" PYTHONPATH=${PYTHON_DIR} export PYTHONPATH - ${MY_DIR}/federation.py -v -s ${MY_DIR}/../../../specs/amqp.0-10-qpid-errata.xml -b localhost:$LOCAL_PORT --remote-port $REMOTE_PORT + ${MY_DIR}/federation.py -v -s ${MY_DIR}/../../../specs/amqp.0-10-qpid-errata.xml -b localhost:$LOCAL_PORT --remote-port $REMOTE_PORT $@ RETCODE=$? stop_brokers if test x$RETCODE != x0; then |