본 페이지는 Apache Spark 관련 Tip에 대해 정리한 내용을 제공합니다.
1 - DataFrame에서 Column 유무 체크 방법
DafaFrame에서 Column 유무 검사
// Example 1
val schema = StructType( Array(
StructField("language", StringType,true),
StructField("users", IntegerType,true)
))
val rowData= Seq(Row("Java", 20000),
Row("Python", 100000),
Row("Scala", 3000))
var df = spark.createDataFrame(rowData,schema)
df.printSchema()
// root
// |-- language: string (nullable = true)
// |-- users: integer (nullable = true)
Spark DataFrame의 columns attribute는 모든 column 이름을 Array[String] 타입으로 반환합니다. Column의 유무를 check하기 위해 contains() 함수를 이용해서 column의 유무를 확인 할 수 있습니다.
val columnName = "users"
if(df.columns.contains(columnName))
println("column exists")
else
println("column not exists")
중첩된 구조의 DataFrame에서 Column 유무 검사
// Example2
[{
"manufacturer": "hyundai",
"model":[
{
"name" : "2022 Genesis GV70 ",
"release-date" : "2021. 12. 8",
"price" : "6,000"
},
{
"name" : "2022 Genesis G90 ",
"release-date" : "2021. 12. 19",
"price" : "9,300"
}
]
},
{
"manufacturer": "bmw",
"model":[
{
"name" : "2023 i7 ",
"release-date" : "-",
"price" : "-"
},
{
"name" : "2023 x5 ",
"release-date" : "2022",
"price" : "12,420"
}
]
},
{
"manufacturer": "benz",
"model":[
{
"name" : "2022 AMG GT 4-door coupe ",
"release-date" : "2022. 3. 25",
"price" : "16,960"
},
{
"name" : "2022 C Class ",
"release-date" : "2022. 4. 01",
"price" : "6,800"
}
]
}]
위와 같은 중첩된 구조의 Json 형식의 데이터를 DataFrame으로 구성했을 경우 Column의 유무를 검색하는 방법을 알아봅니다.
val jsonStr =
"""
|[{
| "manufacturer": "hyundai",
| "model":[
| {
| "name" : "2022 Genesis GV70 ",
| "release-date" : "2021. 12. 8",
| "price" : "6,000"
| },
| {
| "name" : "2022 Genesis G90 ",
| "release-date" : "2021. 12. 19",
| "price" : "9,300"
| }
| ]
|},
|{
| "manufacturer": "bmw",
| "model":[
| {
| "name" : "2023 i7 ",
| "release-date" : "-",
| "price" : "-"
| },
| {
| "name" : "2023 x5 ",
| "release-date" : "2022",
| "price" : "12,420"
| }
|
| ]
|},
|{
| "manufacturer": "benz",
| "model":[
| {
| "name" : "2022 AMG GT 4-door coupe ",
| "release-date" : "2022. 3. 25",
| "price" : "16,960"
| },
| {
| "name" : "2022 C Class ",
| "release-date" : "2022. 4. 01",
| "price" : "6,800"
| }
|
| ]
|}]
|""".stripMargin
val df = sqlContext.read.json(Seq(jsonStr).toDS())
df.printSchema()
df.show(false)
root
|-- manufacturer: string (nullable = true)
|-- model: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- name: string (nullable = true)
| | |-- price: string (nullable = true)
| | |-- release-date: string (nullable = true)
+------------+---------------------------------------------------------------------------------------+
|manufacturer|model |
+------------+---------------------------------------------------------------------------------------+
|hyundai |[[2022 Genesis GV70 , 6,000, 2021. 12. 8], [2022 Genesis G90 , 9,300, 2021. 12. 19]] |
|bmw |[[2023 i7 , -, -], [2023 x5 , 12,420, 2022]] |
|benz |[[2022 AMG GT 4-door coupe , 16,960, 2022. 3. 25], [2022 C Class , 6,800, 2022. 4. 01]]|
+------------+---------------------------------------------------------------------------------------+
df.columns은 중첩된 구조에서 내부 컬럼을 반환하지 않습니다. 그렇기 때문에 DataFrame이 중첩된 구조를 가질 경우 df.schema.simpleString()을 이용하여 중첩 구조의 스키마 컬럼을 검사할 수 있습니다.
df.columns
// Array[String] = Array(manufacturer, model)
df.schema.simpleString()
// String = struct<manufacturer:string,model:array<struct<name:string,price:string,release-date:string>>>
df.schema.simpleString.contains("release-date:")
// true
DataFrame에서 Field 유무 검사
동일한 데이터 유형을 가진 열이 있는지 확인하려면 스파크 스키마 함수 df.schema.fieldNames 또는 df.schema.contain() 함수를 사용합니다.
import org.apache.spark.sql.types.{StringType, StructField}
df.schema.fieldNames.contains("manufacturer")
df.schema.contains(StructField("manufacturer",StringType,true))
2 - Dataframe 에서 그룹 별 첫 번째 행 추출하는 방법
SparkSQL의 window 함수를 이용해 DataFrame 에서 각 그룹별 최상위 행을 추출하는 방법을 설명합니다.
테스트 셋 준비
val testData = Seq(
("Nike Dunk Low Retro Black", "Nike", "179,000"),
("Nike x Off-White Air Force 1 Mid SP White", "Nike", "234,000"),
("Nike Air Force 1 '07 Low White", "Nike", "127,000"),
("Nike Dunk Low Retro Valerian Blue", "Nike", "166,000"),
("Adidas Yeezy Foam RNNR Onyx", "Adidas", "190,000"),
("Adidas Yeezy Foam RNNR Stone Sage", "Adidas", "197,000"),
("Adidas Yeezy Boost 350 V2 Onyx", "Adidas", "322,000"),
("Adidas Yeezy Slide Pure - Re-Release Ver.", "Adidas", "161,000"),
("Puma x Maison Kitsune Roma White", "Puma", "200,000"),
("Puma x Rick and Morty MB.01 Jasmine Green Energy Rose", "Puma", "319,000"),
("Puma x Ader Error Vaderon White", "Puma", "277,000")
)
import spark.implicits._
val df = testData.toDF("model", "brand", "price")
df.show()
// +--------------------+------+-------+
// | model| brand| price|
// +--------------------+------+-------+
// |Nike Dunk Low Ret...| Nike|179,000|
// |Nike x Off-White ...| Nike|234,000|
// |Nike Air Force 1 ...| Nike|127,000|
// |Nike Dunk Low Ret...| Nike|166,000|
// |Adidas Yeezy Foam...|Adidas|190,000|
// |Adidas Yeezy Foam...|Adidas|197,000|
// |Adidas Yeezy Boos...|Adidas|322,000|
// |Adidas Yeezy Slid...|Adidas|161,000|
// |Puma x Maison Kit...| Puma|200,000|
// |Puma x Rick and M...| Puma|319,000|
// |Puma x Ader Error...| Puma|277,000|
// +--------------------+------+-------+
위 테스트 셋은 스포츠웨어 브랜드의 모델별 가격을 정리한 데이터 셋으로 각 브랜드 별 최고 모델에 대해 추출해 봅니다.
‘brand’ 그룹 별 ‘price’ 가 가장 높은 행 추출
import org.apache.spark.sql.expressions.Window
val window = Window.partitionBy("brand").orderBy(col("price").desc)
val extractedDF = df.withColumn("row", row_number.over(window)).
where(col("row").equalTo(1)).drop("row")
extractedDF.show()
// +--------------------+------+-------+
// | model| brand| price|
// +--------------------+------+-------+
// |Nike x Off-White ...| Nike|234,000|
// |Puma x Rick and M...| Puma|319,000|
// |Adidas Yeezy Boos...|Adidas|322,000|
// +--------------------+------+-------+
먼저 ‘brand’ 그룹을 지정하고 ‘price’ 컬럼에 대해 오름차순으로 정리하는 windowSpec을 생성합니다. DataFrame에서 window 함수인 row_number를 이용하여 새로운 컬럼 ‘row’를 추가합니다. where 조건을 통해 ‘row’ == 1 조건의 행만 추출한 후, “row” 컬럼을 삭제합니다.