景派HPC研究院丨oneAPI并行版Modin与Pandas对比测试

2021-05-10 17:23:40 景派科技-市场部 40

景派HPC研究院

由景派科技的技术团队组成,主要分享高性能计算领域的技术信息,与解答各位朋友在技术上遇到的问题和瓶颈,欢迎大家在文章下方的留言区讨论与提问。

1 Pandas VS Intel Distribution of Modin

1.1 Pandas 简介

Pandas 是什么?Pandas 是一个强大的分析结构化数据的工具集;它的使用基础是 Numpy(提供高性能的矩阵运算);用于数据挖掘和数据分析,同时也提供数据清洗功能[1]。

1.2 Pandas 分布式计算现状

原生 Python 在多线程处理方面比较弱,不能充分利用多核 CPU 的计算能力。同样,原生 Pandas 功能很强大,但不支持分布式计算,因此难以处理大量数据。目前的解决方案是Pandas on RayPandas on Dask,能够在尽量不改变 Pandas 的 API 情况下实现分布式内存计算[2]。相比于目前很火的分布式内存计算框架 Spark,前者是基于 Python 生态,而 Spark 是基于 Hadoop 生态[3-5]。

1.3 Intel Distribution of Modin 简介

这是 Intel 提供的一个分布式 DataFrame 库,可以使用与 Pandas 相同的 API 跨多个节点进行数据预处理。该库在后端与OmniSci集成,用于加速分析。而Modin原本是一个开源项目,基于 Python 下的分布式计算框架DaskRay对 Pandas 进行并行化加速。Intel Distribution of Modin 就是基于开源版的 Modin 优化而来,对 Ray 和 Dask 均有优化[6]。

2 安装方法

2.1 Pandas

官方版 Pandas 安装是非常方便的,可以直接使用conda来安装:

conda install pandas

或者使用 pip 来安装:

pip install pandas

2.2 Intel Distribution of Modin

参考Intel Software 官网的安装文档, Intel Distribution of Modin并不包含在Intel® AI Analytics Toolkit安装程序中,需要用APT Package ManagerConda Package ManagerDocker HubYUM Package Manager或者Zypper Package Manager来获取安装。本文使用Anaconda来安装,只需要使用如下命令即可:

conda create -n aikit-modin intel-aikit-modin -c intel -c conda-forge

使用上述命令回新建一个叫做aikit-modin的 Anaconda 虚拟环境,为了使用这个环境中的 Modin,我们只需要激活他即可:

conda activate aikit-modin

3 Pandas VS Intel Distribution of Modin[Dask]

根据 Modin 官方 GitHub 仓库主页,其对 Pandas API 的支持情况如下图所示:

图片关键词


3.1 导包构建环境

import os

os.environ["MODIN_ENGINE"] = "dask"# 设置Modin底层使用Dask框架进行并行计算
import pandas as pd# 导入官方版pandas
import modin.pandas as mpd# 导入Modin版pandas
import numpy as np
import time
import matplotlib.pyplot as plt
#导入中文字体,避免显示乱码
plt.rcParams['font.family'] = 'WenQuanYi Micro Hei'# 适用于CentOS下的
plt.rcParams['axes.unicode_minus'] = False

3.2 读入.csv文件速度对比

Modin 基本覆盖了 Pandas 所有经常用到的读入数据的 API,其中pd.read_csv是最最常用的,因此本文将以此 API 为例进行对比。如下代码将会分别使用 Pandas 和 Modin 读入 9GB 的移动基站流量数据,对比两者的耗时,并且以图的形式展示。

t = time.time()
pd_df = pd.read_csv('/home/caibo/jupyter_proj/data/mathor_cup/train1.csv',
encoding='gbk')
pd_time = time.time() - t
print('Pandas times : {} s'.format(pd_time))
t = time.time()
md_df = mpd.read_csv('/home/caibo/jupyter_proj/data/mathor_cup/train2.csv',
 encoding='gbk')
mpd_time = time.time() - t
print('Modin times: {} s'.format(mpd_time))
plt.bar(['Pandas', 'Modin'], [pd_time, mpd_time])
plt.ylabel('Times(s)')
plt.title('pd.read_csv数据读入速度对比')
plt.show()

Pandas times : 181.7861328125 s
UserWarning: Dask execution environment not yet initialized. Initializing...
To remove this warning, run the following python code before doing dataframe operations:

from distributed import Client

client = Client()

UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 44508 instead
Modin times: 46.26690101623535 s

图片关键词

结果可知,读取同样的文件,使用基于 Dask 框架的 Modin 的速度是 Pandas 的392.91%,加速效果非常明显!

3.3 DataFrame基本操作速度对比

在 Pandas 数据分析和处理中,对 DataFrame 的操作是非常多的,因此如果将 DataFrame 的常用操作并行化,将会极大提高数据分析和处理的效率。接下来我们会对比测试基于 Dask 框架的 Modin 和 Pandas 调用 DataFrame API 进行数据分析和处理的速度。Pandas.concat()DataFrame.apply()DataFrame.describe()这三个 API 的使用非常频繁而且易于并行化实现,因此本文将会对比 Modin 和 Pandas 进行这两个操作的耗时。

下面的带代码使用 Modin 和 Pandas 分别合并 DataFrame,并且对比两者的耗时。

t = time.time()
pd.concat([pd_df, pd_df], axis=0)
t_pd_info = time.time() - t
print('Pandas times : {} s'.format(t_pd_info))

t = time.time()
mpd.concat([md_df, md_df], axis=0)
t_mpd_info = time.time() - t
print('Modin times: {} s'.format(t_mpd_info))
plt.bar(['Pandas', 'Modin'], [t_pd_info, t_mpd_info])
plt.ylabel('Times(s)')
plt.show()

Pandas times : 23.865532159805298 s
Modin times: 3.5132291316986084 s

图片关键词

下面的带代码使用 Modin 和 Pandas 分别使用DataFrame.apply()对数据的日期时间字段进行修正,并且对比两者的耗时。

def time_correct(df):
# 时间修正
# 将0x:00:00 的时间修正为 x:00:00
# 将xx:00:0 的时间修正为 xx:00:00
# df['时间']=clock_correct(df)
return df['时间'].apply(
lambda x: '0' + str(int(x[0:x.index(":")])) + ':00:00'if int(x[
0:x.index(":")]) < 10else str(int(x[0:x.index(":")])) + ':00:00')


def date_correct(df):
# 日期修正
# 将0xx-0x-0x 的时间修正为 20xx/x/x
# df['日期']=clock_correct(df)
return df['日期'].apply(lambda x: str(
str(2000 + int(x[0:x.index("-")])) + '/' + str(
int(x[4:x.index('-', 4)])) + '/' + str(int(x[7:])))
if x.count('-') > 0else x)

t = time.time()
pd_df['日期'] = date_correct(pd_df)
pd_df['时间'] = time_correct(pd_df)
t_pd_info = time.time() - t
print('Pandas times : {} s'.format(t_pd_info))

t = time.time()
md_df['日期'] = date_correct(md_df)
md_df['时间'] = time_correct(md_df)
t_mpd_info = time.time() - t
print('Modin times: {} s'.format(t_mpd_info))
plt.bar(['Pandas', 'Modin'], [t_pd_info, t_mpd_info])
plt.ylabel('Times(s)')
plt.show()

Pandas times : 370.6549892425537 s
Modin times: 0.29850006103515625 s

图片关键词

此测试种 Modin 相比 Pandas 执行速度快了非常多,有效性有待验证。

下面的带代码分别使用 Modin 和 Panda 的 DataFrame.mean() API 获取 DataFrame 的统计信息,并且对比两者的耗时。

t = time.time()
pd_df['上行业务量GB'].mean()
t_pd_info = time.time() - t
print('Pandas times : {} s'.format(t_pd_info))

t = time.time()
md_df['上行业务量GB'].mean()
t_mpd_info = time.time() - t
print('Modin times: {} s'.format(t_mpd_info))
plt.bar(['Pandas', 'Modin'], [t_pd_info, t_mpd_info])
plt.ylabel('Times(s)')
plt.show()

Pandas times : 4.411945343017578 s
Modin times: 100.35079002380371 s

图片关键词

可以看到对于这个接口,Modin 并没有 Pandas 快,慢了非常多;此外还发现 Modin 运行过程中内存占用非常大,执行再复杂的操作如DataFrame.describe()时会直接占满内存导致异常中断,而原生 Pandas API 并不会。

4 测试小结

基于 Dask 框架的 Intel Distribution of Modin 可以实现 Pandas 大部分 API 的并行化,比如数据读入、一些 DataFrame 和 Series 的基本操作。对于那些易于并行化实现的 API,Modin 基本上都覆盖了,而那些本不可以并行化实现的 API 依旧调用自 Pandas。部分 API 使用 Modin 可以显著提高处理速度,特别是DataFrame.apply();但是有一些操作不仅不能提速,反而没有 Pandas 原生 API 速度快,造成负优化。此外由于 Modin 还不够完善,在使用过程中调用 M 某些 API 时会出现内存占用过高、进程无端中断的问题。以上测试均是在单机环境下进行的,集群环境下的使用效果有待检验。

参考资料

[1] Pandas官网:pandas.pydata.org/
[2] Modin官网:github.com/modin-projec
[3] Spark vs Dask Python生态下的计算引擎:cloud.tencent.com/devel
[4] python 链式计算框架_Python的分布式计算框架——Dask调度器简介:blog.csdn.net/weixin_39
[5] Ray 分布式计算框架介绍:zhuanlan.zhihu.com/p/11
[6] Intel® oneAPI AI Analytics Toolkit:software.intel.com/cont

景派科技丨超算丨HPC

在线留言

电话咨询
邮件咨询
在线地图
QQ客服