0%

Spark运行模式

Spark 有很多种模式,最简单就是单机本地模式,还有单机伪分布式模式,复杂的则运行在集群中,目前能很好的运行在 Yarn和 Mesos 中,当然 Spark 还有自带的 Standalone 模式,对于大多数情况 Standalone 模式就足够了,如果企业已经有 Yarn 或者 Mesos 环境,也是很方便部署的。

  • local(本地模式):常用于本地开发测试,本地还分为local单线程和local-cluster多线程;
  • standalone(集群模式):典型的Mater/slave模式,不过也能看出Master是有单点故障的;Spark支持ZooKeeper来实现 HA
  • on yarn(集群模式): 运行在 yarn 资源管理器框架之上,由 yarn 负责资源管理,Spark 负责任务调度和计算
  • on mesos(集群模式): 运行在 mesos 资源管理器框架之上,由 mesos 负责资源管理,Spark 负责任务调度和计算
  • on cloud(集群模式):比如 AWS 的 EC2,使用这个模式能很方便的访问 Amazon的 S3;Spark 支持多种分布式存储系统:HDFS 和 S3

local模式运行

1
spark-shell --master local[2]

2代表2个worker。

如果local[*],也是默认master选项,则自动获取机器cores数量。

standalone模式运行

Spark Standalone模式的架构和Hadoop hdfs/yarn很类似,1 master + n workers

配置conf/spark-env.sh文件

1
2
3
4
cd $SPARK_HOME
cd conf
cp spark-env.sh.template spark-env.sh
vi spark-env.sh

添加如下内容:

1
2
3
SPARK_MASTER_HOST=localhost
SPARK_WORKER_CORES=1
SPARK_WORKER_MEMORY=2g

启动Spark

1
2
3
4
cd $SPARK_HOME
sbin/start-all.sh
starting org.apache.spark.deploy.master.Master, logging to /usr/local/spark/spark-2.2.3-bin-2.6.0-cdh5.7.0/logs/spark-simon-org.apache.spark.deploy.master.Master-1-localhost.out
localhost: starting org.apache.spark.deploy.worker.Worker, logging to /usr/local/spark/spark-2.2.3-bin-2.6.0-cdh5.7.0/logs/spark-simon-org.apache.spark.deploy.worker.Worker-1-localhost.out

查看master日志

1
2
19/02/10 13:11:09 INFO Master: I have been elected leader! New state: ALIVE
19/02/10 13:11:12 INFO Master: Registering worker 192.168.1.6:51683 with 1 cores, 2.0 GB RAM

查看worker日志

1
19/02/10 13:11:12 INFO Worker: Successfully registered with master spark://localhost:7077

执行jps命令,可以看到有MasterWorker进程

1
2
3424 Master
3459 Worker

生成wordCount的输入文件

新建/usr/local/spark/data/words文件

1
vi /usr/local/spark/data/words

添加如下内容

1
2
3
hello,hello,world
hello,world,
welcome

启动Spark-shell

以standalone模式启动

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
bin/spark-shell --master spark://localhost:7077
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/02/10 13:12:41 WARN Utils: Your hostname, localhost resolves to a loopback address: 127.0.0.1; using 192.168.1.6 instead (on interface en0)
19/02/10 13:12:41 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
19/02/10 13:12:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://192.168.1.6:4040
Spark context available as 'sc' (master = spark://localhost:7077, app id = app-20190210131243-0000).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.2.3
/_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

输入wordCount程序

1
2
3
4
5
6
7
8
9
10
scala> var file = spark.sparkContext.textFile("file:///usr/local/spark/data/words")
file: org.apache.spark.rdd.RDD[String] = file:///usr/local/spark/data MapPartitionsRDD[6] at textFile at <console>:23

scala> val wordCounts = file.flatMap(line => line.split(",")).map(word => (word, 1)).reduceByKey(_ + _)
wordCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[9] at reduceByKey at <console>:25

scala> wordCounts.collect
res1: Array[(String, Int)] = Array((hello,3), (welcome,1), (world,2))

scala>

可以看到/usr/local/spark/data/words文件里面的单词被成功的统计了。

Spark会依赖Hadoop版本,当我们使用cdh版本的hadoop时,在Spark官网上下载不了对应的版本,这时就需要编译Spark了。

下载源码

到Spark官网 http://spark.apache.org/downloads.html 下载Spark的源码,并非已经Build好的安装包。

Spark release我选择的是2.2.3

package type选择Source Code

下载并解压

1
2
3
4
cd /usr/local/spark
wget https://archive.apache.org/dist/spark/spark-2.2.3/spark-2.2.3.tgz

tar -zxvf spark-2.2.3.tgz

构建发布版本

查看dev/make-distribution.sh源码,可以知道构建后的包的文件名为spark-$VERSION-bin-$NAME.tgz,所以--name参数设置为2.6.0-cdh5.7.0,-P是指定使用pom.xml中指定的profile,-D是指使用指定的Dependency。

1
2
cd spark-2.2.3
./dev/make-distribution.sh --name 2.6.0-cdh5.7.0 --tgz -Phadoop-2.6 -Phive -Phive-thriftserver -Pmesos -Pyarn -Dhadoop.version=2.6.0-cdh5.7.0

构建过程中,我们会发现出现了以下错误:

1
2
3
4
5
6
7
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 46.000 s (Wall Clock)
[INFO] Finished at: 2019-02-10T11:28:39+08:00
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal on project spark-launcher_2.11: Could not resolve dependencies for project org.apache.spark:spark-launcher_2.11:jar:2.2.3: Could not find artifact org.apache.hadoop:hadoop-client:jar:2.6.0-cdh5.7.0 in alimaven (http://maven.aliyun.com/nexus/content/groups/public/) -> [Help 1]

表明在现有的maven仓库中,找不到cdh版本的jar包。所以,我们得在pom.xml中的repositories中添加cdh的仓库地址。

编辑pom.xml文件,在repositories标签下的maven central仓库后面添加cloudera的仓库。

1
2
3
4
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>

然后再执行刚才的命令,经过12分钟,spark-2.2.3-bin-2.6.0-cdh5.7.0.tgz安装包构建成功。

1
2
3
4
5
6
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 12:08 min (Wall Clock)
[INFO] Finished at: 2019-02-10T11:59:57+08:00
[INFO] ------------------------------------------------------------------------

解压spark-2.2.3-bin-2.6.0-cdh5.7.0.tgz包到上级spark目录

1
2
cd ..
tar -zxvf spark-2.2.3/spark-2.2.3-bin-2.6.0-cdh5.7.0.tgz -C .

设置环境变量

1
vi ~/.bash_profile

添加内容

1
export SPARK_HOME=/usr/local/spark/spark-2.2.3-bin-2.6.0-cdh5.7.0

使设置生效

1
source ~/.bash_profile

启动Spark

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
cd $SPARK_HOME
bin/spark-shell --master local[*]

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/02/10 12:13:43 WARN Utils: Your hostname, localhost resolves to a loopback address: 127.0.0.1; using 192.168.1.6 instead (on interface en0)
19/02/10 12:13:43 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
19/02/10 12:13:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://192.168.1.6:4040
Spark context available as 'sc' (master = local[*], app id = local-1549772024897).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.2.3
/_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

Spark成功启动。

多任务可以由多进程完成,也可以由一个进程内的多线程完成。

我们前面提到了进程是由若干线程组成的,一个进程至少有一个线程。

由于线程是操作系统直接支持的执行单元,因此,高级语言通常都内置多线程的支持,Python也不例外,并且,Python的线程是真正的Posix Thread,而不是模拟出来的线程。

Python的标准库提供了两个模块:_thread和threading,_thread是低级模块,threading是高级模块,对_thread进行了封装。绝大多数情况下,我们只需要使用threading这个高级模块。

启动一个线程就是把一个函数传入并创建Thread实例,然后调用start()开始执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import time, threading

# 新线程执行的代码:
def loop():
print('thread %s is running...' % threading.current_thread().name)
n = 0
while n < 5:
n = n + 1
print('thread %s >>> %s' % (threading.current_thread().name, n))
time.sleep(1)
print('thread %s ended.' % threading.current_thread().name)

print('thread %s is running...' % threading.current_thread().name)
t = threading.Thread(target=loop, name='LoopThread')
t.start()
t.join()
print('thread %s ended.' % threading.current_thread().name)

执行结果如下:

1
2
3
4
5
6
7
8
9
thread MainThread is running...
thread LoopThread is running...
thread LoopThread >>> 1
thread LoopThread >>> 2
thread LoopThread >>> 3
thread LoopThread >>> 4
thread LoopThread >>> 5
thread LoopThread ended.
thread MainThread ended.

由于任何进程默认就会启动一个线程,我们把该线程称为主线程,主线程又可以启动新的线程,Python的threading模块有个current_thread()函数,它永远返回当前线程的实例。主线程实例的名字叫MainThread,子线程的名字在创建时指定,我们用LoopThread命名子线程。名字仅仅在打印时用来显示,完全没有其他意义,如果不起名字Python就自动给线程命名为Thread-1,Thread-2…

Lock

多线程和多进程最大的不同在于,多进程中,同一个变量,各自有一份拷贝存在于每个进程中,互不影响,而多线程中,所有变量都由所有线程共享,所以,任何一个变量都可以被任何一个线程修改,因此,线程之间共享数据最大的危险在于多个线程同时改一个变量,把内容给改乱了。

来看看多个线程同时操作一个变量怎么把内容给改乱了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import time, threading

# 假定这是你的银行存款:
balance = 0

def change_it(n):
# 先存后取,结果应该为0:
global balance
balance = balance + n
balance = balance - n

def run_thread(n):
for i in range(100000):
change_it(n)

t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)

我们定义了一个共享变量balance,初始值为0,并且启动两个线程,先存后取,理论上结果应该为0,但是,由于线程的调度是由操作系统决定的,当t1、t2交替执行时,只要循环次数足够多,balance的结果就不一定是0了。

原因是因为高级语言的一条语句在CPU执行时是若干条语句,即使一个简单的计算:

balance = balance + n
也分两步:

计算balance + n,存入临时变量中;
将临时变量的值赋给balance。
也就是可以看成:

x = balance + n
balance = x
由于x是局部变量,两个线程各自都有自己的x,当代码正常执行时:

初始值 balance = 0

t1: x1 = balance + 5 # x1 = 0 + 5 = 5
t1: balance = x1 # balance = 5
t1: x1 = balance - 5 # x1 = 5 - 5 = 0
t1: balance = x1 # balance = 0

t2: x2 = balance + 8 # x2 = 0 + 8 = 8
t2: balance = x2 # balance = 8
t2: x2 = balance - 8 # x2 = 8 - 8 = 0
t2: balance = x2 # balance = 0

结果 balance = 0
但是t1和t2是交替运行的,如果操作系统以下面的顺序执行t1、t2:

初始值 balance = 0

t1: x1 = balance + 5 # x1 = 0 + 5 = 5

t2: x2 = balance + 8 # x2 = 0 + 8 = 8
t2: balance = x2 # balance = 8

t1: balance = x1 # balance = 5
t1: x1 = balance - 5 # x1 = 5 - 5 = 0
t1: balance = x1 # balance = 0

t2: x2 = balance - 8 # x2 = 0 - 8 = -8
t2: balance = x2 # balance = -8

结果 balance = -8
究其原因,是因为修改balance需要多条语句,而执行这几条语句时,线程可能中断,从而导致多个线程把同一个对象的内容改乱了。

两个线程同时一存一取,就可能导致余额不对,你肯定不希望你的银行存款莫名其妙地变成了负数,所以,我们必须确保一个线程在修改balance的时候,别的线程一定不能改。

如果我们要确保balance计算正确,就要给change_it()上一把锁,当某个线程开始执行change_it()时,我们说,该线程因为获得了锁,因此其他线程不能同时执行change_it(),只能等待,直到锁被释放后,获得该锁以后才能改。由于锁只有一个,无论多少线程,同一时刻最多只有一个线程持有该锁,所以,不会造成修改的冲突。创建一个锁就是通过threading.Lock()来实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
balance = 0
lock = threading.Lock()

def run_thread(n):
for i in range(100000):
# 先要获取锁:
lock.acquire()
try:
# 放心地改吧:
change_it(n)
finally:
# 改完了一定要释放锁:
lock.release()

当多个线程同时执行lock.acquire()时,只有一个线程能成功地获取锁,然后继续执行代码,其他线程就继续等待直到获得锁为止。

获得锁的线程用完后一定要释放锁,否则那些苦苦等待锁的线程将永远等待下去,成为死线程。所以我们用try…finally来确保锁一定会被释放。

锁的好处就是确保了某段关键代码只能由一个线程从头到尾完整地执行,坏处当然也很多,首先是阻止了多线程并发执行,包含锁的某段代码实际上只能以单线程模式执行,效率就大大地下降了。其次,由于可以存在多个锁,不同的线程持有不同的锁,并试图获取对方持有的锁时,可能会造成死锁,导致多个线程全部挂起,既不能执行,也无法结束,只能靠操作系统强制终止。

多核CPU
如果你不幸拥有一个多核CPU,你肯定在想,多核应该可以同时执行多个线程。

如果写一个死循环的话,会出现什么情况呢?

打开Mac OS X的Activity Monitor,或者Windows的Task Manager,都可以监控某个进程的CPU使用率。

我们可以监控到一个死循环线程会100%占用一个CPU。

如果有两个死循环线程,在多核CPU中,可以监控到会占用200%的CPU,也就是占用两个CPU核心。

要想把N核CPU的核心全部跑满,就必须启动N个死循环线程。

试试用Python写个死循环:

1
2
3
4
5
6
7
8
9
10
import threading, multiprocessing

def loop():
x = 0
while True:
x = x ^ 1

for i in range(multiprocessing.cpu_count()):
t = threading.Thread(target=loop)
t.start()

启动与CPU核心数量相同的N个线程,在4核CPU上可以监控到CPU占用率仅有102%,也就是仅使用了一核。

但是用C、C++或Java来改写相同的死循环,直接可以把全部核心跑满,4核就跑到400%,8核就跑到800%,为什么Python不行呢?

因为Python的线程虽然是真正的线程,但解释器执行代码时,有一个GIL锁:Global Interpreter Lock,任何Python线程执行前,必须先获得GIL锁,然后,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。这个GIL全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在Python中只能交替执行,即使100个线程跑在100核CPU上,也只能用到1个核。

GIL是Python解释器设计的历史遗留问题,通常我们用的解释器是官方实现的CPython,要真正利用多核,除非重写一个不带GIL的解释器。

所以,在Python中,可以使用多线程,但不要指望能有效利用多核。如果一定要通过多线程利用多核,那只能通过C扩展来实现,不过这样就失去了Python简单易用的特点。

不过,也不用过于担心,Python虽然不能利用多线程实现多核任务,但可以通过多进程实现多核任务。多个Python进程有各自独立的GIL锁,互不影响。

小结

多线程编程,模型复杂,容易发生冲突,必须用锁加以隔离,同时,又要小心死锁的发生。

Python解释器由于设计时有GIL全局锁,导致了多线程无法利用多核。