0%

如何安装Anaconda及基本的使用请参考 macOS下Anaconda的安装及环境切换(Python2/Python3)

我在安装Anaconda的时候,软件自带的版本是Python3.6的,如果我现在需要使用Python2.7或者Python3.7,应该怎么操作呢?

如果我们在Anaconda Navigator控制台上做Create操作,环境是可以成功创建,但环境里面携带的包是非常少的。

所以可以参考如下方式进行Python版本修改。

1
conda create -n python37 python=3.7 anaconda

如果conda的版本比较旧的话,会出现如下提示:

1
2
3
4
5
6
7
==> WARNING: A newer version of conda exists. <==
current version: 4.6.2
latest version: 4.6.7

Please update conda by running

$ conda update -n base -c defaults conda

所以我们可以先执行命令

1
conda update -n base -c defaults conda

然后再执行conda create命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
conda create -n python37 python=3.7 anaconda
Collecting package metadata: done
Solving environment: done

## Package Plan ##

environment location: /Users/simon/anaconda3/envs/python37

added / updated specs:
- anaconda
- python=3.7


The following packages will be downloaded:

package | build
---------------------------|-----------------
mkl-2019.1 | 144 154.4 MB
------------------------------------------------------------
Total: 154.4 MB

The following NEW packages will be INSTALLED:

alabaster pkgs/main/osx-64::alabaster-0.7.12-py37_0
anaconda pkgs/main/osx-64::anaconda-2018.12-py37_0
...
...
Proceed ([y]/n)? y


Downloading and Extracting Packages
mkl-2019.1 | 154.4 MB | ###################################################################################################### | 100%
Preparing transaction: done
Verifying transaction: done
Executing transaction: done
#
# To activate this environment, use:
# > source activate python37
#
# To deactivate an active environment, use:
# > source deactivate
#

按照上面的提示,我们可以执行如下命令激活或失效某个python环境。

  • 激活
1
source activate python37
  • 失效
1
source deactivate

方便快速从不同的数据源(json, parquet, rdbms),经过混合处理(json join parquet)
再将处理结果以特定的格式(json, parquet)写回到指定的系统中(HDFS, S3)

spark.read.format(format)

format
built-in json, parquet jdbc, csv(2.x)
packages: 外部的非SPark内置 spark-packages.org

spark.write

操作parquet

ParquetApp

main

spark = SparkSession

df = spark.read.format(“parquet”).load(users.parquet”)
df.printSchema
df.select*

new_df.write.format(“json”).save(new_path)

默认处理parquet数据

spark-sql

create temporary view parquetTable
USING…
Options(
path: fsafsdfsafdsf
)

select * from parquettable

spark.read.format().option(path).load()

处理Hive

Df写入到hive
write.writeAsTable()

设置分区的数量,默认200

处理MySQL(jdbc)

当我们把Spark Streaming程序开发好以后,放到测试环境测试时,如何通过Maven打包呢?

在这篇文章中,我将介绍两种Maven的build方式。

推荐使用第二种方式。

所有依赖打成一个jar包

通过这种方式,把所有的依赖都打成一个大的jar包。

  • 优点

通过spark-submit执行程序时,简单方便,不需要指定jars参数。

  • 缺点

依赖多的时候,有时候包会有几百M,在网络传输过程时效率不高。

pom.xml文件的build节点需要添加如下两个Maven插件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass></mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id> <!-- this is used for inheritance merges -->
<phase>package</phase> <!-- bind to the packaging phase -->
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>

使用org.scala-tools:maven-scala-plugin插件有个好处是,如果项目里面有src/main/javasrc/main/scala两种语言混编时,可以成功编译源码。

1
2
[INFO] /Users/simon/Development/workspace/scala/spark-streaming-demo/src/main/java:-1: info: compiling
[INFO] /Users/simon/Development/workspace/scala/spark-streaming-demo/src/main/scala:-1: info: compiling

执行Spark任务命令:

1
2
3
bin/spark-submit --master local[2] \
--class gy.finolo.spark.project.StreamingApp \
../data/app/spark-streaming-demo-1.0-SNAPSHOT.jar localhost:9092 streamingtopic

打成一个小jar和lib依赖文件夹

  • 优点

Spark Streaming源码被修改后,打成小包就可以上传服务器,传输流量小,速度快。

  • 缺点

提交任务时,需要指定jars参数

pom.xml文件的build节点需要添加如下两个Maven插件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.10</version>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
<excludeScope>provided</excludeScope> <!-- provided scope的依赖不需要放入到lib中 -->
</configuration>
</execution>
</executions>
</plugin>
</plugins>

执行Spark任务命令,需要指定jars参数,注意:这里不是lib的地址。

1
2
3
4
bin/spark-submit --master local[2] \
--class gy.finolo.spark.project.StreamingApp \
--jars $(echo ../data/app/lib/*.jar | tr ' ' ',') \
../data/lib/spark-streaming-demo-1.0-SNAPSHOT.jar localhost:9092 streamingtopic

$(echo ../data/app/lib/*.jar | tr ' ' ',')代码的意思是把jar包名以逗号分隔符拼接成一个字符串。