summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/amqp_0_10/Codecs.cpp45
-rw-r--r--cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp15
-rw-r--r--cpp/src/qpid/client/amqp0_10/OutgoingMessage.h1
-rw-r--r--cpp/src/qpid/client/amqp0_10/SenderImpl.cpp8
-rw-r--r--cpp/src/qpid/client/amqp0_10/SenderImpl.h12
5 files changed, 64 insertions, 17 deletions
diff --git a/cpp/src/qpid/amqp_0_10/Codecs.cpp b/cpp/src/qpid/amqp_0_10/Codecs.cpp
index da2c84d6d5..8fc6629801 100644
--- a/cpp/src/qpid/amqp_0_10/Codecs.cpp
+++ b/cpp/src/qpid/amqp_0_10/Codecs.cpp
@@ -271,6 +271,16 @@ uint32_t encodedSize(const Variant::Map& values)
return size;
}
+uint32_t encodedSize(const Variant::Map& values, const std::string& efield, const Variant& evalue)
+{
+ uint32_t size = 4/*size field*/ + 4/*count field*/;
+ for(Variant::Map::const_iterator i = values.begin(); i != values.end(); ++i) {
+ size += 1/*size of key*/ + (i->first).size() + 1/*typecode*/ + encodedSize(i->second);
+ }
+ size += 1/*size of key*/ + efield.size() + 1/*typecode*/ + encodedSize(evalue);
+ return size;
+}
+
uint32_t encodedSize(const Variant::List& values)
{
uint32_t size = 4/*size field*/ + 4/*count field*/;
@@ -399,6 +409,21 @@ void encode(const Variant::Map& map, uint32_t len, qpid::framing::Buffer& buffer
assert(s + len == buffer.getPosition());
}
+void encode(const Variant::Map& map, const std::string& efield, const Variant& evalue, uint32_t len, qpid::framing::Buffer& buffer)
+{
+ uint32_t s = buffer.getPosition();
+ buffer.putLong(len - 4);//exclusive of the size field itself
+ buffer.putLong(map.size() + 1 /* The extra field */ );
+ for (Variant::Map::const_iterator i = map.begin(); i != map.end(); ++i) {
+ buffer.putShortString(i->first);
+ encode(i->second, buffer);
+ }
+ buffer.putShortString(efield);
+ encode(evalue, buffer);
+
+ assert(s + len == buffer.getPosition());
+}
+
void encode(const Variant::List& list, uint32_t len, qpid::framing::Buffer& buffer)
{
uint32_t s = buffer.getPosition();
@@ -475,8 +500,26 @@ void translate(const Variant::Map& from, FieldTable& to)
assert( len == buff.getPosition() );
buff.reset();
to.decode(buff);
+}
- //convert(from, to, &toFieldTableEntry);
+void translate(const Variant::Map& from, const std::string& efield, const Variant& evalue, FieldTable& to)
+{
+ // Create buffer of correct size to encode Variant::Map
+ uint32_t len = encodedSize(from, efield, evalue);
+ std::vector<char> space(len);
+ qpid::framing::Buffer buff(&space[0], len);
+
+ // Encode Variant::Map into buffer directly -
+ // We pass the already calculated length in to avoid
+ // recalculating it.
+ encode(from, efield, evalue, len, buff);
+
+ // Give buffer to FieldTable
+ // Could speed this up a bit by avoiding copying
+ // the buffer we just created into the FieldTable
+ assert( len == buff.getPosition() );
+ buff.reset();
+ to.decode(buff);
}
void translate(const FieldTable& from, Variant::Map& to)
diff --git a/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp b/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
index d93416da75..dd14d11c4c 100644
--- a/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
+++ b/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
@@ -58,7 +58,12 @@ void OutgoingMessage::convert(const qpid::messaging::Message& from)
if (address) {
message.getMessageProperties().setReplyTo(AddressResolution::convert(address));
}
- translate(from.getProperties(), message.getMessageProperties().getApplicationHeaders());
+ if (!subject.empty()) {
+ Variant v(subject); v.setEncoding("utf8");
+ translate(from.getProperties(), SUBJECT, v, message.getMessageProperties().getApplicationHeaders());
+ } else {
+ translate(from.getProperties(), message.getMessageProperties().getApplicationHeaders());
+ }
if (from.getTtl().getMilliseconds()) {
message.getDeliveryProperties().setTtl(from.getTtl().getMilliseconds());
}
@@ -89,16 +94,14 @@ void OutgoingMessage::convert(const qpid::messaging::Message& from)
}
}
-void OutgoingMessage::setSubject(const std::string& subject)
+void OutgoingMessage::setSubject(const std::string& s)
{
- if (!subject.empty()) {
- message.getMessageProperties().getApplicationHeaders().setString(SUBJECT, subject);
- }
+ subject = s;
}
std::string OutgoingMessage::getSubject() const
{
- return message.getMessageProperties().getApplicationHeaders().getAsString(SUBJECT);
+ return subject;
}
}}} // namespace qpid::client::amqp0_10
diff --git a/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h b/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h
index 0cdd2a2336..2191f45546 100644
--- a/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h
+++ b/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h
@@ -35,6 +35,7 @@ struct OutgoingMessage
{
qpid::client::Message message;
qpid::client::Completion status;
+ std::string subject;
void convert(const qpid::messaging::Message&);
void setSubject(const std::string& subject);
diff --git a/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp b/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
index f2f0f1a9e5..b275db38d7 100644
--- a/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
@@ -37,10 +37,10 @@ SenderImpl::SenderImpl(SessionImpl& _parent, const std::string& _name,
void SenderImpl::send(const qpid::messaging::Message& message, bool sync)
{
if (unreliable) { // immutable, don't need lock
- UnreliableSend f(*this, &message);
+ UnreliableSend f(*this, message);
parent->execute(f);
} else {
- Send f(*this, &message);
+ Send f(*this, message);
while (f.repeat) parent->execute(f);
}
if (sync) parent->sync(true);
@@ -117,8 +117,8 @@ void SenderImpl::sendImpl(const qpid::messaging::Message& m)
{
sys::Mutex::ScopedLock l(lock);
std::auto_ptr<OutgoingMessage> msg(new OutgoingMessage());
- msg->convert(m);
msg->setSubject(m.getSubject().empty() ? address.getSubject() : m.getSubject());
+ msg->convert(m);
outgoing.push_back(msg.release());
sink->send(session, name, outgoing.back());
}
@@ -127,8 +127,8 @@ void SenderImpl::sendUnreliable(const qpid::messaging::Message& m)
{
sys::Mutex::ScopedLock l(lock);
OutgoingMessage msg;
- msg.convert(m);
msg.setSubject(m.getSubject().empty() ? address.getSubject() : m.getSubject());
+ msg.convert(m);
sink->send(session, name, msg);
}
diff --git a/cpp/src/qpid/client/amqp0_10/SenderImpl.h b/cpp/src/qpid/client/amqp0_10/SenderImpl.h
index c10c77ae18..d75863c743 100644
--- a/cpp/src/qpid/client/amqp0_10/SenderImpl.h
+++ b/cpp/src/qpid/client/amqp0_10/SenderImpl.h
@@ -99,32 +99,32 @@ class SenderImpl : public qpid::messaging::SenderImpl
struct Send : Command
{
- const qpid::messaging::Message* message;
+ const qpid::messaging::Message& message;
bool repeat;
- Send(SenderImpl& i, const qpid::messaging::Message* m) : Command(i), message(m), repeat(true) {}
+ Send(SenderImpl& i, const qpid::messaging::Message& m) : Command(i), message(m), repeat(true) {}
void operator()()
{
impl.waitForCapacity();
//from this point message will be recorded if there is any
//failure (and replayed) so need not repeat the call
repeat = false;
- impl.sendImpl(*message);
+ impl.sendImpl(message);
}
};
struct UnreliableSend : Command
{
- const qpid::messaging::Message* message;
+ const qpid::messaging::Message& message;
- UnreliableSend(SenderImpl& i, const qpid::messaging::Message* m) : Command(i), message(m) {}
+ UnreliableSend(SenderImpl& i, const qpid::messaging::Message& m) : Command(i), message(m) {}
void operator()()
{
//TODO: ideally want to put messages on the outbound
//queue and pull them off in io thread, but the old
//0-10 client doesn't support that option so for now
//we simply don't queue unreliable messages
- impl.sendUnreliable(*message);
+ impl.sendUnreliable(message);
}
};