Big Data – Hadoop

In dieser Gruppe diskutieren wir über Einsatzszenarien, Technologien und Trends zu Big Data und hadoop.

Hot Wheels - Folge 4: Datenanalyse und Machine Learning mit MongoDB und Spark

Ulrich Plogas
Experte
Zurück aus der Sommerpause beschäftigt sich dieser Beitrag – wie zuletzt angekündigt – mit SparkSQL. Allerdings werden hier weniger die funktionalen Aspekte betrachtet – dies ist der Schwerpunkt eines Videoblogs zu diesem Thema. Hier wird eher auf das „Wie“ fokussiert, also praktische Aspekte der Nutzung in den Mittelpunkt gestellt. Wie überall, lohnt es sich auch hier, das Kleingedruckte zu beachten.
Das Thema wird in zwei Beiträgen beleuchtet. Dieser Beitrag gibt einen Überblick, der nachfolgende wird die Thematik mit praktischen Beispielen unterfüttern.
Oft ist zu lesen, dass das Leistungsvermögen einer Spark-Anwendung praktisch kaum Limitationen kennt. Das erinnert an Kommentare von Entwicklern relationaler Engines in den 90er Jahren, die meinten, für Performance-Optimierung gäbe es überhaupt keine Notwendigkeit mehr, man müsste einfach nur „genügend“ leistungsfähige Hardware einsetzen. So ist es prinzipiell auch mit Spark – mehr Maschinen und mehr Speicher bewirken Wunder. Aber: Wer hat schon ein unbegrenztes Hardware-Budget und will beständig die Konfiguration anpassen? Also sollte man sich auf altbekannte (und bewährte) Tugenden besinnen und das Thema „Performance-Optimierung“ gedanklich reaktivieren …
Die Optimierung von Spark hat zugegebenermaßen viele Facetten. Das liegt zum einen in der Komplexität einer verteilten Umgebung wie Spark und zum anderen in den unterschiedlichen Einsatzszenarien. Sicher gelten für die investigative Datenanalyse andere Maßstäbe als für eine produktive Anwendung. Doch überall gilt: Ressourcen sind immer zu knapp und folglich sollte man auch schonend mit ihnen umgehen.
Die folgenden Ausführungen lassen sich grob in vier Gruppen aufteilen:
- Optimierung der Spark-Konfiguration;
- Optimierung des Datenzugriffs;
- Optimierung des Ablaufes;
- Optimierung von Abfragen.
Der erste Aspekt, die Optimierung der Spark-Konfiguration, soll hier nur kurz behandelt werden. Zum einen wird nicht jeder Data Scientist die Möglichkeit und/oder den Wunsch haben, an der Konfiguration herumzuwerkeln und zum anderen gibt es genügend Informationen zu diesem Thema. Deshalb soll hier eine grobe Aufzählung ausreichen:
- Anzahl der Knoten: Natürlich gilt grundsätzlich die Regel, dass mit der Anzahl der Spark Workers der Datendurchsatz steigt und die Verarbeitungszeit sinkt. Allerdings setzt dies voraus, dass die Aufgabenstellung tatsächlich auch in entsprechendem Umfang verteilbar ist. Sollten einzelne Executors nicht oder nur in geringem Umfang zur Bearbeitung der Aufgabenstellung beitragen, ist zu prüfen, ob die Verteilung verbessert werden kann. Anregungen dazu weiter unten.
- Größe und Aufteilung des verfügbaren Hauptspeichers: Apache Spark ist ein Framework für InMemory-Computing, entsprechend kommt dem verfügbaren Hauptspeicher eine zentrale Bedeutung zu. Dabei konkurrieren verschiedene Bereiche im Java Heap um den zugeteilten Platz: Execution Memory (Shuffles, Joins, Aggregations, …), Storage Memory (Cache, interne Daten, …) sowie eine „Sicherheitsreserve“. Es gibt verschiedene Konditionierungen, die man mit Parametern anpassen kann. Eine detaillierte Betrachtung würde an dieser Stelle zu weit führen; es wird aber im nächsten Beitrag ein paar praktische Hinweise geben, wie man die Speicherbelegung ermitteln und interpretieren kann.
- Steuerung der Garbage Collection: Periodische Aufräumarbeiten im Java Heap, die sogenannte Garbage Collection (GC), sind ein trüber Punkt in der ansonsten hellen Java-Welt. Zwar haben die Java-Entwickler nachgebessert und beispielsweise den Heap granuliert, aber da Java-Objekte einen zum Teil beachtlichen Overhead erzeugen, kann das Thema nicht gänzlich vernachlässigt werden. Die gute Nachricht ist, dass man Möglichkeiten hat, die GC zu kontrollieren, die schlechte jedoch, dass das relativ aufwendig ist. Es wird an dieser Stelle auf die Dokumentation zu Spark verwiesen. Vorbeugen ist auch hier besser als heilen: Sparsamerer Umgang mit Größe und Anzahl der generierten Objekte kann helfen, eventuelle Probleme mit der GC zu vermeiden.
- Orchestrierung des Spark Clusters: Ein Spark Cluster wird entweder mit Mesos oder mit YARN orchestriert. Welcher der beiden Cluster Manager zu bevorzugen ist, hängt vom allgemeinen Workload ab. In einem Hadoop-Cluster, in dem auch das Spark Framework aktiviert wird, wird YARN verwendet. In einem Spark Cluster wird Mesos der Vorzug zu geben sein. Mesos ist flexibler und skaliert besser, da es einen nicht-monolithischen Scheduler besitzt.
Ein weiterer Punkt betrifft den Datenzugriff. Die Ausführung einer verteilten Anwendung macht nur wirklich Sinn, wenn die zu verarbeitenden Daten – oder zumindest ein Großteil davon – auch verteilt zur Verfügung stehen. Andernfalls wird nur ein Executor den Job ausführen. Man kann sich behelfen, indem man die Quelldaten beim Lesen partitioniert. Dadurch wird zwar der Flaschenhals beim Lesen der Daten von einer einzelnen Quellinstanz nicht beseitigt, jedoch wird die anschließende Verarbeitung auf mehrere Executors (entsprechend der Anzahl der Partitionen) verteilt. Diese Technik wird häufig beim Lesen von monolithischen Datenquellen (z.B. aus einer relationalen Datenquelle) angewendet.
Obwohl Spark mit den unterschiedlichsten Datenquellen umgehen kann, ist nicht jedes Format gleich gut geeignet in Hinblick auf Performance und Effizienz. Dies hat seine Ursachen in der Formatstruktur selbst, in der Qualität der implementierten Treiber und/oder in der Serialisierung der Daten. Unter Serialisierung versteht man die Umwandlung zwischen externen Datenformaten und dem internen Speicherformat. Dafür stehen in Spark zwei Bibliotheken zur Verfügung, zum einen die (standardmäßige) für die universelle Java Serialization und zum anderen die speziellere, aber effizientere Kryo Serialization.
Ein wichtiger Aspekt ist schließlich die optimale Gestaltung des Ablaufes. Wie an anderer Stelle – Folge 2 – schon ausgeführt, wird jeder Job in Spark als DAG abgearbeitet, der verschiedene Tasks (Transformationen oder Aktionen) beinhaltet. Hierauf kann man Einfluss nehmen, in dem man auf überflüssige Aktionen verzichtet. Aktionen sind im Gegensatz zu Transformationen ressourcenintensiv, da Spark-Objekte materialisiert werden. Ein Spark-Objekt ist per Definition unveränderbar, so dass für jede Persistierung ein neues Objekt generiert und die vorhergehende Version verworfen wird. Wird das Objekt nicht geändert und sind weitere Aktionen erforderlich, so kann es Sinn machen, das persistierte Objekt im Cache zwischen zu speichern. Dabei ist allerdings der Speicherverbrauch zu kontrollieren (siehe auch Ausführungen oben).
Und schließlich bietet noch die Gestaltung der Abfragen Gestaltungsmöglichkeiten für die Verbesserung der Performance. Hier gibt es verschiedene Ansatzmöglichkeiten, aber die wichtigste ist wohl die Art und Weise, wie JOINs aufgelöst werden.
SparkSQL besitzt einen Optimizer (Catalyst) und wer sich ein bisschen in der relationalen Welt auskennt, weiß, wie Optimizer grundsätzlich funktionieren: Eine Abfrage wird zunächst überprüft und zerlegt, es wird der günstigste Ausführungsplan erstellt und dieser wird anschließend ausgeführt. Der Catalyst-Optimizer funktioniert prinzipiell genauso, wenn einmal außer Acht gelassen wird, dass in einer Spark-Umgebung zusätzliche Anforderungen für die Parallelisierung und Serialisierung sowie die Synchronisation der Teilschritte berücksichtigt werden müssen.
Die gute Nachricht ist also, dass ein Optimizer existiert und viel Handarbeit erspart. Trotzdem ist es hilfreich, die Abläufe zu verstehen und gegebenenfalls eingreifen zu können. Und: Ein Optimizer kann nur in dem vorgegebenen Rahmen agieren, also – wenn man so will – ein lokales Optimum zu finden. Das Wissen um die Zusammenhänge ermöglicht es, dem Optimizer „die Arbeit zu erleichtern“ und gegebenenfalls einen besseren Ausführungsplan zu finden.
SparkSQL beherrscht verschiedene JOIN-Techniken:
- Shuffle Hash Join: Diese Methode ist der Urahn der Join-Techniken in Spark und entspricht in etwa dem bekannten Map-Reduce Ansatz. Die Technik war zwischenzeitlich aus Spark verbannt, aber seit Version 2.0.0 wieder aktiviert. In der aktuellen Version (ab 2.3.0) jedoch nur noch dritte Wahl;
- Broadcast Hash Join: Der Ansatz beruht darauf, die kleinste der Tabellen per Broadcast auf alle Knoten zu verteilen und im Speicher vorzuhalten. Dies ist der bevorzugte Ansatz, sofern die Voraussetzung (Größe der Tabelle überschreitet keinen Grenzwert) erfüllt wird;
- Sort Merge Join: Diese Technik ist zweistufig und sortiert zunächst beide Seiten des Joins und sucht anschließend die Übereinstimmung.
Generell lassen sich folgende Hinweise geben:
1) Shuffle-Operationen im JOIN sind nach Möglichkeit zu vermeiden oder zumindest zu reduzieren.
Diese Operation benötigt erhebliche Ressourcen (IO, CPU, Netzwerk), da Daten zwischen den einzelnen Knoten verschoben werden müssen. Idealerweise kann auf den Shuffle im Broadcast Hash Join verzichtet werden (allerdings auf Kosten des Broadcasts). Weitere Möglichkeiten zur Reduzierung werden nachfolgend ausgewiesen.
2) Sorgfältige Auswahl der Schlüsselspalten.
Schon in relationalen Zeiten waren kompakte Schlüsselspalten zu bevorzugen. Daran hat sich grundsätzlich nichts geändert, auch wenn andere Beweggründe dafür existieren. Darüber hinaus sind bei Spark die Inhalte entscheidend: Eine geringe Diversität oder eine Exzentrik in der Verteilung der Schlüsselwerte führt zu unterschiedlich ausgelasteten, im Extremfall leer laufenden Knoten
3) Nur Daten verarbeiten, die tatsächlich benötigt werden.
Sofern abzusehen ist, dass nur ein Teil der Daten dem JOIN-Kriterium entspricht, kann es helfen, die Daten vorab geeignet zu filtern. Die Selektion ist eine Transformation und nimmt zum Zeitpunkt der Definition keine weiteren Ressourcen in Anspruch. Auch eine Auswahl von Spalten (Projektion) kann helfen, Ressourcen schonender einzusetzen.
4) Eine gute Auswahl für die Partitionierung wählen.
Wenn immer möglich, sind die Daten der Tabellen geeignet zu partitionieren, da die Partitionierung – wie oben beschrieben – ein entscheidendes Kriterium für die Verteilung der Bearbeitung auf verschiedene Knoten darstellt. „Geeignet“ heißt in diesem Zusammenhang, dass die Zahl der Partitionen nicht zu groß und nicht zu klein sein sollte. Die Mindestgröße wird durch die Anzahl der im Cluster für Spark verfügbaren Kerne bestimmt.
5) Elementare Regeln aus der relationalen Welt berücksichtigen.
Es ist nicht von Nachteil, sich an einige Regeln aus der relationalen Welt zu erinnern. Beispielsweise ist es gefährlich, wenn linke und rechte Seite Schlüsselduplikate enthalten. Falls eine Seite keine Schlüsselwerte enthält, kann es sinnvoll sein, einen OUTER JOIN einzusetzen. Und so weiter, und so fort …

Neueste Mitgliederaktivitäten

Diesen Community Beitrag weiterempfehlen