add support for adding cache tags and invalidation by tags

This commit is contained in:
clarencenpy 2018-06-18 23:16:30 -07:00
parent ab8562ef42
commit af9b4e9af4
4 changed files with 274 additions and 16 deletions

View file

@ -1,4 +1,11 @@
export interface KeyValueCache {
get(key: string): Promise<string | undefined>;
set(key: string, value: string, options?: { ttl?: number }): Promise<void>;
set(
key: string,
value: string,
options?: { ttl?: number; tags?: string[] },
): Promise<void>;
invalidate(tags: string[]): Promise<void>;
flush(): Promise<void>;
close(): Promise<void>;
}

View file

@ -3,8 +3,9 @@ import {
mockDate,
unmockDate,
} from '../../../../__mocks__/date';
import { KeyValueCache } from '../src/KeyValueCache';
export function testKeyValueCache(keyValueCache: any) {
export function testKeyValueCache(keyValueCache: KeyValueCache) {
describe('KeyValueCache Test Suite', () => {
beforeAll(() => {
mockDate();
@ -26,13 +27,20 @@ export function testKeyValueCache(keyValueCache: any) {
expect(await keyValueCache.get('missing')).not.toBeDefined();
});
it('is able to expire keys based on ttl', async () => {
it('can set same key multiple times', async () => {
await keyValueCache.set('hello', 'world');
await keyValueCache.set('hello', 'world');
expect(await keyValueCache.get('hello')).toBe('world');
expect(await keyValueCache.get('missing')).not.toBeDefined();
});
it('can expire keys based on ttl', async () => {
await keyValueCache.set('short', 's', { ttl: 1 });
await keyValueCache.set('long', 'l', { ttl: 5 });
expect(await keyValueCache.get('short')).toBe('s');
expect(await keyValueCache.get('long')).toBe('l');
advanceTimeBy(1500);
jest.advanceTimersByTime(1500);
advanceTimeBy(1000);
jest.advanceTimersByTime(1000);
expect(await keyValueCache.get('short')).not.toBeDefined();
expect(await keyValueCache.get('long')).toBe('l');
advanceTimeBy(4000);
@ -40,5 +48,90 @@ export function testKeyValueCache(keyValueCache: any) {
expect(await keyValueCache.get('short')).not.toBeDefined();
expect(await keyValueCache.get('long')).not.toBeDefined();
});
it('can set tags', async () => {
await keyValueCache.set('tagged', 'data', {
ttl: 1,
tags: ['tag1', 'tag2'],
});
expect(await keyValueCache.get('tagged')).toBe('data');
});
it('can invalidate tags', async () => {
await keyValueCache.set('key1', 'v1', {
ttl: 10,
tags: ['tag1', 'tag2'],
});
await keyValueCache.set('key2', 'v2', {
ttl: 10,
tags: ['tag2', 'tag3'],
});
expect(await keyValueCache.get('key1')).toBe('v1');
expect(await keyValueCache.get('key2')).toBe('v2');
await keyValueCache.invalidate(['tag1']);
expect(await keyValueCache.get('key1')).not.toBeDefined();
expect(await keyValueCache.get('key2')).toBe('v2');
await keyValueCache.invalidate(['tag3']);
expect(await keyValueCache.get('key1')).not.toBeDefined();
expect(await keyValueCache.get('key2')).not.toBeDefined();
});
it('can invalidate tag for multiple keys', async () => {
await keyValueCache.set('key1', 'v1', {
ttl: 10,
tags: ['tag1', 'tag2'],
});
await keyValueCache.set('key2', 'v2', {
ttl: 10,
tags: ['tag2', 'tag3'],
});
expect(await keyValueCache.get('key1')).toBe('v1');
expect(await keyValueCache.get('key2')).toBe('v2');
await keyValueCache.invalidate(['tag2']);
expect(await keyValueCache.get('key1')).not.toBeDefined();
expect(await keyValueCache.get('key2')).not.toBeDefined();
});
it('can reset tags', async () => {
await keyValueCache.set('key1', 'v1', {
ttl: 10,
tags: ['tag1', 'tag2'],
});
await keyValueCache.set('key1', 'v1', {
ttl: 10,
tags: ['tag2', 'tag3'],
});
await keyValueCache.invalidate(['tag1']);
expect(await keyValueCache.get('key1')).toBe('v1');
await keyValueCache.invalidate(['tag3']);
expect(await keyValueCache.get('key1')).not.toBeDefined();
});
it('can invalidate tags before they have been set', async () => {
await keyValueCache.invalidate(['tag1']);
await keyValueCache.set('key1', 'v1', {
ttl: 10,
tags: ['tag1', 'tag2'],
});
expect(await keyValueCache.get('key1')).toBe('v1');
});
it('can invalidate tags after keys expire', async () => {
await keyValueCache.set('key1', 'v1', {
ttl: 10,
tags: ['tag1'],
});
advanceTimeBy(5000);
jest.advanceTimersByTime(5000);
expect(await keyValueCache.get('key1')).toBe('v1');
advanceTimeBy(5000);
jest.advanceTimersByTime(5000);
// key has expired
await keyValueCache.invalidate(['tag1']);
expect(await keyValueCache.get('key1')).not.toBeDefined();
});
});
}

View file

@ -2,16 +2,36 @@ import { KeyValueCache } from 'apollo-server-caching';
import Memcached from 'memcached';
import { promisify } from 'util';
/**
* Defines a KeyValueCache implementation that supports cache tag invalidation.
* When cache entry is associated with 1 or more tags, we store tag:version
* pairs as metadata, as well as the current version associated with each tag.
* Version numbers are simply integers that are incremented whenever a tag is
* invalidated. For a cache entry to be valid, all of its associated tags must
* have version numbers that match their current version numbers.
*
* Performance:
* - set() : # of db reads = # of tags
* - get() : # of db reads = # of tags
* - invalidate() : # of db writes/incr = 1 for each tag
*
* Storage overhead:
* - key->data + tag:version pairs for each tag
* - tag->version stored for each tag
*/
export class MemcachedCache implements KeyValueCache {
readonly client;
readonly defaultSetOptions = {
ttl: 300,
tags: [],
};
constructor(serverLocation: Memcached.Location, options?: Memcached.options) {
this.client = new Memcached(serverLocation, options);
// promisify client calls for convenience
this.client.get = promisify(this.client.get).bind(this.client);
this.client.getMulti = promisify(this.client.getMulti).bind(this.client);
this.client.incr = promisify(this.client.incr).bind(this.client);
this.client.set = promisify(this.client.set).bind(this.client);
this.client.flush = promisify(this.client.flush).bind(this.client);
}
@ -19,14 +39,72 @@ export class MemcachedCache implements KeyValueCache {
async set(
key: string,
data: string,
options?: { ttl?: number },
options?: { ttl?: number; tags?: string[] },
): Promise<void> {
const { ttl } = Object.assign({}, this.defaultSetOptions, options);
await this.client.set(key, data, ttl);
const { ttl, tags } = Object.assign({}, this.defaultSetOptions, options);
// get current versions for all tags
let tagVersions = {};
if (tags.length > 0) {
tagVersions = await this.client.getMulti(tags);
// initialize tag versions that don't exist yet
const setOperations: any[] = [];
for (const tag of tags) {
if (!tagVersions[tag]) {
tagVersions[tag] = 1;
setOperations.push([tag, 1, 0]);
}
}
// wait for all tag versions to initialize
await Promise.all(setOperations.map(op => this.client.set(...op)));
}
const payload = {
d: data,
t: tagVersions,
};
await this.client.set(key, JSON.stringify(payload), ttl);
}
async get(key: string): Promise<string | undefined> {
return await this.client.get(key);
const data = await this.client.get(key);
if (!data) return;
// deserialize data
const payload = JSON.parse(data);
// compare tag versions if cache entry against current versions
const tagVersions = payload.t;
if (Object.keys(tagVersions).length !== 0) {
const currentTagVersions = await this.client.getMulti(
Object.keys(tagVersions),
);
for (const tag in currentTagVersions) {
if (tagVersions[tag] != currentTagVersions[tag]) {
return; // tag has been invalidated, therefore cache entry not valid
}
}
}
// all tag versions up to date
return payload.d;
}
async invalidate(tags: string[]): Promise<void> {
// increment version numbers for all the tags to be invalidated
if (tags.length > 0) {
const incrOperations: any[] = [];
for (const tag of tags) {
incrOperations.push([tag, 1]);
}
// Note: incr operation simply returns false when key is not present
// No need to increment those tag:versions since nothing depends on that
// tag yet.
await Promise.all(incrOperations.map(op => this.client.incr(...op)));
}
}
async flush(): Promise<void> {

View file

@ -2,16 +2,36 @@ import { KeyValueCache } from 'apollo-server-caching';
import Redis from 'redis';
import { promisify } from 'util';
/**
* Defines a KeyValueCache implementation that supports cache tag invalidation.
* When cache entry is associated with 1 or more tags, we store tag:version
* pairs as metadata, as well as the current version associated with each tag.
* Version numbers are simply integers that are incremented whenever a tag is
* invalidated. For a cache entry to be valid, all of its associated tags must
* have version numbers that match their current version numbers.
*
* Performance:
* - set() : # of db reads = # of tags
* - get() : # of db reads = # of tags
* - invalidate() : # of db writes/incr = 1 for each tag
*
* Storage overhead:
* - key->data + tag:version pairs for each tag
* - tag->version stored for each tag
*/
export class RedisCache implements KeyValueCache {
readonly client;
readonly defaultSetOptions = {
ttl: 300,
tags: [],
};
constructor(options: Redis.ClientOpts) {
this.client = Redis.createClient(options);
// promisify client calls for convenience
this.client.get = promisify(this.client.get).bind(this.client);
this.client.mget = promisify(this.client.mget).bind(this.client);
this.client.incr = promisify(this.client.incr).bind(this.client);
this.client.set = promisify(this.client.set).bind(this.client);
this.client.flushdb = promisify(this.client.flushdb).bind(this.client);
this.client.quit = promisify(this.client.quit).bind(this.client);
@ -22,17 +42,77 @@ export class RedisCache implements KeyValueCache {
data: string,
options?: { ttl?: number },
): Promise<void> {
const { ttl } = Object.assign({}, this.defaultSetOptions, options);
await this.client.set(key, data, 'EX', ttl);
const { ttl, tags } = Object.assign({}, this.defaultSetOptions, options);
// get current versions for all tags
let tagVersions = {} as any;
if (tags.length > 0) {
const tagVersionsArr = await this.client.mget(tags);
const setOperations: any[] = [];
for (let i = 0; i < tagVersionsArr.length; i++) {
let version: number;
if (tagVersionsArr[i] === null) {
version = 1;
// tag:version does not exist yet, initialize it
setOperations.push([tags[i], version]);
} else {
version = parseInt(tagVersionsArr[i]);
}
tagVersions[tags[i]] = version;
}
// wait for all tag versions to initialize
await Promise.all(setOperations.map(op => this.client.set(...op)));
}
const payload = {
d: data,
t: tagVersions,
};
await this.client.set(key, JSON.stringify(payload), 'EX', ttl);
}
async get(key: string): Promise<string | undefined> {
const reply = await this.client.get(key);
// reply is null if key is not found
if (reply !== null) {
return reply;
const data = await this.client.get(key);
if (data === null) return; // null is returned if key is not found
// deserialize data
const payload = JSON.parse(data);
// compare tag versions of cache entry against current versions
const tagVersions = payload.t;
const tags = Object.keys(tagVersions);
if (tags.length !== 0) {
const currentTagVersionsArr = await this.client.mget(tags);
for (let i = 0; i < currentTagVersionsArr.length; i++) {
if (
currentTagVersionsArr[i] !== null &&
parseInt(currentTagVersionsArr[i]) !== tagVersions[tags[i]]
) {
return; // tag has been invalidated, therefore cache entry not valid
}
}
}
// all tag versions up to date
return payload.d;
}
async invalidate(tags: string[]): Promise<void> {
// increment version numbers for all the tags to be invalidated
if (tags.length > 0) {
const tagsToIncr: any[] = [];
for (const tag of tags) {
tagsToIncr.push(tag);
}
// Note: in Redis, if key does not exist, it is set to 0 before performing
// the incr operation.
await Promise.all(tagsToIncr.map(tag => this.client.incr(tag)));
}
return;
}
async flush(): Promise<void> {