Este proyecto es un organizador fotográfico avanzado que procesa imágenes, extrae metadatos y los almacena en una base de datos MongoDB. El sistema consta de varios módulos que se ejecutan en un orden específico para procesar la colección completa de imágenes.
Funcionalidades principales:
Nota sobre procesamiento facial y metadatos: La detección facial y la inserción de metadatos en los archivos se ha realizado con la aplicación Picasa 3 de Google. Para más información detallada sobre el uso de Picasa 3, consulte el tutorial dedicado.
El código fuente completo del proyecto de organización de imágenes se encuentra disponible para descarga:
codigo/Descargar Proyecto (ALBUM_SEMANTICO.zip)
Instrucciones de descarga:
📁 ALBUM_SEMANTICO/
├── 📁 1_ACTUALIZAR_ID_MONGODB/
│ └── 📄 actualizar_ids_mongodb_imagenes.py
├── 📁 2_ALIMENTAR_BD/
│ ├── 📄 =0.10.0 (especificación de versión)
│ ├── 📄 =1.7.3 (especificación de versión)
│ ├── 📄 =1.9.0 (especificación de versión)
│ ├── 📄 =1.18.1 (especificación de versión)
│ ├── 📄 =3.4.3 (especificación de versión)
│ ├── 📄 =4.1.2 (especificación de versión)
│ ├── 📄 =7.0.0 (especificación de versión)
│ └── 📄 1-alimentar.py
├── 📁 3_EXTRAER_XMP/
│ ├── 📄 1_extraer_xmp_a_mongodb .py
│ └── 📄 2_extraer_todos_metadatos_a_mongodb.py
├── 📁 GENERAR_ESTADISTICAS_MONGODB/
│ ├── 📄 generar_estadisticas_mongodb.py
│ └── 📄 requirements.txt
└── 📄 manual_empleo.html (este archivo de documentación)
actualizar_ids_mongodb_imagenes.py - Script principal del módulo1-alimentar.py - Script principal del módulo=X.X.X - Archivos de especificación de versiones de dependencias1_extraer_xmp_a_mongodb .py - Script para extraer nombres de personas2_extraer_todos_metadatos_a_mongodb.py - Script para extraer todos los metadatos XMP/EXIFgenerar_estadisticas_mongodb.py - Script principal del módulorequirements.txt - Archivo de dependencias específicaspymongo - Para conexión con MongoDBpandas - Para procesamiento de datos (solo para estadísticas)colorama - Para salida coloreada (opcional, solo para estadísticas)PIL (Pillow) - Para procesamiento de imágenesexifread - Para lectura de metadatos EXIFrequests - Para geocodificaciónpyexiv2 - Para procesamiento de metadatos XMPInstalación de dependencias:
MONGO_URI - URI de conexión a MongoDBDB_NAME - Nombre de la base de datosCOLLECTION_NAME - Nombre de la colección principalMONGODB_URI - URL de MongoDB para el generador de estadísticasDATABASE_NAME - Base de datos para estadísticasCOLLECTION_NAME - Colección para estadísticasPropósito: Almacena toda la información básica de imágenes procesadas
{
"_id": "hash_sha512_de_la_imagen", // Índice único (SHA-512 del archivo)
"hash_sha512": "string", // Hash del archivo completo
"ruta": "string", // Ruta completa al archivo
"nombre": "string", // Nombre del archivo
// Metadatos de imagen
"ancho": number, // Ancho en píxeles
"alto": number, // Alto en píxeles
"peso": number, // Tamaño en KB
// Fecha de creación del archivo (componentes separados)
"fecha_creacion_dia": "DD",
"fecha_creacion_mes": "MM",
"fecha_creacion_anio": "YYYY",
"fecha_creacion_hora": "HH",
"fecha_creacion_minuto": "MM",
// Información de ubicación GPS
"coordenadas": [lat, lng], // Array con coordenadas decimales
"calle": "string", // Calle geocodificada
"barrio": "string", // Barrio geocodificado
"ciudad": "string", // Ciudad geocodificada
"cp": "string", // Código postal
"pais": "string", // País geocodificado
// Fecha de procesamiento (examen automático)
"fecha_procesamiento_dia": "DD",
"fecha_procesamiento_mes": "MM",
"fecha_procesamiento_anio": "YYYY",
"fecha_procesamiento_hora": "HH",
"fecha_procesamiento_minuto": "MM",
// Array de personas identificadas (módulo 3)
"personas": ["string"], // Array de nombres de personas
// Campo opcional para procesamiento adicional
"objeto_procesado": boolean, // Estado de procesamiento
"objetos": ["string"], // Array de objetos detectados
"ruta_alternativa": "string" // Ruta alternativa del archivo
}
Propósito: Almacena todos los metadatos XMP y EXIF avanzados
{
"_id": ObjectId(), // ID automático de MongoDB
"ruta_imagen": "string", // Ruta completa al archivo original
"fecha_extraccion": "YYYY-MM-DD HH:MM:SS", // Cuando se extrajeron los metadatos
// Metadatos XMP (extensibles)
"Xmp.xmp.CreateDate": "string", // Fecha de creación XMP
"Xmp.xmp.MetadataDate": "string", // Fecha de metadatos
"Xmp.xmp.CreatorTool": "string", // Herramienta creadora
"Xmp.xmp.Label": "string", // Etiqueta XMP
"Xmp.mwg-rs.Regions/mwg-rs:RegionList[i]/mwg-rs:Name": "string", // Nombres de regiones
"Xmp.mwg-rs.Regions/mwg-rs:RegionList[i]/mwg-rs:Area": {
"mwg-rs:x": number,
"mwg-rs:y": number,
"mwg-rs:w": number,
"mwg-rs:h": number
},
// Ejemplos de metadatos EXIF (extensibles)
"Exif.Image.Make": "string", // Fabricante de la cámara
"Exif.Image.Model": "string", // Modelo de la cámara
"Exif.Image.Software": "string", // Software usado
"Exif.Image.DateTime": "string", // Fecha y hora EXIF
"Exif.Image.Orientation": number, // Orientación de la imagen
"Exif.Photo.ExposureTime": number, // Tiempo de exposición
"Exif.Photo.FNumber": "f/number", // Número F
"Exif.Photo.ISOSpeedRatings": number, // ISO
"Exif.Photo.FocalLength": number, // Longitud focal
"Exif.GPSInfo.GPSLatitude": "DD MM SS", // Latitud GPS
"Exif.GPSInfo.GPSLongitude": "DD MM SS", // Longitud GPS
"Exif.GPSInfo.GPSLatitudeRef": "N/S", // Referencia latitud
"Exif.GPSInfo.GPSLongitudeRef": "E/W", // Referencia longitud
// Más campos XMP/EXIF según disponibilidad...
}
Nota: Esta colección contiene todos los metadatos XMP y EXIF encontrados. Los campos son dinámicos según el contenido de cada imagen.
Propósito: Colección analizada por el generador de estadísticas
Utilizada por el Módulo 4 para generar reportes estadísticos detallados.
| Variable | Valor por Defecto | Propósito |
|---|---|---|
MONGO_URI |
mongodb://localhost:27017 |
URI de conexión a MongoDB |
DB_NAME |
your_database_name |
Nombre de la base de datos |
COLLECTION_NAME |
your_collection_name |
Nombre de la colección principal |
MONGODB_URI |
mongodb://localhost:27017/ |
URI alternativo para estadísticas |
DATABASE_NAME |
album |
Base de datos para estadísticas |
COLLECTION_NAME |
imagenes_2 |
Colección para estadísticas |
imagenes_collection o target_collectiontarget_metadata_collectionimagenes_2{ "_id": 1 } - Índice único automático de MongoDB{ "ruta": 1 } - Para búsqueda rápida por archivo{ "hash_sha512": 1 } - Para detección de duplicados{ "fecha_procesamiento": 1 } - Para ordenamiento temporal{ "coordenadas": 1 } - Para consultas geoespaciales{ "ruta_imagen": 1 } - Para metadatos (colección metadata)Los módulos deben ejecutarse en el siguiente orden para garantizar el correcto procesamiento de los datos:
Ubicación: 1_ACTUALIZAR_ID_MONGODB/actualizar_ids_mongodb_imagenes.py
Propósito: Calcula hashes SHA-512 de las imágenes y los utiliza como IDs únicos en MongoDB, evitando duplicados.
¿Cuándo ejecutar? Este script es fundamental para garantizar la integridad de los datos antes de cualquier procesamiento.
Ubicación: 2_ALIMENTAR_BD/1-alimentar.py
Propósito: Escanea un directorio de imágenes, extrae metadatos básicos y coordenadas GPS, y los almacena en MongoDB con geocodificación automática.
¿Cuándo ejecutar? Después de actualizar los IDs y antes de extraer metadatos adicionales.
Ubicación: 3_EXTRAER_XMP/
Propósito: Extrae metadatos XMP específicos relacionados con personas identificadas en las imágenes y todos los metadatos XMP/EXIF.
¿Cuándo ejecutar? Después de haber alimentado la base de datos con imágenes.
1_extraer_xmp_a_mongodb .py - Extrae nombres de personas desde metadatos XMP2_extraer_todos_metadatos_a_mongodb.py - Extrae todos los metadatos XMP y EXIFUbicación: GENERAR_ESTADISTICAS_MONGODB/generar_estadisticas_mongodb.py
Propósito: Genera estadísticas matemáticas y categóricas completas sobre todos los datos procesados.
¿Cuándo ejecutar? En cualquier momento después de tener datos en la base de datos.
Funcionamiento: Este script se conecta a MongoDB y recorre todos los documentos de la colección especificada. Para cada documento que tenga una ruta de archivo accesible, calcula el hash SHA-512 del contenido de la imagen y compara con el _id actual. Si no coinciden, actualiza el documento para usar el hash como _id único.
Pasos de ejecución:
python 1_ACTUALIZAR_ID_MONGODB/actualizar_ids_mongodb_imagenes.py1_ACTUALIZAR_ID_MONGODB/actualizar_ids_mongodb_imagenes.py
"""
Actualizar ID de MongoDB para Imágenes
Este script se conecta a una base de datos MongoDB y actualiza los documentos
en la colección especificada, utilizando hashes SHA-512 calculados a partir
de los archivos de imagen para establecer el campo _id y evitar duplicados.
Funcionalidades principales:
- Calcula el hash SHA-512 de archivos de imagen.
- Actualiza documentos en MongoDB si el hash no coincide con el _id actual.
- Maneja casos de duplicados para evitar eliminación de datos existentes.
- Imprime mensajes para archivos modificados.
Configuración requerida (puede personalizarse con variables de entorno):
- MONGO_URI: URI de conexión a MongoDB (por defecto: mongodb://localhost:27017).
- DB_NAME: Nombre de la base de datos (por defecto: "your_database_name").
- COLLECTION_NAME: Nombre de la colección (por defecto: "your_collection_name").
Notas:
- El campo "ruta" en cada documento debe contener la ruta completa al archivo de imagen.
- Solo procesa documentos donde la ruta existe y es accesible.
- Cierra la conexión a MongoDB al finalizar.
"""
import os
import hashlib
from pymongo import MongoClient
# --- CONFIGURACIÓN (Usar variables de entorno para personalización) ---
MONGO_URI = os.environ.get("MONGO_URI", "mongodb://localhost:27017")
DB_NAME = os.environ.get("DB_NAME", "your_database_name")
COLLECTION_NAME = os.environ.get("COLLECTION_NAME", "your_collection_name")
def calcular_sha512(ruta):
"""Calcula el hash SHA-512 de un archivo dado"""
sha512 = hashlib.sha512()
with open(ruta, "rb") as f:
for bloque in iter(lambda: f.read(4096), b""):
sha512.update(bloque)
return sha512.hexdigest()
def main():
# Conexión a MongoDB
cliente = MongoClient(MONGO_URI)
db = cliente[DB_NAME]
coleccion = db[COLLECTION_NAME]
# Recorremos documentos
for doc in coleccion.find({}):
ruta = doc.get("ruta")
if not ruta or not os.path.exists(ruta):
continue
# Calcular hash actual de la imagen
hash_actual = calcular_sha512(ruta)
# Verificar si está desactualizado
if doc["_id"] != hash_actual or doc.get("hash") != hash_actual:
# Chequear si ya existe un documento con _id = hash_actual
existing = coleccion.find_one({"_id": hash_actual})
if not existing:
# Si no existe, insertar nuevo y eliminar viejo
nuevo_doc = dict(doc)
nuevo_doc["_id"] = hash_actual
nuevo_doc["hash"] = hash_actual
coleccion.insert_one(nuevo_doc)
coleccion.delete_one({"_id": doc["_id"]})
print(f"Archivo modificado: {ruta}")
else:
# Si existe, no hacer nada para evitar eliminación
pass
# Si no necesita actualización, no informa nada
cliente.close()
if __name__ == "__main__":
main()
Funcionamiento: Este script procesa todas las imágenes en un directorio específico, extrae metadatos básicos como dimensiones, peso, fecha de creación, calcula el hash SHA-512 y procesa coordenadas GPS si están disponibles. Realiza geocodificación inversa para obtener dirección completa desde las coordenadas.
Pasos de configuración y ejecución:
DIRECTORY por la ruta donde están las imágenespython 2_ALIMENTAR_BD/1-alimentar.py2_ALIMENTAR_BD/1-alimentar.py
# CONDA: your_environment
"""
Script para alimentar una base de datos MongoDB con metadatos de imágenes.
Este módulo procesa imágenes de un directorio específico, extrae metadatos
como dimensiones, fecha de creación, coordenadas GPS y dirección geocodificada,
y los almacena en una colección de MongoDB usando el hash SHA512 del archivo
como identificador único.
"""
import os
import sys
import exifread
import requests
import pymongo
from datetime import datetime
import hashlib
from PIL import Image
# Constante para el directorio a escanear
DIRECTORY = '/path/to/your/image/directory' # Cambia esta ruta por la deseada
MONGO_URI = "mongodb://localhost:27017"
DB_NAME = "your_database_name"
COLLECTION_NAME = "images_collection"
# Conexión a MongoDB
client = pymongo.MongoClient(MONGO_URI)
db = client[DB_NAME]
collection = db[COLLECTION_NAME]
def get_file_hash(file_path):
"""
Calcula el hash SHA512 del archivo para identificar duplicados.
Args:
file_path (str): Ruta completa al archivo de imagen.
Returns:
str or None: Hash SHA512 en formato hexadecimal, o None si ocurre un error.
"""
hash_obj = hashlib.sha512()
try:
with open(file_path, 'rb') as f:
while chunk := f.read(8192):
hash_obj.update(chunk)
return hash_obj.hexdigest()
except Exception:
return None
def get_image_metadata(file_path):
"""
Extrae metadatos básicos de la imagen usando PIL.
Args:
file_path (str): Ruta completa al archivo de imagen.
Returns:
dict or None: Diccionario con metadatos de la imagen incluyendo:
- nombre_archivo: Nombre del archivo
- ruta_completa: Ruta completa del archivo
- ancho: Ancho de la imagen en píxeles
- alto: Alto de la imagen en píxeles
- fecha_creacion: Timestamp de modificación del archivo
- coordenadas: None (se completa posteriormente con GPS)
- fecha_procesamiento: Timestamp del momento del procesamiento
Retorna None si ocurre un error al procesar la imagen.
"""
try:
with Image.open(file_path) as img:
width, height = img.size
fecha_dt = datetime.fromtimestamp(os.path.getmtime(file_path))
fecha_proc_dt = datetime.now()
return {
'nombre': os.path.basename(file_path),
'ruta': file_path,
'ancho': width,
'alto': height,
'peso': os.path.getsize(file_path) / 1024,
'fecha_creacion_dia': fecha_dt.strftime('%d'),
'fecha_creacion_mes': fecha_dt.strftime('%m'),
'fecha_creacion_anio': fecha_dt.strftime('%Y'),
'fecha_creacion_hora': fecha_dt.strftime('%H'),
'fecha_creacion_minuto': fecha_dt.strftime('%M'),
'coordenadas': None,
'fecha_procesamiento_dia': fecha_proc_dt.strftime('%d'),
'fecha_procesamiento_mes': fecha_proc_dt.strftime('%m'),
'fecha_procesamiento_anio': fecha_proc_dt.strftime('%Y'),
'fecha_procesamiento_hora': fecha_proc_dt.strftime('%H'),
'fecha_procesamiento_minuto': fecha_proc_dt.strftime('%M')
}
except Exception as e:
print(f"Error extrayendo metadatos de {file_path}: {e}")
return None
def dms_to_decimal(dms, ref):
"""
Convierte coordenadas del formato DMS (grados, minutos, segundos) a formato decimal.
Args:
dms: Tupla con tres valores FRS (Fractional Rational String) representando
grados, minutos y segundos de las coordenadas GPS.
ref (str): Referencia de dirección ('N', 'S', 'E', 'W').
Returns:
float: Coordenadas en formato decimal. Negativas para 'S' y 'W'.
"""
degrees = dms[0].num / dms[0].den
minutes = dms[1].num / dms[1].den
seconds = dms[2].num / dms[2].den
decimal = degrees + (minutes / 60) + (seconds / 3600)
if ref in ['S', 'W']:
decimal = -decimal
return decimal
def reverse_geocode(lat, lon):
"""
Realiza geocodificación inversa usando la API de Nominatim (OpenStreetMap).
Args:
lat (float): Latitud en formato decimal.
lon (float): Longitud en formato decimal.
Returns:
dict or None: Diccionario con información de dirección estructurada, o None si falla.
Contiene: display_name, address (con road, suburb, city, postcode, country)
"""
try:
url = f"https://nominatim.openstreetmap.org/reverse?format=json&lat={lat}&lon={lon}&zoom=18&addressdetails=1"
headers = {'User-Agent': 'anonymous_image_processor'}
response = requests.get(url, headers=headers, timeout=10)
if response.status_code == 200:
data = response.json()
if 'address' in data:
address_details = data.get('address', {})
structured_address = {
'display_name': data.get('display_name', ''),
'address': {
'road': address_details.get('road', ''),
'suburb': address_details.get('suburb', ''),
'city': address_details.get('city', ''),
'postcode': address_details.get('postcode', ''),
'country': address_details.get('country', '')
}
}
return structured_address
else:
print(f"API response missing address details: {data}")
return None
else:
print(f"Geocoding API error {response.status_code}: {response.text}")
return None
except requests.exceptions.RequestException as e:
print(f"Network error during geocoding: {e}")
return None
def get_gps_location(file_path):
"""
Procesa una imagen para extraer información GPS y actualizar la base de datos.
Calcula el hash del archivo, verifica si ya existe en la base de datos,
extrae coordenadas GPS de los metadatos EXIF, las geocodifica a componentes de dirección,
y actualiza el documento en MongoDB. Si el documento no existe, lo crea primero
con metadatos básicos.
Args:
file_path (str): Ruta completa al archivo de imagen a procesar.
Returns:
None: No retorna valores, actualiza la base de datos directamente.
Raises:
Exception: Se maneja internamente imprimiendo mensajes de error.
"""
try:
hash_value = get_file_hash(file_path)
if not hash_value:
print(f"Error calculando hash para {file_path}")
return
# Comprobar si la imagen ya existe en la base de datos por hash
document = collection.find_one({'_id': hash_value})
if document:
print(f"Imagen ya existe en la base de datos, saltando procesamiento: {file_path}")
return
# Si no existe, insertar nuevo documento
metadata = get_image_metadata(file_path)
if metadata:
metadata['_id'] = hash_value
metadata['hash_sha512'] = hash_value
try:
collection.insert_one(metadata)
print(f"Documento insertado: {file_path}")
except Exception as e:
print(f"Error insertando documento: {e}")
return
else:
print(f"No se pudo obtener metadatos para insertar: {file_path}")
return
with open(file_path, 'rb') as f:
tags = exifread.process_file(f, details=False)
update_data = {}
# Inicializar siempre coordenadas y componentes de dirección
coordenadas = None
calle = ''
barrio = ''
ciudad = ''
cp = ''
pais = ''
if 'GPS GPSLatitude' in tags and 'GPS GPSLongitude' in tags:
lat_tag = tags['GPS GPSLatitude']
lon_tag = tags['GPS GPSLongitude']
lat_ref = str(tags['GPS GPSLatitudeRef'])
lon_ref = str(tags['GPS GPSLongitudeRef'])
lat = dms_to_decimal(lat_tag.values, lat_ref)
lon = dms_to_decimal(lon_tag.values, lon_ref)
coordenadas = [lat, lon]
address_data = reverse_geocode(lat, lon)
if address_data:
# Guardar componentes de dirección por separado
address_details = address_data.get('address', {})
calle = address_details.get('road', '')
barrio = address_details.get('suburb', '')
ciudad = address_details.get('city', '')
cp = address_details.get('postcode', '')
pais = address_details.get('country', '')
print(f"Dirección geocodificada: {address_data.get('display_name', '')}")
else:
print(f"No se pudo geocodificar: coordenadas {lat}, {lon}")
print(f"Actualizado {file_path} con GPS")
else:
print(f"Sin GPS: {file_path}")
# Siempre insertar los datos GPS y de dirección
update_data['coordenadas'] = coordenadas
update_data['calle'] = calle
update_data['barrio'] = barrio
update_data['ciudad'] = ciudad
update_data['cp'] = cp
update_data['pais'] = pais
# Siempre actualizar fecha_procesamiento con campos separados
now = datetime.now()
update_data['fecha_procesamiento_dia'] = now.strftime('%d')
update_data['fecha_procesamiento_mes'] = now.strftime('%m')
update_data['fecha_procesamiento_anio'] = now.strftime('%Y')
update_data['fecha_procesamiento_hora'] = now.strftime('%H')
update_data['fecha_procesamiento_minuto'] = now.strftime('%M')
collection.update_one({'_id': hash_value}, {'$set': update_data})
except Exception as e:
print(f"Error procesando {file_path}: {e}")
def main():
"""
Función principal que escanea el directorio definido y procesa todas las imágenes.
Escanea recursivamente el directorio DIRECTORY en busca de archivos de imagen
con extensiones válidas (jpg, jpeg, png, tiff, bmp), y para cada imagen encontrada
ejecuta el procesamiento completo: extracción de metadatos, GPS y actualización
en la base de datos MongoDB.
Returns:
None: No retorna valores, imprime información de progreso en consola.
Raises:
SystemExit: Si el directorio especificado en DIRECTORY no existe.
"""
dir_path = DIRECTORY
if not os.path.isdir(dir_path):
print("El directorio no existe.")
sys.exit(1)
print(f"Procesando directorio: {dir_path}\n")
for root, dirs, files in os.walk(dir_path):
for file in files:
ext = file.lower().split('.')[-1]
if ext in ['jpg', 'jpeg', 'png', 'webp', 'heic', 'heif']:
path = os.path.join(root, file)
print(f"Procesando: {path}")
get_gps_location(path)
if __name__ == "__main__":
main()
Este módulo tiene dos sub-scripts que procesan metadatos XMP:
Extrae nombres de personas identificadas en las fotos desde metadatos XMP y los agrega al campo 'personas' de los documentos en MongoDB.
Ejecución: python 3_EXTRAER_XMP/1_extraer_xmp_a_mongodb .py
3_EXTRAER_XMP/1_extraer_xmp_a_mongodb .py
#!/usr/bin/env python3
"""
Script para extraer metadatos XMP específicos de imágenes almacenadas en MongoDB.
Extrae el valor correspondiente a la etiqueta ETIQUETA_XMP de todas las imágenes
en la colección target_collection de la base de datos your_database_name.
"""
import os
import json
import time
from pymongo import MongoClient
import pyexiv2
# Constante para la etiqueta XMP base a extraer
ETIQUETA_XMP_BASE = "Xmp.mwg-rs.Regions/mwg-rs:RegionList"
def conectar_mongodb(host='localhost', port=27017, db_name='your_database_name'):
"""Conecta a la base de datos MongoDB"""
try:
client = MongoClient(host, port)
db = client[db_name]
print(f"[✔] Conectado exitosamente a MongoDB: {db_name}")
return db
except Exception as e:
print(f"[✘] Error conectando a MongoDB: {e}")
return None
def obtener_rutas_imagenes(db):
"""Obtiene todas las rutas de imágenes de la colección target_collection"""
try:
collection = db['target_collection']
cursor = collection.find({}, {'ruta': 1, '_id': 0})
rutas = [doc['ruta'] for doc in cursor if 'ruta' in doc]
print(f"[✔] Encontradas {len(rutas)} rutas de imágenes en la base de datos")
return rutas
except Exception as e:
print(f"[✘] Error obteniendo rutas de imágenes: {e}")
return []
def limpiar_valor_xmp(valor_completo):
"""Limpia el valor XMP para extraer solo el nombre limpio"""
try:
# El formato típico es: ""
# Necesitamos extraer todo lo que viene después del "="
if "=" in valor_completo:
partes = valor_completo.split("=", 1)
if len(partes) == 2:
valor = partes[1].strip()
# Limpiar comillas y caracteres de cierre
valor = valor.rstrip('>').strip()
if valor.startswith('"') and valor.endswith('"'):
valor = valor[1:-1]
elif valor.startswith("'") and valor.endswith("'"):
valor = valor[1:-1]
return valor.strip()
return None
except Exception as e:
print(f"[!] Error limpiando valor XMP: {e}")
return None
def extraer_valores_etiquetas_xmp(imagen_path, etiqueta_base):
"""Extrae múltiples valores de etiquetas XMP de una imagen"""
try:
# Verificar si el archivo existe
if not os.path.exists(imagen_path):
print(f"[✘] Archivo no encontrado: {imagen_path}")
return []
print(f"[...] Procesando: {imagen_path}")
# Leer metadatos usando pyexiv2
metadata = pyexiv2.ImageMetadata(imagen_path)
metadata.read()
valores_encontrados = []
indice = 1
# Buscar todas las etiquetas numeradas
while True:
etiqueta_actual = etiqueta_base + f"[{indice}]/mwg-rs:Name"
if etiqueta_actual in metadata:
valor_completo = metadata[etiqueta_actual]
print(f"[✔] Valor encontrado en {etiqueta_actual}: {valor_completo}")
# Limpiar el valor extraído para obtener solo el nombre
valor_limpio = limpiar_valor_xmp(str(valor_completo))
if valor_limpio:
print(f"[✔] Valor limpio extraído: {valor_limpio}")
if valor_limpio not in valores_encontrados: # Evitar duplicados internos
valores_encontrados.append(valor_limpio)
else:
break # No hay más etiquetas numeradas
indice += 1
if not valores_encontrados:
print(f"[!] Ninguna etiqueta XMP encontrada en {imagen_path}")
return valores_encontrados
except Exception as e:
print(f"[✘] Error procesando {imagen_path}: {e}")
return []
def main():
"""Función principal"""
print("=== Extractor de XMP desde MongoDB ===")
print(f"Etiqueta XMP base a extraer: {ETIQUETA_XMP_BASE}")
print("=" * 50)
# Conectar a MongoDB
db = conectar_mongodb()
if db is None:
return
# Obtener colección y rutas de imágenes
collection = db['target_collection']
rutas_imagenes = obtener_rutas_imagenes(db)
if not rutas_imagenes:
print("No se encontraron rutas de imágenes en la base de datos")
return
print(f"Procesando {len(rutas_imagenes)} imágenes...")
print("-" * 50)
# Contadores
imagenes_procesadas = 0
imagenes_con_valor = 0
documentos_actualizados = 0
valores_agregados = 0
# Procesar cada imagen
for ruta in rutas_imagenes:
print(f"\nProcesando imagen {imagenes_procesadas + 1}/{len(rutas_imagenes)}:")
# Extraer múltiples valores XMP
valores_etiqueta = extraer_valores_etiquetas_xmp(ruta, ETIQUETA_XMP_BASE)
if valores_etiqueta:
imagenes_con_valor += 1
# Mostrar los valores encontrados
print("=" * 50)
print(f"RUTA: {ruta}")
print(f"VALORES ENCONTRADOS ({len(valores_etiqueta)}): {', '.join(valores_etiqueta)}")
print("=" * 50)
# Actualizar el documento en MongoDB siguiendo el flujo exacto solicitado
try:
# PASO 1: Primero, verificar y preparar el campo 'personas'
doc = collection.find_one({"ruta": ruta})
if doc:
# PASO 2: Verificar si campo 'personas' existe, sino crearlo vacío
if 'personas' not in doc or doc['personas'] is None:
lista_personas_actual = []
print(f"[i] Campo 'personas' no existe, inicializado vacío")
else:
if isinstance(doc['personas'], list):
lista_personas_actual = doc['personas'].copy()
print(f"[i] Campo 'personas' existe con {len(lista_personas_actual)} valores: {lista_personas_actual}")
else:
# Si no es lista, crear nueva lista vacía
lista_personas_actual = []
print(f"[i] Campo 'personas' existe pero no es lista, inicializado vacío")
# PASO 3: Procesar la imagen y extraer metadatos a otra lista
lista_valores_extraidos = valores_etiqueta.copy()
print(f"[i] Valores extraídos de XMP: {lista_valores_extraidos}")
# PASO 4: Comparar ambas listas y añadir solo valores nuevos
valores_a_anadir = []
for valor_extraido in lista_valores_extraidos:
if valor_extraido not in lista_personas_actual:
lista_personas_actual.append(valor_extraido)
valores_a_anadir.append(valor_extraido)
# PASO 5: Actualizar el campo personas solo si hay cambios
if valores_a_anadir:
print(f"[✔] Añadiendo valores nuevos: {valores_a_anadir}")
print(f"[✔] Campo 'personas' actualizado a: {lista_personas_actual}")
result = collection.update_one(
{"ruta": ruta},
{"$set": {"personas": lista_personas_actual}}
)
if result.modified_count > 0:
documentos_actualizados += 1
valores_agregados += len(valores_a_anadir)
print(f"[✔] Documento actualizado correctamente")
else:
print(f"[!] Error: No se pudo actualizar el documento")
else:
print(f"[i] No hay valores nuevos que añadir")
# Aun asi contamos la imagen como procesada
else:
print(f"[!] Documento no encontrado para ruta: {ruta}")
except Exception as e:
print(f"[✘] Error actualizando documento para {ruta}: {e}")
else:
print(f"[!] No se encontraron valores para etiquetas XMP en: {ruta}")
imagenes_procesadas += 1
# Resumen final
print("\n" + "=" * 60)
print("=== RESUMEN DEL PROCESAMIENTO ===")
print("=" * 60)
print(f"Total de imágenes procesadas: {imagenes_procesadas}")
print(f"Imágenes con valores en etiquetas: {imagenes_con_valor}")
print(f"Imágenes sin valores en etiquetas: {imagenes_procesadas - imagenes_con_valor}")
print(f"Documentos actualizados: {documentos_actualizados}")
print(f"Valores agregados al campo 'personas': {valores_agregados}")
print("\n" + "Procesamiento completado. Los documentos en MongoDB han sido actualizados con el campo 'personas'.")
if __name__ == "__main__":
# Verificar dependencias
try:
import pymongo
import pyexiv2
except ImportError as e:
print(f"[✘] Falta dependencia - {e}")
print("Instala las dependencias con:")
print("pip install pymongo pyexiv2")
exit(1)
main()
Extrae todos los metadatos XMP y EXIF y los guarda en una colección separada.
Ejecución: python 3_EXTRAER_XMP/2_extraer_todos_metadatos_a_mongodb.py
3_EXTRAER_XMP/2_extraer_todos_metadatos_a_mongodb.py
#!/usr/bin/env python3
"""
Script para extraer TODOS los metadatos XMP y EXIF de imágenes almacenadas en MongoDB.
Extrae todos los metadatos XMP y EXIF de todas las imágenes en la colección target_collection
de la base de datos your_database_name y los guarda en la colección target_metadata_collection.
"""
import os
import time
from pymongo import MongoClient
import pyexiv2
def conectar_mongodb(host='localhost', port=27017, db_name='your_database_name'):
"""Conecta a la base de datos MongoDB"""
try:
client = MongoClient(host, port)
db = client[db_name]
print(f"[✔] Conectado exitosamente a MongoDB: {db_name}")
return db
except Exception as e:
print(f"[✘] Error conectando a MongoDB: {e}")
return None
def obtener_rutas_imagenes(db):
"""Obtiene todas las rutas de imágenes de la colección target_collection"""
try:
collection = db['target_collection']
cursor = collection.find({}, {'ruta': 1, '_id': 0})
rutas = [doc['ruta'] for doc in cursor if 'ruta' in doc]
print(f"[✔] Encontradas {len(rutas)} rutas de imágenes en la base de datos")
return rutas
except Exception as e:
print(f"[✘] Error obteniendo rutas de imágenes: {e}")
return []
def extraer_todos_metadatos(imagen_path):
"""Extrae todos los metadatos XMP y EXIF de una imagen"""
try:
# Verificar si el archivo existe
if not os.path.exists(imagen_path):
print(f"[✘] Archivo no encontrado: {imagen_path}")
return {}
print(f"[...] Procesando: {imagen_path}")
# Leer metadatos usando pyexiv2
metadata = pyexiv2.ImageMetadata(imagen_path)
metadata.read()
# Diccionario para almacenar todos los metadatos
all_metadata = {}
# Iterar sobre todos los metadatos disponibles
for key in metadata:
if key.startswith(('Xmp.', 'Exif.')): # Extraer metadatos XMP y EXIF
try:
valor = metadata[key]
# Convertir valor a string o mantener tipo original si es simple
if isinstance(valor, (int, float, str)):
valor_limpio = valor
else:
valor_limpio = str(valor)
all_metadata[key] = valor_limpio
print(f"[✔] Metadato encontrado: {key} = {valor_limpio}")
except Exception as e:
print(f"[!] Error extrayendo {key}: {e}")
all_metadata[key] = f"Error: {str(e)}"
if not all_metadata:
print(f"[!] No se encontraron metadatos XMP o EXIF en {imagen_path}")
return all_metadata
except Exception as e:
print(f"[✘] Error procesando {imagen_path}: {e}")
return {}
def main():
"""Función principal"""
print("=== Extractor de TODOS los metadatos XMP y EXIF desde MongoDB ===")
print("Guardar en colección: target_metadata_collection")
print("=" * 60)
# Conectar a MongoDB
db = conectar_mongodb()
if db is None:
return
# Obtener colección fuente y rutas de imágenes
collection_fuente = db['target_collection']
rutas_imagenes = obtener_rutas_imagenes(db)
if not rutas_imagenes:
print("No se encontraron rutas de imágenes en la base de datos")
return
# Nueva colección para guardar todos los metadatos
collection_metadatos = db['target_metadata_collection']
print(f"Procesando {len(rutas_imagenes)} imágenes...")
print("-" * 50)
# Contadores
imagenes_procesadas = 0
imagenes_con_metadatos = 0
documentos_insertados = 0
# Procesar cada imagen
for ruta in rutas_imagenes:
print(f"\nProcesando imagen {imagenes_procesadas + 1}/{len(rutas_imagenes)}:")
# Extraer todos los metadatos
all_metadata = extraer_todos_metadatos(ruta)
if all_metadata:
imagenes_con_metadatos += 1
# Mostrar los metadatos encontrados (resumido)
print("=" * 60)
print(f"RUTA: {ruta}")
print(f"METADATOS ENCONTRADOS ({len(all_metadata)}):")
for key, value in all_metadata.items():
print(f" {key}: {value}")
print("=" * 60)
# Verificar si ya existe un documento para esta imagen
documento_existente = collection_metadatos.find_one({"ruta_imagen": ruta})
# Preparar documento con metadatos como campos individuales
documento_metadatos = {
"ruta_imagen": ruta,
"fecha_extraccion": time.strftime("%Y-%m-%d %H:%M:%S")
}
documento_metadatos.update(all_metadata) # Agregar todos los metadatos como campos individuales
if documento_existente:
# Comparar los metadatos uno por uno para ver si han cambiado
campos_han_cambiado = False
# Crear diccionario con solo los cambios
campos_actualizar = {
"fecha_extraccion": time.strftime("%Y-%m-%d %H:%M:%S")
}
for key, new_value in all_metadata.items():
existing_value = documento_existente.get(key)
if existing_value != new_value:
campos_han_cambiado = True
campos_actualizar[key] = new_value
print(f"[✔] Campo {key} cambiado de '{existing_value}' a '{new_value}'")
if campos_han_cambiado:
print(f"[✔] Actualizando campos modificados para {ruta}")
# Actualizar el documento existente
try:
result = collection_metadatos.update_one(
{"ruta_imagen": ruta},
{"$set": campos_actualizar}
)
if result.modified_count > 0:
documentos_insertados += 1
print(f"[✔] Documento actualizado en target_metadata_collection")
else:
print(f"[!] Error: No se pudo actualizar el documento")
except Exception as e:
print(f"[✘] Error actualizando documento para {ruta}: {e}")
else:
print(f"[i] Ningún campo ha cambiado para {ruta}, saltando actualización")
else:
# Insertar nuevo documento
try:
result = collection_metadatos.insert_one(documento_metadatos)
if result.inserted_id:
documentos_insertados += 1
print(f"[✔] Documento insertado en target_metadata_collection con ID: {result.inserted_id}")
else:
print(f"[!] Error: No se pudo insertar el documento")
except Exception as e:
print(f"[✘] Error insertando documento para {ruta}: {e}")
else:
print(f"[!] No se encontraron metadatos en: {ruta}")
imagenes_procesadas += 1
# Resumen final
print("\n" + "=" * 60)
print("=== RESUMEN DEL PROCESAMIENTO ===")
print("=" * 60)
print(f"Total de imágenes procesadas: {imagenes_procesadas}")
print(f"Imágenes con metadatos: {imagenes_con_metadatos}")
print(f"Imágenes sin metadatos: {imagenes_procesadas - imagenes_con_metadatos}")
print(f"Documentos insertados en target_metadata_collection: {documentos_insertados}")
print("\n" + "Procesamiento completado. Metadatos salvados en colección 'target_metadata_collection'.")
if __name__ == "__main__":
# Verificar dependencias
try:
import pymongo
import pyexiv2
except ImportError as e:
print(f"[✘] Falta dependencia - {e}")
print("Instala las dependencias con:")
print("pip install pymongo pyexiv2")
exit(1)
main()
Funcionamiento: Este script genera estadísticas completas sobre todos los datos almacenados en la colección MongoDB especificada. Calcula estadísticas matemáticas, frecuencias de valores, letras medias y más.
Características:
Ejecución: python GENERAR_ESTADISTICAS_MONGODB/generar_estadisticas_mongodb.py
Variables de entorno:
MONGODB_URI - URI de MongoDB (por defecto: mongodb://localhost:27017/)DATABASE_NAME - Nombre de la base de datos (por defecto: album)COLLECTION_NAME - Nombre de la colección (por defecto: imagenes_2)GENERAR_ESTADISTICAS_MONGODB/generar_estadisticas_mongodb.py
"""
Generador de Estadísticas Completas para Colección MongoDB
Este módulo genera estadísticas matemáticas y categóricas completas a partir de una colección
MongoDB específica. Se conecta a una instancia de MongoDB (local o remota), recupera datos de la colección
especificada, procesa los datos en un DataFrame de pandas, calcula estadísticas detalladas
sobre nombres de archivos, dimensiones de imágenes, fechas, ubicaciones, objetos detectados
y otros campos, muestra los resultados en formato legible con colores (opcionalmente),
y guarda los resultados en un archivo .dat con marca de tiempo.
Dependencias principales:
- pymongo: Para interacción con MongoDB
- pandas: Para manipulación de datos
- colorama: Opcional, para salida coloreada en terminal (instalar con: pip install colorama)
Uso:
Ejecutar directamente: python generar_estadisticas_mongodb.py
Asegúrate de configurar las variables de entorno MONGODB_URI, DATABASE_NAME y COLLECTION_NAME
Funciones principales:
- connect_to_mongodb(): Establece conexión con MongoDB
- get_data(): Obtiene todos los documentos de la colección
- convert_to_dataframe(): Convierte datos MongoDB a DataFrame con procesamiento de fechas
- compute_statistics(): Calcula estadísticas exhaustivas sobre los datos
- print_statistics(): Imprime estadísticas estructuradas y coloreadas
- save_to_dat_file(): Guarda estadísticas en archivo .dat con timestamp
- main(): Función principal que coordina todo el flujo
"""
import pymongo
from pymongo import MongoClient
import pandas as pd
from collections import Counter
from datetime import datetime
import statistics
import sys
import os
try:
import colorama
from colorama import Fore, Style, init
colorama_available = True
except ImportError:
print("colorama no instalado. Instala con: pip install colorama")
colorama_available = False
def connect_to_mongodb():
"""
Conecta a la base de datos MongoDB.
Usa variables de entorno para configuración: MONGODB_URI, DATABASE_NAME, COLLECTION_NAME.
Si no existen, usa valores por defecto útiles para desarrollo.
"""
try:
mongodb_uri = os.getenv('MONGODB_URI', 'mongodb://localhost:27017/')
database_name = os.getenv('DATABASE_NAME', 'album')
collection_name = os.getenv('COLLECTION_NAME', 'imagenes_2')
client = MongoClient(mongodb_uri)
db = client[database_name]
collection = db[collection_name]
print(f"Conectado a MongoDB en {mongodb_uri}, base de datos: {database_name}, colección: {collection_name}")
return collection
except Exception as e:
print(f"Error conectando a MongoDB: {e}")
sys.exit(1)
def get_data(collection):
"""
Obtiene todos los documentos de la colección.
"""
try:
data = list(collection.find())
return data
except Exception as e:
print(f"Error obteniendo datos: {e}")
return []
def convert_to_dataframe(data):
"""
Convierte la lista de documentos a un DataFrame de pandas.
"""
if not data:
print("No data found in collection.")
return pd.DataFrame()
df = pd.DataFrame(data)
# Convertir fechas a objetos datetime
df['fecha_creacion'] = pd.to_datetime(df['fecha_creacion_anio'].astype(str) + '-' +
df['fecha_creacion_mes'].astype(str) + '-' +
df['fecha_creacion_dia'].astype(str) + ' ' +
df['fecha_creacion_hora'].astype(str) + ':' +
df['fecha_creacion_minuto'].astype(str), errors='coerce')
df['fecha_procesamiento'] = pd.to_datetime(df['fecha_procesamiento_anio'].astype(str) + '-' +
df['fecha_procesamiento_mes'].astype(str) + '-' +
df['fecha_procesamiento_dia'].astype(str) + ' ' +
df['fecha_procesamiento_hora'].astype(str) + ':' +
df['fecha_procesamiento_minuto'].astype(str), errors='coerce')
return df
def compute_statistics(df):
"""
Calcula estadísticas matemáticas completas basadas en los datos.
"""
if df.empty:
return {}
stats = {}
stats['total_registros'] = len(df)
# Estadísticas sobre nombre (filenames)
if 'nombre' in df.columns:
names_filtered = df['nombre'][(df['nombre'].notna()) & (df['nombre'] != "")]
stats['nombre_total_cantidad'] = len(names_filtered)
stats['nombre_cantidad_unica'] = names_filtered.nunique()
stats['nombre_null_cantidad'] = df['nombre'].isnull().sum()
stats['nombre_vacio_cantidad'] = (df['nombre'] == "").sum()
if not names_filtered.empty:
most_common_name = names_filtered.value_counts().idxmax()
stats['nombre_mas_comun'] = most_common_name
stats['nombre_mas_comun_cantidad'] = names_filtered.value_counts().iloc[0]
# Estadísticas sobre extensiones
extensions = names_filtered.str.extract(r'(\.\w+)$').fillna('')
extension_counts = extensions[extensions != ""].squeeze().value_counts()
ext_most_common = extension_counts.idxmax() if not extension_counts.empty else None
stats['nombre_extension_mas_comun'] = ext_most_common
stats['nombre_extension_cantidad_unica'] = extension_counts.nunique()
# Longitud de nombres
name_lengths = names_filtered.str.len()
stats['nombre_longitud_media'] = name_lengths.mean()
stats['nombre_longitud_min'] = name_lengths.min()
stats['nombre_longitud_max'] = name_lengths.max()
else:
stats['nombre_mas_comun'] = None
stats['nombre_mas_comun_cantidad'] = 0
stats['nombre_extension_mas_comun'] = None
stats['nombre_extension_cantidad_unica'] = 0
stats['nombre_longitud_media'] = None
stats['nombre_longitud_min'] = None
stats['nombre_longitud_max'] = None
# Estadísticas numéricas básicas
numerical_fields = ['ancho', 'alto', 'peso']
for field in numerical_fields:
if field in df.columns:
values = df[field].dropna()
if not values.empty:
stats[f'{field}_cantidad'] = len(values)
stats[f'{field}_media'] = values.mean()
stats[f'{field}_mediana'] = values.median()
stats[f'{field}_desviacion_estandar'] = values.std()
stats[f'{field}_min'] = values.min()
stats[f'{field}_max'] = values.max()
stats[f'{field}_q25'] = values.quantile(0.25)
stats[f'{field}_q75'] = values.quantile(0.75)
# Estadísticas de fechas
for date_field in ['fecha_creacion', 'fecha_procesamiento']:
if date_field in df.columns:
dates = df[date_field].dropna()
if not dates.empty:
stats[f'{date_field}_mas_temprano'] = dates.min()
stats[f'{date_field}_mas_tarde'] = dates.max()
stats[f'{date_field}_ano_mas_comun'] = dates.dt.year.value_counts().idxmax()
stats[f'{date_field}_mes_mas_comun'] = dates.dt.month.value_counts().idxmax()
stats[f'{date_field}_fecha_media'] = pd.to_datetime(dates).mean()
# Estadísticas catégoricas (ubicación)
location_fields = ['barrio', 'calle', 'ciudad', 'cp', 'pais']
for field in location_fields:
if field in df.columns:
values = df[field]
filtered_values = values[(values.notna()) & (values != "")]
stats[f'{field}_cantidad_unica'] = len(filtered_values)
stats[f'{field}_null_cantidad'] = df[field].isnull().sum() + (df[field] == "").sum()
if not filtered_values.empty:
most_common = filtered_values.value_counts().idxmax()
stats[f'{field}_mas_comun'] = most_common if most_common != "" else None
else:
stats[f'{field}_mas_comun'] = None
stats[f'{field}_vacio_cantidad'] = (df[field] == "").sum()
# objeto_procesado
if 'objeto_procesado' in df.columns:
processed_counts = df['objeto_procesado'].value_counts()
stats['objeto_procesado_verdadero_cantidad'] = processed_counts.get(True, 0)
stats['objeto_procesado_falso_cantidad'] = processed_counts.get(False, 0)
# Estadísticas de arrays
# objetos
if 'objetos' in df.columns:
all_objetos = [obj for sublist in df['objetos'].dropna() for obj in sublist] # Aplanar lista
objeto_counts = Counter(all_objetos)
stats['objetos_total_unico'] = len(objeto_counts)
stats['objetos_total_cantidad'] = sum(objeto_counts.values())
stats['objetos_mas_comun'] = objeto_counts.most_common(1)[0] if objeto_counts else None
# Estadísticas sobre longitudes de array
array_lengths = df['objetos'].dropna().apply(len)
stats['objetos_array_longitud_media'] = array_lengths.mean()
stats['objetos_array_longitud_min'] = array_lengths.min()
stats['objetos_array_longitud_max'] = array_lengths.max()
# personas
if 'personas' in df.columns:
all_personas = [persona for sublist in df['personas'].dropna() for persona in sublist]
persona_counts = Counter(all_personas)
stats['personas_total_unico'] = len(persona_counts)
stats['personas_total_cantidad'] = sum(persona_counts.values())
stats['personas_mas_comun'] = persona_counts.most_common(1)[0] if persona_counts else None
array_lengths = df['personas'].dropna().apply(len)
stats['personas_array_longitud_media'] = array_lengths.mean() if not array_lengths.empty else 0
stats['personas_array_longitud_min'] = array_lengths.min() if not array_lengths.empty else 0
stats['personas_array_longitud_max'] = array_lengths.max() if not array_lengths.empty else 0
# ruta_alternativa
if 'ruta_alternativa' in df.columns:
values = df['ruta_alternativa'].dropna()
stats['ruta_alternativa_cantidad_unica'] = values.nunique()
stats['ruta_alternativa_null_cantidad'] = df['ruta_alternativa'].isnull().sum()
ruta_most_common = values.value_counts().idxmax() if not values.empty else None
if ruta_most_common == "":
ruta_most_common = None
stats['ruta_alternativa_mas_comun'] = ruta_most_common
# coordenadas
if 'coordenadas' in df.columns:
coords = df['coordenadas'].dropna()
stats['coordenadas_no_null_cantidad'] = len(coords)
stats['coordenadas_null_cantidad'] = df['coordenadas'].isnull().sum()
# Asumiendo que son tipo dict o array [lat, lon], pero como en ejemplo es null, dejarlo así
return stats
def print_statistics(stats, collection_name):
"""
Imprime las estadísticas de forma legible con colores y estructura.
"""
if colorama_available:
init(autoreset=True)
def print_colored(text, fore_color=None, style=None):
if not colorama_available:
print(text)
else:
output = ""
if style:
output += style
if fore_color:
output += fore_color
print(f"{output}{text}")
def print_group(title, keys):
print_colored(f"\n{title}:", Fore.CYAN, Style.BRIGHT)
for key in keys:
if key in stats:
print_colored(f" {key}: {stats[key]}", Fore.WHITE, Style.NORMAL)
print_colored(f"Estadísticas Matemáticas Completas de la Colección '{collection_name}'", Fore.GREEN, Style.BRIGHT)
print_colored("=" * 80, Fore.YELLOW)
# Grupos de estadísticas
general = ['total_registros']
nombre_stats = [k for k in stats if k.startswith('nombre_')]
numericas = [k for k in stats if any(k.startswith(f'{f}_') for f in ['ancho', 'alto', 'peso'])]
fechas = [k for k in stats if '_fecha' in k or 'ano' in k or 'mes' in k or k.endswith('_temprano') or k.endswith('_tarde')]
ubicacion = [k for k in stats if any(k.startswith(f'{f}_') for f in ['barrio', 'calle', 'ciudad', 'cp', 'pais'])]
processed = [k for k in stats if 'objeto_procesado' in k]
arrays = [k for k in stats if 'objetos_' in k or 'personas_' in k]
other = ['ruta_alternativa_cantidad_unica', 'ruta_alternativa_null_cantidad', 'ruta_alternativa_mas_comun', 'coordenadas_no_null_cantidad', 'coordenadas_null_cantidad']
print_group("Estadísticas Generales", general)
print_group("Estadísticas del Nombre", nombre_stats)
print_group("Estadísticas Numéricas", numericas)
print_group("Estadísticas de Fechas", fechas)
print_group("Estadísticas de Ubicación", ubicacion)
print_group("Procesamiento", processed)
print_group("Arreglos y Listas", arrays)
print_group("Otros", other)
print_colored("\n" + "=" * 80, Fore.YELLOW)
def save_to_dat_file(stats, db_name, collection_name):
"""
Guarda las estadísticas en un archivo .dat.
"""
current_date = datetime.now().strftime('%Y%m%d_%H%M%S')
filename = f"{db_name}_{collection_name}_{current_date}.dat"
try:
with open(filename, 'w') as f:
f.write(f"Estadísticas Matemáticas Completas de la Colección '{collection_name}'\n")
f.write("=" * 80 + "\n")
for key, value in stats.items():
f.write(f"{key}: {value}\n")
f.write("=" * 80 + "\n")
print(f"Estadísticas guardadas en {filename}")
except Exception as e:
print(f"Error guardando archivo: {e}")
def main():
"""
Función principal del script.
"""
mongodb_uri = os.getenv('MONGODB_URI', 'mongodb://localhost:27017/')
database_name = os.getenv('DATABASE_NAME', 'album')
collection_name = os.getenv('COLLECTION_NAME', 'imagenes_2')
collection = connect_to_mongodb()
data = get_data(collection)
df = convert_to_dataframe(data)
stats = compute_statistics(df)
print_statistics(stats, collection_name)
save_to_dat_file(stats, database_name, collection_name)
if __name__ == "__main__":
main()
Este sistema de organización de imágenes proporciona una solución completa para el procesamiento de grandes colecciones fotográficas. Siguiendo el orden establecido, se puede procesar automáticamente todo el contenido, extraer metadatos valiosos y generar reportes estadísticos detallados.
Recomendaciones: