Spark SQL és Spark ML

Összefoglalás

Az olvasó ebből az olvasóleckéből mélyebb elméleti ismereteket szerezhet az Apache Spark két fontos moduljáról, a Spark SQL-ről, illetve a Spark ML-ről. Előbbi esetében áttekintést adunk a Spark strukturált adatok lekérdezéséhez nyújtott támogatásáról, az alkalmazott adat absztrakciókról, illetve az egyes programozói interfészekről. Rövid áttekintést kap az olvasó a gépi tanulási feladatról általában, annak típusairól és legygakrabban használt algoritmusairól. Végezetül a Spark gépi tanuló moduljának koncepcionális működését ismertetjük az olvasóval, ami után képes lesz akár önállóan is gépi tanuló folyamatok összeállítására.
A lecke fejezetei:
  • 1. fejezet: A Spark SQL bemutatása, kapcsolódása más technológiákhoz (olvasó)
  • 2. fejezet: Rövid áttekintő a gépi tanulás által megoldható problémákba, algoritmusok kategorizálása (olvasó)
  • 3. fejezet: A Spark ML (MLlib) gépi tanuló modul koncepcionális bemutatása példa kóddal együtt (olvasó)
Téma típusa: elméleti
Olvasási idő: 55 perc

 

Book to read, knowledge, lecture, open book, reading book icon 1. fejezet
Strukturált adat lekérdezés Spark fölött a Spark SQL segítségével

A Spark részét képezi a Spark SQL modul [1], ami a strukturált adatfeldolgozást támogatja. Funkcióját tekintve megegyezik az Apache Hive-val [2], ám bizonyos Hive hiányosságokat (mint például a MapReduce job-ok gyengébb hatékonyságát kis és közepes adathalmazokon vagy hogy a Hive a sikertelen lekérdezéseket nem tudja onnan folytatni, ahol hiba folytán leállt, hanem mindig a teljes lekérdezést újrafuttatja) kiküszöböl, és mint ilyen a Hive utódjaként tekinthető a Spark stack-en belül. Ez nem azt jelenti, hogy a Spark SQL konkurrens technológiák és vagy az egyiket használjuk vagy a másikat. Teljes körű az integráció és együttműködés a két eszköz között. Például a Spark SQL támogatja az adatok betöltését Hive táblákból [3] és a HiveQL lekérdező nyelvet is (lásd ábra), illetve maga az Apache Hive is tudja használni a Spark motort a lekérdezések végrehajtásához, nem csak a klasszikus MapReduce-t (sőt, az újabb Hive verziók már elavultnak jelölték a MapReduce használatát, és a későbbi verziókban meg is fog szűnni a támogatása).

https://spark.apache.org/images/sql-hive-arch.png

A Spark SQL integrálja a relációs adat feldolgozást a Spark funkcionális programozási modelljével. Több féle adatforrást is támogat, és az SQL-ben megfogalmazott lekérdezéseket Spark transzformációkká és akciókká alakítja. A klasszikus RDD API-hoz képest a Spark SQL sokkal több információt tartalmaz mind az adatok, mind pedig a rajtuk végzendő műveletek struktúrájáról, amit a Spark végrehajtáskor hatékonyan ki tud használni a végrehajtás optimalizálásához. A Spark elég jól elmossa a határvonalat az RDD-k és a relációs táblák között, azáltal hogy az integrációhoz a már korább olvasóleckékben ismertetett DataFrame API-t használja (lásd lenti ábra). Ez nagyobb optimalizálási lehetőséget ad a Spark kezébe, ezért a DataFrame/Dataset API a preferált mód a Spark SQL eléréséhez (noha lehetőség van közvetlen SQL végrehajtásra is).

https://i.pinimg.com/originals/c4/11/2b/c4112bb4bb2c8b7e61ce1425a0d10051.jpg

A Spark SQL fő rétegei

A Spark SQL az alábbi négy osztály könyvtárat használja a relációs és procedurális adatfeldolgozáshoz:

  1. Data Source API: egy univerzális API strukturált adatok betöltéséhez és tárolásához:

    • Beépített támogatás az Avro, CSV, Elasticsearch, JSON, JDBC, Parquet, Cassandra, stb. adatformátumokhoz és a HDFS, Hive, MySQL, stb. tároló rendszerekhez.
    • A Spark-core modul segítségével könnyen integrálható tetszőleges más BigData platformmal.
    • Python, Java, Scala és R API támogatása.
  2. DataFrame API: a Spark RDD fölötti absztrakció, adatok elosztottan tárolt gyűjteményének, amelyek névvel ellátott oszlopokba vannak szervezve. Felépítését tekintve megegyezik egy relációs adattáblával SQL esetén (a DataFrame-eket részletesen bemutattuk a 9e_BigData-spark-rdd-df-SPOC olvasóleckében).

  3. SQL interpreter és optimalizáló: egy funkcionális programnyelven (Scala) írt modul, amely:

    • A Spark SQL legfejletteb és legújabb komponense.
    • Egy általános keretrendszert ad fák transzformációjához, amely lekérdezések elemzéséhez/kiértékeléséhez, optimalizálásához, futtatási terv készítéséhez, stb.
    • Lehetővé teszi a költség alapú és szabály alapú optimalizálást (pl. Catalyst [4]), amelyek hatására az SQL lekérdezések sokkal gyorsabban lefutnak, mint az RDD fölötti műveletek. A Spark SQL és Hadoop teljesítmény összehasonlítást az alábbi ábra szemlélteti (forrás: edureka!).

    https://d1jnx9ba8s6j9r.cloudfront.net/blog/wp-content/uploads/2016/12/Performance-Spark-SQL-Vs-Hadoop-Spark-SQL-Edureka.png

  4. SQL szolgáltatás: ez a komponens a kiindulópontja a Spark strukturált adatokkal történő munkának. Ez lehetővé teszi DataFrame-ek létrehozását, valamint SQL lekérdezések végrehajtását.

Az alábbi Java kódrészlet a Spark SQL használatát szemlélteti (további részletekért lásd a Spark programozási útmutatót [5]):

A 6. sorban létrehozunk egy SparkSession objektumot, ami a megfelelő Spark kontextust reprezentálja (Spark és egyéb komponensek paraméterei, stb.). Ennek segítségével a 12. sorban létre tudunk hozni egy DataFrame-t a people.json fájl beolvasásával. A DataFrame-n közvetlenül is végezhetünk műveleteket (lásd 15. sor), de ahogy a 25. sorban látszik, a DataFrame-et egy ideiglenes SQL táblává is tehetjük, hogy aztán a Spark SQL segítségével (spark.sql hívás) közvetlenül SQL-el kérdezhessük le az adatokat.

Book to read, knowledge, lecture, open book, reading book icon 2. fejezet

Gépi tanulás bevezető

A gépi tanulási módszerek igen jelentős szerepet kapnak napjaink problémáinak megoldásában. Természetesen BigData feldolgozásához is számos felhasználási területe van. A gépi tanulás a mesterséges intelligencia egyik alterülete, ahova olyan statisztikai módszereket sorolunk, amelyek képesek véges számú ún. tanuló mintából egy olyan függvényt előállítani, mellyel később még sosem látott mintához is előállít nekünk bizonyos információt (amit megtanult). Tipikus gépi tanulási problémák:

Számos algoritmus létezik, amelyek különböző módszerekkel valósítják meg a "tanulást" (függvény illesztést). Ezeket az algoritmusokat alapvetően két csoportba sorolhatjuk:

https://www.normshield.com/wp-content/uploads/2017/01/MachineLearningDiagram.png

Az összes gépi tanuló módszer valamilyen jellemzők ún. prediktorok (feature) alapján próbálja megtanulni a mintához tartozó címkét. Például egy karakter felismerési feladatnál egy prediktor lehet a képen szereplő fekete és fehér képpontok egymáshoz viszonyított arány, stb. Ha egy táblába rendezzük az egyes mintákat úgy, hogy minden minta egy sor, és az oszlopok pedig az adott mintához tartozó prediktorok értéke, illetve a megtanulni kívánt osztály címkéje, egy klasszikus tanuló táblát kapunk. A legtöbb tanuló algoritmus ilyen bemeneten dolgozik (amelyek tipikusan BigData környezetben is rendelkezésre állnak). A tanulás tipikus módja, hogy ezt a tanuló adathalmazt 3 részre osztjuk:

A leggyakrabban használt gépi tanuló algoritmusokat a következő URL jól összefoglalja: https://www.analyticsvidhya.com/blog/2017/09/common-machine-learning-algorithms/

Book to read, knowledge, lecture, open book, reading book icon 3. fejezet

Gépi tanulás Spark fölött, a Spark ML (MLlib) modul bemutatása

Az MLlib [7] (vagy újabban Spark ML) modul a Spark gépi tanulást támogató komponense. Segít abban, hogy a Hadoop klaszteren tárolt adatainkra gyorsan, könnyedén és jól skálázható módon alkalmazhassunk gépi tanuló algoritmusokat. Az MLlib az újabb verzióktól kezdve a DataFrame API-ra épül, noha az RDD alapú API is karbantartás alatt marad. Magas szinten az MLlib a következő funkcionalitást nyújtja:

MLlib csővezeték fogalmak

Az MLlib egységesíti és standardizálja a gépi tanuláshoz használható API-kat, ezzel megkönnyítve több algoritmus egyetlen ún. csővezetékbe (pipeline/workflow) szervezését. Gépi tanulás során nagyon gyakori, hogy egy modell tanítás több algoritmus egymás után történő végrehajtásával áll elő (adat előkészítés, feature kinyerés, modell tanítás, stb.), az MLlib ilyen pipeline-ok felállítását könnyíti meg az alábbi absztrakciók felhasználásával:

Transformer-ek

A Transformer-ek absztrakció magában foglalja a feature transzformációkat és a betanított modelleket. Technikailag egy Transformer egy transform() metódust implementál, amely egy DataFrame-ből egy másik DataFrame-et állít elő, tipikusan egy vagy több oszlop hozzáadásával. Például:

Estimator-ok

Az Estimator a klasszikus tanuló algoritmusok általánosítása, vagy bármely olyan algoritmusé, amely tanulást vagy illesztést végez adatokon. Technikailag egyEstimator egy fit() metódust implementál, ami egy DataFrame-et fogad és egy Model-t állít elő, ami egy Transformer. Például a LogisticRegression tanuló algoritmus egy Estimator, aminek a fit() metódusa betanít (előállít és visszaad) egy LogisticRegressionModel-t, ami egy Model, ezáltal egy Transformer is.

Parameter-ek

Mind a Transformer-ek mind az Estimator-ok egy közös API-t, a Parameter-t használják a paramétereik leírásához. AParam egy névvel ellátott paraméter. A ParamMap pedig egy (paraméter, érték) párokból álló halmaz. Az algoritmusokat két módon paraméterezhetjük:

  1. Beállíthatjuk közvetlenül egy példányon, pl. ha az lr egy példány a LogisticRegression algoritmusból, meghívhatjuk rajta a lr.setMaxIter(10) beállítást, hogy az lr.fit() maximum 10 iterációt használjon.
  2. Átadhatunk egy ParamMap-et a fit() vagy transform() metódusnak. A ParamMap-ben lévő paraméterek felülírnak bármely olyan paramétert, amit korábban a setter metódussal állítottunk be.

ML pipeline működése

Egy Pipeline fázisok sorozata, ahol minden egyes fázis vagy egy Transform vagy egy Estimator. Ezek a fázisok egymás után sorba rendezve kerülnek futtatásra, és a bemenő DataFrame minden fázis során módosul. Egy Transformer fázis során annak transform() metódusa hívódik meg a DataFrame-re. Az Estimator fázisok esetén a fit() metódus hívódik meg, ami előállít egy Transform-ot (ami ezek után a PipelineModel vagy más néven illesztett Pipeline részévé válik), és ennek a Transform-nak a transform() metódusa kerül meghívásra a DataFrame-en.

Vegyük az alábbi egyszerű szöveges dokumentum feldolgozó tanulási folyamatot, amin aztán a fent leírt pipeline koncepciót illusztráljuk:

A Pipeline tanítás idejű felhasználása.

https://spark.apache.org/docs/latest/img/ml-Pipeline.png

A fenti képen a felső sor egy három fázisú Pipeline-t ábrázol. Az első kettő (Tokenizer és HashingTF) Transformer (kékkel jelölve), míg a harmadik (LogisticRegression) egy Estimator (piros). Az alsó sor a csővezetéken átfolyó adat áramlását reprezentálja, ahol minden henger egy DataFrame-et jelöl. A Pipeline.fit() metódus az eredeti DataFrame-en (Raw text) kerül meghívásra, amely nyers szöveges dokumentumokat és címkéket tartalmaz. A Tokenizer.transform() metódus szétbontja a nyers szöveges dokumentumokat szavakra, és egy a szavakat tartalmazó új oszlopot hozzáad a DataFrame-hez (Words). A HashingTF.transform() metódus konvertálja a szavak oszlopot feature vektorokra, amely vektorokat egy új oszlopként hozzáad a DataFrame-hez (Feature cectors). Ezután, mivel a LogisticRegression egy Estimator, a Pipeline először meghívja a LogisticRegression.fit() metódust, hogy előállítson egy LogisticRegressionModel-t. Ha a Pipeline-nak lenne több fázisa, akkor meghívná az előállt LogisticRegressionModel.transform() metódust is a DataFrame-en mielőtt azt továbbadná a következő fázisnak.

Maga a Pipeline is egy Estimator, így miután a Pipeline.fit() meghívódik, előáll egy PipelineModel, ami egy Transformer. A PipelineModel-t aztén tesztelési időben tudjuk használni.

A PipelineModel tesztelés idejű felhasználása.

https://spark.apache.org/docs/latest/img/ml-PipelineModel.png

A fenti képen látható PipelineModel-nek ugyanannyi fázisa van, mint az eredeti Pipeline-nak, azzal a különbséggel, hogy a Pipeline összes Estimator fázisa Transformer lett. Amikor a PipelineModel.transform() meghívódik a test adathalmazon, a DataFrame végighalad az illesztett Pipelineminden egyes fázisán sorban. Minden fázis transform() metódusa módosítja a DataFrame-et, majd továbbadja a következő fázisnak. A Pipeline és PipelineModel biztosítja, hogy a tanuló és teszt adatok ugyanazokon a feature feldolgozó fázisokon haladnak végig. További részletek a hivatalos dokumentációban [8] olvashatók.

A fenti ML pipeline Java nyelvű megvalósítása az MLlib segítségével a következőképp néz ki (a segéd osztályok kódját lásd a Spark GitHub repository-ban [9]):

Check mark icon set. Green OK or V tick, red X, exclamation mark ...Ellenőrző kérdések

  1. Mire szolgál a Spark SQL?
  2. Milyen adatforrásokból tud dolgozni a Spark SQL?
  3. Milyen kapcsolatban áll egymással a Spark SQL és az Apache Hive?
  4. Mik a Spark SQL megvalósítás főbb rétegei/programozói interfészei? Röviden ismertesd milyen feladatot látnak el!
  5. Milyen optimalizációt végez a Catalyst? Ismersz-e más optimalizációkat, amiket a Spark alkalmaz?
  6. Milyen főbb csoportokba sorolhatjuk a gépi tanuló algoritmusokat?
  7. Mi a különbség a train, dev és test adathalmazok között?
  8. Röviden magyarázd el az MLlib által alkalmazott Pipeline absztrakció lényegét!
  9. Milyen típusú fázisokból állhat egy Pipeline? Hogyan működnek ezek a fázisok?
  10. Mi a különbség a Pipeline és a PipelineModel között?

Referenciák

[1] https://spark.apache.org/docs/latest/sql-programming-guide.html#datasets-and-dataframes

[2] http://hive.apache.org/

[3] https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html

[4] https://databricks.com/glossary/catalyst-optimizer

[5] https://spark.apache.org/docs/2.1.0/sql-programming-guide.html

[6] https://towardsdatascience.com/metrics-to-evaluate-your-machine-learning-algorithm-f10ba6e38234

[7] https://spark.apache.org/docs/latest/ml-guide.html

[8] https://spark.apache.org/docs/latest/ml-pipeline.html

[9] https://github.com/apache/spark/tree/master/examples/src/main/java/org/apache/spark/examples/ml