Adjustments for droppping index
This commit is contained in:
parent
cdd5352b2e
commit
5158fae577
|
|
@ -58,17 +58,14 @@ class RebuildElasticIndexes extends Command
|
||||||
return self::FAILURE;
|
return self::FAILURE;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Handle single model rebuild
|
|
||||||
if ($modelName = $this->option('model')) {
|
if ($modelName = $this->option('model')) {
|
||||||
return $this->rebuildSingleModel($modelName);
|
return $this->rebuildSingleModel($modelName);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Dry run mode
|
|
||||||
if ($this->option('dry-run')) {
|
if ($this->option('dry-run')) {
|
||||||
return $this->performDryRun();
|
return $this->performDryRun();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get confirmation for full rebuild
|
|
||||||
if (!$this->option('force')) {
|
if (!$this->option('force')) {
|
||||||
$this->warn('This command will rebuild ALL Elasticsearch indexes ONE AT A TIME:');
|
$this->warn('This command will rebuild ALL Elasticsearch indexes ONE AT A TIME:');
|
||||||
$this->info(' • Each index will be dropped, migrated, and re-imported sequentially');
|
$this->info(' • Each index will be dropped, migrated, and re-imported sequentially');
|
||||||
|
|
@ -76,7 +73,7 @@ class RebuildElasticIndexes extends Command
|
||||||
$this->info(' • Other models remain searchable while one rebuilds');
|
$this->info(' • Other models remain searchable while one rebuilds');
|
||||||
|
|
||||||
if ($this->option('wait')) {
|
if ($this->option('wait')) {
|
||||||
$this->info(' • Will WAIT for our jobs to complete after each model (production-safe)');
|
$this->info(' • Will WAIT for our jobs to complete (tracks pending + processing)');
|
||||||
} else {
|
} else {
|
||||||
$this->warn(' • WARNING: Jobs will queue up async (use --wait for production)');
|
$this->warn(' • WARNING: Jobs will queue up async (use --wait for production)');
|
||||||
}
|
}
|
||||||
|
|
@ -99,7 +96,6 @@ class RebuildElasticIndexes extends Command
|
||||||
|
|
||||||
$this->newLine();
|
$this->newLine();
|
||||||
|
|
||||||
// Rebuild all models one by one
|
|
||||||
$totalModels = count($this->searchableModels);
|
$totalModels = count($this->searchableModels);
|
||||||
$currentModel = 0;
|
$currentModel = 0;
|
||||||
$startTime = now();
|
$startTime = now();
|
||||||
|
|
@ -127,7 +123,6 @@ class RebuildElasticIndexes extends Command
|
||||||
|
|
||||||
protected function rebuildSingleModel(string $modelName): int
|
protected function rebuildSingleModel(string $modelName): int
|
||||||
{
|
{
|
||||||
// Find the model class
|
|
||||||
$modelClass = null;
|
$modelClass = null;
|
||||||
foreach ($this->searchableModels as $class => $indexName) {
|
foreach ($this->searchableModels as $class => $indexName) {
|
||||||
if (class_basename($class) === $modelName) {
|
if (class_basename($class) === $modelName) {
|
||||||
|
|
@ -217,7 +212,6 @@ class RebuildElasticIndexes extends Command
|
||||||
$client = $this->getElasticsearchClient();
|
$client = $this->getElasticsearchClient();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// Step 1: Drop the index (with safe existence check)
|
|
||||||
$this->line(" [1/3] Dropping index {$indexName}...");
|
$this->line(" [1/3] Dropping index {$indexName}...");
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|
@ -239,7 +233,6 @@ class RebuildElasticIndexes extends Command
|
||||||
$this->line(" - Continuing with migration...", 'comment');
|
$this->line(" - Continuing with migration...", 'comment');
|
||||||
}
|
}
|
||||||
|
|
||||||
// Step 2: Run migration for this specific index
|
|
||||||
$this->line(" [2/3] Running elastic migration...");
|
$this->line(" [2/3] Running elastic migration...");
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|
@ -250,7 +243,6 @@ class RebuildElasticIndexes extends Command
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Step 3: Import data
|
|
||||||
$this->line(" [3/3] Importing {$modelName} data...");
|
$this->line(" [3/3] Importing {$modelName} data...");
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|
@ -263,11 +255,9 @@ class RebuildElasticIndexes extends Command
|
||||||
if ($recordCount > 0) {
|
if ($recordCount > 0) {
|
||||||
try {
|
try {
|
||||||
if ($this->option('no-queue')) {
|
if ($this->option('no-queue')) {
|
||||||
// Synchronous import - process in chunks
|
|
||||||
$this->line(" - Using synchronous import (no queue)", 'comment');
|
$this->line(" - Using synchronous import (no queue)", 'comment');
|
||||||
$this->importSynchronously($modelClass, $recordCount);
|
$this->importSynchronously($modelClass, $recordCount);
|
||||||
} else {
|
} else {
|
||||||
// Queue-based import with tracking
|
|
||||||
$this->importWithQueueTracking($modelClass, $recordCount);
|
$this->importWithQueueTracking($modelClass, $recordCount);
|
||||||
}
|
}
|
||||||
$this->info(" ✓ Import completed for {$recordCount} records");
|
$this->info(" ✓ Import completed for {$recordCount} records");
|
||||||
|
|
@ -309,21 +299,19 @@ class RebuildElasticIndexes extends Command
|
||||||
$chunkSize = (int) $this->option('chunk');
|
$chunkSize = (int) $this->option('chunk');
|
||||||
$expectedJobCount = ceil($recordCount / $chunkSize);
|
$expectedJobCount = ceil($recordCount / $chunkSize);
|
||||||
|
|
||||||
// Get baseline queue size BEFORE dispatching our jobs
|
|
||||||
$queueName = config('scout.queue.queue', 'scout');
|
$queueName = config('scout.queue.queue', 'scout');
|
||||||
$connection = config('scout.queue.connection', config('queue.default'));
|
$connection = config('scout.queue.connection', config('queue.default'));
|
||||||
|
|
||||||
try {
|
try {
|
||||||
$baselineJobCount = $this->getPendingJobCount($connection, $queueName);
|
$baselineJobCount = $this->getTotalActiveJobCount($connection, $queueName);
|
||||||
} catch (\Exception $e) {
|
} catch (\Exception $e) {
|
||||||
$baselineJobCount = 0;
|
$baselineJobCount = 0;
|
||||||
$this->line(" - Cannot track queue baseline: " . $e->getMessage(), 'comment');
|
$this->line(" - Cannot track queue baseline: " . $e->getMessage(), 'comment');
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->line(" - Baseline queue size: {$baselineJobCount} jobs", 'comment');
|
$this->line(" - Baseline active jobs: {$baselineJobCount} (pending + processing)", 'comment');
|
||||||
$this->line(" - Dispatching ~{$expectedJobCount} import jobs (chunks of {$chunkSize})", 'comment');
|
$this->line(" - Dispatching ~{$expectedJobCount} import jobs (chunks of {$chunkSize})", 'comment');
|
||||||
|
|
||||||
// Dispatch the import jobs
|
|
||||||
Artisan::call('scout:import', [
|
Artisan::call('scout:import', [
|
||||||
'model' => $modelClass,
|
'model' => $modelClass,
|
||||||
'--chunk' => $chunkSize,
|
'--chunk' => $chunkSize,
|
||||||
|
|
@ -331,7 +319,6 @@ class RebuildElasticIndexes extends Command
|
||||||
|
|
||||||
$this->line(" - Jobs dispatched to queue", 'comment');
|
$this->line(" - Jobs dispatched to queue", 'comment');
|
||||||
|
|
||||||
// If --wait flag is set, monitor OUR jobs
|
|
||||||
if ($this->option('wait')) {
|
if ($this->option('wait')) {
|
||||||
$this->waitForOurJobsToComplete($connection, $queueName, $baselineJobCount, $expectedJobCount);
|
$this->waitForOurJobsToComplete($connection, $queueName, $baselineJobCount, $expectedJobCount);
|
||||||
}
|
}
|
||||||
|
|
@ -345,38 +332,33 @@ class RebuildElasticIndexes extends Command
|
||||||
): void {
|
): void {
|
||||||
$this->newLine();
|
$this->newLine();
|
||||||
$this->line(" Waiting for our {$expectedJobCount} jobs to complete...");
|
$this->line(" Waiting for our {$expectedJobCount} jobs to complete...");
|
||||||
$this->line(" (Queue may have other jobs - we're tracking the delta)", 'comment');
|
$this->line(" (Tracking: pending + processing jobs)", 'comment');
|
||||||
|
|
||||||
$maxWaitSeconds = 600; // 10 minutes max
|
$maxWaitSeconds = 600;
|
||||||
$startTime = time();
|
$startTime = time();
|
||||||
$lastReportedDelta = -1;
|
$lastReportedDelta = -1;
|
||||||
$stableCount = 0;
|
$stableCount = 0;
|
||||||
$targetJobCount = $baselineJobCount; // We want to return to baseline (or lower)
|
|
||||||
|
|
||||||
while ((time() - $startTime) < $maxWaitSeconds) {
|
while ((time() - $startTime) < $maxWaitSeconds) {
|
||||||
try {
|
try {
|
||||||
$currentJobCount = $this->getPendingJobCount($connection, $queueName);
|
$currentJobCount = $this->getTotalActiveJobCount($connection, $queueName);
|
||||||
$delta = $currentJobCount - $baselineJobCount;
|
$delta = $currentJobCount - $baselineJobCount;
|
||||||
|
|
||||||
// If current count is at or below baseline, our jobs are done
|
|
||||||
if ($currentJobCount <= $baselineJobCount) {
|
if ($currentJobCount <= $baselineJobCount) {
|
||||||
$this->info(" ✓ Our jobs completed (queue: {$currentJobCount}, baseline: {$baselineJobCount})");
|
$this->info(" ✓ Our jobs completed (active: {$currentJobCount}, baseline: {$baselineJobCount})");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Report progress if delta changed
|
|
||||||
if ($delta !== $lastReportedDelta) {
|
if ($delta !== $lastReportedDelta) {
|
||||||
$this->line(" - Our jobs remaining: ~{$delta} (total queue: {$currentJobCount})", 'comment');
|
$this->line(" - Our jobs remaining: ~{$delta} (total active: {$currentJobCount})", 'comment');
|
||||||
$lastReportedDelta = $delta;
|
$lastReportedDelta = $delta;
|
||||||
$stableCount = 0;
|
$stableCount = 0;
|
||||||
} else {
|
} else {
|
||||||
$stableCount++;
|
$stableCount++;
|
||||||
}
|
}
|
||||||
|
|
||||||
// If queue size is stable at a reasonable level for 30 seconds, assume we're done
|
|
||||||
// (handles case where other jobs are being added at similar rate)
|
|
||||||
if ($stableCount >= 15 && $delta <= $expectedJobCount) {
|
if ($stableCount >= 15 && $delta <= $expectedJobCount) {
|
||||||
$this->info(" ✓ Queue stabilized with {$delta} jobs above baseline - assuming complete");
|
$this->info(" ✓ Queue stabilized - assuming complete");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -390,27 +372,36 @@ class RebuildElasticIndexes extends Command
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
$finalCount = $this->getPendingJobCount($connection, $queueName);
|
$finalCount = $this->getTotalActiveJobCount($connection, $queueName);
|
||||||
$this->warn(" ⚠ Timeout after {$maxWaitSeconds}s (queue: {$finalCount}, baseline: {$baselineJobCount})");
|
$this->warn(" ⚠ Timeout after {$maxWaitSeconds}s (active: {$finalCount}, baseline: {$baselineJobCount})");
|
||||||
} catch (\Exception $e) {
|
} catch (\Exception $e) {
|
||||||
$this->warn(" ⚠ Timeout after {$maxWaitSeconds}s - continuing");
|
$this->warn(" ⚠ Timeout after {$maxWaitSeconds}s - continuing");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected function getPendingJobCount(string $connection, string $queueName): int
|
protected function getTotalActiveJobCount(string $connection, string $queueName): int
|
||||||
{
|
{
|
||||||
$driver = config("queue.connections.{$connection}.driver");
|
$driver = config("queue.connections.{$connection}.driver");
|
||||||
|
|
||||||
switch ($driver) {
|
switch ($driver) {
|
||||||
case 'database':
|
case 'database':
|
||||||
|
// Count both pending (reserved_at IS NULL) and processing (reserved_at IS NOT NULL)
|
||||||
return DB::table(config("queue.connections.{$connection}.table", 'jobs'))
|
return DB::table(config("queue.connections.{$connection}.table", 'jobs'))
|
||||||
->where('queue', $queueName)
|
->where('queue', $queueName)
|
||||||
->count();
|
->count();
|
||||||
|
|
||||||
case 'redis':
|
case 'redis':
|
||||||
|
// Redis: pending jobs in list + reserved jobs in processing set
|
||||||
$redis = app('redis')->connection(config("queue.connections.{$connection}.connection"));
|
$redis = app('redis')->connection(config("queue.connections.{$connection}.connection"));
|
||||||
$prefix = config('database.redis.options.prefix', '');
|
$prefix = config('database.redis.options.prefix', '');
|
||||||
return $redis->llen($prefix . 'queues:' . $queueName);
|
|
||||||
|
// Pending jobs in the queue list
|
||||||
|
$pending = $redis->llen($prefix . 'queues:' . $queueName);
|
||||||
|
|
||||||
|
// Processing jobs in the reserved set
|
||||||
|
$processing = $redis->zcard($prefix . 'queues:' . $queueName . ':reserved');
|
||||||
|
|
||||||
|
return $pending + $processing;
|
||||||
|
|
||||||
case 'sync':
|
case 'sync':
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue