Spark编程入门(一) QuickStart

Spark 基本概念

在实际应用中,大数据处理主要包括:

  • 复杂的批量数据处理(数十分钟 - 几小时)
  • 基于历史数据的交互式查询(数十秒 - 几分钟)
  • 基于实时流的数据处理 (数百毫秒 - 几秒)

Spark 的设计遵循“一个软件栈满足不同的应用场景”,有一套完整的生态系统。包括内存计算框架、SQL即时查询、实时流式计算、机器学习和图计算等。Spark可以部署在 YARN 资源管理器上,提供一站式的大数据解决方案。

  • RDD:弹性分布式数据集(Resilient Distributed Dataset),分布式内存的一个抽象概念,提供了一种高度受限的内存模型。
  • DAG:有向无环图(Directed Acyclic Graph, DAG),反映RDD之间的依赖关系。
  • Executor:运行在工作节点(Worker Node)上的一个进程,负责运行任务,为应用程序存储数据。
  • 应用:用户编写的 Spark 程序
  • 任务:运行在 Executor 上的工作单元
  • 作业:一个作业包含多个RDD以及作用域相应RDD的各种操作
  • 阶段:是作业的基本调度单位,一个作业分为多组任务,每组任务被称为“阶段”,或者“任务集”。

在 Spark 中,一个应用(Application)由一个任务控制节点(Driver)和若干个作业(Job)构成,一个作业由多个阶段(Stage)构成,一个阶段由多个任务(Task)组成。


Spark 生态系统组件

  • Spark Core:内存计算、任务调度、部署模式、故障恢复、存储管理等,主要面向批数据处理
  • Spark SQL:允许开发者直接处理RDD,也可查询 Hive、Hbase等外部数据源。统一处理数据关系表和RDD,开发者无需自己编写Spark程序,即可用 SQL 命令查询。
  • Spark Streaming: 实时流处理。支持多种数据输入源,如 Kafka、Flume、HDFS 或 TCP套接字。
  • MLlib(机器学习):提供机器学习算法实现,包括聚类、分类、回归、协同过滤。
  • GraphX(图计算)

无论哪个组件,都可以用 Core 的API处理问题。


Spark 的安装

在 Spark 官网下载 tgz 压缩包

1
wget http://mirrors.tuna.tsinghua.edu.cn/apache/spark/spark-2.3.1/spark-2.3.1-bin-hadoop2.7.tgz
  • 注:以上是 2.3.1 版本的清华大学镜像,最新版本在 spark 官网

将 spark 解压到当前目录

1
tar -zxvf spark-2.3.1-bin-hadoop2.7.tgz -C .

Spark 三种部署方式

  • standalone
  • Spark on Mesos(官方推荐)
  • Spark on YARN

使用 Spark Shell 进行交互式编程

Spark 官方提供了命令行交互式编程,也就是一边输入一边输出。

安装完 spark 后, 运行 ./bin/spark-shell 即可启动 scala 语言的 spark shell, 运行 ./bin/pyspark即可启动 Python 语言的 spark shell (pyspark)。


独立 spark 应用程序

但是我们一般都是在 IDE 里编写独立的应用程序(Self-Contained Applications),再部署打包运行。

我这里根据官方示例,使用 IDEA,编写一个统计文本中包含字母 ‘a’ 的行数和包含字母 ‘b’的行数的 spark 应用程序。

新建 Maven 工程

首先新建一个 Maven 工程,然后在 pom.xml 中添加依赖

1
2
3
4
5
6
7
8
9
10
11
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>

<dependencies>
<dependency> <!-- Spark dependency -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.3.0</version>
</dependency>
  • properties标签里的内容目的是让我们的工程基于 jdk 1.8
  • dependencies标签里的内容表示依赖,需要哪些依赖一般 Spark 官方文档 在相应的地方都会有标注,然后可以到MVN仓库去搜,上面会告诉你对应版本的dependency该怎么写。

编写java代码

SimpleApp.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/* SimpleApp.java */
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;

public class SimpleApp {
public static void main(String[] args) {
// 定义文件路径 (可以说 file:// 也可以是 hdfs://)
String logFile = "file:///home/jerrysheh/spark-2.3.0/README.md";

//定义 spark 会话
SparkSession spark = SparkSession.builder().appName("Simple Application").getOrCreate();

// 定义数据集( spark读 logfile 文本文件,缓存)
Dataset<String> logData = spark.read().textFile(logFile).cache();

// 数据集过滤,包含 a / b 的就进行统计
long numAs = logData.filter((FilterFunction<String>) s -> s.contains("a")).count();
long numBs = logData.filter((FilterFunction<String>) s -> s.contains("b")).count();

//输出
System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);

spark.stop();
}
}

配置运行环境

点击edit configuration,在左侧点击该项目。在右侧 VM options中 输入-Dspark.master=local,指示本程序本地单线程运行。

spark 支持的 master URL 有:

  • local 本地单线程
  • local[K] 本地多线程(指定K个内核)
  • local[*] 本地多线程(指定所有可用内核)
  • spark://HOST:PORT 连接到指定的 Spark standalone cluster master,需要指定端口。
  • mesos://HOST:PORT 连接到指定的 Mesos 集群,需要指定端口。
  • yarn-client 客户端模式 连接到 YARN 集群。需要配置 HADOOP_CONF_DIR。
  • yarn-cluster 集群模式 连接到 YARN 集群。需要配置 HADOOP_CONF_DIR。

配置完以后,就可以直接在 IDEA 运行,开发阶段不必打包后在命令行运行。

打包

使用 mvn 命令打包

安装 maven

1
sudo apt install maven

打包 jar

1
mvn package

这样 out 目录就生成了 一个 jar 文件,这就是我们的 spark 应用程序了。

使用 IDEA 打包

用 maven 打包的 jar 非常大,因为把很多依赖都打包进去了,我们可以用 IDEA 删除我们不需要的组件,这样打包出来的 jar 就很小了。

使用 IDEA 打包的方法可以参考:利用开发工具IntelliJ IDEA编写Spark应用程序

如果需要 sql 驱动,可以这样写:

1
2
3
4
5
6
7
8
9
10
// 从数据库读 DataFrame
def readFromMySQL(spark: SparkSession, tableName:String): DataFrame = {
val prop=new java.util.Properties
prop.setProperty("driver", "com.mysql.cj.jdbc.Driver")
prop.setProperty("user","root")
prop.setProperty("password","YOURPASSWORD")

val df = spark.read.jdbc(jdbcURL, tableName, prop)
df
}

这样当提交到 spark-submit 的时候会读取你的驱动,否则报 Driver not found

运行

在 spark 安装目录下

1
./bin/spark-submit --class "SimpleApp" --master local[4] simple-project.jar

输出:

1
2
3
...
...
Lines with a: 46, Lines with b: 23

这样,一个简单的 spark 应用程序就运行成功了。

只输出 WARN 和 ERROR 不输出 INFO

把 SPARK_HOME/conf 下的 log4j.properties.template 复制到 工程 Resource Root 下面(我这里是 Scala 包),重命名为 log4j.properties 。 打开,修改其中的

1
log4j.rootCategory=INFO, console

将 INFO 改成 WARN 或者 ERROR 即可