Hi Celigo Team,
I am reaching out to get guidance on implementing a short-lived locking mechanism inside integrator.io to prevent duplicate processing caused by a webhook feedback loop.
Background
We have a Shopify customers/update webhook that triggers a flow (Flow A), which syncs customer data to Microsoft F&O. At the end of this process, it writes an internal F&O ID back to a Shopify customer metafield.
This metafield write causes Shopify to trigger the customers/update webhook a second time, triggering Flow A again for the same customer. This results in the same customer being processed twice concurrently.
We need a mechanism to detect and discard this echo webhook run before any processing occurs.
Questions
- What is the recommended Celigo-native approach for preventing the same record from being processed twice across concurrent flow runs triggered by the same webhook?
- Is there a way for a hook script or filter script to read and write temporary state data that persists across concurrent flow runs? If so, what are the options?
- Are there any built-in Celigo features specifically designed for deduplication or concurrency control at the record level? Does the lookup cache support this?
Any documentation, examples, or recommended patterns for this specific scenario would be greatly appreciated.
Thank you
Hey Kate,
You can solve this with a lookup cache + preSavePage hook on the webhook export. The lookup cache persists across flow runs, so the hook can check whether a record key (e.g. customer ID) has already been processed and discard the echo webhook before anything downstream fires.
How it works
- Create a lookup cache (e.g. "customers") to store processed record keys
- Register the lookup cache somewhere in the integration. The lookupcaches JSRT runtime object is only accessible in hook scripts when the lookup cache is referenced somewhere within the integration (any flow, mapping, transform, tool, or API). It doesn't have to be on the same flow step. In the attached example I used a dummy transform on the export, but it could be referenced anywhere in the integration
- Add a preSavePage hook that:
  - Extracts the dedup key from each record (e.g. customer.id)
  - Calls lookupcaches.getData() to batch-check if those keys already exist
  - Filters out any records that are already cached (the echo webhooks)
  - Calls lookupcaches.putData() to write new keys for future runs
The script
import { lookupcaches } from 'integrator-api';
/*
* dedupCacheId: The _id of your lookup cache.
* Create one in Celigo and paste its _id here.
*/
var dedupCacheId = 'YOUR_LOOKUP_CACHE_ID';
/*
* dedupField: Dot-notation path to the unique identifier field.
* Examples: "id", "customer.id", "order.number"
* Leave empty to compare the entire record.
*/
var dedupField = 'id';
function preSavePage(options) {
if (!dedupCacheId) {
throw new Error('dedupCacheId is required');
}
var keys = [];
for (var i = 0; i < options.data.length; i++) {
var record = options.data[i];
if (dedupField) {
var fieldVal = resolveField(record, dedupField);
keys.push(fieldVal != null ? String(fieldVal) : stableStringify(record));
} else {
keys.push(stableStringify(record));
}
}
// Check which keys already exist in the cache
var cachedKeys = {};
try {
var cacheResult = lookupcaches.getData({ _id: dedupCacheId, keys: keys });
var cacheData = Array.isArray(cacheResult) ? cacheResult
: (cacheResult && Array.isArray(cacheResult.data)) ? cacheResult.data
: [];
for (var j = 0; j < cacheData.length; j++) {
var entry = cacheData[j];
if (entry && entry.key && entry.value != null) {
cachedKeys[entry.key] = true;
}
}
} catch (e) {
// Cache read failed — still dedup within the page
}
// Filter duplicates (within page + already in cache)
var seen = {};
var uniqueData = [];
var newCacheEntries = [];
for (var i = 0; i < options.data.length; i++) {
var key = keys[i];
if (!seen[key] && !cachedKeys[key]) {
seen[key] = true;
uniqueData.push(options.data[i]);
newCacheEntries.push({ key: key, value: { ts: new Date().toISOString() } });
}
}
// Write new keys for future runs
if (newCacheEntries.length > 0) {
try {
lookupcaches.putData({ _id: dedupCacheId, data: newCacheEntries });
} catch (e) {
// Cache write failed: records still pass through, but a later echo may not be caught
}
}
return {
data: uniqueData,
errors: options.errors,
abort: false,
newErrorsAndRetryData: []
};
}
function resolveField(obj, path) {
var parts = path.split('.');
var current = obj;
for (var i = 0; i < parts.length; i++) {
if (current == null) return undefined;
current = current[parts[i]];
}
return current;
}
function stableStringify(value) {
if (value === null || value === undefined) return String(value);
if (typeof value !== 'object') return JSON.stringify(value);
if (Array.isArray(value)) {
return '[' + value.map(function (item) { return stableStringify(item); }).join(',') + ']';
}
var keys = Object.keys(value).sort();
return '{' + keys.map(function (key) {
return JSON.stringify(key) + ':' + stableStringify(value[key]);
}).join(',') + '}';
}
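If you want to sanity-check the filtering logic outside integrator.io first, a rough local harness could look like the sketch below. `makeFakeCache` and `dedupPage` are hypothetical names made up for illustration. The stub only mimics the `getData({ _id, keys })` and `putData({ _id, data })` call shapes that the script above assumes. It is not the real `integrator-api` module, and the real return shape may differ.

```javascript
// In-memory stand-in for the lookupcaches object (hypothetical; it mirrors only
// the call shapes used in the script above, not the real integrator-api).
function makeFakeCache() {
  var store = {};
  return {
    getData: function (opts) {
      return opts.keys
        .filter(function (k) { return Object.prototype.hasOwnProperty.call(store, k); })
        .map(function (k) { return { key: k, value: store[k] }; });
    },
    putData: function (opts) {
      opts.data.forEach(function (entry) { store[entry.key] = entry.value; });
    }
  };
}

// Condensed version of the preSavePage filtering: keep records whose key is
// neither already cached nor repeated earlier in the same page.
function dedupPage(cache, cacheId, records, keyOf) {
  var keys = records.map(function (r) { return String(keyOf(r)); });
  var cached = {};
  cache.getData({ _id: cacheId, keys: keys }).forEach(function (e) { cached[e.key] = true; });
  var seen = {};
  var fresh = [];
  var kept = records.filter(function (r, i) {
    var k = keys[i];
    if (seen[k] || cached[k]) return false;
    seen[k] = true;
    fresh.push({ key: k, value: { ts: new Date().toISOString() } });
    return true;
  });
  if (fresh.length > 0) cache.putData({ _id: cacheId, data: fresh });
  return kept;
}

var cache = makeFakeCache();
var byId = function (r) { return r.id; };
// First run: customer 101 appears twice in the page, 202 once.
var firstRun = dedupPage(cache, 'test-cache', [{ id: 101 }, { id: 101 }, { id: 202 }], byId);
// Second run: the echo webhook for customer 101.
var echoRun = dedupPage(cache, 'test-cache', [{ id: 101 }], byId);
console.log(firstRun.length, echoRun.length); // 2 0
```

The first run keeps one copy of each customer and writes both keys to the stub. The second run finds 101 already cached and returns an empty page, which is the behaviour you want for the echo.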
Note on lookup cache + JSRT
Per the JSRT runtime objects docs, the lookupcaches object is only available in hook scripts when the lookup cache is referenced somewhere within the integration — any flow, mapping, transform, tool, or API will do. It just needs to be registered. In the attached example flow I used a dummy transform on the export for simplicity, but you could reference it in any other part of your integration that already uses mappings.
I've attached an example flow zip you can import to see the full setup.
69d81db52675573023564a8b.zip (4.4 KB)
One more thing — you'll probably want to handle cache cleanup and avoid permanently blocking a record from ever processing again. You can add a time threshold so that if a record key exists in the cache but is older than X minutes, it's treated as a new legitimate event rather than an echo.
First, add a threshold setting near the top of the script, next to dedupField:
/*
* dedupThresholdMinutes: How long (in minutes) a cached key is considered
* a duplicate. After this window, the same key is treated as a new event.
* Set to 0 to always block duplicates (no expiry).
*/
var dedupThresholdMinutes = 5;
Then update the filtering logic:
for (var i = 0; i < options.data.length; i++) {
var key = keys[i];
var isDuplicate = false;
if (seen[key]) {
isDuplicate = true;
} else if (cachedKeys[key]) {
if (dedupThresholdMinutes > 0) {
// Entries cached without a ts are stored as true, which parses to ~epoch and so counts as expired
var cachedTs = new Date(cachedKeys[key]).getTime();
var now = new Date().getTime();
var diffMinutes = (now - cachedTs) / 60000;
isDuplicate = diffMinutes < dedupThresholdMinutes;
} else {
isDuplicate = true;
}
}
if (!isDuplicate) {
seen[key] = true;
uniqueData.push(options.data[i]);
newCacheEntries.push({ key: key, value: { ts: new Date().toISOString() } });
}
}
Also update the cache-parsing loop (inside the try block) so it keeps the timestamp value instead of just true:
for (var j = 0; j < cacheData.length; j++) {
var entry = cacheData[j];
if (entry && entry.key && entry.value != null) {
cachedKeys[entry.key] = entry.value.ts || true;
}
}
So for your Shopify scenario — the customer update webhook fires, gets processed, writes back to metafields, Shopify echoes the webhook within seconds. With a 5-minute threshold, the echo gets discarded. But if that same customer legitimately updates again an hour later, it processes normally.
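To make the window arithmetic concrete, here is a small standalone sketch of the same comparison. `isWithinWindow` is a hypothetical helper name and the timestamps are made up for illustration. The current time is passed in as a parameter so the check is easy to test.

```javascript
// Returns true when a cached timestamp falls inside the dedup window.
// isWithinWindow is a hypothetical helper, not part of the script above.
function isWithinWindow(cachedIso, nowMs, thresholdMinutes) {
  var cachedMs = new Date(cachedIso).getTime();
  if (isNaN(cachedMs)) return false; // unreadable timestamp: do not treat as an echo
  return (nowMs - cachedMs) / 60000 < thresholdMinutes;
}

var processedAt = '2024-01-01T12:00:00.000Z';          // first webhook run (made-up time)
var echoAt = Date.parse('2024-01-01T12:00:10.000Z');   // echo arrives 10 seconds later
var laterAt = Date.parse('2024-01-01T13:00:00.000Z');  // genuine update an hour later

console.log(isWithinWindow(processedAt, echoAt, 5));  // true  -> discard as echo
console.log(isWithinWindow(processedAt, laterAt, 5)); // false -> process normally
```

Ten seconds is about 0.17 minutes, well inside a 5-minute window, while 60 minutes is outside it.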
You could also periodically clean up stale cache entries via the lookup cache API endpoints if the cache grows large over time.
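As a starting point for that cleanup, the sketch below only selects which keys are stale. `findStaleKeys` is a hypothetical helper, and it assumes entries shaped like `{ key, value: { ts } }`, as written by the script above. The actual delete call is left out on purpose, since the exact endpoint isn't covered here.

```javascript
// Picks the keys whose stored timestamp is at least maxAgeMinutes old.
// Entries with a missing or unreadable ts are also treated as stale.
// findStaleKeys is a hypothetical helper, not a Celigo API.
function findStaleKeys(entries, nowMs, maxAgeMinutes) {
  return entries
    .filter(function (entry) {
      var ts = entry && entry.value && entry.value.ts;
      var ms = new Date(ts).getTime();
      return isNaN(ms) || (nowMs - ms) / 60000 >= maxAgeMinutes;
    })
    .map(function (entry) { return entry.key; });
}

// Made-up sample entries for illustration.
var sampleEntries = [
  { key: '101', value: { ts: '2024-01-01T12:00:00.000Z' } }, // 60 minutes old
  { key: '202', value: { ts: '2024-01-01T12:58:00.000Z' } }, // 2 minutes old
  { key: '303', value: {} }                                   // no timestamp
];
var sampleNow = Date.parse('2024-01-01T13:00:00.000Z');

console.log(findStaleKeys(sampleEntries, sampleNow, 5)); // [ '101', '303' ]
```

Keeping the selection separate from the deletion makes it easy to log or review the stale keys before removing anything.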