// Source: overleaf-cep/services/history-v1/storage/lib/persist_buffer.js
// Commit 735cc2272f (Eric Mc Sween): Merge pull request #26505 from
// overleaf/em-persist-buffer-batch — "Persist Redis buffer in batches"
// GitOrigin-RevId: 1de22807dae3554b3274ec103783b0868b1439d9
// Date: 2025-06-30 08:05:25 +00:00

// @ts-check
'use strict'
const logger = require('@overleaf/logger')
const metrics = require('@overleaf/metrics')
const OError = require('@overleaf/o-error')
const assert = require('./assert')
const chunkStore = require('./chunk_store')
const { BlobStore } = require('./blob_store')
const BatchBlobStore = require('./batch_blob_store')
const persistChanges = require('./persist_changes')
const resyncProject = require('./resync_project')
const redisBackend = require('./chunk_store/redis')
const PERSIST_BATCH_SIZE = 50
/**
* Persist the changes from Redis buffer to the main storage
*
* Algorithm Outline:
* 1. Get the latest chunk's endVersion from the database
* 2. Get non-persisted changes from Redis that are after this endVersion.
* 3. If no such changes, exit.
* 4. Load file blobs for these Redis changes.
* 5. Run the persistChanges() algorithm to store these changes into a new chunk(s) in GCS.
* - This must not decrease the endVersion. If changes were processed, it must advance.
* 6. Set the new persisted version (endVersion of the latest persisted chunk) in Redis.
*
* @param {string} projectId
* @param {Object} limits
* @throws {Error | OError} If a critical error occurs during persistence.
*/
async function persistBuffer(projectId, limits) {
assert.projectId(projectId)
logger.debug({ projectId }, 'starting persistBuffer operation')
// 1. Get the latest chunk's endVersion from GCS/main store
let endVersion
const latestChunkMetadata = await chunkStore.getLatestChunkMetadata(projectId)
if (latestChunkMetadata) {
endVersion = latestChunkMetadata.endVersion
} else {
endVersion = 0 // No chunks found, start from version 0
logger.debug({ projectId }, 'no existing chunks found in main storage')
}
const originalEndVersion = endVersion
logger.debug({ projectId, endVersion }, 'got latest persisted chunk')
// Process changes in batches
let numberOfChangesPersisted = 0
let currentChunk = null
let resyncNeeded = false
let resyncChangesWerePersisted = false
while (true) {
// 2. Get non-persisted changes from Redis
const changesToPersist = await redisBackend.getNonPersistedChanges(
projectId,
endVersion,
{ maxChanges: PERSIST_BATCH_SIZE }
)
if (changesToPersist.length === 0) {
break
}
logger.debug(
{
projectId,
endVersion,
count: changesToPersist.length,
},
'found changes in Redis to persist'
)
// 4. Load file blobs for these Redis changes. Errors will propagate.
const blobStore = new BlobStore(projectId)
const batchBlobStore = new BatchBlobStore(blobStore)
const blobHashes = new Set()
for (const change of changesToPersist) {
change.findBlobHashes(blobHashes)
}
if (blobHashes.size > 0) {
await batchBlobStore.preload(Array.from(blobHashes))
}
for (const change of changesToPersist) {
await change.loadFiles('lazy', blobStore)
}
// 5. Run the persistChanges() algorithm. Errors will propagate.
logger.debug(
{
projectId,
endVersion,
changeCount: changesToPersist.length,
},
'calling persistChanges'
)
const persistResult = await persistChanges(
projectId,
changesToPersist,
limits,
endVersion
)
if (!persistResult || !persistResult.currentChunk) {
metrics.inc('persist_buffer', 1, { status: 'no-chunk-error' })
throw new OError(
'persistChanges did not produce a new chunk for non-empty changes',
{
projectId,
endVersion,
changeCount: changesToPersist.length,
}
)
}
currentChunk = persistResult.currentChunk
const newEndVersion = currentChunk.getEndVersion()
if (newEndVersion <= endVersion) {
metrics.inc('persist_buffer', 1, { status: 'chunk-version-mismatch' })
throw new OError(
'persisted chunk endVersion must be greater than current persisted chunk end version for non-empty changes',
{
projectId,
newEndVersion,
endVersion,
changeCount: changesToPersist.length,
}
)
}
logger.debug(
{
projectId,
oldVersion: endVersion,
newVersion: newEndVersion,
},
'successfully persisted changes from Redis to main storage'
)
// 6. Set the persisted version in Redis. Errors will propagate.
const status = await redisBackend.setPersistedVersion(
projectId,
newEndVersion
)
if (status !== 'ok') {
metrics.inc('persist_buffer', 1, { status: 'error-on-persisted-version' })
throw new OError('failed to update persisted version in Redis', {
projectId,
newEndVersion,
status,
})
}
logger.debug(
{ projectId, newEndVersion },
'updated persisted version in Redis'
)
numberOfChangesPersisted += persistResult.numberOfChangesPersisted
endVersion = newEndVersion
// Check if a resync might be needed
if (persistResult.resyncNeeded) {
resyncNeeded = true
}
if (
changesToPersist.some(
change => change.getOrigin()?.getKind() === 'history-resync'
)
) {
resyncChangesWerePersisted = true
}
if (persistResult.numberOfChangesPersisted < PERSIST_BATCH_SIZE) {
// We reached the end of available changes
break
}
}
if (numberOfChangesPersisted === 0) {
logger.debug(
{ projectId, endVersion },
'no new changes in Redis buffer to persist'
)
metrics.inc('persist_buffer', 1, { status: 'no_changes' })
// No changes to persist, update the persisted version in Redis
// to match the current endVersion. This shouldn't be needed
// unless a worker failed to update the persisted version.
await redisBackend.setPersistedVersion(projectId, endVersion)
} else {
logger.debug(
{ projectId, finalPersistedVersion: endVersion },
'persistBuffer operation completed successfully'
)
metrics.inc('persist_buffer', 1, { status: 'persisted' })
}
if (limits.autoResync && resyncNeeded) {
if (resyncChangesWerePersisted) {
// To avoid an infinite loop, do not resync if the current batch of
// changes contains a history resync.
logger.warn(
{ projectId },
'content hash validation failed while persisting a history resync, skipping additional resync'
)
} else {
const backend = chunkStore.getBackend(projectId)
const mongoProjectId =
await backend.resolveHistoryIdToMongoProjectId(projectId)
await resyncProject(mongoProjectId)
}
}
if (currentChunk == null) {
const { chunk } = await chunkStore.loadByChunkRecord(
projectId,
latestChunkMetadata
)
currentChunk = chunk
}
return {
numberOfChangesPersisted,
originalEndVersion,
currentChunk,
resyncNeeded,
}
}
module.exports = persistBuffer