hahadsg's note

Follow me on GitHub

参考

http://litaotao.github.io/introduction-to-spark

master, slave

master服务(YARN ResourceManager,Mesos master和Spark standalone master)决定哪些application可以运行,什么时候运行以及哪里去运行。而slave服务( YARN NodeManager, Mesos slave和Spark standalone slave)实际上运行executor进程。

任务分配

job : A job is triggered by an action, like count() or saveAsTextFile(). Click on a job to see information about the stages of tasks inside it.

stage : stage 是一个 job 的组成单位,就是说,一个 job 会被切分成 1 个或 1 个以上的 stage,然后各个 stage 会按照执行顺序依次执行(以shuffle为界进行切分stage)

task : A unit of work within a stage, corresponding to one RDD partition。即 stage 下的一个任务执行单元,一般来说,一个 rdd 有多少个 partition,就会有多少个 task,因为每一个 task 只是处理一个 partition 上的数据。从 web ui 截图上我们可以看到,这个 job 一共有 2 个 stage,66 个 task,平均下来每个 stage 有 33 个 task,相当于每个 stage 的数据都有 33 个 partition [注意:这里是平均下来的哦,并不都是每个 stage 有 33 个 task,有时候也会有一个 stage 多,另外一个 stage 少的情况,就看你有没有在不同的 stage 进行 repartition 类似的操作了。

action, transformation

判断结果会不会发到driver上,发到driver上的是action。比如reduce会产生一个结果发到driver上,就是action;reduceByKey对每个key进行操作,并不会发到driver上,就是transformation

narrow/wide dependences

narrow dependences: C->D,只使用至多一个partition

wide dependences: A->B,需要使用多个partitions,也就是说需要shuffle

Conf

优先级:代码显示调用set;spark-submit传参;配置文件;系统默认值

// print all config
spark.sparkContext.getConf.getAll.foreach(println)

// set config by sql
spark.sql("set spark.sql.shuffle.partitions=200") // 设置sql发生shuffle后形成多少partitions

代码性能

  • 调用函数

错误的方法会将整个obj都发送到worker上

(关于实例化个人总结:将rdd作为示例的成员是没有问题的,如self.rdd;有问题的是df.map(self.f)这种情形)

### wrong way
class SearchFunctions(object):
  def __init__(self, query):
      self.query = query
  def isMatch(self, s):
      return self.query in s
  def getMatchesFunctionReference(self, rdd):
      # Problem: references all of "self" in "self.isMatch"
      return rdd.filter(self.isMatch)
  def getMatchesMemberReference(self, rdd):
      # Problem: references all of "self" in "self.query"
      return rdd.filter(lambda x: self.query in x)
### the right way
class WordFunctions(object):
  ...
  def getMatchesNoReference(self, rdd):
      # Safe: extract only the field we need into a local variable
      query = self.query
      return rdd.filter(lambda x: query in x)
  • broadcast

https://blog.csdn.net/dengxing1234/article/details/74330768

CPU/Memory设置

http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/

https://spark.apache.org/docs/latest/tuning.html

tasks

tasks数量是根据源数据的partition数量来的

tasks过少时

可能会导致executors跑不满(有些executeors闲置)

可能会导致,数据无法完全的放入到memory中(每个executors执行的tasks数量少,但是每次执行tasks所需内存大)

如何设置

通过实验的方式得到partitons数量,先用一个较小的值,每次*1.5,直到性能没有提升

Spark Streaming

https://www.cnblogs.com/Dhouse/p/7615034.html

TODO

怎么查看每个tasks是否完全放入到memory中