Commit 56a20637 by Jim Abramson

Merge pull request #49 from edx/feature/jsa/ds-reindex

restore deep search with reindex fix
parents 7ff63463 65d0a329
......@@ -32,7 +32,7 @@ gem 'mongoid_magic_counter_cache', :git => 'https://github.com/dementrock/mongoi
gem 'kaminari', :require => 'kaminari/sinatra', :git => 'https://github.com/dementrock/kaminari.git'
gem 'faker'
gem 'will_paginate_mongoid'
gem 'rdiscount'
gem 'nokogiri'
......
......@@ -151,6 +151,10 @@ GEM
kgio (~> 2.6)
rack
raindrops (~> 0.7)
will_paginate (3.0.4)
will_paginate_mongoid (1.1.0)
mongoid (>= 2.4)
will_paginate (~> 3.0)
yajl-ruby (1.1.0)
PLATFORMS
......@@ -187,4 +191,5 @@ DEPENDENCIES
tire-contrib
unicorn
voteable_mongo!
will_paginate_mongoid
yajl-ruby
......@@ -10,6 +10,8 @@ Tire.configure do
url YAML.load(application_yaml)['elasticsearch_server']
end
LOG = Logger.new(STDERR)
desc "Load the environment"
task :environment do
environment = ENV["SINATRA_ENV"] || "development"
......@@ -152,11 +154,11 @@ namespace :db do
coll = db.collection("contents")
args[:num].to_i.times do
doc = {"_type" => "CommentThread", "anonymous" => [true, false].sample, "at_position_list" => [],
"tags_array" => [],
"comment_count" => 0, "title" => Faker::Lorem.sentence(6), "author_id" => rand(1..10).to_s,
"body" => Faker::Lorem.paragraphs.join("\n\n"), "course_id" => COURSE_ID, "created_at" => Time.now,
"commentable_id" => COURSE_ID, "closed" => [true, false].sample, "updated_at" => Time.now, "last_activity_at" => Time.now,
"votes" => {"count" => 0, "down" => [], "down_count" => 0, "point" => 0, "up" => [], "up_count" => []}}
"tags_array" => [],
"comment_count" => 0, "title" => Faker::Lorem.sentence(6), "author_id" => rand(1..10).to_s,
"body" => Faker::Lorem.paragraphs.join("\n\n"), "course_id" => COURSE_ID, "created_at" => Time.now,
"commentable_id" => COURSE_ID, "closed" => [true, false].sample, "updated_at" => Time.now, "last_activity_at" => Time.now,
"votes" => {"count" => 0, "down" => [], "down_count" => 0, "point" => 0, "up" => [], "up_count" => []}}
coll.insert(doc)
end
binding.pry
......@@ -223,46 +225,144 @@ namespace :db do
end
task :reindex_search => :environment do
Mongoid.identity_map_enabled = false
# Creates a new, uniquely named search index for a Tire-backed model class.
#
# The new index is named after the model's current index name with a
# "_YYYYmmddHHMMSS" timestamp suffix, and is created with the mapping and
# settings that the model's `tire` proxy reports.
#
# @param klass [Class] model class that responds to `tire`
# @return [Object] the index object returned by `Tire.index` for the new name
def create_index_for_class(klass)
  # Build the name by interpolation instead of `<<`: `<<` appends in place to
  # the string returned by klass.tire.index.name, which would silently change
  # the base index name for anything else holding that same string object.
  new_index_name = "#{klass.tire.index.name}_#{Time.now.strftime('%Y%m%d%H%M%S')}"
  new_index = Tire.index(new_index_name)
  LOG.info "configuring new index: #{new_index.name}"
  # apply the proper mapping and settings to the new index
  # NOTE(review): the result of `create` is not checked here; confirm whether a
  # failed creation should abort the reindex.
  new_index.create :mappings => klass.tire.mapping_to_hash, :settings => klass.tire.settings
  new_index
end
klass = CommentThread
ENV['CLASS'] = klass.name
ENV['INDEX'] = new_index = klass.tire.index.name << '_' << Time.now.strftime('%Y%m%d%H%M%S')
# Imports every document yielded for `cursor` into `index`, logging progress.
#
# @param cursor [#count] source collection/criteria handed to `index.import`
# @param index [#import, #name] destination index
# @param page_size [Integer] passed to `index.import` as :per_page
# @return [Integer] sum of the lengths of all batches yielded by `index.import`
def import_from_cursor(cursor, index, page_size)
  total = cursor.count
  imported = 0
  started_at = Time.now
  import_options = {:method => :paginate, :per_page => page_size}

  index.import(cursor, import_options) do |batch|
    # GC.start call is backport of memory leak fix in more recent vers. of tire
    # see https://github.com/karmi/tire/pull/658
    GC.start
    # progress is reported whenever the running count is a multiple of 1000
    if (imported % 1000).zero?
      elapsed_secs = (Time.now - started_at).round(2)
      pct_complete = (100 * (imported / total.to_f)).round(2)
      LOG.info "#{index.name}: imported #{imported} of #{total} (#{pct_complete}% complete after #{elapsed_secs} seconds)"
    end
    imported += batch.length
    # hand the batch back unchanged as the block's value
    batch
  end

  LOG.info "#{index.name}: finished importing #{imported} documents"
  imported
end
Rake::Task["tire:import"].invoke
# Points the alias `name` at `index`, creating the alias when it does not exist.
#
# @param name [String] alias name, looked up with Tire::Alias.find
# @param index [#name] index the alias should point to
# @return [Boolean] false when the alias already includes index.name (nothing
#   is changed); true once the alias has been created/re-pointed and saved
def move_alias_to(name, index)
  # if there was a previous index, switch over the alias to point to the new index
  alias_ = Tire::Alias.find name
  if alias_
    # does the alias already point to this index?
    return false if alias_.indices.include? index.name

    # remove the alias from wherever it points to now
    # Take a snapshot first: the loop below deletes from alias_.indices, and
    # deleting from a collection while iterating that same collection can skip
    # entries and leave stale indices attached to the alias.
    current_indices = alias_.indices.to_ary.dup
    LOG.info "alias already exists (will move): #{current_indices.join(',')}"
    current_indices.each do |old_index_name|
      alias_.indices.delete old_index_name unless old_index_name == name
    end
  else
    # create the alias
    LOG.info "alias \"#{name}\" does not yet exist - creating."
    alias_ = Tire::Alias.new :name => name
  end
  # point the alias at our new index
  alias_.indices.add index.name
  # NOTE(review): the result of `save` is not checked; confirm whether a failed
  # save should be reported instead of logging success.
  alias_.save
  LOG.info "alias \"#{name}\" now points to index #{index.name}."
  true
end
puts '[IMPORT] about to swap index'
if a = Tire::Alias.find(klass.tire.index.name)
puts "[IMPORT] aliases found: #{Tire::Alias.find(klass.tire.index.name).indices.to_ary.join(',')}. deleting."
old_indices = Tire::Alias.find(klass.tire.index.name).indices
old_indices.each do |index|
a.indices.delete index
def do_reindex (classname, force_rebuild=false)
# get a reference to the model class (and make sure it's a model class with tire hooks)
klass = CommentService.const_get(classname)
raise RuntimeError unless klass.instance_of? Class
raise RuntimeError unless klass.respond_to? "tire"
t1 = Time.now # we will need to refer back to this point in time later...
# create the new index with a unique name
new_index = create_index_for_class(klass)
# unless the user is forcing a rebuild, or the index does not yet exist, we
# can do a Tire api reindex which is much faster than reimporting documents
# from mongo.
#
# Checking if the index exists is tricky. Tire automatically created an index
# for the model class when the app loaded if one did not already exist. However,
# it won't create an alias, which is what our app uses. So if the index exists
# but not the alias, we know that it's auto-created.
old_index = klass.tire.index
alias_name = old_index.name
alias_ = Tire::Alias.find alias_name
if alias_.nil? then
# the alias doesn't exist, so we know the index was auto-created.
# We will delete it and replace it with an alias.
is_rebuild = true
old_index.delete
# NOTE on the small chance that another process re-auto-creates the index
# we just deleted before we have a chance to create the alias, this next
# call will fail.
move_alias_to(alias_name, new_index)
else
is_rebuild = force_rebuild
end
op = is_rebuild ? "(re)build index for" : "reindex"
LOG.info "preparing to #{op} CommentService::#{classname}"
# ensure there's no identity mapping or caching going on while we do this
Mongoid.identity_map_enabled = false
Mongoid.unit_of_work(disable: :all) do
if is_rebuild then
# fetch all the documents ever, up til t1
cursor = klass.where(:updated_at.lte => t1)
# import them to the new index
import_from_cursor(cursor, new_index, 200)
else
# reindex, moving source documents directly from old index to new
LOG.info "copying documents from original index (this may take a while!)"
old_index.reindex new_index.name
LOG.info "done copying!"
end
a.indices.add new_index
a.save
# move the alias if necessary
did_alias_move = move_alias_to(klass.tire.index.name, new_index)
t2 = Time.now
old_indices.each do |index|
puts "[IMPORT] deleting index: #{index}"
i = Tire::Index.new(index)
i.delete if i.exists?
if did_alias_move then
# Reimport any source documents that got updated between t1 and t2,
# while the alias still pointed to the old index
LOG.info "importing any documents that changed between #{t1} and #{t2}"
cursor = klass.where(:updated_at.gte => t1, :updated_at.lte => t2)
import_from_cursor(cursor, new_index, 200)
end
else
puts "[IMPORT] no aliases found. deleting index. creating new one and setting up alias."
klass.tire.index.delete
a = Tire::Alias.new
a.name(klass.tire.index.name)
a.index(new_index)
a.save
end
puts "[IMPORT] done. Index: '#{new_index}' created."
end
# rebuild[classname]: runs do_reindex for the named class, passing true as its
# second (force_rebuild) argument.
task :rebuild, [:classname] => :environment do |_task, task_args|
  do_reindex(task_args[:classname], true)
end

# reindex[classname]: runs do_reindex for the named class with its default
# second argument.
task :reindex, [:classname] => :environment do |_task, task_args|
  do_reindex(task_args[:classname])
end

# reindex_search: runs do_reindex for "CommentThread" and then for "Comment".
task :reindex_search => :environment do
  %w[CommentThread Comment].each { |model_name| do_reindex(model_name) }
end
# Sets anonymous_to_peers to false on every document in Content's collection
# that the {anonymous_to_peers: nil} selector matches.
task :add_anonymous_to_peers => :environment do
  unset_selector = {:anonymous_to_peers => nil}
  set_to_false = {"$set" => {'anonymous_to_peers' => false}}
  Content.collection.find(unset_selector).update_all(set_to_false)
end
end
namespace :jobs do
......
......@@ -5,3 +5,4 @@ elasticsearch_server: <%= ENV['SEARCH_SERVER'] || 'http://localhost:9200' %>
cache_timeout:
threads_search: 10
threads_query: 10
max_deep_search_comment_count: 5000
require 'rest_client'
# Site base URL for each supported SINATRA_ENV value.
roots = {
  'development' => "http://localhost:8000",
  'test' => "http://localhost:8000",
  'production' => "http://edx.org",
  'staging' => "http://stage.edx.org"
}
# Base URL for the current environment; nil when SINATRA_ENV is unset or is not
# one of the keys above.
ROOT = roots[ENV['SINATRA_ENV']]
namespace :deep_search do
  # Crude timing run for text search. Seeds threads and comments whose bodies
  # come from a pool of random strings, then issues 1000 search requests using
  # strings from that pool and prints the total elapsed time.
  #
  # USAGE
  #   SINATRA_ENV=development rake deep_search:performance
  # or
  #   SINATRA_ENV=development bundle exec rake deep_search:performance
  task :performance => :environment do
    # pool of 50 random 8-letter strings, used both as bodies and as search text
    bodies = Array.new(50) { (0...8).map { ('a'..'z').to_a[rand(26)] }.join }

    # threads that the manufactured comments below get attached to
    parents = CommentThread.limit(100)

    # now create comments and threads with hits
    puts "Manufacturing Threads"
    100.times do
      (1..5).to_a.sample.times do
        thread = CommentThread.new
        thread.course_id = 'sample course'
        thread.title = 'sample title'
        thread.commentable_id = 'sample commetable'
        thread.body = bodies.sample
        thread.author = 1
        thread.save
      end
    end

    puts "Manufacturing Comments"
    100.times do
      (1..5).to_a.sample.times do
        comment = Comment.new
        comment.course_id = 'sample course'
        comment.body = bodies.sample
        comment.comment_thread_id = parents.sample.id
        comment.author = 1
        comment.save
      end
    end

    sort_keys = %w[date activity votes comments]
    sort_order = "desc"

    # set the sinatra env to test to avoid 401'ing
    set :environment, :test

    start_time = Time.now
    puts "Starting test at #{start_time}"
    1000.times do
      query_params = { course_id: "1", sort_key: sort_keys.sample, sort_order: sort_order, page: 1, per_page: 5, text: bodies.sample }
      # NOTE(review): PREFIX is not defined in the visible part of this file
      # (only ROOT is); confirm it is defined by the loaded environment.
      RestClient.get "#{PREFIX}/threads", params: query_params
    end
    end_time = Time.now
    puts "Ending test at #{end_time}"

    elapsed_seconds = (end_time - start_time).to_f
    puts "Total Time: #{elapsed_seconds} seconds"
  end
end
......@@ -17,6 +17,18 @@ class Comment < Content
field :at_position_list, type: Array, default: []
index({author_id: 1, course_id: 1})
include Tire::Model::Search
include Tire::Model::Callbacks
# Search mapping for comments, declared with the `mapping` DSL provided by the
# Tire includes above. Only :body and :course_id are mapped explicitly.
mapping do
# body: string field using the snowball analyzer, with position/offset term vectors
# NOTE(review): Elasticsearch documents this option as `store`; confirm that
# `stored` is actually honoured rather than ignored.
indexes :body, type: :string, analyzer: :snowball, stored: true, term_vector: :with_positions_offsets
# course_id: string field indexed without analysis
# NOTE(review): Elasticsearch documents this option as `include_in_all`; confirm
# that `included_in_all` is actually honoured rather than ignored.
indexes :course_id, type: :string, index: :not_analyzed, included_in_all: false
#indexes :comment_thread_id, type: :string, stored: true, index: :not_analyzed, included_in_all: false
#current prod tire doesn't support indexing BSON ids, will reimplement when we upgrade
end
belongs_to :comment_thread, index: true
belongs_to :author, class_name: "User", index: true
......@@ -77,11 +89,11 @@ class Comment < Content
as_document.slice(*%w[body course_id endorsed anonymous anonymous_to_peers created_at updated_at at_position_list])
.merge("id" => _id)
.merge("user_id" => author_id)
.merge("username" => author.username)
.merge("username" => author.nil? ? "na" : author.username) # avoid crashing to_hash on orphaned comments
.merge("depth" => depth)
.merge("closed" => comment_thread.closed)
.merge("closed" => comment_thread.nil? ? false : comment_thread.closed) # ditto
.merge("thread_id" => comment_thread_id)
.merge("commentable_id" => comment_thread.commentable_id)
.merge("commentable_id" => comment_thread.nil? ? nil : comment_thread.commentable_id) # ditto
.merge("votes" => votes.slice(*%w[count up_count down_count point]))
.merge("abuse_flaggers" => abuse_flaggers)
.merge("type" => "comment")
......
......@@ -45,7 +45,8 @@ class CommentThread < Content
indexes :commentable_id, type: :string, index: :not_analyzed, included_in_all: false
indexes :author_id, type: :string, as: 'author_id', index: :not_analyzed, included_in_all: false
indexes :group_id, type: :integer, as: 'group_id', index: :not_analyzed, included_in_all: false
#indexes :pinned, type: :boolean, as: 'pinned', index: :not_analyzed, included_in_all: false
indexes :id, :index => :not_analyzed
indexes :thread_id, :analyzer => :keyword, :as => "_id"
end
belongs_to :author, class_name: "User", inverse_of: :comment_threads, index: true#, autosave: true
......@@ -93,6 +94,7 @@ class CommentThread < Content
end
def self.perform_search(params, options={})
page = [1, options[:page] || 1].max
per_page = options[:per_page] || 20
sort_key = options[:sort_key]
......@@ -104,30 +106,104 @@ class CommentThread < Content
return results
end
end
#GET /api/v1/search/threads?user_id=1&recursive=False&sort_key=date&text=response&sort_order=desc&course_id=HarvardX%2FHLS1xD%2FCopyright&per_page=20&api_key=PUT_YOUR_API_KEY_HERE&page=1
#KChugh - Unfortunately, there's no algorithmically nice way to handle pagination when
#stitching together Comments and CommentThreads, because there is no deterministic relationship
#between the ordering of comments and the ordering of threads.
#The best solution would be to find all of the thread ids for matching comment hits and union them
#with the comment thread query; however, Tire does not support ORing a query key with a term filter.
#So the next-best workable solution is to run three Tire searches (the original thread search, one to query
#the comments, and one to query the threads by thread id) and merge the results, de-duplicating them in the process.
#so first, find the comment threads associated with comments that hit the query
search = Tire::Search::Search.new 'comment_threads'
search.query {|query| query.text :_all, params["text"]} if params["text"]
search.highlight({title: { number_of_fragments: 0 } } , {body: { number_of_fragments: 0 } }, options: { tag: "<highlight>" })
search.filter(:bool, :must => params["tags"].split(/,/).map{ |tag| { :term => { :tags_array => tag } } }) if params["tags"]
search.filter(:term, commentable_id: params["commentable_id"]) if params["commentable_id"]
search.filter(:terms, commentable_id: params["commentable_ids"]) if params["commentable_ids"]
search.filter(:term, course_id: params["course_id"]) if params["course_id"]
if params["group_id"]
search.filter :or, [
{:not => {:exists => {:field => :group_id}}},
{:term => {:group_id => params["group_id"]}}
]
{:term => {:group_id => params["group_id"]}}
]
end
search.sort {|sort| sort.by sort_key, sort_order} if sort_key && sort_order #TODO should have search option 'auto sort or sth'
search.size per_page
search.from per_page * (page - 1)
#again, b/c there is no relationship in ordinality, we cannot paginate if it's a text query
if not params["text"]
search.size per_page
search.from per_page * (page - 1)
end
results = search.results
#if this is a search query, then also search the comments and harvest the matching comments
if params["text"]
search = Tire::Search::Search.new 'comments'
search.query {|query| query.text :_all, params["text"]} if params["text"]
search.filter(:term, course_id: params["course_id"]) if params["course_id"]
search.size CommentService.config["max_deep_search_comment_count"].to_i
#unfortunately, we cannot paginate here, b/c the ordering of comment hits is totally
#unrelated to that of threads
c_results = search.results
comment_ids = c_results.collect{|c| c.id}.uniq
comments = Comment.where(:id.in => comment_ids)
thread_ids = comments.collect{|c| c.comment_thread_id}
#thread_ids = c_results.collect{|c| c.comment_thread_id}
#as soon as we can add the comment thread id to the ES index (via a Tire upgrade), we'll
#collect the thread ids from the elasticsearch index instead of mongo, to avoid the DB hit
original_thread_ids = results.collect{|r| r.id}
#now add the original search thread ids
thread_ids += original_thread_ids
thread_ids = thread_ids.uniq
#now run one more search to harvest the threads and filter by group
search = Tire::Search::Search.new 'comment_threads'
search.filter(:terms, :thread_id => thread_ids)
search.filter(:terms, commentable_id: params["commentable_ids"]) if params["commentable_ids"]
search.filter(:term, course_id: params["course_id"]) if params["course_id"]
search.size per_page
search.from per_page * (page - 1)
if params["group_id"]
search.filter :or, [
{:not => {:exists => {:field => :group_id}}},
{:term => {:group_id => params["group_id"]}}
]
end
search.sort {|sort| sort.by sort_key, sort_order} if sort_key && sort_order
results = search.results
end
if CommentService.config[:cache_enabled]
Sinatra::Application.cache.set(memcached_key, results, CommentService.config[:cache_timeout][:threads_search].to_i)
end
......@@ -181,7 +257,6 @@ class CommentThread < Content
"group_id" => group_id,
"pinned" => pinned?,
"endorsed" => endorsed?)
if params[:recursive]
doc = doc.merge("children" => root_comments.map{|c| c.to_hash(recursive: true)})
end
......
require 'spec_helper'
describe "app" do
  # Returns user 1, creating it with create_test_user when the lookup returns nil.
  def search_spec_user
    found = User.find 1
    found.nil? ? create_test_user(1) : found
  end

  # Eight random lowercase letters, used as the text to search for.
  def random_search_term
    (0...8).map { ('a'..'z').to_a[rand(26)] }.join
  end

  # Requests the thread search endpoint for `term`, checks the response is OK
  # and returns the entry of the parsed 'collection' whose id matches `thread`
  # (nil when there is none).
  def search_hit_for(term, thread)
    get "/api/v1/search/threads", text: term
    last_response.should be_ok
    parse(last_response.body)['collection'].find { |t| t["id"].to_s == thread.id.to_s }
  end

  describe "thread search" do
    describe "GET /api/v1/search/threads" do
      it "returns thread with query match" do
        author = search_spec_user
        commentable = Commentable.new("question_1")
        term = random_search_term

        thread = CommentThread.new(title: "Test title", body: term, course_id: "1", commentable_id: commentable.id)
        thread.author = author
        thread.save!

        # presumably gives the search index time to pick up the new thread
        sleep 3

        search_hit_for(term, thread).should_not be_nil
      end
    end
  end

  describe "comment search" do
    describe "GET /api/v1/search/threads" do
      it "returns thread with comment query match" do
        author = search_spec_user
        commentable = Commentable.new("question_1")
        term = random_search_term

        # the thread itself does not contain the search term; only its comment does
        thread = CommentThread.new(title: "Test title", body: "elephant otter", course_id: "1", commentable_id: commentable.id)
        thread.author = author
        thread.save!

        sleep 3

        comment = Comment.new(body: term, course_id: "1", commentable_id: commentable.id)
        comment.author = author
        comment.comment_thread = thread
        comment.save!

        sleep 1

        search_hit_for(term, thread).should_not be_nil
      end
    end
  end
end
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment