"Fossies" - the Fresh Open Source Software Archive

Member "drupal-8.9.10/core/modules/migrate/src/Plugin/migrate/id_map/Sql.php" (26 Nov 2020, 32562 Bytes) of package /linux/www/drupal-8.9.10.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) PHP syntax highlighting (style: standard), with prefixed line numbers and a code-folding option. Alternatively, you can view or download the uninterpreted source code file here. See also the latest Fossies "Diffs" side-by-side code changes report for "Sql.php": 9.0.8_vs_9.1.0-rc1.

    1 <?php
    2 
    3 namespace Drupal\migrate\Plugin\migrate\id_map;
    4 
    5 use Drupal\Core\Database\DatabaseException;
    6 use Drupal\Core\Database\DatabaseExceptionWrapper;
    7 use Drupal\Core\Field\BaseFieldDefinition;
    8 use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
    9 use Drupal\Core\Plugin\PluginBase;
   10 use Drupal\migrate\MigrateMessage;
   11 use Drupal\migrate\Audit\HighestIdInterface;
   12 use Drupal\migrate\Plugin\MigrationInterface;
   13 use Drupal\migrate\Event\MigrateIdMapMessageEvent;
   14 use Drupal\migrate\MigrateException;
   15 use Drupal\migrate\MigrateMessageInterface;
   16 use Drupal\migrate\Plugin\MigrateIdMapInterface;
   17 use Drupal\migrate\Row;
   18 use Drupal\migrate\Event\MigrateEvents;
   19 use Drupal\migrate\Event\MigrateMapSaveEvent;
   20 use Drupal\migrate\Event\MigrateMapDeleteEvent;
   21 use Symfony\Component\DependencyInjection\ContainerInterface;
   22 use Symfony\Component\EventDispatcher\EventDispatcherInterface;
   23 
   24 /**
   25  * Defines the sql based ID map implementation.
   26  *
   27  * It creates one map and one message table per migration entity to store the
   28  * relevant information.
   29  *
   30  * @PluginID("sql")
   31  */
   32 class Sql extends PluginBase implements MigrateIdMapInterface, ContainerFactoryPluginInterface, HighestIdInterface {
   33 
   34   /**
   35    * Column name of hashed source id values.
   36    */
   37   const SOURCE_IDS_HASH = 'source_ids_hash';
   38 
   39   /**
   40    * An event dispatcher instance to use for map events.
   41    *
   42    * @var \Symfony\Component\EventDispatcher\EventDispatcherInterface
   43    */
   44   protected $eventDispatcher;
   45 
   46   /**
   47    * The migration map table name.
   48    *
   49    * @var string
   50    */
   51   protected $mapTableName;
   52 
   53   /**
   54    * The message table name.
   55    *
   56    * @var string
   57    */
   58   protected $messageTableName;
   59 
   60   /**
   61    * The migrate message service.
   62    *
   63    * @var \Drupal\migrate\MigrateMessageInterface
   64    */
   65   protected $message;
   66 
   67   /**
   68    * The database connection for the map/message tables on the destination.
   69    *
   70    * @var \Drupal\Core\Database\Connection
   71    */
   72   protected $database;
   73 
   74   /**
   75    * The select query.
   76    *
   77    * @var \Drupal\Core\Database\Query\SelectInterface
   78    */
   79   protected $query;
   80 
   81   /**
   82    * The migration being done.
   83    *
   84    * @var \Drupal\migrate\Plugin\MigrationInterface
   85    */
   86   protected $migration;
   87 
   88   /**
   89    * The source ID fields.
   90    *
   91    * @var array
   92    */
   93   protected $sourceIdFields;
   94 
   95   /**
   96    * The destination ID fields.
   97    *
   98    * @var array
   99    */
  100   protected $destinationIdFields;
  101 
  102   /**
  103    * Whether the plugin is already initialized.
  104    *
  105    * @var bool
  106    */
  107   protected $initialized;
  108 
  109   /**
  110    * The result.
  111    *
  112    * @var null
  113    */
  114   protected $result = NULL;
  115 
  116   /**
  117    * The source identifiers.
  118    *
  119    * @var array
  120    */
  121   protected $sourceIds = [];
  122 
  123   /**
  124    * The destination identifiers.
  125    *
  126    * @var array
  127    */
  128   protected $destinationIds = [];
  129 
  130   /**
  131    * The current row.
  132    *
  133    * @var null
  134    */
  135   protected $currentRow = NULL;
  136 
  137   /**
  138    * The current key.
  139    *
  140    * @var array
  141    */
  142   protected $currentKey = [];
  143 
  144   /**
  145    * Constructs an SQL object.
  146    *
   147    * Stores the dependencies and derives the map and message table names.
  148    *
  149    * @param array $configuration
  150    *   The configuration.
  151    * @param string $plugin_id
  152    *   The plugin ID for the migration process to do.
  153    * @param mixed $plugin_definition
  154    *   The configuration for the plugin.
  155    * @param \Drupal\migrate\Plugin\MigrationInterface $migration
  156    *   The migration to do.
  157    * @param \Symfony\Component\EventDispatcher\EventDispatcherInterface $event_dispatcher
  158    *   The event dispatcher.
  159    */
  160   public function __construct(array $configuration, $plugin_id, $plugin_definition, MigrationInterface $migration, EventDispatcherInterface $event_dispatcher) {
  161     parent::__construct($configuration, $plugin_id, $plugin_definition);
  162     $this->migration = $migration;
  163     $this->eventDispatcher = $event_dispatcher;
  164     $this->message = new MigrateMessage();
  165 
  166     if (!isset($this->database)) {
  167       $this->database = \Drupal::database();
  168     }
  169 
  170     // Default generated table names, limited to 63 characters.
  171     $machine_name = str_replace(':', '__', $this->migration->id());
  172     $prefix_length = strlen($this->database->tablePrefix());
  173     $this->mapTableName = 'migrate_map_' . mb_strtolower($machine_name);
  174     $this->mapTableName = mb_substr($this->mapTableName, 0, 63 - $prefix_length);
  175     $this->messageTableName = 'migrate_message_' . mb_strtolower($machine_name);
  176     $this->messageTableName = mb_substr($this->messageTableName, 0, 63 - $prefix_length);
  177   }
  178 
  179   /**
  180    * {@inheritdoc}
  181    */
  182   public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition, MigrationInterface $migration = NULL) {
  183     return new static(
  184       $configuration,
  185       $plugin_id,
  186       $plugin_definition,
  187       $migration,
  188       $container->get('event_dispatcher')
  189     );
  190   }
  191 
  192   /**
  193    * Retrieves the hash of the source identifier values.
  194    *
  195    * @internal
  196    *
  197    * @param array $source_id_values
  198    *   The source identifiers
  199    *
  200    * @return string
   201    *   A hash containing the hashed values of the source identifiers.
  202    */
  203   public function getSourceIdsHash(array $source_id_values) {
  204     // When looking up the destination ID we require an array with both the
  205     // source key and value, e.g. ['nid' => 41]. In this case, $source_id_values
  206     // need to be ordered the same order as $this->sourceIdFields().
  207     // However, the Migration process plugin doesn't currently have a way to get
  208     // the source key so we presume the values have been passed through in the
  209     // correct order.
  210     if (!isset($source_id_values[0])) {
  211       $source_id_values_keyed = [];
  212       foreach ($this->sourceIdFields() as $field_name => $source_id) {
  213         $source_id_values_keyed[] = $source_id_values[$field_name];
  214       }
  215       $source_id_values = $source_id_values_keyed;
  216     }
  217     return hash('sha256', serialize(array_map('strval', $source_id_values)));
  218   }
  219 
  220   /**
  221    * The source ID fields.
  222    *
  223    * @return array
  224    *   The source ID fields.
  225    */
  226   protected function sourceIdFields() {
  227     if (!isset($this->sourceIdFields)) {
  228       // Build the source and destination identifier maps.
  229       $this->sourceIdFields = [];
  230       $count = 1;
  231       foreach ($this->migration->getSourcePlugin()->getIds() as $field => $schema) {
  232         $this->sourceIdFields[$field] = 'sourceid' . $count++;
  233       }
  234     }
  235     return $this->sourceIdFields;
  236   }
  237 
  238   /**
  239    * The destination ID fields.
  240    *
  241    * @return array
  242    *   The destination ID fields.
  243    */
  244   protected function destinationIdFields() {
  245     if (!isset($this->destinationIdFields)) {
  246       $this->destinationIdFields = [];
  247       $count = 1;
  248       foreach ($this->migration->getDestinationPlugin()->getIds() as $field => $schema) {
  249         $this->destinationIdFields[$field] = 'destid' . $count++;
  250       }
  251     }
  252     return $this->destinationIdFields;
  253   }
  254 
  255   /**
  256    * The name of the database map table.
  257    *
  258    * @return string
  259    *   The map table name.
  260    */
  261   public function mapTableName() {
  262     return $this->mapTableName;
  263   }
  264 
  265   /**
  266    * The name of the database message table.
  267    *
  268    * @return string
  269    *   The message table name.
  270    */
  271   public function messageTableName() {
  272     return $this->messageTableName;
  273   }
  274 
  275   /**
  276    * Get the fully qualified map table name.
  277    *
  278    * @return string
  279    *   The fully qualified map table name.
  280    */
  281   public function getQualifiedMapTableName() {
  282     return $this->getDatabase()->getFullQualifiedTableName($this->mapTableName);
  283   }
  284 
  285   /**
  286    * Gets the database connection.
  287    *
  288    * @return \Drupal\Core\Database\Connection
  289    *   The database connection object.
  290    */
  291   public function getDatabase() {
  292     $this->init();
  293     return $this->database;
  294   }
  295 
  296   /**
  297    * Initialize the plugin.
  298    */
  299   protected function init() {
  300     if (!$this->initialized) {
  301       $this->initialized = TRUE;
  302       $this->ensureTables();
  303     }
  304   }
  305 
  306   /**
  307    * {@inheritdoc}
  308    */
  309   public function setMessage(MigrateMessageInterface $message) {
  310     $this->message = $message;
  311   }
  312 
  313   /**
  314    * Create the map and message tables if they don't already exist.
  315    */
  316   protected function ensureTables() {
  317     if (!$this->getDatabase()->schema()->tableExists($this->mapTableName)) {
  318       // Generate appropriate schema info for the map and message tables,
  319       // and map from the source field names to the map/msg field names.
  320       $count = 1;
  321       $source_id_schema = [];
  322       $indexes = [];
  323       foreach ($this->migration->getSourcePlugin()->getIds() as $id_definition) {
  324         $mapkey = 'sourceid' . $count++;
  325         $indexes['source'][] = $mapkey;
  326         $source_id_schema[$mapkey] = $this->getFieldSchema($id_definition);
  327         $source_id_schema[$mapkey]['not null'] = TRUE;
  328       }
  329 
  330       $source_ids_hash[$this::SOURCE_IDS_HASH] = [
  331         'type' => 'varchar',
  332         'length' => '64',
  333         'not null' => TRUE,
  334         'description' => 'Hash of source ids. Used as primary key',
  335       ];
  336       $fields = $source_ids_hash + $source_id_schema;
  337 
  338       // Add destination identifiers to map table.
  339       // @todo How do we discover the destination schema?
  340       $count = 1;
  341       foreach ($this->migration->getDestinationPlugin()->getIds() as $id_definition) {
  342         // Allow dest identifier fields to be NULL (for IGNORED/FAILED cases).
  343         $mapkey = 'destid' . $count++;
  344         $fields[$mapkey] = $this->getFieldSchema($id_definition);
  345         $fields[$mapkey]['not null'] = FALSE;
  346       }
  347       $fields['source_row_status'] = [
  348         'type' => 'int',
  349         'size' => 'tiny',
  350         'unsigned' => TRUE,
  351         'not null' => TRUE,
  352         'default' => MigrateIdMapInterface::STATUS_IMPORTED,
  353         'description' => 'Indicates current status of the source row',
  354       ];
  355       $fields['rollback_action'] = [
  356         'type' => 'int',
  357         'size' => 'tiny',
  358         'unsigned' => TRUE,
  359         'not null' => TRUE,
  360         'default' => MigrateIdMapInterface::ROLLBACK_DELETE,
  361         'description' => 'Flag indicating what to do for this item on rollback',
  362       ];
  363       $fields['last_imported'] = [
  364         'type' => 'int',
  365         'unsigned' => TRUE,
  366         'not null' => TRUE,
  367         'default' => 0,
  368         'description' => 'UNIX timestamp of the last time this row was imported',
  369       ];
  370       $fields['hash'] = [
  371         'type' => 'varchar',
  372         'length' => '64',
  373         'not null' => FALSE,
  374         'description' => 'Hash of source row data, for detecting changes',
  375       ];
  376       $schema = [
  377         'description' => 'Mappings from source identifier value(s) to destination identifier value(s).',
  378         'fields' => $fields,
  379         'primary key' => [$this::SOURCE_IDS_HASH],
  380         'indexes' => $indexes,
  381       ];
  382       $this->getDatabase()->schema()->createTable($this->mapTableName, $schema);
  383 
  384       // Now do the message table.
  385       if (!$this->getDatabase()->schema()->tableExists($this->messageTableName())) {
  386         $fields = [];
  387         $fields['msgid'] = [
  388           'type' => 'serial',
  389           'unsigned' => TRUE,
  390           'not null' => TRUE,
  391         ];
  392         $fields += $source_ids_hash;
  393 
  394         $fields['level'] = [
  395           'type' => 'int',
  396           'unsigned' => TRUE,
  397           'not null' => TRUE,
  398           'default' => 1,
  399         ];
  400         $fields['message'] = [
  401           'type' => 'text',
  402           'size' => 'medium',
  403           'not null' => TRUE,
  404         ];
  405         $schema = [
  406           'description' => 'Messages generated during a migration process',
  407           'fields' => $fields,
  408           'primary key' => ['msgid'],
  409         ];
  410         $this->getDatabase()->schema()->createTable($this->messageTableName(), $schema);
  411       }
  412     }
  413     else {
  414       // Add any missing columns to the map table.
  415       if (!$this->getDatabase()->schema()->fieldExists($this->mapTableName,
  416                                                     'rollback_action')) {
  417         $this->getDatabase()->schema()->addField($this->mapTableName, 'rollback_action',
  418           [
  419             'type' => 'int',
  420             'size' => 'tiny',
  421             'unsigned' => TRUE,
  422             'not null' => TRUE,
  423             'default' => 0,
  424             'description' => 'Flag indicating what to do for this item on rollback',
  425           ]
  426         );
  427       }
  428       if (!$this->getDatabase()->schema()->fieldExists($this->mapTableName, 'hash')) {
  429         $this->getDatabase()->schema()->addField($this->mapTableName, 'hash',
  430           [
  431             'type' => 'varchar',
  432             'length' => '64',
  433             'not null' => FALSE,
  434             'description' => 'Hash of source row data, for detecting changes',
  435           ]
  436         );
  437       }
  438       if (!$this->getDatabase()->schema()->fieldExists($this->mapTableName, $this::SOURCE_IDS_HASH)) {
  439         $this->getDatabase()->schema()->addField($this->mapTableName, $this::SOURCE_IDS_HASH, [
  440           'type' => 'varchar',
  441           'length' => '64',
  442           'not null' => TRUE,
  443           'description' => 'Hash of source ids. Used as primary key',
  444         ]);
  445       }
  446     }
  447   }
  448 
  449   /**
  450    * Creates schema from an ID definition.
  451    *
  452    * @param array $id_definition
  453    *   The definition of the field having the structure as the items returned by
  454    *   MigrateSourceInterface or MigrateDestinationInterface::getIds().
  455    *
  456    * @return array
  457    *   The database schema definition.
  458    *
  459    * @see \Drupal\migrate\Plugin\MigrateSourceInterface::getIds()
  460    * @see \Drupal\migrate\Plugin\MigrateDestinationInterface::getIds()
  461    */
  462   protected function getFieldSchema(array $id_definition) {
  463     $type_parts = explode('.', $id_definition['type']);
  464     if (count($type_parts) == 1) {
  465       $type_parts[] = 'value';
  466     }
  467     unset($id_definition['type']);
  468 
  469     // Get the field storage definition.
  470     $definition = BaseFieldDefinition::create($type_parts[0]);
  471 
  472     // Get a list of setting keys belonging strictly to the field definition.
  473     $default_field_settings = $definition->getSettings();
  474     // Separate field definition settings from custom settings. Custom settings
  475     // are settings passed in $id_definition that are not part of field storage
  476     // definition settings.
  477     $field_settings = array_intersect_key($id_definition, $default_field_settings);
  478     $custom_settings = array_diff_key($id_definition, $default_field_settings);
  479 
  480     // Resolve schema from field storage definition settings.
  481     $schema = $definition
  482       ->setSettings($field_settings)
  483       ->getColumns()[$type_parts[1]];
  484 
  485     // Merge back custom settings.
  486     return $schema + $custom_settings;
  487   }
  488 
  489   /**
  490    * {@inheritdoc}
  491    */
  492   public function getRowBySource(array $source_id_values) {
  493     $query = $this->getDatabase()->select($this->mapTableName(), 'map')
  494       ->fields('map');
  495     $query->condition($this::SOURCE_IDS_HASH, $this->getSourceIdsHash($source_id_values));
  496     $result = $query->execute();
  497     return $result->fetchAssoc();
  498   }
  499 
  500   /**
  501    * {@inheritdoc}
  502    */
  503   public function getRowByDestination(array $destination_id_values) {
  504     $query = $this->getDatabase()->select($this->mapTableName(), 'map')
  505       ->fields('map');
  506     foreach ($this->destinationIdFields() as $field_name => $destination_id) {
  507       $query->condition("map.$destination_id", $destination_id_values[$field_name], '=');
  508     }
  509     $result = $query->execute();
  510     return $result->fetchAssoc();
  511   }
  512 
  513   /**
  514    * {@inheritdoc}
  515    */
  516   public function getRowsNeedingUpdate($count) {
  517     $rows = [];
  518     $result = $this->getDatabase()->select($this->mapTableName(), 'map')
  519       ->fields('map')
  520       ->condition('source_row_status', MigrateIdMapInterface::STATUS_NEEDS_UPDATE)
  521       ->range(0, $count)
  522       ->execute();
  523     foreach ($result as $row) {
  524       $rows[] = $row;
  525     }
  526     return $rows;
  527   }
  528 
  529   /**
  530    * {@inheritdoc}
  531    */
  532   public function lookupSourceId(array $destination_id_values) {
  533     $source_id_fields = $this->sourceIdFields();
  534     $query = $this->getDatabase()->select($this->mapTableName(), 'map');
  535     foreach ($source_id_fields as $source_field_name => $idmap_field_name) {
  536       $query->addField('map', $idmap_field_name, $source_field_name);
  537     }
  538     foreach ($this->destinationIdFields() as $field_name => $destination_id) {
  539       $query->condition("map.$destination_id", $destination_id_values[$field_name], '=');
  540     }
  541     $result = $query->execute();
  542     return $result->fetchAssoc() ?: [];
  543   }
  544 
  545   /**
  546    * {@inheritdoc}
  547    */
  548   public function lookupDestinationId(array $source_id_values) {
  549     @trigger_error(__NAMESPACE__ . '\Sql::lookupDestinationId() is deprecated in drupal:8.1.0 and is removed from drupal:9.0.0. Use Sql::lookupDestinationIds() instead. See https://www.drupal.org/node/2725809', E_USER_DEPRECATED);
  550     $results = $this->lookupDestinationIds($source_id_values);
  551     return $results ? reset($results) : [];
  552   }
  553 
  554   /**
  555    * {@inheritdoc}
  556    */
  557   public function lookupDestinationIds(array $source_id_values) {
  558     if (empty($source_id_values)) {
  559       return [];
  560     }
  561 
  562     // Canonicalize the keys into a hash of DB-field => value.
  563     $is_associative = !isset($source_id_values[0]);
  564     $conditions = [];
  565     foreach ($this->sourceIdFields() as $field_name => $db_field) {
  566       if ($is_associative) {
  567         // Ensure to handle array elements with a NULL value.
  568         if (array_key_exists($field_name, $source_id_values)) {
  569           // Associative $source_id_values can have fields out of order.
  570           if (isset($source_id_values[$field_name])) {
  571             // Only add a condition if the value is not NULL.
  572             $conditions[$db_field] = $source_id_values[$field_name];
  573           }
  574           unset($source_id_values[$field_name]);
  575         }
  576       }
  577       else {
  578         // For non-associative $source_id_values, we assume they're the first
  579         // few fields.
  580         if (empty($source_id_values)) {
  581           break;
  582         }
  583         $conditions[$db_field] = array_shift($source_id_values);
  584       }
  585     }
  586 
  587     if (!empty($source_id_values)) {
  588       $var_dump = var_export($source_id_values, TRUE);
  589       throw new MigrateException(sprintf("Extra unknown items for map %s in source IDs: %s", $this->mapTableName(), $var_dump));
  590     }
  591 
  592     $query = $this->getDatabase()->select($this->mapTableName(), 'map')
  593       ->fields('map', $this->destinationIdFields());
  594     if (count($this->sourceIdFields()) === count($conditions)) {
  595       // Optimization: Use the primary key.
  596       $query->condition($this::SOURCE_IDS_HASH, $this->getSourceIdsHash(array_values($conditions)));
  597     }
  598     else {
  599       foreach ($conditions as $db_field => $value) {
  600         $query->condition($db_field, $value);
  601       }
  602     }
  603 
  604     try {
  605       return $query->execute()->fetchAll(\PDO::FETCH_NUM);
  606     }
  607     catch (DatabaseExceptionWrapper $e) {
  608       // It's possible that the query will cause an exception to be thrown. For
  609       // example, the URL alias migration uses a dummy node ID of 'INVALID_NID'
  610       // to cause the lookup to return no results. On PostgreSQL this causes an
  611       // exception because 'INVALID_NID' is not the expected type.
  612       return [];
  613     }
  614   }
  615 
  616   /**
  617    * {@inheritdoc}
  618    */
  619   public function saveIdMapping(Row $row, array $destination_id_values, $source_row_status = MigrateIdMapInterface::STATUS_IMPORTED, $rollback_action = MigrateIdMapInterface::ROLLBACK_DELETE) {
  620     // Construct the source key.
  621     $source_id_values = $row->getSourceIdValues();
  622     // Construct the source key and initialize to empty variable keys.
  623     $fields = [];
  624     foreach ($this->sourceIdFields() as $field_name => $key_name) {
  625       // A NULL key value is usually an indication of a problem.
  626       if (!isset($source_id_values[$field_name])) {
  627         $this->message->display($this->t(
  628           'Did not save to map table due to NULL value for key field @field',
  629           ['@field' => $field_name]), 'error');
  630         return;
  631       }
  632       $fields[$key_name] = $source_id_values[$field_name];
  633     }
  634 
  635     if (!$fields) {
  636       return;
  637     }
  638 
  639     $fields += [
  640       'source_row_status' => (int) $source_row_status,
  641       'rollback_action' => (int) $rollback_action,
  642       'hash' => $row->getHash(),
  643     ];
  644     $count = 0;
  645     foreach ($destination_id_values as $dest_id) {
  646       $fields['destid' . ++$count] = $dest_id;
  647     }
  648     if ($count && $count != count($this->destinationIdFields())) {
  649       $this->message->display(t('Could not save to map table due to missing destination id values'), 'error');
  650       return;
  651     }
  652     if ($this->migration->getTrackLastImported()) {
  653       $fields['last_imported'] = time();
  654     }
  655     $keys = [$this::SOURCE_IDS_HASH => $this->getSourceIdsHash($source_id_values)];
  656     // Notify anyone listening of the map row we're about to save.
  657     $this->eventDispatcher->dispatch(MigrateEvents::MAP_SAVE, new MigrateMapSaveEvent($this, $fields));
  658     $this->getDatabase()->merge($this->mapTableName())
  659       ->key($keys)
  660       ->fields($fields)
  661       ->execute();
  662   }
  663 
  664   /**
  665    * {@inheritdoc}
  666    */
  667   public function saveMessage(array $source_id_values, $message, $level = MigrationInterface::MESSAGE_ERROR) {
  668     foreach ($this->sourceIdFields() as $field_name => $source_id) {
  669       // If any key value is not set, we can't save.
  670       if (!isset($source_id_values[$field_name])) {
  671         return;
  672       }
  673     }
  674     $fields[$this::SOURCE_IDS_HASH] = $this->getSourceIdsHash($source_id_values);
  675     $fields['level'] = $level;
  676     $fields['message'] = $message;
  677     $this->getDatabase()->insert($this->messageTableName())
  678       ->fields($fields)
  679       ->execute();
  680 
  681     // Notify anyone listening of the message we've saved.
  682     $this->eventDispatcher->dispatch(MigrateEvents::IDMAP_MESSAGE,
  683       new MigrateIdMapMessageEvent($this->migration, $source_id_values, $message, $level));
  684   }
  685 
  686   /**
  687    * {@inheritdoc}
  688    */
  689   public function getMessages(array $source_id_values = [], $level = NULL) {
  690     $query = $this->getDatabase()->select($this->messageTableName(), 'msg');
  691     $condition = sprintf('msg.%s = map.%s', $this::SOURCE_IDS_HASH, $this::SOURCE_IDS_HASH);
  692     $query->addJoin('LEFT', $this->mapTableName(), 'map', $condition);
  693     // Explicitly define the fields we want. The order will be preserved: source
  694     // IDs, destination IDs (if possible), and then the rest.
  695     foreach ($this->sourceIdFields() as $id => $column_name) {
  696       $query->addField('map', $column_name, "src_$id");
  697     }
  698     foreach ($this->destinationIdFields() as $id => $column_name) {
  699       $query->addField('map', $column_name, "dest_$id");
  700     }
  701     $query->fields('msg', ['msgid', $this::SOURCE_IDS_HASH, 'level', 'message']);
  702     if ($source_id_values) {
  703       $query->condition('msg.' . $this::SOURCE_IDS_HASH, $this->getSourceIdsHash($source_id_values));
  704     }
  705     if ($level) {
  706       $query->condition('msg.level', $level);
  707     }
  708     return $query->execute();
  709   }
  710 
  711   /**
  712    * {@inheritdoc}
  713    */
  714   public function getMessageIterator(array $source_id_values = [], $level = NULL) {
  715     @trigger_error('getMessageIterator() is deprecated in drupal:8.8.0 and is removed from drupal:9.0.0. Use getMessages() instead. See https://www.drupal.org/node/3060969', E_USER_DEPRECATED);
  716     return $this->getMessages($source_id_values, $level);
  717   }
  718 
  719   /**
  720    * {@inheritdoc}
  721    */
  722   public function prepareUpdate() {
  723     $this->getDatabase()->update($this->mapTableName())
  724       ->fields(['source_row_status' => MigrateIdMapInterface::STATUS_NEEDS_UPDATE])
  725       ->execute();
  726   }
  727 
  728   /**
  729    * {@inheritdoc}
  730    */
  731   public function processedCount() {
  732     return $this->countHelper(NULL, $this->mapTableName());
  733   }
  734 
  735   /**
  736    * {@inheritdoc}
  737    */
  738   public function importedCount() {
  739     return $this->countHelper([
  740       MigrateIdMapInterface::STATUS_IMPORTED,
  741       MigrateIdMapInterface::STATUS_NEEDS_UPDATE,
  742     ]);
  743   }
  744 
  745   /**
  746    * {@inheritdoc}
  747    */
  748   public function updateCount() {
  749     return $this->countHelper(MigrateIdMapInterface::STATUS_NEEDS_UPDATE);
  750   }
  751 
  752   /**
  753    * {@inheritdoc}
  754    */
  755   public function errorCount() {
  756     return $this->countHelper(MigrateIdMapInterface::STATUS_FAILED);
  757   }
  758 
  759   /**
  760    * {@inheritdoc}
  761    */
  762   public function messageCount() {
  763     return $this->countHelper(NULL, $this->messageTableName());
  764   }
  765 
  766   /**
  767    * Counts records in a table.
  768    *
  769    * @param int|array $status
  770    *   (optional) Status code(s) to filter the source_row_status column.
  771    * @param string $table
  772    *   (optional) The table to work. Defaults to NULL.
  773    *
  774    * @return int
  775    *   The number of records.
  776    */
  777   protected function countHelper($status = NULL, $table = NULL) {
  778     // Use database directly to avoid creating tables.
  779     $query = $this->database->select($table ?: $this->mapTableName());
  780     if (isset($status)) {
  781       $query->condition('source_row_status', $status, is_array($status) ? 'IN' : '=');
  782     }
  783     try {
  784       $count = (int) $query->countQuery()->execute()->fetchField();
  785     }
  786     catch (DatabaseException $e) {
  787       // The table does not exist, therefore there are no records.
  788       $count = 0;
  789     }
  790     return $count;
  791   }
  792 
  793   /**
  794    * {@inheritdoc}
  795    */
  796   public function delete(array $source_id_values, $messages_only = FALSE) {
  797     if (empty($source_id_values)) {
  798       throw new MigrateException('Without source identifier values it is impossible to find the row to delete.');
  799     }
  800 
  801     if (!$messages_only) {
  802       $map_query = $this->getDatabase()->delete($this->mapTableName());
  803       $map_query->condition($this::SOURCE_IDS_HASH, $this->getSourceIdsHash($source_id_values));
  804       // Notify anyone listening of the map row we're about to delete.
  805       $this->eventDispatcher->dispatch(MigrateEvents::MAP_DELETE, new MigrateMapDeleteEvent($this, $source_id_values));
  806       $map_query->execute();
  807     }
  808     $message_query = $this->getDatabase()->delete($this->messageTableName());
  809     $message_query->condition($this::SOURCE_IDS_HASH, $this->getSourceIdsHash($source_id_values));
  810     $message_query->execute();
  811   }
  812 
  813   /**
  814    * {@inheritdoc}
  815    */
  public function deleteDestination(array $destination_id_values) {
    $map_delete = $this->getDatabase()->delete($this->mapTableName());
    $message_delete = $this->getDatabase()->delete($this->messageTableName());

    // Without a matching source row there is nothing to delete.
    $source_ids = $this->lookupSourceId($destination_id_values);
    if (empty($source_ids)) {
      return;
    }

    // Restrict the map deletion to the row holding these destination IDs.
    foreach ($this->destinationIdFields() as $id_name => $map_column) {
      $map_delete->condition($map_column, $destination_id_values[$id_name]);
    }

    // Subscribers are told about the map row before it is removed.
    $event = new MigrateMapDeleteEvent($this, $source_ids);
    $this->eventDispatcher->dispatch(MigrateEvents::MAP_DELETE, $event);
    $map_delete->execute();

    // Messages are keyed by the hash of the source IDs that were looked up.
    $message_delete->condition($this::SOURCE_IDS_HASH, $this->getSourceIdsHash($source_ids));
    $message_delete->execute();
  }
  832 
  833   /**
  834    * {@inheritdoc}
  835    */
  public function setUpdate(array $source_id_values) {
    if (empty($source_id_values)) {
      throw new MigrateException('No source identifiers provided to update.');
    }

    // Mark the matching map row as needing an update.
    $new_values = ['source_row_status' => MigrateIdMapInterface::STATUS_NEEDS_UPDATE];
    $update = $this->getDatabase()
      ->update($this->mapTableName())
      ->fields($new_values);

    // One condition per source ID column identifies the row.
    foreach ($this->sourceIdFields() as $id_name => $map_column) {
      $update->condition($map_column, $source_id_values[$id_name]);
    }

    $update->execute();
  }
  849 
  850   /**
  851    * {@inheritdoc}
  852    */
  public function clearMessages() {
    // Truncate the message table rather than deleting rows one by one.
    $connection = $this->getDatabase();
    $truncate = $connection->truncate($this->messageTableName());
    $truncate->execute();
  }
  856 
  857   /**
  858    * {@inheritdoc}
  859    */
  public function destroy() {
    // Drop the map table first.
    $map_schema = $this->getDatabase()->schema();
    $map_schema->dropTable($this->mapTableName());

    // Then drop the message table.
    $message_schema = $this->getDatabase()->schema();
    $message_schema->dropTable($this->messageTableName());
  }
  864 
  865   /**
  866    * Implementation of \Iterator::rewind().
  867    *
  868    * This is called before beginning a foreach loop.
  869    */
  public function rewind() {
    $this->currentRow = NULL;

    // Select every source ID column followed by every destination ID column.
    $columns = array_merge(
      array_values($this->sourceIdFields()),
      array_values($this->destinationIdFields())
    );

    $select = $this->getDatabase()->select($this->mapTableName(), 'map');
    $this->result = $select
      ->fields('map', $columns)
      ->orderBy('destid1')
      ->execute();

    // Load the first row so the iterator starts on it.
    $this->next();
  }
  885 
  886   /**
  887    * Implementation of \Iterator::current().
  888    *
  889    * This is called when entering a loop iteration, returning the current row.
  890    */
  public function current() {
    // The row most recently stored by next() (or reset by rewind()).
    $row = $this->currentRow;
    return $row;
  }
  894 
  895   /**
  896    * Implementation of \Iterator::key().
  897    *
  898    * This is called when entering a loop iteration, returning the key of the
  899    * current row. It must be a scalar - we will serialize to fulfill the
  900    * requirement, but using getCurrentKey() is preferable.
  901    */
  public function key() {
    // The key is kept as an array, so it is serialized to yield a scalar.
    $key = $this->currentKey;
    return serialize($key);
  }
  905 
  906   /**
  907    * {@inheritdoc}
  908    */
  public function currentDestination() {
    // No destination can be reported once the iterator is no longer valid.
    if (!$this->valid()) {
      return NULL;
    }

    $destination_ids = [];
    foreach ($this->destinationIdFields() as $id_name => $map_column) {
      $value = $this->currentRow[$map_column];
      // Destination IDs stored as NULL are left out of the result.
      if ($value !== NULL) {
        $destination_ids[$id_name] = $value;
      }
    }

    return $destination_ids;
  }
  923 
  924   /**
  925    * {@inheritdoc}
  926    */
  public function currentSource() {
    // No source can be reported once the iterator is no longer valid.
    if (!$this->valid()) {
      return NULL;
    }

    // Translate the map column names held in the key back to source ID names.
    $source_ids = [];
    foreach ($this->sourceIdFields() as $id_name => $map_column) {
      $source_ids[$id_name] = $this->currentKey[$map_column];
    }

    return $source_ids;
  }
  939 
  940   /**
  941    * Implementation of \Iterator::next().
  942    *
  943    * This is called at the bottom of the loop implicitly, as well as explicitly
  944    * from rewind().
  945    */
  public function next() {
    $this->currentRow = $this->result->fetchAssoc();
    $this->currentKey = [];

    // Nothing to split when no row was fetched.
    if (!$this->currentRow) {
      return;
    }

    // Move each source ID column out of the row and into the key, so that the
    // row is left with only the destination columns.
    foreach ($this->sourceIdFields() as $source_column) {
      $this->currentKey[$source_column] = $this->currentRow[$source_column];
      unset($this->currentRow[$source_column]);
    }
  }
  957 
  958   /**
  959    * Implementation of \Iterator::valid().
  960    *
  961    * This is called at the top of the loop, returning TRUE to process the loop
  962    * and FALSE to terminate it.
  963    */
  public function valid() {
    // Only a strict FALSE in the current row ends the iteration.
    $exhausted = ($this->currentRow === FALSE);
    return !$exhausted;
  }
  967 
  968   /**
  969    * Returns the migration plugin manager.
  970    *
  971    * @todo Inject as a dependency in https://www.drupal.org/node/2919158.
  972    *
  973    * @return \Drupal\migrate\Plugin\MigrationPluginManagerInterface
  974    *   The migration plugin manager.
  975    */
  protected function getMigrationPluginManager() {
    // Looked up through the global service container on every call.
    $migration_plugin_manager = \Drupal::service('plugin.manager.migration');
    return $migration_plugin_manager;
  }
  979 
  980   /**
  981    * {@inheritdoc}
  982    */
  public function getHighestId() {
    // Ensure that the first ID is an integer. The isset() guard also covers a
    // destination that declares no IDs, where reset() returns FALSE and
    // dereferencing it would raise a PHP notice before the exception.
    $keys = $this->migration->getDestinationPlugin()->getIds();
    $first_key = reset($keys);
    if (!isset($first_key['type']) || $first_key['type'] !== 'integer') {
      throw new \LogicException('To determine the highest migrated ID the first ID must be an integer');
    }

    // List of mapping tables to look in for the highest ID.
    $migration_id = $this->migration->id();
    $map_tables = [
      $migration_id => $this->mapTableName(),
    ];

    // If there's a bundle, it means we have a derived migration and we need to
    // find all the mapping tables from the related derived migrations. When
    // the separator is absent strpos() returns FALSE, which is cast to a zero
    // length so that the base ID is an empty string and this step is skipped.
    $base_id = substr($migration_id, 0, (int) strpos($migration_id, $this::DERIVATIVE_SEPARATOR));
    if ($base_id) {
      $migration_manager = $this->getMigrationPluginManager();
      foreach ($migration_manager->getDefinitions() as $derived_id => $definition) {
        if ($definition['id'] === $base_id) {
          // Get this derived migration's mapping table and add it to the list
          // of mapping tables to look in for the highest ID.
          $stub = $migration_manager->createInstance($derived_id);
          $map_tables[$derived_id] = $stub->getIdMap()->mapTableName();
        }
      }
    }

    // These do not change between tables, so resolve them once.
    $database = $this->getDatabase();
    $destination_id_fields = $this->destinationIdFields();
    $order_fields = array_values($destination_id_fields);
    $first_destination_field = reset($order_fields);

    // Get the highest ID from the list of map tables.
    $ids = [0];
    foreach ($map_tables as $map_table) {
      // If the map_table does not exist then continue on to the next map_table.
      if (!$database->schema()->tableExists($map_table)) {
        continue;
      }

      $query = $database->select($map_table, 'map')
        ->fields('map', $destination_id_fields)
        ->range(0, 1);
      // Skip rows whose first destination ID is NULL. Where NULLs sort first
      // in descending order (the PostgreSQL default), such a row would
      // otherwise be the one returned and hide the real highest ID.
      $query->isNotNull($first_destination_field);
      foreach ($order_fields as $order_field) {
        $query->orderBy($order_field, 'DESC');
      }

      // fetchField() yields FALSE when no row matches; the cast turns that
      // into 0 and keeps max() comparing integers only.
      $ids[] = (int) $query->execute()->fetchField();
    }

    // Return the highest of all the mapped IDs.
    return (int) max($ids);
  }
 1030 
 1031 }