This post describes how you can create a generic Cosmos DB SQL client in ASP.NET Core. Azure Cosmos DB is a NoSQL database that is designed with horizontal partitioning and multi-master replication on a global scale. Azure Cosmos DB provides native support for NoSQL and OSS APIs including MongoDB, Cassandra, Gremlin and SQL.
Cosmos DB stores data in JSON documents and every document represent a model. A JSON document in Cosmos DB should have properties for id, partion key and type. Id and partion key is usually GUID:s, the type property is used when the document should be deserialized to a model.
Options
Our generic Cosmos DB client need options and we have created a class for these options. We use an appsettings.json file to store values for these options.
public class CosmosDatabaseOptions
{
#region Variables
public string Uri { get; set; }
public string Key { get; set; }
public string Database { get; set; }
public string Collection { get; set; }
public Int32 MaxDegreeOfParallelism { get; set; }
public Int32 MaxBufferedItemCount { get; set; }
public Int32 RequestTimeoutInSeconds { get; set; }
public Int32 RetryCount { get; set; }
public Int32 WaitTimeInSeconds { get; set; }
#endregion
#region Constructors
/// <summary>
/// Create a new post
/// </summary>
public CosmosDatabaseOptions()
{
// Set values for instance variables
this.Uri = "";
this.Key = "";
this.Database = "";
this.Collection = "";
this.MaxDegreeOfParallelism = -1;
this.MaxBufferedItemCount = 100;
this.RequestTimeoutInSeconds = 60;
this.RetryCount = 9;
this.WaitTimeInSeconds = 30;
} // End of the constructor
#endregion
} // End of the class
{
"Logging": {
"IncludeScopes": false,
"LogLevel": {
"Default": "Information"
}
},
"CosmosDatabaseOptions": {
"Uri": "https://mysite.documents.azure.com:443/",
"Key": "YOUR-KEY",
"Database": "development",
"Collection": "items",
"MaxDegreeOfParallelism": -1,
"MaxBufferedItemCount": 100
}
}
Services
We need to register options and repositories in the ConfigureServices method in the StartUp class. We register our Cosmos DB client and a static page repository that will use our client.
public void ConfigureServices(IServiceCollection services)
{
// Add the mvc framework
services.AddRazorPages();
// Create options
services.Configure<CosmosDatabaseOptions>(configuration.GetSection("CosmosDatabaseOptions"));
// Add repositories
services.AddSingleton<ICosmosDatabaseRepository, CosmosDatabaseRepository>();
services.AddSingleton<IStaticPageRepository, StaticPageRepository>();
} // End of the ConfigureServices method
Interface
public interface ICosmosDatabaseRepository
{
Task<bool> CreateDatabase();
Task<bool> CreateCollection();
Task<bool> Add<T>(T item);
Task<bool> Upsert<T>(T item);
Task<bool> Update<T>(string id, T item);
Task<bool> Update<T>(string id, T item, string etag);
Task<ModelItem<T>> GetById<T>(string id, string partion_key);
Task<ModelItem<T>> GetByIdWithEtag<T>(string id, string partion_key);
Task<ModelItem<T>> GetByQuery<T>(string sql, SqlParameterCollection parameters);
Task<ModelItem<T>> GetByQueryWithEtag<T>(string sql, SqlParameterCollection parameters);
Task<ModelPage<T>> GetListByQuery<T>(string sql, SqlParameterCollection parameters, Int32 page_size, string ct);
Task<bool> DeleteOnId(string id, string partion_key);
Task<string> RunStoredProcedure(string stored_procedure_id, string partion_key, dynamic[] parameters);
void Dispose();
} // End of the interface
Client
Our generic client for Cosmos DB includes methods for Insert, Update, Upsert, Get and Delete. The class handles logging, this can be changed if you want to return errors instead. The ModelItem class is a wrapper class that includes item, etag and a boolean for errors.
public class CosmosDatabaseRepository : ICosmosDatabaseRepository
{
#region Variables
private readonly ILogger logger;
private readonly CosmosDatabaseOptions options;
private readonly DocumentClient client;
#endregion
#region Constructors
public CosmosDatabaseRepository(ILogger<ICosmosDatabaseRepository> logger, IOptions<CosmosDatabaseOptions> options)
{
// Set values for instance variables
this.logger = logger;
this.options = options.Value;
// Create a connection policy
ConnectionPolicy connectionPolicy = new ConnectionPolicy();
connectionPolicy.RequestTimeout = TimeSpan.FromSeconds(this.options.RequestTimeoutInSeconds);
connectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = this.options.RetryCount;
connectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = this.options.WaitTimeInSeconds;
connectionPolicy.ConnectionMode = ConnectionMode.Direct;
connectionPolicy.ConnectionProtocol = Protocol.Tcp;
// Create a document client
this.client = new DocumentClient(new Uri(this.options.Uri), this.options.Key, connectionPolicy);
// Call OpenAsync to avoid startup latency on first request
this.client.OpenAsync();
} // End of the constructor
#endregion
#region Create methods
public async Task<bool> CreateDatabase()
{
try
{
await client.ReadDatabaseAsync(UriFactory.CreateDatabaseUri(this.options.Database));
}
catch (DocumentClientException de)
{
if (de.StatusCode == HttpStatusCode.NotFound)
{
await client.CreateDatabaseAsync(new Database { Id = this.options.Database });
}
else
{
// Log the exception
this.logger.LogError(de, $"Create database: {this.options.Database}", null);
return false;
}
}
// Return success
return true;
} // End of the CreateDatabase method
public async Task<bool> CreateCollection()
{
try
{
await client.ReadDocumentCollectionAsync(UriFactory.CreateDocumentCollectionUri(this.options.Database, this.options.Collection));
}
catch (DocumentClientException de)
{
if (de.StatusCode == HttpStatusCode.NotFound)
{
await client.CreateDocumentCollectionAsync(
UriFactory.CreateDatabaseUri(this.options.Database),
new DocumentCollection { Id = this.options.Collection },
new RequestOptions { OfferThroughput = 1000, PartitionKey = new PartitionKey("/id") });
}
else
{
// Log the exception
this.logger.LogError(de, $"Create collection: {this.options.Collection}", null);
return false;
}
}
// Return success
return true;
} // End of the CreateCollection method
public async Task<bool> Add<T>(T item)
{
try
{
// Create the document
await this.client.CreateDocumentAsync(UriFactory.CreateDocumentCollectionUri(this.options.Database, this.options.Collection), item);
}
catch (DocumentClientException de)
{
// Log the exception
this.logger.LogError(de, $"Add", null);
return false;
}
// Return success
return true;
} // End of the Add method
#endregion
#region Update methods
public async Task<bool> Upsert<T>(T item)
{
try
{
// Upsert the document
await this.client.UpsertDocumentAsync(UriFactory.CreateDocumentCollectionUri(this.options.Database, this.options.Collection), item);
}
catch (DocumentClientException de)
{
// Log the exception
this.logger.LogError(de, $"Upsert", null);
return false;
}
// Return success
return true;
} // End of the Upsert method
public async Task<bool> Update<T>(string id, T item)
{
try
{
await this.client.ReplaceDocumentAsync(UriFactory.CreateDocumentUri(this.options.Database, this.options.Collection, id), item);
}
catch (DocumentClientException de)
{
// Log the exception
this.logger.LogError(de, $"Update, id: {id}", null);
return false;
}
// Return success
return true;
} // End of the Update method
public async Task<bool> Update<T>(string id, T item, string etag)
{
try
{
// Create an access condition
AccessCondition ac = new AccessCondition { Condition = etag, Type = AccessConditionType.IfMatch };
// Update the document
await this.client.ReplaceDocumentAsync(UriFactory.CreateDocumentUri(this.options.Database, this.options.Collection, id), item, new RequestOptions { AccessCondition = ac });
}
catch (DocumentClientException de)
{
// Check for exceptions
if (de.StatusCode == HttpStatusCode.PreconditionFailed)
{
return false;
}
else
{
// Log the exception
this.logger.LogError(de, $"Update, id: {id}, etag: {etag}", null);
}
}
// Return a success response
return true;
} // End of the Update method
#endregion
#region Get methods
public async Task<ModelItem<T>> GetById<T>(string id, string partion_key)
{
// Create variables to return
ModelItem<T> model = new ModelItem<T>();
try
{
// Get the response
DocumentResponse<T> response = await this.client.ReadDocumentAsync<T>(UriFactory.CreateDocumentUri(this.options.Database, this.options.Collection, id), new RequestOptions { PartitionKey = new PartitionKey(partion_key) });
// Get the post
model.item = response.Document;
}
catch (DocumentClientException de)
{
// Log the exception
this.logger.LogDebug(de, $"GetById", null);
model.error = true;
}
// Return the model
return model;
} // End of the GetById method
public async Task<ModelItem<T>> GetByIdWithEtag<T>(string id, string partion_key)
{
// Create variables to return
ModelItem<T> model = new ModelItem<T>();
try
{
// Get the response
ResourceResponse<Document> response = await this.client.ReadDocumentAsync(UriFactory.CreateDocumentUri(this.options.Database, this.options.Collection, id),
new RequestOptions { PartitionKey = new PartitionKey(partion_key) });
// Get the document
Document document = response.Resource;
// Get the etag
model.etag = document.ETag;
// Get the post
model.item = (T)(dynamic)document;
}
catch (DocumentClientException de)
{
// Log the exception
this.logger.LogDebug(de, $"GetByIdWithEtag", null);
model.error = true;
}
// Return the model
return model;
} // End of the GetByIdWithEtag method
public async Task<ModelItem<T>> GetByQuery<T>(string sql, SqlParameterCollection parameters)
{
// Create variables to return
ModelItem<T> model = new ModelItem<T>();
// Set query options
FeedOptions queryOptions = new FeedOptions { EnableCrossPartitionQuery = true, MaxDegreeOfParallelism = this.options.MaxDegreeOfParallelism, MaxBufferedItemCount = this.options.MaxBufferedItemCount, MaxItemCount = 1 };
try
{
// Create a query
IDocumentQuery<T> query = this.client.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(this.options.Database, this.options.Collection),
new SqlQuerySpec
{
QueryText = sql,
Parameters = parameters
}, queryOptions).AsDocumentQuery();
// Get the response
FeedResponse<T> response = await query.ExecuteNextAsync<T>();
// Get the post
foreach (T item in response)
{
model.item = item;
break;
}
}
catch (DocumentClientException de)
{
// Log the exception
this.logger.LogError(de, $"GetByQuery", null);
model.error = true;
}
// Return the model
return model;
} // End of the GetByQuery method
public async Task<ModelItem<T>> GetByQueryWithEtag<T>(string sql, SqlParameterCollection parameters)
{
// Create variables to return
ModelItem<T> model = new ModelItem<T>();
// Set query options
FeedOptions queryOptions = new FeedOptions { EnableCrossPartitionQuery = true, MaxDegreeOfParallelism = this.options.MaxDegreeOfParallelism, MaxBufferedItemCount = this.options.MaxBufferedItemCount, MaxItemCount = 1 };
try
{
// Create a query
IDocumentQuery<Document> query = this.client.CreateDocumentQuery<Document>(UriFactory.CreateDocumentCollectionUri(this.options.Database, this.options.Collection),
new SqlQuerySpec
{
QueryText = sql,
Parameters = parameters
}, queryOptions).AsDocumentQuery();
// Get the response
FeedResponse<Document> response = await query.ExecuteNextAsync<Document>();
// Get the post
foreach (Document item in response)
{
model.item = (T)(dynamic)item;
model.etag = item.ETag;
break;
}
}
catch (DocumentClientException de)
{
// Log the exception
this.logger.LogError(de, $"GetByQueryWithEtag", null);
model.error = true;
}
// Return the model
return model;
} // End of the GetByQueryWithEtag method
public async Task<ModelPage<T>> GetListByQuery<T>(string sql, SqlParameterCollection parameters, Int32 page_size, string ct)
{
// Make sure that ct not is an empty string
ct = string.IsNullOrEmpty(ct) ? null : ct;
// Create variables to return
ModelPage<T> page = new ModelPage<T>();
// Set query options
FeedOptions queryOptions = new FeedOptions { EnableCrossPartitionQuery = true, MaxDegreeOfParallelism = this.options.MaxDegreeOfParallelism, MaxBufferedItemCount = this.options.MaxBufferedItemCount, MaxItemCount = page_size, RequestContinuation = ct };
try
{
// Create the query
IDocumentQuery<T> query = this.client.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(this.options.Database, this.options.Collection),
new SqlQuerySpec
{
QueryText = sql,
Parameters = parameters
}, queryOptions).AsDocumentQuery();
// Get the response
FeedResponse<T> response = await query.ExecuteNextAsync<T>();
// Get the continuation token
page.ct = response.ResponseContinuation;
// Get posts
page.items = response.ToList<T>();
}
catch (DocumentClientException de)
{
// Log the exception
this.logger.LogError(de, $"GetListByQuery", null);
page.error = true;
}
// Return the page
return page;
} // End of the GetListByQuery method
#endregion
#region Delete methods
public async Task<bool> DeleteOnId(string id, string partion_key)
{
// Create a boolean that indicates success
bool success = false;
try
{
// Delete a document
await this.client.DeleteDocumentAsync(UriFactory.CreateDocumentUri(this.options.Database, this.options.Collection, id), new RequestOptions { PartitionKey = new PartitionKey(partion_key) });
// The delete operation was successful
success = true;
}
catch (DocumentClientException de)
{
if (de.StatusCode == HttpStatusCode.NotFound)
{
// The post was not found, do not raise an exception
return true;
}
else
{
// Log the exception
this.logger.LogError(de, $"DeleteOnId", null);
}
}
// Return the success boolean
return success;
} // End of the DeleteOnId method
#endregion
#region Stored procedures
public async Task<string> RunStoredProcedure(string stored_procedure_id, string partion_key, dynamic[] parameters)
{
// Create a string to return
string result = "";
try
{
// Run the stored procedure
result = await this.client.ExecuteStoredProcedureAsync<string>(UriFactory.CreateStoredProcedureUri(this.options.Database, this.options.Collection, stored_procedure_id), new RequestOptions { PartitionKey = new PartitionKey(partion_key) }, parameters);
}
catch (DocumentClientException de)
{
// Log the exception
this.logger.LogError(de, $"RunStoredProcedure", null);
}
// Return the string
return result;
} // End of the RunStoredProcedure method
#endregion
#region Dispose methods
public void Dispose()
{
this.client.Dispose();
} // End of the Dispose method
#endregion
} // End of the class
How to use the client?
We have a repository that handles static pages and this repository uses our Cosmos DB client.
public class StaticPageRepository : IStaticPageRepository
{
#region Variables
private readonly ICosmosDatabaseRepository cosmos_database_repository;
#endregion
#region Constructors
public StaticPageRepository(ICosmosDatabaseRepository cosmos_database_repository)
{
this.cosmos_database_repository = cosmos_database_repository;
} // End of the constructor
#endregion
#region Add methods
public async Task<bool> Add(StaticPageDocument item)
{
// Create a document
return await this.cosmos_database_repository.Add<StaticPageDocument>(item);
} // End of the Add method
#endregion
#region Update methods
public async Task<bool> Upsert(StaticPageDocument item)
{
// Upsert a document
return await this.cosmos_database_repository.Upsert<StaticPageDocument>(item);
} // End of the Upsert method
public async Task<bool> Update(StaticPageDocument item)
{
// Replace a document
return await this.cosmos_database_repository.Update<StaticPageDocument>(item.id, item);
} // End of the Update method
#endregion
#region Get methods
public async Task<ModelItem<StaticPageDocument>> GetById(string id)
{
// Return the post
return await this.cosmos_database_repository.GetById<StaticPageDocument>(id, id);
} // End of the GetById method
public async Task<ModelItem<StaticPagePost>> GetByPageName(string page_name, string language_code)
{
// Create the sql string
string key = $"s.translations.{language_code}";
string sql = $"SELECT s.id, s.meta_robots, s.show_as_page, s.page_name, s.main_image, {key}.link_name, {key}.title, {key}.text_html, {key}.meta_description, {key}.meta_keywords "
+ $"FROM s WHERE s.model_type = @model_type AND s.page_name = @page_name";
// Create parameters
SqlParameterCollection parameters = new SqlParameterCollection()
{
new SqlParameter("@model_type", "static_page"),
new SqlParameter("@page_name", page_name)
};
// Return the post
return await this.cosmos_database_repository.GetByQuery<StaticPagePost>(sql, parameters);
} // End of the GetByPageName method
public async Task<ModelPage<StaticPagePost>> GetByConnectionId(Int32 connection_id, string language_code, string sort_field, string sort_order, Int32 page_size, string ct)
{
// Make sure that sort variables are valid
sort_field = GetValidSortField(sort_field);
sort_order = GetValidSortOrder(sort_order);
// Create the sql string
string key = $"s.translations.{language_code}";
string sql = $"SELECT s.id, s.meta_robots, s.show_as_page, s.page_name, s.main_image, {key}.link_name, {key}.title, {key}.text_html, {key}.meta_description, {key}.meta_keywords "
+ $"FROM s WHERE s.model_type = @model_type AND s.connection_id = @connection_id ORDER BY s.{sort_field} {sort_order}";
// Create parameters
SqlParameterCollection parameters = new SqlParameterCollection()
{
new SqlParameter("@model_type", "static_page"),
new SqlParameter("@connection_id", connection_id)
};
// Return posts
return await this.cosmos_database_repository.GetListByQuery<StaticPagePost>(sql, parameters, page_size, ct);
} // End of the GetByConnectionId method
public async Task<ModelPage<StaticPageMeta>> GetMetaByConnectionId(Int32 connection_id, string language_code, string sort_field, string sort_order, Int32 page_size, string ct)
{
// Make sure that sort variables are valid
sort_field = GetValidSortField(sort_field);
sort_order = GetValidSortOrder(sort_order);
// Create the sql string
string key = $"s.translations.{language_code}";
string sql = $"SELECT s.id, s.meta_robots, s.show_as_page, s.page_name, s.main_image, {key}.link_name, {key}.title, {key}.meta_description, {key}.meta_keywords "
+ $"FROM s WHERE s.model_type = @model_type AND s.connection_id = @connection_id ORDER BY s.{sort_field} {sort_order}";
// Create parameters
SqlParameterCollection parameters = new SqlParameterCollection()
{
new SqlParameter("@model_type", "static_page"),
new SqlParameter("@connection_id", connection_id)
};
// Return posts
return await this.cosmos_database_repository.GetListByQuery<StaticPageMeta>(sql, parameters, page_size, ct);
} // End of the GetMetaByConnectionId method
public async Task<ModelPage<StaticPageMeta>> GetBySearch(string keywords, string language_code, string sort_field, string sort_order, Int32 page_size, string ct)
{
// Make sure that sort variables are valid
sort_field = GetValidSortField(sort_field);
sort_order = GetValidSortOrder(sort_order);
// Check if there is keywords
bool keywords_exists = string.IsNullOrEmpty(keywords) == false ? true : false;
// Create the sql string
string key = $"s.translations.{language_code}";
string sql = $"SELECT s.id, s.meta_robots, s.show_as_page, s.page_name, s.main_image, {key}.link_name, {key}.title, {key}.meta_description, {key}.meta_keywords FROM s ";
if(keywords_exists == true)
{
sql += "JOIN keywords IN s.keywords ";
}
sql += $"WHERE s.model_type = @model_type ";
if (keywords_exists == true)
{
sql += $"AND keywords = @keywords ";
}
sql += $"ORDER BY s.{sort_field} {sort_order}";
// Create parameters
SqlParameterCollection parameters = new SqlParameterCollection();
parameters.Add(new SqlParameter("@model_type", "static_page"));
if (keywords_exists == true)
{
parameters.Add(new SqlParameter("@keywords", keywords.ToLower()));
}
// Return the list
return await this.cosmos_database_repository.GetListByQuery<StaticPageMeta>(sql, parameters, page_size, ct);
} // End of the GetBySearch method
public async Task<ModelPage<StaticPageMeta>> GetAll(string language_code, string sort_field, string sort_order, Int32 page_size, string ct)
{
// Make sure that sort variables are valid
sort_field = GetValidSortField(sort_field);
sort_order = GetValidSortOrder(sort_order);
// Create the sql string
string key = $"s.translations.{language_code}";
string sql = $"SELECT s.id, s.meta_robots, s.show_as_page, s.page_name, s.main_image, {key}.link_name, {key}.title, {key}.meta_description, {key}.meta_keywords "
+ $"FROM s WHERE s.model_type = @model_type ORDER BY s.{sort_field} {sort_order}";
// Create parameters
SqlParameterCollection parameters = new SqlParameterCollection()
{
new SqlParameter("@model_type", "static_page"),
new SqlParameter("@connection_id", 0),
new SqlParameter("@show_as_page", 1)
};
// Return posts
return await this.cosmos_database_repository.GetListByQuery<StaticPageMeta>(sql, parameters, page_size, ct);
} // End of the GetAll method
#endregion
#region Delete methods
public async Task<bool> DeleteOnId(string id)
{
// Delete a document
return await this.cosmos_database_repository.DeleteOnId(id, id);
} // End of the DeleteOnId method
#endregion
#region Validation
public string GetValidSortField(string sort_field)
{
// Make sure that the sort field is valid
if (sort_field != "date_updated" && sort_field != "page_name")
{
sort_field = "page_name";
}
// Return the string
return sort_field;
} // End of the GetValidSortField method
public string GetValidSortOrder(string sort_order)
{
// Make sure that the sort order is valid
if (sort_order != "ASC" && sort_order != "DESC")
{
sort_order = "ASC";
}
// Return the string
return sort_order;
} // End of the GetValidSortOrder method
#endregion
} // End of the class