博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spark中RDD转换成DataFrame的两种方式(分别用Java和Scala实现)
阅读量:5337 次
发布时间:2019-06-15

本文共 5695 字,大约阅读时间需要 18 分钟。

一:准备数据源

    在项目下新建一个student.txt文件,里面的内容为:

1,zhangsan,20  2,lisi,21  3,wanger,19  4,fangliu,18

 二:实现

     Java版:

    1.首先新建一个student的Bean对象,实现序列化和toString()方法,具体代码如下:

import java.io.Serializable;    @SuppressWarnings("serial")  public class Student implements Serializable {        String sid;      String sname;      int sage;      public String getSid() {          return sid;      }      public void setSid(String sid) {          this.sid = sid;      }      public String getSname() {          return sname;      }      public void setSname(String sname) {          this.sname = sname;      }      public int getSage() {          return sage;      }      public void setSage(int sage) {          this.sage = sage;      }      @Override      public String toString() {          return "Student [sid=" + sid + ", sname=" + sname + ", sage=" + sage + "]";      }    }

2.转换,具体代码如下

import java.util.ArrayList;    import org.apache.spark.SparkConf;  import org.apache.spark.api.java.JavaRDD;  import org.apache.spark.sql.Dataset;  import org.apache.spark.sql.Row;  import org.apache.spark.sql.RowFactory;  import org.apache.spark.sql.SaveMode;  import org.apache.spark.sql.SparkSession;  import org.apache.spark.sql.types.DataTypes;  import org.apache.spark.sql.types.StructField;  import org.apache.spark.sql.types.StructType;    public class TxtToParquetDemo {        public static void main(String[] args) {                    SparkConf conf = new SparkConf().setAppName("TxtToParquet").setMaster("local");          SparkSession spark = SparkSession.builder().config(conf).getOrCreate();            reflectTransform(spark);//Java反射          dynamicTransform(spark);//动态转换      }            /**      * 通过Java反射转换      * @param spark      */      private static void reflectTransform(SparkSession spark)      {          JavaRDD
source = spark.read().textFile("stuInfo.txt").javaRDD(); JavaRDD
rowRDD = source.map(line -> { String parts[] = line.split(","); Student stu = new Student(); stu.setSid(parts[0]); stu.setSname(parts[1]); stu.setSage(Integer.valueOf(parts[2])); return stu; }); Dataset
df = spark.createDataFrame(rowRDD, Student.class); df.select("sid", "sname", "sage"). coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res"); } /** * 动态转换 * @param spark */ private static void dynamicTransform(SparkSession spark) { JavaRDD
source = spark.read().textFile("stuInfo.txt").javaRDD(); JavaRDD
rowRDD = source.map( line -> { String[] parts = line.split(","); String sid = parts[0]; String sname = parts[1]; int sage = Integer.parseInt(parts[2]); return RowFactory.create( sid, sname, sage ); }); ArrayList
fields = new ArrayList
(); StructField field = null; field = DataTypes.createStructField("sid", DataTypes.StringType, true); fields.add(field); field = DataTypes.createStructField("sname", DataTypes.StringType, true); fields.add(field); field = DataTypes.createStructField("sage", DataTypes.IntegerType, true); fields.add(field); StructType schema = DataTypes.createStructType(fields); Dataset
df = spark.createDataFrame(rowRDD, schema); df.coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res1"); } }

 scala版本:

import org.apache.spark.sql.SparkSession  import org.apache.spark.sql.types.StringType  import org.apache.spark.sql.types.StructField  import org.apache.spark.sql.types.StructType  import org.apache.spark.sql.Row  import org.apache.spark.sql.types.IntegerType    object RDD2Dataset {        case class Student(id:Int,name:String,age:Int)    def main(args:Array[String])    {            val spark=SparkSession.builder().master("local").appName("RDD2Dataset").getOrCreate()      import spark.implicits._      reflectCreate(spark)      dynamicCreate(spark)    }       /**       * 通过Java反射转换       * @param spark       */    private def reflectCreate(spark:SparkSession):Unit={      import spark.implicits._      val stuRDD=spark.sparkContext.textFile("student2.txt")      //toDF()为隐式转换      val stuDf=stuRDD.map(_.split(",")).map(parts⇒Student(parts(0).trim.toInt,parts(1),parts(2).trim.toInt)).toDF()      //stuDf.select("id","name","age").write.text("result") //对写入文件指定列名      stuDf.printSchema()      stuDf.createOrReplaceTempView("student")      val nameDf=spark.sql("select name from student where age<20")      //nameDf.write.text("result") //将查询结果写入一个文件      nameDf.show()    }        /**       * 动态转换       * @param spark       */    private def dynamicCreate(spark:SparkSession):Unit={      val stuRDD=spark.sparkContext.textFile("student.txt")      import spark.implicits._      val schemaString="id,name,age"      val fields=schemaString.split(",").map(fieldName => StructField(fieldName, StringType, nullable = true))      val schema=StructType(fields)      val rowRDD=stuRDD.map(_.split(",")).map(parts⇒Row(parts(0),parts(1),parts(2)))      val stuDf=spark.createDataFrame(rowRDD, schema)          stuDf.printSchema()      val tmpView=stuDf.createOrReplaceTempView("student")      val nameDf=spark.sql("select name from student where age<20")      //nameDf.write.text("result") //将查询结果写入一个文件      nameDf.show()    }  }

  注:1.上面代码全都已经测试通过,测试的环境为spark2.1.0,jdk1.8。

         2.此代码不适用于spark2.0以前的版本。

转载于:https://www.cnblogs.com/itboys/p/9172780.html

你可能感兴趣的文章
oracle job
查看>>
Redis常用命令
查看>>
XML学习笔记(二)-- DTD格式规范
查看>>
IOS开发学习笔记026-UITableView的使用
查看>>
[转载]电脑小绝技
查看>>
windos系统定时执行批处理文件(bat文件)
查看>>
thinkphp如何实现伪静态
查看>>
BZOJ 2243: [SDOI2011]染色( 树链剖分 )
查看>>
BZOJ 1925: [Sdoi2010]地精部落( dp )
查看>>
c++中的string常用函数用法总结!
查看>>
界面交互之支付宝生活圈pk微信朋友圈
查看>>
[DLX精确覆盖+打表] hdu 2518 Dominoes
查看>>
SuperMap iServerJava 6R扩展领域开发及压力测试---判断点在那个面内(1)
查看>>
Week03-面向对象入门
查看>>
一个控制台程序,模拟机器人对话
查看>>
web.xml 中加载顺序
查看>>
pycharm激活地址
查看>>
hdu 1207 四柱汉诺塔
查看>>
Vue 2.x + Webpack 3.x + Nodejs 多页面项目框架(上篇——纯前端多页面)
查看>>
display:none与visible:hidden的区别
查看>>