# sso.fb1.uni-bremen.de/docker/compose/overleafssh/data/main.py
#
# 184 lines
# 5.3 KiB
# Python

import os
import json
from flask import Flask, render_template, redirect, url_for, request, session, current_app, send_from_directory, Blueprint
from authlib.integrations.flask_client import OAuth
from functools import wraps
import secrets
import requests # type: ignore
from requests.auth import HTTPBasicAuth # type: ignore
app = Flask(__name__)

# Load the deployment configuration. It supplies the Keycloak URLs, the
# client credentials and the public base URL of this service.
with open("config.json", "r", encoding="utf-8") as file:
    config_json: dict = json.load(file)

# Secret key for session management.
# A key generated at import time changes on every restart and differs between
# worker processes, which invalidates existing sessions and loses the OAuth
# nonce stored in them. An optional "secret_key" entry in config.json
# therefore takes precedence; the random key remains the fallback so that
# existing configurations keep working unchanged.
app.secret_key = config_json.get("secret_key") or os.urandom(24)

# OAuth Configuration
oauth = OAuth(app)

# Keycloak Configuration
keycloak_conf = {
    'server_metadata_url': f"{config_json['issuer_url']}/.well-known/openid-configuration",
    'client_id': config_json["login_client_id"],
    'client_secret': config_json["login_client_secret"],
    'client_kwargs': {
        'scope': 'openid profile email '
    }
}

# Configure Keycloak OAuth
keycloak = oauth.register(
    name='keycloak',
    **keycloak_conf
)
# Decorator that restricts a view to logged-in users
def login_required(f):
    """Wrap view ``f`` so that it only runs when the session holds a 'user'.

    Requests whose session has no 'user' entry are redirected to the
    'keys.login' endpoint instead of reaching the view.
    """
    @wraps(f)
    def guarded_view(*args, **kwargs):
        if 'user' in session:
            return f(*args, **kwargs)
        return redirect(url_for('keys.login'))

    return guarded_view
# Create a blueprint for all keys routes; every route registered on it is
# served below the '/keys' URL prefix.
keys = Blueprint('keys', __name__, url_prefix='/keys')
# Upper bound (in seconds) for every HTTP call to the SSO server, so that an
# unresponsive server cannot block a request indefinitely.
_SSO_TIMEOUT_SECONDS = 10

# Placeholder shown in place of a key that could not be retrieved.
_KEY_UNAVAILABLE = "No available (No overleaf account?)"


def _render_keys_page(username, error_msg="", public_key=_KEY_UNAVAILABLE, private_key=_KEY_UNAVAILABLE):
    """Render ``post.html``; keys default to the "not available" placeholder."""
    return render_template(
        "post.html",
        username=username,
        error_msg=error_msg,
        public_key=public_key,
        private_key=private_key,
    )


def _first_attribute_value(attributes, name):
    """Return the first value stored under ``name``, or None if absent/empty."""
    values = attributes.get(name)
    if not values:
        return None
    return values[0]


@keys.route('/', methods=['GET', 'POST'])
@login_required
def keys_form():
    """Show the Overleaf SSH key pair stored for the logged-in user.

    The view requests an admin token from the SSO server, looks up the user
    whose username matches the one stored in the session, fetches that
    user's record and reads the ``Overleaf_SSH_Public_Key`` and
    ``Overleaf_SSH_Private_Key`` attributes from it.

    Returns:
        The rendered ``post.html`` template. If any step fails, the page is
        rendered with a non-empty ``error_msg`` and placeholder keys instead
        of raising, so the user sees the failure reason rather than a 500.
    """
    username: str = session.get('username')
    users_url = f"{config_json['keycloak_url']}/admin/realms/master/users"
    token_url = f"{config_json['keycloak_url']}/realms/master/protocol/openid-connect/token"
    token_data = {
        "grant_type": "password",
        "username": config_json["admin_username"],
        "password": config_json["admin_password"],
    }
    client_auth = HTTPBasicAuth(config_json["client_id"], config_json["client_secret"])

    # Get token. RequestException covers HTTP error statuses as well as
    # connection failures and timeouts; ValueError/KeyError cover a response
    # body that is not JSON or that carries no access token.
    try:
        response = requests.post(
            token_url,
            data=token_data,
            auth=client_auth,
            timeout=_SSO_TIMEOUT_SECONDS,
        )
        response.raise_for_status()
        access_token = response.json()["access_token"]
    except (requests.exceptions.RequestException, ValueError, KeyError):
        current_app.logger.exception("Could not obtain an admin token from the SSO server")
        return _render_keys_page(username, "SSO connection broken. No token.")

    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
    }

    # Check if user exists
    params = {"username": username, "exact": "true"}
    try:
        response = requests.get(
            users_url,
            headers=headers,
            params=params,
            timeout=_SSO_TIMEOUT_SECONDS,
        )
        response.raise_for_status()
        # Response is a list of users matching the criteria
        users = response.json()
    except (requests.exceptions.RequestException, ValueError):
        current_app.logger.exception("User lookup on the SSO server failed")
        return _render_keys_page(username, "Communication with SSO server failed.")

    if not users:
        return _render_keys_page(username, f"User {username} does not exists.")
    if len(users) > 1:
        # Never fall back to the first match: showing the keys of an
        # ambiguous lookup could expose another account's private key.
        return _render_keys_page(username, f"User {username} can not be identified.")
    user_id = users[0]["id"]

    # Get user details
    user_url = f"{users_url}/{user_id}"
    try:
        response = requests.get(user_url, headers=headers, timeout=_SSO_TIMEOUT_SECONDS)
        response.raise_for_status()
        user_data = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        current_app.logger.exception("Fetching the user record from the SSO server failed")
        return _render_keys_page(username, f"Failed to get user details: {str(e)}")

    # The record may lack "attributes" entirely (or carry a null value).
    attributes = user_data.get("attributes") or {}
    public_key = _first_attribute_value(attributes, "Overleaf_SSH_Public_Key")
    if public_key is None:
        # Without a public key, both keys are reported as unavailable.
        return _render_keys_page(username)

    private_key = _first_attribute_value(attributes, "Overleaf_SSH_Private_Key")
    if private_key is None:
        private_key = _KEY_UNAVAILABLE
    return _render_keys_page(username, public_key=public_key, private_key=private_key)
@keys.route('/login')
def login():
    """Start the login flow by redirecting the browser to Keycloak.

    A fresh nonce is generated and stored in the session under
    'oauth_nonce', then passed along with the authorization redirect. The
    callback address is the '/keys/authorize' route below the configured
    base URL.
    """
    # Generate the nonce and remember it for the callback
    fresh_nonce = secrets.token_urlsafe(16)
    session['oauth_nonce'] = fresh_nonce

    callback_url = f"{config_json['base_url']}/keys/authorize"
    return keycloak.authorize_redirect(callback_url, nonce=fresh_nonce)
@keys.route("/static/<path:path>", methods=["GET"])
def serve_static_files(path):
    """Serve the file at ``path`` from the "static" directory."""
    static_directory = "static"
    return send_from_directory(static_directory, path)
@keys.route('/authorize')
def authorize():
    """Handle the Keycloak callback and record the user in the session.

    The nonce stored at login is removed from the session and handed to the
    ID-token parser together with the freshly obtained access token. The
    parsed user information is stored under 'user', and its
    'preferred_username' (default 'User') under 'username'. The browser is
    then redirected to the keys page.
    """
    # Take the nonce out of the session; it is not kept after this request
    expected_nonce = session.pop('oauth_nonce', None)

    token_response = keycloak.authorize_access_token()

    # Parse the ID token, checking it against the stored nonce
    claims = keycloak.parse_id_token(token_response, nonce=expected_nonce)

    # Remember who is logged in
    session['user'] = claims
    session['username'] = claims.get('preferred_username', 'User')

    return redirect(url_for('keys.keys_form'))
@keys.route('/logout')
def logout():
    """Clear the local session and redirect to the Keycloak logout endpoint.

    The logout URL is derived from the configured metadata URL by dropping
    its '.well-known/openid-configuration' suffix; the client id and the
    configured base URL (as post-logout redirect target) are appended as
    query parameters.
    """
    # Drop everything stored for this user
    session.clear()

    # Address of the realm, i.e. the metadata URL without its well-known suffix
    realm_base = keycloak_conf['server_metadata_url'].replace('.well-known/openid-configuration', '')
    client_part = f"protocol/openid-connect/logout?client_id={keycloak_conf['client_id']}&"
    redirect_part = f"post_logout_redirect_uri={config_json['base_url']}"

    return redirect(realm_base + client_part + redirect_part)
# Register the blueprint on the application so that its routes are served.
app.register_blueprint(keys)
@app.errorhandler(401)
def unauthorized(error):
    """Answer every 401 error with a redirect to the login route."""
    login_url = url_for('keys.login')
    return redirect(login_url)
if __name__ == "__main__":
    # Started directly as a script: listen on all interfaces, port 80.
    app.run(port=80, host="0.0.0.0")