184 lines
5.3 KiB
Python
184 lines
5.3 KiB
Python
import os
|
|
import json
|
|
from flask import Flask, render_template, redirect, url_for, request, session, current_app, send_from_directory, Blueprint
|
|
from authlib.integrations.flask_client import OAuth
|
|
from functools import wraps
|
|
import secrets
|
|
import requests # type: ignore
|
|
from requests.auth import HTTPBasicAuth # type: ignore
|
|
|
|
|
|
app = Flask(__name__)
|
|
|
|
# Secret key for session management
|
|
app.secret_key = os.urandom(24)
|
|
|
|
# OAuth Configuration
|
|
oauth = OAuth(app)
|
|
|
|
with open("config.json", "r") as file:
|
|
config_json: dict = json.load(file)
|
|
|
|
# Keycloak Configuration
|
|
keycloak_conf = {
|
|
'server_metadata_url': f"{config_json['issuer_url']}/.well-known/openid-configuration",
|
|
'client_id': config_json["login_client_id"],
|
|
'client_secret': config_json["login_client_secret"],
|
|
'client_kwargs': {
|
|
'scope': 'openid profile email '
|
|
}
|
|
}
|
|
|
|
# Configure Keycloak OAuth
|
|
keycloak = oauth.register(
|
|
name='keycloak',
|
|
**keycloak_conf
|
|
)
|
|
|
|
# Login required decorator
|
|
def login_required(f):
|
|
@wraps(f)
|
|
def decorated_function(*args, **kwargs):
|
|
if 'user' not in session:
|
|
return redirect(url_for('keys.login'))
|
|
return f(*args, **kwargs)
|
|
return decorated_function
|
|
|
|
# Create a blueprint for all keys routes
|
|
keys = Blueprint('keys', __name__, url_prefix='/keys')
|
|
|
|
@keys.route('/', methods=['GET', 'POST'])
|
|
@login_required
|
|
def keys_form():
|
|
|
|
error_msg: str = ""
|
|
|
|
username:str = session.get('username')
|
|
users_url = f"{config_json['keycloak_url']}/admin/realms/master/users"
|
|
token_url = f"{config_json['keycloak_url']}/realms/master/protocol/openid-connect/token"
|
|
token_data = {
|
|
"grant_type": "password",
|
|
"username": config_json["admin_username"],
|
|
"password": config_json["admin_password"],
|
|
}
|
|
|
|
# Get token
|
|
try:
|
|
response = requests.post(
|
|
token_url,
|
|
data=token_data,
|
|
auth=HTTPBasicAuth(config_json["client_id"], config_json["client_secret"]),
|
|
)
|
|
response.raise_for_status()
|
|
|
|
except requests.exceptions.HTTPError:
|
|
error_msg = "SSO connection broken. No token."
|
|
|
|
access_token = response.json()["access_token"]
|
|
headers = {
|
|
"Authorization": f"Bearer {access_token}",
|
|
"Content-Type": "application/json",
|
|
}
|
|
|
|
# Check if user exists
|
|
params = {"username": username, "exact": "true"}
|
|
|
|
|
|
try:
|
|
response = requests.get(users_url, headers=headers, params=params)
|
|
response.raise_for_status()
|
|
|
|
# Response is a list of users matching the criteria
|
|
users = response.json()
|
|
|
|
if len(users) == 0:
|
|
error_msg = f"User {username} does not exists."
|
|
|
|
if len(users) > 1:
|
|
error_msg = f"User {username} can not be identified."
|
|
|
|
except requests.exceptions.HTTPError:
|
|
error_msg = "Communication with SSO server failed."
|
|
|
|
user_id = users[0]["id"]
|
|
|
|
# Get user details
|
|
user_url = f"{users_url}/{user_id}"
|
|
|
|
try:
|
|
response = requests.get(user_url, headers=headers)
|
|
response.raise_for_status()
|
|
user_data = response.json()
|
|
except requests.exceptions.HTTPError as e:
|
|
error_msg = f"Failed to get user details: {str(e)}"
|
|
|
|
if "attributes" not in user_data:
|
|
user_data["attributes"] = dict()
|
|
|
|
if ("Overleaf_SSH_Public_Key" in user_data["attributes"] is not None) and \
|
|
(user_data["attributes"]["Overleaf_SSH_Public_Key"] is not None) and \
|
|
(len(user_data["attributes"]["Overleaf_SSH_Public_Key"]) > 0):
|
|
public_key = user_data["attributes"]["Overleaf_SSH_Public_Key"][0]
|
|
private_key = user_data["attributes"]["Overleaf_SSH_Private_Key"][0]
|
|
else:
|
|
public_key = "No available (No overleaf account?)"
|
|
private_key = "No available (No overleaf account?)"
|
|
|
|
return render_template("post.html", username=username, error_msg=error_msg, public_key=public_key, private_key=private_key)
|
|
|
|
@keys.route('/login')
|
|
def login():
|
|
# Generate nonce
|
|
nonce = secrets.token_urlsafe(16)
|
|
session['oauth_nonce'] = nonce
|
|
|
|
redirect_uri = f"{config_json['base_url']}/keys/authorize"
|
|
return keycloak.authorize_redirect(
|
|
redirect_uri,
|
|
nonce=nonce
|
|
)
|
|
|
|
@keys.route("/static/<path:path>", methods=["GET"])
|
|
def serve_static_files(path):
|
|
return send_from_directory("static", path)
|
|
|
|
@keys.route('/authorize')
|
|
def authorize():
|
|
|
|
# Retrieve the nonce from the session
|
|
nonce = session.pop('oauth_nonce', None)
|
|
|
|
token = keycloak.authorize_access_token()
|
|
|
|
# Verify the token
|
|
userinfo = keycloak.parse_id_token(token, nonce=nonce)
|
|
|
|
# Store user information in session
|
|
session['user'] = userinfo
|
|
session['username'] = userinfo.get('preferred_username', 'User')
|
|
|
|
return redirect(url_for('keys.keys_form'))
|
|
|
|
@keys.route('/logout')
|
|
def logout():
|
|
# Clear the session
|
|
session.clear()
|
|
|
|
# Redirect to Keycloak logout endpoint
|
|
logout_url = (
|
|
f"{keycloak_conf['server_metadata_url'].replace('.well-known/openid-configuration', '')}") + \
|
|
f"protocol/openid-connect/logout?client_id={keycloak_conf['client_id']}&" + \
|
|
f"post_logout_redirect_uri={config_json['base_url']}"
|
|
|
|
return redirect(logout_url)
|
|
|
|
# Register the blueprint
|
|
app.register_blueprint(keys)
|
|
|
|
@app.errorhandler(401)
|
|
def unauthorized(error):
|
|
return redirect(url_for('keys.login'))
|
|
|
|
if __name__ == '__main__':
|
|
app.run(host='0.0.0.0', port=80)
|
|
|