import java.util.UUID

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object Test_DF_DS_RDD_Speed {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().appName("SpeedTest").master("local").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    // 400,001 (UUID, Int) pairs; cache so every timed run reads the same materialized data
    val firstRdd: RDD[(String, Int)] = spark.sparkContext.parallelize(0 to 400000).map(num => {
      (UUID.randomUUID().toString, num)
    })
    firstRdd.cache()
    // RDD: map + collect
    val beginTimeRdd: Long = System.currentTimeMillis()
    firstRdd.map(tp => tp._1 + "-" + tp._2).collect()
    val endTimeRdd: Long = System.currentTimeMillis()
    import spark.implicits._

    // DataFrame: map + collect
    val beginTimeDF: Long = System.currentTimeMillis()
    firstRdd.toDF().map(row => row.get(0) + "-" + row.get(1)).collect()
    val endTimeDF: Long = System.currentTimeMillis()
    // Dataset: map + collect
    val beginTimeDS: Long = System.currentTimeMillis()
    firstRdd.toDS().map(tp => tp._1 + "-" + tp._2).collect()
    val endTimeDS: Long = System.currentTimeMillis()
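    // Not part of the original listing: print the measured durations so the three runs can be compared.
    println(s"RDD       map+collect: ${endTimeRdd - beginTimeRdd} ms")
    println(s"DataFrame map+collect: ${endTimeDF - beginTimeDF} ms")
    println(s"Dataset   map+collect: ${endTimeDS - beginTimeDS} ms")
  }
}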
import java.text.SimpleDateFormat
import java.util.Locale

import com.google.gson.{JsonObject, JsonParser}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, ListBuffer}

object SparkParseFlowJsonTest2 {
  val spark: SparkSession = SparkSession.builder()
    .appName("sql")
    .master("local[3]")
    .getOrCreate()
  import spark.implicits._
  // output column name -> target Spark SQL type (e.g. string, int, Map), filled in by readConfig()
  val mapType: mutable.Map[String, String] = mutable.Map()
  def main(args: Array[String]): Unit = {
    val CONFIG_MAP = readConfig()
    val dataset = spark.read.textFile("json file path") // one JSON object per line
    val CONFIG_LIST: List[String] = CONFIG_MAP.keySet.toList
    // print(CONFIG_MAP)
    // Turn every JSON line into an array of values, ordered by CONFIG_LIST
    val dataset2 = dataset.map(re => {
      val mapJson: Map[String, String] = handleMessage2CaseClass(re)
      val arrayData: ArrayBuffer[String] = ArrayBuffer()
      for (key <- CONFIG_LIST) {
        val valueOption = mapJson.get(key)
        var value = ""
        if (valueOption.isDefined) {
          value = valueOption.get
          if (key.equalsIgnoreCase("_timestamp")) {
            // Re-format access-log style timestamps into "yyyy-MM-dd HH:mm:ss"
            val formatter = new SimpleDateFormat("dd/MMM/yyyy:hh:mm:ss Z", Locale.ENGLISH)
            val formatStr = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
            value = formatStr.format(formatter.parse(value))
          }
        }
        arrayData += value
      }
      arrayData
    })

    spark.udf.register("strToMap", (field: String) => strToMapUDF(field))

    // Build one selectExpr expression per configured column
    val lb: ListBuffer[String] = ListBuffer()
    for (i <- CONFIG_LIST.indices) {
      val keyname = CONFIG_MAP(CONFIG_LIST(i))
      val str = mapType(keyname)
      if (str.equalsIgnoreCase("Map")) {
        lb += s"strToMap(value[$i]) as $keyname"
      } else {
        lb += s"cast(value[$i] as $str) as $keyname"
      }
    }
    val frame: DataFrame = dataset2.selectExpr(lb: _*)
    frame.show()
    frame.printSchema()
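    // For illustration only (hypothetical config entries): "status,_status,int" would contribute
    // "cast(value[0] as int) as status" to lb, while "headers,_headers,Map" would contribute
    // "strToMap(value[1]) as headers", so the UDF is applied only to Map-typed columns.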
  }

  // Custom UDF: "k1:v1,k2:v2" style string -> Map
  def strToMapUDF(field: String): Map[String, String] = {
    val mapJson2: mutable.Map[String, String] = mutable.Map()
    val strings = field.split(",")
    for (i <- 0 until strings.length) {
      mapJson2.put(strings(i).split(":")(0), strings(i).split(":")(1))
    }
    mapJson2.toMap
  }
  // JSON string -> Map of the top-level fields
  def handleMessage2CaseClass(jsonStr: String): Map[String, String] = {
    val mapJson: mutable.Map[String, String] = mutable.Map()
    val parser = new JsonParser
    val element = parser.parse(jsonStr)
    if (element.isJsonObject) {
      val jsonObject: JsonObject = element.getAsJsonObject
      val set = jsonObject.entrySet()
      val ite = set.iterator()
      while (ite.hasNext) {
        val el = ite.next()
        val key: String = el.getKey
        val value = el.getValue.toString.replace("\"", "")
        mapJson.put(key, value)
      }
    }
    mapJson.toMap
  }
  // Read the column config: each line is "outputColumn,jsonField,type"
  def readConfig(): Map[String, String] = {
    val mapAll: mutable.Map[String, String] = mutable.Map()
    val data: Dataset[String] = spark.read.textFile("config file path")
    val stringses: Array[Array[String]] = data.map(x => x.split(",")).collect()
    val iterator: Iterator[Array[String]] = stringses.iterator
    while (iterator.hasNext) {
      val arr: Array[String] = iterator.next()
      mapAll.put(arr(1), arr(0))  // jsonField -> output column name
      mapType.put(arr(0), arr(2)) // output column name -> target type
    }
    mapAll.toMap
  }
}
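// A minimal sketch of the two input files this job expects (hypothetical paths and field names):
//
// Config file (read by readConfig), one "outputColumn,jsonField,type" line per column, e.g.:
//   request_time,_timestamp,string
//   status,_status,int
//   headers,_headers,Map
//
// JSON file (read in main), one object per line, e.g.:
//   {"_timestamp":"21/Dec/2020:10:30:00 +0800","_status":"200","_headers":"host:example.com,agent:curl"}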