diff --git a/packages/server/src/StripeManager.ts b/packages/server/src/StripeManager.ts index 1d733141b..0c8b41612 100644 --- a/packages/server/src/StripeManager.ts +++ b/packages/server/src/StripeManager.ts @@ -796,6 +796,10 @@ export class StripeManager { } public async checkPredictionEligibility(orgId: string, subscriptionId: string): Promise> { + if (!this.stripe) { + throw new Error('Stripe is not initialized') + } + try { if (!subscriptionId || !orgId) { throw new Error('Subscription ID and Organization ID are required') @@ -821,21 +825,17 @@ export class StripeManager { } // Check credit balance for overage - const subscription = await this.stripe!.subscriptions.retrieve(subscriptionId) + const subscription = await this.stripe.subscriptions.retrieve(subscriptionId) const customerId = subscription.customer as string - const creditBalance = await this.stripe!.billing.creditBalanceSummary.retrieve({ - customer: customerId, - filter: { - type: 'applicability_scope', - applicability_scope: { - price_type: 'metered' - } - } - }) + const creditBalance = await this.getCreditBalance(subscriptionId) - const availableCredits = creditBalance.balances?.[0]?.available_balance?.monetary?.value || 0 - const requestCost = 1 // 1 cent per prediction + logger.info(`credit balance: ${JSON.stringify(creditBalance)}`) + + const availableCredits = creditBalance.availableCredits || 0 + const requestCost = 1 // 1 credit per prediction + + logger.info(`available credits: ${availableCredits}`) return { allowed: availableCredits >= requestCost, @@ -938,7 +938,10 @@ export class StripeManager { } } as any, category: 'paid', - name: `${selectedPackage.credits} Credits Purchase` + name: `${selectedPackage.credits} Credits Purchase`, + metadata: { + usage_count: '0' + } }) // Clear cache @@ -957,20 +960,32 @@ export class StripeManager { } } + private calculateTotalCredits = (grants: any[]) => { + if (!grants || !Array.isArray(grants)) return 0 + return grants.reduce((total, grant) => { + const grantValue = grant?.amount?.monetary?.value || 0 + return total + grantValue + }, 0) + } + public async getCreditBalance(subscriptionId: string): Promise> { + if (!this.stripe) { + throw new Error('Stripe is not initialized') + } + try { if (!subscriptionId) { throw new Error('Subscription ID is required') } - const subscription = await this.stripe!.subscriptions.retrieve(subscriptionId) + const subscription = await this.stripe.subscriptions.retrieve(subscriptionId) const customerId = subscription.customer as string if (!customerId) { throw new Error('Customer ID not found in subscription') } - const creditBalance = await this.stripe!.billing.creditBalanceSummary.retrieve({ + const creditBalance = await this.stripe.billing.creditBalanceSummary.retrieve({ customer: customerId, filter: { type: 'applicability_scope', @@ -984,20 +999,42 @@ export class StripeManager { const balanceInDollars = balance / 100 // Get credit grants for detailed info - const grants = await this.stripe!.billing.creditGrants.list({ + const grants = await this.stripe.billing.creditGrants.list({ customer: customerId, - limit: 10 + limit: 100 }) + // Calculate total credits and usage from grants + let totalCredits = 0 + let totalUsage = 0 + const grantsInfo = grants.data.map((grant) => { + const grantAmount = grant.amount?.monetary?.value || 0 + const grantCredits = this.getCreditsFromPrice(grantAmount) + const usage = parseInt(grant.metadata?.usage_count || '0') + + totalCredits += grantCredits + totalUsage += usage + + return { + id: grant.id, + amount: grant.amount, + name: grant.name, + created: grant.created, + credits: grantCredits, + usage: usage, + effectiveBalance: (grant as any).effective_balance?.monetary?.value || 0 + } + }) + + const availableCredits = Math.max(0, totalCredits - totalUsage) + return { balance, balanceInDollars, - grants: grants.data.map((grant) => ({ - id: grant.id, - amount: grant.amount, - name: grant.name, - created: grant.created - })) + totalCredits, + totalUsage, + availableCredits, + grants: grantsInfo } } catch (error) { console.error('Error getting credit balance:', error) @@ -1036,6 +1073,10 @@ export class StripeManager { credits: { balance: creditBalance.balance, balanceInDollars: creditBalance.balanceInDollars, + totalCredits: creditBalance.totalCredits, + totalUsage: creditBalance.totalUsage, + availableCredits: creditBalance.availableCredits, + grants: creditBalance.grants, costPerPrediction: 0.01 } } @@ -1115,7 +1156,7 @@ export class StripeManager { // Add metered subscription item await this.stripe.subscriptionItems.create({ subscription: subscriptionId, - price: process.env.METERED_PRICE_ID! + price: process.env.METERED_PRICE_ID }) return { @@ -1129,6 +1170,10 @@ export class StripeManager { } public async reportMeterUsage(customerId: string, quantity: number = 1): Promise { + if (!this.stripe) { + throw new Error('Stripe is not initialized') + } + try { if (!process.env.METER_EVENT_NAME) { throw new Error('METER_EVENT_NAME environment variable is required') @@ -1137,19 +1182,122 @@ export class StripeManager { throw new Error('Customer ID is required') } - await this.stripe!.billing.meterEvents.create({ - event_name: process.env.METER_EVENT_NAME!, + logger.info(`[reportMeterUsage] Reporting ${quantity} usage for customer ${customerId}`) + + // Report meter usage to Stripe + await this.stripe.billing.meterEvents.create({ + event_name: process.env.METER_EVENT_NAME, payload: { stripe_customer_id: customerId, value: quantity.toString() } }) + + logger.info(`[reportMeterUsage] Successfully reported meter usage to Stripe`) + + // Track usage in credit grant metadata + await this.updateCreditGrantUsage(customerId, quantity) + + logger.info(`[reportMeterUsage] Completed usage tracking for customer ${customerId}`) } catch (error) { - console.error('Error reporting meter usage:', error) + logger.error('Error reporting meter usage:', error) throw error } } + private async updateCreditGrantUsage(customerId: string, quantity: number): Promise { + if (!this.stripe) { + throw new Error('Stripe is not initialized') + } + + if (!customerId) { + logger.error('[updateCreditGrantUsage] Customer ID is required') + return + } + + if (quantity <= 0) { + logger.error('[updateCreditGrantUsage] Quantity must be positive') + return + } + + try { + logger.info(`[updateCreditGrantUsage] Starting update for customer ${customerId}, quantity: ${quantity}`) + + // Get all credit grants for this customer (not just active ones) + const grants = await this.stripe.billing.creditGrants.list({ + customer: customerId, + limit: 100 + }) + + logger.info(`[updateCreditGrantUsage] Found ${grants.data.length} credit grants for customer ${customerId}`) + + if (grants.data.length === 0) { + logger.info('[updateCreditGrantUsage] No credit grants found for customer') + return + } + + // Sort by creation date (oldest first) to use FIFO approach + grants.data.sort((a, b) => a.created - b.created) + + // Find the grant that should be used for tracking usage + // We'll use the first grant that still has available credits or the oldest one + let grantToUpdate = grants.data.find((grant) => { + const effectiveBalance = (grant as any).effective_balance?.monetary?.value || 0 + logger.debug(`[updateCreditGrantUsage] Grant ${grant.id} has effective balance: ${effectiveBalance}`) + return effectiveBalance > 0 + }) + + // If no grant has remaining balance, use the most recent one for tracking + if (!grantToUpdate && grants.data.length > 0) { + grantToUpdate = grants.data[grants.data.length - 1] + logger.info(`[updateCreditGrantUsage] No grants with balance found, using most recent: ${grantToUpdate.id}`) + } + + if (!grantToUpdate) { + logger.warn('[updateCreditGrantUsage] No suitable grant found for usage tracking') + return + } + + // Validate metadata access + const metadata = grantToUpdate.metadata || {} + const currentUsageStr = metadata.usage_count || '0' + const currentUsage = parseInt(currentUsageStr) + + if (isNaN(currentUsage)) { + logger.error(`[updateCreditGrantUsage] Invalid usage_count in metadata: ${currentUsageStr}`) + return + } + + const newUsage = currentUsage + quantity + + logger.info(`[updateCreditGrantUsage] Updating grant ${grantToUpdate.id}: current usage ${currentUsage} -> ${newUsage}`) + + const updateResult = await this.stripe.billing.creditGrants.update(grantToUpdate.id, { + metadata: { + ...metadata, + usage_count: newUsage.toString() + } + }) + + logger.info(`[updateCreditGrantUsage] Successfully updated credit grant usage for grant ${grantToUpdate.id}`) + logger.debug(`[updateCreditGrantUsage] Updated metadata:`, updateResult.metadata) + + // Clear cache to ensure fresh data on next request + if (this.cacheManager) { + await this.cacheManager.del(`credits:balance:${customerId}`) + logger.debug(`[updateCreditGrantUsage] Cleared cache for customer ${customerId}`) + } + } catch (error) { + logger.error('[updateCreditGrantUsage] Error updating credit grant usage:', error) + // Log additional details for debugging + if (error instanceof Error) { + logger.error('[updateCreditGrantUsage] Error message:', error.message) + logger.debug('[updateCreditGrantUsage] Error stack:', error.stack) + } + // Don't throw here as meter usage was already reported successfully + } + } + private getCreditsFromPrice(unitAmount: number): number { // $10.00 = 1000 credits, so 1 cent = 1 credit return unitAmount diff --git a/packages/server/src/utils/buildChatflow.ts b/packages/server/src/utils/buildChatflow.ts index f98ab61b2..ad987c405 100644 --- a/packages/server/src/utils/buildChatflow.ts +++ b/packages/server/src/utils/buildChatflow.ts @@ -60,7 +60,14 @@ import { import { validateFlowAPIKey } from './validateKey' import logger from './logger' import { utilAddChatMessage } from './addChatMesage' -import { checkPredictions, checkStorage, updatePredictionsUsage, updateStorageUsage } from './quotaUsage' +import { + checkPredictions, + checkPredictionsWithCredits, + checkStorage, + updatePredictionsUsage, + updatePredictionsUsageWithCredits, + updateStorageUsage +} from './quotaUsage' import { buildAgentGraph } from './buildAgentGraph' import { getErrorMessage } from '../errors/utils' import { FLOWISE_METRIC_COUNTERS, FLOWISE_COUNTER_STATUS, IMetricsProvider } from '../Interface.Metrics' @@ -955,7 +962,8 @@ export const utilBuildChatflow = async (req: Request, isInternal: boolean = fals throw new InternalFlowiseError(StatusCodes.PAYMENT_REQUIRED, 'Organization suspended due to non-payment') } - await checkPredictions(orgId, subscriptionId, appServer.usageCacheManager) + // await checkPredictions(orgId, subscriptionId, appServer.usageCacheManager) + const predictionCheck = await checkPredictionsWithCredits(orgId, subscriptionId, appServer.usageCacheManager) const executeData: IExecuteFlowParams = { incomingInput, // Use the defensively created incomingInput variable @@ -989,7 +997,13 @@ export const utilBuildChatflow = async (req: Request, isInternal: boolean = fals if (!result) { throw new Error('Job execution failed') } - await updatePredictionsUsage(orgId, subscriptionId, workspaceId, appServer.usageCacheManager) + // await updatePredictionsUsage(orgId, subscriptionId, workspaceId, appServer.usageCacheManager) + await updatePredictionsUsageWithCredits( + orgId, + subscriptionId, + predictionCheck?.useCredits || false, + appServer.usageCacheManager + ) incrementSuccessMetricCounter(appServer.metricsProvider, isInternal, isAgentFlow) return result } else { @@ -1001,7 +1015,13 @@ export const utilBuildChatflow = async (req: Request, isInternal: boolean = fals const result = await executeFlow(executeData) appServer.abortControllerPool.remove(abortControllerId) - await updatePredictionsUsage(orgId, subscriptionId, workspaceId, appServer.usageCacheManager) + // await updatePredictionsUsage(orgId, subscriptionId, workspaceId, appServer.usageCacheManager) + await updatePredictionsUsageWithCredits( + orgId, + subscriptionId, + predictionCheck?.useCredits || false, + appServer.usageCacheManager + ) incrementSuccessMetricCounter(appServer.metricsProvider, isInternal, isAgentFlow) return result } diff --git a/packages/server/src/utils/quotaUsage.ts b/packages/server/src/utils/quotaUsage.ts index a614c6c09..657ed1af1 100644 --- a/packages/server/src/utils/quotaUsage.ts +++ b/packages/server/src/utils/quotaUsage.ts @@ -75,6 +75,59 @@ export const checkUsageLimit = async ( } } +export const checkPredictions = async (orgId: string, subscriptionId: string, usageCacheManager: UsageCacheManager) => { + if (!usageCacheManager || !subscriptionId) return + + const currentPredictions: number = (await usageCacheManager.get(`predictions:${orgId}`)) || 0 + + const quotas = await usageCacheManager.getQuotas(subscriptionId) + const predictionsLimit = quotas[LICENSE_QUOTAS.PREDICTIONS_LIMIT] + if (predictionsLimit === -1) return + + if (currentPredictions >= predictionsLimit) { + throw new InternalFlowiseError(StatusCodes.TOO_MANY_REQUESTS, 'Predictions limit exceeded') + } + + return { + usage: currentPredictions, + limit: predictionsLimit + } +} + +// Enhanced prediction checking that includes credit eligibility +export const checkPredictionsWithCredits = async (orgId: string, subscriptionId: string, usageCacheManager: UsageCacheManager) => { + if (!usageCacheManager || !subscriptionId) return + + const eligibility = await checkPredictionEligibility(orgId, subscriptionId, usageCacheManager) + + if (!eligibility.allowed) { + throw new InternalFlowiseError(StatusCodes.PAYMENT_REQUIRED, 'Predictions limit exceeded. Please purchase credits to continue.') + } + + return { + useCredits: eligibility.useCredits, + remainingCredits: eligibility.remainingCredits, + usage: eligibility.currentUsage, + limit: eligibility.planLimit + } +} + +export const checkPredictionEligibility = async (orgId: string, subscriptionId: string, usageCacheManager: UsageCacheManager) => { + try { + if (!usageCacheManager || !subscriptionId) return { allowed: true, useCredits: false } + + const stripeManager = await StripeManager.getInstance() + if (!stripeManager) return { allowed: true, useCredits: false } + + const eligibility = await stripeManager.checkPredictionEligibility(orgId, subscriptionId) + logger.info(`eligibility: ${JSON.stringify(eligibility)}`) + return eligibility + } catch (error) { + logger.error(`[checkPredictionEligibility] Error checking prediction eligibility: ${error}`) + throw error + } +} + // As predictions limit renew per month, we set to cache with 1 month TTL export const updatePredictionsUsage = async ( orgId: string, @@ -126,40 +179,6 @@ export const updatePredictionsUsage = async ( } } -export const checkPredictions = async (orgId: string, subscriptionId: string, usageCacheManager: UsageCacheManager) => { - if (!usageCacheManager || !subscriptionId) return - - const currentPredictions: number = (await usageCacheManager.get(`predictions:${orgId}`)) || 0 - - const quotas = await usageCacheManager.getQuotas(subscriptionId) - const predictionsLimit = quotas[LICENSE_QUOTAS.PREDICTIONS_LIMIT] - if (predictionsLimit === -1) return - - if (currentPredictions >= predictionsLimit) { - throw new InternalFlowiseError(StatusCodes.TOO_MANY_REQUESTS, 'Predictions limit exceeded') - } - - return { - usage: currentPredictions, - limit: predictionsLimit - } -} - -export const checkPredictionEligibility = async (orgId: string, subscriptionId: string, usageCacheManager: UsageCacheManager) => { - try { - if (!usageCacheManager || !subscriptionId) return { allowed: true, useCredits: false } - - const stripeManager = await StripeManager.getInstance() - if (!stripeManager) return { allowed: true, useCredits: false } - - const eligibility = await stripeManager.checkPredictionEligibility(orgId, subscriptionId) - return eligibility - } catch (error) { - logger.error(`[checkPredictionEligibility] Error checking prediction eligibility: ${error}`) - throw error - } -} - export const updatePredictionsUsageWithCredits = async ( orgId: string, subscriptionId: string, @@ -174,6 +193,7 @@ export const updatePredictionsUsageWithCredits = async ( const stripeManager = await StripeManager.getInstance() if (stripeManager) { const subscriptionDetails = await usageCacheManager.getSubscriptionDetails(subscriptionId) + logger.info(`subscription details: ${JSON.stringify(subscriptionDetails)}`) if (subscriptionDetails && subscriptionDetails.customer) { await stripeManager.reportMeterUsage(subscriptionDetails.customer as string) } @@ -187,24 +207,6 @@ export const updatePredictionsUsageWithCredits = async ( } } -// Enhanced prediction checking that includes credit eligibility -export const checkPredictionsWithCredits = async (orgId: string, subscriptionId: string, usageCacheManager: UsageCacheManager) => { - if (!usageCacheManager || !subscriptionId) return - - const eligibility = await checkPredictionEligibility(orgId, subscriptionId, usageCacheManager) - - if (!eligibility.allowed) { - throw new InternalFlowiseError(StatusCodes.PAYMENT_REQUIRED, 'Predictions limit exceeded. Please purchase credits to continue.') - } - - return { - useCredits: eligibility.useCredits, - remainingCredits: eligibility.remainingCredits, - usage: eligibility.currentUsage, - limit: eligibility.planLimit - } -} - // Storage does not renew per month nor do we store the total size in database, so we just store the total size in cache export const updateStorageUsage = (orgId: string, _: string = '', totalSize: number, usageCacheManager?: UsageCacheManager) => { if (!usageCacheManager) return diff --git a/packages/ui/src/views/account/index.jsx b/packages/ui/src/views/account/index.jsx index d3d9e16b2..697b0fff9 100644 --- a/packages/ui/src/views/account/index.jsx +++ b/packages/ui/src/views/account/index.jsx @@ -60,11 +60,26 @@ const calculatePercentage = (count, total) => { const calculateTotalCredits = (grants) => { if (!grants || !Array.isArray(grants)) return 0 return grants.reduce((total, grant) => { - const grantValue = grant?.amount?.monetary?.value || 0 - return total + grantValue + const grantCredits = grant?.credits || 0 + return total + grantCredits }, 0) } +const calculateTotalUsage = (grants) => { + if (!grants || !Array.isArray(grants)) return 0 + return grants.reduce((total, grant) => { + const usage = grant?.usage || 0 + return total + usage + }, 0) +} + +const calculateAvailableCredits = (grants) => { + if (!grants || !Array.isArray(grants)) return 0 + const totalCredits = calculateTotalCredits(grants) + const totalUsage = calculateTotalUsage(grants) + return Math.max(0, totalCredits - totalUsage) +} + const AccountSettings = () => { const theme = useTheme() const dispatch = useDispatch() @@ -101,8 +116,15 @@ const AccountSettings = () => { const totalCredits = useMemo(() => { return creditsBalance ? calculateTotalCredits(creditsBalance.grants) : 0 }, [creditsBalance]) + + const totalUsage = useMemo(() => { + return creditsBalance ? calculateTotalUsage(creditsBalance.grants) : 0 + }, [creditsBalance]) + + const availableCredits = useMemo(() => { + return creditsBalance ? calculateAvailableCredits(creditsBalance.grants) : 0 + }, [creditsBalance]) const [creditsPackages, setCreditsPackages] = useState([]) - const [usageWithCredits, setUsageWithCredits] = useState(null) const [openCreditsDialog, setOpenCreditsDialog] = useState(false) const [selectedPackage, setSelectedPackage] = useState(null) const [isPurchasingCredits, setIsPurchasingCredits] = useState(false) @@ -178,12 +200,6 @@ const AccountSettings = () => { } }, [getCreditsPackagesApi.data]) - useEffect(() => { - if (getUsageWithCreditsApi.data) { - setUsageWithCredits(getUsageWithCreditsApi.data) - } - }, [getUsageWithCreditsApi.data]) - useEffect(() => { if (openRemoveSeatsDialog || openAddSeatsDialog || openCreditsDialog) { setSeatsQuantity(0) @@ -849,21 +865,21 @@ const AccountSettings = () => { Available Credits: + {getCreditsBalanceApi.loading ? : availableCredits || 0} + + + + Credits Used: + + {getCreditsBalanceApi.loading ? : totalUsage || 0} + + + + Total Credits Purchased: + {getCreditsBalanceApi.loading ? : totalCredits || 0} - {usageWithCredits && ( - - Credits Used This Month: - - {getUsageWithCreditsApi.loading ? ( - - ) : ( - usageWithCredits?.creditsUsed || 0 - )} - - - )} { Current Balance - {totalCredits || 0} Credits + {availableCredits || 0} Available Credits + + + Total: {totalCredits || 0} | Used: {totalUsage || 0}