
API Reference

This page provides the API reference for TrackNado.

Hub Building

HubBuilder

Bases: BaseModel

Fluent API for building UCSC track hubs.

Implemented as a Pydantic model so the builder state can be serialized to and from JSON.
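
A minimal sketch of the fluent workflow. File names, metadata values, and the output directory are hypothetical, and `HubBuilder` is assumed to be importable from `tracknado.builder` (the source location shown below):

from tracknado.builder import HubBuilder

builder = (
    HubBuilder()
    .add_tracks(["rna_rep1.bigWig", "rna_rep2.bigWig"], assay="RNA")  # hypothetical files
    .group_by("assay")
    .color_by("assay", palette="tab20")
)
hub = builder.build(name="my_hub", genome="hg38", outdir="hub_output")
hub.stage_hub()  # writes the staged hub into `outdir`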

Source code in tracknado/builder.py
class HubBuilder(BaseModel):
    """Fluent API for building UCSC track hubs.

    Now a Pydantic model for EASY serialization.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True, populate_by_name=True)

    tracks: list[Track] = Field(default_factory=list)
    group_by_cols: list[str] = Field(default_factory=list)
    supergroup_by_cols: list[str] = Field(default_factory=list)
    overlay_by_cols: list[str] = Field(default_factory=list)
    color_by_col: str | None = Field(None)
    color_palette: str = Field("tab20")

    # Conversion settings
    convert_files: bool = Field(False)
    chrom_sizes: Optional[pathlib.Path] = Field(None)
    custom_genome_config: dict[str, Any] = Field(default_factory=dict)
    sort_metadata: bool = Field(False)
    missing_group_label: str | None = Field(None)
    missing_group_columns: list[str] = Field(default_factory=list)

    # Non-serialized field for extractors (functions can't be JSON serialized easily)
    metadata_extractors: list[Callable[[pathlib.Path], dict[str, str]]] = Field(
        default_factory=list, exclude=True
    )

    def __init__(self, **data):
        super().__init__(**data)

    def add_tracks(
        self, paths: list[str] | list[pathlib.Path], **common_metadata: str
    ) -> HubBuilder:
        """Add multiple tracks from paths."""
        for p in paths:
            path = pathlib.Path(p)
            self.tracks.append(Track(path=path, metadata=common_metadata.copy()))
        return self

    def add_tracks_from_df(self, df: pd.DataFrame, fn_col: str = "fn") -> HubBuilder:
        """Add tracks from a pandas DataFrame."""
        df = df.copy()
        if fn_col in df.columns and "ext" not in df.columns:
            df["ext"] = df[fn_col].apply(lambda x: pathlib.Path(x).suffix.strip("."))

        try:
            df = TrackDataFrameSchema.validate(df)
        except Exception as e:
            logger.warning(f"DataFrame validation failed: {e}")

        for _, row in df.iterrows():
            path = pathlib.Path(row[fn_col])
            metadata = {
                k: str(v)
                for k, v in row.items()
                if k not in [fn_col, "ext", "path", "name"] and pd.notna(v)
            }
            track = Track(path=path, metadata=metadata)
            if "name" in row and pd.notna(row["name"]):
                track.name = row["name"]
            if "ext" in row and pd.notna(row["ext"]):
                track.track_type = row["ext"]
            self.tracks.append(track)
        return self

    def with_metadata_extractor(
        self, fn: Callable[[pathlib.Path], dict[str, str]]
    ) -> HubBuilder:
        """Add a metadata extractor function."""
        self.metadata_extractors.append(fn)
        return self

    def group_by(self, *columns: str, as_supertrack: bool = False) -> "HubBuilder":
        """Specify columns to group by. If as_supertrack is True, these columns
        will be used for SuperTracks instead of dimensions in a CompositeTrack.
        """
        if as_supertrack:
            self.supergroup_by_cols.extend(columns)
        else:
            self.group_by_cols.extend(columns)
        return self

    def with_custom_genome(
        self,
        name: str,
        twobit_file: str | pathlib.Path,
        organism: str,
        default_position: str = "chr1:1000-2000",
    ) -> "HubBuilder":
        """Configure a custom genome (Assembly Hub) for this hub."""
        self.custom_genome_config = {
            "custom_genome": True,
            "genome_twobit": str(twobit_file),
            "genome_organism": organism,
            "genome_default_position": default_position,
        }
        return self

    def color_by(self, column: str, palette: str = "tab20") -> HubBuilder:
        """Specify column for track coloring."""
        self.color_by_col = column
        self.color_palette = palette
        return self

    def overlay_by(self, *columns: str) -> HubBuilder:
        """Specify columns for overlay tracks."""
        self.overlay_by_cols.extend(columns)
        return self

    def with_sort_metadata(self, enabled: bool = True) -> HubBuilder:
        """Enable or disable sorting of metadata columns in output."""
        self.sort_metadata = enabled
        return self

    def with_convert_files(self, enabled: bool = True) -> HubBuilder:
        """Enable or disable implicit track conversion."""
        self.convert_files = enabled
        return self

    def with_missing_groups(
        self, label: str = "NA", *columns: str
    ) -> HubBuilder:
        """Replace missing grouping values with a label before hub generation.

        If `columns` are not provided, applies to all active grouping columns.
        """
        self.missing_group_label = label
        self.missing_group_columns = list(columns)
        return self

    def with_chrom_sizes(self, path: Union[str, pathlib.Path]) -> HubBuilder:
        """Set the chrom.sizes file for track conversion."""
        self.chrom_sizes = pathlib.Path(path)
        return self

    def merge(self, *others: "HubBuilder") -> HubBuilder:
        """Merge other HubBuilders into this one, reconciling settings."""
        for other in others:
            self.tracks.extend(other.tracks)
            # Union of grouping columns
            self.group_by_cols = sorted(
                list(set(self.group_by_cols + other.group_by_cols))
            )
            self.supergroup_by_cols = sorted(
                list(set(self.supergroup_by_cols + other.supergroup_by_cols))
            )
            self.overlay_by_cols = sorted(
                list(set(self.overlay_by_cols + other.overlay_by_cols))
            )
            # Merge extractors
            for ex in other.metadata_extractors:
                if ex not in self.metadata_extractors:
                    self.metadata_extractors.append(ex)
            # Colors: use other's if not set here
            if not self.color_by_col:
                self.color_by_col = other.color_by_col
                self.color_palette = other.color_palette
            # Merge sort_metadata: True if either is True
            self.sort_metadata = self.sort_metadata or other.sort_metadata
        return self

    def to_json(self, path: str | pathlib.Path | None = None) -> str:
        """Serialize state to JSON string or file."""
        data = self.model_dump_json(indent=2, by_alias=True)
        if path:
            with open(path, "w") as f:
                f.write(data)
        return data

    @classmethod
    def from_json(cls, path_or_data: str | pathlib.Path) -> HubBuilder:
        """Reconstruct builder from JSON string or file path."""
        p = pathlib.Path(path_or_data)
        if p.exists() and p.is_file():
            with open(p, "r") as f:
                data = f.read()
        else:
            data = path_or_data
        return cls.model_validate_json(data)

    def _extract_metadata(self):
        """Extract metadata for all tracks using registered extractors."""
        if not self.metadata_extractors:
            return

        for track in self.tracks:
            # Skip if metadata has already been extracted (marked with _metadata_extracted flag)
            if getattr(track, "_metadata_extracted", False):
                continue

            # Use original path if it exists for extraction, as it might have more metadata in its name/path
            # than a temporary converted path.
            path_to_extract = getattr(track, "_original_path", track.path)
            for extractor in self.metadata_extractors:
                extracted = extractor(path_to_extract)
                track.metadata.update(extracted)

            # Mark this track as having metadata extracted
            track._metadata_extracted = True

    def _convert_tracks(self, outdir: pathlib.Path):
        """Convert tracks to UCSC formats (e.g. BED -> BigBed)."""
        from .converters import convert_bed_to_bigbed, convert_gtf_to_biggenepred

        if not self.chrom_sizes or not self.chrom_sizes.exists():
            raise ValueError(
                "chrom_sizes must be provided and exist for track conversion"
            )

        conv_dir = outdir / "converted"
        conv_dir.mkdir(parents=True, exist_ok=True)

        for track in self.tracks:
            if track.path.suffix.lower() == ".bed":
                logger.info(f"Converting {track.path.name} to BigBed")
                dest = conv_dir / track.path.with_suffix(".bb").name
                # Save original path for metadata extraction if needed later
                track._original_path = track.path
                new_path = convert_bed_to_bigbed(track.path, self.chrom_sizes, dest)
                track.path = new_path
                track.track_type = "bigBed"
            elif track.path.suffix.lower() in [".gtf", ".gff"]:
                logger.info(f"Converting {track.path.name} to BigGenePred")
                dest = conv_dir / track.path.with_suffix(".bb").name
                track._original_path = track.path
                new_path = convert_gtf_to_biggenepred(
                    track.path, self.chrom_sizes, dest
                )
                track.path = new_path
                track.track_type = "bigGenePred"

    def _prepare_design_df(self) -> pd.DataFrame:
        """Convert tracks to the DataFrame format used by TrackDesign."""
        # Metadata extraction is idempotent and safe to call multiple times
        # It will be a no-op if already extracted
        self._extract_metadata()

        extension_mapping = {
            "bw": "bigWig",
            "bb": "bigBed",
            "bigbed": "bigBed",
            "bigwig": "bigWig",
            "bed": "bigBed",  # Default for .bed is bigBed (assuming conversion)
            "gtf": "bigGenePred",
            "gff": "bigGenePred",
            "biggenepred": "bigGenePred",
            "narrowpeak": "narrowPeak",
            "broadpeak": "broadPeak",
        }

        data = []
        for track in self.tracks:
            # Metadata is already extracted in build()

            ext = track.track_type or track.path.suffix.lstrip(".")
            ext = extension_mapping.get(ext.lower(), ext)

            row = {
                "fn": str(track.path),
                "path": str(track.path.absolute().resolve()),
                "name": track.name or track.path.stem,
                "ext": ext,
            }
            row.update(track.metadata)
            data.append(row)

        df = pd.DataFrame(data)
        self._ensure_unique_track_names(df)
        self._fill_missing_group_values(df)

        # Sort columns alphabetically if requested (keeping standard columns first)
        if self.sort_metadata:
            standard_cols = ["fn", "path", "name", "ext"]
            existing_standard = [c for c in standard_cols if c in df.columns]
            other_cols = sorted([c for c in df.columns if c not in existing_standard])
            df = df[existing_standard + other_cols]

        return df

    @staticmethod
    def _normalize_name(value: str) -> str:
        return "".join(ch if ch.isalnum() else "_" for ch in value).strip("_")

    @classmethod
    def _append_path_suffix(cls, base: str, path: pathlib.Path, depth: int) -> str:
        parents = list(path.parents)
        # parents[0] is the immediate parent directory
        parts = [p.name for p in parents[:depth] if p.name]
        if not parts:
            parts = [path.name]
        suffix = "__".join(reversed(parts))
        suffix = cls._normalize_name(suffix)
        return f"{base}__{suffix}" if suffix else base

    @classmethod
    def _ensure_unique_track_names(cls, df: pd.DataFrame) -> None:
        """Ensure `name` is unique while keeping names readable."""
        if df.empty or "name" not in df.columns:
            return

        names = df["name"].astype(str).tolist()
        paths = [pathlib.Path(p) for p in df["fn"].tolist()]
        counts = pd.Series(names).value_counts()
        used: set[str] = set()

        for i, name in enumerate(names):
            candidate = name
            if counts[name] > 1 or candidate in used:
                path = paths[i]
                depth = 1
                while candidate in used:
                    candidate = cls._append_path_suffix(name, path, depth)
                    depth += 1
                    if depth > len(path.parents) + 1:
                        candidate = f"{name}__{i + 1}"
                        break
            used.add(candidate)
            names[i] = candidate

        df["name"] = names

    def _fill_missing_group_values(self, df: pd.DataFrame) -> None:
        """Fill NA/empty values in grouping columns with a configured label."""
        if not self.missing_group_label:
            return

        if self.missing_group_columns:
            target_columns = list(dict.fromkeys(self.missing_group_columns))
        else:
            target_columns = list(
                dict.fromkeys(
                    [
                        *self.group_by_cols,
                        *self.supergroup_by_cols,
                        *self.overlay_by_cols,
                    ]
                )
            )

        for col in target_columns:
            if col not in df.columns:
                continue
            df[col] = df[col].replace("", pd.NA).fillna(self.missing_group_label)

    def build(
        self,
        name: str,
        genome: str,
        outdir: str | pathlib.Path,
        hub_email: str = "",
        **kwargs,
    ) -> Any:
        """Build the hub and export sidecar config."""
        outdir = pathlib.Path(outdir)
        outdir.mkdir(parents=True, exist_ok=True)

        # 1. Handle conversions
        if self.convert_files:
            self._convert_tracks(outdir)

        # 2. Extract metadata BEFORE saving config so extracted data is included
        self._extract_metadata()

        # 3. Save sidecar config (now with extracted metadata)
        self.to_json(outdir / "tracknado_config.json")

        df = self._prepare_design_df()

        design = TrackDesign.from_design(
            df,
            color_by=self.color_by_col,
            subgroup_by=self.group_by_cols if self.group_by_cols else None,
            supergroup_by=self.supergroup_by_cols if self.supergroup_by_cols else None,
            overlay_by=self.overlay_by_cols if self.overlay_by_cols else None,
            **kwargs,
        )

        hub = HubGenerator(
            hub_name=name,
            genome=genome,
            track_design=design,
            outdir=outdir,
            hub_email=hub_email,
            **self.custom_genome_config,
            **kwargs,
        )

        return hub

Functions

add_tracks(paths, **common_metadata)

Add multiple tracks from paths.

Source code in tracknado/builder.py
def add_tracks(
    self, paths: list[str] | list[pathlib.Path], **common_metadata: str
) -> HubBuilder:
    """Add multiple tracks from paths."""
    for p in paths:
        path = pathlib.Path(p)
        self.tracks.append(Track(path=path, metadata=common_metadata.copy()))
    return self

add_tracks_from_df(df, fn_col='fn')

Add tracks from a pandas DataFrame.
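
For example, every column other than `fn`, `path`, `name`, and `ext` becomes track metadata. A sketch with a hypothetical sample table:

import pandas as pd
from tracknado.builder import HubBuilder

df = pd.DataFrame(
    {
        "fn": ["sample1.bigWig", "sample2.bigWig"],  # hypothetical paths
        "samplename": ["sample1", "sample2"],
        "condition": ["treated", "control"],
    }
)

builder = HubBuilder().add_tracks_from_df(df, fn_col="fn")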

Source code in tracknado/builder.py
def add_tracks_from_df(self, df: pd.DataFrame, fn_col: str = "fn") -> HubBuilder:
    """Add tracks from a pandas DataFrame."""
    df = df.copy()
    if fn_col in df.columns and "ext" not in df.columns:
        df["ext"] = df[fn_col].apply(lambda x: pathlib.Path(x).suffix.strip("."))

    try:
        df = TrackDataFrameSchema.validate(df)
    except Exception as e:
        logger.warning(f"DataFrame validation failed: {e}")

    for _, row in df.iterrows():
        path = pathlib.Path(row[fn_col])
        metadata = {
            k: str(v)
            for k, v in row.items()
            if k not in [fn_col, "ext", "path", "name"] and pd.notna(v)
        }
        track = Track(path=path, metadata=metadata)
        if "name" in row and pd.notna(row["name"]):
            track.name = row["name"]
        if "ext" in row and pd.notna(row["ext"]):
            track.track_type = row["ext"]
        self.tracks.append(track)
    return self

build(name, genome, outdir, hub_email='', **kwargs)

Build the hub and export sidecar config.
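
`build()` writes a `tracknado_config.json` sidecar into `outdir` and returns a `HubGenerator`; the hub files themselves are written by calling `stage_hub()` on the result. A sketch with placeholder values, assuming `builder` is a configured HubBuilder:

hub = builder.build(name="project_hub", genome="mm39", outdir="hub/")
hub.stage_hub(remove_existing=True)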

Source code in tracknado/builder.py
def build(
    self,
    name: str,
    genome: str,
    outdir: str | pathlib.Path,
    hub_email: str = "",
    **kwargs,
) -> Any:
    """Build the hub and export sidecar config."""
    outdir = pathlib.Path(outdir)
    outdir.mkdir(parents=True, exist_ok=True)

    # 1. Handle conversions
    if self.convert_files:
        self._convert_tracks(outdir)

    # 2. Extract metadata BEFORE saving config so extracted data is included
    self._extract_metadata()

    # 3. Save sidecar config (now with extracted metadata)
    self.to_json(outdir / "tracknado_config.json")

    df = self._prepare_design_df()

    design = TrackDesign.from_design(
        df,
        color_by=self.color_by_col,
        subgroup_by=self.group_by_cols if self.group_by_cols else None,
        supergroup_by=self.supergroup_by_cols if self.supergroup_by_cols else None,
        overlay_by=self.overlay_by_cols if self.overlay_by_cols else None,
        **kwargs,
    )

    hub = HubGenerator(
        hub_name=name,
        genome=genome,
        track_design=design,
        outdir=outdir,
        hub_email=hub_email,
        **self.custom_genome_config,
        **kwargs,
    )

    return hub

color_by(column, palette='tab20')

Specify column for track coloring.

Source code in tracknado/builder.py
def color_by(self, column: str, palette: str = "tab20") -> HubBuilder:
    """Specify column for track coloring."""
    self.color_by_col = column
    self.color_palette = palette
    return self

from_json(path_or_data) classmethod

Reconstruct builder from JSON string or file path.
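
A round trip looks like the sketch below (the file name is arbitrary). Note that metadata extractor functions are excluded from serialization and must be re-registered on the restored builder:

builder.to_json("tracknado_config.json")
restored = HubBuilder.from_json("tracknado_config.json")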

Source code in tracknado/builder.py
@classmethod
def from_json(cls, path_or_data: str | pathlib.Path) -> HubBuilder:
    """Reconstruct builder from JSON string or file path."""
    p = pathlib.Path(path_or_data)
    if p.exists() and p.is_file():
        with open(p, "r") as f:
            data = f.read()
    else:
        data = path_or_data
    return cls.model_validate_json(data)

group_by(*columns, as_supertrack=False)

Specify columns to group by. If as_supertrack is True, these columns will be used for SuperTracks instead of dimensions in a CompositeTrack.
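
For example, composite dimensions and supertracks can be combined; the column names here are hypothetical metadata keys:

builder = (
    builder
    .group_by("antibody", "condition")      # dimensions of a CompositeTrack
    .group_by("assay", as_supertrack=True)  # one SuperTrack per assay
)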

Source code in tracknado/builder.py
def group_by(self, *columns: str, as_supertrack: bool = False) -> "HubBuilder":
    """Specify columns to group by. If as_supertrack is True, these columns
    will be used for SuperTracks instead of dimensions in a CompositeTrack.
    """
    if as_supertrack:
        self.supergroup_by_cols.extend(columns)
    else:
        self.group_by_cols.extend(columns)
    return self

merge(*others)

Merge other HubBuilders into this one, reconciling settings.
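
For example, builders prepared for different assays can be combined: tracks are concatenated, grouping columns are unioned, and this builder's colour settings are kept unless unset. File names and metadata below are hypothetical:

rna = HubBuilder().add_tracks(["rna.bigWig"], assay="RNA")
chip = HubBuilder().add_tracks(["chip.bigWig"], assay="ChIP")
combined = rna.merge(chip).group_by("assay", as_supertrack=True)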

Source code in tracknado/builder.py
def merge(self, *others: "HubBuilder") -> HubBuilder:
    """Merge other HubBuilders into this one, reconciling settings."""
    for other in others:
        self.tracks.extend(other.tracks)
        # Union of grouping columns
        self.group_by_cols = sorted(
            list(set(self.group_by_cols + other.group_by_cols))
        )
        self.supergroup_by_cols = sorted(
            list(set(self.supergroup_by_cols + other.supergroup_by_cols))
        )
        self.overlay_by_cols = sorted(
            list(set(self.overlay_by_cols + other.overlay_by_cols))
        )
        # Merge extractors
        for ex in other.metadata_extractors:
            if ex not in self.metadata_extractors:
                self.metadata_extractors.append(ex)
        # Colors: use other's if not set here
        if not self.color_by_col:
            self.color_by_col = other.color_by_col
            self.color_palette = other.color_palette
        # Merge sort_metadata: True if either is True
        self.sort_metadata = self.sort_metadata or other.sort_metadata
    return self

overlay_by(*columns)

Specify columns for overlay tracks.
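
For example, replicate signal tracks can be overlaid by hypothetical `samplename` and `condition` columns. Only bigWig tracks are added to overlay tracks; other types are skipped with a warning when the hub is generated:

builder = builder.overlay_by("samplename", "condition")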

Source code in tracknado/builder.py
def overlay_by(self, *columns: str) -> HubBuilder:
    """Specify columns for overlay tracks."""
    self.overlay_by_cols.extend(columns)
    return self

to_json(path=None)

Serialize state to JSON string or file.

Source code in tracknado/builder.py
def to_json(self, path: str | pathlib.Path | None = None) -> str:
    """Serialize state to JSON string or file."""
    data = self.model_dump_json(indent=2, by_alias=True)
    if path:
        with open(path, "w") as f:
            f.write(data)
    return data

with_chrom_sizes(path)

Set the chrom.sizes file for track conversion.

Source code in tracknado/builder.py
def with_chrom_sizes(self, path: Union[str, pathlib.Path]) -> HubBuilder:
    """Set the chrom.sizes file for track conversion."""
    self.chrom_sizes = pathlib.Path(path)
    return self

with_convert_files(enabled=True)

Enable or disable implicit track conversion.
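
When enabled, BED and GTF/GFF inputs are converted to BigBed/BigGenePred during `build()`; a chrom.sizes file must be set first. A sketch with placeholder paths:

builder = (
    HubBuilder()
    .add_tracks(["peaks.bed", "genes.gtf"])  # hypothetical inputs
    .with_convert_files(True)
    .with_chrom_sizes("hg38.chrom.sizes")
)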

Source code in tracknado/builder.py
def with_convert_files(self, enabled: bool = True) -> HubBuilder:
    """Enable or disable implicit track conversion."""
    self.convert_files = enabled
    return self

with_custom_genome(name, twobit_file, organism, default_position='chr1:1000-2000')

Configure a custom genome (Assembly Hub) for this hub.
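
A sketch with placeholder values. Note that the assembly name used for the hub comes from the `genome` argument passed to `build()`:

builder = builder.with_custom_genome(
    name="myAsm1",  # hypothetical assembly
    twobit_file="myAsm1.2bit",
    organism="My organism",
    default_position="chr1:1000-2000",
)
hub = builder.build(name="assembly_hub", genome="myAsm1", outdir="hub/")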

Source code in tracknado/builder.py
def with_custom_genome(
    self,
    name: str,
    twobit_file: str | pathlib.Path,
    organism: str,
    default_position: str = "chr1:1000-2000",
) -> "HubBuilder":
    """Configure a custom genome (Assembly Hub) for this hub."""
    self.custom_genome_config = {
        "custom_genome": True,
        "genome_twobit": str(twobit_file),
        "genome_organism": organism,
        "genome_default_position": default_position,
    }
    return self

with_metadata_extractor(fn)

Add a metadata extractor function.
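
An extractor receives each track's `pathlib.Path` and returns a dict that is merged into the track's metadata. A sketch assuming a hypothetical `<sample>_<condition>.bigWig` naming scheme:

import pathlib

def extract_from_name(path: pathlib.Path) -> dict[str, str]:
    # Hypothetical naming scheme: <sample>_<condition>.bigWig
    sample, condition = path.stem.split("_", 1)
    return {"samplename": sample, "condition": condition}

builder = builder.with_metadata_extractor(extract_from_name)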

Source code in tracknado/builder.py
def with_metadata_extractor(
    self, fn: Callable[[pathlib.Path], dict[str, str]]
) -> HubBuilder:
    """Add a metadata extractor function."""
    self.metadata_extractors.append(fn)
    return self

with_missing_groups(label='NA', *columns)

Replace missing grouping values with a label before hub generation.

If columns are not provided, applies to all active grouping columns.
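
For example, tracks without a value in a hypothetical `condition` column can be grouped under an explicit label:

builder = builder.with_missing_groups("NA", "condition")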

Source code in tracknado/builder.py
def with_missing_groups(
    self, label: str = "NA", *columns: str
) -> HubBuilder:
    """Replace missing grouping values with a label before hub generation.

    If `columns` are not provided, applies to all active grouping columns.
    """
    self.missing_group_label = label
    self.missing_group_columns = list(columns)
    return self

with_sort_metadata(enabled=True)

Enable or disable sorting of metadata columns in output.

Source code in tracknado/builder.py
def with_sort_metadata(self, enabled: bool = True) -> HubBuilder:
    """Enable or disable sorting of metadata columns in output."""
    self.sort_metadata = enabled
    return self
HubGenerator

Source code in tracknado/api.py
class HubGenerator:
    def __init__(
        self,
        hub_name: str,
        genome: str,
        track_design: TrackDesign,
        outdir: pathlib.Path,
        description_html: pathlib.Path = None,
        hub_email: str = "",
        custom_genome: bool = False,
        genome_twobit: pathlib.Path = None,
        genome_organism: str = None,
        genome_default_position: str = "chr1:10000-20000",
    ):
        # Basic parameters for hub creation
        self.hub_name = hub_name
        self.genome_name = genome
        self.track_design = track_design
        self.outdir = outdir
        self.custom_genome = custom_genome
        self.description_url_path = description_html

        # Parameters for custom genomes
        self._genome_twobit = genome_twobit
        self._genome_organism = genome_organism
        self._genome_default_position = genome_default_position

        # Create the basic hub
        self._hub = trackhub.Hub(
            hub_name, short_label=hub_name, long_label=hub_name, email=hub_email
        )

        self.trackdb = trackhub.TrackDb()
        self._used_track_names: set[str] = set()
        _genome = self._get_genome_file()  # type: ignore
        _genomes_file = trackhub.GenomesFile()

        # Add these to the hub
        _genome.add_trackdb(self.trackdb)
        self._hub.add_genomes_file(_genomes_file)
        _genomes_file.add_genome(_genome)

        self._add_tracks_to_hub()

    def _add_tracks_to_hub(self) -> None:
        # Loop through each entry in the details dataframe

        for row in self.track_design.details.itertuples():
            has_composite = False
            has_overlay = False

            # If the row has a "composite" attribute
            if hasattr(row, "composite") and pd.notna(row.composite):
                has_composite = True
                composite_track = self.track_design.composite_tracks[row.composite]
                # Create a new track and add it as a subtrack to the composite track
                track = self._get_track(row, suffix=f"_{composite_track.name}")
                composite_track.add_subtrack(track)

            # If the row has an "overlay" attribute
            if hasattr(row, "overlay") and pd.notna(row.overlay):
                has_overlay = True
                overlay_track = self.track_design.overlay_tracks[row.overlay]
                # Create a new track and add it to the overlay track
                track = self._get_track(row, suffix=f"_{overlay_track.name}")

                # Ignore the track if it is not a signal track e.g. bigWig
                if track.tracktype not in [
                    "bigWig",
                ]:
                    logger.warning(
                        f"Track {track.name} is not a signal track and will be ignored for the overlay track {overlay_track.name}"
                    )
                else:
                    overlay_track.add_subtrack(track)

            # If the row doesn't have a "supertrack" attribute
            if not hasattr(row, "supertrack") and not has_composite and not has_overlay:
                # Create a new track and add it to the trackdb
                track = self._get_track(row)
                self.trackdb.add_tracks(track)

        # Add the supertracks or composite/overlay tracks to the trackdb
        if self.track_design.super_tracks:
            tracks = self.track_design.super_tracks.values()

            # Ensure the composite and/or overlay tracks have the group attribute set
            if self.custom_genome:
                for t in [
                    *self.track_design.composite_tracks.values(),
                    *self.track_design.overlay_tracks.values(),
                ]:
                    t.add_params(group=self._hub.hub)

        else:
            tracks = [
                *self.track_design.composite_tracks.values(),
                *self.track_design.overlay_tracks.values(),
            ]

        # Add the composite/overlay and supertracks to the trackdb
        for ii, track in enumerate(tracks):
            # Add group if custom genome
            if self.custom_genome:
                track.add_params(group=self._hub.hub)
            self.trackdb.add_tracks(track)

    def _get_track(self, track: namedtuple, suffix: str = "") -> trackhub.Track:
        """Generate a trackhub.Track object from a row in the details dataframe"""

        extra_kwargs = dict()
        if hasattr(track, "color"):
            extra_kwargs["color"] = ",".join([str(x) for x in track.color])

        if hasattr(track, "subgroup_names"):
            extra_kwargs["subgroups"] = {
                subgroup_name: getattr(track, subgroup_name)
                for subgroup_name in track.subgroup_names
            }

        if self.custom_genome:
            extra_kwargs["group"] = self._hub.hub

        if track.ext == "bigWig":
            extra_kwargs.update(
                {
                    "maxHeightPixels": "100:50:11",
                    "visibility": "full",
                    "viewLimits": "0:100",
                    "autoScale": "on",
                    "windowingFunction": "mean",
                }
            )

        elif track.ext == "bigBed":
            extra_kwargs.update(
                {
                    "visibility": "pack",
                }
            )

        elif track.ext == "bigGenePred":
            extra_kwargs.update(
                {
                    "visibility": "pack",
                    "baseColorDefault": "genomicCodons",
                }
            )

        base_name = "".join([trackhub.helpers.sanitize(track.name), suffix])
        unique_name = self._ensure_unique_track_name(base_name)

        return trackhub.Track(
            name=unique_name,
            shortLabel=" ".join(re.split(r"[.|_|\s+|-]", track.name)),
            longLabel=" ".join(re.split(r"[.|_|\s+|-]", track.name)),
            source=str(track.path),
            tracktype=track.ext,
            **extra_kwargs,
        )

    def _ensure_unique_track_name(self, name: str) -> str:
        """Ensure UCSC track IDs are unique after sanitization."""
        if name not in self._used_track_names:
            self._used_track_names.add(name)
            return name

        idx = 2
        while f"{name}_{idx}" in self._used_track_names:
            idx += 1
        unique = f"{name}_{idx}"
        self._used_track_names.add(unique)
        return unique

    def _get_genome_file(self) -> trackhub.Genome:
        if not self.custom_genome:
            genome = trackhub.Genome(self.genome_name)
            groups_file = None
        else:
            genome = trackhub.Assembly(
                genome=self.genome_name,
                twobit_file=self._genome_twobit,
                organism=self._genome_organism,
                defaultPos=self._genome_default_position,
            )

            groups_file = trackhub.GroupsFile(
                [
                    trackhub.GroupDefinition(
                        name=self.hub_name, priority=1, default_is_closed=False
                    ),
                ]
            )

            genome.add_groups(groups_file)

        return genome

    def stage_hub(
        self,
        remove_existing: bool = False,
    ):
        with tempfile.TemporaryDirectory() as tmpdir:
            trackhub.upload.stage_hub(self._hub, staging=tmpdir)

            if self.description_url_path:
                description_basename = os.path.basename(self.description_url_path)
                with open(
                    os.path.join(tmpdir, f"{self._hub.hub}.hub.txt"), "a"
                ) as hubtxt:
                    hubtxt.write("\n")
                    hubtxt.write(
                        f"descriptionUrl {self.genome_name}/{description_basename}\n"
                    )

                shutil.copy(
                    self.description_url_path,
                    os.path.join(tmpdir, self.genome_name),
                )

            # Remove existing hub directory if requested
            if remove_existing:
                if self.outdir.exists():
                    logger.info(f"Removing existing hub at {self.outdir}")
                    shutil.rmtree(self.outdir)
                else:
                    logger.warning(f"--remove-existing was requested but no hub exists at {self.outdir}")

            # Copy to the new location
            shutil.copytree(
                tmpdir,
                self.outdir,
                dirs_exist_ok=True,
                symlinks=False,
            )

            subprocess.run(["chmod", "-R", "2755", self.outdir])
TrackDesign

Source code in tracknado/api.py
class TrackDesign:
    def __init__(
        self,
        details: pd.DataFrame,
        color_by: list[str] = None,
        subgroup_by: list[str] = None,
        overlay_by: list[str] = None,
        supergroup_by: list[str] = None,
        **kwargs,
    ):
        self.details = details
        self._supertrack_columns = list(supergroup_by) if supergroup_by else list()
        self._overlay_columns = list(overlay_by) if overlay_by else list()
        self._subgroup_columns = list(subgroup_by) if subgroup_by else list()
        self.subgroup_definitions = list() if subgroup_by else None
        self._color_columns = list(color_by) if color_by else list()

        self._add_subgroupings(
            supergroup_by=self._supertrack_columns, subgroup_by=self._subgroup_columns
        )

        self.super_tracks = self._get_super_tracks()
        self._add_supertrack_indicators()

        self.composite_tracks = self._get_composite_tracks()
        self._add_composite_track_indicators()

        self.overlay_tracks = self._get_overlay_tracks()
        self._add_overlay_track_indicators()

        self._add_track_colors(color_by=color_by)

    @classmethod
    def from_design(cls, design: pd.DataFrame, **kwargs) -> "TrackDesign":
        return cls(design, **kwargs)

    def _add_track_colors(
        self,
        color_by: str | list[str] = None,
        palette: str = "tab20",
        color_column: str = None,
    ) -> None:
        """Add a column to the details dataframe with a color for each track"""

        from PIL import ImageColor

        if color_by:
            if isinstance(color_by, str):
                color_by = [color_by]

            assert all([c in self.details.columns for c in color_by]), (
                f"Color-By columns {color_by} missing"
            )  # type: ignore

            try:
                # Get a palette with enough colors for the unique groups in the details
                import seaborn as sns

                n_colors = len(self.details[color_by].drop_duplicates())
                colors = sns.color_palette(palette, n_colors=n_colors).as_hex()  # type: ignore

                # Assign a color to each group
                color_dict = {}
                for i, group in enumerate(
                    self.details[color_by].drop_duplicates().itertuples()
                ):
                    color_dict[tuple([getattr(group, c) for c in color_by])] = colors[i]  # type: ignore

                # Add a column to the details dataframe with the color for each track
                self.details["color"] = self.details[color_by].apply(
                    lambda row: ImageColor.getrgb(color_dict[tuple([c for c in row])]),
                    axis=1,
                )

            except NameError:
                raise NameError(
                    "Palette not found. Try one of the following: 'tab20', 'tab20b', 'tab20c'"
                )

        elif color_column:
            assert color_column in self.details.columns, (
                f"Color column {color_column} missing"
            )

            colors = []
            for i, color in enumerate(self.details[color_column]):
                if isinstance(color, tuple):
                    c = color
                elif isinstance(color, str):
                    if color.startswith("#"):
                        c = ImageColor.getrgb(color)
                    else:
                        c = color.split(",")
                        c = tuple([int(x) for x in c])
                else:
                    raise ValueError(
                        f"Color column {color_column} must be a tuple or string"
                    )

                colors.append(c)

            self.details["color"] = colors

    def _add_subgroup_definitions_to_df(
        self, df: pd.DataFrame, subgroup_by: list[str] = None
    ) -> pd.DataFrame:
        """Add a column to the details dataframe with a `trackhub.SubGroupDefinition` for each track"""

        assert all([c in df.columns for c in subgroup_by]), (
            f"Subgroup-By columns {subgroup_by} missing"
        )
        df = df.copy()

        # Loop through all columns provided and generate a subgroup definition for each
        subgroup_definitions = []
        for column in subgroup_by:
            # Get a list of unique values in the column
            unique_values = df[column].unique()
            subgroup_definition = trackhub.SubGroupDefinition(
                name=column,
                label=column,
                mapping={value: value for value in unique_values},
            )
            subgroup_definitions.append(subgroup_definition)

        # Add a column to the details dataframe with the subgroup definition for each track
        df["subgroup_names"] = [
            tuple([col for col in subgroup_by]) for i in range(df.shape[0])
        ]
        df["subgroup_definition"] = [subgroup_definitions for i in range(df.shape[0])]

        self.subgroup_definitions.extend(subgroup_definitions)

        return df

    def _add_subgroupings(
        self, supergroup_by: list[str] = None, subgroup_by: list[str] = None
    ) -> None:
        """Add a column to the details dataframe with a `trackhub.SubGroupDefinition` for each track.

        If `supergroup_by` is provided, the subgroup definitions will be added to the dataframe
        grouped by the supergroup columns.

        If `supergroup_by` is not provided, the subgroup definitions will be added to the dataframe
        as a single group.
        """

        if subgroup_by:
            assert all([c in self.details.columns for c in subgroup_by]), (
                f"Subgroup-By columns {subgroup_by} missing"
            )

            if supergroup_by:
                assert not any(subgroup in supergroup_by for subgroup in subgroup_by), (
                    f"SubGroup columns {subgroup_by} cannot be in SuperGroup columns {supergroup_by}"
                )

                self.details = (
                    self.details.groupby(supergroup_by)
                    .apply(
                        self._add_subgroup_definitions_to_df,
                        subgroup_by=subgroup_by,
                        include_groups=False,
                    )
                    .reset_index(drop=False)
                )
                # Drop the extra index levels if they are named after the columns
                self.details = self.details.loc[:, ~self.details.columns.duplicated()]
            else:
                self.details = self._add_subgroup_definitions_to_df(
                    self.details, subgroup_by=subgroup_by
                )

    def _get_super_tracks(self) -> dict[str, trackhub.SuperTrack]:
        """Generate a dictionary of SuperTracks from the details dataframe"""

        if self._supertrack_columns:
            assert all([c in self.details.columns for c in self._supertrack_columns]), (
                f"SuperTrack columns {self._supertrack_columns} missing"
            )

            supertracks = dict()
            for grouping, df in self.details.reset_index(drop=True).groupby(
                self._supertrack_columns, as_index=False
            ):
                if isinstance(grouping, str):
                    track_id = (grouping,)
                elif len(grouping) == 1:
                    track_id = grouping
                else:
                    track_id = tuple(grouping)

                if len(track_id) == 1:
                    track_name = track_id[0]
                else:
                    track_name = "_".join(track_id)

                supertracks[get_hash(track_id)] = trackhub.SuperTrack(
                    name=track_name,
                )

        else:
            supertracks = dict()

        return supertracks

    def _add_supertrack_indicators(self):
        """Add a column to the details dataframe with a SuperTrack indicator for each track"""

        if self._supertrack_columns:
            assert all([c in self.details.columns for c in self._supertrack_columns]), (
                f"SuperTrack columns {self._supertrack_columns} missing"
            )

            self.details["supertrack"] = get_hash_for_df(
                self.details, self._supertrack_columns
            )

    def _get_composite_tracks(self) -> dict[str, trackhub.CompositeTrack]:
        """Generate a dictionary of CompositeTracks from the details dataframe"""

        composite_tracks = dict()
        dimensions = dict(
            zip(
                [f"dim{d}" for d in ["X", "Y", "A", "B", "C", "D"]],
                self._subgroup_columns,
            )
        )

        if "supertrack" in self.details.columns:
            for (supertrack, ext), df in self.details.groupby(["supertrack", "ext"]):
                supertrack_name = self.super_tracks[supertrack].name
                composite_name = "_".join([supertrack_name, ext])

                composite = trackhub.CompositeTrack(
                    name=composite_name,
                    tracktype=ext,
                    dimensions=" ".join([f"{k}={v}" for k, v in dimensions.items()])
                    if dimensions
                    else None,
                    sortOrder=" ".join([f"{k}=+" for k in self._subgroup_columns]),
                    visibility="hide",
                    dragAndDrop="subTracks",
                )

                composite.add_subgroups(self.subgroup_definitions)

                self.super_tracks[supertrack].add_tracks(composite)
                composite_tracks[get_hash((supertrack, ext))] = composite

        elif self._subgroup_columns:
            for ext, df in self.details.groupby("ext"):
                composite = trackhub.CompositeTrack(
                    name=ext,
                    tracktype=ext,
                    visibility="hide",
                    dragAndDrop="subTracks",
                    dimensions=" ".join([f"{k}={v}" for k, v in dimensions.items()])
                    if dimensions
                    else None,
                    sortOrder=" ".join([f"{k}=+" for k in self._subgroup_columns]),
                )

                composite.add_subgroups(self.subgroup_definitions)
                composite_tracks[get_hash((ext,))] = composite

        else:
            composite_tracks = dict()

        return composite_tracks

    def _add_composite_track_indicators(self):
        """Add a column to the details dataframe with a CompositeTrack indicator for each track"""

        if self.composite_tracks:
            composite_columns = ["supertrack"] if self._supertrack_columns else []
            composite_columns.append("ext")

            self.details["composite"] = get_hash_for_df(self.details, composite_columns)

            assert self.details["composite"].isin(self.composite_tracks.keys()).all(), (
                "Composite tracks not found in details dataframe"
            )

    def _get_overlay_tracks(self):
        """Generate a dictionary of OverlayTracks from the details dataframe"""

        if self._overlay_columns:
            assert all([c in self.details.columns for c in self._overlay_columns]), (
                f"Overlay columns {self._overlay_columns} missing"
            )

            overlay_tracks = dict()
            overlay_columns = (
                list(self._overlay_columns)
                if not isinstance(self._overlay_columns, str)
                else [
                    self._overlay_columns,
                ]
            )

            if "supertrack" in self.details.columns:
                for (supertrack, overlay), df in self.details.groupby(
                    ["supertrack", *self._overlay_columns]
                ):
                    supertrack_name = self.super_tracks[supertrack].name

                    if isinstance(overlay, str):
                        overlay_name = "_".join([supertrack_name, overlay]) + "_overlay"
                    else:
                        overlay_name = (
                            "_".join([supertrack_name, *overlay]) + "_overlay"
                        )

                    overlay_track = trackhub.AggregateTrack(
                        aggregate="transparentOverlay",
                        name=overlay_name,
                        tracktype="bigWig",
                    )

                    self.super_tracks[supertrack].add_tracks(overlay_track)
                    overlay_tracks[get_hash(tuple([supertrack, overlay]))] = (
                        overlay_track
                    )

            else:
                for overlay, df in self.details.groupby(self._overlay_columns):
                    overlay_name = (
                        "_".join(overlay) if isinstance(overlay, tuple) else overlay
                    )
                    overlay_id = (
                        tuple(overlay) if isinstance(overlay, tuple) else (overlay,)
                    )

                    overlay_track = trackhub.AggregateTrack(
                        aggregate="transparentOverlay",
                        name=overlay_name,
                        tracktype="bigWig",
                    )
                    overlay_tracks[get_hash(overlay_id)] = overlay_track

        else:
            overlay_tracks = dict()

        return overlay_tracks

    def _add_overlay_track_indicators(self):
        """Add a column to the details dataframe with an OverlayTrack indicator for each track"""

        if self._overlay_columns:
            overlay_columns = ["supertrack"] if self._supertrack_columns else []
            overlay_columns.extend(self._overlay_columns)

            # Only assign indicators to rows that actually have all overlay columns set
            has_overlay_cols = self.details[overlay_columns].notna().all(axis=1)

            self.details.loc[has_overlay_cols, "overlay"] = get_hash_for_df(
                self.details[has_overlay_cols], overlay_columns
            )

            # Verification should only apply to rows marked with 'overlay'
            valid_indicators = self.details["overlay"].dropna().unique()
            missing = [
                i for i in valid_indicators if i not in self.overlay_tracks.keys()
            ]
            if missing:
                logger.warning(f"Overlay tracks not found for indices: {missing}")
                # We can choose to either raise or just clear those indicators
                self.details.loc[self.details["overlay"].isin(missing), "overlay"] = (
                    None
                )

        return self
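
`HubBuilder.build()` is a thin wrapper around these two classes; they can also be used directly with a prepared design table. A sketch with hypothetical files and columns, assuming both classes are importable from `tracknado.api` (the source location shown above):

import pathlib
import pandas as pd
from tracknado.api import HubGenerator, TrackDesign

design_df = pd.DataFrame(
    {
        "fn": ["sample1.bigWig", "sample2.bigWig"],  # hypothetical files
        "path": ["/data/sample1.bigWig", "/data/sample2.bigWig"],
        "name": ["sample1", "sample2"],
        "ext": ["bigWig", "bigWig"],
        "condition": ["treated", "control"],
    }
)

design = TrackDesign.from_design(design_df, color_by=["condition"], subgroup_by=["condition"])
hub = HubGenerator(hub_name="my_hub", genome="hg38", track_design=design, outdir=pathlib.Path("hub/"))
hub.stage_hub()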

Data Models

Track

Bases: BaseModel

Single track with validated metadata.
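
For instance, a track can be declared directly; the path is a placeholder and the import path is assumed from the source location shown below. Colour values are validated to the 0-255 range:

from pathlib import Path
from tracknado.models import Track

track = Track(
    path=Path("sample1.bigWig"),  # hypothetical file
    metadata={"assay": "RNA", "condition": "treated"},
    color=(31, 119, 180),
    track_type="bigWig",
)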

Source code in tracknado/models.py
class Track(BaseModel):
    """Single track with validated metadata."""
    path: Path
    name: str | None = None  # Auto-derived from path if None
    metadata: dict[str, str] = Field(default_factory=dict)
    color: tuple[int, int, int] | None = None
    track_type: str | None = None  # bigWig, bigBed, etc.

    @field_validator('path')
    @classmethod
    def validate_path_exists(cls, v):
        if not v.exists():
            # Note: We might want a way to allow virtual paths if staging doesn't require immediate existence
            # but for now we enforce existence as per requirements.
            pass 
        return v

    @field_validator('color')
    @classmethod  
    def validate_color_range(cls, v):
        if v and not all(0 <= c <= 255 for c in v):
            raise ValueError("Color values must be 0-255")
        return v

TrackGroup

Bases: BaseModel

Hierarchical grouping of tracks.

Source code in tracknado/models.py
class TrackGroup(BaseModel):
    """Hierarchical grouping of tracks."""
    name: str
    tracks: list[Track] = Field(default_factory=list)
    subgroups: list[TrackGroup] = Field(default_factory=list)
    metadata: dict[str, str] = Field(default_factory=dict)

File Converters

convert_bed_to_bigbed(input_bed, chrom_sizes, output_bb=None, force_container=False)

Convert a BED file to BigBed format.
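
A sketch of direct use with placeholder paths; the UCSC `bedToBigBed` binary or a container engine must be available, and the import path is assumed from the source location shown below:

from pathlib import Path
from tracknado.converters import convert_bed_to_bigbed

bigbed = convert_bed_to_bigbed(
    input_bed=Path("peaks.bed"),  # hypothetical input
    chrom_sizes=Path("hg38.chrom.sizes"),
    output_bb=Path("peaks.bb"),
)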

Source code in tracknado/converters.py
def convert_bed_to_bigbed(
    input_bed: Path, 
    chrom_sizes: Path, 
    output_bb: Path | None = None,
    force_container: bool = False
) -> Path:
    """Convert a BED file to BigBed format."""
    if output_bb is None:
        output_bb = input_bed.with_suffix(".bb")

    # 1. Find tool
    cmd_prefix = []
    if not force_container:
        local_tool = ToolFinder.find_local("bedToBigBed")
        if local_tool:
            cmd_prefix = [local_tool]

    if not cmd_prefix:
        container_cmd = ToolFinder.get_container_cmd("bedToBigBed")
        if container_cmd:
            cmd_prefix = container_cmd
        else:
            raise RuntimeError(
                "bedToBigBed not found locally and no container engine (Apptainer/Docker) detected. "
                "Please install bedToBigBed or a container engine."
            )

    # 2. Sort BED file (required for bedToBigBed)
    logger.info(f"Sorting {input_bed.name}...")
    sorted_bed = tempfile.NamedTemporaryFile(suffix=".sorted.bed", delete=False).name
    try:
        # We use LC_ALL=C for consistent sorting
        env = os.environ.copy()
        env["LC_ALL"] = "C"
        subprocess.run(
            ["sort", "-k1,1", "-k2,2n", str(input_bed)],
            stdout=open(sorted_bed, "w"),
            check=True,
            env=env
        )

        # 3. Run bedToBigBed
        logger.info(f"Converting {input_bed.name} to BigBed...")

        # Prepare actual command (handling Docker mount replacement if needed)
        cwd = os.getcwd()
        final_cmd = []
        for part in cmd_prefix:
            if isinstance(part, str):
                final_cmd.append(part.replace("{cwd}", cwd))

        final_cmd.extend([sorted_bed, str(chrom_sizes), str(output_bb)])

        subprocess.run(final_cmd, check=True)
        logger.info(f"Successfully created {output_bb}")

    finally:
        if os.path.exists(sorted_bed):
            os.remove(sorted_bed)

    return output_bb

convert_gtf_to_biggenepred(input_file, chrom_sizes, output_bb=None, force_container=False)

Convert a GTF or GFF file to BigGenePred format.

Source code in tracknado/converters.py
def convert_gtf_to_biggenepred(
    input_file: Path,
    chrom_sizes: Path,
    output_bb: Path | None = None,
    force_container: bool = False
) -> Path:
    """Convert a GTF or GFF file to BigGenePred format."""
    if output_bb is None:
        output_bb = input_file.with_suffix(".bb")

    ext = input_file.suffix.lower()
    conv_tool = "gtfToGenePred" if ext == ".gtf" else "gff3ToGenePred"

    # Check for tools
    tools = [conv_tool, "genePredToBigGenePred", "bedToBigBed"]
    cmd_prefixes = {}

    for tool in tools:
        cmd_prefix = []
        if not force_container:
            local_tool = ToolFinder.find_local(tool)
            if local_tool:
                cmd_prefix = [local_tool]

        if not cmd_prefix:
            container_cmd = ToolFinder.get_container_cmd(tool)
            if container_cmd:
                cmd_prefix = container_cmd
            else:
                raise RuntimeError(
                    f"{tool} not found locally and no container engine detected."
                )
        cmd_prefixes[tool] = cmd_prefix

    as_file = Path(__file__).parent / "as_files" / "bigGenePred.as"
    if not as_file.exists():
        raise FileNotFoundError(f"Required autoSql file not found: {as_file}")

    intermediate_genepred = tempfile.NamedTemporaryFile(suffix=".genePred", delete=False).name
    intermediate_txt = tempfile.NamedTemporaryFile(suffix=".txt", delete=False).name

    try:
        cwd = os.getcwd()

        # 1. Convert to genePred
        logger.info(f"Converting {input_file.name} to genePred...")
        cmd1 = []
        for part in cmd_prefixes[conv_tool]:
            cmd1.append(part.replace("{cwd}", cwd))
        if conv_tool == "gtfToGenePred":
            cmd1.extend(["-genePredExt", str(input_file), intermediate_genepred])
        else: # gff3ToGenePred
            cmd1.extend([str(input_file), intermediate_genepred])
        subprocess.run(cmd1, check=True)

        # 2. Convert to bigGenePred text
        logger.info("Converting genePred to bigGenePred text...")
        cmd2 = []
        for part in cmd_prefixes["genePredToBigGenePred"]:
            cmd2.append(part.replace("{cwd}", cwd))
        cmd2.extend([intermediate_genepred, intermediate_txt])
        subprocess.run(cmd2, check=True)

        # 3. Sort bigGenePred text (required for bedToBigBed)
        logger.info("Sorting bigGenePred text...")
        sorted_txt = tempfile.NamedTemporaryFile(suffix=".sorted.txt", delete=False).name
        env = os.environ.copy()
        env["LC_ALL"] = "C"
        with open(sorted_txt, "w") as out:
            subprocess.run(
                ["sort", "-k1,1", "-k2,2n", intermediate_txt],
                stdout=out,
                check=True,
                env=env,
            )

        # 4. Convert to BigBed with .as
        logger.info(f"Converting to BigGenePred: {output_bb.name}...")
        cmd4 = []
        for part in cmd_prefixes["bedToBigBed"]:
            cmd4.append(part.replace("{cwd}", cwd))
        cmd4.extend([
            "-type=bed12+8", 
            "-tab", 
            f"-as={as_file}", 
            sorted_txt, 
            str(chrom_sizes), 
            str(output_bb)
        ])
        subprocess.run(cmd4, check=True)

        if os.path.exists(sorted_txt):
            os.remove(sorted_txt)

        logger.info(f"Successfully created {output_bb}")

    finally:
        for f in [intermediate_genepred, intermediate_txt]:
            if os.path.exists(f):
                os.remove(f)

    return output_bb
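A minimal usage sketch. The import path follows the source file shown above (tracknado.converters); the input GTF, the chrom.sizes file, and the output name are hypothetical placeholders.

from pathlib import Path

from tracknado.converters import convert_gtf_to_biggenepred  # assumed import path

gtf = Path("annotations/genes.gtf")        # hypothetical annotation file
sizes = Path("genome/hg38.chrom.sizes")    # UCSC-style chrom.sizes for the target genome

# Writes genes.bb next to the GTF unless output_bb is given explicitly.
bigbed = convert_gtf_to_biggenepred(gtf, sizes, output_bb=Path("hub/genes.bb"))
print(f"BigGenePred written to {bigbed}")

Passing force_container=True skips the local tool lookup and always runs gtfToGenePred/gff3ToGenePred, genePredToBigGenePred, and bedToBigBed through the detected container engine.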

Metadata Extractors

Extract metadata from seqnado file paths.

Pattern: .../seqnado_output/{assay}/[bigwigs/peaks]/{method}/{norm}/{sample}_{strand|viewpoint}.[bigWig|bed]

Example: .../seqnado_output/atac/bigwigs/atac_tn5/cpm/sample1.bigWig

Source code in tracknado/extractors.py
def from_seqnado_path(path: Path) -> dict[str, str]:
    """Extract metadata from seqnado file paths.

    Pattern: .../seqnado_output/{assay}/[bigwigs/peaks]/{method}/{norm}/{sample}_{strand|viewpoint}.[bigWig|bed]
    Example: .../seqnado_output/atac/bigwigs/atac_tn5/cpm/sample1.bigWig
    """
    metadata = {}
    parts = list(path.parts)

    metadata["assay"] = determine_seqnado_assay(parts)
    metadata["norm"] = parts[-2]
    metadata["method"] = parts[-3]
    metadata['file_type'] = parts[-4]   # bigwigs or peaks for now

    # samplename is usually the stem, but seqnado sometimes has extensions like .plus/.minus
    # We'll take the first part before any dots or underscores commonly used
    stem = path.stem
    metadata["samplename"] = re.split(r"[._]", stem)[0]

    # If the assay is "MCC" we need to extract the viewpoint from the filename
    # Pattern looks like: /bigwigs/mcc/replicates/{sample}_{viewpoint_group}.bigWig
    if metadata["assay"] == "MCC":
        metadata["viewpoint"] = re.split(r"[._]", stem)[-1].split(".")[0]

    # For RNA we need to extract the strandedness from the filename
    # Pattern looks like: /bigwigs/{method}/{norm}/{sample}_{strand}.bigWig
    elif metadata["assay"] == "RNA":
        metadata["strand"] = re.split(r"[._]", stem)[-1].split(".")[0]

    return metadata
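A short sketch of calling this extractor on the example path from the docstring. The assay value is computed by determine_seqnado_assay, so the value noted in the comment is only illustrative.

from pathlib import Path

from tracknado.extractors import from_seqnado_path  # module path follows the source file above

path = Path("/data/seqnado_output/atac/bigwigs/atac_tn5/cpm/sample1.bigWig")
meta = from_seqnado_path(path)
# Based on the code above:
#   norm="cpm", method="atac_tn5", file_type="bigwigs", samplename="sample1",
#   assay=<whatever determine_seqnado_assay derives from the path parts>
print(meta)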

Create extractor from regex pattern with named groups.

Source code in tracknado/extractors.py
def from_filename_pattern(pattern: str) -> MetadataExtractor:
    """Create extractor from regex pattern with named groups."""
    regex = re.compile(pattern)

    def extractor(path: Path) -> dict[str, str]:
        match = regex.search(path.name)
        if match:
            return match.groupdict()
        return {}

    return extractor
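A quick sketch with a hypothetical file-naming scheme; only the named groups in the pattern become metadata keys, and an empty dict is returned when the pattern does not match.

from pathlib import Path

from tracknado.extractors import from_filename_pattern  # assumed import path

# Hypothetical scheme: <sample>_<antibody>_<replicate>.bigWig
extract = from_filename_pattern(r"(?P<sample>[^_]+)_(?P<antibody>[^_]+)_(?P<rep>rep\d+)")
print(extract(Path("results/liver_CTCF_rep1.bigWig")))
# {'sample': 'liver', 'antibody': 'CTCF', 'rep': 'rep1'}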

Extract metadata from parent directory names.

Parameters:

    depth (int): How many levels up to go. Default: 1
    names (list[str]): Optional list of keys for the directory levels (last to first). Default: None
Source code in tracknado/extractors.py
def from_parent_dirs(depth: int = 1, names: list[str] | None = None) -> MetadataExtractor:
    """Extract metadata from parent directory names.

    Args:
        depth: How many levels up to go
        names: Optional list of keys for the directory levels (last to first)
    """
    def extractor(path: Path) -> dict[str, str]:
        metadata = {}
        current = path.parent
        for i in range(depth):
            if current == current.parent: # Reached root
                break
            key = names[i] if names and i < len(names) else f"dir_{i+1}"
            metadata[key] = current.name
            current = current.parent
        return metadata

    return extractor
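A sketch with a hypothetical directory layout. Keys in names are applied from the immediate parent upwards, matching the "last to first" note above.

from pathlib import Path

from tracknado.extractors import from_parent_dirs  # assumed import path

# Hypothetical layout: <project>/<assay>/<norm>/<file>
extract = from_parent_dirs(depth=2, names=["norm", "assay"])
print(extract(Path("project/chip/cpm/track.bigWig")))
# {'norm': 'cpm', 'assay': 'chip'}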

Validation

Validate a hub using UCSC's hubCheck tool.

Parameters:

    hub_path (Path): Path to hub.txt file. Required.
    strict (bool): If True, fail on warnings too. Default: False

Returns:

    tuple[bool, str]: (is_valid, message) tuple

Source code in tracknado/validation.py
def validate_hub(hub_path: Path, strict: bool = False) -> tuple[bool, str]:
    """Validate a hub using UCSC's hubCheck tool.

    Args:
        hub_path: Path to hub.txt file
        strict: If True, fail on warnings too

    Returns:
        (is_valid, message) tuple
    """
    hub_path = Path(hub_path)
    if not hub_path.exists():
        return False, f"Hub file not found: {hub_path}"

    hubcheck = shutil.which("hubCheck")
    if not hubcheck:
        # Check standard user local bin too
        user_bin = Path.home() / "bin" / "hubCheck"
        if user_bin.exists():
            hubcheck = str(user_bin)

    if not hubcheck:
        return False, ("hubCheck not found in PATH or ~/bin/hubCheck. "
                      "Install from: http://hgdownload.cse.ucsc.edu/admin/exe/")

    cmd = [hubcheck]
    if strict:
        cmd.append("-strict")
    cmd.append(str(hub_path))

    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
        is_valid = result.returncode == 0
        message = result.stderr or result.stdout
        if not message and is_valid:
            message = "Hub is valid."
        return is_valid, message
    except Exception as e:
        return False, f"Error running hubCheck: {e}"

Validates hub structure without external tools.

Source code in tracknado/validation.py
class HubValidator:
    """Validates hub structure without external tools."""

    def __init__(self, hub_dir: str | Path):
        self.hub_dir = Path(hub_dir)
        self.errors = []
        self.warnings = []

    def validate_all(self) -> bool:
        """Run all validations."""
        self.validate_structure()
        self.validate_track_files_exist()
        return len(self.errors) == 0

    def validate_structure(self) -> list[str]:
        """Check hub has required files."""
        # Since hub names can vary, we look for *.hub.txt
        hub_files = list(self.hub_dir.glob("*.hub.txt"))
        if not hub_files:
            self.errors.append("No hub.txt file found in directory.")

        # Checking the genomes.txt referenced in hub.txt would be more robust,
        # but for now we look for common file names or hub-specific ones
        genomes_files = (list(self.hub_dir.glob("**/genomes.txt")) + 
                         list(self.hub_dir.glob("**/*.genomes.txt")))
        if not genomes_files:
            self.errors.append("No genomes.txt file found.")

        trackdb_files = list(self.hub_dir.glob("**/trackDb.txt"))
        if not trackdb_files:
            self.errors.append("No trackDb.txt file found.")

        return self.errors

    def validate_track_files_exist(self) -> list[str]:
        """Ensure all referenced track files exist locally.

        Note: This only works if tracks are local paths, which is true during staging.
        """
        for trackdb in self.hub_dir.glob("**/trackDb.txt"):
            with open(trackdb, 'r') as f:
                content = f.read()
                # Simple regex to find 'bigDataUrl' entries
                import re
                urls = re.findall(r"bigDataUrl\s+(.+)", content)
                for url in urls:
                    # Resolve relative to trackdb or hub_dir
                    # Hubs usually use relative paths from trackdb
                    track_path = trackdb.parent / url
                    if not track_path.exists():
                        self.warnings.append(f"Track file not found: {url} (referenced in {trackdb.name})")
        return self.warnings

Functions

validate_all()

Run all validations.

Source code in tracknado/validation.py
def validate_all(self) -> bool:
    """Run all validations."""
    self.validate_structure()
    self.validate_track_files_exist()
    return len(self.errors) == 0

validate_structure()

Check hub has required files.

Source code in tracknado/validation.py
def validate_structure(self) -> list[str]:
    """Check hub has required files."""
    # Since hub names can vary, we look for *.hub.txt
    hub_files = list(self.hub_dir.glob("*.hub.txt"))
    if not hub_files:
        self.errors.append("No hub.txt file found in directory.")

    # Checking the genomes.txt referenced in hub.txt would be more robust,
    # but for now we look for common file names or hub-specific ones
    genomes_files = (list(self.hub_dir.glob("**/genomes.txt")) + 
                     list(self.hub_dir.glob("**/*.genomes.txt")))
    if not genomes_files:
        self.errors.append("No genomes.txt file found.")

    trackdb_files = list(self.hub_dir.glob("**/trackDb.txt"))
    if not trackdb_files:
        self.errors.append("No trackDb.txt file found.")

    return self.errors

validate_track_files_exist()

Ensure all referenced track files exist locally.

Note: This only works if tracks are local paths, which is true during staging.

Source code in tracknado/validation.py
def validate_track_files_exist(self) -> list[str]:
    """Ensure all referenced track files exist locally.

    Note: This only works if tracks are local paths, which is true during staging.
    """
    for trackdb in self.hub_dir.glob("**/trackDb.txt"):
        with open(trackdb, 'r') as f:
            content = f.read()
            # Simple regex to find 'bigDataUrl' entries
            import re
            urls = re.findall(r"bigDataUrl\s+(.+)", content)
            for url in urls:
                # Resolve relative to trackdb or hub_dir
                # Hubs usually use relative paths from trackdb
                track_path = trackdb.parent / url
                if not track_path.exists():
                    self.warnings.append(f"Track file not found: {url} (referenced in {trackdb.name})")
    return self.warnings
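A sketch of running the offline validator over a staged hub directory; the directory name is hypothetical and the import path follows the source file above.

from tracknado.validation import HubValidator  # assumed import path

validator = HubValidator("staging/my_hub")
if validator.validate_all():
    print("Hub structure looks valid.")
else:
    for err in validator.errors:
        print("ERROR:", err)
# Missing track files are reported as warnings, not errors:
for warning in validator.warnings:
    print("WARNING:", warning)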