Dare To Think, Strive To Execute

Spark SQL 使用JDBC连接数据库

背景

  说来惭愧,之前一点没有用过数据库的调用接口,这一次为了是实现一个ETL的功能,只能自己去摸索解决这方面的办法。好在网上资料多,解决问题的能力也在不断进步。但是用spark读异源数据的经验与分享还是不多,索性记录下我读Hive 以及mysql的经历。

环境介绍

  • spark 集群7台, 系统centos, spark版本1.5.1.
  • hive 计算引擎是hadoop, 也安装在上面的服务器上.
  • mysql 数据库在其中一台服务器上.

Hive

  Hive中我主要是参考的spark 的官方文档关于hive的使用方法来的。

1
2
3
4
5
6
7
8
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sqlContext.sql("LOAD DATA LOCAL INPATH 'Path/kv1.txt' INTO TABLE src")
// Queries are expressed in HiveQL
sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)

我使用的是sbt来配置的依赖,其中依赖为

1
2
3
4
5
6
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-sql" % "1.5.2",
"org.apache.spark" %% "spark-core" % "1.5.2",
"org.apache.spark" %% "spark-hive" % "1.5.2",
"com.github.scopt" %% "scopt" % "3.2.0"
)

  当我在服务器上执行上面的代码的时候, 开始一直报org.apache.hive.conf 找不到,最后我的处理办法是将hive的包全部加到spark的运行环境中,当然这也是下策,毕竟包很多。之所以这样主要原因应该是spark集群是编译好再部署的而不是用源码编译的,没有将hive的lib加入到spark的classpath 下,我试过用源码编译加了-Phive就有效的规避了这个问题。

  这个问题解决完之后就遇到了下面的情况:
error

  我google以后发现大概原因是如果不配置hive,hive默认的metastore是derby,然而derby的是不允许多个任务同时访问derby数据库的,这个问题遇到的人很多,大概分成两种解决思路,一种是配置derby 的network service 的,一种是修改hive的配置文件,配置hive的hive-site.xml文件,使得hive的metastore是mysql.

1
2
3
4
5
6
7
8
<property>
<name>hive.metastore.uris</name>
<value>thrift://hostname:9083</value>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
</property>

  另外配置完了hive-site.xml后还需要将该文件放在spark 的conf目录下,这样spark 才会读到hive的配置参数。

  如果环境配置好了,用spark sql来读hive table是一件特别惬意的事儿。

1
2
3
4
5
6
7
8
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
// sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
//sqlContext.sql("LOAD DATA INPATH '" + "HDFSPATH/kv1.txt' INTO TABLE src")
// Queries are expressed in HiveQL
sqlContext.sql(s"SELECT * from ${para.table}").map{
row =>
row.mkString("\t")
}.saveAsTextFile(para.output_pt)

JDBC

  spark 通过JDBC连接Mysql读取其中table也很容易,重要的是设置好以下参数

  url mysql数据库的url jdbc:mysql://MysqlServer:3306/DatabaseName
driver: java 连接mysql的驱动,这里我遇到一个坑,就是当我下载了mysql-connector-java-5.1.35-bin.jar的时候,总是出现 “no suitable driver found”,这个error 我待会细说。
另外加上访问数据库的用户名以及密码。
  下面这条语句就可以访问mysql中已经存在的table。

1
2
3
4
5
6
7
val jdbcDF = sqlContext.read.format("jdbc")
.options(Map("url" -> "jdbc:mysql://MysqlServername:3306/DatabaseName",
"driver" -> "com.mysql.jdbc.Driver",
"dbtable" -> "Table",
"user" -> "root",
"password" -> "123456"))
.load()

  spark 通过JDBC连接Mysql,如果执行查询计算等操作有两种方式:

  一种是给mysql server 传一个sql 语句,这样最后返回的dataframe就是查询的结果

1
2
3
4
5
6
7
8
val jdbcDF = sqlContext.read.format("jdbc")
.options(Map("url" -> "jdbc:mysql://MysqlServername:3306/DatabaseName",
"driver" -> "com.mysql.jdbc.Driver",
"dbtable" -> "Table",
"sql" -> "SELECT * FROM TableName",
"user" -> "root",
"password" -> "123456"))
.load()

  另外一种是将load的dataframe注册一个临时表格,执行查询操作, 具体方法使用如下:

1
2
3
4
5
6
7
8
9
10
11
// Create an RDD of Person objects and register it as a table.
val people = sc
.textFile("examples/src/main/resources/people.txt")
.map(_.split(","))
.map(p => Person(p(0), p(1).trim.toInt))
.toDF()
people.registerTempTable("people")
// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext
.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")

关于“no suitable driver found”

  一般有三个原因导致出这个错误:

  • url 或者driver拼写有误,曾因为url 写在脚本中但是没有传进程序参数一直出现这个error,花了大量的时间,总之注意check自己的命令。
  • 连接数据库的jar包不存在com.mysql.jdbc.Driver.class, 比如我下载的mysql-connector-java-5.1.35-bin.jar就不存在. 可以对这个jar包grep一下确认该类存在。
    在spark中,有两种方法可以添加该jar包,一种是–driver-class-path, 一种是在sparkclasspath中设置一下,如果是集群确保每台机器都有设置
  • 如果–driver-class-path和sparkclasspath都有设置,会报一个冲突,可惜对spark的运行原理不太了解,否则应该能很快找出错误原因。

番外

  沿着前人的步伐走下去总是比较容易的,想当初刚用spark写算法的时候,连使用idea打包失败这类错误都有同学帮助自己解决掉,以至于最后我发现自己独立解决问题的能力越来越差,如果一个问题除了google周围没有任何人能给与援手,自己会在上面被困好久。从年前一个人部署Zeppelin到现在解决这些问题,我深刻地感受到如果自己想独当一面首先得有resolve error的能力,如果要想有resolve error的能力,信心比其他的更加重要,多少次被困住并不是多麻烦的问题,而是内心的恐惧。我想起《暗时间》中的一句话:

过早退出往往在于对于未来的不确定性,对于投资时间最终无法收到回报的恐惧,感受到的困难越大,这种恐惧越大,因为越大的困难往往暗示这个任务需要投资的时间越多。所以我们都是直觉经济学家,当我们“畏难”的时候,其实我们畏惧的不是困难本身,而是困难所暗示的时间经济学意义。

  一言以蔽之就是不自信自己能解决问题嘛,作为一个程序员,解决问题简直就是家常便饭,切不可因为心里的懊恼伤害了前进的动力。我也开始发现这里的一些问题,之所以被卡很久,一方面是自己不能冷静思索,更重要的是自己对专业素养不够。这次感慨颇深,索性记下来自勉。