Python Upload
This Python script collects dbt artifacts, zips them, and uploads the zip file to Euno's endpoint.
Prerequisites
Python 3.6+
requests library (pip install requests)
Script
import requests
import os
import zipfile
import tempfile
from pathlib import Path
# --- Settings: fill in the placeholders below before running ---

# Euno integration endpoint (substitute your account and integration IDs)
endpoint_url = "https://api.app.euno.ai/accounts/YOUR_ACCOUNT_ID/integrations/YOUR_INTEGRATION_ID/run"

# Integration key, sent as a bearer token
integration_key = "your_key_here"

# HTTP headers attached to the upload request
headers = {"authorization": f"Bearer {integration_key}"}

# dbt artifact files bundled into the zip
file_names = ["catalog.json", "manifest.json", "run_results.json"]
def create_artifacts_zip(artifact_names=None, source_dir="."):
    """Create a temporary zip file containing the dbt artifacts.

    Args:
        artifact_names: Names of the files to bundle. Defaults to the
            module-level ``file_names`` list.
        source_dir: Directory the artifacts are read from. Defaults to the
            current working directory.

    Returns:
        The path of the newly created zip file, or None if any artifact is
        missing (nothing is created in that case). The caller is responsible
        for deleting the returned file.
    """
    names = file_names if artifact_names is None else list(artifact_names)
    entries = [(name, os.path.join(source_dir, name)) for name in names]

    # Ensure all files exist before building the archive
    missing_files = [name for name, path in entries if not os.path.exists(path)]
    if missing_files:
        print(f"Error: The following files are missing: {', '.join(missing_files)}")
        return None

    # mkstemp() creates the file atomically with owner-only permissions.
    # The deprecated mktemp() only returns a name, which another process
    # could claim before we open it.
    fd, temp_zip = tempfile.mkstemp(suffix='.zip')
    os.close(fd)
    print(f'Creating zip file: {temp_zip}')

    try:
        with zipfile.ZipFile(temp_zip, 'w', compression=zipfile.ZIP_DEFLATED) as archive:
            for name, path in entries:
                # Store under the bare file name so the archive layout does
                # not depend on source_dir.
                archive.write(path, arcname=name)
                print(f"Added {name} to zip")
    except Exception:
        # Do not leave a partial archive behind if writing fails.
        os.unlink(temp_zip)
        raise

    return temp_zip
def upload_artifacts_zip(zip_file_path, timeout=(10, 300)):
    """Upload the zip file to Euno's endpoint, then delete the local file.

    Args:
        zip_file_path: Path of the zip archive to upload. The file is removed
            before this function returns, whether or not the upload succeeded.
        timeout: Forwarded to ``requests.post`` as ``(connect, read)`` seconds.
            Without a timeout a stalled connection would block indefinitely.

    Returns:
        True if the endpoint answered with HTTP 200, False otherwise
        (including when reading or sending the file raised an error).
    """
    try:
        with open(zip_file_path, 'rb') as zip_file:
            files = [('files', ('dbt_artifacts.zip', zip_file.read(), 'application/zip'))]

        response = requests.post(
            url=endpoint_url,
            headers=headers,
            files=files,
            timeout=timeout,
        )
    except Exception as e:
        # Deliberately broad: this function reports failure through its
        # return value instead of raising.
        print("An error occurred:", e)
        return False
    finally:
        # Clean up the temporary file on every path (success, HTTP error, exception).
        if os.path.exists(zip_file_path):
            try:
                os.unlink(zip_file_path)
            except OSError as cleanup_error:
                print(f"Warning: could not remove temporary file {zip_file_path}: {cleanup_error}")

    # Check the response
    if response.status_code == 200:
        print("Zip file uploaded successfully!")
        try:
            print("Response:", response.json())
        except ValueError:
            # A 200 with a non-JSON body is still a successful upload;
            # fall back to showing the raw text.
            print("Response:", response.text)
        return True

    print("Failed to upload zip file.")
    print("Status Code:", response.status_code)
    print("Response:", response.text)
    return False
def main():
    """Build the artifacts zip, upload it, and report the outcome.

    Returns:
        The result of the upload step, or False when no zip file could be
        created.
    """
    print("Starting dbt artifacts upload process...")

    # Step 1: bundle the artifacts; stop early if that was not possible
    archive_path = create_artifacts_zip()
    if not archive_path:
        return False

    # Step 2: send the bundle and report how it went
    uploaded = upload_artifacts_zip(archive_path)
    outcome = "Upload process completed successfully!" if uploaded else "Upload process failed."
    print(outcome)
    return uploaded
if __name__ == "__main__":
    # Propagate the outcome as the process exit status (0 = success,
    # 1 = failure) so schedulers and CI jobs can detect a failed upload.
    raise SystemExit(0 if main() else 1)
Usage
Configure the script: Update the endpoint_url and integration_key variables with your specific values.
Install dependencies: pip install requests
Run the script: Execute it from your dbt project's target directory, where the artifact files are located.
Verify upload: Check the script output for success/failure messages.
Notes
The script expects to be run from the directory containing the dbt artifacts (typically the target directory).
The zip file is automatically cleaned up after upload.
All three artifact files must be present for the upload to proceed
The script provides detailed logging for troubleshooting
Last updated