Commit 09fbbe7b by Ned Batchelder

Codejail is in its own repo now.

parent adde9398
CodeJail
========
CodeJail manages execution of untrusted code in secure sandboxes. It is
designed primarily for Python execution, but can be used for other languages as
well.

Security is enforced with AppArmor. If your operating system doesn't support
AppArmor, then CodeJail won't protect the execution.

CodeJail is designed to be configurable, and will auto-configure itself for
Python execution if you install it properly. The configuration is designed to
be flexible: it can run in safe mode or unsafe mode. This helps support large
development groups where only some of the developers are involved enough with
secure execution to configure AppArmor on their development machines.

If CodeJail is not configured for safe execution, it will execute Python
using the same API, but will not guard against malicious code. This allows the
same code to be used on safe-configured or non-safe-configured developers'
machines.

Installation
------------
These instructions detail how to configure your operating system so that
CodeJail can execute Python code safely. You can run CodeJail without these
steps, and you will have an unsafe CodeJail. This is fine on the machines of
developers who are unconcerned with security, and simplifies the integration
of CodeJail into your project.

To secure Python execution, you'll be creating a new virtualenv. This means
you'll have two: the main virtualenv for your project, and the new one for
sandboxed Python code.

Choose a place for the new virtualenv, call it <SANDENV>. It will be
automatically detected and used if you put it right alongside your existing
virtualenv, but with -sandbox appended. So if your existing virtualenv is in
~/ve/myproj, make <SANDENV> be ~/ve/myproj-sandbox (but you'll need to spell
out your home directory instead of ~).

Other details here that depend on your configuration:

- Your mitx working tree is <MITX>, for example, ~/mitx_all/mitx
- The user running the LMS is <USER>, for example, you on your dev machine,
  or www-data on a server.
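
The "-sandbox" naming rule above can be sketched in a few lines. This is only an illustration in Python 3: the helper name `sandbox_python_for` and the example path are hypothetical, and the string handling mirrors the `sys.prefix + "-sandbox"` check in the code from this commit.

```python
import posixpath


def sandbox_python_for(prefix):
    """Return the interpreter path inside the sibling "-sandbox" virtualenv.

    `prefix` is the root of the main virtualenv (what sys.prefix reports
    when that virtualenv is active).
    """
    return posixpath.join(prefix + "-sandbox", "bin", "python")


# Hypothetical main virtualenv location, with the home directory spelled out.
print(sandbox_python_for("/home/me/ve/myproj"))
# -> /home/me/ve/myproj-sandbox/bin/python
```
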

1. Create the new virtualenv::

    $ sudo virtualenv <SANDENV>

2. Install the sandbox requirements::

    $ source <SANDENV>/bin/activate
    $ sudo pip install -r sandbox-requirements.txt

3. Add a sandbox user::

    $ sudo addgroup sandbox
    $ sudo adduser --disabled-login sandbox --ingroup sandbox

4. Let the web server run the sandboxed Python as sandbox. Create the file
   /etc/sudoers.d/01-sandbox::

    $ visudo -f /etc/sudoers.d/01-sandbox

    <USER> ALL=(sandbox) NOPASSWD:<SANDENV>/bin/python
    <USER> ALL=(ALL) NOPASSWD:/bin/kill

5. Edit an AppArmor profile. The file must be named for the python executable,
   but with slashes changed to dots::

    #include <tunables/global>

    <SANDENV>/bin/python {
        #include <abstractions/base>

        <SANDENV>/** mr,
        <MITX>/common/lib/sandbox-packages/** r,
        /usr/local/lib/python2.7/** r,
        /usr/lib/python2.7/** rix,
        /tmp/** rix,
    }

6. Parse the profiles::

    $ sudo apparmor_parser <APPARMOR_FILE>

7. Reactivate your project's main virtualenv.
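
Step 5 asks for a profile file named after the python executable with slashes changed to dots. A minimal sketch of that renaming follows, in Python 3. The helper name `apparmor_profile_name` and the example path are hypothetical, and dropping the leading slash is an assumption based on the usual AppArmor file naming convention (for example `usr.sbin.tcpdump`), not something this README states.

```python
def apparmor_profile_name(python_bin):
    """Turn an absolute executable path into a dotted profile file name.

    Assumption: the leading slash is dropped before the remaining slashes
    are replaced with dots.
    """
    return python_bin.lstrip("/").replace("/", ".")


# Hypothetical sandbox interpreter path.
print(apparmor_profile_name("/home/me/ve/myproj-sandbox/bin/python"))
# -> home.me.ve.myproj-sandbox.bin.python
```
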
Tests
=====

The tests run under nose in the standard fashion.

If CodeJail is running unsafely, many of the tests will be automatically
skipped, or will fail, depending on whether CodeJail thinks it should be in
safe mode or not.

"""Django integration for codejail"""

from django.core.exceptions import MiddlewareNotUsed
from django.conf import settings

import codejail.jail_code


class ConfigureCodeJailMiddleware(object):
    """Middleware to configure codejail on startup."""
    def __init__(self):
        python_bin = settings.CODE_JAIL.get('python_bin')
        if python_bin:
            user = settings.CODE_JAIL['user']
            codejail.jail_code.configure("python", python_bin, user=user)
        raise MiddlewareNotUsed
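
The middleware above reads a `CODE_JAIL` dictionary from the Django settings: `python_bin` is looked up with `.get()`, and `user` is indexed directly once `python_bin` is set. A settings fragment that satisfies it might look like the sketch below. The path is a placeholder and the values are assumptions for illustration, not defaults shipped with the project.

```python
# settings.py (sketch; the path below is a placeholder, not a real default)
CODE_JAIL = {
    # Path to the sandbox virtualenv's interpreter. Leave the key out, or set
    # it to None, to skip this explicit configuration.
    'python_bin': '/home/me/ve/myproj-sandbox/bin/python',
    # Needed whenever python_bin is set: the middleware reads it with
    # CODE_JAIL['user'], so a missing key raises KeyError.
    'user': 'sandbox',
}
```
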
"""Run a python process in a jail."""

# Instructions:
#   - AppArmor.md from xserver

import logging
import os
import os.path
import resource
import shutil
import subprocess
import sys
import threading
import time

from .util import temp_directory

log = logging.getLogger(__name__)

# TODO: limit too much stdout data?

# Configure the commands

# COMMANDS is a map from an abstract command name to a list of command-line
# pieces, such as subprocess.Popen wants.
COMMANDS = {}


def configure(command, bin_path, user=None):
    """Configure a command for `jail_code` to use.

    `command` is the abstract command you're configuring, such as "python" or
    "node".  `bin_path` is the path to the binary.  `user`, if provided, is
    the user name to run the command under.

    """
    cmd_argv = []
    if user:
        # Run under the requested user rather than a hard-coded name.
        cmd_argv.extend(['sudo', '-u', user])
    cmd_argv.append(bin_path)

    # Command-specific arguments
    if command == "python":
        cmd_argv.append('-E')

    COMMANDS[command] = cmd_argv


def is_configured(command):
    """Has `jail_code` been configured for `command`?

    Returns true if the abstract command `command` has been configured for use
    in the `jail_code` function.

    """
    return command in COMMANDS

# By default, look where our current Python is, and maybe there's a
# python-sandbox alongside.  Only do this if running in a virtualenv.
if hasattr(sys, 'real_prefix'):
    if os.path.isdir(sys.prefix + "-sandbox"):
        configure("python", sys.prefix + "-sandbox/bin/python", "sandbox")


class JailResult(object):
    """A passive object for us to return from jail_code."""
    def __init__(self):
        self.stdout = self.stderr = self.status = None


def jail_code(command, code=None, files=None, argv=None, stdin=None):
    """Run code in a jailed subprocess.

    `command` is an abstract command ("python", "node", ...) that must have
    been configured using `configure`.

    `code` is a string containing the code to run.  If no code is supplied,
    then the code to run must be in one of the `files` copied, and must be
    named in the `argv` list.

    `files` is a list of file paths, they are all copied to the jailed
    directory.

    `argv` is the command-line arguments to supply.

    Return an object with:

        .stdout: stdout of the program, a string
        .stderr: stderr of the program, a string
        .status: return status of the process: an int, 0 for successful

    """
    if not is_configured(command):
        raise Exception("jail_code needs to be configured for %r" % command)

    with temp_directory(delete_when_done=True) as tmpdir:

        log.debug("Executing jailed code: %r", code)

        argv = argv or []

        # All the supporting files are copied into our directory.
        for filename in files or ():
            if os.path.isfile(filename):
                shutil.copy(filename, tmpdir)
            else:
                dest = os.path.join(tmpdir, os.path.basename(filename))
                shutil.copytree(filename, dest)

        # Create the main file.
        if code:
            with open(os.path.join(tmpdir, "jailed_code"), "w") as jailed:
                jailed.write(code)

            argv = ["jailed_code"] + argv

        cmd = COMMANDS[command] + argv

        subproc = subprocess.Popen(
            cmd, preexec_fn=set_process_limits, cwd=tmpdir,
            stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
        )

        # TODO: time limiting

        killer = ProcessKillerThread(subproc)
        killer.start()
        result = JailResult()
        result.stdout, result.stderr = subproc.communicate(stdin)
        result.status = subproc.returncode

    return result


def set_process_limits():
    """
    Set limits on this process, to be used first in a child process.
    """
    resource.setrlimit(resource.RLIMIT_CPU, (1, 1))     # 1 second of CPU--not wall clock time
    resource.setrlimit(resource.RLIMIT_NPROC, (0, 0))   # no subprocesses
    resource.setrlimit(resource.RLIMIT_FSIZE, (0, 0))   # no files

    mem = 32 * (2 ** 20)     # 32 MB should be enough for anyone, right? :)
    resource.setrlimit(resource.RLIMIT_STACK, (mem, mem))
    resource.setrlimit(resource.RLIMIT_RSS, (mem, mem))
    resource.setrlimit(resource.RLIMIT_DATA, (mem, mem))


class ProcessKillerThread(threading.Thread):
    def __init__(self, subproc, limit=1):
        super(ProcessKillerThread, self).__init__()
        self.subproc = subproc
        self.limit = limit

    def run(self):
        start = time.time()
        while (time.time() - start) < self.limit:
            time.sleep(.1)
            if self.subproc.poll() is not None:
                # Process ended, no need for us any more.
                return

        if self.subproc.poll() is None:
            # Can't use subproc.kill because we launched the subproc with sudo.
            killargs = ["sudo", "kill", "-9", str(self.subproc.pid)]
            kill = subprocess.Popen(killargs, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            out, err = kill.communicate()
            # TODO: This doesn't actually kill the process.... :(
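
The module above combines two kinds of limit: a CPU-time limit applied in the child through `preexec_fn`, and a wall-clock limit enforced from the parent. The sketch below shows the same combination in Python 3, under different assumptions: the child is launched directly rather than through sudo, so `Popen.kill` is enough, and the wall-clock limit uses the `timeout` argument of `communicate` instead of a watcher thread. The names `limit_cpu` and `run_limited` are hypothetical, and this is not the code from this commit.

```python
import resource
import subprocess
import sys


def limit_cpu(seconds):
    """Build a preexec_fn that caps CPU time (not wall-clock time)."""
    def apply_limit():
        resource.setrlimit(resource.RLIMIT_CPU, (seconds, seconds))
    return apply_limit


def run_limited(code, cpu_seconds=1, wall_seconds=5):
    """Run `code` in a child Python with a CPU limit and a wall-clock limit.

    Returns (status, stdout, stderr); stdout and stderr are bytes.
    """
    proc = subprocess.Popen(
        [sys.executable, "-c", code],
        preexec_fn=limit_cpu(cpu_seconds),
        stdout=subprocess.PIPE, stderr=subprocess.PIPE,
    )
    try:
        out, err = proc.communicate(timeout=wall_seconds)
    except subprocess.TimeoutExpired:
        # The child is our own process (no sudo), so we can kill it directly.
        proc.kill()
        out, err = proc.communicate()
    return proc.returncode, out, err


status, out, err = run_limited("print('hi')", cpu_seconds=5)
print(status, out)
```

A sleeping child uses almost no CPU, so only the wall-clock limit catches it; that is the case the skipped `test_cant_use_too_much_time` test is waiting on.
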
"""Safe execution of untrusted Python code."""

import json
import logging
import os.path
import shutil
import sys
import textwrap

from codejail import jail_code
from codejail.util import temp_directory, change_directory

log = logging.getLogger(__name__)


def safe_exec(code, globals_dict, files=None, python_path=None):
    """Execute code as "exec" does, but safely.

    `code` is a string of Python code.  `globals_dict` is used as the globals
    during execution.  Modifications the code makes to `globals_dict` are
    reflected in the dictionary on return.

    Returns None.  Changes made by `code` are visible in `globals_dict`.

    """
    the_code = []
    files = list(files or ())

    the_code.append(textwrap.dedent(
        """
            import json
            import sys
        """
        # We need to prevent the sandboxed code from printing to stdout,
        # or it will pollute the json we print there.  This isn't a
        # security concern (they can put any values in the json output
        # anyway, either by writing to sys.__stdout__, or just by defining
        # global values), but keeps accidents from happening.
        """
            class DevNull(object):
                def write(self, *args, **kwargs):
                    pass
            sys.stdout = DevNull()
        """
        # Read the code and the globals from the stdin.
        """
            code, g_dict = json.load(sys.stdin)
        """))

    for pydir in python_path or ():
        pybase = os.path.basename(pydir)
        the_code.append("sys.path.append(%r)\n" % pybase)
        files.append(pydir)

    the_code.append(textwrap.dedent(
        # Execute the sandboxed code.
        """
            exec code in g_dict
        """
        # Clean the globals for sending back as JSON over stdout.
        """
            ok_types = (type(None), int, long, float, str, unicode, list, tuple, dict)
            bad_keys = ("__builtins__",)
            def jsonable(v):
                if not isinstance(v, ok_types):
                    return False
                try:
                    json.dumps(v)
                except Exception:
                    return False
                return True
            g_dict = {k:v for k,v in g_dict.iteritems() if jsonable(v) and k not in bad_keys}
        """
        # Write the globals back to the calling process.
        """
            json.dump(g_dict, sys.__stdout__)
        """))

    stdin = json.dumps([code, json_safe(globals_dict)])
    jailed_code = "".join(the_code)

    # Turn this on to see what's being executed.
    if 0:
        log.debug("Jailed code: %s", jailed_code)
        log.debug("Exec: %s", code)
        log.debug("Stdin: %s", stdin)

    res = jail_code.jail_code("python", code=jailed_code, stdin=stdin, files=files)
    if res.status != 0:
        raise Exception("Couldn't execute jailed code: %s" % res.stderr)
    globals_dict.update(json.loads(res.stdout))


def json_safe(d):
    """Return only the JSON-safe part of d.

    Used to emulate reading data through a serialization straw.

    """
    ok_types = (type(None), int, long, float, str, unicode, list, tuple, dict)
    bad_keys = ("__builtins__",)
    jd = {}
    for k, v in d.iteritems():
        if not isinstance(v, ok_types):
            continue
        if k in bad_keys:
            continue
        try:
            json.dumps(v)
        except TypeError:
            continue
        else:
            jd[k] = v
    return json.loads(json.dumps(jd))


def not_safe_exec(code, globals_dict, files=None, python_path=None):
    """Another implementation of `safe_exec`, but not safe.

    This can be swapped in for debugging problems in sandboxed Python code.

    This is not thread-safe, due to temporarily changing the current directory
    and modifying sys.path.

    """
    g_dict = json_safe(globals_dict)

    with temp_directory(delete_when_done=True) as tmpdir:
        with change_directory(tmpdir):
            # Copy the files here.
            for filename in files or ():
                dest = os.path.join(tmpdir, os.path.basename(filename))
                shutil.copyfile(filename, dest)

            # Save a copy: extend() below mutates the list in place, so
            # keeping only a reference would not restore anything.
            original_path = sys.path[:]
            if python_path:
                sys.path.extend(python_path)
            try:
                exec code in g_dict
            finally:
                sys.path = original_path

    globals_dict.update(json_safe(g_dict))

# Running Python code in the sandbox makes it difficult to debug.
# Change 0 to 1 to run the code directly.
if 0 or not jail_code.is_configured("python"):
    safe_exec = not_safe_exec
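
The "serialization straw" used by `json_safe` above is easy to try in isolation. The sketch below is a Python 3 port written for illustration only: `long` and `unicode` no longer exist there, so the type tuple is an assumption about the closest equivalent, and the sample dictionary is made up. It is not the code from this commit.

```python
import json

# Assumed Python 3 equivalent of the ok_types tuple (bool is already an int).
OK_TYPES = (type(None), int, float, str, list, tuple, dict)
BAD_KEYS = ("__builtins__",)


def json_safe(d):
    """Return only the part of `d` that survives a JSON round trip."""
    safe = {}
    for key, value in d.items():
        if key in BAD_KEYS or not isinstance(value, OK_TYPES):
            continue
        try:
            json.dumps(value)
        except (TypeError, ValueError):
            # For example, a list that contains a function.
            continue
        safe[key] = value
    # Round-tripping copies the data and normalizes it (tuples become lists).
    return json.loads(json.dumps(safe))


sample = {"a": 17, "f": len, "t": (1, 2), "__builtins__": {}, "s": {1, 2}}
print(json_safe(sample))
# -> {'a': 17, 't': [1, 2]}
```

The function, the set, and `__builtins__` are dropped, and the tuple comes back as a list, which is what a caller of `safe_exec` should expect to see in `globals_dict` afterwards.
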
import sys
print "This is doit.py!"
print "My args are %r" % (sys.argv,)
"""Test jail_code.py"""

import os.path
import textwrap
import unittest
from nose.plugins.skip import SkipTest

from codejail.jail_code import jail_code, is_configured

dedent = textwrap.dedent


def jailpy(*args, **kwargs):
    """Run `jail_code` on Python."""
    return jail_code("python", *args, **kwargs)


def file_here(fname):
    """Return the full path to a file alongside this code."""
    return os.path.join(os.path.dirname(__file__), fname)


class JailCodeHelpers(object):
    """Assert helpers for jail_code tests."""
    def setUp(self):
        super(JailCodeHelpers, self).setUp()
        if not is_configured("python"):
            raise SkipTest

    def assertResultOk(self, res):
        self.assertEqual(res.stderr, "")
        self.assertEqual(res.status, 0)


class TestFeatures(JailCodeHelpers, unittest.TestCase):
    def test_hello_world(self):
        res = jailpy(code="print 'Hello, world!'")
        self.assertResultOk(res)
        self.assertEqual(res.stdout, 'Hello, world!\n')

    def test_argv(self):
        res = jailpy(
            code="import sys; print ':'.join(sys.argv[1:])",
            argv=["Hello", "world", "-x"]
        )
        self.assertResultOk(res)
        self.assertEqual(res.stdout, "Hello:world:-x\n")

    def test_ends_with_exception(self):
        res = jailpy(code="""raise Exception('FAIL')""")
        self.assertNotEqual(res.status, 0)
        self.assertEqual(res.stdout, "")
        self.assertEqual(res.stderr, dedent("""\
            Traceback (most recent call last):
              File "jailed_code", line 1, in <module>
                raise Exception('FAIL')
            Exception: FAIL
            """))

    def test_stdin_is_provided(self):
        res = jailpy(
            code="import json,sys; print sum(json.load(sys.stdin))",
            stdin="[1, 2.5, 33]"
        )
        self.assertResultOk(res)
        self.assertEqual(res.stdout.strip(), "36.5")

    def test_files_are_copied(self):
        res = jailpy(
            code="print 'Look:', open('hello.txt').read()",
            files=[file_here("hello.txt")]
        )
        self.assertResultOk(res)
        self.assertEqual(res.stdout, 'Look: Hello there.\n\n')

    def test_executing_a_copied_file(self):
        res = jailpy(
            files=[file_here("doit.py")],
            argv=["doit.py", "1", "2", "3"]
        )
        self.assertResultOk(res)
        self.assertEqual(res.stdout, "This is doit.py!\nMy args are ['doit.py', '1', '2', '3']\n")


class TestLimits(JailCodeHelpers, unittest.TestCase):
    def test_cant_use_too_much_memory(self):
        res = jailpy(code="print sum(range(100000000))")
        self.assertNotEqual(res.status, 0)
        self.assertEqual(res.stdout, "")

    def test_cant_use_too_much_cpu(self):
        res = jailpy(code="print sum(xrange(100000000))")
        self.assertNotEqual(res.status, 0)
        self.assertEqual(res.stdout, "")

    def test_cant_use_too_much_time(self):
        raise SkipTest      # TODO: test this once we can kill sleeping processes.
        res = jailpy(code=dedent("""\
                import time
                time.sleep(5)
                print 'Done!'
                """))
        self.assertNotEqual(res.status, 0)
        self.assertEqual(res.stdout, "")

    def test_cant_write_files(self):
        res = jailpy(code=dedent("""\
                print "Trying"
                with open("mydata.txt", "w") as f:
                    f.write("hello")
                with open("mydata.txt") as f2:
                    print "Got this:", f2.read()
                """))
        self.assertNotEqual(res.status, 0)
        self.assertEqual(res.stdout, "Trying\n")
        self.assertIn("ermission denied", res.stderr)

    def test_cant_use_network(self):
        res = jailpy(code=dedent("""\
                import urllib
                print "Reading google"
                u = urllib.urlopen("http://google.com")
                google = u.read()
                print len(google)
                """))
        self.assertNotEqual(res.status, 0)
        self.assertEqual(res.stdout, "Reading google\n")
        self.assertIn("IOError", res.stderr)

    # TODO: fork


class TestMalware(JailCodeHelpers, unittest.TestCase):
    def test_crash_cpython(self):
        # http://nedbatchelder.com/blog/201206/eval_really_is_dangerous.html
        res = jailpy(code=dedent("""\
            import new, sys
            crash_me = new.function(new.code(0,0,0,0,"KABOOM",(),(),(),"","",0,""), {})
            print "Here we go..."
            sys.stdout.flush()
            crash_me()
            print "The afterlife!"
            """))
        self.assertNotEqual(res.status, 0)
        self.assertEqual(res.stdout, "Here we go...\n")
        self.assertEqual(res.stderr, "")

    def test_read_etc_passwd(self):
        res = jailpy(code=dedent("""\
            bytes = len(open('/etc/passwd').read())
            print 'Gotcha', bytes
            """))
        self.assertNotEqual(res.status, 0)
        self.assertEqual(res.stdout, "")
        self.assertIn("ermission denied", res.stderr)

    def test_find_other_sandboxes(self):
        res = jailpy(code=dedent("""
            import os;
            places = [
                "..", "/tmp", "/", "/home", "/etc",
                "/var"
                ]
            for place in places:
                try:
                    files = os.listdir(place)
                except Exception:
                    # darn
                    pass
                else:
                    print "Files in %r: %r" % (place, files)
            print "Done."
            """))
        self.assertResultOk(res)
        self.assertEqual(res.stdout, "Done.\n")
"""Test safe_exec.py"""

import os.path
import textwrap
import unittest
from nose.plugins.skip import SkipTest

from codejail.safe_exec import safe_exec, not_safe_exec


class SafeExecTests(object):
    """The tests for `safe_exec`, will be mixed into specific test classes below."""
    def test_set_values(self):
        g = {}
        self.safe_exec("a = 17", g)
        self.assertEqual(g['a'], 17)

    def test_files_are_copied(self):
        g = {}
        self.safe_exec(
            "a = 'Look: ' + open('hello.txt').read()", g,
            files=[os.path.dirname(__file__) + "/hello.txt"]
        )
        self.assertEqual(g['a'], 'Look: Hello there.\n')

    def test_python_path(self):
        g = {}
        self.safe_exec(
            "import module; a = module.const", g,
            python_path=[os.path.dirname(__file__) + "/pylib"]
        )
        self.assertEqual(g['a'], 42)

    def test_functions_calling_each_other(self):
        g = {}
        self.safe_exec(textwrap.dedent("""\
            def f():
                return 1723
            def g():
                return f()
            x = g()
            """), g)
        self.assertEqual(g['x'], 1723)

    def test_printing_stuff_when_you_shouldnt(self):
        g = {}
        self.safe_exec("a = 17; print 'hi!'", g)
        self.assertEqual(g['a'], 17)

    def test_importing_lots_of_crap(self):
        g = {}
        self.safe_exec(textwrap.dedent("""\
            from numpy import *
            a = 1723
            """), g)
        self.assertEqual(g['a'], 1723)


class TestSafeExec(SafeExecTests, unittest.TestCase):
    """Run SafeExecTests, with the real safe_exec."""
    def safe_exec(self, *args, **kwargs):
        safe_exec(*args, **kwargs)


class TestNotSafeExec(SafeExecTests, unittest.TestCase):
    """Run SafeExecTests, with not_safe_exec."""
    def setUp(self):
        # If safe_exec is actually an alias to not_safe_exec, then there's no
        # point running these tests.
        if safe_exec is not_safe_exec:
            raise SkipTest

    def safe_exec(self, *args, **kwargs):
        not_safe_exec(*args, **kwargs)
"""Helpers for codejail."""

import contextlib
import os
import shutil
import tempfile


class TempDirectory(object):
    def __init__(self, delete_when_done=True):
        self.delete_when_done = delete_when_done
        self.temp_dir = tempfile.mkdtemp(prefix="codejail-")
        # Make directory readable by other users ('sandbox' user needs to be able to read it)
        os.chmod(self.temp_dir, 0775)

    def clean_up(self):
        if self.delete_when_done:
            # if this errors, something is genuinely wrong, so don't ignore errors.
            shutil.rmtree(self.temp_dir)


@contextlib.contextmanager
def temp_directory(delete_when_done=True):
    """
    A context manager to make and use a temp directory.  If `delete_when_done`
    is true (the default), the directory will be removed when done.
    """
    tmp = TempDirectory(delete_when_done)
    try:
        yield tmp.temp_dir
    finally:
        tmp.clean_up()


class ChangeDirectory(object):
    def __init__(self, new_dir):
        self.old_dir = os.getcwd()
        os.chdir(new_dir)

    def clean_up(self):
        os.chdir(self.old_dir)


@contextlib.contextmanager
def change_directory(new_dir):
    """
    A context manager to change the directory, and then change it back.
    """
    cd = ChangeDirectory(new_dir)
    try:
        yield new_dir
    finally:
        cd.clean_up()
from setuptools import setup

setup(
    name="codejail",
    version="0.1",
    packages=['codejail'],
)
@@ -2,6 +2,6 @@
 -e common/lib/calc
 -e common/lib/capa
 -e common/lib/chem
--e common/lib/codejail
+#-e common/lib/codejail
 -e common/lib/xmodule
 -e .