Data Engineer/프로젝트

🔥 [데이터 파이프라인 구축 프로젝트] 02. ETL 수행하기

sehee00 2024. 12. 11. 16:57

영화진흥회 API를 살펴보며 데이터 수집 계획을 세우고, 데이터 추출, 변환, 적재를 진행할 예정이다. 

 

1. 영화진흥회 API

API(Application Programming Interface)란?

  • API는 특정 애플리케이션, 서비스, 또는 시스템이 다른 애플리케이션과 통신할 수 있도록 정의된 일련의 규칙
  • 애플리케이션 간의 계약: API는 두 애플리케이션 간의 데이터 전달 및 명령어 실행 방식(어떤 데이터를 보내야 하고, 어떤 방식으로 받을지)에 대한 규칙을 정해놓은 것. 이를 통해 두 시스템은 서로 무엇을 기대하는지 알 수 있게 된다.
  • 웹 API (RESTful API): 웹에서 가장 흔히 사용되는 형태로, HTTP를 통해 요청(request)과 응답(response)을 주고받는다. 예를 들어, 클라이언트가 API를 통해 데이터를 요청하면, 서버는 해당 데이터를 제공하는 방식이다. 
  • API의 사용 예: 영화진흥회 API처럼, 특정 웹 서비스가 데이터를 제공하고 이를 다른 애플리케이션이 활용할 수 있도록 해주는 방식. 이를 통해 데이터를 추출하거나 특정 기능을 다른 애플리케이션에서 사용할 수 있다.

 

영화진흥위원회 오픈 API 

https://www.kobis.or.kr/kobisopenapi/homepg/board/findServiceGuideList.do

 

오픈 API를 이용하려면 일단 키를 발급받아야 한다. 

키는 신청만 하면 바로 발급된다. 

일 3000회로 사용 제한이 있다. 

발급키

 

제공 서비스

영화 진흥회 Open API

 

제공서비스에는 일별 박스오피스, 주간/주말 박스오피스, 공통코드 조회 등 다양한 서비스를 제공한다. 

다음에는 API를 살펴보며 어떤 데이터를 사용할지 고민해보려고 한다. 

 

2. 데이터 소스 선정하기 

데이터 추출(Extract) 단계에서 필요한 데이터만 수집할지, 아니면 가능한 데이터를 모두 가져올지 고민이다. 

  • 필요한 데이터만 가져오는 경우:
    • 데이터 크기가 작아 처리 시간이 단축되고, 파이프라인이 간결하며 유지보수가 용이하다. 
    • 데이터 요구사항이 변경되거나 추가적인 분석이 필요해지면, 다시 데이터를 추출해야 한다. 
    • 추출 단계에서 데이터 선정을 잘못하거나 오류가 있다면, 중요한 데이터를 놓칠 수 있다. 
  • 가능한 데이터를 모두 가져오는 경우:
    • 변환 단계에서 다양한 데이터 전처리 및 조작이 가능한 유연성을 갖출 수 있다. 
    • 데이터 손실이나 누락을 최소화하며, 추후 확장성이 좋다. 데이터 재수집을 하지 않아도 된다. 
    • 저장소가 많이 필요하며 시간이 오래 걸린다. 
  • 현대적인 데이터 파이프라인 설계 트렌드:
    • 데이터를 최대한 수집하여 ELT 방식으로 처리하는 경우가 많습니다.
    • 이유는 저장 비용이 낮아졌고, 클라우드 서비스에서 컴퓨팅 자원을 유연하게 사용할 수 있기 때문이다.
  • 결론 
    • 나는 "데이터 파이프라인 구축"에 초점을 맞추고 있기 때문에, 가능한 데이터를 모두 가져오는 방식이 맞을 것 같다. 
    • 최대한 데이터를 수집하고, 다양한 데이터 전처리 및 조작을 통해 활용할 수 있는 환경을 제공하는 쪽으로 해야겠다. 
    • 명확한 요구사항이 있는게 아니고 데이터를 다각도로 활용할 가능성이 있기 때문에, 가능한 모든 데이터를 수집하자!

1) 일별 박스오피스 API 살펴보기 

2024년 11월 27일자 일별 박스오피스 API의 일부를 가져왔다.

 

일별 박스오피스 API에서는 영화 장르가 제공되지 않아서 'movieCd(영화의 대표코드)'를 외부키로 연결하여 영화 상세정보 API에서 영화 장르를 얻어야 한다.

일별 박스오피스 API의 응답필드에서 필요하지 않은 필드도 있다. 예를 들어, rnum(순번)은 필요하지 않다.

 

rank(해당일자의 박스오피스 순위), rankOldAndNew(랭킹에 신규진입여부, “OLD” : 기존 , “NEW” : 신규), movieCd(영화의 대표코드), movieNm(영화명(국문)), openDt(개봉일), salesAmt(해당일의 매출액), salesShare(해당일자 상영작의 매출총액 대비 해당 영화의 매출비율), salesInten(전일 대비 매출액 증감분), salesChange(전일 대비 매출액 증감 비율), salesAcc(누적매출액), audiAcc(해당일의 관객수), audiInten(전일 대비 관객수 증감분), audiChange(전일 대비 관객수 증감 비율), audiAcc(누적 관객수), scrnCnt(해당일자에 상영한 스크린수), showCnt(해당일자에 상영된 횟수) 데이터를 수집할 예정이다. 총 16개의 필드를 가진다. 

 

2) 영화 상세정보 API 살펴보기 

'외국영화 - 모아나 2'와 '한국영화 - 히든페이스' 상세정보를 불러와 비교해 봤다.

또, 다른 영화들의 상세정보를 확인하며 데이터 세부 정부를 파악해 봤다. 

 

  • movieNmOg(영화명(원문)): 모두 None값이다. 
  • nations(제작국가):  1개 이상일 수 있다. ex. (미국, 캐나다)
  • genres(장르): 1개 이상일 수 있다. ex. 애니메이션, 스릴러, (범죄, 스릴러) 
  • directors(감독): 1명 이상일 수 있다. 
  • actors(배우): 외국 영화는 2명인 거에 비해 한국 영화는 27명이었다. 
  • showTypes(상영형태): 외국 영화는 4가지, 한국 영화는 1가지였다. 즉, 여러 개일 수 있다. 
  • companys(참여영화사): 1개 이상일 수 있다. 
  • audits(심의정보) - watchGradNm(관람등급 명칭) : 1개 이상일 수 있는데, 의미는 같을 수 있다.  
  • staffs(스텝): 외국 영화는 스텝이 하나도 출력되지 않았고, 한국 영화는 약 100명 이상 출력된다. 

영화 상세정보 API에서는 movieNmOg(영화명(원문)), nations(제작국가), genres(장르), directors(감독), showTypes(상영형태), companys(참여영화사), watchGradNm(관람등급 명칭) 데이터를 수집할 예정이다. 총 7개의 필드를 가진다. 

 

3) 다른 API

일별 박스오피스, 주간/주말 박스오피스, 공통코드 조회, 영화목록, 영화 상세정보, 영화사목록, 영화사 상세정보, 영화인목록, 영화인 상세정보 API들이 있지만, 모든 API의 정보를 가져올 필요는 없을 것 같다. 

 

- 영화사 상세정보 API

companyCd(영화사코드)로 영화사 상세정보 API를 조회해 봤다. 

 

영화사 정보(영화사 목록/상세정보): 제작사, 배급사 관련 데이터 분석이 필요할 때 유용할 것 같다. 

 

- 영화인 목록 API

"peopleNm(영화인명)으로 영화인 목록을 조회해봤다. 

 

- 영화인 상세정보 API

 

  • 모든 API를 추출하고 ERD를 구성하는 것은 너무 복잡하고 불필요해 보여서, 우선 핵심 API(일별 박스오피스, 영화 상세정보)의 데이터만 가져오기로 결정했다. 
  • 프로젝트를 진행하며 추가 API가 필요하다고 판단되면, 그때 데이터를 확장하고 ERD를 업데이트할 예정이다. 

 

3. 데이터베이스 설계하기 

1) AWS EC2에서 Doker로 MySQL 띄우기 (이전게시물 참조)

이전에 작성한 글과 같은 방법으로 새로운 인스턴스를 생성하여 MySQL 컨테이너를 새로 만들었다.

이제 데이터베이스를 만들고, 사용자 생성 및 권한 설정을 해줘야 한다. 

# 데이터베이스 생성 
CREATE DATABASE movie_analysis;
# 데이터베이스 사용
USE movie_analysis;

 

클라우드에서 데이터를 처리하도록 하기 위해, 

사용자 계정을 외부 접속 계정(%)으로 생성하고, 권한 부여도 해준다. 

CREATE USER '유저명'@'%' IDENTIFIED BY '설정할 비밀번호';
GRANT ALL PRIVILEGES ON your_database_name.* TO '유저명'@'%';
FLUSH PRIVILEGES;

 

 

2) 테이블 설계 및 생성하기 

💡 테이블 설계를 먼저 진행하는 이유

데이터를 변환(Transform)하거나 적재(Load) 하기 전에 테이블 구조를 명확히 정의하면 데이터 처리 중 오류를 최소화하고 일관된 파이프라인을 유지할 수 있다.

 

데이터베이스 설계를 하려니 어떻게 이름을 명명해야 할지 고민이 되어 구글링을 해봤다. 블로그를 참조하여 필수적인 명명규칙을 정리해 봤다.

  • 소문자로 작성한다.
  • 단어와 단어 사이는 _ 로 구분한다. 
  • 약어를 사용하지 않는다. (ex. mid_nm 대신 middle_name 사용)
  • 몇 긴 단어의 경우 약어가 단어 자체보다 더 통용되는 경우가 있다. 이런 경우에는 약어를 사용한다. 
  • 기본키 필드의 이름 id로 작성한다. 
  • 외래 키 필드는 참조된 테이블의 이름과 참조된 필드의 이름 조합이어야 한다. ex. team_id 

📌 일별 박스오피스(daily_box_office) 테이블 

-- 일별 박스오피스 테이블 생성
CREATE TABLE daily_box_office (
    movie_id VARCHAR(20) PRIMARY KEY, -- 영화 대표코드
    movie_rank INT, -- 박스오피스 순위
    movie_nm VARCHAR(255), -- 영화명
    open_date DATE, -- 개봉일
    sales_amt BIGINT, -- 해당일의 매출액
    sales_acc BIGINT, -- 누적 매출액
    audi_acc BIGINT, -- 누적 관객수
    scrn_cnt INT, -- 상영 스크린 수
    show_cnt INT  -- 상영 횟수
);

 

총 16개의 칼럼을 모두 가져와 저장하려 했으나, 데이터를 분석할 때 반드시 필요한지 고려하고 비슷한 정보를 제공하는 필드는 통합하여 필드 수를 줄이려고 한다. 

  • movie_id: 영화 대표코드 → 고유 식별자 
  • movie_rank: 해당 일자의 박스오피스 순위 → 순위 분석에 필수
  • movie_nm: 영화명 → 분석 시 사람이 이해하기 쉬운 값 제공
  • open_date: 개봉일 → 영화의 신작 여부 및 상영 기간 분석
  • sales_amt: 해당일의 매출액 → 매출 분석의 기본 데이터
  • sales_acc: 누적 매출액 → 장기 흥행 정도 분석
  • audi_acc: 누적 관객수 → 관객수와 매출의 상관관계 분석
  • scrn_cnt: 상영 스크린 수 → 상영 규모와 매출의 상관 관계 분석
  • show_cnt: 상영 횟수 → 상영 횟수 대비 성과 측정/ 상영 횟수와 매출의 상관 관계 분석 

이렇게 총 9개의 칼럼을 추출하기로 결정했다. 

 

📌 영화 상세정보(movie_details) 테이블 

-- 영화 상세정보 테이블 생성
CREATE TABLE movie_details (
    movie_id VARCHAR(20) PRIMARY KEY, -- 영화 대표코드
    nation_nm VARCHAR(50), -- 제작국가
    genre_nm VARCHAR(50), -- 장르명 
    director_nm VARCHAR(255), -- 감독명 
    actor_nm VARCHAR(255), -- 배우명 
    show_type_nm VARCHAR(255), -- 상영 형태 
    company_nm VARCHAR(255), -- 참여 영회사
    watch_grade_nm VARCHAR(255), -- 관람등급 명칭
    FOREIGN KEY (movie_id) REFERENCES daily_box_office(movie_id)
);
  • movie_id: 영화 대표코드다른 테이블과의 관계 설정 및 데이터 통합 관리에 필수 // 일별 박스오피스 테이블의 movie_id Foreign Key로 연결하여 통합 데이터 관리 필요
  • nation_nm: 제작국가 국가별 영화 매출 비중, 관객 수 분석, 영화 시장 트렌드 파악
  • genre_nm: 장르 장르별 흥행 비율 분석, 특정 장르 트렌드 및 인기 장르 파악
  • director_nm: 감독 감독별 영화 성과 분석, 흥행 성공률 조사, 특정 감독의 영화 스타일 트렌드 분석
  • actor_nm: 배우 배우별 영화 매출 기여도 분석, 주연 배우가 매출 및 관객 수에 미치는 영향 평가 
  • show_type: 상영형태 상영 방식별 관객 선호도, 매출 비교 분석
  • company_nm: 참여 영화사 영화사별 프로젝트 규모 및 성과 분석, 특정 영화사의 흥행 성과 추적
  • watch_grade_nm: 관람등급 명칭 관람 등급별 관객층 분석 // 데이터 정제 과정으로 중복 제거 필요 
  •  

이렇게 총 8개의 칼럼을 추출하기로 결정했다. 

영화 상세정보 테이블은 다대다 관계를 고려해야 한다.

  • nation_nm: 한 영화는 여러 제작국가를 가질 수 있고, 하나의 국가는 여러 영화에 속할 수 있습니다.
  • genre_nm: 한 영화는 여러 장르를 가질 수 있고, 하나의 장르는 여러 영화에 속할 수 있습니다.
  • director_nm: 한 영화는 여러 감독을 가질 수 있고, 한 명의 감독은 여러 영화에 속할 수 있습니다. 
  • actor_nm: 한 영화는 여러 배우를 가질 수 있고, 한 명의 배우는 여러 영화에 속할 수 있습니다. 
  • show_type: 한 영화는 여러 상영형태를 가질 수 있고, 하나의 상영형태는 여러 영화에 속할 수 있습니다. 
  • company_nm: 한 영화는 여러 영화사를 가질 수 있고, 하나의 영화사는 여러 영화에 속할 수 있습니다. 

📌 다대다 관계

  • 다대다 관계는 일대다, 다대일로 풀어줄 수 있도록 연관 테이블을 추가했다. 
-- 국가 테이블
CREATE TABLE nations (
    nation_id INT AUTO_INCREMENT PRIMARY KEY, -- 고유 ID
    nation_nm VARCHAR(255) NOT NULL -- 국가명
);

-- 영화와 국가의 관계 테이블
CREATE TABLE movie_and_nation (
    movie_id VARCHAR(20) NOT NULL, -- 영화 대표코드
    nation_id INT NOT NULL, -- 국가 ID
    PRIMARY KEY (movie_id, nation_id),
    FOREIGN KEY (movie_id) REFERENCES movie_details (movie_id) ON DELETE CASCADE,
    FOREIGN KEY (nation_id) REFERENCES nations (nation_id) ON DELETE CASCADE
);

-- 장르 테이블
CREATE TABLE genres (
    genre_id INT AUTO_INCREMENT PRIMARY KEY, -- 고유 ID
    genre_nm VARCHAR(255) NOT NULL -- 장르명
);

-- 영화와 장르의 관계 테이블
CREATE TABLE movie_and_genre (
    movie_id VARCHAR(20) NOT NULL, -- 영화 대표코드
    genre_id INT NOT NULL, -- 장르 ID
    PRIMARY KEY (movie_id, genre_id),
    FOREIGN KEY (movie_id) REFERENCES movie_details (movie_id) ON DELETE CASCADE,
    FOREIGN KEY (genre_id) REFERENCES genres (genre_id) ON DELETE CASCADE
);

-- 감독 테이블
CREATE TABLE directors (
    director_id INT AUTO_INCREMENT PRIMARY KEY, -- 고유 ID
    director_nm VARCHAR(255) NOT NULL -- 감독명
);

-- 영화와 감독의 관계 테이블
CREATE TABLE movie_and_director (
    movie_id VARCHAR(20) NOT NULL, -- 영화 대표코드
    director_id INT NOT NULL, -- 감독 ID
    PRIMARY KEY (movie_id, director_id),
    FOREIGN KEY (movie_id) REFERENCES movie_details (movie_id) ON DELETE CASCADE,
    FOREIGN KEY (director_id) REFERENCES directors (director_id) ON DELETE CASCADE
);

-- 배우 테이블
CREATE TABLE actors (
    actor_id INT AUTO_INCREMENT PRIMARY KEY, -- 고유 ID
    actor_nm VARCHAR(255) NOT NULL -- 배우명
);

-- 영화와 배우의 관계 테이블
CREATE TABLE movie_and_actor (
    movie_id VARCHAR(20) NOT NULL, -- 영화 대표코드
    actor_id INT NOT NULL, -- 배우 ID
    PRIMARY KEY (movie_id, actor_id),
    FOREIGN KEY (movie_id) REFERENCES movie_details (movie_id) ON DELETE CASCADE,
    FOREIGN KEY (actor_id) REFERENCES actors (actor_id) ON DELETE CASCADE
);

-- 상영형태 테이블
CREATE TABLE show_types (
    show_type_id INT AUTO_INCREMENT PRIMARY KEY, -- 고유 ID
    show_type VARCHAR(255) NOT NULL -- 상영형태
);

-- 영화와 상영형태의 관계 테이블
CREATE TABLE movie_and_show_type (
    movie_id VARCHAR(20) NOT NULL, -- 영화 대표코드
    show_type_id INT NOT NULL, -- 상영형태 ID
    PRIMARY KEY (movie_id, show_type_id),
    FOREIGN KEY (movie_id) REFERENCES movie_details (movie_id) ON DELETE CASCADE,
    FOREIGN KEY (show_type_id) REFERENCES show_types (show_type_id) ON DELETE CASCADE
);

-- 영화사 테이블
CREATE TABLE companies (
    company_id INT AUTO_INCREMENT PRIMARY KEY, -- 고유 ID
    company_nm VARCHAR(255) NOT NULL -- 영화사명
);

-- 영화와 영화사의 관계 테이블
CREATE TABLE movie_and_company (
    movie_id VARCHAR(20) NOT NULL, -- 영화 대표코드
    company_id INT NOT NULL, -- 영화사 ID
    PRIMARY KEY (movie_id, company_id),
    FOREIGN KEY (movie_id) REFERENCES movie_details (movie_id) ON DELETE CASCADE,
    FOREIGN KEY (company_id) REFERENCES companies (company_id) ON DELETE CASCADE
);

 

📌 ERD(개체 관계 다이어그램)

4. ETL 수행하기 

1) Extract (데이터 추출): API를 통해 데이터를 가져옵니다.

# 데이터 추출(Extract)
def fetch_daily_box_office(date):
    """일별 박스오피스 데이터를 API에서 가져옵니다."""
    params = {"key": API_KEY, "targetDt": date}
    response = requests.get(BOX_OFFICE_URL, params=params)
    if response.status_code == 200:
        data = response.json()
        return data["boxOfficeResult"]["dailyBoxOfficeList"]
    else:
        raise Exception(f"API 호출 실패: {response.status_code}")

def fetch_movie_info(movieCd):
    """영화 상세정보 데이터를 API에서 가져옵니다."""
    params = {"key": API_KEY, "movieCd": movieCd}
    response = requests.get(MOVIE_INFO_URL, params=params)
    if response.status_code == 200:
        data = response.json()
        return data["movieInfoResult"]["movieInfo"]
    else:
        raise Exception(f"API 호출 실패: {response.status_code}")

 

 

2) Transform (데이터 정제): 가져온 데이터를 정제합니다. 필요에 따라 데이터 형식 변환, 중복 제거, 필터링 등을 합니다.

# 데이터 변환(Transform)
def transform_data(box_office_data, movie_info_data):

    """박스오피스 데이터와 영화 상세정보 데이터를 정제합니다."""
    clean_records_box_office = []
    clean_records_movie_details = []

    for movie in box_office_data:
        movieCd = movie.get("movieCd", "")
        rank = int(movie.get("rank", 0)) # 박스오피스 순위
        movieNm = movie.get("movieNm", "") # 영화명(국문)
        openDt = movie.get("openDt", None) # 개봉일 
        salesAmt = int(movie.get("salesAmt", 0)) # 해당일의 매출액 
        salesAcc = int(movie.get("salesAcc", 0)) # 누적 매출액 
        audiAcc = int(movie.get("audiAcc", 0)) # 누적 관객수 
        scrnCnt = int(movie.get("scrnCnt", 0)) # 해당일의 상영 스크린 수
        showCnt = int(movie.get("showCnt", 0)) # 해당일의 상영 횟수 

        # 정제된 박스오피스 데이터를 저장
        clean_records_box_office.append((
            movieCd, rank, movieNm, openDt, salesAmt, salesAcc, audiAcc, scrnCnt, showCnt
        ))

        # 영화 상세 정보
        info = movie_info_data.get(movieCd, {})
        nations = [n.get("nationNm", "") for n in info.get("nations", [])] # 국가: ["미국"], ["한국", "중국"]
        genres = [g.get("genreNm", "") for g in info.get("genres", [])] # 장르: ["액션"], ["드라마", "스릴러"]
        directors = [d.get("peopleNm", "") for d in info.get("directors", [])] # 감독: [{"peopleNm": "봉준호"}]
        actors = [a.get("peopleNm", "") for a in info.get("actors", [])[:5]] # 배우: [{"peopleNm": "송강호"}, {"peopleNm": "김혜수"}]
        showTypes = [s.get("showTypeNm", "") for s in info.get("showTypes", [])] # 상영형태: ["2D", "IMAX"]
        companies = [c.get("companyNm", "") for c in info.get("companys", [])] # 제작사: [{"companyNm": "CJ 엔터테인먼트"}]
        watchGradeNm = [w.get("watchGradeNm", "") for w in info.get("audits", [])] # 관람등급: "15세 관람가", "전체 관람가"

        nation_str = ",".join(nations)
        genre_str = ",".join(genres)
        director_str = ",".join(directors)
        actor_str = ",".join(actors)
        show_type_str = ",".join(showTypes)
        company_str = ",".join(companies)
        watch_grade_str = ",".join(watchGradeNm)

        # movie_details 데이터 수집
        clean_records_movie_details.append((
            movieCd, nation_str, genre_str, director_str, actor_str, show_type_str, company_str, watch_grade_str
        ))

    return clean_records_box_office, clean_records_movie_details
  • 일단, 일별 박스오피스 데이터와 영화 상세정보 데이터를 정제한다. 
  • 이후 관계 테이블에 데이터 적재를 위해 관계 테이블 데이터를 정제한다. 

 

  • 영화 데이터와 관련된 모든 관계 정보를 처리.
  • 중복된 데이터는 삽입하지 않으며, 필요한 경우 새 데이터를 생성.
  • 각 영화와 관계를 효율적으로 매핑하며, 데이터의 일관성과 중복 방지를 보장.

 

# 관계 테이블의 column_nm이 있는지 확인하고 column_id를 가져오는 함수
def get_relation_id(connection, table_nm, column_nm): 
    """관계 테이블에서 column_nm을 기준으로 column_id를 확인하고 없으면 추가"""
    cursor = connection.cursor()
    select_query = f"SELECT {table_nm}_id FROM {table_nm} WHERE {table_nm}_nm = %s"
    cursor.execute(select_query, (column_nm,))
    result = cursor.fetchone() 

    if result:
        # 이미 존재하는 경우 ID 반환
        return result[0]
    else:
        # 새로운 데이터를 추가하고 ID 반환
        insert_query = f"INSERT INTO {table_nm} ({table_nm}_nm) VALUES (%s)"
        cursor.execute(insert_query, (column_nm,))
        # connection.commit()
        return cursor.lastrowid
    
# 영화와 *의 관계를 movie_and_relation 테이블에 추가하는 함수 
def insert_relation(connection, movie_id, table_nm, column_id):
    """영화와 관계 테이블에 매핑"""
    cursor = connection.cursor()

    # 중복 확인 쿼리
    check_query = f"SELECT 1 FROM movie_and_{table_nm} WHERE movie_id = %s AND {table_nm}_id = %s"
    cursor.execute(check_query, (movie_id, column_id))
    if cursor.fetchone():
        print(f"영화 {movie_id}와 {table_nm} {column_id}은(는) 이미 존재합니다.")
        return  # 이미 존재하면 삽입하지 않음
    
    # 중복되지 않으면 삽입
    insert_query = f"INSERT INTO movie_and_{table_nm} (movie_id, {table_nm}_id) VALUES (%s, %s)"
    cursor.execute(insert_query, (movie_id, column_id))
    connection.commit()
    print(f"영화 {movie_id}와 {table_nm} {column_id} 처리 완료")
    # connection.commit()

def process_movie_relations(connection, movie_id, nations, genres, directors, actors, showTypes, companies):
    """각 영화에 대해 국가, 장르, 감독, 배우, 상영형태, 회사와 관계 테이블을 매핑"""
    
    # 국가 처리
    for nation in nations:
        nation_id = get_relation_id(connection, "nation", nation)
        insert_relation(connection, movie_id, "nation", nation_id)
        print(f"영화 {movie_id}와 국가 {nation} 처리 완료")
    
    # 장르 처리
    for genre in genres:
        genre_id = get_relation_id(connection, "genre", genre)
        insert_relation(connection, movie_id, "genre", genre_id)
        print(f"영화 {movie_id}와 장르 {genre} 처리 완료")
    
    # 감독 처리
    for director in directors:
        director_id = get_relation_id(connection, "director", director)
        insert_relation(connection, movie_id, "director", director_id)
        print(f"영화 {movie_id}와 감독 {director} 처리 완료")
    
    # 배우 처리
    for actor in actors:
        actor_id = get_relation_id(connection, "actor", actor)
        insert_relation(connection, movie_id, "actor", actor_id)
        print(f"영화 {movie_id}와 배우 {actor} 처리 완료")
    
    # 상영형태 처리
    for show_type in showTypes:
        show_type_id = get_relation_id(connection, "show_type", show_type)
        insert_relation(connection, movie_id, "show_type", show_type_id)
        print(f"영화 {movie_id}와 상영형태 {show_type} 처리 완료")
    
    # 제작사 처리
    for company in companies:
        company_id = get_relation_id(connection, "company", company)
        insert_relation(connection, movie_id, "company", company_id)
        print(f"영화 {movie_id}와 영화사 {company} 처리 완료")

 

  • get_relation_id 함수:
    • 특정 관계 테이블(nation, genre, director, actor, show_type, company)에서 column_nm(예: 국가명, 장르명 등)을 검색
    • 해당 값이 이미 존재하면 ID를 반환
    • 존재하지 않으면 새 데이터를 삽입하고 삽입된 ID를 반환
  • insert_relation 함수:
    • 영화(movie_id)와 특정 관계 테이블 항목(table_nm의 column_id)의 관계를 movie_and_relation 테이블에 추가
    • 추가하기 전에 중복 여부를 확인
    • 중복되지 않으면 데이터를 삽입하고 관계를 매핑
  • process_movie_relations 함수:
    • 영화 ID(movie_id)를 중심으로 여러 관계(국가, 장르, 감독, 배우, 상영형태, 제작사)를 처리
    • 각 관계 데이터(nations, genres, 등등)를 순회하면서 다음 작업 수행:
      1. get_relation_id를 호출해 관계 테이블에서 ID 확인 또는 추가
      2. insert_relation을 호출해 영화와 관계 데이터를 매핑
    • 관계별로 처리 완료 메시지를 출력

 

3) Load (데이터 적재): 정제된 데이터를 MySQL에 삽입합니다. 

# 데이터 로드(Load)
def load_data_to_mysql():
    # MySQL 연결
    connection = pymysql.connect(host=MYSQL_HOST, user=MYSQL_USER, password=MYSQL_PASSWORD, db=MYSQL_DB)
    cursor = connection.cursor()

    # 1. 박스오피스 데이터 가져오기
    date = "20241127"  # 날짜 입력
    box_office_data = fetch_daily_box_office(date)

    # 2. 영화 상세 정보 가져오기
    movie_info_data = {}
    for movie in box_office_data:
        movieCd = movie.get("movieCd")
        movie_info_data[movieCd] = fetch_movie_info(movieCd)

    # 3. 데이터 변환
    clean_records_box_office, clean_records_movie_details = transform_data(box_office_data, movie_info_data)

    # 4. 박스오피스 데이터 삽입
    for record in clean_records_box_office:
        insert_sql = """
        INSERT INTO daily_box_office (
            movie_id, movie_rank, movie_nm, open_date, sales_amt, sales_acc, audi_acc, scrn_cnt, show_cnt)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE sales_amt = VALUES(sales_amt)
        """
        cursor.execute(insert_sql, record)
        connection.commit()

    # 5. 영화 상세 데이터 삽입
    for record in clean_records_movie_details:
        insert_sql = """
        INSERT INTO movie_details (
            movie_id, nation_nm, genre_nm, director_nm, actor_nm, show_type_nm, company_nm, watch_grade_nm) 
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE
            nation_nm = IFNULL(VALUES(nation_nm), nation_nm),
            genre_nm = IFNULL(VALUES(genre_nm), genre_nm),
            director_nm = IFNULL(VALUES(director_nm), director_nm),
            actor_nm = IFNULL(VALUES(actor_nm), actor_nm),
            show_type_nm = IFNULL(VALUES(show_type_nm), show_type_nm),
            company_nm = IFNULL(VALUES(company_nm), company_nm),
            watch_grade_nm = IFNULL(VALUES(watch_grade_nm), watch_grade_nm)
        """
        cursor.executemany(insert_sql, clean_records_movie_details)
        cursor.connection.commit()
    
    # 연결 종료
    connection.close()

def load_movie_relations_to_mysql(box_office_data, movie_info_data):
    connection = pymysql.connect(host=MYSQL_HOST, user=MYSQL_USER, password=MYSQL_PASSWORD, db=MYSQL_DB)
    cursor = connection.cursor()

    # 데이터 정제
    clean_records_box_office, clean_records_movie_details = transform_data(box_office_data, movie_info_data)

    # 영화 상세 정보 처리
    for movie in clean_records_movie_details:
        print(f"처리 중인 영화: {movie}\n")
        movie_id = movie[0]  # 영화ID
        nations = movie[1].split(",") if movie[1] else []  # 국가 목록
        genres = movie[2].split(",") if movie[2] else []  # 장르 목록
        directors = movie[3].split(",") if movie[3] else []  # 감독 목록
        actors = movie[4].split(",") if movie[4] else []  # 배우 목록
        showTypes = movie[5].split(",") if movie[5] else []  # 상영형태 목록
        companies = movie[6].split(",") if movie[6] else []  # 제작사 목록

        # 관계 테이블에 데이터 삽입
        process_movie_relations(connection, movie_id, nations, genres, directors, actors, showTypes, companies)

    # 한 번에 커밋
    connection.commit()
    # 연결 종료
    connection.close()

 

4) Main 함수 

if __name__ == "__main__":
    target_date = "20241127"  # 데이터를 수집할 날짜
    try:
        # MySQL 연결 생성
        connection = pymysql.connect(host=MYSQL_HOST, user=MYSQL_USER, password=MYSQL_PASSWORD, db=MYSQL_DB)
        cursor = connection.cursor() 

        # 1. 일별 박스오피스 데이터 수집
        box_office_data = fetch_daily_box_office(target_date)
        
        # 2. 각 영화의 상세정보 수집
        movie_info_data = {}
        for movie in box_office_data:
            movieCd = movie["movieCd"]
            movie_info_data[movieCd] = fetch_movie_info(movieCd)
       
        # 3. 데이터 정제
        box_office_records, movie_details_records = transform_data(
            box_office_data, movie_info_data)
     
        # 4. MySQL에 데이터 삽입
        load_data_to_mysql()  
        load_movie_relations_to_mysql(box_office_data, movie_info_data)
 
        # 연결 종료
        cursor.close()
        connection.close()

    except Exception as e:
        print(f"오류 발생: {e}")

 

5. 실행 결과

20241127 일자의 데이터를 저장한 결과, 

위와 같이 관계 테이블의 중복은 제외하고, 데이터가 잘 저장되는 것을 확인할 수 있다. 

이제, MySQL에 데이터가 적재되어 있는지 확인해 보자. 

 

1) daily_box_office(일별 박스오피스) 테이블 

daily_box_office 테이블 조회 결과

 

2) movie_details(영화 상세정보) 테이블

movie_details 테이블 조회 결과

 

3) ex. movie_id = '20240737' 모아나 2 결과 확인(일부분) 

SELECT * FROM nation;
-- 1 미국
-- 2 한국
-- 3 캐나다
-- 4 일본 
-- 5 대만 
-- 6 영국


SELECT * FROM movie_and_nation WHERE movie_id = '20240737';
-- movie_id  nation_id
-- 20240737  1
-- 20240737  3

SELECT * FROM genre;
-- genre_id  genre_nm 
-- 1		 판타지
-- 2 		 뮤지컬
-- 3 		 스릴러
-- 4 		 액션
-- 5 		 사극
-- 6 		 드라마
-- 7 		 멜로/로맨스 
-- 8 		 애니메이션
-- 9 		 코미디
-- 10 		 공연
-- 11 		 다큐멘터리

SELECT * FROM movie_and_genre where movie_id = '20240737';
-- movie_id  genre_id
-- 20240737  8
  • movie_and_nation, genre, director, actor, show_type, company 테이블 모두 데이터 중복 없이 데이터가 의도대로 삽입되는 것을 확인할 수 있다. 
  • movie_id로 '모아나 2'의  nation, movie_and_nation, genre, movie_and_genre 테이블을 확인해 본 결과, 데이터 삽입이 잘 된 것을 확인할 수 있다. 

 


다음에는, 하루치 데이터가 아닌 장기 데이터를 저장하고 간단한 데이터 분석과 데이터 시각화를 진행해 볼 예정이다. 

일단은 일주일치 데이터를 저장해 보고, 다음에는 한 달 치, 일 년 치 데이터를 저장해서 다뤄봐야겠다. 

 

이후에는 매일매일 데이터를 저장하도록 자동화 시스템을 구축해보려고 한다. 

또, 데이터 검증과 데이터 품질도 확인해 보면 좋겠다.