137 lines
4.6 KiB
Python
137 lines
4.6 KiB
Python
from flask import Flask, Blueprint, request, Response
|
|
from flask_security import SQLAlchemyUserDatastore, login_required, current_user, Security
|
|
from flask_security.utils import verify_and_update_password,login_user
|
|
from flask_restless import APIManager, ProcessingException
|
|
from models import db, User, Role, Gradeclass, Student, AttendanceUpdate, Presence
|
|
from sqlalchemy import func
|
|
import json
|
|
from datetime import datetime, date
|
|
from dateutil import parser
|
|
|
|
# Application object and its static configuration.
app = Flask(__name__)
app.config.update(
    DEBUG=True,
    PORT=5001,
    # NOTE(review): the secret key and password salt are hard-coded here;
    # consider loading them from the environment before deploying.
    SECRET_KEY='super-secret',
    SQLALCHEMY_DATABASE_URI='sqlite:////tmp/db.sqlite',
    SQLALCHEMY_TRACK_MODIFICATIONS=False,
    DEFAULT_MAIL_SENDER='info@example.com',
    SECURITY_PASSWORD_SALT='uaisfyasiduyaisiuf',
)

# Bind the shared SQLAlchemy handle to this app, then set up Flask-Security
# with a datastore built from the User and Role models.
db.init_app(app)
user_datastore = SQLAlchemyUserDatastore(db, User, Role)
security = Security(app, user_datastore)
|
|
|
|
|
|
def auth_func(*args, **kwargs):
    """Preprocessor hook for the REST API; currently accepts every request.

    Accepts and ignores any positional and keyword arguments, and always
    returns True. The authentication check is disabled and kept below for
    reference only.
    """
    # Disabled check:
    #     if not current_user.is_authenticated:
    #         raise ProcessingException(description='Not authenticated', code=401)
    return True
|
|
|
|
|
|
# Preprocessors attached to single-item and collection GET requests.
verify_logged_id = {
    'GET_SINGLE': [auth_func],
    'GET_MANY': [auth_func],
}

# Expose Student and Gradeclass as REST resources (read and update only),
# five results per page, both guarded by the preprocessors above.
manager = APIManager(app, flask_sqlalchemy_db=db)
manager.create_api(
    Student,
    methods=['GET', 'PUT'],
    results_per_page=5,
    preprocessors=verify_logged_id,
)
manager.create_api(
    Gradeclass,
    methods=['GET', 'PUT'],
    results_per_page=5,
    preprocessors=verify_logged_id,
)
|
|
|
|
|
|
@app.before_first_request
def create_test_data():
    """Rebuild the schema and seed it with demo data before the first request.

    Drops and recreates all tables, then inserts 12 grade classes with 10
    students each and a single login user (``user@example.com``).

    Seeding is best-effort: on failure the session is rolled back and the
    error is logged, but the exception is not propagated to the request.
    """
    try:
        db.drop_all()
        db.create_all()
        for class_no in range(12):
            g_cls = Gradeclass("Class {}".format(class_no))
            db.session.add(g_cls)
            # Flush so the pending row gets its primary key before it is
            # handed to the students; without it g_cls.id would still be
            # None here (assuming a database-generated id column).
            db.session.flush()
            for student_no in range(10):
                stu = Student(
                    "TestStudent#{}-Class{}".format(student_no, class_no),
                    g_cls.id,
                )
                db.session.add(stu)
        user_datastore.create_user(
            email='user@example.com', password='password')
        db.session.commit()
    except Exception:
        # Keep the app usable, but leave the session clean and make the
        # failure visible instead of silently swallowing it.
        db.session.rollback()
        app.logger.exception('Failed to create test data')
|
|
|
|
|
|
def api_resp(data, code=200):
    """Build a JSON response from an already-serialised body.

    data -- the response body, passed through unchanged.
    code -- HTTP status code, 200 unless given.
    """
    response_options = {'status': code, 'mimetype': "application/json"}
    return Response(response=data, **response_options)
|
|
|
|
def parse_time(time_str):
    """Turn the ``time`` request field into a datetime.

    time_str -- ``'now'``, an empty/missing value, or a date/time string
                understood by ``dateutil.parser.parse``.

    Returns the current UTC time (naive) for ``'now'`` or an empty value,
    the parsed datetime for a valid string, and None when the string cannot
    be parsed.
    """
    if not time_str or time_str == 'now':
        return datetime.utcnow()
    try:
        # The original parsed an undefined name here, so explicit
        # timestamps were never accepted.
        return parser.parse(time_str)
    except (ValueError, OverflowError, TypeError):
        # Unparseable input: report "no time" and let the caller reject it.
        return None
|
|
|
|
def upsert_attendance(update_type, object_id, attend_time, pres_enum):
    """Record a presence value, keeping one entry per target per day.

    Looks for an AttendanceUpdate with the same update type and identifier
    whose stored time falls on the same calendar date as ``attend_time``.
    If one exists its presence is overwritten; otherwise a new entry is
    added. The session is committed in both cases.
    """
    record = AttendanceUpdate.query.filter(
        AttendanceUpdate.update_type == update_type,
        AttendanceUpdate.value_identifier == object_id,
        func.date(AttendanceUpdate.time) == attend_time.date(),
    ).first()

    if not record:
        db.session.add(
            AttendanceUpdate(update_type, object_id, attend_time, pres_enum))
    else:
        record.presence = pres_enum
    db.session.commit()
|
|
|
|
def validate_attend(update_type, object_id):
    """Check that an attendance update targets an existing record.

    update_type -- ``'class'`` or ``'student'``; anything else is rejected.
    object_id   -- primary key of the Gradeclass or Student to look up.

    Returns True only when ``update_type`` is supported, ``object_id`` is
    non-empty and the matching record exists; False otherwise.
    """
    # The original returned True unconditionally before reaching any of
    # these checks, which left the whole validation unreachable.
    if not object_id:
        return False
    if update_type == 'class':
        return Gradeclass.query.get(object_id) is not None
    if update_type == 'student':
        return Student.query.get(object_id) is not None
    return False
|
|
|
|
|
|
# Upper-cased string form of every Presence member with the "Presence."
# prefix stripped; used to validate the "presence" form field.
presense_keys = [
    str(member).replace('Presence.', '').upper()
    for member in Presence
]
|
|
|
|
|
|
@app.route('/api/attendance/<update_type>', methods=['POST'])
# NOTE: @login_required is currently disabled for this endpoint.
def update_attendance(update_type):
    """Record attendance for the target named in the POSTed form.

    Reads the ``identifier``, ``presence`` and ``time`` form fields.
    Responds with ``{"status": "updated"}`` and 200 after storing the
    update, or ``{"status": "invalid"}`` and 400 when any field fails
    validation.
    """
    form = request.form
    object_id = form.get('identifier', None)
    presence = form.get('presence', None)
    attend_time = parse_time(form.get('time', None))

    accepted = (
        validate_attend(update_type, object_id)
        and presence
        and attend_time
        and presence.upper() in presense_keys
    )
    if not accepted:
        return api_resp(json.dumps({'status': 'invalid'}), 400)

    upsert_attendance(
        update_type, object_id, attend_time, Presence[presence.upper()])
    return api_resp(json.dumps({'status': 'updated'}), 200)
|
|
|
|
@app.route('/api/login', methods=['POST'])
def login():
    """Log a user in from the POSTed ``username`` and ``password`` fields.

    Responds with a JSON body ``{"status": ...}``:
      * ``success`` (200)  -- credentials verified and the user logged in;
      * ``notfound`` (401) -- no user has that e-mail address;
      * ``failed`` (401)   -- missing fields or wrong password.
    """
    username = request.form.get('username', None)
    password = request.form.get('password', None)
    # The credentials are deliberately not printed or logged: the original
    # wrote the plaintext password to stdout.
    status, code = 'failed', 401
    if username and password:
        user = User.query.filter(User.email == username).first()
        if not user:
            status = 'notfound'
        elif verify_and_update_password(password, user):
            login_user(user)
            status, code = 'success', 200
    # Go through api_resp so the body is served as application/json, the
    # same way the attendance endpoint responds.
    return api_resp(json.dumps({'status': status}), code)
|
|
|
|
@app.route('/')
def home():
    """Serve a fixed greeting at the site root."""
    greeting = 'hello'
    return greeting
|
|
|
|
|
|
if __name__ == '__main__':
    # Flask does not read the port from app.config on its own, so pass the
    # configured value explicitly; when the key is missing, None lets Flask
    # fall back to its default port.
    app.run(port=app.config.get('PORT'))
|