Darren Gosbell

Tag: python

Microsoft Fabric – writing JSON data into a Lakehouse table

I recently wrote about a technique for calling a Power BI REST API from a Fabric notebook, and I got a question about how to take that data and persist it out to a delta table, which I thought would make a good follow-up post.

In my last post I used SPN authentication; this time we are going to use the mssparkutils.credentials.getToken() function to get an authentication token for the owner of the notebook.

NOTE: I’m still learning python and spark, so this may not be the best way of doing things, I’m just sharing what’s worked for me. Feel free to drop a comment if you see any areas that can be improved.

If you just want to see the full code listing without the explanations between the sections you can find it at the end of this post.

Extracting data from a REST API

from pyspark.sql.functions import *
import requests, os, datetime
from delta.tables import *

############################################################
# Authentication - Using owner authentication
############################################################
 
access_token = mssparkutils.credentials.getToken("https://analysis.windows.net/powerbi/api")

print('Successfully authenticated.')   
 
############################################################
# Get Refreshables
############################################################
 
base_url = 'https://api.powerbi.com/v1.0/myorg/'
header = {'Authorization': f'Bearer {access_token}'}
 
refreshables_url = "admin/capacities/refreshables?$top=200"
 
refreshables_response = requests.get(base_url + refreshables_url, headers=header)
 
# write raw data to a json file
requestDate = datetime.datetime.now()
outputFilePath = f"/lakehouse/default/Files/refreshables/top_200_{requestDate.strftime('%Y%m%d')}.json"

if not os.path.exists(os.path.dirname(outputFilePath)):    
    print("creating folder: " + os.path.dirname(outputFilePath))    
    os.makedirs(os.path.dirname(outputFilePath), exist_ok=True)

with open(outputFilePath, 'wb') as f:
    f.write(refreshables_response.content)

At this point we have executed a call against the REST API, and we have a response object in the refreshables_response variable. We have then written the content property of that response out to a file using a binary write, since the content property contains a byte array. What we want to do now is to read that byte array into a spark dataframe so that we can write it out to a table.

Converting a string to a dataframe

Spark does have a method which can read a json file into a spark dataframe, but it seems a bit silly to do the extra IO of reading the file off disk when I still have the data from that file in a variable in memory. You can do this by wrapping the string from the response content in a call to the parallelize() method and then wrapping that in a call to spark.read.json().

refreshablesRDD = spark.sparkContext.parallelize([refreshables_response.content.decode()])
dfRefreshables = spark.read.json(refreshablesRDD)

Transforming our dataframe to a flat table

We are almost there; however, if you use the display() method to have a look at the results, you will see that the dataframe only has a single row, with a value column that contains an array.
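
For example, a quick way to take that look, assuming you are running this in the Fabric notebook where display() is available:

display(dfRefreshables)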

But ideally what we would like is a table with a row for each element in the array, not just a single row with the array in a column. To do this we can select from the dataframe and call the explode() method on the value column.
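
A minimal sketch of that step (the dfExploded name is just for illustration):

dfExploded = dfRefreshables.select(explode("value"))
display(dfExploded)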

This is looking a bit better, but now we have a nested object inside a column called col, and what I really want is to break each of the properties from those objects out into separate columns. We can do that by adding a .select("col.*") on the end of our exploded select.
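
Again, just as a sketch of the intermediate step:

dfExploded = dfRefreshables.select(explode("value")).select("col.*")
display(dfExploded)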

That has successfully split out every property into its own column. If you want to get a subset of columns or list the columns in a certain order you can just reference them by name using a syntax like the following:

dfRefreshables = dfRefreshables.select(explode("value")) \
    .select("col.id", 
        "col.startTime", 
        "col.endTime", 
        "col.medianDuration", 
        "col.averageDuration", 
        "col.lastRefresh.status", 
        "col.lastRefresh.refreshType")

Writing a dataframe to a delta table

Now that I have a dataframe in the exact format that I want, it is trivial to write this out to a delta table in my lakehouse using the dataframe's write API.

dfRefreshables.write.mode('overwrite').format("delta").saveAsTable("refreshables")

Merging the data into an existing table

If all you want to do is overwrite the data in your table each time you run your notebook then you can stop at this point. But what if you are doing an incremental load and you want to insert new records if they don't already exist, or update a record if the id is already in the table? In this case you can use the following code:

if spark.catalog.tableExists("refreshables"):
    # get a reference to the existing table
    targetTable = DeltaTable.forName(spark, "refreshables")
    # merge the new data into the table, matching on the id column
    (targetTable.alias("t")
        .merge(dfRefreshables.alias("s"), "s.id = t.id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    # create the refreshables table if it does not exist yet
    dfRefreshables.write.mode('overwrite').format("delta").saveAsTable("refreshables")

This code first checks whether the table already exists. If it doesn't, it uses the write method to create a new table; if it does, it uses the merge method to perform an upsert operation.

The full code sample

The section below contains all of the code snippets from the above as a single block that you can paste into a cell in a Fabric notebook. When you run this it should:

  1. Query the Power BI Get Refreshables API.
  2. Store the raw output in a json file in the Files portion of the attached lakehouse.
  3. Write out this data to a delta table in the lakehouse.

from pyspark.sql.functions import *
import requests, os, datetime
from delta.tables import *

#########################################################################################
# Authentication - Using owner authentication
#########################################################################################
 
access_token = mssparkutils.credentials.getToken("https://analysis.windows.net/powerbi/api")

print('\nSuccessfully authenticated.')   
 
#########################################################################################
# Get Refreshables
#########################################################################################
 
base_url = 'https://api.powerbi.com/v1.0/myorg/'
header = {'Authorization': f'Bearer {access_token}'}
 
refreshables_url = "admin/capacities/refreshables?$top=200"
 
refreshables_response = requests.get(base_url + refreshables_url, headers=header)
 
# write raw data to a json file
requestDate = datetime.datetime.now()
outputFilePath = f"/lakehouse/default/Files/refreshables/top_200_{requestDate.strftime('%Y%m%d')}.json"

if not os.path.exists(os.path.dirname(outputFilePath)):    
    print("creating folder: " + os.path.dirname(outputFilePath))    
    os.makedirs(os.path.dirname(outputFilePath), exist_ok=True)

with open(outputFilePath, 'wb') as f:
    f.write(refreshables_response.content)

refreshablesRDD = spark.sparkContext.parallelize([refreshables_response.content.decode()])
dfRefreshables = spark.read.json(refreshablesRDD)

dfRefreshables = dfRefreshables.select(explode("value")) \
    .select("col.id", 
        "col.startTime", 
        "col.endTime", 
        "col.medianDuration", 
        "col.averageDuration", 
        "col.lastRefresh.status", 
        "col.lastRefresh.refreshType")

## write the spark dataframe out to a delta table
if spark.catalog.tableExists("refreshables"):
    # get a reference to the existing table
    targetTable = DeltaTable.forName(spark, "refreshables")
    # merge the new data into the table, matching on the id column
    (targetTable.alias("t")
        .merge(dfRefreshables.alias("s"), "s.id = t.id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    # create the refreshables table if it does not exist yet
    dfRefreshables.write.mode('overwrite').format("delta").saveAsTable("refreshables")

Calling a Power BI REST API from a Fabric Notebook

The introduction of Microsoft Fabric at the Build Conference last month has added a whole lot of new functionality that you can use. One of the new items we now have access to is a simple way of running Notebooks. And these Notebooks open up all sorts of interesting options for Data engineering, exploration and ingestion.

One of the things that this makes possible is to call a Power BI REST API and save the results into the Files section of a Fabric Lakehouse.

So let's look at an example of that by calling the Get Refreshables API.

Setup

There are a couple of setup steps before you can run the script below.

Firstly, I’ve actually followed best practices here and used Azure Key Vault to secure all the secrets for my script rather than just hard coding them. I’ve added the following 3 secrets to my key vault:

  • FabricTenantId – my Power BI tenant ID
  • FabricClientId – the client ID for an SPN that I created to run this script
  • FabricClientSecret – the client secret for the SPN

I then gave my user access to read those secrets, and because the notebook uses the identity of the owner when it executes, this also works even when you set up a schedule for your notebook.

If you don’t have access to a key vault you could just hard code your values into this notebook.
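
In that case you would simply assign the values directly instead of calling getSecret(), something like the following (placeholder values only, and obviously not something you would want to do beyond quick testing):

# hard-coded alternative to the key vault lookups below (placeholders, not real values)
tenant = "<your-tenant-id>"
client = "<your-spn-client-id>"
client_secret = "<your-spn-client-secret>"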

Then I’ve created a Lakehouse in my workspace and created a refreshables folder under Files.

Once all that is done you can paste the following script into a new notebook and run it.

#########################################################################################
# Read secrets from Azure Key Vault
#########################################################################################
key_vault = "https://dgosbellKeyVault.vault.azure.net/"
tenant = mssparkutils.credentials.getSecret(key_vault, "FabricTenantId")
client = mssparkutils.credentials.getSecret(key_vault, "FabricClientId")
client_secret = mssparkutils.credentials.getSecret(key_vault, "FabricClientSecret")

#########################################################################################
# Authentication - Replace string variables with your relevant values       
#########################################################################################

import json, requests, pandas as pd
import datetime

try:
    from azure.identity import ClientSecretCredential
except Exception:
    !pip install azure.identity
    from azure.identity import ClientSecretCredential

# Generates the access token for the Service Principal
api = 'https://analysis.windows.net/powerbi/api/.default'
auth = ClientSecretCredential(authority = 'https://login.microsoftonline.com/',
                              tenant_id = tenant,
                              client_id = client,
                              client_secret = client_secret)
access_token = auth.get_token(api)
access_token = access_token.token

print('\nSuccessfully authenticated.')   

#########################################################################################
# Get Refreshables
#########################################################################################

base_url = 'https://api.powerbi.com/v1.0/myorg/'
header = {'Authorization': f'Bearer {access_token}'}

refreshables_url = "admin/capacities/refreshables?$top=200"

print(refreshables_url)

refreshables_response = requests.get(base_url + refreshables_url, headers=header)

# write raw data to a json file
with open("/lakehouse/default/Files/refreshables/top200.json", 'wb') as f:
    f.write(refreshables_response.content)

The Output

If we then open our lakehouse and have a look at the file that was generated, we can see the data returned from our API call.
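
You can also peek at the file directly from within the notebook if you prefer, with something like this optional check:

import json

# read the saved file back and print the first part of the JSON
with open("/lakehouse/default/Files/refreshables/top200.json") as f:
    print(json.dumps(json.load(f), indent=2)[:2000])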

Conclusion

Most of the script was just involved in getting the authentication working; the core portion that then calls the REST API was relatively simple.

This is all relatively new to me and I’m still learning about Python and Notebooks, so if you see anything where I’m not doing things the “right way” feel free to point them out. I’m planning to do some more Notebook related posts in the future. I’m particularly interested in pushing data into delta tables and then building a Power BI dataset in Direct Lake mode, so stay tuned for that.