Dare To Think, Strive To Execute

Spark 与Hive Mysql 交互

背景介绍

  最近的工作一直围绕着怎么灵活地将异源数据导入我们的图形化机器学习平台中,也就是ETL中的最后一步Load,其间因为不了解hive浪费了好多时间,但是好歹最近实现了这些过程。现在记录下使用spark跟其他源数据交互的一些经验。

spark 与 mysql

  spark跟mysql交互就跟java与mysql数据库交互之前需要建立连接一样,首先需要提供database, table, username, password等,建立连接以后spark有一个API可以直接将mysql中的表格转换成dataframe。

1
2
3
4
5
6
val df = sql_context.read.format("jdbc")
.options(Map("url" -> s"${para.url}",
"driver" -> "com.mysql.jdbc.Driver",
"dbtable" -> s"${para.table}",
"user" -> s"${para.user}",
"password" -> s"${para.passwd}")).load()

  dataframe不仅可以完整地保存mysql的table中的数据信息,还可以保存table的schema信息,事实上dataframe就是一种类似于table的数据结构,可以支持查询以及其他的一些数据库操作。

  因为dataframe保存了schema信息,所以可以将该数据保存成不同格式的文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
if (para.format == "tsv") {
df.map {
row =>
row.mkString("\t")
}.saveAsTextFile(para.output_pt)
}
else if (para.format == "csv") {
df.map {
row =>
row.mkString(",")
}.saveAsTextFile(para.output_pt)
}
else if (para.format == "json") {
df.toJSON.saveAsTextFile(para.output_pt)
}
else {
throw new IllegalArgumentException(s"Bad format ${para.format}")
}

Spark 与 Hive

  这里读hive不是跟spark on hive,如果是这样的话,整个事件就简单的多,都不用连接,直接就可以读取hive的table。关于hive on spark 的配置,需要使用spark的源码进行编译,编译的时候加上-Phive, -Pthriftserver。