random_action_db/app.py
mrutsy bb572f3072 Добавить app.py
Случайно добавляет, изменяет или удаляет строку в базе данных.
2025-04-08 09:17:16 +00:00

121 lines
No EOL
4.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import time
import random
import string
import logging
import psycopg2
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class Database:
def __init__(
self,
host=os.getenv('DB_HOST', 'localhost'),
port=os.getenv('DB_PORT', ''),
user=os.getenv('DB_USER', ''),
password=os.getenv('DB_PASSWORD', ''),
database=os.getenv('DB_DATABASE', ''),
):
self.host = host
self.port = port
self.user = user
self.password = password
self.database = database
self.connection = None
def _open(self):
if self.connection is None:
try:
logging.debug("Пытаюсь установить соединение с базой данных.")
self.connection = psycopg2.connect(
host=self.host,
port=self.port,
user=self.user,
password=self.password,
database=self.database,
connect_timeout=5
)
logging.info(f"Соединение с базой данными {self.database} - установлено.")
except (Exception, psycopg2.Error) as e:
logging.error(f"Ошибка при подключении к базе данных: {e}")
def close(self):
if self.connection is not None:
self.connection.close()
logging.debug("Соединение с базой данных закрыто.")
def execute_query(self, query, params=None):
self._open()
try:
with self.connection.cursor() as cursor:
cursor.execute(query, params)
if query.strip().upper().startswith("SELECT"):
result = cursor.fetchall()
logging.debug(f"Выполнен запрос: {query} | Возвращено {len(result)} строк")
return result
else:
self.connection.commit()
logging.debug(f"Выполнен запрос: {query}")
except Exception as e:
logging.error(f"Ошибка выполнения запроса:\n{e}")
def create_table(self, table_name, columns):
columns_with_types = ', '.join([f"{col} {typ}" for col, typ in columns.items()])
create_table_query = f"CREATE TABLE IF NOT EXISTS {table_name} ({columns_with_types});"
self.execute_query(create_table_query)
def insert_row(self, table_name, **kwargs):
columns = ', '.join(kwargs.keys())
placeholders = ', '.join(['%s'] * len(kwargs))
insert_query = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders});"
self.execute_query(insert_query, list(kwargs.values()))
def update_row(self, table_name, set_values, condition):
set_clause = ', '.join([f"{col} = %s" for col in set_values.keys()])
update_query = f"UPDATE {table_name} SET {set_clause} WHERE {condition};"
self.execute_query(update_query, list(set_values.values()))
def delete_row(self, table_name, condition):
delete_query = f"DELETE FROM {table_name} WHERE {condition};"
self.execute_query(delete_query)
if __name__ == '__main__':
db = Database()
db.create_table(
'rows',
{
'id': 'SERIAL PRIMARY KEY',
'text': 'VARCHAR(255) NOT NULL',
}
)
for _ in range(5):
random_string = ''.join(random.choices(string.ascii_letters + string.digits, k=16))
db.insert_row('rows', text=random_string)
logging.info(f'Создана новая строка: {random_string}')
while True:
all_rows = db.execute_query('SELECT * FROM rows;')
random_string = ''.join(random.choices(string.ascii_letters + string.digits, k=16))
if all_rows:
action = random.choice(['insert', 'update', 'delete'])
random_row = random.choice(all_rows)
if action == 'insert':
db.insert_row('rows', text=random_string)
logging.info(f'Вставлена строка: {random_string}')
elif action == 'update':
db.update_row('rows', {'text': random_string}, f'id = {random_row[0]}')
logging.info(f'Обновлена строка с id {random_row[0]}: новый текст - {random_string}')
elif action == 'delete':
db.delete_row('rows', f'id = {random_row[0]}')
logging.info(f'Удалена строка с id {random_row[0]}')
else:
logging.warning("Нет строк для обработки.")
time.sleep(60)