앞전에 작성했던 부분에 추가할 부분이 현재는 dag를 하나로 돌린다.
하지만 gmarket 페이지의 전체 베스트상품들 카테고리를 보면 2,30개 정도가 된다. 이 dag들을 한개식 잡으면 dag관리가 안될것 같아, 큰 카테고리씩 dag로 잡아서 진행
1. 전체 : all.dag
2. 신선식품: fresh.dag
3. 가공식품: frozen.dag
...
전체 카테고리제외 하위 카테고리들 존재 => dummyoperator 활용 하나의 dag에 여러개의 분기로 그룹화 실행되게 적용.
def category(group_name, sub_group_name, **context):
...
...
ti = context['ti']
# 한 dag에 여러개의 태스크를 돌려서, xcom 변수를 다르게 설정하기 위해 고유 task_id 변수 사용
category_task_id = ti.task_id
ti.xcom_push(key=f'url_{category_task_id}', value=url)
# 카테고리별 저장경로
ti.xcom_push(key=f'group_name_{category_task_id}', value=group_name)
ti.xcom_push(key=f'sub_group_name_{category_task_id}', value=sub_group_name)
def gmarket_crawl(category_task_id, **context):
if data:
try:
# 크롤링 한 데이터 xcom push 큰 데이터들 push 할때 주의
ti.xcom_push(key=f'crawl_data_{category_task_id}', value=data)
return "SUCCESS"
except Exception as e:
logging.error(f'크롤링 실패: {e}')
raise
else:
logging.error(f'크롤링할 데이터가 없습니다.')
raise ValueError("크롤링 결과가 없습니다.")
def hadop_store(category_task_id, crawling_task_id, **context): # 2개의 태스크에서 xcom 변수 받기 위한 taxk id
ti = context['ti']
# gmarket_crawling 태스크 xcom 변수
data = ti.xcom_pull(key=f'crawl_data_{category_task_id}', task_ids=crawling_task_id)
year = ti.xcom_pull(key=f'year_{category_task_id}', task_ids=crawling_task_id)
month = ti.xcom_pull(key=f'month_{category_task_id}', task_ids=crawling_task_id)
day = ti.xcom_pull(key=f'day_{category_task_id}', task_ids=crawling_task_id)
hour = ti.xcom_pull(key=f'hour_{category_task_id}', task_ids=crawling_task_id)
minute = ti.xcom_pull(key=f'minute_{category_task_id}', task_ids=crawling_task_id)
# category 태스크 xcom 변수
group_name = ti.xcom_pull(key=f'group_name_{category_task_id}', task_ids=category_task_id)
sub_group_name = ti.xcom_pull(key=f'sub_group_name_{category_task_id}', task_ids=category_task_id)
df = pd.DataFrame(data, columns=['rank', 'name', 'original_price', 'sale_price'])
table = pa.Table.from_pandas(df)
buffer = BytesIO()
pq.write_table(table, buffer, compression='snappy')
buffer.seek(0)
hdfs_host = 'http://192.168.56.114:9870'
hdfs_user = 'cha'
client_hdfs = InsecureClient(hdfs_host, user=hdfs_user)
# hadoop 저장 경로
hdfs_path = f'/gmarket/{group_name}/{sub_group_name}/{year}/{month}/{day}/{hour}/{minute}/{sub_group_name}.snappy.parquet'
with client_hdfs.write(hdfs_path, overwrite=True) as writer:
writer.write(buffer.getvalue())
각 카테고리 별로 xcom 변수를 주기 위하여 수정 하였습니다.
이렇게 진행한 이유는 카테고리 별로 url 주소(?groupCode={groupcategory}&subGroupCode={subcategory})가 다르기 때문에, 각각의 주소에 맞는 url 접속, data를 hadoop에 저장하기 위해서 진행하였습니다.
그리고 xcom에서 변수를 던져줄때, 기존에는 task_id하나에만 적용하면 됫지만, 지금은 분기되서 실행되게 만들거기 때문에 xcom값을 어떤 task에서 값을 받는지 명확하게 분리하기 위해서 변수로 task_id를 줘서 고유하게 진행하였습니다.
위처럼 진행 하고 airflow dag 하위에 pythonoperator를 사용하여 분기 그래프를 작성
start_task = DummyOperator(task_id='start_task', dag=dag)
end_task = DummyOperator(task_id='end_task', dag=dag)
return_xcom_1 = PythonOperator(
task_id = 'xcom_url_1',
python_callable = category,
op_args=['fresh', 'all'],
dag = dag
)
return_xcom_2 = PythonOperator(
task_id = 'xcom_url_2',
python_callable = category,
op_args=['fresh', 'fruit_vegetable'],
dag = dag
)
gmarket_crawling_1 = PythonOperator(
task_id = 'gmarket_crawling_1',
python_callable = gmarket_crawl,
# xcom 변수 받기 위해 이전 태스크 id 넘기기
op_kwargs={'category_task_id': 'xcom_url_1'},
dag = dag
)
gmarket_crawling_2 = PythonOperator(
task_id = 'gmarket_crawling_2',
python_callable = gmarket_crawl,
# xcom 변수 받기 위해 이전 태스크 id 넘기기
op_kwargs={'category_task_id': 'xcom_url_2'},
dag = dag
)
store_data_1 = PythonOperator(
task_id = 'store_data_1',
python_callable = hadop_store,
op_kwargs={'category_task_id': 'xcom_url_1',
'crawling_task_id': 'gmarket_crawling_1'},
dag = dag
)
store_data_2 = PythonOperator(
task_id = 'store_data_2',
python_callable = hadop_store,
op_kwargs={'category_task_id': 'xcom_url_2',
'crawling_task_id': 'gmarket_crawling_2'},
dag = dag
)
start_task >> return_xcom_1 >> gmarket_crawling_1 >> store_data_1 >> end_task
start_task >> return_xcom_2 >> gmarket_crawling_2 >> store_data_2 >> end_task
하나의 groupcategory에 여러개의 subgroupcategory를 돌리기 위하여 dummyoperator둬서 그룹화 진행
이렇게 각각의 카테고리별로 수집하여 적재가 완료하는 dag graph가 생성됩니다.
그런데..... operator로 직접 하나씩 작성하다 보니 너무 귀찮고, 코드도 너무 길어질꺼 같아서 깔끔하게 작성하며 변수만 지정해주면 알아서 진행하게 되었습니다.
# 크롤링 별 카테고리 정보 리스트
categories = [
("fresh_all_url", "fresh_all_crawling", "fresh_all_store_data", "fresh", "all"),
("fresh_frult_url", "fresh_frult_crawling", "fresh_frult_store_data", "fresh", "fruit_vegetable"),
("fresh_rice_url", "fresh_rice_crawling", "fresh_rice_store_data", "fresh", "rice_grains_nuts"),
("fresh_meat_url", "fresh_meat_crawling", "fresh_meat_store_data", "fresh", "meat"),
("fresh_seafood_url", "fresh_seafood_crawling", "fresh_seafood_store_data", "fresh", "seafood"),
("fresh_kimchi_url", "fresh_kimchi_crawling", "fresh_kimchi_store_data", "fresh", "kimchi_side"),
]
start_task = DummyOperator(task_id='start_task', dag=dag)
end_task = DummyOperator(task_id='end_task', dag=dag)
# 카테고리별 작업 정의, 태스크 연결
for xcom_id, crawl_id, store_id, group, subgroup in categories:
return_xcom = PythonOperator(
task_id=xcom_id,
python_callable=category,
op_args=[group, subgroup],
dag=dag
)
crawling = PythonOperator(
task_id=crawl_id,
python_callable=gmarket_crawl,
op_kwargs={'category_task_id': xcom_id},
dag=dag
)
store = PythonOperator(
task_id=store_id,
python_callable=hadop_store,
op_kwargs={
'category_task_id': xcom_id,
'crawling_task_id': crawl_id
},
dag=dag
)
# 태스크 간 의존성 연결
start_task >> return_xcom >> crawling >> store >> end_task
이렇게 진행하고 나니 코드도 깔끔해지고, 태스크도 문제 없이 잘돌았습니다!
끝인줄 알았지만.... 또 수정하고 싶은게 보였습니다.
task 그래프는 쪼금 지저분(?)하지만, 그래도 코드단에서는 깔끔하게 만들었지만,
1. 그룹카테고리별, 서브그룹카테고리별로 수집할게 많은데 categories 리스트에 수기로 다 작성하는게 비효율적이다....
2. 만약 새로운 카테고리가 생긴다면? 또 수정해줘야된다.
그래서 이런 문제가 생길것 같아, 이 부분도 주기적으로 수집해주는 dag를 하나 작성해야 되겠다고 생각했습니다.
이 부분은 적용하여 다음 블로그에 작성하겠습니다.
감사합니다.
'side-project' 카테고리의 다른 글
Gmarket 데이터 크롤링 ETL(1) - airflow, hadoop (0) | 2025.04.10 |
---|---|
Side-Project-kaggle data ETL(2) sink-connector (0) | 2025.04.10 |
Side-Project-kaggle data ETL(1) source-connector (0) | 2025.04.10 |