如何进行deltalake的curd操作
这篇文章给大家介绍如何进行delta lake 的curd操作,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。
公司主营业务:成都网站设计、做网站、移动网站开发等业务。帮助企业客户真正实现互联网宣传,提高企业的竞争能力。创新互联公司是一支青春激扬、勤奋敬业、活力青春激扬、勤奋敬业、活力澎湃、和谐高效的团队。公司秉承以“开放、自由、严谨、自律”为核心的企业文化,感谢他们对我们的高要求,感谢他们从不同领域给我们带来的挑战,让我们激情的团队有机会用头脑与智慧不断的给客户带来惊喜。创新互联公司推出日照免费做网站回馈大家。
delta lake 的表支持删除和更新数据的语法,下面主要是从sql和scala两个语法说起吧。
1. 删除delta 表数据
可以根据查询条件,从delta表中删除数据,比如删除日期在2017年之前的数据,sql和scala的表达语法如下。
sql
DELETE FROM events WHERE date < '2017-01-01'
DELETE FROM delta.`/data/events/` WHERE date < '2017-01-01'
scala
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/data/events/")
deltaTable.delete("date < '2017-01-01'") // predicate using SQL formatted string
import org.apache.spark.sql.functions._
import spark.implicits._
deltaTable.delete(col("date") < "2017-01-01") // predicate using Spark SQL functions and implicits
请注意,delete操作会将数据从delta 表的最新版本中删除,但其实只有到历史版本直接被vacuum清空的时候,才会从物理存储中删除数据。
2. 更新表
可以更新满足条件的表。比如想更新eventType的字段字符串的编写失误,可以使用下面的表达,sql和scala的表达分别如下:
sql
UPDATE events SET eventType = 'click' WHERE eventType = 'clck'UPDATE delta.`/data/events/` SET eventType = 'click' WHERE eventType = 'clck'
scala
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/data/events/")
deltaTable.updateExpr( // predicate and update expressions using SQL formatted string
"eventType = 'clck'",
Map("eventType" -> "'click'")
import org.apache.spark.sql.functions._
import spark.implicits._
deltaTable.update( // predicate using Spark SQL functions and implicits
col("eventType") === "clck",
Map("eventType" -> lit("click")));
3.merge算子实现upsert操作
使用merge操作可以将source表,view,dataframe中的数据upsert到目标的delta lake表中。该操作很像传统数据库的merge into操作,但是额外的支持删除操作,和更新,插入和删除的额外条件。
假设你计算过程中生成了一个dataframe,元素是events,包含eventId。而且该dataframe中数据部分数据的eventId已经在events表中存在了。这个时候就可以使用merge into实现,eventId存在的话就更新其对应的值,不存在就插入其对应的值。实现表达式如下:
sql
MERGE INTO eventsUSING updatesON events.eventId = updates.eventIdWHEN MATCHED THEN UPDATE SET events.data = updates.dataWHEN NOT MATCHED THEN INSERT (date, eventId, data) VALUES (date, eventId, data)
scala
import io.delta.tables._
import org.apache.spark.sql.functions._
val updatesDF = ... // define the updates DataFrame[date, eventId, data]
DeltaTable.forPath(spark, "/data/events/")
.as("events")
.merge(
updatesDF.as("updates"),
"events.eventId = updates.eventId")
.whenMatched
.updateExpr(
Map("data" -> "updates.data"))
.whenNotMatched
.insertExpr(
Map(
"date" -> "updates.date",
"eventId" -> "updates.eventId",
"data" -> "updates.data"))
.execute()
关于如何进行delta lake 的curd操作就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。
本文标题:如何进行deltalake的curd操作
本文路径:http://ybzwz.com/article/joogoc.html