当前位置:首页 > 科技  > 软件

数据工程中的单元测试完全指南

来源: 责编: 时间:2023-09-20 21:51:38 223观看
导读在数据工程领域中,经常被忽视的一项实践是单元测试。许多人可能认为单元测试仅仅是一种软件开发方法论,但事实远非如此。随着我们努力构建稳健、无错误的数据流水线和SQL数据模型,单元测试在数据工程中的价值变得越来越

在数据工程领域中,经常被忽视的一项实践是单元测试。许多人可能认为单元测试仅仅是一种软件开发方法论,但事实远非如此。随着我们努力构建稳健、无错误的数据流水线和SQL数据模型,单元测试在数据工程中的价值变得越来越清晰。aek28资讯网——每日最新资讯28at.com

本文带你深入探索如何将这些成熟的软件工程实践应用到数据工程中。aek28资讯网——每日最新资讯28at.com

1 单元测试的重要性

在数据工程的背景下,采用单元测试可以确保您的数据和业务逻辑的准确性,进而产出高质量的数据,获得您的数据分析师、科学家和决策者对数据的信任。aek28资讯网——每日最新资讯28at.com

2 单元测试数据流水线

数据流水线通常涉及复杂的数据抽取、转换和加载(ETL)操作序列,出错的可能性很大。为了对这些操作进行单元测试,我们将流水线拆分为单个组件,并对每个组件进行独立验证。aek28资讯网——每日最新资讯28at.com

以一个简单的流水线为例,该流水线从CSV文件中提取数据,通过清除空值来转换数据,然后将其加载到数据库中。以下是使用pandas的基于Python的示例:aek28资讯网——每日最新资讯28at.com

import pandas as pdfrom sqlalchemy import create_engine# 加载CSV文件的函数def load_data(file_name):    data = pd.read_csv(file_name)    return data# 清理数据的函数def clean_data(data):    data = data.dropna()    return data# 将数据保存到SQL数据库的函数def save_data(data, db_string, table_name):    engine = create_engine(db_string)    data.to_sql(table_name, engine, if_exists='replace')# 运行数据流水线data = load_data('data.csv')data = clean_data(data)save_data(data, 'sqlite:///database.db', 'my_table')

为了对这个流水线进行单元测试,我们使用像pytest这样的库为每个函数编写单独的测试。aek28资讯网——每日最新资讯28at.com

在这个示例中,有三个主要的函数:load_data、clean_data和save_data。我们会为每个函数编写测试。对于load_data和save_data,需要设置一个临时的CSV文件和SQLite数据库,可以使用pytest库的fixture功能来实现。aek28资讯网——每日最新资讯28at.com

import osimport pandas as pdimport pytestfrom sqlalchemy import create_engine, inspect# 使用pytest fixture来设置临时的CSV文件和SQLite数据库@pytest.fixturedef csv_file(tmp_path):    data = pd.DataFrame({        'name': ['John', 'Jane', 'Doe'],        'age': [34, None, 56]  # Jane的年龄缺失    })    file_path = tmp_path / "data.csv"    data.to_csv(file_path, index=False)    return file_path@pytest.fixturedef sqlite_db(tmp_path):    file_path = tmp_path / "database.db"    return 'sqlite:///' + str(file_path)def test_load_data(csv_file):    data = load_data(csv_file)        assert 'name' in data.columns    assert 'age' in data.columns    assert len(data) == 3def test_clean_data(csv_file):    data = load_data(csv_file)    data = clean_data(data)        assert data['age'].isna().sum() == 0    assert len(data) == 2  # Jane的记录应该被删除def test_save_data(csv_file, sqlite_db):    data = load_data(csv_file)    data = clean_data(data)    save_data(data, sqlite_db, 'my_table')        # 检查数据是否保存正确    engine = create_engine(sqlite_db)    inspector = inspect(engine)    tables = inspector.get_table_names()        assert 'my_table' in tables        loaded_data = pd.read_sql('my_table', engine)    assert len(loaded_data) == 2  # 只应该存在John和Doe的记录

这里是另一个例子:假设您有一个从CSV文件中加载数据并将其中的“日期”列从字符串转换为日期时间的流水线:aek28资讯网——每日最新资讯28at.com

def convert_date(data, date_column):    data[date_column] = pd.to_datetime(data[date_column])    return data

为上述函数编写的单元测试将传入具有已知日期字符串格式的DataFrame。然后,它将验证函数是否正确将日期转换为日期时间对象,并且它是否适当处理无效格式。aek28资讯网——每日最新资讯28at.com

我们为上述场景编写一个单元测试。该测试首先使用有效日期检查函数,断言输出DataFrame中的“date”列确实是datetime类型,并且值与预期相符。然后,它检查在给出无效日期时,函数是否正确引发了ValueError。aek28资讯网——每日最新资讯28at.com

import pandas as pdimport pytestdef test_convert_date():    # 使用有效日期进行测试    test_data = pd.DataFrame({        'date': ['2021-01-01', '2021-01-02']    })        converted_data = convert_date(test_data.copy(), 'date')        assert pd.api.types.is_datetime64_any_dtype(converted_data['date'])    assert converted_data.loc[0, 'date'] == pd.Timestamp('2021-01-01')    assert converted_data.loc[1, 'date'] == pd.Timestamp('2021-01-02')    # 使用无效日期进行测试    test_data = pd.DataFrame({        'date': ['2021-13-01']  # 这个日期是无效的,因为没有第13个月    })        with pytest.raises(ValueError):        convert_date(test_data, 'date')

以下是最后一个例子:假设您有一个加载数据并进行聚合的流水线,计算每个地区的总销售额:aek28资讯网——每日最新资讯28at.com

def aggregate_sales(data):    aggregated = data.groupby('region').sales.sum().reset_index()    return aggregated

为该函数编写的单元测试将向其传递具有各个地区销售数据的DataFrame。测试将验证函数是否正确计算每个地区的总销售额。aek28资讯网——每日最新资讯28at.com

我们为该函数编写一个单元测试。在这个测试中,我们首先向aggregate_sales函数传递一个具有已知销售数据的DataFrame,并检查它是否正确聚合了销售额。然后,向其传递一个没有销售数据的DataFrame,并检查它是否正确将这些销售额聚合为0。这样可以确保函数正确处理典型情况和边缘情况。aek28资讯网——每日最新资讯28at.com

以下是使用pytest库为aggregate_sales函数编写单元测试的示例:aek28资讯网——每日最新资讯28at.com

import pandas as pdimport pytestdef test_aggregate_sales():    # 各个地区的销售数据    test_data = pd.DataFrame({        'region': ['North', 'North', 'South', 'South', 'East', 'East', 'West', 'West'],        'sales': [100, 200, 300, 400, 500, 600, 700, 800]    })        aggregated = aggregate_sales(test_data)        assert aggregated.loc[aggregated['region'] == 'North', 'sales'].values[0] == 300    assert aggregated.loc[aggregated['region'] == 'South', 'sales'].values[0] == 700    assert aggregated.loc[aggregated['region'] == 'East', 'sales'].values[0] == 1100    assert aggregated.loc[aggregated['region'] == 'West', 'sales'].values[0] == 1500    # 没有销售数据的测试    test_data = pd.DataFrame({        'region': ['North', 'South', 'East', 'West'],        'sales': [0, 0, 0, 0]    })        aggregated = aggregate_sales(test_data)        assert aggregated.loc[aggregated['region'] == 'North', 'sales'].values[0] == 0    assert aggregated.loc[aggregated['region'] == 'South', 'sales'].values[0] == 0    assert aggregated.loc[aggregated['region'] == 'East', 'sales'].values[0] == 0    assert aggregated.loc[aggregated['region'] == 'West', 'sales'].values[0] == 0

本文转载自微信公众号「Java学研大本营」,可以通过以下二维码关注。转载本文请联系公众号。aek28资讯网——每日最新资讯28at.com

aek28资讯网——每日最新资讯28at.com

本文链接:http://www.28at.com/showinfo-26-10690-0.html数据工程中的单元测试完全指南

声明:本网页内容旨在传播知识,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。邮件:2376512515@qq.com

上一篇: ZGC关键技术分析

下一篇: 团队协作开发中,五个强大的VS Code插件

标签:
  • 热门焦点
Top