Files
MarketAnalysis-ETL/EC/1 (ec)blob_to_dwd.sql
2026-04-27 11:04:09 +08:00

103 lines
5.3 KiB
SQL

-- Databricks notebook source
-- MAGIC %python
-- MAGIC import datetime
-- MAGIC from pyspark.sql.functions import current_timestamp, expr, date_format
-- MAGIC
-- MAGIC # Compute "today" in UTC+8; today's upload folder is <base_path_0>/yyyy/mm/dd/.
-- MAGIC # datetime.utcnow() is deprecated since Python 3.12: take an aware UTC timestamp
-- MAGIC # and drop tzinfo so current_date_utc keeps the same naive-UTC value as before.
-- MAGIC current_date_utc = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
-- MAGIC offset = datetime.timedelta(hours=8)
-- MAGIC current_date = current_date_utc + offset
-- MAGIC
-- MAGIC # Relative date folder, e.g. "2026/04/27/".
-- MAGIC today_path = current_date.strftime("%Y/%m/%d/")
-- MAGIC
-- MAGIC # Upload landing-zone root per environment. Set ENV to "prod" for production
-- MAGIC # instead of commenting/uncommenting path literals.
-- MAGIC BASE_PATHS = {
-- MAGIC     "test": "abfss://master@retaildlstoragetest.dfs.core.chinacloudapi.cn/ODS/GND/UserUpload/",
-- MAGIC     "prod": "abfss://master@azcdatalakeprd.dfs.core.chinacloudapi.cn/ODS/GND/UserUpload/",
-- MAGIC }
-- MAGIC ENV = "test"
-- MAGIC base_path_0 = BASE_PATHS[ENV]
-- MAGIC base_path = base_path_0 + today_path
-- MAGIC print(base_path)
-- MAGIC
-- MAGIC # Existence probe for a DBFS / ABFSS path.
-- MAGIC def check_path_exists(path):
-- MAGIC     """Return True when dbutils.fs.ls(path) succeeds, False when it raises.
-- MAGIC
-- MAGIC     Every exception from the listing is treated as "does not exist",
-- MAGIC     so permission or connectivity failures also yield False.
-- MAGIC     """
-- MAGIC     try:
-- MAGIC         dbutils.fs.ls(path)
-- MAGIC     except Exception:
-- MAGIC         return False
-- MAGIC     return True
-- MAGIC
-- MAGIC # Source CSV header -> target column name. All uploads are assumed to share this
-- MAGIC # header layout; withColumnRenamed ignores headers a file does not have.
-- MAGIC # PROD_COD, PACK_COD, APP3_COD and ATC4_COD already carry their target names.
-- MAGIC # NOTE(review): 'aup_pices' looks like a typo of 'aup_pieces'; kept as-is because
-- MAGIC # existing target tables / consumers may depend on it.
-- MAGIC _COLUMN_RENAMES = {
-- MAGIC     '时间(月度)': 'time',
-- MAGIC     '平台': 'platform',
-- MAGIC     '店铺名称': 'store_name',
-- MAGIC     '店铺类型': 'store_type',
-- MAGIC     '产品ID': 'product_id',
-- MAGIC     '品牌': 'brand_name',
-- MAGIC     '品名': 'category_name',
-- MAGIC     '商品名': 'product_name',
-- MAGIC     '通用名': 'common_name',
-- MAGIC     '厂家': 'factory',
-- MAGIC     '集团权益': 'group_interest',
-- MAGIC     '规格': 'specification',
-- MAGIC     '单件包装盒数': 'pcs_per_box',
-- MAGIC     '剂型': 'dosage',
-- MAGIC     '细分一': 'des1',
-- MAGIC     '细分二': 'des2',
-- MAGIC     '细分三': 'des3',
-- MAGIC     '细分四': 'des4',
-- MAGIC     '销售额': 'sales_amount',
-- MAGIC     '成交件数': 'sold_pcs',
-- MAGIC     '平均单价(元/件)': 'aup_pices',
-- MAGIC     '销售量(盒)': 'sales_qty',
-- MAGIC     '平均单价(元/盒)': 'aup_box',
-- MAGIC }
-- MAGIC
-- MAGIC
-- MAGIC def _parse_batch_number(folder_name):
-- MAGIC     """Return the integer batch number of a listing entry such as "3/", or None if not numeric."""
-- MAGIC     try:
-- MAGIC         return int(folder_name.strip('/'))
-- MAGIC     except ValueError:
-- MAGIC         return None
-- MAGIC
-- MAGIC
-- MAGIC def _load_upload_csv(csv_file_path):
-- MAGIC     """Read one GBK-encoded CSV (with header), rename its columns and add etl_insert_dt."""
-- MAGIC     df = spark.read.format("csv").option("header", "true").option("charset", "GBK").load(csv_file_path)
-- MAGIC     for source_name, target_name in _COLUMN_RENAMES.items():
-- MAGIC         df = df.withColumnRenamed(source_name, target_name)
-- MAGIC     # etl_insert_dt: current_timestamp() + 8 hours, formatted as a 'yyyy-MM-dd HH:mm:ss' string.
-- MAGIC     # NOTE(review): this is China local time only if the Spark session time zone is UTC — confirm.
-- MAGIC     return df.withColumn('etl_insert_dt', date_format(expr("current_timestamp() + INTERVAL 8 HOURS"), 'yyyy-MM-dd HH:mm:ss'))
-- MAGIC
-- MAGIC
-- MAGIC if check_path_exists(base_path):
-- MAGIC     # The config table maps an expected upload file name to its target table.
-- MAGIC     # It is loop-invariant, so collect and normalise it once, not once per batch.
-- MAGIC     config_df = spark.table("dwd.dwd_gnd_ec_config_table")
-- MAGIC     config_pairs = []
-- MAGIC     for row in config_df.collect():
-- MAGIC         if row['file_name'] is None or row['table_name'] is None:
-- MAGIC             print(f"跳过不完整的配置行: {row}")
-- MAGIC             continue
-- MAGIC         config_pairs.append((row['file_name'].strip().lower(), row['table_name']))
-- MAGIC
-- MAGIC     # Each match is written with mode("overwrite"), so the batch processed last wins.
-- MAGIC     # The listing order is not numeric ("10/" can precede "2/"), so sort by batch
-- MAGIC     # number to apply the highest-numbered batch last. Non-numeric entries are skipped
-- MAGIC     # instead of aborting the whole run with a ValueError.
-- MAGIC     numbered_batches = []
-- MAGIC     for batch in dbutils.fs.ls(base_path):
-- MAGIC         current_batch_number = _parse_batch_number(batch.name)
-- MAGIC         if current_batch_number is None:
-- MAGIC             print(f"跳过非数字批次目录: {batch.path}")
-- MAGIC             continue
-- MAGIC         numbered_batches.append((current_batch_number, batch))
-- MAGIC     numbered_batches.sort(key=lambda item: item[0])
-- MAGIC
-- MAGIC     for current_batch_number, batch in numbered_batches:
-- MAGIC         files_in_batch = dbutils.fs.ls(batch.path)
-- MAGIC
-- MAGIC         print("该批次中的文件:")
-- MAGIC         for file in files_in_batch:
-- MAGIC             print(file.name)
-- MAGIC
-- MAGIC         for file_name, table_name in config_pairs:
-- MAGIC             # Case- and whitespace-insensitive file-name match.
-- MAGIC             matching_files = [f for f in files_in_batch if f.name.strip().lower() == file_name]
-- MAGIC
-- MAGIC             for match in matching_files:
-- MAGIC                 csv_file_path = batch.path + match.name
-- MAGIC                 print(f"找到匹配的文件: {csv_file_path}")
-- MAGIC
-- MAGIC                 df = _load_upload_csv(csv_file_path)
-- MAGIC                 df.write.mode("overwrite").saveAsTable(table_name)
-- MAGIC
-- MAGIC                 print(f"数据已写入表 {table_name}")
-- MAGIC else:
-- MAGIC     print("未找到批次或路径不存在。")
-- MAGIC
-- MAGIC