본문 바로가기
프로젝트

Hive 로 json parsing for문 처럼 사용하기(모름)

by DenverAlmighty 2023. 4. 12.
반응형

Hive 로 json parsing for문 처럼 사용하는 방법은 아직 모른다

 

Amadeus API 로 항공 가격 조회를하면 데이터를 아래처럼 준다

data에 0, 1, 2,3 .. 이게 각각 항공권 이다

 

 

hive로 json 파싱을해봤다. 값은 잘 추출되었고 data\[0] 이부분을 1,2, 3 등으로 바꿔줘야하는데

 

아직 hive 로 한번에 처리하는 방법을 모르겠어서 시간이 없으니 그냥 python으로 parsing하는 코드를짰다.

airflow에서 실행시키기 편하도록 json 파일명, 경로, csv 파일명, 경로를 커맨드로 입력 받을 수 있게 했다.

csv 파일 경로를 지정 하지 않으면 json 파일 경로와 동일한 곳에 저장되고,

csv 파일명을 지정하지 않으면 json파일명과 동일한 파일명에 확장자만 .csv로 저장된다.

csv모듈을 이용해서 파싱한 결과를 list에 담아 저장해도되는데

list에 담을거 굳이 변수명을 지정해서 추출하고 list에 담기도 그렇고, 

변수명을 지정안하고 바로 append하자니 나중에 컬럼 수정할 때 무슨 컬럼인지 구분하기 어려울 것 같아서

 , (comma)로 이어붙이고 file write 하도록했다.

import os
import sys
import json
import pathlib
import logging
import argparse


# Parsing command line arguments
def input():
    parser = argparse.ArgumentParser(description='Amadeus API json Parser', add_help=True)
    # Required
    parser.add_argument('-jp', dest='json_fpath', help='JSON file path ', type=pathlib.Path, required=True)
    parser.add_argument('-jf', dest='json_fname', help='JSON file name ', required=True)
    # Optional
    parser.add_argument('-cp', dest='csv_fpath', help='CSV file path. Default : <JSON FILE PATH>', default='', type=pathlib.Path, required=False)
    parser.add_argument('-cf', dest='csv_fname', help='CSV file name. Default : <JSON FILE NAME>', default='', required=False)
    
    args = parser.parse_args() 
    
    # Get arguments and transform format or type
    json_fpath = str(args.json_fpath)
    json_fname = args.json_fname
    csv_fpath = str(args.csv_fpath)
    csv_fname = args.csv_fname

    # if isnull csv_fpath, csv_fname, the same as json
    if csv_fpath == '':
        csv_fpath = json_fpath
    if csv_fname == '':
        csv_fname = f'{json_fname[:-5]}.csv'
    if json_fpath[-1] != '/':
        json_fpath += '/'
    if csv_fpath[-1] != '/':
        csv_fpath += '/'
        
    try:
        # Check path 
        if not os.path.isdir(json_fpath) or not os.path.isdir(csv_fpath):
            raise FileNotFoundError
        
    except FileNotFoundError  as e:
        logger.error(f'{type(e)} No such file or directory : {e.args} {e}')
        print(f'{type(e)} No such file or directory : {e.args} {e}')
        sys.exit(1)
    except Exception as e:
        logger.error(f'{type(e)}  : {e.args} {e}')
        print(f'{type(e)}  : {e.args} {e}')
        sys.exit(1)
    
    return json_fpath, json_fname, csv_fpath, csv_fname


def parsing(li):
    # Init row
    row = ''
    
    total_price = li['price']['total']
    seg = li['itineraries'][0]['segments'][0]
    dept_datetime = seg['departure']['at']
    dept_datetime = dept_datetime.split('T')
    dept_date = dept_datetime[0]
    dept_time = dept_datetime[1]
    dept_iata = seg['departure']['iataCode']
    arrv_datetime = seg['arrival']['at']
    arrv_datetime = arrv_datetime.split('T')
    arrv_date = arrv_datetime[0]
    arrv_time = arrv_datetime[1]
    arrv_iata = seg['arrival']['iataCode']
    airline = seg['carrierCode']
    num_stopover = seg['numberOfStops']
    is_oneway = 1
    
    row = f'{airline}, {dept_iata}, {dept_date}, {dept_time}, {arrv_iata}, {arrv_date}, {arrv_time}, {num_stopover}, {total_price}, {is_oneway}\n'
    
    return row


def main():
    json_fpath, json_fname, csv_fpath, csv_fname = input()

    try:
        # Load JSON file
        jsonfile = f'{json_fpath}{json_fname}'
        with open(jsonfile) as f:
            json_data = json.load(f)
        # Parsing
        data = json_data['data']
        for li in data:        
            row = parsing(li)
            cltd_time = json_fname.split('_')[1]
            row = f'{cltd_time}, {row}'
            print(row)
            # Save as CSV file
            with open(f'{csv_fpath}{csv_fname}', 'a') as f:
                f.write(row)
    except Exception as e:
        logger.error(f'{type(e)}  : {e.args} {e}')
        print(f'{type(e)}  : {e.args} {e}')
        sys.exit(1)


if __name__ == '__main__':
    # Logger Settings
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
    file_handler = logging.FileHandler('../log/amadeus_json_parsing.log', encoding='utf-8')
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)

    # main function
    logger.info(' executed')
    if main():
        sys.exit(0)

 

아래는 결과 csv 파일인데 잘 나온다.

내일 csv 데이터를 hive 테이블에 로드하는걸 해봐야한다.

 

 

728x90
반응형