summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/client/amqp0_10/FailoverUpdates.cpp2
-rw-r--r--cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp7
-rw-r--r--cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp7
-rw-r--r--cpp/src/qpid/messaging/Message.cpp10
-rw-r--r--cpp/src/qpid/messaging/MessageImpl.cpp2
-rw-r--r--cpp/src/qpid/messaging/MessageImpl.h1
-rw-r--r--cpp/src/tests/ClientMessage.cpp4
-rw-r--r--cpp/src/tests/MessagingSessionTests.cpp4
-rw-r--r--cpp/src/tests/qpid_recv.cpp6
-rw-r--r--cpp/src/tests/qpid_send.cpp8
-rw-r--r--cpp/src/tests/qpid_stream.cpp4
11 files changed, 33 insertions, 22 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/FailoverUpdates.cpp b/cpp/src/qpid/client/amqp0_10/FailoverUpdates.cpp
index 8ef62e4d41..5e526a2ffc 100644
--- a/cpp/src/qpid/client/amqp0_10/FailoverUpdates.cpp
+++ b/cpp/src/qpid/client/amqp0_10/FailoverUpdates.cpp
@@ -56,7 +56,7 @@ struct FailoverUpdatesImpl : qpid::sys::Runnable
try {
Message message;
while (!quit && receiver.fetch(message)) {
- connection.setOption("urls", message.getHeaders()["amq.failover"]);
+ connection.setOption("urls", message.getProperties()["amq.failover"]);
session.acknowledge();
}
} catch (const qpid::TransportFailure& e) {
diff --git a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
index 3f5cccfedb..345ebfb66d 100644
--- a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
+++ b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
@@ -279,7 +279,8 @@ void populateHeaders(qpid::messaging::Message& message,
if (deliveryProperties) {
message.setTtl(qpid::messaging::Duration(deliveryProperties->getTtl()));
message.setDurable(deliveryProperties->getDeliveryMode() == DELIVERY_MODE_PERSISTENT);
- MessageImplAccess::get(message).redelivered = deliveryProperties->getRedelivered();
+ message.setPriority(deliveryProperties->getPriority());
+ message.setRedelivered(deliveryProperties->getRedelivered());
}
if (messageProperties) {
message.setContentType(messageProperties->getContentType());
@@ -287,8 +288,8 @@ void populateHeaders(qpid::messaging::Message& message,
message.setReplyTo(AddressResolution::convert(messageProperties->getReplyTo()));
}
message.setSubject(messageProperties->getApplicationHeaders().getAsString(SUBJECT));
- message.getHeaders().clear();
- translate(messageProperties->getApplicationHeaders(), message.getHeaders());
+ message.getProperties().clear();
+ translate(messageProperties->getApplicationHeaders(), message.getProperties());
message.setCorrelationId(messageProperties->getCorrelationId());
message.setUserId(messageProperties->getUserId());
}
diff --git a/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp b/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
index d0d945b934..b19b26f903 100644
--- a/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
+++ b/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
@@ -46,12 +46,15 @@ void OutgoingMessage::convert(const qpid::messaging::Message& from)
if (address) {
message.getMessageProperties().setReplyTo(AddressResolution::convert(address));
}
- translate(from.getHeaders(), message.getMessageProperties().getApplicationHeaders());
+ translate(from.getProperties(), message.getMessageProperties().getApplicationHeaders());
message.getDeliveryProperties().setTtl(from.getTtl().getMilliseconds());
if (from.getDurable()) {
message.getDeliveryProperties().setDeliveryMode(DELIVERY_MODE_PERSISTENT);
}
-
+ if (from.getRedelivered()) {
+ message.getDeliveryProperties().setRedelivered(true);
+ }
+ if (from.getPriority()) message.getDeliveryProperties().setPriority(from.getPriority());
}
namespace {
diff --git a/cpp/src/qpid/messaging/Message.cpp b/cpp/src/qpid/messaging/Message.cpp
index 822659f6ef..84245b7296 100644
--- a/cpp/src/qpid/messaging/Message.cpp
+++ b/cpp/src/qpid/messaging/Message.cpp
@@ -52,16 +52,20 @@ const std::string& Message::getUserId() const { return impl->userId; }
void Message::setCorrelationId(const std::string& id) { impl->correlationId = id; }
const std::string& Message::getCorrelationId() const { return impl->correlationId; }
+uint8_t Message::getPriority() const { return impl->priority; }
+void Message::setPriority(uint8_t priority) { impl->priority = priority; }
+
void Message::setTtl(Duration ttl) { impl->ttl = ttl.getMilliseconds(); }
Duration Message::getTtl() const { return Duration(impl->ttl); }
void Message::setDurable(bool durable) { impl->durable = durable; }
bool Message::getDurable() const { return impl->durable; }
-bool Message::isRedelivered() const { return impl->redelivered; }
+bool Message::getRedelivered() const { return impl->redelivered; }
+void Message::setRedelivered(bool redelivered) { impl->redelivered = redelivered; }
-const VariantMap& Message::getHeaders() const { return impl->getHeaders(); }
-VariantMap& Message::getHeaders() { return impl->getHeaders(); }
+const VariantMap& Message::getProperties() const { return impl->getHeaders(); }
+VariantMap& Message::getProperties() { return impl->getHeaders(); }
void Message::setContent(const std::string& c) { impl->setBytes(c); }
void Message::setContent(const char* chars, size_t count) { impl->setBytes(chars, count); }
diff --git a/cpp/src/qpid/messaging/MessageImpl.cpp b/cpp/src/qpid/messaging/MessageImpl.cpp
index 5f8ac20551..dea6681244 100644
--- a/cpp/src/qpid/messaging/MessageImpl.cpp
+++ b/cpp/src/qpid/messaging/MessageImpl.cpp
@@ -31,12 +31,14 @@ const std::string EMPTY_STRING = "";
using namespace qpid::types;
MessageImpl::MessageImpl(const std::string& c) :
+ priority(0),
ttl(0),
durable(false),
redelivered(false),
bytes(c),
internalId(0) {}
MessageImpl::MessageImpl(const char* chars, size_t count) :
+ priority(0),
ttl(0),
durable (false),
redelivered(false),
diff --git a/cpp/src/qpid/messaging/MessageImpl.h b/cpp/src/qpid/messaging/MessageImpl.h
index 3898ff9f98..7793fc7943 100644
--- a/cpp/src/qpid/messaging/MessageImpl.h
+++ b/cpp/src/qpid/messaging/MessageImpl.h
@@ -36,6 +36,7 @@ struct MessageImpl
std::string messageId;
std::string userId;
std::string correlationId;
+ uint8_t priority;
uint64_t ttl;
bool durable;
bool redelivered;
diff --git a/cpp/src/tests/ClientMessage.cpp b/cpp/src/tests/ClientMessage.cpp
index ab1cf9d102..994c46552c 100644
--- a/cpp/src/tests/ClientMessage.cpp
+++ b/cpp/src/tests/ClientMessage.cpp
@@ -34,11 +34,11 @@ QPID_AUTO_TEST_CASE(testCopyConstructor)
{
Message m("my-data");
m.setSubject("my-subject");
- m.getHeaders()["a"] = "ABC";
+ m.getProperties()["a"] = "ABC";
Message c(m);
BOOST_CHECK_EQUAL(m.getContent(), c.getContent());
BOOST_CHECK_EQUAL(m.getSubject(), c.getSubject());
- BOOST_CHECK_EQUAL(m.getHeaders()["a"], c.getHeaders()["a"]);
+ BOOST_CHECK_EQUAL(m.getProperties()["a"], c.getProperties()["a"]);
}
QPID_AUTO_TEST_SUITE_END()
diff --git a/cpp/src/tests/MessagingSessionTests.cpp b/cpp/src/tests/MessagingSessionTests.cpp
index c4659493ce..d1f7441216 100644
--- a/cpp/src/tests/MessagingSessionTests.cpp
+++ b/cpp/src/tests/MessagingSessionTests.cpp
@@ -241,7 +241,7 @@ QPID_AUTO_TEST_CASE(testSendReceiveHeaders)
Sender sender = fix.session.createSender(fix.queue);
Message out("test-message");
for (uint i = 0; i < 10; ++i) {
- out.getHeaders()["a"] = i;
+ out.getProperties()["a"] = i;
sender.send(out);
}
Receiver receiver = fix.session.createReceiver(fix.queue);
@@ -249,7 +249,7 @@ QPID_AUTO_TEST_CASE(testSendReceiveHeaders)
for (uint i = 0; i < 10; ++i) {
BOOST_CHECK(receiver.fetch(in, Duration::SECOND * 5));
BOOST_CHECK_EQUAL(in.getContent(), out.getContent());
- BOOST_CHECK_EQUAL(in.getHeaders()["a"].asUint32(), i);
+ BOOST_CHECK_EQUAL(in.getProperties()["a"].asUint32(), i);
fix.session.acknowledge();
}
}
diff --git a/cpp/src/tests/qpid_recv.cpp b/cpp/src/tests/qpid_recv.cpp
index 994ef00960..6e384e9672 100644
--- a/cpp/src/tests/qpid_recv.cpp
+++ b/cpp/src/tests/qpid_recv.cpp
@@ -131,7 +131,7 @@ class SequenceTracker
bool isDuplicate(Message& message)
{
- uint sn = message.getHeaders()["sn"];
+ uint sn = message.getProperties()["sn"];
if (lastSn < sn) {
lastSn = sn;
return false;
@@ -175,8 +175,8 @@ int main(int argc, char ** argv)
if (msg.getUserId().size()) std::cout << "UserId: " << msg.getUserId() << std::endl;
if (msg.getTtl().getMilliseconds()) std::cout << "TTL: " << msg.getTtl().getMilliseconds() << std::endl;
if (msg.getDurable()) std::cout << "Durable: true" << std::endl;
- if (msg.isRedelivered()) std::cout << "Redelivered: true" << std::endl;
- std::cout << "Headers: " << msg.getHeaders() << std::endl;
+ if (msg.getRedelivered()) std::cout << "Redelivered: true" << std::endl;
+ std::cout << "Properties: " << msg.getProperties() << std::endl;
std::cout << std::endl;
}
std::cout << msg.getContent() << std::endl;//TODO: handle map or list messages
diff --git a/cpp/src/tests/qpid_send.cpp b/cpp/src/tests/qpid_send.cpp
index a75d21e0a5..564007958e 100644
--- a/cpp/src/tests/qpid_send.cpp
+++ b/cpp/src/tests/qpid_send.cpp
@@ -145,9 +145,9 @@ struct Options : public qpid::Options
std::string name;
std::string value;
if (nameval(property, name, value)) {
- message.getHeaders()[name] = value;
+ message.getProperties()[name] = value;
} else {
- message.getHeaders()[name] = Variant();
+ message.getProperties()[name] = Variant();
}
}
@@ -203,7 +203,7 @@ int main(int argc, char ** argv)
uint txCount = 0;
while (getline(std::cin, content)) {
msg.setContent(content);
- msg.getHeaders()["sn"] = ++sent;
+ msg.getProperties()["sn"] = ++sent;
sender.send(msg);
if (opts.tx && (sent % opts.tx == 0)) {
if (opts.rollbackFrequency && (++txCount % opts.rollbackFrequency == 0)) {
@@ -214,7 +214,7 @@ int main(int argc, char ** argv)
}
}
for (uint i = opts.sendEos; i > 0; --i) {
- msg.getHeaders()["sn"] = ++sent;
+ msg.getProperties()["sn"] = ++sent;
msg.setContent(EOS);//TODO: add in ability to send digest or similar
sender.send(msg);
}
diff --git a/cpp/src/tests/qpid_stream.cpp b/cpp/src/tests/qpid_stream.cpp
index e451198ef4..642b7a821f 100644
--- a/cpp/src/tests/qpid_stream.cpp
+++ b/cpp/src/tests/qpid_stream.cpp
@@ -119,7 +119,7 @@ struct Publish : Client
qpid::sys::AbsTime start = qpid::sys::now();
while (true) {
qpid::sys::AbsTime sentAt = qpid::sys::now();
- msg.getHeaders()[TIMESTAMP] = timestamp(sentAt);
+ msg.getProperties()[TIMESTAMP] = timestamp(sentAt);
sender.send(msg);
++sent;
qpid::sys::AbsTime waitTill(start, sent*interval);
@@ -151,7 +151,7 @@ struct Consume : Client
}
//calculate latency
uint64_t receivedAt = timestamp(qpid::sys::now());
- uint64_t sentAt = msg.getHeaders()[TIMESTAMP].asUint64();
+ uint64_t sentAt = msg.getProperties()[TIMESTAMP].asUint64();
double latency = ((double) (receivedAt - sentAt)) / qpid::sys::TIME_MSEC;
//update avg, min & max