Apache Spark programozása Python nyelven

Összefoglalás

Ez az olvasólecke gyakorlati tudást nyújt a Spark keretrendszer használatához. Részletesen foglalkozunk a Spark Python nyelvű interaktív parancsértelmező eszközével. Különböző példákon keresztül demonstráljuk a különböző adatforrásokból történő adat beolvasást és a DataFrame API használatát. Több konkrét transzformációt és akciót is bemutatunk az adatokon. A parancssori kliens mellett betekintést nyújtunk abba, hogyan lehet olyan önálló Python alkalmazásokat fejleszteni, melyek átadhatók a Spark keretrendszernek mind végrehajtható feladat.
A lecke fejezetei:
  • 1. fejezet: Apache Spark indítása docker környezetben, Python parancssori kliens használata (olvasó)
  • 2. fejezet: A Word count és árfolyam átlag problmák megoldása Spark Python parancssorban (olvasó)
  • 3. fejezet: Önálló Spark alkalmazások programozása Python nyelven (olvasó)
Téma típusa: gyakorlati
Olvasási idő: 50 perc

Book to read, knowledge, lecture, open book, reading book icon 1. fejezet

Az Apache Spark telepítése/indítása docker környezetben

Ahogy a kapcsolódó előadás olvasóleckéiben (8e_BigData-exec-engines-SPOC.md és 9e_BigData-spark-rdd-df-SPOC.md) már láttuk, az Apache Spark [1] egy villámgyors klaszter számítási keretrendszer, amit nagyon gyors adatfeldolgozásra terveztek. A Hadoop MapReduce modellen alapul (koncepcionálisan, nem kód szintjén), de olyan módon általánosítja és terjeszti ki azt, ami lehetővé teszi a hatékony felhasználását interaktív lekérdezések készítéséhez vagy stream feldolgozáshoz is.

Telepítés/docker konténerek Spark-hoz

A Spark telepítése a klaszter típusától függően eltérő lépésekből állhat. Az alapvető Spark disztribúció a megfelelő bináris csomag letöltéséből, kicsomagolásából, valamint a környezeti változók és konfigurációs állományok beállításából áll [2]. Ha Hadoop támogatást szeretnénk használni, egy megfelelő Hadoop klasztert is telepítenünk kell, majd a Spark beállításait módosítani eszerint. Ezt követően a klaszter típusának megfelelően (Standalone, Apache Mesos, Hadoop YARN, Kubernetes) az egyes node-okra telepíteni kell a Spark komponenseket (master primary, worker, stb.). Részletek a hivatalos dokumentációban [3] olvashatók. Mi a lokális gépre történő telepítés, illetve saját fizikai klaszter összeállítása helyett egy előre előkészített docker container stack-et fogunk használni, ami tartalmazza a Hadoop klaszter elemeit is. A következőkben bemutatott stack az összes docker image-t elindítja, ami szükséges a Spark azonnali használatához. A következő példák tetszőleges gépen futtathatók, ahol telepítve van a Dokcer környezet, valamint a Git verziókövető kliens.

A Spark stack indításához először töltsük le a docker leírókat és a teljes stack konfigurációt tartalmazó git repository-t a következő parancs segítségével:

A letöltött docker-hadoop-spark-workbench mappa két stack konfigurációt tartalmaz: docker-compose.yml és docker-compose-hive.yml. Mi az elsőt fogjuk használni, amely egy Hadoop klasztert hoz létre egy Spark master primary és egy worker node-dal, valamint néhány támogató eszközt tartalmazó container-rel. A stack egészen pontosan a következő container-eket indítja el:

A docker-compose-hive.yml a fenti stack-hez még Apache Hive támogatást is ad, hiszen a Spark Hive táblákból is tud dolgozni. Ha ilyet szeretnénk használni, ezt a konfigurációt kell elindítanunk.

A stack indításához lépjünk be a docker-hadoop-spark-workbench mappába, és adjuk ki a következő docker parancsot:

A stack sikeres indítása után a következő paranccsal ellenőrizhetjük, hogy minden container sikeresen elindult:

Windows felhasználók figyelem!
Amennyiben Windows host-on indítjuk a docker stack-et, és a következő hibaüzenetet kapjuk "ERROR: for namenode Cannot start service namenode: Ports are not available: listen tcp 0.0.0.0:50070: bind: An attempt was made to access a socket in a way forbidden by its access permissions.", az azért van, mert a dokcer daemon bizonyos portokat lefoglal magának, amivel ütközik a docker konfigurációnk. A legegyszerűbb megoldás, ha módosítjuk a docker-compose.yml fájlt, és minden 50000-en felüli portszám esetén a mapping-et átírjuk, pl. - "50070:50070" helyett legyen - "30070:50070".

Az Apache Spark Python parancssori kliense

Az Apache Spark beépített parancssori klienssel [5] rendelkezik, amivel könnyen és gyorsan tudunk hozzáférni a Spark motorhoz, és interaktív módon tudunk adatelemzéseket végezni. A Spark kliensnek két változata van, az egyik a spark-shell, ami Scala utasításokat tud végrehajtani. A Scala [6] JVM (vagy akár JavaScript motor) fölött futó erősen típusos OO és funkcionális programnyelv, fejlett többszálúság kezeléssel (így ideális választás Spark-hoz). A másik interaktív parancssori kliens a pyspark, amely ugyanazt a funkcionalitást nyújtja, mint a spark-shell, de Python [7] nyelvű utasításokat tud végrehajtani. Ezáltal egy dinamikus típussal rendelkező, szkript nyelv segítségével tudunk hozzáférni a Spark API-khoz. Jelen olvasóleckék során a pyspark-ot fogjuk használni, a spark-shell és Scala alapú Spark programokról a 9g_a_BigData-spark-scala-cli-java-SPOC olvasóleckéből tudhat meg többet az olvasó.

A pyspark indításához be kell lépnünk a spark-master docker container-be, ott érhető el a bekonfigurált kliens. Adjuk ki a következő utasítást a parancssorból:

Az interaktív parancssorunk üzemképes, ahogy látjuk két előre definiált változó is rendelkezésünkre áll a Spark kommunikációhoz: sc (SparkContext) és spark (SparkSession). Ezeket tetszőleges Python utasításban használhatjuk. A shell tulajdonképpen egy teljes értékű Python értelmező és végrehajtó, bármilyen Python programot írhatunk benne, de természetesen mi a Spark fölötti programok készítéséhez használjuk. Írjuk is meg az első Spark programunkat. Töltsük be a lokális fájlrendszer /spark/README.md állományát, azaz készítsünk ebből egy DataFrame-et:

Az első paranccsal létrehoztunk egy df változót, ami egy DataFrame objektum lett (dinamikus nyelv révén nem lehet a típusos Dataset API-t használni, a tárolt elemek string-ek, amik a lokális fájlrendszeren található szövegfájl egyes sorai. Fontos megjegyezni, hogy ha nem használjuk a file:// protokollt, akkor a Spark alapból a HDFS-ről próbálja betölteni a fájlt, mert a docker image-ek előre be vannak konfigurálva így. Látszik, hogy a shell alapból az újabb DataFrame API-t használja, de bármilyen típusú DataFrame-ből könnyedén RDD-t is készíthetünk. Hajtsunk végre transzformációkat, illetve akciókat a DataFrame-en, amihez be kell töltenünk a pyspark.sql.functions modult.

A df a szöveg fájl sorait tartalmazza egy oszlopban, ebből a split segítségével készítettünk egy spli_col nevű oszlopot az eredeti DataFrame value oszlopából, ami szóközök mentén szétdarabolja a fájl szövegét és szavak tömbjét tárolja egy-egy sorban. Ezután az explode művelet "szétrobbantja" ezeket a sorokat, és a tömb által tartalmazott minden egyes szó külön sorba kerül (lásd show() eredménye). Az oszlopnak az alias() metódussal nevet is adunk (exploded). A DataFrame így előállított exp_col oszlopán aztán meghívjuk a count műveletet, ami visszaadja a DataFrame megfelelő oszlopában lévő elemek (a szöveg szóinak) számát, ami 568. A második esetben előbb egy distinct() transzformációt is elvégzünk az oszlopon, ami csak a különböző szavakat tartja meg a transzformált DataFrame-ben, amire ismét meghívva a count()-ot már csak 289 lesz az eredmény (ennyi különböző szó van a szövegfájlban).

Book to read, knowledge, lecture, open book, reading book icon 2. fejezet

Word count és árfolyam átlag feladatok megoldása

Word count probléma lokális input fájllal

Most, hogy megismerkedtünk a pyspark alapvető használatával, oldjuk meg a klasszikus MapReduce példa problémát, a word count, azaz szó összeszámoló problémát. Számoljuk össze a README.md fájlban szereplő szavak előfordulásainka számát. Ez klasszikusan egy map/reduce programmal könnyen megoldható feladat, ami a Spark-ban is nagyon könnyedén megfogalmazható, hiszen mind a map mind a reduce műveleteket támogatja (sőt azoknál sokkal többet). Lássuk a feladat megoldását:.

A fájl betöltést és a split valamint explode szerepét már láttuk. A szavak oszlopán elvégzünk egy groupBy műveletet, ami egy GroupedData objektumot állít elő, ami az azonos szavakat csoportosítva tartalmazza. Ezen már csak egy count() műveletet kell meghívnunk, ami a csoportosított sorok számát összegzi, és egy új oszlopként (count) hozzáilleszti az eredeti DataFrame-hez. Ezen a DataFrame-en a collect()-et meghívva, megkapjuk az eredmény táblázat sorait egy tömbben (Row típusú elemek tömbjét két oszloppal). Ha például egy konkrét szó előfordulásának számára vagyunk kíváncsiak, használhatjuk a filter műveletet a fenti módon.

Természetesen nem kell minden lépés eredményét egy változóban eltárolni, csak a példa szemléletesebbé tétele érdekében csináltuk. A műveleteket össze lehet kötni egy hívási láncba így:

Word count probléma HDFS input fájllal

A fenti megoldást alkalmazzuk HDFS-en tárolt szövegállományokra is. Először másoljuk fel HDFS-re a code/5g_BigData-mapred-SPOC/input mappában található file01 és file02 állományokat, majd a fenti word count megoldást hajtsuk végre azokat használva bemenetként. Átmásolhatjuk a fájlokat először a namenode container-be, majd onnan a hadoop klienssel feltölthetjük a HDFS-re, ám a Hue használatával egyszerűbben is felmásolhatjuk a fájlokat. Nyissuk meg a böngészőben a http://localhost:8088/filebrowser/#/ címet, és hozzunk létre egy új könyvtárat a New gomb segítségével ("word_count"), majd az Upload-ot használva tallózzuk ki a két fájlt és töltsük fel őket HDFS-re közvetlenül a lokális gépünkről (lásd ábra).

Ha ez megvan, akkor ezekből a fájlokból töltsük be az input DataFrame-et, és erre hajtsuk végre a fenti megoldások egyikét:

A betöltésnél használhatunk wildcard-okat több fájl együttes betöltésére. Mivel nem adtunk meg protokollt, így alapértelmezetten a HDFS-en keresi a mappát (megegyezik a hdfs:// használatával).

Árfolyam átlagok számítása országonként

Oldjuk meg a 5g_BigData-mapred-SPOC gyakorlati olvasólecke 3. fejezetében kitűzött feladatot. Ehhez másoljuk fel a code/5g_BigData-mapred-SPOC/data/daily_csv.csv fájlt HDFS-re (a fenti módon), és írjunk olyan Spark programot, amely országonként kiszámítja az átlagos USD árfolyam értéket. Lássuk a megoldást:

Book to read, knowledge, lecture, open book, reading book icon 3. fejezet
Standalone Spark alkalmazás Python nyelven

Ebben a fejezetben bemutatjuk, hogyan lehet olyan önálló alkalmazást írni Python nyelven, amely végrehajtható feladatként átadható a Spark-nak. Azaz ebben az esetben nem az interaktív klienst használjuk a Spark API eléréséhez, hanem önálló Python programot készítünk, ami természetesen használja a Spark könyvtárakat. Oldjuk meg a fenti word count problémát egy önálló Python programmal. Ehhez szükség van a pyspark Python modul-ra, hogy elérhessük a Spark API funkcionalitását.

Készítsük el az önálló alkalmazás Python kódját (code/9g_b_BigData-spark-python-cli-python-SPOC/PySparkWordCount.py):

Az interaktív shell megoldása a Python API-ra átültetve. Először szükségünk van egy SparkSession objektumra, majd az oszlop transzformációk sorozatával előállítjuk az eredményt. Végül a collect() eredményén végigiterálva a végeredményt a konzolra írjuk.

A PySparkWordCount.py fájlt másoljuk fel a spark-master container-be:

Ezután lépjünk be a spark-master container-be és indítsunk egy új Spark job-ot a feltöltött Python fájl segítségével:

A spark-submit programmal adhatunk át egy önálló alkalmazást futtatásra, a futtatandó Python szkriptet kell csak paraméterben megadnunk. A program beégetett módon a HDFS /word_count mappában lévő összes fájlra számolja ki a szó előfordulási gyakoriságokat.

Check mark icon set. Green OK or V tick, red X, exclamation mark ...További feladatok

  1. Írjuk meg a valutaárfolyamok átlagát kiszámító Spark programot önálló Python alkalmazásként, és hajtsuk végre azt a Spark klaszteren!
  2. Írjuk meg a sorted word count programot Spark-ban (pyspark-ot használjuk(, ami az eredeti word count problémától abban tér el, hogy az eredményt az előfordulási gyakoriságok számában csökkenő sorrendbe rendezve adja meg!
  3. Töltsük be a korábbi leckék során bemutatott personal_entries.json és billing_entries.json, valamint sales_entries.csv állományokat Spark-ba, egyesítsük (union) a három adathalmazt és csoportosítsuk az adatokat PID alapján!

Referenciák

[1] https://spark.apache.org/

[2] https://spark.apache.org/downloads.html

[3] https://spark.apache.org/docs/latest/cluster-overview.html

[4] https://jupyter.org/

[5] https://spark.apache.org/docs/latest/quick-start.html

[6] https://www.scala-lang.org/

[7] https://www.python.org/

[8] https://maven.apache.org/

[9] https://www.py4j.org/