Commit 4d01f169 by Jim Abramson

Merge pull request #107 from edx/jsa/single-index

use a single index in elasticsearch for both document types.
parents db41f9ff ead2f5e3
......@@ -151,7 +151,6 @@ namespace :db do
"votes" => {"count" => 0, "down" => [], "down_count" => 0, "point" => 0, "up" => [], "up_count" => []}}
coll.insert(doc)
end
binding.pry
Tire.index('comment_threads').delete
CommentThread.create_elasticsearch_index
Tire.index('comment_threads') { import CommentThread.all }
......@@ -223,22 +222,33 @@ end
namespace :search do
def get_es_index
# we are using the same index for two types, which is against the
# grain of Tire's design. This is why this method works for both
# comment_threads and comments.
CommentThread.tire.index
end
def get_number_of_primary_shards(index_name)
res = Tire::Configuration.client.get "#{Tire::Configuration.url}/#{index_name}/_status"
status = JSON.parse res.body
status["indices"].first[1]["shards"].size
end
def create_index_for_class(klass)
def create_es_index
# create the new index with a unique name
new_index = Tire.index klass.tire.index.name << '_' << Time.now.strftime('%Y%m%d%H%M%S')
new_index = Tire.index "#{Content::ES_INDEX_NAME}_#{Time.now.strftime('%Y%m%d%H%M%S')}"
new_index.create
LOG.info "configuring new index: #{new_index.name}"
# apply the proper mapping and settings to the new index
new_index.create :mappings => klass.tire.mapping_to_hash, :settings => klass.tire.settings
[CommentThread, Comment].each do |klass|
LOG.info "applying index mappings for #{klass.name}"
klass.put_search_index_mapping new_index
end
new_index
end
def import_from_cursor(cursor, index, opts)
Mongoid.identity_map_enabled = true
tot = cursor.count
cnt = 0
t = Time.now
......@@ -249,6 +259,7 @@ namespace :search do
LOG.info "#{index.name}: imported #{cnt} of #{tot} (#{pct_complete}% complete after #{elapsed_secs} seconds)"
end
cnt += documents.length
Mongoid::IdentityMap.clear
sleep opts[:sleep_time]
documents
end
......@@ -281,15 +292,12 @@ namespace :search do
true
end
def do_reindex (classname, opts, in_place=false)
def do_reindex (opts, in_place=false)
# get a reference to the model class (and make sure it's a model class with tire hooks)
klass = CommentService.const_get(classname)
raise RuntimeError unless klass.instance_of? Class
raise RuntimeError unless klass.respond_to? "tire"
start_time = Time.now
# create the new index with a unique name
new_index = create_index_for_class(klass)
new_index = create_es_index
# unless the user is forcing a rebuild, or the index does not yet exist, we
# can do a Tire api reindex which is much faster than reimporting documents
# from mongo.
......@@ -298,7 +306,7 @@ namespace :search do
# for the model class when the app loaded if one did not already exist. However,
# it won't create an alias, which is what our app uses. So if the index exists
# but not the alias, we know that it's auto-created.
old_index = klass.tire.index
old_index = get_es_index
alias_name = old_index.name
alias_ = Tire::Alias.find alias_name
if alias_.nil? then
......@@ -306,19 +314,16 @@ namespace :search do
# the alias doesn't exist, so we know the index was auto-created.
# We will delete it and replace it with an alias.
raise RuntimeError, 'Cannot reindex in-place, no valid source index' if in_place
LOG.warn "deleting auto-created index to make room for the alias"
old_index.delete
# NOTE on the small chance that another process re-auto-creates the index
# we just deleted before we have a chance to create the alias, this next
# call will fail.
move_alias_to(alias_name, new_index)
move_alias_to(Content::ES_INDEX_NAME, new_index)
end
op = in_place ? "reindex" : "(re)build index for"
LOG.info "preparing to #{op} CommentService::#{classname}"
# ensure there's no identity mapping or caching going on while we do this
Mongoid.identity_map_enabled = false
Mongoid.unit_of_work(disable: :all) do
op = in_place ? "reindex" : "(re)build index"
LOG.info "preparing to #{op}"
if in_place then
# reindex, moving source documents directly from old index to new
......@@ -327,13 +332,13 @@ namespace :search do
LOG.info "done copying!"
else
# fetch all the documents ever, up til start_time
cursor = klass.where(:updated_at.lte => start_time)
cursor = Content.where(:_type.in => ["Comment", "CommentThread"], :updated_at.lte => start_time)
# import them to the new index
import_from_cursor(cursor, new_index, opts)
end
# move the alias if necessary
did_alias_move = move_alias_to(klass.tire.index.name, new_index)
did_alias_move = move_alias_to(Content::ES_INDEX_NAME, new_index)
if did_alias_move then
# Reimport any source documents that got updated since start_time,
......@@ -341,27 +346,23 @@ namespace :search do
# Elasticsearch understands our document ids, so re-indexing the same
# document won't create duplicates.
LOG.info "importing any documents that changed between #{start_time} and now"
cursor = klass.where(:created_at.gte => start_time).union.where(:updated_at.gte => start_time)
cursor = Content.where(:_type.in => ["Comment", "CommentThread"], :updated_at.gte => start_time)
import_from_cursor(cursor, new_index, opts)
end
end
end
desc "Copies contents of MongoDB into Elasticsearch if updated in the last N minutes."
task :catchup, [:minutes, :batch_size, :sleep_time] => :environment do |t, args|
opts = batch_opts args
[CommentThread, Comment].each do |klass|
the_index = klass.tire.index
the_index = get_es_index
alias_ = Tire::Alias.find the_index.name
# this check makes sure we are working with the index to which
# the desired model's alias presently points.
raise RuntimeError, "could not find live index for #{klass.name}" if alias_.nil?
raise RuntimeError, "could not find live index" if alias_.nil?
start_time = Time.now - (args[:minutes].to_i * 60)
cursor = klass.where(:updated_at.gte => start_time)
cursor = Content.where(:_type.in => ["Comment", "CommentThread"], :updated_at.gte => start_time)
import_from_cursor(cursor, the_index, opts)
end
end
def batch_opts(args)
args = args.to_hash
......@@ -372,59 +373,46 @@ namespace :search do
desc "Removes any data from Elasticsearch that no longer exists in MongoDB."
task :prune, [:batch_size, :sleep_time] => :environment do |t, args|
opts = batch_opts args
[CommentThread, Comment].each do |klass|
cnt = 0
the_index = klass.tire.index
the_index = get_es_index
puts "pruning #{the_index.name}"
alias_ = Tire::Alias.find the_index.name
raise RuntimeError, "could not find live index" if alias_.nil?
scan_size = opts[:batch_size] / get_number_of_primary_shards(the_index.name)
cnt = 0
[CommentThread, Comment].each do |klass|
doc_type = klass.document_type
# this check makes sure we are working with the index to which
# the desired model's alias presently points.
raise RuntimeError, "could not find live index for #{klass.name}" if alias_.nil?
scan_size = opts[:batch_size] / get_number_of_primary_shards(the_index.name)
search = Tire::Search::Scan.new the_index.name, size: scan_size
search = Tire::Search::Scan.new the_index.name, {size: scan_size, type: doc_type}
search.each do |results|
es_ids = results.map(&:id)
mongo_ids = klass.where(:id.in => es_ids).map {|d| d.id.to_s}
to_delete = es_ids - mongo_ids
if to_delete.size > 0
cnt += to_delete.size
puts "deleting #{to_delete.size} orphaned documents from elasticsearch"
the_index.bulk_delete (to_delete).map {|v| {"type" => klass.document_type, "id" => v}}
puts "deleting #{to_delete.size} orphaned #{doc_type} documents from elasticsearch"
the_index.bulk_delete (to_delete).map {|v| {"type" => doc_type, "id" => v}}
end
puts "#{the_index.name}: processed #{search.seen} of #{search.total}"
puts "#{the_index.name}/#{doc_type}: processed #{search.seen} of #{search.total}"
sleep opts[:sleep_time]
end
puts "done pruning #{the_index.name}, deleted a total of #{cnt} orphaned documents"
end
puts "done pruning #{the_index.name}, deleted a total of #{cnt} orphaned documents"
end
desc "Generate a new physical index, copy data from MongoDB, and bring online."
task :rebuild, [:classname, :batch_size, :sleep_time] => :environment do |t, args|
opts = batch_opts args
do_reindex(args[:classname], opts)
end
desc "Perform a rebuild on both CommentThread and Comment, using the same options."
task :rebuild_all, [:batch_size, :sleep_time] => :environment do |t, args|
opts = batch_opts args
[CommentThread, Comment].each { |klass| do_reindex(klass.name, opts) }
end
desc "Generate a new physical index, copy data from the existing index, and bring online."
task :reindex, [:classname, :batch_size, :sleep_time] => :environment do |t, args|
opts = batch_opts args
do_reindex(args[:classname], opts, true)
desc "Rebuild the content index from MongoDB data."
task :rebuild, [:batch_size, :sleep_time] => :environment do |t, args|
do_reindex(batch_opts(args))
end
desc "Perform a reindex on both CommentThread and Comment, using the same options."
task :reindex_all , [:batch_size, :sleep_time] => :environment do |t, args|
opts = batch_opts args
[CommentThread, Comment].each { |klass| do_reindex(klass.name, opts, true) }
desc "Rebuild the content index from already-indexed data (in place)."
task :reindex, [:batch_size, :sleep_time] => :environment do |t, args|
do_reindex(batch_opts(args), true)
end
desc "Generate new, empty physical indexes, without bringing them online."
task :create_indexes => :environment do
[CommentThread, Comment].each { |klass| create_index_for_class(klass) }
desc "Generate a new, empty physical index, without bringing it online."
task :create_index => :environment do
create_es_index
end
end
......
......@@ -53,6 +53,7 @@ CommentService.config = YAML.load(application_yaml).with_indifferent_access
Tire.configure do
url CommentService.config[:elasticsearch_server]
logger STDERR if ENV["ENABLE_ELASTICSEARCH_DEBUGGING"]
end
Mongoid.load!("config/mongoid.yml", environment)
......@@ -75,6 +76,10 @@ Dir[File.dirname(__FILE__) + '/lib/**/*.rb'].each {|file| require file}
Dir[File.dirname(__FILE__) + '/models/*.rb'].each {|file| require file}
Dir[File.dirname(__FILE__) + '/presenters/*.rb'].each {|file| require file}
# Ensure elasticsearch index mappings exist.
Comment.put_search_index_mapping
CommentThread.put_search_index_mapping
# Comment out observers until notifications are actually set up properly.
#Dir[File.dirname(__FILE__) + '/models/observers/*.rb'].each {|file| require file}
#Mongoid.observers = PostReplyObserver, PostTopicObserver, AtUserObserver
......
......@@ -30,9 +30,14 @@ class Comment < Content
include Tire::Model::Search
include Tire::Model::Callbacks
index_name Content::ES_INDEX_NAME
mapping do
indexes :body, type: :string, analyzer: :english, stored: true, term_vector: :with_positions_offsets
indexes :course_id, type: :string, index: :not_analyzed, included_in_all: false
indexes :comment_thread_id, type: :string, index: :not_analyzed, included_in_all: false, as: 'comment_thread_id'
indexes :commentable_id, type: :string, index: :not_analyzed, included_in_all: false, as: 'commentable_id'
indexes :group_id, type: :string, index: :not_analyzed, included_in_all: false, as: 'group_id'
indexes :created_at, type: :date, included_in_all: false
indexes :updated_at, type: :date, included_in_all: false
end
......@@ -111,6 +116,19 @@ class Comment < Content
t.commentable_id
end
end
rescue Mongoid::Errors::DocumentNotFound
nil
end
def group_id
if self.comment_thread_id
t = CommentThread.find self.comment_thread_id
if t
t.group_id
end
end
rescue Mongoid::Errors::DocumentNotFound
nil
end
def self.by_date_range_and_thread_ids from_when, to_when, thread_ids
......
......@@ -26,6 +26,8 @@ class CommentThread < Content
include Tire::Model::Search
include Tire::Model::Callbacks
index_name Content::ES_INDEX_NAME
mapping do
indexes :title, type: :string, analyzer: :english, boost: 5.0, stored: true, term_vector: :with_positions_offsets
indexes :body, type: :string, analyzer: :english, stored: true, term_vector: :with_positions_offsets
......@@ -97,10 +99,11 @@ class CommentThread < Content
#so first, find the comment threads associated with comments that hit the query
search = Tire::Search::Search.new 'comment_threads'
search = Tire::Search::Search.new Content::ES_INDEX_NAME
search.query {|query| query.match [:title, :body], params["text"]} if params["text"]
search.highlight({title: { number_of_fragments: 0 } } , {body: { number_of_fragments: 0 } }, options: { tag: "<highlight>" })
search.filter(:type, value: 'comment_thread')
search.filter(:term, commentable_id: params["commentable_id"]) if params["commentable_id"]
search.filter(:terms, commentable_id: params["commentable_ids"]) if params["commentable_ids"]
search.filter(:term, course_id: params["course_id"]) if params["course_id"]
......@@ -117,8 +120,9 @@ class CommentThread < Content
#again, b/c there is no relationship in ordinality, we cannot paginate if it's a text query
results = search.results
search = Tire::Search::Search.new 'comments'
search = Tire::Search::Search.new Content::ES_INDEX_NAME
search.query {|query| query.match :body, params["text"]} if params["text"]
search.filter(:type, value: 'comment')
search.filter(:term, course_id: params["course_id"]) if params["course_id"]
search.size CommentService.config["max_deep_search_comment_count"].to_i
......@@ -151,7 +155,8 @@ class CommentThread < Content
end
#now run one more search to harvest the threads and filter by group
search = Tire::Search::Search.new 'comment_threads'
search = Tire::Search::Search.new Content::ES_INDEX_NAME
search.filter(:type, value: 'comment_thread')
search.filter(:terms, :thread_id => thread_ids)
search.filter(:terms, commentable_id: params["commentable_ids"]) if params["commentable_ids"]
search.filter(:term, course_id: params["course_id"]) if params["course_id"]
......
......@@ -16,6 +16,16 @@ class Content
index({comment_thread_id: 1, endorsed: 1}, {sparse: true})
index({commentable_id: 1}, {sparse: true, background: true})
ES_INDEX_NAME = 'content'
def self.put_search_index_mapping(idx=nil)
idx ||= self.tire.index
success = idx.mapping(self.tire.document_type, {:properties => self.tire.mapping})
unless success
logger.warn "WARNING! could not apply search index mapping for #{self.name}"
end
end
before_save :set_username
def set_username
# avoid having to look this attribute up later, since it does not change
......
......@@ -9,8 +9,8 @@ describe "app" do
let(:author) { create_test_user(42) }
describe "GET /api/v1/search/threads" do
it "returns the correct values for total_results and num_pages", :focus => true do
course_id = "test_course_id"
it "returns the correct values for total_results and num_pages" do
course_id = "test/course/id"
for i in 1..100 do
text = "all"
text += " half" if i % 2 == 0
......@@ -24,8 +24,7 @@ describe "app" do
end
# Elasticsearch does not necessarily make newly indexed content
# available immediately, so we must explicitly refresh the index
CommentThread.tire.index.refresh
Comment.tire.index.refresh
refresh_es_index
test_text = lambda do |text, expected_total_results, expected_num_pages|
get "/api/v1/search/threads", course_id: course_id, text: text, per_page: "10"
......@@ -46,12 +45,12 @@ describe "app" do
# Elasticsearch may not be able to handle searching for non-ASCII text,
# so prepend the text with an ASCII term we can search for.
search_term = "artichoke"
course_id = "unicode_course"
course_id = "unicode/course"
thread = make_thread(author, "#{search_term} #{text}", course_id, "unicode_commentable")
make_comment(author, thread, text)
# Elasticsearch does not necessarily make newly indexed content
# available immediately, so we must explicitly refresh the index
CommentThread.tire.index.refresh
refresh_es_index
get "/api/v1/search/threads", course_id: course_id, text: search_term
last_response.should be_ok
result = parse(last_response.body)["collection"]
......
......@@ -28,6 +28,25 @@ def set_api_key_header
current_session.header "X-Edx-Api-Key", TEST_API_KEY
end
def delete_es_index
Tire.index Content::ES_INDEX_NAME do delete end
end
def create_es_index
new_index = Tire.index Content::ES_INDEX_NAME
new_index.create
[CommentThread, Comment].each do |klass|
klass.put_search_index_mapping
end
end
def refresh_es_index
# we are using the same index for two types, which is against the
# grain of Tire's design. This is why this method works for both
# comment_threads and comments.
CommentThread.tire.index.refresh
end
RSpec.configure do |config|
config.include Rack::Test::Methods
config.treat_symbols_as_metadata_keys_with_true_values = true
......@@ -36,10 +55,8 @@ RSpec.configure do |config|
config.before(:each) do
Mongoid::IdentityMap.clear
DatabaseCleaner.clean
[CommentThread, Comment].each do |class_|
class_.tire.index.delete
class_.create_elasticsearch_index
end
delete_es_index
create_es_index
end
end
......@@ -59,8 +76,8 @@ def init_without_subscriptions
[Comment, CommentThread, User, Notification, Subscription, Activity, Delayed::Backend::Mongoid::Job].each(&:delete_all).each(&:remove_indexes).each(&:create_indexes)
Content.mongo_session[:blocked_hash].drop
Tire.index 'comment_threads' do delete end
CommentThread.create_elasticsearch_index
delete_es_index
create_es_index
commentable = Commentable.new("question_1")
......@@ -140,8 +157,8 @@ end
def init_with_subscriptions
[Comment, CommentThread, User, Notification, Subscription, Activity, Delayed::Backend::Mongoid::Job].each(&:delete_all).each(&:remove_indexes).each(&:create_indexes)
Tire.index 'comment_threads' do delete end
CommentThread.create_elasticsearch_index
delete_es_index
create_es_index
user1 = create_test_user(1)
user2 = create_test_user(2)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment