from prefect import flow, task, get_run_logger
from prefect_dbt import PrefectDbtRunner, PrefectDbtSettings
from prefect.blocks.system import Secret
import zipfile
import tempfile
import requests
import os
import json
from pathlib import Path
from typing import Any, Dict, Optional, Tuple
DBT_PROJECT_PATH = "/path/to/your/dbt/project"
DBT_PROFILES_PATH = "/path/to/your/dbt/profiles"
# Configuration: secrets are stored in Prefect Secret Blocks.
# These can be swapped for another secret manager or environment variables.
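# One-time setup for the Secret blocks (hypothetical values; run once, e.g. in a
# Python REPL or a setup script):
#   from prefect.blocks.system import Secret
#   Secret(value="https://<euno-webhook-endpoint>").save("euno-webhook-url")
#   Secret(value="<euno-integration-token>").save("euno-integration-token")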
@task
def get_euno_integration_credentials() -> Dict[str, str]:
"""Retrieve Euno Integration credentials from Prefect Secret Blocks."""
return {
"webhook_url": Secret.load("euno-webhook-url").get(),
"token": Secret.load("euno-integration-token").get(),
}
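# Sketch of the environment-variable alternative mentioned above (the variable
# names EUNO_WEBHOOK_URL / EUNO_INTEGRATION_TOKEN are assumed, not prescribed):
#
#   return {
#       "webhook_url": os.environ["EUNO_WEBHOOK_URL"],
#       "token": os.environ["EUNO_INTEGRATION_TOKEN"],
#   }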
@task
def dbt_build(project_dir: str, profiles_dir: str) -> Path:
"""Run dbt build and docs generate commands."""
logger = get_run_logger()
try:
logger.info(f"Running dbt commands in {project_dir}")
        # Build the runner once and reuse it for both commands
        runner = PrefectDbtRunner(
            settings=PrefectDbtSettings(
                project_dir=project_dir,
                profiles_dir=profiles_dir,
            )
        )
        # Run dbt build
        runner.invoke(["build"])
        # Run dbt docs generate (produces catalog.json)
        runner.invoke(["docs", "generate"])
target_path = Path(project_dir, "target")
logger.info(f"dbt commands completed successfully. Target path: {target_path}")
return target_path
except Exception as e:
logger.error(f"dbt build failed: {str(e)}")
raise
@task
def collect_and_zip_artifacts(path: Path) -> str:
"""Collect dbt artifacts and create a zip file with validation."""
logger = get_run_logger()
# List of required artifact files
artifact_files = ["manifest", "catalog", "run_results", "semantic_manifest"]
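    # manifest.json, run_results.json and semantic_manifest.json are written by
    # `dbt build`; catalog.json is only produced by `dbt docs generate`.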
# Check if all required files exist
missing_files = []
for f in artifact_files:
file_path = path / f"{f}.json"
if not file_path.exists():
missing_files.append(f"{f}.json")
if missing_files:
logger.warning(f"Missing artifact files: {missing_files}")
# Check if we have at least manifest and catalog (minimum required)
required_files = ["manifest", "catalog"]
missing_required = [f for f in required_files if f"{f}.json" in missing_files]
if missing_required:
logger.error(f"Missing required files: {missing_required}")
raise FileNotFoundError(f"Required artifact files missing: {missing_required}")
    # mkstemp replaces the deprecated, race-prone tempfile.mktemp; close the
    # descriptor and let ZipFile reopen the path for writing.
    fd, temp_file_name = tempfile.mkstemp(suffix=".zip")
    os.close(fd)
logger.info(f"Creating zip file: {temp_file_name}")
try:
with zipfile.ZipFile(
temp_file_name, mode="w", compression=zipfile.ZIP_DEFLATED
) as archive:
for f in artifact_files:
file_path = path / f"{f}.json"
if file_path.exists():
archive.write(file_path, arcname=f"{f}.json")
logger.info(f"Added {f}.json to zip")
else:
logger.warning(f"Skipping {f}.json - file not found")
# Get file size for logging
file_size = os.path.getsize(temp_file_name)
logger.info(f"Zip file created successfully. Size: {file_size} bytes")
return temp_file_name
except Exception as e:
# Clean up on error
logger.error(f"Error creating zip file: {str(e)}")
if os.path.exists(temp_file_name):
os.unlink(temp_file_name)
raise
@task
def get_signed_url(webhook_url: str, token: str, filename: str) -> Tuple[str, Dict[str, Any]]:
"""Request a signed upload URL for the zip file."""
logger = get_run_logger()
headers = {"Authorization": f"Bearer {token}"}
body = {"filename": filename}
logger.info(f"Requesting signed upload URL from: {webhook_url}")
response = requests.post(webhook_url, headers=headers, json=body, timeout=30)
logger.info(f"Signed URL request status code: {response.status_code}")
    if not response.ok:
        logger.error(f"Failed to obtain signed URL (status {response.status_code}): {response.text[:500]}")
    response.raise_for_status()
payload = response.json()
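    # Assumed response shape, inferred from the fields read below:
    #   {"upload": {"url": "https://<signed-upload-url>"}, ...}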
upload_url = payload.get("upload", {}).get("url")
if not upload_url:
logger.error(f"Signed upload URL not found in response: {json.dumps(payload)[:500]}")
raise ValueError("Signed upload URL not found in response")
logger.info("Signed URL obtained successfully")
return upload_url, payload
@task
def upload_file(zip_file_path: str, upload_url: str) -> bool:
"""Upload the zip file to the signed URL."""
logger = get_run_logger()
logger.info("Uploading ZIP file to signed URL")
try:
with open(zip_file_path, "rb") as fp:
response = requests.put(
upload_url,
data=fp,
headers={"content-type": "application/zip"},
timeout=300 # 5 minute timeout for large files
)
logger.info(f"Upload status code: {response.status_code}")
        if not response.ok:
            logger.error(f"Upload failed (status {response.status_code}): {response.text[:200]}")
        response.raise_for_status()
logger.info("ZIP file uploaded successfully")
return True
except Exception as e:
logger.error(f"An error occurred during upload: {e}")
raise
@flow(log_prints=True)
def dbt_euno_upload_flow():
"""Enhanced Prefect flow to run dbt and upload artifacts to Euno."""
logger = get_run_logger()
# Get secrets from Prefect Blocks (recommended for production)
euno_integration_credentials = get_euno_integration_credentials()
dbt_project_path = Secret.load("dbt-project-path").get() # or use DBT_PROJECT_PATH
dbt_profiles_path = Secret.load("dbt-profiles-path").get() # or use DBT_PROFILES_PATH
logger.info("Starting enhanced dbt build and upload flow...")
try:
# Run dbt build and docs generate
target_path = dbt_build(
project_dir=dbt_project_path,
profiles_dir=dbt_profiles_path,
)
# Collect artifacts and create zip
artifacts_zip_path = collect_and_zip_artifacts(target_path)
if artifacts_zip_path:
try:
# Get signed URL
upload_url, meta = get_signed_url(
euno_integration_credentials["webhook_url"],
euno_integration_credentials["token"],
Path(artifacts_zip_path).name
)
# Upload file
success = upload_file(artifacts_zip_path, upload_url)
if success:
logger.info("Upload succeeded! Prepare-upload response:")
logger.info(json.dumps(meta, indent=2))
return True
else:
logger.error("Upload failed.")
return False
finally:
# Clean up temporary zip file after upload attempt
if os.path.exists(artifacts_zip_path):
try:
os.unlink(artifacts_zip_path)
logger.info(f"Cleaned up temporary file: {artifacts_zip_path}")
except Exception as e:
logger.warning(f"Failed to clean up temporary file: {str(e)}")
else:
logger.error("No artifacts collected, skipping upload")
return None
except Exception as e:
logger.error(f"Flow failed: {str(e)}")
raise
if __name__ == "__main__":
dbt_euno_upload_flow()
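# Run the flow locally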
python your_flow_file.py
# Prefect 2.x (legacy): create a deployment
prefect deployment build your_flow_file.py:dbt_euno_upload_flow -n "dbt-euno-upload"
# Apply the deployment
prefect deployment apply dbt_euno_upload_flow-deployment.yaml
# Run the deployment
prefect deployment run dbt-euno-upload-flow/dbt-euno-upload
# Create a work pool (one-time setup, required in Prefect 3.x)
prefect work-pool create dbt-pool --type process
# Deploy the flow
prefect deploy your_flow_file.py:dbt_euno_upload_flow --name dbt-euno-upload --pool dbt-pool
# Run the deployment
prefect deployment run dbt-euno-upload-flow/dbt-euno-upload
# Prefect 2.x: schedule the flow with Deployment.build_from_flow
# (this API was removed in Prefect 3.x)
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule
deployment = Deployment.build_from_flow(
flow=dbt_euno_upload_flow,
name="dbt-euno-daily",
schedule=CronSchedule(cron="0 6 * * *"), # Daily at 6 AM
)
deployment.apply()
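# Prefect 3.x removed Deployment.build_from_flow; the closest equivalent is
# Flow.serve. A minimal sketch (assumes the flow is importable as below):
dbt_euno_upload_flow.serve(
    name="dbt-euno-daily",
    cron="0 6 * * *",  # Daily at 6 AM
)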