Prefect Upload
This Prefect flow automates the process of running your dbt project, collecting artifacts, and uploading them to Euno.
Prerequisites
Python 3.8+
Prefect 2.0+
prefect-dbt package
requests library
Installation
pip install prefect prefect-dbt requests
Configuration
Before running the flow, update these configuration variables:
# Euno webhook endpoint; substitute your account ID and integration ID.
WEBHOOK_URL = "https://api.app.euno.ai/accounts/YOUR_ACCOUNT_ID/integrations/YOUR_INTEGRATION_ID/run"
# Euno integration key; the flow sends it as an `Authorization: Bearer` header.
EUNO_TOKEN = "your_integration_key_here"
# Root directory of your dbt project (artifacts are read from its `target` folder).
DBT_PROJECT_PATH = "/path/to/your/dbt/project"
DBT_PROFILES_PATH = "/path/to/your/dbt/profiles" # e.g., "/Users/username/.dbt"
Prefect Flow
from prefect import flow, task
from prefect_dbt.cli.commands import DbtCoreOperation
import zipfile
import tempfile
import requests
import os
from pathlib import Path
# Configuration - Update these values, or override each one with an
# environment variable so the integration key does not have to live in source.
# EUNO_WEBHOOK_URL / EUNO_TOKEN match the names used by the enhanced flow.
WEBHOOK_URL = os.getenv(
    "EUNO_WEBHOOK_URL",
    "https://api.app.euno.ai/accounts/YOUR_ACCOUNT_ID/integrations/YOUR_INTEGRATION_ID/run",
)
EUNO_TOKEN = os.getenv("EUNO_TOKEN", "your_integration_key_here")
DBT_PROJECT_PATH = os.getenv("DBT_PROJECT_PATH", "/path/to/your/dbt/project")
DBT_PROFILES_PATH = os.getenv("DBT_PROFILES_PATH", "/path/to/your/dbt/profiles")
@task
def dbt_build(project_dir, profiles_dir):
    """Execute ``dbt build`` followed by ``dbt docs generate``.

    Args:
        project_dir: Root directory of the dbt project.
        profiles_dir: Directory holding the dbt profiles (e.g. ``~/.dbt``).

    Returns:
        ``Path(project_dir) / 'target'`` -- the folder the artifacts are
        collected from afterwards.
    """
    dbt_commands = ["dbt build", "dbt docs generate"]
    operation = DbtCoreOperation(
        commands=dbt_commands,
        project_dir=project_dir,
        profiles_dir=profiles_dir,
    )
    operation.run()
    return Path(project_dir) / 'target'
@task
def collect_and_zip_artifacts(path):
    """Collect dbt artifacts and create a zip file.

    Args:
        path: Directory containing the dbt JSON artifacts (normally the
            ``target`` directory returned by ``dbt_build``).

    Returns:
        A ``requests`` multipart ``files`` list holding ``dbt_artifacts.zip``
        as in-memory bytes. Missing artifacts are reported and skipped.
    """
    # mkstemp creates the file atomically with private permissions; the
    # deprecated tempfile.mktemp only returns a name another process could
    # claim before we open it.
    fd, temp_file_name = tempfile.mkstemp(suffix='.zip')
    try:
        os.close(fd)  # ZipFile reopens the path itself
        print(f'Zipping artifacts to {temp_file_name}')
        # List of artifact files to include
        artifact_files = ['manifest', 'catalog', 'run_results', 'semantic_manifest']
        with zipfile.ZipFile(temp_file_name, mode='w', compression=zipfile.ZIP_DEFLATED) as archive:
            for f in artifact_files:
                file_path = os.path.join(path, f'{f}.json')
                if os.path.exists(file_path):
                    archive.write(file_path, arcname=f'{f}.json')
                    print(f"Added {f}.json to zip")
                else:
                    print(f"Warning: {f}.json not found at {file_path}")
        # Read the zip file content for upload
        with open(temp_file_name, 'rb') as zip_file:
            zip_content = zip_file.read()
    finally:
        # Clean up the temporary file even if zipping or reading failed
        os.unlink(temp_file_name)
    files = [
        ('files', ('dbt_artifacts.zip', zip_content, 'application/zip')),
    ]
    return files
@task
def send_via_webhook(files, timeout=300):
    """Send the zip file to Euno via webhook.

    Args:
        files: Multipart ``files`` payload from ``collect_and_zip_artifacts``.
        timeout: Seconds to wait for the server (default 300, matching the
            enhanced flow). Without a timeout ``requests`` can block forever
            on an unresponsive server.

    Returns:
        The HTTP status code of the response when it is not an error.

    Raises:
        requests.HTTPError: If Euno answers with a 4xx/5xx status.
        requests.RequestException: On connection failure or timeout.
    """
    print(f"Sending ZIP file to webhook at: {WEBHOOK_URL}")
    headers = {
        'Authorization': f'Bearer {EUNO_TOKEN}'
    }
    response = requests.post(
        WEBHOOK_URL,
        headers=headers,
        files=files,
        timeout=timeout,
    )
    print(f"Response status code: {response.status_code}")
    print(f"Response headers: {response.headers}")
    print(f"Response text: {response.text}")
    response.raise_for_status()
    print("ZIP file sent successfully.")
    return response.status_code
@flow(log_prints=True)
def dbt_run_flow():
    """Run dbt, bundle the resulting artifacts and upload them to Euno.

    Returns:
        The HTTP status code reported by the upload step.
    """
    print("Starting dbt build and upload flow...")
    # Build the project and generate docs; yields the dbt target directory
    target_dir = dbt_build(
        project_dir=DBT_PROJECT_PATH,
        profiles_dir=DBT_PROFILES_PATH,
    )
    # Zip the artifacts found there, then push the archive to Euno
    upload_status = send_via_webhook(collect_and_zip_artifacts(target_dir))
    print(f"Flow completed with status code: {upload_status}")
    return upload_status
# Entry point for direct execution (`python your_flow_file.py`): run the flow once.
if __name__ == "__main__":
    dbt_run_flow()
Enhanced Flow with Error Handling
For production use, consider this enhanced version with better error handling and logging:
from prefect import flow, task, get_run_logger
from prefect_dbt.cli.commands import DbtCoreOperation
from prefect.blocks.system import Secret
import zipfile
import tempfile
import requests
import os
from pathlib import Path
from typing import List, Optional
# Configuration - Consider using Prefect Blocks for secrets.
# Both paths can be overridden with an environment variable of the same name,
# so one file can serve several projects/environments without edits.
DBT_PROJECT_PATH = os.getenv("DBT_PROJECT_PATH", "/path/to/your/dbt/project")
DBT_PROFILES_PATH = os.getenv("DBT_PROFILES_PATH", "/path/to/your/dbt/profiles")
@task(retries=2, retry_delay_seconds=30)
def dbt_build(project_dir: str, profiles_dir: str) -> Path:
    """Execute ``dbt build`` then ``dbt docs generate``; Prefect retries twice.

    Args:
        project_dir: Root directory of the dbt project.
        profiles_dir: Directory holding the dbt profiles (e.g. ``~/.dbt``).

    Returns:
        ``Path(project_dir) / 'target'``, where the artifacts are read from.

    Raises:
        Exception: Any error from the dbt operation, re-raised after logging
            so the task's retry policy applies.
    """
    log = get_run_logger()
    try:
        log.info(f"Running dbt commands in {project_dir}")
        operation = DbtCoreOperation(
            commands=["dbt build", "dbt docs generate"],
            project_dir=project_dir,
            profiles_dir=profiles_dir,
        )
        operation.run()
        target = Path(project_dir) / 'target'
        log.info(f"dbt commands completed successfully. Target path: {target}")
        return target
    except Exception as exc:
        log.error(f"dbt build failed: {exc}")
        raise
@task
def collect_and_zip_artifacts(path: Path) -> Optional[List]:
    """Collect dbt artifacts and create a zip file with validation.

    Args:
        path: Directory holding the dbt JSON artifacts (the ``target``
            directory returned by ``dbt_build``). A plain string also works.

    Returns:
        A one-element ``requests`` multipart ``files`` list containing
        ``dbt_artifacts.zip`` as in-memory bytes. Despite the ``Optional``
        annotation, this function never returns ``None``.

    Raises:
        FileNotFoundError: If ``manifest.json`` or ``catalog.json`` is missing.
    """
    logger = get_run_logger()
    path = Path(path)
    # Artifacts to upload; only manifest and catalog are mandatory.
    artifact_files = ['manifest', 'catalog', 'run_results', 'semantic_manifest']
    required_files = ['manifest', 'catalog']

    missing_files = [
        f'{f}.json' for f in artifact_files if not (path / f'{f}.json').exists()
    ]
    if missing_files:
        logger.warning(f"Missing artifact files: {missing_files}")
    missing_required = [f for f in required_files if f'{f}.json' in missing_files]
    if missing_required:
        logger.error(f"Missing required files: {missing_required}")
        raise FileNotFoundError(f"Required artifact files missing: {missing_required}")

    # mkstemp creates the file atomically with private permissions; the
    # deprecated tempfile.mktemp only returns a name another process could
    # claim before we open it.
    fd, temp_file_name = tempfile.mkstemp(suffix='.zip')
    try:
        os.close(fd)  # ZipFile reopens the path itself
        logger.info(f'Creating zip file: {temp_file_name}')
        with zipfile.ZipFile(temp_file_name, mode='w', compression=zipfile.ZIP_DEFLATED) as archive:
            for f in artifact_files:
                file_path = path / f'{f}.json'
                if file_path.exists():
                    archive.write(file_path, arcname=f'{f}.json')
                    logger.info(f"Added {f}.json to zip")
                else:
                    logger.warning(f"Skipping {f}.json - file not found")
        # Read the zip file content for upload
        with open(temp_file_name, 'rb') as zip_file:
            zip_content = zip_file.read()
        logger.info(f"Zip file created successfully. Size: {len(zip_content)} bytes")
        return [
            ('files', ('dbt_artifacts.zip', zip_content, 'application/zip')),
        ]
    finally:
        # Clean up the temporary file whether or not zipping succeeded
        if os.path.exists(temp_file_name):
            os.unlink(temp_file_name)
@task(retries=3, retry_delay_seconds=10)
def send_via_webhook(files: List, webhook_url: str, token: str, timeout: float = 300) -> int:
    """Send the zip file to Euno via webhook with retry logic.

    Args:
        files: Multipart ``files`` payload from ``collect_and_zip_artifacts``.
        webhook_url: Euno integration run URL.
        token: Euno integration key, sent as a Bearer token.
        timeout: Seconds to wait for the server (default 300).

    Returns:
        The HTTP status code of a non-error response.

    Raises:
        requests.exceptions.RequestException: On connection failure, timeout
            or a 4xx/5xx response (``HTTPError``); Prefect retries the task.
    """
    logger = get_run_logger()
    logger.info(f"Sending ZIP file to webhook at: {webhook_url}")
    headers = {
        'Authorization': f'Bearer {token}'
    }
    try:
        response = requests.post(
            webhook_url,
            headers=headers,
            files=files,
            timeout=timeout,
        )
        logger.info(f"Response status code: {response.status_code}")
        logger.info(f"Response headers: {dict(response.headers)}")
        # Use the same success criterion as raise_for_status() so that e.g.
        # a 201/202 is not logged as "Upload failed" and then returned as OK.
        if response.ok:
            logger.info("ZIP file sent successfully")
            try:
                logger.info(f"Response: {response.json()}")
            except ValueError:
                # Body is not JSON (requests' JSON decode errors subclass
                # ValueError); log it verbatim instead.
                logger.info(f"Response text: {response.text}")
        else:
            logger.error(f"Upload failed with status {response.status_code}: {response.text}")
        response.raise_for_status()
        return response.status_code
    except requests.exceptions.RequestException as e:
        logger.error(f"Request failed: {str(e)}")
        raise
@flow(log_prints=True)
def dbt_euno_upload_flow():
    """Enhanced Prefect flow to run dbt and upload artifacts to Euno.

    Credentials are read from the ``EUNO_WEBHOOK_URL`` and ``EUNO_TOKEN``
    environment variables (or from Prefect Secret blocks if the commented
    lines below are enabled).

    Returns:
        The upload's HTTP status code, or ``None`` if no artifacts were
        collected.

    Raises:
        ValueError: If the webhook URL or token is unset, empty, or still the
            placeholder value.
    """
    logger = get_run_logger()
    # Get secrets from Prefect Blocks (recommended for production)
    # webhook_url = Secret.load("euno-webhook-url").get()
    # token = Secret.load("euno-integration-token").get()
    # Or use environment variables as fallback
    webhook_url = os.getenv("EUNO_WEBHOOK_URL", "your_webhook_url_here")
    token = os.getenv("EUNO_TOKEN", "your_token_here")
    # An exported-but-empty variable is as unusable as a missing one; reject
    # it now rather than after a full dbt build, with an opaque request error.
    if webhook_url in ("", "your_webhook_url_here") or token in ("", "your_token_here"):
        raise ValueError("Please set EUNO_WEBHOOK_URL and EUNO_TOKEN environment variables or Prefect Secrets")
    logger.info("Starting enhanced dbt build and upload flow...")
    try:
        # Run dbt build and docs generate
        target_path = dbt_build(project_dir=DBT_PROJECT_PATH, profiles_dir=DBT_PROFILES_PATH)
        # Collect artifacts and create zip
        artifacts = collect_and_zip_artifacts(target_path)
        if artifacts:
            # Send to Euno
            status_code = send_via_webhook(artifacts, webhook_url, token)
            logger.info(f"Flow completed successfully with status code: {status_code}")
            return status_code
        else:
            logger.error("No artifacts collected, skipping upload")
            return None
    except Exception as e:
        logger.error(f"Flow failed: {str(e)}")
        raise
# Entry point for direct execution (`python your_flow_file.py`): run the
# enhanced flow once.
if __name__ == "__main__":
    dbt_euno_upload_flow()
Usage
Option 1: Direct Execution
python your_flow_file.py
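Option 2: Prefect Deployment (Prefect 2.x CLI; `prefect deployment build`/`apply` were removed in Prefect 3, which uses `prefect deploy` instead)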
# Create a deployment
prefect deployment build your_flow_file.py:dbt_euno_upload_flow -n "dbt-euno-upload"
# Apply the deployment
prefect deployment apply dbt_euno_upload_flow-deployment.yaml
# Run the deployment
prefect deployment run dbt-euno-upload-flow/dbt-euno-upload
Option 3: Scheduled Runs
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule

# NOTE(review): Deployment.build_from_flow is the Prefect 2.x API; newer
# Prefect releases replace it with flow.serve()/flow.deploy() -- check your
# installed version. Assumes dbt_euno_upload_flow is defined/imported above.
daily_at_six = CronSchedule(cron="0 6 * * *")  # minute 0, hour 6, every day
deployment = Deployment.build_from_flow(
    flow=dbt_euno_upload_flow,
    name="dbt-euno-daily",
    schedule=daily_at_six,
)
deployment.apply()  # register the deployment with the Prefect API
Configuration with Prefect Blocks
For production use, store sensitive information using Prefect Blocks:
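# Register the system block types (including Secret)
prefect block register -m prefect.blocks.system
# Point the CLI at your Prefect server or workspace (the secrets themselves are created in Python below)
prefect config set PREFECT_API_URL="your-prefect-server-url"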
from prefect.blocks.system import Secret
# Create secrets (run once). The block names must match the ones the flow
# loads: "euno-webhook-url" and "euno-integration-token".
# NOTE(review): saving under a name that already exists may be rejected;
# pass overwrite=True to replace it -- confirm for your Prefect version.
Secret(value="your_webhook_url").save("euno-webhook-url")
Secret(value="your_integration_token").save("euno-integration-token")
Monitoring and Alerts
Add notification blocks for monitoring:
from prefect.blocks.notifications import SlackWebhook
# Sketch only: assumes `flow` is imported from prefect as in the flows above.
@flow(log_prints=True)
def dbt_euno_upload_flow():
    """Upload flow wrapped with a Slack alert on failure (sketch).

    The body is elided: replace ``# ... flow logic ...`` with the steps of
    the enhanced flow, which also define ``status_code``.
    """
    try:
        # ... flow logic ...
        return status_code
    except Exception as e:
        # Send failure notification through a saved SlackWebhook block
        # (replace "your-slack-block" with your block's name), then re-raise
        # so the failure still propagates out of the flow.
        slack_webhook = SlackWebhook.load("your-slack-block")
        slack_webhook.notify(f"dbt Euno upload flow failed: {str(e)}")
        raise
Notes
The flow automatically handles temporary file cleanup
In the enhanced version, retry logic is built into the dbt build task (2 retries) and the webhook upload (3 retries)
Missing optional artifact files (like semantic_manifest.json) won't fail the flow
The enhanced version includes comprehensive logging and error handling
Consider using Prefect Blocks for storing sensitive configuration in production
Last updated