[web] various migration script improvements (#31538)

* make warnings more eye catching in clean up dry run
* only increment processedCount after processing
* always rm recurlyAccountCode
* check if existing Stripe customer already has a subscription
* ensure no other customer accounts share the user id
* set recurly_to_stripe_migration_status to cancelled in cancel script
* add script for updating metadata of canceled migrations

GitOrigin-RevId: 3331de480e99774679ff2649b90d41e981a8fdef
This commit is contained in:
Kristina
2026-02-17 14:22:35 +01:00
committed by Copybot
parent c597e3b1f4
commit 85066c6cb3
8 changed files with 486 additions and 55 deletions
@@ -108,8 +108,6 @@ async function main(trackProgress) {
}
queue.add(async () => {
processedCount++
try {
const result = await processTermination(input, opts.commit)
@@ -126,12 +124,6 @@ async function main(trackProgress) {
} else {
errorCount++
}
if (processedCount % 25 === 0) {
await trackProgress(
`Progress: ${processedCount} processed, ${successCount} successful, ${errorCount} errors`
)
}
} catch (err) {
errorCount++
if (err instanceof ReportError) {
@@ -152,6 +144,13 @@ async function main(trackProgress) {
})
}
}
processedCount++
if (processedCount % 25 === 0) {
await trackProgress(
`Progress: ${processedCount} processed, ${successCount} successful, ${errorCount} errors`
)
}
})
}
} finally {
@@ -281,10 +280,10 @@ async function processTermination(input, commit) {
} else {
const note = isInExpectedEndState
? 'DRY RUN: Ready to terminate'
: `DRY RUN: Ready to terminate ${warning}`
: `DRY RUN: Can terminate, with this warning: ${warning}`
return {
status: 'validated',
status: isInExpectedEndState ? 'validated' : 'validated-with-warnings',
note,
}
}
@@ -593,7 +593,10 @@ async function fetchTargetStripeCustomer(
) {
const customer = await rateLimiters.requestWithRetries(
stripeClient.serviceName,
() => stripeClient.customers.retrieve(stripeCustomerId),
() =>
stripeClient.customers.retrieve(stripeCustomerId, {
expand: ['subscriptions'],
}),
{ ...context, stripeApi: 'customers.retrieve' }
)
@@ -604,6 +607,37 @@ async function fetchTargetStripeCustomer(
return customer
}
/**
* Query for other matching customers from the target Stripe account by ID.
*
* @param {Stripe} stripeClient - The Stripe client for the target account
* @param {string} userId - The user id to query
* @param {string} stripeCustomerId - The Stripe customer ID to exclude from results (if any)
* @param {object} context - Context for logging and rate limiter identification
* @returns {Promise<Stripe.Customer | null>}
*/
async function fetchOtherStripeCustomerByUserId(
stripeClient,
userId,
stripeCustomerId,
context
) {
const results = await rateLimiters.requestWithRetries(
stripeClient.serviceName,
() =>
stripeClient.customers.search({
query: `metadata['userId']:"${userId}"`,
limit: 100,
}),
{ ...context, stripeApi: 'customers.search' }
)
return (
results.data?.find(customer => customer.id !== context.stripeCustomerId) ||
null
)
}
/**
* Fetch existing customer's payment method from the target Stripe account by ID.
*
@@ -913,6 +947,25 @@ async function processCustomer(
stripeContext
)
if (existingCustomer.subscriptions?.data.length > 0) {
throw new Error(
`Stripe customer ${stripeCustomerId} already has ${existingCustomer.subscriptions.data.length} active subscription(s).`
)
}
const otherMatchingCustomer = await fetchOtherStripeCustomerByUserId(
stripeClient,
recurlyAccountCode,
stripeCustomerId,
stripeContext
)
if (otherMatchingCustomer) {
throw new Error(
`Found another Stripe customer with matching userId metadata: ${otherMatchingCustomer.id}`
)
}
logDebug(
'Found existing Stripe customer',
{
@@ -1062,19 +1115,25 @@ async function processCustomer(
if (taxInfoPendingValue) {
metadata.taxInfoPending = taxInfoPendingValue
}
if (
existingCustomer.metadata != null &&
existingCustomer.metadata.recurlyAccountCode === recurlyAccountCode &&
existingCustomer.metadata.userId == null
existingCustomer.metadata?.recurlyAccountCode &&
existingCustomer.metadata?.recurlyAccountCode !== recurlyAccountCode
) {
metadata.recurlyAccountCode = ''
metadata.userId = recurlyAccountCode
} else {
logWarn('Stripe customer metadata cannot be remapped', {
...context,
existingCustomerMetadata: existingCustomer.metadata,
})
throw new Error(
`Existing Stripe customer has unexpected recurlyAccountCode: (expected) ${recurlyAccountCode} (actual) ${existingCustomer.metadata?.recurlyAccountCode}`
)
}
if (
existingCustomer.metadata?.userId &&
existingCustomer.metadata?.userId !== recurlyAccountCode
) {
throw new Error(
`Existing Stripe customer has unexpected userId: (expected) ${recurlyAccountCode} (actual) ${existingCustomer.metadata?.userId}`
)
}
metadata.recurlyAccountCode = ''
metadata.userId = recurlyAccountCode
const { metadata: customFieldMetadata, counts: customFieldCounts } =
extractRecurlyCustomFieldMetadata(account)
@@ -106,8 +106,6 @@ async function main(trackProgress) {
}
queue.add(async () => {
processedCount++
try {
const result = await processScheduleCancellation(input, opts.commit)
@@ -124,12 +122,6 @@ async function main(trackProgress) {
} else {
errorCount++
}
if (processedCount % 25 === 0) {
await trackProgress(
`Progress: ${processedCount} processed, ${successCount} successful, ${errorCount} errors`
)
}
} catch (err) {
errorCount++
csvWriter.write({
@@ -140,6 +132,13 @@ async function main(trackProgress) {
note: err.message,
})
}
processedCount++
if (processedCount % 25 === 0) {
await trackProgress(
`Progress: ${processedCount} processed, ${successCount} successful, ${errorCount} errors`
)
}
})
}
} finally {
@@ -108,8 +108,6 @@ async function main(trackProgress) {
}
queue.add(async () => {
processedCount++
try {
const result = await processCancellation(input, opts.commit)
@@ -128,12 +126,6 @@ async function main(trackProgress) {
} else {
errorCount++
}
if (processedCount % 10 === 0) {
await trackProgress(
`Processed ${processedCount} customers (${successCount} ${opts.commit ? 'cancelled' : 'validated'}, ${errorCount} errors)`
)
}
} catch (err) {
errorCount++
if (err instanceof ReportError) {
@@ -157,6 +149,13 @@ async function main(trackProgress) {
)
}
}
processedCount++
if (processedCount % 10 === 0) {
await trackProgress(
`Processed ${processedCount} customers (${successCount} ${opts.commit ? 'cancelled' : 'validated'}, ${errorCount} errors)`
)
}
})
}
} finally {
@@ -351,6 +350,19 @@ async function processCancellation(input, commit) {
}
)
await rateLimiters.requestWithRetries(
stripeClient.serviceName,
() =>
stripeClient.updateSubscriptionMetadata(migrationSubscription.id, {
recurly_to_stripe_migration_status: 'cancelled',
}),
{
operation: 'updateSubscriptionMetadata',
subscriptionId: migrationSubscription.id,
region: stripeClient.serviceName,
}
)
return {
status: 'cancelled',
note: `Cancelled subscription ${migrationSubscription.id}`,
@@ -0,0 +1,358 @@
#!/usr/bin/env node
/**
* This script validates subscriptions that are canceled or expired and have migration metadata set to "in_progress",
* then updates the metadata to "cancelled" if validation passes.
*
* TODO: This script can be deleted after being run in production.
*
* Usage:
* node scripts/stripe/bulk-update-migration-status.mjs [OPTS] [INPUT-FILE]
*
* Options:
* --output PATH Output file path (default: /tmp/bulk_update_output_<timestamp>.csv)
* Use '-' to write to stdout
* --commit Apply changes (without this flag, runs in dry-run mode)
* --concurrency N Number of subscriptions to process concurrently (default: 10)
* --stripe-rate-limit N Requests per second for Stripe (default: 50)
* --stripe-api-retries N Number of retries on Stripe 429s (default: 5)
* --stripe-retry-delay-ms N Delay between Stripe retries in ms (default: 1000)
* --help Show a help message
*
* CSV Input Format:
* The CSV must have the following columns:
* - subscription_id: Stripe subscription id
* - target_stripe_account: Either 'stripe-uk' or 'stripe-us'
*
* Output:
* Writes a CSV with columns:
* - subscription_id: The subscription id processed
* - target_stripe_account: The Stripe account
* - status: Result status (validated, updated, invalid-status, invalid-metadata, or error)
* - note: Additional information about the status
*/
import fs from 'node:fs'
import path from 'node:path'
import * as csv from 'csv'
import minimist from 'minimist'
import PQueue from 'p-queue'
import { z } from '../../app/src/infrastructure/Validation.mjs'
import { scriptRunner } from '../lib/ScriptRunner.mjs'
import { getRegionClient } from '../../modules/subscriptions/app/src/StripeClient.mjs'
import { ReportError } from './helpers.mjs'
import {
createRateLimitedApiWrappers,
DEFAULT_STRIPE_RATE_LIMIT,
DEFAULT_STRIPE_API_RETRIES,
DEFAULT_STRIPE_RETRY_DELAY_MS,
} from './RateLimiter.mjs'
const DEFAULT_CONCURRENCY = 10
// rate limiters - initialized in main()
let rateLimiters
function usage() {
console.error(`Usage: node scripts/stripe/bulk-update-migration-status.mjs [OPTS] [INPUT-FILE]
Options:
--output PATH Output file path (default: /tmp/bulk_update_output_<timestamp>.csv)
Use '-' to write to stdout
--commit Apply changes (without this, runs in dry-run mode)
--concurrency N Number of subscriptions to process concurrently (default: ${DEFAULT_CONCURRENCY})
--stripe-rate-limit N Requests per second for Stripe (default: ${DEFAULT_STRIPE_RATE_LIMIT})
--stripe-api-retries N Number of retries on Stripe 429s (default: ${DEFAULT_STRIPE_API_RETRIES})
--stripe-retry-delay-ms N Delay between Stripe retries in ms (default: ${DEFAULT_STRIPE_RETRY_DELAY_MS})
--help Show this help message
`)
}
async function main(trackProgress) {
const opts = parseArgs()
const timestamp = new Date().toISOString().replace(/[:.]/g, '-')
const outputFile = opts.output ?? `/tmp/bulk_update_output_${timestamp}.csv`
// initialize rate limiters
rateLimiters = createRateLimitedApiWrappers({
stripeRateLimit: opts.stripeRateLimit,
stripeApiRetries: opts.stripeApiRetries,
stripeRetryDelayMs: opts.stripeRetryDelayMs,
})
await trackProgress(
'Starting bulk validation and update of subscription metadata'
)
await trackProgress(`Run mode: ${opts.commit ? 'COMMIT' : 'DRY RUN'}`)
await trackProgress(`Rate limit: Stripe ${opts.stripeRateLimit}/s`)
await trackProgress(`Concurrency: ${opts.concurrency}`)
const inputStream = opts.inputFile
? fs.createReadStream(opts.inputFile)
: process.stdin
const csvReader = getCsvReader(inputStream)
const csvWriter = getCsvWriter(outputFile)
await trackProgress(`Output: ${outputFile === '-' ? 'stdout' : outputFile}`)
let processedCount = 0
let successCount = 0
let errorCount = 0
const queue = new PQueue({ concurrency: opts.concurrency })
const maxQueueSize = opts.concurrency
try {
for await (const input of csvReader) {
if (queue.size >= maxQueueSize) {
await queue.onSizeLessThan(maxQueueSize)
}
queue.add(async () => {
try {
const result = await processValidation(input, opts.commit)
csvWriter.write({
subscription_id: input.subscription_id,
target_stripe_account: input.target_stripe_account,
status: result.status,
note:
result.note ||
(opts.commit ? '' : 'dry run - no changes applied'),
})
if (result.status === 'updated' || result.status === 'validated') {
successCount++
} else {
errorCount++
}
} catch (err) {
errorCount++
if (err instanceof ReportError) {
csvWriter.write({
subscription_id: input.subscription_id,
target_stripe_account: input.target_stripe_account,
status: err.status,
note: err.message,
})
} else {
csvWriter.write({
subscription_id: input.subscription_id,
target_stripe_account: input.target_stripe_account,
status: 'error',
note: err.message,
})
await trackProgress(
`Error processing ${input.subscription_id}: ${err.message}`
)
}
}
processedCount++
if (processedCount % 10 === 0) {
await trackProgress(
`Processed ${processedCount} subscriptions (${successCount} ${opts.commit ? 'updated' : 'validated'}, ${errorCount} errors)`
)
}
})
}
} finally {
await queue.onIdle()
}
await trackProgress(`✅ Total processed: ${processedCount}`)
if (opts.commit) {
await trackProgress(`✅ Successfully updated: ${successCount}`)
} else {
await trackProgress(`✅ Successfully validated: ${successCount}`)
await trackProgress('️ DRY RUN: No changes were applied')
}
await trackProgress(`❌ Errors: ${errorCount}`)
await trackProgress('🎉 Script completed!')
csvWriter.end()
}
function parseArgs() {
const args = minimist(process.argv.slice(2), {
string: [
'output',
'concurrency',
'stripe-rate-limit',
'stripe-api-retries',
'stripe-retry-delay-ms',
],
boolean: ['commit', 'help'],
default: {
commit: false,
concurrency: DEFAULT_CONCURRENCY,
'stripe-rate-limit': DEFAULT_STRIPE_RATE_LIMIT,
'stripe-api-retries': DEFAULT_STRIPE_API_RETRIES,
'stripe-retry-delay-ms': DEFAULT_STRIPE_RETRY_DELAY_MS,
},
unknown: arg => {
if (arg.startsWith('-')) {
console.error(`Unknown option: ${arg}`)
usage()
process.exit(1)
}
return true
},
})
if (args.help) {
usage()
process.exit(0)
}
const inputFile = args._[0]
const paramsSchema = z.object({
output: z.string().optional(),
commit: z.boolean(),
concurrency: z.number().int().positive(),
stripeRateLimit: z.number().positive(),
stripeApiRetries: z.number().int().nonnegative(),
stripeRetryDelayMs: z.number().int().nonnegative(),
inputFile: z.string().optional(),
})
try {
return paramsSchema.parse({
output: args.output,
commit: args.commit,
concurrency: Number(args.concurrency),
stripeRateLimit: Number(args['stripe-rate-limit']),
stripeApiRetries: Number(args['stripe-api-retries']),
stripeRetryDelayMs: Number(args['stripe-retry-delay-ms']),
inputFile,
})
} catch (err) {
console.error('Invalid arguments:', err.message)
usage()
process.exit(1)
}
}
function getCsvReader(inputStream) {
const parser = csv.parse({ columns: true })
inputStream.pipe(parser)
return parser
}
function getCsvWriter(outputFile) {
if (outputFile === '-') {
const writer = csv.stringify({
columns: ['subscription_id', 'target_stripe_account', 'status', 'note'],
header: true,
})
writer.on('error', err => {
console.error(err)
process.exit(1)
})
writer.pipe(process.stdout)
return writer
}
fs.mkdirSync(path.dirname(outputFile), { recursive: true })
const outputStream = fs.createWriteStream(outputFile)
const writer = csv.stringify({
columns: ['subscription_id', 'target_stripe_account', 'status', 'note'],
header: true,
})
writer.on('error', err => {
console.error(err)
process.exit(1)
})
writer.pipe(outputStream)
return writer
}
async function processValidation(input, commit) {
const {
subscription_id: subscriptionId,
target_stripe_account: targetStripeAccount,
} = input
// get Stripe client for the target account (strip 'stripe-' prefix if present)
const region = targetStripeAccount.replace(/^stripe-/, '')
const stripeClient = getRegionClient(region)
// fetch subscription
let subscription
try {
subscription = await rateLimiters.requestWithRetries(
stripeClient.serviceName,
() => stripeClient.stripe.subscriptions.retrieve(subscriptionId),
{
operation: 'subscriptions.retrieve',
subscriptionId,
region: stripeClient.serviceName,
}
)
} catch (err) {
throw new ReportError(
'subscription-not-found',
`Subscription not found: ${err.message}`
)
}
const validStatuses = ['canceled']
if (!validStatuses.includes(subscription.status)) {
throw new ReportError(
'invalid-status',
`Subscription status is ${subscription.status}, expected canceled`
)
}
if (
subscription.metadata?.recurly_to_stripe_migration_status !== 'in_progress'
) {
throw new ReportError(
'invalid-metadata',
`Migration status is ${subscription.metadata?.recurly_to_stripe_migration_status}, expected in_progress`
)
}
if (!commit) {
return {
status: 'validated',
note: 'Subscription is valid for update',
}
}
try {
await rateLimiters.requestWithRetries(
stripeClient.serviceName,
() =>
stripeClient.updateSubscriptionMetadata(subscriptionId, {
recurly_to_stripe_migration_status: 'cancelled',
}),
{
operation: 'updateSubscriptionMetadata',
subscriptionId,
region: stripeClient.serviceName,
}
)
return {
status: 'updated',
note: `Updated metadata for subscription ${subscriptionId}`,
}
} catch (err) {
throw new ReportError(
'update-failed',
`Failed to update metadata: ${err.message}`
)
}
}
try {
await scriptRunner(main)
process.exit(0)
} catch (error) {
console.error(error)
process.exit(1)
}
@@ -138,8 +138,6 @@ async function main(trackProgress) {
}
queue.add(async () => {
processedCount++
try {
const result = await processMigration(input, opts.commit)
@@ -164,12 +162,6 @@ async function main(trackProgress) {
} else {
errorCount++
}
if (processedCount % 25 === 0) {
await trackProgress(
`Progress: ${processedCount} processed, ${successCount} successful, ${errorCount} errors`
)
}
} catch (err) {
errorCount++
if (err instanceof ReportError) {
@@ -198,6 +190,13 @@ async function main(trackProgress) {
})
}
}
processedCount++
if (processedCount % 25 === 0) {
await trackProgress(
`Progress: ${processedCount} processed, ${successCount} successful, ${errorCount} errors`
)
}
})
}
} finally {
@@ -122,8 +122,6 @@ async function main(trackProgress) {
}
queue.add(async () => {
processedCount++
try {
const result = await processRollback(input, opts.commit)
@@ -144,12 +142,6 @@ async function main(trackProgress) {
} else {
errorCount++
}
if (processedCount % 25 === 0) {
await trackProgress(
`Progress: ${processedCount} processed, ${successCount} successful, ${errorCount} errors`
)
}
} catch (err) {
errorCount++
if (err instanceof ReportError) {
@@ -170,6 +162,13 @@ async function main(trackProgress) {
})
}
}
processedCount++
if (processedCount % 25 === 0) {
await trackProgress(
`Progress: ${processedCount} processed, ${successCount} successful, ${errorCount} errors`
)
}
})
}
} finally {
+8 -2
View File
@@ -3,7 +3,10 @@ import Stripe from 'stripe'
type StripeSubscription = Stripe.Subscription & {
metadata: {
billing_migration_id?: string
recurly_to_stripe_migration_status?: 'in_progress' | 'completed'
recurly_to_stripe_migration_status?:
| 'in_progress'
| 'completed'
| 'cancelled'
}
customer: string
}
@@ -57,7 +60,10 @@ export interface InvoicePaidWebhookEvent extends Stripe.EventBase {
subscription_details: Stripe.Invoice.Parent.SubscriptionDetails & {
metadata: {
billing_migration_id?: string
recurly_to_stripe_migration_status?: 'in_progress' | 'completed'
recurly_to_stripe_migration_status?:
| 'in_progress'
| 'completed'
| 'cancelled'
}
}
}