2024 동계 UST 인턴

[UST] 분자 구조 예측 모델 만들기 - 2

환성 2024. 2. 2. 15:56
728x90

 

전에 이어서 이번에는 smiles식 -> fingerprint로 변환하는 작업을 진행했다. 

이렇게 변환하는 이유는 단순 smiles식이 string형태로 되어있어 기계학습 모델의 입력으로는 사용하기가 어렵다. 그래서 이 smiles식을 fingerprint로 변환하여 이러한 구조 정보를 컴퓨터가 처리하기 쉬운 이진 벡터형태로 바꿔줘야 한다. 추가로 다양한 크기와 특징들을 지닌 화학 물질들을 일정한 길이의 벡터로 생성되어 통일된 차원의 특성을 가지게 되어 분석에 용이해지기 떄문이다.

train_data = pd.read_csv('train_data.csv', index_col=0)
test_data = pd.read_csv('test_data.csv', index_col=0)
train_data.head(5)

 

Fingerprint 변환

 

fingerprint로 변환하는 함수를 만들어 일괄적으로 변환을 진행하게 설계했다. 총 4개의 fingerprint로 변환을 해야하며 각각 avalon, morgan, topological, atompair이다. 이중 rdkit, chem, Allchem 패키지를 이용해 topological, atompair로 변환하였고 pyAvalonTools로 avalon로 변환하였다.

from rdkit import Chem
from rdkit.Chem import AllChem
from rdkit.Avalon import pyAvalonTools

# fingerprint로 변환 함수 정의
def generate_fingerprints(smiles, fp_type):
    mol = Chem.MolFromSmiles(smiles)
    if fp_type == 'avalon':
        fp = pyAvalonTools.GetAvalonFP(mol)
    elif fp_type == 'morgan':
        fp = AllChem.GetMorganFingerprintAsBitVect(mol, radius=2)
    elif fp_type == 'topological':
        fp = Chem.RDKFingerprint(mol)
    elif fp_type == 'atompair':
        fp = AllChem.GetHashedAtomPairFingerprintAsBitVect(mol)
    return fp.ToBitString() 

# 분자 지문 생성 및 저장
for fp_type in ['avalon', 'morgan', 'topological', 'atompair']:
    train_data[f'fingerprint_{fp_type}'] = train_data['PUBCHEM_EXT_DATASOURCE_SMILES'].apply(lambda x: generate_fingerprints(x, fp_type))
    test_data[f'fingerprint_{fp_type}'] = test_data['PUBCHEM_EXT_DATASOURCE_SMILES'].apply(lambda x: generate_fingerprints(x, fp_type))

# 결과 
print(train_data.head())
print(test_data.head())

 

 

이후 각각 csv파일로 다시 저장해주었다.

train_data.to_csv('train_data.csv')
test_data.to_csv('test_data.csv')

 

 

모델 훈련

 

총 4개의 모델을 사용했으며 randomforest, svr, xgboost, neural network로 진행하였다.

원래는 4개의 모델을 한 셀 안에서 일괄적으로 진행하려 했지만 시도횟수가 10000이나 되다보니 시간을 오래 잡아먹는다는 문제가 있었다. 

이를 해결하기 위해 jupyterlab환경 대신에 안전성평가연구소의 서버를 빌리고 각각 모델을 따로 돌리는 방식으로 진행하였다. 

코드를 살펴보자면 convert_fingerprint는 이전에 fingerprint는 이진 데이터로 변환한 결과를 모델을 통해 학습을 시켜야 하므로 2d array형태로 변환하는 함수이다.

tune_random_forest함수를 통해 parameter를 조정하고 model을 학습시키는 함수이다.

optimize_and_train함수는 학습시킨 결과를 optuna를 통해 최적의 파라미터를 찾은 뒤 이를 pickle파일에 파라미터 형태로 저장하는 함수이다.

최종적으로 각 fingerprint에 대해서 모델을 학습시켜 총 10000 x 4 = 40000번의 시도횟수가 생긴다.

또한 randomforest뿐만 아니라 svr, xgboost, neuralnetwork등도 있어 시간이 꽤 걸리는 작업이었다

.

# fingerprint 변환 함수 정의
def convert_fingerprint(data, fingerprint_type):
    return np.array([list(map(int, list(fp))) for fp in data[fingerprint_type]])

# RandomForest 하이퍼파라미터 튜닝
def tune_random_forest(trial, X_train, y_train):
    param = {
        'n_estimators': trial.suggest_int('n_estimators', 1, 2000),
        'max_depth': trial.suggest_int('max_depth', 1, 30),
        'min_samples_split': trial.suggest_int('min_samples_split', 2, 20),
        'min_samples_leaf': trial.suggest_int('min_samples_leaf', 2, 20),
        'max_features': trial.suggest_categorical('max_features', ['sqrt', 'log2'])
    }
    model = RandomForestRegressor(**param, random_state=42)
    model.fit(X_train, y_train)
    return cross_val_score(model, X_train, y_train, cv=KFold(n_splits=3, shuffle=True), scoring='r2').mean()


# 모델 최적화 및 결과 저장 함수
def optimize_and_train(model_optimize_func, model_name, X_train, y_train, X_test, y_test, fingerprint_type):
    study = optuna.create_study(direction='maximize')
    study.optimize(lambda trial: model_optimize_func(trial, X_train, y_train), n_trials=10000)
    best_params = study.best_trial.params
    best_model = RandomForestRegressor(**best_params)
    best_model.fit(X_train, y_train)
    y_pred = best_model.predict(X_test)
    r2 = r2_score(y_test, y_pred)
    result = {model_name: {'best_params': best_params, 'best_r2': r2}}
    with open(f'{fingerprint_type}_{model_name}.pkl', 'wb') as file:
        pickle.dump(result, file)
    print(f"Fingerprint Type: {fingerprint_type}")
    print(f"Best Hyperparameters: {best_params}")
    print(f"R2 Score on Test Data: {r2}\n")

    # y값 설정
y_train = np.log10(train_data['Dose_MRDD_mmol'].values)
y_test = np.log10(test_data['Dose_MRDD_mmol'].values)

# 각 fingerprint 유형에 대한 모델 최적화 및 훈련
fingerprint_types = ['fingerprint_atompair', 'fingerprint_avalon', 'fingerprint_morgan', 'fingerprint_topological']
model_name = 'RandomForest'

for fingerprint_type in fingerprint_types:
    X_train = convert_fingerprint(train_data, fingerprint_type)
    X_test = convert_fingerprint(test_data, fingerprint_type)
    optimize_and_train(tune_random_forest, model_name, X_train, y_train, X_test, y_test, fingerprint_type)

 

 

해당 사진은 서버에서 10000번 돌리는 작업사진이다.

n_trials=10000

 

 

이렇게 저장된 4개의 파라미터의 피클파일들을 하나로 합치는 과정을 진행하였다.

합친 피클파일을 key : value쌍인 dictionary 형태로 저장하여 key가 fingerprint, value가 모델이 될수 있게 저장하였다.
이렇게 만들어진 각 모델별 파일 중 가장 결정계수가 높은 fingerprint를 사용하여 예측을 수행하도록 했다.

import pickle

# 파일 불러오기 함수
def multiple_load(file_paths):
    results = {}
    for file_path in file_paths:
        with open(file_path, 'rb') as file:
            results[file_path] = pickle.load(file)
    return results

# 경로 리스트
file_paths = ['fingerprint_atompair_RandomForest.pkl', 'fingerprint_avalon_RandomForest.pkl', 'fingerprint_morgan_RandomForest.pkl','fingerprint_topological_RandomForest.pkl']


results = multiple_load(file_paths)

# 결과 출력
for file_path, result in results.items():
    print(f"{file_path}:")
    print(result)
    print("\n")
import joblib

# 파일 딕셔너리
files = {
    "fingerprint_rf_avalon": "fingerprint_avalon_RandomForest.pkl",
    "fingerprint_rf_morgan": "fingerprint_morgan_RandomForest.pkl",
    "fingerprint_rf_topological": "fingerprint_topological_RandomForest.pkl",
    "fingerprint_rf_atompair": "fingerprint_atompair_RandomForest.pkl"
}

# 각 파일 로드 및 딕셔너리에 저장
combined_files = {}
for fingerprint_type, file_path in files.items():
    model = joblib.load(file_path)
    combined_files[fingerprint_type] = model

# 새로운 파일로 합친 모델 저장
combined_file_path = 'dict_rf_model.pkl'
joblib.dump(combined_files, combined_file_path)

combined_file_path
combined_file_path = 'dict_rf_model.pkl'
combined_models = joblib.load(combined_file_path)

# 재학습
trained_models = {}
for model_name, model_param in combined_models.items():
    # best params로 재학습
    model = RandomForestRegressor(**model_param['RandomForest']['best_params'])
    model.fit(X_train, y_train)  
    trained_models[model_name] = model

# 학습된 모델 파일로 저장
trained_models_file_path = 'trained_rf_models.pkl'
joblib.dump(trained_models, trained_models_file_path, protocol=pickle.HIGHEST_PROTOCOL)
# 전체적인 구조 : dict 안에 모델이 들어있는 형식
# 학습된 모델 파일 로드
trained_models = joblib.load('trained_rf_models.pkl')

# data type확인(dict)
trained_model_type = type(trained_models)

# data 내용 확인
trained_models_content = {model_name: type(model) for model_name, model in trained_models.items()}

# 출력
print(trained_model_type)
print(trained_models_content)

 

 

 

SMILES식을 입력 받아 Dose_MRDD_mmol에 대한 예측 값을 알려주는 코드부분이다. SMILES식을 ,로 구분하여 여러 개를 받을 수 있도록 했고 결정계수가 가장 높았던 fingerprint은 avalon 그중 svr모델이었다. 

# 입력받은 smiles식 -> fingerprint로 변환
def smiles_to_fingerprint(smiles, fingerprint_type):
    mol = Chem.MolFromSmiles(smiles)
    if fingerprint_type == "fingerprint_atompair":
        fp = rdMolDescriptors.GetHashedAtomPairFingerprintAsBitVect(mol, nBits=2048)
    elif fingerprint_type == "fingerprint_morgan":
        fp = rdMolDescriptors.GetMorganFingerprintAsBitVect(mol, radius=2, nBits=2048)
    elif fingerprint_type == "fingerprint_avalon":
        fp = pyAvalonTools.GetAvalonFP(mol,nBits=2048)
    elif fingerprint_type == "fingerprint_topological":
        fp = Chem.RDKFingerprint(mol, nBits=2048)
    return np.array(list(map(int, fp)))

# 예측 함수(예측을 위해 2D array로 변환)
def predict_smiles(smiles_list, model, fingerprint_type):
    predictions = []
    for smiles in smiles_list:
        fp = smiles_to_fingerprint(smiles, fingerprint_type)
        prediction = model.predict(fp.reshape(1, -1))
        predictions.append(prediction[0])
    return predictions

# 모델 로드
models_info = joblib.load('trained_svr_models.pkl')

# pkl파일중 결정계수가 제일 높은걸 선택
best_model_name = 'svr_morgan' 
best_model = models_info[best_model_name]
best_fingerprint_type = 'fingerprint_avalon'  

# SMIlES 입력 받기
input_smiles = input("SMILES 입력:")
smiles_list = input_smiles.split(',')

# 예측 
predictions = predict_smiles(smiles_list, best_model, best_fingerprint_type)

# 결과 
for smiles, prediction in zip(smiles_list, predictions):
    print(f"\n입력된 SMILES: {smiles}") 
    print(f"예측된 Dose_MRDD_mmol: {prediction}")
    print(f"사용된 Fingerprint: {best_fingerprint_type}")
    print(f"사용된 모델: {best_model_name}")

예측 결과