From 670d164fbe60b5f6018ea6cb7a1c778f838ea755 Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 18 Jun 2014 15:52:14 +0200 Subject: Test: Tcl client initial support for automatic reconnection. --- tests/support/redis.tcl | 67 ++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 53 insertions(+), 14 deletions(-) (limited to 'tests/support') diff --git a/tests/support/redis.tcl b/tests/support/redis.tcl index cc1cbba36..ad9cbe8ab 100644 --- a/tests/support/redis.tcl +++ b/tests/support/redis.tcl @@ -31,8 +31,10 @@ package provide redis 0.1 namespace eval redis {} set ::redis::id 0 array set ::redis::fd {} +array set ::redis::addr {} array set ::redis::blocking {} array set ::redis::deferred {} +array set ::redis::reconnect {} array set ::redis::callback {} array set ::redis::state {} ;# State in non-blocking reply reading array set ::redis::statestack {} ;# Stack of states, for nested mbulks @@ -42,35 +44,63 @@ proc redis {{server 127.0.0.1} {port 6379} {defer 0}} { fconfigure $fd -translation binary set id [incr ::redis::id] set ::redis::fd($id) $fd + set ::redis::addr($id) [list $server $port] set ::redis::blocking($id) 1 set ::redis::deferred($id) $defer + set ::redis::reconnect($id) 0 ::redis::redis_reset_state $id interp alias {} ::redis::redisHandle$id {} ::redis::__dispatch__ $id } +# This is a wrapper to the actual dispatching procedure that handles +# reconnection if needed. proc ::redis::__dispatch__ {id method args} { + set errorcode [catch {::redis::__dispatch__raw__ $id $method $args} retval] + if {$errorcode && $::redis::reconnect($id) && $::redis::fd($id) eq {}} { + # Try again if the connection was lost. + # FIXME: we don't re-select the previously selected DB, nor we check + # if we are inside a transaction that needs to be re-issued from + # scratch. + set errorcode [catch {::redis::__dispatch__raw__ $id $method $args} retval] + } + return -code $errorcode $retval +} + +proc ::redis::__dispatch__raw__ {id method argv} { set fd $::redis::fd($id) + + # Reconnect the link if needed. + if {$fd eq {}} { + lassign $::redis::addr($id) host port + set ::redis::fd($id) [socket $host $port] + fconfigure $::redis::fd($id) -translation binary + set fd $::redis::fd($id) + } + set blocking $::redis::blocking($id) set deferred $::redis::deferred($id) if {$blocking == 0} { - if {[llength $args] == 0} { + if {[llength $argv] == 0} { error "Please provide a callback in non-blocking mode" } - set callback [lindex $args end] - set args [lrange $args 0 end-1] + set callback [lindex $argv end] + set argv [lrange $argv 0 end-1] } if {[info command ::redis::__method__$method] eq {}} { - set cmd "*[expr {[llength $args]+1}]\r\n" + set cmd "*[expr {[llength $argv]+1}]\r\n" append cmd "$[string length $method]\r\n$method\r\n" - foreach a $args { + foreach a $argv { append cmd "$[string length $a]\r\n$a\r\n" } ::redis::redis_write $fd $cmd - flush $fd + if {[catch {flush $fd}]} { + set ::redis::fd($id) {} + return -code error "I/O error reading reply" + } if {!$deferred} { if {$blocking} { - ::redis::redis_read_reply $fd + ::redis::redis_read_reply $id $fd } else { # Every well formed reply read will pop an element from this # list and use it as a callback. So pipelining is supported @@ -80,7 +110,7 @@ proc ::redis::__dispatch__ {id method args} { } } } else { - uplevel 1 [list ::redis::__method__$method $id $fd] $args + uplevel 1 [list ::redis::__method__$method $id $fd] $argv } } @@ -89,8 +119,12 @@ proc ::redis::__method__blocking {id fd val} { fconfigure $fd -blocking $val } +proc ::redis::__method__reconnect {id fd val} { + set ::redis::reconnect($id) $val +} + proc ::redis::__method__read {id fd} { - ::redis::redis_read_reply $fd + ::redis::redis_read_reply $id $fd } proc ::redis::__method__write {id fd buf} { @@ -104,8 +138,10 @@ proc ::redis::__method__flush {id fd} { proc ::redis::__method__close {id fd} { catch {close $fd} catch {unset ::redis::fd($id)} + catch {unset ::redis::addr($id)} catch {unset ::redis::blocking($id)} catch {unset ::redis::deferred($id)} + catch {unset ::redis::reconnect($id)} catch {unset ::redis::state($id)} catch {unset ::redis::statestack($id)} catch {unset ::redis::callback($id)} @@ -143,14 +179,14 @@ proc ::redis::redis_bulk_read {fd} { return $buf } -proc ::redis::redis_multi_bulk_read fd { +proc ::redis::redis_multi_bulk_read {id fd} { set count [redis_read_line $fd] if {$count == -1} return {} set l {} set err {} for {set i 0} {$i < $count} {incr i} { if {[catch { - lappend l [redis_read_reply $fd] + lappend l [redis_read_reply $id $fd] } e] && $err eq {}} { set err $e } @@ -163,16 +199,19 @@ proc ::redis::redis_read_line fd { string trim [gets $fd] } -proc ::redis::redis_read_reply fd { +proc ::redis::redis_read_reply {id fd} { set type [read $fd 1] switch -exact -- $type { : - + {redis_read_line $fd} - {return -code error [redis_read_line $fd]} $ {redis_bulk_read $fd} - * {redis_multi_bulk_read $fd} + * {redis_multi_bulk_read $id $fd} default { - if {$type eq {}} {return -code error "I/O error reading reply"} + if {$type eq {}} { + set ::redis::fd($id) {} + return -code error "I/O error reading reply" + } return -code error "Bad protocol, '$type' as reply type byte" } } -- cgit v1.2.1