Python Upload

This Python script uploads a zip file to Euno's endpoint.

Prerequisites

  • Python 3.6+

  • requests library (pip install requests)

Script

import requests
import os
import json
from pathlib import Path


# Settings - substitute your own values before running the script
zip_file_path = "Archive.zip"  # Location of the zip archive to upload
endpoint_url = "https://api.app.euno.ai/accounts/YOUR_ACCOUNT_ID/integrations/YOUR_INTEGRATION_ID/prepare-upload"
integration_key = "your_key_here"  # Put your integration key here

# Request headers for the prepare-upload call: bearer token plus JSON body type
headers = {}
headers["authorization"] = f"Bearer {integration_key}"
headers["content-type"] = "application/json"


def get_signed_url():
    """Request a signed upload URL for the zip file.

    Posts the archive's file name to ``endpoint_url`` and reads the signed
    URL from ``body.upload.url`` in the JSON response.

    Returns:
        A ``(upload_url, payload)`` tuple on success, where ``payload`` is the
        decoded JSON response. ``None`` if the request fails, the status is
        not 200, the response is not valid JSON, or no upload URL is present.
        A diagnostic message is printed in every failure case.
    """
    body = {"filename": Path(zip_file_path).name}

    # Network problems (DNS, connection refused, timeout) are reported the
    # same way as any other failure instead of surfacing as a traceback.
    try:
        response = requests.post(endpoint_url, headers=headers, json=body, timeout=30)
    except requests.RequestException as e:
        print("An error occurred while requesting the signed URL:", e)
        return None

    if response.status_code != 200:
        print(f"Failed to obtain signed URL (status {response.status_code}): {response.text}")
        return None

    # A 200 response with a non-JSON body makes .json() raise a ValueError
    # subclass; treat it as a failed request.
    try:
        payload = response.json()
    except ValueError:
        print("Signed URL response was not valid JSON:", response.text[:500])
        return None

    # Walk body -> upload -> url, tolerating a missing, null, or non-object
    # value at any level.
    node = payload
    for key in ("body", "upload", "url"):
        node = node.get(key) if isinstance(node, dict) else None
    upload_url = node

    if not upload_url:
        print("Signed upload URL not found in response:", json.dumps(payload)[:500])
        return None

    return upload_url, payload


def upload_file(upload_url):
    """Send the configured zip archive to the signed URL with an HTTP PUT.

    Returns True when the server answers with status 200 or 201. Any other
    status, or any exception raised while opening or sending the file, is
    printed and results in False.
    """
    put_headers = {"content-type": "application/zip"}
    try:
        # The open file object is handed to requests as the request body.
        with open(zip_file_path, "rb") as archive:
            reply = requests.put(upload_url, data=archive, headers=put_headers, timeout=60)

        accepted = reply.status_code in (200, 201)
        if not accepted:
            print(f"Upload failed (status {reply.status_code}): {reply.text[:200]}")
        return accepted

    except Exception as exc:
        print("An error occurred during upload:", exc)
        return False


def main():
    """Upload the archive using the prepare-upload flow.

    Checks that the zip file exists, requests a signed URL, then uploads the
    archive to that URL. Progress and error messages are printed.

    Returns:
        True if the upload succeeded, False otherwise.
    """
    print("Starting archive upload process...")

    # Require a regular file: a directory with the same name would pass a
    # plain existence check and only fail later, after a signed URL had
    # already been requested.
    if not os.path.isfile(zip_file_path):
        print(f"Error: file '{zip_file_path}' not found.")
        return False

    # Get signed URL
    print("Requesting signed URL...")
    result = get_signed_url()
    if not result:
        return False

    upload_url, meta = result
    print("Signed URL obtained — uploading archive...")

    # Upload file
    if not upload_file(upload_url):
        print("Upload failed.")
        return False

    print("Upload succeeded! Prepare-upload response:")
    print(json.dumps(meta, indent=2))
    return True


if __name__ == "__main__":
    # main() reports success as a bool; surface it as the process exit status
    # so shells and CI jobs can detect a failed upload (0 = success, 1 = failure).
    raise SystemExit(0 if main() else 1)

Usage

  1. Configure the script: Update the endpoint_url, integration_key, and zip_file_path variables with your specific values

  2. Install dependencies: pip install requests

  3. Run the script: Execute from your dbt project's target directory where the artifact files are located

  4. Verify upload: Check the script output for success/failure messages

Last updated