Reactive Extensions. Api client with Cache-Aside & Refresh-Ahead strategy. Part 1.
Published on:
Hi,
Today I want to talk about the development of the api client library. Let’s say it is an imaginary GitHub RESTfull Api that returns user’s rating. To make this routine more interesting we’ll add caching and mix it with Reactive Extensions. In fact the article is a summary of my Windows Phone development experience, and the approach in question was taken in a several applications with different modifications.
Note: In this article I assume that the reader is familiar with the concept of Reactive Programming and Reactive Extensions.
Let’s say that we have an url like that
nonexisting-api.github.com/v1/raiting/user_name
which returns the following json response:
{
"id" : "requested user name",
"rating" : 123,
"lastModified" : "2015-07-20"
}
public async Task<RatingModel> GetRatingForUser(string userName)
{
var ratingResponse = await HttpClient.Get(userName);
if (!ratingaResponse.IsSuccessful)
{
throw ratingResponse.Exception;
}
return ratingResponse.Data;
}
- The app has to start quickly;
- It has to render the cached data first;
- It should try getting the fresh data;
- If succeeded it should put fresh data to its cache;
- Render the new data or report the error;
Let me illustrate it with a sequence diagram. I hope I still remember how to draw them :) There are different local cache strategies. The one I’m going to follow is a Cache-Aside one.
The basic idea is to treat the cache as a passive data storage so that the responsibility to update the cached data is delegated to the cache client.
The points 2,3 and 4 are somewhat the realization of the Refresh-Ahead caching pattern. Not the classical version though, but that’s what we need patterns for.
Implementaton
Let’s create a new Class Library (.NET 4.5) project in visual studio and add the Rx-Main NuGet package to it.
Install-Package Rx-Main
We’ll need the interface for the cache.
public interface ICache<TKey, TValue> where TValue : IEntity<TKey>
{
bool HasCached(TKey key);
TValue GetCachedItem(TKey key);
void Put(TValue updatedRating);
}
public interface IRatingCache : ICache<string, RatingModel>
{
}
The http-client abstraction:
public interface IHttpClient
{
Task<RatingResponse> Get(string userName);
}
And the models
public class RatingResponse
{
public bool IsSuccessful { get; set; }
public RatingModel Data { get; set; }
public Exception Exception { get; set; }
}
public class RatingModel
{
public string UserName { get; set; }
public int Rating{ get; set; }
public DateTime LastModified { get; set; }
}
Let’s keep the actual http-request, parsing, deserialization and error handling outside the scope of this article
The api client interface will look like that
public interface IRatingClient
{
IObservable<RatingModel> GetRatingForUser(string userName);
}
The key part here is the IObservable<T> i.e. the stream of events which you can subscribe to.
The Rx power is in the ability to build the pipeline using the basic blocks, which are quite simple. We can wrap the cache usage into the reusable component/rx-operator:
public static IObservable<T> WithCache<T>(
this IObservable<T> source,
Func<T> get,
Action<T> put) where T : class
{
return Observable.Create<T>(observer =>
{
var cached = get();
if (cached != null)
{
observer.OnNext(cached);
}
source.Subscribe(item =>
{
put(item);
observer.OnNext(item);
}, observer.OnError, observer.OnCompleted);
return Disposable.Empty;
});
}
Note that there is neither dependency on cache type, nor any other information about the cache.
Generally it’s not a great idea to re-render the UI with no reason. To avoid an unnecessary invocation of OnNext delegate we’re going to use DistinctUntilChanged operator.
To do that we should define the custom comparer:
public class RatingComparer : IEqualityComparer<RatingModel>
{
public bool Equals(RatingModel x, RatingModel y)
{
return x.Rating == y.Rating
&& x.LastModified == y.LastModified
&& x.UserName == y.UserName;
}
public int GetHashCode(RatingModel obj)
{
return obj.GetHashCode();
}
}
- Cache
- Rx caching operator
- Duplicates filter
That means we are ready to chain them together and build the RxGitHubClient implementation:
public sealed class RxGitHubClient : IRatingClient
{
private IRatingCache Cache { get; set; }
private IHttpClient HttpClient { get; set; }
private IScheduler Scheduler { get; set; }
public RxGitHubClient(IRatingCache cache,
IHttpClient httpClient,
IScheduler scheduler)
{
Cache = cache;
HttpClient = httpClient;
Scheduler = scheduler;
}
public IObservable<RatingModel> GetRatingForUser(string userName)
{
return GetRatingIntrn(userName)
.WithCache(
() => Cache.GetCachedItem(userName),
model => Cache.Put(model))
.DistinctUntilChanged(new RatingModelComparer());
}
private IObservable<RatingModel> GetRatingIntrn(string userName)
{
return Observable.Create<RatingModel>(observer =>
Scheduler.Schedule(async () =>
{
var ratingResponse =
await HttpClient.Get(userName);
if (!ratingResponse.IsSuccessful)
{
observer.OnError(ratingResponse.Exception);
}
else
{
observer.OnNext(ratingResponse.Data);
observer.OnCompleted();
}
}));
}
}
Now the usage of the client becomes as simple as the client from the first example.
Client.GetRatingForUser(userName)
.Subscribe(RenderRating);
Links and sources
- ReactveX home
- Introduction to Rx
- The Reactive Extensions (Rx)
- Read-Through, Write-Through, Write-Behind, and Refresh-Ahead Caching
- Cache-Aside Pattern
- www.websequencediagrams.com