summaryrefslogtreecommitdiff
path: root/src/redis-trib.rb
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2015-12-15 12:54:40 +0100
committerantirez <antirez@gmail.com>2015-12-15 12:54:40 +0100
commitcba1c29580e6ffa12fdb19b83007050264b4bf87 (patch)
tree12743657a5e51a7e13b0fd0c8d4e751ec4ad9d3c /src/redis-trib.rb
parent3782902bec9c9856d47833a799aca42c62b3cc8b (diff)
downloadredis-cba1c29580e6ffa12fdb19b83007050264b4bf87.tar.gz
Cluster: redis-trib rebalance initial implementation.
Diffstat (limited to 'src/redis-trib.rb')
-rwxr-xr-xsrc/redis-trib.rb171
1 files changed, 154 insertions, 17 deletions
diff --git a/src/redis-trib.rb b/src/redis-trib.rb
index 5a19adae7..b065365d9 100755
--- a/src/redis-trib.rb
+++ b/src/redis-trib.rb
@@ -25,6 +25,10 @@ require 'rubygems'
require 'redis'
ClusterHashSlots = 16384
+MigrateDefaultTimeout = 60000
+RebalanceDefaultThreshold = 2
+
+$verbose = false
def xputs(s)
case s[0..2]
@@ -86,7 +90,7 @@ class ClusterNode
def connect(o={})
return if @r
- print "Connecting to node #{self}: "
+ print "Connecting to node #{self}: " if $verbose
STDOUT.flush
begin
@r = Redis.new(:host => @info[:host], :port => @info[:port], :timeout => 60)
@@ -96,7 +100,7 @@ class ClusterNode
exit 1 if o[:abort]
@r = nil
end
- xputs "OK"
+ xputs "OK" if $verbose
end
def assert_cluster
@@ -288,7 +292,7 @@ class RedisTrib
@nodes = []
@fix = false
@errors = []
- @timeout = 60000
+ @timeout = MigrateDefaultTimeout
end
def check_arity(req_args, num_args)
@@ -303,6 +307,10 @@ class RedisTrib
@nodes << node
end
+ def reset_nodes
+ @nodes = []
+ end
+
def cluster_error(msg)
@errors << msg
xputs msg
@@ -326,9 +334,9 @@ class RedisTrib
sorted[0]
end
- def check_cluster
+ def check_cluster(opt={})
xputs ">>> Performing Cluster Check (using node #{@nodes[0]})"
- show_nodes
+ show_nodes if !opt[:quiet]
check_config_consistency
check_open_slots
check_slots_coverage
@@ -512,7 +520,7 @@ class RedisTrib
# Case 1: The slot is in migrating state in one slot, and in
# importing state in 1 slot. That's trivial to address.
if migrating.length == 1 && importing.length == 1
- move_slot(migrating[0],importing[0],slot,:verbose=>true,:fix=>true)
+ move_slot(migrating[0],importing[0],slot,:dots=>true,:fix=>true)
# Case 2: There are multiple nodes that claim the slot as importing,
# they probably got keys about the slot after a restart so opened
# the slot. In this case we just move all the keys to the owner
@@ -521,7 +529,7 @@ class RedisTrib
xputs ">>> Moving all the #{slot} slot keys to its owner #{owner}"
importing.each {|node|
next if node == owner
- move_slot(node,owner,slot,:verbose=>true,:fix=>true,:cold=>true)
+ move_slot(node,owner,slot,:dots=>true,:fix=>true,:cold=>true)
xputs ">>> Setting #{slot} as STABLE in #{node}"
node.r.cluster("setslot",slot,"stable")
}
@@ -819,13 +827,19 @@ class RedisTrib
# Options:
# :verbose -- Print a dot for every moved key.
# :fix -- We are moving in the context of a fix. Use REPLACE.
- # :cold -- Move keys without opening / reconfiguring the nodes.
+ # :cold -- Move keys without opening slots / reconfiguring the nodes.
+ # :update -- Update nodes.info[:slots] for source/target nodes.
+ # :quiet -- Don't print info messages.
def move_slot(source,target,slot,o={})
# We start marking the slot as importing in the destination node,
# and the slot as migrating in the target host. Note that the order of
# the operations is important, as otherwise a client may be redirected
# to the target node that does not yet know it is importing this slot.
- print "Moving slot #{slot} from #{source} to #{target}: "; STDOUT.flush
+ if !o[:quiet]
+ print "Moving slot #{slot} from #{source} to #{target}: "
+ STDOUT.flush
+ end
+
if !o[:cold]
target.r.cluster("setslot",slot,"importing",source.info[:name])
source.r.cluster("setslot",slot,"migrating",target.info[:name])
@@ -846,20 +860,26 @@ class RedisTrib
exit 1
end
end
- print "."*keys.length if o[:verbose]
+ print "."*keys.length if o[:dots]
STDOUT.flush
end
- puts
+ puts if !o[:quiet]
# Set the new node as the owner of the slot in all the known nodes.
if !o[:cold]
@nodes.each{|n|
n.r.cluster("setslot",slot,"node",target.info[:name])
}
end
+
+ # Update the node logical config
+ if o[:update] then
+ source.info[:slots].delete(slot)
+ target.info[:slots][slot] = true
+ end
end
- # redis-trib subcommands implementations
+ # redis-trib subcommands implementations.
def check_cluster_cmd(argv,opt)
load_cluster_info_from_node(argv[0])
@@ -871,6 +891,106 @@ class RedisTrib
show_cluster_info
end
+ def rebalance_cluster_cmd(argv,opt)
+ load_cluster_info_from_node(argv[0])
+
+ # Options parsing
+ threshold = opt['threshold'].to_i
+ autoweights = opt['auto-weights']
+ weights = {}
+ opt['weight'].each{|w|
+ fields = w.split("=")
+ node = get_node_by_name(fields[0])
+ if !node || !node.has_flag?("master")
+ puts "*** No such master node #{fields[0]}"
+ exit 1
+ end
+ weights[fields[0]] = fields[1].to_f
+ } if opt['weight']
+ useempty = opt['use-empty-masters']
+
+ # Assign a weight to each node, and compute the total cluster weight.
+ total_weight = 0
+ nodes_involved = 0
+ @nodes.each{|n|
+ if n.has_flag?("master")
+ next if !useempty && n.slots.length == 0
+ n.info[:w] = weights[n.info[:name]] ? weights[n.info[:name]] : 1
+ total_weight += n.info[:w]
+ nodes_involved += 1
+ end
+ }
+
+ # Check cluster, only proceed if it looks sane.
+ check_cluster(:quiet => true)
+ if @errors.length != 0
+ puts "*** Please fix your cluster problems before rebalancing"
+ exit 1
+ end
+
+ # Calculate the slots balance for each node. It's the number of
+ # slots the node should lose (if positive) or gain (if negative)
+ # in order to be balanced.
+ @nodes.each{|n|
+ if n.has_flag?("master")
+ next if !n.info[:w]
+ expected = ((ClusterHashSlots.to_f / total_weight) *
+ n.info[:w]).to_i
+ n.info[:balance] = n.slots.length - expected
+ puts "#{n} balance is #{n.info[:balance]} slots" if $verbose
+ end
+ }
+
+ # Sort nodes by their slots balance.
+ sn = @nodes.select{|n|
+ n.has_flag?("master")
+ }.sort{|a,b|
+ a.info[:balance] <=> b.info[:balance]
+ }
+
+ xputs ">>> Rebalancing across #{nodes_involved} nodes. Total weight = #{total_weight}"
+
+ # Now we have at the start of the 'sn' array nodes that should get
+ # slots, at the end nodes that must give slots.
+ # We take two indexes, one at the start, and one at the end,
+ # incrementing or decrementing the indexes accordingly til we
+ # find nodes that need to get/provide slots.
+ dst_idx = 0
+ src_idx = sn.length - 1
+
+ while dst_idx < src_idx
+ dst = sn[dst_idx]
+ src = sn[src_idx]
+ numslots = [dst.info[:balance],src.info[:balance]].map{|n|
+ n.abs
+ }.min
+
+ if numslots > 0
+ puts "Moving #{numslots} slots from #{src} to #{dst}"
+
+ # Actaully move the slots.
+ reshard_table = compute_reshard_table([src],numslots)
+ if reshard_table.length != numslots
+ xputs "*** Assertio failed: Reshard table != number of slots"
+ exit 1
+ end
+ reshard_table.each{|e|
+ move_slot(e[:source],dst,e[:slot],
+ :quiet=>true,:dots=>false,:update=>true)
+ print "#"
+ STDOUT.flush
+ }
+ puts
+ end
+
+ # Update nodes balance.
+ dst.info[:balance] += numslots
+ src.info[:balance] -= numslots
+ dst_idx += 1 if dst.info[:balance] == 0
+ src_idx -= 1 if src.info[:balance] == 0
+ end
+ end
+
def fix_cluster_cmd(argv,opt)
@fix = true
@timeout = opt['timeout'].to_i if opt['timeout']
@@ -992,7 +1112,7 @@ class RedisTrib
exit(1) if (yesno != "yes")
end
reshard_table.each{|e|
- move_slot(e[:source],target,e[:slot],:verbose=>true)
+ move_slot(e[:source],target,e[:slot],:dots=>true)
}
end
@@ -1238,17 +1358,32 @@ class RedisTrib
if ARGV[idx][0..1] == "--"
option = ARGV[idx][2..-1]
idx += 1
+
+ # --verbose is a global option
+ if option == "verbose"
+ $verbose = true
+ next
+ end
+
if ALLOWED_OPTIONS[cmd] == nil || ALLOWED_OPTIONS[cmd][option] == nil
puts "Unknown option '#{option}' for command '#{cmd}'"
exit 1
end
- if ALLOWED_OPTIONS[cmd][option]
+ if ALLOWED_OPTIONS[cmd][option] != false
value = ARGV[idx]
idx += 1
else
value = true
end
- options[option] = value
+
+ # If the option is set to [], it's a multiple arguments
+ # option. We just queue every new value into an array.
+ if ALLOWED_OPTIONS[cmd][option] == []
+ options[option] = [] if !options[option]
+ options[option] << value
+ else
+ options[option] = value
+ end
else
# Remaining arguments are not options.
break
@@ -1363,6 +1498,7 @@ COMMANDS={
"info" => ["info_cluster_cmd", 2, "host:port"],
"fix" => ["fix_cluster_cmd", 2, "host:port"],
"reshard" => ["reshard_cluster_cmd", 2, "host:port"],
+ "rebalance" => ["rebalance_cluster_cmd", -2, "host:port"],
"add-node" => ["addnode_cluster_cmd", 3, "new_host:new_port existing_host:existing_port"],
"del-node" => ["delnode_cluster_cmd", 3, "host:port node_id"],
"set-timeout" => ["set_timeout_cluster_cmd", 3, "host:port milliseconds"],
@@ -1375,8 +1511,9 @@ ALLOWED_OPTIONS={
"create" => {"replicas" => true},
"add-node" => {"slave" => false, "master-id" => true},
"import" => {"from" => :required, "copy" => false, "replace" => false},
- "reshard" => {"from" => true, "to" => true, "slots" => true, "yes" => false, "timeout" => 15000},
- "fix" => {"timeout" => 15000},
+ "reshard" => {"from" => true, "to" => true, "slots" => true, "yes" => false, "timeout" => MigrateDefaultTimeout},
+ "rebalance" => {"weight" => [], "auto-weights" => false, "threshold" => RebalanceDefaultThreshold, "use-empty-masters" => false, "timeout" => MigrateDefaultTimeout},
+ "fix" => {"timeout" => MigrateDefaultTimeout},
}
def show_help