Prefect Upload

This Prefect flow automates the process of running your dbt project, collecting artifacts, and uploading them to Euno.

Prerequisites

  • Python 3.8+

  • Prefect 2.0+

  • prefect-dbt package

  • requests library

Installation

pip install prefect prefect-dbt requests
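
You can verify the installation with the Prefect CLI:

prefect version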

Configuration

Before running the flow, update these configuration variables:

WEBHOOK_URL = "https://api.app.euno.ai/accounts/YOUR_ACCOUNT_ID/integrations/YOUR_INTEGRATION_ID/run"
EUNO_TOKEN = "your_integration_key_here"
DBT_PROJECT_PATH = "/path/to/your/dbt/project"
DBT_PROFILES_PATH = "/path/to/your/dbt/profiles"  # e.g., "/Users/username/.dbt"
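
If you prefer not to hard-code these values, the enhanced flow further down reads them from environment variables instead; for example (shell, placeholder values):

export EUNO_WEBHOOK_URL="https://api.app.euno.ai/accounts/YOUR_ACCOUNT_ID/integrations/YOUR_INTEGRATION_ID/run"
export EUNO_TOKEN="your_integration_key_here"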

Prefect Flow

from prefect import flow, task
from prefect_dbt.cli.commands import DbtCoreOperation
import zipfile
import tempfile
import requests
import os
from pathlib import Path

# Configuration - Update these values
WEBHOOK_URL = "https://api.app.euno.ai/accounts/YOUR_ACCOUNT_ID/integrations/YOUR_INTEGRATION_ID/run"
EUNO_TOKEN = "your_integration_key_here"
DBT_PROJECT_PATH = "/path/to/your/dbt/project"
DBT_PROFILES_PATH = "/path/to/your/dbt/profiles"

@task
def dbt_build(project_dir, profiles_dir):
    """Run dbt build and docs generate commands."""
    DbtCoreOperation(
        commands=["dbt build", "dbt docs generate"],
        project_dir=project_dir,
        profiles_dir=profiles_dir
    ).run()

    return Path(project_dir, 'target')

@task
def collect_and_zip_artifacts(path):
    """Collect dbt artifacts and create a zip file."""
    # mkstemp avoids the insecure, deprecated tempfile.mktemp
    fd, temp_file_name = tempfile.mkstemp(suffix='.zip')
    os.close(fd)
    print(f'Zipping artifacts to {temp_file_name}')
    
    # List of artifact files to include
    artifact_files = ['manifest', 'catalog', 'run_results', 'semantic_manifest']
    
    with zipfile.ZipFile(temp_file_name, mode='w', compression=zipfile.ZIP_DEFLATED) as archive:
        for f in artifact_files:
            file_path = os.path.join(path, f'{f}.json')
            if os.path.exists(file_path):
                archive.write(file_path, arcname=f'{f}.json')
                print(f"Added {f}.json to zip")
            else:
                print(f"Warning: {f}.json not found at {file_path}")
    
    # Read the zip file content for upload
    with open(temp_file_name, 'rb') as zip_file:
        zip_content = zip_file.read()
    
    # Clean up temporary file
    os.unlink(temp_file_name)
    
    files = [
        ('files', ('dbt_artifacts.zip', zip_content, 'application/zip')),
    ]
    return files

@task
def send_via_webhook(files):
    """Send the zip file to Euno via webhook."""
    print(f"Sending ZIP file to webhook at: {WEBHOOK_URL}")
    
    headers = {
        'Authorization': f'Bearer {EUNO_TOKEN}'
    }
    
    response = requests.post(
        WEBHOOK_URL, 
        headers=headers, 
        files=files
    )
    
    print(f"Response status code: {response.status_code}")
    print(f"Response headers: {response.headers}")
    print(f"Response text: {response.text}")
    
    response.raise_for_status()
    
    print("ZIP file sent successfully.")
    return response.status_code

@flow(log_prints=True)
def dbt_run_flow():
    """Main Prefect flow to run dbt and upload artifacts to Euno."""
    print("Starting dbt build and upload flow...")
    
    # Run dbt build and docs generate
    target_path = dbt_build(project_dir=DBT_PROJECT_PATH, profiles_dir=DBT_PROFILES_PATH)
    
    # Collect artifacts and create zip
    artifacts = collect_and_zip_artifacts(target_path)
    
    # Send to Euno
    status_code = send_via_webhook(artifacts)
    
    print(f"Flow completed with status code: {status_code}")
    return status_code

if __name__ == "__main__":
    dbt_run_flow()

Enhanced Flow with Error Handling

For production use, consider this enhanced version with better error handling and logging:

from prefect import flow, task, get_run_logger
from prefect_dbt.cli.commands import DbtCoreOperation
from prefect.blocks.system import Secret
import zipfile
import tempfile
import requests
import os
from pathlib import Path
from typing import List, Optional

# Configuration - Consider using Prefect Blocks for secrets
DBT_PROJECT_PATH = "/path/to/your/dbt/project"
DBT_PROFILES_PATH = "/path/to/your/dbt/profiles"

@task(retries=2, retry_delay_seconds=30)
def dbt_build(project_dir: str, profiles_dir: str) -> Path:
    """Run dbt build and docs generate commands with retry logic."""
    logger = get_run_logger()
    
    try:
        logger.info(f"Running dbt commands in {project_dir}")
        DbtCoreOperation(
            commands=["dbt build", "dbt docs generate"],
            project_dir=project_dir,
            profiles_dir=profiles_dir
        ).run()
        
        target_path = Path(project_dir, 'target')
        logger.info(f"dbt commands completed successfully. Target path: {target_path}")
        return target_path
        
    except Exception as e:
        logger.error(f"dbt build failed: {str(e)}")
        raise

@task
def collect_and_zip_artifacts(path: Path) -> Optional[List]:
    """Collect dbt artifacts and create a zip file with validation."""
    logger = get_run_logger()
    
    # List of required artifact files
    artifact_files = ['manifest', 'catalog', 'run_results', 'semantic_manifest']
    
    # Check if all required files exist
    missing_files = []
    for f in artifact_files:
        file_path = path / f'{f}.json'
        if not file_path.exists():
            missing_files.append(f'{f}.json')
    
    if missing_files:
        logger.warning(f"Missing artifact files: {missing_files}")
    
    # Check if we have at least manifest and catalog (minimum required)
    required_files = ['manifest', 'catalog']
    missing_required = [f for f in required_files if f'{f}.json' in missing_files]
    
    if missing_required:
        logger.error(f"Missing required files: {missing_required}")
        raise FileNotFoundError(f"Required artifact files missing: {missing_required}")
    
    # mkstemp avoids the insecure, deprecated tempfile.mktemp
    fd, temp_file_name = tempfile.mkstemp(suffix='.zip')
    os.close(fd)
    logger.info(f'Creating zip file: {temp_file_name}')
    
    try:
        with zipfile.ZipFile(temp_file_name, mode='w', compression=zipfile.ZIP_DEFLATED) as archive:
            for f in artifact_files:
                file_path = path / f'{f}.json'
                if file_path.exists():
                    archive.write(file_path, arcname=f'{f}.json')
                    logger.info(f"Added {f}.json to zip")
                else:
                    logger.warning(f"Skipping {f}.json - file not found")
        
        # Read the zip file content for upload
        with open(temp_file_name, 'rb') as zip_file:
            zip_content = zip_file.read()
        
        logger.info(f"Zip file created successfully. Size: {len(zip_content)} bytes")
        
        files = [
            ('files', ('dbt_artifacts.zip', zip_content, 'application/zip')),
        ]
        return files
        
    finally:
        # Clean up temporary file
        if os.path.exists(temp_file_name):
            os.unlink(temp_file_name)

@task(retries=3, retry_delay_seconds=10)
def send_via_webhook(files: List, webhook_url: str, token: str) -> int:
    """Send the zip file to Euno via webhook with retry logic."""
    logger = get_run_logger()
    
    logger.info(f"Sending ZIP file to webhook at: {webhook_url}")
    
    headers = {
        'Authorization': f'Bearer {token}'
    }
    
    try:
        response = requests.post(
            webhook_url, 
            headers=headers, 
            files=files,
            timeout=300  # 5 minute timeout
        )
        
        logger.info(f"Response status code: {response.status_code}")
        logger.info(f"Response headers: {dict(response.headers)}")
        
        if response.status_code == 200:
            logger.info("ZIP file sent successfully")
            try:
                logger.info(f"Response: {response.json()}")
            except ValueError:  # response body is not valid JSON
                logger.info(f"Response text: {response.text}")
        else:
            logger.error(f"Upload failed with status {response.status_code}: {response.text}")
            response.raise_for_status()
        
        return response.status_code
        
    except requests.exceptions.RequestException as e:
        logger.error(f"Request failed: {str(e)}")
        raise

@flow(log_prints=True)
def dbt_euno_upload_flow():
    """Enhanced Prefect flow to run dbt and upload artifacts to Euno."""
    logger = get_run_logger()
    
    # Get secrets from Prefect Blocks (recommended for production)
    # webhook_url = Secret.load("euno-webhook-url").get()
    # token = Secret.load("euno-integration-token").get()
    
    # Or use environment variables as fallback
    webhook_url = os.getenv("EUNO_WEBHOOK_URL", "your_webhook_url_here")
    token = os.getenv("EUNO_TOKEN", "your_token_here")
    
    if webhook_url == "your_webhook_url_here" or token == "your_token_here":
        raise ValueError("Please set EUNO_WEBHOOK_URL and EUNO_TOKEN environment variables or Prefect Secrets")
    
    logger.info("Starting enhanced dbt build and upload flow...")
    
    try:
        # Run dbt build and docs generate
        target_path = dbt_build(project_dir=DBT_PROJECT_PATH, profiles_dir=DBT_PROFILES_PATH)
        
        # Collect artifacts and create zip
        artifacts = collect_and_zip_artifacts(target_path)
        
        if artifacts:
            # Send to Euno
            status_code = send_via_webhook(artifacts, webhook_url, token)
            logger.info(f"Flow completed successfully with status code: {status_code}")
            return status_code
        else:
            logger.error("No artifacts collected, skipping upload")
            return None
            
    except Exception as e:
        logger.error(f"Flow failed: {str(e)}")
        raise

if __name__ == "__main__":
    dbt_euno_upload_flow()

Usage

Option 1: Direct Execution

python your_flow_file.py

Option 2: Prefect Deployment

# Create a deployment
prefect deployment build your_flow_file.py:dbt_euno_upload_flow -n "dbt-euno-upload"

# Apply the deployment
prefect deployment apply dbt_euno_upload_flow-deployment.yaml

# Run the deployment
prefect deployment run dbt-euno-upload-flow/dbt-euno-upload

Option 3: Scheduled Runs

from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule

deployment = Deployment.build_from_flow(
    flow=dbt_euno_upload_flow,
    name="dbt-euno-daily",
    schedule=CronSchedule(cron="0 6 * * *"),  # Daily at 6 AM
)

deployment.apply()
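
Note that scheduled runs created this way are picked up by a Prefect agent or worker, so one must be running. On Prefect 2.10.17 and later you can alternatively serve the flow with a schedule directly from the script, without building a deployment; a minimal sketch:

if __name__ == "__main__":
    # Serves the flow and runs it on a daily 6 AM cron schedule
    dbt_euno_upload_flow.serve(name="dbt-euno-daily", cron="0 6 * * *")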

Configuration with Prefect Blocks

For production use, store sensitive information using Prefect Blocks:

First, register the built-in system blocks and, if needed, point the CLI at your Prefect server (shell):

# Register the built-in system blocks (run once)
prefect block register -m prefect.blocks.system

# Optional: point the CLI at your Prefect server
prefect config set PREFECT_API_URL="your-prefect-server-url"

Then create the secret blocks from Python (run once):

from prefect.blocks.system import Secret

Secret(value="your_webhook_url").save("euno-webhook-url")
Secret(value="your_integration_token").save("euno-integration-token")

Monitoring and Alerts

Add notification blocks for monitoring:

from prefect.blocks.notifications import SlackWebhook

@flow(log_prints=True)
def dbt_euno_upload_flow():
    try:
        # ... flow logic ...
        return status_code
    except Exception as e:
        # Send failure notification
        slack_webhook = SlackWebhook.load("your-slack-block")
        slack_webhook.notify(f"dbt Euno upload flow failed: {str(e)}")
        raise
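
Loading "your-slack-block" assumes the Slack webhook block was created beforehand; a one-time setup sketch, with a placeholder webhook URL:

from prefect.blocks.notifications import SlackWebhook

# Run once to create the block; the URL below is a placeholder
SlackWebhook(url="https://hooks.slack.com/services/XXX/YYY/ZZZ").save("your-slack-block")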

Notes

  • The flow automatically handles temporary file cleanup

  • Retry logic is built into dbt commands and webhook uploads

  • Missing optional artifact files (like semantic_manifest.json) won't fail the flow

  • The enhanced version includes comprehensive logging and error handling

  • Consider using Prefect Blocks for storing sensitive configuration in production
