-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathauthorizer.py
More file actions
executable file
·103 lines (86 loc) · 3.17 KB
/
authorizer.py
File metadata and controls
executable file
·103 lines (86 loc) · 3.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
#!/usr/bin/env python3
import argparse
import json
import threading
import time
import logging
import webbrowser
from urllib.parse import urlunsplit, urlencode
import requests
from flask import Flask
from flask import request
# OAuth client credentials; overwritten from the command line when run as a script.
client_id = ""
client_secret = ""
# Local port used to build the redirect URI and the health-check URL.
port = 5000
# OAuth scope requested in the authorization URL.
scope = 'https://www.googleapis.com/auth/analytics.readonly'
# Route that receives the authorization code; also the path of the redirect URI.
GET_CODE_ENDPOINT = '/oauth'
# Guard flag so the browser authorization tab is opened only once.
first_call_executed = False
app = Flask(__name__)
@app.route("/")
def health_check():
    """Liveness endpoint: respond with the plain text ``ok``."""
    body = "ok"
    return body
@app.route(GET_CODE_ENDPOINT)
def exchange_for_auth_token():
    """Exchange the authorization code from the OAuth redirect for tokens.

    Reads the redirect's query parameters. If they contain ``error``, a
    failure payload is returned. Otherwise the ``code`` parameter is sent
    to Google's token endpoint, and the decoded JSON response is printed
    to stdout and returned to the browser.

    Returns:
        A dict (rendered as JSON by Flask) holding either the token
        endpoint's response or a ``message``/``error`` pair. Failures to
        obtain a code or to reach the token endpoint also carry an HTTP
        error status.
    """
    response_data = request.args
    if 'error' in response_data:
        return {"message": "Failed to get authorization code from Google", "error": response_data['error']}

    code = response_data.get('code')
    if not code:
        # Answer explicitly rather than relying on the framework's generic
        # bad-request page for a missing query parameter.
        return {"message": "Failed to get authorization code from Google", "error": "missing_code"}, 400

    parameters = {
        'client_id': client_id,
        'client_secret': client_secret,
        'code': code,
        'grant_type': 'authorization_code',
        'redirect_uri': f'http://localhost:{port}{GET_CODE_ENDPOINT}',
    }
    try:
        # Send the credentials in the form-encoded body, not the query
        # string, so the client secret and code never end up in URLs/logs.
        resp = requests.post('https://oauth2.googleapis.com/token', data=parameters, timeout=30)
        token_payload = resp.json()
    except (requests.RequestException, ValueError) as exc:
        logging.exception("Token exchange with Google failed")
        return {"message": "Failed to exchange authorization code for tokens", "error": str(exc)}, 502

    print(json.dumps(token_payload, sort_keys=True, indent=4, separators=(',', ': ')))
    return token_payload
def send_auth_request():
    """Open the Google OAuth consent page in a new browser tab, at most once.

    The authorization URL is built from the module-level ``client_id``,
    ``scope`` and ``port``. The ``first_call_executed`` flag makes every
    call after the first a no-op.
    """
    global first_call_executed
    if first_call_executed:
        return
    first_call_executed = True

    parameters = {
        'client_id': client_id,
        'redirect_uri': f'http://localhost:{port}{GET_CODE_ENDPOINT}',
        'response_type': 'code',
        'scope': scope,
        'access_type': 'offline',
        'prompt': 'consent',
    }
    url = urlunsplit(('https', 'accounts.google.com', '/o/oauth2/v2/auth', urlencode(parameters), ""))
    webbrowser.open_new_tab(url)


# ``Flask.before_first_request`` was removed in Flask 2.3; using it as a
# decorator there raises AttributeError at import time. Register the hook
# only where it still exists, so older Flask behaves as before.
if hasattr(app, "before_first_request"):
    app.before_first_request(send_auth_request)
def start_runner():
    """Start a background thread that waits for the server, then opens the OAuth page.

    The thread polls ``http://localhost:{port}/`` every two seconds until it
    gets an HTTP 200, then calls ``send_auth_request()``.
    """
    def wait_server_loop():
        url = f'http://localhost:{port}/'
        while True:
            print('In start loop')
            try:
                # Bounded wait so a hung connection cannot stall the poll loop.
                r = requests.get(url, timeout=5)
            except requests.RequestException:
                # Only connection-level failures mean "not up yet"; any other
                # error is a real bug and should not be silently retried.
                logging.exception("Cannot connect to server")
                print('Server not yet started')
            else:
                if r.status_code == 200:
                    print('Server started, quiting start_loop')
                    break
            time.sleep(2)
        # Called outside the try block so its own failures are not swallowed
        # or mistaken for the server being unreachable.
        send_auth_request()

    print('Started runner')
    # Daemon thread: if the server never comes up (e.g. port already in use)
    # the poller must not keep the process alive forever.
    thread = threading.Thread(target=wait_server_loop, daemon=True)
    thread.start()
if __name__ == '__main__':
    # Command-line entry point: read the OAuth client settings, start the
    # poller thread that opens the browser, then serve the redirect endpoint.
    parser = argparse.ArgumentParser(description='OAuth tool')
    parser.add_argument("--client_id", help='Google client_id', required=True)
    parser.add_argument("--client_secret", help='Google client_secret', required=True)
    parser.add_argument("--port", help='Port to run application, 5000 by default', required=False, default=5000, type=int)
    parser.add_argument("--scope", help='OAuth scope (default is https://www.googleapis.com/auth/analytics.readonly)', required=False, default='https://www.googleapis.com/auth/analytics.readonly')
    args = parser.parse_args()

    # These rebind the module-level settings read by the route handlers.
    client_id = args.client_id
    client_secret = args.client_secret
    scope = args.scope
    port = args.port

    start_runner()
    # Bind the server to the requested port. Previously --port was ignored
    # here, so the health check and redirect URI pointed at a port nothing
    # was listening on.
    app.run(port=port)