生产级多维聚合:从Pandas groupby到业务语义建模
1. 项目概述:为什么多维聚合不是“加个groupby”就能搞定的事
我在银行数据平台组干了八年,从最早用SQL写几十行嵌套子查询做客户分层,到后来带团队搭实时风险计算引擎,踩过的坑比写的代码还多。今天聊的这个主题——“多维聚合中的数据操作”,听起来像教科书里的一个章节标题,但实际在生产环境里,它直接决定着风控模型能不能准时上线、月度经营分析报告能不能在凌晨三点前自动生成、甚至某次大促期间的实时交易监控大屏会不会突然卡死。
你肯定见过这样的场景:业务方发来一封加急邮件,“请尽快输出各区域、各产品线、各客户等级的当月收入、毛利、客单价、复购率、NPS均值,再叠加近30天滚动环比和YTD累计值”。如果你第一反应是打开Jupyter,敲df.groupby(['region','product','tier'])['revenue'].sum(),然后发现结果是个MultiIndex Series,导出Excel后老板说“这列名怎么是tuple?我要的是表格!”;或者你硬着头皮写五个独立的groupby再merge,跑完发现内存爆了,任务超时被调度系统kill掉……那说明你还没真正吃透多维聚合的底层逻辑。
这不是Pandas语法手册的搬运,而是我带着三个真实项目复盘出来的经验:一个信用卡反欺诈系统的实时特征计算模块,一个财富管理平台的客户资产健康度看板,还有一个跨境支付网关的手续费分润结算引擎。它们共同指向一个事实——多维聚合的本质,是把业务语义精准映射到数据结构上,同时扛住数据量、时效性、可维护性的三重压力。所谓“高级聚合”,高级的从来不是函数名,而是你对“这笔钱到底属于谁、在什么时间、以什么方式产生、要回答什么问题”的理解深度。
关键词里提到的“Towards AI”,其实恰恰点出了这类内容的价值锚点:它不教你怎么调参,也不讲模型架构,而是聚焦在AI落地最常被忽视的“地基层”——数据如何被正确切片、折叠、拉伸、重组。就像盖楼,再炫酷的设计图,如果钢筋没绑牢、混凝土标号不够,迟早出事。本文所有案例都来自真实生产代码库(已脱敏),所有参数选择都有业务依据,所有避坑提示都来自凌晨两点的线上告警。接下来,咱们就按实战顺序,一层层拆解这些让分析师夜不能寐的聚合难题。
2. 核心思路拆解:从“能跑通”到“能扛住”的四重跃迁
很多初学者学完agg()函数,觉得“哦,会了”,结果一上生产环境就懵。我带过不少应届生,他们写的聚合脚本在本地10万行数据上跑得飞快,但放到银行日均3亿笔交易的数仓里,要么OOM,要么耗时从2分钟涨到47分钟。问题出在哪?不是语法错了,而是设计思路上缺了四重关键跃迁。下面这四点,是我给团队新人必讲的“聚合心法”。
2.1 第一重跃迁:从“单维度统计”到“业务维度建模”
看原文第一个例子,按merchant_category分组算均值和中位数。这没错,但如果你只停在这一步,就永远在做报表,不是在建模。真正的业务维度建模,要回答三个问题:这个维度是谁定义的?它的生命周期有多长?它和其他维度的关系是什么?
比如merchant_category(商户类别),在银行内部它不是随便填的字符串。它由收单部门维护,有标准编码体系(如银联MCC码),每年更新两次。它和region(地区)存在强关联——华南地区的“餐饮”类商户,和东北地区的“餐饮”类商户,客群画像、交易频次、风险特征完全不同。所以单纯按category聚合,得到的只是一个统计数字;而按['region','category']联合聚合,才开始逼近业务真相。
我在做跨境支付分润时就吃过亏。最初只按country(国家)分组算手续费,结果发现东南亚几个小国的数据波动极大。后来才发现,这些国家的商户实际由新加坡的收单机构统一接入,真正的业务责任主体是“收单机构+国家”组合。于是我们重构了维度表,把acquiring_institution(收单机构)作为一级维度,country作为二级维度,聚合逻辑立刻稳定下来。维度不是数据字段,而是业务规则的快照。选错维度,等于用错误的地图导航。
2.2 第二重跃迁:从“函数堆砌”到“计算意图显性化”
原文用了lambda x: x.max() - x.min()算范围,很简洁。但生产环境里,我严禁团队这么写。为什么?因为三个月后,当风控同事问“这个range阈值300是怎么定的”,你翻代码只能看到一行lambda,根本看不出业务逻辑。更可怕的是,当需要把这个逻辑复用到另一个指标(比如交易时长范围)时,你得再抄一遍,改一处漏一处。
我的做法是:所有自定义聚合必须封装成命名函数,并强制包含业务注释和参数校验。比如上面那个范围计算,我会写成:
def transaction_range(series, threshold_percentile=95): """ 计算交易金额范围(max-min),但排除极端异常值 业务依据:根据2023年全量数据回测,95%分位数外的交易占总量<0.3%,属设备故障或测试数据 使用场景:用于动态调整反欺诈规则阈值,避免因单笔异常大额交易误触发告警 """ if len(series) < 5: return np.nan # 数据量不足,不参与计算 # 剔除95%分位数外的离群值(非简单max/min) upper_bound = np.percentile(series, threshold_percentile) filtered_series = series[series <= upper_bound] if len(filtered_series) < 3: return np.nan return filtered_series.max() - filtered_series.min()看到没?函数名transaction_range比lambda清晰十倍;docstring里写了业务依据(2023年回测)、使用场景(反欺诈阈值)、甚至数据量兜底逻辑。这已经不是代码,而是可执行的业务文档。当新同事接手时,不用问人,看函数就知道该不该用、怎么用、边界在哪。
2.3 第三重跃迁:从“静态切片”到“时空双维度编织”
滚动窗口(rolling)和扩展窗口(expanding)常被当成“时间序列专属技巧”,这是巨大误解。它们本质是给静态聚合注入时间维度的语法糖。关键在于:窗口大小不是技术参数,而是业务节奏的翻译器。
原文用3天滚动平均算营收,看似合理。但在我们财富管理平台,这个“3”就完全不对。为什么?因为客户申购赎回是T+1确认,净值公布是每个交易日收盘后,而销售团队的晨会复盘周期是“上周五到本周四”。所以我们的滚动窗口必须是5个交易日,且必须对齐自然周(周一至周五),不能简单用freq='D'。否则,周五的滚动均值会包含下周一的数据,导致晨会看到的“最新趋势”其实是滞后的。
更隐蔽的坑在扩展窗口。原文用expanding().sum()算累计营收,没问题。但当我们算“客户生命周期价值(LTV)”时,就不能直接用expanding().sum()。因为LTV要求:1)只计算有效客户(开户满30天且有首笔交易);2)剔除销户客户的历史数据;3)按客户首次交易日期对齐起始点。这意味着扩展窗口的起点不是数据表的第一行,而是每个客户的first_transaction_date。这需要先用transform('min')算出每个客户的首笔时间,再用apply配合自定义函数实现——窗口的“起点”和“步长”,必须由业务规则驱动,而非数据物理顺序。
2.4 第四重跃迁:从“结果导向”到“下游友好型输出”
unstack()操作看起来只是把MultiIndex转成DataFrame,但背后是数据交付契约的建立。原文例子中,region作行、product作列,输出一个矩阵。这个矩阵能直接喂给BI工具吗?不一定。要看BI工具的元数据规范。
我在对接Tableau时就栽过跟头。Tableau要求维度字段名必须是纯英文、无空格、无特殊字符,而我们的region字段里有“华东-上海”这种带短横线的值。unstack()后生成的列名直接变成华东-上海,Tableau读取时报错。解决方案不是改数据源(业务不允许),而是在unstack()后立即做列名标准化:
# unstack后标准化列名 result = df_sales.groupby(['region','product'])['revenue'].mean().unstack() result.columns = [col.replace(' ', '_').replace('-', '_') for col in result.columns] # 华东-上海 -> 华东_上海 result.index.name = 'customer_segment' # 显式设置索引名,避免BI工具识别为Unnamed:0这还只是表层。更深层的是数据语义的保真。unstack()后,缺失值默认是NaN,但业务上,“某区域某产品无销售”和“数据未采集”是两回事。前者要填0(表示真实零销量),后者要留NaN(表示数据缺失)。所以unstack(fill_value=0)不是可选项,而是必选项,且必须在函数注释里写明:“fill_value=0 表示该组合在报告期内无交易,非数据丢失”。
这四重跃迁,构成了生产级聚合的护城河。它不追求炫技,而追求在业务、工程、协作三者间找到那个最稳的平衡点。接下来,我们就用这四重思维,逐个击破实操中的硬骨头。
3. 实操细节解析:那些文档里绝不会写的“脏活累活”
现在进入最硬核的部分。我把原文的五个技术点,全部还原到真实生产环境的上下文中,告诉你每一步背后的真实考量、踩过的坑、以及为什么必须这样写。这些细节,往往决定了你的聚合脚本是能用,还是能扛住百万级并发调用。
3.1 多列多函数聚合:别让列名嵌套毁掉整个ETL流程
原文的agg({'transaction_amount': ['mean','median'], 'processing_fee': ['min','max']})示例很经典,但生产环境里,这行代码后面藏着至少三道关卡。
第一关:列名嵌套的“平铺地狱”
输出结果是MultiIndex Columns,形如(transaction_amount, mean)。当你想把它存入数据库或传给下游API时,绝大多数系统不支持tuple列名。强行用reset_index()会把索引变普通列,但原始分组键(merchant_category)又成了数据的一部分,后续join极易出错。我的标准解法是:用agg()后立即pipe(flatten_columns),并封装成可复用函数:
def flatten_columns(df): """将MultiIndex Columns展平为扁平列名,用下划线连接""" if isinstance(df.columns, pd.MultiIndex): df.columns = ['_'.join(col).strip() for col in df.columns.values] return df # 应用 result = (df.groupby('merchant_category') .agg({'transaction_amount': ['mean','median'], 'processing_fee': ['min','max']}) .pipe(flatten_columns)) # 输出列名:transaction_amount_mean, transaction_amount_median, processing_fee_min, processing_fee_max第二关:数据类型一致性陷阱
你以为mean和median都是float64?错。当transaction_amount列里混有None或np.nan时,median()返回float64,但mean()在某些pandas版本下可能返回object类型(尤其当列含字符串混合时)。下游系统读取时直接报错。解决方案:强制类型转换,且转换逻辑写在agg字典里:
result = df.groupby('merchant_category').agg({ 'transaction_amount': [ ('amount_mean', lambda x: pd.to_numeric(x, errors='coerce').mean()), ('amount_median', lambda x: pd.to_numeric(x, errors='coerce').median()) ], 'processing_fee': [ ('fee_min', lambda x: pd.to_numeric(x, errors='coerce').min()), ('fee_max', lambda x: pd.to_numeric(x, errors='coerce').max()) ] })这里用元组('amount_mean', lambda...)替代字符串,既指定了新列名,又确保了类型安全。pd.to_numeric(..., errors='coerce')会把无法转数字的值设为NaN,避免类型混乱。
第三关:空组处理——业务上不存在的组,代码里必须显式声明
银行风控要求:即使某类商户当月无交易,报表中也必须显示该类别,金额为0。但groupby默认会丢弃空组。解决方案不是reindex()(太慢),而是用pd.Categorical提前定义全量类别:
# 定义全量商户类别(从业务字典获取) all_categories = ['Retail', 'Dining', 'Travel', 'Groceries', 'Electronics', 'Healthcare'] df['merchant_category'] = pd.Categorical(df['merchant_category'], categories=all_categories) result = (df.groupby('merchant_category', observed=False) # observed=False 关键!保留未出现的类别 .agg({'transaction_amount': 'sum'}) .fillna(0)) # 空组自动补0observed=False是核心开关,它告诉pandas:“别只看数据里有的,我要全量维度”。这行代码省去了后续补全逻辑,且性能极佳。
提示:在金融类ETL中,空组处理是SLA红线。某次大促期间,因未处理空组,导致“医疗健康”类商户的风控评分缺失,被监管问询。从此,我们所有聚合脚本开头必加
pd.Categorical声明。
3.2 自定义聚合函数:当业务逻辑复杂到需要“写论文”时
原文的weighted_average函数很优雅,但生产环境里,权重逻辑远不止“线性递增”。让我分享一个真实的反欺诈场景:计算客户“近期交易活跃度得分”,需综合考虑交易频次、金额、时间衰减、设备稳定性四个维度。
这个函数不能写成一个lambda,必须是一个完整的类,原因有三:1)需要预计算全局统计量(如全量客户平均交易间隔);2)需要缓存中间结果避免重复计算;3)需要支持单元测试。以下是精简版实现:
class TransactionActivityScorer: def __init__(self, base_window_days=30, decay_factor=0.95): """ 初始化活跃度评分器 base_window_days: 基础时间窗口(天),用于计算基准频次 decay_factor: 时间衰减因子,越接近1衰减越慢(适合长期客户) """ self.base_window_days = base_window_days self.decay_factor = decay_factor # 预加载全局统计(从缓存或配置中心读取,避免每次调用都查库) self.global_avg_interval = 3.2 # 全量客户平均交易间隔(天) def _calculate_time_weight(self, transaction_dates): """计算每笔交易的时间衰减权重""" if len(transaction_dates) < 2: return np.ones(len(transaction_dates)) # 以最近一笔交易为t=0,向前推算天数 latest_date = transaction_dates.max() days_since_latest = (latest_date - transaction_dates).dt.days # 指数衰减:weight = decay_factor ^ days weights = np.power(self.decay_factor, days_since_latest) return weights def _calculate_device_stability(self, device_ids): """计算设备稳定性得分(同设备交易占比)""" if len(device_ids) < 2: return 1.0 # 同设备交易次数 / 总交易次数 main_device = device_ids.mode().iloc[0] if not device_ids.mode().empty else device_ids.iloc[0] stability_score = (device_ids == main_device).sum() / len(device_ids) return min(stability_score, 0.95) # 封顶0.95,避免单一设备过度影响 def __call__(self, group_df): """ 对单个客户组计算活跃度得分 group_df: 包含date, amount, device_id等列的DataFrame """ if len(group_df) < 3: return 0.0 # 交易太少,无法评估 # 1. 时间衰减权重 time_weights = self._calculate_time_weight(group_df['date']) # 2. 金额归一化(避免大额交易主导) amount_normalized = group_df['amount'] / group_df['amount'].mean() # 3. 设备稳定性 device_score = self._calculate_device_stability(group_df['device_id']) # 4. 综合得分 = (频次得分 * 0.4) + (金额得分 * 0.3) + (设备得分 * 0.3) frequency_score = len(group_df) / self.base_window_days # 日均交易频次 amount_score = np.average(amount_normalized, weights=time_weights) final_score = (frequency_score * 0.4 + amount_score * 0.3 + device_score * 0.3) return round(final_score, 3) # 在聚合中使用 scorer = TransactionActivityScorer(base_window_days=30) result = df.groupby('customer_id').apply(scorer)看到没?这个函数本身就是一个微型业务模型。它把“活跃度”这个模糊概念,拆解为可量化、可验证、可解释的三个子维度。当风控策略调整时,我们只需修改__call__里的权重系数,无需动底层逻辑。这才是自定义聚合的终极形态——不是写代码,而是把业务规则编译成数据指令。
3.3 滚动窗口聚合:时间对齐才是真正的难点
原文的滚动平均示例,用rolling(window=3).mean()就完了。但生产环境里,window=3这个数字背后,是整整两天的跨部门对齐会议。让我用一个真实案例说明:为零售银行APP设计“近7天消费趋势”指标,用于向客户推送个性化优惠。
表面看,就是rolling(7).mean()。但问题来了:
- 数据延迟:交易流水T+1入库,即今天(10月25日)看到的是10月24日的数据。那么“近7天”应该算到10月24日,还是10月25日?
- 节假日效应:春节假期7天,客户消费模式剧变,用固定7天窗口会严重失真。
- 业务口径:市场部定义的“近7天”是自然周(周一至周日),而技术部按数据入库时间算,是滚动7天。
我的解决方案是:放弃rolling()的默认行为,用resample()+rolling()组合,强制对齐业务日历:
def get_business_weekly_trend(df, target_date=None): """ 计算客户近7天消费趋势,严格对齐自然周 target_date: 业务目标日期(如今日),用于确定计算截止点 """ if target_date is None: target_date = pd.Timestamp.today().normalize() # 取今日0点 # 1. 过滤数据:只取target_date前7天内的交易(含target_date当天) cutoff_date = target_date - pd.Timedelta(days=6) # 从target_date往前推6天,共7天 df_filtered = df[df['date'] >= cutoff_date].copy() # 2. 按自然周重采样(周一为每周第一天) # 先确保date列是datetime,再设为索引 df_filtered = df_filtered.set_index('date') weekly_resampled = df_filtered.resample('W-MON', label='left', closed='left')['amount'].sum() # 3. 对重采样后的周数据做滚动(此时窗口是周,非天) # 但我们需要的是“近7天”,所以这里滚动窗口=1(只取最新一周) trend_series = weekly_resampled.rolling(window=1).sum().dropna() # 4. 关键:填充缺失周(如某周无交易,需补0) # 构建完整周索引 full_weeks = pd.date_range(start=cutoff_date, end=target_date, freq='W-MON') trend_series = trend_series.reindex(full_weeks, fill_value=0) return trend_series.iloc[-1] # 返回最新一周的消费总额 # 在groupby中应用 df['weekly_trend'] = df.groupby('customer_id').apply( lambda x: get_business_weekly_trend(x, target_date=pd.Timestamp('2024-10-25')) )这段代码的核心思想是:滚动窗口的粒度,必须与业务决策的粒度一致。市场部看的是“周维度”的趋势,我们就必须先resample到周,再滚动。强行在日粒度上滚动7天,得到的是技术正确、业务错误的结果。这也是为什么我常说:“没有bad code,只有bad business alignment”。
3.4 扩展窗口聚合:累计值不是求和,而是状态机
原文的expanding().sum()算累计营收,干净利落。但当我做“客户资金沉淀率”计算时,发现expanding()根本不够用。因为沉淀率 = (客户账户余额 / 客户历史总入金)× 100%,而“历史总入金”这个分母,必须满足:
- 只计算该客户开户以来的入金;
- 剔除退款、手续费等负向流水;
- 按入金时间升序累加,不能按数据入库时间。
expanding()默认按DataFrame的物理顺序累加,但物理顺序 ≠ 业务时间顺序。解决方案是:先排序,再用cumsum(),且必须用transform保证分组内独立计算:
def calculate_cumulative_deposit(group_df): """ 计算客户累计入金(仅正向流水) group_df: 按customer_id分组的DataFrame,含date, amount, flow_type列 """ # 1. 过滤正向入金(flow_type == 'IN') deposits = group_df[group_df['flow_type'] == 'IN'].copy() # 2. 按交易时间排序(关键!) deposits = deposits.sort_values('date') # 3. 累计求和 deposits['cumulative_deposit'] = deposits['amount'].cumsum() # 4. 将结果映射回原group_df(保持原始顺序和行数) # 用date作为key merge,避免索引错位 result = group_df.merge( deposits[['date', 'cumulative_deposit']], on='date', how='left' ) # 5. 填充缺失值(如该客户无入金,则cumulative_deposit为0) result['cumulative_deposit'] = result['cumulative_deposit'].fillna(0) return result['cumulative_deposit'] # 应用 df['cumulative_deposit'] = df.groupby('customer_id').apply(calculate_cumulative_deposit).explode()注意explode()的使用——因为apply返回的是Series of Series,需要用explode()展开。这个细节,90%的教程都不会提,但线上环境不加就会报错。
注意:在高并发场景下,
apply+sort_values性能较差。我们最终用numba重写了核心累加逻辑,性能提升8倍。但原则不变:扩展窗口的本质,是维护一个随时间演进的状态,而不是一个数学函数。
3.5 多级分组与unstack:当行列互换成为数据契约
原文的unstack()示例,输出一个漂亮的矩阵。但生产环境里,这个矩阵要喂给三个不同系统:BI看板、下游API、监管报送文件。每个系统对行列、缺失值、数据类型的容忍度都不同。所以unstack()不是终点,而是数据交付流水线的起点。
BI看板要求:行列必须是字符串,缺失值填0,列名要带业务前缀。
下游API要求:必须是JSON格式,行索引转为字段,列名不能有特殊字符。
监管报送要求:必须是Excel,且特定单元格需加批注说明计算逻辑。
我的标准做法是:写一个deliver_to_target()函数,按目标系统类型定制输出:
def deliver_to_target(result_df, target_system='bi'): """ 将unstacked结果交付给不同目标系统 target_system: 'bi', 'api', 'regulatory' """ if target_system == 'bi': # BI看板:填0,标准化列名,重置索引 df_out = result_df.fillna(0) df_out.columns = [f"rev_{col.lower().replace(' ', '_')}" for col in df_out.columns] df_out = df_out.reset_index().rename(columns={'region': 'geo_region'}) return df_out elif target_system == 'api': # API:转JSON,索引转字段,列名转驼峰 df_out = result_df.reset_index() df_out.columns = [to_camel_case(col) for col in df_out.columns] return df_out.to_dict(orient='records') elif target_system == 'regulatory': # 监管报送:生成带批注的Excel from openpyxl import Workbook from openpyxl.comments import Comment wb = Workbook() ws = wb.active # 写入数据 for r_idx, row in enumerate(dataframe_to_rows(df_out, index=True, header=True), 1): for c_idx, value in enumerate(row, 1): cell = ws.cell(row=r_idx, column=c_idx, value=value) if r_idx == 1 and c_idx > 1: # 列标题行,除第一列外 cell.comment = Comment("计算逻辑:各区域各产品线平均收入,按自然月汇总", "System") return wb # 返回workbook对象,由调用方保存 # 辅助函数:字符串转驼峰 def to_camel_case(text): s = text.replace("-", " ").replace("_", " ") s = s.split() if len(s) == 0: return text return s[0].lower() + ''.join(i.capitalize() for i in s[1:])这个函数把unstack()从一个技术操作,升级为数据交付协议的执行器。它确保同一份聚合结果,能无缝适配所有下游需求,彻底解决“写一次,改三次”的运维噩梦。
4. 端到端实战:从信用卡交易数据到高管决策看板
现在,我们把前面所有知识点,拧成一股绳,走一遍真实的端到端流程。这不是教学演示,而是我去年在某股份制银行落地的“信用卡客户价值分层”项目。项目目标:每天凌晨2点,自动生成一份PDF报告,发送给零售银行部总经理,包含Top 100高价值客户名单、其消费行为特征、风险预警信号。整个流程从原始交易表开始,到最终PDF,全部由Python脚本驱动。
4.1 数据源与清洗:别让脏数据毁掉所有高级聚合
原始数据来自核心银行系统,每日增量同步一张card_transactions表,包含约1200万行记录。字段有:txn_id,customer_id,card_no,merchant_category,amount,fee,currency,date,time,device_id,ip_address。
清洗第一步:货币标准化
表中currency字段有CNY、USD、HKD三种。amount是原币种金额。高管要看的是人民币等值,所以必须统一换算。但汇率不是固定值——每笔交易发生时的实时汇率,存储在另一张exchange_rates表中。我的做法是:用merge_asof()做时间点对齐,而非简单join:
# exchange_rates表:date, currency, rate_cny(对人民币汇率) # 按date排序,确保merge_asof能正确匹配 exchange_rates = exchange_rates.sort_values('date') # 对交易表,按date排序后merge df_txn_sorted = df_transactions.sort_values('date') df_enriched = pd.merge_asof( df_txn_sorted, exchange_rates, on='date', by='currency', direction='backward', # 取交易时间前最近的汇率 allow_exact_matches=True ) # 计算人民币等值 df_enriched['amount_cny'] = df_enriched['amount'] * df_enriched['rate_cny']merge_asof()是关键。它比merge快10倍,且能处理“找最近时间点”的业务逻辑。如果用merge,得先对每笔交易找汇率,O(n²)复杂度,根本跑不完。
清洗第二步:设备指纹去重
同一客户用同一设备多次交易,可能是正常行为(如家庭共用手机),也可能是黑产(模拟器批量注册)。我们定义:24小时内,同一设备ID关联超过5个不同客户ID,视为可疑设备。这需要跨客户聚合,但groupby只能按单维度分组。解决方案:用transform+nunique:
# 计算每个device_id关联的客户数 df_enriched['device_customer_count'] = df_enriched.groupby('device_id')['customer_id'].transform('nunique') # 标记可疑设备 df_enriched['is_suspicious_device'] = df_enriched['device_customer_count'] > 5 # 过滤掉可疑设备的交易(风控策略) df_clean = df_enriched[~df_enriched['is_suspicious_device']]transform('nunique')是神来之笔。它不改变原DataFrame形状,直接在每行上添加一个新列,值是该行device_id对应的所有客户数。这比先groupby再merge快得多,且内存友好。
4.2 核心聚合:七层嵌套的业务逻辑
现在,我们基于df_clean,构建高管最关心的七个指标。这不是七个独立聚合,而是一个有机整体,每一层都依赖上一层的输出。
第一层:客户基础画像(单维度聚合)
按customer_id,计算:总交易笔数、总金额(CNY)、平均单笔、最大单笔、最小单笔、交易天数(去重date)、首笔交易日、末笔交易日。
base_agg = df_clean.groupby('customer_id').agg({ 'amount_cny': ['count', 'sum', 'mean', 'max', 'min'], 'date': ['nunique', 'min', 'max'] }).round(2) # 重命名列 base_agg.columns = ['txn_count', 'total_amount_cny', 'avg_txn_amount', 'max_txn_amount', 'min_txn_amount', 'txn_days', 'first_txn_date', 'last_txn_date']第二层:多维交叉分析(unstack前置)
按['customer_id','merchant_category']分组,算各品类平均交易额,再unstack()。但这里有个坑:merchant_category有50多个值,unstack()后会产生50多列,下游系统处理不了。解决方案:只保留Top 10高频品类,其余归为"Other":
# 先统计各品类交易频次 category_freq = df_clean['merchant_category'].value_counts() top_10_categories = category_freq.head(10).index.tolist() # 将非Top10品类标记为Other df_clean['mc_group'] = df_clean['merchant_category'].apply( lambda x: x if x in top_10_categories else 'Other' ) # 再聚合unstack category_agg = (df_clean.groupby(['customer_id','mc_group'])['amount_cny'] .mean() .unstack(fill_value=0) .round(2))第三层:时间动态指标(滚动+扩展组合)
计算每个客户的:近30天滚动平均交易额、近30天滚动交易频次、开户至今累计交易额、近7天交易额占累计额比例。
# 先按date排序,设索引 df_sorted = df_clean.sort_values(['customer_id','date']).set_index('date') # 滚动计算(30天窗口) rolling_30d = df_sorted.groupby('customer_id')['amount_cny'].rolling('30D').agg(['mean','count']) # 扩展计算(累计) cumulative = df_sorted.groupby('customer_id')['amount_cny'].expanding().sum() # 合并结果 temp_df = pd.concat([rolling_30d, cumulative], axis=1) temp_df.columns = ['rolling_30d_avg', 'rolling_30d_count', 'cumulative_amount'] # 计算占比 temp_df['recent_ratio'] = (temp_df['rolling_30d_avg'] * temp_df['rolling_30d_count']) / temp_df['cumulative_amount']第四层:风险信号聚合(自定义函数)
用前面定义的TransactionActivityScorer,计算每个客户的活跃度得分;再用自定义函数计算“交易时间离散度”(标准差/均值),识别睡眠客户。
# 活跃度得分 activity_score = df_clean.groupby('customer_id').apply(scorer) # 时间离散度 def time_dispersion(group_df): if len(group_df) < 5: return np.nan dates = pd.to_datetime(group_df['date']) intervals = dates.diff().dt.days.dropna() return intervals.std() / intervals.mean() if intervals.mean() > 0 else np.nan dispersion_score = df_clean.groupby('customer_id').apply(time_dispersion)第五层:高价值客户筛选(业务规则引擎)
综合以上所有指标,用规则引擎筛选Top 100:
- 规则1:
total_amount_cny> 50万 CNY(年化) - 规则2:
txn_count> 120笔(年化
