from __future__ import annotations
import json
import os
from pathlib import Path
import dotenv
import requests
from dataio.validate.contracts.models import DatasetKind, ValidationRequest
from dataio.validate.reports.models import ValidationResult
from dataio.validate.service import DataIOValidationService
class DataIOValidator:
    """Validate tabular and GeoJSON datasets locally or through a DataIO API.

    Calls made without ``deep_check`` are handed to an in-process
    ``DataIOValidationService``. Calls made with ``deep_check=True`` are sent
    over HTTP to the API configured through ``api_base_url``.

    The validator owns a ``requests.Session``. Call :meth:`close`, or use the
    instance as a context manager, to close that session when done.
    """

    def __init__(
        self,
        *,
        api_base_url: str | None = None,
        api_key: str | None = None,
        timeout: int = 30,
    ):
        """Load environment configuration and prepare the HTTP session.

        Args:
            api_base_url: Base URL of the DataIO API. When ``None``, the
                ``DATAIO_API_BASE_URL`` environment variable is used.
            api_key: Value sent in the ``X-API-Key`` header. When empty or
                ``None``, the ``DATAIO_API_KEY`` environment variable is used;
                if neither yields a value, no header is set.
            timeout: Passed as ``timeout`` to every API request.
        """
        # override=True lets values from a .env file replace variables that
        # are already present in the process environment.
        dotenv.load_dotenv(override=True)
        self.service = DataIOValidationService()
        self.api_base_url = (
            api_base_url if api_base_url is not None else os.getenv("DATAIO_API_BASE_URL")
        )
        self.timeout = timeout
        self.session = requests.Session()
        # Resolve the key once: explicit argument first, environment second.
        resolved_key = api_key or os.getenv("DATAIO_API_KEY")
        if resolved_key:
            self.session.headers.update({"X-API-Key": resolved_key})

    def close(self) -> None:
        """Close the underlying HTTP session."""
        self.session.close()

    def __enter__(self) -> DataIOValidator:
        """Return the validator itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, *exc_info: object) -> None:
        """Close the HTTP session on leaving a ``with`` block."""
        self.close()

    def validate_tabular(
        self,
        *,
        manifest: str | bytes | dict,
        data_files: dict[str, str],
        deep_check: bool = False,
        full_scan: bool = True,
        max_rows: int | None = None,
        extra_column_policy: str = "warn",
    ) -> ValidationResult:
        """Validate tabular data against a manifest.

        Args:
            manifest: Manifest source, forwarded as ``manifest_source``.
            data_files: Mapping of table name to its data source.
            deep_check: When true, validation is delegated to the remote API
                instead of the local service.
            full_scan: Forwarded to the local ``ValidationRequest``.
            max_rows: Forwarded to the local ``ValidationRequest``.
            extra_column_policy: Forwarded to the local ``ValidationRequest``.

        Returns:
            The validation result from the local service or the remote API.

        Raises:
            ValueError: If ``deep_check`` is requested and no API base URL is
                configured, or the API request fails.

        Note:
            ``full_scan``, ``max_rows`` and ``extra_column_policy`` are only
            used on the local path; the ``deep_check`` request does not send
            them.
        """
        if deep_check:
            return self._validate_tabular_via_api(
                manifest=manifest,
                data_files=data_files,
                deep_check=deep_check,
            )
        request = ValidationRequest(
            dataset_kind=DatasetKind.TABULAR,
            manifest_source=manifest,
            data_files=data_files,
            deep_check=deep_check,
            full_scan=full_scan,
            max_rows=max_rows,
            extra_column_policy=extra_column_policy,
        )
        return self.service.validate(request)

    def validate_geojson(
        self,
        *,
        manifest: str | bytes | dict,
        data: str | bytes | dict,
        deep_check: bool = False,
    ) -> ValidationResult:
        """Validate GeoJSON data against a manifest.

        Args:
            manifest: Manifest source, forwarded as ``manifest_source``.
            data: GeoJSON source.
            deep_check: When true, validation is delegated to the remote API
                instead of the local service.

        Returns:
            The validation result from the local service or the remote API.

        Raises:
            ValueError: If ``deep_check`` is requested and no API base URL is
                configured, or the API request fails.
        """
        if deep_check:
            return self._validate_geojson_via_api(
                manifest=manifest,
                data=data,
                deep_check=deep_check,
            )
        request = ValidationRequest(
            dataset_kind=DatasetKind.GEOJSON,
            manifest_source=manifest,
            data=data,
            deep_check=deep_check,
        )
        return self.service.validate(request)

    def _require_api_base_url(self) -> str:
        """Return the configured API base URL without trailing slashes.

        Raises:
            ValueError: If no base URL was passed in or found in the
                environment (an empty string counts as missing).
        """
        if not self.api_base_url:
            raise ValueError(
                "deep_check requires API access. Set DATAIO_API_BASE_URL or pass api_base_url="
            )
        return self.api_base_url.rstrip("/")

    @staticmethod
    def _read_bytes_or_inline(source: str | bytes | dict) -> bytes:
        """Turn a path, inline text, raw bytes or a mapping into bytes.

        ``bytes`` are returned unchanged. A ``str`` naming an existing regular
        file yields that file's contents; any other ``str`` is treated as
        inline content and UTF-8 encoded. Everything else is serialized with
        ``json.dumps``.
        """
        if isinstance(source, bytes):
            return source
        if isinstance(source, str):
            try:
                candidate = Path(source)
                # is_file() (not exists()) so directories -- including "",
                # which resolves to "." -- and special files are never read.
                if candidate.is_file():
                    return candidate.read_bytes()
            except OSError:
                # Best effort: an unusable or unreadable path (e.g. inline
                # content too long to be a file name) is treated as inline text.
                pass
            return source.encode("utf-8")
        return json.dumps(source).encode("utf-8")

    @staticmethod
    def _read_text_or_inline(source: str) -> str:
        """Return the UTF-8 text of ``source`` if it names a regular file.

        Any other value is returned unchanged as inline content. A file that
        is not valid UTF-8 raises ``UnicodeDecodeError``.
        """
        try:
            candidate = Path(source)
            if candidate.is_file():
                return candidate.read_text(encoding="utf-8")
        except OSError:
            # Best effort: fall back to treating the value as inline content.
            pass
        return source

    def _normalize_manifest_bytes(self, manifest: str | bytes | dict) -> bytes:
        """Return the manifest as bytes suitable for a multipart upload."""
        return self._read_bytes_or_inline(manifest)

    def _normalize_tabular_payload(self, data_files: dict[str, str]) -> dict[str, str]:
        """Replace each file-path entry of ``data_files`` with the file's text.

        Entries that do not name an existing regular file are kept as given.
        The input mapping is not modified.
        """
        return {
            table_name: self._read_text_or_inline(source)
            for table_name, source in data_files.items()
        }

    def _normalize_geojson_bytes(self, data: str | bytes | dict) -> bytes:
        """Return the GeoJSON payload as bytes suitable for a multipart upload."""
        return self._read_bytes_or_inline(data)

    def _post_for_result(
        self,
        base_url: str,
        endpoint: str,
        *,
        files: dict,
        data: dict[str, str],
    ) -> ValidationResult:
        """POST a multipart validation request and parse the JSON response.

        Args:
            base_url: API base URL without a trailing slash.
            endpoint: Path appended to ``base_url`` (for example ``/validate``).
            files: Multipart file parts, passed to ``Session.post(files=...)``.
            data: Form fields, passed to ``Session.post(data=...)``.

        Raises:
            ValueError: If a ``requests.RequestException`` is raised while
                sending the request or reading the response, including
                non-success HTTP statuses reported by ``raise_for_status``.
        """
        try:
            response = self.session.post(
                f"{base_url}{endpoint}",
                files=files,
                data=data,
                timeout=self.timeout,
            )
            response.raise_for_status()
            return ValidationResult.model_validate(response.json())
        except requests.RequestException as exc:
            raise ValueError(
                "deep_check API request failed. "
                f"Verify DATAIO_API_BASE_URL points to a DataIO API with {endpoint} available. "
                f"Original error: {exc}"
            ) from exc

    def _validate_tabular_via_api(
        self,
        *,
        manifest: str | bytes | dict,
        data_files: dict[str, str],
        deep_check: bool,
    ) -> ValidationResult:
        """Validate tabular data by posting it to the API's ``/validate`` route.

        The manifest is uploaded as a file part; the table payload and the
        ``deep_check`` flag are sent as JSON-encoded form fields.

        Raises:
            ValueError: If no API base URL is configured or the request fails.
        """
        # Check configuration first so a missing URL is reported before any
        # file is read.
        base_url = self._require_api_base_url()
        return self._post_for_result(
            base_url,
            "/validate",
            files={
                "manifest_file": (
                    "manifest.yaml",
                    self._normalize_manifest_bytes(manifest),
                    "application/x-yaml",
                ),
            },
            data={
                "dataset_kind": DatasetKind.TABULAR.value,
                "data_files": json.dumps(self._normalize_tabular_payload(data_files)),
                "deep_check": json.dumps(deep_check),
            },
        )

    def _validate_geojson_via_api(
        self,
        *,
        manifest: str | bytes | dict,
        data: str | bytes | dict,
        deep_check: bool,
    ) -> ValidationResult:
        """Validate GeoJSON by posting it to the API's ``/validate/geojson`` route.

        The manifest and the GeoJSON document are uploaded as file parts; the
        ``deep_check`` flag is sent as a JSON-encoded form field.

        Raises:
            ValueError: If no API base URL is configured or the request fails.
        """
        # Check configuration first so a missing URL is reported before any
        # file is read.
        base_url = self._require_api_base_url()
        return self._post_for_result(
            base_url,
            "/validate/geojson",
            files={
                "manifest_file": (
                    "manifest.yaml",
                    self._normalize_manifest_bytes(manifest),
                    "application/x-yaml",
                ),
                "geojson": (
                    "data.geojson",
                    self._normalize_geojson_bytes(data),
                    "application/geo+json",
                ),
            },
            data={"deep_check": json.dumps(deep_check)},
        )