Source code for ecgtools.builder

import fnmatch
import os.path
import re
import tempfile
import typing
import warnings

import fsspec
import joblib
import pandas as pd
import pydantic
import toolz
from intake_esm.cat import (
    Aggregation,
    AggregationControl,
    Assets,
    Attribute,
    DataFormat,
    ESMCatalogModel,
)

INVALID_ASSET = 'INVALID_ASSET'
TRACEBACK = 'TRACEBACK'


def glob_to_regex(*, include_patterns, exclude_patterns):
    include_regex = r'|'.join([fnmatch.translate(x) for x in include_patterns])
    exclude_regex = r'|'.join([fnmatch.translate(x) for x in exclude_patterns]) or r'$.'
    return include_regex, exclude_regex


class RootDirectory(pydantic.BaseModel):
    path: str
    depth: int = 0
    storage_options: typing.Dict[typing.Any, typing.Any] = pydantic.Field(default_factory=dict)
    exclude_regex: str = pydantic.Field(default_factory=str)
    include_regex: str = pydantic.Field(default_factory=str)

    def __hash__(self):
        return hash(f'{self.path}{self.raw_path}')

    @property
    def mapper(self):
        return fsspec.get_mapper(self.path, **self.storage_options)

    @property
    def protocol(self):
        protocol = self.mapper.fs.protocol
        if isinstance(protocol, (list, tuple)):
            protocol = protocol[0]
        return protocol

    @property
    def raw_path(self):
        return self.mapper.fs._strip_protocol(self.path)

    def walk(self):
        all_assets = []
        for root, dirs, files in self.mapper.fs.walk(self.raw_path, maxdepth=self.depth + 1):
            # exclude dirs
            directories = [os.path.join(root, directory) for directory in dirs]
            directories = [
                directory
                for directory in directories
                if not re.match(self.exclude_regex, directory)
            ]

            if files:
                # exclude/include assets
                if self.protocol != 'file':
                    files = [f'{self.protocol}://{os.path.join(root, file)}' for file in files]
                else:
                    files = [os.path.join(root, file) for file in files]
                files = [file for file in files if not re.match(self.exclude_regex, file)]
                files = [file for file in files if re.match(self.include_regex, file)]
                all_assets.extend(files)

            # Look for zarr assets. This works for zarr stores created with consolidated metadata
            # print(all_assets)
            for directory in directories:
                if self.mapper.fs.exists(f'{directory}/.zmetadata'):
                    path = (
                        f'{self.protocol}://{directory}' if self.protocol != 'file' else directory
                    )
                    all_assets.append(path)

        return all_assets


[docs] @pydantic.dataclasses.dataclass class Builder: """Generates a catalog from a list of netCDF files or zarr stores Parameters ---------- paths : list of str List of paths to crawl for assets/files. storage_options : dict, optional Parameters passed to the backend file-system such as Google Cloud Storage, Amazon Web Service S3 depth : int, optional Maximum depth to crawl for assets. Default is 0. exclude_patterns : list of str, optional List of glob patterns to exclude from crawling. include_patterns : list of str, optional List of glob patterns to include from crawling. joblib_parallel_kwargs : dict, optional Parameters passed to joblib.Parallel. Default is {}. """ paths: typing.List[str] storage_options: typing.Optional[typing.Dict[typing.Any, typing.Any]] = None depth: int = 0 exclude_patterns: typing.Optional[typing.List[str]] = None include_patterns: typing.Optional[typing.List[str]] = None joblib_parallel_kwargs: typing.Optional[typing.Dict[str, typing.Any]] = None def __post_init__(self): self.storage_options = self.storage_options or {} self.joblib_parallel_kwargs = self.joblib_parallel_kwargs or {} self.exclude_patterns = self.exclude_patterns or [] self.include_patterns = self.include_patterns or [] # transform glob patterns to regular expressions self.include_regex, self.exclude_regex = glob_to_regex( include_patterns=self.include_patterns, exclude_patterns=self.exclude_patterns ) self._root_dirs = [ RootDirectory( path=path, storage_options=self.storage_options, depth=self.depth, exclude_regex=self.exclude_regex, include_regex=self.include_regex, ) for path in self.paths ] self.assets = None self.invalid_assets = pd.DataFrame() self.entries = None self.df = pd.DataFrame() def get_assets(self): assets = [directory.walk() for directory in self._root_dirs] self.assets = sorted(toolz.unique(toolz.concat(assets))) return self @pydantic.validate_arguments def parse( self, *, parsing_func: typing.Callable, parsing_func_kwargs: typing.Optional[dict] = None ): if not self.assets: raise ValueError('asset list provided is None. Please run `.get_assets()` first') parsing_func_kwargs = {} if parsing_func_kwargs is None else parsing_func_kwargs entries = joblib.Parallel(**self.joblib_parallel_kwargs)( joblib.delayed(parsing_func)(asset, **parsing_func_kwargs) for asset in self.assets ) self.entries = entries self.df = pd.DataFrame(entries) return self
[docs] def clean_dataframe(self): """Clean the dataframe by excluding invalid assets and removing duplicate entries.""" if INVALID_ASSET in self.df.columns: invalid_assets = self.df[self.df[INVALID_ASSET].notnull()][[INVALID_ASSET, TRACEBACK]] df = self.df[self.df[INVALID_ASSET].isnull()].drop(columns=[INVALID_ASSET, TRACEBACK]) self.invalid_assets = invalid_assets if not self.invalid_assets.empty: warnings.warn( f'Unable to parse {len(self.invalid_assets)} assets. A list of these assets can be found in `.invalid_assets` attribute.', stacklevel=2, ) self.df = df return self
[docs] @pydantic.validate_arguments def build( self, *, parsing_func: typing.Callable, parsing_func_kwargs: typing.Optional[dict] = None, postprocess_func: typing.Optional[typing.Callable] = None, postprocess_func_kwargs: typing.Optional[dict] = None, ): """Builds a catalog from a list of netCDF files or zarr stores. Parameters ---------- parsing_func : callable Function that parses the asset and returns a dictionary of metadata. parsing_func_kwargs : dict, optional Parameters passed to the parsing function. Default is {}. postprocess_func : callable, optional Function that post-processes the built dataframe and returns a pandas dataframe. Default is None. postprocess_func_kwargs : dict, optional Parameters passed to the post-processing function. Default is {}. Returns ------- :py:class:`~ecgtools.Builder` The builder object. """ self.get_assets().parse( parsing_func=parsing_func, parsing_func_kwargs=parsing_func_kwargs ).clean_dataframe() if postprocess_func: postprocess_func_kwargs = postprocess_func_kwargs or {} self.df = postprocess_func(self.df, **postprocess_func_kwargs) return self
[docs] @pydantic.validate_arguments def save( self, *, name: str, path_column_name: str, variable_column_name: str, data_format: DataFormat, groupby_attrs: typing.Optional[typing.List[str]] = None, aggregations: typing.Optional[typing.List[Aggregation]] = None, esmcat_version: str = '0.0.1', description: typing.Optional[str] = None, directory: typing.Optional[str] = None, catalog_type: str = 'file', to_csv_kwargs: typing.Optional[dict] = None, json_dump_kwargs: typing.Optional[dict] = None, ): """Persist catalog contents to files. Parameters ---------- name: str The name of the file to save the catalog to. path_column_name : str The name of the column containing the path to the asset. Must be in the header of the CSV file. variable_column_name : str Name of the attribute column in csv file that contains the variable name. data_format : str The data format. Valid values are netcdf and zarr. aggregations : List[dict] List of aggregations to apply to query results, default None esmcat_version : str The ESM Catalog version the collection implements, default None description : str Detailed multi-line description to fully explain the collection, default None directory: str The directory to save the catalog to. If None, use the current directory catalog_type: str The type of catalog to save. Whether to save the catalog table as a dictionary in the JSON file or as a separate CSV file. Valid options are 'dict' and 'file'. to_csv_kwargs : dict, optional Additional keyword arguments passed through to the :py:meth:`~pandas.DataFrame.to_csv` method. json_dump_kwargs : dict, optional Additional keyword arguments passed through to the :py:func:`~json.dump` function. Returns ------- :py:class:`~ecgtools.Builder` The builder object. Notes ----- See https://github.com/NCAR/esm-collection-spec/blob/master/collection-spec/collection-spec.md for more """ for col in {variable_column_name, path_column_name}.union(set(groupby_attrs or [])): assert col in self.df.columns, f'{col} must be a column in the dataframe.' attributes = [Attribute(column_name=column, vocabulary='') for column in self.df.columns] _aggregation_control = AggregationControl( variable_column_name=variable_column_name, groupby_attrs=groupby_attrs, aggregations=aggregations, ) cat = ESMCatalogModel( esmcat_version=esmcat_version, description=description, attributes=attributes, aggregation_control=_aggregation_control, assets=Assets(column_name=path_column_name, format=data_format), ) cat._df = self.df cat.save( name=name, directory=directory, catalog_type=catalog_type, to_csv_kwargs=to_csv_kwargs, json_dump_kwargs=json_dump_kwargs, ) if not self.invalid_assets.empty: invalid_assets_report_file = f'{tempfile.gettempdir()}/{name}_invalid_assets.csv' warnings.warn( f'Unable to parse {len(self.invalid_assets)} assets/files. A list of these assets can be found in {invalid_assets_report_file}.', stacklevel=2, ) self.invalid_assets.to_csv(invalid_assets_report_file, index=False)