// @ts-check
'use strict'

const logger = require('@overleaf/logger')
const metrics = require('@overleaf/metrics')
const OError = require('@overleaf/o-error')
const assert = require('./assert')
const chunkStore = require('./chunk_store')
const { BlobStore } = require('./blob_store')
const BatchBlobStore = require('./batch_blob_store')
const persistChanges = require('./persist_changes')
const resyncProject = require('./resync_project')
const redisBackend = require('./chunk_store/redis')

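// Changes are persisted in batches of up to this size; the loop below keeps
// draining the buffer until a batch comes back smaller than this.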
const PERSIST_BATCH_SIZE = 50

/**
 * Persist the changes from the Redis buffer to the main storage.
 *
 * Algorithm Outline:
 * 1. Get the latest chunk's endVersion from the database.
 * 2. Get non-persisted changes from Redis that are after this endVersion.
 * 3. If there are no such changes, exit.
 * 4. Load file blobs for these Redis changes.
 * 5. Run the persistChanges() algorithm to store these changes into one or
 *    more new chunks in GCS.
 *    - This must not decrease the endVersion. If changes were processed, it
 *      must advance.
 * 6. Set the new persisted version (the endVersion of the latest persisted
 *    chunk) in Redis.
 *
 * @param {string} projectId
 * @param {Object} limits
 * @throws {Error | OError} If a critical error occurs during persistence.
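 * @return {Promise<Object>} A summary of the run: numberOfChangesPersisted,
 *   originalEndVersion, currentChunk and resyncNeeded (see the return
 *   statement at the end of this function).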
*/
async function persistBuffer(projectId, limits) {
  assert.projectId(projectId)
  logger.debug({ projectId }, 'starting persistBuffer operation')

  // 1. Get the latest chunk's endVersion from GCS/main store
  let endVersion
  const latestChunkMetadata = await chunkStore.getLatestChunkMetadata(projectId)

  if (latestChunkMetadata) {
    endVersion = latestChunkMetadata.endVersion
  } else {
    endVersion = 0 // No chunks found, start from version 0
    logger.debug({ projectId }, 'no existing chunks found in main storage')
  }
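  // Remember the starting version; it is returned to the caller as
  // originalEndVersion so the caller can see how far persistence advanced.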
const originalEndVersion = endVersion

  logger.debug({ projectId, endVersion }, 'got latest persisted chunk')

  // Process changes in batches
  let numberOfChangesPersisted = 0
  let currentChunk = null
  let resyncNeeded = false
  let resyncChangesWerePersisted = false
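  // Each successful iteration must strictly advance endVersion (checked
  // below), so this loop terminates once the Redis buffer is drained.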
while (true) {
    // 2. Get non-persisted changes from Redis
    const changesToPersist = await redisBackend.getNonPersistedChanges(
      projectId,
      endVersion,
      { maxChanges: PERSIST_BATCH_SIZE }
    )

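    // 3. If there are no such changes, exit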
    if (changesToPersist.length === 0) {
      break
    }

    logger.debug(
      {
        projectId,
        endVersion,
        count: changesToPersist.length,
      },
      'found changes in Redis to persist'
    )

    // 4. Load file blobs for these Redis changes. Errors will propagate.
    const blobStore = new BlobStore(projectId)
    const batchBlobStore = new BatchBlobStore(blobStore)

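    // Collect every blob hash referenced by this batch and preload the blobs
    // in one batch before loading the files for each change.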
    const blobHashes = new Set()
    for (const change of changesToPersist) {
      change.findBlobHashes(blobHashes)
    }
    if (blobHashes.size > 0) {
      await batchBlobStore.preload(Array.from(blobHashes))
    }
    for (const change of changesToPersist) {
      await change.loadFiles('lazy', blobStore)
    }

    // 5. Run the persistChanges() algorithm. Errors will propagate.
    logger.debug(
      {
        projectId,
        endVersion,
        changeCount: changesToPersist.length,
      },
      'calling persistChanges'
    )

    const persistResult = await persistChanges(
      projectId,
      changesToPersist,
      limits,
      endVersion
    )

    if (!persistResult || !persistResult.currentChunk) {
      metrics.inc('persist_buffer', 1, { status: 'no-chunk-error' })
      throw new OError(
        'persistChanges did not produce a new chunk for non-empty changes',
        {
          projectId,
          endVersion,
          changeCount: changesToPersist.length,
        }
      )
    }

    currentChunk = persistResult.currentChunk
    const newEndVersion = currentChunk.getEndVersion()

    if (newEndVersion <= endVersion) {
      metrics.inc('persist_buffer', 1, { status: 'chunk-version-mismatch' })
      throw new OError(
        'persisted chunk endVersion must be greater than current persisted chunk end version for non-empty changes',
        {
          projectId,
          newEndVersion,
          endVersion,
          changeCount: changesToPersist.length,
        }
      )
    }

    logger.debug(
      {
        projectId,
        oldVersion: endVersion,
        newVersion: newEndVersion,
      },
      'successfully persisted changes from Redis to main storage'
    )

    // 6. Set the persisted version in Redis. Errors will propagate.
    const status = await redisBackend.setPersistedVersion(
      projectId,
      newEndVersion
    )

    if (status !== 'ok') {
      metrics.inc('persist_buffer', 1, { status: 'error-on-persisted-version' })
      throw new OError('failed to update persisted version in Redis', {
        projectId,
        newEndVersion,
        status,
      })
    }

    logger.debug(
      { projectId, newEndVersion },
      'updated persisted version in Redis'
    )
    numberOfChangesPersisted += persistResult.numberOfChangesPersisted
    endVersion = newEndVersion

    // Check if a resync might be needed
    if (persistResult.resyncNeeded) {
      resyncNeeded = true
    }

    if (
      changesToPersist.some(
        change => change.getOrigin()?.getKind() === 'history-resync'
      )
    ) {
      resyncChangesWerePersisted = true
    }

    if (persistResult.numberOfChangesPersisted < PERSIST_BATCH_SIZE) {
      // We reached the end of available changes
      break
    }
  }

  if (numberOfChangesPersisted === 0) {
    logger.debug(
      { projectId, endVersion },
      'no new changes in Redis buffer to persist'
    )
    metrics.inc('persist_buffer', 1, { status: 'no_changes' })
    // No changes to persist, update the persisted version in Redis
    // to match the current endVersion. This shouldn't be needed
    // unless a worker failed to update the persisted version.
    await redisBackend.setPersistedVersion(projectId, endVersion)
  } else {
    logger.debug(
      { projectId, finalPersistedVersion: endVersion },
      'persistBuffer operation completed successfully'
    )
    metrics.inc('persist_buffer', 1, { status: 'persisted' })
  }

  if (limits.autoResync && resyncNeeded) {
    if (resyncChangesWerePersisted) {
      // To avoid an infinite loop, do not resync if the current batch of
      // changes contains a history resync.
      logger.warn(
        { projectId },
        'content hash validation failed while persisting a history resync, skipping additional resync'
      )
    } else {
      const backend = chunkStore.getBackend(projectId)
      const mongoProjectId =
        await backend.resolveHistoryIdToMongoProjectId(projectId)
      await resyncProject(mongoProjectId)
    }
  }

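  // If no changes were persisted, the loop never set currentChunk; fall back
  // to loading the latest chunk from the main store so the caller always
  // receives a chunk.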
  if (currentChunk == null) {
    const { chunk } = await chunkStore.loadByChunkRecord(
      projectId,
      latestChunkMetadata
    )
    currentChunk = chunk
  }

  return {
    numberOfChangesPersisted,
    originalEndVersion,
    currentChunk,
    resyncNeeded,
  }
}

module.exports = persistBuffer
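
// A hypothetical caller sketch (not part of this module): a background worker
// might drain the buffer for one project as shown below. The shape of the
// `limits` object here is an assumption; it is simply forwarded to
// persistChanges().
//
//   const persistBuffer = require('./persist_buffer')
//
//   async function flushProjectBuffer(projectId) {
//     const limits = { autoResync: true } // hypothetical limits object
//     const result = await persistBuffer(projectId, limits)
//     console.log(
//       `persisted ${result.numberOfChangesPersisted} changes, ` +
//         `from version ${result.originalEndVersion} ` +
//         `to ${result.currentChunk.getEndVersion()}`
//     )
//   }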