summaryrefslogtreecommitdiff
path: root/cpp/bindings/qpid/examples/perl/drain.pl
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/bindings/qpid/examples/perl/drain.pl')
-rwxr-xr-x[-rw-r--r--]cpp/bindings/qpid/examples/perl/drain.pl136
1 files changed, 96 insertions, 40 deletions
diff --git a/cpp/bindings/qpid/examples/perl/drain.pl b/cpp/bindings/qpid/examples/perl/drain.pl
index 60ac0c50ed..f7a710c485 100644..100755
--- a/cpp/bindings/qpid/examples/perl/drain.pl
+++ b/cpp/bindings/qpid/examples/perl/drain.pl
@@ -1,4 +1,4 @@
-#!/usr/bin/perl
+#!/usr/bin/env perl
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
@@ -20,79 +20,135 @@
use strict;
use warnings;
-use cqpid_perl;
+use qpid;
use Getopt::Long;
+use Pod::Usage;
-my $url = "127.0.0.1";
-my $timeout = 60;
-my $forever = 0;
-my $count = 1;
+my $url = "127.0.0.1";
+my $timeout = 0;
+my $forever = 0;
+my $count = 0;
my $connectionOptions = "";
-my $address = "amq.direct";
+my $address = "amq.direct";
+my $help;
my $result = GetOptions(
- "broker|b=s" => \ $url,
- "timeout|t=i" => \ $timeout,
- "forever|f" => \ $forever,
- "connection-options=s" => \ $connectionOptions,
- "count|c=i" => \ $count,
-);
-
-if (! $result) {
- print "Usage: perl drain.pl [OPTIONS]\n";
-}
+ "broker|b=s" => \$url,
+ "timeout|t=i" => \$timeout,
+ "forever|f" => \$forever,
+ "connection-options=s" => \$connectionOptions,
+ "count|c=i" => \$count,
+ "help|h" => \$help
+) || pod2usage( -verbose => 0 );
+
+pod2usage( -verbose => 1 ) if $help;
-if ($#ARGV ge 0) {
- $address = $ARGV[0]
+if ( $#ARGV ge 0 ) {
+ $address = $ARGV[0];
}
sub getTimeout {
- return ($forever) ? $cqpid_perl::Duration::FOREVER : new cqpid_perl::Duration($timeout*1000);
+
+ # returns either the named duration FOREVER if the
+ # forever cmdline argument was used, otherwise creates
+ # a new Duration of the specified length
+ return ($forever)
+ ? qpid::messaging::Duration::FOREVER
+ : new qpid::messaging::Duration( $timeout * 1000 );
}
+sub printProperties {
+ my $h = shift();
+ return qq[{${\(join', ',map"'$_': '$h->{$_}'",keys%$h)}}];
+}
-my $connection = new cqpid_perl::Connection($url, $connectionOptions);
+# create a connection object
+my $connection = new qpid::messaging::Connection( $url, $connectionOptions );
eval {
+ # open the connection, then create a session and receiver
$connection->open();
- my $session = $connection->createSession();
- my $receiver = $session->createReceiver($address);
+ my $session = $connection->create_session();
+ my $receiver = $session->create_receiver($address);
my $timeout = getTimeout();
+ my $message = new qpid::messaging::Message();
+ my $i = 0;
+
+ for ( ; ; ) {
+ eval { $message = $receiver->fetch($timeout); };
- my $message = new cqpid_perl::Message();
- my $i = 0;
+ if ($@) {
+ last;
+ }
+
+ # check if the message was on that was redelivered
+ my $redelivered =
+ ( $message->get_redelivered ) ? "redelivered=True, " : "";
+ print "Message("
+ . $redelivered
+ . "properties="
+ . printProperties( $message->get_properties() )
+ . ", content='";
- while($receiver->fetch($message, $timeout)) {
- print "Message(properties=" . $message->getProperties() . ",content='";
- if ($message->getContentType() eq "amqp/map") {
- my $content = cqpid_perl::decodeMap($message);
- map{ print "\n$_ => $content->{$_}"; } keys %{$content};
+ # if the message content was a map, then we will print
+ # it out as a series of name => value pairs
+ if ( $message->get_content_type() eq "amqp/map" ) {
+ my $content = $message->get_content();
+ map { print "\n$_ => $content->{$_}"; } keys %{$content};
}
else {
- print $message->getContent();
+ # it's not a map, so just print the content as a string
+ print $message->get_content();
}
print "')\n";
-
- my $replyto = $message->getReplyTo();
- if ($replyto->getName()) {
- print "Replying to " . $message->getReplyTo()->str() . "...\n";
- my $sender = $session->createSender($replyto);
- my $response = new cqpid_perl::Message("received by the server.");
+
+ # if the message had a reply-to address, then we'll send a
+ # response back letting the send know the message was processed
+ my $replyto = $message->get_reply_to();
+ if ( $replyto->get_name() ) {
+ print "Replying to " . $message->get_reply_to()->str() . "...\n";
+
+ # create a temporary sender for the specified queue
+ my $sender = $session->create_sender($replyto);
+ my $response =
+ new qpid::messaging::Message("received by the server.");
$sender->send($response);
}
+
+ # acknowledge all messages received on this queue so far
$session->acknowledge();
- if ($count and (++$i ==$count)) {
+ if ( $count and ( ++$i == $count ) ) {
last;
}
}
+
+ # close everything to clean up
$receiver->close();
$session->close();
$connection->close();
};
if ($@) {
- $connection->close();
- die $@;
+ $connection->close();
+ die $@;
}
+__END__
+
+=head1 NAME
+
+drain - Drains messages from the specified address
+
+=head1 SYNOPSIS
+
+ Options:
+ -h, --help show this message
+ -b VALUE, --broker VALUE url of broker to connect to
+ -t VALUE, --timeout VALUE timeout in seconds to wait before exiting
+ -f, --forever ignore timeout and wait forever
+ --connection-options VALUE connection options string in the form {name1:value1, name2:value2}
+ -c VALUE, --count VALUE number of messages to read before exiting
+
+=cut
+