Big Data – Hadoop

In dieser Gruppe diskutieren wir über Einsatzszenarien, Technologien und Trends zu Big Data und hadoop.

Hot Wheels - Folge 2: Datenanalyse und Machine Learning mit MongoDB und Spark

Experte
Nachdem in der ersten Folge der allgemeine Ansatz beschrieben wurde, wird hier auf Spark und die Interaktion mit der MongoDB eingegangen.

Spark ist ein Open-Source-Projekt für die verteilte Verarbeitung von Daten. Wie andere Frameworks basiert es auf der Methode gerichteter azyklischer Graphen (Direct Acyclic Graphs – DAG), ist aber deutlich flexibler als beispielsweise das Map-Reduce-Framework. Spark unterscheidet Transformationen und Aktionen. Erstere werden nur mit Metadaten ausgeführt und benötigen kaum Ressourcen, letztere mit Nutzdaten, jedoch – wenn immer möglich – im Hauptspeicher.
Strukturen für die verteilte Verarbeitung von Daten bestehen typischerweise aus einem Controller (Master) sowie den ausführenden Exekutoren (Workers). Der Master in Spark ist ein Cluster Manager (Hadoop YARN oder Mesos), ein Worker ist ein Executor, der ein oder mehrere Spark-Tasks ausführen kann. Die Kommunikation mit der Spark-Umgebung sowie die Erstellung der für die Lösung der jeweiligen Aufgabenstellung benötigten Spark Session (früher Spark Context) erfolgt über den Spark Driver. Daten werden entweder aus externen Datenquellen (über existierende Treiber oder, wie in unserem Beispiel, über zusätzliche Komponenten - hier der MongoDB Spark Connector) oder dem unterliegenden Filesystem (z.B. HDFS) gelesen oder dorthin geschrieben. Die nachfolgende Abbildung Bild 1 verdeutlicht die verteilte Installation. Im Falle einer lokalen Installation wird auf den Cluster Manager verzichtet und alle Komponenten auf einem Knoten installiert (Bild 2).

Die ursprüngliche logische Abstraktion für die Verarbeitung von Daten in Spark ist das Resilient Distributed Dataset (RDD) – ein Container für beliebige, typisierte, partitionierte und statische Daten. Da Spark immer häufiger auch für die Verarbeitung strukturierter Daten eingesetzt wird, wurde dieser Ansatz um DataSets und DataFrames erweitert, die auf RDDs aufsetzen, jedoch die Möglichkeit der Definition von Datenstrukturen bieten.

Spark ist in Scala programmiert und nutzt die Java-Infrastruktur. Alternativ kann Python oder auch Java verwendet werden. Ohne die wohl niemals endende Diskussion befeuern zu wollen, welche der Sprachen vorzugsweise zu verwenden sei, ist grundsätzlich anzumerken, dass Python aufgrund seiner weiten Verbreitung und vielfältigen Bibliotheken ein idealer Startpunkt ist, während Scala wegen seiner programmatischen Strenge und seiner Performance-Vorteile in zeitkritischen Aufgabenstellungen eher für produktive Anwendungen zu nutzen ist.

Im einfachsten Fall nutzt man das Konsolprogramm pyspark, um auf Daten in der MongoDB zugreifen zu können. Um die standardmäßig mit pyspark verknüpfte Spark Session spark dafür zu nutzen, sind beim Aufruf von pyspark die Konfigurationsparameter für den Zugriff auf die MongoDB anzugeben sowie der Verweis auf die Klassenbibliothek für die MongoDB-Verbindung anzugeben.
In dem nachfolgenden Beispiel soll dies an einer Liste der US-amerikanischen Postleitzahlen demonstriert werden, die beispielsweise unter https://media.mongodb.org/zips.json zu finden ist. Nachdem die Daten in ein temporäres Verzeichnis geladen wurden, kann die Spark Shell aufgerufen werden, beispielsweise:

pyspark --conf “spark.mongodb.input.uri=mongodb://10.1.0.4:27035/zipdb.zipCol“
--conf “spark.mongodb.output.uri=mongodb://10.1.0.4:27035/zipdb.zipCol“
--package org.mongodb.spark:mongo-spark-connector_2.11:2.2.2

Die Port-Adresse (hier 27035) kann entfallen, wenn der Standard-Port 21017 für die MongoDB verwendet wird. Die Datenbank und die Collection muss nicht zwangsläufig existieren, wenn in die Datenbank zunächst geschrieben wird.

Mit dem Session-Objekt besteht Zugang zum DataFrame-API und es kann ein DataFrame definiert werden, welches zunächst die Daten aus der Datei „zips.json“ liest:
df = spark.read.json(“file:///tmp/zips.json“)
Nach einer Anpassung im Attributnahmen können die Daten beispielhaft verifiziert werden:
df1 = df.withColumnRenamed(„_id“, „zip“)
df1.show()
Die Daten können nun in die MongoDB überführt werden:
df1.write.format("com.mongodb.spark.sql.DefaultSource").mode("append").save()
Das Ergebnis kann überprüft werden, indem ein DataFrame auf die gerade gespeicherten Daten referenziert wird:
df2 = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()
Es wird nun aus einer Stichprobe der Daten das Schema bestimmt, welches mit
df2.printSchema()
angezeigt werden kann.

Die automatische Bestimmung des Schemas ist Fluch und Segen zugleich. Vorteilhaft ist, dass man sich nicht die Mühe machen muss, die zuweilen komplexen Datenstrukturen in MongoDB zu beschreiben. Der Nachteil ist, dass die automatisierte Strukturbestimmung zuweilen fehlerhaft ist – eine Häufung von NULL-Werten in einem Attribut führt zu einer Interpretation als NULL-Konstante oder Zeichenketten mit Zahlen werden als Typ Integer interpretiert.

Eine Alternative ist, die Größe der Stichprobe (Standard sind 1.000 Elemente) für die Bestimmung der Datenstruktur zu verändern. Dies ist mittels Konfigurationsparameter möglich, wird jedoch in den meisten Fällen nicht zum Erfolg führen. Eine weitere Möglichkeit besteht darin, die Struktur vorzugeben. Dies ist mit Python allerdings nicht möglich, so dass nach Alternativen gesucht werden muss. Damit wird sich die nächste Folge auseinandersetzen.


Hot Wheels - Folge 3: Datenanalyse und Machine Learning mit MongoDB und Spark
https://www.qualiero.com/community/big-data-hadoop/big-data-einsatzszenarien/hot-wheels-folge-3-datenanalyse-und-machine-learning-mit-mongodb-und-spark.html

Hot Wheels - Folge 1: Datenanalyse und Machine Learning mit MongoDB und Spark
https://www.qualiero.com/community/big-data-hadoop/big-data-einsatzszenarien/hot-wheels-folge-1-datenanalyse-und-machine-learning-mit-mongodb-und-spark.html

Neueste Mitgliederaktivitäten

Diesen Community Beitrag weiterempfehlen