You cannot select more than 25 topics.
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
284 lines
12 KiB
284 lines
12 KiB
"""This module handles all operations involving importing reports.""" |
|
|
|
import shutil |
|
import platform |
|
import ctypes |
|
import csv |
|
from os import path, makedirs |
|
from PyQt5.QtCore import QModelIndex, QDate, Qt |
|
from PyQt5.QtWidgets import QWidget, QDialog, QDialogButtonBox |
|
from PyQt5.QtGui import QStandardItemModel, QStandardItem |
|
from PyQt5 import QtWidgets |
|
|
|
import GeneralUtils |
|
from Constants import * |
|
from ui import ImportReportTab, ReportResultWidget |
|
from ManageVendors import Vendor |
|
from FetchData import ALL_REPORTS, CompletionStatus |
|
from Settings import SettingsModel |
|
from ManageDB import UpdateDatabaseWorker |
|
|
|
|
|
class ProcessResult:
    """Outcome record for a single report import attempt.

    A freshly created result is marked successful, carries no message and
    has empty file name / directory / path fields; the import routine
    updates these as it progresses.

    :param vendor: The vendor the report is imported for
    :param report_type: The report type being imported
    """

    def __init__(self, vendor: Vendor, report_type: str):
        # What the import was requested for
        self.vendor, self.report_type = vendor, report_type

        # Where the imported copy was stored (all empty until filled in)
        self.file_path = self.file_dir = self.file_name = ""

        # Status shown to the user, plus an optional explanatory message
        self.message = ""
        self.completion_status = CompletionStatus.SUCCESSFUL
|
|
|
|
|
class ImportReportController:
    """Controls the Import Report tab

    Wires the tab's widgets (vendor list, report type list, year selector,
    file chooser and import button) to handlers, validates the header of the
    chosen report file, copies the file into the yearly and protected
    directories, and hands the protected copy to the database worker.

    :param vendors: The list of vendors in the system
    :param settings: The user's settings
    :param import_report_widget: The import report widget.
    :param import_report_ui: The UI for the import_report_widget.
    """

    def __init__(self, vendors: list, settings: SettingsModel, import_report_widget: QWidget,
                 import_report_ui: ImportReportTab.Ui_import_report_tab):

        # region General
        self.import_report_widget = import_report_widget
        self.vendors = vendors
        self.date = QDate.currentDate()
        # -1 means "nothing selected yet" for both list views
        self.selected_vendor_index = -1
        self.selected_report_type_index = -1
        self.selected_file_path: str = ""
        self.settings = settings
        # Kept as an attribute so the dialog is not garbage collected while shown
        self.result_dialog = None
        # endregion

        # region Vendors
        self.vendor_list_view = import_report_ui.vendors_list_view_import
        self.vendor_list_model = QStandardItemModel(self.vendor_list_view)
        self.vendor_list_view.setModel(self.vendor_list_model)
        self.vendor_list_view.clicked.connect(self.on_vendor_selected)
        self.update_vendors_ui()
        # endregion

        # region Report Types
        self.report_type_list_view = import_report_ui.report_types_list_view_import
        self.report_type_list_model = QStandardItemModel(self.report_type_list_view)
        self.report_type_list_view.setModel(self.report_type_list_model)
        self.report_type_list_view.clicked.connect(self.on_report_type_selected)
        for report_type in ALL_REPORTS:
            item = QStandardItem(report_type)
            item.setEditable(False)
            self.report_type_list_model.appendRow(item)
        # endregion

        # region Others
        self.year_date_edit = import_report_ui.report_year_date_edit
        self.year_date_edit.setDate(self.date)
        self.year_date_edit.dateChanged.connect(self.on_date_changed)

        self.select_file_btn = import_report_ui.select_file_button
        self.select_file_btn.clicked.connect(self.on_select_file_clicked)

        self.selected_file_edit = import_report_ui.selected_file_edit

        self.import_report_button = import_report_ui.import_report_button
        self.import_report_button.clicked.connect(self.on_import_clicked)
        # endregion

    def on_vendors_changed(self, vendors: list):
        """Handles the signal emitted when the system's vendor list is updated

        :param vendors: An updated list of the system's vendors
        """
        # The old row index may no longer point at the same vendor
        self.selected_vendor_index = -1
        self.update_vendors(vendors)
        self.update_vendors_ui()

    def update_vendors(self, vendors: list):
        """Replaces the local vendor list with the given one

        :param vendors: A list of vendors
        """
        self.vendors = vendors

    def update_vendors_ui(self):
        """Rebuilds the vendor list view from the local vendor list"""
        self.vendor_list_model.clear()
        for vendor in self.vendors:
            item = QStandardItem(vendor.name)
            item.setEditable(False)
            self.vendor_list_model.appendRow(item)

    def on_vendor_selected(self, model_index: QModelIndex):
        """Handles the signal emitted when a vendor is selected"""
        self.selected_vendor_index = model_index.row()

    def on_report_type_selected(self, model_index: QModelIndex):
        """Handles the signal emitted when a report type is selected"""
        self.selected_report_type_index = model_index.row()

    def on_date_changed(self, date: QDate):
        """Handles the signal emitted when the target date is changed"""
        self.date = date

    def on_select_file_clicked(self):
        """Handles the signal emitted when the select file button is clicked"""
        file_path = GeneralUtils.choose_file(TSV_FILTER + CSV_FILTER)
        # A falsy result leaves the previous selection untouched
        if file_path:
            self.selected_file_path = file_path
            self.selected_file_edit.setText(file_path)

    def on_import_clicked(self):
        """Handles the signal emitted when the import button is clicked

        Prompts the user for whichever input is still missing (vendor, report
        type, file); otherwise runs the import and shows its result.
        """
        if self.selected_vendor_index == -1:
            GeneralUtils.show_message("Select a vendor")
            return
        if self.selected_report_type_index == -1:
            GeneralUtils.show_message("Select a report type")
            return
        if self.selected_file_path == "":
            GeneralUtils.show_message("Select a file")
            return

        vendor = self.vendors[self.selected_vendor_index]
        report_type = ALL_REPORTS[self.selected_report_type_index]

        process_result = self.import_report(vendor, report_type)
        self.show_result(process_result)

    def import_report(self, vendor: Vendor, report_type: str) -> ProcessResult:
        """Imports the selected file using the selected parameters

        The file header is validated first; only then is the file copied to
        the yearly directory and to the protected directory, and the
        protected copy passed to the database worker.

        This method does not raise: any exception is caught, its text is
        stored in the result's ``message`` and the result is marked FAILED.

        :param vendor: The target vendor
        :param report_type: The target report type
        :returns: The result of the import process
        """
        process_result = ProcessResult(vendor, report_type)

        # Broad catch is deliberate: every failure is reported to the user
        # through the result dialog instead of propagating into the UI loop
        try:
            dest_file_dir = GeneralUtils.get_yearly_file_dir(self.settings.yearly_directory, vendor.name, self.date)
            dest_file_name = GeneralUtils.get_yearly_file_name(vendor.name, report_type, self.date)
            dest_file_path = f"{dest_file_dir}{dest_file_name}"

            # Validate before touching the file system so that an invalid
            # file does not leave an empty destination directory behind
            self._validate_report_header(self.selected_file_path, report_type)

            # Make sure dest_file_dir exists
            makedirs(dest_file_dir, exist_ok=True)

            # Copy selected_file_path to dest_file_path
            self.copy_file(self.selected_file_path, dest_file_path)

            process_result.file_dir = dest_file_dir
            process_result.file_name = dest_file_name
            process_result.file_path = dest_file_path
            process_result.completion_status = CompletionStatus.SUCCESSFUL

            # Save protected tsv file
            year = self.date.toString('yyyy')
            protected_file_dir = f"{PROTECTED_DATABASE_FILE_DIR}{year}/{vendor.name}/"
            if not path.isdir(protected_file_dir):
                makedirs(protected_file_dir, exist_ok=True)
                if platform.system() == "Windows":
                    ctypes.windll.kernel32.SetFileAttributesW(PROTECTED_DATABASE_FILE_DIR, 2)  # Hide folder

            protected_file_path = f"{protected_file_dir}{dest_file_name}"
            self.copy_file(self.selected_file_path, protected_file_path)

            # Add file to database
            database_worker = UpdateDatabaseWorker([{'file': protected_file_path,
                                                     'vendor': vendor.name,
                                                     'year': int(year)}],
                                                   False)
            database_worker.work()

        except Exception as e:
            process_result.message = f"Exception: {e}"
            process_result.completion_status = CompletionStatus.FAILED

        return process_result

    @staticmethod
    def _validate_report_header(file_path: str, report_type: str):
        """Checks that the file at file_path starts with a valid report header

        The first HEADER_ROWS rows must have, in order, the keys listed in
        HEADER_ENTRIES in their first cell (compared in lower case); they
        must be followed by BLANK_ROWS rows whose first cell is blank. The
        'report_id' value must equal report_type and 'created' must be set.

        :param file_path: Path of the file to validate
        :param report_type: The report type the header must declare
        :raises Exception: If the file type is unsupported, the file cannot
            be opened, or the header is invalid
        """
        # The delimiter is looked up by the last four characters of the path
        extension = file_path[-4:].lower()
        try:
            delimiter = DELIMITERS[extension]
        except KeyError:
            raise Exception(f"Unsupported file type '{extension}'") from None

        try:
            # newline='' is what the csv module expects from its input file
            file = open(file_path, 'r', encoding='utf-8-sig', newline='')
        except OSError as e:
            raise Exception(f'Could not open file ({e})') from e

        # The with-block closes the handle even when validation fails
        with file:
            reader = csv.reader(file, delimiter=delimiter, quotechar='"')

            header = {}
            for row in range(HEADER_ROWS):  # reads header row data
                expected_key = HEADER_ENTRIES[row]
                # None marks a file that ends before the header is complete
                cells = next(reader, None)
                if not cells or cells[0].lower() != expected_key:
                    raise Exception('File has invalid header (missing row ' + expected_key + ')')
                # A row without a value cell counts as an empty value so the
                # specific checks below can report it
                header[expected_key] = cells[1].strip() if len(cells) > 1 else ""

            for _ in range(BLANK_ROWS):
                cells = next(reader, None)
                if cells is None or (cells and cells[0].strip()):
                    raise Exception('File has invalid header (not enough blank rows)')

            if header.get('report_id') != report_type:
                raise Exception('File has invalid header (wrong Report_Id)')
            if not header.get('created'):
                raise Exception('File has invalid header (no Created date)')

    def copy_file(self, origin_path: str, dest_path: str):
        """Copies a file from origin_path to dest_path"""
        shutil.copy2(origin_path, dest_path)

    def show_result(self, process_result: ProcessResult):
        """Shows the result of the import process to the user

        Builds a dialog holding a report result widget and an OK button. On
        success the saved file name is shown and the file / folder can be
        opened from the dialog; on failure the error message is shown.

        :param process_result: The result of the import process
        """
        self.result_dialog = QDialog(self.import_report_widget, flags=Qt.WindowCloseButtonHint)
        self.result_dialog.setWindowTitle("Import Result")
        vertical_layout = QtWidgets.QVBoxLayout(self.result_dialog)
        vertical_layout.setContentsMargins(5, 5, 5, 5)

        report_result_widget = QWidget(self.result_dialog)
        report_result_ui = ReportResultWidget.Ui_ReportResultWidget()
        report_result_ui.setupUi(report_result_widget)

        button_box = QtWidgets.QDialogButtonBox(QDialogButtonBox.Ok, self.result_dialog)
        button_box.setCenterButtons(True)
        button_box.accepted.connect(self.result_dialog.accept)

        vendor = process_result.vendor
        report_type = process_result.report_type

        report_result_ui.report_type_label.setText(f"{vendor.name} - {report_type}")
        report_result_ui.success_label.setText(process_result.completion_status.value)

        if process_result.completion_status == CompletionStatus.SUCCESSFUL:
            report_result_ui.message_label.hide()
            report_result_ui.retry_frame.hide()

            report_result_ui.file_label.setText(f"Saved as: {process_result.file_name}")
            # Clicking the label opens the imported file
            report_result_ui.file_label.mousePressEvent = \
                lambda event: GeneralUtils.open_file_or_dir(process_result.file_path)

            report_result_ui.folder_button.clicked.connect(
                lambda: GeneralUtils.open_file_or_dir(process_result.file_dir))

            # Overrides the status text set above
            report_result_ui.success_label.setText("Successful!")

        elif process_result.completion_status == CompletionStatus.FAILED:
            report_result_ui.file_frame.hide()
            report_result_ui.retry_frame.hide()

            report_result_ui.message_label.setText(process_result.message)

        vertical_layout.addWidget(report_result_widget)
        vertical_layout.addWidget(button_box)
        self.result_dialog.show()
|
|
|
|
|
|