Feature/adapt priorities of data gathering jobs (#3262)
* Adapt priorities of data gathering jobs * Update changelog
This commit is contained in:
@@ -7,13 +7,12 @@ import { ManualService } from '@ghostfolio/api/services/data-provider/manual/man
|
||||
import { MarketDataService } from '@ghostfolio/api/services/market-data/market-data.service';
|
||||
import { PropertyDto } from '@ghostfolio/api/services/property/property.dto';
|
||||
import {
|
||||
DATA_GATHERING_QUEUE_PRIORITY_HIGH,
|
||||
DATA_GATHERING_QUEUE_PRIORITY_MEDIUM,
|
||||
GATHER_ASSET_PROFILE_PROCESS,
|
||||
GATHER_ASSET_PROFILE_PROCESS_OPTIONS
|
||||
} from '@ghostfolio/common/config';
|
||||
import {
|
||||
getAssetProfileIdentifier,
|
||||
resetHours
|
||||
} from '@ghostfolio/common/helper';
|
||||
import { getAssetProfileIdentifier } from '@ghostfolio/common/helper';
|
||||
import {
|
||||
AdminData,
|
||||
AdminMarketData,
|
||||
@@ -94,7 +93,8 @@ export class AdminController {
|
||||
name: GATHER_ASSET_PROFILE_PROCESS,
|
||||
opts: {
|
||||
...GATHER_ASSET_PROFILE_PROCESS_OPTIONS,
|
||||
jobId: getAssetProfileIdentifier({ dataSource, symbol })
|
||||
jobId: getAssetProfileIdentifier({ dataSource, symbol }),
|
||||
priority: DATA_GATHERING_QUEUE_PRIORITY_MEDIUM
|
||||
}
|
||||
};
|
||||
})
|
||||
@@ -119,7 +119,8 @@ export class AdminController {
|
||||
name: GATHER_ASSET_PROFILE_PROCESS,
|
||||
opts: {
|
||||
...GATHER_ASSET_PROFILE_PROCESS_OPTIONS,
|
||||
jobId: getAssetProfileIdentifier({ dataSource, symbol })
|
||||
jobId: getAssetProfileIdentifier({ dataSource, symbol }),
|
||||
priority: DATA_GATHERING_QUEUE_PRIORITY_MEDIUM
|
||||
}
|
||||
};
|
||||
})
|
||||
@@ -141,7 +142,8 @@ export class AdminController {
|
||||
name: GATHER_ASSET_PROFILE_PROCESS,
|
||||
opts: {
|
||||
...GATHER_ASSET_PROFILE_PROCESS_OPTIONS,
|
||||
jobId: getAssetProfileIdentifier({ dataSource, symbol })
|
||||
jobId: getAssetProfileIdentifier({ dataSource, symbol }),
|
||||
priority: DATA_GATHERING_QUEUE_PRIORITY_HIGH
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@@ -58,6 +58,7 @@ export class QueueService {
|
||||
finishedOn: job.finishedOn,
|
||||
id: job.id,
|
||||
name: job.name,
|
||||
opts: job.opts,
|
||||
stacktrace: job.stacktrace,
|
||||
state: await job.getState(),
|
||||
timestamp: job.timestamp
|
||||
|
@@ -13,6 +13,10 @@ import { DataGatheringService } from '@ghostfolio/api/services/data-gathering/da
|
||||
import { DataProviderService } from '@ghostfolio/api/services/data-provider/data-provider.service';
|
||||
import { ExchangeRateDataService } from '@ghostfolio/api/services/exchange-rate-data/exchange-rate-data.service';
|
||||
import { SymbolProfileService } from '@ghostfolio/api/services/symbol-profile/symbol-profile.service';
|
||||
import {
|
||||
DATA_GATHERING_QUEUE_PRIORITY_HIGH,
|
||||
DATA_GATHERING_QUEUE_PRIORITY_MEDIUM
|
||||
} from '@ghostfolio/common/config';
|
||||
import {
|
||||
DATE_FORMAT,
|
||||
getAssetProfileIdentifier,
|
||||
@@ -448,15 +452,16 @@ export class ImportService {
|
||||
});
|
||||
});
|
||||
|
||||
this.dataGatheringService.gatherSymbols(
|
||||
uniqueActivities.map(({ date, SymbolProfile }) => {
|
||||
this.dataGatheringService.gatherSymbols({
|
||||
dataGatheringItems: uniqueActivities.map(({ date, SymbolProfile }) => {
|
||||
return {
|
||||
date,
|
||||
dataSource: SymbolProfile.dataSource,
|
||||
symbol: SymbolProfile.symbol
|
||||
};
|
||||
})
|
||||
);
|
||||
}),
|
||||
priority: DATA_GATHERING_QUEUE_PRIORITY_HIGH
|
||||
});
|
||||
}
|
||||
|
||||
return activities;
|
||||
|
@@ -7,7 +7,10 @@ import { TransformDataSourceInResponseInterceptor } from '@ghostfolio/api/interc
|
||||
import { ApiService } from '@ghostfolio/api/services/api/api.service';
|
||||
import { DataGatheringService } from '@ghostfolio/api/services/data-gathering/data-gathering.service';
|
||||
import { ImpersonationService } from '@ghostfolio/api/services/impersonation/impersonation.service';
|
||||
import { HEADER_KEY_IMPERSONATION } from '@ghostfolio/common/config';
|
||||
import {
|
||||
DATA_GATHERING_QUEUE_PRIORITY_HIGH,
|
||||
HEADER_KEY_IMPERSONATION
|
||||
} from '@ghostfolio/common/config';
|
||||
import { hasPermission, permissions } from '@ghostfolio/common/permissions';
|
||||
import type { DateRange, RequestWithUser } from '@ghostfolio/common/types';
|
||||
|
||||
@@ -160,13 +163,16 @@ export class OrderController {
|
||||
if (data.dataSource && !order.isDraft) {
|
||||
// Gather symbol data in the background, if data source is set
|
||||
// (not MANUAL) and not draft
|
||||
this.dataGatheringService.gatherSymbols([
|
||||
{
|
||||
dataSource: data.dataSource,
|
||||
date: order.date,
|
||||
symbol: data.symbol
|
||||
}
|
||||
]);
|
||||
this.dataGatheringService.gatherSymbols({
|
||||
dataGatheringItems: [
|
||||
{
|
||||
dataSource: data.dataSource,
|
||||
date: order.date,
|
||||
symbol: data.symbol
|
||||
}
|
||||
],
|
||||
priority: DATA_GATHERING_QUEUE_PRIORITY_HIGH
|
||||
});
|
||||
}
|
||||
|
||||
return order;
|
||||
|
@@ -4,6 +4,7 @@ import { ExchangeRateDataService } from '@ghostfolio/api/services/exchange-rate-
|
||||
import { PrismaService } from '@ghostfolio/api/services/prisma/prisma.service';
|
||||
import { SymbolProfileService } from '@ghostfolio/api/services/symbol-profile/symbol-profile.service';
|
||||
import {
|
||||
DATA_GATHERING_QUEUE_PRIORITY_HIGH,
|
||||
GATHER_ASSET_PROFILE_PROCESS,
|
||||
GATHER_ASSET_PROFILE_PROCESS_OPTIONS
|
||||
} from '@ghostfolio/common/config';
|
||||
@@ -101,7 +102,8 @@ export class OrderService {
|
||||
jobId: getAssetProfileIdentifier({
|
||||
dataSource: data.SymbolProfile.connectOrCreate.create.dataSource,
|
||||
symbol: data.SymbolProfile.connectOrCreate.create.symbol
|
||||
})
|
||||
}),
|
||||
priority: DATA_GATHERING_QUEUE_PRIORITY_HIGH
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -427,13 +429,17 @@ export class OrderService {
|
||||
|
||||
if (!isDraft) {
|
||||
// Gather symbol data of order in the background, if not draft
|
||||
this.dataGatheringService.gatherSymbols([
|
||||
{
|
||||
dataSource: data.SymbolProfile.connect.dataSource_symbol.dataSource,
|
||||
date: <Date>data.date,
|
||||
symbol: data.SymbolProfile.connect.dataSource_symbol.symbol
|
||||
}
|
||||
]);
|
||||
this.dataGatheringService.gatherSymbols({
|
||||
dataGatheringItems: [
|
||||
{
|
||||
dataSource:
|
||||
data.SymbolProfile.connect.dataSource_symbol.dataSource,
|
||||
date: <Date>data.date,
|
||||
symbol: data.SymbolProfile.connect.dataSource_symbol.symbol
|
||||
}
|
||||
],
|
||||
priority: DATA_GATHERING_QUEUE_PRIORITY_HIGH
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -1,4 +1,5 @@
|
||||
import {
|
||||
DATA_GATHERING_QUEUE_PRIORITY_LOW,
|
||||
GATHER_ASSET_PROFILE_PROCESS,
|
||||
GATHER_ASSET_PROFILE_PROCESS_OPTIONS,
|
||||
PROPERTY_IS_DATA_GATHERING_ENABLED
|
||||
@@ -56,7 +57,8 @@ export class CronService {
|
||||
name: GATHER_ASSET_PROFILE_PROCESS,
|
||||
opts: {
|
||||
...GATHER_ASSET_PROFILE_PROCESS_OPTIONS,
|
||||
jobId: getAssetProfileIdentifier({ dataSource, symbol })
|
||||
jobId: getAssetProfileIdentifier({ dataSource, symbol }),
|
||||
priority: DATA_GATHERING_QUEUE_PRIORITY_LOW
|
||||
}
|
||||
};
|
||||
})
|
||||
|
@@ -8,6 +8,8 @@ import { PropertyService } from '@ghostfolio/api/services/property/property.serv
|
||||
import { SymbolProfileService } from '@ghostfolio/api/services/symbol-profile/symbol-profile.service';
|
||||
import {
|
||||
DATA_GATHERING_QUEUE,
|
||||
DATA_GATHERING_QUEUE_PRIORITY_HIGH,
|
||||
DATA_GATHERING_QUEUE_PRIORITY_LOW,
|
||||
GATHER_HISTORICAL_MARKET_DATA_PROCESS,
|
||||
GATHER_HISTORICAL_MARKET_DATA_PROCESS_OPTIONS,
|
||||
PROPERTY_BENCHMARKS
|
||||
@@ -61,24 +63,35 @@ export class DataGatheringService {
|
||||
|
||||
public async gather7Days() {
|
||||
const dataGatheringItems = await this.getSymbols7D();
|
||||
await this.gatherSymbols(dataGatheringItems);
|
||||
await this.gatherSymbols({
|
||||
dataGatheringItems,
|
||||
priority: DATA_GATHERING_QUEUE_PRIORITY_LOW
|
||||
});
|
||||
}
|
||||
|
||||
public async gatherMax() {
|
||||
const dataGatheringItems = await this.getSymbolsMax();
|
||||
await this.gatherSymbols(dataGatheringItems);
|
||||
await this.gatherSymbols({
|
||||
dataGatheringItems,
|
||||
priority: DATA_GATHERING_QUEUE_PRIORITY_LOW
|
||||
});
|
||||
}
|
||||
|
||||
public async gatherSymbol({ dataSource, symbol }: UniqueAsset) {
|
||||
await this.marketDataService.deleteMany({ dataSource, symbol });
|
||||
|
||||
const symbols = (await this.getSymbolsMax()).filter((dataGatheringItem) => {
|
||||
return (
|
||||
dataGatheringItem.dataSource === dataSource &&
|
||||
dataGatheringItem.symbol === symbol
|
||||
);
|
||||
const dataGatheringItems = (await this.getSymbolsMax()).filter(
|
||||
(dataGatheringItem) => {
|
||||
return (
|
||||
dataGatheringItem.dataSource === dataSource &&
|
||||
dataGatheringItem.symbol === symbol
|
||||
);
|
||||
}
|
||||
);
|
||||
await this.gatherSymbols({
|
||||
dataGatheringItems,
|
||||
priority: DATA_GATHERING_QUEUE_PRIORITY_HIGH
|
||||
});
|
||||
await this.gatherSymbols(symbols);
|
||||
}
|
||||
|
||||
public async gatherSymbolForDate({
|
||||
@@ -230,9 +243,15 @@ export class DataGatheringService {
|
||||
);
|
||||
}
|
||||
|
||||
public async gatherSymbols(aSymbolsWithStartDate: IDataGatheringItem[]) {
|
||||
public async gatherSymbols({
|
||||
dataGatheringItems,
|
||||
priority
|
||||
}: {
|
||||
dataGatheringItems: IDataGatheringItem[];
|
||||
priority: number;
|
||||
}) {
|
||||
await this.addJobsToQueue(
|
||||
aSymbolsWithStartDate.map(({ dataSource, date, symbol }) => {
|
||||
dataGatheringItems.map(({ dataSource, date, symbol }) => {
|
||||
return {
|
||||
data: {
|
||||
dataSource,
|
||||
@@ -242,6 +261,7 @@ export class DataGatheringService {
|
||||
name: GATHER_HISTORICAL_MARKET_DATA_PROCESS,
|
||||
opts: {
|
||||
...GATHER_HISTORICAL_MARKET_DATA_PROCESS_OPTIONS,
|
||||
priority,
|
||||
jobId: `${getAssetProfileIdentifier({
|
||||
dataSource,
|
||||
symbol
|
||||
|
Reference in New Issue
Block a user