AIRFLOW – pobieranie kursów walut (stara wersja)

Zapoznaliśmy się z narzędziem jakim jest airflow, zatem czas by sprawdzić dalsze jego możliwości, a zatem zacznijmy od czegoś powiedzmy sobie „klasycznego”, czyli od pobrania kursów walut. Jest to jedno z lepszych ćwiczeń dla początkujących, by sprawdzić różne wersje metod i połączeń do stron. Mamy bowiem tutaj dwa wyjścia, pierwsze to połączenie się do strony i parsowaniu strony poprzez elementy HTML, lub drugi wykorzystujący API do komunikacji z systemem zewnętrznym.

Chciałbym tutaj zaznaczyć, że skrypt ten został stworzony przed zmianami jakie zaszły w 2023 roku na stronie NBP, stąd sposób komunikacji może się różnić od obecnego. Tutaj podłączymy się prostym skryptem napisanym w Pythonie poprzez udostępnione API i wykonamy parę operacji.

(Artykuł w trakcie budowy)

# -*- coding: utf-8 -*-
"""
Created on Thu Nov 10 21:54:42 2022


"""
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.microsoft.mssql.operators.mssql import MsSqlOperator
from airflow.hooks.base_hook import BaseHook

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['kontakt@projektintegracja.it'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'execution_timeout': timedelta(seconds=300)
}

dag = DAG(
    'gold_rates_scrape',
    default_args=default_args,
    description='Collecting and storing the gold rates data',
    schedule_interval="30 7 * * *",
    start_date=datetime(2022, 1, 1, 0, 0, 0),
    tags=['gold', 'nbp', 'rates', 'eur', 'pln'],
    catchup=False
)


import requests
import pandas as pd
import pyodbc

"""
przykladowe dane wejsciowe:
    start_date = '2022-06-01'
    api_url = f"http://api.nbp.pl/api/cenyzlota/{start_date}"
"""

def main_process():
    def get_data_from_api(api_type:str, table:str, currency_code:str, start_date:str):
        """
        Funkcja sluzaca do nawiazania polaczenia do API oraz pobrania 
        odpowiedniego zestawu danych
    
        Parameters
        ----------
        api_url : str
            link do api, do ktorego ma byc wykonany request.
            
        table : str
        currency_code : str
            Wartosci wykorzystywane przy pobieraniu kursow walut.
            
        start_date: str
            data, okreslajaca za ktory dzien maja byc pobrane dane
    
        Returns
        -------
        df : DataFrame
            zwracany jest DataFrame zawierajacy dane pobrane z api.
    
        """
        
        headers =  {"Content-Type":"application/json", "Accept": "application/json"}
        
        if api_type == "cenyzlota":
            
            api_url = f"http://api.nbp.pl/api/{api_type}/{start_date}"
            
            response = requests.get(api_url, headers=headers)
            results = response.json()
            
            df = pd.json_normalize(results)
        
        elif api_type == "exchangerates":
            
            api_url = f"https://api.nbp.pl/api/{api_type}/rates/{table}/{currency_code}/{start_date}"
    
            response = requests.get(api_url, headers=headers)
            results = response.json()
            
            df = pd.json_normalize(results["rates"])
            
        else:
            print("Niewlasciwe parametry")
        
        return df
    
    
    def transform_data_from_api():
        """
        Pobranie odpowiedniego zestawu danych poprzez funkcje get_data_from_api
        w wyniku zwraca DF zawierajacy kursy zlota w zlotowkach oraz euro
    
        Returns
        -------
        df : DataFrame
            zwracany jest DataFrame zawierajacy dane pobrane z api.
    
        """
        
        df_gold = get_data_from_api("cenyzlota", '', '', '2022-06-01')
        
        df_euro = get_data_from_api("exchangerates", "a", "eur", '2022-06-01')
        
        
        df_gold_rates = df_gold.copy()
        df_gold_rates.columns = ["data", "cena_pln"] 
        df_gold_rates["cena_eur"] = df_gold["cena"] / df_euro["mid"].loc[df_euro["effectiveDate"] == df_gold["data"]]  
        
        return df_gold_rates
    
    
    def upload_data_to_db(df_data: pd.DataFrame):
        """
        

        Parameters
        ----------
        df_data : pd.DataFrame
            funckja przyjmuje DF zawierajacy zestawienie danych.

        Returns
        -------
        Zapisuje wynikowe dane do bazy

        """
        
        conn = pyodbc.connect('Driver={SQL Server};'
                          'Server=SRV-TEST-LOCAL;'
                          'Database=db1test1;'
                          'Trusted_Connection=yes;'
                          'UID=job_user;'
                          'PWD=password;')
                
        
        try:
            for idx, row in df_data.iterrows():
                
                cursor = conn.cursor()
                print(f"INSERT INTO dbo.KURS_ZLOTA(DATA, CENA_PLN, CENA_EUR) VALUES \
                               ('{df_data['data'].iloc[idx]}', \
                                {df_data['cena_pln'].iloc[idx]}, \
                                {df_data['cena_eur'].iloc[idx]});")
                cursor.execute(f"INSERT INTO dbo.KURS_ZLOTA(DATA, CENA_PLN, CENA_EUR) VALUES \
                               ('{df_data['data'].iloc[idx]}',{df_data['cena_pln'].iloc[idx]},{df_data['cena_eur'].iloc[idx]});" )
                cursor.commit()                   
            
            
        except Exception as e:
            print(f"Connection failed: {e}")
    
   
    df_to_upload = transform_data_from_api()
     
    upload_data_to_db(df_to_upload) 
    

def interface_success():
    
    conn = pyodbc.connect('Driver={SQL Server};'
                          'Server=DESKTOP-SO51D58;'
                          'Database=db1test1;'
                          'Trusted_Connection=yes;'
                          'UID=job_user;'
                          'PWD=password;')
    
    try:
        operation_sql = ("UPDATE scheduler.job SET end_date = NOW(), status = 0 WHERE id = 1;")
        conn.execute(operation_sql)
        operation_sql = ("INSERT INTO scheduler.job_history " + 
                       "(id_job, description, start_date, data_downloaded, data_processed, data_inserted, end_date, status, summary)  " +
                       "SELECT * FROM scheduler.job WHERE id = 1;")
        conn.execute(operation_sql)
    except Exception:
        pass
    finally:
        conn.close()


def interface_fail():
   
    conn = pyodbc.connect('Driver={SQL Server};'
                          'Server=SRV-TEST-LOCAL;'
                          'Database=db1test1;'
                          'Trusted_Connection=yes;'
                          'UID=job_user;'
                          'PWD=password;')
    
    try:
        operation_sql = ("UPDATE scheduler.job SET end_date = NOW(), status = -1 WHERE id = 1;")
        conn.execute(operation_sql)
        operation_sql = ("INSERT INTO scheduler.job_history " + 
                       "(id_job, description, start_date, data_downloaded, data_processed, data_inserted, end_date, status, summary)  " +
                       "SELECT * FROM scheduler.job WHERE id = 1;")
        conn.execute(operation_sql)
    except Exception:
        pass
    finally:
        conn.close()
        raise Exception   

"""

    Konfiguracja przeplywu oraz przypisanie zadan do odpowiednich operatorow

"""

interface_start = MsSqlOperator(
                task_id='interface_start',
                mssql_conn_id='production_db',
                sql="""UPDATE scheduler.job 
                        SET start_date = NOW(), 
                        status = 1, 
                        data_downloaded = NULL, 
                        data_processed = NULL,
                        data_inserted = NULL,
                        end_date = NULL,
                        status = 1,
                        summary = NULL
                        WHERE id = 1;""",
                dag=dag)

update_main_process_status = MsSqlOperator(
                task_id='interface_start',
                mssql_conn_id='production_db',
                sql="""UPDATE scheduler.job 
                        SET data_downloaded = NOW(), 
                        data_processed = NOW(),
                        data_inserted = NOW(),
                        end_date = NULL,
                        status = 1,
                        summary = NULL
                        WHERE id = 1;""",
                dag=dag)

"""
    przypisanie glownego procesu do operatora python
"""
run_main_process = PythonOperator(
                        task_id='run_main_process',
                        python_callable=main_process,
                        dag=dag)

"""
    utworenie struktury tabeli KURS_ZLOTA
"""
create_table = MsSqlOperator(
                    task_id='create_table',
                    mssql_conn_id='production_db',
                    sql="EXEC sp_create_table();",
                    dag=dag)


"""
    uruchomienie jednego z dwoch mozliwych proceso w zaleznosci od przebiegu procesu
"""
success = PythonOperator(
            task_id='interface_success',
            python_callable=interface_success,
            dag=dag)

failure = PythonOperator(
            task_id='interface_fail',
            python_callable=interface_fail,
            dag=dag,
            trigger_rule='one_failed')


"""
    Konfiguracja kolejnych etapow przeplywu 
"""

interface_start >> create_table >> run_main_process >> update_main_process_status >> [success, failure]

Posts created 14

Dodaj komentarz

Twój adres e-mail nie zostanie opublikowany. Wymagane pola są oznaczone *

Related Posts

Begin typing your search term above and press enter to search. Press ESC to cancel.

Back To Top