diff options
author | Gordon Sim <gsim@apache.org> | 2014-08-20 22:11:38 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2014-08-20 22:11:38 +0000 |
commit | 38d687074d863d9a5cd4bc3b4194d434960b999a (patch) | |
tree | 540d77fe7793785b3f80f496ba91a148f447dd0e /qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp | |
parent | 8ff9d718edc2f219890003f89a88feaa25f04051 (diff) | |
download | qpid-python-38d687074d863d9a5cd4bc3b4194d434960b999a.tar.gz |
QPID-6021: better batching
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1619252 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp | 48 |
1 files changed, 21 insertions, 27 deletions
diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp index 7a1dbef9db..ba073c8a36 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp @@ -63,7 +63,7 @@ OutgoingFromQueue::OutgoingFromQueue(Broker& broker, const std::string& source, exclusive(e), isControllingUser(p), queue(q), deliveries(5000), link(l), out(o), - current(0), outstanding(0), + current(0), buffer(1024)/*used only for header at present*/, //for exclusive queues, assume unreliable unless reliable is explicitly requested; otherwise assume reliable unless unreliable requested unreliable(exclusive ? !requested_reliable(link) : requested_unreliable(link)), @@ -109,29 +109,7 @@ void OutgoingFromQueue::handle(pn_delivery_t* delivery) { size_t i = Record::getIndex(pn_delivery_tag(delivery)); Record& r = deliveries[i]; - if (pn_delivery_writable(delivery)) { - assert(r.msg); - assert(!r.delivery); - r.delivery = delivery; - //write header - qpid::amqp::MessageEncoder encoder(&buffer[0], buffer.size()); - encoder.writeHeader(Header(r.msg)); - write(&buffer[0], encoder.getPosition()); - Translation t(r.msg); - t.write(*this); - if (pn_link_advance(link)) { - if (unreliable) pn_delivery_settle(delivery); - --outstanding; - outgoingMessageSent(); - QPID_LOG(debug, "Sent message " << r.msg.getSequence() << " from " << queue->getName() << ", index=" << r.index); - } else { - QPID_LOG(error, "Failed to send message " << r.msg.getSequence() << " from " << queue->getName() << ", index=" << r.index); - } - } - if (unreliable) { - if (preAcquires()) queue->dequeue(0, r.cursor); - r.reset(); - } else if (pn_delivery_updated(delivery)) { + if (pn_delivery_updated(delivery)) { assert(r.delivery == delivery); r.disposition = pn_delivery_remote_state(delivery); if (!r.disposition && pn_delivery_settled(delivery)) { @@ -169,7 +147,7 @@ void OutgoingFromQueue::handle(pn_delivery_t* delivery) bool OutgoingFromQueue::canDeliver() { - return deliveries[current].delivery == 0 && pn_link_credit(link) > outstanding; + return deliveries[current].delivery == 0 && pn_link_credit(link); } void OutgoingFromQueue::detached() @@ -198,9 +176,25 @@ bool OutgoingFromQueue::deliver(const QueueCursor& cursor, const qpid::broker::M if (current >= deliveries.capacity()) current = 0; r.cursor = cursor; r.msg = msg; - pn_delivery(link, r.tag); + r.delivery = pn_delivery(link, r.tag); + //write header + qpid::amqp::MessageEncoder encoder(&buffer[0], buffer.size()); + encoder.writeHeader(Header(r.msg)); + write(&buffer[0], encoder.getPosition()); + Translation t(r.msg); + t.write(*this); + if (pn_link_advance(link)) { + if (unreliable) pn_delivery_settle(r.delivery); + outgoingMessageSent(); + QPID_LOG(debug, "Sent message " << r.msg.getSequence() << " from " << queue->getName() << ", index=" << r.index); + } else { + QPID_LOG(error, "Failed to send message " << r.msg.getSequence() << " from " << queue->getName() << ", index=" << r.index); + } + if (unreliable) { + if (preAcquires()) queue->dequeue(0, r.cursor); + r.reset(); + } QPID_LOG(debug, "Requested delivery of " << r.msg.getSequence() << " from " << queue->getName() << ", index=" << r.index); - ++outstanding; return true; } |