# Databricks notebook source
# MAGIC %run ../../../Common/config

# COMMAND ----------

# MAGIC %sql
# MAGIC insert overwrite dwd.dwd_gnd_xiehe_config_table_mapping
# MAGIC select id,file_name,
# MAGIC concat('dwd.dwd_gnd_xiehe_', id) as table_name,
# MAGIC from_utc_timestamp(current_timestamp(),'UTC+8') as etl_insert_dt
# MAGIC from dwd.dwd_gnd_xiehe_config_table

# COMMAND ----------

# MAGIC %md
# MAGIC + 20250415 kzzh331 uc_upgrade UC compatibility changes:
# MAGIC     1. Changed the `DBFS` path roots used in the code to `Volume` paths
# MAGIC     2. Moved the `mdb` tool installation from the `session` to the cluster `init-script`; on a `uc` cluster, installing programs may require `admin` permission
# MAGIC     3. Removed the duplicated `access_file_path_template` assignments; the `Blob Container` is now selected automatically from the environment

# COMMAND ----------

# Reference only: mdbtools is installed by the cluster init-script (see the
# changelog cell above). These are the commands that were run per session.
# %sh
# sudo apt-get update
# sudo apt-get install -y apt-utils
# sudo DEBIAN_FRONTEND=noninteractive apt-get install -y mdbtools

# COMMAND ----------

import os
import subprocess
import pandas as pd
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor
from pyspark.sql.functions import expr, date_format, lit
from queue import Queue

# Global queue holding the paths of the temporary CSV files created by the
# worker threads (every export path is pushed here, even when the export fails).
TEMP_FILES = Queue()

# Characters that are replaced by "_" when cleaning exported column names.
_COLUMN_NAME_CLEANUP = str.maketrans({ch: "_" for ch in " ,;{}()\n\t="})


def download_access_file(file_path, local_path):
    """Copy an Access file from storage to ``local_path`` via ``dbutils.fs.cp``.

    Args:
        file_path: Source path of the Access file.
        local_path: Destination path of the copy.

    Returns:
        ``local_path``, unchanged.
    """
    dbutils.fs.cp(file_path, local_path)
    print(f"已下载 {file_path} 到 {local_path}")
    return local_path


def file_exists(local_path):
    """Return True when ``local_path`` exists on the local file system."""
    return os.path.exists(local_path)


def list_tables_in_access_file(local_path):
    """List the table names of an Access file using ``mdb-tables -1``.

    Args:
        local_path: Path of the Access file.

    Returns:
        List of table names (blank names are dropped, so a database without
        tables yields an empty list instead of ``['']``).

    Raises:
        FileNotFoundError: ``local_path`` does not exist.
        RuntimeError: ``mdb-tables`` exited with a non-zero return code.
    """
    if not file_exists(local_path):
        raise FileNotFoundError(f"文件未找到: {local_path}")

    result = subprocess.run(
        ['mdb-tables', '-1', local_path],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    if result.returncode != 0:
        error_message = result.stderr.decode('utf-8')
        raise RuntimeError(f"列出表名时出错: {error_message}")

    raw_names = result.stdout.decode('utf-8').strip().split('\n')
    table_names = [name for name in raw_names if name]
    print(f"{local_path} 中的表: {table_names}")
    return table_names


def read_table_from_access(local_path, table_name):
    """Export one Access table to CSV with ``mdb-export`` and read it with Spark.

    The CSV is written next to ``local_path`` and its path is always pushed to
    ``TEMP_FILES``, whether or not the export succeeded.

    Args:
        local_path: Path of the Access file.
        table_name: Name of the table to export.

    Returns:
        Spark DataFrame of the exported table, with the characters
        `` ,;{}()=``, newline and tab in column names replaced by ``_``.

    Raises:
        RuntimeError: ``mdb-export`` exited with a non-zero return code.
    """
    local_dirname = os.path.dirname(local_path)
    local_filename = os.path.basename(local_path).split('.')[0]
    temp_file_path = f"{local_dirname}/mdb_export_{local_filename}_{table_name}.csv"

    try:
        # The context manager guarantees the CSV is flushed and closed before
        # Spark is pointed at it.
        with open(temp_file_path, "w") as csv_file:
            result = subprocess.run(
                ["mdb-export", local_path, table_name],
                stdout=csv_file,
                stderr=subprocess.PIPE,
            )
        if result.returncode != 0:
            # Without this check a failed export would silently be loaded as
            # an empty table.
            error_message = result.stderr.decode('utf-8', errors='replace')
            raise RuntimeError(f"导出表 {table_name} 时出错: {error_message}")

        df = (
            spark.read.option("header", "true")
            .option("quote", '"')
            .option("escape", '"')
            .option("multiLine", "true")
            .option("mode", "PERMISSIVE")
            .csv(temp_file_path)
        )

        # Clean the column names.
        for column_name in df.columns:
            clean_name = column_name.translate(_COLUMN_NAME_CLEANUP)
            if clean_name != column_name:
                df = df.withColumnRenamed(column_name, clean_name)
        return df
    finally:
        TEMP_FILES.put(temp_file_path)


def process_table_parallel(local_path, table_name, column_mapping, target_columns):
    """Load one Access table and shape it to the target column layout.

    Intended to be submitted to a thread pool; any failure is printed and
    reported as ``None`` rather than raised.

    Args:
        local_path: Path of the Access file.
        table_name: Name of the Access table to read.
        column_mapping: Dict of source column name -> target column name.
        target_columns: Ordered list of columns of the resulting DataFrame.

    Returns:
        Spark DataFrame restricted to ``target_columns``, or ``None`` on error.
    """
    try:
        df = read_table_from_access(local_path, table_name)

        # Map source column names to target column names.
        if column_mapping:
            for old_col, new_col in column_mapping.items():
                df = df.withColumnRenamed(old_col, new_col)

        # Add any missing target column as NULL so all tables line up.
        for column_name in target_columns:
            if column_name not in df.columns:
                df = df.withColumn(column_name, lit(None))

        # ETL audit columns: current timestamp shifted by +8 hours, as a string.
        # NOTE(review): this is UTC+8 only if the session time zone is UTC - confirm.
        etl_timestamp = date_format(
            expr("current_timestamp() + INTERVAL 8 HOURS"), "yyyy-MM-dd HH:mm:ss"
        )
        df = df.withColumn("etl_insert_dt", etl_timestamp)
        df = df.withColumn("etl_update_dt", etl_timestamp)

        # Reorder the columns.
        return df.select(target_columns)
    except Exception as e:
        print(f"处理表 {table_name} 时出错: {e}")
        return None


def process_access_files_with_config(access_files, config_df, column_mapping):
    """Convert the Access files named in the config into Spark DataFrames.

    For every config row, each file whose path contains the row's
    ``file_name`` is downloaded and every table inside it is processed on a
    thread pool.

    Args:
        access_files: List of Access file paths.
        config_df: pandas DataFrame with ``file_name`` and ``table_name`` columns.
        column_mapping: Dict of source column name -> target column name.

    Returns:
        List of ``(spark_dataframe, target_table_name)`` tuples, where the
        target table is the ``table_name`` of the config row that matched the
        file the DataFrame was read from.
    """
    all_dataframes = []

    # Columns of the target table.
    target_columns = list(column_mapping.values()) + ['etl_insert_dt', 'etl_update_dt']

    with ThreadPoolExecutor(max_workers=8) as executor:
        # Each future is stored together with its own target table; pairing
        # results with the loop variable after the loop would tag every
        # DataFrame with the last config row's table.
        submitted = []

        for _, row in config_df.iterrows():
            file_name = row['file_name']
            table_name = row['table_name']

            # Find the files matching this config row.
            matching_files = [file for file in access_files if file_name in file]
            if not matching_files:
                print(f"未找到匹配的文件: {file_name}")
                continue

            for file in matching_files:
                try:
                    local_path = download_access_file(
                        file,
                        f"/Volumes/{NGBI_CATALOG}/tmp/volume_tmp/tmp/{os.path.basename(file)}",
                    )
                    table_names = list_tables_in_access_file(local_path)
                    for table in table_names:
                        future = executor.submit(
                            process_table_parallel,
                            local_path,
                            table,
                            column_mapping,
                            target_columns,
                        )
                        submitted.append((future, table_name))
                except Exception as e:
                    # Best effort: a broken file must not stop the other files.
                    print(f"处理文件 {file} 时出错: {e}")
                    continue

        for future, target_table in submitted:
            result = future.result()
            if result is not None:
                all_dataframes.append((result, target_table))

    return all_dataframes


# Select the storage container from the environment provided by Common/config.
if ENVIRONMENT == PRD_ENVIRONMENT_VALUE:
    access_file_path_template = "abfss://master@azcdatalakeprd.dfs.core.chinacloudapi.cn/ODS/GND/UserUpload/"
elif ENVIRONMENT == TEST_ENVIRONMENT_VALUE:
    access_file_path_template = "abfss://master@retaildlstoragetest.dfs.core.chinacloudapi.cn/ODS/GND/UserUpload/"
else:
    # Fail here with a clear message instead of a NameError further down.
    raise ValueError(f"不支持的 ENVIRONMENT 值: {ENVIRONMENT!r}")


def list_latest_blob_files(base_path_template):
    """List the Access files under today's ``YYYY/MM/DD/`` directory.

    "Today" is the current UTC time plus 8 hours.

    Args:
        base_path_template: Base path to which the date directory is appended.

    Returns:
        List of ``.accdb`` file paths; empty when the directory does not exist
        or contains no Access file.

    Raises:
        Exception: Any listing error is printed and re-raised.
    """
    try:
        current_date = datetime.utcnow() + timedelta(hours=8)
        base_path = base_path_template + current_date.strftime("%Y/%m/%d/")

        if not path_exists(base_path):
            print(f"指定路径不存在: {base_path}")
            return []

        access_files = [
            file for file in list_files_recursive(base_path) if is_access_file(file)
        ]
        if not access_files:
            print("最新日期目录下未找到 Access 文件。")
            return []
        return access_files
    except Exception as e:
        print(f"列出路径模板 {base_path_template} 中的最新 Blob 文件时出错: {e}")
        raise


def path_exists(path):
    """Return True when ``dbutils.fs.ls(path)`` succeeds.

    Returns False when the failure message contains
    ``java.io.FileNotFoundException``; any other failure is printed and
    re-raised.
    """
    try:
        dbutils.fs.ls(path)
        return True
    except Exception as e:
        if "java.io.FileNotFoundException" in str(e):
            return False
        print(f"检查路径 {path} 时出错: {e}")
        raise


def is_access_file(file_path):
    """Return True when ``file_path`` ends with ``.accdb`` (case-insensitive)."""
    return file_path.lower().endswith('.accdb')


def list_files_recursive(path):
    """Recursively collect the paths of all files below ``path``.

    Args:
        path: Directory to walk with ``dbutils.fs.ls``.

    Returns:
        List of file paths; empty when ``path`` does not exist.

    Raises:
        Exception: Any listing error is printed and re-raised.
    """
    try:
        files = []
        if path_exists(path):
            for file_info in dbutils.fs.ls(path):
                if file_info.isDir():
                    files.extend(list_files_recursive(file_info.path))
                else:
                    files.append(file_info.path)
        return files
    except Exception as e:
        print(f"列出路径 {path} 中的文件时出错: {e}")
        raise


# List the Access files in the latest date directory.
latest_access_files = list_latest_blob_files(access_file_path_template)

if latest_access_files:
    try:
        # Read the file -> table mapping from the config table.
        config_df = spark.table("dwd.dwd_gnd_xiehe_config_table_mapping").toPandas()

        # Source column -> target column mapping.
        column_mapping = {
            'AREA': 'area',
            'Date': 'yq',
            '医院级别': 'h_level',
            'DEPT_NAME': 'dept_name',
            '报销': 'reimburse',
            '报销类型': 'reimburse_type',
            '处方来源': 'prescription_source',
            'ATC': 'atc',
            '新code': 'new_code',
            '通用名': 'common_name',
            '商品名称': 'product_name',
            '厂家': 'manu_des',
            '规格': 'pack_des',
            '给药途径': 'drug_delivery_route',
            '剂型': 'nfc',
            '处方张数': 'prescription',
            '取药数量': 'sales_vol',
            '单价': 'price',
            '金额': 'sales_value',
        }

        # Process the latest Access files.
        all_dataframes = process_access_files_with_config(
            latest_access_files, config_df, column_mapping
        )

        if all_dataframes:
            combined_spark_df = None
            for df, table_name in all_dataframes:
                # Write each DataFrame to its own target table.
                df.createOrReplaceTempView("temp_access_table")
                spark.sql(f"CREATE TABLE IF NOT EXISTS {table_name} AS SELECT * FROM temp_access_table")
                spark.sql(f"INSERT overwrite {table_name} SELECT * FROM temp_access_table")

                if combined_spark_df is None:
                    combined_spark_df = df
                else:
                    combined_spark_df = combined_spark_df.union(df)

            # Merge everything that was written into one result table.
            if combined_spark_df is not None:
                combined_spark_df.createOrReplaceTempView("combined_table")
                spark.sql("create table if not exists dwd.dwd_inc_gnd_ext_xiehe_union_all as select * from combined_table")
                spark.sql("insert overwrite dwd.dwd_inc_gnd_ext_xiehe_union_all select * from combined_table")
        else:
            print("Access 文件中没有找到数据。")
    except Exception as e:
        # NOTE(review): the error is only printed, so the SQL cell below still
        # runs after a failed load - confirm this best-effort is intended.
        print(f"处理 Access 文件时出错: {e}")
else:
    print("没有 Access 文件需要处理。")

# COMMAND ----------

# MAGIC %sql
# MAGIC delete from dwd.dwd_gnd_ext_xiehe_raw_data
# MAGIC where yq in (
# MAGIC   select yq from dwd.dwd_inc_gnd_ext_xiehe_union_all t1
# MAGIC   where left(t1.etl_insert_dt, 10) = left(from_utc_timestamp(current_timestamp(),'UTC+8'),10)
# MAGIC );
# MAGIC
# MAGIC insert into dwd.dwd_gnd_ext_xiehe_raw_data
# MAGIC (
# MAGIC   area,
# MAGIC   yq,
# MAGIC   h_level,
# MAGIC   dept_name,
# MAGIC   reimburse,
# MAGIC   reimburse_type,
# MAGIC   prescription_source,
# MAGIC   atc,
# MAGIC   new_code,
# MAGIC   common_name,
# MAGIC   product_name,
# MAGIC   manu_des,
# MAGIC   pack_des,
# MAGIC   drug_delivery_route,
# MAGIC   nfc,
# MAGIC   prescription,
# MAGIC   sales_vol,
# MAGIC   price,
# MAGIC   sales_value,
# MAGIC   etl_insert_dt,
# MAGIC   etl_update_dt
# MAGIC )
# MAGIC select area,
# MAGIC   yq,
# MAGIC   h_level,
# MAGIC   dept_name,
# MAGIC   reimburse,
# MAGIC   reimburse_type,
# MAGIC   prescription_source,
# MAGIC   atc,
# MAGIC   new_code,
# MAGIC   common_name,
# MAGIC   product_name,
# MAGIC   manu_des,
# MAGIC   pack_des,
# MAGIC   drug_delivery_route,
# MAGIC   nfc,
# MAGIC   prescription,
# MAGIC   sales_vol,
# MAGIC   price,
# MAGIC   sales_value,
# MAGIC   from_utc_timestamp(current_timestamp(),'UTC+8') etl_insert_dt,
# MAGIC   from_utc_timestamp(current_timestamp(),'UTC+8') etl_update_dt
# MAGIC from dwd.dwd_inc_gnd_ext_xiehe_union_all
# MAGIC where left(etl_insert_dt, 10) = left(from_utc_timestamp(current_timestamp(),'UTC+8'),10);