diff --git a/.gitignore b/.gitignore
index 8d9efadde98af17abc731e2b35c1d603cff4dcc7..b6dbfafe06884dd5dc1c18f850275138a8160c87 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,7 +1,8 @@
-*.pyc
-/.venv/
-/.cache/
-/MANIFEST
-/dist/
-/build/
-*.egg-info/
+*.pyc
+/.venv/
+/.cache/
+/MANIFEST
+/dist/
+/build/
+*.egg-info/
+.venv/
\ No newline at end of file
diff --git a/dojo/cli.py b/dojo/cli.py
index 07cb84cc16c95d1c7df766d1bf87be7e8d08b041..f0f5bf7e7b3a03f8e37a55b776881f81e65ffaa9 100644
--- a/dojo/cli.py
+++ b/dojo/cli.py
@@ -15,12 +15,13 @@ def cli():
 @click.option('--runner', default=None, help='specify a runner for the job')
 @click.option('--config', default='config', help='path to directory containing configuration files to be merged')
 @click.option('--env', default='development', help='environment used to select configuration and secrets')
+@click.option('--input', 'input_job', default=None, help='Use a single input for the job.')
 @click.pass_context
-def run(context, name, runner, config, env):
+def run(context, name, runner, config, env, input_job):
     if context.obj is None:
         context.obj = {}
     logging.getLogger().setLevel(logging.INFO)
-    Entrypoint().run(name, runner, config, env)
+    Entrypoint().run(name, runner, config, env, input_job)
 
 
 @cli.command(help='Encrypt secrets')
diff --git a/dojo/run.py b/dojo/run.py
index b78443367995562bf590e011b7cb65d31ef657b7..a60a50643afc948ba8a06f0cb5de9be3730e05fe 100644
--- a/dojo/run.py
+++ b/dojo/run.py
@@ -1,105 +1,110 @@
-from __future__ import absolute_import, print_function, unicode_literals
-
-import os
-import yaml
-import importlib
-import fnmatch
-
-from datetime import datetime
-from .secrets import Secrets
-
-
-from .util import deep_merge
-
-
-class Entrypoint(object):
-
-    def run(self, name, runner, config, env):
-        # Build base config from core yml and jobs files.
-        base_config_path = os.path.join(config, 'config.yml')
-        base_config = self._read_yaml(base_config_path) or {}
-        if 'jobs' not in base_config:
-            base_config['jobs'] = {}
-        jobs_config_dir = os.path.join(config, 'jobs')
-        for config_file in self._list_files_r(jobs_config_dir, 'yml'):
-            jobs_config = self._read_yaml(config_file)
-            base_config['jobs'].update(jobs_config['jobs'])
-
-        # Build secrets by decrypting available JSONs
-        env_json_secrets_path = os.path.join(config, 'secrets.%s.json.enc' % (env, ))
-        secrets = Secrets().decrypt(env_json_secrets_path)
-
-        # Build the envionment-specific config, and merged to rendered config.json.
-        env_config_path = os.path.join(config, 'config.%s.yml' % (env, ))
-        env_config = self._read_yaml(env_config_path) or {}
-        config = deep_merge(base_config, env_config)
-
-        # Build the job.
-        job = self._build_job(name, config, secrets, runner)
-
-        # Derive and initialize the runner class.
-        runner_name = 'direct' if runner is None else runner
-        if '.' in runner_name:
-            runner_class = self._get_module_class(runner_name)
-        else:
-            if runner_name not in job.RUNNERS:
-                raise ValueError('specified runner "%s" is not supported by job type %s, only %s' % (runner_name, job.__class__.__name__, job.RUNNERS.keys()))
-            runner_class = job.RUNNERS[runner_name]
-
-        runner_class().run(job, config)
-
-    def _build_job(self, job_name, config, secrets, runner):
-        job_config = config.get('jobs', {}).get(job_name, {})
-        job_config.update({
-            'name': job_name,
-            'timestamp': datetime.utcnow().strftime('%Y%m%d%H%M%S%f')
-        })
-        job_class = self._get_module_class(config['jobs'][job_name]['adapter'])
-        job_secrets = secrets.get(job_name, {})
-
-        if 'cloud' in config and 'store' in config['cloud'] and runner == 'cloud':
-            job_config['store'] = config['cloud']['store']
-        else:
-            job_config['store'] = config['store']
-        if 'store' not in job_secrets:
-            job_secrets['store'] = {}
-        if 'store' in job_config and 'connection' in job_config['store']:
-            job_secrets['store']['connection'] = secrets.get('connections', {}).get(job_config['store']['connection'],)
-
-        job_connection = job_config.get('connection')
-        if isinstance(job_connection, str):
-            job_config['connection'] = config['connections'].get(job_connection, {})
-            job_secrets['connection'] = secrets['connections'].get(job_connection, {})
-
-        # Merge job cloud config into global cloud config defaults
-        cloud_config = config.get('cloud', {})
-        job_cloud_config = job_config.get('cloud', {})
-        job_cloud_config.update()
-        job_cloud_config = deep_merge(cloud_config, job_cloud_config)
-        if len(job_cloud_config) > 0:
-            job_config['cloud'] = job_cloud_config
-
-        return job_class(job_config, job_secrets)
-
-    def _read_file(self, path):
-        if os.path.isfile(path):
-            with open(path, 'r') as f:
-                return f.read()
-        else:
-            return {}
-
-    def _read_yaml(self, path):
-        with open(path, 'r') as f:
-            return yaml.load(f)
-
-    def _get_module_class(self, module_class_path):
-        module_and_class_parts = module_class_path.split('.')
-        module = importlib.import_module('.'.join(module_and_class_parts[:-1]))
-        return getattr(module, module_and_class_parts[-1])
-
-    def _list_files_r(self, path, extension):
-        matches = []
-        for root, dirnames, filenames in os.walk(path):
-            for filename in fnmatch.filter(filenames, '*.%s' % (extension, )):
-                matches.append(os.path.join(root, filename))
-        return matches
+from __future__ import absolute_import, print_function, unicode_literals
+
+import os
+import yaml
+import importlib
+import fnmatch
+
+from datetime import datetime
+from .secrets import Secrets
+
+
+from .util import deep_merge
+
+
+class Entrypoint(object):
+
+    def run(self, name, runner, config, env, input_job):
+        # Build base config from core yml and jobs files.
+        base_config_path = os.path.join(config, 'config.yml')
+        base_config = self._read_yaml(base_config_path) or {}
+        if 'jobs' not in base_config:
+            base_config['jobs'] = {}
+        jobs_config_dir = os.path.join(config, 'jobs')
+        for config_file in self._list_files_r(jobs_config_dir, 'yml'):
+            jobs_config = self._read_yaml(config_file)
+            base_config['jobs'].update(jobs_config['jobs'])
+
+        # Build secrets by decrypting available JSONs
+        env_json_secrets_path = os.path.join(config, 'secrets.%s.json.enc' % (env, ))
+        secrets = Secrets().decrypt(env_json_secrets_path)
+
+        # Build the envionment-specific config, and merged to rendered config.json.
+        env_config_path = os.path.join(config, 'config.%s.yml' % (env, ))
+        env_config = self._read_yaml(env_config_path) or {}
+        config = deep_merge(base_config, env_config)
+
+        # Build the job.
+        job = self._build_job(name, config, secrets, runner, input_job)
+
+        # Derive and initialize the runner class.
+        runner_name = 'direct' if runner is None else runner
+        if '.' in runner_name:
+            runner_class = self._get_module_class(runner_name)
+        else:
+            if runner_name not in job.RUNNERS:
+                raise ValueError('specified runner "%s" is not supported by job type %s, only %s' % (runner_name, job.__class__.__name__, job.RUNNERS.keys()))
+            runner_class = job.RUNNERS[runner_name]
+
+        runner_class().run(job, config)
+
+    def _build_job(self, job_name, config, secrets, runner, input_job):
+        job_config = config.get('jobs', {}).get(job_name, {})
+        job_config.update({
+            'name': job_name,
+            'timestamp': datetime.utcnow().strftime('%Y%m%d%H%M%S%f')
+        })
+        job_class = self._get_module_class(config['jobs'][job_name]['adapter'])
+        job_secrets = secrets.get(job_name, {})
+
+        if input_job is not None and input_job in job_config.get('inputs', {}):
+            job_config['inputs'] = {
+                input_job: job_config.get('inputs', {}).get(input_job, {})
+            }
+
+        if 'cloud' in config and 'store' in config['cloud'] and runner == 'cloud':
+            job_config['store'] = config['cloud']['store']
+        else:
+            job_config['store'] = config['store']
+        if 'store' not in job_secrets:
+            job_secrets['store'] = {}
+        if 'store' in job_config and 'connection' in job_config['store']:
+            job_secrets['store']['connection'] = secrets.get('connections', {}).get(job_config['store']['connection'],)
+
+        job_connection = job_config.get('connection')
+        if isinstance(job_connection, str):
+            job_config['connection'] = config['connections'].get(job_connection, {})
+            job_secrets['connection'] = secrets['connections'].get(job_connection, {})
+
+        # Merge job cloud config into global cloud config defaults
+        cloud_config = config.get('cloud', {})
+        job_cloud_config = job_config.get('cloud', {})
+        job_cloud_config.update()
+        job_cloud_config = deep_merge(cloud_config, job_cloud_config)
+        if len(job_cloud_config) > 0:
+            job_config['cloud'] = job_cloud_config
+
+        return job_class(job_config, job_secrets)
+
+    def _read_file(self, path):
+        if os.path.isfile(path):
+            with open(path, 'r') as f:
+                return f.read()
+        else:
+            return {}
+
+    def _read_yaml(self, path):
+        with open(path, 'r') as f:
+            return yaml.load(f)
+
+    def _get_module_class(self, module_class_path):
+        module_and_class_parts = module_class_path.split('.')
+        module = importlib.import_module('.'.join(module_and_class_parts[:-1]))
+        return getattr(module, module_and_class_parts[-1])
+
+    def _list_files_r(self, path, extension):
+        matches = []
+        for root, dirnames, filenames in os.walk(path):
+            for filename in fnmatch.filter(filenames, '*.%s' % (extension, )):
+                matches.append(os.path.join(root, filename))
+        return matches
diff --git a/setup.py b/setup.py
index 56f9c6a785496e49bd78ac8d64ffcd0fb417b7cf..929dd0fdc5c18a79c72fbe6e9281c24afa5ebcfe 100644
--- a/setup.py
+++ b/setup.py
@@ -5,7 +5,7 @@ from setuptools import setup, find_packages
 
 setup(
     name='dojo',
-    version='0.0.42',
+    version='0.0.43',
     description='A framework for building and running your data platform.',
     author='Data Up',
     author_email='dojo@dataup.me',
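
A minimal sketch of the behaviour the new --input option introduces, not part of the patch: when the supplied value matches one of the job's configured inputs, _build_job narrows job_config['inputs'] to that single entry; otherwise the inputs are left untouched. The job name, input keys, and paths below are hypothetical.

    # Illustration only: mirrors the input-filtering logic added in _build_job above.
    job_config = {
        'name': 'events_job',
        'inputs': {
            'clicks': {'path': 's3://example-bucket/clicks'},
            'impressions': {'path': 's3://example-bucket/impressions'},
        },
    }
    input_job = 'clicks'  # value that would arrive via the new --input option

    if input_job is not None and input_job in job_config.get('inputs', {}):
        job_config['inputs'] = {
            input_job: job_config.get('inputs', {}).get(input_job, {})
        }

    print(job_config['inputs'])  # {'clicks': {'path': 's3://example-bucket/clicks'}}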