jueves, 25 de septiembre de 2025

WEBPROPv2 (VIII). Instalación en un servidor con nginx (II). Instalar el servicio de consulta al LDAP

 

1. Introducción

Tenemos que crear un programa para que verifique las credenciales de usuario y contraseña sobre el servidor LDAP

También debemos ponerlo en un servidor http para recibir dichos parámetros y devolver la autorización

2. Programa de autenticación LDAP (xmldap.py)

Veamos el código fuente:

La primera línea es de (Shebang #!) que indica el ejecutor del programa. Si en una ventana de comandos ejecutamos xmldap.py, ya sabe que python lo deber ejecutar, que es el de la ruta indicada en el shebang, que es la del entorno virtual que tiene todos los módulos necesarios instalados. (Pero en este caso no lo vamos a ejecutar con el shebang, sino el fichero python del siguiente apartado)

El módulo que nos da toda la gracia de acceso a LDAP es ldap3 que hay que instalarlo con "pip install ldap3" desde el entorno virtual

La función getProps se encarga de leer el fichero de propiedades para aceder al fichero de parámetros del servidor LDAP

La función autheticateByLogin se encarga de acceder al servidor LDAP y autenticar el usuario con su contraseña.

Las demás funciones son para obtener información del sistem LDAP. 

Veamos el código de xmldap.py

#!/home/informatica/eduApps/softprop1/venv_softprop/bin/python3

import pprint
from ldap3 import Server, Connection, ALL, SUBTREE, ALL_ATTRIBUTES ,SAFE_SYNC, NTLM, ALL_ATTRIBUTES
import os, sys
from typing import Tuple
#------Imprescindible para poder importar de otras carpetas (de basicutils)
from softprop.basicutils import xmcrypto, xmfile
from softprop import xmproject_folder
# ------Fin imprescindible


def get_props() -> dict:
	'''Get the properties from the encrypted file'''
	logs=[]
	logs.append('env='+str(os.environ))
	xmfile.add_lines_to_file('/home/eduard/kk.log', logs)
	conf_file=xmproject_folder.get_project_static_conf_params_encr_folder() + os.sep +'ldap.encrypted.yml'
	config= xmcrypto.get_properties_from_file(conf_file)
	return config['ldapWindows']

def autheticate_by_login(username: str, password: str)->bool:
	'''Autheticate by login name and password'''
	config=get_props()
	try:
		server = Server(config['serverName'], get_info=ALL)
		my_user=config['userPrefix']+username
		conn = Connection(server, user=my_user, password=password)
		if conn.bind():
			#print('xmldap.autheticate_by_login: OK')
			return True
		else:
			#print('xmldap.autheticate_by_login: FALSE')
			return False
	except Exception as e:
		print(f"LDAP error1 : {e}", file=sys.stderr)
		if 'conn' in locals():
			conn.unbind()
		return False
	
def get_conn()-> Tuple[Connection, dict, str]:
	'''Get the connection, config and base DN
	Returns:
		'''
	conn=None; config=None; base_d_n=None
	try: 
		config=get_props()
		base_d_n=config['dn'][config['dn'].find("DC="):]
		server = Server(config['serverName'], get_info=ALL)
		conn = Connection(server, user=config['dn'], password=config['password'], auto_bind=True)
		return conn, config, base_d_n
	except Exception as e:
		print(f"LDAP error2 : {e}", file=sys.stderr)
		if 'conn' in locals():
			conn.unbind()
		return conn, config, base_d_n
	
'''Get all login names from LDAP'''
def get_all_logins()->list:
	entries=None
	try:
		conn, config, base_d_n=get_conn()
		conn.search(base_d_n, '(&(objectclass=person) (sAMAccountName=*))',
			# COMMENT attributes=['sAMAccountName','cn','mail','ou','userAccountControl','accountStatus',
			# COMMENT 'nsAccountLock','loginDisabled', 'shadowExpire','pwdAccountLockedTime'])
			attributes=ALL_ATTRIBUTES)
		entries=conn.entries
		# Close the connection
		conn.unbind()
		#with open('/home/eduard/kk/ldap.txt', 'w') as f:
		with open(os.path.join('home','eduard','kk','ldap.txt', 'w')) as f:
			for e in entries:
				f.write(f"{str(e).replace("\n", ";")}\n")
		# COMMENT: return [str(e) for e in entries]
		return entries
		
	except Exception as e:
		print(f"LDAP error3 : {e}", file=sys.stderr)
		if 'conn' in locals():
			conn.unbind()
		return []

'''Get all DNs from LDAP'''
def get_all_d_ns()->list:
	try:
		conn, config, base_d_n=get_conn()
		conn.search(base_d_n, '(&(objectclass=user) (sAMAccountName=*))',attributes=[])
		#conn.search(baseDN, '(&(objectclass=person) (sAMAccountName=*))',attributes=['DN'])
		entries=conn.entries
		#pprint.pprint(entries)
		print(str(entries[0]))
		# Close the connection
		conn.unbind()
		return [str(e)[:str(e).find(" - STATUS:")].replace('DN: ','')for e in entries]
		
		
	except Exception as e:
		print(f"LDAP error4 : {e}", file=sys.stderr)
		if 'conn' in locals():
			conn.unbind()
		return []

'''Get all entries from LDAP'''
def get_all_entries()->list:
	try:
		conn, _, base_d_n=get_conn()
		conn.search(search_base=base_d_n, 
			search_filter="(objectClass=*)",  # Search for all entries,attributes=[])
			search_scope=SUBTREE, 
			attributes=None)  # Retrieve these attributes

			# Print the results
		for entry in conn.entries:
			print(entry)

		# Close the connection
		conn.unbind()

	except Exception as e:
		print(f"Error: {e}")
		# Close the connection
		conn.unbind()


def exists_user(username:str)->bool:
	try:
		conn, _, base_d_n=get_conn()
		# Build the search filter (for example, using uid or sAMAccountName)
		user_attribute="sAMAccountName"	
		search_filter = f'(&({user_attribute}={username})(objectClass=person))'
		
		conn.search(search_base=base_d_n, search_filter=search_filter, search_scope=SUBTREE, attributes=['sAMAccountName','cn','mail','ou'])
		
		if conn.entries:
			print('*****************************************************')
			print (str(conn.entries))
			return True
		else:
			return False
	except Exception as e:
		print("Search failed:", e)
		return False

def get_user_info(conn, base_d_n, username:str):
	if not conn:
		conn, _, base_d_n=get_conn()
	
	user_attribute="sAMAccountName"	
	search_filter = f'(&({user_attribute}={username})(objectClass=person))'
	# COMMENT:conn.search(search_base=base_d_n, search_filter=search_filter, search_scope=SUBTREE, attributes=['sAMAccountName','cn','mail','ou'])	
	conn.search(search_base=base_d_n, search_filter=search_filter, search_scope=SUBTREE, attributes=ALL_ATTRIBUTES)
	if conn.entries: return conn.entries[0]
	return None
			
if __name__ == "__main__":
	if len(sys.argv) != 3:
		print("Usage: authenticate.py <username> <password>")
		sys.exit(1)

	username = sys.argv[1]
	password = sys.argv[2]
	#print(f"username: {username}, password: {password}")

	#print(get_all_d_ns())
	print ("==========================================================")
	print()
	print (get_all_logins())
	print ("==========================================================")
	print()
	# COMMENT get_all_entries()
	# COMMENT print(autheticate_by_login(username, password))
	# COMMENT print(autheticateByLogin(username, password+'a'))
	# COMMENT print(autheticateByLogin(username+'a', password))
	

3. Programa para obtener el usuario del certificado y com`provar si está enb la base de datos.(xmusers_procs.py)

Veamos el código fuente, del que solo nos interesda la última función:

from datetime import timezone
from sqlalchemy import select, and_
import asyncio
import sys

from softprop.models.xmusermodels import UserStore, UserAbst, UserStoreHst #,xmdb
from softprop.basicutils import xmdbv4, xmldap, xmother
from softprop.procedures import xmalbaopc_procs


def get_user(alba_dict:dict | None= None, ldap_entry=None)->UserAbst:
	'''Set the user from Sediapualba or LDAP'''
		
	# Check if the user is active in LDAP
	is_active=False
	if ldap_entry is not None:
		if 'userAccountControl' in ldap_entry:
			uac = int(ldap_entry.userAccountControl.value)
			is_active = not (uac & 2)
		# OpenLDAP-style check
		elif 'accountStatus' in ldap_entry :
			is_active = xmother.get_attr(ldap_entry,'accountStatus','').lower() == 'active'
		elif 'nsAccountLock' in ldap_entry:
			is_active = xmother.get_attr(ldap_entry,'nsAccountLock','').lower() != 'true'
		elif 'loginDisabled' in ldap_entry:
			is_active = xmother.get_attr(ldap_entry,'loginDisabled','').lower() != 'true'
		elif 'shadowExpire' in ldap_entry:
			from datetime import datetime, timedelta
			# 'shadowExpire' is typically days since Jan 1, 1970
			expire_days = int(ldap_entry.shadowExpire.value)
			if expire_days == -1:
				is_active = True
			else:
				is_active = (datetime.now(timezone.utc) < datetime(1970,1,1) + timedelta(days=expire_days))
		elif 'pwdAccountLockedTime' in ldap_entry:
			is_active = False
		else:
			# Default fallback
			is_active = True  # or False, depending on your default stance

	myuser=UserAbst(description=xmother.get_attr_complex(alba_dict,'Descripcion',ldap_entry,'cn',''),
		id_alba=alba_dict.get('Id',-1),
		nif=alba_dict.get('Nif',''),
		nom=alba_dict.get('Nombre',''),
		cognom1 = alba_dict.get('Apellido1',''),
		cognom2 = alba_dict.get('Apellido2',''),
		puesto = xmother.get_attr_complex(alba_dict,'Puesto',ldap_entry,'sAMAccountName',''), 
		activo = alba_dict.get('Activo','False').lower()=='true',
		#--LDAP
		ldap_user=xmother.get_attr(ldap_entry,'sAMAccountName',''),
		ldap_cn=xmother.get_attr(ldap_entry,'cn', ''),
		ldap_mail=xmother.get_attr(ldap_entry,'mail',''),
		ldap_activo=is_active,
		ldap_user_control=xmother.get_attr(ldap_entry,'userAccountControl',''),
		#--TOTP

	)	
	return myuser

def is_dif_user(myuser:UserAbst, mynewuser:UserAbst)->bool:
	'''Check if the user is different from the new user'''
	return any([
        myuser.description != mynewuser.description,
        myuser.id_alba != mynewuser.id_alba,
        myuser.nif != mynewuser.nif,
        myuser.nom != mynewuser.nom,
        myuser.cognom1 != mynewuser.cognom1,
        myuser.cognom2 != mynewuser.cognom2,
        myuser.puesto != mynewuser.puesto,
        myuser.activo != mynewuser.activo,
    ])

async def save_alba_users():
    """Save Sediapualba's users to the database"""
    d_users = {}
    d_nodes = {}
    conn = None
    try:
        conn, _, base_d_n = xmldap.get_conn()
        async with xmdbv4.get_async_session(xmdbv4.my_main_db) as db_main:
            xmalbaopc_procs.alba_get_all_users_nifs(d_nodes=d_nodes, d_users=d_users)

            for key, value in d_users.items():
                operation = "update"
                id_user = value.get("Puesto", "")
                if key == "73912286S":
                    id_user = "eduard"
                ldap_entry = xmldap.get_user_info(conn, base_d_n, id_user)

                result = await db_main.execute(select(UserStore).filter_by(nif=key))
                myuser = result.scalars().first()
                if not myuser:
                    myuser = UserStore()
                    operation = "insert"

                mynewuser = get_user(value, ldap_entry)
                has_changes = is_dif_user(myuser, mynewuser)

                if operation == "insert" or (operation == "update" and has_changes):
                    auser = await xmdbv4.merge_to_main_and_histo_async_newtrans(xmdbv4.my_main_db, mynewuser, UserStore, UserStoreHst)
                    print(getattr(auser, "description", "No description!!!"))

            #await db_main.commit()
            
    except Exception as e:
        print(f"Error getting users from Sediapualba: {e}")
    finally:
        if conn:
            conn.unbind()
    

async def save_ldap_users():
    """Save LDAP's users to the database"""
    try:
        entries = xmldap.get_all_logins()
        async with xmdbv4.get_async_session(xmdbv4.my_main_db) as db_main:
            for ldap_entry in entries:
                dn = getattr(ldap_entry, "entry_dn", "")
                print(dn)
                if "USUARIS" in dn.upper() or "USERS" in dn.upper():
                    result = await db_main.execute(
                        select(UserStore).filter_by(ldap_user=ldap_entry.sAMAccountName.value)
                    )
                    myuser = result.scalars().first()
                    if not myuser:
                        mynewuser = get_user({}, ldap_entry)
                        auser = await xmdbv4.merge_to_main_and_histo_async_newtrans(xmdbv4.my_main_db, mynewuser, UserStore, UserStoreHst)
                        print(getattr(auser, "description", "No description!!!"))
            await db_main.commit()
    except Exception as e:
        print(f"Error getting users from LDAP: {e}")
			
async def save_alba_ldap_users():
    """Save Sediapualba's users and LDAP's users to the database"""
    await save_alba_users()
    await save_ldap_users()
    return "OK", None

async def get_user_by_certificate_dn(cert_dn:str)->UserAbst | None:
	'''Get the user in the database by the DNI in the certificate DN'''
	nif=xmother.get_substring_between(cert_dn,'serialNumber=',',').strip()
	if nif=='': nif=xmother.get_substring_between(cert_dn,'- DNI ',',').strip()	

	# Search in the database
	async with xmdbv4.get_async_session(xmdbv4.my_main_db) as db_main:
		result = await db_main.execute(
			select(UserStore).where(and_(UserStore.nif == nif, UserStore.ldap_activo == True))
		)
		return result.scalars().first()


if __name__ == "__main__":
	asyncio.run(save_alba_ldap_users())
		

4. Programa de utilidades varias.(xmother.py)

Veamos el código fuente, del que solo nos interesda  función:

import base64
import datetime
import os
import random
import secrets
import string
from typing import Any
from pathlib import Path

#---General Parameters
cookie_max_age=3600 # seconds

def new_id(size=10):
    ''' Crea un id de 10 letras aleatorias en mayúscula'''
    return ''.join(random.choices(string.ascii_uppercase + string.digits, k=size))


def return_as_list(my_dict:dict)->list:
	if my_dict is None or len(my_dict)==0: return []
	if not isinstance(my_dict, dict): return [my_dict]
	first_key = next(iter(my_dict))
	first_value = my_dict[first_key]
	if first_value is None: return []
	if isinstance(first_value, list): return first_value
	if isinstance(first_value, dict): return [first_value]
      
def new_return(output_dict:dict| Any, params:list|str, as_list:bool=True):
	my_obj=None
	my_error=None
	if not isinstance(output_dict, dict):
		if as_list: output_dict=[output_dict]	
		return output_dict, my_error
	else:
		if output_dict.get('Error',None)!=None:
			my_error=output_dict['Error']
			if as_list: my_obj=[my_obj]	
			return my_obj,my_error
	try:
		if isinstance(params,str):	my_obj=output_dict[params]
		else: my_obj=[output_dict[param] for param in params]	
	except Exception as e:
		my_error=str(e)
	if as_list: my_obj=[my_obj]	
	return my_obj, my_error	  

def json_str2json_bytearray(mystr:str)->str:
	# Hem de lleva les 2 primeres posicions (b') i l'última (')
	my_bytes=mystr[2:-1].encode() #Truco, sinó no va
	return my_bytes

def arrange_base64(mystr:str)->str:
	'''Replaces any of the chars of "/+=" with "X" of a baswe64 string '''
	toreplace="/+="
	for mychar in toreplace: mystr=mystr.replace(mychar,"X")
	return mystr

def get_old_random_string(mylength: int=32)->str:
	'''get a random string and removes "/+=" '''
	my_random_string = base64.b64encode(os.urandom(mylength)).decode('utf-8')  #
	return arrange_base64(my_random_string)

def get_random_string(mylength: int=32)->str:
	'''get a random string and removes "/+=" '''
	chars = string.ascii_letters + string.digits
	return ''.join(secrets.choice(chars) for _ in range(mylength))

def get_last_folder(mypath:str)->str:
	'''Returns the last folder of a path'''
	path_obj = Path(mypath).resolve()  # Convert to absolute path

	if path_obj.is_dir():
		return path_obj.name  # Return the folder itself if it's a directory

	return path_obj.parent.name  # Otherwise, return the parent folder of the file

def get_tipo_doc_eni(doc_name:str)->str:
	#TD99: Otros (expedient eni)
	if ('es_l' in doc_name.lower() and 'exp_index' in doc_name.lower()): return 'TD02'
	#TD02: Acuerdo de pleno
	elif 'acta' in doc_name.lower() and 'sessi' in doc_name.lower(): return 'TD02'
	elif ('es_l' in doc_name.lower() and 'actes_ple' in doc_name.lower()): return 'TD02'
	#TD12: Diligencia
	elif 'dil' in doc_name.lower() and 'actes' in doc_name.lower(): return 'TD12'
	#TD01: Decreto
	elif 'decr' in doc_name.lower() or 'resol' in doc_name.lower(): return 'TD01'
	else: return 'TD99'
	

def get_estado_elaboracion_eni(doc_name:str)->str:
	if doc_name.endswith('.pdf'): return 'EE01'
	elif doc_name.endswith('.xml'): 
		if "_DOC_" in doc_name: return 'EE02'
		elif "_EXP_" in doc_name: return 'EE01'
	return 'EE04'

def get_attr(obj, attr:str, default_value=None):
	'''Get the attribute of an object'''
	if obj is not None and len(attr)>0: 
		if isinstance(obj, dict): 
			if attr in obj: return obj[attr]
		elif hasattr(obj, attr): return getattr(obj, attr).value
	return default_value	
		
def get_attr_complex(obj, attr:str, obj1=None, attr1:str='', default_value=None)->str:
	'''Get nested alternative attributes form objects'''
	myvalue= get_attr(obj, attr)
	if not myvalue: myvalue= get_attr(obj1, attr1, default_value)
	return myvalue

def get_substring_between(mystring: str, start: str, end: str) -> str:
	"""Get the substring between two strings."""
	try:
		print('string='+str(mystring))
		print('start='+start)
		print('end='+end)
		start_index = mystring.index(start) + len(start)
		end_index = mystring.index(end, start_index)
		if end != -1: return mystring[start_index:end_index]
		else: return mystring[start:]  # no end found
	except ValueError:
		return ""
	
def get_totp_html(fh, has_totp:bool, image_path:str, is_error:bool=False)->str:
	'''Get the TOTP HTML code'''
	myimage=''
	myimage_label=''	
	myerror=''
	# 3. Si no tiene TOTP mostramos el código QR para que el Authenticator
	if not has_totp:
		myimage=fh.Img(src=image_path, alt="TOTP QR Code", cls="img-fluid")
		myimage_label=fh.Label("Escanea el QR code con Authenticator", cls="form-label")
	if is_error:
		myerror=fh.Div("Error en el 2º Factor de autenticación", cls="alert alert-danger")
	return fh.Div(
		myerror,
		myimage_label,
		myimage,
		fh.Label("Introduzca el código TOTP del Authenticator", cls="form-label"),
		fh.Input(type="text", name="totp_digits", id="totp_digits", cls="form-control"),
		fh.Label(""),
	)

def get_yyyymmddhhmmss()->str:
	'''Get the current date and time in the format YYYYMMDDHHMMSS'''
	now_= datetime.datetime.now()
	return now_.strftime("%Y%m%d%H%M%S")+str(now_.microsecond)[:6] # Get the first 6 digits of the microseconds
"""
def get_project_path():
  '''
  Gets the path of the current Python project.

  Returns:
    str: The absolute path to the project directory.
  '''
  #NO VA: path1=os.path.dirname(os.path.abspath(__file__))
  #NO VA: path2=sys.path[0]
  #NO VA: path3=Path(__file__).parent.parent.absolute()
  path4=os.getcwdb().decode('utf-8')
  return path4

"""

if __name__ == "__main__":
	a=get_substring_between('C=ES,O=EDIFICIO_LOCALIDAD,OU=CERTIFICADO ELECTRONICO DE PRUEBA,SN=MARTINEZ MARTINEZ,GN=MARTINETE,serialNumber=12345678A,CN=MARTINETE MARTINEZ MARTINEZ - DNI 12345678A', 'serialNumber=', ',')
	print (a)


5. Programa python en fastapi para que ejecute un servidor uvicorn que reciba los parámetros y autentique (xmopenresty.py)

Cabe destacar :

  • La línea del shebang para que se ejecute desde el entorno virtual correcto
  • Si no le indcamos el PROJECT_ROOT, n podemos ejecutar con shebang!!
  • La línea que llama a FastAPI (app = FastAPI())
  • Las clases AuthRequest y AuthResponse que encapsulan la entrada y salida de datos
  • La ruta POST /auth que es la que se encarga de contestar las peticiones de autenticación
  • El arranque del servidor uvicorn en l IP 127.0.0.1 y el puerto 5000

Veamos el código de xmopenresty.py:

#!/home/informatica/eduApps/softprop/venv_softprop/bin/python3

import sys
import os

# --- Ensure project root is in sys.path when run directly ---
PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
if PROJECT_ROOT not in sys.path:
    sys.path.insert(0, PROJECT_ROOT)

print("PROJECT_ROOT =", PROJECT_ROOT)
from softprop.basicutils import xmldap
from softprop.procedures import xmusers_procs
from softprop import xmproject_folder  # <-- Add this import

# ------Fin imprescindible


app = FastAPI()

class AuthRequest(BaseModel):
    username: str
    password: str
    # cert :str
    cert_verify : str
    cert_dn : str  

class AuthResponse(BaseModel):
    authenticated: bool



def authenticate_db(cert_dn:str, username: str) -> bool:
	''' Authenticate the user using the certificate DN and username.
		This function checks if the user exists in the database with the DNI of the certificate DN.
		And tests if the user is the same as the one in the DB with the DNI
		Args:
			cert_dn (str): The DN of the certificate.
			username (str): The username to authenticate.
		Returns:
			bool: True if the user is authenticated, False otherwise.
	'''
	myuser=xmusers_procs.get_user_by_certificate_dn(cert_dn)
	if myuser and myuser.ldap_user == username:
		print("authenticate_db: User authenticated successfully.")
		return True
	
	print("authenticate_db: NOT User authenticated.")
	return False
				


@app.post("/auth", response_model=AuthResponse)
async def auth(request: AuthRequest):
	''' This function is only used from openresty (nginx)
		Verify that the cert is OK and that the user and password exists in LDAP
		Args:
			request (AuthRequest): The authentication request containing username, password, and certificate information.
		Returns:
			AuthResponse: The authentication response indicating whether the user is authenticated or not.
	'''
	print(request)
	client_verify = 'SUCCESS' in request.cert_verify.upper() 
	is_authenticated_ldap= xmldap.autheticate_by_login(request.username, request.password)  
	is_authenticated_db= authenticate_db(request.cert_dn, request.username)
	is_valid_usr_pwd = client_verify and is_authenticated_ldap and is_authenticated_db
	        
	return AuthResponse(authenticated=is_valid_usr_pwd)


if __name__ == "__main__":
	cert_path=os.path.join(xmproject_folder.get_project_static_folder(), 'certs','wildcard.tavernes.es.')
    	uvicorn.run(app, host="proves.tavernes.es", port=5000,	
		ssl_keyfile =cert_path+"key", 
		ssl_certfile=cert_path+"crt")

Para ejecutarlo, hay que ir a un ventana de comandos y introducir

cd /home/informatica/eduApps
./softprop/authentication/ xmopenresty.py
# ------También se puede ejecutar sin el ./
#  softprop/authentication/ xmopenresty.py 

Pues como está el shebang de la primera línea, ya sabe que entorno vurtual de python utilizará para su ejecución


No hay comentarios :

Publicar un comentario