Filtering data is one of the most common operations you’ll perform when working with PySpark DataFrames. Whether you’re analyzing large datasets, preparing data for machine learning models, or performing transformations, you often need to isolate specific subsets of data based on certain conditions. PySpark provides several methods for filtering DataFrames, and this article will explore the most widely used approaches.
Methods to Filter Data in PySpark
There are two ways to filter data in PySpark:
1) Using the filter() or where() method
2) Using SQL Queries
Let’s go through each method in detail.
Method 1: Using filter() or where(): The filter() method in PySpark is equivalent to the SQL WHERE clause. It’s the most direct way to filter a DataFrame based on one or more conditions.
Syntax:

DataFrame.filter(condition)
DataFrame.where(condition)   # where() is an alias for filter()

condition: A condition or multiple conditions used to filter the rows of the DataFrame.
Example 1: Filter Rows Based on a Single Condition.
# Importing necessary modules
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("Filter Data Example").getOrCreate()

# Sample DataFrame
data = [("Alice", 25), ("Bob", 30), ("Catherine", 29)]
columns = ["Name", "Age"]
df = spark.createDataFrame(data, columns)

# Filter DataFrame where Age is greater than 25
df_filtered = df.filter(df["Age"] > 25)

# Show the result
df_filtered.show()
Output:
+----------+---+
|      Name|Age|
+----------+---+
|       Bob| 30|
| Catherine| 29|
+----------+---+
In this example, the filter condition df["Age"] > 25 is used to return only the rows where the Age column has a value greater than 25.
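Because where() is simply an alias for filter(), the same filter can be written with either method. Here is a minimal sketch reusing the df defined above:

# where() is interchangeable with filter()
df_filtered_where = df.where(df["Age"] > 25)

# Produces the same rows as df_filtered
df_filtered_where.show()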
Example 2: Filtering with Multiple Conditions. You can combine multiple conditions to filter rows based on more complex criteria using logical operators like:
& (and): Both conditions must be true.
| (or): At least one condition must be true.
~ (not): Negates a condition.
# Filter DataFrame where Age is greater
# than 25 and Name starts with 'C'
df_filtered_and = df.filter((df["Age"] > 25) & (df["Name"].startswith("C")))

# Show the result
df_filtered_and.show()
Output:
+----------+---+
|      Name|Age|
+----------+---+
| Catherine| 29|
+----------+---+
Here, the condition df["Age"] > 25 ensures that the Age column is greater than 25, and df["Name"].startswith("C") ensures that the Name column starts with the letter "C". Both conditions must be true for a row to be included in the result.
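The same filter can also be expressed with the col() function from pyspark.sql.functions instead of the df["..."] syntax. A minimal sketch, assuming the same df as above:

from pyspark.sql.functions import col

# Equivalent filter written with col() instead of df["..."]
df_filtered_and = df.filter((col("Age") > 25) & (col("Name").startswith("C")))
df_filtered_and.show()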
Example 3: Filtering with Multiple Conditions (OR).
# Filter DataFrame where Age is less
# than 30 or Name is 'Bob'
df_filtered_or = df.filter((df["Age"] < 30) | (df["Name"] == "Bob"))

# Show the result
df_filtered_or.show()
Output:
+----------+---+
|      Name|Age|
+----------+---+
|     Alice| 25|
|       Bob| 30|
| Catherine| 29|
+----------+---+
In this case, the filter includes rows where either the Age is less than 30 or the Name is equal to "Bob".
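When an OR condition only checks a single column against a list of values, the isin() method offers a more compact alternative. A minimal sketch, assuming the same df:

# Keep rows whose Name is one of the listed values
df_filtered_isin = df.filter(df["Name"].isin("Alice", "Bob"))
df_filtered_isin.show()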
Example 4: Filtering with NOT Condition.
# Filter DataFrame where Age is not equal to 30
df_filtered_not = df.filter(~(df["Age"] == 30))

# Show the result
df_filtered_not.show()
Output:
+----------+---+
|      Name|Age|
+----------+---+
|     Alice| 25|
| Catherine| 29|
+----------+---+
The ~ operator is used to negate the condition, filtering out rows where Age equals 30.
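For a simple inequality like this one, the same result can also be expressed directly with the != operator, as in this minimal sketch:

# Equivalent filter using the != operator
df_filtered_ne = df.filter(df["Age"] != 30)
df_filtered_ne.show()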
Example 5: Filter Rows Based on a Substring Match.
from pyspark.sql.functions import col

# Filter rows where the Name contains the letter 'A'
df_filtered_contains = df.filter(col("Name").like("%A%"))

# Show the result
df_filtered_contains.show()
Output:
+-----+---+
| Name|Age|
+-----+---+
|Alice| 25|
+-----+---+
In this case, we use the like() method with the SQL wildcard pattern "%A%" to filter rows where the Name column contains the letter "A". Note that like() is case-sensitive, which is why "Catherine" (which contains only a lowercase "a") is not included in the result.
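If a case-insensitive match is wanted, one option is to lower-case the column with lower() before applying the pattern. A minimal sketch, assuming the same df:

from pyspark.sql.functions import col, lower

# Case-insensitive match: lower-case the Name column before applying the pattern
df_filtered_ci = df.filter(lower(col("Name")).like("%a%"))

# Now matches both Alice and Catherine
df_filtered_ci.show()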
Method 2: Using SQL Queries. PySpark also lets you filter data by registering a DataFrame as a temporary view and querying it with standard SQL.
Example: Filtering Data with SQL Queries
# Register the DataFrame as a temporary SQL view
df.createOrReplaceTempView("people")

# SQL query to filter where Age is less than 30
df_sql_filtered = spark.sql("SELECT * FROM people WHERE Age < 30")

# Show the result
df_sql_filtered.show()
Output:
+----------+---+
|      Name|Age|
+----------+---+
|     Alice| 25|
| Catherine| 29|
+----------+---+
This method lets you write familiar SQL queries to filter the data. createOrReplaceTempView() registers the DataFrame as a temporary view, and spark.sql() executes a SQL query to filter the rows where Age is less than 30.
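The SQL approach supports the same kinds of conditions shown earlier with filter(). A minimal sketch combining AND and LIKE, assuming the people view created above:

# Filter where Age is greater than 25 and Name starts with 'C'
df_sql_and = spark.sql("SELECT * FROM people WHERE Age > 25 AND Name LIKE 'C%'")
df_sql_and.show()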
Conclusion: Filtering data in PySpark DataFrames is essential for narrowing down data and performing targeted analysis. PySpark offers a variety of filtering methods to suit different use cases. By mastering these methods, you can efficiently extract meaningful subsets of data for further analysis or processing in PySpark.