Airflow에서 동적 워크 플로를 만드는 적절한 방법
문제
Airflow에서 작업 A가 완료 될 때까지 작업 B. *의 수를 알 수없는 워크 플로를 만들 수있는 방법이 있습니까? 나는 subdag를 보았지만 Dag 생성시 결정되어야하는 정적 작업 집합에서만 작동 할 수있는 것처럼 보입니다.
dag 트리거가 작동합니까? 그렇다면 예를 들어주세요.
작업 A가 완료 될 때까지 작업 C를 계산하는 데 필요한 작업 B의 수를 알 수없는 문제가 있습니다. 각 태스크 B. *는 계산하는 데 몇 시간이 걸리며 결합 할 수 없습니다.
|---> Task B.1 --|
|---> Task B.2 --|
Task A ------|---> Task B.3 --|-----> Task C
| .... |
|---> Task B.N --|
아이디어 # 1
차단 ExternalTaskSensor를 만들어야하고 모든 작업 B. *를 완료하는 데 2-24 시간이 걸리기 때문에이 솔루션이 마음에 들지 않습니다. 그래서 저는 이것이 실행 가능한 해결책이라고 생각하지 않습니다. 확실히 더 쉬운 방법이 있습니까? 아니면 Airflow가이를 위해 설계되지 않았습니까?
Dag 1
Task A -> TriggerDagRunOperator(Dag 2) -> ExternalTaskSensor(Dag 2, Task Dummy B) -> Task C
Dag 2 (Dynamically created DAG though python_callable in TriggerDagrunOperator)
|-- Task B.1 --|
|-- Task B.2 --|
Task Dummy A --|-- Task B.3 --|-----> Task Dummy B
| .... |
|-- Task B.N --|
편집 1 :
현재로서는이 질문에 대한 좋은 답변이 없습니다 . 나는 해결책을 찾고있는 몇몇 사람들로부터 연락을 받았습니다.
다음은 subdag없이 유사한 요청으로 수행 한 방법입니다.
먼저 원하는 값을 반환하는 메서드를 만듭니다.
def values_function():
return values
다음으로 작업을 동적으로 생성하는 메서드를 만듭니다.
def group(number, **kwargs):
#load the values if needed in the command you plan to execute
dyn_value = "{{ task_instance.xcom_pull(task_ids='push_func') }}"
return BashOperator(
task_id='JOB_NAME_{}'.format(number),
bash_command='script.sh {} {}'.format(dyn_value, number),
dag=dag)
그런 다음 결합하십시오.
push_func = PythonOperator(
task_id='push_func',
provide_context=True,
python_callable=values_function,
dag=dag)
complete = DummyOperator(
task_id='All_jobs_completed',
dag=dag)
for i in values_function():
push_func >> group(i) >> complete
이전 작업의 결과를 기반으로 워크 플로를 만드는 방법을 알아 냈습니다.
기본적으로 다음과 같은 두 개의 서브 데 그가 있습니다.
- Xcom은 먼저 실행되는 subdag에서 목록 (또는 나중에 동적 워크 플로를 만드는 데 필요한 항목)을 푸시합니다 (test1.py 참조
def return_list()). - 두 번째 subdag에 매개 변수로 기본 dag 개체를 전달하십시오.
- 이제 기본 dag 개체가있는 경우이를 사용하여 작업 인스턴스 목록을 가져올 수 있습니다. 작업 인스턴스 목록에서
parent_dag.get_task_instances(settings.Session, start_date=parent_dag.get_active_runs()[-1])[-1]) 를 사용하여 현재 실행중인 작업을 필터링 할 수 있으며 여기에 필터를 더 추가 할 수 있습니다. - 해당 작업 인스턴스를 사용하면 xcom pull을 사용하여 dag_id를 첫 번째 subdag 중 하나에 지정하여 필요한 값을 얻을 수 있습니다.
dag_id='%s.%s' % (parent_dag_name, 'test1') - 목록 / 값을 사용하여 작업을 동적으로 생성
이제 로컬 공기 흐름 설치에서 이것을 테스트했으며 제대로 작동합니다. 동시에 실행중인 dag의 인스턴스가 두 개 이상인 경우 xcom pull 부분에 문제가 있는지는 모르겠지만 xcom을 고유하게 식별하기 위해 고유 한 키 또는 이와 유사한 것을 사용할 수 있습니다. 당신이 원하는 가치. 현재 메인 dag의 특정 작업을 100 % 확실하게 얻기 위해 3. 단계를 최적화 할 수 있지만, 내 사용을 위해서는 충분히 잘 수행되므로 xcom_pull을 사용하려면 하나의 task_instance 개체 만 필요하다고 생각합니다.
또한 실수로 잘못된 값을 얻지 않도록 모든 실행 전에 첫 번째 subdag에 대한 xcom을 정리합니다.
나는 설명을 잘 못하기 때문에 다음 코드가 모든 것을 명확하게 해주길 바랍니다.
test1.py
from airflow.models import DAG
import logging
from airflow.operators.python_operator import PythonOperator
from airflow.operators.postgres_operator import PostgresOperator
log = logging.getLogger(__name__)
def test1(parent_dag_name, start_date, schedule_interval):
dag = DAG(
'%s.test1' % parent_dag_name,
schedule_interval=schedule_interval,
start_date=start_date,
)
def return_list():
return ['test1', 'test2']
list_extract_folder = PythonOperator(
task_id='list',
dag=dag,
python_callable=return_list
)
clean_xcoms = PostgresOperator(
task_id='clean_xcoms',
postgres_conn_id='airflow_db',
sql="delete from xcom where dag_id='{{ dag.dag_id }}'",
dag=dag)
clean_xcoms >> list_extract_folder
return dag
test2.py
from airflow.models import DAG, settings
import logging
from airflow.operators.dummy_operator import DummyOperator
log = logging.getLogger(__name__)
def test2(parent_dag_name, start_date, schedule_interval, parent_dag=None):
dag = DAG(
'%s.test2' % parent_dag_name,
schedule_interval=schedule_interval,
start_date=start_date
)
if len(parent_dag.get_active_runs()) > 0:
test_list = parent_dag.get_task_instances(settings.Session, start_date=parent_dag.get_active_runs()[-1])[-1].xcom_pull(
dag_id='%s.%s' % (parent_dag_name, 'test1'),
task_ids='list')
if test_list:
for i in test_list:
test = DummyOperator(
task_id=i,
dag=dag
)
return dag
및 주요 워크 플로 :
test.py
from datetime import datetime
from airflow import DAG
from airflow.operators.subdag_operator import SubDagOperator
from subdags.test1 import test1
from subdags.test2 import test2
DAG_NAME = 'test-dag'
dag = DAG(DAG_NAME,
description='Test workflow',
catchup=False,
schedule_interval='0 0 * * *',
start_date=datetime(2018, 8, 24))
test1 = SubDagOperator(
subdag=test1(DAG_NAME,
dag.start_date,
dag.schedule_interval),
task_id='test1',
dag=dag
)
test2 = SubDagOperator(
subdag=test2(DAG_NAME,
dag.start_date,
dag.schedule_interval,
parent_dag=dag),
task_id='test2',
dag=dag
)
test1 >> test2
OA : "Airflow에서 작업 A가 완료 될 때까지 작업 B. *의 수를 알 수없는 워크 플로를 만들 수있는 방법이 있습니까?"
짧은 대답은 아니오입니다. Airflow는 실행을 시작하기 전에 DAG 흐름을 빌드합니다.
그것은 우리가 그런 필요가 없다는 간단한 결론에 도달했다고 말했습니다. 일부 작업을 병렬화하려면 처리 할 항목 수가 아니라 사용 가능한 리소스를 평가해야합니다.
우리는 이렇게했습니다. 작업을 분할 할 고정 된 수의 작업 (예 : 10 개)을 동적으로 생성합니다. 예를 들어 100 개의 파일을 처리해야하는 경우 각 작업은 파일 중 10 개를 처리합니다. 오늘 나중에 코드를 게시하겠습니다.
최신 정보
다음은 코드입니다. 지연되어 죄송합니다.
from datetime import datetime, timedelta
import airflow
from airflow.operators.dummy_operator import DummyOperator
args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 1, 8),
'email': ['myemail@gmail.com'],
'email_on_failure': True,
'email_on_retry': True,
'retries': 1,
'retry_delay': timedelta(seconds=5)
}
dag = airflow.DAG(
'parallel_tasks_v1',
schedule_interval="@daily",
catchup=False,
default_args=args)
# You can read this from variables
parallel_tasks_total_number = 10
start_task = DummyOperator(
task_id='start_task',
dag=dag
)
# Creates the tasks dynamically.
# Each one will elaborate one chunk of data.
def create_dynamic_task(current_task_number):
return DummyOperator(
provide_context=True,
task_id='parallel_task_' + str(current_task_number),
python_callable=parallelTask,
# your task will take as input the total number and the current number to elaborate a chunk of total elements
op_args=[current_task_number, int(parallel_tasks_total_number)],
dag=dag)
end = DummyOperator(
task_id='end',
dag=dag)
for page in range(int(parallel_tasks_total_number)):
created_task = create_dynamic_task(page)
start_task >> created_task
created_task >> end
코드 설명 :
여기에는 단일 시작 작업과 단일 종료 작업 (모두 더미)이 있습니다.
그런 다음 for 루프가있는 시작 작업에서 동일한 Python 호출 가능으로 10 개의 작업을 만듭니다. 태스크는 create_dynamic_task 함수에서 생성됩니다.
각 파이썬 콜 러블에 병렬 작업의 총 수와 현재 작업 인덱스를 인수로 전달합니다.
1000 개의 항목을 정교화한다고 가정합니다. 첫 번째 작업은 10 개의 청크 중 첫 번째 청크를 정교화해야한다는 입력을받습니다. 1000 개의 항목을 10 개의 덩어리로 나누고 첫 번째 항목을 정교하게 만듭니다.
예, 가능합니다. 이것을 보여주는 예제 DAG를 만들었습니다.
import airflow
from airflow.operators.python_operator import PythonOperator
import os
from airflow.models import Variable
import logging
from airflow import configuration as conf
from airflow.models import DagBag, TaskInstance
from airflow import DAG, settings
from airflow.operators.bash_operator import BashOperator
main_dag_id = 'DynamicWorkflow2'
args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(2),
'provide_context': True
}
dag = DAG(
main_dag_id,
schedule_interval="@once",
default_args=args)
def start(*args, **kwargs):
value = Variable.get("DynamicWorkflow_Group1")
logging.info("Current DynamicWorkflow_Group1 value is " + str(value))
def resetTasksStatus(task_id, execution_date):
logging.info("Resetting: " + task_id + " " + execution_date)
dag_folder = conf.get('core', 'DAGS_FOLDER')
dagbag = DagBag(dag_folder)
check_dag = dagbag.dags[main_dag_id]
session = settings.Session()
my_task = check_dag.get_task(task_id)
ti = TaskInstance(my_task, execution_date)
state = ti.current_state()
logging.info("Current state of " + task_id + " is " + str(state))
ti.set_state(None, session)
state = ti.current_state()
logging.info("Updated state of " + task_id + " is " + str(state))
def bridge1(*args, **kwargs):
# You can set this value dynamically e.g., from a database or a calculation
dynamicValue = 2
variableValue = Variable.get("DynamicWorkflow_Group2")
logging.info("Current DynamicWorkflow_Group2 value is " + str(variableValue))
logging.info("Setting the Airflow Variable DynamicWorkflow_Group2 to " + str(dynamicValue))
os.system('airflow variables --set DynamicWorkflow_Group2 ' + str(dynamicValue))
variableValue = Variable.get("DynamicWorkflow_Group2")
logging.info("Current DynamicWorkflow_Group2 value is " + str(variableValue))
# Below code prevents this bug: https://issues.apache.org/jira/browse/AIRFLOW-1460
for i in range(dynamicValue):
resetTasksStatus('secondGroup_' + str(i), str(kwargs['execution_date']))
def bridge2(*args, **kwargs):
# You can set this value dynamically e.g., from a database or a calculation
dynamicValue = 3
variableValue = Variable.get("DynamicWorkflow_Group3")
logging.info("Current DynamicWorkflow_Group3 value is " + str(variableValue))
logging.info("Setting the Airflow Variable DynamicWorkflow_Group3 to " + str(dynamicValue))
os.system('airflow variables --set DynamicWorkflow_Group3 ' + str(dynamicValue))
variableValue = Variable.get("DynamicWorkflow_Group3")
logging.info("Current DynamicWorkflow_Group3 value is " + str(variableValue))
# Below code prevents this bug: https://issues.apache.org/jira/browse/AIRFLOW-1460
for i in range(dynamicValue):
resetTasksStatus('thirdGroup_' + str(i), str(kwargs['execution_date']))
def end(*args, **kwargs):
logging.info("Ending")
def doSomeWork(name, index, *args, **kwargs):
# Do whatever work you need to do
# Here I will just create a new file
os.system('touch /home/ec2-user/airflow/' + str(name) + str(index) + '.txt')
starting_task = PythonOperator(
task_id='start',
dag=dag,
provide_context=True,
python_callable=start,
op_args=[])
# Used to connect the stream in the event that the range is zero
bridge1_task = PythonOperator(
task_id='bridge1',
dag=dag,
provide_context=True,
python_callable=bridge1,
op_args=[])
DynamicWorkflow_Group1 = Variable.get("DynamicWorkflow_Group1")
logging.info("The current DynamicWorkflow_Group1 value is " + str(DynamicWorkflow_Group1))
for index in range(int(DynamicWorkflow_Group1)):
dynamicTask = PythonOperator(
task_id='firstGroup_' + str(index),
dag=dag,
provide_context=True,
python_callable=doSomeWork,
op_args=['firstGroup', index])
starting_task.set_downstream(dynamicTask)
dynamicTask.set_downstream(bridge1_task)
# Used to connect the stream in the event that the range is zero
bridge2_task = PythonOperator(
task_id='bridge2',
dag=dag,
provide_context=True,
python_callable=bridge2,
op_args=[])
DynamicWorkflow_Group2 = Variable.get("DynamicWorkflow_Group2")
logging.info("The current DynamicWorkflow value is " + str(DynamicWorkflow_Group2))
for index in range(int(DynamicWorkflow_Group2)):
dynamicTask = PythonOperator(
task_id='secondGroup_' + str(index),
dag=dag,
provide_context=True,
python_callable=doSomeWork,
op_args=['secondGroup', index])
bridge1_task.set_downstream(dynamicTask)
dynamicTask.set_downstream(bridge2_task)
ending_task = PythonOperator(
task_id='end',
dag=dag,
provide_context=True,
python_callable=end,
op_args=[])
DynamicWorkflow_Group3 = Variable.get("DynamicWorkflow_Group3")
logging.info("The current DynamicWorkflow value is " + str(DynamicWorkflow_Group3))
for index in range(int(DynamicWorkflow_Group3)):
# You can make this logic anything you'd like
# I chose to use the PythonOperator for all tasks
# except the last task will use the BashOperator
if index < (int(DynamicWorkflow_Group3) - 1):
dynamicTask = PythonOperator(
task_id='thirdGroup_' + str(index),
dag=dag,
provide_context=True,
python_callable=doSomeWork,
op_args=['thirdGroup', index])
else:
dynamicTask = BashOperator(
task_id='thirdGroup_' + str(index),
bash_command='touch /home/ec2-user/airflow/thirdGroup_' + str(index) + '.txt',
dag=dag)
bridge2_task.set_downstream(dynamicTask)
dynamicTask.set_downstream(ending_task)
# If you do not connect these then in the event that your range is ever zero you will have a disconnection between your stream
# and your tasks will run simultaneously instead of in your desired stream order.
starting_task.set_downstream(bridge1_task)
bridge1_task.set_downstream(bridge2_task)
bridge2_task.set_downstream(ending_task)
DAG를 실행하기 전에 다음 세 가지 기류 변수를 만듭니다.
airflow variables --set DynamicWorkflow_Group1 1
airflow variables --set DynamicWorkflow_Group2 0
airflow variables --set DynamicWorkflow_Group3 0
DAG가 여기에서 나오는 것을 볼 수 있습니다.
실행 후 이것으로
이 DAG에 대한 자세한 내용은 Airflow에서 동적 워크 플로 만들기에 대한 내 문서에서 확인할 수 있습니다 .
https://github.com/mastak/airflow_multi_dagrun 에서 더 좋은 해결책을 찾았다 고 생각합니다 . 이는 TriggerDagRuns 와 유사한 여러 dagrun을 트리거하여 DagRuns의 간단한 대기열을 사용합니다 . 대부분의 크레딧은 https://github.com/mastak으로 이동합니다 .하지만 최신 공기 흐름에서 작동하도록 몇 가지 세부 정보 를 패치 해야했습니다.
이 솔루션은 여러 DagRun을 트리거 하는 사용자 지정 연산자를 사용합니다 .
from airflow import settings
from airflow.models import DagBag
from airflow.operators.dagrun_operator import DagRunOrder, TriggerDagRunOperator
from airflow.utils.decorators import apply_defaults
from airflow.utils.state import State
from airflow.utils import timezone
class TriggerMultiDagRunOperator(TriggerDagRunOperator):
CREATED_DAGRUN_KEY = 'created_dagrun_key'
@apply_defaults
def __init__(self, op_args=None, op_kwargs=None,
*args, **kwargs):
super(TriggerMultiDagRunOperator, self).__init__(*args, **kwargs)
self.op_args = op_args or []
self.op_kwargs = op_kwargs or {}
def execute(self, context):
context.update(self.op_kwargs)
session = settings.Session()
created_dr_ids = []
for dro in self.python_callable(*self.op_args, **context):
if not dro:
break
if not isinstance(dro, DagRunOrder):
dro = DagRunOrder(payload=dro)
now = timezone.utcnow()
if dro.run_id is None:
dro.run_id = 'trig__' + now.isoformat()
dbag = DagBag(settings.DAGS_FOLDER)
trigger_dag = dbag.get_dag(self.trigger_dag_id)
dr = trigger_dag.create_dagrun(
run_id=dro.run_id,
execution_date=now,
state=State.RUNNING,
conf=dro.payload,
external_trigger=True,
)
created_dr_ids.append(dr.id)
self.log.info("Created DagRun %s, %s", dr, now)
if created_dr_ids:
session.commit()
context['ti'].xcom_push(self.CREATED_DAGRUN_KEY, created_dr_ids)
else:
self.log.info("No DagRun created")
session.close()
그런 다음 PythonOperator의 호출 가능 함수에서 여러 dagrun을 제출할 수 있습니다. 예를 들면 다음과 같습니다.
from airflow.operators.dagrun_operator import DagRunOrder
from airflow.models import DAG
from airflow.operators import TriggerMultiDagRunOperator
from airflow.utils.dates import days_ago
def generate_dag_run(**kwargs):
for i in range(10):
order = DagRunOrder(payload={'my_variable': i})
yield order
args = {
'start_date': days_ago(1),
'owner': 'airflow',
}
dag = DAG(
dag_id='simple_trigger',
max_active_runs=1,
schedule_interval='@hourly',
default_args=args,
)
gen_target_dag_run = TriggerMultiDagRunOperator(
task_id='gen_target_dag_run',
dag=dag,
trigger_dag_id='common_target',
python_callable=generate_dag_run
)
https://github.com/flinz/airflow_multi_dagrun 에서 코드로 포크를 만들었습니다.
작업 그래프는 런타임에 생성되지 않습니다. 오히려 그래프는 dags 폴더에서 Airflow에 의해 선택 될 때 작성됩니다. 따라서 작업이 실행될 때마다 작업에 대해 다른 그래프를 갖는 것은 실제로 가능하지 않습니다. 로드 시 쿼리를 기반으로 그래프를 작성하도록 작업을 구성 할 수 있습니다 . 그 그래프는 그 이후의 모든 실행에 대해 동일하게 유지되며, 이는 아마도별로 유용하지 않을 것입니다.
분기 연산자를 사용하여 쿼리 결과를 기반으로 실행될 때마다 다른 작업을 실행하는 그래프를 디자인 할 수 있습니다.
What I've done is to pre-configure a set of tasks and then take the query results and distribute them across the tasks. This is probably better anyhow because if your query returns a lot of results, you probably don't want to flood the scheduler with a lot of concurrent tasks anyhow. To be even safer, I also used a pool to ensure my concurrency doesn't get out of hand with an unexpectedly large query.
"""
- This is an idea for how to invoke multiple tasks based on the query results
"""
import logging
from datetime import datetime
from airflow import DAG
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.mysql_operator import MySqlOperator
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from include.run_celery_task import runCeleryTask
########################################################################
default_args = {
'owner': 'airflow',
'catchup': False,
'depends_on_past': False,
'start_date': datetime(2019, 7, 2, 19, 50, 00),
'email': ['rotten@stackoverflow'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 0,
'max_active_runs': 1
}
dag = DAG('dynamic_tasks_example', default_args=default_args, schedule_interval=None)
totalBuckets = 5
get_orders_query = """
select
o.id,
o.customer
from
orders o
where
o.created_at >= current_timestamp at time zone 'UTC' - '2 days'::interval
and
o.is_test = false
and
o.is_processed = false
"""
###########################################################################################################
# Generate a set of tasks so we can parallelize the results
def createOrderProcessingTask(bucket_number):
return PythonOperator(
task_id=f'order_processing_task_{bucket_number}',
python_callable=runOrderProcessing,
pool='order_processing_pool',
op_kwargs={'task_bucket': f'order_processing_task_{bucket_number}'},
provide_context=True,
dag=dag
)
# Fetch the order arguments from xcom and doStuff() to them
def runOrderProcessing(task_bucket, **context):
orderList = context['ti'].xcom_pull(task_ids='get_open_orders', key=task_bucket)
if orderList is not None:
for order in orderList:
logging.info(f"Processing Order with Order ID {order[order_id]}, customer ID {order[customer_id]}")
doStuff(**op_kwargs)
# Discover the orders we need to run and group them into buckets for processing
def getOpenOrders(**context):
myDatabaseHook = PostgresHook(postgres_conn_id='my_database_conn_id')
# initialize the task list buckets
tasks = {}
for task_number in range(0, totalBuckets):
tasks[f'order_processing_task_{task_number}'] = []
# populate the task list buckets
# distribute them evenly across the set of buckets
resultCounter = 0
for record in myDatabaseHook.get_records(get_orders_query):
resultCounter += 1
bucket = (resultCounter % totalBuckets)
tasks[f'order_processing_task_{bucket}'].append({'order_id': str(record[0]), 'customer_id': str(record[1])})
# push the order lists into xcom
for task in tasks:
if len(tasks[task]) > 0:
logging.info(f'Task {task} has {len(tasks[task])} orders.')
context['ti'].xcom_push(key=task, value=tasks[task])
else:
# if we didn't have enough tasks for every bucket
# don't bother running that task - remove it from the list
logging.info(f"Task {task} doesn't have any orders.")
del(tasks[task])
return list(tasks.keys())
###################################################################################################
# this just makes sure that there aren't any dangling xcom values in the database from a crashed dag
clean_xcoms = MySqlOperator(
task_id='clean_xcoms',
mysql_conn_id='airflow_db',
sql="delete from xcom where dag_id='{{ dag.dag_id }}'",
dag=dag)
# Ideally we'd use BranchPythonOperator() here instead of PythonOperator so that if our
# query returns fewer results than we have buckets, we don't try to run them all.
# Unfortunately I couldn't get BranchPythonOperator to take a list of results like the
# documentation says it should (Airflow 1.10.2). So we call all the bucket tasks for now.
get_orders_task = PythonOperator(
task_id='get_orders',
python_callable=getOpenOrders,
provide_context=True,
dag=dag
)
open_order_task.set_upstream(clean_xcoms)
# set up the parallel tasks -- these are configured at compile time, not at run time:
for bucketNumber in range(0, totalBuckets):
taskBucket = createOrderProcessingTask(bucketNumber)
taskBucket.set_upstream(get_orders_task)
###################################################################################################
I found this Medium post which is very similar to this question. However it is full of typos, and does not work when I tried implementing it.
My answer to the above is as follows:
작업을 동적으로 생성하는 경우 업스트림 작업에 의해 생성되지 않거나 해당 작업과 독립적으로 정의 될 수있는 항목을 반복하여 수행해야합니다 . 다른 많은 사람들이 이전에 지적한 것처럼 템플릿 외부 (예 : 작업)에 실행 날짜 또는 기타 기류 변수를 전달할 수 없다는 것을 배웠습니다. 이 게시물을 참조하십시오 .
참고 URL : https://stackoverflow.com/questions/41517798/proper-way-to-create-dynamic-workflows-in-airflow
'IT박스' 카테고리의 다른 글
| WebWorkers를 안전한 환경으로 만들기 (0) | 2020.11.22 |
|---|---|
| iOS iPad 키보드가 열리면 고정 위치가 끊어짐 (0) | 2020.11.22 |
| * .csproj 프로젝트에서 BootstrapperPackage는 무엇을 의미합니까? (0) | 2020.11.22 |
| `std :: string`의 하위 문자열에 대한`string_view`를 효율적으로 가져 오는 방법 (0) | 2020.11.22 |
| .NET에 RAII가없는 이유는 무엇입니까? (0) | 2020.11.22 |

