summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-12-13 18:44:24 +0000
committerGordon Sim <gsim@apache.org>2007-12-13 18:44:24 +0000
commit290f052bdcfa112292c891dc4f179e20bf06c812 (patch)
treedf7e46a810194e03043395eb3c98c9cd1641faa2 /cpp/src
parentd1910203a6803d4eed26d694909721e4a0142320 (diff)
downloadqpid-python-290f052bdcfa112292c891dc4f179e20bf06c812.tar.gz
Some fixes relating to message 'staging'.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@603973 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/Broker.cpp1
-rw-r--r--cpp/src/qpid/broker/Broker.h3
-rw-r--r--cpp/src/qpid/broker/Message.cpp2
-rw-r--r--cpp/src/qpid/broker/MessageBuilder.h2
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.cpp3
-rw-r--r--cpp/src/qpid/client/ExecutionHandler.cpp3
-rw-r--r--cpp/src/tests/MessageBuilderTest.cpp6
7 files changed, 9 insertions, 11 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 2651356b6c..1c3fb85ab1 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -124,7 +124,6 @@ Broker::Broker(const Broker::Options& conf) :
config(conf),
store(createStore(conf)),
queues(store.get()),
- stagingThreshold(0),
factory(*this),
dtxManager(store.get()),
sessionManager(conf.ack)
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index b7f580a744..1c1c303be8 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -113,7 +113,7 @@ class Broker : public sys::Runnable, public Plugin::Target, public management::M
MessageStore& getStore() { return *store; }
QueueRegistry& getQueues() { return queues; }
ExchangeRegistry& getExchanges() { return exchanges; }
- uint64_t getStagingThreshold() { return stagingThreshold; }
+ uint64_t getStagingThreshold() { return config.stagingThreshold; }
DtxManager& getDtxManager() { return dtxManager; }
SessionManager& getSessionManager() { return sessionManager; }
@@ -132,7 +132,6 @@ class Broker : public sys::Runnable, public Plugin::Target, public management::M
QueueRegistry queues;
ExchangeRegistry exchanges;
- uint64_t stagingThreshold;
ConnectionFactory factory;
DtxManager dtxManager;
HandlerUpdaters handlerUpdaters;
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index 0fa8380a32..cff834c765 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -177,7 +177,7 @@ void Message::sendContent(Queue& queue, framing::FrameHandler& out, uint16_t max
if (offset > 0) {
frame.setBos(false);
}
- if (remaining) {
+ if (remaining > maxContentSize) {
frame.setEos(false);
}
out.handle(frame);
diff --git a/cpp/src/qpid/broker/MessageBuilder.h b/cpp/src/qpid/broker/MessageBuilder.h
index c7ed2abc04..5bf6b1939a 100644
--- a/cpp/src/qpid/broker/MessageBuilder.h
+++ b/cpp/src/qpid/broker/MessageBuilder.h
@@ -32,7 +32,7 @@ namespace qpid {
class MessageBuilder : public framing::FrameHandler{
public:
- MessageBuilder(MessageStore* const store = 0, uint64_t stagingThreshold = 0);
+ MessageBuilder(MessageStore* const store, uint64_t stagingThreshold);
void handle(framing::AMQFrame& frame);
intrusive_ptr<Message> getMessage() { return message; }
void start(const framing::SequenceNumber& id);
diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp
index 768ea9ea08..32c032e701 100644
--- a/cpp/src/qpid/broker/SemanticHandler.cpp
+++ b/cpp/src/qpid/broker/SemanticHandler.cpp
@@ -40,6 +40,7 @@ using namespace qpid::sys;
SemanticHandler::SemanticHandler(SessionState& s) :
state(*this,s), session(s),
+ msgBuilder(&s.getBroker().getStore(), s.getBroker().getStagingThreshold()),
ackOp(boost::bind(&SemanticState::ackRange, &state, _1, _2))
{}
@@ -150,7 +151,7 @@ void SemanticHandler::handleContent(AMQFrame& frame)
msg = msgBuilder.getMessage();
}
msgBuilder.handle(frame);
- if (msg->getFrames().isComplete()) {//end of frameset will be indicated by frame flags
+ if (frame.getEof() && frame.getEos()) {//end of frameset will be indicated by frame flags
msg->setPublisher(&session.getConnection());
state.handle(msg);
msgBuilder.end();
diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp
index f30a35aad0..63ee80d9f6 100644
--- a/cpp/src/qpid/client/ExecutionHandler.cpp
+++ b/cpp/src/qpid/client/ExecutionHandler.cpp
@@ -206,8 +206,7 @@ void ExecutionHandler::sendContent(const MethodContent& content)
if(data_length > 0){
header.setEof(false);
out(header);
- //frame itself uses 8 bytes
- u_int32_t frag_size = maxFrameSize - 8;
+ u_int32_t frag_size = maxFrameSize - AMQFrame::frameOverhead();
if(data_length < frag_size){
AMQFrame frame(in_place<AMQContentBody>(content.getData()));
frame.setBof(false);
diff --git a/cpp/src/tests/MessageBuilderTest.cpp b/cpp/src/tests/MessageBuilderTest.cpp
index 7335867140..092e02cc2f 100644
--- a/cpp/src/tests/MessageBuilderTest.cpp
+++ b/cpp/src/tests/MessageBuilderTest.cpp
@@ -94,7 +94,7 @@ class MessageBuilderTest : public CppUnit::TestCase
public:
void testHeaderOnly(){
- MessageBuilder builder;
+ MessageBuilder builder(0, 0);
builder.start(SequenceNumber());
std::string exchange("builder-exchange");
@@ -117,7 +117,7 @@ class MessageBuilderTest : public CppUnit::TestCase
}
void test1ContentFrame(){
- MessageBuilder builder;
+ MessageBuilder builder(0, 0);
builder.start(SequenceNumber());
std::string data("abcdefg");
@@ -149,7 +149,7 @@ class MessageBuilderTest : public CppUnit::TestCase
}
void test2ContentFrames(){
- MessageBuilder builder;
+ MessageBuilder builder(0, 0);
builder.start(SequenceNumber());
std::string data1("abcdefg");