记一个 PostgreSQL 事务未提交问题

最近遇到一个因应用端查询事务未提交导致数仓跑书脚本阻塞的问题。

通过查看数据库的状态信息,定位到原因是:查询对表持有 Access Share 锁,而跑数脚本要先清空这张表,调用了 truncate 命令,而 truncate 命令会请求 Access Exclusive 锁。这两个锁是互斥的,一个事务持有了其中一个锁,只要事务未提交,另一个请求互斥锁的事务必须等待。

但是什么导致事务未提交呢?

初步分析

应用的代码是基于 Flask,使用了 SQLAclhemy 这个 ORM,并在查询的地方做了一层封装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@staticmethod
def fetch_data_sql(sql, params, bind_key=None):
result = []
render_sql(sql=sql, params=params)
try:
sql = text(sql)
con = db.session.execute(sql, params, bind=db.get_engine(current_app, bind=bind_key))
if con.returns_rows:
result = con.fetchall()
except Exception as er:
LOG.error("fetch data error: {0}, sql is: {1}".format(er, sql))
raise DBError()
else:
return result

注意到这里 db.session.exuecute() 发起了一个查询,而 db.session 在默认 autocommit=False 的情况下是会自动开启事务的(参见:Flask-SQLAlchmey 是如何执行查询的),问题似乎是开启了事务,而没有调用 db.session.commit() 提交事务。

但仔细一想,有两个疑问:

  1. 如果是因为没有调用 db.session.commit(),那这个问题应该存在已久,数据库应该存在大量未提交的事务;
  2. 既然自动开启了事务,为什么没有自动关闭事务呢?按理说事务的生命周期应该和请求保持一致的。

第一个疑问很好验证,从应用发起一个查询,通过 pg_stat_activitypg_locks 查看事务和锁的状态,结果是,查询结束后,事务自动提交了,持有的锁也自动释放了。

可以得到初步结论,即使没有调用 db.session.commit() ,事务也是会自动提交的。

那是在什么地方提交的呢?SQLAlchemy 虽然帮我们自动开启了事务,但是它是不知道我们要在事务里提交几个查询的,所以它应该不会帮我们去提交,那应该就是在 flask-sqlalchemy 里处理的了。一个请求结束了,事务也应该提交,可以猜想,flask-sqlalchemy 里应该是注册了 flask 的 request hook,在请求结束时去自动去提交事务。

源码验证

去源码里验证一下,相关软件版本:flask 1.1.1,SQLAlchemy 1.3.10,flask-sqlalchmey 2.4.1。

发现,flask-sqlalchmey 的确是注册是一个关闭 session 的 hook 函数,和预想的不太一样的是,它是在 app context 而不是 reqest context

1
2
3
4
5
6
7
8
@app.teardown_appcontext
def shutdown_session(response_or_exc):
if app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN']:
if response_or_exc is None:
self.session.commit()

self.session.remove()
return response_or_exc

SQLALCHEMY_COMMIT_ON_TEARDOWN 模式是 False,这里会进入到 self.session.remove(),触发的是 scope_sessionremove 方法,这里面调用的 close() 方法正是去关闭事务的。

1
2
3
4
5
6
class scoped_session(object):

def remove(self):
if self.registry.has():
self.registry().close()
self.registry.clear()

再去 flask 里面看一下,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class Flask(_PackageBoundObject):
# ...
self.teardown_appcontext_funcs = []

@setupmethod
def teardown_appcontext(self, f):
"""Registers a function to be called when the application context
ends. These functions are typically also called when the request
context is popped.
"""
self.teardown_appcontext_funcs.append(f)
return f

def do_teardown_appcontext(self, exc=_sentinel):
"""Called right before the application context is popped.
When handling a request, the application context is popped
after the request context.
"""
if exc is _sentinel:
exc = sys.exc_info()[1]
for func in reversed(self.teardown_appcontext_funcs):
func(exc)
appcontext_tearing_down.send(self, exc=exc)

teardown_appcontext 装饰器将 hook 函数保存在了 teardown_appcontext_funcs 列表里,在某一个时刻,会通过 do_teardown_appcontext 调用这些函数。从 do_teardown_appcontext 的注释可以得知,这些函数一般是请求上下文被删除时调用的,而请求上下文正是在一个请求处理结束时被删除。

看来猜想是没错的,只是实现上的一点差异,继续看在哪触发的调用。

flask 通过 RequestContxt 管理请求上下文,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class RequestContext(object):
# ...

def pop(self, exc=_sentinel):
app_ctx = self._implicit_app_ctx_stack.pop()

try:
clear_request = False
if not self._implicit_app_ctx_stack:
self.preserved = False
self._preserved_exc = None
if exc is _sentinel:
exc = sys.exc_info()[1]
self.app.do_teardown_request(exc)

if hasattr(sys, "exc_clear"):
sys.exc_clear()

request_close = getattr(self.request, "close", None)
if request_close is not None:
request_close()
clear_request = True
finally:
rv = _request_ctx_stack.pop()

if clear_request:
rv.request.environ["werkzeug.request"] = None

# Get rid of the app as well if necessary.
if app_ctx is not None:
app_ctx.pop(exc)

assert rv is self, "Popped wrong request context. (%r instead of %r)" % (
rv,
self,
)

在请求处理结束时,会调用 pop 清除请求上下文,在这一行:

1
self.app.do_teardown_request(exc)

调用了注册的请求结束时需要执行的 hook 函数。继续往下,在 finally 里面调用了 app contextpop 方法:

1
2
if app_ctx is not None:
app_ctx.pop(exc)

app contextpop 方法里,触发了 self.app.do_teardown_appcontext(exc)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class AppContext(object):
#...
def pop(self, exc=_sentinel):
"""Pops the app context."""
try:
self._refcnt -= 1
if self._refcnt <= 0:
if exc is _sentinel:
exc = sys.exc_info()[1]
self.app.do_teardown_appcontext(exc)
finally:
rv = _app_ctx_stack.pop()
assert rv is self, "Popped wrong app context. (%r instead of %r)" % (rv, self)
appcontext_popped.send(self.app)

再次分析

现在已经清晰了,即使没有主动调用 db.session.commit(),只要请求正常处理结束,flask-sqlalchemy 会自动帮我们清理事务的。

看来导致查询事务未提交的根本原因是:请求非正常结束,也就是说 db.session.execute 提交了查询,数据库开启了事务,执行查询,但是还没等到触发 hook 函数,请求非正常终止了。

经实验验证了一种情况:Flask App 是跑在 Docker 里的,在一个查询请求提交后,hook 函数执行前通过 docker restart 重启服务,就会导致事务不会被提交。

当然,也不能完全排除有人从一些客户端、命令行开启事务没有提交,但这种可能性非常低。

解决方案

  1. 还是要确保 db.session.execute()/db.session.commit() 成对出现,虽然 flask-sqlalchemy 会帮我们在自动清理,但是在发起请求和自动清理之间有很多其他指令要执行,这增加了触发问题的概率。
  1. 设置 idle_in_transaction_session_timeout 参数,如果一个事务长期处于 idle in transaction 状态,超过设置的时长(单位毫秒)时,数据库会自动清理掉这些事务。

对于 1:

可以最好是统一使用上下文管理器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class SQLAlchemy(_SQLAlchemy):

@contextmanager
def auto_commit(self):
try:
yield
self.session.commit()
except Exception as e:
logger.exception(e)
db.session.rollback()
raise


with db.auto_commit():
# some query

参考

[1] Server Configuration