java
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import scala.Tuple2;

public class Demo {

    private static SparkConf conf = new SparkConf().setAppName("demo").setMaster("local");
    private static JavaSparkContext jsc = new JavaSparkContext(conf);
    private static SparkSession session = new SparkSession(jsc.sc());

    public static void main(String[] args) {

        // load students.json (fields: name, score)
        // the read path is not shown in the source; it is assumed to mirror the write path below
        Dataset<Row> score = session.read().json("./src/main/java/cn/tele/spark_sql/json/students.json");
        score.createOrReplaceTempView("scoreView");

        // rows with score > 80 (name, score)
        JavaRDD<Row> scoreRDD = session.sql("select * from scoreView where score > 80").javaRDD();

        // create the student info JSON (fields: name, age)
        // the age values are assumptions; the original values are not recoverable from the source
        JavaRDD<String> infoRDD = jsc.parallelize(Arrays.asList(
                "{\"name\":\"Leo\",\"age\":18}",
                "{\"name\":\"Marry\",\"age\":17}",
                "{\"name\":\"Jack\",\"age\":19}"));

        Dataset<Row> info = session.read().json(infoRDD);
        info.createOrReplaceTempView("infoView");

        // build the SQL string
        List<Row> scoreList = scoreRDD.collect();

        String sql = "select * from infoView where name in (";
        for (int i = 0; i < scoreList.size(); i++) {
            sql += "'" + scoreList.get(i).getAs("name") + "'";
            if (i < scoreList.size() - 1) {
                sql += ",";
            }
        }

        sql += ")";

        // query name and age of the students whose score > 80,
        // then convert to a pair RDD keyed by name
        JavaPairRDD<String, Integer> infoPairRDD = session.sql(sql).javaRDD()
                .mapToPair(new PairFunction<Row, String, Integer>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, Integer> call(Row row) throws Exception {
                        return new Tuple2<>(row.getAs("name"), ((Long) row.getAs("age")).intValue());
                    }
                });

        JavaPairRDD<String, Integer> scorePairRDD = scoreRDD
                .mapToPair(new PairFunction<Row, String, Integer>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, Integer> call(Row row) throws Exception {
                        return new Tuple2<>(row.getAs("name"), ((Long) row.getAs("score")).intValue());
                    }
                });

        // join: (name, (age, score))
        JavaPairRDD<String, Tuple2<Integer, Integer>> resultRDD = infoPairRDD.join(scorePairRDD);

        // print the joined records
        resultRDD.foreach(new VoidFunction<Tuple2<String, Tuple2<Integer, Integer>>>() {

            private static final long serialVersionUID = 1L;

            @Override
            public void call(Tuple2<String, Tuple2<Integer, Integer>> t) throws Exception {
                System.out.println("name:" + t._1 + " age:" + t._2._1 + " score:" + t._2._2);
            }
        });

        // save as JSON
        StructType schema = DataTypes
                .createStructType(Arrays.asList(DataTypes.createStructField("name", DataTypes.StringType, false),
                        DataTypes.createStructField("age", DataTypes.IntegerType, false),
                        DataTypes.createStructField("score", DataTypes.IntegerType, false)));

        JavaRDD<Row> rowRDD = resultRDD.map(new Function<Tuple2<String, Tuple2<Integer, Integer>>, Row>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Row call(Tuple2<String, Tuple2<Integer, Integer>> v1) throws Exception {
                return RowFactory.create(v1._1, v1._2._1, v1._2._2);
            }
        });

        Dataset<Row> resultDS = session.createDataFrame(rowRDD, schema);

        resultDS.write().format("json").mode(SaveMode.Append).save("./src/main/java/cn/tele/spark_sql/json/result");

        session.stop();
        jsc.close();
    }
}

scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext, SaveMode}
import org.apache.spark.sql.types.{DataTypes, StructField}

object Demo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("demo").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // load the score info
    val scoreDF = sqlContext.read.json("./src/main/scala/cn/tele/spark_sql/json/students.json")

    scoreDF.createOrReplaceTempView("scoreView")

    val arr = sqlContext.sql("select * from scoreView where score > 80").rdd.collect()

    // create the student info (the age values are assumptions; not recoverable from the source)
    val infoRDD = sc.parallelize(Array(
      "{\"name\":\"Leo\",\"age\":18}",
      "{\"name\":\"Marry\",\"age\":17}",
      "{\"name\":\"Jack\",\"age\":19}"))

    val infoDS = sqlContext.read.json(infoRDD)

    infoDS.createOrReplaceTempView("infoView")

    // build the SQL string
    var sql = "select * from infoView where name in ("
    for (i <- 0 to arr.length - 1) {
      sql += "'" + arr(i).getAs[String]("name") + "'"
      if (i < arr.length - 1) {
        sql += ","
      }
    }

    sql += ")"

    val tempRDD = sqlContext.sql(sql).rdd.map(row => {
      (row.getAs[String]("name"), row.getAs[Long]("age").toInt)
    })

    val tempRDD2 = scoreDF.rdd.map(row => {
      (row.getAs[String]("name"), row.getAs[Long]("score").toInt)
    })

    // join: (name, (age, score))
    val resultRDD = tempRDD.join(tempRDD2)

    // print the joined records
    resultRDD.foreach(t => {
      println("name:" + t._1 + " age:" + t._2._1 + " score:" + t._2._2)
    })

    val rowRDD = resultRDD.map(t => Row(t._1, t._2._1, t._2._2))

    // save as a JSON file
    val schema = DataTypes.createStructType(Array(
      StructField("name", DataTypes.StringType, false),
      StructField("age", DataTypes.IntegerType, false),
      StructField("score", DataTypes.IntegerType, false)))

    val df = sqlContext.createDataFrame(rowRDD, schema)

    df.write.format("json").mode(SaveMode.Append).save("./src/main/scala/cn/tele/spark_sql/json/result")

    sc.stop()
  }
}