import re import pandas as pd import clr import os from config import CURRENT_DIR_PATH_NET, NET_FOLDER folder = os.path.join(CURRENT_DIR_PATH_NET, NET_FOLDER) clr.AddReference( folder + r"\Microsoft.AnalysisServices.AdomdClient\v4.0_15.0.0.0__89845dcd8080cc91\Microsoft.AnalysisServices.AdomdClient.DLL" ) from Microsoft.AnalysisServices.AdomdClient import AdomdConnection, AdomdCommand # type: ignore class ASDataFetcher: def __init__( self, workspace, username, password, queries, json_name, result_folder_path, isAdmin=True, catalog=None, effective_username=None, customdata=None, role=None, model_number=None, ): self.workspace = workspace self.username = username self.password = password self.queries = queries self.isAdmin = isAdmin self.effective_username = effective_username self.customdata = customdata self.role = role self.result_folder_path = result_folder_path self.error_list = [] self.json_name = json_name self.model_number = model_number self.log_messages = [] self.catalog = catalog # 先将当前对象的 connString 方法的返回值赋值给当前对象的 conn_string 属性, 以便后续使用 self.conn_string = self.connString() def connString(self) -> str: # 初始化连接字符串,包含数据源、用户名和密码 # 如果workspace开头是powerbi,则连接字符串中包含catalog if self.workspace.startswith("powerbi://"): conn = f"DataSource={self.workspace};User ID={self.username};Password={self.password};Initial Catalog={self.catalog}" else: conn = f"DataSource={self.workspace};User ID={self.username};Password={self.password}" # 如果是管理员,直接返回连接字符串 if self.isAdmin: self.result_excel_name = f"admin_{self.json_name}_{self.model_number}" self.result_full_excel_name = os.path.join( self.result_folder_path, self.result_excel_name ) return conn # 如果不是管理员,且提供了角色信息 elif self.role: # 如果提供了有效的用户名,返回包含有效用户名和角色的连接字符串 if self.effective_username: self.result_excel_name = ( f"{self.role}_{self.effective_username}_{self.json_name}_{self.model_number}" ) self.result_full_excel_name = os.path.join( self.result_folder_path, self.result_excel_name ) return f"{conn};EffectiveUserName={self.effective_username};Roles={self.role};" # 如果提供了自定义数据,返回包含自定义数据和角色的连接字符串 elif self.customdata: self.result_excel_name = ( f"{self.role}_{self.customdata}_{self.json_name}_{self.model_number}" ) self.result_full_excel_name = os.path.join( self.result_folder_path, self.result_excel_name ) return f"{conn};CustomData={self.customdata};Roles={self.role};" # 如果只提供了角色,返回包含角色的连接字符串 else: self.result_excel_name = f"{self.role}_-NoUser-_{self.json_name}_{self.model_number}" self.result_full_excel_name = os.path.join( self.result_folder_path, self.result_excel_name ) return f"{conn};Roles={self.role};" # 如果不是管理员且未提供角色信息,抛出异常 else: raise ValueError("非管理员用户必须提供角色信息") def readData(self, reader): # 获取列名 column_names = [reader.GetName(i) for i in range(reader.FieldCount)] # 初始化数据存储 data = [] while reader.Read(): # 读取每一行的数据 row_data = [reader[i] for i in range(reader.FieldCount)] # 将行数据添加到数据列表中 data.append(row_data) # 将数据转换为 DataFrame df = pd.DataFrame(data, columns=column_names) # 返回DataFrame和下一个结果集 return df, reader.NextResult() def getDataFromAS(self, query): # 创建AdomdConnection对象 conn = AdomdConnection(self.conn_string) try: # 建立连接 conn.Open() # 创建命令 cmd = AdomdCommand(query, conn) # 执行命令 reader = cmd.ExecuteReader() IsNext = True while IsNext: # 将结果集转换为DataFrame df, IsNext = self.readData(reader) # 重命名列名 df.columns = [ ( re.search(r"\[(.*?)\]", col).group(1) if re.search(r"\[(.*?)\]", col) else col ) for col in df.columns ] yield df except Exception as ex: message = f"错误信息: {ex}" print(message) self.log_messages.append(f"\n{message}") finally: # 关闭连接 conn.Close() def writeToExcel(self, print_lock): # 创建一个ExcelWriter对象 with pd.ExcelWriter( f"{self.result_full_excel_name}.xlsx", engine="openpyxl" ) as writer: sheet_name_id = 1 # 将DataFrame写入不同的工作表 for query in self.queries: for df in self.getDataFromAS(query): try: # 访问工作簿和工作表 workbook = writer.book worksheet = workbook.create_sheet(str(sheet_name_id)) worksheet["A1"] = query # 将DataFrame写入Excel文件 df.to_excel( writer, sheet_name=str(sheet_name_id), index=True, float_format="%.2f", startrow=1, ) except Exception as e: with print_lock: message = f"写入工作表 {sheet_name_id} 失败: {e}" print(message) self.log_messages.append(f"\n{message}") self.error_list.append(sheet_name_id) else: with print_lock: message = f"{self.result_excel_name} \n表格大小:{df.shape} \n写入工作表 {sheet_name_id}" print(message) self.log_messages.append(f"\n{message}") finally: with print_lock: message = f"-------------------------" print(message) self.log_messages.append(f"{message}") sheet_name_id += 1 return self.error_list if __name__ == "__main__": pass