Commit 3f335264 by Clinton Blackburn Committed by Clinton Blackburn

Refactored rake tasks

- Cleaned up quotes
- Using factories to generate seed data
- Moved Elasticsearch functions into helper module
parent 438cbbc4
......@@ -4,7 +4,7 @@ require 'bundler'
Bundler.setup
Bundler.require
application_yaml = ERB.new(File.read("config/application.yml")).result()
application_yaml = ERB.new(File.read('config/application.yml')).result()
begin
......@@ -23,25 +23,22 @@ end
LOG = Logger.new(STDERR)
desc "Load the environment"
desc 'Load the environment'
task :environment do
environment = ENV["SINATRA_ENV"] || "development"
environment = ENV['SINATRA_ENV'] || 'development'
Sinatra::Base.environment = environment
Mongoid.load!("config/mongoid.yml")
Mongoid.load!('config/mongoid.yml')
Mongoid.logger.level = Logger::INFO
module CommentService
class << self; attr_accessor :config; end
class << self;
attr_accessor :config;
end
end
CommentService.config = YAML.load(application_yaml)
Dir[File.dirname(__FILE__) + '/lib/**/*.rb'].each {|file| require file}
Dir[File.dirname(__FILE__) + '/models/*.rb'].each {|file| require file}
#Dir[File.dirname(__FILE__) + '/models/observers/*.rb'].each {|file| require file}
#Mongoid.observers = PostReplyObserver, PostTopicObserver, AtUserObserver
#Mongoid.instantiate_observers
Dir[File.dirname(__FILE__) + '/lib/**/*.rb'].each { |file| require file }
Dir[File.dirname(__FILE__) + '/models/*.rb'].each { |file| require file }
end
Dir.glob('lib/tasks/*.rake').each { |r| import r }
......
module TaskHelpers
  # Helper methods shared by the search-related rake tasks for managing
  # Elasticsearch indices via the Tire client.
  module ElasticsearchHelper
    # Creates a new Elasticsearch index and applies the search mappings for
    # each indexed content model. If no name is given, a unique timestamped
    # name derived from Content::ES_INDEX_NAME is generated so repeated
    # rebuilds never collide.
    #
    # name - Optional String name for the new index.
    #
    # Returns the created Tire::Index.
    def self.create_index(name=nil)
      name ||= "#{Content::ES_INDEX_NAME}_#{Time.now.strftime('%Y%m%d%H%M%S')}"
      index = Tire.index(name)
      LOG.info "Creating new index: #{name}..."
      index.create
      [CommentThread, Comment].each do |model|
        LOG.info "Applying index mappings for #{model.name}"
        model.put_search_index_mapping(index)
      end
      LOG.info '...done!'
      index
    end

    # Deletes the Elasticsearch index with the given name.
    def self.delete_index(name)
      Tire.index(name).delete
    end

    # Returns the Tire::Index the content models are currently bound to.
    def self.get_index
      CommentThread.tire.index
    end

    # Returns the number of primary shards configured for the named index.
    #
    # NOTE(review): the original indexed the Tire::Index object itself
    # (settings['index.number_of_shards']), but Tire::Index does not respond
    # to #[]; the settings hash must be fetched via Tire::Index#settings.
    def self.get_index_shard_count(name)
      Tire.index(name).settings['index.number_of_shards']
    end
  end
end
require 'factory_girl'
namespace :db do
FactoryGirl.find_definitions
# Creates and persists a seed-data User whose username is derived from its id.
def create_test_user(id)
  username = "user#{id}"
  User.create!(external_id: id, username: username)
end
task :init => :environment do
puts "recreating indexes..."
puts 'recreating indexes...'
[Comment, CommentThread, User, Notification, Subscription, Activity, Delayed::Backend::Mongoid::Job].each(&:remove_indexes).each(&:create_indexes)
puts "finished"
puts 'finished'
end
task :clean => :environment do
......@@ -21,10 +25,10 @@ namespace :db do
TOP_COMMENTS_PER_THREAD = 3
ADDITIONAL_COMMENTS_PER_THREAD = 5
COURSE_ID = "MITx/6.002x/2012_Fall"
COURSE_ID = 'MITx/6.002x/2012_Fall'
def generate_comments_for(commentable_id, num_threads=THREADS_PER_COMMENTABLE, num_top_comments=TOP_COMMENTS_PER_THREAD, num_subcomments=ADDITIONAL_COMMENTS_PER_THREAD)
level_limit = CommentService.config["level_limit"]
level_limit = CommentService.config['level_limit']
users = User.all.to_a
......@@ -38,47 +42,43 @@ namespace :db do
num_threads.times do
inner_top_comments = []
comment_thread = CommentThread.new(commentable_id: commentable_id, body: Faker::Lorem.paragraphs.join("\n\n"), title: Faker::Lorem.sentence(6))
comment_thread.author = users.sample
comment_thread.course_id = COURSE_ID
comment_thread.save!
# Create a new thread
comment_thread = FactoryGirl::create(:comment_thread, commentable_id: commentable_id, author: users.sample, course_id: COURSE_ID)
threads << comment_thread
# Subscribe a few users to the thread
users.sample(3).each { |user| user.subscribe(comment_thread) }
# Create a few top-level comments for the thread
(1 + rand(num_top_comments)).times do
comment = comment_thread.comments.new(body: Faker::Lorem.paragraph(2))
comment.author = users.sample
comment.endorsed = [true, false].sample
comment.comment_thread = comment_thread
comment.course_id = COURSE_ID
comment.save!
endorsed = [true, false].sample
comment = FactoryGirl::create(:comment, author: users.sample, comment_thread: comment_thread, endorsed: endorsed, course_id: COURSE_ID)
top_comments << comment
inner_top_comments << comment
end
previous_level_comments = inner_top_comments
# Created additional nested comments
parent_comments = inner_top_comments
(level_limit-1).times do
current_level_comments = []
(1 + rand(num_subcomments)).times do
comment = previous_level_comments.sample
sub_comment = comment.children.new(body: Faker::Lorem.paragraph(2))
sub_comment.author = users.sample
sub_comment.endorsed = [true, false].sample
sub_comment.comment_thread = comment_thread
sub_comment.course_id = COURSE_ID
sub_comment.save!
current_level_comments << sub_comment
parent = parent_comments.sample
endorsed = [true, false].sample
child = FactoryGirl::create(:comment, author: users.sample, parent: parent, endorsed: endorsed)
current_level_comments << child
end
previous_level_comments = current_level_comments
parent_comments = current_level_comments
end
end
puts "voting"
puts 'voting'
(threads + top_comments + additional_comments).each do |c|
users.each do |user|
user.vote(c, [:up, :down].sample)
end
end
puts "finished"
puts 'finished'
end
......@@ -90,13 +90,7 @@ namespace :db do
end
task :seed => :environment do
Comment.delete_all
CommentThread.delete_all
User.delete_all
Notification.delete_all
Subscription.delete_all
task :seed => [:environment, :clean] do
Tire.index 'comment_threads' do
delete
end
......@@ -105,9 +99,9 @@ namespace :db do
beginning_time = Time.now
(1..10).map { |id| create_test_user(id) }
generate_comments_for("video_1")
generate_comments_for("lab_1")
generate_comments_for("lab_2")
generate_comments_for('video_1')
generate_comments_for('lab_1')
generate_comments_for('lab_2')
end_time = Time.now
......@@ -119,7 +113,7 @@ namespace :db do
end
task :add_anonymous_to_peers => :environment do
Content.collection.find(:anonymous_to_peers => nil).update_all({"$set" => {'anonymous_to_peers' => false}})
Content.collection.find(:anonymous_to_peers => nil).update_all({'$set' => {anonymous_to_peers: false}})
end
end
namespace :search do
def get_es_index
  # Returns the Tire::Index that the content models are currently bound to.
  # we are using the same index for two types, which is against the
  # grain of Tire's design. This is why this method works for both
  # comment_threads and comments.
  CommentThread.tire.index
end
# Returns the number of primary shards for the named index, as reported by
# the Elasticsearch _status endpoint.
def get_number_of_primary_shards(index_name)
  response = Tire::Configuration.client.get "#{Tire::Configuration.url}/#{index_name}/_status"
  parsed = JSON.parse(response.body)
  # The response covers a single index; count the shards it reports.
  parsed["indices"].first[1]["shards"].size
end
# Builds a brand-new physical Elasticsearch index with a unique, timestamped
# name and installs the search mappings for every indexed model on it.
#
# Returns the created Tire::Index.
def create_es_index
  index_name = "#{Content::ES_INDEX_NAME}_#{Time.now.strftime('%Y%m%d%H%M%S')}"
  new_index = Tire.index index_name
  new_index.create
  LOG.info "configuring new index: #{new_index.name}"
  [CommentThread, Comment].each do |klass|
    LOG.info "applying index mappings for #{klass.name}"
    klass.put_search_index_mapping new_index
  end
  new_index
end
require 'task_helpers'
namespace :search do
def import_from_cursor(cursor, index, opts)
tot = cursor.count
cnt = 0
......@@ -46,9 +22,9 @@ namespace :search do
def move_alias_to(name, index)
# if there was a previous index, switch over the alias to point to the new index
alias_ = Tire::Alias.find name
if alias_ then
if alias_
# does the alias already point to this index?
if alias_.indices.include? index.name then
if alias_.indices.include? index.name
return false
end
# remove the alias from wherever it points to now
......@@ -69,11 +45,11 @@ namespace :search do
end
def do_reindex (opts, in_place=false)
# get a reference to the model class (and make sure it's a model class with tire hooks)
start_time = Time.now
# create the new index with a unique name
new_index = create_es_index
new_index = TaskHelpers::ElasticsearchHelper.create_index
# unless the user is forcing a rebuild, or the index does not yet exist, we
# can do a Tire api reindex which is much faster than reimporting documents
# from mongo.
......@@ -82,33 +58,34 @@ namespace :search do
# for the model class when the app loaded if one did not already exist. However,
# it won't create an alias, which is what our app uses. So if the index exists
# but not the alias, we know that it's auto-created.
old_index = get_es_index
old_index = TaskHelpers::ElasticsearchHelper.get_index
alias_name = old_index.name
alias_ = Tire::Alias.find alias_name
if alias_.nil? then
if alias_.nil?
# edge case.
# the alias doesn't exist, so we know the index was auto-created.
# We will delete it and replace it with an alias.
raise RuntimeError, 'Cannot reindex in-place, no valid source index' if in_place
LOG.warn "deleting auto-created index to make room for the alias"
LOG.warn 'deleting auto-created index to make room for the alias'
old_index.delete
# NOTE on the small chance that another process re-auto-creates the index
# we just deleted before we have a chance to create the alias, this next
# call will fail.
move_alias_to(Content::ES_INDEX_NAME, new_index)
move_alias_to(Content::ES_INDEX_NAME, new_index_name)
end
op = in_place ? "reindex" : "(re)build index"
op = in_place ? 'reindex' : '(re)build index'
LOG.info "preparing to #{op}"
if in_place then
content_types = %w(Comment CommentThread)
if in_place
# reindex, moving source documents directly from old index to new
LOG.info "copying documents from original index (this may take a while!)"
LOG.info 'copying documents from original index (this may take a while!)'
old_index.reindex new_index.name
LOG.info "done copying!"
LOG.info 'done copying!'
else
# fetch all the documents ever, up til start_time
cursor = Content.where(:_type.in => ["Comment", "CommentThread"], :updated_at.lte => start_time)
cursor = Content.where(:_type.in => content_types, :updated_at.lte => start_time)
# import them to the new index
import_from_cursor(cursor, new_index, opts)
end
......@@ -116,44 +93,44 @@ namespace :search do
# move the alias if necessary
did_alias_move = move_alias_to(Content::ES_INDEX_NAME, new_index)
if did_alias_move then
if did_alias_move
# Reimport any source documents that got updated since start_time,
# while the alias still pointed to the old index.
# Elasticsearch understands our document ids, so re-indexing the same
# document won't create duplicates.
LOG.info "importing any documents that changed between #{start_time} and now"
cursor = Content.where(:_type.in => ["Comment", "CommentThread"], :updated_at.gte => start_time)
cursor = Content.where(:_type.in => content_types, :updated_at.gte => start_time)
import_from_cursor(cursor, new_index, opts)
end
end
desc "Copies contents of MongoDB into Elasticsearch if updated in the last N minutes."
desc 'Copies contents of MongoDB into Elasticsearch if updated in the last N minutes.'
task :catchup, [:minutes, :batch_size, :sleep_time] => :environment do |t, args|
opts = batch_opts args
the_index = get_es_index
the_index = TaskHelpers::ElasticsearchHelper.get_index
alias_ = Tire::Alias.find the_index.name
# this check makes sure we are working with the index to which
# the desired model's alias presently points.
raise RuntimeError, "could not find live index" if alias_.nil?
start_time = Time.now - (args[:minutes].to_i * 60)
cursor = Content.where(:_type.in => ["Comment", "CommentThread"], :updated_at.gte => start_time)
cursor = Content.where(:_type.in => %w(Comment CommentThread), :updated_at.gte => start_time)
import_from_cursor(cursor, the_index, opts)
end
def batch_opts(args)
args = args.to_hash
{ :batch_size => args[:batch_size].nil? ? 500 : args[:batch_size].to_i,
:sleep_time => args[:sleep_time].nil? ? 0 : args[:sleep_time].to_i }
{:batch_size => args[:batch_size].nil? ? 500 : args[:batch_size].to_i,
:sleep_time => args[:sleep_time].nil? ? 0 : args[:sleep_time].to_i}
end
desc "Removes any data from Elasticsearch that no longer exists in MongoDB."
desc 'Removes any data from Elasticsearch that no longer exists in MongoDB.'
task :prune, [:batch_size, :sleep_time] => :environment do |t, args|
opts = batch_opts args
the_index = get_es_index
the_index = TaskHelpers::ElasticsearchHelper.get_index
puts "pruning #{the_index.name}"
alias_ = Tire::Alias.find the_index.name
raise RuntimeError, "could not find live index" if alias_.nil?
scan_size = opts[:batch_size] / get_number_of_primary_shards(the_index.name)
raise RuntimeError, 'could not find live index' if alias_.nil?
scan_size = opts[:batch_size] / TaskHelpers::ElasticsearchHelper.get_index_shard_count(the_index.name)
cnt = 0
[CommentThread, Comment].each do |klass|
doc_type = klass.document_type
......@@ -162,12 +139,12 @@ namespace :search do
search = Tire::Search::Scan.new the_index.name, {size: scan_size, type: doc_type}
search.each do |results|
es_ids = results.map(&:id)
mongo_ids = klass.where(:id.in => es_ids).map {|d| d.id.to_s}
mongo_ids = klass.where(:id.in => es_ids).map { |d| d.id.to_s }
to_delete = es_ids - mongo_ids
if to_delete.size > 0
cnt += to_delete.size
puts "deleting #{to_delete.size} orphaned #{doc_type} documents from elasticsearch"
the_index.bulk_delete (to_delete).map {|v| {"type" => doc_type, "id" => v}}
the_index.bulk_delete (to_delete).map { |v| {"type" => doc_type, "id" => v} }
end
puts "#{the_index.name}/#{doc_type}: processed #{search.seen} of #{search.total}"
sleep opts[:sleep_time]
......@@ -176,19 +153,18 @@ namespace :search do
puts "done pruning #{the_index.name}, deleted a total of #{cnt} orphaned documents"
end
desc "Rebuild the content index from MongoDB data."
desc 'Rebuild the content index from MongoDB data.'
task :rebuild, [:batch_size, :sleep_time] => :environment do |t, args|
do_reindex(batch_opts(args))
end
desc "Rebuild the content index from already-indexed data (in place)."
desc 'Rebuild the content index from already-indexed data (in place).'
task :reindex, [:batch_size, :sleep_time] => :environment do |t, args|
do_reindex(batch_opts(args), true)
end
desc "Generate a new, empty physical index, without bringing it online."
desc 'Generate a new, empty physical index, without bringing it online.'
task :create_index => :environment do
create_es_index
TaskHelpers::ElasticsearchHelper.create_index
end
end
......@@ -40,5 +40,6 @@ FactoryGirl.define do
body { Faker::Lorem.paragraph }
course_id { comment_thread.course_id }
commentable_id { comment_thread.commentable_id }
endorsed false
end
end
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment.