windowns使用PySpark环境配置和基本操作
下载依赖
首先需要下载hadoop和spark,解压,然后设置环境变量。
hadoop清华源下载
spark清华源下载
HADOOP_HOME => /path/hadoop SPARK_HOME => /path/spark
安装pyspark。
pip install pyspark
基本使用
可以在shell终端,输入pyspark,有如下回显:
输入以下指令进行测试,并创建SparkContext,SparkContext是任何spark功能的入口点。
>>> from pyspark import SparkContext >>> sc = SparkContext("local", "First App")
如果以上不会报错,恭喜可以开始使用pyspark编写代码了。
不过,我这里使用IDE来编写代码,首先我们先在终端执行以下代码关闭SparkContext。
>>> sc.stop()
下面使用pycharm编写代码,如果修改了环境变量需要先重启pycharm。
在pycharm运行如下程序,程序会起本地模式的spark计算引擎,通过spark统计abc.txt文件中a和b出现行的数量,文件路径需要自己指定。
from pyspark import SparkContext sc = SparkContext("local", "First App") logFile = "abc.txt" logData = sc.textFile(logFile).cache() numAs = logData.filter(lambda s: 'a' in s).count() numBs = logData.filter(lambda s: 'b' in s).count() print("Line with a:%i,line with b:%i" % (numAs, numBs))
运行结果如下:
20/03/11 16:15:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
20/03/11 16:15:58 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Line with a:3,line with b:1
这里说一下,同样的工作使用python可以做,spark也可以做,使用spark主要是为了高效的进行分布式计算。
戳pyspark教程
戳spark教程
RDD
RDD代表Resilient Distributed Dataset,它们是在多个节点上运行和操作以在集群上进行并行处理的元素,RDD是spark计算的操作对象。
一般,我们先使用数据创建RDD,然后对RDD进行操作。
对RDD操作有两种方法:
Transformation(转换) - 这些操作应用于RDD以创建新的RDD。例如filter,groupBy和map。
Action(操作) - 这些是应用于RDD的操作,它指示Spark执行计算并将结果发送回驱动程序,例如count,collect等。
创建RDD
parallelize是从列表创建RDD,先看一个例子:
from pyspark import SparkContext sc = SparkContext("local", "count app") words = sc.parallelize( ["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark" ]) print(words)
结果中我们得到一个对象,就是我们列表数据的RDD对象,spark之后可以对他进行操作。
ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195
Count
count方法返回RDD中的元素个数。
from pyspark import SparkContext sc = SparkContext("local", "count app") words = sc.parallelize( ["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark" ]) print(words) counts = words.count() print("Number of elements in RDD -> %i" % counts)
返回结果:
Number of elements in RDD -> 8
Collect
collect返回RDD中的所有元素。
from pyspark import SparkContext sc = SparkContext("local", "collect app") words = sc.parallelize( ["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark" ]) coll = words.collect() print("Elements in RDD -> %s" % coll)
返回结果:
Elements in RDD -> ['scala', 'java', 'hadoop', 'spark', 'akka', 'spark vs hadoop', 'pyspark', 'pyspark and spark']
foreach
每个元素会使用foreach内的函数进行处理,但是不会返回任何对象。
下面的程序中,我们定义的一个累加器accumulator,用于储存在foreach执行过程中的值。
from pyspark import SparkContext sc = SparkContext("local", "ForEach app") accum = sc.accumulator(0) data = [1, 2, 3, 4, 5] rdd = sc.parallelize(data) def increment_counter(x): print(x) accum.add(x) return 0 s = rdd.foreach(increment_counter) print(s) # None print("Counter value: ", accum)
返回结果:
None
Counter value: 15
filter
返回一个包含元素的新RDD,满足过滤器的条件。
from pyspark import SparkContext sc = SparkContext("local", "Filter app") words = sc.parallelize( ["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark"] ) words_filter = words.filter(lambda x: 'spark' in x) filtered = words_filter.collect() print("Fitered RDD -> %s" % (filtered)) Fitered RDD -> ['spark', 'spark vs hadoop', 'pyspark', 'pyspark and spark']
也可以改写成这样:
from pyspark import SparkContext sc = SparkContext("local", "Filter app") words = sc.parallelize( ["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark"] ) def g(x): for i in x: if "spark" in x: return i words_filter = words.filter(g) filtered = words_filter.collect() print("Fitered RDD -> %s" % (filtered))
map
将函数应用于RDD中的每个元素并返回新的RDD。
from pyspark import SparkContext sc = SparkContext("local", "Map app") words = sc.parallelize( ["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark"] ) words_map = words.map(lambda x: (x, 1, "_{}".format(x))) mapping = words_map.collect() print("Key value pair -> %s" % (mapping))
返回结果:
Key value pair -> [('scala', 1, '_scala'), ('java', 1, '_java'), ('hadoop', 1, '_hadoop'), ('spark', 1, '_spark'), ('akka', 1, '_akka'), ('spark vs hadoop', 1, '_spark vs hadoop'), ('pyspark', 1, '_pyspark'), ('pyspark and spark', 1, '_pyspark and spark')]
Reduce
执行指定的可交换和关联二元操作后,然后返回RDD中的元素。
from pyspark import SparkContext from operator import add sc = SparkContext("local", "Reduce app") nums = sc.parallelize([1, 2, 3, 4, 5]) adding = nums.reduce(add) print("Adding all the elements -> %i" % (adding))
这里的add是python内置的函数,可以使用ide查看:
def add(a, b): "Same as a + b." return a + b
reduce会依次对元素相加,相加后的结果加上其他元素,最后返回结果(RDD中的元素)。
Adding all the elements -> 15
Join
返回RDD,包含两者同时匹配的键,键包含对应的所有元素。
from pyspark import SparkContext sc = SparkContext("local", "Join app") x = sc.parallelize([("spark", 1), ("hadoop", 4), ("python", 4)]) y = sc.parallelize([("spark", 2), ("hadoop", 5)]) print("x =>", x.collect()) print("y =>", y.collect()) joined = x.join(y) final = joined.collect() print( "Join RDD -> %s" % (final))
返回结果:
x => [('spark', 1), ('hadoop', 4), ('python', 4)]
y => [('spark', 2), ('hadoop', 5)]
Join RDD -> [('hadoop', (4, 5)), ('spark', (1, 2))]
到此这篇关于windowns使用PySpark环境配置和基本操作的文章就介绍到这了,更多相关PySpark环境配置 内容请搜索猪先飞以前的文章或继续浏览下面的相关文章希望大家以后多多支持猪先飞!
相关文章
- 这篇文章主要介绍了使用maven的profile构建不同环境配置的方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧...2021-01-26
- MySQL日志主要包含:错误日志、查询日志、慢查询日志、事务日志、二进制日志;日志是mysql数据库的重要组成部分。日志文件中记录着mysql数据库运行期间发生的变化;也就是说用来记录mysql数据库的客户端连接状况、SQL语句...2015-11-24
- C# 对XML基本操作包括读取节点的数据,添加节点。读取节点属性,修改节点属性等...2020-06-25
- 这篇文章主要介绍了Pycharm中Python环境配置常见问题,结合图文形式分析了Pycharm中Python环境配置模块路径问题、虚拟环境创建、配置远程服务器、连接数据库等常见问题与操作方法,需要的朋友可以参考下...2020-04-27
- 这篇文章主要介绍了IDEA Java win10环境配置,本文通过图文并茂的形式给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下...2020-07-13
Windows server 2003 服务器环境配置 新手简明版
Windows server 2003 服务器环境配置 新手简明版,第一次接触win2003服务器的朋友应该也可以参考配置下。...2016-01-27- 这篇文章主要介绍了VSCode C++环境配置过程,在这大家需要在代码的目录下的.vscode文件夹下创建launch.json、tasks.json,具体实现过程跟随小编一起看看吧...2021-11-03
pyspark操作hive分区表及.gz.parquet和part-00000文件压缩问题
这篇文章主要介绍了pyspark操作hive分区表及.gz.parquet和part-00000文件压缩问题,针对问题整理了spark操作hive表的几种方式,需要的朋友可以参考下...2021-08-25- UNIX下的PHP环境配置 所需软件php-3.0.14-win32.zip php-3.0.14-win32.zip mysql-shareware-3.22.32-win.zip 所有软件均安装在/export/home/guoj/下,也可在...2016-11-25
- Windows: 为了以后重装系统方便,建议不要安装在系统盘,这里安装在D盘。可以是根目录,亦可以是其它目录,但最好目录名中不要有空格,这样可以避免某些错误的出现。 配置Apache和PHP...2016-01-28
- 二叉树是数据结构中的树的一种特殊情况,有关二叉树的相关概念,这里不再赘述,如果不了解二叉树相关概念,建议先学习数据结构中的二叉树的知识点...2020-04-25
- 好长时间没有写博客了,最近正在学习php,所以把环境配置与大家分享一下。 好长时间没有写博客了,最近正在学习php,所以把环境配置与大家分享一下。 软件准备: 准备的软...2016-11-25
- 这篇文章主要介绍了Python字符串的15个基本操作,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧...2021-02-04
- 字段类型 1.INT[(M)] 正常大小整数类型 2.DOUBLE[(M,D)] [ZEROFILL] 正常大小(双精密)浮点数字类型 3.DATE 日期类型。支持的...2016-11-25
- 为了便于操作,使用pyspark时我们通常将数据转为DataFrame的形式来完成清洗和分析动作。那么你知道pyspark创建DataFrame有几种方法吗,下面就一起来了解一下...2021-05-17
- 这篇文章主要介绍了在windwos8.1中php环境配置方法,大家参考使用吧...2016-01-27
php 5.3环境配置方法(apache2.2 mysql5.1)
本款php 5.3环境配置方法是利用Apache2.2.16+PHP5.3.3+MySQL5.1.49的配置哦,下面来看看安装配置教程哦。 第一步:下载安装的文件 1. mysql:下载地址mysql-5.1.49-w...2016-11-25- 文件下载网址 Apache 2.2.4 -- www.apache.com PHP 5.2.0 -- www.php.net MySQL 5.0.27 -- www.mysql....2016-11-25
- 上一篇文章中老顾介绍了logback基本配置,了解了日志配置的基本方式.我们平时在系统开发时,开发环境与生产环境的日志配置会不一样;那今天老顾就跟大家介绍一下如何实现多环境配置,需要的朋友可以参考下...2021-06-16
- pyspark是Spark对Python的api接口,可以在Python环境中通过调用pyspark模块来操作spark,这篇文章主要介绍了windowns使用PySpark环境配置和基本操作,感兴趣的可以了解一下...2021-05-17