Tracking changes in Migrate with dynamic row hashes

When it comes to Drupal and external data, I use Migrate. A lot. Like a lot, lot, lot. Often this data is imported from CSV files that are pushed to a server at some defined interval. Usually the data can be taken directly from the CSV file itself; other times a custom process plugin derives it from other information.

Drupal's Migrate system decides in two steps whether a row should be imported or skipped. First, you tell the migration source to track changes for each row. Then, with change tracking on, it hashes each row of source data and compares that hash with the one stored from the previous run. If the row is new or its hash differs, Migrate inserts the new record or updates the previously migrated one; otherwise the row is skipped.

Why would you need dynamic row hashes? Here's my use case. I have an entity which references a subset of other entities migrated in another process. I use a process plugin to determine which of those other entities can be referenced. That decision can change over time without the data in my source files changing, and we have a script which executes migrations every hour. Because the hash only covers source values, rows whose references should change would otherwise be skipped.

Honestly, this took me longer to discover than I would have preferred, because there is no easy way to inspect which keys a migration's YAML allows. Eventually, I made my way to the \Drupal\migrate\Plugin\migrate\source\SourcePluginBase class, which defines the schema for the source property on a migration. Then I discovered this!

  /**
   * Flags whether to track changes to incoming data.
   *
   * If TRUE, we will maintain hashed source rows to determine whether incoming
   * data has changed.
   *
   * @var bool
   */
  protected $trackChanges = FALSE;

I manually overrode this in my source plugin class. You can do the same from your migration definition with the track_changes key:

source:
  plugin: csv
  path: 'private://myfile.csv'
  delimiter: ';'
  header_row_count: 1
  # Set this and make it true.
  track_changes: true
  keys:
    - SKU

Note: I also had to disable cache counts. I can't remember why, but I did. The property and config key are cacheCounts and cache_counts respectively.

Okay, so now your migration will pick up new rows in your source file 🤩🎉. It will also detect rows whose values have changed and re-import them. See the following from \Drupal\migrate\Plugin\migrate\source\SourcePluginBase::prepareRow:

    elseif ($this->trackChanges) {
      // When tracking changed data, We want to quietly skip (rather than
      // "ignore") rows with changes. The caller needs to make that decision,
      // so we need to provide them with the necessary information (before and
      // after hashes).
      $row->rehash();
    }

The migration rehashes the row and later checks if that hash value changed. Here's how that hash is generated in \Drupal\migrate\Row::rehash:

  /**
   * Recalculates the hash for the row.
   */
  public function rehash() {
    $this->idMap['original_hash'] = $this->idMap['hash'];
    $this->idMap['hash'] = hash('sha256', serialize($this->source));
  }
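
To make the before and after comparison concrete, here is a minimal sketch that runs outside Drupal. SketchRow is a hypothetical, simplified stand-in for \Drupal\migrate\Row, not the real class: it keeps only the hashing logic shown above plus a changed() check, and the stored hash is passed in by hand instead of being read from the migrate map table. The SKU and price values are made up.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical, simplified stand-in for \Drupal\migrate\Row.
 */
final class SketchRow {

  /** Source values, as they would be read from the CSV. */
  private array $source;

  /** Mimics the map data: the previous hash and the current one. */
  private array $idMap;

  public function __construct(array $source, string $storedHash = '') {
    $this->source = $source;
    $this->idMap = ['original_hash' => '', 'hash' => $storedHash];
  }

  /** Same logic as the rehash() method quoted above. */
  public function rehash(): void {
    $this->idMap['original_hash'] = $this->idMap['hash'];
    $this->idMap['hash'] = hash('sha256', serialize($this->source));
  }

  /** TRUE when the new hash differs from the stored one. */
  public function changed(): bool {
    return $this->idMap['original_hash'] !== $this->idMap['hash'];
  }

  public function getHash(): string {
    return $this->idMap['hash'];
  }

}

// First run: no stored hash yet, so the sketch reports a change.
$first = new SketchRow(['SKU' => 'ABC-1', 'price' => '9.99']);
$first->rehash();
$firstChanged = $first->changed();

// Second run with identical values: the hashes match, nothing to do.
$second = new SketchRow(['SKU' => 'ABC-1', 'price' => '9.99'], $first->getHash());
$second->rehash();
$secondChanged = $second->changed();

// Third run with an edited price: the hashes differ.
$third = new SketchRow(['SKU' => 'ABC-1', 'price' => '10.99'], $first->getHash());
$third->rehash();
$thirdChanged = $third->changed();

var_dump($firstChanged, $secondChanged, $thirdChanged);
```

Only the source values feed the hash, which is exactly why a row whose CSV data is untouched never looks changed.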

Now. How do we "bust" that row hash if a process plugin might produce different data? The process plugin reads values from our source, and that source did not change, so the hash stays the same. The fix? Use a hook to alter the row's source values and add metadata! The prepare row hooks are invoked in prepareRow before the row is rehashed, so anything they add ends up in the hash. Note that I said hook. Most interactions with Migrate are event driven, but the operation we need here is a hook.

The solution is to implement hook_migrate_MIGRATION_ID_prepare_row and add metadata values to the row's source values. These are harmless and can act as hash invalidators. In my case, I use the hook to count the entities that could be referenced. If the number goes up or down, the hash changes, the row is imported again, and its updated references are respected. It's magic!

<?php

use Drupal\migrate\Plugin\MigrateSourceInterface;
use Drupal\migrate\Plugin\MigrationInterface;
use Drupal\migrate\Row;

/**
 * Implements hook_migrate_MIGRATION_ID_prepare_row().
 *
 * Determine the number of references to influence the hash.
 */
function mymodule_migrate_MIGRATION_ID_prepare_row(Row $row, MigrateSourceInterface $source, MigrationInterface $migration) {
  $database = \Drupal::database();
  
  // This property is the identifier on the remote table.
  $sku = $row->getSourceProperty('MY_SOURCE_VALUE');
  
  $query = $database
    ->select('OTHER_ENTITY_TABLE')
    ->condition('COLUMN', $sku);

  $count = (int) $query
    ->countQuery()
    ->execute()
    ->fetchField();

  // Set the count as a source property so it becomes part of the row hash.
  $row->setSourceProperty('entity_count', $count);
}

Voilà! Now on each migration run, any change in the number of referenced entities causes that row's mapped entity record to be updated.
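
If you want to convince yourself of the effect without running a migration, it can be reproduced in plain PHP. This is only a sketch under assumptions: sketch_row_hash() is a hypothetical helper that repeats the hashing expression from Row::rehash, and the CSV values and counts are invented.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: hashes source values like Row::rehash() does.
 */
function sketch_row_hash(array $source): string {
  return hash('sha256', serialize($source));
}

// Values as read from the CSV; identical on every run.
$csvRow = ['SKU' => 'ABC-1', 'title' => 'Widget'];

// What the hook would add on two different runs (made-up counts).
$runOne = $csvRow + ['entity_count' => 2];
$runTwo = $csvRow + ['entity_count' => 3];

// Without metadata the hash never moves, so the row would be skipped.
$sameWithoutMetadata = sketch_row_hash($csvRow) === sketch_row_hash($csvRow);

// With the count mixed in, a different count yields a different hash.
$sameWithMetadata = sketch_row_hash($runOne) === sketch_row_hash($runTwo);

var_dump($sameWithoutMetadata); // bool(true)
var_dump($sameWithMetadata);    // bool(false)
```

A count is a cheap invalidator, but it only notices when the number of matches changes. If one reference could be swapped for another while the total stays the same, a value derived from the matching IDs themselves might be a better fit.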