-- Databricks notebook source
-- MAGIC %md
-- MAGIC ### Load the Pharbers province fact CSV files from blob storage into the Pharbers fact table

-- COMMAND ----------

-- Source: CSV files uploaded to blob storage
-- Target table: DWD.DWD_GND_PHARBERS_PROV_FACT

-- COMMAND ----------

-- MAGIC %run ../../../Common/config

-- COMMAND ----------

-- MAGIC %python
-- MAGIC from datetime import datetime, timedelta, timezone
-- MAGIC import pandas as pd

-- COMMAND ----------

-- MAGIC %python
-- MAGIC # Root folder of user uploads for the current environment. ENVIRONMENT and the
-- MAGIC # *_ENVIRONMENT_VALUE constants are presumably provided by the %run config cell above.
-- MAGIC if ENVIRONMENT == PRD_ENVIRONMENT_VALUE:
-- MAGIC     factsales_file_path_template = "abfss://master@azcdatalakeprd.dfs.core.chinacloudapi.cn/ODS/GND/UserUpload/"
-- MAGIC elif ENVIRONMENT == TEST_ENVIRONMENT_VALUE:
-- MAGIC     factsales_file_path_template = "abfss://master@retaildlstoragetest.dfs.core.chinacloudapi.cn/ODS/GND/UserUpload/"
-- MAGIC else:
-- MAGIC     # Fail fast here instead of with a NameError on factsales_file_path_template in the next cell.
-- MAGIC     raise ValueError(f"Unsupported ENVIRONMENT: {ENVIRONMENT!r}")

-- COMMAND ----------

-- MAGIC %python
-- MAGIC # Today's upload folder, e.g. <root>/2024/05/31/. "Today" is evaluated at UTC+8,
-- MAGIC # independent of the cluster's local time zone.
-- MAGIC current_date = datetime.now(timezone(timedelta(hours=8)))
-- MAGIC date_path = current_date.strftime("%Y/%m/%d/")
-- MAGIC base_path = factsales_file_path_template + date_path

-- COMMAND ----------

-- MAGIC %python
-- MAGIC def path_exists(path):
-- MAGIC     """Return True if `path` can be listed, False if it does not exist.
-- MAGIC
-- MAGIC     Any other listing error is printed and re-raised.
-- MAGIC     """
-- MAGIC     try:
-- MAGIC         dbutils.fs.ls(path)
-- MAGIC         return True
-- MAGIC     except Exception as e:
-- MAGIC         if "java.io.FileNotFoundException" in str(e):
-- MAGIC             return False
-- MAGIC         print(f"检查路径 {path} 时出错: {e}")
-- MAGIC         raise

-- COMMAND ----------

-- MAGIC %python
-- MAGIC def list_file_name(path):
-- MAGIC     """List every entry found one level below the entries of `path`.
-- MAGIC
-- MAGIC     Presumably the layout is <path>/<upload folder>/<file> (TODO confirm). Returns the
-- MAGIC     FileInfo objects (.path, .name, .modificationTime) of all entries inside each
-- MAGIC     sub-folder; empty sub-folders contribute nothing instead of raising IndexError,
-- MAGIC     and folders holding several files no longer lose all but the first one.
-- MAGIC     """
-- MAGIC     files = []
-- MAGIC     for entry in dbutils.fs.ls(path):
-- MAGIC         files.extend(dbutils.fs.ls(entry.path))
-- MAGIC     return files

-- COMMAND ----------

-- MAGIC %python
-- MAGIC def download_file(file_path, local_path):
-- MAGIC     """Copy `file_path` to `local_path` (e.g. a /Volumes/... path) and return `local_path`."""
-- MAGIC     dbutils.fs.cp(file_path, local_path)
-- MAGIC     print(f"已下载 {file_path} 到 {local_path}")
-- MAGIC     return local_path

-- COMMAND ----------

-- MAGIC %md
-- MAGIC ### List the files under today's upload folder
-- MAGIC - and keep the paths of the files that match the expected name pattern

-- COMMAND ----------

-- MAGIC %python
-- MAGIC # Collect today's uploaded files into a pandas DataFrame. A missing folder yields an empty
-- MAGIC # frame (with the expected columns) instead of an undefined variable; any other listing
-- MAGIC # error propagates and fails the run before the target table is touched.
-- MAGIC files_df = pd.DataFrame(columns=['path', 'modificationtime', 'name'])
-- MAGIC if path_exists(base_path):
-- MAGIC     files_df = pd.DataFrame(
-- MAGIC         [{'path': f.path, 'modificationtime': f.modificationTime, 'name': f.name}
-- MAGIC          for f in list_file_name(base_path)],
-- MAGIC         columns=['path', 'modificationtime', 'name'],
-- MAGIC     )
-- MAGIC     print(f"{base_path} 路径存在")
-- MAGIC else:
-- MAGIC     print(f"{base_path} 路径不存在")

-- COMMAND ----------

-- MAGIC %python
-- MAGIC # Keep only the most recently modified copy of each file name, then only Pharbers_PROV_Fact*.csv.
-- MAGIC files_df = files_df.sort_values('modificationtime', ascending=False).drop_duplicates('name').sort_index()
-- MAGIC files_df = files_df[files_df['name'].str.match(r'^Pharbers_PROV_Fact.*\.csv$', na=False)]
-- MAGIC display(files_df)

-- COMMAND ----------

-- MAGIC %md
-- MAGIC ### Read the file contents

-- COMMAND ----------

-- MAGIC %python
-- MAGIC import os

-- COMMAND ----------

-- MAGIC %python
-- MAGIC file_paths = files_df['path'].tolist()
-- MAGIC if not file_paths:
-- MAGIC     # Nothing uploaded today: end the run here so the final INSERT OVERWRITE does not reload
-- MAGIC     # whatever an earlier run left in tmp.tmp_chpa_raw_data into the DWD table.
-- MAGIC     # (Called outside any try/except so the exit is not swallowed.)
-- MAGIC     print('没有符合条件的文件')
-- MAGIC     dbutils.notebook.exit('没有符合条件的文件')
-- MAGIC
-- MAGIC # Copy each file to the volume and read it with Spark (header row, quoted multi-line fields).
-- MAGIC df_all = []
-- MAGIC for file in file_paths:
-- MAGIC     local_path = download_file(file, f"/Volumes/{NGBI_CATALOG}/tmp/volume_tmp/tmp/{os.path.basename(file)}")
-- MAGIC     file_df = (
-- MAGIC         spark.read
-- MAGIC         .option("header", "true")
-- MAGIC         .option("quote", '"')
-- MAGIC         .option("escape", '"')
-- MAGIC         .option("multiLine", "true")
-- MAGIC         .option("mode", "PERMISSIVE")
-- MAGIC         .csv(local_path)
-- MAGIC         .drop("TA", 'Market')
-- MAGIC     )
-- MAGIC     print(f'已读取{local_path}')
-- MAGIC     df_all.append(file_df)

-- COMMAND ----------

-- MAGIC %python
-- MAGIC from functools import reduce
-- MAGIC
-- MAGIC def _rename_source_columns(df):
-- MAGIC     """Return `df` with the Chinese/dotted source headers renamed to staging column names.
-- MAGIC
-- MAGIC     withColumnRenamed returns a new DataFrame (and is a no-op for absent columns), so the
-- MAGIC     result must be used rather than discarded.
-- MAGIC     """
-- MAGIC     return (
-- MAGIC         df.withColumnRenamed('IMS.药品ID', 'IMS_DRUG_ID')
-- MAGIC         .withColumnRenamed('是否法伯编码', 'IS_HOSP_CODE')
-- MAGIC         .withColumnRenamed('规格', 'SPEC')
-- MAGIC         .withColumnRenamed('转换比', 'CONVERSION_RATIO')
-- MAGIC         .withColumnRenamed('剂型', 'DOSAGE_FORM')
-- MAGIC         .withColumnRenamed('价格', 'PRICE')
-- MAGIC     )
-- MAGIC
-- MAGIC # Stage all files in a single INSERT OVERWRITE so a failure cannot leave the staging table
-- MAGIC # half-loaded. Both union and INSERT ... SELECT * match columns by position, so every file
-- MAGIC # must have the staging table's column order.
-- MAGIC fact_sales = reduce(lambda left, right: left.union(right), [_rename_source_columns(df) for df in df_all])
-- MAGIC fact_sales.createOrReplaceTempView('fact_sales')
-- MAGIC spark.sql("INSERT OVERWRITE tmp.tmp_chpa_raw_data SELECT * FROM fact_sales")
-- MAGIC print(f'已写入{len(df_all)}个文件')

-- COMMAND ----------

-- MAGIC %md
-- MAGIC ### Write the staged rows into the target table

-- COMMAND ----------

-- Full refresh: replace the whole target table with the staged rows.
-- Columns are inserted by position, so this select list must follow the target table's column order.
-- etl_insert_dt: current_timestamp() shifted to UTC+8 (assumes the session time zone is UTC - confirm).
insert overwrite dwd.dwd_gnd_pharbers_prov_fact
select
    year,
    ym,
    province_c,
    ims_drug_id,
    is_hosp_code,
    prod_corp,
    prod_cod,
    pack_cod,
    phcd,
    prod_des,
    cmps_des,
    corp_des,
    mnfl_cod,
    prod_des_c,
    cmps_c,
    corp_des_c,
    pack_des,
    spec,
    conversion_ratio,
    dosage_form,
    atc4_cod,
    app1_cod,
    app1_des,
    app1_des_c,
    app2_cod,
    app2_des,
    app2_des_c,
    app3_cod,
    app3_des,
    app3_des_c,
    vbp_batch,
    vbp,
    value,
    totalunit,
    countingunit,
    price,
    manu_des,
    manu_des_c,
    null as source_file_path,
    null as source_file_name,
    from_utc_timestamp(current_timestamp(), 'UTC+8') as etl_insert_dt
from tmp.tmp_chpa_raw_data