程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
您现在的位置: 程式師世界 >> 編程語言 >  >> 更多編程語言 >> Python

PySpark - python package的測試

編輯:Python

Pyspark python package的測試

PySpark - Python Package Management

PySpark提供了一種將python環境剝離到鏡像之外的一種方式,眾所周知,在docker的優化方案中,減小鏡像的體積能夠節省資源、提高效率,這是一種“極大程度上”能夠優化鏡像的方式。

在這篇博文中,我主要使用conda打包python的方式測了一下在k8s集群上的client/cluster模式提交任務

環境准備

  • k8s 集群
  • NFS - PVC(需要結合k8s配置nfs pvc)
  • 一個可提交spark任務的鏡像

啟動容器

  1. 用能夠提交spark任務的鏡像啟動一個容器
  2. 進入容器,Do Something(可以在容器中提前用conda打包所需的python)

1. conda打包環境

  1. 安裝conda
  2. 用conda安裝所需python環境
# python=XXX, XXX代表指定python版本
conda create -y -n pyspark_conda_env -c conda-forge pyarrow pandas conda-pack python=XXX
conda activate pyspark_conda_env
conda pack -f -o pyspark_conda_env.tar.gz

2. 測試使用的代碼:app.py

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import SparkSession
def main(spark):
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
return v.mean()
print(df.groupby("id").agg(mean_udf(df['v'])).collect())
if __name__ == "__main__":
main(SparkSession.builder.getOrCreate())

client mode

運行命令

在client模式下,PYSPARK_DRIVER_PYTHONPYSPARK_PYTHON 都要設置

  1. PYSPARK_PYTHON: 通過--archives上傳的pyspark_conda_env.tar.gz在pod中解壓之後的python路徑
  2. PYSPARK_DRIVER_PYTHON: pyspark_conda_env.tar.gz在容器中解壓之後的python路徑
  3. 由於client模式下,PYSPARK_DRIVER_PYTHON指定的python路徑只被driver端使用,所以只需掛載到當前啟動的容器中就行,不需要在其他節點也存在
# 解壓!
# [email protected]:/ppml/trusted-big-data-ml# tar -zxvf pyspark_conda_env.tar.gz -C pyspark_conda_env
export PYSPARK_DRIVER_PYTHON=/ppml/trusted-big-data-ml/pyspark_conda_env/bin/python # Do not set in cluster modes.
export PYSPARK_PYTHON=./pyspark_conda_env/bin/python
# 提交Spark命令
${SPARK_HOME}/bin/spark-submit \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=${RUNTIME_K8S_SERVICE_ACCOUNT} \
--deploy-mode client \
--conf spark.driver.host=${LOCAL_HOST} \
--master ${RUNTIME_SPARK_MASTER} \
--conf spark.kubernetes.executor.podTemplateFile=/ppml/trusted-big-data-ml/spark-executor-template.yaml \
--conf spark.kubernetes.container.image=${RUNTIME_K8S_SPARK_IMAGE} \
--conf spark.kubernetes.executor.deleteOnTermination=false \
--archives ./pyspark_conda_env.tar.gz#pyspark_conda_env \
local:///ppml/trusted-big-data-ml/app.py

執行分析

# hello

cluster mode

運行命令

在cluster模式下,只需要設置PYSPARK_PYTHON

  1. PYSPARK_PYTHON: pyspark_conda_env.tar.gz解壓到pod中的python路徑
  2. 通過conda打包的pyspark_conda_env.tar.gz,通過spark.kubernetes.file.upload.path指定的共享文件系統傳入到Driver
export PYSPARK_PYTHON=./pyspark_conda_env/bin/python
# 提交Spark命令
${SPARK_HOME}/bin/spark-submit \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=${RUNTIME_K8S_SERVICE_ACCOUNT} \
--deploy-mode cluster \
--master ${RUNTIME_SPARK_MASTER} \
--conf spark.kubernetes.executor.podTemplateFile=/ppml/trusted-big-data-ml/spark-executor-template.yaml \
--conf spark.kubernetes.driver.podTemplateFile=/ppml/trusted-big-data-ml/spark-driver-template.yaml \
--conf spark.kubernetes.container.image=${RUNTIME_K8S_SPARK_IMAGE} \
--conf spark.kubernetes.executor.deleteOnTermination=false \
--conf spark.kubernetes.file.upload.path=/ppml/trusted-big-data-ml/work/data/shaojie \
--archives ./pyspark_conda_env.tar.gz#pyspark_conda_env \
local:///ppml/trusted-big-data-ml/work/data/shaojie/app.py

通過跑cluster模式的任務,趁機來看一下spark.kubernetes.file.upload.path的作用。在這裡,--archives意為通過指定逗號分割的tar,jar,zip等一系列依賴包,將會被解壓到executor

執行分析

22-07-29 02:05:58 INFO SparkContext:57 - Added archive file:/ppml/trusted-big-data-ml/work/data/shaojie/spark-upload-06550aaa-76f6-4f6e-a123-d34c71dbce5c/pyspark_conda_env.tar.gz#pyspark_conda_env at spark://app-py-36a4238247b3d72e-driver-svc.default.svc:7078/files/pyspark_conda_env.tar.gz with timestamp 1659060357319
22-07-29 02:05:58 INFO Utils:57 - Copying /ppml/trusted-big-data-ml/work/data/shaojie/spark-upload-06550aaa-76f6-4f6e-a123-d34c71dbce5c/pyspark_conda_env.tar.gz to /tmp/spark-6c702f67-0531-49f8-9656-48d4e110eea1/pyspark_conda_env.tar.gz
INFO fork chmod is forbidden !!!/tmp/spark-6c702f67-0531-49f8-9656-48d4e110eea1/pyspark_conda_env.tar.gz
22-07-29 02:05:58 INFO SparkContext:57 - Unpacking an archive file:/ppml/trusted-big-data-ml/work/data/shaojie/spark-upload-06550aaa-76f6-4f6e-a123-d34c71dbce5c/pyspark_conda_env.tar.gz#pyspark_conda_env from /tmp/spark-6c702f67-0531-49f8-9656-48d4e110eea1/pyspark_conda_env.tar.gz to /var/data/spark-3024b9ad-8e4d-4b2a-b51a-aee8f54d5a46/spark-8acb95d2-599d-4e2f-8203-c1f3455c4c7f/userFiles-994a18bf-12cd-4d98-b3e9-1035f741fe67/pyspark_conda_en
  1. SparkContext先索取spark.kubernetes.file.upload.path路徑中所有被上傳的archives包,添加到driver中。[spark.kubernetes.file.upload.path所指定的路徑一定要是可被共享訪問的文件系統: HDFS, NFS等]
  2. 拷貝共享路徑中的包到Driver中的路徑
  3. 解壓Driver中拷貝過來的archives包到指定位置

questions

  1. client去提交archives的時候,不需要指定spark.kubernetes.file.upload.path的嘛?
  2. cluster提交的archives主要是給啟動的driver使用?
  3. spark.kubernetes.file.upload.path文檔上認為這個配置的值應該是一個遠程存儲

  1. 上一篇文章:
  2. 下一篇文章:
Copyright © 程式師世界 All Rights Reserved