Parallel export from Azure Data Warehouse to Parquet files

I wanted to export one of our bigger tables from Azure Data Warehouse (ADW) to Azure Data Lake (ADL) as a set of Parquet files. Unfortunately, this is not yet supported by just using external tables and PolyBase, so I needed to find an alternative. In Azure, when it comes to data movement, that tends to be Azure Data Factory (ADF).

The idea is to use ADF to export data from a table with about 10 billion records from ADW to a bunch of Parquet files in ADL. This data set can be easily partitioned by time since it's a time series stream by nature. The first idea was to partition by month and create one ADF pipeline per month. This was a bit too optimistic. It turns out that, uncompressed, a month of this data was about 100-120GB, and I quickly hit OutOfMemoryError exceptions. OK, a month is too big, let's do it per day. Per-day partitioning (mostly) worked fine, but now we have a lot more pipelines to create, about 30x more than previously. What is the best approach to do this job?

Throughput

We would like to maximize throughput, use all available resources and of course minimize the time we have to wait for this to be done. How much throughput do we get with a single ADF pipeline? First, a small disclaimer: my ADW was in West Europe and the ADL Store was in North Europe. At the time of writing, ADL in Europe is only available in the North Europe region, so obviously it's not as fast as it would be if both were in a single region. That said, I was able to average only 5MB/s for a single pipeline. That really does not sound like a lot. I was using ADW with 3000 DWUs, had a user with the xlargerc resource class, and was only getting 5MB/s. This was obviously going to take ages. I definitely needed to have multiple pipelines running at the same time to make this tolerable.
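To put "ages" into numbers, here is a rough back-of-the-envelope sketch. It assumes decimal units (1 GB = 1000 MB) and that parallel pipelines scale linearly, which is an optimistic assumption I have not verified; the function name is only illustrative.

```python
def transfer_hours(size_gb: float, mb_per_s: float, pipelines: int = 1) -> float:
    """Hours needed to move size_gb when each pipeline sustains mb_per_s.

    Assumptions: 1 GB = 1000 MB, and throughput adds up linearly
    across pipelines running in parallel.
    """
    if mb_per_s <= 0 or pipelines < 1:
        raise ValueError("mb_per_s must be positive and pipelines at least 1")
    size_mb = size_gb * 1000
    return size_mb / (mb_per_s * pipelines) / 3600


# One month of uncompressed data (100-120GB) through a single 5MB/s pipeline
print(f"{transfer_hours(100, 5):.1f}")  # 5.6
print(f"{transfer_hours(120, 5):.1f}")  # 6.7

# The same month if 16 pipelines could run side by side
print(f"{transfer_hours(100, 5, pipelines=16):.1f}")  # 0.3
```

At roughly six hours per month of data, a single pipeline would need days to get through the whole table, which is why parallelism matters here.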
My first attempt was to just duplicate pipelines and adjust the custom query in each of them to query a different month. Same ADW user, same resource class.

This did not work as expected.

Pipelines were just sitting in the queue waiting for the previous one to finish. Hmm, my user should have enough resources to execute multiple queries on ADW, there should be no locking on a simple read query. Why are my pipelines queuing? I must be doing something wrong.

ADW resource model

At this point it was time to go back to basics. Let's RTFM.

It boils down to this.

The bigger the resource class, the more memory you get per query. That sounds logical, let's continue. If you are using a big resource class, you get a lot of memory for your query. If you don't need that much memory for a query, it does not matter, you still get it. Since the total available memory is fixed, there is only so much to give. And there lies the catch: other queries will be queued because there is no memory left to assign to them. Resource classes get memory assignment guarantees, so even though you are not actually using the memory, it stays reserved.

Here is what the docs say about concurrency limits:

Concurrency limits are governed by two concepts: concurrent queries and concurrency slots. For a query to execute, it must execute within both the query concurrency limit and the concurrency slot allocation.

  • Concurrent queries are the queries executing at the same time. SQL Data Warehouse supports up to 32 concurrent queries on the larger DWU sizes.
  • Concurrency slots are allocated based on DWU. Each 100 DWU provides 4 concurrency slots. For example, a DW100 allocates 4 concurrency slots and DW1000 allocates 40. Each query consumes one or more concurrency slots, dependent on the resource class of the query. Queries running in the smallrc resource class consume one concurrency slot. Queries running in a higher resource class consume more concurrency slots.

SQL Data Warehouse grants more memory to queries running in higher resource classes. Memory is a fixed resource. Therefore, the more memory allocated per query, the fewer concurrent queries can execute. The following table reiterates all of the previous concepts in a single view that shows the number of concurrency slots available by DWU and the slots consumed by each resource class.

For the table itself and additional details, please check out the full docs.

Essentially, we will need a user with a smaller resource class to be able to run multiple pipelines and get more throughput. Smaller resource class means less memory assignment and more concurrency. Turns out, less is more :).
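The arithmetic behind "less is more" can be sketched in a few lines. The 4 slots per 100 DWU and the cap of 32 concurrent queries come from the docs quoted above, as does the single slot for smallrc. The 64 slots used for xlargerc at 3000 DWUs is my assumption for illustration; check the table in the docs for the real value at your DWU level.

```python
SLOTS_PER_100_DWU = 4        # from the docs quoted above
MAX_CONCURRENT_QUERIES = 32  # from the docs quoted above


def concurrency_slots(dwu: int) -> int:
    """Concurrency slots available at a given DWU setting."""
    return dwu // 100 * SLOTS_PER_100_DWU


def max_parallel_queries(dwu: int, slots_per_query: int) -> int:
    """Queries that can run at once when each one consumes slots_per_query slots."""
    return min(MAX_CONCURRENT_QUERIES, concurrency_slots(dwu) // slots_per_query)


print(concurrency_slots(100))    # 4
print(concurrency_slots(1000))   # 40

# smallrc consumes one slot per query
print(max_parallel_queries(3000, slots_per_query=1))   # 32
print(max_parallel_queries(400, slots_per_query=1))    # 16

# Assumed slot cost for xlargerc at 3000 DWUs (verify against the docs)
print(max_parallel_queries(3000, slots_per_query=64))  # 1
```

With a slot cost that large, only one query fits at a time, which would explain why my duplicated pipelines just sat in the queue.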

Building the pipelines

We will change our strategy from using a user with a big resource class to a user with a small resource class.

Now that we know the strategy to use for the export, let's see how to build all these pipelines. It's not like Microsoft made it easy in the portal with all that JSON.
Luckily, there is PowerShell. I decided not to be too fancy, but just hack some scripts and get it done. First, I took one of the pipelines previously created with the Copy wizard and copied the JSON definition it generated.
Only a couple of things change per pipeline: the date range and the name of the file we are exporting to. Let's just create simple tokens in the template that we can then replace with actual values in our script. Here is the code.

# Read the template file, then for each day replace the tokens
# with that day's values and save the result as its own file.
$templatePipeline = "C:\Temp\DataLakeTemplatePipeline.txt"
$destinationFolder = "C:\Temp\ResultingPipelines"

# Make sure the output folder exists before writing to it
New-Item -ItemType Directory -Path $destinationFolder -Force | Out-Null

# -Raw reads the whole template as a single string
$templateString = Get-Content $templatePipeline -Raw

# Single quotes keep these literal: the template contains e.g. $${Date}
$dateToken = '$${Date}'
$endDateToken = '$${EndDate}'
$dateUnderscoreToken = '$${DateUnderscore}'
$numberToken = '$${Number}'

# Loop over all days in the range (inclusive)
$currentDate = Get-Date -Date "2015-11-01"
$lastDate = Get-Date -Date "2017-05-31"
$number = 1
while ($currentDate -le $lastDate)
{
    $date = $currentDate.ToString("yyyy-MM-dd")
    $endDate = $currentDate.AddDays(1).ToString("yyyy-MM-dd")
    $dateUnderscore = $currentDate.ToString("yyyy_MM_dd")

    $pipelineJson = $templateString.Replace($dateToken, $date)
    $pipelineJson = $pipelineJson.Replace($endDateToken, $endDate)
    $pipelineJson = $pipelineJson.Replace($dateUnderscoreToken, $dateUnderscore)
    $pipelineJson = $pipelineJson.Replace($numberToken, $number.ToString())

    # One pipeline definition per day, named after the date
    $pipelineJson | Out-File "$destinationFolder\$date.json"

    $currentDate = $currentDate.AddDays(1)

    # Cycle the user number through 1..10
    $number = $number % 10 + 1
}

As mentioned, there are a couple of tokens: start date, end date, file name (date with underscores) and that number. The number cycles from 1 to 10 and is used to spread the jobs over multiple users, but that is probably not necessary: in the ADW docs I could not find any limit on the number of connections for one user, just on the number of concurrency slots used. The same principle should work just fine with a single ADW user. In the end, the script generates one pipeline definition per day of data.
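To make the token mechanics easier to see without the full pipeline JSON, here is a small sketch of the same substitution. The one-line template is made up for illustration (the real template is the JSON generated by the Copy wizard), and the column name `ts`, the file name pattern and the function names are hypothetical.

```python
from datetime import date, timedelta

# Hypothetical stand-in for the pipeline template; only the tokens match the script above
TEMPLATE = (
    "WHERE ts >= '$${Date}' AND ts < '$${EndDate}' "
    "-> data_$${DateUnderscore}.parquet (user$${Number})"
)


def render(template: str, day: date, number: int) -> str:
    """Replace the four tokens with the values for one day."""
    values = {
        "$${Date}": day.isoformat(),
        "$${EndDate}": (day + timedelta(days=1)).isoformat(),
        "$${DateUnderscore}": day.strftime("%Y_%m_%d"),
        "$${Number}": str(number),
    }
    for token, value in values.items():
        template = template.replace(token, value)
    return template


def daily_pipelines(template: str, first: date, last: date, users: int = 10):
    """Yield (file name, rendered definition) for each day from first to last inclusive."""
    day, number = first, 1
    while day <= last:
        yield f"{day.isoformat()}.json", render(template, day, number)
        day += timedelta(days=1)
        number = number % users + 1  # cycles 1..users, like the script above


for name, body in daily_pipelines(TEMPLATE, date(2015, 11, 1), date(2015, 11, 2)):
    print(name, "|", body)
```

Each day gets a half-open range (start date inclusive, next day exclusive), so consecutive pipelines neither overlap nor leave gaps.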

Next step is to actually create all these pipelines in ADF. In this case that was 570 pipelines. Not something anyone wants to do manually.

We will turn to PowerShell once again.

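$subscriptionName = "Name of your subscription"
$resourceGroupName = "Name of your resource group"
$dataFactoryName = "Name of your data factory"
$pipelineFolder = "C:\Temp\ResultingPipelines"
$doneFolder = "$pipelineFolder\Done"

Get-AzureRmSubscription -SubscriptionName $subscriptionName | Set-AzureRmContext

# Get-AzureRmDataFactory needs the resource group that contains the factory
$df = Get-AzureRmDataFactory -ResourceGroupName $resourceGroupName -Name $dataFactoryName

# Move-Item fails if the target folder does not exist
New-Item -ItemType Directory -Path $doneFolder -Force | Out-Null

$currentDate = Get-Date -Date "2015-11-01"
$lastDate = Get-Date -Date "2017-05-31"
$counter = 0
while ($currentDate -le $lastDate)
{
    $date = $currentDate.ToString("yyyy-MM-dd")
    "Processing date: $date"

    # Create the pipeline, then move its definition out of the way
    # so an interrupted run can be resumed with the remaining files
    New-AzureRmDataFactoryPipeline -DataFactory $df -File "$pipelineFolder\$date.json"
    Move-Item "$pipelineFolder\$date.json" "$doneFolder\$date.json"

    $currentDate = $currentDate.AddDays(1)
    $counter = $counter + 1

    # After every 16 pipelines, pause for five minutes
    if ($counter -eq 16)
    {
        $counter = 0
        Start-Sleep -Seconds 300
    }
}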

I decided to create 16 pipelines in a row and then wait for five minutes to let the previous ones finish. Depending on the number of DWUs you have, you could go higher or lower. Check the limits in the ADW docs.
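For a feel of how long such a run takes, this sketch adds up only the pauses of the loop above. It ignores the time spent in the create calls themselves, so treat the result as a lower bound; the function name is illustrative.

```python
def creation_schedule(total: int, batch_size: int = 16, pause_s: int = 300):
    """Return (number of pauses, total pause time in hours).

    Mirrors the loop above: it sleeps every time the counter reaches
    batch_size, so a trailing partial batch is not followed by a pause.
    """
    pauses = total // batch_size
    return pauses, pauses * pause_s / 3600


pauses, hours = creation_schedule(570)
print(pauses)           # 35
print(f"{hours:.1f}")   # 2.9
```

So for 570 pipelines the script spends close to three hours just waiting, which makes it a good candidate for an unattended run.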

I ran the script overnight and had my table exported to Parquet files in Azure Data Lake Store by the morning.

I ended up with about 440GB of Parquet files, ranging from 100MB to 1.5GB in size. This was about 7x compression compared to the raw data size, and about the same compression you would get on a table with a columnstore index in SQL Server 2016.

Notes on ADW Scaling

I found that 400DWUs was more than enough in my case (up to 16 concurrent queries). The only reason to go that high or higher is if you need to go more parallel. Parallelism maxes out at 1000DWUs with 32 concurrent queries. I was not able to exhaust the actual resources, just the connections: my DWU usage graph did not go over 110DWUs used. For this use case it was not necessary to overprovision the warehouse and burn a lot of money in the process. Maybe it's a bit different if ADW and the destination (ADL) are in the same region/data center. I leave that up to you.

Summary

So, to sum this thing up. If you want more parallel queries on ADW, use smaller resource classes. This is definitely useful when you want to export data that is easily partitioned. It's a reminder for myself and I hope it helps you too.

P.S.

Heap Space Issues

Let's address that "mostly" worked statement. Some of the 570 pipelines I ran failed with OutOfMemoryError. Mostly these happened when working with source chunks of about 7GB (from what I could see in the monitoring that Azure exposes). My guess is that this is the maximum amount of memory that can be assigned to a single activity in a pipeline. The only thing I could think of to resolve this is to split the chunks further (into 12- or 6-hour periods). I found it interesting that sometimes this seemed to happen on a data set that was just around 3GB in size. Maybe it has to do with how Parquet files are being written, but unfortunately we don't get enough debug information in ADF to be able to conclude what the reason was.

Copy activity encountered a user error at Sink side: ErrorCode=UserErrorJavaInvocationException,'Type=Microsoft.DataTransfer.Common.Shared.HybridDeliveryException,Message=An error occurred when invoking java, message: java.lang.OutOfMemoryError:Java heap space total entry:11 java.util.ArrayDeque.doubleCapacity(ArrayDeque.java:157) java.util.ArrayDeque.addFirst(ArrayDeque.java:231) java.util.ArrayDeque.push(ArrayDeque.java:503) org.apache.parquet.io.ValidatingRecordConsumer.endField(ValidatingRecordConsumer.java:108) org.apache.parquet.example.data.GroupWriter.writeGroup(GroupWriter.java:58) org.apache.parquet.example.data.GroupWriter.write(GroupWriter.java:37) org.apache.parquet.hadoop.example.GroupWriteSupport.write(GroupWriteSupport.java:64) org.apache.parquet.hadoop.example.GroupWriteSupport.write(GroupWriteSupport.java:36) org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:121) org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:288) com.microsoft.datatransfer.bridge.parquet.ParquetBatchWriter.addRows(ParquetBatchWriter.java:60) ,Source=Microsoft.DataTransfer.ClientLibrary,''Type=Microsoft.DataTransfer.Richfile.JniExt.JavaBridgeException,Message=,Source=Microsoft.DataTransfer.Richfile.HiveOrcBridge,'.
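Splitting a failing day into 12- or 6-hour periods, as suggested above, could be sketched like this. I did not run the export this way, so take it as an untested idea; the function name is illustrative, and it assumes the source query filters on a half-open time range (start inclusive, end exclusive).

```python
from datetime import datetime, timedelta


def split_day(day_start: datetime, hours: int):
    """Split the 24 hours starting at day_start into equal half-open windows."""
    if hours <= 0 or 24 % hours != 0:
        raise ValueError("hours must divide 24 evenly")
    step = timedelta(hours=hours)
    return [
        (day_start + i * step, day_start + (i + 1) * step)
        for i in range(24 // hours)
    ]


for start, end in split_day(datetime(2017, 5, 31), 6):
    print(start.isoformat(), "->", end.isoformat())
```

Each window would become its own pipeline, with the start and end timestamps taking the place of the daily date tokens.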