This post records how to quickly set up Apache Spark and Apache Sedona, covering environment configuration and a few simple examples.
Environment Setup

Prerequisites

- Java: JDK 8 or later
- Python: 3.7+
- Operating system: Linux
1. Install Java

```bash
sudo apt install openjdk-11-jdk
java -version
```
Set the JAVA_HOME environment variable:
```bash
which java
export JAVA_HOME=/path/to/java
export PATH=$JAVA_HOME/bin:$PATH
```
2. Install Spark

```bash
wget https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
tar -xzf spark-3.5.0-bin-hadoop3.tgz
mv spark-3.5.0-bin-hadoop3 ~/spark

export SPARK_HOME=~/spark
export PATH=$SPARK_HOME/bin:$PATH

spark-shell --version
```
3. Install PySpark for Python

```bash
pip install pyspark
# or pin it to the same version as the Spark installation
pip install pyspark==3.5.0
```
4. Install Apache Sedona

```bash
pip install apache-sedona
# or with the optional extras
pip install apache-sedona[extra]
```
5. Verify the Installation

Create a test script, test_spark.py:
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("test-spark") \
    .master("local[4]") \
    .getOrCreate()

df = spark.range(10)
df.show()
```
Run the test:
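For example, run it with a plain Python interpreter or with spark-submit (the filename matches the script created above):

```bash
python test_spark.py
# or
spark-submit test_spark.py
```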
6. Configure Sedona (Optional)

If you need spatial data processing, create test_sedona.py:
```python
from pyspark.sql import SparkSession

# The shaded artifact must match your Spark/Scala versions
# (here: Spark 3.5 / Scala 2.12, Sedona 1.5.1).
spark = SparkSession.builder \
    .appName("sedona-test") \
    .master("local[4]") \
    .config("spark.jars.packages", "org.apache.sedona:sedona-spark-shaded-3.5_2.12:1.5.1") \
    .config("spark.sql.extensions", "org.apache.sedona.sql.SedonaSqlExtensions") \
    .getOrCreate()

# Quick sanity check that the ST_* functions are registered
spark.sql("SELECT ST_Point(0.0, 0.0) AS geom").show()

print("✓ Sedona configured successfully!")
```
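Alternatively, recent versions of the Python package provide a SedonaContext helper that wires up the same configuration for you; a minimal sketch, assuming apache-sedona 1.5.x:

```python
from sedona.spark import SedonaContext

# Build a Sedona-enabled SparkSession and register the ST_* functions
config = SedonaContext.builder() \
    .appName("sedona-test") \
    .master("local[4]") \
    .config("spark.jars.packages",
            "org.apache.sedona:sedona-spark-shaded-3.5_2.12:1.5.1") \
    .getOrCreate()

sedona = SedonaContext.create(config)
sedona.sql("SELECT ST_Point(0.0, 0.0) AS geom").show()
```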
Quick Examples

Example 1: Basic DataFrame Operations

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg

spark = SparkSession.builder.appName("spark-demo").master("local[4]").getOrCreate()

data = [
    ("Alice", 2022, 50000),
    ("Bob", 2022, 60000),
    ("Alice", 2023, 55000),
    ("Bob", 2023, 65000),
]
columns = ["Name", "Year", "Attribute"]
df = spark.createDataFrame(data, columns)

df.show()
df.filter(col("Attribute") > 55000).show()
df.groupBy("Name").agg(avg("Attribute")).show()

result_pdf = df.toPandas()
print(result_pdf)
```
Output:
```text
+-----+----+---------+
| Name|Year|Attribute|
+-----+----+---------+
|Alice|2022|    50000|
|  Bob|2022|    60000|
|Alice|2023|    55000|
|  Bob|2023|    65000|
+-----+----+---------+

+-----+--------------+
| Name|avg(Attribute)|
+-----+--------------+
|Alice|       52500.0|
|  Bob|       62500.0|
+-----+--------------+

    Name  Year  Attribute
0  Alice  2022      50000
1    Bob  2022      60000
2  Alice  2023      55000
3    Bob  2023      65000
```

(The output of the filter call is omitted here.)
Example 2: Sedona Geospatial Query

```python
from pyspark.sql import SparkSession
from sedona.sql import st_constructors as stc

spark = SparkSession.builder \
    .appName("sedona-geo-demo") \
    .config("spark.jars.packages", "org.apache.sedona:sedona-spark-shaded-3.5_2.12:1.5.1") \
    .config("spark.sql.extensions", "org.apache.sedona.sql.SedonaSqlExtensions") \
    .getOrCreate()

cities_data = [
    ("北京", "POINT(116.4074 39.9042)"),
    ("上海", "POINT(121.4737 31.2304)"),
    ("深圳", "POINT(114.0579 22.5431)"),
    ("杭州", "POINT(120.1551 30.2741)"),
]
cities_df = spark.createDataFrame(cities_data, ["城市", "坐标"])
cities_df = cities_df.withColumn("geom", stc.ST_GeomFromText("坐标"))

# Bounding box: longitude 115 to 122, latitude 22 to 40
envelope = "POLYGON((115 22, 115 40, 122 40, 122 22, 115 22))"

cities_df.createOrReplaceTempView("cities")
spark.sql(f"CREATE OR REPLACE TEMP VIEW envelope AS SELECT ST_GeomFromText('{envelope}') AS geom")

result = spark.sql("""
    SELECT cities.城市, cities.坐标
    FROM cities, envelope
    WHERE ST_Contains(envelope.geom, cities.geom)
""")
result.show()
```
Output:
```text
+----+--------------------+
|城市|                坐标|
+----+--------------------+
|北京|POINT(116.4074 39...|
|上海|POINT(121.4737 31...|
|杭州|POINT(120.1551 30...|
+----+--------------------+
```

Note that 深圳 (longitude 114.06) lies outside the envelope's longitude range of 115 to 122, so ST_Contains filters it out.
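Building on the same temp view, a pairwise distance query is a natural follow-up. ST_Distance returns planar distances, here in degrees because the geometries are plain lon/lat; a sketch for illustration (the column aliases are arbitrary):

```python
# Pairwise planar distances between cities
pairs = spark.sql("""
    SELECT a.城市 AS from_city, b.城市 AS to_city,
           ST_Distance(a.geom, b.geom) AS distance_deg
    FROM cities a JOIN cities b
    ON a.城市 < b.城市
""")
pairs.show()
```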
Distributed Spark Cluster Deployment and Remote Jupyter Access

If you need to deploy a distributed Spark cluster and drive it from a remote Jupyter Notebook, this chapter provides a complete configuration walkthrough.
1. Cluster Deployment (Spark Standalone Mode)

1.1 Deploy on the Master Node (VM_A)

Download and install Spark:
```bash
sudo su - hadoop
cd ~

wget https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
tar -xzf spark-3.5.0-bin-hadoop3.tgz

sudo mv spark-3.5.0-bin-hadoop3 /opt/spark
sudo chown -R hadoop:hadoop /opt/spark
```
Configure the Spark Master:
Edit /opt/spark/conf/spark-env.sh:
```bash
cp /opt/spark/conf/spark-env.sh.template /opt/spark/conf/spark-env.sh

# Then append the following to spark-env.sh:
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
export SPARK_MASTER_HOST='192.168.1.100'
export SPARK_MASTER_PORT=7077
export SPARK_MASTER_WEBUI_PORT=8080
export SPARK_WORKER_CORES=4
export SPARK_WORKER_MEMORY=8g
export SPARK_LOCAL_IP=192.168.1.100
export HADOOP_CONF_DIR=/opt/hadoop/etc/hadoop
```
Configure the workers list (/opt/spark/conf/workers):
```text
192.168.1.100
192.168.1.101
192.168.1.102
```
1.2 Distribute the Installation to the Worker Nodes

Distribute Spark from the Master to all Workers:
```bash
# /opt/spark must already exist on each worker and be writable by the hadoop user
for worker in 192.168.1.101 192.168.1.102; do
    scp -r /opt/spark/* hadoop@$worker:/opt/spark/
done

# Spot-check one worker
ssh hadoop@192.168.1.101 "ls -la /opt/spark"
```

On each worker, also adjust SPARK_LOCAL_IP in spark-env.sh to that worker's own IP address.
1.3 Start the Cluster

Start it from the Master node:
```bash
/opt/spark/sbin/start-master.sh
/opt/spark/sbin/start-workers.sh

jps
```
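If everything started correctly, jps on the Master (which also runs a Worker here, since it is listed in conf/workers) should show something roughly like the following; the PIDs will differ:

```text
23456 Master
23489 Worker
23801 Jps
```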
Verify the cluster connection:
```bash
/opt/spark/bin/spark-shell --master spark://192.168.1.100:7077
```

In the Spark shell:

```scala
sc.parallelize(1 to 100).sum()   // should return 5050.0
:quit
```
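When you need to shut the cluster down later, the matching stop scripts live under the same sbin directory:

```bash
/opt/spark/sbin/stop-workers.sh
/opt/spark/sbin/stop-master.sh
```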
2. Jupyter Server Setup (Master Node)

2.1 Install Jupyter

```bash
pip3 install jupyter jupyterlab ipython pyspark pandas numpy matplotlib

# Generate a config file and set a login password
jupyter notebook --generate-config
jupyter notebook password
```
2.2 Configure Remote Access

Edit ~/.jupyter/jupyter_notebook_config.py:
```python
c.NotebookApp.ip = '0.0.0.0'
c.NotebookApp.port = 8888
c.NotebookApp.open_browser = False
c.NotebookApp.notebook_dir = '/home/hadoop/jupyter_work'
c.NotebookApp.token = ''
c.NotebookApp.disable_check_xsrf = True
c.NotebookApp.allow_remote_access = True
```
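Note: newer Jupyter releases (Notebook 7 / JupyterLab on jupyter-server) read c.ServerApp.* instead of c.NotebookApp.*; a rough equivalent, assuming a jupyter-server based install, would be:

```python
c.ServerApp.ip = '0.0.0.0'
c.ServerApp.port = 8888
c.ServerApp.open_browser = False
c.ServerApp.root_dir = '/home/hadoop/jupyter_work'
c.ServerApp.allow_remote_access = True
```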
2.3 Start Jupyter

```bash
mkdir -p ~/jupyter_work

# Register Jupyter as a systemd service (adjust the jupyter path if `which jupyter` differs)
sudo tee /etc/systemd/system/jupyter.service > /dev/null <<EOF
[Unit]
Description=Jupyter Notebook Server
After=network.target

[Service]
Type=simple
User=hadoop
WorkingDirectory=/home/hadoop/jupyter_work
ExecStart=/usr/local/bin/jupyter notebook --config=/home/hadoop/.jupyter/jupyter_notebook_config.py
Restart=on-failure
RestartSec=10

[Install]
WantedBy=multi-user.target
EOF

sudo systemctl daemon-reload
sudo systemctl enable jupyter
sudo systemctl start jupyter

# Follow the service logs
sudo journalctl -u jupyter -f
```
Or run it in the background (a temporary approach):
```bash
cd ~/jupyter_work
nohup jupyter notebook --config ~/.jupyter/jupyter_notebook_config.py > ~/jupyter.log 2>&1 &
```
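To stop a Jupyter server started this way, kill the background process, for example:

```bash
pkill -f "jupyter notebook"
```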
3. Connecting from Your Local Machine

3.1 SSH Port Forwarding

On your local machine (Windows PowerShell):
```powershell
# One-off port forwarding (Jupyter 8888, Spark Master UI 8080, Worker UI 8081)
ssh -i D:\key `
    -L 8888:192.168.1.100:8888 `
    -L 8080:192.168.1.100:8080 `
    -L 8081:192.168.1.100:8081 `
    root@GATEWAY_IP -p 21040 -N

# Or wrap it in an auto-reconnecting script (forward-cluster.ps1)
$sshKey = "D:\key"
$gateway = "GATEWAY_IP"
$port = "21040"

while ($true) {
    Write-Host "Connecting to the Spark cluster..."
    ssh -i $sshKey `
        -L 8888:192.168.1.100:8888 `
        -L 8080:192.168.1.100:8080 `
        root@$gateway -p $port -N
    Write-Host "Connection dropped, reconnecting in 10 seconds..."
    Start-Sleep -Seconds 10
}
```

Run the script:

```powershell
powershell -ExecutionPolicy Bypass -File forward-cluster.ps1
```
3.2 Access Jupyter

```text
http://localhost:8888
# Enter the password you set earlier
```
4. Connecting to the Spark Cluster from Jupyter

4.1 Create a Notebook and Add the Following Code

First cell: configure PySpark
```python
import os
import sys

from pyspark.sql import SparkSession

spark_master_url = "spark://192.168.1.100:7077"
spark_app_name = "JupyterRemoteApp"

spark = SparkSession.builder \
    .appName(spark_app_name) \
    .master(spark_master_url) \
    .config("spark.executor.memory", "2g") \
    .config("spark.executor.cores", "2") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

print(f"✓ Spark version: {spark.version}")
print(f"✓ Connected to cluster: {spark_master_url}")
```
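The session keeps executors allocated on the cluster for as long as it is alive; once you are done with the notebook, release them:

```python
spark.stop()
```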
4.2 Example Jobs

A compute-intensive task:
```python
data = spark.sparkContext.parallelize(range(1, 1001), numSlices=4)
result = data.map(lambda x: x ** 2).sum()
print(f"Sum of squares from 1 to 1000: {result}")
```
DataFrame operations:
```python
df = spark.createDataFrame([
    ("Alice", 28),
    ("Bob", 35),
    ("Charlie", 42),
], ["name", "age"])

df.filter(df.age > 30).show()

result = df.toPandas()
print(result)
```
Read data from HDFS:
```python
df = spark.read.csv("hdfs://192.168.1.100:9000/data/file.csv", header=True)
df.show()
```
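The NameNode address and path above are placeholders for your own HDFS layout. Writing results back works the same way; a minimal sketch with a hypothetical output path:

```python
# Write the DataFrame back to HDFS as Parquet (hypothetical path)
df.write.mode("overwrite").parquet("hdfs://192.168.1.100:9000/data/output.parquet")
```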
5. Monitoring the Cluster

5.1 Access the Spark Web UI Locally

Open it in a browser (through the SSH tunnel set up earlier):
```text
http://localhost:8080
# Shows cluster status, Worker information, and running applications
```
5.2 Get Cluster Information from Jupyter

```python
app_id = spark.sparkContext.applicationId
print(f"Application ID: {app_id}")

# Print the memory- and core-related settings from the active configuration
spark_conf = spark.sparkContext.getConf().getAll()
for key, value in spark_conf:
    if 'memory' in key.lower() or 'cores' in key.lower():
        print(f"{key}: {value}")

# PySpark's StatusTracker does not expose executor details directly;
# going through the JVM status tracker (a private API) is one workaround
executor_infos = spark.sparkContext._jsc.sc().statusTracker().getExecutorInfos()
print(f"Number of executors (including the driver): {len(executor_infos)}")
```
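The standalone Master also exposes cluster state as JSON alongside its Web UI; a quick check from the notebook, assuming the default Web UI port and that the requests package is installed:

```python
import requests

# Cluster summary from the standalone Master's JSON endpoint
resp = requests.get("http://192.168.1.100:8080/json/")
print(resp.json())
```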
Related Resources