Commit 41cafc3b by Gabe Mulley

Use sudo to kick off the job

In the near future we will require logging in as users who can run commands as the hadoop user via sudo.

Change-Id: If891e176de0a5de4d6d38a01a258a997e14f905d
parent 6f6e124c
...@@ -18,6 +18,7 @@ def main(): ...@@ -18,6 +18,7 @@ def main():
parser.add_argument('--wait', action='store_true', help='wait for the task to complete') parser.add_argument('--wait', action='store_true', help='wait for the task to complete')
parser.add_argument('--verbose', action='store_true', help='display very verbose output') parser.add_argument('--verbose', action='store_true', help='display very verbose output')
parser.add_argument('--log-path', help='download luigi output streams after completing the task', default=None) parser.add_argument('--log-path', help='download luigi output streams after completing the task', default=None)
parser.add_argument('--user', help='remote user name to connect as', default=None)
arguments, extra_args = parser.parse_known_args() arguments, extra_args = parser.parse_known_args()
arguments.launch_task_arguments = extra_args arguments.launch_task_arguments = extra_args
...@@ -39,7 +40,10 @@ def run_task_playbook(arguments, uid): ...@@ -39,7 +40,10 @@ def run_task_playbook(arguments, uid):
uid (str): A unique identifier for this task execution. uid (str): A unique identifier for this task execution.
""" """
extra_vars = convert_args_to_extra_vars(arguments, uid) extra_vars = convert_args_to_extra_vars(arguments, uid)
return run_ansible(('task.yml', '-e', extra_vars), arguments.verbose, executable='ansible-playbook') args = ['task.yml', '-e', extra_vars]
if arguments.user:
args.extend(['-u', arguments.user])
return run_ansible(tuple(args), arguments.verbose, executable='ansible-playbook')
def convert_args_to_extra_vars(arguments, uid): def convert_args_to_extra_vars(arguments, uid):
...@@ -54,7 +58,7 @@ def convert_args_to_extra_vars(arguments, uid): ...@@ -54,7 +58,7 @@ def convert_args_to_extra_vars(arguments, uid):
extra_vars = { extra_vars = {
'name': arguments.job_flow_id or arguments.job_flow_name, 'name': arguments.job_flow_id or arguments.job_flow_name,
'branch': arguments.branch, 'branch': arguments.branch,
'task_arguments': ' '.join(arguments.launch_task_arguments) + ' >/tmp/{0}.out 2>/tmp/{0}.err'.format(uid), 'task_arguments': ' '.join(arguments.launch_task_arguments),
'uuid': uid, 'uuid': uid,
} }
if arguments.repo: if arguments.repo:
...@@ -109,15 +113,19 @@ def download_logs(arguments, uid): ...@@ -109,15 +113,19 @@ def download_logs(arguments, uid):
uid (str): A unique identifier for this task execution. uid (str): A unique identifier for this task execution.
""" """
for extension in ['out', 'err']: for extension in ['out', 'err']:
args = [
'mr_{job_flow}_master'.format(job_flow=arguments.job_flow_id),
'-m', 'fetch',
'-a', 'src=/tmp/{uid}.{ext} dest={dest} flat=yes'.format(
uid=uid,
ext=extension,
dest=arguments.log_path
)
]
if arguments.user:
args.extend(['-u', arguments.user])
run_ansible( run_ansible(
( tuple(args),
'mr_{job_flow}_master'.format(job_flow=arguments.job_flow_id),
'-m', 'fetch',
'-a', 'src=/tmp/{uid}.{ext} dest={dest} flat=yes'.format(
uid=uid,
ext=extension,
dest=arguments.log_path
)
),
arguments.verbose arguments.verbose
) )
#
# Define logging for use with analytics tasks.
#
# This file uses the section layout read by logging.config.fileConfig.
# It defines handlers for logging coming from edx/analytics code,
# and from luigi code:
#   * luigi-interface messages (INFO and above) go to stdout,
#   * edx.analytics messages (WARNING and above) are routed to stderr,
#   * anything else that reaches the root logger (DEBUG and above)
#     is written to a file under {{ log_dir }}.
# The "{{ log_dir }}" placeholder is template syntax and has to be
# substituted before this file is loaded.
[loggers]
keys=root,edx_analytics,luigi_interface
[handlers]
keys=stderrHandler,luigiHandler,localHandler
[formatters]
keys=standard,luigi_default
[logger_root]
# Catch-all logger: records from loggers that still propagate end up in the log file.
level=DEBUG
handlers=localHandler
[logger_edx_analytics]
# Messages from edx.analytics at WARNING and above get routed to stderr.
# propagate=0 keeps them out of the root logger's handlers (i.e. out of the log file).
level=WARNING
handlers=stderrHandler
qualname=edx.analytics
propagate=0
[logger_luigi_interface]
# Messages from luigi-interface at INFO and above get routed to stdout.
# propagate=0 keeps them out of the root logger's handlers (i.e. out of the log file).
level=INFO
handlers=luigiHandler
qualname=luigi-interface
propagate=0
[handler_stderrHandler]
# Writes records to the process's stderr using the "standard" format.
class=StreamHandler
formatter=standard
args=(sys.stderr,)
[handler_luigiHandler]
# Define as in luigi/interface.py.
class=StreamHandler
formatter=luigi_default
args=(sys.stdout,)
[handler_localHandler]
# Define as in edx-platform/common/lib/logsettings.py (for dev logging, not syslog).
# args are passed positionally to the handler class: (filename, mode).
class=logging.handlers.RotatingFileHandler
formatter=standard
args=('{{ log_dir }}/edx_analytics.log', 'w')
# NOTE(review): only the filename and mode are passed, so maxBytes and
# backupCount keep their defaults; per the stdlib documentation, rollover
# never occurs when maxBytes is zero. Confirm that a single, unrotated log
# file opened with mode 'w' is intended. The values below are commented out
# and are NOT applied:
# 'maxBytes': 1024 * 1024 * 2,
# 'backupCount': 5,
[formatter_standard]
# Define as in edx-platform/common/lib/logsettings.py (for dev logging, not syslog).
# Fields: timestamp, level, process id, logger name, source file:line, message.
format=%(asctime)s %(levelname)s %(process)d [%(name)s] %(filename)s:%(lineno)d - %(message)s
[formatter_luigi_default]
# Define as in luigi/interface.py.
# Fields: level and message only.
format=%(levelname)s: %(message)s
...@@ -10,12 +10,14 @@ ...@@ -10,12 +10,14 @@
- name: Run a task - name: Run a task
hosts: "mr_{{ name }}_master" hosts: "mr_{{ name }}_master"
gather_facts: False gather_facts: False
sudo: False
vars: vars:
- repo: git@github.com:edx/analytics-tasks.git - repo: git@github.com:edx/analytics-tasks.git
- branch: master - branch: master
- working_dir: "/tmp/task-work-{{ uuid }}" - root_data_dir: /var/lib/analytics-tasks
- root_log_dir: /var/log/analytics-tasks
- working_dir: "{{ root_data_dir }}/{{ uuid }}"
- log_dir: "{{ root_log_dir }}/{{ uuid}}"
- working_repo_dir: "{{ working_dir }}/repo" - working_repo_dir: "{{ working_dir }}/repo"
- working_venv_dir: "{{ working_dir }}/venv" - working_venv_dir: "{{ working_dir }}/venv"
- task_arguments: '' - task_arguments: ''
...@@ -34,13 +36,27 @@ ...@@ -34,13 +36,27 @@
regexp=^github.com regexp=^github.com
line="{{ git_server_hostname }},{{ git_server_ip_address }} {{ git_server_public_key }}" line="{{ git_server_hostname }},{{ git_server_ip_address }} {{ git_server_public_key }}"
- name: root directories created
file: path={{ item }} state=directory owner=root group=root
sudo: True
with_items:
- "{{ root_data_dir }}"
- "{{ root_log_dir }}"
- name: working directories created - name: working directories created
file: path={{ item }} state=directory file: path={{ item }} state=directory owner={{ ansible_ssh_user }} group={{ ansible_ssh_user }}
sudo: True
with_items: with_items:
- "{{ working_dir }}" - "{{ working_dir }}"
- "{{ working_repo_dir }}" - "{{ working_repo_dir }}"
- "{{ working_venv_dir }}" - "{{ working_venv_dir }}"
- name: log directory created
file: path={{ item }} state=directory mode=777 owner={{ ansible_ssh_user }} group={{ ansible_ssh_user }}
sudo: True
with_items:
- "{{ log_dir }}"
- name: analytics tasks repository checked out - name: analytics tasks repository checked out
git: repo={{ repo }} dest={{ working_repo_dir }} git: repo={{ repo }} dest={{ working_repo_dir }}
...@@ -52,9 +68,10 @@ ...@@ -52,9 +68,10 @@
- name: ensure system packages are installed - name: ensure system packages are installed
command: make system-requirements chdir={{ working_repo_dir }} command: make system-requirements chdir={{ working_repo_dir }}
sudo: True
- name: bootstrap pip - name: bootstrap pip
command: sudo apt-get install -q -y python-pip command: apt-get install -q -y python-pip
sudo: True sudo: True
- name: virtualenv installed - name: virtualenv installed
...@@ -70,6 +87,9 @@ ...@@ -70,6 +87,9 @@
. {{ working_venv_dir }}/bin/activate && make install . {{ working_venv_dir }}/bin/activate && make install
chdir={{ working_repo_dir }} chdir={{ working_repo_dir }}
- name: logging configured
template: src=logging.cfg.j2 dest={{ working_repo_dir }}/logging.cfg
# Unfortunately, we cannot make the poll value a variable because of this open issue: # Unfortunately, we cannot make the poll value a variable because of this open issue:
# https://github.com/ansible/ansible/issues/255 # https://github.com/ansible/ansible/issues/255
# As a workaround, we define two versions of this play, then choose # As a workaround, we define two versions of this play, then choose
...@@ -78,14 +98,18 @@ ...@@ -78,14 +98,18 @@
# In the integration tests, we will wait for tasks to complete before verifying results. # In the integration tests, we will wait for tasks to complete before verifying results.
- name: task run (fire and forget) - name: task run (fire and forget)
shell: > shell: >
{{ working_venv_dir }}/bin/launch-task {{ task_arguments }} chdir={{ working_repo_dir }} . /home/hadoop/.bashrc && {{ working_venv_dir }}/bin/launch-task {{ task_arguments }} >{{ log_dir }}/stdout 2>{{ log_dir }}/stderr chdir={{ working_repo_dir }}
async: 10000000000 async: 10000000000
poll: 0 poll: 0
sudo: True
sudo_user: hadoop
when: not wait_for_task when: not wait_for_task
- name: task run (wait for completion) - name: task run (wait for completion)
shell: > shell: >
{{ working_venv_dir }}/bin/launch-task {{ task_arguments }} chdir={{ working_repo_dir }} . /home/hadoop/.bashrc && {{ working_venv_dir }}/bin/launch-task {{ task_arguments }} >{{ log_dir }}/stdout 2>{{ log_dir }}/stderr chdir={{ working_repo_dir }}
async: 10000000000 async: 10000000000
poll: 10 poll: 10
sudo: True
sudo_user: hadoop
when: wait_for_task when: wait_for_task
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment