diff options
Diffstat (limited to 'qpid/cpp/bindings/qpid/examples')
-rwxr-xr-x | qpid/cpp/bindings/qpid/examples/perl/client.pl | 76 | ||||
-rwxr-xr-x | qpid/cpp/bindings/qpid/examples/perl/drain.pl | 154 | ||||
-rwxr-xr-x | qpid/cpp/bindings/qpid/examples/perl/hello_world.pl | 56 | ||||
-rwxr-xr-x | qpid/cpp/bindings/qpid/examples/perl/hello_xml.pl | 83 | ||||
-rwxr-xr-x | qpid/cpp/bindings/qpid/examples/perl/map_receiver.pl | 55 | ||||
-rwxr-xr-x | qpid/cpp/bindings/qpid/examples/perl/map_sender.pl | 60 | ||||
-rwxr-xr-x | qpid/cpp/bindings/qpid/examples/perl/server.pl | 83 | ||||
-rwxr-xr-x | qpid/cpp/bindings/qpid/examples/perl/spout.pl | 169 | ||||
-rwxr-xr-x | qpid/cpp/bindings/qpid/examples/python/console | 99 | ||||
-rwxr-xr-x | qpid/cpp/bindings/qpid/examples/python/drain | 102 | ||||
-rwxr-xr-x | qpid/cpp/bindings/qpid/examples/python/hello | 56 | ||||
-rwxr-xr-x | qpid/cpp/bindings/qpid/examples/python/hello_xml | 81 | ||||
-rwxr-xr-x | qpid/cpp/bindings/qpid/examples/python/server | 100 | ||||
-rwxr-xr-x | qpid/cpp/bindings/qpid/examples/python/spout | 139 | ||||
-rw-r--r-- | qpid/cpp/bindings/qpid/examples/python/statistics.py | 139 |
15 files changed, 1452 insertions, 0 deletions
diff --git a/qpid/cpp/bindings/qpid/examples/perl/client.pl b/qpid/cpp/bindings/qpid/examples/perl/client.pl new file mode 100755 index 0000000000..ee7bc6cd53 --- /dev/null +++ b/qpid/cpp/bindings/qpid/examples/perl/client.pl @@ -0,0 +1,76 @@ +#!/usr/bin/env perl +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +use strict; +use warnings; + +use qpid; + +my $url = ( @ARGV == 1 ) ? $ARGV[0] : "amqp:tcp:127.0.0.1:5672"; +my $connectionOptions = ( @ARGV > 1 ) ? $ARGV[1] : ""; + +# creates a new connection instance +my $connection = new qpid::messaging::Connection( $url, $connectionOptions ); + +eval { + # open the connection and create a session for interacting with it + $connection->open(); + + my $session = $connection->create_session(); + my $sender = $session->create_sender("service_queue"); + + # create an address and receiver for incoming messages + # the queue will be created always, and will be deleted + # when the receive disconnects + my $receiver = $session->create_receiver("#"); + my $responseQueue = $receiver->get_address(); + # Now send some messages... + + my @s = ( + "Twas brillig, and the slithy toves", + "Did gire and gymble in the wabe.", + "All mimsy were the borogroves,", + "And the mome raths outgrabe." + ); + + # create the message object, and set a reply-to address + # so that the server knows where to send responses + # the message object will be reused to send each line + my $request = new qpid::messaging::Message(); + $request->set_reply_to($responseQueue); + for ( my $i = 0 ; $i < 4 ; $i++ ) { + $request->set_content( $s[$i] ); + $sender->send($request); + + # wait for the response to the last line sent + # the message will be taken directly from the + # broker's queue rather than waiting for it + # to be queued locally + my $response = $receiver->fetch(); + print $request->get_content() . " -> " + . $response->get_content() . "\n"; + } + + # close the connection + $connection->close(); +}; + +if ($@) { + die $@; +} diff --git a/qpid/cpp/bindings/qpid/examples/perl/drain.pl b/qpid/cpp/bindings/qpid/examples/perl/drain.pl new file mode 100755 index 0000000000..d0150854b2 --- /dev/null +++ b/qpid/cpp/bindings/qpid/examples/perl/drain.pl @@ -0,0 +1,154 @@ +#!/usr/bin/env perl +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +use strict; +use warnings; + +use qpid; +use Getopt::Long; +use Pod::Usage; + +my $url = "127.0.0.1"; +my $timeout = 0; +my $forever = 0; +my $count = 0; +my $connectionOptions = ""; +my $address = "amq.direct"; +my $help; + +my $result = GetOptions( + "broker|b=s" => \$url, + "timeout|t=i" => \$timeout, + "forever|f" => \$forever, + "connection-options=s" => \$connectionOptions, + "count|c=i" => \$count, + "help|h" => \$help +) || pod2usage( -verbose => 0 ); + +pod2usage( -verbose => 1 ) if $help; + +if ( $#ARGV ge 0 ) { + $address = $ARGV[0]; +} + +sub getTimeout { + + # returns either the named duration FOREVER if the + # forever cmdline argument was used, otherwise creates + # a new Duration of the specified length + return ($forever) + ? qpid::messaging::Duration::FOREVER + : new qpid::messaging::Duration( $timeout * 1000 ); +} + +sub printProperties { + my $h = shift(); + return qq[{${\(join', ',map"'$_': '$h->{$_}'",keys%$h)}}]; +} + +# create a connection object +my $connection = new qpid::messaging::Connection( $url, $connectionOptions ); + +eval { + # open the connection, then create a session and receiver + $connection->open(); + my $session = $connection->create_session(); + my $receiver = $session->create_receiver($address); + my $timeout = getTimeout(); + my $message = new qpid::messaging::Message(); + my $i = 0; + + for ( ; ; ) { + eval { $message = $receiver->fetch($timeout); }; + + if ($@) { + last; + } + + # check if the message was on that was redelivered + my $redelivered = + ( $message->get_redelivered ) ? "redelivered=True, " : ""; + print "Message(" + . $redelivered + . "properties=" + . printProperties( $message->get_properties() ) + . ", content='"; + + # if the message content was a map, then we will print + # it out as a series of name => value pairs + my $content = $message->get_content_object; + if ( $message->get_content_type() eq "amqp/map" ) { + map { print "\n$_ => $content->{$_}"; } keys %{$content}; + } + else { + # it's not a map, so just print the content as a string + print $content; + } + print "')\n"; + + # if the message had a reply-to address, then we'll send a + # response back letting the send know the message was processed + my $replyto = $message->get_reply_to(); + if ( $replyto->get_name() ) { + print "Replying to " . $message->get_reply_to()->str() . "...\n"; + + # create a temporary sender for the specified queue + my $sender = $session->create_sender($replyto); + my $response = + new qpid::messaging::Message("received by the server."); + $sender->send($response); + } + + # acknowledge all messages received on this queue so far + $session->acknowledge(); + + if ( $count and ( ++$i == $count ) ) { + last; + } + } + + # close everything to clean up + $receiver->close(); + $session->close(); + $connection->close(); +}; + +if ($@) { + $connection->close(); + die $@; +} + +__END__ + +=head1 NAME + +drain - Drains messages from the specified address + +=head1 SYNOPSIS + + Options: + -h, --help show this message + -b VALUE, --broker VALUE url of broker to connect to + -t VALUE, --timeout VALUE timeout in seconds to wait before exiting + -f, --forever ignore timeout and wait forever + --connection-options VALUE connection options string in the form {name1:value1, name2:value2} + -c VALUE, --count VALUE number of messages to read before exiting + +=cut + diff --git a/qpid/cpp/bindings/qpid/examples/perl/hello_world.pl b/qpid/cpp/bindings/qpid/examples/perl/hello_world.pl new file mode 100755 index 0000000000..917038f0ef --- /dev/null +++ b/qpid/cpp/bindings/qpid/examples/perl/hello_world.pl @@ -0,0 +1,56 @@ +#!/usr/bin/env perl +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +use strict; +use warnings; +use Data::Dumper; + +use qpid; + +my $broker = ( @ARGV > 0 ) ? $ARGV[0] : "localhost:5672"; +my $address = ( @ARGV > 1 ) ? $ARGV[1] : "amq.topic"; +my $connectionOptions = ( @ARGV > 2 ) ? $ARGV[2] : ""; + +# create a connection +my $connection = new qpid::messaging::Connection( $broker, $connectionOptions ); + +eval { + # open the connection and create a session, and both a sender a receive + $connection->open(); + + my $session = $connection->create_session(); + + my $receiver = $session->create_receiver($address); + my $sender = $session->create_sender($address); + + # send a simple message + $sender->send( new qpid::messaging::Message("Hello world!") ); + + # receive the message, fetching it directly from the broker + my $message = $receiver->fetch(qpid::messaging::Duration::SECOND, 1); + + # output the message content, then acknowledge it + print $message->get_content() . "\n"; + $session->acknowledge(); + + # close the connection + $connection->close(); +}; + +die $@ if ($@); diff --git a/qpid/cpp/bindings/qpid/examples/perl/hello_xml.pl b/qpid/cpp/bindings/qpid/examples/perl/hello_xml.pl new file mode 100755 index 0000000000..8d77c4b2b8 --- /dev/null +++ b/qpid/cpp/bindings/qpid/examples/perl/hello_xml.pl @@ -0,0 +1,83 @@ +#!/usr/bin/env perl +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +use strict; +use warnings; + +use qpid; + +my $broker = ( @ARGV > 0 ) ? $ARGV[0] : "localhost:5672"; +my $connectionOptions = ( @ARGV > 1 ) ? $ARGV[1] : ""; + +my $query = <<END; + let \$w := ./weather + return \$w/station = 'Raleigh-Durham International Airport (KRDU)' + and \$w/temperature_f > 50 + and \$w/temperature_f - \$w/dewpoint > 5 + and \$w/wind_speed_mph > 7 + and \$w/wind_speed_mph < 20 +END + +my $address = <<END; +xml-exchange; { +create: always, +node: { type: topic, x-declare: { type: xml } }, +link: { +x-bindings: [{ exchange: xml-exchange, key: weather, arguments: { xquery:" $query" } }] +}} +END + +# create a connection object +my $connection = new qpid::messaging::Connection( $broker, $connectionOptions ); + +eval { + # open the connection, then create from it a session + # from the session, create a receiver to handle incoming messages + $connection->open(); + my $session = $connection->create_session(); + my $receiver = $session->create_receiver($address); + + # create a message and set its contentn + my $message = new qpid::messaging::Message(); + + my $content = <<END; + <weather> + <station>Raleigh-Durham International Airport (KRDU)</station> + <wind_speed_mph>16</wind_speed_mph> + <temperature_f>70</temperature_f> + <dewpoint>35</dewpoint> + </weather> +END + + $message->set_content($content); + + # create a sender for the xml-exchange/weater topic + # then send the message + my $sender = $session->create_sender('xml-exchange/weather'); + $sender->send($message); + + # wait for the response and then output it to the screen + my $response = $receiver->fetch(); + print $response->get_content() . "\n"; + + # close the connection + $connection->close(); +}; + +die $@ if ($@); diff --git a/qpid/cpp/bindings/qpid/examples/perl/map_receiver.pl b/qpid/cpp/bindings/qpid/examples/perl/map_receiver.pl new file mode 100755 index 0000000000..21b1cb09f2 --- /dev/null +++ b/qpid/cpp/bindings/qpid/examples/perl/map_receiver.pl @@ -0,0 +1,55 @@ +#! /usr/bin/env perl +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +use strict; +use warnings; +use Data::Dumper; + +use qpid; + +my $url = ( @ARGV > 0 ) ? $ARGV[0] : "amqp:tcp:127.0.0.1:5672"; +my $address = ( @ARGV > 1 ) ? $ARGV[1] : "message_queue; {create: always}"; +my $connectionOptions = ( @ARGV > 2 ) ? $ARGV[2] : ""; + +# create a connection object +my $connection = new qpid::messaging::Connection( $url, $connectionOptions ); + +eval { + # open the connection, then create a session from it + $connection->open(); + my $session = $connection->create_session(); + + # create a receiver for the session, subscribed the the specified queue + my $receiver = $session->create_receiver($address); + # wait for a message to appear in the queue + my $message = $receiver->fetch(); + + # display the content of the message + my $content = $message->get_content(); + print Dumper($content); + + # acknowledge the message, removing it from the queue + $session->acknowledge(); + + # close everything, cleaning up + $receiver->close(); + $connection->close(); +}; + +die $@ if ($@); diff --git a/qpid/cpp/bindings/qpid/examples/perl/map_sender.pl b/qpid/cpp/bindings/qpid/examples/perl/map_sender.pl new file mode 100755 index 0000000000..27063ef780 --- /dev/null +++ b/qpid/cpp/bindings/qpid/examples/perl/map_sender.pl @@ -0,0 +1,60 @@ +#! /usr/bin/env perl +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +use strict; +use warnings; +use Data::Dumper; + +use qpid; + +my $url = ( @ARGV > 0 ) ? $ARGV[0] : "amqp:tcp:127.0.0.1:5672"; +my $address = ( @ARGV > 1 ) ? $ARGV[1] : "message_queue; {create: always}"; +my $connectionOptions = ( @ARGV > 2 ) ? $ARGV[2] : ""; + +# create a new connection object +my $connection = new qpid::messaging::Connection( $url, $connectionOptions ); + +eval { + + # open the connection and create a session + $connection->open(); + my $session = $connection->create_session(); + + # create a sender and connect it to the supplied address string + my $sender = $session->create_sender($address); + + # create a message and set the content to be a map of values + my $message = new qpid::messaging::Message(); + my $content = { + id => 987654321, + name => "Widget", + percent => sprintf( "%.2f", 0.99 ), + colours => [qw (red green white)], + }; + $message->set_content($content); + + # send the message + $sender->send( $message, 1 ); + + # close the connection and session + $session->close(); + $connection->close(); +}; + +die $@ if ($@); diff --git a/qpid/cpp/bindings/qpid/examples/perl/server.pl b/qpid/cpp/bindings/qpid/examples/perl/server.pl new file mode 100755 index 0000000000..be43655aeb --- /dev/null +++ b/qpid/cpp/bindings/qpid/examples/perl/server.pl @@ -0,0 +1,83 @@ +#!/usr/bin/env perl +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +use strict; +use warnings; + +use qpid; + +my $url = ( @ARGV == 1 ) ? $ARGV[0] : "amqp:tcp:127.0.0.1:5672"; +my $connectionOptions = ( @ARGV > 1 ) ? $ARGV[1] : ""; + +# create a connection object +my $connection = new qpid::messaging::Connection( $url, $connectionOptions ); + +eval { + + # connect to the broker and create a session + $connection->open(); + my $session = $connection->create_session(); + + # create a receiver for accepting incoming messages + my $receiver = $session->create_receiver("service_queue; {create: always}"); + + # go into an infinite loop to receive messages and process them + while (1) { + + # wait for the next message to be processed + my $request = $receiver->fetch(); + + + # get the address for sending replies + # if no address was supplised then we can't really respond, so + # only process when one is present + my $address = $request->get_reply_to(); + if ($address) { + + # a temporary sender for sending to the response queue + my $sender = $session->create_sender($address); + my $s = $request->get_content(); + $s = uc($s); + + # create the response message and send it + my $response = new qpid::messaging::Message($s); + $sender->send($response); + print "Processed request: " + . $request->get_content() . " -> " + . $response->get_content() . "\n"; + + # acknowledge the message since it was processed + $session->acknowledge(); + } + else { + print "Error: no reply address specified for request: " + . $request->get_content() . "\n"; + $session->reject($request); + } + } + + # close connections to clean up + $session->close(); + $connection->close(); +}; + +if ($@) { + die $@; +} + diff --git a/qpid/cpp/bindings/qpid/examples/perl/spout.pl b/qpid/cpp/bindings/qpid/examples/perl/spout.pl new file mode 100755 index 0000000000..c97c2a58af --- /dev/null +++ b/qpid/cpp/bindings/qpid/examples/perl/spout.pl @@ -0,0 +1,169 @@ +#!/usr/bin/env perl +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +use strict; +use warnings; + +use qpid; +use Getopt::Long; +use Pod::Usage; +use Time::Local; + +my $url = "127.0.0.1"; +my $timeout = 0; +my $count = 1; +my $durable = 0; +my $id = ""; +my $replyto = ""; +my @properties; +my @entries; +my $content = ""; +my $connectionOptions = ""; +my $address = "amq.direct"; +my $help; + +my $result = GetOptions( + "broker|b=s" => \$url, + "timeout|t=i" => \$timeout, + "count|c=i" => \$count, + "durable|d" => \$durable, + "id|i=s" => \$id, + "replyto=s" => \$replyto, + "property|p=s@" => \@properties, + "map|m=s@" => \@entries, + "content=s" => \$content, + "connection-options=s" => \$connectionOptions, + "help|h" => \$help +) || pod2usage( -verbose => 0 ); + +pod2usage( -verbose => 1 ) if $help; + +if ( $#ARGV ge 0 ) { + $address = $ARGV[0]; +} + +sub setEntries { + my ($content) = @_; + + foreach (@entries) { + my ( $name, $value ) = split( "=", $_ ); + $content->{$name} = $value; + } +} + +sub setProperties { + my ($message) = @_; + + foreach (@properties) { + my ( $name, $value ) = split( "=", $_ ); + $message->set_property( $name, $value ); + } +} + +# create a connection object +my $connection = new qpid::messaging::Connection( $url, $connectionOptions ); + +eval { + # open the connection, create a session and then a sender + $connection->open(); + my $session = $connection->create_session(); + my $sender = $session->create_sender($address); + + # create a message to be sent + my $message = new qpid::messaging::Message(); + setProperties($message) if (@properties); + if (@entries) { + my $content = {}; + setEntries($content); + $message->set_content_object($content); + } + elsif ($content) { + $message->set_content($content); + $message->set_content_type("text/plain"); + } + + # set durable flag + $message->set_durable($durable); + + # if a reply-to address was supplied, then create a receiver from the + # session and wait for a response to be sent + my $receiver; + if ($replyto) { + my $responseQueue = new qpid::messaging::Address($replyto); + $receiver = $session->create_receiver($responseQueue); + $message->set_reply_to($responseQueue); + } + + my $start = localtime; + my @s = split( /[:\s]/, $start ); + my $s = "$s[3]$s[4]$s[5]"; + my $n = $s; + + for ( + my $i = 0 ; + ( $i < $count || $count == 0 ) + and ( $timeout == 0 || abs( $n - $s ) < $timeout ) ; + $i++ + ) + { + + $sender->send($message); + + if ($receiver) { + print "Waiting for a response.\n"; + my $response = $receiver->fetch(); + print "$i -> " . $response->get_content() . "\n"; + } + + my $now = localtime; + my @n = split( /[:\s]/, $now ); + my $n = "$n[3]$n[4]$n[5]"; + } + $session->sync(); + $connection->close(); +}; + +if ($@) { + $connection->close(); + die $@; +} + +__END__ + +=head1 NAME + +spout - Send messages to the specified address + +=head1 SYNOPSIS + + Usage: spout [OPTIONS] ADDRESS + + Options: + -h, --help show this message + -b VALUE, --broker VALUE url of broker to connect to + -t VALUE, --timeout VALUE exit after the specified time + -c VALUE, --count VALUE stop after count messageshave been sent, zero disables + -i VALUE, --id VALUE use the supplied id instead of generating one + --replyto VALUE specify reply-to value + -P VALUE, --property VALUE specify message property + -M VALUE, --map VALUE specify entry for map content + --content VALUE specify textual content + --connection-options VALUE connection options string in the form {name1:value1, name2:value2} + +=cut diff --git a/qpid/cpp/bindings/qpid/examples/python/console b/qpid/cpp/bindings/qpid/examples/python/console new file mode 100755 index 0000000000..2facc368c3 --- /dev/null +++ b/qpid/cpp/bindings/qpid/examples/python/console @@ -0,0 +1,99 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import optparse, sys, traceback + +try: + from qpid_messaging import * +except: + from qpid.messaging import * + +parser = optparse.OptionParser(usage="usage: %prog [options] ADDRESS ...", + description="handle requests from the supplied address.") +parser.add_option("-b", "--broker", default="localhost", + help="connect to specified BROKER (default %default)") +parser.add_option("-r", "--reconnect", action="store_true", + help="enable auto reconnect") +parser.add_option("-i", "--reconnect-interval", type="float", default=3, + help="interval between reconnect attempts") +parser.add_option("-l", "--reconnect-limit", type="int", default=10, + help="maximum number of reconnect attempts") +parser.add_option("-v", dest="verbose", action="store_true", + help="enable logging") + +opts, args = parser.parse_args() + +if args: + addr = args.pop(0) +else: + parser.error("address is required") + +conn = Connection(opts.broker, + reconnect=opts.reconnect, + reconnect_interval=opts.reconnect_interval, + reconnect_limit=opts.reconnect_limit) + +try: + conn.open() + session = conn.session() + sender = session.sender(addr) + response_queue = "response-queue;{create:always}" + receiver = session.receiver(response_queue) + receiver.capacity = 10 + + while True: + cmdtype = None + data = None + input = raw_input("Type (eval/shell/exit, ENTER=shell):") + if input != "exit": + if input == "eval": + cmdtype = input + data = raw_input("Text to evaluate: ") + elif input == "shell" or input == "": + cmdtype = "shell" + data = raw_input("Shell cmd: ") + + if cmdtype != None and data != "": + msg = Message() + msg.properties["type"] = cmdtype + # TODO: fix this + # msg.setProperty("type", cmdtype) + msg.content = data + msg.reply_to = response_queue + try: + sender.send(msg) + response = receiver.fetch() + print "Response:" + print "%s" % response.content + session.acknowledge(response) + except SendError, e: + print e + else: + break + if sender is not None: + sender.close() + if receiver is not None: + receiver.close() +except ReceiverError, e: + print e +except KeyboardInterrupt: + pass + +conn.close() diff --git a/qpid/cpp/bindings/qpid/examples/python/drain b/qpid/cpp/bindings/qpid/examples/python/drain new file mode 100755 index 0000000000..2b15a50500 --- /dev/null +++ b/qpid/cpp/bindings/qpid/examples/python/drain @@ -0,0 +1,102 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import optparse + +try: + from qpid_messaging import * +except: + from qpid.messaging import * + +from qpid.util import URL +from qpid.log import enable, DEBUG, WARN + +parser = optparse.OptionParser(usage="usage: %prog [options] ADDRESS ...", + description="Drain messages from the supplied address.") +parser.add_option("-b", "--broker", default="localhost", + help="connect to specified BROKER (default %default)") +parser.add_option("-c", "--count", type="int", + help="number of messages to drain") +parser.add_option("-f", "--forever", action="store_true", + help="ignore timeout and wait forever") +parser.add_option("-r", "--reconnect", action="store_true", + help="enable auto reconnect") +parser.add_option("-i", "--reconnect-interval", type="float", default=3, + help="interval between reconnect attempts") +parser.add_option("-l", "--reconnect-limit", type="int", + help="maximum number of reconnect attempts") +parser.add_option("-t", "--timeout", type="float", default=0, + help="timeout in seconds to wait before exiting (default %default)") +parser.add_option("-p", "--print", dest="format", default="%(M)s", + help="format string for printing messages (default %default)") +parser.add_option("-v", dest="verbose", action="store_true", + help="enable logging") + +opts, args = parser.parse_args() + +if opts.verbose: + enable("qpid", DEBUG) +else: + enable("qpid", WARN) + +if args: + addr = args.pop(0) +else: + parser.error("address is required") +if opts.forever: + timeout = None +else: + timeout = opts.timeout + +class Formatter: + + def __init__(self, message): + self.message = message + self.environ = {"M": self.message, + "P": self.message.properties, + "C": self.message.content} + + def __getitem__(self, st): + return eval(st, self.environ) + +conn = Connection(opts.broker, + reconnect=opts.reconnect, + reconnect_interval=opts.reconnect_interval, + reconnect_limit=opts.reconnect_limit) +try: + conn.open() + ssn = conn.session() + rcv = ssn.receiver(addr) + + count = 0 + while not opts.count or count < opts.count: + try: + msg = rcv.fetch(timeout=timeout) + print opts.format % Formatter(msg) + count += 1 + ssn.acknowledge() + except Empty: + break +except ReceiverError, e: + print e +except KeyboardInterrupt: + pass + +conn.close() diff --git a/qpid/cpp/bindings/qpid/examples/python/hello b/qpid/cpp/bindings/qpid/examples/python/hello new file mode 100755 index 0000000000..52ea955093 --- /dev/null +++ b/qpid/cpp/bindings/qpid/examples/python/hello @@ -0,0 +1,56 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import sys + +try: + from qpid_messaging import * +except: + from qpid.messaging import * + +if len(sys.argv)<2: + broker = "localhost:5672" +else: + broker = sys.argv[1] + +if len(sys.argv)<3: + address = "amq.topic" +else: + address = sys.argv[2] + +connection = Connection(broker) + +try: + connection.open() + session = connection.session() + + sender = session.sender(address) + receiver = session.receiver(address) + + sender.send(Message("Hello world!")); + + message = receiver.fetch() + print message.content + session.acknowledge() + +except MessagingError,m: + print m + +connection.close() diff --git a/qpid/cpp/bindings/qpid/examples/python/hello_xml b/qpid/cpp/bindings/qpid/examples/python/hello_xml new file mode 100755 index 0000000000..05fa5cc7ba --- /dev/null +++ b/qpid/cpp/bindings/qpid/examples/python/hello_xml @@ -0,0 +1,81 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import sys + +try: + from qpid_messaging import * +except: + from qpid.messaging import * + +broker = "localhost:5672" +connection = Connection(broker) + +try: + connection.open() + session = connection.session() + +# Set up the receiver + query = """ + let $w := ./weather + return $w/station = 'Raleigh-Durham International Airport (KRDU)' + and $w/temperature_f > 50 + and $w/temperature_f - $w/dewpoint > 5 + and $w/wind_speed_mph > 7 + and $w/wind_speed_mph < 20 """ + +# query="./weather" + + address = """ + xml; { + create: always, + node:{ type: queue }, + link: { + x-bindings: [{ exchange: xml, key: weather, arguments: { xquery: %r} }] + } + } + """ % query + + receiver = session.receiver(address) + +# Send an observation + + observations = """ + <weather> + <station>Raleigh-Durham International Airport (KRDU)</station> + <wind_speed_mph>16</wind_speed_mph> + <temperature_f>70</temperature_f> + <dewpoint>35</dewpoint> + </weather> """ + + message = Message(subject="weather", content=observations) + sender = session.sender("xml") + sender.send(message) + +# Retrieve matching message from the receiver and print it + + message = receiver.fetch(timeout=1) + print message.content + session.acknowledge() + +except MessagingError,m: + print m + +connection.close() diff --git a/qpid/cpp/bindings/qpid/examples/python/server b/qpid/cpp/bindings/qpid/examples/python/server new file mode 100755 index 0000000000..fb87951bad --- /dev/null +++ b/qpid/cpp/bindings/qpid/examples/python/server @@ -0,0 +1,100 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import optparse, sys, traceback + +try: + from qpid_messaging import * +except: + from qpid.messaging import * + +from qpid.util import URL +from subprocess import Popen, STDOUT, PIPE +from qpid.log import enable, DEBUG, WARN + +parser = optparse.OptionParser(usage="usage: %prog [options] ADDRESS ...", + description="handle requests from the supplied address.") +parser.add_option("-b", "--broker", default="localhost", + help="connect to specified BROKER (default %default)") +parser.add_option("-r", "--reconnect", action="store_true", + help="enable auto reconnect") +parser.add_option("-i", "--reconnect-interval", type="float", default=3, + help="interval between reconnect attempts") +parser.add_option("-l", "--reconnect-limit", type="int", + help="maximum number of reconnect attempts") +parser.add_option("-v", dest="verbose", action="store_true", + help="enable logging") + +opts, args = parser.parse_args() + +if opts.verbose: + enable("qpid", DEBUG) +else: + enable("qpid", WARN) + +if args: + addr = args.pop(0) +else: + parser.error("address is required") + +conn = Connection(opts.broker, + reconnect=opts.reconnect, + reconnect_interval=opts.reconnect_interval, + reconnect_limit=opts.reconnect_limit) +def dispatch(msg): + msg_type = msg.properties.get("type") + if msg_type == "shell": + proc = Popen(msg.content, shell=True, stderr=STDOUT, stdin=PIPE, stdout=PIPE) + output, _ = proc.communicate() + result = Message(output) + result.properties["exit"] = proc.returncode + elif msg_type == "eval": + try: + content = str(eval(msg.content)) + except: + content = traceback.format_exc() + result = Message(content = content) + else: + result = Message("unrecognized message type: %s" % msg_type) + return result + +try: + conn.open() + ssn = conn.session() + rcv = ssn.receiver(addr) + + while True: + msg = rcv.fetch() + response = dispatch(msg) + snd = None + try: + snd = ssn.sender(msg.reply_to) + snd.send(response) + except SendError, e: + print e + if snd is not None: + snd.close() + ssn.acknowledge() +except ReceiverError, e: + print e +except KeyboardInterrupt: + pass + +conn.close() diff --git a/qpid/cpp/bindings/qpid/examples/python/spout b/qpid/cpp/bindings/qpid/examples/python/spout new file mode 100755 index 0000000000..48921d4387 --- /dev/null +++ b/qpid/cpp/bindings/qpid/examples/python/spout @@ -0,0 +1,139 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import optparse, time + +try: + from qpid_messaging import * + from uuid import uuid4 +except: + from qpid.messaging import * + +from qpid.util import URL +from qpid.log import enable, DEBUG, WARN + +def nameval(st): + idx = st.find("=") + if idx >= 0: + name = st[0:idx] + value = st[idx+1:] + else: + name = st + value = None + return name, value + +parser = optparse.OptionParser(usage="usage: %prog [options] ADDRESS [ CONTENT ... ]", + description="Send messages to the supplied address.") +parser.add_option("-b", "--broker", default="localhost", + help="connect to specified BROKER (default %default)") +parser.add_option("-r", "--reconnect", action="store_true", + help="enable auto reconnect") +parser.add_option("-i", "--reconnect-interval", type="float", default=3, + help="interval between reconnect attempts") +parser.add_option("-l", "--reconnect-limit", type="int", + help="maximum number of reconnect attempts") +parser.add_option("-c", "--count", type="int", default=1, + help="stop after count messages have been sent, zero disables (default %default)") +parser.add_option("-d", "--durable", action="store_true", + help="make the message persistent") +parser.add_option("-t", "--timeout", type="float", default=None, + help="exit after the specified time") +parser.add_option("-I", "--id", help="use the supplied id instead of generating one") +parser.add_option("-S", "--subject", help="specify a subject") +parser.add_option("-R", "--reply-to", help="specify reply-to address") +parser.add_option("-P", "--property", dest="properties", action="append", default=[], + metavar="NAME=VALUE", help="specify message property") +parser.add_option("-M", "--map", dest="entries", action="append", default=[], + metavar="KEY=VALUE", + help="specify map entry for message body") +parser.add_option("-v", dest="verbose", action="store_true", + help="enable logging") + +opts, args = parser.parse_args() + +if opts.verbose: + enable("qpid", DEBUG) +else: + enable("qpid", WARN) + +if opts.id is None: + spout_id = str(uuid4()) +else: + spout_id = opts.id +if args: + addr = args.pop(0) +else: + parser.error("address is required") + +content = None +content_type = None + +if args: + text = " ".join(args) +else: + text = None + +if opts.entries: + content = {} + if text: + content["text"] = text + for e in opts.entries: + name, val = nameval(e) + content[name] = val +else: + content = text + # no entries were supplied, so assume text/plain for + # compatibility with java (and other) clients + content_type = "text/plain" + +conn = Connection(opts.broker, + reconnect=opts.reconnect, + reconnect_interval=opts.reconnect_interval, + reconnect_limit=opts.reconnect_limit) +try: + conn.open() + ssn = conn.session() + snd = ssn.sender(addr) + + count = 0 + start = time.time() + while (opts.count == 0 or count < opts.count) and \ + (opts.timeout is None or time.time() - start < opts.timeout): + msg = Message(subject=opts.subject, + reply_to=opts.reply_to, + content=content) + if opts.durable: + msg.durable = True + if content_type is not None: + msg.content_type = content_type + msg.properties["spout-id"] = "%s:%s" % (spout_id, count) + for p in opts.properties: + name, val = nameval(p) + msg.properties[name] = val + + snd.send(msg) + count += 1 + print msg +except SendError, e: + print e +except KeyboardInterrupt: + pass + +conn.close() diff --git a/qpid/cpp/bindings/qpid/examples/python/statistics.py b/qpid/cpp/bindings/qpid/examples/python/statistics.py new file mode 100644 index 0000000000..089b81b740 --- /dev/null +++ b/qpid/cpp/bindings/qpid/examples/python/statistics.py @@ -0,0 +1,139 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import time + +TS = "ts" +TIME_SEC = 1000000000 +MILLISECOND = 1000 + +class Statistic: + def message(self, msg): + return + def report(self): + return "" + def header(self): + return "" + + +class Throughput(Statistic): + def __init__(self): + self.messages = 0 + self.started = False + + def message(self, m): + self.messages += 1 + if not self.started: + self.start = time.time() + self.started = True + + def header(self): + return "tp(m/s)" + + def report(self): + if self.started: + elapsed = time.time() - self.start + return str(int(self.messages/elapsed)) + else: + return "0" + + +class ThroughputAndLatency(Throughput): + def __init__(self): + Throughput.__init__(self) + self.total = 0.0 + self.min = float('inf') + self.max = -float('inf') + self.samples = 0 + + def message(self, m): + Throughput.message(self, m) + if TS in m.properties: + self.samples+=1 + latency = MILLISECOND * (time.time() - float(m.properties[TS])/TIME_SEC) + if latency > 0: + self.total += latency + if latency < self.min: + self.min = latency + if latency > self.max: + self.max = latency + + def header(self): +# Throughput.header(self) + return "%s\tl-min\tl-max\tl-avg" % Throughput.header(self) + + def report(self): + output = Throughput.report(self) + if (self.samples > 0): + output += "\t%.2f\t%.2f\t%.2f" %(self.min, self.max, self.total/self.samples) + return output + + +# Report batch and overall statistics +class ReporterBase: + def __init__(self, batch, wantHeader): + self.batchSize = batch + self.batchCount = 0 + self.headerPrinted = not wantHeader + self.overall = None + self.batch = None + + def create(self): + return + + # Count message in the statistics + def message(self, m): + if self.overall == None: + self.overall = self.create() + self.overall.message(m) + if self.batchSize: + if self.batch == None: + self.batch = self.create() + self.batch.message(m) + self.batchCount+=1 + if self.batchCount == self.batchSize: + self.header() + print self.batch.report() + self.create() + self.batchCount = 0 + + # Print overall report. + def report(self): + if self.overall == None: + self.overall = self.create() + self.header() + print self.overall.report() + + def header(self): + if not self.headerPrinted: + if self.overall == None: + self.overall = self.create() + print self.overall.header() + self.headerPrinted = True + + +class Reporter(ReporterBase): + def __init__(self, batchSize, wantHeader, Stats): + ReporterBase.__init__(self, batchSize, wantHeader) + self.__stats = Stats + + def create(self): + ClassName = self.__stats.__class__ + return ClassName() |