Препроцессинг данных

На GPU при помощи библиотеки RAPIDS

Используем образ registry.aicloud.sbcp.ru/base/rapids:

client_lib.Job(base_image='registry.aicloud.sbcp.ru/base/rapids',
                    script='preprocessing.py',
                    n_workers=1, n_gpus=1, warm_cache=False)

Пример 1 (Использование одиночного GPU для препроцессинга)

Использование GPU значительно ускоряет препроцессинг данных, ниже представлен график, показывающий ВО СКОЛЬКО РАЗ 1 GPU быстрее 1 CPU по основным категориям обработки данных.

../_images/s__gpu-vs-cpu.jpg

Запуск тестовой задачи:

client_lib.Job(base_image='registry.aicloud.sbcp.ru/base/rapids',
                    script='preprocessing.py',
                    n_workers=1, n_gpus=1, warm_cache=False)

Тестовый скрипт preprocessing.py:

import cudf
import pandas as pd
import time
from mpi4py import MPI
import cupy as cp
import numpy as np
import warnings

warnings.filterwarnings('ignore')

if __name__ == '__main__':
    comm = MPI.COMM_WORLD
    if comm.rank == 0:
        rand1 = np.random.randint(low=0, high=int(1e7), size=int(3e7))
        rand2 = np.random.random(size=int(3e7))
        rand3 = np.random.random(size=int(3e7))
        pdf = pd.DataFrame()
        pdf['a'] = rand1
        pdf['b'] = rand2
        pdf['c'] = rand3

        gpu_rand1 = cp.random.randint(low=0, high=int(1e7), size=int(3e7))
        gpu_rand2 = cp.random.random(size=int(3e7))
        gpu_rand3 = cp.random.random(size=int(3e7))

        gdf = cudf.DataFrame()
        gdf['a'] = gpu_rand1
        gdf['b'] = gpu_rand2
        gdf['c'] = gpu_rand3

        del gpu_rand1, gpu_rand2, gpu_rand3, rand1, rand2, rand3

        start_time = time.time()
        pdf.groupby('a')
        print(f'CPU groupby time = {round(time.time() - start_time, 3)} sec')

        start_time = time.time()
        gdf.groupby('a')
        print(f'GPU groupby time = {round(time.time() - start_time, 3)} sec')

        start_time = time.time()
        pdf = pdf.merge(pdf, on=['a'])
        print(f'CPU merge time = {round(time.time() - start_time, 3)} sec')

        start_time = time.time()
        gdf = gdf.merge(gdf, on=['a'])
        print(f'GPU merge time = {round(time.time() - start_time, 3)} sec')

        def my_udf(x):
            return (x ** 2) - x

        start_time = time.time()
        pdf['d'] = pdf['a'].apply(my_udf)
        print(f'CPU apply time = {round(time.time() - start_time, 3)} sec')

        start_time = time.time()
        gdf['d'] = gdf['a'].applymap(my_udf)
        print(f'GPU apply time = {round(time.time() - start_time, 3)} sec')

        pdf['d'] = pdf['a'].apply(my_udf)
        print(f'CPU apply time = {round(time.time() - start_time, 3)} sec')

        start_time = time.time()
        gdf['d'] = gdf['a'].applymap(my_udf)
        print(f'GPU apply time = {round(time.time() - start_time, 3)} sec')

Пример 2 (Распределенный препроцессинг в рамках одного DGX)

Для решения такого рода задач необходимо выставить количество процессов на одного воркера (processes_per_worker) и количество воркеров в 1. Создание дополнительных процессов и воркеров по количеству GPU будет обеспечивать утилита LocalCudaCluster из пакета dask_cuda.

client_lib.Job(base_image='registry.aicloud.sbcp.ru/base/rapids',
                    script='preprocessing.py',
                    n_workers=1, n_gpus=16, warm_cache=False,
                    processes_per_worker=1)

Пример простого скрипта, формирующего синтетическую выборку на GPU и выполняющего расчеты распределенно, можно увидеть ниже:

preprocessing.py:

import cudf
from dask.distributed import Client
from dask_cuda import LocalCUDACluster
import dask
import dask_cudf
import cupy as cp
import warnings

warnings.filterwarnings('ignore')

if __name__ == '__main__':
    cluster = LocalCUDACluster()
    client = Client(cluster)

    def make_cudf_dataframe(nrows=int(1e8)):
        cudf_df = cudf.DataFrame()
        cudf_df['a'] = cp.random.randint(low=0, high=1000, size=nrows)
        cudf_df['b'] = cp.random.randint(low=0, high=1000, size=nrows)
        cudf_df['c'] = cp.random.random(nrows)
        cudf_df['d'] = cp.random.random(nrows)
        return cudf_df

    delayed_cudf_dataframe = [dask.delayed(make_cudf_dataframe)() for i in range(20)]
    ddf = dask_cudf.from_delayed(delayed_cudf_dataframe)
    ddf.groupby(['a', 'b']).agg({'c': ['sum', 'mean']}).compute()
    client.close()
    cluster.close()

Пример 3 (Распределенный препроцессинг на кластере)

Для использования библиотек rapids в интерактивном режиме в рамках Jupyter Server необходимо выбрать образ jupyter-rapids:0.0.51 (или выше) из списка доступных Jupyter образов с GPU. С примерами использования можно ознакомиться как на сайте Rapids, так и на нашем Github.