#!/usr/bin/env python3
"""
Sync dataset documentation (README.md and metadata.json) from the S3 filestore to the database.

This script fetches README.md and metadata.json files from the S3 filestore
and caches their contents in the datasets table for faster access.

Usage:

    # Sync all datasets
    uv run python -m dataio.scripts.sync_dataset_documentation

    # Sync a specific dataset
    uv run python -m dataio.scripts.sync_dataset_documentation --dataset DS_EXAMPLE01

    # Dry run (show what would be synced)
    uv run python -m dataio.scripts.sync_dataset_documentation --dry-run
"""
import argparse
import logging
from datetime import datetime, timezone
from typing import Optional
import os
import sys
import boto3
from botocore.exceptions import ClientError
import dotenv
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
dotenv.load_dotenv()
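# All configuration comes from environment variables (optionally loaded from a
# local .env file above). A minimal sketch of the expected variables, with
# placeholder values, based on the os.getenv() calls in this script:
#
#   DB_HOST=localhost
#   DB_PORT=5432
#   DB_USER=postgres
#   DB_PASSWORD=change-me
#   DB_NAME=catalogue
#   AWS_ACCESS_KEY=<access key id>
#   AWS_SECRET_ACCESS_KEY=<secret key>
#   AWS_BUCKET_NAME=<bucket containing the filestore/ prefix>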
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
def get_database_url() -> str:
"""Build database URL from environment variables."""
host = os.getenv("DB_HOST", "localhost")
port = os.getenv("DB_PORT", "5432")
user = os.getenv("DB_USER", "postgres")
password = os.getenv("DB_PASSWORD", "")
database = os.getenv("DB_NAME", "catalogue")
return f"postgresql://{user}:{password}@{host}:{port}/{database}"
def get_s3_client():
"""Initialize S3 client."""
session = boto3.Session(
aws_access_key_id=os.getenv("AWS_ACCESS_KEY"),
aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
)
s3 = session.resource("s3")
bucket = s3.Bucket(os.getenv("AWS_BUCKET_NAME"))
return bucket
def fetch_file_from_s3(bucket, dataset_id: str, filename: str) -> Optional[str]:
"""
    Fetch a documentation file from S3 for a dataset.

    Checks the STANDARDISED version first, then falls back to PREPROCESSED.
    Returns the file content as a string, or None if the file is not found.
"""
for version_type in ["STANDARDISED", "PREPROCESSED"]:
key = f"filestore/{version_type}/{dataset_id}/{filename}"
try:
obj = bucket.Object(key)
content = obj.get()["Body"].read().decode("utf-8")
logger.debug(f"Found {filename} at {key}")
return content
except ClientError as e:
if e.response["Error"]["Code"] == "NoSuchKey":
continue
logger.warning(f"Error fetching {key}: {e}")
except Exception as e:
logger.warning(f"Unexpected error fetching {key}: {e}")
return None
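# Object keys are expected to follow the filestore layout used above, e.g.
# (DS_EXAMPLE01 is the placeholder dataset ID from the module docstring):
#   filestore/STANDARDISED/DS_EXAMPLE01/README.md
#   filestore/PREPROCESSED/DS_EXAMPLE01/metadata.json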
def sync_dataset_documentation(
db_session,
bucket,
dataset_id: str,
dry_run: bool = False
) -> dict:
"""
    Sync documentation for a single dataset.

    Returns a dict with the sync results.
"""
result = {
"ds_id": dataset_id,
"readme_found": False,
"data_dictionary_found": False,
"updated": False,
"error": None,
}
try:
# Fetch README.md
readme_content = fetch_file_from_s3(bucket, dataset_id, "README.md")
if readme_content:
result["readme_found"] = True
logger.info(f" Found README.md ({len(readme_content)} chars)")
# Fetch metadata.json (data dictionary)
data_dict_content = fetch_file_from_s3(bucket, dataset_id, "metadata.json")
if data_dict_content:
result["data_dictionary_found"] = True
logger.info(f" Found metadata.json ({len(data_dict_content)} chars)")
        # Update the database if any content was found.
        # Note: if one of the files is missing, its column is set to NULL,
        # replacing any previously cached value.
        if readme_content or data_dict_content:
            if not dry_run:
update_query = text("""
UPDATE datasets
SET readme_md = :readme,
data_dictionary_json = :data_dict,
documentation_synced_at = :synced_at
WHERE ds_id = :ds_id
""")
db_session.execute(update_query, {
"readme": readme_content,
"data_dict": data_dict_content,
"synced_at": datetime.utcnow(),
"ds_id": dataset_id,
})
db_session.commit()
result["updated"] = True
logger.info(f" Updated database")
else:
logger.info(f" [DRY RUN] Would update database")
result["updated"] = True
else:
logger.info(f" No documentation files found")
except Exception as e:
result["error"] = str(e)
logger.error(f" Error: {e}")
db_session.rollback()
return result
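# The UPDATE in sync_dataset_documentation() assumes the datasets table has at
# least the columns below; the types are a sketch inferred from the query, not
# the authoritative schema:
#
#   ds_id                      TEXT (primary key)
#   readme_md                  TEXT
#   data_dictionary_json       TEXT
#   documentation_synced_at    TIMESTAMP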
def main():
parser = argparse.ArgumentParser(
description="Sync dataset documentation from S3 to database"
)
parser.add_argument(
"--dataset", "-d",
help="Sync only this dataset ID (e.g., DS_EXAMPLE01)"
)
parser.add_argument(
"--dry-run", "-n",
action="store_true",
help="Show what would be synced without making changes"
)
parser.add_argument(
"--verbose", "-v",
action="store_true",
help="Enable verbose output"
)
args = parser.parse_args()
if args.verbose:
logging.getLogger().setLevel(logging.DEBUG)
# Initialize database connection
logger.info("Connecting to database...")
engine = create_engine(get_database_url())
Session = sessionmaker(bind=engine)
db_session = Session()
# Initialize S3 client
logger.info("Connecting to S3...")
try:
bucket = get_s3_client()
except Exception as e:
logger.error(f"Failed to connect to S3: {e}")
sys.exit(1)
# Get datasets to sync
if args.dataset:
# Sync specific dataset
datasets = [(args.dataset,)]
logger.info(f"Syncing documentation for dataset: {args.dataset}")
else:
# Get all dataset IDs
result = db_session.execute(text("SELECT ds_id FROM datasets ORDER BY ds_id"))
datasets = result.fetchall()
logger.info(f"Found {len(datasets)} datasets to sync")
if args.dry_run:
logger.info("=== DRY RUN MODE ===")
# Sync each dataset
results = {
"total": len(datasets),
"readme_found": 0,
"data_dict_found": 0,
"updated": 0,
"errors": 0,
}
for (ds_id,) in datasets:
logger.info(f"Processing {ds_id}...")
result = sync_dataset_documentation(db_session, bucket, ds_id, args.dry_run)
if result["readme_found"]:
results["readme_found"] += 1
if result["data_dictionary_found"]:
results["data_dict_found"] += 1
if result["updated"]:
results["updated"] += 1
if result["error"]:
results["errors"] += 1
# Print summary
logger.info("=" * 50)
logger.info("SYNC SUMMARY")
logger.info(f" Total datasets: {results['total']}")
logger.info(f" READMEs found: {results['readme_found']}")
logger.info(f" Data dictionaries: {results['data_dict_found']}")
logger.info(f" Datasets updated: {results['updated']}")
logger.info(f" Errors: {results['errors']}")
db_session.close()
if results["errors"] > 0:
sys.exit(1)
if __name__ == "__main__":
main()