Commit d5a1919e by Clinton Blackburn

Merge pull request #174 from edx/clintonb/task-refactor

Refactored rake tasks
Parents: 438cbbc4, 3f335264
@@ -4,7 +4,7 @@ require 'bundler'
 Bundler.setup
 Bundler.require
-application_yaml = ERB.new(File.read("config/application.yml")).result()
+application_yaml = ERB.new(File.read('config/application.yml')).result()
 begin
@@ -23,25 +23,22 @@ end
 LOG = Logger.new(STDERR)
-desc "Load the environment"
+desc 'Load the environment'
 task :environment do
-  environment = ENV["SINATRA_ENV"] || "development"
+  environment = ENV['SINATRA_ENV'] || 'development'
   Sinatra::Base.environment = environment
-  Mongoid.load!("config/mongoid.yml")
+  Mongoid.load!('config/mongoid.yml')
   Mongoid.logger.level = Logger::INFO
   module CommentService
-    class << self; attr_accessor :config; end
+    class << self;
+      attr_accessor :config;
+    end
   end
   CommentService.config = YAML.load(application_yaml)
-  Dir[File.dirname(__FILE__) + '/lib/**/*.rb'].each {|file| require file}
-  Dir[File.dirname(__FILE__) + '/models/*.rb'].each {|file| require file}
-  #Dir[File.dirname(__FILE__) + '/models/observers/*.rb'].each {|file| require file}
-  #Mongoid.observers = PostReplyObserver, PostTopicObserver, AtUserObserver
-  #Mongoid.instantiate_observers
+  Dir[File.dirname(__FILE__) + '/lib/**/*.rb'].each { |file| require file }
+  Dir[File.dirname(__FILE__) + '/models/*.rb'].each { |file| require file }
 end
 Dir.glob('lib/tasks/*.rake').each { |r| import r }
...
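Note: the Rakefile keeps its configuration on the module itself — `class << self` opens CommentService's singleton class, so `attr_accessor :config` defines `CommentService.config` rather than an instance attribute. A minimal standalone sketch of the pattern (the `level_limit` key is the one the db tasks below actually read; the ERB-embedded environment variable is purely illustrative):

```ruby
require 'erb'
require 'yaml'

module CommentService
  class << self
    # Defined on the singleton class, so this is a module-level
    # attribute: CommentService.config / CommentService.config=
    attr_accessor :config
  end
end

# The YAML is run through ERB first, so config/application.yml may
# embed Ruby (e.g. <%= ENV['SEARCH_SERVER'] %>) before being parsed.
application_yaml = ERB.new(File.read('config/application.yml')).result()
CommentService.config = YAML.load(application_yaml)

# Tasks then read settings by key:
level_limit = CommentService.config['level_limit']
```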
+module TaskHelpers
+  module ElasticsearchHelper
+    def self.create_index(name=nil)
+      name ||= "#{Content::ES_INDEX_NAME}_#{Time.now.strftime('%Y%m%d%H%M%S')}"
+      index = Tire.index(name)
+
+      LOG.info "Creating new index: #{name}..."
+      index.create
+
+      [CommentThread, Comment].each do |model|
+        LOG.info "Applying index mappings for #{model.name}"
+        model.put_search_index_mapping(index)
+      end
+
+      LOG.info '...done!'
+      index
+    end
+
+    def self.delete_index(name)
+      Tire.index(name).delete
+    end
+
+    def self.get_index
+      CommentThread.tire.index
+    end
+
+    def self.get_index_shard_count(name)
+      settings = Tire.index(name).settings
+      settings['index.number_of_shards']
+    end
+  end
+end
...
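This new helper module centralizes the Tire index plumbing that the search tasks previously defined inline. A short usage sketch, assuming the Tire-backed `CommentThread`/`Comment` models above are loaded:

```ruby
# Create a fresh, timestamp-named physical index with model mappings applied.
index = TaskHelpers::ElasticsearchHelper.create_index

# The live index is whatever CommentThread's Tire alias currently points at.
live = TaskHelpers::ElasticsearchHelper.get_index
shards = TaskHelpers::ElasticsearchHelper.get_index_shard_count(live.name)

# Physical indexes that are no longer aliased can be dropped by name.
TaskHelpers::ElasticsearchHelper.delete_index(index.name)
```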
+require 'factory_girl'
+
 namespace :db do
+  FactoryGirl.find_definitions
+
   def create_test_user(id)
     User.create!(external_id: id, username: "user#{id}")
   end
 
   task :init => :environment do
-    puts "recreating indexes..."
+    puts 'recreating indexes...'
     [Comment, CommentThread, User, Notification, Subscription, Activity, Delayed::Backend::Mongoid::Job].each(&:remove_indexes).each(&:create_indexes)
-    puts "finished"
+    puts 'finished'
   end
 
   task :clean => :environment do
@@ -21,10 +25,10 @@ namespace :db do
   TOP_COMMENTS_PER_THREAD = 3
   ADDITIONAL_COMMENTS_PER_THREAD = 5
-  COURSE_ID = "MITx/6.002x/2012_Fall"
+  COURSE_ID = 'MITx/6.002x/2012_Fall'
 
   def generate_comments_for(commentable_id, num_threads=THREADS_PER_COMMENTABLE, num_top_comments=TOP_COMMENTS_PER_THREAD, num_subcomments=ADDITIONAL_COMMENTS_PER_THREAD)
-    level_limit = CommentService.config["level_limit"]
+    level_limit = CommentService.config['level_limit']
     users = User.all.to_a
@@ -38,47 +42,43 @@ namespace :db do
     num_threads.times do
       inner_top_comments = []
-      comment_thread = CommentThread.new(commentable_id: commentable_id, body: Faker::Lorem.paragraphs.join("\n\n"), title: Faker::Lorem.sentence(6))
-      comment_thread.author = users.sample
-      comment_thread.course_id = COURSE_ID
-      comment_thread.save!
+      # Create a new thread
+      comment_thread = FactoryGirl::create(:comment_thread, commentable_id: commentable_id, author: users.sample, course_id: COURSE_ID)
       threads << comment_thread
 
+      # Subscribe a few users to the thread
       users.sample(3).each { |user| user.subscribe(comment_thread) }
 
+      # Create a few top-level comments for the thread
       (1 + rand(num_top_comments)).times do
-        comment = comment_thread.comments.new(body: Faker::Lorem.paragraph(2))
-        comment.author = users.sample
-        comment.endorsed = [true, false].sample
-        comment.comment_thread = comment_thread
-        comment.course_id = COURSE_ID
-        comment.save!
+        endorsed = [true, false].sample
+        comment = FactoryGirl::create(:comment, author: users.sample, comment_thread: comment_thread, endorsed: endorsed, course_id: COURSE_ID)
         top_comments << comment
         inner_top_comments << comment
       end
 
-      previous_level_comments = inner_top_comments
+      # Create additional nested comments
+      parent_comments = inner_top_comments
       (level_limit-1).times do
         current_level_comments = []
         (1 + rand(num_subcomments)).times do
-          comment = previous_level_comments.sample
-          sub_comment = comment.children.new(body: Faker::Lorem.paragraph(2))
-          sub_comment.author = users.sample
-          sub_comment.endorsed = [true, false].sample
-          sub_comment.comment_thread = comment_thread
-          sub_comment.course_id = COURSE_ID
-          sub_comment.save!
-          current_level_comments << sub_comment
+          parent = parent_comments.sample
+          endorsed = [true, false].sample
+          child = FactoryGirl::create(:comment, author: users.sample, parent: parent, endorsed: endorsed)
+          current_level_comments << child
         end
-        previous_level_comments = current_level_comments
+        parent_comments = current_level_comments
       end
     end
 
-    puts "voting"
+    puts 'voting'
     (threads + top_comments + additional_comments).each do |c|
       users.each do |user|
        user.vote(c, [:up, :down].sample)
       end
     end
-    puts "finished"
+    puts 'finished'
   end
@@ -90,13 +90,7 @@ namespace :db do
   end
 
-  task :seed => :environment do
+  task :seed => [:environment, :clean] do
-    Comment.delete_all
-    CommentThread.delete_all
-    User.delete_all
-    Notification.delete_all
-    Subscription.delete_all
     Tire.index 'comment_threads' do
       delete
     end
@@ -105,9 +99,9 @@ namespace :db do
     beginning_time = Time.now
 
     (1..10).map { |id| create_test_user(id) }
-    generate_comments_for("video_1")
-    generate_comments_for("lab_1")
-    generate_comments_for("lab_2")
+    generate_comments_for('video_1')
+    generate_comments_for('lab_1')
+    generate_comments_for('lab_2')
     end_time = Time.now
@@ -119,7 +113,7 @@ namespace :db do
   end
 
   task :add_anonymous_to_peers => :environment do
-    Content.collection.find(:anonymous_to_peers => nil).update_all({"$set" => {'anonymous_to_peers' => false}})
+    Content.collection.find(:anonymous_to_peers => nil).update_all({'$set' => {anonymous_to_peers: false}})
   end
 end
...
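`db:seed` now declares `:clean` as a prerequisite instead of calling `delete_all` on each collection inline. Rake invokes prerequisites left to right, once each, before the task body runs; a self-contained sketch of that ordering (the task bodies here are placeholders, not the real ones):

```ruby
require 'rake'

task :environment do
  puts 'loading environment'
end

task :clean => :environment do
  puts 'wiping comments, threads, users, notifications, subscriptions'
end

task :seed => [:environment, :clean] do
  puts 'seeding test data'
end

Rake::Task[:seed].invoke
# loading environment   <- invoked once, even though listed as a prerequisite twice
# wiping comments, threads, users, notifications, subscriptions
# seeding test data
```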
-namespace :search do
-  def get_es_index
-    # we are using the same index for two types, which is against the
-    # grain of Tire's design. This is why this method works for both
-    # comment_threads and comments.
-    CommentThread.tire.index
-  end
-
-  def get_number_of_primary_shards(index_name)
-    res = Tire::Configuration.client.get "#{Tire::Configuration.url}/#{index_name}/_status"
-    status = JSON.parse res.body
-    status["indices"].first[1]["shards"].size
-  end
-
-  def create_es_index
-    # create the new index with a unique name
-    new_index = Tire.index "#{Content::ES_INDEX_NAME}_#{Time.now.strftime('%Y%m%d%H%M%S')}"
-    new_index.create
-    LOG.info "configuring new index: #{new_index.name}"
-    [CommentThread, Comment].each do |klass|
-      LOG.info "applying index mappings for #{klass.name}"
-      klass.put_search_index_mapping new_index
-    end
-    new_index
-  end
+require 'task_helpers'
+
+namespace :search do
   def import_from_cursor(cursor, index, opts)
     tot = cursor.count
     cnt = 0
@@ -46,9 +22,9 @@ namespace :search do
   def move_alias_to(name, index)
     # if there was a previous index, switch over the alias to point to the new index
     alias_ = Tire::Alias.find name
-    if alias_ then
+    if alias_
       # does the alias already point to this index?
-      if alias_.indices.include? index.name then
+      if alias_.indices.include? index.name
         return false
       end
       # remove the alias from wherever it points to now
@@ -69,11 +45,11 @@ namespace :search do
   end
 
   def do_reindex (opts, in_place=false)
-    # get a reference to the model class (and make sure it's a model class with tire hooks)
     start_time = Time.now
     # create the new index with a unique name
-    new_index = create_es_index
+    new_index = TaskHelpers::ElasticsearchHelper.create_index
     # unless the user is forcing a rebuild, or the index does not yet exist, we
     # can do a Tire api reindex which is much faster than reimporting documents
     # from mongo.
@@ -82,33 +58,34 @@ namespace :search do
     # for the model class when the app loaded if one did not already exist. However,
     # it won't create an alias, which is what our app uses. So if the index exists
     # but not the alias, we know that it's auto-created.
-    old_index = get_es_index
+    old_index = TaskHelpers::ElasticsearchHelper.get_index
     alias_name = old_index.name
     alias_ = Tire::Alias.find alias_name
-    if alias_.nil? then
+    if alias_.nil?
       # edge case.
       # the alias doesn't exist, so we know the index was auto-created.
       # We will delete it and replace it with an alias.
       raise RuntimeError, 'Cannot reindex in-place, no valid source index' if in_place
-      LOG.warn "deleting auto-created index to make room for the alias"
+      LOG.warn 'deleting auto-created index to make room for the alias'
       old_index.delete
       # NOTE on the small chance that another process re-auto-creates the index
       # we just deleted before we have a chance to create the alias, this next
       # call will fail.
       move_alias_to(Content::ES_INDEX_NAME, new_index)
     end
-    op = in_place ? "reindex" : "(re)build index"
+    op = in_place ? 'reindex' : '(re)build index'
     LOG.info "preparing to #{op}"
 
-    if in_place then
+    content_types = %w(Comment CommentThread)
+    if in_place
       # reindex, moving source documents directly from old index to new
-      LOG.info "copying documents from original index (this may take a while!)"
+      LOG.info 'copying documents from original index (this may take a while!)'
       old_index.reindex new_index.name
-      LOG.info "done copying!"
+      LOG.info 'done copying!'
     else
       # fetch all the documents ever, up til start_time
-      cursor = Content.where(:_type.in => ["Comment", "CommentThread"], :updated_at.lte => start_time)
+      cursor = Content.where(:_type.in => content_types, :updated_at.lte => start_time)
       # import them to the new index
       import_from_cursor(cursor, new_index, opts)
     end
@@ -116,44 +93,44 @@ namespace :search do
     # move the alias if necessary
     did_alias_move = move_alias_to(Content::ES_INDEX_NAME, new_index)
-    if did_alias_move then
+    if did_alias_move
       # Reimport any source documents that got updated since start_time,
       # while the alias still pointed to the old index.
       # Elasticsearch understands our document ids, so re-indexing the same
       # document won't create duplicates.
       LOG.info "importing any documents that changed between #{start_time} and now"
-      cursor = Content.where(:_type.in => ["Comment", "CommentThread"], :updated_at.gte => start_time)
+      cursor = Content.where(:_type.in => content_types, :updated_at.gte => start_time)
       import_from_cursor(cursor, new_index, opts)
     end
   end
 
-  desc "Copies contents of MongoDB into Elasticsearch if updated in the last N minutes."
+  desc 'Copies contents of MongoDB into Elasticsearch if updated in the last N minutes.'
   task :catchup, [:minutes, :batch_size, :sleep_time] => :environment do |t, args|
     opts = batch_opts args
-    the_index = get_es_index
+    the_index = TaskHelpers::ElasticsearchHelper.get_index
     alias_ = Tire::Alias.find the_index.name
     # this check makes sure we are working with the index to which
     # the desired model's alias presently points.
     raise RuntimeError, "could not find live index" if alias_.nil?
     start_time = Time.now - (args[:minutes].to_i * 60)
-    cursor = Content.where(:_type.in => ["Comment", "CommentThread"], :updated_at.gte => start_time)
+    cursor = Content.where(:_type.in => %w(Comment CommentThread), :updated_at.gte => start_time)
     import_from_cursor(cursor, the_index, opts)
   end
 
   def batch_opts(args)
     args = args.to_hash
-    { :batch_size => args[:batch_size].nil? ? 500 : args[:batch_size].to_i,
-      :sleep_time => args[:sleep_time].nil? ? 0 : args[:sleep_time].to_i }
+    {:batch_size => args[:batch_size].nil? ? 500 : args[:batch_size].to_i,
+     :sleep_time => args[:sleep_time].nil? ? 0 : args[:sleep_time].to_i}
   end
 
-  desc "Removes any data from Elasticsearch that no longer exists in MongoDB."
+  desc 'Removes any data from Elasticsearch that no longer exists in MongoDB.'
   task :prune, [:batch_size, :sleep_time] => :environment do |t, args|
     opts = batch_opts args
-    the_index = get_es_index
+    the_index = TaskHelpers::ElasticsearchHelper.get_index
     puts "pruning #{the_index.name}"
     alias_ = Tire::Alias.find the_index.name
-    raise RuntimeError, "could not find live index" if alias_.nil?
+    raise RuntimeError, 'could not find live index' if alias_.nil?
-    scan_size = opts[:batch_size] / get_number_of_primary_shards(the_index.name)
+    scan_size = opts[:batch_size] / TaskHelpers::ElasticsearchHelper.get_index_shard_count(the_index.name)
     cnt = 0
     [CommentThread, Comment].each do |klass|
       doc_type = klass.document_type
@@ -162,12 +139,12 @@ namespace :search do
       search = Tire::Search::Scan.new the_index.name, {size: scan_size, type: doc_type}
       search.each do |results|
         es_ids = results.map(&:id)
-        mongo_ids = klass.where(:id.in => es_ids).map {|d| d.id.to_s}
+        mongo_ids = klass.where(:id.in => es_ids).map { |d| d.id.to_s }
         to_delete = es_ids - mongo_ids
         if to_delete.size > 0
           cnt += to_delete.size
           puts "deleting #{to_delete.size} orphaned #{doc_type} documents from elasticsearch"
-          the_index.bulk_delete (to_delete).map {|v| {"type" => doc_type, "id" => v}}
+          the_index.bulk_delete (to_delete).map { |v| {"type" => doc_type, "id" => v} }
         end
         puts "#{the_index.name}/#{doc_type}: processed #{search.seen} of #{search.total}"
         sleep opts[:sleep_time]
@@ -176,19 +153,18 @@ namespace :search do
     puts "done pruning #{the_index.name}, deleted a total of #{cnt} orphaned documents"
   end
 
-  desc "Rebuild the content index from MongoDB data."
+  desc 'Rebuild the content index from MongoDB data.'
   task :rebuild, [:batch_size, :sleep_time] => :environment do |t, args|
     do_reindex(batch_opts(args))
   end
 
-  desc "Rebuild the content index from already-indexed data (in place)."
+  desc 'Rebuild the content index from already-indexed data (in place).'
   task :reindex, [:batch_size, :sleep_time] => :environment do |t, args|
     do_reindex(batch_opts(args), true)
  end
 
-  desc "Generate a new, empty physical index, without bringing it online."
+  desc 'Generate a new, empty physical index, without bringing it online.'
   task :create_index => :environment do
-    create_es_index
+    TaskHelpers::ElasticsearchHelper.create_index
   end
 end
...
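The bracketed task arguments (e.g. `rake search:prune[1000,2]`) map positionally onto `:batch_size` and `:sleep_time`, and `batch_opts` fills in defaults of 500 and 0. Running the helper on its own shows the conversion (sample values are arbitrary):

```ruby
def batch_opts(args)
  args = args.to_hash
  # nil means the positional argument was omitted; otherwise coerce to Integer
  {:batch_size => args[:batch_size].nil? ? 500 : args[:batch_size].to_i,
   :sleep_time => args[:sleep_time].nil? ? 0 : args[:sleep_time].to_i}
end

batch_opts({})                                          # => {:batch_size=>500, :sleep_time=>0}
batch_opts({:batch_size => '1000'})                     # => {:batch_size=>1000, :sleep_time=>0}
batch_opts({:batch_size => '1000', :sleep_time => '2'}) # => {:batch_size=>1000, :sleep_time=>2}
```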
@@ -40,5 +40,6 @@ FactoryGirl.define do
     body { Faker::Lorem.paragraph }
     course_id { comment_thread.course_id }
     commentable_id { comment_thread.commentable_id }
+    endorsed false
   end
 end
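With `endorsed false` as a factory default, seed code like `FactoryGirl::create(:comment, endorsed: endorsed, ...)` only needs to pass the attribute when it should differ. A trimmed sketch of the override behaviour (the factory below is abbreviated; only the attributes visible in the hunk above are from the commit):

```ruby
FactoryGirl.define do
  factory :comment do
    body { Faker::Lorem.paragraph }
    endorsed false  # default added by this commit
  end
end

FactoryGirl.create(:comment).endorsed                   # => false (factory default)
FactoryGirl.create(:comment, endorsed: true).endorsed   # => true  (per-call override)
```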