#!/usr/bin/env python
# -*- coding: utf-8; -*-
# Copyright (c) 2021, 2022 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
import fsspec
import io
import json
import os
from typing import Dict, Generator, List, Union
from ads.common.decorator.runtime_dependency import runtime_dependency
from ads.text_dataset.utils import PY4JGateway, experimental
from fsspec.core import OpenFile
[docs]
class Base:
"""Base class for backends."""
[docs]
def read_line(
self, fhandler: OpenFile
) -> Generator[Union[str, List[str]], None, None]:
"""Read lines from a file.
Parameters
----------
fhandler : `fsspec.core.OpenFile`
a file handler returned by `fsspec`
Yields
-------
Generator
a generator that yields lines
"""
with fhandler as f:
for line in f:
yield line.decode(fhandler.encoding)
[docs]
def read_text(
self, fhandler: OpenFile
) -> Generator[Union[str, List[str]], None, None]:
"""Read entire file into a string.
Parameters
----------
fhandler : `fsspec.core.OpenFile`
a file handler returned by `fsspec`
Yields
-------
Generator
a generator that yields text in the file
"""
with fhandler as f:
yield f.read().decode(fhandler.encoding)
@staticmethod
def _validate_dest(src, dst_folder: str, fname: str = None) -> str:
if fname is None:
fname = os.path.splitext(os.path.basename(src))[0] + ".txt"
return os.path.join(dst_folder, fname)
[docs]
def convert_to_text(
self,
fhandler: OpenFile,
dst_path: str,
fname: str = None,
storage_options: Dict = None,
) -> str:
"""Convert input file to a text file
Parameters
----------
fhandler : `fsspec.core.OpenFile`
a file handler returned by `fsspec`
dst_path: str
local folder or cloud storage prefix to save converted text files
fname: str, optional
filename for converted output, relative to dirname or prefix, by default None
storage_options: dict, optional
storage options for cloud storage
Returns
-------
str
path to saved output
"""
dest = self._validate_dest(fhandler.path, dst_path, fname)
storage_options = {} if storage_options is None else storage_options
with fsspec.open(dest, mode="wb", **storage_options) as fout:
with fhandler as fin:
fout.write(fin.read())
return dest
[docs]
class Tika(Base):
[docs]
def read_line(self, fhandler):
with PY4JGateway() as gateway:
parser = gateway.jvm.parsers.TikaParser()
with fhandler as f:
reader = parser.reader(f.read())
while True:
line = reader.readLine()
if line is not None:
yield line
else:
break
[docs]
def read_text(self, fhandler):
with PY4JGateway() as gateway:
parser = gateway.jvm.parsers.TikaParser()
with fhandler as f:
yield parser.readText(f.read()).decode()
[docs]
def convert_to_text(self, fhandler, dst_path, fname=None, storage_options=None):
dest = self._validate_dest(fhandler.path, dst_path, fname)
storage_options = {} if storage_options is None else storage_options
with fsspec.open(dest, mode="w", **storage_options) as f:
f.write(next(self.read_text(fhandler)))
return dest
[docs]
def detect_encoding(self, fhandler: OpenFile):
with PY4JGateway() as gateway:
with fhandler as f:
return gateway.jvm.parsers.TikaParser.detectEncoding(f.read())
[docs]
class PDFPlumber(Base):
@runtime_dependency(
module="pdfplumber", err_msg="pdfplumber must be installed first."
)
def __init__(self):
super().__init__()
[docs]
def read_line(self, fhandler):
import pdfplumber
with fhandler as f:
pdf = pdfplumber.PDF(f)
for page in pdf.pages:
reader = io.StringIO(page.extract_text())
for line in reader:
yield line
[docs]
def read_text(self, fhandler):
import pdfplumber
with fhandler as f:
pdf = pdfplumber.PDF(f)
texts = (page.extract_text() for page in pdf.pages)
yield "\n".join([text for text in texts if text is not None])
[docs]
def convert_to_text(
self,
fhandler,
dst_path,
fname=None,
storage_options=None,
):
import pdfplumber
dest = self._validate_dest(fhandler.path, dst_path, fname)
storage_options = {} if storage_options is None else storage_options
with fhandler as fin:
pdf = pdfplumber.PDF(fin)
with fsspec.open(dest, mode="w", **storage_options) as fout:
for page in pdf.pages:
text = page.extract_text()
if text is not None:
fout.write(text)
return dest
[docs]
@experimental
class OITCC(Base): # pragma: no cover
def convert_to_text(self, fhandler, output_path, fname=None, storage_options=None):
dest = self._validate_dest(fhandler.path, output_path, fname)
storage_options = {} if storage_options is None else storage_options
with fsspec.open(dest, mode="w", **storage_options) as f:
f.write(next(self.read_text(fhandler)))