SQLAlchemy 模型到字典
要添加序列化方法,请让所有 SQLAlchemy 模型都继承自一个抽象基类。这个基类定义了 to_dict 方法,
该方法遍历模型的列并返回一个字典。
from flask import json
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy.orm.attributes import QueryableAttribute
from wakatime_website import app
db = SQLAlchemy(app)
class BaseModel(db.Model):
    """Abstract base for models that can serialize themselves to a dict."""

    __abstract__ = True

    def to_dict(self, show=None, _hide=None, _path=None):
        """Return a dictionary representation of this model.

        Columns, relationships and properties are included when they are
        named in ``show`` or in the model's ``_default_fields``; ``id``,
        ``modified_at`` and ``created_at`` are always treated as defaults.
        Names listed in ``_hidden_fields`` or in ``_hide``, and names that
        start with an underscore, are left out.

        Args:
            show: Optional list of extra field names to include. Nested
                fields use dotted paths such as ``"goals.accomplished"``.
            _hide: Optional list of dotted paths to exclude. The path of
                each relationship is appended before recursing into it.
            _path: Dotted path of this model inside the output. Callers
                leave it unset; the recursive calls supply it.

        Returns:
            dict: The selected fields. Related models are serialized
            through their own ``to_dict``.
        """
        # Work on copies: both lists are rewritten/appended to below and the
        # caller's lists must not change as a side effect.
        show = list(show or [])
        _hide = list(_hide or [])

        hidden = getattr(self, "_hidden_fields", [])
        # Build a fresh list. Extending ``_default_fields`` in place would
        # grow the shared class-level list on every call.
        default = list(getattr(self, "_default_fields", []))
        default.extend(["id", "modified_at", "created_at"])

        if not _path:
            # Top-level call: anchor every requested path at this table name.
            _path = self.__tablename__.lower()

            def prepend_path(item):
                item = item.lower()
                if not item or item.split(".", 1)[0] == _path:
                    return item
                if item[0] != ".":
                    item = ".%s" % item
                return "%s%s" % (_path, item)

            _hide = [prepend_path(x) for x in _hide]
            show = [prepend_path(x) for x in show]

        columns = self.__table__.columns.keys()
        relationships = self.__mapper__.relationships.keys()
        properties = dir(self)

        ret_data = {}

        for key in columns:
            if key.startswith("_"):
                continue
            check = "%s.%s" % (_path, key)
            if check in _hide or key in hidden:
                continue
            if check in show or key in default:
                ret_data[key] = getattr(self, key)

        for key in relationships:
            if key.startswith("_"):
                continue
            check = "%s.%s" % (_path, key)
            if check in _hide or key in hidden:
                continue
            if not (check in show or key in default):
                continue

            _hide.append(check)
            rel = self.__mapper__.relationships[key]
            child_path = "%s.%s" % (_path, key.lower())

            if rel.uselist:
                items = getattr(self, key)
                # Query-like collections are materialized before iterating.
                if rel.query_class is not None and hasattr(items, "all"):
                    items = items.all()
                ret_data[key] = [
                    item.to_dict(
                        show=list(show),
                        _hide=list(_hide),
                        _path=child_path,
                    )
                    for item in items
                ]
            elif rel.query_class is not None or rel.instrument_class is not None:
                item = getattr(self, key)
                if item is None:
                    ret_data[key] = None
                else:
                    ret_data[key] = item.to_dict(
                        show=list(show),
                        _hide=list(_hide),
                        _path=child_path,
                    )
            else:
                ret_data[key] = getattr(self, key)

        for key in set(properties) - set(columns) - set(relationships):
            if key.startswith("_"):
                continue
            # Only class-level properties / instrumented attributes qualify;
            # plain methods and instance attributes are skipped.
            attr = getattr(self.__class__, key, None)
            if not isinstance(attr, (property, QueryableAttribute)):
                continue
            check = "%s.%s" % (_path, key)
            if check in _hide or key in hidden:
                continue
            if not (check in show or key in default):
                continue

            val = getattr(self, key)
            if hasattr(val, "to_dict"):
                ret_data[key] = val.to_dict(
                    show=list(show),
                    _hide=list(_hide),
                    _path="%s.%s" % (_path, key.lower()),
                )
                continue
            try:
                # Round-trip through JSON so only serializable values are kept.
                ret_data[key] = json.loads(json.dumps(val))
            except Exception:
                # Best effort: a property whose value cannot be encoded is
                # omitted. ``Exception`` (not a bare except) lets
                # KeyboardInterrupt/SystemExit propagate.
                pass

        return ret_data
现在我们使用这个基类把一个 User 打印为字典。
class User(BaseModel):
    """Example user model with default, hidden and read-only field lists."""

    id = db.Column(UUID(), primary_key=True, default=uuid.uuid4)
    # Fixed keyword: the original spelled it ``nullabe``.
    username = db.Column(db.String(), nullable=False, unique=True)
    password = db.Column(db.String())
    email_confirmed = db.Column(db.Boolean())
    modified_at = db.Column(db.DateTime())
    created_at = db.Column(db.DateTime(), nullable=False, default=datetime.utcnow)

    # Field names serialized by to_dict() without being requested via ``show``.
    _default_fields = [
        "username",
        "joined_recently",
    ]
    # Field names to_dict() never returns.
    _hidden_fields = [
        "password",
    ]
    # Field names from_dict() skips unless it is called with ``_force``.
    _readonly_fields = [
        "email_confirmed",
    ]

    @property
    def joined_recently(self):
        """Return True when the user was created within the last three days.

        Returns False when ``created_at`` is not set, instead of raising on
        a ``None`` versus ``datetime`` comparison.
        """
        # NOTE(review): presumably created_at stays None until the row is
        # inserted, since it is filled by a column default - confirm.
        if self.created_at is None:
            return False
        return self.created_at > datetime.utcnow() - timedelta(days=3)
# Create a user, persist it, then print its serialization with no extra
# fields requested.
user = User(username="zzzeek")
db.session.add(user)
db.session.commit()
print(user.to_dict())
打印结果:
{
'id': UUID('488345de-88a1-4c87-9304-46a1a31c9414'),
'username': 'zzzeek',
'joined_recently': True,
'modified_at': None,
'created_at': datetime.datetime(2018, 7, 11, 6, 28, 56, 905379),
}
并且可以很容易地通过以下方式将其转换为 JSON:
# `json` is the name imported by `from flask import json` in the first listing.
# NOTE(review): the dict holds UUID and datetime values - confirm the encoder
# in use can serialize them.
json.dumps(user.to_dict())
默认字段和隐藏字段
您可能已经注意到,User 上有一些列出默认字段和隐藏字段的属性。它们让您可以自定义 User
返回的字典中包含哪些列。例如,如果您想在序列化后的用户中包含 email_confirmed,可以这样做:
# Request two extra fields; 'password' is also listed in User._hidden_fields.
print(user.to_dict(show=['email_confirmed', 'password']))
打印结果:
{
'id': UUID('488345de-88a1-4c87-9304-46a1a31c9414'),
'username': 'zzzeek',
'email_confirmed': None,
'joined_recently': True,
'modified_at': None,
'created_at': datetime.datetime(2018, 7, 11, 6, 28, 56, 905379),
}
还要注意,password 没有被包含在内,因为它在 User 中被列为隐藏字段。
从字典更新 SQLAlchemy 模型
我们已经有了 to_dict 方法,但为了支持 POST、PUT 和 PATCH 请求,还需要一个 from_dict 方法:
它接受一个字典,并用其中的数据更新模型的列。让我们像下面这样添加 from_dict 方法:
from sqlalchemy.sql.expression import not_
class BaseModel(db.Model):
    """Abstract base model with dict-based construction and updates."""

    __abstract__ = True

    def __init__(self, **kwargs):
        """Populate a new instance from keyword arguments.

        The values are applied through ``from_dict`` with ``_force`` set, so
        read-only and hidden fields may be supplied at construction time.
        """
        kwargs["_force"] = True
        self.from_dict(**kwargs)

    def to_dict(self, show=None, _hide=None, _path=None):
        # Body elided in this listing. ``_hide`` defaults to None rather
        # than a shared mutable list.
        ...

    def from_dict(self, **kwargs):
        """Update this model with a dictionary.

        Args:
            **kwargs: Field names mapped to new values. A list relationship
                takes a list of dicts and a scalar relationship takes a
                dict or a value. The special key ``_force`` (default False)
                allows read-only fields to be written.

        Returns:
            dict: The changes that were applied. Columns and properties map
            to ``{"old": ..., "new": ...}``. List relationships map to a
            list of per-row change dicts carrying ``"id"``, with
            ``"deleted": True`` for removed rows.
        """
        _force = kwargs.pop("_force", False)

        # Collect into a new set. ``+=`` on the class attribute would extend
        # the shared ``_readonly_fields`` list on every call.
        readonly = set(getattr(self, "_readonly_fields", []))
        readonly.update(getattr(self, "_hidden_fields", []))
        readonly.update(("id", "created_at", "modified_at"))

        columns = self.__table__.columns.keys()
        relationships = self.__mapper__.relationships.keys()
        properties = dir(self)

        changes = {}

        for key in columns:
            if key.startswith("_"):
                continue
            if key not in kwargs or not (_force or key not in readonly):
                continue
            val = getattr(self, key)
            if val != kwargs[key]:
                changes[key] = {"old": val, "new": kwargs[key]}
                setattr(self, key, kwargs[key])

        for rel in relationships:
            # Test the relationship name itself. The original tested the
            # stale ``key`` left over from the column loop.
            if rel.startswith("_"):
                continue
            if rel not in kwargs or not (_force or rel not in readonly):
                continue

            rel_prop = self.__mapper__.relationships[rel]
            if rel_prop.uselist:
                valid_ids = []
                query = getattr(self, rel)
                cls = rel_prop.entity.class_
                for item in kwargs[rel]:
                    if (
                        "id" in item
                        and query.filter_by(id=item["id"]).limit(1).count() == 1
                    ):
                        # Existing related row: update it in place.
                        obj = cls.query.filter_by(id=item["id"]).first()
                        col_changes = obj.from_dict(**item)
                        row_id = str(item["id"])
                    else:
                        # Unknown id (or none given): create and attach a row.
                        col = cls()
                        col_changes = col.from_dict(**item)
                        query.append(col)
                        # Flush before reading col.id below.
                        db.session.flush()
                        row_id = str(col.id)
                    if col_changes:
                        col_changes["id"] = row_id
                        changes.setdefault(rel, []).append(col_changes)
                    valid_ids.append(row_id)

                # delete rows from relationship that were not in kwargs[rel]
                for item in query.filter(not_(cls.id.in_(valid_ids))).all():
                    changes.setdefault(rel, []).append(
                        {"id": str(item.id), "deleted": True}
                    )
                    db.session.delete(item)
            else:
                val = getattr(self, rel)
                if rel_prop.query_class is not None:
                    if val is not None:
                        col_changes = val.from_dict(**kwargs[rel])
                        if col_changes:
                            changes.update({rel: col_changes})
                elif val != kwargs[rel]:
                    setattr(self, rel, kwargs[rel])
                    changes[rel] = {"old": val, "new": kwargs[rel]}

        for key in set(properties) - set(columns) - set(relationships):
            if key.startswith("_"):
                continue
            if key not in kwargs or not (_force or key not in readonly):
                continue
            # Only properties with a setter are writable. Attributes without
            # ``fset`` (methods, plain attributes) are skipped instead of
            # raising AttributeError.
            attr = getattr(self.__class__, key, None)
            if getattr(attr, "fset", None) is None:
                continue
            val = getattr(self, key)
            if hasattr(val, "to_dict"):
                val = val.to_dict()
            changes[key] = {"old": val, "new": kwargs[key]}
            setattr(self, key, kwargs[key])

        return changes
有了新的 from_dict 方法,我们就可以用字典来更新用户:
# Apply a partial update without _force; "email_confirmed" is listed in
# User._readonly_fields.
updates = {
"username": "zoe",
"email_confirmed": True,
}
user.from_dict(**updates)
db.session.commit()
print(user.to_dict(show=['email_confirmed']))
打印结果:
{
'id': UUID('488345de-88a1-4c87-9304-46a1a31c9414'),
'username': 'zoe',
'email_confirmed': None,
'joined_recently': True,
'modified_at': datetime.datetime(2018, 7, 11, 6, 36, 47, 939084),
'created_at': datetime.datetime(2018, 7, 11, 6, 28, 56, 905379),
}
请注意,email_confirmed 仍然是 None,因为它被标记为只读。
关系
我们的 to_dict 和 from_dict 方法同样适用于关系(relationship)。例如,当 User
模型拥有多个 Goal 模型时,我们可以让这些 Goal 默认被序列化,或者通过 show 参数来指定:
class User(BaseModel):
    # The rest of the model is elided in this excerpt.
    ...

    # Collection of related Goal rows; the backref is named "user".
    goals = db.relationship(
        'Goal',
        backref='user',
        lazy='dynamic',
    )
class Goal(BaseModel):
    """Example goal model that belongs to a User."""

    id = db.Column(UUID(), primary_key=True, default=uuid.uuid4)
    # Foreign key backing the User.goals relationship; without one there is
    # no join condition between the two tables.
    # Assumes the generated table name for User is "user" - adjust if
    # __tablename__ is set explicitly.
    user_id = db.Column(UUID(), db.ForeignKey("user.id"))
    # Fixed keyword: the original spelled it ``nullabe``.
    title = db.Column(db.String(), nullable=False)
    accomplished = db.Column(db.Boolean())
    created_at = db.Column(db.DateTime(), nullable=False, default=datetime.utcnow)

    # Field names serialized by to_dict() without being requested via ``show``.
    _default_fields = [
        "title",
    ]
# Attach a goal to the user, then request the relationship and one of its
# columns through dotted `show` paths.
goal = Goal(title="Mountain", accomplished=True)
user.goals.append(goal)
db.session.commit()
print(user.to_dict(show=['goals', 'goals.accomplished']))
打印结果:
{
'id': UUID('488345de-88a1-4c87-9304-46a1a31c9414'),
'username': 'zoe',
'goals': [
{
'id': UUID('c72cfef0-0988-45e4-9f4b-8a4a7d4f8d8f'),
'title': 'Mountain',
'accomplished': True,
'created_at': datetime.datetime(2018, 7, 11, 6, 45, 18, 299924),
},
],
'joined_recently': True,
'modified_at': datetime.datetime(2018, 7, 11, 6, 36, 47, 939084),
'created_at': datetime.datetime(2018, 7, 11, 6, 28, 56, 905379),
}
它甚至允许自定义关系中要包含的列,例如 goals.accomplished。
原创文章,作者:admin,如若转载,请注明出处:http://106.13.203.214:8080/flask-sqlalchemy-to-json/