diff options
author | antirez <antirez@gmail.com> | 2015-12-15 12:54:40 +0100 |
---|---|---|
committer | antirez <antirez@gmail.com> | 2015-12-15 12:54:40 +0100 |
commit | cba1c29580e6ffa12fdb19b83007050264b4bf87 (patch) | |
tree | 12743657a5e51a7e13b0fd0c8d4e751ec4ad9d3c /src/redis-trib.rb | |
parent | 3782902bec9c9856d47833a799aca42c62b3cc8b (diff) | |
download | redis-cba1c29580e6ffa12fdb19b83007050264b4bf87.tar.gz |
Cluster: redis-trib rebalance initial implementation.
Diffstat (limited to 'src/redis-trib.rb')
-rwxr-xr-x | src/redis-trib.rb | 171 |
1 files changed, 154 insertions, 17 deletions
diff --git a/src/redis-trib.rb b/src/redis-trib.rb index 5a19adae7..b065365d9 100755 --- a/src/redis-trib.rb +++ b/src/redis-trib.rb @@ -25,6 +25,10 @@ require 'rubygems' require 'redis' ClusterHashSlots = 16384 +MigrateDefaultTimeout = 60000 +RebalanceDefaultThreshold = 2 + +$verbose = false def xputs(s) case s[0..2] @@ -86,7 +90,7 @@ class ClusterNode def connect(o={}) return if @r - print "Connecting to node #{self}: " + print "Connecting to node #{self}: " if $verbose STDOUT.flush begin @r = Redis.new(:host => @info[:host], :port => @info[:port], :timeout => 60) @@ -96,7 +100,7 @@ class ClusterNode exit 1 if o[:abort] @r = nil end - xputs "OK" + xputs "OK" if $verbose end def assert_cluster @@ -288,7 +292,7 @@ class RedisTrib @nodes = [] @fix = false @errors = [] - @timeout = 60000 + @timeout = MigrateDefaultTimeout end def check_arity(req_args, num_args) @@ -303,6 +307,10 @@ class RedisTrib @nodes << node end + def reset_nodes + @nodes = [] + end + def cluster_error(msg) @errors << msg xputs msg @@ -326,9 +334,9 @@ class RedisTrib sorted[0] end - def check_cluster + def check_cluster(opt={}) xputs ">>> Performing Cluster Check (using node #{@nodes[0]})" - show_nodes + show_nodes if !opt[:quiet] check_config_consistency check_open_slots check_slots_coverage @@ -512,7 +520,7 @@ class RedisTrib # Case 1: The slot is in migrating state in one slot, and in # importing state in 1 slot. That's trivial to address. if migrating.length == 1 && importing.length == 1 - move_slot(migrating[0],importing[0],slot,:verbose=>true,:fix=>true) + move_slot(migrating[0],importing[0],slot,:dots=>true,:fix=>true) # Case 2: There are multiple nodes that claim the slot as importing, # they probably got keys about the slot after a restart so opened # the slot. In this case we just move all the keys to the owner @@ -521,7 +529,7 @@ class RedisTrib xputs ">>> Moving all the #{slot} slot keys to its owner #{owner}" importing.each {|node| next if node == owner - move_slot(node,owner,slot,:verbose=>true,:fix=>true,:cold=>true) + move_slot(node,owner,slot,:dots=>true,:fix=>true,:cold=>true) xputs ">>> Setting #{slot} as STABLE in #{node}" node.r.cluster("setslot",slot,"stable") } @@ -819,13 +827,19 @@ class RedisTrib # Options: # :verbose -- Print a dot for every moved key. # :fix -- We are moving in the context of a fix. Use REPLACE. - # :cold -- Move keys without opening / reconfiguring the nodes. + # :cold -- Move keys without opening slots / reconfiguring the nodes. + # :update -- Update nodes.info[:slots] for source/target nodes. + # :quiet -- Don't print info messages. def move_slot(source,target,slot,o={}) # We start marking the slot as importing in the destination node, # and the slot as migrating in the target host. Note that the order of # the operations is important, as otherwise a client may be redirected # to the target node that does not yet know it is importing this slot. - print "Moving slot #{slot} from #{source} to #{target}: "; STDOUT.flush + if !o[:quiet] + print "Moving slot #{slot} from #{source} to #{target}: " + STDOUT.flush + end + if !o[:cold] target.r.cluster("setslot",slot,"importing",source.info[:name]) source.r.cluster("setslot",slot,"migrating",target.info[:name]) @@ -846,20 +860,26 @@ class RedisTrib exit 1 end end - print "."*keys.length if o[:verbose] + print "."*keys.length if o[:dots] STDOUT.flush end - puts + puts if !o[:quiet] # Set the new node as the owner of the slot in all the known nodes. if !o[:cold] @nodes.each{|n| n.r.cluster("setslot",slot,"node",target.info[:name]) } end + + # Update the node logical config + if o[:update] then + source.info[:slots].delete(slot) + target.info[:slots][slot] = true + end end - # redis-trib subcommands implementations + # redis-trib subcommands implementations. def check_cluster_cmd(argv,opt) load_cluster_info_from_node(argv[0]) @@ -871,6 +891,106 @@ class RedisTrib show_cluster_info end + def rebalance_cluster_cmd(argv,opt) + load_cluster_info_from_node(argv[0]) + + # Options parsing + threshold = opt['threshold'].to_i + autoweights = opt['auto-weights'] + weights = {} + opt['weight'].each{|w| + fields = w.split("=") + node = get_node_by_name(fields[0]) + if !node || !node.has_flag?("master") + puts "*** No such master node #{fields[0]}" + exit 1 + end + weights[fields[0]] = fields[1].to_f + } if opt['weight'] + useempty = opt['use-empty-masters'] + + # Assign a weight to each node, and compute the total cluster weight. + total_weight = 0 + nodes_involved = 0 + @nodes.each{|n| + if n.has_flag?("master") + next if !useempty && n.slots.length == 0 + n.info[:w] = weights[n.info[:name]] ? weights[n.info[:name]] : 1 + total_weight += n.info[:w] + nodes_involved += 1 + end + } + + # Check cluster, only proceed if it looks sane. + check_cluster(:quiet => true) + if @errors.length != 0 + puts "*** Please fix your cluster problems before rebalancing" + exit 1 + end + + # Calculate the slots balance for each node. It's the number of + # slots the node should lose (if positive) or gain (if negative) + # in order to be balanced. + @nodes.each{|n| + if n.has_flag?("master") + next if !n.info[:w] + expected = ((ClusterHashSlots.to_f / total_weight) * + n.info[:w]).to_i + n.info[:balance] = n.slots.length - expected + puts "#{n} balance is #{n.info[:balance]} slots" if $verbose + end + } + + # Sort nodes by their slots balance. + sn = @nodes.select{|n| + n.has_flag?("master") + }.sort{|a,b| + a.info[:balance] <=> b.info[:balance] + } + + xputs ">>> Rebalancing across #{nodes_involved} nodes. Total weight = #{total_weight}" + + # Now we have at the start of the 'sn' array nodes that should get + # slots, at the end nodes that must give slots. + # We take two indexes, one at the start, and one at the end, + # incrementing or decrementing the indexes accordingly til we + # find nodes that need to get/provide slots. + dst_idx = 0 + src_idx = sn.length - 1 + + while dst_idx < src_idx + dst = sn[dst_idx] + src = sn[src_idx] + numslots = [dst.info[:balance],src.info[:balance]].map{|n| + n.abs + }.min + + if numslots > 0 + puts "Moving #{numslots} slots from #{src} to #{dst}" + + # Actaully move the slots. + reshard_table = compute_reshard_table([src],numslots) + if reshard_table.length != numslots + xputs "*** Assertio failed: Reshard table != number of slots" + exit 1 + end + reshard_table.each{|e| + move_slot(e[:source],dst,e[:slot], + :quiet=>true,:dots=>false,:update=>true) + print "#" + STDOUT.flush + } + puts + end + + # Update nodes balance. + dst.info[:balance] += numslots + src.info[:balance] -= numslots + dst_idx += 1 if dst.info[:balance] == 0 + src_idx -= 1 if src.info[:balance] == 0 + end + end + def fix_cluster_cmd(argv,opt) @fix = true @timeout = opt['timeout'].to_i if opt['timeout'] @@ -992,7 +1112,7 @@ class RedisTrib exit(1) if (yesno != "yes") end reshard_table.each{|e| - move_slot(e[:source],target,e[:slot],:verbose=>true) + move_slot(e[:source],target,e[:slot],:dots=>true) } end @@ -1238,17 +1358,32 @@ class RedisTrib if ARGV[idx][0..1] == "--" option = ARGV[idx][2..-1] idx += 1 + + # --verbose is a global option + if option == "verbose" + $verbose = true + next + end + if ALLOWED_OPTIONS[cmd] == nil || ALLOWED_OPTIONS[cmd][option] == nil puts "Unknown option '#{option}' for command '#{cmd}'" exit 1 end - if ALLOWED_OPTIONS[cmd][option] + if ALLOWED_OPTIONS[cmd][option] != false value = ARGV[idx] idx += 1 else value = true end - options[option] = value + + # If the option is set to [], it's a multiple arguments + # option. We just queue every new value into an array. + if ALLOWED_OPTIONS[cmd][option] == [] + options[option] = [] if !options[option] + options[option] << value + else + options[option] = value + end else # Remaining arguments are not options. break @@ -1363,6 +1498,7 @@ COMMANDS={ "info" => ["info_cluster_cmd", 2, "host:port"], "fix" => ["fix_cluster_cmd", 2, "host:port"], "reshard" => ["reshard_cluster_cmd", 2, "host:port"], + "rebalance" => ["rebalance_cluster_cmd", -2, "host:port"], "add-node" => ["addnode_cluster_cmd", 3, "new_host:new_port existing_host:existing_port"], "del-node" => ["delnode_cluster_cmd", 3, "host:port node_id"], "set-timeout" => ["set_timeout_cluster_cmd", 3, "host:port milliseconds"], @@ -1375,8 +1511,9 @@ ALLOWED_OPTIONS={ "create" => {"replicas" => true}, "add-node" => {"slave" => false, "master-id" => true}, "import" => {"from" => :required, "copy" => false, "replace" => false}, - "reshard" => {"from" => true, "to" => true, "slots" => true, "yes" => false, "timeout" => 15000}, - "fix" => {"timeout" => 15000}, + "reshard" => {"from" => true, "to" => true, "slots" => true, "yes" => false, "timeout" => MigrateDefaultTimeout}, + "rebalance" => {"weight" => [], "auto-weights" => false, "threshold" => RebalanceDefaultThreshold, "use-empty-masters" => false, "timeout" => MigrateDefaultTimeout}, + "fix" => {"timeout" => MigrateDefaultTimeout}, } def show_help |