summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2009-11-19 11:46:00 +0000
committerGordon Sim <gsim@apache.org>2009-11-19 11:46:00 +0000
commit1ea31fb5ea5e9240776600d81ae4742aefa51320 (patch)
tree41416cfa50902b240ef8a7175ee15718a9dea91e
parent82faa3a8fe5406506855e99373734e84f3f0d581 (diff)
downloadqpid-python-1ea31fb5ea5e9240776600d81ae4742aefa51320.tar.gz
QPID-664: Add spout & drain examples as per python client
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@882118 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/examples/messaging/CMakeLists.txt3
-rw-r--r--cpp/examples/messaging/Makefile.am8
-rw-r--r--cpp/examples/messaging/drain.cpp125
-rw-r--r--cpp/examples/messaging/spout.cpp190
-rw-r--r--cpp/include/qpid/messaging/Message.h4
-rw-r--r--cpp/src/qpid/messaging/Address.cpp2
-rw-r--r--cpp/src/qpid/messaging/MapContent.cpp1
-rw-r--r--cpp/src/qpid/messaging/Variant.cpp8
8 files changed, 335 insertions, 6 deletions
diff --git a/cpp/examples/messaging/CMakeLists.txt b/cpp/examples/messaging/CMakeLists.txt
index 31310d4ae2..2274474d31 100644
--- a/cpp/examples/messaging/CMakeLists.txt
+++ b/cpp/examples/messaging/CMakeLists.txt
@@ -17,6 +17,9 @@
# under the License.
#
+add_example(messaging drain)
+add_example(messaging spout)
+
add_example(messaging queue_receiver)
add_example(messaging queue_sender)
diff --git a/cpp/examples/messaging/Makefile.am b/cpp/examples/messaging/Makefile.am
index 70a7fd59a6..b9f865625d 100644
--- a/cpp/examples/messaging/Makefile.am
+++ b/cpp/examples/messaging/Makefile.am
@@ -21,7 +21,13 @@ examplesdir=$(pkgdatadir)/examples/messaging
MAKELDFLAGS=$(CLIENTFLAGS)
include $(top_srcdir)/examples/makedist.mk
-noinst_PROGRAMS=queue_sender queue_receiver topic_sender topic_receiver client server map_sender map_receiver
+noinst_PROGRAMS=drain spout queue_sender queue_receiver topic_sender topic_receiver client server map_sender map_receiver
+
+drain_SOURCES=drain.cpp
+drain_LDADD=$(CLIENT_LIB)
+
+spout_SOURCES=spout.cpp
+spout_LDADD=$(CLIENT_LIB)
queue_sender_SOURCES=queue_sender.cpp
queue_sender_LDADD=$(CLIENT_LIB)
diff --git a/cpp/examples/messaging/drain.cpp b/cpp/examples/messaging/drain.cpp
new file mode 100644
index 0000000000..2bd9fcdac8
--- /dev/null
+++ b/cpp/examples/messaging/drain.cpp
@@ -0,0 +1,125 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/messaging/Connection.h>
+#include <qpid/messaging/MapView.h>
+#include <qpid/messaging/Message.h>
+#include <qpid/messaging/Receiver.h>
+#include <qpid/messaging/Session.h>
+#include <qpid/Exception.h>
+#include <qpid/Options.h>
+#include <qpid/log/Logger.h>
+#include <qpid/log/Options.h>
+#include <qpid/sys/Time.h>
+
+#include <iostream>
+
+using namespace qpid::messaging;
+using qpid::sys::Duration;
+using qpid::sys::TIME_INFINITE;
+using qpid::sys::TIME_SEC;
+
+struct Options : public qpid::Options
+{
+ bool help;
+ std::string url;
+ std::string address;
+ int64_t timeout;
+ bool forever;
+ qpid::log::Options log;
+
+ Options(const std::string& argv0=std::string())
+ : qpid::Options("Options"),
+ help(false),
+ url("amqp:tcp:127.0.0.1"),
+ timeout(0),
+ forever(false),
+ log(argv0)
+ {
+ addOptions()
+ ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to")
+ ("address,a", qpid::optValue(address, "ADDRESS"), "address to drain from")
+ ("timeout,t", qpid::optValue(timeout, "TIMEOUT"), "timeout in seconds to wait before exiting")
+ ("forever,f", qpid::optValue(forever), "ignore timeout and wait forever")
+ ("help", qpid::optValue(help), "print this usage statement");
+ add(log);
+ }
+
+ Duration getTimeout()
+ {
+ if (forever) return TIME_INFINITE;
+ else return timeout*TIME_SEC;
+
+ }
+ bool parse(int argc, char** argv)
+ {
+ try {
+ qpid::Options::parse(argc, argv);
+ if (address.empty()) throw qpid::Exception("Address must be specified!");
+ qpid::log::Logger::instance().configure(log);
+ if (help) {
+ std::ostringstream msg;
+ std::cout << msg << *this << std::endl << std::endl
+ << "Drains messages from the specified address" << std::endl;
+ return false;
+ } else {
+ return true;
+ }
+ } catch (const std::exception& e) {
+ std::cerr << *this << std::endl << std::endl << e.what() << std::endl;
+ return false;
+ }
+ }
+};
+
+
+int main(int argc, char** argv)
+{
+ Options options(argv[0]);
+ if (options.parse(argc, argv)) {
+ try {
+ Connection connection = Connection::open(options.url);
+ Session session = connection.newSession();
+ Receiver receiver = session.createReceiver(options.address);
+ Duration timeout = options.getTimeout();
+ Message message;
+ while (receiver.fetch(message, timeout)) {
+ std::cout << "Message(properties=" << message.getHeaders() << ", content='" ;
+ if (message.getContentType() == "amqp/map") {
+ std::cout << MapView(message);
+ } else {
+ std::cout << message.getContent();
+ }
+ std::cout << "')" << std::endl;
+ session.acknowledge();
+ }
+ receiver.cancel();
+ session.close();
+ connection.close();
+ return 0;
+ } catch(const std::exception& error) {
+ std::cout << error.what() << std::endl;
+ }
+ }
+ return 1;
+}
+
+
diff --git a/cpp/examples/messaging/spout.cpp b/cpp/examples/messaging/spout.cpp
new file mode 100644
index 0000000000..18abe8863a
--- /dev/null
+++ b/cpp/examples/messaging/spout.cpp
@@ -0,0 +1,190 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/messaging/Address.h>
+#include <qpid/messaging/Connection.h>
+#include <qpid/messaging/MapContent.h>
+#include <qpid/messaging/Message.h>
+#include <qpid/messaging/Sender.h>
+#include <qpid/messaging/Session.h>
+#include <qpid/messaging/Variant.h>
+#include <qpid/framing/Uuid.h>
+#include <qpid/Exception.h>
+#include <qpid/Options.h>
+#include <qpid/log/Logger.h>
+#include <qpid/log/Options.h>
+#include <qpid/sys/Time.h>
+
+#include <iostream>
+#include <vector>
+
+#include <boost/format.hpp>
+
+using namespace qpid::messaging;
+using qpid::framing::Uuid;
+using qpid::sys::AbsTime;
+using qpid::sys::now;
+using qpid::sys::TIME_INFINITE;
+
+typedef std::vector<std::string> string_vector;
+
+struct Options : public qpid::Options
+{
+ bool help;
+ std::string url;
+ std::string address;
+ int64_t timeout;
+ uint count;
+ std::string id;
+ std::string replyto;
+ string_vector properties;
+ string_vector entries;
+ std::string content;
+ qpid::log::Options log;
+
+ Options(const std::string& argv0=std::string())
+ : qpid::Options("Options"),
+ help(false),
+ url("amqp:tcp:127.0.0.1"),
+ timeout(TIME_INFINITE),
+ count(1),
+ log(argv0)
+ {
+ addOptions()
+ ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to")
+ ("address,a", qpid::optValue(address, "ADDRESS"), "address to drain from")
+ ("timeout,t", qpid::optValue(timeout, "TIMEOUT"), "exit after the specified time")
+ ("count,c", qpid::optValue(count, "COUNT"), "stop after count messages have been sent, zero disables")
+ ("id,i", qpid::optValue(id, "ID"), "use the supplied id instead of generating one")
+ ("reply-to", qpid::optValue(replyto, "REPLY-TO"), "specify reply-to address")
+ ("property,P", qpid::optValue(properties, "NAME=VALUE"), "specify message property")
+ ("entry,E", qpid::optValue(entries, "NAME=VALUE"), "specify entry for map content")
+ ("content", qpid::optValue(content, "CONTENT"), "specify textual content")
+ ("help", qpid::optValue(help), "print this usage statement");
+ add(log);
+ }
+
+ bool parse(int argc, char** argv)
+ {
+ try {
+ qpid::Options::parse(argc, argv);
+ if (address.empty()) throw qpid::Exception("Address must be specified!");
+ qpid::log::Logger::instance().configure(log);
+ if (help) {
+ std::ostringstream msg;
+ std::cout << msg << *this << std::endl << std::endl
+ << "Drains messages from the specified address" << std::endl;
+ return false;
+ } else {
+ return true;
+ }
+ } catch (const std::exception& e) {
+ std::cerr << *this << std::endl << std::endl << e.what() << std::endl;
+ return false;
+ }
+ }
+
+ static bool nameval(const std::string& in, std::string& name, std::string& value)
+ {
+ std::string::size_type i = in.find("=");
+ if (i == std::string::npos) {
+ name = in;
+ return false;
+ } else {
+ name = in.substr(0, i);
+ if (i+1 < in.size()) {
+ value = in.substr(i+1);
+ return true;
+ } else {
+ return false;
+ }
+ }
+ }
+
+ static void setProperty(Message& message, const std::string& property)
+ {
+ std::string name;
+ std::string value;
+ if (nameval(property, name, value)) {
+ message.getHeaders()[name] = value;
+ } else {
+ message.getHeaders()[name] = Variant();
+ }
+ }
+
+ void setProperties(Message& message) const
+ {
+ for (string_vector::const_iterator i = properties.begin(); i != properties.end(); ++i) {
+ setProperty(message, *i);
+ }
+ }
+
+ void setEntries(MapContent& content) const
+ {
+ for (string_vector::const_iterator i = entries.begin(); i != entries.end(); ++i) {
+ std::string name;
+ std::string value;
+ if (nameval(*i, name, value)) {
+ content[name] = value;
+ } else {
+ content[name] = Variant();
+ }
+ }
+ }
+};
+
+
+int main(int argc, char** argv)
+{
+ Options options(argv[0]);
+ if (options.parse(argc, argv)) {
+ try {
+ Connection connection = Connection::open(options.url);
+ Session session = connection.newSession();
+ Sender sender = session.createSender(options.address);
+
+ Message message;
+ options.setProperties(message);
+ if (options.entries.size()) {
+ MapContent content(message);
+ options.setEntries(content);
+ content.encode();
+ } else if (options.content.size()) {
+ message.setContent(options.content);
+ message.setContentType("text/plain; charset=utf8");
+ }
+ AbsTime end(now(), options.timeout);
+ for (uint count = 0; (count < options.count || options.count == 0) && end > now(); count++) {
+ if (!options.replyto.empty()) message.setReplyTo(Address(options.replyto));
+ std::string id = options.id.empty() ? Uuid(true).str() : options.id;
+ message.getHeaders()["spout-id"] = (boost::format("%1%:%2%") % id % count).str();
+ sender.send(message);
+ }
+ connection.close();
+ return 0;
+ } catch(const std::exception& error) {
+ std::cout << error.what() << std::endl;
+ }
+ }
+ return 1;
+}
+
+
diff --git a/cpp/include/qpid/messaging/Message.h b/cpp/include/qpid/messaging/Message.h
index 1acccecad0..368fc89772 100644
--- a/cpp/include/qpid/messaging/Message.h
+++ b/cpp/include/qpid/messaging/Message.h
@@ -58,8 +58,8 @@ class Message
QPID_CLIENT_EXTERN void setContentType(const std::string&);
QPID_CLIENT_EXTERN const std::string& getContentType() const;
- QPID_CLIENT_EXTERN const VariantMap& getHeaders() const;
- QPID_CLIENT_EXTERN VariantMap& getHeaders();
+ QPID_CLIENT_EXTERN const Variant::Map& getHeaders() const;
+ QPID_CLIENT_EXTERN Variant::Map& getHeaders();
QPID_CLIENT_EXTERN const std::string& getContent() const;
QPID_CLIENT_EXTERN std::string& getContent();
diff --git a/cpp/src/qpid/messaging/Address.cpp b/cpp/src/qpid/messaging/Address.cpp
index fff7c453b8..5262cec0db 100644
--- a/cpp/src/qpid/messaging/Address.cpp
+++ b/cpp/src/qpid/messaging/Address.cpp
@@ -95,7 +95,7 @@ std::string Address::toStr() const
std::stringstream out;
out << impl->name;
if (!impl->subject.empty()) out << SUBJECT_DIVIDER << impl->subject;
- if (!impl->options.empty()) out << OPTIONS_DIVIDER << " {" << impl->options << "}";
+ if (!impl->options.empty()) out << OPTIONS_DIVIDER << impl->options;
return out.str();
}
Address::operator bool() const { return !impl->name.empty(); }
diff --git a/cpp/src/qpid/messaging/MapContent.cpp b/cpp/src/qpid/messaging/MapContent.cpp
index c653561fc9..6dba22be99 100644
--- a/cpp/src/qpid/messaging/MapContent.cpp
+++ b/cpp/src/qpid/messaging/MapContent.cpp
@@ -41,6 +41,7 @@ class MapContentImpl : public Variant
{
qpid::client::amqp0_10::MapCodec codec;
codec.encode(*this, msg->getContent());
+ msg->setContentType(qpid::client::amqp0_10::MapCodec::contentType);
}
};
diff --git a/cpp/src/qpid/messaging/Variant.cpp b/cpp/src/qpid/messaging/Variant.cpp
index 3b0c3312ca..71f9fbe646 100644
--- a/cpp/src/qpid/messaging/Variant.cpp
+++ b/cpp/src/qpid/messaging/Variant.cpp
@@ -566,19 +566,23 @@ Variant::operator const char*() const { return asString().c_str(); }
std::ostream& operator<<(std::ostream& out, const Variant::Map& map)
{
+ out << "{";
for (Variant::Map::const_iterator i = map.begin(); i != map.end(); ++i) {
if (i != map.begin()) out << ", ";
out << i->first << ":" << i->second;
}
+ out << "}";
return out;
}
std::ostream& operator<<(std::ostream& out, const Variant::List& list)
{
+ out << "[";
for (Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) {
if (i != list.begin()) out << ", ";
out << *i;
}
+ out << "]";
return out;
}
@@ -586,10 +590,10 @@ std::ostream& operator<<(std::ostream& out, const Variant& value)
{
switch (value.getType()) {
case VAR_MAP:
- out << "{" << value.asMap() << "}";
+ out << value.asMap();
break;
case VAR_LIST:
- out << "[" << value.asList() << "]";
+ out << value.asList();
break;
case VAR_VOID:
out << "<void>";