본문 바로가기
AI 빅데이터/Google Cloud Platform

[GCP] TFX를 Vertex AI에 활용하기

by 마고커 2021. 7. 12.


지난번에는 Kubeflow를 활용해 Vertex AI를 구성하는 것을 확인하였다.

 

 

[GCP] Vertex AI 커스텀 모델 따라하기

21년 Google I/O에서 Machine Learning Platform인 Vertex AI를 출시했다. 기능상으로는 사실 AI Platform과 큰 차이를 모르겠다. 다만 약간 산만했던 AI Platform Pipeline이 좀 더 깔끔해졌고, API들이 고수준으..

magoker.tistory.com

 

Pipeline만 구성해 주고 데이터 준비나 전처리, Serving 등을 직접한다면 위의 내용으로도 충분하겠지만, TFX의 Component들을 이용한다면 얻을 수 있는 이득들이 많이 있다. TFX 컴퍼넌트들의 구성은 아래와 같다.

 

  • TensorFlow Data Validation(TFDV): 데이터에서 이상치를 감지하기 위해 사용됩니다.
  • TensorFlow Transform(TFT): 데이터 전처리 및 특성 추출을 위해 사용됩니다.
  • TensorFlow Estimators and Keras: ML 모델 빌드 및 학습을 위해 사용됩니다.
  • TensorFlow Model Analysis(TFMA): ML 모델 평가 및 분석을 위해 사용됩니다.
  • TensorFlow Serving(TFServing): ML 모델을 REST 및 gRPC API로 제공하기 위해 사용됩니다.

 

TFX가 GCP에 탑재된다면 아래와 같은 구조를 갖게 된다. Cloud Dataflow는 Apache Beam을 Wrapping한 GCP의 Managed Service인데, Kubeflow Runner를 활용하는 API를 Call하게 되면 내부에서 Working하므로 소스상에서는 드러나지 않는다.

GCP의 TFX활용 관련한 상세한 아키텍쳐는 아래를 참고한다.

 

 

TFX, Kubeflow 파이프라인, Cloud Build를 사용하는 MLOps 아키텍처  |  클라우드 아키텍처 센터

이 문서에서는 TensorFlow Extended(TFX) 라이브러리를 사용하는 머신러닝(ML) 시스템의 전반적인 아키텍처를 설명합니다. 또한 Cloud Build 및 Kubeflow Pipelines를 사용하여 ML 시스템에 지속적 통합(CI), 지속

cloud.google.com

 

이를 테스트하기 위한 예제는 아래와 같다. 원본은 아래 참조.

 

 

정점 파이프라인을 위한 간단한 TFX 파이프라인  |  TensorFlow

이 노트북 기반 가이드에서는 간단한 TFX 파이프라인을 만들고 Google Cloud Vertex Pipelines를 사용하여 실행합니다. 이 노트북은 우리가 내장 된 TFX 파이프 라인을 기반으로 간단한 TFX 파이프 라인 튜

www.tensorflow.org

 

위의 예제에 데이터의 통계량을 확인하는 StatsticsGen 컴퍼넌트만을 추가하여 테스트 해 본다. 데이터 Anomaly나 구조를 파악하고 싶으면 SchemaGen과 ExampleValidator를 추가하면 되고, 모델 분석을 위해서는 TFMA(Tensorflow Model Analysis)를 추가해서 사용하면 된다.

 

1) 각종 디렉토리 설정

import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
import kfp
print('KFP version: {}'.format(kfp.__version__))
 
GOOGLE_CLOUD_PROJECT = 'xxx'     # <--- ENTER THIS
GOOGLE_CLOUD_REGION = 'us-central1'      # <--- ENTER THIS
GCS_BUCKET_NAME = 'xxxx_xxxx'          # <--- ENTER THIS
 
if not (GOOGLE_CLOUD_PROJECT and GOOGLE_CLOUD_REGION and GCS_BUCKET_NAME):
    from absl import logging
    logging.error('Please set all required parameters.')
 
PIPELINE_NAME = 'penguin-vertex-pipelines'
 
# Path to various pipeline artifact.
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)
 
# Paths for users' Python module.
MODULE_ROOT = 'gs://{}/pipeline_module/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)
 
# Paths for input data.
DATA_ROOT = 'gs://{}/data/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)
 
# This is the path where your model will be pushed for serving.
SERVING_MODEL_DIR = 'gs://{}/serving_model/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# 메타데이터 저장
SCHEMA_METADATA_PATH = 'gs://{}/metadata/{}/metadata/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME, 'metadata.db')
 
print('PIPELINE_ROOT: {}, METADATA: {}'.format(PIPELINE_ROOT, SCHEMA_METADATA_PATH))

 

2) 모델링 

 

Trainig에 활용할 모델을 정의하는 부분이다. penguin_train.py라는 이름으로 간단한 DNN 모델을 Keras로 작성할 것이다. (pipeline 생성할 때도 파일 이름이 필요하니, 변수에 지정)

_trainer_module_file = 'penguin_trainer.py'

Training을 위한 TFX의 라이브러리는 관심 사항이 아니므로, 설명 없이 넘어간다. 참고로 TFX의 데이터 전달 형식은 Protocol Buffer 형식이어서 좀 복잡하게 구성되었다. 뒤의 파이프라인 구성에서 CSVExampleGen 컴퍼넌트가 CSV 파일을 입력받아 TFRecord 형식의 파일을 리턴하게 되므로 크게 문제되지는 않는다.

 

%%writefile {_trainer_module_file}
# 아래의 내용을 파일에 담기 위한 지시자(Decorator)
 
# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple
 
from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils
 
 
from tfx import v1 as tfx
from tfx_bsl.public import tfxio
 
from tensorflow_metadata.proto.v0 import schema_pb2
 
_FEATURE_KEYS = [
    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'
 
_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10
 
# Since we're not generating or creating a schema, we will instead create
# a feature spec.  Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
    **{
        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
           for feature in _FEATURE_KEYS
       },
    _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}
 
def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int) -> tf.data.Dataset:
  """Generates features and label for training.
 
  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    schema: schema of the input data.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch
 
  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY),
      schema=schema).repeat()
 
 
def _make_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.
 
  Returns:
    A Keras Model.
  """
  # The model below is built with Functional API, please refer to
  # https://www.tensorflow.org/guide/keras/overview for all API options.
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3)(d)
 
  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[keras.metrics.SparseCategoricalAccuracy()])
 
  model.summary(print_fn=logging.info)
  return model
 
 
# TFX Trainer는 run_fn을 호출한다.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.
 
  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """
 
  # This schema is usually either an output of SchemaGen or a manually-curated
  # version provided by pipeline author. A schema can also derived from TFT
  # graph if a Transform component is used. In the case when either is missing,
  # `schema_from_feature_spec` could be used to generate schema from very simple
  # feature_spec, but the schema returned would be very primitive.
  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)
 
  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      schema,
      batch_size=_TRAIN_BATCH_SIZE)
  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      schema,
      batch_size=_EVAL_BATCH_SIZE)
 
  model = _make_keras_model()
  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)
 
  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf')

 

3) 파이프라인 구성

 

본 예제에서는 CSV를 입력 받아 TFRecord형식으로 만들고 이를 Train과 Eval로 나누는 CSVExampleGen, 각각의 데이터에 대해 통계량을 담고 있는 파일을 생성하는 StatisticsGen, 모델을 학습하는 Trainer, 생성된 모델을 Serving하기 위한 위치로 이동시키는 Pusher를 사용한다. 각자의 목적에 따라 TFX Component들을 추가하면 된다.

 

!gsutil cp {_trainer_module_file} {MODULE_ROOT}/
 
# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple and
# slightly modified because we don't need `metadata_path` argument.
 
def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     module_file: str, serving_model_dir: str, metadata_path: str
                     ) -> tfx.dsl.Pipeline:
  """Creates a three component penguin pipeline with TFX."""
  # Brings data into the pipeline.
  
  # CSV 파일을 읽어들여 TFRecord 객체를 생성하고 Train/Eval로 Split
  example_gen = tfx.components.CsvExampleGen(input_base=data_root)
 
  # 통계량을 보기 위해 아래의 StatisticsGen을 추가함
  # 학습과는 관계 없는 내용으로 compute_eval_stats의 결과물은 나중에 따로 볼 것이다.
  compute_eval_stats = tfx.components.StatisticsGen(
      examples=example_gen.outputs['examples'])
 
  # 파라미터(Steps)를 넘겨 학습하는데, 데이터로는 CSVExampleGen의 결과를 활용.
  trainer = tfx.components.Trainer(
      module_file=module_file,
      examples=example_gen.outputs['examples'],
      train_args=tfx.proto.TrainArgs(num_steps=100),
      eval_args=tfx.proto.EvalArgs(num_steps=5))
 
  # Pushes the model to a filesystem destination.
  pusher = tfx.components.Pusher(
      model=trainer.outputs['model'],
      push_destination=tfx.proto.PushDestination(
          filesystem=tfx.proto.PushDestination.Filesystem(
              base_directory=serving_model_dir)))
 
  # 파이프라인을 생성한 Component들을 정의
  components = [
      example_gen,
      compute_eval_stats,
      trainer,
      pusher,
  ]
 
 # 파이프라인을 생성하여 Return. Metadata는 정의해 두었지만 본 예제에서는 활용 안함
  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      metadata_connection_config=tfx.orchestration.metadata
      .sqlite_metadata_connection_config(metadata_path),
      components=components)

 

4) 파이프라인 생성과 실행

 

Vertex AI에서 Kubeflow Pipeline 실행할 때와 마찬가지로 Runner(여기서는 Kubeflow Pipeline)의 정의 파일을 생성하고 이를 기반으로 파이프라인을 생성한다.

 

import os

# 실행시킬 파이프라인 정의
PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'
 
# Runner는 Kubeflow 
runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename=PIPELINE_DEFINITION_FILE)

# 파이프라인을 실행하기 위한 Runner의 구성을 지정
# Output은 위에서 정의한 json 파일
_ = runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_root=DATA_ROOT,
        metadata_path=SCHEMA_METADATA_PATH,
        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        serving_model_dir=SERVING_MODEL_DIR))
 
from kfp.v2.google import client

# Vertex AI에서 정의된 파이프라인 구조대로 실행
pipelines_client = client.AIPlatformClient(
    project_id=GOOGLE_CLOUD_PROJECT,
    region=GOOGLE_CLOUD_REGION,
)
 
_ = pipelines_client.create_run_from_job_spec(PIPELINE_DEFINITION_FILE
                                             )

 

5) 파이프라인 실행 결과 확인

 

실행된 결과는 GCP Console의 Vertex AI -> 파이프라인 메뉴에서 확인할 수 있다. 실행 결과는 아래와 같다. 의도된 대로 CSVExampleGen의 결과가 데이터 통계량 확인(StatisticsGen)과 학습(Trainer)로 잘 전달된 것이 확인된다.

 

 

6) 데이터 통계량 확인

 

모델의 성능이 나빠지는 것은 크게 Concept Drift와 Data Drift가 발생하는 것에 기인한다. Concept Drift가 종속변수(Y)의 성격이 달라지는 것(이를테면 이전에는 사과였던 것이 망고로 바뀐다든가)이라면 Data Drift는 독립변수(X)의 분포가 달라져서 성능이 떨어지는 것이다. StatisticsGen은 Data Drift 변동을 확인할 수 있도록 Train과 Eval의 데이터 통계량을 저장한다. 결과는 Pipeline의 Root 아래 StatsticsGen/Stastics 디렉토리 아래 저장된다.

 

PB Binary 형태로 저장되기 때문에 이를 확인하는 데 애를 먹었는데, 사실 TFX에는 이를 위한 모듈이 이미 준비(tensorflow_data_validation)되어 있었다 -__-

 

import tensorflow_data_validation as tfdv
stats = tfdv.load_stats_binary("gs://pipeline실행위치/StatisticsGen_3802040840105230336/statistics/Split-train/FeatureStats.pb")
tfdv.visualize_statistics(stats)

 

아래와 같이 Interactive하게 분포량을 확인할 수 있다.

 



댓글