> For the complete documentation index, see [llms.txt](https://docs.euno.ai/llms.txt). Markdown versions of documentation pages are available by appending `.md` to page URLs; this page is available as [Markdown](https://docs.euno.ai/sources/custom-integrations/custom-integration/example-homegrown-elt-pipeline.md).

# Example: Homegrown ELT Pipeline

This walkthrough shows how to use the Custom integration to document a **homegrown Python ELT job** that:

1. Reads rows from a Snowflake source table
2. Transforms the data in memory
3. Writes the results to a different Snowflake table

The job runs on a schedule (for example a cron in some Linux machine). After each successful run, your script POSTs a small batch of observations to your Custom integration endpoint so Euno can represent the tables and the pipeline in lineage.

For supported property names and validation rules, see [Custom Integration Properties Reference](/sources/custom-integrations/custom-integration/custom-integration-properties-reference.md).

Before sending this example, register the custom resource type used for the Python job:

```bash
curl -X POST \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer YOUR_EUNO_ACCESS_TOKEN" \
  -d '{
    "type": "acme_transform_script",
    "display_name_singular": "ACME Transform Script",
    "display_name_plural": "ACME Transform Scripts",
    "description": "Homegrown ACME Python transform jobs"
  }' \
  https://api.app.euno.ai/accounts/YOUR_ACCOUNT_ID/resource_types
```

## Scenario

| Resource             | Description                                                |
| -------------------- | ---------------------------------------------------------- |
| **Source table**     | `RAW_ORDERS` in Snowflake — upstream data the script reads |
| **Transform script** | `enrich_orders.py` — your in-house Python job              |
| **Target table**     | `ENRICHED_ORDERS` in Snowflake — table the script writes   |

```
┌─────────────────────┐     read      ┌──────────────────────────┐     write     ┌─────────────────────────┐
│  RAW_ORDERS         │ ────────────► │  enrich_orders.py        │ ────────────► │  ENRICHED_ORDERS        │
│  (Snowflake table)  │               │  (homegrown ELT script)  │               │  (Snowflake table)      │
└─────────────────────┘               └──────────────────────────┘               └─────────────────────────┘
```

## URIs used in this example

Use URIs that match how resources appear — or will appear — in your Euno account.

| Resource         | Example URI                                              |
| ---------------- | -------------------------------------------------------- |
| Source table     | `snowflake.us_west_2.acme1234.analytics.raw_orders`      |
| Transform script | `custom.acme_elt.enrich_orders`                          |
| Target table     | `snowflake.us_west_2.acme1234.analytics.enriched_orders` |

Snowflake table URIs follow Euno's [Universal Resource Identifier (URI)](/developer-reference/technical-concepts/universal-resource-identifier-uri.md). To find out the URI of an existing resource, you can find it in the side panel of that resource in Euno's data model screen.

## Observations to emit

Emit **three observations** on each run (or whenever metadata changes):

### 1. Source Snowflake table

Observe the table your script reads from. Include `database_technology` so Euno knows it lives in Snowflake.

```json
{
  "uri": "snowflake.us_west_2.acme1234.analytics.raw_orders",
  "properties": {
    "type": "table",
    "name": "RAW_ORDERS",
    "description": "Raw order events loaded from the operational database",
    "database_technology": "snowflake",
    "tags": ["acme_elt", "source"]
  }
}
```

### 2. Pipeline script (`acme_transform_script`)

Observe the Python job itself. Use a custom resource type (`acme_transform_script`) and declare the source table in `table_dependencies`. Set `icon_url` so the resource appears with a pipeline icon in the data model instead of the generic custom-type icon.

```json
{
  "uri": "custom.acme_elt.enrich_orders",
  "properties": {
    "type": "acme_transform_script",
    "name": "enrich_orders.py",
    "description": "Reads RAW_ORDERS, enriches rows in memory, and writes ENRICHED_ORDERS",
    "tags": ["acme_elt", "python"],
    "icon_url": "https://airflow.apache.org/images/airflow-icon.svg",
    "table_dependencies": [
      "snowflake.us_west_2.acme1234.analytics.raw_orders"
    ]
  }
}
```

### 3. Target Snowflake table

Observe the destination table. Use `explicit_additional_dependencies` to declare upstream lineage. Because this resource has `type: "table"`, use `explicit_additional_dependencies` rather than `table_dependencies` (see [Lineage properties](/sources/custom-integrations/custom-integration/custom-integration-properties-reference.md#lineage-properties)).

Declare both the source table and the transform script as upstream dependencies:

```json
{
  "uri": "snowflake.us_west_2.acme1234.analytics.enriched_orders",
  "properties": {
    "type": "table",
    "name": "ENRICHED_ORDERS",
    "description": "Order facts enriched with customer and product attributes",
    "database_technology": "snowflake",
    "tags": ["acme_elt", "target"],
    "explicit_additional_dependencies": [
      "snowflake.us_west_2.acme1234.analytics.raw_orders",
      "custom.acme_elt.enrich_orders"
    ]
  }
}
```

Together, these observations tell Euno:

* Both Snowflake tables exist in the model
* The transform script reads from the source table (`table_dependencies` on the script)
* The target table depends on the source table and the transform script (`explicit_additional_dependencies`)
* The script displays a pipeline icon in the UI via `icon_url` (see [Custom type icons](/sources/custom-integrations/custom-integration/custom-integration-properties-reference.md#custom-type-icons))

## Complete payload

Send all three observations in one POST as a JSON array:

```json
[
  {
    "uri": "snowflake.us_west_2.acme1234.analytics.raw_orders",
    "properties": {
      "type": "table",
      "name": "RAW_ORDERS",
      "description": "Raw order events loaded from the operational database",
      "database_technology": "snowflake",
      "tags": ["acme_elt", "source"]
    }
  },
  {
    "uri": "custom.acme_elt.enrich_orders",
    "properties": {
      "type": "acme_transform_script",
      "name": "enrich_orders.py",
      "description": "Reads RAW_ORDERS, enriches rows in memory, and writes ENRICHED_ORDERS",
      "tags": ["acme_elt", "python"],
      "icon_url": "https://airflow.apache.org/images/airflow-icon.svg",
      "table_dependencies": [
        "snowflake.us_west_2.acme1234.analytics.raw_orders"
      ]
    }
  },
  {
    "uri": "snowflake.us_west_2.acme1234.analytics.enriched_orders",
    "properties": {
      "type": "table",
      "name": "ENRICHED_ORDERS",
      "description": "Order facts enriched with customer and product attributes",
      "database_technology": "snowflake",
      "tags": ["acme_elt", "target"],
      "explicit_additional_dependencies": [
        "snowflake.us_west_2.acme1234.analytics.raw_orders",
        "custom.acme_elt.enrich_orders"
      ]
    }
  }
]
```

Every observation must include `type` and `name` in `properties`. Observations with unsupported property keys are skipped; see the [properties reference](/sources/custom-integrations/custom-integration/custom-integration-properties-reference.md).

## Example Python emitter

Call your Custom integration endpoint at the end of the ELT job:

```python
import os
import requests

EUNO_RUN_URL = os.environ["EUNO_CUSTOM_RUN_URL"]  # POST .../integrations/{id}/run — see Custom Integration setup
EUNO_INTEGRATION_KEY = os.environ["EUNO_INTEGRATION_KEY"]

SOURCE_TABLE_URI = "snowflake.us_west_2.acme1234.analytics.raw_orders"
TARGET_TABLE_URI = "snowflake.us_west_2.acme1234.analytics.enriched_orders"
SCRIPT_URI = "custom.acme_elt.enrich_orders"

SCRIPT_ICON_URL = "https://airflow.apache.org/images/airflow-icon.svg"

OBSERVATIONS = [
    {
        "uri": SOURCE_TABLE_URI,
        "properties": {
            "type": "table",
            "name": "RAW_ORDERS",
            "description": "Raw order events loaded from the operational database",
            "database_technology": "snowflake",
            "tags": ["acme_elt", "source"],
        },
    },
    {
        "uri": SCRIPT_URI,
        "properties": {
            "type": "acme_transform_script",
            "name": "enrich_orders.py",
            "description": "Reads RAW_ORDERS, enriches rows in memory, and writes ENRICHED_ORDERS",
            "tags": ["acme_elt", "python"],
            "icon_url": SCRIPT_ICON_URL,
            "table_dependencies": [SOURCE_TABLE_URI],
        },
    },
    {
        "uri": TARGET_TABLE_URI,
        "properties": {
            "type": "table",
            "name": "ENRICHED_ORDERS",
            "description": "Order facts enriched with customer and product attributes",
            "database_technology": "snowflake",
            "tags": ["acme_elt", "target"],
            "explicit_additional_dependencies": [SOURCE_TABLE_URI, SCRIPT_URI],
        },
    },
]


def emit_lineage_to_euno() -> None:
    response = requests.post(
        EUNO_RUN_URL,
        json=OBSERVATIONS,
        headers={
            "Authorization": f"Bearer {EUNO_INTEGRATION_KEY}",
            "Content-Type": "application/json",
        },
        timeout=60,
    )
    response.raise_for_status()


if __name__ == "__main__":
    # ... your Snowflake read / transform / write logic ...
    emit_lineage_to_euno()
```

Store the integration key in a secret manager or environment variable — never commit it to source control.

## cURL equivalent

```bash
curl -X POST \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer YOUR_INTEGRATION_KEY_HERE" \
  -d @observations.json \
  https://api.app.euno.ai/accounts/YOUR_ACCOUNT_ID/integrations/YOUR_INTEGRATION_ID/run
```

Put the JSON array above in `observations.json`, or inline the `-d` payload for a quick test.

## Scaling to many pipelines

If you operate dozens of homegrown jobs, build observations programmatically and use the [prepare-upload file flow](/sources/custom-integrations/custom-integration/handling-high-volume-of-observations.md) to upload a `.jsonl` file with one observation per line when inline POST bodies become too large.

## Related documentation

* [Custom Integration setup](/sources/custom-integrations/custom-integration.md)
* [Custom Integration Properties Reference](/sources/custom-integrations/custom-integration/custom-integration-properties-reference.md)
* [Handling High Volume of Observations](/sources/custom-integrations/custom-integration/handling-high-volume-of-observations.md)
* [Relationships](/developer-reference/technical-concepts/relationships.md)


---

# Agent Instructions
This documentation is published with GitBook. GitBook is the documentation platform designed so that both humans and AI agents can read, navigate, and reason over technical content effectively. Learn more at gitbook.com.

## Querying This Documentation
If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter, and the optional `goal` query parameter:

```
GET https://docs.euno.ai/sources/custom-integrations/custom-integration/example-homegrown-elt-pipeline.md?ask=<question>&goal=<endgoal>
```

`ask` is the immediate question: it should be specific, self-contained, and written in natural language.
`goal` is optional and describes the broader end goal you are ultimately trying to accomplish on behalf of the user. GitBook uses it to tailor the answer towards what is most useful for that goal.

The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
