Commit 37aba9df by jsa
parent 45b1d661
......@@ -35,7 +35,7 @@ gem 'will_paginate_mongoid'
gem 'rdiscount'
gem 'nokogiri'
gem 'tire', "0.5.2"
gem 'tire', "0.6.2"
gem 'tire-contrib'
gem 'dalli'
......
......@@ -64,6 +64,7 @@ GEM
multi_json (~> 1.0)
ampex (2.0.0)
blankslate
ansi (1.4.3)
blankslate (2.1.2.4)
bson (1.6.4)
bson_ext (1.6.4)
......@@ -99,7 +100,7 @@ GEM
origin (~> 1.0)
tzinfo (~> 0.3.22)
moped (1.5.1)
multi_json (1.9.2)
multi_json (1.10.0)
newrelic_moped (0.0.6)
moped
newrelic_rpm (~> 3.6.0)
......@@ -121,7 +122,7 @@ GEM
rack (>= 1.0)
rack-timeout (0.1.0beta3)
raindrops (0.10.0)
rake (10.2.2)
rake (10.3.1)
rdiscount (1.6.8)
rest-client (1.6.7)
mime-types (>= 1.16)
......@@ -148,10 +149,12 @@ GEM
tilt (~> 1.1, != 1.3.0)
thor (0.16.0)
tilt (1.3.3)
tire (0.5.2)
tire (0.6.2)
activemodel (>= 3.0)
activesupport
ansi
hashr (~> 0.0.19)
multi_json (~> 1.0)
multi_json (~> 1.3)
rake
rest-client (~> 1.6)
tire-contrib (0.1.1)
......@@ -202,7 +205,7 @@ DEPENDENCIES
rspec
simplecov
sinatra
tire (= 0.5.2)
tire (= 0.6.2)
tire-contrib
unicorn
voteable_mongo!
......
......@@ -225,6 +225,21 @@ namespace :db do
end
task :add_anonymous_to_peers => :environment do
# One-off backfill: any Content document whose anonymous_to_peers field is
# missing/nil gets it explicitly set to false, so downstream code can treat
# the field as a real boolean.
Content.collection.find(:anonymous_to_peers=>nil).update_all({"$set" => {'anonymous_to_peers' => false}})
end
end
namespace :search do
# Returns the shard count for the given Elasticsearch index, as reported by
# the index _status endpoint.
# NOTE(review): this counts entries in the "shards" hash of the first index
# in the response — presumably one entry per primary shard (the caller uses
# it to size per-shard scan batches); confirm against the ES _status API.
def get_number_of_primary_shards(index_name)
res = Tire::Configuration.client.get "#{Tire::Configuration.url}/#{index_name}/_status"
status = JSON.parse res.body
status["indices"].first[1]["shards"].size
end
def create_index_for_class(klass)
# create the new index with a unique name
new_index = Tire.index klass.tire.index.name << '_' << Time.now.strftime('%Y%m%d%H%M%S')
......@@ -234,20 +249,18 @@ namespace :db do
new_index
end
def import_from_cursor(cursor, index, page_size)
def import_from_cursor(cursor, index, opts)
tot = cursor.count
cnt = 0
t = Time.now
index.import cursor, {:method => :paginate, :per_page => page_size} do |documents|
# GC.start call is backport of memory leak fix in more recent vers. of tire
# see https://github.com/karmi/tire/pull/658
GC.start
if cnt % 1000 == 0 then
index.import cursor, {:method => :paginate, :per_page => opts[:batch_size]} do |documents|
if cnt % opts[:batch_size] == 0 then
elapsed_secs = (Time.now - t).round(2)
pct_complete = (100 * (cnt/tot.to_f)).round(2)
LOG.info "#{index.name}: imported #{cnt} of #{tot} (#{pct_complete}% complete after #{elapsed_secs} seconds)"
end
cnt += documents.length
sleep opts[:sleep_time]
documents
end
LOG.info "#{index.name}: finished importing #{cnt} documents"
......@@ -279,13 +292,13 @@ namespace :db do
true
end
def do_reindex (classname, force_rebuild=false)
def do_reindex (classname, opts, in_place=false)
# get a reference to the model class (and make sure it's a model class with tire hooks)
klass = CommentService.const_get(classname)
raise RuntimeError unless klass.instance_of? Class
raise RuntimeError unless klass.respond_to? "tire"
t1 = Time.now # we will need to refer back to this point in time later...
start_time = Time.now
# create the new index with a unique name
new_index = create_index_for_class(klass)
# unless the user is forcing a rebuild, or the index does not yet exist, we
......@@ -300,71 +313,129 @@ namespace :db do
alias_name = old_index.name
alias_ = Tire::Alias.find alias_name
if alias_.nil? then
# edge case.
# the alias doesn't exist, so we know the index was auto-created.
# We will delete it and replace it with an alias.
is_rebuild = true
raise RuntimeError, 'Cannot reindex in-place, no valid source index' if in_place
old_index.delete
# NOTE on the small chance that another process re-auto-creates the index
# we just deleted before we have a chance to create the alias, this next
# call will fail.
move_alias_to(alias_name, new_index)
else
is_rebuild = force_rebuild
end
op = is_rebuild ? "(re)build index for" : "reindex"
op = in_place ? "reindex" : "(re)build index for"
LOG.info "preparing to #{op} CommentService::#{classname}"
# ensure there's no identity mapping or caching going on while we do this
Mongoid.identity_map_enabled = false
Mongoid.unit_of_work(disable: :all) do
if is_rebuild then
# fetch all the documents ever, up til t1
cursor = klass.where(:updated_at.lte => t1)
# import them to the new index
import_from_cursor(cursor, new_index, 200)
else
if in_place then
# reindex, moving source documents directly from old index to new
LOG.info "copying documents from original index (this may take a while!)"
old_index.reindex new_index.name
LOG.info "done copying!"
else
# fetch all the documents ever, up til start_time
cursor = klass.where(:updated_at.lte => start_time)
# import them to the new index
import_from_cursor(cursor, new_index, opts)
end
# move the alias if necessary
did_alias_move = move_alias_to(klass.tire.index.name, new_index)
t2 = Time.now
if did_alias_move then
# Reimport any source documents that got updated between t1 and t2,
# while the alias still pointed to the old index
LOG.info "importing any documents that changed between #{t1} and #{t2}"
cursor = klass.where(:updated_at.gte => t1, :updated_at.lte => t2)
import_from_cursor(cursor, new_index, 200)
# Reimport any source documents that got updated since start_time,
# while the alias still pointed to the old index.
# Elasticsearch understands our document ids, so re-indexing the same
# document won't create duplicates.
LOG.info "importing any documents that changed between #{start_time} and now"
cursor = klass.where(:created_at.gte => start_time).union.where(:updated_at.gte => start_time)
import_from_cursor(cursor, new_index, opts)
end
end
end
task :rebuild, [:classname] => :environment do |t, args|
do_reindex(args[:classname], true)
desc "Copies contents of MongoDB into Elasticsearch if updated in the last N minutes."
# Usage: rake search:catchup[minutes,batch_size,sleep_time]
# Re-imports, into each model's live (aliased) index, every document whose
# updated_at falls within the last `minutes` minutes. Safe to re-run:
# re-indexing a document by id overwrites rather than duplicates it.
task :catchup, [:minutes, :batch_size, :sleep_time] => :environment do |t, args|
opts = batch_opts args
[CommentThread, Comment].each do |klass|
the_index = klass.tire.index
alias_ = Tire::Alias.find the_index.name
# this check makes sure we are working with the index to which
# the desired model's alias presently points.
raise RuntimeError, "could not find live index for #{klass.name}" if alias_.nil?
# look back N minutes from now; the :minutes arg arrives as a string
start_time = Time.now - (args[:minutes].to_i * 60)
cursor = klass.where(:updated_at.gte => start_time)
import_from_cursor(cursor, the_index, opts)
end
end
task :reindex, [:classname] => :environment do |t, args|
do_reindex(args[:classname])
# Normalizes rake task arguments into the options hash used by the import
# and prune helpers. Missing values fall back to a batch size of 500 and
# no sleep between batches; present values are coerced to integers.
def batch_opts(args)
opts = args.to_hash
batch = opts[:batch_size]
pause = opts[:sleep_time]
{ :batch_size => batch.nil? ? 500 : batch.to_i,
  :sleep_time => pause.nil? ? 0 : pause.to_i }
end
task :reindex_search => :environment do
do_reindex("CommentThread")
do_reindex("Comment")
desc "Removes any data from Elasticsearch that no longer exists in MongoDB."
# Usage: rake search:prune[batch_size,sleep_time]
# Scans each live index in batches; for every batch of ES hits, looks up the
# same ids in MongoDB and bulk-deletes the ES documents with no Mongo
# counterpart (orphans left behind by deletes that never reached ES).
task :prune, [:batch_size, :sleep_time] => :environment do |t, args|
opts = batch_opts args
[CommentThread, Comment].each do |klass|
cnt = 0
the_index = klass.tire.index
puts "pruning #{the_index.name}"
alias_ = Tire::Alias.find the_index.name
# this check makes sure we are working with the index to which
# the desired model's alias presently points.
raise RuntimeError, "could not find live index for #{klass.name}" if alias_.nil?
# scan/scroll returns `size` hits per shard, so divide the requested batch
# size by the shard count to keep each batch near opts[:batch_size] total
scan_size = opts[:batch_size] / get_number_of_primary_shards(the_index.name)
search = Tire::Search::Scan.new the_index.name, size: scan_size
search.each do |results|
es_ids = results.map(&:id)
# ids present in ES but absent from Mongo are orphans
mongo_ids = klass.where(:id.in => es_ids).map {|d| d.id.to_s}
to_delete = es_ids - mongo_ids
if to_delete.size > 0
cnt += to_delete.size
puts "deleting #{to_delete.size} orphaned documents from elasticsearch"
the_index.bulk_delete (to_delete).map {|v| {"type" => klass.document_type, "id" => v}}
end
puts "#{the_index.name}: processed #{search.seen} of #{search.total}"
# optional throttle between batches to limit cluster load
sleep opts[:sleep_time]
end
puts "done pruning #{the_index.name}, deleted a total of #{cnt} orphaned documents"
end
end
task :create_search_indexes => :environment do
[CommentThread, Comment].each { |klass| create_index_for_class(klass) }
desc "Generate a new physical index, copy data from MongoDB, and bring online."
# Usage: rake search:rebuild[classname,batch_size,sleep_time]
# Delegates to do_reindex with in_place left at its default (false), i.e.
# source documents are imported from MongoDB rather than the old index.
task :rebuild, [:classname, :batch_size, :sleep_time] => :environment do |t, args|
opts = batch_opts args
do_reindex(args[:classname], opts)
end
task :add_anonymous_to_peers => :environment do
Content.collection.find(:anonymous_to_peers=>nil).update_all({"$set" => {'anonymous_to_peers' => false}})
desc "Perform a rebuild on both CommentThread and Comment, using the same options."
# Usage: rake search:rebuild_all[batch_size,sleep_time]
# Convenience wrapper: runs the MongoDB-sourced rebuild for both models.
task :rebuild_all, [:batch_size, :sleep_time] => :environment do |t, args|
opts = batch_opts args
[CommentThread, Comment].each { |klass| do_reindex(klass.name, opts) }
end
desc "Generate a new physical index, copy data from the existing index, and bring online."
# Usage: rake search:reindex[classname,batch_size,sleep_time]
# Passes in_place=true so do_reindex copies documents from the current
# index (ES-to-ES) instead of re-importing from MongoDB.
task :reindex, [:classname, :batch_size, :sleep_time] => :environment do |t, args|
opts = batch_opts args
do_reindex(args[:classname], opts, true)
end
desc "Perform a reindex on both CommentThread and Comment, using the same options."
# Usage: rake search:reindex_all[batch_size,sleep_time]
# Convenience wrapper: runs the in-place (ES-to-ES) reindex for both models.
task :reindex_all , [:batch_size, :sleep_time] => :environment do |t, args|
opts = batch_opts args
[CommentThread, Comment].each { |klass| do_reindex(klass.name, opts, true) }
end
desc "Generate new, empty physical indexes, without bringing them online."
# Creates a fresh timestamped physical index per model but does not move
# any alias, so live traffic is unaffected until a later rebuild/reindex.
task :create_indexes => :environment do
[CommentThread, Comment].each { |klass| create_index_for_class(klass) }
end
end
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment