summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2015-12-18 12:27:14 +0100
committerantirez <antirez@gmail.com>2015-12-18 15:52:22 +0100
commitb4b7c57cb026b0b8e533135be4e5c0590d063690 (patch)
tree98c01e1bbd956c311f6df6c83f10477144ef2b29
parentc48355e920f6387b85cf9d46bf40f00d4cd46f5d (diff)
downloadredis-b4b7c57cb026b0b8e533135be4e5c0590d063690.tar.gz
Cluster: redis-trib reshard / rebalance --pipeline support.
-rwxr-xr-xsrc/redis-trib.rb26
1 files changed, 21 insertions, 5 deletions
diff --git a/src/redis-trib.rb b/src/redis-trib.rb
index 285b955e2..a3eaf4a5d 100755
--- a/src/redis-trib.rb
+++ b/src/redis-trib.rb
@@ -26,6 +26,7 @@ require 'redis'
ClusterHashSlots = 16384
MigrateDefaultTimeout = 60000
+MigrateDefaultPipeline = 10
RebalanceDefaultThreshold = 2
$verbose = false
@@ -36,6 +37,8 @@ def xputs(s)
color="29;1"
when "[ER"
color="31;1"
+ when "[WA"
+ color="31;1"
when "[OK"
color="32"
when "[FA","***"
@@ -847,6 +850,8 @@ class RedisTrib
# :update -- Update nodes.info[:slots] for source/target nodes.
# :quiet -- Don't print info messages.
def move_slot(source,target,slot,o={})
+ o = {:pipeline => MigrateDefaultPipeline}.merge(o)
+
# We start marking the slot as importing in the destination node,
# and the slot as migrating in the target host. Note that the order of
# the operations is important, as otherwise a client may be redirected
@@ -862,7 +867,7 @@ class RedisTrib
end
# Migrate all the keys from source to target using the MIGRATE command
while true
- keys = source.r.cluster("getkeysinslot",slot,10)
+ keys = source.r.cluster("getkeysinslot",slot,o[:pipeline])
break if keys.length == 0
begin
source.r.client.call(["migrate",target.info[:host],target.info[:port],"",0,@timeout,:keys,*keys])
@@ -908,6 +913,10 @@ class RedisTrib
end
def rebalance_cluster_cmd(argv,opt)
+ opt = {'pipeline' => MigrateDefaultPipeline}.merge(opt)
+
+ # Load nodes info before parsing options, otherwise we can't
+ # handle --weight.
load_cluster_info_from_node(argv[0])
# Options parsing
@@ -995,7 +1004,10 @@ class RedisTrib
else
reshard_table.each{|e|
move_slot(e[:source],dst,e[:slot],
- :quiet=>true,:dots=>false,:update=>true)
+ :quiet=>true,
+ :dots=>false,
+ :update=>true,
+ :pipeline=>opt['pipeline'])
print "#"
STDOUT.flush
}
@@ -1020,6 +1032,8 @@ class RedisTrib
end
def reshard_cluster_cmd(argv,opt)
+ opt = {'pipeline' => MigrateDefaultPipeline}.merge(opt)
+
load_cluster_info_from_node(argv[0])
check_cluster
if @errors.length != 0
@@ -1132,7 +1146,9 @@ class RedisTrib
exit(1) if (yesno != "yes")
end
reshard_table.each{|e|
- move_slot(e[:source],target,e[:slot],:dots=>true)
+ move_slot(e[:source],target,e[:slot],
+ :dots=>true,
+ :pipeline=>opt['pipeline'])
}
end
@@ -1531,8 +1547,8 @@ ALLOWED_OPTIONS={
"create" => {"replicas" => true},
"add-node" => {"slave" => false, "master-id" => true},
"import" => {"from" => :required, "copy" => false, "replace" => false},
- "reshard" => {"from" => true, "to" => true, "slots" => true, "yes" => false, "timeout" => MigrateDefaultTimeout},
- "rebalance" => {"weight" => [], "auto-weights" => false, "threshold" => RebalanceDefaultThreshold, "use-empty-masters" => false, "timeout" => MigrateDefaultTimeout, "simulate" => false},
+ "reshard" => {"from" => true, "to" => true, "slots" => true, "yes" => false, "timeout" => true, "pipeline" => true},
+ "rebalance" => {"weight" => [], "auto-weights" => false, "threshold" => RebalanceDefaultThreshold, "use-empty-masters" => false, "timeout" => true, "simulate" => false, "pipeline" => true},
"fix" => {"timeout" => MigrateDefaultTimeout},
}