1부터 N까지 정수 N개로 이루어진 순열을 나타내는 방법은 여러 가지가 있다. 예를 들어, 8개의 수로 이루어진 순열 (3, 2, 7, 8, 1, 4, 5, 6)을 배열을 이용해 표현하면 \(\begin{pmatrix} 1 & 2 &3&4&5&6&7&8 \\ 3& 2&7&8&1&4&5&6 \end{pmatrix}\) 와 같다. 또는, Figure 1과 같이 방향 그래프로 나타낼 수도 있다.
순열을 배열을 이용해 \(\begin{pmatrix} 1 & \dots & i & \dots &n \\ \pi_1& \dots& \pi_i & \dots & \pi_n \end{pmatrix}\) 로 나타냈다면, i에서 πi로 간선을 이어 그래프로 만들 수 있다.
Figure 1에 나와있는 것 처럼, 순열 그래프 (3, 2, 7, 8, 1, 4, 5, 6) 에는 총 3개의 사이클이 있다. 이러한 사이클을 "순열 사이클" 이라고 한다.
N개의 정수로 이루어진 순열이 주어졌을 때, 순열 사이클의 개수를 구하는 프로그램을 작성하시오.
입력
첫째 줄에 테스트 케이스의 개수 T가 주어진다. 각 테스트 케이스의 첫째 줄에는 순열의 크기 N (2 ≤ N ≤ 1,000)이 주어진다. 둘째 줄에는 순열이 주어지며, 각 정수는 공백으로 구분되어 있다.
출력
각 테스트 케이스마다, 입력으로 주어진 순열에 존재하는 순열 사이클의 개수를 출력한다.
풀이
n = int(input())
# 바로 바꾼 버젼
for _ in range(n):
t = int(input())
adj_lst = [0] + list(map(int,input().strip().split()))
count = 0
for i in range(1,t+1):
# 순환하는 대로 가고 싶으니 현재 값을 지정함
now_num = i
if adj_lst[i]: # 값이 0이라면 순환하면 안 됨
while True: # 순환 함
next_num = adj_lst[now_num] # 다음 값을 가져오고
adj_lst[now_num] = 0 # 현재 값은 0이 됨
if next_num == 0: # 만약 다음값이 0 이라면 순환 끝 종료
count += 1
break
# 만약 순환 종료 되지 않으면 다음 값이 현재 값이 되고 순환함
now_num = next_num
print(count)
n = int(input())
# visited를 기록하는 버젼
# 이게 더 빠르다
for _ in range(n):
t = int(input())
adj_lst = [0] + list(map(int,input().strip().split()))
visited = [0] * (t+1)
count = 0
for i in range(1,t+1):
# 순환하는 대로 가고 싶으니 현재 값을 지정함
now_num = i
if visited[i] == 0:
while True: # 방문하지 않았다면 순환함
next_num = adj_lst[now_num] # 방문할 장소 기록
visited[now_num] = 1 # 현재는 방문한 걸로 기록
if visited[next_num] == 1:
count += 1
break
# 만약 순환 종료 되지 않으면 다음 값이 현재 값이 되고 순환함
now_num = next_num
print(count)
# 인접 리스트를 이용한 그래프 탐색
found link(root) 1
visit 1
found link 2
visit 2
found link 3
visit 3
link not found
link not found
found link 3
already visited
found link 4
visit 4
found link 3
already visited
link not found
link not found
end
# 수행 시간 계산 - 정점의 제곱 visit : 4 # 노드의 개수만큼 visit함 found link : 5 + 1 # 링크의 개수만큼 found함 link not found : 4 # 노드 개수만큼 link not found already visited : 2 #
정점의 개수 : N = 4, 간선의 개수 : M = 5
수행시간 : T = visit + found link +link not found + already visited = (visit + aleardy visited) + found link + link not found = 2 * found link + link not found = 2 * (M+1) + N => O(M+N)
# 인접 행렬을 이용한 그래프 탐색
visited 1: o
visited 2: o
visited 3: o
visited 4: x
found link(root) 1
visit 1
link not found 1
found link 2
visit 2
link not found 1
link not found 2
found link 3
visit 3
link not found 1
link not found 2
link not found 3
link not found 4
link not found 4
found link 3
already visited
found link 4
visit 4
link not found 1
link not found 2
found link 3
already visited
link not found 4
end
# 수행 시간 계산 -정점의 제곱 found link : 6 link not found : 11 visit + already visited = found link found link + link not found = visit * n
풀이
A, B, C = map(int,input().split())
# G[x] = [, , , ,]: x에 연결된 정점들
adj_lst = [[] for _ in range(11)]
adj_mat = [[0] * 11 for _ in range(11)]
for x in range(1,11):
for y in range(1,11):
if A*x + B*y == C:
adj_lst[x].append(y) # 인접 리스트를 만드는 방법
adj_mat[x][y] = 1 # 인접 행렬을 만드는 방법
def print_adj_mat(adj_mat):
for i in range(1,11): # 인접 행렬로 출력하는 방식
conn = [j for j in range(1,11) if adj_mat[i][j] == 1]
if conn:
print(*conn)
else:
print(0)
def print_adj_lst(adj_lst):
for i in range(1,11):
if adj_lst[i]:
print(*adj_lst[i])
else:
print(0)
널리 잘 알려진 자료구조 중 최대 힙이 있다. 최대 힙을 이용하여 다음과 같은 연산을 지원하는 프로그램을 작성하시오.
배열에 자연수 x를 넣는다.
배열에서 가장 큰 값을 출력하고,그 값을 배열에서 제거한다.
프로그램은 처음에 비어있는 배열에서 시작하게 된다.
입력
첫째 줄에 연산의 개수 N(1 ≤ N ≤ 100,000)이 주어진다. 다음 N개의 줄에는 연산에 대한 정보를 나타내는 정수 x가 주어진다. 만약 x가 자연수라면 배열에 x라는 값을 넣는(추가하는) 연산이고, x가 0이라면 배열에서 가장 큰 값을 출력하고 그 값을 배열에서 제거하는 경우이다. 입력되는 자연수는 231보다 작다.
출력
입력에서 0이 주어진 횟수만큼 답을 출력한다. 만약 배열이 비어 있는 경우인데 가장 큰 값을 출력하라고 한 경우에는 0을 출력하면 된다.
python의 내장된 heapq 라이브러리는 최소 힙을 기본으로 한다. 여기서는 최소힙을 통해 최대 힙을 만들면 된다.
heapq.heappush(heap, x) : 힙에 값을 넣는다. 그리고 최소 힙에 맞게 리스트의 처음이 최솟값으로 정렬되어있는다.
heapq.heappop(heap) : 가장 작은 값을 꺼내고 힙에서 삭제한다.
최소 힙이므로 최대 힙을 만들기 위해 넣어야 하는 값을 음수로 변환해서 튜플 형식으로 같이 값을 넣어주면 된다.
heapq.heappush(max_heap,(-x, x)) 이런식으로 값을 넣어주면 된다. 그리고 튜플에 2번째 값을 출력하면 된다.
import heapq
import sys
input = sys.stdin.readline
n = int(input())
max_heap = []
for _ in range(n):
x = int(input())
if x == 0:
if max_heap:
print(heapq.heappop(max_heap)[1])
else:
print(0)
else:
heapq.heappush(max_heap,(-x,x))
print(max_heap)
다솜이는 나머지와 투표 수와 같거나 작으면 안 된다. 나머지의 투표 수는 다솜이보다 크거나 같으면 안 된다.
1. 다솜이의 투표수를 가져온다.
2. 다솜이보다 크거나 같은 나머지의 투표 수를 구한다.
3. 가장 큰 나머지의 투표 수를 -1하고 다솜이의 투표 수를 +1 한다.
4. 다솜이의 투표 수가 나머지 투표 수보다 크다면 반복을 종료 한다.
n = int(input())
dasom = int(input())
if n == 1:
print(0)
else:
votes = []
for _ in range(n-1): # 다솜이의 투표 수보다 나머지 투표 수가 크거나 같은 경우만 저장
vote_size = int(input())
if vote_size >= dasom:
votes.append(vote_size)
if votes: # 다솜이의 투표수가 제일 크지 않은 경우에만 실시 = 매수할 필요가 없는 경우
count = 0
while True:
dasom += 1
votes[votes.index(max(votes))] -= 1
count += 1
if dasom > max(votes): # 매수를 해서 다솜이의 투표소가 제일 큰 경우 정지
break
print(count)
else: # 다솜이의 투표수가 제일 크다면 매수할 필요 없음
print(0)
어차피 로컬에서 분석을 돌리고 저장을 sql에다가 하면 되기 때문에 지금은 필요 없다고 판단 했다.
그리고 기존에 로컬에 있던 json,csv 파일은 전부 mongodb에 저장하기로 했다.
그리고 Connecting to로 Vscode에 MongoDB Extension을 추가해 연결했다.
vscode에 연결하기
그래서 이제부터는 지금까지 local에 파일로 저장되거나 elastic에 저장되도록 되어있는 코드들을 전부 MongoDB에 저장되도록 변경해야 한다.
db_utils라는 파일에 함수를 만들었다.
from pymongo import MongoClient
from dotenv import load_dotenv
import os
def connect_to_mongo(db_name, collection_name):
"""MongoDB 연결"""
load_dotenv()
pwd = os.getenv("MONGODB_PWD") # 나중에 암호를 추가해야할 수 있음
client = MongoClient(f"mongodb://localhost:27017/")
db = client[db_name]
collection = db[collection_name]
return client, db, collection
def save_to_mongo(data, db_name, collection_name, mongo_uri="mongodb://localhost:27017/"):
"""
데이터를 MongoDB에 저장하는 함수.
:param data: 저장할 데이터 (리스트 또는 딕셔너리)
:param db_name: 사용할 데이터베이스 이름
:param collection_name: 사용할 컬렉션 이름
:param mongo_uri: MongoDB 연결 URI (기본값: 로컬 MongoDB)
"""
try:
client, collection = connect_to_mongo(db_name,collection_name,mongo_uri)
# 데이터 저장
if isinstance(data, list): # 리스트 형태면 여러 개 저장
collection.insert_many(data)
else: # 단일 딕셔너리면 하나만 저장
collection.insert_one(data)
print(f"✅ 데이터 저장 완료! (DB: {db_name}, Collection: {collection_name})")
except Exception as e:
print(f"❌ 데이터 저장 실패: {e}")
finally:
client.close()
그 이후 데이터를 mongodb에 추가했다.
from common.api_utils import load_api_key, fetch_data
from common.db_utils import save_to_mongo
if __name__ == "__main__":
url = "https://open.assembly.go.kr/portal/openapi/nekcaiymatialqlxr"
key = load_api_key()
if key is None:
raise ValueError("❌ OPEN_GOVERMETN_API_KEY 환경 변수가 설정되지 않았습니다.")
all_data = fetch_data(url, key)
save_to_mongo(all_data, "government","congress_scedhule")
아래와 같이 얻은 Mongodb 데이터를 확인할 수 있다.
result = collection.find({},{"MEETTING_DATE":1,"_id":0}).to_list()
sql 문으로 치면 아래와 같다.
SELECT MEETTING_DATE
FROM CONGRESS_SCEDHULE;
앞에 {}에는 where절이고 뒤에 {}에는 select절이고 0과 1로 들어감 여부를 확인할 수 있다.
from pymongo import MongoClient
# MongoDB 서버에 연결
client = MongoClient("mongodb://localhost:27017/")
db = client["government"]
collection = db["congress_scedhule"]
# 데이터베이스 목록 출력
db_list = client.list_database_names()
print("✅ 현재 MongoDB 데이터베이스 목록:", db_list)
# 데이터 조회
result = collection.find({},{"MEETTING_DATE":1,"_id":0}).to_list()
print("✅ MongoDB 데이터 확인:", result)
# 연결 종료
client.close()
RDS와 연결하기
brew install postgresql
psql postgres
1. postgresql을 설치하고 연결한다.
2. 그 다음에 CREATE DATABASE {원하는 데이터베이스 이름}을 추가해 db에 추가한다.
3. vscode에 sqltools라는 extension에 값을 추가한다.
4. database 아키텍쳐를 설계한다.
5. 테이블을 만들어준다.
6. psycopg2-binary를 통해 MongoDB에서 값을 가져와 테이블에 insert 한다.
지금 만들 테이블
CREATE TABLE CONGRESS_SCHEDULE(
meeting_date DATE PRIMARY KEY,
get_pdf BOOLEAN NOT NULL DEFAULT FALSE
);
CREATE TABLE COMITTEE_SCHEDULE(
meeting_date DATE PRIMARY KEY,
get_pdf BOOLEAN NOT NULL DEFAULT FALSE
);
def get_schedule(db_name, collection_name):
"""위원회 및, 국회의원회의 날짜를 가져오는 함수"""
try:
client, db, collection = connect_to_mongo(db_name,collection_name)
result = collection.find({},{"MEETTING_DATE":1,"_id":0}).to_list()
print("✅ MongoDB 날짜 데이터 가져옴:")
return result
except Exception as e:
print(f"❌ mongodb 날짜 데이터 저장 실패: {e}")
finally:
client.close()
def connect_to_postgresql():
"""postgresql에 연결하는 함수"""
load_dotenv()
conn = psycopg2.connect(
host = os.getenv("SQL_HOST"),
dbname = os.getenv("SQL_DBNAME"),
user = os.getenv("SQL_USER"),
password = os.getenv("SQL_PWD"),
port = os.getenv("SQL_PORT")
)
return conn
def schedule_to_postgresql(schedule_list,collection_name):
try:
conn = connect_to_postgresql()
cur = conn.cursor()
for meetting_date in schedule_list:
date = meetting_date["MEETTING_DATE"]
cur.execute(f"""
INSERT INTO {collection_name} (meeting_date)
VALUES (%s)
ON CONFLICT (meeting_date) DO NOTHING
""", (date,))
conn.commit()
print("✅ postgres 데이터 저장 성공")
except Exception as e:
conn.rollback()
print(f"❌ postgres 데이터 저장 실패 : {e}")
finally:
conn.close()
import sys
import os
# 현재 스크립트 파일(get_meeting_schedule.py)의 디렉터리에서 상위 두 개 폴더를 경로에 추가
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../..")))
📌 설명
• os.path.dirname(__file__)→ 현재 파일(get_meeting_schedule.py)이 있는 디렉터리를 가져옴.
• os.path.join(..., "../..")→ 현재 폴더에서두 단계 위로 이동(extract_data기준).
2. 발언자는 직책과 이름 2단어로 구성되어 있고 그 사이에 공백이 있다. = ([\w]+ [\w]+)
3. 발언자와 발언 사이에는 공백이 여러개가 있다. = \s*\n*
4. 그 다음에 발언이 온다. +?를 사용한 이유는 ◯을 만나면 멈추기 위함이다. = ([\s\S]+?)
5. 다음 발언자 또는 문서 끝까지 매칭한다. = (?=\n◯|\Z)
[STEP 2 : 발언자, 발언 내용을 이용해 엘라스틱 서치에 들어갈 내용을 추가한다.]
def get_speaker_data(id,title,date,text):
""" 텍스트에서 발언자와 발언 내용을 추출하는 함수 """
speaker_pattern = re.compile(r"◯([\w]+ [\w]+)\s*\n*([\s\S]+?)(?=\n◯|\Z)", re.MULTILINE)
speech_list = []
for match in speaker_pattern.finditer(text):
speaker = match.group(1).strip()
speech = match.group(2).strip()
speech_list.append({
"document_id": id,
"title": title,
"date": date,
"speaker": speaker, # 발언자
"text": speech, # 발언
"summary": None, # 요약은 나중에 추가
'timestamp': datetime.now(), # 편집날짜
})
return speech_list
근데, elastic search는 어떻게 사용하나요?
아하, 저도 잘 몰라서 요것저것 알아봤습니다!
생각보다 딱 정리된 블로그가 없어서 애 좀 먹었습니다.
먼저 local로 설치하는 방법과 docker로 사용하는 방법이 있는데 손쉽게 하기 위해서 docker를 사용하기로 했습니다!
[STEP 3 : Docker를 이용해 elasticSearch를 가져온다.]
docker run -d --name elasticsearch -p 9200:9200 -e "discovery.type=single-node" docker.elastic.co/elasticsearch/elasticsearch:8.6.0
이렇게 설정해주면 된다.
그리고 .env에 넣어주면 된다.
근데, password라고 환경변수를 설정하지 말자. 자꾸 오류가 난다.
나는 결국, ELASTIC_SEARCH_PW라고 변수명을 바꾸고 나서야 됐다.
만약, 이걸 통해 비번을 바꿔주지 않는다면 로그를 확인하면 http traffic on an https channel을 볼 수 있고
elastic_transport.ConnectionError: 가 나오는 것을 확인할 수 있다.
docker logs elasticsearch
elastic_transport.ConnectionError: Connection error caused by: ConnectionError(Connection error caused by: ProtocolError(('Connection aborted.', RemoteDisconnected('Remote end closed connection without response'))))
[STEP 4 : elasticSearch에 pdf data를 추가한다.]
def connect_to_elasticsearch():
"""Elasticsearch에 연결하는 함수"""
load_dotenv()
Elasticsearch_key = os.getenv("ELASTIC_SEARCH_PW")
# Elasticsearch 연결
es = Elasticsearch(
"https://localhost:9200",
basic_auth=("elastic", Elasticsearch_key), # 인증 추가
verify_certs=False # 자체 서명된 인증서 문제 방지
)
# 인덱스 설정 (인덱스가 없으면 생성)
index_name = "congress_meetings"
if not es.indices.exists(index=index_name):
es.indices.create(index=index_name)
return es,index_name
def save_to_elasticsearch(data):
""" 추출된 국회의원 발언 데이터를 Elasticsearch에 저장하는 함수 """
es, index_name = connect_to_elasticsearch()
for entry in data:
es.index(index=index_name, body=entry)
print("Elasticsearch 저장 완료!")
[STEP 5:elasticSearch에 들어간 내용 확인하기.]
from elasticsearch import Elasticsearch
import os
# .env에서 환경변수 불러오기
from dotenv import load_dotenv
load_dotenv()
# 환경변수에서 비밀번호 가져오기
ELASTIC_PASSWORD = os.getenv("ELASTIC_SEARCH_PW")
# Elasticsearch 클라이언트 설정
es = Elasticsearch(
"https://localhost:9200",
basic_auth=("elastic", ELASTIC_PASSWORD), # 인증 추가
verify_certs=False # 자체 서명된 인증서 문제 방지
)
# 인덱스명 설정 (저장된 데이터가 들어 있는 인덱스)
index_name = "congress_meetings"
# 모든 데이터 조회
def get_all_documents():
query = {
"query": {
"match_all": {}
}
}
res = es.search(index=index_name, body=query, size=10) # 최대 10개 문서 조회
return res['hits']['hits']
# 데이터 가져오기
documents = get_all_documents()
for doc in documents:
print(f"문서 ID: {doc['_id']}, 내용: {doc['_source']}")
여기도 마찬가지로 대수가 나왔는데, 대수는 국회의원 대수고 예시에 맞춰서 '100022'면 22대 국회의원을 가져옵니다.
어디 나와있는 건 아닌데, 직관적으로 값을 넣어야 할 때도 있는 거 같습니다.
[STEP 1] 회의록 일정 가져오기
그럼 일정부터 json으로 저장하겠습니다.
import requests
from dotenv import load_dotenv
import json
import os
# 나중에 airflow에 추가되야 하는 코드
load_dotenv()
key = os.getenv("OPEN_GOVERMETN_API_KEY")
url = "https://open.assembly.go.kr/portal/openapi/nekcaiymatialqlxr"
params = {
"KEY" : key,
"Type" : 'json',
"pIndex" : '1',
"pSize" : "100",
"UNIT_CD" : "100022",
}
# 요청 받기
response = requests.get(url=url, params=params)
# json으로 저장 받기
file_path = 'congress_meeting/congress_schedule.json'
with open(file_path, 'w') as f:
json.dump(response.json(), f)
print(response.json())
[STEP 2] 본회의 기록 가져오기
이제 json을 저장했으니 다시 본 회의 회의록을 구하러 가봅시다~
받아온 json 안에서 국회 일정 리스트로 필수인자들을 가져오고 반복문을 통해서 다시 회의록이 담긴 json을 가져오면 됩니다~
import requests
from dotenv import load_dotenv
import os
import json
# 추후 dag로 변경될 필요 있음
load_dotenv()
key = os.getenv("OPEN_GOVERMETN_API_KEY")
def get_conf_dates():
# 국회 일정 리스트 가져오기
conf_date_list = []
with open("congress_meeting/congress_schedule.json" ,'r') as file:
schedule_file = json.load(file)
data_count = schedule_file['nekcaiymatialqlxr'][0]['head'][0]['list_total_count'] # 국회 일정 개수
for i in range(data_count):
conf_date_list.append(schedule_file['nekcaiymatialqlxr'][1]['row'][i]['MEETTING_DATE'])
return conf_date_list
def get_responses():
# 국회 회의록 가져오기
url = "https://open.assembly.go.kr/portal/openapi/nzbyfwhwaoanttzje"
conf_date_list = get_conf_dates()
for conf_date in conf_date_list:
print(f"데이터를 저장중입니다.{conf_date}")
params = {
"KEY" : key,
"Type" : 'json',
"pIndex" : '1',
"pSize" : "100",
"DAE_NUM" : "22",
"CONF_DATE" : conf_date
}
response = requests.get(url=url, params=params)
if response.status_code == 200:
with open (f"congress_meeting/meetings/{conf_date}.json",'w') as f:
json.dump(response.json(), f)
else:
print(response.text)
response.raise_for_status()
if __name__ == "__main__":
get_responses()
디렉토리를 하나 만들어서 json 파일들을 따르륵 저장해줍니다.
[STEP 3] pdfplumber로 URL에서 pdf 글자로 추출
이후에는 안에 pdf 파일들을 가져와서 pdfplumber를 사용해 글자만 가져옵니다.
pdf를 저장하지 않고 일단은 메모리를 통해 내용을 추출해옵니다.
import requests
import pdfplumber
import io
# PDF 다운로드 URL
url = "http://likms.assembly.go.kr/record/mhs-10-040-0040.do?conferNum=054706&fileId=0000124454&deviceGubun=P&imsiYn=P"
# HTTP 요청하여 PDF 다운로드
response = requests.get(url)
response.raise_for_status() # 오류 발생 시 예외 처리
# PDF 내용을 메모리에서 직접 읽기
with pdfplumber.open(io.BytesIO(response.content)) as pdf: # 반면, io.BytesIO를 사용하면 메모리에서 직접 처리할 수 있어 더 효율적입니다.
for page in pdf.pages:
print(page.extract_text()) # 페이지별 텍스트 출력
# 이후에는 이 내용을 어떻게 저장할지 데이터베이스로 만들어야 함
이렇게 하면 저장된 데이터에 pdf를 읽어올 수 있고 이를 반복문으로 json 파일에서 읽게 해오면 됩니다.
[STEP 4] URL에서 PDF 추출 후 데이터베이스에 적재
근데 어떻게 해야 데이터를 효율적으로 저장할 수 있을 지 고민이 됩니다. 1. full text
2. csv or json
3. elastic search
가장 사용해보고 싶은 방식은 elastich search로 가져오고 싶습니다.
왜냐하면 국회의원 별로 어떤 발언을 했는 지 요약을 하기 위함입니다.
elastic search에 역색인을 통해서 국회의원 발언을 검색해서 국회의원 별 발언을 정리해보고 싶습니다.
그래서 데이터베이스를 어떤 모양으로 만들어야 할지가 관건입니다.
국회의원만 있으면 데이터가 처리되는지 국회의 진행방식과 pdf를 눈으로 확인하고 데이터를 어떻게 저장할지 결정해야 할 거 같습니다.