Many of the third-party services we integrate into our applications use webhooks to notify us of events. Responding to an incoming webhook with a success response (status 200) is all that is required to acknowledge its receipt. Whether you process the webhook immediately on receipt or put it in a queue to be processed later is up to you.
In this post, I am sharing a way to build and process a MongoDB-based webhook queue for Node/Express applications.
MongoDB and Mongoose are, of course, required.
Webhook Schema
const mongoose = require('mongoose');

const WebhookSchema = new mongoose.Schema({
  accountId: { type: String, index: true },
  notificationType: { type: String, index: true },
  receivedAt: { type: Date, default: Date.now, index: true, expires: '14d' },
  processed: { type: Boolean, index: true, default: false },
  processedAt: { type: Date }
  // more relevant fields related to the webhook
});

module.exports = mongoose.model("Webhook", WebhookSchema);
- accountId: or any other field (such as subscriptionId) referencing the document (such as a user) that you want to update on receiving the webhook.
- notificationType: identifies the type of the webhook. Changes to the user document are made based on it. Common notification types are subscription.created, subscription.upgraded and account.deleted.
- receivedAt: records when the webhook was received. We also set the document up for deletion after 14 days (time to live), as we don’t need it once processing is done, apart from keeping it for a few days for the record.
- processed: lets the queue find only unprocessed webhooks.
- processedAt: records when the webhook was processed, if it was.
Controller Method (Route)
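const express = require('express');
const Webhook = require('./webhook.js');

const router = express.Router();

// req.body is only populated if a JSON body parser such as express.json() is registered
router.post('/api/webhook', function (req, res) {
  const webhooks = req.body.webhooks.map((w) => {
    return {
      notificationType: w.type,
      accountId: w.accountId
    };
  });
  // Promise form: Mongoose 7 and later no longer accept a callback here
  Webhook.create(webhooks)
    .then(function () {
      return res.status(200).end(); // acknowledge that the webhook(s) were received
    })
    .catch(function (err) {
      console.log(err);
      return res.status(500).end(); // something went wrong, the service should send the webhook again
    });
});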
This is the route you register with the service provider as the endpoint to which webhooks will be sent. It dumps the incoming webhooks into the Webhook collection.
Here I am assuming that one call to this endpoint can carry multiple webhooks (many providers bundle webhooks this way). If there is one webhook per call, insert a single document:
// assuming the request body itself is the webhook
const webhook = {
  notificationType: req.body.type,
  accountId: req.body.accountId
};
Webhook.create(webhook).then(function () {...}).catch(function (err) {...});
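The two payload shapes described above can be handled by one small helper. This is only a sketch: toWebhookDocs is a hypothetical name, and the assumption that a bundled payload carries a webhooks array while a single payload carries type and accountId at the top level comes from the examples in this post, not from any particular provider.

```javascript
// Hypothetical helper: turns a request body into an array of plain objects
// that can be passed to Webhook.create().
// Assumed payload shapes (check your provider's documentation):
//   bundled: { webhooks: [{ type, accountId }, ...] }
//   single:  { type, accountId }
function toWebhookDocs(body) {
  if (!body || typeof body !== 'object') {
    return [];
  }
  const items = Array.isArray(body.webhooks) ? body.webhooks : [body];
  return items
    // drop entries that do not carry a usable notification type
    .filter((w) => w && typeof w.type === 'string')
    .map((w) => ({
      notificationType: w.type,
      accountId: w.accountId
    }));
}
```

In the route, Webhook.create(toWebhookDocs(req.body)) would then cover both cases. What to do with an empty result (store nothing, or answer with an error status) is a choice that depends on how your provider reacts to non-200 responses.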
Webhook Queue
Finally, the queue script. Let’s call it webhookqueue.js
const mongoose = require('mongoose');
const Webhook = require('./webhook.js');

const WebhookQueue = function () {
  const self = this;
  self.checkHooksInterval = 5000;
  setTimeout(self.checkHooks.bind(self), self.checkHooksInterval);
};

const pt = WebhookQueue.prototype;

pt.checkHooks = function () {
  const self = this;
  console.log("*** Checking Webhook Queue ***");
  Webhook.find({ processed: false })
    .limit(10)
    .exec()
    .then(function (webhooks) {
      if (webhooks.length) {
        console.log("Pending Webhooks Found");
        // Process the webhooks and mark them processed: true
      }
    })
    .catch(function (err) {
      console.log("something went wrong", err);
    })
    .then(function () {
      // Always schedule the next check, even if the queue was empty or the query failed
      setTimeout(self.checkHooks.bind(self), self.checkHooksInterval);
    });
};

let instance = null; // module-level, so a reconnect does not start a second queue

mongoose.connection.on('connected', () => {
  console.log("connected");
  if (!instance) {
    console.log("creating new WebhookQueue instance");
    instance = new WebhookQueue();
  } else {
    console.log("WebhookQueue instance already exists");
  }
});

mongoose.connect('mongodb://127.0.0.1/webapp');
We instantiate the WebhookQueue as soon as the MongoDB connection is made. Then we check for pending webhooks in the queue every 5 seconds (increase or decrease this interval based on how urgent the processing is). On every turn, up to 10 unprocessed webhooks are looked up and, if found, processed and marked by setting processed to true. Finally, the checkHooks method is scheduled with setTimeout to run again after the next 5 seconds.
And that’s it. Start this script by including and calling it when starting the Express server (the one that serves the '/api/webhook' route), or start it as a standalone script with node webhookqueue.
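The "process the webhooks and mark them processed" step is left open in the script. One possible shape for it is sketched below. The names processBatch, handlers and markProcessed are hypothetical; in a real application markProcessed would presumably update the Webhook document (setting processed to true and processedAt to the current date), but here it is passed in as a function so the sketch runs without a database.

```javascript
// Hypothetical sketch of the processing step.
// `handlers` maps a notificationType to an async function.
// `markProcessed` persists the processed flag (for example with a Mongoose
// update); it is injected here so the sketch needs no database.
async function processBatch(webhooks, handlers, markProcessed) {
  const summary = { processed: 0, skipped: 0, failed: 0 };
  for (const webhook of webhooks) {
    const type = webhook.notificationType;
    const known = Object.prototype.hasOwnProperty.call(handlers, type);
    if (!known || typeof handlers[type] !== 'function') {
      summary.skipped += 1; // unknown type: left unprocessed for inspection
      continue;
    }
    try {
      await handlers[type](webhook);
      await markProcessed(webhook, new Date());
      summary.processed += 1;
    } catch (err) {
      // Stays at processed: false, so a later check picks it up again
      summary.failed += 1;
    }
  }
  return summary;
}
```

Inside checkHooks this could be called as processBatch(webhooks, handlers, markProcessed), with the next setTimeout scheduled once the returned promise settles. Note that skipped and failed webhooks stay unprocessed and will be fetched again, so a production version would likely need a retry counter or a failed flag to keep them from occupying the 10 slots of every check.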
Why Use Webhook Queue?
Why should we have a queue in the first place? Why not just process the webhooks when they are received? That’s totally doable if webhooks are few and far between. In fact, it’s better that way, since you avoid the extra overhead of the queue.
But under high load with frequent webhooks, especially if the processing those webhooks trigger is complex or heavy too, a queue is the better option for a couple of reasons:
- Sometimes the service provider can be down. Once it’s back up and running, it will try to clear as many of your stuck webhooks as possible by sending them over to your API endpoint. If you process immediately upon receiving, that results in an unexpected spike in your CPU usage, and probably some failures.
- Though remote, it’s still a possibility that you acknowledge receiving the webhook and the server crashes (for any reason) before you’re done processing it. Since you have acknowledged it, the service is not going to attempt sending that webhook again, and since it could not be processed because of the crash, the webhook is lost forever. This can be taken care of by acknowledging receipt only after processing is complete. But that’s not a good idea for complex processing, as it may take a while, and some services consider it a failure when no acknowledgement is received within a few seconds.
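To put the first point in numbers: with the settings used in this post (up to 10 webhooks per check, one check every 5 seconds), the queue works through a backlog at a bounded pace instead of all at once. The helper below is a rough back-of-the-envelope sketch; estimateDrainSeconds is a hypothetical name, and it ignores the time the processing itself takes, so the real figure would be somewhat higher.

```javascript
// Rough estimate of how long the queue needs to clear a backlog.
// Assumes every check handles a full batch and ignores processing time.
function estimateDrainSeconds(backlog, batchSize = 10, intervalMs = 5000) {
  if (backlog <= 0) {
    return 0;
  }
  const checksNeeded = Math.ceil(backlog / batchSize);
  return (checksNeeded * intervalMs) / 1000;
}

console.log(estimateDrainSeconds(1200)); // 600 seconds, about 10 minutes
console.log(estimateDrainSeconds(25));   // 15 seconds
```

If a delay like that is too long for your use case, a larger batch size or a shorter interval clears the backlog faster at the cost of a less even load.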
Some Providers With Webhook Communication