summaryrefslogtreecommitdiff
path: root/cpp/bindings/qpid/examples
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/bindings/qpid/examples')
-rw-r--r--cpp/bindings/qpid/examples/perl/README26
-rwxr-xr-x[-rw-r--r--]cpp/bindings/qpid/examples/perl/client.pl72
-rwxr-xr-x[-rw-r--r--]cpp/bindings/qpid/examples/perl/drain.pl136
-rwxr-xr-x[-rw-r--r--]cpp/bindings/qpid/examples/perl/hello_world.pl30
-rwxr-xr-x[-rw-r--r--]cpp/bindings/qpid/examples/perl/hello_xml.pl35
-rwxr-xr-x[-rw-r--r--]cpp/bindings/qpid/examples/perl/map_receiver.pl28
-rwxr-xr-x[-rw-r--r--]cpp/bindings/qpid/examples/perl/map_sender.pl46
-rwxr-xr-x[-rw-r--r--]cpp/bindings/qpid/examples/perl/server.pl53
-rwxr-xr-x[-rw-r--r--]cpp/bindings/qpid/examples/perl/spout.pl132
9 files changed, 337 insertions, 221 deletions
diff --git a/cpp/bindings/qpid/examples/perl/README b/cpp/bindings/qpid/examples/perl/README
deleted file mode 100644
index 1e113f1fa0..0000000000
--- a/cpp/bindings/qpid/examples/perl/README
+++ /dev/null
@@ -1,26 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-
-The examples in this directory are written against the raw Perl
-binding ("cqpid"). This binding is identical to the C++ messaging (in
-namespace qpid::messaging).
-
-It is desired that a layer will be written over this interface (called
-"qpid") that provides a more Perl-specific API. When this occurs,
-these examples will be changed to use the new Perl API.
-
diff --git a/cpp/bindings/qpid/examples/perl/client.pl b/cpp/bindings/qpid/examples/perl/client.pl
index 19d9d3f14f..586beb787e 100644..100755
--- a/cpp/bindings/qpid/examples/perl/client.pl
+++ b/cpp/bindings/qpid/examples/perl/client.pl
@@ -1,4 +1,4 @@
-#!/usr/bin/perl
+#!/usr/bin/env perl
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
@@ -20,47 +20,59 @@
use strict;
use warnings;
-use cqpid_perl;
+use qpid;
-my $url = ( @ARGV == 1 ) ? $ARGV[0] : "amqp:tcp:127.0.0.1:5672";
-my $connectionOptions = ( @ARGV > 1 ) ? $ARGV[1] : "";
+my $url = ( @ARGV == 1 ) ? $ARGV[0] : "amqp:tcp:127.0.0.1:5672";
+my $connectionOptions = ( @ARGV > 1 ) ? $ARGV[1] : "";
-
-my $connection = new cqpid_perl::Connection($url, $connectionOptions);
+# creates a new connection instance
+my $connection = new qpid::messaging::Connection( $url, $connectionOptions );
eval {
-$connection->open();
-my $session = $connection->createSession();
+ # open the connection and create a session for interacting with it
+ $connection->open();
-my $sender = $session->createSender("service_queue");
+ my $session = $connection->create_session();
+ my $sender = $session->create_sender("service_queue");
-#create temp queue & receiver...
-my $responseQueue = new cqpid_perl::Address("#response-queue; {create:always, delete:always}");
-my $receiver = $session->createReceiver($responseQueue);
+ # create an address and receiver for incoming messages
+ # the queue will be created always, and will be deleted
+ # when the receive disconnects
+ my $responseQueue = new qpid::messaging::Address(
+ "#response-queue; {create:always, delete:always}");
+ my $receiver = $session->create_receiver($responseQueue);
-#Now send some messages...
+ # Now send some messages...
-my @s = (
- "Twas brillig, and the slithy toves",
- "Did gire and gymble in the wabe.",
- "All mimsy were the borogroves,",
- "And the mome raths outgrabe."
- );
+ my @s = (
+ "Twas brillig, and the slithy toves",
+ "Did gire and gymble in the wabe.",
+ "All mimsy were the borogroves,",
+ "And the mome raths outgrabe."
+ );
-my $request = new cqpid_perl::Message();
-$request->setReplyTo($responseQueue);
-for (my $i=0; $i<4; $i++) {
- $request->setContent($s[$i]);
- $sender->send($request);
- my $response = $receiver->fetch();
- print $request->getContent() . " -> " . $response->getContent() . "\n";
-}
+ # create the message object, and set a reply-to address
+ # so that the server knows where to send responses
+ # the message object will be reused to send each line
+ my $request = new qpid::messaging::Message();
+ $request->set_reply_to($responseQueue);
+ for ( my $i = 0 ; $i < 4 ; $i++ ) {
+ $request->set_content( $s[$i] );
+ $sender->send($request);
+
+ # wait for the response to the last line sent
+ # the message will be taken directly from the
+ # broker's queue rather than waiting for it
+ # to be queued locally
+ my $response = $receiver->fetch();
+ print $request->get_content() . " -> "
+ . $response->get_content() . "\n";
+ }
-$connection->close();
+ # close the connection
+ $connection->close();
};
if ($@) {
die $@;
}
-
-
diff --git a/cpp/bindings/qpid/examples/perl/drain.pl b/cpp/bindings/qpid/examples/perl/drain.pl
index 60ac0c50ed..f7a710c485 100644..100755
--- a/cpp/bindings/qpid/examples/perl/drain.pl
+++ b/cpp/bindings/qpid/examples/perl/drain.pl
@@ -1,4 +1,4 @@
-#!/usr/bin/perl
+#!/usr/bin/env perl
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
@@ -20,79 +20,135 @@
use strict;
use warnings;
-use cqpid_perl;
+use qpid;
use Getopt::Long;
+use Pod::Usage;
-my $url = "127.0.0.1";
-my $timeout = 60;
-my $forever = 0;
-my $count = 1;
+my $url = "127.0.0.1";
+my $timeout = 0;
+my $forever = 0;
+my $count = 0;
my $connectionOptions = "";
-my $address = "amq.direct";
+my $address = "amq.direct";
+my $help;
my $result = GetOptions(
- "broker|b=s" => \ $url,
- "timeout|t=i" => \ $timeout,
- "forever|f" => \ $forever,
- "connection-options=s" => \ $connectionOptions,
- "count|c=i" => \ $count,
-);
-
-if (! $result) {
- print "Usage: perl drain.pl [OPTIONS]\n";
-}
+ "broker|b=s" => \$url,
+ "timeout|t=i" => \$timeout,
+ "forever|f" => \$forever,
+ "connection-options=s" => \$connectionOptions,
+ "count|c=i" => \$count,
+ "help|h" => \$help
+) || pod2usage( -verbose => 0 );
+
+pod2usage( -verbose => 1 ) if $help;
-if ($#ARGV ge 0) {
- $address = $ARGV[0]
+if ( $#ARGV ge 0 ) {
+ $address = $ARGV[0];
}
sub getTimeout {
- return ($forever) ? $cqpid_perl::Duration::FOREVER : new cqpid_perl::Duration($timeout*1000);
+
+ # returns either the named duration FOREVER if the
+ # forever cmdline argument was used, otherwise creates
+ # a new Duration of the specified length
+ return ($forever)
+ ? qpid::messaging::Duration::FOREVER
+ : new qpid::messaging::Duration( $timeout * 1000 );
}
+sub printProperties {
+ my $h = shift();
+ return qq[{${\(join', ',map"'$_': '$h->{$_}'",keys%$h)}}];
+}
-my $connection = new cqpid_perl::Connection($url, $connectionOptions);
+# create a connection object
+my $connection = new qpid::messaging::Connection( $url, $connectionOptions );
eval {
+ # open the connection, then create a session and receiver
$connection->open();
- my $session = $connection->createSession();
- my $receiver = $session->createReceiver($address);
+ my $session = $connection->create_session();
+ my $receiver = $session->create_receiver($address);
my $timeout = getTimeout();
+ my $message = new qpid::messaging::Message();
+ my $i = 0;
+
+ for ( ; ; ) {
+ eval { $message = $receiver->fetch($timeout); };
- my $message = new cqpid_perl::Message();
- my $i = 0;
+ if ($@) {
+ last;
+ }
+
+ # check if the message was on that was redelivered
+ my $redelivered =
+ ( $message->get_redelivered ) ? "redelivered=True, " : "";
+ print "Message("
+ . $redelivered
+ . "properties="
+ . printProperties( $message->get_properties() )
+ . ", content='";
- while($receiver->fetch($message, $timeout)) {
- print "Message(properties=" . $message->getProperties() . ",content='";
- if ($message->getContentType() eq "amqp/map") {
- my $content = cqpid_perl::decodeMap($message);
- map{ print "\n$_ => $content->{$_}"; } keys %{$content};
+ # if the message content was a map, then we will print
+ # it out as a series of name => value pairs
+ if ( $message->get_content_type() eq "amqp/map" ) {
+ my $content = $message->get_content();
+ map { print "\n$_ => $content->{$_}"; } keys %{$content};
}
else {
- print $message->getContent();
+ # it's not a map, so just print the content as a string
+ print $message->get_content();
}
print "')\n";
-
- my $replyto = $message->getReplyTo();
- if ($replyto->getName()) {
- print "Replying to " . $message->getReplyTo()->str() . "...\n";
- my $sender = $session->createSender($replyto);
- my $response = new cqpid_perl::Message("received by the server.");
+
+ # if the message had a reply-to address, then we'll send a
+ # response back letting the send know the message was processed
+ my $replyto = $message->get_reply_to();
+ if ( $replyto->get_name() ) {
+ print "Replying to " . $message->get_reply_to()->str() . "...\n";
+
+ # create a temporary sender for the specified queue
+ my $sender = $session->create_sender($replyto);
+ my $response =
+ new qpid::messaging::Message("received by the server.");
$sender->send($response);
}
+
+ # acknowledge all messages received on this queue so far
$session->acknowledge();
- if ($count and (++$i ==$count)) {
+ if ( $count and ( ++$i == $count ) ) {
last;
}
}
+
+ # close everything to clean up
$receiver->close();
$session->close();
$connection->close();
};
if ($@) {
- $connection->close();
- die $@;
+ $connection->close();
+ die $@;
}
+__END__
+
+=head1 NAME
+
+drain - Drains messages from the specified address
+
+=head1 SYNOPSIS
+
+ Options:
+ -h, --help show this message
+ -b VALUE, --broker VALUE url of broker to connect to
+ -t VALUE, --timeout VALUE timeout in seconds to wait before exiting
+ -f, --forever ignore timeout and wait forever
+ --connection-options VALUE connection options string in the form {name1:value1, name2:value2}
+ -c VALUE, --count VALUE number of messages to read before exiting
+
+=cut
+
diff --git a/cpp/bindings/qpid/examples/perl/hello_world.pl b/cpp/bindings/qpid/examples/perl/hello_world.pl
index a96b98a002..6ec7d52f1f 100644..100755
--- a/cpp/bindings/qpid/examples/perl/hello_world.pl
+++ b/cpp/bindings/qpid/examples/perl/hello_world.pl
@@ -1,4 +1,4 @@
-#!/usr/bin/perl
+#!/usr/bin/env perl
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
@@ -21,35 +21,35 @@ use strict;
use warnings;
use Data::Dumper;
-use cqpid_perl;
+use qpid;
my $broker = ( @ARGV > 0 ) ? $ARGV[0] : "localhost:5672";
my $address = ( @ARGV > 1 ) ? $ARGV[0] : "amq.topic";
my $connectionOptions = ( @ARGV > 2 ) ? $ARGV[1] : "";
-my $connection = new cqpid_perl::Connection($broker, $connectionOptions);
+# create a connection
+my $connection = new qpid::messaging::Connection( $broker, $connectionOptions );
eval {
+ # open the connection and create a session, and both a sender a receive
$connection->open();
- my $session = $connection->createSession();
- my $receiver = $session->createReceiver($address);
- my $sender = $session->createSender($address);
+ my $session = $connection->create_session();
- $sender->send(new cqpid_perl::Message("Hello world!"));
+ my $receiver = $session->create_receiver($address);
+ my $sender = $session->create_sender($address);
- #my $duration = new cqpid_perl::Duration(1000);
- #print ">>>" . $duration->getMilliseconds() . "\n";
+ # send a simple message
+ $sender->send( new qpid::messaging::Message("Hello world!") );
- my $message = $receiver->fetch($cqpid_perl::Duration::SECOND);
+ # receive the message, fetching it directly from the broker
+ my $message = $receiver->fetch(qpid::messaging::Duration::SECOND);
- #$message->setDurable(1);
- #print "Durable: " . $message->getDurable() . "\n";
- #print Dumper($message->getProperties());
-
- print $message->getContent() . "\n";
+ # output the message content, then acknowledge it
+ print $message->get_content() . "\n";
$session->acknowledge();
+ # close the connection
$connection->close();
};
diff --git a/cpp/bindings/qpid/examples/perl/hello_xml.pl b/cpp/bindings/qpid/examples/perl/hello_xml.pl
index cebf2ceee6..8d77c4b2b8 100644..100755
--- a/cpp/bindings/qpid/examples/perl/hello_xml.pl
+++ b/cpp/bindings/qpid/examples/perl/hello_xml.pl
@@ -1,4 +1,4 @@
-#!/usr/bin/perl
+#!/usr/bin/env perl
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
@@ -20,7 +20,7 @@
use strict;
use warnings;
-use cqpid_perl;
+use qpid;
my $broker = ( @ARGV > 0 ) ? $ARGV[0] : "localhost:5672";
my $connectionOptions = ( @ARGV > 1 ) ? $ARGV[1] : "";
@@ -36,23 +36,25 @@ END
my $address = <<END;
xml-exchange; {
-create: always,
+create: always,
node: { type: topic, x-declare: { type: xml } },
link: {
x-bindings: [{ exchange: xml-exchange, key: weather, arguments: { xquery:" $query" } }]
}}
END
-
-my $connection = new cqpid_perl::Connection($broker, $connectionOptions);
+# create a connection object
+my $connection = new qpid::messaging::Connection( $broker, $connectionOptions );
eval {
+ # open the connection, then create from it a session
+ # from the session, create a receiver to handle incoming messages
$connection->open();
- my $session = $connection->createSession();
+ my $session = $connection->create_session();
+ my $receiver = $session->create_receiver($address);
- my $receiver = $session->createReceiver($address);
-
- my $message = new cqpid_perl::Message();
+ # create a message and set its contentn
+ my $message = new qpid::messaging::Message();
my $content = <<END;
<weather>
@@ -62,14 +64,19 @@ eval {
<dewpoint>35</dewpoint>
</weather>
END
-
- $message->setContent($content);
- my $sender = $session->createSender('xml-exchange/weather');
+
+ $message->set_content($content);
+
+ # create a sender for the xml-exchange/weater topic
+ # then send the message
+ my $sender = $session->create_sender('xml-exchange/weather');
$sender->send($message);
-
+
+ # wait for the response and then output it to the screen
my $response = $receiver->fetch();
- print $response->getContent() . "\n";
+ print $response->get_content() . "\n";
+ # close the connection
$connection->close();
};
diff --git a/cpp/bindings/qpid/examples/perl/map_receiver.pl b/cpp/bindings/qpid/examples/perl/map_receiver.pl
index 2e2611e38f..a538adf380 100644..100755
--- a/cpp/bindings/qpid/examples/perl/map_receiver.pl
+++ b/cpp/bindings/qpid/examples/perl/map_receiver.pl
@@ -1,4 +1,4 @@
-#! /usr/bin/perl5
+#! /usr/bin/env perl
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
@@ -21,25 +21,33 @@ use strict;
use warnings;
use Data::Dumper;
-use cqpid_perl;
+use qpid;
-my $url = ( @ARGV > 0 ) ? $ARGV[0] : "amqp:tcp:127.0.0.1:5672";
-my $address = ( @ARGV > 1 ) ? $ARGV[0] : "message_queue; {create: always}";
+my $url = ( @ARGV > 0 ) ? $ARGV[0] : "amqp:tcp:127.0.0.1:5672";
+my $address = ( @ARGV > 1 ) ? $ARGV[0] : "message_queue; {create: always}";
my $connectionOptions = ( @ARGV > 2 ) ? $ARGV[1] : "";
-my $connection = new cqpid_perl::Connection($url, $connectionOptions);
+# create a connection object
+my $connection = new qpid::messaging::Connection( $url, $connectionOptions );
eval {
+ # open the connection, then create a session from it
$connection->open();
- my $session = $connection->createSession();
- my $receiver = $session->createReceiver($address);
+ my $session = $connection->create_session();
- my $content = cqpid_perl::decodeMap($receiver->fetch());
- #my $content = cqpid_perl::decodeList($receiver->fetch());
-
+ # create a receiver for the session, subscribed the the specified queue
+ my $receiver = $session->create_receiver($address);
+ # wait for a message to appear in the queue
+ my $message = $receiver->fetch();
+
+ # display the content of the message
+ my $content = $message->get_content();
print Dumper($content);
+ # acknowledge the message, removing it from the queue
$session->acknowledge();
+
+ # close everything, cleaning up
$receiver->close();
$connection->close();
};
diff --git a/cpp/bindings/qpid/examples/perl/map_sender.pl b/cpp/bindings/qpid/examples/perl/map_sender.pl
index 4107cd48b9..27063ef780 100644..100755
--- a/cpp/bindings/qpid/examples/perl/map_sender.pl
+++ b/cpp/bindings/qpid/examples/perl/map_sender.pl
@@ -1,4 +1,4 @@
-#! /usr/bin/perl5
+#! /usr/bin/env perl
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
@@ -21,29 +21,39 @@ use strict;
use warnings;
use Data::Dumper;
-use cqpid_perl;
+use qpid;
-my $url = ( @ARGV > 0 ) ? $ARGV[0] : "amqp:tcp:127.0.0.1:5672";
-my $address = ( @ARGV > 1 ) ? $ARGV[1] : "message_queue; {create: always}";
+my $url = ( @ARGV > 0 ) ? $ARGV[0] : "amqp:tcp:127.0.0.1:5672";
+my $address = ( @ARGV > 1 ) ? $ARGV[1] : "message_queue; {create: always}";
my $connectionOptions = ( @ARGV > 2 ) ? $ARGV[2] : "";
-my $connection = new cqpid_perl::Connection($url, $connectionOptions);
+# create a new connection object
+my $connection = new qpid::messaging::Connection( $url, $connectionOptions );
eval {
- $connection->open();
-
- my $session = $connection->createSession();
- my $sender = $session->createSender($address);
-
- my $message = new cqpid_perl::Message();
- my $content = { id => 987654321,
- name => "Widget",
- percent => sprintf("%.2f", 0.99),
- colours => [ qw (red green white) ],
- };
- cqpid_perl::encode($content, $message);
- $sender->send($message, 1);
+ # open the connection and create a session
+ $connection->open();
+ my $session = $connection->create_session();
+
+ # create a sender and connect it to the supplied address string
+ my $sender = $session->create_sender($address);
+
+ # create a message and set the content to be a map of values
+ my $message = new qpid::messaging::Message();
+ my $content = {
+ id => 987654321,
+ name => "Widget",
+ percent => sprintf( "%.2f", 0.99 ),
+ colours => [qw (red green white)],
+ };
+ $message->set_content($content);
+
+ # send the message
+ $sender->send( $message, 1 );
+
+ # close the connection and session
+ $session->close();
$connection->close();
};
diff --git a/cpp/bindings/qpid/examples/perl/server.pl b/cpp/bindings/qpid/examples/perl/server.pl
index b14da565b9..be43655aeb 100644..100755
--- a/cpp/bindings/qpid/examples/perl/server.pl
+++ b/cpp/bindings/qpid/examples/perl/server.pl
@@ -1,4 +1,4 @@
-#!/usr/bin/perl
+#!/usr/bin/env perl
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
@@ -20,43 +20,64 @@
use strict;
use warnings;
-use cqpid_perl;
+use qpid;
-my $url = ( @ARGV == 1 ) ? $ARGV[0] : "amqp:tcp:127.0.0.1:5672";
-my $connectionOptions = ( @ARGV > 1 ) ? $ARGV[1] : "";
+my $url = ( @ARGV == 1 ) ? $ARGV[0] : "amqp:tcp:127.0.0.1:5672";
+my $connectionOptions = ( @ARGV > 1 ) ? $ARGV[1] : "";
-
-my $connection = new cqpid_perl::Connection($url, $connectionOptions);
+# create a connection object
+my $connection = new qpid::messaging::Connection( $url, $connectionOptions );
eval {
+
+ # connect to the broker and create a session
$connection->open();
- my $session = $connection->createSession();
+ my $session = $connection->create_session();
- my $receiver = $session->createReceiver("service_queue; {create: always}");
+ # create a receiver for accepting incoming messages
+ my $receiver = $session->create_receiver("service_queue; {create: always}");
+ # go into an infinite loop to receive messages and process them
while (1) {
+
+ # wait for the next message to be processed
my $request = $receiver->fetch();
- my $address = $request->getReplyTo();
+
+
+ # get the address for sending replies
+ # if no address was supplised then we can't really respond, so
+ # only process when one is present
+ my $address = $request->get_reply_to();
if ($address) {
- my $sender = $session->createSender($address);
- my $s = $request->getContent();
+
+ # a temporary sender for sending to the response queue
+ my $sender = $session->create_sender($address);
+ my $s = $request->get_content();
$s = uc($s);
- my $response = new cqpid_perl::Message($s);
+
+ # create the response message and send it
+ my $response = new qpid::messaging::Message($s);
$sender->send($response);
- print "Processed request: " . $request->getContent() . " -> " . $response->getContent() . "\n";
+ print "Processed request: "
+ . $request->get_content() . " -> "
+ . $response->get_content() . "\n";
+
+ # acknowledge the message since it was processed
$session->acknowledge();
}
else {
- print "Error: no reply address specified for request: " . $request->getContent() . "\n";
+ print "Error: no reply address specified for request: "
+ . $request->get_content() . "\n";
$session->reject($request);
}
}
-$connection->close();
+ # close connections to clean up
+ $session->close();
+ $connection->close();
};
if ($@) {
die $@;
}
-
diff --git a/cpp/bindings/qpid/examples/perl/spout.pl b/cpp/bindings/qpid/examples/perl/spout.pl
index 7365e732bf..d8ac860143 100644..100755
--- a/cpp/bindings/qpid/examples/perl/spout.pl
+++ b/cpp/bindings/qpid/examples/perl/spout.pl
@@ -1,4 +1,4 @@
-#!/usr/bin/perl
+#!/usr/bin/env perl
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
@@ -20,117 +20,145 @@
use strict;
use warnings;
-use cqpid_perl;
+use qpid;
use Getopt::Long;
+use Pod::Usage;
use Time::Local;
-my $url = "127.0.0.1";
+my $url = "127.0.0.1";
my $timeout = 0;
my $count = 1;
my $id = "";
my $replyto = "";
my @properties;
my @entries;
-my $content = "";
+my $content = "";
my $connectionOptions = "";
-my $address = "amq.direct";
+my $address = "amq.direct";
+my $help;
my $result = GetOptions(
- "broker|b=s" => \ $url,
- "timeout|t=i" => \ $timeout,
- "count|c=i" => \ $count,
- "id|i=s" => \ $id,
- "replyto=s" => \ $replyto,
- "property|p=s@" => \ @properties,
- "map|m=s@" => \ @entries,
- "content=s" => \ $content,
- "connection-options=s" => \ $connectionOptions,
-);
-
-
-if (! $result) {
- print "Usage: perl drain.pl [OPTIONS]\n";
+ "broker|b=s" => \$url,
+ "timeout|t=i" => \$timeout,
+ "count|c=i" => \$count,
+ "id|i=s" => \$id,
+ "replyto=s" => \$replyto,
+ "property|p=s@" => \@properties,
+ "map|m=s@" => \@entries,
+ "content=s" => \$content,
+ "connection-options=s" => \$connectionOptions,
+ "help|h" => \$help
+) || pod2usage( -verbose => 0 );
+
+pod2usage( -verbose => 1 ) if $help;
+
+if ( $#ARGV ge 0 ) {
+ $address = $ARGV[0];
}
-
-if ($#ARGV ge 0) {
- $address = $ARGV[0]
-}
-
-
sub setEntries {
my ($content) = @_;
foreach (@entries) {
- my ($name, $value) = split("=", $_);
+ my ( $name, $value ) = split( "=", $_ );
$content->{$name} = $value;
}
}
-
sub setProperties {
my ($message) = @_;
foreach (@properties) {
- my ($name, $value) = split("=", $_);
- $message->getProperties()->{$name} = $value;
+ my ( $name, $value ) = split( "=", $_ );
+ $message->setProperty( $name, $value );
}
}
-my $connection = new cqpid_perl::Connection($url, $connectionOptions);
+# create a connection object
+my $connection = new qpid::messaging::Connection( $url, $connectionOptions );
eval {
+ # open the connection, create a session and then a sender
$connection->open();
- my $session = $connection->createSession();
- my $sender = $session->createSender($address);
+ my $session = $connection->create_session();
+ my $sender = $session->create_sender($address);
- my $message = new cqpid_perl::Message();
+ # create a message to be sent
+ my $message = new qpid::messaging::Message();
setProperties($message) if (@properties);
if (@entries) {
my $content = {};
setEntries($content);
- cqpid_perl::encode($content, $message);
+ $message->set_content($content);
}
elsif ($content) {
- $message->setContent($content);
- $message->setContentType("text/plain");
+ $message->set_content($content);
+ $message->set_content_type("text/plain");
}
+ # if a reply-to address was supplied, then create a receiver from the
+ # session and wait for a response to be sent
my $receiver;
if ($replyto) {
- my $responseQueue = new cqpid_perl::Address($replyto);
- $receiver = $session->createReceiver($responseQueue);
- $message->setReplyTo($responseQueue);
+ my $responseQueue = new qpid::messaging::Address($replyto);
+ $receiver = $session->create_receiver($responseQueue);
+ $message->set_reply_to($responseQueue);
}
my $start = localtime;
- my @s = split(/[:\s]/, $start);
- my $s = "$s[3]$s[4]$s[5]";
- my $n = $s;
-
- for (my $i = 0;
- ($i < $count || $count == 0) and
- ($timeout == 0 || abs($n - $s) < $timeout);
- $i++) {
+ my @s = split( /[:\s]/, $start );
+ my $s = "$s[3]$s[4]$s[5]";
+ my $n = $s;
+
+ for (
+ my $i = 0 ;
+ ( $i < $count || $count == 0 )
+ and ( $timeout == 0 || abs( $n - $s ) < $timeout ) ;
+ $i++
+ )
+ {
$sender->send($message);
if ($receiver) {
+ print "Waiting for a response.\n";
my $response = $receiver->fetch();
- print "$i -> " . $response->getContent() . "\n";
+ print "$i -> " . $response->get_content() . "\n";
}
my $now = localtime;
- my @n = split(/[:\s]/, $now);
- my $n = "$n[3]$n[4]$n[5]";
+ my @n = split( /[:\s]/, $now );
+ my $n = "$n[3]$n[4]$n[5]";
}
$session->sync();
$connection->close();
};
if ($@) {
- $connection->close();
- die $@;
+ $connection->close();
+ die $@;
}
+__END__
+
+=head1 NAME
+
+spout - Send messages to the specified address
+
+=head1 SYNOPSIS
+
+ Usage: spout [OPTIONS] ADDRESS
+
+ Options:
+ -h, --help show this message
+ -b VALUE, --broker VALUE url of broker to connect to
+ -t VALUE, --timeout VALUE exit after the specified time
+ -c VALUE, --count VALUE stop after count messageshave been sent, zero disables
+ -i VALUE, --id VALUE use the supplied id instead of generating one
+ --replyto VALUE specify reply-to value
+ -P VALUE, --property VALUE specify message property
+ -M VALUE, --map VALUE specify entry for map content
+ --content VALUE specify textual content
+ --connection-options VALUE connection options string in the form {name1:value1, name2:value2}
+=cut