Commit 620b52e6 by John Eskew

Add test of nested block time storage.

Add .gitignore file.
parent 014a30e5
from timeit import default_timer
from .storage import TimingDataStorage
class Globals(object):
pass
global __m
__m = Globals()
_m = Globals()
__m.nest_level = 0
# Current module-level nest stack.
# As CodeBlockTimer objects are __enter__ed, their descriptions are pushed
# onto this stack. The stack length indicates the current nesting level.
_m.nest_stack = []
# Current run_id.
# Set from data storage when stack size increases from 0.
# While nest stack is populated, remains at a constant value.
# Identifies all the times from the same run.
_m.run_id = None
class CodeBlockTimer(object):
def __init__(self, block_desc, verbose=False):
def __init__(self, block_desc, **kwargs):
self.block_desc = block_desc
self.verbose = verbose
self.verbose = False if 'verbose' not in kwargs else kwargs['verbose']
self.timer = default_timer
self.data_store = TimingDataStorage()
self.run_id = None
self.data_store = TimingDataStorage(**kwargs)
def __enter__(self):
global __m
if __m.nest_level == 0:
self.run_id = self.data_store.run_id()
__m.nest_level += 1
global _m
if len(_m.nest_stack) == 0:
_m.run_id = self.data_store.run_id()
_m.nest_stack.append(self.block_desc)
self.start = self.timer()
return self
def __exit__(self, *args):
global __m
global _m
# Compute elapsed times.
end = self.timer()
self.elapsed_secs = end - self.start
self.elapsed = self.elapsed_secs * 1000 # millisecs
self.data_store.store(self.run_id, self.block_desc, self.elapsed)
__m.nest_level -= 1
if __m.nest_level == 0:
self.run_id = None
if self.verbose:
print '{}: elapsed time: {} ms'.format(self.block_desc, self.elapsed)
# Store the timings.
nested_desc = ":".join(_m.nest_stack)
self.data_store.store(_m.run_id, nested_desc, self.elapsed)
# Pop the stack.
_m.nest_stack.pop()
if len(_m.nest_stack) == 0:
_m.run_id = None
if self.verbose:
print '{}: elapsed time: {} ms'.format(
self.block_desc, self.elapsed
)
import sqlite3
import os
# Assumes that the DB exists and the schema in schema.sql exists in it.
class TimingDataStorage(object):
DB_NAME = 'block_times.db'
SCHEMA_NAME = "code_block_timer/code_block_timer/schema.sql"
DEFAULT_DB_NAME = 'block_times.db'
def __init__(self):
def __init__(self, **kwargs):
# Verify that the sqlite DB and schema exists.
self.conn = sqlite3.connect(self.DB_NAME)
def run_id(self, desc):
db_name = self.DEFAULT_DB_NAME
if 'db_name' in kwargs:
db_name = kwargs['db_name']
if not os.path.exists(db_name):
self._createDB(db_name)
self.conn = sqlite3.connect(db_name)
def _createDB(self, db_name):
# Create the sqlite DB file.
with open(db_name, "w") as f:
conn = sqlite3.connect(db_name)
with open(self.SCHEMA_NAME, "r") as schema_file:
schema = schema_file.read()
cur = conn.cursor()
conn.executescript(schema)
conn.commit()
def run_id(self, desc=""):
"""
Creates a new run ID and returns it.
A single run ID can be used for multiple store()s for times in the same test.
......@@ -27,6 +45,8 @@ class TimingDataStorage(object):
Store the description and elapsed time in the DB, under the passed-in run_id.
"""
cur = self.conn.cursor()
cur.execute('insert into block_times (run_id, block_desc, elapsed) values (?, ?, ?)', (run_id, desc, elapsed))
cur.execute(
'insert into block_times (run_id, block_desc, elapsed)'
'values (?, ?, ?)', (run_id, desc, elapsed)
)
self.conn.commit()
import os
import unittest
import random
import sqlite3
from code_block_timer import CodeBlockTimer, _m
class TestCodeBlockTimer(unittest.TestCase):
"""
Tests for CodeBlockTimer.
"""
def setUp(self):
super(TestCodeBlockTimer, self).setUp()
self.db_name = "tmp_db_{}".format(random.randint(100000, 999999))
def _verifyEvents(self, run_id, event_names):
"""
Verify that the run identified by the run_id has the events identified by the event_names.
"""
conn = sqlite3.connect(self.db_name)
cur = conn.cursor()
for event in event_names:
cur.execute("select count(*) from block_times where run_id=? and block_desc=?", (run_id, event))
self.assertEquals(cur.fetchone()[0], 1)
def test_stuff(self):
iterations = ['iter0', 'iter1', 'iter2', 'iter3']
with CodeBlockTimer("test", db_name=self.db_name) as timer:
run_id = _m.run_id
z = 0
for i, iter_name in enumerate(iterations):
with CodeBlockTimer(iter_name, db_name=self.db_name) as inner:
z += i
self._verifyEvents(run_id, ['test', ] + ["test:{}".format(x) for x in iterations])
def tearDown(self):
# Destroy the sqlite DB.
os.remove(self.db_name)
if __name__ == '__main__':
unittest.main()
......@@ -7,4 +7,4 @@ setup(
"code_block_timer",
],
author_email='jeskew@edx.org'
)
\ No newline at end of file
)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment