Commit d5523cc7 by Jim Abramson

Merge pull request #104 from edx/jsa/index-migr2

Revise/add rake tasks for search index mgmt.  Bump Tire to 0.6.2.
parents 45b1d661 37aba9df
...@@ -35,7 +35,7 @@ gem 'will_paginate_mongoid' ...@@ -35,7 +35,7 @@ gem 'will_paginate_mongoid'
gem 'rdiscount' gem 'rdiscount'
gem 'nokogiri' gem 'nokogiri'
gem 'tire', "0.5.2" gem 'tire', "0.6.2"
gem 'tire-contrib' gem 'tire-contrib'
gem 'dalli' gem 'dalli'
......
...@@ -64,6 +64,7 @@ GEM ...@@ -64,6 +64,7 @@ GEM
multi_json (~> 1.0) multi_json (~> 1.0)
ampex (2.0.0) ampex (2.0.0)
blankslate blankslate
ansi (1.4.3)
blankslate (2.1.2.4) blankslate (2.1.2.4)
bson (1.6.4) bson (1.6.4)
bson_ext (1.6.4) bson_ext (1.6.4)
...@@ -99,7 +100,7 @@ GEM ...@@ -99,7 +100,7 @@ GEM
origin (~> 1.0) origin (~> 1.0)
tzinfo (~> 0.3.22) tzinfo (~> 0.3.22)
moped (1.5.1) moped (1.5.1)
multi_json (1.9.2) multi_json (1.10.0)
newrelic_moped (0.0.6) newrelic_moped (0.0.6)
moped moped
newrelic_rpm (~> 3.6.0) newrelic_rpm (~> 3.6.0)
...@@ -121,7 +122,7 @@ GEM ...@@ -121,7 +122,7 @@ GEM
rack (>= 1.0) rack (>= 1.0)
rack-timeout (0.1.0beta3) rack-timeout (0.1.0beta3)
raindrops (0.10.0) raindrops (0.10.0)
rake (10.2.2) rake (10.3.1)
rdiscount (1.6.8) rdiscount (1.6.8)
rest-client (1.6.7) rest-client (1.6.7)
mime-types (>= 1.16) mime-types (>= 1.16)
...@@ -148,10 +149,12 @@ GEM ...@@ -148,10 +149,12 @@ GEM
tilt (~> 1.1, != 1.3.0) tilt (~> 1.1, != 1.3.0)
thor (0.16.0) thor (0.16.0)
tilt (1.3.3) tilt (1.3.3)
tire (0.5.2) tire (0.6.2)
activemodel (>= 3.0) activemodel (>= 3.0)
activesupport
ansi
hashr (~> 0.0.19) hashr (~> 0.0.19)
multi_json (~> 1.0) multi_json (~> 1.3)
rake rake
rest-client (~> 1.6) rest-client (~> 1.6)
tire-contrib (0.1.1) tire-contrib (0.1.1)
...@@ -202,7 +205,7 @@ DEPENDENCIES ...@@ -202,7 +205,7 @@ DEPENDENCIES
rspec rspec
simplecov simplecov
sinatra sinatra
tire (= 0.5.2) tire (= 0.6.2)
tire-contrib tire-contrib
unicorn unicorn
voteable_mongo! voteable_mongo!
......
...@@ -225,6 +225,21 @@ namespace :db do ...@@ -225,6 +225,21 @@ namespace :db do
end end
task :add_anonymous_to_peers => :environment do
Content.collection.find(:anonymous_to_peers=>nil).update_all({"$set" => {'anonymous_to_peers' => false}})
end
end
namespace :search do
def get_number_of_primary_shards(index_name)
res = Tire::Configuration.client.get "#{Tire::Configuration.url}/#{index_name}/_status"
status = JSON.parse res.body
status["indices"].first[1]["shards"].size
end
def create_index_for_class(klass) def create_index_for_class(klass)
# create the new index with a unique name # create the new index with a unique name
new_index = Tire.index klass.tire.index.name << '_' << Time.now.strftime('%Y%m%d%H%M%S') new_index = Tire.index klass.tire.index.name << '_' << Time.now.strftime('%Y%m%d%H%M%S')
...@@ -234,20 +249,18 @@ namespace :db do ...@@ -234,20 +249,18 @@ namespace :db do
new_index new_index
end end
def import_from_cursor(cursor, index, page_size) def import_from_cursor(cursor, index, opts)
tot = cursor.count tot = cursor.count
cnt = 0 cnt = 0
t = Time.now t = Time.now
index.import cursor, {:method => :paginate, :per_page => page_size} do |documents| index.import cursor, {:method => :paginate, :per_page => opts[:batch_size]} do |documents|
# GC.start call is backport of memory leak fix in more recent vers. of tire if cnt % opts[:batch_size] == 0 then
# see https://github.com/karmi/tire/pull/658
GC.start
if cnt % 1000 == 0 then
elapsed_secs = (Time.now - t).round(2) elapsed_secs = (Time.now - t).round(2)
pct_complete = (100 * (cnt/tot.to_f)).round(2) pct_complete = (100 * (cnt/tot.to_f)).round(2)
LOG.info "#{index.name}: imported #{cnt} of #{tot} (#{pct_complete}% complete after #{elapsed_secs} seconds)" LOG.info "#{index.name}: imported #{cnt} of #{tot} (#{pct_complete}% complete after #{elapsed_secs} seconds)"
end end
cnt += documents.length cnt += documents.length
sleep opts[:sleep_time]
documents documents
end end
LOG.info "#{index.name}: finished importing #{cnt} documents" LOG.info "#{index.name}: finished importing #{cnt} documents"
...@@ -279,13 +292,13 @@ namespace :db do ...@@ -279,13 +292,13 @@ namespace :db do
true true
end end
def do_reindex (classname, force_rebuild=false) def do_reindex (classname, opts, in_place=false)
# get a reference to the model class (and make sure it's a model class with tire hooks) # get a reference to the model class (and make sure it's a model class with tire hooks)
klass = CommentService.const_get(classname) klass = CommentService.const_get(classname)
raise RuntimeError unless klass.instance_of? Class raise RuntimeError unless klass.instance_of? Class
raise RuntimeError unless klass.respond_to? "tire" raise RuntimeError unless klass.respond_to? "tire"
t1 = Time.now # we will need to refer back to this point in time later... start_time = Time.now
# create the new index with a unique name # create the new index with a unique name
new_index = create_index_for_class(klass) new_index = create_index_for_class(klass)
# unless the user is forcing a rebuild, or the index does not yet exist, we # unless the user is forcing a rebuild, or the index does not yet exist, we
...@@ -300,71 +313,129 @@ namespace :db do ...@@ -300,71 +313,129 @@ namespace :db do
alias_name = old_index.name alias_name = old_index.name
alias_ = Tire::Alias.find alias_name alias_ = Tire::Alias.find alias_name
if alias_.nil? then if alias_.nil? then
# edge case.
# the alias doesn't exist, so we know the index was auto-created. # the alias doesn't exist, so we know the index was auto-created.
# We will delete it and replace it with an alias. # We will delete it and replace it with an alias.
is_rebuild = true raise RuntimeError, 'Cannot reindex in-place, no valid source index' if in_place
old_index.delete old_index.delete
# NOTE on the small chance that another process re-auto-creates the index # NOTE on the small chance that another process re-auto-creates the index
# we just deleted before we have a chance to create the alias, this next # we just deleted before we have a chance to create the alias, this next
# call will fail. # call will fail.
move_alias_to(alias_name, new_index) move_alias_to(alias_name, new_index)
else
is_rebuild = force_rebuild
end end
op = is_rebuild ? "(re)build index for" : "reindex" op = in_place ? "reindex" : "(re)build index for"
LOG.info "preparing to #{op} CommentService::#{classname}" LOG.info "preparing to #{op} CommentService::#{classname}"
# ensure there's no identity mapping or caching going on while we do this # ensure there's no identity mapping or caching going on while we do this
Mongoid.identity_map_enabled = false Mongoid.identity_map_enabled = false
Mongoid.unit_of_work(disable: :all) do Mongoid.unit_of_work(disable: :all) do
if is_rebuild then if in_place then
# fetch all the documents ever, up til t1
cursor = klass.where(:updated_at.lte => t1)
# import them to the new index
import_from_cursor(cursor, new_index, 200)
else
# reindex, moving source documents directly from old index to new # reindex, moving source documents directly from old index to new
LOG.info "copying documents from original index (this may take a while!)" LOG.info "copying documents from original index (this may take a while!)"
old_index.reindex new_index.name old_index.reindex new_index.name
LOG.info "done copying!" LOG.info "done copying!"
else
# fetch all the documents ever, up til start_time
cursor = klass.where(:updated_at.lte => start_time)
# import them to the new index
import_from_cursor(cursor, new_index, opts)
end end
# move the alias if necessary # move the alias if necessary
did_alias_move = move_alias_to(klass.tire.index.name, new_index) did_alias_move = move_alias_to(klass.tire.index.name, new_index)
t2 = Time.now
if did_alias_move then if did_alias_move then
# Reimport any source documents that got updated between t1 and t2, # Reimport any source documents that got updated since start_time,
# while the alias still pointed to the old index # while the alias still pointed to the old index.
LOG.info "importing any documents that changed between #{t1} and #{t2}" # Elasticsearch understands our document ids, so re-indexing the same
cursor = klass.where(:updated_at.gte => t1, :updated_at.lte => t2) # document won't create duplicates.
import_from_cursor(cursor, new_index, 200) LOG.info "importing any documents that changed between #{start_time} and now"
cursor = klass.where(:created_at.gte => start_time).union.where(:updated_at.gte => start_time)
import_from_cursor(cursor, new_index, opts)
end end
end end
end end
task :rebuild, [:classname] => :environment do |t, args| desc "Copies contents of MongoDB into Elasticsearch if updated in the last N minutes."
do_reindex(args[:classname], true) task :catchup, [:minutes, :batch_size, :sleep_time] => :environment do |t, args|
opts = batch_opts args
[CommentThread, Comment].each do |klass|
the_index = klass.tire.index
alias_ = Tire::Alias.find the_index.name
# this check makes sure we are working with the index to which
# the desired model's alias presently points.
raise RuntimeError, "could not find live index for #{klass.name}" if alias_.nil?
start_time = Time.now - (args[:minutes].to_i * 60)
cursor = klass.where(:updated_at.gte => start_time)
import_from_cursor(cursor, the_index, opts)
end
end end
task :reindex, [:classname] => :environment do |t, args| def batch_opts(args)
do_reindex(args[:classname]) args = args.to_hash
{ :batch_size => args[:batch_size].nil? ? 500 : args[:batch_size].to_i,
:sleep_time => args[:sleep_time].nil? ? 0 : args[:sleep_time].to_i }
end end
task :reindex_search => :environment do desc "Removes any data from Elasticsearch that no longer exists in MongoDB."
do_reindex("CommentThread") task :prune, [:batch_size, :sleep_time] => :environment do |t, args|
do_reindex("Comment") opts = batch_opts args
[CommentThread, Comment].each do |klass|
cnt = 0
the_index = klass.tire.index
puts "pruning #{the_index.name}"
alias_ = Tire::Alias.find the_index.name
# this check makes sure we are working with the index to which
# the desired model's alias presently points.
raise RuntimeError, "could not find live index for #{klass.name}" if alias_.nil?
scan_size = opts[:batch_size] / get_number_of_primary_shards(the_index.name)
search = Tire::Search::Scan.new the_index.name, size: scan_size
search.each do |results|
es_ids = results.map(&:id)
mongo_ids = klass.where(:id.in => es_ids).map {|d| d.id.to_s}
to_delete = es_ids - mongo_ids
if to_delete.size > 0
cnt += to_delete.size
puts "deleting #{to_delete.size} orphaned documents from elasticsearch"
the_index.bulk_delete (to_delete).map {|v| {"type" => klass.document_type, "id" => v}}
end
puts "#{the_index.name}: processed #{search.seen} of #{search.total}"
sleep opts[:sleep_time]
end
puts "done pruning #{the_index.name}, deleted a total of #{cnt} orphaned documents"
end
end end
task :create_search_indexes => :environment do desc "Generate a new physical index, copy data from MongoDB, and bring online."
[CommentThread, Comment].each { |klass| create_index_for_class(klass) } task :rebuild, [:classname, :batch_size, :sleep_time] => :environment do |t, args|
opts = batch_opts args
do_reindex(args[:classname], opts)
end end
task :add_anonymous_to_peers => :environment do desc "Perform a rebuild on both CommentThread and Comment, using the same options."
Content.collection.find(:anonymous_to_peers=>nil).update_all({"$set" => {'anonymous_to_peers' => false}}) task :rebuild_all, [:batch_size, :sleep_time] => :environment do |t, args|
opts = batch_opts args
[CommentThread, Comment].each { |klass| do_reindex(klass.name, opts) }
end
desc "Generate a new physical index, copy data from the existing index, and bring online."
task :reindex, [:classname, :batch_size, :sleep_time] => :environment do |t, args|
opts = batch_opts args
do_reindex(args[:classname], opts, true)
end
desc "Perform a reindex on both CommentThread and Comment, using the same options."
task :reindex_all , [:batch_size, :sleep_time] => :environment do |t, args|
opts = batch_opts args
[CommentThread, Comment].each { |klass| do_reindex(klass.name, opts, true) }
end
desc "Generate new, empty physical indexes, without bringing them online."
task :create_indexes => :environment do
[CommentThread, Comment].each { |klass| create_index_for_class(klass) }
end end
end end
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment