summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2014-08-20 22:11:38 +0000
committerGordon Sim <gsim@apache.org>2014-08-20 22:11:38 +0000
commit38d687074d863d9a5cd4bc3b4194d434960b999a (patch)
tree540d77fe7793785b3f80f496ba91a148f447dd0e /qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
parent8ff9d718edc2f219890003f89a88feaa25f04051 (diff)
downloadqpid-python-38d687074d863d9a5cd4bc3b4194d434960b999a.tar.gz
QPID-6021: better batching
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1619252 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp48
1 files changed, 21 insertions, 27 deletions
diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
index 7a1dbef9db..ba073c8a36 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
@@ -63,7 +63,7 @@ OutgoingFromQueue::OutgoingFromQueue(Broker& broker, const std::string& source,
exclusive(e),
isControllingUser(p),
queue(q), deliveries(5000), link(l), out(o),
- current(0), outstanding(0),
+ current(0),
buffer(1024)/*used only for header at present*/,
//for exclusive queues, assume unreliable unless reliable is explicitly requested; otherwise assume reliable unless unreliable requested
unreliable(exclusive ? !requested_reliable(link) : requested_unreliable(link)),
@@ -109,29 +109,7 @@ void OutgoingFromQueue::handle(pn_delivery_t* delivery)
{
size_t i = Record::getIndex(pn_delivery_tag(delivery));
Record& r = deliveries[i];
- if (pn_delivery_writable(delivery)) {
- assert(r.msg);
- assert(!r.delivery);
- r.delivery = delivery;
- //write header
- qpid::amqp::MessageEncoder encoder(&buffer[0], buffer.size());
- encoder.writeHeader(Header(r.msg));
- write(&buffer[0], encoder.getPosition());
- Translation t(r.msg);
- t.write(*this);
- if (pn_link_advance(link)) {
- if (unreliable) pn_delivery_settle(delivery);
- --outstanding;
- outgoingMessageSent();
- QPID_LOG(debug, "Sent message " << r.msg.getSequence() << " from " << queue->getName() << ", index=" << r.index);
- } else {
- QPID_LOG(error, "Failed to send message " << r.msg.getSequence() << " from " << queue->getName() << ", index=" << r.index);
- }
- }
- if (unreliable) {
- if (preAcquires()) queue->dequeue(0, r.cursor);
- r.reset();
- } else if (pn_delivery_updated(delivery)) {
+ if (pn_delivery_updated(delivery)) {
assert(r.delivery == delivery);
r.disposition = pn_delivery_remote_state(delivery);
if (!r.disposition && pn_delivery_settled(delivery)) {
@@ -169,7 +147,7 @@ void OutgoingFromQueue::handle(pn_delivery_t* delivery)
bool OutgoingFromQueue::canDeliver()
{
- return deliveries[current].delivery == 0 && pn_link_credit(link) > outstanding;
+ return deliveries[current].delivery == 0 && pn_link_credit(link);
}
void OutgoingFromQueue::detached()
@@ -198,9 +176,25 @@ bool OutgoingFromQueue::deliver(const QueueCursor& cursor, const qpid::broker::M
if (current >= deliveries.capacity()) current = 0;
r.cursor = cursor;
r.msg = msg;
- pn_delivery(link, r.tag);
+ r.delivery = pn_delivery(link, r.tag);
+ //write header
+ qpid::amqp::MessageEncoder encoder(&buffer[0], buffer.size());
+ encoder.writeHeader(Header(r.msg));
+ write(&buffer[0], encoder.getPosition());
+ Translation t(r.msg);
+ t.write(*this);
+ if (pn_link_advance(link)) {
+ if (unreliable) pn_delivery_settle(r.delivery);
+ outgoingMessageSent();
+ QPID_LOG(debug, "Sent message " << r.msg.getSequence() << " from " << queue->getName() << ", index=" << r.index);
+ } else {
+ QPID_LOG(error, "Failed to send message " << r.msg.getSequence() << " from " << queue->getName() << ", index=" << r.index);
+ }
+ if (unreliable) {
+ if (preAcquires()) queue->dequeue(0, r.cursor);
+ r.reset();
+ }
QPID_LOG(debug, "Requested delivery of " << r.msg.getSequence() << " from " << queue->getName() << ", index=" << r.index);
- ++outstanding;
return true;
}