Create DataFrame From Python Objects in pyspark
Working in pyspark, we often need to create a DataFrame directly from Python lists and objects. Scenarios include, but are not limited to: fixtures for Spark unit testing, creating DataFrames from data loaded from custom data sources, and converting the results of Python computations (e.g. Pandas, scikit-learn) to Spark DataFrames. SparkSession, as explained in Create Spark DataFrame From Python Objects in pyspark, provides the convenient createDataFrame method for creating Spark DataFrames. The method accepts the following parameters:
- data — RDD of any kind of SQL data representation, or list, or pandas.DataFrame.
- schema — the schema of the DataFrame. Accepts DataType, datatype string, list of strings or None.
- samplingRatio — sampling ratio of rows used when inferring the schema.
- verifySchema — if set to True each row is verified against the schema.
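For reference, here is a minimal sketch of a call that spells out all four parameters, assuming an existing SparkSession bound to the name spark:
rows = [('1990-05-03', 29), ('1994-09-23', 25)]
df = spark.createDataFrame(
    data=rows,              # a list of tuples
    schema=['dob', 'age'],  # here a list of column names; types are inferred
    samplingRatio=None,     # only consulted when inferring the schema from an RDD
    verifySchema=True       # the default; rows are checked against an explicit schema
)
df.show()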
We can specify schema using different approaches:
- When schema is None the schema (column names and column types) is inferred from the data, which should be RDD or list of Row, namedtuple, or dict.
- When schema is a list of column names, the type of each column is inferred from data.
- When schema is a DataType or datatype string, it must match the real data.
Let’s see how the createDataFrame method works in a few scenarios. We will use a Python list as the data parameter; RDDs and Pandas DataFrames are left for later.
Create pyspark DataFrame Without Specifying Schema
When schema is not specified, Spark tries to infer the schema from the actual data, using the provided sampling ratio. Column names are inferred from the data as well.
Passing a list of namedtuple objects as data
First we will create the namedtuple user_row and then we will create a list of user_row objects.
from collections import namedtuple
user_row = namedtuple('user_row', 'dob age is_fan'.split())
data = [
user_row('1990-05-03', 29, True),
user_row('1994-09-23', 25, False)
]
Now we can create the DataFrame and inspect the inferred schema:
user_df = spark.createDataFrame(data)
user_df.printSchema()
root
 |-- dob: string (nullable = true)
 |-- age: long (nullable = true)
 |-- is_fan: boolean (nullable = true)
We can also inspect the data from the DataFrame:
user_df.show()
+----------+---+------+
| dob|age|is_fan|
+----------+---+------+
|1990-05-03| 29| true|
|1994-09-23| 25| false|
+----------+---+------+
The problem with this approach is that we don’t have full control over the inferred schema, and sometimes schema inference can fail altogether.
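A hypothetical illustration of a failing case: when a column contains only None values, Spark cannot determine its type and raises an error:
# The second column holds only None, so no type can be inferred for it
bad_data = [('1990-05-03', None), ('1994-09-23', None)]
# spark.createDataFrame(bad_data)  # raises ValueError: Some of types cannot be determined ...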
Passing a list of dict objects as data
data_list = [
('1990-05-03', 29, True),
('1994-09-23', 25, False)
]
data = [ {'dob': r[0], 'age': r[1], 'is_fan': r[2]} for r in data_list ]
user_df = spark.createDataFrame(data)
A deprecation warning is shown:
C:\apps\spark-2.4.0-bin-hadoop2.7\python\pyspark\sql\session.py:346: UserWarning: inferring schema from dict is deprecated,please use pyspark.sql.Row instead
warnings.warn("inferring schema from dict is deprecated,"
Inspecting the schema:
user_df.printSchema()
root
 |-- age: long (nullable = true)
 |-- dob: string (nullable = true)
 |-- is_fan: boolean (nullable = true)
Inspecting the data:
user_df.show()
+---+----------+------+
|age| dob|is_fan|
+---+----------+------+
| 29|1990-05-03| true|
| 25|1994-09-23| false|
+---+----------+------+
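As the warning suggests, one way to keep this pattern without the deprecated dict inference is to convert each dict to a Row. A short sketch; note that in Spark 2.x a Row built from keyword arguments sorts its fields alphabetically, which matches the column order above:
from pyspark.sql import Row
user_df = spark.createDataFrame([Row(**d) for d in data])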
Passing a list of Row objects as data
There are two ways to construct a Row object:
- Create a Row object directly. With this method, column names are specified as parameter names:
Row(dob='1990-05-03', age=29, is_fan=True)
# Produces: Row(dob='1990-05-03', age=29, is_fan=True)
- Create a Row object using a row factory. With this method we first create a row factory and then create Row objects from it:
# Create row factory user_row
user_row = Row("dob", "age", "is_fan")
user_row('1990-05-03', 29, True)
# Produces: Row(dob='1990-05-03', age=29, is_fan=True)
We are using the second method — row factory.
from pyspark.sql import Row
user_row = Row("dob", "age", "is_fan")
data = [
user_row('1990-05-03', 29, True),
user_row('1994-09-23', 25, False)
]
Now we create the DataFrame and inspect its content:
user_df = spark.createDataFrame(data)
user_df.show()
+----------+---+------+
|       dob|age|is_fan|
+----------+---+------+
|1990-05-03| 29|  true|
|1994-09-23| 25| false|
+----------+---+------+
Create pyspark DataFrame Specifying List of Column Names
When schema is specified as a list of field names, the field types are inferred from the data.
data = [
('1990-05-03', 29, True),
('1994-09-23', 25, False)
]
df = spark.createDataFrame(data, ['dob', 'age', 'is_fan'])
df.show()
+----------+---+------+
|       dob|age|is_fan|
+----------+---+------+
|1990-05-03| 29|  true|
|1994-09-23| 25| false|
+----------+---+------+
This approach also works with the other ways of passing data with named fields, e.g. a list of namedtuples:
from collections import namedtuple
user_row = namedtuple("user_row", ("dob", "age", "is_fan"))
user_row.__new__.__defaults__ = (None, None, None)
data = [
user_row('1990-05-03', 29, True),
user_row('1994-09-23', 25)
]
df = spark.createDataFrame(data, ['dob', 'age', 'is_fan'])
df.show()
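Because the second record omits is_fan, the namedtuple default of None kicks in and the missing value surfaces as null; the output should look roughly like this:
+----------+---+------+
|       dob|age|is_fan|
+----------+---+------+
|1990-05-03| 29|  true|
|1994-09-23| 25|  null|
+----------+---+------+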
Create pyspark DataFrame Specifying Schema as StructType
With this method we first create a schema object of type StructType and pass it as the second argument to the createDataFrame method of SparkSession:
import pyspark.sql.types as st
data = [
('1990-05-03', 29, True),
('1994-09-23', 25, False)
]
user_schema = st.StructType([
st.StructField('dob', st.StringType(), True),
st.StructField('age', st.IntegerType(), True),
st.StructField('is_fan', st.BooleanType(), True)
])
user_df = spark.createDataFrame(data, user_schema)
user_df.show()
+----------+---+------+
| dob|age|is_fan|
+----------+---+------+
|1990-05-03| 29| true|
|1994-09-23| 25| false|
+----------+---+------+
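Because verifySchema defaults to True, rows that don’t match the declared types are rejected when the DataFrame is created. A hypothetical illustration; the exact error text may vary between Spark versions:
# 'twenty nine' cannot be stored in the IntegerType age field
bad_data = [('1990-05-03', 'twenty nine', True)]
# spark.createDataFrame(bad_data, user_schema)  # raises TypeError: field age: IntegerType can not accept ...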
This method is quite verbose.
Create pyspark DataFrame Specifying Schema as datatype String
With this method the schema is specified as a string. The string uses the same format as the one returned by the schema.simpleString() method; the struct keyword and the angle brackets can be omitted. The following schema strings are interpreted identically:
"struct<dob:string, age:int, is_fan: boolean>"
"dob:string, age:int, is_fan: boolean"
data = [
('1990-05-03', 29, True),
('1994-09-23', 25, False)
]
user_schema = "dob:string, age:int, is_fan: boolean"
user_df = spark.createDataFrame(data, user_schema)
user_df.show()
+----------+---+------+
| dob|age|is_fan|
+----------+---+------+
|1990-05-03| 29| true|
|1994-09-23| 25| false|
+----------+---+------+
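If you already have a DataFrame, you can get this string form of its schema by calling simpleString() on it, e.g. with the user_df created above:
user_df.schema.simpleString()
# Produces: 'struct<dob:string,age:int,is_fan:boolean>'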
Appendix A. Use JSON Schema
Create Schema from JSON String
First we need to parse the JSON string into a Python dictionary, and then we can use StructType.fromJson to create the StructType object.
import json
import pyspark.sql.types as st
schema_json_str = """
{
"type": "struct",
"fields": [
{
"name": "dob",
"type": "string",
"nullable": true,
"metadata": {}
},
{
"name": "age",
"type": "integer",
"nullable": true,
"metadata": {}
},
{
"name": "is_fan",
"type": "boolean",
"nullable": true,
"metadata": {}
}
]
}
"""
# Parse JSON string into python dictionary
schema_dict = json.loads(schema_json_str)
# Create StructType from python dictionary
schema = st.StructType.fromJson(schema_dict)
data = [
('1990-05-03', 29, True),
('1994-09-23', 25, False)
]
user_df = spark.createDataFrame(data, schema)
user_df.show()
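Conversely, the JSON form of an existing DataFrame’s schema can be obtained with the schema’s json() and jsonValue() methods, which makes it easy to save a schema and recreate it later. A short sketch, reusing the user_df from above:
# JSON string representation of the schema
schema_json_str = user_df.schema.json()
# ... or directly as a python dictionary
schema_dict = user_df.schema.jsonValue()
# Round-trip back to a StructType
same_schema = st.StructType.fromJson(schema_dict)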
This article was originally posted at Create Spark DataFrame From Python Objects in pyspark.