Wie kann ich eine Million Punkte auf Spark schnell geokodieren?

    In meinem vorherigen Projekt standen wir vor der Aufgabe, für viele Paare von geographischen Koordinaten eine umgekehrte Geokodierung durchzuführen. Bei der umgekehrten Geokodierung handelt es sich um eine Prozedur, die mit der Adresse oder dem Namen eines Objekts auf einer Karte übereinstimmt, dem ein durch die Koordinaten angegebener Punkt einem Paar von Breitengrad und Längengrad entspricht. Das heißt, wir nehmen die Koordinaten, sagen diese: @ 55.7602485.37.6170409, und wir erhalten das Ergebnis entweder von "Russland, Zentralrussland, Moskau, Theaterplatz, diesem und dem Haus" oder zum Beispiel "Bolschoi-Theater".

    Wenn die Eingabe eine Adresse oder ein Name ist und die Ausgabe eine Koordinate ist, wird diese Operation direkt geocodiert . Ich hoffe, wir werden später darüber sprechen.

    Als Eingabe hatten wir am Eingang etwa 100 oder 200.000 Punkte, die im Hadoop-Cluster in Form eines Hive-Tisches lagen. Dies ist so, dass der Umfang der Aufgabe klar ist.

    Am Ende wurde Spark als Verarbeitungswerkzeug ausgewählt, obwohl wir dabei sowohl MapReduce als auch Apache Crunch ausprobierten. Dies ist jedoch eine andere Geschichte, die vielleicht ihren Posten verdient.

    Gerade Lösung steht zur Verfügung


    Zunächst haben wir versucht, das Problem als "in der Stirn" zu behandeln. Als Tool gab es einen ArcGIS-Server, der einen Reverse-Geocoding-REST-Service bereitstellt. Es ist recht einfach zu benutzen, dazu führen wir eine http GET-Anfrage mit der folgenden URL aus:

    http://базовый-url/GeocodeServer/reverseGeocode?<параметры>
    

    Von den vielen Parametern reicht es aus, location = x, y zu setzen (die Hauptsache ist nicht zu verwechseln, welcher von ihnen Breitengrad und wer Längengrad ist;). Und hier haben wir bereits JSON mit den Ergebnissen: Land, Region, Stadt, Straße, Hausnummer. Beispiel aus der Dokumentation:

    {
     "address": {
      "Match_addr": "Beeman's Redlands Pharmacy",
      "LongLabel": "Beeman's Redlands Pharmacy, 255 Terracina Blvd, Redlands, CA, 92373, USA",
      "ShortLabel": "Beeman's Redlands Pharmacy",
      "Addr_type": "POI",
      "Type": "Pharmacy",
      "PlaceName": "Beeman's Redlands Pharmacy",
      "AddNum": "255",
      "Address": "255 Terracina Blvd",
      "Block": "",
      "Sector": "",
      "Neighborhood": "South Redlands",
      "District": "",
      "City": "Redlands",
      "MetroArea": "Inland Empire",
      "Subregion": "San Bernardino County",
      "Region": "California",
      "Territory": "",
      "Postal": "92373",
      "PostalExt": "",
      "CountryCode": "USA"
     },
     "location": {
      "x": -117.20558993392585,
      "y": 34.037880040538894,
      "spatialReference": {
       "wkid": 4326,
       "latestWkid": 4326
      }
     }
    }
    

    Sie können optional angeben, welche Arten von Antworten wir wünschen - E-Mail-Adressen oder POI (Point of Interest, Antworten wie "Bolshoi Theatre"), oder brauchen wir zum Beispiel Straßenkreuzungen. Sie können auch den Radius angeben, innerhalb dessen ein benanntes Objekt vom angegebenen Punkt aus gesucht wird.

    Für eine schnelle Überprüfung der Qualität der Antwort können Sie die Entfernung zwischen dem Quellpunkt in den Anforderungsparametern und dem resultierenden Punkt - dem Ort in der Serviceantwort - schätzen.

    Es scheint, dass jetzt alles gut wird. Aber es war nicht da. Unsere ArcGIS-Instanz war ziemlich langsam, der Server schien 4 Kerne und ungefähr 8 Gigabyte RAM zu haben. Infolgedessen konnte die Aufgabe im Cluster unsere 200.000 Punkte sehr schnell ablesen, lehnte sich jedoch gegen REST und die Leistung von ArcGIS ab. Und das Geokodieren aller Punkte dauerte Stunden. Zur gleichen Zeit haben wir nur 8 Kerne auf Hadoop und etwas Speicher zugewiesen. Da die Auslastung des ArcGIS-Servers jedoch viele Stunden lang nahezu 100% betrug, haben uns die zusätzlichen Ressourcen im Cluster nichts gebracht.

    ArcGIS kann keine umgekehrten Geokodierungsvorgänge ausführen. Daher wird die Abfrage für jeden Punkt einmal ausgeführt. Übrigens, wenn der Dienst nicht antwortet, fallen wir mit einer Zeitüberschreitung oder mit einem Fehler aus, und was zu tun ist, ist ein Problem mit einer nicht offensichtlichen Antwort. Versuchen Sie es vielleicht noch einmal und beenden Sie den gesamten Vorgang und wiederholen Sie ihn dann für die Rohpunkte.

    In der zweiten Näherung führen wir den Cache ein


    Zum einen haben wir herausgefunden, dass viele Punkte in unserem Set sich wiederholende Koordinaten haben. Der Grund ist einfach: Natürlich ist die Genauigkeit des GPS nicht gut genug, um die Koordinaten zweier Punkte zu unterscheiden, die zwei Meter voneinander entfernt sind, oder die Koordinaten, die nicht vom GPS, sondern von einer anderen Basis empfangen wurden, wurden einfach in die ursprüngliche Datenbank eingegeben. Im Allgemeinen spielt es keine Rolle, warum dies so ist. Die Hauptsache ist, dass dies eine typische Situation ist, so dass ein Ergebniscache des Dienstes es Ihnen ermöglicht, jedes Koordinatenpaar nur einmal zu geokodieren. Und wir können uns den Speicher für den Cache durchaus leisten.

    Eigentlich wurde die erste Modifikation des Algorithmus trivial gemacht - alle von REST erhaltenen Ergebnisse wurden dem Cache hinzugefügt, und für alle Punkte wurde zuerst nach Koordinaten gesucht. Wir haben nicht einmal einen gemeinsamen Cache für alle Spark-Prozesse gestartet - er hatte auf jedem Cluster-Knoten einen eigenen Cache.

    Auf diese einfache Weise konnten wir die Beschleunigung auf etwa das Zehnfache erhöhen, was ungefähr der Anzahl der Wiederholungen von Koordinaten im ursprünglichen Satz entspricht. Das war schon akzeptabel, aber immer noch sehr langsam.

    Nun, unser Kunde hat uns zu diesem Zeitpunkt gesagt, wenn wir Adressen nicht schneller berechnen können, können wir schnell eine Stadt ermitteln? Und wir machen uns auf den Weg ...

    Vereinfachte Lösung, wir implementieren Geomerty API


    Was müssen wir eine Stadt definieren? Wir hatten die Geometrie der Regionen Russlands, die administrativ-territoriale Einteilung, ungefähr in die Bereiche der Stadt.

    Nehmen Sie zum Beispiel, können die Daten werden hier . Was ist da Dies ist eine Datenbank der Verwaltungsgrenzen der Russischen Föderation für die Stufen 2 (Land) bis 9 (Stadtbezirke). Das Format ist entweder Geojson oder CSV (während die Geometrie selbst im Wkt-Format vorliegt). Gesamtdatenbank von etwa 20 Tausend Datensätzen.

    Die neue vereinfachte Lösung des Problems sah folgendermaßen aus:

    1. Wir laden Daten ATD in Hive.
    2. Für jeden Punkt mit Koordinaten suchen wir nach Polygonen in der Tabelle der Gebietsaufteilung, wo dieser Punkt enthalten ist.
    3. Die gefundenen Polygone sind nach Ebenen sortiert.

    Als Ergebnis erhalten wir am Ausgang so etwas wie: Russland, Zentralrussland, Moskau, solche und solche Verwaltungsbezirke, Distriktsummen und ähnliches, dh eine Liste von Gebieten, zu denen unser Punkt gehört.

    ATD wird geladen


    Um CSV einfacher herunterladen zu können, werden wir Kite anwenden . Dieses Tool kann auf der Grundlage der Spaltenüberschriften in CSV ein sehr gutes Schema für Hive erstellen. Tatsächlich wird der Import auf drei Befehle reduziert (von denen einer für jede Ebenendatei wiederholt wird):

    kite-tools csv-schema admin_level_2.csv --class al --delimiter \; >adminLevel.avrs
    kite-tools create dataset:hive:/default/levelswkt -s adminLevel.avrs
    kite-tools csv-import admin_level_2.csv dataset:hive:/default/levelswkt --delimiter \;
    ...
    kite-tools csv-import admin_level_10.csv dataset:hive:/default/levelswkt --delimiter \;
    

    Was haben wir hier gemacht? Das erste Team entwickelte uns ein Avro-Schema für csv, für das wir einige Parameter des Schemas (in diesem Fall Klasse) und Feldtrennzeichen für CSV angegeben haben. Darüber hinaus lohnt es sich, das resultierende Schema mit Augen zu betrachten, und es ist möglich, einige Korrekturen vorzunehmen, da Kite nicht alle Zeilen unserer Datei betrachtet, sondern nur eine bestimmte Stichprobe, sodass es manchmal falsche Annahmen über die Art der Daten geben kann (ich habe drei Zahlen gesehen) Die Spalte ist numerisch und dann gehen die Zeilen).

    Nun, auf der Grundlage des Schemas erstellen wir ein Dataset (dies ist der allgemeine Begriff Kite, der die Tabellen in Hive, die Tabellen in HBase und etwas anderes verallgemeinert). In diesem Fall ist default die Datenbank (auch für Hive als Schema) und levelswkt ist der Name unserer Tabelle.

    Nun, die letzten Befehle laden CSV-Dateien in unseren Datensatz hoch. Nachdem Sie den Download erfolgreich abgeschlossen haben, können Sie die Abfrage bereits ausführen:

    select * from levelswkt;

    irgendwo in Hue.

    Mit Geometrie arbeiten


    Um mit Geometrie in Java zu arbeiten, haben wir die Java Geometry API (Entwickler von ArcGIS) von Esri ausgewählt. Im Prinzip war es möglich, andere Frameworks zu verwenden, eine Auswahl von Open Source ist verfügbar, zum Beispiel das bekannte Paket JTS Topology Suite oder Geotools .

    Die erste Aufgabe ermöglicht es uns, mit einem anderen Framework derselben Esri-Firma, dem Spatial Framework für Hadoop, umzugehenund basierend auf dem ersten. Tatsächlich benötigen wir davon das sogenannte SerDe, das Serialisierungs-De-Serialisierungs-Modul für Hive, mit dem wir ein Paket von Geojson-Dateien in Hive als Tabelle präsentieren können, deren Spalten den Geojson-Attributen entnommen sind. Und die Geometrie selbst wird zu einer anderen Spalte (mit binären Daten). Als Ergebnis haben wir eine Tabelle, von der eine die Geometrie einer bestimmten Region ist, und der Rest sind seine Attribute (Name, Ebene in ADT usw.). Diese Tabelle steht der Spark-Anwendung zur Verfügung.

    Wenn wir die Datenbank im CSV-Format laden, haben wir eine Spalte, in der die Geometrien in Textform vorliegen, im WKT- Format . In diesem Fall kann Spark zur Laufzeit mithilfe der Geometrie-API ein Geometrieobjekt erstellen.

    Wir haben das CSV-Format (und das WKT) aus einem einfachen Grund gewählt - wie jeder weiß, belegt Russland eine Karte auf der Karte mit den Koordinaten von Chukotka für den 180-Meridian. Das Geojson-Format hat eine Einschränkung - alle Polygone in diesem Bereich sollten auf 180 Grad begrenzt sein, und diejenigen, die den 180-Meridian kreuzen, müssen in zwei Teile geschnitten werden. Beim Importieren von Geometrie in die Geometry-API erhalten wir daher ein Objekt, für das die Geometrie-API den Begrenzungsrahmen (umschließendes Rechteck) für den russischen Rand falsch bestimmt. Es stellt sich heraus, dass die Antwort -180.180 Länge beträgt. Was natürlich nicht stimmt - in der Realität dauert Russland etwa 20 bis -170 Grad. Dies ist das Problem der Geometry-API. Heute wurde es vielleicht schon behoben, aber dann mussten wir es umgehen.

    WKT hat kein solches Problem. Sie fragen, warum brauchen wir die Bounding Box? Dann werde ich erzählen;)

    Es bleibt das so genannte PIP-Problem zu lösen, Punkt in Poligon. Die Java-Geometrie-API kann dies wiederum tun: Dies ist für uns einfach: Eine Geometrie vom Typ Point, das zweite Polygon (Multipoligon) für die Region und eine Methode enthält eine Methode.

    Infolgedessen sah die zweite Lösung und auch auf der Stirn folgendermaßen aus: Die Spark-Anwendung lädt ADT einschließlich der Geometrie. Etwas vom Typ Map Name-> Geometrie wird aus ihnen erstellt (in der Tat etwas komplizierter, da ADT ineinander verschachtelt sind und wir nur in den unteren Ebenen suchen müssen, die bereits in der oberen Ebene enthalten sind) nach den Quelldaten müssen noch gebaut werden). Und dann bauen wir Spark-Datensätze mit unseren Punkten auf, und für jeden Punkt wenden wir unsere UDF an, die das Vorkommen des Punktes in allen Geometrien (Baumstruktur) überprüft.

    Das Erstellen einer neuen Version erfordert einen Arbeitstag. Der Nutzen des Spatial Framework für Hadoop-Bundles war ein recht gutes Beispiel für die Lösung des PIP-Problems (wenn auch auf andere Weise).

    Wir fangen an und ... oh, Horror, etwas ist schnell verschwunden. Schau nochmal. Es ist Zeit, über die Optimierung nachzudenken.

    Optimierte Lösung QuadTree


    Der Grund für die Bremsen ist ziemlich offensichtlich - etwa die Geometrie Russlands, d. H. Außengrenzen sind dies Geojson-Megabytes, ein kräftiges Polygon und kein einziges. Wenn wir uns daran erinnern, wie das PIP-Problem gelöst ist, dann besteht eine der bekannten Methoden darin, einen Strahl von einem Punkt aus, etwa irgendwo aufwärts, ins Unendliche zu bauen und zu bestimmen, an wie vielen Punkten ein Polygon kreuzt. Wenn die Anzahl der Punkte gerade ist, befindet sich der Punkt außerhalb des Polygons, wenn ungerade im Inneren liegt.

    Sagen wir die Beschreibung des Wikis .



    Es ist klar, dass für ein riesiges Polygon die Lösung des Kreuzungsproblems um so viele Male kompliziert ist, wie wir gerade Liniensegmente im Polygon haben. Daher ist es wünschenswert, diese Polygone irgendwie zurückzuweisen, in die der Punkt offensichtlich nicht eintreten kann. Und als zusätzliches Lifhak ist es möglich, den Scheck für die Einreise in die Grenzen Russlands fallen zu lassen (wenn wir wissen, dass alle Koordinaten offensichtlich darin enthalten sind).

    Dafür brauchen wir einen Baum von Quadranten . Glücklicherweise befindet sich die Implementierung in derselben Geometrie-API (und an vielen anderen Stellen).



    Die baumbasierte Lösung sieht folgendermaßen aus:

    1. Laden Sie die Geometrie des ADT
    2. Für jede Geometrie definieren wir das umschließende Rechteck
    3. Wir platzieren es in QuadTree, wir erhalten in der Antwort einen Index
    4. Index merken

    Weiter, wenn Punkte verarbeitet werden:

    1. Fragen Sie QuadTree, welche der bekannten Geometrien einen Punkt enthalten kann.
    2. Holen Sie sich Geometrieindizes
    3. Nur für sie prüfen wir das Vorkommen, indem wir das PIP-Problem lösen.

    Das alles dauert noch vier Stunden. Laufen Sie noch einmal, und wir sehen, dass die Aufgabe sehr schnell erledigt wurde. Wir prüfen - alles ist gut, die Lösung ist erhalten. Und das alles für ein paar Minuten. QuadTree ermöglicht eine Beschleunigung um mehrere Größenordnungen.

    Ergebnisse


    Was haben wir am Ende? Wir haben einen umgekehrten Geokodierungsmechanismus erhalten, der auf dem Hadoop-Cluster effektiv parallelisiert wird und der unser ursprüngliches Problem von 200.000 Punkten in etwa Minuten löst. Ie Wir können diese Lösung sicher auf Millionen von Punkten anwenden.

    Was sind die Einschränkungen dieser Lösung? Erstens das Offensichtliche - es basiert auf den uns zur Verfügung stehenden Daten ATD, die a) möglicherweise nicht relevant sind, b) nur von Russland begrenzt sind.

    Zweitens können wir das umgekehrte Geokodierungsproblem nicht für geschlossene Polygone lösen. Und das heißt - auch für die Straße.

    Entwicklung


    Was kann man damit machen?

    Um die aktuelle Geometrie ADT zu erhalten, ist es am einfachsten, sie von OpenStreetMap zu nehmen. Natürlich müssen sie etwas arbeiten, aber dies ist eine völlig lösbare Aufgabe. Wenn es Interesse gibt, werde ich über die Aufgabe sprechen, OpenStreetMap-Daten zu einem anderen Zeitpunkt in den Hadoop-Cluster zu laden.

    Was kann man für Straßen und Häuser tun? Im Prinzip befinden sich die Straßen im selben OSM. Dies sind jedoch keine geschlossenen Strukturen, sondern Linien. Um festzustellen, dass sich der Punkt in der Nähe einer bestimmten Straße befindet, müssen Sie eine Deponie für die Straße aus gleich beabstandeten Punkten errichten und prüfen, ob die Straße getroffen wird. Als Ergebnis stellt sich so eine Wurst heraus ... es sieht so aus:



    Wie nah ist der Punkt? Dies ist ein Parameter, der ungefähr dem Radius entspricht, in dem nach ArcGIS-Objekten gesucht wird und den ich ganz am Anfang erwähnt habe.

    So finden wir die Straßen, die Entfernung von dem Punkt, bis zu dem eine bestimmte Grenze (z. B. 100 Meter) liegt. Je kleiner diese Grenze ist, desto schneller arbeitet der Algorithmus, aber umso wahrscheinlicher ist es, dass Sie keine einzige Übereinstimmung finden.

    Das offensichtliche Problem besteht darin, dass es unmöglich ist, diese sogenannten Puffer im Voraus zu berechnen - ihre Größe ist ein Dienstparameter. Sie müssen im laufenden Betrieb gebaut werden, nachdem wir das erforderliche Gebiet der Stadt und ausgewählte Straßen, die dieses Gebiet durchqueren, von der OSM-Basis aus festgelegt haben. Straßen können jedoch vorab ausgewählt werden.

    Häuser, die sich in der gefundenen Gegend befinden, bewegen sich auch nirgendwohin, sodass ihre Liste im Voraus erstellt werden kann. Sie müssen jedoch erwägen, sie im laufenden Betrieb zu treffen.

    Das heißt, Sie müssen zunächst ein Verzeichnis der Form „Stadtteil“ erstellen -> eine Liste von Häusern mit Verweisen auf die Geometrie und ein ähnliches für die Liste der Straßen.

    Und sobald wir das Gebiet bestimmt haben, erhalten wir eine Liste von Häusern und Straßen, bauen Grenzen entlang der Straßen auf, und bereits für sie lösen wir das PIP-Problem (wahrscheinlich mit den gleichen Optimierungen wie für die Grenzen der Regionen). In diesem Fall kann natürlich auch ein Quad-Baum für Häuser im Voraus gebaut und irgendwo gespeichert werden.

    Unser Hauptziel ist es, die Anzahl der Berechnungen zu minimieren und im Voraus alles zu optimieren und zu speichern, was berechnet und gespeichert werden kann. In diesem Fall besteht der Prozess aus der langsamen Phase des Aufbaus der Indizes und der zweiten Phase der Berechnung, die sich in der Nähe der Online-Version schnell abspielt.

    Jetzt auch beliebt: