smime_unibremen_ldap_exchan.../webapp/data/main.py

200 lines
6.7 KiB
Python

import html
import json
import os

from flask import (
    Flask,
    redirect,
    session,
    render_template,
    Response,
    send_from_directory,
    request,
)
from flask_oidc import OpenIDConnect
from flask_session import Session
from werkzeug.middleware.proxy_fix import ProxyFix

from functions.get_oidc_configuration import get_oidc_configuration
from functions.ldap_delete_all_uid_entries import ldap_delete_all_uid_entries
from functions.ldap_get_all_emails_for_uid import ldap_get_all_emails_for_uid
from functions.ldap_add_email_with_uid import ldap_add_email_with_uid
from functions.verify_certificate import verify_certificate
# The filesystem session backend configured below stores its files in this
# directory, so make sure it exists before the extension is initialised.
os.makedirs("/data/flask_session/", exist_ok=True)

app = Flask(__name__)
# Honour one level of X-Forwarded-For/-Proto/-Host/-Port headers.
# NOTE(review): this presumes exactly one reverse proxy in front of the app.
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_port=1)

# Read the configuration as UTF-8 explicitly: without ``encoding`` the file is
# decoded with the process locale, which can corrupt (or fail on) non-ASCII
# values when the service runs under a non-UTF-8 locale.
with open("config.json", "r", encoding="utf-8") as fid:
    config_json = json.load(fid)

# Server-side, signed, non-permanent sessions stored on the filesystem.
app.config["SECRET_KEY"] = config_json["FLASK_SECRET_KEY"]
app.config["SESSION_TYPE"] = "filesystem"
app.config["SESSION_FILE_DIR"] = "/data/flask_session/"
app.config["SESSION_PERMANENT"] = False
app.config["SESSION_USE_SIGNER"] = True
sess = Session(app)
# Look up the OpenID Connect endpoints at start-up and refuse to start
# without them.
oidc_endpoints = get_oidc_configuration()
if not oidc_endpoints:
    raise Exception("Could not retrieve OpenID Connect configuration")

# Client settings: the retrieved endpoints plus the redirect URI from
# config.json (the configured redirect URI wins if both define one).
app.config["OIDC_CLIENT_SECRETS"] = {
    "web": dict(oidc_endpoints, redirect_uris=[config_json["redirect_uri"]]),
}
app.config["OIDC_ID_TOKEN_COOKIE_SECURE"] = True
app.config["OIDC_REQUIRE_HTTPS"] = True
app.config["OIDC_OPENID_REALM"] = "master"

# Bind the OpenID Connect extension to the application.
oidc = OpenIDConnect(app)
def _read_submitted_certificate():
    """Return ``(certificate_text, error_message)`` for the current request.

    An uploaded file in the ``certificate`` field is used when it has a
    filename. Otherwise the ``certificate`` text field is used; browsers send
    an empty, nameless file part when no file was selected, so the text field
    must be consulted in that case as well.

    ``error_message`` is ``None`` unless the upload cannot be decoded as
    UTF-8, in which case ``certificate_text`` is empty.
    """
    upload = request.files.get("certificate")
    if upload is not None and upload.filename:
        try:
            text = upload.read().decode("utf-8")
        except UnicodeDecodeError:
            # Binary uploads used to abort the request with an unhandled
            # exception; report the problem to the user instead.
            return "", "The uploaded certificate file could not be read as UTF-8 text."
    else:
        text = request.form.get("certificate", "")
    return text.strip(), None


def _publish_submitted_certificate(uid):
    """Verify the submitted certificate and add its e-mail addresses for *uid*.

    Returns the error text to show to the user, or ``None`` when there is
    nothing to report (including when nothing was submitted).
    """
    post_content, read_error = _read_submitted_certificate()
    if read_error is not None:
        return read_error
    if not post_content:
        return None

    cert_data = post_content.encode()
    (
        certificate_status,
        certificate_error,
        certificate_emails,
        certificate_name,
        certificate_given_name,
        certificate_surname,
    ) = verify_certificate(
        cert_data=cert_data,
        ca_chain_path=config_json["ca_chain"],
        correct_organization=config_json["correct_organization"],
    )

    # Only write to LDAP when verification succeeded and every identity field
    # extracted from the certificate is non-empty.
    if (
        certificate_status
        and certificate_emails
        and certificate_name
        and certificate_given_name
        and certificate_surname
    ):
        for certificate_email in certificate_emails:
            ldap_add_email_with_uid(
                uid=uid,
                mail=certificate_email,
                givenName=certificate_given_name,
                sn=certificate_surname,
                cert=cert_data,
                config_json=config_json,
            )

    # An empty error value means "no error".
    return certificate_error or None


def _render_email_list(emails_registered) -> str:
    """Build the HTML fragment that lists the registered e-mail addresses."""
    parts = [
        "<p>",
        '<div class="form-group">',
        '<label for="given_name">Published eMails Addresses:</label>',
        "</div>",
    ]
    if emails_registered:
        parts.append("<ul>")
        for email_item in emails_registered:
            parts.append("<li>")
            parts.append('<div class="form-group">')
            # Escape: the value is embedded in hand-built markup.
            parts.append(html.escape(str(email_item)))
            parts.append("</div>")
            parts.append("</li>")
        parts.append("</ul>")
    else:
        parts.append("<b>NONE</b>")
    return "".join(parts)


def _render_error_table(certificate_error) -> str:
    """Build the HTML fragment for a certificate error ('' when there is none)."""
    if certificate_error is None:
        return ""
    # NOTE(review): the message is escaped because it may echo content taken
    # from the submitted certificate; confirm verify_certificate never returns
    # intentional markup.
    escaped_error = html.escape(str(certificate_error))
    return "".join(
        [
            "<table>",
            "<tr>",
            # The original emitted "<td>" twice here, leaving an unclosed cell.
            "<td>",
            '<div class="form-group">',
            '<span class="status-marker status-fail"></span>',
            "</div>",
            "</td>",
            "<td>",
            '<div class="form-group">',
            f'<label for="error">{escaped_error}</label>',
            "</div>",
            "</td>",
            "</tr>",
            "</table>",
        ]
    )


@app.route("/", methods=["GET", "POST"])
def index_page():
    """Show the logged-in user's published e-mail addresses and handle the form.

    Users who are not logged in are redirected to the login URL. A POST with
    ``mode=delete`` removes all LDAP entries of the user; a POST with
    ``mode=update`` verifies the submitted certificate and, on success, adds
    one LDAP entry per e-mail address found in it. The page is then rendered
    with the current list of addresses and any certificate error.
    """
    if not oidc.user_loggedin:
        return redirect("https://smime.neuro.uni-bremen.de/login")

    profile = session.get("oidc_auth_profile") or {}
    uid = profile.get("preferred_username")
    if not uid:
        # Never run LDAP operations without a user id.
        return redirect("/error")

    certificate_error = None
    if request.method == "POST":
        # A POST without "mode" is treated as a plain page view.
        response_mode = request.form.get("mode", "").strip()
        if response_mode == "delete":
            ldap_delete_all_uid_entries(
                uid=uid,
                config_json=config_json,
                delete_legacy_entry=bool(config_json["delete_legacy_entry"]),
            )
        elif response_mode == "update":
            certificate_error = _publish_submitted_certificate(uid)

    # The address list is the second element of the helper's result.
    emails_registered = ldap_get_all_emails_for_uid(
        uid=uid, config_json=config_json
    )[1]

    return render_template(
        "info.html",
        html_email_str=_render_email_list(emails_registered),
        html_error_str=_render_error_table(certificate_error),
        uid=uid,
        certificate_error=certificate_error,
    )
@app.route("/static/<path:path>", methods=["GET"])
def serve_static_files(path) -> Response:
    """Serve the file at *path* from the ``static`` directory."""
    static_directory = "static"
    response = send_from_directory(static_directory, path)
    return response
@app.route("/error")
def error_page():
    """Return the fixed plain-text message shown for the ``/error`` route."""
    message = "An error occured!"
    return message