Introduction

High-level documentation for the SuperMuSR/digital muon data pipeline.

This is intended to be a high-level overview of the data pipeline for SuperMuSR (and, later, other muon instruments). Specific documentation for individual components will be kept alongside the relevant project/service/infrastructure.

Infrastructure

This section describes the infrastructure required to operate the muon data pipeline. The main focus is on the online stage of the pipeline.

Networking

graph RL;
    cabin_switch[Cabin Switch] --- backbone_switch[Pipeline Backbone Switch];

    backbone_switch --- kafka_cluster[Kafka Cluster];
    backbone_switch --- compute_cluster(Compute Cluster);

    backbone_switch --- digitiser_switch_1[Digitiser Switch 1];
    backbone_switch --- digitiser_switch_2[Digitiser Switch 2];
    backbone_switch --- digitiser_switch_3[Digitiser Switch 3];

    digitiser_switch_1 --- digitisers[Digitisers];
    digitiser_switch_2 --- digitisers[Digitisers];
    digitiser_switch_3 --- digitisers[Digitisers];

Assumptions

  • Per-DAQ throughput of 10 Mb/s (assuming compression and a safety margin)
  • Kafka replication factor = 1
  • The number of nodes in the Redpanda broker and compute clusters is unlikely to change, but is also not set in stone

Digitisers

  • 32 units x 4 DAQ each = 128 GbE ports
  • 3 x 48 GbE port switches = 144 available GbE ports
  • ≈43 DAQ per switch (128 DAQ across 3 switches)

Pipeline

  • 1x 10 GbE uplink per digitiser switch
  • 3 switches x 1 link per switch = 3 links
  • 1x 10 GbE link per Redpanda node
  • 5 nodes x 1 link per node = 5 links
  • 1x 10 GbE link per compute node
  • 5 nodes x 1 link per node = 5 links

Totals

10 Gbps links: 3 + 5 + 5 + 1 (uplink) = 14

A 24 port switch would be ideal, to allow for possible expansion.

Message passing

The online data pipeline uses a data broker as the central storage for active data. In this case the broker is Redpanda, an Apache Kafka compatible message broker.

Data transported in the online data pipeline is encoded using Flatbuffers. Where possible, the same schemas as those used by the ESS have been adopted; otherwise, pipeline specific schemas have been created.

The single source of truth for schemas used in the pipeline is here in the repository.
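
As a rough illustration of the consumer side only (not the structure of any actual pipeline component), a minimal sketch using the rdkafka and tokio crates is shown below. The broker address, topic name, group id and the commented-out schema accessor are placeholders, not the pipeline's real configuration.

use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::message::Message;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Placeholder broker address and topic; real values are deployment specific.
    let consumer: StreamConsumer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:19092")
        .set("group.id", "example-consumer")
        .create()?;
    consumer.subscribe(&["digitiser-event-lists"])?;

    loop {
        let message = consumer.recv().await?;
        if let Some(payload) = message.payload() {
            // The payload is a flatbuffer; it would be decoded with the accessor
            // generated by flatc from the relevant schema (name illustrative only):
            // let events = root_as_digitizer_event_list_message(payload)?;
            println!("received {} bytes", payload.len());
        }
    }
}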

Online Compute

TODO

Monitoring, Observability and Alerting

TODO

Data Processing Pipeline

The data processing pipeline is defined as the data path between data collection and production of a NeXus histogram file equivalent to what existing muon instruments produce. This is split into two broad categories: acquisition & online processing, and offline processing.

graph LR;
    digitiser[Digitisers] --- event_formation;
    event_formation[Event Formation] --- aggregator;
    aggregator[Event Aggregator] --- nexus_writer;
    epics[EPICS] --- nexus_writer;
    nexus_writer[NeXus Writer] --- nexus_event_file;
    nexus_event_file[NeXus Event File] --- event_filtering;
    event_filtering[Event Filtering] --- histogram_generation;
    histogram_generation[Histogram Generation] --- nexus_histogram_file;
    nexus_histogram_file[NeXus Histogram File];

Acquisition & Online Processing

The online stage of the pipeline sits between the digitisers and instrument control program on one side, and the NeXus file written to the archive on the other. It physically runs on compute within the instrument cabin.

The online data pipeline is designed around a centralised data broker, with several small data processing units, each taking data from and providing data to the broker.

Event Formation

This is the stage that transforms raw detector traces into event lists.

Source and documentation.
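
The component's actual algorithm is described in its own documentation. Purely as an illustration of trace-to-event conversion, a naive fixed-threshold sketch (all names and values made up):

/// Naive sketch: report the sample index of each rising crossing of a fixed
/// threshold in a digitised trace.
fn find_events(trace: &[i16], threshold: i16) -> Vec<usize> {
    let mut events = Vec::new();
    let mut above = false;
    for (sample_index, &sample) in trace.iter().enumerate() {
        if sample >= threshold && !above {
            events.push(sample_index);
            above = true;
        } else if sample < threshold {
            above = false;
        }
    }
    events
}

fn main() {
    let trace = [0i16, 2, 8, 12, 7, 1, 0, 9, 15, 3];
    // Sample indices would be converted to times using the digitiser sample rate.
    println!("{:?}", find_events(&trace, 10)); // prints [3, 8]
}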

Event Aggregator

This component aggregates the ~128 per-digitiser event list messages for a given frame into a single event list message per frame. Aggregation is performed using the status packet as a key; therefore, any fault upstream of status packet generation, or in the transmission of the status packet, will cause either controlled data loss or undefined behaviour in this component.

Source and documentation.
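
Conceptually, aggregation amounts to grouping per-digitiser event lists under a frame key and emitting a combined list once every expected digitiser has reported. A simplified in-memory sketch with hypothetical types (the real component keys on status packet metadata and also handles timeouts and faults):

use std::collections::HashMap;

/// Frame identifier used as the aggregation key (simplified placeholder).
type FrameKey = u64;

#[derive(Default)]
struct FrameAssembly {
    digitisers_seen: usize,
    events: Vec<u32>,
}

struct Aggregator {
    expected_digitisers: usize,
    in_flight: HashMap<FrameKey, FrameAssembly>,
}

impl Aggregator {
    /// Add one digitiser's event list; return the assembled frame once every
    /// expected digitiser has contributed.
    fn push(&mut self, frame: FrameKey, events: Vec<u32>) -> Option<Vec<u32>> {
        let assembly = self.in_flight.entry(frame).or_default();
        assembly.events.extend(events);
        assembly.digitisers_seen += 1;
        if assembly.digitisers_seen == self.expected_digitisers {
            self.in_flight.remove(&frame).map(|assembly| assembly.events)
        } else {
            None
        }
    }
}

fn main() {
    let mut aggregator = Aggregator { expected_digitisers: 2, in_flight: HashMap::new() };
    assert!(aggregator.push(1, vec![10, 20]).is_none());
    assert_eq!(aggregator.push(1, vec![30]), Some(vec![10, 20, 30]));
    println!("frame assembled");
}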

NeXus Writer

This writes NeXus event files with information sourced from:

  • Run control messages
  • The per frame aggregated event list messages
  • EPICS sample environment/process logs

Source and documentation.
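
As a rough illustration of the kind of file being produced (not the writer's actual implementation), a minimal sketch assuming the hdf5 crate's 0.8 builder API; the group and dataset names follow an NXevent_data style layout and the values are placeholders:

use hdf5::File;

fn main() -> hdf5::Result<()> {
    let file = File::create("run_00001.nxs")?;
    let detector = file
        .create_group("raw_data_1")?
        .create_group("detector_1")?;

    // Placeholder event data; in the pipeline this comes from the aggregated
    // per-frame event list messages.
    let event_id: Vec<u32> = vec![3, 17, 42];
    let event_time_offset: Vec<u64> = vec![120, 455, 980];

    detector
        .new_dataset_builder()
        .with_data(&event_id[..])
        .create("event_id")?;
    detector
        .new_dataset_builder()
        .with_data(&event_time_offset[..])
        .create("event_time_offset")?;
    Ok(())
}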

Offline Processing

The offline stage of the pipeline includes any processing that is done using the written event NeXus file as the input. This may happen anywhere within reason, but will most likely be done on IDAaaS.

Source.

Architecture Decision Records

This section contains the Architecture Decision Records (ADRs) that describe decisions made in the design of the data pipeline.

1. Record architecture decisions

Date: 2024-06-06

Status

Accepted

Context

We need to record the architectural decisions made on this project.

Decision

We will use Architecture Decision Records, as described by Michael Nygard.

The adrs tool will be used to manage ADRs.

Consequences

See Michael Nygard's article, linked above.

2. Use ESS-like architecture for streaming components

Date: 2024-06-17

Status

Accepted

(note that this ADR was written retrospectively)

Context

The desire is to adopt a streaming based architecture for SuperMuSR.

This has the following benefits:

  • allows flexible/modular manipulation of data "in flight" (i.e. before a file is written to the archive)
  • provides a means of rapid iteration on signal processing
  • increases recoverability in the event of a single component failure

Decision

Adopt an ESS-like architecture, along with appropriate technology decisions.

Specifically:

  • Data will be exchanged via a broker using Apache Kafka
  • The format of the exchanged data will be defined and encoded using Google Flatbuffers

SuperMuSR specific items/differences:

  • The broker will be Redpanda
  • Some additional schemas will be required
  • More in-flight processing will be needed; this will act as both a consumer and a producer, publishing the transformed data back to the broker on an appropriate topic

Consequences

  • Redpanda broker now part of physical/infrastructure architecture
  • Interface between digitisers and software components is further defined

3. Initial flatbuffer formats

Date: 2024-06-17

Status

Accepted

(note that this ADR was written retrospectively)

Context

The SuperMuSR data pipeline will use Google Flatbuffers as the definition and encoding for data passed through the pipeline. This requires schemas to be written and distributed between the various components.

Given previous in-kind work on ESS streaming, where possible existing schemas should be used.

Decision

The following existing ESS schemas will be used to facilitate communication from the instrument control system to the data pipeline:

  • 6s4t_run_start.fbs
  • al00_alarm.fbs
  • df12_det_spec_map.fbs
  • f144_logdata.fbs
  • pl72_run_start.fbs
  • se00_data.fbs

The following new schemas will be created, specifically for SuperMuSR:

  • aev2_frame_assembled_event_v2.fbs - an ISIS frame of event data from the entire instrument
  • dat2_digitizer_analog_trace_v2.fbs - an ISIS frame of ADC data from a single digitizer (i.e. 8 channels)
  • dev2_digitizer_event_v2.fbs - an ISIS frame of event data from a single digitizer (i.e. 8 channels)

Consequences

  • The interface between the digitisers and software components is fully defined
  • The interface may change as requirements become more clear in the future (particularly around the output of event formation). Versioning and the ability to make some backwards compatible changes are possible using Flatbuffers.

4. Core technologies for in-flight data processing software

Date: 2024-06-17

Status

Accepted

(note that this ADR was written retrospectively)

Context

The SuperMuSR data pipeline includes several software components that operate on live/in-flight data from the digitisers.

These software components (regardless of their function) have the following requirements:

  1. Speed: they must reasonably keep up with the rate at which data is produced
  2. Reliability: reasonable effort should be made to prevent known (and indeed, unknown) issues from causing a component to fail
  3. Reproducibility: it should be possible to trivially recreate a component of the pipeline from a point in time
  4. Scalability: where required, it should be possible to scale components to meet throughput requirements
  5. Portability: where possible, components should not be tied to a specific means of deployment, orchestration, or execution environment

Decision

The following tooling will be used:

  • The Rust programming language (addressing 1 and sufficiently mitigating 2)
  • The Nix package manager & nixpkgs (addressing 3)
  • Deployment via OCI compatible container images (partly addressing 4 and addressing 5)

Consequences

  • All tools that operate on streamed data will be written in Rust
  • All non-Rust non-vendored dependencies and associated tooling will be controlled via Nix (with Flakes)
    • Another benefit of this is significantly easier developer environment setup
  • Nix tooling will be in place to produce minimal container images which can be used for testing and deployment of pipeline components

5. Use OpenMetrics

Date: 2024-06-17

Status

Accepted

(note that this ADR was written retrospectively)

Context

The SuperMuSR data pipeline is composed of many components, each of them having task specific measures of function/success.

To help ensure the digitisers and data pipeline are operating as expected, such measures should be made available for alerting, monitoring and logging.

Decision

Each component should expose relevant metrics that effectively describe how well it is functioning via the textual OpenMetrics format.

Relevant metrics will differ between components; however, given the low cost of reporting, anything that could be a useful indicator or diagnostic item should be considered in scope.
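
As a hedged illustration of text-format exposition, a minimal sketch using the prometheus crate; the metric name and help text are placeholders, and serving the output over HTTP is omitted:

use prometheus::{Encoder, IntCounter, Registry, TextEncoder};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Register a placeholder counter; real components expose task specific metrics.
    let registry = Registry::new();
    let frames_processed = IntCounter::new("frames_processed", "Number of frames processed")?;
    registry.register(Box::new(frames_processed.clone()))?;

    frames_processed.inc();

    // Render the registry in the text exposition format, ready to be served over HTTP.
    let mut buffer = Vec::new();
    TextEncoder::new().encode(&registry.gather(), &mut buffer)?;
    println!("{}", String::from_utf8(buffer)?);
    Ok(())
}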

Consequences

  • Each pipeline component will provide text format metrics via HTTP

6. High level architecture

Date: 2024-06-26

Status

Accepted

(note that this ADR was written retrospectively)

Context

The SuperMuSR data pipeline is a distributed collection of processing and data management steps. These steps should be clearly designated to ensure the creation of minimal, UNIX philosophy compliant tools for manipulating streamed data as well as clearly indicating the interface between them and other components of the overall data acquisition system.

Decision

The overall/high level architecture will follow what is outlined in pipeline_v2.drawio.

Consequences

  • The architecture is well defined, with clear interfaces and areas of responsibility.
  • The diagram and documentation are to be considered living documents and can be amended as required.

7. Standardise error handling

Date: 2024-07-30

Status

Accepted

Context

Much of the pipeline involves fallible operations; this is unavoidable.

Currently these are handled in roughly one of four ways:

  1. This should never happen under normal or abnormal execution: unwrap()
  2. This might fail, we cannot recover and don't care about any state (usually in setup procedures): expect()
  3. This might fail and the caller needs to care (i.e. in a library): thiserror
  4. This might fail and the user might care (i.e. in a binary): anyhow

This comes with the following issues:

  • Lack of clean up/reporting when expect() panics
  • Lack of context when unwrap() is used

Decision

The above rules will be replaced with:

  1. This should never happen under normal or abnormal execution: expect()
  2. This might fail and the caller needs to care (i.e. in a library or modules containing logic in a binary): thiserror
  3. This might fail and the user might care (i.e. only in the setup procedure of a binary): anyhow

The key changes, for clarity (numbers refer to the cases listed in the context above):

  • Move cases of 2 into 4
  • Forbid unwrap() and panic()
  • anyhow is only ever allowed to be the return value of main()

unwrap() is also used extensively in automated tests; for the time being, the above rules will only apply to production code, not tests. "Production code" in this case is defined as anything not inside, or descending from, a module named test.
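
A minimal sketch of the thiserror/anyhow split described above (type and function names are illustrative only; note that ADR 0010 later revises this scheme):

use thiserror::Error;

/// Library-style error type (rule 2); a binary's setup/main would wrap this
/// in anyhow (rule 3).
#[derive(Debug, Error)]
enum FrameError {
    #[error("missing status packet for frame {0}")]
    MissingStatusPacket(u64),
}

fn check_frame(frame: u64, has_status: bool) -> Result<(), FrameError> {
    if has_status {
        Ok(())
    } else {
        Err(FrameError::MissingStatusPacket(frame))
    }
}

fn main() -> anyhow::Result<()> {
    // anyhow is confined to main()/setup, per the rules above.
    check_frame(42, true)?;
    Ok(())
}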

Consequences

Error handling throughout the codebase will need to be audited to ensure it complies with the above (there are certainly a good number of changes that will need to be made).

Automated tooling should be considered to automatically detect uses of forbidden calls (rustc or Clippy may already have suitable lints for this which could be enabled).

A follow-up task will be to look at the use of other operators and functions which might panic, e.g. [] vs get().

8. Use OpenTelemetry

Date: 2024-08-21

Status

Accepted

(note that this ADR was written retrospectively)

Context

In software, tracing is used to track cause and effect within the flow of a program by recording logs, events and spans, which are collected and analysed. For software involving multiple isolated processes, a tool is required to combine the spans emitted by the different processes and integrate them into consistent traces. OpenTelemetry is a standard for accomplishing this which has implementations in many different languages (including Rust) and is widely used. It integrates nicely with the tracing crate and Kafka. Third party collectors and analysis tools, such as Jaeger, exist.

Decision

  1. Include, in the common crate, the following dependencies (underscore is deliberate):
    1. opentelemetry
    2. opentelemetry-otlp
    3. opentelemetry_sdk
    4. tracing-opentelemetry
    5. tracing-subscriber
  2. Develop structs, functions, and macros in the common crate that allow traces to be embedded in the headers of Kafka messages, so they can be used in upstream components.
  3. Design structs, functions, and macros in the common crate such that none of the dependencies above need to be included in any other component, and modifications to existing components are minimal and can be implemented logically.
  4. Tracing subscribers should be initialised by the above code using a macro called init_tracer. These should include the stdout subscriber, whose filtering is controlled by the RUST_LOG environment variable, and an OpenTelemetry subscriber which is controlled by command line arguments (a sketch follows this list).
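
A minimal sketch of point 4's subscriber initialisation, assuming the tracing-subscriber crate with its env-filter feature; the OpenTelemetry layer and the project's init_tracer macro itself are omitted here:

use tracing_subscriber::{prelude::*, fmt, EnvFilter};

fn main() {
    // Stdout subscriber filtered by RUST_LOG; an OpenTelemetry layer would be
    // added in the same way (exporter configuration is deployment specific
    // and version dependent, so it is left out of this sketch).
    tracing_subscriber::registry()
        .with(EnvFilter::from_default_env())
        .with(fmt::layer())
        .init();

    tracing::info!("tracing initialised");
}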

Consequences

  1. Third-party tools such as Jaeger can be used to collect and analyse tracing data from the whole pipeline.
  2. Any future component should extract traces from Kafka headers when consuming messages, and embed them when producing.
  3. Any future component should initialise its subscribers using the init_tracer macro (even if it does not use OpenTelemetry).

9. NeXus file format

Date: 2025-05-15

Status

Accepted

(note that this ADR was written retrospectively)

Context

The SuperMuSR data pipeline writes its output to HDF5 files which must be compatible with tools such as "MuonData event filtering", "WiMDA", and possibly "Mantid".

To ensure this, it is advisable that the pipeline's output format conform to a standard compatible with these tools, and which is publicly available, to allow future tools to easily read the pipeline's output files.

Decision

The NeXus Writer will write output conforming to a standard detailed in the component's doc files. Wherever possible this standard conforms to Muon Instrument Definition: Version 2 – 'muonTD'. The main deviation is that the NXdata class in the raw_data_1/detector_1 field is replaced with the NXevent_data class, as this is designed for storing streamed event data.

Consequences

The NeXus Writer must be carefully documented, especially where the output format is concerned. Any file writes which do not conform to the above decision should be clearly explained.

10. Revised error handling using miette

Date: 2025-07-31

Status

Accepted

Context

ADR 0007 introduced an error handling scheme using anyhow and thiserror.

This had several issues:

  • error messages lacked context
  • the task of implementing it was rather intensive (and not yet complete)

Decision

Use miette as the means of error handling, following the usage guide.

In short (a sketch follows this list):

  • use miette::Result<T> as the default return type of fallible operations
  • use into_diagnostic() to convert errors
  • use with_context() as appropriate
  • enable the fancy feature on binary crates
  • continue to use thiserror as appropriate
  • remove all use of anyhow
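
A minimal sketch of these rules in use, assuming miette's IntoDiagnostic and WrapErr traits; the file path and messages are placeholders:

use miette::{IntoDiagnostic, Result, WrapErr};

fn read_config(path: &str) -> Result<String> {
    // Convert the std::io::Error into a miette diagnostic and attach context.
    std::fs::read_to_string(path)
        .into_diagnostic()
        .with_context(|| format!("reading configuration from {path}"))
}

fn main() -> Result<()> {
    let config = read_config("pipeline.toml")?;
    println!("{config}");
    Ok(())
}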

Consequences

  • error messages will include context, making them more useful in diagnosing problems
  • less code will be required compared to ADR 0007