TLDR: if we are trying to use a HuggingFaceProcessor/Estimator in a SageMaker Studio project, what are the requirements for the train.py file in terms of how it refers to the assembled training data, and where it should store the results of the operations it performs (e.g. compiled model, data etc.)?
FULL DETAILS
Our high-level goal is to be able to deploy some kind of non-XGB model from a SageMaker Studio project, given that the templates provided are all XGB. As outlined in an earlier question, we'd started with TensorFlow, but since our TensorFlow model wraps a HuggingFace model we thought we'd try something even simpler: just a HuggingFace model using the HuggingFaceProcessor.
So following docs on HuggingFaceProcessor and a HuggingFace Estimator example we started to adjust the abalone (project template) pipeline.py to look like this (full code can be provided on request):
# processing step for feature engineering
hf_processor = HuggingFaceProcessor(
    role=role,
    instance_count=processing_instance_count,
    instance_type=processing_instance_type,
    transformers_version='4.4.2',
    pytorch_version='1.6.0',
    base_job_name=f"{base_job_prefix}/frameworkprocessor-hf",
    sagemaker_session=pipeline_session,
)
step_args = hf_processor.run(
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
    ],
    code=os.path.join(BASE_DIR, "preprocess.py"),
    arguments=["--input-data", input_data],
)
step_process = ProcessingStep(
    name="PreprocessTopicData",
    step_args=step_args,
)
# training step for generating model artifacts
model_path = f"s3://{sagemaker_session.default_bucket()}/{base_job_prefix}/TopicTrain"
hf_train = HuggingFace(
    entry_point='train.py',
    source_dir=BASE_DIR,
    base_job_name='huggingface-sdk-extension',
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    transformers_version='4.4',
    pytorch_version='1.6',
    py_version='py36',
    role=role,
)
hf_train.set_hyperparameters(
    epochs=3,
    train_batch_size=16,
    learning_rate=1.0e-5,
    model_name='distilbert-base-uncased',
)
step_args = hf_train.fit(
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "train"
            ].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "validation"
            ].S3Output.S3Uri,
            content_type="text/csv",
        ),
    },
)
Pushing to master doesn't provide any feedback on issues arising from pipeline.py, so we realised that getting the pipeline from a notebook was a better way of debugging these sorts of changes, provided one remembers to restart the kernel each time so that changes to the pipeline.py file are picked up by the notebook.
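As a possible alternative to restarting the kernel each time, the cached module can be dropped and re-imported. This is only a minimal sketch using the standard library; `fresh_import` is a hypothetical helper name, not something provided by the project template or the SageMaker SDK.

```python
import importlib
import sys


def fresh_import(module_name):
    """Re-import a module (and its submodules) from disk, ignoring cached copies."""
    # Drop the module and any of its submodules from the import cache.
    for name in list(sys.modules):
        if name == module_name or name.startswith(module_name + "."):
            del sys.modules[name]
    # Make sure the import system notices files created or changed on disk.
    importlib.invalidate_caches()
    return importlib.import_module(module_name)
```

In the notebook this would be called as `fresh_import("pipelines.topic.pipeline")`, with `get_pipeline` then taken from the returned module. Objects created from the old module before the re-import are not updated, so a kernel restart remains the safer fallback.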
So using the following code in the notebook we worked through a series of issues trying to bash the code into shape such that it would compile:
from pipelines.topic.pipeline import get_pipeline

pipeline = get_pipeline(
    region=region,
    role=role,
    default_bucket=default_bucket,
    model_package_group_name=model_package_group_name,
    pipeline_name=pipeline_name,
)
We needed to change the default processing and training instance types to avoid a "cpu" unsupported issue:
processing_instance_type="ml.p3.xlarge",
training_instance_type="ml.p3.xlarge",
and add a train.py script:
from transformers import AutoTokenizer
from transformers import TFAutoModelForSequenceClassification

tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")
model = TFAutoModelForSequenceClassification.from_pretrained("distilbert-base-uncased", num_labels=18)

import pandas as pd
import tensorflow as tf
from sklearn.model_selection import train_test_split
from transformers import (
    DistilBertTokenizerFast,
    TFDistilBertForSequenceClassification,
)

DATA_COLUMN = 'text'
LABEL_COLUMN = 'label'
MAX_SEQUENCE_LENGTH = 512
LEARNING_RATE = 5e-5
BATCH_SIZE = 16
NUM_EPOCHS = 3
NUM_LABELS = 15

if __name__ == "__main__":
    # --------------------------------------------------------------------------------
    # Tokenizer
    # --------------------------------------------------------------------------------
    tokenizer = DistilBertTokenizerFast.from_pretrained('distilbert-base-uncased')

    def tokenize(sentences, max_length=MAX_SEQUENCE_LENGTH, padding='max_length'):
        """Tokenize using the Huggingface tokenizer
        Args:
            sentences: String or list of string to tokenize
            padding: Padding method ['do_not_pad'|'longest'|'max_length']
        """
        return tokenizer(
            sentences,
            truncation=True,
            padding=padding,
            max_length=max_length,
            return_tensors="tf"
        )

    # --------------------------------------------------------------------------------
    # Load data
    # --------------------------------------------------------------------------------
    from keras.utils import to_categorical
    from sklearn.preprocessing import LabelEncoder

    labelencoder_Y_1 = LabelEncoder()
    yy = labelencoder_Y_1.fit_transform(train_data[LABEL_COLUMN].tolist())
    yy = to_categorical(yy)
    print(len(yy))
    print(yy.shape)
    train_dat, validation_dat, train_label, validation_label = train_test_split(
        train_data[DATA_COLUMN].tolist(),
        yy,
        test_size=0.2,
        shuffle=True
    )

    # --------------------------------------------------------------------------------
    # Prepare TF dataset
    # --------------------------------------------------------------------------------
    train_dataset = tf.data.Dataset.from_tensor_slices((
        dict(tokenize(train_dat)),  # Convert BatchEncoding instance to dictionary
        train_label
    )).shuffle(1000).batch(BATCH_SIZE).prefetch(1)
    validation_dataset = tf.data.Dataset.from_tensor_slices((
        dict(tokenize(validation_dat)),
        validation_label
    )).batch(BATCH_SIZE).prefetch(1)

    # --------------------------------------------------------------------------------
    # training
    # --------------------------------------------------------------------------------
    model = TFDistilBertForSequenceClassification.from_pretrained(
        'distilbert-base-uncased',
        num_labels=NUM_LABELS
    )
    optimizer = tf.keras.optimizers.Adam(learning_rate=LEARNING_RATE)
    model.compile(
        optimizer=optimizer,
        loss=tf.keras.losses.CategoricalCrossentropy(from_logits=True),
    )
However we are now stuck on this error when trying to get the pipeline from a notebook.
<ipython-input-3-be38b3dda75f> in <module>
7 default_bucket=default_bucket,
8 model_package_group_name=model_package_group_name,
----> 9 pipeline_name=pipeline_name,
10 )
11 # !conda list
~/topic-models-no-monitoring-p-rboparx6tdeg/sagemaker-topic-models-no-monitoring-p-rboparx6tdeg-modelbuild/pipelines/topic/pipeline.py in get_pipeline(region, sagemaker_project_arn, role, default_bucket, model_package_group_name, pipeline_name, base_job_prefix, processing_instance_type, training_instance_type)
228 "validation"
229 ].S3Output.S3Uri,
--> 230 content_type="text/csv",
231 ),
232 },
/opt/conda/lib/python3.7/site-packages/sagemaker/workflow/pipeline_context.py in wrapper(*args, **kwargs)
246 return self_instance.sagemaker_session.context
247
--> 248 return run_func(*args, **kwargs)
249
250 return wrapper
/opt/conda/lib/python3.7/site-packages/sagemaker/estimator.py in fit(self, inputs, wait, logs, job_name, experiment_config)
1059 self._prepare_for_training(job_name=job_name)
1060
-> 1061 self.latest_training_job = _TrainingJob.start_new(self, inputs, experiment_config)
1062 self.jobs.append(self.latest_training_job)
1063 if wait:
/opt/conda/lib/python3.7/site-packages/sagemaker/estimator.py in start_new(cls, estimator, inputs, experiment_config)
1956 train_args = cls._get_train_args(estimator, inputs, experiment_config)
1957
-> 1958 estimator.sagemaker_session.train(**train_args)
1959
1960 return cls(estimator.sagemaker_session, estimator._current_job_name)
/opt/conda/lib/python3.7/site-packages/sagemaker/session.py in train(self, input_mode, input_config, role, job_name, output_config, resource_config, vpc_config, hyperparameters, stop_condition, tags, metric_definitions, enable_network_isolation, image_uri, algorithm_arn, encrypt_inter_container_traffic, use_spot_instances, checkpoint_s3_uri, checkpoint_local_path, experiment_config, debugger_rule_configs, debugger_hook_config, tensorboard_output_config, enable_sagemaker_metrics, profiler_rule_configs, profiler_config, environment, retry_strategy)
611 self.sagemaker_client.create_training_job(**request)
612
--> 613 self._intercept_create_request(train_request, submit, self.train.__name__)
614
615 def _get_train_request( # noqa: C901
/opt/conda/lib/python3.7/site-packages/sagemaker/session.py in _intercept_create_request(self, request, create, func_name)
4303 func_name (str): the name of the function needed intercepting
4304 """
-> 4305 return create(request)
4306
4307
/opt/conda/lib/python3.7/site-packages/sagemaker/session.py in submit(request)
608 def submit(request):
609 LOGGER.info("Creating training-job with name: %s", job_name)
--> 610 LOGGER.debug("train request: %s", json.dumps(request, indent=4))
611 self.sagemaker_client.create_training_job(**request)
612
/opt/conda/lib/python3.7/json/__init__.py in dumps(obj, skipkeys, ensure_ascii, check_circular, allow_nan, cls, indent, separators, default, sort_keys, **kw)
236 check_circular=check_circular, allow_nan=allow_nan, indent=indent,
237 separators=separators, default=default, sort_keys=sort_keys,
--> 238 **kw).encode(obj)
239
240
/opt/conda/lib/python3.7/json/encoder.py in encode(self, o)
199 chunks = self.iterencode(o, _one_shot=True)
200 if not isinstance(chunks, (list, tuple)):
--> 201 chunks = list(chunks)
202 return ''.join(chunks)
203
/opt/conda/lib/python3.7/json/encoder.py in _iterencode(o, _current_indent_level)
429 yield from _iterencode_list(o, _current_indent_level)
430 elif isinstance(o, dict):
--> 431 yield from _iterencode_dict(o, _current_indent_level)
432 else:
433 if markers is not None:
/opt/conda/lib/python3.7/json/encoder.py in _iterencode_dict(dct, _current_indent_level)
403 else:
404 chunks = _iterencode(value, _current_indent_level)
--> 405 yield from chunks
406 if newline_indent is not None:
407 _current_indent_level -= 1
/opt/conda/lib/python3.7/json/encoder.py in _iterencode_dict(dct, _current_indent_level)
403 else:
404 chunks = _iterencode(value, _current_indent_level)
--> 405 yield from chunks
406 if newline_indent is not None:
407 _current_indent_level -= 1
/opt/conda/lib/python3.7/json/encoder.py in _iterencode(o, _current_indent_level)
436 raise ValueError("Circular reference detected")
437 markers[markerid] = o
--> 438 o = _default(o)
439 yield from _iterencode(o, _current_indent_level)
440 if markers is not None:
/opt/conda/lib/python3.7/json/encoder.py in default(self, o)
177
178 """
--> 179 raise TypeError(f'Object of type {o.__class__.__name__} '
180 f'is not JSON serializable')
181
TypeError: Object of type ParameterInteger is not JSON serializable
This tells us that some aspect of the training job (?) is not JSON serializable, but it's not clear how to debug further.
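One way to narrow this down might be to walk the request dictionary and list exactly which entries `json.dumps` rejects. The sketch below uses only the standard library; `find_unserializable` and `StandInParameter` are hypothetical names for illustration, and the sample `request` is invented rather than the real training request. In a notebook the real `request` could probably be reached from the failing `submit` frame with the `%debug` magic. It may also be worth checking whether the estimator needs `sagemaker_session=pipeline_session` as the processor has: the traceback shows `fit()` going on to start a real training job, although that reading is an assumption and is not confirmed here.

```python
import json


def find_unserializable(obj, path="request"):
    """Return (path, type name) pairs for values that json.dumps would reject."""
    problems = []
    if isinstance(obj, dict):
        for key, value in obj.items():
            problems.extend(find_unserializable(value, f"{path}[{key!r}]"))
    elif isinstance(obj, (list, tuple)):
        for index, value in enumerate(obj):
            problems.extend(find_unserializable(value, f"{path}[{index}]"))
    else:
        try:
            json.dumps(obj)
        except TypeError:
            problems.append((path, type(obj).__name__))
    return problems


class StandInParameter:
    """Hypothetical stand-in for a pipeline parameter; not the SageMaker class."""


# Invented example request, only loosely shaped like a training request.
request = {
    "ResourceConfig": {"InstanceCount": StandInParameter(), "InstanceType": "ml.p3.xlarge"},
    "HyperParameters": {"epochs": "3"},
}
for location, type_name in find_unserializable(request):
    print(f"{location}: {type_name}")
# prints: request['ResourceConfig']['InstanceCount']: StandInParameter
```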
What would be enormously helpful is project templates for SageMaker Studio showing the use of all the Processors, e.g. HuggingFace, TensorFlow and so on, but failing that we'd be most grateful if anyone could point us to documentation on what the requirements are for the train.py file that we need to specify for the HuggingFace Estimator.
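For reference, this is a minimal sketch of the convention as we understand it. In SageMaker script mode the hyperparameters arrive as command-line arguments, each key of the `inputs` dict passed to `fit()` is exposed as an `SM_CHANNEL_<NAME>` environment variable pointing at a local directory, and `SM_MODEL_DIR` names the directory whose contents are packaged as the model artifact. The argument names `--train_dir`, `--validation_dir` and `--model_dir` are our own choices for illustration. The fallback paths are the usual container defaults.

```python
import argparse
import os


def parse_args(argv=None):
    """Parse hyperparameters and the data/model locations for a training script."""
    parser = argparse.ArgumentParser()
    # Hyperparameters set on the estimator arrive as command-line arguments.
    parser.add_argument("--epochs", type=int, default=3)
    parser.add_argument("--train_batch_size", type=int, default=16)
    parser.add_argument("--learning_rate", type=float, default=1e-5)
    parser.add_argument("--model_name", type=str, default="distilbert-base-uncased")
    # Each key of the `inputs` dict passed to fit() becomes a channel directory.
    parser.add_argument(
        "--train_dir",
        default=os.environ.get("SM_CHANNEL_TRAIN", "/opt/ml/input/data/train"),
    )
    parser.add_argument(
        "--validation_dir",
        default=os.environ.get("SM_CHANNEL_VALIDATION", "/opt/ml/input/data/validation"),
    )
    # Files written here are packaged as the model artifact when the job ends.
    parser.add_argument(
        "--model_dir",
        default=os.environ.get("SM_MODEL_DIR", "/opt/ml/model"),
    )
    # parse_known_args tolerates extra arguments the script does not declare.
    args, _ = parser.parse_known_args(argv)
    return args
```

A train.py would then read its input files from `args.train_dir` and `args.validation_dir`, and save the trained model and tokenizer under `args.model_dir`.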
many thanks in advance
hi @Alex_T, thanks so much for your detailed response - that's been really helpful. Sorry for not thanking you sooner - we had to move away from Studio to get something else working as a stop gap, but now we're looking at it again.
Thanks to your help and the post from @Heiko we've moved forward to the point of starting to get a train.py working. We're now stuck on a new point regarding saving datasets. For XGB we had been saving the training, test, and validation data as CSVs. All the train.py examples use load_from_disk, which expects data saved in the datasets library's Dataset format. In our preprocess.py we can't import tensorflow, or Dataset from datasets, so what would be of great help is an example of a preprocess.py script that works with Dataset objects.
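In the meantime, one possible workaround is to keep writing CSVs in preprocess.py and read them directly in train.py instead of calling load_from_disk. The sketch below uses only the standard library. `load_csv_channel` is a hypothetical helper, and it assumes each CSV starts with a header row containing `text` and `label` columns (the names used for DATA_COLUMN and LABEL_COLUMN in our train.py), which may not match what a given preprocess.py writes.

```python
import csv
import glob
import os


def load_csv_channel(channel_dir, text_column="text", label_column="label"):
    """Read every CSV file in a channel directory into parallel lists."""
    texts, labels = [], []
    for path in sorted(glob.glob(os.path.join(channel_dir, "*.csv"))):
        with open(path, newline="", encoding="utf-8") as handle:
            # Assumes each file starts with a header row naming the columns.
            for row in csv.DictReader(handle):
                texts.append(row[text_column])
                labels.append(row[label_column])
    return texts, labels
```

The `channel_dir` argument would be the local directory for a training channel, for example the value of the `SM_CHANNEL_TRAIN` environment variable inside the training container.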
The particular difficulty we're finding with SageMaker Studio is working out exactly which versions of which libraries are running within these scripts ...
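One low-tech way to find out might be to have each script log the versions it can actually import at start-up, so they show up in the job logs. This is only a sketch; `report_versions` is a hypothetical helper, and it relies on packages exposing a `__version__` attribute, which most but not all do.

```python
import importlib


def report_versions(package_names):
    """Map each package name to its __version__, or a short note if unavailable."""
    versions = {}
    for name in package_names:
        try:
            module = importlib.import_module(name)
        except ImportError:
            # The package is missing (or broken) in this environment.
            versions[name] = "not importable"
            continue
        versions[name] = getattr(module, "__version__", "unknown")
    return versions
```

Calling `print(report_versions(["transformers", "datasets", "tensorflow", "torch"]))` at the top of preprocess.py or train.py would then record what each container provides.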