Big Data – Hadoop

In dieser Gruppe diskutieren wir über Einsatzszenarien, Technologien und Trends zu Big Data und hadoop.

Hot Wheels - Folge 3: Datenanalyse und Machine Learning mit MongoDB und Spark

Ulrich Plogas
Experte
In der vorangegangenen Folge wurde auf die Verwendung der pyspark-Shell eingegangen. In diesem Abschnitt wird nun ein komfortablerer Ansatz vorgestellt.

Sogenannte Notebook-Anwendungen haben in den letzten Jahren eine stetig wachsende Anwendergemeinde gefunden, da sie einerseits komfortabel genutzt werden können sowie moderne Programmierkonzepte unterstützen und andererseits recht leistungsfähig sind. Im Open-Source-Umfeld ist hier die Anwendung „Apache Zeppelin“ zu nennen, die über eine Vielzahl von Interpretern verfügt und beispielsweise die Spark-APIs für Java, Scala, Python und R unterstützt.

Die Installation von Zeppelin wurde bereits in Folge 1 beschrieben. Für die Nutzung von Zeppelin ist zunächst die Anwendung (Server) mit
zeppelin-daemon.sh start
zu starten. Anschließend kann im Browser die Notebook-Anwendung aktiviert werden. Dies erfolgt – wenn nicht anders festgelegt – über den Port 8080.
Um Zeppelin in Verbindung mit Spark und MongoDB nutzen zu können, muss der Connector bekannt gemacht werden. Dazu ist in das Interpreter-Menü zu wechseln, beispielsweise mit
http://40.91.212.116:8080/#/interpreter
und für den Spark-Interpreter unter „artifacts“ die Bibliothek „org.mongodb.spark:mongo-spark-connector_2.11:2.2.2“ hinzuzufügen.

Nun sollte dem Zugriff auf die MongoDB nichts mehr im Wege stehen, was sich mit der in Folge 2 generierten Collection überprüfen lässt:
Nachdem die prinzipielle Nutzung von Zeppelin mit Spark und der MongoDB getestet wurde, wird nun eine andere Aufgabenstellung betrachtet, die das Wikipedia-Projekt in den Mittelpunkt stellt. Damit wird dem Potenzial der dokumentenorientierten Datenbank MongoDB stärker Rechnung getragen. Allerdings wird im Folgenden bei der lokalen Anwendung von Spark auf den Download der gesamten Wikipedia verzichtet, weil alleine die englische Version vom August 2017 mehr als 25 GByte beansprucht und wohl die Grenzen der lokalen Installation sprengen würde. Statt dessen wird sich auf die offiziellen Wikipedia Page Views konzentriert, die stündlich aggregiert und aktuell unter https://dumps.wikimedia.org/other/pageviews/ abgelegt werden.
Das Prinzip der aggregierten Daten ist in https://wikitech.wikimedia.org/wiki/Analytics/Archive/Data/Pagecounts-all-sites ausführlich dokumentiert.

Zunächst sind ein paar Beispieldaten in die MongoDB zu laden. Dazu eignen sich vielleicht die Daten vom 9. November 2016, dem Tag nach der Wahl des neuen amerikanischen Präsidenten. Dafür gibt es – wie so oft – mindestens zwei Alternativen. Die erste Möglichkeit besteht darin, den vollständigen Datensatz in einem String zu speichern. Zusätzlich wird ein Zeitstempel hinzugefügt, der jedoch auf den Anfang der Periode gesetzt wird.

Der Ansatz hat den Nachteil, dass die Aufteilung der Daten beim Lesen vorgenommen werden müssten. Das ist zwar prinzipiell möglich, jedoch erfordert dann jeder aktive Lesevorgang eine Umformung der Daten. Bild 2 zeigt den prinzipiellen Ansatz für PySpark.
Die Alternative besteht darin, die Daten bereits gesplittet in der MongoDB zu persistieren. Auch hier stehen verschiedene Möglichkeiten für die Aufteilung zur Verfügung. Eine einfache Möglichkeit stellt der Split bei der Definition des Dataframes dar. Da im Ergebnis aus der Aufteilung nur Teilstrings entstehen, muss man in den gewünschten Typ umformen oder – einfacher – eine implizite Analyse der Daten durchführen. Auch hier werden zusätzliche Spalten hinzugefügt. Die Umsetzung für PySpark kann dem Bild 3 entnommen werden.
Allerdings sind diesem Ansatz Grenzen gesetzt, wenn komplexere Strukturen erzeugt oder abgebildet werden müssen. Hier muss auf eine andere Sprache – üblicherweise Scala – ausgewichen werden, die statische Typdefinitionen erlaubt und die Möglichkeit bietet, beliebige typfeste Strukturen zu definieren, die wiederum auf DateFrames (oder DataSets) abgebildet werden können.

Das gilt im umgekehrten Fall auch für das Lesen der Daten aus der MongoDB. Diese ist eine Dokumenten-orientierte Datenbank, deren Elemente nicht selten komplexe und tief geschachtelte Strukturen besitzen. Wie bereits in dem vorherigen Beitrag erwähnt, kann es Probleme bei der impliziten Bestimmung dieser Datenstrukturen geben oder man möchte auch nur einen Teilausschnitt der Daten laden.
Sofern Scala zum Einsatz gelangt, ist in unserer Notebook-Umgebung zunächst der entsprechende Interpreter auszuwählen und das initiale SparkSession-Objekt zu definieren, beispielsweise:
%spark
import com.mongodb.spark._
import org.apache.spark.sql.SparkSession
// Erstellen eines Spark Context sc
// Datenquelle ist Collection scaldb.scalCol
val sc = SparkSession.builder
.master("local")
.appName("test")
.config("spark.mongodb.input.uri", "mongodb://10.1.0.4:27035/scaldb.scalCol")
.config("spark.mongodb.output.uri", "mongodb://10.1.0.4:27035/scaldb.scalCol")
.getOrCreate
Um die Daten in der gewünschten Struktur zu laden, ist zunächst die benötigte Klassenstruktur zu definieren. Dies kann bei tieferer Verschachtelung iterativ erfolgen:
// Die gewünschte Datenstruktur in Fallklassen abbilden
abstract class DemoStruct
case class L4(sSessionId: String, iRequestTimestamp: Integer, iRequestEndTimestamp: Integer,
iProcessId: String, iProcessStepId: String, iRank: Integer) extends DemoStruct
case class L3(value: L4) extends DemoStruct
case class L2(ExportArray: Array[L3]) extends DemoStruct
case class L1(sCity: String, sContinent: String, sCountry: String, sRegion: String,
iTime: Integer, aDetailData: L2) extends DemoStruct
case class R(id: String, value: L1) extends DemoStruct
Die so beispielhaft generierte Struktur hat folgendes Aussehen:
root
|-- id: String
|-- value: Struct
| |-- sCity: String
| |-- sContinent: String
| |-- sCountry: String
| |-- sRegion: String
| |-- iTime: Integer
| |-- aDetailData: Struct
| | |-- ExportArray: Array
| | | |-- element: Struct
| | | | |-- value: Struct
| | | | | |-- sSessionId: String
| | | | | |-- iRequestTimestamp: Integer
| | | | | |-- iRequestEndTimestamp: Integer
| | | | | |-- iProcessId: Integer
| | | | | |-- iProcessStepId: Integer
| | | | | |-- iRank: Integer
Mit dieser Definition können nun Daten aus einer entsprechenden Collection in einen DataFrame geladen und verarbeitet werden. Dazu ist die load-Methode aus der Klassenbibliothek des Mongo-Spark-Connectors zu verwenden. Durch den expliziten Verweis auf die Strukturklasse kommen die implizierten Helper Methods nicht zur Anwendung:
// einen DataFrame definieren
val df = MongoSpark.load [R] ( sc )

In dieser Folge wurden verschiedene Ansätze beschrieben, um Daten aus externen Datenquellen zu laden sowie aus der MongoDB bereitzustellen. Dazu wurde die Notebook-Anwendung Zeppelin verwendet. Die nächste Folge wird sich mit den Möglichkeiten von SparkSQL beschäftigen.



Hot Wheels - Folge 1: Datenanalyse und Machine Learning mit MongoDB und Spark
https://www.qualiero.com/community/big-data-hadoop/big-data-einsatzszenarien/hot-wheels-folge-1-datenanalyse-und-machine-learning-mit-mongodb-und-spark.html

Hot Wheels - Folge 2: Datenanalyse und Machine Learning mit MongoDB und Spark
https://www.qualiero.com/community/big-data-hadoop/big-data-einsatzszenarien/hot-wheels-folge-2-datenanalyse-und-machine-learning-mit-mongodb-und-spark.html

Neueste Mitgliederaktivitäten

Diesen Community Beitrag weiterempfehlen