[ML] 유저이탈 예측하기
앱서비스를 운영할 때 유저는 피로를 느끼고 중도 이탈을 하거나 앱에 오랜기간 접속을 하지 않는 등 다양한 단계에서 이탈 유저가 발생한다. 사전에 이탈 유저를 예측하고, 어떤 원인으로 인해 이탈했는지 파악한다면 기업 입장에서 리스크를 최소화할 수 있다.
그래서 오늘은 머신러닝 모델을 활용해 이탈 유저와 일반 유저를 분류해보는 예측 모형을 개발해보고자 한다. 데이터셋은 유저 및 기능별 로그 데이터를 전처리한 데이터셋을 활용하였다.
* 실제 기업 데이터로, 수치는 비공개 처리합니다.
1. 라이브러리 호출 및 데이터 불러오기
import pandas as pd
import numpy as np
from datetime import datetime
from sklearn.preprocessing import LabelEncoder, MinMaxScaler
result = pd.read_csv('유저 이탈 모형 데이터.csv')
각 유저별 기능 내 활동 수치 데이터와 RFM Score 변수까지 병합한 데이터셋을 준비한다.
2. 변수 특징 별 인코딩 및 스케일링 진행
# userId, createdAt 제외
excluded_cols = ["userId", "createdAt"]
# 레이블 인코딩 대상
label_encoding_cols = ["r", "f", "m", "gender", "age"]
# Min-Max 스케일링 대상
scaling_cols = [col for col in result.columns if col not in excluded_cols + label_encoding_cols]
# 레이블 인코딩 수행 (1부터 시작)
label_encoder = LabelEncoder()
for col in label_encoding_cols:
result[col] = label_encoder.fit_transform(result[col]) + 1 # 1부터 시작하도록 변환
# Min-Max 스케일링 수행
scaler = MinMaxScaler()
result[scaling_cols] = scaler.fit_transform(result[scaling_cols])
result
RFM 변수나 성별, 연령대 등의 변수에 대해서는 인코딩을 진행하고, 나머지 연속형 변수에 대해서는 스케일링을 진행했다.
3. 타겟 변수 정의
# userId와 createdAt 기준으로 정렬
result = result.sort_values(by=["userId", "createdAt"])
# rolling을 사용해 마지막 30일간의 합계를 계산
result['last_30days_project_sum'] = result.groupby('userId')['project_daily_count'].transform(
lambda x: x.rolling(window=30, min_periods=1).sum()
)
result['last_30days_vote_sum'] = result.groupby('userId')['vote_daily_count'].transform(
lambda x: x.rolling(window=30, min_periods=1).sum()
)
# 두 활동량이 모두 0인지 확인
result['churn'] = (result['last_30days_project_sum'] == 0) & (result['last_30days_vote_sum'] == 0)
# churn 칼럼을 0과 1로 변환
result['churn'] = result['churn'].astype(int)
result[['userId', 'createdAt', 'last_30days_project_sum', 'last_30days_vote_sum', 'churn']]
유저이탈 예측을 진행할 땐 타겟변수인 churn 변수를 생성하는 과정이 필요하다. 이번 실습의 경우 3달간 프로젝트 기능과 커뮤니티 기능 참여 횟수가 모두 0인 유저를 이탈유저로 정의하였다.
4. 파생 변수 생성
# 활동 변화율
result['activity_change_rate'] = result['project_daily_count'] / (result['project_total_count'] + 1)
# 최근 활동 빈도
result['recent_30days_avg_project'] = result.groupby('userId')['project_daily_count'].transform(
lambda x: x.rolling(window=30, min_periods=1).mean()
)
# 누적 소비 비율
result['credit_ratio'] = result['total_credit_count'] / (result['total_scale_sum'] + 1)
result
이후 변수 간 계산으로 파생 변수를 생성해, 예측의 정확도를 높이고자 하였다.
5. 피처 엔지니어링
# 연속형 변수만 선택 (userId, createdAt, gender 등 제외)
continuous_cols = [
'credit_scale', 'age', 'project_consumedTime', 'project_daily_count',
'project_total_count', 'project_total_consumedTime', 'participateOnHotVote',
'vote_daily_count', 'vote_total_count', 'daily_participateOnHotVote',
'total_participateOnHotVote', 'fortune_daily_count', 'fortune_total_count',
'daily_noti_count', 'total_noti_count', 'daily_creit_count', 'total_credit_count',
'daily_scale_sum', 'total_scale_sum', 'last_30days_project_sum', 'last_30days_vote_sum',
'activity_change_rate', 'recent_30days_avg_project', 'credit_ratio'
]
# 연속형 변수만 추출
continuous_data = result[continuous_cols]
# 상관관계 행렬 계산
correlation_matrix = continuous_data.corr()
# 상관계수 절대값 기준으로 높은 상관관계 필터링 (예: 0.8 이상)
threshold = 0.8
high_correlation = (correlation_matrix.abs() >= threshold) & (correlation_matrix != 1)
# 상관관계가 높은 변수들 출력
correlated_features = {}
for col in high_correlation.columns:
high_corr_vars = list(high_correlation.index[high_correlation[col]])
if high_corr_vars:
correlated_features[col] = high_corr_vars
print("상관관계 높은 변수 그룹:")
for key, value in correlated_features.items():
print(f"{key}: {value}")
# 상관관계가 높은 변수 중 하나만 선택
# 선택 기준: 첫 번째 변수만 유지
selected_features = set()
for key, value in correlated_features.items():
if key not in selected_features:
selected_features.add(key)
# 최종 선택된 변수들
selected_data = continuous_data[list(selected_features)]
print("최종 선택된 변수들:")
print(selected_data.columns)
이후 지금까지의 변수들 간 상관분석을 진행해 상관관계가 높은 변수들만 남겨 예측의 정확도를 높이고자 하였다.
6. 이탈예측 모델링 진행(+교차검증, 하아퍼파라미터 튜닝, 성능 계산)
from sklearn.model_selection import train_test_split
# 독립 변수(X)와 종속 변수(y) 분리
X = final_dataset_with_non_continuous.drop(['churn', 'userId', 'createdAt'], axis=1) # 타겟 및 식별 변수 제거
y = final_dataset_with_non_continuous['churn'] # 타겟 변수
# 학습용과 테스트용 데이터 분리 (70% 학습, 30% 테스트)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42, stratify=y)
from sklearn.model_selection import GridSearchCV
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from xgboost import XGBClassifier
from lightgbm import LGBMClassifier
# 모델별 하이퍼파라미터 설정
param_grids = {
"Logistic Regression": {
'C': [0.01, 0.1, 1, 10, 100], # Regularization strength
'solver': ['liblinear', 'lbfgs'] # Optimization algorithm
},
"Random Forest": {
'n_estimators': [100, 200, 300], # Number of trees
'max_depth': [10, 20, None], # Maximum depth of trees
'min_samples_split': [2, 5, 10] # Minimum samples required to split an internal node
},
"XGBoost": {
'n_estimators': [100, 200, 300], # Number of trees
'max_depth': [3, 6, 10], # Maximum depth of trees
'learning_rate': [0.01, 0.1, 0.2] # Step size shrinkage
},
"LightGBM": {
'n_estimators': [100, 200, 300], # Number of boosting rounds
'max_depth': [10, 20, -1], # Maximum depth of trees (-1 means no limit)
'learning_rate': [0.01, 0.1, 0.2] # Step size shrinkage
}
}
# 모델 초기화
models = {
"Logistic Regression": LogisticRegression(random_state=42, max_iter=500),
"Random Forest": RandomForestClassifier(random_state=42),
"XGBoost": XGBClassifier(random_state=42, use_label_encoder=False, eval_metric='logloss'),
"LightGBM": LGBMClassifier(random_state=42)
}
# 결과 저장 딕셔너리
best_models = {}
# 각 모델에 대해 GridSearchCV 실행
for name, model in models.items():
print(f"=== {name} GridSearchCV ===")
# GridSearchCV 초기화
grid_search = GridSearchCV(
estimator=model,
param_grid=param_grids[name],
cv=3, # 3-Fold Cross Validation
scoring='roc_auc',
n_jobs=-1
)
# 하이퍼파라미터 튜닝 수행
grid_search.fit(X_train_scaled, y_train)
# 최적의 하이퍼파라미터와 성능 저장
best_models[name] = {
"best_estimator": grid_search.best_estimator_,
"best_params": grid_search.best_params_,
"best_score": grid_search.best_score_
}
# 결과 출력
print(f"Best Parameters for {name}: {grid_search.best_params_}")
print(f"Best AUC-ROC Score: {grid_search.best_score_:.4f}\n")
모델링 및 하이퍼파라미터 튜닝 결과 Random Forest 분류 모델의 AUC-ROC Score가 가장 높게 나와 해당 모델을 최종 모델로 선정하였다.
7. 변수중요도 출력
# 최적화된 Random Forest 모델 가져오기
best_rf_model = best_models["Random Forest"]["best_estimator"]
# 최종 데이터셋에 예측 결과 칼럼 추가
result['predicted_churn'] = best_rf_model.predict(X)
# 변수 중요도 계산
feature_importances = best_rf_model.feature_importances_
features = X.columns
# 변수 중요도를 데이터프레임으로 정리
importance_df = pd.DataFrame({
'Feature': features,
'Importance': feature_importances
}).sort_values(by='Importance', ascending=False)
# 변수 중요도 시각화
import matplotlib.pyplot as plt
plt.figure(figsize=(10, 6))
plt.barh(importance_df['Feature'], importance_df['Importance'])
plt.gca().invert_yaxis() # 상위 변수부터 보기 위해 y축 반전
plt.title('Feature Importance')
plt.xlabel('Importance')
plt.ylabel('Features')
plt.show()
변수중요도를 파악해 이탈유저 분류와 예측에 어떤 기능이 가장 큰 영향을 미치고 있는지 확인해 추후 전략의 근거로 반영할 수 있다.