반응형
velog 마이그레이션
원글 : [Airflow] 8. Airflow plugin 생성하기(2022년 9월 11일)
airflow 2.0에서는 plugin system에 operator 를 추가해야한다.
존재하는 operator를 확장, 수정해서
Airflow 장점이 operator, views, hooks 등 모든 것을 커스텀할 수 있다는 점이다.
생성 방법
AirflowPlugin 클래스를상속하는 View, Operator,hook... 클래스를 생성한다
plugin 이름 등 속성을 설정한다
생성 후 Lazy Loaded 이기때문에 airflow 인스턴스를 재시작해야한다.
- web애서 elastic connection 생성
- plugins/hooks/elastic/elastic_hook.py 생성
from airflow.plugins_manager import AirflowPlugin
# 모든 hook은 BaseHook 상속해서 메서드, property 등
from airflow.hooks.base import BaseHook
from elasticsearch import Elasticsearch
class ElasticHook(BaseHook):
def __init__(self, conn_id='elastic_default', *args, **kwargs):
super().__init__(*args, **kwargs)
conn = self.get_connection(conn_id)
conn_config = {}
hosts = []
if conn.host:
hosts = conn.host.split(',')
if conn.port:
conn_config['port'] = int(conn.port)
if conn.login:
conn_config['http_auth'] = (conn.login, conn.password)
self.es = Elasticsearch(hosts, **conn_config)
self.index = conn.schema
def info(self):
return self.es.info()
def set_index(self, index):
self.index = index
def add_doc(self, index, doc_type, doc):
self.set_index(index)
res = self.es.index(index=index, doc_type=doc_type, doc=doc)
return res
class AirflowElasticPlugin(AirflowPlugin):
name = 'elastic'
hooks = [ElasticHook]
728x90
반응형
'Data > Data Engineering' 카테고리의 다른 글
[Kafka] Kafka-python 설치 (0) | 2024.03.06 |
---|---|
[Airflow] Airflow 설치하기(pip) (0) | 2024.03.06 |
[Airflow] 6-2. SubDagOperator -> TaskGroup 변환하기 (ImportError : SubDagOperator ) (0) | 2024.03.06 |
[Airflow] 6-1. TaskGroup으로 Task 그룹화하기 (0) | 2024.03.06 |
[Airflow] 5-3. 데이터 파이프라인 구성하기 : Executor (2) | 2024.03.06 |