Files
MarketAnalysis-ETL/XIEHE/bkp_01 xiehe_blob2dwd.py
chenwu 832c7bcd69 new file: 01 dm_tf_external_sales.sql
new file:   02 dm_td_external_keycompatitor.sql
	new file:   03 dm_td_external_brand_market.sql
	new file:   04 dm_td_external_calendar.sql
	new file:   05 dm_td_external_exchangerate.sql
	new file:   07 dm_td_external_packinfo.sql
	new file:   08 dm_td_external_corp.sql
	new file:   09 dm_td_external_geo_type.sql
	new file:   11 DM_TD_EXTERNAL_MARKET_NEW.sql
	new file:   12 dm_td_external_org.sql
	new file:   13 external auth.sql
	new file:   14 dm_tf_external_retail_special_bkp.sql
	new file:   AIA/01 dm_aia_pack_property.sql
	new file:   AIA/02 dm_ext_aia_sales.sql
	new file:   AIA/02 dm_ext_aia_sales_bakup_20230327.sql
	new file:   AIA/03 dm_aia_flag.sql
	new file:   AIA/04 dm_aia_provided_flag.sql
	new file:   AIA/06 DM_TD_EXT_AIA_PACK_PROPERTY.sql
	new file:   AIA/07 DM_TD_EXT_AIA_PACK2MARKET.sql
	new file:   AIA/08 DM_TD_EXT_AIA_MARKET_RATIO.sql
	new file:   AIA/09 DM_TD_EXT_AIA_MARKET_BRAND_RATIO.sql
	new file:   AIA/10 DM_TD_EXT_AIA_MARKET_PACK_MAPPING.sql
	new file:   AIA/11 DM_TD_EXT_AIA_TARGET_INST.sql
	new file:   AIA/11 dm_aia_targethp_flag.sql
	new file:   AIA/z_03 dm_aia_flag_备份用.sql
	new file:   CHC/01 dm_chc_pack_property.sql
	new file:   CHC/02 DM_TF_EXT_CHC_SALES.sql
	new file:   CHC/03 DM_TD_EXT_CHC_PACK_PROPERTY.sql
	new file:   CHC/04 DM_TD_EXT_CHC_PACK2MARKET.sql
	new file:   CHC/05 DM_TD_EXT_CHC_MARKET_RATIO.sql
	new file:   CHC/06 DM_TD_EXT_CHC_MARKET_BRAND_RATIO.sql
	new file:   CHC/07 DM_TD_EXT_CHC_MARKET_PACK_MAPPING.sql
	new file:   CHPA/01 dwd_ims_atc_hierarchy.sql
	new file:   CHPA/01 dwd_ims_nfc_hierarchy.sql
	new file:   CHPA/01 dwd_ims_td_manufacturer_corp.sql
	new file:   CHPA/01 dwd_ims_td_pack_property.sql
	new file:   CHPA/01 dwd_update.sql
	new file:   CHPA/01_FB_BLOB_TO_DWD.sql
	new file:   CHPA/02 DWS_IMS_TD_GEO.sql
	new file:   CHPA/02 dws_ims_td_atc_cn.sql
	new file:   CHPA/02 dws_ims_td_corp_cn.sql
	new file:   CHPA/02 dws_ims_td_date.sql
	new file:   CHPA/02 dws_ims_td_manu_cn.sql
	new file:   CHPA/02 dws_ims_td_market.sql
	new file:   CHPA/02 dws_ims_td_market_ta.sql
	new file:   CHPA/02 dws_ims_td_nfc_cn.sql
	new file:   CHPA/02 dws_ims_td_prod_cn.sql
	new file:   CHPA/02 tmp_ims_td_prod_tmp.sql
	new file:   CHPA/02 tmp_ims_tf_fact_sales.sql
	new file:   CHPA/03 DM_TD_EXT_CHPA_MARKET_BRAND_RATIO.sql
	new file:   CHPA/03 DM_TD_EXT_CHPA_MARKET_PACK_MAPPING.sql
	new file:   CHPA/03 DM_TD_EXT_CHPA_MARKET_RATIO.sql
	new file:   CHPA/03 DM_TD_EXT_CHPA_PACK2MARKET.sql
	new file:   CHPA/03 DM_TD_EXT_CHPA_PACK_PROPERTY.sql
	new file:   CHPA/03 DM_TF_EXT_CHPA_SALES.sql
	new file:   CHPA/03 dm_ims_td_calendar.sql
	new file:   CHPA/03 dm_ims_td_geo.sql
	new file:   CHPA/03 dm_ims_td_market_property.sql
	new file:   CHPA/03 dm_ims_td_org.sql
	new file:   CHPA/03 dm_ims_td_org_hvh.sql
	new file:   CHPA/03 dm_ims_td_pack_property.sql
	new file:   CHPA/03 dm_ims_tf_sales.sql
	new file:   CHPA/03 dm_td_chpa_market_definition.sql
	new file:   CHPA/03 dm_td_ims_city_mapping.sql
	new file:   EC/03 ec_load_data.sql
	new file:   EC/04 DM_TD_EXT_EC_PACK_PROPERTY.sql
	new file:   EC/05 DM_TF_EXT_EC_SALES.sql
	new file:   EC/06 DM_TD_EXT_EC_PACK2MARKET.sql
	new file:   EC/07 DM_TD_EXT_EC_MARKET_RATIO.sql
	new file:   EC/08 DM_TD_EXT_EC_MARKET_BRAND_RATIO.sql
	new file:   EC/09 DM_TD_EXT_EC_MARKET_PACK_MAPPING.sql
	new file:   EC/1 (ec)blob_to_dwd.sql
	new file:   EC/2 dwd_inc_gnd_ext_ec_nationnal_pack_union_all.py
	new file:   Merged_Data/Merged_Data_Config_table_bkp.sql
	new file:   Merged_Data/Merged_Data_Config_table_bymonth.sql
	new file:   Merged_Data/dm_tf_exteranl_sales_merged_data_dtp_others_bkp.sql
	new file:   Merged_Data/dm_tf_exteranl_sales_merged_data_dtp_others_bymonth_bkp.sql
	new file:   ORG/DM_TD_EXT_AIA_ORG.sql
	new file:   ORG/DM_TD_EXT_CHC_ORG.sql
	new file:   ORG/DM_TD_EXT_CHPA_ORG.sql
	new file:   ORG/DM_TD_EXT_COUNTY_ORG.sql
	new file:   ORG/DM_TD_EXT_EC_ORG.sql
	new file:   ORG/DM_TD_EXT_RETAIL_ORG.sql
	new file:   ORG/DM_TD_EXT_THC_ORG.sql
	new file:   ORG/DM_TD_EXT_XIEHE_ORG.sql
	new file:   OTHERS/01 dm_td_report_url.sql
	new file:   OTHERS/02 dws_ext_email_warning.sql
	new file:   OTHERS/external_triggered_email.py
	new file:   Retail/01 load_tmp_data.py
	new file:   Retail/02 split_brand_data.py
	new file:   Retail/03 split_pack_data.py
	new file:   Retail/04 map_to_dws_table.py
	new file:   Retail/05 load_dtp_temp_data.py
	new file:   Retail/06 split_dtp_brand_data.py
	new file:   Retail/07 split_dtp_pack_data.py
	new file:   Retail/08 map_to_dtp_dws_table.py
	new file:   Retail/09 dwd_inc_gnd_ext_retail_nataional.py
	new file:   Retail/10 map_to_retail_dm_table.py
	new file:   Retail/11 map_to_overview_dm_table.py
	new file:   Retail/12 dws_tf_external_retail_dtp_special.sql
	new file:   Retail/13 DM_TF_EXT_RETAIL_SALES.sql
	new file:   Retail/14 DM_TF_EXT_RETAIL_DTP_SALES.sql
	new file:   Retail/15 DM_TD_EXT_RETAIL_PACK_PROPERTY.sql
	new file:   Retail/16 DM_TD_EXT_RETAIL_DTP_PACK_PROPERTY.sql
	new file:   Retail/17 DM_TD_EXT_DTP_PACK2MARKET.sql
	new file:   Retail/17 DM_TD_EXT_RETAIL_PACK2MARKET.sql
	new file:   Retail/18 DM_TD_EXT_DTP_MARKET_RATIO.sql
	new file:   Retail/18 DM_TD_EXT_RETAIL_MARKET_RATIO.sql
	new file:   Retail/19 DM_TD_EXT_DTP_MARKET_BRAND_RATIO.sql
	new file:   Retail/19 DM_TD_EXT_RETAIL_MARKET_BRAND_RATIO.sql
	new file:   Retail/20 DM_TD_EXT_DTP_MARKET_PACK_MAPPING.sql
	new file:   Retail/20 DM_TD_EXT_RETAIL_MARKET_PACK_MAPPING.sql
	new file:   Retail/z1 dwd_inc_gnd_ext_retail_nataional_作废.py
	new file:   Retail/z2 retail_load_data_作废.sql
	new file:   Retail/z3 retail_overview_data_作废.sql
	new file:   THC/01 dm_thc_pack_property.sql
	new file:   THC/02 dm_ext_thc_sales.sql
	new file:   THC/02 dm_ext_thc_sales_bakup_20260327.sql
	new file:   THC/03 DM_TF_EXT_THC_MARKET_SALES_CHT.sql
	new file:   THC/04 dm_tf_external_sales_thc.sql
	new file:   THC/05 DM_TD_EXT_THC_PACK_PROPERTY.sql
	new file:   THC/06 DM_TD_EXT_THC_PACK2MARKET.sql
	new file:   THC/07 DM_TD_EXT_THC_MARKET_RATIO.sql
	new file:   THC/08 DM_TD_EXT_THC_MARKET_BRAND_RATIO.sql
	new file:   THC/09 DM_TD_EXT_THC_MARKET_PACK_MAPPING.sql
	new file:   UNIONALL/DM_TD_EXT_UNIONALL_MARKET_PACK_MAPPING.sql
	new file:   UNIONALL/DM_TD_EXT_UNIONALL_PACKINFO.sql
	new file:   UNIONALL/DM_TF_EXT_UNIONALL_MARKET_SALES.sql
	new file:   UNIONALL/DM_TF_EXT_UNIONALL_MARKET_SALES_v1.0.sql
	new file:   UNIONALL/DM_TF_EXT_UNIONALL_SALES.sql
	new file:   UNIONALL/DM_TF_EXT_UNIONALL_SALES_MAPPING.sql
	new file:   UNIONALL/DM_TF_EXT_UNIONALL_SALES_MAPPING_NIAD.sql
	new file:   XIEHE/01 xiehe_blob_to_dwd.py
	new file:   XIEHE/02 dm_xiehe_pack_property.sql
	new file:   XIEHE/03 dm_ext_xiehe_geo.sql
	new file:   XIEHE/04 dm_ext_xiehe_sales.sql
	new file:   XIEHE/05 dm_td_xiehe_core_dept.sql
	new file:   XIEHE/06 DM_TF_EXT_XIEHE_SALES.sql
	new file:   XIEHE/07 DM_TD_EXT_XIEHE_PACK_PROPERTY.sql
	new file:   XIEHE/08 DM_TD_EXT_XIEHE_PACK2MARKET.sql
	new file:   XIEHE/09 DM_TD_EXT_XIEHE_MARKET_RATIO.sql
	new file:   XIEHE/10 DM_TD_EXT_XIEHE_MARKET_BRAND_RATIO.sql
	new file:   XIEHE/11 DM_TD_EXT_XIEHE_MARKET_PACK_MAPPING.sql
	new file:   XIEHE/bkp_01 xiehe_blob2dwd.py
	new file:   XIEHE/bkp_02 dm_ext_xiehe_sales.sql
	new file:   XIEHE/bkp_03 dm_ext_xiehe_pack_property.sql
	new file:   county/01 tmp_ims_county_fact_sales_sum.sql
	new file:   county/02 tmp_imscounty_Result.sql
	new file:   county/03 dm_ims_td_county_geo.sql
	new file:   county/04 dws_ext_county_tf_sales.sql
	new file:   county/05 dm_ext_county_td_pack_property.sql
	new file:   county/06 dm_td_county_pack_region.sql
	new file:   county/07 dm_ext_county_tf_sales_region.sql
	new file:   county/08 DM_TD_EXT_COUNTY_PACK_PROPERTY.sql
	new file:   county/09 DM_TF_EXT_COUNTY_SALES.sql
	new file:   county/10 DM_TD_EXT_COUNTY_PACK2MARKET.sql
	new file:   county/11 DM_TD_EXT_COUNTY_MARKET_RATIO.sql
	new file:   county/12 DM_TD_EXT_COUNTY_MARKET_BRAND_RATIO.sql
	new file:   county/13 DM_TD_EXT_COUNTY_MARKET_PACK_MAPPING.sql
	new file:   for_AIA_Dashboard/01 dm_td_aia_inst_mkt.sql
	new file:   for_AIA_Dashboard/02 dm_td_aia_auth_sales.sql
	new file:   for_AIA_Dashboard/03 dm_td_aia_original_col.sql
	new file:   for_AIA_Dashboard/04 dm_td_aia_nosales_inst.sql
	new file:   for_AIA_Dashboard/05 dm_td_aia_is_eagle_flag.sql
	new file:   for_AIA_Dashboard/06 dm_td_aia_rank.sql
	new file:   for_AIA_Dashboard/07 dm_ext_aia_data_remove_flag.sql
	new file:   for_AIA_Dashboard/07 dm_td_aia_remove_special_ins_bkp.py
	new file:   for_AIA_Dashboard/08 dm_ext_aia_data_quality_flag.sql
	new file:   z 01 dm_tf_external_sales.sql
	new file:   z 10 dm_td_external_market_pack_mapping_作废.sql
	new file:   z 11 dm_td_external_market_作废.sql
	new file:   上线脚本.sql
	new file:   数据验证.sql
2026-04-27 15:48:38 +08:00

479 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Databricks notebook source
# MAGIC %run ../../../Common/config
# COMMAND ----------
# MAGIC %sql
# MAGIC -- Rebuild the file -> target-table mapping: every row of dwd.dwd_gnd_xiehe_config_table
# MAGIC -- gets table_name = 'dwd.dwd_gnd_xiehe_' || id. The Python cell below reads
# MAGIC -- file_name / table_name from this mapping table.
# MAGIC insert overwrite dwd.dwd_gnd_xiehe_config_table_mapping
# MAGIC select id,file_name,
# MAGIC concat('dwd.dwd_gnd_xiehe_', id) as table_name,
# MAGIC from_utc_timestamp(current_timestamp(),'UTC+8') as etl_insert_dt
# MAGIC from dwd.dwd_gnd_xiehe_config_table
# COMMAND ----------
# MAGIC %md
# MAGIC + 20250415 kzzh331 uc_upgrade uc兼容调整
# MAGIC 1. 将代码中的`DBFS`路径根调整为`Volume`路径
# MAGIC 2. 将`session`中安装的`mdb`工具调整到集群`init-script`中,`uc`集群可能需要`admin`权限才能在集群中安装程序
# MAGIC 3. 清理重复的`access_file_path_template`赋值,根据环境自动设置`Blob Container`
# COMMAND ----------
# %sh
# # 安装 mdbtools
# sudo apt-get update
# sudo apt-get install -y apt-utils
# sudo DEBIAN_FRONTEND=noninteractive apt-get install -y mdbtools
# COMMAND ----------
# import os
# from pyspark.sql import SparkSession
# # 初始化SparkSession
# spark = SparkSession.builder.getOrCreate()
# # 列出最新日期目录下的所有 Blob 文件
# def list_latest_blob_files(base_path_template):
# try:
# current_date = datetime.utcnow() + timedelta(hours=8)
# date_path = current_date.strftime("%Y/%m/%d/")
# base_path = base_path_template + date_path
# if path_exists(base_path):
# all_files = list_files_recursive(base_path)
# access_files = [file for file in all_files if is_access_file(file)]
# if not access_files:
# print("最新日期目录下未找到 Access 文件。")
# return []
# else:
# return access_files
# else:
# print(f"指定路径不存在: {base_path}")
# return []
# except Exception as e:
# print(f"列出路径模板 {base_path_template} 中的最新 Blob 文件时出错: {e}")
# raise
# # 设置原始路径模板
# # 测试路径
# # access_file_path_template = "abfss://master@retaildlstoragetest.dfs.core.chinacloudapi.cn/ODS/GND/UserUpload/"
# # 生产路径
# access_file_path_template = "abfss://master@azcdatalakeprd.dfs.core.chinacloudapi.cn/ODS/GND/UserUpload/"
# # Blob存储的文件夹路径
# blob_folder_path = list_latest_blob_files(access_file_path_template)
# # 遍历Blob存储上的文件夹和子文件夹
# file_list = []
# for root, dirs, files in os.walk(blob_folder_path):
# for file in files:
# file_list.append((root, file))
# # 创建DataFrame
# df = spark.createDataFrame(file_list, ["folder_path", "file_name"])
# # 将DataFrame写入临时表
# df.createOrReplaceTempView("temp_table")
# COMMAND ----------
# # 730 修改在一半版本
# from io import StringIO
# import pandas as pd
# from pyspark.sql import SparkSession
# from subprocess import run, PIPE
# import subprocess
# from pyspark.sql import SparkSession
# from datetime import datetime
# #获取所有blob文件目录地址并复制到 dbfs 的指定目录下
# def get_access_file_path_dbfs_list(access_file_path_blob,target_path):
# access_file_path_dbfs_list =[]
# # 使用 dbutils.fs.ls 遍历目录
# for file_info in dbutils.fs.ls(access_file_path_blob):
# if file_info.isDir() and file_info.path.endswith('/') :
# # 确保是目录
# blob_path = file_info.path
# for file_name in dbutils.fs.ls(file_info.path):
# if file_name.path.lower().endswith('xiehe.accdb') & file_name.name.lower().startswith('database') :
# dbutils.fs.cp(file_name.path,f"{target_path}/{file_name.name}" )
# access_file_path_dbfs_list.append(f"{target_path}/{file_name.name}")
# return access_file_path_dbfs_list
# import subprocess
# # 将access 文件转化成csv
# def access_file_to_csv(access_file_path_blob,target_path,output_dir):
# access_file_path_dbfs_list = get_access_file_path_dbfs_list(access_file_path_blob,target_path)
# print(access_file_path_dbfs_list)
# for access_path in access_file_path_dbfs_list:
# # 首先列出所有表名
# result = subprocess.run(['mdb-tables', '-1', f"/dbfs{access_path}"], stdout=subprocess.PIPE, text=True)
# print(result.returncode)
# if result.returncode != 0:
# print(f"Error listing tables: {result.stderr}")
# return
# table_names = result.stdout.strip().split('\n')
# print(table_names)
# # 遍历表并保存至csv ,以下代码有问题----修需改
# for table_name in table_names:
# csv_path = f"{output_dir}/{table_name}.csv"
# export_result = subprocess.run(['mdb-export', f"/dbfs{access_path}", table_name], stdout=subprocess.PIPE, text=True)
# with open(csv_path, 'w') as file:
# file.write(export_result.stdout)
# if export_result.returncode != 0:
# print(f"Error exporting {table_name} to {csv_path}: {export_result.stderr}")
# # 以上代码有问题----修需改
# # 还需功能,实现 读取csv 文件写入对应DWD 表,并基于数据时间对 结果表数据进行数据覆盖
# def main(access_file_path_blob,target_path,output_dir):
# access_file_to_csv(access_file_path_blob,target_path,output_dir)
# if __name__ == "__main__":
# # 原始路径模板,使用年月日替换
# access_file_path_template = "abfss://master@retaildlstoragetest.dfs.core.chinacloudapi.cn/ODS/GND/UserUpload/{date}/"
# today = datetime.now().strftime("%Y/%m/%d")
# access_file_path_blob = access_file_path_template.format(date=today)
# main(access_file_path_blob,"/dbfs/tmp" ,"/dbfs/tmp")
# COMMAND ----------
import os
import subprocess
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta, timezone
from functools import reduce
from queue import Queue

import pandas as pd
from pyspark.sql.functions import date_format, expr, lit
# Module-level FIFO collecting the temporary CSV paths that
# read_table_from_access creates inside worker threads. queue.Queue is
# thread-safe, so worker threads can push to it concurrently.
TEMP_FILES = Queue(maxsize=0)  # maxsize=0 -> unbounded
def download_access_file(file_path, local_path):
    """Copy an Access file from storage to a local/Volume path.

    Args:
        file_path: Source path accepted by ``dbutils.fs.cp`` (e.g. an
            ``abfss://`` URI).
        local_path: Destination path.

    Returns:
        ``local_path`` unchanged, so the call can be used inline.
    """
    dbutils.fs.cp(file_path, local_path)
    # Keep a separator between source and destination so the log is readable.
    print(f"已下载 {file_path} 到 {local_path}")
    return local_path
def file_exists(local_path):
    """Return True when *local_path* exists on the driver's local filesystem."""
    found = os.path.exists(local_path)
    return found
def list_tables_in_access_file(local_path):
    """List the table names stored in an Access database file.

    Runs ``mdb-tables -1``, which prints one table name per line.

    Args:
        local_path: Path of the Access file on the local filesystem.

    Returns:
        List of non-blank table names, in the order ``mdb-tables`` prints them.

    Raises:
        FileNotFoundError: If *local_path* does not exist.
        RuntimeError: If ``mdb-tables`` exits with a non-zero status.
    """
    if not file_exists(local_path):
        raise FileNotFoundError(f"文件未找到: {local_path}")
    result = subprocess.run(['mdb-tables', '-1', local_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if result.returncode != 0:
        error_message = result.stderr.decode('utf-8', errors='replace')
        raise RuntimeError(f"列出表名时出错: {error_message}")
    # splitlines() also drops '\r'. Skipping blank entries avoids returning ['']
    # for an empty database, which would later run mdb-export with an empty
    # table name.
    table_names = [name for name in result.stdout.decode('utf-8').splitlines() if name.strip()]
    print(f"{local_path} 中的表: {table_names}")
    return table_names
def read_table_from_access(local_path, table_name):
    """Export one Access table to CSV with ``mdb-export`` and read it with Spark.

    The CSV is written next to *local_path* as
    ``mdb_export_<file stem>_<table_name>.csv``. Its path is always pushed onto
    ``TEMP_FILES``, even on failure. Spark reads the CSV lazily, so the file
    must stay in place until every action on the returned DataFrame has run.

    Args:
        local_path: Path of the Access file on the local/Volume filesystem.
        table_name: Name of the table inside the Access file.

    Returns:
        A Spark DataFrame read with a header row and no schema inference, so
        every column is a string. In the column names, spaces, ``,;{}()=``,
        newlines and tabs are replaced with ``_``.

    Raises:
        RuntimeError: If ``mdb-export`` exits with a non-zero status.
    """
    local_dirname = os.path.dirname(local_path)
    # splitext strips only the final extension, so names containing dots
    # (e.g. "a.v1.accdb" vs "a.v2.accdb") no longer share one temp file.
    local_filename = os.path.splitext(os.path.basename(local_path))[0]
    temp_file_path = f"{local_dirname}/mdb_export_{local_filename}_{table_name}.csv"
    # Delta rejects these characters in column names; map each one to "_".
    column_name_table = str.maketrans({char: "_" for char in " ,;{}()\n\t="})
    try:
        # The with-block closes the output handle once mdb-export is done.
        with open(temp_file_path, "w") as temp_file:
            result = subprocess.run(
                ["mdb-export", local_path, table_name],
                stdout=temp_file,
                stderr=subprocess.PIPE,
            )
        # Fail instead of silently loading an empty or truncated CSV.
        if result.returncode != 0:
            error_message = result.stderr.decode("utf-8", errors="replace")
            raise RuntimeError(f"导出表 {table_name} 时出错: {error_message}")
        # Read the CSV; quoted fields may span lines.
        df = (
            spark.read.option("header", "true")
            .option("quote", '"')
            .option("escape", '"')
            .option("multiLine", "true")
            .option("mode", "PERMISSIVE")
            .csv(temp_file_path)
        )
        # Sanitise column names.
        for col in df.columns:
            clean_col = col.translate(column_name_table)
            if clean_col != col:
                df = df.withColumnRenamed(col, clean_col)
        return df
    finally:
        TEMP_FILES.put(temp_file_path)
def process_table_parallel(local_path, table_name, column_mapping, target_columns):
    """Load one Access table and conform it to the target column layout.

    The function:
    - renames columns using *column_mapping*;
    - adds any missing target column as a string NULL;
    - stamps ``etl_insert_dt`` / ``etl_update_dt`` with
      ``current_timestamp() + 8 hours`` formatted as ``yyyy-MM-dd HH:mm:ss``;
    - selects *target_columns* in order.

    Args:
        local_path: Path of the Access file on the local/Volume filesystem.
        table_name: Table inside the Access file to load.
        column_mapping: Source column -> target column; may be empty/None.
        target_columns: Output column names, in output order.

    Returns:
        The conformed Spark DataFrame, or None if anything failed. The error
        is printed and processing of other tables continues.
    """
    try:
        df = read_table_from_access(local_path, table_name)
        # Map source column names to target names.
        if column_mapping:
            for old_col, new_col in column_mapping.items():
                df = df.withColumnRenamed(old_col, new_col)
        # The CSV columns are strings, so give missing columns the same type.
        # A bare lit(None) is NullType (void), which table creation or union
        # with existing string columns may reject.
        for column in target_columns:
            if column not in df.columns:
                df = df.withColumn(column, lit(None).cast("string"))
        # One timestamp expression for both ETL audit columns.
        etl_dt = date_format(expr("current_timestamp() + INTERVAL 8 HOURS"), "yyyy-MM-dd HH:mm:ss")
        df = df.withColumn("etl_insert_dt", etl_dt).withColumn("etl_update_dt", etl_dt)
        # Fix the column order so positional unions downstream line up.
        return df.select(target_columns)
    except Exception as e:
        print(f"处理表 {table_name} 时出错: {e}")
        return None
def process_access_files_with_config(access_files, config_df, column_mapping):
    """Load every configured Access file and return conformed DataFrames.

    For each config row, every path in *access_files* containing the row's
    ``file_name`` is handled as follows:
    - it is copied into the Volume tmp folder;
    - its tables are listed;
    - each table is submitted to ``process_table_parallel`` on a thread pool.

    Args:
        access_files: Candidate Access file paths (e.g. blob URIs).
        config_df: pandas DataFrame with ``file_name`` and ``table_name`` columns.
        column_mapping: Source column -> target column. Its values, followed by
            ``etl_insert_dt`` and ``etl_update_dt``, define the output columns.

    Returns:
        List of ``(spark_df, table_name)`` tuples. ``table_name`` is the target
        table of the config row whose file produced that DataFrame. Files or
        tables that failed are skipped; their errors are printed.
    """
    all_dataframes = []
    target_columns = list(column_mapping.values()) + ['etl_insert_dt', 'etl_update_dt']
    with ThreadPoolExecutor(max_workers=8) as executor:
        # Pair each future with the table_name of the config row that submitted
        # it. Reading the loop variable after the loop would tag every result
        # with the last config row's table.
        submitted = []
        for _, row in config_df.iterrows():
            file_name = row['file_name']
            table_name = row['table_name']
            matching_files = [path for path in access_files if file_name in path]
            if not matching_files:
                print(f"未找到匹配的文件: {file_name}")
                continue
            for access_file in matching_files:
                try:
                    local_path = download_access_file(
                        access_file,
                        f"/Volumes/{NGBI_CATALOG}/tmp/volume_tmp/tmp/{os.path.basename(access_file)}",
                    )
                    table_names = list_tables_in_access_file(local_path)
                except Exception as e:
                    print(f"处理文件 {access_file} 时出错: {e}")
                    continue
                for table in table_names:
                    future = executor.submit(process_table_parallel, local_path, table, column_mapping, target_columns)
                    submitted.append((future, table_name))
        for future, table_name in submitted:
            result = future.result()
            # A Spark DataFrame has no meaningful truthiness; test for None.
            if result is not None:
                all_dataframes.append((result, table_name))
    return all_dataframes
# Root of the blob container holding user-uploaded Access files, chosen per
# environment. ENVIRONMENT, PRD_ENVIRONMENT_VALUE and TEST_ENVIRONMENT_VALUE
# are presumably defined by the %run'd Common/config notebook.
if ENVIRONMENT == PRD_ENVIRONMENT_VALUE:
    # Production
    access_file_path_template = "abfss://master@azcdatalakeprd.dfs.core.chinacloudapi.cn/ODS/GND/UserUpload/"
elif ENVIRONMENT == TEST_ENVIRONMENT_VALUE:
    # Test
    access_file_path_template = "abfss://master@retaildlstoragetest.dfs.core.chinacloudapi.cn/ODS/GND/UserUpload/"
else:
    # Fail fast. Otherwise the template stays undefined and the notebook later
    # dies with a confusing NameError.
    raise ValueError(f"未知的运行环境 ENVIRONMENT: {ENVIRONMENT!r}")
def list_latest_blob_files(base_path_template):
    """Return the Access files under today's (UTC+8) date folder.

    The folder searched, recursively, is ``<base_path_template>YYYY/MM/DD/``.

    Args:
        base_path_template: Root URI; expected to end with ``/``.

    Returns:
        List of Access file paths. The list is empty, and a message is
        printed, when the folder is missing or has no Access files.

    Raises:
        Exception: Any unexpected storage error is printed and re-raised.
    """
    try:
        # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC now.
        current_date = datetime.now(timezone.utc) + timedelta(hours=8)
        base_path = base_path_template + current_date.strftime("%Y/%m/%d/")
        if not path_exists(base_path):
            print(f"指定路径不存在: {base_path}")
            return []
        access_files = [path for path in list_files_recursive(base_path) if is_access_file(path)]
        if not access_files:
            print("最新日期目录下未找到 Access 文件。")
        return access_files
    except Exception as e:
        print(f"列出路径模板 {base_path_template} 中的最新 Blob 文件时出错: {e}")
        raise
def path_exists(path):
    """Return whether *path* can be listed with ``dbutils.fs.ls``.

    An error whose text contains ``java.io.FileNotFoundException`` means the
    path is absent, and the function returns False. Any other error is
    printed and re-raised.
    """
    try:
        dbutils.fs.ls(path)
    except Exception as err:
        missing = "java.io.FileNotFoundException" in str(err)
        if not missing:
            print(f"检查路径 {path} 时出错: {err}")
            raise
        return False
    return True
def is_access_file(file_path):
    """Return True when *file_path* ends with ``.accdb``, ignoring case."""
    suffix = ".accdb"
    return file_path.lower()[-len(suffix):] == suffix
def list_files_recursive(path):
    """Return every file path under *path*, expanding sub-directories depth-first.

    Entries keep the order ``dbutils.fs.ls`` returns them in, with each
    directory's contents inserted in place. A missing *path* yields ``[]``.
    Any other error is printed and re-raised.
    """
    try:
        collected = []
        if not path_exists(path):
            return collected
        for entry in dbutils.fs.ls(path):
            if entry.isDir():
                collected += list_files_recursive(entry.path)
            else:
                collected.append(entry.path)
        return collected
    except Exception as err:
        print(f"列出路径 {path} 中的文件时出错: {err}")
        raise
# Entry point:
# 1. load today's uploaded Access files;
# 2. overwrite one DWD table per configured target table;
# 3. overwrite dwd.dwd_inc_gnd_ext_xiehe_union_all with the union of
#    everything that was loaded.


def _write_overwrite_table(df, table_name):
    """Create *table_name* from *df* if it does not exist, then overwrite it with *df*."""
    df.createOrReplaceTempView("temp_access_table")
    spark.sql(f"CREATE TABLE IF NOT EXISTS {table_name} AS SELECT * FROM temp_access_table")
    spark.sql(f"INSERT overwrite {table_name} SELECT * FROM temp_access_table")


def _cleanup_temp_files():
    """Best-effort removal of the CSV exports recorded in TEMP_FILES."""
    while not TEMP_FILES.empty():
        temp_path = TEMP_FILES.get_nowait()
        try:
            os.remove(temp_path)
        except FileNotFoundError:
            pass
        except OSError as e:
            print(f"删除临时文件 {temp_path} 时出错: {e}")


latest_access_files = list_latest_blob_files(access_file_path_template)
if latest_access_files:
    try:
        # Config rows: file_name (substring-matched against file paths) -> table_name.
        config_df = spark.table("dwd.dwd_gnd_xiehe_config_table_mapping").toPandas()
        # Access source column -> DWD target column.
        column_mapping = {
            'AREA': 'area',
            'Date': 'yq',
            '医院级别': 'h_level',
            'DEPT_NAME': 'dept_name',
            '报销': 'reimburse',
            '报销类型': 'reimburse_type',
            '处方来源': 'prescription_source',
            'ATC': 'atc',
            '新code': 'new_code',
            '通用名': 'common_name',
            '商品名称': 'product_name',
            '厂家': 'manu_des',
            '规格': 'pack_des',
            '给药途径': 'drug_delivery_route',
            '剂型': 'nfc',
            '处方张数': 'prescription',
            '取药数量': 'sales_vol',
            '单价': 'price',
            '金额': 'sales_value'
        }
        all_dataframes = process_access_files_with_config(latest_access_files, config_df, column_mapping)
        if all_dataframes:
            # Several Access tables/files can target the same DWD table. Union
            # them first so a single INSERT OVERWRITE keeps all of them;
            # overwriting once per DataFrame kept only the last one.
            dataframes_by_table = {}
            for df, table_name in all_dataframes:
                dataframes_by_table.setdefault(table_name, []).append(df)
            for table_name, table_dfs in dataframes_by_table.items():
                _write_overwrite_table(reduce(lambda left, right: left.union(right), table_dfs), table_name)

            # A positional union is safe: every DataFrame was selected with the
            # same target column order.
            combined_spark_df = reduce(lambda left, right: left.union(right), [df for df, _ in all_dataframes])
            combined_spark_df.createOrReplaceTempView("combined_table")
            spark.sql("create table if not exists dwd.dwd_inc_gnd_ext_xiehe_union_all as select * from combined_table")
            spark.sql("insert overwrite dwd.dwd_inc_gnd_ext_xiehe_union_all select * from combined_table")
        else:
            print("Access 文件中没有找到数据。")
    except Exception as e:
        print(f"处理 Access 文件时出错: {e}")
        # Fail the job instead of reporting success with nothing loaded.
        raise
    finally:
        # Spark reads the CSV exports lazily, so remove them only after every write.
        _cleanup_temp_files()
else:
    print("没有 Access 文件需要处理。")
# COMMAND ----------
# MAGIC %sql
# MAGIC -- Reload today's XIEHE extract into dwd.dwd_gnd_ext_xiehe_raw_data.
# MAGIC -- Only rows of dwd_inc_gnd_ext_xiehe_union_all whose etl_insert_dt starts with
# MAGIC -- today's UTC+8 date are used, so stale rows from an earlier run are ignored.
# MAGIC -- NOTE(review): the Python cell stamps etl_insert_dt as current_timestamp() + 8h,
# MAGIC -- while this cell uses from_utc_timestamp(current_timestamp(),'UTC+8'). The two
# MAGIC -- dates agree only if the session time zone is UTC. Confirm.
# MAGIC -- Step 1: delete every yq (mapped from the Access 'Date' column) present in today's
# MAGIC -- load, so the insert below replaces those periods instead of duplicating them.
# MAGIC -- NOTE(review): DELETE and INSERT are separate statements. If the INSERT fails,
# MAGIC -- the deleted periods are not restored.
# MAGIC delete from dwd.dwd_gnd_ext_xiehe_raw_data
# MAGIC where yq in (
# MAGIC select yq from dwd.dwd_inc_gnd_ext_xiehe_union_all t1
# MAGIC where left(t1.etl_insert_dt, 10) = left(from_utc_timestamp(current_timestamp(),'UTC+8'),10)
# MAGIC );
# MAGIC
# MAGIC -- Step 2: append today's rows, re-stamping both ETL timestamps.
# MAGIC insert into dwd.dwd_gnd_ext_xiehe_raw_data
# MAGIC (
# MAGIC area,
# MAGIC yq,
# MAGIC h_level,
# MAGIC dept_name,
# MAGIC reimburse,
# MAGIC reimburse_type,
# MAGIC prescription_source,
# MAGIC atc,
# MAGIC new_code,
# MAGIC common_name,
# MAGIC product_name,
# MAGIC manu_des,
# MAGIC pack_des,
# MAGIC drug_delivery_route,
# MAGIC nfc,
# MAGIC prescription,
# MAGIC sales_vol,
# MAGIC price,
# MAGIC sales_value,
# MAGIC etl_insert_dt,
# MAGIC etl_update_dt
# MAGIC )
# MAGIC select area,
# MAGIC yq,
# MAGIC h_level,
# MAGIC dept_name,
# MAGIC reimburse,
# MAGIC reimburse_type,
# MAGIC prescription_source,
# MAGIC atc,
# MAGIC new_code,
# MAGIC common_name,
# MAGIC product_name,
# MAGIC manu_des,
# MAGIC pack_des,
# MAGIC drug_delivery_route,
# MAGIC nfc,
# MAGIC prescription,
# MAGIC sales_vol,
# MAGIC price,
# MAGIC sales_value,
# MAGIC from_utc_timestamp(current_timestamp(),'UTC+8') etl_insert_dt,
# MAGIC from_utc_timestamp(current_timestamp(),'UTC+8') etl_update_dt
# MAGIC from dwd.dwd_inc_gnd_ext_xiehe_union_all
# MAGIC where left(etl_insert_dt, 10) = left(from_utc_timestamp(current_timestamp(),'UTC+8'),10);