import os
import json

from flask import (
    Flask,
    redirect,
    session,
    render_template,
    Response,
    send_from_directory,
    request,
)
from flask_oidc import OpenIDConnect
from flask_session import Session
from werkzeug.middleware.proxy_fix import ProxyFix

from functions.get_oidc_configuration import get_oidc_configuration
from functions.ldap_delete_all_uid_entries import ldap_delete_all_uid_entries
from functions.ldap_get_all_emails_for_uid import ldap_get_all_emails_for_uid
from functions.ldap_add_email_with_uid import ldap_add_email_with_uid
from functions.verify_certificate import verify_certificate

# Directory used by the filesystem session backend. It is created up front and
# the same constant is handed to Flask-Session below, so the two can never
# drift apart.
SESSION_FILE_DIR = "/data/flask_session/"
os.makedirs(SESSION_FILE_DIR, exist_ok=True)

app = Flask(__name__)
# The app is wrapped in ProxyFix with a count of 1 for each of the
# X-Forwarded-For / -Proto / -Host / -Port settings.
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_port=1)

# Decode the configuration explicitly as UTF-8 instead of relying on the
# platform's locale default, which differs between hosts.
with open("config.json", "r", encoding="utf-8") as fid:
    config_json = json.load(fid)

app.config["SECRET_KEY"] = config_json["FLASK_SECRET_KEY"]
app.config["SESSION_TYPE"] = "filesystem"
app.config["SESSION_FILE_DIR"] = SESSION_FILE_DIR
app.config["SESSION_PERMANENT"] = False
app.config["SESSION_USE_SIGNER"] = True
sess = Session(app)

# Fetch dynamic configuration
oidc_endpoints = get_oidc_configuration()
if not oidc_endpoints:
    # Fail at import time: the OIDC client cannot be configured without the
    # endpoints. RuntimeError is still caught by any `except Exception`.
    raise RuntimeError("Could not retrieve OpenID Connect configuration")

# Flask and OIDC Configuration
app.config.update(
    {
        "OIDC_CLIENT_SECRETS": {
            "web": {
                **oidc_endpoints,  # Dynamically retrieved endpoints
                "redirect_uris": [config_json["redirect_uri"]],
            }
        },
        "OIDC_ID_TOKEN_COOKIE_SECURE": True,
        "OIDC_REQUIRE_HTTPS": True,
        "OIDC_OPENID_REALM": "master",
    }
)

# Initialize OpenID Connect
oidc = OpenIDConnect(app)


@app.route("/", methods=["GET", "POST"])
def index_page():
    # Handler for "/" (GET and POST). Only the opening statements are part of
    # this span; the function continues below.
    # NOTE(review): the nesting here was reconstructed from a
    # whitespace-collapsed source -- confirm against the real file.
    if oidc.user_loggedin:
        # The username from the OIDC profile is used as the LDAP uid.
        uid: str = session["oidc_auth_profile"].get("preferred_username")

        if request.method == "POST":
            # NOTE(review): request.form.get("mode") yields None when the
            # field is absent, so .strip() would raise AttributeError; the
            # same expression is repeated further down in this function.
            response_mode: str = request.form.get("mode").strip()
            if response_mode == "delete":
                ldap_delete_all_uid_entries(
                    uid=uid,
                    config_json=config_json,
                    delete_legacy_entry=bool(config_json["delete_legacy_entry"]),
                )

        certificate_error = None
if request.method == "POST": response_mode: str = request.form.get("mode").strip() if response_mode == "update": # Check if we have a file upload if 'certificate' in request.files: file = request.files['certificate'] if file.filename: post_content = file.read().decode('utf-8') else: post_content = "" else: # Handle text input post_content = request.form.get("certificate", "") post_content = post_content.strip() if len(post_content) == 0: post_content = "None" if post_content != "None": cert_data = post_content.encode() ( certificate_status, certificate_error, certificate_emails, certificate_name, certificate_given_name, certificate_surname, ) = verify_certificate( cert_data=cert_data, ca_chain_path=config_json["ca_chain"], correct_organization=config_json["correct_organization"], ) if len(certificate_error) == 0: certificate_error = None if ( certificate_status and len(certificate_emails) > 0 and len(certificate_name) > 0 and len(certificate_given_name) > 0 and len(certificate_surname) > 0 ): for certificate_email in certificate_emails: ldap_add_email_with_uid( uid=uid, mail=certificate_email, givenName=certificate_given_name, sn=certificate_surname, cert=cert_data, config_json=config_json, ) # Make a list of all registered email addresses html_email_str: str = "" emails_registered = ldap_get_all_emails_for_uid( uid=uid, config_json=config_json )[1] html_email_str += "
" html_email_str += '
" html_error_str += " | "
html_error_str += ' '
html_error_str += ''
html_error_str += " "
html_error_str += " | "
html_error_str += ""
html_error_str += ' '
html_error_str += f''
html_error_str += " "
html_error_str += " | "
html_error_str += "