import requests from loguru import logger import csv import os import random class Api: def __init__(self) -> None: # self.base_url = 'http://127.0.0.1:6060' self.base_url = 'http://192.168.11.67:6060' # 创建店铺 def create_shop(self, city: str, street: str, shop_name: str) -> dict: url = f'{self.base_url}/country/shop' item = { 'city': city, 'street': street, 'shop_name': shop_name, } response = requests.post(url, json=item).json() logger.info(response) return response # 查询店铺 def get_shop(self, city: str) -> dict: url = f'{self.base_url}/country/shop' response = requests.get(url).json() # logger.info(response) return response # 创建信息 def create_info(self, first_name: str, last_name: str, birthday: str, current_address: str, city: str, phone: str, postal_code: str, province: str, email: str, text: str,status: bool=False, email_content: str|None=None) -> dict: url = f'{self.base_url}/country/info' item = { "first_name": first_name, "last_name": last_name, "birthday": birthday, "current_address": current_address, "city": city, "phone": phone, "postal_code": postal_code, "province": province, "status": status, "email": email, "email_content": email_content, "text": text } response = requests.post(url, json=item).json() logger.info(response) return response # 根据城市 随机获取一个店铺 def get_random_shop(self) -> dict: url = f'{self.base_url}/country/shop/random' response = requests.get(url).json() # logger.info(response) if not response.get('street'): logger.error(f'没有店铺') return None return response def main(): """ 从同目录的 `bakeries.csv` 读取面包店数据,按列映射输出或创建店铺 列顺序:`Name,Address,City` """ api = Api() csv_path = os.path.join(os.path.dirname(__file__), 'data.csv') if not os.path.exists(csv_path): logger.error(f'CSV 文件不存在: {csv_path}') return with open(csv_path, 'r', encoding='utf-8') as file: reader = csv.reader(file) header = next(reader, None) for row in reader: if len(row) < 3: logger.warning(f'行列数不足,跳过: {row}') continue shop_name, street, city = row[1], row[2], row[0] if ' (city)' in city: city = city.replace(' (city)', '') if 'Quebec' in city: continue if ',' in city: city = city.split(',')[0] logger.info(f'city: {city}, street: {street}, shop_name: {shop_name}') api.create_shop(city, street, shop_name) # def main2(): # api = Api() # city = 'Toronto' # shop = api.get_random_shop() # if shop: # logger.info(shop) # if __name__ == '__main__': # main() api = Api()