Spark SQL
处理结构化信息的方法
SchemaRDD(dataFrame)
是在内部,SchemaRDD 可以利用结构信息更加高效地存储数据。更多操作,SQL。
使用Row
row[i]、row.column_name
注册临时表
registerTempTable()
缓存
hiveCtx.cacheTable("tableName")
dataSet
dataFrame是特殊的dataSet,即dataFrame是类型为Row的dataSet。
case class是一种类似于javaBean的东西,dataSet可用它来定义行,这样行的每一列都是定义类型的,这样就可以在编译时进行检查,防止在int那一列插入了String。
DataSet的API总是强类型的
三者转换
DataFrame/Dataset转RDD:
val rdd1=testDF.rdd
val rdd2=testDS.rdd
RDD转DataFrame:
import spark.implicits._
//一般用元组把一行的数据写在一起,然后在toDF中指定字段名
val testDF = rdd.map {line=>
(line._1,line._2)
}.toDF("col1","col2")
或者
//为每一行初始化了Row
employee = parts.map(lambda p: Row(name=p[0], salary=int(p[1])))
employee_temp = spark.createDataFrame(employee)
RDD转Dataset:
import spark.implicits._
//定义case class
case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型
val testDS = rdd.map {line=>
Coltest(line._1,line._2)
}.toDS
可以注意到,定义每一行的类型(case class)时,已经给出了字段名和类型,后面只要往case class里面添加值即可
Dataset转DataFrame:
import spark.implicits._
val testDF = testDS.toDF
DataFrame转Dataset:
import spark.implicits._
case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型
val testDS = testDF.as[Coltest]
这种方法就是在给出每一列的类型后,使用as方法,转成Dataset,这在数据类型是DataFrame又需要针对各个字段处理时极为方便
特别注意:
在使用一些特殊的操作时,一定要加上 import spark.implicits._
不然toDF、toDS无法使用