new file: 01 dm_tf_external_sales.sql
new file: 02 dm_td_external_keycompatitor.sql new file: 03 dm_td_external_brand_market.sql new file: 04 dm_td_external_calendar.sql new file: 05 dm_td_external_exchangerate.sql new file: 07 dm_td_external_packinfo.sql new file: 08 dm_td_external_corp.sql new file: 09 dm_td_external_geo_type.sql new file: 11 DM_TD_EXTERNAL_MARKET_NEW.sql new file: 12 dm_td_external_org.sql new file: 13 external auth.sql new file: 14 dm_tf_external_retail_special_bkp.sql new file: AIA/01 dm_aia_pack_property.sql new file: AIA/02 dm_ext_aia_sales.sql new file: AIA/02 dm_ext_aia_sales_bakup_20230327.sql new file: AIA/03 dm_aia_flag.sql new file: AIA/04 dm_aia_provided_flag.sql new file: AIA/06 DM_TD_EXT_AIA_PACK_PROPERTY.sql new file: AIA/07 DM_TD_EXT_AIA_PACK2MARKET.sql new file: AIA/08 DM_TD_EXT_AIA_MARKET_RATIO.sql new file: AIA/09 DM_TD_EXT_AIA_MARKET_BRAND_RATIO.sql new file: AIA/10 DM_TD_EXT_AIA_MARKET_PACK_MAPPING.sql new file: AIA/11 DM_TD_EXT_AIA_TARGET_INST.sql new file: AIA/11 dm_aia_targethp_flag.sql new file: "AIA/z_03 dm_aia_flag_\345\244\207\344\273\275\347\224\250.sql" new file: CHC/01 dm_chc_pack_property.sql new file: CHC/02 DM_TF_EXT_CHC_SALES.sql new file: CHC/03 DM_TD_EXT_CHC_PACK_PROPERTY.sql new file: CHC/04 DM_TD_EXT_CHC_PACK2MARKET.sql new file: CHC/05 DM_TD_EXT_CHC_MARKET_RATIO.sql new file: CHC/06 DM_TD_EXT_CHC_MARKET_BRAND_RATIO.sql new file: CHC/07 DM_TD_EXT_CHC_MARKET_PACK_MAPPING.sql new file: CHPA/01 dwd_ims_atc_hierarchy.sql new file: CHPA/01 dwd_ims_nfc_hierarchy.sql new file: CHPA/01 dwd_ims_td_manufacturer_corp.sql new file: CHPA/01 dwd_ims_td_pack_property.sql new file: CHPA/01 dwd_update.sql new file: CHPA/01_FB_BLOB_TO_DWD.sql new file: CHPA/02 DWS_IMS_TD_GEO.sql new file: CHPA/02 dws_ims_td_atc_cn.sql new file: CHPA/02 dws_ims_td_corp_cn.sql new file: CHPA/02 dws_ims_td_date.sql new file: CHPA/02 dws_ims_td_manu_cn.sql new file: CHPA/02 dws_ims_td_market.sql new file: CHPA/02 dws_ims_td_market_ta.sql new file: CHPA/02 dws_ims_td_nfc_cn.sql new file: CHPA/02 dws_ims_td_prod_cn.sql new file: CHPA/02 tmp_ims_td_prod_tmp.sql new file: CHPA/02 tmp_ims_tf_fact_sales.sql new file: CHPA/03 DM_TD_EXT_CHPA_MARKET_BRAND_RATIO.sql new file: CHPA/03 DM_TD_EXT_CHPA_MARKET_PACK_MAPPING.sql new file: CHPA/03 DM_TD_EXT_CHPA_MARKET_RATIO.sql new file: CHPA/03 DM_TD_EXT_CHPA_PACK2MARKET.sql new file: CHPA/03 DM_TD_EXT_CHPA_PACK_PROPERTY.sql new file: CHPA/03 DM_TF_EXT_CHPA_SALES.sql new file: CHPA/03 dm_ims_td_calendar.sql new file: CHPA/03 dm_ims_td_geo.sql new file: CHPA/03 dm_ims_td_market_property.sql new file: CHPA/03 dm_ims_td_org.sql new file: CHPA/03 dm_ims_td_org_hvh.sql new file: CHPA/03 dm_ims_td_pack_property.sql new file: CHPA/03 dm_ims_tf_sales.sql new file: CHPA/03 dm_td_chpa_market_definition.sql new file: CHPA/03 dm_td_ims_city_mapping.sql new file: EC/03 ec_load_data.sql new file: EC/04 DM_TD_EXT_EC_PACK_PROPERTY.sql new file: EC/05 DM_TF_EXT_EC_SALES.sql new file: EC/06 DM_TD_EXT_EC_PACK2MARKET.sql new file: EC/07 DM_TD_EXT_EC_MARKET_RATIO.sql new file: EC/08 DM_TD_EXT_EC_MARKET_BRAND_RATIO.sql new file: EC/09 DM_TD_EXT_EC_MARKET_PACK_MAPPING.sql new file: EC/1 (ec)blob_to_dwd.sql new file: EC/2 dwd_inc_gnd_ext_ec_nationnal_pack_union_all.py new file: Merged_Data/Merged_Data_Config_table_bkp.sql new file: Merged_Data/Merged_Data_Config_table_bymonth.sql new file: Merged_Data/dm_tf_exteranl_sales_merged_data_dtp_others_bkp.sql new file: Merged_Data/dm_tf_exteranl_sales_merged_data_dtp_others_bymonth_bkp.sql new file: ORG/DM_TD_EXT_AIA_ORG.sql new file: ORG/DM_TD_EXT_CHC_ORG.sql new file: ORG/DM_TD_EXT_CHPA_ORG.sql new file: ORG/DM_TD_EXT_COUNTY_ORG.sql new file: ORG/DM_TD_EXT_EC_ORG.sql new file: ORG/DM_TD_EXT_RETAIL_ORG.sql new file: ORG/DM_TD_EXT_THC_ORG.sql new file: ORG/DM_TD_EXT_XIEHE_ORG.sql new file: OTHERS/01 dm_td_report_url.sql new file: OTHERS/02 dws_ext_email_warning.sql new file: OTHERS/external_triggered_email.py new file: Retail/01 load_tmp_data.py new file: Retail/02 split_brand_data.py new file: Retail/03 split_pack_data.py new file: Retail/04 map_to_dws_table.py new file: Retail/05 load_dtp_temp_data.py new file: Retail/06 split_dtp_brand_data.py new file: Retail/07 split_dtp_pack_data.py new file: Retail/08 map_to_dtp_dws_table.py new file: Retail/09 dwd_inc_gnd_ext_retail_nataional.py new file: Retail/10 map_to_retail_dm_table.py new file: Retail/11 map_to_overview_dm_table.py new file: Retail/12 dws_tf_external_retail_dtp_special.sql new file: Retail/13 DM_TF_EXT_RETAIL_SALES.sql new file: Retail/14 DM_TF_EXT_RETAIL_DTP_SALES.sql new file: Retail/15 DM_TD_EXT_RETAIL_PACK_PROPERTY.sql new file: Retail/16 DM_TD_EXT_RETAIL_DTP_PACK_PROPERTY.sql new file: Retail/17 DM_TD_EXT_DTP_PACK2MARKET.sql new file: Retail/17 DM_TD_EXT_RETAIL_PACK2MARKET.sql new file: Retail/18 DM_TD_EXT_DTP_MARKET_RATIO.sql new file: Retail/18 DM_TD_EXT_RETAIL_MARKET_RATIO.sql new file: Retail/19 DM_TD_EXT_DTP_MARKET_BRAND_RATIO.sql new file: Retail/19 DM_TD_EXT_RETAIL_MARKET_BRAND_RATIO.sql new file: Retail/20 DM_TD_EXT_DTP_MARKET_PACK_MAPPING.sql new file: Retail/20 DM_TD_EXT_RETAIL_MARKET_PACK_MAPPING.sql new file: "Retail/z1 dwd_inc_gnd_ext_retail_nataional_\344\275\234\345\272\237.py" new file: "Retail/z2 retail_load_data_\344\275\234\345\272\237.sql" new file: "Retail/z3 retail_overview_data_\344\275\234\345\272\237.sql" new file: THC/01 dm_thc_pack_property.sql new file: THC/02 dm_ext_thc_sales.sql new file: THC/02 dm_ext_thc_sales_bakup_20260327.sql new file: THC/03 DM_TF_EXT_THC_MARKET_SALES_CHT.sql new file: THC/04 dm_tf_external_sales_thc.sql new file: THC/05 DM_TD_EXT_THC_PACK_PROPERTY.sql new file: THC/06 DM_TD_EXT_THC_PACK2MARKET.sql new file: THC/07 DM_TD_EXT_THC_MARKET_RATIO.sql new file: THC/08 DM_TD_EXT_THC_MARKET_BRAND_RATIO.sql new file: THC/09 DM_TD_EXT_THC_MARKET_PACK_MAPPING.sql new file: UNIONALL/DM_TD_EXT_UNIONALL_MARKET_PACK_MAPPING.sql new file: UNIONALL/DM_TD_EXT_UNIONALL_PACKINFO.sql new file: UNIONALL/DM_TF_EXT_UNIONALL_MARKET_SALES.sql new file: UNIONALL/DM_TF_EXT_UNIONALL_MARKET_SALES_v1.0.sql new file: UNIONALL/DM_TF_EXT_UNIONALL_SALES.sql new file: UNIONALL/DM_TF_EXT_UNIONALL_SALES_MAPPING.sql new file: UNIONALL/DM_TF_EXT_UNIONALL_SALES_MAPPING_NIAD.sql new file: XIEHE/01 xiehe_blob_to_dwd.py new file: XIEHE/02 dm_xiehe_pack_property.sql new file: XIEHE/03 dm_ext_xiehe_geo.sql new file: XIEHE/04 dm_ext_xiehe_sales.sql new file: XIEHE/05 dm_td_xiehe_core_dept.sql new file: XIEHE/06 DM_TF_EXT_XIEHE_SALES.sql new file: XIEHE/07 DM_TD_EXT_XIEHE_PACK_PROPERTY.sql new file: XIEHE/08 DM_TD_EXT_XIEHE_PACK2MARKET.sql new file: XIEHE/09 DM_TD_EXT_XIEHE_MARKET_RATIO.sql new file: XIEHE/10 DM_TD_EXT_XIEHE_MARKET_BRAND_RATIO.sql new file: XIEHE/11 DM_TD_EXT_XIEHE_MARKET_PACK_MAPPING.sql new file: XIEHE/bkp_01 xiehe_blob2dwd.py new file: XIEHE/bkp_02 dm_ext_xiehe_sales.sql new file: XIEHE/bkp_03 dm_ext_xiehe_pack_property.sql new file: county/01 tmp_ims_county_fact_sales_sum.sql new file: county/02 tmp_imscounty_Result.sql new file: county/03 dm_ims_td_county_geo.sql new file: county/04 dws_ext_county_tf_sales.sql new file: county/05 dm_ext_county_td_pack_property.sql new file: county/06 dm_td_county_pack_region.sql new file: county/07 dm_ext_county_tf_sales_region.sql new file: county/08 DM_TD_EXT_COUNTY_PACK_PROPERTY.sql new file: county/09 DM_TF_EXT_COUNTY_SALES.sql new file: county/10 DM_TD_EXT_COUNTY_PACK2MARKET.sql new file: county/11 DM_TD_EXT_COUNTY_MARKET_RATIO.sql new file: county/12 DM_TD_EXT_COUNTY_MARKET_BRAND_RATIO.sql new file: county/13 DM_TD_EXT_COUNTY_MARKET_PACK_MAPPING.sql new file: for_AIA_Dashboard/01 dm_td_aia_inst_mkt.sql new file: for_AIA_Dashboard/02 dm_td_aia_auth_sales.sql new file: for_AIA_Dashboard/03 dm_td_aia_original_col.sql new file: for_AIA_Dashboard/04 dm_td_aia_nosales_inst.sql new file: for_AIA_Dashboard/05 dm_td_aia_is_eagle_flag.sql new file: for_AIA_Dashboard/06 dm_td_aia_rank.sql new file: for_AIA_Dashboard/07 dm_ext_aia_data_remove_flag.sql new file: for_AIA_Dashboard/07 dm_td_aia_remove_special_ins_bkp.py new file: for_AIA_Dashboard/08 dm_ext_aia_data_quality_flag.sql new file: z 01 dm_tf_external_sales.sql new file: "z 10 dm_td_external_market_pack_mapping_\344\275\234\345\272\237.sql" new file: "z 11 dm_td_external_market_\344\275\234\345\272\237.sql" new file: "\344\270\212\347\272\277\350\204\232\346\234\254.sql" new file: "\346\225\260\346\215\256\351\252\214\350\257\201.sql"
This commit is contained in:
103
EC/1 (ec)blob_to_dwd.sql
Normal file
103
EC/1 (ec)blob_to_dwd.sql
Normal file
@@ -0,0 +1,103 @@
|
||||
-- Databricks notebook source
|
||||
-- MAGIC %python
|
||||
-- MAGIC import datetime
|
||||
-- MAGIC from pyspark.sql.functions import current_timestamp, expr, date_format
|
||||
-- MAGIC
|
||||
-- MAGIC # 计算当前日期
|
||||
-- MAGIC current_date_utc = datetime.datetime.utcnow()
|
||||
-- MAGIC offset = datetime.timedelta(hours=8)
|
||||
-- MAGIC current_date = current_date_utc + offset
|
||||
-- MAGIC
|
||||
-- MAGIC today_path = "{:04d}/{:02d}/{:02d}/".format(
|
||||
-- MAGIC current_date.year,
|
||||
-- MAGIC current_date.month,
|
||||
-- MAGIC current_date.day,
|
||||
-- MAGIC )
|
||||
-- MAGIC # 基础路径
|
||||
-- MAGIC # 测试环境
|
||||
-- MAGIC base_path_0 = f"abfss://master@retaildlstoragetest.dfs.core.chinacloudapi.cn/ODS/GND/UserUpload/"
|
||||
-- MAGIC # 生产环境
|
||||
-- MAGIC # base_path_0 = f"abfss://master@azcdatalakeprd.dfs.core.chinacloudapi.cn/ODS/GND/UserUpload/"
|
||||
-- MAGIC base_path = base_path_0 + today_path
|
||||
-- MAGIC print(base_path)
|
||||
-- MAGIC
|
||||
-- MAGIC # 检查基础路径是否存在
|
||||
-- MAGIC def check_path_exists(path):
|
||||
-- MAGIC try:
|
||||
-- MAGIC dbutils.fs.ls(path)
|
||||
-- MAGIC return True
|
||||
-- MAGIC except Exception as e:
|
||||
-- MAGIC return False
|
||||
-- MAGIC
|
||||
-- MAGIC if check_path_exists(base_path):
|
||||
-- MAGIC # 列出所有批次路径
|
||||
-- MAGIC batch_paths = dbutils.fs.ls(base_path)
|
||||
-- MAGIC
|
||||
-- MAGIC # 从已存在的配置表中读取数据
|
||||
-- MAGIC config_df = spark.table("dwd.dwd_gnd_ec_config_table")
|
||||
-- MAGIC
|
||||
-- MAGIC # 逐批处理
|
||||
-- MAGIC for batch in batch_paths:
|
||||
-- MAGIC current_batch_number = int(batch.name.strip('/'))
|
||||
-- MAGIC # print(f"Checking batch {current_batch_number} at {batch.path}")
|
||||
-- MAGIC
|
||||
-- MAGIC files_in_batch = dbutils.fs.ls(batch.path)
|
||||
-- MAGIC
|
||||
-- MAGIC print("该批次中的文件:")
|
||||
-- MAGIC for file in files_in_batch:
|
||||
-- MAGIC print(file.name)
|
||||
-- MAGIC
|
||||
-- MAGIC for row in config_df.collect():
|
||||
-- MAGIC file_name = row['file_name'].strip().lower()
|
||||
-- MAGIC table_name = row['table_name']
|
||||
-- MAGIC
|
||||
-- MAGIC # 检查文件是否匹配
|
||||
-- MAGIC matching_files = [f for f in files_in_batch if f.name.strip().lower() == file_name]
|
||||
-- MAGIC
|
||||
-- MAGIC for match in matching_files:
|
||||
-- MAGIC csv_file_path = batch.path + match.name
|
||||
-- MAGIC print(f"找到匹配的文件: {csv_file_path}")
|
||||
-- MAGIC
|
||||
-- MAGIC # 读取 CSV 文件
|
||||
-- MAGIC df = spark.read.format("csv").option("header", "true").option("charset", "GBK").load(csv_file_path)
|
||||
-- MAGIC
|
||||
-- MAGIC # 对列进行重命名(假设所有文件的列名相同)
|
||||
-- MAGIC df = df.withColumnRenamed('时间(月度)', 'time')
|
||||
-- MAGIC df = df.withColumnRenamed('平台', 'platform')
|
||||
-- MAGIC df = df.withColumnRenamed('店铺名称', 'store_name')
|
||||
-- MAGIC df = df.withColumnRenamed('店铺类型', 'store_type')
|
||||
-- MAGIC df = df.withColumnRenamed('产品ID', 'product_id')
|
||||
-- MAGIC df = df.withColumnRenamed('品牌', 'brand_name')
|
||||
-- MAGIC df = df.withColumnRenamed('品名', 'category_name')
|
||||
-- MAGIC df = df.withColumnRenamed('商品名', 'product_name')
|
||||
-- MAGIC df = df.withColumnRenamed('通用名', 'common_name')
|
||||
-- MAGIC df = df.withColumnRenamed('厂家', 'factory')
|
||||
-- MAGIC df = df.withColumnRenamed('集团权益', 'group_interest')
|
||||
-- MAGIC df = df.withColumnRenamed('规格', 'specification')
|
||||
-- MAGIC df = df.withColumnRenamed('单件包装盒数', 'pcs_per_box')
|
||||
-- MAGIC df = df.withColumnRenamed('剂型', 'dosage')
|
||||
-- MAGIC df = df.withColumnRenamed('细分一', 'des1')
|
||||
-- MAGIC df = df.withColumnRenamed('细分二', 'des2')
|
||||
-- MAGIC df = df.withColumnRenamed('细分三', 'des3')
|
||||
-- MAGIC df = df.withColumnRenamed('细分四', 'des4')
|
||||
-- MAGIC df = df.withColumnRenamed('销售额', 'sales_amount')
|
||||
-- MAGIC df = df.withColumnRenamed('成交件数', 'sold_pcs')
|
||||
-- MAGIC df = df.withColumnRenamed('平均单价(元/件)', 'aup_pices')
|
||||
-- MAGIC df = df.withColumnRenamed('销售量(盒)', 'sales_qty')
|
||||
-- MAGIC df = df.withColumnRenamed('平均单价(元/盒)', 'aup_box')
|
||||
-- MAGIC df = df.withColumnRenamed('PROD_COD', 'PROD_COD')
|
||||
-- MAGIC df = df.withColumnRenamed('PACK_COD', 'PACK_COD')
|
||||
-- MAGIC df = df.withColumnRenamed('APP3_COD', 'APP3_COD')
|
||||
-- MAGIC df = df.withColumnRenamed('ATC4_COD', 'ATC4_COD')
|
||||
-- MAGIC
|
||||
-- MAGIC # 添加 'etl_insert_dt' 列,包含当前时间并加上 8 小时的时差
|
||||
-- MAGIC df = df.withColumn('etl_insert_dt', date_format(expr("current_timestamp() + INTERVAL 8 HOURS"), 'yyyy-MM-dd HH:mm:ss'))
|
||||
-- MAGIC
|
||||
-- MAGIC # 将数据保存到目标表
|
||||
-- MAGIC df.write.mode("overwrite").saveAsTable(table_name)
|
||||
-- MAGIC
|
||||
-- MAGIC print(f"数据已写入表 {table_name}")
|
||||
-- MAGIC else:
|
||||
-- MAGIC print("未找到批次或路径不存在。")
|
||||
-- MAGIC
|
||||
-- MAGIC
|
||||
Reference in New Issue
Block a user