With practical examples, learn how to leverage the integration between these services to process data with Apache Spark:
How to process existing data in Azure Data Explorer using Spark and Azure Synapse.
How to process streaming and batch data using Spark and write it back to Azure Data Explorer.
Notebooks are available in this GitHub repo: https://github.com/abhirockzz/synapse-azure-data-explorer-101
.alter database adxdb policy ingestionbatching @'{"MaximumBatchingTimeSpan": "00:00:30"}'
Setting this policy to a very small value increases cost and reduces performance; it is used here purely for demo purposes.
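If you prefer to inspect the policy from code rather than the query editor, here is a minimal sketch using the azure-kusto-data Python SDK; the cluster URI and the Azure CLI authentication are assumptions for illustration:

from azure.kusto.data import KustoClient, KustoConnectionStringBuilder

# cluster URI is a placeholder; an Azure CLI login is assumed for authentication
kcsb = KustoConnectionStringBuilder.with_az_cli_authentication("https://<your-cluster>.kusto.windows.net")
client = KustoClient(kcsb)

# control commands (starting with a dot) go through execute_mgmt
response = client.execute_mgmt("adxdb", ".show database adxdb policy ingestionbatching")
for row in response.primary_results[0]:
    print(row)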
Managed Identity is used as the authentication method here, as opposed to a Service Principal.
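For comparison, a Service Principal based read with the open-source azure-kusto-spark connector would look roughly like the sketch below; the cluster, app ID, secret, and tenant values are placeholders:

# Sketch: Service Principal authentication (all credential values are placeholders;
# prefer a secret store over literals in real code)
sp_df = spark.read \
    .format("com.microsoft.kusto.spark.datasource") \
    .option("kustoCluster", "https://<your-cluster>.kusto.windows.net") \
    .option("kustoDatabase", "adxdb") \
    .option("kustoQuery", "StormEvents_1 | take 10") \
    .option("kustoAadAppId", "<app-id>") \
    .option("kustoAadAppSecret", "<app-secret>") \
    .option("kustoAadAuthorityID", "<tenant-id>") \
    .load()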
.create table StormEvents_1 (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)
.ingest into table StormEvents_1 'https://kustosamplefiles.blob.core.windows.net/samplefiles/StormEvents.csv?sv=2019-12-12&ss=b&srt=o&sp=r&se=2022-09-05T02:23:52Z&st=2020-09-04T18:23:52Z&spr=https&sig=VrOfQMT1gUrHltJ8uhjYcCequEcfhjyyMX%2FSc3xsCy4%3D' with (ignoreFirstRecord=true)
If you found this technique useful, I encourage you to try out one-click ingestion as well!
.show ingestion failures
StormEvents_1 | count
StormEvents_1 | take 5
StormEvents_1 | take 5 | project StartTime, EndTime, State, EventType, DamageProperty, Source
# read from Azure Data Explorer via the Synapse linked service ("adx")
kustoDf = spark.read \
    .format("com.microsoft.kusto.spark.synapse.datasource") \
    .option("spark.synapse.linkedService", "adx") \
    .option("kustoDatabase", "adxdb") \
    .option("kustoQuery", "StormEvents_1 | take 10") \
    .load()

display(kustoDf)
# push the filter and projection down to Azure Data Explorer via the Kusto query
filtered_df = spark.read \
    .format("com.microsoft.kusto.spark.synapse.datasource") \
    .option("spark.synapse.linkedService", "adx") \
    .option("kustoDatabase", "adxdb") \
    .option("kustoQuery", "StormEvents_1 | where DamageProperty > 0 and DeathsDirect > 0 | project EventId, State, StartTime, EndTime, EventType, DamageProperty, DeathsDirect, Source") \
    .load()

# register a temporary view so the data can be queried with Spark SQL
filtered_df.createOrReplaceTempView("storm_dataset")
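With the temporary view registered, the same data can also be queried from Python with spark.sql (equivalent to the %%sql cells used below):

# query the temp view from Python; same engine as the %%sql cells
spark.sql("""
    SELECT State, COUNT(*) AS events
    FROM storm_dataset
    GROUP BY State
    ORDER BY events DESC
""").show(10)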
import seaborn as sns
import matplotlib.pyplot as plt

# convert the (already filtered) Spark DataFrame to pandas for local plotting
pdf = filtered_df.toPandas()

ax = sns.barplot(x="DeathsDirect", y="EventType", data=pdf)
ax.set_title('Deaths per event type')
ax.set_xlabel('Deaths')
ax.set_ylabel('Event Type')
plt.show()
%%sql
SELECT EventType, AVG(DamageProperty) AS avg_property_damage
FROM storm_dataset
GROUP BY EventType
ORDER BY avg_property_damage DESC
%%sql
SELECT
  State,
  MAX(DeathsDirect) AS deaths
FROM storm_dataset
GROUP BY State
ORDER BY deaths DESC
.create table StormEvents_2 (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)
curl -o StormEvents.csv "https://kustosamplefiles.blob.core.windows.net/samplefiles/StormEvents.csv?sv=2019-12-12&ss=b&srt=o&sp=r&se=2022-09-05T02:23:52Z&st=2020-09-04T18:23:52Z&spr=https&sig=VrOfQMT1gUrHltJ8uhjYcCequEcfhjyyMX%2FSc3xsCy4%3D"
For the subsequent steps, you can either paste the code directly into a Synapse Studio notebook in Azure Synapse Analytics or import this notebook into the workspace. Make sure the downloaded StormEvents.csv is uploaded to the workspace's primary ADLS Gen2 account, so that the Spark pool can resolve it at /StormEvents.csv.
# read the raw CSV into a Spark DataFrame
events = (spark.read
    .csv("/StormEvents.csv", header=True, inferSchema=True)
)

# drop rows with missing values, remove unused location/summary columns,
# and keep only events that caused property damage
events_filtered = events.dropna() \
    .drop('StormSummary', 'EndLat', 'EndLon', 'BeginLat', 'BeginLon') \
    .filter(events.DamageProperty > 0)

print(events_filtered.count())
display(events_filtered.take(10))
# write the processed DataFrame back to Azure Data Explorer
events_filtered.write \
    .format("com.microsoft.kusto.spark.synapse.datasource") \
    .option("spark.synapse.linkedService", "adx") \
    .option("kustoDatabase", "adxdb") \
    .option("kustoTable", "StormEvents_2") \
    .option("tableCreateOptions", "FailIfNotExist") \
    .mode("Append") \
    .save()
Notice that we've used FailIfNotExist, which means the operation will fail if the table is not found in the requested cluster and database.
The other option is CreateIfNotExist: if the table is not found in the requested cluster and database, it will be created with a schema matching the DataFrame that is being written (see the sketch below).
For the full list of supported options, refer to https://github.com/Azure/azure-kusto-spark/blob/master/docs/KustoSink.md#supported-options
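A CreateIfNotExist write is almost identical; only the tableCreateOptions value changes. A minimal sketch, where StormEvents_3 is a hypothetical table name used for illustration:

# Sketch: let the connector create the target table (StormEvents_3 is hypothetical)
events_filtered.write \
    .format("com.microsoft.kusto.spark.synapse.datasource") \
    .option("spark.synapse.linkedService", "adx") \
    .option("kustoDatabase", "adxdb") \
    .option("kustoTable", "StormEvents_3") \
    .option("tableCreateOptions", "CreateIfNotExist") \
    .mode("Append") \
    .save()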
.show ingestion failures
StormEvents_2 | take 10
StormEvents_2
| summarize event_count=count() by bin(StartTime, 1d)
| render timechart
How to set up and configure Azure Synapse and Azure Data Explorer (including secure access).
How to make the most of existing data in Azure Data Explorer and process it using Apache Spark pools in Azure Synapse.
How to process data from external sources and write the results back to Azure Data Explorer for further analysis.