summaryrefslogtreecommitdiff
path: root/src/redis-trib.rb
diff options
context:
space:
mode:
authorartix <artix2@gmail.com>2018-06-27 16:42:22 +0200
committerartix <artix2@gmail.com>2018-07-13 10:51:58 +0200
commitd222eda9f089f6605d755777c32132cb8cad76c9 (patch)
tree619dfce0c99a9c95b7d0855546c228b2e5a0fa1e /src/redis-trib.rb
parent513eb5728cbe364721f3258b6964566d28b18136 (diff)
downloadredis-d222eda9f089f6605d755777c32132cb8cad76c9.tar.gz
Redis-trib deprecated: it no longer works and it
outputs a warning to the user.
Diffstat (limited to 'src/redis-trib.rb')
-rwxr-xr-xsrc/redis-trib.rb1907
1 files changed, 103 insertions, 1804 deletions
diff --git a/src/redis-trib.rb b/src/redis-trib.rb
index 770fa68eb..b1af83069 100755
--- a/src/redis-trib.rb
+++ b/src/redis-trib.rb
@@ -1,1830 +1,129 @@
#!/usr/bin/env ruby
-# TODO (temporary here, we'll move this into the Github issues once
-# redis-trib initial implementation is completed).
-#
-# - Make sure that if the rehashing fails in the middle redis-trib will try
-# to recover.
-# - When redis-trib performs a cluster check, if it detects a slot move in
-# progress it should prompt the user to continue the move from where it
-# stopped.
-# - Gracefully handle Ctrl+C in move_slot to prompt the user if really stop
-# while rehashing, and performing the best cleanup possible if the user
-# forces the quit.
-# - When doing "fix" set a global Fix to true, and prompt the user to
-# fix the problem if automatically fixable every time there is something
-# to fix. For instance:
-# 1) If there is a node that pretend to receive a slot, or to migrate a
-# slot, but has no entries in that slot, fix it.
-# 2) If there is a node having keys in slots that are not owned by it
-# fix this condition moving the entries in the same node.
-# 3) Perform more possibly slow tests about the state of the cluster.
-# 4) When aborted slot migration is detected, fix it.
-
-require 'rubygems'
-require 'redis'
-
-ClusterHashSlots = 16384
-MigrateDefaultTimeout = 60000
-MigrateDefaultPipeline = 10
-RebalanceDefaultThreshold = 2
-
-$verbose = false
-
-def xputs(s)
- case s[0..2]
- when ">>>"
- color="29;1"
- when "[ER"
- color="31;1"
- when "[WA"
- color="31;1"
- when "[OK"
- color="32"
- when "[FA","***"
- color="33"
- else
- color=nil
- end
-
- color = nil if ENV['TERM'] != "xterm"
- print "\033[#{color}m" if color
- print s
- print "\033[0m" if color
- print "\n"
+def colorized(str, color)
+ return str if !(ENV['TERM'] || '')["xterm"]
+ color_code = {
+ white: 29,
+ bold: '29;1',
+ black: 30,
+ red: 31,
+ green: 32,
+ yellow: 33,
+ blue: 34,
+ magenta: 35,
+ cyan: 36,
+ gray: 37
+ }[color]
+ return str if !color_code
+ "\033[#{color_code}m#{str}\033[0m"
end
-class ClusterNode
- def initialize(addr)
- s = addr.split("@")[0].split(":")
- if s.length < 2
- puts "Invalid IP or Port (given as #{addr}) - use IP:Port format"
- exit 1
- end
- port = s.pop # removes port from split array
- ip = s.join(":") # if s.length > 1 here, it's IPv6, so restore address
- @r = nil
- @info = {}
- @info[:host] = ip
- @info[:port] = port
- @info[:slots] = {}
- @info[:migrating] = {}
- @info[:importing] = {}
- @info[:replicate] = false
- @dirty = false # True if we need to flush slots info into node.
- @friends = []
- end
-
- def friends
- @friends
- end
-
- def slots
- @info[:slots]
- end
-
- def has_flag?(flag)
- @info[:flags].index(flag)
- end
-
- def to_s
- "#{@info[:host]}:#{@info[:port]}"
- end
-
- def connect(o={})
- return if @r
- print "Connecting to node #{self}: " if $verbose
- STDOUT.flush
- begin
- @r = Redis.new(:host => @info[:host], :port => @info[:port], :timeout => 60)
- @r.ping
- rescue
- xputs "[ERR] Sorry, can't connect to node #{self}"
- exit 1 if o[:abort]
- @r = nil
- end
- xputs "OK" if $verbose
- end
-
- def assert_cluster
- info = @r.info
- if !info["cluster_enabled"] || info["cluster_enabled"].to_i == 0
- xputs "[ERR] Node #{self} is not configured as a cluster node."
- exit 1
- end
- end
-
- def assert_empty
- if !(@r.cluster("info").split("\r\n").index("cluster_known_nodes:1")) ||
- (@r.info['db0'])
- xputs "[ERR] Node #{self} is not empty. Either the node already knows other nodes (check with CLUSTER NODES) or contains some key in database 0."
- exit 1
- end
- end
-
- def load_info(o={})
- self.connect
- nodes = @r.cluster("nodes").split("\n")
- nodes.each{|n|
- # name addr flags role ping_sent ping_recv link_status slots
- split = n.split
- name,addr,flags,master_id,ping_sent,ping_recv,config_epoch,link_status = split[0..6]
- slots = split[8..-1]
- info = {
- :name => name,
- :addr => addr,
- :flags => flags.split(","),
- :replicate => master_id,
- :ping_sent => ping_sent.to_i,
- :ping_recv => ping_recv.to_i,
- :link_status => link_status
- }
- info[:replicate] = false if master_id == "-"
-
- if info[:flags].index("myself")
- @info = @info.merge(info)
- @info[:slots] = {}
- slots.each{|s|
- if s[0..0] == '['
- if s.index("->-") # Migrating
- slot,dst = s[1..-1].split("->-")
- @info[:migrating][slot.to_i] = dst
- elsif s.index("-<-") # Importing
- slot,src = s[1..-1].split("-<-")
- @info[:importing][slot.to_i] = src
- end
- elsif s.index("-")
- start,stop = s.split("-")
- self.add_slots((start.to_i)..(stop.to_i))
- else
- self.add_slots((s.to_i)..(s.to_i))
- end
- } if slots
- @dirty = false
- @r.cluster("info").split("\n").each{|e|
- k,v=e.split(":")
- k = k.to_sym
- v.chop!
- if k != :cluster_state
- @info[k] = v.to_i
- else
- @info[k] = v
- end
- }
- elsif o[:getfriends]
- @friends << info
- end
- }
- end
-
- def add_slots(slots)
- slots.each{|s|
- @info[:slots][s] = :new
- }
- @dirty = true
- end
-
- def set_as_replica(node_id)
- @info[:replicate] = node_id
- @dirty = true
- end
-
- def flush_node_config
- return if !@dirty
- if @info[:replicate]
- begin
- @r.cluster("replicate",@info[:replicate])
- rescue
- # If the cluster did not already joined it is possible that
- # the slave does not know the master node yet. So on errors
- # we return ASAP leaving the dirty flag set, to flush the
- # config later.
- return
- end
- else
- new = []
- @info[:slots].each{|s,val|
- if val == :new
- new << s
- @info[:slots][s] = true
- end
- }
- @r.cluster("addslots",*new)
- end
- @dirty = false
- end
-
- def info_string
- # We want to display the hash slots assigned to this node
- # as ranges, like in: "1-5,8-9,20-25,30"
- #
- # Note: this could be easily written without side effects,
- # we use 'slots' just to split the computation into steps.
-
- # First step: we want an increasing array of integers
- # for instance: [1,2,3,4,5,8,9,20,21,22,23,24,25,30]
- slots = @info[:slots].keys.sort
-
- # As we want to aggregate adjacent slots we convert all the
- # slot integers into ranges (with just one element)
- # So we have something like [1..1,2..2, ... and so forth.
- slots.map!{|x| x..x}
-
- # Finally we group ranges with adjacent elements.
- slots = slots.reduce([]) {|a,b|
- if !a.empty? && b.first == (a[-1].last)+1
- a[0..-2] + [(a[-1].first)..(b.last)]
- else
- a + [b]
- end
- }
-
- # Now our task is easy, we just convert ranges with just one
- # element into a number, and a real range into a start-end format.
- # Finally we join the array using the comma as separator.
- slots = slots.map{|x|
- x.count == 1 ? x.first.to_s : "#{x.first}-#{x.last}"
- }.join(",")
-
- role = self.has_flag?("master") ? "M" : "S"
+class String
- if self.info[:replicate] and @dirty
- is = "S: #{self.info[:name]} #{self.to_s}"
- else
- is = "#{role}: #{self.info[:name]} #{self.to_s}\n"+
- " slots:#{slots} (#{self.slots.length} slots) "+
- "#{(self.info[:flags]-["myself"]).join(",")}"
- end
- if self.info[:replicate]
- is += "\n replicates #{info[:replicate]}"
- elsif self.has_flag?("master") && self.info[:replicas]
- is += "\n #{info[:replicas].length} additional replica(s)"
- end
- is
- end
-
- # Return a single string representing nodes and associated slots.
- # TODO: remove slaves from config when slaves will be handled
- # by Redis Cluster.
- def get_config_signature
- config = []
- @r.cluster("nodes").each_line{|l|
- s = l.split
- slots = s[8..-1].select {|x| x[0..0] != "["}
- next if slots.length == 0
- config << s[0]+":"+(slots.sort.join(","))
+ %w(white bold black red green yellow blue magenta cyan gray).each{|color|
+ color = :"#{color}"
+ define_method(color){
+ colorized(self, color)
}
- config.sort.join("|")
- end
-
- def info
- @info
- end
-
- def is_dirty?
- @dirty
- end
+ }
- def r
- @r
- end
end
-class RedisTrib
- def initialize
- @nodes = []
- @fix = false
- @errors = []
- @timeout = MigrateDefaultTimeout
- end
-
- def check_arity(req_args, num_args)
- if ((req_args > 0 and num_args != req_args) ||
- (req_args < 0 and num_args < req_args.abs))
- xputs "[ERR] Wrong number of arguments for specified sub command"
- exit 1
- end
- end
-
- def add_node(node)
- @nodes << node
- end
-
- def reset_nodes
- @nodes = []
- end
-
- def cluster_error(msg)
- @errors << msg
- xputs msg
- end
-
- # Return the node with the specified ID or Nil.
- def get_node_by_name(name)
- @nodes.each{|n|
- return n if n.info[:name] == name.downcase
- }
- return nil
- end
-
- # Like get_node_by_name but the specified name can be just the first
- # part of the node ID as long as the prefix in unique across the
- # cluster.
- def get_node_by_abbreviated_name(name)
- l = name.length
- candidates = []
- @nodes.each{|n|
- if n.info[:name][0...l] == name.downcase
- candidates << n
- end
- }
- return nil if candidates.length != 1
- candidates[0]
- end
-
- # This function returns the master that has the least number of replicas
- # in the cluster. If there are multiple masters with the same smaller
- # number of replicas, one at random is returned.
- def get_master_with_least_replicas
- masters = @nodes.select{|n| n.has_flag? "master"}
- sorted = masters.sort{|a,b|
- a.info[:replicas].length <=> b.info[:replicas].length
- }
- sorted[0]
- end
-
- def check_cluster(opt={})
- xputs ">>> Performing Cluster Check (using node #{@nodes[0]})"
- show_nodes if !opt[:quiet]
- check_config_consistency
- check_open_slots
- check_slots_coverage
- end
-
- def show_cluster_info
- masters = 0
- keys = 0
- @nodes.each{|n|
- if n.has_flag?("master")
- puts "#{n} (#{n.info[:name][0...8]}...) -> #{n.r.dbsize} keys | #{n.slots.length} slots | "+
- "#{n.info[:replicas].length} slaves."
- masters += 1
- keys += n.r.dbsize
- end
- }
- xputs "[OK] #{keys} keys in #{masters} masters."
- keys_per_slot = sprintf("%.2f",keys/16384.0)
- puts "#{keys_per_slot} keys per slot on average."
- end
-
- # Merge slots of every known node. If the resulting slots are equal
- # to ClusterHashSlots, then all slots are served.
- def covered_slots
- slots = {}
- @nodes.each{|n|
- slots = slots.merge(n.slots)
- }
- slots
- end
-
- def check_slots_coverage
- xputs ">>> Check slots coverage..."
- slots = covered_slots
- if slots.length == ClusterHashSlots
- xputs "[OK] All #{ClusterHashSlots} slots covered."
- else
- cluster_error \
- "[ERR] Not all #{ClusterHashSlots} slots are covered by nodes."
- fix_slots_coverage if @fix
- end
- end
-
- def check_open_slots
- xputs ">>> Check for open slots..."
- open_slots = []
- @nodes.each{|n|
- if n.info[:migrating].size > 0
- cluster_error \
- "[WARNING] Node #{n} has slots in migrating state (#{n.info[:migrating].keys.join(",")})."
- open_slots += n.info[:migrating].keys
- end
- if n.info[:importing].size > 0
- cluster_error \
- "[WARNING] Node #{n} has slots in importing state (#{n.info[:importing].keys.join(",")})."
- open_slots += n.info[:importing].keys
- end
- }
- open_slots.uniq!
- if open_slots.length > 0
- xputs "[WARNING] The following slots are open: #{open_slots.join(",")}"
- end
- if @fix
- open_slots.each{|slot| fix_open_slot slot}
- end
- end
-
- def nodes_with_keys_in_slot(slot)
- nodes = []
- @nodes.each{|n|
- next if n.has_flag?("slave")
- nodes << n if n.r.cluster("getkeysinslot",slot,1).length > 0
- }
- nodes
- end
-
- def fix_slots_coverage
- not_covered = (0...ClusterHashSlots).to_a - covered_slots.keys
- xputs ">>> Fixing slots coverage..."
- xputs "List of not covered slots: " + not_covered.join(",")
-
- # For every slot, take action depending on the actual condition:
- # 1) No node has keys for this slot.
- # 2) A single node has keys for this slot.
- # 3) Multiple nodes have keys for this slot.
- slots = {}
- not_covered.each{|slot|
- nodes = nodes_with_keys_in_slot(slot)
- slots[slot] = nodes
- xputs "Slot #{slot} has keys in #{nodes.length} nodes: #{nodes.join(", ")}"
- }
-
- none = slots.select {|k,v| v.length == 0}
- single = slots.select {|k,v| v.length == 1}
- multi = slots.select {|k,v| v.length > 1}
-
- # Handle case "1": keys in no node.
- if none.length > 0
- xputs "The following uncovered slots have no keys across the cluster:"
- xputs none.keys.join(",")
- yes_or_die "Fix these slots by covering with a random node?"
- none.each{|slot,nodes|
- node = @nodes.sample
- xputs ">>> Covering slot #{slot} with #{node}"
- node.r.cluster("addslots",slot)
- }
- end
-
- # Handle case "2": keys only in one node.
- if single.length > 0
- xputs "The following uncovered slots have keys in just one node:"
- puts single.keys.join(",")
- yes_or_die "Fix these slots by covering with those nodes?"
- single.each{|slot,nodes|
- xputs ">>> Covering slot #{slot} with #{nodes[0]}"
- nodes[0].r.cluster("addslots",slot)
- }
- end
-
- # Handle case "3": keys in multiple nodes.
- if multi.length > 0
- xputs "The following uncovered slots have keys in multiple nodes:"
- xputs multi.keys.join(",")
- yes_or_die "Fix these slots by moving keys into a single node?"
- multi.each{|slot,nodes|
- target = get_node_with_most_keys_in_slot(nodes,slot)
- xputs ">>> Covering slot #{slot} moving keys to #{target}"
-
- target.r.cluster('addslots',slot)
- target.r.cluster('setslot',slot,'stable')
- nodes.each{|src|
- next if src == target
- # Set the source node in 'importing' state (even if we will
- # actually migrate keys away) in order to avoid receiving
- # redirections for MIGRATE.
- src.r.cluster('setslot',slot,'importing',target.info[:name])
- move_slot(src,target,slot,:dots=>true,:fix=>true,:cold=>true)
- src.r.cluster('setslot',slot,'stable')
- }
- }
- end
- end
-
- # Return the owner of the specified slot
- def get_slot_owners(slot)
- owners = []
- @nodes.each{|n|
- next if n.has_flag?("slave")
- n.slots.each{|s,_|
- owners << n if s == slot
- }
- }
- owners
- end
-
- # Return the node, among 'nodes' with the greatest number of keys
- # in the specified slot.
- def get_node_with_most_keys_in_slot(nodes,slot)
- best = nil
- best_numkeys = 0
- @nodes.each{|n|
- next if n.has_flag?("slave")
- numkeys = n.r.cluster("countkeysinslot",slot)
- if numkeys > best_numkeys || best == nil
- best = n
- best_numkeys = numkeys
- end
- }
- return best
- end
-
- # Slot 'slot' was found to be in importing or migrating state in one or
- # more nodes. This function fixes this condition by migrating keys where
- # it seems more sensible.
- def fix_open_slot(slot)
- puts ">>> Fixing open slot #{slot}"
-
- # Try to obtain the current slot owner, according to the current
- # nodes configuration.
- owners = get_slot_owners(slot)
- owner = owners[0] if owners.length == 1
-
- migrating = []
- importing = []
- @nodes.each{|n|
- next if n.has_flag? "slave"
- if n.info[:migrating][slot]
- migrating << n
- elsif n.info[:importing][slot]
- importing << n
- elsif n.r.cluster("countkeysinslot",slot) > 0 && n != owner
- xputs "*** Found keys about slot #{slot} in node #{n}!"
- importing << n
- end
- }
- puts "Set as migrating in: #{migrating.join(",")}"
- puts "Set as importing in: #{importing.join(",")}"
-
- # If there is no slot owner, set as owner the slot with the biggest
- # number of keys, among the set of migrating / importing nodes.
- if !owner
- xputs ">>> Nobody claims ownership, selecting an owner..."
- owner = get_node_with_most_keys_in_slot(@nodes,slot)
-
- # If we still don't have an owner, we can't fix it.
- if !owner
- xputs "[ERR] Can't select a slot owner. Impossible to fix."
- exit 1
- end
-
- # Use ADDSLOTS to assign the slot.
- puts "*** Configuring #{owner} as the slot owner"
- owner.r.cluster("setslot",slot,"stable")
- owner.r.cluster("addslots",slot)
- # Make sure this information will propagate. Not strictly needed
- # since there is no past owner, so all the other nodes will accept
- # whatever epoch this node will claim the slot with.
- owner.r.cluster("bumpepoch")
-
- # Remove the owner from the list of migrating/importing
- # nodes.
- migrating.delete(owner)
- importing.delete(owner)
- end
-
- # If there are multiple owners of the slot, we need to fix it
- # so that a single node is the owner and all the other nodes
- # are in importing state. Later the fix can be handled by one
- # of the base cases above.
- #
- # Note that this case also covers multiple nodes having the slot
- # in migrating state, since migrating is a valid state only for
- # slot owners.
- if owners.length > 1
- owner = get_node_with_most_keys_in_slot(owners,slot)
- owners.each{|n|
- next if n == owner
- n.r.cluster('delslots',slot)
- n.r.cluster('setslot',slot,'importing',owner.info[:name])
- importing.delete(n) # Avoid duplciates
- importing << n
- }
- owner.r.cluster('bumpepoch')
- end
-
- # Case 1: The slot is in migrating state in one slot, and in
- # importing state in 1 slot. That's trivial to address.
- if migrating.length == 1 && importing.length == 1
- move_slot(migrating[0],importing[0],slot,:dots=>true,:fix=>true)
- # Case 2: There are multiple nodes that claim the slot as importing,
- # they probably got keys about the slot after a restart so opened
- # the slot. In this case we just move all the keys to the owner
- # according to the configuration.
- elsif migrating.length == 0 && importing.length > 0
- xputs ">>> Moving all the #{slot} slot keys to its owner #{owner}"
- importing.each {|node|
- next if node == owner
- move_slot(node,owner,slot,:dots=>true,:fix=>true,:cold=>true)
- xputs ">>> Setting #{slot} as STABLE in #{node}"
- node.r.cluster("setslot",slot,"stable")
- }
- # Case 3: There are no slots claiming to be in importing state, but
- # there is a migrating node that actually don't have any key. We
- # can just close the slot, probably a reshard interrupted in the middle.
- elsif importing.length == 0 && migrating.length == 1 &&
- migrating[0].r.cluster("getkeysinslot",slot,10).length == 0
- migrating[0].r.cluster("setslot",slot,"stable")
- else
- xputs "[ERR] Sorry, Redis-trib can't fix this slot yet (work in progress). Slot is set as migrating in #{migrating.join(",")}, as importing in #{importing.join(",")}, owner is #{owner}"
- end
- end
-
- # Check if all the nodes agree about the cluster configuration
- def check_config_consistency
- if !is_config_consistent?
- cluster_error "[ERR] Nodes don't agree about configuration!"
- else
- xputs "[OK] All nodes agree about slots configuration."
- end
- end
-
- def is_config_consistent?
- signatures=[]
- @nodes.each{|n|
- signatures << n.get_config_signature
- }
- return signatures.uniq.length == 1
- end
-
- def wait_cluster_join
- print "Waiting for the cluster to join"
- while !is_config_consistent?
- print "."
- STDOUT.flush
- sleep 1
- end
- print "\n"
- end
-
- def alloc_slots
- nodes_count = @nodes.length
- masters_count = @nodes.length / (@replicas+1)
- masters = []
-
- # The first step is to split instances by IP. This is useful as
- # we'll try to allocate master nodes in different physical machines
- # (as much as possible) and to allocate slaves of a given master in
- # different physical machines as well.
- #
- # This code assumes just that if the IP is different, than it is more
- # likely that the instance is running in a different physical host
- # or at least a different virtual machine.
- ips = {}
- @nodes.each{|n|
- ips[n.info[:host]] = [] if !ips[n.info[:host]]
- ips[n.info[:host]] << n
- }
-
- # Select master instances
- puts "Using #{masters_count} masters:"
- interleaved = []
- stop = false
- while not stop do
- # Take one node from each IP until we run out of nodes
- # across every IP.
- ips.each do |ip,nodes|
- if nodes.empty?
- # if this IP has no remaining nodes, check for termination
- if interleaved.length == nodes_count
- # stop when 'interleaved' has accumulated all nodes
- stop = true
- next
- end
- else
- # else, move one node from this IP to 'interleaved'
- interleaved.push nodes.shift
- end
- end
- end
-
- masters = interleaved.slice!(0, masters_count)
- nodes_count -= masters.length
-
- masters.each{|m| puts m}
-
- # Rotating the list sometimes helps to get better initial
- # anti-affinity before the optimizer runs.
- interleaved.push interleaved.shift
-
- # Alloc slots on masters. After interleaving to get just the first N
- # should be optimal. With slaves is more complex, see later...
- slots_per_node = ClusterHashSlots.to_f / masters_count
- first = 0
- cursor = 0.0
- masters.each_with_index{|n,masternum|
- last = (cursor+slots_per_node-1).round
- if last > ClusterHashSlots || masternum == masters.length-1
- last = ClusterHashSlots-1
- end
- last = first if last < first # Min step is 1.
- n.add_slots first..last
- first = last+1
- cursor += slots_per_node
- }
-
- # Select N replicas for every master.
- # We try to split the replicas among all the IPs with spare nodes
- # trying to avoid the host where the master is running, if possible.
- #
- # Note we loop two times. The first loop assigns the requested
- # number of replicas to each master. The second loop assigns any
- # remaining instances as extra replicas to masters. Some masters
- # may end up with more than their requested number of replicas, but
- # all nodes will be used.
- assignment_verbose = false
-
- [:requested,:unused].each do |assign|
- masters.each do |m|
- assigned_replicas = 0
- while assigned_replicas < @replicas
- break if nodes_count == 0
- if assignment_verbose
- if assign == :requested
- puts "Requesting total of #{@replicas} replicas " \
- "(#{assigned_replicas} replicas assigned " \
- "so far with #{nodes_count} total remaining)."
- elsif assign == :unused
- puts "Assigning extra instance to replication " \
- "role too (#{nodes_count} remaining)."
- end
- end
-
- # Return the first node not matching our current master
- node = interleaved.find{|n| n.info[:host] != m.info[:host]}
-
- # If we found a node, use it as a best-first match.
- # Otherwise, we didn't find a node on a different IP, so we
- # go ahead and use a same-IP replica.
- if node
- slave = node
- interleaved.delete node
- else
- slave = interleaved.shift
- end
- slave.set_as_replica(m.info[:name])
- nodes_count -= 1
- assigned_replicas += 1
- puts "Adding replica #{slave} to #{m}"
-
- # If we are in the "assign extra nodes" loop,
- # we want to assign one extra replica to each
- # master before repeating masters.
- # This break lets us assign extra replicas to masters
- # in a round-robin way.
- break if assign == :unused
- end
- end
- end
-
- optimize_anti_affinity
- end
-
- def optimize_anti_affinity
- score,aux = get_anti_affinity_score
- return if score == 0
-
- xputs ">>> Trying to optimize slaves allocation for anti-affinity"
-
- maxiter = 500*@nodes.length # Effort is proportional to cluster size...
- while maxiter > 0
- score,offenders = get_anti_affinity_score
- break if score == 0 # Optimal anti affinity reached
-
- # We'll try to randomly swap a slave's assigned master causing
- # an affinity problem with another random slave, to see if we
- # can improve the affinity.
- first = offenders.shuffle.first
- nodes = @nodes.select{|n| n != first && n.info[:replicate]}
- break if nodes.length == 0
- second = nodes.shuffle.first
-
- first_master = first.info[:replicate]
- second_master = second.info[:replicate]
- first.set_as_replica(second_master)
- second.set_as_replica(first_master)
-
- new_score,aux = get_anti_affinity_score
- # If the change actually makes thing worse, revert. Otherwise
- # leave as it is because the best solution may need a few
- # combined swaps.
- if new_score > score
- first.set_as_replica(first_master)
- second.set_as_replica(second_master)
- end
-
- maxiter -= 1
- end
-
- score,aux = get_anti_affinity_score
- if score == 0
- xputs "[OK] Perfect anti-affinity obtained!"
- elsif score >= 10000
- puts "[WARNING] Some slaves are in the same host as their master"
- else
- puts "[WARNING] Some slaves of the same master are in the same host"
- end
- end
-
- # Return the anti-affinity score, which is a measure of the amount of
- # violations of anti-affinity in the current cluster layout, that is, how
- # badly the masters and slaves are distributed in the different IP
- # addresses so that slaves of the same master are not in the master
- # host and are also in different hosts.
- #
- # The score is calculated as follows:
- #
- # SAME_AS_MASTER = 10000 * each slave in the same IP of its master.
- # SAME_AS_SLAVE = 1 * each slave having the same IP as another slave
- # of the same master.
- # FINAL_SCORE = SAME_AS_MASTER + SAME_AS_SLAVE
- #
- # So a greater score means a worse anti-affinity level, while zero
- # means perfect anti-affinity.
- #
- # The anti affinity optimizator will try to get a score as low as
- # possible. Since we do not want to sacrifice the fact that slaves should
- # not be in the same host as the master, we assign 10000 times the score
- # to this violation, so that we'll optimize for the second factor only
- # if it does not impact the first one.
- #
- # The function returns two things: the above score, and the list of
- # offending slaves, so that the optimizer can try changing the
- # configuration of the slaves violating the anti-affinity goals.
- def get_anti_affinity_score
- score = 0
- offending = [] # List of offending slaves to return to the caller
-
- # First, split nodes by host
- host_to_node = {}
- @nodes.each{|n|
- host = n.info[:host]
- host_to_node[host] = [] if host_to_node[host] == nil
- host_to_node[host] << n
- }
-
- # Then, for each set of nodes in the same host, split by
- # related nodes (masters and slaves which are involved in
- # replication of each other)
- host_to_node.each{|host,nodes|
- related = {}
- nodes.each{|n|
- if !n.info[:replicate]
- name = n.info[:name]
- related[name] = [] if related[name] == nil
- related[name] << :m
- else
- name = n.info[:replicate]
- related[name] = [] if related[name] == nil
- related[name] << :s
- end
- }
-
- # Now it's trivial to check, for each related group having the
- # same host, what is their local score.
- related.each{|id,types|
- next if types.length < 2
- types.sort! # Make sure :m if the first if any
- if types[0] == :m
- score += 10000 * (types.length-1)
- else
- score += 1 * types.length
- end
-
- # Populate the list of offending nodes
- @nodes.each{|n|
- if n.info[:replicate] == id &&
- n.info[:host] == host
- offending << n
- end
- }
- }
- }
- return score,offending
- end
-
- def flush_nodes_config
- @nodes.each{|n|
- n.flush_node_config
- }
- end
-
- def show_nodes
- @nodes.each{|n|
- xputs n.info_string
- }
- end
-
- # Redis Cluster config epoch collision resolution code is able to eventually
- # set a different epoch to each node after a new cluster is created, but
- # it is slow compared to assign a progressive config epoch to each node
- # before joining the cluster. However we do just a best-effort try here
- # since if we fail is not a problem.
- def assign_config_epoch
- config_epoch = 1
- @nodes.each{|n|
- begin
- n.r.cluster("set-config-epoch",config_epoch)
- rescue
- end
- config_epoch += 1
- }
- end
-
- def join_cluster
- # We use a brute force approach to make sure the node will meet
- # each other, that is, sending CLUSTER MEET messages to all the nodes
- # about the very same node.
- # Thanks to gossip this information should propagate across all the
- # cluster in a matter of seconds.
- first = false
- @nodes.each{|n|
- if !first then first = n.info; next; end # Skip the first node
- n.r.cluster("meet",first[:host],first[:port])
- }
- end
-
- def yes_or_die(msg)
- print "#{msg} (type 'yes' to accept): "
- STDOUT.flush
- if !(STDIN.gets.chomp.downcase == "yes")
- xputs "*** Aborting..."
- exit 1
- end
- end
-
- def load_cluster_info_from_node(nodeaddr)
- node = ClusterNode.new(nodeaddr)
- node.connect(:abort => true)
- node.assert_cluster
- node.load_info(:getfriends => true)
- add_node(node)
- node.friends.each{|f|
- next if f[:flags].index("noaddr") ||
- f[:flags].index("disconnected") ||
- f[:flags].index("fail")
- fnode = ClusterNode.new(f[:addr])
- fnode.connect()
- next if !fnode.r
- begin
- fnode.load_info()
- add_node(fnode)
- rescue => e
- xputs "[ERR] Unable to load info for node #{fnode}"
- end
- }
- populate_nodes_replicas_info
- end
-
- # This function is called by load_cluster_info_from_node in order to
- # add additional information to every node as a list of replicas.
- def populate_nodes_replicas_info
- # Start adding the new field to every node.
- @nodes.each{|n|
- n.info[:replicas] = []
- }
-
- # Populate the replicas field using the replicate field of slave
- # nodes.
- @nodes.each{|n|
- if n.info[:replicate]
- master = get_node_by_name(n.info[:replicate])
- if !master
- xputs "*** WARNING: #{n} claims to be slave of unknown node ID #{n.info[:replicate]}."
- else
- master.info[:replicas] << n
- end
- end
- }
- end
-
- # Given a list of source nodes return a "resharding plan"
- # with what slots to move in order to move "numslots" slots to another
- # instance.
- def compute_reshard_table(sources,numslots)
- moved = []
- # Sort from bigger to smaller instance, for two reasons:
- # 1) If we take less slots than instances it is better to start
- # getting from the biggest instances.
- # 2) We take one slot more from the first instance in the case of not
- # perfect divisibility. Like we have 3 nodes and need to get 10
- # slots, we take 4 from the first, and 3 from the rest. So the
- # biggest is always the first.
- sources = sources.sort{|a,b| b.slots.length <=> a.slots.length}
- source_tot_slots = sources.inject(0) {|sum,source|
- sum+source.slots.length
- }
- sources.each_with_index{|s,i|
- # Every node will provide a number of slots proportional to the
- # slots it has assigned.
- n = (numslots.to_f/source_tot_slots*s.slots.length)
- if i == 0
- n = n.ceil
- else
- n = n.floor
- end
- s.slots.keys.sort[(0...n)].each{|slot|
- if moved.length < numslots
- moved << {:source => s, :slot => slot}
- end
- }
- }
- return moved
- end
-
- def show_reshard_table(table)
- table.each{|e|
- puts " Moving slot #{e[:slot]} from #{e[:source].info[:name]}"
- }
- end
-
- # Move slots between source and target nodes using MIGRATE.
- #
- # Options:
- # :verbose -- Print a dot for every moved key.
- # :fix -- We are moving in the context of a fix. Use REPLACE.
- # :cold -- Move keys without opening slots / reconfiguring the nodes.
- # :update -- Update nodes.info[:slots] for source/target nodes.
- # :quiet -- Don't print info messages.
- def move_slot(source,target,slot,o={})
- o = {:pipeline => MigrateDefaultPipeline}.merge(o)
-
- # We start marking the slot as importing in the destination node,
- # and the slot as migrating in the target host. Note that the order of
- # the operations is important, as otherwise a client may be redirected
- # to the target node that does not yet know it is importing this slot.
- if !o[:quiet]
- print "Moving slot #{slot} from #{source} to #{target}: "
- STDOUT.flush
- end
-
- if !o[:cold]
- target.r.cluster("setslot",slot,"importing",source.info[:name])
- source.r.cluster("setslot",slot,"migrating",target.info[:name])
- end
- # Migrate all the keys from source to target using the MIGRATE command
- while true
- keys = source.r.cluster("getkeysinslot",slot,o[:pipeline])
- break if keys.length == 0
- begin
- source.r.client.call(["migrate",target.info[:host],target.info[:port],"",0,@timeout,:keys,*keys])
- rescue => e
- if o[:fix] && e.to_s =~ /BUSYKEY/
- xputs "*** Target key exists. Replacing it for FIX."
- source.r.client.call(["migrate",target.info[:host],target.info[:port],"",0,@timeout,:replace,:keys,*keys])
- else
- puts ""
- xputs "[ERR] Calling MIGRATE: #{e}"
- exit 1
- end
- end
- print "."*keys.length if o[:dots]
- STDOUT.flush
- end
-
- puts if !o[:quiet]
- # Set the new node as the owner of the slot in all the known nodes.
- if !o[:cold]
- @nodes.each{|n|
- next if n.has_flag?("slave")
- n.r.cluster("setslot",slot,"node",target.info[:name])
- }
- end
-
- # Update the node logical config
- if o[:update] then
- source.info[:slots].delete(slot)
- target.info[:slots][slot] = true
- end
- end
-
- # redis-trib subcommands implementations.
-
- def check_cluster_cmd(argv,opt)
- load_cluster_info_from_node(argv[0])
- check_cluster
- end
-
- def info_cluster_cmd(argv,opt)
- load_cluster_info_from_node(argv[0])
- show_cluster_info
- end
-
- def rebalance_cluster_cmd(argv,opt)
- opt = {
- 'pipeline' => MigrateDefaultPipeline,
- 'threshold' => RebalanceDefaultThreshold
- }.merge(opt)
-
- # Load nodes info before parsing options, otherwise we can't
- # handle --weight.
- load_cluster_info_from_node(argv[0])
-
- # Options parsing
- threshold = opt['threshold'].to_i
- autoweights = opt['auto-weights']
- weights = {}
- opt['weight'].each{|w|
- fields = w.split("=")
- node = get_node_by_abbreviated_name(fields[0])
- if !node || !node.has_flag?("master")
- puts "*** No such master node #{fields[0]}"
- exit 1
- end
- weights[node.info[:name]] = fields[1].to_f
- } if opt['weight']
- useempty = opt['use-empty-masters']
-
- # Assign a weight to each node, and compute the total cluster weight.
- total_weight = 0
- nodes_involved = 0
- @nodes.each{|n|
- if n.has_flag?("master")
- next if !useempty && n.slots.length == 0
- n.info[:w] = weights[n.info[:name]] ? weights[n.info[:name]] : 1
- total_weight += n.info[:w]
- nodes_involved += 1
- end
- }
-
- # Check cluster, only proceed if it looks sane.
- check_cluster(:quiet => true)
- if @errors.length != 0
- puts "*** Please fix your cluster problems before rebalancing"
- exit 1
- end
-
- # Calculate the slots balance for each node. It's the number of
- # slots the node should lose (if positive) or gain (if negative)
- # in order to be balanced.
- threshold = opt['threshold'].to_f
- threshold_reached = false
- @nodes.each{|n|
- if n.has_flag?("master")
- next if !n.info[:w]
- expected = ((ClusterHashSlots.to_f / total_weight) *
- n.info[:w]).to_i
- n.info[:balance] = n.slots.length - expected
- # Compute the percentage of difference between the
- # expected number of slots and the real one, to see
- # if it's over the threshold specified by the user.
- over_threshold = false
- if threshold > 0
- if n.slots.length > 0
- err_perc = (100-(100.0*expected/n.slots.length)).abs
- over_threshold = true if err_perc > threshold
- elsif expected > 0
- over_threshold = true
- end
- end
- threshold_reached = true if over_threshold
- end
- }
- if !threshold_reached
- xputs "*** No rebalancing needed! All nodes are within the #{threshold}% threshold."
- return
- end
-
- # Only consider nodes we want to change
- sn = @nodes.select{|n|
- n.has_flag?("master") && n.info[:w]
- }
-
- # Because of rounding, it is possible that the balance of all nodes
- # summed does not give 0. Make sure that nodes that have to provide
- # slots are always matched by nodes receiving slots.
- total_balance = sn.map{|x| x.info[:balance]}.reduce{|a,b| a+b}
- while total_balance > 0
- sn.each{|n|
- if n.info[:balance] < 0 && total_balance > 0
- n.info[:balance] -= 1
- total_balance -= 1
- end
- }
- end
-
- # Sort nodes by their slots balance.
- sn = sn.sort{|a,b|
- a.info[:balance] <=> b.info[:balance]
- }
-
- xputs ">>> Rebalancing across #{nodes_involved} nodes. Total weight = #{total_weight}"
-
- if $verbose
- sn.each{|n|
- puts "#{n} balance is #{n.info[:balance]} slots"
- }
- end
-
- # Now we have at the start of the 'sn' array nodes that should get
- # slots, at the end nodes that must give slots.
- # We take two indexes, one at the start, and one at the end,
- # incrementing or decrementing the indexes accordingly til we
- # find nodes that need to get/provide slots.
- dst_idx = 0
- src_idx = sn.length - 1
-
- while dst_idx < src_idx
- dst = sn[dst_idx]
- src = sn[src_idx]
- numslots = [dst.info[:balance],src.info[:balance]].map{|n|
- n.abs
- }.min
-
- if numslots > 0
- puts "Moving #{numslots} slots from #{src} to #{dst}"
-
- # Actually move the slots.
- reshard_table = compute_reshard_table([src],numslots)
- if reshard_table.length != numslots
- xputs "*** Assertio failed: Reshard table != number of slots"
- exit 1
- end
- if opt['simulate']
- print "#"*reshard_table.length
- else
- reshard_table.each{|e|
- move_slot(e[:source],dst,e[:slot],
- :quiet=>true,
- :dots=>false,
- :update=>true,
- :pipeline=>opt['pipeline'])
- print "#"
- STDOUT.flush
- }
- end
- puts
- end
-
- # Update nodes balance.
- dst.info[:balance] += numslots
- src.info[:balance] -= numslots
- dst_idx += 1 if dst.info[:balance] == 0
- src_idx -= 1 if src.info[:balance] == 0
- end
- end
-
- def fix_cluster_cmd(argv,opt)
- @fix = true
- @timeout = opt['timeout'].to_i if opt['timeout']
-
- load_cluster_info_from_node(argv[0])
- check_cluster
- end
-
- def reshard_cluster_cmd(argv,opt)
- opt = {'pipeline' => MigrateDefaultPipeline}.merge(opt)
-
- load_cluster_info_from_node(argv[0])
- check_cluster
- if @errors.length != 0
- puts "*** Please fix your cluster problems before resharding"
- exit 1
- end
-
- @timeout = opt['timeout'].to_i if opt['timeout'].to_i
-
- # Get number of slots
- if opt['slots']
- numslots = opt['slots'].to_i
- else
- numslots = 0
- while numslots <= 0 or numslots > ClusterHashSlots
- print "How many slots do you want to move (from 1 to #{ClusterHashSlots})? "
- numslots = STDIN.gets.to_i
- end
- end
-
- # Get the target instance
- if opt['to']
- target = get_node_by_name(opt['to'])
- if !target || target.has_flag?("slave")
- xputs "*** The specified node is not known or not a master, please retry."
- exit 1
- end
- else
- target = nil
- while not target
- print "What is the receiving node ID? "
- target = get_node_by_name(STDIN.gets.chop)
- if !target || target.has_flag?("slave")
- xputs "*** The specified node is not known or not a master, please retry."
- target = nil
- end
- end
- end
-
- # Get the source instances
- sources = []
- if opt['from']
- opt['from'].split(',').each{|node_id|
- if node_id == "all"
- sources = "all"
- break
- end
- src = get_node_by_name(node_id)
- if !src || src.has_flag?("slave")
- xputs "*** The specified node is not known or is not a master, please retry."
- exit 1
- end
- sources << src
- }
- else
- xputs "Please enter all the source node IDs."
- xputs " Type 'all' to use all the nodes as source nodes for the hash slots."
- xputs " Type 'done' once you entered all the source nodes IDs."
- while true
- print "Source node ##{sources.length+1}:"
- line = STDIN.gets.chop
- src = get_node_by_name(line)
- if line == "done"
- break
- elsif line == "all"
- sources = "all"
- break
- elsif !src || src.has_flag?("slave")
- xputs "*** The specified node is not known or is not a master, please retry."
- elsif src.info[:name] == target.info[:name]
- xputs "*** It is not possible to use the target node as source node."
- else
- sources << src
- end
- end
- end
+COMMANDS = %w(create check info fix reshard rebalance add-node
+ del-node set-timeout call import help)
- if sources.length == 0
- puts "*** No source nodes given, operation aborted"
- exit 1
- end
-
- # Handle soures == all.
- if sources == "all"
- sources = []
- @nodes.each{|n|
- next if n.info[:name] == target.info[:name]
- next if n.has_flag?("slave")
- sources << n
- }
- end
-
- # Check if the destination node is the same of any source nodes.
- if sources.index(target)
- xputs "*** Target node is also listed among the source nodes!"
- exit 1
- end
-
- puts "\nReady to move #{numslots} slots."
- puts " Source nodes:"
- sources.each{|s| puts " "+s.info_string}
- puts " Destination node:"
- puts " #{target.info_string}"
- reshard_table = compute_reshard_table(sources,numslots)
- puts " Resharding plan:"
- show_reshard_table(reshard_table)
- if !opt['yes']
- print "Do you want to proceed with the proposed reshard plan (yes/no)? "
- yesno = STDIN.gets.chop
- exit(1) if (yesno != "yes")
- end
- reshard_table.each{|e|
- move_slot(e[:source],target,e[:slot],
- :dots=>true,
- :pipeline=>opt['pipeline'])
- }
- end
-
- # This is an helper function for create_cluster_cmd that verifies if
- # the number of nodes and the specified replicas have a valid configuration
- # where there are at least three master nodes and enough replicas per node.
- def check_create_parameters
- masters = @nodes.length/(@replicas+1)
- if masters < 3
- puts "*** ERROR: Invalid configuration for cluster creation."
- puts "*** Redis Cluster requires at least 3 master nodes."
- puts "*** This is not possible with #{@nodes.length} nodes and #{@replicas} replicas per node."
- puts "*** At least #{3*(@replicas+1)} nodes are required."
- exit 1
- end
- end
-
- def create_cluster_cmd(argv,opt)
- opt = {'replicas' => 0}.merge(opt)
- @replicas = opt['replicas'].to_i
-
- xputs ">>> Creating cluster"
- argv[0..-1].each{|n|
- node = ClusterNode.new(n)
- node.connect(:abort => true)
- node.assert_cluster
- node.load_info
- node.assert_empty
- add_node(node)
- }
- check_create_parameters
- xputs ">>> Performing hash slots allocation on #{@nodes.length} nodes..."
- alloc_slots
- show_nodes
- yes_or_die "Can I set the above configuration?"
- flush_nodes_config
- xputs ">>> Nodes configuration updated"
- xputs ">>> Assign a different config epoch to each node"
- assign_config_epoch
- xputs ">>> Sending CLUSTER MEET messages to join the cluster"
- join_cluster
- # Give one second for the join to start, in order to avoid that
- # wait_cluster_join will find all the nodes agree about the config as
- # they are still empty with unassigned slots.
- sleep 1
- wait_cluster_join
- flush_nodes_config # Useful for the replicas
- # Reset the node information, so that when the
- # final summary is listed in check_cluster about the newly created cluster
- # all the nodes would get properly listed as slaves or masters
- reset_nodes
- load_cluster_info_from_node(argv[0])
- check_cluster
- end
-
- def addnode_cluster_cmd(argv,opt)
- xputs ">>> Adding node #{argv[0]} to cluster #{argv[1]}"
-
- # Check the existing cluster
- load_cluster_info_from_node(argv[1])
- check_cluster
+ALLOWED_OPTIONS={
+ "create" => {"replicas" => true},
+ "add-node" => {"slave" => false, "master-id" => true},
+ "import" => {"from" => :required, "copy" => false, "replace" => false},
+ "reshard" => {"from" => true, "to" => true, "slots" => true, "yes" => false, "timeout" => true, "pipeline" => true},
+ "rebalance" => {"weight" => [], "auto-weights" => false, "use-empty-masters" => false, "timeout" => true, "simulate" => false, "pipeline" => true, "threshold" => true},
+ "fix" => {"timeout" => 0},
+}
- # If --master-id was specified, try to resolve it now so that we
- # abort before starting with the node configuration.
- if opt['slave']
- if opt['master-id']
- master = get_node_by_name(opt['master-id'])
- if !master
- xputs "[ERR] No such master ID #{opt['master-id']}"
- end
+def parse_options(cmd)
+ cmd = cmd.downcase
+ idx = 0
+ options = {}
+ args = []
+ while (arg = ARGV.shift)
+ if arg[0..1] == "--"
+ option = arg[2..-1]
+
+ # --verbose is a global option
+ if option == "--verbose"
+ options['verbose'] = true
+ next
+ end
+ if ALLOWED_OPTIONS[cmd] == nil ||
+ ALLOWED_OPTIONS[cmd][option] == nil
+ next
+ end
+ if ALLOWED_OPTIONS[cmd][option] != false
+ value = ARGV.shift
+ next if !value
else
- master = get_master_with_least_replicas
- xputs "Automatically selected master #{master}"
- end
- end
-
- # Add the new node
- new = ClusterNode.new(argv[0])
- new.connect(:abort => true)
- new.assert_cluster
- new.load_info
- new.assert_empty
- first = @nodes.first.info
- add_node(new)
-
- # Send CLUSTER MEET command to the new node
- xputs ">>> Send CLUSTER MEET to node #{new} to make it join the cluster."
- new.r.cluster("meet",first[:host],first[:port])
-
- # Additional configuration is needed if the node is added as
- # a slave.
- if opt['slave']
- wait_cluster_join
- xputs ">>> Configure node as replica of #{master}."
- new.r.cluster("replicate",master.info[:name])
- end
- xputs "[OK] New node added correctly."
- end
-
- def delnode_cluster_cmd(argv,opt)
- id = argv[1].downcase
- xputs ">>> Removing node #{id} from cluster #{argv[0]}"
-
- # Load cluster information
- load_cluster_info_from_node(argv[0])
-
- # Check if the node exists and is not empty
- node = get_node_by_name(id)
-
- if !node
- xputs "[ERR] No such node ID #{id}"
- exit 1
- end
-
- if node.slots.length != 0
- xputs "[ERR] Node #{node} is not empty! Reshard data away and try again."
- exit 1
- end
-
- # Send CLUSTER FORGET to all the nodes but the node to remove
- xputs ">>> Sending CLUSTER FORGET messages to the cluster..."
- @nodes.each{|n|
- next if n == node
- if n.info[:replicate] && n.info[:replicate].downcase == id
- # Reconfigure the slave to replicate with some other node
- master = get_master_with_least_replicas
- xputs ">>> #{n} as replica of #{master}"
- n.r.cluster("replicate",master.info[:name])
- end
- n.r.cluster("forget",argv[1])
- }
-
- # Finally shutdown the node
- xputs ">>> SHUTDOWN the node."
- node.r.shutdown
- end
-
- def set_timeout_cluster_cmd(argv,opt)
- timeout = argv[1].to_i
- if timeout < 100
- puts "Setting a node timeout of less than 100 milliseconds is a bad idea."
- exit 1
- end
-
- # Load cluster information
- load_cluster_info_from_node(argv[0])
- ok_count = 0
- err_count = 0
-
- # Send CLUSTER FORGET to all the nodes but the node to remove
- xputs ">>> Reconfiguring node timeout in every cluster node..."
- @nodes.each{|n|
- begin
- n.r.config("set","cluster-node-timeout",timeout)
- n.r.config("rewrite")
- ok_count += 1
- xputs "*** New timeout set for #{n}"
- rescue => e
- puts "ERR setting node-timeot for #{n}: #{e}"
- err_count += 1
- end
- }
- xputs ">>> New node timeout set. #{ok_count} OK, #{err_count} ERR."
- end
-
- def call_cluster_cmd(argv,opt)
- cmd = argv[1..-1]
- cmd[0] = cmd[0].upcase
-
- # Load cluster information
- load_cluster_info_from_node(argv[0])
- xputs ">>> Calling #{cmd.join(" ")}"
- @nodes.each{|n|
- begin
- res = n.r.send(*cmd)
- puts "#{n}: #{res}"
- rescue => e
- puts "#{n}: #{e}"
+ value = true
end
- }
- end
-
- def import_cluster_cmd(argv,opt)
- source_addr = opt['from']
- xputs ">>> Importing data from #{source_addr} to cluster #{argv[1]}"
- use_copy = opt['copy']
- use_replace = opt['replace']
- # Check the existing cluster.
- load_cluster_info_from_node(argv[0])
- check_cluster
-
- # Connect to the source node.
- xputs ">>> Connecting to the source Redis instance"
- src_host,src_port = source_addr.split(":")
- source = Redis.new(:host =>src_host, :port =>src_port)
- if source.info['cluster_enabled'].to_i == 1
- xputs "[ERR] The source node should not be a cluster node."
- end
- xputs "*** Importing #{source.dbsize} keys from DB 0"
-
- # Build a slot -> node map
- slots = {}
- @nodes.each{|n|
- n.slots.each{|s,_|
- slots[s] = n
- }
- }
-
- # Use SCAN to iterate over the keys, migrating to the
- # right node as needed.
- cursor = nil
- while cursor != 0
- cursor,keys = source.scan(cursor, :count => 1000)
- cursor = cursor.to_i
- keys.each{|k|
- # Migrate keys using the MIGRATE command.
- slot = key_to_slot(k)
- target = slots[slot]
- print "Migrating #{k} to #{target}: "
- STDOUT.flush
- begin
- cmd = ["migrate",target.info[:host],target.info[:port],k,0,@timeout]
- cmd << :copy if use_copy
- cmd << :replace if use_replace
- source.client.call(cmd)
- rescue => e
- puts e
- else
- puts "OK"
- end
- }
- end
- end
-
- def help_cluster_cmd(argv,opt)
- show_help
- exit 0
- end
-
- # Parse the options for the specific command "cmd".
- # Returns an hash populate with option => value pairs, and the index of
- # the first non-option argument in ARGV.
- def parse_options(cmd)
- idx = 1 ; # Current index into ARGV
- options={}
- while idx < ARGV.length && ARGV[idx][0..1] == '--'
- if ARGV[idx][0..1] == "--"
- option = ARGV[idx][2..-1]
- idx += 1
-
- # --verbose is a global option
- if option == "verbose"
- $verbose = true
- next
- end
-
- if ALLOWED_OPTIONS[cmd] == nil || ALLOWED_OPTIONS[cmd][option] == nil
- puts "Unknown option '#{option}' for command '#{cmd}'"
- exit 1
- end
- if ALLOWED_OPTIONS[cmd][option] != false
- value = ARGV[idx]
- idx += 1
- else
- value = true
- end
-
- # If the option is set to [], it's a multiple arguments
- # option. We just queue every new value into an array.
- if ALLOWED_OPTIONS[cmd][option] == []
- options[option] = [] if !options[option]
- options[option] << value
- else
- options[option] = value
- end
+ # If the option is set to [], it's a multiple arguments
+ # option. We just queue every new value into an array.
+ if ALLOWED_OPTIONS[cmd][option] == []
+ options[option] = [] if !options[option]
+ options[option] << value
else
- # Remaining arguments are not options.
- break
+ options[option] = value
end
+ else
+ next if arg[0,1] == '-'
+ args << arg
end
-
- # Enforce mandatory options
- if ALLOWED_OPTIONS[cmd]
- ALLOWED_OPTIONS[cmd].each {|option,val|
- if !options[option] && val == :required
- puts "Option '--#{option}' is required "+ \
- "for subcommand '#{cmd}'"
- exit 1
- end
- }
- end
- return options,idx
end
-end
-
-#################################################################################
-# Libraries
-#
-# We try to don't depend on external libs since this is a critical part
-# of Redis Cluster.
-#################################################################################
-# This is the CRC16 algorithm used by Redis Cluster to hash keys.
-# Implementation according to CCITT standards.
-#
-# This is actually the XMODEM CRC 16 algorithm, using the
-# following parameters:
-#
-# Name : "XMODEM", also known as "ZMODEM", "CRC-16/ACORN"
-# Width : 16 bit
-# Poly : 1021 (That is actually x^16 + x^12 + x^5 + 1)
-# Initialization : 0000
-# Reflect Input byte : False
-# Reflect Output CRC : False
-# Xor constant to output CRC : 0000
-# Output for "123456789" : 31C3
-
-module RedisClusterCRC16
- def RedisClusterCRC16.crc16(bytes)
- crc = 0
- bytes.each_byte{|b|
- crc = ((crc<<8) & 0xffff) ^ XMODEMCRC16Lookup[((crc>>8)^b) & 0xff]
- }
- crc
- end
-
-private
- XMODEMCRC16Lookup = [
- 0x0000,0x1021,0x2042,0x3063,0x4084,0x50a5,0x60c6,0x70e7,
- 0x8108,0x9129,0xa14a,0xb16b,0xc18c,0xd1ad,0xe1ce,0xf1ef,
- 0x1231,0x0210,0x3273,0x2252,0x52b5,0x4294,0x72f7,0x62d6,
- 0x9339,0x8318,0xb37b,0xa35a,0xd3bd,0xc39c,0xf3ff,0xe3de,
- 0x2462,0x3443,0x0420,0x1401,0x64e6,0x74c7,0x44a4,0x5485,
- 0xa56a,0xb54b,0x8528,0x9509,0xe5ee,0xf5cf,0xc5ac,0xd58d,
- 0x3653,0x2672,0x1611,0x0630,0x76d7,0x66f6,0x5695,0x46b4,
- 0xb75b,0xa77a,0x9719,0x8738,0xf7df,0xe7fe,0xd79d,0xc7bc,
- 0x48c4,0x58e5,0x6886,0x78a7,0x0840,0x1861,0x2802,0x3823,
- 0xc9cc,0xd9ed,0xe98e,0xf9af,0x8948,0x9969,0xa90a,0xb92b,
- 0x5af5,0x4ad4,0x7ab7,0x6a96,0x1a71,0x0a50,0x3a33,0x2a12,
- 0xdbfd,0xcbdc,0xfbbf,0xeb9e,0x9b79,0x8b58,0xbb3b,0xab1a,
- 0x6ca6,0x7c87,0x4ce4,0x5cc5,0x2c22,0x3c03,0x0c60,0x1c41,
- 0xedae,0xfd8f,0xcdec,0xddcd,0xad2a,0xbd0b,0x8d68,0x9d49,
- 0x7e97,0x6eb6,0x5ed5,0x4ef4,0x3e13,0x2e32,0x1e51,0x0e70,
- 0xff9f,0xefbe,0xdfdd,0xcffc,0xbf1b,0xaf3a,0x9f59,0x8f78,
- 0x9188,0x81a9,0xb1ca,0xa1eb,0xd10c,0xc12d,0xf14e,0xe16f,
- 0x1080,0x00a1,0x30c2,0x20e3,0x5004,0x4025,0x7046,0x6067,
- 0x83b9,0x9398,0xa3fb,0xb3da,0xc33d,0xd31c,0xe37f,0xf35e,
- 0x02b1,0x1290,0x22f3,0x32d2,0x4235,0x5214,0x6277,0x7256,
- 0xb5ea,0xa5cb,0x95a8,0x8589,0xf56e,0xe54f,0xd52c,0xc50d,
- 0x34e2,0x24c3,0x14a0,0x0481,0x7466,0x6447,0x5424,0x4405,
- 0xa7db,0xb7fa,0x8799,0x97b8,0xe75f,0xf77e,0xc71d,0xd73c,
- 0x26d3,0x36f2,0x0691,0x16b0,0x6657,0x7676,0x4615,0x5634,
- 0xd94c,0xc96d,0xf90e,0xe92f,0x99c8,0x89e9,0xb98a,0xa9ab,
- 0x5844,0x4865,0x7806,0x6827,0x18c0,0x08e1,0x3882,0x28a3,
- 0xcb7d,0xdb5c,0xeb3f,0xfb1e,0x8bf9,0x9bd8,0xabbb,0xbb9a,
- 0x4a75,0x5a54,0x6a37,0x7a16,0x0af1,0x1ad0,0x2ab3,0x3a92,
- 0xfd2e,0xed0f,0xdd6c,0xcd4d,0xbdaa,0xad8b,0x9de8,0x8dc9,
- 0x7c26,0x6c07,0x5c64,0x4c45,0x3ca2,0x2c83,0x1ce0,0x0cc1,
- 0xef1f,0xff3e,0xcf5d,0xdf7c,0xaf9b,0xbfba,0x8fd9,0x9ff8,
- 0x6e17,0x7e36,0x4e55,0x5e74,0x2e93,0x3eb2,0x0ed1,0x1ef0
- ]
-end
-
-# Turn a key name into the corresponding Redis Cluster slot.
-def key_to_slot(key)
- # Only hash what is inside {...} if there is such a pattern in the key.
- # Note that the specification requires the content that is between
- # the first { and the first } after the first {. If we found {} without
- # nothing in the middle, the whole key is hashed as usually.
- s = key.index "{"
- if s
- e = key.index "}",s+1
- if e && e != s+1
- key = key[s+1..e-1]
- end
- end
- RedisClusterCRC16.crc16(key) % 16384
+ return options,args
end
-#################################################################################
-# Definition of commands
-#################################################################################
-
-COMMANDS={
- "create" => ["create_cluster_cmd", -2, "host1:port1 ... hostN:portN"],
- "check" => ["check_cluster_cmd", 2, "host:port"],
- "info" => ["info_cluster_cmd", 2, "host:port"],
- "fix" => ["fix_cluster_cmd", 2, "host:port"],
- "reshard" => ["reshard_cluster_cmd", 2, "host:port"],
- "rebalance" => ["rebalance_cluster_cmd", -2, "host:port"],
- "add-node" => ["addnode_cluster_cmd", 3, "new_host:new_port existing_host:existing_port"],
- "del-node" => ["delnode_cluster_cmd", 3, "host:port node_id"],
- "set-timeout" => ["set_timeout_cluster_cmd", 3, "host:port milliseconds"],
- "call" => ["call_cluster_cmd", -3, "host:port command arg arg .. arg"],
- "import" => ["import_cluster_cmd", 2, "host:port"],
- "help" => ["help_cluster_cmd", 1, "(show this help)"]
-}
-
-ALLOWED_OPTIONS={
- "create" => {"replicas" => true},
- "add-node" => {"slave" => false, "master-id" => true},
- "import" => {"from" => :required, "copy" => false, "replace" => false},
- "reshard" => {"from" => true, "to" => true, "slots" => true, "yes" => false, "timeout" => true, "pipeline" => true},
- "rebalance" => {"weight" => [], "auto-weights" => false, "use-empty-masters" => false, "timeout" => true, "simulate" => false, "pipeline" => true, "threshold" => true},
- "fix" => {"timeout" => MigrateDefaultTimeout},
-}
-
-def show_help
- puts "Usage: redis-trib <command> <options> <arguments ...>\n\n"
- COMMANDS.each{|k,v|
- puts " #{k.ljust(15)} #{v[2]}"
- if ALLOWED_OPTIONS[k]
- ALLOWED_OPTIONS[k].each{|optname,has_arg|
- puts " --#{optname}" + (has_arg ? " <arg>" : "")
- }
+def command_example(cmd, args, opts)
+ cmd = "redis-cli --cluster #{cmd}"
+ args.each{|a|
+ a = a.to_s
+ a = a.inspect if a[' ']
+ cmd << " #{a}"
+ }
+ opts.each{|opt, val|
+ opt = " --cluster-#{opt.downcase}"
+ if val != true
+ val = val.join(' ') if val.is_a? Array
+ opt << " #{val}"
end
+ cmd << opt
}
- puts "\nFor check, fix, reshard, del-node, set-timeout you can specify the host and port of any working node in the cluster.\n"
-end
-
-# Sanity check
-if ARGV.length == 0
- show_help
- exit 1
+ cmd
end
-rt = RedisTrib.new
-cmd_spec = COMMANDS[ARGV[0].downcase]
-if !cmd_spec
- puts "Unknown redis-trib subcommand '#{ARGV[0]}'"
- exit 1
+$command = ARGV.shift
+$opts, $args = parse_options($command) if $command
+
+puts "WARNING: redis-trib.rb is not longer available!".yellow
+puts "You should use #{'redis-cli'.bold} instead."
+puts ''
+puts "All commands and features belonging to redis-trib.rb "+
+ "have been moved\nto redis-cli."
+puts "In order to use them you should call redis-cli with the #{'--cluster'.bold}"
+puts "option followed by the subcommand name, arguments and options."
+puts ''
+puts "Use the following syntax:"
+puts "redis-cli --cluster SUBCOMMAND [ARGUMENTS] [OPTIONS]".bold
+puts ''
+puts "Example:"
+if $command
+ example = command_example $command, $args, $opts
+else
+ example = "redis-cli --cluster info 127.0.0.1:7000"
end
-
-# Parse options
-cmd_options,first_non_option = rt.parse_options(ARGV[0].downcase)
-rt.check_arity(cmd_spec[1],ARGV.length-(first_non_option-1))
-
-# Dispatch
-rt.send(cmd_spec[0],ARGV[first_non_option..-1],cmd_options)
+puts example.bold
+puts ''
+puts "To get help about all subcommands, type:"
+puts "redis-cli --cluster help".bold
+puts ''
+exit 1