Django Migration Source Code Analysis

25 minute read

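Django Migration automates changes to the database schema (adding tables, adding fields, and so on). It works a bit like a version control system such as git, except that what it tracks is the database schema rather than code.

The migration system has two main parts:

  • makemigrations: generates new migration files from changes to the models and stores them in each app's migrations directory.

  • migrate: applies migration files to the database. It can also reverse changes that were already applied, similar to git revert.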

The overall flow is shown in the figure below:

[Figure: django migrations]

The rest of this post walks through the source code to explain how the Django Migration system works internally. The Django version is v1.11.

Make Migrations

python manage.py makemigrations compares the existing migration files with the model fields of every app and generates new migration files from the differences.

Source: https://github.com/django/django/blob/stable/1.11.x/django/core/management/commands/makemigrations.py

MigrationLoader

Source: https://github.com/django/django/blob/stable/1.11.x/django/db/migrations/loader.py#L21

makemigrations first instantiates a MigrationLoader:

loader = MigrationLoader(None, ignore_no_migrations=True)

During instantiation, MigrationLoader loads every migration file in the current project and builds a MigrationGraph from them:

class MigrationLoader(object):
    """
    Loads migration files from disk, and their status from the database.
    ...
    """
    def __init__(self, connection, load=True, ignore_no_migrations=False):
        ...
        if load:
            self.build_graph()

    def build_graph(self):
        self.load_disk()
        ...
        self.graph = MigrationGraph()
        ...

    def load_disk(self):
        """
        Loads the migrations from all INSTALLED_APPS from disk.
        """
        ...

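A MigrationGraph is essentially the dependency graph of all migration files in the project. Each Node in the graph represents one migration file of one app, identified by a key such as ('app_A', '0001_auto_20190822_0806').

[Figure: migration graph]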

class MigrationGraph(object):
    """
    Represents the digraph of all migrations in a project.
    ...
    """
    def __init__(self):
        self.node_map = {}
        self.nodes = {}
        self.cached = False

class Node(object):
    """
    A single node in the migration graph. Contains direct links to adjacent
    nodes in either direction.
    """
    def __init__(self, key):
        self.key = key
        self.children = set()
        self.parents = set()
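To make the dependency-graph idea concrete, here is a minimal sketch in plain Python. It is not Django's implementation: ToyGraph is a hypothetical class, its method names only loosely follow MigrationGraph, and it assumes the graph has no cycles. Unlike Django, its leaf_nodes ignores app boundaries.

```python
class ToyGraph:
    """Toy stand-in for MigrationGraph: keys are (app_label, migration_name)."""

    def __init__(self):
        self.parents = {}   # key -> set of keys it depends on
        self.children = {}  # key -> set of keys that depend on it

    def add_node(self, key):
        self.parents.setdefault(key, set())
        self.children.setdefault(key, set())

    def add_dependency(self, child, parent):
        self.parents[child].add(parent)
        self.children[parent].add(child)

    def forwards_plan(self, target):
        """Return target and everything it depends on, dependencies first."""
        plan = []

        def visit(key):
            if key in plan:
                return
            for parent in sorted(self.parents[key]):
                visit(parent)
            plan.append(key)

        visit(target)
        return plan

    def leaf_nodes(self):
        """Nodes that no other migration depends on (simplified)."""
        return sorted(k for k, kids in self.children.items() if not kids)


graph = ToyGraph()
for key in [("app_A", "0001_initial"), ("app_A", "0002_add_field"), ("app_B", "0001_initial")]:
    graph.add_node(key)
graph.add_dependency(("app_A", "0002_add_field"), ("app_A", "0001_initial"))
graph.add_dependency(("app_B", "0001_initial"), ("app_A", "0001_initial"))

plan = graph.forwards_plan(("app_A", "0002_add_field"))
leaves = graph.leaf_nodes()
```

Here plan lists app_A's 0001_initial before 0002_add_field, and the two leaves are the latest migration of each app.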

ProjectState

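Source: https://github.com/django/django/blob/stable/1.11.x/django/db/migrations/state.py#L88

With the dependency graph in hand, Django can derive the state of the whole project as of the last makemigrations run. The project state can be understood as the state of the models in every app of the project.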

# Set up autodetector
autodetector = MigrationAutodetector(
    loader.project_state(),
    ProjectState.from_apps(apps),
    questioner,
)

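Here, loader.project_state() is the project state derived from the dependency graph, while ProjectState.from_apps(apps) is the current, live project state derived from the app registry. Both the old and the new state are passed to MigrationAutodetector, which works out the differences.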

class MigrationLoader(object):

    def project_state(self, nodes=None, at_end=True):
        """
        Returns a ProjectState object representing the most recent state
        that the migrations we loaded represent.

        See graph.make_state for the meaning of "nodes" and "at_end"
        """
        return self.graph.make_state(nodes=nodes, at_end=at_end, real_apps=list(self.unmigrated_apps))

The loader does very little in this function; it delegates to graph.make_state to derive the state.

class MigrationGraph(object):
    def make_state(self, nodes=None, at_end=True, real_apps=None):
        """
        Given a migration node or nodes, returns a complete ProjectState for it.
        If at_end is False, returns the state before the migration has run.
        If nodes is not provided, returns the overall most current project state.
        """
        if nodes is None:
            nodes = list(self.leaf_nodes())
        if len(nodes) == 0:
            return ProjectState()
        if not isinstance(nodes[0], tuple):
            nodes = [nodes]
        plan = []

        for node in nodes:
            for migration in self.forwards_plan(node):
                if migration not in plan:
                    if not at_end and migration in nodes:
                        continue
                    plan.append(migration)
        project_state = ProjectState(real_apps=real_apps)

        for node in plan:
            project_state = self.nodes[node].mutate_state(project_state, preserve=False)

        return project_state

make_state really does just one thing: starting from an empty ProjectState, it applies every migration in the plan in order, which yields the corresponding project state.
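The replay idea can be sketched without Django. In the toy version below, the state is a plain dict and each operation is a closure that mutates it; create_model, add_field, and the plan contents are hypothetical names chosen for illustration only.

```python
def create_model(name, fields):
    """Toy operation: add a model with the given field names."""
    def apply(state):
        state[name] = list(fields)
    return apply


def add_field(model, field):
    """Toy operation: append one field to an existing model."""
    def apply(state):
        state[model].append(field)
    return apply


# Hypothetical plan: migrations already sorted in dependency order.
plan = [
    ("0001_initial", [create_model("book", ["id", "title"])]),
    ("0002_add_author", [add_field("book", "author")]),
]


def make_state(plan):
    """Replay every operation on an initially empty state."""
    state = {}
    for _name, operations in plan:
        for operation in operations:
            operation(state)
    return state


project_state = make_state(plan)
```

No database is involved at any point: the state is rebuilt purely from what the migration files say.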

The project state described above is the set of models of all apps. The corresponding class is ProjectState:

class ProjectState(object):
    """
    Represents the entire project's overall state.
    This is the item that is passed around - we do it here rather than at the
    app level so that cross-app FKs/etc. resolve properly.
    """

    def __init__(self, models=None, real_apps=None):
        self.models = models or {}
        # Apps to include from main registry, usually unmigrated ones
        self.real_apps = real_apps or []
        self.is_delayed = False

    @classmethod
    def from_apps(cls, apps):
        "Takes in an Apps and returns a ProjectState matching it"
        app_models = {}
        for model in apps.get_models(include_swapped=True):
            model_state = ModelState.from_model(model)
            app_models[(model_state.app_label, model_state.name_lower)] = model_state

        return cls(app_models)

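ProjectState relies on ModelState to apply migrations. It does not use the actual Model class because, as the docstring below notes, a Model is not designed to have its options changed.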

class ModelState(object):
    """
    Represents a Django Model. We don't use the actual Model class
    as it's not designed to have its options changed - instead, we
    mutate this one and then render it into a Model as required.

    Note that while you are allowed to mutate .fields, you are not allowed
    to mutate the Field instances inside there themselves - you must instead
    assign new ones, as these are not detached during a clone.
    """
    def __init__(self, app_label, name, fields, options=None, bases=None, managers=None):
        self.app_label = app_label
        self.name = force_text(name)
        self.fields = fields
        self.options = options or {}
        self.options.setdefault('indexes', [])
        self.bases = bases or (models.Model, )
        self.managers = managers or []

MigrationAutodetector

Source: https://github.com/django/django/blob/stable/1.11.x/django/db/migrations/autodetector.py#L22

Once the autodetector is constructed, makemigrations calls its changes method to compare the old and new states:

# Detect changes
changes = autodetector.changes(
    graph=loader.graph,
    trim_to_apps=app_labels or None,
    convert_apps=app_labels or None,
    migration_name=self.migration_name,
)

The MigrationAutodetector class looks like this:

class MigrationAutodetector(object):
    """
    Takes a pair of ProjectStates, and compares them to see what the
    first would need doing to make it match the second (the second
    usually being the project's current state).

    Note that this naturally operates on entire projects at a time,
    as it's likely that changes interact (for example, you can't
    add a ForeignKey without having a migration to add the table it
    depends on first). A user interface may offer single-app usage
    if it wishes, with the caveat that it may not always be possible.
    """

    def __init__(self, from_state, to_state, questioner=None):
        self.from_state = from_state
        self.to_state = to_state
        self.questioner = questioner or MigrationQuestioner()
        self.existing_apps = {app for app, model in from_state.models}

    def changes(self, graph, trim_to_apps=None, convert_apps=None, migration_name=None):
        """
        Main entry point to produce a list of applicable changes.
        Takes a graph to base names on and an optional set of apps
        to try and restrict to (restriction is not guaranteed)
        """
        changes = self._detect_changes(convert_apps, graph)
        changes = self.arrange_for_graph(changes, graph, migration_name)
        if trim_to_apps:
            changes = self._trim_to_apps(changes, trim_to_apps)
        return changes

changes is a dict that holds the model changes of each app, for example:

{'app1': [<Migration app1.0003_auto_20210120_1451>], 'app2': [<Migration app2.0005_auto_20210120_1451>]}
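As a rough illustration of what "comparing two states" means, the sketch below diffs two dicts keyed by (app_label, model_name). It is a heavily simplified assumption, not the autodetector's algorithm: the real one also handles renames, field alterations, dependencies, and more. The operation names are used here only as labels.

```python
def detect_changes(from_state, to_state):
    """Return {app_label: [operations]} needed to turn from_state into to_state."""
    changes = {}
    for key in sorted(set(from_state) | set(to_state)):
        app_label, model_name = key
        old_fields = from_state.get(key)
        new_fields = to_state.get(key)
        ops = changes.setdefault(app_label, [])
        if old_fields is None:
            ops.append(("CreateModel", model_name))
        elif new_fields is None:
            ops.append(("DeleteModel", model_name))
        else:
            for field in sorted(set(new_fields) - set(old_fields)):
                ops.append(("AddField", model_name, field))
            for field in sorted(set(old_fields) - set(new_fields)):
                ops.append(("RemoveField", model_name, field))
    # Drop apps that ended up with no changes.
    return {app: ops for app, ops in changes.items() if ops}


# Hypothetical states: each value is the set of field names on the model.
from_state = {("app1", "book"): {"id", "title"}, ("app2", "tag"): {"id"}}
to_state = {
    ("app1", "book"): {"id", "title", "author"},
    ("app2", "tag"): {"id"},
    ("app2", "label"): {"id", "name"},
}
changes = detect_changes(from_state, to_state)
```

The result has one entry per changed app, which mirrors the shape of the dict shown above.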

Migration

Source: https://github.com/django/django/blob/stable/1.11.x/django/db/migrations/migration.py#L10

Each app's model changes produced in the previous step are Migration objects. The class looks like this:

class Migration(object):
    """
    The base class for all migrations.
    """

    # Operations to apply during this migration, in order.
    operations = []

    # Other migrations that should be run before this migration.
    # Should be a list of (app, migration_name).
    dependencies = []

    def mutate_state(self, project_state, preserve=True):
        """
        Takes a ProjectState and returns a new one with the migration's
        operations applied to it. Preserves the original object state by
        default and will return a mutated state from a copy.
        """
        new_state = project_state
        if preserve:
            new_state = project_state.clone()

        for operation in self.operations:
            operation.state_forwards(self.app_label, new_state)
        return new_state

Migration has a mutate_state method, which is what graph.make_state calls to apply a migration and produce the new project state.
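The preserve flag decides whether the caller's state is copied or mutated in place. A minimal sketch of that behaviour, using a hypothetical ToyMigration whose operations are plain callables and copy.deepcopy in place of ProjectState.clone():

```python
import copy


class ToyMigration:
    """Toy migration: operations are callables that mutate a dict in place."""

    def __init__(self, operations):
        self.operations = operations

    def mutate_state(self, state, preserve=True):
        # Work on a copy when preserve is set, otherwise on the caller's object.
        new_state = copy.deepcopy(state) if preserve else state
        for operation in self.operations:
            operation(new_state)
        return new_state


def add_title(state):
    state["book"].append("title")


migration = ToyMigration([add_title])

original = {"book": ["id"]}
copied = migration.mutate_state(original, preserve=True)   # original untouched
snapshot = copy.deepcopy(original)                         # still {"book": ["id"]}
same = migration.mutate_state(original, preserve=False)    # original mutated in place
```

make_state passes preserve=False, presumably because it owns the state it is building and has no need for intermediate copies.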

Finally, MigrationWriter writes each migration object into the migrations directory of its app, producing a file like this:

# -*- coding: utf-8 -*-
# Generated by Django 1.11.2 on 2021-01-20 07:47
from __future__ import unicode_literals

from django.db import migrations, models


class Migration(migrations.Migration):

    dependencies = [
        ('app', '0004_auto_20210115_1133'),
    ]

    operations = [
        migrations.AlterField(
            model_name='model1',
            name='attribute_code',
            field=models.CharField(max_length=32, unique=True),
        ),
        migrations.AlterField(
            model_name='model2',
            name='parent_code',
            field=models.CharField(blank=True, default=''),
        ),
    ]

That completes the makemigrations process.

Bonus

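In theory, makemigrations only inspects the migration files in the project and should not need database access. In practice, after MigrationLoader is instantiated, it runs a history consistency check. The check guards against inconsistent data in the django_migrations table, for example when migration 0002 has been applied but migration 0001, which it depends on, has not.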

# Raise an error if any migrations are applied before their dependencies.
consistency_check_labels = set(config.label for config in apps.get_app_configs())

loader.check_consistent_history(connection)

This issue was raised in ticket 25850 and fixed in version 1.10.

As a result, makemigrations fails if the project cannot connect to the database.
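The consistency rule itself is simple to express. The sketch below is an assumption-laden simplification: InconsistentHistory is a hypothetical stand-in for Django's exception, the inputs are plain Python containers rather than a loader and a connection, and squashed (replaced) migrations are ignored.

```python
class InconsistentHistory(Exception):
    """Hypothetical stand-in for Django's InconsistentMigrationHistory."""


def check_consistent_history(applied, dependencies):
    """Raise if an applied migration depends on one that is not applied.

    applied: set of (app_label, name) rows, as if read from django_migrations.
    dependencies: {migration_key: [parent_key, ...]} built from migration files.
    """
    for migration in sorted(applied):
        for parent in dependencies.get(migration, []):
            if parent not in applied:
                raise InconsistentHistory(
                    "%s.%s is applied before its dependency %s.%s"
                    % (migration + parent)
                )


dependencies = {("app", "0002"): [("app", "0001")]}

# Consistent: both rows are present, so nothing is raised.
check_consistent_history({("app", "0001"), ("app", "0002")}, dependencies)

# Inconsistent: 0002 is recorded but 0001 is missing.
try:
    check_consistent_history({("app", "0002")}, dependencies)
    error = None
except InconsistentHistory as exc:
    error = str(exc)
```

Because the applied set has to come from the database, a check like this cannot run without a working connection.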

Migrate

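As the SchemaEditor documentation explains, Django Migration consists of two parts:

  1. Work out which migrations need to be applied.
  2. Iterate over each migration's operations list and hand the operations to SchemaEditor, which performs the actual database updates.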

MigrationExecutor

First, a MigrationExecutor is instantiated and given the database connection and a callback function, migration_progress_callback:

executor = MigrationExecutor(connection, self.migration_progress_callback)

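migration_progress_callback exists mainly to print information while migrations run. It was introduced in a separate commit. Using a callback keeps the changes to MigrationExecutor's logic small and is more flexible: to print different information, you pass in a different callback and leave the executor untouched.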
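A small sketch of the callback pattern described here. ToyExecutor and collect are hypothetical; the action strings follow the ones Django's executor emits, but the callback signature is simplified to two arguments.

```python
class ToyExecutor:
    """Toy executor that reports progress through an optional callback."""

    def __init__(self, progress_callback=None):
        self.progress_callback = progress_callback

    def apply_migration(self, name):
        if self.progress_callback:
            self.progress_callback("apply_start", name)
        # ... the real database work would happen here ...
        if self.progress_callback:
            self.progress_callback("apply_success", name)


messages = []


def collect(action, migration):
    """Hypothetical callback: store events instead of printing them."""
    messages.append((action, migration))


ToyExecutor(collect).apply_migration("0001_initial")
ToyExecutor().apply_migration("0002_more")  # no callback, nothing reported
```

Swapping collect for a function that prints, logs, or updates a progress bar requires no change to the executor.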

The MigrationExecutor code is as follows:

class MigrationExecutor(object):
    """
    End-to-end migration execution - loads migrations, and runs them
    up or down to a specified set of targets.
    """

    def __init__(self, connection, progress_callback=None):
        self.connection = connection
        self.loader = MigrationLoader(self.connection)
        self.recorder = MigrationRecorder(self.connection)
        self.progress_callback = progress_callback

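MigrationLoader was covered above: it loads the migration files from disk and builds their dependency graph.

MigrationRecorder stores the record of executed migrations in the database, which is what makes incremental changes possible.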

MigrationRecorder

The MigrationRecorder class looks like this:

class MigrationRecorder(object):
    """
    Deals with storing migration records in the database.

    Because this table is actually itself used for dealing with model
    creation, it's the one thing we can't do normally via migrations.
    We manually handle table creation/schema updating (using schema backend)
    and then have a floating model to do queries with.

    If a migration is unapplied its row is removed from the table. Having
    a row in the table always means a migration is applied.
    """

    @python_2_unicode_compatible
    class Migration(models.Model):
        app = models.CharField(max_length=255)
        name = models.CharField(max_length=255)
        applied = models.DateTimeField(default=now)

        class Meta:
            apps = Apps()
            app_label = "migrations"
            db_table = "django_migrations"

        def __str__(self):
            return "Migration %s for %s" % (self.name, self.app)

    def __init__(self, connection):
        self.connection = connection

Every table in Django can be created through a migration, except for the one table the migration system itself needs, which has to be created by hand with SchemaEditor:

class MigrationRecorder(object):
    ...

    def ensure_schema(self):
        """
        Ensures the table exists and has the correct schema.
        """
        # If the table's there, that's fine - we've never changed its schema
        # in the codebase.
        if self.Migration._meta.db_table in self.connection.introspection.table_names(self.connection.cursor()):
            return
        # Make the table
        try:
            with self.connection.schema_editor() as editor:
                editor.create_model(self.Migration)
        except DatabaseError as exc:
            raise MigrationSchemaMissing("Unable to create the django_migrations table (%s)" % exc)

ensure_schema() is called from several places to make sure the django_migrations table exists.
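The recorder's contract (a row exists if and only if the migration is applied) can be sketched with the standard library's sqlite3 module. ToyRecorder and the toy_migrations table are hypothetical; the method names follow MigrationRecorder, but the SQL is written by hand instead of going through a schema editor and the ORM.

```python
import sqlite3


class ToyRecorder:
    """Toy recorder backed by an in-memory SQLite table."""

    def __init__(self, connection):
        self.connection = connection

    def ensure_schema(self):
        # Idempotent: only creates the table when it is missing.
        self.connection.execute(
            "CREATE TABLE IF NOT EXISTS toy_migrations ("
            "id INTEGER PRIMARY KEY, app TEXT, name TEXT, "
            "applied TEXT DEFAULT CURRENT_TIMESTAMP)"
        )

    def record_applied(self, app, name):
        self.ensure_schema()
        self.connection.execute(
            "INSERT INTO toy_migrations (app, name) VALUES (?, ?)", (app, name)
        )

    def record_unapplied(self, app, name):
        self.ensure_schema()
        self.connection.execute(
            "DELETE FROM toy_migrations WHERE app = ? AND name = ?", (app, name)
        )

    def applied_migrations(self):
        self.ensure_schema()
        rows = self.connection.execute("SELECT app, name FROM toy_migrations")
        return set(rows.fetchall())


recorder = ToyRecorder(sqlite3.connect(":memory:"))
recorder.record_applied("app", "0001_initial")
recorder.record_applied("app", "0002_add_field")
recorder.record_unapplied("app", "0002_add_field")
applied = recorder.applied_migrations()
```

Calling ensure_schema() at the top of every method is what lets the recorder work against a brand-new database.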

Migration Plan

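Once the MigrationExecutor is instantiated, the next step is to build the migration plan, that is, the list of migrations that need to run. migration_plan walks the loader's migration dependency graph and picks out the migrations that have not been applied yet.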

targets = executor.loader.graph.leaf_nodes()

plan = executor.migration_plan(targets)

class MigrationExecutor(object):

    def migration_plan(self, targets, clean_start=False):
        """
        Given a set of targets, returns a list of (Migration instance, backwards?).
        """
        plan = []
        if clean_start:
            applied = set()
        else:
            # Migrations already recorded in the django_migrations table
            applied = set(self.loader.applied_migrations)
        for target in targets:
            ...
            else:
                for migration in self.loader.graph.forwards_plan(target):
                    if migration not in applied:
                        plan.append((self.loader.graph.nodes[migration], False))
                        applied.add(migration)
        return plan
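Stripped of the graph machinery, the forwards branch boils down to "everything in dependency order, minus what is already applied". A minimal sketch, assuming the dependency-ordered list is already available (full_order and the migration keys are hypothetical):

```python
def migration_plan(full_order, applied):
    """Return (migration, backwards) pairs for everything not yet applied."""
    applied = set(applied)  # copy so the caller's set is left alone
    plan = []
    for migration in full_order:
        if migration not in applied:
            plan.append((migration, False))
            applied.add(migration)
    return plan


# Hypothetical data: dependency-ordered migrations and the rows already recorded.
full_order = [("app", "0001"), ("app", "0002"), ("app", "0003")]
applied = {("app", "0001")}
plan = migration_plan(full_order, applied)
```

The second element of each pair is the backwards flag, which is False for a forwards plan.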

Pre-migrate/Post-migrate Signal

Before and after migrate runs, each app can receive the pre_migrate and post_migrate signals to do custom work.

def emit_pre_migrate_signal(verbosity, interactive, db, **kwargs):
    # Emit the pre_migrate signal for every application.
    for app_config in apps.get_app_configs():
        models.signals.pre_migrate.send(
            sender=app_config,
            app_config=app_config,
            verbosity=verbosity,
            interactive=interactive,
            using=db,
            **kwargs
        )

def emit_post_migrate_signal(verbosity, interactive, db, **kwargs):
    # Emit the post_migrate signal for every application.
    for app_config in apps.get_app_configs():
        models.signals.post_migrate.send(
            sender=app_config,
            app_config=app_config,
            verbosity=verbosity,
            interactive=interactive,
            using=db,
            **kwargs
        )
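To show the mechanics of sending a signal once per app, here is a toy dispatcher in plain Python. ToySignal is a hypothetical, much-reduced imitation of Django's Signal class, and create_default_groups is an invented receiver; a real receiver would be connected to django.db.models.signals.post_migrate instead.

```python
class ToySignal:
    """Toy signal: keeps a list of receivers and calls each one on send()."""

    def __init__(self):
        self.receivers = []

    def connect(self, receiver):
        self.receivers.append(receiver)

    def send(self, sender, **kwargs):
        # Return (receiver, response) pairs, one per connected receiver.
        return [(receiver, receiver(sender=sender, **kwargs)) for receiver in self.receivers]


post_migrate = ToySignal()
created = []


def create_default_groups(sender, **kwargs):
    """Hypothetical receiver: pretend to seed data once tables exist."""
    created.append("default groups for %s" % sender)


post_migrate.connect(create_default_groups)

# Mirrors the loop above: one send() per installed app.
for app_label in ["auth", "blog"]:
    post_migrate.send(sender=app_label, verbosity=1, interactive=False, using="default")
```

Since the signal fires for every app, a receiver that should only react to its own app has to check the sender.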

Migrate

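migrate works in two directions, forwards and backwards. A backwards migrate is mainly used to roll back migrations. For example, if 0001, 0002, and 0003 have been applied, migrating to 0001 again rolls back the last two migrations.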
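The rollback example can be sketched as follows, assuming a single app with a strictly linear history (Django's real planner walks the dependency graph instead). backwards_plan here is a hypothetical helper, not the graph method of the same name.

```python
def backwards_plan(ordered, applied, target):
    """Return (migration, True) pairs that must be unapplied to land on target."""
    position = ordered.index(target)
    plan = []
    # Walk from the newest migration back down to just after the target.
    for migration in reversed(ordered[position + 1:]):
        if migration in applied:
            plan.append((migration, True))
    return plan


ordered = ["0001", "0002", "0003"]
applied = {"0001", "0002", "0003"}
plan = backwards_plan(ordered, applied, "0001")
```

The newest migration is unapplied first, so 0003 comes before 0002 in the plan.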

With the plan in hand, migrate iterates over it and applies each migration, which means calling migration.apply:

post_migrate_state = executor.migrate(
    targets, plan=plan, state=pre_migrate_state.clone(), fake=fake,
    fake_initial=fake_initial,
)

class MigrationExecutor(object):
    def migrate(self, targets, plan=None, state=None, fake=False, fake_initial=False):
        """
        Migrates the database up to the given targets.

        Django first needs to create all project states before a migration is
        (un)applied and in a second step run all the database operations.
        """

        ...
        elif all_forwards:
            state = self._migrate_all_forwards(state, plan, full_plan, fake=fake, fake_initial=fake_initial)

    def _migrate_all_forwards(self, state, plan, full_plan, fake, fake_initial):
        """
        Take a list of 2-tuples of the form (migration instance, False) and
        apply them in the order they occur in the full_plan.
        """
        migrations_to_run = {m[0] for m in plan}
        for migration, _ in full_plan:
            ...
            if migration in migrations_to_run:
                state = self.apply_migration(state, migration, fake=fake, fake_initial=fake_initial)

        return state

    def apply_migration(self, state, migration, fake=False, fake_initial=False):
        """
        Runs a migration forwards.
        """
        ...
                with self.connection.schema_editor(atomic=migration.atomic) as schema_editor:
                    state = migration.apply(state, schema_editor)

        return state

apply iterates over the migration's operations list and hands each operation to SchemaEditor, which performs the actual database changes.

    def apply(self, project_state, schema_editor, collect_sql=False):
        """
        Takes a project_state representing all migrations prior to this one
        and a schema_editor for a live database and applies the migration
        in a forwards order.

        Returns the resulting project state for efficient re-use by following
        Migrations.
        """
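        for operation in self.operations:
            ...
            # Save the state before the operation has run
            old_state = project_state.clone()
            operation.state_forwards(self.app_label, project_state)
            ...
            operation.database_forwards(self.app_label, schema_editor, old_state, project_state)

        return project_state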
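Each operation therefore has two halves: one that updates the in-memory state and one that changes the database. The sketch below imitates that split with hypothetical classes; ToySchemaEditor only collects SQL strings, and AddColumn is an invented operation, not one of Django's.

```python
class ToySchemaEditor:
    """Toy schema editor: collects SQL strings instead of running them."""

    def __init__(self):
        self.collected_sql = []

    def execute(self, sql):
        self.collected_sql.append(sql)


class AddColumn:
    """Toy operation with the two halves of a Django operation."""

    def __init__(self, table, column):
        self.table = table
        self.column = column

    def state_forwards(self, state):
        # In-memory half: update the project state (builds a new list).
        state[self.table] = state[self.table] + [self.column]

    def database_forwards(self, schema_editor, from_state, to_state):
        # Database half: emit DDL for the difference.
        schema_editor.execute("ALTER TABLE %s ADD COLUMN %s" % (self.table, self.column))


def apply(operations, state, schema_editor):
    for operation in operations:
        old_state = dict(state)  # snapshot before the operation runs
        operation.state_forwards(state)
        operation.database_forwards(schema_editor, old_state, state)
    return state


editor = ToySchemaEditor()
final_state = apply([AddColumn("book", "author")], {"book": ["id", "title"]}, editor)
```

Passing both the old and the new state to database_forwards lets an operation compute its DDL from the difference between them.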

That completes the whole migrate operation.