summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCarl C. Trieloff <cctrieloff@apache.org>2009-01-30 18:59:24 +0000
committerCarl C. Trieloff <cctrieloff@apache.org>2009-01-30 18:59:24 +0000
commit6c32dd7ce1d7919f1c8f79752d20c3a341062aad (patch)
treea909fe6d1ec64c680b7b8bf6b9d0b830e987057c
parent5827482775336cd49f24bba381beb6812778a622 (diff)
downloadqpid-python-6c32dd7ce1d7919f1c8f79752d20c3a341062aad.tar.gz
Correction for: start a broker in cluster, send messages that are flow to disk, then join a broker to the cluster. Then consume from the new node. Cotent released messages where loosing content. This patch corrects that.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@739378 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Message.cpp43
-rw-r--r--qpid/cpp/src/qpid/broker/Message.h3
-rw-r--r--qpid/cpp/src/qpid/client/Connection.cpp7
-rw-r--r--qpid/cpp/src/qpid/client/Connection.h5
-rw-r--r--qpid/cpp/src/qpid/client/SessionImpl.cpp10
-rw-r--r--qpid/cpp/src/qpid/client/SessionImpl.h7
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateClient.cpp16
-rw-r--r--qpid/cpp/src/tests/consume.cpp9
8 files changed, 79 insertions, 21 deletions
diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp
index 6bcee99f49..e5a0c3e9e1 100644
--- a/qpid/cpp/src/qpid/broker/Message.cpp
+++ b/qpid/cpp/src/qpid/broker/Message.cpp
@@ -197,30 +197,39 @@ void Message::destroy()
}
}
-void Message::sendContent(Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const
+bool Message::getContentFrame(const Queue& queue, AMQFrame& frame, uint16_t maxContentSize, uint64_t offset) const
{
if (isContentReleased()) {
- //load content from store in chunks of maxContentSize
- uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
intrusive_ptr<const PersistableMessage> pmsg(this);
bool done = false;
- for (uint64_t offset = 0; !done; offset += maxContentSize)
+ string& data = frame.castBody<AMQContentBody>()->getData();
+ store->loadContent(queue, pmsg, data, offset, maxContentSize);
+ done = data.size() < maxContentSize;
+ frame.setBof(false);
+ frame.setEof(true);
+ QPID_LOG(debug, "loaded frame" << frame);
+ if (offset > 0) {
+ frame.setBos(false);
+ }
+ if (!done) {
+ frame.setEos(false);
+ } else return false;
+ return true;
+ }
+ else return false;
+}
+
+void Message::sendContent(const Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const
+{
+ if (isContentReleased()) {
+
+ uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
+ bool morecontent = true;
+ for (uint64_t offset = 0; morecontent; offset += maxContentSize)
{
AMQFrame frame((AMQContentBody()));
- string& data = frame.castBody<AMQContentBody>()->getData();
-
- store->loadContent(queue, pmsg, data, offset, maxContentSize);
- done = data.size() < maxContentSize;
- frame.setBof(false);
- frame.setEof(true);
- if (offset > 0) {
- frame.setBos(false);
- }
- if (!done) {
- frame.setEos(false);
- }
- QPID_LOG(debug, "loaded frame for delivery: " << frame);
+ morecontent = getContentFrame(queue, frame, maxContentSize, offset);
out.handle(frame);
}
} else {
diff --git a/qpid/cpp/src/qpid/broker/Message.h b/qpid/cpp/src/qpid/broker/Message.h
index bed191fb8d..de716e9441 100644
--- a/qpid/cpp/src/qpid/broker/Message.h
+++ b/qpid/cpp/src/qpid/broker/Message.h
@@ -131,7 +131,8 @@ public:
void releaseContent(MessageStore* store);
void destroy();
- void sendContent(Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const;
+ bool getContentFrame(const Queue& queue, framing::AMQFrame& frame, uint16_t maxContentSize, uint64_t offset) const;
+ void sendContent(const Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const;
void sendHeader(framing::FrameHandler& out, uint16_t maxFrameSize) const;
bool isContentLoaded() const;
diff --git a/qpid/cpp/src/qpid/client/Connection.cpp b/qpid/cpp/src/qpid/client/Connection.cpp
index f450344aa7..3562df4a74 100644
--- a/qpid/cpp/src/qpid/client/Connection.cpp
+++ b/qpid/cpp/src/qpid/client/Connection.cpp
@@ -119,6 +119,13 @@ void Connection::open(const ConnectionSettings& settings)
impl->registerFailureCallback ( failureCallback );
}
+const ConnectionSettings& Connection::getNegotiatedSettings()
+{
+ if (!isOpen())
+ throw Exception(QPID_MSG("Connection is not open."));
+ return impl->getNegotiatedSettings();
+}
+
Session Connection::newSession(const std::string& name, uint32_t timeout) {
if (!isOpen())
throw Exception(QPID_MSG("Connection has not yet been opened"));
diff --git a/qpid/cpp/src/qpid/client/Connection.h b/qpid/cpp/src/qpid/client/Connection.h
index d03542bb5b..ed984ccb42 100644
--- a/qpid/cpp/src/qpid/client/Connection.h
+++ b/qpid/cpp/src/qpid/client/Connection.h
@@ -175,6 +175,11 @@ class Connection
std::vector<Url> getKnownBrokers();
void registerFailureCallback ( boost::function<void ()> fn );
+ /**
+ * Return the set of client negotiated settings
+ */
+ const ConnectionSettings& getNegotiatedSettings();
+
friend class ConnectionAccess; ///<@internal
friend class SessionBase_0_10; ///<@internal
};
diff --git a/qpid/cpp/src/qpid/client/SessionImpl.cpp b/qpid/cpp/src/qpid/client/SessionImpl.cpp
index 7cf68956ea..c179a31853 100644
--- a/qpid/cpp/src/qpid/client/SessionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/SessionImpl.cpp
@@ -322,6 +322,11 @@ Future SessionImpl::send(const AMQBody& command, const FrameSet& content) {
return f;
}
+void SessionImpl::sendRawFrame(AMQFrame& frame) {
+ Acquire a(sendLock);
+ handleOut(frame);
+}
+
Future SessionImpl::sendCommand(const AMQBody& command, const MethodContent* content)
{
Acquire a(sendLock);
@@ -767,4 +772,9 @@ void SessionImpl::setWeakPtr(bool weak) {
connectionShared = connectionWeak.lock();
}
+shared_ptr<ConnectionImpl> SessionImpl::getConnection()
+{
+ return connectionWeak.lock();
+}
+
}}
diff --git a/qpid/cpp/src/qpid/client/SessionImpl.h b/qpid/cpp/src/qpid/client/SessionImpl.h
index 9d0c4ff796..d826b759ae 100644
--- a/qpid/cpp/src/qpid/client/SessionImpl.h
+++ b/qpid/cpp/src/qpid/client/SessionImpl.h
@@ -87,6 +87,7 @@ public:
Future send(const framing::AMQBody& command);
Future send(const framing::AMQBody& command, const framing::MethodContent& content);
Future send(const framing::AMQBody& command, const framing::FrameSet& content);
+ void sendRawFrame(framing::AMQFrame& frame);
Demux& getDemux();
void markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer);
@@ -114,6 +115,11 @@ public:
*/
void setWeakPtr(bool weak=true);
+ /**
+ * get the Connection associated with this connection
+ */
+ shared_ptr<ConnectionImpl> getConnection();
+
private:
enum State {
INACTIVE,
@@ -204,7 +210,6 @@ private:
const uint64_t maxFrameSize;
const SessionId id;
- shared_ptr<ConnectionImpl> connection();
shared_ptr<ConnectionImpl> connectionShared;
boost::weak_ptr<ConnectionImpl> connectionWeak;
bool weakPtr;
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
index f2580cb777..2ed0b26a0d 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -168,6 +168,7 @@ class MessageUpdater {
session.exchangeUnbind(queue, UpdateClient::UPDATE);
}
+
void updateQueuedMessage(const broker::QueuedMessage& message) {
if (!haveLastPos || message.position - lastPos != 1) {
ClusterConnectionProxy(session).queuePosition(queue, message.position.getValue()-1);
@@ -177,12 +178,27 @@ class MessageUpdater {
SessionBase_0_10Access sb(session);
framing::MessageTransferBody transfer(
framing::ProtocolVersion(), UpdateClient::UPDATE, message::ACCEPT_MODE_NONE, message::ACQUIRE_MODE_PRE_ACQUIRED);
+
sb.get()->send(transfer, message.payload->getFrames());
+ if (message.payload->isContentReleased()){
+ uint16_t maxFrameSize = sb.get()->getConnection()->getNegotiatedSettings().maxFrameSize;
+
+ uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
+ bool morecontent = true;
+ for (uint64_t offset = 0; morecontent; offset += maxContentSize)
+ {
+ AMQFrame frame((AMQContentBody()));
+ morecontent = message.payload->getContentFrame(*(message.queue), frame, maxContentSize, offset);
+ sb.get()->sendRawFrame(frame);
+ }
+ }
}
void updateMessage(const boost::intrusive_ptr<broker::Message>& message) {
updateQueuedMessage(broker::QueuedMessage(0, message, haveLastPos? lastPos.getValue()+1 : 1));
}
+
+
};
diff --git a/qpid/cpp/src/tests/consume.cpp b/qpid/cpp/src/tests/consume.cpp
index 4d74b8ae57..3aacf8b3da 100644
--- a/qpid/cpp/src/tests/consume.cpp
+++ b/qpid/cpp/src/tests/consume.cpp
@@ -44,15 +44,19 @@ struct Args : public qpid::TestOptions {
string queue;
bool declare;
bool summary;
+ bool print;
+ bool durable;
Args() : count(1000), ack(0), queue("publish-consume"),
- declare(false), summary(false)
+ declare(false), summary(false), print(false)
{
addOptions()
("count", optValue(count, "N"), "number of messages to publish")
("ack-frequency", optValue(ack, "N"), "ack every N messages (0 means use no-ack mode)")
("queue", optValue(queue, "<queue name>"), "queue to consume from")
("declare", optValue(declare), "declare the queue")
+ ("durable", optValue(durable), "declare the queue durable, use with declare")
+ ("print-data", optValue(print), "Print the recieved data at info level")
("s,summary", optValue(summary), "Print undecorated rate.");
}
};
@@ -73,7 +77,7 @@ struct Client
void consume()
{
if (opts.declare)
- session.queueDeclare(opts.queue);
+ session.queueDeclare(arg::queue=opts.queue, arg::durable=opts.durable);
SubscriptionManager subs(session);
LocalQueue lq;
SubscriptionSettings settings;
@@ -85,6 +89,7 @@ struct Client
for (size_t i = 0; i < opts.count; ++i) {
msg=lq.pop();
QPID_LOG(info, "Received: " << msg.getMessageProperties().getCorrelationId());
+ if (opts.print) QPID_LOG(info, "Data: " << msg.getData());
}
if (opts.ack != 0)
sub.accept(sub.getUnaccepted()); // Cumulative ack for final batch.