友情链接:
ETL调优的一些分享(上)
ETL调优的一些分享(下)
背景
随着企业的数据量、数据源和数据类型的增加,在分析、数据科学和机器学习计划中利用这些数据以获得业务洞察力的重要性也在增加。优先考虑这些计划的需求给数据工程团队带来了越来越大的压力,因为将原始、杂乱的数据处理成干净、新鲜、可靠的数据是实施这 ETL 是抽取、转换和加载的缩写,是数据工程师用来从不同来源提取数据、将数据转换为可用和可信资源,并将数据加载到 终用户可以访问和使用的系统中以解决业务问题的流程。
抽取:从目标源中抽取数据,目标源通常是异构的,如业务系统、应用程序接口、传感器数据、营销工具和事务数据库等。其中一些数据类型可能是广泛使用的系统的结构化输出,而另一些则是半结构化的 JSON 服务器日志。
转换:将从数据源中抽取的原始数据转换为不同应用程序可以使用的格式。在这一阶段,数据将得到清理、映射和转换,通常是按照特定模式进行转换,以满足操作需求。这一过程需要进行几种类型的转换,以确保数据的质量和完整性 数据通常不会直接加载到目标数据源中,而是通常上载到暂存数据库中。这一步骤可确保在出现与计划不符的情况时快速回滚。在此阶段,您可以生成审计报告,以符合法规要求,或诊断和修复任何数据问题。
加载:加载功能是将转换后的数据从暂存区域写入目标数据库的过程,而目标数据库以前可能存在,也可能不存在。根据应用程序的要求,这一过程可能非常简单,也可能非常复杂。每个步骤都可以通过 ETL 工具或自定义代码完成。
一、数据抽取(Extract)
选择抽取策略
1、全量抽取
特点:一次性抽取所有数据,适合数据量较小或首次抽取的场景。
实现方式:直接查询整个表或读取整个文件。
2、增量抽取
特点:仅抽取发生变化的数据,适合数据量较大且需要频繁更新的场景。
常用技术:
时间戳:通过记录最后更新时间来抽取新增或修改的数据;
CDC(Change Data Capture):通过数据库日志或触发器捕获数据变化
数据抽取
数据库抽取:mysql、oracle等
文件抽取:使用文件读取库(如csv)。
API 抽取:使用 HTTP 客户端库(如 requests)发送请求,获取 API 返回的数据。
消息队列抽取:使用消息队列客户端(如 Kafka Consumer)订阅消息并获取数据。
数据抽取实现示例
从 MySQL 数据库抽取数据
连接数据库,执行 SQL 查询,将查询结果保存到 DataFrame 或文件中。
import pandas as pd
from sqlalchemy import create_engine
# 数据库连接配置
db_config ={
'host':'localhost',
'user':'root',
'password':'password',
'database':'test_db'
}
# 创建数据库连接
engine = create_engine(f"mysql+pymysql://{db_config['user']}:{db_config['password']}@{db_config['host']}/{db_config['database']}")
# 执行 SQL 查询
query ="SELECT * FROM users"# 假设 users 表包含 id, name, age 字段
df_users = pd.read_sql(query, engine)
# 输出结果
print("从 MySQL 抽取的用户数据:")
print(df_users)
从 CSV 文件抽取数据
读取 CSV 文件,将数据加载到 DataFrame 中。
import pandas as pd
# 读取 CSV 文件
df_orders = pd.read_csv('orders.csv')# 假设 orders.csv 包含 order_id, user_id, amount 字段
# 输出结果
print("从 CSV 文件抽取的订单数据:")
print(df_orders)
从 API 抽取数据
发送 HTTP 请求到 API,解析返回的 JSON 数据,将数据保存到 DataFrame 中。
import requests
import pandas as pd
# API 配置
api_url ="https://api.weatherapi.com/v1/current.json"
api_key ="your_api_key"# 替换为你的 API Key
params={
'key': api_key,
'q':'Beijing'# 查询北京的天气
}
# 发送 HTTP 请求
response = requests.get(api_url,params=params)
data = response.json()# 解析 JSON 数据
# 将数据保存到 DataFrame
weather_data ={
'location': data['location']['name'],
'temperature': data['current']['temp_c'],
'condition': data['current']['condition']['text']
}
df_weather = pd.DataFrame([weather_data])
# 输出结果
print("从 API 抽取的天气数据:")
print(df_weather)
二、数据清洗
数据清洗是确保数据质量的关键步骤,主要包括处理缺失值、删除重复记录、修正错误数据、标准化数据格式和处理异常值等操作。
数据清洗的主要任务
1、处理缺失值。填充默认值(如用 0 填充缺失的数值,用 Unknown 填充缺失的文本);删除包含缺失值的记录(如果缺失值比例较高或对分析影响较大)。
2、删除重复数据。识别并删除完全重复的记录;根据业务规则删除部分重复的记录(如保留最新的一条记录)。
3、修正错误数据。修正格式错误(如日期格式不一致、电话号码格式错误);修正逻辑错误(如年龄为负数、订单金额为 0)。
4、标准化数据格式。统一字段格式(如日期统一为 YYYY-MM-DD,金额统一为两位小数);统一编码(如性别字段统一为 Male 和 Female)。
5、处理异常值。识别并处理异常值(如年龄超过 150 岁,订单金额为负数);根据业务规则修正或剔除异常值。
6、数据拆分与合并。拆分字段(如将地址字段拆分为省、市、区);合并字段(如将姓和名字段合并为全名)。
数据清洗的具体流程
1、数据质量评估。对原始数据进行初步分析,识别数据质量问题(如缺失值、重复值、异常值);使用统计方法(如描述性统计)或可视化工具(如直方图、箱线图)评估数据质量。
2、制定清洗规则。根据业务需求和数据质量问题,制定清洗规则(如缺失值填充规则、异常值处理规则)。
3、执行清洗操作。根据清洗规则,对数据进行清洗(如填充缺失值、删除重复记录、修正错误数据)。
4、验证清洗结果。检查清洗后的数据是否符合预期(如缺失值是否已填充,重复记录是否已删除);记录清洗过程中的错误和警告。
5、输出清洗后的数据。将清洗后的数据保存到目标系统(如数据库、文件)。
数据清理实现示例
假设我们有一个包含用户信息的 CSV 文件 users.csv,需要进行以下清洗操作:
处理缺失值:将缺失的年龄字段填充为默认值 0。
删除重复记录:根据 id 字段删除完全重复的记录。
修正错误数据:将性别字段统一为 Male 和 Female。
标准化数据格式:将日期字段统一为 YYYY-MM-DD 格式。
处理异常值:删除年龄超过 100 岁的记录。
原始数据 (users.csv)
id
name
age
gender
join_date
1
Alice
25
F
2025-01-01
2
Bob
M
2025-02-15
3
Charlie
30
Male
2025-03-10
4
David
28
M
2025-04-20
5
Eve
120
F
2025-05-25
1
Alice
25
F
2025-01-01
清洗后的数据
id
name
age
gender
join_date
1
Alice
25
Female
2025-01-01
2
Bob
0
Male
2025-02-15
3
Charlie
30
Male
2025-03-10
4
David
28
Male
2025-04-20
使用 Python 和 pandas 实现上述清洗任务的代码:
import pandas as pd
# 读取数据
df = pd.read_csv('users.csv')
# 1. 处理缺失值:将缺失的年龄字段填充为默认值 0
df['age'] = df['age'].fillna(0)
# 2. 删除重复记录:根据 id 字段删除完全重复的记录
df = df.drop_duplicates(subset=['id'])
# 3. 修正错误数据:将性别字段统一为 Male 和 Female
df['gender'] = df['gender'].replace({'M': 'Male', 'F': 'Female'})
# 4. 标准化数据格式:将日期字段统一为 YYYY-MM-DD 格式
df['join_date'] = pd.to_datetime(df['join_date']).dt.strftime('%Y-%m-%d')
# 5. 处理异常值:删除年龄超过 100 岁的记录
df = df[df['age'] <= 100]
# 输出清洗后的数据
print("清洗后的数据:")
print(df)
# 保存清洗后的数据到新文件
df.to_csv('cleaned_users.csv', index=False)
三、数据转换(Transform)
数据转换的工作流程
1、数据转换操作。数据映射:将源字段与目标字段匹配;数据拆分:将一个字段拆分为多个字段(如身份证号拆分为地区码、出生日期);数据聚合:对数据进行分组和计算(如按地区汇总销售额)。
2、数据验证。检查转换后的数据是否符合预期(如字段类型、数据范围);记录转换过程中的错误和警告。
3、数据输出。将转换后的数据保存到目标系统(如数据库、文件)。
数据转换实现示例
假设我们有一个用户信息的原始数据集raw_users.csv,该数据集包含用户的ID、姓名、性别、出生日期和地址等字段。我们的目标是将这些数据转换为适合分析的形式,并最终加载到数据库中。具体转换需求如下:
数据映射:将源数据中的“gender”字段值从"M", "F"映射为"Male", "Female"。
数据拆分:将“address”字段拆分为“province”(省份)、“city”(城市)和“district”(区县)三个字段。
数据聚合:按省份统计用户数量,并计算每个省份用户的平均年龄。
原始数据 (raw_users.csv)
id
name
gender
birth_date
address
1
Alice
F
1990-01-01
北京市朝阳区
2
Bob
M
1985-05-20
上海市浦东新区
3
Carol
F
1992-07-14
广东省广州市天河区
转换后的数据
id
name
gender
birth_date
province
city
district
1
Alice
Female
1990-01-01
北京
朝阳
2
Bob
Male
1985-05-20
上海
浦东
3
Carol
Female
1992-07-14
广东
广州
天河
使用 Python 实现数据转换
import pandas as pd
# 读取原始数据
df = pd.read_csv('raw_users.csv')
# 1. 数据映射:性别字段标准化
gender_mapping = {'F': 'Female', 'M': 'Male'}
df['gender'] = df['gender'].map(gender_mapping)
# 2. 数据拆分:从地址字段提取省份、城市、区县
def split_address(address):
parts = address.split('市')
province = parts[0].replace('省', '')
city = parts[1].split('区')[0]
district = parts[1].split('区')[1] if len(parts[1].split('区')) > 1 else ''
return province, city, district
df[['province', 'city', 'district']] = df['address'].apply(lambda x: pd.Series(split_address(x)))
# 删除原始地址列
df.drop(columns=['address'], inplace=True)
# 3. 数据聚合:按省份统计用户数量并计算平均年龄
df['birth_date'] = pd.to_datetime(df['birth_date'])
current_year = pd.Timestamp.now().year
df['age'] = current_year - df['birth_date'].dt.year
summary = df.groupby('province').agg(
user_count=('id', 'count'),
avg_age=('age', 'mean') ).reset_index()
print("转换后的汇总数据:")
print(summary)
# 输出转换后的数据到新文件
df.to_csv('transformed_users.csv', index=False)
summary.to_csv('province_summary.csv', index=False)
四、数据加载(Load)
数据加载的主要任务
1、选择加载策略。根据业务需求和数据量,选择合适的加载策略(如全量加载、增量加载)。
2、数据写入目标系统。将数据写入目标系统的表或文件中。
3、数据验证与日志记录。检查加载后的数据是否符合预期(如行数、字段数、数据类型等);记录加载过程中的关键信息(如加载时间、数据量、错误信息等)。
4、异常处理。处理加载过程中出现的错误(如数据格式不匹配、目标系统不可用)。
数据加载的策略
1、全量加载(Full Load)
特点:每次加载时,将所有数据写入目标系统,覆盖原有数据。
适用场景:数据量较小;目标系统需要完全刷新数据(如首次加载或数据重构)。
优点:简单易实现,保证数据一致性。
缺点:资源消耗大,不适合频繁加载。
2、增量加载(Incremental Load)
特点:仅加载新增或修改的数据,不覆盖原有数据。
适用场景:数据量较大;目标系统需要频繁更新数据。
优点:资源消耗小,适合频繁加载。
缺点:需要识别新增或修改的数据(如通过时间戳、日志或 CDC 技术)。
3、批量加载(Bulk Load)
特点:将数据分批加载到目标系统,减少单次加载的资源消耗。
适用场景:数据量较大,无法一次性加载;目标系统对单次加载的数据量有限制。
优点:减少资源消耗,适合大规模数据加载。
缺点:需要管理分批加载的逻辑。
4、实时加载(Real-time Load)
特点:将数据实时加载到目标系统,支持低延迟查询。
适用场景:需要实时分析和查询的场景(如实时监控、实时报表)。
优点:支持低延迟查询。
缺点:实现复杂,对目标系统性能要求高。
数据加载的具体流程
1.选择加载策略。根据业务需求和数据量,选择合适的加载策略(如全量加载、增量加载)。
2.数据写入目标系统。数据库:使用 SQL 语句(如 INSERT INTO、UPDATE)或数据库工具(如AgroDB内置工具Impexp);文件:将数据保存为文件(如 CSV、JSON、Parquet);数据湖/数据仓库:使用专用工具(如星环湖仓一体平台)。
3.数据验证与日志记录。检查加载后的数据是否符合预期(如行数、字段数、数据类型等);记录加载过程中的关键信息(如加载时间、数据量、错误信息等)。
4.异常处理。处理加载过程中出现的错误(如数据格式不匹配、目标系统不可用);根据错误类型,选择重试、跳过或报警。
数据加载实现示例
假设我们有一个清洗后的用户数据表 cleaned_users.csv,需要将其加载到 MySQL 数据库中。以下是具体实现:
清洗后的数据 (cleaned_users.csv)
id
name
age
gender
join_date
1
Alice
25
Female
2025-01-01
2
Bob
0
Male
2025-02-15
3
Charlie
30
Male
2025-03-10
4
David
28
Male
2025-04-20
目标表结构 (users)
字段名
类型
说明
id
INT
用户 ID
name
VARCHAR(50)
用户姓名
age
INT
用户年龄
gender
VARCHAR(10)
用户性别
join_date
DATE
加入日期
使用 Python 实现数据加载
以下是使用 Python 和 pandas + SQLAlchemy 实现数据加载的代码:
import pandas as pd
from sqlalchemy import create_engine
# 读取清洗后的数据
df = pd.read_csv('cleaned_users.csv')
# 数据库连接配置
db_config = {
'host': 'localhost',
'user': 'root',
'password': 'password',
'database': 'test_db'
}
# 创建数据库连接
engine = create_engine(f"mysql+pymysql://{db_config['user']}:{db_config['password']}@{db_config['host']}/{db_config['database']}")
# 将数据加载到 MySQL 数据库
try:
df.to_sql('users', con=engine, if_exists='append', index=False) # if_exists='append' 表示增量加载
print("数据加载成功!")
except Exception as e:
print(f"数据加载失败:{e}")
运行结果
数据加载后,MySQL 数据库中的 users 表内容如下:
id
name
age
gender
join_date
1
Alice
25
Female
2025-01-01
2
Bob
0
Male
2025-02-15
3
Charlie
30
Male
2025-03-10
4
David
28
Male
2025-04-20