Skip to content

将json输出为excel或csv

代码

python
import json
from openpyxl import Workbook, load_workbook
from openpyxl.utils import get_column_letter
from pathlib import Path
import csv
import re

class BaseHelper:
    def __init__(self, file_path, is_append=True, progress_interval=100):
        self.file_path = Path(file_path)
        self.is_append = is_append
        self.progress_interval = progress_interval
        self.headers = []

    def add_data_list(self, data_array):
        raise NotImplementedError("Subclasses should implement this method")

    def print_rows(self):
        raise NotImplementedError("Subclasses should implement this method")

    def close(self):
        raise NotImplementedError("Subclasses should implement this method")


class ExcelHelper(BaseHelper):
    def __init__(self, file_path, is_append=True, progress_interval=100):
        super().__init__(file_path, is_append, progress_interval)
        self._initialize_excel(is_append)

    def _initialize_excel(self, is_append):
        if is_append and self.file_path.exists():
            print(f'[*] 文件`{self.file_path.absolute()}`存在,追加新行')
            self.workbook = load_workbook(self.file_path)
        else:
            print(f'[*] 新建文件{self.file_path.absolute()}')
            self.workbook = Workbook()
        self.sheet = self.workbook.active
        self.sheet.title = 'Sheet1'
        self.row_index = self.sheet.max_row
        self.start_index = self.row_index
        print(f'[*] 初始文件的行数:{self.row_index}')

    def _create_header(self, data_array):
        if self.row_index == 1:
            self.headers = list(data_array[0].keys())
            for i, header in enumerate(self.headers):
                self.sheet.cell(row=1, column=i + 1).value = header
        else:
            self.headers = [cell.value for cell in self.sheet[1]]

    def _format_cell(self, cell):
        column_letter = get_column_letter(cell.column)
        column_dimensions = self.sheet.column_dimensions[column_letter]
        column_dimensions.width = max(column_dimensions.width, len(str(cell.value)))

    @staticmethod
    def _format_value(value):
        if isinstance(value, dict):
            return json.dumps(value, ensure_ascii=False)
        elif isinstance(value, (int, float)):
            return str(value)
        return value

    def _print_current_progress(self):
        if self.row_index % self.progress_interval == 0:
            print(f'[+] 当前行数:{self.row_index}')

    def add_data_list(self, data_array):
        self._create_header(data_array)
        for row in data_array:
            self.row_index += 1
            for column_index, column_title in enumerate(self.headers):
                cell = self.sheet.cell(row=self.row_index, column=column_index + 1)
                cell.value = self._format_value(row.get(column_title, ''))
                self._format_cell(cell)
            self._print_current_progress()

    def close(self):
        self.workbook.save(self.file_path)
        print(f'[*] 当前总行数为:{self.sheet.max_row},新增行数:{self.sheet.max_row - self.start_index}')
        print(f'[*] 文件保存在路径`{self.file_path.absolute()}`')

    def print_rows(self):
        for row in self.sheet.iter_rows(values_only=True):
            print(" | ".join([f"{str(cell):14}" for cell in row]))

class CSVHelper(BaseHelper):
    def __init__(self, file_path, is_append=True, progress_interval=100):
        super().__init__(file_path, is_append, progress_interval)
        self.row_index = 0
        self.start_index = 0
        self.headers = []

        if self.file_path.exists() and is_append:
            self.mode = 'a'
            print(f'[*] 文件`{self.file_path.absolute()}`存在,追加新行')
            with open(self.file_path, newline='', encoding='utf-8') as csvfile:
                reader = csv.reader(csvfile)
                self.start_index = sum(1 for _ in reader)
        else:
            self.mode = 'w'
            print(f'[*] 新建文件{self.file_path.absolute()}')

        self.csvfile = open(self.file_path, self.mode, newline='', encoding='utf-8')
        self.writer = csv.writer(self.csvfile, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
            
        print(f'[*] 初始文件的行数:{self.start_index}')
        self.total_rows = self.start_index

    @staticmethod
    def _is_match_bignum(value):
        if isinstance(value, (int, float)):
            return True
        return isinstance(value, str) and re.match(r'^\d+$', value)

    def _create_header(self, data_array):
        if data_array:
            self.headers = list(data_array[0].keys())

    def _print_current_progress(self):
        if self.row_index % self.progress_interval == 0:
            print(f'[+] 当前行数:{self.row_index}')

    def _write_new_row(self, row):
        self.writer.writerow(row)
        self.row_index += 1
        self.total_rows += 1
        self._print_current_progress()

    def add_data_list(self, data_array):
        if self.mode == 'w' and self.total_rows == 0:
            self._create_header(data_array)
            self._write_new_row(self.headers)
        for row in data_array:
            data_values = [f'\t{str(value)}\t' if self._is_match_bignum(value) else value for value in row.values()]
            self._write_new_row(data_values)
            # print(f"[+] {self.row_index - self.start_index} rows written on {self.file_path}")

    def print_rows(self):
        with open(self.file_path, newline='', encoding='utf-8') as csvfile:
            reader = csv.reader(csvfile)
            for row in reader:
                print(" | ".join([f"{str(cell):14}" for cell in row]))

    def close(self):
        self.csvfile.close()
        new_rows = self.total_rows - self.start_index
        print(f'[*] 当前总行数为:{self.total_rows},新增行数:{new_rows}')
        print(f'[*] 文件保存在路径`{self.file_path.absolute()}`')

class TableHelper:
    def __init__(self, file_path, file_type='excel', is_append=True, progress_interval=100):
        if file_type == 'excel':
            self.helper = ExcelHelper(file_path, is_append, progress_interval)
        elif file_type == 'csv':
            self.helper = CSVHelper(file_path, is_append, progress_interval)
        else:
            raise ValueError("Unsupported file type. Use 'excel' or 'csv'.")

    def add_data_list(self, data_array):
        self.helper.add_data_list(data_array)

    def print_rows(self):
        self.helper.print_rows()

    def close(self):
        self.helper.close()
python
from faker import Faker
from table_helper import TableHelper

fake = Faker()

def generate_fake_user():
    return {
        "name": fake.name(),
        "address": fake.address(),
        "email": fake.email(),
        "date_of_birth": fake.date_of_birth().isoformat(),
        "phone_number": fake.phone_number(),
        "username": fake.user_name(),
        "website": fake.url()
    }

num_users = 500

table_helper = TableHelper('a.xlsx', file_type='excel', is_append=True, progress_interval=100)
# table_helper = TableHelper('a.csv', file_type='csv', is_append=False, progress_interval=100)

for item in range(num_users):
    data_list = generate_fake_user()
    table_helper.add_data_list([data_list])

table_helper.close()

Released under the MIT License.