본문 바로가기
AI 빅데이터/Google Cloud Platform

[GCP] Vertex AI 커스텀 모델 따라하기

by 마고커 2021. 6. 11.


21년 Google I/O에서 Machine Learning Platform인 Vertex AI를 출시했다. 기능상으로는 사실 AI Platform과 큰 차이를 모르겠다. 다만 약간 산만했던 AI Platform Pipeline이 좀 더 깔끔해졌고, API들이 고수준으로 바뀜에 따라 사용이 쉬워진 듯 하다. 차이점은 여기에 있지만, 사실 이제부터는 Vertex AI만 알면 될 것 같다. 예제는 미디움에 있는 아래 포스팅을 이용했다.

 

 

 

Serverless Machine Learning Pipelines with Vertex AI: An Introduction

There is this big problem for Data Scientists and ML Engineers that work at small companies (or small teams): a tremendous lack of human resources and time. Because of that, we have a very limited…

towardsdatascience.com

 

1) Kubeflow 설치 및 Import

 

Vertex AI는 Kubeflow와 TFX를 지원하고 있지만, 여기서는 Kubeflow를 사용한다. 기존에 KFP 사용은 왠 설정이 그렇게 많은지 이건 분명 바뀔거다 했는데, 역시 오랜만에 사용한 KFP는 크게 어렵진 않더라.

 

!pip3 install kfp --upgrade

from kfp.v2 import dsl
from kfp.v2.dsl import (Artifact,
                        Dataset,
                        Input,
                        Model,
                        Output,
                        Metrics,
                        ClassificationMetrics,
                        component)

from kfp.v2 import compiler

 

KFP를 Install하고 나서, Restart Kernel을 꼭 하라고 나오는데, 꼭 눌러주면 된다.

 

2) 컴퍼넌트 정의

 

이제 더 이상 Kubectl과 Yaml의 지옥에서 헤매지 않아도 된다. 쿠버네티스를 몰라도 컨테이너 기반으로 학습할 수 있게 됐다는 뜻이다(그렇다고 마냥 쉽다는 말은 아니지만). Kubernetes 컨테이너로 수행될 컴퍼넌트를 정의하기만 하면된다(이미 되고 있었을 수도, 1년만에 GCP 다시 본 듯). 여기서는 Scikit Learn의 breast cancer 데이터를 활용하는데, get_data, train_xgb_model, eval_model 세 가지를 정의하고 있다. 

 

<get_data>

@component(
    packages_to_install = [
        "pandas",
        "sklearn"
    ],
)
def get_data(
    dataset_train: Output[Dataset],
    dataset_test: Output[Dataset]
    
):
    
    from sklearn import datasets
    from sklearn.model_selection import train_test_split as tts
    import pandas as pd
    # import some data to play with
    
    data_raw = datasets.load_breast_cancer()
    data = pd.DataFrame(data_raw.data, columns=data_raw.feature_names)
    data["target"] = data_raw.target
    
    train, test = tts(data, test_size=0.3)
    
    train.to_csv(dataset_train.path)
    test.to_csv(dataset_test.path)

 

KFP의 함수기반 Component 제작은 여기를 참고한다. 컨테이너로 수행될 컴퍼넌트 만들기가 껌이 됐다. 설치할 패키지를 지정하고, 함수의 출력과 입력을 정의해 주면된다. get_data의 입력은 sklearn을 통해 받아오고 있으므로 필요 없고, train, test 데이터를 Output으로 지정하고 있다. 실제 운영 환경에서 사용한다면 IN Parameter를 지정해 Path를 지정해 주면 될 듯.

 

<train_xgb_model>

@component(
    packages_to_install = [
        "pandas",
        "sklearn",
        "xgboost"
    ],
)
def train_xgb_model(
    dataset: Input[Dataset],
    model_artifact: Output[Model]
):
    
    from xgboost import XGBClassifier
    import pandas as pd
    
    data = pd.read_csv(dataset.path)

    model = XGBClassifier(
        objective="binary:logistic"
    )
    model.fit(
        data.drop(columns=["target"]),
        data.target,
    )

    score = model.score(
        data.drop(columns=["target"]),
        data.target,
    )

    model_artifact.metadata["train_score"] = float(score)
    model_artifact.metadata["framework"] = "XGBoost"
    
    model.save_model(model_artifact.path)

 

수행할 모델은 유방함 환자를 XGBoost로 분류한다. 모델 자체는 쉬우므로 파라미터만 보면 get_data에서 넘겨올 train data를 IN 파라미터로 받을 생각이며 생성한 모델을 Output으로 내보낸다. Vertex AI의 또다른 편리한점은 중간 생성결과를 Data Lineage라는 이름으로 남겨 둔다는 것이다. 중간 결과가 잘못됐다면 해당 데이터나 모델을 살펴보면 된다. 이건 마지막에 다시 볼 것이다.

 

<eval_model>

@component(
    packages_to_install = [
        "pandas",
        "sklearn",
        "xgboost"
    ],
)
def eval_model(
    test_set: Input[Dataset],
    xgb_model: Input[Model],
    metrics: Output[ClassificationMetrics],
    smetrics: Output[Metrics]
):
    from xgboost import XGBClassifier
    import pandas as pd
    
    data = pd.read_csv(test_set.path)
    model = XGBClassifier()
    model.load_model(xgb_model.path)
    
    score = model.score(
        data.drop(columns=["target"]),
        data.target,
    )
    
    from sklearn.metrics import roc_curve
    y_scores =  model.predict_proba(data.drop(columns=["target"]))[:, 1]
    fpr, tpr, thresholds = roc_curve(
         y_true=data.target.to_numpy(), y_score=y_scores, pos_label=True
    )
    metrics.log_roc_curve(fpr.tolist(), tpr.tolist(), thresholds.tolist())
    
    from sklearn.metrics import confusion_matrix
    y_pred = model.predict(data.drop(columns=["target"]))
    
    metrics.log_confusion_matrix(
       ["False", "True"],
       confusion_matrix(
           data.target, y_pred
       ).tolist(),  # .tolist() to convert np array to list.
    )
    
    xgb_model.metadata["test_score"] = float(score)
    smetrics.log_metric("score", float(score))

 

요건 Evaluation인데 따로 설명하지 않는다. 여기서 평가된 결과들이 GCP Dashboard 상에서 나타나게 된다. 

 

 

 

3) 파이프라인 연결 

 

컴퍼넌트들을 다 만들었으니, 이제 연결만 해 주면 된다. KFP의 초창기 DSL(Domain Specific Language)과 크게 바뀌지 않았다. 

 

@dsl.pipeline(
    # Default pipeline root. You can override it when submitting the pipeline.
    pipeline_root="gs://your-vertex-ai-bucket",
    # A name for the pipeline. Use to determine the pipeline Context.
    name="pipeline-test-1",
)
def pipeline():
    dataset_op = get_data()
    train_op = train_xgb_model(dataset_op.outputs["dataset_train"])
    eval_op = eval_model(
        test_set=dataset_op.outputs["dataset_test"],
        xgb_model=train_op.outputs["model_artifact"]
    )
    
compiler.Compiler().compile(pipeline_func=pipeline,
        package_path='xgb_pipe.json')

 

중간의 결과물을 저장할 bucket을 지정하고, Input과 Output을 연결해 준 뒤, pipeline을 compile해 준다. 결과물로 json 파일이 하나 생기는데, 이를 AI Platform의 create_run_from_job_spec 함수로 시작해 주면 끝난다. 그런데, 함수 실행에서 계속 PERMISSION Error가 나서, 콘솔에서 실행하기로 했다. 콘솔 실행을 위해 xgb_pipe.json을 다운 받았다. colab에서 다운 받으려면 아래와 같이 하면 된다. 

 

from google.colab import files
files.download('xgb_pipe.json') 

 

4) 파이프라인 실행

 

Vertex AI 파이프라인 메뉴에서 실행만들기를 선택하고, 다운 받은 xgp_pip.json을 업로드하면 파이프라인 이름까지는 지정된 이름으로 나타난다. 실행 이름을 하나 임의로 지정하고 계속을 누른다.

 

파이프라인 이름을 선택하면 지정했던 함수 컴퍼넌트가 실행되고 있는 것이 보여진다. 다 끝나면 아래와 같이 실행 상황이 실선으로 변경되고, 오른쪽에는 지정했던 metric으로 결과가 나타난다. 

 

여기서 끝은 아니고, 운영 환경에서 사용하다면 중간 중간 문제가 생길 수도 있고 해서, 단계별 결과물들을 찾아보거나 할 때도 있는데, 왼쪽 맨 아래 메타데이터 메뉴를 선택하면 된다. 

 

생성된 데이터 세트나 모델을 저정했던 gcp bucket에서 찾아 확인해 볼 수 있다.

 

그저 몇 가지 함수로 이루어진 듯 하지만, 내부적으로는 K8S 기반으로 Serverless로 운영된다. 하지만 유사한 형태를 왠만큼 큰 기업들은 모두 운영하고 있어, 결국 데이터 사이언티스트 몇명으로 구성된 스타트업이나 비 IT 기업에 유용할 것으로 생각된다. 그래도, 시스템 자원 빼 놓고는 솔루션 자체가 무료니, 꽤 매력적이라고 할 수는 있겠다. 



댓글