관리 메뉴

짜리몽땅 매거진

[금융 AI] AI 기반 금융 사기 탐지(5) - Networkx 라이브러리 기반 커뮤니티 탐지 본문

Data/Financial AI

[금융 AI] AI 기반 금융 사기 탐지(5) - Networkx 라이브러리 기반 커뮤니티 탐지

쿡국 2025. 5. 5. 08:35

이번 실습에서는 이커머스 거래 데이터를 활용하여 커뮤니티 탐지 방법을 사용해 사기 집단을 찾아보자. 온라인 거래에서 발생하는 사기는 종종 조직적으로 이루어지며, 이를 탐지하는 것은 매우 중요하다. 데이터는 Faker 라이브러리를 사용하여 실제 데이터와 유사하게 생성되며, 이 데이터에는 사용자 ID, 거래 시 사용한 IP, 기기 ID, 신용카드 번호, 전화번호 등이 포함된다.

 

이 데이터를 그래프 형태로 변환하여 분석하기 위해 Networkx 라이브러리를 사용한다. 또한 커뮤니티 탐지 알고리즘으로 사용하는 Louvain 알고리즘은 복잡한 네트워크 내에서 밀접하게 연결된 노드 그룹, 즉 커뮤니티를 찾아내는데 특히 유용하다. 이 알고리즘의 주요 특성은 '모듈성'이다. 모듈성은 네트워크가 얼마나 잘 분할되어 있는지 나타내며, 커뮤니티 내 연결은 매우 밀접하고, 커뮤니티 간 연결은 상대적으로 드물 때 높은 값을 갖는다.

 

이를 일상 적인 비유로 설명하자면, 모듈성을 최대화하는 것은 마치 대학 내 다양한 동아리 활동과 같다. 영화 동아리의 학생들은 함께 영화를 보고 토론하는 활동을 자주 하지만, 코딩 동아리의 학생들과는 그런 활동을 함께하지 않는다. Louvain 알고리즘은 이러한 영화 동아리나 코딩 동아리 같은 집단을 네트워크에서 식별해내는 역할을 한다. 각 동아리는 공통된 관심사를 바탕으로 서로 더 많이 연결되어 있으며, 다른 동아리의 멤버들과는 상대적으로 적게 연결되어 있다.

 

그러면 이제 본격적으로 위 라이브러리와 알고리즘을 적용해 실습을 진행해보자.


import pandas as pd
import numpy as np
from faker import Faker
from datetime import datetime, timedelta
import networkx as nx
from community import community_louvain
from itertools import groupby
from concurrent.futures import ThreadPoolExecutor, as_completed
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches

fake = Faker()
start_date = datetime(2024, 5, 1)
num_days = 15
num_users = 3000
transactions_per_user = 5  # 사기 사용자의 최대 거래 횟수

# 사용자 정보 초기화
users = pd.DataFrame({
    'user_id': range(num_users),
    'ip': [fake.ipv4() for _ in range(num_users)],
    'device_id': [fake.uuid4() for _ in range(num_users)],
    'phone_number': [fake.phone_number() for _ in range(num_users)],
    'credit_card': [fake.credit_card_number() for _ in range(num_users)],
})

# 사기 그룹 설정
fraud_group_size = 30
ip_fraud_users = set(np.random.choice(num_users, fraud_group_size, replace=False))
card_fraud_users = set(np.random.choice(num_users, fraud_group_size, replace=False))
device_phone_fraud_users = set(np.random.choice(num_users, fraud_group_size, replace=False))

# Define shared fraud attributes
shared_ips = [fake.ipv4() for _ in range(10)]
shared_cards = [fake.credit_card_number() for _ in range(10)]
shared_devices = [fake.uuid4() for _ in range(10)]
shared_phones = [fake.phone_number() for _ in range(10)]

# 거래 데이터 생성
transactions = []
for user_id in range(num_users):
    num_transactions = np.random.randint(1, transactions_per_user + 1) if user_id in ip_fraud_users or user_id in card_fraud_users or user_id in device_phone_fraud_users else 5
    for _ in range(num_transactions):
        day = np.random.choice(num_days)
        date = start_date + timedelta(days=day)
        ip = shared_ips[np.random.randint(len(shared_ips))] if user_id in ip_fraud_users else users.at[user_id, 'ip']
        card = shared_cards[np.random.randint(len(shared_cards))] if user_id in card_fraud_users else users.at[user_id, 'credit_card']
        device = shared_devices[np.random.randint(len(shared_devices))] if user_id in device_phone_fraud_users else users.at[user_id, 'device_id']
        phone = shared_phones[np.random.randint(len(shared_phones))] if user_id in device_phone_fraud_users else users.at[user_id, 'phone_number']
        is_fraud = user_id in ip_fraud_users or user_id in card_fraud_users or user_id in device_phone_fraud_users
        transactions.append({
            'transaction_id': fake.uuid4(),
            'user_id': user_id,
            'transaction_time': date,
            'ip': ip,
            'device_id': device,
            'phone_number': phone,
            'credit_card_number': card,
            'order_item': np.random.choice(['스마트폰', '노트북', '식품', '문구', '악세서리','가전제품', '의류', '화장품', '아동용품', '여행용품']),
            'amount': np.random.randint(1000, 300000),
            'is_fraud': is_fraud
        })

transactions_df = pd.DataFrame(transactions)

 

필요 라이브러리를 호출한 뒤, 예시 데이터를 생성한다.

 

def build_graph(data):
    G = nx.Graph()
    for _, row in data.iterrows():
        G.add_edge(row['user_id'], row['ip'], weight=1)
        G.add_edge(row['user_id'], row['device_id'], weight=1)
        G.add_edge(row['user_id'], row['credit_card_number'], weight=1)
        G.add_edge(row['user_id'], row['phone_number'], weight=1)
        
        # Set attributes for nodes
        G.nodes[row['user_id']]['node_type'] = 'user_id'
        G.nodes[row['ip']]['node_type'] = 'ip'
        G.nodes[row['device_id']]['node_type'] = 'device_id'
        G.nodes[row['credit_card_number']]['node_type'] = 'credit_card'
        G.nodes[row['phone_number']]['node_type'] = 'phone_number'
    return G

G = build_graph(transactions_df)

 

위 코드는 데이터프레임을 받아 네트워크 그래프를 생성하는 build_graph 함수를 정의하고 있으며, 각 사용자의 식별자, IP 주소, 기기 ID, 신용카드 번호, 전화번호 간의 관계를 노드와 에지로 표현한다. build_graph 함수는 Networkx 라이브러리의 Graph 객체를 사용하여 각 행의 데이터를 노드로 추가하고, 해당 사용자의 다른 속성들과 연결을 생성한다. 또한 각 노드에는 해당하는 속 성 유형을 나타내는 노드 타입(node_type)이 할당된다.

 

프로세스는 커뮤니티 탐지 알고리즘을 적용하기 위한 준비 단계로, 이를 통해 복잡한 데이터 구조를 시각적으로 분석하고 이해하는 도움을 있다. 그래프 생성 이후에는 Louvain 알고리즘 같은 커뮤니티 탐지 기법을 적용하여 데이터 잠재적인 패턴이나 구조를 식별할 있다.

 

partition = community_louvain.best_partition(G)
for node, comm_id in partition.items():
    G.nodes[node]['community'] = comm_id

all_communities = [list(group) for key, group in groupby(sorted(partition, key=partition.get), key=partition.get)]

 

다음은 코드는 Louvain 커뮤니티 탐지 알고리즘을 사용하여 네트워크 그래프 G에서 커뮤니티를 식별하고, 각 노드에 커뮤니티 ID를 할당한다. 이 과정은 다음 단계로 구성된다.

  • 커뮤니티 탐지 실행: community_louvain.best_partition(G) 함수를 사용하여 그래프 G의 노드들을 최적의 커뮤니티로 분할한다. 이 함수는 각 노드에 대해 가장 적합한 커뮤니티를 찾아내고, 그 결과를 partition 딕셔너리로 반환한다. partition 딕셔너리의 키는 노드 ID 이며, 값은 해당 노드가 속한 커뮤니티의 ID다.
  • 커뮤니티 ID 할당: 반환된 partition 딕셔너리를 순회하며 각 노드에 커뮤니티 ID를 그래 프의 노드 속성으로 저장한다. 이는 G.nodes[node]['community'] = comm_id를 통해 이루 어진다. 이렇게 함으로써 각 노드가 어떤 커뮤니티에 속하는지 쉽게 식별할 수 있다.
  • 커뮤니티별 노드 그룹화: itertools.groupby 함수를 사용하여 partition 딕셔너리를 커뮤 니티 ID에 따라 그룹화한다. 이 과정에서 Sorted(partition, Key=partition.get)을 통해 커뮤니티 ID에 따라 정렬된 노드 리스트를 생성하고, groupby로 이를 커뮤니티별로 그룹화 한다. 각 그룹은 해당 커뮤니티에 속하는 노드의 리스트가 되며, all.communities 리스트에 저장된다.

이렇게 하면 모든 노드가 어떤 커뮤니티에 속하는지를 나타내는 명확한 구조를 갖춘 커뮤니티별 노드 리스트를 얻을 있다. 네트워크 내의 사회적, 구조적 집단을 이해하고, 특정 패턴이나 행동을 분석하는 사용할 있다.

 

def analyze_community(G, nodes):
    subgraph = G.subgraph(nodes)
    comm_density = nx.density(subgraph)
    modularity_score = community_louvain.modularity(partition, G)
    user_count = sum(1 for node in nodes if G.nodes[node]['node_type'] == 'user_id')
    phone_count = sum(1 for node in nodes if G.nodes[node]['node_type'] == 'phone_number')
    card_count = sum(1 for node in nodes if G.nodes[node]['node_type'] == 'credit_card')
    return [comm_density, modularity_score, len(nodes), user_count, phone_count, card_count]

def analyze_communities_parallel(G, all_communities):
    community_info = []
    with ThreadPoolExecutor() as executor:
        futures = {executor.submit(analyze_community, G, nodes): comm_id for comm_id, nodes in enumerate(all_communities)}
        for future in as_completed(futures):
            comm_id = futures[future]
            result = future.result()
            community_info.append([comm_id] + result)
    return pd.DataFrame(community_info, columns=['Community ID', 'Density', 'Modularity', 'Size', 'User Count', 'Phone Count', 'Card Count'])

df_community = analyze_communities_parallel(G, all_communities)

df_community

 

analyze_community 함수는 주어진 커뮤니티의 노드들을 이용해 서브 그래프를 생성하고, 이 서브 그래프의 밀도와 모듈성을 계산한다. 밀도는 커뮤니티 내의 노드들이 얼마나 밀전하게 연결되어 있는지를 보여주며, 모듈성은 커뮤니티 구분의 질을 평가하는 데 사용된다. 이 함수는 또한 커뮤니티 내의 사용자 ID, 전화번호, 신용카드 번호 노드 수를 계산하여 커뮤니티의 구성을 분석한다. 각 커뮤니티의 특성을 리스트로 반환하고, 이는 분석 결과에 중요한 정보를 제공한다.

analyze_communities_parallel 함수는 커뮤니티 분석을 병렬로 실행하여 전체적인 처리 속도를 향상시킨다. 파이썬의 ThreadPoolExecutor를 사용해 각 커뮤니티 분석 작업을 동시에 처리하는데, 이 과정은 커뮤니티의 수가 많을 때 특히 유용하다. 모든 작업이 완료되면, 결과를 종합하여 각 커뮤니티의 밀도, 모듈성, 크기, 노드 유형별 수를 포함하는 데이터프레임을 생성한다. 이 데이터프레임은 커뮤니티의 구조적 특성을 파악하는 데 도움을 준다.

함수는 네트워크 내의 커뮤니티 구조를 분석하고 이해하는데 중요한 도구로, 특히 크고 복잡한 데이터 세트를 다룰 효과적이다. 커뮤니티의 세부적인 분석을 통해 네트워크의 동적인 특성을 파악할 있으며, 정보는 다양한 응용 분야에서 의사결정을 지원하는 사용할 있다.

 

import matplotlib.pyplot as plt

# 가장 큰 세 개의 커뮤니티 선택
top_communities = sorted(all_communities, key=len, reverse=True) [: 3]
filtered_nodes = [node for comm in top_communities for node in comm]
G_top = G. subgraph(filtered_nodes)

# 시각화
plt. figure(figsize=(12, 8))
pos = nx.spring_layout(G_top)# 노드 위치를 계산

for comm_id, nodes in enumerate(top_communities) :
    nx.draw_networkx_nodes(G_top, pos, nodelist=nodes, node_size=50,node_color=plt. cm.jet(comm_id / len(top_communities)))

nx. draw_networkx_edges(G_top, pos, alpha=0.5, edge_color='gray')
plt.title("Top 3 Largest Communities Visualization")
plt. show()

커뮤니티 데이터프레임에 산출된 커뮤니티에 기반해 가장 큰 세 개의 커뮤니티를 시각화해 살펴보자. 먼저, 전체 커뮤니티 리스트 all_communities에서 가장 큰 세 커뮤니티를 크기 순으로 정렬하여 선택한다. 이를 위해 커뮤니티의 길이를 기준으로 내림차순 정렬하고, 가장 큰 세 커뮤니티를 추출한다.

선택된 커뮤니티의 노드들만 포함하는 서브 그래프 G_top 생성한다. 서브 그래프를 이용해 Networkx 시각화 기능을 사용하여 커뮤니티의 노드를 서로 다른 색상으로 그린다. 색상은 plt.cm.jet 컬러맵을 사용하여 커뮤니티 ID 따라 다르게 표시하고, 커뮤니티의 노드는 그래프 상에서 서로 연결된 상태를 보여주는 에지로 표현한다. 시각화는 커뮤니티가 그래프 내에 어떻게 위치하고 있는지, 커뮤니티 간의 연결 상태를 직관적으로 이해하는 도움을 준다.

 

# 가장 큰 커뮤니티 추출
largest_community = max(all_communities, key=len)
G_largest = G. subgraph(largest_community)

plt. figure(figsize=(12, 8))
pos = nx.spring_layout(G_largest) # 노드의 위치 계산

# 노드 유형별로 색상 지정
node_types = {
    'user_id': 'red',
    'ip': 'blue',
    'device_id': 'green',
    'phone_number': 'yellow',
    'credit_card': 'purple'
}

# 노드 그리기
for node_type, color in node_types.items() : 
    nx.draw_networkx_nodes(G_largest, pos, nodelist=[n for n in G_largest if G_largest.nodes[n]['node_type'] == node_type], node_color=color, node_size=70, label=node_type)

nx. draw_networkx_edges(G_largest, pos, alpha=0.5)

# 범례 추가
patches = [mpatches.Patch(color=color, label=node_type) for node_type, color in node_types.items()]

plt. legend(handles=patches, title="Node Types", loc='upper left')
plt. title("the largest community group")
plt.axis('off')
plt.show()

위 코드는 네트워크 내에서 가장 큰 커뮤니티를 추출하고 시각화하는 과정이다. 먼저 all_communities 리스트에서 가장 큰 커뮤니티를 largest_community로 선택하고, 이 커뮤니티를 기반으로 서브 그래프 G_largest를 생성한다. 이 서브 그래프는 해당 커뮤니티의 노드만 포함하므로 커뮤니티의 내부 구조를 집중적으로 분석할 수 있다.