# Source code for dask.dataframe.io.orc (extracted from the dask documentation's module-source page)

from distutils.version import LooseVersion

from .utils import _get_pyarrow_dtypes, _meta_from_dtypes
from ..core import DataFrame
from ...base import tokenize
from ...bytes.core import get_fs_token_paths
from ...utils import import_required

# Public API of this module: only read_orc is exported.
__all__ = ("read_orc",)


def _read_orc_stripe(fs, path, stripe, columns=None):
    """Pull out specific data from specific part of ORC file.

    Parameters
    ----------
    fs : filesystem object
        Filesystem providing ``open`` (e.g. from ``get_fs_token_paths``).
    path : str
        Location of the ORC file on ``fs``.
    stripe : int
        Index of the stripe to read.
    columns : None or list(str)
        Columns to load. If None, loads all.

    Returns
    -------
    pandas.DataFrame
        Data of the requested stripe.
    """
    orc = import_required("pyarrow.orc", "Please install pyarrow >= 0.9.0")
    import pyarrow as pa

    with fs.open(path, "rb") as f:
        o = orc.ORCFile(f)
        table = o.read_stripe(stripe, columns)
    # Wrap both sides in LooseVersion: the original compared
    # ``str < LooseVersion``, which only worked through the reflected
    # operator and is easy to break.
    if LooseVersion(pa.__version__) < LooseVersion("0.11.0"):
        # ``date_as_object`` was only added to ``to_pandas`` in pyarrow 0.11.
        return table.to_pandas()
    else:
        return table.to_pandas(date_as_object=False)


def read_orc(path, columns=None, storage_options=None):
    """Read dataframe from ORC file(s)

    Parameters
    ----------
    path: str or list(str)
        Location of file(s), which can be a full URL with protocol
        specifier, and may include glob character if a single string.
    columns: None or list(str)
        Columns to load. If None, loads all.
    storage_options: None or dict
        Further parameters to pass to the bytes backend.

    Returns
    -------
    Dask.DataFrame (even if there is only one column)

    Examples
    --------
    >>> df = dd.read_orc('https://github.com/apache/orc/raw/'
    ...                  'master/examples/demo-11-zlib.orc')  # doctest: +SKIP
    """
    orc = import_required("pyarrow.orc", "Please install pyarrow >= 0.9.0")
    import pyarrow as pa

    if LooseVersion(pa.__version__) == "0.10.0":
        raise RuntimeError(
            "Due to a bug in pyarrow 0.10.0, the ORC reader is "
            "unavailable. Please either downgrade pyarrow to "
            "0.9.0, or use the pyarrow master branch (in which "
            "this issue is fixed).\n\n"
            "For more information see: "
            "https://issues.apache.org/jira/browse/ARROW-3009"
        )

    storage_options = storage_options or {}
    fs, fs_token, paths = get_fs_token_paths(
        path, mode="rb", storage_options=storage_options
    )

    # Collect the schema (all files must agree) and the stripe count of
    # each file up front, so we can build one task per stripe below.
    # NOTE: use a loop variable distinct from the ``path`` parameter; the
    # original code shadowed it, so the token below depended only on the
    # last file visited instead of the user's input.
    schema = None
    nstripes_per_file = []
    for fpath in paths:
        with fs.open(fpath, "rb") as f:
            o = orc.ORCFile(f)
            if schema is None:
                schema = o.schema
            elif schema != o.schema:
                raise ValueError("Incompatible schemas while parsing ORC files")
            nstripes_per_file.append(o.nstripes)

    schema = _get_pyarrow_dtypes(schema, categories=None)
    if columns is not None:
        ex = set(columns) - set(schema)
        if ex:
            raise ValueError(
                "Requested columns (%s) not in schema (%s)" % (ex, set(schema))
            )
    else:
        columns = list(schema)
    meta = _meta_from_dtypes(columns, schema, [], [])

    # Tokenize on the full list of resolved paths so the graph key is
    # stable and reflects every input file, not just the last one.
    name = "read-orc-" + tokenize(fs_token, paths, columns)
    dsk = {}
    N = 0
    for fpath, nstripes in zip(paths, nstripes_per_file):
        for stripe in range(nstripes):
            dsk[(name, N)] = (_read_orc_stripe, fs, fpath, stripe, columns)
            N += 1

    # Divisions are unknown, hence one more None than there are partitions.
    return DataFrame(dsk, name, meta, [None] * (len(dsk) + 1))