import csv
import os

from ij.gui import GenericDialog, DialogListener
from ij.gui import Overlay
from java.awt import Color

COLOR_P63_POSITIVE = Color(230, 159, 0)   # orange
COLOR_P63_NEGATIVE = Color(86, 180, 233)  # sky blue


def _count_text(n_pos, n_neg, threshold):
    """Return the text shown in the dialog's live count label."""
    return "p63+: {} p63-: {} (T={})".format(n_pos, n_neg, threshold)


def show_review_dialog(imp, rois_with_means, otsu_value_255, output_dir, variant):
    """Show a blocking GenericDialog with a 0-255 slider seeded at the Otsu value.

    imp             -- image whose overlay is recolored and whose title names the dialog
    rois_with_means -- list of (Roi, mean_255) tuples; means are on the 0-255 scale
    otsu_value_255  -- Otsu threshold on the 0-255 scale; rounded to an int for the slider
    output_dir      -- directory holding cells.csv and summary.csv
    variant         -- e.g. 'T14'; selects the summary.csv columns that are updated

    The dialog recolors ROIs live on every slider move.
    On cancel, Otsu coloring is restored, the CSVs are left untouched and the
    rounded Otsu value (an int) is returned.
    On OK, the CSVs are updated with the final calls and the slider value is
    returned.
    """
    from java.awt import Label, Panel
    from java.awt.event import ActionListener

    otsu_int = int(round(otsu_value_255))

    # Pre-colour ROIs at the Otsu value before the dialog opens
    n_pos_init, n_neg_init = _recolor_rois(rois_with_means, otsu_int, None, imp)

    gd = GenericDialog("p63 Threshold Review - " + imp.getTitle())
    gd.addSlider("p63 threshold (0-255)", 0, 255, otsu_int)

    # Live count panel -- updated by the listeners below on every threshold change
    count_panel = Panel()
    count_label = Label(_count_text(n_pos_init, n_neg_init, otsu_int))
    count_panel.add(count_label)
    gd.addPanel(count_panel)

    def apply_threshold(threshold):
        # Recolor the overlay and refresh the count label in one step so the
        # two can never disagree.
        n_pos, n_neg = _recolor_rois(rois_with_means, threshold, None, imp)
        count_label.setText(_count_text(n_pos, n_neg, threshold))

    # Inner classes capture outer-scope names via Jython closure rules
    class ResetListener(ActionListener):
        def actionPerformed(self, event):
            sliders = gd.getSliders()
            if sliders.size() > 0:
                # NOTE(review): only the scrollbar is reset here; if the dialog
                # shows a linked numeric field it may keep its old text -- confirm.
                sliders.get(0).setValue(otsu_int)
            apply_threshold(otsu_int)

    class ThresholdListener(DialogListener):
        def dialogItemChanged(self, gd2, event):
            sliders = gd2.getSliders()
            if sliders.size() > 0:
                apply_threshold(sliders.get(0).getValue())
            # Always report the dialog state as valid.
            return True

    gd.addButton("Reset to Otsu", ResetListener())
    gd.addDialogListener(ThresholdListener())
    gd.showDialog()

    if gd.wasCanceled():
        _recolor_rois(rois_with_means, otsu_int, None, imp)
        return otsu_int

    sliders = gd.getSliders()
    final_t = sliders.get(0).getValue() if sliders.size() > 0 else otsu_int
    was_manual = (final_t != otsu_int)

    # Imported here rather than at module level, as in the original layout.
    from sidecar_runner import read_cells_csv

    # Filter to the current image only -- _write_final_calls treats every image
    # present in cells_data as reviewed, so other images must not be passed in.
    image_name = os.path.splitext(imp.getTitle())[0]
    cells_data = [r for r in read_cells_csv(output_dir) if r["image"] == image_name]
    _write_final_calls(cells_data, final_t, output_dir, was_manual, variant)
    return final_t


def _recolor_rois(rois_with_means, threshold_255, overlay, imp):
    """Recolor every ROI orange/sky-blue based on threshold_255, replace the overlay, and refresh imp.

    A ROI is p63+ (orange) when its mean is >= threshold_255, otherwise
    p63- (sky blue).

    Returns (n_pos, n_neg) so callers can update a live count label without a second pass.
    The overlay parameter is accepted for API consistency but a fresh Overlay is always built.
    """
    new_overlay = Overlay()
    n_pos = n_neg = 0
    for roi, mean in rois_with_means:
        if mean >= threshold_255:
            roi.setStrokeColor(COLOR_P63_POSITIVE)
            n_pos += 1
        else:
            roi.setStrokeColor(COLOR_P63_NEGATIVE)
            n_neg += 1
        new_overlay.add(roi)
    imp.setOverlay(new_overlay)
    imp.updateAndDraw()
    return n_pos, n_neg


def _read_csv_rows(path):
    """Return (fieldnames, rows) for the CSV at path; each row is a plain dict."""
    # Binary mode is what the Python 2 / Jython csv module expects.
    with open(path, "rb") as fh:
        reader = csv.DictReader(fh)
        fields = list(reader.fieldnames or [])
        rows = [dict(row) for row in reader]
    return fields, rows


def _write_csv_rows(path, fields, rows):
    """Overwrite the CSV at path with a header built from fields followed by rows."""
    with open(path, "wb") as fh:
        # Keys not listed in fields are dropped silently; missing keys become "".
        writer = csv.DictWriter(fh, fieldnames=fields, extrasaction="ignore", restval="")
        writer.writeheader()
        writer.writerows(rows)


def _with_threshold_source(fields):
    """Return fields, with a trailing 'threshold_source' column added when absent."""
    if "threshold_source" in fields:
        return fields
    return fields + ["threshold_source"]


def _write_final_calls(cells_data, final_threshold, output_dir, was_manual, variant):
    """Overwrite cells.csv and summary.csv with the final p63+ calls; flag was_manual in the output.

    cells_data      -- list of cell-row dicts for the image(s) under review
                       (normally just one image).  Every image that appears
                       here is treated as reviewed; rows in the CSV files that
                       belong to other images pass through unchanged.
    final_threshold -- int on 0-255 scale, applied to every image in cells_data
    was_manual      -- truthy when the threshold differs from the Otsu value
    variant         -- e.g. 'T14'; determines which summary.csv threshold column is updated

    Counts and fractions are computed per image, so passing rows for several
    images no longer writes one combined total into each summary row.
    """
    cells_by_image = {}
    for row in cells_data:
        cells_by_image.setdefault(row["image"], []).append(row)

    thresholds = dict((img, final_threshold) for img in cells_by_image)
    was_manual_map = dict((img, was_manual) for img in cells_by_image)
    _write_final_calls_batch(cells_by_image, thresholds, output_dir, was_manual_map, variant)


def _write_final_calls_batch(cells_by_image, thresholds, output_dir, was_manual_map, variant):
    """Write final p63+ calls for multiple images in a single pass through each CSV.

    cells_by_image -- {image_name: [cell-row dicts]}; used for the summary counts
    thresholds     -- {image_name: final_threshold_float}; its keys define the
                      set of reviewed images
    was_manual_map -- {image_name: bool}; missing images count as 'otsu'
    variant        -- e.g. 'T14'; lower-cased to build the summary column names

    Both files are read and all new values computed before either file is
    written, so a missing file or an unparsable red_mean aborts the update
    without leaving cells.csv and summary.csv out of step.
    """
    variant_lower = variant.lower()
    output_dir_s = str(output_dir)
    cells_path = os.path.join(output_dir_s, "cells.csv")
    summary_path = os.path.join(output_dir_s, "summary.csv")

    threshold_col = "{}_threshold".format(variant_lower)
    positive_col = "{}_p63_positive".format(variant_lower)
    fraction_col = "{}_p63_fraction".format(variant_lower)

    def source_label(img):
        return "manual" if was_manual_map.get(img) else "otsu"

    cell_fields, all_cells = _read_csv_rows(cells_path)
    summ_fields, all_summary = _read_csv_rows(summary_path)
    cell_fields = _with_threshold_source(cell_fields)
    summ_fields = _with_threshold_source(summ_fields)

    # ------------------------------------------------------------------ cells.csv
    # NOTE(review): red_mean is compared directly against the threshold; this
    # assumes it is on the same 0-255 scale as the dialog means -- confirm.
    for row in all_cells:
        img = row["image"]
        if img in thresholds:
            row["variant_call"] = "1" if float(row["red_mean"]) >= thresholds[img] else "0"
            row["threshold_source"] = source_label(img)

    # ------------------------------------------------------------------ summary.csv
    for row in all_summary:
        img = row["image"]
        if img not in thresholds:
            continue
        t = thresholds[img]
        cells = cells_by_image.get(img, [])
        n_pos = sum(1 for c in cells if float(c["red_mean"]) >= t)
        n_cells = len(cells)
        frac = round(float(n_pos) / n_cells, 6) if n_cells > 0 else 0.0
        row[threshold_col] = str(round(float(t), 4))
        row[positive_col] = str(n_pos)
        row[fraction_col] = str(frac)
        row["threshold_source"] = source_label(img)

    # Everything above succeeded -- only now touch the files on disk.
    _write_csv_rows(cells_path, cell_fields, all_cells)
    _write_csv_rows(summary_path, summ_fields, all_summary)