Files
MarketAnalysis-ETL/Retail/04 map_to_dws_table.ipynb
2026-04-29 10:15:15 +00:00

409 lines
15 KiB
Plaintext
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
{
"cells": [
{
"cell_type": "code",
"execution_count": 0,
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {},
"inputWidgets": {},
"nuid": "4d16488f-0327-4ced-b23f-41f960a90d2f",
"showTitle": false,
"tableResultSettingsMap": {},
"title": ""
}
},
"outputs": [],
"source": [
"############################################################START##############################################################\n",
"### STEP-1: insert split pack data into tmp final table: tmp_retail_final_sales"
]
},
{
"cell_type": "code",
"execution_count": 0,
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {
"byteLimit": 2048000,
"implicitDf": true,
"rowLimit": 10000
},
"finishTime": 1777430517044,
"inputWidgets": {},
"nuid": "1168666b-255b-44a3-968b-4156c93dad53",
"showTitle": false,
"startTime": 1777430506637,
"submitTime": 1777430498072,
"tableResultSettingsMap": {
"0": {
"dataGridStateBlob": "{\"version\":1,\"tableState\":{\"columnPinning\":{\"left\":[\"#row_number#\"],\"right\":[]},\"columnSizing\":{},\"columnVisibility\":{}},\"settings\":{\"columns\":{}},\"syncTimestamp\":1777430717962}",
"filterBlob": null,
"queryPlanFiltersBlob": null,
"tableResultIndex": 0
}
},
"title": ""
}
},
"outputs": [],
"source": [
"%sql\n",
"-------------------------------------------------------------------------------------\n",
"-- STEP-1: insert split pack data into tmp final table\n",
"-- insert into tmp_retail_final_sales\n",
"-------------------------------------------------------------------------------------\n",
"\n",
"with tmp_pack as (\n",
"    -- Base set: raw pack sales (a) left-joined to pack properties (b) on product_id.\n",
"    select\n",
"        -- Period key: use the monthly value when it is present; otherwise build one from\n",
"        -- the quarter value by mapping the quarter digit to that quarter's last month.\n",
"        coalesce(\n",
"            a.month,\n",
"            concat(\n",
"                substring(a.quarter, 1, 4),        -- first 4 characters: the year\n",
"                case substring(a.quarter, 6, 1)    -- 6th character: the quarter digit\n",
"                    when '1' then '03'             -- Q1 -> month 03\n",
"                    when '2' then '06'             -- Q2 -> month 06\n",
"                    when '3' then '09'             -- Q3 -> month 09\n",
"                    when '4' then '12'             -- Q4 -> month 12\n",
"                end\n",
"            )\n",
"        ) as YYYYMM,\n",
"        a.pack_code as iqvia_pack_code,\n",
"        a.product_id as zk_product_id,\n",
"        -- The description is NULL when product_desc is 'others' ...\n",
"        nullif(a.product_desc, 'others') as prod_des_c,\n",
"        -- ... while the mapping name falls back to 'Others_' + molecule description\n",
"        -- (also used when product_desc itself is NULL).\n",
"        case\n",
"            when a.product_desc = 'others' or a.product_desc is null\n",
"                then concat('Others_', a.molecule_desc)\n",
"            else a.product_desc\n",
"        end as PROD_MAPPING,\n",
"        a.zk_regin as province_city,\n",
"        a.level_market as market,\n",
"        a.sales_value,\n",
"        a.sales_unit,\n",
"        -- counting_unit is not read from the raw pack data: it is sales_unit scaled by\n",
"        -- pack_property.counting_unit / pack_property.unit (a NULL unit is treated as 1).\n",
"        -- NOTE(review): a unit of 0 is not guarded against here - confirm it cannot occur.\n",
"        a.sales_unit * (b.counting_unit / coalesce(b.unit, 1)) as counting_unit,\n",
"        -- Recode the source flags to 1/2: pack_flag is 1 only when data_flag = 0, and\n",
"        -- brand_flag is 1 only when the source brand_flag = 1; anything else (incl. NULL) is 2.\n",
"        case data_flag when 0 then 1 else 2 end as pack_flag,\n",
"        case brand_flag when 1 then 1 else 2 end as brand_flag\n",
"    from tmp.tmp_retail_pack_rawdata a\n",
"    left join dwd.dwd_gnd_ext_retail_pack_property b\n",
"        on b.product_id = a.product_id\n",
"), tmp_has_roc as (\n",
"    -- Product / pack / period keys of the raw rows whose region is 'ROC'.\n",
"    select\n",
"        r.product_id,\n",
"        r.quarter,\n",
"        -- Same period rule as tmp_pack: the monthly value when present, otherwise the\n",
"        -- last month of the quarter taken from the quarter value.\n",
"        coalesce(\n",
"            r.month,\n",
"            concat(\n",
"                substring(r.quarter, 1, 4),        -- first 4 characters: the year\n",
"                case substring(r.quarter, 6, 1)    -- 6th character: the quarter digit\n",
"                    when '1' then '03'             -- Q1 -> month 03\n",
"                    when '2' then '06'             -- Q2 -> month 06\n",
"                    when '3' then '09'             -- Q3 -> month 09\n",
"                    when '4' then '12'             -- Q4 -> month 12\n",
"                end\n",
"            )\n",
"        ) as month,\n",
"        r.pack_code\n",
"    from tmp.tmp_retail_pack_rawdata r\n",
"    where r.zk_regin = 'ROC'\n",
"), tmp_pack_this_year_with_roc as (\n",
"    -- tmp_pack rows whose pack / product / period also appears in tmp_has_roc,\n",
"    -- leaving out the rows of the '全国' region.\n",
"    select\n",
"        p.YYYYMM,\n",
"        p.iqvia_pack_code,\n",
"        p.zk_product_id,\n",
"        p.prod_des_c,\n",
"        p.PROD_MAPPING,\n",
"        p.province_city,\n",
"        p.market,\n",
"        p.sales_value,\n",
"        p.sales_unit,\n",
"        p.counting_unit,\n",
"        p.pack_flag,\n",
"        p.brand_flag\n",
"    from tmp_pack p\n",
"    where exists (\n",
"            select 1\n",
"            from tmp_has_roc r\n",
"            where r.month = p.YYYYMM\n",
"              and r.pack_code = p.iqvia_pack_code\n",
"              and r.product_id = p.zk_product_id\n",
"          )\n",
"      and p.province_city <> '全国'\n",
"), tmp_pack_next_year_with_roc as (\n",
"    -- The with-ROC rows re-keyed one year later (YYYYMM + 100) and exposed as last-year\n",
"    -- (_ly) measures. Rows whose shifted period would be later than the latest period in\n",
"    -- tmp_pack are left out. The ROC check uses the original, unshifted period.\n",
"    select\n",
"        cast(p.YYYYMM + 100 as int) as YYYYMM,\n",
"        p.iqvia_pack_code,\n",
"        p.zk_product_id,\n",
"        p.prod_des_c,\n",
"        p.PROD_MAPPING,\n",
"        p.province_city,\n",
"        p.market,\n",
"        p.sales_value as sales_value_ly,\n",
"        p.sales_unit as sales_unit_ly,\n",
"        p.counting_unit as counting_unit_ly,\n",
"        p.pack_flag,\n",
"        p.brand_flag\n",
"    from tmp_pack p\n",
"    where p.YYYYMM + 100 <= (select max(YYYYMM) from tmp_pack)\n",
"      and exists (\n",
"            select 1\n",
"            from tmp_has_roc r\n",
"            where r.month = p.YYYYMM\n",
"              and r.pack_code = p.iqvia_pack_code\n",
"              and r.product_id = p.zk_product_id\n",
"          )\n",
"      and p.province_city <> '全国'\n",
"\n",
"), tmp_pack_this_year_without_roc as (\n",
"    -- tmp_pack rows whose pack / product / period has no matching row in tmp_has_roc.\n",
"    select\n",
"        p.YYYYMM,\n",
"        p.iqvia_pack_code,\n",
"        p.zk_product_id,\n",
"        p.prod_des_c,\n",
"        p.PROD_MAPPING,\n",
"        p.province_city,\n",
"        p.market,\n",
"        p.sales_value,\n",
"        p.sales_unit,\n",
"        p.counting_unit,\n",
"        p.pack_flag,\n",
"        p.brand_flag\n",
"    from tmp_pack p\n",
"    where not exists (\n",
"            select 1\n",
"            from tmp_has_roc r\n",
"            where r.month = p.YYYYMM\n",
"              and r.pack_code = p.iqvia_pack_code\n",
"              and r.product_id = p.zk_product_id\n",
"          )\n",
"), tmp_pack_next_year_without_roc as (\n",
"    -- The without-ROC rows re-keyed one year later (YYYYMM + 100) and exposed as last-year\n",
"    -- (_ly) measures. Rows whose shifted period would be later than the latest period in\n",
"    -- tmp_pack are left out. The ROC check uses the original, unshifted period.\n",
"    select\n",
"        cast(p.YYYYMM + 100 as int) as YYYYMM,\n",
"        p.iqvia_pack_code,\n",
"        p.zk_product_id,\n",
"        p.prod_des_c,\n",
"        p.PROD_MAPPING,\n",
"        p.province_city,\n",
"        p.market,\n",
"        p.sales_value as sales_value_ly,\n",
"        p.sales_unit as sales_unit_ly,\n",
"        p.counting_unit as counting_unit_ly,\n",
"        p.pack_flag,\n",
"        p.brand_flag\n",
"    from tmp_pack p\n",
"    where p.YYYYMM + 100 <= (select max(YYYYMM) from tmp_pack)\n",
"      and not exists (\n",
"            select 1\n",
"            from tmp_has_roc r\n",
"            where r.month = p.YYYYMM\n",
"              and r.pack_code = p.iqvia_pack_code\n",
"              and r.product_id = p.zk_product_id\n",
"          )\n",
"), tmp_final_sales as (\n",
"    -- Branch 1 - packs with ROC data. Each current-period row (cur) is paired with the row\n",
"    -- shifted from one year earlier (prev). The join is a full outer join, so either side\n",
"    -- may be missing: dimensions come from whichever side exists, missing measures become 0.\n",
"    -- NOTE(review): market is not part of the join keys - confirm that a pack / product /\n",
"    -- region / period never appears under more than one market, otherwise rows multiply.\n",
"    select\n",
"        coalesce(cur.yyyymm, prev.yyyymm) as yyyymm,\n",
"        coalesce(cur.iqvia_pack_code, prev.iqvia_pack_code) as iqvia_pack_code,\n",
"        coalesce(cur.zk_product_id, prev.zk_product_id) as zk_product_id,\n",
"        coalesce(cur.prod_des_c, prev.prod_des_c) as prod_des_c,\n",
"        coalesce(cur.PROD_MAPPING, prev.PROD_MAPPING) as PROD_MAPPING,\n",
"        coalesce(cur.province_city, prev.province_city) as province_city,\n",
"        coalesce(cur.market, prev.market) as market,\n",
"        coalesce(cur.sales_value, 0) as sales_value,\n",
"        coalesce(cur.sales_unit, 0) as sales_unit,\n",
"        coalesce(cur.counting_unit, 0) as counting_unit,\n",
"        coalesce(cur.pack_flag, prev.pack_flag) as pack_flag,\n",
"        coalesce(cur.brand_flag, prev.brand_flag) as brand_flag,\n",
"        coalesce(prev.sales_value_ly, 0) as sales_value_ly,\n",
"        coalesce(prev.sales_unit_ly, 0) as sales_unit_ly,\n",
"        coalesce(prev.counting_unit_ly, 0) as counting_unit_ly\n",
"    from tmp_pack_this_year_with_roc cur\n",
"    full outer join tmp_pack_next_year_with_roc prev\n",
"        on  prev.YYYYMM = cur.YYYYMM\n",
"        and prev.iqvia_pack_code = cur.iqvia_pack_code\n",
"        and prev.zk_product_id = cur.zk_product_id\n",
"        and prev.province_city = cur.province_city\n",
"\n",
"    union all\n",
"\n",
"    -- Branch 2 - packs without ROC data. Same pairing, but the region is always reported\n",
"    -- as 'ROC' and pack_flag is not taken from the rows.\n",
"    select\n",
"        coalesce(cur.yyyymm, prev.yyyymm) as yyyymm,\n",
"        coalesce(cur.iqvia_pack_code, prev.iqvia_pack_code) as iqvia_pack_code,\n",
"        coalesce(cur.zk_product_id, prev.zk_product_id) as zk_product_id,\n",
"        coalesce(cur.prod_des_c, prev.prod_des_c) as prod_des_c,\n",
"        coalesce(cur.PROD_MAPPING, prev.PROD_MAPPING) as PROD_MAPPING,\n",
"        'ROC' as province_city,\n",
"        coalesce(cur.market, prev.market) as market,\n",
"        coalesce(cur.sales_value, 0) as sales_value,\n",
"        coalesce(cur.sales_unit, 0) as sales_unit,\n",
"        coalesce(cur.counting_unit, 0) as counting_unit,\n",
"        -- These packs have no split ratio and only nationwide figures,\n",
"        -- so pack_flag is fixed at 2.\n",
"        2 as pack_flag,\n",
"        coalesce(cur.brand_flag, prev.brand_flag) as brand_flag,\n",
"        coalesce(prev.sales_value_ly, 0) as sales_value_ly,\n",
"        coalesce(prev.sales_unit_ly, 0) as sales_unit_ly,\n",
"        coalesce(prev.counting_unit_ly, 0) as counting_unit_ly\n",
"    from tmp_pack_this_year_without_roc cur\n",
"    full outer join tmp_pack_next_year_without_roc prev\n",
"        on  prev.YYYYMM = cur.YYYYMM\n",
"        and prev.iqvia_pack_code = cur.iqvia_pack_code\n",
"        and prev.zk_product_id = cur.zk_product_id\n",
"        and prev.province_city = cur.province_city\n",
")\n",
"\n",
"-- insert overwrite table tmp.tmp_retail_final_sales\n",
"\n",
"-- select\n",
"-- yyyymm,\n",
"-- iqvia_pack_code,\n",
"-- zk_product_id,\n",
"-- prod_des_c,\n",
"-- PROD_MAPPING,\n",
"-- province_city,\n",
"-- market,\n",
"-- sales_value,\n",
"-- sales_value_ly,\n",
"-- sales_unit,\n",
"-- sales_unit_ly,\n",
"-- counting_unit,\n",
"-- counting_unit_ly,\n",
"-- pack_flag,\n",
"-- brand_flag\n",
"-- from tmp_final_sales\n",
"-- order by yyyymm\n",
", tmp_niad_aggregated as (\n",
"    -- NIAD rows of tmp_final_sales rolled up from their YYYYMM period to quarter grain.\n",
"    -- The output period (yyyymm) is the last month of the quarter: 03, 06, 09 or 12.\n",
"    -- The quarter-end period is derived once in the inner select and reused for grouping.\n",
"    -- The former yyyymm_quarter label (e.g. 2024Q1) was never read by the final select and\n",
"    -- mapped 1:1 to the quarter-end period, so it is no longer computed or grouped on.\n",
"    select\n",
"        q.quarter_end_yyyymm as yyyymm,\n",
"        q.iqvia_pack_code,\n",
"        q.zk_product_id,\n",
"        q.prod_des_c,\n",
"        q.PROD_MAPPING,\n",
"        q.province_city,\n",
"        q.market,\n",
"        sum(q.sales_value) as sales_value,\n",
"        sum(q.sales_unit) as sales_unit,\n",
"        sum(q.counting_unit) as counting_unit,\n",
"        q.pack_flag,\n",
"        q.brand_flag,\n",
"        sum(q.sales_value_ly) as sales_value_ly,\n",
"        sum(q.sales_unit_ly) as sales_unit_ly,\n",
"        sum(q.counting_unit_ly) as counting_unit_ly\n",
"    from (\n",
"        select\n",
"            concat(\n",
"                left(yyyymm, 4),\n",
"                case\n",
"                    when right(yyyymm, 2) in ('01', '02', '03') then '03'\n",
"                    when right(yyyymm, 2) in ('04', '05', '06') then '06'\n",
"                    when right(yyyymm, 2) in ('07', '08', '09') then '09'\n",
"                    -- 10, 11, 12 - and, as before, any unexpected suffix - go to the last quarter\n",
"                    else '12'\n",
"                end\n",
"            ) as quarter_end_yyyymm,\n",
"            iqvia_pack_code,\n",
"            zk_product_id,\n",
"            prod_des_c,\n",
"            PROD_MAPPING,\n",
"            province_city,\n",
"            market,\n",
"            sales_value,\n",
"            sales_unit,\n",
"            counting_unit,\n",
"            pack_flag,\n",
"            brand_flag,\n",
"            sales_value_ly,\n",
"            sales_unit_ly,\n",
"            counting_unit_ly\n",
"        from tmp_final_sales\n",
"        where market = 'NIAD'\n",
"    ) q\n",
"    group by\n",
"        q.quarter_end_yyyymm,\n",
"        q.iqvia_pack_code,\n",
"        q.zk_product_id,\n",
"        q.prod_des_c,\n",
"        q.PROD_MAPPING,\n",
"        q.province_city,\n",
"        q.market,\n",
"        q.pack_flag,\n",
"        q.brand_flag\n",
")\n",
"\n",
"-- Final result: NIAD rows are loaded at quarter grain (from tmp_niad_aggregated);\n",
"-- every other row is loaded unchanged from tmp_final_sales.\n",
"-- The target table is written without a column list, so the select order below must\n",
"-- match the column order of tmp.tmp_retail_final_sales.\n",
"insert overwrite table tmp.tmp_retail_final_sales\n",
"select\n",
"    t.yyyymm,\n",
"    t.iqvia_pack_code,\n",
"    t.zk_product_id,\n",
"    t.prod_des_c,\n",
"    t.PROD_MAPPING,\n",
"    t.province_city,\n",
"    t.market,\n",
"    t.sales_value,\n",
"    t.sales_value_ly,\n",
"    t.sales_unit,\n",
"    t.sales_unit_ly,\n",
"    t.counting_unit,\n",
"    t.counting_unit_ly,\n",
"    t.pack_flag,\n",
"    t.brand_flag\n",
"from (\n",
"    -- Non-NIAD rows, kept at their original grain.\n",
"    select\n",
"        yyyymm,\n",
"        iqvia_pack_code,\n",
"        zk_product_id,\n",
"        prod_des_c,\n",
"        PROD_MAPPING,\n",
"        province_city,\n",
"        market,\n",
"        sales_value,\n",
"        sales_value_ly,\n",
"        sales_unit,\n",
"        sales_unit_ly,\n",
"        counting_unit,\n",
"        counting_unit_ly,\n",
"        pack_flag,\n",
"        brand_flag\n",
"    from tmp_final_sales\n",
"    -- A bare `market <> 'NIAD'` evaluates to NULL for rows that have no market, which\n",
"    -- would drop them from both branches of this union; keep them with the other rows.\n",
"    where market is null\n",
"       or market <> 'NIAD'\n",
"\n",
"    union all\n",
"\n",
"    -- NIAD rows, aggregated to quarter grain.\n",
"    select\n",
"        yyyymm,\n",
"        iqvia_pack_code,\n",
"        zk_product_id,\n",
"        prod_des_c,\n",
"        PROD_MAPPING,\n",
"        province_city,\n",
"        market,\n",
"        sales_value,\n",
"        sales_value_ly,\n",
"        sales_unit,\n",
"        sales_unit_ly,\n",
"        counting_unit,\n",
"        counting_unit_ly,\n",
"        pack_flag,\n",
"        brand_flag\n",
"    from tmp_niad_aggregated\n",
") t\n",
"order by t.yyyymm\n"
]
},
{
"cell_type": "code",
"execution_count": 0,
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {},
"inputWidgets": {},
"nuid": "efe5e7ee-82b5-46d9-85f7-650756dffbf8",
"showTitle": false,
"tableResultSettingsMap": {},
"title": ""
}
},
"outputs": [],
"source": [
"############################################################END################################################################"
]
}
],
"metadata": {
"application/vnd.databricks.v1+notebook": {
"computePreferences": null,
"dashboards": [],
"environmentMetadata": null,
"inputWidgetPreferences": null,
"language": "python",
"notebookMetadata": {
"mostRecentlyExecutedCommandWithImplicitDF": {
"commandId": 7839378222846108,
"dataframes": [
"_sqldf"
]
},
"pythonIndentUnit": 4
},
"notebookName": "04 map_to_dws_table",
"widgets": {}
},
"language_info": {
"name": "python"
}
},
"nbformat": 4,
"nbformat_minor": 0
}