new file: 02 dm_td_external_keycompatitor.sql new file: 03 dm_td_external_brand_market.sql new file: 04 dm_td_external_calendar.sql new file: 05 dm_td_external_exchangerate.sql new file: 07 dm_td_external_packinfo.sql new file: 08 dm_td_external_corp.sql new file: 09 dm_td_external_geo_type.sql new file: 11 DM_TD_EXTERNAL_MARKET_NEW.sql new file: 12 dm_td_external_org.sql new file: 13 external auth.sql new file: 14 dm_tf_external_retail_special_bkp.sql new file: AIA/01 dm_aia_pack_property.sql new file: AIA/02 dm_ext_aia_sales.sql new file: AIA/02 dm_ext_aia_sales_bakup_20230327.sql new file: AIA/03 dm_aia_flag.sql new file: AIA/04 dm_aia_provided_flag.sql new file: AIA/06 DM_TD_EXT_AIA_PACK_PROPERTY.sql new file: AIA/07 DM_TD_EXT_AIA_PACK2MARKET.sql new file: AIA/08 DM_TD_EXT_AIA_MARKET_RATIO.sql new file: AIA/09 DM_TD_EXT_AIA_MARKET_BRAND_RATIO.sql new file: AIA/10 DM_TD_EXT_AIA_MARKET_PACK_MAPPING.sql new file: AIA/11 DM_TD_EXT_AIA_TARGET_INST.sql new file: AIA/11 dm_aia_targethp_flag.sql new file: "AIA/z_03 dm_aia_flag_\345\244\207\344\273\275\347\224\250.sql" new file: CHC/01 dm_chc_pack_property.sql new file: CHC/02 DM_TF_EXT_CHC_SALES.sql new file: CHC/03 DM_TD_EXT_CHC_PACK_PROPERTY.sql new file: CHC/04 DM_TD_EXT_CHC_PACK2MARKET.sql new file: CHC/05 DM_TD_EXT_CHC_MARKET_RATIO.sql new file: CHC/06 DM_TD_EXT_CHC_MARKET_BRAND_RATIO.sql new file: CHC/07 DM_TD_EXT_CHC_MARKET_PACK_MAPPING.sql new file: CHPA/01 dwd_ims_atc_hierarchy.sql new file: CHPA/01 dwd_ims_nfc_hierarchy.sql new file: CHPA/01 dwd_ims_td_manufacturer_corp.sql new file: CHPA/01 dwd_ims_td_pack_property.sql new file: CHPA/01 dwd_update.sql new file: CHPA/01_FB_BLOB_TO_DWD.sql new file: CHPA/02 DWS_IMS_TD_GEO.sql new file: CHPA/02 dws_ims_td_atc_cn.sql new file: CHPA/02 dws_ims_td_corp_cn.sql new file: CHPA/02 dws_ims_td_date.sql new file: CHPA/02 dws_ims_td_manu_cn.sql new file: CHPA/02 dws_ims_td_market.sql new file: CHPA/02 dws_ims_td_market_ta.sql new file: CHPA/02 dws_ims_td_nfc_cn.sql new 
file: CHPA/02 dws_ims_td_prod_cn.sql new file: CHPA/02 tmp_ims_td_prod_tmp.sql new file: CHPA/02 tmp_ims_tf_fact_sales.sql new file: CHPA/03 DM_TD_EXT_CHPA_MARKET_BRAND_RATIO.sql new file: CHPA/03 DM_TD_EXT_CHPA_MARKET_PACK_MAPPING.sql new file: CHPA/03 DM_TD_EXT_CHPA_MARKET_RATIO.sql new file: CHPA/03 DM_TD_EXT_CHPA_PACK2MARKET.sql new file: CHPA/03 DM_TD_EXT_CHPA_PACK_PROPERTY.sql new file: CHPA/03 DM_TF_EXT_CHPA_SALES.sql new file: CHPA/03 dm_ims_td_calendar.sql new file: CHPA/03 dm_ims_td_geo.sql new file: CHPA/03 dm_ims_td_market_property.sql new file: CHPA/03 dm_ims_td_org.sql new file: CHPA/03 dm_ims_td_org_hvh.sql new file: CHPA/03 dm_ims_td_pack_property.sql new file: CHPA/03 dm_ims_tf_sales.sql new file: CHPA/03 dm_td_chpa_market_definition.sql new file: CHPA/03 dm_td_ims_city_mapping.sql new file: EC/03 ec_load_data.sql new file: EC/04 DM_TD_EXT_EC_PACK_PROPERTY.sql new file: EC/05 DM_TF_EXT_EC_SALES.sql new file: EC/06 DM_TD_EXT_EC_PACK2MARKET.sql new file: EC/07 DM_TD_EXT_EC_MARKET_RATIO.sql new file: EC/08 DM_TD_EXT_EC_MARKET_BRAND_RATIO.sql new file: EC/09 DM_TD_EXT_EC_MARKET_PACK_MAPPING.sql new file: EC/1 (ec)blob_to_dwd.sql new file: EC/2 dwd_inc_gnd_ext_ec_nationnal_pack_union_all.py new file: Merged_Data/Merged_Data_Config_table_bkp.sql new file: Merged_Data/Merged_Data_Config_table_bymonth.sql new file: Merged_Data/dm_tf_exteranl_sales_merged_data_dtp_others_bkp.sql new file: Merged_Data/dm_tf_exteranl_sales_merged_data_dtp_others_bymonth_bkp.sql new file: ORG/DM_TD_EXT_AIA_ORG.sql new file: ORG/DM_TD_EXT_CHC_ORG.sql new file: ORG/DM_TD_EXT_CHPA_ORG.sql new file: ORG/DM_TD_EXT_COUNTY_ORG.sql new file: ORG/DM_TD_EXT_EC_ORG.sql new file: ORG/DM_TD_EXT_RETAIL_ORG.sql new file: ORG/DM_TD_EXT_THC_ORG.sql new file: ORG/DM_TD_EXT_XIEHE_ORG.sql new file: OTHERS/01 dm_td_report_url.sql new file: OTHERS/02 dws_ext_email_warning.sql new file: OTHERS/external_triggered_email.py new file: Retail/01 load_tmp_data.py new file: Retail/02 split_brand_data.py 
new file: Retail/03 split_pack_data.py new file: Retail/04 map_to_dws_table.py new file: Retail/05 load_dtp_temp_data.py new file: Retail/06 split_dtp_brand_data.py new file: Retail/07 split_dtp_pack_data.py new file: Retail/08 map_to_dtp_dws_table.py new file: Retail/09 dwd_inc_gnd_ext_retail_nataional.py new file: Retail/10 map_to_retail_dm_table.py new file: Retail/11 map_to_overview_dm_table.py new file: Retail/12 dws_tf_external_retail_dtp_special.sql new file: Retail/13 DM_TF_EXT_RETAIL_SALES.sql new file: Retail/14 DM_TF_EXT_RETAIL_DTP_SALES.sql new file: Retail/15 DM_TD_EXT_RETAIL_PACK_PROPERTY.sql new file: Retail/16 DM_TD_EXT_RETAIL_DTP_PACK_PROPERTY.sql new file: Retail/17 DM_TD_EXT_DTP_PACK2MARKET.sql new file: Retail/17 DM_TD_EXT_RETAIL_PACK2MARKET.sql new file: Retail/18 DM_TD_EXT_DTP_MARKET_RATIO.sql new file: Retail/18 DM_TD_EXT_RETAIL_MARKET_RATIO.sql new file: Retail/19 DM_TD_EXT_DTP_MARKET_BRAND_RATIO.sql new file: Retail/19 DM_TD_EXT_RETAIL_MARKET_BRAND_RATIO.sql new file: Retail/20 DM_TD_EXT_DTP_MARKET_PACK_MAPPING.sql new file: Retail/20 DM_TD_EXT_RETAIL_MARKET_PACK_MAPPING.sql new file: "Retail/z1 dwd_inc_gnd_ext_retail_nataional_\344\275\234\345\272\237.py" new file: "Retail/z2 retail_load_data_\344\275\234\345\272\237.sql" new file: "Retail/z3 retail_overview_data_\344\275\234\345\272\237.sql" new file: THC/01 dm_thc_pack_property.sql new file: THC/02 dm_ext_thc_sales.sql new file: THC/02 dm_ext_thc_sales_bakup_20260327.sql new file: THC/03 DM_TF_EXT_THC_MARKET_SALES_CHT.sql new file: THC/04 dm_tf_external_sales_thc.sql new file: THC/05 DM_TD_EXT_THC_PACK_PROPERTY.sql new file: THC/06 DM_TD_EXT_THC_PACK2MARKET.sql new file: THC/07 DM_TD_EXT_THC_MARKET_RATIO.sql new file: THC/08 DM_TD_EXT_THC_MARKET_BRAND_RATIO.sql new file: THC/09 DM_TD_EXT_THC_MARKET_PACK_MAPPING.sql new file: UNIONALL/DM_TD_EXT_UNIONALL_MARKET_PACK_MAPPING.sql new file: UNIONALL/DM_TD_EXT_UNIONALL_PACKINFO.sql new file: UNIONALL/DM_TF_EXT_UNIONALL_MARKET_SALES.sql new 
file: UNIONALL/DM_TF_EXT_UNIONALL_MARKET_SALES_v1.0.sql new file: UNIONALL/DM_TF_EXT_UNIONALL_SALES.sql new file: UNIONALL/DM_TF_EXT_UNIONALL_SALES_MAPPING.sql new file: UNIONALL/DM_TF_EXT_UNIONALL_SALES_MAPPING_NIAD.sql new file: XIEHE/01 xiehe_blob_to_dwd.py new file: XIEHE/02 dm_xiehe_pack_property.sql new file: XIEHE/03 dm_ext_xiehe_geo.sql new file: XIEHE/04 dm_ext_xiehe_sales.sql new file: XIEHE/05 dm_td_xiehe_core_dept.sql new file: XIEHE/06 DM_TF_EXT_XIEHE_SALES.sql new file: XIEHE/07 DM_TD_EXT_XIEHE_PACK_PROPERTY.sql new file: XIEHE/08 DM_TD_EXT_XIEHE_PACK2MARKET.sql new file: XIEHE/09 DM_TD_EXT_XIEHE_MARKET_RATIO.sql new file: XIEHE/10 DM_TD_EXT_XIEHE_MARKET_BRAND_RATIO.sql new file: XIEHE/11 DM_TD_EXT_XIEHE_MARKET_PACK_MAPPING.sql new file: XIEHE/bkp_01 xiehe_blob2dwd.py new file: XIEHE/bkp_02 dm_ext_xiehe_sales.sql new file: XIEHE/bkp_03 dm_ext_xiehe_pack_property.sql new file: county/01 tmp_ims_county_fact_sales_sum.sql new file: county/02 tmp_imscounty_Result.sql new file: county/03 dm_ims_td_county_geo.sql new file: county/04 dws_ext_county_tf_sales.sql new file: county/05 dm_ext_county_td_pack_property.sql new file: county/06 dm_td_county_pack_region.sql new file: county/07 dm_ext_county_tf_sales_region.sql new file: county/08 DM_TD_EXT_COUNTY_PACK_PROPERTY.sql new file: county/09 DM_TF_EXT_COUNTY_SALES.sql new file: county/10 DM_TD_EXT_COUNTY_PACK2MARKET.sql new file: county/11 DM_TD_EXT_COUNTY_MARKET_RATIO.sql new file: county/12 DM_TD_EXT_COUNTY_MARKET_BRAND_RATIO.sql new file: county/13 DM_TD_EXT_COUNTY_MARKET_PACK_MAPPING.sql new file: for_AIA_Dashboard/01 dm_td_aia_inst_mkt.sql new file: for_AIA_Dashboard/02 dm_td_aia_auth_sales.sql new file: for_AIA_Dashboard/03 dm_td_aia_original_col.sql new file: for_AIA_Dashboard/04 dm_td_aia_nosales_inst.sql new file: for_AIA_Dashboard/05 dm_td_aia_is_eagle_flag.sql new file: for_AIA_Dashboard/06 dm_td_aia_rank.sql new file: for_AIA_Dashboard/07 dm_ext_aia_data_remove_flag.sql new file: 
for_AIA_Dashboard/07 dm_td_aia_remove_special_ins_bkp.py new file: for_AIA_Dashboard/08 dm_ext_aia_data_quality_flag.sql new file: z 01 dm_tf_external_sales.sql new file: "z 10 dm_td_external_market_pack_mapping_\344\275\234\345\272\237.sql" new file: "z 11 dm_td_external_market_\344\275\234\345\272\237.sql" new file: "\344\270\212\347\272\277\350\204\232\346\234\254.sql" new file: "\346\225\260\346\215\256\351\252\214\350\257\201.sql"
194 lines
6.6 KiB
Python
194 lines
6.6 KiB
Python
# Databricks notebook source
|
|
# MAGIC %md
|
|
# MAGIC ### 从blob读取csv文件作为xiehe的事实表
|
|
|
|
# COMMAND ----------
|
|
|
|
# MAGIC %run ../../../Common/config
|
|
|
|
# COMMAND ----------
|
|
|
|
from datetime import datetime, timedelta
|
|
import pandas as pd
|
|
|
|
# COMMAND ----------
|
|
|
|
# Pick the root folder of user-uploaded files for the current environment.
# ENVIRONMENT and the *_ENVIRONMENT_VALUE constants are not defined in this
# notebook; presumably they come from the %run'd Common/config notebook.
if ENVIRONMENT == PRD_ENVIRONMENT_VALUE:
    factsales_file_path_template = "abfss://master@azcdatalakeprd.dfs.core.chinacloudapi.cn/ODS/GND/UserUpload/"
elif ENVIRONMENT == TEST_ENVIRONMENT_VALUE:
    factsales_file_path_template = "abfss://master@retaildlstoragetest.dfs.core.chinacloudapi.cn/ODS/GND/UserUpload/"
else:
    # Fail here with a clear message instead of letting a later cell hit a
    # NameError on the never-assigned path template.
    raise ValueError(f"Unsupported ENVIRONMENT value: {ENVIRONMENT!r}")
|
|
|
|
# COMMAND ----------
|
|
|
|
# Build today's upload folder path, using the current date at UTC+8.
from datetime import timezone

# datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC "now",
# shift it by 8 hours and drop tzinfo so current_date stays a naive datetime
# with the same value as before.
current_date = (datetime.now(timezone.utc) + timedelta(hours=8)).replace(tzinfo=None)
date_path = current_date.strftime("%Y/%m/%d/")
base_path = factsales_file_path_template + date_path
|
|
|
|
# COMMAND ----------
|
|
|
|
# Check whether a path exists.
def path_exists(path):
    """Tell whether `path` can be listed with dbutils.fs.ls.

    Returns:
        True when the listing succeeds, False when it fails with an error
        whose text contains "java.io.FileNotFoundException".

    Raises:
        Any other listing error, after printing it.
    """
    try:
        dbutils.fs.ls(path)
    except Exception as err:
        # Only a missing path is an expected outcome; everything else is
        # reported and propagated to the caller.
        if "java.io.FileNotFoundException" not in str(err):
            print(f"检查路径 {path} 时出错: {err}")
            raise
        return False
    return True
|
|
|
|
# COMMAND ----------
|
|
|
|
# List the files found under the blob folder.
def list_file_name(path):
    """Return the first entry of every child folder directly under `path`.

    Args:
        path: Folder to scan with dbutils.fs.ls.

    Returns:
        A list holding, for each child of `path` in listing order, the first
        entry returned by dbutils.fs.ls on that child. Children whose listing
        is empty are skipped.
    """
    # NOTE(review): only the first entry of each child is kept; presumably each
    # upload folder holds a single file - confirm.
    first_entries = []
    for child in dbutils.fs.ls(path):
        entries = dbutils.fs.ls(child.path)
        # An empty child folder used to raise IndexError on `[0]` and abort the
        # whole listing; skip it so the remaining folders are still returned.
        if entries:
            first_entries.append(entries[0])
    return first_entries
|
|
|
|
# COMMAND ----------
|
|
|
|
# Copy a file from blob storage to a local path.
def download_file(file_path, local_path):
    """Copy `file_path` to `local_path` with dbutils.fs.cp, log it, and return `local_path`."""
    source, target = file_path, local_path
    dbutils.fs.cp(source, target)
    print(f"已下载 {source} 到 {target}")
    return target
|
|
|
|
# COMMAND ----------
|
|
|
|
# MAGIC %md
|
|
# MAGIC ### 获取路径下的文件名称
|
|
# MAGIC - 并挑出符合条件的文件路径
|
|
|
|
# COMMAND ----------
|
|
|
|
# Collect today's candidate files into a pandas DataFrame for filtering.
# Start from an empty frame that already carries the expected columns, so the
# later cells can sort and filter it even when the folder is missing, the
# listing is empty, or the listing fails. Previously files_df was left
# undefined or column-less in those cases.
files_df = pd.DataFrame(columns=['path', 'modificationtime', 'name'])
try:
    if path_exists(base_path):
        all_file_list = list_file_name(base_path)
        if all_file_list:
            # One row per listed entry.
            files_df = pd.DataFrame([{
                'path': f.path,
                'modificationtime': f.modificationTime,
                'name': f.name
            } for f in all_file_list])
        print(f"{base_path} 路径存在")
    else:
        print(f"{base_path} 路径不存在")
except Exception as e:
    # Best effort: report the problem and continue with the empty frame.
    print(e)
|
|
|
|
# COMMAND ----------
|
|
|
|
# Keep only the most recently modified entry per file name, restore the
# original row order, then retain the files whose name matches
# Dept_Fact*.csv.
try:
    newest_first = files_df.sort_values('modificationtime', ascending=False)
    latest_per_name = newest_first.drop_duplicates('name')
    files_df = latest_per_name.sort_index()
    is_dept_fact_csv = files_df['name'].str.match(r'^Dept_Fact.*\.csv$')
    files_df = files_df[is_dept_fact_csv]
    files_df
except Exception as err:
    print(err)
|
|
|
|
# COMMAND ----------
|
|
|
|
# MAGIC %md
|
|
# MAGIC ### 读取文件内容
|
|
|
|
# COMMAND ----------
|
|
|
|
import os
|
|
|
|
# COMMAND ----------
|
|
|
|
# Copy every selected file to the /Volumes temp folder and read it with Spark.
# Both outputs are initialised before the try block: if anything below fails,
# the following cell still finds df_ifexists defined (as False) instead of
# hitting a NameError or reusing a stale value from an earlier run.
df_all = []
df_ifexists = False
try:
    selected_paths = files_df['path'].tolist()
    if selected_paths:
        for file in selected_paths:
            local_path = download_file(file, f"/Volumes/{NGBI_CATALOG}/tmp/volume_tmp/tmp/{os.path.basename(file)}")
            # Read as CSV with a header row; '"' is both the quote and the
            # escape character, and quoted values may span several lines.
            file_df = (
                spark.read
                .option("header", "true")
                .option("quote", '"')
                .option("escape", '"')
                .option("multiLine", "true")
                .option("mode", "PERMISSIVE")
                .csv(local_path)
            )
            print(f'已读取{local_path}')
            df_all.append(file_df)
        # Only flag success once every selected file has been read.
        df_ifexists = True
    else:
        print('没有符合条件的文件')
except Exception as e:
    print(e)
|
|
|
|
# COMMAND ----------
|
|
|
|
# Source CSV header -> staging column name, listed in the staging table's
# column order. Identity entries are kept so the list names every column a
# file must provide.
# NOTE(review): '给药途径' (route of administration) is mapped to nfc and
# '药品剂型' (dosage form) to drug_delivery_route, which looks swapped. This
# reproduces the existing mapping unchanged - confirm against downstream usage.
XIEHE_COLUMN_MAP = [
    ('区域', 'area'),
    ('城市', 'city'),
    ('年&季度', 'yyyyqq'),
    ('月', 'yyyymm'),
    ('医保类型', 'reimburse'),
    ('处方来源', 'prescription_source'),
    ('处方科室_lv1', 'prescription_dept_lv1'),
    ('处方科室_lv2', 'prescription_dept_lv2'),
    ('处方科室_lv3', 'prescription_dept_lv3'),
    ('ATC', 'ATC'),
    ('PHCD标准码', 'new_code'),
    ('药品通用名', 'common_name'),
    ('药品商品名', 'product_name'),
    ('规格', 'pack_des'),
    ('PackSize', 'PackSize'),
    ('PackageType', 'PackageType'),
    ('给药途径', 'nfc'),
    ('药品厂家', 'manu_des'),
    ('药品剂型', 'drug_delivery_route'),
    ('处方张数', 'prescription'),
    ('取药数量', 'sales_vol'),
    ('处方金额', 'sales_value'),
]
staging_columns = [target for _, target in XIEHE_COLUMN_MAP]

try:
    if df_ifexists and df_all:  # only when at least one file was read
        prepared = []
        for num, frame in enumerate(df_all, start=1):
            for source, target in XIEHE_COLUMN_MAP:
                frame = frame.withColumnRenamed(source, target)
            # Selecting the staging columns up front makes a file with a
            # missing column fail here, before the staging table is touched.
            prepared.append(frame.select(*staging_columns))
            print(f'第{num}个')

        # Every frame now has the same columns in the same order, so a
        # positional union is safe.
        combined = prepared[0]
        for frame in prepared[1:]:
            combined = combined.union(frame)
        combined.createOrReplaceTempView('fact_sales')

        # A single INSERT OVERWRITE replaces the former TRUNCATE followed by
        # one INSERT per file. A failure can therefore no longer leave the
        # staging table empty or half loaded for the final overwrite of the
        # DWD table (assumes the staging table is a Delta table, where the
        # overwrite is one commit - TODO confirm).
        column_list_sql = ",".join(staging_columns)
        spark.sql(f"INSERT OVERWRITE tmp.tmp_xiehe_raw_data SELECT {column_list_sql} FROM fact_sales")
except Exception as e:
    print(e)
|
|
|
|
# COMMAND ----------
|
|
|
|
# MAGIC %md
|
|
# MAGIC ### 将读取到的dataframe写入表中
|
|
|
|
# COMMAND ----------
|
|
|
|
# MAGIC %sql
# MAGIC -- Full overwrite of the DWD table from the staging table.
# MAGIC -- The audit timestamps use the documented '+08:00' zone-offset form,
# MAGIC -- which gives the same wall-clock value as the previous 'UTC+8' id.
# MAGIC INSERT OVERWRITE dwd.dwd_gnd_ext_xiehe_raw_data
# MAGIC SELECT
# MAGIC     area,
# MAGIC     city,
# MAGIC     yyyyqq,
# MAGIC     yyyymm,
# MAGIC     NULL AS h_level,  -- not read from the staging table; loaded as NULL
# MAGIC     reimburse,
# MAGIC     prescription_source,
# MAGIC     prescription_dept_lv1,
# MAGIC     prescription_dept_lv2,
# MAGIC     prescription_dept_lv3,
# MAGIC     ATC,
# MAGIC     new_code,
# MAGIC     common_name,
# MAGIC     product_name,
# MAGIC     pack_des,
# MAGIC     CAST(PackSize AS BIGINT) AS PackSize,
# MAGIC     PackageType,
# MAGIC     nfc,
# MAGIC     manu_des,
# MAGIC     drug_delivery_route,
# MAGIC     CAST(prescription AS BIGINT) AS prescription,
# MAGIC     CAST(sales_vol AS DECIMAL(38,8)) AS sales_vol,
# MAGIC     CAST(sales_value AS DECIMAL(38,8)) AS sales_value,
# MAGIC     from_utc_timestamp(current_timestamp(), '+08:00') AS etl_insert_dt,
# MAGIC     from_utc_timestamp(current_timestamp(), '+08:00') AS etl_update_dt
# MAGIC FROM tmp.tmp_xiehe_raw_data