Auto Process files added to Azure Storage Account using Databricks
Have you ever wanted to process new files added to your Azure Storage account (BLOB) in near real time? Have you tried Azure Event Hubs, only to find that your files are too large for it to be a practical solution?
Let me introduce you to the ABS-AQS file source...
Azure Storage Topics using Event Grid
Optimized Azure Blob Storage File Source with Azure Queue Storage
Databricks has added support for near-real-time processing of changes to a Storage Account by consuming the account's events from a Storage queue.
As detailed in their documentation, you can set up a Databricks readStream that monitors the Azure Storage queue tracking all the changes.
To make this work, you will need a few things, as detailed here:
- An Azure Storage Account (BLOB)
- A storage queue
- Events configured with the storage queue as the endpoint
- A connection string for Databricks to use, along with a storage account key (a sketch for handling these securely follows this list)
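As a side note, rather than pasting the account key and connection string directly into your notebook, you can keep them in a Databricks secret scope and read them at runtime. Here is a minimal sketch, assuming a hypothetical scope and key names:
// Hypothetical secret scope and key names, created beforehand with the Databricks CLI or API
val storageAccountKey = dbutils.secrets.get(scope = "storage-secrets", key = "account-key")
val queueConnectionString = dbutils.secrets.get(scope = "storage-secrets", key = "queue-connection-string")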
Setting up the events (the third item above) will require you to create an Event Grid subscription. This part was confusing to me, so here's how it should be done.
I recommend unchecking "Subscribe to all event types" so that you only subscribe to the event types you actually want to process. For example, you probably don't care about files that are deleted.
You will need to select Storage Queues as the Endpoint Type.
Doing so adds a new Endpoint setting, which you will need to point at the storage queue created earlier.
Finally, select Event Grid Schema, which is detailed here, for the Event Schema.
Testing your setup
If you did everything correctly, you should see messages appear in your storage queue when you drop files into your BLOB storage account.
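If you would rather check from a notebook than from the Azure portal, here is a minimal sketch using the azure-storage-queue Java SDK, assuming that library is attached to your cluster (the placeholders are yours to fill in):
import com.azure.storage.queue.QueueClientBuilder

// Build a client for the queue that receives the storage events
val queueClient = new QueueClientBuilder()
  .connectionString("[STORAGE ACCOUNT CONNECTION STRING]")
  .queueName("[QUEUE NAME CREATED]")
  .buildClient()

// The approximate message count should grow as you drop files into the container
println(queueClient.getProperties.getApproximateMessagesCount)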
Processing Events in Databricks
Now that the nuts and bolts are in place, you can process the events in Databricks.
Setting up WASB access to blob
spark.conf.set("fs.azure.account.key.[STORAGE ACCOUNT NAME].blob.core.windows.net", "[STORAGE ACCOUNT KEY]")
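With the key in place, a quick way to confirm access is to list a container over WASB. The container name below is a hypothetical placeholder:
// List the container over WASB to confirm the account key works
dbutils.fs.ls("wasbs://[CONTAINER NAME]@[STORAGE ACCOUNT NAME].blob.core.windows.net/")
  .foreach(f => println(f.path))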
Specifying the layout of the files to process
In this example, I will process JSON deposited in the BLOB Storage Account. Doing so will require me to specify a schema. This can be defined using StructType.
For example, the following JSON:
{
  "Amounts": {
    "amount": "14131441179.6617",
    "currencyCode": "EU"
  }
}
Will be represented as the following schema:
import org.apache.spark.sql.types._

val schema = StructType(List(
  StructField("Amounts", StructType(List(
    StructField("amount", StringType, true),
    StructField("currencyCode", StringType, true)
  )))
))
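Before wiring up the stream, you can validate the schema against a sample file with a plain batch read. The container and file name below are hypothetical:
// Batch-read one sample file with the schema to make sure it parses as expected
val sampleDf = spark.read
  .schema(schema)
  .option("multiLine", "true")
  .json("wasbs://[CONTAINER NAME]@[STORAGE ACCOUNT NAME].blob.core.windows.net/sample.json")

sampleDf.select("Amounts.amount", "Amounts.currencyCode").show(false)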
Setting up Structured Streaming
When setting up the input stream, you will need to specify a few things:
| Option | Description |
| --- | --- |
| format | This should be set to abs-aqs |
| fileFormat | The format of the files, such as parquet, json, csv, text, and so on |
| queueName | The name of the Storage Account queue created earlier |
| multiLine | Allows your JSON to be formatted across multiple lines |
| ignoreDeletes | Optional; ignores delete events |
| schema | The structured schema defined for the JSON |
See this link for complete details on all the options available.
You can now execute the following Scala code to prep the stream:
val inputStream = spark
  .readStream
  .format("abs-aqs")
  .option("fileFormat", "json")
  .option("queueName", "[QUEUE NAME CREATED]")
  .option("multiLine", "true")
  .option("ignoreDeletes", "true")
  .option("connectionString", "[STORAGE ACCOUNT CONNECTION STRING]")
  .schema(schema)
  .load()
Starting the stream
You are now ready to capture the streaming events coming from your Azure Storage. The following will simply output the stream to the screen, but you could do much more, like saving it to a Delta table, as sketched below.
display(inputStream)
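For example, here is a minimal sketch of writing the stream to a Delta table instead; the checkpoint and output paths are hypothetical placeholders:
// Write the incoming events to a Delta table at a path, tracking progress in a checkpoint location
inputStream.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/mnt/delta/checkpoints/amounts")
  .start("/mnt/delta/amounts")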
Thanks!