将json输出为excel或csv
代码
python
import json
from openpyxl import Workbook, load_workbook
from openpyxl.utils import get_column_letter
from pathlib import Path
import csv
import re
class BaseHelper:
def __init__(self, file_path, is_append=True, progress_interval=100):
self.file_path = Path(file_path)
self.is_append = is_append
self.progress_interval = progress_interval
self.headers = []
def add_data_list(self, data_array):
raise NotImplementedError("Subclasses should implement this method")
def print_rows(self):
raise NotImplementedError("Subclasses should implement this method")
def close(self):
raise NotImplementedError("Subclasses should implement this method")
class ExcelHelper(BaseHelper):
def __init__(self, file_path, is_append=True, progress_interval=100):
super().__init__(file_path, is_append, progress_interval)
self._initialize_excel(is_append)
def _initialize_excel(self, is_append):
if is_append and self.file_path.exists():
print(f'[*] 文件`{self.file_path.absolute()}`存在,追加新行')
self.workbook = load_workbook(self.file_path)
else:
print(f'[*] 新建文件{self.file_path.absolute()}')
self.workbook = Workbook()
self.sheet = self.workbook.active
self.sheet.title = 'Sheet1'
self.row_index = self.sheet.max_row
self.start_index = self.row_index
print(f'[*] 初始文件的行数:{self.row_index}')
def _create_header(self, data_array):
if self.row_index == 1:
self.headers = list(data_array[0].keys())
for i, header in enumerate(self.headers):
self.sheet.cell(row=1, column=i + 1).value = header
else:
self.headers = [cell.value for cell in self.sheet[1]]
def _format_cell(self, cell):
column_letter = get_column_letter(cell.column)
column_dimensions = self.sheet.column_dimensions[column_letter]
column_dimensions.width = max(column_dimensions.width, len(str(cell.value)))
@staticmethod
def _format_value(value):
if isinstance(value, dict):
return json.dumps(value, ensure_ascii=False)
elif isinstance(value, (int, float)):
return str(value)
return value
def _print_current_progress(self):
if self.row_index % self.progress_interval == 0:
print(f'[+] 当前行数:{self.row_index}')
def add_data_list(self, data_array):
self._create_header(data_array)
for row in data_array:
self.row_index += 1
for column_index, column_title in enumerate(self.headers):
cell = self.sheet.cell(row=self.row_index, column=column_index + 1)
cell.value = self._format_value(row.get(column_title, ''))
self._format_cell(cell)
self._print_current_progress()
def close(self):
self.workbook.save(self.file_path)
print(f'[*] 当前总行数为:{self.sheet.max_row},新增行数:{self.sheet.max_row - self.start_index}')
print(f'[*] 文件保存在路径`{self.file_path.absolute()}`')
def print_rows(self):
for row in self.sheet.iter_rows(values_only=True):
print(" | ".join([f"{str(cell):14}" for cell in row]))
class CSVHelper(BaseHelper):
def __init__(self, file_path, is_append=True, progress_interval=100):
super().__init__(file_path, is_append, progress_interval)
self.row_index = 0
self.start_index = 0
self.headers = []
if self.file_path.exists() and is_append:
self.mode = 'a'
print(f'[*] 文件`{self.file_path.absolute()}`存在,追加新行')
with open(self.file_path, newline='', encoding='utf-8') as csvfile:
reader = csv.reader(csvfile)
self.start_index = sum(1 for _ in reader)
else:
self.mode = 'w'
print(f'[*] 新建文件{self.file_path.absolute()}')
self.csvfile = open(self.file_path, self.mode, newline='', encoding='utf-8')
self.writer = csv.writer(self.csvfile, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
print(f'[*] 初始文件的行数:{self.start_index}')
self.total_rows = self.start_index
@staticmethod
def _is_match_bignum(value):
if isinstance(value, (int, float)):
return True
return isinstance(value, str) and re.match(r'^\d+$', value)
def _create_header(self, data_array):
if data_array:
self.headers = list(data_array[0].keys())
def _print_current_progress(self):
if self.row_index % self.progress_interval == 0:
print(f'[+] 当前行数:{self.row_index}')
def _write_new_row(self, row):
self.writer.writerow(row)
self.row_index += 1
self.total_rows += 1
self._print_current_progress()
def add_data_list(self, data_array):
if self.mode == 'w' and self.total_rows == 0:
self._create_header(data_array)
self._write_new_row(self.headers)
for row in data_array:
data_values = [f'\t{str(value)}\t' if self._is_match_bignum(value) else value for value in row.values()]
self._write_new_row(data_values)
# print(f"[+] {self.row_index - self.start_index} rows written on {self.file_path}")
def print_rows(self):
with open(self.file_path, newline='', encoding='utf-8') as csvfile:
reader = csv.reader(csvfile)
for row in reader:
print(" | ".join([f"{str(cell):14}" for cell in row]))
def close(self):
self.csvfile.close()
new_rows = self.total_rows - self.start_index
print(f'[*] 当前总行数为:{self.total_rows},新增行数:{new_rows}')
print(f'[*] 文件保存在路径`{self.file_path.absolute()}`')
class TableHelper:
def __init__(self, file_path, file_type='excel', is_append=True, progress_interval=100):
if file_type == 'excel':
self.helper = ExcelHelper(file_path, is_append, progress_interval)
elif file_type == 'csv':
self.helper = CSVHelper(file_path, is_append, progress_interval)
else:
raise ValueError("Unsupported file type. Use 'excel' or 'csv'.")
def add_data_list(self, data_array):
self.helper.add_data_list(data_array)
def print_rows(self):
self.helper.print_rows()
def close(self):
self.helper.close()
python
from faker import Faker
from table_helper import TableHelper
fake = Faker()
def generate_fake_user():
return {
"name": fake.name(),
"address": fake.address(),
"email": fake.email(),
"date_of_birth": fake.date_of_birth().isoformat(),
"phone_number": fake.phone_number(),
"username": fake.user_name(),
"website": fake.url()
}
num_users = 500
table_helper = TableHelper('a.xlsx', file_type='excel', is_append=True, progress_interval=100)
# table_helper = TableHelper('a.csv', file_type='csv', is_append=False, progress_interval=100)
for item in range(num_users):
data_list = generate_fake_user()
table_helper.add_data_list([data_list])
table_helper.close()