from prefect import flow, task, get_run_logger
from prefect_dbt.cli.commands import DbtCoreOperation
from prefect.blocks.system import Secret
import zipfile
import tempfile
import requests
import os
from pathlib import Path
from typing import List, Optional
# Configuration: dbt project and profiles directories.
# Each can be overridden through an environment variable (DBT_PROJECT_DIR /
# DBT_PROFILES_DIR); otherwise the placeholder paths below are used and should
# be edited for your environment. Credentials (EUNO_WEBHOOK_URL / EUNO_TOKEN)
# are read inside the flow, not here.
DBT_PROJECT_PATH = os.getenv("DBT_PROJECT_DIR", "/path/to/your/dbt/project")
DBT_PROFILES_PATH = os.getenv("DBT_PROFILES_DIR", "/path/to/your/dbt/profiles")
@task(retries=2, retry_delay_seconds=30)
def dbt_build(project_dir: str, profiles_dir: str) -> Path:
    """Execute ``dbt build`` followed by ``dbt docs generate`` for a project.

    The ``@task`` decorator configures Prefect to retry this task up to two
    times, waiting 30 seconds between attempts.

    Args:
        project_dir: Directory of the dbt project to run.
        profiles_dir: Directory containing the dbt ``profiles.yml`` to use.

    Returns:
        ``<project_dir>/target``. Its existence is not checked here.

    Raises:
        Any exception raised while building or running the
        ``DbtCoreOperation``; it is logged first and then re-raised unchanged.
    """
    log = get_run_logger()
    dbt_commands = ["dbt build", "dbt docs generate"]
    try:
        log.info(f"Running dbt commands in {project_dir}")
        operation = DbtCoreOperation(
            commands=dbt_commands,
            project_dir=project_dir,
            profiles_dir=profiles_dir,
        )
        operation.run()
        artifacts_dir = Path(project_dir) / 'target'
        log.info(f"dbt commands completed successfully. Target path: {artifacts_dir}")
    except Exception as err:
        log.error(f"dbt build failed: {str(err)}")
        raise
    return artifacts_dir
@task
def collect_and_zip_artifacts(path: Path) -> Optional[List]:
    """Validate the dbt artifacts in ``path`` and package them as a zip upload.

    Looks for ``manifest.json``, ``catalog.json``, ``run_results.json`` and
    ``semantic_manifest.json`` directly inside ``path``. Missing optional
    artifacts are logged and left out of the archive. ``manifest.json`` and
    ``catalog.json`` are mandatory.

    Args:
        path: Directory holding the dbt artifacts (e.g. the dbt ``target``
            directory). A plain string path is also accepted.

    Returns:
        A ``requests``-style ``files`` list with one entry:
        ``('files', ('dbt_artifacts.zip', <zip bytes>, 'application/zip'))``.

    Raises:
        FileNotFoundError: If ``manifest.json`` or ``catalog.json`` is missing.
    """
    logger = get_run_logger()
    path = Path(path)  # tolerate str input; the `/` operator below needs a Path

    artifact_files = ['manifest', 'catalog', 'run_results', 'semantic_manifest']
    required_files = ['manifest', 'catalog']

    # Resolve each artifact exactly once so validation and archiving agree.
    present = {}
    missing_files = []
    for name in artifact_files:
        file_path = path / f'{name}.json'
        if file_path.is_file():
            present[name] = file_path
        else:
            missing_files.append(f'{name}.json')

    if missing_files:
        logger.warning(f"Missing artifact files: {missing_files}")

    missing_required = [name for name in required_files if name not in present]
    if missing_required:
        logger.error(f"Missing required files: {missing_required}")
        raise FileNotFoundError(f"Required artifact files missing: {missing_required}")

    # tempfile.mktemp() is deprecated and race-prone: another process could
    # create the path between name generation and use. TemporaryFile() creates
    # the file atomically and removes it automatically when closed.
    logger.info('Creating zip file in an anonymous temporary file')
    with tempfile.TemporaryFile(suffix='.zip') as tmp:
        with zipfile.ZipFile(tmp, mode='w', compression=zipfile.ZIP_DEFLATED) as archive:
            for name in artifact_files:
                if name in present:
                    archive.write(present[name], arcname=f'{name}.json')
                    logger.info(f"Added {name}.json to zip")
                else:
                    logger.warning(f"Skipping {name}.json - file not found")
        # Only read the bytes back after the inner `with` exits: closing the
        # ZipFile writes the archive's central directory. It does not close
        # `tmp`, because `tmp` was passed in as an open file object.
        tmp.seek(0)
        zip_content = tmp.read()

    logger.info(f"Zip file created successfully. Size: {len(zip_content)} bytes")
    files = [
        ('files', ('dbt_artifacts.zip', zip_content, 'application/zip')),
    ]
    return files
@task(retries=3, retry_delay_seconds=10)
def send_via_webhook(files: List, webhook_url: str, token: str) -> int:
    """POST the zipped artifacts to the Euno webhook.

    The ``@task`` decorator configures Prefect to retry up to three times,
    10 seconds apart.

    Args:
        files: ``requests``-style multipart ``files`` list, such as the one
            built by ``collect_and_zip_artifacts``.
        webhook_url: Endpoint that receives the upload.
        token: Bearer token sent in the ``Authorization`` header.

    Returns:
        The HTTP status code of the response.

    Raises:
        requests.exceptions.RequestException: On connection/timeout errors, or
            (as ``HTTPError``) on a 4xx/5xx response. Each is logged first.
    """
    logger = get_run_logger()
    logger.info(f"Sending ZIP file to webhook at: {webhook_url}")
    headers = {
        'Authorization': f'Bearer {token}'
    }
    try:
        response = requests.post(
            webhook_url,
            headers=headers,
            files=files,
            timeout=300  # 5 minute timeout
        )
        logger.info(f"Response status code: {response.status_code}")
        logger.info(f"Response headers: {dict(response.headers)}")
        # Any 2xx is a success. Webhooks commonly answer 201/202/204; the
        # previous `== 200` check logged those as "Upload failed".
        if 200 <= response.status_code < 300:
            logger.info("ZIP file sent successfully")
            try:
                logger.info(f"Response: {response.json()}")
            except ValueError:
                # Body is empty or not JSON. requests' JSONDecodeError
                # subclasses ValueError, so this does not hide other errors.
                logger.info(f"Response text: {response.text}")
        else:
            logger.error(f"Upload failed with status {response.status_code}: {response.text}")
            # Raises HTTPError for 4xx/5xx; it is logged and re-raised below.
            response.raise_for_status()
        return response.status_code
    except requests.exceptions.RequestException as e:
        logger.error(f"Request failed: {str(e)}")
        raise
@flow(log_prints=True)
def dbt_euno_upload_flow():
    """Run dbt, package its artifacts, and upload them to Euno.

    Reads the webhook URL and bearer token from the ``EUNO_WEBHOOK_URL`` and
    ``EUNO_TOKEN`` environment variables. It then runs ``dbt_build``,
    ``collect_and_zip_artifacts`` and ``send_via_webhook`` in sequence.

    Returns:
        The webhook's HTTP status code, or ``None`` if no artifacts were
        collected.

    Raises:
        ValueError: If either setting is unset, empty/whitespace, or still the
            placeholder value.
        Exception: Any failure from the tasks is logged and re-raised.
    """
    logger = get_run_logger()
    # For production, Prefect Secret blocks are preferable to env vars:
    # webhook_url = Secret.load("euno-webhook-url").get()
    # token = Secret.load("euno-integration-token").get()
    webhook_url = os.getenv("EUNO_WEBHOOK_URL", "").strip()
    token = os.getenv("EUNO_TOKEN", "").strip()
    # Reject empty values as well as the placeholders. An empty URL or token
    # would otherwise surface only after the dbt run and three upload retries.
    if webhook_url in ("", "your_webhook_url_here") or token in ("", "your_token_here"):
        raise ValueError("Please set EUNO_WEBHOOK_URL and EUNO_TOKEN environment variables or Prefect Secrets")

    logger.info("Starting enhanced dbt build and upload flow...")
    try:
        # Run dbt build and docs generate
        target_path = dbt_build(project_dir=DBT_PROJECT_PATH, profiles_dir=DBT_PROFILES_PATH)
        # Collect artifacts and create zip
        artifacts = collect_and_zip_artifacts(target_path)
        if not artifacts:
            logger.error("No artifacts collected, skipping upload")
            return None
        # Send to Euno
        status_code = send_via_webhook(artifacts, webhook_url, token)
        logger.info(f"Flow completed successfully with status code: {status_code}")
        return status_code
    except Exception as e:
        # Top-level boundary: record the failure in the flow log, then re-raise
        # so Prefect marks the run as failed.
        logger.error(f"Flow failed: {str(e)}")
        raise
if __name__ == "__main__":
    # Run the flow directly when this file is executed as a script.
    dbt_euno_upload_flow()