Chapter 2: A Tour of PySpark Data Types#
Basic Data Types in PySpark#
Understanding the basic data types in PySpark is crucial for defining DataFrame schemas and performing efficient data processing. Below is a detailed overview of each type, with descriptions, Python equivalents, and examples:
Numerical Types#
ByteType
Used to store byte-length integers ranging from -128 to 127. Ideal for storing small data efficiently.
- Python Equivalent: int (-128 to 127)
Python Example
[2]:
byte_example = 127 # Maximum value for a signed byte
ShortType
Represents a short integer, storing values between -32768 and 32767. More efficient than IntegerType for data with smaller numerical ranges.
- Python Equivalent: int (-32768 to 32767)
Python Example
[3]:
short_example = 32767 # Maximum value for a signed short
IntegerType
Used to store integer values. Ideal for counts, indices, and any discrete quantity.
- Python Equivalent: int (-2147483648 to 2147483647)
Python Example
[4]:
integer_example = 123
LongType
Suitable for storing large integer values, often used for identifiers or large counts.
- Python Equivalent: int (-9223372036854775808 to 9223372036854775807)
Python Example
[5]:
long_integer_example = 1234567890123456789
DoubleType
Provides double-precision floating-point numbers for accurate and precise calculations.
- Python Equivalent: float (double precision)
Python Example
[6]:
double_example = 12345.6789
FloatType
Used for floating-point numbers where less precision is acceptable in exchange for performance.
- Python Equivalent: float (single precision)
Python Example
[7]:
float_example = 123.456
DecimalType
Allows fixed precision and scale; used in scenarios requiring exact decimal representation, such as financial computations.
- Python Equivalent: decimal.Decimal
Python Example
[8]:
from decimal import Decimal
decimal_example = Decimal('12345.6789')
StringType#
Used for text data; supports Unicode and can store any string value.
- Python Equivalent: str
Python Example
[9]:
string_example = "Hello, World!"
BinaryType#
Used for raw byte data, such as file contents or images, stored as binary streams.
- Python Equivalent: bytes
Python Example
[10]:
binary_example = b'Hello, binary world!'
BooleanType#
Represents Boolean values, used extensively in conditional operations and filters.
- Python Equivalent: bool
Python Example
[11]:
boolean_example = True
Datetime Types#
DateType
Used for dates without time, suitable for storing calendar dates such as birthdays or specific days.
- Python Equivalent: datetime.date
Python Example
[12]:
from datetime import date
date_example = date(2020, 1, 1)
TimestampType
Stores both date and time, essential for recording precise moments in time, such as log timestamps.
- Python Equivalent: datetime.datetime
Python Example
[13]:
from datetime import datetime
timestamp_example = datetime(2020, 1, 1, 12, 0)
Creating a DataFrame from Python Objects in PySpark#
Here’s how to define a schema and create a DataFrame in PySpark using the Python objects corresponding to each basic data type:
[14]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, LongType, DoubleType, FloatType
from pyspark.sql.types import DecimalType, StringType, BinaryType, BooleanType, DateType, TimestampType
from decimal import Decimal
from datetime import date, datetime
# Create (or reuse) a SparkSession so this example can run on its own
spark = SparkSession.builder.getOrCreate()
# Define the schema of the DataFrame
schema = StructType([
StructField("integer_field", IntegerType(), nullable=False),
StructField("long_field", LongType(), nullable=False),
StructField("double_field", DoubleType(), nullable=False),
StructField("float_field", FloatType(), nullable=False),
StructField("decimal_field", DecimalType(10, 2), nullable=False),
StructField("string_field", StringType(), nullable=False),
StructField("binary_field", BinaryType(), nullable=False),
StructField("boolean_field", BooleanType(), nullable=False),
StructField("date_field", DateType(), nullable=False),
StructField("timestamp_field", TimestampType(), nullable=False)
])
# Sample data using the Python objects corresponding to each PySpark type
data = [
(123, 1234567890123456789, 12345.6789, 123.456, Decimal('12345.67'), "Hello, World!",
b'Hello, binary world!', True, date(2020, 1, 1), datetime(2020, 1, 1, 12, 0)),
(456, 9223372036854775807, 98765.4321, 987.654, Decimal('98765.43'), "Goodbye, World!",
b'Goodbye, binary world!', False, date(2025, 12, 31), datetime(2025, 12, 31, 23, 59)),
(-1, -1234567890123456789, -12345.6789, -123.456, Decimal('-12345.67'), "Negative Values",
b'Negative binary!', False, date(1990, 1, 1), datetime(1990, 1, 1, 0, 0)),
(0, 0, 0.0, 0.0, Decimal('0.00'), "", b'', True, date(2000, 1, 1), datetime(2000, 1, 1, 0, 0))
]
# Create DataFrame
df = spark.createDataFrame(data, schema=schema)
# Show the DataFrame
df.show()
+-------------+--------------------+------------+-----------+-------------+---------------+--------------------+-------------+----------+-------------------+
|integer_field| long_field|double_field|float_field|decimal_field| string_field| binary_field|boolean_field|date_field| timestamp_field|
+-------------+--------------------+------------+-----------+-------------+---------------+--------------------+-------------+----------+-------------------+
| 123| 1234567890123456789| 12345.6789| 123.456| 12345.67| Hello, World!|[48 65 6C 6C 6F 2...| true|2020-01-01|2020-01-01 12:00:00|
| 456| 9223372036854775807| 98765.4321| 987.654| 98765.43|Goodbye, World!|[47 6F 6F 64 62 7...| false|2025-12-31|2025-12-31 23:59:00|
| -1|-1234567890123456789| -12345.6789| -123.456| -12345.67|Negative Values|[4E 65 67 61 74 6...| false|1990-01-01|1990-01-01 00:00:00|
| 0| 0| 0.0| 0.0| 0.00| | []| true|2000-01-01|2000-01-01 00:00:00|
+-------------+--------------------+------------+-----------+-------------+---------------+--------------------+-------------+----------+-------------------+
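To double-check that each Python object landed in the intended Spark type, you can also inspect the schema of the DataFrame created above:
# Verify that the column types match the schema defined above
df.printSchema()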
Precision for Doubles, Floats, and Decimals#
Understanding precision in numerical data types is critical for data integrity, especially in fields requiring high accuracy such as financial analysis, scientific computation, and engineering. PySpark offers different data types to cater to these needs.
FloatType
FloatType in PySpark represents a single-precision 32-bit IEEE 754 floating-point number. It is less precise but requires less storage and can be processed faster than DoubleType, which makes it suitable for applications where large volumes of numerical data must be processed quickly and extreme precision is not critical.
Usage Scenario: Useful in machine learning algorithms for faster computation when processing large datasets.
DoubleType
DoubleType corresponds to a double-precision 64-bit IEEE 754 floating-point number. It provides a good balance between precision and performance and is suitable for most numerical calculations where precision is important.
Usage Scenario: Ideal for financial calculations where precision is more crucial than computational speed.
DecimalType
DecimalType is used for high-precision, fixed-scale decimal numbers. The precision and scale are defined by the user, which makes it invaluable for applications such as financial reporting, where exact decimal representation avoids rounding errors.
Usage Scenario: Critical in accounting applications where calculations need to be accurate to the cent.
Example: Calculating Financial Statistics#
This example demonstrates how to use different numerical data types in PySpark for financial calculations, such as aggregating revenues and calculating averages with appropriate precision.
[15]:
from decimal import Decimal
from pyspark.sql.types import StructType, StructField, FloatType, DoubleType, DecimalType
from pyspark.sql.functions import sum, avg, col, format_number
# Define the schema of the DataFrame
schema = StructType([
StructField("revenue_float", FloatType(), nullable=False),
StructField("revenue_double", DoubleType(), nullable=False),
StructField("revenue_decimal", DecimalType(10, 2), nullable=False)
])
# Sample data
data = [
(12345.67, 12345.6789, Decimal('12345.68')),
(98765.43, 98765.4321, Decimal('98765.43')),
(54321.10, 54321.0987, Decimal('54321.10'))
]
# Create DataFrame
df = spark.createDataFrame(data, schema=schema)
# Calculations
result = df.select(
format_number(sum(col("revenue_float")), 2).alias("Total_Revenue_Float"),
format_number(avg(col("revenue_float")), 2).alias("Average_Revenue_Float"),
format_number(sum(col("revenue_double")), 2).alias("Total_Revenue_Double"),
format_number(avg(col("revenue_double")), 2).alias("Average_Revenue_Double"),
format_number(sum(col("revenue_decimal")), 2).alias("Total_Revenue_Decimal"),
format_number(avg(col("revenue_decimal")), 2).alias("Average_Revenue_Decimal")
)
result.show()
+-------------------+---------------------+--------------------+----------------------+---------------------+-----------------------+
|Total_Revenue_Float|Average_Revenue_Float|Total_Revenue_Double|Average_Revenue_Double|Total_Revenue_Decimal|Average_Revenue_Decimal|
+-------------------+---------------------+--------------------+----------------------+---------------------+-----------------------+
| 165,432.20| 55,144.07| 165,432.21| 55,144.07| 165,432.21| 55,144.07|
+-------------------+---------------------+--------------------+----------------------+---------------------+-----------------------+
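Note that the float total above (165,432.20) differs in the last cent from the double and decimal totals (165,432.21): single-precision floats carry only about seven significant decimal digits, so rounding error creeps into the aggregation. Below is a minimal sketch of the same effect on a single value, reusing the types above (the column names are illustrative):
from decimal import Decimal
from pyspark.sql.types import StructType, StructField, FloatType, DoubleType, DecimalType
# Store the same value three ways; only the float copy loses digits
precision_schema = StructType([
    StructField("as_float", FloatType()),
    StructField("as_double", DoubleType()),
    StructField("as_decimal", DecimalType(10, 4))
])
precision_df = spark.createDataFrame(
    [(98765.4321, 98765.4321, Decimal("98765.4321"))], schema=precision_schema
)
# The float column can no longer represent 98765.4321 exactly (it prints as
# roughly 98765.43), while the double and decimal columns keep all four
# fractional digits.
precision_df.show(truncate=False)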
Complex Data Types in PySpark#
Complex data types in PySpark facilitate the handling of nested and structured data, which is essential for working with modern data formats like JSON, XML, and others commonly found in big data ecosystems. This section explores the primary complex data types available in PySpark: ArrayType
, StructType
, MapType
, and their use cases.
ArrayType
Allows storage of multiple values of the same type in a single column. Ideal for data that naturally forms a list, such as tags, categories, or historical data points.
- Python Equivalent: list
Python Example
[16]:
array_example = ['apple', 'banana', 'cherry']
Usage Scenario: Managing lists of items associated with each record, such as multiple phone numbers or email addresses for a single contact.
StructType
Enables nesting of DataFrame columns, allowing complex and hierarchical data structures within a single DataFrame cell. Each field in a StructType can itself be a complex type. It is similar to a row in a DataFrame and is typically used to encapsulate records with a structured schema.
- Python Equivalent: pyspark.sql.Row
Python Example
Python Example
[17]:
from pyspark.sql import Row
struct_example = Row(name="John Doe", age=30, address=Row(street="123 Elm St", city="Somewhere"))
Usage Scenario: Often used to represent a JSON object, enabling the manipulation of each JSON field as if it were a column in the DataFrame.
MapType
Represents key-value pairs in a DataFrame column. The key and value types can be any data type, but they are fixed when the column is defined (all keys share one type and all values share another). Useful for dynamically structured data.
- Python Equivalent: dict
Python Example
[18]:
map_example = {'food': 'pizza', 'color': 'blue', 'car': 'Tesla'}
Usage Scenario: Storing and processing collections of key-value pairs within a single DataFrame column, such as product attributes where keys are attribute names and values are attribute values.
Example: Handling Complex Nested Data#
To illustrate the use of these complex data types, let’s consider a practical example involving nested data structures such as a customer record that includes multiple addresses and preferences in various categories.
[19]:
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, MapType
from pyspark.sql import Row
# Define the schema of the DataFrame
schema = StructType([
StructField("name", StringType(), nullable=False),
StructField("addresses", ArrayType(
StructType([
StructField("street", StringType(), nullable=False),
StructField("city", StringType(), nullable=False),
StructField("zip", StringType(), nullable=False)
])
), nullable=True),
StructField("preferences", MapType(StringType(), StringType()), nullable=True)
])
# Sample data using Row objects for StructType
data = [
Row(name="John Doe",
addresses=[Row(street="123 Elm St", city="Somewhere", zip="12345"),
Row(street="456 Oak St", city="Anywhere", zip="67890")],
preferences={"food": "pizza", "color": "blue", "car": "Tesla"}),
Row(name="Jane Smith",
addresses=[Row(street="789 Pine St", city="Everywhere", zip="10112")],
preferences={"food": "sushi", "color": "green", "car": "Honda"})
]
# Create DataFrame
df = spark.createDataFrame(data, schema=schema)
# Show the DataFrame
df.show(truncate=False)
+----------+---------------------------------------------------------------+---------------------------------------------+
|name |addresses |preferences |
+----------+---------------------------------------------------------------+---------------------------------------------+
|John Doe |[{123 Elm St, Somewhere, 12345}, {456 Oak St, Anywhere, 67890}]|{color -> blue, car -> Tesla, food -> pizza} |
|Jane Smith|[{789 Pine St, Everywhere, 10112}] |{color -> green, car -> Honda, food -> sushi}|
+----------+---------------------------------------------------------------+---------------------------------------------+
In this example:
- ArrayType is used to store multiple addresses for each customer.
- StructType is nested within ArrayType to represent each address as a structured record.
- MapType stores preferences as key-value pairs, allowing for dynamic data storage.
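Once the data is loaded, ordinary column expressions reach inside these types. Below is a minimal sketch using the df created above: explode() flattens the addresses array into one row per address, dot notation drills into the struct fields, and bracket access reads a key out of the preferences map (the aliases are illustrative):
from pyspark.sql.functions import col, explode
# One row per (customer, address) pair, plus one value pulled from the map
df.select(
    col("name"),
    explode(col("addresses")).alias("address"),
    col("preferences")["food"].alias("favorite_food")
).select(
    "name",
    col("address.city").alias("city"),
    "favorite_food"
).show(truncate=False)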
Casting Columns in PySpark#
Casting columns is a fundamental operation in data processing where the data type of a column in a DataFrame is converted from one type to another. PySpark provides straightforward methods that enable you to align input data types with the requirements of data processing operations or applications.
How to Cast Columns#
To cast columns in PySpark, call the cast() or astype() method on a column. Here's a complete example demonstrating basic casting operations:
[20]:
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, FloatType
# Define the schema of the DataFrame
schema = StructType([
StructField("float_column", FloatType(), nullable=True),
StructField("string_column", StringType(), nullable=True)
])
# Sample data
data = [
(123.456, "123"),
(789.012, "456"),
(None, "789")
]
# Create DataFrame
df = spark.createDataFrame(data, schema=schema)
# Display original DataFrame
print("Original DataFrame:")
df.show()
# Example of casting a float column to string
df = df.withColumn('string_from_float', col('float_column').cast('string'))
# Example of casting a string column to integer
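# Note: astype() is an alias for cast(), so .astype('integer') would be equivalent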
df = df.withColumn('integer_from_string', col('string_column').cast('integer'))
# Display DataFrame after casting
print("DataFrame after Casting:")
df.show()
Original DataFrame:
+------------+-------------+
|float_column|string_column|
+------------+-------------+
| 123.456| 123|
| 789.012| 456|
| NULL| 789|
+------------+-------------+
DataFrame after Casting:
+------------+-------------+-----------------+-------------------+
|float_column|string_column|string_from_float|integer_from_string|
+------------+-------------+-----------------+-------------------+
| 123.456| 123| 123.456| 123|
| 789.012| 456| 789.012| 456|
| NULL| 789| NULL| 789|
+------------+-------------+-----------------+-------------------+
Cast with Caution: Potential Data Loss#
When casting columns, it’s important to be aware of how PySpark handles incompatible or invalid casting operations:
Silent Conversion to Null
- If ANSI mode is disabled, PySpark does not raise an error when a value cannot be converted to the target type during casting. Instead, it silently overflows or converts the value to null. This can lead to data loss that is not immediately obvious.
- If ANSI mode is enabled, PySpark raises an error in that case. If silently producing null is acceptable, use try_cast instead.
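For comparison, here is a minimal sketch of the ANSI-enabled path, written with the SQL try_cast expression so that invalid values still become null instead of raising an error (the sample values mirror the example below):
from pyspark.sql.functions import expr
# Enable ANSI mode: a plain cast('integer') on 'abc' would now raise an error
spark.conf.set("spark.sql.ansi.enabled", True)
ansi_df = spark.createDataFrame([("123",), ("abc",), (None,)], ["original_column"])
# try_cast keeps the permissive behavior: invalid values become NULL rather than failing the job
ansi_df.withColumn("casted_column", expr("try_cast(original_column AS INT)")).show()
spark.conf.unset("spark.sql.ansi.enabled")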
Example: Checking for Data Loss
It's good practice to check for unexpected nulls that result from casting operations, especially when converting strings to numeric types, where formatting issues may cause failures.
[21]:
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType
# Disable ANSI mode
spark.conf.set("spark.sql.ansi.enabled", False)
# Define the schema of the DataFrame
schema = StructType([
StructField("original_column", StringType(), nullable=True)
])
# Sample data
data = [
("123",), # Valid integer in string form
("abc",), # Invalid, will result in null when cast to integer
(None,) # Original null, remains null
]
# Create DataFrame
df = spark.createDataFrame(data, schema=schema)
# Display original DataFrame
print("Original DataFrame:")
df.show()
# Add a new column with casted values
df = df.withColumn('casted_column', col('original_column').cast('integer'))
# Show rows where casting resulted in nulls but the original column had data
print("DataFrame Showing Potential Data Loss:")
df.filter(col('original_column').isNotNull() & col('casted_column').isNull()).show()
spark.conf.unset("spark.sql.ansi.enabled")
Original DataFrame:
+---------------+
|original_column|
+---------------+
| 123|
| abc|
| NULL|
+---------------+
DataFrame Showing Potential Data Loss:
+---------------+-------------+
|original_column|casted_column|
+---------------+-------------+
| abc| NULL|
+---------------+-------------+
Best Practices for Casting#
Validate Data First
Before casting columns, especially when converting strings to numerical types, validate and clean your data to ensure it conforms to expected formats.
Example: Checking if numeric strings are properly formatted before casting to integers
[22]:
from pyspark.sql.functions import col, regexp_extract
# Sample DataFrame with a string column
df = spark.createDataFrame([("100",), ("20x",), ("300",)], ["data"])
# Checking and filtering rows where data can be safely cast to an integer
valid_df = df.filter(regexp_extract(col("data"), '^[0-9]+$', 0) != "")
valid_df.show()
+----+
|data|
+----+
| 100|
| 300|
+----+
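With the invalid rows filtered out, the subsequent cast is safe. A small follow-on sketch using the valid_df from above:
from pyspark.sql.functions import col
# Every remaining value converts cleanly, so the cast introduces no nulls
valid_df.withColumn("data_int", col("data").cast("integer")).show()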
Use Explicit Schemas
When reading data, provide explicit schemas to avoid incorrect data type inference, which minimizes the need for casting later.
Example: Specifying a schema when reading data to ensure correct data types are applied from the start
[23]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
# Define a schema
schema = StructType([
StructField("Employee ID", IntegerType(), True),
StructField("Role", StringType(), True),
StructField("Location", StringType(), True)
])
# Read data with an explicit schema
df = spark.read.csv("../data/employees.csv", schema=schema)
df.printSchema()
root
|-- Employee ID: integer (nullable = true)
|-- Role: string (nullable = true)
|-- Location: string (nullable = true)
Semi-Structured Data Processing in PySpark#
This section explores PySpark’s capabilities for handling semi-structured data formats, particularly focusing on JSON and XML, and addresses approaches for managing VARIANT-like data, which is commonly used in some SQL databases.
JSON Processing#
JSON is a widely used format in web services and data interchange. PySpark simplifies parsing JSON data into structured DataFrames, making it easy to manipulate and analyze.
Key Functions
- from_json(): Converts JSON strings into a DataFrame column with a structured data type.
- to_json(): Converts DataFrame columns into JSON strings (see the serialization sketch after the parsing example below).
Example: Parsing JSON Strings
[24]:
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
json_schema = StructType([
StructField("name", StringType()),
StructField("age", IntegerType())
])
df = spark.createDataFrame([("{\"name\":\"John\", \"age\":30}",), ("{\"name\":\"Jane\", \"age\":25}",)], ["json_str"])
df.select(from_json(col("json_str"), json_schema).alias("parsed_json")).show()
+-----------+
|parsed_json|
+-----------+
| {John, 30}|
| {Jane, 25}|
+-----------+
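The reverse direction works the same way. A minimal sketch using to_json() to serialize the parsed struct back into JSON strings, reusing the df and json_schema defined above:
from pyspark.sql.functions import from_json, to_json, col
# Parse each JSON string into a struct, then serialize it back to a JSON string
df.select(
    to_json(from_json(col("json_str"), json_schema)).alias("roundtripped_json")
).show(truncate=False)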
Example: Reading and Processing JSON Data
[25]:
df = spark.read.json('../data/books.json')
df.select("author", "title", "genre").show()
+-------------+--------------------+---------+
| author| title| genre|
+-------------+--------------------+---------+
|George Orwell| 1984|Dystopian|
| Jane Austen| Pride and Prejudice| Romance|
| Mark Twain|Adventures of Huc...| Fiction|
+-------------+--------------------+---------+
XML Processing#
Note: This section applies to Spark 4.0
XML is another common format for semi-structured data, used extensively in various enterprise applications.
Example: Reading and Processing XML Data
[26]:
df = spark.read \
.format('xml') \
.option('rowTag', 'book') \
.load('../data/books.xml')
df.select("author", "title", "genre").show()
+-------------+--------------------+---------+
| author| title| genre|
+-------------+--------------------+---------+
|George Orwell| 1984|Dystopian|
| Jane Austen| Pride and Prejudice| Romance|
| Mark Twain|Adventures of Huc...| Fiction|
+-------------+--------------------+---------+
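As with other file formats, the XML reader also accepts an explicit schema, which skips schema inference and follows the best practice described in the casting section. A minimal sketch, assuming the same books.xml file and treating the three fields as strings:
from pyspark.sql.types import StructType, StructField, StringType
# Explicit schema for the fields used below
book_schema = StructType([
    StructField("author", StringType(), True),
    StructField("title", StringType(), True),
    StructField("genre", StringType(), True)
])
xml_df = spark.read \
    .format('xml') \
    .option('rowTag', 'book') \
    .schema(book_schema) \
    .load('../data/books.xml')
xml_df.printSchema()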
Handling VARIANT Data Types in PySpark#
Note: This section applies to Spark 4.0
With the introduction of the VARIANT data type, handling semi-structured data has become more streamlined. VARIANT types are designed to store data that doesn’t conform to a fixed schema, such as JSON or XML, directly within a DataFrame column.
Features of VARIANT in PySpark
- Flexibility: VARIANT columns can store data structures like JSON or XML without predefined schema constraints, offering high flexibility for data ingestion and manipulation.
- Integration: Provides better integration with systems that use semi-structured data, allowing for more direct data exchange and querying.
Considerations When Using VARIANT
- Performance: While VARIANT provides flexibility, its dynamic nature can affect performance, so test and optimize operations that involve VARIANT columns.
- Compatibility: Ensure that all parts of your data pipeline support VARIANT if you're leveraging this data type, especially when exporting data to external systems.
Practical Example: Handling JSON Data with VARIANT
This example demonstrates how VARIANT can be used to handle JSON data effectively in PySpark:
[27]:
from datetime import date, datetime
from decimal import Decimal
from pyspark.sql.functions import try_parse_json, try_variant_get, col
from pyspark.sql.types import StringType
# Sample JSON data
data = [
'1234567890123456789',
'12345.6789',
'"Hello, World!"',
'true',
'{"id": 1, "attributes": {"key1": "value1", "key2": "value2"}}',
'{"id": 2, "attributes": {"key1": "value3", "key2": "value4"}}',
]
# Load data into DataFrame with VARIANT
df = spark.createDataFrame(data, StringType()).select(try_parse_json(col("value")).alias("variant_data"))
df.printSchema()
df.show(truncate=False)
# Accessing elements inside the VARIANT
df.select(
try_variant_get(col("variant_data"), "$", "long").alias("long_value"),
try_variant_get(col("variant_data"), "$.id", "int").alias("id"),
try_variant_get(col("variant_data"), "$.attributes.key1", "string").alias("key1"),
try_variant_get(col("variant_data"), "$.attributes.key2", "string").alias("key2"),
).show()
# Collect data and convert to Python objects
[row["variant_data"].toPython() for row in df.collect()]
root
|-- variant_data: variant (nullable = true)
+-------------------------------------------------------+
|variant_data |
+-------------------------------------------------------+
|1234567890123456789 |
|12345.6789 |
|"Hello, World!" |
|true |
|{"attributes":{"key1":"value1","key2":"value2"},"id":1}|
|{"attributes":{"key1":"value3","key2":"value4"},"id":2}|
+-------------------------------------------------------+
+-------------------+----+------+------+
| long_value| id| key1| key2|
+-------------------+----+------+------+
|1234567890123456789|NULL| NULL| NULL|
| 12345|NULL| NULL| NULL|
| NULL|NULL| NULL| NULL|
| 1|NULL| NULL| NULL|
| NULL| 1|value1|value2|
| NULL| 2|value3|value4|
+-------------------+----+------+------+
[27]:
[1234567890123456789,
Decimal('12345.6789'),
'Hello, World!',
True,
{'attributes': {'key1': 'value1', 'key2': 'value2'}, 'id': 1},
{'attributes': {'key1': 'value3', 'key2': 'value4'}, 'id': 2}]