Feature/improve handling of jobs (#1864)
* Improve handling of jobs * Remove jobs on complete * Refactor jobs removal * Update changelog
This commit is contained in:
parent
947460abdd
commit
654446f068
@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
## Unreleased
|
||||
|
||||
### Changed
|
||||
|
||||
- Enabled the configuration to immediately remove queue jobs on complete
|
||||
- Refactored the implementation of removing queue jobs
|
||||
|
||||
### Fixed
|
||||
|
||||
- Fixed the unique job ids of the gather asset profile process
|
||||
|
@ -4,7 +4,7 @@ import {
|
||||
} from '@ghostfolio/common/config';
|
||||
import { AdminJobs } from '@ghostfolio/common/interfaces';
|
||||
import { InjectQueue } from '@nestjs/bull';
|
||||
import { Injectable, Logger } from '@nestjs/common';
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { JobStatus, Queue } from 'bull';
|
||||
|
||||
@Injectable()
|
||||
@ -23,14 +23,11 @@ export class QueueService {
|
||||
}: {
|
||||
status?: JobStatus[];
|
||||
}) {
|
||||
const jobs = await this.dataGatheringQueue.getJobs(status);
|
||||
|
||||
for (const job of jobs) {
|
||||
try {
|
||||
await job.remove();
|
||||
} catch (error) {
|
||||
Logger.warn(error, 'QueueService');
|
||||
}
|
||||
for (const statusItem of status) {
|
||||
await this.dataGatheringQueue.clean(
|
||||
300,
|
||||
statusItem === 'waiting' ? 'wait' : statusItem
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@ -44,18 +41,23 @@ export class QueueService {
|
||||
const jobs = await this.dataGatheringQueue.getJobs(status);
|
||||
|
||||
const jobsWithState = await Promise.all(
|
||||
jobs.slice(0, limit).map(async (job) => {
|
||||
return {
|
||||
attemptsMade: job.attemptsMade + 1,
|
||||
data: job.data,
|
||||
finishedOn: job.finishedOn,
|
||||
id: job.id,
|
||||
name: job.name,
|
||||
stacktrace: job.stacktrace,
|
||||
state: await job.getState(),
|
||||
timestamp: job.timestamp
|
||||
};
|
||||
})
|
||||
jobs
|
||||
.filter((job) => {
|
||||
return job;
|
||||
})
|
||||
.slice(0, limit)
|
||||
.map(async (job) => {
|
||||
return {
|
||||
attemptsMade: job.attemptsMade + 1,
|
||||
data: job.data,
|
||||
finishedOn: job.finishedOn,
|
||||
id: job.id,
|
||||
name: job.name,
|
||||
stacktrace: job.stacktrace,
|
||||
state: await job.getState(),
|
||||
timestamp: job.timestamp
|
||||
};
|
||||
})
|
||||
);
|
||||
|
||||
return {
|
||||
|
@ -49,9 +49,7 @@ export const GATHER_ASSET_PROFILE_PROCESS_OPTIONS: JobOptions = {
|
||||
type: 'exponential'
|
||||
},
|
||||
priority: DATA_GATHERING_QUEUE_PRIORITY_HIGH,
|
||||
removeOnComplete: {
|
||||
age: ms('2 weeks') / 1000
|
||||
}
|
||||
removeOnComplete: true
|
||||
};
|
||||
export const GATHER_HISTORICAL_MARKET_DATA_PROCESS =
|
||||
'GATHER_HISTORICAL_MARKET_DATA';
|
||||
@ -62,9 +60,7 @@ export const GATHER_HISTORICAL_MARKET_DATA_PROCESS_OPTIONS: JobOptions = {
|
||||
type: 'exponential'
|
||||
},
|
||||
priority: DATA_GATHERING_QUEUE_PRIORITY_LOW,
|
||||
removeOnComplete: {
|
||||
age: ms('2 weeks') / 1000
|
||||
}
|
||||
removeOnComplete: true
|
||||
};
|
||||
|
||||
export const HEADER_KEY_IMPERSONATION = 'Impersonation-Id';
|
||||
|
Loading…
x
Reference in New Issue
Block a user