Geocoding

  • 1.  Tuning Geocoding in Spark

    Employee
    Posted 11-15-2019 09:19
    Edited by Dustin Clark 11-18-2019 11:18

    Introduction

    Using Geocoding in Spark is a great way to geocode millions of addresses quickly, but proper tuning is required to avoid failures and extended job times. This post gives general tuning advice and points out what to focus on when running your own data on your own cluster. It is part 1 of 2 and focuses on geocoding a 19-million-record dataset on an EMR cluster with 4 very large worker nodes. The second post (the reply below) covers advanced topics on the techniques and calculations used here.

    Tools used:

    • Amazon Web Services (AWS):
      • Elastic Map Reduce (EMR) - Used to create clusters of machines
      • Simple Storage Service (S3) - Used to store scripts and data to be downloaded to the EMR cluster.
    • Spectrum Geocoding for Big Data:
      • Spark 2 Geocoding Driver - Jar file used as-is to geocode in Spark.

    Data

    The data we are going to geocode is a 1.25GB CSV file (pipe-delimited) containing 19,385,996 addresses from over 200 countries. Here is an example of the data:

    [Image: sample of the input address records]

    Cluster

    We are going to use the AWS EMR service to create a cluster made up of 1 master node and 4 worker nodes. To determine the hardware specifications for our worker nodes, we first need the optimal size of the Spark executors that will run our geocoding tasks. After testing (detailed in Part 2 below), we arrived at Spark executors with 4 cores and 15GB of memory. Based on this core-to-memory ratio we will use m5 EC2 instance types, since they have a similar hardware ratio of cores to memory. To determine the best m5 size we have to consider how many Spark executors that size can hold. It's important to remember that YARN, the service responsible for managing resources in an EMR cluster, will not allow Spark to consume all of a node's memory. See the AWS EMR documentation for the default YARN memory setting of each instance type.


    If we want to use our resources efficiently then we have to choose either m5.12xlarge or m5.24xlarge. We decided on the smaller instance type, m5.12xlarge. Since we will have 4 worker nodes, our cluster will be able to hold 47 Spark executors. We cannot allocate 48 executors because Spark needs room to run the Spark driver, which manages the Spark job; only one node will host the driver. Here is a diagram of our EMR cluster:

    [Diagram: EMR cluster breakdown]
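
    To make the executor math concrete, here is a rough Python sketch of the per-node calculation. The ~184GB value is an assumption, roughly the YARN-allocatable memory on an m5.12xlarge under the EMR defaults; check yarn.nodemanager.resource.memory-mb on your own cluster.

    # Rough per-node executor math for an m5.12xlarge worker (sketch).
    # The YARN figure is an assumption based on EMR defaults; check
    # yarn.nodemanager.resource.memory-mb on your own cluster.
    vcpus_per_node = 48               # m5.12xlarge vCPUs
    yarn_memory_gb = 184              # approx. memory YARN will hand to Spark per node
    executor_cores = 4
    executor_memory_gb = 14 + 1       # 14G heap + 1G spark.executor.memoryOverhead

    by_cpu = vcpus_per_node // executor_cores          # 12
    by_memory = yarn_memory_gb // executor_memory_gb   # 12
    executors_per_node = min(by_cpu, by_memory)         # 12

    worker_nodes = 4
    executors = executors_per_node * worker_nodes - 1   # reserve one slot for the driver
    print(executors, "executors /", executors * executor_cores, "parallel tasks")   # 47 / 188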

    The cluster has custom bootstrap scripts that install geocoding when the nodes start up. This means the cluster is ready to geocode once it comes online.

    Spark Application

    To run our Spark application we will use the Spark Geocoding Jar that is included in the product. We will not need to edit this jar because all the settings we need to change can be set as arguments. 
    Here is the command used to execute the Spark job:

    spark-submit --master yarn --deploy-mode cluster \
        --num-executors 47 --executor-cores 4 --executor-memory 14G \
        --conf spark.executor.memoryOverhead=1G \
        --driver-cores 1 --driver-memory 1GB --conf spark.driver.memoryOverhead=1GB \
        --class com.pb.bigdata.geocoding.spark.app.GeocodeDriver \
        /pb/geocoding/sdk/spark2/driver/spectrum-bigdata-geocoding-spark2-3.2.0-all.jar \
        --geocoding-config-location /pb/geocoding/sdk/resources/config/ \
        --geocoding-binaries-location /pb/geocoding/sdk/resources/nativeLibraries/bin/linux64/ \
        --geocoding-preferences-filepath /pb/geocoding/sdk/resources/config/geocodePreferences.xml \
        --input /user/hadoop/geocoding/test_data.txt \
        --csv delimiter="|" \
        --geocoding-input-fields mainAddressLine=1,2,3 areaName3=4 areaName1=5 postCode1=6 country=7 \
        --output /user/hadoop/output/ \
        --geocoding-output-fields x y PB_KEY \
        --error-field error \
        --overwrite \
        --num-partitions 188

    Let's break that command down and explain it:

    Spark Script
    • spark-submit
      The script supplied by Spark that allows us to submit Spark applications.



    Spark Options
    • --master yarn
      --deploy-mode cluster
      Run our Spark application on the worker nodes in our cluster.
    • --num-executors 47
      --executor-cores 4
      --executor-memory 14G
      --conf spark.executor.memoryOverhead=1G
      Create 47 executors, each having 4 cores and 15GB of total memory (14GB heap + 1GB overhead).
    • --driver-cores 1
      --driver-memory 1GB
      --conf spark.driver.memoryOverhead=1GB
      Explicitly declare driver resources so cluster resources can be managed precisely. We have enough spare capacity to give the driver the same resources as an executor, but that would be overkill for a driver that does very little work.
    • --class com.pb.bigdata.geocoding.spark.app.GeocodeDriver
      The class of our driver that Spark will execute.

    Geocoding Jar
    • /pb/geocoding/sdk/spark2/driver/spectrum-bigdata-geocoding-spark2-3.2.0-all.jar
      The jar that contains the class declared above.



    Geocoding Options
    • --geocoding-config-location /pb/geocoding/sdk/resources/config/
      --geocoding-binaries-location /pb/geocoding/sdk/resources/nativeLibraries/bin/linux64/
      --geocoding-preferences-filepath /pb/geocoding/sdk/resources/config/geocodePreferences.xml
      Configuration paths needed for geocoding. These paths were set up during the bootstrap phase while the cluster was starting up.
    • --input /user/hadoop/geocoding/test_data.txt
      --csv delimiter="|"
      --geocoding-input-fields mainAddressLine=1,2,3 areaName3=4 areaName1=5 postCode1=6 country=7
      The format of and path to the input data, and the fields to be used for geocoding.
    • --output /user/hadoop/output/
      --geocoding-output-fields x y PB_KEY
      --error-field error
      --overwrite
      The path to save the output and the geocode fields that will be added, along with an error field containing error messages when present.
    • --num-partitions 188
      Our cluster has 47 4-core executors, which means it can work on 47 * 4 = 188 tasks at once. By default Spark would split this data by the executor count, 47. To fully utilize our cluster we force our data into 188 partitions. See Part 2 for more details.

    Full Execution

    Now that we know the cluster specifications, have the scripts to set up geocoding, and know our spark-submit command, we can execute the process.


    This will geocode the 19 million addresses and save an output table containing the original input columns plus the requested geocode fields (x, y, PB_KEY) and the error field.
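
    As a quick sanity check you can load the result back into Spark. This is a minimal PySpark sketch that assumes the output mirrors the pipe-delimited input format and sits under the --output directory; adjust the read options to whatever your driver version actually writes.

    # Minimal sanity-check sketch: load the geocoded output back into Spark.
    # Assumes pipe-delimited text under the --output directory; inspect a file
    # first and adjust the read options if the format differs.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("inspect-geocode-output").getOrCreate()
    result = spark.read.option("delimiter", "|").csv("/user/hadoop/output/")

    print(result.count())          # expect one row per input address (19,385,996)
    result.show(5, truncate=False)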

    Performance Analysis

    We are going to analyze the time spent in the Spark Application step of our process.

    Total Time Spent Geocoding: 28.7 minutes

    Geocodes per second (Across entire cluster): 11257.8

    Geocodes per second per core: 59.88

    Using geocodes per second per core we can calculate how long our Spark application would take if we had more, or fewer, worker nodes. For example, if we only had 1 worker node then our Spark executors would only have 44 cores to utilize and the job would take about 2 hours and 3 minutes. If we had 10 worker nodes then the cluster would have 476 cores for our Spark executors and the job would take only 11.3 minutes. The time spent creating the cluster and setting up geocoding is constant in all scenarios, which means each additional node yields diminishing returns.
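
    Those estimates fall straight out of the measured per-core rate; here is the arithmetic as a small Python sketch.

    # Back-of-the-envelope scaling math from the measured per-core rate.
    addresses = 19_385_996
    seconds = 28.7 * 60
    cores_used = 188                       # 47 executors x 4 cores

    per_second = addresses / seconds       # ~11,258 geocodes/sec across the cluster
    per_core = per_second / cores_used     # ~59.9 geocodes/sec per core

    for cores in (44, 188, 476):           # 1, 4, and 10 worker nodes (less the driver's share)
        minutes = addresses / (per_core * cores) / 60
        print(f"{cores} cores -> {minutes:.1f} minutes")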



    ------------------------------
    - Dustin.clark1@pb.com
    PITNEY BOWES SOFTWARE, INC
    Troy NY
    ------------------------------


  • 2.  RE: Tuning Geocoding in Spark

    Employee
    Posted 11-15-2019 09:20

    Tuning Geocoding in Spark: Advanced Topics

    In this post we will dive a little deeper into decisions and calculations made in the above post.

    Distributing Data (Sorting, Shuffling, Repartitioning)

    There are two main problems to solve while geocoding: distributing data across the cluster and distributing computation evenly across the cluster.

    Distributing data across the cluster

    This problem is not unique to geocoding and arises in most Spark applications. By default, Spark will not distribute the 1.25GB input file across the cluster while geocoding. To solve this you need to call repartition(int n) on the Spark DataFrame before geocoding, where n is the number of partitions to split the data into. This is already implemented in the geocode driver and is exposed through the --num-partitions option. At a minimum, this should be set to num-executors * executor-cores; for us that is 47 * 4 = 188.
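
    For reference, if you were writing your own Spark job instead of using the supplied driver, the equivalent call looks roughly like this PySpark sketch (the path and DataFrame handling here are placeholders, not the driver's actual code):

    # Illustrative PySpark sketch of what --num-partitions does; the supplied
    # driver is JVM-based and already handles this, so the path is a placeholder.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("repartition-example").getOrCreate()
    df = spark.read.option("delimiter", "|").csv("/user/hadoop/geocoding/test_data.txt")

    # 47 executors x 4 cores = 188 task slots. repartition() performs a full shuffle,
    # which also spreads slow-to-geocode countries evenly across the executors.
    df = df.repartition(188)
    print(df.rdd.getNumPartitions())   # 188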

    Distributing geocoding computation evenly across the cluster

    Not all countries geocode at the same speed. If a Spark executor only has countries that geocode quickly, it will finish geocoding and sit idle for a long period while the other executors are still working. This wastes cluster capacity and can leave a Spark application waiting on just a few tasks. To solve this problem we want to shuffle our input data. Luckily for us, this is done implicitly when we call repartition(int n) to solve the problem above. There is no benefit to randomly sorting the data before using repartition.

    Spark Executor Size

    Choosing the correct number of CPUs and the right amount of memory for an executor is very important when trying to use Spark efficiently and without errors. Since we know the geocoding instance is shared across multiple tasks within an executor, we take advantage of that and work with multi-core executors. Here are the steps that get us to our executor size:

    1. Determine the minimum amount of memory required to avoid failures. 

    We want a reliable Spark job that never has any task failures. We achieve this through trial and error: start the job with 3-core executors with 1GB of memory and increase the memory until there are no task failures. For this data we saw some task failures until we got to 13GB of memory. If your data is only distributed across a couple of countries, the memory requirement will be lower.

    2. Increase the minimum memory for a potential performance boost.

    Now that we have our minimum memory requirement, it is possible that more memory will give better performance. Again by trial and error, we increased the memory until we saw no improvement. We saw minor improvements when we increased to 15GB of memory, and degradation started to occur at 20GB.

    3. Determine optimal CPU count

    Now that we know our memory requirement, we will determine how many cores each executor should have. By trial and error, we started with executors having 2 cores and 15GB of memory and increased by 1 core until we saw no improvement. At 3 cores we saw a noticeable improvement and at 7 cores we saw a degradation in performance. A minor peak in performance was observed at 4 cores.

    Finalize

    After following these steps we arrived at 4 cores and 15GB of memory for our executor size. Any executor size between 3 cores/13GB of memory and 6 cores/20GB of memory will be very comparable in performance.

    Additional Geocoding Options

    Using S3 for input or output

    When using EMR it is very common to use S3 paths for your input and output. This is OK and adds no performance penalty. 

    Using --combine

    By default Spark will split your output into many small files, typically one per partition. If you want a single file, the geocoding driver supports the --combine flag, which repartitions the output into one file. Using that flag with this Spark application added only 1 minute to the total run time.
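
    If you ever need the same single-file behavior in your own Spark code, the generic equivalent is to collapse to one partition before writing. A hedged PySpark sketch with placeholder paths, not the driver's internal implementation:

    # Generic Spark equivalent of combining output into one file (sketch only;
    # the driver's --combine flag does this for you). Paths are placeholders.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("combine-example").getOrCreate()
    result = spark.read.option("delimiter", "|").csv("/user/hadoop/output/")

    # Collapsing to one partition funnels the final write through a single task,
    # which is why it adds a little time at the end of the job.
    result.coalesce(1).write.mode("overwrite").option("delimiter", "|").csv("/user/hadoop/output_combined/")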

    Cost

    AWS EMR will round up to the nearest hour when calculating cost. This means we would be charged for 1 hour. Costs will vary based on your AWS billing strategies but this cluster would cost between $3.40 (Spot) and $9.39 (On-Demand). 



    ------------------------------
    - Dustin.clark1@pb.com
    PITNEY BOWES SOFTWARE, INC
    Troy NY
    ------------------------------