"Fossies" - the Fresh Open Source Software Archive

Member "glance-24.1.0/glance/tests/functional/db/migrations/test_train_migrate01.py" (8 Jun 2022, 5578 Bytes) of package /linux/misc/openstack/glance-24.1.0.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here.

    1 # Copyright 2019 RedHat Inc
    2 #
    3 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
    4 #    not use this file except in compliance with the License. You may obtain
    5 #    a copy of the License at
    6 #
    7 #         http://www.apache.org/licenses/LICENSE-2.0
    8 #
    9 #    Unless required by applicable law or agreed to in writing, software
   10 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   11 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   12 #    License for the specific language governing permissions and limitations
   13 #    under the License.
   14 
   15 import datetime
   16 
   17 from oslo_db.sqlalchemy import test_fixtures
   18 from oslo_db.sqlalchemy import utils as db_utils
   19 
   20 from glance.db.sqlalchemy.alembic_migrations import data_migrations
   21 from glance.tests.functional.db import test_migrations
   22 import glance.tests.utils as test_utils
   23 
   24 
   class TestTrainMigrate01Mixin(test_migrations.AlembicMigrationsMixin):
       """Check the 'train' data migration against pre-populated tables.

       The ``_pre_upgrade_train_expand01`` hook inserts two image rows and
       two image_locations rows whose ``meta_data`` contains a ``"backend"``
       key.  The ``_check_train_expand01`` hook then runs the 'train' data
       migrations and verifies that ``"backend"`` was replaced by ``"store"``.
       """

       def _get_revisions(self, config):
           """Delegate to the base mixin with head pinned to 'train_expand01'.

           :param config: passed through unchanged to
               ``AlembicMigrationsMixin._get_revisions``
           :returns: whatever the base implementation returns for that head
           """
           return test_migrations.AlembicMigrationsMixin._get_revisions(
               self, config, head='train_expand01')

       def _pre_upgrade_train_expand01(self, engine):
           """Insert two public images and one location row for each of them.

           :param engine: engine used to look up the tables and to run the
               inserts
           """
           images = db_utils.get_table(engine, 'images')
           image_locations = db_utils.get_table(engine, 'image_locations')
           now = datetime.datetime.now()

           # Column values shared by every row of each table.
           image_common = dict(deleted=False,
                               created_at=now,
                               status='active',
                               min_disk=0,
                               min_ram=0,
                               visibility='public')
           location_common = dict(deleted=False, created_at=now)

           # Order matters: the image rows are inserted before the
           # image_locations rows that refer to them through image_id.
           records = [
               (images, dict(image_common, id='image_1')),
               (images, dict(image_common, id='image_2')),
               (image_locations, dict(location_common,
                                      image_id='image_1',
                                      value='image_location_1',
                                      meta_data='{"backend": "fast"}',
                                      id=1)),
               (image_locations, dict(location_common,
                                      image_id='image_2',
                                      value='image_location_2',
                                      meta_data='{"backend": "cheap"}',
                                      id=2)),
           ]

           # One connection and one transaction per insert.
           for table, values in records:
               with engine.connect() as conn, conn.begin():
                   conn.execute(table.insert().values(values))

       def _check_train_expand01(self, engine, data):
           """Run the 'train' data migrations and verify the key rename.

           :param engine: engine used to query image_locations and handed to
               ``data_migrations.migrate``
           :param data: unused by this check
           """
           image_locations = db_utils.get_table(engine, 'image_locations')
           ordered_locations = image_locations.select().order_by(
               image_locations.c.id)

           # check that meta_data has 'backend' key for existing
           # image_locations
           with engine.connect() as conn:
               rows = conn.execute(ordered_locations).fetchall()

           self.assertEqual(2, len(rows))
           for row in rows:
               # Attribute access is used instead of row['meta_data']:
               # string-key indexing on result rows was removed in
               # SQLAlchemy 2.0, while attribute access works on old and new
               # versions alike.
               self.assertIn('"backend":', row.meta_data)

           # run data migrations
           data_migrations.migrate(engine, release='train')

           # check that meta_data has 'backend' key replaced with 'store'
           with engine.connect() as conn:
               rows = conn.execute(ordered_locations).fetchall()

           self.assertEqual(2, len(rows))
           for row in rows:
               self.assertNotIn('"backend":', row.meta_data)
               self.assertIn('"store":', row.meta_data)
  102 
  103 
  class TestTrainMigrate01MySQL(TestTrainMigrate01Mixin,
                                test_fixtures.OpportunisticDBTestMixin,
                                test_utils.BaseTestCase):
      """TestTrainMigrate01Mixin checks using the MySQL opportunistic fixture."""

      FIXTURE = test_fixtures.MySQLOpportunisticFixture
  110 
  111 
  class TestTrain01PostgresSQL(TestTrainMigrate01Mixin,
                               test_fixtures.OpportunisticDBTestMixin,
                               test_utils.BaseTestCase):
      """TestTrainMigrate01Mixin checks using the PostgreSQL opportunistic
      fixture.
      """

      FIXTURE = test_fixtures.PostgresqlOpportunisticFixture
  118 
  119 
  class TestTrainMigrate01_EmptyDBMixin(test_migrations.AlembicMigrationsMixin):
      """This mixin is used to create an initial glance database and upgrade it
      up to the train_expand01 revision.

      No rows are inserted beforehand, so the check runs the 'train' data
      migrations against an empty images table.
      """

      def _get_revisions(self, config):
          """Delegate to the base mixin with head pinned to 'train_expand01'.

          :param config: passed through unchanged to
              ``AlembicMigrationsMixin._get_revisions``
          :returns: whatever the base implementation returns for that head
          """
          return test_migrations.AlembicMigrationsMixin._get_revisions(
              self, config, head='train_expand01')

      def _pre_upgrade_train_expand01(self, engine):
          """Insert nothing: this scenario covers a new/empty database."""
          # New/empty database
          pass

      def _check_train_expand01(self, engine, data):
          """Verify the images table is empty, then run the data migrations.

          :param engine: engine used to query the images table and handed to
              ``data_migrations.migrate``
          :param data: unused by this check
          """
          images = db_utils.get_table(engine, 'images')

          # check that there are no rows in the images table
          with engine.connect() as conn:
              rows = conn.execute(
                  images.select().order_by(images.c.id)
              ).fetchall()

          self.assertEqual(0, len(rows))

          # run data migrations
          # The release is pinned to 'train', as in TestTrainMigrate01Mixin.
          # Without it, migrate() falls back to a default defined outside
          # this file (presumably the current release -- confirm), and this
          # test would no longer exercise the 'train' data migration.
          data_migrations.migrate(engine, release='train')
  145 
  146 
  class TestTrainMigrate01_EmptyDBMySQL(TestTrainMigrate01_EmptyDBMixin,
                                        test_fixtures.OpportunisticDBTestMixin,
                                        test_utils.BaseTestCase):
      """TestTrainMigrate01_EmptyDBMixin checks using the MySQL opportunistic
      fixture.
      """

      FIXTURE = test_fixtures.MySQLOpportunisticFixture
  153 
  154 
  class TestTrainMigrate01_PySQL(TestTrainMigrate01_EmptyDBMixin,
                                 test_fixtures.OpportunisticDBTestMixin,
                                 test_utils.BaseTestCase):
      """TestTrainMigrate01_EmptyDBMixin checks using the PostgreSQL
      opportunistic fixture.
      """

      FIXTURE = test_fixtures.PostgresqlOpportunisticFixture