Source code for simdb.remote.core.auth.ldap
from typing import Optional
import ldap
from flask import Request
from simdb.config import Config
from ._authenticator import Authenticator
from ._exceptions import AuthenticationError
from ._user import User
[docs]
class LdapAuthenticator(Authenticator):
"""
Authenticator for authenticating using an LDAP server.
This requires the following extra parameters in the server configuration:
ldap_server - the URI of the LDAP server
ldap_bind - the bind string for the LDAP authentication (formatted to
replace {username} with username)
ldap_query_user - the bind string for the LDAP query
ldap_query_password - the password for the LDAP query
ldap_query_base - the base point for the LDAP query
ldap_query_filter - the filter to apply to the LDAP query (formatted to replace
{username} with username)
"""
Name = "LDAP"
[docs]
def authenticate(self, config: Config, request: Request) -> Optional[User]:
ldap_host = config.get_option("authentication.ldap_server")
try:
conn = ldap.initialize(ldap_host)
except ldap.LDAPError as err: # ty: ignore[unresolved-attribute]
raise AuthenticationError("failed to connect to ldap server") from err
auth = request.authorization
if not auth:
return None
username = auth.username
password = auth.password
ldap_bind: str = config.get_string_option("authentication.ldap_bind")
try:
conn.simple_bind_s(ldap_bind.format(username=username), password)
except ldap.INVALID_CREDENTIALS: # ty: ignore[unresolved-attribute]
return None
ldap_query_user = config.get_option(
"authentication.ldap_query_user", default=None
)
ldap_query_password = config.get_option(
"authentication.ldap_query_password", default=None
)
if ldap_query_user is not None:
conn.unbind_s()
try:
conn = ldap.initialize(ldap_host)
except ldap.LDAPError as err: # ty: ignore[unresolved-attribute]
raise AuthenticationError("failed to connect to ldap server") from err
try:
conn.simple_bind_s(ldap_query_user, ldap_query_password)
except ldap.INVALID_CREDENTIALS as err: # ty: ignore[unresolved-attribute]
raise AuthenticationError(
"failed to bind to LDAP server for user query"
) from err
ldap_query_base = config.get_option("authentication.ldap_query_base")
ldap_query_filter = str(config.get_option("authentication.ldap_query_filter"))
ldap_query_uid = config.get_option(
"authentication.ldap_query_uid", default="uid"
)
ldap_query_mail = config.get_option(
"authentication.ldap_query_mail", default="mail"
)
results = conn.search_s(
ldap_query_base,
ldap.SCOPE_SUBTREE, # ty: ignore[unresolved-attribute]
ldap_query_filter.format(username=username),
)
try:
user = results[0][1][ldap_query_uid][0].decode()
mail = results[0][1][ldap_query_mail][0].decode()
return User(user, mail)
except Exception as err:
raise AuthenticationError("failed to find user in LDAP query") from err