Dare To Think, Strive To Execute


  • 首页

  • 分类

  • 关于

  • 归档

  • 标签

  • Sitemap
Dare To Think, Strive To Execute

Google Girl Hackthon

发表于 2017-03-13

google girl hackthon

  花了两天的时间参加了谷歌的一个为了鼓励女生coding的编程马拉松,感触还是挺深的。即使知乎上牛人再多,手机上天天都能听到王健林马云鼓励年轻人的鸡汤,即便每天公众号各种正能量,但是它们带给我的触动远不及面对面看到一群智慧优雅才华横溢的女工程师给我的触动大。

  参加这个的时候单纯是冲着谷歌的食堂去的,毕竟工作也找了,其中的奖品以及实习对我来讲要么没有吸引力要么也没很大的意义了。但是我收获了比想象的更多的东西。

  先放几张我们的照片

name

name

最有感触的话

  时间不能倒流,那些年错过的大作业,那些年错过的Project,还有那些年抱过的大腿,想想当年自己的表现真是羞愧。我不太清楚同样参加这个活动的小伙伴什么感受,但是听了几个女工程师的话,我真是非常触动。其中最让我难忘也是最让我深思的一句话是

不要把自己当女孩看
把自己当女孩看

  所谓“不要把自己当女孩看”是说不要总用低标准来要求自己,我大学期间不乏成绩优异的女同学,但是说到动手能力,因为都不怎么样,而且长期觉得自己即使是动手在女生中也不差,所以就没有动力超越自己,以至于技术上进步非常缓慢,甚至很多到了最后就自暴自弃了。而我没有放弃肯定是因为既没有退路又没有依靠。

  有一个美女工程师也讲到当自己硕士阶段不是合作写大作业而是需要独立完成project的时候,当自己独立能handle一个任务的时候,发现原来写代码什么的也不是那么难嘛,自己其实比想象的能干很多。当自己总是有很多退路的时候,尤其那些退路那么显而易见,一旦学会了拿那些“女生工程不行”的套路为自己开脱,慢慢就消磨了克服困难的动力。这么多年,我努力克服着自己身上的坏毛病,一步步尽力成为自己想要的样子的时候,我自认为没有天赋,没有才华,甚至也不够聪明,刚开始编程我根本无法入道,非常的沮丧,倘若有退路和其他选择我肯定也早就放弃了。原本很多技能的获得都不是什么轻松的事情,现在想想支持我走过很多困境的也不过是无人可依时的顽强。不逼自己一把,我都不知道自己原来可以一直走这么远,无论如何我克服了对自己的怀疑以及对工程的恐惧。

  所谓“把自己当个女孩”是说要发挥好自己的优势,无论是气质还是风格,女生确实有自己的优势。倘若一个人外在举止投足优雅大方,声音悦耳表情自信,心思体贴细腻,性格开朗幽默,有想法,有能力,有见识,论谁跟她打交道都会如沐春风的吧?当我越来越意识到优秀的多元以后,也愈发体会到这些软实力的价值。

  想做一件事很大程度取决于你想做成它的愿望,而想做好一件事则取决于你给自己预设的标准。一直都很喜欢那句话,”求于上者得于中,求于中者得于下”。

  • 标准跟自己的视野,观念以及自信戚戚相关,标准怎么也不会高于榜样,有几个人能变得比自己的榜样还好呢?而且面对面看到这些跟自己有相同经历的人顿时就有了榜样的效果,比如我觉得这次活动就让我意识到原来女工程师真的可以优秀如此而且人数不少;至于观念,倘若我从来没有被那句“男工程师就是比女程序员优秀”影响,我应该也会以更高的标准要求自己,再说自信,自信是自己在自己见识多了优秀且排除了无关杂念后一步步从心底里对自己的认同。另外,主动承担责任也容易让自己提高对自己的要求,因为辜负自己很简单,辜负别人很不好意思,当然可能是因为我还不够不要脸。对于标准的感受简直不要更多,大学我就是因为在女生中感觉良好,摆脱不了性别质疑,再加上班上大腿给我留了不少后路,所以一而再地降低对自己的要求。
  • 题外话是想起了曾经一个师弟说的一句话

    想逃避的事,终究是要面对和解决的
    我想说,可能犯的错误早晚都会犯的,所以犯错也要趁早
    毕竟越晚时间成本越高
    总而言之,欠下的都会还的

  • 再说努力,我当然不是唯心主义者,但是“念念不忘,必有回响”的感受我还是挺认同的,我感觉自己大部分的懈怠都发生在自己没那么强烈做的愿望或对自己的能力有所怀疑或者有着很不错的退路的时候。不过强烈做成一件事的愿望也会跟与这件事相遇的情景有关,当然情景是可以设计的,比如我以前听了那么多word2vector都不为所动,当我亲眼见到它令人震撼的模型效果的时候,我会情不自禁想去深入了解它。

  这次活动虽然只在谷歌吃了一顿饭,但是仍然很感激这么有情怀的公司。所谓“穷则独善其身,达则兼济天下”不是没有道理的,看一个公司的对人才的态度看其情怀还是有道理的。虽然压榨员工是资本主义的天性,但是一个兼济天下的公司想必对自己员工的发展考虑的也不会少。恩,没错我就是在解释我为什么愿意去微软,希望到时候我也能遇到一帮很优秀的同事。

收获

  收获主要有二

  • 自己遇到的困难很多人也都遇到过,自己内心的质疑以及困惑很多人也经历过,看到那么多妹子问着困扰我当年的问题。我心里也是感慨万千,尤其是当我听到一个师妹问怎么提高编程能力的时候,我不禁哑然失笑,恩恩,我仿佛看到了当年我一脸虔诚请教师兄师姐的样子。有个师姐说应该看什么书,而我心里在想其实很多妹子学习非常刻苦,普遍都是缺乏面对困难的勇气和解决问题的决心,但凡逼着自己有过一两次经历这些都不是问题了。

  • 对自己的缺憾跟能力有了更加清醒的认识,理性有余,感性不足,严谨但不够有意思,我就是很欣赏有趣的人;

  • 亲身感受了多维的优秀,现在环境毕竟简单,我顶多会觉得一个人代码写的好佩服,论文发的多发的牛钦佩,为人做事好让人羡慕;当我看到了那么多妹子的时候,无论是工程师还是跟我一样的参加活动的童鞋,我意识到她们的优秀是那么多元。有的是年纪轻轻对自己未来那么坚定那么有梦想,有的表达自如谈笑风生,有的是才华横溢能力超群,有的女孩是提问题一针见血尖锐有深度,有的思维缜密认真负责,有的妹子风趣幽默,做展示非常有感染力,有的细腻温婉生活精致,还有的英语一级棒。

反思

  说完了感受,再说一说自己对自己的反思。

  • 一方面,摆脱不了自己的偏见,觉得技术牛才值得钦佩。这个我昨晚也跟室友探讨过,技术牛当然值得钦佩,但是技术仅仅是推动社会前进的一部分力量,脱离了产品设计运营等其影响力十分有限。想起来高中以前的自己也是这样,在自己执着追逐分数的同时,缺乏了对其他素养的鉴别与欣赏。
  • 做过presentation越多,越来越了解到表现能力的重要性,我觉得我不缺把一个问题表达清楚的能力,但是自己很缺表现力跟感染力。信息传递的过程中因为表现力跟感染力不佳导致很多信息丢失,想象下自己经常作为台下的观众听演讲的状态后,自己也情不自禁地笑了起来,相比于一本正经的学究当然还是段子手更有看点呀。
  • 理性有余,感性不足。怀疑自己是不是很呆板乏味。

总结

  都是老生长谈,说不上意识到了问题就能改的很彻底,好歹是对自己的一些总结,我很开心自己能在入职之前思考了这些问题,写出来一是为了纪念,二是整理下思路,说不定哪天自己看的也为像李唯师姐说的那样,觉得当初的自己那么傻那么有意思。

Dare To Think, Strive To Execute

Nonnegative Matrix Factorization(NMF)实现

发表于 2016-12-06

NMF应用场景

推荐

  在推荐应用中,用户以及商品的历史关系相当于大型的矩阵,分解后的矩阵W以及H可以看成用户以及商品在兴趣维度的表达,根据分解后的矩阵可以预测未知的用户商品的打分情况。在Netflix竞赛中, Bellkor’s Pragmatic Chaos 采用该算法取得了 RMSE: 0.856704的好成绩,相比于原来的算法Cinematch效果提升了10.06%。

name

文本挖掘

  NMF可以看做LSI的近似算法,Document-Term矩阵的NMF分解结果可以看成是在topic空间对文档以及单词的低秩近似。利用NMF,一个文档是基础潜在语义的加法的组合,使得在文本域中更有意义。

name

交叉社区发现

  矩阵 $V$ 可以看作社交网络中用户关系矩阵。$W$ 中的 $k$ 个列向量看作新的特征空间下的一组基,$H$ 的每一行是在这组基下的新表示。因此,每个节点的表示向量 $x$ 可以用$W$的列向量的线性组合进行逼近$𝑥→𝑊ℎ^𝑇$,其中, $h$ 表示在各基向量的组合权重,代表节点到各个社区的隶属程度。

name

NMF理论推导

  非负矩阵分解 (Nonnegative Matrix Factorization, NMF)是机器学习中重要的非监督学习算法,其广泛应用于话题发现和推荐系统等应用。给定一个非负样本矩阵(如$m$个用户对$n$个商品的打分矩阵)$\mathbf{R} \in R_{+}^{m \times n}$,NMF的目标为求解两个非负低秩矩阵$\mathbf{W} \in R_{+}^{m \times k}$,$\mathbf{H} \in R_{+}^{k \times n}$,最小化如下目标函数如下:
$$L\left ( \mathbf{W}, \mathbf{H} \right ) = \frac{1}{2} \left | \mathbf{R} - \mathbf{W}\mathbf{H^T} \right | + \frac{\lambda}{2}\left ( \left | \mathbf{W} \right |_F^2 + \left | \mathbf{H} \right |_F^2 \right )$$

  其中$\left | . \right |$表示矩阵的F范数。上述目标函数也可以表示为:

$$L\left ( \mathbf{W}, \mathbf{H} \right ) = \frac{1}{2} \sum_{i = 1}^{m}\sum_{j = 1}^{n}\left ( r_{i,j} - w_i h_j^T \right ) + \frac{\lambda}{2}\left ( \left | \mathbf{W} \right |_F^2 + \left | \mathbf{H} \right |_F^2 \right )$$

其中$w_i$和$h_j$均为行向量,其分别对应矩阵$W$的第$i$个行向量和$H$的第$j$个行向量。可计算$W$和$H$的梯度如下:
$$\frac{\partial L}{\partial w_i} = -\sum_{j = 1}^{n}(r_{i,j} - w_i h_j^T)h_j + \lambda w_i = G_{w_i} + F_{w_i}$$

$$\frac{\partial L}{\partial h_j} = -\sum_{i = 1}^{m}(r_{i,j} - w_i h_j^T)w_i + \lambda h_j = G_{h_j} + F_{h_j}$$

其中,$G_{w_i} = -\sum_{j = 1}^{n}(r_{i,j} - w_i h_j^T)h_j$,$F_{w_i} = \lambda w_i$,$G_{h_j} = -\sum_{i = 1}^{m}(r_{i,j} - w_i h_j^T)w_i$,$F_{h_j} = \lambda h_j$。采用梯度下降的参数优化方式, 可得W以及H的更新方式见下式:
$$w_i \leftarrow \left [ (1 - \eta \lambda)w_i + \eta \sum_{j = 1}^{n}e_{i,j}h_j \right ]_+$$

$$h_j \leftarrow \left [ (1 - \eta \lambda)h_j + \eta \sum_{i = 1}^{m}e_{i,j}w_i \right ]_+$$
其中$\left [ x \right ]_+$定义为$\left [ x \right ]_+ = \left \langle max(x_1, 0),\cdots,max(x_k, 0) \right \rangle$。

实现

单机实现(Python)

  下面是采用梯度下降(公式见上面)实现的NMF矩阵分解算法。

# coding=utf-8
import numpy as np
import math
import random
import argparse


# generate a matrix whose cell is in gauss distribution
def generate_gauss_dist_matrix(row, col, mu, sigma):
    gauss_matrix = np.zeros(shape=(row, col))
    for i in xrange(row):
        for j in xrange(col):
            gauss_matrix[i][j] = random.gauss(mu, sigma)
    return gauss_matrix 


# calculate gradients
def calculate_grad(user_lst, item_lst, rate_lst, row, col, matrix_w, matrix_h, prompt):
    grad = np.zeros(shape=(row, col))
    error_sum = 0  # calculate the the RMSE on train data.
    for user, item, rate in zip(user_lst, item_lst, rate_lst):
        eij = rate - np.dot(matrix_w[user, :], matrix_h[:, item])
        error_sum += abs(eij)
        if prompt == "W":
            grad[user, :] += eij * matrix_h[:, item].T
        else:
            grad[:, item] += eij * matrix_w[user, :].T
    return grad, error_sum


# update parameters
def update_paramters(row, col, matrix, grad, learn_rate, reg):
    for i in range(row):
        for j in range(col):
            new = (1 - learn_rate * reg) * matrix[i][j] + learn_rate * grad[i][j]
            matrix[i][j] = max(new, 0.0)
    return matrix


def matrix_factorization_train(user_lst, item_lst, rate_lst, K=10, iters=100, learn_rate_init=0.001, reg=0.1):
    max_user_id = max(user_lst)
    max_item_id = max(item_lst)
    sum_rate = sum(rate_lst) 
    count = len(user_lst) 

    # size
    row = max_user_id + 1
    col = max_item_id + 1

    # matrix parameters distribution is gauss distribution.
    print "initialization of the parameters......"
    bias = sum_rate * 1.0 / count
    mu = math.sqrt(bias / K)
    sigma = 0.1
    matrix_w = generate_gauss_dist_matrix(row, K, mu, sigma)
    matrix_h = generate_gauss_dist_matrix(K, col, mu, sigma)

    for iter in xrange(1, iters + 1):
        learn_rate = learn_rate_init / math.sqrt(iter)
        print "In this iteration, learn_rate is: " + str(learn_rate)
        # update W
        grad_w, error_sum = calculate_grad(user_lst, item_lst, rate_lst, row, K, matrix_w, matrix_h, "W")
        matrix_w = update_paramters(row, K, matrix_w, grad_w, learn_rate, reg)
        print "This is the " + str(iter) \
              + "th iteration after update W, and the RMSE of the Train data is " \
              + str(math.sqrt(error_sum * 1.0 / len(user_lst)))

        # update Q
        grad_h, error_sum = calculate_grad(user_lst, item_lst, rate_lst, K, col, matrix_w, matrix_h, "Q")
        matrix_h = update_paramters(K, col, matrix_h, grad_h, learn_rate, reg)
        print "This is the " + str(iter) \
              + "th iteration after update H, and the RMSE of the Train data is " \
              + str(math.sqrt(error_sum * 1.0 / count))
    return matrix_w, matrix_h


def matrix_factorization_predict(user_lst, item_lst, rate_lst, matrix_w, matrix_h):
    max_user_id = len(matrix_w)
    max_item_id = len(matrix_h[0])
    count = len(user_lst)
    error = 0
    for user, item, rate in zip(user_lst, item_lst, rate_lst):
        # filter the triple whose user or item are not trained.
        if user >= max_user_id or item > max_item_id:
            continue
        error += abs(rate - np.dot(matrix_w[user, :], matrix_h[:, item]))
    rmse = math.sqrt(error / count)
    return rmse


# Return the user_lst, item_lst, rate_lst of the movie info.
def load_data(file_pt):
    user_lst = list()
    item_lst = list()
    rate_lst = list()
    with open(file_pt) as f:
        for line in f:
            try:
                fields = line.strip().split()
                user_lst.append(int(fields[0]) - 1)
                item_lst.append(int(fields[1]) - 1)
                rate_lst.append(int(fields[2]))
            except ValueError:
                print "format must be [user, item, rate, *]!"
    return user_lst, item_lst, rate_lst 


def parse_args():
    parser = argparse.ArgumentParser(description="NMF demo by Ping Li")

    # optional args
    parser.add_argument("-l", "--learn_rate", type=float, metavar="\b", default=0.001, help="learn rate, default 0.001")
    parser.add_argument("-r", "--reg", type=float, metavar="\b", default=0.1, help="reg, default 0.1")
    parser.add_argument("-i", "--iter", type=int, metavar="\b", default=50, help="iter num, default 50")
    parser.add_argument("-K", "--K", type=int, metavar="\b", default=10, help="iter num, default 10")

    # necessary args
    parser.add_argument("-tr", "--train_pt", metavar="\b", help="train data path", required=True)
    parser.add_argument("-te", "--test_pt", metavar="\b", help="test data path", required=True)

    args = parser.parse_args()

    if args.learn_rate < 0.0 or args.reg < 0.0 or args.iter < 0 or args.K < 0:
        raise ValueError("learn_rate, reg, iter, K must be positive")

    return args


if __name__ == "__main__":
    args = parse_args()

    train_user_lst, train_item_lst, train_rate_lst = load_data(args.train_pt)
    test_user_lst, test_item_lst, test_rate_lst = load_data(args.test_pt)
    print "NMF train start......"
    # A descent parameters for NMF.
    W, H = matrix_factorization_train(train_user_lst, train_item_lst,
                                      train_rate_lst, args.K, args.iter, args.learn_rate, args.reg)
    print "NMF test start ......"
    rmse = matrix_factorization_predict(test_user_lst, test_item_lst, test_rate_lst, W, H)
    print "RMSE of the matrix factorization on test data is " + str(rmse)

# command line
# python NMF.py --learn_rate 0.001 --reg 0.1 --iter 100 -K 10 --train_pt "./u1.base" --test_pt "./u1.test"

实现要点

参数初始化

  由于 NMF 是一个非凸问题,所以只能找到一个 local optimal。参数初始值的选取对结果的影响比较大。参数初始值最好离预期解比较近,所以其重构后应尽可能和原样本值接近:
$$w_i h_j = R_{i,j}$$
  一种简单的策略是让参数随机初始值重构后的值等于 $R$ 的均值 $\mu$:
$$W = DenseMatrix(m, K) \times \sqrt{\frac{\mu}{K}}$$
$$H = DenseMatrix(K, n) \times \sqrt{\frac{\mu}{K}}$$

RMSE震荡问题

  在优化的过程中,如果在迭代过程一直保持一个学习率会导致loss震荡,在优化的过程中步长宜逐渐减小。

$$learnrate = \frac{lr} { math.sqrt(step)}$$

Dare To Think, Strive To Execute

Sbt打包时一些包找不到的问题

发表于 2016-11-27

背景

  一直对于java打jar包一直半解,也从来没深究在idea 这个IDE中如何打jar包这个问题,为此自己在这个上面跳了很多坑。
  最近使用一个第三方包,本来在打包的时候一股脑都加进去了,但是在跑程序的时候总是会出现jar包找不到。我将打好的jar包解压缩以后,确实能找到我想要的那几个包名,可是程序就是找不到。当我采用外部加jar包的形式,程序就能顺利跑起来了。
error

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
input=/user/liping/ETL/test
output=/user/liping/ETL/result-after-transform
function run()
{
hadoop fs -rmr $output
${SPARK_HOME}/bin/spark-submit \
--master "spark://bda07:7077" \
--class "sparksql.reader.FormatTransform" \
--conf "spark.cores.max=10" \
--jars "./etl/spark-csv_2.10-1.5.0.jar:./etl/common-csv-1.1.jar" \
spark-sql.jar \
--input_pt $input \
--output_pt $output \
--input_format "json" \
--output_format "tsv"
}
run

  如此看来,我用idea打的jar包还确实没有将spark-csv_2.10-1.5.0.jar以及common-csv-1.1.jar这两个包打进去。为此我专门请请教了一个擅长java的同学,之后恍然大悟,原来打jar包里面是不能有其他的jar包的,如果需要某个类,应该先将该jar包解压后再跟源程序编译好的class文件一起打包,这也是为什么需要打extracted jar。因为extracted jar先将该jar包解压后再一起打包。同时解压两种方式打的jar包就可以明显看出二者的区别。同时我也意识到平时自己打的上百M的jar包,除了自己写的源文件编译后的class文件,其他一点用都没用。
下图是正确打包方式:
error
下图是我之前的打包方式:
error
下面是原本的jar解压后跟正确的jar解压后的对照图:
error

Dare To Think, Strive To Execute

分布式算法实现--数据分布&&模型分布

发表于 2016-11-18

背景

  在Spark分布式计算框架下,机器学习算法有两种分布式实现方式:数据分布和模型分布。数据分布将数据集无差别地划分为多个数据子集,然后将数据子集分发到集群的各个节点上进行并行处理,并在每个节点上保留一份完整的模型参数,在参数完成局部更新之后,需要汇合每个节点上的局部参数值得出全局参数值。  Spark MLlib中大部分算法采用了数据分布实现。与数据分布中每一个计算节点都需要保留一份完整的模型参数不同,模型分布同时将数据和参数分发到各个节点上进行处理,因而能够支持更细粒度的并行化。机器学习算法库GraphLab将数据和模型构成一个有向无环二部图采用模型分布与异步通信的方式对算法进行迭代计算。
  相比较而言,数据分布算法实现相对简单,通信代价小,但其要求在每一个节点上都保存一份完整的模型参数,因而难以满足具有大规模参数的机器学习算法的训练;采用模型分布的机器学习算法实现比较复杂,需要将训练样本与模型参数的关系抽象出一个二部图,每一个样本与模型参数都是图上的节点,同时由于数据和模型参数都分布在不同的计算节点上,因此训练过程中的通信开销较大,但其主要优势在于可以支持大规模参数的模型训练。

数据分布

数据分布介绍

  数据分布是将训练数据进行划分并分发到不同节点以实现算法并行的一类方式的统称。图-1是Spark 环境中采用数据分布实现并行算法的示意图:(1)Driver将训练样本分成若干个分区分发到各个Worker上; (2)Driver向所有的Worker广播模型参数;(3)每一个Worker根据当前的模型和所拥有的本地局部数据计算出梯度并将其发送给Driver;(4)Driver接收到所有Worker的梯度后将其合并并对现有的模型参数进行更新。算法重复步骤(2)-(4)直至收敛。
  数据分布式在各种分布式机器学习算法库中被广泛采纳,例如Spark中重要的机器学习算法库MLlib采取数据分布的方式实现不同的机器学习算法,基于Hadoop MapReduce分布式架构实现的大规模数据挖掘与机器学习库Mahout也是采用的数据分布。

error

以LR为例介绍数据分布

  Logistic Regression 算法可以看成对一个正则化后的负对数似然函数的优化过程,在二值分类问题中,假设给定$N$个训练样本,$x_i$为第$i$个样本的K维特征向量,为第$i$个样本的标签,$t_i = 0$表示第$i$个样本为负例,$t_i = 1$表示其为正例。进一步假设预测模型为线性函数,即$f\left ( x \right ) = \left \langle \mathbf{w, x} \right \rangle + b$,其中和为模型参数。Logistic Regression的目标函数由经验风险项和$l$2正则项组成:

(1)$$L \left (\mathbf{w} \right ) = - \sum_{i = 1}^N \left [ t_i ln \left ( y_i \right ) - \left ( 1- t_i \right ) ln\left ( 1 - y_i \right ) \right] + \frac{\lambda }{2}\left | \mathbf{w} \right |_2^2$$

  其中为使用模型对第i个样本的预测值,为平衡经验风险与正则项的系数。Logistic Regression的目标函数可以采用梯度下降法(GD:Gradient Descent)进行优化,根据(1)式,可求得b以及w的梯度如下:

(2) $$\frac{\partial L}{\partial b} = - \sum_{i = 1}^N\left ( t_i - y_i \right ) = G_b$$

(3) $$\frac{\partial L}{\partial b} = - \sum_{i = 1}^N\left ( t_i - y_i \right ) \times x_i + \lambda \mathbf{w}= G_{\mathbf{w}} + \mathbf{F}$$
  为了便于并行化,将对w的梯度分成两部分,其中$G_\mathbf{w} = - \sum_{i = 1}^N\left ( t_i - y_i \right ) \times x_i$, $F = \lambda \mathbf{w}$。
  采用GD参数优化方式, 可得w以及b的更新方式如下:

(4) $$b\leftarrow b + \eta \sum_{i = 1}^N\left ( t_i - y_i \right )$$

(5) $$\mathbf{w}_j \leftarrow \left ( 1 - \eta \lambda \right )\mathbf{w}_j + \eta \sum_{i = 1}^N\left ( t_i - y_i \right ) x_i^j,for j = 1,\cdots, K$$
其中为预设的学习步长。
  算法1展示了使用数据分布实现Logistic Regression算法的伪代码,其在算法执行初始将训练样本$S$划分成多份并分发到不同的计算节点上,并广播模型参数w和b到所有的计算节点。在每一次迭代过程中,首先每一个计算节点根据本地的数据和当前参数值$\mathbf{w}$和$b$计算本地的梯度$G_{\mathbf{w}}^i$和$G_b^i$;然后集群的Driver节点聚合所有计算节点上梯度$G_{\mathbf{w}}^i$和$G_b^i$,更新全局的参数值,并向所有的计算节点进行广播;计算节点在接收到更新后的全局参数值后,开始下一轮的迭代。

data1

数据分布一般形式

  数据分布的一般流程图见下图。首先挖掘出算法中可以并行的部分在Worker中运行,在以梯度下降为优化算法的机器学习算法中,计算梯度是明显可以并行的部分,在灰色节点中执行;相反合并梯度广播参数等这些操作在黄色节点Driver中执行。
data1

模型分布

模型分布介绍

  模型分布是将模型参数分布在不同的节点实现分布式计算的一类方式的统称。使用模型分布实现分布式机器学习算法,首先要构建一个二部图,如图-2所示,其中为样本顶点表示每一个输入的训练样本,为参数顶点表示模型中每一个参数。用表示样本对参数关联程度(在线性模型假设下,为样本的第j维特征),当时,顶点与顶点之间建立起权重为的边,反之,顶点与顶点之间没有边。
  图-3为基于Spark的GraphX[9]采用模型分布实现并行算法的示意图:(1)通过GraphX将二部图的分布式存储到不同的Spark计算节点;(2)模型顶点向与其邻接样本顶点发送模型参数信息;(3)样本顶点计算该样本上的梯度并发送给邻接的参数顶点;(4) 参数顶点在接受到梯度信息后更新自身权重。重复(2)-(4)直至收敛。
data1
data1

以LR为例介绍模型分布

与数据分布的实现方式不同,模型分布需要首先根据模型参数和训练样本构造二部图, 在Logistic Regression中顶点与顶点分别代表第$i$个样本和第$j$个参数,当$x_i^j \neq 0$时,顶点$S_i$与顶点$\mathbf{w}_j$之间会有权重为的边,反之,顶点$i$与顶点$j$之间没有边。 算法2展示了使用模型分布式实现Logistic Regression算法的伪代码,在对此二部图划分和分发后算法开始对模型参数进行迭代更新,在每一轮迭代中,首先每一个参数顶点将其参数值发送给其相邻的样本顶点,各个样本顶点根据接收到的参数值计算当前模型的预测值以及梯度向量,并向其相邻的参数顶点广播其得到的梯度向量。参数顶点接收到所有的梯度向量后进行梯度聚合和参数更新,完成一轮迭代过程。
data1

模型分布一般形式

  模型分布的一般流程图见下图。首先应该构造出样本到模型的二部图,如上图所示,因为在模型分布的过程中数据与模型都是作为二部图中的顶点存储在不同的计算节点上,所以各个步骤都无差地在计算节点上运行,如下图所示。
data1

应用场景

  • 从编程方式来看,数据分布因为模型以广播的形式分发,编程较简单。
  • 从可扩展性来看,模型分布因为模型参数可以分布式存储,具备支持大规模模型训练的能力。
  • 从计算效率来看,模型分布可以更好地解决模型规模大、单个节点无法存储的问题,适合基于大规模但稀疏的数据训练机器学习模型;在数据规模小或非稀疏的情况下,使用数据分布方式通信开销小,可以加速模型训练。
Dare To Think, Strive To Execute

Blog New Plan

发表于 2016-11-18

Plan

  之前的博客所在的服务器速度实在太慢,每次打开都感觉弄丢了一天的好心情,恰逢师弟安利hexo,我就打算把博客整个地来个大迁移。另外找工作也接近尾声,所以我打算把剩下的本来以文档形式存在着的笔记放上来,一方面为了自我总结,另一方面也可以给有兴趣的人一些前人之鉴。

算法分布式实现

  • 数据分布
  • 模型分布

机器学习

  • Factorization Machine
  • NMF
  • ICModel
Dare To Think, Strive To Execute

Pagerank 幂法实现

发表于 2016-11-18

Graphx中Pagerank中的不足

  前一段时间为了完善bda studio中得算法,我调研了spark graphx中的pagerank,之后就萌发了要自己重写pagerank的想法。开始想重写pagerank完全是因为当初固执地想让一个算法收敛,而不是给定一个最大迭代次数,让算法终止,因为这样算法究竟收敛不收敛心里很没有底气。当我测试完spark graphx中得pagerank以后,发现了很多问题,这更加坚定了我写好这个算法的心。在这里提一句,我测试该算法使用的是twitter数据集的两个文件,分别是twitter_rv.net以及numeric;前者是边文件,后者是用户id以及用户名字典,这个数据集边大概有10亿左右,顶点有四千万。graphx的pagerank算法写的虽然十分简短精炼,但是存在着这样两个问题:

  • 收敛性方面:算法迭代的时候pagerank值在不断的变化,总的pagerank值也在不断变小,这也说明无论迭代多少次,这样的算法也是不可能收敛的,因为迭代次数越多,损失的pagerank值也越多。
  • 效率方面: 该算法在迭代的时候,迭代时间变得越来越长, 迭代时间几乎成倍增长,如果该算法整体的迭代次数比较短还可以忍受,但是越到后来迭代时间会达到一个小时左右,有时候会造成内存崩溃。

幂法跟传统pagerank的根本区别

error

  在一个网络图中,经常会出现出度为0 的顶点,如上图顶点C,如果仍然用之前的方式,即所有顶点以damping factor的概率沿着链接跳转,(1 - damping factor)的概率随机访问,那么每次迭代就会损失所有出度为0的顶点的pagernak总和乘以damping factor。简而言之,就是这些出度为0 的顶点有85%(默认damping factor就是0.85)的pagerank并没有给其他的顶点。

  那怎样才是合理的呢,从经验上讲,对于这些出度为0的顶点,它们应该将自己100%的pagerank随机地贡献给所有的页面才是合理的。也就是说,当没有链接往下跳转的时候,我们随机地选择任意一个网页。幂法跟传统的pagerank最根本的区别就是合理地处理了出度为0的顶点的pagerank值,让迭代的过程pagerank总和不变,只是各个网页的pagerank值有增有减。

算法介绍

  假设$N$是Internet 中所有可访问的网页的数目,此数值非常大,定义:$N \times N$的网页链接矩阵$G = g_{i,j}\in R^{N*N}$, 若网页i存在链接到j, 则有$g_{i,j} = 1$, 否则 $g_{i,j} = 0$。大规模稀疏矩阵有如下特点:

  • G中非零元素的的数目是整个Internet中超链接的数目。
  • 第j列非零元素的位置表示了所有链接到网页j的网页。
  • 第i行非零元素的位置表示了所有从网页i链接出去的网页。
  • 记矩阵行元素之和 $r_i = \sum _j g_{i,j}$, 它表示了第i个网页的出度。
  • 记矩阵列元素之和 $c_j = \sum _i g_{i,j}$, 它表示了第j个网页的入度。

  要计算pagerank, 可假设一个随机上网“冲浪”的过程,即每次看完网页后有两种选择:

  在当前网页中随机选择一个超链接进入下一个网页。
随机打开一个新的网页。
  设$p$为从当前网页上找一个链接跳转过去的概率(也叫damping factor),则$1 - p$即为不选当前网页链接,随机去任意一个网页的概率。若当前网页是 $i$,则其跳转到网页 $j$ 的可能性有两种:

  • 网页 j 在网页 i 的链接上,也就是说这两个顶点之间存在有向边,其概率为 $\frac {p}{r_i} + \frac {1 - p}{N}$.
  • 网页 j 不在网页 i 的链接上,也就是说这两个顶点不存在有向边,其概率为 $\frac {1 - p}{N}$.
  • 由于网页 i 与网页 j 之间是否存在有向边由$g_{i,j}$决定,则转移矩阵为:

$$a_{i,j} = p \times \frac{g_{i,j}}{r_i} + \frac{1 - p}N$$

  需要注意的是,如果 $r_i = 0$, 则意味着 $g_{i,j} = 0$, 上式中 $a_{i,j} = \frac{1}{N}$。任意两个网页之间的转移矩阵 $A = \left(a_{i,j} \right)_{n \times n}$。设矩阵D为各个网页出度的倒数(若没有出度则置为1)构成N阶对角矩阵, $e$是全为1的N维向量, 则有:

$$A = pGD + \frac {1 - p}{N}ee^T $$

  设 $x_i ^k$, i = 1,2,3…n表示某时刻 k 网页 i 的pagerank值, 向量 $x^k$表示当前时刻各个网页的pagerank。那么下一时刻浏览到网页 j 的概率为 $x_j^ {k + 1} = \sum_i a_{i,j} \times x_i^k$, 此时浏览网页的pagerank为 $x^{k + 1}= Ax^k$.

  当这个过程无限进行下去,达到极限情况,即网页访问概率 $x^k$ 收敛到一个极限值,这个极限向量x为各个网页的pagerank,它满足 $Ax = x$, 且 $\sum_i^n x_i = 1 $. 最后可以看出求得的pagerank实际上也是矩阵A在特征值为1时的特征向量。

Spark Graphx 实现

  将每个节点的pagerank值分成两部分,第一部分是通过链接跳转过来的,第二部分为不通过链接跳转的部分。贡献第二部分的pagerank值的节点有两类:

出度为 0 的顶点,这些顶点以1/N的概率跳转到其他页面, 对任意一个网页的贡献为$\frac{x_i}{N}$;
出度不为 0 的顶点,这些顶点以(1 - p) / N的概率跳转到其他网页,贡献了$\frac {(1 - p) \times x_i}{N}$;
由于第二部分得到的pagerank对所有的节点而言都是一样的,在graphx中,计算第一部分使用消息传递,第二部分可以采用map操作。

error

如何解决迭代时间变长

  迭代时间变长是由于RDD的lazy computation的特性,以及RDD为了错误恢复,在迭代的过程中都保留着很长且复杂的依赖关系(lineage dependency),不过为什么lineage越长会造成迭代时间越长,揣测是因为迭代的时候会占用大量内存,这也是为什么迭代次数太多会发生内存泄露。解决这个问题,主要就是要懂得合理的cache以及unpersist, 仅仅这些还不够,还需要在适当的时候,truncate lineage设置checkpoint。

  下面是我train pagerank的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
/** Update the vertex with new pagerank.*/
def updateVertex(id: VertexId, attr: Double, msg_sum: Double): Double = {
damping_factor * msg_sum
}
def sum(a: Double, b: Double): Double = a + b
/** Send pr/out_degree with edge direction.*/
def sendPR(e: EdgeContext[Double, Double, Double]) = {
e.sendToDst(e.srcAttr * e.attr)
}
/**
* Run pagerank algorithm with damping_factor and max_iter.
* @param g The original graph.
* @return An RDD includes all vertices' id and pagerank value.
*/
def run(g: Graph[Double, Double]): PagerankModel = {
N = g.vertices.count()
//Cache the outdegree to get the vertices whose outdegree is 0.
val outdegree = g.outDegrees.cache()
//Initialize the graph with its pr set 1/N and edge attr set 1/out_degree
var graph = g.outerJoinVertices(outdegree) {
case (vid, vdata, deg) => deg.getOrElse(0)
}.mapTriplets(e => 1.0 / e.srcAttr)
.mapVertices((id, attr) => 1.0 / N)
.cache()
// v is a vertices set whose outdegree is > 0.
val v: VertexRDD[Double] = graph.vertices.innerJoin(outdegree) { (vid, PR, deg) => PR }
// special_vertices are vertices whose outdegree is 0.
val special_vertices = graph.vertices.subtract(v).cache()
special_vertices.checkpoint()
outdegree.unpersist(blocking = false)
//send messages:[ pr / out_degree ].
//aggregate messages to the same dstId.
var messages: VertexRDD[Double] = graph.aggregateMessages(sendPR, sum).cache()
var old_message: VertexRDD[Double] = null
//K is the num of vertices whose out_degree is 0.
K = N - outdegree.count()
//Respectly calculate the avg_PR_deg1 for pages with out_degree
//and avg_PR_deg0 for pages with out_degree 0.
var avg_PR_deg1 = (1.0 - K * 1.0 / N) / N
var avg_PR_deg0 = 1.0 * K / (N * N)
var preG: Graph[Double, Double] = null
val times = new ArrayBuffer[Double]()
var iter = 0
while (iter < max_iter) {
val timer = new Timer()
val new_PR: VertexRDD[Double] = graph.vertices
.innerJoin(messages)(updateVertex).cache()
preG = graph
//update tht pagerank value.
graph = graph.outerJoinVertices(new_PR) {
case (vid, old_PR, new_opt) => new_opt.getOrElse(0.0)
}.mapVertices {
(id, PR) => PR + avg_PR_deg1 * (1 - damping_factor) + avg_PR_deg0
}.cache()
graph.vertices.foreachPartition(f => {})
graph.checkpoint()
old_message = messages
messages = graph.aggregateMessages(sendPR, sum).cache()
//all pagerank value sum.
val sum_PR: Double = graph.vertices.map { case (id, attr) => attr }.sum()
//pagerank sum for those vertices whose outdegree is 0.
val sum0_PR = graph.vertices.innerJoin(special_vertices) {
case (id, pr, zero) => pr
}.map {
case (id, attr) => attr
}.sum()
avg_PR_deg1 = (sum_PR - sum0_PR) / N
avg_PR_deg0 = sum0_PR / N
iter = iter + 1
val cost_time = timer.cost()
times.append(cost_time)
val msg = Msg(s"iter " -> iter, s"cost_time(ms)" -> cost_time)
logInfo(msg.toString)
old_message.unpersist(blocking = false)
preG.unpersist(blocking = false)
new_PR.unpersist(blocking = false)
}
val avg_time = times.sum / iter
logInfo(s"average iteration time is: $avg_time ms.\n")
val PR: RDD[(VertexId, Double)] = graph.vertices
val prmodel = new PagerankModel(PR)
prmodel
}

总结

  pagerank还是一个收敛很快得算法,对于上面测试的社交网络图,规模还是很大的,迭代15次以后,top50的users 基本跟某一篇论文中次序完全一致。在效果方面,如果只要保证topK用户的次序,迭代次数不用很多;在效率上看,如果图不是很大,设置checkpoint可以不用那么频繁,比如迭代两三次写一次hdfs也行,另外在spark对图进行partition的过程中,应尽量减少通信,也就是说有同样srcID的被分到同一个partition中,spark Graphx中有相应的API,PartitionStrategy.EdgePartition2D。

  上图中的pagerank分别为:

error

Dare To Think, Strive To Execute

岛上书店

发表于 2016-11-17

是迟钝的感受让岁月匆匆

  细腻的感觉也是生活丰富的一部分,我总提醒自己还记得小学时候每一次升旗的沸腾热情么,还记得高中校园的桂花淡雅的香气在夜色中给我的感觉么,以及在长春深夜自习回来踩着厚厚的积雪侵在昏黄的灯光中看到灯光下飞舞的雪花的样子么,还有现在,每天晚上骑着自行车飞快地从中关村东路飞驰过去,望着斑斓的车尾灯以及红绿灯,恍惚中意识到自己是在帝都。

  是我们对生活中迟钝的感受,使得岁月匆匆。尤其是现在,每天来到自己的格子间,打水,浇花,打开电脑,日复一日重复做着同样的事儿,虽然自己清楚每天自己CPU的流水线还是满负荷的,但是如果不是工作内容的不同,谁能分得清昨天跟前天的区别。

error

岛上书店

  之所以说这么多,是因为我想把这些感受记录下来,不为别的,只为记录我当时看它的感觉。

  读小说需要在适合它的人生阶段读。我们在二十岁有共鸣的东西到了四十岁不一定能产生共鸣,反之亦然。书本如此,生活亦如此。

关于政治、上帝、和爱,人们总爱讲些无聊的谎话。想要了解一个人你只需问一个问题:“你最喜欢的书是什么?”

关于婚姻,要是有谁让你觉得你在一屋子人中是独一无二的,就选那个人吧。

  这篇小说结构紧凑,温暖人心,很自然,就是写一个很寻常的人很寻常的自救的故事。周末的晚上,伴着书桌上的百合花香,只觉得此时自己是最幸福的。

Dare To Think, Strive To Execute

Quickly DownLoad Resources From Coursera

发表于 2016-11-17

安装coursera-dl

  • 这个非常简单,直接pip install coursera-dl

下载

  • 登陆到coursera主页,登陆并且enroll某个要下载的课程。

  • 进入课程,https://class.coursera.org/ntumltwo-001, 例如我想下载林轩田的机器学习基石,则将ntumltwo-001作为我的课程id.

  • 下载命令

  • 1
    coursera-dl -u myaccount -p mypasswd -d your_target_dir ntumltwo-001
Dare To Think, Strive To Execute

Spark小白学习之路

发表于 2016-11-17

背景

  最近好几个同学问我怎么开始学习spark, 虽说对spark的认识也十分有限,但是好歹接触spark也有一年半了,所以记录下自己的学习历程以便初入门者不至于最后“从入门到放弃”。想想当初自己抱着“hadoop权威指南”怀着一定要跟hadoop死磕到底的悲壮心情,不仅哑然失笑。刚开始的时候,都会觉得高大上的东西不大好懂,实则不然。刚开始大都是从看书开始的,理论知识又会让人觉得晦涩,顿时愈发觉得这些分布式框架高深莫测了。好在我学习spark没有那么痛苦,甚至可以说十分热爱这个分布式平台,遂分享下我的学习心得。

环境

spark

  下载spark稳定版的binary安装包,按照官方文档配置下spark_env.sh等配置文件。

  • tutorial: spark-tutorial
  • spark programming guide: programming-guide
    上面的教程直接在spark-shell中运行(不用 IDE)就好了。
    先看一遍试一遍有一个直观的感受。

    IDE

      intelliJ IDEA,该IDE功能十分强大,不仅集成了terminal, scala 编译器,sbt, 版本控制等, 还有代码美化,自动补全的功能。
    安装intelliJ IDEA, 然后安装sbt以及scala的编译器。
    error
      点击下面的plugin,搜索并安装set以及scala 插件。不过sbt的控件有时候会安装失败,如果多次失败,建议下载下来手动安装。至于sbt是做什么的,通俗点儿理解就是sbt是管理项目依赖的一个工具,spark程序依赖很多spark的jar包,如果我们配置了build.sbt那么sbt会自动去远程库中帮我们下载这些jar包,放在我们library中。sbt跟maven功能相同,但是前者服务于scala,后者主要用在java项目中。
      下载完了控件,接下来配置build.sbt文件。你在idea中新建一个项目的时候,首先项目下面就会出现一个build.sbt文件。如果只是简单的scala程序,不需要做额外配置,倘若是spark程序,可以按照如下格式添加依赖。
1
2
3
4
5
6
7
8
9
10
11
12
name := "example"
version := "0.1"
scalaVersion := "2.10.4"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "1.5.2",
"org.apache.spark" %% "spark-graphx" % "1.5.2",
“org.apache.spark" %% "spark-streaming" % "1.5.2",
"org.apache.spark" %% "spark-mllib" % "1.5.2",
"org.apache.spark" %% "spark-sql" % "1.5.2" )

  上面我添加了5个依赖,这5个包都是spark生态的主要部门,分别是spark-core,这个只用用到RDD都需要使用,spark-graphx提供图数据的存储以及计算方面的API,spark-mllib中包含了一定数量的机器学习算法API,spark-streaming针对流式计算,spark-sql是spark生态不可低估的生力军,因为spark-sql支持多种数据源,可以处理hive,mysql中的数据,轻松实现ETL的操作。
上面的依赖包并不是越多越好,因为sbt每次创建新项目都会要下载这些依赖,一来比较慢,二来最后的jar包会很大。
语言, scala. 虽说spark支持python, R, Java, 但是一开始学习的时候用的就是scala就没觉得scala是学习障碍,况且scala的函数式编程功能在写map, reduce函数的时候真的十分便捷。另外scala跟java可以混合编程,scala文件中可以import java的包。
学习要点

driver跟executor的区别

  简单理解,driver运行主程序,如果有map等操作,都交给executor来执行。
分布式计算框架的核心思想就是要找可以并行的部分,然后将这些部分分给其他的计算机来执行,以此来提升运行的效率。想到这里看到其他的概念,就会清晰很多。当初碰到driver,executor, master, worker这些概念的时候也是一头雾水。
transformation 跟 action

  transformation跟action都是spark提供给RDD的API的分类。RDD提供给开发者的API有:

  • transformation: map, join, groupby, filter, flatmap, union, intersection, distinct……
  • action: reduce, sum, first, top, collect, take……
  • spark是内存计算,也是懒惰计算,就是所有的计算资源都尽可能load进内存中,transformation不会立即执行,但是出现action以后该action依赖的transformation才会一起执行。spark在运行期间会一直保存RDD的依赖信息,也叫血缘(lineage)。当然如果是迭代算法,需要好好考虑怎么优化,都则会出现OOM。

简单的spark程序 wordcount

1
2
3
4
5
6
7
8
9
10
11
12
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object wordcount {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("wordcount").setMaster("local[2]")
val sc = new SparkContext(conf)
sc.textFile("text").flatMap{ s => s.split("\\s+")}.map{word => (word, 1)}.reduceByKey(_ + _).foreach(println)
sc.stop()
}
}

  处理小数据量的程序在idea就可以胜任(指定master为local), 数据量很大的情况可以打成jar包上传到spark集群中运行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
output=hdfs://bda00:8020/user/xx/hive_table
table=test
function run {
hadoop fs -rm $output
spark-submit \
--executor-memory 5G \
--master "spark://hostname:7077" \
--class "sparksql.reader.HiveReader" \
--conf "spark.cores.max=10" \
jarName.jar \
--output_pt $output \
--table src
}
run

  这个jar包之前的是spark环境参数,例如executor的core数目,内存大小等,当然这个还可以传入依赖的jar包等。具体使用详见, spark_submit_application.–class指定spark程序主类。

  spark程序的参数是使用了scopt来实现读参数的,这个看起来比较美观,还可以进行参数检查,简单的参考java程序的传参方法(貌似就是jar包后面空格隔开)就好。

总结

  实践是了解spark最直接最有效的学习方式,如果想进一步了解spark,实践这些文档中的例子还远远不够,这个时候再看一些相关的技术书籍或者博客肯定是非常必要的。当我们对spark有了一定的认识,再了解它就不会那么恐惧了。

  另外补充一些非常不错的进阶资料,当然最好的还是官方文档:

  • http://dongxicheng.org/framework-on-yarn/apache-spark-shuffle-details
  • http://blog.sina.com.cn/s/blog_4d1426660102v3pt.html
  • http://blog.sina.com.cn/s/blog_4d1426660102v5u2.html
Dare To Think, Strive To Execute

Tensorflow Mac

发表于 2016-11-17

前言

最近找工作,好久都没碰博客了,不是没有新东西,只是各种忙乱一直没空,再加上这个服务器响应速度越来越差,也就更加没有之前的精心细致。好不容易来一次,故将今天的bug解决方案留下。

问题环境

我是在mac上采用pip 安装tensorflow的,python环境是2.7,遇到的bug是这样的:

error

解决方案

这个问题我是查找到这个博客:python安装,基本上是需要

1
pip install --upgrade setuptools

问题是setuptools这个包找不到,接着重新安装tf即可。

123
Li Ping

Li Ping

22 日志
9 标签
RSS
© 2017 Li Ping
由 Hexo 强力驱动
主题 - NexT.Muse