6.1 Spark 与 R 语言

6.1.1 sparklyr

Spark 依赖特定版本的 Java、Hadoop,三者之间的版本应该要相融。

在 MacOS 上配置 Java 环境,注意 Spark 仅支持 Java 8 至 11,所以安装指定版本的 Java 开发环境

# 安装 openjdk 11
brew install openjdk@11
# 全局设置 JDK 11
sudo ln -sfn /usr/local/opt/openjdk@11/libexec/openjdk.jdk /Library/Java/JavaVirtualMachines/openjdk-11.jdk
# Java 11 JDK 添加到 .zshrc 
export CPPFLAGS="-I/usr/local/opt/openjdk@11/include"
export PATH="/usr/local/opt/openjdk@11/bin:$PATH"

配置 R 环境,让 R 能够识别 Java 环境,再安装 rJava

# 配置
sudo R CMD javareconf
# 安装 rJava
Rscript -e 'install.packages("rJava", type="soure")'

最后安装 sparklyr 包,以及 Spark 环境,可以借助 spark_install() 安装 Spark,比如下面 Spark 3.0 连同 hadoop 2.7 一起安装。

install.packages('sparklyr')
sparklyr::spark_install(version = '3.0', hadoop_version = '2.7')

也可以先手动下载 Spark 软件环境,建议选择就近镜像站点下载,比如在北京选择清华站点 https://mirrors.tuna.tsinghua.edu.cn/apache/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz,此环境自带 R 和 Python 接口。为了供 sparklyr 调用,先设置 SPARK_HOME 环境变量指向 Spark 安装位置,再连接单机版 Spark。

# 连接 Spark 
library(sparklyr)
library(ggplot2)
sc <- spark_connect(master = "local", spark_home = Sys.getenv("SPARK_HOME"))
# diamonds 数据集导入 Spark
diamonds_tbl <- copy_to(sc, ggplot2::diamonds, "diamonds")

做数据的聚合统计,有两种方式。一种是使用用 R 包 dplyr 提供的数据操作语法,下面以按 cut 分组统计钻石的数量为例,说明 dplyr 提供的数据操作方式。

library(dplyr)
# 列出数据源下所有的表 tbls
src_tbls(sc)

diamonds_tbl <- diamonds_tbl %>%
  group_by(cut) %>%
  summarise(cnt = n()) %>%
  collect

另一种是使用结构化查询语言 SQL,这自不必说,大多数情况下,使用和一般的 SQL 没什么两样。

library(DBI)
diamonds_preview <- dbGetQuery(sc, "SELECT count(*) as cnt, cut FROM diamonds GROUP BY cut")
diamonds_preview
##     cnt       cut
## 1 21551     Ideal
## 2 13791   Premium
## 3  4906      Good
## 4  1610      Fair
## 5 12082 Very Good
# SQL 中的 AVG 和 R 中的 mean 函数是类似的
diamonds_price <- dbGetQuery(sc, "SELECT AVG(price) as mean_price, cut FROM diamonds GROUP BY cut")
diamonds_price
##   mean_price       cut
## 1   3457.542     Ideal
## 2   4584.258   Premium
## 3   3928.864      Good
## 4   4358.758      Fair
## 5   3981.760 Very Good

 

library(data.table)
diamonds <- as.data.table(diamonds)
diamonds[,.(mean_price = mean(price)), by = .(cut)]
##          cut mean_price
## 1:     Ideal   3457.542
## 2:   Premium   4584.258
## 3:      Good   3928.864
## 4: Very Good   3981.760
## 5:      Fair   4358.758

将结果数据用 ggplot2 呈现出来

library(ggplot2)
ggplot(diamonds_preview, aes(cut, cnt)) +
  geom_col() +
  theme_minimal()

diamonds 数据集总共 53940 条数据,下面用 BUCKET 分桶抽样,将原数据随机分成 1000 个桶,取其中的一个桶,由于是随机分桶,所以每次的结果都不一样,解释详见https://spark.apache.org/docs/latest/sql-ref-syntax-qry-select-sampling.html

diamonds_sample <- dbGetQuery(sc, "SELECT * FROM diamonds TABLESAMPLE (BUCKET 1 OUT OF 1000) LIMIT 6")
diamonds_sample
##   carat       cut color clarity depth table price    x    y    z
## 1  0.91 Very Good     F     SI1  63.1  60.0  3007 6.13 6.10 3.86
## 2  0.76     Ideal     D     VS2  60.2  56.0  3352 5.92 5.95 3.57
## 3  1.02      Fair     F     SI2  66.6  58.0  3633 6.21 6.14 4.11
## 4  0.85     Ideal     G     SI1  62.2  56.0  3670 6.04 6.09 3.77
## 5  1.23 Very Good     H      I1  61.9  60.6  3835 6.79 6.85 4.22
## 6  1.07   Premium     H     SI2  62.2  59.0  4119 6.47 6.53 4.04

将抽样的结果用窗口函数 RANK() 排序,详见 https://spark.apache.org/docs/latest/sql-ref-syntax-qry-select-window.html

窗口函数 https://www.cnblogs.com/ZackSun/p/9713435.html

diamonds_rank <- dbGetQuery(sc, "
  SELECT cut, price, RANK() OVER (PARTITION BY cut ORDER BY price) AS rank 
  FROM diamonds TABLESAMPLE (BUCKET 1 OUT OF 1000)
  LIMIT 6
")
diamonds_rank
##    cut price rank
## 1 Fair  1040    1
## 2 Fair  5919    2
## 3 Fair  7950    3
## 4 Fair 16280    4
## 5 Good   593    1
## 6 Good   638    2

LATERAL VIEW 把一列拆成多行

https://liam.page/2020/03/09/LATERAL-VIEW-in-Hive-SQL/ https://spark.apache.org/docs/latest/sql-ref-syntax-qry-select-lateral-view.html

创建数据集

# 先删除存在的表 person
dbGetQuery(sc, "DROP TABLE IF EXISTS person")
# 创建表 person
dbGetQuery(sc, "CREATE TABLE IF NOT EXISTS person (id INT, name STRING, age INT, class INT, address STRING)")
# 插入数据到表 person
dbGetQuery(sc, "
INSERT INTO person VALUES
    (100, 'John', 30, 1, 'Street 1'),
    (200, 'Mary', NULL, 1, 'Street 2'),
    (300, 'Mike', 80, 3, 'Street 3'),
    (400, 'Dan', 50, 4, 'Street 4')
")

查看数据集

dbGetQuery(sc,"SELECT * FROM person")
##    id name age class  address
## 1 300 Mike  80     3 Street 3
## 2 400  Dan  50     4 Street 4
## 3 100 John  30     1 Street 1
## 4 200 Mary  NA     1 Street 2

行列转换 https://www.cnblogs.com/kimbo/p/6208973.html

LATERAL VIEW 展开

dbGetQuery(sc,"
SELECT * FROM person
    LATERAL VIEW EXPLODE(ARRAY(30, 60)) tabelName AS c_age
    LATERAL VIEW EXPLODE(ARRAY(40, 80)) AS d_age
LIMIT 6
")
##    id name age class  address c_age d_age
## 1 300 Mike  80     3 Street 3    30    40
## 2 300 Mike  80     3 Street 3    30    80
## 3 300 Mike  80     3 Street 3    60    40
## 4 300 Mike  80     3 Street 3    60    80
## 5 400  Dan  50     4 Street 4    30    40
## 6 400  Dan  50     4 Street 4    30    80

日期相关的函数 https://spark.apache.org/docs/latest/sql-ref-functions-builtin.html#date-and-timestamp-functions

# 今天
dbGetQuery(sc, "select current_date")
##   current_date()
## 1     2021-05-11
# 昨天
dbGetQuery(sc, "select date_sub(current_date, 1)")
##   date_sub(current_date(), 1)
## 1                  2021-05-10
# 本月最后一天 current_date 所属月份的最后一天
dbGetQuery(sc, "select last_day(current_date)")
##   last_day(current_date())
## 1               2021-05-31
# 星期几
dbGetQuery(sc, "select dayofweek(current_date)")
##   dayofweek(current_date())
## 1                         3

最后,使用完记得关闭 Spark 连接

spark_disconnect(sc)

6.1.2 SparkR

考虑到和第6.1.1节的重合性,以及 sparklyr 的优势,本节代码都不会执行,仅作为补充信息予以描述。完整的介绍见 SparkR 包

if (nchar(Sys.getenv("SPARK_HOME")) < 1) {
  Sys.setenv(SPARK_HOME = "/opt/spark/spark-3.0.1-bin-hadoop2.7")
}
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
sparkR.session(master = "local[*]", sparkConfig = list(spark.driver.memory = "2g"))

sparkConfig 有哪些参数可以传递

Property Name Property group spark-submit equivalent
spark.master Application Properties --master
spark.kerberos.keytab Application Properties --keytab
spark.kerberos.principal Application Properties --principal
spark.driver.memory Application Properties --driver-memory
spark.driver.extraClassPath Runtime Environment --driver-class-path
spark.driver.extraJavaOptions Runtime Environment --driver-java-options
spark.driver.extraLibraryPath Runtime Environment --driver-library-path

将 data.frame 转化为 SparkDataFrame

faithful_sdf <- as.DataFrame(faithful)

SparkDataFrame

head(faithful_sdf)

查看结构

str(faithful_sdf)