# Source code for ads.pipeline.visualizer.base

#!/usr/bin/env python
# -*- coding: utf-8; -*-

# Copyright (c) 2022 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/

import re
import string
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import ByteString, Dict, List, Optional, Union

import fsspec
from oci.data_science.models.pipeline_step_run import PipelineStepRun

class PipelineVisualizerError(Exception):  # pragma: no cover
    """Error raised by PipelineVisualizer when a required input is missing."""
class GraphOrientation:
    """Values accepted for the ``rankdir`` (graph direction) option.

    ``TOP_BOTTOM`` draws the graph from top to bottom and ``LEFT_RIGHT``
    from left to right.
    """

    TOP_BOTTOM = "TB"
    LEFT_RIGHT = "LR"
# Step statuses grouped as "waiting".
# NOTE(review): ACCEPTED is not a member of this group — confirm intended.
WAIT_STATUS = {
    StepStatus.WAITING,
    StepStatus.IN_PROGRESS,
    StepStatus.CANCELING,
}

# Step statuses grouped as "complete" (successful or not).
COMPLETE_STATUS = {
    StepStatus.SUCCEEDED,
    StepStatus.FAILED,
    StepStatus.CANCELED,
    StepStatus.DELETED,
    StepStatus.SKIPPED,
}

# The members of COMPLETE_STATUS other than SUCCEEDED.
FAIL_STATUS = {
    StepStatus.FAILED,
    StepStatus.CANCELED,
    StepStatus.DELETED,
    StepStatus.SKIPPED,
}

# Human-readable label shown for each status.
STATUS_TEXT_MAP = {
    StepStatus.WAITING: "Waiting",
    StepStatus.ACCEPTED: "Accepted",
    StepStatus.IN_PROGRESS: "In Progress",
    StepStatus.FAILED: "Failed",
    StepStatus.SUCCEEDED: "Succeeded",
    StepStatus.CANCELING: "Canceling",
    StepStatus.CANCELED: "Canceled",
    StepStatus.DELETED: "Deleted",
    StepStatus.SKIPPED: "Skipped",
}

# Hex colour used for each status; the last four statuses share one colour.
STATUS_COLOR_MAP = {
    StepStatus.WAITING: "#747E7E",
    StepStatus.ACCEPTED: "#F26B1D",
    StepStatus.IN_PROGRESS: "#3E975D",
    StepStatus.FAILED: "#9146C2",
    StepStatus.SUCCEEDED: "#2C6CBF",
    StepStatus.CANCELING: "#D92211",
    StepStatus.CANCELED: "#D92211",
    StepStatus.DELETED: "#D92211",
    StepStatus.SKIPPED: "#D92211",
}
class StepKind:
    """Kinds of items (pipeline or pipeline step) that can be rendered."""

    ML_JOB = "ML_JOB"
    CUSTOM_SCRIPT = "CUSTOM_SCRIPT"
    PIPELINE = "pipeline"


# Human-readable label for each StepKind value.
STEP_KIND_MAP = {
    StepKind.ML_JOB: "Datascience Job",
    StepKind.CUSTOM_SCRIPT: "Custom Script",
    StepKind.PIPELINE: "Pipeline",
}


def _to_utc_native(dt: Optional[datetime]) -> Optional[datetime]:
    """Converts an offset-aware datetime to an offset-naive UTC datetime.

    Parameters
    ----------
    dt : Optional[datetime.datetime]
        A datetime object to be converted. If ``dt`` is offset-naive it is
        returned unchanged (no conversion is attempted).

    Returns
    -------
    Optional[datetime.datetime]
        Offset-naive datetime expressed in UTC, or ``None`` if ``dt`` is falsy.
    """
    if not dt:
        return None
    if dt.tzinfo:
        return dt.astimezone(timezone.utc).replace(tzinfo=None)
    return dt


def _strfdelta(seconds: int, format: str = "%H:%M:%S") -> str:
    """Formats a duration given in seconds with a ``strftime`` format.

    Parameters
    ----------
    seconds: int
        Duration in seconds to be converted to a formatted string.
    format: (str, optional). Defaults to `%H:%M:%S`.
        A ``strftime`` format applied to the duration, which is rendered as
        an offset from the Unix epoch in UTC.

    Returns
    -------
    str
        The formatted duration, or an empty string when ``seconds`` is falsy
        (including ``0``).

    Notes
    -----
    Because the duration is rendered through a datetime, ``%H`` wraps every
    24 hours (for example 25 hours is shown as ``01:00:00``).
    """
    if not seconds:
        return ""
    # `datetime.utcfromtimestamp` is deprecated since Python 3.12; this builds
    # the same offset-naive UTC datetime without the deprecated call.
    epoch_offset = datetime.fromtimestamp(seconds, tz=timezone.utc).replace(
        tzinfo=None
    )
    return epoch_offset.strftime(format)


def _replace_special_chars(value: str, repl: str = "_") -> str:
    """Replaces every punctuation character and space in a string.

    Parameters
    ----------
    value: str
        The string that needs to be processed.
    repl: (str, optional). Defaults to `_`.
        The replacement for each punctuation character or space.

    Returns
    -------
    str
        The new string.
    """
    return re.sub(f"[{re.escape(string.punctuation)} ]", repl, value)
@dataclass
class RendererItemStatus:
    """Class represents the state of the renderer item.

    ``time_started`` / ``time_finished`` are normalized to offset-naive UTC
    in ``__post_init__``, and a lowercase ``key`` is derived from ``name``
    and ``kind``.
    """

    name: str
    kind: str = ""
    time_started: Optional[datetime] = None
    time_finished: Optional[datetime] = None
    lifecycle_state: str = ""
    lifecycle_details: str = ""
    _key: str = ""

    def __post_init__(self):
        # Normalize to naive UTC so `duration` can subtract naive datetimes.
        self.time_started = _to_utc_native(self.time_started)
        self.time_finished = _to_utc_native(self.time_finished)
        # Built the same way as RendererItem's key, so a status can be looked
        # up by the key of the item it describes.
        self._key = f"{_replace_special_chars(self.name)}_{self.kind}".lower()

    @property
    def key(self) -> str:
        """Key of the item.

        Returns
        -------
        str
            The key of the item.
        """
        return self._key

    @property
    def duration(self) -> int:
        """Calculates duration in seconds between `time_started` and `time_finished`.

        If `time_finished` is not set, the current UTC time is used instead.

        Returns
        -------
        int
            The duration in whole seconds; 0 if `time_started` is not set or
            the end time precedes the start time.
        """
        if not self.time_started:
            return 0
        end = self.time_finished or datetime.now(timezone.utc).replace(tzinfo=None)
        # `timedelta.seconds` drops the days component (a 25h run would show
        # as 1h), so use the full span instead.
        return max(0, int((end - self.time_started).total_seconds()))

    @classmethod
    def from_pipeline_run(cls, pipeline_run: "PipelineRun") -> "RendererItemStatus":
        """Creates class instance from the PipelineRun object.

        Parameters
        ----------
        pipeline_run: PipelineRun
            The PipelineRun object.

        Returns
        -------
        RendererItemStatus
            Instance of RendererItemStatus.
        """
        return cls(
            # Name and kind come from the run's pipeline so the key matches
            # the key of the pipeline's own render item.
            name=pipeline_run.pipeline.name,
            kind=pipeline_run.pipeline.kind,
            time_started=pipeline_run.time_accepted or pipeline_run.time_started,
            time_finished=pipeline_run.time_finished,
            lifecycle_state=pipeline_run.lifecycle_state,
            lifecycle_details=pipeline_run.lifecycle_details,
        )

    @classmethod
    def from_pipeline_step_run(
        cls, pipeline_step_run: PipelineStepRun
    ) -> "RendererItemStatus":
        """Creates class instance from the PipelineStepRun object.

        Parameters
        ----------
        pipeline_step_run: PipelineStepRun
            The PipelineStepRun object.

        Returns
        -------
        RendererItemStatus
            Instance of RendererItemStatus.
        """
        return cls(
            name=pipeline_step_run.step_name,
            kind=pipeline_step_run.step_type,
            time_started=pipeline_step_run.time_started,
            time_finished=pipeline_step_run.time_finished,
            lifecycle_state=pipeline_step_run.lifecycle_state,
            lifecycle_details=pipeline_step_run.lifecycle_details,
        )

    @staticmethod
    def format_datetime(
        value: Optional[datetime], format="%Y-%m-%d %H:%M:%S"
    ) -> Optional[str]:
        """Converts a datetime object into a string of the given format.

        Parameters
        ----------
        value: Optional[datetime.datetime]
            Datetime object to be formatted.
        format: (str, optional). Defaults to `%Y-%m-%d %H:%M:%S`.
            The ``strftime`` format to apply.

        Returns
        -------
        Optional[str]
            A timestamp in a string format, or ``None`` if ``value`` is falsy.
        """
        if not value:
            return None
        return value.strftime(format)

    def __hash__(self):
        return hash(self._key)

    def __eq__(self, other):
        # Compare keys directly: comparing hashes made an item equal to its
        # bare key string (and to any hash collision). Objects exposing a
        # `key` (e.g. RendererItem) still compare by key.
        if not hasattr(other, "key"):
            return NotImplemented
        return self.key == other.key
@dataclass
class RendererItem:
    """A pipeline or pipeline step to be drawn by a renderer.

    A lowercase ``key`` is derived from ``name`` and ``kind`` and used for
    hashing and equality.
    """

    name: str
    kind: str = ""
    spec: Optional[Union["Pipeline", "PipelineStep"]] = None
    _key: str = ""

    def __post_init__(self):
        self._key = f"{_replace_special_chars(self.name)}_{self.kind}".lower()

    @property
    def key(self) -> str:
        """Key of the item.

        Returns
        -------
        str
            The key of the item.
        """
        return self._key

    def __hash__(self):
        return hash(self.key)

    def __eq__(self, other):
        # Compare keys directly rather than hashes, so an item is not equal to
        # its bare key string or to an unrelated object with a colliding hash.
        if not hasattr(other, "key"):
            return NotImplemented
        return self.key == other.key
class PipelineRenderer(ABC):
    """The base class responsible for visualizing a pipeline."""

    @abstractmethod
    def render(
        self,
        steps: List[RendererItem],
        deps: Dict[str, List[RendererItem]],
        step_status: Optional[Dict[str, RendererItemStatus]] = None,
        **kwargs,
    ):
        """Renders pipeline run.

        Parameters
        ----------
        steps: List[RendererItem]
            The items to render.
        deps: Dict[str, List[RendererItem]]
            Maps the key of a RendererItem to the items it depends on.
        step_status: (Dict[str, RendererItemStatus], optional). Defaults to None.
            Maps the key of a RendererItem to its current status.
        kwargs
            Renderer-specific options (PipelineVisualizer passes ``rankdir``).
        """
        pass

    def _write_to_file(
        self, content: Union[str, ByteString], mode: str, uri: str, **kwargs
    ) -> None:
        """Writes content into the location specified by uri.

        Parameters
        ----------
        content: Union[str, ByteString]
            The content to be saved.
        mode: str
            Suffix appended to ``"w"`` to build the open mode
            (e.g. ``"b"`` yields ``"wb"``).
        uri: str
            URI location to save content.
        kwargs
            Keyword arguments passed to ``fsspec.open()``.
            For OCI object storage, this should be config="path/to/.oci/config".
            For other storage connections consider e.g. host, port, username,
            password, etc.

        Returns
        -------
        None
            Nothing.
        """
        with fsspec.open(uri, f"w{mode}", **kwargs) as f:
            f.write(content)

    def save_to(self, *args, **kwargs) -> str:
        """Saves the pipeline visualization to the provided format.

        The base implementation is a placeholder for concrete renderers.

        Raises
        ------
        NotImplementedError
            Always, in this base class.
        """
        raise NotImplementedError("`.save_to()` is not implemented.")
class PipelineVisualizer:
    """PipelineVisualizer class to visualize pipeline in text or graph.

    Attributes
    ----------
    pipeline: Pipeline
        Pipeline instance.
    pipeline_run: PipelineRun
        PipelineRun instance.
    steps: List[RendererItem]
        A list of RendererItem objects.
    deps: Dict[str, List[RendererItem]]
        A dictionary mapping the key of a RendererItem to a list of
        RendererItem that this step depends on.
    step_status: Dict[str, RendererItemStatus], defaults to None.
        A dictionary mapping the key of a RendererItem to its current status.
    """

    def __init__(
        self,
        pipeline: "Pipeline" = None,
        pipeline_run: "PipelineRun" = None,
        renderer: PipelineRenderer = None,
    ):
        """Initialize a PipelineVisualizer object.

        Parameters
        ----------
        pipeline: Pipeline
            Pipeline instance.
        pipeline_run: PipelineRun
            PipelineRun instance.
        renderer: PipelineRenderer
            Renderer used to visualize pipeline in text or graph.
        """
        self.pipeline = None
        self.pipeline_run = None
        self.renderer = None
        self.steps = []
        self.deps = {}
        self.step_status = {}
        if pipeline:
            self.with_pipeline(pipeline)
        if pipeline_run:
            self.with_pipeline_run(pipeline_run)
        if renderer:
            self.with_renderer(renderer)

    def with_renderer(self, value: PipelineRenderer) -> "PipelineVisualizer":
        """Add renderer to visualize pipeline.

        Parameters
        ----------
        value: object
            Renderer used to visualize pipeline in text or graph.

        Returns
        -------
        PipelineVisualizer
            The PipelineVisualizer instance.

        Raises
        ------
        PipelineVisualizerError
            If `renderer` not specified.
        """
        if not value:
            raise PipelineVisualizerError("The `renderer` must be specified.")
        self.renderer = value
        return self

    def with_pipeline(self, value: "Pipeline") -> "PipelineVisualizer":
        """Adds a Pipeline instance to be rendered.

        Rebuilds `steps` (the pipeline item followed by its steps) and `deps`.

        Parameters
        ----------
        value: Pipeline
            Pipeline instance.

        Returns
        -------
        PipelineVisualizer
            The PipelineVisualizer instance.

        Raises
        ------
        PipelineVisualizerError
            If `pipeline` not specified.
        """
        if not value:
            raise PipelineVisualizerError("The `pipeline` must be specified.")
        self.pipeline = value

        pipeline_render_item = RendererItem(
            name=self.pipeline.name, kind=self.pipeline.kind, spec=self.pipeline
        )
        self.steps = [pipeline_render_item]
        self.deps = {pipeline_render_item.key: []}

        step_details = self.pipeline.step_details or []
        # Index render items by step name so `depends_on` names resolve to items.
        render_item_map = {
            step.name: RendererItem(name=step.name, kind=step.kind, spec=step)
            for step in step_details
        }
        for step in step_details:
            render_item = render_item_map[step.name]
            self.steps.append(render_item)
            if step.depends_on:
                depends_on = [
                    render_item_map[step_name] for step_name in step.depends_on
                ]
            else:
                # Steps without explicit dependencies hang off the pipeline node.
                depends_on = [pipeline_render_item]
            self.deps[render_item.key] = depends_on
        return self

    def with_pipeline_run(self, value: "PipelineRun") -> "PipelineVisualizer":
        """Adds a PipelineRun instance to be rendered.

        Rebuilds `step_status` from the run and each of its step runs.

        Parameters
        ----------
        value: PipelineRun
            PipelineRun instance.

        Returns
        -------
        PipelineVisualizer
            The PipelineVisualizer instance.

        Raises
        ------
        PipelineVisualizerError
            If `pipeline run` not specified.
        """
        if not value:
            raise PipelineVisualizerError("The `pipeline run` must be specified.")
        self.pipeline_run = value

        run_status = RendererItemStatus.from_pipeline_run(self.pipeline_run)
        self.step_status = {run_status.key: run_status}
        for step_run in self.pipeline_run.step_runs or []:
            step_status = RendererItemStatus.from_pipeline_step_run(step_run)
            self.step_status[step_status.key] = step_status
        return self

    def _ensure_renderable(self) -> None:
        """Raises PipelineVisualizerError if the pipeline or renderer is missing."""
        if not (self.steps and self.deps):
            raise PipelineVisualizerError("The `pipeline` must be specified.")
        if not self.renderer:
            raise PipelineVisualizerError("The `renderer` must be specified.")

    def render(self, rankdir: str = GraphOrientation.TOP_BOTTOM):
        """Renders pipeline step status.

        Parameters
        ----------
        rankdir: str, default to "TB".
            Direction of the rendered graph; allowed Values are {"TB", "LR"}.

        Returns
        -------
        None

        Raises
        ------
        PipelineVisualizerError
            If `pipeline` or `renderer` not specified.
        """
        self._ensure_renderable()
        self.renderer.render(
            steps=self.steps,
            deps=self.deps,
            step_status=self.step_status,
            rankdir=rankdir,
        )

    def to_svg(
        self, uri: str = None, rankdir: str = GraphOrientation.TOP_BOTTOM, **kwargs
    ) -> str:
        """Renders pipeline as graph in SVG string.

        Parameters
        ----------
        uri: (string, optional). Defaults to None.
            URI location to save the SVG string.
        rankdir: str, default to "TB".
            Direction of the rendered graph; allowed Values are {"TB", "LR"}.
        kwargs
            Additional keyword arguments forwarded to the renderer's `save_to()`.

        Returns
        -------
        str
            Graph in svg format.

        Raises
        ------
        PipelineVisualizerError
            If `pipeline` or `renderer` not specified.
        """
        self._ensure_renderable()
        return self.renderer.save_to(
            steps=self.steps,
            deps=self.deps,
            step_status=self.step_status,
            rankdir=rankdir,
            uri=uri,
            format="svg",
            **kwargs,
        )