diff options
| author | Andrew Stitcher <astitcher@apache.org> | 2012-07-06 15:41:33 +0000 |
|---|---|---|
| committer | Andrew Stitcher <astitcher@apache.org> | 2012-07-06 15:41:33 +0000 |
| commit | 3bf81e9a28c108630f11e8beabdcd0c3b6acb970 (patch) | |
| tree | 610cf0f315b62fe1b9d4e8849ead2b2d327a6b8c /cpp | |
| parent | 6074d880829770ac447927808a605a7013375b0a (diff) | |
| download | qpid-python-3bf81e9a28c108630f11e8beabdcd0c3b6acb970.tar.gz | |
QPID-3883: Using application headers in messages causes a very large slowdown
Add subject to outgoing messsage before encoding it to save a round trip
decode-encode.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1358275 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
| -rw-r--r-- | cpp/include/qpid/amqp_0_10/Codecs.h | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/amqp_0_10/Codecs.cpp | 45 | ||||
| -rw-r--r-- | cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp | 15 | ||||
| -rw-r--r-- | cpp/src/qpid/client/amqp0_10/OutgoingMessage.h | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/client/amqp0_10/SenderImpl.cpp | 8 | ||||
| -rw-r--r-- | cpp/src/qpid/client/amqp0_10/SenderImpl.h | 12 |
6 files changed, 66 insertions, 17 deletions
diff --git a/cpp/include/qpid/amqp_0_10/Codecs.h b/cpp/include/qpid/amqp_0_10/Codecs.h index 73846f33a8..d632a9f20a 100644 --- a/cpp/include/qpid/amqp_0_10/Codecs.h +++ b/cpp/include/qpid/amqp_0_10/Codecs.h @@ -69,6 +69,8 @@ class QPID_COMMON_CLASS_EXTERN ListCodec */ QPID_COMMON_EXTERN void translate(const qpid::types::Variant::Map& from, qpid::framing::FieldTable& to); +QPID_COMMON_EXTERN void translate(const qpid::types::Variant::Map& from, const std::string& efield, const qpid::types::Variant& evalue, + qpid::framing::FieldTable& to); QPID_COMMON_EXTERN void translate(const qpid::framing::FieldTable& from, qpid::types::Variant::Map& to); diff --git a/cpp/src/qpid/amqp_0_10/Codecs.cpp b/cpp/src/qpid/amqp_0_10/Codecs.cpp index da2c84d6d5..8fc6629801 100644 --- a/cpp/src/qpid/amqp_0_10/Codecs.cpp +++ b/cpp/src/qpid/amqp_0_10/Codecs.cpp @@ -271,6 +271,16 @@ uint32_t encodedSize(const Variant::Map& values) return size; } +uint32_t encodedSize(const Variant::Map& values, const std::string& efield, const Variant& evalue) +{ + uint32_t size = 4/*size field*/ + 4/*count field*/; + for(Variant::Map::const_iterator i = values.begin(); i != values.end(); ++i) { + size += 1/*size of key*/ + (i->first).size() + 1/*typecode*/ + encodedSize(i->second); + } + size += 1/*size of key*/ + efield.size() + 1/*typecode*/ + encodedSize(evalue); + return size; +} + uint32_t encodedSize(const Variant::List& values) { uint32_t size = 4/*size field*/ + 4/*count field*/; @@ -399,6 +409,21 @@ void encode(const Variant::Map& map, uint32_t len, qpid::framing::Buffer& buffer assert(s + len == buffer.getPosition()); } +void encode(const Variant::Map& map, const std::string& efield, const Variant& evalue, uint32_t len, qpid::framing::Buffer& buffer) +{ + uint32_t s = buffer.getPosition(); + buffer.putLong(len - 4);//exclusive of the size field itself + buffer.putLong(map.size() + 1 /* The extra field */ ); + for (Variant::Map::const_iterator i = map.begin(); i != map.end(); ++i) { + buffer.putShortString(i->first); + encode(i->second, buffer); + } + buffer.putShortString(efield); + encode(evalue, buffer); + + assert(s + len == buffer.getPosition()); +} + void encode(const Variant::List& list, uint32_t len, qpid::framing::Buffer& buffer) { uint32_t s = buffer.getPosition(); @@ -475,8 +500,26 @@ void translate(const Variant::Map& from, FieldTable& to) assert( len == buff.getPosition() ); buff.reset(); to.decode(buff); +} - //convert(from, to, &toFieldTableEntry); +void translate(const Variant::Map& from, const std::string& efield, const Variant& evalue, FieldTable& to) +{ + // Create buffer of correct size to encode Variant::Map + uint32_t len = encodedSize(from, efield, evalue); + std::vector<char> space(len); + qpid::framing::Buffer buff(&space[0], len); + + // Encode Variant::Map into buffer directly - + // We pass the already calculated length in to avoid + // recalculating it. + encode(from, efield, evalue, len, buff); + + // Give buffer to FieldTable + // Could speed this up a bit by avoiding copying + // the buffer we just created into the FieldTable + assert( len == buff.getPosition() ); + buff.reset(); + to.decode(buff); } void translate(const FieldTable& from, Variant::Map& to) diff --git a/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp b/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp index d93416da75..dd14d11c4c 100644 --- a/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp +++ b/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp @@ -58,7 +58,12 @@ void OutgoingMessage::convert(const qpid::messaging::Message& from) if (address) { message.getMessageProperties().setReplyTo(AddressResolution::convert(address)); } - translate(from.getProperties(), message.getMessageProperties().getApplicationHeaders()); + if (!subject.empty()) { + Variant v(subject); v.setEncoding("utf8"); + translate(from.getProperties(), SUBJECT, v, message.getMessageProperties().getApplicationHeaders()); + } else { + translate(from.getProperties(), message.getMessageProperties().getApplicationHeaders()); + } if (from.getTtl().getMilliseconds()) { message.getDeliveryProperties().setTtl(from.getTtl().getMilliseconds()); } @@ -89,16 +94,14 @@ void OutgoingMessage::convert(const qpid::messaging::Message& from) } } -void OutgoingMessage::setSubject(const std::string& subject) +void OutgoingMessage::setSubject(const std::string& s) { - if (!subject.empty()) { - message.getMessageProperties().getApplicationHeaders().setString(SUBJECT, subject); - } + subject = s; } std::string OutgoingMessage::getSubject() const { - return message.getMessageProperties().getApplicationHeaders().getAsString(SUBJECT); + return subject; } }}} // namespace qpid::client::amqp0_10 diff --git a/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h b/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h index 0cdd2a2336..2191f45546 100644 --- a/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h +++ b/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h @@ -35,6 +35,7 @@ struct OutgoingMessage { qpid::client::Message message; qpid::client::Completion status; + std::string subject; void convert(const qpid::messaging::Message&); void setSubject(const std::string& subject); diff --git a/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp b/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp index f2f0f1a9e5..b275db38d7 100644 --- a/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp @@ -37,10 +37,10 @@ SenderImpl::SenderImpl(SessionImpl& _parent, const std::string& _name, void SenderImpl::send(const qpid::messaging::Message& message, bool sync) { if (unreliable) { // immutable, don't need lock - UnreliableSend f(*this, &message); + UnreliableSend f(*this, message); parent->execute(f); } else { - Send f(*this, &message); + Send f(*this, message); while (f.repeat) parent->execute(f); } if (sync) parent->sync(true); @@ -117,8 +117,8 @@ void SenderImpl::sendImpl(const qpid::messaging::Message& m) { sys::Mutex::ScopedLock l(lock); std::auto_ptr<OutgoingMessage> msg(new OutgoingMessage()); - msg->convert(m); msg->setSubject(m.getSubject().empty() ? address.getSubject() : m.getSubject()); + msg->convert(m); outgoing.push_back(msg.release()); sink->send(session, name, outgoing.back()); } @@ -127,8 +127,8 @@ void SenderImpl::sendUnreliable(const qpid::messaging::Message& m) { sys::Mutex::ScopedLock l(lock); OutgoingMessage msg; - msg.convert(m); msg.setSubject(m.getSubject().empty() ? address.getSubject() : m.getSubject()); + msg.convert(m); sink->send(session, name, msg); } diff --git a/cpp/src/qpid/client/amqp0_10/SenderImpl.h b/cpp/src/qpid/client/amqp0_10/SenderImpl.h index c10c77ae18..d75863c743 100644 --- a/cpp/src/qpid/client/amqp0_10/SenderImpl.h +++ b/cpp/src/qpid/client/amqp0_10/SenderImpl.h @@ -99,32 +99,32 @@ class SenderImpl : public qpid::messaging::SenderImpl struct Send : Command { - const qpid::messaging::Message* message; + const qpid::messaging::Message& message; bool repeat; - Send(SenderImpl& i, const qpid::messaging::Message* m) : Command(i), message(m), repeat(true) {} + Send(SenderImpl& i, const qpid::messaging::Message& m) : Command(i), message(m), repeat(true) {} void operator()() { impl.waitForCapacity(); //from this point message will be recorded if there is any //failure (and replayed) so need not repeat the call repeat = false; - impl.sendImpl(*message); + impl.sendImpl(message); } }; struct UnreliableSend : Command { - const qpid::messaging::Message* message; + const qpid::messaging::Message& message; - UnreliableSend(SenderImpl& i, const qpid::messaging::Message* m) : Command(i), message(m) {} + UnreliableSend(SenderImpl& i, const qpid::messaging::Message& m) : Command(i), message(m) {} void operator()() { //TODO: ideally want to put messages on the outbound //queue and pull them off in io thread, but the old //0-10 client doesn't support that option so for now //we simply don't queue unreliable messages - impl.sendUnreliable(*message); + impl.sendUnreliable(message); } }; |
