This post explains how to update documents in Cosmos DB safely with optimistic concurrency control, using the etag of a document. Cosmos DB implements optimistic concurrency, so you cannot lock a document while you read from it or write to it.
Optimistic concurrency means that if two concurrent operations attempt to update the latest version of a document within a logical partition, one operation will win and the other will fail. If an operation bases its update on an older version that it read earlier, however, the database cannot tell that the value is stale, and the write can overwrite a newer change. The document then contains incorrect data, which matters whenever the data has to be correct.
Optimistic Concurrency Control (OCC) allows you to prevent lost updates and to keep your data correct. OCC can be implemented by using the etag of a document. The value of the etag is automatically generated and updated by the server every time a document is updated. By checking if the etag has changed between a read and an update, you can ensure that you are updating the latest version of the document.
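The etag check described above can be illustrated without a database. The code below is a minimal sketch and not the Cosmos DB SDK: InMemoryStore, TryReplace and EtagDemo are hypothetical names made up for this illustration, and the etag is simply a new GUID for every write. Two writers read the same version of a document, and only the first conditional write is accepted.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for a document store, not the Cosmos DB SDK
public class InMemoryStore
{
    private readonly Dictionary<string, (string Value, string Etag)> documents =
        new Dictionary<string, (string Value, string Etag)>();

    // Insert or overwrite a document unconditionally and return its new etag
    public string Put(string id, string value)
    {
        string etag = Guid.NewGuid().ToString();
        this.documents[id] = (value, etag);
        return etag;
    }

    // Read a document together with the etag of the version that was read
    public (string Value, string Etag) Read(string id)
    {
        return this.documents[id];
    }

    // Replace the document only if the caller's etag still matches the stored etag,
    // a mismatch corresponds to a 412 Precondition Failed response in Cosmos DB
    public bool TryReplace(string id, string value, string expectedEtag)
    {
        if (!this.documents.TryGetValue(id, out var current) || current.Etag != expectedEtag)
        {
            return false;
        }

        // Every successful write produces a new etag
        this.documents[id] = (value, Guid.NewGuid().ToString());
        return true;
    }
}

public static class EtagDemo
{
    // Two writers read the same version, only the first conditional write succeeds
    public static (bool First, bool Second, string Final) Run()
    {
        InMemoryStore store = new InMemoryStore();
        store.Put("doc1", "v0");

        var readerA = store.Read("doc1");
        var readerB = store.Read("doc1");

        bool first = store.TryReplace("doc1", "from A", readerA.Etag);
        bool second = store.TryReplace("doc1", "from B", readerB.Etag);

        return (first, second, store.Read("doc1").Value);
    }
}
```

Calling EtagDemo.Run() returns true for the first write and false for the second, and the stored value is the one from the first writer. The second writer has to read the document again before it can retry.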
Cosmos DB Client
I have already written a post about a generic Cosmos DB client that includes the methods we need: one that gets a document together with its etag and one that updates a document only if the etag still matches. The code for the ModelItem type and the ModelPage type is also included.
public async Task<ModelItem<T>> GetByIdWithEtag<T>(string id, string partition_key)
{
    // Create variables to return
    ModelItem<T> model = new ModelItem<T>();

    try
    {
        // Get the response
        ResourceResponse<Document> response = await this.client.ReadDocumentAsync(
            UriFactory.CreateDocumentUri(this.options.Database, this.options.Collection, id),
            new RequestOptions { PartitionKey = new PartitionKey(partition_key) });

        // Get the document
        Document document = response.Resource;

        // Get the etag
        model.etag = document.ETag;

        // Convert the document to the requested type
        model.item = (T)(dynamic)document;
    }
    catch (DocumentClientException de)
    {
        // Log the exception and flag the error so that callers can check model.error
        this.logger.LogDebug(de, $"GetByIdWithEtag, id: {id}", null);
        model.error = true;
    }

    // Return the model
    return model;

} // End of the GetByIdWithEtag method
public async Task<bool> Update<T>(string id, T item, string etag)
{
    try
    {
        // Create an access condition, the update is only applied if the etag still matches
        AccessCondition ac = new AccessCondition { Condition = etag, Type = AccessConditionType.IfMatch };

        // Update the document
        await this.client.ReplaceDocumentAsync(
            UriFactory.CreateDocumentUri(this.options.Database, this.options.Collection, id),
            item,
            new RequestOptions { AccessCondition = ac });
    }
    catch (DocumentClientException de)
    {
        // The etag has changed since the document was read (412 Precondition Failed)
        if (de.StatusCode == HttpStatusCode.PreconditionFailed)
        {
            return false;
        }

        // Log the exception and rethrow it, other errors must not be reported as a success
        this.logger.LogError(de, $"Update, id: {id}, etag: {etag}", null);
        throw;
    }

    // Return a success response
    return true;

} // End of the Update method
public class ModelItem<T>
{
    #region Variables

    public T item { get; set; }
    public string etag { get; set; }
    public bool error { get; set; }

    #endregion

    #region Constructors

    public ModelItem()
    {
        // Set values for instance variables
        this.item = default(T);
        this.etag = "";
        this.error = false;

    } // End of the constructor

    public ModelItem(T item, string etag, bool error)
    {
        // Set values for instance variables
        this.item = item;
        this.etag = etag;
        this.error = error;

    } // End of the constructor

    #endregion

} // End of the class
public class ModelPage<T>
{
    #region Variables

    public IList<T> items { get; set; }
    public string ct { get; set; }
    public bool error { get; set; }

    #endregion

    #region Constructors

    public ModelPage()
    {
        // Set values for instance variables
        this.items = new List<T>();
        this.ct = "";
        this.error = false;

    } // End of the constructor

    #endregion

} // End of the class
Update with etag
The method below gets a list of locations and saves the continuation token in a document, so that we can continue to get more locations later. We need optimistic concurrency control when the continuation token is saved. We first read the JobLocationsPage document, which holds the continuation token, and then loop until the document has been updated successfully. If the update fails because the etag has changed, we read the document again and fetch the page of locations from the new continuation token.
public async Task<ModelPage<LocationDocument>> GetLocations(Int32 page_size)
{
    // Create the locations page to return
    ModelPage<LocationDocument> page = new ModelPage<LocationDocument>();

    // The id and the partition key of the job locations page
    string job_id = "xx481cd9-7961-4c6e-960e-7cb6e5cde5e8";

    // Get the job locations page
    ModelItem<JobLocationsPage> job_tuple = await this.cosmos_database_repository.GetByIdWithEtag<JobLocationsPage>(job_id, job_id);

    // Loop until a successful update
    while (true)
    {
        // Stop if the job locations page could not be read, job_tuple.item is not set in that case
        if (job_tuple.error == true)
        {
            return new ModelPage<LocationDocument> { error = true };
        }

        // Get locations
        page = await this.location_repository.GetChunk("page_name", "ASC", page_size, job_tuple.item.continuation_token);

        // Update the job locations page
        job_tuple.item.continuation_token = page.ct;
        bool success = await this.cosmos_database_repository.Update<JobLocationsPage>(job_tuple.item.id, job_tuple.item, job_tuple.etag);

        // Check if the update failed
        if (success == false)
        {
            // The etag has changed, get the tuple again to have the latest version
            job_tuple = await this.cosmos_database_repository.GetByIdWithEtag<JobLocationsPage>(job_id, job_id);

            // Continue the loop
            continue;
        }

        // Break out from the loop
        break;

    } // End of the while(true) loop

    // Return the page with locations
    return page;

} // End of the GetLocations method
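The loop above retries until the update succeeds. If the document is updated very often, it can be useful to put an upper limit on the number of attempts. The code below is a minimal sketch of such a limit and is not part of the generic client: OptimisticRetry, UpdateWithRetry and the delegate parameters are hypothetical names, and the caller supplies the read and update functions.

```csharp
using System;
using System.Threading.Tasks;

public static class OptimisticRetry
{
    // Hypothetical helper: read, modify and conditionally write, give up after maxAttempts
    // read   returns the current item and its etag
    // modify changes the item in memory
    // update performs the conditional write and returns false on an etag mismatch
    public static async Task<bool> UpdateWithRetry<T>(
        Func<Task<(T Item, string Etag)>> read,
        Action<T> modify,
        Func<T, string, Task<bool>> update,
        int maxAttempts)
    {
        for (int attempt = 0; attempt < maxAttempts; attempt++)
        {
            // Always start from a fresh read so that the etag belongs to the latest version
            (T item, string etag) = await read();

            // Apply the change to the fresh copy
            modify(item);

            // Try the conditional write, a false result means that the etag has changed
            if (await update(item, etag))
            {
                return true;
            }
        }

        // Every attempt ended with an etag mismatch
        return false;
    }
}
```

With the generic client, read could wrap GetByIdWithEtag and update could wrap Update. The modify step is synchronous in this sketch, so GetLocations would need an asynchronous variant because it fetches a page of locations before each update.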