1. Crear la conexión
Veamos como se crean los elementos para conectar con la base de datos con SQLAlchemy:
Aquí hemos complicado un poco las cosas pues para cadas sesión tenemos una conexión. Dejamos un máximo de 100 conexiones.
import json from typing import Dict from fastapi import Request from sqlalchemy import String, Integer, Boolean, Date, DateTime, Float, Table, create_engine, MetaData, select, inspect, text, asc, desc, insert, delete from sqlalchemy.orm import sessionmaker from sqlalchemy.engine import Engine import psycopg2 #DB vars sessions={} maxSessions=100 #connection = engine =sessionLocal = metadata = schema = None def getDBConnection(dbType:str, dbUser:str, dbPwd:str, dbHost:str, dbPort:int|str, dbName:str, dbSchema:str, sessionId:str): ''' Function to connect to the database and save the parameters Args: dbType (str): Type of database (e.g., "postgresql", "mysql", "sqlite") dbUser (str): Database username dbPwd (str): Database password dbHost (str): Database host dbPort (int|str): Database port dbName (str): Database name dbSchema (str): Database schema Returns: connection (Engine): Connection to the database engine (Engine): SQLAlchemy engine SessionLocal (Session): SQLAlchemy session metadata (MetaData): SQLAlchemy metadata ''' #global connection, engine, sessionLocal, metadata, schema global sessions if sessionId not in sessions: myDict={} # Create the connection string connection_string = f"{dbType}://{dbUser}:{dbPwd}@{dbHost}:{str(dbPort)}/{dbName}" # Create the SQLAlchemy engine myDict['engine'] = create_engine(connection_string) # Database configuration myDict['sessionLocal'] = sessionmaker(autocommit=False, autoflush=False, bind=myDict['engine']) #metadata = MetaData(bind=engine) # Reflect the database schema myDict['metadata'] = MetaData() myDict['metadata'].reflect(bind=myDict['engine']) #Connection myDict['connection']=myDict['engine'].connect() # Set the schema #connection.execute(text(f"SET search_path TO {dbSchema}")) myDict['schema']=dbSchema sessions[sessionId]=myDict #Not allowed more than maxSessions keys=list(sessions.keys()) if len(keys)>maxSessions: sessions[keys[0]]['connection'].close() del sessions[keys[0]]
2. Consultas con "Raw" SQL con SQLAlchemy
Parece ser que si se permite utilizar consultas SQL "a pelo" per se tienen estas restricciones:
- Los nombres de esquemas, tablas y campos deben ser estar en minúsculas.
- Las cláusulas LIKE y cadenas de caracteres a comparar deben ir en comillas simples.
- Hay que pasar la funcion "text" a la sentencia SQL
- Si queremos usar nombres de esquemas, tablas y campos en MAYÚSCULA, se deben entrecomillar entre comillas dobles
Veamos ejemplos de errores frecuentes:
A. Esquema: esq Tabla: tab Campo: camp
SELECT * FROM esq.tab WHERE camp LIKE "P%"
El error está en utilizar comillas dobles en el literal del LIKE "P%". La solución es:
SELECT * FROM esq.tab WHERE camp LIKE 'P%'
B. Esquema: ESQ Tabla: tab Campo: camp
SELECT * FROM esq.tab WHERE camp LIKE 'P%'
El error está en NO utilizar comillas dobles en el nombre del esquema que tiene letras mayúsculas. La solución es:
SELECT * FROM "ESQ".tab WHERE camp LIKE 'P%'
C. Esquema: ESQ Tabla: TAB Campo: camp
SELECT * FROM "ESQ".TAB WHERE camp LIKE 'P%'
El error está en NO utilizar comillas dobles en el nombre de la tabla que tiene letras mayúsculas. La solución es:
SELECT * FROM "ESQ"."TAB" WHERE camp LIKE 'P%'
D. Esquema: ESQ Tabla: TAB Campo: CAMP
SELECT * FROM "ESQ"."TAB" WHERE CAMP LIKE 'P%'
El error está en NO utilizar comillas dobles en el nombre del campo que tiene letras mayúsculas. La solución es:
SELECT * FROM "ESQ"."TAB" WHERE "CAMP" LIKE 'P%'
Veamos ahora como ejecutar estas consultas SQL en SQLAlchemy:
Supongamos que ya tenemos la variable session creada, y vamos a realizar una búsqueda, a la que le pasamos la cláusula WHERE de la sentencia SQL. Para construir la sentencia completa hay que unir 'SELECT * FROM "esquema"."tabla" WHERE ' a las condiciones que recibimos en un string dentro del diccionario "values" y su clave es "cerca_expressio" que puede tener este valor por ejemplo ' "DESCRIPCION" LIKE 'P%' "
Observar que se utiliza el objeto "engine" para realizar la consulta
def cercar(sessionId:str, values:dict): """ Endpoint to fetch rows for the ag-Grid - Supports pagination, sorting, and filtering. """ tableName=sessions[sessionId]['dbTable'] schema=sessions[sessionId]['schema'] fullTableName=getFullTableName(schema, tableName) table=Table(tableName, sessions[sessionId]['metadata'], autoload_with=sessions[sessionId]['engine'], schema=sessions[sessionId]['schema']) columns=[col.name for col in table.columns] sqlWhere=values.get('cerca_expressio','') sqlSentence="SELECT * FROM " + fullTableName + " WHERE " +sqlWhere try: with sessions[sessionId]['engine' ].connect() as connection: result = connection.execute(text(sqlSentence)) rows=result.fetchall() data=[] for row in rows: data.append(tuple2Dict(row, table, columns)) return json.dumps({"message": "Query executed successfully", 'id_btn_clicked':'cercar','data': data}) except Exception as e: return json.dumps({"message": "Record query failed: "+ ', '.join(e.args),'id_btn_clicked':'Error'}) def getFullTableName(schema:str, tableName:str):
'''Une con un punto esquema y tabla entrecomillado con comillas dobles''' return '"'+schema+'"."'+tableName+'"'
3. Modificaciones
En este caso utilizamos la session y no el engine. Le pasamos un diccionario de valores a modificar, pero tenemos que descartar aquellos que no forman parte de la tabla. Para ello hemos creado una función que solo toma los valores para modificar en el registro.
def modificar(sessionId:str, values:dict): tableName=sessions[sessionId]['dbTable'] row=None try: table=Table(tableName, sessions[sessionId]['metadata'], autoload_with=sessions[sessionId]['engine'], schema=sessions[sessionId]['schema']) session=sessions[sessionId]['sessionLocal']() fieldDict=getTableFieldValues(sessionId, tableName,values) # Update the record result = session.query(table).filter(table.columns.id == values.get('id',-1)).update(fieldDict) print("result=",result) session.commit() row = session.query(table).filter_by(id=values.get("id",-1)).first() rowDict=tuple2Dict(row, table, None) except Exception as e: session.rollback() return json.dumps({"message": "Record update failed: "+ ', '.join(e.args),'id_btn_clicked':'Error'}) finally: session.close() return json.dumps({"message": "Record updated successfully", 'id_btn_clicked':values.get("id_btn_clicked",''),'row': rowDict}) '''Filtramos los campos de la tabla''' def getTableFieldValues(sessionId: int | str, tableName: str, values:dict): columns, fkColDict, colNames, colQuotes=getTableInfo(tableName, sessionId) fieldDict={ key:values[key] for key in colNames if key in values} return fieldDict
4. Altas
Aquí utilizamos "engine" y le pasamos un diccionario de los campos y sus valores al igual que en las modificaciones
def alta(sessionId:str, values:dict): tableName=sessions[sessionId]['dbTable'] session=sessions[sessionId]['sessionLocal']() row=None try: table=Table(tableName, sessions[sessionId]['metadata'], autoload_with=sessions[sessionId]['engine'], schema=sessions[sessionId]['schema']) fieldDict=getTableFieldValues(sessionId,tableName, values) # New record # Create a new record using the dictionary stmt = insert(table).values(fieldDict) # Execute the statement with sessions[sessionId]['engine' ].connect() as connection: result = connection.execute(stmt) newId=result.inserted_primary_key[0] print ("newId",newId) query = select(table).where(table.c.id == newId) row = connection.execute(query).fetchone() rowDict=tuple2Dict(row, table, None) connection.commit() except Exception as e: connection.rollback() return json.dumps({"message": "Record addition failed: "+ ', '.join(e.args),'id_btn_clicked':'Error'}) finally: connection.close() return json.dumps({"message": "Record added successfully", 'id_btn_clicked':values.get("id_btn_clicked",''),'row': rowDict})
5. Bajas
El proceso es parecido a los anteriores,
def borrar(sessionId:str, values:dict): tableName=sessions[sessionId]['dbTable'] session=sessions[sessionId]['sessionLocal']() row=None try: table=Table(tableName, sessions[sessionId]['metadata'], autoload_with=sessions[sessionId]['engine'], schema=sessions[sessionId]['schema']) # Delete row with conditions stmt = delete(table).where(table.columns.id == values.get('id',-100)) # Execute the statement with sessions[sessionId]['engine' ].connect() as connection: result = connection.execute(stmt) connection.commit() except Exception as e: connection.rollback() return json.dumps({"message": "Record deletion failed: "+ ', '.join(e.args),'id_btn_clicked':'Error'}) finally: connection.close() return json.dumps({"message": "Record deleted successfully", 'id_btn_clicked':values.get("id_btn_clicked",''),'id': values.get('id',-100)})
6. Obtener información de la tabla
Para ello tenemos estas funciones para obtener los metadatos de una tabla
def getForeignKeysAndReferences(tableName: str, sessionId:str): """ Detect foreign keys and referenced tables for a given table. """ global sessions inspector = inspect(sessions[sessionId]['engine']) foreignKeys = inspector.get_foreign_keys(table_name= tableName, schema=sessions[sessionId]['schema']) fkColDict = {} for fk in foreignKeys: fkColDict[fk['constrained_columns']]=fk return fkColDict def getTableInfo(tableName: str, sessionId:str): """ Endpoint to dynamically generate a form with foreign key detection and field types. """ global sessions table = Table(tableName, sessions[sessionId]['metadata'], autoload_with=sessions[sessionId]['engine'], schema=sessions[sessionId]['schema']) fkColDict = getForeignKeysAndReferences(tableName,sessionId) # Detect column names and types columns = [{"name": col.name, "type": sql2HtmlType(col.type), "value":get_default(col), "width":get_length(col)} for col in table.columns ] colNames=[col.name for col in table.columns ] colQuotes= {col.name : sqlQuote(col.type) for col in table.columns } return columns, fkColDict, colNames, colQuotes
No hay comentarios :
Publicar un comentario