Casi me vuelvo loco veamos lo que sucedía:
- Los mismos parámetros de conexion (url,usuario, contraseña,base de datos, puerto) se usan en 2 máquinas clientes distintas
- Cada máquina encontraba alguna base de datos que no veía la otra
- Para una base de datos comun (openweb), una máquina veía un esquema que la otra no veía
- Cuando listabas las BD con sus parámetros, se veía que el tamaño era distinto.
Plantamiento de solución:
- Hago una copia de seguridad con pgadmin de la BD buena
- La versión de postgres del servidor era la 12. Instalo la 14 en la misma máquina.
- Con la sentencia pg_lsclusters listo los clusters qye hay y me muestra 2 clusters, uno de la versión 12 y otro de la 14
- Borramos el cluster de la 14 con sudo pg_dropcluster 16 main --stop (para poder impoirtar ahí el de la 12)
- Realizamos la migración del cluster 12 al 14 con sudo pg_upgradecluster 12 main
- Paramos el cluster de la 12 con sudo pg_dropcluster 12 main y quitamos los servicios de la 12
- TODAVIA DABA RESULTADOS DIFERENTES.
- Compruebo con psql en ambos lados "SELECT version();" y en un cliente me devuelve la versión 12 y en otro la versión 14
- Desisnstalo la versión de postgres 12 del servidor con
sudo apt remove postgresql-12 postgresql-client-12
- Rearranco la máquina y a funcionar!!!!
Adjunto el fichero python xmdbv4.py donde al final aparecen comprobaciones por si acaso sucede algo parecido:
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy import inspect, select, text
from typing import Any, AsyncGenerator, Awaitable
from contextlib import asynccontextmanager
from pathlib import Path
import os, sys
import asyncio
#------ Import from other folders (basicutils)
# --- Ensure project root is in sys.path when run directly ---
PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
if PROJECT_ROOT not in sys.path:
sys.path.insert(0, PROJECT_ROOT)
from softprop.basicutils import xmcrypto
from softprop import xmproject_folder # <-- Add this import
# ------End imprescindible
# --- Load properties
#mydb_conf_file = os.getenv("CONF_PYTHON") + "db.encrypted.yml"
mydb_conf_file = xmproject_folder.get_project_static_conf_params_encr_folder() + os.sep + "db.encrypted.yml"
mydb_all_props = xmcrypto.get_properties_from_file(mydb_conf_file)
my_main_db = mydb_all_props["actualDBs"][0]
is_async = True
# -- Filter only active DBs
mydb_props = {
k: mydb_all_props[k] for k in mydb_all_props["actualDBs"] if k in mydb_all_props
}
myschema = {k: mydb_props[k]["dbSchema"] for k in mydb_props if "dbSchema" in mydb_props[k]}
def get_db_props(key=None) -> dict:
"""Build connection parameters"""
props = mydb_props[key].copy()
if is_async:
props["url"] = (
f"{props['dbType']}://{props['dbUser']}:{props['dbPwd']}"
f"@{props['dbHost']}:{str(props['dbPort'])}/{props['dbName']}"
)
if props.get("appName"):
props["connect_args"] = {
"server_settings": {"application_name": props["appName"]}
}
else:
app_url = f"?application_name={props['appName']}" if props.get("appName") else ""
props["url"] = (
f"{props['dbType']}://{props['dbUser']}:{props['dbPwd']}"
f"@{props['dbHost']}:{str(props['dbPort'])}/{props['dbName']}" + app_url
)
my_props = {
k: props[k]
for k in {
"url",
"pool_size",
"max_overflow",
"pool_timeout",
"pool_recycle",
"pool_pre_ping",
"connect_args",
"echo",
"future",
"poolclass",
"execution_options",
"json_serializer",
}
if k in props
}
return my_props
# Singleton engines + sessionmakers (lazy init)
async_engines = {}
async_sessionmakers = {}
def get_engine(db_key: str):
"""Lazily create engine + sessionmaker for a DB key."""
if db_key not in async_engines:
config = get_db_props(db_key)
engine = create_async_engine(
config["url"],
**{k: config[k] for k in config if k != "url"},
)
async_engines[db_key] = engine
async_sessionmakers[db_key] = sessionmaker(
bind=engine, class_=AsyncSession, expire_on_commit=False
)
return async_engines[db_key]
# Base class for ORM models
Base = declarative_base()
# ─── Async Session Generator ───────────────────────────────────────────────
@asynccontextmanager
async def get_async_session(db_key: str) -> AsyncGenerator[AsyncSession, None]:
"""Usage: async with get_async_session(db_key) as session:"""
get_engine(db_key) # ensure engine exists in current loop
async with async_sessionmakers[db_key]() as session:
yield session
# ─── Create all tables for a given DB key ──────────────────────────────────
async def create_all_tables(db_key: str):
"""Create all tables for the given DB key."""
engine = get_engine(db_key)
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
# ─── Load inspector for a given DB key ─────────────────────────────────────
async def load_inspector(db_key: str):
"""Get SQLAlchemy inspector for the given DB key."""
engine = get_engine(db_key)
async with engine.connect() as conn:
return await conn.run_sync(inspect)
# ─── Merge instance into main and history tables ────────────────────────────
async def _merge_instance(async_ses: AsyncSession, base_instance: Any, cls: type, merged_instance: Any) -> Any:
"""Helper to merge a single instance into a table."""
stmt = select(cls).filter_by(description=base_instance.description)
result = await async_ses.execute(stmt)
instance = result.scalars().first()
if instance:
for k, v in base_instance.__dict__.items():
if not k.startswith("_") and k != "id":
setattr(instance, k, v)
else:
instance = cls(**{k: v for k, v in base_instance.__dict__.items() if not k.startswith("_")})
if merged_instance:
instance.id = merged_instance.id # Share the ID
merged_instance = await async_ses.merge(instance)
await async_ses.flush()
return merged_instance
async def merge_to_main_and_histo_async_newtrans(
db_key: str,
base_instance: Any,
*classes: type
) -> Any:
"""Merge an instance into multiple tables (main + history)."""
merged_instance = None
async with get_async_session(db_key) as async_ses:
async with async_ses.begin():
for cls in classes:
try:
merged_instance = await _merge_instance(async_ses, base_instance, cls, merged_instance)
except Exception as e:
print(f"Error in merge_to_main_and_histo_async_newtrans for class {cls.__name__}: {e}")
raise
#await async_ses.commit()
return merged_instance
'''
=======================================================================================
ESTA PARTE ES SOLO PARA COMPROBACIONES Y TESTEO DE LA CONEXION A LA BD
=======================================================================================
'''
async def test_db():
''' Test DB connection and print info
Esto sale a raiz del problema que tuve donde la misma conexión a la BD daba
distinto resultado según la máquina cliente que hacía la conexión:
Al final resultó ser que había 2 versiones de los ficheros de la BD
y según la versión del cliene psql ejecutaba una u otra
Pero parece ser que el cliente psql de una máquina era capaz de ejecutar acciones
sobre el fichero de la BD a pesar que el servidor estaba parado.
Inicialmente estava la versión de postgres 12 y luego actualicé a la 14 y migré los datos
desde la 12 a la 14 sin borrar los datos de la versión 12. Paré el servidor 12 pero el cliente psql
de la máquina seguía funcionando y accediendo a la versión 12.
Al final desinstalé la versión 12 y borré los ficheros de datos de la versión 12 y ya todo funciona bien.
'''
async with get_async_session(my_main_db) as session:
print('0. Compruebe que coinciden todos los datos en casos de comparar conexiones\n')
# 1. Version, usuario, dirección, puerto..
result = await session.execute(
text("SELECT current_database(), current_user, version(), inet_server_addr(), inet_server_port()")
)
db_name, db_user, db_version, db_addr, db_port = result.first()
print("1. Conectado a DB:", db_name, "\nversión:", db_version,"\ncon usuario:", db_user,
"\nen host:", db_addr, "\npuerto:", db_port)
# 2. Ver el directorio la base de datos
result = await session.execute(text("SHOW data_directory"))
db_data_directory = result.first()
print("2. Data directory:", db_data_directory)
# 3. Ver el oid de la base de datos
result = await session.execute(text("SELECT datname, oid FROM pg_database"))
db_name, db_oid = result.first()
print("3. DB_name:", db_name, "DB_oid:", db_oid)
# 4. Ver el search_path
result = await session.execute(text("SHOW search_path"))
print("4. Search path:", result.scalar(), "\n")
# Obtener el engine desde la sesión
#engine = session.get_bind()
engine =get_engine(my_main_db)
print("engineURL:", engine.url)
print("engine.Dialect:", engine.dialect.name)
print("engine.Driver:", engine.dialect.driver,"\n")
print("#######################################################################\n")
# Usar inspector síncrono dentro del async
#async with engine.connect() as conn:
async with engine.begin() as conn:
def _inspect(sync_conn):
inspector = inspect(sync_conn)
schemas = inspector.get_schema_names()
result = {}
for schema in schemas:
tables = inspector.get_table_names(schema=schema)
result[schema] = tables
return result
db_structure = await conn.run_sync(_inspect)
print("\n--- Estructura de la base de datos ---")
for schema, tables in db_structure.items():
print(f"Schema: {schema}")
for table in tables:
print(f" - {table}")
async def schema_and_table_exists_old(session: AsyncSession, schema: str, table: str) -> bool:
# Get the bound engine
engine = session.get_bind()
async with engine.connect() as conn:
def _check(sync_conn):
inspector = inspect(sync_conn)
# List all schemas
schemas = inspector.get_schema_names()
if schema not in schemas:
return False
# Check if table exists in that schema
tables = inspector.get_table_names(schema=schema)
return table in tables
return await conn.run_sync(_check)
async def schema_and_table_exists(db_key: str, schema: str, table: str) -> bool:
"""
Check if a schema and table exist in the database for the given db_key.
Args:
db_key (str): The key to select DB connection from async_engines
schema (str): Schema name to check
table (str): Table name to check inside the schema
Returns:
bool: True if schema and table exist, False otherwise
"""
engine = get_engine(db_key)
async with engine.connect() as conn:
def _check(sync_conn):
inspector = inspect(sync_conn)
# Get all schemas
schemas = inspector.get_schema_names()
if schema not in schemas:
return False
# Get all tables in schema
tables = inspector.get_table_names(schema=schema)
return table in tables
return await conn.run_sync(_check)
if __name__ == "__main__":
async def main():
await test_db()
print("---------------------------------------------------------------------")
print("\n--- Checking schema and table existence ---")
# Check if schema 'ine' and table 'x_users' exist
exists = await schema_and_table_exists(my_main_db, "ine", "x_users")
if exists:
print("✅ Schema 'ine' and table 'x_users' exist")
else:
print("❌ Schema 'ine' or table 'x_users' does not exist")
asyncio.run(main())