200 lines
6.7 KiB
Python
200 lines
6.7 KiB
Python
import os
|
|
import json
|
|
|
|
from flask import (
|
|
Flask,
|
|
redirect,
|
|
session,
|
|
render_template,
|
|
Response,
|
|
send_from_directory,
|
|
request,
|
|
)
|
|
from flask_oidc import OpenIDConnect
|
|
from flask_session import Session
|
|
from werkzeug.middleware.proxy_fix import ProxyFix
|
|
|
|
|
|
from functions.get_oidc_configuration import get_oidc_configuration
|
|
from functions.ldap_delete_all_uid_entries import ldap_delete_all_uid_entries
|
|
from functions.ldap_get_all_emails_for_uid import ldap_get_all_emails_for_uid
|
|
from functions.ldap_add_email_with_uid import ldap_add_email_with_uid
|
|
from functions.verify_certificate import verify_certificate
|
|
|
|
# Directory used for the server-side (filesystem) session files. Kept in one
# place so the directory that is created and the one Flask-Session is pointed
# at cannot drift apart.
_SESSION_FILE_DIR = "/data/flask_session/"
os.makedirs(_SESSION_FILE_DIR, exist_ok=True)

app = Flask(__name__)

# Trust one value of each X-Forwarded-For/-Proto/-Host/-Port header.
# NOTE(review): presumably the app runs behind exactly one reverse proxy;
# confirm against the deployment before changing these counts.
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_port=1)

# Read the static configuration. The encoding is given explicitly so the
# result does not depend on the locale of the process.
with open("config.json", "r", encoding="utf-8") as fid:
    config_json = json.load(fid)

# Session handling: signed session ids, data stored on disk, non-permanent.
app.config["SECRET_KEY"] = config_json["FLASK_SECRET_KEY"]
app.config["SESSION_TYPE"] = "filesystem"
app.config["SESSION_FILE_DIR"] = _SESSION_FILE_DIR
app.config["SESSION_PERMANENT"] = False
app.config["SESSION_USE_SIGNER"] = True

sess = Session(app)

# Fetch dynamic configuration
oidc_endpoints = get_oidc_configuration()

if not oidc_endpoints:
    # Without the endpoints the OIDC client below cannot be configured, so
    # refuse to start. RuntimeError is still caught by `except Exception`.
    raise RuntimeError("Could not retrieve OpenID Connect configuration")

# Flask and OIDC Configuration
app.config.update(
    {
        "OIDC_CLIENT_SECRETS": {
            "web": {
                **oidc_endpoints,  # Dynamically retrieved endpoints
                "redirect_uris": [config_json["redirect_uri"]],
            }
        },
        "OIDC_ID_TOKEN_COOKIE_SECURE": True,
        "OIDC_REQUIRE_HTTPS": True,
        "OIDC_OPENID_REALM": "master",
    }
)

# Initialize OpenID Connect
oidc = OpenIDConnect(app)
|
|
|
|
|
|
def _read_submitted_certificate():
    """Read the certificate submitted with the current request.

    A file upload named ``certificate`` takes precedence over the form text
    field of the same name.

    Returns:
        A ``(certificate_text, error)`` tuple. ``certificate_text`` is the
        stripped text ("" when nothing usable was submitted) and ``error`` is
        a message when the upload could not be decoded, otherwise ``None``.
    """
    if "certificate" in request.files:
        upload = request.files["certificate"]
        if not upload.filename:
            # The file field was sent without a selected file.
            return "", None
        try:
            return upload.read().decode("utf-8").strip(), None
        except UnicodeDecodeError:
            # A binary upload used to surface as an unhandled exception;
            # report it to the user instead.
            return "", "The uploaded certificate file could not be read as UTF-8 text."

    # Handle text input
    return request.form.get("certificate", "").strip(), None


def _register_certificate(uid, certificate_text):
    """Verify a certificate and publish its email addresses for ``uid``.

    The LDAP entries are only written when ``verify_certificate`` reports
    success and returned non-empty emails, name, given name and surname.

    Returns:
        The error reported by ``verify_certificate``, or ``None`` when that
        error is empty.
    """
    cert_data = certificate_text.encode()
    (
        certificate_status,
        certificate_error,
        certificate_emails,
        certificate_name,
        certificate_given_name,
        certificate_surname,
    ) = verify_certificate(
        cert_data=cert_data,
        ca_chain_path=config_json["ca_chain"],
        correct_organization=config_json["correct_organization"],
    )

    if (
        certificate_status
        and certificate_emails
        and certificate_name
        and certificate_given_name
        and certificate_surname
    ):
        for certificate_email in certificate_emails:
            ldap_add_email_with_uid(
                uid=uid,
                mail=certificate_email,
                givenName=certificate_given_name,
                sn=certificate_surname,
                cert=cert_data,
                config_json=config_json,
            )

    # An empty error value means "no error".
    return certificate_error or None


def _render_email_list_html(emails_registered) -> str:
    """Build the HTML fragment listing the published email addresses."""
    from html import escape

    parts = [
        "<p>",
        '<div class="form-group">',
        '<label for="given_name">Published eMails Addresses:</label>',
        "</div>",
    ]
    if emails_registered:
        parts.append("<ul>")
        # Escape every address: the fragment is assembled as raw HTML.
        parts.extend(
            f'<li><div class="form-group">{escape(str(email_item))}</div></li>'
            for email_item in emails_registered
        )
        parts.append("</ul>")
    else:
        parts.append("<b>NONE</b>")
    return "".join(parts)


def _render_error_html(certificate_error) -> str:
    """Build the HTML fragment for a certificate error ("" when there is none)."""
    from html import escape

    if certificate_error is None:
        return ""

    # The error text may derive from the uploaded certificate, so it is
    # escaped before being embedded in markup.
    return (
        "<table>"
        "<tr>"
        "<td>"
        '<div class="form-group">'
        '<span class="status-marker status-fail"></span>'
        "</div>"
        "</td>"
        "<td>"
        '<div class="form-group">'
        f'<label for="error">{escape(str(certificate_error))}</label>'
        "</div>"
        "</td>"
        "</tr>"
        "</table>"
    )


@app.route("/", methods=["GET", "POST"])
def index_page():
    """Show, update or delete the published email entries of the logged-in user.

    Users who are not logged in are redirected to the login URL. A POST with
    ``mode=delete`` removes all LDAP entries of the user; a POST with
    ``mode=update`` verifies the submitted certificate and, when it is valid,
    adds one LDAP entry per email address contained in it. In every logged-in
    case the ``info.html`` template is rendered with the currently registered
    addresses and the certificate error, if any.

    NOTE(review): ``html_email_str`` and ``html_error_str`` contain markup, so
    the template presumably renders them unescaped; verify against
    ``info.html``.
    """
    if not oidc.user_loggedin:
        return redirect("https://smime.neuro.uni-bremen.de/login")

    uid: str = session["oidc_auth_profile"].get("preferred_username")
    certificate_error = None

    if request.method == "POST":
        # A POST without a "mode" field is treated as "no action" instead of
        # failing on None.strip().
        response_mode: str = (request.form.get("mode") or "").strip()

        if response_mode == "delete":
            ldap_delete_all_uid_entries(
                uid=uid,
                config_json=config_json,
                delete_legacy_entry=bool(config_json["delete_legacy_entry"]),
            )
        elif response_mode == "update":
            certificate_text, certificate_error = _read_submitted_certificate()
            # The literal text "None" is treated like an empty submission,
            # as it always has been.
            if certificate_text and certificate_text != "None":
                certificate_error = _register_certificate(uid, certificate_text)

    # Make a list of all registered email addresses
    emails_registered = ldap_get_all_emails_for_uid(
        uid=uid, config_json=config_json
    )[1]

    return render_template(
        "info.html",
        html_email_str=_render_email_list_html(emails_registered),
        html_error_str=_render_error_html(certificate_error),
        uid=uid,
        certificate_error=certificate_error,
    )
|
|
|
|
|
|
@app.route("/static/<path:path>", methods=["GET"])
def serve_static_files(path) -> Response:
    """Answer ``GET /static/<path>`` with the matching file from ``static``."""
    static_file_response = send_from_directory("static", path)
    return static_file_response
|
|
|
|
|
|
@app.route("/error")
def error_page():
    """Return the fixed plain-text message served on the ``/error`` route."""
    message = "An error occured!"
    return message
|