From 5158fae577b8bc65e7cf177ea28319d234a13b8c Mon Sep 17 00:00:00 2001 From: David Bomba Date: Sun, 23 Nov 2025 01:01:20 +0000 Subject: [PATCH] Adjustments for droppping index --- .../Elastic/RebuildElasticIndexes.php | 53 ++++++++----------- 1 file changed, 22 insertions(+), 31 deletions(-) diff --git a/app/Console/Commands/Elastic/RebuildElasticIndexes.php b/app/Console/Commands/Elastic/RebuildElasticIndexes.php index d8cc558371..859b564ff6 100644 --- a/app/Console/Commands/Elastic/RebuildElasticIndexes.php +++ b/app/Console/Commands/Elastic/RebuildElasticIndexes.php @@ -58,17 +58,14 @@ class RebuildElasticIndexes extends Command return self::FAILURE; } - // Handle single model rebuild if ($modelName = $this->option('model')) { return $this->rebuildSingleModel($modelName); } - // Dry run mode if ($this->option('dry-run')) { return $this->performDryRun(); } - // Get confirmation for full rebuild if (!$this->option('force')) { $this->warn('This command will rebuild ALL Elasticsearch indexes ONE AT A TIME:'); $this->info(' • Each index will be dropped, migrated, and re-imported sequentially'); @@ -76,7 +73,7 @@ class RebuildElasticIndexes extends Command $this->info(' • Other models remain searchable while one rebuilds'); if ($this->option('wait')) { - $this->info(' • Will WAIT for our jobs to complete after each model (production-safe)'); + $this->info(' • Will WAIT for our jobs to complete (tracks pending + processing)'); } else { $this->warn(' • WARNING: Jobs will queue up async (use --wait for production)'); } @@ -99,7 +96,6 @@ class RebuildElasticIndexes extends Command $this->newLine(); - // Rebuild all models one by one $totalModels = count($this->searchableModels); $currentModel = 0; $startTime = now(); @@ -127,7 +123,6 @@ class RebuildElasticIndexes extends Command protected function rebuildSingleModel(string $modelName): int { - // Find the model class $modelClass = null; foreach ($this->searchableModels as $class => $indexName) { if (class_basename($class) === $modelName) { @@ -217,7 +212,6 @@ class RebuildElasticIndexes extends Command $client = $this->getElasticsearchClient(); try { - // Step 1: Drop the index (with safe existence check) $this->line(" [1/3] Dropping index {$indexName}..."); try { @@ -239,7 +233,6 @@ class RebuildElasticIndexes extends Command $this->line(" - Continuing with migration...", 'comment'); } - // Step 2: Run migration for this specific index $this->line(" [2/3] Running elastic migration..."); try { @@ -250,7 +243,6 @@ class RebuildElasticIndexes extends Command return false; } - // Step 3: Import data $this->line(" [3/3] Importing {$modelName} data..."); try { @@ -263,11 +255,9 @@ class RebuildElasticIndexes extends Command if ($recordCount > 0) { try { if ($this->option('no-queue')) { - // Synchronous import - process in chunks $this->line(" - Using synchronous import (no queue)", 'comment'); $this->importSynchronously($modelClass, $recordCount); } else { - // Queue-based import with tracking $this->importWithQueueTracking($modelClass, $recordCount); } $this->info(" ✓ Import completed for {$recordCount} records"); @@ -309,21 +299,19 @@ class RebuildElasticIndexes extends Command $chunkSize = (int) $this->option('chunk'); $expectedJobCount = ceil($recordCount / $chunkSize); - // Get baseline queue size BEFORE dispatching our jobs $queueName = config('scout.queue.queue', 'scout'); $connection = config('scout.queue.connection', config('queue.default')); try { - $baselineJobCount = $this->getPendingJobCount($connection, $queueName); + $baselineJobCount = $this->getTotalActiveJobCount($connection, $queueName); } catch (\Exception $e) { $baselineJobCount = 0; $this->line(" - Cannot track queue baseline: " . $e->getMessage(), 'comment'); } - $this->line(" - Baseline queue size: {$baselineJobCount} jobs", 'comment'); + $this->line(" - Baseline active jobs: {$baselineJobCount} (pending + processing)", 'comment'); $this->line(" - Dispatching ~{$expectedJobCount} import jobs (chunks of {$chunkSize})", 'comment'); - // Dispatch the import jobs Artisan::call('scout:import', [ 'model' => $modelClass, '--chunk' => $chunkSize, @@ -331,7 +319,6 @@ class RebuildElasticIndexes extends Command $this->line(" - Jobs dispatched to queue", 'comment'); - // If --wait flag is set, monitor OUR jobs if ($this->option('wait')) { $this->waitForOurJobsToComplete($connection, $queueName, $baselineJobCount, $expectedJobCount); } @@ -345,38 +332,33 @@ class RebuildElasticIndexes extends Command ): void { $this->newLine(); $this->line(" Waiting for our {$expectedJobCount} jobs to complete..."); - $this->line(" (Queue may have other jobs - we're tracking the delta)", 'comment'); + $this->line(" (Tracking: pending + processing jobs)", 'comment'); - $maxWaitSeconds = 600; // 10 minutes max + $maxWaitSeconds = 600; $startTime = time(); $lastReportedDelta = -1; $stableCount = 0; - $targetJobCount = $baselineJobCount; // We want to return to baseline (or lower) while ((time() - $startTime) < $maxWaitSeconds) { try { - $currentJobCount = $this->getPendingJobCount($connection, $queueName); + $currentJobCount = $this->getTotalActiveJobCount($connection, $queueName); $delta = $currentJobCount - $baselineJobCount; - // If current count is at or below baseline, our jobs are done if ($currentJobCount <= $baselineJobCount) { - $this->info(" ✓ Our jobs completed (queue: {$currentJobCount}, baseline: {$baselineJobCount})"); + $this->info(" ✓ Our jobs completed (active: {$currentJobCount}, baseline: {$baselineJobCount})"); return; } - // Report progress if delta changed if ($delta !== $lastReportedDelta) { - $this->line(" - Our jobs remaining: ~{$delta} (total queue: {$currentJobCount})", 'comment'); + $this->line(" - Our jobs remaining: ~{$delta} (total active: {$currentJobCount})", 'comment'); $lastReportedDelta = $delta; $stableCount = 0; } else { $stableCount++; } - // If queue size is stable at a reasonable level for 30 seconds, assume we're done - // (handles case where other jobs are being added at similar rate) if ($stableCount >= 15 && $delta <= $expectedJobCount) { - $this->info(" ✓ Queue stabilized with {$delta} jobs above baseline - assuming complete"); + $this->info(" ✓ Queue stabilized - assuming complete"); return; } @@ -390,27 +372,36 @@ class RebuildElasticIndexes extends Command } try { - $finalCount = $this->getPendingJobCount($connection, $queueName); - $this->warn(" ⚠ Timeout after {$maxWaitSeconds}s (queue: {$finalCount}, baseline: {$baselineJobCount})"); + $finalCount = $this->getTotalActiveJobCount($connection, $queueName); + $this->warn(" ⚠ Timeout after {$maxWaitSeconds}s (active: {$finalCount}, baseline: {$baselineJobCount})"); } catch (\Exception $e) { $this->warn(" ⚠ Timeout after {$maxWaitSeconds}s - continuing"); } } - protected function getPendingJobCount(string $connection, string $queueName): int + protected function getTotalActiveJobCount(string $connection, string $queueName): int { $driver = config("queue.connections.{$connection}.driver"); switch ($driver) { case 'database': + // Count both pending (reserved_at IS NULL) and processing (reserved_at IS NOT NULL) return DB::table(config("queue.connections.{$connection}.table", 'jobs')) ->where('queue', $queueName) ->count(); case 'redis': + // Redis: pending jobs in list + reserved jobs in processing set $redis = app('redis')->connection(config("queue.connections.{$connection}.connection")); $prefix = config('database.redis.options.prefix', ''); - return $redis->llen($prefix . 'queues:' . $queueName); + + // Pending jobs in the queue list + $pending = $redis->llen($prefix . 'queues:' . $queueName); + + // Processing jobs in the reserved set + $processing = $redis->zcard($prefix . 'queues:' . $queueName . ':reserved'); + + return $pending + $processing; case 'sync': return 0;