From bb8baf348a047d9dc2591ba71ad0091758319d99 Mon Sep 17 00:00:00 2001
From: Aleksandr Statciuk
Date: Mon, 14 Feb 2022 03:17:29 +0300
Subject: [PATCH] Update database/update.js

---
 scripts/commands/database/update.js     | 113 ++++++++----
 .../input/database/db_update.streams.db |   1 +
 tests/commands/database/update.test.js  |  19 ++-
 3 files changed, 70 insertions(+), 63 deletions(-)

diff --git a/scripts/commands/database/update.js b/scripts/commands/database/update.js
index 6000e9b0f..a8a76c810 100644
--- a/scripts/commands/database/update.js
+++ b/scripts/commands/database/update.js
@@ -1,21 +1,63 @@
 const { db, store, parser, file, logger } = require('../../core')
 const _ = require('lodash')
 
-const items = []
-
 const LOGS_DIR = process.env.LOGS_DIR || 'scripts/logs/cluster/load'
 
 async function main() {
-  let streams = await loadStreams()
+  const streams = await loadStreams()
   const results = await loadResults()
-  const origins = await findOrigins(results)
-  streams = await updateStreams(streams, results, origins)
+  const origins = await loadOrigins(results)
 
-  await updateDatabase(streams)
+  await updateStreams(streams, results, origins)
 }
 
 main()
 
+async function updateStreams(items = [], results = {}, origins = {}) {
+  logger.info('updating streams...')
+
+  let buffer = {}
+  let updated = 0
+  let removed = 0
+  for (const item of items) {
+    const stream = store.create(item)
+    const result = results[item._id]
+    if (result) {
+      const status = parseStatus(result.error)
+      stream.set('status', { status })
+
+      if (result.streams.length) {
+        const { width, height, bitrate } = parseMediaInfo(result.streams)
+        stream.set('width', { width })
+        stream.set('height', { height })
+        stream.set('bitrate', { bitrate })
+      }
+
+      if (result.requests.length) {
+        const origin = findOrigin(result.requests, origins)
+        if (origin) {
+          stream.set('url', { url: origin })
+        }
+      }
+    }
+
+    if (buffer[stream.get('url')]) {
+      await db.streams.remove({ _id: stream.get('_id') })
+      removed++
+    } else if (stream.changed) {
+      await db.streams.update({ _id: stream.get('_id') }, stream.data())
+      buffer[stream.get('url')] = true
+      updated++
+    }
+  }
+
+  db.streams.compact()
+
+  logger.info(`updated ${updated} streams`)
+  logger.info(`removed ${removed} duplicates`)
+  logger.info('done')
+}
+
 async function loadStreams() {
   logger.info('loading streams...')
 
@@ -28,7 +70,7 @@ async function loadStreams() {
 }
 
 async function loadResults() {
-  logger.info('loading results from logs...')
+  logger.info('loading check results...')
 
   const results = {}
   const files = await file.list(`${LOGS_DIR}/cluster_*.log`)
@@ -44,8 +86,8 @@
   return results
 }
 
-async function findOrigins(results = {}) {
-  logger.info('searching for stream origins...')
+async function loadOrigins(results = {}) {
+  logger.info('loading origins...')
 
   const origins = {}
   for (const { error, requests } of Object.values(results)) {
@@ -67,57 +109,6 @@
   return origins
 }
 
-async function updateStreams(items = [], results = {}, origins = {}) {
-  logger.info('updating streams...')
-
-  let updated = 0
-  const output = []
-  for (const item of items) {
-    const stream = store.create(item)
-
-    const result = results[item._id]
-    if (result) {
-      const { error, streams, requests } = result
-
-      const status = parseStatus(error)
-      stream.set('status', { status })
-
-      if (streams.length) {
-        const { width, height, bitrate } = parseStreams(streams)
-        stream.set('width', { width })
-        stream.set('height', { height })
-        stream.set('bitrate', { bitrate })
-      }
-
-      if (requests.length) {
-        const origin = findOrigin(requests, origins)
-        if (origin) {
-          stream.set('url', { url: origin })
-        }
-      }
-    }
-
-    if (stream.changed) updated++
-
-    output.push(stream.data())
-  }
-
-  logger.info(`updated ${updated} streams`)
-
-  return output
-}
-
-async function updateDatabase(streams = []) {
-  logger.info('saving to database...')
-
-  for (const stream of streams) {
-    await db.streams.update({ _id: stream._id }, stream)
-  }
-  db.streams.compact()
-
-  logger.info('done')
-}
-
 function findOrigin(requests = [], origins = {}) {
   if (origins && Array.isArray(requests)) {
     requests = requests.map(r => r.url.replace(/(^\w+:|^)/, ''))
@@ -131,7 +122,7 @@
   return null
 }
 
-function parseStreams(streams) {
+function parseMediaInfo(streams) {
   streams = streams.filter(s => s.codec_type === 'video')
   streams = _.orderBy(
     streams,
diff --git a/tests/__data__/input/database/db_update.streams.db b/tests/__data__/input/database/db_update.streams.db
index ee46f1d6e..a9d7a1e71 100644
--- a/tests/__data__/input/database/db_update.streams.db
+++ b/tests/__data__/input/database/db_update.streams.db
@@ -4,3 +4,4 @@
 {"title":"Kayhan TV","channel":"KayhanTV.af","filepath":"channels/af.m3u","url":"http://208.93.117.113/live/Stream1/playlist.m3u8","http_referrer":null,"user_agent":null,"cluster_id":1,"_id":"cFFpFVzSn6xFMUF3"}
 {"title":"Sharq","channel":"Sharq.af","filepath":"channels/af.m3u","url":"http://51.210.199.50/hls/stream.m3u8","http_referrer":null,"user_agent":null,"cluster_id":1,"_id":"u7iyA6cjtf1iWWAZ"}
 {"title":"BBC News HD","channel":"BBCNewsHD.ad","filepath":"tests/__data__/output/channels/uk.m3u","url":"http://1111296894.rsc.cdn77.org/LS-ATL-54548-6/index.m3u8","http_referrer":null,"user_agent":null,"cluster_id":3,"_id":"WTbieV1ptnZVCIdn"}
+{"title":"ABC","channel":"ABC.uk","filepath":"tests/__data__/output/channels/uk.m3u","url":"http://1111296894.rsc.cdn77.org/LS-ATL-54548-6/index.m3u8","http_referrer":null,"user_agent":null,"cluster_id":3,"_id":"WTbieV1ptnZVCI3n"}
diff --git a/tests/commands/database/update.test.js b/tests/commands/database/update.test.js
index 6256a471c..3701dbaa1 100644
--- a/tests/commands/database/update.test.js
+++ b/tests/commands/database/update.test.js
@@ -8,14 +8,29 @@ beforeEach(() => {
     'tests/__data__/input/database/db_update.streams.db',
     'tests/__data__/output/streams.db'
   )
+})
 
+it('can save results', () => {
   const stdout = execSync(
     'DB_DIR=tests/__data__/output LOGS_DIR=tests/__data__/input/logs/cluster/load npm run db:update',
     { encoding: 'utf8' }
   )
-})
+  expect(stdout).toEqual(`
+> db:update
+> node scripts/commands/database/update.js
+
+loading streams...
+found 7 streams
+loading check results...
+found 6 results
+loading origins...
+found 2 origins
+updating streams...
+updated 6 streams
+removed 1 duplicates
+done
+`)
 
-it('can save results', () => {
   expect(content('tests/__data__/output/streams.db')).toEqual(
     content('tests/__data__/expected/database/db_update.streams.db')
   )
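
For readers skimming the diff: the rewritten updateStreams writes each stream to the database in a single pass and deletes any later record whose URL was already written during the run, which is what turns the seven input streams into "updated 6 streams" and "removed 1 duplicates" in the test output above. Below is a minimal, self-contained sketch of that de-duplication pattern in plain Node. The `records` array and `dedupeByUrl` helper are hypothetical illustrations, not code from the repository, and the sketch is simplified: the patched code only records URLs for streams it actually updated and removes duplicates from the NeDB-backed db.streams collection.

// Sketch only: first record written for a URL wins, later duplicates are
// dropped, mirroring the `buffer` object in the patched updateStreams().
// `records` is made-up test data.
const records = [
  { _id: 'a1', url: 'http://example.com/one.m3u8' },
  { _id: 'a2', url: 'http://example.com/two.m3u8' },
  { _id: 'a3', url: 'http://example.com/one.m3u8' } // same URL as a1
]

function dedupeByUrl(items) {
  const buffer = {} // url -> true for every record kept so far this run
  const kept = []
  let removed = 0
  for (const item of items) {
    if (buffer[item.url]) {
      removed++ // URL already written during this pass: drop the duplicate
    } else {
      buffer[item.url] = true
      kept.push(item)
    }
  }
  return { kept, removed }
}

console.log(dedupeByUrl(records)) // kept: a1 and a2, removed: 1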