diff options
author | Carl C. Trieloff <cctrieloff@apache.org> | 2009-01-30 18:59:24 +0000 |
---|---|---|
committer | Carl C. Trieloff <cctrieloff@apache.org> | 2009-01-30 18:59:24 +0000 |
commit | 6c32dd7ce1d7919f1c8f79752d20c3a341062aad (patch) | |
tree | a909fe6d1ec64c680b7b8bf6b9d0b830e987057c | |
parent | 5827482775336cd49f24bba381beb6812778a622 (diff) | |
download | qpid-python-6c32dd7ce1d7919f1c8f79752d20c3a341062aad.tar.gz |
Correction for: start a broker in cluster, send messages that are flow to disk, then join a broker to the cluster. Then consume from the new node. Cotent released messages where loosing content. This patch corrects that.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@739378 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/Message.cpp | 43 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Message.h | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/Connection.cpp | 7 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/Connection.h | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/SessionImpl.cpp | 10 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/SessionImpl.h | 7 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/UpdateClient.cpp | 16 | ||||
-rw-r--r-- | qpid/cpp/src/tests/consume.cpp | 9 |
8 files changed, 79 insertions, 21 deletions
diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp index 6bcee99f49..e5a0c3e9e1 100644 --- a/qpid/cpp/src/qpid/broker/Message.cpp +++ b/qpid/cpp/src/qpid/broker/Message.cpp @@ -197,30 +197,39 @@ void Message::destroy() } } -void Message::sendContent(Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const +bool Message::getContentFrame(const Queue& queue, AMQFrame& frame, uint16_t maxContentSize, uint64_t offset) const { if (isContentReleased()) { - //load content from store in chunks of maxContentSize - uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead(); intrusive_ptr<const PersistableMessage> pmsg(this); bool done = false; - for (uint64_t offset = 0; !done; offset += maxContentSize) + string& data = frame.castBody<AMQContentBody>()->getData(); + store->loadContent(queue, pmsg, data, offset, maxContentSize); + done = data.size() < maxContentSize; + frame.setBof(false); + frame.setEof(true); + QPID_LOG(debug, "loaded frame" << frame); + if (offset > 0) { + frame.setBos(false); + } + if (!done) { + frame.setEos(false); + } else return false; + return true; + } + else return false; +} + +void Message::sendContent(const Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const +{ + if (isContentReleased()) { + + uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead(); + bool morecontent = true; + for (uint64_t offset = 0; morecontent; offset += maxContentSize) { AMQFrame frame((AMQContentBody())); - string& data = frame.castBody<AMQContentBody>()->getData(); - - store->loadContent(queue, pmsg, data, offset, maxContentSize); - done = data.size() < maxContentSize; - frame.setBof(false); - frame.setEof(true); - if (offset > 0) { - frame.setBos(false); - } - if (!done) { - frame.setEos(false); - } - QPID_LOG(debug, "loaded frame for delivery: " << frame); + morecontent = getContentFrame(queue, frame, maxContentSize, offset); out.handle(frame); } } else { diff --git a/qpid/cpp/src/qpid/broker/Message.h b/qpid/cpp/src/qpid/broker/Message.h index bed191fb8d..de716e9441 100644 --- a/qpid/cpp/src/qpid/broker/Message.h +++ b/qpid/cpp/src/qpid/broker/Message.h @@ -131,7 +131,8 @@ public: void releaseContent(MessageStore* store); void destroy(); - void sendContent(Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const; + bool getContentFrame(const Queue& queue, framing::AMQFrame& frame, uint16_t maxContentSize, uint64_t offset) const; + void sendContent(const Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const; void sendHeader(framing::FrameHandler& out, uint16_t maxFrameSize) const; bool isContentLoaded() const; diff --git a/qpid/cpp/src/qpid/client/Connection.cpp b/qpid/cpp/src/qpid/client/Connection.cpp index f450344aa7..3562df4a74 100644 --- a/qpid/cpp/src/qpid/client/Connection.cpp +++ b/qpid/cpp/src/qpid/client/Connection.cpp @@ -119,6 +119,13 @@ void Connection::open(const ConnectionSettings& settings) impl->registerFailureCallback ( failureCallback ); } +const ConnectionSettings& Connection::getNegotiatedSettings() +{ + if (!isOpen()) + throw Exception(QPID_MSG("Connection is not open.")); + return impl->getNegotiatedSettings(); +} + Session Connection::newSession(const std::string& name, uint32_t timeout) { if (!isOpen()) throw Exception(QPID_MSG("Connection has not yet been opened")); diff --git a/qpid/cpp/src/qpid/client/Connection.h b/qpid/cpp/src/qpid/client/Connection.h index d03542bb5b..ed984ccb42 100644 --- a/qpid/cpp/src/qpid/client/Connection.h +++ b/qpid/cpp/src/qpid/client/Connection.h @@ -175,6 +175,11 @@ class Connection std::vector<Url> getKnownBrokers(); void registerFailureCallback ( boost::function<void ()> fn ); + /** + * Return the set of client negotiated settings + */ + const ConnectionSettings& getNegotiatedSettings(); + friend class ConnectionAccess; ///<@internal friend class SessionBase_0_10; ///<@internal }; diff --git a/qpid/cpp/src/qpid/client/SessionImpl.cpp b/qpid/cpp/src/qpid/client/SessionImpl.cpp index 7cf68956ea..c179a31853 100644 --- a/qpid/cpp/src/qpid/client/SessionImpl.cpp +++ b/qpid/cpp/src/qpid/client/SessionImpl.cpp @@ -322,6 +322,11 @@ Future SessionImpl::send(const AMQBody& command, const FrameSet& content) { return f; } +void SessionImpl::sendRawFrame(AMQFrame& frame) { + Acquire a(sendLock); + handleOut(frame); +} + Future SessionImpl::sendCommand(const AMQBody& command, const MethodContent* content) { Acquire a(sendLock); @@ -767,4 +772,9 @@ void SessionImpl::setWeakPtr(bool weak) { connectionShared = connectionWeak.lock(); } +shared_ptr<ConnectionImpl> SessionImpl::getConnection() +{ + return connectionWeak.lock(); +} + }} diff --git a/qpid/cpp/src/qpid/client/SessionImpl.h b/qpid/cpp/src/qpid/client/SessionImpl.h index 9d0c4ff796..d826b759ae 100644 --- a/qpid/cpp/src/qpid/client/SessionImpl.h +++ b/qpid/cpp/src/qpid/client/SessionImpl.h @@ -87,6 +87,7 @@ public: Future send(const framing::AMQBody& command); Future send(const framing::AMQBody& command, const framing::MethodContent& content); Future send(const framing::AMQBody& command, const framing::FrameSet& content); + void sendRawFrame(framing::AMQFrame& frame); Demux& getDemux(); void markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer); @@ -114,6 +115,11 @@ public: */ void setWeakPtr(bool weak=true); + /** + * get the Connection associated with this connection + */ + shared_ptr<ConnectionImpl> getConnection(); + private: enum State { INACTIVE, @@ -204,7 +210,6 @@ private: const uint64_t maxFrameSize; const SessionId id; - shared_ptr<ConnectionImpl> connection(); shared_ptr<ConnectionImpl> connectionShared; boost::weak_ptr<ConnectionImpl> connectionWeak; bool weakPtr; diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp index f2580cb777..2ed0b26a0d 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp @@ -168,6 +168,7 @@ class MessageUpdater { session.exchangeUnbind(queue, UpdateClient::UPDATE); } + void updateQueuedMessage(const broker::QueuedMessage& message) { if (!haveLastPos || message.position - lastPos != 1) { ClusterConnectionProxy(session).queuePosition(queue, message.position.getValue()-1); @@ -177,12 +178,27 @@ class MessageUpdater { SessionBase_0_10Access sb(session); framing::MessageTransferBody transfer( framing::ProtocolVersion(), UpdateClient::UPDATE, message::ACCEPT_MODE_NONE, message::ACQUIRE_MODE_PRE_ACQUIRED); + sb.get()->send(transfer, message.payload->getFrames()); + if (message.payload->isContentReleased()){ + uint16_t maxFrameSize = sb.get()->getConnection()->getNegotiatedSettings().maxFrameSize; + + uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead(); + bool morecontent = true; + for (uint64_t offset = 0; morecontent; offset += maxContentSize) + { + AMQFrame frame((AMQContentBody())); + morecontent = message.payload->getContentFrame(*(message.queue), frame, maxContentSize, offset); + sb.get()->sendRawFrame(frame); + } + } } void updateMessage(const boost::intrusive_ptr<broker::Message>& message) { updateQueuedMessage(broker::QueuedMessage(0, message, haveLastPos? lastPos.getValue()+1 : 1)); } + + }; diff --git a/qpid/cpp/src/tests/consume.cpp b/qpid/cpp/src/tests/consume.cpp index 4d74b8ae57..3aacf8b3da 100644 --- a/qpid/cpp/src/tests/consume.cpp +++ b/qpid/cpp/src/tests/consume.cpp @@ -44,15 +44,19 @@ struct Args : public qpid::TestOptions { string queue; bool declare; bool summary; + bool print; + bool durable; Args() : count(1000), ack(0), queue("publish-consume"), - declare(false), summary(false) + declare(false), summary(false), print(false) { addOptions() ("count", optValue(count, "N"), "number of messages to publish") ("ack-frequency", optValue(ack, "N"), "ack every N messages (0 means use no-ack mode)") ("queue", optValue(queue, "<queue name>"), "queue to consume from") ("declare", optValue(declare), "declare the queue") + ("durable", optValue(durable), "declare the queue durable, use with declare") + ("print-data", optValue(print), "Print the recieved data at info level") ("s,summary", optValue(summary), "Print undecorated rate."); } }; @@ -73,7 +77,7 @@ struct Client void consume() { if (opts.declare) - session.queueDeclare(opts.queue); + session.queueDeclare(arg::queue=opts.queue, arg::durable=opts.durable); SubscriptionManager subs(session); LocalQueue lq; SubscriptionSettings settings; @@ -85,6 +89,7 @@ struct Client for (size_t i = 0; i < opts.count; ++i) { msg=lq.pop(); QPID_LOG(info, "Received: " << msg.getMessageProperties().getCorrelationId()); + if (opts.print) QPID_LOG(info, "Data: " << msg.getData()); } if (opts.ack != 0) sub.accept(sub.getUnaccepted()); // Cumulative ack for final batch. |