summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2014-08-20 22:11:38 +0000
committerGordon Sim <gsim@apache.org>2014-08-20 22:11:38 +0000
commit38d687074d863d9a5cd4bc3b4194d434960b999a (patch)
tree540d77fe7793785b3f80f496ba91a148f447dd0e
parent8ff9d718edc2f219890003f89a88feaa25f04051 (diff)
downloadqpid-python-38d687074d863d9a5cd4bc3b4194d434960b999a.tar.gz
QPID-6021: better batching
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1619252 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Connection.cpp101
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Connection.h3
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp48
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Outgoing.h1
4 files changed, 85 insertions, 68 deletions
diff --git a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
index 7f40683aa1..34afc955f1 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
@@ -153,7 +153,8 @@ size_t Connection::decode(const char* buffer, size_t size)
size_t Connection::encode(char* buffer, size_t size)
{
- QPID_LOG(trace, "encode(" << size << ")")
+ QPID_LOG(trace, "encode(" << size << ")");
+ doOutput(size);
ssize_t n = pn_transport_output(transport, buffer, size);
if (n > 0) {
QPID_LOG_CAT(debug, network, id << " encoded " << n << " bytes from " << size)
@@ -166,6 +167,33 @@ size_t Connection::encode(char* buffer, size_t size)
return 0;
}
}
+
+void Connection::doOutput(ssize_t capacity)
+{
+ for (ssize_t n = pn_transport_pending(transport); n > 0 && n < capacity; n = pn_transport_pending(transport)) {
+ if (dispatch()) processDeliveries();
+ else break;
+ }
+}
+
+bool Connection::dispatch()
+{
+ bool result = false;
+ for (Sessions::iterator i = sessions.begin();i != sessions.end();) {
+ if (i->second->endedByManagement()) {
+ pn_session_close(i->first);
+ i->second->close();
+ sessions.erase(i++);
+ result = true;
+ QPID_LOG_CAT(debug, model, id << " session ended by management");
+ } else {
+ if (i->second->dispatch()) result = true;
+ ++i;
+ }
+ }
+ return result;
+}
+
bool Connection::canEncode()
{
if (!closeInitiated) {
@@ -174,18 +202,7 @@ bool Connection::canEncode()
return true;
}
try {
- for (Sessions::iterator i = sessions.begin();i != sessions.end();) {
- if (i->second->endedByManagement()) {
- pn_session_close(i->first);
- i->second->close();
- sessions.erase(i++);
- haveOutput = true;
- QPID_LOG_CAT(debug, model, id << " session ended by management");
- } else {
- if (i->second->dispatch()) haveOutput = true;
- ++i;
- }
- }
+ if (dispatch()) haveOutput = true;
process();
} catch (const Exception& e) {
QPID_LOG(error, id << ": " << e.what());
@@ -304,6 +321,37 @@ void Connection::process()
}
}
+ processDeliveries();
+
+ for (pn_link_t* l = pn_link_head(connection, REQUIRES_CLOSE); l; l = pn_link_next(l, REQUIRES_CLOSE)) {
+ pn_link_close(l);
+ Sessions::iterator session = sessions.find(pn_link_session(l));
+ if (session == sessions.end()) {
+ QPID_LOG(error, id << " peer attempted to detach link on unknown session!");
+ } else {
+ session->second->detach(l);
+ QPID_LOG_CAT(debug, model, id << " link detached");
+ }
+ }
+ for (pn_session_t* s = pn_session_head(connection, REQUIRES_CLOSE); s; s = pn_session_next(s, REQUIRES_CLOSE)) {
+ pn_session_close(s);
+ Sessions::iterator i = sessions.find(s);
+ if (i != sessions.end()) {
+ i->second->close();
+ sessions.erase(i);
+ QPID_LOG_CAT(debug, model, id << " session ended");
+ } else {
+ QPID_LOG(error, id << " peer attempted to close unrecognised session");
+ }
+ }
+ if ((pn_connection_state(connection) & REQUIRES_CLOSE) == REQUIRES_CLOSE) {
+ QPID_LOG_CAT(debug, model, id << " connection closed");
+ pn_connection_close(connection);
+ }
+}
+
+void Connection::processDeliveries()
+{
//handle deliveries
for (pn_delivery_t* delivery = pn_work_head(connection); delivery; delivery = pn_work_next(delivery)) {
pn_link_t* link = pn_delivery_link(delivery);
@@ -332,33 +380,6 @@ void Connection::process()
pn_link_close(link);
}
}
-
-
- for (pn_link_t* l = pn_link_head(connection, REQUIRES_CLOSE); l; l = pn_link_next(l, REQUIRES_CLOSE)) {
- pn_link_close(l);
- Sessions::iterator session = sessions.find(pn_link_session(l));
- if (session == sessions.end()) {
- QPID_LOG(error, id << " peer attempted to detach link on unknown session!");
- } else {
- session->second->detach(l);
- QPID_LOG_CAT(debug, model, id << " link detached");
- }
- }
- for (pn_session_t* s = pn_session_head(connection, REQUIRES_CLOSE); s; s = pn_session_next(s, REQUIRES_CLOSE)) {
- pn_session_close(s);
- Sessions::iterator i = sessions.find(s);
- if (i != sessions.end()) {
- i->second->close();
- sessions.erase(i);
- QPID_LOG_CAT(debug, model, id << " session ended");
- } else {
- QPID_LOG(error, id << " peer attempted to close unrecognised session");
- }
- }
- if ((pn_connection_state(connection) & REQUIRES_CLOSE) == REQUIRES_CLOSE) {
- QPID_LOG_CAT(debug, model, id << " connection closed");
- pn_connection_close(connection);
- }
}
std::string Connection::getError()
diff --git a/qpid/cpp/src/qpid/broker/amqp/Connection.h b/qpid/cpp/src/qpid/broker/amqp/Connection.h
index 90478ae181..150df230d3 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Connection.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Connection.h
@@ -72,6 +72,9 @@ class Connection : public BrokerContext, public sys::ConnectionCodec, public Man
bool closeRequested;
virtual void process();
+ void doOutput(ssize_t);
+ bool dispatch();
+ void processDeliveries();
std::string getError();
void close();
void open();
diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
index 7a1dbef9db..ba073c8a36 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
@@ -63,7 +63,7 @@ OutgoingFromQueue::OutgoingFromQueue(Broker& broker, const std::string& source,
exclusive(e),
isControllingUser(p),
queue(q), deliveries(5000), link(l), out(o),
- current(0), outstanding(0),
+ current(0),
buffer(1024)/*used only for header at present*/,
//for exclusive queues, assume unreliable unless reliable is explicitly requested; otherwise assume reliable unless unreliable requested
unreliable(exclusive ? !requested_reliable(link) : requested_unreliable(link)),
@@ -109,29 +109,7 @@ void OutgoingFromQueue::handle(pn_delivery_t* delivery)
{
size_t i = Record::getIndex(pn_delivery_tag(delivery));
Record& r = deliveries[i];
- if (pn_delivery_writable(delivery)) {
- assert(r.msg);
- assert(!r.delivery);
- r.delivery = delivery;
- //write header
- qpid::amqp::MessageEncoder encoder(&buffer[0], buffer.size());
- encoder.writeHeader(Header(r.msg));
- write(&buffer[0], encoder.getPosition());
- Translation t(r.msg);
- t.write(*this);
- if (pn_link_advance(link)) {
- if (unreliable) pn_delivery_settle(delivery);
- --outstanding;
- outgoingMessageSent();
- QPID_LOG(debug, "Sent message " << r.msg.getSequence() << " from " << queue->getName() << ", index=" << r.index);
- } else {
- QPID_LOG(error, "Failed to send message " << r.msg.getSequence() << " from " << queue->getName() << ", index=" << r.index);
- }
- }
- if (unreliable) {
- if (preAcquires()) queue->dequeue(0, r.cursor);
- r.reset();
- } else if (pn_delivery_updated(delivery)) {
+ if (pn_delivery_updated(delivery)) {
assert(r.delivery == delivery);
r.disposition = pn_delivery_remote_state(delivery);
if (!r.disposition && pn_delivery_settled(delivery)) {
@@ -169,7 +147,7 @@ void OutgoingFromQueue::handle(pn_delivery_t* delivery)
bool OutgoingFromQueue::canDeliver()
{
- return deliveries[current].delivery == 0 && pn_link_credit(link) > outstanding;
+ return deliveries[current].delivery == 0 && pn_link_credit(link);
}
void OutgoingFromQueue::detached()
@@ -198,9 +176,25 @@ bool OutgoingFromQueue::deliver(const QueueCursor& cursor, const qpid::broker::M
if (current >= deliveries.capacity()) current = 0;
r.cursor = cursor;
r.msg = msg;
- pn_delivery(link, r.tag);
+ r.delivery = pn_delivery(link, r.tag);
+ //write header
+ qpid::amqp::MessageEncoder encoder(&buffer[0], buffer.size());
+ encoder.writeHeader(Header(r.msg));
+ write(&buffer[0], encoder.getPosition());
+ Translation t(r.msg);
+ t.write(*this);
+ if (pn_link_advance(link)) {
+ if (unreliable) pn_delivery_settle(r.delivery);
+ outgoingMessageSent();
+ QPID_LOG(debug, "Sent message " << r.msg.getSequence() << " from " << queue->getName() << ", index=" << r.index);
+ } else {
+ QPID_LOG(error, "Failed to send message " << r.msg.getSequence() << " from " << queue->getName() << ", index=" << r.index);
+ }
+ if (unreliable) {
+ if (preAcquires()) queue->dequeue(0, r.cursor);
+ r.reset();
+ }
QPID_LOG(debug, "Requested delivery of " << r.msg.getSequence() << " from " << queue->getName() << ", index=" << r.index);
- ++outstanding;
return true;
}
diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
index 3808b68d32..8039044aea 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
@@ -140,7 +140,6 @@ class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, public
pn_link_t* link;
qpid::sys::OutputControl& out;
size_t current;
- int outstanding;
std::vector<char> buffer;
std::string subjectFilter;
boost::scoped_ptr<Selector> selector;