Fix issues with tracking predictions usage when consuming credits

This commit is contained in:
Ilango 2025-08-01 15:04:32 +05:30
parent 66167761e0
commit e612716c7c
4 changed files with 294 additions and 105 deletions

View File

@ -796,6 +796,10 @@ export class StripeManager {
}
public async checkPredictionEligibility(orgId: string, subscriptionId: string): Promise<Record<string, any>> {
if (!this.stripe) {
throw new Error('Stripe is not initialized')
}
try {
if (!subscriptionId || !orgId) {
throw new Error('Subscription ID and Organization ID are required')
@ -821,21 +825,17 @@ export class StripeManager {
}
// Check credit balance for overage
const subscription = await this.stripe!.subscriptions.retrieve(subscriptionId)
const subscription = await this.stripe.subscriptions.retrieve(subscriptionId)
const customerId = subscription.customer as string
const creditBalance = await this.stripe!.billing.creditBalanceSummary.retrieve({
customer: customerId,
filter: {
type: 'applicability_scope',
applicability_scope: {
price_type: 'metered'
}
}
})
const creditBalance = await this.getCreditBalance(subscriptionId)
const availableCredits = creditBalance.balances?.[0]?.available_balance?.monetary?.value || 0
const requestCost = 1 // 1 cent per prediction
logger.info(`credit balance: ${JSON.stringify(creditBalance)}`)
const availableCredits = creditBalance.availableCredits || 0
const requestCost = 1 // 1 credit per prediction
logger.info(`available credits: ${availableCredits}`)
return {
allowed: availableCredits >= requestCost,
@ -938,7 +938,10 @@ export class StripeManager {
}
} as any,
category: 'paid',
name: `${selectedPackage.credits} Credits Purchase`
name: `${selectedPackage.credits} Credits Purchase`,
metadata: {
usage_count: '0'
}
})
// Clear cache
@ -957,20 +960,32 @@ export class StripeManager {
}
}
private calculateTotalCredits = (grants: any[]) => {
if (!grants || !Array.isArray(grants)) return 0
return grants.reduce((total, grant) => {
const grantValue = grant?.amount?.monetary?.value || 0
return total + grantValue
}, 0)
}
public async getCreditBalance(subscriptionId: string): Promise<Record<string, any>> {
if (!this.stripe) {
throw new Error('Stripe is not initialized')
}
try {
if (!subscriptionId) {
throw new Error('Subscription ID is required')
}
const subscription = await this.stripe!.subscriptions.retrieve(subscriptionId)
const subscription = await this.stripe.subscriptions.retrieve(subscriptionId)
const customerId = subscription.customer as string
if (!customerId) {
throw new Error('Customer ID not found in subscription')
}
const creditBalance = await this.stripe!.billing.creditBalanceSummary.retrieve({
const creditBalance = await this.stripe.billing.creditBalanceSummary.retrieve({
customer: customerId,
filter: {
type: 'applicability_scope',
@ -984,20 +999,42 @@ export class StripeManager {
const balanceInDollars = balance / 100
// Get credit grants for detailed info
const grants = await this.stripe!.billing.creditGrants.list({
const grants = await this.stripe.billing.creditGrants.list({
customer: customerId,
limit: 10
limit: 100
})
// Calculate total credits and usage from grants
let totalCredits = 0
let totalUsage = 0
const grantsInfo = grants.data.map((grant) => {
const grantAmount = grant.amount?.monetary?.value || 0
const grantCredits = this.getCreditsFromPrice(grantAmount)
const usage = parseInt(grant.metadata?.usage_count || '0')
totalCredits += grantCredits
totalUsage += usage
return {
id: grant.id,
amount: grant.amount,
name: grant.name,
created: grant.created,
credits: grantCredits,
usage: usage,
effectiveBalance: (grant as any).effective_balance?.monetary?.value || 0
}
})
const availableCredits = Math.max(0, totalCredits - totalUsage)
return {
balance,
balanceInDollars,
grants: grants.data.map((grant) => ({
id: grant.id,
amount: grant.amount,
name: grant.name,
created: grant.created
}))
totalCredits,
totalUsage,
availableCredits,
grants: grantsInfo
}
} catch (error) {
console.error('Error getting credit balance:', error)
@ -1036,6 +1073,10 @@ export class StripeManager {
credits: {
balance: creditBalance.balance,
balanceInDollars: creditBalance.balanceInDollars,
totalCredits: creditBalance.totalCredits,
totalUsage: creditBalance.totalUsage,
availableCredits: creditBalance.availableCredits,
grants: creditBalance.grants,
costPerPrediction: 0.01
}
}
@ -1115,7 +1156,7 @@ export class StripeManager {
// Add metered subscription item
await this.stripe.subscriptionItems.create({
subscription: subscriptionId,
price: process.env.METERED_PRICE_ID!
price: process.env.METERED_PRICE_ID
})
return {
@ -1129,6 +1170,10 @@ export class StripeManager {
}
public async reportMeterUsage(customerId: string, quantity: number = 1): Promise<void> {
if (!this.stripe) {
throw new Error('Stripe is not initialized')
}
try {
if (!process.env.METER_EVENT_NAME) {
throw new Error('METER_EVENT_NAME environment variable is required')
@ -1137,19 +1182,122 @@ export class StripeManager {
throw new Error('Customer ID is required')
}
await this.stripe!.billing.meterEvents.create({
event_name: process.env.METER_EVENT_NAME!,
logger.info(`[reportMeterUsage] Reporting ${quantity} usage for customer ${customerId}`)
// Report meter usage to Stripe
await this.stripe.billing.meterEvents.create({
event_name: process.env.METER_EVENT_NAME,
payload: {
stripe_customer_id: customerId,
value: quantity.toString()
}
})
logger.info(`[reportMeterUsage] Successfully reported meter usage to Stripe`)
// Track usage in credit grant metadata
await this.updateCreditGrantUsage(customerId, quantity)
logger.info(`[reportMeterUsage] Completed usage tracking for customer ${customerId}`)
} catch (error) {
console.error('Error reporting meter usage:', error)
logger.error('Error reporting meter usage:', error)
throw error
}
}
private async updateCreditGrantUsage(customerId: string, quantity: number): Promise<void> {
if (!this.stripe) {
throw new Error('Stripe is not initialized')
}
if (!customerId) {
logger.error('[updateCreditGrantUsage] Customer ID is required')
return
}
if (quantity <= 0) {
logger.error('[updateCreditGrantUsage] Quantity must be positive')
return
}
try {
logger.info(`[updateCreditGrantUsage] Starting update for customer ${customerId}, quantity: ${quantity}`)
// Get all credit grants for this customer (not just active ones)
const grants = await this.stripe.billing.creditGrants.list({
customer: customerId,
limit: 100
})
logger.info(`[updateCreditGrantUsage] Found ${grants.data.length} credit grants for customer ${customerId}`)
if (grants.data.length === 0) {
logger.info('[updateCreditGrantUsage] No credit grants found for customer')
return
}
// Sort by creation date (oldest first) to use FIFO approach
grants.data.sort((a, b) => a.created - b.created)
// Find the grant that should be used for tracking usage
// We'll use the first grant that still has available credits or the oldest one
let grantToUpdate = grants.data.find((grant) => {
const effectiveBalance = (grant as any).effective_balance?.monetary?.value || 0
logger.debug(`[updateCreditGrantUsage] Grant ${grant.id} has effective balance: ${effectiveBalance}`)
return effectiveBalance > 0
})
// If no grant has remaining balance, use the most recent one for tracking
if (!grantToUpdate && grants.data.length > 0) {
grantToUpdate = grants.data[grants.data.length - 1]
logger.info(`[updateCreditGrantUsage] No grants with balance found, using most recent: ${grantToUpdate.id}`)
}
if (!grantToUpdate) {
logger.warn('[updateCreditGrantUsage] No suitable grant found for usage tracking')
return
}
// Validate metadata access
const metadata = grantToUpdate.metadata || {}
const currentUsageStr = metadata.usage_count || '0'
const currentUsage = parseInt(currentUsageStr)
if (isNaN(currentUsage)) {
logger.error(`[updateCreditGrantUsage] Invalid usage_count in metadata: ${currentUsageStr}`)
return
}
const newUsage = currentUsage + quantity
logger.info(`[updateCreditGrantUsage] Updating grant ${grantToUpdate.id}: current usage ${currentUsage} -> ${newUsage}`)
const updateResult = await this.stripe.billing.creditGrants.update(grantToUpdate.id, {
metadata: {
...metadata,
usage_count: newUsage.toString()
}
})
logger.info(`[updateCreditGrantUsage] Successfully updated credit grant usage for grant ${grantToUpdate.id}`)
logger.debug(`[updateCreditGrantUsage] Updated metadata:`, updateResult.metadata)
// Clear cache to ensure fresh data on next request
if (this.cacheManager) {
await this.cacheManager.del(`credits:balance:${customerId}`)
logger.debug(`[updateCreditGrantUsage] Cleared cache for customer ${customerId}`)
}
} catch (error) {
logger.error('[updateCreditGrantUsage] Error updating credit grant usage:', error)
// Log additional details for debugging
if (error instanceof Error) {
logger.error('[updateCreditGrantUsage] Error message:', error.message)
logger.debug('[updateCreditGrantUsage] Error stack:', error.stack)
}
// Don't throw here as meter usage was already reported successfully
}
}
private getCreditsFromPrice(unitAmount: number): number {
// $10.00 = 1000 credits, so 1 cent = 1 credit
return unitAmount

View File

@ -60,7 +60,14 @@ import {
import { validateFlowAPIKey } from './validateKey'
import logger from './logger'
import { utilAddChatMessage } from './addChatMesage'
import { checkPredictions, checkStorage, updatePredictionsUsage, updateStorageUsage } from './quotaUsage'
import {
checkPredictions,
checkPredictionsWithCredits,
checkStorage,
updatePredictionsUsage,
updatePredictionsUsageWithCredits,
updateStorageUsage
} from './quotaUsage'
import { buildAgentGraph } from './buildAgentGraph'
import { getErrorMessage } from '../errors/utils'
import { FLOWISE_METRIC_COUNTERS, FLOWISE_COUNTER_STATUS, IMetricsProvider } from '../Interface.Metrics'
@ -955,7 +962,8 @@ export const utilBuildChatflow = async (req: Request, isInternal: boolean = fals
throw new InternalFlowiseError(StatusCodes.PAYMENT_REQUIRED, 'Organization suspended due to non-payment')
}
await checkPredictions(orgId, subscriptionId, appServer.usageCacheManager)
// await checkPredictions(orgId, subscriptionId, appServer.usageCacheManager)
const predictionCheck = await checkPredictionsWithCredits(orgId, subscriptionId, appServer.usageCacheManager)
const executeData: IExecuteFlowParams = {
incomingInput, // Use the defensively created incomingInput variable
@ -989,7 +997,13 @@ export const utilBuildChatflow = async (req: Request, isInternal: boolean = fals
if (!result) {
throw new Error('Job execution failed')
}
await updatePredictionsUsage(orgId, subscriptionId, workspaceId, appServer.usageCacheManager)
// await updatePredictionsUsage(orgId, subscriptionId, workspaceId, appServer.usageCacheManager)
await updatePredictionsUsageWithCredits(
orgId,
subscriptionId,
predictionCheck?.useCredits || false,
appServer.usageCacheManager
)
incrementSuccessMetricCounter(appServer.metricsProvider, isInternal, isAgentFlow)
return result
} else {
@ -1001,7 +1015,13 @@ export const utilBuildChatflow = async (req: Request, isInternal: boolean = fals
const result = await executeFlow(executeData)
appServer.abortControllerPool.remove(abortControllerId)
await updatePredictionsUsage(orgId, subscriptionId, workspaceId, appServer.usageCacheManager)
// await updatePredictionsUsage(orgId, subscriptionId, workspaceId, appServer.usageCacheManager)
await updatePredictionsUsageWithCredits(
orgId,
subscriptionId,
predictionCheck?.useCredits || false,
appServer.usageCacheManager
)
incrementSuccessMetricCounter(appServer.metricsProvider, isInternal, isAgentFlow)
return result
}

View File

@ -75,6 +75,59 @@ export const checkUsageLimit = async (
}
}
export const checkPredictions = async (orgId: string, subscriptionId: string, usageCacheManager: UsageCacheManager) => {
if (!usageCacheManager || !subscriptionId) return
const currentPredictions: number = (await usageCacheManager.get(`predictions:${orgId}`)) || 0
const quotas = await usageCacheManager.getQuotas(subscriptionId)
const predictionsLimit = quotas[LICENSE_QUOTAS.PREDICTIONS_LIMIT]
if (predictionsLimit === -1) return
if (currentPredictions >= predictionsLimit) {
throw new InternalFlowiseError(StatusCodes.TOO_MANY_REQUESTS, 'Predictions limit exceeded')
}
return {
usage: currentPredictions,
limit: predictionsLimit
}
}
// Enhanced prediction checking that includes credit eligibility
export const checkPredictionsWithCredits = async (orgId: string, subscriptionId: string, usageCacheManager: UsageCacheManager) => {
if (!usageCacheManager || !subscriptionId) return
const eligibility = await checkPredictionEligibility(orgId, subscriptionId, usageCacheManager)
if (!eligibility.allowed) {
throw new InternalFlowiseError(StatusCodes.PAYMENT_REQUIRED, 'Predictions limit exceeded. Please purchase credits to continue.')
}
return {
useCredits: eligibility.useCredits,
remainingCredits: eligibility.remainingCredits,
usage: eligibility.currentUsage,
limit: eligibility.planLimit
}
}
export const checkPredictionEligibility = async (orgId: string, subscriptionId: string, usageCacheManager: UsageCacheManager) => {
try {
if (!usageCacheManager || !subscriptionId) return { allowed: true, useCredits: false }
const stripeManager = await StripeManager.getInstance()
if (!stripeManager) return { allowed: true, useCredits: false }
const eligibility = await stripeManager.checkPredictionEligibility(orgId, subscriptionId)
logger.info(`eligibility: ${JSON.stringify(eligibility)}`)
return eligibility
} catch (error) {
logger.error(`[checkPredictionEligibility] Error checking prediction eligibility: ${error}`)
throw error
}
}
// As predictions limit renew per month, we set to cache with 1 month TTL
export const updatePredictionsUsage = async (
orgId: string,
@ -126,40 +179,6 @@ export const updatePredictionsUsage = async (
}
}
export const checkPredictions = async (orgId: string, subscriptionId: string, usageCacheManager: UsageCacheManager) => {
if (!usageCacheManager || !subscriptionId) return
const currentPredictions: number = (await usageCacheManager.get(`predictions:${orgId}`)) || 0
const quotas = await usageCacheManager.getQuotas(subscriptionId)
const predictionsLimit = quotas[LICENSE_QUOTAS.PREDICTIONS_LIMIT]
if (predictionsLimit === -1) return
if (currentPredictions >= predictionsLimit) {
throw new InternalFlowiseError(StatusCodes.TOO_MANY_REQUESTS, 'Predictions limit exceeded')
}
return {
usage: currentPredictions,
limit: predictionsLimit
}
}
export const checkPredictionEligibility = async (orgId: string, subscriptionId: string, usageCacheManager: UsageCacheManager) => {
try {
if (!usageCacheManager || !subscriptionId) return { allowed: true, useCredits: false }
const stripeManager = await StripeManager.getInstance()
if (!stripeManager) return { allowed: true, useCredits: false }
const eligibility = await stripeManager.checkPredictionEligibility(orgId, subscriptionId)
return eligibility
} catch (error) {
logger.error(`[checkPredictionEligibility] Error checking prediction eligibility: ${error}`)
throw error
}
}
export const updatePredictionsUsageWithCredits = async (
orgId: string,
subscriptionId: string,
@ -174,6 +193,7 @@ export const updatePredictionsUsageWithCredits = async (
const stripeManager = await StripeManager.getInstance()
if (stripeManager) {
const subscriptionDetails = await usageCacheManager.getSubscriptionDetails(subscriptionId)
logger.info(`subscription details: ${JSON.stringify(subscriptionDetails)}`)
if (subscriptionDetails && subscriptionDetails.customer) {
await stripeManager.reportMeterUsage(subscriptionDetails.customer as string)
}
@ -187,24 +207,6 @@ export const updatePredictionsUsageWithCredits = async (
}
}
// Enhanced prediction checking that includes credit eligibility
export const checkPredictionsWithCredits = async (orgId: string, subscriptionId: string, usageCacheManager: UsageCacheManager) => {
if (!usageCacheManager || !subscriptionId) return
const eligibility = await checkPredictionEligibility(orgId, subscriptionId, usageCacheManager)
if (!eligibility.allowed) {
throw new InternalFlowiseError(StatusCodes.PAYMENT_REQUIRED, 'Predictions limit exceeded. Please purchase credits to continue.')
}
return {
useCredits: eligibility.useCredits,
remainingCredits: eligibility.remainingCredits,
usage: eligibility.currentUsage,
limit: eligibility.planLimit
}
}
// Storage does not renew per month nor do we store the total size in database, so we just store the total size in cache
export const updateStorageUsage = (orgId: string, _: string = '', totalSize: number, usageCacheManager?: UsageCacheManager) => {
if (!usageCacheManager) return

View File

@ -60,11 +60,26 @@ const calculatePercentage = (count, total) => {
const calculateTotalCredits = (grants) => {
if (!grants || !Array.isArray(grants)) return 0
return grants.reduce((total, grant) => {
const grantValue = grant?.amount?.monetary?.value || 0
return total + grantValue
const grantCredits = grant?.credits || 0
return total + grantCredits
}, 0)
}
const calculateTotalUsage = (grants) => {
if (!grants || !Array.isArray(grants)) return 0
return grants.reduce((total, grant) => {
const usage = grant?.usage || 0
return total + usage
}, 0)
}
const calculateAvailableCredits = (grants) => {
if (!grants || !Array.isArray(grants)) return 0
const totalCredits = calculateTotalCredits(grants)
const totalUsage = calculateTotalUsage(grants)
return Math.max(0, totalCredits - totalUsage)
}
const AccountSettings = () => {
const theme = useTheme()
const dispatch = useDispatch()
@ -101,8 +116,15 @@ const AccountSettings = () => {
const totalCredits = useMemo(() => {
return creditsBalance ? calculateTotalCredits(creditsBalance.grants) : 0
}, [creditsBalance])
const totalUsage = useMemo(() => {
return creditsBalance ? calculateTotalUsage(creditsBalance.grants) : 0
}, [creditsBalance])
const availableCredits = useMemo(() => {
return creditsBalance ? calculateAvailableCredits(creditsBalance.grants) : 0
}, [creditsBalance])
const [creditsPackages, setCreditsPackages] = useState([])
const [usageWithCredits, setUsageWithCredits] = useState(null)
const [openCreditsDialog, setOpenCreditsDialog] = useState(false)
const [selectedPackage, setSelectedPackage] = useState(null)
const [isPurchasingCredits, setIsPurchasingCredits] = useState(false)
@ -178,12 +200,6 @@ const AccountSettings = () => {
}
}, [getCreditsPackagesApi.data])
useEffect(() => {
if (getUsageWithCreditsApi.data) {
setUsageWithCredits(getUsageWithCreditsApi.data)
}
}, [getUsageWithCreditsApi.data])
useEffect(() => {
if (openRemoveSeatsDialog || openAddSeatsDialog || openCreditsDialog) {
setSeatsQuantity(0)
@ -849,21 +865,21 @@ const AccountSettings = () => {
<Stack sx={{ alignItems: 'center' }} flexDirection='row'>
<Typography variant='body2'>Available Credits:</Typography>
<Typography sx={{ ml: 1, color: theme.palette.success.dark }} variant='h3'>
{getCreditsBalanceApi.loading ? <CircularProgress size={16} /> : availableCredits || 0}
</Typography>
</Stack>
<Stack sx={{ alignItems: 'center' }} flexDirection='row'>
<Typography variant='body2'>Credits Used:</Typography>
<Typography sx={{ ml: 1, color: 'inherit' }} variant='h3'>
{getCreditsBalanceApi.loading ? <CircularProgress size={16} /> : totalUsage || 0}
</Typography>
</Stack>
<Stack sx={{ alignItems: 'center' }} flexDirection='row'>
<Typography variant='body2'>Total Credits Purchased:</Typography>
<Typography sx={{ ml: 1, color: 'inherit' }} variant='h3'>
{getCreditsBalanceApi.loading ? <CircularProgress size={16} /> : totalCredits || 0}
</Typography>
</Stack>
{usageWithCredits && (
<Stack sx={{ alignItems: 'center' }} flexDirection='row'>
<Typography variant='body2'>Credits Used This Month:</Typography>
<Typography sx={{ ml: 1, color: 'inherit' }} variant='h3'>
{getUsageWithCreditsApi.loading ? (
<CircularProgress size={16} />
) : (
usageWithCredits?.creditsUsed || 0
)}
</Typography>
</Stack>
)}
<Typography
sx={{ opacity: customization.isDarkMode ? 0.7 : 1 }}
variant='body2'
@ -1641,7 +1657,10 @@ const AccountSettings = () => {
Current Balance
</Typography>
<Typography variant='h4' color='success.main'>
{totalCredits || 0} Credits
{availableCredits || 0} Available Credits
</Typography>
<Typography variant='body2' color='text.secondary' sx={{ mt: 1 }}>
Total: {totalCredits || 0} | Used: {totalUsage || 0}
</Typography>
</Box>