Commit 5d9215d6 by jimabramson

reindex_search: refactoring and improvements

* properly handle when the index/alias don't yet exist
* don't lose content updates made while reindex/rebuild are in progress
* when possible, use tire api's reindex call for much more efficient index changes
* expose rake tasks to explicitly rebuild or reindex individual types
parent ad4eb30a
...@@ -10,6 +10,8 @@ Tire.configure do ...@@ -10,6 +10,8 @@ Tire.configure do
url YAML.load(application_yaml)['elasticsearch_server'] url YAML.load(application_yaml)['elasticsearch_server']
end end
LOG = Logger.new(STDERR)
desc "Load the environment" desc "Load the environment"
task :environment do task :environment do
environment = ENV["SINATRA_ENV"] || "development" environment = ENV["SINATRA_ENV"] || "development"
...@@ -152,11 +154,11 @@ namespace :db do ...@@ -152,11 +154,11 @@ namespace :db do
coll = db.collection("contents") coll = db.collection("contents")
args[:num].to_i.times do args[:num].to_i.times do
doc = {"_type" => "CommentThread", "anonymous" => [true, false].sample, "at_position_list" => [], doc = {"_type" => "CommentThread", "anonymous" => [true, false].sample, "at_position_list" => [],
"tags_array" => [], "tags_array" => [],
"comment_count" => 0, "title" => Faker::Lorem.sentence(6), "author_id" => rand(1..10).to_s, "comment_count" => 0, "title" => Faker::Lorem.sentence(6), "author_id" => rand(1..10).to_s,
"body" => Faker::Lorem.paragraphs.join("\n\n"), "course_id" => COURSE_ID, "created_at" => Time.now, "body" => Faker::Lorem.paragraphs.join("\n\n"), "course_id" => COURSE_ID, "created_at" => Time.now,
"commentable_id" => COURSE_ID, "closed" => [true, false].sample, "updated_at" => Time.now, "last_activity_at" => Time.now, "commentable_id" => COURSE_ID, "closed" => [true, false].sample, "updated_at" => Time.now, "last_activity_at" => Time.now,
"votes" => {"count" => 0, "down" => [], "down_count" => 0, "point" => 0, "up" => [], "up_count" => []}} "votes" => {"count" => 0, "down" => [], "down_count" => 0, "point" => 0, "up" => [], "up_count" => []}}
coll.insert(doc) coll.insert(doc)
end end
binding.pry binding.pry
...@@ -223,68 +225,138 @@ namespace :db do ...@@ -223,68 +225,138 @@ namespace :db do
end end
task :reindex_search => :environment do def create_index_for_class(klass)
logger = Logger.new(STDERR) # create the new index with a unique name
cutoff_dt = DateTime.now new_index = Tire.index klass.tire.index.name << '_' << Time.now.strftime('%Y%m%d%H%M%S')
klasses = [Comment, CommentThread] LOG.info "configuring new index: #{new_index.name}"
# apply the proper mapping and settings to the new index
new_index.create :mappings => klass.tire.mapping_to_hash, :settings => klass.tire.settings
new_index
end
def import_from_cursor(cursor, index, page_size)
tot = cursor.count
cnt = 0
t = Time.now
index.import cursor, {:method => :paginate, :per_page => page_size} do |documents|
# GC.start call is backport of memory leak fix in more recent vers. of tire
# see https://github.com/karmi/tire/pull/658
GC.start
if cnt % 1000 == 0 then
elapsed_secs = (Time.now - t).round(2)
pct_complete = (100 * (cnt/tot.to_f)).round(2)
LOG.info "#{index.name}: imported #{cnt} of #{tot} (#{pct_complete}% complete after #{elapsed_secs} seconds)"
end
cnt += documents.length
documents
end
LOG.info "#{index.name}: finished importing #{cnt} documents"
cnt
end
def move_alias_to(name, index)
# if there was a previous index, switch over the alias to point to the new index
LOG.info "moving alias \"#{name}\" to index #{index.name}"
alias_ = Tire::Alias.find name
if alias_ then
# remove the alias from wherever it points to now
LOG.info "found old index alias(es): #{alias_.indices.to_ary.join(',')}"
alias_.indices.each do |old_index_name|
alias_.indices.delete old_index_name unless old_index_name == name
end
else
# create the alias
LOG.info "alias \"#{name}\" does not yet exist - creating."
alias_ = Tire::Alias.new :name => name
end
# point the alias at our new index
alias_.indices.add index.name
alias_.save
LOG.info "alias successfully moved."
end
def do_reindex (classname, force_rebuild=false)
# get a reference to the model class (and make sure it's a model class with tire hooks)
klass = CommentService.const_get(classname)
raise RuntimeError unless klass.instance_of? Class
raise RuntimeError unless klass.respond_to? "tire"
t1 = Time.now # we will need to refer back to this point in time later...
# create the new index with a unique name
new_index = create_index_for_class(klass)
# unless the user is forcing a rebuild, or the index does not yet exist, we
# can do a Tire api reindex which is much faster than reimporting documents
# from mongo.
#
# Checking if the index exists is tricky. Tire automatically created an index
# for the model class when the app loaded if one did not already exist. However,
# it won't create an alias, which is what our app uses. So if the index exists
# but not the alias, we know that it's auto-created.
old_index = klass.tire.index
alias_name = old_index.name
alias_ = Tire::Alias.find alias_name
if alias_.nil? then
# the alias doesn't exist, so we know the index was auto-created.
# We will delete it and replace it with an alias.
is_rebuild = true
old_index.delete
# NOTE on the small chance that another process re-auto-creates the index
# we just deleted before we have a chance to create the alias, this next
# call will fail.
move_alias_to(alias_name, new_index)
else
is_rebuild = force_rebuild
end
op = is_rebuild ? "(re)build index for" : "reindex"
LOG.info "preparing to #{op} CommentService::#{classname}"
# ensure there's no identity mapping or caching going on while we do this
Mongoid.identity_map_enabled = false Mongoid.identity_map_enabled = false
Mongoid.unit_of_work(disable: :all) do Mongoid.unit_of_work(disable: :all) do
klasses.each do |klass|
## generate a versioned name for the rebuilt index
new_index = klass.tire.index.name << '_' << Time.now.strftime('%Y%m%d%H%M%S')
logger.info "[IMPORT] *BEGIN* importing #{klass.name}"
# find the number of docs to be indexed
tot = klass.where(:updated_at.lte => cutoff_dt).count
cnt = 0
t = Time.now
Tire.index new_index do
import klass.where(:updated_at.lte => DateTime.now), {:method => :paginate, :per_page => 200} do |documents|
GC.start
if cnt % 1000 == 0 then
logger.info "[IMPORT] indexed #{cnt} of #{tot} #{klass.name}s (#{(100 * (cnt/tot.to_f)).round(2)}% complete after #{((Time.now - t) / 60).round(2)} minutes)"
end
cnt += documents.length
documents
end
end
logger.info "[IMPORT] *DONE* imported #{klass.name}: #{cnt} documents in #{((Time.now - t) / 60).round(2)} minutes"
logger.info '[IMPORT] about to swap index'
if a = Tire::Alias.find(klass.tire.index.name)
logger.info "[IMPORT] aliases found: #{Tire::Alias.find(klass.tire.index.name).indices.to_ary.join(',')}. deleting."
old_indices = Tire::Alias.find(klass.tire.index.name).indices
old_indices.each do |index|
a.indices.delete index
end
a.indices.add new_index
a.save
old_indices.each do |index|
logger.info "[IMPORT] deleting index: #{index}"
i = Tire::Index.new(index)
i.delete if i.exists?
end
else
logger.info "[IMPORT] no aliases found. deleting index. Creating new one for #{klass} and setting up alias."
klass.tire.index.delete
a = Tire::Alias.new
a.name(klass.tire.index.name)
a.index(new_index)
a.save
end
logger.info "[IMPORT] done. Index: '#{new_index}' created." if is_rebuild then
# fetch all the documents ever, up til t1
cursor = klass.where(:updated_at.lte => t1)
# import them to the new index
import_from_cursor(cursor, new_index, 200)
else
# reindex, moving source documents directly from old index to new
LOG.info "copying documents from original index (this may take a while!)"
old_index.reindex new_index.name
LOG.info "done copying!"
end end
# move the alias
move_alias_to(klass.tire.index.name, new_index)
t2 = Time.now
# Reimport any source documents that got updated between t1 and t2,
# while the alias still pointed to the old index
LOG.info "importing any documents that changed between #{t1} and #{t2}"
cursor = klass.where(:updated_at.gte => t1, :updated_at.lte => t2)
import_from_cursor(cursor, new_index, 200)
end end
end
task :rebuild, [:classname] => :environment do |t, args|
do_reindex(args[:classname], true)
end
task :reindex, [:classname] => :environment do |t, args|
do_reindex(args[:classname])
end
task :reindex_search => :environment do
do_reindex("CommentThread")
do_reindex("Comment")
end end
task :add_anonymous_to_peers => :environment do task :add_anonymous_to_peers => :environment do
Content.collection.find(:anonymous_to_peers=>nil).update_all({"$set" => {'anonymous_to_peers' => false}}) Content.collection.find(:anonymous_to_peers=>nil).update_all({"$set" => {'anonymous_to_peers' => false}})
end end
end end
namespace :jobs do namespace :jobs do
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment