
Context Transfer Engine (CTE) SDK

Overview

The Context Transfer Engine (CTE) Core is a high-performance distributed storage middleware system built on the Chimaera framework. It provides a flexible blob storage API with advanced features including:

  • Multi-target Storage Management: Register and manage multiple storage backends (file, RAM, NVMe)
  • Blob Storage with Tags: Store and retrieve data blobs with tag-based organization
  • Block-based Data Management: Efficient block-level data placement across multiple targets
  • Performance Monitoring: Built-in telemetry and performance metrics collection
  • Configurable Data Placement: Multiple data placement algorithms (random, round-robin, max bandwidth)
  • Asynchronous Operations: Both synchronous and asynchronous APIs for all operations

CTE Core implements a ChiMod (Chimaera Module) that integrates with the Chimaera distributed runtime system, providing scalable data management across multiple nodes in a cluster.

Installation & Linking

Prerequisites

  • CMake 3.20 or higher
  • C++17 compatible compiler
  • Chimaera framework (chimaera and chimaera_admin packages)
  • yaml-cpp library
  • Python 3.7+ (for Python bindings)
  • nanobind (for Python bindings)

Dependencies

Our Docker container has all dependencies installed for you:

docker pull iowarp/iowarp-build:latest

Building CTE Core

# Clone the repository
git clone https://github.com/iowarp/content-transfer-engine.git
cd content-transfer-engine

# Create build directory
mkdir build && cd build

# Configure with CMake (Debug build)
cmake .. -DCMAKE_BUILD_TYPE=Debug

# Build the project
make -j

# Install (optional)
sudo make install

Linking to CTE Core in CMake Projects

Add the following to your CMakeLists.txt:

# Find the CTE Core package
find_package(wrp_cte_core REQUIRED)

# Create your executable or library
add_executable(my_app main.cpp)

# Link against CTE Core libraries using modern target aliases
target_link_libraries(my_app
  PRIVATE
    wrp_cte::core_client  # CTE Core client library
)

Package and Target Naming

CTE Core follows the Chimaera ChiMod naming conventions:

  • Package Name: wrp_cte_core (for find_package(wrp_cte_core REQUIRED))
  • Target Aliases: wrp_cte::core_client, wrp_cte::core_runtime (recommended for linking)
  • Actual Targets: wrp_cte_core_client, wrp_cte_core_runtime
  • Library Files: libwrp_cte_core_client.so, libwrp_cte_core_runtime.so
  • Include Path: wrp_cte/core/ (e.g., #include <wrp_cte/core/core_client.h>)

Dependency Management

The CTE Core ChiMod targets automatically include all required dependencies:

  • Core Chimaera Framework: Automatically linked via wrp_cte::core_client target
  • Admin ChiMod: Available via chimaera::admin_client if needed
  • Include Paths: Automatically configured by ChiMod targets
  • System Dependencies: Handled by the build system (threading, YAML, etc.)

External applications only need to link against the CTE Core targets; all framework dependencies are resolved automatically.

API Reference

Core Client Class

The main entry point for CTE Core functionality is the wrp_cte::core::Client class.

Class Definition

namespace wrp_cte::core {

class Client : public chi::ContainerClient {
 public:
  // Constructors
  Client();
  explicit Client(const chi::PoolId &pool_id);

  // Container lifecycle
  void Create(const hipc::MemContext &mctx,
              const chi::PoolQuery &pool_query,
              const std::string &pool_name,
              const chi::PoolId &custom_pool_id,
              const CreateParams &params = CreateParams());

  // Target management
  chi::u32 RegisterTarget(const hipc::MemContext &mctx,
                          const std::string &target_name,
                          chimaera::bdev::BdevType bdev_type,
                          chi::u64 total_size,
                          const chi::PoolQuery &target_query = chi::PoolQuery::Local(),
                          const chi::PoolId &bdev_id = chi::PoolId::GetNull());

  chi::u32 UnregisterTarget(const hipc::MemContext &mctx,
                            const std::string &target_name);

  std::vector<std::string> ListTargets(const hipc::MemContext &mctx);

  chi::u32 StatTargets(const hipc::MemContext &mctx);

  // Tag management
  TagId GetOrCreateTag(const hipc::MemContext &mctx,
                       const std::string &tag_name,
                       const TagId &tag_id = TagId::GetNull());

  bool DelTag(const hipc::MemContext &mctx, const TagId &tag_id);
  bool DelTag(const hipc::MemContext &mctx, const std::string &tag_name);

  size_t GetTagSize(const hipc::MemContext &mctx, const TagId &tag_id);

  // Blob operations
  bool PutBlob(const hipc::MemContext &mctx, const TagId &tag_id,
               const std::string &blob_name,
               chi::u64 offset, chi::u64 size, hipc::Pointer blob_data,
               float score, chi::u32 flags);

  bool GetBlob(const hipc::MemContext &mctx, const TagId &tag_id,
               const std::string &blob_name,
               chi::u64 offset, chi::u64 size, chi::u32 flags,
               hipc::Pointer blob_data);

  bool DelBlob(const hipc::MemContext &mctx, const TagId &tag_id,
               const std::string &blob_name);

  chi::u32 ReorganizeBlob(const hipc::MemContext &mctx,
                          const TagId &tag_id,
                          const std::string &blob_name,
                          float new_score);

  // Blob metadata operations
  float GetBlobScore(const hipc::MemContext &mctx, const TagId &tag_id,
                     const std::string &blob_name);

  chi::u64 GetBlobSize(const hipc::MemContext &mctx, const TagId &tag_id,
                       const std::string &blob_name);

  std::vector<std::string> GetContainedBlobs(const hipc::MemContext &mctx,
                                             const TagId &tag_id);

  // Telemetry
  std::vector<CteTelemetry> PollTelemetryLog(const hipc::MemContext &mctx,
                                             std::uint64_t minimum_logical_time);

  // Async variants (all methods have Async versions)
  hipc::FullPtr<CreateTask> AsyncCreate(...);
  hipc::FullPtr<RegisterTargetTask> AsyncRegisterTarget(...);
  hipc::FullPtr<UnregisterTargetTask> AsyncUnregisterTarget(...);
  hipc::FullPtr<ListTargetsTask> AsyncListTargets(...);
  hipc::FullPtr<StatTargetsTask> AsyncStatTargets(...);
  hipc::FullPtr<GetOrCreateTagTask<CreateParams>> AsyncGetOrCreateTag(...);
  hipc::FullPtr<DelTagTask> AsyncDelTag(...);
  hipc::FullPtr<GetTagSizeTask> AsyncGetTagSize(...);
  hipc::FullPtr<PutBlobTask> AsyncPutBlob(...);
  hipc::FullPtr<GetBlobTask> AsyncGetBlob(...);
  hipc::FullPtr<DelBlobTask> AsyncDelBlob(...);
  hipc::FullPtr<ReorganizeBlobTask> AsyncReorganizeBlob(...);
  hipc::FullPtr<GetBlobScoreTask> AsyncGetBlobScore(...);
  hipc::FullPtr<GetBlobSizeTask> AsyncGetBlobSize(...);
  hipc::FullPtr<GetContainedBlobsTask> AsyncGetContainedBlobs(...);
  hipc::FullPtr<PollTelemetryLogTask> AsyncPollTelemetryLog(...);
};

}  // namespace wrp_cte::core

Tag Wrapper Class

The wrp_cte::core::Tag class provides a simplified, object-oriented interface for blob operations within a specific tag. This wrapper class eliminates the need to pass TagId and memory context parameters for each operation, making the API more convenient and less error-prone.

Class Definition

namespace wrp_cte::core {

class Tag {
 private:
  TagId tag_id_;
  std::string tag_name_;

 public:
  // Constructors
  explicit Tag(const std::string &tag_name);  // Creates or gets existing tag
  explicit Tag(const TagId &tag_id);          // Uses existing TagId directly

  // Blob storage operations
  void PutBlob(const std::string &blob_name, const char *data, size_t data_size,
               size_t off = 0);
  void PutBlob(const std::string &blob_name, const hipc::Pointer &data, size_t data_size,
               size_t off = 0, float score = 1.0f);

  // Asynchronous blob storage
  hipc::FullPtr<PutBlobTask> AsyncPutBlob(const std::string &blob_name,
                                          const hipc::Pointer &data,
                                          size_t data_size, size_t off = 0,
                                          float score = 1.0f);

  // Blob retrieval operations
  void GetBlob(const std::string &blob_name, char *data, size_t data_size,
               size_t off = 0);  // Automatic memory management
  void GetBlob(const std::string &blob_name, hipc::Pointer data, size_t data_size,
               size_t off = 0);  // Manual memory management

  // Blob metadata operations
  float GetBlobScore(const std::string &blob_name);
  chi::u64 GetBlobSize(const std::string &blob_name);
  std::vector<std::string> GetContainedBlobs();

  // Tag accessor
  const TagId& GetTagId() const { return tag_id_; }
};

}  // namespace wrp_cte::core

Key Features

  • Automatic Tag Management: Constructor with tag name automatically creates or retrieves existing tags
  • Simplified API: No need to pass TagId or MemContext for each operation
  • Memory Management: Raw data variant automatically handles shared memory allocation and cleanup
  • Exception Safety: Operations throw exceptions on failure for clear error handling
  • Score Support: Blob scoring for intelligent data placement across storage targets
  • Blob Enumeration: GetContainedBlobs() method returns all blob names in the tag

Memory Management Guidelines

For Synchronous Operations:

  • Raw data variant (const char*) automatically manages shared memory lifecycle
  • Shared memory variant requires caller to manage hipc::Pointer lifecycle

For Asynchronous Operations:

  • Only shared memory variant available to avoid memory lifecycle issues
  • Caller must keep hipc::FullPtr<char> alive until async task completes
  • See usage examples below for proper async memory management patterns

Data Structures

CreateParams

Configuration parameters for CTE container creation:

struct CreateParams {
  chi::string config_file_path_;  // YAML config file path
  chi::u32 worker_count_;         // Number of worker threads (default: 4)

  CreateParams();
  CreateParams(const hipc::CtxAllocator<CHI_MAIN_ALLOC_T> &alloc,
               const std::string &config_file_path = "",
               chi::u32 worker_count = 4);
};
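
For reference, the sketch below shows how these parameters could be passed to a manual Client::Create call; the pool name is hypothetical, and most applications instead use WRP_CTE_CLIENT_INIT (see Global Access), which performs this step automatically.

// Hedged sketch: manual container creation with default CreateParams.
// "wrp_cte_core_pool" is a hypothetical pool name.
wrp_cte::core::Client client;
hipc::MemContext mctx;
wrp_cte::core::CreateParams params;  // defaults: empty config path, 4 workers
client.Create(mctx, chi::PoolQuery::Dynamic(), "wrp_cte_core_pool",
              chi::PoolId::GetNull(), params);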

ListTargets Return Type

The ListTargets method returns a vector of target names (strings):

std::vector<std::string> ListTargets(const hipc::MemContext &mctx);

Example usage:

auto target_names = cte_client->ListTargets(mctx);
for (const auto& target_name : target_names) {
  std::cout << "Target: " << target_name << "\n";
}

GetOrCreateTag Return Type

The GetOrCreateTag method returns a TagId directly:

TagId GetOrCreateTag(const hipc::MemContext &mctx,
                     const std::string &tag_name,
                     const TagId &tag_id = TagId::GetNull());

Example usage:

TagId tag_id = cte_client->GetOrCreateTag(mctx, "my_dataset");

BlobInfo

Blob metadata and block management:

struct BlobInfo {
  BlobId blob_id_;
  std::string blob_name_;
  std::vector<BlobBlock> blocks_;  // Ordered blocks making up the blob
  float score_;                    // 0-1 score for reorganization
  Timestamp last_modified_;
  Timestamp last_read_;

  chi::u64 GetTotalSize() const;   // Total size from all blocks
};

Note: Individual blob sizes can be queried efficiently using Client::GetBlobSize() or Tag::GetBlobSize() without needing to retrieve full BlobInfo.

BlobBlock

Individual block within a blob:

struct BlobBlock {
  chimaera::bdev::Client bdev_client_;  // Target client for this block
  chi::u64 target_offset_;              // Offset within target
  chi::u64 size_;                       // Size of this block
};
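
As a rough illustration (not from the source), GetTotalSize() can be thought of as the sum of the sizes of the blocks that make up the blob:

// Illustrative only: equivalent to BlobInfo::GetTotalSize(), assuming the
// blocks_ vector covers the blob without gaps.
chi::u64 SumBlockSizes(const BlobInfo &info) {
  chi::u64 total = 0;
  for (const auto &block : info.blocks_) {
    total += block.size_;
  }
  return total;
}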

CteTelemetry

Telemetry data for performance monitoring:

struct CteTelemetry {
  CteOp op_;        // Operation type
  size_t off_;      // Offset within blob
  size_t size_;     // Size of operation
  BlobId blob_id_;
  TagId tag_id_;
  Timestamp mod_time_;
  Timestamp read_time_;
  std::uint64_t logical_time_;  // For ordering entries
};

enum class CteOp : chi::u32 {
  kPutBlob = 0,
  kGetBlob = 1,
  kDelBlob = 2,
  kGetOrCreateTag = 3,
  kDelTag = 4,
  kGetTagSize = 5,
  kGetBlobScore = 6,
  kGetBlobSize = 7
};
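
As a small, hypothetical example of consuming these structures, the sketch below aggregates bytes moved per operation type from one telemetry poll (it assumes a client and memory context obtained as in the usage examples later in this document):

// Hedged sketch: aggregate bytes per operation type from a telemetry poll.
#include <map>

std::map<wrp_cte::core::CteOp, size_t> bytes_per_op;
auto entries = cte_client->PollTelemetryLog(mctx, 0);
for (const auto &entry : entries) {
  bytes_per_op[entry.op_] += entry.size_;
}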

Global Access

CTE Core provides singleton access patterns:

// Initialize CTE client (must be called once)
// NOTE: This automatically calls CHIMAERA_CLIENT_INIT internally
// config_path: Optional path to YAML configuration file
// pool_query: Pool query type for CTE container creation (default: Dynamic)
bool WRP_CTE_CLIENT_INIT(const std::string &config_path = "",
                         const chi::PoolQuery &pool_query = chi::PoolQuery::Dynamic());

// Access global CTE client instance
auto *client = WRP_CTE_CLIENT;

Important Notes:

  • WRP_CTE_CLIENT_INIT automatically calls CHIMAERA_CLIENT_INIT internally
  • You do NOT need to call CHIMAERA_CLIENT_INIT separately when using CTE Core
  • Configuration is managed per-Runtime instance (no global ConfigManager singleton)
  • The config file path can also be specified via the WRP_CTE_CONF environment variable

Usage Examples

Basic Initialization

#include <chimaera/chimaera.h>
#include <wrp_cte/core/core_client.h>
#include <wrp_cte/core/core_tasks.h>

int main() {
  // Initialize Chimaera runtime
  chi::CHIMAERA_RUNTIME_INIT();

  // Initialize CTE subsystem
  // NOTE: WRP_CTE_CLIENT_INIT automatically calls CHIMAERA_CLIENT_INIT internally
  // You do NOT need to call CHIMAERA_CLIENT_INIT separately
  wrp_cte::core::WRP_CTE_CLIENT_INIT("/path/to/config.yaml");

  // Get global CTE client instance (created during initialization)
  auto *cte_client = WRP_CTE_CLIENT;

  // The CTE client is now ready to use - no need to call Create() again
  // The client is automatically initialized with the pool specified during WRP_CTE_CLIENT_INIT

  return 0;
}

Registering Storage Targets

// Get global CTE client
auto *cte_client = WRP_CTE_CLIENT;
hipc::MemContext mctx;

// Register a file-based storage target
std::string target_path = "/mnt/nvme/cte_storage.dat";
chi::u64 target_size = 100ULL * 1024 * 1024 * 1024; // 100GB

chi::u32 result = cte_client->RegisterTarget(
    mctx,
    target_path,
    chimaera::bdev::BdevType::kFile,
    target_size
);

if (result == 0) {
  std::cout << "Target registered successfully\n";
}

// Register a RAM-based cache target
result = cte_client->RegisterTarget(
    mctx,
    "/tmp/cte_cache",
    chimaera::bdev::BdevType::kRam,
    8ULL * 1024 * 1024 * 1024  // 8GB
);

// List all registered targets
auto targets = cte_client->ListTargets(mctx);
for (const auto& target_name : targets) {
  std::cout << "Target: " << target_name << "\n";
}
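
Targets can also be removed. A short sketch, following the same error-code conventions used elsewhere in this document and reusing the client and memory context from the example above:

// Hedged sketch: unregister a target that is no longer needed.
chi::u32 rc = cte_client->UnregisterTarget(mctx, "/tmp/cte_cache");
if (rc != 0) {
  std::cerr << "UnregisterTarget failed with code " << rc << "\n";
}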

Working with Tags and Blobs

Using the Core Client Directly

// Get global CTE client
auto *cte_client = WRP_CTE_CLIENT;
hipc::MemContext mctx;

// Create or get a tag for grouping related blobs
TagId tag_id = cte_client->GetOrCreateTag(mctx, "dataset_v1");

// Prepare data for storage
std::vector<char> data(1024 * 1024); // 1MB of data
std::fill(data.begin(), data.end(), 'A');

// Allocate shared memory for the data
// NOTE: AllocateBuffer is NOT templated - it returns hipc::FullPtr<char>
hipc::FullPtr<char> shm_buffer = CHI_IPC->AllocateBuffer(data.size());
memcpy(shm_buffer.ptr_, data.data(), data.size());

bool success = cte_client->PutBlob(
    mctx,
    tag_id,
    "blob_001",       // Blob name
    0,                // Offset
    data.size(),      // Size
    shm_buffer.shm_,  // Shared memory pointer
    0.8f,             // Score (0-1, higher = hotter data)
    0                 // Flags
);

if (success) {
  std::cout << "Blob stored successfully\n";

  // Get blob size
  chi::u64 blob_size = cte_client->GetBlobSize(mctx, tag_id, "blob_001");
  std::cout << "Stored blob size: " << blob_size << " bytes\n";

  // Get blob score
  float blob_score = cte_client->GetBlobScore(mctx, tag_id, "blob_001");
  std::cout << "Blob score: " << blob_score << "\n";
}

// Retrieve the blob
auto retrieve_buffer = CHI_IPC->AllocateBuffer(data.size());
success = cte_client->GetBlob(
    mctx,
    tag_id,
    "blob_001",            // Blob name for lookup
    0,                     // Offset
    data.size(),           // Size to read
    0,                     // Flags
    retrieve_buffer.shm_   // Buffer for data
);

// Get all blob names in the tag
std::vector<std::string> blob_names = cte_client->GetContainedBlobs(mctx, tag_id);
std::cout << "Tag contains " << blob_names.size() << " blobs\n";
for (const auto& name : blob_names) {
  std::cout << " - " << name << "\n";
}

// Get total size of all blobs in tag
size_t tag_size = cte_client->GetTagSize(mctx, tag_id);
std::cout << "Tag total size: " << tag_size << " bytes\n";

// Delete a specific blob
success = cte_client->DelBlob(mctx, tag_id, "blob_001");

// Delete entire tag (removes all blobs)
success = cte_client->DelTag(mctx, tag_id);

Using the Tag Wrapper Class

// Create tag wrapper - automatically creates or gets existing tag
wrp_cte::core::Tag dataset_tag("dataset_v1");

// Prepare data for storage
std::vector<char> data(1024 * 1024); // 1MB of data
std::fill(data.begin(), data.end(), 'A');

try {
  // Store blob - automatically handles shared memory management
  dataset_tag.PutBlob("blob_001", data.data(), data.size());
  std::cout << "Blob stored successfully\n";

  // Get blob size
  chi::u64 blob_size = dataset_tag.GetBlobSize("blob_001");
  std::cout << "Stored blob size: " << blob_size << " bytes\n";

  // Get blob score
  float blob_score = dataset_tag.GetBlobScore("blob_001");
  std::cout << "Blob score: " << blob_score << "\n";

  // Retrieve the blob using automatic memory management (recommended)
  std::vector<char> retrieve_data(blob_size);
  dataset_tag.GetBlob("blob_001", retrieve_data.data(), blob_size);

  // Alternative: Retrieve using manual shared memory management
  // auto retrieve_buffer = CHI_IPC->AllocateBuffer(blob_size);
  // dataset_tag.GetBlob("blob_001", retrieve_buffer.shm_, blob_size);

  std::cout << "Blob retrieved successfully\n";

  // Get all blobs in the tag
  std::vector<std::string> blob_names = dataset_tag.GetContainedBlobs();
  std::cout << "Tag contains " << blob_names.size() << " blobs\n";

} catch (const std::exception& e) {
  std::cerr << "Error: " << e.what() << "\n";
}

// For tag-level operations, you still need the core client:
auto *cte_client = WRP_CTE_CLIENT;
hipc::MemContext mctx;

// Get total size of all blobs in tag
size_t tag_size = cte_client->GetTagSize(mctx, dataset_tag.GetTagId());
std::cout << "Tag total size: " << tag_size << " bytes\n";

// Delete entire tag (removes all blobs)
bool success = cte_client->DelTag(mctx, dataset_tag.GetTagId());

Tag Wrapper Usage Examples

The Tag wrapper class provides a more convenient interface for blob operations within a specific tag. Here are comprehensive examples showing different usage patterns:

Basic Tag Wrapper Operations

#include <wrp_cte/core/core_client.h>
#include <iostream>
#include <vector>

// Initialize CTE system (same as before)
// ... initialization code ...

// Create a tag wrapper - automatically creates or gets existing tag
wrp_cte::core::Tag dataset_tag("dataset_v1");

// Store data using the simple raw data interface
std::vector<char> data(1024 * 1024); // 1MB of data
std::fill(data.begin(), data.end(), 'X');

try {
  // Simple PutBlob - automatically manages shared memory
  dataset_tag.PutBlob("sample_blob", data.data(), data.size());
  std::cout << "Blob stored successfully\n";

  // Get blob size without retrieving data
  chi::u64 blob_size = dataset_tag.GetBlobSize("sample_blob");
  std::cout << "Blob size: " << blob_size << " bytes\n";

  // Get blob score (data temperature)
  float blob_score = dataset_tag.GetBlobScore("sample_blob");
  std::cout << "Blob score: " << blob_score << "\n";

} catch (const std::exception& e) {
  std::cerr << "Error: " << e.what() << "\n";
}

Memory Management: Automatic vs Manual

The Tag class provides two GetBlob variants to suit different memory management preferences:

wrp_cte::core::Tag data_tag("performance_data");

try {
  // Store some test data
  std::string test_data = "Sample blob data for retrieval testing";
  data_tag.PutBlob("test_blob", test_data.c_str(), test_data.size());

  chi::u64 blob_size = data_tag.GetBlobSize("test_blob");
  std::cout << "Blob size: " << blob_size << " bytes\n";

  // Method 1: Automatic memory management (recommended for most use cases)
  std::vector<char> auto_buffer(blob_size);
  data_tag.GetBlob("test_blob", auto_buffer.data(), blob_size);
  std::cout << "Retrieved with automatic memory management\n";

  // Method 2: Manual shared memory management (for advanced use cases)
  auto shm_buffer = CHI_IPC->AllocateBuffer(blob_size);
  if (!shm_buffer.IsNull()) {
    data_tag.GetBlob("test_blob", shm_buffer.shm_, blob_size);
    std::cout << "Retrieved with manual shared memory management\n";
    // shm_buffer automatically freed when it goes out of scope
  }

  // Method 1 is preferred because:
  // - No shared memory allocation required
  // - Automatic cleanup via RAII
  // - Works with standard C++ containers
  // - Simpler error handling

} catch (const std::exception& e) {
  std::cerr << "Memory management example error: " << e.what() << "\n";
}

Advanced Tag Wrapper with Scoring

// Create tag wrapper for time-series data
wrp_cte::core::Tag timeseries_tag("timeseries_2024");

// Store multiple data chunks with different scores (data temperatures)
std::vector<std::vector<char>> chunks;
std::vector<float> scores = {0.9f, 0.7f, 0.5f, 0.2f}; // Hot to cold data
std::vector<std::string> chunk_names = {"latest", "recent", "old", "archived"};

for (size_t i = 0; i < 4; ++i) {
  chunks.emplace_back(1024 * 512);  // 512KB chunks
  std::fill(chunks[i].begin(), chunks[i].end(), 'A' + i);

  try {
    // For custom scoring, use shared memory version:
    auto shm_ptr = CHI_IPC->AllocateBuffer(chunks[i].size());
    memcpy(shm_ptr.ptr_, chunks[i].data(), chunks[i].size());
    timeseries_tag.PutBlob(chunk_names[i], shm_ptr.shm_, chunks[i].size(), 0, scores[i]);

    std::cout << "Stored chunk '" << chunk_names[i] << "' with score " << scores[i] << "\n";

  } catch (const std::exception& e) {
    std::cerr << "Failed to store chunk " << chunk_names[i] << ": " << e.what() << "\n";
  }
}

Blob Retrieval with Tag Wrapper

// Create tag wrapper from existing TagId
TagId existing_tag_id = /* ... get from somewhere ... */;
wrp_cte::core::Tag existing_tag(existing_tag_id);

try {
  // First, check if blob exists and get its size
  chi::u64 blob_size = existing_tag.GetBlobSize("target_blob");
  if (blob_size == 0) {
    std::cout << "Blob 'target_blob' not found or empty\n";
    return;
  }

  std::cout << "Blob size: " << blob_size << " bytes\n";

  // Allocate shared memory buffer for retrieval
  auto retrieve_buffer = CHI_IPC->AllocateBuffer(blob_size);
  if (retrieve_buffer.IsNull()) {
    throw std::runtime_error("Failed to allocate retrieval buffer");
  }

  // Retrieve the blob
  existing_tag.GetBlob("target_blob", retrieve_buffer.shm_, blob_size);

  // Process the retrieved data
  ProcessBlobData(retrieve_buffer.ptr_, blob_size);

  std::cout << "Successfully retrieved and processed blob\n";

} catch (const std::exception& e) {
  std::cerr << "Blob retrieval error: " << e.what() << "\n";
}

Asynchronous Operations with Tag Wrapper

wrp_cte::core::Tag async_tag("async_operations");

// Prepare data for async operations
std::vector<std::vector<char>> async_data;
std::vector<hipc::FullPtr<char>> shm_buffers;
std::vector<hipc::FullPtr<PutBlobTask>> async_tasks;

for (int i = 0; i < 5; ++i) {
  // Prepare data
  async_data.emplace_back(1024 * 256);  // 256KB each
  std::fill(async_data[i].begin(), async_data[i].end(), 'Z' - i);

  // Allocate shared memory (must keep alive until async operation completes)
  auto shm_buffer = CHI_IPC->AllocateBuffer(async_data[i].size());
  if (shm_buffer.IsNull()) {
    std::cerr << "Failed to allocate shared memory for async operation " << i << "\n";
    continue;
  }

  // Copy data to shared memory
  memcpy(shm_buffer.ptr_, async_data[i].data(), async_data[i].size());

  try {
    // Start async operation (returns immediately)
    auto task = async_tag.AsyncPutBlob(
        "async_blob_" + std::to_string(i),
        shm_buffer.shm_,
        async_data[i].size(),
        0,     // offset
        0.6f   // score
    );

    // Store references to keep alive
    shm_buffers.push_back(std::move(shm_buffer));
    async_tasks.push_back(task);

    std::cout << "Started async put for blob " << i << "\n";

  } catch (const std::exception& e) {
    std::cerr << "Failed to start async put " << i << ": " << e.what() << "\n";
  }
}

// Wait for all async operations to complete
std::cout << "Waiting for async operations to complete...\n";
for (size_t i = 0; i < async_tasks.size(); ++i) {
  try {
    async_tasks[i]->Wait();
    if (async_tasks[i]->result_code_ == 0) {
      std::cout << "Async operation " << i << " completed successfully\n";
    } else {
      std::cout << "Async operation " << i << " failed with code "
                << async_tasks[i]->result_code_ << "\n";
    }

    // Clean up task
    CHI_IPC->DelTask(async_tasks[i]);

  } catch (const std::exception& e) {
    std::cerr << "Error waiting for async operation " << i << ": " << e.what() << "\n";
  }
}

// Note: shm_buffers will be automatically cleaned up when they go out of scope

Asynchronous Operations

// Asynchronous blob operations for better performance
auto put_task = cte_client->AsyncPutBlob(
    mctx, tag_id, "async_blob", BlobId::GetNull(),
    0, data.size(), data_ptr, 0.5f, 0
);

// Do other work while blob is being stored
ProcessOtherData();

// Wait for completion
put_task->Wait();
if (put_task->result_code_ == 0) {
  std::cout << "Async put completed successfully\n";
}

// Clean up task
CHI_IPC->DelTask(put_task);

// Multiple async operations
std::vector<hipc::FullPtr<PutBlobTask>> tasks;
for (int i = 0; i < 10; ++i) {
  auto task = cte_client->AsyncPutBlob(
      mctx, tag_id,
      "blob_" + std::to_string(i),
      BlobId::GetNull(),
      0, data.size(), data_ptr, 0.5f, 0
  );
  tasks.push_back(task);
}

// Wait for all to complete
for (auto& task : tasks) {
  task->Wait();
  CHI_IPC->DelTask(task);
}

Performance Monitoring

// Poll telemetry log for performance analysis
std::uint64_t last_logical_time = 0;

auto telemetry = cte_client->PollTelemetryLog(mctx, last_logical_time);

for (const auto& entry : telemetry) {
  std::cout << "Operation: ";
  switch (entry.op_) {
    case CteOp::kPutBlob: std::cout << "PUT"; break;
    case CteOp::kGetBlob: std::cout << "GET"; break;
    case CteOp::kDelBlob: std::cout << "DEL"; break;
    case CteOp::kGetBlobScore: std::cout << "GET_SCORE"; break;
    case CteOp::kGetBlobSize: std::cout << "GET_SIZE"; break;
    case CteOp::kGetOrCreateTag: std::cout << "GET_TAG"; break;
    case CteOp::kDelTag: std::cout << "DEL_TAG"; break;
    case CteOp::kGetTagSize: std::cout << "TAG_SIZE"; break;
    default: std::cout << "OTHER"; break;
  }
  std::cout << " Size: " << entry.size_
            << " Offset: " << entry.off_
            << " LogicalTime: " << entry.logical_time_ << "\n";
}

// Update target statistics (per-target byte and operation counters are
// refreshed internally; ListTargets returns the target names)
cte_client->StatTargets(mctx);

auto targets = cte_client->ListTargets(mctx);
for (const auto& target_name : targets) {
  std::cout << "Target: " << target_name << "\n";
}

Blob Reorganization

// Reorganize blobs based on new access patterns
// Higher scores (closer to 1.0) indicate hotter data

TagId tag_id = cte_client->GetOrCreateTag(mctx, "dataset_v1");

// Reorganize multiple blobs by calling ReorganizeBlob once per blob
std::vector<std::string> blob_names = {"blob_001", "blob_002", "blob_003"};
std::vector<float> new_scores = {0.95f, 0.7f, 0.3f};  // Hot, warm, cold

for (size_t i = 0; i < blob_names.size(); ++i) {
  chi::u32 result = cte_client->ReorganizeBlob(mctx, tag_id, blob_names[i], new_scores[i]);
  if (result == 0) {
    std::cout << "Blob " << blob_names[i] << " reorganized successfully\n";
  }
}

// Example: Reorganize single blob
chi::u32 result = cte_client->ReorganizeBlob(mctx, tag_id, "important_blob", 0.95f);
if (result == 0) {
  std::cout << "Single blob reorganized successfully\n";
}

Configuration

CTE Core uses YAML configuration files for runtime parameters. Configuration can be loaded from:

  1. A file path specified during initialization
  2. Environment variable WRP_CTE_CONF
  3. Programmatically via the Config API

Configuration File Format

# Worker thread configuration
worker_count: 4

# Target management settings
targets:
  max_targets: 100
  default_target_timeout_ms: 30000
  auto_unregister_failed: true

# Performance tuning
performance:
  target_stat_interval_ms: 5000    # Target statistics update interval
  blob_cache_size_mb: 512          # Cache size for blob operations
  max_concurrent_operations: 64    # Max concurrent I/O operations
  score_threshold: 0.7             # Threshold for blob reorganization

# Queue configuration for different operation types
queues:
  target_management:
    lane_count: 2
    priority: "low_latency"

  tag_management:
    lane_count: 2
    priority: "low_latency"

  blob_operations:
    lane_count: 4
    priority: "high_latency"

  stats:
    lane_count: 1
    priority: "high_latency"

# Storage device configuration
storage:
  # Primary high-performance storage with manual tier score
  - path: "/mnt/nvme/cte_primary"
    bdev_type: "file"
    capacity_limit: "1TB"
    score: 0.9                     # Optional: Manual tier score (0.0-1.0)

  # RAM-based cache (highest tier)
  - path: "/tmp/cte_cache"
    bdev_type: "ram"
    capacity_limit: "8GB"
    score: 1.0                     # Optional: Manual tier score for fastest access

  # Secondary storage (uses automatic scoring)
  - path: "/mnt/ssd/cte_secondary"
    bdev_type: "file"
    capacity_limit: "500GB"
    # No score specified - uses automatic bandwidth-based scoring

# Data Placement Engine configuration
dpe:
  dpe_type: "max_bw"               # Options: "random", "round_robin", "max_bw"

Programmatic Configuration

Configuration in CTE Core is now managed per-Runtime instance, not through a global singleton. Configuration is loaded during initialization through the WRP_CTE_CLIENT_INIT function.

#include <wrp_cte/core/core_client.h>

// Initialize CTE with configuration file
// Configuration is passed to the Runtime during creation
bool success = wrp_cte::core::WRP_CTE_CLIENT_INIT("/path/to/config.yaml");

// Or use environment variable WRP_CTE_CONF
// export WRP_CTE_CONF=/path/to/config.yaml
success = wrp_cte::core::WRP_CTE_CLIENT_INIT();

// Configuration is now embedded in the Runtime instance
// and cannot be modified after initialization

Note: The ConfigManager singleton has been removed. Configuration is now:

  • Loaded once during WRP_CTE_CLIENT_INIT
  • Embedded in the CTE Runtime instance via CreateParams
  • Immutable after initialization
  • Can be specified via file path parameter or WRP_CTE_CONF environment variable

Queue Priority Options

  • "low_latency" - Optimized for minimal latency (chi::kLowLatency)
  • "high_latency" - Optimized for throughput (chi::kHighLatency)

Storage Device Types

  • "file" - File-based block device
  • "ram" - RAM-based block device (for caching)
  • "dev_dax" - Persistent memory device
  • "posix" - POSIX file system interface

Manual Tier Scoring

Storage devices support optional manual tier scoring to override automatic bandwidth-based tier assignment:

Configuration Parameters

  • score (optional, float 0.0-1.0): Manual tier score for the storage device
    • 1.0: Highest tier (fastest access, e.g., RAM, high-end NVMe)
    • 0.8-0.9: High-performance tier (e.g., NVMe SSDs)
    • 0.5-0.7: Medium-performance tier (e.g., SATA SSDs)
    • 0.1-0.4: Low-performance tier (e.g., HDDs, network storage)
    • Not specified: Uses automatic bandwidth-based scoring

Behavior

  • Manual scores are preserved during target statistics updates
  • Targets with manual scores will not be overwritten by automatic scoring algorithms
  • Data placement engines use these scores for intelligent tier selection
  • Mixed configurations (some manual, some automatic) are fully supported

Example Configuration

storage:
  # Fastest tier - manual score
  - path: "/mnt/ram/cache"
    bdev_type: "ram"
    capacity_limit: "16GB"
    score: 1.0

  # High-performance tier - manual score
  - path: "/mnt/nvme/primary"
    bdev_type: "file"
    capacity_limit: "1TB"
    score: 0.85

  # Medium tier - automatic scoring
  - path: "/mnt/ssd/secondary"
    bdev_type: "file"
    capacity_limit: "2TB"
    # Uses automatic bandwidth measurement

Data Placement Engine Types

  • "random" - Random placement across targets
  • "round_robin" - Round-robin placement
  • "max_bw" - Place on target with maximum available bandwidth

Python Bindings

CTE Core provides Python bindings for easy integration with Python applications.

Installation

# Build Python bindings
cd build
cmake .. -DBUILD_PYTHON_BINDINGS=ON
make

# Install Python module
pip install ./wrapper/python

Python API Usage

import wrp_cte_core_ext as cte

# Initialize Chimaera runtime
cte.chimaera_runtime_init()

# Initialize CTE
# NOTE: This automatically calls chimaera_client_init() internally
# You do NOT need to call chimaera_client_init() separately
cte.initialize_cte("/path/to/config.yaml")

# Get global CTE client
client = cte.get_cte_client()

# Create memory context
mctx = cte.MemContext()

# Poll telemetry log
minimum_logical_time = 0
telemetry_entries = client.PollTelemetryLog(mctx, minimum_logical_time)

for entry in telemetry_entries:
    print(f"Operation: {entry.op_}")
    print(f"Size: {entry.size_}")
    print(f"Offset: {entry.off_}")
    print(f"Logical Time: {entry.logical_time_}")

# Reorganize blobs with new scores
tag_id = cte.TagId()
tag_id.major_ = 0
tag_id.minor_ = 1

blob_names = ["blob_001", "blob_002", "blob_003"]
new_scores = [0.95, 0.85, 0.75] # Different tier assignments

# Call ReorganizeBlob once per blob
for blob_name, new_score in zip(blob_names, new_scores):
    result = client.ReorganizeBlob(mctx, tag_id, blob_name, new_score)
    if result == 0:
        print(f"Blob {blob_name} reorganized successfully")
    else:
        print(f"Reorganization of {blob_name} failed with error code: {result}")

Python Data Types

# Create unique IDs
tag_id = cte.TagId.GetNull()
blob_id = cte.BlobId.GetNull()

# Check if ID is null
if tag_id.IsNull():
    print("Tag ID is null")

# Access ID components
print(f"Major: {tag_id.major_}, Minor: {tag_id.minor_}")

# Operation types
print(cte.CteOp.kPutBlob) # Put blob operation
print(cte.CteOp.kGetBlob) # Get blob operation
print(cte.CteOp.kDelBlob) # Delete blob operation

Python Blob Reorganization

The Python bindings support blob reorganization for dynamic data placement optimization using the ReorganizeBlob method:

import wrp_cte_core_ext as cte

# Initialize CTE system (as shown in previous examples)
# ...

client = cte.get_cte_client()
mctx = cte.MemContext()

# Get or create tag for the blobs
tag_id = cte.TagId()
tag_id.major_ = 0
tag_id.minor_ = 1

# Example 1: Reorganize multiple blobs to different tiers
blob_names = ["hot_data", "warm_data", "cold_archive"]
new_scores = [0.95, 0.6, 0.2] # Hot, warm, and cold tiers

# Call ReorganizeBlob once per blob
for blob_name, new_score in zip(blob_names, new_scores):
    result = client.ReorganizeBlob(mctx, tag_id, blob_name, new_score)
    if result == 0:
        print(f"Blob {blob_name} reorganized successfully")
    else:
        print(f"Reorganization of {blob_name} failed with error code: {result}")

# Example 2: Promote frequently accessed blobs based on telemetry
telemetry = client.PollTelemetryLog(mctx, 0)
access_counts = {}

# Count accesses per blob name (requires tracking blob names from telemetry)
# Note: You may need to maintain a blob_id to blob_name mapping
for entry in telemetry:
    if entry.op_ == cte.CteOp.kGetBlob:
        # Track access patterns
        blob_key = (entry.blob_id_.major_, entry.blob_id_.minor_)
        access_counts[blob_key] = access_counts.get(blob_key, 0) + 1

# Batch reorganize based on access frequency
# Assuming you have a mapping of blob IDs to names
blob_id_to_name = {
    (0, 1): "dataset_001",
    (0, 2): "dataset_002",
    (0, 3): "dataset_003",
}

blobs_to_reorganize = []
new_scores_list = []

for blob_key, count in access_counts.items():
    if blob_key in blob_id_to_name and count > 10:
        blob_name = blob_id_to_name[blob_key]
        blobs_to_reorganize.append(blob_name)

        # Calculate score based on access frequency
        score = min(0.5 + (count / 100.0), 1.0)
        new_scores_list.append(score)

# Perform reorganization for each blob
if blobs_to_reorganize:
    for blob_name, new_score in zip(blobs_to_reorganize, new_scores_list):
        result = client.ReorganizeBlob(mctx, tag_id, blob_name, new_score)
        if result == 0:
            print(f"Reorganized blob {blob_name} successfully")

# Example 3: Tier-based reorganization strategy
# Organize blobs into three tiers based on size and access patterns

# Small, frequently accessed -> Hot tier (0.9)
small_hot_blobs = ["config", "index", "metadata"]
for blob_name in small_hot_blobs:
    result = client.ReorganizeBlob(mctx, tag_id, blob_name, 0.9)
    if result == 0:
        print(f"Hot tier blob {blob_name} reorganized")

# Medium, occasionally accessed -> Warm tier (0.5-0.7)
warm_blobs = ["dataset_recent_01", "dataset_recent_02"]
warm_scores = [0.6, 0.5]
for blob_name, score in zip(warm_blobs, warm_scores):
    result = client.ReorganizeBlob(mctx, tag_id, blob_name, score)
    if result == 0:
        print(f"Warm tier blob {blob_name} reorganized")

# Large, rarely accessed -> Cold tier (0.1-0.3)
cold_blobs = ["archive_2023", "backup_full"]
cold_scores = [0.2, 0.1]
for blob_name, score in zip(cold_blobs, cold_scores):
    result = client.ReorganizeBlob(mctx, tag_id, blob_name, score)
    if result == 0:
        print(f"Cold tier blob {blob_name} reorganized")

Score Guidelines for Python:

  • 0.9 - 1.0: Highest tier (RAM cache, frequently accessed)
  • 0.7 - 0.8: High tier (NVMe, recently accessed)
  • 0.4 - 0.6: Medium tier (SSD, occasionally accessed)
  • 0.1 - 0.3: Low tier (HDD, archival data)
  • 0.0: Lowest tier (cold storage, rarely accessed)

Method Signature:

result = client.ReorganizeBlob(
    mctx,       # Memory context
    tag_id,     # Tag ID containing the blob
    blob_name,  # Blob name (string)
    new_score   # New score (float, 0.0 to 1.0)
)

Return Codes:

  • 0: Success - blob reorganized successfully
  • Non-zero: Error - reorganization failed (tag not found, blob not found, insufficient space, etc.)

Important Notes:

  • Call ReorganizeBlob once per blob to reorganize multiple blobs
  • All blobs must belong to the specified tag_id
  • Scores must be in the range [0.0, 1.0]
  • Higher scores indicate hotter data that should be placed on faster storage tiers

Advanced Topics

Best Practices

Choosing Between Tag Wrapper and Direct Client API

In most cases, prefer the Tag wrapper class over the direct Client API: it creates or opens the tag for you, manages shared memory for synchronous operations, and avoids passing a TagId and hipc::MemContext on every call. Drop down to the Client API when you need tag-level operations such as GetTagSize or DelTag, target management, or explicit control over placement flags and memory contexts.

Memory Management Best Practices

For Raw Data Operations:

// Tag wrapper automatically manages shared memory for sync operations
wrp_cte::core::Tag tag("my_data");
std::vector<char> data = LoadData();
tag.PutBlob("item", data.data(), data.size()); // Safe - automatic cleanup

For Shared Memory Operations:

// Manual shared memory management - more control
// NOTE: AllocateBuffer is NOT templated - it returns hipc::FullPtr<char>
auto shm_buffer = CHI_IPC->AllocateBuffer(data_size);
memcpy(shm_buffer.ptr_, raw_data, data_size);
tag.PutBlob("item", shm_buffer.shm_, data_size, 0, score);
// shm_buffer automatically cleaned up when it goes out of scope

For Asynchronous Operations:

// Always keep shared memory alive until async task completes
std::vector<hipc::FullPtr<char>> buffers; // Keep alive
std::vector<hipc::FullPtr<PutBlobTask>> tasks;

for (auto& data_chunk : data_chunks) {
  auto buffer = CHI_IPC->AllocateBuffer(data_chunk.size());
  memcpy(buffer.ptr_, data_chunk.data(), data_chunk.size());

  auto task = tag.AsyncPutBlob("chunk", buffer.shm_, data_chunk.size());

  buffers.push_back(std::move(buffer));  // Keep alive!
  tasks.push_back(task);
}

// Wait for completion and cleanup
for (auto& task : tasks) {
  task->Wait();
  CHI_IPC->DelTask(task);
}
// buffers automatically cleaned up here

Performance Optimization

Blob Scoring Guidelines:

  • Use scores 0.8-1.0 for frequently accessed "hot" data
  • Use scores 0.4-0.7 for occasionally accessed "warm" data
  • Use scores 0.0-0.3 for archival "cold" data
  • CTE uses scores for intelligent placement across storage tiers
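
A small sketch applying these guidelines with the reorganization API described earlier (the tag name and the 0.3 threshold are illustrative, not part of CTE):

// Hedged sketch: demote a blob to a colder tier if its current score is
// higher than its data temperature warrants.
wrp_cte::core::Tag tag("reports");
hipc::MemContext mctx;

float current = tag.GetBlobScore("report_2023");
if (current > 0.3f) {
  chi::u32 rc = WRP_CTE_CLIENT->ReorganizeBlob(
      mctx, tag.GetTagId(), "report_2023", 0.2f);  // cold tier
  if (rc != 0) {
    std::cerr << "Reorganization failed with code " << rc << "\n";
  }
}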

Batch Operations:

// Efficient: Group related operations
wrp_cte::core::Tag batch_tag("batch_job");
for (const auto& item : batch_items) {
  batch_tag.PutBlob(item.name, item.data, item.size);
}

// Less efficient: Multiple tags with few operations each
// Creates overhead for tag lookup and context switching

Size Queries:

// Efficient: Check size before allocating retrieval buffer
chi::u64 blob_size = tag.GetBlobSize("large_blob");
if (blob_size > 0) {
  auto buffer = CHI_IPC->AllocateBuffer(blob_size);
  tag.GetBlob("large_blob", buffer.shm_, blob_size);
}

// Less efficient: Allocate maximum possible size
// auto buffer = CHI_IPC->AllocateBuffer(MAX_SIZE); // Wasteful

Error Handling Patterns

Tag Wrapper (Exception-based):

try {
  wrp_cte::core::Tag tag("dataset");
  tag.PutBlob("data", buffer, size);

  chi::u64 stored_size = tag.GetBlobSize("data");
  if (stored_size != size) {
    throw std::runtime_error("Size mismatch after storage");
  }

} catch (const std::exception& e) {
  std::cerr << "Storage operation failed: " << e.what() << "\n";
  // Automatic cleanup via RAII
}

Direct Client (Return Code-based):

auto *client = WRP_CTE_CLIENT;
hipc::MemContext mctx;

TagId tag_id = client->GetOrCreateTag(mctx, "dataset");
bool success = client->PutBlob(mctx, tag_id, "data",
                               0, size, buffer, 0.5f, 0);

if (!success) {
  std::cerr << "PutBlob failed\n";
  return false;
}

chi::u64 stored_size = client->GetBlobSize(mctx, tag_id, "data");
if (stored_size != size) {
  std::cerr << "Size mismatch: expected " << size << ", got " << stored_size << "\n";
  return false;
}

Thread Safety Considerations

  • Both Tag wrapper and Client are thread-safe
  • Multiple threads can safely share the same Tag or Client instance
  • Shared memory buffers (hipc::FullPtr) should not be shared between threads
  • Each thread should use its own hipc::MemContext for optimal performance
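
A brief sketch of this pattern, assuming the client has already been initialized as shown earlier (blob names and thread count are illustrative):

// Hedged sketch: four threads share the global client and one tag; each
// thread uses its own hipc::MemContext and its own local buffers.
#include <thread>
#include <vector>

void WorkerThread(int thread_id, TagId tag_id) {
  auto *client = WRP_CTE_CLIENT;
  hipc::MemContext mctx;  // per-thread memory context
  std::string blob_name = "blob_t" + std::to_string(thread_id);
  chi::u64 size = client->GetBlobSize(mctx, tag_id, blob_name);
  std::cout << blob_name << ": " << size << " bytes\n";
}

hipc::MemContext mctx;
TagId tag_id = WRP_CTE_CLIENT->GetOrCreateTag(mctx, "shared_tag");

std::vector<std::thread> workers;
for (int i = 0; i < 4; ++i) {
  workers.emplace_back(WorkerThread, i, tag_id);
}
for (auto& t : workers) {
  t.join();
}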

Multi-Node Deployment

CTE Core supports distributed deployment across multiple nodes:

  1. Configure Chimaera for multi-node operation
  2. Use appropriate PoolQuery values:
    • chi::PoolQuery::Local() - Local node only
    • chi::PoolQuery::Global() - All nodes
    • Custom pool queries for specific node groups
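
A minimal sketch combining these options, using the initialization and registration calls documented above (paths and sizes are illustrative):

// Hedged sketch: create the CTE container across all nodes, then register
// a target that lives only on the local node.
wrp_cte::core::WRP_CTE_CLIENT_INIT("/path/to/config.yaml", chi::PoolQuery::Global());

auto *cte_client = WRP_CTE_CLIENT;
hipc::MemContext mctx;
cte_client->RegisterTarget(mctx, "/mnt/nvme/local_target.dat",
                           chimaera::bdev::BdevType::kFile,
                           100ULL * 1024 * 1024 * 1024,  // 100GB, illustrative
                           chi::PoolQuery::Local());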

Custom Data Placement Algorithms

Extend the DPE (Data Placement Engine) by implementing custom placement strategies:

  1. Inherit from the base DPE class
  2. Implement placement logic based on target metrics
  3. Register the new DPE type in configuration

Performance Optimization

  1. Batch Operations: Use async APIs for multiple operations
  2. Score-based Placement: Set appropriate scores (0-1) for data temperature
  3. Target Balancing: Monitor and rebalance based on target metrics
  4. Queue Tuning: Adjust lane counts and priorities based on workload

Error Handling

All operations return result codes:

  • 0: Success
  • Non-zero: Error (specific codes depend on operation)

Always check return values and handle errors appropriately:

chi::u32 result = cte_client->RegisterTarget(...);
if (result != 0) {
  // Handle error
  std::cerr << "Failed to register target, error code: " << result << "\n";
}

Thread Safety

  • CTE Core client operations are thread-safe
  • Multiple threads can share a client instance
  • Async operations are particularly suitable for multi-threaded usage

Memory Management

  • CTE Core uses shared memory for zero-copy data transfer
  • The hipc::Pointer type represents shared memory locations
  • Memory contexts (hipc::MemContext) manage allocation lifecycle
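
To make the relationship concrete, the short sketch below (reusing calls shown in earlier examples) highlights the two views of one shared-memory buffer; the blob name and source buffer are illustrative:

// Hedged sketch: a FullPtr pairs a process-local pointer (ptr_) with its
// shared-memory handle (shm_, an hipc::Pointer); CTE operations take the handle.
hipc::FullPtr<char> buf = CHI_IPC->AllocateBuffer(4096);
memcpy(buf.ptr_, source_data, 4096);                // write through the local pointer
cte_client->PutBlob(mctx, tag_id, "zero_copy_blob",
                    0, 4096, buf.shm_, 0.5f, 0);    // pass the shared-memory handle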

Troubleshooting

Common Issues

  1. Initialization Failures

    • Ensure Chimaera runtime is initialized first
    • Check configuration file path and format
    • Verify storage paths have appropriate permissions
  2. Target Registration Errors

    • Confirm target path exists and is writable
    • Check available disk space
    • Verify bdev type matches storage medium
  3. Blob Operations Failing

    • Ensure tag exists before blob operations
    • Check target has sufficient space
    • Verify data pointers are valid shared memory
  4. Performance Issues

    • Monitor target statistics regularly
    • Adjust worker count based on workload
    • Tune queue configurations
    • Consider data placement strategy

Debug Logging

Enable debug logging by setting environment variables:

export CHIMAERA_LOG_LEVEL=DEBUG
export CTE_LOG_LEVEL=DEBUG

Metrics Collection

Use the telemetry API to collect performance metrics:

// Continuous monitoring loop
while (running) {
  auto telemetry = cte_client->PollTelemetryLog(mctx, last_logical_time);
  ProcessTelemetry(telemetry);

  if (!telemetry.empty()) {
    last_logical_time = telemetry.back().logical_time_;
  }

  std::this_thread::sleep_for(std::chrono::seconds(1));
}

API Stability and Versioning

CTE Core follows semantic versioning:

  • Major version: Breaking API changes
  • Minor version: New features, backward compatible
  • Patch version: Bug fixes

Check version compatibility:

// Version macros (defined in headers)
#if CTE_CORE_VERSION_MAJOR >= 1 && CTE_CORE_VERSION_MINOR >= 0
// Use newer API features
#endif

Support and Resources

  • Documentation: This document and inline API documentation
  • Examples: See test/unit/ directory for comprehensive examples
  • Configuration: Example configs in config/ directory
  • Issues: Report bugs via project issue tracker