diff --git a/app/Console/Commands/Elastic/RebuildElasticIndexes.php b/app/Console/Commands/Elastic/RebuildElasticIndexes.php index 1b4ed98682..d8cc558371 100644 --- a/app/Console/Commands/Elastic/RebuildElasticIndexes.php +++ b/app/Console/Commands/Elastic/RebuildElasticIndexes.php @@ -17,13 +17,17 @@ use App\Models\Vendor; use App\Models\VendorContact; use Elastic\Elasticsearch\ClientBuilder; use Illuminate\Support\Facades\Artisan; +use Illuminate\Support\Facades\DB; class RebuildElasticIndexes extends Command { protected $signature = 'elastic:rebuild {--model= : Rebuild only a specific model (e.g., Invoice, Client)} {--force : Force the operation without confirmation} - {--dry-run : Show what would be done without making changes}'; + {--dry-run : Show what would be done without making changes} + {--chunk=500 : Number of records to import per chunk} + {--wait : Wait for our queued jobs to complete after each model (recommended for production)} + {--no-queue : Import synchronously instead of queueing (slower but safer)}'; protected $description = 'Rebuild Elasticsearch indexes one at a time to minimize production impact'; @@ -70,7 +74,17 @@ class RebuildElasticIndexes extends Command $this->info(' • Each index will be dropped, migrated, and re-imported sequentially'); $this->info(' • Search will be unavailable for each model during its rebuild'); $this->info(' • Other models remain searchable while one rebuilds'); - $this->info(' • This minimizes overall downtime for production systems'); + + if ($this->option('wait')) { + $this->info(' • Will WAIT for our jobs to complete after each model (production-safe)'); + } else { + $this->warn(' • WARNING: Jobs will queue up async (use --wait for production)'); + } + + if ($this->option('no-queue')) { + $this->info(' • Using SYNCHRONOUS import (slower but guaranteed)'); + } + $this->newLine(); $totalRecords = $this->getTotalRecordCount(); @@ -141,6 +155,8 @@ class RebuildElasticIndexes extends Command $this->info("Rebuilding {$modelName}"); $this->line("Index: {$indexName}"); $this->line("Records: {$recordCount}"); + $this->line("Chunk size: {$this->option('chunk')}"); + $this->line("Mode: " . ($this->option('no-queue') ? 'Synchronous' : 'Queued')); $this->newLine(); if (!$this->option('force') && !$this->option('dry-run')) { @@ -227,7 +243,6 @@ class RebuildElasticIndexes extends Command $this->line(" [2/3] Running elastic migration..."); try { - // Note: elastic:migrate recreates all indexes, but only the dropped one will be created Artisan::call('elastic:migrate', [], $this->getOutput()); $this->info(" ✓ Migration completed"); } catch (\Exception $migrateException) { @@ -247,10 +262,15 @@ class RebuildElasticIndexes extends Command if ($recordCount > 0) { try { - Artisan::call('scout:import', [ - 'model' => $modelClass, - ], $this->getOutput()); - $this->info(" ✓ Imported {$recordCount} records"); + if ($this->option('no-queue')) { + // Synchronous import - process in chunks + $this->line(" - Using synchronous import (no queue)", 'comment'); + $this->importSynchronously($modelClass, $recordCount); + } else { + // Queue-based import with tracking + $this->importWithQueueTracking($modelClass, $recordCount); + } + $this->info(" ✓ Import completed for {$recordCount} records"); } catch (\Exception $importException) { $this->error(" ✗ Import failed: " . $importException->getMessage()); return false; @@ -267,6 +287,138 @@ class RebuildElasticIndexes extends Command return false; } } + + protected function importSynchronously(string $modelClass, int $totalRecords): void + { + $chunkSize = (int) $this->option('chunk'); + $chunks = ceil($totalRecords / $chunkSize); + $processed = 0; + + $this->line(" - Processing {$chunks} chunks of {$chunkSize} records each", 'comment'); + + $modelClass::chunk($chunkSize, function ($models) use (&$processed, $totalRecords) { + $models->searchable(); + $processed += $models->count(); + $percentage = round(($processed / $totalRecords) * 100); + $this->line(" - Indexed {$processed}/{$totalRecords} ({$percentage}%)", 'comment'); + }); + } + + protected function importWithQueueTracking(string $modelClass, int $recordCount): void + { + $chunkSize = (int) $this->option('chunk'); + $expectedJobCount = ceil($recordCount / $chunkSize); + + // Get baseline queue size BEFORE dispatching our jobs + $queueName = config('scout.queue.queue', 'scout'); + $connection = config('scout.queue.connection', config('queue.default')); + + try { + $baselineJobCount = $this->getPendingJobCount($connection, $queueName); + } catch (\Exception $e) { + $baselineJobCount = 0; + $this->line(" - Cannot track queue baseline: " . $e->getMessage(), 'comment'); + } + + $this->line(" - Baseline queue size: {$baselineJobCount} jobs", 'comment'); + $this->line(" - Dispatching ~{$expectedJobCount} import jobs (chunks of {$chunkSize})", 'comment'); + + // Dispatch the import jobs + Artisan::call('scout:import', [ + 'model' => $modelClass, + '--chunk' => $chunkSize, + ], $this->getOutput()); + + $this->line(" - Jobs dispatched to queue", 'comment'); + + // If --wait flag is set, monitor OUR jobs + if ($this->option('wait')) { + $this->waitForOurJobsToComplete($connection, $queueName, $baselineJobCount, $expectedJobCount); + } + } + + protected function waitForOurJobsToComplete( + string $connection, + string $queueName, + int $baselineJobCount, + int $expectedJobCount + ): void { + $this->newLine(); + $this->line(" Waiting for our {$expectedJobCount} jobs to complete..."); + $this->line(" (Queue may have other jobs - we're tracking the delta)", 'comment'); + + $maxWaitSeconds = 600; // 10 minutes max + $startTime = time(); + $lastReportedDelta = -1; + $stableCount = 0; + $targetJobCount = $baselineJobCount; // We want to return to baseline (or lower) + + while ((time() - $startTime) < $maxWaitSeconds) { + try { + $currentJobCount = $this->getPendingJobCount($connection, $queueName); + $delta = $currentJobCount - $baselineJobCount; + + // If current count is at or below baseline, our jobs are done + if ($currentJobCount <= $baselineJobCount) { + $this->info(" ✓ Our jobs completed (queue: {$currentJobCount}, baseline: {$baselineJobCount})"); + return; + } + + // Report progress if delta changed + if ($delta !== $lastReportedDelta) { + $this->line(" - Our jobs remaining: ~{$delta} (total queue: {$currentJobCount})", 'comment'); + $lastReportedDelta = $delta; + $stableCount = 0; + } else { + $stableCount++; + } + + // If queue size is stable at a reasonable level for 30 seconds, assume we're done + // (handles case where other jobs are being added at similar rate) + if ($stableCount >= 15 && $delta <= $expectedJobCount) { + $this->info(" ✓ Queue stabilized with {$delta} jobs above baseline - assuming complete"); + return; + } + + sleep(2); + } catch (\Exception $e) { + $this->warn(" ⚠ Could not check queue status: " . $e->getMessage()); + $this->line(" - Waiting 10 seconds before continuing...", 'comment'); + sleep(10); + return; + } + } + + try { + $finalCount = $this->getPendingJobCount($connection, $queueName); + $this->warn(" ⚠ Timeout after {$maxWaitSeconds}s (queue: {$finalCount}, baseline: {$baselineJobCount})"); + } catch (\Exception $e) { + $this->warn(" ⚠ Timeout after {$maxWaitSeconds}s - continuing"); + } + } + + protected function getPendingJobCount(string $connection, string $queueName): int + { + $driver = config("queue.connections.{$connection}.driver"); + + switch ($driver) { + case 'database': + return DB::table(config("queue.connections.{$connection}.table", 'jobs')) + ->where('queue', $queueName) + ->count(); + + case 'redis': + $redis = app('redis')->connection(config("queue.connections.{$connection}.connection")); + $prefix = config('database.redis.options.prefix', ''); + return $redis->llen($prefix . 'queues:' . $queueName); + + case 'sync': + return 0; + + default: + throw new \Exception("Cannot check queue size for driver: {$driver}"); + } + } protected function getTotalRecordCount(): int {