diff options
Diffstat (limited to 'ndb/tools/ndbnet')
34 files changed, 6072 insertions, 0 deletions
diff --git a/ndb/tools/ndbnet/Makefile.PL b/ndb/tools/ndbnet/Makefile.PL new file mode 100644 index 00000000000..4b27a17de15 --- /dev/null +++ b/ndb/tools/ndbnet/Makefile.PL @@ -0,0 +1,158 @@ +# -*- perl -*- + +use strict; +use Config; +use ExtUtils::MakeMaker qw(WriteMakefile); +use Test::Harness; + +require 5.005; + +my $base; +if ($base ||= $ENV{NDB_BASE}) { + warn "Using NDB_BASE=$base\n"; +} +$base or die "FATAL: need env.variable NDB_BASE\n"; + +my $top; +if ($top ||= $ENV{NDB_TOP}) { + warn "Using NDB_TOP=$top\n"; +} +$top or die "FATAL: need env.variable NDB_TOP\n"; + +my @scripts = qw(ndbnet.pl ndbnetd.pl); + +for my $f (@scripts) { + my $p = $f; + $p =~ s/\.pl$//; + unlink("$p.sh"); + open(G, ">$p.sh") or die "$p.sh: $!"; + if ($Config{osname} ne 'MSWin32') { + print G <<END; +#! /bin/sh + +# installed in \$NDB_BASE +# selects which $p to run (normally from latest release) +# created in source directory by "make install-base" + +NDB_BASE=$base +export NDB_BASE + +PATH=\$NDB_BASE/bin:\$PATH +export PATH + +LD_LIBRARY_PATH=\$NDB_BASE/lib:\$LD_LIBRARY_PATH +export LD_LIBRARY_PATH + +PERL5LIB=\$NDB_BASE/lib/perl5:\$PERL5LIB +export PERL5LIB + +NDB_TOP=$top +export NDB_TOP + +PATH=\$NDB_TOP/bin:\$PATH +export PATH + +LD_LIBRARY_PATH=\$NDB_TOP/lib:\$LD_LIBRARY_PATH +export LD_LIBRARY_PATH + +PERL5LIB=\$NDB_TOP/lib/perl5:\$PERL5LIB +export PERL5LIB + +exec perl \$NDB_TOP/lib/perl5/$p.pl "\$@" +END + } else { + print G <<END; +rem installed in \$NDB_BASE +rem selects which $p to run (normally from latest release) +rem created in source directory by "make install-base" + +set NDB_BASE=$base +set PATH=%NDB_BASE%\\bin;%PATH% +set PERL5LIB=%NDB_BASE%\\lib\\perl5;%PERL5LIB% +set NDB_TOP=$top +set PATH=%NDB_TOP%\\bin;%PATH% +set PERL5LIB=%NDB_TOP%\\lib\\perl5;%PERL5LIB% +perl %NDB_TOP%\\lib\\perl5\\$p.pl %1 %2 %3 %4 %5 %6 %7 %8 %9 +END + } + close G; +} + +unshift(@INC, 'lib'); +$main::onlymodules = 1; +require lib::NDB::Util; +require lib::NDB::Net; +require lib::NDB::Run; + +my @modules = ( + q(NDB::Util), + @NDB::Util::modules, + q(NDB::Net), + @NDB::Net::modules, + q(NDB::Run), + @NDB::Run::modules, +); + +my @modulepaths = map { s!::!/!g; s!$!.pm!; $_ } @modules; + +my %pm = (); +for my $pl (@scripts) { + $pm{"$pl"} = "\$(INST_LIBDIR)/$pl"; +} +for my $pm (@modulepaths) { + $pm{"lib/$pm"} = "\$(INST_LIBDIR)/$pm"; +} + +WriteMakefile( + NAME=> 'NDB', + PM=> \%pm, + EXE_FILES=> [ qw(ndbrun) ], +# install + PREFIX=> $top, + LIB=> "$top/lib/perl5", +); + +sub MY::postamble { + my $mk = ""; + $mk .= "\n" . <<END; +# NDB make targets +libs: all install +bins: +links: +depend: +clean_dep: +#clean: +cleanall: +tidy: +#distclean: +check: + perl -Ilib -cw -e "use NDB::Util" + perl -Ilib -cw -e "use NDB::Net" + perl -Ilib -cw -e "use NDB::Run" + perl -Ilib -cw ndbnetd.pl + perl -Ilib -cw ndbnet.pl +END + if ($Config{osname} ne 'MSWin32') { + $mk .= "\n" . <<END; +# install startup scripts to \$NDB_BASE +install-base: + test "\$\$NDB_BASE" + mkdir -p \$\$NDB_BASE/bin + rm -f \$\$NDB_BASE/bin/ndbnet + cp -p ndbnet.sh \$\$NDB_BASE/bin/ndbnet + chmod +x \$\$NDB_BASE/bin/ndbnet + rm -f \$\$NDB_BASE/bin/ndbnetd + cp -p ndbnetd.sh \$\$NDB_BASE/bin/ndbnetd + chmod +x \$\$NDB_BASE/bin/ndbnetd +END + } else { + $mk .= "\n" . <<END; +install-base: + copy ndbnet.sh $base\\bin\\ndbnet.bat + copy ndbnetd.sh $base\\bin\\ndbnetd.bat +END + } + return $mk; +} + +1; diff --git a/ndb/tools/ndbnet/lib/NDB/Net.pm b/ndb/tools/ndbnet/lib/NDB/Net.pm new file mode 100644 index 00000000000..3b7b16bb3cf --- /dev/null +++ b/ndb/tools/ndbnet/lib/NDB/Net.pm @@ -0,0 +1,42 @@ +package NDB::Net; + +use strict; +use Carp; +require Exporter; + +use NDB::Util; + +use vars qw(@ISA @EXPORT @EXPORT_OK); +@ISA = qw(Exporter); + +use vars qw(@modules); +@modules = qw( + NDB::Net::Base + NDB::Net::Client + NDB::Net::Command + NDB::Net::Config + NDB::Net::Database + NDB::Net::Env + NDB::Net::Node + NDB::Net::NodeApi + NDB::Net::NodeDb + NDB::Net::NodeMgmt + NDB::Net::Server + NDB::Net::ServerINET + NDB::Net::ServerUNIX +); + +return 1 if $main::onlymodules; + +for my $module (@modules) { + eval "require $module"; + $@ and confess "$module $@"; +} + +for my $module (@modules) { + eval "$module->initmodule"; + $@ and confess "$module $@"; +} + +1; +# vim:set sw=4: diff --git a/ndb/tools/ndbnet/lib/NDB/Net/Base.pm b/ndb/tools/ndbnet/lib/NDB/Net/Base.pm new file mode 100644 index 00000000000..900446138e8 --- /dev/null +++ b/ndb/tools/ndbnet/lib/NDB/Net/Base.pm @@ -0,0 +1,12 @@ +package NDB::Net::Base; + +use strict; +use Carp; + +require NDB::Util::Base; + +use vars qw(@ISA); +@ISA = qw(NDB::Util::Base); + +1; +# vim:set sw=4: diff --git a/ndb/tools/ndbnet/lib/NDB/Net/Client.pm b/ndb/tools/ndbnet/lib/NDB/Net/Client.pm new file mode 100644 index 00000000000..d34a18d63af --- /dev/null +++ b/ndb/tools/ndbnet/lib/NDB/Net/Client.pm @@ -0,0 +1,252 @@ +package NDB::Net::Client; + +use strict; +use Carp; +use POSIX(); +use Socket; + +require NDB::Net::Base; + +use vars qw(@ISA); +@ISA = qw(NDB::Net::Base); + +# constructors + +my $log; + +sub initmodule { + $log = NDB::Util::Log->instance; +} + +my %clientcache = (); +my $clientid = 0; + +NDB::Net::Client->attributes( + id => sub { /^\d+$/ }, + addtime => sub { /^\d+$/ }, + state => sub { /^(new|input|cmd)$/ }, + socket => sub { ref && $_->isa('NDB::Util::Socket') }, + serversocket => sub { ref && $_->isa('NDB::Util::Socket') }, + serverlock => sub { ref && $_->isa('NDB::Util::Lock') }, + event => sub { ref && $_->isa('NDB::Util::Event') }, + context => sub { defined }, + cmd => sub { ref && $_->isa('NDB::Net::Command') }, +); + +sub desc { + my $client = shift; + my $id = $client->getid; + my $fileno = fileno($client->getsocket->getfh); + return "client $id fd=$fileno"; +} + +sub new { + my $class = shift; + @_ % 2 == 0 or confess 0+@_; + my(%attr) = @_; + my $client = $class->SUPER::new(%attr); + $client->setid(++$clientid) + or $log->push, return undef; + $client->setaddtime(time) + or $log->push, return undef; + $client->setstate(q(new)) + or $log->push, return undef; + $client->setsocket($attr{socket}) + or $log->push, return undef; + $client->setserversocket($attr{serversocket}) + or $log->push, return undef; + $client->setserverlock($attr{serverlock}) + or $log->push, return undef; + $client->setevent($attr{event}) + or $log->push, return undef; + $client->setcontext($attr{context}) + or $log->push, return undef; + $log->put("add")->push($client)->info; + $clientcache{$client->getid} = $client; + return $client; +} + +sub listall { + my $class = shift; + my $list = []; + for my $id (sort { $a <=> $b } keys %clientcache) { + my $client = $clientcache{$id}; + push(@$list, $client); + } + return $list; +} + +sub exists { + my $client = shift; + return exists($clientcache{$client->getid}); +} + +sub delete { + my $client = shift; + $log->put("delete")->push($client)->info; + $client->getevent->clear($client->getsocket, 'r'); + $client->getsocket->close; + delete $clientcache{$client->getid} or confess 'oops'; +} + +sub deleteother { + my $thisclient = shift; + for my $id (sort { $a <=> $b } keys %clientcache) { + my $client = $clientcache{$id}; + if ($client ne $thisclient) { + $client->delete; + } + } +} + +sub deleteall { + my $class = shift; + for my $id (sort { $a <=> $b } keys %clientcache) { + my $client = $clientcache{$id}; + $client->delete; + } +} + +# processing + +sub processnew { + my $client = shift; + @_ == 0 or confess 0+@_; + $log->put("process new")->push($client)->debug; + $client->getevent->set($client->getsocket, 'r'); + $log->attachuser(io => $client->getsocket); + $client->setstate(q(input)) + or $log->push, return undef; + return 1; +} + +sub processinput { + my $client = shift; + @_ == 0 or confess 0+@_; + $log->put("process input")->push($client)->debug; + my $line = $client->getsocket->readline; + if (! defined($line)) { + $log->push; + return undef; + } + if (length($line) == 0) { + if ($client->getsocket->getreadend) { + $log->put("no command")->push($client); + return undef; + } + $log->put("wait for input")->push($client)->debug; + return 1; + } + $log->put("got line: $line")->push($client)->info; + $client->getevent->clear($client->getsocket, 'r'); + my $cmd = NDB::Net::Command->new(line => $line) + or $log->push, return undef; + $log->put("command received")->push($cmd)->push($client)->debug; + $client->setcmd($cmd) + or $log->push, return undef; + $client->setstate(q(cmd)) + or $log->push, return undef; + return 1; +} + +sub processcmd { + my $client = shift; + @_ == 0 or confess 0+@_; + $log->put("process cmd")->push($client)->debug; + my $cmd = $client->getcmd; + my $context = $client->getcontext; + my $name_fg = "cmd_" . $cmd->getname . "_fg"; + my $name_bg = "cmd_" . $cmd->getname . "_bg"; + my $fg = $context->can($name_fg); + my $bg = $context->can($name_bg); + unless ($fg || $bg) { + $log->put("%s: unimplemented", $cmd->getname); + return undef; + } + my $ret; + if ($fg) { + $log->put($name_fg)->push($cmd)->push($client)->info; + if (! ref($context)) { + $ret = &$fg($cmd); + } + else { + $ret = &$fg($context, $cmd); + } + defined($ret) + or $log->push, return undef; + if (! $bg) { + $log->push($name_fg)->putvalue($ret)->user; + return 1; + } + } + if ($bg) { + $log->put($name_bg)->push($cmd)->push($client)->info; + my $pid = fork; + if (! defined($pid)) { + $log->put("fork failed: $!"); + return undef; + } + if ($pid == 0) { + $client->getserversocket->close; + $client->getserverlock->close; + $client->deleteother; + if (! ref($context)) { + $ret = &$bg($cmd); + } + else { + $ret = &$bg($context, $cmd); + } + if (! $ret) { + $log->push($client)->error; + $log->push($name_bg)->putvalue(undef)->user; + exit(1); + } + $log->push($name_bg)->putvalue($ret)->user; + exit(0); + } + } + return 1; +} + +sub process { + my $client = shift; + @_ == 0 or confess 0+@_; + try: { + if ($client->getstate eq q(new)) { + $client->processnew + or $log->push, last try; + } + if ($client->getstate eq q(input)) { + $client->processinput + or $log->push, last try; + } + if ($client->getstate eq q(cmd)) { + $client->processcmd + or $log->push, last try; + $log->detachuser; + $client->delete; + return 1; + } + return 1; + } + $log->push($client)->error; + $log->putvalue(undef)->user; + $log->detachuser; + $client->delete; + return undef; +} + +sub processall { + my $class = shift; + @_ == 0 or confess 0+@_; + my $list = $class->listall; + for my $client (@$list) { + $client->process; + } + while ((my $pid = waitpid(-1, &POSIX::WNOHANG)) > 0) { + $log->put("harvested pid=$pid")->info; + } +} + +1; +# vim:set sw=4: diff --git a/ndb/tools/ndbnet/lib/NDB/Net/Command.pm b/ndb/tools/ndbnet/lib/NDB/Net/Command.pm new file mode 100644 index 00000000000..30145d09fa9 --- /dev/null +++ b/ndb/tools/ndbnet/lib/NDB/Net/Command.pm @@ -0,0 +1,641 @@ +package NDB::Net::Command; + +use strict; +use Carp; +use Getopt::Long; +use Text::ParseWords (); +use Text::Tabs (); + +require NDB::Net::Base; + +use vars qw(@ISA); +@ISA = qw(NDB::Net::Base); + +# constructors + +my $log; + +sub initmodule { + $log = NDB::Util::Log->instance; +} + +my($cmdtab, $aliastab); + +NDB::Net::Command->attributes( + name => sub { /^\s*\w+\b/ }, + argv => sub { ref eq 'ARRAY' }, + optspec => sub { ref eq 'ARRAY' }, + argspec => sub { /^\d+$/ || ref eq 'CODE' }, + short => sub { defined && ! ref }, + help => sub { defined && ! ref }, + opts => sub { ref eq 'HASH' }, + args => sub { ref eq 'ARRAY' }, +); + +sub desc { + my $cmd = shift; + return "command " . $cmd->getname("?"); +}; + +sub processname { + my $cmd = shift; + @_ == 0 or confess 0+@_; + my $cmdargv = $cmd->getargv; + my $name = shift(@$cmdargv); + my %seen = (); + while ((my $entry) = grep($name eq $_->{name}, @$aliastab)) { + $seen{$name}++ && last; + unshift(@$cmdargv, split(' ', $entry->{value})); + $name = shift(@$cmdargv); + } + if ((my $entry) = grep($_->{name} eq $name, @$cmdtab)) { + $cmd->setname($entry->{name}) + or $log->push, return undef; + $cmd->setoptspec($entry->{optspec}) + or $log->push, return undef; + $cmd->setargspec($entry->{argspec}) + or $log->push, return undef; + } + else { + $log->put("$name: undefined")->push($cmd); + return undef; + } + return 1; +} + +sub getopttype { + my $cmd = shift; + my($key) = @_; + if (grep(/^$key$/, @{$cmd->getoptspec})) { + return 1; + } + if (grep(/^$key=/, @{$cmd->getoptspec})) { + return 2; + } + return undef; +} + +sub processargv { + my $cmd = shift; + @_ == 0 or confess 0+@_; + my $cmdargv = $cmd->getargv; + my @newargv = (); + while (@$cmdargv) { + my $v = shift(@$cmdargv); + if (! defined($v)) { + next; + } + if (ref($v) eq 'ARRAY') { + unshift(@$cmdargv, @$v); # push back + next; + } + if (ref($v) eq 'HASH') { + for my $k (sort keys %$v) { + if ($cmd->getopttype($k) == 1) { + push(@newargv, "--$k"); + next; + } + if ($cmd->getopttype($k) == 2) { + push(@newargv, "--$k", $v->{$k}); + next; + } + $log->put("$k: undefined option")->push($cmd); + return undef; + } + next; + } + if (ref($v)) { + confess 'oops'; + } + push(@newargv, $v); + } + push(@$cmdargv, @newargv); + return 1; +} + +sub processopts { + my $cmd = shift; + @_ == 0 or confess 0+@_; + my $cmdargv = $cmd->getargv; + local(@ARGV) = @$cmdargv; + try: { + local $SIG{__WARN__} = sub { + my $errstr = "@_"; + while (chomp($errstr)) {} + $log->put($errstr)->push($cmd); + }; + $cmd->setopts({}) + or $log->push, return undef; + Getopt::Long::Configure(qw( + default no_getopt_compat no_ignore_case + )); + GetOptions($cmd->getopts, @{$cmd->getoptspec}) + or return undef; + } + $cmd->setargs([ @ARGV ]) + or $log->push, return undef; + return 1; +} + +sub processargs { + my $cmd = shift; + @_ == 0 or confess 0+@_; + my $cmdargs = $cmd->getargs; + if ($cmd->getargspec =~ /^\d+$/) { + if (@$cmdargs != $cmd->getargspec) { + $log->put("invalid arg count %d != %d", + scalar(@$cmdargs), $cmd->getargspec)->push($cmd); + return undef; + } + } + if (ref($cmd->getargspec) eq 'CODE') { + local $_ = scalar(@$cmdargs); + if (! &{$cmd->getargspec}()) { + $log->put("invalid arg count %d", + scalar(@$cmdargs))->push($cmd); + return undef; + } + } + return 1; +} + +sub new { + my $class = shift; + @_ % 2 == 0 or confess 0+@_; + my %attr = @_; + my $cmd = $class->SUPER::new(%attr); + my $cmdargv = []; + $cmd->setargv($cmdargv) + or $log->push, return undef; + my $line = $attr{line}; + my $argv = $attr{argv}; + defined($line) != defined($argv) # exactly one + or confess 'oops'; + if (defined($line)) { + ! ref($line) or confess 'oops'; + push(@$cmdargv, Text::ParseWords::shellwords($line)); + } + if (defined($argv)) { + ref($argv) eq 'ARRAY' or confess 'oops'; + push(@$cmdargv, @$argv); + } + if (! @$cmdargv) { + $log->put("empty command"); + return undef; + } + $cmd->processname + or $log->push, return undef; + $cmd->processargv + or $log->push, return undef; + $cmd->processopts + or $log->push, return undef; + $cmd->processargs + or $log->push, return undef; + return $cmd; +} + +sub getline { + my $cmd = shift; + @_ == 0 or confess 0+@_; + my @text = ($cmd->getname); + for my $k (sort keys %{$cmd->getopts}) { + if ($cmd->getopttype($k) == 1) { + push(@text, "--$k"); + next; + } + if ($cmd->getopttype($k) == 2) { + push(@text, "--$k", quotemeta($cmd->getopts->{$k})); + next; + } + confess 'oops'; + } + for my $s (@{$cmd->getargs}) { + push(@text, quotemeta($s)); + } + return "@text"; +} + +sub setopt { + my $cmd = shift; + my($key, $value) = @_; + if ($cmd->getopttype($key) == 1) { + @_ == 1 or confess 0+@_; + $cmd->getopts->{$key} = 1; + } + elsif ($cmd->getopttype($key) == 2) { + @_ == 2 or confess 0+@_; + $cmd->getopts->{$key} = $value; + } + else { + confess 'oops'; + } +} + +sub getopt { + my $cmd = shift; + @_ == 1 or confess 0+@_; + my($key) = @_; + $cmd->getopttype($key) or confess 'oops'; + return $cmd->getopts->{$key}; +} + +sub setarg { + my $cmd = shift; + @_ == 2 or confess 0+@_; + my($idx, $value) = @_; + $cmd->getargs->[$idx] = $value; +} + +sub getarg { + my $cmd = shift; + @_ == 1 or confess 0+@_; + my($idx) = @_; + return $cmd->getargs->[$idx]; +} + +sub getarglist { + my $cmd = shift; + @_ == 1 or confess 0+@_; + my($idx) = @_; + my @args = @{$cmd->getargs}; + @args = @args[$idx..$#args]; + return \@args; +} + +sub helptext { + my $cmd = shift; + @_ <= 1 or confess 0+@_; + my $name = $cmd->getargs->[0]; + my $text = ""; + my $indent = " "x4; + if (defined($name)) { + for my $entry (@$aliastab) { + if ($entry->{name} eq $name) { + $text .= "alias $name=\"$entry->{value}\"\n"; + ($name) = split(' ', $entry->{value}); + last; + } + } + } + else { + $text .= "COMMANDS\n"; + } + for my $entry (@$cmdtab) { + if (defined($name)) { + if ($entry->{name} eq $name) { + $text .= uc($name) . "\n"; + for my $t (split(/\n/, $entry->{help})) { + $text .= $indent; + $text .= Text::Tabs::expand($t) . "\n"; + } + last; + } + } + else { + $text .= $indent; + $text .= sprintf("%-16s%s\n", $entry->{name}, $entry->{short}); + } + } + if (! $text) { + $log->put("$name: undefined"); + return undef; + } + return $text; +} + +sub aliastext { + my $cmd = shift; + @_ == 0 or confess 0+@_; + my $text = ""; + my $indent = " "x4; + $text .= "ALIASES\n"; + for my $entry (@$aliastab) { + $text .= $indent; + $text .= sprintf("%-16s%s\n", $entry->{name}, $entry->{value}); + } + return $text; +} + +# commands +# name command name (unique) +# optspec option spec in Getopt::Long style +# argspec arg count (number or sub) +# short one line summary +# help long help text +# opts options HASH (after parse) +# args arguments ARRAY (after parse) + +$cmdtab = [ + { + name => "help", + optspec => [ qw() ], + argspec => sub { $_[0] <= 1 }, + short => "print help (try: h h)", + help => <<END, +help [name] +name command name or alias + +Print help summary or longer help text for one command. + +General: + +Options can be placed anywhere on command line and can be abbreviated. +Example: "start db11 -i" instead of "start --init_rm db11". + +Several commands have internal option --local which makes current server +do the work, instead of passing it to other servers. This option should +not be used explicitly, except for testing. +END + }, + { + name => "alias", + optspec => [ qw() ], + argspec => 0, + short => "list aliases", + help => <<END, +alias + +List built-in aliases. New ones cannot be defined (yet). +END + }, + { + name => "quit", + optspec => [ qw() ], + argspec => 0, + short => "exit ndbnet", + help => <<END, +quit + +Exit ndbnet client. +END + }, + { + name => "server", + optspec => [ qw(all direct pass parallel script=s local) ], + argspec => sub { $_ >= 1 }, + short => "net server commands", + help => <<END, +server action id... [options] +action start restart stop ping +id net server id from net config +--all do all servers listed in net config +--direct do not use a server +--pass pass current ndb environment to remote command +--parallel run in parallel when possible +--script path remote script instead of "ndbnetd" +--local for internal use by servers + +Each host needs one net server (ndbnetd). It should be started +from latest ndb installation, for example at system boot time. +A "server ping" is used to check that all servers are up (option +--all is added if no server ids are given). + +Other actions are mainly for testing. A "server start" tries to +start servers via "ssh". This does not work if "ssh" is not allowed +or if the remote command does not get right environment. + +Option --direct makes this ndbnet client do the work. It is assumed +for "server start" and it requires that a local net config exists. +Option --pass is useful in a homogeneous (NFS) environment. + +There are aliases "startserver" for "server start", etc. +END + }, + { + name => "start", + optspec => [ qw(init_rm nostart stop kill config old home=s clean proxy=s) ], + argspec => 1, + short => "start database", + help => <<END, +start dbname [options] +dbname database name +--init_rm destroy existing database files on each node +--nostart for DB nodes only do "ndb -n" +--stop do "stop dbname" first +--kill do "kill dbname" first +--config create run config but start no processes +--old use existing config files +--home dir override home (product dir) from config +--clean passed to startnode +--proxy list generate proxy ports (read the source) + +Start a database as follows: + +- start mgmt servers on all mgmt nodes +- start ndb processes on all db nodes +- send "all start" to first mgmt server (redundant) +- start processes on all api nodes (if runtype!="manual") + +Older database versions (v1.0) are started similarly except that there +are no management servers. + +The --proxy option is used for testing network problems. +END + }, + { + name => "startnode", + optspec => [ qw(init_rm nostart config old run=s home=s local clean proxy=s) ], + argspec => 2, + short => "start database node", + help => <<END, +startnode dbname nodeid [options] +dbname database name +nodeid node number +--init_rm destroy existing database files (if db node) +--nostart if DB node only do "ndb -n" +--config create run config but start no processes +--old use existing config files +--run cmd run this shell command, default from config file +--home dir override home (product dir) from config +--local node must be local to this ndbnet server +--clean remove old node dir first +--proxy list processed by mgmt nodes, see "start" command + +Start the process on one database node. The node can be of any type +(mgmt/db/api). If already running, does nothing. + +The --run option specifies a simple shell command (not pipeline etc). +Defaults: + +- mgmt node => mgmtsrvr -p port -l Ndb.cfg -i config.txt -c config.bin + where port comes from ndbnet.xml +- db node => ndb +- api node => based on ndbnet config, default empty + +The node server exits when the command exits (unless runtype is set to +auto). Command exit status is not available. + +Used internally by db "start" command. +END + }, + { + name => "stop", + optspec => [ qw() ], + argspec => 1, + short => "stop database", + help => <<END, +stop dbname [options] +dbname database name + +Stop a database as follows (see also "stopnode" command): + +- send SIGTERM to api processes, wait for them to exit +- send "all stop" command to first mgmt server +- wait for db processes to exit +- send "quit" to mgmt servers, wait for them to exit +END + }, + { + name => "stopnode", + optspec => [ qw(local) ], + argspec => 2, + short => "stop process on one node", + help => <<END, +stopnode dbname nodeid [options] +dbname database name +nodeid node number +--local node must be local to this server + +Stop process on one database node. Action depends on node type: + +- api node: send SIGTERM to the process, wait for it to exit +- db node: no action, wait for the ndb process to exit +- mgmt node: send "quit" command to mgmt server, wait for it to exit + +Used internally by db "stop" command. +END + }, + { + name => "kill", + optspec => [ qw() ], + argspec => 1, + short => "kill processes on all nodes", + help => <<END, +kill dbname [options] +dbname database name + +Send SIGKILL to processes on all nodes and wait for them to exit. +END + }, + { + name => "killnode", + optspec => [ qw(local) ], + argspec => 2, + short => "kill process on one node", + help => <<END, +killnode dbname nodeid [options] +dbname database name +nodeid node number +--local node must be local to this server + +Send SIGKILL to the process on the node and wait for it to exit. + +Used internally by db "kill" command. +END + }, + { + name => "statnode", + optspec => [ qw(local) ], + argspec => 2, + short => "get node run status (internal)", + help => <<END, +statnode dbname nodeid [options] +dbname database name +nodeid node number +--local node must be local to this server + +Get node run status (up/down) as a process. Used internally +and may not produce any output in ndbnet command. +END + }, + { + name => "list", + optspec => [ qw(quick short) ], + argspec => sub { 1 }, + short => "list databases", + help => <<END, +list [dbname] [options] +dbname database name, default is to list all +--quick only output config, do not query status +--short do list nodes + +List databases and nodes. Internally returns a data structure +of process and mgmt server status values for each node. Externally +(in ndbnet command) this is formatted as a listing. +END + }, + { + name => "writenode", + optspec => [ qw(wait=i local) ], + argspec => 3, + short => "write line of text to the process on a node", + help => <<END, +writenode dbname nodeid "some text" +dbname database name +nodeid node number +"some text" arbitrary text (quote if spaces) +--wait n wait n seconds for any response +--local node must be local to this server + +Write the text and a newline to the standard input of the process +running on the node. If wait > 0 is specified, prints whatever +the process wrote to stdout/stderr during that time. + +Used internally by "start" and other commands. +END + }, +]; + +# aliases +# name alias +# value expansion + +$aliastab = [ + { + name => "h", + value => "help", + }, + { + name => "q", + value => "quit", + }, + { + name => "EOF", + value => "quit", + }, + { + name => "startserver", + value => "server start", + }, + { + name => "ss", + value => "server start", + }, + { + name => "restartserver", + value => "server restart", + }, + { + name => "rss", + value => "server restart", + }, + { + name => "stopserver", + value => "server stop", + }, + { + name => "pingserver", + value => "server ping", + }, + { + name => "ps", + value => "server ping", + }, + { + name => "l", + value => "list", + }, +]; + +1; +# vim:set sw=4: diff --git a/ndb/tools/ndbnet/lib/NDB/Net/Config.pm b/ndb/tools/ndbnet/lib/NDB/Net/Config.pm new file mode 100644 index 00000000000..4c5db3cd3f5 --- /dev/null +++ b/ndb/tools/ndbnet/lib/NDB/Net/Config.pm @@ -0,0 +1,235 @@ +package NDB::Net::Config; + +use strict; +use Carp; +use Symbol; +use Socket; +use Errno; +use XML::Parser; + +require NDB::Net::Base; + +use vars qw(@ISA); +@ISA = qw(NDB::Net::Base); + +# constructors + +my $log; + +sub initmodule { + $log = NDB::Util::Log->instance; +} + +NDB::Net::Config->attributes( + file => sub { /^\S+$/ }, + loadtime => sub { /^\d+$/ }, +); + +sub new { + my $class = shift; + @_ % 2 == 0 or confess 0+@_; + my(%attr) = @_; + my $netcfg = $class->SUPER::new(%attr); + $netcfg->setfile($attr{file}) + or $log->put, return undef; + return $netcfg; +} + +sub desc { + my $netcfg = shift; + return $netcfg->getfile; +} + +use vars qw(@context); + +sub handle_start { + my($parser, $tag, %attr) = @_; + my $p = $context[-1]; + my $q = {}; + $p->{$tag} ||= []; + push(@{$p->{$tag}}, $q); + for my $k (keys %attr) { + $q->{$k} = $attr{$k}; + } + push(@context, $q); + return 1; +} + +sub handle_end { + my($parser, $tag, %attr) = @_; + pop(@context); + return 1; +} + +sub load { + my $netcfg = shift; + my $file = $netcfg->getfile; + my @s; + while (1) { + if (@s = stat($file)) { + last; + } + $log->put("$file: stat failed: $!"); + if (! $!{ESTALE}) { + return undef; + } + $log->put("(retry)")->info; + sleep 1; + } + if ($s[9] <= $netcfg->getloadtime(0)) { + return 1; + } + my $fh = gensym(); + if (! open($fh, "<$file")) { + $log->put("$file: open for read failed: $!"); + return undef; + } + my $text = ""; + my $line; + while (defined($line = <$fh>)) { + $text .= $line; + } + close($fh); + my $parser = XML::Parser->new( + ParseParamEnt => 1, + Handlers => { + Start => \&handle_start, + End => \&handle_end, + }, + ); + delete $netcfg->{config}; + local @context = ($netcfg); + $parser->parse($text); + $netcfg->{text} = $text; + $netcfg->{config} = $netcfg->{config}[0]; + $netcfg->setloadtime(time) + or $log->push, return undef; + NDB::Net::Server->deleteall; + NDB::Net::Database->deleteall; + NDB::Net::Node->deleteall; + return 1; +} + +sub getservers { + my $netcfg = shift; + @_ == 0 or confess 0+@_; + my $servers = []; + my $slist = $netcfg->{config}{server} || []; + for my $s (@$slist) { + my $server; + $server = NDB::Net::ServerINET->get($s->{id}); + if (! $server) { + $server = NDB::Net::ServerINET->new(%$s); + if (! $server) { + $log->push($netcfg)->warn; + next; + } + } + push(@$servers, $server); + } + return $servers; +} + +sub getdatabases { + my $netcfg = shift; + @_ == 0 or confess 0+@_; + my $databases = []; + my $dlist = $netcfg->{config}{database} || []; + for my $d (@$dlist) { + if ($d->{isproto} eq "y") { + next; + } + if ($d->{name} !~ /^\w(\w|-)*$/) { + $log->put("$d->{name}: invalid db name")->push($netcfg)->warn; + next; + } + my $db = $netcfg->getdatabase($d->{name}); + if (! $db) { + $log->push->warn; + next; + } + push(@$databases, $db); + } + return $databases; +} + +sub getdatabase { + my $netcfg = shift; + @_ == 1 or confess 0+@_; + my($name) = @_; + $netcfg->getservers or return undef; # cache them + my $default = $netcfg->{config}{default}[0] || {}; + my $db; + my $dlist = $netcfg->{config}{database} || []; + my $nlist; + for my $d (@$dlist) { + ($d->{name} ne $name) && next; + if ($d->{isproto} eq "y") { + next; + } + my %attr = (%$default, %$d); + $db = NDB::Net::Database->new(%attr); + if (! $db) { + $log->push($netcfg); + return undef; + } + if ($d->{proto}) { + if ($d->{isproto} eq "y") { + $log->put("$name: prototypes cannot be recursive"); + return undef; + } + for my $d2 (@$dlist) { + ($d2->{name} ne $d->{proto}) && next; + if ($d2->{isproto} ne "y") { + $log->put("$name: $d2->{name} is not a prototype"); + return undef; + } + if (! $d->{node}) { + $d->{node} = $d2->{node}; + } + last; + } + } + $nlist = $d->{node} || []; + last; + } + if (! $db) { + $log->put("$name: no such db")->push($netcfg); + return undef; + } + if (! @$nlist) { + $log->put("$name: empty node list")->push($netcfg); + return undef; + } + for my $n (@$nlist) { + my $node; + try: { + my $server = NDB::Net::Server->get($n->{server}) + or last try; + my %attr = (%$n, db => $db, server => $server); + my $type = $attr{type}; + if ($type eq 'db') { + $node = NDB::Net::NodeDb->new(%attr) + or last try; + } + if ($type eq 'mgmt') { + $node = NDB::Net::NodeMgmt->new(%attr) + or last try; + } + if ($type eq 'api') { + $node = NDB::Net::NodeApi->new(%attr) + or last try; + } + $log->put("bad node type '$type'"); + } + if (! $node) { + $log->push($netcfg); + $db->delete; + return undef; + } + } + return $db; +} + +1; +# vim:set sw=4: diff --git a/ndb/tools/ndbnet/lib/NDB/Net/Database.pm b/ndb/tools/ndbnet/lib/NDB/Net/Database.pm new file mode 100644 index 00000000000..7ea15be0650 --- /dev/null +++ b/ndb/tools/ndbnet/lib/NDB/Net/Database.pm @@ -0,0 +1,321 @@ +package NDB::Net::Database; + +use strict; +use Carp; +use Symbol; + +require NDB::Net::Base; + +use vars qw(@ISA); +@ISA = qw(NDB::Net::Base); + +# constructors + +my $log; + +sub initmodule { + $log = NDB::Util::Log->instance; +} + +my %dbcache = (); + +NDB::Net::Database->attributes( + name => sub { s/^\s+|\s+$//g; /^\S+$/ && ! m!/! }, + comment => sub { defined }, + version => sub { /^\d+(\.\d+)*$/ }, + base => sub { $^O eq 'MSWin32' || m!^/\S+$! }, + home => sub { $^O eq 'MSWin32' || m!^/\S+$! }, + nodeport => sub { $_ > 0 }, +); + +sub desc { + my $db = shift; + return $db->getname; +} + +sub new { + my $class = shift; + @_ % 2 == 0 or confess 0+@_; + my(%attr) = @_; + my $db = $class->SUPER::new(%attr); + $db->setname($attr{name}) + or $log->push, return undef; + if ($dbcache{$db->getname}) { + $log->put("duplicate db")->push($db); + return undef; + } + $db->setcomment($attr{comment}); + $db->setversion($attr{version}) + or $log->push, return undef; + if (defined($attr{base})) { + $db->setbase($attr{base}) + or $log->push, return undef; + } + if (defined($attr{home})) { + if ($^O ne 'MSWin32' && $attr{home} !~ m!^/! && $db->hasbase) { + $attr{home} = $db->getbase . "/$attr{home}"; + } + $db->sethome($attr{home}) + or $log->push, return undef; + } + if (defined($attr{nodeport})) { + $db->setnodeport($attr{nodeport}) + or $log->push, return undef; + } + if ($^O eq 'MSWin32' && ! $db->hasnodeport) { + $log->put("nodeport required on windows")->push($db), return undef; + } + $db->{nodehash} = {}; + $dbcache{$db->getname} = $db; + return $db; +} + +sub delete { + my $db = shift; + my $nodelist = $db->getnodelist('all'); + for my $node (@$nodelist) { + $node->delete; + } + delete $dbcache{$db->getname}; +} + +sub deleteall { + my $class = shift; + for my $name (sort keys %dbcache) { + my $db = $dbcache{$name}; + $db->delete; + } +} + +# assume numerical dot separated version numbers like 1.1.2 +sub cmpversion { + my $db = shift; + my $version = shift; + my @x = split(/\./, $db->getversion); + my @y = split(/\./, $version); + while (@x || @y) { + return -1 if $x[0] < $y[0]; + return +1 if $x[0] > $y[0]; + shift(@x); + shift(@y); + } + return 0; +} + +# nodes + +sub addnode { + my $db = shift; + @_ == 1 or confess 0+@_; + my($node) = @_; + unless (ref($node) && $node->isa('NDB::Net::Node')) { + confess 'oops'; + } + my $id = $node->getid; + if ($db->{nodehash}{$id}) { + $log->put("$id: duplicate node id")->push($db); + return undef; + } + $db->{nodehash}{$id} = $node; + return 1; +} + +sub getnode { + my $db = shift; + @_ == 1 or confess 0+@_; + my($id) = @_; + $id += 0; + my $node = $db->{nodehash}{$id}; + if (! $node) { + $log->put("$id: no such node id")->push($db); + return undef; + } + return $node; +} + +sub getnodelist { + my $db = shift; + @_ == 1 or confess 0+@_; + my($type) = @_; + $type =~ /^(all|mgmt|db|api)$/ or confess 'oops'; + my @nodes = (); + for my $id (sort { $a <=> $b } keys %{$db->{nodehash}}) { + my $node = $db->{nodehash}{$id}; + if ($type eq 'all' or $type eq $node->gettype) { + push(@nodes, $node); + } + } + return \@nodes; +} + +# start /stop + +sub start { + my $db = shift; + @_ == 1 or confess 0+@_; + my($opts) = @_; + if ($opts->{stop} || $opts->{kill}) { + my $method = $opts->{stop} ? "stop" : "kill"; + my %opts = (); + $db->$method(\%opts) + or $log->push, return undef; + } + $log->put("start")->push($db)->info; + my $nodesmgmt = $db->getnodelist('mgmt'); + my $nodesdb = $db->getnodelist('db'); + my $nodesapi = $db->getnodelist('api'); + my $ret; + try: { + my %startopts = (); + for my $k (qw(local init_rm nostart config old home clean proxy)) { + $startopts{$k} = $opts->{$k} if defined($opts->{$k}); + } + my %writeopts = (); + for my $k (qw(local)) { + $writeopts{$k} = $opts->{$k} if defined($opts->{$k}); + } + if ($db->cmpversion("1.0") > 0) { + for my $node (@$nodesmgmt) { + $node->start(\%startopts) or last try; + } + for my $node (@$nodesdb) { + $node->start(\%startopts) or last try; + } + if (! $opts->{config}) { + for my $node (@$nodesmgmt) { # probably redundant + $node->write(\%writeopts, "all start") or last try; + last; + } + } + } + else { + for my $node (@$nodesdb) { + $node->start(\%startopts) or last try; + } + if (! $opts->{config}) { + for my $node (@$nodesdb) { # probably redundant + $node->write(\%writeopts, "start") or last try; + } + } + } + for my $node (@$nodesapi) { + my %apiopts = %startopts; + if ($node->getruntype eq 'manual') { + $apiopts{config} = 1; + } + $node->start(\%apiopts) or last try; + } + $ret = 1; + } + if (! $ret) { + $log->push("start failed")->push($db); + return undef; + } + my $msg = ! $opts->{config} ? "start done" : "config created"; + $log->put($msg)->push($db)->user; + return 1; +} + +sub stop { + my $db = shift; + @_ == 1 or confess 0+@_; + my($opts) = @_; + $log->put("stop")->push($db)->info; + my $nodesmgmt = $db->getnodelist('mgmt'); + my $nodesdb = $db->getnodelist('db'); + my $nodesapi = $db->getnodelist('api'); + my $ret; + try: { + for my $node (@$nodesapi) { + $node->stop($opts) or last try; + } + if ($db->cmpversion("1.0") > 0) { + for my $node (@$nodesmgmt) { + $node->write($opts, "all stop") or last try; + last; + } + for my $node (@$nodesdb) { + $node->stop($opts) or last try; + } + for my $node (@$nodesmgmt) { + $node->stop($opts) or last try; + } + } + else { + for my $node (@$nodesdb) { + $node->write($opts, "stop") or last try; + } + for my $node (@$nodesdb) { + $node->stop($opts) or last try; + } + } + $ret = 1; + } + if (! $ret) { + $log->push("stop failed")->push($db); + return undef; + } + $log->put("stop done")->push($db)->user; + return 1; +} + +sub kill { + my $db = shift; + @_ == 1 or confess 0+@_; + my($opts) = @_; + $log->put("kill")->push($db)->info; + my $nodesmgmt = $db->getnodelist('mgmt'); + my $nodesdb = $db->getnodelist('db'); + my $nodesapi = $db->getnodelist('api'); + my $ret = 1; + try: { + for my $node (@$nodesapi) { + $node->kill($opts) || ($ret = undef); + } + for my $node (@$nodesdb) { + $node->kill($opts) || ($ret = undef); + } + for my $node (@$nodesmgmt) { + $node->kill($opts) || ($ret = undef); + } + } + if (! $ret) { + $log->push("kill failed")->push($db); + return undef; + } + $log->put("kill done")->push($db)->user; + return 1; +} + +sub list { + my $db = shift; + @_ == 1 or confess 0+@_; + my($opts) = @_; + my $dbsts = {}; + $dbsts->{comment} = $db->getcomment(""); + $dbsts->{home} = $db->gethome; + $log->put("status")->push($db)->info; + my $mgmsts; + for my $node (@{$db->getnodelist('mgmt')}) { + $mgmsts = $node->get_status or + $log->push->error; + last; + } + $mgmsts ||= {}; + for my $node (@{$db->getnodelist('all')}) { + my $id = $node->getid; + my $nodests = $dbsts->{node}{$id} ||= {}; + my $stat = $node->stat($opts) or + $log->push->error; + $nodests->{id} = $id; + $nodests->{type} = $node->gettype; + $nodests->{comment} = $node->getcomment(""); + $nodests->{host} = $node->getserver->gethost; + $nodests->{run} = $stat || "error"; + $nodests->{status} = $mgmsts->{node}{$id}; + } + return $dbsts; +} + +1; +# vim:set sw=4: diff --git a/ndb/tools/ndbnet/lib/NDB/Net/Env.pm b/ndb/tools/ndbnet/lib/NDB/Net/Env.pm new file mode 100644 index 00000000000..d79e72f2bb3 --- /dev/null +++ b/ndb/tools/ndbnet/lib/NDB/Net/Env.pm @@ -0,0 +1,94 @@ +package NDB::Net::Env; + +use strict; +use File::Spec; +use Carp; + +require NDB::Net::Base; + +use vars qw(@ISA); +@ISA = qw(NDB::Net::Base); + +# environment variables +# +# NDB_TOP source dir or installation dir +# NDB_BASE base dir not tied to any release or database +# NDB_NETCFG ndbnet config file, default $NDB_BASE/etc/ndbnet.xml +# +# ndbnet explicitly unsets NDB_TOP and NDB_HOME because they are +# specific to each database or database node + +# constructors + +my $log; + +sub initmodule { + $log = NDB::Util::Log->instance; +} + +NDB::Net::Env->attributes( + base => sub { /^\S+$/ }, + netcfg => sub { /^\S+$/ }, + hostname => sub { /^\S+$/ }, +); + +my $instance; + +sub desc { + my $netenv = shift; + return "net environment";; +} + +sub instance { + my $class = shift; + @_ % 2 == 0 or confess 0+@_; + my(%attr) = @_; + if ($instance) { + return $instance; + } + for my $var (qw(NDB_TOP NDB_HOME)) { + my $top = delete $ENV{$var}; + if (defined($top)) { + if ($^O ne 'MSWin32') { + $ENV{PATH} =~ s!(^|:)$top/bin($|:)!$1$2!g; + $ENV{LD_LIBRARY_PATH} =~ s!(^|:)$top/lib($|:)!$1$2!g; + $ENV{PERL5LIB} =~ s!(^|:)$top/lib/perl5($|:)!$1$2!g; + } + } + } + my $netenv = $class->SUPER::new(%attr); + for my $base ($attr{base}, $ENV{NDB_BASE}) { + if (defined($base)) { + $netenv->setbase($base) + or $log->push, return undef; + } + } + for my $netcfg ($attr{netcfg}, $ENV{NDB_NETCFG}) { + if (defined($netcfg)) { + $netenv->setnetcfg($netcfg) + or $log->push, return undef; + } + } + if ($netenv->hasbase && ! $netenv->hasnetcfg) { + $netenv->setnetcfg(File::Spec->catfile($netenv->getbase, "etc", "ndbnet.xml")) + or $log->push, return undef; + } + my $uname; + if ($^O ne 'MSWin32') { + chomp($uname = `uname -n`); + } else { + chomp($uname = `hostname`); + } + my($hostname) = gethostbyname($uname); + if (! defined($hostname)) { + $uname =~ s!\..*$!!; + ($hostname) = gethostbyname($uname); + } + $netenv->sethostname($hostname) + or $log->push, return undef; + $instance = $netenv; + return $instance; +} + +1; +# vim:set sw=4: diff --git a/ndb/tools/ndbnet/lib/NDB/Net/Node.pm b/ndb/tools/ndbnet/lib/NDB/Net/Node.pm new file mode 100644 index 00000000000..f41bf51168d --- /dev/null +++ b/ndb/tools/ndbnet/lib/NDB/Net/Node.pm @@ -0,0 +1,747 @@ +package NDB::Net::Node; + +use strict; +use Carp; +use Symbol; +use Socket; +use IPC::Open3; +use POSIX(); +use Errno; +use File::Spec; + +require NDB::Net::Base; + +use vars qw(@ISA); +@ISA = qw(NDB::Net::Base); + +# constructors + +my $log; + +sub initmodule { + $log = NDB::Util::Log->instance; +} + +my %nodecache = (); + +NDB::Net::Node->attributes( + db => sub { ref && $_->isa('NDB::Net::Database') }, + comment => sub { defined }, + id => sub { s/^\s+|\s+$//g; s/^0+(\d+)$/$1/; /^\d+$/ && $_ > 0 }, + type => sub { s/^\s+|\s+$//g; /^(mgmt|db|api)$/ }, + server => sub { ref && $_->isa('NDB::Net::Server') }, + base => sub { File::Spec->file_name_is_absolute($_) }, + home => sub { File::Spec->file_name_is_absolute($_) }, + state => sub { /^(new|run|stop)$/ }, + run => sub { defined }, + runenv => sub { defined }, + runtype => sub { m!(auto|once|manual)$! }, + lockpid => sub { $_ != 0 }, + iow => sub { ref && $_->isa('NDB::Util::IO') }, + ior => sub { ref && $_->isa('NDB::Util::IO') }, + pid => sub { $_ > 1 }, + event => sub { ref && $_->isa('NDB::Util::Event') }, +); + +sub desc { + my $node = shift; + my $dbname = $node->getdb->getname; + my $id = $node->getid; + my $type = $node->gettype; + return "$dbname.$id-$type"; +} + +sub new { + my $class = shift; + @_ % 2 == 0 or confess 0+@_; + my(%attr) = @_; + my $node = $class->SUPER::new(%attr); + $node->setdb($attr{db}) + or $log->push, return undef; + $node->setid($attr{id}) + or $log->push, return undef; + if ($nodecache{$node->getdb->getname,$node->getid}) { + $log->put("duplicate node")->push($node); + return undef; + } + $node->setcomment($attr{comment}); + $node->settype($attr{type}) + or $log->push, return undef; + if ($node->getdb->cmpversion("1.0") <= 0 && $node->gettype eq 'mgmt') { + $log->put("no mgmt nodes in db version <= 1.0")->push($node); + return undef; + } + $node->setserver($attr{server}) + or $log->push, return undef; + for my $base ($attr{base}, $node->getdb->getbase(undef)) { + if (defined($base)) { + $node->setbase($base) + or $log->push, return undef; + } + } + for my $home ($attr{home}, $node->getdb->gethome(undef)) { + if (defined($home)) { + if ($^O ne 'MSWin32' && $home !~ m!^/! && $node->hasbase) { + $home = $node->getbase . "/$home"; + } + $node->sethome($home) + or $log->push, return undef; + } + } + if (! $node->hashome) { + $log->put("home not defined")->push($node); + return undef; + } + $node->setstate('new') + or $log->push, return undef; + if (defined($attr{run})) { + $node->setrun($attr{run}) + or $log->push, return undef; + } + if (defined($attr{runenv})) { + $node->setrunenv($attr{runenv}) + or $log->push, return undef; + } + if (defined($attr{runtype})) { + $node->setruntype($attr{runtype}) + or $log->push, return undef; + } + if (! $node->hasruntype) { + my $runtype = "manual"; + $runtype = "once" + if $node->gettype =~ /^(mgmt|db)$/ || $node->hasrun; + $node->setruntype($runtype) + or $log->push, return undef; + } + if (! $node->getdb->addnode($node)) { + $log->push; + return undef; + } + $nodecache{$node->getdb->getname,$node->getid} = $node; + return $node; +} + +sub delete { + my $node = shift; + delete $nodecache{$node->getdb->getname,$node->getid} or + confess 'oops'; +} + +sub deleteall { + my $class = shift; + for my $k (sort keys %nodecache) { + my $node = $nodecache{$k}; + $node->delete; + } +} + +# node startup + +sub getconfdir { + my $node = shift; + @_ == 0 or confess 0+@_; + my $netenv = NDB::Net::Env->instance; + my $name = File::Spec->catfile($netenv->getbase, "etc"); + my $dir = NDB::Util::Dir->new(path => $name); + return $dir; +} + +sub getdbdir { + my $node = shift; + @_ == 0 or confess 0+@_; + my $netenv = NDB::Net::Env->instance; + my $name = File::Spec->catfile($netenv->getbase, "db", $node->getdb->getname); + my $dir = NDB::Util::Dir->new(path => $name); + return $dir; +} + +sub getnodedir { + my $node = shift; + @_ == 0 or confess 0+@_; + my $name = sprintf("%s-%s", $node->getid, $node->gettype); + my $dir = $node->getdbdir->getdir($name); + return $dir; +} + +sub getrundir { + my $node = shift; + @_ == 0 or confess 0+@_; + my $name = sprintf("run"); + my $dir = $node->getdbdir->getdir($name); + return $dir; +} + +sub getlogdir { + my $node = shift; + @_ == 0 or confess 0+@_; + my $name = sprintf("log"); + my $dir = $node->getdbdir->getdir($name); + return $dir; +} + +sub getlock { + my $node = shift; + @_ == 0 or confess 0+@_; + my $name = sprintf("%s-%s.pid", $node->getid, $node->gettype); + my $lock = $node->getrundir->getfile($name)->getlock; + return $lock; +} + +sub getsocketfile { + my $node = shift; + @_ == 0 or confess 0+@_; + my $name = sprintf("%s-%s.socket", $node->getid, $node->gettype); + my $file = $node->getrundir->getfile($name); + return $file; +} + +sub getlogfile { + my $node = shift; + @_ == 0 or confess 0+@_; + my $name = sprintf("%s-%s.log", $node->getid, $node->gettype); + my $file = $node->getlogdir->getfile($name); + return $file; +} + +sub getshellfile { + my $node = shift; + @_ == 0 or confess 0+@_; + my $name = sprintf("run.sh"); + my $file = $node->getnodedir->getfile($name); + return $file; +} + +sub getlocalcfg { + my $node = shift; + @_ == 0 or confess 0+@_; + my $name = "Ndb.cfg"; + my $file = $node->getnodedir->getfile($name); + return $file; +} + +sub writelocalcfg { + my $node = shift; + @_ == 0 or confess 0+@_; + my $db = $node->getdb; + my $file = $node->getlocalcfg; + $file->mkdir or $log->push, return undef; + if ($db->cmpversion("1.0") <= 0) { + my $section = ""; + my $edit = sub { + chomp; + if (/^\s*\[\s*(\S+)\s*\]/) { + $section = uc($1); + } + if ($section eq 'OWN_HOST') { + if (/^\s*ThisHostId\b/i) { + $_ = "ThisHostId " . $node->getid; + } + } + if ($section eq 'CM') { + if (/^\s*ThisNodeId\b/i) { + $_ = "ThisNodeId " . $node->getid; + } + } + if (0 and $section eq 'PROCESS_ID') { + if (/^\s*Host(\d+)\s+(\S+)(.*)/) { + my $id2 = $1; + my $host2 = $2; + my $rest2 = $3; + my $node2 = $db->getnode($id2) + or $log->push, return undef; + $_ = "Host$id2 "; + $_ .= $node2->getserver->getcanon; + $_ .= " $rest2"; + } + } + $_ .= "\n"; + return 1; + }; + $node->getinifile->copyedit($file, $edit) + or $log->push, return undef; + } + else { + my @text = (); + push(@text, sprintf("OwnProcessId %s", $node->getid)); + my $nodesmgmt = $db->getnodelist('mgmt'); + for my $mnode (@$nodesmgmt) { + my $host = $mnode->getserver->getcanon; + my $port = $mnode->getport; + push(@text, "$host $port"); + } + $file->putlines(\@text) or $log->push, return undef; + } + return 1; +} + +sub getinifile { + my $node = shift; + @_ == 0 or confess 0+@_; + my $name = sprintf("%s.ini", $node->getdb->getname); + my $file = $node->getconfdir->getfile($name); + return $file; +} + +sub getbincfg { + my $node = shift; + @_ == 0 or confess 0+@_; + my $name = sprintf("config.bin"); + my $file = $node->getnodedir->getfile($name); + return $file; +} + +sub getenvdefs { + my $node = shift; + @_ == 1 or confess 0+@_; + my $opts = shift; + my $home = $opts->{home} || $node->gethome; + my $netenv = NDB::Net::Env->instance; + if (! File::Spec->file_name_is_absolute($home)) { + $netenv->hasbase + or $log->put("no base and home=$home not absolute"), return undef; + $home = File::Spec->catfile($netenv->getbase, $home); + } + (-d $home) + or $log->put("$home: no such directory"), return undef; + my $defs; + if ($^O ne 'MSWin32') { + $defs = <<END; +# @{[ $node->desc ]} @{[ $node->getcomment("") ]} +# @{[ $node->getserver->desc ]} @{[ $node->getserver->getcanon ]} +# +debugger=\$1 +# +NDB_TOP=$home +export NDB_TOP +PATH=\$NDB_TOP/bin:\$PATH +export PATH +LD_LIBRARY_PATH=\$NDB_TOP/lib:\$LD_LIBRARY_PATH +export LD_LIBRARY_PATH +PERL5LIB=\$NDB_TOP/lib/perl5:\$PERL5LIB +export PERL5LIB +NDB_NODEID=@{[ $node->getid ]} +export NDB_NODEID +NDB_NODETYPE=@{[ $node->gettype ]} +export NDB_NODETYPE +ulimit -Sc unlimited +END + if ($node->hasrunenv) { + $defs .= <<END; +# +cd @{[ $node->getnodedir->getpath ]} || exit 1 +@{[ $node->getrunenv ]} +END + } + $defs .= <<END; +# +unset NDB_HOME # current NdbConfig.c would look here +# +END + } else { + $defs = <<END; +rem @{[ $node->desc ]} @{[ $node->getcomment("") ]} +rem @{[ $node->getserver->desc ]} @{[ $node->getserver->getcanon ]} +rem +set NDB_TOP=$home +set PATH=%NDB_TOP%\\bin;%PATH% +set PERL5LIB=%NDB_TOP%\\lib\\perl5;%PERL5LIB% +set NDB_NODEID=@{[ $node->getid ]} +set NDB_NODETYPE=@{[ $node->gettype ]} +END + if ($node->hasrunenv) { + $defs .= <<END; +rem +@{[ $node->getrunenv ]} +END + } + $defs .= <<END; +rem +rem current NdbConfig.c would look here +set NDB_HOME= +rem +END + } + chomp($defs); + return $defs; +} + +sub startlocal { + my $node = shift; + @_ == 1 or confess 0+@_; + my($opts) = @_; + $log->put("start local")->push($node)->info; + my $lock = $node->getlock; + $lock->mkdir or $log->push, return undef; + anon: { + my $ret = $lock->test; + defined($ret) or $log->push, return undef; + if ($ret) { + $log->put("already running under serverpid=%s", + $lock->getpid)->push($node)->user; + return 1; + } + $lock->set or $log->push, return undef; + } + if ($opts->{clean}) { + $node->getnodedir->rmdir(1); + $node->getlogfile->unlink; + } + if (! $opts->{old}) { + $node->writelocalcfg or $log->push, return undef; + $node->handleprepare($opts) or $log->push, return undef; + } + anon: { + $lock->close; + if ($opts->{config}) { + return 1; + } + my $file = $node->getlogfile; + $file->mkdir or $log->push, return undef; + my $pid = fork(); + defined($pid) or $log->put("fork failed: $!"), return undef; + if ($pid) { + exit(0); + } + $lock->set or $log->push->fatal; + $node->setlockpid($$) or $log->push->fatal; + if ($^O ne 'MSWin32') { + POSIX::setsid() or $log->put("setsid failed: $!")->fatal; + } + $log->setfile($file->getpath) or $log->push->fatal; + } + my $socket; + anon: { + my $file = $node->getsocketfile; + $file->mkdir or $log->push($node)->fatal; + unlink($file->getpath); + if ($^O ne 'MSWin32') { + $socket = NDB::Util::SocketUNIX->new + or $log->push($node)->fatal; + } else { + $socket = NDB::Util::SocketINET->new + or $log->push($node)->fatal; + } + $socket->setopt(SOL_SOCKET, SO_REUSEADDR, 1) + or $log->push($node)->fatal; + if ($^O ne 'MSWin32') { + $socket->bind($file->getpath) + or $log->push($node)->fatal; + } else { + $socket->bind($node->getdb->getnodeport + $node->getid) + or $log->push($node)->fatal; + } + $socket->listen + or $log->push($node)->fatal; + } + START: { + my $w = gensym(); + my $r = gensym(); + my @arg = ('/bin/sh', $node->getshellfile->getpath); + my $pid = open3($w, $r, undef, @arg); + $node->setiow(NDB::Util::IO->new(fh => $w)) + or $log->push->fatal; + $node->setior(NDB::Util::IO->new(fh => $r)) + or $log->push->fatal; + $node->setpid($pid) + or $log->push->fatal; + } + $node->setstate('run') + or $log->push($node)->fatal; + $log->put("started host=%s pid=%s", + $node->getserver->gethost, $node->getpid)->push($node)->user; + $log->push("started")->push($node)->putvalue(1)->user; + $log->detachuser; + NDB::Net::Client->deleteall; + my $event = NDB::Util::Event->new; + $event->set($socket, 'r'); + $event->set($node->getior, 'r'); + loop: { + try: { + my $n = $event->poll(10); + if (! defined($n)) { + $log->push->error; + sleep 1; + last try; + } + if (! $n) { + $log->push->debug; + last try; + } + if ($node->hasior && $event->test($node->getior, 'r')) { + my $data = $node->getior->read; + if (! defined($data)) { + $log->push->fatal; + } + if (length($data) > 0) { + $node->handleoutput($opts, $data); + } + if ($node->getior->getreadend) { + $log->put("input closed")->warn; + $event->clear($node->getior, 'r'); + $node->getior->close; + $node->delior; + $node->handleeof($opts); + last loop; + } + } + if (! $event->test($socket, 'r')) { + last try; + } + my $csocket = $socket->accept(10); + if (! defined($csocket)) { + $log->push->error; + last try; + } + if (! $csocket) { + $log->push->warn; + last try; + } + my $client = NDB::Net::Client->new( + socket => $csocket, + serversocket => $socket, + serverlock => $lock, + event => $event, + context => $node, + ); + $client or $log->push->fatal; + } + NDB::Net::Client->processall; + redo loop; + } + if ($node->getruntype eq "auto") { + if ($node->getstate eq "run") { + $log->put("restart in 5 seconds...")->info; + sleep 5; + goto START; + } + $log->put("stopping, skip restart")->info; + } + $lock->close; + $node->getsocketfile->unlink; + while (wait() != -1) {} + $log->put("exit")->push->info; + exit(0); +} + +# handlers can be overridden in subclass + +sub handleprepare { confess 'oops'; } + +sub handleoutput { + my $node = shift; + @_ == 2 or confess 0+@_; + my($opts, $data) = @_; + $data =~ s/\015//g; + $data = $node->{savedata} . $data; + while ((my $i = index($data, "\n")) >= 0) { + my $line = substr($data, 0, $i); + $data = substr($data, $i+1); + $log->put($line)->info; + if ($opts->{user} && $line !~ /^\s*$/) { + $log->put($line)->user; + } + } + $node->{savedata} = $data; + if (1 && length $node->{savedata}) { # XXX partial line + my $line = $node->{savedata}; + $log->put($line)->info; + if ($opts->{user} && $line !~ /^\s*$/) { + $log->put($line)->user; + } + $node->{savedata} = ""; + } +} + +sub handleeof { +} + +# command subs can be overridden by subclass + +sub waitforexit { + my $node = shift; + my $lock = $node->getlock; + my $lockpid = $node->getlockpid; + my $n1 = 0; + my $n2 = 10; + while (1) { + my $ret = $lock->test; + defined($ret) or $log->push, return undef; + if (! $ret) { + $log->put("exit done")->push($node)->user; + last; + } + if ($lockpid != $lock->getpid) { + $log->put("restarted: lock pid changed %s->%s", + $lockpid, $lock->getpid)->push($node); + return undef; + } + if (++$n1 >= $n2) { + $n2 *= 2; + $log->put("wait for exit")->push($node)->user; + } + select(undef, undef, undef, 0.1); + } + return 1; +} + +sub cmd_stopnode_bg { + my($node, $cmd) = @_; + return $node->waitforexit; +} + +sub cmd_killnode_fg { + my($node, $cmd) = @_; + my $pid = $node->getpid; + $log->put("kill -9 $pid")->push($node)->user; + kill(9, $pid); + $node->setstate('stop') + or $log->push($node), return undef; + return 1; +} + +sub cmd_killnode_bg { + my($node, $cmd) = @_; + return $node->waitforexit; +} + +sub cmd_statnode_bg { + my($node, $cmd) = @_; + return "up"; +} + +sub cmd_writenode_fg { + my($node, $cmd) = @_; + my $text = $cmd->getarg(2); + while(chomp($text)) {}; + $log->put("write: $text")->push($node)->user; + $node->getiow->write("$text\n"); + my $output = ""; + if ((my $num = $cmd->getopt("wait")) > 0) { + my $lim = time + $num; + $node->getior->settimeout(1); + loop: { + my $data = $node->getior->read; + if (length($data) > 0) { + $node->handleoutput({user => 1}, $data); + $output .= $data; + } + redo loop if time < $lim; + } + $node->getior->settimeout(0); + } + return { output => $output }; +} + +# commands + +sub doremote { + my $node = shift; + my($cmdname, $opts, @args) = @_; + my $server = $node->getserver; + $log->put("$cmdname remote")->push($server)->push($node)->info; + my $argv = [ + $cmdname, q(--local), + $opts, $node->getdb->getname, $node->getid, @args ]; + my $cmd = NDB::Net::Command->new(argv => $argv) + or $log->push, return undef; + my $ret = $server->request($cmd) + or $log->push, return undef; + return $ret; +} + +sub dolocal { + my $node = shift; + my($cmdname, $opts, @args) = @_; + $log->put("$cmdname local")->push($node)->info; + if (! $node->getserver->islocal) { + $log->put("not local")->push($node->getserver)->push($node); + return undef; + } + if ($cmdname eq "startnode") { + return $node->startlocal($opts); + } + my $lock = $node->getlock; + anon: { + my $ret = $lock->test; + defined($ret) or $log->push, return undef; + if (! $ret) { + if ($cmdname eq "statnode") { + return "down"; + } + $log->put("not running")->push($node)->user; + return $cmdname eq "writenode" ? undef : 1; + } + } + my $server; + anon: { + my $path = $node->getsocketfile->getpath; + if (! -e $path) { + $log->put("$path: no socket")->push($node); + return undef; + } + if ($^O ne 'MSWin32') { + $server = NDB::Net::ServerUNIX->new(id => 0, path => $path) + or $log->push, return undef; + } else { + $server = NDB::Net::ServerINET->new(id => 0, host => $node->getserver->getcanon, port => $node->getdb->getnodeport + $node->getid) + or $log->push, return undef; + } + } + my $argv = [ + $cmdname, + $opts, $node->getdb->getname, $node->getid, @args ]; + my $cmd = NDB::Net::Command->new(argv => $argv) + or $log->push, return undef; + my $ret = $server->request($cmd) + or $log->push, return undef; + $log->put("$cmdname done")->push($node)->info; + return $ret; +} + +sub start { + my $node = shift; + @_ == 1 or confess 0+@_; + my($opts) = @_; + $log->put("start")->push($node)->info; + my $do = $opts->{local} ? "dolocal" : "doremote"; + return $node->$do("startnode", $opts); +} + +sub stop { + my $node = shift; + @_ == 1 or confess 0+@_; + my($opts) = @_; + $log->put("stop")->push($node)->info; + my $do = $opts->{local} ? "dolocal" : "doremote"; + return $node->$do("stopnode", $opts); +} + +sub kill { + my $node = shift; + @_ == 1 or confess 0+@_; + my($opts) = @_; + $log->put("kill")->push($node)->info; + my $do = $opts->{local} ? "dolocal" : "doremote"; + return $node->$do("killnode", $opts); +} + +sub stat { + my $node = shift; + @_ == 1 or confess 0+@_; + my($opts) = @_; + $log->put("stat")->push($node)->info; + my $do = $opts->{local} ? "dolocal" : "doremote"; + return $node->$do("statnode", $opts); +} + +sub write { + my $node = shift; + @_ == 2 or confess 0+@_; + my($opts, $text) = @_; + $log->put("write: $text")->push($node)->info; + my $do = $opts->{local} ? "dolocal" : "doremote"; + return $node->$do("writenode", $opts, $text); +} + +1; +# vim:set sw=4: diff --git a/ndb/tools/ndbnet/lib/NDB/Net/NodeApi.pm b/ndb/tools/ndbnet/lib/NDB/Net/NodeApi.pm new file mode 100644 index 00000000000..08f5f85577d --- /dev/null +++ b/ndb/tools/ndbnet/lib/NDB/Net/NodeApi.pm @@ -0,0 +1,84 @@ +package NDB::Net::NodeApi; + +use strict; +use Carp; +use Symbol; + +require NDB::Net::Node; + +use vars qw(@ISA); +@ISA = qw(NDB::Net::Node); + +# constructors + +my $log; + +sub initmodule { + $log = NDB::Util::Log->instance; +} + +NDB::Net::NodeApi->attributes(); + +sub new { + my $class = shift; + @_ % 2 == 0 or confess 0+@_; + my(%attr) = @_; + my $node = $class->SUPER::new(%attr, type => 'api') + or $log->push, return undef; + return 1; +} + +# run methods + +sub handleprepare { + my $node = shift; + @_ == 1 or confess 0+@_; + my($opts) = @_; + my $netenv = NDB::Net::Env->instance; + my $envdefs = $node->getenvdefs($opts); + defined($envdefs) or return undef; + my $nodedir = $node->getnodedir; + my $shellfile = $node->getshellfile; + my $run; + if ($node->hasrun) { + $run = $node->getrun; + } + if (defined($opts->{run})) { + $run = $opts->{run}; + } + if (defined($run)) { + $log->put("run: $run")->push($node)->user; + } + if ($^O ne 'MSWin32') { + $shellfile->puttext(<<END) or $log->push, return undef; +$envdefs +cd @{[ $nodedir->getpath ]} || exit 1 +set -x +exec \$DEBUGGER $run +END + } else { + $shellfile->puttext(<<END) or $log->push, return undef; +$envdefs +cd @{[ $nodedir->getpath ]} +call $run +END + } + return 1; +} + +sub cmd_stopnode_fg { + my($node, $cmd) = @_; + my $pid = $node->getpid; + unless ($pid > 1) { + $log->put("bad pid=$pid")->push($node); + return undef; + } + $log->put("kill -15 $pid")->push($node)->user; + kill(15, $pid); + $node->setstate('stop') + or log->push($node), return undef; + return 1; +} + +1; +# vim:set sw=4: diff --git a/ndb/tools/ndbnet/lib/NDB/Net/NodeDb.pm b/ndb/tools/ndbnet/lib/NDB/Net/NodeDb.pm new file mode 100644 index 00000000000..88a35ba4f8d --- /dev/null +++ b/ndb/tools/ndbnet/lib/NDB/Net/NodeDb.pm @@ -0,0 +1,116 @@ +package NDB::Net::NodeDb; + +use strict; +use Carp; +use Symbol; + +require NDB::Net::Node; + +use vars qw(@ISA); +@ISA = qw(NDB::Net::Node); + +# constructors + +my $log; + +sub initmodule { + $log = NDB::Util::Log->instance; +} + +NDB::Net::NodeDb->attributes( + fsdir => sub { s/^\s+|\s+$//g; /^\S+$/ }, +); + +sub new { + my $class = shift; + @_ % 2 == 0 or confess 0+@_; + my(%attr) = @_; + my $node = $class->SUPER::new(%attr, type => 'db') + or $log->push, return undef; + $node->setfsdir($attr{fsdir}) + or $log->push, return undef; + return 1; +} + +# run methods + +sub handleprepare { + my $node = shift; + @_ == 1 or confess 0+@_; + my($opts) = @_; + my $netenv = NDB::Net::Env->instance; + my $envdefs = $node->getenvdefs($opts); + defined($envdefs) or return undef; + my $nodedir = $node->getnodedir; + my $shellfile = $node->getshellfile; + my $fsdir = NDB::Util::Dir->new( + path => sprintf("%s/%s/%s-%s.fs", + $node->getfsdir, $node->getdb->getname, $node->getid, $node->gettype)); + $fsdir->mkdir or $log->push, return undef; + my $init_rm; + my $run; + if ($^O ne 'MSWin32') { + $init_rm = "# no -i"; + if ($opts->{init_rm}) { + $init_rm = 'rm -f $NDB_FILESYSTEM/*/DBDIH/P0.sysfile'; + } + $run = "\$NDB_TOP/bin/ndb"; + } else { + $init_rm = "rem no -i"; + if ($opts->{init_rm}) { + $init_rm = + 'del/f %NDB_FILESYSTEM%\D1\DBDIH\P0.sysfile' . "\n" . + 'del/f %NDB_FILESYSTEM%\D2\DBDIH\P0.sysfile'; + } + $run = "ndb"; + } + if ($node->getdb->cmpversion("1.0") <= 0) { + $run .= " -s"; + } + if ($opts->{nostart}) { + $run .= " -n"; + } + if ($node->hasrun) { + $run = $node->getrun; + } + if (defined($opts->{run})) { + $run = $opts->{run}; + } + $log->put("run: $run")->push($node)->user; + if ($^O ne 'MSWin32') { + $shellfile->puttext(<<END) or $log->push, return undef; +$envdefs +NDB_FILESYSTEM=@{[ $fsdir->getpath ]} +export NDB_FILESYSTEM +# v1.0 compat +UAS_FILESYSTEM=\$NDB_FILESYSTEM +export UAS_FILESYSTEM +mkdir -p \$NDB_FILESYSTEM +$init_rm +cd @{[ $nodedir->getpath ]} || exit 1 +exec \$debugger $run +END + } else { + $shellfile->puttext(<<END) or $log->push, return undef; +$envdefs +set NDB_FILESYSTEM=@{[ $fsdir->getpath ]} +rem v1.0 compat +set UAS_FILESYSTEM=%NDB_FILESYSTEM% +mkdir %NDB_FILESYSTEM% +$init_rm +cd @{[ $nodedir->getpath ]} +call $run +END + } + return 1; +} + +sub cmd_stopnode_fg { + my($node, $cmd) = @_; + $node->setstate('stop') + or log->push($node), return undef; + return 1; +} + +1; +# vim:set sw=4: diff --git a/ndb/tools/ndbnet/lib/NDB/Net/NodeMgmt.pm b/ndb/tools/ndbnet/lib/NDB/Net/NodeMgmt.pm new file mode 100644 index 00000000000..1056e3df623 --- /dev/null +++ b/ndb/tools/ndbnet/lib/NDB/Net/NodeMgmt.pm @@ -0,0 +1,318 @@ +package NDB::Net::NodeMgmt; + +use strict; +use Carp; +use Symbol; + +require NDB::Net::Node; + +use vars qw(@ISA); +@ISA = qw(NDB::Net::Node); + +# constructors + +my $log; + +sub initmodule { + $log = NDB::Util::Log->instance; +} + +NDB::Net::NodeMgmt->attributes( + port => sub { s/^\s+|\s+$//g; /^\d+$/ }, +); + +sub new { + my $class = shift; + @_ % 2 == 0 or confess 0+@_; + my(%attr) = @_; + my $node = $class->SUPER::new(%attr, type => 'mgmt') + or $log->push, return undef; + $node->setport($attr{port}) + or $log->push, return undef; + return 1; +} + +# socket parser methods + +sub socketcommand { + my $node = shift; + my $socket; + $socket = NDB::Util::SocketINET->new or + $log->push($node), return undef; + $socket->settimeout(10); + $socket->connect($node->getserver->getcanon, $node->getport) or + $log->push($node), return undef; + $socket->write("GET STATUS\r\nBYE\r\n") or + $log->push($node), return undef; + my $out = ""; + my $data; + while ($data = $socket->read) { + $out .= $data; + } + $socket->close; + $out =~ s/\015//g; + return $out; +} + +sub get_status { + my $node = shift; + my $out = $node->socketcommand or + $log->push, return undef; + my @out = split(/\n/, $out); + $out[0] =~ /^get\s+status\s+(\d+)/i or + $log->put("bad line 0: $out[0]"), return undef; + my $cnt = $1; + my $ret = {}; + for (my $i = 1; $i <= $cnt; $i++) { + $out[$i] =~ /^$i\s+(.*)/ or + $log->put("bad line $i: $out[$i]"), return undef; + my $text = $1; + $text =~ s/^\s+|\s+$//g; + if ($text =~ /^ndb\s+(no_contact)\s+(\d+)$/i) { + $text = lc "$1"; + } elsif ($text =~ /^ndb\s+(starting)\s+(\d+)$/i) { + $text = lc "$1/$2"; + } elsif ($text =~ /^ndb\s+(started)\s+(\d+)$/i) { + $text = lc "$1"; + } elsif ($text =~ /^ndb\s+(shutting_down)\s+(\d+)$/i) { + $text = lc "$1"; + } elsif ($text =~ /^ndb\s+(restarting)\s+(\d+)$/i) { + $text = lc "$1"; + } elsif ($text =~ /^ndb\s+(unknown)\s+(\d+)$/i) { + $text = lc "$1"; + } + $ret->{node}{$i} = $text; + } + return $ret; +} + +# run methods + +sub getautoinifile { + my $node = shift; + @_ == 0 or confess 0+@_; + my $name = "config.txt"; + my $file = $node->getnodedir->getfile($name); + return $file; +} + +sub writeautoinifile { + my $node = shift; + @_ == 1 or confess 0+@_; + my($opts) = @_; + my $db = $node->getdb; + my $nodelist = $db->getnodelist('all'); + my $computers = {}; + for my $n (@$nodelist) { + $computers->{$n->getserver->getid} ||= { + id => $n->getserver->getid, + hostname => $n->getserver->getcanon, + }; + } + my $section = ""; # e.g. PROCESSES + my $auto; + my $edit = sub { + chomp; + s/^\s+|\s+$//g; + if (/^(\w+)$/) { + $section = uc($1); + } + elsif (/^\@loop$/i) { + $_ = "#$_"; + if ($auto) { + $log->put("nested \@loop"); + return undef; + } + $auto = {}; + } + elsif (/^\@base\s+(\S+)\s*$/) { + my $arg = $1; + $_ = "#$_"; + if (! $auto) { + $log->put("unexpected \@base"); + return undef; + } + if ($arg !~ /^\d+$/) { + $log->put("non-numerical \@base"); + return undef; + } + $auto->{base} = $arg; + } + elsif (/^\@end$/i) { + $_ = "#$_"; + if (! $auto) { + $log->put("unmatched \@end"); + return undef; + } + if ($section eq 'COMPUTERS') { + for my $id (sort { $a <=> $b } keys %$computers) { + my $computer = $computers->{$id}; + $_ .= "\n"; + $_ .= "\nId: " . $computer->{id}; + $_ .= "\nHostName: " . $computer->{hostname}; + if ($auto->{list}) { + $_ .= "\n#defaults"; + for my $s (@{$auto->{list}}) { + $_ .= "\n$s"; + } + } + } + } + elsif ($section eq 'PROCESSES') { + for my $n (@$nodelist) { + if ($auto->{type} && $n->gettype ne lc($auto->{type})) { + next; + } + $_ .= "\n"; + $_ .= "\nType: " . uc($n->gettype); + $_ .= "\nId: " . $n->getid; + $_ .= "\nExecuteOnComputer: " . $n->getserver->getid; + if ($auto->{list}) { + $_ .= "\n#defaults"; + for my $s (@{$auto->{list}}) { + $_ .= "\n$s"; + } + } + } + } + elsif ($section eq 'CONNECTIONS') { + if (! $auto->{type}) { + $log->put("cannot generate CONNECTIONS without type"); + return undef; + } + if (! defined($auto->{base})) { + $log->put("need \@base for CONNECTIONS"); + return undef; + } + my $key = $auto->{base}; + for (my $i1 = 0; $i1 <= $#$nodelist; $i1++) { + for (my $i2 = $i1+1; $i2 <= $#$nodelist; $i2++) { + my $n1 = $nodelist->[$i1]; + my $n2 = $nodelist->[$i2]; + if ($n1->gettype ne 'db' && $n2->gettype ne 'db') { + next; + } + $_ .= "\n"; + $_ .= "\nType: $auto->{type}"; + $_ .= "\nProcessId1: " . $n1->getid; + $_ .= "\nProcessId2: " . $n2->getid; + $key++; + if ($auto->{type} eq 'TCP') { + $_ .= "\nPortNumber: $key"; + if (my $list = $opts->{proxy}) { + my $id1 = $n1->getid; + my $id2 = $n2->getid; + if ($list =~ /\b$id1\b.*-.*\b$id2\b/) { + $key++; + $_ .= "\nProxy: $key"; + } elsif ($list =~ /\b$id2\b.*-.*\b$id1\b/) { + $key++; + $_ .= "\nProxy: $key"; + } + } + } + elsif ($auto->{type} eq 'SHM') { + $_ .= "\nShmKey: $key"; + } + else { + $log->put("cannot handle CONNECTIONS type $auto->{type}"); + return undef; + } + if ($auto->{list}) { + $_ .= "\n#defaults"; + for my $s (@{$auto->{list}}) { + $_ .= "\n$s"; + } + } + } + } + } + else { + $log->put("found \@end in unknown section '$section'"); + return undef; + } + undef $auto; + } + elsif (/^$/) { + } + elsif ($auto) { + if (/^Type:\s*(\w+)$/i) { + $auto->{type} = uc($1); + } + else { + $auto->{list} ||= []; + push(@{$auto->{list}}, $_); + } + $_ = ""; + return 1; # no output + } + $_ .= "\n"; + return 1; + }; + $node->getautoinifile->mkdir + or $log->push, return undef; + $node->getinifile->copyedit($node->getautoinifile, $edit) + or $log->push, return undef; + return 1; +} + +sub handleprepare { + my $node = shift; + @_ == 1 or confess 0+@_; + my($opts) = @_; + my $envdefs = $node->getenvdefs($opts); + defined($envdefs) or return undef; + my $nodedir = $node->getnodedir; + my $shellfile = $node->getshellfile; + my $port = $node->getport; + my $lpath = $node->getlocalcfg->getbasename; + $node->writeautoinifile($opts) + or $log->push, return undef; + my $ipath = $node->getautoinifile->getbasename; + $node->getbincfg->mkdir or $log->push, return undef; + my $cpath = $node->getbincfg->getbasename; + my $run; + if ($^O ne 'MSWin32') { + $run = "\$NDB_TOP/bin/mgmtsrvr"; + } else { + $run = "mgmtsrvr"; + } + my $statport = $port + 1; + $run .= " -l $lpath -c $ipath"; + if ($node->hasrun) { + $run = $node->getrun; + } + if (defined($opts->{run})) { + $run = $opts->{run}; + } + $log->put("run: $run")->push($node)->user; + if ($^O ne 'MSWin32') { + $shellfile->puttext(<<END) or $log->push, return undef; +$envdefs +cd @{[ $nodedir->getpath ]} || exit 1 +set -x +exec \$DEBUGGER $run +END + } else { + $shellfile->puttext(<<END) or $log->push, return undef; +$envdefs +cd @{[ $nodedir->getpath ]} +call $run +END + } + return 1; +} + +sub cmd_stopnode_fg { + my $node = shift; + @_ == 1 or confess 0+@_; + my($cmd) = @_; + $log->put("write: quit")->push($node)->user; + $node->getiow->write("quit\n"); + $node->setstate('stop') + or log->push($node), return undef; + return 1; +} + +1; +# vim:set sw=4: diff --git a/ndb/tools/ndbnet/lib/NDB/Net/Server.pm b/ndb/tools/ndbnet/lib/NDB/Net/Server.pm new file mode 100644 index 00000000000..5d2118f0ffe --- /dev/null +++ b/ndb/tools/ndbnet/lib/NDB/Net/Server.pm @@ -0,0 +1,149 @@ +package NDB::Net::Server; + +use strict; +use Carp; +use Socket; + +require NDB::Net::Base; + +use vars qw(@ISA); +@ISA = qw(NDB::Net::Base); + +# constructors + +my $log; + +sub initmodule { + $log = NDB::Util::Log->instance; +} + +my %servercache = (); + +NDB::Net::Server->attributes( + id => sub { s/^\s+|\s+$//g; m/^\S+$/ && ! m!/! }, + domain => sub { $_ == PF_UNIX || $_ == PF_INET }, +); + +sub desc { + my $server = shift; + my $id = $server->getid; + return "server $id"; +} + +sub add { + my $server = shift; + @_ % 2 == 0 or confess 0+@_; + my(%attr) = @_; + if ($servercache{$server->getid}) { + $log->put("duplicate server")->push($server); + return undef; + } + $servercache{$server->getid} = $server; + return 1; +} + +sub get { + my $class = shift; + @_ == 1 or confess 0+@_; + my($id) = @_; + $id =~ s/^\s+|\s+$//g; + my $server = $servercache{$id}; + if (! $server) { + $log->put("$id: undefined server"); + return undef; + } + $log->put("found")->push($server)->debug; + return $server; +} + +sub delete { + my $server = shift; + delete $servercache{$server->getid}; +} + +sub deleteall { + my $class = shift; + for my $id (sort keys %servercache) { + my $server = $servercache{$id}; + $server->delete; + } +} + +# local server is this server process + +my $localserver; + +sub setlocal { + my $server = shift; + @_ == 0 or confess 0+@_; + $localserver = $server; +} + +sub islocal { + my $server = shift; + @_ == 0 or confess 0+@_; + return $localserver eq $server; +} + +# client side + +sub testconnect { + my $server = shift; + @_ == 0 or confess 0+@_; + my $socket = $server->connect or + $log->push($server), return undef; + $socket->close; + return 1; +} + +sub request { + my $server = shift; + @_ == 1 or confess 0+@_; + my($cmd) = @_; + unless (ref($cmd) && $cmd->isa('NDB::Net::Command')) { + confess 'oops'; + } + my $socket = $server->connect + or $log->push($server), return undef; + anon: { + my $line = $cmd->getline; + my $n = $socket->write("$line\n"); + defined($n) && $n == length("$line\n") + or $log->push($server), return undef; + shutdown($socket->{fh}, 1); + } + my $value; + try: { + my $last; + loop: { + my $line = $socket->readline; + defined($line) + or $log->push($server), last try; + if ($socket->getreadend) { + last loop; + } + while (chomp($line)) {} + $log->put($line)->user + unless $log->hasvalue($line); + $last = $line; + redo loop; + } + if (! $log->hasvalue($last)) { + $log->put("missing return value in \"$last\"")->push($server); + last try; + } + $value = $log->getvalue($last); + defined($value) + or $log->push, last try; + $value = $value->[0]; + if (! defined($value)) { + $log->put("failed")->push($cmd); + last try; + } + } + $socket->close; + return $value; +} + +1; +# vim:set sw=4: diff --git a/ndb/tools/ndbnet/lib/NDB/Net/ServerINET.pm b/ndb/tools/ndbnet/lib/NDB/Net/ServerINET.pm new file mode 100644 index 00000000000..a065c186855 --- /dev/null +++ b/ndb/tools/ndbnet/lib/NDB/Net/ServerINET.pm @@ -0,0 +1,116 @@ +package NDB::Net::ServerINET; + +use strict; +use Carp; +use Socket; + +require NDB::Net::Server; + +use vars qw(@ISA); +@ISA = qw(NDB::Net::Server); + +# constructors + +my $log; + +sub initmodule { + $log = NDB::Util::Log->instance; +} + +NDB::Net::ServerINET->attributes( + host => sub { s/^\s+|\s+$//g; /^\S+$/ }, + port => sub { s/^\s+|\s+$//g; /^\d+$/ }, + canon => sub { s/^\s+|\s+$//g; /^\S+$/ }, + aliases => sub { ref($_) eq 'ARRAY' }, +); + + +sub desc { + my $server = shift; + my $id = $server->getid; + my $host = $server->gethost; + return "server $id at $host"; +} + +sub new { + my $class = shift; + @_ % 2 == 0 or confess 0+@_; + my(%attr) = @_; + my $server = $class->SUPER::new(%attr); + $server->setid($attr{id}) + or $log->push, return undef; + $server->setdomain(PF_INET) + or $log->push, return undef; + $server->sethost($attr{host}) + or $log->push, return undef; + $server->setport($attr{port}) + or $log->push, return undef; + my($canon, $aliases) = gethostbyname($server->gethost); + if (! defined($canon)) { + $log->put("%s: unknown host", $server->gethost); + return undef; + } + $server->setcanon($canon) + or $log->push, return undef; + $server->setaliases([ split(' ', $aliases) ]) + or $log->push, return undef; + $server->add or + $log->push, return undef; + $log->put("added")->push($server)->debug; + return $server; +} + +# find matching servers + +sub match { + my $class = shift; + @_ == 3 or confess 0+@_; + my($host, $port, $servers) = @_; + if (! defined($port) && $host =~ /:/) { + ($host, $port) = split(/:/, $host, 2); + } + $host =~ s/^\s+|\s+$//g; + my($canon) = gethostbyname($host); + unless (defined($canon)) { + $log->put("$host: unknown host"); + return undef; + } + my $hostport = $host; + if (defined($port)) { + $port =~ s/^\s+|\s+$//g; + $port =~ /\d+$/ + or $log->put("$port: non-numeric port"), return undef; + $hostport .= ":$port"; + } + my @server = (); + for my $s (@$servers) { + ($s->getdomain == PF_INET) || next; + ($s->getcanon eq $canon) || next; + ($port && $s->getport != $port) && next; + push(@server, $s); + } + if (! @server) { + $log->put("$hostport: no server found"); + } + if (@server > 1) { + $log->put("$hostport: multiple servers at ports ", + join(' ', map($_->getport, @server))); + } + return \@server; +} + +# client side + +sub connect { + my $server = shift; + @_ == 0 or confess 0+@_; + my $socket; + $socket = NDB::Util::SocketINET->new or + $log->push, return undef; + $socket->connect($server->gethost, $server->getport) or + $log->push, return undef; + return $socket; +} + +1; +# vim:set sw=4: diff --git a/ndb/tools/ndbnet/lib/NDB/Net/ServerUNIX.pm b/ndb/tools/ndbnet/lib/NDB/Net/ServerUNIX.pm new file mode 100644 index 00000000000..b3fa245d5ee --- /dev/null +++ b/ndb/tools/ndbnet/lib/NDB/Net/ServerUNIX.pm @@ -0,0 +1,54 @@ +package NDB::Net::ServerUNIX; + +use strict; +use Carp; +use Socket; + +require NDB::Net::Server; + +use vars qw(@ISA); +@ISA = qw(NDB::Net::Server); + +# constructors + +my $log; + +sub initmodule { + $log = NDB::Util::Log->instance; +} + +NDB::Net::ServerUNIX->attributes( + path => sub { s/^\s+|\s+$//g; /^\S+$/ }, +); + +sub new { + my $class = shift; + @_ % 2 == 0 or confess 0+@_; + my(%attr) = @_; + my $server = $class->SUPER::new(%attr); + $server->setid($attr{id}) + or $log->push, return undef; + $server->setdomain(PF_UNIX) + or $log->push, return undef; + $server->setpath($attr{path}) + or $log->push, return undef; + $server->add or + $log->push, return undef; + return $server; +} + +# client side + +sub connect { + my $server = shift; + @_ == 0 or confess 0+@_; + my $socket; + $socket = NDB::Util::SocketUNIX->new or + $log->push, return undef; + $socket->connect($server->getpath) or + $log->push, return undef; + return $socket; +} + +1; +# vim:set sw=4: diff --git a/ndb/tools/ndbnet/lib/NDB/Run.pm b/ndb/tools/ndbnet/lib/NDB/Run.pm new file mode 100644 index 00000000000..a8cabde544c --- /dev/null +++ b/ndb/tools/ndbnet/lib/NDB/Run.pm @@ -0,0 +1,40 @@ +package NDB::Run; + +use strict; +use Carp; +require Exporter; + +use NDB::Net; + +use vars qw(@ISA @EXPORT @EXPORT_OK); +@ISA = qw(Exporter); + +use vars qw(@modules); +@modules = qw( + NDB::Run::Base + NDB::Run::Database + NDB::Run::Env + NDB::Run::Node +); + +return 1 if $main::onlymodules; + +for my $module (@modules) { + eval "require $module"; + $@ and confess "$module $@"; +} + +for my $module (@modules) { + eval "$module->initmodule"; + $@ and confess "$module $@"; +} + +# methods + +sub getenv { + my $class = shift; + return NDB::Run::Env->new(@_); +} + +1; +# vim:set sw=4: diff --git a/ndb/tools/ndbnet/lib/NDB/Run/Base.pm b/ndb/tools/ndbnet/lib/NDB/Run/Base.pm new file mode 100644 index 00000000000..4769f2c4441 --- /dev/null +++ b/ndb/tools/ndbnet/lib/NDB/Run/Base.pm @@ -0,0 +1,12 @@ +package NDB::Run::Base; + +use strict; +use Carp; + +require NDB::Util::Base; + +use vars qw(@ISA); +@ISA = qw(NDB::Util::Base); + +1; +# vim:set sw=4: diff --git a/ndb/tools/ndbnet/lib/NDB/Run/Database.pm b/ndb/tools/ndbnet/lib/NDB/Run/Database.pm new file mode 100644 index 00000000000..9a12ddb20b3 --- /dev/null +++ b/ndb/tools/ndbnet/lib/NDB/Run/Database.pm @@ -0,0 +1,89 @@ +package NDB::Run::Database; + +use strict; +use Carp; + +require NDB::Run::Base; + +use vars qw(@ISA); +@ISA = qw(NDB::Run::Base); + +# constructors + +my $log; + +sub initmodule { + $log = NDB::Util::Log->instance; +} + +NDB::Run::Database->attributes( + name => sub { s/^\s+|\s+$//g; /^\S+$/ && ! m!/! }, + env => sub { ref && $_->isa('NDB::Run::Env') }, +); + +sub desc { + my $db = shift; + return $db->getname; +} + +sub new { + my $class = shift; + @_ % 2 == 0 or confess 0+@_; + my(%attr) = @_; + my $db = $class->SUPER::new(%attr); + $db->setname($attr{name}) + or $log->push, return undef; + $db->setenv($attr{env}) + or $log->push, return undef; + return $db; +} + +sub getnode { + my $db = shift; + @_ == 1 or croak q(usage: $node = $db->getnode($id)); + my($id) = @_; + my $node = NDB::Run::Node->new(db => $db, id => $id) + or $log->push, return undef; + return $node; +} + +# commands + +sub start { + my $db = shift; + my $opts = shift; + my $argv = [ 'start', $db->getname, $opts ]; + my $cmd = NDB::Net::Command->new(argv => $argv) + or $log->push, return undef; + my $ret = $db->getenv->docmd($cmd); + defined($ret) + or $log->push, return undef; + return $ret; +} + +sub stop { + my $db = shift; + my $opts = shift; + my $argv = [ 'stop', $db->getname, $opts ]; + my $cmd = NDB::Net::Command->new(argv => $argv) + or $log->push, return undef; + my $ret = $db->getenv->docmd($cmd); + defined($ret) + or $log->push, return undef; + return $ret; +} + +sub kill { + my $db = shift; + my $opts = shift; + my $argv = [ 'kill', $db->getname, $opts ]; + my $cmd = NDB::Net::Command->new(argv => $argv) + or $log->push, return undef; + my $ret = $db->getenv->docmd($cmd); + defined($ret) + or $log->push, return undef; + return $ret; +} + +1; +# vim:set sw=4: diff --git a/ndb/tools/ndbnet/lib/NDB/Run/Env.pm b/ndb/tools/ndbnet/lib/NDB/Run/Env.pm new file mode 100644 index 00000000000..e851a82636b --- /dev/null +++ b/ndb/tools/ndbnet/lib/NDB/Run/Env.pm @@ -0,0 +1,84 @@ +package NDB::Run::Env; + +use strict; +use Carp; + +require NDB::Run::Base; + +use vars qw(@ISA); +@ISA = qw(NDB::Run::Base); + +# constructors + +my $log; + +sub initmodule { + $log = NDB::Util::Log->instance; +} + +NDB::Run::Env->attributes( + server => sub { ref && $_->isa('NDB::Net::Server') }, +); + +sub desc { + "env"; +} + +sub new { + my $class = shift; + @_ % 2 == 0 or confess 0+@_; + my(%attr) = @_; + my $env = $class->SUPER::new(%attr); + return $env; +} + +sub getdb { + my $env = shift; + @_ == 1 or croak q(usage: $db = $env->getdb($name)); + my($name) = @_; + my $db = NDB::Run::Database->new(env => $env, name => $name) + or $log->push, return undef; + return $db; +} + +# commands + +sub init { + my $env = shift; + my $netenv = NDB::Net::Env->instance; + my $netcfg = NDB::Net::Config->new(file => $netenv->getnetcfg) + or $log->push, return undef; + $netcfg->load + or $log->push, return undef; + my $servers = $netcfg->getservers + or $log->push, return undef; + my $server; + for my $s (@$servers) { + if (! $s->testconnect) { + $log->push->warn; + next; + } + $server = $s; + last; + } + if (! $server) { + $log->put("no available server")->push($netcfg); + return undef; + } + $env->setserver($server) + or $log->push, return undef; + $log->put("selected")->push($server)->info; + return 1; +} + +sub docmd { + my $env = shift; + my $cmd = shift; + my $ret = $env->getserver->request($cmd); + defined($ret) + or $log->push, return undef; + return $ret; +} + +1; +# vim:set sw=4: diff --git a/ndb/tools/ndbnet/lib/NDB/Run/Node.pm b/ndb/tools/ndbnet/lib/NDB/Run/Node.pm new file mode 100644 index 00000000000..e657021b229 --- /dev/null +++ b/ndb/tools/ndbnet/lib/NDB/Run/Node.pm @@ -0,0 +1,114 @@ +package NDB::Run::Node; + +use strict; +use Carp; + +require NDB::Run::Base; + +use vars qw(@ISA); +@ISA = qw(NDB::Run::Base); + +# constructors + +my $log; + +sub initmodule { + $log = NDB::Util::Log->instance; +} + +NDB::Run::Node->attributes( + env => sub { ref && $_->isa('NDB::Run::Env') }, + db => sub { ref && $_->isa('NDB::Run::Database') }, + dbname => sub { s/^\s+|\s+$//g; /^\S+$/ && ! m!/! }, + id => sub { s/^\s+|\s+$//g; s/^0+(\d+)$/$1/; /^\d+$/ && $_ > 0 }, + type => sub { s/^\s+|\s+$//g; /^(mgmt|db|api)$/ }, +); + +sub desc { + my $node = shift; + my $dbname = $node->getdb->getname; + my $id = $node->getid; + my $type = "?"; # $node->gettype; + return "$dbname.$id-$type"; +} + +sub new { + my $class = shift; + @_ % 2 == 0 or confess 0+@_; + my(%attr) = @_; + my $node = $class->SUPER::new(%attr); + $node->setdb($attr{db}) + or $log->push, return undef; + $node->setenv($node->getdb->getenv) + or $log->push, return undef; + $node->setdbname($node->getdb->getname) + or $log->push, return undef; + $node->setid($attr{id}) + or $log->push, return undef; +# $node->settype($attr{type}) +# or $log->push, return undef; + return $node; +} + +# commands + +sub start { + my $node = shift; + my $opts = shift; + my $argv = [ 'startnode', $node->getdb->getname, $node->getid, $opts ]; + my $cmd = NDB::Net::Command->new(argv => $argv) + or $log->push, return undef; + my $ret = $node->getenv->docmd($cmd) + or $log->push, return undef; + return $ret; +} + +sub stop { + my $node = shift; + my $opts = shift; + my $argv = [ 'stopnode', $node->getdb->getname, $node->getid, $opts ]; + my $cmd = NDB::Net::Command->new(argv => $argv) + or $log->push, return undef; + my $ret = $node->getenv->docmd($cmd) + or $log->push, return undef; + return $ret; +} + +sub kill { + my $node = shift; + my $opts = shift; + my $argv = [ 'killnode', $node->getdb->getname, $node->getid, $opts ]; + my $cmd = NDB::Net::Command->new(argv => $argv) + or $log->push, return undef; + my $ret = $node->getenv->docmd($cmd) + or $log->push, return undef; + return $ret; +} + +sub stat { + my $node = shift; + my $opts = shift; + my $argv = [ 'statnode', $node->getdb->getname, $node->getid, $opts ]; + my $cmd = NDB::Net::Command->new(argv => $argv) + or $log->push, return undef; + my $ret = $node->getenv->docmd($cmd) + or $log->push, return undef; + return $ret; +} + +sub write { + my $node = shift; + my $text = shift; + my $opts = shift; + my $argv = [ 'writenode', $node->getdb->getname, $node->getid, $text, $opts ]; + my $cmd = NDB::Net::Command->new(argv => $argv) + or $log->push, return undef; + my $ret = $node->getenv->docmd($cmd) + or $log->push, return undef; + ref($ret) eq 'HASH' && defined($ret->{output}) + or confess 'oops'; + return $ret; +} + +1; +# vim:set sw=4: diff --git a/ndb/tools/ndbnet/lib/NDB/Util.pm b/ndb/tools/ndbnet/lib/NDB/Util.pm new file mode 100644 index 00000000000..d5db35cbf13 --- /dev/null +++ b/ndb/tools/ndbnet/lib/NDB/Util.pm @@ -0,0 +1,37 @@ +package NDB::Util; + +use strict; +use Carp; +require Exporter; + +use vars qw(@ISA @EXPORT @EXPORT_OK); +@ISA = qw(Exporter); + +use vars qw(@modules); +@modules = qw( + NDB::Util::Base + NDB::Util::Dir + NDB::Util::Event + NDB::Util::File + NDB::Util::IO + NDB::Util::Lock + NDB::Util::Log + NDB::Util::Socket + NDB::Util::SocketINET + NDB::Util::SocketUNIX +); + +return 1 if $main::onlymodules; + +for my $module (@modules) { + eval "require $module"; + $@ and confess "$module $@"; +} + +for my $module (@modules) { + eval "$module->initmodule"; + $@ and confess "$module $@"; +} + +1; +# vim:set sw=4: diff --git a/ndb/tools/ndbnet/lib/NDB/Util/Base.pm b/ndb/tools/ndbnet/lib/NDB/Util/Base.pm new file mode 100644 index 00000000000..20df78a3b9b --- /dev/null +++ b/ndb/tools/ndbnet/lib/NDB/Util/Base.pm @@ -0,0 +1,113 @@ +package NDB::Util::Base; + +use strict; +use Carp; + +# constructors + +my $log; + +sub initmodule { + $log = NDB::Util::Log->instance; +} + +sub new { + my $class = shift; + my $this = bless {}, $class; + return $this; +} + +sub getlog { + my $this = shift; + return NDB::Util::Log->instance; +} + +# clone an object +# extra attributes override or delete (if value is undef) +sub clone { + my $this = shift; + @_ % 2 == 0 or confess 0+@_; + my(%attr) = @_; + my $that = bless {}, ref($this); + for my $attr (sort keys %$this) { + if (! exists($attr{$attr})) { + my $get = "get$attr"; + $attr{$attr} = $this->$get(); + } + } + for my $attr (sort keys %attr) { + if (defined($attr{$attr})) { + my $set = "set$attr"; + $that->$set($attr{$attr}); + } + } + return $that; +} + +# methods for member variables: +# - set returns 1 on success and undef on undefined or invalid value +# - get aborts unless value exists or a default (maybe undef) is given +# - has tests existence of value +# - del deletes the value and returns it (maybe undef) + +sub attributes { + @_ % 2 == 1 or confess 0+@_; + my $class = shift; + my @attr = @_; + while (@attr) { + my $attr = shift @attr; + my $filter = shift @attr; + $attr =~ /^\w+$/ or confess $attr; + ref($filter) eq 'CODE' or confess $attr; + my $set = sub { + @_ == 2 or confess "set$attr: arg count: @_"; + my $this = shift; + my $value = shift; + if (! defined($value)) { + $log->put("set$attr: undefined value")->push($this); + return undef; + } + local $_ = $value; + if (! &$filter($this)) { + $log->put("set$attr: invalid value: $value")->push($this); + return undef; + } + $value = $_; + if (! defined($value)) { + confess "set$attr: changed to undef"; + } + $this->{$attr} = $value; + return 1; + }; + my $get = sub { + @_ == 1 || @_ == 2 or confess "get$attr: arg count: @_"; + my $this = shift; + my $value = $this->{$attr}; + if (! defined($value)) { + @_ == 0 and confess "get$attr: no value"; + $value = shift; + } + return $value; + }; + my $has = sub { + @_ == 1 or confess "has$attr: arg count: @_"; + my $this = shift; + my $value = $this->{$attr}; + return defined($value); + }; + my $del = sub { + @_ == 1 or confess "del$attr: arg count: @_"; + my $this = shift; + my $value = delete $this->{$attr}; + return $value; + }; + no strict 'refs'; + *{"${class}::set$attr"} = $set; + *{"${class}::get$attr"} = $get; + *{"${class}::has$attr"} = $has; + *{"${class}::del$attr"} = $del; + } +} + +1; +# vim:set sw=4: diff --git a/ndb/tools/ndbnet/lib/NDB/Util/Dir.pm b/ndb/tools/ndbnet/lib/NDB/Util/Dir.pm new file mode 100644 index 00000000000..90609b971c7 --- /dev/null +++ b/ndb/tools/ndbnet/lib/NDB/Util/Dir.pm @@ -0,0 +1,170 @@ +package NDB::Util::Dir; + +use strict; +use Carp; +use Symbol; +use Errno; +use File::Basename; + +require NDB::Util::Base; + +use vars qw(@ISA); +@ISA = qw(NDB::Util::Base); + +# constructors + +my $log; + +sub initmodule { + $log = NDB::Util::Log->instance; +} + +NDB::Util::Dir->attributes( + path => sub { length > 0 }, +); + +sub new { + my $class = shift; + @_ % 2 == 0 or confess 0+@_; + my(%attr) = @_; + my $dir = $class->SUPER::new(%attr); + $dir->setpath($attr{path}) + or $log->push, return undef; + return $dir; +} + +sub desc { + my $dir = shift; + return $dir->getpath; +} + +sub getparent { + my $dir = shift; + @_ == 0 or confess 0+@_; + my $ppath = dirname($dir->getpath); + my $pdir = NDB::Util::Dir->new(path => $ppath); + return $pdir; +} + +sub getdir { + my $dir = shift; + @_ == 1 or confess 0+@_; + my($name) = @_; + my $dirpath = $dir->getpath; + my $path = $dirpath eq '.' ? $name : File::Spec->catfile($dirpath, $name); + my $entry = NDB::Util::Dir->new(path => $path); + return $entry; +} + +sub getfile { + my $dir = shift; + @_ == 1 or confess 0+@_; + my($name) = @_; + my $dirpath = $dir->getpath; + my $path = $dirpath eq '.' ? $name : File::Spec->catfile($dirpath, $name); + my $entry = NDB::Util::File->new(path => $path); + return $entry; +} + +# list + +sub listdirs { + my $dir = shift; + @_ == 0 or confess 0+@_; + my @list = (); + my $dirpath = $dir->getpath; + my $dh = gensym(); + if (! opendir($dh, $dirpath)) { + $log->put("opendir failed: $!")->push($dir); + return undef; + } + while (defined(my $name = readdir($dh))) { + if ($name eq '.' || $name eq '..') { + next; + } + my $path = $dirpath eq '.' ? $name : "$dirpath/$name"; + if (! -l $path && -d $path) { + my $dir2 = NDB::Util::Dir->new(path => $path) + or $log->push, return undef; + push(@list, $dir2); + } + } + close($dh); + return \@list; +} + +sub listfiles { + my $dir = shift; + @_ == 0 or confess 0+@_; + my @list = (); + my $dirpath = $dir->getpath; + my $dh = gensym(); + if (! opendir($dh, $dirpath)) { + $log->put("opendir failed: $!")->push($dir); + return undef; + } + while (defined(my $name = readdir($dh))) { + if ($name eq '.' || $name eq '..') { + next; + } + my $path = $dirpath eq '.' ? $name : "$dirpath/$name"; + if (! -d $path && -e $path) { + my $file2 = NDB::Util::File->new(path => $path) + or $log->push, return undef; + push(@list, $file2); + } + } + close($dh); + return \@list; +} + +# create / remove + +sub mkdir { + my $dir = shift; + @_ == 0 or confess 0+@_; + if (! -d $dir->getpath) { + my $pdir = $dir->getparent; + if (length($pdir->getpath) >= length($dir->getpath)) { + $log->put("mkdir looping")->push($dir); + return undef; + } + $pdir->mkdir or return undef; + if (! mkdir($dir->getpath, 0777)) { + my $errstr = "$!"; + if (-d $dir->getpath) { + return 1; + } + $log->put("mkdir failed: $errstr")->push($dir); + return undef; + } + } + return 1; +} + +sub rmdir { + my $dir = shift; + my $keep = shift; # keep top level + $log->put("remove")->push($dir)->info; + my $list; + $list = $dir->listdirs or $log->push, return undef; + for my $d (@$list) { + $d->rmdir or $log->push, return undef; + } + $list = $dir->listfiles or $log->push, return undef; + for my $f (@$list) { + $f->unlink or $log->push, return undef; + } + if (! $keep && ! rmdir($dir->getpath)) { + my $errstr = "$!"; + if (! -e $dir->getpath) { + return 1; + } + $log->put("rmdir failed: $errstr")->push($dir); + return undef; + } + return 1; +} + +1; +# vim:set sw=4: diff --git a/ndb/tools/ndbnet/lib/NDB/Util/Event.pm b/ndb/tools/ndbnet/lib/NDB/Util/Event.pm new file mode 100644 index 00000000000..a3ad32cd7fb --- /dev/null +++ b/ndb/tools/ndbnet/lib/NDB/Util/Event.pm @@ -0,0 +1,103 @@ +package NDB::Util::Event; + +use strict; +use Carp; +use Errno; + +require NDB::Util::Base; + +use vars qw(@ISA); +@ISA = qw(NDB::Util::Base); + +# constructors + +my $log; + +sub initmodule { + $log = NDB::Util::Log->instance; +} + +NDB::Util::Event->attributes(); + +sub new { + my $class = shift; + @_ % 2 == 0 or confess 0+@_; + my(%attr) = @_; + my $event = $class->SUPER::new(%attr); + return $event; +} + +# set and test bits + +sub check { + my $event = shift; + my($file, $type) = @_; + my $fileno; + if (ref($file) eq 'GLOB') { + $fileno = fileno($file); + } + elsif (ref($file)) { + $file->can("getfh") or confess 'oops'; + $fileno = fileno($file->getfh); + } + else { + $fileno = $file; + } + defined($fileno) or confess 'oops'; + $fileno =~ s/^\s+|\s+$//g; + $fileno =~ /^\d+$/ or confess 'oops'; + $type =~ /^[rwe]$/ or confess 'oops'; + return ($fileno, $type); +} + +sub set { + my $event = shift; + @_ == 2 or confess 0+@_; + my($fileno, $type) = $event->check(@_); + vec($event->{"i_$type"}, $fileno, 1) = 1; +} + +sub clear { + my $event = shift; + @_ == 2 or confess 0+@_; + my($fileno, $type) = $event->check(@_); + vec($event->{"i_$type"}, $fileno, 1) = 0; +} + +sub test { + my $event = shift; + @_ == 2 or confess 0+@_; + my($fileno, $type) = $event->check(@_); + return vec($event->{"o_$type"}, $fileno, 1); +} + +# poll + +sub poll { + my $event = shift; + @_ <= 1 or confess 'oops'; + my $timeout = shift; + if (defined($timeout)) { + $timeout =~ /^\d+$/ or confess 'oops'; + } + $event->{o_r} = $event->{i_r}; + $event->{o_w} = $event->{i_w}; + $event->{o_e} = $event->{i_e}; + my $n; + $n = select($event->{o_r}, $event->{o_w}, $event->{o_e}, $timeout); + if ($n < 0 || ! defined($n)) { + if ($! == Errno::EINTR) { + $log->put("select interrupted"); + return 0; + } + $log->put("select failed: $!"); + return undef; + } + if (! $n) { + $log->put("select timed out"); + } + return $n; +} + +1; +# vim:set sw=4: diff --git a/ndb/tools/ndbnet/lib/NDB/Util/File.pm b/ndb/tools/ndbnet/lib/NDB/Util/File.pm new file mode 100644 index 00000000000..4b3cb38191c --- /dev/null +++ b/ndb/tools/ndbnet/lib/NDB/Util/File.pm @@ -0,0 +1,163 @@ +package NDB::Util::File; + +use strict; +use Carp; +use Symbol; +use Errno; +use File::Basename; + +require NDB::Util::Base; + +use vars qw(@ISA); +@ISA = qw(NDB::Util::Base); + +# constructors + +my $log; + +sub initmodule { + $log = NDB::Util::Log->instance; +} + +NDB::Util::File->attributes( + path => sub { length > 0 }, +); + +sub new { + my $class = shift; + @_ % 2 == 0 or confess 0+@_; + my(%attr) = @_; + my $file = $class->SUPER::new(%attr); + $file->setpath($attr{path}) + or $log->push, return undef; + return $file; +} + +sub desc { + my $file = shift; + return $file->getpath; +} + +sub getdir { + my $file = shift; + @_ == 0 or confess 0+@_; + my $dirpath = dirname($file->getpath); + my $dir = NDB::Util::Dir->new(path => $dirpath); + return $dir; +} + +sub getlock { + my $file = shift; + @_ == 0 or confess 0+@_; + my $lock = NDB::Util::Lock->new(path => $file->getpath); + return $lock; +} + +sub getbasename { + my $file = shift; + @_ == 0 or confess 0+@_; + return basename($file->getpath); +} + +# make dir, unlink + +sub mkdir { + my $file = shift; + @_ == 0 or confess 0+@_; + return $file->getdir->mkdir; +} + +sub unlink { + my $file = shift; + @_ == 0 or confess 0+@_; + $log->put("remove")->push($file)->debug; + if (-e $file->getpath) { + if (! unlink($file->getpath)) { + my $errstr = "$!"; + if (! -e $file->getpath) { + return 1; + } + $log->put("unlink failed: $errstr")->push($file); + return undef; + } + } + return 1; +} + +# read /write + +sub open { + my $file = shift; + @_ == 1 or confess 0+@_; + my($mode) = @_; + my $fh = gensym(); + if (! open($fh, $mode.$file->getpath)) { + $log->put("open$mode failed")->push($file); + return undef; + } + my $io = NDB::Util::IO->new; + $io->setfh($fh) + or $log->push, return undef; + return $io; +} + +sub puttext { + my $file = shift; + @_ == 1 or confess 0+@_; + my($text) = @_; + ref($text) and confess 'oops'; + $file->mkdir + or $log->push, return undef; + $file->unlink + or $log->push, return undef; + my $io = $file->open(">") + or $log->push, return undef; + if (! $io->write($text)) { + $log->push($file); + $io->close; + return undef; + } + if (! $io->close) { + $log->push($file); + return undef; + } + return 1; +} + +sub putlines { + my $file = shift; + @_ == 1 or confess 0+@_; + my($lines) = @_; + ref($lines) eq 'ARRAY' or confess 'oops'; + my $text = join("\n", @$lines) . "\n"; + $file->puttext($text) or $log->push, return undef; + return 1; +} + +sub copyedit { + my $file1 = shift; + @_ == 2 or confess 0+@_; + my($file2, $edit) = @_; + my $io1 = $file1->open("<") + or $log->push, return undef; + my $io2 = $file2->open(">") + or $log->push, return undef; + local $_; + my $fh1 = $io1->getfh; + my $fh2 = $io2->getfh; + my $line = 0; + while (defined($_ = <$fh1>)) { + $line++; + if (! &$edit()) { + $log->push("line $line")->push($file1); + return undef; + } + print $fh2 $_; + } + $io1->close; + $io2->close; + return 1; +} + +1; +# vim:set sw=4: diff --git a/ndb/tools/ndbnet/lib/NDB/Util/IO.pm b/ndb/tools/ndbnet/lib/NDB/Util/IO.pm new file mode 100644 index 00000000000..34f4d0a150d --- /dev/null +++ b/ndb/tools/ndbnet/lib/NDB/Util/IO.pm @@ -0,0 +1,213 @@ +package NDB::Util::IO; + +use strict; +use Carp; + +require NDB::Util::Base; + +use vars qw(@ISA); +@ISA = qw(NDB::Util::Base); + +# constructors + +my $log; + +sub initmodule { + $log = NDB::Util::Log->instance; +} + +NDB::Util::IO->attributes( + readbuf => sub { defined }, + readend => sub { defined }, + writebuf => sub { defined }, + writeend => sub { defined }, + iosize => sub { $_ > 0 }, + timeout => sub { /^\d+$/ }, + fh => sub { ref($_) eq 'GLOB' && defined(fileno($_)) }, +); + +sub desc { + my $io = shift; + my $fileno = $io->hasfh ? fileno($io->getfh) : -1; + return "fd=$fileno"; +} + +sub new { + my $class = shift; + @_ % 2 == 0 or confess 0+@_; + my(%attr) = @_; + my $io = $class->SUPER::new(%attr); + $io->setreadbuf("") + or $log->push, return undef; + $io->setreadend(0) + or $log->push, return undef; + $io->setwritebuf("") + or $log->push, return undef; + $io->setwriteend(0) + or $log->push, return undef; + $io->setiosize(1024) + or $log->push, return undef; + $io->settimeout(0) + or $log->push, return undef; + if (defined($attr{fh})) { + $io->setfh($attr{fh}) + or $log->push, return undef; + } + return $io; +} + +# input / output + +sub read { + my $io = shift; + @_ == 0 or confess 0+@_; + if ($io->getreadend) { + return ""; + } + my $size = $io->getiosize; + my $timeout = $io->hastimeout ? $io->gettimeout : 0; + my $fh = $io->getfh; + my $n; + my $data; + eval { + if ($^O ne 'MSWin32' && $timeout > 0) { + local $SIG{ALRM} = sub { die("timed out\n") }; + alarm($timeout); + $n = sysread($fh, $data, $size); + alarm(0); + } + else { + $n = sysread($fh, $data, $size); + } + }; + if ($@) { + $log->put("read error: $@")->push($io); + return undef; + } + if (! defined($n)) { + $log->put("read failed: $!")->push($io); + return undef; + } + if ($n == 0) { + $io->setreadend(1) + or $log->push, return undef; + $log->put("read EOF")->push($io)->debug; + return ""; + } + (my $show = $data) =~ s!\n!\\n!g; + $log->put("read: $show")->push($io)->debug; + return $data; +} + +sub readbuf { + my $io = shift; + @_ == 0 or confess 0+@_; + my $data = $io->read; + defined($data) or + $log->push, return undef; + if (length($data) == 0) { + return 0; + } + $io->setreadbuf($io->getreadbuf . $data) + or $log->push, return undef; + return 1; +} + +sub readupto { + my $io = shift; + @_ == 1 or confess 0+@_; + my($code) = @_; + ref($code) eq 'CODE' or confess 'oops'; + my $k = &$code($io->getreadbuf); + if (! defined($k)) { + $log->push($io); + return undef; + } + if ($k == 0) { + my $n = $io->readbuf; + defined($n) or + $log->push, return undef; + if ($n == 0) { + if ($io->getreadbuf eq "") { + return ""; + } + $log->put("incomplete input: %s", $io->getreadbuf)->push($io); + return undef; + } + $k = &$code($io->getreadbuf); + if (! defined($k)) { + $log->push($io); + return undef; + } + if ($k == 0) { + return ""; + } + } + my $head = substr($io->getreadbuf, 0, $k); + my $tail = substr($io->getreadbuf, $k); + $io->setreadbuf($tail) + or $log->push, return undef; + return $head; +} + +sub readline { + my $io = shift; + @_ == 0 or confess 0+@_; + my $code = sub { + my $i = index($_[0], "\n"); + return $i < 0 ? 0 : $i + 1; + }; + return $io->readupto($code); +} + +sub write { + my $io = shift; + @_ == 1 or confess 0+@_; + my($data) = @_; + my $timeout = $io->hastimeout ? $io->gettimeout : 0; + my $fh = $io->getfh; + (my $show = $data) =~ s!\n!\\n!g; + $log->put("write: $show")->push($io)->debug; + my $n; + my $size = length($data); + eval { + local $SIG{PIPE} = sub { die("broken pipe\n") }; + if ($^O ne 'MSWin32' && $timeout > 0) { + local $SIG{ALRM} = sub { die("timed out\n") }; + alarm($timeout); + $n = syswrite($fh, $data, $size); + alarm(0); + } + else { + $n = syswrite($fh, $data, $size); + } + }; + if ($@) { + $log->put("write error: $@")->push($io); + return undef; + } + if (! defined($n)) { + $log->put("write failed: $!")->push($io); + return undef; + } + if ($n > $size) { + $log->put("impossible write: $n > $size")->push($io); + return undef; + } + if ($n != $size) { # need not be error + $log->put("short write: $n < $size")->push($io); + } + return $n; +} + +sub close { + my $io = shift; + @_ == 0 or confess 0+@_; + if (! close($io->delfh)) { + $log->put("close failed: $!")->push($io); + return undef; + } + return 1; +} + +1; diff --git a/ndb/tools/ndbnet/lib/NDB/Util/Lock.pm b/ndb/tools/ndbnet/lib/NDB/Util/Lock.pm new file mode 100644 index 00000000000..b515e633059 --- /dev/null +++ b/ndb/tools/ndbnet/lib/NDB/Util/Lock.pm @@ -0,0 +1,136 @@ +package NDB::Util::Lock; + +use strict; +use Carp; +use Symbol; +use Fcntl qw(:flock); +use Errno; +use File::Basename; + +require NDB::Util::File; + +use vars qw(@ISA); +@ISA = qw(NDB::Util::File); + +# constructors + +my $log; + +sub initmodule { + $log = NDB::Util::Log->instance; +} + +NDB::Util::Lock->attributes( + pid => sub { $_ != 0 }, +); + +sub new { + my $class = shift; + @_ % 2 == 0 or confess 0+@_; + my(%attr) = @_; + my $lock = $class->SUPER::new(%attr); + return $lock; +} + +sub desc { + my $lock = shift; + return $lock->getpath; +} + +# test / set + +sub test { + my $lock = shift; + @_ == 0 or confess 0+@_; + my $fh = gensym(); + if (! open($fh, "+<$lock->{path}")) { + if ($! != Errno::ENOENT) { + $log->put("$lock->{path}: open failed: $!"); + return undef; + } + return 0; # file does not exist + } + if (flock($fh, LOCK_EX|LOCK_NB)) { + close($fh); + return 0; # file was not locked + } + if ($^O eq 'MSWin32') { + close($fh); + if (! open($fh, "<$lock->{path}x")) { + $log->put("$lock->{path}x: open failed: $!"); + return undef; + } + } + my $pid = <$fh>; + close($fh); + ($pid) = split(' ', $pid); + if ($pid+0 == 0) { + $log->put("$lock->{path}: locked but pid='$pid' is zero"); + return undef; + } + $lock->{pid} = $pid; + return 1; # file was locked +} + +sub set { + my $lock = shift; + @_ == 0 or confess 0+@_; + my $fh = gensym(); + if (! open($fh, "+<$lock->{path}")) { + if ($! != Errno::ENOENT) { + $log->put("$lock->{path}: open failed: $!"); + return undef; + } + close($fh); + if (! open($fh, ">$lock->{path}")) { + $log->put("$lock->{path}: create failed: $!"); + return undef; + } + } + if (! flock($fh, LOCK_EX|LOCK_NB)) { + $log->put("$lock->{path}: flock failed: $!"); + close($fh); + return 0; # file was probably locked + } + my $line = "$$\n"; + if ($^O eq 'MSWin32') { + my $gh = gensym(); + if (! open($gh, ">$lock->{path}x")) { + $log->put("$lock->{path}x: open for write failed: $!"); + close($fh); + return undef; + } + if (! syswrite($gh, $line)) { + close($fh); + close($gh); + $log->put("$lock->{path}x: write failed: $!"); + return undef; + } + close($gh); + } else { + if (! truncate($fh, 0)) { + close($fh); + $log->put("$lock->{path}: truncate failed: $!"); + return undef; + } + if (! syswrite($fh, $line)) { + close($fh); + $log->put("$lock->{path}: write failed: $!"); + return undef; + } + } + $lock->{fh} = $fh; + return 1; # file is now locked by us +} + +sub close { + my $lock = shift; + @_ == 0 or confess 0+@_; + my $fh = delete $lock->{fh}; + if ($fh) { + close($fh); + } +} + +1; +# vim:set sw=4: diff --git a/ndb/tools/ndbnet/lib/NDB/Util/Log.pm b/ndb/tools/ndbnet/lib/NDB/Util/Log.pm new file mode 100644 index 00000000000..44b39df84e6 --- /dev/null +++ b/ndb/tools/ndbnet/lib/NDB/Util/Log.pm @@ -0,0 +1,367 @@ +package NDB::Util::Log; + +use strict; +use Carp; +use Symbol; +use Data::Dumper (); + +require NDB::Util::Base; + +use vars qw(@ISA); +@ISA = qw(NDB::Util::Base); + +# constructors + +my $instance = undef; +my %attached = (); + +my %priolevel = qw(user 0 fatal 1 error 2 warn 3 notice 4 info 5 debug 6); +my %partlist = qw(time 1 pid 2 prio 3 text 4 line 5); + +NDB::Util::Log->attributes( + prio => sub { defined($priolevel{$_}) }, + parts => sub { ref eq 'HASH' }, + stack => sub { ref eq 'ARRAY' }, + io => sub { ref && $_->isa('NDB::Util::IO') }, + active => sub { defined }, + censor => sub { ref eq 'ARRAY' }, +); + +sub setpart { + my $log = shift; + @_ % 2 == 0 or confess 0+@_; + while (@_) { + my $part = shift; + my $onoff = shift; + $partlist{$part} or confess 'oops'; + $log->getparts->{$part} = $onoff; + } +} + +sub getpart { + my $log = shift; + @_ == 1 or confess 0+@_; + my($part) = @_; + $partlist{$part} or confess 'oops'; + return $log->getparts->{$part}; +} + +sub instance { + my $class = shift; + @_ % 2 == 0 or confess 0+@_; + my(%attr) = @_; + if (! $instance) { + $instance = $class->SUPER::new(%attr); + $instance->setprio(q(info)); + $instance->setparts({ text => 1 }); + $instance->setstack([]); + $instance->setcensor([]); + my $io = NDB::Util::IO->new(fh => \*STDERR, %attr) + or confess 'oops'; + $instance->setio($io); + } + return $instance; +} + +# attached logs are written in parallel to main log +# user log is a special server-to-client log + +sub attach { + my $log = shift; + @_ % 2 == 1 or confess 0+@_; + my($key, %attr) = @_; + $attached{$key} and confess 'oops'; + my $alog = $attached{$key} = $log->clone(%attr); + return $alog; +} + +sub detach { + my $log = shift; + @_ == 1 or confess 0+@_; + my($key) = @_; + $attached{$key} or return undef; + my $alog = delete $attached{$key}; + return $alog; +} + +sub attachuser { + my $log = shift; + @_ % 2 == 0 or confess 0+@_; + my(%attr) = @_; + %attr = ( + prio => q(user), + parts => { text => 1 }, + censor => [ qw(NDB::Net::Client NDB::Util::IO) ], + %attr); + my $alog = $log->attach(q(user), %attr); + return $alog; +} + +sub detachuser { + my $log = shift; + @_ == 0 or confess 0+@_; + my $alog = $log->detach(q(user)); + return $alog; +} + +# input / output + +sub setfile { + my $log = shift; + @_ == 1 or confess 0+@_; + my $file = shift; + if (! open(STDOUT, ">>$file")) { + $log->put("$file: open for append failed: $!"); + return undef; + } + select(STDOUT); + $| = 1; + open(STDERR, ">&STDOUT"); + select(STDERR); + $| = 1; + return 1; +} + +sub close { + my $log = shift; + $log->getio->close; +} + +sub closeall { + my $class = shift; + for my $key (sort keys %attached) { + my $log = $attached{$key}; + $log->close; + } + $instance->close; +} + +# private + +sub entry { + my $log = shift; + my($clear, $file, $line, @args) = @_; + $file =~ s!^.*\bNDB/!!; + $file =~ s!^.*/bin/([^/]+)$!$1!; + my $text = undef; + if (@args) { + $text = shift(@args); + if (! ref($text)) { + if (@args) { + $text = sprintf($text, @args); + } + while (chomp($text)) {} + } + } + if ($clear) { + $#{$log->getstack} = -1; + } + push(@{$log->getstack}, { + line => "$file($line)", + text => $text, + }); +} + +sub matchlevel { + my $log = shift; + my $msgprio = shift; + my $logprio = $log->getprio; + my $msglevel = $priolevel{$msgprio}; + my $loglevel = $priolevel{$logprio}; + defined($msglevel) && defined($loglevel) + or confess 'oops'; + if ($msglevel == 0 && $loglevel == 0) { + return $msgprio eq $logprio; + } + if ($msglevel == 0 && $loglevel != 0) { + return $loglevel >= $priolevel{q(info)}; + } + if ($msglevel != 0 && $loglevel == 0) { + return $msglevel <= $priolevel{q(notice)}; + } + if ($msglevel != 0 && $loglevel != 0) { + return $msglevel <= $loglevel; + } + confess 'oops'; +} + +sub print { + my $log = shift; + @_ == 2 or confess 0+@_; + my($prio, $tmpstack) = @_; + if ($log->hasactive) { # avoid recursion + return; + } + if (! $log->matchlevel($prio)) { + return; + } + $log->setactive(1); + my @text = (); + if ($log->getpart(q(time))) { + my @t = localtime(time); + push(@text, sprintf("%02d-%02d/%02d:%02d:%02d", + 1+$t[4], $t[3], $t[2], $t[1], $t[0])); + } + if ($log->getpart(q(pid))) { + push(@text, "[$$]"); + } + if ($log->getpart(q(prio)) && + (0 == $priolevel{$prio} || $priolevel{$prio} <= $priolevel{notice})) + { + push(@text, "[$prio]"); + } + if ($log->getpart(q(text))) { + my @stack = @$tmpstack; + while (@stack) { + my $s = pop(@stack); + my $text = $s->{text}; + if (ref($text)) { + if (grep($text->isa($_), @{$log->getcensor})) { + next; + } + $text = $text->desc; + } + push(@text, $text) if length($text) > 0; + } + } + if ($log->getpart(q(line)) && + (0 < $priolevel{$prio} && $priolevel{$prio} <= $priolevel{warn})) + { + push(@text, "at"); + my @stack = @$tmpstack; + while (@stack) { + my $s = shift(@stack); + defined($s->{line}) or confess 'oops'; + if ($text[-1] ne $s->{line}) { + push(@text, $s->{line}); + } + } + } + $log->getio->write("@text\n"); + $log->delactive; +} + +sub printall { + my $log = shift; + @_ == 1 or confess 0+@_; + my($prio) = @_; + my $logstack = $log->getstack; + if (! @$logstack) { + $log->put("[missing log message]"); + } + my @tmpstack = (); + while (@$logstack) { + push(@tmpstack, shift(@$logstack)); + } + for my $key (sort keys %attached) { + my $alog = $attached{$key}; + $alog->print($prio, \@tmpstack); + } + $instance->print($prio, \@tmpstack); +} + +# public + +sub push { + my $log = shift; + my(@args) = @_; + my($pkg, $file, $line) = caller; + $log->entry(0, $file, $line, @args); + return $log; +} + +sub put { + my $log = shift; + my(@args) = @_; + my($pkg, $file, $line) = caller; + $log->entry(1, $file, $line, @args); + return $log; +} + +sub fatal { + my $log = shift; + @_ == 0 or confess 0+@_; + $log->printall(q(fatal)); + exit(1); +} + +sub error { + my $log = shift; + @_ == 0 or confess 0+@_; + $log->printall(q(error)); + return $log; +} + +sub warn { + my $log = shift; + @_ == 0 or confess 0+@_; + $log->printall(q(warn)); + return $log; +} + +sub notice { + my $log = shift; + @_ == 0 or confess 0+@_; + $log->printall(q(notice)); + return $log; +} + +sub info { + my $log = shift; + @_ == 0 or confess 0+@_; + $log->printall(q(info)); + return $log; +} + +sub debug { + my $log = shift; + @_ == 0 or confess 0+@_; + $log->printall(q(debug)); + return $log; +} + +sub user { + my $log = shift; + @_ == 0 or confess 0+@_; + $log->printall(q(user)); + return $log; +} + +# return values from server to client + +sub putvalue { + my $log = shift; + @_ == 1 or confess 0+@_; + my($value) = @_; + my $d = Data::Dumper->new([$value], [qw($value)]); + $d->Indent(0); + $d->Useqq(1); + my $dump = $d->Dump; + $dump =~ /^\s*\$value\s*=\s*(.*);\s*$/ or confess $dump; + $log->push("[value $1]"); +} + +sub hasvalue { + my $log = shift; + @_ == 1 or confess 0+@_; + my($line) = @_; + return $line =~ /\[value\s+(.*)\]/; +} + +sub getvalue { + my $log = shift; + @_ == 1 or confess 0+@_; + my($line) = @_; + $line =~ /\[value\s+(.*)\]/ or confess $line; + my $expr = $1; + my($value); + eval "\$value = $expr"; + if ($@) { + $log->put("$line: eval error: $@"); + return undef; + } + return [$value]; +} + +1; +# vim:set sw=4: diff --git a/ndb/tools/ndbnet/lib/NDB/Util/Socket.pm b/ndb/tools/ndbnet/lib/NDB/Util/Socket.pm new file mode 100644 index 00000000000..00e8b6eca51 --- /dev/null +++ b/ndb/tools/ndbnet/lib/NDB/Util/Socket.pm @@ -0,0 +1,158 @@ +package NDB::Util::Socket; + +use strict; +use Carp; +use Symbol; +use Socket; +use Errno; + +require NDB::Util::IO; + +use vars qw(@ISA); +@ISA = qw(NDB::Util::IO); + +# constructors + +my $log; + +sub initmodule { + $log = NDB::Util::Log->instance; +} + +NDB::Util::Socket->attributes( + domain => sub { $_ == PF_INET || $_ == PF_UNIX }, + type => sub { $_ == SOCK_STREAM }, + proto => sub { /^(0|tcp)$/ }, +); + +sub desc { + my $socket = shift; + return $socket->SUPER::desc; +} + +sub new { + my $class = shift; + @_ % 2 == 0 or confess 0+@_; + my(%attr) = @_; + my $socket = $class->SUPER::new(%attr); + $socket->setdomain($attr{domain}) + or $log->push, return undef; + $socket->settype($attr{type}) + or $log->push, return undef; + $socket->setproto($attr{proto}) + or $log->push, return undef; + my $nproto; + if ($socket->getproto =~ /^\d+/) { + $nproto = $socket->getproto; + } + else { + $nproto = getprotobyname($socket->getproto); + unless (defined($nproto)) { + $log->put("%s: getprotobyname failed", $socket->getproto); + return undef; + } + } + my $fh = gensym(); + if (! socket($fh, $socket->getdomain, $socket->gettype, $nproto)) { + $log->put("create socket failed: $!"); + return undef; + } + $socket->setfh($fh) + or $log->push, return undef; + return $socket; +} + +sub setopt { + my $socket = shift; + @_ >= 2 or confess 'oops'; + my $level = shift; + my $optname = shift; + my $optval = @_ ? pack("l*", @_) : undef; + my $fh = $socket->getfh; + if (! setsockopt($fh, $level, $optname, $optval)) { + $log->put("setsockopt failed: $!")->push($socket); + return undef; + } + return 1; +} + +sub connect { + my $socket = shift; + @_ == 1 or confess 0+@_; + my($paddr) = @_; + my $fh = $socket->getfh; + if (! connect($fh, $paddr)) { + $log->put("connect failed: $!")->push($socket); + return undef; + } + $log->put("connect done")->push($socket)->debug; + return 1; +} + +sub bind { + my $socket = shift; + @_ == 1 or confess 0+@_; + my($paddr) = @_; + my $fh = $socket->getfh; + if (! bind($fh, $paddr)) { + $log->put("bind failed: $!")->push($socket); + return undef; + } + return 1; +} + +sub listen { + my $socket = shift; + @_ == 0 or confess 0+@_; + my $fh = $socket->getfh; + if (! listen($fh, SOMAXCONN)) { + $log->put("listen failed: $!")->push($socket); + return undef; + } + return 1; +} + +sub accept { + my $socket = shift; + @_ == 1 or confess 0+@_; + my($timeout) = @_; + $timeout =~ /^\d+$/ or confess 'oops'; + my $fh = $socket->getfh; + my $gh = gensym(); + my $paddr; + eval { + if ($^O ne 'MSWin32' && $timeout > 0) { + local $SIG{ALRM} = sub { die("timed out\n") }; + alarm($timeout); + $paddr = accept($gh, $fh); + alarm(0); + } + else { + $paddr = accept($gh, $fh); + } + }; + if ($@) { + $log->put("accept failed: $@")->push($socket); + return undef; + } + if (! $paddr) { + my $errno = 0+$!; + if ($errno == Errno::EINTR) { + $log->put("accept interrupted")->push($socket); + return 0; + } + $log->put("accept failed: $!")->push($socket); + return undef; + } + my $csocket = $socket->clone(fh => $gh); + $csocket->acceptaddr($paddr); + return $csocket; +} + +sub DESTROY { + my $socket = shift; + $socket->close; +} + +1; +# vim:set sw=4: diff --git a/ndb/tools/ndbnet/lib/NDB/Util/SocketINET.pm b/ndb/tools/ndbnet/lib/NDB/Util/SocketINET.pm new file mode 100644 index 00000000000..faaa568a08e --- /dev/null +++ b/ndb/tools/ndbnet/lib/NDB/Util/SocketINET.pm @@ -0,0 +1,86 @@ +package NDB::Util::SocketINET; + +use strict; +use Carp; +use Symbol; +use Socket; +use Errno; + +require NDB::Util::Socket; + +use vars qw(@ISA); +@ISA = qw(NDB::Util::Socket); + +# constructors + +my $log; + +sub initmodule { + $log = NDB::Util::Log->instance; +} + +NDB::Util::SocketINET->attributes( + host => sub { /^\S+$/ }, + port => sub { /^\d+$/ }, +); + +sub new { + my $class = shift; + @_ % 2 == 0 or confess 0+@_; + my(%attr) = @_; + my $socket = $class->SUPER::new(%attr, + domain => PF_INET, type => SOCK_STREAM, proto => 'tcp') + or $log->push, return undef; + return $socket; +} + +sub connect { + my $socket = shift; + @_ == 2 or confess 0+@_; + my($host, $port) = @_; + $port =~ /^\d+$/ or confess 'oops'; + my $iaddr = inet_aton($host); + if (! $iaddr) { + $log->put("host $host not found")->push($socket); + return undef; + } + my $paddr = pack_sockaddr_in($port, $iaddr); + $socket->SUPER::connect($paddr) + or $log->push, return undef; + $socket->sethost($host) + or $log->push, return undef; + $socket->setport($port) + or $log->push, return undef; + return 1; +} + +sub bind { + my $socket = shift; + @_ == 1 or confess 0+@_; + my($port) = @_; + $port =~ /^\d+$/ or confess 'oops'; + my $paddr = pack_sockaddr_in($port, INADDR_ANY); + $socket->SUPER::bind($paddr) + or $log->push, return undef; + $socket->setport($port) + or $log->push, return undef; + return 1; +} + +sub acceptaddr { + my $csocket = shift; + @_ == 1 or confess 0+@_; + my($paddr) = @_; + my($port, $iaddr) = unpack_sockaddr_in($paddr); + my $host = gethostbyaddr($iaddr, AF_INET); + $csocket->sethost($host) + or $log->push, return undef; + $csocket->setport($port) + or $log->push, return undef; + $log->put("accept: host=%s port=%d", + $csocket->gethost, $csocket->getport)->push($csocket)->debug; + return 1; +} + +1; +# vim:set sw=4: diff --git a/ndb/tools/ndbnet/lib/NDB/Util/SocketUNIX.pm b/ndb/tools/ndbnet/lib/NDB/Util/SocketUNIX.pm new file mode 100644 index 00000000000..9c6b3115f6a --- /dev/null +++ b/ndb/tools/ndbnet/lib/NDB/Util/SocketUNIX.pm @@ -0,0 +1,76 @@ +package NDB::Util::SocketUNIX; + +use strict; +use Carp; +use Symbol; +use Socket; +use Errno; + +require NDB::Util::Socket; + +use vars qw(@ISA); +@ISA = qw(NDB::Util::Socket); + +# constructors + +my $log; + +sub initmodule { + $log = NDB::Util::Log->instance; +} + +NDB::Util::SocketUNIX->attributes( + path => sub { /^\S+$/ }, +); + +sub new { + my $class = shift; + @_ % 2 == 0 or confess 0+@_; + my(%attr) = @_; + my $socket = $class->SUPER::new(%attr, + domain => PF_UNIX, type => SOCK_STREAM, proto => 0) + or $log->push, return undef; + return $socket; +} + +sub connect { + my $socket = shift; + @_ == 1 or confess 0+@_; + my($path) = @_; + $path =~ /^\S+$/ or confess 'oops'; + my $paddr = pack_sockaddr_un($path); + $socket->SUPER::connect($paddr) + or $log->push, return undef; + $socket->setpath($path) + or $log->push, return undef; + return 1; +} + +sub bind { + my $socket = shift; + @_ == 1 or confess 0+@_; + my($path) = @_; + $path =~ /^\S+$/ or confess 'oops'; + my $paddr = pack_sockaddr_un($path); + $socket->SUPER::bind($paddr) + or $log->push, return undef; + $socket->setpath($path) + or $log->push, return undef; + return 1; +} + +sub acceptaddr { + my $csocket = shift; + @_ == 1 or confess 0+@_; + my($paddr) = @_; + return 1; # crash + my $path = unpack_sockaddr_un($paddr); + $csocket->setpath($path) + or $log->push, return undef; + $log->put("%s accept: path=%s", + $csocket->getpath)->push($csocket)->debug; + return 1; +} + +1; +# vim:set sw=4: diff --git a/ndb/tools/ndbnet/ndbnet.pl b/ndb/tools/ndbnet/ndbnet.pl new file mode 100644 index 00000000000..5f6648da46d --- /dev/null +++ b/ndb/tools/ndbnet/ndbnet.pl @@ -0,0 +1,339 @@ +#! /usr/local/bin/perl + +use strict; +use POSIX(); +use Socket; +use Getopt::Long; +use File::Basename; +use Term::ReadLine; + +use NDB::Net; + +select(STDOUT); +$| = 1; + +# get options and environment + +my $log = NDB::Util::Log->instance; +$log->setpart(); + +sub printhelp { + print <<END; +ndbnet -- ndbnet client +usage: ndbnet [options] [command...] +--help print this text and exit +--base dir ndb installation, default \$NDB_BASE +--netcfg file net config, default \$NDB_BASE/etc/ndbnet.xml +--server id ndbnetd server id, or host:port if no config +--noterm no prompting and no input editing +--log prio debug/info/notice/warn/error/fatal, default info +command... command (by default becomes interactive) +END + exit(0); +} + +my $progopts = {}; +my @progargv; + +anon: { + local $SIG{__WARN__} = sub { + my $errstr = "@_"; + while (chomp($errstr)) {} + $log->put("$errstr (try --help)")->fatal; + }; + Getopt::Long::Configure(qw( + default no_getopt_compat no_ignore_case require_order + )); + GetOptions($progopts, qw( + help base=s netcfg=s server=s noterm log=s + )); +} + +$progopts->{help} && printhelp(); +if (defined(my $prio = $progopts->{log})) { + $log->setprio($prio); +} +@progargv = @ARGV; + +my $netenv = NDB::Net::Env->instance( + base => $progopts->{base}, + netcfg => $progopts->{netcfg}, +); +$netenv or $log->fatal; + +# get servers from command line or from net config + +my @servers = (); +my $netcfg; +if ($netenv->hasnetcfg) { + $netcfg = NDB::Net::Config->new(file => $netenv->getnetcfg); +} + +if (defined(my $id = $progopts->{server})) { + if ($id !~ /:/) { + $netcfg or $log->put("need net config to find server $id")->fatal; + $netcfg->load or $log->push->fatal; + $netcfg->getservers or $log->push->fatal; + my $s = NDB::Net::Server->get($id) or $log->fatal; + push(@servers, $s); + } else { + my($host, $port) = split(/:/, $id, 2); + my $s = NDB::Net::ServerINET->new(id => "?", host => $host, port => $port) + or $log->fatal; + push(@servers, $s); + } +} else { + $netcfg or $log->put("need net config to find servers")->fatal; + $netcfg->load or $log->push->fatal; + my $list = $netcfg->getservers or $log->fatal; + @servers= @$list; + @servers or $log->put("no servers")->push($netcfg)->fatal; +} + +# server commands + +my $server; +sub doserver { + my($cmd) = @_; + my $ret; + my $found; + for my $s (@servers) { + if (! $s->testconnect) { + $log->warn; + next; + } + $found = 1; + if ($server ne $s) { + $server = $s; + $log->put("selected")->push($server)->debug; + } + $ret = $server->request($cmd); + last; + } + if (! $found) { + $log->put("no available server"); + return undef; + } + my %seen = (); + @servers = grep(! $seen{$_}++, $server, @servers); + defined($ret) or $log->push, return undef; + return $ret; +} + +# local commands + +sub cmd_help { + my($cmd) = @_; + my $text = $cmd->helptext; + defined($text) or return undef; + while(chomp($text)) {} + print $text, "\n"; + return 1; +} + +sub cmd_alias { + my($cmd) = @_; + my $text = $cmd->aliastext; + while(chomp($text)) {} + print $text, "\n"; +} + +sub cmd_quit { + my($cmd) = @_; + $log->put("bye-bye")->info; + exit(0); +} + +sub cmd_server { + my($cmd) = @_; + my $action = $cmd->getarg(0); + if ($action !~ /^(start|restart|stop|ping)$/) { + $log->put("$action: undefined action"); + return undef; + } + if ($action eq 'start') { + $cmd->setopt('direct') + or $log->push, return undef; + } + if ($action eq 'ping' && ! @{$cmd->getarglist(1)}) { + $cmd->setopt('all') + or $log->push, return undef; + } + if (! $cmd->getopt('direct')) { + return doserver($cmd); + } + $netcfg->load + or return undef; + my $servers = $netcfg->getservers + or return undef; + my $list; + if ($cmd->getopt('all')) { + $list = $servers; + } + else { + $list = []; + for my $id (@{$cmd->getarglist(1)}) { + if (my $s = NDB::Net::ServerINET->get($id)) { + push(@$list, $s); + next; + } + if (my $s = NDB::Net::ServerINET->match($id, undef, $servers)) { + if (@$s) { + push(@$list, @$s); + next; + } + } + $log->push; + return undef; + } + } + if (! @$list) { + $log->put("no servers specified, use --all for all")->info; + return 1; + } + for my $s (@$list) { + if ($action eq 'ping') { + if ($s->testconnect) { + $log->put("is alive")->push($s); + } + $log->info; + next; + } + if ($action eq 'start') { + if ($s->testconnect) { + $log->put("already running")->push($s)->info; + next; + } + } + my $script = $cmd->getopt('script') || "ndbnetd"; + my @cmd = ($script); + if ($action eq 'restart') { + push(@cmd, "--restart"); + } + if ($action eq 'stop') { + push(@cmd, "--stop"); + } + if ($cmd->getopt('pass')) { + my $base = $netenv->getbase; + $cmd[0] = "$base/bin/$cmd[0]"; + } + if ($cmd->getopt('parallel')) { + my $pid = fork; + defined($pid) or + $log->push("fork failed: $!"), return undef; + $pid > 0 && next; + } + $log->put("$action via ssh")->push($s->getcanon)->push($s)->info; + $log->put("run: @cmd")->push($s)->debug; + system 'ssh', '-n', $s->getcanon, "@cmd"; + if ($cmd->getopt('parallel')) { + exit(0); + } + } + if ($cmd->getopt('parallel')) { + while ((my $pid = waitpid(-1, &POSIX::WNOHANG)) > 0) { + ; + } + } + return 1; +} + +sub cmd_list { + my($cmd) = @_; + my $ret = doserver($cmd) or + $log->push, return undef; + my @out = (); + my @o = qw(NAME NODES PROCESS STATUS COMMENT); + push(@out, [ @o ]); + for my $name (sort keys %$ret) { + $#o = -1; + $o[0] = $name; + my $dbsts = $ret->{$name}; + my @tmp = sort { $a->{id} <=> $b->{id} } values %{$dbsts->{node}}; + my @nodesmgmt = grep($_->{type} eq 'mgmt', @tmp); + my @nodesdb = grep($_->{type} eq 'db', @tmp); + my @nodesapi = grep($_->{type} eq 'api', @tmp); + my @nodes = (@nodesmgmt, @nodesdb, @nodesapi); + $o[1] = sprintf("%d/%d/%d", 0+@nodesmgmt, 0+@nodesdb, 0+@nodesapi); + $o[2] = "-"; + $o[3] = "-"; + $o[4] = $dbsts->{comment}; + $o[4] .= " - " if length $o[4]; + $o[4] .= basename($dbsts->{home}); + push(@out, [ @o ]); + for my $nodests (@nodes) { + $#o = -1; + $o[0] = $nodests->{id} . "-" . $nodests->{type}; + $o[1] = $nodests->{host}; + $o[1] =~ s/\..*//; + $o[2] = $nodests->{run}; + $o[3] = $nodests->{status} || "-"; + $o[4] = $nodests->{comment} || "-"; + push(@out, [ @o ]); + } + } + my @len = ( 8, 8, 8, 8 ); + for my $o (@out) { + for my $i (0..$#len) { + $len[$i] = length($o->[$i]) if $len[$i] < length($o->[$i]); + } + } + for my $o (@out) { + my @t = (); + for my $i (0..$#{$out[0]}) { + my $f = $len[$i] ? "%-$len[$i].$len[$i]s" : "%s"; + push(@t, sprintf($f, $o->[$i])); + } + print "@t\n"; + } + return 1; +} + +# main program + +sub docmd { + my(@args) = @_; + my $cmd = NDB::Net::Command->new(@args) + or return undef; + my $name = $cmd->getname; + my $doit; + { + no strict 'refs'; + $doit = *{"cmd_$name"}; + } + if (! defined(&$doit)) { + $doit = \&doserver; + } + my $ret = &$doit($cmd); + defined($ret) or $log->push, return undef; + return $ret; +} + +if (@progargv) { + docmd(argv => \@progargv) or $log->push->fatal; + exit(0); +} + +my $term; +if ((-t STDIN) && (-t STDOUT) && ! $progopts->{noterm}) { + $term = Term::ReadLine->new("ndbnet"); + $term->ornaments(0); +} + +print "type 'h' for help\n" if $term; +while (1) { + my($line); + while (! $line) { + $line = $term ? $term->readline("> ") : <STDIN>; + if (! defined($line)) { + print("\n") if $term; + $line = 'EOF'; + } + } + my $ret = docmd(line => $line); + $ret or $log->error; + ($line eq 'EOF') && last; +} + +1; +# vim:set sw=4: diff --git a/ndb/tools/ndbnet/ndbnetd.pl b/ndb/tools/ndbnet/ndbnetd.pl new file mode 100644 index 00000000000..95fa5322abc --- /dev/null +++ b/ndb/tools/ndbnet/ndbnetd.pl @@ -0,0 +1,400 @@ +#! /usr/local/bin/perl + +use strict; +use POSIX(); +use Socket; +use Getopt::Long; +use File::Basename; +use File::Spec; + +use NDB::Net; + +# save argv for restart via client +my @origargv = @ARGV; + +# get options and environment + +my $log = NDB::Util::Log->instance; + +sub printhelp { + print <<END; +ndbnetd -- ndbnet daemon +usage: ndbnetd [options] +--help print this text and exit +--base dir ndb installation, default \$NDB_BASE +--netcfg file net config, default \$NDB_BASE/etc/ndbnet.xml +--port num port number (if more than 1 server on this host) +--stop kill any existing server +--restart kill any existing server and start a new one +--fg run in foreground (test option) +--log prio debug/info/notice/warn/error/fatal, default info +END + exit(0); +} + +my $progopts = {}; +anon: { + local $SIG{__WARN__} = sub { + my $errstr = "@_"; + while (chomp($errstr)) {} + $log->put("$errstr (try --help)")->fatal; + }; + Getopt::Long::Configure(qw( + default no_getopt_compat no_ignore_case no_require_order + )); + GetOptions($progopts, qw( + help base=s netcfg=s port=i stop restart fg log=s + )); +} +$progopts->{help} && printhelp(); +if (defined(my $prio = $progopts->{log})) { + $log->setprio($prio); +} +@ARGV and $log->put("extra args on command line")->fatal; + +my $netenv = NDB::Net::Env->instance( + base => $progopts->{base}, + netcfg => $progopts->{netcfg}, +); +$netenv or $log->fatal; +$netenv->hasbase or $log->put("need NDB_BASE")->fatal; + +# load net config and find our entry + +my $netcfg = NDB::Net::Config->new(file => $netenv->getnetcfg) + or $log->push->fatal; +my $server; + +sub loadnetcfg { + $netcfg->load or $log->push->fatal; + my $servers = $netcfg->getservers or $log->fatal; + my $host = $netenv->gethostname; + my $port = $progopts->{port} || 0; + my $list = NDB::Net::ServerINET->match($host, $port, $servers) + or $log->push->fatal; + @$list == 1 + or $log->push->fatal; + $server = $list->[0]; + $server->setlocal; +} +loadnetcfg(); +$log->put("this server")->push($server)->debug; + +# check if server already running + +my $lock; +anon: { + my $dir = NDB::Util::Dir->new(path => File::Spec->catfile($netenv->getbase, "run")); + $dir->mkdir or $log->fatal; + my $name = sprintf("ndbnet%s.pid", $server->getid); + $lock = $dir->getfile($name)->getlock; + my $ret; + $ret = $lock->test; + defined($ret) or $log->fatal; + if ($ret) { + if ($progopts->{stop} || $progopts->{restart}) { + $log->put("stopping server %s pid=%s", $netenv->gethostname, $lock->getpid)->info; + if ($^O ne 'MSWin32') { + kill -15, $lock->getpid; + } else { + kill 15, $lock->getpid; + } + while (1) { + sleep 1; + $ret = $lock->test; + defined($ret) or $log->fatal; + if ($ret) { + if (! kill(0, $lock->getpid) && $! == Errno::ESRCH) { + $log->put("locked but gone (linux bug?)")->info; + $lock->unlink; + $ret = 0; + } + } + if (! $ret) { + if ($progopts->{stop}) { + $log->put("stopped")->info; + exit(0); + } + $log->put("restarting server %s", $netenv->gethostname)->info; + last; + } + } + } + else { + $log->put("already running pid=%s", $lock->getpid)->fatal; + } + } + else { + if ($progopts->{stop}) { + $log->put("not running")->info; + exit(0); + } + } + $lock->set or $log->fatal; +} + +# become daemon, re-obtain the lock, direct log to file + +anon: { + $log->setpart(time => 1, pid => 1, prio => 1, line => 1); + $progopts->{fg} && last anon; + $lock->close; + my $dir = NDB::Util::Dir->new(path => $netenv->getbase . "/log"); + $dir->mkdir or $log->fatal; + my $pid = fork(); + defined($pid) or $log->put("fork failed: $!")->fatal; + if ($pid) { + exit(0); + } + $lock->set or $log->fatal; + if ($^O ne 'MSWin32') { + POSIX::setsid() or $log->put("setsid failed: $!")->fatal; + } + open(STDIN, "</dev/null"); + my $name = sprintf("ndbnet%s.log", $server->getid); + $log->setfile($dir->getfile($name)->getpath) or $log->fatal; +} +$log->put("ndbnetd started pid=$$ port=%s", $server->getport)->info; + +# create server socket and event + +my $socket = NDB::Util::SocketINET->new or $log->fatal; +my $event = NDB::Util::Event->new; + +# commands + +sub cmd_server_fg { + my($cmd) = @_; + my $action = $cmd->getarg(0); + if (! $cmd->getopt('local')) { + return 1; + } + if ($action eq 'restart') { + my $prog = $netenv->getbase . "/bin/ndbnetd"; + my @argv = @origargv; + if (! grep(/^--restart$/, @argv)) { + push(@argv, "--restart"); + } + unshift(@argv, basename($prog)); + $lock->close; + $socket->close; + $log->put("restart: @argv")->push($server)->user; + $log->put("server restart")->putvalue(1)->user; + exec $prog @argv; + die "restart failed: $!"; + } + if ($action eq 'stop') { + $log->put("stop by request")->push($server)->user; + $log->put("server stop")->putvalue(1)->user; + exit(0); + } + if ($action eq 'ping') { + return 1; + } + $log->put("$action: unimplemented"); + return undef; +} + +sub cmd_server_bg { + my($cmd) = @_; + loadnetcfg() or return undef; + my $action = $cmd->getarg(0); + if (! $cmd->getopt('local')) { + $cmd->setopt('local') + or $log->push, return undef; + my $servers = $netcfg->getservers or $log->fatal; + my $list; + if ($cmd->getopt('all')) { + $list = $servers; + } + else { + $list = []; + for my $id (@{$cmd->getarglist(1)}) { + if (my $s = NDB::Net::ServerINET->get($id)) { + push(@$list, $s); + next; + } + if (my $s = NDB::Net::ServerINET->match($id, undef, $servers)) { + if (@$s) { + push(@$list, @$s); + next; + } + } + $log->push; + return undef; + } + } + my $fail = 0; + for my $s (@$list) { + if (! $s->request($cmd)) { + $log->push->user; + $fail++; + } + } + if ($fail) { + $log->put("failed %d/%d", $fail, scalar(@$list)); + return undef; + } + return 1; + } + if ($action eq 'restart') { + return 1; + } + if ($action eq 'stop') { + return 1; + } + if ($action eq 'ping') { + $log->put("is alive")->push($server)->user; + return 1; + } + $log->put("$action: unimplemented"); + return undef; +} + +sub cmd_start_bg { + my($cmd) = @_; + loadnetcfg() or return undef; + my $db = $netcfg->getdatabase($cmd->getarg(0)) or return undef; + $db->start($cmd->getopts) or return undef; + return 1; +} + +sub cmd_startnode_bg { + my($cmd) = @_; + loadnetcfg() or return undef; + my $db = $netcfg->getdatabase($cmd->getarg(0)) or return undef; + my $node = $db->getnode($cmd->getarg(1)) or return undef; + $node->start($cmd->getopts) or return undef; + return 1; +} + +sub cmd_stop_bg { + my($cmd) = @_; + my $db = $netcfg->getdatabase($cmd->getarg(0)) or return undef; + $db->stop($cmd->getopts) or return undef; + return 1; +} + +sub cmd_stopnode_bg { + my($cmd) = @_; + my $db = $netcfg->getdatabase($cmd->getarg(0)) or return undef; + my $node = $db->getnode($cmd->getarg(1)) or return undef; + $node->stop($cmd->getopts) or return undef; + return 1; +} + +sub cmd_kill_bg { + my($cmd) = @_; + my $db = $netcfg->getdatabase($cmd->getarg(0)) or return undef; + $db->kill($cmd->getopts) or return undef; + return 1; +} + +sub cmd_killnode_bg { + my($cmd) = @_; + my $db = $netcfg->getdatabase($cmd->getarg(0)) or return undef; + my $node = $db->getnode($cmd->getarg(1)) or return undef; + $node->kill($cmd->getopts) or return undef; + return 1; +} + +sub cmd_statnode_bg { + my($cmd) = @_; + my $db = $netcfg->getdatabase($cmd->getarg(0)) or return undef; + my $node = $db->getnode($cmd->getarg(1)) or return undef; + my $ret = $node->stat($cmd->getopts) or return undef; + return $ret; +} + +sub cmd_list_bg { + my($cmd) = @_; + loadnetcfg() or return undef; + my $dblist; + if ($cmd->getarg(0)) { + my $db = $netcfg->getdatabase($cmd->getarg(0)) or return undef; + $dblist = [ $db ]; + } else { + $dblist = $netcfg->getdatabases or return undef; + } + my $ret = {}; + for my $db (@$dblist) { + my $status = $db->list($cmd->getopts) || "error"; + $ret->{$db->getname} = $status; + } + return $ret; +} + +sub cmd_writenode_bg { + my($cmd) = @_; + my $db = $netcfg->getdatabase($cmd->getarg(0)) or return undef; + my $node = $db->getnode($cmd->getarg(1)) or return undef; + my $ret = $node->write($cmd->getopts, $cmd->getarg(2)) or return undef; + return $ret; +} + +# main program + +sub checkchild { + while ((my $pid = waitpid(-1, &POSIX::WNOHANG)) > 0) { + $log->put("harvested pid=$pid")->info; + } +} + +my $gotterm = 0; +$SIG{INT} = sub { $gotterm = 1 }; +$SIG{TERM} = sub { $gotterm = 1 }; + +$socket->setopt(SOL_SOCKET, SO_REUSEADDR, 1) or $log->fatal; +$socket->bind($server->getport) or $log->fatal; +$socket->listen or $log->fatal; +$event->set($socket, 'r'); + +loop: { + try: { + my $n = $event->poll(10); + if ($gotterm) { + $log->put("terminate on signal")->info; + last try; + } + if (! defined($n)) { + $log->error; + sleep 1; + last try; + } + if (! $n) { + $log->debug; + last try; + } + if (! $event->test($socket, 'r')) { + last try; + } + my $csocket = $socket->accept(10); + if (! defined($csocket)) { + $log->error; + last try; + } + if (! $csocket) { + $log->warn; + last try; + } + my $client = NDB::Net::Client->new( + socket => $csocket, + serversocket => $socket, + serverlock => $lock, + event => $event, + context => 'main', + ); + $client or $log->fatal; + } + loadnetcfg() or $log->fatal; + NDB::Net::Client->processall; + if ($gotterm) { + last loop; + } + redo loop; +} + +$log->put("ndbnetd done")->info; + +1; +# vim:set sw=4: diff --git a/ndb/tools/ndbnet/ndbrun b/ndb/tools/ndbnet/ndbrun new file mode 100644 index 00000000000..99121276d99 --- /dev/null +++ b/ndb/tools/ndbnet/ndbrun @@ -0,0 +1,33 @@ +#! /bin/sh + +# just for autotest for now + +case $# in +1) script=$1 ;; +*) echo "usage: $0 script"; exit 1 ;; +esac + +case $NDB_TOP in +/*) ;; +*) echo "$0: NDB_TOP not defined" >&2; exit 1 ;; +esac + +case $script in +/*) ;; +*) for d in $NDB_TOP $NDB_TOP/test $NDB_TOP/test/ndbnet; do + if [ -f $d/$script ]; then + script=$d/$script + break + fi + done ;; +esac + +if [ ! -f $script ]; then + echo "$0: $script: script not found" >&2; exit 1 +fi + +PERL5LIB=$NDB_TOP/lib/perl5:$PERL5LIB; export PERL5LIB + +perl -cw $script || exit 1 +perl $script +exit $? |