# db/database.py import psycopg2 from psycopg2.extras import RealDictCursor from typing import List, Dict, Any, Optional import logging from contextlib import contextmanager logger = logging.getLogger(__name__) class Database: """ Класс для управления подключением к PostgreSQL и выполнения операций Использует паттерн Singleton для единственного экземпляра подключения """ _instance = None def __new__(cls, *args, **kwargs): if cls._instance is None: cls._instance = super().__new__(cls) return cls._instance def __init__(self, host='localhost', port=5432, database='mobile_devices_db', user='admin', password='password'): if not hasattr(self, 'initialized'): self.connection_params = { 'host': host, 'port': port, 'database': database, 'user': user, 'password': password } self.connection = None self.initialized = True def connect(self): """Установка соединения с БД""" try: self.connection = psycopg2.connect(**self.connection_params) logger.info("✅ Подключение к БД установлено") return True except Exception as e: logger.error(f"❌ Ошибка подключения к БД: {e}") return False def disconnect(self): """Закрытие соединения с БД""" if self.connection: self.connection.close() logger.info("🔒 Соединение с БД закрыто") @contextmanager def get_cursor(self, dict_cursor=True): """ Контекстный менеджер для безопасной работы с курсором """ cursor_factory = RealDictCursor if dict_cursor else None cursor = self.connection.cursor(cursor_factory=cursor_factory) try: yield cursor self.connection.commit() except Exception as e: self.connection.rollback() logger.error(f"❌ Ошибка выполнения запроса: {e}") raise finally: cursor.close() # === CRUD операции для Companies === def get_all_companies(self) -> List[Dict[str, Any]]: """Получение всех компаний""" with self.get_cursor() as cursor: cursor.execute(""" SELECT c.company_id, c.company_name, COUNT(m.model_id) as models_count FROM companies c LEFT JOIN models m ON c.company_id = m.company_id GROUP BY c.company_id, c.company_name ORDER BY c.company_name """) return cursor.fetchall() def add_company(self, company_name: str) -> int: """Добавление новой компании""" with self.get_cursor() as cursor: cursor.execute( "INSERT INTO companies (company_name) VALUES (%s) RETURNING company_id", (company_name,) ) return cursor.fetchone()['company_id'] def update_company(self, company_id: int, company_name: str): """Обновление названия компании""" with self.get_cursor() as cursor: cursor.execute( "UPDATE companies SET company_name = %s WHERE company_id = %s", (company_name, company_id) ) def delete_company(self, company_id: int): """Удаление компании (каскадно удалит все модели)""" with self.get_cursor() as cursor: cursor.execute("DELETE FROM companies WHERE company_id = %s", (company_id,)) # === CRUD операции для Models === def get_all_models(self, company_id: Optional[int] = None) -> List[Dict[str, Any]]: """Получение всех моделей (опционально по компании)""" query = """ SELECT m.model_id, m.model_name, c.company_name, m.mobile_weight, m.ram, m.front_camera, m.back_camera, pr.processor_name, m.battery_capacity, m.screen_size, m.launched_year, COUNT(DISTINCT p.region_id) as price_regions FROM models m JOIN companies c ON m.company_id = c.company_id LEFT JOIN processors pr ON m.processor_id = pr.processor_id LEFT JOIN prices p ON m.model_id = p.model_id """ params = [] if company_id: query += " WHERE m.company_id = %s" params.append(company_id) query += " GROUP BY m.model_id, c.company_name, pr.processor_name ORDER BY c.company_name, m.model_name" with self.get_cursor() as cursor: cursor.execute(query, params) return cursor.fetchall() def get_model_by_id(self, model_id: int) -> Dict[str, Any]: """Получение модели по ID""" with self.get_cursor() as cursor: cursor.execute(""" SELECT m.*, c.company_name, pr.processor_name FROM models m JOIN companies c ON m.company_id = c.company_id LEFT JOIN processors pr ON m.processor_id = pr.processor_id WHERE m.model_id = %s """, (model_id,)) return cursor.fetchone() def add_model(self, model_data: Dict[str, Any]) -> int: """Добавление новой модели""" # Сначала получаем или создаем процессор processor_id = None if model_data.get('processor_name'): processor_id = self.get_or_create_processor(model_data['processor_name']) with self.get_cursor() as cursor: cursor.execute(""" INSERT INTO models (model_name, company_id, processor_id, mobile_weight, ram, front_camera, back_camera, battery_capacity, screen_size, launched_year) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) RETURNING model_id """, ( model_data['model_name'], model_data['company_id'], processor_id, model_data.get('mobile_weight'), model_data.get('ram'), model_data.get('front_camera'), model_data.get('back_camera'), model_data.get('battery_capacity'), model_data.get('screen_size'), model_data.get('launched_year') )) return cursor.fetchone()['model_id'] def update_model(self, model_id: int, model_data: Dict[str, Any]): """Обновление модели""" # Получаем или создаем процессор processor_id = None if model_data.get('processor_name'): processor_id = self.get_or_create_processor(model_data['processor_name']) with self.get_cursor() as cursor: cursor.execute(""" UPDATE models SET model_name = %s, company_id = %s, processor_id = %s, mobile_weight = %s, ram = %s, front_camera = %s, back_camera = %s, battery_capacity = %s, screen_size = %s, launched_year = %s WHERE model_id = %s """, ( model_data['model_name'], model_data['company_id'], processor_id, model_data.get('mobile_weight'), model_data.get('ram'), model_data.get('front_camera'), model_data.get('back_camera'), model_data.get('battery_capacity'), model_data.get('screen_size'), model_data.get('launched_year'), model_id )) def delete_model(self, model_id: int): """Удаление модели""" with self.get_cursor() as cursor: cursor.execute("DELETE FROM models WHERE model_id = %s", (model_id,)) # === CRUD операции для Prices === def get_model_prices(self, model_id: int) -> List[Dict[str, Any]]: """Получение всех цен для модели""" with self.get_cursor() as cursor: cursor.execute(""" SELECT p.price_id, p.model_id, p.region_id, r.region_name, p.price, p.currency FROM prices p JOIN regions r ON p.region_id = r.region_id WHERE p.model_id = %s ORDER BY r.region_name """, (model_id,)) return cursor.fetchall() def add_or_update_price(self, model_id: int, region_id: int, price: float): """Добавление или обновление цены""" with self.get_cursor() as cursor: cursor.execute(""" INSERT INTO prices (model_id, region_id, price) VALUES (%s, %s, %s) ON CONFLICT (model_id, region_id) DO UPDATE SET price = EXCLUDED.price """, (model_id, region_id, price)) def delete_price(self, price_id: int): """Удаление цены""" with self.get_cursor() as cursor: cursor.execute("DELETE FROM prices WHERE price_id = %s", (price_id,)) # === Вспомогательные методы === def get_or_create_processor(self, processor_name: str) -> int: """Получение или создание процессора""" with self.get_cursor() as cursor: cursor.execute( "SELECT processor_id FROM processors WHERE processor_name = %s", (processor_name,) ) result = cursor.fetchone() if result: return result['processor_id'] else: cursor.execute( "INSERT INTO processors (processor_name) VALUES (%s) RETURNING processor_id", (processor_name,) ) return cursor.fetchone()['processor_id'] def get_all_regions(self) -> List[Dict[str, Any]]: """Получение всех регионов""" with self.get_cursor() as cursor: cursor.execute("SELECT * FROM regions ORDER BY region_name") return cursor.fetchall() def get_all_processors(self) -> List[Dict[str, Any]]: """Получение всех процессоров""" with self.get_cursor() as cursor: cursor.execute("SELECT * FROM processors ORDER BY processor_name") return cursor.fetchall() # === Методы для аналитики === def get_price_statistics(self) -> List[Dict[str, Any]]: """Получение статистики цен по регионам""" with self.get_cursor() as cursor: cursor.execute(""" SELECT r.region_name, COUNT(p.price_id) as models_count, AVG(p.price) as avg_price, MIN(p.price) as min_price, MAX(p.price) as max_price FROM prices p JOIN regions r ON p.region_id = r.region_id GROUP BY r.region_name ORDER BY avg_price DESC """) return cursor.fetchall() def search_models(self, search_text: str) -> List[Dict[str, Any]]: """Поиск моделей по тексту""" search_pattern = f"%{search_text}%" with self.get_cursor() as cursor: cursor.execute(""" SELECT DISTINCT m.model_id, m.model_name, c.company_name, m.ram, m.battery_capacity, m.launched_year FROM models m JOIN companies c ON m.company_id = c.company_id WHERE m.model_name ILIKE %s OR c.company_name ILIKE %s OR m.ram ILIKE %s OR m.battery_capacity ILIKE %s ORDER BY c.company_name, m.model_name LIMIT 100 """, (search_pattern, search_pattern, search_pattern, search_pattern)) return cursor.fetchall()