#!/usr/bin/perl # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # use strict; use warnings; use cqpid_perl; use Getopt::Long; use Time::Local; my $url = "127.0.0.1"; my $timeout = 0; my $count = 1; my $id = ""; my $replyto = ""; my @properties; my @entries; my $content = ""; my $connectionOptions = ""; my $address = "amq.direct"; my $result = GetOptions( "broker|b=s" => \ $url, "timeout|t=i" => \ $timeout, "count|c=i" => \ $count, "id|i=s" => \ $id, "replyto=s" => \ $replyto, "property|p=s@" => \ @properties, "map|m=s@" => \ @entries, "content=s" => \ $content, "connection-options=s" => \ $connectionOptions, ); if (! $result) { print "Usage: perl drain.pl [OPTIONS]\n"; } if ($#ARGV ge 0) { $address = $ARGV[0] } sub setEntries { my ($content) = @_; foreach (@entries) { my ($name, $value) = split("=", $_); $content->{$name} = $value; } } sub setProperties { my ($message) = @_; foreach (@properties) { my ($name, $value) = split("=", $_); $message->getProperties()->{$name} = $value; } } my $connection = new cqpid_perl::Connection($url, $connectionOptions); eval { $connection->open(); my $session = $connection->createSession(); my $sender = $session->createSender($address); my $message = new cqpid_perl::Message(); setProperties($message) if (@properties); if (@entries) { my $content = {}; setEntries($content); cqpid_perl::encode($content, $message); } elsif ($content) { $message->setContent($content); $message->setContentType("text/plain"); } my $receiver; if ($replyto) { my $responseQueue = new cqpid_perl::Address($replyto); $receiver = $session->createReceiver($responseQueue); $message->setReplyTo($responseQueue); } my $start = localtime; my @s = split(/[:\s]/, $start); my $s = "$s[3]$s[4]$s[5]"; my $n = $s; for (my $i = 0; ($i < $count || $count == 0) and ($timeout == 0 || abs($n - $s) < $timeout); $i++) { $sender->send($message); if ($receiver) { my $response = $receiver->fetch(); print "$i -> " . $response->getContent() . "\n"; } my $now = localtime; my @n = split(/[:\s]/, $now); my $n = "$n[3]$n[4]$n[5]"; } $session->sync(); $connection->close(); }; if ($@) { $connection->close(); die $@; }