fix: Collaboration debounce shared between docs (#3401)
* fix: Collaboration debounce shared between docs * Rename, Tracing -> Metrics * Add tracing * tsc * fix: Lock document row when loading document in collaboration service incase state needs writing * fix: Incorrect service name regression
This commit is contained in:
@@ -1,10 +1,14 @@
|
||||
import { onAuthenticatePayload } from "@hocuspocus/server";
|
||||
import { onAuthenticatePayload, Extension } from "@hocuspocus/server";
|
||||
import { APM } from "@server/logging/tracing";
|
||||
import Document from "@server/models/Document";
|
||||
import { can } from "@server/policies";
|
||||
import { getUserForJWT } from "@server/utils/jwt";
|
||||
import { AuthenticationError } from "../errors";
|
||||
|
||||
export default class Authentication {
|
||||
@APM.trace({
|
||||
spanName: "authentication",
|
||||
})
|
||||
export default class AuthenticationExtension implements Extension {
|
||||
async onAuthenticate({
|
||||
connection,
|
||||
token,
|
||||
@@ -2,13 +2,14 @@ import {
|
||||
onConnectPayload,
|
||||
onDisconnectPayload,
|
||||
onLoadDocumentPayload,
|
||||
Extension,
|
||||
} from "@hocuspocus/server";
|
||||
import Logger from "@server/logging/logger";
|
||||
|
||||
export default class CollaborationLogger {
|
||||
export default class LoggerExtension implements Extension {
|
||||
async onLoadDocument(data: onLoadDocumentPayload) {
|
||||
Logger.info("hocuspocus", `Loaded document "${data.documentName}"`, {
|
||||
userId: data.context.user.id,
|
||||
userId: data.context.user?.id,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -17,6 +18,8 @@ export default class CollaborationLogger {
|
||||
}
|
||||
|
||||
async onDisconnect(data: onDisconnectPayload) {
|
||||
Logger.info("hocuspocus", `Connection to "${data.documentName}" closed `);
|
||||
Logger.info("hocuspocus", `Closed connection to "${data.documentName}"`, {
|
||||
userId: data.context.user?.id,
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -3,11 +3,12 @@ import {
|
||||
onConnectPayload,
|
||||
onDisconnectPayload,
|
||||
onLoadDocumentPayload,
|
||||
Extension,
|
||||
} from "@hocuspocus/server";
|
||||
import Metrics from "@server/logging/metrics";
|
||||
|
||||
export default class Tracing {
|
||||
onLoadDocument({ documentName, instance }: onLoadDocumentPayload) {
|
||||
export default class MetricsExtension implements Extension {
|
||||
async onLoadDocument({ documentName, instance }: onLoadDocumentPayload) {
|
||||
Metrics.increment("collaboration.load_document", {
|
||||
documentName,
|
||||
});
|
||||
@@ -23,7 +24,7 @@ export default class Tracing {
|
||||
});
|
||||
}
|
||||
|
||||
onConnect({ documentName, instance }: onConnectPayload) {
|
||||
async onConnect({ documentName, instance }: onConnectPayload) {
|
||||
Metrics.increment("collaboration.connect", {
|
||||
documentName,
|
||||
});
|
||||
@@ -33,7 +34,7 @@ export default class Tracing {
|
||||
);
|
||||
}
|
||||
|
||||
onDisconnect({ documentName, instance }: onDisconnectPayload) {
|
||||
async onDisconnect({ documentName, instance }: onDisconnectPayload) {
|
||||
Metrics.increment("collaboration.disconnect", {
|
||||
documentName,
|
||||
});
|
||||
@@ -47,13 +48,13 @@ export default class Tracing {
|
||||
);
|
||||
}
|
||||
|
||||
onChange({ documentName }: onChangePayload) {
|
||||
async onStoreDocument({ documentName }: onChangePayload) {
|
||||
Metrics.increment("collaboration.change", {
|
||||
documentName,
|
||||
});
|
||||
}
|
||||
|
||||
onDestroy() {
|
||||
async onDestroy() {
|
||||
Metrics.gaugePerInstance("collaboration.connections_count", 0);
|
||||
Metrics.gaugePerInstance("collaboration.documents_count", 0);
|
||||
}
|
||||
86
server/collaboration/PersistenceExtension.ts
Normal file
86
server/collaboration/PersistenceExtension.ts
Normal file
@@ -0,0 +1,86 @@
|
||||
import {
|
||||
onStoreDocumentPayload,
|
||||
onLoadDocumentPayload,
|
||||
Extension,
|
||||
} from "@hocuspocus/server";
|
||||
import invariant from "invariant";
|
||||
import * as Y from "yjs";
|
||||
import { sequelize } from "@server/database/sequelize";
|
||||
import Logger from "@server/logging/logger";
|
||||
import { APM } from "@server/logging/tracing";
|
||||
import Document from "@server/models/Document";
|
||||
import documentUpdater from "../commands/documentUpdater";
|
||||
import markdownToYDoc from "./utils/markdownToYDoc";
|
||||
|
||||
@APM.trace({
|
||||
spanName: "persistence",
|
||||
})
|
||||
export default class PersistenceExtension implements Extension {
|
||||
async onLoadDocument({ documentName, ...data }: onLoadDocumentPayload) {
|
||||
const [, documentId] = documentName.split(".");
|
||||
const fieldName = "default";
|
||||
|
||||
// Check if the given field already exists in the given y-doc. This is import
|
||||
// so we don't import a document fresh if it exists already.
|
||||
if (!data.document.isEmpty(fieldName)) {
|
||||
return;
|
||||
}
|
||||
|
||||
return await sequelize.transaction(async (transaction) => {
|
||||
const document = await Document.scope("withState").findOne({
|
||||
transaction,
|
||||
lock: transaction.LOCK.UPDATE,
|
||||
where: {
|
||||
id: documentId,
|
||||
},
|
||||
});
|
||||
invariant(document, "Document not found");
|
||||
|
||||
if (document.state) {
|
||||
const ydoc = new Y.Doc();
|
||||
Logger.info("database", `Document ${documentId} is in database state`);
|
||||
Y.applyUpdate(ydoc, document.state);
|
||||
return ydoc;
|
||||
}
|
||||
|
||||
Logger.info(
|
||||
"database",
|
||||
`Document ${documentId} is not in state, creating from markdown`
|
||||
);
|
||||
const ydoc = markdownToYDoc(document.text, fieldName);
|
||||
const state = Y.encodeStateAsUpdate(ydoc);
|
||||
await document.update(
|
||||
{
|
||||
state: Buffer.from(state),
|
||||
},
|
||||
{
|
||||
hooks: false,
|
||||
transaction,
|
||||
}
|
||||
);
|
||||
return ydoc;
|
||||
});
|
||||
}
|
||||
|
||||
async onStoreDocument({
|
||||
document,
|
||||
context,
|
||||
documentName,
|
||||
}: onStoreDocumentPayload) {
|
||||
const [, documentId] = documentName.split(".");
|
||||
Logger.info("database", `Persisting ${documentId}`);
|
||||
|
||||
try {
|
||||
await documentUpdater({
|
||||
documentId,
|
||||
ydoc: document,
|
||||
userId: context.user?.id,
|
||||
});
|
||||
} catch (err) {
|
||||
Logger.error("Unable to persist document", err, {
|
||||
documentId,
|
||||
userId: context.user?.id,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,77 +0,0 @@
|
||||
import { onChangePayload, onLoadDocumentPayload } from "@hocuspocus/server";
|
||||
import invariant from "invariant";
|
||||
import { debounce } from "lodash";
|
||||
import * as Y from "yjs";
|
||||
import Logger from "@server/logging/logger";
|
||||
import Document from "@server/models/Document";
|
||||
import documentUpdater from "../commands/documentUpdater";
|
||||
import markdownToYDoc from "./utils/markdownToYDoc";
|
||||
|
||||
const DELAY = 3000;
|
||||
|
||||
export default class Persistence {
|
||||
async onLoadDocument({ documentName, ...data }: onLoadDocumentPayload) {
|
||||
const [, documentId] = documentName.split(".");
|
||||
const fieldName = "default";
|
||||
|
||||
// Check if the given field already exists in the given y-doc. This is import
|
||||
// so we don't import a document fresh if it exists already.
|
||||
if (!data.document.isEmpty(fieldName)) {
|
||||
return;
|
||||
}
|
||||
|
||||
const document = await Document.scope("withState").findOne({
|
||||
where: {
|
||||
id: documentId,
|
||||
},
|
||||
});
|
||||
invariant(document, "Document not found");
|
||||
|
||||
if (document.state) {
|
||||
const ydoc = new Y.Doc();
|
||||
Logger.info("database", `Document ${documentId} is in database state`);
|
||||
Y.applyUpdate(ydoc, document.state);
|
||||
return ydoc;
|
||||
}
|
||||
|
||||
Logger.info(
|
||||
"database",
|
||||
`Document ${documentId} is not in state, creating from markdown`
|
||||
);
|
||||
const ydoc = markdownToYDoc(document.text, fieldName);
|
||||
const state = Y.encodeStateAsUpdate(ydoc);
|
||||
await document.update(
|
||||
{
|
||||
state: Buffer.from(state),
|
||||
},
|
||||
{
|
||||
hooks: false,
|
||||
}
|
||||
);
|
||||
return ydoc;
|
||||
}
|
||||
|
||||
onChange = debounce(
|
||||
async ({ document, context, documentName }: onChangePayload) => {
|
||||
const [, documentId] = documentName.split(".");
|
||||
Logger.info("database", `Persisting ${documentId}`);
|
||||
|
||||
try {
|
||||
await documentUpdater({
|
||||
documentId,
|
||||
ydoc: document,
|
||||
userId: context.user?.id,
|
||||
});
|
||||
} catch (err) {
|
||||
Logger.error("Unable to persist document", err, {
|
||||
documentId,
|
||||
userId: context.user?.id,
|
||||
});
|
||||
}
|
||||
},
|
||||
DELAY,
|
||||
{
|
||||
maxWait: DELAY * 3,
|
||||
}
|
||||
);
|
||||
}
|
||||
Reference in New Issue
Block a user