new file: 02 dm_td_external_keycompatitor.sql new file: 03 dm_td_external_brand_market.sql new file: 04 dm_td_external_calendar.sql new file: 05 dm_td_external_exchangerate.sql new file: 07 dm_td_external_packinfo.sql new file: 08 dm_td_external_corp.sql new file: 09 dm_td_external_geo_type.sql new file: 11 DM_TD_EXTERNAL_MARKET_NEW.sql new file: 12 dm_td_external_org.sql new file: 13 external auth.sql new file: 14 dm_tf_external_retail_special_bkp.sql new file: AIA/01 dm_aia_pack_property.sql new file: AIA/02 dm_ext_aia_sales.sql new file: AIA/02 dm_ext_aia_sales_bakup_20230327.sql new file: AIA/03 dm_aia_flag.sql new file: AIA/04 dm_aia_provided_flag.sql new file: AIA/06 DM_TD_EXT_AIA_PACK_PROPERTY.sql new file: AIA/07 DM_TD_EXT_AIA_PACK2MARKET.sql new file: AIA/08 DM_TD_EXT_AIA_MARKET_RATIO.sql new file: AIA/09 DM_TD_EXT_AIA_MARKET_BRAND_RATIO.sql new file: AIA/10 DM_TD_EXT_AIA_MARKET_PACK_MAPPING.sql new file: AIA/11 DM_TD_EXT_AIA_TARGET_INST.sql new file: AIA/11 dm_aia_targethp_flag.sql new file: "AIA/z_03 dm_aia_flag_\345\244\207\344\273\275\347\224\250.sql" new file: CHC/01 dm_chc_pack_property.sql new file: CHC/02 DM_TF_EXT_CHC_SALES.sql new file: CHC/03 DM_TD_EXT_CHC_PACK_PROPERTY.sql new file: CHC/04 DM_TD_EXT_CHC_PACK2MARKET.sql new file: CHC/05 DM_TD_EXT_CHC_MARKET_RATIO.sql new file: CHC/06 DM_TD_EXT_CHC_MARKET_BRAND_RATIO.sql new file: CHC/07 DM_TD_EXT_CHC_MARKET_PACK_MAPPING.sql new file: CHPA/01 dwd_ims_atc_hierarchy.sql new file: CHPA/01 dwd_ims_nfc_hierarchy.sql new file: CHPA/01 dwd_ims_td_manufacturer_corp.sql new file: CHPA/01 dwd_ims_td_pack_property.sql new file: CHPA/01 dwd_update.sql new file: CHPA/01_FB_BLOB_TO_DWD.sql new file: CHPA/02 DWS_IMS_TD_GEO.sql new file: CHPA/02 dws_ims_td_atc_cn.sql new file: CHPA/02 dws_ims_td_corp_cn.sql new file: CHPA/02 dws_ims_td_date.sql new file: CHPA/02 dws_ims_td_manu_cn.sql new file: CHPA/02 dws_ims_td_market.sql new file: CHPA/02 dws_ims_td_market_ta.sql new file: CHPA/02 dws_ims_td_nfc_cn.sql new 
file: CHPA/02 dws_ims_td_prod_cn.sql new file: CHPA/02 tmp_ims_td_prod_tmp.sql new file: CHPA/02 tmp_ims_tf_fact_sales.sql new file: CHPA/03 DM_TD_EXT_CHPA_MARKET_BRAND_RATIO.sql new file: CHPA/03 DM_TD_EXT_CHPA_MARKET_PACK_MAPPING.sql new file: CHPA/03 DM_TD_EXT_CHPA_MARKET_RATIO.sql new file: CHPA/03 DM_TD_EXT_CHPA_PACK2MARKET.sql new file: CHPA/03 DM_TD_EXT_CHPA_PACK_PROPERTY.sql new file: CHPA/03 DM_TF_EXT_CHPA_SALES.sql new file: CHPA/03 dm_ims_td_calendar.sql new file: CHPA/03 dm_ims_td_geo.sql new file: CHPA/03 dm_ims_td_market_property.sql new file: CHPA/03 dm_ims_td_org.sql new file: CHPA/03 dm_ims_td_org_hvh.sql new file: CHPA/03 dm_ims_td_pack_property.sql new file: CHPA/03 dm_ims_tf_sales.sql new file: CHPA/03 dm_td_chpa_market_definition.sql new file: CHPA/03 dm_td_ims_city_mapping.sql new file: EC/03 ec_load_data.sql new file: EC/04 DM_TD_EXT_EC_PACK_PROPERTY.sql new file: EC/05 DM_TF_EXT_EC_SALES.sql new file: EC/06 DM_TD_EXT_EC_PACK2MARKET.sql new file: EC/07 DM_TD_EXT_EC_MARKET_RATIO.sql new file: EC/08 DM_TD_EXT_EC_MARKET_BRAND_RATIO.sql new file: EC/09 DM_TD_EXT_EC_MARKET_PACK_MAPPING.sql new file: EC/1 (ec)blob_to_dwd.sql new file: EC/2 dwd_inc_gnd_ext_ec_nationnal_pack_union_all.py new file: Merged_Data/Merged_Data_Config_table_bkp.sql new file: Merged_Data/Merged_Data_Config_table_bymonth.sql new file: Merged_Data/dm_tf_exteranl_sales_merged_data_dtp_others_bkp.sql new file: Merged_Data/dm_tf_exteranl_sales_merged_data_dtp_others_bymonth_bkp.sql new file: ORG/DM_TD_EXT_AIA_ORG.sql new file: ORG/DM_TD_EXT_CHC_ORG.sql new file: ORG/DM_TD_EXT_CHPA_ORG.sql new file: ORG/DM_TD_EXT_COUNTY_ORG.sql new file: ORG/DM_TD_EXT_EC_ORG.sql new file: ORG/DM_TD_EXT_RETAIL_ORG.sql new file: ORG/DM_TD_EXT_THC_ORG.sql new file: ORG/DM_TD_EXT_XIEHE_ORG.sql new file: OTHERS/01 dm_td_report_url.sql new file: OTHERS/02 dws_ext_email_warning.sql new file: OTHERS/external_triggered_email.py new file: Retail/01 load_tmp_data.py new file: Retail/02 split_brand_data.py 
new file: Retail/03 split_pack_data.py new file: Retail/04 map_to_dws_table.py new file: Retail/05 load_dtp_temp_data.py new file: Retail/06 split_dtp_brand_data.py new file: Retail/07 split_dtp_pack_data.py new file: Retail/08 map_to_dtp_dws_table.py new file: Retail/09 dwd_inc_gnd_ext_retail_nataional.py new file: Retail/10 map_to_retail_dm_table.py new file: Retail/11 map_to_overview_dm_table.py new file: Retail/12 dws_tf_external_retail_dtp_special.sql new file: Retail/13 DM_TF_EXT_RETAIL_SALES.sql new file: Retail/14 DM_TF_EXT_RETAIL_DTP_SALES.sql new file: Retail/15 DM_TD_EXT_RETAIL_PACK_PROPERTY.sql new file: Retail/16 DM_TD_EXT_RETAIL_DTP_PACK_PROPERTY.sql new file: Retail/17 DM_TD_EXT_DTP_PACK2MARKET.sql new file: Retail/17 DM_TD_EXT_RETAIL_PACK2MARKET.sql new file: Retail/18 DM_TD_EXT_DTP_MARKET_RATIO.sql new file: Retail/18 DM_TD_EXT_RETAIL_MARKET_RATIO.sql new file: Retail/19 DM_TD_EXT_DTP_MARKET_BRAND_RATIO.sql new file: Retail/19 DM_TD_EXT_RETAIL_MARKET_BRAND_RATIO.sql new file: Retail/20 DM_TD_EXT_DTP_MARKET_PACK_MAPPING.sql new file: Retail/20 DM_TD_EXT_RETAIL_MARKET_PACK_MAPPING.sql new file: "Retail/z1 dwd_inc_gnd_ext_retail_nataional_\344\275\234\345\272\237.py" new file: "Retail/z2 retail_load_data_\344\275\234\345\272\237.sql" new file: "Retail/z3 retail_overview_data_\344\275\234\345\272\237.sql" new file: THC/01 dm_thc_pack_property.sql new file: THC/02 dm_ext_thc_sales.sql new file: THC/02 dm_ext_thc_sales_bakup_20260327.sql new file: THC/03 DM_TF_EXT_THC_MARKET_SALES_CHT.sql new file: THC/04 dm_tf_external_sales_thc.sql new file: THC/05 DM_TD_EXT_THC_PACK_PROPERTY.sql new file: THC/06 DM_TD_EXT_THC_PACK2MARKET.sql new file: THC/07 DM_TD_EXT_THC_MARKET_RATIO.sql new file: THC/08 DM_TD_EXT_THC_MARKET_BRAND_RATIO.sql new file: THC/09 DM_TD_EXT_THC_MARKET_PACK_MAPPING.sql new file: UNIONALL/DM_TD_EXT_UNIONALL_MARKET_PACK_MAPPING.sql new file: UNIONALL/DM_TD_EXT_UNIONALL_PACKINFO.sql new file: UNIONALL/DM_TF_EXT_UNIONALL_MARKET_SALES.sql new 
file: UNIONALL/DM_TF_EXT_UNIONALL_MARKET_SALES_v1.0.sql new file: UNIONALL/DM_TF_EXT_UNIONALL_SALES.sql new file: UNIONALL/DM_TF_EXT_UNIONALL_SALES_MAPPING.sql new file: UNIONALL/DM_TF_EXT_UNIONALL_SALES_MAPPING_NIAD.sql new file: XIEHE/01 xiehe_blob_to_dwd.py new file: XIEHE/02 dm_xiehe_pack_property.sql new file: XIEHE/03 dm_ext_xiehe_geo.sql new file: XIEHE/04 dm_ext_xiehe_sales.sql new file: XIEHE/05 dm_td_xiehe_core_dept.sql new file: XIEHE/06 DM_TF_EXT_XIEHE_SALES.sql new file: XIEHE/07 DM_TD_EXT_XIEHE_PACK_PROPERTY.sql new file: XIEHE/08 DM_TD_EXT_XIEHE_PACK2MARKET.sql new file: XIEHE/09 DM_TD_EXT_XIEHE_MARKET_RATIO.sql new file: XIEHE/10 DM_TD_EXT_XIEHE_MARKET_BRAND_RATIO.sql new file: XIEHE/11 DM_TD_EXT_XIEHE_MARKET_PACK_MAPPING.sql new file: XIEHE/bkp_01 xiehe_blob2dwd.py new file: XIEHE/bkp_02 dm_ext_xiehe_sales.sql new file: XIEHE/bkp_03 dm_ext_xiehe_pack_property.sql new file: county/01 tmp_ims_county_fact_sales_sum.sql new file: county/02 tmp_imscounty_Result.sql new file: county/03 dm_ims_td_county_geo.sql new file: county/04 dws_ext_county_tf_sales.sql new file: county/05 dm_ext_county_td_pack_property.sql new file: county/06 dm_td_county_pack_region.sql new file: county/07 dm_ext_county_tf_sales_region.sql new file: county/08 DM_TD_EXT_COUNTY_PACK_PROPERTY.sql new file: county/09 DM_TF_EXT_COUNTY_SALES.sql new file: county/10 DM_TD_EXT_COUNTY_PACK2MARKET.sql new file: county/11 DM_TD_EXT_COUNTY_MARKET_RATIO.sql new file: county/12 DM_TD_EXT_COUNTY_MARKET_BRAND_RATIO.sql new file: county/13 DM_TD_EXT_COUNTY_MARKET_PACK_MAPPING.sql new file: for_AIA_Dashboard/01 dm_td_aia_inst_mkt.sql new file: for_AIA_Dashboard/02 dm_td_aia_auth_sales.sql new file: for_AIA_Dashboard/03 dm_td_aia_original_col.sql new file: for_AIA_Dashboard/04 dm_td_aia_nosales_inst.sql new file: for_AIA_Dashboard/05 dm_td_aia_is_eagle_flag.sql new file: for_AIA_Dashboard/06 dm_td_aia_rank.sql new file: for_AIA_Dashboard/07 dm_ext_aia_data_remove_flag.sql new file: 
for_AIA_Dashboard/07 dm_td_aia_remove_special_ins_bkp.py new file: for_AIA_Dashboard/08 dm_ext_aia_data_quality_flag.sql new file: z 01 dm_tf_external_sales.sql new file: "z 10 dm_td_external_market_pack_mapping_\344\275\234\345\272\237.sql" new file: "z 11 dm_td_external_market_\344\275\234\345\272\237.sql" new file: "\344\270\212\347\272\277\350\204\232\346\234\254.sql" new file: "\346\225\260\346\215\256\351\252\214\350\257\201.sql"
479 lines
17 KiB
Python
479 lines
17 KiB
Python
# Databricks notebook source
|
||
# MAGIC %run ../../../Common/config
|
||
|
||
# COMMAND ----------
|
||
|
||
# MAGIC %sql
|
||
# MAGIC insert overwrite dwd.dwd_gnd_xiehe_config_table_mapping
|
||
# MAGIC select id,file_name,
|
||
# MAGIC concat('dwd.dwd_gnd_xiehe_', id) as table_name,
|
||
# MAGIC from_utc_timestamp(current_timestamp(),'UTC+8') as etl_insert_dt
|
||
# MAGIC from dwd.dwd_gnd_xiehe_config_table
|
||
|
||
# COMMAND ----------
|
||
|
||
# MAGIC %md
|
||
# MAGIC + 20250415 kzzh331 uc_upgrade uc兼容调整:
|
||
# MAGIC 1. 将代码中的`DBFS`路径根调整为`Volume`路径
|
||
# MAGIC 2. 将`session`中安装的`mdb`工具调整到集群`init-script`中,`uc`集群可能需要`admin`权限才能在集群中安装程序
|
||
# MAGIC 3. 清理重复的`access_file_path_template`赋值,根据环境自动设置`Blob Container`
|
||
|
||
# COMMAND ----------
|
||
|
||
# %sh
|
||
# # 安装 mdbtools
|
||
# sudo apt-get update
|
||
# sudo apt-get install -y apt-utils
|
||
# sudo DEBIAN_FRONTEND=noninteractive apt-get install -y mdbtools
|
||
|
||
# COMMAND ----------
|
||
|
||
# import os
|
||
# from pyspark.sql import SparkSession
|
||
|
||
# # 初始化SparkSession
|
||
# spark = SparkSession.builder.getOrCreate()
|
||
|
||
# # 列出最新日期目录下的所有 Blob 文件
|
||
# def list_latest_blob_files(base_path_template):
|
||
# try:
|
||
# current_date = datetime.utcnow() + timedelta(hours=8)
|
||
# date_path = current_date.strftime("%Y/%m/%d/")
|
||
# base_path = base_path_template + date_path
|
||
|
||
# if path_exists(base_path):
|
||
# all_files = list_files_recursive(base_path)
|
||
# access_files = [file for file in all_files if is_access_file(file)]
|
||
# if not access_files:
|
||
# print("最新日期目录下未找到 Access 文件。")
|
||
# return []
|
||
# else:
|
||
# return access_files
|
||
# else:
|
||
# print(f"指定路径不存在: {base_path}")
|
||
# return []
|
||
# except Exception as e:
|
||
# print(f"列出路径模板 {base_path_template} 中的最新 Blob 文件时出错: {e}")
|
||
# raise
|
||
|
||
|
||
# # 设置原始路径模板
|
||
# # 测试路径
|
||
# # access_file_path_template = "abfss://master@retaildlstoragetest.dfs.core.chinacloudapi.cn/ODS/GND/UserUpload/"
|
||
# # 生产路径
|
||
# access_file_path_template = "abfss://master@azcdatalakeprd.dfs.core.chinacloudapi.cn/ODS/GND/UserUpload/"
|
||
|
||
|
||
# # Blob存储的文件夹路径
|
||
# blob_folder_path = list_latest_blob_files(access_file_path_template)
|
||
|
||
# # 遍历Blob存储上的文件夹和子文件夹
|
||
# file_list = []
|
||
# for root, dirs, files in os.walk(blob_folder_path):
|
||
# for file in files:
|
||
# file_list.append((root, file))
|
||
|
||
# # 创建DataFrame
|
||
# df = spark.createDataFrame(file_list, ["folder_path", "file_name"])
|
||
|
||
# # 将DataFrame写入临时表
|
||
# df.createOrReplaceTempView("temp_table")
|
||
|
||
|
||
# COMMAND ----------
|
||
|
||
# # 730 修改在一半版本
|
||
# from io import StringIO
|
||
# import pandas as pd
|
||
# from pyspark.sql import SparkSession
|
||
# from subprocess import run, PIPE
|
||
# import subprocess
|
||
# from pyspark.sql import SparkSession
|
||
# from datetime import datetime
|
||
|
||
# #获取所有blob文件目录地址并复制到 dbfs 的指定目录下
|
||
# def get_access_file_path_dbfs_list(access_file_path_blob,target_path):
|
||
# access_file_path_dbfs_list =[]
|
||
# # 使用 dbutils.fs.ls 遍历目录
|
||
# for file_info in dbutils.fs.ls(access_file_path_blob):
|
||
# if file_info.isDir() and file_info.path.endswith('/') :
|
||
# # 确保是目录
|
||
# blob_path = file_info.path
|
||
# for file_name in dbutils.fs.ls(file_info.path):
|
||
# if file_name.path.lower().endswith('xiehe.accdb') & file_name.name.lower().startswith('database') :
|
||
# dbutils.fs.cp(file_name.path,f"{target_path}/{file_name.name}" )
|
||
# access_file_path_dbfs_list.append(f"{target_path}/{file_name.name}")
|
||
# return access_file_path_dbfs_list
|
||
|
||
# import subprocess
|
||
|
||
# # 将access 文件转化成csv
|
||
# def access_file_to_csv(access_file_path_blob,target_path,output_dir):
|
||
# access_file_path_dbfs_list = get_access_file_path_dbfs_list(access_file_path_blob,target_path)
|
||
# print(access_file_path_dbfs_list)
|
||
# for access_path in access_file_path_dbfs_list:
|
||
# # 首先列出所有表名
|
||
# result = subprocess.run(['mdb-tables', '-1', f"/dbfs{access_path}"], stdout=subprocess.PIPE, text=True)
|
||
# print(result.returncode)
|
||
# if result.returncode != 0:
|
||
# print(f"Error listing tables: {result.stderr}")
|
||
# return
|
||
# table_names = result.stdout.strip().split('\n')
|
||
# print(table_names)
|
||
# # 遍历表,并保存至csv ,以下代码有问题----修需改
|
||
# for table_name in table_names:
|
||
# csv_path = f"{output_dir}/{table_name}.csv"
|
||
# export_result = subprocess.run(['mdb-export', f"/dbfs{access_path}", table_name], stdout=subprocess.PIPE, text=True)
|
||
# with open(csv_path, 'w') as file:
|
||
# file.write(export_result.stdout)
|
||
# if export_result.returncode != 0:
|
||
# print(f"Error exporting {table_name} to {csv_path}: {export_result.stderr}")
|
||
|
||
# # 以上代码有问题----修需改
|
||
# # 还需功能,实现 读取csv 文件,写入对应DWD 表,并基于数据时间对 结果表数据进行数据覆盖
|
||
|
||
|
||
# def main(access_file_path_blob,target_path,output_dir):
|
||
# access_file_to_csv(access_file_path_blob,target_path,output_dir)
|
||
|
||
# if __name__ == "__main__":
|
||
|
||
# # 原始路径模板,使用年月日替换
|
||
# access_file_path_template = "abfss://master@retaildlstoragetest.dfs.core.chinacloudapi.cn/ODS/GND/UserUpload/{date}/"
|
||
# today = datetime.now().strftime("%Y/%m/%d")
|
||
# access_file_path_blob = access_file_path_template.format(date=today)
|
||
|
||
# main(access_file_path_blob,"/dbfs/tmp" ,"/dbfs/tmp")
|
||
|
||
# COMMAND ----------
|
||
|
||
import os
|
||
import subprocess
|
||
import pandas as pd
|
||
from datetime import datetime, timedelta
|
||
from concurrent.futures import ThreadPoolExecutor
|
||
from pyspark.sql.functions import expr, date_format, lit
|
||
from queue import Queue
|
||
|
||
# Global FIFO of temporary CSV paths written by read_table_from_access (which may
# run on worker threads; queue.Queue is thread-safe).
# NOTE(review): nothing in this chunk drains the queue — presumably the temp
# files are cleaned up elsewhere; confirm.
TEMP_FILES: Queue = Queue()
|
||
|
||
def download_access_file(file_path, local_path):
    """Copy an Access file to ``local_path`` with ``dbutils.fs.cp``.

    Args:
        file_path: Source path understood by ``dbutils.fs`` (in this notebook an
            ``abfss://`` URI from the landing zone).
        local_path: Destination path (in this notebook a ``/Volumes/...`` path).

    Returns:
        ``local_path`` unchanged, so the caller can use it directly.
    """
    target = local_path
    dbutils.fs.cp(file_path, target)
    print(f"已下载 {file_path} 到 {local_path}")
    return target
|
||
|
||
def file_exists(local_path):
    """Return whether ``local_path`` exists, as reported by ``os.path.exists``."""
    exists = os.path.exists(local_path)
    return exists
|
||
|
||
def list_tables_in_access_file(local_path):
    """List the table names of an Access database using ``mdb-tables``.

    Args:
        local_path: Filesystem path of the Access database file.

    Returns:
        Table names, one per line of ``mdb-tables -1`` output. Blank lines are
        dropped, so a database with no tables yields ``[]`` instead of ``['']``
        (an empty name would otherwise be handed on to ``mdb-export``).

    Raises:
        FileNotFoundError: If ``local_path`` does not exist.
        RuntimeError: If ``mdb-tables`` exits with a non-zero status.
    """
    if not file_exists(local_path):
        raise FileNotFoundError(f"文件未找到: {local_path}")

    # `-1` makes mdb-tables print one table name per line.
    result = subprocess.run(['mdb-tables', '-1', local_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if result.returncode != 0:
        # errors="replace": a badly encoded stderr must not hide the real failure
        # behind a UnicodeDecodeError.
        error_message = result.stderr.decode('utf-8', errors='replace')
        raise RuntimeError(f"列出表名时出错: {error_message}")

    table_names = [name for name in result.stdout.decode('utf-8').splitlines() if name.strip()]
    print(f"{local_path} 中的表: {table_names}")
    return table_names
|
||
|
||
# Every character here is replaced with "_" in exported column names. This is the
# set Delta Lake rejects in column names: space , ; { } ( ) newline tab =
_COLUMN_NAME_TRANSLATION = str.maketrans({ch: "_" for ch in " ,;{}()\n\t="})


def read_table_from_access(local_path, table_name):
    """Export one Access table to a temporary CSV and load it as a Spark DataFrame.

    The CSV is written next to ``local_path`` as
    ``mdb_export_<file stem>_<table_name>.csv``. Its path is always pushed onto
    ``TEMP_FILES``, even when the export or the read fails.

    Args:
        local_path: Path of the Access database file.
        table_name: Name of the table to export (as listed by ``mdb-tables``).

    Returns:
        A Spark DataFrame read with ``header=true`` and no schema inference, so
        every column is a string. Spaces and the characters ``,;{}()=``, newline
        and tab in column names are replaced by ``_``.

    Raises:
        RuntimeError: If ``mdb-export`` exits with a non-zero status.
    """
    local_dirname = os.path.dirname(local_path)
    # splitext strips only the final extension: "a.b.accdb" -> "a.b". The old
    # split('.')[0] gave "a", so different databases could share a temp CSV name.
    local_filename = os.path.splitext(os.path.basename(local_path))[0]
    temp_file_path = f"{local_dirname}/mdb_export_{local_filename}_{table_name}.csv"

    try:
        # Let mdb-export write straight into the file. The `with` block closes the
        # handle deterministically.
        with open(temp_file_path, "wb") as csv_out:
            result = subprocess.run(
                ["mdb-export", local_path, table_name],
                stdout=csv_out,
                stderr=subprocess.PIPE,
            )
        # Without this check a failed export would be loaded as an empty or
        # truncated CSV without any error.
        if result.returncode != 0:
            error_message = result.stderr.decode("utf-8", errors="replace")
            raise RuntimeError(f"导出表 {table_name} 时出错: {error_message}")

        df = (
            spark.read.option("header", "true")
            .option("quote", '"')
            .option("escape", '"')
            .option("multiLine", "true")
            .option("mode", "PERMISSIVE")
            .csv(temp_file_path)
        )

        # Sanitize column names in a single positional rename.
        cleaned_columns = [col.translate(_COLUMN_NAME_TRANSLATION) for col in df.columns]
        if cleaned_columns != df.columns:
            df = df.toDF(*cleaned_columns)
        return df
    finally:
        TEMP_FILES.put(temp_file_path)
|
||
|
||
def process_table_parallel(local_path, table_name, column_mapping, target_columns):
    """Load one Access table and conform it to the target column layout.

    This is the function process_access_files_with_config submits to its
    thread pool.

    Args:
        local_path: Path of the Access database file.
        table_name: Table inside the database to read.
        column_mapping: Optional dict of source column name -> target column name.
        target_columns: Output column names, in output order.

    Returns:
        A Spark DataFrame with exactly ``target_columns``, in that order. Returns
        ``None`` if any step fails; the error is printed rather than raised, so
        one bad table does not abort the others.
    """
    try:
        df = read_table_from_access(local_path, table_name)

        # Rename source columns to target names. withColumnRenamed is a no-op
        # when the source column is absent.
        for old_col, new_col in (column_mapping or {}).items():
            df = df.withColumnRenamed(old_col, new_col)

        # Add missing target columns as *string* NULLs. The CSV columns are all
        # strings, whereas an untyped lit(None) is NullType ("void"). Delta tables
        # do not support void columns, so they break CREATE TABLE ... AS SELECT.
        existing_columns = set(df.columns)
        for col in target_columns:
            if col not in existing_columns:
                df = df.withColumn(col, lit(None).cast("string"))

        # Both audit columns get the same value: current time + 8 hours,
        # formatted as "yyyy-MM-dd HH:mm:ss".
        # NOTE(review): this is China local time only if the Spark session
        # time zone is UTC — confirm.
        etl_ts = date_format(expr("current_timestamp() + INTERVAL 8 HOURS"), "yyyy-MM-dd HH:mm:ss")
        df = df.withColumn("etl_insert_dt", etl_ts).withColumn("etl_update_dt", etl_ts)

        # Reorder to the target layout and drop any unmapped source columns.
        return df.select(target_columns)
    except Exception as e:
        print(f"处理表 {table_name} 时出错: {e}")
        return None
|
||
|
||
def process_access_files_with_config(access_files, config_df, column_mapping):
    """Process configured Access files in parallel and pair each result with its target table.

    For each config row, every path in ``access_files`` that contains the row's
    ``file_name`` as a substring is copied to the catalog's tmp Volume. Each
    table inside that file is then processed on an 8-worker thread pool via
    process_table_parallel.

    Args:
        access_files: Candidate Access file paths.
        config_df: pandas DataFrame with at least ``file_name`` and
            ``table_name`` columns.
        column_mapping: Source -> target column names. Its values, plus the two
            ETL timestamp columns, define the output layout.

    Returns:
        A list of ``(spark_df, target_table_name)`` tuples, one per table that
        was processed successfully. Per-file errors are printed and skipped.
    """
    target_columns = list(column_mapping.values()) + ['etl_insert_dt', 'etl_update_dt']

    # (future, target table) pairs. The target must be captured when the task is
    # submitted. Reading the config loop variable after the loop would pair every
    # result with the *last* config row's table.
    submitted = []

    with ThreadPoolExecutor(max_workers=8) as executor:
        for _, row in config_df.iterrows():
            file_name = row['file_name']
            target_table = row['table_name']

            matching_files = [file for file in access_files if file_name in file]
            if not matching_files:
                print(f"未找到匹配的文件: {file_name}")
                continue

            for file in matching_files:
                try:
                    local_path = download_access_file(file, f"/Volumes/{NGBI_CATALOG}/tmp/volume_tmp/tmp/{os.path.basename(file)}")
                    table_names = list_tables_in_access_file(local_path)
                    for table in table_names:
                        future = executor.submit(process_table_parallel, local_path, table, column_mapping, target_columns)
                        submitted.append((future, target_table))
                except Exception as e:
                    print(f"处理文件 {file} 时出错: {e}")

    # The executor has shut down (all tasks finished). A None result means the
    # table failed and its error was already printed.
    all_dataframes = []
    for future, target_table in submitted:
        result = future.result()
        if result is not None:
            all_dataframes.append((result, target_table))

    return all_dataframes
|
||
|
||
# Select the ADLS "UserUpload" landing root for the current environment.
# ENVIRONMENT, PRD_ENVIRONMENT_VALUE and TEST_ENVIRONMENT_VALUE are not defined in
# this notebook; presumably they come from the %run ../../../Common/config cell.
if ENVIRONMENT == PRD_ENVIRONMENT_VALUE:
    # Production storage account
    access_file_path_template = "abfss://master@azcdatalakeprd.dfs.core.chinacloudapi.cn/ODS/GND/UserUpload/"
elif ENVIRONMENT == TEST_ENVIRONMENT_VALUE:
    # Test storage account
    access_file_path_template = "abfss://master@retaildlstoragetest.dfs.core.chinacloudapi.cn/ODS/GND/UserUpload/"
else:
    # Fail fast with a clear message. Otherwise access_file_path_template would
    # be undefined and only surface later as a confusing NameError.
    raise ValueError(f"Unsupported ENVIRONMENT value: {ENVIRONMENT!r}")
|
||
|
||
def list_latest_blob_files(base_path_template):
    """Return the Access files under today's (UTC+8) ``YYYY/MM/DD/`` sub-folder.

    Args:
        base_path_template: Root path. The date folder is appended directly, so
            the root is expected to end with "/".

    Returns:
        The Access file paths found recursively under
        ``base_path_template + "YYYY/MM/DD/"``. Returns ``[]`` when that folder
        does not exist or contains no Access files.

    Raises:
        Any unexpected error is printed and then re-raised.
    """
    try:
        today_utc8 = datetime.utcnow() + timedelta(hours=8)
        base_path = base_path_template + today_utc8.strftime("%Y/%m/%d/")

        if not path_exists(base_path):
            print(f"指定路径不存在: {base_path}")
            return []

        access_files = list(filter(is_access_file, list_files_recursive(base_path)))
        if access_files:
            return access_files

        print("最新日期目录下未找到 Access 文件。")
        return []
    except Exception as e:
        print(f"列出路径模板 {base_path_template} 中的最新 Blob 文件时出错: {e}")
        raise
|
||
|
||
def path_exists(path):
    """Return whether ``path`` can be listed with ``dbutils.fs.ls``.

    If the listing fails with an error mentioning
    ``java.io.FileNotFoundException``, the path is treated as missing and the
    function returns ``False``. Any other error is printed and re-raised.
    """
    try:
        dbutils.fs.ls(path)
    except Exception as e:
        is_missing = "java.io.FileNotFoundException" in str(e)
        if not is_missing:
            print(f"检查路径 {path} 时出错: {e}")
            raise
        return False
    return True
|
||
|
||
def is_access_file(file_path):
    """Return True if ``file_path`` ends with ``.accdb``, ignoring case.

    Only the ``.accdb`` format is matched; legacy ``.mdb`` files are not.
    """
    normalized = file_path.lower()
    return normalized.endswith(".accdb")
|
||
|
||
def list_files_recursive(path):
    """Recursively collect every file path under ``path`` using ``dbutils.fs.ls``.

    Returns ``[]`` when path_exists reports that ``path`` is missing. Errors are
    printed and re-raised at every recursion level, so a deep failure is
    reported once per enclosing directory.
    """
    try:
        collected = []
        if not path_exists(path):
            return collected
        for entry in dbutils.fs.ls(path):
            if entry.isDir():
                collected += list_files_recursive(entry.path)
            else:
                collected.append(entry.path)
        return collected
    except Exception as e:
        print(f"列出路径 {path} 中的文件时出错: {e}")
        raise
|
||
|
||
# Driver: load today's Access uploads into the per-config DWD tables and into
# dwd.dwd_inc_gnd_ext_xiehe_union_all. The next cell consumes the union table.

# Access files under today's dated folder of the environment's landing root.
latest_access_files = list_latest_blob_files(access_file_path_template)

if latest_access_files:
    try:
        # File -> target-table configuration (built by the first %sql cell).
        config_df = spark.table("dwd.dwd_gnd_xiehe_config_table_mapping").toPandas()

        # Access (source) column name -> DWD (target) column name.
        column_mapping = {
            'AREA': 'area',
            'Date': 'yq',
            '医院级别': 'h_level',
            'DEPT_NAME': 'dept_name',
            '报销': 'reimburse',
            '报销类型': 'reimburse_type',
            '处方来源': 'prescription_source',
            'ATC': 'atc',
            '新code': 'new_code',
            '通用名': 'common_name',
            '商品名称': 'product_name',
            '厂家': 'manu_des',
            '规格': 'pack_des',
            '给药途径': 'drug_delivery_route',
            '剂型': 'nfc',
            '处方张数': 'prescription',
            '取药数量': 'sales_vol',
            '单价': 'price',
            '金额': 'sales_value'
        }

        all_dataframes = process_access_files_with_config(latest_access_files, config_df, column_mapping)

        if all_dataframes:
            # Group the DataFrames by target table, so each table is overwritten
            # once with ALL of its rows. Overwriting once per DataFrame would keep
            # only the last Access table or file mapped to that target.
            frames_by_table = {}
            for df, table_name in all_dataframes:
                frames_by_table.setdefault(table_name, []).append(df)

            combined_spark_df = None
            for table_name, frames in frames_by_table.items():
                table_df = frames[0]
                for extra_df in frames[1:]:
                    table_df = table_df.union(extra_df)

                table_df.createOrReplaceTempView("temp_access_table")
                spark.sql(f"CREATE TABLE IF NOT EXISTS {table_name} AS SELECT * FROM temp_access_table")
                spark.sql(f"INSERT overwrite {table_name} SELECT * FROM temp_access_table")

                # union is positional. All frames share the target_columns order.
                combined_spark_df = table_df if combined_spark_df is None else combined_spark_df.union(table_df)

            # Overwrite the union table with everything loaded in this run.
            combined_spark_df.createOrReplaceTempView("combined_table")
            spark.sql("create table if not exists dwd.dwd_inc_gnd_ext_xiehe_union_all as select * from combined_table")
            spark.sql("insert overwrite dwd.dwd_inc_gnd_ext_xiehe_union_all select * from combined_table")
        else:
            print("Access 文件中没有找到数据。")
    except Exception as e:
        print(f"处理 Access 文件时出错: {e}")
        # Re-raise so the scheduled job is marked failed. Otherwise a load failure
        # would leave the job green with no new data.
        raise
else:
    print("没有 Access 文件需要处理。")
||
|
||
|
||
# COMMAND ----------
|
||
|
||
# MAGIC %sql
|
||
# MAGIC delete from dwd.dwd_gnd_ext_xiehe_raw_data
|
||
# MAGIC where yq in (
|
||
# MAGIC select yq from dwd.dwd_inc_gnd_ext_xiehe_union_all t1
|
||
# MAGIC where left(t1.etl_insert_dt, 10) = left(from_utc_timestamp(current_timestamp(),'UTC+8'),10)
|
||
# MAGIC );
|
||
# MAGIC
|
||
# MAGIC insert into dwd.dwd_gnd_ext_xiehe_raw_data
|
||
# MAGIC (
|
||
# MAGIC area,
|
||
# MAGIC yq,
|
||
# MAGIC h_level,
|
||
# MAGIC dept_name,
|
||
# MAGIC reimburse,
|
||
# MAGIC reimburse_type,
|
||
# MAGIC prescription_source,
|
||
# MAGIC atc,
|
||
# MAGIC new_code,
|
||
# MAGIC common_name,
|
||
# MAGIC product_name,
|
||
# MAGIC manu_des,
|
||
# MAGIC pack_des,
|
||
# MAGIC drug_delivery_route,
|
||
# MAGIC nfc,
|
||
# MAGIC prescription,
|
||
# MAGIC sales_vol,
|
||
# MAGIC price,
|
||
# MAGIC sales_value,
|
||
# MAGIC etl_insert_dt,
|
||
# MAGIC etl_update_dt
|
||
# MAGIC )
|
||
# MAGIC select area,
|
||
# MAGIC yq,
|
||
# MAGIC h_level,
|
||
# MAGIC dept_name,
|
||
# MAGIC reimburse,
|
||
# MAGIC reimburse_type,
|
||
# MAGIC prescription_source,
|
||
# MAGIC atc,
|
||
# MAGIC new_code,
|
||
# MAGIC common_name,
|
||
# MAGIC product_name,
|
||
# MAGIC manu_des,
|
||
# MAGIC pack_des,
|
||
# MAGIC drug_delivery_route,
|
||
# MAGIC nfc,
|
||
# MAGIC prescription,
|
||
# MAGIC sales_vol,
|
||
# MAGIC price,
|
||
# MAGIC sales_value,
|
||
# MAGIC from_utc_timestamp(current_timestamp(),'UTC+8') etl_insert_dt,
|
||
# MAGIC from_utc_timestamp(current_timestamp(),'UTC+8') etl_update_dt
|
||
# MAGIC from dwd.dwd_inc_gnd_ext_xiehe_union_all
|
||
# MAGIC where left(etl_insert_dt, 10) = left(from_utc_timestamp(current_timestamp(),'UTC+8'),10);
|