summaryrefslogtreecommitdiff
path: root/qpid/cpp/bindings/qpid/examples/perl/drain.pl
blob: 60ac0c50edb0991fa377f018cc8bfd10e3c28482 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
#!/usr/bin/perl
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#
use strict;
use warnings;

use cqpid_perl;
use Getopt::Long;

# Defaults for every setting; each can be overridden on the command line.
my $url = "127.0.0.1";
my $timeout = 60;
my $forever = 0;
my $count   = 1;
my $connectionOptions = "";
my $address = "amq.direct";

# Parse the command-line switches into the variables above. GetOptions
# removes recognised options from @ARGV and returns false on a parse error.
my $result = GetOptions(
    "broker|b=s"           => \$url,
    "timeout|t=i"          => \$timeout,
    "forever|f"            => \$forever,
    "connection-options=s" => \$connectionOptions,
    "count|c=i"            => \$count,
);

# On invalid options, show the usage line and stop rather than carrying on
# with whatever happened to be parsed.
if (!$result) {
    print STDERR "Usage: perl drain.pl [OPTIONS]\n";
    exit 1;
}

# The first remaining positional argument, if present, is the address to
# drain. Compare the element count numerically (the original used the string
# operator "ge" on $#ARGV).
if (@ARGV >= 1) {
    $address = $ARGV[0];
}

# Build the duration passed to the receiver's fetch call.
#
# Reads the file-level $forever and $timeout settings. Returns
# $cqpid_perl::Duration::FOREVER when $forever is set; otherwise a new
# cqpid_perl::Duration constructed from $timeout * 1000 (presumably converting
# seconds to milliseconds -- confirm against the Duration API).
sub getTimeout {
    return $cqpid_perl::Duration::FOREVER if $forever;

    # Direct method call instead of the indirect "new Class(...)" form.
    return cqpid_perl::Duration->new($timeout * 1000);
}


# Connect to the broker at $url and drain messages from $address.
my $connection = cqpid_perl::Connection->new($url, $connectionOptions);

# The block ends with a true value, so a false result from eval means an
# exception was thrown somewhere inside it.
my $drained = eval {
    $connection->open();
    my $session  = $connection->createSession();
    my $receiver = $session->createReceiver($address);

    # Named distinctly from the file-level $timeout that getTimeout() reads.
    my $fetch_timeout = getTimeout();

    my $message  = cqpid_perl::Message->new();
    my $received = 0;

    # fetch() fills $message and returns false when nothing arrives in time.
    while ($receiver->fetch($message, $fetch_timeout)) {
        print "Message(properties=" . $message->getProperties() . ",content='";
        if ($message->getContentType() eq "amqp/map") {
            # Map content is decoded and printed one "key => value" per line.
            my $content = cqpid_perl::decodeMap($message);
            for my $key (keys %{$content}) {
                print "\n$key => $content->{$key}";
            }
        }
        else {
            print $message->getContent();
        }
        print "')\n";

        # If the message names a reply-to address, send a short response.
        my $replyto = $message->getReplyTo();
        if ($replyto->getName()) {
            print "Replying to " . $replyto->str() . "...\n";
            my $sender   = $session->createSender($replyto);
            my $response = cqpid_perl::Message->new("received by the server.");
            $sender->send($response);
        }
        $session->acknowledge();

        # A count of 0 means no limit; otherwise stop after $count messages.
        last if $count and ++$received == $count;
    }
    $receiver->close();
    $session->close();
    $connection->close();
    1;
};

if (!$drained) {
    # Capture the error before any cleanup, because the cleanup call could
    # overwrite $@ or throw and hide the original failure.
    my $error = $@ || "unknown error";
    eval { $connection->close(); };
    die $error;
}