viernes, 27 de diciembre de 2024

Python (XVIII) sqlalchemy: CRUD. Problema con el "raw" SQL. Metadatos de la BD

1. Crear la conexión

Veamos como se crean los elementos para conectar con la base de datos con SQLAlchemy:

Aquí hemos complicado un poco las cosas pues para cadas sesión tenemos una conexión. Dejamos un máximo de 100 conexiones.

import json
from typing import Dict
from fastapi import Request
from sqlalchemy import String, Integer, Boolean, Date, DateTime, Float, Table, create_engine, MetaData, select, inspect, text, asc, desc, insert, delete
from sqlalchemy.orm import sessionmaker
from sqlalchemy.engine import Engine
import psycopg2

#DB vars
sessions={}
maxSessions=100
#connection = engine =sessionLocal = metadata = schema = None


def getDBConnection(dbType:str, dbUser:str, dbPwd:str, dbHost:str, dbPort:int|str, dbName:str, dbSchema:str, sessionId:str):
	''' Function to connect to the database and save the parameters 

	Args:
		dbType (str): Type of database (e.g., "postgresql", "mysql", "sqlite")
		dbUser (str): Database username
		dbPwd (str): Database password
		dbHost (str): Database host
		dbPort (int|str): Database port
		dbName (str): Database name
		dbSchema (str): Database schema
		
	Returns:	
		connection (Engine): Connection to the database
		engine (Engine): SQLAlchemy engine
		SessionLocal (Session): SQLAlchemy session	 
        metadata (MetaData): SQLAlchemy metadata       
	'''
	#global connection, engine, sessionLocal, metadata, schema
	global sessions
	if sessionId not in sessions:
		myDict={}
		
		# Create the connection string
		connection_string = f"{dbType}://{dbUser}:{dbPwd}@{dbHost}:{str(dbPort)}/{dbName}"

		# Create the SQLAlchemy engine
		myDict['engine'] = create_engine(connection_string)

		# Database configuration
		myDict['sessionLocal'] = sessionmaker(autocommit=False, autoflush=False, bind=myDict['engine'])
		#metadata = MetaData(bind=engine)
		# Reflect the database schema
		myDict['metadata'] = MetaData()
		myDict['metadata'].reflect(bind=myDict['engine'])

		#Connection
		myDict['connection']=myDict['engine'].connect()

		# Set the schema
		#connection.execute(text(f"SET search_path TO {dbSchema}"))
		myDict['schema']=dbSchema
		sessions[sessionId]=myDict
		#Not allowed more than maxSessions
		keys=list(sessions.keys())
		if len(keys)>maxSessions: 
			sessions[keys[0]]['connection'].close()	
			del sessions[keys[0]]

2. Consultas con "Raw" SQL con SQLAlchemy

Parece ser que si se permite utilizar consultas SQL "a pelo" per se tienen estas restricciones:

  1. Los nombres de esquemas, tablas y campos deben ser estar en minúsculas.
  2. Las cláusulas LIKE y cadenas de caracteres a comparar deben ir en comillas simples.
  3. Hay que pasar la funcion "text" a la sentencia SQL
  4. Si queremos usar nombres de esquemastablas y campos en MAYÚSCULA, se deben entrecomillar entre comillas dobles
Veamos ejemplos de errores frecuentes:

A. Esquema: esq  Tabla: tab Campo: camp 

SELECT * FROM esq.tab WHERE camp LIKE "P%"
El error está en utilizar comillas dobles en el literal del LIKE "P%". La solución es:
SELECT * FROM esq.tab WHERE camp LIKE 'P%'

B. Esquema: ESQ  Tabla: tab Campo: camp 

SELECT * FROM esq.tab WHERE camp LIKE 'P%'
El error está en NO utilizar comillas dobles en el nombre del esquema que tiene letras mayúsculas. La solución es:
SELECT * FROM "ESQ".tab WHERE camp LIKE 'P%'

C. Esquema: ESQ  Tabla: TAB Campo: camp 

SELECT * FROM "ESQ".TAB WHERE camp LIKE 'P%'
El error está en NO utilizar comillas dobles en el nombre de la tabla que tiene letras mayúsculas. La solución es:
SELECT * FROM "ESQ"."TAB" WHERE camp LIKE 'P%'

D. Esquema: ESQ  Tabla: TAB Campo: CAMP 

SELECT * FROM "ESQ"."TAB" WHERE CAMP LIKE 'P%'
El error está en NO utilizar comillas dobles en el nombre del campo que tiene letras mayúsculas. La solución es:
SELECT * FROM "ESQ"."TAB" WHERE "CAMP" LIKE 'P%'

Veamos ahora como ejecutar estas consultas SQL en SQLAlchemy:

Supongamos que ya tenemos la variable session creada, y vamos a realizar una búsqueda, a la que le pasamos la cláusula WHERE de la sentencia SQL. Para construir la sentencia completa hay que unir 'SELECT * FROM "esquema"."tabla" WHERE ' a las condiciones que recibimos en un string dentro del diccionario "values" y su clave es "cerca_expressio" que  puede tener este valor por ejemplo ' "DESCRIPCION" LIKE 'P%' "

Observar que se utiliza el objeto "engine" para realizar la consulta

def cercar(sessionId:str, values:dict):
    """
    Endpoint to fetch rows for the ag-Grid 
    - Supports pagination, sorting, and filtering.
    """
	tableName=sessions[sessionId]['dbTable']
	schema=sessions[sessionId]['schema']
	fullTableName=getFullTableName(schema, tableName)
	table=Table(tableName, sessions[sessionId]['metadata'], autoload_with=sessions[sessionId]['engine'], schema=sessions[sessionId]['schema'])
	columns=[col.name for col in table.columns]
	sqlWhere=values.get('cerca_expressio','')
	sqlSentence="SELECT * FROM " + fullTableName + " WHERE " +sqlWhere
	try:
		with sessions[sessionId]['engine' ].connect() as connection:
			result = connection.execute(text(sqlSentence))
			rows=result.fetchall()
			data=[]
			for row in rows: data.append(tuple2Dict(row, table, columns))
			return json.dumps({"message": "Query executed successfully", 'id_btn_clicked':'cercar','data': data})	
	except Exception as e:
		return json.dumps({"message": "Record query failed: "+ ', '.join(e.args),'id_btn_clicked':'Error'})

def getFullTableName(schema:str, tableName:str):
        '''Une con un punto esquema y tabla entrecomillado con comillas dobles'''
	return '"'+schema+'"."'+tableName+'"'


3. Modificaciones

En este caso utilizamos la session y no el engine. Le pasamos un diccionario de valores a modificar, pero tenemos que descartar aquellos que no forman parte de la tabla. Para ello hemos creado una función que solo toma los valores para modificar en el registro.

def modificar(sessionId:str, values:dict):
	tableName=sessions[sessionId]['dbTable']
	row=None
	try: 
		table=Table(tableName, sessions[sessionId]['metadata'], autoload_with=sessions[sessionId]['engine'], schema=sessions[sessionId]['schema'])
		session=sessions[sessionId]['sessionLocal']()
		fieldDict=getTableFieldValues(sessionId, tableName,values)

		# Update the record
		result = session.query(table).filter(table.columns.id == values.get('id',-1)).update(fieldDict)
		print("result=",result)
		session.commit()
  
		row = session.query(table).filter_by(id=values.get("id",-1)).first()
		rowDict=tuple2Dict(row, table, None)
	except Exception as e:
		session.rollback()
		return json.dumps({"message": "Record update failed: "+ ', '.join(e.args),'id_btn_clicked':'Error'})
	finally:
		session.close()
	return json.dumps({"message": "Record updated successfully", 'id_btn_clicked':values.get("id_btn_clicked",''),'row': rowDict})

'''Filtramos los campos de la tabla'''
def getTableFieldValues(sessionId: int | str, tableName: str, values:dict):
	columns, fkColDict, colNames, colQuotes=getTableInfo(tableName, sessionId)	
	fieldDict={ key:values[key] for key in colNames if key in values}
	return fieldDict


4. Altas


Aquí utilizamos "engine" y le pasamos un diccionario de los campos y sus valores al igual que en las modificaciones

def alta(sessionId:str, values:dict):
	tableName=sessions[sessionId]['dbTable']
	session=sessions[sessionId]['sessionLocal']()
	row=None
	try:
		table=Table(tableName, sessions[sessionId]['metadata'], autoload_with=sessions[sessionId]['engine'], schema=sessions[sessionId]['schema'])
		fieldDict=getTableFieldValues(sessionId,tableName, values)

		# New record
		# Create a new record using the dictionary
		stmt = insert(table).values(fieldDict)

		# Execute the statement
		with sessions[sessionId]['engine' ].connect() as connection:
			result = connection.execute(stmt)
			newId=result.inserted_primary_key[0]
			print ("newId",newId)
			query = select(table).where(table.c.id == newId)
			row = connection.execute(query).fetchone()
			rowDict=tuple2Dict(row, table, None)
			connection.commit()
	except Exception as e:
		connection.rollback()
		return json.dumps({"message": "Record addition failed: "+ ', '.join(e.args),'id_btn_clicked':'Error'})
	finally:
		connection.close()
	return json.dumps({"message": "Record added successfully", 'id_btn_clicked':values.get("id_btn_clicked",''),'row': rowDict})


5. Bajas


El proceso es parecido a los anteriores,

def borrar(sessionId:str, values:dict):
	tableName=sessions[sessionId]['dbTable']
	session=sessions[sessionId]['sessionLocal']()
	row=None
	try:
		table=Table(tableName, sessions[sessionId]['metadata'], autoload_with=sessions[sessionId]['engine'], schema=sessions[sessionId]['schema'])
		# Delete row with conditions
		stmt = delete(table).where(table.columns.id == values.get('id',-100))

		# Execute the statement
		with sessions[sessionId]['engine' ].connect() as connection:
			result = connection.execute(stmt)
			connection.commit() 
	except Exception as e:
		connection.rollback()
		return json.dumps({"message": "Record deletion failed: "+ ', '.join(e.args),'id_btn_clicked':'Error'})
	finally:
		connection.close()
	return json.dumps({"message": "Record deleted successfully", 'id_btn_clicked':values.get("id_btn_clicked",''),'id': values.get('id',-100)})


6. Obtener información de la tabla


Para ello tenemos estas funciones para obtener los metadatos de una tabla

def getForeignKeysAndReferences(tableName: str, sessionId:str):
	""" Detect foreign keys and referenced tables for a given table. """
	global sessions
	inspector = inspect(sessions[sessionId]['engine'])
	foreignKeys = inspector.get_foreign_keys(table_name= tableName, schema=sessions[sessionId]['schema'])
	fkColDict = {}
	for fk in foreignKeys: fkColDict[fk['constrained_columns']]=fk
	return fkColDict

def getTableInfo(tableName: str, sessionId:str):
	""" Endpoint to dynamically generate a form with foreign key detection and field types. """
	global sessions
	table = Table(tableName, sessions[sessionId]['metadata'], autoload_with=sessions[sessionId]['engine'], schema=sessions[sessionId]['schema'])
	fkColDict = getForeignKeysAndReferences(tableName,sessionId)
	
	# Detect column names and types
	columns = [{"name": col.name, "type": sql2HtmlType(col.type), "value":get_default(col), "width":get_length(col)} for col in table.columns ]
	colNames=[col.name for col in table.columns ]
	colQuotes= {col.name : sqlQuote(col.type) for col in table.columns }
	return columns, fkColDict, colNames, colQuotes




No hay comentarios :

Publicar un comentario