diff options
author | Gordon Sim <gsim@apache.org> | 2007-12-13 18:44:24 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-12-13 18:44:24 +0000 |
commit | 290f052bdcfa112292c891dc4f179e20bf06c812 (patch) | |
tree | df7e46a810194e03043395eb3c98c9cd1641faa2 /cpp/src | |
parent | d1910203a6803d4eed26d694909721e4a0142320 (diff) | |
download | qpid-python-290f052bdcfa112292c891dc4f179e20bf06c812.tar.gz |
Some fixes relating to message 'staging'.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@603973 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Message.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageBuilder.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticHandler.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/client/ExecutionHandler.cpp | 3 | ||||
-rw-r--r-- | cpp/src/tests/MessageBuilderTest.cpp | 6 |
7 files changed, 9 insertions, 11 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 2651356b6c..1c3fb85ab1 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -124,7 +124,6 @@ Broker::Broker(const Broker::Options& conf) : config(conf), store(createStore(conf)), queues(store.get()), - stagingThreshold(0), factory(*this), dtxManager(store.get()), sessionManager(conf.ack) diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index b7f580a744..1c1c303be8 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -113,7 +113,7 @@ class Broker : public sys::Runnable, public Plugin::Target, public management::M MessageStore& getStore() { return *store; } QueueRegistry& getQueues() { return queues; } ExchangeRegistry& getExchanges() { return exchanges; } - uint64_t getStagingThreshold() { return stagingThreshold; } + uint64_t getStagingThreshold() { return config.stagingThreshold; } DtxManager& getDtxManager() { return dtxManager; } SessionManager& getSessionManager() { return sessionManager; } @@ -132,7 +132,6 @@ class Broker : public sys::Runnable, public Plugin::Target, public management::M QueueRegistry queues; ExchangeRegistry exchanges; - uint64_t stagingThreshold; ConnectionFactory factory; DtxManager dtxManager; HandlerUpdaters handlerUpdaters; diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index 0fa8380a32..cff834c765 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -177,7 +177,7 @@ void Message::sendContent(Queue& queue, framing::FrameHandler& out, uint16_t max if (offset > 0) { frame.setBos(false); } - if (remaining) { + if (remaining > maxContentSize) { frame.setEos(false); } out.handle(frame); diff --git a/cpp/src/qpid/broker/MessageBuilder.h b/cpp/src/qpid/broker/MessageBuilder.h index c7ed2abc04..5bf6b1939a 100644 --- a/cpp/src/qpid/broker/MessageBuilder.h +++ b/cpp/src/qpid/broker/MessageBuilder.h @@ -32,7 +32,7 @@ namespace qpid { class MessageBuilder : public framing::FrameHandler{ public: - MessageBuilder(MessageStore* const store = 0, uint64_t stagingThreshold = 0); + MessageBuilder(MessageStore* const store, uint64_t stagingThreshold); void handle(framing::AMQFrame& frame); intrusive_ptr<Message> getMessage() { return message; } void start(const framing::SequenceNumber& id); diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp index 768ea9ea08..32c032e701 100644 --- a/cpp/src/qpid/broker/SemanticHandler.cpp +++ b/cpp/src/qpid/broker/SemanticHandler.cpp @@ -40,6 +40,7 @@ using namespace qpid::sys; SemanticHandler::SemanticHandler(SessionState& s) : state(*this,s), session(s), + msgBuilder(&s.getBroker().getStore(), s.getBroker().getStagingThreshold()), ackOp(boost::bind(&SemanticState::ackRange, &state, _1, _2)) {} @@ -150,7 +151,7 @@ void SemanticHandler::handleContent(AMQFrame& frame) msg = msgBuilder.getMessage(); } msgBuilder.handle(frame); - if (msg->getFrames().isComplete()) {//end of frameset will be indicated by frame flags + if (frame.getEof() && frame.getEos()) {//end of frameset will be indicated by frame flags msg->setPublisher(&session.getConnection()); state.handle(msg); msgBuilder.end(); diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp index f30a35aad0..63ee80d9f6 100644 --- a/cpp/src/qpid/client/ExecutionHandler.cpp +++ b/cpp/src/qpid/client/ExecutionHandler.cpp @@ -206,8 +206,7 @@ void ExecutionHandler::sendContent(const MethodContent& content) if(data_length > 0){ header.setEof(false); out(header); - //frame itself uses 8 bytes - u_int32_t frag_size = maxFrameSize - 8; + u_int32_t frag_size = maxFrameSize - AMQFrame::frameOverhead(); if(data_length < frag_size){ AMQFrame frame(in_place<AMQContentBody>(content.getData())); frame.setBof(false); diff --git a/cpp/src/tests/MessageBuilderTest.cpp b/cpp/src/tests/MessageBuilderTest.cpp index 7335867140..092e02cc2f 100644 --- a/cpp/src/tests/MessageBuilderTest.cpp +++ b/cpp/src/tests/MessageBuilderTest.cpp @@ -94,7 +94,7 @@ class MessageBuilderTest : public CppUnit::TestCase public: void testHeaderOnly(){ - MessageBuilder builder; + MessageBuilder builder(0, 0); builder.start(SequenceNumber()); std::string exchange("builder-exchange"); @@ -117,7 +117,7 @@ class MessageBuilderTest : public CppUnit::TestCase } void test1ContentFrame(){ - MessageBuilder builder; + MessageBuilder builder(0, 0); builder.start(SequenceNumber()); std::string data("abcdefg"); @@ -149,7 +149,7 @@ class MessageBuilderTest : public CppUnit::TestCase } void test2ContentFrames(){ - MessageBuilder builder; + MessageBuilder builder(0, 0); builder.start(SequenceNumber()); std::string data1("abcdefg"); |