Spark SQL data processing case study: extracting and merging time-series data

Main class: SparkReadTimeSeriesData
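SparkReadTimeSeriesData reads nested time-series parquet files, explodes the nodes array so that every point sample becomes a row, pivots the point ids into columns, writes the merged table back to HDFS as a single parquet file, and finally updates the row count, column count and schema_info of the matching data_table record in MySQL. A rough sketch of the input layout the code assumes is shown below; the field names come from the node.id / node.v / node.t / node.s columns it selects, while the exact types and the value prefix format are assumptions:

// Hypothetical sketch of the input parquet layout; the real files may differ,
// but the main class reads exactly these fields.
case class Node(
  id: String, // point id, e.g. "000000000"
  v: String,  // value string whose first character encodes the type, e.g. "F:123.45"
  t: Long,    // sample timestamp (becomes the idTime column)
  s: String   // status / quality flag
)

case class TimeSeriesRecord(
  namespace: String, // namespace, e.g. "000000"
  time: Long,        // record time
  nodes: Seq[Node]   // one entry per point, exploded into rows by the job
)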
package com.hollysys.utils

import com.alibaba.fastjson.JSON
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructField, _}
import java.util

import scala.collection.mutable

/**
 * Created by Dingzhenying on 2018/8/23 15:20
 */
object SparkReadTimeSeriesData {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("sparkReadParquet")
      // .master("local")
      .getOrCreate()
    var params = JSON.parseObject(args(0))
    // params = JSON.parseObject("{\"endTime\":\"\",\"fileType\":\"parquet\",\"input\":\"hdfs://cdh-master:8020/user/dzy/parquet\",\"namespace\":\"000000\",\"output\":\"hdfs://cdh-master:8020/hiaAnalyticsService/000001/hiaDataFiles/TimeSeriesDataTable/20180912154057_d152a9019dbd4357a550953b890b660f.parquet\",\"points\":\"000000000,000001000,000019000,010000000,011000000,002000000\",\"startTime\":\"\",\"tableId\":\"402881ad65ccb8c30165ccb9eb450000\",\"jdbcUrl\":\"jdbc:mysql://192.168.66.162:3306/hiacloud-analytics-service-test\",\"username\":\"root\",\"password\":\"123456\",\"driverClassName\":\"com.mysql.jdbc.Driver\"}")

    val input: String = params.getString("input")         // input path
    val output: String = params.getString("output")       // output path
    val namespace: String = params.getString("namespace") // namespace
    val points: String = params.getString("points")       // point ids to query
    val idList = points.split(",").sorted                 // sorted point ids used as pivot columns
    val startTime: String = params.getString("startTime") // query start time
    val endTime: String = params.getString("endTime")     // query end time
    val tableId: String = params.getString("tableId")     // data table id
    // ------------------------------- database connection settings -------------------------------
    val jdbcUrl = params.getString("jdbcUrl")
    val username = params.getString("username")
    val password = params.getString("password")
    val driverClassName = params.getString("driverClassName")

    import spark.implicits._
    // The schema is inferred automatically from the parquet files
    // spark.conf.set("spark.sql.parquet.binaryAsString", "true")
    val datas = spark.read.parquet(input)
    // Explode the nodes array so that each node becomes its own row
    val dfDates = datas.select($"namespace", $"time", explode(datas("nodes"))).toDF("namespace", "time", "node")
    // Split the nested node struct into columns and keep only the requested points and namespace
    var splitDataInfo = dfDates
      .select(
        $"namespace".cast(StringType),
        $"time".cast(LongType),
        $"node.id".cast(StringType),
        $"node.v".cast(StringType).substr(0, 1).as("type"),
        $"node.v".cast(StringType).substr(3, 10).as("v"),
        dfDates("node.t").as("idTime"),
        $"node.s"
      )
      .filter($"id".isin(idList: _*))
      .filter($"namespace" === namespace)
    // Optional time-range filter; keep the returned DataFrame, transformations are not in-place
    if (startTime != null && startTime.nonEmpty) splitDataInfo = splitDataInfo.filter($"idTime" >= startTime)
    if (endTime != null && endTime.nonEmpty) splitDataInfo = splitDataInfo.filter($"idTime" <= endTime)
    // Wide table: one row per (namespace, idTime), one column per point id
    val endTable = splitDataInfo.groupBy($"namespace", $"idTime").pivot("id", idList).agg(collect_list($"v")(0))
    // One row holding the type code of each point column
    val typeTable = splitDataInfo.groupBy($"namespace").pivot("id", idList).agg(collect_list($"type")(0)).filter($"namespace" === namespace)
    // Column header info (LinkedHashMap keeps the column order) -- persisted to the database
    val tableMapScheam = new util.LinkedHashMap[String, DataType]()
    tableMapScheam.put("namespace", StringType)
    tableMapScheam.put("idTime", LongType)
    // Derive the Spark data type of every point column
    val idsScheam = creatHeadInfo(typeTable)
    tableMapScheam.putAll(idsScheam)
    // Content of the schema_info column (JSON format)
    val schema_info = headTypeJson(tableMapScheam)
    // Build the StructType of the result table
    val tableScheam = creatScheam(tableMapScheam)
    // Create the DataFrame from the pivoted rows and sort it by time
    val dataShow = spark.createDataFrame(endTable.toJavaRDD, tableScheam).orderBy("idTime")
    dataShow.printSchema()
    dataShow.show(false)
    // Update the metadata row in MySQL
    val numberOfRows = dataShow.count()
    val numberOfColumns = dataShow.schema.size
    println("rows: " + numberOfRows + " --- columns: " + numberOfColumns)
    try {
      val conn = JdbcUtil.getConnectionPool(driverClassName, jdbcUrl, username, password)
      val sql = "UPDATE data_table SET num_of_rows = " + numberOfRows + " " +
        ",num_of_columns= " + numberOfColumns + " " +
        ",schema_info= '" + schema_info + "'" +
        " WHERE id = '" + tableId + "'"
      val stmt = conn.createStatement
      stmt.executeUpdate(sql)
      println(sql)
      JdbcUtil.returnConnectionPool(conn)
    } catch {
      case ex: Exception =>
        ex.printStackTrace()
        println("Failed to update the metadata in MySQL: " + ex)
    }
    // Save the result DataFrame as a single parquet file on HDFS
    creatToOneFile(output, dataShow)
    // Read back the updated metadata for verification
    val jdbcDF = spark.read.format("jdbc").option("url", jdbcUrl).option("driver", driverClassName).option("user", username).option("password", password).option("dbtable", "data_table").load()
    jdbcDF.select("id", "num_of_columns", "num_of_rows", "schema_info").filter($"id" === tableId).show(false)
    spark.stop()
  }
  /**
   * Save the DataFrame to HDFS as a single parquet file
   *
   * @param output target path
   * @param data   dataset to save
   */
  def creatToOneFile(output: String, data: DataFrame): Unit = {
    // Write to a temporary directory first
    val sparkpath = output + "_sparkData"
    data.coalesce(1).write.mode("Overwrite").parquet(sparkpath)
    // Split the path into the HDFS URI and the path on that file system
    val pattern = "hdfs://(.*?):(\\d{4})".r
    val uri = (pattern findFirstIn sparkpath).get
    val path = sparkpath.stripPrefix(uri)
    val fs = HdfsUtils.creatFileSystem(uri)
    // List the part files written by Spark
    val files = fs.listStatus(new Path(path))
    // Remove any previous output, then move the new part file to the target path
    HdfsUtils.deleteFile(fs, output)
    files.foreach(file => {
      val fileName = file.getPath.getName
      if (!"_SUCCESS".equals(fileName)) fs.rename(new Path(path + "/" + fileName), new Path(output))
    })
    HdfsUtils.deleteFile(fs, sparkpath)
  }

  /**
   * Build the column header info from the type table
   *
   * @param typeTable dataset holding one type code per point column
   * @return ordered map of column name -> Spark data type
   */
  def creatHeadInfo(typeTable: DataFrame): util.LinkedHashMap[String, DataType] = {
    println("____________________ column header info of the dataset ________________________")
    val idsInfo = typeTable.drop("namespace")
    // idsInfo.show()
    val ids = idsInfo.columns.toList
    println(ids)
    // Missing type codes default to "S" (string)
    val types = idsInfo.first().toSeq.map(data => if (data == null) "S" else data.toString).toList
    println(types)
    val heads: util.LinkedHashMap[String, DataType] = new util.LinkedHashMap[String, DataType]()
    ids.zip(types).foreach { case (id, code) =>
      val datatype: DataType = code.toUpperCase match {
        case "F" => FloatType
        case "B" => BooleanType
        case "L" => LongType
        case _   => StringType // anything else is kept as a string
      }
      heads.put(id, datatype)
    }
    heads
  }

  /**
   * Convert the column header info to a JSON string
   *
   * @param scheamInfo schema info (column name -> Spark data type)
   * @return JSON string stored in the schema_info column
   */
  def headTypeJson(scheamInfo: util.LinkedHashMap[String, DataType]): String = {
    val headInfo: util.List[util.Map[String, String]] = new util.ArrayList[util.Map[String, String]]
    val iter = scheamInfo.keySet.iterator
    while (iter.hasNext) {
      val key = iter.next
      val value = scheamInfo.get(key)
      val idType = typeConversion(value)
      val oneHead: util.LinkedHashMap[String, String] = new util.LinkedHashMap[String, String]()
      oneHead.put("name", key)
      oneHead.put("dataType", idType)
      headInfo.add(oneHead)
    }
    // Turn the java.util toString output into JSON by string replacement
    val headJson = headInfo.toString.replaceAll("=", "\":\"").replaceAll("\\{", "\\{\"").replaceAll("\\}", "\"" + "\\}").replaceAll(",", "\",\"").replaceAll("\\}\",\" \\{", "},{")
    // println(headJson)
    headJson
  }

  /**
   * Map a Spark data type to the basic type name used in schema_info
   *
   * @param types Spark data type
   * @return basic type name
   */
  def typeConversion(types: DataType): String = {
    if (types == FloatType) {
      "Float"
    } else if (types == BooleanType) {
      "Boolean"
    } else if (types == LongType) {
      "Long"
    } else {
      "String"
    }
  }
  /**
   * Build the StructType of the result table
   *
   * @param scheamMap schema info (column name -> Spark data type)
   * @return StructType with one nullable field per column
   */
  def creatScheam(scheamMap: util.LinkedHashMap[String, DataType]): StructType = {
    val structFields = new mutable.ArrayBuffer[StructField]()
    val iter = scheamMap.keySet().iterator()
    while (iter.hasNext) {
      val key = iter.next
      val value = scheamMap.get(key)
      structFields += StructField(key, value, nullable = true)
    }
    val scheam = StructType.apply(structFields)
    // scheam.printTreeString()
    scheam
  }
}
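The heart of the job is the explode-then-pivot reshaping: every nested node becomes its own row, and every point id then becomes a column keyed by (namespace, idTime). The following self-contained sketch reproduces that step on a tiny in-memory dataset (local master, made-up sample values; it aggregates with first where the main class takes the first element of collect_list):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object ExplodePivotSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("explodePivotSketch").master("local[*]").getOrCreate()
    import spark.implicits._

    // Two records, each carrying two points in a nested array
    val raw = Seq(
      ("000000", 1L, Seq(("000000000", "1.0", 100L), ("000001000", "2.0", 100L))),
      ("000000", 2L, Seq(("000000000", "3.0", 200L), ("000001000", "4.0", 200L)))
    ).toDF("namespace", "time", "nodes")

    // One row per node
    val exploded = raw.select($"namespace", $"time", explode($"nodes").as("node"))
      .select($"namespace", $"node._1".as("id"), $"node._2".as("v"), $"node._3".as("idTime"))

    // One column per point id, keyed by (namespace, idTime)
    val idList = Seq("000000000", "000001000")
    val wide = exploded.groupBy($"namespace", $"idTime").pivot("id", idList).agg(first($"v"))
    wide.orderBy("idTime").show(false)
    // Expected rows (schematically):
    // namespace | idTime | 000000000 | 000001000
    // 000000    | 100    | 1.0       | 2.0
    // 000000    | 200    | 3.0       | 4.0
    spark.stop()
  }
}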
MySQL data access helper: JdbcUtil
package com.hollysys.utils

import java.io.{FileInputStream, IOException}
import java.sql.{Connection, DriverManager, PreparedStatement, SQLException, Statement}
import java.util
import java.util.{LinkedList, Properties}

import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by Dingzhenying on 2018/9/10 16:26
 */
object JdbcUtil {
  var connectionQueue = new util.LinkedList[Connection]
  val props = new Properties
  val loader = getClass.getClassLoader
  // props.load(new FileInputStream("src/main/resources/application-dev.properties"))

  def getConnectionPool(driverClassName: String, url: String, user: String, password: String): Connection = {
    Class.forName(driverClassName)
    try {
      if (connectionQueue != null) {
        // Rebuild the queue with five fresh connections, then hand one out
        connectionQueue = new util.LinkedList[Connection]
        var i = 0
        while (i < 5) {
          val conn = DriverManager.getConnection(url, user, password)
          connectionQueue.push(conn)
          i += 1
        }
      }
    } catch {
      case e: Exception =>
        e.printStackTrace()
    }
    connectionQueue.poll
  }

  def getConnectionPool: Connection = {
    props.load(new FileInputStream("application-dev.properties"))
    Class.forName(props.getProperty("spring.datasource.driverClassName"))
    try {
      if (connectionQueue != null) {
        connectionQueue = new util.LinkedList[Connection]
        var i = 0
        while (i < 5) {
          val conn = DriverManager.getConnection(props.getProperty("spring.datasource.url"), props.getProperty("spring.datasource.username"), props.getProperty("spring.datasource.password"))
          connectionQueue.push(conn)
          i += 1
        }
      }
    } catch {
      case e: Exception =>
        e.printStackTrace()
    }
    connectionQueue.poll
  }

  def returnConnectionPool(conn: Connection): Unit = {
    connectionQueue.push(conn)
  }

  // Get a single connection to the MySQL database
  def getOneConnection(url: String, user: String, password: String): Connection = {
    DriverManager.getConnection(url, user, password)
  }

  def main(args: Array[String]): Unit = {
    props.load(new FileInputStream("application-dev.properties"))
    val url = props.getProperty("spring.datasource.url")
    val username = props.getProperty("spring.datasource.username")
    val password = props.getProperty("spring.datasource.password")
    val driverClassName = props.getProperty("spring.datasource.driverClassName")
    println(url, username, password, driverClassName)
    val spark = SparkSession
      .builder()
      .appName("sparkReadParquet")
      .master("local")
      .getOrCreate()
    val sqlContext = spark.sqlContext
    import sqlContext.implicits._
    // Simple test against the data_table metadata table,
    // which must already exist in the target database
    val id = "402881ad65b349ee0165b34b67720000"
    val schema = "sssssssss"
    val sql = "select * from data_table"
    val jdbcSql = "UPDATE data_table SET num_of_rows = " + 10 + " , num_of_columns= " + 6 + " , schema_info= '" + schema + "' WHERE id = '" + id + "'"

    val connection = getConnectionPool // get a pooled connection built from the properties file
    val prepareSta2: PreparedStatement = connection.prepareStatement(sql)
    // val prepareSta2 = connection.prepareStatement(jdbcSql)
    // prepareSta2.setLong(1, 10)
    // prepareSta2.setDouble(2, 8)
    // prepareSta2.setString(3, "wewewew")
    // prepareSta2.setString(4, "333")
    println(jdbcSql)
    prepareSta2.execute()
    // prepareSta2.executeUpdate()
    prepareSta2.close()
    connection.close()
    // prepareSta2.setString(1, "15")
    // prepareSta2.setString(2, id)
    // prepareSta2.executeUpdate()
    // val prepareSta: Statement = connection.createStatement()
    // prepareSta.executeUpdate()
    // val aaa = prepareSta.execute(sql)

    val jdbcDF = spark.read.format("jdbc").option("url", url).option("driver", driverClassName).option("user", username).option("password", password).option("dbtable", "data_table").load()
    jdbcDF.select("id", "num_of_columns", "num_of_rows", "schema_info").show(false)
  }
}
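For reference, the main job consumes the pool above like this: borrow a connection with explicit JDBC settings, run the UPDATE, and push the connection back with returnConnectionPool instead of closing it. A minimal sketch with placeholder connection settings:

import java.sql.Connection
import com.hollysys.utils.JdbcUtil

object JdbcUtilUsageSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder settings; replace them with the values passed in the job's JSON arguments
    val driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://localhost:3306/hiacloud-analytics-service-test"
    val user = "root"
    val password = "123456"

    val conn: Connection = JdbcUtil.getConnectionPool(driver, url, user, password)
    try {
      val stmt = conn.createStatement
      // Hypothetical table id; data_table must already exist
      stmt.executeUpdate("UPDATE data_table SET num_of_rows = 10, num_of_columns = 6 WHERE id = 'some-table-id'")
      stmt.close()
    } finally {
      // Return the connection to the queue instead of closing it
      JdbcUtil.returnConnectionPool(conn)
    }
  }
}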