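import base64

import ldap3  # type: ignore
from ldap3.utils.conv import escape_filter_chars  # type: ignore
from ldap3.utils.dn import escape_rdn  # type: ignore
from OpenSSL import crypto  # type: ignore

# Container the entry is written to when config_json carries no "people_dn".
_DEFAULT_PEOPLE_DN = "ou=people,dc=smime,dc=uni-bremen,dc=de"


def ldap_add_email_with_uid(
    uid: str,
    mail: str,
    givenName: str,
    sn: str,
    cert: bytes,
    config_json: dict,
) -> bool:
    """Replace the LDAP person entry identified by ``uid#mail``.

    Any existing entry with the same ``uid`` value under the people container
    is deleted first (best effort), then a fresh ``inetOrgPerson`` entry is
    added carrying the PEM certificate (base64, ``userSMIMECertificate``) and
    its DER form (``userCertificate;binary``).

    Args:
        uid: Account identifier; combined with ``mail`` into the ``uid`` RDN.
        mail: E-mail address stored in ``mail`` and in the ``uid`` RDN.
        givenName: First name; also used to build ``cn``.
        sn: Surname; also used to build ``cn``.
        cert: Certificate in PEM form (bytes).
        config_json: Needs ``ldap_host``, ``ldap_bind_dn`` and
            ``ldap_bind_password``; ``people_dn`` is optional and defaults to
            ``_DEFAULT_PEOPLE_DN``.

    Returns:
        True when the add succeeded, False when the add failed or raised.

    Raises:
        KeyError: when a required config key is missing.
        OpenSSL.crypto.Error: when ``cert`` is not a loadable PEM certificate.
    """
    server: ldap3.core.server.Server = ldap3.Server(
        config_json["ldap_host"], get_info=ldap3.ALL
    )

    # uid and mail are caller-supplied, so they are escaped before being
    # placed into a DN (RFC 4514 rules) or a search filter (RFC 4515 rules);
    # unescaped they could alter the filter or address a different entry.
    uid_value = f"{uid}#{mail}"
    people_dn = config_json.get("people_dn", _DEFAULT_PEOPLE_DN)
    user_dn = f"uid={escape_rdn(uid_value)},{people_dn}"
    search_filter = f"(uid={escape_filter_chars(uid_value)})"

    # Parse and convert the certificate before touching the directory, so a
    # malformed PEM fails without first deleting the existing entry.
    x509_cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert)
    der_cert = crypto.dump_certificate(crypto.FILETYPE_ASN1, x509_cert)
    user_smime_certificate = base64.b64encode(cert).decode()

    bind_kwargs = {
        "user": config_json["ldap_bind_dn"],
        "password": config_json["ldap_bind_password"],
        "auto_bind": True,
    }

    # Does this user exist? If so delete it (best effort: failures are
    # reported but do not stop the add below).
    try:
        with ldap3.Connection(server, **bind_kwargs) as conn:
            conn.search(
                search_base=people_dn,
                search_filter=search_filter,
                attributes=["uid"],
            )
            # Collect DNs first; deleting while iterating conn.entries
            # would mutate the result set under the loop.
            stale_dns = [entry.entry_dn for entry in conn.entries]
            for dn in stale_dns:
                if not conn.delete(dn):
                    print(f"Error deleting existing entry {dn}: {conn.result}")
    except Exception as e:
        print(f"Error finding user: {e}")

    # Create the user.
    user_attributes = {
        "objectClass": [
            "top",
            "person",
            # "nsAccount",
            # "nsOrgPerson",
            "organizationalPerson",
            "inetOrgPerson",
        ],
        "uid": uid_value,
        "cn": f"{givenName} {sn}",
        "givenName": givenName,
        "sn": sn,
        "mail": mail,
        "userSMIMECertificate": [user_smime_certificate],
        "userCertificate;binary": [der_cert],
    }
    try:
        with ldap3.Connection(server, **bind_kwargs) as conn:
            # ldap3's synchronous add() reports failure via its return value
            # rather than an exception; the server's reason is in conn.result.
            if not conn.add(user_dn, attributes=user_attributes):
                print(f"Error creating user: {conn.result}")
                return False
    except Exception as e:
        print(f"Error creating user: {e}")
        return False

    print(f"User added: {user_dn}")
    return True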