时间:2021-05-22
pyspark是Spark对Python的api接口,可以在Python环境中通过调用pyspark模块来操作spark,完成大数据框架下的数据分析与挖掘。其中,数据的读写是基础操作,pyspark的子模块pyspark.sql 可以完成大部分类型的数据读写。文本介绍在pyspark中读写Mysql数据库。
在Python中使用Spark,需要安装配置Spark,这里跳过配置的过程,给出运行环境和相关程序版本信息。
pyspark连接Mysql是通过java实现的,所以需要下载连接Mysql的jar包。
下载地址
选择下载Connector/J,然后选择操作系统为Platform Independent,下载压缩包到本地。
然后解压文件,将其中的jar包mysql-connector-java-8.0.19.jar放入spark的安装目录下,例如D:\spark\spark-3.0.0-preview2-bin-hadoop2.7\jars。
环境配置完成!
脚本如下:
from pyspark.sql import SQLContext, SparkSessionif __name__ == '__main__': # spark 初始化 spark = SparkSession. \ Builder(). \ appName('sql'). \ master('local'). \ getOrCreate() # mysql 配置(需要修改) prop = {'user': 'xxx', 'password': 'xxx', 'driver': 'com.mysql.cj.jdbc.Driver'} # database 地址(需要修改) url = 'jdbc:mysql://host:port/database' # 读取表 data = spark.read.jdbc(url=url, table='tb_newCity', properties=prop) # 打印data数据类型 print(type(data)) # 展示数据 data.show() # 关闭spark会话 spark.stop()运行脚本,输出如下:
脚本如下:
import pandas as pdfrom pyspark import SparkContextfrom pyspark.sql import SQLContext, Rowif __name__ == '__main__': # spark 初始化 sc = SparkContext(master='local', appName='sql') spark = SQLContext(sc) # mysql 配置(需要修改) prop = {'user': 'xxx', 'password': 'xxx', 'driver': 'com.mysql.cj.jdbc.Driver'} # database 地址(需要修改) url = 'jdbc:mysql://host:port/database' # 创建spark DataFrame # 方式1:list转spark DataFrame l = [(1, 12), (2, 22)] # 创建并指定列名 list_df = spark.createDataFrame(l, schema=['id', 'value']) # 方式2:rdd转spark DataFrame rdd = sc.parallelize(l) # rdd col_names = Row('id', 'value') # 列名 tmp = rdd.map(lambda x: col_names(*x)) # 设置列名 rdd_df = spark.createDataFrame(tmp) # 方式3:pandas dataFrame 转spark DataFrame df = pd.DataFrame({'id': [1, 2], 'value': [12, 22]}) pd_df = spark.createDataFrame(df) # 写入数据库 pd_df.write.jdbc(url=url, table='new', mode='append', properties=prop) # 关闭spark会话 sc.stop()注意点:
prop和url参数同样需要根据实际情况修改;
写入数据库要求的对象类型是spark DataFrame,提供了三种常见数据类型转spark DataFrame的方法;
通过调用write.jdbc方法进行写入,其中的model参数控制写入数据的行为。
model 参数解释 error 默认值,原表存在则报错 ignore 原表存在,不报错且不写入数据 append 新数据在原表行末追加 overwrite 覆盖原表
Access denied for user …
原因:mysql配置参数出错
解决办法:检查user,password拼写,检查账号密码是否正确,用其他工具测试mysql是否能正常连接,做对比检查。
No suitable driver
原因:没有配置运行环境
解决办法:下载jar包进行配置,具体过程参考本文的2 环境配置。
到此这篇关于pyspark对Mysql数据库进行读写的实现的文章就介绍到这了,更多相关pyspark Mysql读写内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!
声明:本页内容来源网络,仅供用户参考;我单位不保证亦不表示资料全面及准确无误,也不保证亦不表示这些资料为最新信息,如因任何原因,本网内容或者用户因倚赖本网内容造成任何损失或损害,我单位将不会负任何法律责任。如涉及版权问题,请提交至online#300.cn邮箱联系删除。
首先,上个人网站的留言页面,大家可以看看效果:留言板前端为了省事,使用jQuery编写,后台使用php简单读写MySQL数据库。数据库设计和实现思路数据库创建了
MySQL的binlog日志是MySQL日志中非常重要的一种日志,记录了数据库所有的DML操作。通过binlog日志我们可以进行数据库的读写分离、数据增量备份以
本文实例讲述了Yii实现多数据库主从读写分离的方法。分享给大家供大家参考。具体分析如下:Yii框架数据库多数据库、主从、读写分离实现,功能描述:1.实现主从数据
帝国CMS7.0支持多MYSQL服务器读写分离,可减少数据库压力,使网站更稳定:1、支持主、从数据库服务器读写分离。2、支持后台使用独立的MYSQL读写服务器,
如何利用MySQLWorkbench对mysql数据库进行备份?Mysqlworkbench是一款Mysql官方推出的数据库设计建模工具,可建立数据库文档,以及