1. Crear la conexión
Veamos como se crean los elementos para conectar con la base de datos con SQLAlchemy:
Aquí hemos complicado un poco las cosas pues para cadas sesión tenemos una conexión. Dejamos un máximo de 100 conexiones.
import json from typing import Dict from fastapi import Request from sqlalchemy import String, Integer, Boolean, Date, DateTime, Float, Table, create_engine, MetaData, select, inspect, text, asc, desc, insert, delete from sqlalchemy.orm import sessionmaker from sqlalchemy.engine import Engine import psycopg2 #DB vars sessions={} maxSessions=100 #connection = engine =sessionLocal = metadata = schema = None def getDBConnection(dbType:str, dbUser:str, dbPwd:str, dbHost:str, dbPort:int|str, dbName:str, dbSchema:str, sessionId:str): ''' Function to connect to the database and save the parameters Args: dbType (str): Type of database (e.g., "postgresql", "mysql", "sqlite") dbUser (str): Database username dbPwd (str): Database password dbHost (str): Database host dbPort (int|str): Database port dbName (str): Database name dbSchema (str): Database schema Returns: connection (Engine): Connection to the database engine (Engine): SQLAlchemy engine SessionLocal (Session): SQLAlchemy session metadata (MetaData): SQLAlchemy metadata ''' #global connection, engine, sessionLocal, metadata, schema global sessions if sessionId not in sessions: myDict={} # Create the connection string connection_string = f"{dbType}://{dbUser}:{dbPwd}@{dbHost}:{str(dbPort)}/{dbName}" # Create the SQLAlchemy engine myDict['engine'] = create_engine(connection_string) # Database configuration myDict['sessionLocal'] = sessionmaker(autocommit=False, autoflush=False, bind=myDict['engine']) #metadata = MetaData(bind=engine) # Reflect the database schema myDict['metadata'] = MetaData() myDict['metadata'].reflect(bind=myDict['engine']) #Connection myDict['connection']=myDict['engine'].connect() # Set the schema #connection.execute(text(f"SET search_path TO {dbSchema}")) myDict['schema']=dbSchema sessions[sessionId]=myDict #Not allowed more than maxSessions keys=list(sessions.keys()) if len(keys)>maxSessions: sessions[keys[0]]['connection'].close() del sessions[keys[0]]
2. Consultas con "Raw" SQL con SQLAlchemy
Parece ser que si se permite utilizar consultas SQL "a pelo" per se tienen estas restricciones:
- Los nombres de esquemas, tablas y campos deben ser estar en minúsculas.
- Las cláusulas LIKE y cadenas de caracteres a comparar deben ir en comillas simples.
- Hay que pasar la funcion "text" a la sentencia SQL
- Si queremos usar nombres de esquemas, tablas y campos en MAYÚSCULA, se deben entrecomillar entre comillas dobles
Veamos ejemplos de errores frecuentes:
A. Esquema: esq Tabla: tab Campo: camp
SELECT * FROM esq.tab WHERE camp LIKE "P%"
El error está en utilizar comillas dobles en el literal del LIKE "P%". La solución es:
SELECT * FROM esq.tab WHERE camp LIKE 'P%'
B. Esquema: ESQ Tabla: tab Campo: camp
SELECT * FROM esq.tab WHERE camp LIKE 'P%'
El error está en NO utilizar comillas dobles en el nombre del esquema que tiene letras mayúsculas. La solución es:
SELECT * FROM "ESQ".tab WHERE camp LIKE 'P%'
C. Esquema: ESQ Tabla: TAB Campo: camp
SELECT * FROM "ESQ".TAB WHERE camp LIKE 'P%'
El error está en NO utilizar comillas dobles en el nombre de la tabla que tiene letras mayúsculas. La solución es:
SELECT * FROM "ESQ"."TAB" WHERE camp LIKE 'P%'
D. Esquema: ESQ Tabla: TAB Campo: CAMP
SELECT * FROM "ESQ"."TAB" WHERE CAMP LIKE 'P%'
El error está en NO utilizar comillas dobles en el nombre del campo que tiene letras mayúsculas. La solución es:
SELECT * FROM "ESQ"."TAB" WHERE "CAMP" LIKE 'P%'
Veamos ahora como ejecutar estas consultas SQL en SQLAlchemy:
Supongamos que ya tenemos la variable session creada, y vamos a realizar una búsqueda, a la que le pasamos la cláusula WHERE de la sentencia SQL. Para construir la sentencia completa hay que unir 'SELECT * FROM "esquema"."tabla" WHERE ' a las condiciones que recibimos en un string dentro del diccionario "values" y su clave es "cerca_expressio" que puede tener este valor por ejemplo ' "DESCRIPCION" LIKE 'P%' "
Observar que se utiliza el objeto "engine" para realizar la consulta
def cercar(sessionId:str, values:dict): """ Endpoint to fetch rows for the ag-Grid - Supports pagination, sorting, and filtering. """ tableName=sessions[sessionId]['dbTable'] schema=sessions[sessionId]['schema'] fullTableName=getFullTableName(schema, tableName) table=Table(tableName, sessions[sessionId]['metadata'], autoload_with=sessions[sessionId]['engine'], schema=sessions[sessionId]['schema']) columns=[col.name for col in table.columns] sqlWhere=values.get('cerca_expressio','') sqlSentence="SELECT * FROM " + fullTableName + " WHERE " +sqlWhere try: with sessions[sessionId]['engine' ].connect() as connection: result = connection.execute(text(sqlSentence)) rows=result.fetchall() data=[] for row in rows: data.append(tuple2Dict(row, table, columns)) return json.dumps({"message": "Query executed successfully", 'id_btn_clicked':'cercar','data': data}) except Exception as e: return json.dumps({"message": "Record query failed: "+ ', '.join(e.args),'id_btn_clicked':'Error'}) def getFullTableName(schema:str, tableName:str):
'''Une con un punto esquema y tabla entrecomillado con comillas dobles''' return '"'+schema+'"."'+tableName+'"'
3. Modificaciones
En este caso utilizamos la session y no el engine. Le pasamos un diccionario de valores a modificar, pero tenemos que descartar aquellos que no forman parte de la tabla. Para ello hemos creado una función que solo toma los valores para modificar en el registro.
def modificar(sessionId:str, values:dict): tableName=sessions[sessionId]['dbTable'] row=None try: table=Table(tableName, sessions[sessionId]['metadata'], autoload_with=sessions[sessionId]['engine'], schema=sessions[sessionId]['schema']) session=sessions[sessionId]['sessionLocal']() fieldDict=getTableFieldValues(sessionId, tableName,values) # Update the record result = session.query(table).filter(table.columns.id == values.get('id',-1)).update(fieldDict) print("result=",result) session.commit() row = session.query(table).filter_by(id=values.get("id",-1)).first() rowDict=tuple2Dict(row, table, None) except Exception as e: session.rollback() return json.dumps({"message": "Record update failed: "+ ', '.join(e.args),'id_btn_clicked':'Error'}) finally: session.close() return json.dumps({"message": "Record updated successfully", 'id_btn_clicked':values.get("id_btn_clicked",''),'row': rowDict}) '''Filtramos los campos de la tabla''' def getTableFieldValues(sessionId: int | str, tableName: str, values:dict): columns, fkColDict, colNames, colQuotes=getTableInfo(tableName, sessionId) fieldDict={ key:values[key] for key in colNames if key in values} return fieldDict
4. Altas
Aquí utilizamos "engine" y le pasamos un diccionario de los campos y sus valores al igual que en las modificaciones
def alta(sessionId:str, values:dict): tableName=sessions[sessionId]['dbTable'] session=sessions[sessionId]['sessionLocal']() row=None try: table=Table(tableName, sessions[sessionId]['metadata'], autoload_with=sessions[sessionId]['engine'], schema=sessions[sessionId]['schema']) fieldDict=getTableFieldValues(sessionId,tableName, values) # New record # Create a new record using the dictionary stmt = insert(table).values(fieldDict) # Execute the statement with sessions[sessionId]['engine' ].connect() as connection: result = connection.execute(stmt) newId=result.inserted_primary_key[0] print ("newId",newId) query = select(table).where(table.c.id == newId) row = connection.execute(query).fetchone() rowDict=tuple2Dict(row, table, None) connection.commit() except Exception as e: connection.rollback() return json.dumps({"message": "Record addition failed: "+ ', '.join(e.args),'id_btn_clicked':'Error'}) finally: connection.close() return json.dumps({"message": "Record added successfully", 'id_btn_clicked':values.get("id_btn_clicked",''),'row': rowDict})
5. Bajas
El proceso es parecido a los anteriores,
def borrar(sessionId:str, values:dict): tableName=sessions[sessionId]['dbTable'] session=sessions[sessionId]['sessionLocal']() row=None try: table=Table(tableName, sessions[sessionId]['metadata'], autoload_with=sessions[sessionId]['engine'], schema=sessions[sessionId]['schema']) # Delete row with conditions stmt = delete(table).where(table.columns.id == values.get('id',-100)) # Execute the statement with sessions[sessionId]['engine' ].connect() as connection: result = connection.execute(stmt) connection.commit() except Exception as e: connection.rollback() return json.dumps({"message": "Record deletion failed: "+ ', '.join(e.args),'id_btn_clicked':'Error'}) finally: connection.close() return json.dumps({"message": "Record deleted successfully", 'id_btn_clicked':values.get("id_btn_clicked",''),'id': values.get('id',-100)})
6. Obtener información de la tabla
Para ello tenemos estas funciones para obtener los metadatos de una tabla
def getForeignKeysAndReferences(tableName: str, sessionId:str): """ Detect foreign keys and referenced tables for a given table. """ global sessions inspector = inspect(sessions[sessionId]['engine']) foreignKeys = inspector.get_foreign_keys(table_name= tableName, schema=sessions[sessionId]['schema']) fkColDict = {} for fk in foreignKeys: fkColDict[fk['constrained_columns']]=fk return fkColDict def getTableInfo(tableName: str, sessionId:str): """ Endpoint to dynamically generate a form with foreign key detection and field types. """ global sessions table = Table(tableName, sessions[sessionId]['metadata'], autoload_with=sessions[sessionId]['engine'], schema=sessions[sessionId]['schema']) fkColDict = getForeignKeysAndReferences(tableName,sessionId) # Detect column names and types columns = [{"name": col.name, "type": sql2HtmlType(col.type), "value":get_default(col), "width":get_length(col)} for col in table.columns ] colNames=[col.name for col in table.columns ] colQuotes= {col.name : sqlQuote(col.type) for col in table.columns } return columns, fkColDict, colNames, colQuotes
7. Alterar la estructura de una tabla. Alembic
Para alterar la estructura de una tabla sse puede utilizar el comjando SQL ALTER TABLE
Pero no es recomendable si se ha utilizado SQLALCHEMY paa crear las tablas. Veamos como se procede
7.1 Instalar Alembic
Nos situamos en la carpeta del proyecto y ejecutamos:
# 1. Instalar la libreía alembic en el entorno virutal pip install alembic # 2. Crea los ficheros alembic.ini y alembic/env-py # proyecto # |-> alembic.ini # |-> alembic # |-> env.py alembic init alembic
7.2 Configurar la conexión de la BD en el fichero alembic.ini
# 1. Para Postgres sqlalchemy.url = postgresql+psycopg2://user:pssword@SERVER_IP:5432/db_name # OJO: NO indicar nombres de esquema en postgres!postgresql+psycopg2://user:pssword@SERVER_IP:5432/db_name?application_name=my_schema# 2. Para sqlite sqlalchemy.url = sqlite:///./your_database.db
7.3 Definir los "metadatos" de la tabla en alembic/env.py
Buscar esta instrucción :
target_metadata = None
Y cambiarla por:
# Note that in my case I have defined the Base class as the base class that the other classes inheritate ## Base class for ORM models #Base = declarative_base() from models import Base # assuming models.Base = declarative_base() target_metadata = Base.metadata
7.4 Ejecutar el script de migración
alembic revision --autogenerate -m "Add new fields to UserStore"
Y en mi caso me devuelve:
INFO [alembic.runtime.migration] Context impl PostgresqlImpl. INFO [alembic.runtime.migration] Will assume transactional DDL. INFO [alembic.autogenerate.compare] Detected added table 'ine.x_users' INFO [alembic.autogenerate.compare] Detected added index ''ix_ine_x_users_description'' on '('description',)' INFO [alembic.autogenerate.compare] Detected added index ''ix_ine_x_users_id'' on '('id',)' INFO [alembic.autogenerate.compare] Detected added index ''ix_ine_x_users_id_alba'' on '('id_alba',)' INFO [alembic.autogenerate.compare] Detected added index ''ix_ine_x_users_ldap_user'' on '('ldap_user',)' INFO [alembic.autogenerate.compare] Detected added index ''ix_ine_x_users_nif'' on '('nif',)' INFO [alembic.autogenerate.compare] Detected added index ''ix_ine_x_users_puesto'' on '('puesto',)' INFO [alembic.autogenerate.compare] Detected added index ''ix_ine_x_users_totp_user'' on '('totp_user',)' INFO [alembic.autogenerate.compare] Detected added table 'ine.x_users_hist' INFO [alembic.autogenerate.compare] Detected added index ''ix_ine_x_users_hist_description'' on '('description',)' INFO [alembic.autogenerate.compare] Detected added index ''ix_ine_x_users_hist_id'' on '('id',)' INFO [alembic.autogenerate.compare] Detected added index ''ix_ine_x_users_hist_id_alba'' on '('id_alba',)' INFO [alembic.autogenerate.compare] Detected added index ''ix_ine_x_users_hist_ldap_user'' on '('ldap_user',)' INFO [alembic.autogenerate.compare] Detected added index ''ix_ine_x_users_hist_nif'' on '('nif',)' INFO [alembic.autogenerate.compare] Detected added index ''ix_ine_x_users_hist_puesto'' on '('puesto',)' INFO [alembic.autogenerate.compare] Detected added index ''ix_ine_x_users_hist_totp_user'' on '('totp_user',)' INFO [alembic.autogenerate.compare] Detected removed table 'revinfo'
No hay comentarios :
Publicar un comentario