Ahmed Sayed
13 min read · Feb 18, 2024


Introduction
What is DLT?
Getting Started
Basic DLT Pipeline
Complex Transformations
Destinations
Upsert
Incremental Load
Upsert and Incremental Load Together
Hypothetical Use Case
Modes
Example (Writing to Redshift):
All available Integrations
What is Staging Storage?
Types of Staging Storage
Placement in Data Pipelines
DLT State
DLT and Schemas
Schema and Data Contracts
DLT and Data Contracts
Key Deployment Scenarios
Deployment Considerations
Deploy a pipeline with GitHub Actions
How to add credentials
Load data from an API Example
Pandas vs DLT
Situations Where DLT May Excel
Conclusion

Introduction

Data engineers and analysts often wrestle with the complexities of data pipelines. Cleaning, transforming, and loading data from various sources into target destinations can be a time-consuming and error-prone task. The DLT Python library promises to streamline this process, providing a powerful, Pythonic, and backend-free way to build reliable data pipelines. In this article, we’ll dive into the key features of DLT, explore complex transformations, and demonstrate how to connect to popular databases and file formats.

What is DLT?

DLT (standing for “data load tool”) is an open-source Python library designed to radically simplify the creation and maintenance of data pipelines. Key advantages of DLT include:

  • Python-Centric: Leverages the expressiveness and familiarity of Python.
  • Backend-Free: Operates without the need for additional backends or containers, reducing complexity.
  • Automatic Schema Management: Infers schemas, handles updates, and manages schema evolution.
  • Declarative Syntax: Promotes the readability and maintainability of your pipelines.
  • Flexibility: Works in diverse environments, from Jupyter Notebooks and Airflow to AWS Lambda functions.

Getting Started

Install DLT using pip:

pip install dlt

Basic DLT Pipeline

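Let’s illustrate a simple DLT pipeline that loads data from a CSV file into a PostgreSQL database. The pipeline and dataset names below are illustrative, and the PostgreSQL credentials are expected in .dlt/secrets.toml or in environment variables:

import dlt
import pandas as pd

# Read the CSV, then keep two columns and only the rows where column1 > 0
raw_data = pd.read_csv("data/raw_data.csv")
cleaned_data = raw_data.loc[raw_data["column1"] > 0, ["column1", "column2"]]

# A pipeline binds a destination to a dataset (the schema the tables land in)
pipeline = dlt.pipeline(
    pipeline_name="csv_to_postgres",
    destination="postgres",
    dataset_name="analytics",
)

# write_disposition="replace" overwrites the target table on every run
load_info = pipeline.run(
    cleaned_data.to_dict(orient="records"),
    table_name="target_table",
    write_disposition="replace",
)
print(load_info)

In this example:

  1. raw_data holds the rows read from the CSV file.
  2. cleaned_data applies transformations (selection, filtering).
  3. pipeline.run writes the processed data to the PostgreSQL table target_table.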

Complex Transformations

DLT excels at handling complex data transformations using familiar Python paradigms:

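import dlt

def transform(row):
    # Conditional logic: label the row by the size of column1
    row["value_label"] = "high_value" if row["column1"] >= 100 else "low_value"
    if row.get("column2") is None:  # Replace missing values
        row["column2"] = "unknown"
    row["new_column"] = row["column1"] * 2  # Derived column
    return row

@dlt.resource(name="processed_data")
def processed_data(rows):
    yield from rows

cleaned_rows = [{"column1": 150, "column2": None}]  # sample input rows
# add_map applies transform to every row as it is extracted
resource = processed_data(cleaned_rows).add_map(transform)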

Connecting to Databases

DLT supports connections to a wide range of popular databases:

  • SQL Server
  • MySQL
  • MongoDB
  • Cassandra

Working with Files

DLT handles various file formats:

  • CSV/JSON (similar to the basic example)
  • Parquet

Destinations

DLT supports writing to databases as shown, plus additional destinations like cloud storage (e.g., S3) and more.

Upsert

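  • Meaning: The term “upsert” is a portmanteau of “update” and “insert.” It’s an operation that updates an existing record in a database table or inserts a new record if it doesn’t already exist.
  • Why Upsert? It keeps the target in sync with the source without creating duplicates, and it spares you from writing separate insert and update logic.
  • Implementation: Upserts usually rely on unique key columns to decide whether an incoming row matches an existing one.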
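
The idea can be sketched outside of DLT with Python’s built-in sqlite3 module. This is only an illustration: the customer_orders table and its columns are made up for the example, and the ON CONFLICT clause assumes SQLite 3.24 or newer.

```python
import sqlite3

# In-memory database with a hypothetical orders table keyed on order_id.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE customer_orders (order_id INTEGER PRIMARY KEY, status TEXT)"
)

def upsert_order(order_id, status):
    """Insert the order, or update its status if order_id already exists."""
    conn.execute(
        """
        INSERT INTO customer_orders (order_id, status) VALUES (?, ?)
        ON CONFLICT(order_id) DO UPDATE SET status = excluded.status
        """,
        (order_id, status),
    )

upsert_order(1, "placed")    # new key: inserted
upsert_order(2, "placed")    # new key: inserted
upsert_order(1, "shipped")   # existing key: updated in place

rows = conn.execute(
    "SELECT order_id, status FROM customer_orders ORDER BY order_id"
).fetchall()
print(rows)
```

The table ends up with two rows, not three: the second write for order 1 changed the existing row instead of adding a duplicate.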

Incremental Load

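  • Meaning: An incremental load extracts only the new or changed data from a source system since the last data load process. This contrasts with a full load, which reloads the entire dataset each time.
  • Why Incremental Loads? They move less data, finish faster, and put less strain on the source system than a full reload.
  • Implementation: Requires a mechanism to identify changes, such as a last-modified timestamp, an ever-increasing ID, or change data capture.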
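
A timestamp-based incremental load can be sketched in a few lines of plain Python. The function and field names (incremental_load, updated_at) are hypothetical, and the “source” is just a list; the point is the watermark logic, not a real connector.

```python
from datetime import datetime

def incremental_load(source_rows, last_load_time):
    """Return the rows newer than the watermark, plus the new watermark."""
    new_rows = [r for r in source_rows if r["updated_at"] > last_load_time]
    # Advance the watermark to the newest row seen rather than to "now", so
    # rows written while the load was running are not skipped next time.
    new_watermark = max((r["updated_at"] for r in new_rows), default=last_load_time)
    return new_rows, new_watermark

source = [
    {"id": 1, "updated_at": datetime(2024, 1, 1)},
    {"id": 2, "updated_at": datetime(2024, 1, 5)},
    {"id": 3, "updated_at": datetime(2024, 1, 9)},
]

first_batch, watermark = incremental_load(source, datetime(2024, 1, 2))
second_batch, watermark = incremental_load(source, watermark)
print([r["id"] for r in first_batch], second_batch)
```

The first run picks up rows 2 and 3; the second run, started from the saved watermark, finds nothing new.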

Upsert and Incremental Load Together

Pairing upsert and incremental load is a potent data pipeline strategy:

  1. Load New/Changed Data: An incremental load efficiently brings only the recently changed data into your pipeline.
  2. Upsert into Target: The upsert operation ensures that your target database is correctly updated or has new records inserted, handling both modification and insertion scenarios.

Hypothetical Use Case

Imagine you operate an e-commerce website. Here’s how these concepts work together:

  • New Orders: Customers continuously place new orders on your website.
  • Source System: Order data lands in a JSON file every five minutes (for simplicity; often this would be a database).
  • Target: You maintain a PostgreSQL database table (customer_orders) to store and analyze order information.
  • Goal: Efficiently update customer_orders with new/changed orders while preventing duplicates.

We’ll assume you have the necessary libraries installed (dlt, relevant database connectors).

Incremental Load

import json
from pathlib import Path

import dlt

def read_orders(folder):
    # Illustrative helper: assumes each file in the folder holds a JSON array of orders
    for path in sorted(Path(folder).glob("*.json")):
        yield from json.loads(path.read_text())

@dlt.resource(name="new_orders")
def new_orders(
    order_timestamp=dlt.sources.incremental(
        "order_timestamp",  # assuming your JSON has an 'order_timestamp' field
        initial_value="2023-01-01T00:00:00Z",  # initial starting point
    )
):
    # dlt skips rows older than the stored cursor and records the newest value it sees
    yield from read_orders("orders_data/")

Explanation:

  • new_orders: Loads recently placed orders based on an order_timestamp comparison.
  • DLT state management handles storing and updating the cursor (the newest order_timestamp seen), so no separate bookkeeping step is needed.

Upsert into PostgreSQL

pipeline = dlt.pipeline(pipeline_name="orders", destination="postgres", dataset_name="shop")
pipeline.run(
    new_orders(), table_name="customer_orders",
    write_disposition="merge", primary_key="order_id",
)

Explanation:

  • customer_orders: The PostgreSQL table that receives the orders.
  • write_disposition="merge" with primary_key="order_id": Ensures orders are updated if the order_id exists and inserted if new.

Putting It Together

You would likely schedule this pipeline to run every few minutes with an orchestration tool (e.g., Airflow) to achieve frequent, non-duplicated updates:

  1. The pipeline begins.
  2. new_orders fetches only orders made since the previous run.
  3. pipeline.run upserts them into the customer_orders table.
  4. DLT saves the newest order_timestamp in its state for the next run.

In production you would also want to consider:

  • Error Handling: Gracefully handling invalid data or database connection issues.
  • Complex Transformations: Additional cleaning and restructuring of JSON orders as needed.
  • CDC: Where very fine-grained change tracking is required, investigate Change Data Capture tools compatible with your source database.

Modes

DLT calls the write mode a write disposition. It is set with the write_disposition argument of pipeline.run or @dlt.resource:

  • write_disposition="replace": Overwrites the existing data in the target table with the new data.
  • write_disposition="append": Adds the new data to the existing table. This is the default.
  • write_disposition="merge": Updates existing rows and inserts new ones, matching rows on the primary_key (or merge_key) you declare.

Considerations & Additional Notes

Choice Matters: Select the mode that matches the outcome you want for the target data. Replacing can lead to data loss in scenarios where preserving earlier changes is vital.

Destination Specifics: Not every destination supports every write disposition in the same way, so check the documentation for the destination you use.

Combination with Other Operations: You often combine the write disposition with other DLT features, like primary_key, for finer control over how updates are merged.

Example Scenarios

  1. Daily Snapshot Overwrite: replace
  2. Adding to Transaction Log: append
  3. Upserting Customer Dimension: merge with a primary key
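
The difference between the modes can be sketched in plain Python. The function below is only an illustration on in-memory lists, not DLT’s implementation; it uses the names replace (overwrite), append, and merge (upsert), and the id key column is an assumption made for the example.

```python
def write(target, new_rows, mode, key="id"):
    """Apply new_rows to target (a list of dicts) and return the result."""
    if mode == "replace":            # overwrite: discard what was there
        return list(new_rows)
    if mode == "append":             # keep everything, add the new rows
        return target + list(new_rows)
    if mode == "merge":              # upsert: match rows on the key column
        merged = {row[key]: row for row in target}
        merged.update({row[key]: row for row in new_rows})
        return list(merged.values())
    raise ValueError(f"unknown mode: {mode}")

target = [{"id": 1, "city": "Cairo"}, {"id": 2, "city": "Giza"}]
incoming = [{"id": 2, "city": "Luxor"}, {"id": 3, "city": "Aswan"}]

replaced = write(target, incoming, "replace")
appended = write(target, incoming, "append")
merged = write(target, incoming, "merge")
print(len(replaced), len(appended), len(merged))
```

Starting from two rows and receiving two, replace leaves 2 rows, append leaves 4 (with id 2 duplicated), and merge leaves 3 (with id 2 updated).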

DLT supports cloud storage such as Amazon S3, Azure Blob Storage, and Google Cloud Storage, as well as cloud data warehouses such as Redshift and Snowflake. Let’s break down how support differs between them:

Supported as Sources and Destinations

  • Amazon S3: You can read data from S3 buckets (CSV, JSON, Parquet) using DLT and write loaded data back to S3 for storage.
  • Azure Blob Storage: Similar to S3, DLT can read various file formats from Azure Blob Storage and save results into it.
  • Google Cloud Storage (GCS): DLT works with GCS for loading and storing data in common formats.

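Example (Reading Parquet file from S3):

import dlt
from dlt.sources.filesystem import filesystem, read_parquet  # dlt 1.0 or later

# List the matching file in the bucket, then parse it into rows
files = filesystem(bucket_url="s3://my-bucket/data", file_glob="raw_data.parquet")
data_from_s3 = (files | read_parquet()).with_name("data_from_s3")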

Cloud Data Warehouses

  • Redshift: DLT supports Redshift as a destination, allowing you to write loaded data directly into Redshift tables.
  • Snowflake: DLT supports Snowflake in the same way as Redshift: you can load your pipeline’s output into Snowflake.

Example (Writing to Redshift):

# transformed_data can be any iterable of rows or a dlt resource
pipeline = dlt.pipeline(pipeline_name="to_redshift", destination="redshift", dataset_name="analytics")
pipeline.run(transformed_data, table_name="target_redshift_table", write_disposition="replace")

Important Considerations

  • Authentication: You’ll need proper credentials and configurations to access these cloud services from within your DLT pipelines.
  • Data Formats: DLT is format-aware; support for specific formats (beyond the usual CSV, JSON, Parquet) might depend on the cloud service or data warehouse.
  • Connectors: Some services might require specific connectors within DLT for full interaction.

All available Integrations

What is Staging Storage?

A staging storage area (or staging database) serves as a temporary holding zone for raw or unprocessed data before it’s transformed and loaded into a target data warehouse or data lake. Here’s why it’s essential:

  • Decoupling Source and Target: The staging area breaks the dependency between the data source systems and the final target. This allows for data extraction and transformation without directly impacting either the source systems or the data warehouse.
  • Data Preparation: It provides a space to cleanse, standardize, and enrich your raw data before incorporating it into the more formal structure of the data warehouse. You can validate data quality, remove irrelevant information, and reconcile data from multiple sources.
  • Isolation: The staging area isolates incoming data. This enables error debugging and troubleshooting without contaminating the data warehouse with invalid or corrupt data.
  • Flexibility: A staging area provides the flexibility to modify the data transformation processes without affecting the data warehouse itself. You can make modifications, experiment with new processing logic, and refine your ETL jobs all within the staging zone.

Key Characteristics

  • Transient: Data in a staging area is temporary by nature. Once data has been cleansed, transformed, and loaded into the data warehouse, it’s often removed from the staging area.
  • Optimization: A staging area usually relaxes the indexing and constraints that a formal data warehouse would enforce. This facilitates faster loading and transformation in the staging process.

Types of Staging Storage

  • Databases: Traditional relational databases (e.g., PostgreSQL, MySQL) are frequently used for staging, providing familiar structures and SQL operations.
  • Data Lake Storage: Object storage systems in data lakes (e.g., S3 buckets, Azure Blob Storage) offer scalability and cost-effectiveness, well-suited for large volumes of raw data.
  • Flat Files: In simpler systems, staging folders holding structured files (CSV, JSON, etc.) are also an option.

Best Practices

  • Align to Workflow: Consider your ETL/ELT design and overall data architecture when selecting staging storage types.
  • Metadata Management: Maintain metadata around data lineage and timestamps to track the stages and origin of data in your staging area.
  • Cleanup: Develop clear policies for how long data remains in the staging area. Implement procedures to manage its size and maintain its integrity.

Placement in Data Pipelines

A typical data pipeline with a staging area looks like this:

  1. Extraction: Data is extracted from source systems (databases, applications, files, etc.).
  2. Loading (Staging): Raw data lands in the staging storage area.
  3. Transformation (Staging): Cleaning, transformation, filtering, and other preparation logic occurs within the staging environment.
  4. Loading (Data Warehouse): The transformed, standardized data is loaded into the data warehouse or data lake for analysis and reporting.
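
The four steps above can be sketched with nothing but the standard library. Everything here is a stand-in chosen for illustration: the “source” is a hard-coded list, the staging area is a temporary directory holding a JSON Lines file, and the “warehouse” is a Python list.

```python
import json
import tempfile
from pathlib import Path

def extract():
    """Stand-in for a source system; returns raw, partly dirty records."""
    return [
        {"id": "1", "amount": " 10.5 "},
        {"id": "2", "amount": "n/a"},      # invalid amount, dropped in staging
        {"id": "3", "amount": "7"},
    ]

def load_to_staging(records, staging_dir):
    """Land the raw data untouched as a JSON Lines file."""
    path = Path(staging_dir) / "raw_orders.jsonl"
    path.write_text("\n".join(json.dumps(r) for r in records))
    return path

def transform(staged_file):
    """Clean and type the staged records; skip rows that fail validation."""
    clean = []
    for line in staged_file.read_text().splitlines():
        record = json.loads(line)
        try:
            clean.append({"id": int(record["id"]), "amount": float(record["amount"])})
        except ValueError:
            continue
    return clean

warehouse = []  # stand-in for the data warehouse table

with tempfile.TemporaryDirectory() as staging_dir:   # transient staging area
    staged = load_to_staging(extract(), staging_dir)
    warehouse.extend(transform(staged))

print(warehouse)
```

The invalid record never reaches the warehouse, and the staging directory is deleted as soon as the load finishes, which mirrors the isolation and transience described earlier.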

Here’s how DLT supports the concept of staging storage within its data pipeline framework:

1. Staging Destination

A DLT pipeline takes a staging argument alongside its destination. This is the core mechanism that enables you to set up a staging area before moving the data to a final destination. A typical pattern:

  • Staging: A filesystem bucket (e.g., S3)
  • Destination: Data Warehouse/Lake

2. Intermediate Tables

While not mandatory, DLT encourages breaking your pipeline into logical resources (@dlt.resource), each of which loads into its own table. Once loaded, these tables can serve as sources for later pipeline steps.

3. Copy Jobs

For certain targets, notably cloud data warehouses like Redshift, DLT loads staged files with copy jobs. Once the files are in the staging bucket, DLT issues efficient bulk-load commands, such as COPY for Redshift, to move the data from your staging filesystem into the data warehouse itself.

Example (Simplified)

import dlt

@dlt.resource(name="staged_data")
def staged_data():
    # Load your incoming data (from files, a database, etc.) and
    # perform transformations and cleaning before it is staged
    yield {"id": 1, "value": "example"}  # placeholder row

# staging="filesystem" writes the load files to a bucket first (the bucket URL
# is set in .dlt/config.toml or environment variables); dlt then copies them
# into Redshift
pipeline = dlt.pipeline(
    pipeline_name="staged_load",
    destination="redshift",
    staging="filesystem",
    dataset_name="warehouse_data",
)
pipeline.run(staged_data(), write_disposition="replace")

General Concept of State

In programming and computing, ‘state’ refers to the current condition or values of variables, objects, or a system at a given point in time. Here’s how it’s broadly relevant:

  • Stateful Applications: Many applications maintain state to track user sessions, user preferences, the progress of a transaction, or any other data that must be stored even as other aspects of the application change.
  • Stateful vs. Stateless: A stateless component’s output relies solely on its inputs, while a stateful component’s output may also depend on its internal state that has been preserved.

DLT State

DLT state is a specialized mechanism within the DLT library designed specifically for data pipelines. Here’s what it does and why it’s important:

  1. Pipeline Memory: DLT state functions as a key-value storage system for your pipelines. You can store and update pipeline variables (like a timestamp) throughout a pipeline and retrieve them in subsequent runs.
  2. Incremental Loads: The primary use case for DLT state is enabling incremental loads. Your pipeline can store a value like last_load_time. The next time it runs, you can retrieve this timestamp and load only the new data generated since the last run.
  3. Resumability: If a pipeline execution fails, DLT state can help preserve progress and enable it to resume from where it left off.

Example

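import datetime

import dlt

@dlt.resource
def my_resource():
    state = dlt.current.resource_state()  # dict persisted between runs
    last_updated = state.get("last_updated", "2023-01-01T00:00:00+00:00")
    # ... data loading and transformations that might use 'last_updated' for filtering
    yield {"loaded_since": last_updated}  # placeholder row
    # Stored together with the load and read back on the next run
    state["last_updated"] = datetime.datetime.now(datetime.timezone.utc).isoformat()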

Key Points

  • Persistence: Depending on your DLT environment setup, states might be managed within the memory of your DLT process, external files, or cloud-based storage systems.
  • Synchronization: When pipelines run in a distributed setup, the management and synchronization of DLT state become a bit more complex to ensure consistency.
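
To make the persistence point concrete, here is a minimal sketch of a key-value state kept in a JSON file between runs. It is not how DLT stores its state internally; PipelineState and run_pipeline are hypothetical names used only to show the read-then-update cycle.

```python
import json
import tempfile
from pathlib import Path

class PipelineState:
    """Tiny key-value store persisted to a JSON file between runs."""

    def __init__(self, path):
        self.path = Path(path)
        self.values = json.loads(self.path.read_text()) if self.path.exists() else {}

    def get(self, key, default=None):
        return self.values.get(key, default)

    def set(self, key, value):
        self.values[key] = value
        self.path.write_text(json.dumps(self.values))

def run_pipeline(state_file, newest_timestamp):
    """One run: read the stored cursor, then store the new one."""
    state = PipelineState(state_file)
    previous = state.get("last_load_time", "2023-01-01T00:00:00")
    state.set("last_load_time", newest_timestamp)
    return previous

with tempfile.TemporaryDirectory() as tmp:
    state_file = Path(tmp) / "state.json"
    first = run_pipeline(state_file, "2024-02-18T10:00:00")
    second = run_pipeline(state_file, "2024-02-18T10:05:00")

print(first, second)
```

The first run falls back to the default starting point; the second run sees the value the first run saved.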

The distinction between broad ‘state’ in computation and DLT’s specialized state mechanism ensures your data pipelines can run more efficiently and remain trackable.

DLT and Schemas

  • Schema Inference: One core strength of DLT is its ability to infer schemas automatically from your source data. Whether you’re loading JSON, CSV, Parquet, or data from certain databases, DLT analyzes the structure and data types and generates an internal schema.
  • Schema Evolution: DLT handles schema evolution with a degree of flexibility. To a point, it can readily adapt to changes in your data source if fields are added or their data types evolve (e.g., expanding string lengths).
  • Explicit Control: DLT does allow you to provide explicit schema definitions when needed. This is useful when you want finer-grained control over data types or constraints.
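
Inference and evolution can be illustrated with a toy version in plain Python. This is a deliberately simplified sketch, not DLT’s algorithm: it records one type name per column and only ever adds columns, and the function names are hypothetical.

```python
def infer_schema(rows):
    """Map each column name to the type name of its first non-null value."""
    schema = {}
    for row in rows:
        for column, value in row.items():
            if value is not None and column not in schema:
                schema[column] = type(value).__name__
    return schema

def evolve_schema(schema, rows):
    """Return a copy of schema extended with any columns that are new in rows."""
    evolved = dict(schema)
    for column, type_name in infer_schema(rows).items():
        evolved.setdefault(column, type_name)
    return evolved

first_load = [{"id": 1, "name": "Ada"}, {"id": 2, "name": None}]
second_load = [{"id": 3, "name": "Linus", "signup_date": "2024-02-18"}]

schema = infer_schema(first_load)
schema = evolve_schema(schema, second_load)
print(schema)
```

The second load introduces signup_date, and the schema grows to include it without any manual definition.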

Schema and Data Contracts

Schema contracts go beyond basic structure (which a normal schema defines) and introduce additional validation and quality rules:

  • Data Types: Traditional schemas enforce data types of a column (string, integer, date, etc.).
  • Constraints: Schema contracts can stipulate valid ranges (“age must be positive”), domain values (“country must be from a list of recognized countries”), or custom rules you define.
  • Metadata: Contracts might encapsulate business logic (“customer IDs must be unique”) or provide descriptive metadata to users of your data.
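
Such rules can be expressed as small, named checks. The sketch below uses only the standard library; Rule, CONTRACT, and violations are hypothetical names, and the country list is a made-up sample rather than a real reference set.

```python
from dataclasses import dataclass
from typing import Callable

@dataclass
class Rule:
    column: str
    description: str
    check: Callable[[object], bool]

# Hypothetical contract for a customer table.
CONTRACT = [
    Rule("age", "age must be positive", lambda v: isinstance(v, int) and v > 0),
    Rule("country", "country must be recognized", lambda v: v in {"EG", "DE", "US"}),
]

def violations(row):
    """Return the description of every rule the row breaks."""
    return [rule.description for rule in CONTRACT if not rule.check(row.get(rule.column))]

good = {"age": 34, "country": "EG"}
bad = {"age": -2, "country": "Atlantis"}
print(violations(good), violations(bad))
```

A pipeline could use the returned list to reject the row, route it to a quarantine table, or raise an alert.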

DLT and Data Contracts

DLT provides capabilities that align with data contracts:

  • Schema Contracts: The schema_contract argument (on a resource, a source, or pipeline.run) controls what happens when incoming data does not match the existing schema. Tables, columns, and data types can each be set to evolve, freeze, discard_row, or discard_value, so DLT can accept the change, fail the pipeline, or drop the offending row or value.
  • Pydantic Models: DLT integrates with Pydantic. Define strict Pydantic models and pass them as the columns argument of a resource for more rigorous type checking and enforcement of validation rules.

Example (Combining DLT & Pydantic)

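import dlt
from pydantic import BaseModel, field_validator  # Pydantic v2

class Customer(BaseModel):
    id: int
    name: str
    email: str

    @field_validator("email")
    @classmethod
    def email_must_be_valid(cls, value):
        # Add appropriate email validation logic here
        return value

# Passing the model as columns makes dlt derive the table schema from it
# and validate each row against it
@dlt.resource(name="clean_customer_data", columns=Customer)
def clean_customer_data():
    # ... logic to read and potentially clean incoming data
    yield {"id": 1, "name": "Ada", "email": "ada@example.com"}  # placeholder row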

Schema Contracts for Robust Data

While DLT provides mechanisms to help you achieve it, the concept of strict schema contracts goes a bit beyond DLT’s core focus. Implementing these effectively often involves:

  • Upstream Contracts: Ensuring that the sources providing data to your DLT pipeline already align with strong data quality norms.
  • Monitoring and Tooling: Having systems in place to monitor if your evolving data continues to satisfy contracts and alert you of breaches.

Key Deployment Scenarios

DLT’s backend-free and Python-centric design gives it considerable flexibility, leading to several deployment approaches:

  1. Local Notebooks & Testing: During development and initial testing, it’s common to execute your DLT pipelines directly within environments like Jupyter Notebooks or from the command line.
  2. Scheduled Scripts: For simple pipelines, cron jobs or task schedulers (e.g., Windows Task Scheduler) can trigger your DLT pipelines as Python scripts on a predictable schedule.
  3. Orchestration Tools: For more complex setups with dependencies, error handling, and monitoring, dedicated orchestration tools become highly valuable. Consider tools such as Airflow.
  4. Serverless Functions: With cloud platforms like AWS Lambda or Azure Functions, it’s possible to trigger individual DLT pipeline steps upon events (like a new file arriving in a data lake bucket).
Deployment Considerations

  • Dependency Management: Ensure necessary libraries (DLT, database connectors, etc.) are packaged and available in your deployment environment.
  • Configuration: Credentials for databases, cloud storage, and other services need to be securely managed and accessible to your DLT pipeline.
  • Logging and Monitoring: Especially in production systems, implement tools to track pipeline runs, log errors, and alert you to issues. Your orchestration tool often helps here.
  • Data Locality: Consider where your source data, destinations, and your DLT pipeline’s compute environment are located. Ideally, minimize data movement across networks for efficiency.

Important Notes

  • DLT itself doesn’t have a formalized ‘deployment server’ component. Your deployment patterns leverage its Pythonic nature and orchestration tools.
  • Ideal solutions depend on your specific infrastructure, team preferences, and pipeline complexity.

Deploy a pipeline with GitHub Actions

  1. Install dlt (pip install dlt; see dlthub.com).
  2. Create a pipeline as per the guide on dlthub.com.
  3. Sign up for a GitHub account; GitHub Actions will run the deployment.
  4. Add your dlt project directory to a GitHub repository by initializing a Git repo and pushing it to GitHub.
  5. Ensure your pipeline works by running your pipeline script (e.g., python3 chess_pipeline.py).
  6. Initialize the deployment with the dlt deploy command, for example dlt deploy chess_pipeline.py github-action --schedule "*/30 * * * *", which adds the necessary dependencies and creates a GitHub workflow that runs your pipeline script every 30 minutes.
  7. Add the secret values printed by the dlt deploy command to GitHub’s secrets settings.
  8. Add, commit, and push your files to GitHub to finish the setup.
  9. Monitor and manually trigger the pipeline through the provided GitHub link.

How to add credentials
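
dlt reads credentials from .dlt/secrets.toml or from environment variables whose names are the configuration sections in upper case, joined by double underscores. The snippet below is a small sketch of that naming convention; env_key is a hypothetical helper, and the connection string is a placeholder, not a working credential.

```python
import os

def env_key(*sections):
    """Join config sections the way dlt names environment variables:
    upper case, separated by double underscores."""
    return "__".join(sections).upper()

key = env_key("destination", "postgres", "credentials")

# Placeholder connection string; in production this would come from a
# secret store (for example GitHub Actions secrets), never from source code.
os.environ[key] = "postgresql://loader:<password>@localhost:5432/analytics"
print(key)
```

With the variable set, a pipeline created with destination="postgres" in the same process can pick the credentials up without any secrets stored in the repository.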

Load data from an API Example
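
A minimal sketch of loading paginated API data might look like the following. The API is faked so the example runs offline: fetch_all, fake_api, and load_to_duckdb are hypothetical names, and DuckDB is assumed as the destination only because it needs no server.

```python
def fetch_all(fetch_page):
    """Yield every item from a paginated API.

    fetch_page(page_number) must return a list of dicts, and an empty
    list once there are no more pages.
    """
    page = 1
    while True:
        items = fetch_page(page)
        if not items:
            break
        yield from items
        page += 1

def fake_api(page):
    """Stand-in for a real HTTP call so the example runs offline."""
    pages = {1: [{"id": 1}, {"id": 2}], 2: [{"id": 3}]}
    return pages.get(page, [])

def load_to_duckdb():
    """Load the API items with dlt; requires `pip install "dlt[duckdb]"`."""
    import dlt  # imported here so the generator above works without dlt

    pipeline = dlt.pipeline(
        pipeline_name="api_example",
        destination="duckdb",
        dataset_name="api_data",
    )
    return pipeline.run(fetch_all(fake_api), table_name="items")

items = list(fetch_all(fake_api))
print(items)
```

Calling load_to_duckdb() runs the actual load. In a real pipeline, fake_api would be replaced by a function that requests one page from the API and returns the parsed items.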

The question of whether DLT can replace Pandas requires a nuanced answer. It’s not a strictly “yes” or “no” situation. Here’s a breakdown of why:

Pandas vs DLT

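  • Pandas: Pandas is an incredibly powerful and mature library primarily designed for in-memory data analysis and manipulation. It excels at tasks like filtering, aggregating, joining, and reshaping tabular data.
  • DLT: DLT focuses on building efficient and reliable data pipelines. Its core functions center around extracting data from sources, inferring and evolving schemas, and loading the data into destinations.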

Where They Overlap

Both DLT and Pandas share capabilities in areas like:

  • Data Transformation: Both provide ways to clean, filter, and modify data within their paradigms.
  • File Handling: They support loading data from common formats like CSV, JSON, etc.

Complementary, Not Direct Replacements

It’s more accurate to see DLT and Pandas as complementary tools often used together within the broader data engineering workflow:

  1. Pandas for EDA: Pandas shines during initial data exploration, identifying patterns and anomalies to inform your DLT pipeline design.
  2. DLT for Building Pipelines: Once you understand your data, DLT helps you structure a resilient data pipeline: loading, transforming, and moving data to a database or data lake for scalable analysis.
  3. Pandas for Downstream Use: After cleaning and loading data via DLT into a suitable destination, analysts may continue leveraging Pandas for in-depth analysis or model building.

Situations Where DLT May Excel

  • Big Data: If volume becomes too great for in-memory analysis with Pandas, DLT could provide a path to handling larger data, because it processes records in batches rather than holding the whole dataset in memory.
  • Productionizing ETL: DLT’s focus makes it easier to deploy pipelines and schedule their execution, which is crucial for reliable data operations.
  • Diverse Source Support: DLT might simplify connecting to multiple data sources with its pre-built connectors.

Let’s be clear: Pandas remains a powerhouse tool for many data manipulation and analysis tasks. DLT introduces advantages for certain production ETL use cases and scalability scenarios.

Conclusion

The DLT Python library offers a powerful and straightforward framework for building robust data pipelines. Its Python-first approach, ease of use, and flexibility make it an excellent choice for data professionals. If you’re looking to simplify your data workflows, DLT is definitely worth exploring!

Ahmed Sayed

Data Engineer Lead | Azure Data Solutions | Data Architect