Files
MarketAnalysis-ETL/Retail/09 dwd_inc_gnd_ext_retail_nataional.py
2026-04-28 09:22:48 +00:00

183 lines
8.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Databricks notebook source
# Run this notebook only when the pack or brand fact data has been updated; otherwise it does not need to be run.
# COMMAND ----------
# MAGIC %sql
# MAGIC -- Set, for each listed source file_name, the table_name it maps to in the
# MAGIC -- mapping table dwd.dwd_gnd_ext_retail_corresponding_relationship.
# MAGIC update dwd.dwd_gnd_ext_retail_corresponding_relationship set table_name ='dwd.dwd_gnd_ext_retail_nataional_oap' where file_name ='pack-CV-抗血栓2通用名-全国.xlsx';
# MAGIC update dwd.dwd_gnd_ext_retail_corresponding_relationship set table_name ='dwd.dwd_gnd_ext_retail_htn' where file_name ='pack-CV-高血压-化学药-全国.xlsx';
# MAGIC update dwd.dwd_gnd_ext_retail_corresponding_relationship set table_name ='dwd.dwd_gnd_ext_retail_atomizer' where file_name ='pack-雾化器-全国&县域数据.xlsx';
# MAGIC update dwd.dwd_gnd_ext_retail_corresponding_relationship set table_name ='dwd.dwd_gnd_ext_retail_anti_asthma_copd' where file_name ='pack-RE-慢阻肺-全国.xlsx';
# MAGIC update dwd.dwd_gnd_ext_retail_corresponding_relationship set table_name ='dwd.dwd_gnd_ext_zk_brand' where file_name ='Brand-品牌数据报表.xlsx';
# MAGIC update dwd.dwd_gnd_ext_retail_corresponding_relationship set table_name ='dwd.dwd_gnd_ext_retail_statin_xzk' where file_name ='pack-CV-他汀类+血脂康-全国.xlsx';
# MAGIC update dwd.dwd_gnd_ext_retail_corresponding_relationship set table_name ='dwd.dwd_gnd_ext_retail_nataional_rd' where file_name ='pack-RD-肾科-全国.xlsx';
# MAGIC update dwd.dwd_gnd_ext_retail_corresponding_relationship set table_name ='dwd.dwd_gnd_ext_retail_aagsa_ppi_oral' where file_name ='pack-GI-慢性胃炎胃溃疡-全国.xlsx';
# MAGIC update dwd.dwd_gnd_ext_retail_corresponding_relationship set table_name ='dwd.dwd_gnd_ext_retail_nataional_niad' where file_name ='pack-DM-口服降糖化学药.xlsx';
# MAGIC update dwd.dwd_gnd_ext_retail_corresponding_relationship set table_name ='dwd.dwd_gnd_ext_retail_metoprolol_tartrat' where file_name ='pack-CV-酒石酸美托洛尔.xlsx';
# MAGIC
# COMMAND ----------
# Automatic ingestion of brand + province data.
# Load the configuration rows: the source table name (exposed as `tab`) and
# the file name (exposed as `brand_flag`) of every entry typed 'BRAND'.
brand_config_sql = """
SELECT DISTINCT table_name tab ,file_name brand_flag FROM dwd.dwd_gnd_ext_retail_corresponding_relationship
where type_name ='BRAND'
"""
dfband = spark.sql(brand_config_sql).collect()
def get_union_brand_data(df):
    """Union the normalized rows of every configured brand table.

    For each configuration row a SELECT is run against the table named by
    ``row.tab``. The query derives ``YYYYMM``/``year``/``quarter`` from the
    ``quarter`` column, casts the measures to ``int`` or ``decimal(30,10)``,
    rewrites ``'-'`` in the key/top brand columns, tags each row with
    ``row.brand_flag`` as ``pack_flag`` and stamps the ETL timestamps. The
    per-table results are combined with ``DataFrame.union``, which matches
    columns by position.

    Args:
        df: Iterable of configuration rows exposing the attributes ``tab``
            (source table name) and ``brand_flag``, or ``None``.

    Returns:
        The unioned DataFrame, or ``None`` when ``df`` is ``None`` or yields
        no rows.
    """
    # No configuration at all: nothing to read.
    if df is None:
        return None

    # Accumulator for the running union; stays None until the first table is read.
    union_query = None
    for table in df:
        # Source table to read for this configuration row.
        table_name = str(table.tab)
        # Value written to the pack_flag column for every row of this table.
        pack_flag = str(table.brand_flag)
        # NOTE(review): replace(col, '-', '0') rewrites every '-' in the value,
        # so a negative number such as '-0.12' would become '00.12' and lose
        # its sign -- confirm these columns never hold negative values.
        # NOTE(review): table_name and pack_flag are interpolated into the SQL
        # text unescaped; a quote in the configured file name would break the
        # statement -- confirm the configuration values are trusted.
        sql = f"""
            select
            cast(left(quarter, 4)*100 + right(quarter,1)*3 as int ) AS YYYYMM
            ,cast(left(quarter, 4) as int ) AS year
            ,right(quarter, 2) AS quarter
            ,quarter AS yq
            ,type AS brand_cat_type
            ,case when ta = 'NIAD' then 'DM' else ta end AS TA
            ,market AS market
            ,zk_brand_category AS zk_brand_category
            ,zk_common_name AS zk_common_name
            ,zk_manu_des AS zk_manu_des
            ,rc_name_en AS rc_name_en
            ,province_city AS province_city
            ,ytd AS ytd
            ,cast(sales_value * 1000000 as decimal(30,10)) AS sales_val
            ,cast(sales_volume * 1000000 as decimal(30,10)) AS sales_vol
            ,cast(price as decimal(30,10)) as price
            ,cast(num_dist_rate as decimal(30,10)) as num_dist_rate
            ,cast(weig_dist_rate as decimal(30,10)) as weig_dist_rate
            ,cast(value_share as decimal(30,10)) as val_share
            ,cast(volume_share as decimal(30,10)) as vol_share
            ,replace(key_brand_ytd,'-','') as key_brand_ytd
            ,cast(replace(key_brand_rank_ytd,'-','0') as int) as key_brand_rank_ytd
            ,replace(top_brand_ytd,'-','') as top_brand_ytd
            ,cast(replace(top_brand_ms_ytd,'-','0') as decimal(30,10)) as top_brand_ms_ytd
            ,cast(replace(top_brand_inc_ms_ytd,'-','0') as decimal(30,10)) as top_brand_inc_ms_ytd
            ,cast(replace(top_brand_gr_ytd,'-','0') as decimal(30,10)) as top_brand_gr_ytd
            ,replace(key_brand_qtd,'-','') as key_brand_qtd
            ,cast(replace(key_brand_rank_qtd,'-','0') as int) as key_brand_rank_qtd
            ,replace(top_brand_qtd,'-','') as top_brand_qtd
            ,cast(replace(top_brand_ms_qtd,'-','0') as decimal(30,10)) as top_brand_ms_qtd
            ,cast(replace(top_brand_inc_ms_qtd,'-','0') as decimal(30,10)) as top_brand_inc_ms_qtd
            ,cast(replace(top_brand_gr_qtd,'-','0') as decimal(30,10)) as top_brand_gr_qtd
            ,ranked_by as ranked_by
            ,'{pack_flag}' as pack_flag
            ,from_utc_timestamp(current_timestamp(),'UTC+8') as etl_insert_dt
            ,from_utc_timestamp(current_timestamp(),'UTC+8') as etl_update_dt
            from {table_name}
        """
        # Read this table's normalized rows.
        current_query = spark.sql(sql)
        # Identity check, not ==: the accumulator is either None or a DataFrame.
        if union_query is None:
            union_query = current_query
        else:
            union_query = union_query.union(current_query)
    # Still None when the configuration contained no rows.
    return union_query
brand_result = get_union_brand_data(dfband)
# get_union_brand_data returns None when there are no configuration rows;
# fail with an explicit message instead of an AttributeError on None.write.
if brand_result is None:
    raise ValueError(
        "No BRAND tables are configured in "
        "dwd.dwd_gnd_ext_retail_corresponding_relationship; nothing to write."
    )
# Replace the contents of the target table with the unioned brand data.
brand_result.write.mode("overwrite").saveAsTable("dwd.dwd_inc_gnd_ext_retail_nataional_brand_union_all")
# COMMAND ----------
# MAGIC %md
# MAGIC ###新逻辑
# MAGIC - 修改brand数据先拆分成月维度的数据
# COMMAND ----------
# %sql
# /*
# 修改时间20250311
# 修改人chenwu
# 修改内容brand来数频率为 季度来数, 但是 pack 为 月度来数据,需要用季度的数据/3得到月度的
# 修改时间20260428
# 修改人zhanghaoyi
# 修改内容:上游汇总为季度数据, 无需拆分
# */
# insert overwrite table dwd.dwd_inc_gnd_ext_retail_nataional_brand_union_all
# with quarterly_table as (
# select
# *
# from dwd.dwd_inc_gnd_ext_retail_nataional_brand_union_all
# where market not in ('NIAD','Inhaled Extended Market','布地奈德雾化溶液')
# -- 范围内只能是 季度来数据的,如果有月度来数据的需要排除掉
# )
# ,month_table as (--转化成月度数据
# SELECT
# SUBSTR(q.yq, 1, 4)*100 + -- 提取年份
# LPAD(m.month_num, 2, '0') -- 补零月份
# AS YYYYMM -- 月份首日
# ,`year`
# ,`quarter`
# ,yq
# ,brand_cat_type
# ,TA
# ,market
# ,zk_brand_category
# ,zk_common_name
# ,zk_manu_des
# ,rc_name_en
# ,province_city
# ,ytd
# ,sales_val /3 --除3
# ,sales_vol /3 --除3
# ,price
# ,num_dist_rate
# ,weig_dist_rate
# ,val_share
# ,vol_share
# ,key_brand_ytd
# ,key_brand_rank_ytd
# ,top_brand_ytd
# ,top_brand_ms_ytd
# ,top_brand_inc_ms_ytd
# ,top_brand_gr_ytd
# ,key_brand_qtd
# ,key_brand_rank_qtd
# ,top_brand_qtd
# ,top_brand_ms_qtd
# ,top_brand_inc_ms_qtd
# ,top_brand_gr_qtd
# ,ranked_by
# ,pack_flag
# ,etl_insert_dt
# ,etl_update_dt
# FROM
# quarterly_table q
# LATERAL VIEW EXPLODE( -- 为每季度生成三个月
# CASE
# WHEN RIGHT(q.yq, 2) = 'Q1' THEN ARRAY(1, 2, 3)
# WHEN RIGHT(q.yq, 2) = 'Q2' THEN ARRAY(4, 5, 6)
# WHEN RIGHT(q.yq, 2) = 'Q3' THEN ARRAY(7, 8, 9)
# WHEN RIGHT(q.yq, 2) = 'Q4' THEN ARRAY(10, 11, 12)
# END
# ) m AS month_num
# )
# ,other_not_quarterly_table as (
# select
# *
# from dwd.dwd_inc_gnd_ext_retail_nataional_brand_union_all
# where market in ('NIAD','Inhaled Extended Market','布地奈德雾化溶液')
# -- 范围内只能是 月度来数据的
# )
# select * from month_table
# union all
# select * from other_not_quarterly_table