martes, 14 de enero de 2025

Nginx (IV) Autenticación LDAP(III). Programa python para autenticación LDAP

 1. Introducción

Tenemos que creare un programa para que verifique las credenciales de usuario y contraseña sobre el servidor LDAP

También debemos ponerlo en un servidor http para recibir dichos parámetros y devolver la autorización

2. Programa de autenticación LDAP (xmldap.py)

Veamos el código fuente:

La primera línea es de (Shebang #!) que indica el ejecutor del programa. Si en una ventana de comandos ejecutamos xmldap.py, ya sabe que python lo deber ejecutar, que es el de la ruda indicada en el shebang, que es la del entorno virtual que tiene todos los módulos necesarios instalados. (Pero en este caso no lo vamos a ehjecutar con el shebang, sino el fichero python del siguiente apartado)

El módulo que nos da toda la gracia de acceso a LDAP es ldap3 que hay que instalarlo con "pip install ldap3" desde el entorno virtual

La función getProps se encarga de leer el fichero de propiedades para aceder al fichero de parámetros del servidor LDAP

La función autheticateByLogin se encarga de acceder al servidor LDAP y autenticar el usuario con su contraseña.

Las demás funciones son para obtener información del sistem LDAP

#!/home/eduard/MyPython/05.fasthtml/venv_fasthtml/bin/python3

import pprint
from ldap3 import Server, Connection, ALL, ALL_ATTRIBUTES ,SAFE_SYNC, NTLM, ALL_ATTRIBUTES
import sys
import os
#------Imprescindible para poder importar de otras carpetas (de basicutils)
import sys
from pathlib import Path
#from ox import xmoxfrmdict2fasthtml as xmoxfrmdict2fasthtml
path_root1 = Path(__file__).parents[1] # Damos un salto de directorio hacia arriba-> ximoutilsmod
sys.path.append(str(path_root1))
path_root0 = Path(__file__).parents[0] # El mismo directorio
sys.path.append(str(path_root0))
from basicutils import xmcrypto, xmfile
# ------Fin imprescindible


def getProps():
	logs=[]
	logs.append('env='+str(os.environ))
	logs.append('os.getenv("CONF_PYTHON")='+str(os.getenv("CONF_PYTHON")))
	xmfile.addLinesToFile('/home/eduard/kk.log', logs)
	confFile=os.getenv("CONF_PYTHON")+'ldap.encrypted.yml'
	config= xmcrypto.getPropertiesFromFile(confFile)
	return config['ldapWindows']

'''Autheticate by login name and password'''
def autheticateByLogin(usename: str, password: str)->bool:
	config=getProps()
	try:
		server = Server(config['serverName'], get_info=ALL)
		myUser=config['userPrefix']+usename
		conn = Connection(server, user=myUser, password=password)
		if conn.bind():
			return True
		else:
			return False
	except Exception as e:
		print(f"LDAP error: {e}", file=sys.stderr)
		if 'conn' in locals():
			conn.unbind()
		return False
	
def getConn()->Connection:
	conn=None; config=None; baseDN=None
	try: 
		config=getProps()
		baseDN=config['dn'][config['dn'].find("DC="):]
		server = Server(config['serverName'], get_info=ALL)
		conn = Connection(server, user=config['dn'], password=config['password'], auto_bind=True)
		return conn, config, baseDN
	except Exception as e:
		print(f"LDAP error: {e}", file=sys.stderr)
		if 'conn' in locals():
			conn.unbind()
		return conn, config, baseDN
	
'''Get all login names from LDAP'''
def getAllLogins()->list:
	try:
		conn, config, baseDN=getConn()
		#conn.search(baseDN, '(&(objectclass=user) (sAMAccountName=*))',attributes=ALL_ATTRIBUTES)
		conn.search(baseDN, '(&(objectclass=person) (sAMAccountName=*))',attributes=['sAMAccountName'])
		entries=conn.entries
		return [str(e['sAMAccountName']) for e in entries]
		
	except Exception as e:
		print(f"LDAP error: {e}", file=sys.stderr)
		if 'conn' in locals():
			conn.unbind()
		return []

'''Get all DNs from LDAP'''
def getAllDNs()->list:
	try:
		conn, config, baseDN=getConn()
		conn.search(baseDN, '(&(objectclass=user) (sAMAccountName=*))',attributes=[])
		#conn.search(baseDN, '(&(objectclass=person) (sAMAccountName=*))',attributes=['DN'])
		entries=conn.entries
		#pprint.pprint(entries)
		print(str(entries[0]))
		return [str(e)[:str(e).find(" - STATUS:")].replace('DN: ','')for e in entries]
		
		
	except Exception as e:
		print(f"LDAP error: {e}", file=sys.stderr)
		if 'conn' in locals():
			conn.unbind()
		return []


			
if __name__ == "__main__":
	if len(sys.argv) != 3:
		print("Usage: authenticate.py <username> <password>")
		sys.exit(1)

	username = sys.argv[1]
	password = sys.argv[2]
	#print(f"username: {username}, password: {password}")

	'''
	print(getAllDNs())
	print (getAllLogins())
	print(autheticateByLogin(username, password))
	print(autheticateByLogin(username, password+'a'))
	print(autheticateByLogin(username+'a', password))
	'''

	if autheticateByLogin(username, password):
		print("OK")
	else:
		print("FAIL")
	

3. Programa python en fastapi para que ejecute un servidor uvicorn que reciba los parámetros y autentique (xmopenresty.py)

Cabe destacar :

  • La línea que llama a FastAPI (app = FastAPI())
  • Las clases AuthRequest y AuthResponse que encapsulan la entrada y salida de datos
  • La ruta POST /auth que es la que se encarga de contestar las peticiones de autenticación
  • El arranque del servidor uvicorn en l IP 127.0.0.1 y el puerto 5000

Veamos el código:

#!/home/eduard/MyPython/05.fasthtml/venv_fasthtml/bin/python3

# auth_service.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import uvicorn

import os
import sys
import base64

#------Imprescindible para poder importar de otras carpetas (de basicutils)
import sys
from pathlib import Path
#from ox import xmoxfrmdict2fasthtml as xmoxfrmdict2fasthtml
path_root1 = Path(__file__).parents[1] # Damos un salto de directorio hacia arriba-> ximoutilsmod
sys.path.append(str(path_root1))
path_root0 = Path(__file__).parents[0] # El mismo directorio
sys.path.append(str(path_root0))
from basicutils import xmldap
# ------Fin imprescindible

app = FastAPI()

class AuthRequest(BaseModel):
    username: str
    password: str

class AuthResponse(BaseModel):
    authenticated: bool

def authenticate(username: str, password: str) -> bool:
    """
    Your existing authentication function.
    Replace this implementation with your actual authentication logic.
    """
    # Example implementation
    #if username == "admin" and password == "secret":
    #    return True
    #return False
    return xmldap.autheticateByLogin(username, password)

@app.post("/auth", response_model=AuthResponse)
async def auth(request: AuthRequest):
    is_valid = authenticate(request.username, request.password)
    return AuthResponse(authenticated=is_valid)

if __name__ == "__main__":
    uvicorn.run(app, host="127.0.0.1", port=5000)

Para ejecutarlo, hay que ir a un ventana de comandos y introducir

cd ruta-mdulos-python
./xmopenresty.py

Pues como está el shebang, ya sabe que python utilizará para su ejecución



No hay comentarios :

Publicar un comentario