Python Upload
This Python script uploads a zip file to Euno's endpoint.
Prerequisites
Python 3.6+
requests library (pip install requests)
Script
import requests
import os
import json
from pathlib import Path
# --- Configuration: replace the placeholder values below with your own ---

# prepare-upload endpoint; substitute your account and integration IDs.
endpoint_url = "https://api.app.euno.ai/accounts/YOUR_ACCOUNT_ID/integrations/YOUR_INTEGRATION_ID/prepare-upload"

# Integration key, sent as a bearer token in the authorization header.
integration_key = "your_key_here"

# Location of the zip archive to upload.
zip_file_path = "Archive.zip"

# Request headers for the prepare-upload call (bearer token + JSON body).
headers = {
    "authorization": f"Bearer {integration_key}",
    "content-type": "application/json",
}
def get_signed_url():
    """Request a signed upload URL for the zip file.

    Posts the archive's file name to ``endpoint_url`` with the module-level
    ``headers``.

    Returns:
        A ``(upload_url, payload)`` tuple, where ``payload`` is the decoded
        JSON response, or ``None`` when the request fails, the server does
        not answer 200, the response is not valid JSON, or no upload URL is
        present. Every failure is reported on stdout.
    """
    body = {"filename": Path(zip_file_path).name}

    # Network-level failures (DNS, connection, timeout) are reported instead
    # of surfacing as a traceback.
    try:
        response = requests.post(endpoint_url, headers=headers, json=body, timeout=30)
    except requests.RequestException as e:
        print("An error occurred while requesting the signed URL:", e)
        return None

    if response.status_code != 200:
        print(f"Failed to obtain signed URL (status {response.status_code}): {response.text}")
        return None

    # A 200 response with a non-JSON body makes .json() raise a ValueError
    # subclass.
    try:
        payload = response.json()
    except ValueError:
        print("Signed URL response was not valid JSON:", response.text[:500])
        return None

    # Walk body -> upload -> url, tolerating missing, null, or non-object
    # levels rather than raising AttributeError.
    upload_url = payload
    for key in ("body", "upload", "url"):
        upload_url = upload_url.get(key) if isinstance(upload_url, dict) else None

    if not upload_url:
        print("Signed upload URL not found in response:", json.dumps(payload)[:500])
        return None
    return upload_url, payload
def upload_file(upload_url):
    """Send the zip archive at ``zip_file_path`` to the signed URL via PUT.

    Returns:
        ``True`` when the server answers 200 or 201. Otherwise a diagnostic
        is printed and ``False`` is returned; this also covers any exception
        raised while opening or sending the file.
    """
    try:
        # Pass the open file object so the archive is read from the handle
        # rather than loaded into a bytes object first.
        with open(zip_file_path, "rb") as archive:
            response = requests.put(
                upload_url,
                data=archive,
                headers={"content-type": "application/zip"},
                timeout=60,
            )

        if response.status_code in (200, 201):
            return True

        print(f"Upload failed (status {response.status_code}): {response.text[:200]}")
        return False
    except Exception as e:
        print("An error occurred during upload:", e)
        return False
def main():
    """Upload the archive using the prepare-upload flow.

    Steps: check that ``zip_file_path`` exists, request a signed URL, then
    upload the archive to it. Progress and errors are printed to stdout.

    Returns:
        ``True`` if the archive was uploaded, ``False`` if any step failed.
    """
    print("Starting archive upload process...")

    # Bail out early if there is nothing to upload.
    if not os.path.exists(zip_file_path):
        print(f"Error: file '{zip_file_path}' not found.")
        return False

    print("Requesting signed URL...")
    result = get_signed_url()
    if not result:
        # get_signed_url has already printed the reason.
        return False
    upload_url, meta = result
    print("Signed URL obtained — uploading archive...")

    if not upload_file(upload_url):
        print("Upload failed.")
        return False

    print("Upload succeeded! Prepare-upload response:")
    print(json.dumps(meta, indent=2))
    return True
if __name__ == "__main__":
    # Turn main()'s boolean result into the process exit status so shells
    # and CI pipelines can detect a failed upload (0 = success, 1 = failure).
    raise SystemExit(0 if main() else 1)
Usage
Configure the script: Update the endpoint_url and integration_key variables with your specific values.
Install dependencies:
pip install requests
Run the script: Execute it from your dbt project's target directory, where the artifact files are located.
Verify upload: Check the script output for success/failure messages.
Last updated