https://do-one-more.tistory.com/6

 

데이터 수집 추출 : Homebrew를 이용한 MYSQL 환경 구축 - 4장

먼저 BREW가 있다는 과정하에 글을 작성합니다.[STEP : 1 ] 맥에서는 brew install mysql을 통해 mysql을 설치할 수 있습니다.이걸 통해 설치 한 다음에 기본적인 설정을 해주면 됩니다.[STEP : 2] mysql_secure_ins

do-one-more.tistory.com

MYSQL에 데이터 베이스에서 만들었다면
MySQL workbench를 다운 받아서 좀 더 작업하기 쉽게 만들어보자!

brew install --cask mysqlworkbench

를 통해 mysql workbench를 받고 열어서 자신이 설정해서 mysql에 들어갔던데로 username과 password를 지정해주면 된다.

입력이 잘 된 모습~
높은 버전은 오류가 나는데 일단 가자
짜잔 클릭하면 들어올 수 있다!


전체추출을 해본다.

왼쪽 위에 스키마를 클릭하면 db이름을 볼 수 있다. 스키마는 DB와 동일하다고 생각하면 된다. 

그 이후에 아래 코드를 사용해서 전체추출을 해볼 수 있다. 

select * from pipeline_db.Orders;


증분추출을 해본다.

0. 증분 추출의 특징

- 추출 작업의 시간으로 데이터가 업데이트 된다면 시간

1. 증분 추출의 장점

- 증분 추출을 사용하면 업데이트 된 행을 더 쉽게 캡쳐할 수  있다.

2. 증분 추출의 단점

- 삭제된 행은 캡쳐되지 않는다.

- 원본 테이블에는 마지막으로 업데이트 된 시간에 대한 신뢰할 수 있는 타임스탬프가 있어야 한다.
- 그러기 위해서 업데이트가 된다면 업데이트 된 날짜로 데이터를 하나 더 추가해야한다.

SELECT *
FROM pipeline_db.Orders
WHERE LastUpdated > {{ last_extraction_run} };

가장 최신의 데이터인 2020-07-12:00:00 으로 날짜를 정했다.


vsCode를 이용해서 mysql에 연결해보자!

먼저 mysql과 연동할 수 있도록 하는 라이브러리를 install 해줘야 한다.

pip install pymysql

 

그 다음에 pipeline.conf라는 파일에 아까 connection을 만든 것처럼 연결 정보를 저장해야한다.

 

import pymysql
import csv
import boto3
import configparser

# mysql에 접속하기 위한 정보 받아오기
parser = configparser.ConfigParser()
parser.read("pipeline.conf")
HOSTNAME = parser.get("mysql_config", "HOSTNAME")
PORT = parser.get("mysql_config", "PORT")
DB_USER_NAME = parser.get("mysql_config", "DB_USER_NAME")
DB_PASSWORD = parser.get("mysql_config", "DB_PASSWORD")
DATABASE = parser.get("mysql_config", "DATABASE")

# mysql에 연결하기
conn = pymysql.connect(
host=HOSTNAME,
user=DB_USER_NAME,
password=DB_PASSWORD,
db=DATABASE,
port=int(PORT)
)

if conn is None:
print("Error connecting to the MySql database")
else:
print("MySQL connection established")
그럼 established 라고 나오면 접근하게 되었다.

전체추출을 해보고 CSV 파일을 S3에 올려보자

import pymysql
import csv
import boto3
import configparser

# mysql에 접속하기 위한 정보 받아오기
parser = configparser.ConfigParser()
parser.read("pipeline.conf")
HOSTNAME = parser.get("mysql_config", "HOSTNAME")
PORT = parser.get("mysql_config", "PORT")
DB_USER_NAME = parser.get("mysql_config", "DB_USER_NAME")
DB_PASSWORD = parser.get("mysql_config", "DB_PASSWORD")
DATABASE = parser.get("mysql_config", "DATABASE")

# mysql에 연결하기
conn = pymysql.connect(
host=HOSTNAME,
user=DB_USER_NAME,
password=DB_PASSWORD,
db=DATABASE,
port=int(PORT)
)

if conn is None:
print("Error connecting to the MySql database")
else:
print("MySQL connection established")

# 쿼리문 작성
m_query = "SELECT * FROM Orders;"

# 추출할 파일 이름
local_filename = "order_extract.csv"

# mysql에 연결 후 추출해서 전체 가져오기
m_cursor = conn.cursor()
m_cursor.execute(m_query)
results = m_cursor.fetchall()

# csv파일로 작성하기
with open(local_filename, 'w') as fp:
csv_w = csv.writer(fp, delimiter=',')
csv_w.writerows(results)

# 연 파일들 종료하기
fp.close()
m_cursor.close()
conn.close()

# S3에 접속하기 위한 정보 받아오기
parser = configparser.ConfigParser()
parser.read("pipeline.conf")
ACCESS_KEY = parser.get("aws_s3_config", "ACCESS_KEY")
SECRET_KEY = parser.get("aws_s3_config", "SECRET_KEY")
BUCKET_NAME = parser.get("aws_s3_config", "BUCKET_NAME")

# S3에 연결
s3 = boto3.client(
's3',
aws_access_key_id = ACCESS_KEY,
aws_secret_access_key = SECRET_KEY
)

s3_file = local_filename

# S3에 업로드하기
s3.upload_file(local_filename, BUCKET_NAME, s3_file)


데이터를 제대로 읽어온 모습을 볼 수 있다.
s3에 파일이 올라간 모습을 확인할 수 있다

 

https://github.com/woongCat/data_pipeline_guide

 

GitHub - woongCat/data_pipeline_guide: 데이터파이프라인 핵심 가이드 책을 따라해본 hub

데이터파이프라인 핵심 가이드 책을 따라해본 hub. Contribute to woongCat/data_pipeline_guide development by creating an account on GitHub.

github.com

 

+ Recent posts