Create DataFrame From Python Objects in pyspark

Ivan Georgiev
Sep 13, 2019


Working in pyspark we often need to create a DataFrame directly from python lists and objects. Scenarios include, but are not limited to: fixtures for Spark unit testing, creating a DataFrame from data loaded from custom data sources, and converting results from python computations (e.g. Pandas, scikit-learn, etc.) to a Spark DataFrame. SparkSession provides the convenient method createDataFrame for creating Spark DataFrames. The method accepts the following parameters (a short example call follows the list):

  • data — RDD of any kind of SQL data representation, or list, or pandas.DataFrame.
  • schema — the schema of the DataFrame. Accepts DataType, datatype string, list of strings or None.
  • samplingRatio — sampling ratio of rows used when inferring the schema.
  • verifySchema — if set to True each row is verified against the schema.
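
To make the signature concrete, here is a minimal sketch of a call that passes every parameter explicitly (the schema string format is covered further down; note that samplingRatio only takes effect when data is an RDD, for a plain python list all rows are used for inference):

user_df = spark.createDataFrame(
    data=[('1990-05-03', 29, True)],
    schema='dob:string, age:int, is_fan:boolean',
    samplingRatio=None,  # only used when inferring a schema from an RDD
    verifySchema=True    # check each row against the schema
)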

We can specify schema using different approaches:

  • When schema is None the schema (column names and column types) is inferred from the data, which should be RDD or list of Row, namedtuple, or dict.
  • When schema is a list of column names, the type of each column is inferred from data.
  • When schema is a DataType or datatype string, it must match the real data.

Let’s see how the createDataFrame method works in some scenarios. We will use a python list as the data parameter; RDDs and Pandas DataFrames are left for later.

Create pyspark DataFrame Without Specifying Schema

When schema is not specified, Spark tries to infer the schema from the actual data, using the provided sampling ratio. Column names are inferred from the data as well.

Passing a list of namedtuple objects as data

First we will create a namedtuple user_row and then we will create a list of user_row objects.

from collections import namedtuple

user_row = namedtuple('user_row', 'dob age is_fan'.split())
data = [
    user_row('1990-05-03', 29, True),
    user_row('1994-09-23', 25, False)
]

Now we can create the data frame and inspect the inferred schema:

user_df = spark.createDataFrame(data)
user_df.printSchema()
root
 |-- dob: string (nullable = true)
 |-- age: long (nullable = true)
 |-- is_fan: boolean (nullable = true)

We can also inspect the data from the DataFrame:

user_df.show()
+----------+---+------+
|       dob|age|is_fan|
+----------+---+------+
|1990-05-03| 29|  true|
|1994-09-23| 25| false|
+----------+---+------+

The problem with this approach is that we don’t have full control over the inferred schema. Schema inference can also fail, as shown below.
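
For example (a hypothetical snippet, the variable name rows_with_gaps is made up for illustration), a column that contains only None values cannot be inferred and createDataFrame raises an error unless an explicit schema is supplied:

rows_with_gaps = [('1990-05-03', None), ('1994-09-23', None)]

# spark.createDataFrame(rows_with_gaps)  # fails: the type of the second column cannot be determined
spark.createDataFrame(rows_with_gaps, 'dob:string, age:int').show()  # an explicit schema works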

Passing a list of dict objects as data

data_list = [
    ('1990-05-03', 29, True),
    ('1994-09-23', 25, False)
]

data = [{'dob': r[0], 'age': r[1], 'is_fan': r[2]} for r in data_list]

user_df = spark.createDataFrame(data)

A deprecation message is shown.

C:\apps\spark-2.4.0-bin-hadoop2.7\python\pyspark\sql\session.py:346: UserWarning: inferring schema from dict is deprecated,please use pyspark.sql.Row instead
warnings.warn("inferring schema from dict is deprecated,"
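
As the warning suggests, the dicts can be converted to Row objects before calling createDataFrame. A minimal sketch (note that in Spark 2.x a Row built from keyword arguments sorts its fields alphabetically, which matches the column order shown below):

from pyspark.sql import Row

row_data = [Row(**d) for d in data]
user_df = spark.createDataFrame(row_data)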

Inspecting the schema:

user_df.printSchema()
root
 |-- age: long (nullable = true)
 |-- dob: string (nullable = true)
 |-- is_fan: boolean (nullable = true)

Inspecting the data:

user_df.show()
+---+----------+------+
|age|       dob|is_fan|
+---+----------+------+
| 29|1990-05-03|  true|
| 25|1994-09-23| false|
+---+----------+------+

Passing a list of Row objects as data

There are two ways to construct a Row object:

  • Create a Row object directly. With this method column names are specified as keyword argument names:

Row(dob='1990-05-03', age=29, is_fan=True)
# Produces: Row(age=29, dob='1990-05-03', is_fan=True)
# (in Spark 2.x fields passed as keyword arguments are sorted alphabetically)

  • Create Row objects using a row factory. With this method we first create a row factory and then create Row objects from it:

# Create row factory user_row
user_row = Row("dob", "age", "is_fan")

user_row('1990-05-03', 29, True)
# Produces: Row(dob='1990-05-03', age=29, is_fan=True)

We will use the second method, the row factory.

from pyspark.sql import Row

user_row = Row("dob", "age", "is_fan")

data = [
    user_row('1990-05-03', 29, True),
    user_row('1994-09-23', 25, False)
]

Now we create the DataFrame and inspect its content:

user_df = spark.createDataFrame(data)
user_df.show()
+----------+---+------+
|       dob|age|is_fan|
+----------+---+------+
|1990-05-03| 29|  true|
|1994-09-23| 25| false|
+----------+---+------+

Create pyspark DataFrame Specifying List of Column Names

When schema is specified as list of field names, the field types are inferred from data.

data = [
    ('1990-05-03', 29, True),
    ('1994-09-23', 25, False)
]
df = spark.createDataFrame(data, ['dob', 'age', 'is_fan'])
df.show()
+----------+---+------+
|       dob|age|is_fan|
+----------+---+------+
|1990-05-03| 29|  true|
|1994-09-23| 25| false|
+----------+---+------+

This still works with the other ways of passing data with named fields, e.g. a list of namedtuples:

from collections import namedtuple

user_row = namedtuple("user_row", ("dob", "age", "is_fan"))
# Default missing fields to None, so rows may omit trailing values
user_row.__new__.__defaults__ = (None, None, None)

data = [
    user_row('1990-05-03', 29, True),
    user_row('1994-09-23', 25)  # is_fan defaults to None
]

df = spark.createDataFrame(data, ['dob', 'age', 'is_fan'])
df.show()

Create pyspark DataFrame Specifying Schema as StructType

With this method we first need to create a schema object of type StructType and pass it as the second argument to the createDataFrame method of SparkSession.

import pyspark.sql.types as st

data = [
    ('1990-05-03', 29, True),
    ('1994-09-23', 25, False)
]

user_schema = st.StructType([
    st.StructField('dob', st.StringType(), True),
    st.StructField('age', st.IntegerType(), True),
    st.StructField('is_fan', st.BooleanType(), True)
])

user_df = spark.createDataFrame(data, user_schema)
user_df.show()
+----------+---+------+
|       dob|age|is_fan|
+----------+---+------+
|1990-05-03| 29|  true|
|1994-09-23| 25| false|
+----------+---+------+

This method is quite verbose.
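
A somewhat more compact alternative (a sketch, assuming the same columns; StructType.add also accepts the type name as a string):

import pyspark.sql.types as st

user_schema = (st.StructType()
    .add('dob', 'string')
    .add('age', 'integer')
    .add('is_fan', 'boolean'))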

Create pyspark DataFrame Specifying Schema as datatype String

With this method the schema is specified as a string. The string uses the same format as the one returned by the schema.simpleString() method. The struct keyword and angle brackets can be omitted.

Following schema strings are interpreted equally:

  • "struct<dob:string, age:int, is_fan: boolean>"
  • "dob:string, age:int, is_fan: boolean"

data = [
    ('1990-05-03', 29, True),
    ('1994-09-23', 25, False)
]

user_schema = "dob:string, age:int, is_fan: boolean"

user_df = spark.createDataFrame(data, user_schema)
user_df.show()
+----------+---+------+
|       dob|age|is_fan|
+----------+---+------+
|1990-05-03| 29|  true|
|1994-09-23| 25| false|
+----------+---+------+
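
When in doubt about the exact string to write, the schema of an existing DataFrame can be printed with simpleString and used as a template:

user_df.schema.simpleString()
# 'struct<dob:string,age:int,is_fan:boolean>'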

Appendix A. Use JSON Schema

Create Schema from JSON String

First we need to parse the JSON string into a python dictionary and then we can use StructType.fromJson to create a StructType object.

import json
import pyspark.sql.types as st

schema_json_str = """
{
    "type": "struct",
    "fields": [
        {
            "name": "dob",
            "type": "string",
            "nullable": true,
            "metadata": {}
        },
        {
            "name": "age",
            "type": "integer",
            "nullable": true,
            "metadata": {}
        },
        {
            "name": "is_fan",
            "type": "boolean",
            "nullable": true,
            "metadata": {}
        }
    ]
}
"""

# Parse JSON string into python dictionary
schema_dict = json.loads(schema_json_str)

# Create StructType from python dictionary
schema = st.StructType.fromJson(schema_dict)

data = [
    ('1990-05-03', 29, True),
    ('1994-09-23', 25, False)
]

user_df = spark.createDataFrame(data, schema)
user_df.show()
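
Going in the opposite direction is also possible: the schema of an existing DataFrame can be serialized to JSON, for example to keep it alongside the data:

schema_json = user_df.schema.json()       # compact JSON string
schema_dict = user_df.schema.jsonValue()  # python dictionary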

This article was originally posted at Create Spark DataFrame From Python Objects in pyspark.
