import time
from typing import List

from fastapi import Depends, FastAPI, HTTPException

from . import crud, database, models, schemas
from .database import db_state_default

# Create the tables once at import time, then close the connection again;
# the routes below open their own connection per request through get_db.
database.db.connect()
database.db.create_tables([models.User, models.Item])
database.db.close()

app = FastAPI()

# Seconds slept by the /slowusers/ endpoint; decremented (never below 0)
# before each sleep.
sleep_time = 10


async def reset_db_state():
    """Reset the database connection state before a request is handled.

    Stores a fresh copy of ``db_state_default`` in
    ``database.db._state._state`` and then calls
    ``database.db._state.reset()``.

    NOTE(review): presumably declared ``async`` so the state is set in the
    request's own context rather than in a worker thread -- confirm against
    the ``.database`` module.
    """
    database.db._state._state.set(db_state_default.copy())
    database.db._state.reset()


def get_db(db_state=Depends(reset_db_state)):
    """Dependency that holds a database connection open around a request.

    Connects before yielding and, in ``finally``, closes the connection if
    it is still open. ``db_state`` is not used in the body; declaring it
    makes ``reset_db_state`` run as a sub-dependency.
    """
    try:
        database.db.connect()
        yield
    finally:
        # Guard the close so an already-closed connection is left alone.
        if not database.db.is_closed():
            database.db.close()


# Create a user; responds 400 when the email is already registered.
@app.post("/users/", response_model=schemas.User, dependencies=[Depends(get_db)])
def create_user(user: schemas.UserCreate):
    db_user = crud.get_user_by_email(email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(user=user)


# List users, paginated by ``skip`` and ``limit``.
@app.get("/users/", response_model=List[schemas.User], dependencies=[Depends(get_db)])
def read_users(skip: int = 0, limit: int = 100):
    users = crud.get_users(skip=skip, limit=limit)
    return users


# Fetch a single user by id; responds 404 when the lookup returns None.
@app.get(
    "/users/{user_id}", response_model=schemas.User, dependencies=[Depends(get_db)]
)
def read_user(user_id: int):
    db_user = crud.get_user(user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


# Create an item for the user identified by ``user_id``.
@app.post(
    "/users/{user_id}/items/",
    response_model=schemas.Item,
    dependencies=[Depends(get_db)],
)
def create_item_for_user(user_id: int, item: schemas.ItemCreate):
    return crud.create_user_item(item=item, user_id=user_id)


# List items, paginated by ``skip`` and ``limit``.
@app.get("/items/", response_model=List[schemas.Item], dependencies=[Depends(get_db)])
def read_items(skip: int = 0, limit: int = 100):
    items = crud.get_items(skip=skip, limit=limit)
    return items


# Same listing as read_users, but sleeps first. Each call lowers the shared
# module-level ``sleep_time`` by one second (floored at 0) before sleeping.
@app.get(
    "/slowusers/", response_model=List[schemas.User], dependencies=[Depends(get_db)]
)
def read_slow_users(skip: int = 0, limit: int = 100):
    global sleep_time
    sleep_time = max(0, sleep_time - 1)
    time.sleep(sleep_time)  # Fake long processing request
    users = crud.get_users(skip=skip, limit=limit)
    return users
import time
from typing import List

from fastapi import Depends, FastAPI, HTTPException

from . import crud, database, models, schemas
from .database import db_state_default

# Create the tables once at import time, then close the connection again;
# the routes below open their own connection per request through get_db.
database.db.connect()
database.db.create_tables([models.User, models.Item])
database.db.close()

app = FastAPI()

# Seconds slept by the /slowusers/ endpoint; decremented (never below 0)
# before each sleep.
sleep_time = 10


async def reset_db_state():
    """Reset the database connection state before a request is handled.

    Stores a fresh copy of ``db_state_default`` in
    ``database.db._state._state`` and then calls
    ``database.db._state.reset()``.

    NOTE(review): presumably declared ``async`` so the state is set in the
    request's own context rather than in a worker thread -- confirm against
    the ``.database`` module.
    """
    database.db._state._state.set(db_state_default.copy())
    database.db._state.reset()


def get_db(db_state=Depends(reset_db_state)):
    """Dependency that holds a database connection open around a request.

    Connects before yielding and, in ``finally``, closes the connection if
    it is still open. ``db_state`` is not used in the body; declaring it
    makes ``reset_db_state`` run as a sub-dependency.
    """
    try:
        database.db.connect()
        yield
    finally:
        # Guard the close so an already-closed connection is left alone.
        if not database.db.is_closed():
            database.db.close()


# Create a user; responds 400 when the email is already registered.
@app.post("/users/", response_model=schemas.User, dependencies=[Depends(get_db)])
def create_user(user: schemas.UserCreate):
    db_user = crud.get_user_by_email(email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(user=user)


# List users, paginated by ``skip`` and ``limit``.
@app.get("/users/", response_model=List[schemas.User], dependencies=[Depends(get_db)])
def read_users(skip: int = 0, limit: int = 100):
    users = crud.get_users(skip=skip, limit=limit)
    return users


# Fetch a single user by id; responds 404 when the lookup returns None.
@app.get(
    "/users/{user_id}", response_model=schemas.User, dependencies=[Depends(get_db)]
)
def read_user(user_id: int):
    db_user = crud.get_user(user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


# Create an item for the user identified by ``user_id``.
@app.post(
    "/users/{user_id}/items/",
    response_model=schemas.Item,
    dependencies=[Depends(get_db)],
)
def create_item_for_user(user_id: int, item: schemas.ItemCreate):
    return crud.create_user_item(item=item, user_id=user_id)


# List items, paginated by ``skip`` and ``limit``.
@app.get("/items/", response_model=List[schemas.Item], dependencies=[Depends(get_db)])
def read_items(skip: int = 0, limit: int = 100):
    items = crud.get_items(skip=skip, limit=limit)
    return items


# Same listing as read_users, but sleeps first. Each call lowers the shared
# module-level ``sleep_time`` by one second (floored at 0) before sleeping.
@app.get(
    "/slowusers/", response_model=List[schemas.User], dependencies=[Depends(get_db)]
)
def read_slow_users(skip: int = 0, limit: int = 100):
    global sleep_time
    sleep_time = max(0, sleep_time - 1)
    time.sleep(sleep_time)  # Fake long processing request
    users = crud.get_users(skip=skip, limit=limit)
    return users
import time
from typing import List

from fastapi import Depends, FastAPI, HTTPException

from . import crud, database, models, schemas
from .database import db_state_default

# Create the tables once at import time, then close the connection again;
# the routes below open their own connection per request through get_db.
database.db.connect()
database.db.create_tables([models.User, models.Item])
database.db.close()

app = FastAPI()

# Seconds slept by the /slowusers/ endpoint; decremented (never below 0)
# before each sleep.
sleep_time = 10


async def reset_db_state():
    """Reset the database connection state before a request is handled.

    Stores a fresh copy of ``db_state_default`` in
    ``database.db._state._state`` and then calls
    ``database.db._state.reset()``.

    NOTE(review): presumably declared ``async`` so the state is set in the
    request's own context rather than in a worker thread -- confirm against
    the ``.database`` module.
    """
    database.db._state._state.set(db_state_default.copy())
    database.db._state.reset()


def get_db(db_state=Depends(reset_db_state)):
    """Dependency that holds a database connection open around a request.

    Connects before yielding and, in ``finally``, closes the connection if
    it is still open. ``db_state`` is not used in the body; declaring it
    makes ``reset_db_state`` run as a sub-dependency.
    """
    try:
        database.db.connect()
        yield
    finally:
        # Guard the close so an already-closed connection is left alone.
        if not database.db.is_closed():
            database.db.close()


# Create a user; responds 400 when the email is already registered.
@app.post("/users/", response_model=schemas.User, dependencies=[Depends(get_db)])
def create_user(user: schemas.UserCreate):
    db_user = crud.get_user_by_email(email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(user=user)


# List users, paginated by ``skip`` and ``limit``.
@app.get("/users/", response_model=List[schemas.User], dependencies=[Depends(get_db)])
def read_users(skip: int = 0, limit: int = 100):
    users = crud.get_users(skip=skip, limit=limit)
    return users


# Fetch a single user by id; responds 404 when the lookup returns None.
@app.get(
    "/users/{user_id}", response_model=schemas.User, dependencies=[Depends(get_db)]
)
def read_user(user_id: int):
    db_user = crud.get_user(user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


# Create an item for the user identified by ``user_id``.
@app.post(
    "/users/{user_id}/items/",
    response_model=schemas.Item,
    dependencies=[Depends(get_db)],
)
def create_item_for_user(user_id: int, item: schemas.ItemCreate):
    return crud.create_user_item(item=item, user_id=user_id)


# List items, paginated by ``skip`` and ``limit``.
@app.get("/items/", response_model=List[schemas.Item], dependencies=[Depends(get_db)])
def read_items(skip: int = 0, limit: int = 100):
    items = crud.get_items(skip=skip, limit=limit)
    return items


# Same listing as read_users, but sleeps first. Each call lowers the shared
# module-level ``sleep_time`` by one second (floored at 0) before sleeping.
@app.get(
    "/slowusers/", response_model=List[schemas.User], dependencies=[Depends(get_db)]
)
def read_slow_users(skip: int = 0, limit: int = 100):
    global sleep_time
    sleep_time = max(0, sleep_time - 1)
    time.sleep(sleep_time)  # Fake long processing request
    users = crud.get_users(skip=skip, limit=limit)
    return users
import time
from typing import List

from fastapi import Depends, FastAPI, HTTPException

from . import crud, database, models, schemas
from .database import db_state_default

# Create the tables once at import time, then close the connection again;
# the routes below open their own connection per request through get_db.
database.db.connect()
database.db.create_tables([models.User, models.Item])
database.db.close()

app = FastAPI()

# Seconds slept by the /slowusers/ endpoint; decremented (never below 0)
# before each sleep.
sleep_time = 10


async def reset_db_state():
    """Reset the database connection state before a request is handled.

    Stores a fresh copy of ``db_state_default`` in
    ``database.db._state._state`` and then calls
    ``database.db._state.reset()``.

    NOTE(review): presumably declared ``async`` so the state is set in the
    request's own context rather than in a worker thread -- confirm against
    the ``.database`` module.
    """
    database.db._state._state.set(db_state_default.copy())
    database.db._state.reset()


def get_db(db_state=Depends(reset_db_state)):
    """Dependency that holds a database connection open around a request.

    Connects before yielding and, in ``finally``, closes the connection if
    it is still open. ``db_state`` is not used in the body; declaring it
    makes ``reset_db_state`` run as a sub-dependency.
    """
    try:
        database.db.connect()
        yield
    finally:
        # Guard the close so an already-closed connection is left alone.
        if not database.db.is_closed():
            database.db.close()


# Create a user; responds 400 when the email is already registered.
@app.post("/users/", response_model=schemas.User, dependencies=[Depends(get_db)])
def create_user(user: schemas.UserCreate):
    db_user = crud.get_user_by_email(email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(user=user)


# List users, paginated by ``skip`` and ``limit``.
@app.get("/users/", response_model=List[schemas.User], dependencies=[Depends(get_db)])
def read_users(skip: int = 0, limit: int = 100):
    users = crud.get_users(skip=skip, limit=limit)
    return users


# Fetch a single user by id; responds 404 when the lookup returns None.
@app.get(
    "/users/{user_id}", response_model=schemas.User, dependencies=[Depends(get_db)]
)
def read_user(user_id: int):
    db_user = crud.get_user(user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


# Create an item for the user identified by ``user_id``.
@app.post(
    "/users/{user_id}/items/",
    response_model=schemas.Item,
    dependencies=[Depends(get_db)],
)
def create_item_for_user(user_id: int, item: schemas.ItemCreate):
    return crud.create_user_item(item=item, user_id=user_id)


# List items, paginated by ``skip`` and ``limit``.
@app.get("/items/", response_model=List[schemas.Item], dependencies=[Depends(get_db)])
def read_items(skip: int = 0, limit: int = 100):
    items = crud.get_items(skip=skip, limit=limit)
    return items


# Same listing as read_users, but sleeps first. Each call lowers the shared
# module-level ``sleep_time`` by one second (floored at 0) before sleeping.
@app.get(
    "/slowusers/", response_model=List[schemas.User], dependencies=[Depends(get_db)]
)
def read_slow_users(skip: int = 0, limit: int = 100):
    global sleep_time
    sleep_time = max(0, sleep_time - 1)
    time.sleep(sleep_time)  # Fake long processing request
    users = crud.get_users(skip=skip, limit=limit)
    return users
import time
from typing import List

from fastapi import Depends, FastAPI, HTTPException

from . import crud, database, models, schemas
from .database import db_state_default

# Create the tables once at import time, then close the connection again;
# the routes below open their own connection per request through get_db.
database.db.connect()
database.db.create_tables([models.User, models.Item])
database.db.close()

app = FastAPI()

# Seconds slept by the /slowusers/ endpoint; decremented (never below 0)
# before each sleep.
sleep_time = 10


async def reset_db_state():
    """Reset the database connection state before a request is handled.

    Stores a fresh copy of ``db_state_default`` in
    ``database.db._state._state`` and then calls
    ``database.db._state.reset()``.

    NOTE(review): presumably declared ``async`` so the state is set in the
    request's own context rather than in a worker thread -- confirm against
    the ``.database`` module.
    """
    database.db._state._state.set(db_state_default.copy())
    database.db._state.reset()


def get_db(db_state=Depends(reset_db_state)):
    """Dependency that holds a database connection open around a request.

    Connects before yielding and, in ``finally``, closes the connection if
    it is still open. ``db_state`` is not used in the body; declaring it
    makes ``reset_db_state`` run as a sub-dependency.
    """
    try:
        database.db.connect()
        yield
    finally:
        # Guard the close so an already-closed connection is left alone.
        if not database.db.is_closed():
            database.db.close()


# Create a user; responds 400 when the email is already registered.
@app.post("/users/", response_model=schemas.User, dependencies=[Depends(get_db)])
def create_user(user: schemas.UserCreate):
    db_user = crud.get_user_by_email(email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(user=user)


# List users, paginated by ``skip`` and ``limit``.
@app.get("/users/", response_model=List[schemas.User], dependencies=[Depends(get_db)])
def read_users(skip: int = 0, limit: int = 100):
    users = crud.get_users(skip=skip, limit=limit)
    return users


# Fetch a single user by id; responds 404 when the lookup returns None.
@app.get(
    "/users/{user_id}", response_model=schemas.User, dependencies=[Depends(get_db)]
)
def read_user(user_id: int):
    db_user = crud.get_user(user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


# Create an item for the user identified by ``user_id``.
@app.post(
    "/users/{user_id}/items/",
    response_model=schemas.Item,
    dependencies=[Depends(get_db)],
)
def create_item_for_user(user_id: int, item: schemas.ItemCreate):
    return crud.create_user_item(item=item, user_id=user_id)


# List items, paginated by ``skip`` and ``limit``.
@app.get("/items/", response_model=List[schemas.Item], dependencies=[Depends(get_db)])
def read_items(skip: int = 0, limit: int = 100):
    items = crud.get_items(skip=skip, limit=limit)
    return items


# Same listing as read_users, but sleeps first. Each call lowers the shared
# module-level ``sleep_time`` by one second (floored at 0) before sleeping.
@app.get(
    "/slowusers/", response_model=List[schemas.User], dependencies=[Depends(get_db)]
)
def read_slow_users(skip: int = 0, limit: int = 100):
    global sleep_time
    sleep_time = max(0, sleep_time - 1)
    time.sleep(sleep_time)  # Fake long processing request
    users = crud.get_users(skip=skip, limit=limit)
    return users
import time
from typing import List

from fastapi import Depends, FastAPI, HTTPException

from . import crud, database, models, schemas
from .database import db_state_default

# Create the tables once at import time, then close the connection again;
# the routes below open their own connection per request through get_db.
database.db.connect()
database.db.create_tables([models.User, models.Item])
database.db.close()

app = FastAPI()

# Seconds slept by the /slowusers/ endpoint; decremented (never below 0)
# before each sleep.
sleep_time = 10


async def reset_db_state():
    """Reset the database connection state before a request is handled.

    Stores a fresh copy of ``db_state_default`` in
    ``database.db._state._state`` and then calls
    ``database.db._state.reset()``.

    NOTE(review): presumably declared ``async`` so the state is set in the
    request's own context rather than in a worker thread -- confirm against
    the ``.database`` module.
    """
    database.db._state._state.set(db_state_default.copy())
    database.db._state.reset()


def get_db(db_state=Depends(reset_db_state)):
    """Dependency that holds a database connection open around a request.

    Connects before yielding and, in ``finally``, closes the connection if
    it is still open. ``db_state`` is not used in the body; declaring it
    makes ``reset_db_state`` run as a sub-dependency.
    """
    try:
        database.db.connect()
        yield
    finally:
        # Guard the close so an already-closed connection is left alone.
        if not database.db.is_closed():
            database.db.close()


# Create a user; responds 400 when the email is already registered.
@app.post("/users/", response_model=schemas.User, dependencies=[Depends(get_db)])
def create_user(user: schemas.UserCreate):
    db_user = crud.get_user_by_email(email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(user=user)


# List users, paginated by ``skip`` and ``limit``.
@app.get("/users/", response_model=List[schemas.User], dependencies=[Depends(get_db)])
def read_users(skip: int = 0, limit: int = 100):
    users = crud.get_users(skip=skip, limit=limit)
    return users


# Fetch a single user by id; responds 404 when the lookup returns None.
@app.get(
    "/users/{user_id}", response_model=schemas.User, dependencies=[Depends(get_db)]
)
def read_user(user_id: int):
    db_user = crud.get_user(user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


# Create an item for the user identified by ``user_id``.
@app.post(
    "/users/{user_id}/items/",
    response_model=schemas.Item,
    dependencies=[Depends(get_db)],
)
def create_item_for_user(user_id: int, item: schemas.ItemCreate):
    return crud.create_user_item(item=item, user_id=user_id)


# List items, paginated by ``skip`` and ``limit``.
@app.get("/items/", response_model=List[schemas.Item], dependencies=[Depends(get_db)])
def read_items(skip: int = 0, limit: int = 100):
    items = crud.get_items(skip=skip, limit=limit)
    return items


# Same listing as read_users, but sleeps first. Each call lowers the shared
# module-level ``sleep_time`` by one second (floored at 0) before sleeping.
@app.get(
    "/slowusers/", response_model=List[schemas.User], dependencies=[Depends(get_db)]
)
def read_slow_users(skip: int = 0, limit: int = 100):
    global sleep_time
    sleep_time = max(0, sleep_time - 1)
    time.sleep(sleep_time)  # Fake long processing request
    users = crud.get_users(skip=skip, limit=limit)
    return users