1. Introducción
Tenemos que crear un programa para que verifique las credenciales de usuario y contraseña sobre el servidor LDAP
También debemos ponerlo en un servidor http para recibir dichos parámetros y devolver la autorización
2. Programa de autenticación LDAP (xmldap.py)
Veamos el código fuente:
La primera línea es de (Shebang #!) que indica el ejecutor del programa. Si en una ventana de comandos ejecutamos xmldap.py, ya sabe que python lo deber ejecutar, que es el de la ruta indicada en el shebang, que es la del entorno virtual que tiene todos los módulos necesarios instalados. (Pero en este caso no lo vamos a ejecutar con el shebang, sino el fichero python del siguiente apartado)
El módulo que nos da toda la gracia de acceso a LDAP es ldap3 que hay que instalarlo con "pip install ldap3" desde el entorno virtual
La función getProps se encarga de leer el fichero de propiedades para aceder al fichero de parámetros del servidor LDAP
La función autheticateByLogin se encarga de acceder al servidor LDAP y autenticar el usuario con su contraseña.
Las demás funciones son para obtener información del sistem LDAP.
Veamos el código de xmldap.py
#!/home/informatica/eduApps/softprop1/venv_softprop/bin/python3 import pprint from ldap3 import Server, Connection, ALL, SUBTREE, ALL_ATTRIBUTES ,SAFE_SYNC, NTLM, ALL_ATTRIBUTES import sys import os from typing import Tuple #------Imprescindible para poder importar de otras carpetas (de basicutils) import sys from pathlib import Path path_root1 = Path(__file__).parents[1] # Damos un salto de directorio hacia arriba-> ximoutilsmod sys.path.append(str(path_root1)) path_root0 = Path(__file__).parents[0] # El mismo directorio sys.path.append(str(path_root0)) from basicutils import xmcrypto, xmfile import xmproject_folder # ------Fin imprescindible def get_props() -> dict: '''Get the properties from the encrypted file''' logs=[] logs.append('env='+str(os.environ)) xmfile.add_lines_to_file('/home/eduard/kk.log', logs) conf_file=xmproject_folder.get_project_static_conf_params_encr_folder() + os.sep +'ldap.encrypted.yml' config= xmcrypto.get_properties_from_file(conf_file) return config['ldapWindows'] def autheticate_by_login(username: str, password: str)->bool: '''Autheticate by login name and password''' config=get_props() try: server = Server(config['serverName'], get_info=ALL) my_user=config['userPrefix']+username conn = Connection(server, user=my_user, password=password) if conn.bind(): #print('xmldap.autheticate_by_login: OK') return True else: #print('xmldap.autheticate_by_login: FALSE') return False except Exception as e: print(f"LDAP error1 : {e}", file=sys.stderr) if 'conn' in locals(): conn.unbind() return False def get_conn()-> Tuple[Connection, dict, str]: '''Get the connection, config and base DN Returns: ''' conn=None; config=None; base_d_n=None try: config=get_props() base_d_n=config['dn'][config['dn'].find("DC="):] server = Server(config['serverName'], get_info=ALL) conn = Connection(server, user=config['dn'], password=config['password'], auto_bind=True) return conn, config, base_d_n except Exception as e: print(f"LDAP error2 : {e}", file=sys.stderr) if 'conn' in locals(): conn.unbind() return conn, config, base_d_n '''Get all login names from LDAP''' def get_all_logins()->list: entries=None try: conn, config, base_d_n=get_conn() conn.search(base_d_n, '(&(objectclass=person) (sAMAccountName=*))', # COMMENT attributes=['sAMAccountName','cn','mail','ou','userAccountControl','accountStatus', # COMMENT 'nsAccountLock','loginDisabled', 'shadowExpire','pwdAccountLockedTime']) attributes=ALL_ATTRIBUTES) entries=conn.entries # Close the connection conn.unbind() with open('/home/eduard/kk/ldap.txt', 'w') as f: for e in entries: f.write(f"{str(e).replace("\n", ";")}\n") # COMMENT: return [str(e) for e in entries] return entries except Exception as e: print(f"LDAP error3 : {e}", file=sys.stderr) if 'conn' in locals(): conn.unbind() return [] '''Get all DNs from LDAP''' def get_all_d_ns()->list: try: conn, config, base_d_n=get_conn() conn.search(base_d_n, '(&(objectclass=user) (sAMAccountName=*))',attributes=[]) #conn.search(baseDN, '(&(objectclass=person) (sAMAccountName=*))',attributes=['DN']) entries=conn.entries #pprint.pprint(entries) print(str(entries[0])) # Close the connection conn.unbind() return [str(e)[:str(e).find(" - STATUS:")].replace('DN: ','')for e in entries] except Exception as e: print(f"LDAP error4 : {e}", file=sys.stderr) if 'conn' in locals(): conn.unbind() return [] '''Get all entries from LDAP''' def get_all_entries()->list: try: conn, _, base_d_n=get_conn() conn.search(search_base=base_d_n, search_filter="(objectClass=*)", # Search for all entries,attributes=[]) search_scope=SUBTREE, attributes=None) # Retrieve these attributes # Print the results for entry in conn.entries: print(entry) # Close the connection conn.unbind() except Exception as e: print(f"Error: {e}") # Close the connection conn.unbind() def exists_user(username:str)->bool: try: conn, _, base_d_n=get_conn() # Build the search filter (for example, using uid or sAMAccountName) user_attribute="sAMAccountName" search_filter = f'(&({user_attribute}={username})(objectClass=person))' conn.search(search_base=base_d_n, search_filter=search_filter, search_scope=SUBTREE, attributes=['sAMAccountName','cn','mail','ou']) if conn.entries: print('*****************************************************') print (str(conn.entries)) return True else: return False except Exception as e: print("Search failed:", e) return False def get_user_info(conn, base_d_n, username:str): if not conn: conn, _, base_d_n=get_conn() user_attribute="sAMAccountName" search_filter = f'(&({user_attribute}={username})(objectClass=person))' # COMMENT:conn.search(search_base=base_d_n, search_filter=search_filter, search_scope=SUBTREE, attributes=['sAMAccountName','cn','mail','ou']) conn.search(search_base=base_d_n, search_filter=search_filter, search_scope=SUBTREE, attributes=ALL_ATTRIBUTES) if conn.entries: return conn.entries[0] return None if __name__ == "__main__": if len(sys.argv) != 3: print("Usage: authenticate.py <username> <password>") sys.exit(1) username = sys.argv[1] password = sys.argv[2] #print(f"username: {username}, password: {password}") #print(get_all_d_ns()) print ("==========================================================") print() print (get_all_logins()) print ("==========================================================") print() # COMMENT get_all_entries() # COMMENT print(autheticate_by_login(username, password)) # COMMENT print(autheticateByLogin(username, password+'a')) # COMMENT print(autheticateByLogin(username+'a', password)) if autheticate_by_login(username, password): print("OK") else: print("FAIL") if exists_user(username): print("User exists") else: print("NOT a User")
3. Programa python en fastapi para que ejecute un servidor uvicorn que reciba los parámetros y autentique (xmopenresty.py)
Cabe destacar :
- La línea del shebang para que se ejecute desde el entorno virtual correcto
- La línea que llama a FastAPI (app = FastAPI())
- Las clases AuthRequest y AuthResponse que encapsulan la entrada y salida de datos
- La ruta POST /auth que es la que se encarga de contestar las peticiones de autenticación
- El arranque del servidor uvicorn en l IP 127.0.0.1 y el puerto 5000
Veamos el código de xmopenresty.py:
#!/home/informatica/eduApps/softprop1/venv_softprop/bin/python3# auth_service.py from fastapi import FastAPI, HTTPException from pydantic import BaseModel import uvicorn import os import sys import base64 #------Imprescindible para poder importar de otras carpetas (de basicutils) import sys from pathlib import Path #from ox import xmoxfrmdict2fasthtml as xmoxfrmdict2fasthtml path_root1 = Path(__file__).parents[1] # Damos un salto de directorio hacia arriba-> ximoutilsmod sys.path.append(str(path_root1)) path_root0 = Path(__file__).parents[0] # El mismo directorio sys.path.append(str(path_root0)) from basicutils import xmldap # ------Fin imprescindible app = FastAPI() class AuthRequest(BaseModel): username: str password: str class AuthResponse(BaseModel): authenticated: bool def authenticate(username: str, password: str) -> bool: """ Your existing authentication function. Replace this implementation with your actual authentication logic. """ # Example implementation #if username == "admin" and password == "secret": # return True #return False return xmldap.autheticateByLogin(username, password) @app.post("/auth", response_model=AuthResponse) async def auth(request: AuthRequest): is_valid = authenticate(request.username, request.password) return AuthResponse(authenticated=is_valid) if __name__ == "__main__": uvicorn.run(app, host="127.0.0.1", port=5000)
Para ejecutarlo, hay que ir a un ventana de comandos y introducir
cd ruta-mdulos-python
./xmopenresty.py
Pues como está el shebang de la primera línea, ya sabe que entorno vurtual de python utilizará para su ejecución
No hay comentarios :
Publicar un comentario