A data lake is a centralized repository that allows you to store all your structured and unstructured data at any scale. You can store your data as-is, without having to first structure the data, and run different types of analytics—from dashboards and visualizations to big data processing, real-time analytics, and machine learning to guide better decisions.
Delta Lake is an open-source storage layer that brings scalable, ACID transactions to Apache Spark™ and big data workloads. It provides serializability, the strongest level of isolation, along with scalable metadata handling and time travel, and it is 100% compatible with the Apache Spark APIs.
Atomicity guarantees that when using the Spark DataFrame writer, either the full data is written to the data source or nothing is. Consistency, on the other hand, ensures that the data is always in a valid state.
When a transaction is in progress and not yet committed, it must remain isolated from any other transaction; this is the Isolation property. It means that writing to a dataset shouldn't impact another concurrent read or write on the same dataset.
Finally, Durability is the ACID property that guarantees committed transactions survive permanently. However, if Spark doesn't implement the commit correctly, all the durability guarantees offered by the storage layer are lost.
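Because every commit to a Delta table is versioned, you can also query an earlier snapshot of the data. Below is a minimal sketch of Delta Lake's time travel and history APIs, assuming a Delta table already exists at the s3a://delta-lake-ia-test/current/ path used later in this walkthrough.
from delta import *
from pyspark.sql.session import SparkSession

spark = SparkSession \
    .builder \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Inspect the commit history of the table (version, timestamp, operation, ...)
deltaTable = DeltaTable.forPath(spark, "s3a://delta-lake-ia-test/current/")
deltaTable.history().select("version", "timestamp", "operation").show()

# Time travel: read the table as it looked at version 0
v0 = spark.read.format("delta").option("versionAsOf", 0).load("s3a://delta-lake-ia-test/current/")
v0.show(5)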
AWS Glue is a serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, machine learning, and application development.
Amazon Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run.
Amazon S3 is an object storage service that offers industry-leading scalability, data availability, security, and performance.
Here is the initial data. I then want to apply changes to the Sales and Profit columns, and the table in the AWS Glue Data Catalog should be able to capture those changes. Just a basic update to the data.

The Glue job needs the Delta Lake .jar file to access its libraries. You can download it here. Upload it to your S3 bucket and take note of the S3 path; we'll use this as a reference later.

Set "This job runs" to "A new script to be authored by you". This will allow you to write your own custom Spark code.
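As a side note, the same job definition could also be created programmatically with boto3 rather than through the console. This is only a rough sketch: the job name, IAM role, Glue version, script location, and jar path are placeholders you would replace with your own values.
import boto3

glue = boto3.client("glue")

# Sketch of a Glue job definition; all names and S3 paths are placeholders
glue.create_job(
    Name="delta-lake-initial-load",
    Role="MyGlueServiceRole",
    GlueVersion="2.0",
    Command={
        "Name": "glueetl",
        "ScriptLocation": "s3://delta-lake-ia-test/scripts/initial_load.py",
        "PythonVersion": "3",
    },
    DefaultArguments={
        # Point the job at the Delta Lake jar uploaded earlier
        "--extra-jars": "s3://delta-lake-ia-test/jars/delta-core.jar",
    },
)
The Glue script itself then begins as follows.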
from delta import *
from pyspark.sql.session import SparkSession
# Initialize Spark Session with Delta Lake
spark = SparkSession \
    .builder \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()
This initializes the SparkSession along with the Delta Lake configurations.
# Read Source
inputDF = spark.read.format("csv") \
    .option("header", "true") \
    .load('s3://delta-lake-ia-test/raw/')

# Write data as DELTA TABLE
inputDF.write.format("delta") \
    .mode("overwrite") \
    .save("s3a://delta-lake-ia-test/current/")
This writes the data in the Delta format. Notice the s3a prefix in the save path: it is essential to use the s3a prefix instead of the standard s3 prefix as the path, since using the s3 prefix will throw an UnsupportedFileSystemException error, followed by a fs.AbstractFileSystem.s3.impl=null: No AbstractFileSystem configured for scheme: s3 message.
# Generate MANIFEST file for Athena/Catalog
deltaTable = DeltaTable.forPath(spark, "s3a://delta-lake-ia-test/current/")
deltaTable.generate("symlink_format_manifest")
The generate("symlink_format_manifest") call creates a manifest file, which is a text file containing the list of data files to read for querying a table. Running the above code will generate that manifest file.

Putting it all together, here is the full script:

from delta import *
from pyspark.sql.session import SparkSession
spark = SparkSession \
    .builder \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()
# Read Source
inputDF = spark.read.format("csv").option("header", "true").load('s3://delta-lake-ia-test/raw/')
# Write data as DELTA TABLE
inputDF.write.format("delta").mode("overwrite").save("s3a://delta-lake-ia-test/current/")
# Generate MANIFEST file for Athena/Catalog
deltaTable = DeltaTable.forPath(spark, "s3a://delta-lake-ia-test/current/")
deltaTable.generate("symlink_format_manifest")
The manifest is generated under the _symlink_format_manifest prefix/folder. This will be used by Amazon Athena for mapping out the Parquet files.

Next, create a table for it in Athena:

CREATE EXTERNAL TABLE IF NOT EXISTS "default"."superstore" (
row_id STRING,
order_id STRING,
order_date STRING,
ship_date STRING,
ship_mode STRING,
customer_id STRING,
customer_name STRING,
segment STRING,
country STRING,
city STRING,
state STRING,
postal_code STRING,
region STRING,
product_id STRING,
category STRING,
sub_category STRING,
product_name STRING,
sales STRING,
quantity STRING,
discount STRING,
profit STRING
) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://delta-lake-ia-test/current/_symlink_format_manifest/'
Note that I used STRING for all columns; numeric fields can be cast at query time, as sketched after the query below.

-- Run a simple select
SELECT *
FROM "default"."superstore"
LIMIT 10
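Since every column in the DDL is a STRING, numeric fields such as sales and profit need to be cast before doing arithmetic on them. Here is a small sketch of running such a query against Athena with boto3; the output location for query results is an assumption you would replace with your own bucket.
import boto3

athena = boto3.client("athena")

# Cast the STRING columns to numbers before aggregating
query = """
SELECT region, SUM(CAST(sales AS DOUBLE)) AS total_sales
FROM "default"."superstore"
GROUP BY region
"""

# Start the query; Athena writes the results to the (assumed) output location
response = athena.start_query_execution(
    QueryString=query,
    QueryExecutionContext={"Database": "default"},
    ResultConfiguration={"OutputLocation": "s3://delta-lake-ia-test/athena-results/"},
)
print(response["QueryExecutionId"])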
For the updates, I placed 99999 values in the sales and profit columns for the first 15 rows. Feel free to make your own modifications.

from delta import *
from pyspark.sql.functions import expr
from pyspark.sql.session import SparkSession
spark = SparkSession \
    .builder \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()
# Read updates
df_updates = spark.read.format("csv").option("header", "true").load('s3://delta-lake-ia-test/updates/')
# Read current as DELTA TABLE
df = DeltaTable.forPath(spark, "s3a://delta-lake-ia-test/current/")
This reads the current data as a DeltaTable object, which allows us to call functions from the delta package.
# UPSERT process
final_df = df.alias("full_df").merge(
    source = df_updates.alias("append_df"),
    condition = expr("append_df.row_id = full_df.row_id")) \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()
We use the merge(source, condition) function, which merges the data from the source DataFrame based on the given merge condition.

We take the DeltaTable object and give it an alias, then call merge(), supplying its parameters with our arguments: in this case, the updates DataFrame and the merge condition.

whenMatchedUpdateAll(condition=None) updates all the columns of the matched table row with the values of the corresponding columns in the source row. If a condition is specified, then it must be true for the row to be updated.
If the merge() condition doesn't match, we call whenNotMatchedInsertAll(condition=None), which inserts a new target Delta table row by assigning the target columns to the values of the corresponding columns in the source row. If a condition is specified, then it must evaluate to true for the new row to be inserted.
Finally, the execute() function sums it all up: it executes the merge operation based on the built matched and not matched actions.
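To illustrate the optional condition parameter, here is a hypothetical variant of the same upsert that only overwrites matched rows whose profit value actually changed; the aliases and column names follow the ones used above.
# Hypothetical variant: only update matched rows when the profit value differs
df.alias("full_df").merge(
    source = df_updates.alias("append_df"),
    condition = expr("append_df.row_id = full_df.row_id")) \
    .whenMatchedUpdateAll(condition = "append_df.profit <> full_df.profit") \
    .whenNotMatchedInsertAll() \
    .execute()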
# Generate new MANIFEST file
final_df = DeltaTable.forPath(spark, "s3a://delta-lake-ia-test/current/")
final_df.generate("symlink_format_manifest")
This generates a new manifest file reflecting the merged data.

Putting it all together, here is the full update script:

from delta import *
from pyspark.sql.functions import expr
from pyspark.sql.session import SparkSession
spark = SparkSession \
    .builder \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()
# Read updates
df_updates = spark.read.format("csv").option("header", "true").load('s3://delta-lake-ia-test/updates/')
# Read current as DELTA TABLE
df = DeltaTable.forPath(spark, "s3a://delta-lake-ia-test/current/")
# UPSERT process
final_df = df.alias("full_df").merge(
    source = df_updates.alias("append_df"),
    condition = expr("append_df.row_id = full_df.row_id")) \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()
# Generate new MANIFEST file
final_df = DeltaTable.forPath(spark, "s3a://delta-lake-ia-test/current/")
final_df.generate("symlink_format_manifest")
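After the job finishes, one quick way to confirm the upsert is to read the Delta table back and look for rows carrying the 99999 values. A small sketch, run in the same Spark session and assuming the column names match those in the Athena DDL:
# Read the merged Delta table and inspect a few of the updated rows
updated = spark.read.format("delta").load("s3a://delta-lake-ia-test/current/")
updated.filter("sales = '99999'").select("row_id", "sales", "profit").show(15)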
Finally, it is worth asking whether a data lake should be immutable or not. The data should be stored in a raw format from the different source systems. More importantly, the stored data should be immutable in nature.
Immutability inherently provides a degree of human fault tolerance and removes errors related to data loss and corruption. It allows data to be selected and inserted, but not updated or deleted.
To support fast processing and performance, the data is usually stored in a denormalized fashion. Immutable data also makes the system simpler and more manageable overall.
Imagine I had a large set of data, say gigabytes in a single file. Would I want to download it, change a few values, and upload it again? Do we want a large ETL load process to repeat?
Could we repeat the process and reload a file again? I don't think so, but it's hard to decide. After all, the lake isn't the source of data; that is some other system.
Maybe that's the simplest solution, and one that reduces complexity, downtime, or anything else that might be involved with locking and changing a file.
I suspect the question of immutability should come after asking whether you should even have the data lake or warehouse in the first place.