-- Databricks notebook source
-- DBTITLE 1,DDL statements defining the tables
-- Reference DDL for the two Delta tables this notebook writes to. It is kept
-- commented out; the COMMENT literals are left in their original language so
-- the statements can be re-run unchanged. Column meanings, per those literals:
--   dwd.dwd_ext_email_warning
--     data_source   : data source name
--     max_data      : latest date available in the data source, YYYYMM format
--     record_date   : date on which the record was taken
--     etl_timestamp : ETL run time
--   dws.dws_ext_email_warning
--     if_update     : 0 = not updated, 1 = the data source was updated
--
-- CREATE OR REPLACE TABLE dwd.dwd_ext_email_warning (
--   data_source STRING COMMENT '数据源名称',
--   max_data STRING COMMENT '数据源最新的日期,YYYYMM格式',
--   record_date STRING COMMENT '记录数据的日期',
--   etl_timestamp TIMESTAMP COMMENT 'ETL运行时间')
-- USING delta
-- LOCATION 'abfss://master@azcdatalakeprd.dfs.core.chinacloudapi.cn/DWD/dwd_ext_email_warning'
-- ;
-- CREATE OR REPLACE TABLE dws.dws_ext_email_warning (
--   data_source STRING COMMENT '数据源名称',
--   max_data STRING COMMENT '数据源最新的日期,YYYYMM格式',
--   if_update INT COMMENT '0代表没有更新,1代表更新过数据源',
--   etl_timestamp TIMESTAMP)
-- USING delta
-- LOCATION 'abfss://master@azcdatalakeprd.dfs.core.chinacloudapi.cn/DWS/dws_ext_email_warning'

-- COMMAND ----------

-- MAGIC %md
-- MAGIC ### dwd layer

-- COMMAND ----------

-- DBTITLE 1,Delete today's data and insert the latest data
-- Remove any rows already recorded for today (date taken in the 'UTC+8' zone)
-- so that re-running the notebook on the same day does not duplicate them.
DELETE FROM dwd.dwd_ext_email_warning
WHERE record_date = date_format(from_utc_timestamp(current_timestamp, 'UTC+8'), 'yyyy-MM-dd');

-- Record, per data source, the largest yyyymm currently present in the sales
-- table, stamped with today's date and the current time.
-- Columns are matched to the target table by position, not by name.
INSERT INTO dwd.dwd_ext_email_warning
SELECT
    data_source,
    max(yyyymm) AS max_data,  -- latest period available for the data source
    date_format(from_utc_timestamp(current_timestamp, 'UTC+8'), 'yyyy-MM-dd') AS record_date,
    from_utc_timestamp(current_timestamp, 'UTC+8') AS etl_timestamp
FROM dm.dm_tf_ext_unionall_sales
GROUP BY data_source

-- COMMAND ----------

-- MAGIC %md
-- MAGIC ### dws layer

-- COMMAND ----------

-- DBTITLE 1,Compute whether each data source has been updated
-- Steps:
--   1. Take today's record for each data source.
--   2. Take, for each data source, the most recent record whose record_date
--      is not today's.
--   3. Compare the two max_data values: equal -> if_update = 0, otherwise 1.
-- Because the two sets are inner-joined, a data source that has no record on
-- any other date produces no row in this view.
CREATE OR REPLACE TEMPORARY VIEW temp_datasource_update_info AS
WITH todays_snapshot AS (
    -- Step 1: rows recorded today.
    SELECT
        data_source,
        max_data,
        record_date
    FROM dwd.dwd_ext_email_warning
    WHERE record_date = date_format(from_utc_timestamp(current_timestamp, 'UTC+8'), 'yyyy-MM-dd')
),
ranked_history AS (
    -- Step 2a: every row not recorded today, ranked newest-first per data source.
    SELECT
        data_source,
        max_data,
        record_date,
        row_number() OVER (
            PARTITION BY data_source
            ORDER BY record_date DESC
        ) AS recency_rank
    FROM dwd.dwd_ext_email_warning
    WHERE record_date <> date_format(from_utc_timestamp(current_timestamp, 'UTC+8'), 'yyyy-MM-dd')
),
previous_snapshot AS (
    -- Step 2b: keep only the newest of those rows for each data source.
    SELECT
        data_source,
        max_data,
        record_date
    FROM ranked_history
    WHERE recency_rank = 1
)
SELECT
    cur.data_source,
    cur.max_data,
    -- Step 3: 0 when the latest period is unchanged, 1 in every other case.
    CASE
        WHEN cur.max_data = prev.max_data THEN 0
        ELSE 1
    END AS if_update,
    from_utc_timestamp(current_timestamp, 'UTC+8') AS etl_timestamp
FROM todays_snapshot AS cur
INNER JOIN previous_snapshot AS prev
    ON cur.data_source = prev.data_source

-- COMMAND ----------

-- DBTITLE 1,Write to the dws layer
-- Replace the full contents of the dws table with the freshly computed flags.
INSERT OVERWRITE dws.dws_ext_email_warning
SELECT
    data_source,
    max_data,
    if_update,
    etl_timestamp
FROM temp_datasource_update_info

-- COMMAND ----------

-- MAGIC %md
-- MAGIC ### ads layer

-- COMMAND ----------

-- select
--   data_source
--   ,max_data
--   ,if_update
-- from dws.dws_ext_email_warning