summaryrefslogtreecommitdiff
path: root/qpid/cpp/examples
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/examples')
-rw-r--r--qpid/cpp/examples/CMakeLists.txt1
-rw-r--r--qpid/cpp/examples/Makefile.am2
-rw-r--r--qpid/cpp/examples/messaging/CMakeLists.txt32
-rw-r--r--qpid/cpp/examples/messaging/Makefile.am54
-rw-r--r--qpid/cpp/examples/messaging/client.cpp76
-rw-r--r--qpid/cpp/examples/messaging/map_receiver.cpp54
-rw-r--r--qpid/cpp/examples/messaging/map_sender.cpp66
-rw-r--r--qpid/cpp/examples/messaging/queue_listener.cpp82
-rw-r--r--qpid/cpp/examples/messaging/queue_receiver.cpp65
-rw-r--r--qpid/cpp/examples/messaging/queue_sender.cpp65
-rw-r--r--qpid/cpp/examples/messaging/server.cpp76
-rw-r--r--qpid/cpp/examples/messaging/topic_listener.cpp82
-rw-r--r--qpid/cpp/examples/messaging/topic_receiver.cpp66
-rw-r--r--qpid/cpp/examples/messaging/topic_sender.cpp78
14 files changed, 798 insertions, 1 deletions
diff --git a/qpid/cpp/examples/CMakeLists.txt b/qpid/cpp/examples/CMakeLists.txt
index 7d0a4d0d21..126adab32e 100644
--- a/qpid/cpp/examples/CMakeLists.txt
+++ b/qpid/cpp/examples/CMakeLists.txt
@@ -62,3 +62,4 @@ add_subdirectory(qmf-console)
add_subdirectory(request-response)
add_subdirectory(tradedemo)
add_subdirectory(xml-exchange)
+add_subdirectory(messaging)
diff --git a/qpid/cpp/examples/Makefile.am b/qpid/cpp/examples/Makefile.am
index 9c355b84e4..b526424e2a 100644
--- a/qpid/cpp/examples/Makefile.am
+++ b/qpid/cpp/examples/Makefile.am
@@ -16,7 +16,7 @@
# specific language governing permissions and limitations
# under the License.
#
-SUBDIRS = direct fanout pub-sub request-response failover qmf-console tradedemo
+SUBDIRS = direct fanout pub-sub request-response failover qmf-console tradedemo messaging
if HAVE_XML
SUBDIRS += xml-exchange
broker_args = "--no-module-dir --data-dir \"\" --auth no --load-module $(top_builddir)/src/.libs/xml.so"
diff --git a/qpid/cpp/examples/messaging/CMakeLists.txt b/qpid/cpp/examples/messaging/CMakeLists.txt
new file mode 100644
index 0000000000..e7885d0b50
--- /dev/null
+++ b/qpid/cpp/examples/messaging/CMakeLists.txt
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+add_example(messaging queue_listener)
+add_example(messaging queue_receiver)
+add_example(messaging queue_sender)
+
+add_example(messaging topic_listener)
+add_example(messaging topic_receiver)
+add_example(messaging topic_sender)
+
+add_example(messaging map_receiver)
+add_example(messaging map_sender)
+
+add_example(messaging client)
+add_example(messaging server)
diff --git a/qpid/cpp/examples/messaging/Makefile.am b/qpid/cpp/examples/messaging/Makefile.am
new file mode 100644
index 0000000000..250e549e82
--- /dev/null
+++ b/qpid/cpp/examples/messaging/Makefile.am
@@ -0,0 +1,54 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+examplesdir=$(pkgdatadir)/examples/messaging
+
+MAKELDFLAGS=$(CLIENTFLAGS)
+include $(top_srcdir)/examples/makedist.mk
+
+noinst_PROGRAMS=queue_sender queue_listener queue_receiver topic_sender topic_listener topic_receiver client server map_sender map_receiver
+
+queue_sender_SOURCES=queue_sender.cpp
+queue_sender_LDADD=$(CLIENT_LIB)
+
+queue_listener_SOURCES=queue_listener.cpp
+queue_listener_LDADD=$(CLIENT_LIB)
+
+queue_receiver_SOURCES=queue_receiver.cpp
+queue_receiver_LDADD=$(CLIENT_LIB)
+
+topic_sender_SOURCES=topic_sender.cpp
+topic_sender_LDADD=$(CLIENT_LIB)
+
+topic_listener_SOURCES=topic_listener.cpp
+topic_listener_LDADD=$(CLIENT_LIB)
+
+topic_receiver_SOURCES=topic_receiver.cpp
+topic_receiver_LDADD=$(CLIENT_LIB)
+
+client_SOURCES=client.cpp
+client_LDADD=$(CLIENT_LIB)
+
+server_SOURCES=server.cpp
+server_LDADD=$(CLIENT_LIB)
+
+map_sender_SOURCES=map_sender.cpp
+map_sender_LDADD=$(CLIENT_LIB)
+
+map_receiver_SOURCES=map_receiver.cpp
+map_receiver_LDADD=$(CLIENT_LIB)
diff --git a/qpid/cpp/examples/messaging/client.cpp b/qpid/cpp/examples/messaging/client.cpp
new file mode 100644
index 0000000000..45c065880b
--- /dev/null
+++ b/qpid/cpp/examples/messaging/client.cpp
@@ -0,0 +1,76 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/messaging/Address.h>
+#include <qpid/messaging/Connection.h>
+#include <qpid/messaging/Message.h>
+#include <qpid/messaging/Receiver.h>
+#include <qpid/messaging/Sender.h>
+#include <qpid/messaging/Session.h>
+
+#include <cstdlib>
+#include <iostream>
+
+#include <sstream>
+
+using namespace qpid::messaging;
+
+using std::stringstream;
+using std::string;
+
+int main(int argc, char** argv) {
+ const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672";
+
+ try {
+ Connection connection = Connection::open(url);
+ Session session = connection.newSession();
+
+ Sender sender = session.createSender("service_queue");
+
+ //create temp queue & receiver...
+ Address responseQueue = session.createTempQueue();
+ Receiver receiver = session.createReceiver(responseQueue);
+
+ // Now send some messages ...
+ string s[] = {
+ "Twas brillig, and the slithy toves",
+ "Did gire and gymble in the wabe.",
+ "All mimsy were the borogroves,",
+ "And the mome raths outgrabe."
+ };
+
+ Message request;
+ request.setReplyTo(responseQueue);
+ for (int i=0; i<4; i++) {
+ request.setContent(s[i]);
+ sender.send(request);
+ Message response = receiver.fetch();
+ std::cout << request.getContent().asString() << " -> " << response.getContent().asString() << std::endl;
+ }
+ connection.close();
+ return 0;
+ } catch(const std::exception& error) {
+ std::cout << error.what() << std::endl;
+ }
+ return 1;
+}
+
+
diff --git a/qpid/cpp/examples/messaging/map_receiver.cpp b/qpid/cpp/examples/messaging/map_receiver.cpp
new file mode 100644
index 0000000000..e6557b1560
--- /dev/null
+++ b/qpid/cpp/examples/messaging/map_receiver.cpp
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/messaging/Connection.h>
+#include <qpid/messaging/Message.h>
+#include <qpid/messaging/Receiver.h>
+#include <qpid/messaging/Session.h>
+
+#include <cstdlib>
+#include <iostream>
+
+#include <sstream>
+
+using namespace qpid::messaging;
+
+using std::stringstream;
+using std::string;
+
+int main(int argc, char** argv) {
+ const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672";
+
+ try {
+ Connection connection = Connection::open(url);
+ Session session = connection.newSession();
+ Receiver receiver = session.createReceiver("message_queue");
+ Message message = receiver.fetch();
+ std::cout << message.getContent().asMap() << std::endl;
+ session.acknowledge();
+ receiver.cancel();
+ connection.close();
+ return 0;
+ } catch(const std::exception& error) {
+ std::cout << error.what() << std::endl;
+ }
+ return 1;
+}
diff --git a/qpid/cpp/examples/messaging/map_sender.cpp b/qpid/cpp/examples/messaging/map_sender.cpp
new file mode 100644
index 0000000000..9301c1fe1f
--- /dev/null
+++ b/qpid/cpp/examples/messaging/map_sender.cpp
@@ -0,0 +1,66 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/messaging/Connection.h>
+#include <qpid/messaging/Message.h>
+#include <qpid/messaging/Sender.h>
+#include <qpid/messaging/Session.h>
+
+#include <cstdlib>
+#include <iostream>
+
+#include <sstream>
+
+using namespace qpid::messaging;
+
+using std::stringstream;
+using std::string;
+
+int main(int argc, char** argv) {
+ const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672";
+
+ try {
+ Connection connection = Connection::open(url);
+ Session session = connection.newSession();
+ Sender sender = session.createSender("message_queue");
+
+ Message message;
+ message.getContent()["id"] = 987654321;
+ message.getContent()["name"] = "Widget";
+ message.getContent()["price"] = 0.99;//bad use of floating point number, just an example!
+ Variant::List colours;
+ colours.push_back(Variant("red"));
+ colours.push_back(Variant("green"));
+ colours.push_back(Variant("white"));
+ message.getContent()["colours"] = colours;
+
+ sender.send(message);
+ session.sync();
+
+ connection.close();
+ return 0;
+ } catch(const std::exception& error) {
+ std::cout << error.what() << std::endl;
+ }
+ return 1;
+}
+
+
diff --git a/qpid/cpp/examples/messaging/queue_listener.cpp b/qpid/cpp/examples/messaging/queue_listener.cpp
new file mode 100644
index 0000000000..099e8e145a
--- /dev/null
+++ b/qpid/cpp/examples/messaging/queue_listener.cpp
@@ -0,0 +1,82 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/messaging/Connection.h>
+#include <qpid/messaging/Session.h>
+#include <qpid/messaging/Message.h>
+#include <qpid/messaging/MessageListener.h>
+#include <qpid/messaging/Receiver.h>
+
+#include <cstdlib>
+#include <iostream>
+
+using namespace qpid::messaging;
+
+class Listener : public MessageListener
+{
+ public:
+ Listener(const Receiver& receiver);
+ void received(Message& message);
+ bool isFinished();
+ private:
+ Receiver receiver;
+ bool finished;
+};
+
+Listener::Listener(const Receiver& r) : receiver(r), finished(false) {}
+
+bool Listener::isFinished() { return finished; }
+
+void Listener::received(Message& message)
+{
+ std::cout << "Message: " << message.getContent().asString() << std::endl;
+ if (message.getContent().asString() == "That's all, folks!") {
+ std::cout << "Shutting down listener" << std::endl;
+ receiver.cancel();
+ finished = true;
+ }
+}
+
+int main(int argc, char** argv) {
+ const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672";
+
+ try {
+ Connection connection = Connection::open(url);
+ Session session = connection.newSession();
+
+ Receiver receiver = session.createReceiver("message_queue");
+ Listener listener(receiver);
+ receiver.setListener(&listener);
+ receiver.setCapacity(1);
+ receiver.start();
+ while (session.dispatch()) {
+ session.acknowledge();
+ if (listener.isFinished()) break;
+ }
+ connection.close();
+ return 0;
+ } catch(const std::exception& error) {
+ std::cout << error.what() << std::endl;
+ }
+ return 1;
+}
+
+
diff --git a/qpid/cpp/examples/messaging/queue_receiver.cpp b/qpid/cpp/examples/messaging/queue_receiver.cpp
new file mode 100644
index 0000000000..83a44b2ca9
--- /dev/null
+++ b/qpid/cpp/examples/messaging/queue_receiver.cpp
@@ -0,0 +1,65 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/messaging/Connection.h>
+#include <qpid/messaging/Message.h>
+#include <qpid/messaging/Receiver.h>
+#include <qpid/messaging/Session.h>
+
+#include <cstdlib>
+#include <iostream>
+
+#include <sstream>
+
+using namespace qpid::messaging;
+
+using std::stringstream;
+using std::string;
+
+int main(int argc, char** argv) {
+ const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672";
+
+ try {
+ Variant::Map options;
+ if (argc>2) parseOptionString(argv[2], options);
+ Connection connection = Connection::open(url, options);
+ Session session = connection.newSession();
+ Receiver receiver = session.createReceiver("message_queue");
+ while (true) {
+ Message message = receiver.fetch();
+ std::cout << "Message: " << message.getContent() << std::endl;
+ session.acknowledge();
+ if (message.getContent().asString() == "That's all, folks!") {
+ std::cout << "Cancelling receiver" << std::endl;
+ receiver.cancel();
+ break;
+ }
+ }
+
+ connection.close();
+ return 0;
+ } catch(const std::exception& error) {
+ std::cout << error.what() << std::endl;
+ }
+ return 1;
+}
+
+
diff --git a/qpid/cpp/examples/messaging/queue_sender.cpp b/qpid/cpp/examples/messaging/queue_sender.cpp
new file mode 100644
index 0000000000..637e7eb8e4
--- /dev/null
+++ b/qpid/cpp/examples/messaging/queue_sender.cpp
@@ -0,0 +1,65 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/messaging/Connection.h>
+#include <qpid/messaging/Message.h>
+#include <qpid/messaging/Sender.h>
+#include <qpid/messaging/Session.h>
+
+#include <cstdlib>
+#include <iostream>
+
+#include <sstream>
+
+using namespace qpid::messaging;
+
+using std::stringstream;
+using std::string;
+
+int main(int argc, char** argv) {
+ const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672";
+ int count = argc>2 ? atoi(argv[2]) : 10;
+
+ try {
+ Connection connection = Connection::open(url);
+ Session session = connection.newSession();
+ Sender sender = session.createSender("message_queue");
+
+ // Now send some messages ...
+ for (int i=0; i<count; i++) {
+ Message message;
+ message.getContent() << "Message " << i;
+ sender.send(message);
+ }
+
+ // And send a final message to indicate termination.
+ Message message("That's all, folks!");
+ sender.send(message);
+ session.sync();
+ connection.close();
+ return 0;
+ } catch(const std::exception& error) {
+ std::cout << error.what() << std::endl;
+ }
+ return 1;
+}
+
+
diff --git a/qpid/cpp/examples/messaging/server.cpp b/qpid/cpp/examples/messaging/server.cpp
new file mode 100644
index 0000000000..38f4601ff6
--- /dev/null
+++ b/qpid/cpp/examples/messaging/server.cpp
@@ -0,0 +1,76 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/messaging/Address.h>
+#include <qpid/messaging/Connection.h>
+#include <qpid/messaging/Message.h>
+#include <qpid/messaging/Receiver.h>
+#include <qpid/messaging/Sender.h>
+#include <qpid/messaging/Session.h>
+#include <qpid/messaging/Variant.h>
+
+#include <algorithm>
+#include <cstdlib>
+#include <iostream>
+#include <memory>
+#include <sstream>
+
+using namespace qpid::messaging;
+
+using std::stringstream;
+using std::string;
+
+int main(int argc, char** argv) {
+ const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672";
+
+ try {
+ Connection connection = Connection::open(url);
+ Session session = connection.newSession();
+ Receiver receiver = session.createReceiver("service_queue");
+
+ while (true) {
+ Message request = receiver.fetch();
+ const Address& address = request.getReplyTo();
+ if (address) {
+ Sender sender = session.createSender(address);
+ std::string s = request.getContent().asString();
+ std::transform(s.begin(), s.end(), s.begin(), toupper);
+ Message response(s);
+ sender.send(response);
+ std::cout << "Processed request: "
+ << request.getContent().asString()
+ << " -> "
+ << response.getContent().asString() << std::endl;
+ session.acknowledge();
+ } else {
+ std::cerr << "Error: no reply address specified for request: " << request.getContent().asString() << std::endl;
+ session.reject(request);
+ }
+ }
+ connection.close();
+ return 0;
+ } catch(const std::exception& error) {
+ std::cout << error.what() << std::endl;
+ }
+ return 1;
+}
+
+
diff --git a/qpid/cpp/examples/messaging/topic_listener.cpp b/qpid/cpp/examples/messaging/topic_listener.cpp
new file mode 100644
index 0000000000..700e03cdf9
--- /dev/null
+++ b/qpid/cpp/examples/messaging/topic_listener.cpp
@@ -0,0 +1,82 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/messaging/Connection.h>
+#include <qpid/messaging/Filter.h>
+#include <qpid/messaging/Message.h>
+#include <qpid/messaging/MessageListener.h>
+#include <qpid/messaging/Session.h>
+#include <qpid/messaging/Receiver.h>
+
+#include <cstdlib>
+#include <iostream>
+
+using namespace qpid::messaging;
+
+class Listener : public MessageListener
+{
+ public:
+ Listener(const Receiver& receiver);
+ void received(Message& message);
+ bool isFinished();
+ private:
+ Receiver receiver;
+ bool finished;
+};
+
+Listener::Listener(const Receiver& r) : receiver(r), finished(false) {}
+
+bool Listener::isFinished() { return finished; }
+
+void Listener::received(Message& message)
+{
+ std::cout << "Message: " << message.getContent().asString() << std::endl;
+ if (message.getContent().asString() == "That's all, folks!") {
+ std::cout << "Shutting down listener" << std::endl;
+ receiver.cancel();
+ finished = true;
+ }
+}
+
+int main(int argc, char** argv) {
+ const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672";
+ const char* pattern = argc>2 ? argv[2] : "#.#";
+
+ try {
+ Connection connection = Connection::open(url);
+ Session session = connection.newSession();
+
+ Filter filter(Filter::WILDCARD, pattern, "control");
+ Receiver receiver = session.createReceiver("news_service", filter);
+ Listener listener(receiver);
+ receiver.setListener(&listener);
+ receiver.setCapacity(1);
+ receiver.start();
+ while (session.dispatch() && !listener.isFinished()) ;
+ connection.close();
+ return 0;
+ } catch(const std::exception& error) {
+ std::cout << error.what() << std::endl;
+ }
+ return 1;
+}
+
+
diff --git a/qpid/cpp/examples/messaging/topic_receiver.cpp b/qpid/cpp/examples/messaging/topic_receiver.cpp
new file mode 100644
index 0000000000..063f0d9cb0
--- /dev/null
+++ b/qpid/cpp/examples/messaging/topic_receiver.cpp
@@ -0,0 +1,66 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/messaging/Address.h>
+#include <qpid/messaging/Connection.h>
+#include <qpid/messaging/Filter.h>
+#include <qpid/messaging/Message.h>
+#include <qpid/messaging/Receiver.h>
+#include <qpid/messaging/Session.h>
+
+#include <cstdlib>
+#include <iostream>
+
+#include <sstream>
+
+using namespace qpid::messaging;
+
+using std::stringstream;
+using std::string;
+
+int main(int argc, char** argv) {
+ const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672";
+ const char* pattern = argc>2 ? argv[2] : "#.#";
+
+ try {
+ Connection connection = Connection::open(url);
+ Session session = connection.newSession();
+ Filter filter(Filter::WILDCARD, pattern, "control");
+ Receiver receiver = session.createReceiver(Address("news_service", "topic"), filter);
+ while (true) {
+ Message message = receiver.fetch();
+ std::cout << "Message: " << message.getContent().asString() << std::endl;
+ if (message.getContent().asString() == "That's all, folks!") {
+ std::cout << "Cancelling receiver" << std::endl;
+ receiver.cancel();
+ break;
+ }
+ }
+
+ connection.close();
+ return 0;
+ } catch(const std::exception& error) {
+ std::cout << error.what() << std::endl;
+ }
+ return 1;
+}
+
+
diff --git a/qpid/cpp/examples/messaging/topic_sender.cpp b/qpid/cpp/examples/messaging/topic_sender.cpp
new file mode 100644
index 0000000000..5665fc45f9
--- /dev/null
+++ b/qpid/cpp/examples/messaging/topic_sender.cpp
@@ -0,0 +1,78 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/messaging/Connection.h>
+#include <qpid/messaging/Message.h>
+#include <qpid/messaging/Sender.h>
+#include <qpid/messaging/Session.h>
+
+#include <cstdlib>
+#include <iostream>
+
+#include <sstream>
+
+using namespace qpid::messaging;
+
+using std::stringstream;
+using std::string;
+
+void sendMessages(Sender& sender, int count, const std::string& subject, const std::string& text)
+{
+ Message message;
+ message.setSubject(subject);
+ for (int i=0; i<count; i++) {
+ stringstream message_data;
+ message_data << text << i;
+
+ message.setContent(message_data.str());
+ sender.send(message);
+ }
+}
+
+int main(int argc, char** argv) {
+ const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672";
+ int count = argc>2 ? atoi(argv[2]) : 10;
+
+ try {
+ Connection connection = Connection::open(url);
+ Session session = connection.newSession();
+ Sender sender = session.createSender("news_service");
+
+ // Now send some messages to each topic...
+ sendMessages(sender, count, "usa.news", "news about the usa");
+ sendMessages(sender, count, "usa.weather", "weather report for the usa");
+ sendMessages(sender, count, "europe.news", "news about europe");
+ sendMessages(sender, count, "europe.weather", "weather report for europe");
+
+ // And send a final message to indicate termination.
+ Message message("That's all, folks!");
+ message.setSubject("control");
+ sender.send(message);
+ session.sync();
+ connection.close();
+ return 0;
+ } catch(const std::exception& error) {
+ std::cout << error.what() << std::endl;
+ }
+ return 1;
+}
+
+