1. Introducción
Tenemos que crear un programa para que verifique las credenciales de usuario y contraseña sobre el servidor LDAP
También debemos ponerlo en un servidor http para recibir dichos parámetros y devolver la autorización
2. Programa de autenticación LDAP (xmldap.py)
Veamos el código fuente:
La primera línea es el shebang (#!), que indica el intérprete que ejecutará el programa. Si en una ventana de comandos ejecutamos xmldap.py, el sistema ya sabe que lo debe ejecutar el python de la ruta indicada en el shebang, que es el del entorno virtual que tiene instalados todos los módulos necesarios. (Pero en este caso no lo vamos a ejecutar con el shebang, sino desde el fichero python del siguiente apartado)
El módulo que nos da toda la gracia de acceso a LDAP es ldap3 que hay que instalarlo con "pip install ldap3" desde el entorno virtual
La función get_props se encarga de leer el fichero de propiedades para acceder a los parámetros del servidor LDAP
La función autheticate_by_login se encarga de acceder al servidor LDAP y autenticar el usuario con su contraseña.
Las demás funciones son para obtener información del sistema LDAP.
Veamos el código de xmldap.py
#!/home/informatica/eduApps/softprop1/venv_softprop/bin/python3
import os
import pprint
import sys
from typing import Optional, Tuple

from ldap3 import (
    ALL,
    ALL_ATTRIBUTES,
    NTLM,
    SAFE_SYNC,
    SUBTREE,
    Connection,
    Server,
)
from ldap3.utils.conv import escape_filter_chars

# ------ Required to be able to import from other folders (basicutils)
from softprop.basicutils import xmcrypto, xmfile
from softprop import xmproject_folder
# ------ End of required imports

# File where get_all_logins() leaves its best-effort debugging dump.
_LOGINS_DUMP_PATH = os.path.join(os.sep, 'home', 'eduard', 'kk', 'ldap.txt')


def _safe_unbind(conn) -> None:
    """Unbind ``conn`` if there is one, never letting a cleanup error escape."""
    if conn is None:
        return
    try:
        conn.unbind()
    except Exception as e:
        print(f"LDAP unbind error : {e}", file=sys.stderr)


def _person_filter(username: str, user_attribute: str = "sAMAccountName") -> str:
    """Build the LDAP filter matching one person, escaping the user-supplied value."""
    # Escaping prevents LDAP filter injection through characters such as * ( ) \.
    return f'(&({user_attribute}={escape_filter_chars(username)})(objectClass=person))'


def _dump_entries(entries, path: str) -> None:
    """Write one line per entry to ``path``; a failure is reported, not raised."""
    try:
        with open(path, 'w', encoding='utf-8') as f:
            f.writelines(str(entry).replace("\n", ";") + "\n" for entry in entries)
    except OSError as e:
        print(f"LDAP dump skipped : {e}", file=sys.stderr)


def get_props() -> dict:
    """Return the ``ldapWindows`` section of the encrypted LDAP configuration file.

    Raises:
        KeyError: if the decrypted configuration has no ``ldapWindows`` section.
    """
    # The previous version appended the whole os.environ to a hard-coded log
    # file on every call; that debugging leftover leaked the process
    # environment to disk and has been removed.
    conf_file = (xmproject_folder.get_project_static_conf_params_encr_folder()
                 + os.sep + 'ldap.encrypted.yml')
    config = xmcrypto.get_properties_from_file(conf_file)
    return config['ldapWindows']


def autheticate_by_login(username: str, password: str) -> bool:
    """Authenticate a user against LDAP by login name and password.

    Args:
        username: Login name; the configured ``userPrefix`` is prepended to it.
        password: The user's password.

    Returns:
        True only when the LDAP bind succeeds. Empty credentials and any LDAP
        error yield False.
    """
    # Never send empty credentials: a simple bind without a password is an
    # unauthenticated bind and must not count as a successful login.
    if not username or not password:
        return False
    config = get_props()
    conn = None
    try:
        server = Server(config['serverName'], get_info=ALL)
        conn = Connection(server, user=config['userPrefix'] + username, password=password)
        return bool(conn.bind())
    except Exception as e:
        print(f"LDAP error1 : {e}", file=sys.stderr)
        return False
    finally:
        # Release the socket on every path (the success path used to leak it).
        _safe_unbind(conn)


def get_conn() -> Tuple[Optional[Connection], Optional[dict], Optional[str]]:
    """Open a bound connection with the service account from the configuration.

    Returns:
        A ``(conn, config, base_dn)`` tuple. ``conn`` is None when the
        connection could not be established; ``config`` and ``base_dn`` hold
        whatever was obtained before the failure.
    """
    conn = None
    config = None
    base_d_n = None
    try:
        config = get_props()
        service_dn = config['dn']
        dc_start = service_dn.find("DC=")
        # The base DN is the "DC=..." tail of the service account DN.
        base_d_n = service_dn[dc_start:] if dc_start >= 0 else service_dn
        server = Server(config['serverName'], get_info=ALL)
        conn = Connection(server, user=service_dn, password=config['password'], auto_bind=True)
    except Exception as e:
        print(f"LDAP error2 : {e}", file=sys.stderr)
        _safe_unbind(conn)
    return conn, config, base_d_n


def get_all_logins() -> list:
    """Return every person entry that has a ``sAMAccountName``, with all attributes.

    The entries are also dumped, one per line, to a debugging file; a failure
    to write that file does not affect the returned list.

    Returns:
        The ldap3 entries found, or an empty list on any LDAP error.
    """
    conn = None
    try:
        conn, _, base_d_n = get_conn()
        if conn is None:
            return []
        conn.search(base_d_n, '(&(objectclass=person) (sAMAccountName=*))',
                    attributes=ALL_ATTRIBUTES)
        entries = conn.entries
    except Exception as e:
        print(f"LDAP error3 : {e}", file=sys.stderr)
        return []
    finally:
        _safe_unbind(conn)
    _dump_entries(entries, _LOGINS_DUMP_PATH)
    return entries


def get_all_d_ns() -> list:
    """Return the DN of every user entry that has a ``sAMAccountName``.

    Returns:
        A list of DN strings, or an empty list on any LDAP error.
    """
    conn = None
    try:
        conn, _, base_d_n = get_conn()
        if conn is None:
            return []
        conn.search(base_d_n, '(&(objectclass=user) (sAMAccountName=*))', attributes=[])
        entries = conn.entries
        if entries:
            print(str(entries[0]))
        return [entry.entry_dn for entry in entries]
    except Exception as e:
        print(f"LDAP error4 : {e}", file=sys.stderr)
        return []
    finally:
        _safe_unbind(conn)


def get_all_entries() -> list:
    """Print and return every entry below the base DN.

    Returns:
        The ldap3 entries found, or an empty list on any LDAP error.
    """
    conn = None
    try:
        conn, _, base_d_n = get_conn()
        if conn is None:
            return []
        conn.search(search_base=base_d_n,
                    search_filter="(objectClass=*)",  # every entry
                    search_scope=SUBTREE,
                    attributes=None)
        entries = list(conn.entries)
        for entry in entries:
            print(entry)
        return entries
    except Exception as e:
        print(f"Error: {e}")
        return []
    finally:
        _safe_unbind(conn)


def exists_user(username: str) -> bool:
    """Tell whether a person with the given ``sAMAccountName`` exists in LDAP.

    Returns:
        True when at least one entry matches; False when none matches or the
        search fails.
    """
    conn = None
    try:
        conn, _, base_d_n = get_conn()
        if conn is None:
            return False
        conn.search(search_base=base_d_n,
                    search_filter=_person_filter(username),
                    search_scope=SUBTREE,
                    attributes=['sAMAccountName', 'cn', 'mail', 'ou'])
        if conn.entries:
            print('*****************************************************')
            print(str(conn.entries))
            return True
        return False
    except Exception as e:
        print("Search failed:", e)
        return False
    finally:
        _safe_unbind(conn)


def get_user_info(conn, base_d_n, username: str):
    """Return the first person entry for ``username`` with all its attributes.

    Args:
        conn: An open connection to reuse. When falsy, a temporary connection
            is opened (and closed again) just for this call.
        base_d_n: Search base; ignored when a temporary connection is opened.
        username: The ``sAMAccountName`` to look up.

    Returns:
        The first matching ldap3 entry, or None when nothing matches or no
        connection could be opened.
    """
    owns_connection = not conn
    if owns_connection:
        conn, _, base_d_n = get_conn()
        if conn is None:
            return None
    try:
        conn.search(search_base=base_d_n,
                    search_filter=_person_filter(username),
                    search_scope=SUBTREE,
                    attributes=ALL_ATTRIBUTES)
        return conn.entries[0] if conn.entries else None
    finally:
        # Only close what this call opened; a caller-provided connection stays open.
        if owns_connection:
            _safe_unbind(conn)


if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: xmldap.py <username> <password>")
        sys.exit(1)
    username = sys.argv[1]
    password = sys.argv[2]
    # Manual checks: swap in get_all_d_ns(), get_all_entries() or
    # autheticate_by_login(username, password) as needed.
    print("==========================================================")
    print()
    print(get_all_logins())
    print("==========================================================")
    print()
3. Programa para obtener el usuario del certificado y comprobar si está en la base de datos (xmusers_procs.py)
Veamos el código fuente, del que solo nos interesa la última función:
import asyncio
import sys
from datetime import datetime, timedelta, timezone

from sqlalchemy import and_, select

from softprop.models.xmusermodels import UserStore, UserAbst, UserStoreHst  # ,xmdb
from softprop.basicutils import xmdbv4, xmldap, xmother
from softprop.procedures import xmalbaopc_procs

# Fields compared by is_dif_user() to decide whether a stored user is stale.
_COMPARED_FIELDS = ('description', 'id_alba', 'nif', 'nom',
                    'cognom1', 'cognom2', 'puesto', 'activo')

# NOTE(review): hard-coded NIF -> LDAP login override carried over from the
# original code; it looks like a development shortcut and should probably
# live in configuration.
_LOGIN_OVERRIDES = {"73912286S": "eduard"}


def _flag_text(ldap_entry, attr: str) -> str:
    """Return the attribute value as lower-case text ('' when absent or None)."""
    return str(xmother.get_attr(ldap_entry, attr, '') or '').lower()


def _is_ldap_entry_active(ldap_entry) -> bool:
    """Decide whether an LDAP entry is an enabled account.

    The first attribute present in the entry decides, in this order:
    userAccountControl, accountStatus, nsAccountLock, loginDisabled,
    shadowExpire, pwdAccountLockedTime. With no entry the result is False;
    with an entry carrying none of those attributes it is True.
    """
    if ldap_entry is None:
        return False
    if 'userAccountControl' in ldap_entry:
        # Bit 0x2 of userAccountControl is the Active Directory
        # ACCOUNTDISABLE flag.
        uac = int(ldap_entry.userAccountControl.value)
        return not (uac & 2)
    if 'accountStatus' in ldap_entry:
        return _flag_text(ldap_entry, 'accountStatus') == 'active'
    if 'nsAccountLock' in ldap_entry:
        return _flag_text(ldap_entry, 'nsAccountLock') != 'true'
    if 'loginDisabled' in ldap_entry:
        return _flag_text(ldap_entry, 'loginDisabled') != 'true'
    if 'shadowExpire' in ldap_entry:
        # shadowExpire is treated as days since 1970-01-01; -1 means "never".
        expire_days = int(ldap_entry.shadowExpire.value)
        if expire_days == -1:
            return True
        # Both operands must be timezone-aware; comparing an aware "now" with
        # a naive epoch raises TypeError.
        expires_at = datetime(1970, 1, 1, tzinfo=timezone.utc) + timedelta(days=expire_days)
        return datetime.now(timezone.utc) < expires_at
    if 'pwdAccountLockedTime' in ldap_entry:
        return False
    return True  # default stance: active when no status attribute is present


def get_user(alba_dict: dict | None = None, ldap_entry=None) -> UserAbst:
    """Build a ``UserAbst`` from a Sedipualba user dict and/or an LDAP entry.

    Args:
        alba_dict: User data from Sedipualba; None is treated as an empty dict.
        ldap_entry: The matching ldap3 entry, or None when there is none.

    Returns:
        A new, unsaved ``UserAbst``.
    """
    alba = alba_dict or {}
    return UserAbst(
        description=xmother.get_attr_complex(alba, 'Descripcion', ldap_entry, 'cn', ''),
        id_alba=alba.get('Id', -1),
        nif=alba.get('Nif', ''),
        nom=alba.get('Nombre', ''),
        cognom1=alba.get('Apellido1', ''),
        cognom2=alba.get('Apellido2', ''),
        puesto=xmother.get_attr_complex(alba, 'Puesto', ldap_entry, 'sAMAccountName', ''),
        # str() tolerates a real boolean as well as the 'True'/'False' text.
        activo=str(alba.get('Activo', 'False')).lower() == 'true',
        # -- LDAP
        ldap_user=xmother.get_attr(ldap_entry, 'sAMAccountName', ''),
        ldap_cn=xmother.get_attr(ldap_entry, 'cn', ''),
        ldap_mail=xmother.get_attr(ldap_entry, 'mail', ''),
        ldap_activo=_is_ldap_entry_active(ldap_entry),
        ldap_user_control=xmother.get_attr(ldap_entry, 'userAccountControl', ''),
        # -- TOTP
    )


def is_dif_user(myuser: UserAbst, mynewuser: UserAbst) -> bool:
    """Return True when any Sedipualba-sourced field differs between the two users."""
    return any(getattr(myuser, field) != getattr(mynewuser, field)
               for field in _COMPARED_FIELDS)


async def save_alba_users():
    """Save Sedipualba's users to the database, inserting or updating as needed."""
    d_users = {}
    d_nodes = {}
    conn = None
    try:
        conn, _, base_d_n = xmldap.get_conn()
        async with xmdbv4.get_async_session(xmdbv4.my_main_db) as db_main:
            xmalbaopc_procs.alba_get_all_users_nifs(d_nodes=d_nodes, d_users=d_users)
            for nif, alba_user in d_users.items():
                id_user = _LOGIN_OVERRIDES.get(nif, alba_user.get("Puesto", ""))
                ldap_entry = xmldap.get_user_info(conn, base_d_n, id_user)
                result = await db_main.execute(select(UserStore).filter_by(nif=nif))
                stored_user = result.scalars().first()
                mynewuser = get_user(alba_user, ldap_entry)
                # Write only new users or users whose data actually changed.
                if not stored_user or is_dif_user(stored_user, mynewuser):
                    auser = await xmdbv4.merge_to_main_and_histo_async_newtrans(
                        xmdbv4.my_main_db, mynewuser, UserStore, UserStoreHst)
                    print(getattr(auser, "description", "No description!!!"))
    except Exception as e:
        print(f"Error getting users from Sediapualba: {e}")
    finally:
        if conn:
            conn.unbind()


async def save_ldap_users():
    """Save to the database the LDAP users that are not stored yet.

    Only entries whose DN contains "USUARIS" or "USERS" (case-insensitive)
    are considered.
    """
    try:
        entries = xmldap.get_all_logins()
        async with xmdbv4.get_async_session(xmdbv4.my_main_db) as db_main:
            for ldap_entry in entries:
                dn = getattr(ldap_entry, "entry_dn", "")
                print(dn)
                dn_upper = dn.upper()
                if "USUARIS" not in dn_upper and "USERS" not in dn_upper:
                    continue
                result = await db_main.execute(
                    select(UserStore).filter_by(ldap_user=ldap_entry.sAMAccountName.value)
                )
                if result.scalars().first():
                    continue
                mynewuser = get_user({}, ldap_entry)
                auser = await xmdbv4.merge_to_main_and_histo_async_newtrans(
                    xmdbv4.my_main_db, mynewuser, UserStore, UserStoreHst)
                print(getattr(auser, "description", "No description!!!"))
            await db_main.commit()
    except Exception as e:
        print(f"Error getting users from LDAP: {e}")


async def save_alba_ldap_users():
    """Save Sedipualba's users and then LDAP's users; return ``("OK", None)``."""
    await save_alba_users()
    await save_ldap_users()
    return "OK", None


def _nif_from_cert_dn(cert_dn: str) -> str:
    """Extract the NIF from a certificate DN ('' when it cannot be found).

    Tries ``serialNumber=<nif>,`` first and then ``- DNI <nif>``; the latter
    is accepted with or without a trailing comma.
    """
    if not cert_dn:
        return ''
    nif = xmother.get_substring_between(cert_dn, 'serialNumber=', ',').strip()
    if not nif:
        nif = xmother.get_substring_between(cert_dn, '- DNI ', ',').strip()
    if not nif and '- DNI ' in cert_dn:
        # The DNI is usually the last component of the CN, so there may be no
        # trailing comma to delimit it.
        nif = cert_dn.partition('- DNI ')[2].split(',')[0].strip()
    return nif


async def get_user_by_certificate_dn(cert_dn: str) -> UserAbst | None:
    """Get the LDAP-active user whose NIF matches the one in the certificate DN.

    Returns:
        The stored user, or None when the DN carries no NIF or no active user
        has it.
    """
    nif = _nif_from_cert_dn(cert_dn)
    if not nif:
        # Never query with an empty NIF: LDAP-only users are stored with
        # nif='' and would otherwise match any unparsable certificate.
        return None
    async with xmdbv4.get_async_session(xmdbv4.my_main_db) as db_main:
        result = await db_main.execute(
            select(UserStore).where(
                and_(UserStore.nif == nif, UserStore.ldap_activo == True)  # noqa: E712
            )
        )
        return result.scalars().first()


if __name__ == "__main__":
    asyncio.run(save_alba_ldap_users())
4. Programa de utilidades varias.(xmother.py)
Veamos el código fuente, del que solo nos interesa la función get_substring_between (la que utiliza get_user_by_certificate_dn):
import base64 import datetime import os import random import secrets import string from typing import Any from pathlib import Path #---General Parameters cookie_max_age=3600 # seconds def new_id(size=10): ''' Crea un id de 10 letras aleatorias en mayúscula''' return ''.join(random.choices(string.ascii_uppercase + string.digits, k=size)) def return_as_list(my_dict:dict)->list: if my_dict is None or len(my_dict)==0: return [] if not isinstance(my_dict, dict): return [my_dict] first_key = next(iter(my_dict)) first_value = my_dict[first_key] if first_value is None: return [] if isinstance(first_value, list): return first_value if isinstance(first_value, dict): return [first_value] def new_return(output_dict:dict| Any, params:list|str, as_list:bool=True): my_obj=None my_error=None if not isinstance(output_dict, dict): if as_list: output_dict=[output_dict] return output_dict, my_error else: if output_dict.get('Error',None)!=None: my_error=output_dict['Error'] if as_list: my_obj=[my_obj] return my_obj,my_error try: if isinstance(params,str): my_obj=output_dict[params] else: my_obj=[output_dict[param] for param in params] except Exception as e: my_error=str(e) if as_list: my_obj=[my_obj] return my_obj, my_error def json_str2json_bytearray(mystr:str)->str: # Hem de lleva les 2 primeres posicions (b') i l'última (') my_bytes=mystr[2:-1].encode() #Truco, sinó no va return my_bytes def arrange_base64(mystr:str)->str: '''Replaces any of the chars of "/+=" with "X" of a baswe64 string ''' toreplace="/+=" for mychar in toreplace: mystr=mystr.replace(mychar,"X") return mystr def get_old_random_string(mylength: int=32)->str: '''get a random string and removes "/+=" ''' my_random_string = base64.b64encode(os.urandom(mylength)).decode('utf-8') # return arrange_base64(my_random_string) def get_random_string(mylength: int=32)->str: '''get a random string and removes "/+=" ''' chars = string.ascii_letters + string.digits return ''.join(secrets.choice(chars) for _ in range(mylength)) def 
get_last_folder(mypath:str)->str: '''Returns the last folder of a path''' path_obj = Path(mypath).resolve() # Convert to absolute path if path_obj.is_dir(): return path_obj.name # Return the folder itself if it's a directory return path_obj.parent.name # Otherwise, return the parent folder of the file def get_tipo_doc_eni(doc_name:str)->str: #TD99: Otros (expedient eni) if ('es_l' in doc_name.lower() and 'exp_index' in doc_name.lower()): return 'TD02' #TD02: Acuerdo de pleno elif 'acta' in doc_name.lower() and 'sessi' in doc_name.lower(): return 'TD02' elif ('es_l' in doc_name.lower() and 'actes_ple' in doc_name.lower()): return 'TD02' #TD12: Diligencia elif 'dil' in doc_name.lower() and 'actes' in doc_name.lower(): return 'TD12' #TD01: Decreto elif 'decr' in doc_name.lower() or 'resol' in doc_name.lower(): return 'TD01' else: return 'TD99' def get_estado_elaboracion_eni(doc_name:str)->str: if doc_name.endswith('.pdf'): return 'EE01' elif doc_name.endswith('.xml'): if "_DOC_" in doc_name: return 'EE02' elif "_EXP_" in doc_name: return 'EE01' return 'EE04' def get_attr(obj, attr:str, default_value=None): '''Get the attribute of an object''' if obj is not None and len(attr)>0: if isinstance(obj, dict): if attr in obj: return obj[attr] elif hasattr(obj, attr): return getattr(obj, attr).value return default_value def get_attr_complex(obj, attr:str, obj1=None, attr1:str='', default_value=None)->str: '''Get nested alternative attributes form objects''' myvalue= get_attr(obj, attr) if not myvalue: myvalue= get_attr(obj1, attr1, default_value) return myvalue def get_substring_between(mystring: str, start: str, end: str) -> str: """Get the substring between two strings.""" try: print('string='+str(mystring)) print('start='+start) print('end='+end) start_index = mystring.index(start) + len(start) end_index = mystring.index(end, start_index) if end != -1: return mystring[start_index:end_index] else: return mystring[start:] # no end found except ValueError: return "" def 
get_totp_html(fh, has_totp:bool, image_path:str, is_error:bool=False)->str: '''Get the TOTP HTML code''' myimage='' myimage_label='' myerror='' # 3. Si no tiene TOTP mostramos el código QR para que el Authenticator if not has_totp: myimage=fh.Img(src=image_path, alt="TOTP QR Code", cls="img-fluid") myimage_label=fh.Label("Escanea el QR code con Authenticator", cls="form-label") if is_error: myerror=fh.Div("Error en el 2º Factor de autenticación", cls="alert alert-danger") return fh.Div( myerror, myimage_label, myimage, fh.Label("Introduzca el código TOTP del Authenticator", cls="form-label"), fh.Input(type="text", name="totp_digits", id="totp_digits", cls="form-control"), fh.Label(""), ) def get_yyyymmddhhmmss()->str: '''Get the current date and time in the format YYYYMMDDHHMMSS''' now_= datetime.datetime.now() return now_.strftime("%Y%m%d%H%M%S")+str(now_.microsecond)[:6] # Get the first 6 digits of the microseconds """ def get_project_path(): ''' Gets the path of the current Python project. Returns: str: The absolute path to the project directory. ''' #NO VA: path1=os.path.dirname(os.path.abspath(__file__)) #NO VA: path2=sys.path[0] #NO VA: path3=Path(__file__).parent.parent.absolute() path4=os.getcwdb().decode('utf-8') return path4 """ if __name__ == "__main__": a=get_substring_between('C=ES,O=EDIFICIO_LOCALIDAD,OU=CERTIFICADO ELECTRONICO DE PRUEBA,SN=MARTINEZ MARTINEZ,GN=MARTINETE,serialNumber=12345678A,CN=MARTINETE MARTINEZ MARTINEZ - DNI 12345678A', 'serialNumber=', ',') print (a)
5. Programa python en fastapi para que ejecute un servidor uvicorn que reciba los parámetros y autentique (xmopenresty.py)
Cabe destacar :
- La línea del shebang para que se ejecute desde el entorno virtual correcto
- Si no le indicamos el PROJECT_ROOT, ¡no podemos ejecutarlo con el shebang!
- La línea que llama a FastAPI (app = FastAPI())
- Las clases AuthRequest y AuthResponse que encapsulan la entrada y salida de datos
- La ruta POST /auth que es la que se encarga de contestar las peticiones de autenticación
- El arranque del servidor uvicorn en la IP 127.0.0.1 y el puerto 5000
Veamos el código de xmopenresty.py:
#!/home/informatica/eduApps/softprop/venv_softprop/bin/python3
import os
import sys

# --- Ensure project root is in sys.path when run directly ---
PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
if PROJECT_ROOT not in sys.path:
    sys.path.insert(0, PROJECT_ROOT)
print("PROJECT_ROOT =", PROJECT_ROOT)

# These imports must come after the sys.path fix above.
import uvicorn
from fastapi import FastAPI
from pydantic import BaseModel

from softprop.basicutils import xmldap
from softprop.procedures import xmusers_procs
from softprop import xmproject_folder
# ------ End of required imports

app = FastAPI()


class AuthRequest(BaseModel):
    """Body of the POST /auth request."""
    username: str
    password: str
    # cert: str
    cert_verify: str  # accepted when it contains "SUCCESS" (case-insensitive)
    cert_dn: str      # subject DN of the client certificate


class AuthResponse(BaseModel):
    """Body of the POST /auth response."""
    authenticated: bool


async def authenticate_db(cert_dn: str, username: str) -> bool:
    """Check that the certificate's owner is the user trying to log in.

    Looks up the database user by the NIF found in the certificate DN and
    compares that user's LDAP login with ``username``.

    Args:
        cert_dn: The DN of the client certificate.
        username: The login name being authenticated.

    Returns:
        True when a user is found for the certificate and its ``ldap_user``
        equals ``username``; False otherwise.
    """
    # get_user_by_certificate_dn is a coroutine function: it has to be
    # awaited, otherwise we would be inspecting a coroutine object instead
    # of the user.
    myuser = await xmusers_procs.get_user_by_certificate_dn(cert_dn)
    if myuser and myuser.ldap_user == username:
        print("authenticate_db: User authenticated successfully.")
        return True
    print("authenticate_db: NOT User authenticated.")
    return False


@app.post("/auth", response_model=AuthResponse)
async def auth(request: AuthRequest):
    """Authenticate a request forwarded by openresty (nginx).

    The user is authenticated only when the certificate verification
    succeeded, the username/password pair is valid in LDAP and the
    certificate belongs to that same user in the database.

    Args:
        request: Username, password and client certificate information.

    Returns:
        AuthResponse: whether the user is authenticated.
    """
    # Log the request without the password.
    print(f"auth: username={request.username!r} "
          f"cert_verify={request.cert_verify!r} cert_dn={request.cert_dn!r}")
    client_verify = 'SUCCESS' in request.cert_verify.upper()
    # Short-circuit: credentials are not sent to LDAP unless the certificate
    # check passed, and the database is not queried unless LDAP accepted them.
    is_valid_usr_pwd = (
        client_verify
        and xmldap.autheticate_by_login(request.username, request.password)
        and await authenticate_db(request.cert_dn, request.username)
    )
    return AuthResponse(authenticated=bool(is_valid_usr_pwd))


if __name__ == "__main__":
    cert_path = os.path.join(xmproject_folder.get_project_static_folder(),
                             'certs', 'wildcard.tavernes.es.')
    uvicorn.run(app, host="proves.tavernes.es", port=5000,
                ssl_keyfile=cert_path + "key", ssl_certfile=cert_path + "crt")
Para ejecutarlo, hay que ir a una ventana de comandos e introducir
cd /home/informatica/eduApps
./softprop/authentication/xmopenresty.py
# ------También se puede ejecutar sin el ./
# softprop/authentication/xmopenresty.py
Pues como está el shebang en la primera línea, ya sabe qué entorno virtual de python utilizará para su ejecución
No hay comentarios :
Publicar un comentario