Using Airflow and YouTube API to Automatically Retrieve Trending Videos
This article explains how to set up a workflow using Apache Airflow to retrieve trending video data from the YouTube Data API v3 and store it in BigQuery.
Feb 25, 2025 · Timotius Marselo
Nowadays, data is very important because it helps us make informed decisions and gain insights into a wide range of topics. With the increasing amount of data generated every day, it has become crucial to automate data retrieval and processing. In this article, we will walk through the process of setting up a workflow using Apache Airflow within a Docker environment and the YouTube Data API v3 to automatically retrieve daily trending videos from YouTube. The retrieved data will be stored in BigQuery and visualized using Looker Studio.
Apache Airflow is an open-source platform for automating tasks in a data pipeline. It allows data engineers to author, schedule, and monitor workflows. If you wish to learn more about the basics of Airflow, you can check out an article written by my colleague, Stane, here. The article also explains how to install and run Airflow on Docker, which is the approach we will be using for our workflow.
The YouTube Data API v3 is a powerful tool that allows us to fetch data from YouTube. With this API, we can retrieve information about channels, videos, playlists, and more. In this article, we will focus on using the API to fetch data about trending videos in Indonesia.
BigQuery is a fully managed, cloud-native data warehouse that allows us to store and analyze large amounts of data. BigQuery offers a free tier with usage limits of 1 TB of querying and 10 GB of storage per month, which is adequate for our purpose.
Finally, Looker Studio is a cloud-based business intelligence and analytics platform that allows us to create and share data visualizations. Looker Studio makes it easy to create interactive dashboards and reports, and it integrates seamlessly with BigQuery.
If you are using Windows, I recommend using Windows Subsystem for Linux (WSL) to avoid compatibility issues. You can follow the steps in this video to install WSL. For this project, I will be using Ubuntu 22.04.2 LTS.
First, we will make a new project directory named airflow_youtube_project. Then we will follow the steps in the article mentioned earlier to install and run Airflow on Docker (note: some modifications have been made to the original steps to make them more suitable for our project).
To run Airflow in Docker, you will need to have Docker and Docker Compose installed beforehand. To do so, simply download and install Docker Desktop on your local machine for your OS via the official Docker website. Once you have installed Docker Desktop, ensure that you have Docker Compose v1.27.0 or newer by running this command:
$ docker compose version
Docker Compose version v2.7.0
Afterwards, you need to download docker-compose.yaml by executing this command in the project directory (airflow_youtube_project in our case):
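The official Airflow quick-start hosts docker-compose.yaml at a version-specific URL; the version below (2.6.1) is only an assumption, so substitute the Airflow release you want to run:

$ curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.6.1/docker-compose.yaml'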
If you open the docker-compose.yaml file, you will see that it defines many services and composes them properly. Now we need to initialize the environment. If you are running Linux, you will need to execute the following commands beforehand:
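A sketch of the usual initialization, based on the official Docker quick-start (treat the exact list as a guideline): create the directories that will be mounted into the containers and write your host user ID into a root-level .env file so files created by Airflow are owned by your user. Note that this root-level .env is read by Docker Compose and is separate from the dags/.env file that will later hold the YouTube API key.

$ mkdir -p ./dags ./logs ./plugins
$ echo -e "AIRFLOW_UID=$(id -u)" > .env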
Now you can run database migrations and create the first user account by using this command
$ docker compose up airflow-init
It will create an account with username airflow and password airflow. Run Airflow by using docker compose up, then navigate to localhost:8080 and log in using the credentials.
Next, we will create a new virtual environment and install the required packages in it.
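A minimal sketch, assuming the environment name youtube_airflow_env mentioned later in this article and a package list inferred from the imports used in the code below:

$ python3 -m venv youtube_airflow_env
$ source youtube_airflow_env/bin/activate
$ pip install google-api-python-client google-cloud-bigquery python-dotenv isodate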
Now we need to create a new project in Google Cloud Platform. First, go to the Google Cloud Console, then click on the project dropdown menu and select New Project. We will name the project airflow-youtube-project and click Create. Wait a few moments until the project is successfully created. Note that the project ID may be different from the project name.
Once the project is created, select the project from the dropdown menu and go to the APIs & Services Dashboard. Click on Enable APIs and Services, then search for YouTube Data API v3 and enable it. By default, the BigQuery API should already be enabled (in case it is not, please enable it as well). After enabling the YouTube Data API v3, click on the Credentials tab. Click on Create Credentials and select API key. Copy the API key and save it in a .env file in the airflow_youtube_project/dags directory. We will use this API key later to fetch data from YouTube.
The content of the .env file should be similar to this.
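For example (your_api_key_here is a placeholder for the key you copied; the variable name YOUTUBE_API_KEY is what the code reads later via os.environ.get):

YOUTUBE_API_KEY=your_api_key_here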
Next, we will create a new service account and download the service account key. Go to the Service Accounts page and click on Create Service Account. Give your service account a name and click Create and Continue. Select BigQuery Admin as the role, then click Done. Click on Actions and select Manage keys. Click Add key and select Create new key. Select JSON as the key type and click Create. Save the downloaded JSON file in the airflow_youtube_project/dags directory. Let's rename the JSON file to service_account_key.json for simplicity. We will use this JSON file to authenticate our BigQuery client.
Note that we are saving the .env and JSON files in the airflow_youtube_project/dags directory. This is done so the Docker containers are able to access those files as well. In the docker-compose.yaml file, the volume sharing is specified using the volumes parameter, which maps the local directories to directories inside the container. For example, the files inside the airflow_youtube_project/dags directory will be shared with the /opt/airflow/dags directory inside the container.
Now let's create a new BigQuery dataset using the Python code below. This will also let us test whether the service account key is working properly.
from google.cloud import bigquery
from google.oauth2 import service_account

# Set the path to your service account key file
key_path = 'dags/service_account_key.json'

# Set the credentials using the service account key
credentials = service_account.Credentials.from_service_account_file(
    key_path,
    scopes=["https://www.googleapis.com/auth/cloud-platform"],
)

# Instantiate the BigQuery client with the credentials
client = bigquery.Client(credentials=credentials)

# Set the ID of the dataset to create
dataset_id = 'youtube'

# Construct a full Dataset object to send to the API
dataset = bigquery.Dataset(f"{client.project}.{dataset_id}")

# Send the API request to create the dataset
dataset = client.create_dataset(dataset)
To confirm that the dataset has been created, we can run the following Python code.
datasets = client.list_datasets()
for dataset in datasets:
    print(dataset.dataset_id)
You should see youtube in the output if the dataset has been created successfully.
By now your project structure should look similar to this.
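Assuming the virtual environment was created inside the project directory, the layout is roughly as follows (exact contents may differ):

airflow_youtube_project/
    dags/
        .env
        service_account_key.json
    logs/
    plugins/
    docker-compose.yaml
    youtube_airflow_env/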
Earlier, we obtained the API key for the YouTube Data API v3, which enables us to fetch data from YouTube. Below is sample code to fetch the first 50 results of the trending YouTube videos in Indonesia. I encourage you to run the code below to see the response returned by the API. If you wish to read more about the parameters of the API request, you can refer to this documentation page. And if you wish to read more about the details of the items returned in the response, you can refer to this documentation page.
from dotenv import load_dotenv
import os
from googleapiclient.discovery import build

# Load API key from .env file
load_dotenv("dags/.env")
api_key = os.environ.get("YOUTUBE_API_KEY")

# Create YouTube API client
youtube = build("youtube", "v3", developerKey=api_key)

# Make API request for the first 50 trending videos
request = youtube.videos().list(
    part="snippet,contentDetails,statistics",
    chart="mostPopular",
    regionCode="id",
    maxResults=50,
)
response = request.execute()
Now we will make a function to fetch a list of trending videos from the YouTube Data API. The function takes the region code, the maximum number of results, and the path to the target file as arguments. It will extract the relevant information from the API response and save it in a list of dictionaries, which will then be written to a JSON file.
from dotenv import load_dotenv
import os
from googleapiclient.discovery import build
import json
from datetime import datetime


def fetch_trending_videos(region_code: str, max_results: int, target_file_path: str):
    """Fetches trending videos from YouTube for a specific region.

    Args:
        region_code: A string representing the ISO 3166-1 alpha-2 country code for the desired region.
        max_results: An integer representing the maximum number of results to fetch.
        target_file_path: A string representing the path to the file to be written.
    """
    # Load API key from .env file
    load_dotenv("dags/.env")
    api_key = os.environ.get("YOUTUBE_API_KEY")

    # Create YouTube API client
    youtube = build("youtube", "v3", developerKey=api_key)

    # Fetch videos until max_results is reached or there are no more results
    videos_list = []
    next_page_token = ""
    while len(videos_list) < max_results and next_page_token is not None:
        # Make API request for videos
        request = youtube.videos().list(
            part="snippet,contentDetails,statistics",
            chart="mostPopular",
            regionCode=region_code,
            maxResults=50,
            pageToken=next_page_token,
        )
        response = request.execute()

        # Extract videos from response
        videos = response.get("items", [])

        # Update next_page_token for the next API request
        next_page_token = response.get("nextPageToken", None)

        # Extract relevant video details and append to list
        infos = {'snippet': ['title', 'publishedAt', 'channelId', 'channelTitle', 'description', 'tags',
                             'thumbnails', 'categoryId', 'defaultAudioLanguage'],
                 'contentDetails': ['duration', 'caption'],
                 'statistics': ['viewCount', 'likeCount', 'commentCount']}
        now = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
        for video in videos:
            video_details = {
                'videoId': video["id"],
                'trendingAt': now
            }
            for k in infos.keys():
                for info in infos[k]:
                    # use try-except to handle missing info
                    try:
                        video_details[info] = video[k][info]
                    except KeyError:
                        video_details[info] = None
            videos_list.append(video_details)

    # Write fetched videos data to a json file
    with open(target_file_path, "w") as f:
        json.dump(videos_list, f)
If we run the function below, we should see a JSON file named tmp_file.json in the current directory.
fetch_trending_videos("ID", 200, "tmp_file.json")
Below is a snippet of tmp_file.json content.
[
{
"videoId": "aVrzOIDB5uc",
"trendingAt": "2023-05-15T02:47:17Z",
"title": "TERHARU๐ฅ๐ฅ๐ฅ! Perjuangan Dramatis Indonesia (3) VS (2) Vietnam | SEA GAMES 32 CAMBODIA",
"publishedAt": "2023-05-13T12:12:24Z",
"channelId": "UCeM5Nksgv9_FXTuZ8jkPJPg",
"channelTitle": "RCTI - ENTERTAINMENT",
"description": "SEMUANYA TERHARU SEKALI!!!melihat garuda muda berjuang dikala hanya bermain dengan 10 orang pemain, namun tak membuat mental mereka gentar. Ini pertandingan yang seru dengan diwarnai dengan berbagai konflik, peluang emas & gol yang indah dari Timnas Indonesia!\n\n#SeaGames #Seagames32Cambodia #Indonesia\n\nPerjuangan Atlet Atlet Indonesia dalam ajang Sea Games 2023 sudah dimulaiโจ! Pasukan Garuda akan memulai langkahnya di ajang Sea Games 32 Cambodia, Dukung perjuangan mereka untuk meraih kemenangan di Sea Games dan saksikan semua pertandingan Sea Games 32 Cambodia",
"tags": ["RCTI"],
"thumbnails": {
"default": {
"url": "https://i.ytimg.com/vi/aVrzOIDB5uc/default.jpg",
"width": 120,
"height": 90
},
"medium": {
...
},
"high": {
...
},
"standard": {
...
},
"maxres": {
...
}
},
"categoryId": "17",
"defaultAudioLanguage": "id",
"duration": "PT10M57S",
"caption": "false",
"viewCount": "3712309",
"likeCount": "47450",
"commentCount": "6770"
},
...
]
From the result above, we can see that some of the data needs further processing. First, we will focus on categoryId, which is a numeric value; we will need to convert it to the corresponding category name. We can find the category mapping by performing a YouTube API request as follows. We will also save the category mapping to a JSON file for later use.
from dotenv import load_dotenv
import os
from googleapiclient.discovery import build
import json

load_dotenv('dags/.env')
api_key = os.environ.get("YOUTUBE_API_KEY")
youtube = build("youtube", "v3", developerKey=api_key)

request = youtube.videoCategories().list(
    part="snippet",
    regionCode="ID"
)
response = request.execute()

categories = {item['id']: item['snippet']['title'] for item in response['items']}

# save categories to json file
with open('dags/categories.json', 'w') as f:
    json.dump(categories, f)
The content of categories.json should look like this.
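A truncated example is shown below; the exact set of categories returned for region ID may vary, but the IDs map to YouTube's standard category names (note that 17 maps to Sports, which matches the sample video above):

{"1": "Film & Animation", "2": "Autos & Vehicles", "10": "Music", "17": "Sports", "20": "Gaming", "24": "Entertainment", ...}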
Next, we will make a function to process the JSON data obtained from the fetch_trending_videos function. The function will do the following things:
Convert the duration from ISO 8601 standard to seconds
Convert the tags list to string
Map the categoryId to the corresponding category name
Parse the thumbnail URL from thumbnails
Convert viewCount, likeCount, and commentCount to integer; and caption to boolean
Save the processed data to a new json file
import json
import isodate


def data_processing(source_file_path: str, target_file_path: str):
    """Processes the raw data fetched from YouTube.

    Args:
        source_file_path: A string representing the path to the file to be processed.
        target_file_path: A string representing the path to the file to be written.
    """
    # Load the fetched videos data from the json file
    with open(source_file_path, 'r') as f:
        videos_list = json.load(f)

    # Load the categories dictionary from the json file
    with open('dags/categories.json', 'r') as f:
        categories = json.load(f)

    # Process the fetched videos data
    for video in videos_list:
        # Convert ISO 8601 duration to seconds
        video['durationSec'] = int(isodate.parse_duration(video['duration']).total_seconds()) if video['duration'] is not None else None
        del video['duration']

        # Convert tags list to string
        video['tags'] = ', '.join(video['tags']) if video['tags'] is not None else None

        # Convert categoryId to category based on categories dictionary
        video['category'] = categories.get(video['categoryId'], None) if video['categoryId'] is not None else None
        del video['categoryId']

        # Parse the thumbnail url
        video['thumbnailUrl'] = video['thumbnails'].get('standard', {}).get('url', None) if video['thumbnails'] is not None else None
        del video['thumbnails']

        # Convert viewCount, likeCount, and commentCount to integer
        video['viewCount'] = int(video['viewCount']) if video['viewCount'] is not None else None
        video['likeCount'] = int(video['likeCount']) if video['likeCount'] is not None else None
        video['commentCount'] = int(video['commentCount']) if video['commentCount'] is not None else None

        # Convert caption to boolean
        video['caption'] = True if video['caption'] == 'true' else False if video['caption'] == 'false' else None

    # Save the processed videos data to a new file
    with open(target_file_path, "w") as f:
        json.dump(videos_list, f)
Now let's try to run the data_processing function to process the JSON file (tmp_file.json) we generated earlier. We will save the processed data to a new file called tmp_file_processed.json.
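The call below uses the file names mentioned above:

data_processing("tmp_file.json", "tmp_file_processed.json")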
Below is a snippet of the tmp_file_processed.json content. Notice the difference in tags, durationSec (formerly duration), category (formerly categoryId) and thumbnailUrl (formerly thumbnails). The data types for viewCount, likeCount, commentCount, and caption have also been converted appropriately.
[
{
"videoId": "aVrzOIDB5uc",
"trendingAt": "2023-05-15T02:47:17Z",
"title": "TERHARU๐ฅ๐ฅ๐ฅ! Perjuangan Dramatis Indonesia (3) VS (2) Vietnam | SEA GAMES 32 CAMBODIA",
"publishedAt": "2023-05-13T12:12:24Z",
"channelId": "UCeM5Nksgv9_FXTuZ8jkPJPg",
"channelTitle": "RCTI - ENTERTAINMENT",
"description": "SEMUANYA TERHARU SEKALI!!!melihat garuda muda berjuang dikala hanya bermain dengan 10 orang pemain, namun tak membuat mental mereka gentar. Ini pertandingan yang seru dengan diwarnai dengan berbagai konflik, peluang emas & gol yang indah dari Timnas Indonesia!\n\n#SeaGames #Seagames32Cambodia #Indonesia\n\nPerjuangan Atlet Atlet Indonesia dalam ajang Sea Games 2023 sudah dimulaiโจ! Pasukan Garuda akan memulai langkahnya di ajang Sea Games 32 Cambodia, Dukung perjuangan mereka untuk meraih kemenangan di Sea Games dan saksikan semua pertandingan Sea Games 32 Cambodia",
"tags": "RCTI",
"defaultAudioLanguage": "id",
"caption": false,
"viewCount": 3712309,
"likeCount": 47450,
"commentCount": 6770,
"durationSec": 657,
"category": "Sports",
"thumbnailUrl": "https://i.ytimg.com/vi/aVrzOIDB5uc/sddefault.jpg"
},
...
]
By now your project structure should look similar to this.
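Assuming the scripts were run from the project root, the layout is roughly as follows (exact contents may differ):

airflow_youtube_project/
    dags/
        .env
        service_account_key.json
        categories.json
    logs/
    plugins/
    docker-compose.yaml
    tmp_file.json
    tmp_file_processed.json
    youtube_airflow_env/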
Earlier, we created a BigQuery dataset called youtube. Now we will create two new tables called trending_videos and trending_videos_test in the youtube dataset. The trending_videos table will be used to store the processed data generated by the Airflow pipeline, while the trending_videos_test table will be used for testing purposes.
from google.oauth2 import service_account
from google.cloud import bigquery

# Set the path to your service account key file
key_path = 'dags/service_account_key.json'

# Set the credentials using the service account key
credentials = service_account.Credentials.from_service_account_file(
    key_path,
    scopes=["https://www.googleapis.com/auth/cloud-platform"],
)

# Instantiate the BigQuery client with the credentials
client = bigquery.Client(credentials=credentials)

# Define the table schema
schema = [
    bigquery.SchemaField('videoId', 'STRING'),
    bigquery.SchemaField('trendingAt', 'TIMESTAMP'),
    bigquery.SchemaField('title', 'STRING'),
    bigquery.SchemaField('publishedAt', 'TIMESTAMP'),
    bigquery.SchemaField('channelId', 'STRING'),
    bigquery.SchemaField('channelTitle', 'STRING'),
    bigquery.SchemaField('description', 'STRING'),
    bigquery.SchemaField('tags', 'STRING'),
    bigquery.SchemaField('category', 'STRING'),
    bigquery.SchemaField('defaultAudioLanguage', 'STRING'),
    bigquery.SchemaField('durationSec', 'INTEGER'),
    bigquery.SchemaField('caption', 'BOOLEAN'),
    bigquery.SchemaField('viewCount', 'INTEGER'),
    bigquery.SchemaField('likeCount', 'INTEGER'),
    bigquery.SchemaField('commentCount', 'INTEGER'),
    bigquery.SchemaField('thumbnailUrl', 'STRING')
]

# Create the table references
dataset_ref = client.dataset('youtube')
table1_ref = dataset_ref.table('trending_videos')
table2_ref = dataset_ref.table('trending_videos_test')

# Define the table objects
table1 = bigquery.Table(table1_ref, schema=schema)
table2 = bigquery.Table(table2_ref, schema=schema)

# Create the tables in BigQuery
client.create_table(table1)
client.create_table(table2)
To confirm that the table has been created, we can run the following code to list all the tables in the youtube dataset.
tables = client.list_tables('youtube')
for table in tables:
    print(table.table_id)
You should see trending_videos and trending_videos_test in the output if the tables have been created successfully.
Once the tables have been created, we will be able to load the data from our JSON file into a table. We will make a function called load_to_bigquery to do this.
from google.oauth2 import service_account
from google.cloud import bigquery
import json


def load_to_bigquery(source_file_path: str, table_name: str):
    """Loads the processed data to BigQuery.

    Args:
        source_file_path: A string representing the path to the file to be loaded.
        table_name: A string representing the name of the table to load the data to.
    """
    # Set the path to your service account key file
    key_path = 'dags/service_account_key.json'

    # Set the credentials using the service account key
    credentials = service_account.Credentials.from_service_account_file(
        key_path,
        scopes=["https://www.googleapis.com/auth/cloud-platform"],
    )

    # Instantiate the BigQuery client with the credentials
    client = bigquery.Client(credentials=credentials)

    # Refer to the table where the data will be loaded
    dataset_ref = client.dataset('youtube')
    table_ref = dataset_ref.table(table_name)
    table = client.get_table(table_ref)

    # Load the data from the json file to BigQuery
    with open(source_file_path, 'r') as f:
        json_data = json.load(f)
    job_config = bigquery.LoadJobConfig()
    job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
    job = client.load_table_from_json(json_data, table, job_config=job_config)
    job.result()  # Waits for the job to complete

    # Log the job results
    print(f"Loaded {job.output_rows} rows to {table.table_id}")
Now let's load the data to BigQuery by running the following code.
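Here we load the processed file into the test table created earlier:

load_to_bigquery("tmp_file_processed.json", "trending_videos_test")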
You should see something like Loaded 200 rows to airflow-youtube-project-386703.youtube.trending_videos_test in the output if everything is running properly. If you want to verify the loaded data through the BigQuery UI, you can go to the BigQuery Console and select the youtube dataset. You should see the trending_videos_test table in the list of tables. Click on the table, then click on the Preview tab to see the data.
Now that we have created the functions to fetch, process and load the trending YouTube videos data to BigQuery, we can start building the Airflow DAG. To begin, we will create a new file named trending_youtube_dag.py within the dags folder. We will use the TaskFlow API to make the DAG, adding the @dag decorator before the DAG definition and the @task decorator before each task definition. This DAG will encompass a series of tasks (fetch_trending_videos, data_processing and load_to_bigquery), each corresponding to a function we previously created. Note that we have changed all the paths to absolute paths (e.g. "/opt/airflow/dags/.env" instead of "dags/.env") to ensure that the file locations are explicitly specified and can be correctly recognized and accessed by the Docker container running the code. By establishing this DAG, we can effortlessly execute the pipeline on a defined schedule, ensuring the continuous retrieval and processing of daily trending YouTube videos data. Below is the entire code for the trending_youtube_dag.py file.
from airflow.decorators import dag, task
from googleapiclient.discovery import build
from google.cloud import bigquery
from google.oauth2 import service_account
import json
from datetime import datetime, timedelta, timezone
import isodate
import os
from dotenv import load_dotenv

default_args = {
    'owner': 'tmtsmrsl',
    'retries': 3,
    'retry_delay': timedelta(minutes=5)
}


@dag(dag_id='trending_youtube_dag_v1',
     default_args=default_args,
     description='A pipeline to fetch trending YouTube videos',
     start_date=datetime(2023, 5, 17, tzinfo=timezone(timedelta(hours=7))),
     schedule_interval='0 10 * * *',
     catchup=False)
def trending_youtube_dag():
    @task()
    def fetch_trending_videos(region_code: str, max_results: int, target_file_path: str):
        """Fetches trending videos from YouTube for a specific region.

        Args:
            region_code: A string representing the ISO 3166-1 alpha-2 country code for the desired region.
            max_results: An integer representing the maximum number of results to fetch.
            target_file_path: A string representing the path to the file to be written.
        """
        # Load API key from .env file
        load_dotenv("/opt/airflow/dags/.env")
        api_key = os.environ.get("YOUTUBE_API_KEY")

        # Create YouTube API client
        youtube = build("youtube", "v3", developerKey=api_key)

        # Fetch videos until max_results is reached or there are no more results
        videos_list = []
        next_page_token = ""
        while len(videos_list) < max_results and next_page_token is not None:
            # Make API request for videos
            request = youtube.videos().list(
                part="snippet,contentDetails,statistics",
                chart="mostPopular",
                regionCode=region_code,
                maxResults=50,
                pageToken=next_page_token,
            )
            response = request.execute()

            # Extract videos from response
            videos = response.get("items", [])

            # Update next_page_token for the next API request
            next_page_token = response.get("nextPageToken", None)

            # Extract relevant video details and append to list
            infos = {'snippet': ['title', 'publishedAt', 'channelId', 'channelTitle', 'description', 'tags',
                                 'thumbnails', 'categoryId', 'defaultAudioLanguage'],
                     'contentDetails': ['duration', 'caption'],
                     'statistics': ['viewCount', 'likeCount', 'commentCount']}
            now = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
            for video in videos:
                video_details = {
                    'videoId': video["id"],
                    'trendingAt': now
                }
                for k in infos.keys():
                    for info in infos[k]:
                        # use try-except to handle missing info
                        try:
                            video_details[info] = video[k][info]
                        except KeyError:
                            video_details[info] = None
                videos_list.append(video_details)

        # Write fetched videos data to a json file
        with open(target_file_path, "w") as f:
            json.dump(videos_list, f)

    @task()
    def data_processing(source_file_path: str, target_file_path: str):
        """Processes the raw data fetched from YouTube.

        Args:
            source_file_path: A string representing the path to the file to be processed.
            target_file_path: A string representing the path to the file to be written.
        """
        # Load the fetched videos data from the json file
        with open(source_file_path, 'r') as f:
            videos_list = json.load(f)

        # Load the categories dictionary from the json file
        with open('/opt/airflow/dags/categories.json', 'r') as f:
            categories = json.load(f)

        # Process the fetched videos data
        for video in videos_list:
            # Convert ISO 8601 duration to seconds
            video['durationSec'] = int(isodate.parse_duration(video['duration']).total_seconds()) if video['duration'] is not None else None
            del video['duration']

            # Convert tags list to string
            video['tags'] = ', '.join(video['tags']) if video['tags'] is not None else None

            # Convert categoryId to category based on categories dictionary
            video['category'] = categories.get(video['categoryId'], None) if video['categoryId'] is not None else None
            del video['categoryId']

            # Parse the thumbnail url
            video['thumbnailUrl'] = video['thumbnails'].get('standard', {}).get('url', None) if video['thumbnails'] is not None else None
            del video['thumbnails']

            # Convert viewCount, likeCount, and commentCount to integer
            video['viewCount'] = int(video['viewCount']) if video['viewCount'] is not None else None
            video['likeCount'] = int(video['likeCount']) if video['likeCount'] is not None else None
            video['commentCount'] = int(video['commentCount']) if video['commentCount'] is not None else None

            # Convert caption to boolean
            video['caption'] = True if video['caption'] == 'true' else False if video['caption'] == 'false' else None

        # Save the processed videos data to a new file
        with open(target_file_path, "w") as f:
            json.dump(videos_list, f)

    @task()
    def load_to_bigquery(source_file_path: str, table_name: str):
        """Loads the processed data to BigQuery.

        Args:
            source_file_path: A string representing the path to the file to be loaded.
            table_name: A string representing the name of the table to load the data to.
        """
        # Set the path to your service account key file
        key_path = '/opt/airflow/dags/service_account_key.json'

        # Set the credentials using the service account key
        credentials = service_account.Credentials.from_service_account_file(
            key_path,
            scopes=["https://www.googleapis.com/auth/cloud-platform"],
        )

        # Instantiate the BigQuery client with the credentials
        client = bigquery.Client(credentials=credentials)

        # Refer to the table where the data will be loaded
        dataset_ref = client.dataset('youtube')
        table_ref = dataset_ref.table(table_name)
        table = client.get_table(table_ref)

        # Load the data from the json file to BigQuery
        with open(source_file_path, 'r') as f:
            json_data = json.load(f)
        job_config = bigquery.LoadJobConfig()
        job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
        job = client.load_table_from_json(json_data, table, job_config=job_config)

        # Waits for the job to complete and log the job results
        job.result()
        print(f"Loaded {job.output_rows} rows to {table.table_id}")

    file_path = '/opt/airflow/dags/tmp_file.json'
    fetch_trending_videos_task = fetch_trending_videos(region_code='ID', max_results=200, target_file_path=file_path)

    processed_file_path = '/opt/airflow/dags/tmp_file_processed.json'
    data_processing_task = data_processing(source_file_path=file_path, target_file_path=processed_file_path)

    load_to_bigquery_task = load_to_bigquery(source_file_path=processed_file_path, table_name='trending_videos')

    fetch_trending_videos_task >> data_processing_task >> load_to_bigquery_task


dag = trending_youtube_dag()
Now, let's explore some specific sections of the code above in more detail. The section of the code below defines the default configuration options for an Airflow DAG. It specifies various attributes that apply to all tasks within the DAG unless overridden at the task level. With the configuration below, a failed task will be retried 3 times, with a 5-minute delay between each retry.
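default_args = {
    'owner': 'tmtsmrsl',
    'retries': 3,
    'retry_delay': timedelta(minutes=5)
}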
The next section of the code defines the DAG itself; the relevant @dag decorator is reproduced below, followed by explanations of its parameters:
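@dag(dag_id='trending_youtube_dag_v1',
     default_args=default_args,
     description='A pipeline to fetch trending YouTube videos',
     start_date=datetime(2023, 5, 17, tzinfo=timezone(timedelta(hours=7))),
     schedule_interval='0 10 * * *',
     catchup=False)
def trending_youtube_dag():
    ...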
dag_id parameter is a unique identifier for the DAG, which will be shown in the Airflow Webserver UI
default_args parameter refers to the configuration options defined above
description parameter provides a description or summary of the DAG's purpose
start_date parameter is the date when the DAG is eligible to run tasks, which you can adjust to the current date
schedule_interval parameter defines the schedule for the DAG. Here, it is set to run daily at 10:00 AM GMT+7 according to the cron expression '0 10 * * *' and the timezone specified in the start_date
catchup parameter is a boolean that indicates whether the DAG should run for the past dates or not (in our case, we set it to False to prevent the DAG from running for the past dates)
After defining the DAG and tasks, we need to specify the dependencies between the tasks. In our case, the fetch_trending_videos_task task should be executed first, followed by the data_processing_task, and finally the load_to_bigquery_task. We can specify the dependencies between the tasks by using the >> operator, which indicates that the task on the left side should be executed before the task on the right side. The code below shows how we can specify the dependencies between the tasks.
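fetch_trending_videos_task >> data_processing_task >> load_to_bigquery_task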
Now that we have created the DAG, it's time to check the Airflow Webserver UI to see if the DAG has been registered successfully. I assume that you have followed the instructions in the earlier section to install and run Airflow using Docker. First, open Docker Desktop and ensure that the containers (except for airflow-init-1) under airflow_youtube_project are running. You can verify this by checking the container status, as shown in the screenshot below.
If the containers are not running, you can start them by navigating to the airflow_youtube_project directory and executing the following command.
$ docker compose up
As seen in the previous screenshot, the port for airflow-webserver-1 is shown as 8080:8080. This means port 8080 on the host machine is mapped to port 8080 in the container. Now open your browser and go to http://localhost:8080 to access the Airflow Webserver UI. Enter your username and password, which by default are both airflow. After logging in, you should be able to see the list of DAGs registered in Airflow. Search for the DAG we created earlier (trending_youtube_dag_v1) and click on it to see the details.
To unpause the DAG, click on the toggle button in the top left corner of the page. The color of the toggle background should change from grey to blue. To check when the next DAG run will be, hover over the light gray box containing Next Run: .... A dark gray box will appear showing details such as Run After, Next Run and Data Interval.
If you wish to visualize the dependencies between the tasks, you can click on the Graph tab. The screenshot below shows the dependencies between the tasks in our DAG.
After the DAG has been executed, we can monitor the status of the DAG run and individual tasks to ensure everything ran as expected. The screenshot below shows the status of our first DAG run and the corresponding tasks. In the Airflow UI, successful tasks are denoted by green boxes, while failed tasks are denoted by red boxes. Additionally, hovering over each task provides additional information such as the execution date and duration, allowing us to analyze the performance of our DAG. If you are interested in seeing the logs of each task, simply click on the respective task box and select the Logs option.
Notice that earlier we only installed the dependencies in the youtube_airflow_env virtual environment on our local machine. You might be curious how the DAG can run successfully within the Docker environment even though we have not installed any of the dependencies inside the Docker containers. This is because the Docker containers (which are created from the docker-compose.yaml file provided by Airflow) come with a set of pre-installed Python libraries by default, which happen to include all of our required dependencies. We can verify this by opening a terminal within the airflow-worker-1 container and checking whether our dependencies are installed using the command below.
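A sketch of the check; the container name is an assumption based on the Compose project name airflow_youtube_project, so confirm the actual name with docker ps first:

$ docker exec -it airflow_youtube_project-airflow-worker-1 bash
# inside the container
$ pip list | grep -iE "google-api|bigquery|dotenv|isodate"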
Most people are not interested in examining the raw data directly within BigQuery. Instead, they prefer a more user-friendly and visually appealing representation of the data. In this section, we will use Looker Studio to create a dashboard to visualize the data that we have loaded into BigQuery earlier.
First navigate to Looker Studio. Our initial step is to establish a new connection to BigQuery within Looker Studio. To accomplish this, click on the Create button on the left panel and select Data Source. Then select BigQuery as the desired data source. If this is your first time connecting to BigQuery from Looker Studio, you may need to click the "Authorize" button. Following that, we'll need to select the appropriate Project, Dataset and Table, then click on the Connect button. Finally, click on the Create Report button to create a new report, which will be used to create our dashboard.
I encourage you to explore the different options available in Looker Studio and use your creativity to design your own dashboard. It might take some time to become familiar with Looker Studio, but if you have used other dashboarding tools, you'll quickly get the hang of it. For this article, I'll share my completed dashboard as an example. Note that I have been retrieving data from the YouTube API on a daily basis from 8th May 2023 to 18th May 2023 (the date of writing this article). The dashboard includes three charts, a table and a date filter, which are shown below.
The dashboard above aims to address the following questions:
Which video categories are the most popular?
What is the average view count for trending videos in each category?
Is publication day related to the likelihood of a video becoming trending?
Which videos appeared the most frequently in the trending list?
You can also access my dashboard directly here. You'll be able to interact with the latest data and experiment with different date filters to observe how the charts and table dynamically respond to your selections.
In this article, we have explored how to use the YouTube Data API v3 to retrieve data about trending videos on YouTube. We have also learned how to use Airflow within a Docker environment to automate our data pipeline. Specifically, we used Airflow to automate the ETL (extract, transform, and load) process from the YouTube Data API into BigQuery. Furthermore, we have seen how Looker Studio can be utilized to visualize the data in BigQuery. Hopefully, this article provides you with valuable insights on automating your data retrieval, processing, loading, and visualization, allowing you to streamline your workflows effectively.
If you wish to check out the code and folder structure for this project, it is available in my GitHub repository. And if you want to see an example of extensive analysis of trending YouTube videos data, you can check out my older project here.
Below are some of the visualizations that I have created in my older project.