Darren Gosbell

Microsoft Fabric – writing JSON data into a Lakehouse table

I recently wrote about a technique for calling a Power BI REST API from a Fabric notebook, and I got a question about how to take that data and persist it to a delta table, which I thought would make a good follow-up post.

In my last post I used SPN authentication; this time we are going to use the mssparkutils.credentials.getToken() function to get an authentication token for the owner of the notebook.

NOTE: I’m still learning python and spark, so this may not be the best way of doing things, I’m just sharing what’s worked for me. Feel free to drop a comment if you see any areas that can be improved.

If you just want to see the full code listing without the explanations between the sections you can find it at the end of this post.

Extracting data from a REST API

from pyspark.sql.functions import *
import requests, os, datetime
from delta.tables import *

############################################################
# Authentication - Using owner authentication
############################################################
 
access_token = mssparkutils.credentials.getToken("https://analysis.windows.net/powerbi/api")

print('Successfully authenticated.')   
 
############################################################
# Get Refreshables
############################################################
 
base_url = 'https://api.powerbi.com/v1.0/myorg/'
header = {'Authorization': f'Bearer {access_token}'}
 
refreshables_url = "admin/capacities/refreshables?$top=200"
 
refreshables_response = requests.get(base_url + refreshables_url, headers=header)
 
# write raw data to a json file
requestDate = datetime.datetime.now()
outputFilePath = "//lakehouse/default/Files/refreshables/top_200_{year}{month}{day}.json".format(year = requestDate.strftime("%Y"), month= requestDate.strftime("%m"), day=requestDate.strftime("%d"))

if not os.path.exists(os.path.dirname(outputFilePath)):    
    print("creating folder: " + os.path.dirname(outputFilePath))    
    os.makedirs(os.path.dirname(outputFilePath), exist_ok=True)

with open(outputFilePath, 'wb') as f:
    f.write(refreshables_response.content)

At this point we have executed a call against the REST API and we have a response object in the refreshables_response variable. We have then written the content property of that response out to a file using a binary write, since the content property contains a byte array. What we want to do now is read that byte array into a spark dataframe so that we can write it out to a table.
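Because we write the raw content straight to a file, it can be worth confirming the call actually succeeded first, otherwise you end up persisting an error payload. A minimal sketch of that check, using the standard status_code and raise_for_status() members of the requests response (not part of the original listing):

# optional: stop early if the REST call failed so we don't persist an error payload
if refreshables_response.status_code != 200:
    print(f"Request failed with status {refreshables_response.status_code}")
    refreshables_response.raise_for_status()  # raises an HTTPError with the details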

Converting a string to a dataframe

Spark does have a method which can read a json file into a spark dataframe, but it seems a bit silly to do the extra IO of reading the file off disk when I still have the data from that file in a variable in memory. You can avoid this by wrapping the string from the response content in a call to the parallelize() method, then wrapping that in a call to spark.read.json().

refreshablesRDD = spark.sparkContext.parallelize([refreshables_response.content.decode()])
dfRefreshables = spark.read.json(refreshablesRDD)
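For comparison, the file-based approach mentioned above would read back the JSON we just wrote to the Files area. A sketch only: it assumes the default lakehouse is attached so that the relative Files path resolves, and the multiLine option is there in case the file holds a single pretty-printed JSON document.

# alternative: read the JSON file back from the lakehouse, at the cost of an extra round trip to disk
dfRefreshablesFromFile = spark.read.option("multiLine", "true").json("Files/refreshables/")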

Transforming our dataframe to a flat table

We are almost there; however, if you use the display() method to have a look at the results, you will see that the dataframe only has a single row, with a value column that contains an array.

But ideally what we would like is a table with a row for each element in the array, not just a single row with the array in a column. To do this we can select from the dataframe and call the explode() method on the value column, as shown below.
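Something like the following does the trick (a minimal sketch; the dfExploded name and the display() call are only there to show the intermediate result):

dfExploded = dfRefreshables.select(explode("value"))
display(dfExploded)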

This is looking a bit better, but now we have a nested object inside a column called col, and what I really want is to break each of the properties from those objects out into separate columns. We can do that by adding a .select("col.*") on the end of our exploded select.
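For example (a sketch, simply extending the select from the previous step):

dfFlattened = dfRefreshables.select(explode("value")).select("col.*")
display(dfFlattened)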

That has successfully split out every property into its own column. If you want to get a subset of columns or list the columns in a certain order you can just reference them by name using a syntax like the following:

dfRefreshables = dfRefreshables.select(explode("value")) \
    .select("col.id", 
        "col.startTime", 
        "col.endTime", 
        "col.medianDuration", 
        "col.averageDuration", 
        "col.lastRefresh.status", 
        "col.lastRefresh.refreshType")

Writing a dataframe to a delta table

Now that I have a dataframe in the exact format that I want, it is trivial to write this out to a delta table in my lakehouse by calling .write on the dataframe followed by saveAsTable().

dfRefreshables.write.mode('overwrite').format("delta").saveAsTable("refreshables")

Merging the data into an existing table

If all you want to do is overwrite the data in your table each time you run your notebook, then you can stop at this point. But what if you are doing an incremental load and you want to insert new records if they don't already exist, or update a record if the id is already in the table? In this case you can use the following code:

if spark.catalog.tableExists("refreshables"):
    # get existing table
    targetTable = DeltaTable.forName(spark, "refreshables")
    # merge into table
    (targetTable.alias("t")
        .merge(dfRefreshables.alias("s"), "s.id = t.id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    # create the table since it does not exist yet
    dfRefreshables.write.mode('overwrite').format("delta").saveAsTable("refreshables")

This code checks whether the table already exists; if it doesn't, it uses the write method to create a new table. If the table does exist, we use the merge method to do an upsert operation.
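To confirm the result you can read the table back and have a look at a few rows (a quick sanity check, not part of the original listing):

# read the delta table back and inspect a few rows
display(spark.read.table("refreshables").limit(10))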

The full code sample

The section below contains all of the code snippets from the above as a single block that you can paste into a cell in a Fabric notebook. When you run this it should:

  1. Query the Power BI get Refreshables API.
  2. Store the raw output in a json file in the Files portion of the attached lakehouse.
  3. Write out this data to a delta table in the lakehouse.

from pyspark.sql.functions import *
import requests, os, datetime
from delta.tables import *

#########################################################################################
# Authentication - Using owner authentication
#########################################################################################
 
access_token = mssparkutils.credentials.getToken("https://analysis.windows.net/powerbi/api")

print('\nSuccessfully authenticated.')   
 
#########################################################################################
# Get Refreshables
#########################################################################################
 
base_url = 'https://api.powerbi.com/v1.0/myorg/'
header = {'Authorization': f'Bearer {access_token}'}
 
refreshables_url = "admin/capacities/refreshables?$top=200"
 
refreshables_response = requests.get(base_url + refreshables_url, headers=header)
 
# write raw data to a json file
requestDate = datetime.datetime.now()
outputFilePath = "//lakehouse/default/Files/refreshables/top_200_{year}{month}{day}.json".format(year = requestDate.strftime("%Y"), month= requestDate.strftime("%m"), day=requestDate.strftime("%d"))

if not os.path.exists(os.path.dirname(outputFilePath)):    
    print("creating folder: " + os.path.dirname(outputFilePath))    
    os.makedirs(os.path.dirname(outputFilePath), exist_ok=True)

with open(outputFilePath, 'wb') as f:
    f.write(refreshables_response.content)

refreshablesRDD = spark.sparkContext.parallelize([refreshables_response.content.decode()])
dfRefreshables = spark.read.json(refreshablesRDD)

dfRefreshables = dfRefreshables.select(explode("value")) \
    .select("col.id", 
        "col.startTime", 
        "col.endTime", 
        "col.medianDuration", 
        "col.averageDuration", 
        "col.lastRefresh.status", 
        "col.lastRefresh.refreshType")

## write the spark dataframe out to a delta table
if spark.catalog.tableExists("refreshables"):
    # get existing table
    targetTable = DeltaTable.forName(spark, "refreshables")
    # merge into table
    (targetTable.alias("t")
        .merge(dfRefreshables.alias("s"), "s.id = t.id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    # create the table since it does not exist yet
    dfRefreshables.write.mode('overwrite').format("delta").saveAsTable("refreshables")

2 Comments

  1. dexterz

    import json,requests
    import pandas as pd
    from notebookutils import mssparkutils
    from pyspark.sql.types import *
    from pyspark.sql.functions import *

    token = mssparkutils.credentials.getToken('pbi') #https://analysis.windows.net/powerbi/api
    headers = {'Content-Type':'application/json', 'Authorization':f'Bearer {token}'}

    url = f"https://api.powerbi.com/v1.0/myorg/admin/capacities/refreshables?$expand=capacity,group"

    r = requests.get(url=url,headers=headers)
    pdf = pd.json_normalize(r.json()['value'])

    pdf = pdf[['capacity.id','capacity.displayName','group.id','group.name','id','name','kind',
    'refreshCount','refreshFailures','averageDuration','medianDuration','refreshesPerDay',
    'lastRefresh.id','lastRefresh.requestId','lastRefresh.refreshType','lastRefresh.startTime','lastRefresh.endTime','lastRefresh.status','lastRefresh.serviceExceptionJson']]

    pdf.columns = ['capacity_id','capacity','workspace_id','workspace','item_id','item','kind',
    'refreshCount','refreshFailures','averageDuration','medianDuration','refreshesPerDay',
    'lastRefresh_id','requestId','refreshType','startTime','endTime','status','serviceExceptionJson']

    pdf = pdf.replace(pd.NA, '')

    df = spark.createDataFrame(pdf)
    df = df.withColumn("refreshCount",col("refreshCount").cast('int'))\
        .withColumn("refreshFailures",col("refreshFailures").cast('int'))\
        .withColumn("refreshesPerDay",col("refreshesPerDay").cast('int'))\
        .withColumn("lastRefresh_id",col("lastRefresh_id").cast('int'))\
        .withColumn("startTime",col("startTime").cast('TIMESTAMP'))\
        .withColumn("endTime",col("endTime").cast('TIMESTAMP'))

    df.write.mode("overwrite").format("delta").saveAsTable("pbi_refreshables")

  2. dexterz

    url = f"https://api.powerbi.com/v1.0/myorg/admin/capacities/refreshables?$top=60&$expand=capacity,group"
    r = requests.get(url=url,headers=headers)

    if r.status_code == 200 :
        json_content = r.content.decode('utf-8')
        #json_content = json.dumps(r.json()['value'])
        mssparkutils.fs.put("Files/PBI" + '/powerbi_refreshables.json',json_content,True)
