您好,欢迎来到年旅网。
搜索
您的当前位置:首页 > sparksqljson合并json数据

sparksqljson合并json数据

来源:年旅网
sparksqljson合并json数据

java

1 public class Demo {

2 private static SparkConf conf = new SparkConf().setAppName(\"demo\").setMaster(\"local\"); 3 private static JavaSparkContext jsc = new JavaSparkContext(conf); 4 private static SparkSession session = new SparkSession(jsc.sc()); 5

6 public static void main(String[] args) { 7

8 // 加载students.json name,score

9 Dataset score = session.read().json(\"./src/main/java/cn/tele/spark_sql/json/students.json\");10

11 score.createOrReplaceTempView(\"scoreView\");12

13 // name,score

14 JavaRDD scoreRDD = session.sql(\"select * from scoreView where score > 80\").javaRDD();15

16 // 创建信息json name,age

17 JavaRDD infoRDD = jsc.parallelize(Arrays.asList(\"{\\\"name\\\":\\\"Leo\\\18 \"{\\\"name\\\":\\\"Marry\\\19

20 Dataset info = session.read().json(infoRDD);21 info.createOrReplaceTempView(\"infoView\");22

23 // 拼接sql

24 List scoreList = scoreRDD.collect();25

26 String sql = \"select * from infoView where name in (\";27 for (int i = 0; i < scoreList.size(); i++) {

28 sql += \"'\" + scoreList.get(i).getAs(\"name\") + \"'\";29 if (i < scoreList.size() - 1) {30 sql += \31 }32 }33

34 sql += \")\";35

36 // 查询 分数>80的学⽣的name,age37

38 // 转换

39 JavaPairRDD tempRDD = session.sql(sql).javaRDD()40 .mapToPair(new PairFunction() {41

42 private static final long serialVersionUID = 1L;43

44 @Override

45 public Tuple2 call(Row t) throws Exception {

46 return new Tuple2(t.getAs(\"name\"), Integer.valueOf(t.getAs(\"age\").toString()));47 }48 });49

50 JavaPairRDD scoreRDD2 = scoreRDD.mapToPair(new PairFunction() {51

52 private static final long serialVersionUID = 1L;53

@Override

55 public Tuple2 call(Row t) throws Exception {

56 return new Tuple2(t.getAs(\"name\"), Integer.valueOf(t.getAs(\"score\").toString()));57 }58 });59

60 // join

61 JavaPairRDD> resultRDD = tempRDD.join(scoreRDD2);62

63 // 遍历

resultRDD.foreach(new VoidFunction>>() {65

66 private static final long serialVersionUID = 1L;67

68 @Override

69 public void call(Tuple2> t) throws Exception {70 System.out.println(\"name:\" + t._1 + \71 }72 });73

74 // 保存为json格式

75 StructType schema = DataTypes

76 .createStructType(Arrays.asList(DataTypes.createStructField(\"name\false),77 DataTypes.createStructField(\"age\false),78 DataTypes.createStructField(\"score\false)));79

80 JavaRDD rowRDD = resultRDD.map(new Function>, Row>() {81

82 private static final long serialVersionUID = 1L;83

84 @Override

85 public Row call(Tuple2> v1) throws Exception {

86 return RowFactory.create(v1._1, Integer.valueOf(v1._2._1), Integer.valueOf(v1._2._2));87 }88 });

90 Dataset resultDS = session.createDataFrame(rowRDD, schema);91

92 resultDS.write().format(\"json\").mode(SaveMode.Append).save(\"./src/main/java/cn/tele/spark_sql/json/result\");93

94 session.stop();95 jsc.close();96 }97 }scala

/**
 * Spark SQL demo that merges two JSON sources.
 *
 * Reads student scores (name, score) from a JSON file, keeps the students whose
 * score is above 80, looks up their age in an in-memory JSON data set (name, age),
 * joins both by name, prints the result and appends it as JSON to the result directory.
 *
 * NOTE(review): this listing was reconstructed from a garbled copy. The age values
 * and the schema field types were lost in extraction and are best-effort
 * reconstructions - confirm against the original article.
 */
object Demo {

  /**
   * Runs the demo job.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("demo").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    try {
      // Load the score information: name, score
      val scoreDF = sqlContext.read.json("./src/main/scala/cn/tele/spark_sql/json/students.json")
      scoreDF.createOrReplaceTempView("scoreView")

      // Students with score > 80
      val passingDF = sqlContext.sql("select * from scoreView where score > 80")
      val names = passingDF.rdd.collect().map(_.getAs[String]("name"))

      // Create the student information: name, age
      // NOTE(review): age values are placeholders, see the object comment.
      val infoRDD = sc.parallelize(Array(
        "{\"name\":\"Leo\",\"age\":18}",
        "{\"name\":\"Marry\",\"age\":19}",
        "{\"name\":\"Jack\",\"age\":20}"))
      val infoDS = sqlContext.read.json(infoRDD)
      infoDS.createOrReplaceTempView("infoView")

      // Column.isin replaces the hand-built "name in ('a','b')" SQL string: it needs
      // no quoting of the names and an empty name list simply matches nothing
      // instead of producing the invalid "in ()".
      val tempRDD = infoDS.filter(infoDS("name").isin(names: _*)).rdd.map { row =>
        (row.getAs[String]("name"), row.getAs[Long]("age").toInt)
      }

      // Only the filtered scores are joined; the inner join would drop the rest anyway.
      val tempRDD2 = passingDF.rdd.map { row =>
        (row.getAs[String]("name"), row.getAs[Long]("score").toInt)
      }

      // Join by name: (name, (age, score))
      val resultRDD = tempRDD.join(tempRDD2)

      // Print every joined record
      resultRDD.foreach { t =>
        println("name:" + t._1 + "age:" + t._2._1 + "score:" + t._2._2)
      }

      val rowRDD = resultRDD.map(t => Row(t._1, t._2._1, t._2._2))

      // Save as a JSON file
      val schema = DataTypes.createStructType(Array(
        StructField("name", DataTypes.StringType, false),
        StructField("age", DataTypes.IntegerType, false),
        StructField("score", DataTypes.IntegerType, false)))

      val df = sqlContext.createDataFrame(rowRDD, schema)

      df.write.format("json").mode(SaveMode.Append).save("./src/main/scala/cn/tele/spark_sql/json/result")
    } finally {
      // Release Spark resources even when the job fails.
      sc.stop()
    }
  }
}

因篇幅问题不能全部显示,请点此查看更多更全内容

Copyright © 2019- oldu.cn 版权所有 浙ICP备2024123271号-1

违法及侵权请联系:TEL:199 1889 7713 E-MAIL:2724546146@qq.com

本站由北京市万商天勤律师事务所王兴未律师提供法律服务