{ "cells": [ { "cell_type": "raw", "id": "2c5291f3", "metadata": { "raw_mimetype": "text/restructuredtext" }, "source": [ ".. _performance:" ] }, { "cell_type": "markdown", "id": "9dab133a", "metadata": {}, "source": [ "# Improving Computational Performance\n", "\n", "Feature engineering is a computationally expensive task. While Featuretools comes with reasonable default settings for feature calculation, there are a number of built-in approaches to improve computational performance based on dataset and problem specific considerations.\n", "\n", "## Reduce number of unique cutoff times\n", "Each row in a feature matrix created by Featuretools is calculated at a specific cutoff time that represents the last point in time that data from any dataframe in an entityset can be used to calculate the feature. As a result, calculations incur an overhead in finding the subset of allowed data for each distinct time in the calculation." ] }, { "cell_type": "raw", "id": "6ab1a83a", "metadata": { "raw_mimetype": "text/restructuredtext" }, "source": [ ".. note::\n", "\n", " Featuretools is very precise in how it deals with time. For more information, see :doc:`/getting_started/handling_time`." ] }, { "cell_type": "markdown", "id": "051fbaba", "metadata": {}, "source": [ "If there are many unique cutoff times, it is often worthwhile to figure out how to have fewer. This can be done manually by figuring out which unique times are necessary for the prediction problem or automatically using [approximate](../getting_started/handling_time.ipynb#Approximating-Features-by-Rounding-Cutoff-Times).\n", "\n", "## Parallel Feature Computation\n", "\n", "Computational performance can often be improved by parallelizing the feature calculation process. There are several different approaches that can be used to perform parallel feature computation with Featuretools. An overview of the most commonly used approaches is provided below." ] }, { "cell_type": "markdown", "id": "b47e770f", "metadata": {}, "source": [ "\n", "### Simple Parallel Feature Computation\n", "If using a pandas `EntitySet`, Featuretools can optionally compute features on multiple cores. The simplest way to control the amount of parallelism is to specify the `n_jobs` parameter:\n", "\n", "```python3\n", "fm = ft.calculate_feature_matrix(features=features,\n", " entityset=entityset,\n", " cutoff_time=cutoff_time,\n", " n_jobs=2,\n", " verbose=True)\n", "```\n", "The above command will start 2 processes to compute chunks of the feature matrix in parallel. Each process receives its own copy of the entityset, so memory use will be proportional to the number of parallel processes. Because the entityset has to be copied to each process, there is overhead to perform this operation before calculation can begin. To avoid this overhead on successive calls to `calculate_feature_matrix`, read the section below on using a persistent cluster.\n", "\n", "#### Adjust chunk size\n", "By default, Featuretools calculates rows with the same cutoff time simultaneously. The *chunk_size* parameter limits the maximum number of rows that will be grouped and then calculated together. If calculation is done using parallel processing, the default chunk size is set to be `1 / n_jobs` to ensure the computation can be spread across available workers. 
"## Parallel Feature Computation\n", "\n", "Computational performance can often be improved by parallelizing the feature calculation process. There are several approaches that can be used to perform parallel feature computation with Featuretools. An overview of the most commonly used approaches is provided below." ] }, { "cell_type": "markdown", "id": "b47e770f", "metadata": {}, "source": [ "\n", "### Simple Parallel Feature Computation\n", "If using a pandas `EntitySet`, Featuretools can optionally compute features on multiple cores. The simplest way to control the amount of parallelism is to specify the `n_jobs` parameter:\n", "\n", "```python3\n", "fm = ft.calculate_feature_matrix(features=features,\n", "                                 entityset=entityset,\n", "                                 cutoff_time=cutoff_time,\n", "                                 n_jobs=2,\n", "                                 verbose=True)\n", "```\n", "The above command will start 2 processes to compute chunks of the feature matrix in parallel. Each process receives its own copy of the entityset, so memory use will be proportional to the number of parallel processes. Because the entityset has to be copied to each process, there is overhead to perform this operation before calculation can begin. To avoid this overhead on successive calls to `calculate_feature_matrix`, read the section below on using a persistent cluster.\n", "\n", "#### Adjust chunk size\n", "By default, Featuretools calculates rows with the same cutoff time simultaneously. The *chunk_size* parameter limits the maximum number of rows that will be grouped and then calculated together. If calculation is done using parallel processing, the default chunk size is set to `1 / n_jobs` (a fraction of the total rows) to ensure the computation can be spread across the available workers. Normally, this behavior works well, but if there are only a few unique cutoff times it can lead to higher peak memory usage (due to more intermediate calculations stored in memory) or limited parallelism (if the number of chunks is less than *n_jobs*).\n", "\n", "By setting `chunk_size`, we can limit the maximum number of rows in each group to a specific number or a percentage of the overall data when calling `ft.dfs` or `ft.calculate_feature_matrix`:\n", "\n", "```python3\n", "# use a maximum of 100 rows per chunk\n", "feature_matrix, features_list = ft.dfs(entityset=es,\n", "                                       target_dataframe_name=\"customers\",\n", "                                       chunk_size=100)\n", "```\n", "\n", "We can also set the chunk size to be a percentage of the total rows:\n", "\n", "```python3\n", "# use a maximum of 5% of all rows per chunk\n", "feature_matrix, features_list = ft.dfs(entityset=es,\n", "                                       target_dataframe_name=\"customers\",\n", "                                       chunk_size=0.05)\n", "```\n", "\n", "#### Using persistent cluster\n", "Behind the scenes, Featuretools uses [Dask's](http://dask.pydata.org/) distributed scheduler to implement multiprocessing. When you only specify the `n_jobs` parameter, a cluster will be created for that specific feature matrix calculation and destroyed once calculations have finished. A drawback of this is that each time a feature matrix is calculated, the entityset has to be transmitted to the workers again. To avoid this, we would like to reuse the same cluster between calls. The way to do this is to create a cluster first and tell Featuretools to use it with the `dask_kwargs` parameter:\n", "\n", "```python3\n", "import featuretools as ft\n", "from dask.distributed import LocalCluster\n", "\n", "cluster = LocalCluster()\n", "fm_1 = ft.calculate_feature_matrix(features=features_1,\n", "                                   entityset=entityset,\n", "                                   cutoff_time=cutoff_time,\n", "                                   dask_kwargs={'cluster': cluster},\n", "                                   verbose=True)\n", "```\n", "\n", "The `'cluster'` value can be either the cluster object itself or a string of the address at which the cluster's scheduler can be reached, so the call below would also work. This second feature matrix calculation will not need to resend the entityset data to the workers because it has already been saved on the cluster.\n", "\n", "```python3\n", "fm_2 = ft.calculate_feature_matrix(features=features_2,\n", "                                   entityset=entityset,\n", "                                   cutoff_time=cutoff_time,\n", "                                   dask_kwargs={'cluster': cluster.scheduler.address},\n", "                                   verbose=True)\n", "```" ] }, { "cell_type": "raw", "id": "57aaa835", "metadata": { "raw_mimetype": "text/restructuredtext" }, "source": [ ".. note::\n", "\n", "    When using a persistent cluster, Featuretools publishes a copy of the ``EntitySet`` to the cluster the first time it calculates a feature matrix. Based on the ``EntitySet``'s metadata, the cluster will reuse it for successive computations. This means that if two ``EntitySets`` have the same metadata but different row values (e.g. new data is added to the ``EntitySet``), Featuretools won’t recopy the second ``EntitySet`` in later calls. A simple way to avoid this scenario is to use a unique ``EntitySet`` id." ] },
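{ "cell_type": "markdown", "id": "f3b8c2e1", "metadata": {}, "source": [ "For example, one minimal (hypothetical) way to follow that advice is to encode a data version in the id when constructing the `EntitySet`, so a refreshed copy of the data is never matched to the copy already published to the cluster:\n", "\n", "```python3\n", "import featuretools as ft\n", "\n", "# first version of the data\n", "es = ft.EntitySet(id=\"customers_v1\")\n", "\n", "# ... later, after new rows are added to the underlying data, rebuild the\n", "# entityset under a new id so a persistent cluster does not reuse the stale copy\n", "es = ft.EntitySet(id=\"customers_v2\")\n", "```" ] },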
{ "cell_type": "markdown", "id": "cdecad1d", "metadata": {}, "source": [ "#### Using the distributed dashboard\n", "Dask.distributed has a web-based diagnostics dashboard that can be used to analyze the state of the workers and tasks. It can also be useful for tracking memory use or visualizing task run-times. An in-depth description of the web interface can be found [here](https://distributed.readthedocs.io/en/latest/web.html).\n", "\n", "![Distributed dashboard image](../_static/images/dashboard.png)\n", "\n", "The dashboard requires an additional Python package, bokeh, to work. Once bokeh is installed, the web interface will be launched by default when a LocalCluster is created. The cluster created by Featuretools when using `n_jobs` does not enable the web interface automatically. To enable it, the port on which to launch the web interface must be specified in `dask_kwargs`:\n", "\n", "```python3\n", "fm = ft.calculate_feature_matrix(features=features,\n", "                                 entityset=entityset,\n", "                                 cutoff_time=cutoff_time,\n", "                                 n_jobs=2,\n", "                                 dask_kwargs={'diagnostics_port': 8787},\n", "                                 verbose=True)\n", "```\n", "\n", "### Parallel Computation by Partitioning Data\n", "As an alternative to Featuretools' parallelization, the data can be partitioned and the feature calculations run on multiple cores or a cluster using Dask or Apache Spark with PySpark. This approach may be necessary with a large pandas `EntitySet` because the current parallel implementation sends the entire `EntitySet` to each worker, which may exhaust the worker memory. Dask and Spark allow Featuretools to scale to multiple cores on a single machine or multiple machines on a cluster." ] }, { "cell_type": "markdown", "id": "795cc323", "metadata": {}, "source": [ "When an entire dataset is not required to calculate the features for a given set of instances, we can split the data into independent partitions and calculate on each partition. For example, imagine we are calculating features for customers and the features are \"number of other customers in this zip code\" or \"average age of other customers in this zip code\". In this case, we can load in data partitioned by zip code. As long as we have all of the data for a zip code when calculating, we can calculate all features for that subset of customers.\n", "\n", "An example of this approach can be seen in the [Predict Next Purchase demo notebook](https://github.com/featuretools/predict_next_purchase). In this example, we partition data by customer and only load a fixed number of customers into memory at any given time. We implement this easily using [Dask](https://dask.pydata.org/), which could also be used to scale the computation to a cluster of computers. A framework like [Spark](https://spark.apache.org/) could be used similarly.\n", "\n", "An additional example of partitioning data to distribute on multiple cores or a cluster using Dask can be seen in the [Featuretools on Dask notebook](https://github.com/Featuretools/Automated-Manual-Comparison/blob/main/Loan%20Repayment/notebooks/Featuretools%20on%20Dask.ipynb). This approach is detailed in the [Parallelizing Feature Engineering with Dask article](https://medium.com/feature-labs-engineering/scaling-featuretools-with-dask-ce46f9774c7d) on the Feature Labs engineering blog. Dask allows for simple scaling to multiple cores on a single computer or multiple machines on a cluster.\n", "\n", "For a similar partition-and-distribute implementation using Apache Spark with PySpark, refer to the [Feature Engineering on Spark notebook](https://github.com/Featuretools/predict-customer-churn/blob/main/churn/4.%20Feature%20Engineering%20on%20Spark.ipynb). This implementation shows how to carry out feature engineering on a cluster of EC2 instances using Spark as the distributed framework.\n",
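"\n", "To make the zip code pattern above concrete, the following is a minimal, hypothetical sketch of partitioned feature calculation with Dask. It assumes the customer data has already been written out as one parquet file per zip code; the file layout and the column names (`customer_id`, `join_date`, `zip_code`) are purely illustrative:\n", "\n", "```python3\n", "import dask\n", "import pandas as pd\n", "import featuretools as ft\n", "\n", "\n", "def features_for_zip_code(zip_code):\n", "    # each partition file holds every customer for one zip code, so the\n", "    # partition is self-contained for zip-code-level features\n", "    df = pd.read_parquet(f\"customers_by_zip/{zip_code}.parquet\")\n", "\n", "    es = ft.EntitySet(id=f\"customers_{zip_code}\")\n", "    es.add_dataframe(\n", "        dataframe_name=\"customers\",\n", "        dataframe=df,\n", "        index=\"customer_id\",\n", "        time_index=\"join_date\",\n", "    )\n", "    # split out a zip_codes dataframe so DFS can aggregate customers per zip code\n", "    es.normalize_dataframe(\n", "        base_dataframe_name=\"customers\",\n", "        new_dataframe_name=\"zip_codes\",\n", "        index=\"zip_code\",\n", "    )\n", "\n", "    fm, _ = ft.dfs(\n", "        entityset=es,\n", "        target_dataframe_name=\"customers\",\n", "        max_depth=2,\n", "    )\n", "    return fm\n", "\n", "\n", "# one delayed task per partition; the zip codes are illustrative\n", "zip_codes = [\"60091\", \"13244\", \"02101\"]\n", "tasks = [dask.delayed(features_for_zip_code)(z) for z in zip_codes]\n", "\n", "# run the partitions on multiple cores and combine the results\n", "feature_matrix = pd.concat(dask.compute(*tasks, scheduler=\"processes\"))\n", "```"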
] } ], "metadata": { "celltoolbar": "Raw Cell Format", "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.2" } }, "nbformat": 4, "nbformat_minor": 5 }