augwrap.py 6.85 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
#!/usr/bin/python3

# Copyright (C) 2017

import posixpath
import logging
import os
import collections

from augeas import Augeas

AUGEAS_LOAD_PATH = '/augeas/load/'
AUGEAS_FILES_PATH = '/files/'
AUGEAS_ERROR_PATH = '//error'

log = logging.getLogger('augeas')

18

19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
def join(*paths):
    """
    join two Augeas tree paths

    FIXME: Beware: // is normalized to /
    """
    norm_paths = [posixpath.normpath(path) for path in paths]
    # first path must be absolute
    assert norm_paths[0][0] == '/'
    new_paths = [norm_paths[0]]
    # relativize all other paths so join works as expected
    for path in norm_paths[1:]:
        if path.startswith('/'):
            path = path[1:]
        new_paths.append(path)
    new_path = posixpath.join(*new_paths)
    log.debug("join: new_path %s", new_path)
    return posixpath.normpath(new_path)


39
class AugeasWrapper:
40 41 42 43 44 45 46 47 48
    """python-augeas higher-level wrapper.

    Load single augeas lens and configuration file.
    Exposes configuration file as AugeasNode object with dict-like interface.

    AugeasWrapper can be used in with statement in the same way as file does.
    """

    def __init__(self, confpath, lens, root=None, loadpath=None,
49
                 flags=Augeas.NO_MODL_AUTOLOAD | Augeas.NO_LOAD | Augeas.ENABLE_SPAN):
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118
        """Parse configuration file using given lens.

        Params:
            confpath (str): Absolute path to the configuration file
            lens (str): Name of module containing Augeas lens
            root: passed down to original Augeas
            flags: passed down to original Augeas
            loadpath: passed down to original Augeas
            flags: passed down to original Augeas
        """
        log.debug('loadpath: %s', loadpath)
        log.debug('confpath: %s', confpath)
        self._aug = Augeas(root=root, loadpath=loadpath, flags=flags)

        # /augeas/load/{lens}
        aug_load_path = join(AUGEAS_LOAD_PATH, lens)
        # /augeas/load/{lens}/lens = {lens}.lns
        self._aug.set(join(aug_load_path, 'lens'), '%s.lns' % lens)
        # /augeas/load/{lens}/incl[0] = {confpath}
        self._aug.set(join(aug_load_path, 'incl[0]'), confpath)
        self._aug.load()

        errors = self._aug.match(AUGEAS_ERROR_PATH)
        if errors:
            err_msg = '\n'.join(
                ["{}: {}".format(e, self._aug.get(e)) for e in errors]
            )
            raise RuntimeError(err_msg)

        path = join(AUGEAS_FILES_PATH, confpath)
        paths = self._aug.match(path)
        if len(paths) != 1:
            raise ValueError('path %s did not match exactly once' % path)
        self.tree = AugeasNode(self._aug, path)
        self._loaded = True

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.save()
        self.close()

    def save(self):
        """Save Augeas tree to its original file."""
        assert self._loaded
        try:
            self._aug.save()
        except IOError as exc:
            log.exception(exc)
            for err_path in self._aug.match('//error'):
                log.error('%s: %s', err_path,
                          self._aug.get(os.path.join(err_path, 'message')))
            raise

    def close(self):
        """
        close Augeas library

        After calling close() the object must not be used anymore.
        """
        assert self._loaded
        self._aug.close()
        del self._aug
        self._loaded = False

    def match(self, path):
        """Yield AugeasNodes matching given expression."""
        assert self._loaded
119
        assert path
120 121 122 123 124 125 126 127
        log.debug('tree match %s', path)
        for matched_path in self._aug.match(path):
            yield AugeasNode(self._aug, matched_path)


class AugeasNode(collections.MutableMapping):
    """One Augeas tree node with dict-like interface."""

128
    def __init__(self, aug, path):
129 130 131 132 133 134 135 136 137 138 139 140
        """
        Args:
            aug (AugeasWrapper or Augeas): Augeas library instance
            path (str): absolute path in Augeas tree matching single node

        BEWARE: There are no sanity checks of given path for performance reasons.
        """
        assert aug
        assert path
        assert path.startswith('/')
        self._aug = aug
        self._path = path
141
        self._span = None
142 143 144 145 146 147 148 149 150 151 152 153 154 155 156

    @property
    def path(self):
        """canonical path in Augeas tree, read-only"""
        return self._path

    @property
    def value(self):
        """
        get value of this node in Augeas tree
        """
        value = self._aug.get(self._path)
        log.debug('tree get: %s = %s', self._path, value)
        return value

157 158 159 160 161 162 163 164
    @value.setter
    def value(self, value):
        """
        set value of this node in Augeas tree
        """
        log.debug('tree set: %s = %s', self._path, value)
        self._aug.set(self._path, value)

165 166 167 168 169 170
    @property
    def span(self):
        if self._span is None:
            self._span = "char position %s" % self._aug.span(self._path)[5]
        return self._span

171 172 173 174
    @property
    def char(self):
        return self._aug.span(self._path)[5]

175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227
    def __len__(self):
        """
        number of items matching this path

        It is always 1 after __init__() but it may change
        as Augeas tree changes.
        """
        return len(self._aug.match(self._path))

    def __getitem__(self, key):
        if isinstance(key, int):
            # int is a shortcut to write [int]
            target_path = '%s[%s]' % (self._path, key)
        else:
            target_path = self._path + key
        log.debug('tree getitem: target_path %s', target_path)
        paths = self._aug.match(target_path)
        if len(paths) != 1:
            raise KeyError('path %s did not match exactly once' % target_path)
        return AugeasNode(self._aug, target_path)

    def __delitem__(self, key):
        log.debug('tree delitem: %s + %s', self._path, key)
        target_path = self._path + key
        log.debug('tree delitem: target_path %s', target_path)
        self._aug.remove(target_path)

    def __setitem__(self, key, value):
        assert isinstance(value, AugeasNode)
        target_path = self.path + key
        self._aug.copy(value.path, target_path)

    def __iter__(self):
        self_path_len = len(self._path)
        assert self_path_len > 0

        log.debug('tree iter: %s', self._path)
        for new_path in self._aug.match(self._path):
            if len(new_path) == self_path_len:
                yield ''
            else:
                yield new_path[self_path_len - 1:]

    def match(self, subpath):
        """Yield AugeasNodes matching given sub-expression."""
        assert subpath.startswith("/")
        match_path = "%s%s" % (self._path, subpath)
        log.debug('tree match %s: %s', match_path, self._path)
        for matched_path in self._aug.match(match_path):
            yield AugeasNode(self._aug, matched_path)

    def __repr__(self):
        return 'AugeasNode(%s)' % self._path