Data preprocessing is one of the main steps in any machine learning process. TensorFlow Transform helps us carry it out in a distributed environment, over huge datasets.
Before we dive into data transformation, remember that data validation is the first step in the production process, which I have covered in my article "Data validation in a production process: the TFX method". Take a look at that article to understand this one better.
I used Colab for this demo, as it's much easier (and quicker) to set up the environment. If you're in the exploration phase, I would also recommend Colab, as it will help you focus on the most important things.
ML pipeline operations start with data ingestion and validation, followed by transformation. A model is then trained on the transformed data and deployed. I discussed the validation part in my previous article; now we will cover the transformation step. To better understand pipelines in TensorFlow, check out the following article.
As stated above, we will be using Colab. So, we just need to install the tfx library and that's it.
! pip install tfx
After installation, restart the Colab runtime when prompted, then continue.
Then come the imports.
# Importing libraries
import tensorflow as tf
from tfx.components import CsvExampleGen
from tfx.components import ExampleValidator
from tfx.components import SchemaGen
from tfx.v1.components import ImportSchemaGen
from tfx.components import StatisticsGen
from tfx.components import Transform
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
from google.protobuf.json_format import MessageToDict
import os
import pprint

# Pretty printer used later to display records
pp = pprint.PrettyPrinter()
We will be using the Spaceship Titanic dataset from Kaggle, as in the data validation article. This dataset is free to use for commercial and non-commercial purposes. You can access it from here. A description of the dataset is shown in the following figure.
To start with the data transformation part, it is recommended to create folders where the pipeline components will be placed (otherwise they will be placed in the default directory). I have created two folders, one for the pipeline components and the other for our training data.
# Path to pipeline folder
# All the generated components will be stored here
_pipeline_root = '/content/tfx/pipeline/'
# Path to training data
# It can even contain multiple training data files
_data_root = '/content/tfx/data/'
Next, we create the InteractiveContext and pass the path to the pipeline directory. This process also creates a sqlite database to store the pipeline process metadata.
InteractiveContext is designed to explore each stage of the process. At each point, we can have a view of the artifacts being created. When we are in a production environment, it is ideal to use a pipeline creation framework such as Apache Beam, where this entire process will be executed automatically, without intervention.
# Initializing the InteractiveContext
# This will create an sqlite db for storing the metadata
context = InteractiveContext(pipeline_root=_pipeline_root)
Next, we start with the data ingestion. If the data is stored as a csv file, we can use CsvExampleGen and pass the path to the directory where the data files are stored.
Make sure the folder contains only the training data and nothing else. If your training data is split into multiple files, make sure they have the same header.
# Input CSV files
example_gen = CsvExampleGen(input_base=_data_root)
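By default, CsvExampleGen also splits the incoming data into train and eval splits for us. If you want control over the split ratios, you can pass an output configuration; here is a minimal sketch (the 8:2 ratio below is just an illustrative assumption, not what this demo uses):

from tfx.proto import example_gen_pb2

# Hypothetical split configuration: 80% train, 20% eval (hash-bucket based)
output_config = example_gen_pb2.Output(
    split_config=example_gen_pb2.SplitConfig(splits=[
        example_gen_pb2.SplitConfig.Split(name='train', hash_buckets=8),
        example_gen_pb2.SplitConfig.Split(name='eval', hash_buckets=2),
    ]))

# Alternative instantiation of the component with the custom split
example_gen = CsvExampleGen(input_base=_data_root, output_config=output_config)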
TFX currently supports CSV, TFRecord, BigQuery, and some custom executors. More information is available at the following link.
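For example, if your data already lives in TFRecord files of serialized tf.Example records, ingestion would use ImportExampleGen instead of CsvExampleGen; a small sketch, where the directory path is a placeholder:

from tfx.components import ImportExampleGen

# Hypothetical directory containing TFRecord files of serialized tf.Example records
_tfrecord_data_root = '/content/tfx/tfrecord_data/'

# Alternative ingestion component for TFRecord inputs
tfrecord_example_gen = ImportExampleGen(input_base=_tfrecord_data_root)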
To run the ExampleGen component, use context.run.
# Execute the component
context.run(example_gen)
After running the component, this will be our result. It provides the run ID, component details, and where the component results are saved.
By expanding it, we should be able to see these details.
The directory structure looks like the image below. TFX has created all these artifacts for us. They are also automatically versioned and the details are stored in metadata.sqlite. The sqlite file helps maintain the provenance or lineage of the data.
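If you are curious about what ends up in that metadata store, you can query it directly with the ml_metadata library; a minimal sketch, assuming the default metadata.sqlite location inside the pipeline root:

from ml_metadata.metadata_store import metadata_store
from ml_metadata.proto import metadata_store_pb2

# Connect to the sqlite metadata store created by the InteractiveContext
# (assumes the default metadata.sqlite file inside the pipeline root)
connection_config = metadata_store_pb2.ConnectionConfig()
connection_config.sqlite.filename_uri = os.path.join(_pipeline_root, 'metadata.sqlite')
connection_config.sqlite.connection_mode = metadata_store_pb2.SqliteMetadataSourceConfig.READWRITE_OPENCREATE
store = metadata_store.MetadataStore(connection_config)

# List the artifact types and artifacts registered so far
print(store.get_artifact_types())
print(store.get_artifacts())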
To explore these artifacts programmatically, use the following code.
# View the generated artifacts
artifact = example_gen.outputs['examples'].get()[0]

# Display split names and uri
print(f'split names: {artifact.split_names}')
print(f'artifact uri: {artifact.uri}')
The output would be the filenames and the URI.
Let's copy the train URI and look at the files inside. They are stored as GZIP-compressed TFRecord files, which we read using a TFRecordDataset.
# Get the URI of the output artifact representing the training examples
train_uri = os.path.join(artifact.uri, 'Split-train')

# Get the list of files in this directory (all compressed TFRecord files)
tfrecord_filenames = [os.path.join(train_uri, name)
                      for name in os.listdir(train_uri)]
# Create a `TFRecordDataset` to read these files
dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")
The following helper is adapted from the TensorFlow tutorials; it is the standard code used to pull records from a TFRecordDataset and return them for us to examine.
# Helper function to get individual examples
def get_records(dataset, num_records):
    '''Extracts records from the given dataset.
    Args:
        dataset (TFRecordDataset): dataset saved by ExampleGen
        num_records (int): number of records to preview
    '''
    # Initialize an empty list
    records = []

    # Use the `take()` method to specify how many records to get
    for tfrecord in dataset.take(num_records):
        # Get the numpy property of the tensor
        serialized_example = tfrecord.numpy()

        # Initialize a `tf.train.Example()` to read the serialized data
        example = tf.train.Example()

        # Read the example data (output is a protocol buffer message)
        example.ParseFromString(serialized_example)

        # Convert the protocol buffer message to a Python dictionary
        example_dict = MessageToDict(example)

        # Append to the records list
        records.append(example_dict)

    return records

# Get 3 records from the dataset
sample_records = get_records(dataset, 3)

# Print the output
pp.pprint(sample_records)
We request 3 records and the result is as follows: each record and its metadata are stored in dictionary format.
Next, we move on to the next process, which is to generate the statistics for the data using StatisticsGen. We pass the results of the example_gen object as an argument.
We run the component using context.run, passing statistics_gen as the argument.
# Generate dataset statistics with StatisticsGen using the example_gen object
statistics_gen = StatisticsGen(
    examples=example_gen.outputs['examples'])
# Execute the component
context.run(statistics_gen)
We can use context.show to see the results.
# Show the output statistics
context.show(statistics_gen.outputs['statistics'])
You can see that it is very similar to the statistics generation we discussed in the TFDV article. The reason is that TFX uses TFDV in the background to perform these operations. Familiarizing yourself with TFDV will help you better understand these processes.
The next step is to create the schema. This is done using SchemaGen by passing the statistics_gen object. Run the component and view it using context.show.
# Generate schema using SchemaGen with the statistics_gen object
schema_gen = SchemaGen(
    statistics=statistics_gen.outputs['statistics'],
    )
# Run the component
context.run(schema_gen)
# Visualize the schema
context.show(schema_gen.outputs['schema'])
The output shows details about the underlying schema of the data. Again, the same as in TFDV.
If you need to make modifications to the schema presented here, do so using tfdv and create a schema file. You can then pass it to ImportSchemaGen to tell TFX to use the new file.
# Adding a schema file manually
schema_gen = ImportSchemaGen(schema_file="path_to_schema_file/schema.pbtxt")
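For reference, here is a minimal sketch of how such a schema file could be edited with tfdv before handing it to ImportSchemaGen; the file path and the 'VIP' tweak are just illustrative assumptions:

import tensorflow_data_validation as tfdv

# Load the schema file produced earlier (placeholder path, as above)
schema = tfdv.load_schema_text('path_to_schema_file/schema.pbtxt')

# Illustrative tweak: allow the 'VIP' feature to be missing in up to 10% of examples
tfdv.get_feature(schema, 'VIP').presence.min_fraction = 0.9

# Write the edited schema back out so ImportSchemaGen can pick it up
tfdv.write_schema_text(schema, 'path_to_schema_file/schema.pbtxt')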
Next, we validate the examples using ExampleValidator. We pass statistics_gen and schema_gen as arguments.
# Validate the examples using the ExampleValidator
# Pass statistics_gen and schema_gen objects
example_validator = ExampleValidator(
    statistics=statistics_gen.outputs['statistics'],
    schema=schema_gen.outputs['schema'])
# Run the component.
context.run(example_validator)
This should be the ideal result to show that everything is fine.
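To display the validation report explicitly, the anomalies output can be visualized with the same context.show pattern used earlier:

# Visualize the anomalies detected by ExampleValidator
# (for this dataset it should report that no anomalies were found)
context.show(example_validator.outputs['anomalies'])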
At this point, our directory structure looks like the image below. We can see that for each step of the process, corresponding artifacts are created.
Let's move on to the actual transformation part. Now we will create the constants.py file to add all the constants needed for the process.
# Creating the file containing all constants that are to be used for this project
_constants_module_file = 'constants.py'
We will create all the constants and write them to the constants.py file. Note the command "%%writefile {_constants_module_file}": it does not run the cell, but instead writes all the code in that cell to the specified file.
%%writefile {_constants_module_file}

# Features with string data types that will be converted to indices
CATEGORICAL_FEATURE_KEYS = ['CryoSleep', 'Destination', 'HomePlanet', 'VIP']

# Numerical features that are marked as continuous
NUMERIC_FEATURE_KEYS = ['Age', 'FoodCourt', 'RoomService', 'ShoppingMall', 'Spa', 'VRDeck']

# Feature that can be grouped into buckets
BUCKET_FEATURE_KEYS = ['Age']

# Number of buckets used by tf.transform for encoding each bucket feature
FEATURE_BUCKET_COUNT = {'Age': 4}

# Feature that the model will predict
LABEL_KEY = 'Transported'

# Utility function for renaming the feature
def transformed_name(key):
    return key + '_xf'
Let's create the transform.py file, which will contain the actual code to transform the data.
# Creating a file that contains all preprocessing code for the project
_transform_module_file = 'transform.py'
Here, we will be using the tensorflow_transform library. The code for the transformation process is written inside the preprocessing_fn function. It is mandatory to use this exact name, as TFX looks for it internally during the transformation process.
%%writefile {_transform_module_file}

import tensorflow as tf
import tensorflow_transform as tft
import constants
# Unpack the contents of the constants module
_NUMERIC_FEATURE_KEYS = constants.NUMERIC_FEATURE_KEYS
_CATEGORICAL_FEATURE_KEYS = constants.CATEGORICAL_FEATURE_KEYS
_BUCKET_FEATURE_KEYS = constants.BUCKET_FEATURE_KEYS
_FEATURE_BUCKET_COUNT = constants.FEATURE_BUCKET_COUNT
_LABEL_KEY = constants.LABEL_KEY
_transformed_name = constants.transformed_name
# Define the transformations
def preprocessing_fn(inputs):
    outputs = {}

    # Scale these features to the range [0, 1]
    for key in _NUMERIC_FEATURE_KEYS:
        outputs[_transformed_name(key)] = tft.scale_to_0_1(inputs[key])

    # Bucketize these features
    for key in _BUCKET_FEATURE_KEYS:
        outputs[_transformed_name(key)] = tft.bucketize(
            inputs[key], _FEATURE_BUCKET_COUNT[key])

    # Convert strings to indices in a vocabulary
    for key in _CATEGORICAL_FEATURE_KEYS:
        outputs[_transformed_name(key)] = tft.compute_and_apply_vocabulary(inputs[key])

    # Convert the label strings to an index
    outputs[_transformed_name(_LABEL_KEY)] = tft.compute_and_apply_vocabulary(inputs[_LABEL_KEY])

    return outputs
We've used some standard scaling and encoding functions for this demonstration. The transformation library actually houses a lot of functions. Explore them here.
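To give a flavour of what else is available, here is an illustrative preprocessing_fn (not part of our transform.py) using a few other tft analyzers and mappers; the feature choices and parameters are my own assumptions:

import tensorflow_transform as tft

# Illustrative alternative preprocessing_fn showing a few more tft functions
def preprocessing_fn_alternative(inputs):
    outputs = {}

    # Standardize a numeric feature to zero mean and unit variance
    outputs['Age_zscore'] = tft.scale_to_z_score(inputs['Age'])

    # Scale another numeric feature into a custom range
    outputs['Spa_scaled'] = tft.scale_by_min_max(inputs['Spa'], output_min=0.0, output_max=1.0)

    # Hash a high-cardinality string feature into a fixed number of buckets
    outputs['Cabin_hashed'] = tft.hash_strings(inputs['Cabin'], hash_buckets=100)

    # Limit the vocabulary size and reserve an out-of-vocabulary bucket
    outputs['Destination_idx'] = tft.compute_and_apply_vocabulary(
        inputs['Destination'], top_k=10, num_oov_buckets=1)

    return outputs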
Now it’s time to see the transformation process in action. We create a Transform object and pass the example_gen and schema_gen objects, along with the path to the transform.py we created.
# Ignore TF warning messages
tf.get_logger().setLevel('ERROR')

# Instantiate the Transform component with example_gen and schema_gen objects
# Pass the path for transform file
transform = Transform(
    examples=example_gen.outputs['examples'],
    schema=schema_gen.outputs['schema'],
    module_file=os.path.abspath(_transform_module_file))
# Run the component
context.run(transform)
Run it and the transformation part is complete!
Take a look at the transformed data shown in the following image.
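If you prefer to inspect the transformed records programmatically, the get_records helper from earlier can be reused on the Transform component's output; a small sketch, assuming the default Split-train sub-directory and GZIP compression:

# Locate the transformed training examples produced by the Transform component
transformed_uri = transform.outputs['transformed_examples'].get()[0].uri
transformed_train_uri = os.path.join(transformed_uri, 'Split-train')

# Build a TFRecordDataset over the compressed files, as we did for the raw data
transformed_filenames = [os.path.join(transformed_train_uri, name)
                         for name in os.listdir(transformed_train_uri)]
transformed_dataset = tf.data.TFRecordDataset(transformed_filenames, compression_type="GZIP")

# Reuse the helper defined earlier to preview a few transformed records
pp.pprint(get_records(transformed_dataset, 3))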
At this point you are probably asking: why go through so much effort just to preprocess data?
This process is not aimed at someone who simply wants to preprocess a dataset and start training a model. It is intended for large amounts of data (data that requires distributed processing) and for automated production pipelines that cannot afford interruptions.
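For reference, in such a production setting the components we built interactively would be assembled into a single pipeline and handed to an orchestrator; a minimal sketch, assuming the Beam runner and a placeholder pipeline name:

from tfx.orchestration import metadata, pipeline
from tfx.orchestration.beam.beam_dag_runner import BeamDagRunner

# Assemble the components defined above into one pipeline definition
production_pipeline = pipeline.Pipeline(
    pipeline_name='spaceship_titanic_transform',  # placeholder name
    pipeline_root=_pipeline_root,
    components=[example_gen, statistics_gen, schema_gen, example_validator, transform],
    metadata_connection_config=metadata.sqlite_metadata_connection_config(
        os.path.join(_pipeline_root, 'metadata.sqlite')),
)

# Execute the whole pipeline end to end without manual intervention
BeamDagRunner().run(production_pipeline)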
After applying the transformation, your folder structure will look like this:
It contains pre- and post-transformation details. In addition, a transform graph is also created.
Recall that we scaled our numeric features using tft.scale_to_0_1. Transformations like this need statistics computed over the entire dataset (such as the mean, minimum, and maximum of a feature). Analyzing data spread across multiple machines to obtain these values is expensive, especially if done repeatedly. They are therefore computed once and stored in the transform_graph; whenever a transformation needs them, they are fetched directly from it. The transform graph also makes it possible to apply exactly the same transformations created during training to the serving data, ensuring consistency in the preprocessing phase.
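For instance, the saved graph can later be loaded with tensorflow_transform and applied to raw features; a small sketch, assuming the transform_graph artifact produced above:

import tensorflow_transform as tft

# Load the transform graph produced by the Transform component
transform_graph_uri = transform.outputs['transform_graph'].get()[0].uri
tf_transform_output = tft.TFTransformOutput(transform_graph_uri)

# The same transformations can now be applied to raw (serving) features
transform_layer = tf_transform_output.transform_features_layer()
# transformed_features = transform_layer(raw_features_dict)

# The transformed feature spec is also available for building input pipelines
print(tf_transform_output.transformed_feature_spec())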
Another major advantage of using the TFX transform pipeline is that each phase is recorded as an artifact, which maintains data lineage. Data is also versioned automatically whenever it changes, making it easier to experiment, deploy, and roll back in a production environment.
That's all. If you have any questions, please write them in the comments section.
You can download the notebook and data files used in this article from my GitHub repository using this link.