Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions zeppelin_provider/hooks/zeppelin_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,15 @@ def __init__(
self.z_conn = self.get_connection(conn_id)
zeppelin_url = "http://" + self.z_conn.host + ":" + str(self.z_conn.port)
extra_config = self.z_conn.extra_dejson
self.principal = extra_config.get('principal', None)
self.principal_keytab = extra_config.get('principal_keytab', None)
self.knox_sso = extra_config.get('KNOX_SSO', None)
self.log.info("KNOX_SSO: " + str(self.knox_sso))
if self.knox_sso:
zeppelin_url = extra_config.get('REST_URL', None)
self.client_config = ClientConfig(zeppelin_url, 5, self.knox_sso)
if self.principal:
self.client_config = ClientConfig(zeppelin_url, query_interval=5, principal=self.principal, principal_keytab=self.principal_keytab)
else:
self.client_config = ClientConfig(zeppelin_url, query_interval=5)
self.z_client = ZeppelinClient(self.client_config)
Expand All @@ -51,6 +55,9 @@ def login(self) -> None:
user = self.z_conn.self.login
password = self.z_conn.password
self.z_client.self.login(user, password)
if self.principal:
self.log.info("Principal is enabled, self.login via principal")
self.z_client.login(principal=self.principal, principal_keytab=self.principal_keytab)

def run_note(self, note_id: str, parameters: Optional[Dict[str, str]]={}) -> None:
note_result = self.z_client.execute_note(note_id, parameters)
Expand Down
4 changes: 3 additions & 1 deletion zeppelin_provider/pyzeppelin/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ class ClientConfig:
"""
Client side configuration of Zeppelin SDK
"""
def __init__(self, zeppelin_rest_url, query_interval=1, knox_sso_url=None):
def __init__(self, zeppelin_rest_url, query_interval=1, knox_sso_url=None, principal=None, principal_keytab=None):
self.zeppelin_rest_url = zeppelin_rest_url
self.query_interval = query_interval
self.knox_sso_url = knox_sso_url
self.principal = principal
self.principal_keytab = principal_keytab

def get_zeppelin_rest_url(self):
return self.zeppelin_rest_url
Expand Down
2 changes: 1 addition & 1 deletion zeppelin_provider/pyzeppelin/notebook.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def __init__(self, note_json):
self.is_running = False
if 'info' in note_json:
info_json = note_json['info']
self.is_running = bool(info_json.get('isRunning', 'False'))
self.is_running = bool(info_json.get('isRunning'))
self.paragraphs = []
if 'paragraphs' in note_json:
paragraph_json_array = note_json['paragraphs']
Expand Down
12 changes: 11 additions & 1 deletion zeppelin_provider/pyzeppelin/zeppelin_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
from .notebook import Note, Paragraph
import time
import logging
from spnego import KerberosKeytab
from requests_kerberos import HTTPKerberosAuth


class SessionInfo:
Expand Down Expand Up @@ -71,7 +73,7 @@ def get_version(self):
self._check_response(resp)
return resp.json()['body']['version']

def login(self, user_name, password, knox_sso = None):
def login(self, user_name=None, password=None, knox_sso=None, principal=None, principal_keytab=None):
"""
Login to Zeppelin, use knox_sso if it is provided.
:param user_name:
Expand All @@ -89,6 +91,14 @@ def login(self, user_name, password, knox_sso = None):
if resp.status_code != 200:
raise Exception("Fail to get ticket after Knox SSO, status: {}, status_text: {}" \
.format(resp.status_code, resp.text))
if principal:
cred = KerberosKeytab(principal_keytab, principal)
auth = HTTPKerberosAuth(principal=cred)
self.session.auth = auth
resp = self.session.get(self.zeppelin_rest_url + "/api/notebook")
if resp.status_code != 200:
raise Exception("Kerberos login fails, status: {}, status_text: {}" \
.format(resp.status_code, resp.text))
else:
resp = self.session.post(self.zeppelin_rest_url + "/api/login",
data = {'userName': user_name, 'password': password})
Expand Down