Hadoop Streaming

Összefoglalás

Az olvasó ebből az olvasóleckéből megismerkedhet az Apache Hadoop Streaming eszközzel. Egyrészt bemutatjuk a streaming alapvető működési elvét, valamint azt, hogy hogyan illeszkedik ez bele a teljes Hadoop ökoszisztémába. A streaming működési elve mellett gyakorlati példákon keresztül ismerkedhet meg az olvasó azzal, hogyan kell használni a streaming eszközt ahhoz, hogy a klasszikus Java MapReduce programok helyett egyszerű futtatható állományokkal vagy szkriptekkel valósítsa meg a map és reduce függvényeket, valamint futtassa le ezt job-ként a Hadoop klaszteren.
A lecke fejezetei:
  • 1. fejezet: Az Apache Hadoop Streaming bemutatása, alapvető működési elve, bevezető példa (olvasó)
  • 2. fejezet: A klasszikus word count probléma megoldása Python szkript segítségével, streaming-en futtatva (olvasó)
  • 3. fejezet: Árfolyam átlag számítási példa megoldása Python szkript segítségével, streaming-en futtatva (olvasó)
Téma típusa: gyakorlati
Olvasási idő: 45 perc

Book to read, knowledge, lecture, open book, reading book icon 1. fejezet
A Hadoop Streaming eszköz

A Hadoop streaming egy eszköz, amelyet az alap Hadoop disztribúció tartalmaz. Segítségével tetszőleges futtatható állomány vagy szkript használható mint map és reduce megvalósítás egy MapReduce job-hoz. Ezt nagyon egyszerűen a Unix Stream-ek használatával éri el, amely interfészt képez a Hadoop klaszter és a map/reduce megvalósítások futtatható állományai között. Mind a mapper mind a reducer a standard bemenetről olvassa be az adatokat, és a standard kimenetre írja azokat. A beolvasás soronként történik. Ezáltal a map/reduce megvalósítás tetszőleges futtatható állomány lehet, ami a standard bemenetet olvassa és az eredményét a standard kimenetre írja. Továbbá a Hadoop Streaming több szkript nyelvet is támogat map/reduce megvalósításhoz: Python, Perl, R, vagy PHP.

https://d2h0cx97tjks2p.cloudfront.net/blogs/wp-content/uploads/sites/2/2020/03/hadoop-streaming.jpg

Egy nagyon egyszerű MapReduce program példa a streaming használatával a következő lehet:

Az egyes paraméterek jelentése a következő:

ParaméterLeírás
-input myInputDirsA mapper bemenetét tartalmazó könyvtár
-output myOutputDirA reducer kimenetének helye
-mapper /bin/catA mapper funkciót megvalósító futtatható program (fájl tartalmának kiírása)
-reducer /usr/bin/wcA reducer funkciót megvalósító futtatható program (unix word count parancs)

https://d2h0cx97tjks2p.cloudfront.net/blogs/wp-content/uploads/sites/2/2020/03/Hadoop-Streaming-1.jpg

A streaming működésének részletei a következők:

Book to read, knowledge, lecture, open book, reading book icon 2. fejezet
Word count megoldása Python szkript és streaming segítségével

Tekintsük a klasszikus szó összeszámlálós feladatot. Ehhez indítsuk el a Docker alapú Hadoop klasztert, amit a 3g_BigData-hadoop-SPOC olvasólecke 1. fejezetében ismertettünk. A map és reduce szkripteket Python nyelven valósítjuk meg, amiket a namenode container-re fel kell másolnunk, hogy a streaming segítségével futtatni tudjuk őket.

Először lássuk a mapper funkcionalitás Python kódját (mapper.py):

A standard inputról egyszerűen beolvassuk a sorokat, majd whitespace-k mentén széttördeljük (azaz szavakra bontjuk). Minden egyes szóhoz standard kimenetre írunk egy sort, ahol maga a szó a kulcs, majd egy tabulátor (kulcs és érték elválasztásához), utána pedig egy 1-es érték.

A reducer program a következőképp fest (reducer.py):

Miután a sorokat kulcs-érték párokká alakítottuk, kihasználjuk, hogy a kulcsok szerint rendezve érkeznek az adatok. Így tulajdonképpen az egyes szavak sorozatának hosszát kell összeszámolnunk, és amikor változik a szó (tudjuk, hogy a másikból már biztosan nem lesz több), a sorozat hosszát, azaz az előző szó összes előfordulásának a számát kiírjuk standard kimenetre a szóval, mint kulccsal együtt.

Másoljuk át a szkripteket a HDFS namenode-ra a következő docker parancssal:

Mielőtt kipróbálnánk őket a MapReduce job során, telepítsük fel a python programcsomagot (apt install python) és teszteljük le a szkripteket a következő módon a namenode container-en belül (vagy bárhol, ahol van Python):

Ha a telepítés sikeres, és a fenti kimenetet kapjuk, akkor telepítenünk kell a fentiek szerint a python programcsomagot a nodemanager gépre is. Ezután indítsunk egy streaming job-ot a namenode container-en belül az alábbi módon (az input könyvtár létrehozását és feltöltését fájlokkal a 3g_BigData-hadoop-SPOC 1. fejezetében mutattuk be):

Ahhoz, hogy a Python szkripteket meg tudjuk hívni a job végrehajtása során, fel is kell azokat tölteni a megfelelő node-ra, a -file paraméterek pontosan ezt csinálják. Amennyiben a job sikeresen lefutott, az output könyvtárba előálló fájl tartalma a következőképpen néz ki:

Book to read, knowledge, lecture, open book, reading book icon 3. fejezet
Árfolyam adatok átlagának kiszámítása Python map/reduce segítségével

Valósítsuk meg a xy fejezetben bemutatott Java MapReduce job-ot Python szkriptek és streaming használatával. A feladat a code\5g_BigData-mapred-SPOC\data\daily_csv.csv táblázatban szereplő valuta árfolyam értékek átlagát kiszámítani országonként.

A mapper funkció (currency_mapper.py) megvalósítása a következő:

A megvalósítás igen egyértelmű. A standard bemeneten fogadjuk a bemenő csv fájl sorait. A fejléc sort eldobjuk, minden más sort pedig a vesszők mentén feldarabolunk. Egy-egy sorhoz egy emit szükséges, azaz egy kiírást végzünk a standard kimenetre. A kulcs mindig az adott sorban megjelenő ország, az érték pedig az aktuális árfolyam adat.

A reducer megvalósítás (currency_reducer.py) a következő:

A word count példához nagyon hasonló módon implementálható. Országonként sorba rendezve kapjuk meg az értékeket, így pusztán annyi a feladatunk, hogy az árfolyam értékeket összeadjuk amíg a következő országhoz tartozó értéket nem kapunk. Ha ez megtörtént, akkor a kiíratásnál arra kell csak ügyelni, hogy ne az értékek összegét, hanem a darabszámmal leosztott változatát, azaz az átlagot írjuk ki.

Töltsük fel a két Python szkriptet, valamint a daily_csv.csv fájlt a namenode container-be. Mielőtt bármit csinálnánk, teszteljük le a parancssorból az elkészített Python fájlokat:

Másoljuk fel a szükséges fájlokat a namenode-ra:

Másoljuk fel a daily_csv.csv fájlt HDFS-re:

Most pedig futtassuk le a job-ot streaming segítségével:

Sikeres futtatás után a kimeneti könyvtárban előáll az eredmény:

Check mark icon set. Green OK or V tick, red X, exclamation mark ...További feladatok

  1. Módosítsuk a word count példát úgy, hogy akkor is működjön, ha a reducer nem sorba rendezve kapja meg az inputját, azaz gyűjtsük egy belső dictionary-be az előfordulások gyakoriságát, és a végén azt írassuk ki!
  2. Módosítsuk az árfolyam átlag számító példát úgy, hogy ne számítsuk ki az árfolyamok összegét, hanem az átlagot inkrementálisan [4], minden újabb input sor után frissítsük!
  3. Módosítsuk az árfolyam átlag számító példát úgy, hogy az átlag helyett a medián értéket számítsa ki!
  4. Írjunk egy Java MapReduce job-ot, amely egy lebutított streaming megoldást implementál, azaz a mapper és reducer funkció is egy külső végrehajtható állományt hívjon meg (Runtime.exec()), és standard bemenetükre továbbítsa az aktuális adatot, valamint a standard kimenetükről gyűjtse be azt és konvertálja megfelelően kulcs-érték párokra!

Referenciák

[1] http://hadoop.apache.org/docs/r1.2.1/streaming.html

[2] https://data-flair.training/blogs/hadoop-streaming/

[3] https://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/

[4] https://ubuntuincident.wordpress.com/2012/04/25/calculating-the-average-incrementally/