jueves, 25 de septiembre de 2025

WEBPROPv2 (VIII). Instalación en un servidor con nginx (II). Instalar el servicio de consulta al LDAP

 

1. Introducción

Tenemos que crear un programa para que verifique las credenciales de usuario y contraseña sobre el servidor LDAP

También debemos ponerlo en un servidor http para recibir dichos parámetros y devolver la autorización

2. Programa de autenticación LDAP (xmldap.py)

Veamos el código fuente:

La primera línea es de (Shebang #!) que indica el ejecutor del programa. Si en una ventana de comandos ejecutamos xmldap.py, ya sabe que python lo deber ejecutar, que es el de la ruta indicada en el shebang, que es la del entorno virtual que tiene todos los módulos necesarios instalados. (Pero en este caso no lo vamos a ejecutar con el shebang, sino el fichero python del siguiente apartado)

El módulo que nos da toda la gracia de acceso a LDAP es ldap3 que hay que instalarlo con "pip install ldap3" desde el entorno virtual

La función getProps se encarga de leer el fichero de propiedades para aceder al fichero de parámetros del servidor LDAP

La función autheticateByLogin se encarga de acceder al servidor LDAP y autenticar el usuario con su contraseña.

Las demás funciones son para obtener información del sistem LDAP. 

Veamos el código de xmldap.py

#!/home/informatica/eduApps/softprop1/venv_softprop/bin/python3

import pprint
from ldap3 import Server, Connection, ALL, SUBTREE, ALL_ATTRIBUTES ,SAFE_SYNC, NTLM, ALL_ATTRIBUTES
import sys
import os
from typing import Tuple
#------Imprescindible para poder importar de otras carpetas (de basicutils)
import sys
from pathlib import Path
path_root1 = Path(__file__).parents[1] # Damos un salto de directorio hacia arriba-> ximoutilsmod
sys.path.append(str(path_root1))
path_root0 = Path(__file__).parents[0] # El mismo directorio
sys.path.append(str(path_root0))
from basicutils import xmcrypto, xmfile
import xmproject_folder
# ------Fin imprescindible


def get_props() -> dict:
	'''Get the properties from the encrypted file'''
	logs=[]
	logs.append('env='+str(os.environ))
	xmfile.add_lines_to_file('/home/eduard/kk.log', logs)
	conf_file=xmproject_folder.get_project_static_conf_params_encr_folder() + os.sep +'ldap.encrypted.yml'
	config= xmcrypto.get_properties_from_file(conf_file)
	return config['ldapWindows']

def autheticate_by_login(username: str, password: str)->bool:
	'''Autheticate by login name and password'''
	config=get_props()
	try:
		server = Server(config['serverName'], get_info=ALL)
		my_user=config['userPrefix']+username
		conn = Connection(server, user=my_user, password=password)
		if conn.bind():
			#print('xmldap.autheticate_by_login: OK')
			return True
		else:
			#print('xmldap.autheticate_by_login: FALSE')
			return False
	except Exception as e:
		print(f"LDAP error1 : {e}", file=sys.stderr)
		if 'conn' in locals():
			conn.unbind()
		return False
	
def get_conn()-> Tuple[Connection, dict, str]:
	'''Get the connection, config and base DN
	Returns:
		'''
	conn=None; config=None; base_d_n=None
	try: 
		config=get_props()
		base_d_n=config['dn'][config['dn'].find("DC="):]
		server = Server(config['serverName'], get_info=ALL)
		conn = Connection(server, user=config['dn'], password=config['password'], auto_bind=True)
		return conn, config, base_d_n
	except Exception as e:
		print(f"LDAP error2 : {e}", file=sys.stderr)
		if 'conn' in locals():
			conn.unbind()
		return conn, config, base_d_n
	
'''Get all login names from LDAP'''
def get_all_logins()->list:
	entries=None
	try:
		conn, config, base_d_n=get_conn()
		conn.search(base_d_n, '(&(objectclass=person) (sAMAccountName=*))',
			# COMMENT attributes=['sAMAccountName','cn','mail','ou','userAccountControl','accountStatus',
			# COMMENT 'nsAccountLock','loginDisabled', 'shadowExpire','pwdAccountLockedTime'])
			attributes=ALL_ATTRIBUTES)
		entries=conn.entries
		# Close the connection
		conn.unbind()
		with open('/home/eduard/kk/ldap.txt', 'w') as f:
			for e in entries:
				f.write(f"{str(e).replace("\n", ";")}\n")
		# COMMENT: return [str(e) for e in entries]
		return entries
		
	except Exception as e:
		print(f"LDAP error3 : {e}", file=sys.stderr)
		if 'conn' in locals():
			conn.unbind()
		return []

'''Get all DNs from LDAP'''
def get_all_d_ns()->list:
	try:
		conn, config, base_d_n=get_conn()
		conn.search(base_d_n, '(&(objectclass=user) (sAMAccountName=*))',attributes=[])
		#conn.search(baseDN, '(&(objectclass=person) (sAMAccountName=*))',attributes=['DN'])
		entries=conn.entries
		#pprint.pprint(entries)
		print(str(entries[0]))
		# Close the connection
		conn.unbind()
		return [str(e)[:str(e).find(" - STATUS:")].replace('DN: ','')for e in entries]
		
		
	except Exception as e:
		print(f"LDAP error4 : {e}", file=sys.stderr)
		if 'conn' in locals():
			conn.unbind()
		return []

'''Get all entries from LDAP'''
def get_all_entries()->list:
	try:
		conn, _, base_d_n=get_conn()
		conn.search(search_base=base_d_n, 
			search_filter="(objectClass=*)",  # Search for all entries,attributes=[])
			search_scope=SUBTREE, 
			attributes=None)  # Retrieve these attributes

			# Print the results
		for entry in conn.entries:
			print(entry)

		# Close the connection
		conn.unbind()

	except Exception as e:
		print(f"Error: {e}")
		# Close the connection
		conn.unbind()


def exists_user(username:str)->bool:
	try:
		conn, _, base_d_n=get_conn()
		# Build the search filter (for example, using uid or sAMAccountName)
		user_attribute="sAMAccountName"	
		search_filter = f'(&({user_attribute}={username})(objectClass=person))'
		
		conn.search(search_base=base_d_n, search_filter=search_filter, search_scope=SUBTREE, attributes=['sAMAccountName','cn','mail','ou'])
		
		if conn.entries:
			print('*****************************************************')
			print (str(conn.entries))
			return True
		else:
			return False
	except Exception as e:
		print("Search failed:", e)
		return False

def get_user_info(conn, base_d_n, username:str):
	if not conn:
		conn, _, base_d_n=get_conn()
	
	user_attribute="sAMAccountName"	
	search_filter = f'(&({user_attribute}={username})(objectClass=person))'
	# COMMENT:conn.search(search_base=base_d_n, search_filter=search_filter, search_scope=SUBTREE, attributes=['sAMAccountName','cn','mail','ou'])	
	conn.search(search_base=base_d_n, search_filter=search_filter, search_scope=SUBTREE, attributes=ALL_ATTRIBUTES)
	if conn.entries: return conn.entries[0]
	return None
			
if __name__ == "__main__":
	if len(sys.argv) != 3:
		print("Usage: authenticate.py <username> <password>")
		sys.exit(1)

	username = sys.argv[1]
	password = sys.argv[2]
	#print(f"username: {username}, password: {password}")

	#print(get_all_d_ns())
	print ("==========================================================")
	print()
	print (get_all_logins())
	print ("==========================================================")
	print()
	# COMMENT get_all_entries()
	# COMMENT print(autheticate_by_login(username, password))
	# COMMENT print(autheticateByLogin(username, password+'a'))
	# COMMENT print(autheticateByLogin(username+'a', password))
	

	if autheticate_by_login(username, password):
		print("OK")
	else:
		print("FAIL")

	if exists_user(username):
		print("User exists")
	else:
		print("NOT a User")	
	

3. Programa python en fastapi para que ejecute un servidor uvicorn que reciba los parámetros y autentique (xmopenresty.py)

Cabe destacar :

  • La línea del shebang para que se ejecute desde el entorno virtual correcto
  • La línea que llama a FastAPI (app = FastAPI())
  • Las clases AuthRequest y AuthResponse que encapsulan la entrada y salida de datos
  • La ruta POST /auth que es la que se encarga de contestar las peticiones de autenticación
  • El arranque del servidor uvicorn en l IP 127.0.0.1 y el puerto 5000

Veamos el código de xmopenresty.py:

#!/home/informatica/eduApps/softprop1/venv_softprop/bin/python3

# auth_service.py from fastapi import FastAPI, HTTPException from pydantic import BaseModel import uvicorn import os import sys import base64 #------Imprescindible para poder importar de otras carpetas (de basicutils) import sys from pathlib import Path #from ox import xmoxfrmdict2fasthtml as xmoxfrmdict2fasthtml path_root1 = Path(__file__).parents[1] # Damos un salto de directorio hacia arriba-> ximoutilsmod sys.path.append(str(path_root1)) path_root0 = Path(__file__).parents[0] # El mismo directorio sys.path.append(str(path_root0)) from basicutils import xmldap # ------Fin imprescindible app = FastAPI() class AuthRequest(BaseModel): username: str password: str class AuthResponse(BaseModel): authenticated: bool def authenticate(username: str, password: str) -> bool: """ Your existing authentication function. Replace this implementation with your actual authentication logic. """ # Example implementation #if username == "admin" and password == "secret": # return True #return False return xmldap.autheticateByLogin(username, password) @app.post("/auth", response_model=AuthResponse) async def auth(request: AuthRequest): is_valid = authenticate(request.username, request.password) return AuthResponse(authenticated=is_valid) if __name__ == "__main__": uvicorn.run(app, host="127.0.0.1", port=5000)

Para ejecutarlo, hay que ir a un ventana de comandos y introducir

cd ruta-mdulos-python
./xmopenresty.py

Pues como está el shebang de la primera línea, ya sabe que entorno vurtual de python utilizará para su ejecución


No hay comentarios :

Publicar un comentario