summaryrefslogtreecommitdiff
path: root/qpid/cpp/bindings/qpid/examples/perl/drain.pl
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/bindings/qpid/examples/perl/drain.pl')
-rwxr-xr-xqpid/cpp/bindings/qpid/examples/perl/drain.pl154
1 files changed, 154 insertions, 0 deletions
diff --git a/qpid/cpp/bindings/qpid/examples/perl/drain.pl b/qpid/cpp/bindings/qpid/examples/perl/drain.pl
new file mode 100755
index 0000000000..d0150854b2
--- /dev/null
+++ b/qpid/cpp/bindings/qpid/examples/perl/drain.pl
@@ -0,0 +1,154 @@
+#!/usr/bin/env perl
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+use strict;
+use warnings;
+
+use qpid;
+use Getopt::Long;
+use Pod::Usage;
+
+my $url = "127.0.0.1";
+my $timeout = 0;
+my $forever = 0;
+my $count = 0;
+my $connectionOptions = "";
+my $address = "amq.direct";
+my $help;
+
+my $result = GetOptions(
+ "broker|b=s" => \$url,
+ "timeout|t=i" => \$timeout,
+ "forever|f" => \$forever,
+ "connection-options=s" => \$connectionOptions,
+ "count|c=i" => \$count,
+ "help|h" => \$help
+) || pod2usage( -verbose => 0 );
+
+pod2usage( -verbose => 1 ) if $help;
+
+if ( $#ARGV ge 0 ) {
+ $address = $ARGV[0];
+}
+
+sub getTimeout {
+
+ # returns either the named duration FOREVER if the
+ # forever cmdline argument was used, otherwise creates
+ # a new Duration of the specified length
+ return ($forever)
+ ? qpid::messaging::Duration::FOREVER
+ : new qpid::messaging::Duration( $timeout * 1000 );
+}
+
+sub printProperties {
+ my $h = shift();
+ return qq[{${\(join', ',map"'$_': '$h->{$_}'",keys%$h)}}];
+}
+
+# create a connection object
+my $connection = new qpid::messaging::Connection( $url, $connectionOptions );
+
+eval {
+ # open the connection, then create a session and receiver
+ $connection->open();
+ my $session = $connection->create_session();
+ my $receiver = $session->create_receiver($address);
+ my $timeout = getTimeout();
+ my $message = new qpid::messaging::Message();
+ my $i = 0;
+
+ for ( ; ; ) {
+ eval { $message = $receiver->fetch($timeout); };
+
+ if ($@) {
+ last;
+ }
+
+ # check if the message was on that was redelivered
+ my $redelivered =
+ ( $message->get_redelivered ) ? "redelivered=True, " : "";
+ print "Message("
+ . $redelivered
+ . "properties="
+ . printProperties( $message->get_properties() )
+ . ", content='";
+
+ # if the message content was a map, then we will print
+ # it out as a series of name => value pairs
+ my $content = $message->get_content_object;
+ if ( $message->get_content_type() eq "amqp/map" ) {
+ map { print "\n$_ => $content->{$_}"; } keys %{$content};
+ }
+ else {
+ # it's not a map, so just print the content as a string
+ print $content;
+ }
+ print "')\n";
+
+ # if the message had a reply-to address, then we'll send a
+ # response back letting the send know the message was processed
+ my $replyto = $message->get_reply_to();
+ if ( $replyto->get_name() ) {
+ print "Replying to " . $message->get_reply_to()->str() . "...\n";
+
+ # create a temporary sender for the specified queue
+ my $sender = $session->create_sender($replyto);
+ my $response =
+ new qpid::messaging::Message("received by the server.");
+ $sender->send($response);
+ }
+
+ # acknowledge all messages received on this queue so far
+ $session->acknowledge();
+
+ if ( $count and ( ++$i == $count ) ) {
+ last;
+ }
+ }
+
+ # close everything to clean up
+ $receiver->close();
+ $session->close();
+ $connection->close();
+};
+
+if ($@) {
+ $connection->close();
+ die $@;
+}
+
+__END__
+
+=head1 NAME
+
+drain - Drains messages from the specified address
+
+=head1 SYNOPSIS
+
+ Options:
+ -h, --help show this message
+ -b VALUE, --broker VALUE url of broker to connect to
+ -t VALUE, --timeout VALUE timeout in seconds to wait before exiting
+ -f, --forever ignore timeout and wait forever
+ --connection-options VALUE connection options string in the form {name1:value1, name2:value2}
+ -c VALUE, --count VALUE number of messages to read before exiting
+
+=cut
+