# Databricks notebook source
# MAGIC %md
# MAGIC ### Read CSV files from blob storage as the xiehe fact table

# COMMAND ----------

# MAGIC %run ../../../Common/config

# COMMAND ----------

import os
from datetime import datetime, timedelta, timezone

import pandas as pd

# COMMAND ----------

# Root folder of the user-uploaded files; the storage account depends on the
# environment. ENVIRONMENT and the *_ENVIRONMENT_VALUE constants are not
# defined in this notebook (presumably they come from the %run'd config).
# NOTE(review): there is no branch for other environments, so
# factsales_file_path_template may be undefined there - confirm this is intended.
if ENVIRONMENT == PRD_ENVIRONMENT_VALUE:
    factsales_file_path_template = "abfss://master@azcdatalakeprd.dfs.core.chinacloudapi.cn/ODS/GND/UserUpload/"
elif ENVIRONMENT == TEST_ENVIRONMENT_VALUE:
    factsales_file_path_template = "abfss://master@retaildlstoragetest.dfs.core.chinacloudapi.cn/ODS/GND/UserUpload/"

# COMMAND ----------

# Build today's folder path from the current date in UTC+8.
# datetime.utcnow() is deprecated; an aware "now" in a fixed +08:00 zone
# formats to exactly the same yyyy/mm/dd/ string.
current_date = datetime.now(timezone(timedelta(hours=8)))
date_path = current_date.strftime("%Y/%m/%d/")
base_path = factsales_file_path_template + date_path

# COMMAND ----------

def path_exists(path):
    """Return True if ``path`` can be listed, False if it does not exist.

    A failure whose message contains ``java.io.FileNotFoundException`` is
    treated as "path missing". Any other failure is printed and re-raised.
    """
    try:
        dbutils.fs.ls(path)
    except Exception as e:
        if "java.io.FileNotFoundException" in str(e):
            return False
        print(f"检查路径 {path} 时出错: {e}")
        raise
    return True

# COMMAND ----------

def list_file_name(path):
    """Return the first listing entry of every child of ``path``.

    Each child returned by ``dbutils.fs.ls(path)`` is listed in turn and only
    its first entry is kept. Children whose listing is empty are skipped
    (indexing ``[0]`` on them would raise IndexError).
    """
    first_entries = []
    for child in dbutils.fs.ls(path):
        entries = dbutils.fs.ls(child.path)
        if entries:
            first_entries.append(entries[0])
    return first_entries

# COMMAND ----------

def download_file(file_path, local_path):
    """Copy ``file_path`` to ``local_path`` via ``dbutils.fs.cp``.

    Prints a confirmation message and returns ``local_path``.
    """
    dbutils.fs.cp(file_path, local_path)
    print(f"已下载 {file_path} 到 {local_path}")
    return local_path

# COMMAND ----------

# MAGIC %md
# MAGIC ### Get the file names under the path
# MAGIC - and pick the file paths that match the criteria

# COMMAND ----------

FILE_COLUMNS = ['path', 'modificationtime', 'name']

# Start from an empty frame so the following cells always see a defined
# files_df, even when the folder is missing or the listing fails.
files_df = pd.DataFrame(columns=FILE_COLUMNS)
try:
    if path_exists(base_path):
        all_file_list = list_file_name(base_path)
        # Collect the listing into a pandas frame so it can be filtered below.
        files_df = pd.DataFrame(
            [
                {'path': f.path, 'modificationtime': f.modificationTime, 'name': f.name}
                for f in all_file_list
            ],
            columns=FILE_COLUMNS,
        )
        print(f"{base_path} 路径存在")
    else:
        print(f"{base_path} 路径不存在")
except Exception as e:
    print(e)

# COMMAND ----------

try:
    if not files_df.empty:
        # Keep only the most recently modified entry per file name, restore
        # the original listing order, then keep the Dept_Fact*.csv files.
        files_df = files_df.sort_values('modificationtime', ascending=False).drop_duplicates('name').sort_index()
        files_df = files_df[files_df['name'].str.match(r'^Dept_Fact.*\.csv$')]
except Exception as e:
    print(e)

# COMMAND ----------

# MAGIC %md
# MAGIC ### Read the file contents

# COMMAND ----------

# Copy each selected file into the volume and open it as a Spark DataFrame.
# df_ifexists only becomes True when every selected file was read, so a
# failure part-way through leaves the load step below disabled.
df_all = []
df_ifexists = False
try:
    selected_paths = files_df['path'].tolist()
    if selected_paths:
        for file in selected_paths:
            local_path = download_file(file, f"/Volumes/{NGBI_CATALOG}/tmp/volume_tmp/tmp/{os.path.basename(file)}")
            file_df = (
                spark.read
                .option("header", "true")
                .option("quote", '"')
                .option("escape", '"')
                .option("multiLine", "true")
                .option("mode", "PERMISSIVE")
                .csv(local_path)
            )
            print(f'已读取{local_path}')
            df_all.append(file_df)
        df_ifexists = True
    else:
        print('没有符合条件的文件')
except Exception as e:
    print(e)

# COMMAND ----------

# CSV header -> column name used in tmp.tmp_xiehe_raw_data. Headers that
# already match their target name (ATC, PackSize, PackageType) need no entry.
# NOTE(review): '给药途径' -> nfc and '药品剂型' -> drug_delivery_route look
# like they might be swapped - kept as in the original, confirm with the owner.
COLUMN_RENAMES = {
    '区域': 'area',
    '城市': 'city',
    '年&季度': 'yyyyqq',
    '月': 'yyyymm',
    '医保类型': 'reimburse',
    '处方来源': 'prescription_source',
    '处方科室_lv1': 'prescription_dept_lv1',
    '处方科室_lv2': 'prescription_dept_lv2',
    '处方科室_lv3': 'prescription_dept_lv3',
    'PHCD标准码': 'new_code',
    '药品通用名': 'common_name',
    '药品商品名': 'product_name',
    '规格': 'pack_des',
    '给药途径': 'nfc',
    '药品厂家': 'manu_des',
    '药品剂型': 'drug_delivery_route',
    '处方张数': 'prescription',
    '取药数量': 'sales_vol',
    '处方金额': 'sales_value',
}

# Columns inserted into tmp.tmp_xiehe_raw_data, in insert order.
TARGET_COLUMNS = [
    'area', 'city', 'yyyyqq', 'yyyymm', 'reimburse', 'prescription_source',
    'prescription_dept_lv1', 'prescription_dept_lv2', 'prescription_dept_lv3',
    'ATC', 'new_code', 'common_name', 'product_name', 'pack_des', 'PackSize',
    'PackageType', 'nfc', 'manu_des', 'drug_delivery_route', 'prescription',
    'sales_vol', 'sales_value',
]

if df_ifexists:
    # Step 1: rename and project every frame BEFORE touching the tmp table,
    # so a file with a missing column is reported while the table is intact.
    prepared_frames = []
    try:
        for raw_df in df_all:
            renamed_df = raw_df
            for source_name, target_name in COLUMN_RENAMES.items():
                renamed_df = renamed_df.withColumnRenamed(source_name, target_name)
            projected_df = renamed_df.select(*TARGET_COLUMNS)
            # Reading .columns makes sure the projection is resolved here
            # (presumably needed on lazily-analyzed sessions such as Spark
            # Connect; harmless otherwise).
            projected_df.columns
            prepared_frames.append(projected_df)
    except Exception as e:
        print(e)
        prepared_frames = []

    # Step 2: replace the tmp table content with the prepared frames.
    if prepared_frames:
        try:
            spark.sql("TRUNCATE table tmp.tmp_xiehe_raw_data")
            insert_sql = f"INSERT into tmp.tmp_xiehe_raw_data SELECT {','.join(TARGET_COLUMNS)} FROM fact_sales"
            for num, frame in enumerate(prepared_frames, start=1):
                frame.createOrReplaceTempView('fact_sales')
                spark.sql(insert_sql)
                print(f'第{num}个')
        except Exception as e:
            # The tmp table may now be empty or partially loaded. Re-raise so
            # the overwrite cell below does not publish it to the dwd table.
            print(e)
            raise

# COMMAND ----------

# MAGIC %md
# MAGIC ### Write the loaded data into the target table

# COMMAND ----------

# MAGIC %sql
# MAGIC -- Full overwrite
# MAGIC insert overwrite dwd.dwd_gnd_ext_xiehe_raw_data
# MAGIC select
# MAGIC area ,
# MAGIC city ,
# MAGIC yyyyqq ,
# MAGIC yyyymm ,
# MAGIC null h_level ,
# MAGIC reimburse ,
# MAGIC prescription_source ,
# MAGIC prescription_dept_lv1 ,
# MAGIC prescription_dept_lv2 ,
# MAGIC prescription_dept_lv3 ,
# MAGIC ATC ,
# MAGIC new_code ,
# MAGIC common_name ,
# MAGIC product_name ,
# MAGIC pack_des ,
# MAGIC cast ( PackSize as BIGINT) PackSize ,
# MAGIC PackageType ,
# MAGIC nfc ,
# MAGIC manu_des ,
# MAGIC drug_delivery_route,
# MAGIC cast ( prescription as BIGINT) prescription ,
# MAGIC cast ( sales_vol as DECIMAL(38,8)) sales_vol,
# MAGIC cast ( sales_value as DECIMAL(38,8)) sales_value ,
# MAGIC from_utc_timestamp(current_timestamp(),'UTC+8') etl_insert_dt,
# MAGIC from_utc_timestamp(current_timestamp(),'UTC+8') etl_update_dt
# MAGIC from tmp.tmp_xiehe_raw_data