process_pdf_from_s3(bucket_name, file_key, output_prefix)
s3에서 가져온 내용을 ocr에 넣어주기 위해 한장씩 jpg 변환 후 저장
keywords.append(make_keywords_list(output_folder_s3_path))
한장씩 ocr 이용하여 추출한 키워드 리스트 (make_keywords_list
) 를 모두 하나의 리스트로 저장
flatten_array(keywords)
다중배열(처음 ocr로 추출되어진 배열 형식)을 단일배열로 만들어 빈도수 측정하도록 전처리
상위 10개 선택 혹은 빈도수 2개 이상인 것만 전처리
DB & elastic 업로드
# -------------------------------------------------------- #
# IR 아이템 하나씩 dynamodb,s3,elastic cloud에 업로드 하는 파일
# -------------------------------------------------------- #
import boto3
import uuid
from elasticsearch import Elasticsearch
import csv
import os
import io
import os
from pdf2image import convert_from_path
from ocr_connection import ocr_connect
import re
### ---------- 명사 판별 위한 자연어 처리 라이브러리 ---------- ###
from konlpy.tag import Komoran
komoran = Komoran()
def is_noun(word):
pos = komoran.pos(word)
return any(tag in ['NNG', 'NNP'] for _, tag in pos)
# elastic 연결
cloud_id = 'univ=============================='
username = 'e==========='
password = 'wNl==========='
## 리소스 세팅
# AWS 리소스 생성
s3 = boto3.resource('s3')
bucket = s3.Bucket('luck4-ir-bucket')
#dynamodb 접근
dynamodb = boto3.resource('dynamodb', region_name='ap-northeast-2')
table = dynamodb.Table('LUCK4_IR_DB')
# S3 버킷과 PDF 파일 경로 설정
bucket_name = 'luck4-ir-bucket'
pdf_file_name = 'IR_5page_test.pdf' #./IR_PROJECT/TEST/IR_5page_test.pdf
pdf_file_path=f'./IR_PROJECT/TEST/{pdf_file_name}' #### 혜수: 여기 경로만 좀 고침
pdf_file_path_0='' # 9개짜리 앞부분
pdf_file_path_1='' # 9개짜리 뒷부분
s3_file_path = f'{pdf_file_name}' # S3 내에서 저장될 경로
output_folder= f'output_jpgs' # pdf을 여러장으로 나눈 jpg 이미지들이 저장될 폴더
output_folder_path=f'./IR_PROJECT/TEST/{output_folder}'
output_folder_s3_path=f'./IR_PROJECT/TEST/output_s3_jpgs'
#unique id 생성
unique_id = str(uuid.uuid4()) # 무작위 UUID 생성
poppler_path = '/usr/bin/'
## --------------- s3에서 가져오기 ------------------------ ##
# AWS 서비스에 연결
s3_client = boto3.client('s3')
# S3 버킷 이름과 파일 키
file_key = 'IR_all_test.pdf' # 실제 파일 키로 변경
try:
# S3 파일 가져오기
response = s3_client.get_object(Bucket=bucket_name, Key=file_key)
# 파일 내용 읽기
pdf_content = response['Body'].read()
# 파일 내용 출력
print(pdf_content.decode('utf-8')) # PDF 파일의 내용을 출력
except Exception as e:
print("Error:", str(e))
## -------------- pdf를 한장씩 분리해서 jpg로 저장해주는 함수 --------------------- ##
# pdf 한장씩 변환해주는 함수-- s3용
import io
import os
import boto3
import fitz # PyMuPDF
from PIL import Image
def pdf_to_jpg(pdf_bytes):
pdf_document = fitz.open("pdf", pdf_bytes)
jpg_images = []
for page_number in range(pdf_document.page_count):
page = pdf_document[page_number]
image = page.get_pixmap(matrix=fitz.Matrix(300/72, 300/72)) # Adjust resolution as needed
jpg_stream = io.BytesIO()
img = Image.frombytes("RGB", [image.width, image.height], image.samples)
img.save(jpg_stream, format="JPEG")
jpg_stream.seek(0)
jpg_images.append(jpg_stream)
pdf_document.close()
return jpg_images
def process_pdf_from_s3(bucket_name, pdf_key, output_prefix):
s3 = boto3.client('s3')
# Get the PDF file from S3
pdf_object = s3.get_object(Bucket=bucket_name, Key=pdf_key)
pdf_bytes = pdf_object['Body'].read()
# Convert PDF to JPG images
jpg_images = pdf_to_jpg(pdf_bytes)
# Create the output folder in the local directory
output_folder = os.path.join(os.getcwd(), output_prefix)
os.makedirs(output_folder, exist_ok=True)
# Save JPGs to the output folder
jpg_paths = []
for index, jpg_image in enumerate(jpg_images):
jpg_filename = f'page_{index + 1}.jpg'
jpg_path = os.path.join(output_folder, jpg_filename)
# Save the JPG image to the local output folder
with open(jpg_path, 'wb') as jpg_file:
jpg_file.write(jpg_image.getvalue())
jpg_paths.append(jpg_path)
return jpg_paths
output_prefix='./IR_PROJECT/TEST/output_s3_jpgs/'
resulting_jpg_paths = process_pdf_from_s3(bucket_name, file_key, output_prefix)
print("Saved JPG paths:")
for jpg_path in resulting_jpg_paths:
print(jpg_path)
## -------------- keywords 뽑아내기 --------------------- ##
# output_folder에서 이미지들을 하나씩 불러와 keyword 뽑기 함수를 실행해준다
# make_keywords_list 함수 : 이미지 9개씩 keywords 배열 만드는 함수 -> 총 2번 실행시켜야 한다.
# keys : 9장의 키워드들 (반환)
# output_folder_path : 9개씩 이미지가 저장된 폴더 위치
def make_keywords_list(output_folder_path):
keys=[]
for filename in os.listdir(output_folder_path):
if filename.endswith('.jpg') or filename.endswith('.jpeg') or filename.endswith('.png'):
image_path = os.path.join(output_folder_path, filename)
print(image_path)
# OCR 한장짜리 함수 실행
result = ocr_connect(image_path)
# result = process_image(image_path) # 예시
keys.append(result) # 예시
return keys
# 전체 두번 루프를 반복하도록 한다(모으기 작업)
keywords=[]
# 한번에 할때
keywords.append(make_keywords_list(output_folder_s3_path))
## -------------- keywords에 다 모아진 상태, 정리&빈도수 측정 단계 --------------------- ##
# keywords=["keyword1", "keyword2", "keyword3"] # test 용
# print("keywords"+str(keywords)) #test
# 키워드 빈도수 측정하여 저장하는 코드
# Flatten the nested arrays into a single list
import numpy as np
def flatten_array(arr):
flat_list = []
for item in arr:
if isinstance(item, (list, np.ndarray)):
flat_list.extend(flatten_array(item))
else:
flat_list.append(item)
return flat_list
# 다중 배열을 단일 배열로 변환
flat_data = flatten_array(keywords)
# print(flat_data)
# Initialize an empty dictionary for word frequency
word_count = {}
# Loop through each word in the flattened list
for word in flat_data:
# Increment the count for each word in the dictionary
if word in word_count:
word_count[word] += 1
else:
word_count[word] = 1
# Print the word frequency dictionary
# print(word_count)
# Sort the word frequency dictionary by values in descending order
sorted_word_count = dict(sorted(word_count.items(), key=lambda item: item[1], reverse=True))
# Print the sorted word frequency dictionary
print("\\nsorted_word_count:\\n")
# print(sorted_word_count)
#### 앞에 숫자. 특수문자, 공백 다 제거하는 ####
sorted_word_count_2 = {}
for key, value in sorted_word_count.items():
field_name = re.sub(r'[^\\w\\s-]', '', key)
# if re.match(r'^[^\\w\\s-]', key):
# 특수 문자 제거 또는 유효한 특수 문자로 대체
# field_name = re.sub(r'[^\\w\\s-]', '', key)
# else:
# field_name = key
# 공백 제거하고 단어 연결하여 의미 있는 이름 생성
# field_name = ''.join(key.split())
sorted_word_count_2[field_name] = value
# print(field_name)
#딕셔너리의 key가 명사인것만 가져오기
noun_dict = {key: value for key, value in sorted_word_count_2.items() if is_noun(key)}
# print(noun_dict)
#### 배열로만 저장할 것 ####
noun_list = list(noun_dict.keys())
# print(noun_list)
#### 빈도수가 2이상만 있는 리스트 생성####
noun_list_2=[]
for key, value in noun_dict.items():
if value >= 2:
noun_list_2.append(key)
print("\\n\\nnoun_list_2\\n")
print(noun_list_2)
print("noun_list_2")
#Tagging - 상위 10개 키워드만 따로 저장
tagging_list = []
tagging_list = noun_list_2[0:10]
print("\\n tagging_list \\n")
print(tagging_list)
## --------------DB & Elastic 업로드 --------------------- ##
# Dynamo에 업로드
item={
"startup_id": '0', # PK (1씩 증가하도록)
"ir_id": 'IR#kor', # SK
"startup_name": '광진기업',
'file_id': unique_id, # 파일 ID는 고유하게 생성
's3_path': s3_file_path,
'keyword_list': noun_list_2,
'keywords_dict': noun_dict, # 키워드 배열 저장
'taggings': [{"tag": tag} for tag in tagging_list] if tagging_list else None
}
# print(item) # test
table.put_item(Item=item)
print("\\ndynamo 완료")
# elastic 에도 업로드
# es 생성
es = Elasticsearch(
cloud_id=cloud_id,
basic_auth=(username, password),
)
# Elasticsearch에 데이터 색인
es.index(index='luck4_ir_db_2', id=item['startup_id'], document=item)
print("elastic완료")