diff options
author | Gordon Sim <gsim@apache.org> | 2014-08-20 22:11:38 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2014-08-20 22:11:38 +0000 |
commit | 38d687074d863d9a5cd4bc3b4194d434960b999a (patch) | |
tree | 540d77fe7793785b3f80f496ba91a148f447dd0e | |
parent | 8ff9d718edc2f219890003f89a88feaa25f04051 (diff) | |
download | qpid-python-38d687074d863d9a5cd4bc3b4194d434960b999a.tar.gz |
QPID-6021: better batching
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1619252 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Connection.cpp | 101 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Connection.h | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp | 48 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Outgoing.h | 1 |
4 files changed, 85 insertions, 68 deletions
diff --git a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp index 7f40683aa1..34afc955f1 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp @@ -153,7 +153,8 @@ size_t Connection::decode(const char* buffer, size_t size) size_t Connection::encode(char* buffer, size_t size) { - QPID_LOG(trace, "encode(" << size << ")") + QPID_LOG(trace, "encode(" << size << ")"); + doOutput(size); ssize_t n = pn_transport_output(transport, buffer, size); if (n > 0) { QPID_LOG_CAT(debug, network, id << " encoded " << n << " bytes from " << size) @@ -166,6 +167,33 @@ size_t Connection::encode(char* buffer, size_t size) return 0; } } + +void Connection::doOutput(ssize_t capacity) +{ + for (ssize_t n = pn_transport_pending(transport); n > 0 && n < capacity; n = pn_transport_pending(transport)) { + if (dispatch()) processDeliveries(); + else break; + } +} + +bool Connection::dispatch() +{ + bool result = false; + for (Sessions::iterator i = sessions.begin();i != sessions.end();) { + if (i->second->endedByManagement()) { + pn_session_close(i->first); + i->second->close(); + sessions.erase(i++); + result = true; + QPID_LOG_CAT(debug, model, id << " session ended by management"); + } else { + if (i->second->dispatch()) result = true; + ++i; + } + } + return result; +} + bool Connection::canEncode() { if (!closeInitiated) { @@ -174,18 +202,7 @@ bool Connection::canEncode() return true; } try { - for (Sessions::iterator i = sessions.begin();i != sessions.end();) { - if (i->second->endedByManagement()) { - pn_session_close(i->first); - i->second->close(); - sessions.erase(i++); - haveOutput = true; - QPID_LOG_CAT(debug, model, id << " session ended by management"); - } else { - if (i->second->dispatch()) haveOutput = true; - ++i; - } - } + if (dispatch()) haveOutput = true; process(); } catch (const Exception& e) { QPID_LOG(error, id << ": " << e.what()); @@ -304,6 +321,37 @@ void Connection::process() } } + processDeliveries(); + + for (pn_link_t* l = pn_link_head(connection, REQUIRES_CLOSE); l; l = pn_link_next(l, REQUIRES_CLOSE)) { + pn_link_close(l); + Sessions::iterator session = sessions.find(pn_link_session(l)); + if (session == sessions.end()) { + QPID_LOG(error, id << " peer attempted to detach link on unknown session!"); + } else { + session->second->detach(l); + QPID_LOG_CAT(debug, model, id << " link detached"); + } + } + for (pn_session_t* s = pn_session_head(connection, REQUIRES_CLOSE); s; s = pn_session_next(s, REQUIRES_CLOSE)) { + pn_session_close(s); + Sessions::iterator i = sessions.find(s); + if (i != sessions.end()) { + i->second->close(); + sessions.erase(i); + QPID_LOG_CAT(debug, model, id << " session ended"); + } else { + QPID_LOG(error, id << " peer attempted to close unrecognised session"); + } + } + if ((pn_connection_state(connection) & REQUIRES_CLOSE) == REQUIRES_CLOSE) { + QPID_LOG_CAT(debug, model, id << " connection closed"); + pn_connection_close(connection); + } +} + +void Connection::processDeliveries() +{ //handle deliveries for (pn_delivery_t* delivery = pn_work_head(connection); delivery; delivery = pn_work_next(delivery)) { pn_link_t* link = pn_delivery_link(delivery); @@ -332,33 +380,6 @@ void Connection::process() pn_link_close(link); } } - - - for (pn_link_t* l = pn_link_head(connection, REQUIRES_CLOSE); l; l = pn_link_next(l, REQUIRES_CLOSE)) { - pn_link_close(l); - Sessions::iterator session = sessions.find(pn_link_session(l)); - if (session == sessions.end()) { - QPID_LOG(error, id << " peer attempted to detach link on unknown session!"); - } else { - session->second->detach(l); - QPID_LOG_CAT(debug, model, id << " link detached"); - } - } - for (pn_session_t* s = pn_session_head(connection, REQUIRES_CLOSE); s; s = pn_session_next(s, REQUIRES_CLOSE)) { - pn_session_close(s); - Sessions::iterator i = sessions.find(s); - if (i != sessions.end()) { - i->second->close(); - sessions.erase(i); - QPID_LOG_CAT(debug, model, id << " session ended"); - } else { - QPID_LOG(error, id << " peer attempted to close unrecognised session"); - } - } - if ((pn_connection_state(connection) & REQUIRES_CLOSE) == REQUIRES_CLOSE) { - QPID_LOG_CAT(debug, model, id << " connection closed"); - pn_connection_close(connection); - } } std::string Connection::getError() diff --git a/qpid/cpp/src/qpid/broker/amqp/Connection.h b/qpid/cpp/src/qpid/broker/amqp/Connection.h index 90478ae181..150df230d3 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Connection.h +++ b/qpid/cpp/src/qpid/broker/amqp/Connection.h @@ -72,6 +72,9 @@ class Connection : public BrokerContext, public sys::ConnectionCodec, public Man bool closeRequested; virtual void process(); + void doOutput(ssize_t); + bool dispatch(); + void processDeliveries(); std::string getError(); void close(); void open(); diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp index 7a1dbef9db..ba073c8a36 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp @@ -63,7 +63,7 @@ OutgoingFromQueue::OutgoingFromQueue(Broker& broker, const std::string& source, exclusive(e), isControllingUser(p), queue(q), deliveries(5000), link(l), out(o), - current(0), outstanding(0), + current(0), buffer(1024)/*used only for header at present*/, //for exclusive queues, assume unreliable unless reliable is explicitly requested; otherwise assume reliable unless unreliable requested unreliable(exclusive ? !requested_reliable(link) : requested_unreliable(link)), @@ -109,29 +109,7 @@ void OutgoingFromQueue::handle(pn_delivery_t* delivery) { size_t i = Record::getIndex(pn_delivery_tag(delivery)); Record& r = deliveries[i]; - if (pn_delivery_writable(delivery)) { - assert(r.msg); - assert(!r.delivery); - r.delivery = delivery; - //write header - qpid::amqp::MessageEncoder encoder(&buffer[0], buffer.size()); - encoder.writeHeader(Header(r.msg)); - write(&buffer[0], encoder.getPosition()); - Translation t(r.msg); - t.write(*this); - if (pn_link_advance(link)) { - if (unreliable) pn_delivery_settle(delivery); - --outstanding; - outgoingMessageSent(); - QPID_LOG(debug, "Sent message " << r.msg.getSequence() << " from " << queue->getName() << ", index=" << r.index); - } else { - QPID_LOG(error, "Failed to send message " << r.msg.getSequence() << " from " << queue->getName() << ", index=" << r.index); - } - } - if (unreliable) { - if (preAcquires()) queue->dequeue(0, r.cursor); - r.reset(); - } else if (pn_delivery_updated(delivery)) { + if (pn_delivery_updated(delivery)) { assert(r.delivery == delivery); r.disposition = pn_delivery_remote_state(delivery); if (!r.disposition && pn_delivery_settled(delivery)) { @@ -169,7 +147,7 @@ void OutgoingFromQueue::handle(pn_delivery_t* delivery) bool OutgoingFromQueue::canDeliver() { - return deliveries[current].delivery == 0 && pn_link_credit(link) > outstanding; + return deliveries[current].delivery == 0 && pn_link_credit(link); } void OutgoingFromQueue::detached() @@ -198,9 +176,25 @@ bool OutgoingFromQueue::deliver(const QueueCursor& cursor, const qpid::broker::M if (current >= deliveries.capacity()) current = 0; r.cursor = cursor; r.msg = msg; - pn_delivery(link, r.tag); + r.delivery = pn_delivery(link, r.tag); + //write header + qpid::amqp::MessageEncoder encoder(&buffer[0], buffer.size()); + encoder.writeHeader(Header(r.msg)); + write(&buffer[0], encoder.getPosition()); + Translation t(r.msg); + t.write(*this); + if (pn_link_advance(link)) { + if (unreliable) pn_delivery_settle(r.delivery); + outgoingMessageSent(); + QPID_LOG(debug, "Sent message " << r.msg.getSequence() << " from " << queue->getName() << ", index=" << r.index); + } else { + QPID_LOG(error, "Failed to send message " << r.msg.getSequence() << " from " << queue->getName() << ", index=" << r.index); + } + if (unreliable) { + if (preAcquires()) queue->dequeue(0, r.cursor); + r.reset(); + } QPID_LOG(debug, "Requested delivery of " << r.msg.getSequence() << " from " << queue->getName() << ", index=" << r.index); - ++outstanding; return true; } diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h index 3808b68d32..8039044aea 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h +++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h @@ -140,7 +140,6 @@ class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, public pn_link_t* link; qpid::sys::OutputControl& out; size_t current; - int outstanding; std::vector<char> buffer; std::string subjectFilter; boost::scoped_ptr<Selector> selector; |