Commit b1f0109b by Vik Paruchuri

formatting changes, removed unneeded files, and moved tests to tests dir

parent c070a02f
.idea/
__pycache__/
models/
*.pyc
*~
Project to integrate machine learning based essay scoring with xserver. Aspell must be installed and added to path to run. numpy, scipy, sklearn, and nltk also need to be installed.
Nltk also requires the treebank maxent tagger and wordnet to be installed. These can be installed through the nltk downloader(nltk.download()), or programatically through python -m nltk.downloader maxent_treebank_pos_tagger wordnet .
Runnable files:
1. create_test_models.py
Generates test models when used like: python create_test_models.py train_file prompt_file model_path. Use python create_test_models.py train.tsv prompt.txt models/essay_set_1.p to generate a model using sample data.
2. test_server_code/pyxserver_wsgi.py
Starts a server instance that can be sent answers to score. Calls grade.py to score responses. Run server with gunicorn -w 4 -b 127.0.0.1:3031 pyxserver_wsgi:application . This can also be run
3. tests/test.py
Submits test data found in directories within the tests folder to the xserver and displays results. See tests/simple_essay for an example of how to format files. You need payload.json, wrong.txt, and answer.txt to make a test.
#Run with arguments train_file prompt_file model_path to generate a sample model file
import os
import sys
import argparse
base_path = os.path.dirname(__file__)
sys.path.append(base_path)
import model_creator
def main(argv):
parser = argparse.ArgumentParser(description="Generate model from test data files")
parser.add_argument('train_file')
parser.add_argument('prompt_file')
parser.add_argument('model_path')
args = parser.parse_args(argv)
score, text = model_creator.read_in_test_data(args.train_file)
prompt_string = model_creator.read_in_test_prompt(args.prompt_file)
e_set = model_creator.create_essay_set(text, score, prompt_string)
feature_ext, classifier = model_creator.extract_features_and_generate_model(e_set)
model_creator.dump_model_to_file(prompt_string, feature_ext, classifier, args.model_path)
if __name__ == "__main__":
main(sys.argv[1:])
#Defines an essay set object, which encapsulates essays from training and test sets.
#Performs spell and grammar checking, tokenization, and stemming.
import numpy
import nltk
import sys
import random
import os
base_path = os.path.dirname(__file__)
sys.path.append(base_path)
import util_functions
class EssaySet:
def __init__(self, type="train"):
"""
Initialize variables and check essay set type
"""
if(type != "train" and type != "test"):
type = "train"
self._type = type
self._score, self._text, self._id, self._clean_text, self._tokens, self._pos,\
self._clean_stem_text, self._generated = [], [], [], [], [], [], [], []
self._prompt = ""
def add_essay(self, essay_text, essay_score, essay_generated=0):
"""
Add new (essay_text,essay_score) pair to the essay set.
essay_text must be a string.
essay_score must be an int.
essay_generated should not be changed by the user.
Returns a confirmation that essay was added.
"""
#Get maximum current essay id, or set to 0 if this is the first essay added
if(len(self._id) > 0):
max_id = max(self._id)
else:
max_id = 0
#Verify that essay_score is an int, essay_text is a string, and essay_generated equals 0 or 1
if type(essay_score) == type(0) and type(essay_text) == type("text")\
and (essay_generated == 0 or essay_generated == 1):
self._id.append(max_id + 1)
self._score.append(essay_score)
#Clean text by removing non digit/work/punctuation characters
self._text.append(util_functions.sub_chars(essay_text).lower())
#Spell correct text using aspell
self._clean_text.append(util_functions.spell_correct(self._text[len(self._text) - 1]))
#Tokenize text
self._tokens.append(nltk.word_tokenize(self._clean_text[len(self._clean_text) - 1]))
#Part of speech tag text
self._pos.append(nltk.pos_tag(self._tokens[len(self._tokens) - 1]))
self._generated.append(essay_generated)
#Stem spell corrected text
porter = nltk.PorterStemmer()
por_toks = " ".join([porter.stem(w) for w in self._tokens[len(self._tokens) - 1]])
self._clean_stem_text.append(por_toks)
ret = "text: " + self._text[len(self._text) - 1] + " score: " + str(essay_score)
else:
raise util_functions.InputError(essay_text, "arguments need to be in format "
"(text,score). text needs to be string,"
" score needs to be int.")
return ret
def update_prompt(self, prompt_text):
"""
Update the default prompt string, which is "".
prompt_text should be a string.
Returns the prompt as a confirmation.
"""
if(type(prompt_text) == type("text")):
self._prompt = util_functions.sub_chars(prompt_text)
ret = self._prompt
else:
raise util_functions.InputError(prompt_text, "Invalid prompt. Need to enter a string value.")
return ret
def generate_additional_essays(self, e_text, e_score, dict=None, max_syns=3):
"""
Substitute synonyms to generate extra essays from existing ones.
This is done to increase the amount of training data.
Should only be used with lowest scoring essays.
e_text is the text of the original essay.
e_score is the score of the original essay.
dict is a fixed dictionary (list) of words to replace.
max_syns defines the maximum number of additional essays to generate. Do not set too high.
"""
random.seed(1)
e_toks = nltk.word_tokenize(e_text)
all_syns = []
for word in e_toks:
synonyms = util_functions.get_wordnet_syns(word)
if(len(synonyms) > max_syns):
synonyms = random.sample(synonyms, max_syns)
all_syns.append(synonyms)
new_essays = []
for i in range(0, max_syns):
syn_toks = e_toks
for z in range(0, len(e_toks)):
if len(all_syns[z]) > i and (dict == None or e_toks[z] in dict):
syn_toks[z] = all_syns[z][i]
new_essays.append(" ".join(syn_toks))
for z in xrange(0, len(new_essays)):
self.add_essay(new_essays[z], e_score, 1)
\ No newline at end of file
This source diff could not be displayed because it is too large. You can view the blob instead.
#Extracts features from training set and test set essays
import numpy
import re
import nltk
import sys
from sklearn.feature_extraction.text import CountVectorizer
import pickle
import os
from itertools import chain
base_path = os.path.dirname(__file__)
sys.path.append(base_path)
from essay_set import EssaySet
import util_functions
class FeatureExtractor:
def __init__(self):
self._good_pos_ngrams = self.get_good_pos_ngrams()
self.dict_initialized = False
def initialize_dictionaries(self, e_set):
"""
Initializes dictionaries from an essay set object
Dictionaries must be initialized prior to using this to extract features
e_set is an input essay set
returns a confirmation of initialization
"""
if(hasattr(e_set, '_type')):
if(e_set._type == "train"):
nvocab = util_functions.get_vocab(e_set._text, e_set._score)
svocab = util_functions.get_vocab(e_set._clean_stem_text, e_set._score)
self._normal_dict = CountVectorizer(min_n=1, max_n=2, vocabulary=nvocab)
self._stem_dict = CountVectorizer(min_n=1, max_n=2, vocabulary=svocab)
self.dict_initialized = True
ret = "ok"
else:
raise util_functions.InputError(e_set, "needs to be an essay set of the train type.")
else:
raise util_functions.InputError(e_set, "wrong input. need an essay set object")
return ret
def get_good_pos_ngrams(self):
"""
Gets a list of gramatically correct part of speech sequences from an input file called essaycorpus.txt
Returns the list and caches the file
"""
if(os.path.isfile("good_pos_ngrams.p")):
good_pos_ngrams = pickle.load(open('good_pos_ngrams.p', 'rb'))
else:
essay_corpus = open("essaycorpus.txt").read()
essay_corpus = util_functions.sub_chars(essay_corpus)
good_pos_ngrams = util_functions.regenerate_good_tokens(essay_corpus)
pickle.dump(good_pos_ngrams, open('good_pos_ngrams.p', 'wb'))
return good_pos_ngrams
def gen_length_feats(self, e_set):
"""
Generates length based features from an essay set
Generally an internal function called by gen_feats
Returns an array of length features
"""
text = e_set._text
lengths = [len(e) for e in text]
word_counts = [len(t) for t in e_set._tokens]
comma_count = [e.count(",") for e in text]
ap_count = [e.count("'") for e in text]
punc_count = [e.count(".") + e.count("?") + e.count("!") for e in text]
chars_per_word = [lengths[m] / float(word_counts[m]) for m in xrange(0, len(text))]
good_pos_tags = []
for i in xrange(0, len(text)):
pos_seq = [tag[1] for tag in e_set._pos[i]]
pos_ngrams = util_functions.ngrams(pos_seq, 2, 4)
overlap_ngrams = [i for i in pos_ngrams if i in self._good_pos_ngrams]
good_pos_tags.append(len(overlap_ngrams))
good_pos_tag_prop = [good_pos_tags[m] / float(word_counts[m]) for m in xrange(0, len(text))]
length_arr = numpy.array((
lengths, word_counts, comma_count, ap_count, punc_count, chars_per_word, good_pos_tags,
good_pos_tag_prop)).transpose()
return length_arr.copy()
def gen_bag_feats(self, e_set):
"""
Generates bag of words features from an input essay set and trained FeatureExtractor
Generally called by gen_feats
Returns an array of features
"""
if(hasattr(self, '_stem_dict')):
sfeats = self._stem_dict.transform(e_set._clean_stem_text)
nfeats = self._normal_dict.transform(e_set._text)
bag_feats = numpy.concatenate((sfeats.toarray(), nfeats.toarray()), axis=1)
else:
raise util_functions.InputError(self, "Dictionaries must be initialized prior to generating bag features.")
return bag_feats.copy()
def gen_feats(self, e_set):
"""
Generates bag of words, length, and prompt features from an essay set object
returns an array of features
"""
bag_feats = self.gen_bag_feats(e_set)
length_feats = self.gen_length_feats(e_set)
prompt_feats = self.gen_prompt_feats(e_set)
overall_feats = numpy.concatenate((length_feats, prompt_feats, bag_feats), axis=1)
overall_feats = overall_feats.copy()
return overall_feats
def gen_prompt_feats(self, e_set):
"""
Generates prompt based features from an essay set object and internal prompt variable.
Generally called internally by gen_feats
Returns an array of prompt features
"""
prompt_toks = nltk.word_tokenize(e_set._prompt)
expand_syns = []
for word in prompt_toks:
synonyms = util_functions.get_wordnet_syns(word)
expand_syns.append(synonyms)
expand_syns = list(chain.from_iterable(expand_syns))
prompt_overlap = []
prompt_overlap_prop = []
for j in e_set._tokens:
prompt_overlap.append(len([i for i in j if i in prompt_toks]))
prompt_overlap_prop.append(prompt_overlap[len(prompt_overlap) - 1] / float(len(j)))
expand_overlap = []
expand_overlap_prop = []
for j in e_set._tokens:
expand_overlap.append(len([i for i in j if i in expand_syns]))
expand_overlap_prop.append(expand_overlap[len(expand_overlap) - 1] / float(len(j)))
prompt_arr = numpy.array((prompt_overlap, prompt_overlap_prop, expand_overlap, expand_overlap_prop)).transpose()
return prompt_arr.copy()
\ No newline at end of file
#! /usr/bin/env python
##############################################################################
# Following functions have been taken from the DendroPy library from:
##
## DendroPy Phylogenetic Computing Library.
##
## Copyright 2010 Jeet Sukumaran and Mark T. Holder.
## All rights reserved.
##
## See "LICENSE.txt" for terms and conditions of usage.
##
## If you use this work or any portion thereof in published work,
## please cite it as:
##
## Sukumaran, J. and M. T. Holder. 2010. DendroPy: a Python library
## for phylogenetic computing. Bioinformatics 26: 1569-1571.
##
##############################################################################
import math
## From dendropy.mathlib.probability
def hypergeometric_pmf(x, m, n, k):
"""
Given a population consisting of `m` items of class M and `n` items of class N,
this returns the probability of observing `x` items of class M when sampling
`k` times without replacement from the entire population (i.e., {M,N})
p(x) = (choose(m, x) * choose(n, k-x)) / choose(m+n, k)
"""
# following fails with 'OverflowError: long int too large to convert to
# float' with large numbers
# return float(binomial_coefficient(m, x) * binomial_coefficient(n, k-x))/binomial_coefficient(m+n, k)
a = math.log(binomial_coefficient(m, x))
b = math.log(binomial_coefficient(n, k-x))
c = math.log(binomial_coefficient(m+n, k))
return math.exp(a+b-c)
## From dendropy.mathlib.probability
def binomial_coefficient(population, sample):
"Returns `population` choose `sample`."
s = max(sample, population - sample)
assert s <= population
assert population > -1
if s == population:
return 1
numerator = 1
denominator = 1
for i in xrange(s+1, population + 1):
numerator *= i
denominator *= (i - s)
return numerator/denominator
## From dendropy.mathlib.statistics
class FishersExactTest(object):
"""
Given a 2x2 table:
+---+---+
| a | b |
+---+---+
| c | d |
+---+---+
represented by a list of lists::
[[a,b],[c,d]]
this calculates the sum of the probability of this table and all others
more extreme under the null hypothesis that there is no association between
the categories represented by the vertical and horizontal axes.
"""
def probability_of_table(table):
"""
Given a 2x2 table:
+---+---+
| a | b |
+---+---+
| c | d |
+---+---+
represented by a list of lists::
[[a,b],[c,d]]
this returns the probability of this table under the null hypothesis of
no association between rows and columns, which was shown by Fisher to be
a hypergeometric distribution:
p = ( choose(a+b, a) * choose(c+d, c) ) / choose(a+b+c+d, a+c)
"""
a = table[0][0]
b = table[0][1]
c = table[1][0]
d = table[1][1]
return hypergeometric_pmf(a, a+b, c+d, a+c)
probability_of_table = staticmethod(probability_of_table)
def __init__(self, table):
self.table = table
self.flat_table = [table[0][0], table[0][1], table[1][0], table[1][1]]
self.min_value = min(self.flat_table)
self.max_value = max(self.flat_table)
def _rotate_cw(self, table):
"""
Returns a copy of table such that all the values
are rotated clockwise once.
"""
return [ [ table[1][0], table[0][0] ],
[table[1][1], table[0][1] ] ]
def _min_rotation(self):
"""
Returns copy of self.table such that the smallest value is in the first
(upper left) cell.
"""
table = [list(self.table[0]), list(self.table[1])]
while table[0][0] != self.min_value:
table = self._rotate_cw(table)
return table
def _max_rotation(self):
"""
Returns copy of self.table such that the largest value is in the first
(upper left) cell.
"""
table = [list(self.table[0]), list(self.table[1])]
while table[0][0] != self.max_value:
table = self._rotate_cw(table)
return table
def _sum_left_tail(self):
# left_tail_tables = self._get_left_tail_tables()
# p_vals = [ self.probability_of_table(t) for t in left_tail_tables ]
p_vals = self._get_left_tail_probs()
return sum(p_vals)
def _sum_right_tail(self):
# right_tail_tables = self._get_right_tail_tables()
# p_vals = [ self.probability_of_table(t) for t in right_tail_tables ]
p_vals = self._get_right_tail_probs()
return sum(p_vals)
def _get_left_tail_probs(self):
table = self._min_rotation()
row_totals = [sum(table[0]), sum(table[1])]
col_totals = [table[0][0] + table[1][0], table[0][1] + table[1][1]]
p_vals = []
while True:
table[0][0] -= 1
if table[0][0] < 0:
break
table[0][1] = row_totals[0] - table[0][0]
table[1][0] = col_totals[0] - table[0][0]
table[1][1] = row_totals[1] - table[1][0]
p_vals.append(self.probability_of_table(table))
return p_vals
def _get_right_tail_probs(self):
table = self._min_rotation()
row_totals = [sum(table[0]), sum(table[1])]
col_totals = [table[0][0] + table[1][0], table[0][1] + table[1][1]]
p_vals = []
while True:
table[0][0] += 1
table[0][1] = row_totals[0] - table[0][0]
if table[0][1] < 0:
break
table[1][0] = col_totals[0] - table[0][0]
if table[1][0] < 0:
break
table[1][1] = row_totals[1] - table[1][0]
if table[1][1] < 0:
break
p_vals.append(self.probability_of_table(table))
return p_vals
def _get_left_tail_tables(self):
table = self._min_rotation()
row_totals = [sum(table[0]), sum(table[1])]
col_totals = [table[0][0] + table[1][0], table[0][1] + table[1][1]]
left_tail_tables = []
while True:
table[0][0] -= 1
if table[0][0] < 0:
break
table[0][1] = row_totals[0] - table[0][0]
table[1][0] = col_totals[0] - table[0][0]
table[1][1] = row_totals[1] - table[1][0]
left_tail_tables.append([list(table[0]), list(table[1])])
return left_tail_tables
def _get_right_tail_tables(self):
table = self._min_rotation()
row_totals = [sum(table[0]), sum(table[1])]
col_totals = [table[0][0] + table[1][0], table[0][1] + table[1][1]]
right_tail_tables = []
while True:
table[0][0] += 1
table[0][1] = row_totals[0] - table[0][0]
if table[0][1] < 0:
break
table[1][0] = col_totals[0] - table[0][0]
if table[1][0] < 0:
break
table[1][1] = row_totals[1] - table[1][0]
if table[1][1] < 0:
break
right_tail_tables.append([list(table[0]), list(table[1])])
return right_tail_tables
def left_tail_p(self):
"""
Returns the sum of probabilities of this table and all others more
extreme.
"""
return self.probability_of_table(self.table) + self._sum_left_tail()
def right_tail_p(self):
"""
Returns the sum of probabilities of this table and all others more
extreme.
"""
return self.probability_of_table(self.table) + self._sum_right_tail()
def two_tail_p(self):
"""
Returns the sum of probabilities of this table and all others more
extreme.
"""
p0 = self.probability_of_table(self.table)
all_p_vals = self._get_left_tail_probs() + self._get_right_tail_probs()
p_vals = []
for p in all_p_vals:
if p <= p0:
p_vals.append(p)
return sum(p_vals) + p0
def assert_almost_equal(v1, v2, prec=8):
if abs(v1-v2) <= 10**(-prec):
print "OK: {} == {}".format(v1, v2)
else:
print "FAIL: {} != {}".format(v1, v2)
if __name__ == "__main__":
table = [[12, 5], [29, 2]]
ft = FishersExactTest(table)
assert_almost_equal(ft.left_tail_p(), 0.044554737835078267)
assert_almost_equal(ft.right_tail_p(), 0.99452520602190897)
assert_almost_equal(ft.two_tail_p(), 0.08026855207410688)
\ No newline at end of file
This source diff could not be displayed because it is too large. You can view the blob instead.
#Grader called by pyxserver_wsgi.py
#Loads a grader file, which is a dict containing the prompt of the question,
#a feature extractor object, and a trained model.
#Extracts features and runs trained model on the submission to produce a final score.
#Correctness determined by ratio of score to max possible score.
#Requires aspell to be installed and added to the path.
import sys
import pickle
import os
import numpy
base_path = os.path.dirname(__file__)
sys.path.append(base_path)
from essay_set import essay_set
#Imports needed to unpickle grader data
import feature_extractor
import sklearn.ensemble
def grade(grader_path,submission):
results = {'errors': [],'tests': [],'correct': False,'score': 0}
#Try to find and load the model file
try:
grader_data=pickle.load(file(grader_path,"r"))
except:
results['errors'].append("Could not find a valid model file.")
grader_set=essay_set(type="test")
#Try to add essays to essay set object
try:
grader_set.add_essay(str(submission),0)
grader_set.update_prompt(str(grader_data['prompt']))
except:
results['errors'].append("Essay could not be added to essay set:{0}".format(submission))
#Try to extract features from submission and assign score via the model
try:
grader_feats=grader_data['extractor'].gen_feats(grader_set)
results['score']=int(grader_data['model'].predict(grader_feats)[0])
except:
results['errors'].append("Could not extract features and score essay.")
#Determine maximum score and correctness of response
max_score=numpy.max(grader_data['model'].classes_)
if results['score']/float(max_score) >= .66:
results['correct']=True
else:
results['correct']=False
return results
import numpy
import re
import nltk
import sys
from sklearn.feature_extraction.text import CountVectorizer
import pickle
import os
from sklearn.ensemble import GradientBoostingClassifier
from itertools import chain
from sklearn.ensemble import RandomForestClassifier
base_path="C:/Users/Vik/Documents/Consulting/PyShortEssay/"
os.chdir(base_path)
sys.path.append("C:/Users/Vik/Documents/rscripts/python_ses")
from essay_set import essay_set
import util_functions
import mech_turk_interface
from feature_extractor import feature_extractor
id,e_set,score,score2,text=[],[],[],[],[]
combined_raw=open(base_path + "train.tsv").read()
raw_lines=combined_raw.splitlines()
for row in xrange(1,len(raw_lines)):
id1,set1,score1,score12,text1 = raw_lines[row].strip().split("\t")
id.append(int(id1))
text.append(text1)
e_set.append(int(set1))
score.append(int(score1))
score2.append(int(score12))
prompt_string="A group of students wrote the following procedure for their investigation. Procedure: 1. Determine the mass of four different samples. 2. Pour vinegar in each of four separate, but identical, containers. 3. Place a sample of one material into one container and label. Repeat with remaining samples, placing a single sample into a single container. 4. After 24 hours, remove the samples from the containers and rinse each sample with distilled water. 5. Allow the samples to sit and dry for 30 minutes. 6. Determine the mass of each sample. The students’ data are recorded in the table below. Sample Starting Mass (g) Ending Mass (g) Difference in Mass (g) Marble 9.8 9.4 –0.4 Limestone 10.4 9.1 –1.3 Wood 11.2 11.2 0.0 Plastic 7.2 7.1 –0.1"
question_string="After reading the group’s procedure, describe what additional information you would need in order to replicate the experiment. Make sure to include at least three pieces of information."
x=essay_set()
m_coef=1572
for i in xrange(0,len(text)-m_coef):
x.add_essay(text[i],score[i])
if(score[i]==min(score)):
x.generate_additional_essays(x._clean_text[len(x._clean_text)-1],score[i])
x.update_prompt(prompt_string)
all_train_toks=util_functions.f7(list(chain.from_iterable([x._tokens[t] for t in range(0,len(x._tokens)) if x._generated[t]==0])))
x_t=essay_set(type="test")
for i in xrange(len(text)-m_coef,len(text)):
#te_toks=nltk.word_tokenize(text[i].lower())
#tok_overlap=float(len([tok for tok in te_toks if tok in all_train_toks]))/len(te_toks)
#if tok_overlap>=0:
x_t.add_essay(text[i],score[i])
x_t.update_prompt(prompt_string)
f=feature_extractor()
f.initialize_dictionaries(x)
train_feats=f.gen_feats(x)
test_feats=f.gen_feats(x_t)
clf = GradientBoostingClassifier(n_estimators=100, learn_rate=.05,max_depth=4, random_state=1,min_samples_leaf=3)
cv_preds=util_functions.gen_cv_preds(clf,train_feats,x._score)
print "CV Train: " + str(util_functions.quadratic_weighted_kappa(cv_preds,x._score))
model=util_functions.gen_model(clf,train_feats,x._score)
preds=util_functions.gen_preds(clf,test_feats)
print "Test Err: " + str(util_functions.quadratic_weighted_kappa(preds,x_t._score))
print "Conf Mat:\n" + str(numpy.array(util_functions.confusion_matrix(preds,x_t._score)))
prompt=prompt_string
question=question_string
essay_text=text[100:110]
all_essays=text[0:100]
all_scores=score[0:100]
ACCESS_ID =
SECRET_KEY =
HOST = 'mechanicalturk.sandbox.amazonaws.com'
#HOST = 'mechanicalturk.amazonaws.com'
hit_creator=mech_turk_interface.HITCreator(ACCESS_ID,SECRET_KEY,HOST,essay_text,prompt,question,all_essays,all_scores,assignment_count=3)
hit_creator.create_hits(reward=.20,add_qualifications=True)
new_results=hit_creator.hit_container.get_all_results()
print new_results
print [util_functions.getMedian(x) for x in new_results[0]]
hit_creator.hit_container.process_approvals()
#Provides interface functions to create and save models
import numpy
import re
import nltk
import sys
from sklearn.feature_extraction.text import CountVectorizer
import pickle
import os
import sklearn.ensemble
from itertools import chain
base_path = os.path.dirname(__file__)
sys.path.append(base_path)
from essay_set import EssaySet
import util_functions
import feature_extractor
def read_in_test_data(filename):
"""
Reads in test data file found at filename.
filename must be a tab delimited file with columns id, dummy number column, score, dummy score, text
returns the score and the text
"""
id, e_set, score, score2, text = [], [], [], [], []
combined_raw = open(filename).read()
raw_lines = combined_raw.splitlines()
for row in xrange(1, len(raw_lines)):
id1, set1, score1, score12, text1 = raw_lines[row].strip().split("\t")
id.append(int(id1))
text.append(text1)
e_set.append(int(set1))
score.append(int(score1))
score2.append(int(score12))
return score, text
def read_in_test_prompt(filename):
"""
Reads in the prompt from a text file
Returns string
"""
prompt_string = open(filename).read()
return prompt_string
def create_essay_set(text, score, prompt_string, generate_additional=True):
"""
Creates an essay set from given data.
Text should be a list of strings corresponding to essay text.
Score should be a list of scores where score[n] corresponds to text[n]
Prompt string is just a string containing the essay prompt.
Generate_additional indicates whether to generate additional essays at the minimum score point or not.
"""
x = EssaySet()
for i in xrange(0, len(text)):
x.add_essay(text[i], score[i])
if score[i] == min(score) and generate_additional == True:
x.generate_additional_essays(x._clean_text[len(x._clean_text) - 1], score[i])
x.update_prompt(prompt_string)
return x
def extract_features_and_generate_model(essays):
"""
Feed in an essay set to get feature vector and classifier
essays must be an essay set object
returns a trained FeatureExtractor object and a trained classifier
"""
f = feature_extractor.FeatureExtractor()
f.initialize_dictionaries(essays)
train_feats = f.gen_feats(essays)
clf = sklearn.ensemble.GradientBoostingClassifier(n_estimators=100, learn_rate=.05,
max_depth=4, random_state=1,
min_samples_leaf=3)
model = util_functions.gen_model(clf, train_feats, essays._score)
return f, clf
def dump_model_to_file(prompt_string, feature_ext, classifier, model_path):
"""
Writes out a model to a file.
prompt string is a string containing the prompt
feature_ext is a trained FeatureExtractor object
classifier is a trained classifier
model_path is the path of write out the model file to
"""
model_file = {'prompt': prompt_string, 'extractor': feature_ext, 'model': classifier}
pickle.dump(model_file, file=open(model_path, "w"))
"A group of students wrote the following procedure for their investigation. Procedure: 1. Determine the mass of four different samples. 2. Pour vinegar in each of four separate, but identical, containers. 3. Place a sample of one material into one container and label. Repeat with remaining samples, placing a single sample into a single container. 4. After 24 hours, remove the samples from the containers and rinse each sample with distilled water. 5. Allow the samples to sit and dry for 30 minutes. 6. Determine the mass of each sample. The students’ data are recorded in the table below. Sample Starting Mass (g) Ending Mass (g) Difference in Mass (g) Marble 9.8 9.4 –0.4 Limestone 10.4 9.1 –1.3 Wood 11.2 11.2 0.0 Plastic 7.2 7.1 –0.1"
#!/usr/bin/python
#------------------------------------------------------------
# Run me with (may need su privilege for logging):
# gunicorn -w 4 -b 127.0.0.1:3031 pyxserver_wsgi:application
#------------------------------------------------------------
import cgi # for the escape() function
import json
import logging
import os
import os.path
import sys
from time import localtime, strftime
script_dir = os.path.dirname(__file__)
sys.path.append(script_dir)
import settings # Not django, but do something similar
# make sure we can find the grader files
sys.path.append(settings.GRADER_ROOT)
import grade
results_template = """
<div class="test">
<header>Test results</header>
<section>
<div class="shortform">
{status}
</div>
<div class="longform">
{errors}
{results}
</div>
</section>
</div>
"""
results_correct_template = """
<div class="result-output result-correct">
<h4>{short-description}</h4>
<p>{long-description}</p>
<dl>
<dt>Output:</dt>
<dd class="result-actual-output">
<pre>{actual-output}</pre>
</dd>
</dl>
</div>
"""
results_incorrect_template = """
<div class="result-output result-incorrect">
<h4>{short-description}</h4>
<p>{long-description}</p>
<dl>
<dt>Your output:</dt>
<dd class="result-actual-output"><pre>{actual-output}</pre></dd>
<dt>Correct output:</dt>
<dd><pre>{expected-output}</pre></dd>
</dl>
</div>
"""
def format_errors(errors):
esc = cgi.escape
error_string = ''
error_list = [esc(e) for e in errors or []]
if error_list:
items = '\n'.join(['<li><pre>{0}</pre></li>\n'.format(e) for e in error_list])
error_string = '<ul>\n{0}</ul>\n'.format(items)
error_string = '<div class="result-errors">{0}</div>'.format(error_string)
return error_string
def to_dict(result):
# long description may or may not be provided. If not, don't display it.
# TODO: replace with mako template
esc = cgi.escape
if result[1]:
long_desc = '<p>{0}</p>'.format(esc(result[1]))
else:
long_desc = ''
return {'short-description': esc(result[0]),
'long-description': long_desc,
'correct': result[2], # Boolean; don't escape.
'expected-output': esc(result[3]),
'actual-output': esc(result[4])
}
def render_results(results):
output = []
test_results = [to_dict(r) for r in results['tests']]
for result in test_results:
if result['correct']:
template = results_correct_template
else:
template = results_incorrect_template
output += template.format(**result)
errors = format_errors(results['errors'])
status = 'INCORRECT'
if errors:
status = 'ERROR'
elif results['correct']:
status = 'CORRECT'
return results_template.format(status=status,
errors=errors,
results=''.join(output))
def do_GET(data):
return "Hey, the time is %s" % strftime("%a, %d %b %Y %H:%M:%S", localtime())
def do_POST(data):
# This server expects jobs to be pushed to it from the queue
xpackage = json.loads(data)
body = xpackage['xqueue_body']
# Delivery from the lms
body = json.loads(body)
student_response = body['student_response']
payload = body['grader_payload']
try:
grader_config = json.loads(payload)
except ValueError as err:
# If parsing json fails, erroring is fine--something is wrong in the content.
# However, for debugging, still want to see what the problem is
raise
relative_grader_path = grader_config['grader']
grader_path = os.path.join(settings.GRADER_ROOT, relative_grader_path)
results = grade.grade(grader_path, student_response)
# Make valid JSON message
reply = { 'correct': results['correct'],
'score': results['score'],
'msg': render_results(results) }
return json.dumps(reply)
# Entry point
def application(env, start_response):
# Handle request
method = env['REQUEST_METHOD']
data = env['wsgi.input'].read()
def post_wrapper(data):
try:
return do_POST(data)
except:
return None
handlers = {'GET': do_GET,
'POST': post_wrapper,
}
if method in handlers.keys():
reply = handlers[method](data)
if reply is not None:
start_response('200 OK', [('Content-Type', 'text/html')])
return reply
# If we fell through to here, complain.
start_response('404 Not Found', [('Content-Type', 'text/plain')])
return ''
# Not django (for now), but use the same settings format anyway
import json
import os
from path import path
import sys
ROOT_PATH = path(__file__).dirname()
REPO_PATH = ROOT_PATH
ENV_ROOT = REPO_PATH.dirname()
# DEFAULTS
DEBUG = False
# Must end in '/'
RUN_URL = 'http://127.0.0.1:3031/' # Victor's VM ...
RUN_URL = 'http://sandbox-runserver-001.m.edx.org:8080/'
RUN_URL = 'http://sandbox-runserver.elb.edx.org:80/'
GRADER_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__),'..'))
# AWS
if os.path.isfile(ENV_ROOT / "env.json"):
print "Opening env.json file"
with open(ENV_ROOT / "env.json") as env_file:
ENV_TOKENS = json.load(env_file)
RUN_URL = ENV_TOKENS['RUN_URL']
LOG_DIR = ENV_TOKENS['LOG_DIR']
# Should be absolute path to 6.00 grader dir.
# NOTE: This means we only get one version of 6.00 graders available--has to
# be the same for internal and external class. Not critical -- can always
# use different grader file if want different problems.
GRADER_ROOT = ENV_TOKENS.get('GRADER_ROOT')
this is an incorrect response
\ No newline at end of file
<b><fg>In order to replicate this experiment, I would need to know additional information such as the four different samples that they used (because I could have choosen metal, carbboard&&&&&and many other sample materials that they&;;;& didn't use and would get different results. Also I would also<>>> need to know the amount of vinegar to pour because this can caute a major change. Lastly, they might want to tell//////where to sit the samples while they dry for 30 minutes because if they are sitting in room temp. or by a light source makes a difference too.<b><b>
{"grader":"models/essay_set_1.p"}
In order to conduct the experiment, the students would need to know the mass of the marble, the height of the drop, and the air temperature.
In order to replicate this experiment, I would need to know additional information such as the four different samples that they used (because I could have choosen metal, carbboard and many other sample materials that they didn't use and would get different results. Also I would also need to know the amount of vinegar to pour because this can caute a major change. Lastly, they might want to tell where to sit the samples while they dry for 30 minutes because if they are sitting in room temp. or by a light source makes a difference too.
{"grader":"models/essay_set_1.p"}
this is an incorrect response
#!/usr/bin/env python
"""
Send some test programs to an xserver.
For each dir in the current directory, send the contents of payload.xml and each
of the answer*.py, right*.py and wrong*.py files.
"""
import argparse
import glob
import json
import os
import os.path
from path import path
import requests
import sys
import time
xserver = 'http://127.0.0.1:3031/'
def send(payload, answer):
"""
Send a grading request to the xserver
"""
body = {'grader_payload': payload,
'student_response': answer}
data = {'xqueue_body': json.dumps(body),
'xqueue_files': ''}
start = time.time()
r = requests.post(xserver, data=json.dumps(data))
end = time.time()
print "Request took %.03f sec" % (end - start)
if r.status_code != requests.codes.ok:
print "Request error:{0}".format(r.headers)
print "Text: ", r.text
return r.text
def check_contains(string, substr):
if not substr in string:
print "ERROR: Expected '{0}' in '{1}'".format(substr, string)
return False
else:
return True
def check_not_contains(string, substr):
if substr in string:
print "ERROR: Expected '{0}' not to be in '{1}'".format(substr, string)
return False
else:
return True
def check_right(string):
return check_contains(string, '\"correct\": true')
def check_wrong(string):
return check_contains(string, '\"correct\": false')
def globs(dirname, *patterns):
"""
Produce a sequence of all the files matching any of our patterns in dirname.
"""
for pat in patterns:
for fname in glob.glob(os.path.join(dirname, pat)):
yield fname
def contents(fname):
"""
Return the contents of the file `fname`.
"""
with open(fname) as f:
return f.read()
def check(dirname):
"""
Look for payload.json, answer*.py, right*.py, wrong*.py, run tests.
"""
payload_file = os.path.join(dirname, 'payload.json')
if os.path.isfile(payload_file):
payload = contents(payload_file)
print("found payload: " + payload)
else:
graders = list(globs(dirname, 'grade*.py'))
if not graders:
#print "No payload.json or grade*.py in {0}".format(dirname)
return
if len(graders) > 1:
print "More than one grader in {0}".format(dirname)
return
payload = json.dumps({'grader': os.path.abspath(graders[0])})
for name in globs(dirname, 'answer*.txt', 'right*.py'):
print "Checking correct response from {0}".format(name)
answer = contents(name)
right=check_right(send(payload, answer))
for name in globs(dirname, 'wrong*.txt'):
print "Checking wrong response from {0}".format(name)
answer = contents(name)
wrong=check_wrong(send(payload, answer))
assert wrong and right
def test():
#root = args.root or '.'
root=os.path.dirname( os.path.abspath(__file__ ))
for dirpath, _, _ in os.walk(root):
print("checking" + dirpath)
yield check, dirpath
def main(argv):
global xserver
#parser = argparse.ArgumentParser(description="Send dummy requests to a qserver")
#parser.add_argument('server')
#parser.add_argument('root', nargs='?')
#args = parser.parse_args(argv)
#xserver = args.server
if not xserver.endswith('/'):
xserver += '/'
#root = args.root or '.'
root=os.path.dirname( os.path.abspath(__file__ ))
for dirpath, _, _ in os.walk(root):
print("checking" + dirpath)
check(dirpath)
if __name__=="__main__":
main(sys.argv[1:])
this is an incorrect response
\ No newline at end of file
This source diff could not be displayed because it is too large. You can view the blob instead.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment