Getting Started with PySpark Development

Introduction to PySpark

Spark is written in Scala. PySpark is just a high-level Python API for driving Spark; under the hood the Scala code still runs on a JDK (the JVM), as the short sketch below illustrates.
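
A minimal sketch of that relationship (assuming pyspark is already installed as described later; _jvm is an internal Py4J attribute, used here only for illustration):

from pyspark.sql import SparkSession

# Creating a SparkSession from Python launches a JVM behind the scenes.
spark = SparkSession.builder.master("local[*]").appName("HelloPySpark").getOrCreate()

print("Spark version:", spark.version)
# Ask the underlying JVM for its Java version through the Py4J gateway.
print("JVM Java version:", spark.sparkContext._jvm.java.lang.System.getProperty("java.version"))

spark.stop()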

Environment Setup

Use Python 3.10.

Installing the JDK

Since Spark is written in Scala, a JDK is required at runtime.

  1. Download the JDK

    1. Go to https://adoptium.net/zh-CN/temurin/releases?version=11&os=any&arch=any
    2. Pick the JDK build that matches your OS and architecture and download it.
  2. I downloaded: OpenJDK11U-jdk_x64_windows_hotspot_11.0.28_6.msi

  3. On Windows, simply double-click the installer to run it.

    • Note: in the custom setup step, select the option to set or override the JAVA_HOME variable.
    • If you forget to select it, you will have to set JAVA_HOME manually and add %JAVA_HOME%\bin to the PATH variable yourself.
  4. After the installation finishes, open a command prompt and test:

    java --version

    You should see the version of the JDK you just installed.

    • If the installation succeeded but no JDK version information is shown, JAVA_HOME is missing or %JAVA_HOME%\bin has not been added to PATH (the small check after this list verifies both).
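
If you prefer to verify this from Python rather than the command prompt, a small sketch like the following (my own check, not part of the installer) covers both points:

import os
import shutil
import subprocess

# JAVA_HOME should point at the JDK installation directory.
print("JAVA_HOME =", os.environ.get("JAVA_HOME"))

# java should be resolvable through PATH, i.e. %JAVA_HOME%\bin was added.
java_path = shutil.which("java")
print("java found at:", java_path)
if java_path:
    subprocess.run(["java", "--version"], check=True)  # JDK 9+ supports --version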

Installing Hadoop

When Spark starts on Windows it calls winutils.exe (Hadoop's native utility for Windows) for basic operations such as file permissions and temporary-directory creation. This is required even in local mode (single machine, non-distributed).

  1. Download Hadoop from: https://hadoop.apache.org/releases.html

  2. I downloaded: https://dlcdn.apache.org/hadoop/common/hadoop-3.3.6/hadoop-3.3.6.tar.gz

  3. Extract the archive and move it to your local install directory.

  4. Set the HADOOP_HOME environment variable, and add %HADOOP_HOME%\bin and %HADOOP_HOME%\sbin to the PATH variable.

  5. Download a winutils.exe that matches your Hadoop version (from the winutils repository) and put it into the HADOOP_HOME\bin directory (e.g. D:\develop\hadoop\hadoop-3.3.6\bin).

  6. Verify (a short Python sanity check follows below):

    winutils.exe version
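
If the verification step misbehaves, a short Python sketch (my own, assuming the layout above) can confirm that the pieces are where Spark expects them:

import os

hadoop_home = os.environ.get("HADOOP_HOME")
print("HADOOP_HOME =", hadoop_home)

# Spark on Windows looks for winutils.exe under %HADOOP_HOME%\bin.
if hadoop_home:
    winutils = os.path.join(hadoop_home, "bin", "winutils.exe")
    print("winutils.exe present:", os.path.exists(winutils))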

Installing Spark

  1. Download Spark from: https://spark.apache.org/downloads.html

  2. I downloaded: https://dlcdn.apache.org/spark/spark-3.5.6/spark-3.5.6-bin-hadoop3.tgz

  3. Extract the archive and move it to your local install directory.

  4. Set the SPARK_HOME environment variable, and add %SPARK_HOME%\bin and %SPARK_HOME%\sbin to the PATH variable.

  5. Verify:

    pyspark

Installing PySpark

pip uninstall pyspark -y
pip install pyspark==3.5.6 --no-cache-dir -i https://pypi.tuna.tsinghua.edu.cn/simple
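
The pip package version should match the Spark distribution you unpacked (3.5.6 in both cases here). A quick way to double-check from Python (a sketch, assuming the installation above):

import os
import pyspark

# The pip package version...
print("pyspark package version:", pyspark.__version__)
# ...should correspond to the release pointed at by SPARK_HOME (spark-3.5.6-bin-hadoop3 here).
print("SPARK_HOME =", os.environ.get("SPARK_HOME"))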

Basic Usage

Use Python 3.10.
If your Python version does not match, remove the old version's entries from PATH first:

C:\Python\Python3.12\Scripts
C:\Python\Python3.12

Then add the entries for the correct version to the PATH environment variable:

C:\Python\Python3.10\Scripts
C:\Python\Python3.10\

Using the PySpark shell

Once pyspark is installed, a complete copy of Spark already exists under Lib/site-packages in the virtual environment. Run the following commands to enter the PySpark shell:

cd .\.venv\Lib\site-packages\pyspark\
.\bin\pyspark

After startup, a Web UI is served at http://host.docker.internal:4040/jobs/, where you can inspect how Spark jobs are executing.

Inside the shell, run a few quick tests:

# Check the current working directory
>>> import os
>>> print(os.getcwd())

# Try the official example: build a distributed dataset of one billion integer records
# and count them (the result should be 1000000000)
>>> spark.range(1000 * 1000 * 1000).count()

This example is often used to gauge the performance of a Spark setup: counting one billion records takes substantial memory and compute, so it reflects the actual processing capacity of the cluster. (The command consumes a lot of memory; on a low-spec machine the program may crash.)

  • spark.range(n)
    • A SparkSession method that produces a distributed dataset (DataFrame) containing the integers from 0 to n-1.
    • Here n is 1000 * 1000 * 1000, i.e. one billion, so the sequence 0 to 999,999,999 is generated.
  • .count()
    • A DataFrame action that returns the total number of records in the dataset.
    • It triggers the actual computation (the preceding range only defines a transformation), as the smaller sketch below illustrates.
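
To make the transformation/action distinction concrete, here is a smaller sketch (run it in the same shell, where the spark session already exists); no job runs until count() is called:

# Transformations only build an execution plan; nothing is computed yet.
df = spark.range(1000 * 1000)          # integers 0 .. 999,999
evens = df.filter(df.id % 2 == 0)      # still lazy

# The action triggers the actual distributed computation.
print(evens.count())                   # 500000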

Parsing JSON data

1. Create JSON test data

import json
import random


# Generate a random name
def generate_random_name():
    first_names = ["Alice", "Bob", "Charlie", "David", "Ella", "Frank", "Grace", "Henry", "Ivy", "Jack",
                   "Kate", "Leo", "Mia", "Nick", "Olivia", "Paul", "Quinn", "Ryan", "Sophia", "Tom"]
    last_names = ["Smith", "Johnson", "Williams", "Jones", "Brown", "Davis", "Miller", "Wilson", "Moore", "Taylor",
                  "Anderson", "Thomas", "Jackson", "White", "Harris", "Martin", "Thompson", "Garcia", "Martinez", "Robinson"]
    first = random.choice(first_names)
    last = random.choice(last_names)
    return {"first": first, "last": last}


def main():
    # Generate 150 records
    data = []
    for _ in range(150):
        name = generate_random_name()
        age = random.randint(18, 60)
        data.append({"name": name, "age": age})

    # Save as a JSON Lines file: one JSON object per line
    with open("test.json", "w") as f:
        for item in data:
            f.write(json.dumps(item) + "\n")

    print("Generated test.json with 150 records")


if __name__ == '__main__':
    main()

2. Test

from pyspark.shell import spark


def main():
    # spark.read.json expects JSON Lines: one JSON object per line
    df = spark.read.json("test.json")
    query_age_df = df.where("age > 31")
    query_age_df.show()
    # "name" is a struct column, so its nested fields are addressed with dot notation
    query_name_df = query_age_df.select("name.first")
    query_name_df.show()


if __name__ == '__main__':
    main()
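
spark.read.json infers the schema by scanning the file. For larger inputs you can supply the schema explicitly instead; a sketch matching the structure generated above:

from pyspark.shell import spark
from pyspark.sql.types import StructType, StructField, StringType, LongType

# Schema for records of the form {"name": {"first": ..., "last": ...}, "age": ...}
schema = StructType([
    StructField("name", StructType([
        StructField("first", StringType()),
        StructField("last", StringType()),
    ])),
    StructField("age", LongType()),
])

df = spark.read.schema(schema).json("test.json")
df.printSchema()
df.where("age > 31").select("name.first", "age").show()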

Testing PostgreSQL

1. Start the Docker container

# Run the PostgreSQL container
# docker run -d --name my-postgres -e POSTGRES_DB=learn_db -e POSTGRES_USER=learn -e POSTGRES_PASSWORD=123456 -p 5432:5432 postgres:16.10
docker ps | findstr "my-postgres"
docker start my-postgres

2. Run the test

pip install psycopg2  -i https://pypi.tuna.tsinghua.edu.cn/simple
# Set the Python interpreter paths - fixes "python3 not found" on Windows
import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

from pyspark.sql import SparkSession
import psycopg2
from typing import List, Dict, Any


def create_spark_session():
    """Create the SparkSession"""
    spark = SparkSession.builder \
        .appName("PySparkPostgreSQLExample") \
        .config("spark.cleaner.referenceTracking.cleanCheckpoints", "false") \
        .config("spark.worker.cleanup.enabled", "false") \
        .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")  # Show only ERROR-level logs
    return spark


def create_sample_data() -> List[Dict[str, Any]]:
    """Create sample data"""
    return [
        {"id": 1, "name": "张三", "age": 25, "email": "[email protected]"},
        {"id": 2, "name": "李四", "age": 30, "email": "[email protected]"},
        {"id": 3, "name": "王五", "age": 35, "email": "[email protected]"},
        {"id": 4, "name": "赵六", "age": 40, "email": "[email protected]"}
    ]


def create_postgresql_connection():
    """Create a PostgreSQL connection"""
    return psycopg2.connect(
        host="localhost",
        port=5432,
        user="learn",
        password="123456",
        database="learn_db"
    )


def setup_postgresql_table(conn):
    """Set up the PostgreSQL table schema"""
    cursor = conn.cursor()
    cursor.execute("DROP TABLE IF EXISTS users")
    cursor.execute("""
        CREATE TABLE users (
            id INT PRIMARY KEY,
            name VARCHAR(50),
            age INT,
            email VARCHAR(100)
        )
    """)
    conn.commit()
    cursor.close()


def write_to_postgresql(conn, data: List[Dict[str, Any]]):
    """Write data to PostgreSQL"""
    cursor = conn.cursor()
    for item in data:
        cursor.execute("""
            INSERT INTO users (id, name, age, email)
            VALUES (%s, %s, %s, %s)
        """, (item["id"], item["name"], item["age"], item["email"]))
    conn.commit()
    cursor.close()
    print("Data written to PostgreSQL")


def read_from_postgresql(conn):
    """Read data from PostgreSQL"""
    cursor = conn.cursor()
    cursor.execute("SELECT id, name, age, email FROM users")
    columns = [desc[0] for desc in cursor.description]
    results = [dict(zip(columns, row)) for row in cursor.fetchall()]
    cursor.close()
    return results


def main():
    """Main workflow"""
    # Create the SparkSession
    spark = create_spark_session()

    # Create the PostgreSQL connection
    pg_conn = create_postgresql_connection()

    # Set up the PostgreSQL table schema
    setup_postgresql_table(pg_conn)

    # Create the sample data
    sample_data = create_sample_data()

    # Convert the data to a DataFrame and display it
    df = spark.createDataFrame(sample_data)
    print("Original data:")
    df.show()

    # Write to PostgreSQL
    write_to_postgresql(pg_conn, sample_data)

    # Read the data back from PostgreSQL
    pg_data = read_from_postgresql(pg_conn)

    # Convert the PostgreSQL data to a DataFrame and display it
    pg_df = spark.createDataFrame(pg_data)
    print("Data read from PostgreSQL:")
    pg_df.show()

    # Close the PostgreSQL connection
    pg_conn.close()

    # Stop the SparkSession
    spark.stop()


if __name__ == "__main__":
    main()
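
The script above moves the rows through psycopg2 on the driver. Spark can also read the table directly through its JDBC data source; the sketch below assumes the PostgreSQL JDBC driver is fetched via spark.jars.packages (the Maven coordinates and version are my assumption and may need adjusting):

from pyspark.sql import SparkSession

# Pull the PostgreSQL JDBC driver from Maven when the session starts.
spark = SparkSession.builder \
    .appName("PostgresJdbcRead") \
    .config("spark.jars.packages", "org.postgresql:postgresql:42.7.3") \
    .getOrCreate()

# Spark reads the users table over JDBC; no psycopg2 involved.
users_df = spark.read.format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/learn_db") \
    .option("dbtable", "users") \
    .option("user", "learn") \
    .option("password", "123456") \
    .option("driver", "org.postgresql.Driver") \
    .load()

users_df.show()
spark.stop()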

Testing MySQL

1. Start the Docker container

docker ps | findstr "my-mysql"
docker start my-mysql

2. Run the test

pip install mysql-connector-python  -i https://pypi.tuna.tsinghua.edu.cn/simple
# Set the Python interpreter paths - fixes "python3 not found" on Windows
import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

from pyspark.sql import SparkSession
import mysql.connector
from typing import List, Dict, Any


def create_spark_session():
    """Create the SparkSession"""
    spark = SparkSession.builder \
        .appName("PySparkMySQLExample") \
        .config("spark.cleaner.referenceTracking.cleanCheckpoints", "false") \
        .config("spark.worker.cleanup.enabled", "false") \
        .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")  # Show only ERROR-level logs
    return spark


def create_sample_data() -> List[Dict[str, Any]]:
    """Create sample data"""
    return [
        {"id": 1, "name": "张三", "age": 25, "email": "[email protected]"},
        {"id": 2, "name": "李四", "age": 30, "email": "[email protected]"},
        {"id": 3, "name": "王五", "age": 35, "email": "[email protected]"},
        {"id": 4, "name": "赵六", "age": 40, "email": "[email protected]"}
    ]


def create_mysql_connection():
    """Create a MySQL connection"""
    import time
    max_retries = 3
    wait_time = 2

    for attempt in range(max_retries):
        try:
            conn = mysql.connector.connect(
                host="localhost",
                port=3306,
                user="learn",
                password="123456",
                database="learn_db"
            )
            print("Connected to the MySQL database")
            return conn
        except mysql.connector.Error as err:
            print(f"Connection attempt {attempt + 1}/{max_retries} failed: {err}")
            if attempt < max_retries - 1:
                print(f"Retrying in {wait_time} seconds...")
                time.sleep(wait_time)

    raise Exception(f"Could not connect to the MySQL database after {max_retries} attempts")


def setup_mysql_table(conn):
    """Set up the MySQL table schema"""
    cursor = conn.cursor()
    cursor.execute("DROP TABLE IF EXISTS users")
    cursor.execute("""
        CREATE TABLE users (
            id INT PRIMARY KEY,
            name VARCHAR(50),
            age INT,
            email VARCHAR(100)
        )
    """)
    conn.commit()
    cursor.close()


def write_to_mysql(conn, data: List[Dict[str, Any]]):
    """Write data to MySQL"""
    cursor = conn.cursor()
    for item in data:
        cursor.execute("""
            INSERT INTO users (id, name, age, email)
            VALUES (%s, %s, %s, %s)
        """, (item["id"], item["name"], item["age"], item["email"]))
    conn.commit()
    cursor.close()
    print("Data written to MySQL")


def read_from_mysql(conn):
    """Read data from MySQL"""
    cursor = conn.cursor(dictionary=True)
    cursor.execute("SELECT id, name, age, email FROM users")
    results = cursor.fetchall()
    cursor.close()
    return results


def main():
    """Main workflow"""
    # Create the SparkSession
    spark = create_spark_session()

    # Create the MySQL connection
    mysql_conn = create_mysql_connection()

    # Set up the MySQL table schema
    setup_mysql_table(mysql_conn)

    # Create the sample data
    sample_data = create_sample_data()

    # Convert the data to a DataFrame and display it
    df = spark.createDataFrame(sample_data)
    print("Original data:")
    df.show()

    # Write to MySQL
    write_to_mysql(mysql_conn, sample_data)

    # Read the data back from MySQL
    mysql_data = read_from_mysql(mysql_conn)

    # Convert the MySQL data to a DataFrame and display it
    mysql_df = spark.createDataFrame(mysql_data)
    print("Data read from MySQL:")
    mysql_df.show()

    # Close the MySQL connection
    mysql_conn.close()

    # Stop the SparkSession
    spark.stop()


if __name__ == "__main__":
    main()
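
For MySQL the same JDBC data source also covers the write path, so the row-by-row INSERT loop can be replaced with a single df.write call. A sketch assuming MySQL Connector/J is fetched via spark.jars.packages (the coordinates, version and extra sample rows are my assumptions):

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MySQLJdbcWrite") \
    .config("spark.jars.packages", "com.mysql:mysql-connector-j:8.4.0") \
    .getOrCreate()

# Two extra rows with new primary keys, so they can be appended to the users table.
df = spark.createDataFrame(
    [(5, "孙七", 28, "user5@example.com"), (6, "周八", 33, "user6@example.com")],
    ["id", "name", "age", "email"]
)

# Append the rows into the existing users table over JDBC.
df.write.format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/learn_db") \
    .option("dbtable", "users") \
    .option("user", "learn") \
    .option("password", "123456") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .mode("append") \
    .save()

spark.stop()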

Redis

# Run the Redis container
docker run -d --name my-redis -p 6379:6379 redis:8.2.1
pip install redis  -i https://pypi.tuna.tsinghua.edu.cn/simple
# Set the Python interpreter paths - fixes "python3 not found" on Windows
import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

from pyspark.sql import SparkSession
import config
from typing import List, Dict, Any
import redis
from redis import Redis


def create_spark_session():
    """Create the SparkSession and set the Redis connection options"""
    spark = SparkSession.builder \
        .appName("PySparkRedisExample") \
        .config("spark.redis.host", config.REDIS_CONFIG['host']) \
        .config("spark.redis.port", str(config.REDIS_CONFIG['port'])) \
        .config("spark.cleaner.referenceTracking.cleanCheckpoints", "false") \
        .config("spark.worker.cleanup.enabled", "false") \
        .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")  # Show only ERROR-level logs
    return spark


def create_sample_data() -> List[Dict[str, Any]]:
    """Create sample data"""
    return [
        {"id": 1, "name": "张三", "age": 25, "email": "[email protected]"},
        {"id": 2, "name": "李四", "age": 30, "email": "[email protected]"},
        {"id": 3, "name": "王五", "age": 35, "email": "[email protected]"},
        {"id": 4, "name": "赵六", "age": 40, "email": "[email protected]"}
    ]


def write_to_redis(redis_client: Redis, data: List[Dict[str, Any]]):
    """Write data to Redis"""
    for item in data:
        key = f"user:{item['id']}"
        # Delete the key first if it already exists
        redis_client.delete(key)
        # Write the new data as a hash
        redis_client.hset(
            key,
            mapping={
                "id": str(item["id"]),
                "name": item["name"],
                "age": str(item["age"]),
                "email": item["email"]
            }
        )
    print("Data written to Redis")


def read_from_redis(redis_client: Redis) -> List[Dict[str, Any]]:
    """Read data from Redis"""
    keys = redis_client.keys("user:*")
    results = []
    for key in keys:
        user_data = redis_client.hgetall(key)
        results.append({
            "id": int(user_data[b"id"]),
            "name": user_data[b"name"].decode("utf-8"),
            "age": int(user_data[b"age"]),
            "email": user_data[b"email"].decode("utf-8")
        })
    return results


def main():
    """Main workflow"""
    # Create the SparkSession
    spark = create_spark_session()

    # Create the Redis client
    redis_client = redis.Redis(
        host=config.REDIS_CONFIG['host'],
        port=config.REDIS_CONFIG['port'],
        db=0
    )

    # Create the sample data
    sample_data = create_sample_data()

    # Convert the data to a DataFrame and display it
    df = spark.createDataFrame(sample_data)
    print("Original data:")
    df.show()

    # Write to Redis
    write_to_redis(redis_client, sample_data)

    # Read the data back from Redis
    redis_data = read_from_redis(redis_client)

    # Convert the Redis data to a DataFrame and display it
    redis_df = spark.createDataFrame(redis_data)
    print("Data read from Redis:")
    redis_df.show()

    # Stop the SparkSession
    spark.stop()


if __name__ == "__main__":
    main()
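
Both this Redis script and the MongoDB one below import a local config module that is not shown in the post. A minimal sketch of what it presumably contains, inferred from the keys the scripts read (the database and collection names are my assumptions):

# config.py - assumed shape, based on the keys used by the scripts
REDIS_CONFIG = {
    "host": "localhost",
    "port": 6379,
}

MONGODB_CONFIG = {
    # Credentials match the docker run command for my-mongodb below
    "uri": "mongodb://root:123456@localhost:27017/",
    "database": "learn_db",
    "collection": "users",
}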

MongoDB

# Run the MongoDB container
docker run -d --name my-mongodb -e MONGO_INITDB_ROOT_USERNAME=root -e MONGO_INITDB_ROOT_PASSWORD=123456 -p 27017:27017 mongo:8.0.13
pip install pymongo   -i https://pypi.tuna.tsinghua.edu.cn/simple
# Set the Python interpreter paths - fixes "python3 not found" on Windows
import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

from pyspark.sql import SparkSession
import config
from pymongo import MongoClient
from typing import List, Dict, Any


def create_spark_session():
    """Create the SparkSession"""
    spark = SparkSession.builder \
        .appName("PySparkMongoDBExample") \
        .config("spark.cleaner.referenceTracking.cleanCheckpoints", "false") \
        .config("spark.worker.cleanup.enabled", "false") \
        .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")  # Show only ERROR-level logs
    return spark


def create_sample_data() -> List[Dict[str, Any]]:
    """Create sample data"""
    return [
        {"id": 1, "name": "张三", "age": 25, "email": "[email protected]"},
        {"id": 2, "name": "李四", "age": 30, "email": "[email protected]"},
        {"id": 3, "name": "王五", "age": 35, "email": "[email protected]"},
        {"id": 4, "name": "赵六", "age": 40, "email": "[email protected]"}
    ]


def write_to_mongodb(mongo_client: MongoClient, data: List[Dict[str, Any]]):
    """Write data to MongoDB"""
    db = mongo_client[config.MONGODB_CONFIG['database']]
    collection = db[config.MONGODB_CONFIG['collection']]
    # Empty the collection first
    collection.delete_many({})
    # Insert the new data
    collection.insert_many(data)
    print("Data written to MongoDB")


def read_from_mongodb(mongo_client: MongoClient):
    """Read data from MongoDB"""
    db = mongo_client[config.MONGODB_CONFIG['database']]
    collection = db[config.MONGODB_CONFIG['collection']]
    return list(collection.find({}, {"_id": 0}))


def main():
    """Main workflow"""
    # Create the SparkSession
    spark = create_spark_session()

    # Create the MongoDB client
    mongo_client = MongoClient(config.MONGODB_CONFIG['uri'])

    # Create the sample data
    sample_data = create_sample_data()

    # Convert the data to a DataFrame and display it
    df = spark.createDataFrame(sample_data)
    print("Original data:")
    df.show()

    # Write to MongoDB
    write_to_mongodb(mongo_client, sample_data)

    # Read the data back from MongoDB
    mongodb_data = read_from_mongodb(mongo_client)

    # Convert the MongoDB data to a DataFrame and display it
    mongodb_df = spark.createDataFrame(mongodb_data)
    print("Data read from MongoDB:")
    mongodb_df.show()

    # Stop the SparkSession
    spark.stop()


if __name__ == "__main__":
    main()
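
The script above writes through a single MongoClient on the driver. For a larger DataFrame the write could instead happen on the executors with foreachPartition; a sketch under the same config assumptions (one client per partition, plain pymongo rather than an official connector, extra sample rows are mine):

from pyspark.sql import SparkSession
from pymongo import MongoClient
import config

spark = SparkSession.builder.appName("MongoForeachPartitionWrite").getOrCreate()

df = spark.createDataFrame(
    [(5, "孙七", 28, "user5@example.com"), (6, "周八", 33, "user6@example.com")],
    ["id", "name", "age", "email"]
)

# Capture plain strings so only they (not the config module) are shipped to the workers.
mongo_uri = config.MONGODB_CONFIG['uri']
db_name = config.MONGODB_CONFIG['database']
coll_name = config.MONGODB_CONFIG['collection']

def write_partition(rows):
    # Runs on the executor: one MongoClient per partition.
    client = MongoClient(mongo_uri)
    docs = [row.asDict() for row in rows]
    if docs:
        client[db_name][coll_name].insert_many(docs)
    client.close()

df.foreachPartition(write_partition)
spark.stop()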