Example: Homegrown ELT Pipeline

This walkthrough shows how to use the Custom integration to document a homegrown Python ELT job that:

  1. Reads rows from a Snowflake source table

  2. Transforms the data in memory

  3. Writes the results to a different Snowflake table

The job runs on a schedule (for example a cron in some Linux machine). After each successful run, your script POSTs a small batch of observations to your Custom integration endpoint so Euno can represent the tables and the pipeline in lineage.

For supported property names and validation rules, see Custom Integration Properties Reference.

Before sending this example, register the custom resource type used for the Python job:

curl -X POST \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer YOUR_EUNO_ACCESS_TOKEN" \
  -d '{
    "type": "acme_transform_script",
    "display_name_singular": "ACME Transform Script",
    "display_name_plural": "ACME Transform Scripts",
    "description": "Homegrown ACME Python transform jobs"
  }' \
  https://api.app.euno.ai/accounts/YOUR_ACCOUNT_ID/resource_types

Scenario

Resource
Description

Source table

RAW_ORDERS in Snowflake β€” upstream data the script reads

Transform script

enrich_orders.py β€” your in-house Python job

Target table

ENRICHED_ORDERS in Snowflake β€” table the script writes

URIs used in this example

Use URIs that match how resources appear β€” or will appear β€” in your Euno account.

Resource
Example URI

Source table

snowflake.us_west_2.acme1234.analytics.raw_orders

Transform script

custom.acme_elt.enrich_orders

Target table

snowflake.us_west_2.acme1234.analytics.enriched_orders

Snowflake table URIs follow Euno's Universal Resource Identifier (URI). To find out the URI of an existing resource, you can find it in the side panel of that resource in Euno's data model screen.

Observations to emit

Emit three observations on each run (or whenever metadata changes):

1. Source Snowflake table

Observe the table your script reads from. Include database_technology so Euno knows it lives in Snowflake.

2. Pipeline script (acme_transform_script)

Observe the Python job itself. Use a custom resource type (acme_transform_script) and declare the source table in table_dependencies. Set icon_url so the resource appears with a pipeline icon in the data model instead of the generic custom-type icon.

3. Target Snowflake table

Observe the destination table. Use explicit_additional_dependencies to declare upstream lineage. Because this resource has type: "table", use explicit_additional_dependencies rather than table_dependencies (see Lineage properties).

Declare both the source table and the transform script as upstream dependencies:

Together, these observations tell Euno:

  • Both Snowflake tables exist in the model

  • The transform script reads from the source table (table_dependencies on the script)

  • The target table depends on the source table and the transform script (explicit_additional_dependencies)

  • The script displays a pipeline icon in the UI via icon_url (see Custom type icons)

Complete payload

Send all three observations in one POST as a JSON array:

Every observation must include type and name in properties. Observations with unsupported property keys are skipped; see the properties reference.

Example Python emitter

Call your Custom integration endpoint at the end of the ELT job:

Store the integration key in a secret manager or environment variable β€” never commit it to source control.

cURL equivalent

Put the JSON array above in observations.json, or inline the -d payload for a quick test.

Scaling to many pipelines

If you operate dozens of homegrown jobs, build observations programmatically and use the prepare-upload file flow to upload a .jsonl file with one observation per line when inline POST bodies become too large.

Last updated