PySpark实战-前言介绍
spark
是Apache基金会旗下的顶级开源项目用于对海量数据进行大规模分布式计算。
PySpark
是Spark的Python实现,是Spark为Python开发者提供的编程入口,用于以Python代码完成Spark任务的开发
PySpark不仅可以作为Python第三方库使用也可以将程序提交的Spark集群环境中,调度大规模集群进行执行。
Python应用场景和就业方向是十分丰富的,其中,最为亮点的方向为大数据开发和人工智能
基础准备
PySpark库的安装:同其它的Python第三方库一样PySpark同样可以使用pip程序进行安装
在”CMD”命令提示符程序内,输入: pip install pyspark
或使用国内代理镜像网站(清华大学源)
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark
1 2 3 4 5 6 7 8 9 10 11 12 13
|
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
print(sc.version)
sc.stop()
|
数据输入
RDD对象,PySpark支持多种数据的输入,在输入完成后,都会得到一个:RDD类的对象
RDD全称为:弹性分布式数据集(Resilient Distributed Datasets)PySpark针对数据的处理都是以RDD对象作为载体,即:
- 数据存储在RDD内
- 各类数据的计算方法,也都是RDD的成员方法
- RDD的数据计算方法,返回值依旧是RDD对象
Python数据容器转RDD对象: PySpark支持通过SparkContext对象的parallelize成员方法将:
list、tuple、set、dicr、str 转换为PySpark的RDD对象
1 2 3 4 5 6 7 8 9 10
| from pyspark import SparkConf,SparkContext
conf = SparkConf().setMaster("local[*]").\ setAppName("test_spark_app") sc = SparkContext(conf=conf)
rdd = sc.parallelize(数据容器对象)
print(rdd.collect())
|
注意: 字符串会被拆分出1个个的字符,存入RDD对象
字典仅有key会被存入RDD对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| from pyspark import SparkConf,SparkContext
conf = SparkConf().setMaster("local[*]").setAppName("test_spark") sc = SparkContext(conf=conf)
rdd1 = sc.parallelize([1,2,3,4,5]) rdd2 = sc.parallelize((1,2,3,4,5)) rdd3 = sc.parallelize("abcdefg") rdd4 = sc.parallelize({1,2,3,4,5}) rdd5 = sc.parallelize({"key1": "value1","key2": "value2"})
print(rdd1.collect()) print(rdd2.collect()) print(rdd3.collect()) print(rdd4.collect()) print(rdd1.collect())
sc.stop()
|
读取文件转RDD对象:PySpark也支持通过SparkContext入口对象,来读取文件来构建出RDD对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| from pyspark import SparkConf,SparkContext
conf = SparkConf().setMaster("local[*]").setAppName("test_spark") sc = SparkContext(conf=conf)
rdd1 = sc.parallelize([1,2,3,4,5]) rdd2 = sc.parallelize((1,2,3,4,5)) rdd3 = sc.parallelize("abcdefg") rdd4 = sc.parallelize({1,2,3,4,5}) rdd5 = sc.parallelize({"key1": "value1","key2": "value2"})
print(rdd1.collect()) print(rdd2.collect()) print(rdd3.collect()) print(rdd4.collect()) print(rdd1.collect())
sc.stop()
|
数据计算-map方法
map方法: PySpark的数据计算,都是基于RDD对象来进行的,那么如何进行呢?
自然是依赖,RDD对象内置丰富的: 成员方法(算子)
map算子是将RDD的数据一条条处理(处理的逻基于map算子中接收的处理函数 返回新的RDD
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| from pyspark import SparkConf,SparkContext import os os.environ['PYSPARK_PYTHON'] = "python解释器位置“
conf = SparkConf().setMaster("local[*]").setAppName("test_spark") sc = SparkContext(conf=conf)
# 准备一个RDD rdd = sc.parallelize([1,2,3,4,5]) # 通过map方法将全部数据都乘以10
def func(data): return data * 10 rdd2 = rdd.map(func) # 链式调用 # rdd2 = rdd.map(lambda x: x * 10).map(lambda x: x + 5)
print(rdd2.collect()) # (T) -> U # (T) -> T
|
数据计算-flatMap方法
功能: 对rdd执行map操作,然后进行解除嵌套操作.
1 2 3 4 5 6 7 8 9 10 11 12 13
| from pyspark import SparkConf,SparkContext import os
os.environ['PYSPARK_PYTHON'] = '解释器位置' conf = SparkConf().setMaster("local[*]").setAppName("test_spark") sc = SparkContext(conf=conf)
rdd = sc.parallelize(["itheima itcast 666","itheima itheima itcast","python itheima"])
rdd2 = rdd.map(lambda x: x.split("")) print(rdd2.collect())
|
数据计算-reduceByKey方法
功能: 针对KV型 RDD,自动按照key分组,然后根据你提供的聚合逻辑,完成 组内数据(value) 的聚合操作.
1 2 3 4 5 6 7 8 9 10 11 12
| from pyspark import SparkConf,SparkContext import os
os.environ['PYSPARK_PYTHON'] = '解释器位置' conf = SparkConf().setMaster("local[*]").setAppName("test_spark") sc = SparkContext(conf=conf)
rdd = sc.parallelize([('男',99),('男',88),('女',99),('女',66)])
rdd2 = rdd.reduceByKey(lambda a,b: a + b) print(rdd2.collect())
|
数据计算-单词统计案例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| from pyspark import SparkConf,SparkContext import os
os.environ['PYSPARK_PYTHON'] = 'F:\Python\python.exe' conf = SparkConf().setMaster("local[*]").setAppName("test_spark") sc = SparkContext(conf=conf)
rdd = sc.textFile("F:/hello.txt")
word_rdd = rdd.flatMap(lambda x: x.split(""))
word_with_one_rdd = word_rdd.map(lambda word: (word,1))
result_rdd = word_with_one_rdd.reduceByKey(lambda a,b: a + b)
print(result_rdd.collect())
|
数据计算-filter方法
功能:过滤想要的数据进行保留
返回是True的数据被保留,False的数据被丢弃
1 2 3 4 5 6 7 8 9 10 11 12 13
| from pyspark import SparkConf,SparkContext import os
os.environ['PYSPARK_PYTHON'] = 'F:\Python\python.exe' conf = SparkConf().setMaster("local[*]").setAppName("test_spark") sc = SparkContext(conf=conf)
rdd = sc.parallelize([1,2,3,4,5])
rdd2 = rdd.filter(lambda num: num % 2 == 0) print(rdd2.collect())
|
数据计算-distinct方法
完成对RDD内数据的去重操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| from pyspark import SparkConf,SparkContext import os
os.environ['PYSPARK_PYTHON'] = 'F:\Python\python.exe' conf = SparkConf().setMaster("local[*]").setAppName("test_spark") sc = SparkContext(conf=conf)
rdd = sc.parallelize([1,1,3,3,5,5,7,8,8,9,10])
rdd2 = rdd.distinct()
print(rdd2.collect())
|
数据计算-sortBy方法
功能:对RDD数据
进行排序基于你指定的排序依据
接收一个处理函数可用lambda快速编写,函数表示用来决定排序的依据,可以控制升序或降序,全局排序需要设置分区数为1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| from pyspark import SparkConf,SparkContext import os
os.environ['PYSPARK_PYTHON'] = 'F:\Python\python.exe' conf = SparkConf().setMaster("local[*]").setAppName("test_spark") sc = SparkContext(conf=conf)
rdd = sc.textFile("F:/hello.txt")
word_rdd = rdd.flatMap(lambda x: x.split(" "))
word_with_one_rdd = word_rdd.map(lambda word: (word,1))
result_rdd = word_with_one_rdd.reduceByKey(lambda a, b: a + b) print(result_rdd.collect())
final_rdd = result_rdd.sortBy(lambda x: x[1],ascending=False, numPartitions=1) print(final_rdd.collect())
|
数据输出-输出为Python对象
Spark的编程流程就是:将数据加载为RDD(数据输入)-对RDD进行计算(数据计算)-将RDD转换为Python对象(数据输出)
数据输出的方法:
- collect:将RDD内容转换为list
- reduce:对RDD内容进行自定义聚合
- take:取出RDD的前N个元素组成list
- count:统计RDD元素个数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| from pyspark import SparkConf,SparkContext import os import json os.environ['PYSPARK_PYTHON'] = 'D:/dev/python/python310/python.exe' conf = SparkConf().setMaster("local[*]").setAppName("test_spark") sc = SparkContext(conf=conf)
rdd = sc.parallelize([1,2,3,4,5])
rdd_list: list = rdd.collect() print(rdd_list) print(type(rdd_list))
num = rdd.reduce(lambda a, b: a + b) print(num)
take_list = rdd.take(3) print(take_list)
num_count = rdd.count() print(f"rdd内有{num_count}个元素")
sc.stop()
|
数据输出-输出到文件中
saveAsTextFile算子功能: 将RDD的数据写入文本文件中,支持本地写出hdfs等文件系统
RDD输出到文件的方法:
- rdd.saveAsTextFile
- 输出的结果是一个文件夹
- 有几个分区就输出多少个结果文件
如何修改RDD分区:
- SparkConf对象设置conf.set(“spark.default.parallelism”,”1”)
- 创建RDD的时候,sc.parallelize方法传入numSlices参数为1
闭包
在函数嵌套的前提下,内部函数使用了外部函数的变量,并且外部函数返回了内部函数,我们把这个使用外部函数变量的内部函数称为闭包
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| """ def outer(logo): def inner(msg): print(f"<{logo}>{msg}<{logo}>")
return inner
fn1 = outer("黑马程序员") fn1("大家好") fn1("大家好") fn2 = outer("传智教育") fn2("大家好") """
""" def outer(num1): def inner(num2): nonlocal num1 num1 += num2 print(num1) return inner fn = outer(10) fn(10) fn(10) fn(10) fn(10) """
def account_create(initial_amount=0): def atm(num,deposit=True): nonlocal initial_amount if deposit: initial_amount += num print(f"存款: +{num}, 账户余额: {initial_amount}") else: initial_amount -= num print(f"取款:-{num}, 账户余额: {initial_amount}") return atm atm = account_create()
atm(100) atm(200)
|
闭包的好处和缺点:
优点: 不定义全局变量,也可以让函数持续访问和修改一个外部变量
优点:闭包函数引用的外部变量,是外层函数的内部变量。作用域封闭难以被误操作修改
缺点:额外的内存占用
nonlocal关键字的作用:
在闭包函数(内部函数中)想要修改外部函数的变量值
需要用nonlocal声明这个外部变量
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| """ def outer(logo): def inner(msg): print(f"<{logo}>{msg}<{logo}>")
return inner
fn1 = outer("黑马程序员") fn1("大家好") fn1("大家好") fn2 = outer("传智教育") fn2("大家好") """
""" def outer(num1): def inner(num2): nonlocal num1 num1 += num2 print(num1) return inner fn = outer(10) fn(10) fn(10) fn(10) fn(10) """
def account_create(initial_amount=0): def atm(num,deposit=True): nonlocal initial_amount if deposit: initial_amount += num print(f"存款: +{num}, 账户余额: {initial_amount}") else: initial_amount -= num print(f"取款:-{num}, 账户余额: {initial_amount}") return atm atm = account_create()
atm(100) atm(200)
|
装饰器
装饰器其实也是一种闭包,其功能就是在不破坏目标函数原有的代码和功能的前提下,为目标函数增加新功能。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| """ # 装饰器的一般写法(闭包) def outer(func): def inner(): print("我睡觉了") func() print("我起床了")
return inner
def sleep(): import random import time print("睡眠中......") time.sleep(random.randint(1, 5))
fn = outer(sleep) fn() """
def outer(func): def inner(): print("我睡觉了") func() print("我起床了")
return inner
@outer def sleep(): import random import time print("睡眠中......") time.sleep(random.randint(1,5))
sleep()
|
设计模式-单例模式
单例模式(Singleton Pattern)
是一种常用的软件设计模式,该模式的主要目的是确保某一个类只有一个实例存在。
在整个系统中,某个类只能出现一个实例时,单例对象就能派上用场。
定义:保证一个类只有一个实例,并提供一个访问它的全局访问点
适用场景:当一个类只能有一个实例,而客户可以从一个众所周知的访问点访问它时。
- 什么是设计模式:设计模式就是一种编程套路,使用特定的套路得到特定的效果
- 什么是单例设计模式单例模式就是对一个类,只获取其唯一的类实例对象,持续复用它,节省内存,节省创建对象的开销
1 2 3 4 5 6 7 8 9
|
class StrTools: pass
s1 = StrTools() s2 = StrTools() print(id(s1)) print(id(s2))
|
设计模式-工厂模式
当需要大量创建一个类的实例的时候,可以使用工厂模式即,从原生的使用类的构造去创建对象的形式迁移到,基于工厂提供的方法去创建对象的形式
使用工厂类的get_person()方法
去创建具体的类对象
- 大批量创建对象的时候有统一的入口,易于代码维护
- 当发生修改,仅修改工厂类的创建方法即可
- 符合现实世界的模式即由工厂来制作产品(对象)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| class Person: pass class Worker(Person): pass class Student(Person): pass class Teacher(Person): pass
class PersonFactory: def get_person(self, p_type): if p_type == 'w': return Worker() elif p_type == 's': return Student() else: return Teacher()
pf = PersonFactory() worker = pf.get_person('w') stu = pf.get_person('s') teacher = pf.get_person('t')
|
多线程并行执行概念
现代操作系统比如MacOS X,UNIX,Linux,Windows等,都是支持”多任务”的操作系统.
进程:就是一个程序,运行在系统之上,那么便称之这个程序为一个运行进程,并分配进程ID方便系统管理。
线程:线程是归属于进程的一个进程可以开启多个线程,执行不同的工作,是进程的实际工作最小单位。
进程就好比一家公司,是操作系统对程序进行运行管理的单位
线程就好比公司的员工,进程可以有多个线程(员工),是进程实际的工作者
注意点:
进程之间是内存隔离的,即不同的进程拥有各自的内存空间。这就类似于不同的公司拥有不同的办公场所.
线程之间是内存共享的,线程是属于进程的,一个进程内的多个线程之间是共享这个进程所拥有的内存空间的这就好比,公司员工之间是共享公司的办公场所。
多个进程同时在运行,即不同的程序同时运行,称之为:多任务并行执行
一个进程内的多个线程同时在运行,称之为:多线程并行执行
多线程编程
threading模块的使用:
thread_obj = threading.Thread(target=func)
创建线程对象
thread_obj.start()
启动线程执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| import time import threading
def sing(msg): while True: print(msg) time.sleep(1) def dance(msg): while True: print(msg) time.sleep(1)
if __name__ == '__main__': sing_thread = threading.Thread(target=sing,args=("我要唱歌 哈哈哈", )) dance_thread = threading.Thread(target=dance, kwargs={"msg":"我在跳舞哦 啦啦啦"})
sing_thread.start() dance_thread.start()
|
Socket服务端开发
socket
(简称 套接字)是进程之间通信一个工具,好比现实生活中的插座,所有的家用电器要想工作都是基于插座进行进程之间想要进行网络通信需要socket。
Socket负责进程之间的网络数据传输,好比数据的搬运工。
客户端和服务端
2个进程之间通过Socket进行相互通讯,就必须有服务端和客户端
Socket服务端:等待其它进程的连接、可接受发来的消息、可以回复消息Socket客户端: 主动连接服务端、可以发送消息、可以接收回复
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| import socket
socket_server = socket.socket()
socket_server.bind(("localhost", 8888))
socket_server.listen(1)
conn, address = socket_server.accept()
print(f"接收到了客户端的链接,客户端的信息是: {address}")
while True: data: str = conn.recv(124).decode("UTF-8") print(f"客户端发来的消息是,{data}") msg = input("请输入你要和客户端回复的消息:").encode("UTF-8") if msg == 'exit': break conn.send(msg)
conn.close() socket_server.close()
|
链接:https://pan.baidu.com/s/1aof_H-PLZWSfoSN1xPnCww?pwd=2155
提取码:2155
Socket客户端开发
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| import socket
socket_client = socket.socket()
socket_client.connect(("localhost", 8888))
while True: msg = input("请输入要给服务端发送的消息:") if msg == 'exit': break socket_client.send("你好呀".encode("UTF-8")) recv_data = socket_client.recv(1024) print(f"服务端回复的消息是: {recv_data.decode('UTF-8')}")
socket_client.close()
|
正则表达式-基础方法
正则表达式,又称规则表达式(Regular Expression),是使用单个字符串来描述、匹配某个句法规则的字符串,常被用来检索、替换那些符合某个模式(规则)的文本。
简单来说,正则表达式就是使用:字符串定义规则,并通过规则去验证字符串是否匹配。
比如,验证一个字符串是否是符合条件的电子邮箱地址,只需要配置好正则规则,即可匹配任意邮箱。
比如通过正则规则:(^[\w-]+(\.[\w-]*@[w-]+[\.w-]+)+$)
即可匹配一个字符串是否是标准邮箱格式
Python正则表达式使用re模块,并基于re模块中三个基础方法来做正则匹配。
分别是:match、search、findall 三个基础方法
re.match(匹配规则,被匹配字符串)
从被匹配字符串开头进行匹配,匹配成功返回匹配对象(包含匹配的信息),匹配不成功返回None
1 2 3 4 5 6 7 8 9
| import re
s = "python itheima"
result = re.match("python", s)
print(result) print(result.span()) print(result.group())
|
search(匹配规则,被匹配字符串)
搜索整个字符串,找出匹配的。从前向后,找到第一个后,就停止,不会继续向后
整个字符串都找不到,返回None
1 2 3 4 5 6 7 8 9 10 11
| import re
s = "1python itheima python python"
result = re.match("python", s) print(result)
result = re.search("python2", s) print(result)
|
findall(匹配规则,被匹配字符串)匹配整个字符串,找出全部匹配项
找不到返回空list:[ ]
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| import re
s = "1python itheima python python"
result = re.match("python", s) print(result)
result = re.search("python2", s) print(result)
result = re.findall("python", s) print(result)
|
正则表达式-元字符匹配
单字符匹配:
字符 |
功能 |
. |
匹配任意1个字符(除了\n),.匹配点本身 |
[] |
匹配[]中列举的字符 |
\d |
匹配数字,即0-9 |
\D |
匹配非数字 |
\s |
匹配空白,即空格、tab键 |
\S |
匹配非空白 |
\w |
匹配单词字符,即a-z、A-Z、0-9、_ |
\W |
匹配非单词字符 |
示例:
字符串s = "itheima1 @@python2 !!666 ##itcast3"
找出全部数字:re.findall(r'\d',s)
字符串的r
标记,表示当前字符串是原始字符串,即内部的转义字符无效而是普通字符
找出特殊字符:找出全部英文字母
re.findall(r'[a-zA-Z]', s)
[]内可以写:[a-zA-Z0-9]这三种范围组合或指定单个字符如[aceDFG135]
数量匹配:
字符 |
功能 |
* |
匹配前一个规则的字符出现0至无数次 |
+ |
匹配前一个规则的字符出现1至无数次 |
? |
匹配前一个规则的字符出现0次或1次 |
{m} |
匹配前一个规则的字符出现m次 |
{m,} |
匹配前一个规则的字符出现最少m次 |
{m,n} |
匹配前一个规则的字符出现m到n次 |
边界匹配:
字符 |
功能 |
^ |
匹配字符串开头 |
$ |
匹配字符串结尾 |
\b |
匹配一个单词的边界 |
\B |
匹配非单词边界 |
分组匹配:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| import re
r = r'^[\w-]+(\.[\w-]+)*@(qq|163|gmail)(\.[\w-]+)+$' s = 'a.b.c.d.e.f.g@qq.com.a.z.c.d.e' print(re.match(r,s))
|
递归
- 递归在编程中是一种非常重要的算法
递归:即方法(函数)自己调用自己的一种特殊编程写法,函数调用自己,即称之为递归调用
- 递归需要注意什么?
注意退出的条件,否则容易变成无限递归
注意返回值的传递,确保从最内层,层层传递到最外层
- OS模块的3个方法
os.listdir,列出指定目录下的内容
os.path.isdir,判断给定路径是否是文件夹,是返回True,否返回False
os.path.exists,判断给定路径是否存在,存在返回True,否则返回False