overleaf-cep/services/history-v1/storage/lib/backupArchiver.mjs
// @ts-check
import path from 'node:path'
import projectKey from './project_key.js'
import {
chunksBucket,
backupPersistor,
projectBlobsBucket,
globalBlobsBucket as backupGlobalBlobsBucket,
} from './backupPersistor.mjs'
import core, { Chunk, History } from 'overleaf-editor-core'
import {
GLOBAL_BLOBS,
makeProjectKey,
getStringLengthOfFile,
makeGlobalKey,
} from './blob_store/index.js'
import streams from './streams.js'
import objectPersistor from '@overleaf/object-persistor'
import OError from '@overleaf/o-error'
import chunkStore from './chunk_store/index.js'
import logger from '@overleaf/logger'
import fs from 'node:fs'
import { pipeline } from 'node:stream/promises'
import withTmpDir from '../../api/controllers/with_tmp_dir.js'
import { loadChunk } from './backupVerifier.mjs'
import globalBlobPersistor from './persistor.js'
import config from 'config'
import { NoKEKMatchedError } from '@overleaf/object-persistor/src/Errors.js'
const globalBlobsBucket = config.get('blobStore.globalBucket')
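/**
* Minimal read-only BlobStore implementation used while building an archive.
* Blobs are downloaded once into a temporary directory (see fetchBlobs) and
* then served from disk to History#loadFiles and File#load.
*/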
class BackupBlobStore {
/**
*
* @param {string} historyId
* @param {string} tmp
* @param {CachedPerProjectEncryptedS3Persistor} persistor
* @param {boolean} useBackupGlobalBlobs
*/
constructor(historyId, tmp, persistor, useBackupGlobalBlobs) {
this.historyId = historyId
this.tmp = tmp
this.blobs = new Map()
this.persistor = persistor
this.useBackupGlobalBlobs = useBackupGlobalBlobs
}
/**
* Fetch a cached blob from the temporary directory and parse it as JSON.
* Required for the BlobStore interface.
*
* @template T
* @param {string} hash
* @return {Promise<T>}
*/
async getObject(hash) {
try {
const stream = await this.getStream(hash)
const buffer = await streams.readStreamToBuffer(stream)
return JSON.parse(buffer.toString())
} catch (err) {
logger.warn({ err, hash }, 'Failed to fetch chunk blob')
throw err
}
}
/**
*
* @param {Set<string>} hashes
* @return {Promise<void>}
*/
async fetchBlobs(hashes) {
for (const hash of hashes) {
// Skip blobs that have already been downloaded into the temporary directory.
if (this.blobs.has(hash)) continue
const blobPath = this.getBlobPathname(hash)
/** @type {core.Blob} */
let blob
/** @type {NodeJS.ReadableStream} */
let blobStream
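// Global blobs are shared across projects and live in the global blobs
// bucket; project blobs are fetched from the project's encrypted backup.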
if (GLOBAL_BLOBS.has(hash)) {
try {
const blobData = await this.fetchGlobalBlob(hash)
await pipeline(blobData.stream, fs.createWriteStream(blobPath))
blob = blobData.blob
} catch (err) {
logger.warn({ hash, err }, 'Failed to fetch global blob')
continue
}
} else {
try {
blobStream = await fetchBlob(this.historyId, hash, this.persistor)
await pipeline(blobStream, fs.createWriteStream(blobPath))
blob = await this.makeBlob(hash, blobPath)
} catch (err) {
logger.warn({ err, hash }, 'Failed to fetch chunk blob')
continue
}
}
this.blobs.set(hash, blob)
}
}
/**
*
* @param {string} hash
* @return {Promise<{ blob: core.Blob, stream: NodeJS.ReadableStream }>}
*/
async fetchGlobalBlob(hash) {
const globalBlob = GLOBAL_BLOBS.get(hash)
if (!globalBlob) {
throw new Error('blob does not exist or is not a global blob')
}
let stream
const key = makeGlobalKey(hash)
if (this.useBackupGlobalBlobs) {
stream = await this.persistor.getObjectStream(
backupGlobalBlobsBucket,
key
)
} else {
stream = await globalBlobPersistor.getObjectStream(globalBlobsBucket, key)
}
return { blob: globalBlob.blob, stream }
}
/**
*
* @param {string} hash
* @param {string} pathname
* @return {Promise<core.Blob>}
*/
async makeBlob(hash, pathname) {
const stat = await fs.promises.stat(pathname)
const byteLength = stat.size
const stringLength = await getStringLengthOfFile(byteLength, pathname)
if (stringLength) {
return new core.Blob(hash, byteLength, stringLength)
}
return new core.Blob(hash, byteLength)
}
/**
*
* @param {string} hash
* @return {Promise<string>}
*/
async getString(hash) {
const stream = await this.getStream(hash)
const buffer = await streams.readStreamToBuffer(stream)
return buffer.toString()
}
/**
*
* @param {string} hash
* @return {Promise<fs.ReadStream>}
*/
async getStream(hash) {
return fs.createReadStream(this.getBlobPathname(hash))
}
/**
*
* @param {string} hash
* @return {Promise<core.Blob | undefined>}
*/
async getBlob(hash) {
return this.blobs.get(hash)
}
/**
*
* @param {string} hash
* @return {string}
*/
getBlobPathname(hash) {
return path.join(this.tmp, hash)
}
}
/**
* @typedef {(import('@overleaf/object-persistor/src/PerProjectEncryptedS3Persistor.js').CachedPerProjectEncryptedS3Persistor)} CachedPerProjectEncryptedS3Persistor
*/
/**
* @typedef {(import('archiver').Archiver)} Archiver
*/
/**
* @typedef {(import('overleaf-editor-core').FileMap)} FileMap
*/
/**
*
* @param {string} historyId
* @return {Promise<CachedPerProjectEncryptedS3Persistor>}
*/
async function getProjectPersistor(historyId) {
try {
return await backupPersistor.forProjectRO(
projectBlobsBucket,
makeProjectKey(historyId, '')
)
} catch (error) {
if (error instanceof NoKEKMatchedError) {
logger.info({ historyId }, 'no kek matched')
}
throw new BackupPersistorError(
'Failed to get project persistor',
{ historyId },
error instanceof Error ? error : undefined
)
}
}
/**
*
* @param {CachedPerProjectEncryptedS3Persistor} persistor
* @param {string} key
* @return {Promise<{chunkData: any, buffer: Buffer}>}
*/
async function loadChunkByKey(persistor, key) {
try {
const buf = await streams.gunzipStreamToBuffer(
await persistor.getObjectStream(chunksBucket, key)
)
return { chunkData: JSON.parse(buf.toString('utf-8')), buffer: buf }
} catch (err) {
if (err instanceof objectPersistor.Errors.NotFoundError) {
throw new Chunk.NotPersistedError('chunk not found')
}
if (err instanceof Error) {
throw OError.tag(err, 'Failed to load chunk', { key })
}
throw err
}
}
/**
*
* @param {string} historyId
* @param {string} hash
* @param {CachedPerProjectEncryptedS3Persistor} persistor
* @return {Promise<NodeJS.ReadableStream>}
*/
async function fetchBlob(historyId, hash, persistor) {
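// Blobs may be stored gzip-compressed; autoGunzip requests transparent decompression.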
const key = makeProjectKey(historyId, hash)
return await persistor.getObjectStream(projectBlobsBucket, key, {
autoGunzip: true,
})
}
/**
* @typedef {object} AddChunkOptions
* @property {string} [prefix] Should include trailing slash (if length > 0)
* @property {boolean} [useBackupGlobalBlobs]
*/
/**
*
* @param {History} history
* @param {Archiver} archive
* @param {CachedPerProjectEncryptedS3Persistor} projectCache
* @param {string} historyId
* @param {AddChunkOptions} [options]
* @returns {Promise<void>}
*/
async function addChunkToArchive(
history,
archive,
projectCache,
historyId,
{ prefix = '', useBackupGlobalBlobs = false } = {}
) {
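// Collect every blob hash referenced by the chunk so the contents can be
// pre-fetched into a temporary directory before the snapshot is rebuilt.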
const chunkBlobs = new Set()
history.findBlobHashes(chunkBlobs)
await withTmpDir('recovery-blob-', async tmpDir => {
const blobStore = new BackupBlobStore(
historyId,
tmpDir,
projectCache,
useBackupGlobalBlobs
)
await blobStore.fetchBlobs(chunkBlobs)
await history.loadFiles('lazy', blobStore)
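// Rebuild the chunk's final state by applying all of its changes to the snapshot.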
const snapshot = history.getSnapshot()
snapshot.applyAll(history.getChanges())
const filePaths = snapshot.getFilePathnames()
if (filePaths.length === 0) {
logger.warn(
{ historyId, projectVersion: snapshot.projectVersion },
'No files found in snapshot backup'
)
}
for (const filePath of filePaths) {
/** @type {core.File | null | undefined} */
const file = snapshot.getFile(filePath)
if (!file) {
logger.error({ filePath }, 'File not found in snapshot')
continue
}
try {
await file.load('eager', blobStore)
} catch (err) {
logger.error(
{ filePath, err },
'Failed to load file from snapshot, skipping'
)
continue
}
const hash = file.getHash()
/** @type {string | fs.ReadStream | null | undefined} */
let content = file.getContent({ filterTrackedDeletes: true })
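// getContent returns null when the content is not held in memory (typically
// binary files); fall back to streaming the cached blob from disk.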
if (content === null) {
if (!hash) {
logger.error({ filePath }, 'File does not have a hash')
continue
}
const blob = await blobStore.getBlob(hash)
if (!blob) {
logger.error({ filePath }, 'Blob not found in blob store')
continue
}
content = await blobStore.getStream(hash)
}
archive.append(content, {
name: `${prefix}${filePath}`,
})
}
})
}
/**
*
* @param {string} historyId
* @return {Promise<number>}
*/
async function findStartVersionOfLatestChunk(historyId) {
const backend = chunkStore.getBackend(historyId)
const chunk = await backend.getLatestChunk(historyId, { readOnly: true })
if (!chunk) {
throw new Error('Latest chunk could not be loaded')
}
return chunk.startVersion
}
/**
* Restore a project from the latest snapshot
*
* Assumes that the database backup has already been restored.
*
* @param {Archiver} archive
* @param {string} historyId
* @param {boolean} [useBackupGlobalBlobs]
* @return {Promise<Archiver>}
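*
* @example
* // Illustrative usage, assuming an `archiver` zip instance piped to a writable `output` stream:
* const archive = archiver('zip')
* archive.pipe(output)
* await archiveLatestChunk(archive, historyId)
* await archive.finalize()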
*/
export async function archiveLatestChunk(
archive,
historyId,
useBackupGlobalBlobs = false
) {
logger.info({ historyId, useBackupGlobalBlobs }, 'Archiving latest chunk')
const projectCache = await getProjectPersistor(historyId)
const startVersion = await findStartVersionOfLatestChunk(historyId)
const backedUpChunkRaw = await loadChunk(
historyId,
startVersion,
projectCache
)
const backedUpChunk = History.fromRaw(backedUpChunkRaw)
await addChunkToArchive(backedUpChunk, archive, projectCache, historyId, {
useBackupGlobalBlobs,
})
return archive
}
/**
* Fetches all raw blobs from the project and adds them to the archive.
*
* @param {string} historyId
* @param {Archiver} archive
* @param {CachedPerProjectEncryptedS3Persistor} projectCache
* @return {Promise<void>}
*/
async function addRawBlobsToArchive(historyId, archive, projectCache) {
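// List every blob object under the project's prefix and stream each one into
// the archive under <historyId>/blobs/.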
const key = projectKey.format(historyId)
const { contents } = await projectCache.listDirectory(projectBlobsBucket, key)
for (const blobRecord of contents) {
if (!blobRecord.Key) {
logger.debug({ blobRecord }, 'no key')
continue
}
const blobKey = blobRecord.Key
try {
const stream = await projectCache.getObjectStream(
projectBlobsBucket,
blobKey,
{ autoGunzip: true }
)
archive.append(stream, {
name: path.join(historyId, 'blobs', blobKey),
})
} catch (err) {
logger.warn(
{ err, path: blobRecord.Key },
'Failed to append blob to archive'
)
}
}
}
/**
* Download raw files from the backup.
*
* This can work without the database backup having been restored.
*
* Each chunk is downloaded into its own directory within the archive, and the project's blobs are added alongside.
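*
* Resulting archive layout:
*   <historyId>/chunks/<chunkId>/chunk.json
*   <historyId>/blobs/<blob object key>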
*
* @param {Archiver} archive
* @param {string} historyId
* @param {boolean} [useBackupGlobalBlobs]
* @return {Promise<void>}
*/
export async function archiveRawProject(
archive,
historyId,
useBackupGlobalBlobs = false
) {
const projectCache = await getProjectPersistor(historyId)
const { contents: chunks } = await projectCache.listDirectory(
chunksBucket,
projectKey.format(historyId)
)
if (chunks.length === 0) {
throw new Error('No chunks found')
}
for (const chunkRecord of chunks) {
if (!chunkRecord.Key) {
logger.debug({ chunkRecord }, 'no key')
continue
}
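// Use the last path component of the S3 key as the chunk identifier in the archive.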
const chunkId = chunkRecord.Key.split('/').pop()
logger.debug({ chunkId, key: chunkRecord.Key }, 'Processing chunk')
const { buffer } = await loadChunkByKey(projectCache, chunkRecord.Key)
archive.append(buffer, {
name: `${historyId}/chunks/${chunkId}/chunk.json`,
})
}
await addRawBlobsToArchive(historyId, archive, projectCache)
}
export class BackupPersistorError extends OError {}