haoyi修改

This commit is contained in:
2026-04-28 09:22:48 +00:00
parent 3b08537f5e
commit 54ba912b2a
6 changed files with 688 additions and 125 deletions

View File

@@ -0,0 +1,381 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 0,
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {},
"inputWidgets": {},
"nuid": "4d16488f-0327-4ced-b23f-41f960a90d2f",
"showTitle": false,
"tableResultSettingsMap": {},
"title": ""
}
},
"outputs": [],
"source": [
"############################################################START##############################################################\n",
"### STEP-1: insert split pack data into tmp final table: tmp_retail_final_sales"
]
},
{
"cell_type": "code",
"execution_count": 0,
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {
"byteLimit": 2048000,
"implicitDf": true,
"rowLimit": 10000
},
"inputWidgets": {},
"nuid": "1168666b-255b-44a3-968b-4156c93dad53",
"showTitle": false,
"tableResultSettingsMap": {},
"title": ""
}
},
"outputs": [],
"source": [
"%sql\n",
"-------------------------------------------------------------------------------------\n",
"-- STEP-1: insert split pack data into tmp final table\n",
"-- insert into tmp_retail_final_sales\n",
"-------------------------------------------------------------------------------------\n",
"\n",
"with tmp_pack as (\n",
"    -- Raw pack sales joined to the pack property table on product_id.\n",
"    select\n",
"        -- Period key: use the monthly value when present; otherwise derive it from\n",
"        -- the quarter string (first 4 chars = year, 6th char = quarter number,\n",
"        -- mapped to that quarter's last month).\n",
"        coalesce(\n",
"            src.month,\n",
"            concat(\n",
"                substring(src.quarter, 1, 4),\n",
"                case substring(src.quarter, 6, 1)\n",
"                    when '1' then '03'\n",
"                    when '2' then '06'\n",
"                    when '3' then '09'\n",
"                    when '4' then '12'\n",
"                end\n",
"            )\n",
"        ) as YYYYMM,\n",
"        src.pack_code as iqvia_pack_code,\n",
"        src.product_id as zk_product_id,\n",
"        -- NULL when the description is 'others' (or already NULL).\n",
"        nullif(src.product_desc, 'others') as prod_des_c,\n",
"        -- Named products map to themselves; everything else maps to 'Others_<molecule>'.\n",
"        case\n",
"            when src.product_desc <> 'others' then src.product_desc\n",
"            else concat('Others_', src.molecule_desc)\n",
"        end as PROD_MAPPING,\n",
"        src.zk_regin as province_city,\n",
"        src.level_market as market,\n",
"        src.sales_value,\n",
"        src.sales_unit,\n",
"        -- counting_unit rule: do not take the value from the raw pack file table;\n",
"        -- use counting_unit / unit from the pack_property table instead.\n",
"        src.sales_unit * (prop.counting_unit / coalesce(prop.unit, 1)) as counting_unit,\n",
"        case\n",
"            when data_flag = 0 then 1\n",
"            else 2\n",
"        end as pack_flag,\n",
"        case\n",
"            when brand_flag = 1 then 1\n",
"            else 2\n",
"        end as brand_flag\n",
"    from tmp.tmp_retail_pack_rawdata src\n",
"    left join dwd.dwd_gnd_ext_retail_pack_property prop\n",
"        on prop.product_id = src.product_id\n",
"), tmp_has_roc as (\n",
"    -- Raw rows whose region is 'ROC', keyed by product, period and pack.\n",
"    select\n",
"        product_id,\n",
"        quarter,\n",
"        -- Period key: monthly value when present, otherwise year + last month of\n",
"        -- the quarter taken from the quarter string.\n",
"        coalesce(\n",
"            month,\n",
"            concat(\n",
"                substring(quarter, 1, 4),\n",
"                case substring(quarter, 6, 1)\n",
"                    when '1' then '03'\n",
"                    when '2' then '06'\n",
"                    when '3' then '09'\n",
"                    when '4' then '12'\n",
"                end\n",
"            )\n",
"        ) as month,\n",
"        pack_code\n",
"    from tmp.tmp_retail_pack_rawdata\n",
"    where zk_regin = 'ROC'\n",
"), tmp_pack_this_year_with_roc as (\n",
"    -- Current-period pack rows whose (period, pack, product) also has an ROC row,\n",
"    -- excluding the nationwide ('全国') rows.\n",
"    select\n",
"        cur.YYYYMM,\n",
"        cur.iqvia_pack_code,\n",
"        cur.zk_product_id,\n",
"        cur.prod_des_c,\n",
"        cur.PROD_MAPPING,\n",
"        cur.province_city,\n",
"        cur.market,\n",
"        cur.sales_value,\n",
"        cur.sales_unit,\n",
"        cur.counting_unit,\n",
"        cur.pack_flag,\n",
"        cur.brand_flag\n",
"    from tmp_pack cur\n",
"    where cur.province_city <> '全国'\n",
"      and exists (\n",
"          select 1\n",
"          from tmp_has_roc roc\n",
"          where roc.month = cur.YYYYMM\n",
"            and roc.pack_code = cur.iqvia_pack_code\n",
"            and roc.product_id = cur.zk_product_id\n",
"      )\n",
"), tmp_pack_next_year_with_roc as (\n",
"    -- The same ROC-backed, non-nationwide rows shifted forward one year\n",
"    -- (YYYYMM + 100) and exposed as last-year (_ly) measures. Rows whose shifted\n",
"    -- period would exceed the latest period in tmp_pack are dropped.\n",
"    select\n",
"        cast(prev.YYYYMM + 100 as int) as YYYYMM,\n",
"        prev.iqvia_pack_code,\n",
"        prev.zk_product_id,\n",
"        prev.prod_des_c,\n",
"        prev.PROD_MAPPING,\n",
"        prev.province_city,\n",
"        prev.market,\n",
"        prev.sales_value as sales_value_ly,\n",
"        prev.sales_unit as sales_unit_ly,\n",
"        prev.counting_unit as counting_unit_ly,\n",
"        prev.pack_flag,\n",
"        prev.brand_flag\n",
"    from tmp_pack prev\n",
"    where prev.YYYYMM + 100 <= (select max(YYYYMM) from tmp_pack)\n",
"      and prev.province_city <> '全国'\n",
"      and exists (\n",
"          select 1\n",
"          from tmp_has_roc roc\n",
"          where roc.month = prev.YYYYMM\n",
"            and roc.pack_code = prev.iqvia_pack_code\n",
"            and roc.product_id = prev.zk_product_id\n",
"      )\n",
"\n",
"), tmp_pack_this_year_without_roc as (\n",
"    -- Current-period pack rows whose (period, pack, product) has no ROC row.\n",
"    select\n",
"        cur.YYYYMM,\n",
"        cur.iqvia_pack_code,\n",
"        cur.zk_product_id,\n",
"        cur.prod_des_c,\n",
"        cur.PROD_MAPPING,\n",
"        cur.province_city,\n",
"        cur.market,\n",
"        cur.sales_value,\n",
"        cur.sales_unit,\n",
"        cur.counting_unit,\n",
"        cur.pack_flag,\n",
"        cur.brand_flag\n",
"    from tmp_pack cur\n",
"    where not exists (\n",
"        select 1\n",
"        from tmp_has_roc roc\n",
"        where roc.month = cur.YYYYMM\n",
"          and roc.pack_code = cur.iqvia_pack_code\n",
"          and roc.product_id = cur.zk_product_id\n",
"    )\n",
"), tmp_pack_next_year_without_roc as (\n",
"    -- Rows without an ROC counterpart, shifted forward one year (YYYYMM + 100) and\n",
"    -- exposed as last-year (_ly) measures. Rows whose shifted period would exceed\n",
"    -- the latest period in tmp_pack are dropped.\n",
"    select\n",
"        cast(prev.YYYYMM + 100 as int) as YYYYMM,\n",
"        prev.iqvia_pack_code,\n",
"        prev.zk_product_id,\n",
"        prev.prod_des_c,\n",
"        prev.PROD_MAPPING,\n",
"        prev.province_city,\n",
"        prev.market,\n",
"        prev.sales_value as sales_value_ly,\n",
"        prev.sales_unit as sales_unit_ly,\n",
"        prev.counting_unit as counting_unit_ly,\n",
"        prev.pack_flag,\n",
"        prev.brand_flag\n",
"    from tmp_pack prev\n",
"    where prev.YYYYMM + 100 <= (select max(YYYYMM) from tmp_pack)\n",
"      and not exists (\n",
"          select 1\n",
"          from tmp_has_roc roc\n",
"          where roc.month = prev.YYYYMM\n",
"            and roc.pack_code = prev.iqvia_pack_code\n",
"            and roc.product_id = prev.zk_product_id\n",
"      )\n",
"), tmp_final_sales as (\n",
"    -- Branch 1: packs that have an ROC row. Pair each current-period row with its\n",
"    -- last-year row on period, pack, product and province. Descriptive columns\n",
"    -- come from whichever side exists; measures missing on a side default to 0.\n",
"    select\n",
"        coalesce(cur.yyyymm, ly.yyyymm) as yyyymm,\n",
"        coalesce(cur.iqvia_pack_code, ly.iqvia_pack_code) as iqvia_pack_code,\n",
"        coalesce(cur.zk_product_id, ly.zk_product_id) as zk_product_id,\n",
"        coalesce(cur.prod_des_c, ly.prod_des_c) as prod_des_c,\n",
"        coalesce(cur.PROD_MAPPING, ly.PROD_MAPPING) as PROD_MAPPING,\n",
"        coalesce(cur.province_city, ly.province_city) as province_city,\n",
"        coalesce(cur.market, ly.market) as market,\n",
"        coalesce(cur.sales_value, 0) as sales_value,\n",
"        coalesce(cur.sales_unit, 0) as sales_unit,\n",
"        coalesce(cur.counting_unit, 0) as counting_unit,\n",
"        coalesce(cur.pack_flag, ly.pack_flag) as pack_flag,\n",
"        coalesce(cur.brand_flag, ly.brand_flag) as brand_flag,\n",
"        coalesce(ly.sales_value_ly, 0) as sales_value_ly,\n",
"        coalesce(ly.sales_unit_ly, 0) as sales_unit_ly,\n",
"        coalesce(ly.counting_unit_ly, 0) as counting_unit_ly\n",
"    from tmp_pack_this_year_with_roc cur\n",
"    full outer join tmp_pack_next_year_with_roc ly\n",
"        on ly.YYYYMM = cur.YYYYMM\n",
"        and ly.iqvia_pack_code = cur.iqvia_pack_code\n",
"        and ly.zk_product_id = cur.zk_product_id\n",
"        and ly.province_city = cur.province_city\n",
"\n",
"    union all\n",
"\n",
"    -- Branch 2: packs without an ROC row. Same pairing, but the region is forced\n",
"    -- to 'ROC' and pack_flag is fixed.\n",
"    select\n",
"        coalesce(cur.yyyymm, ly.yyyymm) as yyyymm,\n",
"        coalesce(cur.iqvia_pack_code, ly.iqvia_pack_code) as iqvia_pack_code,\n",
"        coalesce(cur.zk_product_id, ly.zk_product_id) as zk_product_id,\n",
"        coalesce(cur.prod_des_c, ly.prod_des_c) as prod_des_c,\n",
"        coalesce(cur.PROD_MAPPING, ly.PROD_MAPPING) as PROD_MAPPING,\n",
"        'ROC' as province_city,\n",
"        coalesce(cur.market, ly.market) as market,\n",
"        coalesce(cur.sales_value, 0) as sales_value,\n",
"        coalesce(cur.sales_unit, 0) as sales_unit,\n",
"        coalesce(cur.counting_unit, 0) as counting_unit,\n",
"        -- These packs have no split ratio and only nationwide figures, so\n",
"        -- pack_flag is fixed at 2 instead of being taken from either side.\n",
"        2 as pack_flag,\n",
"        coalesce(cur.brand_flag, ly.brand_flag) as brand_flag,\n",
"        coalesce(ly.sales_value_ly, 0) as sales_value_ly,\n",
"        coalesce(ly.sales_unit_ly, 0) as sales_unit_ly,\n",
"        coalesce(ly.counting_unit_ly, 0) as counting_unit_ly\n",
"    from tmp_pack_this_year_without_roc cur\n",
"    full outer join tmp_pack_next_year_without_roc ly\n",
"        on ly.YYYYMM = cur.YYYYMM\n",
"        and ly.iqvia_pack_code = cur.iqvia_pack_code\n",
"        and ly.zk_product_id = cur.zk_product_id\n",
"        and ly.province_city = cur.province_city\n",
")\n",
"\n",
"-- insert overwrite table tmp.tmp_retail_final_sales\n",
"\n",
"-- select\n",
"-- yyyymm,\n",
"-- iqvia_pack_code,\n",
"-- zk_product_id,\n",
"-- prod_des_c,\n",
"-- PROD_MAPPING,\n",
"-- province_city,\n",
"-- market,\n",
"-- sales_value,\n",
"-- sales_value_ly,\n",
"-- sales_unit,\n",
"-- sales_unit_ly,\n",
"-- counting_unit,\n",
"-- counting_unit_ly,\n",
"-- pack_flag,\n",
"-- brand_flag\n",
"-- from tmp_final_sales\n",
"-- order by yyyymm\n",
", tmp_niad_aggregated as (\n",
"    -- NIAD rows rolled up from month to quarter: measures are summed, flags take\n",
"    -- their maximum, and yyyymm becomes the last month of the quarter.\n",
"    select\n",
"        yyyymm_quarter,\n",
"        quarter_end_yyyymm as yyyymm,\n",
"        iqvia_pack_code,\n",
"        zk_product_id,\n",
"        prod_des_c,\n",
"        PROD_MAPPING,\n",
"        province_city,\n",
"        market,\n",
"        sum(sales_value) as sales_value,\n",
"        sum(sales_unit) as sales_unit,\n",
"        sum(counting_unit) as counting_unit,\n",
"        max(pack_flag) as pack_flag,\n",
"        max(brand_flag) as brand_flag,\n",
"        sum(sales_value_ly) as sales_value_ly,\n",
"        sum(sales_unit_ly) as sales_unit_ly,\n",
"        sum(counting_unit_ly) as counting_unit_ly\n",
"    from (\n",
"        -- Compute both quarter keys once per row so the GROUP BY can use plain names.\n",
"        select\n",
"            concat(\n",
"                left(yyyymm, 4),\n",
"                case\n",
"                    when right(yyyymm, 2) in ('01', '02', '03') then 'Q1'\n",
"                    when right(yyyymm, 2) in ('04', '05', '06') then 'Q2'\n",
"                    when right(yyyymm, 2) in ('07', '08', '09') then 'Q3'\n",
"                    else 'Q4'\n",
"                end\n",
"            ) as yyyymm_quarter,\n",
"            concat(\n",
"                left(yyyymm, 4),\n",
"                case\n",
"                    when right(yyyymm, 2) in ('01', '02', '03') then '03'\n",
"                    when right(yyyymm, 2) in ('04', '05', '06') then '06'\n",
"                    when right(yyyymm, 2) in ('07', '08', '09') then '09'\n",
"                    else '12'\n",
"                end\n",
"            ) as quarter_end_yyyymm,\n",
"            iqvia_pack_code,\n",
"            zk_product_id,\n",
"            prod_des_c,\n",
"            PROD_MAPPING,\n",
"            province_city,\n",
"            market,\n",
"            sales_value,\n",
"            sales_unit,\n",
"            counting_unit,\n",
"            pack_flag,\n",
"            brand_flag,\n",
"            sales_value_ly,\n",
"            sales_unit_ly,\n",
"            counting_unit_ly\n",
"        from tmp_final_sales\n",
"        where market = 'NIAD'\n",
"    ) niad_monthly\n",
"    group by\n",
"        yyyymm_quarter,\n",
"        quarter_end_yyyymm,\n",
"        iqvia_pack_code,\n",
"        zk_product_id,\n",
"        prod_des_c,\n",
"        PROD_MAPPING,\n",
"        province_city,\n",
"        market\n",
")\n",
"\n",
"-- Final result: NIAD rows are aggregated to quarter level; every other market is kept as-is.\n",
"insert overwrite table tmp.tmp_retail_final_sales\n",
"select\n",
"    yyyymm,\n",
"    iqvia_pack_code,\n",
"    zk_product_id,\n",
"    prod_des_c,\n",
"    PROD_MAPPING,\n",
"    province_city,\n",
"    market,\n",
"    sales_value,\n",
"    sales_value_ly,\n",
"    sales_unit,\n",
"    sales_unit_ly,\n",
"    counting_unit,\n",
"    counting_unit_ly,\n",
"    pack_flag,\n",
"    brand_flag\n",
"from (\n",
"    -- UNION ALL pairs columns by position, not by name. Both branches therefore\n",
"    -- list the same columns in the same order; using select * here would misalign\n",
"    -- the NIAD measures (e.g. sales_value_ly landing in sales_unit).\n",
"    select\n",
"        yyyymm,\n",
"        iqvia_pack_code,\n",
"        zk_product_id,\n",
"        prod_des_c,\n",
"        PROD_MAPPING,\n",
"        province_city,\n",
"        market,\n",
"        sales_value,\n",
"        sales_value_ly,\n",
"        sales_unit,\n",
"        sales_unit_ly,\n",
"        counting_unit,\n",
"        counting_unit_ly,\n",
"        pack_flag,\n",
"        brand_flag\n",
"    from tmp_final_sales\n",
"    -- NULL-safe: a plain market <> 'NIAD' is UNKNOWN for NULL markets and would\n",
"    -- drop those rows from both branches.\n",
"    where market is null\n",
"       or market <> 'NIAD'\n",
"\n",
"    union all\n",
"\n",
"    select\n",
"        yyyymm,\n",
"        iqvia_pack_code,\n",
"        zk_product_id,\n",
"        prod_des_c,\n",
"        PROD_MAPPING,\n",
"        province_city,\n",
"        market,\n",
"        sales_value,\n",
"        sales_value_ly,\n",
"        sales_unit,\n",
"        sales_unit_ly,\n",
"        counting_unit,\n",
"        counting_unit_ly,\n",
"        pack_flag,\n",
"        brand_flag\n",
"    from tmp_niad_aggregated\n",
") t\n",
"order by yyyymm\n"
]
},
{
"cell_type": "code",
"execution_count": 0,
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {},
"inputWidgets": {},
"nuid": "efe5e7ee-82b5-46d9-85f7-650756dffbf8",
"showTitle": false,
"tableResultSettingsMap": {},
"title": ""
}
},
"outputs": [],
"source": [
"############################################################END################################################################"
]
}
],
"metadata": {
"application/vnd.databricks.v1+notebook": {
"computePreferences": null,
"dashboards": [],
"environmentMetadata": null,
"inputWidgetPreferences": null,
"language": "python",
"notebookMetadata": {
"mostRecentlyExecutedCommandWithImplicitDF": {
"commandId": 1969542701077462,
"dataframes": [
"_sqldf"
]
},
"pythonIndentUnit": 4
},
"notebookName": "04 map_to_dws_table",
"widgets": {}
},
"language_info": {
"name": "python"
}
},
"nbformat": 4,
"nbformat_minor": 0
}