diff options
Diffstat (limited to 'cpp/bindings/qpid/examples/perl/drain.pl')
-rwxr-xr-x[-rw-r--r--] | cpp/bindings/qpid/examples/perl/drain.pl | 136 |
1 files changed, 96 insertions, 40 deletions
diff --git a/cpp/bindings/qpid/examples/perl/drain.pl b/cpp/bindings/qpid/examples/perl/drain.pl index 60ac0c50ed..f7a710c485 100644..100755 --- a/cpp/bindings/qpid/examples/perl/drain.pl +++ b/cpp/bindings/qpid/examples/perl/drain.pl @@ -1,4 +1,4 @@ -#!/usr/bin/perl +#!/usr/bin/env perl # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file @@ -20,79 +20,135 @@ use strict; use warnings; -use cqpid_perl; +use qpid; use Getopt::Long; +use Pod::Usage; -my $url = "127.0.0.1"; -my $timeout = 60; -my $forever = 0; -my $count = 1; +my $url = "127.0.0.1"; +my $timeout = 0; +my $forever = 0; +my $count = 0; my $connectionOptions = ""; -my $address = "amq.direct"; +my $address = "amq.direct"; +my $help; my $result = GetOptions( - "broker|b=s" => \ $url, - "timeout|t=i" => \ $timeout, - "forever|f" => \ $forever, - "connection-options=s" => \ $connectionOptions, - "count|c=i" => \ $count, -); - -if (! $result) { - print "Usage: perl drain.pl [OPTIONS]\n"; -} + "broker|b=s" => \$url, + "timeout|t=i" => \$timeout, + "forever|f" => \$forever, + "connection-options=s" => \$connectionOptions, + "count|c=i" => \$count, + "help|h" => \$help +) || pod2usage( -verbose => 0 ); + +pod2usage( -verbose => 1 ) if $help; -if ($#ARGV ge 0) { - $address = $ARGV[0] +if ( $#ARGV ge 0 ) { + $address = $ARGV[0]; } sub getTimeout { - return ($forever) ? $cqpid_perl::Duration::FOREVER : new cqpid_perl::Duration($timeout*1000); + + # returns either the named duration FOREVER if the + # forever cmdline argument was used, otherwise creates + # a new Duration of the specified length + return ($forever) + ? qpid::messaging::Duration::FOREVER + : new qpid::messaging::Duration( $timeout * 1000 ); } +sub printProperties { + my $h = shift(); + return qq[{${\(join', ',map"'$_': '$h->{$_}'",keys%$h)}}]; +} -my $connection = new cqpid_perl::Connection($url, $connectionOptions); +# create a connection object +my $connection = new qpid::messaging::Connection( $url, $connectionOptions ); eval { + # open the connection, then create a session and receiver $connection->open(); - my $session = $connection->createSession(); - my $receiver = $session->createReceiver($address); + my $session = $connection->create_session(); + my $receiver = $session->create_receiver($address); my $timeout = getTimeout(); + my $message = new qpid::messaging::Message(); + my $i = 0; + + for ( ; ; ) { + eval { $message = $receiver->fetch($timeout); }; - my $message = new cqpid_perl::Message(); - my $i = 0; + if ($@) { + last; + } + + # check if the message was on that was redelivered + my $redelivered = + ( $message->get_redelivered ) ? "redelivered=True, " : ""; + print "Message(" + . $redelivered + . "properties=" + . printProperties( $message->get_properties() ) + . ", content='"; - while($receiver->fetch($message, $timeout)) { - print "Message(properties=" . $message->getProperties() . ",content='"; - if ($message->getContentType() eq "amqp/map") { - my $content = cqpid_perl::decodeMap($message); - map{ print "\n$_ => $content->{$_}"; } keys %{$content}; + # if the message content was a map, then we will print + # it out as a series of name => value pairs + if ( $message->get_content_type() eq "amqp/map" ) { + my $content = $message->get_content(); + map { print "\n$_ => $content->{$_}"; } keys %{$content}; } else { - print $message->getContent(); + # it's not a map, so just print the content as a string + print $message->get_content(); } print "')\n"; - - my $replyto = $message->getReplyTo(); - if ($replyto->getName()) { - print "Replying to " . $message->getReplyTo()->str() . "...\n"; - my $sender = $session->createSender($replyto); - my $response = new cqpid_perl::Message("received by the server."); + + # if the message had a reply-to address, then we'll send a + # response back letting the send know the message was processed + my $replyto = $message->get_reply_to(); + if ( $replyto->get_name() ) { + print "Replying to " . $message->get_reply_to()->str() . "...\n"; + + # create a temporary sender for the specified queue + my $sender = $session->create_sender($replyto); + my $response = + new qpid::messaging::Message("received by the server."); $sender->send($response); } + + # acknowledge all messages received on this queue so far $session->acknowledge(); - if ($count and (++$i ==$count)) { + if ( $count and ( ++$i == $count ) ) { last; } } + + # close everything to clean up $receiver->close(); $session->close(); $connection->close(); }; if ($@) { - $connection->close(); - die $@; + $connection->close(); + die $@; } +__END__ + +=head1 NAME + +drain - Drains messages from the specified address + +=head1 SYNOPSIS + + Options: + -h, --help show this message + -b VALUE, --broker VALUE url of broker to connect to + -t VALUE, --timeout VALUE timeout in seconds to wait before exiting + -f, --forever ignore timeout and wait forever + --connection-options VALUE connection options string in the form {name1:value1, name2:value2} + -c VALUE, --count VALUE number of messages to read before exiting + +=cut + |