This commit is contained in:
Egor Deev 2022-02-24 06:13:18 +07:00 committed by GitHub
commit 341e0fbebb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 238 additions and 0 deletions

19
config.py Normal file
View file

@ -0,0 +1,19 @@
API_TOKEN = '2075938507:AAFhioDj8E1eEh_Rm1U-kTs78s7UhS6da28' # ОСНОВНОЙ
# API_TOKEN = '1232827129:AAEq369Iz5wztfHCBHTl9kFt6YsgIkSOZn0' # ТЕСТЫ
CITY = {'Центральный район': [1, 'Норильск'], 'район Талнах': [2, 'Талнах'],
'ж/о Оганер': [3, 'Оганер'], 'район Кайеркан': [4, 'Кайеркан']}
NAMES = ['Норильск', 'Талнах', 'Оганер', 'Кайеркан']
TIME = [1, 4, 7, 10, 13, 16, 19, 22]
ID_STICKERS = r"CAACAgIAAxkBAAID"
STICKERS = {"пасмурно": r"2mIKSBBtFNR1uHLhdztkhVwqTxRRAAKHEgACljNZSAgtQYHfmE4WIwQ", # облако
"поземок": r"22IKSBIdUSxApfTAEk5mMjldDexZAAIPFwAClIBRSCSyRnG71n6CIwQ", # солнце и облако
1: r"3GIKSBRW4sCgYX8AAaq01U6DaNF1IwACXhMAAlzDUUirRDtYEkGJ3iME", # ветер
4: r"3WIKSBec5Ch8vL-czNkXikARo-JmAALmFAACGHZRSHmiNc0n7VZPIwQ", # гроза
"снег": r"3mIKSBpHLfkJOnI4HCQny_a_XSVbAAI8FAACSixRSN7lm05cAAEj0yME", # снегопад
6: r"32IKSByANsmxbok8VePu7k6mTzexAALMFgACVopQSHXJa0spfMe7IwQ"} # ясно
# await message.answer_sticker(ID_STICKERS + STICKERS[1])

77
info.py Normal file
View file

@ -0,0 +1,77 @@
# - * - coding: utf-8 - * -
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from bs4 import BeautifulSoup
import requests, config
def act():
try:
soup = BeautifulSoup(requests.get('https://www.norilsk-city.ru/meteo/').text, 'html.parser')
acta = [list(map(lambda x: x.get_text(), _.find_all("p"))) for _ in soup.find_all("tbody")[0] if _ != "\n"]
count = '1-ой смены' if "1" in str(soup.find_all("div",
class_="textContent",
limit=1)[0].find_all("p",
limit=1)[0].get_text()) else '2-ой смены'
for lst in acta:
if "нет" in lst[1]:
acta[acta.index(lst)] = [config.CITY[lst[0]], "нет"]
else:
acta[acta.index(lst)] = [config.CITY[lst[0]], int(lst[1].split("-")[1].split()[0])]
return sorted(acta), count
except Exception as e:
return [], ""
def storm():
soup = BeautifulSoup(requests.get('https://www.norilsk-city.ru/meteo/').text, 'html.parser')
message = str(str(soup.find_all("p")[13])[35:-40])
stm = "*" + str(str(soup.find_all("h2")[1])[4:-5]) + \
"*\n\n" + message[:-67] + " " + message[96:-36] + message[118:-24] + message[129:-1]
return stm
def weather(moment):
try:
# путь к драйверу chrome
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument('--no-sandbox')
chromedriver = 'chromedriver'
browser = webdriver.Chrome(executable_path=chromedriver, chrome_options=chrome_options)
# После успешного входа в систему переходим на страницу «Gismeteo»
browser.get(f'https://www.gismeteo.ru/weather-norilsk-3957/{moment}')
# Получение HTML-содержимого
requiredHtml = browser.page_source
soup = BeautifulSoup(requiredHtml, 'html.parser')
# weather = soup.find_all()
temp = soup.find_all("span", class_="unit unit_temperature_c")[6:]
wind = soup.find_all("span", class_=["wind-unit unit unit_wind_m_s", "wind-unit unit unit_wind_m_s warning"])[:8]
dire = soup.find_all("div", class_="direction")
prec = soup.find_all("div", class_="item-unit")
k, s, weather = 0, 0, {}
for i in range(8):
par = wind[i].get_text().split()[0].split('-')
if len(par) == 1:
par = [int(par[0])]
else:
par = [int(par[0]), int(par[1])]
pre = prec[i].get_text()
# pre = prec[i].get_text().split('\n')
# precti = pre[1].split(' ')[-1] if len(pre) > 1 else pre[0]
weather[config.TIME[i]] = [temp[i].get_text(), par, dire[i].get_text(), pre]
return weather
except Exception as e:
return [repr(e)]

142
sql.py Normal file
View file

@ -0,0 +1,142 @@
import sqlite3
class SQLighter:
def __init__(self, database):
"""Подключаемся к БД и сохраняем курсор соединения"""
self.connection = sqlite3.connect(database)
self.cursor = self.connection.cursor()
def user_exists(self, id_user):
"""Проверяем, есть ли уже пользователь в базе"""
with self.connection:
result = self.cursor.execute("SELECT * FROM `users` WHERE `id_user` = ?", (id_user,)).fetchall()
return bool(len(result))
def add_user(self, id_user):
"""Добавляем нового пользователя"""
with self.connection:
return self.cursor.execute("INSERT INTO `users` (`id_user`, `status`, `send`, `part`, `city`) "
"VALUES(?,?,?,?,?)", (id_user, 0, 1, 0, 0))
def update_user(self, id_user, win, num):
"""Обновляем статус пользователя"""
with self.connection:
if win == 1:
return self.cursor.execute("UPDATE `users` SET `status` = ? WHERE `id_user` = ?", (num, id_user))
elif win == 2:
return self.cursor.execute("UPDATE `users` SET `send` = ? WHERE `id_user` = ?", (num, id_user))
elif win == 3:
return self.cursor.execute("UPDATE `users` SET `part` = ? WHERE `id_user` = ?", (num, id_user))
elif win == 4:
return self.cursor.execute("UPDATE `users` SET `city` = ? WHERE `id_user` = ?", (num, id_user))
def get_subs(self, status=True):
"""Получаем всех подписчиков"""
with self.connection:
return [i[0] for i in self.cursor.execute("SELECT id_user FROM `users` WHERE `status` = ?",
(status,)).fetchall()]
def get_send(self, send=True):
"""Получаем всех подписчиков"""
with self.connection:
return [i[0] for i in self.cursor.execute("SELECT id_user FROM `users` WHERE `send` = ?",
(send,)).fetchall()]
def get_users(self, part, city):
"""Получаем всех подписчиков"""
with self.connection:
return [i[0] for i in self.cursor.execute("SELECT id_user FROM `users` WHERE `status` = ? AND `part` = ? AND `city` = ?",
(True, part, city)).fetchall()]
def get_sends(self, part, city, send=True):
"""Получаем только подписчиков, только с полной или нет"""
with self.connection:
return [i[0] for i in self.cursor.execute("SELECT id_user FROM `users` WHERE `status` = ? AND `send` = ? AND `part` = ? AND `city` = ?",
(True, send, part, city)).fetchall()]
def update_status(self, id_user):
"""Обновляем статус пользователя"""
with self.connection:
rat = self.cursor.execute("SELECT `rating` FROM `users` WHERE `id_user` = ?", (id_user, )).fetchone()
if int(rat[0]) + 1 == 3:
self.cursor.execute("UPDATE `users` SET `status` = ? WHERE `id_user` = ?", (0, id_user))
self.cursor.execute("UPDATE `users` SET `rating` = ? WHERE `id_user` = ?", (0, id_user))
else:
self.cursor.execute("UPDATE `users` SET `rating` = ? WHERE `id_user` = ?", (int(rat[0]) + 1, id_user))
return 1
# ВЫЗОВ ДОПОЛНИТЕЛЬНЫХ ДАННЫХ
def get_data(self):
"""Получаем данные"""
with self.connection:
data = self.cursor.execute("SELECT * FROM `data`").fetchone()
return data[1], data[2]
def update_data(self, data, part):
"""Обновляем данные"""
with self.connection:
self.cursor.execute("UPDATE `data` SET `date` = ? WHERE `id` = ?", (data, 1))
self.cursor.execute("UPDATE `data` SET `part` = ? WHERE `id` = ?", (part, 1))
return 1
# СОХРАНЕНИЕ ДАННЫХ ОБ АКТИРОВКАХ
def save_acta(self, city, part):
"""Обновляем данные"""
with self.connection:
for i, town in enumerate(["nor", "tal", "oga", "kae"]):
self.cursor.execute(f"UPDATE `parts` SET `{town}` = ? WHERE `part` = ?", (city[i][1], part))
return 1
def get_acta(self, part):
"""Получаем данные"""
city = []
with self.connection:
for town in ["nor", "tal", "oga", "kae"]:
act = self.cursor.execute(f"SELECT `{town}` FROM `parts` WHERE `part` = ?", (part, )).fetchone()
city.append(act[0])
return city
def del_acta(self):
"""Удаляем данные"""
with self.connection:
for part in [1, 2]:
for town in ["nor", "tal", "oga", "kae"]:
self.cursor.execute(f"UPDATE `parts` SET `{town}` = ? WHERE `part` = ?", (None, part))
pass
def check_acta(self, part):
"""Проверяем данные"""
with self.connection:
for town in ["nor", "tal", "oga", "kae"]:
z = self.cursor.execute(f"SELECT `{town}` FROM `parts` WHERE `part` = ?", (part, )).fetchone()
if z[0] == None:
return 0
return 1
# ПОЛУЧЕНИЕ ДАННЫХ С GISMETEO
def get_moments(self):
"""Получаем все дни"""
with self.connection:
return [i[0] for i in self.cursor.execute("SELECT `id` FROM `weather`").fetchall()]
def update_weather(self, str_data, moment, time):
"""Обновляем данные"""
with self.connection:
return self.cursor.execute(f"UPDATE `weather` SET `{int(time)}` = ? WHERE `id` = ?", (f'{str_data}', moment))
def get_weather(self, moment):
"""Отдаём сохранённые данные"""
with self.connection:
wthr = {}
moments = [i[0] for i in self.cursor.execute("SELECT `id` FROM `weather`").fetchall()]
for time in [1, 4, 7, 10, 13, 16, 19, 22]:
dt = self.cursor.execute(f"SELECT `{time}` FROM `weather` WHERE `id` = ?",
(moments[moment], )).fetchone()
wthr[time] = eval(dt[0])
return wthr
# ВЫКЛЮЧЕНИЕ ВЫЗОВА
def close(self):
"""Закрываем соединение с БД"""
self.connection.close()