210 lines
6.2 KiB
SQL
210 lines
6.2 KiB
SQL
-- Databricks notebook source
|
||
-- MAGIC %md
|
||
-- MAGIC ### 从blob读取csv文件作为CHPA法伯的事实表
|
||
|
||
-- COMMAND ----------
|
||
|
||
-- 源表:文件
|
||
-- 目标表:DWD.DWD_GND_PHARBERS_PROV_FACT
|
||
|
||
-- COMMAND ----------
|
||
|
||
-- MAGIC %run ../../../Common/config
|
||
|
||
-- COMMAND ----------
|
||
|
||
-- MAGIC %python
|
||
-- MAGIC from datetime import datetime, timedelta
|
||
-- MAGIC import pandas as pd
|
||
|
||
-- COMMAND ----------
|
||
|
||
-- MAGIC %python
|
||
-- MAGIC # Pick the upload root folder for the current environment.
-- MAGIC # ENVIRONMENT and the *_ENVIRONMENT_VALUE constants are not defined in this notebook;
-- MAGIC # presumably they come from the %run of Common/config above (TODO confirm).
-- MAGIC if ENVIRONMENT == PRD_ENVIRONMENT_VALUE:
-- MAGIC     factsales_file_path_template = "abfss://master@azcdatalakeprd.dfs.core.chinacloudapi.cn/ODS/GND/UserUpload/"
-- MAGIC elif ENVIRONMENT == TEST_ENVIRONMENT_VALUE:
-- MAGIC     factsales_file_path_template = "abfss://master@retaildlstoragetest.dfs.core.chinacloudapi.cn/ODS/GND/UserUpload/"
-- MAGIC else:
-- MAGIC     # Fail here with a clear message instead of a NameError when the path is first used.
-- MAGIC     raise ValueError(f"Unsupported ENVIRONMENT value: {ENVIRONMENT!r}")
|
||
|
||
-- COMMAND ----------
|
||
|
||
-- MAGIC %python
|
||
-- MAGIC # Build today's folder path: current UTC time shifted by +8 hours, formatted as yyyy/mm/dd/.
-- MAGIC current_date = datetime.utcnow() + timedelta(hours=8)
-- MAGIC date_path = f"{current_date:%Y/%m/%d/}"
-- MAGIC base_path = f"{factsales_file_path_template}{date_path}"
|
||
|
||
-- COMMAND ----------
|
||
|
||
-- MAGIC %python
|
||
-- MAGIC # Check whether a path exists
-- MAGIC def path_exists(path):
-- MAGIC     """Return True when `path` can be listed with dbutils.fs.ls, False when it is missing.
-- MAGIC
-- MAGIC     A missing path is recognised by the text "java.io.FileNotFoundException" in the
-- MAGIC     error message. Any other error is printed and re-raised to the caller.
-- MAGIC     """
-- MAGIC     try:
-- MAGIC         dbutils.fs.ls(path)
-- MAGIC     except Exception as e:
-- MAGIC         if "java.io.FileNotFoundException" not in str(e):
-- MAGIC             print(f"检查路径 {path} 时出错: {e}")
-- MAGIC             raise
-- MAGIC         return False
-- MAGIC     return True
|
||
|
||
-- COMMAND ----------
|
||
|
||
-- MAGIC %python
|
||
-- MAGIC # List the files found under the blob path
-- MAGIC def list_file_name(path):
-- MAGIC     """Return the first dbutils.fs.ls entry found under each direct child of `path`.
-- MAGIC
-- MAGIC     Each child of `path` is listed in turn and only its first entry is kept.
-- MAGIC     Children whose listing is empty are skipped, so one empty sub-folder no longer
-- MAGIC     raises IndexError and aborts the whole listing.
-- MAGIC     """
-- MAGIC     first_entries = []
-- MAGIC     for child in dbutils.fs.ls(path):
-- MAGIC         entries = dbutils.fs.ls(child.path)
-- MAGIC         if entries:
-- MAGIC             first_entries.append(entries[0])
-- MAGIC     return first_entries
|
||
|
||
-- COMMAND ----------
|
||
|
||
-- MAGIC %python
|
||
-- MAGIC # Copy a file from blob storage to a local path
-- MAGIC def download_file(file_path, local_path):
-- MAGIC     """Copy `file_path` to `local_path` with dbutils.fs.cp, log it, and return `local_path`."""
-- MAGIC     dbutils.fs.cp(file_path, local_path)
-- MAGIC     message = f"已下载 {file_path} 到 {local_path}"
-- MAGIC     print(message)
-- MAGIC     return local_path
|
||
|
||
-- COMMAND ----------
|
||
|
||
-- MAGIC %md
|
||
-- MAGIC ### 获取路径下的文件名称
|
||
-- MAGIC - 并挑出符合条件的文件路径
|
||
|
||
-- COMMAND ----------
|
||
|
||
-- MAGIC %python
|
||
-- MAGIC # Collect today's uploaded files into a pandas DataFrame for filtering.
-- MAGIC # files_df starts as an empty frame that already has the expected columns, so the
-- MAGIC # following cells still work when the folder is missing, empty, or the listing fails.
-- MAGIC file_columns = ['path', 'modificationtime', 'name']
-- MAGIC files_df = pd.DataFrame(columns=file_columns)
-- MAGIC try:
-- MAGIC     if path_exists(base_path):
-- MAGIC         all_file_list = list_file_name(base_path)
-- MAGIC         # Passing columns explicitly keeps the schema even when the listing is empty.
-- MAGIC         files_df = pd.DataFrame(
-- MAGIC             [
-- MAGIC                 {
-- MAGIC                     'path': f.path,
-- MAGIC                     'modificationtime': f.modificationTime,
-- MAGIC                     'name': f.name,
-- MAGIC                 }
-- MAGIC                 for f in all_file_list
-- MAGIC             ],
-- MAGIC             columns=file_columns,
-- MAGIC         )
-- MAGIC         print(f"{base_path} 路径存在")
-- MAGIC     else:
-- MAGIC         print(f"{base_path} 路径不存在")
-- MAGIC except Exception as e:
-- MAGIC     print(e)
|
||
|
||
-- COMMAND ----------
|
||
|
||
-- MAGIC %python
|
||
-- MAGIC # Keep the most recently modified entry per file name (original row order restored),
-- MAGIC # then keep only names matching Pharbers_PROV_Fact*.csv.
-- MAGIC try:
-- MAGIC     newest_first = files_df.sort_values('modificationtime', ascending=False)
-- MAGIC     files_df = newest_first.drop_duplicates('name').sort_index()
-- MAGIC     is_fact_csv = files_df['name'].str.match(r'^Pharbers_PROV_Fact.*\.csv$')
-- MAGIC     files_df = files_df[is_fact_csv]
-- MAGIC except Exception as e:
-- MAGIC     print(e)
|
||
|
||
-- COMMAND ----------
|
||
|
||
-- MAGIC %md
|
||
-- MAGIC ### 读取文件内容
|
||
|
||
-- COMMAND ----------
|
||
|
||
-- MAGIC %python
|
||
-- MAGIC import os
|
||
|
||
-- COMMAND ----------
|
||
|
||
-- MAGIC %python
|
||
-- MAGIC # Copy each selected file to the tmp volume and read it as a Spark DataFrame.
-- MAGIC # The defaults are set before the try so later cells can always test df_ifexists,
-- MAGIC # and a failure part-way through leaves it False instead of undefined.
-- MAGIC df_all = []
-- MAGIC df_ifexists = False
-- MAGIC try:
-- MAGIC     file_paths = files_df['path'].tolist()
-- MAGIC     if file_paths:
-- MAGIC         for file in file_paths:
-- MAGIC             local_path = download_file(file, f"/Volumes/{NGBI_CATALOG}/tmp/volume_tmp/tmp/{os.path.basename(file)}")
-- MAGIC             file_df = (
-- MAGIC                 spark.read
-- MAGIC                 .option("header", "true")
-- MAGIC                 .option("quote", '"')
-- MAGIC                 .option("escape", '"')
-- MAGIC                 .option("multiLine", "true")
-- MAGIC                 .option("mode", "PERMISSIVE")
-- MAGIC                 .csv(local_path)
-- MAGIC             )
-- MAGIC             # Drop the TA and Market columns before the frame is loaded.
-- MAGIC             file_df = file_df.drop("TA", 'Market')
-- MAGIC             print(f'已读取{local_path}')
-- MAGIC             df_all.append(file_df)
-- MAGIC         # Only flag success once every file has been read.
-- MAGIC         df_ifexists = True
-- MAGIC     else:
-- MAGIC         print('没有符合条件的文件')
-- MAGIC except Exception as e:
-- MAGIC     print(e)
|
||
|
||
-- COMMAND ----------
|
||
|
||
-- MAGIC %python
|
||
-- MAGIC # Reload the staging table tmp.tmp_chpa_raw_data from the DataFrames read above.
-- MAGIC # CSV header name -> staging column name.
-- MAGIC column_renames = {
-- MAGIC     'IMS.药品ID': 'IMS_DRUG_ID',
-- MAGIC     '是否法伯编码': 'IS_HOSP_CODE',
-- MAGIC     '规格': 'SPEC',
-- MAGIC     '转换比': 'CONVERSION_RATIO',
-- MAGIC     '剂型': 'DOSAGE_FORM',
-- MAGIC     '价格': 'PRICE',
-- MAGIC }
-- MAGIC try:
-- MAGIC     if df_ifexists:  # only when at least one file was read
-- MAGIC         spark.sql(f"TRUNCATE table tmp.tmp_chpa_raw_data")
-- MAGIC         for num, i in enumerate(df_all, start=1):
-- MAGIC             # withColumnRenamed returns a new DataFrame, so the result must be kept;
-- MAGIC             # previously it was discarded and the renames never took effect.
-- MAGIC             renamed_df = i
-- MAGIC             for source_name, target_name in column_renames.items():
-- MAGIC                 renamed_df = renamed_df.withColumnRenamed(source_name, target_name)
-- MAGIC             renamed_df.createOrReplaceTempView('fact_sales')
-- MAGIC             spark.sql(f"INSERT into tmp.tmp_chpa_raw_data SELECT * FROM fact_sales")
-- MAGIC             print(f'第{num}个')
-- MAGIC except Exception as e:
-- MAGIC     print(e)
|
||
|
||
-- COMMAND ----------
|
||
|
||
-- MAGIC %md
|
||
-- MAGIC ### 将读取到的dataframe写入表中
|
||
|
||
-- COMMAND ----------
|
||
|
||
-- Full overwrite of the target table from the staging table.
-- source_file_path and source_file_name are loaded as NULL; etl_insert_dt is the load time shifted to UTC+8.
INSERT OVERWRITE dwd.dwd_gnd_pharbers_prov_fact
SELECT
  year,
  ym,
  province_c,
  ims_drug_id,
  is_hosp_code,
  prod_corp,
  prod_cod,
  pack_cod,
  phcd,
  prod_des,
  cmps_des,
  corp_des,
  mnfl_cod,
  prod_des_c,
  cmps_c,
  corp_des_c,
  pack_des,
  spec,
  conversion_ratio,
  dosage_form,
  atc4_cod,
  app1_cod,
  app1_des,
  app1_des_c,
  app2_cod,
  app2_des,
  app2_des_c,
  app3_cod,
  app3_des,
  app3_des_c,
  vbp_batch,
  vbp,
  value,
  totalunit,
  countingunit,
  price,
  manu_des,
  manu_des_c,
  NULL AS source_file_path,
  NULL AS source_file_name,
  from_utc_timestamp(current_timestamp(), 'UTC+8') AS etl_insert_dt
FROM tmp.tmp_chpa_raw_data