Hadoop是對大數(shù)據(jù)集進行分布式計算的標(biāo)準(zhǔn)工具,這也是為什么當(dāng)你穿過機場時能看到”大數(shù)據(jù)(Big Data)”廣告的原因。它已經(jīng)成為大數(shù)據(jù)的操作系統(tǒng),提供了包括工具和技巧在內(nèi)的豐富生態(tài)系統(tǒng),允許使用相對便宜的商業(yè)硬件集群進行超級計算機級別的計算。2003和2004年,兩個來自Google的觀點使Hadoop成為可能:一個分布式存儲框架(Google文件系統(tǒng)),在Hadoop中被實現(xiàn)為HDFS;一個分布式計算框架(MapReduce)。
這兩個觀點成為過去十年規(guī)模分析(scaling analytics)、大規(guī)模機器學(xué)習(xí)(machine learning),以及其他大數(shù)據(jù)應(yīng)用出現(xiàn)的主要推動力!但是,從技術(shù)角度上講,十年是一段非常長的時間,而且Hadoop還存在很多已知限制,尤其是MapReduce。對MapReduce編程明顯是困難的。對大多數(shù)分析,你都必須用很多步驟將Map和Reduce任務(wù)串接起來。這造成類SQL的計算或機器學(xué)習(xí)需要專門的系統(tǒng)來進行。更糟的是,MapReduce要求每個步驟間的數(shù)據(jù)要序列化到磁盤,這意味著MapReduce作業(yè)的I/O成本很高,導(dǎo)致交互分析和迭代算法(iterative algorithms)開銷很大;而事實是,幾乎所有的最優(yōu)化和機器學(xué)習(xí)都是迭代的。
為了解決這些問題,Hadoop一直在向一種更為通用的資源管理框架轉(zhuǎn)變,即YARN(Yet Another Resource Negotiator, 又一個資源協(xié)調(diào)者)。YARN實現(xiàn)了下一代的MapReduce,但同時也允許應(yīng)用利用分布式資源而不必采用MapReduce進行計算。通過將集群管理一般化,研究轉(zhuǎn)到分布式計算的一般化上,來擴展了MapReduce的初衷。
Spark是第一個脫胎于該轉(zhuǎn)變的快速、通用分布式計算范式,并且很快流行起來。Spark使用函數(shù)式編程范式擴展了MapReduce模型以支持更多計算類型,可以涵蓋廣泛的工作流,這些工作流之前被實現(xiàn)為Hadoop之上的特殊系統(tǒng)。Spark使用內(nèi)存緩存來提升性能,因此進行交互式分析也足夠快速(就如同使用Python解釋器,與集群進行交互一樣)。緩存同時提升了迭代算法的性能,這使得Spark非常適合數(shù)據(jù)理論任務(wù),特別是機器學(xué)習(xí)。
本文中,我們將首先討論如何在本地機器上或者EC2的集群上設(shè)置Spark進行簡單分析。然后,我們在入門級水平探索Spark,了解Spark是什么以及它如何工作(希望可以激發(fā)更多探索)。最后兩節(jié)我們開始通過命令行與Spark進行交互,然后演示如何用Python寫Spark應(yīng)用,并作為Spark作業(yè)提交到集群上。
設(shè)置Spark
在本機設(shè)置和運行Spark非常簡單。你只需要下載一個預(yù)構(gòu)建的包,只要你安裝了Java 6+和Python 2.6+,就可以在Windows、Mac OS X和Linux上運行Spark。確保java程序在PATH環(huán)境變量中,或者設(shè)置了JAVA_HOME環(huán)境變量。類似的,python也要在PATH中。
假設(shè)你已經(jīng)安裝了Java和Python:
訪問Spark下載頁
選擇Spark最新發(fā)布版(本文寫作時是1.2.0),一個預(yù)構(gòu)建的Hadoop 2.4包,直接下載。
現(xiàn)在,如何繼續(xù)依賴于你的操作系統(tǒng),靠你自己去探索了。Windows用戶可以在評論區(qū)對如何設(shè)置的提示進行評論。
一般,我的建議是按照下面的步驟(在POSIX操作系統(tǒng)上):
1.解壓Spark
~$ tar -xzf spark-1.2.0-bin-hadoop2.4.tgz
2.將解壓目錄移動到有效應(yīng)用程序目錄中(如Windows上的
~$ mv spark-1.2.0-bin-hadoop2.4 /srv/spark-1.2.0
3.創(chuàng)建指向該Spark版本的符號鏈接到<spark目錄。這樣你可以簡單地下載新/舊版本的Spark,然后修改鏈接來管理Spark版本,而不用更改路徑或環(huán)境變量。
~$ ln -s /srv/spark-1.2.0 /srv/spark
4.修改BASH配置,將Spark添加到PATH中,設(shè)置SPARK_HOME環(huán)境變量。這些小技巧在命令行上會幫到你。在Ubuntu上,只要編輯~/.bash_profile或~/.profile文件,將以下語句添加到文件中:
export SPARK_HOME=/srv/spark
export PATH=$SPARK_HOME/bin:$PATH
5.source這些配置(或者重啟終端)之后,你就可以在本地運行一個pyspark解釋器。執(zhí)行pyspark命令,你會看到以下結(jié)果:
~$ pyspark
Python 2.7.8 (default, Dec 2 2014, 12:45:58)
[GCC 4.2.1 Compatible Apple LLVM 6.0 (clang-600.0.54)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Sparks default log4j profile: org/apache/spark/log4j-defaults.properties
[… snip …]
Welcome to
____ __
/ __/__ ___ _____/ /__
_ / _ / _ `/ __/ `_/
/__ / .__/_,_/_/ /_/_ version 1.2.0
/_/
Using Python version 2.7.8 (default, Dec 2 2014 12:45:58)
SparkContext available as sc.
>>>
現(xiàn)在Spark已經(jīng)安裝完畢,可以在本機以”單機模式“(standalone mode)使用。你可以在本機開發(fā)應(yīng)用并提交Spark作業(yè),這些作業(yè)將以多進程/多線程模式運行的,或者,配置該機器作為一個集群的客戶端(不推薦這樣做,因為在Spark作業(yè)中,驅(qū)動程序(driver)是個很重要的角色,并且應(yīng)該與集群的其他部分處于相同網(wǎng)絡(luò))??赡艹碎_發(fā),你在本機使用Spark做得最多的就是利用spark-ec2腳本來配置Amazon云上的一個EC2 Spark集群了。
簡略Spark輸出
Spark(和PySpark)的執(zhí)行可以特別詳細(xì),很多INFO日志消息都會打印到屏幕。開發(fā)過程中,這些非常惱人,因為可能丟失Python棧跟蹤或者print的輸出。為了減少Spark輸出 – 你可以設(shè)置$SPARK_HOME/conf下的log4j。首先,拷貝一份$SPARK_HOME/conf/log4j.properties.template文件,去掉“.template”擴展名。
~$ cp $SPARK_HOME/conf/log4j.properties.template $SPARK_HOME/conf/log4j.properties
編輯新文件,用WARN替換代碼中出現(xiàn)的INFO。你的log4j.properties文件類似:
# Set everything to be logged to the console
log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
# Settings to quiet third party logs that are too verbose
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=WARN
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=WARN
現(xiàn)在運行PySpark,輸出消息將會更簡略!感謝@genomegeek在一次District Data Labs的研討會中指出這一點。
在Spark中使用IPython Notebook
當(dāng)搜索有用的Spark小技巧時,我發(fā)現(xiàn)了一些文章提到在PySpark中配置IPython notebook。IPython notebook對數(shù)據(jù)科學(xué)家來說是個交互地呈現(xiàn)科學(xué)和理論工作的必備工具,它集成了文本和Python代碼。對很多數(shù)據(jù)科學(xué)家,IPython notebook是他們的Python入門,并且使用非常廣泛,所以我想值得在本文中提及。
這里的大部分說明都來改編自IPython notebook: 在PySpark中設(shè)置IPython。但是,我們將聚焦在本機以單機模式將IPtyon shell連接到PySpark,而不是在EC2集群。如果你想在一個集群上使用PySpark/IPython,查看并評論下文的說明吧!
1.為Spark創(chuàng)建一個iPython notebook配置
~$ ipython profile create spark
[ProfileCreate] Generating default config file: u'$HOME/.ipython/profile_spark/ipython_config.py'
[ProfileCreate] Generating default config file: u'$HOME/.ipython/profile_spark/ipython_notebook_config.py'
[ProfileCreate] Generating default config file: u'$HOME/.ipython/profile_spark/ipython_nbconvert_config.py'
記住配置文件的位置,替換下文各步驟相應(yīng)的路徑:
2.創(chuàng)建文件$HOME/.ipython/profile_spark/startup/00-pyspark-setup.py,并添加如下代碼:
import os
import sys
# Configure the environment
if 'SPARK_HOME' not in os.environ:
os.environ['SPARK_HOME'] = '/srv/spark'
# Create a variable for our root path
SPARK_HOME = os.environ['SPARK_HOME']
# Add the PySpark/py4j to the Python Path
sys.path.insert(0, os.path.join(SPARK_HOME, "python", "build"))
sys.path.insert(0, os.path.join(SPARK_HOME, "python"))
3.使用我們剛剛創(chuàng)建的配置來啟動IPython notebook。
~$ ipython notebook --profile spark
4.在notebook中,你應(yīng)該能看到我們剛剛創(chuàng)建的變量。
print SPARK_HOME
5.在IPython notebook最上面,確保你添加了Spark context。
from pyspark import SparkContext
sc = SparkContext( 'local', 'pyspark')
6.使用IPython做個簡單的計算來測試Spark context。
def isprime(n):
"""
check if integer n is a prime
"""
# make sure n is a positive integer
n = abs(int(n))
# 0 and 1 are not primes
if n < 2:
return False
# 2 is the only even prime number
if n == 2:
return True
# all other even numbers are not primes
if not n &amp; 1:
return False
# range starts with 3 and only needs to go up the square root of n
# for all odd numbers
for x in range(3, int(n**0.5)+1, 2):
if n % x == 0:
return False
return True
# Create an RDD of numbers from 0 to 1,000,000
nums = sc.parallelize(xrange(1000000))
# Compute the number of primes in the RDD
print nums.filter(isprime).count()
如果你能得到一個數(shù)字而且沒有錯誤發(fā)生,那么你的context正確工作了!
編輯提示:上面配置了一個使用PySpark直接調(diào)用IPython notebook的IPython context。但是,你也可以使用PySpark按以下方式直接啟動一個notebook: $ IPYTHON_OPTS=”notebook –pylab inline” pyspark
哪個方法好用取決于你使用PySpark和IPython的具體情景。前一個允許你更容易地使用IPython notebook連接到一個集群,因此是我喜歡的方法。
在EC2上使用Spark
在講授使用Hadoop進行分布式計算時,我發(fā)現(xiàn)很多可以通過在本地偽分布式節(jié)點(pseudo-distributed node)或以單節(jié)點模式(single-node mode)講授。但是為了了解真正發(fā)生了什么,就需要一個集群。當(dāng)數(shù)據(jù)變得龐大,這些書面講授的技能和真實計算需求間經(jīng)常出現(xiàn)隔膜。如果你肯在學(xué)習(xí)詳細(xì)使用Spark上花錢,我建議你設(shè)置一個快速Spark集群做做實驗。 包含5個slave(和1個master)每周大概使用10小時的集群每月大概需要$45.18。
完整的討論可以在Spark文檔中找到:在EC2上運行Spark在你決定購買EC2集群前一定要通讀這篇文檔!我列出了一些關(guān)鍵點:
通過AWS Console獲取AWS EC2 key對(訪問key和密鑰key)。
將key對導(dǎo)出到你的環(huán)境中。在shell中敲出以下命令,或者將它們添加到配置中。
export AWS_ACCESS_KEY_ID=myaccesskeyid
export AWS_SECRET_ACCESS_KEY=mysecretaccesskey
注意不同的工具使用不同的環(huán)境名稱,確保你用的是Spark腳本所使用的名稱。
3.啟動集群:
~$ cd $SPARK_HOME/ec2
ec2$ ./spark-ec2 -k <keypair> -i <key-file> -s <num-slaves> launch <cluster-name>
4.SSH到集群來運行Spark作業(yè)。
ec2$ ./spark-ec2 -k <keypair> -i <key-file> login <cluster-name>
5.銷毀集群
ec2$ ./spark-ec2 destroy <cluster-name>.
這些腳本會自動創(chuàng)建一個本地的HDFS集群來添加數(shù)據(jù),copy-dir命令可以同步代碼和數(shù)據(jù)到該集群。但是你最好使用S3來存儲數(shù)據(jù),創(chuàng)建使用s3://URI來加載數(shù)據(jù)的RDDs。
Spark是什么?
既然設(shè)置好了Spark,現(xiàn)在我們討論下Spark是什么。Spark是個通用的集群計算框架,通過將大量數(shù)據(jù)集計算任務(wù)分配到多臺計算機上,提供高效內(nèi)存計算。如果你熟悉Hadoop,那么你知道分布式計算框架要解決兩個問題:如何分發(fā)數(shù)據(jù)和如何分發(fā)計算。Hadoop使用HDFS來解決分布式數(shù)據(jù)問題,MapReduce計算范式提供有效的分布式計算。類似的,Spark擁有多種語言的函數(shù)式編程API,提供了除map和reduce之外更多的運算符,這些操作是通過一個稱作彈性分布式數(shù)據(jù)集(resilient distributed datasets, RDDs)的分布式數(shù)據(jù)框架進行的。
本質(zhì)上,RDD是種編程抽象,代表可以跨機器進行分割的只讀對象集合。RDD可以從一個繼承結(jié)構(gòu)(lineage)重建(因此可以容錯),通過并行操作訪問,可以讀寫HDFS或S3這樣的分布式存儲,更重要的是,可以緩存到worker節(jié)點的內(nèi)存中進行立即重用。由于RDD可以被緩存在內(nèi)存中,Spark對迭代應(yīng)用特別有效,因為這些應(yīng)用中,數(shù)據(jù)是在整個算法運算過程中都可以被重用。大多數(shù)機器學(xué)習(xí)和最優(yōu)化算法都是迭代的,使得Spark對數(shù)據(jù)科學(xué)來說是個非常有效的工具。另外,由于Spark非常快,可以通過類似Python REPL的命令行提示符交互式訪問。
Spark庫本身包含很多應(yīng)用元素,這些元素可以用到大部分大數(shù)據(jù)應(yīng)用中,其中包括對大數(shù)據(jù)進行類似SQL查詢的支持,機器學(xué)習(xí)和圖算法,甚至對實時流數(shù)據(jù)的支持。
核心組件如下:
Spark Core:包含Spark的基本功能;尤其是定義RDD的API、操作以及這兩者上的動作。其他Spark的庫都是構(gòu)建在RDD和Spark Core之上的。
Spark SQL:提供通過Apache Hive的SQL變體Hive查詢語言(HiveQL)與Spark進行交互的API。每個數(shù)據(jù)庫表被當(dāng)做一個RDD,Spark SQL查詢被轉(zhuǎn)換為Spark操作。對熟悉Hive和HiveQL的人,Spark可以拿來就用。
Spark Streaming:允許對實時數(shù)據(jù)流進行處理和控制。很多實時數(shù)據(jù)庫(如Apache Store)可以處理實時數(shù)據(jù)。Spark Streaming允許程序能夠像普通RDD一樣處理實時數(shù)據(jù)。
MLlib:一個常用機器學(xué)習(xí)算法庫,算法被實現(xiàn)為對RDD的Spark操作。這個庫包含可擴展的學(xué)習(xí)算法,比如分類、回歸等需要對大量數(shù)據(jù)集進行迭代的操作。之前可選的大數(shù)據(jù)機器學(xué)習(xí)庫Mahout,將會轉(zhuǎn)到Spark,并在未來實現(xiàn)。
GraphX:控制圖、并行圖操作和計算的一組算法和工具的集合。GraphX擴展了RDD API,包含控制圖、創(chuàng)建子圖、訪問路徑上所有頂點的操作。
由于這些組件滿足了很多大數(shù)據(jù)需求,也滿足了很多數(shù)據(jù)科學(xué)任務(wù)的算法和計算上的需要,Spark快速流行起來。不僅如此,Spark也提供了使用Scala、Java和Python編寫的API;滿足了不同團體的需求,允許更多數(shù)據(jù)科學(xué)家簡便地采用Spark作為他們的大數(shù)據(jù)解決方案。
對Spark編程
編寫Spark應(yīng)用與之前實現(xiàn)在Hadoop上的其他數(shù)據(jù)流語言類似。代碼寫入一個惰性求值的驅(qū)動程序(driver program)中,通過一個動作(action),驅(qū)動代碼被分發(fā)到集群上,由各個RDD分區(qū)上的worker來執(zhí)行。然后結(jié)果會被發(fā)送回驅(qū)動程序進行聚合或編譯。本質(zhì)上,驅(qū)動程序創(chuàng)建一個或多個RDD,調(diào)用操作來轉(zhuǎn)換RDD,然后調(diào)用動作處理被轉(zhuǎn)換后的RDD。
這些步驟大體如下:
定義一個或多個RDD,可以通過獲取存儲在磁盤上的數(shù)據(jù)(HDFS,Cassandra,HBase,Local Disk),并行化內(nèi)存中的某些集合,轉(zhuǎn)換(transform)一個已存在的RDD,或者,緩存或保存。
通過傳遞一個閉包(函數(shù))給RDD上的每個元素來調(diào)用RDD上的操作。Spark提供了除了Map和Reduce的80多種高級操作。
使用結(jié)果RDD的動作(action)(如count、collect、save等)。動作將會啟動集群上的計算。
當(dāng)Spark在一個worker上運行閉包時,閉包中用到的所有變量都會被拷貝到節(jié)點上,但是由閉包的局部作用域來維護。Spark提供了兩種類型的共享變量,這些變量可以按照限定的方式被所有worker訪問。廣播變量會被分發(fā)給所有worker,但是是只讀的。累加器這種變量,worker可以使用關(guān)聯(lián)操作來“加”,通常用作計數(shù)器。
Spark應(yīng)用本質(zhì)上通過轉(zhuǎn)換和動作來控制RDD。后續(xù)文章將會深入討論,但是理解了這個就足以執(zhí)行下面的例子了。
Spark的執(zhí)行
簡略描述下Spark的執(zhí)行。本質(zhì)上,Spark應(yīng)用作為獨立的進程運行,由驅(qū)動程序中的SparkContext協(xié)調(diào)。這個context將會連接到一些集群管理者(如YARN),這些管理者分配系統(tǒng)資源。集群上的每個worker由執(zhí)行者(executor)管理,執(zhí)行者反過來由SparkContext管理。執(zhí)行者管理計算、存儲,還有每臺機器上的緩存。
重點要記住的是應(yīng)用代碼由驅(qū)動程序發(fā)送給執(zhí)行者,執(zhí)行者指定context和要運行的任務(wù)。執(zhí)行者與驅(qū)動程序通信進行數(shù)據(jù)分享或者交互。驅(qū)動程序是Spark作業(yè)的主要參與者,因此需要與集群處于相同的網(wǎng)絡(luò)。這與Hadoop代碼不同,Hadoop中你可以在任意位置提交作業(yè)給JobTracker,JobTracker處理集群上的執(zhí)行。
與Spark交互
使用Spark最簡單的方式就是使用交互式命令行提示符。打開PySpark終端,在命令行中打出pyspark。
~$ pyspark
[… snip …]
>>>
PySpark將會自動使用本地Spark配置創(chuàng)建一個SparkContext。你可以通過sc變量來訪問它。我們來創(chuàng)建第一個RDD。
>>> text = sc.textFile("shakespeare.txt")
>>> print text
shakespeare.txt MappedRDD[1] at textFile at NativeMethodAccessorImpl.java:-2
textFile方法將莎士比亞全部作品加載到一個RDD命名文本。如果查看了RDD,你就可以看出它是個MappedRDD,文件路徑是相對于當(dāng)前工作目錄的一個相對路徑(記得傳遞磁盤上正確的shakespear.txt文件路徑)。我們轉(zhuǎn)換下這個RDD,來進行分布式計算的“hello world”:“字?jǐn)?shù)統(tǒng)計”。
>>> from operator import add
>>> def tokenize(text):
... return text.split()
...
>>> words = text.flatMap(tokenize)
>>> print words
PythonRDD[2] at RDD at PythonRDD.scala:43
我們首先導(dǎo)入了add操作符,它是個命名函數(shù),可以作為加法的閉包來使用。我們稍后再使用這個函數(shù)。首先我們要做的是把文本拆分為單詞。我們創(chuàng)建了一個tokenize函數(shù),參數(shù)是文本片段,返回根據(jù)空格拆分的單詞列表。然后我們通過給flatMap操作符傳遞tokenize閉包對textRDD進行變換創(chuàng)建了一個wordsRDD。你會發(fā)現(xiàn),words是個PythonRDD,但是執(zhí)行本應(yīng)該立即進行。顯然,我們還沒有把整個莎士比亞數(shù)據(jù)集拆分為單詞列表。
如果你曾使用MapReduce做過Hadoop版的“字?jǐn)?shù)統(tǒng)計”,你應(yīng)該知道下一步是將每個單詞映射到一個鍵值對,其中鍵是單詞,值是1,然后使用reducer計算每個鍵的1總數(shù)。
首先,我們map一下。
>>> wc = words.map(lambda x: (x,1))
>>> print wc.toDebugString()
(2) PythonRDD[3] at RDD at PythonRDD.scala:43
| shakespeare.txt MappedRDD[1] at textFile at NativeMethodAccessorImpl.java:-2
| shakespeare.txt HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:-2
我使用了一個匿名函數(shù)(用了Python中的lambda關(guān)鍵字)而不是命名函數(shù)。這行代碼將會把lambda映射到每個單詞。因此,每個x都是一個單詞,每個單詞都會被匿名閉包轉(zhuǎn)換為元組(word, 1)。為了查看轉(zhuǎn)換關(guān)系,我們使用toDebugString方法來查看PipelinedRDD是怎么被轉(zhuǎn)換的??梢允褂胷educeByKey動作進行字?jǐn)?shù)統(tǒng)計,然后把統(tǒng)計結(jié)果寫到磁盤。
>>> counts = wc.reduceByKey(add)
>>> counts.saveAsTextFile("wc")
一旦我們最終調(diào)用了saveAsTextFile動作,這個分布式作業(yè)就開始執(zhí)行了,在作業(yè)“跨集群地”(或者你本機的很多進程)運行時,你應(yīng)該可以看到很多INFO語句。如果退出解釋器,你可以看到當(dāng)前工作目錄下有個“wc”目錄。
$ ls wc/
_SUCCESS part-00000 part-00001
每個part文件都代表你本機上的進程計算得到的被保持到磁盤上的最終RDD。如果對一個part文件進行head命令,你應(yīng)該能看到字?jǐn)?shù)統(tǒng)計元組。
$ head wc/part-00000
(u'fawn', 14)
(u'Fame.', 1)
(u'Fame,', 2)
(, 1)
(, 1)
(, 1)
(, 1)
(, 1)
(u'fleeces', 1)
(, 1)
注意這些鍵沒有像Hadoop一樣被排序(因為Hadoop中Map和Reduce任務(wù)中有個必要的打亂和排序階段)。但是,能保證每個單詞在所有文件中只出現(xiàn)一次,因為你使用了reduceByKey操作符。你還可以使用sort操作符確保在寫入到磁盤之前所有的鍵都被排過序。
編寫一個Spark應(yīng)用
編寫Spark應(yīng)用與通過交互式控制臺使用Spark類似。API是相同的。首先,你需要訪問<SparkContext,它已經(jīng)由<pyspark自動加載好了。
使用Spark編寫Spark應(yīng)用的一個基本模板如下:
## Spark Application - execute with spark-submit
## Imports
from pyspark import SparkConf, SparkContext
## Module Constants
APP_NAME = "My Spark Application"
## Closure Functions
## Main functionality
def main(sc):
pass
if __name__ == "__main__":
# Configure Spark
conf = SparkConf().setAppName(APP_NAME)
conf = conf.setMaster("local[*]")
sc = SparkContext(conf=conf)
# Execute Main functionality
main(sc)
這個模板列出了一個Spark應(yīng)用所需的東西:導(dǎo)入Python庫,模塊常量,用于調(diào)試和Spark UI的可識別的應(yīng)用名稱,還有作為驅(qū)動程序運行的一些主要分析方法學(xué)。在ifmain中,我們創(chuàng)建了SparkContext,使用了配置好的context執(zhí)行main。我們可以簡單地導(dǎo)入驅(qū)動代碼到pyspark而不用執(zhí)行。注意這里Spark配置通過setMaster方法被硬編碼到SparkConf,一般你應(yīng)該允許這個值通過命令行來設(shè)置,所以你能看到這行做了占位符注釋。
使用<sc.stop()或<sys.exit(0)來關(guān)閉或退出程序。
## Spark Application - execute with spark-submit
## Imports
import csv
import matplotlib.pyplot as plt
from StringIO import StringIO
from datetime import datetime
from collections import namedtuple
from operator import add, itemgetter
from pyspark import SparkConf, SparkContext
## Module Constants
APP_NAME = "Flight Delay Analysis"
DATE_FMT = "%Y-%m-%d"
TIME_FMT = "%H%M"
fields = ('date', 'airline', 'flightnum', 'origin', 'dest', 'dep',
'dep_delay', 'arv', 'arv_delay', 'airtime', 'distance')
Flight = namedtuple('Flight', fields)
## Closure Functions
def parse(row):
"""
Parses a row and returns a named tuple.
"""
row[0] = datetime.strptime(row[0], DATE_FMT).date()
row[5] = datetime.strptime(row[5], TIME_FMT).time()
row[6] = float(row[6])
row[7] = datetime.strptime(row[7], TIME_FMT).time()
row[8] = float(row[8])
row[9] = float(row[9])
row[10] = float(row[10])
return Flight(*row[:11])
def split(line):
"""
Operator function for splitting a line with csv module
"""
reader = csv.reader(StringIO(line))
return reader.next()
def plot(delays):
"""
Show a bar chart of the total delay per airline
"""
airlines = [d[0] for d in delays]
minutes = [d[1] for d in delays]
index = list(xrange(len(airlines)))
fig, axe = plt.subplots()
bars = axe.barh(index, minutes)
# Add the total minutes to the right
for idx, air, min in zip(index, airlines, minutes):
if min > 0:
bars[idx].set_color('#d9230f')
axe.annotate(" %0.0f min" % min, xy=(min+1, idx+0.5), va='center')
else:
bars[idx].set_color('#469408')
axe.annotate(" %0.0f min" % min, xy=(10, idx+0.5), va='center')
# Set the ticks
ticks = plt.yticks([idx+ 0.5 for idx in index], airlines)
xt = plt.xticks()[0]
plt.xticks(xt, [' '] * len(xt))
# minimize chart junk
plt.grid(axis = 'x', color ='white', linestyle='-')
plt.title('Total Minutes Delayed per Airline')
plt.show()
## Main functionality
def main(sc):
# Load the airlines lookup dictionary
airlines = dict(sc.textFile("ontime/airlines.csv").map(split).collect())
# Broadcast the lookup dictionary to the cluster
airline_lookup = sc.broadcast(airlines)
# Read the CSV Data into an RDD
flights = sc.textFile("ontime/flights.csv").map(split).map(parse)
# Map the total delay to the airline (joined using the broadcast value)
delays = flights.map(lambda f: (airline_lookup.value[f.airline],
add(f.dep_delay, f.arv_delay)))
# Reduce the total delay for the month to the airline
delays = delays.reduceByKey(add).collect()
delays = sorted(delays, key=itemgetter(1))
# Provide output from the driver
for d in delays:
print "%0.0f minutes delayedt%s" % (d[1], d[0])
# Show a bar chart of the delays
plot(delays)
if __name__ == "__main__":
# Configure Spark
conf = SparkConf().setMaster("local[*]")
conf = conf.setAppName(APP_NAME)
sc = SparkContext(conf=conf)
# Execute Main functionality
main(sc)
使用<spark-submit命令來運行這段代碼(假設(shè)你已有ontime目錄,目錄中有兩個CSV文件):
~$ spark-submit app.py
這個Spark作業(yè)使用本機作為master,并搜索app.py同目錄下的ontime目錄下的2個CSV文件。最終結(jié)果顯示,4月的總延誤時間(單位分鐘),既有早點的(如果你從美國大陸飛往夏威夷或者阿拉斯加),但對大部分大型航空公司都是延誤的。注意,我們在app.py中使用matplotlib直接將結(jié)果可視化出來了:
這段代碼做了什么呢?我們特別注意下與Spark最直接相關(guān)的main函數(shù)。首先,我們加載CSV文件到RDD,然后把split函數(shù)映射給它。split函數(shù)使用csv模塊解析文本的每一行,并返回代表每行的元組。最后,我們將collect動作傳給RDD,這個動作把數(shù)據(jù)以Python列表的形式從RDD傳回驅(qū)動程序。本例中,airlines.csv是個小型的跳轉(zhuǎn)表(jump table),可以將航空公司代碼與全名對應(yīng)起來。我們將轉(zhuǎn)移表存儲為Python字典,然后使用sc.broadcast廣播給集群上的每個節(jié)點。
接著,main函數(shù)加載了數(shù)據(jù)量更大的flights.csv([譯者注]作者筆誤寫成fights.csv,此處更正)。拆分CSV行完成之后,我們將parse函數(shù)映射給CSV行,此函數(shù)會把日期和時間轉(zhuǎn)成Python的日期和時間,并對浮點數(shù)進行合適的類型轉(zhuǎn)換。每行作為一個NamedTuple保存,名為Flight,以便高效簡便地使用。
有了Flight對象的RDD,我們映射一個匿名函數(shù),這個函數(shù)將RDD轉(zhuǎn)換為一些列的鍵值對,其中鍵是航空公司的名字,值是到達和出發(fā)的延誤時間總和。使用reduceByKey動作和add操作符可以得到每個航空公司的延誤時間總和,然后RDD被傳遞給驅(qū)動程序(數(shù)據(jù)中航空公司的數(shù)目相對較少)。最終延誤時間按照升序排列,輸出打印到了控制臺,并且使用matplotlib進行了可視化。
這個例子稍長,但是希望能演示出集群和驅(qū)動程序之間的相互作用(發(fā)送數(shù)據(jù)進行分析,結(jié)果取回給驅(qū)動程序),以及Python代碼在Spark應(yīng)用中的角色。
結(jié)論
盡管算不上一個完整的Spark入門,我們希望你能更好地了解Spark是什么,如何使用進行快速、內(nèi)存分布式計算。至少,你應(yīng)該能將Spark運行起來,并開始在本機或Amazon EC2上探索數(shù)據(jù)。你應(yīng)該可以配置好iPython notebook來運行Spark。
Spark不能解決分布式存儲問題(通常Spark從HDFS中獲取數(shù)據(jù)),但是它為分布式計算提供了豐富的函數(shù)式編程API。這個框架建立在伸縮分布式數(shù)據(jù)集(RDD)之上。RDD是種編程抽象,代表被分區(qū)的對象集合,允許進行分布式操作。RDD有容錯能力(可伸縮的部分),更重要的時,可以存儲到節(jié)點上的worker內(nèi)存里進行立即重用。內(nèi)存存儲提供了快速和簡單表示的迭代算法,以及實時交互分析。
由于Spark庫提供了Python、Scale、Java編寫的API,以及內(nèi)建的機器學(xué)習(xí)、流數(shù)據(jù)、圖算法、類SQL查詢等模塊;Spark迅速成為當(dāng)今最重要的分布式計算框架之一。與YARN結(jié)合,Spark提供了增量,而不是替代已存在的Hadoop集群,它將成為未來大數(shù)據(jù)重要的一部分,為數(shù)據(jù)科學(xué)探索鋪設(shè)了一條康莊大道。
有用的鏈接
希望你喜歡這篇博文!寫作并不是憑空而來的,以下是一些曾幫助我寫作的有用鏈接;查看這些鏈接,可能對進一步探索Spark有幫助。注意,有些圖書鏈接是推廣鏈接,意味著如果你點擊并購買了這些圖書,你將會支持District Data Labs!
這篇更多是篇入門文章,而不是District Data Labs的典型文章,有些與此入門相關(guān)的數(shù)據(jù)和代碼你可以在這里找到:
Github上的代碼
莎士比亞數(shù)據(jù)集
航空公司時間數(shù)據(jù)集改編自美國交通統(tǒng)計局(US DOT)
Spark論文
Spark與Hadoop一樣,有一些基礎(chǔ)論文,我認(rèn)為那些需要對大數(shù)據(jù)集進行分布式計算的嚴(yán)謹(jǐn)數(shù)據(jù)科學(xué)家一定要讀。首先是HotOS(“操作系統(tǒng)熱門話題”的簡寫)的一篇研討會論文,簡單易懂地描述了Spark。第二個是偏理論的論文,具體描述了RDD。
M. Zaharia, M. Chowdhury, M. J. Franklin, S. Shenker, and I. Stoica, “Spark: cluster computing with working sets,” in Proceedings of the 2nd USENIX conference on Hot topics in cloud computing, 2010, pp. 10–10.
M. Zaharia, M. Chowdhury, T. Das, A. Dave, J. Ma, M. McCauley, M. J. Franklin, S. Shenker, and I. Stoica, “Resilient distributed datasets: A fault-tolerant abstraction for in-memory cluster computing,” in Proceedings of the 9th USENIX conference on Networked Systems Design and Implementation, 2012, pp. 2–2.
Spark圖書
學(xué)習(xí)Spark
使用Spark進行高級分析
有用的博文
設(shè)置IPython以使用PySpark
Databricks的Spark參考應(yīng)用程序
在EC2上運行Spark
在Amazon Elastic MapReduce上運行Spark和SparkSQL
更多信息請查看IT技術(shù)專欄