Source code for ads.bds.auth

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Copyright (c) 2022 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/


import os
import subprocess
from contextlib import contextmanager


# Default location of the krb5 config file. Used as the default value of the
# `kerb5_path` parameter of `krbcontext` and `refresh_ticket`; the leading "~"
# is expanded by `refresh_ticket` before use.
DEFAULT_KRB5_CONFIG_PATH = "~/.bds_config/krb5.conf"
# Name of the environment variable that `refresh_ticket` sets to the absolute
# path of the krb5 config file.
KRB5_CONFIG = "KRB5_CONFIG"


class KRB5KinitError(Exception):  # pragma: no cover
    """Raised when the ``kinit -kt`` command fails.

    Signals that a cached ticket could not be generated from the keytab file
    and the krb5 config file.
    """
def has_kerberos_ticket():
    """Report whether a cached Kerberos ticket exists.

    Runs ``klist -s`` and inspects only its exit status.

    Returns
    -------
    bool
        True when ``klist -s`` exits with status 0, False otherwise.
    """
    exit_status = subprocess.call(["klist", "-s"])
    return exit_status == 0
def init_ccache_with_keytab(principal: str, keytab_file: str) -> None:
    """Initialize credential cache using keytab file.

    Runs ``kinit -kt <keytab_file> <principal>`` and waits for it to finish.

    Parameters
    ----------
    principal: str
        The unique identity to which Kerberos can assign tickets.
    keytab_file: str
        Path to your keytab file.

    Returns
    -------
    None
        Nothing.

    Raises
    ------
    KRB5KinitError
        If ``kinit`` does not exit with status 0. The exception carries the
        raw bytes that ``kinit`` wrote to stderr.
    """
    # Pass argv as a list rather than formatting a command string and
    # splitting it on whitespace: a keytab path or principal that contains
    # spaces must reach kinit as a single argument.
    kinit_proc = subprocess.Popen(
        ["kinit", "-kt", keytab_file, principal], stderr=subprocess.PIPE
    )
    _, stderr_data = kinit_proc.communicate()
    # Popen reports termination by a signal as a negative return code, so
    # compare against zero instead of only checking for positive values.
    if kinit_proc.returncode != 0:
        raise KRB5KinitError(stderr_data)
@contextmanager
def krbcontext(
    principal: str, keytab_path: str, kerb5_path: str = DEFAULT_KRB5_CONFIG_PATH
) -> None:
    """Provide a Kerberos context to run code inside.

    On entry this delegates to ``refresh_ticket``, which initializes the
    credential cache from the keytab when no cached ticket exists and
    otherwise does nothing. No action is taken when the context exits.

    Parameters
    ----------
    principal: str
        The unique identity to which Kerberos can assign tickets.
    keytab_path: str
        Path to your keytab file.
    kerb5_path: (str, optional).
        Path to your krb5 config file.

    Returns
    -------
    None
        Nothing.

    Examples
    --------
    >>> from ads.bds.auth import krbcontext
    >>> from pyhive import hive
    >>> with krbcontext(principal = "your_principal", keytab_path = "your_keytab_path"):
    ...     hive_cursor = hive.connect(host="your_hive_host",
    ...                                port="your_hive_port",
    ...                                auth='KERBEROS',
    ...                                kerberos_service_name="hive").cursor()
    """
    # Make sure a ticket is available before handing control to the caller.
    refresh_ticket(principal, keytab_path, kerb5_path)
    yield
def refresh_ticket(
    principal: str, keytab_path: str, kerb5_path: str = DEFAULT_KRB5_CONFIG_PATH
) -> None:
    """Ensure a cached ticket exists for the principal and keytab file.

    Points the ``KRB5_CONFIG`` environment variable at the krb5 config file,
    then generates a cached ticket from the keytab only when no cached
    ticket currently exists.

    Parameters
    ----------
    principal: str
        The unique identity to which Kerberos can assign tickets.
    keytab_path: str
        Path to your keytab file.
    kerb5_path: (str, optional).
        Path to your krb5 config file.

    Returns
    -------
    None
        Nothing.

    Raises
    ------
    FileNotFoundError
        If the krb5 config file does not exist.

    Examples
    --------
    >>> from ads.bds.auth import refresh_ticket
    >>> from pyhive import hive
    >>> refresh_ticket(principal = "your_principal", keytab_path = "your_keytab_path")
    >>> hive_cursor = hive.connect(host="your_hive_host",
    ...                            port="your_hive_port",
    ...                            auth='KERBEROS',
    ...                            kerberos_service_name="hive").cursor()
    """
    keytab_path = os.path.abspath(os.path.expanduser(keytab_path))
    krb5_config = os.path.abspath(os.path.expanduser(kerb5_path))
    # Validate before exporting: a failed call must not leave the process
    # environment pointing at a config file that does not exist.
    if not os.path.exists(krb5_config):
        raise FileNotFoundError(f"krb5 config file not found in {kerb5_path}.")
    os.environ[KRB5_CONFIG] = krb5_config
    if not has_kerberos_ticket():
        init_ccache_with_keytab(principal, keytab_path)