Source code for pywgrib2_xr.script

"""This is a stand-alone script installed as **pywgrib2**.
It provides the following functionality:

    1. List content of a GRIB2 file (same as **wgrib2** without options).
    2. Create inventory files.
    3. List content of inventory files.
    4. Convert GRIB2 files to netCDF4.
    5. Convert GRIB2 files to zarr.
    6. Emulates **wgrib2** executable.
"""

from functools import partial
import getopt
import glob
from multiprocessing.pool import Pool
import pickle
import os
import sys
from typing import List, Optional  # , Sequence

from .inventory import (
    FileMetaData,
    MetaData,
    load_inventory,
    make_inventory,
    save_inventory,
)
from .template import make_template
from .wgrib2 import wgrib
from .xarray_store import open_dataset

_inv_ext = ".binv"

USAGE = """USAGE: pywgrib2 [-h] | [command [option ...]] argument ...

where:
    -h
        Print this message and exit.

    When command is not specified, the script emulates wgrib2 executable.

    command:
        list_inv
            Displays content of GRIB file(s).

            Options:
                -L
                    Long listing (all metadata). Default is short

            Arguments:
                gribfile ...
                    One or more GRIB files.

        make_inv
            Makes inventory file[s]

            Options:
                -i inv-dir
                    Directory for inventory files. Will be created if does not exist.
                    Intended for read-only GRIB directories.
                -n num-procs
                    Number of processes in multiprocessing mode. Default is 1.
                -p pattern
                    "glob.glob" pattern.
                -r
                    Recursively search grib-dir subdirectories.

            Arguments:
                gribfile ...
                    Zero (only if -r or -p is specified) or more GRIB files.

        cat_inv
            Lists content of inventory file[s].
            Use when inventories coexist with GRIB files.

            Options:
                -d dir
                    Directory of GRIB files
                -r
                    Recursively search subdirectories for inventory files.

            Arguments:
                gribfile ...
                    Zero or more GRIB files. The extension ".binv" does not need
                    to be included.

            The final list of files comprises directory entries and explicit
            paths.

        cat_hash
            Lists content of inventory file[s].
            Use when inventories are not collocated with GRIB files
            (i.e. -i inv_dir was specified for make_inv).

            Arguments:
                invfile ...
                    One or more GRIB files. The extension ".binv" does not need
                    to be included.

        template:
            Writes template file.

            Options:
                -i inv_dir
                    Location of inventory files, if different from GRIB files.
                -t reftime
                    Reference time, necessary when GRIB files have messages
                    with more than one reference time.
                -o template
                    Output file name. Must be specified.

            Arguments:
                gribfile ...
                    One or more GRIB files.

        to_nc:
            Writes netcdf file.

            Options:
                -c
                    Compress file, with zlib and compression level 1.
                -o ncfile
                    Output file name. Must be specified.
                -T template
                    Template file name. Must be specified.

            Arguments:
                gribfile ...
                    One or more GRIB files.

        to_zarr:
            Writes zarr group.

            Options:
                -c level
                    Compression level, an integer between 1 and 4. Default is 1.
                -o store
                    Output directory. Must be specified.
                -T template
                    Template file name. Must be specified.

            Arguments:
                gribfile ...
                    One or more GRIB files.
"""


def _print_inventory(inventory: Optional[List[MetaData]], listing: str) -> None:
    if not inventory:
        return
    if listing == "full":
        for i in inventory:
            print(i)
        return
    file = inventory[0].file
    file_inv = FileMetaData(file, inventory)
    if listing == "long":
        print(repr(file_inv))
    else:
        print(file_inv)


def list_inv(args: List[str]) -> None:
    opts, pargs = getopt.getopt(args, "L")
    kwds = dict(opts)
    listing = kwds.get("-L", "short")
    for p in pargs:
        if os.path.isfile(p):
            inventory = make_inventory(p)
            if inventory:
                _print_inventory(inventory, listing)
            else:
                print("No GRIB messages in {:s}".format(p))
        else:
            print("{:s} is not a file".format(p))


def _f(p, d):
    save_inventory(make_inventory(p), p, d)


def make_inv(args: List[str]) -> None:
    opts, pargs = getopt.getopt(args, "hi:n:p:")
    kwds = dict(opts)
    recursive = "-r" in kwds
    inv_dir = kwds.get("-i")
    num_processes = int(kwds.get("-n", 1))
    if not 1 <= num_processes <= 4:
        raise ValueError("Number of processes must be between 1 and 4")
    pattern = kwds.get("-p")
    if pattern:
        files = [
            p
            for p in glob.glob(pattern, recursive=recursive)
            if os.path.isfile(p) and not p.endswith(_inv_ext)
        ]
    else:
        files = []
    files.extend(pargs)

    fun = partial(_f, d=inv_dir)

    if num_processes == 1:
        for file in files:
            fun(file)
    else:
        with Pool(num_processes) as pool:
            pool.map(fun, files)


def cat_inv(args: List[str]) -> None:
    opts, pargs = getopt.getopt(args, "d:hi:Lr")
    kwds = dict(opts)
    recursive = "-r" in kwds
    data_dir = kwds.get("-d")
    listing = kwds.get("-L", "short")
    if data_dir:
        if recursive:
            pattern = os.path.join(data_dir, "**", "*" + _inv_ext)
        else:
            pattern = os.path.join(data_dir, "*" + _inv_ext)
        files = glob.glob(pattern, recursive=recursive)
    else:
        files = []
    files.extend(pargs)
    for file in files:
        base, ext = os.path.splitext(file)
        f = base if ext == _inv_ext else file
        inventory = load_inventory(f)
        _print_inventory(inventory, listing)


def cat_hash(args: List[str]) -> None:
    opts, pargs = getopt.getopt(args, "L")
    kwds = dict(opts)
    listing = kwds.get("-L", "short")
    for arg in pargs:
        base, ext = os.path.splitext(arg)
        file = base if ext == _inv_ext else arg
        inventory = load_inventory(file)
        _print_inventory(inventory, listing)


def mk_tmpl(args) -> None:
    opts, pargs = getopt.getopt(args, "i:o:t:v:")
    kwds = dict(opts)
    inv_dir = kwds.get("-i")
    tmplfile = kwds.get("-o")
    if tmplfile is None:
        raise ValueError("Missing output file")
    reftime = kwds.get("-t")
    vertlevel = kwds.get("-v")
    template = make_template(
        pargs, reftime=reftime, invdir=inv_dir, vertlevels=vertlevel
    )
    with open(tmplfile, "wb") as fp:
        pickle.dump(template, fp)


def to_nc(args: List[str]) -> None:
    opts, pargs = getopt.getopt(args, "co:T:")
    kwds = dict(opts)
    compress = "-c" in kwds
    ncfile = kwds.get("-o")
    if ncfile is None:
        raise ValueError("Missing output file")
    tmplfile = kwds.get("-T")
    if tmplfile is None:
        raise ValueError("Missing template file")
    with open(tmplfile, "rb") as fp:
        template = pickle.load(fp)
    ds = open_dataset(pargs, template=template)
    vars = list(ds.data_vars.keys())
    vars.extend(["longitude", "latitude"])
    encoding = dict.fromkeys(vars, {"zlib": True, "complevel": 1}) if compress else None
    ds.to_netcdf(ncfile, engine="netcdf4", encoding=encoding)


def to_zarr(args: List[str]) -> None:
    import zarr

    opts, pargs = getopt.getopt(args, "c:o:T:")
    kwds = dict(opts)
    clevel = int(kwds.get("-c", 1))
    if not 1 <= clevel <= 9:
        raise ValueError("Invalid compression level {:d}".format(clevel))
    zarrdir = kwds.get("-o")
    if zarrdir is None:
        raise ValueError("Missing output file")
    tmplfile = kwds.get("-T")
    if tmplfile is None:
        raise ValueError("Missing template file")
    with open(tmplfile, "rb") as fp:
        template = pickle.load(fp)
    ds = open_dataset(pargs, template=template)
    compressor = zarr.Blosc(cname="zstd", shuffle=-1, clevel=clevel)
    vars = list(ds.data_vars.keys())
    vars.extend(["longitude", "latitude"])
    encoding = dict.fromkeys(vars, {"compressor": compressor})
    ds.to_zarr(zarrdir, consolidated=True, encoding=encoding)


commands = {
    "list_inv": list_inv,
    "make_inv": make_inv,
    "cat_inv": cat_inv,
    "cat_hash": cat_hash,
    "template": mk_tmpl,
    "to_nc": to_nc,
    "to_zarr": to_zarr,
}


[docs]def main(argv: Optional[List[str]] = None): if not argv: # Allow to call main() with arguments argv = sys.argv[1:] if argv: if argv[0] == "-h": print(USAGE) raise SystemExit # if (f := commands.get(argv[1])): f = commands.get(argv[0]) if f: f(argv[1:]) else: wgrib(*argv)
if __name__ == "__main__": main()