# Source code for pycnfg.producer

"""
The :mod:`pycnfg.producer` module includes a class to produce configuration
objects. Use it as a mixin to add desired endpoints.

Supports methods to cache/read the intermediate state of an object
(pickle/unpickle). This is useful to save time when reusing a configuration.
Python dictionary API forwarding is also supported.

"""


import functools
import glob
import importlib
import logging
import os
import sys

import pycnfg


class Producer(object):
    """Execute configuration steps.

    Interface: run, dump_cache, load_cache, dict_api, update.

    Parameters
    ----------
    objects : dict
        Dictionary with resulted objects from previous executed producers:
        {'section_id__config__id': object}.
    oid : str
        Unique identifier of produced object.
    path_id : str, optional (default='path__default')
        Unique identifier of project path in ``objects``. If not found,
        ``project_path`` is None and methods that need it (cache methods
        with default or './'-relative ``cachedir``) raise ValueError.
    logger_id : str, optional (default='logger__default')
        Unique identifier of logger either in ``objects`` or
        :data:`logging.root.manager.loggerDict`. If not found, attach new
        logger to stdout (with ``logger_id`` name and 'INFO' level).

    Attributes
    ----------
    objects : dict
        Dictionary with resulted objects from previous executed producers:
        {'section_id__config__id': object}.
    oid : str
        Unique identifier of produced object.
    logger : logger object
        Logger.
    project_path : str or None
        Absolute path to project dir (None if ``path_id`` not in
        ``objects``).

    """
    _required_parameters = ['objects', 'oid']

    def __init__(self, objects, oid, path_id='path__default',
                 logger_id='logger__default'):
        if logger_id in objects:
            logger = objects[logger_id]
        elif logger_id in logging.root.manager.loggerDict:
            logger = logging.getLogger(logger_id)
        else:
            # Temporary, garbage collected (in opposite to getLogger()).
            logger = logging.Logger(logger_id)
            logger.addHandler(logging.StreamHandler(stream=sys.stdout))
            logger.setLevel("INFO")
        if path_id in objects:
            project_path = objects[path_id]
        else:
            # Should be set either in objects or as configuration.
            # Otherwise double execution for path config.
            project_path = None
        self.objects = objects
        self.oid = oid
        self.logger = logger
        self.project_path = project_path

    def run(self, init, steps):
        """Execute configuration steps.

        Consecutive call (with decorators):

        ``init = getattr(self, 'method_id')(init, objects=objects, **kwargs)``

        Parameters
        ----------
        init: object
            Will be passed as arg in each step and get back as result.
        steps : list of tuples
            List of ``self`` methods to run consecutive with kwargs:
            ('method_id', kwargs, decorators ). Step kwargs are not
            modified, so the same steps can be executed repeatedly.

        Returns
        -------
        res : object
            Result of the last step (``init`` if ``steps`` is empty).

        Raises
        ------
        ValueError
            If kwargs of some step is not a dictionary.

        Notes
        -----
        Object identifier ``oid`` auto added, if produced object has ``oid``
        attribute.

        """
        self.logger.info(f"|__ CONFIGURATION: {self.oid}")
        self.logger.debug(f"steps:\n"
                          f" {[i[0] for i in steps]}")
        res = init
        for step in steps:
            self.logger.debug(f"step:\n"
                              f" {step[0]}")
            # len(step)=3 guaranteed.
            method = step[0]
            kwargs = step[1]
            decors = step[2]
            self.logger.info(f" |__ {method.upper()}")
            if not isinstance(kwargs, dict):
                raise ValueError(f"Kwargs for step '{method}' "
                                 f"should be a dictionary.")
            kwargs = self._resolve_object(kwargs, self.objects)
            # Wrap the bound method: the first decorator in the list is
            # applied first (innermost), the last one is outermost.
            func = getattr(self, method)
            for decor in decors:
                func = decor(func)
            res = func(res, **kwargs)
        # Add producer identifier to object (if needs).
        # Avoid hasattr(res, 'oid') to prevent execution.
        if 'oid' in dir(res):
            res.oid = self.oid
        res = self._check(res)
        return res

    def dump_cache(self, obj, prefix=None, cachedir=None, pkg='pickle',
                   **kwargs):
        """Dump intermediate object state to IO.

        Parameters
        ----------
        obj : picklable
            Object to dump.
        prefix : str, optional (default=None)
            File identifier, added to filename. If None, 'self.oid' is used.
        cachedir : str, optional(default=None)
            Absolute path to dump dir or relative to 'project_path' started
            with './'. Created, if not exists. If None,
            "project_path/.temp/objects" is used.
        pkg : str, optional (default='pickle')
            Import package and try ``pkg``.dump(obj, file, **kwargs).
        **kwargs : kwargs
            Additional parameters to pass in .dump().

        Returns
        -------
        obj : picklable
            Unchanged input for compliance with producer logic.

        Raises
        ------
        ValueError
            If ``cachedir`` needs ``project_path`` but it is None.

        """
        if prefix is None:
            prefix = self.oid
        cachedir = self._resolve_cachedir(cachedir)
        # pickle, dill, joblib. Import before touching existing cache, so an
        # unavailable package does not destroy the previous dump.
        pkg_ = importlib.import_module(pkg)
        # Create dir for cache if not exist.
        os.makedirs(cachedir, exist_ok=True)
        filepath = f'{cachedir}/{prefix}_.dump'
        # Remove previous dump of this object together with possible
        # companion files named after it. Matching on the escaped full file
        # name (not just '{prefix}*') keeps caches of other objects whose
        # identifier merely starts with ``prefix``.
        for filename in glob.glob(f"{glob.escape(filepath)}*"):
            os.remove(filename)
        with open(filepath, mode='wb') as f:
            pkg_.dump(obj, f, **kwargs)
        self.logger.warning('Warning: update cache file(s):\n'
                            ' {}'.format(filepath))
        return obj

    def load_cache(self, obj, prefix=None, cachedir=None, pkg='pickle',
                   **kwargs):
        """Load intermediate object state from IO.

        Parameters
        ----------
        obj : picklable
            Object template, for producer logic only (ignored).
        prefix : str, optional (default=None)
            File identifier. If None, 'self.oid' is used.
        pkg : str, optional default('pickle')
            Import package and try obj = ``pkg``.load(file, **kwargs).
        cachedir : str, optional(default=None)
            Absolute path to load dir or relative to 'project_path' started
            with './'. If None, 'project_path/.temp/objects' is used.
        **kwargs : kwargs
            Additional parameters to pass in .load().

        Returns
        -------
        obj : picklable object
            Loaded cache.

        Raises
        ------
        ValueError
            If ``cachedir`` needs ``project_path`` but it is None.

        """
        if prefix is None:
            prefix = self.oid
        cachedir = self._resolve_cachedir(cachedir)
        pkg_ = importlib.import_module(pkg)
        filepath = f'{cachedir}/{prefix}_.dump'
        # NOTE: unpickling can execute arbitrary code; only load cache files
        # written by this project, never untrusted ones.
        with open(filepath, mode='rb') as f:
            obj = pkg_.load(f, **kwargs)
        self.logger.warning(f"Warning: use cache file(s):\n {filepath}")
        return obj

    def dict_api(self, obj, method='update', **kwargs):
        """Forwarding api for dictionary object.

        Could be useful to modify keys via configuration steps. For example
        to proceed update: ``('dict_api', {'b': 7})``.

        Parameters
        ----------
        obj : dict
            Object to modify.
        method : str, optional (default='update')
            Name of ``dict`` method to call on ``obj``.
        **kwargs : kwargs
            Passed to the method as keyword arguments. Most ``dict`` methods
            (``pop``, ``get``, ``setdefault``) accept positional arguments
            only, so in practice this works with 'update'.

        Returns
        -------
        obj : dict
            Input object (possibly modified in place).

        Raises
        ------
        TypeError
            If ``obj`` is not a dictionary.

        """
        if not isinstance(obj, dict):
            raise TypeError('Object should be a dictionary.')
        getattr(obj, method)(**kwargs)
        return obj

    def update(self, obj, items):
        """Update key(s) for dictionary object.

        Parameters
        ----------
        obj : dict
            Object to update.
        items : dict, list
            Either dictionary or items [(key,val),] to update ``obj``.

        Returns
        -------
        obj : dict
            Updated input.

        Raises
        ------
        TypeError
            If ``obj`` is not a dictionary.

        """
        if not isinstance(obj, dict):
            raise TypeError('Object should be a dictionary.')
        obj.update(items)
        return obj

    def _resolve_object(self, kwargs, objects):
        """Substitute objects in kwargs.

        For every key not ending with '_id', a str value (or str element of a
        list value) that is a key of ``objects`` is replaced with the
        corresponding object. Keys ending with '_id' are kept as is.

        Returns a new dictionary; ``kwargs`` itself is not modified, so the
        same configuration steps stay reusable.
        """
        resolved_kwargs = {}
        for key, val in kwargs.items():
            if key.endswith('_id'):
                resolved_kwargs[key] = val
                continue
            # For compliance with list.
            val_ = val if isinstance(val, list) else [val]
            resolved = [objects[v] if isinstance(v, str) and v in objects
                        else v for v in val_]
            resolved_kwargs[key] = (resolved if isinstance(val, list)
                                    else resolved[0])
        return resolved_kwargs

    def _resolve_cachedir(self, cachedir):
        """Return cache dir, expanding None or './'-relative to project path."""
        if cachedir is not None and not cachedir.startswith('./'):
            return cachedir
        if self.project_path is None:
            raise ValueError(f"Project path is not set for '{self.oid}', "
                             f"provide absolute 'cachedir'.")
        if cachedir is None:
            return f"{self.project_path}/.temp/objects"
        return f"{self.project_path}/{cachedir[2:]}"

    def _check(self, res):
        """Additional result check."""
        return res
if __name__ == '__main__':
    # No command-line entry point: running the module directly does nothing.
    pass