Browse Source

added queueing

drush
astanley 1 day ago
parent
commit
ae105694eb
  1. 34
      README.md
  2. 1
      drush.services.yml
  3. 1
      islandora_inplace_media.services.yml
  4. 58
      src/Commands/IslandoraInplaceMediaCommands.php
  5. 18
      src/Plugin/QueueWorker/InplaceMediaQueueWorker.php
  6. 41
      src/Service/InplaceMediaProcessor.php

34
README.md

@ -1,6 +1,36 @@
## INTRODUCTION
# Islandora In-Place Media
A Drush-driven ingestion tool for Islandora that creates Media entities
from files already on disk — **without moving them**.
Designed for **large-scale, resumable, idempotent, parallel-safe** ingest
workflows.
---
## ✨ Features
* ✅ In-place ingestion (no file copying unless paths differ)
* ✅ Idempotent (safe to re-run; no duplicate Media)
* ✅ Automatic resume after failure
* ✅ Drush command interface
* ✅ Queue-based background processing
* ✅ Concurrency-safe sharding for parallel workers
* ✅ Explicit ownership (uid = 1, configurable later)
* ✅ Works with cron or manual queue runners
---
## 📦 Requirements
* Drupal 10 or 11
* Drush 10+
* Islandora Media bundles already configured
* Files already present on disk (e.g. `public://`, `private://`, or mounted paths)
---
The Islandora Inplace Media module allows users to create and attach media from files staged on the server.
## INSTALLATION

1
drush.services.yml

@ -3,5 +3,6 @@ services:
class: Drupal\islandora_inplace_media\Commands\IslandoraInplaceMediaCommands
arguments:
- '@islandora_inplace_media.processor'
- '@state'
tags:
- { name: drush.command }

1
islandora_inplace_media.services.yml

@ -11,3 +11,4 @@ services:
- '@file_system'
- '@file.repository'
- '@logger.channel.islandora_inplace_media'
- '@entity_type.manager'

58
src/Commands/IslandoraInplaceMediaCommands.php

@ -4,14 +4,21 @@ namespace Drupal\islandora_inplace_media\Commands;
use Drush\Commands\DrushCommands;
use Drupal\islandora_inplace_media\Service\InplaceMediaProcessor;
use Drupal\Core\State\StateInterface;
use Symfony\Component\Console\Helper\ProgressBar;
class IslandoraInplaceMediaCommands extends DrushCommands {
protected InplaceMediaProcessor $processor;
protected StateInterface $state;
public function __construct(
protected InplaceMediaProcessor $processor
InplaceMediaProcessor $processor,
StateInterface $state,
) {
parent::__construct();
$this->processor = $processor;
$this->state = $state;
}
/**
@ -36,7 +43,10 @@ class IslandoraInplaceMediaCommands extends DrushCommands {
* Number of files to skip before processing.
* @option queue
* Queue files instead of processing immediately.
* @option shards
* Total number of queue shards.
*/
public function run(array $options = [
'source_dir' => NULL,
'destination_path' => NULL,
@ -46,26 +56,48 @@ class IslandoraInplaceMediaCommands extends DrushCommands {
'limit' => NULL,
'offset' => 0,
'queue' => FALSE,
'shards' => 1,
]) {
$files = array_values(array_diff(scandir($options['source_dir']), ['.', '..']));
$files = array_values(array_diff(
scandir($options['source_dir']),
['.', '..']
));
if ($options['offset']) {
$files = array_slice($files, (int) $options['offset']);
}
if ($options['limit']) {
$files = array_slice($files, 0, (int) $options['limit']);
$job_id = hash('sha256', serialize([
$options['source_dir'],
$options['destination_path'],
$options['media_type'],
$options['media_use'],
$options['file_type'],
]));
$state_key = "islandora_inplace_media.completed.$job_id";
$completed = $this->state->get($state_key, []);
$files = array_values(array_diff($files, $completed));
if ($options['queue']) {
foreach ($files as $file) {
$shard = crc32($file) % (int) $options['shards'];
\Drupal::queue('islandora_inplace_media')
->createItem([
'file' => $file,
'options' => $options,
'job_id' => $job_id,
'shard' => $shard,
]);
}
$this->output()->writeln('Files queued with sharding.');
return;
}
$progress = new ProgressBar($this->output(), count($files));
$progress->start();
foreach ($files as $file) {
if ($options['queue']) {
\Drupal::queue('islandora_inplace_media')
->createItem(['file' => $file, 'options' => $options]);
}
else {
$this->processor->processFile($file, $options);
if ($this->processor->processFile($file, $options)) {
$completed[] = $file;
$this->state->set($state_key, $completed);
}
$progress->advance();
}

18
src/Plugin/QueueWorker/InplaceMediaQueueWorker.php

@ -14,8 +14,22 @@ use Drupal\Core\Queue\QueueWorkerBase;
class InplaceMediaQueueWorker extends QueueWorkerBase {
public function processItem($data) {
\Drupal::service('islandora_inplace_media.processor')
->processFile($data['file'], $data['options']);
// Optional shard filtering (used by drush queue:run --shard).
if (isset($data['shard']) && isset($_SERVER['DRUSH_SHARD'])) {
if ((int) $_SERVER['DRUSH_SHARD'] !== (int) $data['shard']) {
return;
}
}
$processor = \Drupal::service('islandora_inplace_media.processor');
$state = \Drupal::state();
if ($processor->processFile($data['file'], $data['options'])) {
$key = "islandora_inplace_media.completed.{$data['job_id']}";
$completed = $state->get($key, []);
$completed[] = $data['file'];
$state->set($key, $completed);
}
}
}

41
src/Service/InplaceMediaProcessor.php

@ -7,43 +7,68 @@ use Drupal\file\FileRepositoryInterface;
use Drupal\Core\File\FileExists;
use Drupal\file\Entity\File;
use Drupal\media\Entity\Media;
use Drupal\Core\Entity\EntityTypeManagerInterface;
use Psr\Log\LoggerInterface;
class InplaceMediaProcessor {
protected $mediaStorage;
public function __construct(
protected FileSystemInterface $fileSystem,
protected FileRepositoryInterface $fileRepository,
protected LoggerInterface $logger,
) {}
FileSystemInterface $fileSystem,
FileRepositoryInterface $fileRepository,
LoggerInterface $logger,
EntityTypeManagerInterface $entityTypeManager,
) {
$this->fileSystem = $fileSystem;
$this->fileRepository = $fileRepository;
$this->logger = $logger;
$this->mediaStorage = $entityTypeManager->getStorage('media');
}
protected function mediaExists(string $uri, array $opts): bool {
return (bool) $this->mediaStorage->getQuery()
->condition('bundle', $opts['media_type'])
->condition("{$opts['file_type']}.entity.uri", $uri)
->condition('field_media_use.target_id', $opts['media_use'])
->accessCheck(FALSE)
->range(0, 1)
->execute();
}
public function processFile(string $file, array $opts): void {
public function processFile(string $file, array $opts): bool {
$source = "{$opts['source_dir']}/{$file}";
$dest = "{$opts['destination_path']}/{$file}";
if (!file_exists($source)) {
$this->logger->warning('Missing file @file', ['@file' => $source]);
return;
return FALSE;
}
$uri = ($source === $dest)
? $dest
: $this->fileSystem->copy($source, $dest, FileExists::Rename);
if ($this->mediaExists($uri, $opts)) {
return TRUE;
}
$fileEntity = $this->fileRepository->loadByUri($uri)
?? File::create(['uri' => $uri, 'status' => 1, 'uid' => 1]);
?? File::create(['uri' => $uri, 'status' => 1]);
$fileEntity->save();
preg_match('/^(\d+)_/', $file, $m);
$nid = $m[1] ?? NULL;
Media::create([
'bundle' => $opts['media_type'],
'name' => $file,
'uid' => 1,
$opts['file_type'] => ['target_id' => $fileEntity->id()],
'field_media_use' => ['target_id' => $opts['media_use']],
'field_media_of' => $nid,
])->save();
return TRUE;
}
}

Loading…
Cancel
Save