Commit f4cb95bb by Ned Batchelder

Optionally use a proxy process to spawn subprocesses

When spawning subprocesses, fork() will fail if the memory of the parent
can't be duplicated in the child. This means large long-lived processes
will have problems using CodeJail repeatedly.

This introduces a long-lived proxy subprocess which then spawns the
actual subprocesses.  Because the proxy process starts small and stays
small, it will be able to spawn subprocesses even when the parent
process grows large.

The use of the proxy process is controlled by a "PROXY" limit, which
should be 0 or 1.  If neither is set, then the CODEJAIL_PROXY
environment variable determines whether the proxy is used.
parent f373d23f
# Makefile for CodeJail
test: test_no_proxy test_proxy
test_no_proxy:
@echo "Running all tests with no proxy process"
CODEJAIL_PROXY=0 nosetests
test_proxy:
@echo "Running all tests with proxy process"
CODEJAIL_PROXY=1 nosetests
......@@ -159,7 +159,9 @@ the rights to modify the files in its site-packages directory.
Tests
-----
The tests run under nose in the standard fashion.
Run the tests with the Makefile::
$ make tests
If CodeJail is running unsafely, many of the tests will be automatically
skipped, or will fail, depending on whether CodeJail thinks it should be in
......
......@@ -24,7 +24,9 @@ class ConfigureCodeJailMiddleware(object):
if python_bin:
user = settings.CODE_JAIL['user']
codejail.jail_code.configure("python", python_bin, user=user)
limits = settings.CODE_JAIL.get('limits', {})
for name, value in limits.items():
codejail.jail_code.set_limit(name, value)
raise MiddlewareNotUsed
......@@ -5,14 +5,13 @@ import os
import os.path
import resource
import shutil
import subprocess
import sys
import threading
import time
from .proxy import run_subprocess_through_proxy
from .subproc import run_subprocess
from .util import temp_directory
log = logging.getLogger(__name__)
log = logging.getLogger("codejail")
# TODO: limit too much stdout data?
......@@ -76,6 +75,10 @@ LIMITS = {
"VMEM": 0,
# Size of files creatable, in bytes, defaulting to nothing can be written.
"FSIZE": 0,
# Whether to use a proxy process or not. None means use an environment
# variable to decide. NOTE: using a proxy process is NOT THREAD-SAFE, only
# one thread can use CodeJail at a time if you are using a proxy process.
"PROXY": None,
}
......@@ -101,6 +104,9 @@ def set_limit(limit_name, value):
* `"FSIZE"`: the maximum size of files creatable by the jailed code,
in bytes. The default is 0 (no files may be created).
* `"PROXY"`: 1 to use a proxy process, 0 to not use one. This isn't
really a limit, sorry about that.
Limits are process-wide, and will affect all future calls to jail_code.
Providing a limit of 0 will disable that limit.
......@@ -214,29 +220,30 @@ def jail_code(command, code=None, files=None, extra_files=None, argv=None,
# Add the code-specific command line pieces.
cmd.extend(argv)
# Run the subprocess.
subproc = subprocess.Popen(
cmd, preexec_fn=set_process_limits, cwd=homedir, env={},
stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
)
if slug:
log.info("Executing jailed code %s in %s, with PID %s", slug, homedir, subproc.pid)
# Use the configuration and maybe an environment variable to determine
# whether to use a proxy process.
use_proxy = LIMITS["PROXY"]
if use_proxy is None:
use_proxy = int(os.environ.get("CODEJAIL_PROXY", "0"))
if use_proxy:
run_subprocess_fn = run_subprocess_through_proxy
else:
run_subprocess_fn = run_subprocess
# Start the time killer thread.
realtime = LIMITS["REALTIME"]
if realtime:
killer = ProcessKillerThread(subproc, limit=realtime)
killer.start()
# Run the subprocess.
status, stdout, stderr = run_subprocess_fn(
cmd=cmd, cwd=homedir, env={}, slug=slug,
stdin=stdin,
realtime=LIMITS["REALTIME"], rlimits=create_rlimits(),
)
result = JailResult()
result.stdout, result.stderr = subproc.communicate(stdin)
result.status = subproc.returncode
result.status = status
result.stdout = stdout
result.stderr = stderr
# Remove the tmptmp directory as the sandbox user
# since the sandbox user may have written files that
# the application user can't delete.
# Remove the tmptmp directory as the sandbox user since the sandbox
# user may have written files that the application user can't delete.
rm_cmd.extend([
'/usr/bin/find', tmptmp,
'-mindepth', '1', '-maxdepth', '1',
......@@ -244,22 +251,19 @@ def jail_code(command, code=None, files=None, extra_files=None, argv=None,
])
# Run the rm command subprocess.
subproc = subprocess.Popen(rm_cmd, cwd=homedir)
subproc.communicate()
run_subprocess_fn(rm_cmd, cwd=homedir)
return result
def set_process_limits(): # pragma: no cover
def create_rlimits():
"""
Set limits on this process, to be used first in a child process.
Create a list of resource limits for our jailed processes.
"""
# Set a new session id so that this process and all its children will be
# in a new process group, so we can kill them all later if we need to.
os.setsid()
rlimits = []
# No subprocesses.
resource.setrlimit(resource.RLIMIT_NPROC, (0, 0))
rlimits.append((resource.RLIMIT_NPROC, (0, 0)))
# CPU seconds, not wall clock time.
cpu = LIMITS["CPU"]
......@@ -268,40 +272,15 @@ def set_process_limits(): # pragma: no cover
# reaches the soft limit, a SIGXCPU will be sent, which should kill the
# process. If you set the soft and hard limits the same, then the hard
# limit is reached, and a SIGKILL is sent, which is less distinctive.
resource.setrlimit(resource.RLIMIT_CPU, (cpu, cpu+1))
rlimits.append((resource.RLIMIT_CPU, (cpu, cpu+1)))
# Total process virtual memory.
vmem = LIMITS["VMEM"]
if vmem:
resource.setrlimit(resource.RLIMIT_AS, (vmem, vmem))
rlimits.append((resource.RLIMIT_AS, (vmem, vmem)))
# Size of written files. Can be zero (nothing can be written).
fsize = LIMITS["FSIZE"]
resource.setrlimit(resource.RLIMIT_FSIZE, (fsize, fsize))
rlimits.append((resource.RLIMIT_FSIZE, (fsize, fsize)))
class ProcessKillerThread(threading.Thread):
"""
A thread to kill a process after a given time limit.
"""
def __init__(self, subproc, limit):
super(ProcessKillerThread, self).__init__()
self.subproc = subproc
self.limit = limit
def run(self):
start = time.time()
while (time.time() - start) < self.limit:
time.sleep(.25)
if self.subproc.poll() is not None:
# Process ended, no need for us any more.
return
if self.subproc.poll() is None:
# Can't use subproc.kill because we launched the subproc with sudo.
pgid = os.getpgid(self.subproc.pid)
log.warning(
"Killing process %r (group %r), ran too long: %.1fs",
self.subproc.pid, pgid, time.time() - start
)
subprocess.call(["sudo", "pkill", "-9", "-g", str(pgid)])
return rlimits
"""A proxy subprocess-making process for CodeJail."""
import ast
import logging
import os
import os.path
import subprocess
import sys
import time
from .subproc import run_subprocess
log = logging.getLogger("codejail")
# We use .readline to get data from the pipes between the processes, so we need
# to ensure that a newline does not appear in the data. We also need a way to
# communicate a few values, and unpack them. Lastly, we need to be sure we can
# handle binary data. Serializing with repr() and deserializing the literals
# that result give us all the properties we need.
serialize = repr
deserialize = ast.literal_eval
##
## Client code, runs in the parent CodeJail process.
##
def run_subprocess_through_proxy(*args, **kwargs):
"""
Works just like :ref:`run_subprocess`, but through the proxy process.
This will retry a few times if need be.
"""
for tries in xrange(3):
try:
proxy = get_proxy()
# Write the args and kwargs to the proxy process.
proxy_stdin = serialize((args, kwargs))
proxy.stdin.write(proxy_stdin+"\n")
# Read the result from the proxy. This blocks until the process
# is done.
proxy_stdout = proxy.stdout.readline()
if not proxy_stdout:
# EOF: the proxy must have died.
raise Exception("Proxy process died unexpectedly!")
status, stdout, stderr, log_calls = deserialize(proxy_stdout.rstrip())
# Write all the log messages to the log, and return.
for level, msg, args in log_calls:
log.log(level, msg, *args)
return status, stdout, stderr
except Exception as e:
log.exception("Proxy process failed")
# Give the proxy process a chance to die completely if it is dying.
time.sleep(.001)
continue
# If we finished all the tries, then raise the last exception we got.
raise
# There is one global proxy process.
PROXY_PROCESS = None
def get_proxy():
global PROXY_PROCESS
# If we had a proxy process, but it died, clean up.
if PROXY_PROCESS is not None:
status = PROXY_PROCESS.poll()
if status is not None:
log.info(
"CodeJail proxy process (pid %d) ended with status code %d",
PROXY_PROCESS.pid,
status
)
PROXY_PROCESS = None
# If we need a proxy, make a proxy.
if PROXY_PROCESS is None:
# Start the proxy by invoking proxy_main.py in our root directory.
root = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
proxy_main_py = os.path.join(root, "proxy_main.py")
# Run proxy_main.py with the same Python that is running us. "-u" makes
# the stdin and stdout unbuffered. We pass the log level of the
# "codejail" log so that the proxy can send back an appropriate level
# of detail in the log messages.
log_level = log.getEffectiveLevel()
cmd = [sys.executable, '-u', proxy_main_py, str(log_level)]
PROXY_PROCESS = subprocess.Popen(
args=cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
log.info("Started CodeJail proxy process (pid %d)", PROXY_PROCESS.pid)
return PROXY_PROCESS
##
## Proxy process code
##
class CapturingHandler(logging.Handler):
"""
A logging Handler that captures all the log calls, for later replay.
NOTE: this doesn't capture all aspects of the log record. It only captures
the log level, the message string, and the arguments. It does not capture
the caller, the current exception, the current time, etc.
"""
def __init__(self):
super(CapturingHandler, self).__init__()
self.log_calls = []
def createLock(self):
self.lock = None
def handle(self, record):
self.log_calls.append((record.levelno, record.msg, record.args))
def get_log_calls(self):
retval = self.log_calls
self.log_calls = []
return retval
def proxy_main(argv):
"""
The main program for the proxy process.
It does this:
* Reads a line from stdin with the repr of a tuple: (args, kwargs)
* Calls :ref:`run_subprocess` with *args, **kwargs
* Writes one line to stdout: the repr of the return value from
`run_subprocess` and the log calls made:
(status, stdout, stderr, log_calls) .
The process ends when its stdin is closed.
`argv` is the argument list of the process, from sys.argv. The only
argument is the logging level for the "codejail" log in the parent
process. Since we tunnel our logging back to the parent, we don't want to
send everything, just the records that the parent will actually log.
"""
# We don't want to see any noise on stderr.
sys.stderr = open(os.devnull, "w")
# Capture all logging messages.
capture_log = CapturingHandler()
log.addHandler(capture_log)
log.setLevel(int(argv[1]) or logging.DEBUG)
log.debug("Starting proxy process")
try:
while True:
stdin = sys.stdin.readline()
log.debug("proxy stdin: %r" % stdin)
if not stdin:
break
args, kwargs = deserialize(stdin.rstrip())
status, stdout, stderr = run_subprocess(*args, **kwargs)
log.debug("run_subprocess result: status=%r\nstdout=%r\nstderr=%r" % (status, stdout, stderr))
log_calls = capture_log.get_log_calls()
stdout = serialize((status, stdout, stderr, log_calls))
sys.stdout.write(stdout+"\n")
except Exception:
# Note that this log message will not get back to the parent, because
# we are dying and not communicating back to the parent. This will be
# useful only if you add another handler at the top of this function.
log.exception("Proxy dying due to exception")
log.debug("Exiting proxy process")
......@@ -14,7 +14,7 @@ except ImportError:
from codejail import jail_code
from codejail.util import temp_directory, change_directory
log = logging.getLogger(__name__)
log = logging.getLogger("codejail")
# Flags to let developers temporarily change some behavior in this file.
......
"""Subprocess helpers for CodeJail."""
import functools
import logging
import os
import resource
import subprocess
import threading
import time
log = logging.getLogger("codejail")
def run_subprocess(
cmd, stdin=None, cwd=None, env=None, rlimits=None, realtime=None,
slug=None,
):
"""
A helper to make a limited subprocess.
`cmd`, `cwd`, and `env` are exactly as `subprocess.Popen` expects.
`stdin` is the data to write to the stdin of the subprocess.
`rlimits` is a list of tuples, the arguments to pass to
`resource.setrlimit` to set limits on the process.
`realtime` is the number of seconds to limit the execution of the process.
`slug` is a short identifier for use in log messages.
This function waits until the process has finished executing before
returning.
Returns a tuple of three values: the exit status code of the process, and
the stdout and stderr of the process, as strings.
"""
subproc = subprocess.Popen(
cmd, cwd=cwd, env=env,
preexec_fn=functools.partial(set_process_limits, rlimits or ()),
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
)
if slug:
log.info("Executed jailed code %s in %s, with PID %s", slug, cwd, subproc.pid)
# Start the time killer thread.
if realtime:
killer = ProcessKillerThread(subproc, limit=realtime)
killer.start()
stdout, stderr = subproc.communicate(stdin)
return subproc.returncode, stdout, stderr
def set_process_limits(rlimits): # pragma: no cover
"""
Set limits on this process, to be used first in a child process.
"""
# Set a new session id so that this process and all its children will be
# in a new process group, so we can kill them all later if we need to.
os.setsid()
for limit, value in rlimits:
resource.setrlimit(limit, value)
class ProcessKillerThread(threading.Thread):
"""
A thread to kill a process after a given time limit.
"""
def __init__(self, subproc, limit):
super(ProcessKillerThread, self).__init__()
self.subproc = subproc
self.limit = limit
def run(self):
start = time.time()
while (time.time() - start) < self.limit:
time.sleep(.25)
if self.subproc.poll() is not None:
# Process ended, no need for us any more.
return
if self.subproc.poll() is None:
# Can't use subproc.kill because we launched the subproc with sudo.
pgid = os.getpgid(self.subproc.pid)
log.warning(
"Killing process %r (group %r), ran too long: %.1fs",
self.subproc.pid, pgid, time.time() - start
)
subprocess.call(["sudo", "pkill", "-9", "-g", str(pgid)])
"""Test jail_code.py"""
import json
import logging
import os
import os.path
import shutil
import signal
import textwrap
import tempfile
import time
import unittest
import mock
from nose.plugins.skip import SkipTest
from codejail.jail_code import jail_code, is_configured, set_limit, LIMITS
from codejail import proxy
def jailpy(code=None, *args, **kwargs):
......@@ -25,6 +30,27 @@ def file_here(fname):
return os.path.join(os.path.dirname(__file__), fname)
def text_of_logs(mock_calls):
"""
After capturing log messages, use this to get the full text.
Like this::
@mock.patch("codejail.subproc.log._log")
def test_with_log_messages(self, log_log):
do_something_that_makes_log_messages()
log_text = text_of_logs(log_log.mock_calls)
self.assertRegexpMatches(log_text, r"INFO: Something cool happened")
"""
text = ""
for m in mock_calls:
level, msg, args = m[1]
msg_formatted = msg % args
text += "%s: %s\n" % (logging.getLevelName(level), msg_formatted)
return text
class JailCodeHelpers(object):
"""Assert helpers for jail_code tests."""
def setUp(self):
......@@ -76,6 +102,42 @@ class TestFeatures(JailCodeHelpers, unittest.TestCase):
self.assertResultOk(res)
self.assertEqual(res.stdout, "36.5\n")
def test_stdin_can_be_large_and_binary(self):
res = jailpy(
code="import sys; print sum(ord(c) for c in sys.stdin.read())",
stdin="".join(chr(i) for i in range(256))*10000,
)
self.assertResultOk(res)
self.assertEqual(res.stdout, "326400000\n")
def test_stdout_can_be_large_and_binary(self):
res = jailpy(
code="""
import sys
sys.stdout.write("".join(chr(i) for i in range(256))*10000)
"""
)
self.assertResultOk(res)
self.assertEqual(
res.stdout,
"".join(chr(i) for i in range(256))*10000
)
def test_stderr_can_be_large_and_binary(self):
res = jailpy(
code="""
import sys
sys.stderr.write("".join(chr(i) for i in range(256))*10000)
sys.stdout.write("OK!")
"""
)
self.assertEqual(res.status, 0)
self.assertEqual(res.stdout, "OK!")
self.assertEqual(
res.stderr,
"".join(chr(i) for i in range(256))*10000
)
def test_files_are_copied(self):
res = jailpy(
code="print 'Look:', open('hello.txt').read()",
......@@ -163,6 +225,12 @@ class TestFeatures(JailCodeHelpers, unittest.TestCase):
"This is my file!\n['overthere.txt', '.myfile.txt']\n"
)
@mock.patch("codejail.subproc.log._log")
def test_slugs_get_logged(self, log_log):
res = jailpy(code="print 'Hello, world!'", slug="HELLO")
log_text = text_of_logs(log_log.mock_calls)
self.assertRegexpMatches(log_text, r"INFO: Executed jailed code HELLO in .*, with PID .*")
class TestLimits(JailCodeHelpers, unittest.TestCase):
"""Tests of the resource limits, and changing them."""
......@@ -207,13 +275,18 @@ class TestLimits(JailCodeHelpers, unittest.TestCase):
self.assertEqual(res.stdout, "")
self.assertEqual(res.status, 128+signal.SIGXCPU) # 137
def test_cant_use_too_much_time(self):
@mock.patch("codejail.subproc.log._log")
def test_cant_use_too_much_time(self, log_log):
# Default time limit is 1 second. Sleep for 1.5 seconds.
set_limit('CPU', 100)
res = jailpy(code="import time; time.sleep(1.5); print 'Done!'")
self.assertEqual(res.stdout, "")
self.assertEqual(res.status, -signal.SIGKILL) # -9
# Make sure we log that we are killing the process.
log_text = text_of_logs(log_log.mock_calls)
self.assertRegexpMatches(log_text, r"WARNING: Killing process \d+")
def test_changing_realtime_limit(self):
# Change time limit to 2 seconds, sleeping for 1.5 will be fine.
set_limit('REALTIME', 2)
......@@ -450,3 +523,54 @@ class TestMalware(JailCodeHelpers, unittest.TestCase):
""")
self.assertResultOk(res)
self.assertEqual(res.stdout, "Done.\n")
class TestProxyProcess(JailCodeHelpers, unittest.TestCase):
"""Tests of the proxy process."""
def setUp(self):
# During testing, the proxy is used if the environment variable is set.
# Skip these tests if we aren't using the proxy.
if not int(os.environ.get("CODEJAIL_PROXY", "0")):
raise SkipTest()
super(TestProxyProcess, self).setUp()
def run_ok(self):
"""Run some code to see that it works."""
num = int(time.time()*100000)
res = jailpy(code="print 'Look: %d'" % num)
self.assertResultOk(res)
self.assertEqual(res.stdout, 'Look: %d\n' % num)
def test_proxy_is_persistent(self):
# Running code twice, you use the same proxy process.
self.run_ok()
pid = proxy.PROXY_PROCESS.pid
self.run_ok()
self.assertEqual(proxy.PROXY_PROCESS.pid, pid)
def test_crash_proxy(self):
# We can run some code.
self.run_ok()
pids = set()
pids.add(proxy.PROXY_PROCESS.pid)
# Run this a number of times, to try to catch some cases.
for i in xrange(10):
# The proxy process dies unexpectedly!
proxy.PROXY_PROCESS.kill()
# The behavior is slightly different if we rush immediately to the
# next run, or if we wait a bit to let the process truly die, so
# alternate whether we wait or not.
if i % 2:
time.sleep(.1)
# Code can still run.
self.run_ok()
# We should have a new proxy process each time.
pid = proxy.PROXY_PROCESS.pid
self.assertNotIn(pid, pids)
pids.add(pid)
"""Memory-stress a long-running CodeJail-using process."""
from codejail import safe_exec
GOBBLE_CHUNK = int(1e7)
def main():
gobble = []
for i in xrange(int(1e7)):
print i
globs = {}
safe_exec.safe_exec("a = 17", globs)
assert globs["a"] == 17
gobble.append("x"*GOBBLE_CHUNK)
if __name__ == "__main__":
main()
"""The main program for the proxy process."""
import sys
from codejail.proxy import proxy_main
if __name__ == "__main__":
sys.exit(proxy_main(sys.argv))
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment