【实用工具】用Python将Excel表格数据一键导入MySQL数据库
我来教你写一个实用的Python工具,一键将Excel数据导入MySQL数据库。
完整代码实现
import pandas as pd import pymysql from sqlalchemy import create_engine import os import sys from datetime import datetime class ExcelToMySQL: def __init__(self, host='localhost', user='root', password='your_password', database='test_db'): """ 初始化数据库连接 :param host: 数据库地址 :param user: 用户名 :param password: 密码 :param database: 数据库名 """ self.db_config = { 'host': host, 'user': user, 'password': password, 'database': database, 'charset': 'utf8mb4' } # 使用SQLAlchemy创建引擎(更推荐的方式) self.engine = create_engine(f'mysql+pymysql://{user}:{password}@{host}/{database}?charset=utf8mb4') # 传统连接方式(用于执行DDL语句) self.connection = None self.cursor = None def connect(self): """建立数据库连接""" try: self.connection = pymysql.connect(**self.db_config) self.cursor = self.connection.cursor() print("✅ 数据库连接成功") return True except Exception as e: print(f"❌ 数据库连接失败: {e}") return False def close(self): """关闭连接""" if self.cursor: self.cursor.close() if self.connection: self.connection.close() print("🔒 数据库连接已关闭") def read_excel(self, file_path, sheet_name=0): """ 读取Excel文件 :param file_path: Excel文件路径 :param sheet_name: 工作表名称或索引,默认第一个sheet :return: DataFrame对象 """ try: # 判断文件类型 if file_path.endswith('.xlsx'): df = pd.read_excel(file_path, sheet_name=sheet_name, engine='openpyxl') elif file_path.endswith('.xls'): df = pd.read_excel(file_path, sheet_name=sheet_name, engine='xlrd') elif file_path.endswith('.csv'): df = pd.read_csv(file_path, encoding='utf-8') else: print("❌ 不支持的文件格式,请使用.xlsx、.xls或.csv文件") return None print(f"✅ 成功读取Excel文件: {file_path}") print(f"📊 共 {len(df)} 行数据,{len(df.columns)} 列") print(f"📋 列名: {list(df.columns)}") return df except Exception as e: print(f"❌ 读取Excel失败: {e}") return None def preview_data(self, df, rows=5): """ 预览数据前几行 :param df: DataFrame对象 :param rows: 预览行数 """ if df is None or df.empty: print("📭 数据为空") return print(f"\n📋 数据预览 (前{rows}行):") print("=" * 80) print(df.head(rows).to_string(index=False)) print("=" * 80) # 显示数据类型 print("\n📊 数据类型:") for col in df.columns: print(f" {col}: {df[col].dtype}") def auto_create_table(self, df, table_name, drop_if_exists=False): """ 根据DataFrame自动创建MySQL表 :param df: DataFrame对象 :param table_name: 表名 :param drop_if_exists: 如果表存在是否删除重建 """ if not self.connect(): return False try: # 映射pandas数据类型到MySQL类型 type_mapping = { 'object': 'VARCHAR(255)', 'int64': 'INT', 'float64': 'DECIMAL(15,2)', 'bool': 'TINYINT(1)', 'datetime64[ns]': 'DATETIME', 'timedelta[ns]': 'TIME' } # 构建CREATE TABLE语句 columns_def = [] for col, dtype in df.dtypes.items(): mysql_type = type_mapping.get(str(dtype), 'VARCHAR(255)') # 清理列名中的特殊字符 clean_col = col.replace(' ', '_').replace('-', '_').replace('.', '_') columns_def.append(f"`{clean_col}` {mysql_type}") create_sql = f"CREATE TABLE IF NOT EXISTS `{table_name}` (\n" create_sql += " `id` INT AUTO_INCREMENT PRIMARY KEY,\n" create_sql += " " + ",\n ".join(columns_def) + "\n" create_sql += ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;" # 如果表存在且需要删除 if drop_if_exists: self.cursor.execute(f"DROP TABLE IF EXISTS `{table_name}`") print(f"🗑️ 已删除旧表: {table_name}") self.cursor.execute(create_sql) self.connection.commit() print(f"✅ 表 '{table_name}' 创建成功") return True except Exception as e: print(f"❌ 创建表失败: {e}") self.connection.rollback() return False finally: self.close() def import_data(self, df, table_name, if_exists='append', chunksize=1000): """ 导入数据到MySQL :param df: DataFrame对象 :param table_name: 目标表名 :param if_exists: 表存在时的处理方式 ('fail', 'replace', 'append') :param chunksize: 批量插入的每批大小 """ if df is None or df.empty: print("📭 数据为空,无法导入") return False try: # 清理列名 df_clean = df.copy() df_clean.columns = [col.replace(' ', '_').replace('-', '_').replace('.', '_') for col in df_clean.columns] # 处理缺失值 df_clean = df_clean.fillna('') # 分批导入大数据量 total_rows = len(df_clean) imported_rows = 0 print(f"\n⏳ 开始导入数据到表 '{table_name}'...") start_time = datetime.now() for i in range(0, total_rows, chunksize): chunk = df_clean.iloc[i:i+chunksize] chunk.to_sql( name=table_name, con=self.engine, if_exists=if_exists if i == 0 else 'append', index=False, method='multi' ) imported_rows += len(chunk) progress = (imported_rows / total_rows) * 100 print(f" 进度: {progress:.1f}% ({imported_rows}/{total_rows})") end_time = datetime.now() duration = (end_time - start_time).total_seconds() print(f"\n✅ 导入完成!") print(f"📊 共导入 {imported_rows} 条数据") print(f"⏱️ 耗时: {duration:.2f} 秒") print(f"⚡ 速度: {imported_rows/duration:.0f} 条/秒") return True except Exception as e: print(f"❌ 导入失败: {e}") return False def validate_and_clean_data(self, df): """ 验证和清洗数据 :param df: DataFrame对象 :return: 清洗后的DataFrame """ print("\n🔍 数据验证与清洗...") issues = [] # 检查空值 null_counts = df.isnull().sum() if null_counts.any(): print(f"⚠️ 发现空值:") for col, count in null_counts[null_counts > 0].items(): print(f" - {col}: {count} 个空值") # 检查重复行 duplicates = df.duplicated().sum() if duplicates > 0: print(f"⚠️ 发现 {duplicates} 行重复数据") issues.append("重复数据") # 检查特殊字符 for col in df.select_dtypes(include=['object']).columns: special_chars = df[col].str.contains(r'[\'"\\]', na=False).sum() if special_chars > 0: print(f"⚠️ {col} 列包含 {special_chars} 个特殊字符") # 填充或处理空值 df_clean = df.fillna({ col: '' if df[col].dtype == 'object' else 0 for col in df.columns }) if not issues: print("✅ 数据验证通过") return df_clean def interactive_mode(): """交互式模式""" print("=" * 50) print("📥 Excel数据一键导入MySQL工具") print("=" * 50) # 获取数据库配置 print("\n🔧 数据库配置:") host = input("数据库地址 (默认 localhost): ").strip() or 'localhost' user = input("数据库用户名 (默认 root): ").strip() or 'root' password = input("数据库密码: ").strip() database = input("数据库名: ").strip() # 创建导入器 importer = ExcelToMySQL(host=host, user=user, password=password, database=database) # 获取Excel文件路径 while True: file_path = input("\n📂 Excel文件路径: ").strip() if os.path.exists(file_path): break print("❌ 文件不存在,请重新输入") # 读取Excel df = importer.read_excel(file_path) if df is None: return # 预览数据 importer.preview_data(df) # 数据清洗 clean = input("\n是否进行数据清洗? (y/n, 默认 y): ").strip().lower() if clean != 'n': df = importer.validate_and_clean_data(df) # 设置表名 table_name = input("目标表名 (默认使用文件名): ").strip() if not table_name: table_name = os.path.splitext(os.path.basename(file_path))[0] # 清理表名 table_name = ''.join(c if c.isalnum() or c == '_' else '_' for c in table_name) # 选择导入模式 print("\n📋 导入模式:") print("1. 追加到现有表 (append)") print("2. 替换现有表 (replace)") print("3. 如果表不存在则创建新表 (auto)") mode_map = {'1': 'append', '2': 'replace', '3': 'auto'} mode_choice = input("请选择 (1-3, 默认 1): ").strip() or '1' if_exists = mode_map.get(mode_choice, 'append') # 自动建表(如果需要) if if_exists == 'auto' or if_exists == 'replace': drop_first = if_exists == 'replace' importer.auto_create_table(df, table_name, drop_if_exists=drop_first) if_exists = 'append' # 确认导入 print(f"\n📊 准备导入 {len(df)} 条数据到表 '{table_name}'") confirm = input("确认导入? (y/n): ").strip().lower() if confirm == 'y': importer.import_data(df, table_name, if_exists=if_exists) else: print("已取消导入") def quick_import(excel_path, db_config, table_name=None, sheet_name=0): """ 快速导入函数 :param excel_path: Excel文件路径 :param db_config: 数据库配置字典 :param table_name: 表名,默认使用文件名 :param sheet_name: 工作表名称或索引 """ importer = ExcelToMySQL(**db_config) # 读取Excel df = importer.read_excel(excel_path, sheet_name) if df is None: return False # 自动设置表名 if not table_name: table_name = os.path.splitext(os.path.basename(excel_path))[0] # 自动建表并导入 importer.auto_create_table(df, table_name) result = importer.import_data(df, table_name) return result # 示例用法 if __name__ == "__main__": # 方法1: 交互式模式(推荐新手使用) interactive_mode() # 方法2: 快速导入(适合脚本调用) """ db_config = { 'host': 'localhost', 'user': 'root', 'password': 'your_password', 'database': 'test_db' } quick_import('学生成绩.xlsx', db_config) """使用说明
1️⃣ 安装依赖
pip install pandas pymysql sqlalchemy openpyxl xlrd2️⃣ 准备Excel文件
Excel文件格式示例(学生成绩.xlsx):
姓名 | 语文 | 数学 | 英语 |
|---|---|---|---|
张三 | 90 | 85 | 92 |
李四 | 88 | 91 | 87 |
3️⃣ 运行程序
交互式模式(推荐):
python excel_to_mysql.py然后按照提示输入数据库信息和文件路径即可。
快速导入模式(适合脚本):
db_config = { 'host': 'localhost', 'user': 'root', 'password': '123456', 'database': 'test_db' } quick_import('学生成绩.xlsx', db_config)功能特点
✅自动建表:根据Excel列名和数据类型自动创建MySQL表
✅批量导入:大数据量时分批导入,避免内存溢出
✅数据清洗:自动处理空值和特殊字符
✅进度显示:实时显示导入进度
✅支持多种格式:.xlsx、.xls、.csv
✅性能统计:显示导入速度和耗时
常见问题解决
编码问题:如果中文乱码,确保Excel保存为UTF-8编码
大文件处理:超过10万行的数据会自动分批次导入
类型转换:数字列会被自动识别为INT或DECIMAL类型
