Mastering Python with Apache Beam: Architecting Scalable Data Pipelines

Ahmed Sayed
20 min read · Feb 4, 2024

In the ever-evolving world of data processing, Apache Beam stands out as a powerful, unified model designed to handle both batch and streaming data across multiple execution environments. With its ability to abstract away the complexity of underlying data processing engines, Apache Beam provides developers with a versatile toolkit for building scalable, efficient, and maintainable data pipelines. This article embarks on a comprehensive journey through Apache Beam’s extensive feature set, from foundational concepts like ParDo transforms and windowing to advanced capabilities including side inputs and outputs, composite transforms, cross-language operations, and specialized IO connectors for diverse data sources and sinks. Whether you're new to Apache Beam or looking to expand your expertise, this guide offers valuable insights and examples to enhance your data processing projects, covering everything from leveraging schema-aware PCollections to testing streaming pipelines with TestStream. Join us as we explore the depths of Apache Beam, unlocking the potential to transform raw data into actionable insights with unparalleled flexibility and power.

· Introduction to Apache Beam
Key Concepts
· Use Cases
1. Real-time Data Processing
2. ETL (Extract, Transform, Load) Pipelines
3. Data Integration
4. Machine Learning (ML) Pipelines
5. Log and Event Data Analysis
6. Financial Data Processing
7. IoT Data Processing
8. Batch Data Processing
· Architecture
1. Pipeline
2. PCollection
3. PTransform
4. I/O Transforms
5. Runner
6. Windowing
7. Watermarks, Triggers, and Accumulation
8. SDKs
Architecture Benefits
· Getting Started with Apache Beam in Python
Basic Pipeline Example
· Simple Map Transformation
· Filter Transformation
· FlatMap Transformation
· GroupByKey Transformation
· Combine Transformation
· CoGroupByKey for Joining Datasets
· Windowing for Time-based Aggregations
· ParDo
· Basic Concept of ParDo
· Example: Filtering and Transforming Data
· Side Inputs and Outputs
· Composite Transforms
· Cross-language Transforms
· IO Transforms for Other Data Sources and Sinks: ParquetIO and JDBCIO
· Schema-Aware PCollections
· TestStream
· Creating a Basic Pipeline CSV to BQ
Basic Pipeline for Ingesting CSV Data
Custom Pipeline Options
Multiple Transforms on the Same PCollection
· Reading from Pub/Sub and Advanced Transforms
Pub/Sub Integration
Tumbling Windows Example
Sliding Windows Example
· Beam SQL
· Example: Analyzing Product Sales Data with Beam SQL
Step 1: Define the Sales Record Class
Step 2: Create a Pipeline and a PCollection of Sales Records
Step 3: Querying the Data with Beam SQL
Reading data from Google BigQuery
· Scenario
· Execute the Pipeline
· Streaming events for customer updates from Pub/Sub and updating two BigQuery tables
· Prerequisites
· Conceptual Approach
Important Notes

Apache Beam is a powerful open-source, unified model for defining both batch and streaming data-parallel processing pipelines. Its versatility allows developers to construct complex data processing tasks that can run on various execution engines, including Google Cloud Dataflow, Apache Flink, and Apache Spark. This article will introduce you to Apache Beam, guide you through its basic concepts, and then dive into integrating Apache Beam with Google Cloud Platform (GCP) components. We’ll cover transforming data and saving it to BigQuery, along with providing some valuable tips and tricks to enhance your data processing tasks.

Introduction to Apache Beam

Apache Beam provides a comprehensive SDK for writing data processing pipelines that abstract away the complexities of underlying execution engines. The core abstraction in Apache Beam is the Pipeline, which encapsulates a series of data transformations. Each stage of a pipeline operates on a PCollection (Parallel Collection), which represents a distributed data set, and is expressed as a PTransform (Parallel Transform), an operation applied to one or more PCollections to produce new ones.

Key Concepts

  • Pipeline: The top-level structure for a Beam program. It encapsulates all the data processing operations from start to finish.
  • PCollection: Represents a potentially large, distributed, and immutable collection of data elements that can be processed in parallel.
  • PTransform: Describes a data processing operation that transforms input PCollections into output PCollections.
  • Windowing: Manages how data is grouped into finite sets for processing based on timestamps.
  • Triggers: Determines when to materialize aggregated results in windowed computations.

Use Cases

Apache Beam is a versatile, open-source unified programming model designed for batch and streaming data processing. It allows developers to define and execute data processing pipelines that can run on various execution engines, such as Google Cloud Dataflow, Apache Flink, Apache Spark, and others. Here are some key use cases where Apache Beam excels:

1. Real-time Data Processing

Apache Beam’s ability to process streaming data in real time makes it ideal for applications that require immediate data analysis and decision-making. Use cases include real-time analytics, monitoring, and alerting systems where data from sources like IoT devices, social media feeds, or user interactions on websites needs to be processed on the fly to extract valuable insights or trigger immediate actions.

2. ETL (Extract, Transform, Load) Pipelines

Beam is extensively used to build ETL pipelines that extract data from various sources, transform it into a desirable format, and load it into a data store or data warehouse for further analysis. Its ability to handle both batch and streaming data makes it suitable for continuous ETL processes, where data needs to be ingested, cleaned, enriched, and made available for querying in near real-time.

3. Data Integration

Integrating data from disparate sources into a unified format is a common challenge for businesses. Apache Beam provides a robust framework for data integration tasks, allowing developers to create pipelines that can read, transform, and write data across different storage systems, APIs, and services, facilitating a seamless data flow between heterogeneous systems.

4. Machine Learning (ML) Pipelines

Apache Beam can be used to preprocess data for machine learning models, including feature extraction, normalization, and data splitting. It enables the construction of scalable ML pipelines that prepare data for training and inference in a distributed manner, supporting both batch processing for historical data and streaming processing for real-time model updates.

5. Log and Event Data Analysis

Analyzing log and event data generated by applications, servers, or devices is crucial for operational intelligence, security analysis, and user behavior understanding. Apache Beam pipelines can process large volumes of log data, perform aggregations, detect patterns, and generate summaries or detailed reports, aiding in troubleshooting, monitoring, and understanding user interactions.

6. Financial Data Processing

In the financial sector, Beam is used to process transactions, market data, and customer interactions in real time. Use cases include fraud detection, risk analysis, real-time trading analytics, and customer personalization. Its ability to handle complex event time semantics and windowing makes it particularly well-suited for applications where precise timing and ordering of events are critical.

7. IoT Data Processing

For IoT applications, Apache Beam can process data streams from sensors and devices to perform time-series analysis, anomaly detection, and aggregate statistics. This enables real-time monitoring and control of IoT systems, predictive maintenance, and the generation of insights from IoT data at scale.

8. Batch Data Processing

Despite the emphasis on streaming, Apache Beam is equally adept at batch processing, allowing for efficient analysis of large datasets stored in files, databases, or big data systems. Use cases include historical data analysis, data migration tasks, and large-scale data transformations where the entire dataset can be processed at once.

Apache Beam’s unified model simplifies the development of data processing pipelines across a wide range of scenarios, making it a powerful tool in the arsenal of data engineers and developers looking to tackle complex data processing challenges with a single, consistent API.

Architecture

Apache Beam provides a robust, flexible framework for building batch and streaming data processing pipelines that can run on various execution engines, such as Google Cloud Dataflow, Apache Flink, Apache Spark, and others. Its architecture is designed to abstract away the complexities of distributed computing, allowing developers to focus on defining the logic of their data processing tasks without being tied to the specifics of the underlying execution environment. Here’s an overview of the key components and concepts in Apache Beam’s architecture:

1. Pipeline

At the heart of Apache Beam is the concept of a Pipeline. A pipeline encapsulates the entire data processing task, from reading input data, applying a series of transformations (transforms), and writing output data. Pipelines are constructed using the Beam SDK in languages like Java, Python, or Go.

2. PCollection

PCollection stands for Parallel Collection. It represents a distributed dataset that the pipeline operates on. PCollections are immutable and can hold any type of data, including simple types like integers or strings, or more complex types like custom objects. They can be bounded (having a finite size, typically used in batch processing) or unbounded (having an infinite size, used in streaming processing).

3. PTransform

PTransform represents a data processing operation that transforms input PCollections into output PCollections. Common transforms include Map, Filter, GroupByKey, and Combine. Custom transforms can also be defined to encapsulate specific processing logic.

4. I/O Transforms

Apache Beam provides a set of built-in I/O transforms for reading from and writing to various data storage systems, such as Google Cloud Storage, BigQuery, Apache Kafka, and Apache Hadoop. These I/O transforms abstract the complexities of interacting with these systems, providing a unified API for data ingestion and egress.

5. Runner

The Runner is the component that executes a Beam pipeline on a specific execution environment. Beam pipelines are written in a way that is agnostic of the runner, enabling the same pipeline code to run on different backends. Supported runners include the Direct Runner (for local testing and development), Google Cloud Dataflow Runner, Apache Flink Runner, Apache Spark Runner, and others.
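
As a quick illustration, the runner is usually selected through pipeline options rather than in the pipeline code itself; a minimal sketch (the option values are placeholders):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# The same pipeline code can target different runners purely via options.
# 'DirectRunner' runs locally; switching to 'DataflowRunner' (plus GCP
# project/region/temp_location options) moves execution to the cloud.
options = PipelineOptions(runner='DirectRunner')

with beam.Pipeline(options=options) as pipeline:
    (pipeline
     | beam.Create([1, 2, 3])
     | beam.Map(lambda x: x * x)
     | beam.Map(print))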

6. Windowing

Windowing is a mechanism in Apache Beam that allows for dividing a PCollection into finite sets, or windows, based on the timestamps of individual elements. This is particularly important in streaming processing for grouping data into meaningful chunks. Beam provides several windowing strategies, such as Fixed Windows, Sliding Windows, and Session Windows, to accommodate different data processing needs.
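
For instance, session windows (not covered by the later examples) group elements that arrive close together and start a new window after a gap of inactivity; a minimal sketch with a 10-minute gap, using hand-assigned timestamps:

import apache_beam as beam
from apache_beam.transforms.window import Sessions

with beam.Pipeline() as pipeline:
    session_counts = (
        pipeline
        | 'Create Events' >> beam.Create([
            # ((user, 1), event-time timestamp in seconds)
            beam.window.TimestampedValue(('user_a', 1), 10),
            beam.window.TimestampedValue(('user_a', 1), 50),
            beam.window.TimestampedValue(('user_a', 1), 2000),  # gap > 10 min: new session
        ])
        | 'Window Into Sessions' >> beam.WindowInto(Sessions(gap_size=600))
        | 'Count Per User Session' >> beam.combiners.Count.PerKey()
        | 'Print' >> beam.Map(print)
    )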

7. Watermarks, Triggers, and Accumulation

Beam uses watermarks to estimate the progress of event time within a pipeline, helping to handle late data. Triggers can be configured to determine when to emit results for a specific window, providing flexibility in handling data that arrives out of order or late. Accumulation policies define how repeated computations on the same window should be combined.
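
A sketch of how these pieces are wired together on a windowed PCollection; the window size, firing counts, and lateness below are illustrative values, not recommendations:

import apache_beam as beam
from apache_beam.transforms.trigger import AccumulationMode, AfterCount, AfterWatermark
from apache_beam.transforms.window import FixedWindows

# Emit a speculative result every 10 elements before the watermark passes the
# end of each 60-second window, a final result when it does, and one more
# firing per late element, keeping windows open for 10 extra minutes.
windowed = beam.WindowInto(
    FixedWindows(60),
    trigger=AfterWatermark(early=AfterCount(10), late=AfterCount(1)),
    accumulation_mode=AccumulationMode.ACCUMULATING,
    allowed_lateness=600)

# Applied to a PCollection as: events | 'Window' >> windowed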

8. SDKs

Apache Beam offers SDKs in multiple programming languages, including Java, Python, and Go, allowing developers to use the language they are most comfortable with or that best fits their use case.

Architecture Benefits

The architecture of Apache Beam is designed to provide several key benefits:

  • Portability: Write once, run anywhere. Beam pipelines can run on any supported execution engine without modification.
  • Expressiveness: Beam’s API abstracts complex operations like windowing, grouping, and state management, making it easier to write complex data processing jobs.
  • Scalability: Designed for distributed processing, Beam can handle massive datasets with ease, scaling up or down based on the capabilities of the underlying execution engine.

By abstracting the details of distributed data processing, Apache Beam allows developers to focus on the semantics of their data processing tasks, writing scalable, portable pipelines that can be executed across a variety of computing environments.

Getting Started with Apache Beam in Python

To start using Apache Beam in Python, you’ll need to install the Apache Beam Python package. You can install it using pip:

pip install apache-beam[gcp]

This command installs Apache Beam with additional dependencies required for integration with GCP services.

Basic Pipeline Example

Here’s a simple example of a Beam pipeline written in Python:

import apache_beam as beam

def filter_words(x):
    return len(x) > 5

with beam.Pipeline() as pipeline:
    (pipeline
     | 'Read Lines' >> beam.io.ReadFromText('input.txt')
     | 'Find Long Words' >> beam.Filter(filter_words)
     | 'Write Results' >> beam.io.WriteToText('output.txt'))

This pipeline reads lines from an input file, keeps only those longer than five characters, and writes the results to an output file.

Data transformation is a fundamental aspect of processing and analyzing data, allowing you to convert data from one format or structure into another to meet various analytical needs. Apache Beam provides a wide range of built-in transformations for handling common data processing patterns. Below are examples illustrating different cases of data transformation, showcasing how you can leverage Apache Beam’s capabilities to address these scenarios.

Simple Map Transformation

Use Map to apply a simple function to each element in the collection, such as converting strings to uppercase.

import apache_beam as beam

with beam.Pipeline() as pipeline:
    uppercase = (
        pipeline
        | 'Create' >> beam.Create(['hello', 'world'])
        | 'ToUppercase' >> beam.Map(lambda x: x.upper())
        | 'Print' >> beam.Map(print)
    )

Filter Transformation

Filter out elements that don’t meet a certain condition, such as retaining only even numbers from a collection.

with beam.Pipeline() as pipeline:
    filtered = (
        pipeline
        | 'Create Numbers' >> beam.Create([1, 2, 3, 4, 5])
        | 'Filter Even' >> beam.Filter(lambda x: x % 2 == 0)
        | 'Print Even' >> beam.Map(print)
    )

FlatMap Transformation

Use FlatMap to apply a function to each element and flatten the result. This is useful for splitting elements or expanding each element into multiple elements.

with beam.Pipeline() as pipeline:
    words = (
        pipeline
        | 'Create Sentence' >> beam.Create(['Hello world', 'Apache Beam'])
        | 'Split Words' >> beam.FlatMap(lambda x: x.split(' '))
        | 'Print Words' >> beam.Map(print)
    )

GroupByKey Transformation

GroupByKey is used for grouping elements of a collection using a specified key. This is often used after applying a Map transformation to create a key-value pair.

with beam.Pipeline() as pipeline:
    grouped = (
        pipeline
        | 'Create Key-Value' >> beam.Create([('fruit', 'apple'), ('vegetable', 'carrot'), ('fruit', 'banana')])
        | 'GroupByKey' >> beam.GroupByKey()
        | 'Print Grouped' >> beam.Map(print)
    )

Combine Transformation

Use Combine to aggregate elements in a collection, such as summing numbers or concatenating strings within a group.

with beam.Pipeline() as pipeline:
    summed = (
        pipeline
        | 'Create Numbers' >> beam.Create([1, 2, 3, 4])
        | 'Sum' >> beam.CombineGlobally(sum)
        | 'Print Sum' >> beam.Map(print)
    )

CoGroupByKey for Joining Datasets

CoGroupByKey allows you to perform a relational join of two or more key-value PCollections.

with beam.Pipeline() as pipeline:
    emails = pipeline | 'Create Emails' >> beam.Create([
        ('Alice', 'alice@example.com'),
        ('Bob', 'bob@example.com'),
    ])

    phones = pipeline | 'Create Phones' >> beam.Create([
        ('Alice', '555-1234'),
        ('Bob', '555-4567'),
    ])

    contact_info = (
        {'emails': emails, 'phones': phones}
        | 'CoGroupByKey' >> beam.CoGroupByKey()
        | 'Print Contacts' >> beam.Map(print)
    )

Windowing for Time-based Aggregations

Apply windowing to group and aggregate data based on time windows, useful for streaming data.

import apache_beam as beam
from apache_beam.transforms.window import FixedWindows

with beam.Pipeline() as pipeline:
    windowed_counts = (
        pipeline
        | 'Create Timestamped Events' >> beam.Create([
            beam.window.TimestampedValue('event', 1),
            beam.window.TimestampedValue('event', 10),
            # Assume more events with timestamps
        ])
        | 'Window Into' >> beam.WindowInto(FixedWindows(10))
        | 'Count Per Window' >> beam.CombineGlobally(
            beam.combiners.CountCombineFn()).without_defaults()
        | 'Print Windowed Counts' >> beam.Map(print)
    )

These examples illustrate the flexibility and power of Apache Beam for data transformation tasks, from simple mappings and filters to complex aggregations and joins, even accommodating streaming data with sophisticated windowing and time-based aggregations.

ParDo

ParDo is one of the most fundamental and versatile transformations in Apache Beam, allowing you to perform per-element processing over a collection. It's akin to a map operation but with significantly more flexibility, enabling not just simple transformations but also filtering, consuming side inputs, producing multiple outputs, and accessing state and timers in streaming scenarios.

Basic Concept of ParDo

A ParDo transformation takes a PCollection as input and processes each element through a user-defined function to produce zero or more output elements, which are then collected into a new PCollection. The user-defined function is encapsulated within a DoFn (short for "Do Function"), which defines what to do with each element of the input PCollection.

Example: Filtering and Transforming Data

Let’s consider a simple example where we have a PCollection of strings representing sentences. We want to accomplish two tasks:

  1. Filter out sentences that contain a specific word (e.g., “Beam”).
  2. Transform each remaining sentence into its word count.

First, we define our Beam pipeline and create a sample PCollection:

import apache_beam as beam
# Sample data: A list of sentences.
sentences = [
    'Apache Beam is a unified model',
    'Designed for batch and streaming data processing',
    'Beam pipelines can run on multiple execution engines',
    'It provides a portable API across various languages'
]

# Define the pipeline
with beam.Pipeline() as pipeline:
    sentences_pcoll = pipeline | 'Create PCollection' >> beam.Create(sentences)

Next, we define a DoFn for filtering sentences that contain the word "Beam":

class FilterSentencesDoFn(beam.DoFn):
    def process(self, element):
        if "Beam" in element:
            yield element

And another DoFn for transforming sentences into their word count:

class CountWordsDoFn(beam.DoFn):
    def process(self, element):
        yield len(element.split())

Now, we apply these DoFns to our PCollection using the ParDo transform:

    # Continuing inside the `with beam.Pipeline() as pipeline:` block above:

    # Filter sentences containing the word "Beam"
    filtered_sentences = sentences_pcoll | 'Filter Sentences' >> beam.ParDo(FilterSentencesDoFn())

    # Transform each filtered sentence into its word count
    word_counts = filtered_sentences | 'Count Words' >> beam.ParDo(CountWordsDoFn())

    # Print the resulting word counts
    word_counts | 'Print Word Counts' >> beam.Map(print)

In this example, the FilterSentencesDoFn DoFn filters sentences containing the word "Beam", and the CountWordsDoFn DoFn transforms each sentence into the count of its words. The final pipeline filters the sentences and then counts the words in the filtered sentences, printing the results.

This example demonstrates how ParDo can be used for both filtering and transforming elements in a PCollection. The flexibility of ParDo makes it a powerful tool for a wide range of data processing tasks in Apache Beam pipelines.
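
The ParDo introduction above also mentioned state and timers. As a minimal illustration (not part of the original example), a stateful DoFn can keep a running count per key; the class and element values below are hypothetical:

import apache_beam as beam
from apache_beam.coders import VarIntCoder
from apache_beam.transforms.userstate import ReadModifyWriteStateSpec

class RunningCountDoFn(beam.DoFn):
    # One integer state cell is kept per key (and per window).
    COUNT_STATE = ReadModifyWriteStateSpec('count', VarIntCoder())

    def process(self, element, count_state=beam.DoFn.StateParam(COUNT_STATE)):
        key, _ = element
        new_count = (count_state.read() or 0) + 1
        count_state.write(new_count)
        yield key, new_count

with beam.Pipeline() as pipeline:
    (pipeline
     | 'Create Keyed Events' >> beam.Create([('a', 1), ('a', 2), ('b', 3)])
     | 'Running Count' >> beam.ParDo(RunningCountDoFn())
     | 'Print' >> beam.Map(print))

Note that stateful ParDo requires keyed (key-value) input, since state is partitioned by key.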

Side Inputs and Outputs

Side Inputs allow you to pass additional data to each element being processed in a ParDo transform, such as lookup tables or configurations.

Example: Filtering based on a dynamic threshold (Side Input)

Imagine you have a collection of numbers and want to filter those above a certain threshold, which is calculated or retrieved dynamically.

import apache_beam as beam

with beam.Pipeline() as pipeline:
    # Main collection: A PCollection of numbers.
    numbers = pipeline | 'Create Numbers' >> beam.Create([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

    # Side input: The threshold value, calculated or retrieved dynamically.
    threshold = pipeline | 'Create Threshold' >> beam.Create([5])

    # Use the side input in a ParDo transform. The callable must return an
    # iterable of outputs, so we emit a one-element list or nothing at all.
    filtered_numbers = (
        numbers
        | 'Filter Numbers' >> beam.ParDo(
            lambda num, thresh: [num] if num > thresh else [],
            thresh=beam.pvalue.AsSingleton(threshold))
        | 'Print Filtered Numbers' >> beam.Map(print)
    )

Side Outputs allow a single ParDo transform to produce multiple output PCollections, which is useful for splitting a dataset based on certain conditions.
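
A minimal sketch of side outputs using with_outputs, splitting numbers into even and odd outputs (the tag names and data are illustrative):

import apache_beam as beam

class SplitEvenOddDoFn(beam.DoFn):
    def process(self, element):
        if element % 2 == 0:
            yield element  # main output: even numbers
        else:
            # Tagged (side) output: odd numbers
            yield beam.pvalue.TaggedOutput('odd', element)

with beam.Pipeline() as pipeline:
    results = (
        pipeline
        | 'Create Numbers' >> beam.Create([1, 2, 3, 4, 5])
        | 'Split' >> beam.ParDo(SplitEvenOddDoFn()).with_outputs('odd', main='even')
    )
    results.even | 'Print Even' >> beam.Map(lambda x: print('even', x))
    results.odd | 'Print Odd' >> beam.Map(lambda x: print('odd', x))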

Composite Transforms

Composite Transforms bundle multiple transformations into a single reusable transform.

Example: A composite transform for filtering and counting

Suppose you want to filter strings based on a condition and then count the occurrences of each filtered string.

class FilterAndCount(beam.PTransform):
    def expand(self, input_coll):
        return (input_coll
                | 'Filter' >> beam.Filter(lambda x: "Beam" in x)
                | 'Count' >> beam.combiners.Count.PerElement())

with beam.Pipeline() as pipeline:
    results = (
        pipeline
        | 'Create' >> beam.Create(['Apache Beam', 'Google Cloud Dataflow', 'Apache Beam I/O', 'Beam SQL'])
        | 'Filter and Count' >> FilterAndCount()
        | 'Print' >> beam.Map(print)
    )

Cross-language Transforms

Cross-language Transforms allow using transforms written in a different language from the pipeline’s SDK language.

Example: This feature involves more setup, including running an expansion service for the target language. Detailed examples are available in the Apache Beam documentation, but here’s a conceptual approach:

  • Define a Python pipeline that uses a Java transform for KafkaIO.
  • Start a Java expansion service that serves the Java transform.
  • In your Python pipeline, use the ExternalTransform to call the Java transform through the expansion service.
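
As a concrete illustration, the Kafka connector shipped with the Python SDK already follows this pattern: ReadFromKafka is a thin Python wrapper that invokes the Java KafkaIO transform through an expansion service that Beam starts for you, so a Java runtime must be available. A minimal sketch, with the broker address and topic name as placeholders:

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions

# Streaming read from Kafka via the Java KafkaIO cross-language transform.
options = PipelineOptions(streaming=True)

with beam.Pipeline(options=options) as pipeline:
    (pipeline
     | 'Read From Kafka' >> ReadFromKafka(
         consumer_config={'bootstrap.servers': 'localhost:9092'},
         topics=['my_topic'])
     | 'Print Records' >> beam.Map(print))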

IO Transforms for Other Data Sources and Sinks: ParquetIO and JDBCIO

ParquetIO Example: Reading Parquet files

with beam.Pipeline() as pipeline:
    records = (
        pipeline
        | 'Read Parquet' >> beam.io.parquetio.ReadFromParquet('path/to/parquet/files')
        | 'Print Records' >> beam.Map(print)
    )

JDBCIO Example: Reading from a SQL database

from apache_beam.io.jdbc import ReadFromJdbc
from apache_beam.options.pipeline_options import PipelineOptions

pipeline_options = PipelineOptions()

# Note: ReadFromJdbc is a cross-language transform backed by the Java JdbcIO
# connector, so a Java runtime must be available when the pipeline is built.
# The returned elements are schema'd rows inferred from the table metadata.
with beam.Pipeline(options=pipeline_options) as pipeline:
    records = (
        pipeline
        | 'Read from JDBC' >> ReadFromJdbc(
            table_name='your_table',
            driver_class_name='org.postgresql.Driver',
            jdbc_url='jdbc:postgresql://your_db_url',
            username='your_username',
            password='your_password')
        | 'Print Records' >> beam.Map(print)
    )

Schema-Aware PCollections

Example: Using schema-aware PCollections

import typing

import apache_beam as beam

# Schema-aware PCollections in Python are typically built from NamedTuple
# types registered with the RowCoder.
class Person(typing.NamedTuple):
    name: str
    age: int

beam.coders.registry.register_coder(Person, beam.coders.RowCoder)

with beam.Pipeline() as pipeline:
    people = (
        pipeline
        | 'Create People' >> beam.Create(
            [Person('John', 30), Person('Jane', 25)]).with_output_types(Person)
        | 'Filter by Age' >> beam.Filter(lambda person: person.age > 26)
        | 'Print People' >> beam.Map(print)
    )

TestStream

TestStream is used for testing streaming pipelines by simulating event time, watermarks, and late data.

Example: Using TestStream

from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.testing.test_stream import TestStream

# TestStream is a streaming source, so enable streaming mode.
options = PipelineOptions(streaming=True)

with beam.Pipeline(options=options) as pipeline:
    events = (
        pipeline
        | TestStream()
            .advance_watermark_to(0)
            .add_elements(['first event'])
            .advance_watermark_to_infinity()
        | 'Print Events' >> beam.Map(print)
    )

Creating a Basic Pipeline CSV to BQ

Sample CSV Data: Product Inventory

First, let’s generate a simple CSV file named product_inventory.csv representing product inventory data.

ProductID,ProductName,Quantity
1,Apple,50
2,Banana,80
3,Carrot,60

Basic Pipeline for Ingesting CSV Data

The following Python code snippet shows how to create a basic Apache Beam pipeline to read this CSV data, transform it, and write the output to another file.

import apache_beam as beam

class ParseCsv(beam.DoFn):
    def process(self, element):
        ProductID, ProductName, Quantity = element.split(',')
        return [{'ProductID': ProductID, 'ProductName': ProductName, 'Quantity': Quantity}]

with beam.Pipeline() as pipeline:
    (pipeline
     | 'Read CSV' >> beam.io.ReadFromText('product_inventory.csv', skip_header_lines=1)
     | 'Parse CSV' >> beam.ParDo(ParseCsv())
     | 'Write Output' >> beam.io.WriteToText('output.txt'))

Custom Pipeline Options

Apache Beam allows you to define custom pipeline options for flexibility.

from apache_beam.options.pipeline_options import PipelineOptions

class CustomOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument('--input', type=str, help='Path of the input file')
        parser.add_value_provider_argument('--output', type=str, help='Path of the output file')

# PipelineOptions parses command-line flags (sys.argv) by default.
pipeline_options = PipelineOptions()
custom_options = pipeline_options.view_as(CustomOptions)

with beam.Pipeline(options=pipeline_options) as pipeline:
    (pipeline
     | 'Read CSV' >> beam.io.ReadFromText(custom_options.input, skip_header_lines=1)
     | 'Parse CSV' >> beam.ParDo(ParseCsv())
     | 'Write Output' >> beam.io.WriteToText(custom_options.output))

Multiple Transforms on the Same PCollection

You can apply multiple transformations to the same PCollection. For instance, after parsing the CSV, you might want to filter out products with low inventory and then format the remaining records.

def filter_low_inventory(element):
    return int(element['Quantity']) > 50

with beam.Pipeline() as pipeline:
    products = (pipeline
                | 'Read CSV' >> beam.io.ReadFromText('product_inventory.csv', skip_header_lines=1)
                | 'Parse CSV' >> beam.ParDo(ParseCsv()))

    filtered_products = (products
                         | 'Filter Low Inventory' >> beam.Filter(filter_low_inventory))

    (filtered_products
     | 'Format Output' >> beam.Map(lambda x: f"{x['ProductID']},{x['ProductName']},{x['Quantity']}")
     | 'Write Output' >> beam.io.WriteToText('filtered_inventory.txt'))

Reading from Pub/Sub and Advanced Transforms

Integrating with Pub/Sub, applying transformations, and utilizing windowing concepts are more advanced use cases. Due to the complexity and the extensive nature of these topics, I’ll outline a conceptual approach for each.

Pub/Sub Integration

# Assuming you have a Pub/Sub subscription to read from and a topic to write back processed data
with beam.Pipeline(options=pipeline_options) as pipeline:
    (pipeline
     | 'Read from Pub/Sub' >> beam.io.ReadFromPubSub(subscription='your_subscription')
     | 'Process Data' >> beam.Map(process_function)  # Define your process_function
     | 'Write to Pub/Sub' >> beam.io.WriteToPubSub(topic='your_processed_topic'))

Tumbling Windows Example

window_size = 60  # Window size in seconds
with beam.Pipeline() as pipeline:
    (pipeline
     | 'Read from Pub/Sub' >> beam.io.ReadFromPubSub(subscription='your_subscription')
     | 'Window into Fixed' >> beam.WindowInto(beam.window.FixedWindows(window_size))
     | 'Process Windowed Data' >> beam.Map(process_function)
     | 'Write Results' >> beam.io.WriteToText('windowed_output.txt'))

Sliding Windows Example

window_size = 60  # Window duration in seconds
window_period = 30 # Frequency of window evaluation
with beam.Pipeline() as pipeline:
    (pipeline
     | 'Read from Pub/Sub' >> beam.io.ReadFromPubSub(subscription='your_subscription')
     | 'Window into Sliding' >> beam.WindowInto(beam.window.SlidingWindows(window_size, window_period))
     | 'Process Windowed Data' >> beam.Map(process_function)
     | 'Write Results' >> beam.io.WriteToText('sliding_window_output.txt'))

Beam SQL

Apache Beam SQL allows you to use SQL-like syntax to process data within your Beam pipelines, enabling you to perform complex data transformations and aggregations using familiar SQL queries. This feature is particularly useful for users who are already familiar with SQL and prefer to manipulate data using SQL statements rather than programming transformations in Java or Python.

Let’s create an example pipeline that demonstrates how to use Beam SQL to query a PCollection of data in Python. This example assumes you have Apache Beam installed. In the Python SDK, SqlTransform is a cross-language transform backed by Beam’s Java SQL engine, so a Java runtime must be available on the machine that constructs the pipeline (Beam starts the required expansion service for you).

Example: Analyzing Product Sales Data with Beam SQL

First, we’ll generate a simple in-memory dataset representing sales records. Then, we’ll use Beam SQL to query this data.

Step 1: Define the Sales Record Class

Apache Beam needs to know the schema of the data it’s processing. We define a NamedTuple class to represent a sales record and, in the next step, register it with Beam’s RowCoder so the PCollection becomes schema-aware.

from typing import NamedTuple

class SaleRecord(NamedTuple):
    product_id: int
    product_name: str
    quantity_sold: int
    sale_date: str

Step 2: Create a Pipeline and a PCollection of Sales Records

We create a PCollection to hold our sales data.

import apache_beam as beam

# Make SaleRecord a schema-aware type so Beam SQL can resolve its columns.
beam.coders.registry.register_coder(SaleRecord, beam.coders.RowCoder)

sales_data = [
    SaleRecord(1, 'Laptop', 10, '2023-01-01'),
    SaleRecord(2, 'Smartphone', 20, '2023-01-02'),
    SaleRecord(1, 'Laptop', 5, '2023-01-03'),
    SaleRecord(3, 'Tablet', 7, '2023-01-04'),
]

with beam.Pipeline() as pipeline:
    sales = (pipeline
             | 'Create Sales Data' >> beam.Create(sales_data).with_output_types(SaleRecord))

Step 3: Querying the Data with Beam SQL

Now, let’s use Beam SQL to query the sales PCollection to find the total quantity sold for each product.

from apache_beam.transforms.sql import SqlTransform

# Continuing inside the same `with beam.Pipeline() as pipeline:` block:
    total_sales_per_product = (
        sales
        | 'Total Sales Per Product' >> SqlTransform("""
              SELECT
                  product_name,
                  SUM(quantity_sold) AS total_quantity_sold
              FROM PCOLLECTION
              GROUP BY product_name""")
    )

    (total_sales_per_product
     | 'Print Results' >> beam.Map(print))

In this example, the SqlTransform applies a SQL query to the sales PCollection. The query selects the product name and the sum of quantity_sold for each product, grouping the results by product_name. Finally, we print the results to the console.

Reading data from Google BigQuery

Reading data from Google BigQuery and processing it with Apache Beam allows you to perform complex analytics and transformations on large datasets. Here, we will create an advanced example that demonstrates how to read from a BigQuery table, perform some transformations, and then write the results back to another BigQuery table. This example assumes that you are familiar with Google Cloud Platform (GCP) services, specifically BigQuery, and that you have the necessary permissions to access BigQuery datasets and tables.

Scenario

Suppose we have a BigQuery table named ecommerce.sales with the following schema:

  • transaction_id: STRING
  • product_id: STRING
  • quantity: INTEGER
  • sale_date: DATE

We want to read this data, calculate the total sales quantity for each product, and then write the results to a new BigQuery table named analytics.product_sales_summary.

Create a Python script to define and execute the Beam pipeline. The script will:

  1. Read data from the ecommerce.sales BigQuery table.
  2. Calculate the total sales quantity for each product.
  3. Write the results to the analytics.product_sales_summary table.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions

# Define custom options for our pipeline
class CustomOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument('--input_table', type=str, help='Input BigQuery table name')
        parser.add_value_provider_argument('--output_table', type=str, help='Output BigQuery table name')

options = PipelineOptions()
custom_options = options.view_as(CustomOptions)

# Use GoogleCloudOptions to specify project and temp location
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'your-gcp-project-id'
google_cloud_options.region = 'your-gcp-region'
google_cloud_options.temp_location = 'gs://your-temp-bucket/temp'

# Define the pipeline
with beam.Pipeline(options=options) as pipeline:
    sales_data = (
        pipeline
        | 'ReadFromBigQuery' >> beam.io.ReadFromBigQuery(table=custom_options.input_table.get())
    )

    total_sales_per_product = (
        sales_data
        | 'CountSalesPerProduct' >> beam.Map(lambda record: (record['product_id'], record['quantity']))
        | 'SumQuantity' >> beam.CombinePerKey(sum)
        | 'ToDict' >> beam.Map(lambda kv: {'product_id': kv[0], 'total_quantity': kv[1]})
    )

    (total_sales_per_product
     | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
         custom_options.output_table.get(),
         schema='product_id:STRING,total_quantity:INTEGER',
         write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))

Execute the Pipeline

Run the pipeline script with the appropriate options:

python your_pipeline_script.py \
--input_table=your-gcp-project-id:ecommerce.sales \
--output_table=your-gcp-project-id:analytics.product_sales_summary \
--runner=DataflowRunner \
--project=your-gcp-project-id \
--region=your-gcp-region \
--temp_location=gs://your-temp-bucket/temp

This command specifies the Dataflow runner, which executes the pipeline on Google Cloud Dataflow, Google’s fully managed service for running Apache Beam pipelines. Replace your_pipeline_script.py, your-gcp-project-id, your-gcp-region, and gs://your-temp-bucket/temp with your specific details.

Streaming events for customer updates from Pub/Sub and updating two BigQuery tables

Suppose customer update events arrive on a Pub/Sub topic, each carrying a customer ID, a new order ID, and an order value. In BigQuery we maintain two tables: one holding the full history of customer updates and one holding only the latest update per customer. The goal is to stream these events and keep both tables up to date.

Here’s a conceptual overview and an example pipeline in Python using Apache Beam.

Prerequisites

  1. Google Cloud SDK installed and initialized.
  2. Apache Beam with GCP support installed in your environment.
  3. A Pub/Sub topic where customer update events are published.
  4. Two BigQuery tables:
  • customer_updates_history with fields customer_id, order_id, order_value, and timestamp.
  • last_customer_updates with fields customer_id, latest_order_id, latest_order_value, and timestamp.

Conceptual Approach

  1. Read from the Pub/Sub topic.
  2. Parse the incoming messages (assuming JSON format) to extract the customer update information.
  3. Write each update to the customer_updates_history table.
  4. Process the data to determine the latest update for each customer.
  5. Write or update the last_customer_updates table with the latest update for each customer.

import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

class ParseEventFn(beam.DoFn):
    def process(self, element):
        record = json.loads(element)
        yield {
            'customer_id': record['customer_id'],
            'order_id': record['order_id'],
            'order_value': record['order_value'],
            'timestamp': record['timestamp']
        }

def to_bq_format(element):
    """Prepare element for BigQuery insertion."""
    return {
        'customer_id': element['customer_id'],
        'latest_order_id': element['order_id'],
        'latest_order_value': element['order_value'],
        'timestamp': element['timestamp']
    }

pipeline_options = PipelineOptions(
    runner='DataflowRunner',
    project='your-gcp-project-id',
    region='your-gcp-region',
    temp_location='gs://your-temp-bucket/temp',
    streaming=True
)

with beam.Pipeline(options=pipeline_options) as pipeline:
    events = (
        pipeline
        | 'ReadFromPubSub' >> beam.io.ReadFromPubSub(subscription='projects/your-gcp-project-id/subscriptions/your-subscription-name')
        | 'ParseEvent' >> beam.ParDo(ParseEventFn())
    )

    # Write each event to the history table
    _ = (
        events
        | 'WriteToHistoryTable' >> beam.io.WriteToBigQuery(
            table='your-gcp-project-id:dataset.customer_updates_history',
            schema='customer_id:STRING,order_id:STRING,order_value:FLOAT,timestamp:TIMESTAMP',
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
        )
    )

    # Process to find the latest update for each customer. In a streaming
    # global window we need a trigger that fires repeatedly; here results are
    # emitted roughly once a minute, keeping only updates seen since the last
    # firing (DISCARDING mode).
    latest_updates = (
        events
        | 'KeyByCustomer' >> beam.Map(lambda e: (e['customer_id'], e))
        | 'WindowInto' >> beam.WindowInto(
            beam.window.GlobalWindows(),
            trigger=beam.transforms.trigger.Repeatedly(
                beam.transforms.trigger.AfterProcessingTime(60)),
            accumulation_mode=beam.transforms.trigger.AccumulationMode.DISCARDING)
        | 'LatestPerKey' >> beam.CombinePerKey(
            lambda updates: max(updates, key=lambda u: u['timestamp']))
        | 'ToBQFormat' >> beam.Map(lambda kv: to_bq_format(kv[1]))
    )

    # Write the latest update to the last-updates table. A true upsert requires
    # the BigQuery Storage Write API and careful schema design.
    _ = (
        latest_updates
        | 'WriteToLastUpdatesTable' >> beam.io.WriteToBigQuery(
            table='your-gcp-project-id:dataset.last_customer_updates',
            schema='customer_id:STRING,latest_order_id:STRING,latest_order_value:FLOAT,timestamp:TIMESTAMP',
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE  # Consider WRITE_APPEND plus a dedup/upsert strategy keyed on customer_id
        )
    )

This example uses the Dataflow runner for a streaming pipeline, which requires the streaming=True option. The ParseEventFn DoFn parses the incoming Pub/Sub messages. The pipeline writes each event to the customer_updates_history table directly and then processes the latest updates, which are then written to the last_customer_updates table.

Important Notes

  • The WRITE_TRUNCATE disposition in the last step is a placeholder. In practice, you would likely need a more sophisticated approach to ensure that only the latest update for each customer is reflected in the last_customer_updates table. This might involve using an intermediate step to read the current state of the table, compare timestamps, and only write newer updates.
  • Handling upserts in BigQuery through Beam requires careful consideration, especially in streaming pipelines. BigQuery’s Storage Write API supports streaming inserts and updates (upserts), which may be more appropriate for updating the last_customer_updates table with the latest state per customer; a sketch of selecting that write method follows below.
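
For reference, recent Beam SDK versions let you choose the Storage Write API as the insertion method on WriteToBigQuery. The sketch below only selects the write method; true upsert/CDC semantics still require additional pipeline design, and the table and schema names are the placeholders used earlier:

_ = (
    latest_updates
    | 'WriteWithStorageApi' >> beam.io.WriteToBigQuery(
        table='your-gcp-project-id:dataset.last_customer_updates',
        schema='customer_id:STRING,latest_order_id:STRING,latest_order_value:FLOAT,timestamp:TIMESTAMP',
        method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,  # requires a recent Beam SDK
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)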
