Real channel fetch (Telegram/Bale/Divar) + AI-audited automation engine + CI/CD
- Fetch: Telegram via t.me/s, Bale via Bot API, Divar via web-search (HttpClient, config-gated, graceful) - AI layer: DB-backed AppSetting (mode auto/manual, thresholds, AI endpoint/model/key/prompt/framework, auto-approve); OpenAI-compatible IAiAuditor (self-host/Iranian endpoints; fails safe to manual) - Pipeline: fetch → dedupe(hash) → parse → validate → AI audit → Discard/Flag/Queue/auto-publish (resolve-or-create facility) - Admin: /Admin/Settings automation+AI panel; queue shows confidence + AI verdict; flagged section - CI/CD: Dockerfile, docker-compose.prod.yml, .gitea/workflows/ci-cd.yml, nginx vhost, DEPLOY.md; forwarded headers + /healthz + prod reference-only seed; ports 22/80/443 only Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -7,22 +7,24 @@ using Microsoft.EntityFrameworkCore;
|
||||
|
||||
namespace JobsMedical.Web.Services.Scraping;
|
||||
|
||||
public record SourceResult(string Source, int Fetched, int Queued, int Flagged, int Spam, int Duplicates);
|
||||
public record SourceResult(string Source, int Fetched, int Queued, int Published, int Flagged, int Spam, int Duplicates);
|
||||
|
||||
public record IngestionSummary(List<SourceResult> Sources)
|
||||
{
|
||||
public int TotalQueued => Sources.Sum(s => s.Queued);
|
||||
public int TotalPublished => Sources.Sum(s => s.Published);
|
||||
public int TotalFlagged => Sources.Sum(s => s.Flagged);
|
||||
public int TotalSpam => Sources.Sum(s => s.Spam);
|
||||
public int TotalDuplicates => Sources.Sum(s => s.Duplicates);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// The scrape engine. Pulls from every enabled <see cref="IListingSource"/>, dedupes by content
|
||||
/// hash, parses with <see cref="IListingParser"/>, validates with <see cref="ListingValidator"/>,
|
||||
/// and stores each as a <see cref="RawListing"/> with a status: New (queued for review),
|
||||
/// Flagged (incomplete/suspicious), or Discarded (spam). Source-agnostic — add a source and it
|
||||
/// flows through unchanged.
|
||||
/// The scrape engine. For every enabled source: dedupe by content hash → parse → rule-validate →
|
||||
/// (optional) AI audit → decide. Decision depends on admin settings:
|
||||
/// • spam → Discarded
|
||||
/// • AI on: AI verdict drives approve/reject/review; approve + Automatic + AiAutoApprove → publish
|
||||
/// • AI off: Automatic + confidence ≥ threshold → publish; else queue/flag
|
||||
/// "Publish" resolves-or-creates an (unverified) facility and creates the Shift/JobOpening.
|
||||
/// </summary>
|
||||
public class IngestionService
|
||||
{
|
||||
@@ -30,16 +32,15 @@ public class IngestionService
|
||||
private readonly IEnumerable<IListingSource> _sources;
|
||||
private readonly IListingParser _parser;
|
||||
private readonly ListingValidator _validator;
|
||||
private readonly IAiAuditor _ai;
|
||||
private readonly SettingsService _settings;
|
||||
private readonly ILogger<IngestionService> _log;
|
||||
|
||||
public IngestionService(AppDbContext db, IEnumerable<IListingSource> sources,
|
||||
IListingParser parser, ListingValidator validator, ILogger<IngestionService> log)
|
||||
public IngestionService(AppDbContext db, IEnumerable<IListingSource> sources, IListingParser parser,
|
||||
ListingValidator validator, IAiAuditor ai, SettingsService settings, ILogger<IngestionService> log)
|
||||
{
|
||||
_db = db;
|
||||
_sources = sources;
|
||||
_parser = parser;
|
||||
_validator = validator;
|
||||
_log = log;
|
||||
_db = db; _sources = sources; _parser = parser; _validator = validator;
|
||||
_ai = ai; _settings = settings; _log = log;
|
||||
}
|
||||
|
||||
public IReadOnlyList<(string Name, bool Enabled)> Sources =>
|
||||
@@ -47,18 +48,22 @@ public class IngestionService
|
||||
|
||||
public async Task<IngestionSummary> RunAsync(CancellationToken ct = default)
|
||||
{
|
||||
var roles = await _db.Roles.Select(r => r.Name).ToListAsync(ct);
|
||||
var cities = await _db.Cities.Select(c => c.Name).ToListAsync(ct);
|
||||
var districts = await _db.Districts.Select(d => d.Name).ToListAsync(ct);
|
||||
var settings = await _settings.GetAsync();
|
||||
var roles = await _db.Roles.ToListAsync(ct);
|
||||
var cities = await _db.Cities.ToListAsync(ct);
|
||||
var districts = await _db.Districts.ToListAsync(ct);
|
||||
var roleNames = roles.Select(r => r.Name).ToList();
|
||||
var cityNames = cities.Select(c => c.Name).ToList();
|
||||
var districtNames = districts.Select(d => d.Name).ToList();
|
||||
|
||||
var results = new List<SourceResult>();
|
||||
|
||||
foreach (var source in _sources.Where(s => s.Enabled))
|
||||
{
|
||||
int fetched = 0, queued = 0, flagged = 0, spam = 0, dupes = 0;
|
||||
int fetched = 0, queued = 0, published = 0, flagged = 0, spam = 0, dupes = 0;
|
||||
IReadOnlyList<ScrapedItem> items;
|
||||
try { items = await source.FetchAsync(ct); }
|
||||
catch (Exception ex) { _log.LogError(ex, "Source {Source} fetch failed", source.Name); continue; }
|
||||
catch (Exception ex) { _log.LogError(ex, "Source {Source} failed", source.Name); continue; }
|
||||
|
||||
foreach (var item in items)
|
||||
{
|
||||
@@ -66,42 +71,155 @@ public class IngestionService
|
||||
var hash = Hash(item.RawText);
|
||||
if (await _db.RawListings.AnyAsync(r => r.ContentHash == hash, ct)) { dupes++; continue; }
|
||||
|
||||
var parsed = _parser.Parse(item.RawText, roles, cities, districts);
|
||||
var parsed = _parser.Parse(item.RawText, roleNames, cityNames, districtNames);
|
||||
var val = _validator.Validate(item.RawText, parsed);
|
||||
|
||||
var status = val.IsSpam ? RawListingStatus.Discarded
|
||||
: val.IsValid ? RawListingStatus.New
|
||||
: RawListingStatus.Flagged;
|
||||
if (status == RawListingStatus.New) queued++;
|
||||
else if (status == RawListingStatus.Flagged) flagged++;
|
||||
else spam++;
|
||||
AiAuditResult? ai = null;
|
||||
if (settings.AiEnabled && !val.IsSpam)
|
||||
ai = await _ai.AuditAsync(item.RawText, settings, ct);
|
||||
|
||||
_db.RawListings.Add(new RawListing
|
||||
var (status, reason, confidence) = Decide(settings, val, ai);
|
||||
|
||||
var raw = new RawListing
|
||||
{
|
||||
SourceChannel = item.Source,
|
||||
SourceUrl = item.SourceUrl,
|
||||
RawText = item.RawText.Trim(),
|
||||
ContentHash = hash,
|
||||
Confidence = val.Confidence,
|
||||
ValidationNotes = val.Issues.Count > 0 ? string.Join("؛ ", val.Issues) : null,
|
||||
Confidence = confidence,
|
||||
ValidationNotes = reason,
|
||||
Status = status,
|
||||
});
|
||||
};
|
||||
_db.RawListings.Add(raw);
|
||||
|
||||
if (status == RawListingStatus.Normalized)
|
||||
{
|
||||
try { Publish(parsed, ai, raw, roles, cities, districts); published++; }
|
||||
catch (Exception ex) { _log.LogWarning(ex, "Auto-publish failed; queueing instead"); raw.Status = RawListingStatus.New; queued++; }
|
||||
}
|
||||
else if (status == RawListingStatus.New) queued++;
|
||||
else if (status == RawListingStatus.Flagged) flagged++;
|
||||
else spam++;
|
||||
}
|
||||
|
||||
await _db.SaveChangesAsync(ct);
|
||||
results.Add(new SourceResult(source.Name, fetched, queued, flagged, spam, dupes));
|
||||
_log.LogInformation("Ingestion {Source}: fetched={F} queued={Q} flagged={Fl} spam={S} dupes={D}",
|
||||
source.Name, fetched, queued, flagged, spam, dupes);
|
||||
results.Add(new SourceResult(source.Name, fetched, queued, published, flagged, spam, dupes));
|
||||
_log.LogInformation("Ingest {S}: fetched={F} queued={Q} published={P} flagged={Fl} spam={Sp} dupes={D}",
|
||||
source.Name, fetched, queued, published, flagged, spam, dupes);
|
||||
}
|
||||
|
||||
return new IngestionSummary(results);
|
||||
}
|
||||
|
||||
/// <summary>SHA-256 hex of the whitespace-normalized text (for cross-run dedupe).</summary>
|
||||
private static (RawListingStatus status, string? reason, int confidence) Decide(
|
||||
AppSetting s, ValidationResult val, AiAuditResult? ai)
|
||||
{
|
||||
var notes = val.Issues.Count > 0 ? string.Join("؛ ", val.Issues) : null;
|
||||
|
||||
if (val.IsSpam)
|
||||
return (RawListingStatus.Discarded, Join("اسپم", notes), val.Confidence);
|
||||
|
||||
if (ai is not null)
|
||||
{
|
||||
var aiNote = Join($"AI: {ai.Decision} ({ai.Confidence}٪)" + (ai.Reason is null ? "" : $" — {ai.Reason}"), notes);
|
||||
if (ai.Reject) return (RawListingStatus.Discarded, aiNote, ai.Confidence);
|
||||
if (ai.Approve)
|
||||
return (s.Mode == IngestionMode.Automatic && s.AiAutoApprove
|
||||
? RawListingStatus.Normalized : RawListingStatus.New, aiNote, ai.Confidence);
|
||||
return (RawListingStatus.Flagged, aiNote, ai.Confidence); // review
|
||||
}
|
||||
|
||||
if (!val.IsValid) return (RawListingStatus.Flagged, notes, val.Confidence);
|
||||
if (s.Mode == IngestionMode.Automatic && val.Confidence >= s.AutoPublishMinConfidence)
|
||||
return (RawListingStatus.Normalized, notes, val.Confidence);
|
||||
return (RawListingStatus.New, notes, val.Confidence);
|
||||
}
|
||||
|
||||
private void Publish(ParsedListing parsed, AiAuditResult? ai, RawListing raw,
|
||||
List<Role> roles, List<City> cities, List<District> districts)
|
||||
{
|
||||
var d = ai?.Data;
|
||||
var roleName = d?.Role ?? parsed.RoleName;
|
||||
var cityName = d?.City ?? parsed.CityName;
|
||||
var districtName = d?.District ?? parsed.DistrictName;
|
||||
|
||||
var role = roles.FirstOrDefault(r => r.Name == roleName) ?? roles.First();
|
||||
var city = cities.FirstOrDefault(c => c.Name == cityName)
|
||||
?? cities.FirstOrDefault(c => c.IsActive) ?? cities.First();
|
||||
var district = districts.FirstOrDefault(x => x.Name == districtName && x.CityId == city.Id);
|
||||
|
||||
var facilityName = !string.IsNullOrWhiteSpace(d?.FacilityName) ? d!.FacilityName!.Trim()
|
||||
: $"مرکز درمانی (از {raw.SourceChannel})";
|
||||
var facility = _db.Facilities.Local.FirstOrDefault(f => f.Name == facilityName && f.CityId == city.Id)
|
||||
?? _db.Facilities.FirstOrDefault(f => f.Name == facilityName && f.CityId == city.Id);
|
||||
if (facility is null)
|
||||
{
|
||||
facility = new Facility
|
||||
{
|
||||
Name = facilityName, Type = FacilityType.Clinic, City = city, DistrictId = district?.Id,
|
||||
Phone = parsed.Phone, IsVerified = false,
|
||||
};
|
||||
_db.Facilities.Add(facility);
|
||||
}
|
||||
|
||||
var kind = (d?.Kind ?? parsed.Kind.ToString()).ToLowerInvariant();
|
||||
if (kind.Contains("job") || kind.Contains("استخدام"))
|
||||
{
|
||||
_db.JobOpenings.Add(new JobOpening
|
||||
{
|
||||
Facility = facility, Role = role,
|
||||
Title = !string.IsNullOrWhiteSpace(d?.Title) ? d!.Title!.Trim() : $"استخدام {role.Name}",
|
||||
EmploymentType = MapEmployment(d?.EmploymentType, parsed.EmploymentType),
|
||||
SalaryMin = parsed.PayAmount,
|
||||
Description = raw.RawText, Status = ShiftStatus.Open, Source = ShiftSource.Aggregated,
|
||||
SourceUrl = raw.SourceUrl,
|
||||
});
|
||||
}
|
||||
else
|
||||
{
|
||||
var st = MapShiftType(d?.ShiftType, parsed.ShiftType);
|
||||
var (start, end) = DefaultTimes(st);
|
||||
_db.Shifts.Add(new Shift
|
||||
{
|
||||
Facility = facility, Role = role,
|
||||
Date = DateOnly.FromDateTime(DateTime.UtcNow).AddDays(1),
|
||||
StartTime = start, EndTime = end, ShiftType = st,
|
||||
SpecialtyRequired = role.Name, Description = raw.RawText,
|
||||
PayType = parsed.SharePercent is not null && parsed.PayAmount is null ? PayType.Percentage
|
||||
: parsed.PayAmount is null ? PayType.Negotiable : PayType.PerShift,
|
||||
PayAmount = parsed.PayAmount, SharePercent = parsed.SharePercent,
|
||||
Status = ShiftStatus.Open, Source = ShiftSource.Aggregated, SourceUrl = raw.SourceUrl,
|
||||
});
|
||||
}
|
||||
raw.Status = RawListingStatus.Normalized;
|
||||
}
|
||||
|
||||
private static ShiftType MapShiftType(string? ai, ShiftType? parsed) => (ai?.ToLowerInvariant()) switch
|
||||
{
|
||||
"day" => ShiftType.Day, "evening" => ShiftType.Evening, "night" => ShiftType.Night, "oncall" => ShiftType.OnCall,
|
||||
_ => parsed ?? ShiftType.Day,
|
||||
};
|
||||
|
||||
private static EmploymentType MapEmployment(string? ai, EmploymentType? parsed) => (ai?.ToLowerInvariant()) switch
|
||||
{
|
||||
"parttime" => EmploymentType.PartTime, "contract" => EmploymentType.Contract,
|
||||
"plan" => EmploymentType.Plan, "fulltime" => EmploymentType.FullTime,
|
||||
_ => parsed ?? EmploymentType.FullTime,
|
||||
};
|
||||
|
||||
private static (TimeOnly, TimeOnly) DefaultTimes(ShiftType t) => t switch
|
||||
{
|
||||
ShiftType.Day => (new TimeOnly(8, 0), new TimeOnly(14, 0)),
|
||||
ShiftType.Evening => (new TimeOnly(14, 0), new TimeOnly(20, 0)),
|
||||
ShiftType.Night => (new TimeOnly(20, 0), new TimeOnly(8, 0)),
|
||||
_ => (new TimeOnly(8, 0), new TimeOnly(8, 0)),
|
||||
};
|
||||
|
||||
private static string? Join(string a, string? b) => string.IsNullOrEmpty(b) ? a : $"{a} | {b}";
|
||||
|
||||
private static string Hash(string text)
|
||||
{
|
||||
var normalized = Regex.Replace((text ?? "").Trim(), @"\s+", " ");
|
||||
var bytes = SHA256.HashData(Encoding.UTF8.GetBytes(normalized));
|
||||
return Convert.ToHexString(bytes).ToLowerInvariant();
|
||||
return Convert.ToHexString(SHA256.HashData(Encoding.UTF8.GetBytes(normalized))).ToLowerInvariant();
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user