Quantum-CLI: A powerful CLI to build, run, and test Quantum Machines.
Unifying developers, engineers, and users to build, orchestrate, and automate intelligent workflows with Quantum Machines.
QuantumDatalytica empowers Developers to build modular Quantum Machines, enables Workflow Engineers to design powerful data workflows, and allows Workflow Users to execute and automate tasks seamlessly, all within a secure, scalable, and cloud-native platform.
Workflows are created using DAG-based design, with parent-child machine dependencies and conditional execution logic.
QuantumDatalytica as a unified platform for workflow automation, data processing, and task orchestration using modular, containerized units called Quantum Machines.
Marketplace, the central repository where machines are published, versioned, and reused across workflows and teams, reducing duplication of logic.
Design, build, and publish Quantum Machines to power scalable, reusable, and intelligent automation workflows.
A Machine Developer is a core contributor to the QuantumDatalytica platform, responsible for building, testing, and publishing self-contained units of logic called Quantum Machines. These machines are the foundational building blocks of automated data workflows. They are designed to perform a specific task, such as transforming data, calling APIs, running analytics, or generating documents and are built with consistency, reusability, and scalability in mind.
Machine Developers primarily work in Python, structure their logic around a standardized interface (CoreEngine), and package everything into lightweight Quantum Blocks. Each Quantum Machine must adhere to a defined project.json, input/output schema and is validated through rigorous testing before publication. These machines are then made available to Workflow Engineers and Users, who can plug them into larger Quantum workflows or trigger them as standalone automation tasks.
The role is both creative and technical. Developers have the freedom to innovate, turning domain expertise or unique algorithms into shareable, monetizable machines, while working within a well-structured ecosystem. With Quantum-Machine CLI tools, testing environments, and version control built into the platform, developers can confidently deploy machines that scale across environments.
Whether you're solving a data challenge, building a PDF generator, or integrating external APIs, as a Machine Developer, your work directly empowers automated decision-making and workflow efficiency at scale.
Set up your local tools and environment to begin building, testing, and publishing Quantum Machines efficiently.
Before you begin building a Quantum Machine, it's essential to set up a local development environment that mirrors the runtime environment of the QuantumDatalytica platform. This ensures compatibility, smooth testing, and predictable deployment behavior.
Tool | Purpose | Recommended Version |
---|---|---|
Python | Core language for writing machine logic | 3.9 or higher |
Docker | Core language for writing machine logic | Latest stable |
Git | Source control and version management | Latest |
Quantum CLI | Create, test, validate, and publish machines | Latest from PyPI |
VS Code (or similar IDE) | Code editing and project navigation | Latest |
Quantum-Core-Engine | This package provides the base class CoreEngine, which every machine's main.py must inherit from. It handles the standard execution flow, structured logging, and ensures compatibility with the QuantumDatalytica runtime engine. | Latest |
Once these tools are installed, you're fully equipped to create and run your first Quantum Machine.
Install the Quantum CLI to scaffold, test, and publish Quantum Machines directly from your local environment.
Quantum-Machine CLI is a command-line interface developed by QuantumDatalytica LLC to help developers build, run, test, log, and manage modular analytics components called Quantum Machines. These machines are the foundation of scalable, distributed data workflows within the QuantumDatalytica ecosystem. The CLI streamlines local development and ensures consistent behavior across environments.
pip install quantum-machine
quantum --help
Command | Description |
---|---|
init machine | Initialize a new Quantum Machine project with boilerplate files |
run machine | Run a machine and observe its behavior locally |
build machine | Build a Docker image for the specified machine |
test machine | Run unit tests defined for the machine |
lint machine | Check the machine's code for linting/style issues |
validate machine | Validate the machine's Project.json and required structure |
init workflow | Initialize a new workflow YAML file with DAG structure |
add machine | Add a machine as a task to a workflow and define its dependencies |
run workflow | Run a workflow DAG by executing machines in topological order |
logs machine | View logs from the last execution of a specified machine |
quantum init machine HelloWorld
git+https://github.com/QuantumDatalytica-LLC/quantum-core-engine.git@<latest_version>
Open the generated requirements.txt and update it to include the required quantum-core-engine version.
pip install --no-cache-dir --force-reinstall -r HelloWorld/requirements.txt
Install all required Python packages without using cache to ensure a clean environment.
quantum run machine HelloWorld
quantum build machine HelloWorld
Builds a Docker image with dependencies for the machine.
quantum test machine HelloWorld
Runs the test suite defined under the machine's directory.
quantum lint machine HelloWorld
Applies flake8 or equivalent linting tools to maintain code standards.
quantum validate machine HelloWorld\<file_name>
Ensures the machine has the correct Project.json, required fields, and structure.
quantum init workflow my_workflow
Creates a workflow.yaml file to define machine dependencies.
quantum add machine HelloWord -w my_workflow
quantum add machine 2nd_Machine -w my_workflow
quantum add machine 3rd_Machine -p HelloWorld --workflow my_workflow
quantum add machine 4th_Machine -parent 2nd_Machine -w my_workflow
quantum add machine 5th_Machine -p 3rd_Machine 4th_Machine -w my_workflow
quantum run workflow my_workflow
Executes machines in the correct DAG order as defined in workflow.yaml.
quantum logs machine HelloWord
Displays the logs from the most recent execution of the HelloWorld machine.
Install the core SDK that powers every Quantum Machine and ensures standardized execution, logging, and output handling.
The quantum-core-engine is a required Python package that provides the foundational structure for all Quantum Machines. It must be installed in your development environment before writing or testing any machine logic
https://github.com/QuantumDatalytica-LLC/quantum-core-engine.git@<latest_version>
Run this command inside your virtual environment or project directory to keep dependencies clean and isolated. Replace <latest_version> with the latest version available on the Quantum-Core-Engine Releases page.
The Quantum-Core-Engine is more than just a library, it defines how every Quantum Machine behaves. Its goals include:
Objective | Description |
---|---|
Standardized Execution | Provides a CoreEngine base class that ensures all machines follow a common execution pattern. |
Input Handling | Accepts structured JSON inputs and injects them into your custom logic. |
Output Validation | Ensures outputs are clean, structured, and aligned with the declared schema. |
Logging Support | Built-in logging methods (self.machine_logger.info(), self.machine_logger.error(), self.machine_logger.warning())for easy debugging and traceability. |
Security Hooks | Enforces safe execution policies (e.g., blocking unsafe file paths or unvalidated data access). |
Sandbox Integration | Seamlessly connects to the Quantum CLI and sandbox environment for local testing and validation. |
By using CoreEngine, developers don’t have to reinvent basic execution logic or worry about input/output parsing. They can focus purely on building business logic, while the framework handles the rest, making machines easier to validate, test, and integrate into Quantum Workflows.
from quantum.CoreEngine import CoreEngine
class MyMachine(CoreEngine):
"""
Inherit from CoreEngine to define your machine's logic.
Implement the required methods such as 'receiving', ‘pre_processing’
and many more to handle input and produce output.
"""
if __name__ == "__main__":
# Initialize and execute machine
machine = MyMachine()
machine.start()
Learn how to create, structure, and run your first Quantum Machine using the Quantum CLI toolkit.
Before writing any code, it's important to understand what a Quantum Machine really is: a self-contained, containerized Python base code that takes structured input, performs a task, and returns structured output. Each machine follows a common design pattern, making it predictable, testable, and reusable.
Development starts by using the Quantum Machine CLI to scaffold a new machine with a predefined structure. You'll write your logic in main.py, prepare project.json which defines key parameters of the Quantum Machine, and input.json specifies the expected input parameters for the Quantum Machine, while output.json defines the structure and content of the machine's resulting output.
Use CLI commands to test your machine locally, simulating its execution within the actual Quantum Datalytica runtime environment.
from quantum.CoreEngine import CoreEngine
class MyMachine(CoreEngine):
input_data = {}
dependent_machine_data = {}
def receiving(self, input_data, dependent_machine_data, callback):
"""Receiving
:param input_data: Configure parameter values
:param dependent_machine_data: Dependant/Previous machine data values
:return: callback method to pass data and error into next step
"""
data = {}
error_list = []
try:
# Review Final data and Error list
data = self.get_final_data()
error_list = self.get_error_list()
self.input_data = input_data
self.dependent_machine_data = dependent_machine_data
except Exception as e:
err_msg = f"Error : {str(e)}"
error_list.append(err_msg)
finally:
callback(data, error_list)
def pre_processing(self, callback):
"""Pre-Processing
:return: callback method to pass data and error into next step
"""
data = {}
error_list = []
try:
# Updated Final data and Error list
data = self.get_final_data()
error_list = self.get_error_list()
except Exception as e:
err_msg = f"Error : {str(e)}"
error_list.append(err_msg)
finally:
callback(data, error_list)
def processing(self, callback):
"""Processing
:return: callback method to pass data and error into next step
"""
data = {}
error_list = []
try:
# Updated Final data and Error list
data = self.get_final_data()
error_list = self.get_error_list()
except Exception as e:
err_msg = f"Error : {str(e)}"
error_list.append(err_msg)
finally:
callback(data, error_list)
def post_processing(self, callback):
"""Post-Processing
:return: callback method to pass data and error into next step
"""
data = {}
error_list = []
try:
# Updated Final data and Error list
data = self.get_final_data()
error_list = self.get_error_list()
except Exception as e:
err_msg = f"Error : {str(e)}"
error_list.append(err_msg)
finally:
callback(data, error_list)
def packaging_shipping(self, callback):
"""Packaging & Shipping
:return: callback method to pass data and error into next step, This is
final data to use into next machine
"""
data = {}
error_list = []
try:
# Updated Final data and Error list
data = self.get_final_data()
error_list = self.get_error_list()
except Exception as e:
err_msg = f"Error : {str(e)}"
error_list.append(err_msg)
finally:
callback(data, error_list)
if __name__ == '__main__':
# Create a machine instance and start the process
machine = MyMachine()
machine.start()
from quantum.CoreEngine import CoreEngine
This imports the CoreEngine base class from the Quantum-Core-Engine SDK. Inheriting from CoreEngine standardizes the structure, input/output flow, error handling, and lifecycle of your Quantum Machine.
input_data = {}Dependent_machine_data = {}
input_data: Holds the raw input parameters passed into the machine. These typically come from input.json or are set by the workflow engine.
dependent_machine_data: Captures output from any parent/previous machines in the workflow. Useful when a machine depends on upstream results.
receiving(self, input_data, dependent_machine_data, callback)
This method acts as the initial handshake for the machine’s execution. It captures the raw input and any output from parent machines, setting the stage for downstream logic.
pre_processing(self, callback)
Pre-processing prepares and transforms the initial inputs into a shape more suitable for the main computation. This stage is often used for validation, enrichment, and staging.
processing(self, callback)
This is the core execution phase where the machine performs its primary logic or task.
post_processing(self, callback)
In this phase, the machine performs final adjustments or logging after processing is complete, preparing the output for final packaging.
packaging_shipping(self, callback)
This is the last phase in the lifecycle — it prepares and packages the machine’s output into a structure compatible with QuantumDatalytica’s output schema and downstream machines.
if __name__ == '__main__':
machine = MyMachine()
machine.start()
Efficiently handle file operations inside your Quantum Machines with built-in APIs for reading, writing, and streaming files.
Quantum Machines often need to handle files such as data files, PDFs, Excel sheets, logs, or any file format. The Quantum-Core-Engine (QCE) provides a unified and developer-friendly interface for reading and writing files, while automatically handling:
The QCE decides automatically whether to inline (Base64) or stream (raw bytes) based on file size.
Retrieve file contents seamlessly, either inline for small files or via efficient streaming for large datasets, with optional Base64 encoding.
The read_file function provides a unified interface for reading files in a Quantum Machine environment. It automatically handles small files inline (e.g., base64-encoded JSON) and large files with streaming (chunked binary reading). This ensures developers can read files of any size efficiently without worrying about memory overhead.
read_file(
self,
filename: Union[str, Path],
*,
as_base64: bool = True,
inline_limit: Optional[int] = None,
chunk_size: Optional[int] = None
) -> dict
Name | Type | Default | Description |
---|---|---|---|
filename | str | Path | (required) |
as_base64 | bool | True | Whether to encode the file content as base64 (inline) or return raw bytes (via
iterator).
|
inline_limit | int | None | None (defaults to engine setting, e.g., 10 MiB) |
chunk_size | int | None | None (defaults to e.g., 1 MiB) |
The function returns a dictionary (dict) with metadata and file content. The structure varies slightly depending on whether the file was inlined or streamed.
Inline Mode (small files)
{
"status_code": 200,
"message": "File read successfully (inline)",
"file_name": "example.txt",
"file_size": 1024, # size in bytes
"inline": True, # indicates inline mode
"as_base64": True, # whether content is base64
"file_content": "SGVsbG8gd29ybGQh" # base64 string or raw bytes
}
Streaming Mode (large files)
{
"status_code": 206,
"message": "File read successfully (streaming)",
"file_name": "bigdata.csv",
"file_size": 1073741824, # 1GB
"inline": False, # indicates streaming mode
"as_base64": False, # indicates raw stream
"file_iter": , # generator yielding file chunks
"chunk_size": 1048576 # size of each chunk (e.g., 1 MiB)
}
File Not Found
{
"status_code": 404,
"message": "File not found or not allowed: missing.pdf",
"filename": "missing.pdf"
}
Permission Denied
{
"status_code": 403,
"message": "Permission denied for file: secret.key",
"filename": "secret.key"
}
Unexpected Error
{
"status_code": 500,
"message": "Exception: [error details]",
"filename": "data.bin"
}
Read a small text file
resp = self.read_file("config.json")
print(resp["file_content"]) # base64-encoded content
Read as bytes (for binary files like PDF/Excel)
resp = self.read_file("report.pdf", as_base64=True)
with open("local_report.pdf", "wb") as f:
f.write(resp["file_content"]) # raw bytes
Example: Read a large file (auto streaming)
resp = self.read_file("huge_dataset.csv")
with open("huge_dataset.csv", "wb") as f:
for chunk in resp["iterator"]: # streamed chunks (bytes)
f.write(chunk)
You don’t need to worry about Base64 or memory limits—the engine chooses the best mode.
Store files reliably using raw bytes, Base64, or streaming, with built-in support for atomic writes, overwrite control, and large file handling.
The write_file() method is the central file write API for Quantum Machines. It supports multiple input formats (raw bytes, base64, iterators, or existing file paths) and automatically decides the safest and most efficient way to persist the data.
write_file(
self,
filename: Union[str, Path],
*,
data: bytes | bytearray | None = None,
base64_data: Optional[str] = None,
data_iter: Optional[Iterable[bytes | str]] = None,
src_path: Optional[Union[str, Path]] = None,
overwrite: bool = True,
chunk_size: Optional[int] = None
) -> dict
Parameter | Type | Required | Default | Description |
---|---|---|---|---|
filename | str | Path | Yes | -- | Target filename to write into the Quantum Machine’s working directory. Can be relative or absolute. |
data | bytes | bytearray | None | No | None | Direct binary data buffer to write. Use this for small in-memory data. Mutually exclusive with base64_data, data_iter, and src_path. |
base64_data | str | None | No | None | Base64-encoded string of file contents. Useful when data is transferred over JSON or APIs. Mutually exclusive with data, data_iter, and src_path. |
data_iter | Iterable[bytes | str] | None | No | None | Iterable/stream of chunks to write incrementally. Useful for large files(e.g., reading in 4MB pieces). Chunks can be raw bytes or base64 strings. |
src_path | str | Path | None | No | None | Copy file from an existing local path instead of providing inline data. Handy for staging files or reusing existing outputs. |
overwrite | bool | No | True | If True, overwrites existing file. If False and file exists, it raises an error. |
chunk_size | int | None | No | None | Size (in bytes) of each chunk when writing from data_iter. Ignored for data and base64_data. Useful for throttling memory usage during streaming. |
The function always returns a dictionary containing metadata about the read operation, and depending on parameters, either inline content or a streaming reference.
Key | Type | Always Present? | Description |
---|---|---|---|
status_code | int | Yes | HTTP-style status code. 200 = success, 400/404/500 = various errors. |
filename | str | Yes | The file name that was read (relative to Quantum Machine’s working directory). |
size | int | Yes | Size of the file in bytes. |
mode | int | Yes | File permission bits (e.g., 420 for 0o644). |
modified_at | float | Yes | Last modified timestamp (epoch seconds). |
data | str (Base64) or bytes | Conditional | Inline file content. Returned if file size ≤ inline_limit (default behavior). Format depends on
as_base64:
|
chunks | list[dict] | Conditional | Present if file is streamed because it exceeded inline_limit Each chunk dict contains:
|
streaming | bool | Yes | True if file was returned in streamed chunks, False if inline. |
{
"status_code": 200,
"message": "File saved successfully",
"file_path": "output/data.csv",
"size_bytes": 2048,
"sha256": "9d5f7ac...c5b1",
"mime_type": "text/csv"
}
Conflict (file exists, overwrite disabled)
{
"status_code": 409,
"message": "File exists and overwrite=False",
"filename": "output/data.csv"
}
Invalid Input
{
"status_code": 400,
"message": "Provide exactly one of data, base64_data, data_iter, or src_path"
}
File System Error
{
"status_code": 500,
"message": "Exception: [error details]"
}
Write a small file (auto streaming)
self.write_file("hello.txt", data=b"Hello, Quantum!")
Write from Base64 (rarely needed manually)
content_b64 = base64.b64encode(b"My secret file").decode()
self.write_file("secret.txt", base64_data=content_b64)
Write a large file via streaming
def file_iterator(path, chunk_size=1024*1024): # 1MB chunks
with open(path, "rb") as f:
while chunk := f.read(chunk_size):
yield chunk
self.write_file("big_backup.zip", data_iter=file_iterator("local_backup.zip"))
The engine detects iterators and automatically performs a streaming write.
Practical examples demonstrating how Quantum Machine Developers can use read_file and write_file in real workflows for seamless file handling.
The following examples demonstrate how to use CoreEngine’s file management APIs in a Quantum Machine. These cover both small inline files and large streamed files.
Write a small file (string/bytes)
Upload Machine
from quantum.CoreEngine import CoreEngine
class MyMachine(CoreEngine):
input_data = {}
dependent_machine_data = {}
def receiving(self, input_data, dependent_machine_data, callback):
"""Receiving
:param input_data: Configure parameter values
:param dependent_machine_data: Dependant/Previous machine data values
:return: callback method to pass data and error into next step
"""
data = {}
error_list = []
try:
# Review Final data and Error list
data = self.get_final_data()
error_list = self.get_error_list()
except Exception as e:
err_msg = f"Error : {str(e)}"
error_list.append(err_msg)
finally:
callback(data, error_list)
def pre_processing(self, callback):
"""Pre-Processing
:return: callback method to pass data and error into next step
"""
data = {}
error_list = []
try:
# Updated Final data and Error list
data = self.get_final_data()
error_list = self.get_error_list()
except Exception as e:
err_msg = f"Error : {str(e)}"
error_list.append(err_msg)
finally:
callback(data, error_list)
def processing(self, callback):
"""Processing
:return: callback method to pass data and error into next step
"""
data = {}
error_list = []
try:
# Updated Final data and Error list
data = self.get_final_data()
error_list = self.get_error_list()
def file_iterator(path, chunk_size=1024*1024): # 1MB chunks
with open(path, "rb") as f:
while chunk := f.read(chunk_size):
yield chunk
res = self.write_file("UploadSampleFile.zip",
data_iter=file_iterator("D:\SampleFile.zip"))
data = res
except Exception as e:
err_msg = f"Error : {str(e)}"
error_list.append(err_msg)
finally:
callback(data, error_list)
def post_processing(self, callback):
"""Post-Processing
:return: callback method to pass data and error into next step
"""
data = {}
error_list = []
try:
# Updated Final data and Error list
data = self.get_final_data()
error_list = self.get_error_list()
except Exception as e:
err_msg = f"Error : {str(e)}"
error_list.append(err_msg)
finally:
callback(data, error_list)
def packaging_shipping(self, callback):
"""Packaging & Shipping
:return: callback method to pass data and error into next step,
This is final data to use into next machine
"""
data = {}
error_list = []
try:
# Updated Final data and Error list
data = self.get_final_data()
error_list = self.get_error_list()
except Exception as e:
err_msg = f"Error : {str(e)}"
error_list.append(err_msg)
finally:
callback(data, error_list)
if __name__ == '__main__':
# Create a machine instance and start the process
machine = MyMachine()
machine.start()
Download Machine
from quantum.CoreEngine import CoreEngine
class MyMachine(CoreEngine):
input_data = {}
dependent_machine_data = {}
# file_path get from previous machine or configure parameter
file_path = None
def receiving(self, input_data, dependent_machine_data, callback):
"""Receiving
:param input_data: Configure parameter values
:param dependent_machine_data: Dependant/Previous machine data values
:return: callback method to pass data and error into next step
"""
data = {}
error_list = []
try:
# Review Final data and Error list
data = self.get_final_data()
error_list = self.get_error_list()
self.input_data = input_data
self.dependent_machine_data = dependent_machine_data
self.file_path = self.input_data.get("file_path", None)
if not self.file_path:
err_msg = "File path is not provided."
error_list.append(err_msg)
except Exception as e:
err_msg = f"Error : {str(e)}"
error_list.append(err_msg)
finally:
callback(data, error_list)
def pre_processing(self, callback):
"""Pre-Processing
:return: callback method to pass data and error into next step
"""
data = {}
error_list = []
try:
# Updated Final data and Error list
data = self.get_final_data()
error_list = self.get_error_list()
except Exception as e:
err_msg = f"Error : {str(e)}"
error_list.append(err_msg)
finally:
callback(data, error_list)
def processing(self, callback):
"""Processing
:return: callback method to pass data and error into next step
"""
data = {}
error_list = []
try:
# Updated Final data and Error list
data = self.get_final_data()
error_list = self.get_error_list()
# Read chunk data
resp = self.read_file(self.file_path)
if resp.get("streamed"):
with open("DownloadSampleFile.zip", "wb") as out:
for chunk in resp["iterator"]:
out.write(chunk)
except Exception as e:
err_msg = f"Error : {str(e)}"
error_list.append(err_msg)
finally:
callback(data, error_list)
def post_processing(self, callback):
"""Post-Processing
:return: callback method to pass data and error into next step
"""
data = {}
error_list = []
try:
# Updated Final data and Error list
data = self.get_final_data()
error_list = self.get_error_list()
except Exception as e:
err_msg = f"Error : {str(e)}"
error_list.append(err_msg)
finally:
callback(data, error_list)
def packaging_shipping(self, callback):
"""Packaging & Shipping
:return: callback method to pass data and error into next step,
This is final data to use into next machine
"""
data = {}
error_list = []
try:
# Updated Final data and Error list
data = self.get_final_data()
error_list = self.get_error_list()
except Exception as e:
err_msg = f"Error : {str(e)}"
error_list.append(err_msg)
finally:
callback(data, error_list)
if __name__ == '__main__':
# Create a machine instance and start the process
machine = MyMachine()
machine.start()
With these utilities, Quantum Machine Developers can handle any file size, any format, safely and efficiently without needing to think about encoding or streaming logistics.
Connect with peers, join our Slack/Discord community, attend monthly AMAs, and get featured through the Machine Spotlight Program.
Email us at