Introduction
High level documentation for the SuperMuSR/digital muon data pipeline.
This is intended to be a high level overview of the data pipeline for SuperMuSR (and later other muon instruments). Specific documentation for individual components will be kept alongside the relevant project/service/infrastructure.
Infrastructure
This section describes the infrastructure required to operate the muon data pipeline. The main focus is on the online stage of the pipeline.
Networking
```mermaid
graph RL;
    cabin_switch[Cabin Switch] --- backbone_switch[Pipeline Backbone Switch];
    backbone_switch --- kafka_cluster[Kafka Cluster];
    backbone_switch --- compute_cluster(Compute Cluster);
    backbone_switch --- digitiser_switch_1[Digitiser Switch 1];
    backbone_switch --- digitiser_switch_2[Digitiser Switch 2];
    backbone_switch --- digitiser_switch_3[Digitiser Switch 3];
    digitiser_switch_1 --- digitisers[Digitisers];
    digitiser_switch_2 --- digitisers[Digitisers];
    digitiser_switch_3 --- digitisers[Digitisers];
```
Assumptions
- Per DAQ throughput of 10 Mb/s (assuming compression and safety margin)
- Kafka replication factor = 1
- The number of nodes for the Redpanda broker and compute cluster is unlikely to change, but is also not set in stone
Digitisers
- 32 units x 4 DAQ each = 128 GbE ports
- 3 x 48 GbE port switches = 144 available GbE ports
- 43 DAQ per switch
Pipeline
Digitiser uplinks
- 1x 10 GbE uplink per digitiser switch
- 3 switches x 1 link per switch = 3 links
Kafka links
- 1x 10 GbE link per Redpanda node
- 5 nodes x 1 link per node = 5 links
Compute links
- 5 nodes x 1 link per node = 5 links
Totals
10 Gbps links: 3 + 5 + 5 + 1 (uplink) = 14
A 24 port switch would be ideal to allow for possible expansion.
Message passing
The online data pipeline uses a data broker as the central storage for active data. In this case the broker is Redpanda, an Apache Kafka compatible message broker.
Data transported in the online data pipeline is encoded using flatbuffers. Where possible, the same schemas as those used by the ESS have been adopted; otherwise, pipeline specific schemas have been created.
The single source of truth for schemas used in the pipeline is here in the repository.
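As an illustration of this pattern, the sketch below consumes flatbuffer encoded messages from the broker using the rdkafka crate. The broker address, topic name, and consumer group are placeholders, and the decode step is only hinted at; the pipeline's actual components define their own topics and schema handling.

```rust
// Illustrative only: broker address, group id, and topic are placeholders.
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::Message;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let consumer: StreamConsumer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092") // Redpanda speaks the Kafka protocol
        .set("group.id", "example-consumer")
        .create()?;

    consumer.subscribe(&["digitiser-events"])?; // placeholder topic name

    loop {
        let message = consumer.recv().await?;
        if let Some(payload) = message.payload() {
            // The payload is a flatbuffer; a real component would decode it with
            // the generated accessors for the relevant schema.
            println!("received {} bytes on topic {}", payload.len(), message.topic());
        }
    }
}
```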
Online Compute
TODO
Monitoring, Observability and Alerting
TODO
Data Processing Pipeline
The data processing pipeline is defined as the data path between data collection and production of a NeXus histogram file equivalent to what existing muon instruments produce. This is split into two broad categories: acquisition & online processing and offline processing.
```mermaid
graph LR;
    digitiser[Digitisers] --- event_formation;
    event_formation[Event Formation] --- aggregator;
    aggregator[Event Aggregator] --- nexus_writer;
    epics[EPICS] --- nexus_writer;
    nexus_writer[NeXus Writer] --- nexus_event_file;
    nexus_event_file[NeXus Event File] --- event_filtering;
    event_filtering[Event Filtering] --- histogram_generation;
    histogram_generation[Histogram Generation] --- nexus_histogram_file;
    nexus_histogram_file[NeXus Histogram File];
```
Acquisition & Online Processing
The online stage of the pipeline sits between the digitisers and instrument control program on one side and the NeXus file written to the archive on the other. It physically runs on compute within the instrument cabin.
The online data pipeline is designed around a centralised data broker, with several small data processing units, each taking data from and providing data to the broker.
Event Formation
This is the stage that transforms raw detector traces into event lists.
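As a rough illustration of what event formation means (and not the pipeline's actual algorithm), the sketch below finds rising threshold crossings in a digitised trace and records their sample indices as events.

```rust
/// Illustrative only: a naive fixed-threshold event finder, not the pipeline's
/// actual event formation algorithm.
fn find_events(trace: &[i16], threshold: i16) -> Vec<usize> {
    let mut events = Vec::new();
    let mut above = false;
    for (sample_index, &sample) in trace.iter().enumerate() {
        if sample >= threshold {
            if !above {
                // Record the sample index of each rising threshold crossing
                events.push(sample_index);
                above = true;
            }
        } else {
            above = false;
        }
    }
    events
}

fn main() {
    let trace: Vec<i16> = vec![0, 2, 9, 12, 7, 3, 11, 4];
    println!("events at samples: {:?}", find_events(&trace, 10));
}
```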
Event Aggregator
This component aggregates the ~128 per-digitiser event list messages for a given frame into a single event list message per frame. Aggregation is performed using the status packet as a key; therefore, any fault upstream of the status packet generation, or in the transmission of the status packet, will cause either controlled data loss or undefined behaviour in this component.
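A minimal sketch of the aggregation idea follows, assuming a hypothetical FrameKey derived from the status packet and a fixed number of expected digitisers; the real component also has to handle timeouts and the failure modes described above.

```rust
use std::collections::HashMap;

/// Hypothetical key derived from the status packet; the real key contents are
/// defined by the pipeline's schemas.
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct FrameKey {
    frame_number: u32,
}

#[derive(Default)]
struct FrameAssembly {
    event_times: Vec<u32>,
    digitisers_seen: usize,
}

/// Accumulate per-digitiser event lists and return the assembled frame once
/// every expected digitiser has contributed (no timeout handling shown).
fn accumulate(
    in_flight: &mut HashMap<FrameKey, FrameAssembly>,
    key: FrameKey,
    events: Vec<u32>,
    expected_digitisers: usize,
) -> Option<FrameAssembly> {
    let complete = {
        let assembly = in_flight.entry(key).or_default();
        assembly.event_times.extend(events);
        assembly.digitisers_seen += 1;
        assembly.digitisers_seen == expected_digitisers
    };
    if complete {
        in_flight.remove(&key)
    } else {
        None
    }
}

fn main() {
    let mut in_flight = HashMap::new();
    let key = FrameKey { frame_number: 1 };
    assert!(accumulate(&mut in_flight, key, vec![10, 20], 2).is_none());
    let frame = accumulate(&mut in_flight, key, vec![30], 2).expect("frame should be complete");
    println!("assembled {} events", frame.event_times.len());
}
```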
NeXus Writer
This writes NeXus event files with information sourced from:
- Run control messages
- The per frame aggregated event list messages
- EPICS sample environment/process logs
Offline Processing
The offline stage of the pipeline includes any processing that is done using the written event NeXus file as the input. This may happen anywhere within reason, but will most likely be done on IDAaaS.
Architecture Decision Records
Architecture Decision Records that describe decisions made in the design of the data pipeline.
1. Record architecture decisions
Date: 2024-06-06
Status
Accepted
Context
We need to record the architectural decisions made on this project.
Decision
We will use Architecture Decision Records, as described by Michael Nygard.
The adrs tool will be used to manage ADRs.
Consequences
See Michael Nygard's article, linked above.
2. Use ESS-like architecture for streaming components
Date: 2024-06-17
Status
Accepted
(note that this ADR was written retrospectively)
Context
The desire is to adopt a streaming based architecture for SuperMuSR.
This has the following benefits:
- allows flexible/modular manipulation of data "in flight" (i.e. before a file is written to the archive)
- provides a means of rapid iteration on signal processing
- increases recoverability in the event of a single component failure
Decision
Adopt an ESS-like architecture, along with appropriate technology decisions.
Specifically:
- Data will be exchanged with a broker via the Apache Kafka protocol
- The format of the exchanged data will be defined and encoded using Google Flatbuffers
SuperMuSR specific items/differences:
- The broker will be Redpanda
- Some additional schemas will be required
- More in-flight processing will be needed; each processing step will act as both a consumer and a producer, publishing the transformed data back to the broker on an appropriate topic
Consequences
- Redpanda broker now part of physical/infrastructure architecture
- Interface between digitisers and software components is further defined
3. Initial flatbuffer formats
Date: 2024-06-17
Status
Accepted
(note that this ADR was written retrospectively)
Context
The SuperMuSR data pipeline will use Google Flatbuffers as the definition and encoding for data passed through the pipeline. This requires schemas to be written and distributed between the various components.
Given previous in-kind work on ESS streaming, where possible existing schemas should be used.
Decision
The following existing ESS schemas will be used to facilitate communication from the instrument control system to the data pipeline:
- 6s4t_run_stop.fbs
- al00_alarm.fbs
- df12_det_spec_map.fbs
- f144_logdata.fbs
- pl72_run_start.fbs
- se00_data.fbs
The following new schemas will be created, specifically for SuperMuSR (see the illustrative sketch after the list):
- aev2_frame_assembled_event_v2.fbs: an ISIS frame of event data from the entire instrument
- dat2_digitizer_analog_trace_v2.fbs: an ISIS frame of ADC data from a single digitizer (i.e. 8 channels)
- dev2_digitizer_event_v2.fbs: an ISIS frame of event data from a single digitizer (i.e. 8 channels)
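As an illustrative sketch only: a consumer could route an incoming payload by its 4-character flatbuffer file identifier. The identifiers used below ("dev2", "dat2") are assumed to follow the convention of matching the schema file name prefixes; check the schemas in the repository for the actual identifiers.

```rust
/// Illustrative sketch: route a payload by its flatbuffer file identifier.
/// "dev2"/"dat2" are assumed identifiers; a real component would then decode
/// the payload with the generated accessors for the matching schema.
fn route_message(payload: &[u8]) {
    if flatbuffers::buffer_has_identifier(payload, "dev2", false) {
        // per-digitiser event list
    } else if flatbuffers::buffer_has_identifier(payload, "dat2", false) {
        // per-digitiser analog trace
    } else {
        // unknown or unexpected message type
    }
}

fn main() {
    // In the pipeline the payload would come from a Kafka message
    let payload = vec![0u8; 8];
    route_message(&payload);
}
```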
Consequences
- The interface between the digitisers and software components is fully defined
- The interface may change as requirements become more clear in the future (particularly around the output of event formation). Versioning and the ability to make some backwards compatible changes are possible using Flatbuffers.
4. Core technologies for in-flight data processing software
Date: 2024-06-17
Status
Accepted
(note that this ADR was written retrospectively)
Context
The SuperMuSR data pipeline includes several software components that operate on live/in-flight data from the digitisers.
These software components (regardless of their function) have the following requirements:
1. Speed: they must reasonably keep up with the rate at which data is produced
2. Reliability: reasonable effort should be made to prevent known (and indeed, unknown) issues from causing a component to fail
3. Reproducibility: it should be possible to trivially recreate a component of the pipeline from a point in time
4. Scalability: where required, it should be possible to scale components to meet throughput requirements
5. Portability: where possible, components should not be tied to a specific means of deployment, orchestration, or execution, or to a specific execution environment
Decision
The following tooling will be used:
- The Rust programming language (addressing 1 and sufficiently mitigating 2)
- The Nix package manager & nixpkgs (addressing 3)
- Deployment via OCI compatible container images (partly addressing 4 and addressing 5)
Consequences
- All tools that operate on streamed data will be written in Rust
- All non-Rust non-vendored dependencies and associated tooling will be controlled via Nix (with Flakes)
- Another benefit of this is significantly easier developer environment setup
- Nix tooling will be in place to produce minimal container images which can be used for testing and deployment of pipeline components
5. Use OpenMetrics
Date: 2024-06-17
Status
Accepted
(note that this ADR was written retrospectively)
Context
The SuperMuSR data pipeline is composed of many components, each of them having task specific measures of function/success.
To help ensure the digitisers and data pipeline are operating as expected, such measures should be made available for alerting, monitoring and logging.
Decision
Each component should expose relevant metrics that effectively describe how well it is functioning via the textual OpenMetrics format.
Relevant metrics will differ between components; however, given the low cost of reporting, anything that could be a useful indicator or diagnostic item should be considered in scope.
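A minimal sketch of text-format metric exposition is shown below, using the prometheus crate; the metric name is hypothetical and the pipeline's components may use different crates and HTTP serving machinery.

```rust
use prometheus::{Encoder, IntCounter, Registry, TextEncoder};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let registry = Registry::new();

    // Hypothetical per-component metric
    let messages_processed =
        IntCounter::new("messages_processed", "Number of messages processed")?;
    registry.register(Box::new(messages_processed.clone()))?;

    messages_processed.inc();

    // Render the registry in the text exposition format; a real component
    // would serve this string over HTTP (e.g. on a /metrics endpoint).
    let mut buffer = Vec::new();
    TextEncoder::new().encode(&registry.gather(), &mut buffer)?;
    println!("{}", String::from_utf8(buffer)?);
    Ok(())
}
```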
Consequences
- Each pipeline component will provide text format metrics via HTTP
6. High level architecture
Date: 2024-06-26
Status
Accepted
(note that this ADR was written retrospectively)
Context
The SuperMuSR data pipeline is a distributed collection of processing and data management steps. These steps should be clearly designated to ensure the creation of minimal, UNIX philosophy compliant tools for manipulating streamed data as well as clearly indicating the interface between them and other components of the overall data acquisition system.
Decision
The overall/high level architecture will follow what is outlined in pipeline_v2.drawio.
Consequences
- The architecture is well defined, with clear interfaces and areas of responsibility.
- The diagram and documentation are to be considered living documents and can be amended as required.
7. Standardise error handling
Date: 2024-07-30
Status
Accepted
Context
A good amount of the pipeline contains fallible operations; this is unavoidable.
Currently these are handled in roughly one of four ways:
1. This should never happen under normal or abnormal execution: unwrap()
2. This might fail, we cannot recover and don't care about any state (usually in setup procedures): expect()
3. This might fail and the callee needs to care (i.e. in a library): thiserror
4. This might fail and the user might care (i.e. in a binary): anyhow
This comes with the following issues:
- Lack of clean up/reporting when expect() panics
- Lack of context when unwrap() is used
Decision
The above rules will be replaced with:
- This should never happen under normal or abnormal execution: expect()
- This might fail and the callee needs to care (i.e. in a library or modules containing logic in a binary): thiserror
- This might fail and the user might care (i.e. only in the setup procedure in a binary): anyhow
The key changes, for clarity:
- Move cases of 2 into 4
- Forbid unwrap() and panic!()
- anyhow is only ever allowed to be the return value of main()
unwrap() is also used extensively in automated tests; for the time being the above rules will only apply to production code, not testing.
"Production code" in this case is defined as anything not inside or descending from a module named test.
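A short sketch of the revised scheme follows, with hypothetical error variants and function names.

```rust
// Sketch of the scheme above: thiserror in logic code, anyhow only at main().
use thiserror::Error;

#[derive(Debug, Error)]
enum FrameError {
    // Hypothetical error variant for illustration
    #[error("frame {0} is missing digitiser data")]
    IncompleteFrame(u32),
}

fn process_frame(frame_number: u32, complete: bool) -> Result<(), FrameError> {
    if complete {
        Ok(())
    } else {
        Err(FrameError::IncompleteFrame(frame_number))
    }
}

fn main() -> anyhow::Result<()> {
    // anyhow is only ever allowed as the return value of main()/setup code
    process_frame(1, true)?;
    Ok(())
}
```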
Consequences
Error handling throughout the codebase will need to be audited to ensure it complies with the above (there are certainly a good amount of changes that will need to be made).
Automated tooling should be considered to automatically detect uses of forbidden calls (rustc or Clippy may already have suitable lints for this which could be enabled).
A follow up task later will be to look at the use of other operators and functions which might panic, e.g. [] vs get().
8. Use OpenTelemetry
Date: 2024-08-21
Status
Accepted
(note that this ADR was written retrospectively)
Context
In software, tracing is used to track cause and effect within the flow of a program by recording logs, events and spans, which are collected and analysed.
For software involving multiple isolated processes, a tool is required to combine the spans emitted by the different processes and integrate them into consistent traces.
OpenTelemetry is a standard for accomplishing this which has implementations in many different languages (including Rust) and is widely used.
It integrates nicely with the tracing crate and Kafka.
Third party collectors and analysis tools exist, such as Jaeger.
Decision
- Include, in the common crate, the following dependencies (underscore is deliberate):
  - opentelemetry
  - opentelemetry-otlp
  - opentelemetry_sdk
  - tracing-opentelemetry
  - tracing-subscriber
- Develop structs, functions, and macros in the common crate that allow traces to be embedded in the headers of Kafka messages, so they can be used in upstream components.
- Design structs, functions, and macros in the common crate such that none of the dependencies above need to be included in any other component, and modifications to existing components are minimal and can be implemented logically.
- Tracing subscribers should be initialised by the above code using a macro called init_tracer. These should include the stdout subscriber, whose filtering is controlled by the RUST_LOG environment variable, and an OpenTelemetry subscriber which is controlled by command line arguments (a rough sketch of this initialisation is given below).
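A rough sketch of what such initialisation looks like with tracing-subscriber, assuming only the stdout subscriber is enabled; the real init_tracer macro lives in the common crate and adds the OpenTelemetry layer when requested via command line arguments.

```rust
// Conceptual sketch only; not the pipeline's actual init_tracer macro.
use tracing_subscriber::{fmt, prelude::*, EnvFilter};

fn main() {
    tracing_subscriber::registry()
        .with(EnvFilter::from_default_env()) // filtering controlled by RUST_LOG
        .with(fmt::layer()) // stdout subscriber
        // an OpenTelemetry layer would be added here when enabled via CLI arguments
        .init();

    tracing::info!("tracing initialised");
}
```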
Consequences
- Third-party tools such as Jaeger can be used to collect and analyse tracing data from the whole pipeline.
- Any future component should extract traces from Kafka headers when consuming messages, and embed them when producing.
- Any future component should initialise its subscribers using the init_tracer macro (even if it does not use OpenTelemetry).
9. NeXus file format
Date: 2025-05-15
Status
Accepted
(note that this ADR was written retrospectively)
Context
The SuperMuSR data pipeline writes its output to hdf5 files which must be compatible with tools such as "MuonData event filtering", "WiMDA", and possibly "Mantid".
To ensure this, it is advisable that the pipeline's output format conform to a standard compatible with these tools, and which is publicly available to allow future tools to easily read the pipeline's output files.
Decision
The NeXus Writer will write output conforming to a standard detailed in the component's doc files. Wherever possible this standard conforms to Muon Instrument Definition: Version 2 – 'muonTD'. The main deviation is that the NXdata class in the raw_data_1/detector_1 field is replaced with the NXevent_data class, as this is designed for storing streamed event data.
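Purely as an illustration of the group layout implied by this decision (assuming the hdf5 crate; dataset names, attributes, and the authoritative layout are defined in the component documentation):

```rust
// Illustrative sketch only; the real writer is the NeXus Writer component.
fn main() -> hdf5::Result<()> {
    let file = hdf5::File::create("example.nxs")?;
    let entry = file.create_group("raw_data_1")?; // NX_class = NXentry
    let _detector = entry.create_group("detector_1")?; // NX_class = NXevent_data (not NXdata)
    // event_time_zero, event_time_offset, event_index, etc. datasets go in detector_1
    Ok(())
}
```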
Consequences
The NeXus Writer must be carefully documented, especially where the output format is concerned. Any file writes which do not conform to the above decision should be clearly explained.
10. Revised error handling using miette
Date: 2025-07-31
Status
Accepted
Context
ADR 0007 introduced an error handling scheme using anyhow and thiserror.
This had several issues:
- error messages lacked context
- the task of implementing it was rather intensive (and not yet complete)
Decision
Use miette as the means of error handling, following the usage guide.
In short (a minimal sketch follows this list):
- use miette::Result<T> as the default return type of fallible operations
- use into_diagnostic() to convert errors
- use with_context() as appropriate
- enable the fancy feature on binary crates
- continue to use thiserror as appropriate
- remove all use of anyhow
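A minimal sketch of this scheme, with a hypothetical configuration file path:

```rust
// Illustrative only: the file path and error messages are placeholders.
use miette::{IntoDiagnostic, Result, WrapErr};

fn read_config(path: &str) -> Result<String> {
    std::fs::read_to_string(path)
        .into_diagnostic()
        .with_context(|| format!("failed to read configuration from {path}"))
}

fn main() -> Result<()> {
    let config = read_config("pipeline.toml")?;
    println!("{config}");
    Ok(())
}
```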
Consequences
- error messages will include context, making them more useful in diagnosing problems
- less code will be required compared to ADR 0007