# Prefect Upload

This Prefect flow automates the process of running your dbt project, collecting artifacts, and uploading them to Euno.

## Prerequisites

* Python 3.8+
* Prefect 2.1+
* `prefect-dbt` package
* `requests` library

## Installation

```bash
pip install prefect prefect-dbt requests
```
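To confirm the prerequisites are installed, you can check versions with the standard library alone. This is a quick sketch; the package names match the `pip install` line above:

```python
from importlib import metadata

def check_versions(packages=("prefect", "prefect-dbt", "requests")):
    """Return a mapping of package name -> installed version, or None if missing."""
    versions = {}
    for pkg in packages:
        try:
            versions[pkg] = metadata.version(pkg)
        except metadata.PackageNotFoundError:
            versions[pkg] = None
    return versions

print(check_versions())
```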

## Configuration

Before running the flow, update these configuration variables:

```python
WEBHOOK_URL = "https://api.app.euno.ai/accounts/YOUR_ACCOUNT_ID/integrations/YOUR_INTEGRATION_ID/prepare-upload"
EUNO_TOKEN = "your_integration_key_here"
DBT_PROJECT_PATH = "/path/to/your/dbt/project"
DBT_PROFILES_PATH = "/path/to/your/dbt/profiles"
```
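If you prefer not to hard-code these values, they can be read from environment variables instead. A minimal sketch; the variable names (`EUNO_WEBHOOK_URL`, `EUNO_TOKEN`, etc.) are illustrative conventions, not names required by Euno or Prefect:

```python
import os

# Read configuration from the environment, falling back to placeholders.
# The variable names below are illustrative, not required names.
WEBHOOK_URL = os.environ.get(
    "EUNO_WEBHOOK_URL",
    "https://api.app.euno.ai/accounts/YOUR_ACCOUNT_ID/integrations/YOUR_INTEGRATION_ID/prepare-upload",
)
EUNO_TOKEN = os.environ.get("EUNO_TOKEN", "your_integration_key_here")
DBT_PROJECT_PATH = os.environ.get("DBT_PROJECT_PATH", "/path/to/your/dbt/project")
DBT_PROFILES_PATH = os.environ.get("DBT_PROFILES_PATH", "/path/to/your/dbt/profiles")
```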

### With Prefect Blocks

```bash
# Register the built-in block types (one-time setup)
prefect block register -m prefect.blocks.system

# Point the Prefect CLI at your server or Cloud workspace
prefect config set PREFECT_API_URL="your-prefect-server-url"
```

```python
from prefect.blocks.system import Secret

# Create secrets (run once)
Secret(value="your_webhook_url").save("euno-webhook-url")
Secret(value="your_integration_token").save("euno-integration-token")
Secret(value="/path/to/your/dbt/project").save("dbt-project-path")
Secret(value="/path/to/your/dbt/profiles").save("dbt-profiles-path")
```

## Prefect Flow

The following example is based on `prefect-dbt` 0.7.0 and later. For earlier versions, see the [prefect-dbt documentation](https://docs.prefect.io/integrations/prefect-dbt#prefect-dbt-0-6-6-and-earlier).

<pre class="language-python"><code class="lang-python">from prefect import flow, task, get_run_logger
from prefect_dbt import PrefectDbtRunner, PrefectDbtSettings
from prefect.blocks.system import Secret
import zipfile
import tempfile
import requests
import os
import json
from pathlib import Path
from typing import List, Optional, Tuple, Dict, Any

DBT_PROJECT_PATH = "/path/to/your/dbt/project"
DBT_PROFILES_PATH = "/path/to/your/dbt/profiles"

# Configuration using Prefect Blocks for secrets.
# Can be replaced by other secret managers or environment variables
@task
def get_euno_integration_credentials():
    """Retrieve Euno Integration credentials from Prefect Secret Blocks."""
    return {
        "webhook_url": Secret.load("euno-webhook-url").get(),
        "token": Secret.load("euno-integration-token").get(),
    }

@task
def dbt_build(project_dir: str, profiles_dir: str) -> Path:
    """Run dbt build and docs generate commands."""
    logger = get_run_logger()

    try:
        logger.info(f"Running dbt commands in {project_dir}")
        
        # Run dbt build
        PrefectDbtRunner(
            settings=PrefectDbtSettings(
                project_dir=project_dir,
                profiles_dir=profiles_dir,
            )
        ).invoke(["build"])
        
        # Run dbt docs generate
        PrefectDbtRunner(
            settings=PrefectDbtSettings(
                project_dir=project_dir,
                profiles_dir=profiles_dir,
            )
        ).invoke(["docs", "generate"])

        target_path = Path(project_dir, "target")
        logger.info(f"dbt commands completed successfully. Target path: {target_path}")
        return target_path

    except Exception as e:
        logger.error(f"dbt build failed: {str(e)}")
        raise


@task
def collect_and_zip_artifacts(path: Path) -> Optional[str]:
    """Collect dbt artifacts and create a zip file with validation."""
    logger = get_run_logger()

    # List of required artifact files
    artifact_files = ["manifest", "catalog", "run_results", "semantic_manifest"]

    # Check if all required files exist
    missing_files = []
    for f in artifact_files:
        file_path = path / f"{f}.json"
        if not file_path.exists():
            missing_files.append(f"{f}.json")

    if missing_files:
        logger.warning(f"Missing artifact files: {missing_files}")

    # Check if we have at least manifest and catalog (minimum required)
    required_files = ["manifest", "catalog"]
    missing_required = [f for f in required_files if f"{f}.json" in missing_files]

    if missing_required:
        logger.error(f"Missing required files: {missing_required}")
        raise FileNotFoundError(f"Required artifact files missing: {missing_required}")

    # mkstemp avoids the race condition of the deprecated tempfile.mktemp
    fd, temp_file_name = tempfile.mkstemp(suffix=".zip")
    os.close(fd)
    logger.info(f"Creating zip file: {temp_file_name}")

    try:
        with zipfile.ZipFile(
            temp_file_name, mode="w", compression=zipfile.ZIP_DEFLATED
        ) as archive:
            for f in artifact_files:
                file_path = path / f"{f}.json"
                if file_path.exists():
                    archive.write(file_path, arcname=f"{f}.json")
                    logger.info(f"Added {f}.json to zip")
                else:
                    logger.warning(f"Skipping {f}.json - file not found")

        # Get file size for logging
        file_size = os.path.getsize(temp_file_name)
        logger.info(f"Zip file created successfully. Size: {file_size} bytes")

        return temp_file_name

    except Exception as e:
        # Clean up on error
        logger.error(f"Error creating zip file: {str(e)}")
        if os.path.exists(temp_file_name):
            os.unlink(temp_file_name)
        raise


@task
def get_signed_url(webhook_url: str, token: str, filename: str) -> Tuple[str, Dict[Any, Any]]:
    """Request a signed upload URL for the zip file."""
    logger = get_run_logger()
    headers = {"Authorization": f"Bearer {token}"}
    body = {"filename": filename}
    
    logger.info(f"Requesting signed upload URL from: {webhook_url}")
    response = requests.post(webhook_url, headers=headers, json=body, timeout=30)
    
    logger.info(f"Signed URL request status code: {response.status_code}")
    
    if response.status_code != 200:
        logger.error(f"Failed to obtain signed URL (status {response.status_code}): {response.text}")
        response.raise_for_status()
    
    payload = response.json()
    upload_url = payload.get("upload", {}).get("url")
    
    if not upload_url:
        logger.error(f"Signed upload URL not found in response: {json.dumps(payload)[:500]}")
        raise ValueError("Signed upload URL not found in response")
    
    logger.info("Signed URL obtained successfully")
    return upload_url, payload


@task
def upload_file(zip_file_path: str, upload_url: str) -> bool:
    """Upload the zip file to the signed URL."""
    logger = get_run_logger()
    
    logger.info("Uploading ZIP file to signed URL")
    try:
        with open(zip_file_path, "rb") as fp:
            response = requests.put(
                upload_url, 
                data=fp, 
                headers={"content-type": "application/zip"}, 
                timeout=300  # 5 minute timeout for large files
            )
        
        logger.info(f"Upload status code: {response.status_code}")
        
        if response.status_code not in (200, 201):
            logger.error(f"Upload failed (status {response.status_code}): {response.text[:200]}")
            response.raise_for_status()
        
        logger.info("ZIP file uploaded successfully")
        return True
        
    except Exception as e:
        logger.error(f"An error occurred during upload: {e}")
        raise


@flow(log_prints=True)
def dbt_euno_upload_flow():
    """Enhanced Prefect flow to run dbt and upload artifacts to Euno."""
    logger = get_run_logger()

    # Get secrets from Prefect Blocks (recommended for production)
    euno_integration_credentials = get_euno_integration_credentials()
    dbt_project_path = Secret.load("dbt-project-path").get() # or use DBT_PROJECT_PATH
    dbt_profiles_path = Secret.load("dbt-profiles-path").get() # or use DBT_PROFILES_PATH

    logger.info("Starting enhanced dbt build and upload flow...")

    try:
        # Run dbt build and docs generate
        target_path = dbt_build(
            project_dir=dbt_project_path,
            profiles_dir=dbt_profiles_path,
        )

        # Collect artifacts and create zip
        artifacts_zip_path = collect_and_zip_artifacts(target_path)

        if artifacts_zip_path:
            try:
                # Get signed URL
                upload_url, meta = get_signed_url(
                    euno_integration_credentials["webhook_url"],
                    euno_integration_credentials["token"],
                    Path(artifacts_zip_path).name
                )
                
                # Upload file
                success = upload_file(artifacts_zip_path, upload_url)
                
                if success:
                    logger.info("Upload succeeded! Prepare-upload response:")
                    logger.info(json.dumps(meta, indent=2))
                    return True
                else:
                    logger.error("Upload failed.")
                    return False
                
            finally:
                # Clean up temporary zip file after upload attempt
                if os.path.exists(artifacts_zip_path):
                    try:
                        os.unlink(artifacts_zip_path)
                        logger.info(f"Cleaned up temporary file: {artifacts_zip_path}")
                    except Exception as e:
                        logger.warning(f"Failed to clean up temporary file: {str(e)}")
        else:
            logger.error("No artifacts collected, skipping upload")
            return None

    except Exception as e:
        logger.error(f"Flow failed: {str(e)}")
        raise


if __name__ == "__main__":
    dbt_euno_upload_flow()

</code></pre>

## Notes

* The artifact files `manifest.json` and `catalog.json` are required; other missing artifacts won't fail the flow
* dbt `build` can be replaced with dbt `compile` if you want to collect artifacts without executing models and tests
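The artifact-collection step can be exercised in isolation. This sketch zips whatever artifact files exist in a target directory and enforces the manifest/catalog requirement, using the same filenames the flow expects:

```python
import os
import tempfile
import zipfile
from pathlib import Path

ARTIFACTS = ["manifest", "catalog", "run_results", "semantic_manifest"]
REQUIRED = ["manifest", "catalog"]

def zip_artifacts(target: Path) -> str:
    """Zip dbt artifacts found in `target`; manifest and catalog are mandatory."""
    missing = [f"{a}.json" for a in REQUIRED if not (target / f"{a}.json").exists()]
    if missing:
        raise FileNotFoundError(f"Required artifact files missing: {missing}")
    fd, zip_path = tempfile.mkstemp(suffix=".zip")
    os.close(fd)
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as archive:
        for artifact in ARTIFACTS:
            file_path = target / f"{artifact}.json"
            if file_path.exists():
                archive.write(file_path, arcname=file_path.name)
    return zip_path

# Example: build a fake target directory with the two required artifacts
with tempfile.TemporaryDirectory() as d:
    for name in ("manifest.json", "catalog.json"):
        (Path(d) / name).write_text("{}")
    path = zip_artifacts(Path(d))
    with zipfile.ZipFile(path) as z:
        names = sorted(z.namelist())
    os.unlink(path)
print(names)  # -> ['catalog.json', 'manifest.json']
```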

## Usage

### Option 1: Direct Execution

```bash
python your_flow_file.py
```

### Option 2: Prefect Deployment

#### Prefect 2.10+

```bash
# Create a deployment
prefect deployment build your_flow_file.py:dbt_euno_upload_flow -n "dbt-euno-upload"

# Apply the deployment
prefect deployment apply dbt_euno_upload_flow-deployment.yaml

# Run the deployment
prefect deployment run dbt-euno-upload-flow/dbt-euno-upload
```

#### Prefect 3.x

```bash
# Create a work pool (one-time setup, required in Prefect 3.x)
prefect work-pool create dbt-pool --type process

# Deploy the flow
prefect deploy your_flow_file.py:dbt_euno_upload_flow --name dbt-euno-upload --pool dbt-pool

# Run the deployment
prefect deployment run dbt-euno-upload-flow/dbt-euno-upload
```

### Option 3: Scheduled Runs

Note that `Deployment.build_from_flow` is the Prefect 2.x API; it was removed in Prefect 3.x, where you can schedule the deployment with `prefect deploy` (e.g. its `--cron` option) or `flow.serve()` instead.

```python
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule

deployment = Deployment.build_from_flow(
    flow=dbt_euno_upload_flow,
    name="dbt-euno-daily",
    schedule=CronSchedule(cron="0 6 * * *"),  # Daily at 6 AM
)

deployment.apply()
```

## Monitoring and Alerts

Add notification blocks for monitoring:

```python
from prefect.blocks.notifications import SlackWebhook

@flow(log_prints=True)
def dbt_euno_upload_flow():
    try:
        # ... flow logic ...
        return True
    except Exception as e:
        # Send failure notification
        slack_webhook = SlackWebhook.load("your-slack-block")
        slack_webhook.notify(f"dbt Euno upload flow failed: {str(e)}")
        raise
```
