-- Databricks notebook source -- MAGIC %python -- MAGIC import datetime -- MAGIC from pyspark.sql.functions import current_timestamp, expr, date_format -- MAGIC -- MAGIC # 计算当前日期 -- MAGIC current_date_utc = datetime.datetime.utcnow() -- MAGIC offset = datetime.timedelta(hours=8) -- MAGIC current_date = current_date_utc + offset -- MAGIC -- MAGIC today_path = "{:04d}/{:02d}/{:02d}/".format( -- MAGIC current_date.year, -- MAGIC current_date.month, -- MAGIC current_date.day, -- MAGIC ) -- MAGIC # 基础路径 -- MAGIC # 测试环境 -- MAGIC base_path_0 = f"abfss://master@retaildlstoragetest.dfs.core.chinacloudapi.cn/ODS/GND/UserUpload/" -- MAGIC # 生产环境 -- MAGIC # base_path_0 = f"abfss://master@azcdatalakeprd.dfs.core.chinacloudapi.cn/ODS/GND/UserUpload/" -- MAGIC base_path = base_path_0 + today_path -- MAGIC print(base_path) -- MAGIC -- MAGIC # 检查基础路径是否存在 -- MAGIC def check_path_exists(path): -- MAGIC try: -- MAGIC dbutils.fs.ls(path) -- MAGIC return True -- MAGIC except Exception as e: -- MAGIC return False -- MAGIC -- MAGIC if check_path_exists(base_path): -- MAGIC # 列出所有批次路径 -- MAGIC batch_paths = dbutils.fs.ls(base_path) -- MAGIC -- MAGIC # 从已存在的配置表中读取数据 -- MAGIC config_df = spark.table("dwd.dwd_gnd_ec_config_table") -- MAGIC -- MAGIC # 逐批处理 -- MAGIC for batch in batch_paths: -- MAGIC current_batch_number = int(batch.name.strip('/')) -- MAGIC # print(f"Checking batch {current_batch_number} at {batch.path}") -- MAGIC -- MAGIC files_in_batch = dbutils.fs.ls(batch.path) -- MAGIC -- MAGIC print("该批次中的文件:") -- MAGIC for file in files_in_batch: -- MAGIC print(file.name) -- MAGIC -- MAGIC for row in config_df.collect(): -- MAGIC file_name = row['file_name'].strip().lower() -- MAGIC table_name = row['table_name'] -- MAGIC -- MAGIC # 检查文件是否匹配 -- MAGIC matching_files = [f for f in files_in_batch if f.name.strip().lower() == file_name] -- MAGIC -- MAGIC for match in matching_files: -- MAGIC csv_file_path = batch.path + match.name -- MAGIC print(f"找到匹配的文件: {csv_file_path}") -- MAGIC -- MAGIC # 读取 CSV 文件 -- MAGIC df = spark.read.format("csv").option("header", "true").option("charset", "GBK").load(csv_file_path) -- MAGIC -- MAGIC # 对列进行重命名(假设所有文件的列名相同) -- MAGIC df = df.withColumnRenamed('时间(月度)', 'time') -- MAGIC df = df.withColumnRenamed('平台', 'platform') -- MAGIC df = df.withColumnRenamed('店铺名称', 'store_name') -- MAGIC df = df.withColumnRenamed('店铺类型', 'store_type') -- MAGIC df = df.withColumnRenamed('产品ID', 'product_id') -- MAGIC df = df.withColumnRenamed('品牌', 'brand_name') -- MAGIC df = df.withColumnRenamed('品名', 'category_name') -- MAGIC df = df.withColumnRenamed('商品名', 'product_name') -- MAGIC df = df.withColumnRenamed('通用名', 'common_name') -- MAGIC df = df.withColumnRenamed('厂家', 'factory') -- MAGIC df = df.withColumnRenamed('集团权益', 'group_interest') -- MAGIC df = df.withColumnRenamed('规格', 'specification') -- MAGIC df = df.withColumnRenamed('单件包装盒数', 'pcs_per_box') -- MAGIC df = df.withColumnRenamed('剂型', 'dosage') -- MAGIC df = df.withColumnRenamed('细分一', 'des1') -- MAGIC df = df.withColumnRenamed('细分二', 'des2') -- MAGIC df = df.withColumnRenamed('细分三', 'des3') -- MAGIC df = df.withColumnRenamed('细分四', 'des4') -- MAGIC df = df.withColumnRenamed('销售额', 'sales_amount') -- MAGIC df = df.withColumnRenamed('成交件数', 'sold_pcs') -- MAGIC df = df.withColumnRenamed('平均单价(元/件)', 'aup_pices') -- MAGIC df = df.withColumnRenamed('销售量(盒)', 'sales_qty') -- MAGIC df = df.withColumnRenamed('平均单价(元/盒)', 'aup_box') -- MAGIC df = df.withColumnRenamed('PROD_COD', 'PROD_COD') -- MAGIC df = df.withColumnRenamed('PACK_COD', 'PACK_COD') -- MAGIC df = df.withColumnRenamed('APP3_COD', 'APP3_COD') -- MAGIC df = df.withColumnRenamed('ATC4_COD', 'ATC4_COD') -- MAGIC -- MAGIC # 添加 'etl_insert_dt' 列,包含当前时间并加上 8 小时的时差 -- MAGIC df = df.withColumn('etl_insert_dt', date_format(expr("current_timestamp() + INTERVAL 8 HOURS"), 'yyyy-MM-dd HH:mm:ss')) -- MAGIC -- MAGIC # 将数据保存到目标表 -- MAGIC df.write.mode("overwrite").saveAsTable(table_name) -- MAGIC -- MAGIC print(f"数据已写入表 {table_name}") -- MAGIC else: -- MAGIC print("未找到批次或路径不存在。") -- MAGIC -- MAGIC