Building a Simple Data Pipeline with Mage: A Beginner’s Guide
Introduction:
In today’s dynamic world of data engineering, tools like Mage are becoming increasingly popular because they make data pipelines simple and flexible to manage. Mage provides an intuitive, notebook-style user interface, which makes it accessible even to those without deep technical expertise. In this article, we’ll take a step-by-step journey to build a basic data pipeline using Mage, from extracting data through transforming it and loading it into PostgreSQL. So, let’s dive in and explore the power of Mage in simplifying the data pipeline process!
Why Choose Mage.ai Over Airflow?
Mage.ai offers a specialized and simplified approach to data pipeline management compared to Airflow. Here’s why Mage.ai may be the preferred option:
- Specialization: Mage.ai is tailored specifically for data pipelines, providing a focused solution for data engineering tasks.
- Ease of Use: With an intuitive, interactive notebook UI, Mage.ai simplifies writing code in Python, R, and SQL, takes care of exception handling, and makes it easy to define relationships between pipeline blocks.
- Real-time Visualization: Mage.ai allows for real-time preview and visualization of pipeline results, enhancing efficiency and enabling quick data analysis.
- Data Integration: Leveraging the Singer spec, Mage.ai seamlessly integrates with various third-party sources, simplifying data integration tasks.
- Customization: Users can write custom code in Python, R, or SQL for batch processing pipelines, tailoring solutions to specific needs.
- Streaming Capabilities: Mage.ai supports streaming pipelines with platforms like Kafka, Azure Event Hub, Google Cloud PubSub, Kinesis, and RabbitMQ, enabling real-time data processing and analysis.
Getting Started with Mage:
Before we delve into pipeline construction, let’s familiarize ourselves with Mage’s setup process. Follow the steps below to install and start Mage:
I Would Recommend Using Docker:
Managing dependencies manually (e.g., with pip or conda) can be quite challenging, because data integrations pull in many complex dependencies. To simplify this process and ensure a smooth experience, Docker is highly recommended for running Mage.
Why Docker?
Docker offers a lightweight and portable solution for managing dependencies, ensuring consistency across different environments. By encapsulating Mage and its dependencies within Docker containers, you can avoid compatibility issues and streamline the installation process.
Installation using Docker (recommended):
docker run -it -p 6789:6789 -v $(pwd):/home/src mageai/mageai /app/run_app.sh mage start [project_name]
Installation using pip (not recommended):
- Create and activate a virtual environment.
- Install Mage using pip:
pip install mage-ai
- Create a new Mage project and launch the tool:
mage start [project_name]
Building a pipeline
After installation, open http://localhost:6789 in your browser to start building your pipeline.
Running a pipeline
- Once you’ve built your pipeline in the tool, you can run it from the command line:
docker run -it -p 6789:6789 -v $(pwd):/home/src mageai/mageai mage run [project_name] [pipeline]
- If you installed Mage with pip, drop the docker run … mageai/mageai prefix and call mage directly: mage run [project_name] [pipeline]
Creating a new project
- To create an additional project with a different name, use the following command:
docker run -it -p 6789:6789 -v $(pwd):/home/src mageai/mageai mage init [project_name]
- If you installed Mage with pip, run mage init [project_name] instead.
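The Docker and pip variants differ only in how the mage CLI is reached. As a sketch, the hypothetical helper mage_command (not part of Mage) assembles both forms of the run and init commands, using the same /home/src mount as the Docker commands in this section. The project and pipeline names are placeholders.

```python
import os
import shlex
from typing import List


def mage_command(subcommand: str, *args: str, docker: bool = True) -> List[str]:
    """Hypothetical helper: build argv for `mage <subcommand> <args...>`.

    docker=True prepends the `docker run ... mageai/mageai` part used in this
    article; docker=False assumes a pip install with `mage` on the PATH.
    """
    mage_part = ["mage", subcommand, *args]
    if not docker:
        return mage_part
    docker_part = [
        "docker", "run", "-it", "-p", "6789:6789",
        # Same mount as -v $(pwd):/home/src, with the path resolved in Python
        "-v", f"{os.getcwd()}:/home/src",
        "mageai/mageai",
    ]
    return docker_part + mage_part


# Placeholder project/pipeline names; substitute your own.
print(shlex.join(mage_command("run", "demo_project", "soccer_pipeline")))
print(shlex.join(mage_command("init", "demo_project", docker=False)))  # mage init demo_project
```

Keeping the command as a list rather than a single string avoids shell quoting problems when a path contains spaces.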
Understanding Mage’s Pipeline Blocks:
Mage simplifies pipeline creation through modular blocks, each serving a specific purpose:
- Data Loader: This block facilitates the extraction of data from diverse sources, whether local or remote. Utilizing Python or SQL, it provides templates to fetch and parse data, creating a DataFrame for further processing.
- Transformer: The Transformer block is instrumental in data transformation tasks such as filtering, aggregating, and cleansing. It ensures that data is standardized and prepared for downstream analysis.
- Data Exporter: With the Data Exporter block, Mage enables seamless loading of processed data into target destinations. Whether it’s a relational database like PostgreSQL or cloud storage like Google Cloud Storage, this block streamlines data transfer operations.
- Custom: For advanced users, Mage offers a Custom block where custom code can be executed, whether in Python, SQL, or other languages. This block provides flexibility for tailored data manipulation and processing tasks.
- Sensor: A Sensor block repeatedly checks a condition until it is met, which is useful in real-time data processing scenarios or for triggering actions when a specific event occurs.
- Scratchpad: This block serves as a sandbox for experimenting with throwaway code, facilitating rapid prototyping without affecting the main pipeline.
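To make the block model concrete, here is a toy sketch of how loader, transformer, and exporter blocks chain together: each block is a decorated function, and the output of one block becomes the first argument of the next. The decorator block, the PIPELINE registry, and run_linear_pipeline are hypothetical stand-ins written for illustration; Mage’s real decorators and executor do considerably more, such as wiring blocks into a DAG and previewing their outputs.

```python
from typing import Any, Callable, Dict, List

# Hypothetical registry standing in for Mage's block bookkeeping.
PIPELINE: List[Dict[str, Any]] = []


def block(kind: str) -> Callable[[Callable], Callable]:
    """Hypothetical decorator: record a function as a block of the given kind."""
    def register(fn: Callable) -> Callable:
        PIPELINE.append({"kind": kind, "fn": fn})
        return fn
    return register


@block("data_loader")
def load_rows(*args, **kwargs) -> List[dict]:
    # Toy rows in place of a real API call
    return [{"type": "Pass", "team": "Team A"}, {"type": "Shot", "team": "Team B"}]


@block("transformer")
def keep_shots(rows: List[dict], *args, **kwargs) -> List[dict]:
    return [row for row in rows if row["type"] == "Shot"]


@block("data_exporter")
def export_rows(rows: List[dict], *args, **kwargs) -> None:
    # A real exporter would write to PostgreSQL here
    print(f"exporting {len(rows)} row(s)")


def run_linear_pipeline() -> Any:
    """Run blocks in registration order, passing each output downstream."""
    data = None
    for step in PIPELINE:
        if step["kind"] == "data_loader":
            data = step["fn"]()
        elif step["kind"] == "transformer":
            data = step["fn"](data)
        else:  # exporters consume data but produce nothing for later blocks
            step["fn"](data)
    return data


run_linear_pipeline()  # prints: exporting 1 row(s)
```

The sketch runs blocks strictly in the order they were defined; a real pipeline follows the dependency graph instead, which matters once a block has more than one upstream or downstream neighbour.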
Constructing Our Pipeline:
Let’s build a simple pipeline to illustrate Mage’s capabilities:
Data Loading: We’ll start by extracting data from a public API and parsing it into a DataFrame. Setting this up involves three steps:
1. io_config.yaml Configuration:
- Ensure the io_config.yaml file in the project directory is structured properly to establish a connection to the PostgreSQL database. The dev profile used in this article reads every connection setting from an environment variable:
# io_config.yaml
dev:
  POSTGRES_CONNECT_TIMEOUT: 10
  POSTGRES_DBNAME: "{{ env_var('POSTGRES_DBNAME') }}"
  POSTGRES_SCHEMA: "{{ env_var('POSTGRES_SCHEMA') }}"
  POSTGRES_USER: "{{ env_var('POSTGRES_USER') }}"
  POSTGRES_PASSWORD: "{{ env_var('POSTGRES_PASSWORD') }}"
  POSTGRES_HOST: "{{ env_var('POSTGRES_HOST') }}"
  POSTGRES_PORT: "{{ env_var('POSTGRES_PORT') }}"
2. Batch Pipeline:
- Create a batch pipeline and add the libraries it needs, such as statsbombpy, to the requirements.txt file.
3. Data Loader:
- Add a data loader block called load_soccer_data. It loads event data from the StatsBomb API for the final of the 2023 Africa Cup of Nations (AFCON), played in February 2024:
from pandas import DataFrame
from statsbombpy import sb

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


@data_loader
def load_data_from_api(*args, **kwargs) -> DataFrame:
    """
    Load every event of a single match from the StatsBomb API.
    """
    # match_id 3923881: final of the 2023 AFCON (played February 2024)
    events = sb.events(match_id=3923881)
    return events


@test
def test_output(df, *args) -> None:
    """
    Template code for testing the output of the block.
    """
    assert df is not None, 'The output is undefined'
This code retrieves data from the StatsBomb API for the final of the 2023 AFCON (played in February 2024) between Côte d’Ivoire and Nigeria, identified by match_id=3923881. It returns a DataFrame of detailed event data with 2699 rows and 89 columns.
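The template test in the loader block only checks that the block returned something. A slightly stricter check is sketched here, assuming the downstream shot transformer needs the type, player, and team columns. check_event_columns is a hypothetical name; in a Mage block it would be decorated with @test like test_output. The usage lines run it on a tiny hand-made DataFrame rather than the real API response.

```python
import pandas as pd


def check_event_columns(df: pd.DataFrame, *args) -> None:
    """Hypothetical stricter test for the StatsBomb events DataFrame."""
    assert df is not None, 'The output is undefined'
    assert len(df) > 0, 'No events were returned'
    # Columns the shot transformer relies on (an assumption, see above)
    required = {'type', 'player', 'team'}
    missing = required - set(df.columns)
    assert not missing, f'Missing columns: {sorted(missing)}'
    assert (df['type'] == 'Shot').any(), 'No shot events found'


# Tiny hand-made sample in place of the real 2699-row response
sample = pd.DataFrame({
    'type': ['Pass', 'Shot'],
    'player': ['Player 1', 'Player 2'],
    'team': ['Team A', 'Team B'],
})
check_event_columns(sample)  # passes silently
```

Failing early in the loader makes a schema change in the source data show up at the block that caused it, instead of as a KeyError further down the pipeline.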
Data Transformation:
Next, we’ll add a Transformer block that keeps only the shot events and a subset of shot-related columns:
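from pandas import DataFrame

if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer


@transformer
def transform_df(df: DataFrame, *args, **kwargs) -> DataFrame:
    """
    This function filters the DataFrame for shots and selects relevant columns.
    """
    # Define the columns to keep
    columns_to_keep = [
        'player', 'team', 'location', 'shot_aerial_won',
        'shot_body_part', 'shot_end_location', 'shot_first_time',
        'shot_freeze_frame', 'shot_key_pass_id', 'shot_one_on_one',
        'shot_outcome', 'shot_statsbomb_xg', 'shot_technique', 'shot_type'
    ]
    # Filter rows where type is 'Shot'
    shots_df = df[df['type'] == 'Shot']
    # Optional attributes (e.g. shot_aerial_won) may not appear as columns in
    # every match, so keep only the requested columns that are present
    available = [col for col in columns_to_keep if col in shots_df.columns]
    # Select the relevant columns; .copy() gives an independent DataFrame
    transformed_df = shots_df[available].copy()
    return transformed_df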
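Because the transformer is plain pandas underneath, its filter-then-select logic can be tried outside Mage on a hand-made DataFrame before running it against the live data. This sketch uses a shortened column list and invented sample rows (not real match data); keep_shot_columns is a hypothetical name, not part of the article’s pipeline.

```python
import pandas as pd

# Shortened version of columns_to_keep, for illustration only
COLUMNS_TO_KEEP = ['player', 'team', 'location', 'shot_outcome', 'shot_statsbomb_xg']


def keep_shot_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Keep rows whose type is 'Shot', then the listed columns that exist."""
    shots = df[df['type'] == 'Shot']
    present = [col for col in COLUMNS_TO_KEEP if col in shots.columns]
    return shots[present].reset_index(drop=True)


# Invented sample rows; non-shot rows carry no shot fields
events = pd.DataFrame({
    'type': ['Pass', 'Shot', 'Carry', 'Shot'],
    'player': ['Player 1', 'Player 2', 'Player 3', 'Player 4'],
    'team': ['Team A', 'Team B', 'Team A', 'Team A'],
    'location': [[60.0, 40.0], [105.0, 38.0], [70.0, 20.0], [110.0, 42.0]],
    'shot_outcome': [None, 'Goal', None, 'Saved'],
    'shot_statsbomb_xg': [None, 0.31, None, 0.08],
})

shots = keep_shot_columns(events)
print(shots[['player', 'shot_outcome', 'shot_statsbomb_xg']])
```

reset_index(drop=True) is only there to make the small result easier to read; the Mage block itself does not need it.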
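Exporting to PostgreSQL:
Finally, we’ll export the raw data DataFrame to a PostgreSQL table using the Data Exporter block. The connection settings are read from the dev profile of the io_config.yaml file in the project directory. Aggregated tables for reporting can then be built on top of this table with SQL queries in a Custom block.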
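import os

from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.postgres import Postgres
from mage_ai.settings.repo import get_repo_path
from pandas import DataFrame

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_data_to_postgres(df: DataFrame, **kwargs) -> None:
    schema_name = 'statsbomb_data'  # Specify the name of the schema to export data to
    table_name = 'events_data'  # Specify the name of the table to export data to
    config_path = os.path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'dev'  # Profile defined in io_config.yaml
    with Postgres.with_config(ConfigFileLoader(config_path, config_profile)) as loader:
        loader.export(
            df,
            schema_name,
            table_name,
            index=False,  # Specifies whether to include index in exported table
            if_exists='replace',  # Specify resolution policy if table name already exists
        )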
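The exporter reads the dev profile from io_config.yaml, where each connection setting is written as a {{ env_var('...') }} placeholder that Mage fills in from environment variables. This is a minimal illustration of that substitution using a regular expression, not Mage’s implementation; render_env_vars and ENV_VAR_PATTERN are hypothetical names.

```python
import os
import re
from typing import Mapping, Optional

# Matches placeholders such as {{ env_var('POSTGRES_HOST') }}
ENV_VAR_PATTERN = re.compile(r"\{\{\s*env_var\('([A-Za-z_][A-Za-z0-9_]*)'\)\s*\}\}")


def render_env_vars(text: str, env: Optional[Mapping[str, str]] = None) -> str:
    """Hypothetical helper: replace env_var placeholders with their values."""
    env = os.environ if env is None else env

    def lookup(match: re.Match) -> str:
        name = match.group(1)
        if name not in env:
            # Fail loudly instead of silently connecting with an empty value
            raise KeyError(f"Environment variable {name} is not set")
        return env[name]

    return ENV_VAR_PATTERN.sub(lookup, text)


line = "POSTGRES_PORT: \"{{ env_var('POSTGRES_PORT') }}\""
print(render_env_vars(line, {"POSTGRES_PORT": "5432"}))  # POSTGRES_PORT: "5432"
```

Raising on a missing variable is a deliberate choice in this sketch; how Mage itself handles an unset variable may differ. Either way, keeping credentials in environment variables means io_config.yaml can be committed without secrets.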
To confirm that all the data was loaded into PostgreSQL, we’ll create another data loader block, this time connecting to the PostgreSQL database with the dev profile selected. A simple SQL query, such as SELECT * FROM statsbomb_data.events_data LIMIT 10;, then retrieves rows from the exported table.
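A row count is a quick way to confirm the export: with if_exists='replace', the table should hold exactly len(df) rows of the DataFrame that was exported. This sketch uses Python’s built-in sqlite3 module as a stand-in for PostgreSQL so it runs without a database server; against PostgreSQL, the equivalent query would be SELECT COUNT(*) FROM statsbomb_data.events_data. The helper exported_row_count and the sample rows are hypothetical.

```python
import sqlite3


def exported_row_count(conn: sqlite3.Connection, table: str) -> int:
    """Hypothetical helper: return COUNT(*) for a trusted table name."""
    # Identifiers cannot be bound as query parameters, so the name is quoted
    # directly; only use this with table names you control.
    (count,) = conn.execute(f'SELECT COUNT(*) FROM "{table}"').fetchone()
    return count


# In-memory SQLite database standing in for PostgreSQL
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events_data (player TEXT, type TEXT)")
rows = [("Player 1", "Pass"), ("Player 2", "Shot"), ("Player 3", "Shot")]
conn.executemany("INSERT INTO events_data VALUES (?, ?)", rows)
conn.commit()

expected = len(rows)  # in the pipeline: len(df) of the exported DataFrame
actual = exported_row_count(conn, "events_data")
assert actual == expected, f"Expected {expected} rows, found {actual}"
print(f"{actual} rows in events_data")  # 3 rows in events_data
```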
Orchestrating the Pipeline:
Mage simplifies pipeline orchestration with its DAG (Directed Acyclic Graph) representation of blocks. As you add blocks to your pipeline, Mage automatically constructs the DAG, ready for scheduling. Triggers within Mage allow for configuring pipeline execution based on various conditions or time-based schedules.
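The execution order a DAG implies can be illustrated with Python’s standard graphlib module: given each block’s upstream dependencies, a topological sort yields an order in which every block runs after the blocks it depends on, and a cycle is rejected. This is only an illustration of the concept, not Mage’s scheduler; the block names and wiring are hypothetical, with the exporter attached to the loader to match the raw-data export described earlier.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical block graph: each block maps to the set of its upstream blocks
dependencies = {
    "load_soccer_data": set(),
    "filter_shots": {"load_soccer_data"},
    "export_events": {"load_soccer_data"},
    "check_row_count": {"export_events"},
}

order = list(TopologicalSorter(dependencies).static_order())
print(order)  # load_soccer_data comes first; check_row_count follows export_events

# A cycle means the graph is not a DAG, so no valid execution order exists
try:
    list(TopologicalSorter({"a": {"b"}, "b": {"a"}}).static_order())
except CycleError as err:
    print("not a DAG:", err.args[1])
```

filter_shots and export_events do not depend on each other, so any valid order may run them in either sequence; this independence is what lets an orchestrator run such blocks in parallel.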
Conclusion:
In this beginner’s guide, we’ve explored the fundamentals of building a data pipeline using Mage. From installation to pipeline construction and orchestration, Mage streamlines the entire process, empowering users to focus on data insights rather than infrastructure.
While we’ve covered a basic batch pipeline here, Mage offers a plethora of advanced capabilities for building data integrations, streaming pipelines, and more, awaiting exploration by curious minds in the data engineering community.