You can subscribe to many different events using our Stripe integration.
We’re going to run through the steps required to create the onPriceCreated event.
Here’s an example Job that uses it. You should create Jobs like this while you’re developing so you can test your trigger. See the integration testing guide for more info.
import { Stripe } from "@trigger.dev/stripe";

const stripe = new Stripe({
  id: "stripe",
  apiKey: process.env["STRIPE_API_KEY"]!,
});

//this assumes a TriggerClient instance named `client` is defined elsewhere in your project
client.defineJob({
  id: "stripe-on-price",
  name: "Stripe On Price",
  version: "0.1.0",
  //this is what we're going to add in this guide
  trigger: stripe.onPriceCreated(),
  run: async (payload, io, ctx) => {
    //do stuff with the payload
    await io.logger.info("price created!", { current: payload.currency });
  },
});
An event is something that happens in the external system. In this case, it’s a price being created. We use EventSpecifications to define events.
We have many events exported from this file, but here’s the one we care about:
integrations/stripe/events.ts
export const onPriceCreated: EventSpecification<OnPriceEvent> = {
  //this name matches the name of the event in the Stripe API
  name: "price.created",
  //used for display in the Dashboard UI
  title: "On Price Created",
  source: "stripe.com",
  icon: "stripe",
  //examples appear in the Testing UI in the Dashboard. They're optional but they make the DX really nice :)
  examples: [
    {
      id: "recurring",
      name: "Recurring Price",
      icon: "stripe",
      payload: {
        id: "price_1NYV6vI0XSgju2urKsSmI53v",
        object: "price",
        //...
      },
    },
  ],
  //you can just cast the payload to the type you want here
  parsePayload: (payload) => payload as OnPriceEvent,
  //these properties are displayed in the Dashboard UI
  runProperties: (payload) => [{ label: "Price ID", text: payload.id }],
};
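To make the role of `parsePayload` and `runProperties` a bit more concrete, here is a minimal, self-contained sketch of a second specification for Stripe's `price.updated` event. The `EventSpecificationSketch` and `PriceSketch` types are simplified stand-ins invented for this example (the SDK's real `EventSpecification` type has more to it), and the payload is made-up sample data.

```typescript
// Simplified stand-in for the SDK's EventSpecification type.
// This is an assumption for illustration, not the real SDK definition.
type EventSpecificationSketch<TEvent> = {
  name: string;
  title: string;
  source: string;
  icon: string;
  examples?: Array<{ id: string; name: string; icon?: string; payload: unknown }>;
  parsePayload: (payload: unknown) => TEvent;
  runProperties?: (payload: TEvent) => Array<{ label: string; text: string }>;
};

// Hypothetical, trimmed-down payload type for this sketch.
type PriceSketch = { id: string; object: "price"; currency: string };

const samplePrice = { id: "price_example", object: "price", currency: "usd" };

const onPriceUpdatedSketch: EventSpecificationSketch<PriceSketch> = {
  // matches the Stripe event name
  name: "price.updated",
  title: "On Price Updated",
  source: "stripe.com",
  icon: "stripe",
  examples: [{ id: "usd", name: "USD Price", icon: "stripe", payload: samplePrice }],
  // same approach as the guide: cast the payload to the type we want
  parsePayload: (payload) => payload as PriceSketch,
  runProperties: (payload) => [{ label: "Price ID", text: payload.id }],
};

// The incoming webhook payload is parsed first, then the parsed value is
// used to build the properties shown for the run.
const parsedPrice = onPriceUpdatedSketch.parsePayload(samplePrice);
const properties = onPriceUpdatedSketch.runProperties?.(parsedPrice) ?? [];
console.log(properties);
```

Because `parsePayload` here is only a cast, nothing is validated at runtime. If you want stricter guarantees you could parse with a schema instead.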
We import all the events from the file above, and then we can add a trigger for the event we want.
integrations/stripe/index.ts
//... other imports
//this imports all the events from the file above, as an Object
import * as events from "./events";

export class Stripe implements TriggerIntegration {
  //...

  //the source is used to register webhooks and to process the data when received
  get source() {
    return createWebhookEventSource(this);
  }

  //the actual trigger. This one has some configuration options (the optional params)
  onPriceCreated(params?: { connect?: boolean; filter?: EventFilter }) {
    return createTrigger(this.source, events.onPriceCreated, params ?? { connect: false });
  }
}

//this creates a type that is the union of all the events
type StripeEvents = (typeof events)[keyof typeof events];

//this is the type of the trigger we're creating
type CreateTriggersResult<TEventSpecification extends StripeEvents> = ExternalSourceTrigger<
  TEventSpecification,
  ReturnType<typeof createWebhookEventSource>
>;

function createTrigger<TEventSpecification extends StripeEvents>(
  source: ReturnType<typeof createWebhookEventSource>,
  event: TEventSpecification,
  params: TriggerParams
): CreateTriggersResult<TEventSpecification> {
  return new ExternalSourceTrigger({
    event,
    params,
    source,
    options: {},
  });
}
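The `(typeof events)[keyof typeof events]` line can look a little cryptic, so here is a small standalone sketch of the same pattern. It uses a plain object (`eventsSketch`, a hypothetical stand-in for the `import * as events` namespace) with two tiny made-up specifications, and shows how the resulting union constrains a generic function.

```typescript
// Hypothetical stand-in for `import * as events from "./events"`.
const eventsSketch = {
  onPriceCreated: { name: "price.created" as const },
  onPriceUpdated: { name: "price.updated" as const },
};

// `keyof typeof eventsSketch` is "onPriceCreated" | "onPriceUpdated".
// Indexing the object type with that key union gives the union of all values:
// { name: "price.created" } | { name: "price.updated" }
type EventsSketch = (typeof eventsSketch)[keyof typeof eventsSketch];

// A generic constrained to the union keeps the specific event type,
// so the return type is the exact literal name of the event passed in.
function eventName<TEvent extends EventsSketch>(spec: TEvent): TEvent["name"] {
  return spec.name;
}

const createdName = eventName(eventsSketch.onPriceCreated);
console.log(createdName);
```

With this pattern, any specification exported from the events file joins the union automatically, so a helper like `createTrigger` does not need to be touched when a new event is added.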
The ExternalSource is responsible for registering webhooks and processing the data when it’s received.
integrations/stripe/index.ts
//...everything we've already covered

//this defines the shape of the data that Stripe returns when we create/update a webhook
const WebhookDataSchema = z.object({
  id: z.string(),
  object: z.literal("webhook_endpoint"),
  api_version: z.string().nullable(),
  application: z.string().nullable(),
  created: z.number(),
  description: z.string().nullable(),
  enabled_events: z.array(z.string()),
  livemode: z.boolean(),
  metadata: z.record(z.string()),
  status: z.enum(["enabled", "disabled"]),
  url: z.string(),
});

function createWebhookEventSource(
  //the integration is used to register the webhook
  integration: Stripe
  // { connect?: boolean } comes through from the params in the ExternalSourceTrigger we defined above
): ExternalSource<Stripe, { connect?: boolean }, "HTTP", {}> {
  return new ExternalSource("HTTP", {
    //this needs to be unique in Trigger.dev
    //there's only one stripe webhook endpoint (for all events), so we use "stripe.webhook"
    id: "stripe.webhook",
    //this is the schema for the params that come through from the ExternalSourceTrigger
    schema: z.object({ connect: z.boolean().optional() }),
    version: "0.1.0",
    integration,
    //if the key is the same then a webhook will be updated (with any new events added)
    //if the key is different then a new webhook will be created
    //in Stripe's case we can have webhooks with multiple events BUT not shared between connect and non-connect
    key: (params) => `stripe.webhook${params.connect ? ".connect" : ""}`,
    //the webhookHandler is called when the webhook is received, this is the last step we'll cover
    handler: webhookHandler,
    //this function is called when the webhook is registered
    register: async (event, io, ctx) => {
      const { params, source: httpSource, options } = event;

      //httpSource.data is the stored data about the existing webhook that has the same key
      //httpSource.data will be undefined if no webhook has been registered yet with the same key
      const webhookData = WebhookDataSchema.safeParse(httpSource.data);

      //this is the full list of events that we want to register (when we add more than just onPriceCreated)
      const allEvents = Array.from(new Set([...options.event.desired, ...options.event.missing]));
      const registeredOptions = {
        event: allEvents,
      };

      //if there is already an active source (i.e. a webhook has been registered)
      //and httpSource.data was parsed successfully
      if (httpSource.active && webhookData.success) {
        //there are no missing events, so we don't need to update the webhook
        if (options.event.missing.length === 0) return;

        //we want to update the existing webhook with the new events
        //this uses the Task we created above
        const updatedWebhook = await io.integration.webhookEndpoints.update("update-webhook", {
          id: webhookData.data.id,
          url: httpSource.url,
          enabled_events: allEvents as unknown as WebhookEvents[],
        });

        //when registering new events, we need to return the data and the options
        return {
          data: WebhookDataSchema.parse(updatedWebhook),
          options: registeredOptions,
        };
      }

      //if there is no active source, or httpSource.data wasn't parsed successfully,
      //but we might be able to add events to an existing webhook
      const listResponse = await io.integration.webhookEndpoints.list("list-webhooks", {
        limit: 100,
      });

      //if one of these webhooks has the URL we want, we can update it
      const existingWebhook = listResponse.data.find((w) => w.url === httpSource.url);
      if (existingWebhook) {
        //add all the events to the webhook
        const updatedWebhook = await io.integration.webhookEndpoints.update("update-found-webhook", {
          id: existingWebhook.id,
          url: httpSource.url,
          enabled_events: allEvents as unknown as WebhookEvents[],
          disabled: false,
        });

        //return the data and the registered options
        return {
          data: WebhookDataSchema.parse(updatedWebhook),
          options: registeredOptions,
        };
      }

      //there are no matching webhooks, so we need to create a new one
      const webhook = await io.integration.webhookEndpoints.create("create-webhook", {
        url: httpSource.url,
        enabled_events: allEvents as unknown as WebhookEvents[],
        connect: params.connect,
      });

      //when creating a new webhook, we need to also return the secret that Stripe sends us
      //the secret is used to validate the webhook payloads we receive
      return {
        data: WebhookDataSchema.parse(webhook),
        secret: webhook.secret,
        options: registeredOptions,
      };
    },
  });
}
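Two small pieces of the `register` logic are easy to try in isolation: how the `key` separates connect and non-connect webhooks, and how the desired and missing events are merged without duplicates. The sketch below copies those two expressions into standalone helpers. The names `webhookKey`, `mergeEvents` and `RegisterOptionsSketch` are made up for this example, and it assumes `missing` holds the events that aren't registered on the endpoint yet, as the early return in the code above suggests.

```typescript
// Hypothetical, minimal shape of the options passed to `register`.
type RegisterOptionsSketch = {
  event: { desired: string[]; missing: string[] };
};

// Same expression as the `key` function in the source:
// connect and non-connect webhooks get different keys, so they are never shared.
const webhookKey = (params: { connect?: boolean }): string =>
  `stripe.webhook${params.connect ? ".connect" : ""}`;

// Same expression as `allEvents` in the source:
// a Set removes duplicates and keeps first-seen order.
function mergeEvents(options: RegisterOptionsSketch): string[] {
  return Array.from(new Set([...options.event.desired, ...options.event.missing]));
}

const sketchOptions: RegisterOptionsSketch = {
  event: {
    desired: ["price.created", "price.updated"],
    missing: ["price.updated"],
  },
};

const mergedEvents = mergeEvents(sketchOptions);
console.log(webhookKey({})); // "stripe.webhook"
console.log(webhookKey({ connect: true })); // "stripe.webhook.connect"
console.log(mergedEvents); // ["price.created", "price.updated"]
```

In this example `missing` is not empty, so the real `register` function would go on to update the existing webhook rather than return early.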
When a webhook is received, we need to validate the signature, parse the payload, and return the events. Each event will turn into a payload that can trigger a Job run.
integrations/stripe/index.ts
//...everything we've already covered

//this function is called when a webhook is received
async function webhookHandler(event: HandlerEvent<"HTTP">, logger: Logger) {
  logger.debug("[@trigger.dev/stripe] Handling webhook payload");

  //The rawEvent is a Request, from this you can get the body, headers, etc.
  //The source has info on the ExternalSource that received the webhook
  const { rawEvent: request, source } = event;

  //no body means no events
  if (!request.body) {
    logger.debug("[@trigger.dev/stripe] No body found");
    return { events: [] };
  }

  //in Stripe's case, we need to get the text from the request, and we'll use their SDK to get an event
  const rawBody = await request.text();

  //it's important to verify webhook payloads because anyone can send data to a URL
  const signature = request.headers.get("stripe-signature");

  if (signature) {
    //The Stripe SDK is used to validate the signature
    const stripeClient = new StripeClient("", { apiVersion: "2022-11-15" });

    try {
      //this will throw an error if the signature is invalid
      const event = stripeClient.webhooks.constructEvent(rawBody, signature, source.secret);

      return {
        //more than one event can be returned, but for most APIs it will just be one
        events: [
          {
            id: event.id,
            payload: event.data.object,
            source: "stripe.com",
            //this name should match the EventSpecification name
            name: event.type,
            timestamp: new Date(event.created * 1000),
            //the context can be any format, it will be passed to the Job run's context.source
            context: {
              apiVersion: event.api_version,
              livemode: event.livemode,
              request: event.request,
              previousAttributes: event.data.previous_attributes,
            },
          },
        ],
        //you can also return a response, which will be sent back to the webhook sender
        //but this is optional, we return a 200 which is normally the requirement
        //response: {
        //  status: 200,
        //  body: "ok",
        //  headers: {
        //    "Content-Type": "text/plain",
        //  },
        //},
        //metadata can be any format, is stored and can be used in the next webhookHandler call
        // metadata: {
        //   requestId: event.request,
        // }
      };
    } catch (error) {
      if (error instanceof Error) {
        logger.error("[@trigger.dev/stripe] Error while validating webhook signature", {
          error: { name: error.name, message: error.message },
        });
      } else {
        logger.error("[@trigger.dev/stripe] Unknown Error while validating webhook signature");
      }

      return { events: [] };
    }
  }

  //if there's no signature, we can't validate the payload
  return {
    events: [],
  };
}
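If you're building a trigger for an API whose SDK doesn't ship a helper like `constructEvent`, you'll need to check the signature yourself. The sketch below shows roughly what that check involves for a Stripe-style header (`t=<timestamp>,v1=<hex HMAC>`, where the HMAC-SHA256 is computed over `<timestamp>.<raw body>` with the webhook secret). It's a simplified illustration, not the Stripe SDK's implementation: the helper names, the test secret and the sample body are all made up for this example, and for Stripe itself you should keep using the SDK.

```typescript
import { createHmac, timingSafeEqual } from "node:crypto";

// Parse a header of the form "t=<unix seconds>,v1=<hex hmac>[,v1=<hex hmac>...]".
function parseSignatureHeader(header: string): { timestamp: number; signatures: string[] } {
  let timestamp = Number.NaN;
  const signatures: string[] = [];
  for (const part of header.split(",")) {
    const [key, value] = part.split("=", 2);
    if (key === "t") timestamp = Number(value);
    if (key === "v1" && value) signatures.push(value);
  }
  return { timestamp, signatures };
}

// HMAC-SHA256 over "<timestamp>.<rawBody>", hex encoded.
function computeSignature(rawBody: string, timestamp: number, secret: string): string {
  return createHmac("sha256", secret).update(`${timestamp}.${rawBody}`, "utf8").digest("hex");
}

// Returns true only if the timestamp is recent enough and one signature matches.
// `nowSeconds` is a parameter so the check is easy to test.
function verifySignature(
  rawBody: string,
  header: string,
  secret: string,
  toleranceSeconds = 300,
  nowSeconds = Math.floor(Date.now() / 1000)
): boolean {
  const { timestamp, signatures } = parseSignatureHeader(header);
  if (!Number.isFinite(timestamp)) return false;
  //reject old (or far-future) timestamps to limit replay attacks
  if (Math.abs(nowSeconds - timestamp) > toleranceSeconds) return false;

  const expected = Buffer.from(computeSignature(rawBody, timestamp, secret), "hex");
  return signatures.some((candidate) => {
    const actual = Buffer.from(candidate, "hex");
    //timingSafeEqual throws on different lengths, so check the length first
    return actual.length === expected.length && timingSafeEqual(actual, expected);
  });
}

//made-up values for the example
const sketchSecret = "whsec_test_secret";
const sketchBody = JSON.stringify({ id: "evt_example", type: "price.created" });
const sketchTimestamp = 1700000000;
const sketchHeader = `t=${sketchTimestamp},v1=${computeSignature(sketchBody, sketchTimestamp, sketchSecret)}`;

console.log(verifySignature(sketchBody, sketchHeader, sketchSecret, 300, sketchTimestamp + 10)); // true
console.log(verifySignature(sketchBody + " ", sketchHeader, sketchSecret, 300, sketchTimestamp + 10)); // false
```

Note that the signature is computed over the raw request text, which is why the handler above calls `request.text()` rather than parsing the JSON first.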