chore: Move initial avatar upload to background worker (#3727)
* chore: Async user avatar upload processor * chore: Async team avatar upload * Refactor to task for retries * Docs Include avatarUrl in task props to prevent race condition Remove transaction around upload fetch request
This commit is contained in:
@@ -61,6 +61,7 @@ async function accountProvisioner({
|
||||
subdomain: teamParams.subdomain,
|
||||
avatarUrl: teamParams.avatarUrl,
|
||||
authenticationProvider: authenticationProviderParams,
|
||||
ip,
|
||||
});
|
||||
} catch (err) {
|
||||
throw AuthenticationError(err.message);
|
||||
|
||||
@@ -7,6 +7,8 @@ import teamCreator from "./teamCreator";
|
||||
beforeEach(() => flushdb());
|
||||
|
||||
describe("teamCreator", () => {
|
||||
const ip = "127.0.0.1";
|
||||
|
||||
it("should create team and authentication provider", async () => {
|
||||
env.DEPLOYMENT = "hosted";
|
||||
const result = await teamCreator({
|
||||
@@ -17,6 +19,7 @@ describe("teamCreator", () => {
|
||||
name: "google",
|
||||
providerId: "example.com",
|
||||
},
|
||||
ip,
|
||||
});
|
||||
const { team, authenticationProvider, isNewTeam } = result;
|
||||
expect(authenticationProvider.name).toEqual("google");
|
||||
@@ -40,6 +43,7 @@ describe("teamCreator", () => {
|
||||
name: "google",
|
||||
providerId: "example.com",
|
||||
},
|
||||
ip,
|
||||
});
|
||||
|
||||
expect(result.team.subdomain).toEqual("myteam1");
|
||||
@@ -62,6 +66,7 @@ describe("teamCreator", () => {
|
||||
name: "google",
|
||||
providerId: "example.com",
|
||||
},
|
||||
ip,
|
||||
});
|
||||
|
||||
expect(result.team.subdomain).toEqual("myteam2");
|
||||
@@ -82,6 +87,7 @@ describe("teamCreator", () => {
|
||||
name: "google",
|
||||
providerId: "example.com",
|
||||
},
|
||||
ip,
|
||||
});
|
||||
} catch (err) {
|
||||
error = err;
|
||||
@@ -109,6 +115,7 @@ describe("teamCreator", () => {
|
||||
name: "google",
|
||||
providerId: "allowed-domain.com",
|
||||
},
|
||||
ip,
|
||||
});
|
||||
const { team, authenticationProvider, isNewTeam } = result;
|
||||
expect(team.id).toEqual(existing.id);
|
||||
@@ -142,6 +149,7 @@ describe("teamCreator", () => {
|
||||
name: "google",
|
||||
providerId: "other-domain.com",
|
||||
},
|
||||
ip,
|
||||
});
|
||||
} catch (err) {
|
||||
error = err;
|
||||
@@ -164,6 +172,7 @@ describe("teamCreator", () => {
|
||||
name: "Updated name",
|
||||
subdomain: "example",
|
||||
authenticationProvider,
|
||||
ip,
|
||||
});
|
||||
const { team, isNewTeam } = result;
|
||||
expect(team.id).toEqual(existing.id);
|
||||
|
||||
@@ -3,7 +3,7 @@ import env from "@server/env";
|
||||
import { DomainNotAllowedError, MaximumTeamsError } from "@server/errors";
|
||||
import Logger from "@server/logging/Logger";
|
||||
import { APM } from "@server/logging/tracing";
|
||||
import { Team, AuthenticationProvider } from "@server/models";
|
||||
import { Team, AuthenticationProvider, Event } from "@server/models";
|
||||
import { generateAvatarUrl } from "@server/utils/avatars";
|
||||
|
||||
type TeamCreatorResult = {
|
||||
@@ -21,6 +21,7 @@ type Props = {
|
||||
name: string;
|
||||
providerId: string;
|
||||
};
|
||||
ip: string;
|
||||
};
|
||||
|
||||
async function teamCreator({
|
||||
@@ -29,6 +30,7 @@ async function teamCreator({
|
||||
subdomain,
|
||||
avatarUrl,
|
||||
authenticationProvider,
|
||||
ip,
|
||||
}: Props): Promise<TeamCreatorResult> {
|
||||
let authP = await AuthenticationProvider.findOne({
|
||||
where: authenticationProvider,
|
||||
@@ -90,7 +92,7 @@ async function teamCreator({
|
||||
}
|
||||
|
||||
const team = await sequelize.transaction(async (transaction) => {
|
||||
return Team.create(
|
||||
const team = await Team.create(
|
||||
{
|
||||
name,
|
||||
avatarUrl,
|
||||
@@ -101,6 +103,19 @@ async function teamCreator({
|
||||
transaction,
|
||||
}
|
||||
);
|
||||
|
||||
await Event.create(
|
||||
{
|
||||
name: "teams.create",
|
||||
teamId: team.id,
|
||||
ip,
|
||||
},
|
||||
{
|
||||
transaction,
|
||||
}
|
||||
);
|
||||
|
||||
return team;
|
||||
});
|
||||
|
||||
// Note provisioning the subdomain is done outside of the transaction as
|
||||
|
||||
@@ -102,7 +102,8 @@ export default async function userCreator({
|
||||
});
|
||||
|
||||
// We have an existing invite for his user, so we need to update it with our
|
||||
// new details and link up the authentication method
|
||||
// new details, link up the authentication method, and count this as a new
|
||||
// user creation.
|
||||
if (invite && !invite.authentications.length) {
|
||||
const auth = await sequelize.transaction(async (transaction) => {
|
||||
await invite.update(
|
||||
@@ -114,6 +115,21 @@ export default async function userCreator({
|
||||
transaction,
|
||||
}
|
||||
);
|
||||
await Event.create(
|
||||
{
|
||||
name: "users.create",
|
||||
actorId: invite.id,
|
||||
userId: invite.id,
|
||||
teamId: invite.teamId,
|
||||
data: {
|
||||
name,
|
||||
},
|
||||
ip,
|
||||
},
|
||||
{
|
||||
transaction,
|
||||
}
|
||||
);
|
||||
return await invite.$create<UserAuthentication>(
|
||||
"authentication",
|
||||
authentication,
|
||||
|
||||
@@ -156,6 +156,7 @@ class Event extends IdModel {
|
||||
"shares.create",
|
||||
"shares.update",
|
||||
"shares.revoke",
|
||||
"teams.create",
|
||||
"teams.update",
|
||||
"users.create",
|
||||
"users.update",
|
||||
|
||||
@@ -11,18 +11,14 @@ import {
|
||||
Table,
|
||||
Unique,
|
||||
IsIn,
|
||||
BeforeSave,
|
||||
HasMany,
|
||||
Scopes,
|
||||
Is,
|
||||
DataType,
|
||||
} from "sequelize-typescript";
|
||||
import { v4 as uuidv4 } from "uuid";
|
||||
import { getBaseDomain, RESERVED_SUBDOMAINS } from "@shared/utils/domains";
|
||||
import env from "@server/env";
|
||||
import Logger from "@server/logging/Logger";
|
||||
import { generateAvatarUrl } from "@server/utils/avatars";
|
||||
import { publicS3Endpoint, uploadToS3FromUrl } from "@server/utils/s3";
|
||||
import AuthenticationProvider from "./AuthenticationProvider";
|
||||
import Collection from "./Collection";
|
||||
import Document from "./Document";
|
||||
@@ -242,34 +238,6 @@ class Team extends ParanoidModel {
|
||||
|
||||
@HasMany(() => TeamDomain)
|
||||
allowedDomains: TeamDomain[];
|
||||
|
||||
// hooks
|
||||
@BeforeSave
|
||||
static uploadAvatar = async (model: Team) => {
|
||||
const endpoint = publicS3Endpoint();
|
||||
const { avatarUrl } = model;
|
||||
|
||||
if (
|
||||
avatarUrl &&
|
||||
!avatarUrl.startsWith("/api") &&
|
||||
!avatarUrl.startsWith(endpoint)
|
||||
) {
|
||||
try {
|
||||
const newUrl = await uploadToS3FromUrl(
|
||||
avatarUrl,
|
||||
`avatars/${model.id}/${uuidv4()}`,
|
||||
"public-read"
|
||||
);
|
||||
if (newUrl) {
|
||||
model.avatarUrl = newUrl;
|
||||
}
|
||||
} catch (err) {
|
||||
Logger.error("Error uploading avatar to S3", err, {
|
||||
url: avatarUrl,
|
||||
});
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
export default Team;
|
||||
|
||||
@@ -10,7 +10,6 @@ import {
|
||||
Default,
|
||||
IsIn,
|
||||
BeforeDestroy,
|
||||
BeforeSave,
|
||||
BeforeCreate,
|
||||
AfterCreate,
|
||||
BelongsTo,
|
||||
@@ -19,12 +18,9 @@ import {
|
||||
HasMany,
|
||||
Scopes,
|
||||
} from "sequelize-typescript";
|
||||
import { v4 as uuidv4 } from "uuid";
|
||||
import { languages } from "@shared/i18n";
|
||||
import { stringToColor } from "@shared/utils/color";
|
||||
import env from "@server/env";
|
||||
import Logger from "@server/logging/Logger";
|
||||
import { publicS3Endpoint, uploadToS3FromUrl } from "@server/utils/s3";
|
||||
import { ValidationError } from "../errors";
|
||||
import ApiKey from "./ApiKey";
|
||||
import Collection from "./Collection";
|
||||
@@ -463,34 +459,6 @@ class User extends ParanoidModel {
|
||||
});
|
||||
};
|
||||
|
||||
@BeforeSave
|
||||
static uploadAvatar = async (model: User) => {
|
||||
const endpoint = publicS3Endpoint();
|
||||
const { avatarUrl } = model;
|
||||
|
||||
if (
|
||||
avatarUrl &&
|
||||
!avatarUrl.startsWith("/api") &&
|
||||
!avatarUrl.startsWith(endpoint) &&
|
||||
!avatarUrl.startsWith(env.DEFAULT_AVATAR_HOST)
|
||||
) {
|
||||
try {
|
||||
const newUrl = await uploadToS3FromUrl(
|
||||
avatarUrl,
|
||||
`avatars/${model.id}/${uuidv4()}`,
|
||||
"public-read"
|
||||
);
|
||||
if (newUrl) {
|
||||
model.avatarUrl = newUrl;
|
||||
}
|
||||
} catch (err) {
|
||||
Logger.error("Couldn't upload user avatar image to S3", err, {
|
||||
url: avatarUrl,
|
||||
});
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
@BeforeCreate
|
||||
static setRandomJwtSecret = (model: User) => {
|
||||
model.jwtSecret = crypto.randomBytes(64).toString("hex");
|
||||
|
||||
40
server/queues/processors/AvatarProcessor.ts
Normal file
40
server/queues/processors/AvatarProcessor.ts
Normal file
@@ -0,0 +1,40 @@
|
||||
import { Team, User } from "@server/models";
|
||||
import { Event, TeamEvent, UserEvent } from "@server/types";
|
||||
import UploadTeamAvatarTask from "../tasks/UploadTeamAvatarTask";
|
||||
import UploadUserAvatarTask from "../tasks/UploadUserAvatarTask";
|
||||
import BaseProcessor from "./BaseProcessor";
|
||||
|
||||
export default class AvatarProcessor extends BaseProcessor {
|
||||
static applicableEvents: Event["name"][] = ["users.create", "teams.create"];
|
||||
|
||||
async perform(event: UserEvent | TeamEvent) {
|
||||
// The uploads are performed in a separate task to allow for retrying in the
|
||||
// case of failures as it involves network calls to third party services.
|
||||
|
||||
if (event.name === "users.create") {
|
||||
const user = await User.findByPk(event.userId, {
|
||||
rejectOnEmpty: true,
|
||||
});
|
||||
|
||||
if (user.avatarUrl) {
|
||||
await UploadUserAvatarTask.schedule({
|
||||
userId: event.userId,
|
||||
avatarUrl: user.avatarUrl,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
if (event.name === "teams.create") {
|
||||
const team = await Team.findByPk(event.teamId, {
|
||||
rejectOnEmpty: true,
|
||||
});
|
||||
|
||||
if (team.avatarUrl) {
|
||||
await UploadTeamAvatarTask.schedule({
|
||||
teamId: event.teamId,
|
||||
avatarUrl: team.avatarUrl,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -157,6 +157,9 @@ export default class DeliverWebhookTask extends BaseTask<Props> {
|
||||
case "integrations.update":
|
||||
await this.handleIntegrationEvent(subscription, event);
|
||||
return;
|
||||
case "teams.create":
|
||||
// Ignored
|
||||
return;
|
||||
case "teams.update":
|
||||
await this.handleTeamEvent(subscription, event);
|
||||
return;
|
||||
|
||||
40
server/queues/tasks/UploadTeamAvatarTask.ts
Normal file
40
server/queues/tasks/UploadTeamAvatarTask.ts
Normal file
@@ -0,0 +1,40 @@
|
||||
import { v4 as uuidv4 } from "uuid";
|
||||
import { Team } from "@server/models";
|
||||
import { uploadToS3FromUrl } from "@server/utils/s3";
|
||||
import BaseTask, { TaskPriority } from "./BaseTask";
|
||||
|
||||
type Props = {
|
||||
/* The teamId to operate on */
|
||||
teamId: string;
|
||||
/* The original avatarUrl from the SSO provider */
|
||||
avatarUrl: string;
|
||||
};
|
||||
|
||||
/**
|
||||
* A task that uploads the provided avatarUrl to S3 storage and updates the
|
||||
* team's record with the new url.
|
||||
*/
|
||||
export default class UploadTeamAvatarTask extends BaseTask<Props> {
|
||||
public async perform(props: Props) {
|
||||
const team = await Team.findByPk(props.teamId, {
|
||||
rejectOnEmpty: true,
|
||||
});
|
||||
|
||||
const avatarUrl = await uploadToS3FromUrl(
|
||||
props.avatarUrl,
|
||||
`avatars/${team.id}/${uuidv4()}`,
|
||||
"public-read"
|
||||
);
|
||||
|
||||
if (avatarUrl) {
|
||||
await team.update({ avatarUrl });
|
||||
}
|
||||
}
|
||||
|
||||
public get options() {
|
||||
return {
|
||||
attempts: 3,
|
||||
priority: TaskPriority.Normal,
|
||||
};
|
||||
}
|
||||
}
|
||||
40
server/queues/tasks/UploadUserAvatarTask.ts
Normal file
40
server/queues/tasks/UploadUserAvatarTask.ts
Normal file
@@ -0,0 +1,40 @@
|
||||
import { v4 as uuidv4 } from "uuid";
|
||||
import { User } from "@server/models";
|
||||
import { uploadToS3FromUrl } from "@server/utils/s3";
|
||||
import BaseTask, { TaskPriority } from "./BaseTask";
|
||||
|
||||
type Props = {
|
||||
/* The userId to operate on */
|
||||
userId: string;
|
||||
/* The original avatarUrl from the SSO provider */
|
||||
avatarUrl: string;
|
||||
};
|
||||
|
||||
/**
|
||||
* A task that uploads the provided avatarUrl to S3 storage and updates the
|
||||
* user's record with the new url.
|
||||
*/
|
||||
export default class UploadUserAvatarTask extends BaseTask<Props> {
|
||||
public async perform(props: Props) {
|
||||
const user = await User.findByPk(props.userId, {
|
||||
rejectOnEmpty: true,
|
||||
});
|
||||
|
||||
const avatarUrl = await uploadToS3FromUrl(
|
||||
props.avatarUrl,
|
||||
`avatars/${user.id}/${uuidv4()}`,
|
||||
"public-read"
|
||||
);
|
||||
|
||||
if (avatarUrl) {
|
||||
await user.update({ avatarUrl });
|
||||
}
|
||||
}
|
||||
|
||||
public get options() {
|
||||
return {
|
||||
attempts: 3,
|
||||
priority: TaskPriority.Normal,
|
||||
};
|
||||
}
|
||||
}
|
||||
@@ -77,6 +77,10 @@ if (env.GOOGLE_CLIENT_ID && env.GOOGLE_CLIENT_SECRET) {
|
||||
const subdomain = domain.split(".")[0];
|
||||
const teamName = capitalize(subdomain);
|
||||
|
||||
// Request a larger size profile picture than the default by tweaking
|
||||
// the query parameter.
|
||||
const avatarUrl = profile.picture.replace("=s96-c", "=s128-c");
|
||||
|
||||
result = await accountProvisioner({
|
||||
ip: req.ip,
|
||||
team: {
|
||||
@@ -87,7 +91,7 @@ if (env.GOOGLE_CLIENT_ID && env.GOOGLE_CLIENT_SECRET) {
|
||||
user: {
|
||||
email: profile.email,
|
||||
name: profile.displayName,
|
||||
avatarUrl: profile.picture,
|
||||
avatarUrl,
|
||||
},
|
||||
authenticationProvider: {
|
||||
name: providerName,
|
||||
|
||||
@@ -46,7 +46,6 @@ export type UserEvent = BaseEvent &
|
||||
(
|
||||
| {
|
||||
name:
|
||||
| "users.create"
|
||||
| "users.signin"
|
||||
| "users.signout"
|
||||
| "users.update"
|
||||
@@ -56,7 +55,7 @@ export type UserEvent = BaseEvent &
|
||||
userId: string;
|
||||
}
|
||||
| {
|
||||
name: "users.promote" | "users.demote";
|
||||
name: "users.create" | "users.promote" | "users.demote";
|
||||
userId: string;
|
||||
data: {
|
||||
name: string;
|
||||
@@ -217,7 +216,7 @@ export type IntegrationEvent = BaseEvent & {
|
||||
};
|
||||
|
||||
export type TeamEvent = BaseEvent & {
|
||||
name: "teams.update";
|
||||
name: "teams.create" | "teams.update";
|
||||
data: Partial<Team>;
|
||||
};
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@ import AWS from "aws-sdk";
|
||||
import { addHours, format } from "date-fns";
|
||||
import fetch from "fetch-with-proxy";
|
||||
import { v4 as uuidv4 } from "uuid";
|
||||
import env from "@server/env";
|
||||
import Logger from "@server/logging/Logger";
|
||||
|
||||
const AWS_S3_ACCELERATE_URL = process.env.AWS_S3_ACCELERATE_URL;
|
||||
@@ -169,15 +170,23 @@ export const uploadToS3FromBuffer = async (
|
||||
return `${endpoint}/${key}`;
|
||||
};
|
||||
|
||||
// @ts-expect-error ts-migrate(7030) FIXME: Not all code paths return a value.
|
||||
export const uploadToS3FromUrl = async (
|
||||
url: string,
|
||||
key: string,
|
||||
acl: string
|
||||
) => {
|
||||
const endpoint = publicS3Endpoint(true);
|
||||
if (
|
||||
url.startsWith("/api") ||
|
||||
url.startsWith(endpoint) ||
|
||||
url.startsWith(env.DEFAULT_AVATAR_HOST)
|
||||
) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const res = await fetch(url);
|
||||
// @ts-expect-error ts-migrate(2339) FIXME: Property 'buffer' does not exist on type 'Response... Remove this comment to see the full error message
|
||||
// @ts-expect-error buffer exists, need updated typings
|
||||
const buffer = await res.buffer();
|
||||
await s3
|
||||
.putObject({
|
||||
@@ -189,7 +198,6 @@ export const uploadToS3FromUrl = async (
|
||||
Body: buffer,
|
||||
})
|
||||
.promise();
|
||||
const endpoint = publicS3Endpoint(true);
|
||||
return `${endpoint}/${key}`;
|
||||
} catch (err) {
|
||||
Logger.error("Error uploading to S3 from URL", err, {
|
||||
@@ -197,6 +205,7 @@ export const uploadToS3FromUrl = async (
|
||||
key,
|
||||
acl,
|
||||
});
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user