Nowadays, data is very important as they help us make informed decisions and gain insights into a wide range of topics. With the increasing amount of data generated every day, it has become crucial to automate data retrieval and processing. In this article, we will walk through the process of setting up a workflow using Apache Airflow within a Docker environment and YouTube Data API v3 to automatically retrieve daily trending videos from YouTube. The retrieved data will be stored in BigQuery and visualized using Looker Studio.
Apache Airflow is an open-source platform for automating tasks in a data pipeline. It allows data engineers to author, schedule, and monitor workflows. If you wish to learn more about the basics of Airflow, you can check out an article written by my colleague, Stane, here. The article also explains how to install and run Airflow on Docker, which is the approach we will be using for our workflow.
The YouTube Data API v3 is a powerful tool that allows us to fetch data from YouTube. With this API, we can retrieve information about channels, videos, playlists, and more. In this article, we will focus on using the API to fetch data about trending videos in Indonesia.
BigQuery is a fully-managed, cloud-native data warehouse that allows us to store and analyze large amounts of data. BigQuery offers a free tier with a usage limits of 1 TB of querying per month and 10 GB of storage per month, which is adequate for our purpose.
Finally, Looker Studio is a cloud-based business intelligence and analytics platform that allows us to create and share data visualizations. Looker Studio makes it easy to create interactive dashboards and reports, and it integrates seamlessly with BigQuery.
Now let’s start building our workflow!
Setting up the environment
If you are using Windows, I recommend you to use Windows Subsystem for Linux (WSL) to avoid any compatibility issues. You can follow the steps in this video to install WSL. For this project, I will be using Ubuntu 22.04.2 LTS.
First we will make a new project directory named airflow_youtube_project
. Then we will follow the steps in the article mentioned earlier to install and run Airflow on Docker (note: some modifications have been made to the original article to make it more suitable for our project.).
To run Airflow in Docker, you will need to have Docker and Docker Compose installed beforehand. To do so, simply download and install Docker Desktop in your local machine depending on your OS via the official Docker website. Once you have installed Docker Desktop, ensure that you have Docker Compose with v1.27.0 or newer installed by running this command
$ docker compose version Docker Compose version v2.7.0
Afterwards, you need to download
docker-compose.yaml
by executing this command in the project directory (airflow_youtube_project
for our case)$ curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.6.0/docker-compose.yaml'
If you open the
docker.compose.yaml
file, you will see that it defines many services and composes them in a proper way. Now we need to initialize the environment. If you are running Linux OS, you will need to execute these following commands beforehand$ mkdir ./dags ./logs ./plugins $ echo -e "AIRFLOW_UID=$(id -u)" > .env
Now you can run database migrations and create the first user account by using this command
$ docker compose up airflow-init
It will create an account with username
airflow
and passwordairflow
. Run Airflow by usingdocker compose up
, then navigate tolocalhost:8080
and login using the credentials.
Next we will create a new virtual environment and install the required packages in the virtual environment.
$ python3 -m venv airflow_youtube_env
$ source airflow_youtube_env/bin/activate
$ python3 -m pip install apache-airflow google-api-python-client python-dotenv isodate google-cloud-bigquery
Now we need to create a new project in Google Cloud Platform. First go to the Google Cloud Console, then click on the project dropdown menu and select New Project
. We will name the project as airflow-youtube-project
and click Create
. Wait a few moment until the project is sucessfully created. Note that the project ID may be different from the project name.
Once the project is created, select the project from the dropdown menu and go to the APIs & Services Dashboard. Click on Enable APIs and Services
, then search for Youtube Data API v3
and enable them. By default, the BigQuery API should already be enabled (in case it is not, please enable it as well). After enabling the Youtube Data API v3
, click on the Credentials
tab. Click on the Create Credentials
and select API key
. Copy the API key and save it in a .env
file in the airflow_youtube_project/dags
directory. We will use this API key later to fetch data from YouTube.
The content of the .env
file should be similar to this.
YOUTUBE_API_KEY=AIzaSyBiHhiP0_NeL1js7IC3w1ZPWaAt_gCdrUY
Next we will create a new service account and download the service account key. Go to the Service Accounts page and click on Create Service Account
. Give your service account a name and click Create and Continue
. Select BigQuery Admin
as the role, then click Done
. Click on Action
and select Manage keys
. Click Add key
and select Create new key
. Select JSON
as the key type and click Create
. Save the downloaded JSON file in the airflow_youtube_project/dags
directory. Let’s rename the json file to service_account_key.json
for simplicity. We will use this JSON file to authenticate our BigQuery client.
Note that we are saving the .env
and JSON files in the airflow_youtube_project/dags
directory. This is done so the Docker containers are able access those files as well. In the docker-compose.yaml
file, the volume sharing is specified using the volumes parameter, which maps the local directories to the directories inside the container. E.g. the files inside the airflow_youtube_project/dags
directory will be shared with the /opt/airflow/dags
directory inside the container.
volumes:
- ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
- ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
- ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
Now let’s create a new BigQuery dataset using the python code below. This will also let us test if the service account key is working properly.
from google.cloud import bigquery
from google.oauth2 import service_account
# Set the path to your service account key file
key_path = 'dags/service_account_key.json'
# Set the credentials using the service account key
credentials = service_account.Credentials.from_service_account_file(
key_path,
scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
# Instantiate the BigQuery client with the credentials
client = bigquery.Client(credentials=credentials)
# Set the ID of the dataset to create
dataset_id = 'youtube'
# Construct a full Dataset object to be send to the API
dataset = bigquery.Dataset(f"{client.project}.{dataset_id}")
# Send the API request to create the dataset
dataset = client.create_dataset(dataset)
To confirm that the dataset has been created, we can run the following python code.
datasets = client.list_datasets()
for dataset in datasets:
print(dataset.dataset_id)
You should see youtube
in the output if the dataset has been created successfully.
By now your project structure should look similar to this.
airflow_youtube_project
├── airflow_youtube_env
├── dags
│ └── .env (containing YOUTUBE_API_KEY)
│ └── service_account_key.json
├── logs
├── plugins
├── .env (containing AIRFLOW_UID)
└── docker-compose.yaml
Fetching and Processing Data from YouTube Data API
Earlier we have obtained the API key for YouTube Data API v3, which enables us to fetch data from YouTube. Below is a sample code to fetch the first 50 results of the trending YouTube videos in Indonesia. I encourage you to test the code below to see the response returned by the API. If you wish to read more about the parameters of the API request, you can refer to this documentation page. And if you wish to read more about the details of the items returned in the response, you can refer to this documentation page.
from dotenv import load_dotenv
import os
from googleapiclient.discovery import build
# Load API key from .env file
load_dotenv("dags/.env")
api_key = os.environ.get("YOUTUBE_API_KEY")
# Create YouTube API client
youtube = build("youtube", "v3", developerKey=api_key)
# Fetch videos until max_results is reached or there are no more results
# Make API request for videos
request = youtube.videos().list(
part="snippet,contentDetails,statistics",
chart="mostPopular",
regionCode="id",
maxResults=50,
)
response = request.execute()
Now we will make a function to fetch a list of trending videos from the YouTube Data API. The function takes the region code, maximum number of results, and the path to the target file as the arguments. The function will extract the relevant information from the API response and save it in a list of dictionaries, which will then be written to a JSON file.
from dotenv import load_dotenv
import os
from googleapiclient.discovery import build
import json
from datetime import datetime
def fetch_trending_videos(region_code: str, max_results: int, target_file_path: str):
"""Fetches trending videos from YouTube for a specific region.
Args:
region_code: A string representing the ISO 3166-1 alpha-2 country code for the desired region.
max_results: An integer representing the maximum number of results to fetch.
target_file_path: A string representing the path to the file to be written.
"""
# Load API key from .env file
load_dotenv("dags/.env")
api_key = os.environ.get("YOUTUBE_API_KEY")
# Create YouTube API client
youtube = build("youtube", "v3", developerKey=api_key)
# Fetch videos until max_results is reached or there are no more results
videos_list = []
next_page_token = ""
while len(videos_list) < max_results and next_page_token is not None:
# Make API request for videos
request = youtube.videos().list(
part="snippet,contentDetails,statistics",
chart="mostPopular",
regionCode=region_code,
maxResults=50,
pageToken=next_page_token,
)
response = request.execute()
# Extract videos from response
videos = response.get("items", [])
# Update next_page_token for the next API request
next_page_token = response.get("nextPageToken", None)
# Extract relevant video details and append to list
infos = {'snippet':['title', 'publishedAt', 'channelId', 'channelTitle',
'description', 'tags', 'thumbnails', 'categoryId', 'defaultAudioLanguage'],
'contentDetails':['duration', 'caption'],
'statistics':['viewCount', 'likeCount', 'commentCount']}
now = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
for video in videos:
video_details = {
'videoId': video["id"],
'trendingAt': now
}
for k in infos.keys():
for info in infos[k]:
# use try-except to handle missing info
try:
video_details[info] = video[k][info]
except KeyError:
video_details[info] = None
videos_list.append(video_details)
# Write fetched videos data to a json file
with open(target_file_path, "w") as f:
json.dump(videos_list, f)
If we run the function below, we should see a JSON file named tmp_file.json
in the current directory.
fetch_trending_videos("ID", 200, "tmp_file.json")
Below is a snippet of tmp_file.json
content.
[
{
"videoId": "aVrzOIDB5uc",
"trendingAt": "2023-05-15T02:47:17Z",
"title": "TERHARU🔥🔥🔥! Perjuangan Dramatis Indonesia (3) VS (2) Vietnam | SEA GAMES 32 CAMBODIA",
"publishedAt": "2023-05-13T12:12:24Z",
"channelId": "UCeM5Nksgv9_FXTuZ8jkPJPg",
"channelTitle": "RCTI - ENTERTAINMENT",
"description": "SEMUANYA TERHARU SEKALI!!!melihat garuda muda berjuang dikala hanya bermain dengan 10 orang pemain, namun tak membuat mental mereka gentar. Ini pertandingan yang seru dengan diwarnai dengan berbagai konflik, peluang emas & gol yang indah dari Timnas Indonesia!nn#SeaGames #Seagames32Cambodia #IndonesiannPerjuangan Atlet Atlet Indonesia dalam ajang Sea Games 2023 sudah dimulai✨! Pasukan Garuda akan memulai langkahnya di ajang Sea Games 32 Cambodia, Dukung perjuangan mereka untuk meraih kemenangan di Sea Games dan saksikan semua pertandingan Sea Games 32 Cambodia",
"tags": ["RCTI"],
"thumbnails": {
"default": {
"url": "https://i.ytimg.com/vi/aVrzOIDB5uc/default.jpg",
"width": 120,
"height": 90
},
"medium": {
...
},
"high": {
...
},
"standard": {
...
},
"maxres": {
...
}
},
"categoryId": "17",
"defaultAudioLanguage": "id",
"duration": "PT10M57S",
"caption": "false",
"viewCount": "3712309",
"likeCount": "47450",
"commentCount": "6770"
},
...
]
From the result above, we can see that some of the data need further processing. First we will focus on the categoryId
, which is a numeric value. We will need to convert it to the corresponding category name. We can find the category mapping by performing a YouTube API request as follows. We will also save the category mapping to a JSON file for later use.
from dotenv import load_dotenv
import os
from googleapiclient.discovery import build
import json
load_dotenv('dags/.env')
api_key = os.environ.get("YOUTUBE_API_KEY")
youtube = build("youtube", "v3", developerKey=api_key)
request = youtube.videoCategories().list(
part="snippet",
regionCode="ID"
)
response = request.execute()
categories = {item['id']:item['snippet']['title'] for item in response['items']}
# save categories to json file
with open('dags/categories.json', 'w') as f:
json.dump(categories, f)
The content of categories.json
should look like this.
{"1": "Film & Animation", "2": "Autos & Vehicles", "10": "Music", "15": "Pets & Animals", "17": "Sports", "18": "Short Movies", "19": "Travel & Events", "20": "Gaming", "21": "Videoblogging", "22": "People & Blogs", "23": "Comedy", "24": "Entertainment", "25": "News & Politics", "26": "Howto & Style", "27": "Education", "28": "Science & Technology", "30": "Movies", "31": "Anime/Animation", "32": "Action/Adventure", "33": "Classics", "34": "Comedy", "35": "Documentary", "36": "Drama", "37": "Family", "38": "Foreign", "39": "Horror", "40": "Sci-Fi/Fantasy", "41": "Thriller", "42": "Shorts", "43": "Shows", "44": "Trailers"}
Next, we will make a function to process the json data obtained from fetch_trending_videos
function. The function will do the following things:
- Convert the
duration
from ISO 8601 standard to seconds - Convert the
tags
list to string - Map the
categoryId
to the corresponding category name - Parse the thumbnail URL from
thumbnails
- Convert
viewCount
,likeCount
, andcommentCount
to integer; andcaption
to boolean - Save the processed data to a new json file
import json
import isodate
def data_processing(source_file_path: str, target_file_path: str):
"""Processes the raw data fetched from YouTube.
Args:
source_file_path: A string representing the path to the file to be processed.
target_file_path: A string representing the path to the file to be written.
"""
# Load the fetched videos data from the json file
with open(source_file_path, 'r') as f:
videos_list = json.load(f)
# Load the categories dictionary from the json file
with open('dags/categories.json', 'r') as f:
categories = json.load(f)
# Process the fetched videos data
for video in videos_list:
# Convert ISO 8601 duration to seconds
video['durationSec'] = int(isodate.parse_duration(video['duration']).total_seconds()) if video['duration'] is not None else None
del video['duration']
# Convert tags list to string
video['tags'] = ', '.join(video['tags']) if video['tags'] is not None else None
# Convert categoryId to category based on categories dictionary
video['category'] = categories.get(video['categoryId'], None) if video['categoryId'] is not None else None
del video['categoryId']
# Parse the thumbnail url
video['thumbnailUrl'] = video['thumbnails'].get('standard', {}).get('url', None) if video['thumbnails'] is not None else None
del video['thumbnails']
# Convert viewCount, likeCount, and commentCount to integer
video['viewCount'] = int(video['viewCount']) if video['viewCount'] is not None else None
video['likeCount'] = int(video['likeCount']) if video['likeCount'] is not None else None
video['commentCount'] = int(video['commentCount']) if video['commentCount'] is not None else None
# Convert caption to boolean
video['caption'] = True if video['caption'] == 'true' else False if video['caption'] == 'false' else None
# Save the processed videos data to a new file
with open(target_file_path, "w") as f:
json.dump(videos_list, f)
Now let’s try to run the data_processing
function to process the json file (tmp_file.json
) we generated earlier. We will save the processed data to a new file called tmp_file_processed.json
.
data_processing('tmp_file.json', 'tmp_file_processed.json')
Below is the snippet of tmp_file_processed.json
content. Notice the difference in the tags
, durationSec
(formerly duration
), category
(formerly categoryId
) and thumbnailUrl
(formerly thumbnails
). The data type for viewCount
, likeCount
, commentCount
, and caption
have also been converted appropriately.
[
{
"videoId": "aVrzOIDB5uc",
"trendingAt": "2023-05-15T02:47:17Z",
"title": "TERHARU🔥🔥🔥! Perjuangan Dramatis Indonesia (3) VS (2) Vietnam | SEA GAMES 32 CAMBODIA",
"publishedAt": "2023-05-13T12:12:24Z",
"channelId": "UCeM5Nksgv9_FXTuZ8jkPJPg",
"channelTitle": "RCTI - ENTERTAINMENT",
"description": "SEMUANYA TERHARU SEKALI!!!melihat garuda muda berjuang dikala hanya bermain dengan 10 orang pemain, namun tak membuat mental mereka gentar. Ini pertandingan yang seru dengan diwarnai dengan berbagai konflik, peluang emas & gol yang indah dari Timnas Indonesia!nn#SeaGames #Seagames32Cambodia #IndonesiannPerjuangan Atlet Atlet Indonesia dalam ajang Sea Games 2023 sudah dimulai✨! Pasukan Garuda akan memulai langkahnya di ajang Sea Games 32 Cambodia, Dukung perjuangan mereka untuk meraih kemenangan di Sea Games dan saksikan semua pertandingan Sea Games 32 Cambodia",
"tags": "RCTI",
"defaultAudioLanguage": "id",
"caption": false,
"viewCount": 3712309,
"likeCount": 47450,
"commentCount": 6770,
"durationSec": 657,
"category": "Sports",
"thumbnailUrl": "https://i.ytimg.com/vi/aVrzOIDB5uc/sddefault.jpg"
},
...
]
By now your project structure should look similar to this.
airflow_youtube_project
├── airflow_youtube_env
├── dags
│ └── .env (containing YOUTUBE_API_KEY)
│ └── service_account_key.json
│ └── categories.json
├── logs
├── plugins
├── .env (containing AIRFLOW_UID)
├── docker-compose.yaml
├── tmp_file.json
└── tmp_file_processed.json
Loading Data to BigQuery
Earlier we have created a BigQuery dataset called youtube
. Now we will create two new tables called trending_videos
and trending_videos_test
in the youtube
dataset. The trending_videos
table will be used to store the processed data generated by the Airflow pipeline, while the trending_videos_test
table will be used for testing purposes.
from google.oauth2 import service_account
from google.cloud import bigquery
# Set the path to your service account key file
key_path = 'dags/service_account_key.json'
# Set the credentials using the service account key
credentials = service_account.Credentials.from_service_account_file(
key_path,
scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
# Instantiate the BigQuery client with the credentials
client = bigquery.Client(credentials=credentials)
# Define the table schema
schema = [
bigquery.SchemaField('videoId', 'STRING'),
bigquery.SchemaField('trendingAt', 'TIMESTAMP'),
bigquery.SchemaField('title', 'STRING'),
bigquery.SchemaField('publishedAt', 'TIMESTAMP'),
bigquery.SchemaField('channelId', 'STRING'),
bigquery.SchemaField('channelTitle', 'STRING'),
bigquery.SchemaField('description', 'STRING'),
bigquery.SchemaField('tags', 'STRING'),
bigquery.SchemaField('category', 'STRING'),
bigquery.SchemaField('defaultAudioLanguage', 'STRING'),
bigquery.SchemaField('durationSec', 'INTEGER'),
bigquery.SchemaField('caption', 'BOOLEAN'),
bigquery.SchemaField('viewCount', 'INTEGER'),
bigquery.SchemaField('likeCount', 'INTEGER'),
bigquery.SchemaField('commentCount', 'INTEGER'),
bigquery.SchemaField('thumbnailUrl', 'STRING')
]
# Create the table references
dataset_ref = client.dataset('youtube')
table1_ref = dataset_ref.table('trending_videos')
table2_ref = dataset_ref.table('trending_videos_test')
# Define the table objects
table1 = bigquery.Table(table1_ref, schema=schema)
table2 = bigquery.Table(table2_ref, schema=schema)
# Create the tables in BigQuery
client.create_table(table1)
client.create_table(table2)
To confirm that the table has been created, we can run the following code to list all the tables in the youtube
dataset.
tables = client.list_tables('youtube')
for table in tables:
print(table.table_id)
You should see trending_videos
and trending_videos_test
in the output if the table has been created successfully.
Once the table has been created, we will be able to load the data from our json file to the table. We will make a function called load_to_bigquery
to do this.
from google.oauth2 import service_account
from google.cloud import bigquery
import json
def load_to_bigquery(source_file_path: str, table_name: str):
"""
Loads the processed data to BigQuery.
Args:
source_file_path: A string representing the path to the file to be loaded.
table_name: A string representing the name of the table to load the data to.
"""
# Set the path to your service account key file
key_path = 'dags/service_account_key.json'
# Set the credentials using the service account key
credentials = service_account.Credentials.from_service_account_file(
key_path,
scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
# Instantiate the BigQuery client with the credentials
client = bigquery.Client(credentials=credentials)
# Refer to the table where the data will be loaded
dataset_ref = client.dataset('youtube')
table_ref = dataset_ref.table(table_name)
table = client.get_table(table_ref)
# Load the data from the json file to BigQuery
with open(source_file_path, 'r') as f:
json_data = json.load(f)
job_config = bigquery.LoadJobConfig()
job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
job = client.load_table_from_json(json_data, table, job_config = job_config)
job.result() # Waits for the job to complete
# Log the job results
print(f"Loaded {job.output_rows} rows to {table.table_id}")
Now let’s load the data to BigQuery by running the following code.
load_to_bigquery('tmp_file_processed.json', 'trending_videos_test')
You should see something like Loaded 200 rows to airflow-youtube-project-386703.youtube.trending_videos_test
in the output if everything is running properly. If you want to verify the loaded data through the BigQuery UI, you can go to the BigQuery Console and select the youtube
dataset. You should see the trending_videos_test
table in the list of tables. Click on the table, then click on the Preview
tab to see the data.
Building the Airflow DAG
Now that we have created the functions to fetch, process and load the trending YouTube videos data to BigQuery, we can start building the Airflow DAG. To begin, we will create a new file named trending_youtube_dag.py
within the dags
folder. We will use the TaskFlow API to make the DAG, by adding @dag
decorator before the DAG definition and @task
decorators before the task definitions. This DAG will encompass a series of tasks (fetch_trending_videos
, data_processing
and load_to_bigquery
), each corresponding to the functions we previously created. Note that we have changed all the paths to absolute paths (e.g. "/opt/airflow/dags/.env" instead of "dags/.env") to ensure that the file locations are explicitly specified and can be correctly recognized and accessed by the Docker container running the code. By establishing this DAG, we can effortlessly execute the pipeline on a defined schedule, ensuring the continuous retrieval and processing of daily trending YouTube videos data. Below is the entire code for the trending_youtube_dag.py
file.
from airflow.decorators import dag, task
from googleapiclient.discovery import build
from google.cloud import bigquery
from google.oauth2 import service_account
import json
from datetime import datetime, timedelta, timezone
import isodate
import os
from dotenv import load_dotenv
default_args = {
'owner': 'tmtsmrsl',
'retries': 3,
'retry_delay': timedelta(minutes=5)
}
@dag(dag_id='trending_youtube_dag_v1',
default_args=default_args,
description='A pipeline to fetch trending YouTube videos',
start_date=datetime(2023, 5, 17, tzinfo=timezone(timedelta(hours=7))),
schedule_interval='0 10 * * *',
catchup=False)
def trending_youtube_dag():
@task()
def fetch_trending_videos(region_code: str, max_results: int, target_file_path: str):
"""Fetches trending videos from YouTube for a specific region.
Args:
region_code: A string representing the ISO 3166-1 alpha-2 country code for the desired region.
max_results: An integer representing the maximum number of results to fetch.
target_file_path: A string representing the path to the file to be written.
"""
# Load API key from .env file
load_dotenv("/opt/airflow/dags/.env")
api_key = os.environ.get("YOUTUBE_API_KEY")
# Create YouTube API client
youtube = build("youtube", "v3", developerKey=api_key)
# Fetch videos until max_results is reached or there are no more results
videos_list = []
next_page_token = ""
while len(videos_list) < max_results and next_page_token is not None:
# Make API request for videos
request = youtube.videos().list(
part="snippet,contentDetails,statistics",
chart="mostPopular",
regionCode=region_code,
maxResults=50,
pageToken=next_page_token,
)
response = request.execute()
# Extract videos from response
videos = response.get("items", [])
# Update next_page_token for the next API request
next_page_token = response.get("nextPageToken", None)
# Extract relevant video details and append to list
infos = {'snippet':['title', 'publishedAt', 'channelId', 'channelTitle',
'description', 'tags', 'thumbnails', 'categoryId', 'defaultAudioLanguage'],
'contentDetails':['duration', 'caption'],
'statistics':['viewCount', 'likeCount', 'commentCount']}
now = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
for video in videos:
video_details = {
'videoId': video["id"],
'trendingAt': now
}
for k in infos.keys():
for info in infos[k]:
# use try-except to handle missing info
try:
video_details[info] = video[k][info]
except KeyError:
video_details[info] = None
videos_list.append(video_details)
# Write fetched videos data to a json file
with open(target_file_path, "w") as f:
json.dump(videos_list, f)
@task()
def data_processing(source_file_path: str, target_file_path: str):
"""Processes the raw data fetched from YouTube.
Args:
source_file_path: A string representing the path to the file to be processed.
target_file_path: A string representing the path to the file to be written.
"""
# Load the fetched videos data from the json file
with open(source_file_path, 'r') as f:
videos_list = json.load(f)
# Load the categories dictionary from the json file
with open('/opt/airflow/dags/categories.json', 'r') as f:
categories = json.load(f)
# Process the fetched videos data
for video in videos_list:
# Convert ISO 8601 duration to seconds
video['durationSec'] = int(isodate.parse_duration(video['duration']).total_seconds()) if video['duration'] is not None else None
del video['duration']
# Convert tags list to string
video['tags'] = ', '.join(video['tags']) if video['tags'] is not None else None
# Convert categoryId to category based on categories dictionary
video['category'] = categories.get(video['categoryId'], None) if video['categoryId'] is not None else None
del video['categoryId']
# Parse the thumbnail url
video['thumbnailUrl'] = video['thumbnails'].get('standard', {}).get('url', None) if video['thumbnails'] is not None else None
del video['thumbnails']
# Convert viewCount, likeCount, and commentCount to integer
video['viewCount'] = int(video['viewCount']) if video['viewCount'] is not None else None
video['likeCount'] = int(video['likeCount']) if video['likeCount'] is not None else None
video['commentCount'] = int(video['commentCount']) if video['commentCount'] is not None else None
# Convert caption to boolean
video['caption'] = True if video['caption'] == 'true' else False if video['caption'] == 'false' else None
# Save the processed videos data to a new file
with open(target_file_path, "w") as f:
json.dump(videos_list, f)
@task()
def load_to_bigquery(source_file_path: str, table_name: str):
"""
Loads the processed data to BigQuery.
Args:
source_file_path: A string representing the path to the file to be loaded.
table_name: A string representing the name of the table to load the data to.
"""
# Set the path to your service account key file
key_path = '/opt/airflow/dags/service_account_key.json'
# Set the credentials using the service account key
credentials = service_account.Credentials.from_service_account_file(
key_path,
scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
# Instantiate the BigQuery client with the credentials
client = bigquery.Client(credentials=credentials)
# Refer to the table where the data will be loaded
dataset_ref = client.dataset('youtube')
table_ref = dataset_ref.table(table_name)
table = client.get_table(table_ref)
# Load the data from the json file to BigQuery
with open(source_file_path, 'r') as f:
json_data = json.load(f)
job_config = bigquery.LoadJobConfig()
job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
job = client.load_table_from_json(json_data, table, job_config = job_config)
# Waits for the job to complete and log the job results
job.result()
print(f"Loaded {job.output_rows} rows to {table.table_id}")
file_path = '/opt/airflow/dags/tmp_file.json'
fetch_trending_videos_task = fetch_trending_videos(region_code='ID', max_results=200, target_file_path=file_path)
processed_file_path = '/opt/airflow/dags/tmp_file_processed.json'
data_processing_task = data_processing(source_file_path=file_path, target_file_path=processed_file_path)
load_to_bigquery_task = load_to_bigquery(source_file_path=processed_file_path, table_name='trending_videos')
fetch_trending_videos_task >> data_processing_task >> load_to_bigquery_task
dag = trending_youtube_dag()
Now, let’s explore some specific sections of the code above in more detail. The section of the code below defines the default configuration options for an Airflow DAG. It specifies various attributes that apply to all tasks within the DAG unless overridden at the task level. With the configuration below, a failed task will be retried for 3 times with 5 minutes delay between each retry.
default_args = {
'owner': 'tmtsmrsl',
'retries': 3,
'retry_delay': timedelta(minutes=5)
}
The next section of the code defines the DAG itself. Below are the explanations of the parameters included in the @dag
decorator:
dag_id
parameter is a unique identifier for the DAG, which will be shown in the Airflow Webserver UIdefault_args
parameter refers to the configuration options defined abovedescription
parameter provides a description or summary of the DAG’s purposestart_date
parameter is the date when the DAG is eligible to run tasks, which you can adjust to the current dateschedule_interval
parameter defines the schedule for the DAG. Here, it is set to run daily at 10:00 AM GMT+7 according to the cron expression ‘0 10 *’ and the timezone specified in thestart_date
catchup
parameter is a boolean that indicates whether the DAG should run for the past dates or not (in our case, we set it toFalse
to prevent the DAG from running for the past dates)
@dag(dag_id='trending_youtube_dag_v1',
default_args=default_args,
description='A pipeline to fetch trending YouTube videos',
start_date=datetime(2023, 5, 17, tzinfo=timezone(timedelta(hours=7))),
schedule_interval='0 10 * * *',
catchup=False)
def trending_youtube_dag():
...
After defining the DAG and tasks, we need to specify the dependencies between the tasks. In our case, the fetch_trending_videos_task
task should be executed first, followed by the data_processing_task
, and finally the load_to_bigquery_task
. We can specify the dependencies between the tasks by using the >>
operator, which indicates that the task on the left side should be executed before the task on the right side. The code below shows how we can specify the dependencies between the tasks.
@dag(...)
def trending_youtube_dag():
@task()
def fetch_trending_videos(region_code: str, max_results: int, target_file_path: str):
...
@task()
def data_processing(source_file_path: str, target_file_path: str):
...
@task()
def load_to_bigquery(source_file_path: str, table_name: str):
...
file_path = '/opt/airflow/dags/tmp_file.json'
fetch_trending_videos_task = fetch_trending_videos(region_code='ID', max_results=200, target_file_path=file_path)
processed_file_path = '/opt/airflow/dags/tmp_file_processed.json'
data_processing_task = data_processing(source_file_path=file_path, target_file_path=processed_file_path)
load_to_bigquery_task = load_to_bigquery(source_file_path=processed_file_path, table_name='trending_videos')
fetch_trending_videos_task >> data_processing_task >> load_to_bigquery_task
The final line of the code creates the DAG instance and registers it in Airflow.
dag = trending_youtube_dag()
By now your project structure should look similar to this. You can delete the tmp_file.json
and tmp_file_processed.json
files if you want to.
airflow_youtube_project
├── airflow_youtube_env
├── dags
│ └── .env (containing YOUTUBE_API_KEY)
│ └── service_account_key.json
│ └── categories.json
│ └── trending_youtube_dag.py
├── logs
├── plugins
├── .env (containing AIRFLOW_UID)
├── docker-compose.yaml
├── tmp_file.json
└── tmp_file_processed.json
Monitoring the DAG through Airflow Webserver UI
Now that we have created the DAG, it’s time to check the Airflow Webserver UI to see if the DAG has been registered successfully. I assume that you have followed the instructions in the earlier section to install and run Airflow using Docker. First open Docker Desktop and and ensure that the containers (except for airflow-init-1
) under airflow_youtube_project
are running. You can verify this by checking the container status, as shown in the screenshot below.
If the containers are not running, you can start them by navigating to the airflow_youtube_project
directory and executing the following command.
$ docker compose up
As seen from the last screenshot, the port for airflow-webserver-1
is shown as 8080:8080. This means port 8080 on the host machine is mapped to port 8080 on the container. Now open your browser and go to http://localhost:8080
to access the Airflow Webserver UI. Enter your username and password, which by default are both airflow
. After logging in, you should be able to see the list of DAGs registered in Airflow. Search for the DAG that we have created earlier (trending_youtube_dag_v1
) and click on it to see the details.
To unpause the DAG, click on the toggle button on the top left corner of the page. The color of the toggle background should change from grey to blue. To check when the next DAG run will be, hover to the light gray box containing Next Run: ...
. A dark gray box will appear showing details such as Run After
, Next Run
and Date Interval
. In the case below, the dark gray box indicates that the next DAG run is scheduled to occur in 18 hours.
If you wish to visualize the dependencies between the tasks, you can click on the Graph
tab. The screenshot below shows the dependencies between the tasks in our DAG.
After the DAG has been executed, we can can monitor the status of the DAG run and individual tasks to ensure everything ran as expected. The screenshot below shows the status of our first DAG run and the corresponding tasks. In the Airflow UI, successful tasks are denoted by green boxes, while any failed tasks will be denoted by red boxes. Additionally, hovering over each task provides additional information such as the execution date and duration, allowing us to analyze the performance of our DAG. If you are interested to see the logs of each task, simply click on the respective task box and select the Logs
option.
Notice that earlier we have only installed the dependecies in the youtube_airflow_env
virtual environment on our local machine. You might be curious how the DAG can run successfully within the Docker environment even though we have not installed any of the dependencies inside the Docker containers. Well, this is because the Docker containers (which are created from the docker-compose.yaml
file provided by Airflow) come with a set of pre-installed Python libraries by default, which happen to include all of our required dependencies. We can verify this by opening the terminal within the airflow-worker-1
container and checking if our dependencies are installed using the command below.
$ container_id=$(docker ps --filter name=airflow-worker-1 --format "{{.ID}}")
$ docker exec -it $container_id bash
$ pip list | grep -E 'google-api-python-client|python-dotenv|isodate|google-cloud-bigquery'
google-api-python-client 1.12.11
google-cloud-bigquery 2.34.4
google-cloud-bigquery-datatransfer 3.7.0
google-cloud-bigquery-storage 2.14.1
isodate 0.6.1
python-dotenv 0.21.1
Visualizing the Data in BigQuery using Looker Studio
Most people are not interested in examining the raw data directly within BigQuery. Instead, they prefer a more user-friendly and visually appealing representation of the data. In this section, we will use Looker Studio to create a dashboard to visualize the data that we have loaded into BigQuery earlier.
First navigate to Looker Studio. Our initial step is to establish a new connection to BigQuery within Looker Studio. To accomplish this, click on the Create
button on the left panel and select Data Source
. Then select BigQuery
as the desired data source. If this is your first time connecting to BigQuery from Looker Studio, you may need to click the "Authorize" button. Following that, we’ll need to select the appropriate Project
, Dataset
and Table
, then click on the Connect
button. Finally, click on the Create Report
button to create a new report, which will be used to create our dashboard.
I encourage you to explore the different options available in Looker Studio and use your creativity to design your own dashboard. It might take some time to be familiar with Looker Studio, but if you have used other dashboarding tools, you’ll quickly get the hang of it. For this article, I’ll share my completed dashboard as an example. Note that I have been retrieving data from YouTube API on a daily basis since 8th May 2023 up to 18th May 2023 (which is the date of writing this article). It includes three charts, a table and a date filter, which are shown below.
The dashboard above aims to address the following questions:
- Which video categories are the most popular?
- What is the average view count for trending videos in each category?
- Is publication day related to the likelihood of a video becoming trending?
- Which videos appeared the most frequently in the trending list?
You can also access my dashboard directly here. You’ll be able to interact with the latest data and experiment with different date filter to observe how the charts and table dynamically respond to your selections.
Conclusion
In this article, we have explored how to use Youtube Data API v3 to retrieve data about trending videos on YouTube. We have also learned how to use Airflow within a Docker environment to automate our data pipeline. Specifically, we have used Airflow to automate the ETL (extract, transform, and load) process from the YouTube Data API into BigQuery. Furthermore we have seen how Looker Studio can be utilized to visualize the data in BigQuery. Hopefully, this article provides you with valuable insights on automating your data retrieval, processing, loading, and visualization processes, allowing you to streamline your workflows effectively.
If you wish to check out the code and folder structure for this project, it is available on my Github Repository. And if you want to see an example of extensive analysis on the trending youtube videos data, you can check out my older project here. Below are some of the visualizations that I have created in my older project.