Adjustments for droppping index
This commit is contained in:
parent
c1471d1846
commit
cdd5352b2e
|
|
@ -17,13 +17,17 @@ use App\Models\Vendor;
|
|||
use App\Models\VendorContact;
|
||||
use Elastic\Elasticsearch\ClientBuilder;
|
||||
use Illuminate\Support\Facades\Artisan;
|
||||
use Illuminate\Support\Facades\DB;
|
||||
|
||||
class RebuildElasticIndexes extends Command
|
||||
{
|
||||
protected $signature = 'elastic:rebuild
|
||||
{--model= : Rebuild only a specific model (e.g., Invoice, Client)}
|
||||
{--force : Force the operation without confirmation}
|
||||
{--dry-run : Show what would be done without making changes}';
|
||||
{--dry-run : Show what would be done without making changes}
|
||||
{--chunk=500 : Number of records to import per chunk}
|
||||
{--wait : Wait for our queued jobs to complete after each model (recommended for production)}
|
||||
{--no-queue : Import synchronously instead of queueing (slower but safer)}';
|
||||
|
||||
protected $description = 'Rebuild Elasticsearch indexes one at a time to minimize production impact';
|
||||
|
||||
|
|
@ -70,7 +74,17 @@ class RebuildElasticIndexes extends Command
|
|||
$this->info(' • Each index will be dropped, migrated, and re-imported sequentially');
|
||||
$this->info(' • Search will be unavailable for each model during its rebuild');
|
||||
$this->info(' • Other models remain searchable while one rebuilds');
|
||||
$this->info(' • This minimizes overall downtime for production systems');
|
||||
|
||||
if ($this->option('wait')) {
|
||||
$this->info(' • Will WAIT for our jobs to complete after each model (production-safe)');
|
||||
} else {
|
||||
$this->warn(' • WARNING: Jobs will queue up async (use --wait for production)');
|
||||
}
|
||||
|
||||
if ($this->option('no-queue')) {
|
||||
$this->info(' • Using SYNCHRONOUS import (slower but guaranteed)');
|
||||
}
|
||||
|
||||
$this->newLine();
|
||||
|
||||
$totalRecords = $this->getTotalRecordCount();
|
||||
|
|
@ -141,6 +155,8 @@ class RebuildElasticIndexes extends Command
|
|||
$this->info("Rebuilding {$modelName}");
|
||||
$this->line("Index: {$indexName}");
|
||||
$this->line("Records: {$recordCount}");
|
||||
$this->line("Chunk size: {$this->option('chunk')}");
|
||||
$this->line("Mode: " . ($this->option('no-queue') ? 'Synchronous' : 'Queued'));
|
||||
$this->newLine();
|
||||
|
||||
if (!$this->option('force') && !$this->option('dry-run')) {
|
||||
|
|
@ -227,7 +243,6 @@ class RebuildElasticIndexes extends Command
|
|||
$this->line(" [2/3] Running elastic migration...");
|
||||
|
||||
try {
|
||||
// Note: elastic:migrate recreates all indexes, but only the dropped one will be created
|
||||
Artisan::call('elastic:migrate', [], $this->getOutput());
|
||||
$this->info(" ✓ Migration completed");
|
||||
} catch (\Exception $migrateException) {
|
||||
|
|
@ -247,10 +262,15 @@ class RebuildElasticIndexes extends Command
|
|||
|
||||
if ($recordCount > 0) {
|
||||
try {
|
||||
Artisan::call('scout:import', [
|
||||
'model' => $modelClass,
|
||||
], $this->getOutput());
|
||||
$this->info(" ✓ Imported {$recordCount} records");
|
||||
if ($this->option('no-queue')) {
|
||||
// Synchronous import - process in chunks
|
||||
$this->line(" - Using synchronous import (no queue)", 'comment');
|
||||
$this->importSynchronously($modelClass, $recordCount);
|
||||
} else {
|
||||
// Queue-based import with tracking
|
||||
$this->importWithQueueTracking($modelClass, $recordCount);
|
||||
}
|
||||
$this->info(" ✓ Import completed for {$recordCount} records");
|
||||
} catch (\Exception $importException) {
|
||||
$this->error(" ✗ Import failed: " . $importException->getMessage());
|
||||
return false;
|
||||
|
|
@ -268,6 +288,138 @@ class RebuildElasticIndexes extends Command
|
|||
}
|
||||
}
|
||||
|
||||
protected function importSynchronously(string $modelClass, int $totalRecords): void
|
||||
{
|
||||
$chunkSize = (int) $this->option('chunk');
|
||||
$chunks = ceil($totalRecords / $chunkSize);
|
||||
$processed = 0;
|
||||
|
||||
$this->line(" - Processing {$chunks} chunks of {$chunkSize} records each", 'comment');
|
||||
|
||||
$modelClass::chunk($chunkSize, function ($models) use (&$processed, $totalRecords) {
|
||||
$models->searchable();
|
||||
$processed += $models->count();
|
||||
$percentage = round(($processed / $totalRecords) * 100);
|
||||
$this->line(" - Indexed {$processed}/{$totalRecords} ({$percentage}%)", 'comment');
|
||||
});
|
||||
}
|
||||
|
||||
protected function importWithQueueTracking(string $modelClass, int $recordCount): void
|
||||
{
|
||||
$chunkSize = (int) $this->option('chunk');
|
||||
$expectedJobCount = ceil($recordCount / $chunkSize);
|
||||
|
||||
// Get baseline queue size BEFORE dispatching our jobs
|
||||
$queueName = config('scout.queue.queue', 'scout');
|
||||
$connection = config('scout.queue.connection', config('queue.default'));
|
||||
|
||||
try {
|
||||
$baselineJobCount = $this->getPendingJobCount($connection, $queueName);
|
||||
} catch (\Exception $e) {
|
||||
$baselineJobCount = 0;
|
||||
$this->line(" - Cannot track queue baseline: " . $e->getMessage(), 'comment');
|
||||
}
|
||||
|
||||
$this->line(" - Baseline queue size: {$baselineJobCount} jobs", 'comment');
|
||||
$this->line(" - Dispatching ~{$expectedJobCount} import jobs (chunks of {$chunkSize})", 'comment');
|
||||
|
||||
// Dispatch the import jobs
|
||||
Artisan::call('scout:import', [
|
||||
'model' => $modelClass,
|
||||
'--chunk' => $chunkSize,
|
||||
], $this->getOutput());
|
||||
|
||||
$this->line(" - Jobs dispatched to queue", 'comment');
|
||||
|
||||
// If --wait flag is set, monitor OUR jobs
|
||||
if ($this->option('wait')) {
|
||||
$this->waitForOurJobsToComplete($connection, $queueName, $baselineJobCount, $expectedJobCount);
|
||||
}
|
||||
}
|
||||
|
||||
protected function waitForOurJobsToComplete(
|
||||
string $connection,
|
||||
string $queueName,
|
||||
int $baselineJobCount,
|
||||
int $expectedJobCount
|
||||
): void {
|
||||
$this->newLine();
|
||||
$this->line(" Waiting for our {$expectedJobCount} jobs to complete...");
|
||||
$this->line(" (Queue may have other jobs - we're tracking the delta)", 'comment');
|
||||
|
||||
$maxWaitSeconds = 600; // 10 minutes max
|
||||
$startTime = time();
|
||||
$lastReportedDelta = -1;
|
||||
$stableCount = 0;
|
||||
$targetJobCount = $baselineJobCount; // We want to return to baseline (or lower)
|
||||
|
||||
while ((time() - $startTime) < $maxWaitSeconds) {
|
||||
try {
|
||||
$currentJobCount = $this->getPendingJobCount($connection, $queueName);
|
||||
$delta = $currentJobCount - $baselineJobCount;
|
||||
|
||||
// If current count is at or below baseline, our jobs are done
|
||||
if ($currentJobCount <= $baselineJobCount) {
|
||||
$this->info(" ✓ Our jobs completed (queue: {$currentJobCount}, baseline: {$baselineJobCount})");
|
||||
return;
|
||||
}
|
||||
|
||||
// Report progress if delta changed
|
||||
if ($delta !== $lastReportedDelta) {
|
||||
$this->line(" - Our jobs remaining: ~{$delta} (total queue: {$currentJobCount})", 'comment');
|
||||
$lastReportedDelta = $delta;
|
||||
$stableCount = 0;
|
||||
} else {
|
||||
$stableCount++;
|
||||
}
|
||||
|
||||
// If queue size is stable at a reasonable level for 30 seconds, assume we're done
|
||||
// (handles case where other jobs are being added at similar rate)
|
||||
if ($stableCount >= 15 && $delta <= $expectedJobCount) {
|
||||
$this->info(" ✓ Queue stabilized with {$delta} jobs above baseline - assuming complete");
|
||||
return;
|
||||
}
|
||||
|
||||
sleep(2);
|
||||
} catch (\Exception $e) {
|
||||
$this->warn(" ⚠ Could not check queue status: " . $e->getMessage());
|
||||
$this->line(" - Waiting 10 seconds before continuing...", 'comment');
|
||||
sleep(10);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
$finalCount = $this->getPendingJobCount($connection, $queueName);
|
||||
$this->warn(" ⚠ Timeout after {$maxWaitSeconds}s (queue: {$finalCount}, baseline: {$baselineJobCount})");
|
||||
} catch (\Exception $e) {
|
||||
$this->warn(" ⚠ Timeout after {$maxWaitSeconds}s - continuing");
|
||||
}
|
||||
}
|
||||
|
||||
protected function getPendingJobCount(string $connection, string $queueName): int
|
||||
{
|
||||
$driver = config("queue.connections.{$connection}.driver");
|
||||
|
||||
switch ($driver) {
|
||||
case 'database':
|
||||
return DB::table(config("queue.connections.{$connection}.table", 'jobs'))
|
||||
->where('queue', $queueName)
|
||||
->count();
|
||||
|
||||
case 'redis':
|
||||
$redis = app('redis')->connection(config("queue.connections.{$connection}.connection"));
|
||||
$prefix = config('database.redis.options.prefix', '');
|
||||
return $redis->llen($prefix . 'queues:' . $queueName);
|
||||
|
||||
case 'sync':
|
||||
return 0;
|
||||
|
||||
default:
|
||||
throw new \Exception("Cannot check queue size for driver: {$driver}");
|
||||
}
|
||||
}
|
||||
|
||||
protected function getTotalRecordCount(): int
|
||||
{
|
||||
$total = 0;
|
||||
|
|
|
|||
Loading…
Reference in New Issue