diff options
Diffstat (limited to 'qpid/cpp/examples')
-rw-r--r-- | qpid/cpp/examples/CMakeLists.txt | 1 | ||||
-rw-r--r-- | qpid/cpp/examples/Makefile.am | 2 | ||||
-rw-r--r-- | qpid/cpp/examples/messaging/CMakeLists.txt | 32 | ||||
-rw-r--r-- | qpid/cpp/examples/messaging/Makefile.am | 54 | ||||
-rw-r--r-- | qpid/cpp/examples/messaging/client.cpp | 76 | ||||
-rw-r--r-- | qpid/cpp/examples/messaging/map_receiver.cpp | 54 | ||||
-rw-r--r-- | qpid/cpp/examples/messaging/map_sender.cpp | 66 | ||||
-rw-r--r-- | qpid/cpp/examples/messaging/queue_listener.cpp | 82 | ||||
-rw-r--r-- | qpid/cpp/examples/messaging/queue_receiver.cpp | 65 | ||||
-rw-r--r-- | qpid/cpp/examples/messaging/queue_sender.cpp | 65 | ||||
-rw-r--r-- | qpid/cpp/examples/messaging/server.cpp | 76 | ||||
-rw-r--r-- | qpid/cpp/examples/messaging/topic_listener.cpp | 82 | ||||
-rw-r--r-- | qpid/cpp/examples/messaging/topic_receiver.cpp | 66 | ||||
-rw-r--r-- | qpid/cpp/examples/messaging/topic_sender.cpp | 78 |
14 files changed, 798 insertions, 1 deletions
diff --git a/qpid/cpp/examples/CMakeLists.txt b/qpid/cpp/examples/CMakeLists.txt index 7d0a4d0d21..126adab32e 100644 --- a/qpid/cpp/examples/CMakeLists.txt +++ b/qpid/cpp/examples/CMakeLists.txt @@ -62,3 +62,4 @@ add_subdirectory(qmf-console) add_subdirectory(request-response) add_subdirectory(tradedemo) add_subdirectory(xml-exchange) +add_subdirectory(messaging) diff --git a/qpid/cpp/examples/Makefile.am b/qpid/cpp/examples/Makefile.am index 9c355b84e4..b526424e2a 100644 --- a/qpid/cpp/examples/Makefile.am +++ b/qpid/cpp/examples/Makefile.am @@ -16,7 +16,7 @@ # specific language governing permissions and limitations # under the License. # -SUBDIRS = direct fanout pub-sub request-response failover qmf-console tradedemo +SUBDIRS = direct fanout pub-sub request-response failover qmf-console tradedemo messaging if HAVE_XML SUBDIRS += xml-exchange broker_args = "--no-module-dir --data-dir \"\" --auth no --load-module $(top_builddir)/src/.libs/xml.so" diff --git a/qpid/cpp/examples/messaging/CMakeLists.txt b/qpid/cpp/examples/messaging/CMakeLists.txt new file mode 100644 index 0000000000..e7885d0b50 --- /dev/null +++ b/qpid/cpp/examples/messaging/CMakeLists.txt @@ -0,0 +1,32 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +add_example(messaging queue_listener) +add_example(messaging queue_receiver) +add_example(messaging queue_sender) + +add_example(messaging topic_listener) +add_example(messaging topic_receiver) +add_example(messaging topic_sender) + +add_example(messaging map_receiver) +add_example(messaging map_sender) + +add_example(messaging client) +add_example(messaging server) diff --git a/qpid/cpp/examples/messaging/Makefile.am b/qpid/cpp/examples/messaging/Makefile.am new file mode 100644 index 0000000000..250e549e82 --- /dev/null +++ b/qpid/cpp/examples/messaging/Makefile.am @@ -0,0 +1,54 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +examplesdir=$(pkgdatadir)/examples/messaging + +MAKELDFLAGS=$(CLIENTFLAGS) +include $(top_srcdir)/examples/makedist.mk + +noinst_PROGRAMS=queue_sender queue_listener queue_receiver topic_sender topic_listener topic_receiver client server map_sender map_receiver + +queue_sender_SOURCES=queue_sender.cpp +queue_sender_LDADD=$(CLIENT_LIB) + +queue_listener_SOURCES=queue_listener.cpp +queue_listener_LDADD=$(CLIENT_LIB) + +queue_receiver_SOURCES=queue_receiver.cpp +queue_receiver_LDADD=$(CLIENT_LIB) + +topic_sender_SOURCES=topic_sender.cpp +topic_sender_LDADD=$(CLIENT_LIB) + +topic_listener_SOURCES=topic_listener.cpp +topic_listener_LDADD=$(CLIENT_LIB) + +topic_receiver_SOURCES=topic_receiver.cpp +topic_receiver_LDADD=$(CLIENT_LIB) + +client_SOURCES=client.cpp +client_LDADD=$(CLIENT_LIB) + +server_SOURCES=server.cpp +server_LDADD=$(CLIENT_LIB) + +map_sender_SOURCES=map_sender.cpp +map_sender_LDADD=$(CLIENT_LIB) + +map_receiver_SOURCES=map_receiver.cpp +map_receiver_LDADD=$(CLIENT_LIB) diff --git a/qpid/cpp/examples/messaging/client.cpp b/qpid/cpp/examples/messaging/client.cpp new file mode 100644 index 0000000000..45c065880b --- /dev/null +++ b/qpid/cpp/examples/messaging/client.cpp @@ -0,0 +1,76 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/messaging/Address.h> +#include <qpid/messaging/Connection.h> +#include <qpid/messaging/Message.h> +#include <qpid/messaging/Receiver.h> +#include <qpid/messaging/Sender.h> +#include <qpid/messaging/Session.h> + +#include <cstdlib> +#include <iostream> + +#include <sstream> + +using namespace qpid::messaging; + +using std::stringstream; +using std::string; + +int main(int argc, char** argv) { + const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672"; + + try { + Connection connection = Connection::open(url); + Session session = connection.newSession(); + + Sender sender = session.createSender("service_queue"); + + //create temp queue & receiver... + Address responseQueue = session.createTempQueue(); + Receiver receiver = session.createReceiver(responseQueue); + + // Now send some messages ... + string s[] = { + "Twas brillig, and the slithy toves", + "Did gire and gymble in the wabe.", + "All mimsy were the borogroves,", + "And the mome raths outgrabe." + }; + + Message request; + request.setReplyTo(responseQueue); + for (int i=0; i<4; i++) { + request.setContent(s[i]); + sender.send(request); + Message response = receiver.fetch(); + std::cout << request.getContent().asString() << " -> " << response.getContent().asString() << std::endl; + } + connection.close(); + return 0; + } catch(const std::exception& error) { + std::cout << error.what() << std::endl; + } + return 1; +} + + diff --git a/qpid/cpp/examples/messaging/map_receiver.cpp b/qpid/cpp/examples/messaging/map_receiver.cpp new file mode 100644 index 0000000000..e6557b1560 --- /dev/null +++ b/qpid/cpp/examples/messaging/map_receiver.cpp @@ -0,0 +1,54 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/messaging/Connection.h> +#include <qpid/messaging/Message.h> +#include <qpid/messaging/Receiver.h> +#include <qpid/messaging/Session.h> + +#include <cstdlib> +#include <iostream> + +#include <sstream> + +using namespace qpid::messaging; + +using std::stringstream; +using std::string; + +int main(int argc, char** argv) { + const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672"; + + try { + Connection connection = Connection::open(url); + Session session = connection.newSession(); + Receiver receiver = session.createReceiver("message_queue"); + Message message = receiver.fetch(); + std::cout << message.getContent().asMap() << std::endl; + session.acknowledge(); + receiver.cancel(); + connection.close(); + return 0; + } catch(const std::exception& error) { + std::cout << error.what() << std::endl; + } + return 1; +} diff --git a/qpid/cpp/examples/messaging/map_sender.cpp b/qpid/cpp/examples/messaging/map_sender.cpp new file mode 100644 index 0000000000..9301c1fe1f --- /dev/null +++ b/qpid/cpp/examples/messaging/map_sender.cpp @@ -0,0 +1,66 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/messaging/Connection.h> +#include <qpid/messaging/Message.h> +#include <qpid/messaging/Sender.h> +#include <qpid/messaging/Session.h> + +#include <cstdlib> +#include <iostream> + +#include <sstream> + +using namespace qpid::messaging; + +using std::stringstream; +using std::string; + +int main(int argc, char** argv) { + const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672"; + + try { + Connection connection = Connection::open(url); + Session session = connection.newSession(); + Sender sender = session.createSender("message_queue"); + + Message message; + message.getContent()["id"] = 987654321; + message.getContent()["name"] = "Widget"; + message.getContent()["price"] = 0.99;//bad use of floating point number, just an example! + Variant::List colours; + colours.push_back(Variant("red")); + colours.push_back(Variant("green")); + colours.push_back(Variant("white")); + message.getContent()["colours"] = colours; + + sender.send(message); + session.sync(); + + connection.close(); + return 0; + } catch(const std::exception& error) { + std::cout << error.what() << std::endl; + } + return 1; +} + + diff --git a/qpid/cpp/examples/messaging/queue_listener.cpp b/qpid/cpp/examples/messaging/queue_listener.cpp new file mode 100644 index 0000000000..099e8e145a --- /dev/null +++ b/qpid/cpp/examples/messaging/queue_listener.cpp @@ -0,0 +1,82 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/messaging/Connection.h> +#include <qpid/messaging/Session.h> +#include <qpid/messaging/Message.h> +#include <qpid/messaging/MessageListener.h> +#include <qpid/messaging/Receiver.h> + +#include <cstdlib> +#include <iostream> + +using namespace qpid::messaging; + +class Listener : public MessageListener +{ + public: + Listener(const Receiver& receiver); + void received(Message& message); + bool isFinished(); + private: + Receiver receiver; + bool finished; +}; + +Listener::Listener(const Receiver& r) : receiver(r), finished(false) {} + +bool Listener::isFinished() { return finished; } + +void Listener::received(Message& message) +{ + std::cout << "Message: " << message.getContent().asString() << std::endl; + if (message.getContent().asString() == "That's all, folks!") { + std::cout << "Shutting down listener" << std::endl; + receiver.cancel(); + finished = true; + } +} + +int main(int argc, char** argv) { + const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672"; + + try { + Connection connection = Connection::open(url); + Session session = connection.newSession(); + + Receiver receiver = session.createReceiver("message_queue"); + Listener listener(receiver); + receiver.setListener(&listener); + receiver.setCapacity(1); + receiver.start(); + while (session.dispatch()) { + session.acknowledge(); + if (listener.isFinished()) break; + } + connection.close(); + return 0; + } catch(const std::exception& error) { + std::cout << error.what() << std::endl; + } + return 1; +} + + diff --git a/qpid/cpp/examples/messaging/queue_receiver.cpp b/qpid/cpp/examples/messaging/queue_receiver.cpp new file mode 100644 index 0000000000..83a44b2ca9 --- /dev/null +++ b/qpid/cpp/examples/messaging/queue_receiver.cpp @@ -0,0 +1,65 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/messaging/Connection.h> +#include <qpid/messaging/Message.h> +#include <qpid/messaging/Receiver.h> +#include <qpid/messaging/Session.h> + +#include <cstdlib> +#include <iostream> + +#include <sstream> + +using namespace qpid::messaging; + +using std::stringstream; +using std::string; + +int main(int argc, char** argv) { + const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672"; + + try { + Variant::Map options; + if (argc>2) parseOptionString(argv[2], options); + Connection connection = Connection::open(url, options); + Session session = connection.newSession(); + Receiver receiver = session.createReceiver("message_queue"); + while (true) { + Message message = receiver.fetch(); + std::cout << "Message: " << message.getContent() << std::endl; + session.acknowledge(); + if (message.getContent().asString() == "That's all, folks!") { + std::cout << "Cancelling receiver" << std::endl; + receiver.cancel(); + break; + } + } + + connection.close(); + return 0; + } catch(const std::exception& error) { + std::cout << error.what() << std::endl; + } + return 1; +} + + diff --git a/qpid/cpp/examples/messaging/queue_sender.cpp b/qpid/cpp/examples/messaging/queue_sender.cpp new file mode 100644 index 0000000000..637e7eb8e4 --- /dev/null +++ b/qpid/cpp/examples/messaging/queue_sender.cpp @@ -0,0 +1,65 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/messaging/Connection.h> +#include <qpid/messaging/Message.h> +#include <qpid/messaging/Sender.h> +#include <qpid/messaging/Session.h> + +#include <cstdlib> +#include <iostream> + +#include <sstream> + +using namespace qpid::messaging; + +using std::stringstream; +using std::string; + +int main(int argc, char** argv) { + const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672"; + int count = argc>2 ? atoi(argv[2]) : 10; + + try { + Connection connection = Connection::open(url); + Session session = connection.newSession(); + Sender sender = session.createSender("message_queue"); + + // Now send some messages ... + for (int i=0; i<count; i++) { + Message message; + message.getContent() << "Message " << i; + sender.send(message); + } + + // And send a final message to indicate termination. + Message message("That's all, folks!"); + sender.send(message); + session.sync(); + connection.close(); + return 0; + } catch(const std::exception& error) { + std::cout << error.what() << std::endl; + } + return 1; +} + + diff --git a/qpid/cpp/examples/messaging/server.cpp b/qpid/cpp/examples/messaging/server.cpp new file mode 100644 index 0000000000..38f4601ff6 --- /dev/null +++ b/qpid/cpp/examples/messaging/server.cpp @@ -0,0 +1,76 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/messaging/Address.h> +#include <qpid/messaging/Connection.h> +#include <qpid/messaging/Message.h> +#include <qpid/messaging/Receiver.h> +#include <qpid/messaging/Sender.h> +#include <qpid/messaging/Session.h> +#include <qpid/messaging/Variant.h> + +#include <algorithm> +#include <cstdlib> +#include <iostream> +#include <memory> +#include <sstream> + +using namespace qpid::messaging; + +using std::stringstream; +using std::string; + +int main(int argc, char** argv) { + const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672"; + + try { + Connection connection = Connection::open(url); + Session session = connection.newSession(); + Receiver receiver = session.createReceiver("service_queue"); + + while (true) { + Message request = receiver.fetch(); + const Address& address = request.getReplyTo(); + if (address) { + Sender sender = session.createSender(address); + std::string s = request.getContent().asString(); + std::transform(s.begin(), s.end(), s.begin(), toupper); + Message response(s); + sender.send(response); + std::cout << "Processed request: " + << request.getContent().asString() + << " -> " + << response.getContent().asString() << std::endl; + session.acknowledge(); + } else { + std::cerr << "Error: no reply address specified for request: " << request.getContent().asString() << std::endl; + session.reject(request); + } + } + connection.close(); + return 0; + } catch(const std::exception& error) { + std::cout << error.what() << std::endl; + } + return 1; +} + + diff --git a/qpid/cpp/examples/messaging/topic_listener.cpp b/qpid/cpp/examples/messaging/topic_listener.cpp new file mode 100644 index 0000000000..700e03cdf9 --- /dev/null +++ b/qpid/cpp/examples/messaging/topic_listener.cpp @@ -0,0 +1,82 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/messaging/Connection.h> +#include <qpid/messaging/Filter.h> +#include <qpid/messaging/Message.h> +#include <qpid/messaging/MessageListener.h> +#include <qpid/messaging/Session.h> +#include <qpid/messaging/Receiver.h> + +#include <cstdlib> +#include <iostream> + +using namespace qpid::messaging; + +class Listener : public MessageListener +{ + public: + Listener(const Receiver& receiver); + void received(Message& message); + bool isFinished(); + private: + Receiver receiver; + bool finished; +}; + +Listener::Listener(const Receiver& r) : receiver(r), finished(false) {} + +bool Listener::isFinished() { return finished; } + +void Listener::received(Message& message) +{ + std::cout << "Message: " << message.getContent().asString() << std::endl; + if (message.getContent().asString() == "That's all, folks!") { + std::cout << "Shutting down listener" << std::endl; + receiver.cancel(); + finished = true; + } +} + +int main(int argc, char** argv) { + const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672"; + const char* pattern = argc>2 ? argv[2] : "#.#"; + + try { + Connection connection = Connection::open(url); + Session session = connection.newSession(); + + Filter filter(Filter::WILDCARD, pattern, "control"); + Receiver receiver = session.createReceiver("news_service", filter); + Listener listener(receiver); + receiver.setListener(&listener); + receiver.setCapacity(1); + receiver.start(); + while (session.dispatch() && !listener.isFinished()) ; + connection.close(); + return 0; + } catch(const std::exception& error) { + std::cout << error.what() << std::endl; + } + return 1; +} + + diff --git a/qpid/cpp/examples/messaging/topic_receiver.cpp b/qpid/cpp/examples/messaging/topic_receiver.cpp new file mode 100644 index 0000000000..063f0d9cb0 --- /dev/null +++ b/qpid/cpp/examples/messaging/topic_receiver.cpp @@ -0,0 +1,66 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/messaging/Address.h> +#include <qpid/messaging/Connection.h> +#include <qpid/messaging/Filter.h> +#include <qpid/messaging/Message.h> +#include <qpid/messaging/Receiver.h> +#include <qpid/messaging/Session.h> + +#include <cstdlib> +#include <iostream> + +#include <sstream> + +using namespace qpid::messaging; + +using std::stringstream; +using std::string; + +int main(int argc, char** argv) { + const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672"; + const char* pattern = argc>2 ? argv[2] : "#.#"; + + try { + Connection connection = Connection::open(url); + Session session = connection.newSession(); + Filter filter(Filter::WILDCARD, pattern, "control"); + Receiver receiver = session.createReceiver(Address("news_service", "topic"), filter); + while (true) { + Message message = receiver.fetch(); + std::cout << "Message: " << message.getContent().asString() << std::endl; + if (message.getContent().asString() == "That's all, folks!") { + std::cout << "Cancelling receiver" << std::endl; + receiver.cancel(); + break; + } + } + + connection.close(); + return 0; + } catch(const std::exception& error) { + std::cout << error.what() << std::endl; + } + return 1; +} + + diff --git a/qpid/cpp/examples/messaging/topic_sender.cpp b/qpid/cpp/examples/messaging/topic_sender.cpp new file mode 100644 index 0000000000..5665fc45f9 --- /dev/null +++ b/qpid/cpp/examples/messaging/topic_sender.cpp @@ -0,0 +1,78 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/messaging/Connection.h> +#include <qpid/messaging/Message.h> +#include <qpid/messaging/Sender.h> +#include <qpid/messaging/Session.h> + +#include <cstdlib> +#include <iostream> + +#include <sstream> + +using namespace qpid::messaging; + +using std::stringstream; +using std::string; + +void sendMessages(Sender& sender, int count, const std::string& subject, const std::string& text) +{ + Message message; + message.setSubject(subject); + for (int i=0; i<count; i++) { + stringstream message_data; + message_data << text << i; + + message.setContent(message_data.str()); + sender.send(message); + } +} + +int main(int argc, char** argv) { + const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672"; + int count = argc>2 ? atoi(argv[2]) : 10; + + try { + Connection connection = Connection::open(url); + Session session = connection.newSession(); + Sender sender = session.createSender("news_service"); + + // Now send some messages to each topic... + sendMessages(sender, count, "usa.news", "news about the usa"); + sendMessages(sender, count, "usa.weather", "weather report for the usa"); + sendMessages(sender, count, "europe.news", "news about europe"); + sendMessages(sender, count, "europe.weather", "weather report for europe"); + + // And send a final message to indicate termination. + Message message("That's all, folks!"); + message.setSubject("control"); + sender.send(message); + session.sync(); + connection.close(); + return 0; + } catch(const std::exception& error) { + std::cout << error.what() << std::endl; + } + return 1; +} + + |