Source code for dataio.scripts.sync_dataset_documentation
#!/usr/bin/env python3"""Sync dataset documentation (README.md and manifest files) from S3 file server to database.This script fetches README.md and manifest files from the S3 filestoreand caches their contents in the datasets table for faster access.Usage: # Sync all datasets uv run python -m dataio.scripts.sync_dataset_documentation # Sync specific dataset uv run python -m dataio.scripts.sync_dataset_documentation --dataset DS_EXAMPLE01 # Dry run (show what would be synced) uv run python -m dataio.scripts.sync_dataset_documentation --dry-run"""importargparseimportloggingimportosimportsysimportboto3importdotenvfromsqlalchemyimportcreate_engine,textfromsqlalchemy.ormimportsessionmakerfromdataio.api.services.base_serviceimportget_aws_access_key_idfromdataio.api.services.dataset_documentation_sync_serviceimport(get_dataset_documentation_status,sync_dataset_documentation,)dotenv.load_dotenv()# Configure logginglogging.basicConfig(level=logging.INFO,format="%(asctime)s - %(levelname)s - %(message)s")logger=logging.getLogger(__name__)
def get_database_url() -> str:
    """Build the PostgreSQL connection URL from environment variables.

    Reads ``DB_HOST``, ``DB_PORT``, ``DB_USER``, ``DB_PASSWORD`` and
    ``DB_NAME``, falling back to ``localhost``, ``5432``, ``postgres``,
    an empty password and ``catalogue`` respectively.

    The user name and password are percent-encoded so that reserved
    characters such as ``@``, ``/``, ``:`` or ``%`` cannot corrupt the URL.
    Values made only of unreserved characters are emitted unchanged.
    The environment variables are therefore expected to hold the raw,
    un-encoded credentials.

    Returns:
        A URL of the form ``postgresql://user:password@host:port/database``.
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import quote

    host = os.getenv("DB_HOST", "localhost")
    port = os.getenv("DB_PORT", "5432")
    # safe="" also encodes "/" (quote() leaves it alone by default); using
    # quote rather than quote_plus avoids emitting a literal "+" for spaces.
    user = quote(os.getenv("DB_USER", "postgres"), safe="")
    password = quote(os.getenv("DB_PASSWORD", ""), safe="")
    database = os.getenv("DB_NAME", "catalogue")
    return f"postgresql://{user}:{password}@{host}:{port}/{database}"
def main():
    """Command-line entry point: sync dataset documentation into the database.

    Parses ``--dataset/-d``, ``--dry-run/-n`` and ``--verbose/-v``, opens a
    database session and an S3 handle, then calls
    ``get_dataset_documentation_status`` and ``sync_dataset_documentation``
    for either the single requested dataset or every ``ds_id`` in the
    ``datasets`` table. A summary is logged at the end.

    The database session is always closed, including on early exit.

    Raises:
        SystemExit: with status 1 if the S3 handle cannot be obtained or if
            at least one dataset failed to sync.
    """
    parser = argparse.ArgumentParser(
        description="Sync dataset documentation from S3 to database"
    )
    parser.add_argument(
        "--dataset", "-d", help="Sync only this dataset ID (e.g., DS_EXAMPLE01)"
    )
    parser.add_argument(
        "--dry-run",
        "-n",
        action="store_true",
        help="Show what would be synced without making changes",
    )
    parser.add_argument(
        "--verbose", "-v", action="store_true", help="Enable verbose output"
    )
    args = parser.parse_args()

    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    # Initialize database connection
    logger.info("Connecting to database...")
    engine = create_engine(get_database_url())
    Session = sessionmaker(bind=engine)
    db_session = Session()

    # Everything after the session is opened runs under try/finally so the
    # session is released even when we bail out through sys.exit() or an
    # unexpected exception.
    try:
        # Initialize S3 client
        logger.info("Connecting to S3...")
        try:
            # NOTE(review): get_s3_client is neither defined nor imported in
            # the visible part of this module. If it really is missing, the
            # resulting NameError is caught below and reported as an S3
            # connection failure -- confirm where this helper should come from.
            bucket = get_s3_client()
        except Exception as e:
            logger.error(f"Failed to connect to S3: {e}")
            sys.exit(1)

        # Get datasets to sync
        if args.dataset:
            # Single-element tuple mirrors the row shape returned by the query
            # below, so the loop can unpack both the same way.
            datasets = [(args.dataset,)]
            logger.info(f"Syncing documentation for dataset: {args.dataset}")
        else:
            rows = db_session.execute(
                text("SELECT ds_id FROM datasets ORDER BY ds_id")
            )
            datasets = rows.fetchall()
            logger.info(f"Found {len(datasets)} datasets to sync")

        if args.dry_run:
            logger.info("=== DRY RUN MODE ===")

        # Fields that count towards the "data dictionary" tally.
        data_dict_fields = ("data_dictionary_json", "manifest_yaml", "manifest_json")

        results = {
            "total": len(datasets),
            "readme_found": 0,
            "data_dict_found": 0,
            "updated": 0,
            "errors": 0,
        }

        # Sync each dataset
        for (ds_id,) in datasets:
            logger.info(f"Processing {ds_id}...")
            try:
                status = get_dataset_documentation_status(db_session, bucket, ds_id)
                outcome = sync_dataset_documentation(
                    db_session,
                    bucket,
                    ds_id,
                    dry_run=args.dry_run,
                )
            except Exception as e:
                # Discard any partial work so the next dataset starts clean.
                db_session.rollback()
                outcome = {
                    "ds_id": ds_id,
                    "updated": False,
                    # str(e) can be empty (e.g. a bare ``raise ValueError()``);
                    # fall back to repr so the failure is still counted below.
                    "error": str(e) or repr(e),
                    "has_remote_documentation": False,
                }
                status = {"changed_fields": []}
                logger.error(f" Error: {e}")

            changed_fields = status["changed_fields"]
            has_remote = outcome.get("has_remote_documentation")

            if "readme_md" in changed_fields or has_remote:
                results["readme_found"] += 1

            if any(field in changed_fields for field in data_dict_fields) or has_remote:
                results["data_dict_found"] += 1

            if outcome.get("updated"):
                results["updated"] += 1

            if outcome.get("error"):
                results["errors"] += 1

        # Print summary
        logger.info("=" * 50)
        logger.info("SYNC SUMMARY")
        logger.info(f" Total datasets: {results['total']}")
        logger.info(f" READMEs found: {results['readme_found']}")
        logger.info(f" Data dictionaries: {results['data_dict_found']}")
        logger.info(f" Datasets updated: {results['updated']}")
        logger.info(f" Errors: {results['errors']}")
    finally:
        db_session.close()

    # Non-zero exit status lets schedulers/CI detect a partially failed sync.
    if results["errors"] > 0:
        sys.exit(1)