业务逻辑是这样:
用户注册后需要用邮箱confirmed。
在本地一切OK,但是部署到heroku就无法confirmed。
model用单元测试一切通过的,百思不得其解,求助。。。
model:
from datetime import datetime
import hashlib
from werkzeug.security import generate_password_hash, check_password_hash
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
from markdown import markdown
import bleach
from flask import current_app, request, url_for
from flask.ext.login import UserMixin, AnonymousUserMixin
from app.exceptions import ValidationError
from . import db, login_manage
class Permissions:
FOLLOW = 0x01
COMMENT = 0x02
WRITE_ARTICLES = 0x04
MODERATE_COMMENTS = 0x08
ADMINISTER = 0x80
class Role(db.Model):
__tablename__ = 'roles'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(64), unique=True)
default = db.Column(db.Boolean, default=False, index=True)
permissions = db.Column(db.Integer)
users = db.relationship('User', backref='role', lazy='dynamic')
@staticmethod
def insert_roles():
roles = {
'User': (Permissions.FOLLOW |
Permissions.COMMENT |
Permissions.WRITE_ARTICLES, True),
'Moderator': (Permissions.FOLLOW |
Permissions.COMMENT |
Permissions.WRITE_ARTICLES |
Permissions.MODERATE_COMMENTS, False),
'Administrator': (0xff, False)
}
for r in roles:
role = Role.query.filter_by(name=r).first()
if role is None:
role = Role(name=r)
role.permissions = roles[r][0]
role.default = roles[r][1]
db.session.add(role)
db.session.commit()
def __repr__(self):
return '<Role %r>' % self.name
class Follow(db.Model):
__tablename__ = 'follows'
follower_id = db.Column(db.Integer, db.ForeignKey('users.id'),
primary_key=True)
followed_id = db.Column(db.Integer, db.ForeignKey('users.id'),
primary_key=True)
timestamp = db.Column(db.DateTime, default=datetime.utcnow)
class User(UserMixin, db.Model):
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key=True)
email = db.Column(db.String(64), unique=True, index=True)
username = db.Column(db.String(64), unique=True, index=True)
role_id = db.Column(db.Integer, db.ForeignKey('roles.id'))
password_hash = db.Column(db.String(128))
confirmed = db.Column(db.Boolean, default=False)
name = db.Column(db.String(64))
location = db.Column(db.String(64))
about_me = db.Column(db.Text())
member_since = db.Column(db.DateTime(), default=datetime.utcnow)
last_seen = db.Column(db.DateTime(), default=datetime.utcnow)
avatar_hash = db.Column(db.String(32))
posts = db.relationship('Post', backref='author', lazy='dynamic')
followed = db.relationship('Follow',
foreign_keys=[Follow.follower_id],
backref=db.backref('follower', lazy='joined'),
lazy='dynamic',
cascade='all, delete-orphan')
followers = db.relationship('Follow',
foreign_keys=[Follow.followed_id],
backref=db.backref('followed', lazy='joined'),
lazy='dynamic',
cascade='all, delete-orphan')
comments = db.relationship('Comment', backref='author', lazy='dynamic')
@staticmethod
def generate_fake(count=100):
from sqlalchemy.exc import IntegrityError
from random import seed
import forgery_py
seed()
for i in range(count):
u = User(email=forgery_py.internet.email_address(),
username=forgery_py.internet.user_name(True),
password=forgery_py.lorem_ipsum.word(),
confirmed=True,
name=forgery_py.name.full_name(),
location=forgery_py.address.city(),
about_me=forgery_py.lorem_ipsum.sentence(),
member_since=forgery_py.date.date(True))
db.session.add(u)
try:
db.session.commit()
except IntegrityError:
db.session.rollback()
@staticmethod
def add_self_follows():
for user in User.query.all():
if not user.is_following(user):
user.follow(user)
db.session.add(user)
db.session.commit()
def __init__(self, **kwargs):
super(User, self).__init__(**kwargs)
if self.role is None:
if self.email == current_app.config['FLASKY_ADMIN']:
self.role = Role.query.filter_by(permissions=0xff).first()
if self.role is None:
self.role = Role.query.filter_by(default=True).first()
if self.email is not None and self.avatar_hash is None:
self.avatar_hash = hashlib.md5(
self.email.encode('utf-8')).hexdigest()
self.followed.append(Follow(followed=self))
@property
def password(self):
raise AttributeError('password is not a readable attribute')
@password.setter
def password(self, password):
self.password_hash = generate_password_hash(password)
def verify_password(self, password):
return check_password_hash(self.password_hash, password)
def generate_confirmation_token(self, expiration=3600):
s = Serializer(current_app.config['SECRET_KEY'], expiration)
return s.dumps({'confirm': self.id})
def confirm(self, token):
s = Serializer(current_app.config['SECRET_KEY'])
try:
data = s.loads(token)
except:
return False
if data.get('confirm') != self.id:
return False
self.confirmed = True
db.session.add(self)
return True
def generate_reset_token(self, expiration=3600):
s = Serializer(current_app.config['SECRET_KEY'], expiration)
return s.dumps({'reset': self.id})
def reset_password(self, token, new_password):
s = Serializer(current_app.config['SECRET_KEY'])
try:
data = s.loads(token)
except:
return False
if data.get('reset') != self.id:
return False
self.password = new_password
db.session.add(self)
return True
def generate_email_change_token(self, new_email, expiration=3600):
s = Serializer(current_app.config['SECRET_KEY'], expiration)
return s.dumps({'change_email': self.id, 'new_email': new_email})
def change_email(self, token):
s = Serializer(current_app.config['SECRET_KEY'])
try:
data = s.loads(token)
except:
return False
if data.get('change_email') != self.id:
return False
new_email = data.get('new_email')
if new_email is None:
return False
if self.query.filter_by(email=new_email).first() is not None:
return False
self.email = new_email
self.avatar_hash = hashlib.md5(
self.email.encode('utf-8')).hexdigest()
db.session.add(self)
return True
def can(self, permissions):
return self.role is not None and \
(self.role.permissions & permissions) == permissions
def is_administrator(self):
return self.can(Permissions.ADMINISTER)
def ping(self):
self.last_seen = datetime.utcnow()
db.session.add(self)
def gravatar(self, size=100, default='identicon', rating='g'):
if request.is_secure:
url = 'https://secure.gravatar.com/avatar'
else:
url = 'http://www.gravatar.com/avatar'
hash = self.avatar_hash or hashlib.md5(
self.email.encode('utf-8')).hexdigest()
return '{url}/{hash}?s={size}&d={default}&r={rating}'.format(
url=url, hash=hash, size=size, default=default, rating=rating)
def follow(self, user):
if not self.is_following(user):
f = Follow(follower=self, followed=user)
db.session.add(f)
def unfollow(self, user):
f = self.followed.filter_by(followed_id=user.id).first()
if f:
db.session.delete(f)
def is_following(self, user):
return self.followed.filter_by(
followed_id=user.id).first() is not None
def is_followed_by(self, user):
return self.followers.filter_by(
follower_id=user.id).first() is not None
@property
def followed_posts(self):
return Post.query.join(Follow, Follow.followed_id == Post.author_id)\
.filter(Follow.follower_id == self.id)
def to_json(self):
json_user = {
'url': url_for('api.get_post', id=self.id, _external=True),
'username': self.username,
'member_since': self.member_since,
'last_seen': self.last_seen,
'posts': url_for('api.get_user_posts', id=self.id, _external=True),
'followed_posts': url_for('api.get_user_followed_posts',
id=self.id, _external=True),
'post_count': self.posts.count()
}
return json_user
def generate_auth_token(self, expiration):
s = Serializer(current_app.config['SECRET_KEY'],
expires_in=expiration)
return s.dumps({'id': self.id}).decode('ascii')
@staticmethod
def verify_auth_token(token):
s = Serializer(current_app.config['SECRET_KEY'])
try:
data = s.loads(token)
except:
return None
return User.query.get(data['id'])
def __repr__(self):
return '<User %r>' % self.username
class AnonymousUser(AnonymousUserMixin):
def can(self, permissions):
return False
def is_administrator(self):
return False
login_manage.anonymous_user = AnonymousUser
@login_manage.user_loader
def load_user(user_id):
return User.query.get(int(user_id))
class Post(db.Model):
__tablename__ = 'posts'
id = db.Column(db.Integer, primary_key=True)
body = db.Column(db.Text)
body_html = db.Column(db.Text)
timestamp = db.Column(db.DateTime, index=True, default=datetime.utcnow)
author_id = db.Column(db.Integer, db.ForeignKey('users.id'))
comments = db.relationship('Comment', backref='post', lazy='dynamic')
@staticmethod
def generate_fake(count=100):
from random import seed, randint
import forgery_py
seed()
user_count = User.query.count()
for i in range(count):
u = User.query.offset(randint(0, user_count - 1)).first()
p = Post(body=forgery_py.lorem_ipsum.sentences(randint(1, 5)),
timestamp=forgery_py.date.date(True),
author=u)
db.session.add(p)
db.session.commit()
@staticmethod
def on_changed_body(target, value, oldvalue, initiator):
allowed_tags = ['a', 'abbr', 'acronym', 'b', 'blockquote', 'code',
'em', 'i', 'li', 'ol', 'pre', 'strong', 'ul',
'h1', 'h2', 'h3', 'p']
target.body_html = bleach.linkify(bleach.clean(
markdown(value, output_format='html'),
tags=allowed_tags, strip=True))
def to_json(self):
json_post = {
'url': url_for('api.get_post', id=self.id, _external=True),
'body': self.body,
'body_html': self.body_html,
'timestamp': self.timestamp,
'author': url_for('api.get_user', id=self.author_id,
_external=True),
'comments': url_for('api.get_post_comments', id=self.id,
_external=True),
'comment_count': self.comments.count()
}
return json_post
@staticmethod
def from_json(json_post):
body = json_post.get('body')
if body is None or body == '':
raise ValidationError('post does not have a body')
return Post(body=body)
db.event.listen(Post.body, 'set', Post.on_changed_body)
class Comment(db.Model):
__tablename__ = 'comments'
id = db.Column(db.Integer, primary_key=True)
body = db.Column(db.Text)
body_html = db.Column(db.Text)
timestamp = db.Column(db.DateTime, index=True, default=datetime.utcnow)
disabled = db.Column(db.Boolean)
author_id = db.Column(db.Integer, db.ForeignKey('users.id'))
post_id = db.Column(db.Integer, db.ForeignKey('posts.id'))
@staticmethod
def on_changed_body(target, value, oldvalue, initiator):
allowed_tags = ['a', 'abbr', 'acronym', 'b', 'code', 'em', 'i',
'strong']
target.body_html = bleach.linkify(bleach.clean(
markdown(value, output_format='html'),
tags=allowed_tags, strip=True))
def to_json(self):
json_comment = {
'url': url_for('api.get_comment', id=self.id, _external=True),
'post': url_for('api.get_post', id=self.post_id, _external=True),
'body': self.body,
'body_html': self.body_html,
'timestamp': self.timestamp,
'author': url_for('api.get_user', id=self.author_id,
_external=True),
}
return json_comment
@staticmethod
def from_json(json_comment):
body = json_comment.get('body')
if body is None or body == '':
raise ValidationError('comment does not have a body')
return Comment(body=body)
db.event.listen(Comment.body, 'set', Comment.on_changed_body)
view:
from flask import render_template, redirect, request, url_for, flash
from flask.ext.login import login_user, logout_user, login_required, \
current_user
from . import auth
from .. import db
from ..models import User
from ..email import send_email
from .forms import LoginForm, RegistrationForm, ChangePasswordForm,\
PasswordResetRequestForm, PasswordResetForm, ChangeEmailForm
@auth.before_app_request
def before_request():
if current_user.is_authenticated:
current_user.ping()
if not current_user.confirmed \
and request.endpoint[:5] != 'auth.' \
and request.endpoint != 'static':
return redirect(url_for('auth.unconfirmed'))
@auth.route('/unconfirmed')
def unconfirmed():
if current_user.is_anonymous or current_user.confirmed:
return redirect(url_for('main.index'))
return render_template('auth/unconfirmed.html')
@auth.route('/login', methods=['GET', 'POST'])
def login():
form = LoginForm()
if form.validate_on_submit():
user = User.query.filter_by(email=form.email.data).first()
if user is not None and user.verify_password(form.password.data):
login_user(user, form.remember_me.data)
return redirect(request.args.get('next') or url_for('main.index'))
flash('Invalid username or password.')
return render_template('auth/login.html', form=form)
@auth.route('/logout')
@login_required
def logout():
logout_user()
flash('You have been logged out.')
return redirect(url_for('main.index'))
@auth.route('/register', methods=['GET', 'POST'])
def register():
form = RegistrationForm()
if form.validate_on_submit():
user = User(email=form.email.data,
username=form.username.data,
password=form.password.data)
db.session.add(user)
db.session.commit()
token = user.generate_confirmation_token()
send_email(user.email, 'Confirm Your Account',
'auth/email/confirm', user=user, token=token)
flash('A confirmation email has been sent to you by email.')
return redirect(url_for('auth.login'))
return render_template('auth/register.html', form=form)
@auth.route('/confirm/<token>')
@login_required
def confirm(token):
if current_user.confirmed:
return redirect(url_for('main.index'))
if current_user.confirm(token):
flash('You have confirmed your account. Thanks!')
else:
flash('The confirmation link is invalid or has expired.')
return redirect(url_for('main.index'))
@auth.route('/confirm')
@login_required
def resend_confirmation():
token = current_user.generate_confirmation_token()
send_email(current_user.email, 'Confirm Your Account',
'auth/email/confirm', user=current_user, token=token)
flash('A new confirmation email has been sent to you by email.')
return redirect(url_for('main.index'))
@auth.route('/change-password', methods=['GET', 'POST'])
@login_required
def change_password():
form = ChangePasswordForm()
if form.validate_on_submit():
if current_user.verify_password(form.old_password.data):
current_user.password = form.password.data
db.session.add(current_user)
flash('Your password has been updated.')
return redirect(url_for('main.index'))
else:
flash('Invalid password.')
return render_template("auth/change_password.html", form=form)
@auth.route('/reset', methods=['GET', 'POST'])
def password_reset_request():
if not current_user.is_anonymous:
return redirect(url_for('main.index'))
form = PasswordResetRequestForm()
if form.validate_on_submit():
user = User.query.filter_by(email=form.email.data).first()
if user:
token = user.generate_reset_token()
send_email(user.email, 'Reset Your Password',
'auth/email/reset_password',
user=user, token=token,
next=request.args.get('next'))
flash('An email with instructions to reset your password has been '
'sent to you.')
return redirect(url_for('auth.login'))
return render_template('auth/reset_password.html', form=form)
@auth.route('/reset/<token>', methods=['GET', 'POST'])
def password_reset(token):
if not current_user.is_anonymous:
return redirect(url_for('main.index'))
form = PasswordResetForm()
if form.validate_on_submit():
user = User.query.filter_by(email=form.email.data).first()
if user is None:
return redirect(url_for('main.index'))
if user.reset_password(token, form.password.data):
flash('Your password has been updated.')
return redirect(url_for('auth.login'))
else:
return redirect(url_for('main.index'))
return render_template('auth/reset_password.html', form=form)
@auth.route('/change-email', methods=['GET', 'POST'])
@login_required
def change_email_request():
form = ChangeEmailForm()
if form.validate_on_submit():
if current_user.verify_password(form.password.data):
new_email = form.email.data
token = current_user.generate_email_change_token(new_email)
send_email(new_email, 'Confirm your email address',
'auth/email/change_email',
user=current_user, token=token)
flash('An email with instructions to confirm your new email '
'address has been sent to you.')
return redirect(url_for('main.index'))
else:
flash('Invalid email or password.')
return render_template("auth/change_email.html", form=form)
@auth.route('/change-email/<token>')
@login_required
def change_email(token):
if current_user.change_email(token):
flash('Your email address has been updated.')
else:
flash('Invalid request.')
return redirect(url_for('main.index'))
unconfirmed.html:
{% extends "base.html" %}
{% block title %}Flasky - Confirm your account{% endblock %}
{% block page_content %}
<div class="page-header">
<h1>
Hello, {{ current_user.username }}!
</h1>
{% if current_user.confirmed %}
<h3>one</h3>
{% else %}
<h3>two</h3>
{% endif %}
<h3>You have not confirmed your account yet.</h3>
<p>
Before you can access this site you need to confirm your account.
Check your inbox, you should have received an email with a confirmation link.
</p>
<p>
Need another confirmation email?
<a href="{{ url_for('auth.resend_confirmation') }}">Click here</a>
</p>
</div>
{% endblock %}
I think it matches your description. A temporary solution is to downgrade gunicorn to 18.0. But this is a Flask bug which is already fixed towards the next release, whenever that might happen.
db.session.commit()
**is not performed automatically at the end of each request, despite having SQLALCHEMY_COMMIT_ON_TEARDOWN = True in config.py.This issue was noted after analysing why user confirmation was not working properly.
The app is hosted on heroku (can be reached here), is using Heroku Postgres and is based on this repository, which is forked from the original 17d.**
在所有的
db.session.add()
之后加上db.session.commit()
试一下能否有效。