Spark常用内置函数

explode 数据拆分函数

Creates a new row for each element in the given array or map column.

遍历列中的值,生成新的一行

1
2
val dfDates = datas.select($"namespace", $"time", explode(datas("nodes"))).toDF("namespace", "time", "node")
dfDates.show(false)

image
image

filter过滤函数

将datefream按照传入的过滤条件,进行判断

.cast为指定列的数据类型

filter

filter对RDD中的数据项进行计算
($”id”.isin(idList: _*)):列中的值是否在数组中(val idList = points.split(“,”).sorted)

filterByRange

对RDD中的元素进行过滤

.isin函数:遍历判断列中值是否在数组中

image

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
val splitDataInfo = dfDates
.select(
$"namespace".cast(StringType),
$"time".cast(LongType),
$"node.id".cast(StringType),
$"node.v".cast(StringType).substr(0, 1).as("type"),
$"node.v".cast(
StringType
).substr(3, 10).as("v"),
dfDates("node.t").as("idTime"),
//from_unixtime(dfDates("node.t") / 1000).as("node_time").cast(TimestampType),
$"node.s"
)
.filter($"id".isin(idList: _*))
.filter($"namespace" === (namespace))

数据合并函数 concat_ws

使用给定的分隔符将多个输入字符串列连接成一个字符串

1
df.select(concat_ws(",",$"name",$"age",$"phone").cast(StringType).as("value"))

struct函数

将多列合并一个数组保存为一列

1
df.select(struct($"name",$"age",$"phone").as("value")).show(false)//将String转数组

foldLeft左累积器

参考文章:https://blog.csdn.net/wsscy2004/article/details/37698013

1
2
3
4
5
6
7
8
9
10
//times(List('a', 'b', 'a')) --> List(('a', 2), ('b', 1))
def times(chars: List[Char]): List[(Char, Int)] = {
def incr(pairs: List[(Char, Int)], C: Char): List[(Char, Int)] =
pairs match {
case Nil => List((C, 1))
case (C, n) :: ps => (C, n+1) :: ps
case p :: ps => p :: incr(ps, C)
}
chars.foldLeft(List[(Char,Int)]())(incr)
}

printSchema scheam输出函数

image

聚合函数

.groupBy

  • 使用指定的列对数据集进行分组,以便我们可以对它们进行聚合。
1
2
3
def groupBy(cols: Column*): RelationalGroupedDataset = {
RelationalGroupedDataset(toDF(), cols.map(_.expr), RelationalGroupedDataset.GroupByType)
}

.pivot行转列聚合,将对应列的值与列表匹配

1
2
val idList = points.split(",").sorted
val typeTable = splitDataInfo.groupBy($"namespace").pivot("id", idList).agg(collect_list($"type")(0)).filter($"namespace" === (namespace))

image

.agg()聚合生成datafream

1
.agg(collect_list($"type")(0))

collect_list()参数去重

image
参考:https://www.cnblogs.com/wwxbi/p/6102380.html

Spark正则匹配

1
2
3
4
5
6
7
val sparkpath = output + "_sparkData"
data.coalesce(1).write.mode("Overwrite").option("header", true).parquet(sparkpath)
val pattern = "hdfs://(.*?):(\\d{4})".r//正则表达式
//正则匹配
val uri = (pattern findFirstIn sparkpath).get//正则化匹配
val path = sparkpath.stripPrefix(uri) 截取
val fs = HdfsUtils.creatFileSystem(uri)

createStatement创建sql连接

1
2
3
4
5
6
7
val conn =JdbcUtil.getConnectionPool
val sql = "UPDATE data_table SET num_of_rows = "+numberofRow+" " +
",num_of_columns= "+numberofcolumns+" " +
",schema_info= '"+schema_info+"'" +
" WHERE id = '"+tableId+"'"
val stmt = conn.createStatement
stmt.executeUpdate(sql)

时间转换函数

1
from_unixtime(dfDates("time") / 1000, "yyyyMMdd").as("day"),
打赏
  • 版权声明: 本博客所有文章除特别声明外,著作权归作者所有。转载请注明出处!
  • Copyrights © 2018-2020 丁振莹
  • 访问人数: | 浏览次数:

你的每一分支持,是我努力下去的最大的力量 ٩(๑❛ᴗ❛๑)۶

支付宝
微信