summaryrefslogtreecommitdiff
path: root/lib/DBD/Gofer
diff options
context:
space:
mode:
authorLorry Tar Creator <lorry-tar-importer@baserock.org>2012-06-06 16:41:29 +0000
committerLorry <lorry@roadtrain.codethink.co.uk>2012-09-26 13:46:50 +0000
commit7c48e67cf07ee41bfde7139a62bb232bd23a4a48 (patch)
tree6d7686b5075bd5cba253dabf2e6c302acb3a147c /lib/DBD/Gofer
downloadperl-dbi-tarball-master.tar.gz
Imported from /srv/lorry/lorry-area/perl-dbi-tarball/DBI-1.622.tar.gz.HEADDBI-1.622master
Diffstat (limited to 'lib/DBD/Gofer')
-rw-r--r--lib/DBD/Gofer/Policy/Base.pm162
-rw-r--r--lib/DBD/Gofer/Policy/classic.pm79
-rw-r--r--lib/DBD/Gofer/Policy/pedantic.pm53
-rw-r--r--lib/DBD/Gofer/Policy/rush.pm90
-rw-r--r--lib/DBD/Gofer/Transport/Base.pm410
-rw-r--r--lib/DBD/Gofer/Transport/corostream.pm144
-rw-r--r--lib/DBD/Gofer/Transport/null.pm111
-rw-r--r--lib/DBD/Gofer/Transport/pipeone.pm253
-rw-r--r--lib/DBD/Gofer/Transport/stream.pm292
9 files changed, 1594 insertions, 0 deletions
diff --git a/lib/DBD/Gofer/Policy/Base.pm b/lib/DBD/Gofer/Policy/Base.pm
new file mode 100644
index 0000000..1725b03
--- /dev/null
+++ b/lib/DBD/Gofer/Policy/Base.pm
@@ -0,0 +1,162 @@
+package DBD::Gofer::Policy::Base;
+
+# $Id: Base.pm 10087 2007-10-16 12:42:37Z timbo $
+#
+# Copyright (c) 2007, Tim Bunce, Ireland
+#
+# You may distribute under the terms of either the GNU General Public
+# License or the Artistic License, as specified in the Perl README file.
+
+use strict;
+use warnings;
+use Carp;
+
+our $VERSION = sprintf("0.%06d", q$Revision: 10087 $ =~ /(\d+)/o);
+our $AUTOLOAD;
+
+my %policy_defaults = (
+ # force connect method (unless overridden by go_connect_method=>'...' attribute)
+ # if false: call same method on client as on server
+ connect_method => 'connect',
+ # force prepare method (unless overridden by go_prepare_method=>'...' attribute)
+ # if false: call same method on client as on server
+ prepare_method => 'prepare',
+ skip_connect_check => 0,
+ skip_default_methods => 0,
+ skip_prepare_check => 0,
+ skip_ping => 0,
+ dbh_attribute_update => 'every',
+ dbh_attribute_list => ['*'],
+ locally_quote => 0,
+ locally_quote_identifier => 0,
+ cache_parse_trace_flags => 1,
+ cache_parse_trace_flag => 1,
+ cache_data_sources => 1,
+ cache_type_info_all => 1,
+ cache_tables => 0,
+ cache_table_info => 0,
+ cache_column_info => 0,
+ cache_primary_key_info => 0,
+ cache_foreign_key_info => 0,
+ cache_statistics_info => 0,
+ cache_get_info => 0,
+ cache_func => 0,
+);
+
+my $base_policy_file = $INC{"DBD/Gofer/Policy/Base.pm"};
+
+__PACKAGE__->create_policy_subs(\%policy_defaults);
+
+sub create_policy_subs {
+ my ($class, $policy_defaults) = @_;
+
+ while ( my ($policy_name, $policy_default) = each %$policy_defaults) {
+ my $policy_attr_name = "go_$policy_name";
+ my $sub = sub {
+ # $policy->foo($attr, ...)
+ #carp "$policy_name($_[1],...)";
+ # return the policy default value unless an attribute overrides it
+ return (ref $_[1] && exists $_[1]->{$policy_attr_name})
+ ? $_[1]->{$policy_attr_name}
+ : $policy_default;
+ };
+ no strict 'refs';
+ *{$class . '::' . $policy_name} = $sub;
+ }
+}
+
+sub AUTOLOAD {
+ carp "Unknown policy name $AUTOLOAD used";
+ # only warn once
+ no strict 'refs';
+ *$AUTOLOAD = sub { undef };
+ return undef;
+}
+
+sub new {
+ my ($class, $args) = @_;
+ my $policy = {};
+ bless $policy, $class;
+}
+
+sub DESTROY { };
+
+1;
+
+=head1 NAME
+
+DBD::Gofer::Policy::Base - Base class for DBD::Gofer policies
+
+=head1 SYNOPSIS
+
+ $dbh = DBI->connect("dbi:Gofer:transport=...;policy=...", ...)
+
+=head1 DESCRIPTION
+
+DBD::Gofer can be configured via a 'policy' mechanism that allows you to
+fine-tune the number of round-trips to the Gofer server. The policies are
+grouped into classes (which may be subclassed) and referenced by the name of
+the class.
+
+The L<DBD::Gofer::Policy::Base> class is the base class for all the policy
+classes and describes all the individual policy items.
+
+The Base policy is not used directly. You should use a policy class derived from it.
+
+=head1 POLICY CLASSES
+
+Three policy classes are supplied with DBD::Gofer:
+
+L<DBD::Gofer::Policy::pedantic> is most 'transparent' but slowest because it
+makes more round-trips to the Gofer server.
+
+L<DBD::Gofer::Policy::classic> is a reasonable compromise - it's the default policy.
+
+L<DBD::Gofer::Policy::rush> is fastest, but may require code changes in your applications.
+
+Generally the default C<classic> policy is fine. When first testing an existing
+application with Gofer it is a good idea to start with the C<pedantic> policy
+first and then switch to C<classic> or a custom policy, for final testing.
+
+=head1 POLICY ITEMS
+
+These are temporary docs: See the source code for list of policies and their defaults.
+
+In a future version the policies and their defaults will be defined in the pod and parsed out at load-time.
+
+See the source code to this module for more details.
+
+=head1 POLICY CUSTOMIZATION
+
+XXX This area of DBD::Gofer is subject to change.
+
+There are three ways to customize policies:
+
+Policy classes are designed to influence the overall behaviour of DBD::Gofer
+with existing, unaltered programs, so they work in a reasonably optimal way
+without requiring code changes. You can implement new policy classes as
+subclasses of existing policies.
+
+In many cases individual policy items can be overridden on a case-by-case basis
+within your application code. You do this by passing a corresponding
+C<<go_<policy_name>>> attribute into DBI methods by your application code.
+This let's you fine-tune the behaviour for special cases.
+
+The policy items are implemented as methods. In many cases the methods are
+passed parameters relating to the DBD::Gofer code being executed. This means
+the policy can implement dynamic behaviour that varies depending on the
+particular circumstances, such as the particular statement being executed.
+
+=head1 AUTHOR
+
+Tim Bunce, L<http://www.tim.bunce.name>
+
+=head1 LICENCE AND COPYRIGHT
+
+Copyright (c) 2007, Tim Bunce, Ireland. All rights reserved.
+
+This module is free software; you can redistribute it and/or
+modify it under the same terms as Perl itself. See L<perlartistic>.
+
+=cut
+
diff --git a/lib/DBD/Gofer/Policy/classic.pm b/lib/DBD/Gofer/Policy/classic.pm
new file mode 100644
index 0000000..8f828f0
--- /dev/null
+++ b/lib/DBD/Gofer/Policy/classic.pm
@@ -0,0 +1,79 @@
+package DBD::Gofer::Policy::classic;
+
+# $Id: classic.pm 10087 2007-10-16 12:42:37Z timbo $
+#
+# Copyright (c) 2007, Tim Bunce, Ireland
+#
+# You may distribute under the terms of either the GNU General Public
+# License or the Artistic License, as specified in the Perl README file.
+
+use strict;
+use warnings;
+
+our $VERSION = sprintf("0.%06d", q$Revision: 10087 $ =~ /(\d+)/o);
+
+use base qw(DBD::Gofer::Policy::Base);
+
+__PACKAGE__->create_policy_subs({
+
+ # always use connect_cached on server
+ connect_method => 'connect_cached',
+
+ # use same methods on server as is called on client
+ prepare_method => '',
+
+ # don't skip the connect check since that also sets dbh attributes
+ # although this makes connect more expensive, that's partly offset
+ # by skip_ping=>1 below, which makes connect_cached very fast.
+ skip_connect_check => 0,
+
+ # most code doesn't rely on sth attributes being set after prepare
+ skip_prepare_check => 1,
+
+ # we're happy to use local method if that's the same as the remote
+ skip_default_methods => 1,
+
+ # ping is not important for DBD::Gofer and most transports
+ skip_ping => 1,
+
+ # only update dbh attributes on first contact with server
+ dbh_attribute_update => 'first',
+
+ # we'd like to set locally_* but can't because drivers differ
+
+ # get_info results usually don't change
+ cache_get_info => 1,
+});
+
+
+1;
+
+=head1 NAME
+
+DBD::Gofer::Policy::classic - The 'classic' policy for DBD::Gofer
+
+=head1 SYNOPSIS
+
+ $dbh = DBI->connect("dbi:Gofer:transport=...;policy=classic", ...)
+
+The C<classic> policy is the default DBD::Gofer policy, so need not be included in the DSN.
+
+=head1 DESCRIPTION
+
+Temporary docs: See the source code for list of policies and their defaults.
+
+In a future version the policies and their defaults will be defined in the pod and parsed out at load-time.
+
+=head1 AUTHOR
+
+Tim Bunce, L<http://www.tim.bunce.name>
+
+=head1 LICENCE AND COPYRIGHT
+
+Copyright (c) 2007, Tim Bunce, Ireland. All rights reserved.
+
+This module is free software; you can redistribute it and/or
+modify it under the same terms as Perl itself. See L<perlartistic>.
+
+=cut
+
diff --git a/lib/DBD/Gofer/Policy/pedantic.pm b/lib/DBD/Gofer/Policy/pedantic.pm
new file mode 100644
index 0000000..6829bea
--- /dev/null
+++ b/lib/DBD/Gofer/Policy/pedantic.pm
@@ -0,0 +1,53 @@
+package DBD::Gofer::Policy::pedantic;
+
+# $Id: pedantic.pm 10087 2007-10-16 12:42:37Z timbo $
+#
+# Copyright (c) 2007, Tim Bunce, Ireland
+#
+# You may distribute under the terms of either the GNU General Public
+# License or the Artistic License, as specified in the Perl README file.
+
+use strict;
+use warnings;
+
+our $VERSION = sprintf("0.%06d", q$Revision: 10087 $ =~ /(\d+)/o);
+
+use base qw(DBD::Gofer::Policy::Base);
+
+# the 'pedantic' policy is the same as the Base policy
+
+1;
+
+=head1 NAME
+
+DBD::Gofer::Policy::pedantic - The 'pedantic' policy for DBD::Gofer
+
+=head1 SYNOPSIS
+
+ $dbh = DBI->connect("dbi:Gofer:transport=...;policy=pedantic", ...)
+
+=head1 DESCRIPTION
+
+The C<pedantic> policy tries to be as transparent as possible. To do this it
+makes round-trips to the server for almost every DBI method call.
+
+This is the best policy to use when first testing existing code with Gofer.
+Once it's working well you should consider moving to the C<classic> policy or defining your own policy class.
+
+Temporary docs: See the source code for list of policies and their defaults.
+
+In a future version the policies and their defaults will be defined in the pod and parsed out at load-time.
+
+=head1 AUTHOR
+
+Tim Bunce, L<http://www.tim.bunce.name>
+
+=head1 LICENCE AND COPYRIGHT
+
+Copyright (c) 2007, Tim Bunce, Ireland. All rights reserved.
+
+This module is free software; you can redistribute it and/or
+modify it under the same terms as Perl itself. See L<perlartistic>.
+
+=cut
+
diff --git a/lib/DBD/Gofer/Policy/rush.pm b/lib/DBD/Gofer/Policy/rush.pm
new file mode 100644
index 0000000..9cfd582
--- /dev/null
+++ b/lib/DBD/Gofer/Policy/rush.pm
@@ -0,0 +1,90 @@
+package DBD::Gofer::Policy::rush;
+
+# $Id: rush.pm 10087 2007-10-16 12:42:37Z timbo $
+#
+# Copyright (c) 2007, Tim Bunce, Ireland
+#
+# You may distribute under the terms of either the GNU General Public
+# License or the Artistic License, as specified in the Perl README file.
+
+use strict;
+use warnings;
+
+our $VERSION = sprintf("0.%06d", q$Revision: 10087 $ =~ /(\d+)/o);
+
+use base qw(DBD::Gofer::Policy::Base);
+
+__PACKAGE__->create_policy_subs({
+
+ # always use connect_cached on server
+ connect_method => 'connect_cached',
+
+ # use same methods on server as is called on client
+ # (because code not using placeholders would bloat the sth cache)
+ prepare_method => '',
+
+ # Skipping the connect check is fast, but it also skips
+ # fetching the remote dbh attributes!
+ # Make sure that your application doesn't need access to dbh attributes.
+ skip_connect_check => 1,
+
+ # most code doesn't rely on sth attributes being set after prepare
+ skip_prepare_check => 1,
+
+ # we're happy to use local method if that's the same as the remote
+ skip_default_methods => 1,
+
+ # ping is almost meaningless for DBD::Gofer and most transports anyway
+ skip_ping => 1,
+
+ # don't update dbh attributes at all
+ # XXX actually we currently need dbh_attribute_update for skip_default_methods to work
+ # and skip_default_methods is more valuable to us than the cost of dbh_attribute_update
+ dbh_attribute_update => 'none', # actually means 'first' currently
+ #dbh_attribute_list => undef,
+
+ # we'd like to set locally_* but can't because drivers differ
+
+ # in a rush assume metadata doesn't change
+ cache_tables => 1,
+ cache_table_info => 1,
+ cache_column_info => 1,
+ cache_primary_key_info => 1,
+ cache_foreign_key_info => 1,
+ cache_statistics_info => 1,
+ cache_get_info => 1,
+});
+
+
+1;
+
+=head1 NAME
+
+DBD::Gofer::Policy::rush - The 'rush' policy for DBD::Gofer
+
+=head1 SYNOPSIS
+
+ $dbh = DBI->connect("dbi:Gofer:transport=...;policy=rush", ...)
+
+=head1 DESCRIPTION
+
+The C<rush> policy tries to make as few round-trips as possible.
+It's the opposite end of the policy spectrum to the C<pedantic> policy.
+
+Temporary docs: See the source code for list of policies and their defaults.
+
+In a future version the policies and their defaults will be defined in the pod and parsed out at load-time.
+
+=head1 AUTHOR
+
+Tim Bunce, L<http://www.tim.bunce.name>
+
+=head1 LICENCE AND COPYRIGHT
+
+Copyright (c) 2007, Tim Bunce, Ireland. All rights reserved.
+
+This module is free software; you can redistribute it and/or
+modify it under the same terms as Perl itself. See L<perlartistic>.
+
+=cut
+
diff --git a/lib/DBD/Gofer/Transport/Base.pm b/lib/DBD/Gofer/Transport/Base.pm
new file mode 100644
index 0000000..fe0d078
--- /dev/null
+++ b/lib/DBD/Gofer/Transport/Base.pm
@@ -0,0 +1,410 @@
+package DBD::Gofer::Transport::Base;
+
+# $Id: Base.pm 14120 2010-06-07 19:52:19Z hmbrand $
+#
+# Copyright (c) 2007, Tim Bunce, Ireland
+#
+# You may distribute under the terms of either the GNU General Public
+# License or the Artistic License, as specified in the Perl README file.
+
+use strict;
+use warnings;
+
+use base qw(DBI::Gofer::Transport::Base);
+
+our $VERSION = sprintf("0.%06d", q$Revision: 14120 $ =~ /(\d+)/o);
+
+__PACKAGE__->mk_accessors(qw(
+ trace
+ go_dsn
+ go_url
+ go_policy
+ go_timeout
+ go_retry_hook
+ go_retry_limit
+ go_cache
+ cache_hit
+ cache_miss
+ cache_store
+));
+__PACKAGE__->mk_accessors_using(make_accessor_autoviv_hashref => qw(
+ meta
+));
+
+
+sub new {
+ my ($class, $args) = @_;
+ $args->{$_} = 0 for (qw(cache_hit cache_miss cache_store));
+ $args->{keep_meta_frozen} ||= 1 if $args->{go_cache};
+ #warn "args @{[ %$args ]}\n";
+ return $class->SUPER::new($args);
+}
+
+
+sub _init_trace { $ENV{DBD_GOFER_TRACE} || 0 }
+
+
+sub new_response {
+ my $self = shift;
+ return DBI::Gofer::Response->new(@_);
+}
+
+
+sub transmit_request {
+ my ($self, $request) = @_;
+ my $trace = $self->trace;
+ my $response;
+
+ my ($go_cache, $request_cache_key);
+ if ($go_cache = $self->{go_cache}) {
+ $request_cache_key
+ = $request->{meta}{request_cache_key}
+ = $self->get_cache_key_for_request($request);
+ if ($request_cache_key) {
+ my $frozen_response = eval { $go_cache->get($request_cache_key) };
+ if ($frozen_response) {
+ $self->_dump("cached response found for ".ref($request), $request)
+ if $trace;
+ $response = $self->thaw_response($frozen_response);
+ $self->trace_msg("transmit_request is returning a response from cache $go_cache\n")
+ if $trace;
+ ++$self->{cache_hit};
+ return $response;
+ }
+ warn $@ if $@;
+ ++$self->{cache_miss};
+ $self->trace_msg("transmit_request cache miss\n")
+ if $trace;
+ }
+ }
+
+ my $to = $self->go_timeout;
+ my $transmit_sub = sub {
+ $self->trace_msg("transmit_request\n") if $trace;
+ local $SIG{ALRM} = sub { die "TIMEOUT\n" } if $to;
+
+ my $response = eval {
+ local $SIG{PIPE} = sub {
+ my $extra = ($! eq "Broken pipe") ? "" : " ($!)";
+ die "Unable to send request: Broken pipe$extra\n";
+ };
+ alarm($to) if $to;
+ $self->transmit_request_by_transport($request);
+ };
+ alarm(0) if $to;
+
+ if ($@) {
+ return $self->transport_timedout("transmit_request", $to)
+ if $@ eq "TIMEOUT\n";
+ return $self->new_response({ err => 1, errstr => $@ });
+ }
+
+ return $response;
+ };
+
+ $response = $self->_transmit_request_with_retries($request, $transmit_sub);
+
+ if ($response) {
+ my $frozen_response = delete $response->{meta}{frozen};
+ $self->_store_response_in_cache($frozen_response, $request_cache_key)
+ if $request_cache_key;
+ }
+
+ $self->trace_msg("transmit_request is returning a response itself\n")
+ if $trace && $response;
+
+ return $response unless wantarray;
+ return ($response, $transmit_sub);
+}
+
+
+sub _transmit_request_with_retries {
+ my ($self, $request, $transmit_sub) = @_;
+ my $response;
+ do {
+ $response = $transmit_sub->();
+ } while ( $response && $self->response_needs_retransmit($request, $response) );
+ return $response;
+}
+
+
+sub receive_response {
+ my ($self, $request, $retransmit_sub) = @_;
+ my $to = $self->go_timeout;
+
+ my $receive_sub = sub {
+ $self->trace_msg("receive_response\n");
+ local $SIG{ALRM} = sub { die "TIMEOUT\n" } if $to;
+
+ my $response = eval {
+ alarm($to) if $to;
+ $self->receive_response_by_transport($request);
+ };
+ alarm(0) if $to;
+
+ if ($@) {
+ return $self->transport_timedout("receive_response", $to)
+ if $@ eq "TIMEOUT\n";
+ return $self->new_response({ err => 1, errstr => $@ });
+ }
+ return $response;
+ };
+
+ my $response;
+ do {
+ $response = $receive_sub->();
+ if ($self->response_needs_retransmit($request, $response)) {
+ $response = $self->_transmit_request_with_retries($request, $retransmit_sub);
+ $response ||= $receive_sub->();
+ }
+ } while ( $self->response_needs_retransmit($request, $response) );
+
+ if ($response) {
+ my $frozen_response = delete $response->{meta}{frozen};
+ my $request_cache_key = $request->{meta}{request_cache_key};
+ $self->_store_response_in_cache($frozen_response, $request_cache_key)
+ if $request_cache_key && $self->{go_cache};
+ }
+
+ return $response;
+}
+
+
+sub response_retry_preference {
+ my ($self, $request, $response) = @_;
+
+ # give the user a chance to express a preference (or undef for default)
+ if (my $go_retry_hook = $self->go_retry_hook) {
+ my $retry = $go_retry_hook->($request, $response, $self);
+ $self->trace_msg(sprintf "go_retry_hook returned %s\n",
+ (defined $retry) ? $retry : 'undef');
+ return $retry if defined $retry;
+ }
+
+ # This is the main decision point. We don't retry requests that got
+ # as far as executing because the error is probably from the database
+ # (not transport) so retrying is unlikely to help. But note that any
+ # severe transport error occuring after execute is likely to return
+ # a new response object that doesn't have the execute flag set. Beware!
+ return 0 if $response->executed_flag_set;
+
+ return 1 if ($response->errstr || '') =~ m/induced by DBI_GOFER_RANDOM/;
+
+ return 1 if $request->is_idempotent; # i.e. is SELECT or ReadOnly was set
+
+ return undef; # we couldn't make up our mind
+}
+
+
+sub response_needs_retransmit {
+ my ($self, $request, $response) = @_;
+
+ my $err = $response->err
+ or return 0; # nothing went wrong
+
+ my $retry = $self->response_retry_preference($request, $response);
+
+ if (!$retry) { # false or undef
+ $self->trace_msg("response_needs_retransmit: response not suitable for retry\n");
+ return 0;
+ }
+
+ # we'd like to retry but have we retried too much already?
+
+ my $retry_limit = $self->go_retry_limit;
+ if (!$retry_limit) {
+ $self->trace_msg("response_needs_retransmit: retries disabled (retry_limit not set)\n");
+ return 0;
+ }
+
+ my $request_meta = $request->meta;
+ my $retry_count = $request_meta->{retry_count} || 0;
+ if ($retry_count >= $retry_limit) {
+ $self->trace_msg("response_needs_retransmit: $retry_count is too many retries\n");
+ # XXX should be possible to disable altering the err
+ $response->errstr(sprintf "%s (after %d retries by gofer)", $response->errstr, $retry_count);
+ return 0;
+ }
+
+ # will retry now, do the admin
+ ++$retry_count;
+ $self->trace_msg("response_needs_retransmit: retry $retry_count\n");
+
+ # hook so response_retry_preference can defer some code execution
+ # until we've checked retry_count and retry_limit.
+ if (ref $retry eq 'CODE') {
+ $retry->($retry_count, $retry_limit)
+ and warn "should return false"; # protect future use
+ }
+
+ ++$request_meta->{retry_count}; # update count for this request object
+ ++$self->meta->{request_retry_count}; # update cumulative transport stats
+
+ return 1;
+}
+
+
+sub transport_timedout {
+ my ($self, $method, $timeout) = @_;
+ $timeout ||= $self->go_timeout;
+ return $self->new_response({ err => 1, errstr => "DBD::Gofer $method timed-out after $timeout seconds" });
+}
+
+
+# return undef if we don't want to cache this request
+# subclasses may use more specialized rules
+sub get_cache_key_for_request {
+ my ($self, $request) = @_;
+
+ # we only want to cache idempotent requests
+ # is_idempotent() is true if GOf_REQUEST_IDEMPOTENT or GOf_REQUEST_READONLY set
+ return undef if not $request->is_idempotent;
+
+ # XXX would be nice to avoid the extra freeze here
+ my $key = $self->freeze_request($request, undef, 1);
+
+ #use Digest::MD5; warn "get_cache_key_for_request: ".Digest::MD5::md5_base64($key)."\n";
+
+ return $key;
+}
+
+
+sub _store_response_in_cache {
+ my ($self, $frozen_response, $request_cache_key) = @_;
+ my $go_cache = $self->{go_cache}
+ or return;
+
+ # new() ensures that enabling go_cache also enables keep_meta_frozen
+ warn "No meta frozen in response" if !$frozen_response;
+ warn "No request_cache_key" if !$request_cache_key;
+
+ if ($frozen_response && $request_cache_key) {
+ $self->trace_msg("receive_response added response to cache $go_cache\n");
+ eval { $go_cache->set($request_cache_key, $frozen_response) };
+ warn $@ if $@;
+ ++$self->{cache_store};
+ }
+}
+
+1;
+
+__END__
+
+=head1 NAME
+
+DBD::Gofer::Transport::Base - base class for DBD::Gofer client transports
+
+=head1 SYNOPSIS
+
+ my $remote_dsn = "..."
+ DBI->connect("dbi:Gofer:transport=...;url=...;timeout=...;retry_limit=...;dsn=$remote_dsn",...)
+
+or, enable by setting the DBI_AUTOPROXY environment variable:
+
+ export DBI_AUTOPROXY='dbi:Gofer:transport=...;url=...'
+
+which will force I<all> DBI connections to be made via that Gofer server.
+
+=head1 DESCRIPTION
+
+This is the base class for all DBD::Gofer client transports.
+
+=head1 ATTRIBUTES
+
+Gofer transport attributes can be specified either in the attributes parameter
+of the connect() method call, or in the DSN string. When used in the DSN
+string, attribute names don't have the C<go_> prefix.
+
+=head2 go_dsn
+
+The full DBI DSN that the Gofer server should connect to on your behalf.
+
+When used in the DSN it must be the last element in the DSN string.
+
+=head2 go_timeout
+
+A time limit for sending a request and receiving a response. Some drivers may
+implement sending and receiving as separate steps, in which case (currently)
+the timeout applies to each separately.
+
+If a request needs to be resent then the timeout is restarted for each sending
+of a request and receiving of a response.
+
+=head2 go_retry_limit
+
+The maximum number of times an request may be retried. The default is 2.
+
+=head2 go_retry_hook
+
+This subroutine reference is called, if defined, for each response received where $response->err is true.
+
+The subroutine is pass three parameters: the request object, the response object, and the transport object.
+
+If it returns an undefined value then the default retry behaviour is used. See L</RETRY ON ERROR> below.
+
+If it returns a defined but false value then the request is not resent.
+
+If it returns true value then the request is resent, so long as the number of retries does not exceed C<go_retry_limit>.
+
+=head1 RETRY ON ERROR
+
+The default retry on error behaviour is:
+
+ - Retry if the error was due to DBI_GOFER_RANDOM. See L<DBI::Gofer::Execute>.
+
+ - Retry if $request->is_idempotent returns true. See L<DBI::Gofer::Request>.
+
+A retry won't be allowed if the number of previous retries has reached C<go_retry_limit>.
+
+=head1 TRACING
+
+Tracing of gofer requests and responses can be enabled by setting the
+C<DBD_GOFER_TRACE> environment variable. A value of 1 gives a reasonably
+compact summary of each request and response. A value of 2 or more gives a
+detailed, and voluminous, dump.
+
+The trace is written using DBI->trace_msg() and so is written to the default
+DBI trace output, which is usually STDERR.
+
+=head1 METHODS
+
+I<This section is currently far from complete.>
+
+=head2 response_retry_preference
+
+ $retry = $transport->response_retry_preference($request, $response);
+
+The response_retry_preference is called by DBD::Gofer when considering if a
+request should be retried after an error.
+
+Returns true (would like to retry), false (must not retry), undef (no preference).
+
+If a true value is returned in the form of a CODE ref then, if DBD::Gofer does
+decide to retry the request, it calls the code ref passing $retry_count, $retry_limit.
+Can be used for logging and/or to implement exponential backoff behaviour.
+Currently the called code must return using C<return;> to allow for future extensions.
+
+=head1 AUTHOR
+
+Tim Bunce, L<http://www.tim.bunce.name>
+
+=head1 LICENCE AND COPYRIGHT
+
+Copyright (c) 2007-2008, Tim Bunce, Ireland. All rights reserved.
+
+This module is free software; you can redistribute it and/or
+modify it under the same terms as Perl itself. See L<perlartistic>.
+
+=head1 SEE ALSO
+
+L<DBD::Gofer>, L<DBI::Gofer::Request>, L<DBI::Gofer::Response>, L<DBI::Gofer::Execute>.
+
+and some example transports:
+
+L<DBD::Gofer::Transport::stream>
+
+L<DBD::Gofer::Transport::http>
+
+L<DBI::Gofer::Transport::mod_perl>
+
+=cut
diff --git a/lib/DBD/Gofer/Transport/corostream.pm b/lib/DBD/Gofer/Transport/corostream.pm
new file mode 100644
index 0000000..6e79278
--- /dev/null
+++ b/lib/DBD/Gofer/Transport/corostream.pm
@@ -0,0 +1,144 @@
+package DBD::Gofer::Transport::corostream;
+
+use strict;
+use warnings;
+
+use Carp;
+
+use Coro::Select; # a slow but coro-aware replacement for CORE::select (global effect!)
+
+use Coro;
+use Coro::Handle;
+
+use base qw(DBD::Gofer::Transport::stream);
+
+# XXX ensure DBI_PUREPERL for parent doesn't pass to child
+sub start_pipe_command {
+ local $ENV{DBI_PUREPERL} = $ENV{DBI_PUREPERL_COROCHILD}; # typically undef
+ my $connection = shift->SUPER::start_pipe_command(@_);
+ return $connection;
+}
+
+
+
+1;
+
+__END__
+
+=head1 NAME
+
+DBD::Gofer::Transport::corostream - Async DBD::Gofer stream transport using Coro and AnyEvent
+
+=head1 SYNOPSIS
+
+ DBI_AUTOPROXY="dbi:Gofer:transport=corostream" perl some-perl-script-using-dbi.pl
+
+or
+
+ $dsn = ...; # the DSN for the driver and database you want to use
+ $dbh = DBI->connect("dbi:Gofer:transport=corostream;dsn=$dsn", ...);
+
+=head1 DESCRIPTION
+
+The I<BIG WIN> from using L<Coro> is that it enables the use of existing
+DBI frameworks like L<DBIx::Class>.
+
+=head1 KNOWN ISSUES AND LIMITATIONS
+
+ - Uses Coro::Select so alters CORE::select globally
+ Parent class probably needs refactoring to enable a more encapsulated approach.
+
+ - Doesn't prevent multiple concurrent requests
+ Probably just needs a per-connection semaphore
+
+ - Coro has many caveats. Caveat emptor.
+
+=head1 STATUS
+
+THIS IS CURRENTLY JUST A PROOF-OF-CONCEPT IMPLEMENTATION FOR EXPERIMENTATION.
+
+Please note that I have no plans to develop this code further myself.
+I'd very much welcome contributions. Interested? Let me know!
+
+=head1 AUTHOR
+
+Tim Bunce, L<http://www.tim.bunce.name>
+
+=head1 LICENCE AND COPYRIGHT
+
+Copyright (c) 2010, Tim Bunce, Ireland. All rights reserved.
+
+This module is free software; you can redistribute it and/or
+modify it under the same terms as Perl itself. See L<perlartistic>.
+
+=head1 SEE ALSO
+
+L<DBD::Gofer::Transport::stream>
+
+L<DBD::Gofer>
+
+=head1 APPENDIX
+
+Example code:
+
+ #!perl
+
+ use strict;
+ use warnings;
+ use Time::HiRes qw(time);
+
+ BEGIN { $ENV{PERL_ANYEVENT_STRICT} = 1; $ENV{PERL_ANYEVENT_VERBOSE} = 1; }
+
+ use AnyEvent;
+
+ BEGIN { $ENV{DBI_TRACE} = 0; $ENV{DBI_GOFER_TRACE} = 0; $ENV{DBD_GOFER_TRACE} = 0; };
+
+ use DBI;
+
+ $ENV{DBI_AUTOPROXY} = 'dbi:Gofer:transport=corostream';
+
+ my $ticker = AnyEvent->timer( after => 0, interval => 0.1, cb => sub {
+ warn sprintf "-tick- %.2f\n", time
+ } );
+
+ warn "connecting...\n";
+ my $dbh = DBI->connect("dbi:NullP:");
+ warn "...connected\n";
+
+ for (1..3) {
+ warn "entering DBI...\n";
+ $dbh->do("sleep 0.3"); # pseudo-sql understood by the DBD::NullP driver
+ warn "...returned\n";
+ }
+
+ warn "done.";
+
+Example output:
+
+ $ perl corogofer.pl
+ connecting...
+ -tick- 1293631437.14
+ -tick- 1293631437.14
+ ...connected
+ entering DBI...
+ -tick- 1293631437.25
+ -tick- 1293631437.35
+ -tick- 1293631437.45
+ -tick- 1293631437.55
+ ...returned
+ entering DBI...
+ -tick- 1293631437.66
+ -tick- 1293631437.76
+ -tick- 1293631437.86
+ ...returned
+ entering DBI...
+ -tick- 1293631437.96
+ -tick- 1293631438.06
+ -tick- 1293631438.16
+ ...returned
+ done. at corogofer.pl line 39.
+
+You can see that the timer callback is firing while the code 'waits' inside the
+do() method for the response from the database. Normally that would block.
+
+=cut
diff --git a/lib/DBD/Gofer/Transport/null.pm b/lib/DBD/Gofer/Transport/null.pm
new file mode 100644
index 0000000..4b8d86c
--- /dev/null
+++ b/lib/DBD/Gofer/Transport/null.pm
@@ -0,0 +1,111 @@
+package DBD::Gofer::Transport::null;
+
+# $Id: null.pm 10087 2007-10-16 12:42:37Z timbo $
+#
+# Copyright (c) 2007, Tim Bunce, Ireland
+#
+# You may distribute under the terms of either the GNU General Public
+# License or the Artistic License, as specified in the Perl README file.
+
+use strict;
+use warnings;
+
+use base qw(DBD::Gofer::Transport::Base);
+
+use DBI::Gofer::Execute;
+
+our $VERSION = sprintf("0.%06d", q$Revision: 10087 $ =~ /(\d+)/o);
+
+__PACKAGE__->mk_accessors(qw(
+ pending_response
+ transmit_count
+));
+
+my $executor = DBI::Gofer::Execute->new();
+
+
+sub transmit_request_by_transport {
+ my ($self, $request) = @_;
+ $self->transmit_count( ($self->transmit_count()||0) + 1 ); # just for tests
+
+ my $frozen_request = $self->freeze_request($request);
+
+ # ...
+ # the request is magically transported over to ... ourselves
+ # ...
+
+ my $response = $executor->execute_request( $self->thaw_request($frozen_request, undef, 1) );
+
+ # put response 'on the shelf' ready for receive_response()
+ $self->pending_response( $response );
+
+ return undef;
+}
+
+
+sub receive_response_by_transport {
+ my $self = shift;
+
+ my $response = $self->pending_response;
+
+ my $frozen_response = $self->freeze_response($response, undef, 1);
+
+ # ...
+ # the response is magically transported back to ... ourselves
+ # ...
+
+ return $self->thaw_response($frozen_response);
+}
+
+
+1;
+__END__
+
+=head1 NAME
+
+DBD::Gofer::Transport::null - DBD::Gofer client transport for testing
+
+=head1 SYNOPSIS
+
+ my $original_dsn = "..."
+ DBI->connect("dbi:Gofer:transport=null;dsn=$original_dsn",...)
+
+or, enable by setting the DBI_AUTOPROXY environment variable:
+
+ export DBI_AUTOPROXY="dbi:Gofer:transport=null"
+
+=head1 DESCRIPTION
+
+Connect via DBD::Gofer but execute the requests within the same process.
+
+This is a quick and simple way to test applications for compatibility with the
+(few) restrictions that DBD::Gofer imposes.
+
+It also provides a simple, portable way for the DBI test suite to be used to
+test DBD::Gofer on all platforms with no setup.
+
+Also, by measuring the difference in performance between normal connections and
+connections via C<dbi:Gofer:transport=null> the basic cost of using DBD::Gofer
+can be measured. Furthermore, the additional cost of more advanced transports can be
+isolated by comparing their performance with the null transport.
+
+The C<t/85gofer.t> script in the DBI distribution includes a comparative benchmark.
+
+=head1 AUTHOR
+
+Tim Bunce, L<http://www.tim.bunce.name>
+
+=head1 LICENCE AND COPYRIGHT
+
+Copyright (c) 2007, Tim Bunce, Ireland. All rights reserved.
+
+This module is free software; you can redistribute it and/or
+modify it under the same terms as Perl itself. See L<perlartistic>.
+
+=head1 SEE ALSO
+
+L<DBD::Gofer::Transport::Base>
+
+L<DBD::Gofer>
+
+=cut
diff --git a/lib/DBD/Gofer/Transport/pipeone.pm b/lib/DBD/Gofer/Transport/pipeone.pm
new file mode 100644
index 0000000..3df2bf3
--- /dev/null
+++ b/lib/DBD/Gofer/Transport/pipeone.pm
@@ -0,0 +1,253 @@
+package DBD::Gofer::Transport::pipeone;
+
+# $Id: pipeone.pm 10087 2007-10-16 12:42:37Z timbo $
+#
+# Copyright (c) 2007, Tim Bunce, Ireland
+#
+# You may distribute under the terms of either the GNU General Public
+# License or the Artistic License, as specified in the Perl README file.
+
+use strict;
+use warnings;
+
+use Carp;
+use Fcntl;
+use IO::Select;
+use IPC::Open3 qw(open3);
+use Symbol qw(gensym);
+
+use base qw(DBD::Gofer::Transport::Base);
+
+our $VERSION = sprintf("0.%06d", q$Revision: 10087 $ =~ /(\d+)/o);
+
+__PACKAGE__->mk_accessors(qw(
+ connection_info
+ go_perl
+));
+
+
+sub new {
+ my ($self, $args) = @_;
+ $args->{go_perl} ||= do {
+ ($INC{"blib.pm"}) ? [ $^X, '-Mblib' ] : [ $^X ];
+ };
+ if (not ref $args->{go_perl}) {
+ # user can override the perl to be used, either with an array ref
+ # containing the command name and args to use, or with a string
+ # (ie via the DSN) in which case, to enable args to be passed,
+ # we split on two or more consecutive spaces (otherwise the path
+ # to perl couldn't contain a space itself).
+ $args->{go_perl} = [ split /\s{2,}/, $args->{go_perl} ];
+ }
+ return $self->SUPER::new($args);
+}
+
+
+# nonblock($fh) puts filehandle into nonblocking mode
+sub nonblock {
+ my $fh = shift;
+ my $flags = fcntl($fh, F_GETFL, 0)
+ or croak "Can't get flags for filehandle $fh: $!";
+ fcntl($fh, F_SETFL, $flags | O_NONBLOCK)
+ or croak "Can't make filehandle $fh nonblocking: $!";
+}
+
+
+sub start_pipe_command {
+ my ($self, $cmd) = @_;
+ $cmd = [ $cmd ] unless ref $cmd eq 'ARRAY';
+
+ # if it's important that the subprocess uses the same
+ # (versions of) modules as us then the caller should
+ # set PERL5LIB itself.
+
+ # limit various forms of insanity, for now
+ local $ENV{DBI_TRACE}; # use DBI_GOFER_TRACE instead
+ local $ENV{DBI_AUTOPROXY};
+ local $ENV{DBI_PROFILE};
+
+ my ($wfh, $rfh, $efh) = (gensym, gensym, gensym);
+ my $pid = open3($wfh, $rfh, $efh, @$cmd)
+ or die "error starting @$cmd: $!\n";
+ if ($self->trace) {
+ $self->trace_msg(sprintf("Started pid $pid: @$cmd {fd: w%d r%d e%d, ppid=$$}\n", fileno $wfh, fileno $rfh, fileno $efh),0);
+ }
+ nonblock($rfh);
+ nonblock($efh);
+ my $ios = IO::Select->new($rfh, $efh);
+
+ return {
+ cmd=>$cmd,
+ pid=>$pid,
+ wfh=>$wfh, rfh=>$rfh, efh=>$efh,
+ ios=>$ios,
+ };
+}
+
+
+sub cmd_as_string {
+ my $self = shift;
+ # XXX meant to return a properly shell-escaped string suitable for system
+ # but its only for debugging so that can wait
+ my $connection_info = $self->connection_info;
+ return join " ", map { (m/^[-:\w]*$/) ? $_ : "'$_'" } @{$connection_info->{cmd}};
+}
+
+
+sub transmit_request_by_transport {
+ my ($self, $request) = @_;
+
+ my $frozen_request = $self->freeze_request($request);
+
+ my $cmd = [ @{$self->go_perl}, qw(-MDBI::Gofer::Transport::pipeone -e run_one_stdio)];
+ my $info = $self->start_pipe_command($cmd);
+
+ my $wfh = delete $info->{wfh};
+ # send frozen request
+ local $\;
+ print $wfh $frozen_request
+ or warn "error writing to @$cmd: $!\n";
+ # indicate that there's no more
+ close $wfh
+ or die "error closing pipe to @$cmd: $!\n";
+
+ $self->connection_info( $info );
+ return;
+}
+
+
+sub read_response_from_fh {
+ my ($self, $fh_actions) = @_;
+ my $trace = $self->trace;
+
+ my $info = $self->connection_info || die;
+ my ($ios) = @{$info}{qw(ios)};
+ my $errors = 0;
+ my $complete;
+
+ die "No handles to read response from" unless $ios->count;
+
+ while ($ios->count) {
+ my @readable = $ios->can_read();
+ for my $fh (@readable) {
+ local $_;
+ my $actions = $fh_actions->{$fh} || die "panic: no action for $fh";
+ my $rv = sysread($fh, $_='', 1024*31); # to fit in 32KB slab
+ unless ($rv) { # error (undef) or end of file (0)
+ my $action;
+ unless (defined $rv) { # was an error
+ $self->trace_msg("error on handle $fh: $!\n") if $trace >= 4;
+ $action = $actions->{error} || $actions->{eof};
+ ++$errors;
+ # XXX an error may be a permenent condition of the handle
+ # if so we'll loop here - not good
+ }
+ else {
+ $action = $actions->{eof};
+ $self->trace_msg("eof on handle $fh\n") if $trace >= 4;
+ }
+ if ($action->($fh)) {
+ $self->trace_msg("removing $fh from handle set\n") if $trace >= 4;
+ $ios->remove($fh);
+ }
+ next;
+ }
+ # action returns true if the response is now complete
+ # (we finish all handles
+ $actions->{read}->($fh) && ++$complete;
+ }
+ last if $complete;
+ }
+ return $errors;
+}
+
+
+sub receive_response_by_transport {
+ my $self = shift;
+
+ my $info = $self->connection_info || die;
+ my ($pid, $rfh, $efh, $ios, $cmd) = @{$info}{qw(pid rfh efh ios cmd)};
+
+ my $frozen_response;
+ my $stderr_msg;
+
+ $self->read_response_from_fh( {
+ $efh => {
+ error => sub { warn "error reading response stderr: $!"; 1 },
+ eof => sub { warn "eof on stderr" if 0; 1 },
+ read => sub { $stderr_msg .= $_; 0 },
+ },
+ $rfh => {
+ error => sub { warn "error reading response: $!"; 1 },
+ eof => sub { warn "eof on stdout" if 0; 1 },
+ read => sub { $frozen_response .= $_; 0 },
+ },
+ });
+
+ waitpid $info->{pid}, 0
+ or warn "waitpid: $!"; # XXX do something more useful?
+
+ die ref($self)." command (@$cmd) failed: $stderr_msg"
+ if not $frozen_response; # no output on stdout at all
+
+ # XXX need to be able to detect and deal with corruption
+ my $response = $self->thaw_response($frozen_response);
+
+ if ($stderr_msg) {
+ # add stderr messages as warnings (for PrintWarn)
+ $response->add_err(0, $stderr_msg, undef, $self->trace)
+ # but ignore warning from old version of blib
+ unless $stderr_msg =~ /^Using .*blib/ && "@$cmd" =~ /-Mblib/;
+ }
+
+ return $response;
+}
+
+
+1;
+
+__END__
+
+=head1 NAME
+
+DBD::Gofer::Transport::pipeone - DBD::Gofer client transport for testing
+
+=head1 SYNOPSIS
+
+ $original_dsn = "...";
+ DBI->connect("dbi:Gofer:transport=pipeone;dsn=$original_dsn",...)
+
+or, enable by setting the DBI_AUTOPROXY environment variable:
+
+ export DBI_AUTOPROXY="dbi:Gofer:transport=pipeone"
+
+=head1 DESCRIPTION
+
+Connect via DBD::Gofer and execute each request by starting executing a subprocess.
+
+This is, as you might imagine, spectacularly inefficient!
+
+It's only intended for testing. Specifically it demonstrates that the server
+side is completely stateless.
+
+It also provides a base class for the much more useful L<DBD::Gofer::Transport::stream>
+transport.
+
+=head1 AUTHOR
+
+Tim Bunce, L<http://www.tim.bunce.name>
+
+=head1 LICENCE AND COPYRIGHT
+
+Copyright (c) 2007, Tim Bunce, Ireland. All rights reserved.
+
+This module is free software; you can redistribute it and/or
+modify it under the same terms as Perl itself. See L<perlartistic>.
+
+=head1 SEE ALSO
+
+L<DBD::Gofer::Transport::Base>
+
+L<DBD::Gofer>
+
+=cut
diff --git a/lib/DBD/Gofer/Transport/stream.pm b/lib/DBD/Gofer/Transport/stream.pm
new file mode 100644
index 0000000..61e211c
--- /dev/null
+++ b/lib/DBD/Gofer/Transport/stream.pm
@@ -0,0 +1,292 @@
+package DBD::Gofer::Transport::stream;
+
+# $Id: stream.pm 14598 2010-12-21 22:53:25Z timbo $
+#
+# Copyright (c) 2007, Tim Bunce, Ireland
+#
+# You may distribute under the terms of either the GNU General Public
+# License or the Artistic License, as specified in the Perl README file.
+
+use strict;
+use warnings;
+
+use Carp;
+
+use base qw(DBD::Gofer::Transport::pipeone);
+
+our $VERSION = sprintf("0.%06d", q$Revision: 14598 $ =~ /(\d+)/o);
+
+__PACKAGE__->mk_accessors(qw(
+ go_persist
+));
+
+my $persist_all = 5;
+my %persist;
+
+
+sub _connection_key {
+ my ($self) = @_;
+ return join "~", $self->go_url||"", @{ $self->go_perl || [] };
+}
+
+
+sub _connection_get {
+ my ($self) = @_;
+
+ my $persist = $self->go_persist; # = 0 can force non-caching
+ $persist = $persist_all if not defined $persist;
+ my $key = ($persist) ? $self->_connection_key : '';
+ if ($persist{$key} && $self->_connection_check($persist{$key})) {
+ $self->trace_msg("reusing persistent connection $key\n",0) if $self->trace >= 1;
+ return $persist{$key};
+ }
+
+ my $connection = $self->_make_connection;
+
+ if ($key) {
+ %persist = () if keys %persist > $persist_all; # XXX quick hack to limit subprocesses
+ $persist{$key} = $connection;
+ }
+
+ return $connection;
+}
+
+
+sub _connection_check {
+ my ($self, $connection) = @_;
+ $connection ||= $self->connection_info;
+ my $pid = $connection->{pid};
+ my $ok = (kill 0, $pid);
+ $self->trace_msg("_connection_check: $ok (pid $$)\n",0) if $self->trace;
+ return $ok;
+}
+
+
+sub _connection_kill {
+ my ($self) = @_;
+ my $connection = $self->connection_info;
+ my ($pid, $wfh, $rfh, $efh) = @{$connection}{qw(pid wfh rfh efh)};
+ $self->trace_msg("_connection_kill: closing write handle\n",0) if $self->trace;
+ # closing the write file handle should be enough, generally
+ close $wfh;
+ # in future we may want to be more aggressive
+ #close $rfh; close $efh; kill 15, $pid
+ # but deleting from the persist cache...
+ delete $persist{ $self->_connection_key };
+ # ... and removing the connection_info should suffice
+ $self->connection_info( undef );
+ return;
+}
+
+
+sub _make_connection {
+ my ($self) = @_;
+
+ my $go_perl = $self->go_perl;
+ my $cmd = [ @$go_perl, qw(-MDBI::Gofer::Transport::stream -e run_stdio_hex)];
+
+ #push @$cmd, "DBI_TRACE=2=/tmp/goferstream.log", "sh", "-c";
+ if (my $url = $self->go_url) {
+ die "Only 'ssh:user\@host' style url supported by this transport"
+ unless $url =~ s/^ssh://;
+ my $ssh = $url;
+ my $setup_env = join "||", map { "source $_ 2>/dev/null" } qw(.bash_profile .bash_login .profile);
+ my $setup = $setup_env.q{; exec "$@"};
+ # don't use $^X on remote system by default as it's possibly wrong
+ $cmd->[0] = 'perl' if "@$go_perl" eq $^X;
+ # -x not only 'Disables X11 forwarding' but also makes connections *much* faster
+ unshift @$cmd, qw(ssh -xq), split(' ', $ssh), qw(bash -c), $setup;
+ }
+
+ $self->trace_msg("new connection: @$cmd\n",0) if $self->trace;
+
+ # XXX add a handshake - some message from DBI::Gofer::Transport::stream that's
+ # sent as soon as it starts that we can wait for to report success - and soak up
+ # and report useful warnings etc from ssh before we get it? Increases latency though.
+ my $connection = $self->start_pipe_command($cmd);
+ return $connection;
+}
+
+
+sub transmit_request_by_transport {
+ my ($self, $request) = @_;
+ my $trace = $self->trace;
+
+ my $connection = $self->connection_info || do {
+ my $con = $self->_connection_get;
+ $self->connection_info( $con );
+ $con;
+ };
+
+ my $encoded_request = unpack("H*", $self->freeze_request($request));
+ $encoded_request .= "\015\012";
+
+ my $wfh = $connection->{wfh};
+ $self->trace_msg(sprintf("transmit_request_by_transport: to fh %s fd%d\n", $wfh, fileno($wfh)),0)
+ if $trace >= 4;
+
+ # send frozen request
+ local $\;
+ $wfh->print($encoded_request) # autoflush enabled
+ or do {
+ my $err = $!;
+ # XXX could/should make new connection and retry
+ $self->_connection_kill;
+ die "Error sending request: $err";
+ };
+ $self->trace_msg("Request sent: $encoded_request\n",0) if $trace >= 4;
+
+ return undef; # indicate no response yet (so caller calls receive_response_by_transport)
+}
+
+
+sub receive_response_by_transport {
+ my $self = shift;
+ my $trace = $self->trace;
+
+ $self->trace_msg("receive_response_by_transport: awaiting response\n",0) if $trace >= 4;
+ my $connection = $self->connection_info || die;
+ my ($pid, $rfh, $efh, $cmd) = @{$connection}{qw(pid rfh efh cmd)};
+
+ my $errno = 0;
+ my $encoded_response;
+ my $stderr_msg;
+
+ $self->read_response_from_fh( {
+ $efh => {
+ error => sub { warn "error reading response stderr: $!"; $errno||=$!; 1 },
+ eof => sub { warn "eof reading efh" if $trace >= 4; 1 },
+ read => sub { $stderr_msg .= $_; 0 },
+ },
+ $rfh => {
+ error => sub { warn "error reading response: $!"; $errno||=$!; 1 },
+ eof => sub { warn "eof reading rfh" if $trace >= 4; 1 },
+ read => sub { $encoded_response .= $_; ($encoded_response=~s/\015\012$//) ? 1 : 0 },
+ },
+ });
+
+ # if we got no output on stdout at all then the command has
+ # probably exited, possibly with an error to stderr.
+ # Turn this situation into a reasonably useful DBI error.
+ if (not $encoded_response) {
+ my @msg;
+ push @msg, "error while reading response: $errno" if $errno;
+ if ($stderr_msg) {
+ chomp $stderr_msg;
+ push @msg, sprintf "error reported by \"%s\" (pid %d%s): %s",
+ $self->cmd_as_string,
+ $pid, ((kill 0, $pid) ? "" : ", exited"),
+ $stderr_msg;
+ }
+ die join(", ", "No response received", @msg)."\n";
+ }
+
+ $self->trace_msg("Response received: $encoded_response\n",0)
+ if $trace >= 4;
+
+ $self->trace_msg("Gofer stream stderr message: $stderr_msg\n",0)
+ if $stderr_msg && $trace;
+
+ my $frozen_response = pack("H*", $encoded_response);
+
+ # XXX need to be able to detect and deal with corruption
+ my $response = $self->thaw_response($frozen_response);
+
+ if ($stderr_msg) {
+ # add stderr messages as warnings (for PrintWarn)
+ $response->add_err(0, $stderr_msg, undef, $trace)
+ # but ignore warning from old version of blib
+ unless $stderr_msg =~ /^Using .*blib/ && "@$cmd" =~ /-Mblib/;
+ }
+
+ return $response;
+}
+
+sub transport_timedout {
+ my $self = shift;
+ $self->_connection_kill;
+ return $self->SUPER::transport_timedout(@_);
+}
+
+1;
+
+__END__
+
+=head1 NAME
+
+DBD::Gofer::Transport::stream - DBD::Gofer transport for stdio streaming
+
+=head1 SYNOPSIS
+
+ DBI->connect('dbi:Gofer:transport=stream;url=ssh:username@host.example.com;dsn=dbi:...',...)
+
+or, enable by setting the DBI_AUTOPROXY environment variable:
+
+ export DBI_AUTOPROXY='dbi:Gofer:transport=stream;url=ssh:username@host.example.com'
+
+=head1 DESCRIPTION
+
+Without the C<url=> parameter it launches a subprocess as
+
+ perl -MDBI::Gofer::Transport::stream -e run_stdio_hex
+
+and feeds requests into it and reads responses from it. But that's not very useful.
+
+With a C<url=ssh:username@host.example.com> parameter it uses ssh to launch the subprocess
+on a remote system. That's much more useful!
+
+It gives you secure remote access to DBI databases on any system you can login to.
+Using ssh also gives you optional compression and many other features (see the
+ssh manual for how to configure that and many other options via ~/.ssh/config file).
+
+The actual command invoked is something like:
+
+ ssh -xq ssh:username@host.example.com bash -c $setup $run
+
+where $run is the command shown above, and $command is
+
+ . .bash_profile 2>/dev/null || . .bash_login 2>/dev/null || . .profile 2>/dev/null; exec "$@"
+
+which is trying (in a limited and fairly unportable way) to setup the environment
+(PATH, PERL5LIB etc) as it would be if you had logged in to that system.
+
+The "C<perl>" used in the command will default to the value of $^X when not using ssh.
+On most systems that's the full path to the perl that's currently executing.
+
+
+=head1 PERSISTENCE
+
+Currently gofer stream connections persist (remain connected) after all
+database handles have been disconnected. This makes later connections in the
+same process very fast.
+
+Currently up to 5 different gofer stream connections (based on url) can
+persist. If more than 5 are in the cache when a new connection is made then
+the cache is cleared before adding the new connection. Simple but effective.
+
+=head1 TO DO
+
+Document go_perl attribute
+
+Automatically reconnect (within reason) if there's a transport error.
+
+Decide on default for persistent connection - on or off? limits? ttl?
+
+=head1 AUTHOR
+
+Tim Bunce, L<http://www.tim.bunce.name>
+
+=head1 LICENCE AND COPYRIGHT
+
+Copyright (c) 2007, Tim Bunce, Ireland. All rights reserved.
+
+This module is free software; you can redistribute it and/or
+modify it under the same terms as Perl itself. See L<perlartistic>.
+
+=head1 SEE ALSO
+
+L<DBD::Gofer::Transport::Base>
+
+L<DBD::Gofer>
+
+=cut