Zapoznaliśmy się z narzędziem jakim jest airflow, zatem czas by sprawdzić dalsze jego możliwości, a zatem zacznijmy od czegoś powiedzmy sobie „klasycznego”, czyli od pobrania kursów walut. Jest to jedno z lepszych ćwiczeń dla początkujących, by sprawdzić różne wersje metod i połączeń do stron. Mamy bowiem tutaj dwa wyjścia, pierwsze to połączenie się do strony i parsowaniu strony poprzez elementy HTML, lub drugi wykorzystujący API do komunikacji z systemem zewnętrznym.
Chciałbym tutaj zaznaczyć, że skrypt ten został stworzony przed zmianami jakie zaszły w 2023 roku na stronie NBP, stąd sposób komunikacji może się różnić od obecnego. Tutaj podłączymy się prostym skryptem napisanym w Pythonie poprzez udostępnione API i wykonamy parę operacji.
(Artykuł w trakcie budowy)
# -*- coding: utf-8 -*-
"""
Created on Thu Nov 10 21:54:42 2022
"""
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.microsoft.mssql.operators.mssql import MsSqlOperator
from airflow.hooks.base_hook import BaseHook
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email': ['kontakt@projektintegracja.it'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 0,
'execution_timeout': timedelta(seconds=300)
}
dag = DAG(
'gold_rates_scrape',
default_args=default_args,
description='Collecting and storing the gold rates data',
schedule_interval="30 7 * * *",
start_date=datetime(2022, 1, 1, 0, 0, 0),
tags=['gold', 'nbp', 'rates', 'eur', 'pln'],
catchup=False
)
import requests
import pandas as pd
import pyodbc
"""
przykladowe dane wejsciowe:
start_date = '2022-06-01'
api_url = f"http://api.nbp.pl/api/cenyzlota/{start_date}"
"""
def main_process():
def get_data_from_api(api_type:str, table:str, currency_code:str, start_date:str):
"""
Funkcja sluzaca do nawiazania polaczenia do API oraz pobrania
odpowiedniego zestawu danych
Parameters
----------
api_url : str
link do api, do ktorego ma byc wykonany request.
table : str
currency_code : str
Wartosci wykorzystywane przy pobieraniu kursow walut.
start_date: str
data, okreslajaca za ktory dzien maja byc pobrane dane
Returns
-------
df : DataFrame
zwracany jest DataFrame zawierajacy dane pobrane z api.
"""
headers = {"Content-Type":"application/json", "Accept": "application/json"}
if api_type == "cenyzlota":
api_url = f"http://api.nbp.pl/api/{api_type}/{start_date}"
response = requests.get(api_url, headers=headers)
results = response.json()
df = pd.json_normalize(results)
elif api_type == "exchangerates":
api_url = f"https://api.nbp.pl/api/{api_type}/rates/{table}/{currency_code}/{start_date}"
response = requests.get(api_url, headers=headers)
results = response.json()
df = pd.json_normalize(results["rates"])
else:
print("Niewlasciwe parametry")
return df
def transform_data_from_api():
"""
Pobranie odpowiedniego zestawu danych poprzez funkcje get_data_from_api
w wyniku zwraca DF zawierajacy kursy zlota w zlotowkach oraz euro
Returns
-------
df : DataFrame
zwracany jest DataFrame zawierajacy dane pobrane z api.
"""
df_gold = get_data_from_api("cenyzlota", '', '', '2022-06-01')
df_euro = get_data_from_api("exchangerates", "a", "eur", '2022-06-01')
df_gold_rates = df_gold.copy()
df_gold_rates.columns = ["data", "cena_pln"]
df_gold_rates["cena_eur"] = df_gold["cena"] / df_euro["mid"].loc[df_euro["effectiveDate"] == df_gold["data"]]
return df_gold_rates
def upload_data_to_db(df_data: pd.DataFrame):
"""
Parameters
----------
df_data : pd.DataFrame
funckja przyjmuje DF zawierajacy zestawienie danych.
Returns
-------
Zapisuje wynikowe dane do bazy
"""
conn = pyodbc.connect('Driver={SQL Server};'
'Server=SRV-TEST-LOCAL;'
'Database=db1test1;'
'Trusted_Connection=yes;'
'UID=job_user;'
'PWD=password;')
try:
for idx, row in df_data.iterrows():
cursor = conn.cursor()
print(f"INSERT INTO dbo.KURS_ZLOTA(DATA, CENA_PLN, CENA_EUR) VALUES \
('{df_data['data'].iloc[idx]}', \
{df_data['cena_pln'].iloc[idx]}, \
{df_data['cena_eur'].iloc[idx]});")
cursor.execute(f"INSERT INTO dbo.KURS_ZLOTA(DATA, CENA_PLN, CENA_EUR) VALUES \
('{df_data['data'].iloc[idx]}',{df_data['cena_pln'].iloc[idx]},{df_data['cena_eur'].iloc[idx]});" )
cursor.commit()
except Exception as e:
print(f"Connection failed: {e}")
df_to_upload = transform_data_from_api()
upload_data_to_db(df_to_upload)
def interface_success():
conn = pyodbc.connect('Driver={SQL Server};'
'Server=DESKTOP-SO51D58;'
'Database=db1test1;'
'Trusted_Connection=yes;'
'UID=job_user;'
'PWD=password;')
try:
operation_sql = ("UPDATE scheduler.job SET end_date = NOW(), status = 0 WHERE id = 1;")
conn.execute(operation_sql)
operation_sql = ("INSERT INTO scheduler.job_history " +
"(id_job, description, start_date, data_downloaded, data_processed, data_inserted, end_date, status, summary) " +
"SELECT * FROM scheduler.job WHERE id = 1;")
conn.execute(operation_sql)
except Exception:
pass
finally:
conn.close()
def interface_fail():
conn = pyodbc.connect('Driver={SQL Server};'
'Server=SRV-TEST-LOCAL;'
'Database=db1test1;'
'Trusted_Connection=yes;'
'UID=job_user;'
'PWD=password;')
try:
operation_sql = ("UPDATE scheduler.job SET end_date = NOW(), status = -1 WHERE id = 1;")
conn.execute(operation_sql)
operation_sql = ("INSERT INTO scheduler.job_history " +
"(id_job, description, start_date, data_downloaded, data_processed, data_inserted, end_date, status, summary) " +
"SELECT * FROM scheduler.job WHERE id = 1;")
conn.execute(operation_sql)
except Exception:
pass
finally:
conn.close()
raise Exception
"""
Konfiguracja przeplywu oraz przypisanie zadan do odpowiednich operatorow
"""
interface_start = MsSqlOperator(
task_id='interface_start',
mssql_conn_id='production_db',
sql="""UPDATE scheduler.job
SET start_date = NOW(),
status = 1,
data_downloaded = NULL,
data_processed = NULL,
data_inserted = NULL,
end_date = NULL,
status = 1,
summary = NULL
WHERE id = 1;""",
dag=dag)
update_main_process_status = MsSqlOperator(
task_id='interface_start',
mssql_conn_id='production_db',
sql="""UPDATE scheduler.job
SET data_downloaded = NOW(),
data_processed = NOW(),
data_inserted = NOW(),
end_date = NULL,
status = 1,
summary = NULL
WHERE id = 1;""",
dag=dag)
"""
przypisanie glownego procesu do operatora python
"""
run_main_process = PythonOperator(
task_id='run_main_process',
python_callable=main_process,
dag=dag)
"""
utworenie struktury tabeli KURS_ZLOTA
"""
create_table = MsSqlOperator(
task_id='create_table',
mssql_conn_id='production_db',
sql="EXEC sp_create_table();",
dag=dag)
"""
uruchomienie jednego z dwoch mozliwych proceso w zaleznosci od przebiegu procesu
"""
success = PythonOperator(
task_id='interface_success',
python_callable=interface_success,
dag=dag)
failure = PythonOperator(
task_id='interface_fail',
python_callable=interface_fail,
dag=dag,
trigger_rule='one_failed')
"""
Konfiguracja kolejnych etapow przeplywu
"""
interface_start >> create_table >> run_main_process >> update_main_process_status >> [success, failure]