Dare To Think, Strive To Execute

Spark-Sql与异源数据库交互

Spark-sql 跟hive

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
case class Param(
input_pt: String = "",
table: String = "",
format: String = "tsv",
metastore_uris: String = ""
)
def run(para: Param) {
val conf = new SparkConf()
.setAppName("Write to Hive Table")
val sc = new SparkContext(conf)
val hive_context = new HiveContext(sc)
if (!para.metastore_uris.startsWith("thrift"))
throw new IllegalArgumentException(s"Bad metastore uris ${para.metastore_uris}")
hive_context.setConf("hive.metastore.uris", para.metastore_uris)
//val df = hive_context.read.table(para.table)
val df: DataFrame = para.format match {
case "parquet" => hive_context.read.parquet(para.input_pt)
case "json" => hive_context.read.json(para.input_pt)
case _ => throw new IllegalArgumentException(s"Bad format ${para.format}")
}
df.write.mode(SaveMode.Overwrite).saveAsTable(para.table)
sc.stop()
}
  • 采用动态设置metastore_uris本来想动态地链接hive的,但是每次这么设置后并不奏效,按理说代码中对配置信息进行设置的优先级要高于配置文件的,但实际上并非如此。初步揣测,spark在启动的时候已经连接上hive 了。但是下面的两种方式都是可以动态连接hive的。
1
2
spark-sql --hiveconf "hive.metastore.uris"="thrift://hostname:9083"
hive --hiveconf "hive.metastore.uris"="thrift://hostname:9083"

Spark-sql 跟mysql

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def run(para: Param) {
val conf = new SparkConf()
.setAppName("Write to Mysql Table")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
//val df = hive_context.read.table(para.table)
val df: DataFrame = para.format match {
case "parquet" => sqlContext.read.parquet(para.input_pt)
case "json" => sqlContext.read.json(para.input_pt)
case _ => throw new IllegalArgumentException(s"Bad format ${para.format}")
}
//key and value are keywords of spark-sql, the schema can not use them as field name.
val prop = new Properties()
prop.put("user", para.user)
prop.put("password", para.passwd)
df.write.mode(SaveMode.Overwrite).jdbc(para.url, para.table, prop)
sc.stop()
}
  • 刚开始测试的一个json文件中,有两列,名字分别是key跟value,结果执行的时候报出一个错误:
    error
  • 这个应该是一个sql语法的问题,单单从程序上看,这里根本没有什么SQL语句。细究以后会发现spark-sql会利用dataframe中的schema信息去创建一个表格,所以内部的逻辑会有一个创建表格的sql语句,其中出现value是因为schema中有value,所以当我将json文件中的schema信息中的value替换掉就OK了。大胆猜测value应该是sql的保留字之类的。

进一步的工作

hive的动态连接还存在一定问题,这主要是对hive缺乏必要的了解导致的,接下来会看看spark-sql跟hive是怎么整合的,把这个问题给解决掉。