1. What the service does (business + tech)

  • Business: Reverse-pickup inspection: compare vendor reference images (S3/CloudFront) with pickup/return images (base64 or URLs), answer vendor-defined verification questions, and output Pick / Not Pick with evidence and confidence.

  • Tech: FastAPI app that loads InternVL v2.5 1B (vision–language), runs inference on GPU, validates output against a JSON schema, optionally retries once, and saves requests/results to a DB. No image/output caching.

From the README: targets are ~20K images/day, 14–17 s response time, peak concurrency of 15, 6:00–24:00 uptime, and an eventual move to SageMaker + MLOps (scaling, CI/CD, monitoring).


2. High-level architecture

Client → POST /process_request (RequestItem)

    → Validate request (Pydantic)
    → [Optional] Use prefetched images from pipeline
    → Else: download vendor images (async) + decode pickup images
    → Preprocess to tensors (dynamic_preprocess, load_image)
    → Parse "question" JSON → build prompt (generate_question_template)
    → run_model_inference (InternVL chat)
    → extract_json + validate_json_schema
    → [If invalid] Retry once with enhanced prompt
    → Map recommendation (Accept→Pick, Reject/Partial/Retry→Not Pick)
    → Match question labels (fuzzy), build Question2, analysis
    → Return ProcessingResponse immediately
    → Background: save to DB (rvpxbverification_request_data, rvpxbverification, rvpxbverification_fail)

So the code is a single FastAPI process (one worker) with:

  • Prefetch pipeline: background workers pull from a queue, download/decode images, turn them into tensors, and put results in processed_queue so the main request can sometimes skip I/O and preprocessing.

  • Concurrency: a semaphore capped at MAX_CONCURRENT_REQUESTS (default 10), plus thread pools for CPU-bound work (image decode, preprocessing); a minimal sketch of this pattern follows the list.

  • DB: sql_conn used only for inserts (request payload, result, and failure records).
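
A minimal sketch of that concurrency shape, with Config names from the code and placeholder bodies for the service-specific steps:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

MAX_CONCURRENT_REQUESTS = 10  # Config default
request_semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)
cpu_pool = ThreadPoolExecutor(max_workers=8)                 # decode/preprocess
prefetch_queue: asyncio.Queue = asyncio.Queue(maxsize=50)    # raw requests in
processed_queue: asyncio.Queue = asyncio.Queue(maxsize=50)   # ready tensors out

def preprocess(item): ...                     # placeholder: decode + tensorize
async def download_and_preprocess(item): ...  # placeholder: slow path
async def run_inference(data): ...            # placeholder: model.chat(...)

async def prefetch_worker():
    """Background worker: prepare tensors ahead of time so the request
    handler can sometimes skip download + preprocessing entirely."""
    while True:
        item = await prefetch_queue.get()
        loop = asyncio.get_running_loop()
        tensors = await loop.run_in_executor(cpu_pool, preprocess, item)
        await processed_queue.put(tensors)

async def handle_request(item):
    async with request_semaphore:                 # cap concurrent GPU work
        try:
            data = processed_queue.get_nowait()   # fast path: prefetched
        except asyncio.QueueEmpty:
            data = await download_and_preprocess(item)
        return await run_inference(data)
```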


3. Configuration (Config class, ~lines 63–97)

  • Model: MODEL_PATH = "full_14_08_2025_ch_28k" (local dir; no Hugging Face ID in code).
  • Server: HOST, PORT (default 8001), LOG_LEVEL, CORS_ORIGINS, TRUSTED_HOSTS.
  • Limits: MAX_IMAGE_SIZE (10MB), REQUEST_TIMEOUT (300s), MAX_IMAGES_PER_REQUEST (20), MAX_CONCURRENT_REQUESTS (10).
  • Image: IMAGE_SIZE (448), PICKUP_PATCH (2), generation_config (e.g. max_new_tokens 2000, no sampling).
  • Performance: THREAD_POOL_SIZE, PREFETCH_WORKERS, PREFETCH_FACTOR, PIPELINE_QUEUE_SIZE, IMAGE_DOWNLOAD_TIMEOUT, USE_MIXED_PRECISION.

For SageMaker you’ll typically drive these via environment variables (and keep MODEL_PATH pointing to the artifact SageMaker provides).
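
A hedged sketch of that pattern; the fields mirror the Config values listed above, and the defaults are illustrative:

```python
import os

class Config:
    # Model artifact path; on SageMaker point this at /opt/ml/model
    MODEL_PATH = os.getenv("MODEL_PATH", "full_14_08_2025_ch_28k")
    HOST = os.getenv("HOST", "0.0.0.0")
    PORT = int(os.getenv("PORT", "8001"))
    MAX_IMAGE_SIZE = int(os.getenv("MAX_IMAGE_SIZE", str(10 * 1024 * 1024)))
    REQUEST_TIMEOUT = int(os.getenv("REQUEST_TIMEOUT", "300"))
    MAX_IMAGES_PER_REQUEST = int(os.getenv("MAX_IMAGES_PER_REQUEST", "20"))
    MAX_CONCURRENT_REQUESTS = int(os.getenv("MAX_CONCURRENT_REQUESTS", "10"))
    IMAGE_SIZE = int(os.getenv("IMAGE_SIZE", "448"))
    PICKUP_PATCH = int(os.getenv("PICKUP_PATCH", "2"))
```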


4. Request and response contracts

Request (RequestItem, ~445–598)

  • pickupproductimage: list of URLs (vendor reference images), 1–10.

  • skupickdone: list of base64 strings or HTTPS URLs (pickup/return images), 1–10.

  • productname, question, shippingid (required).

  • Optional: productid, uuid, previousuuid.

  • Validators enforce URL/image format and lengths.

Response (ProcessingResponse, ~600–612)

  • shippingid, uuid, product_id, processing_time_ai, Question (list of per-question answers), ai_decision (e.g. "Pick"/"Not Pick"), finalAction, ai_match_score, analysis, status, StatusCode, total_time, timestamp.

So for SageMaker you’ll expose an endpoint that accepts this JSON body and returns this shape (or wrap it in SageMaker’s request/response envelope if you use the default inference format).
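
A trimmed sketch of both contracts (Pydantic v2 style; types are inferred from context, and the real models add URL/base64 validators and a few more fields):

```python
from typing import List, Optional
from pydantic import BaseModel, Field

class RequestItem(BaseModel):
    pickupproductimage: List[str] = Field(..., min_length=1, max_length=10)  # vendor URLs
    skupickdone: List[str] = Field(..., min_length=1, max_length=10)         # base64 or https URLs
    productname: str
    question: str                       # JSON string of vendor-defined questions
    shippingid: str
    productid: Optional[str] = None
    uuid: Optional[str] = None
    previousuuid: Optional[str] = None

class ProcessingResponse(BaseModel):
    shippingid: str
    uuid: Optional[str] = None
    product_id: Optional[str] = None
    processing_time_ai: float
    Question: List[dict]                # per-question answers
    ai_decision: str                    # "Pick" / "Not Pick"
    finalAction: str
    ai_match_score: float
    analysis: str
    status: str
    StatusCode: int
    total_time: float
    timestamp: str
```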


5. JSON schema and validation (~99–219, 212–221)

RESPONSE_SCHEMA defines the expected model output:

  • image_analysis: pickup_images, vendor_image, comparison (strings).

  • verification_results: array of objects with Type, Label, IsMandatory, ValueToCheck, Result, Evidence, Confidence, isRetakeImage.

  • pickup_confidence: overall_score, confidence_factors (image_quality, attribute_visibility, matching_confidence), recommendation (Accept/Reject/Partial/Retry).

validate_json_schema(data) runs jsonschema validation. If it fails, the code retries inference once with an extra instruction to fix confidence types and structure.
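
A minimal sketch of that validate-then-retry flow; RESPONSE_SCHEMA is abbreviated, and the retry suffix text is illustrative:

```python
from jsonschema import ValidationError, validate

RESPONSE_SCHEMA = {"type": "object", "required": ["verification_results"]}  # abbreviated

def validate_json_schema(data: dict) -> tuple[bool, str]:
    """Return (ok, error_message) rather than raising."""
    try:
        validate(instance=data, schema=RESPONSE_SCHEMA)
        return True, ""
    except ValidationError as exc:
        return False, exc.message

def infer_with_retry(run_inference, extract_json, prompt: str) -> dict:
    """First pass, then one retry with an enhanced prompt on schema failure."""
    result = extract_json(run_inference(prompt))
    ok, err = validate_json_schema(result)
    if not ok:
        retry = prompt + f"\nIMPORTANT: fix these validation errors: {err}"
        result = extract_json(run_inference(retry))
    return result
```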


6. Model loading and inference

  • Load (load_model, ~1137–1300):

  • AutoModel.from_pretrained(MODEL_PATH, torch_dtype=torch.bfloat16, attn_implementation="flash_attention_2", …) and AutoTokenizer.from_pretrained(MODEL_PATH).

  • Model is .eval().cuda(), then torch.compile(…, mode="reduce-overhead").

  • Requires CUDA; on failure sets model_state.load_error and raises ProcessingError.

  • Inference (run_model_inference, ~1355–1372):

  • model_state.model.chat(tokenizer, pixel_values, questions, num_patches_list=num_patches, generation_config=…, history=None) under torch.inference_mode() and a dedicated CUDA stream, then torch.cuda.synchronize().

For SageMaker you’ll load the same way inside the container, with MODEL_PATH set to the path where the model artifact is extracted (e.g. /opt/ml/model or a subfolder).
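
A condensed sketch of both steps, mirroring the calls above (trust_remote_code is my assumption for InternVL's custom modeling code; error handling and the dedicated CUDA stream are omitted):

```python
import torch
from transformers import AutoModel, AutoTokenizer

MODEL_PATH = "/opt/ml/model"   # SageMaker artifact path; a local dir in dev

model = AutoModel.from_pretrained(
    MODEL_PATH,
    torch_dtype=torch.bfloat16,
    attn_implementation="flash_attention_2",
    trust_remote_code=True,    # assumption: InternVL ships custom modeling code
).eval().cuda()
model = torch.compile(model, mode="reduce-overhead")
tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH, trust_remote_code=True)

@torch.inference_mode()
def run_model_inference(pixel_values, questions, num_patches, generation_config):
    responses = model.chat(
        tokenizer, pixel_values, questions,
        num_patches_list=num_patches,
        generation_config=generation_config,
        history=None,
    )
    torch.cuda.synchronize()   # ensure GPU work finished before timing/return
    return responses
```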


7. Image pipeline (vendor vs pickup)

  • Vendor images: URLs in pickupproductimage.

  • rewrite_s3_to_cloudfront replaces xbeestest.s3.amazonaws.com with a CloudFront domain.

  • download_image_async (aiohttp) downloads and opens as PIL RGB; enforces MAX_IMAGE_SIZE.

  • Pickup images: in skupickdone — either base64 or https:// URLs.

  • decode_base64_image_batch: if URL, fetches (with retry); else base64-decode; same size limit.

  • Preprocessing:

  • build_transform (resize to input_size, normalize ImageNet stats).

  • dynamic_preprocess splits by aspect ratio into patches (e.g. 1–12 patches).

  • load_image(image_file, input_size, max_num) returns a tensor; vendor uses max_num=1, pickup uses config.PICKUP_PATCH (2).

Prefetch workers do the same download/decode/load in a thread pool and put PrefetchedData (vendor/pickup tensors + metadata) in processed_queue.
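
A rough sketch of the download/decode/preprocess path (ImageNet normalization constants are standard; the 1–12-patch tiling of dynamic_preprocess is collapsed into a single resize here):

```python
import base64
import io

import aiohttp
import torch
import torchvision.transforms as T
from PIL import Image

IMAGENET_MEAN, IMAGENET_STD = (0.485, 0.456, 0.406), (0.229, 0.224, 0.225)
MAX_IMAGE_SIZE = 10 * 1024 * 1024   # Config default

def build_transform(input_size: int = 448) -> T.Compose:
    return T.Compose([
        T.Resize((input_size, input_size)),
        T.ToTensor(),
        T.Normalize(IMAGENET_MEAN, IMAGENET_STD),
    ])

async def download_image_async(session: aiohttp.ClientSession, url: str) -> Image.Image:
    async with session.get(url) as resp:
        resp.raise_for_status()
        body = await resp.read()
    if len(body) > MAX_IMAGE_SIZE:
        raise ValueError("image exceeds MAX_IMAGE_SIZE")
    return Image.open(io.BytesIO(body)).convert("RGB")

def decode_base64_image(data: str) -> Image.Image:
    raw = base64.b64decode(data)
    if len(raw) > MAX_IMAGE_SIZE:
        raise ValueError("image exceeds MAX_IMAGE_SIZE")
    return Image.open(io.BytesIO(raw)).convert("RGB")

def load_image(img: Image.Image, input_size: int = 448) -> torch.Tensor:
    # the real code tiles into 1-12 aspect-ratio patches via dynamic_preprocess
    return build_transform(input_size)(img).unsqueeze(0)
```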


8. Question and prompt flow

  • Input: request.question is a JSON string. process_verification_data parses it and extracts a list of questions (e.g. data['Question']), with fields like Type, Label, ValueToCheck, etc.

  • Prompt: generate_question_template(vendor_img_count, sr_images_count, productname, question_data) builds the text prompt and injects a structured JSON template (fix_verification_questions adjusts Result/Evidence instructions per question type and labels). The template describes vendor vs pickup image indices and the required output format.

  • Output parsing: extract_json(responses) (regex/pyaml/json/ast) turns the model text into a dict; then validate_json_schema(result) checks it. On failure, one retry with an "IMPORTANT: … validation errors …" suffix. A sketch of the tolerant parsing follows.
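
A small sketch of that tolerant parsing; the real extract_json also tries pyaml, and the regex here just grabs the outermost {...} block:

```python
import ast
import json
import re

def extract_json(text: str) -> dict:
    """Pull a JSON-ish object out of free-form model output."""
    match = re.search(r"\{.*\}", text, re.DOTALL)
    if not match:
        raise ValueError("no JSON object found in model output")
    blob = match.group(0)
    try:
        return json.loads(blob)
    except json.JSONDecodeError:
        # fall back to Python-literal syntax (single quotes, True/False)
        return ast.literal_eval(blob)
```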


9. Main processing and response (process_request_with_prefetch, ~1417–1833)

  • Acquires semaphore, then tries to get prefetched data (short timeout).

  • If none: downloads vendor images, decodes pickup images, builds tensors, and may submit the request for prefetch for future calls.

  • Builds img_tensors, num_patches, questions, pixel_values; runs run_model_inference; extracts and validates JSON; optionally retries once.

  • Maps pickup_confidence.recommendation to ai_decision (Pick/Not Pick) and StatusCode (200 vs 202).

  • Aligns model verification_results to input question labels (including fuzzy match via find_most_similar_string), fills missed questions with defaults, then builds Question2 (with answer, ValueToCheck, isRetakeImage, etc.).

  • Builds response_dict and returns it.

  • Background tasks:

  • Save request to rvpxbverification_request_data.

  • Save success/fail payload to rvpxbverification or rvpxbverification_fail (including first/second inference and validation status).

So the “ML” part is: download/decode → preprocess → one (or two) inference(s) → validate → business logic (recommendation + label matching) → response + DB.
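
A compact sketch of the two business-logic steps; the recommendation mapping follows the description above, while the fuzzy comparator and threshold are assumptions based on the fuzzywuzzy dependency:

```python
from fuzzywuzzy import fuzz

RECOMMENDATION_MAP = {
    "Accept": ("Pick", 200),
    "Reject": ("Not Pick", 202),
    "Partial": ("Not Pick", 202),
    "Retry": ("Not Pick", 202),
}

def map_recommendation(rec: str) -> tuple[str, int]:
    """pickup_confidence.recommendation -> (ai_decision, StatusCode)."""
    return RECOMMENDATION_MAP.get(rec, ("Not Pick", 202))

def find_most_similar_string(target: str, candidates: list[str]) -> str | None:
    """Fuzzy-match a model-returned label back to an input question label."""
    best, best_score = None, 0
    for cand in candidates:
        score = fuzz.ratio(target.lower(), cand.lower())
        if score > best_score:
            best, best_score = cand, score
    return best if best_score >= 70 else None   # threshold is illustrative
```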


10. Database usage

  • sql_conn (imported as conn):

  • conn.insert_single_dict_simple(data_dict, table_name, drop_if_exists=True) for rvpxbverification_fail.

  • conn.insert_single_dict_simple(data_dict, table_name) for rvpxbverification_request_data and rvpxbverification.

  • conn.create_table_from_dict(data_dict, table_name) in save_to_database_error.

The project only references sql_conn; the actual module is not in the repo. For SageMaker you’ll need to either bundle it or replace it with a client that works in the container (e.g. RDS, Redshift, or another DB with proper networking from the SageMaker VPC).
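
If you do replace it, a drop-in insert helper could look like this minimal sketch, assuming RDS Postgres via psycopg2 (the sql_conn API is inferred only from the call sites above, and the connection details here are hypothetical):

```python
import psycopg2

def insert_single_dict_simple(conn, data: dict, table: str) -> None:
    """Insert one row; dict keys become column names (matches the call sites)."""
    cols = ", ".join(data.keys())
    placeholders = ", ".join(["%s"] * len(data))
    with conn.cursor() as cur:
        cur.execute(f"INSERT INTO {table} ({cols}) VALUES ({placeholders})",
                    tuple(data.values()))
    conn.commit()

conn = psycopg2.connect(host="db-host", dbname="rvp", user="app", password="...")
insert_single_dict_simple(conn, {"shippingid": "S1", "status": "Pick"},
                          "rvpxbverification")
```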


11. API surface

  • GET /

  • Service name, version, feature flags, max_concurrent_requests, prefetch_workers, caching_enabled: false.

  • GET /health

  • Model loaded, CUDA, GPU memory, semaphore "current active" count, prefetch pipeline status (running, workers, queue sizes). Used by load balancers or SageMaker for health checks.

  • POST /process_request

  • Body: RequestItem. Returns ProcessingResponse. All core logic runs here (via process_request_with_prefetch).

  • GET /metrics

  • Concurrency, prefetch queue sizes, performance flags. Useful for monitoring and autoscaling.

  • Exception handlers

  • ProcessingError, HTTPException, and generic Exception all return a consistent { "errorMsg", "code", "processingTime", "previousuuid" } style body; a sketch follows.
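
A minimal sketch of that error shape; the field names come from the handlers above, and this ProcessingError stands in for the service's own exception class:

```python
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

app = FastAPI()

class ProcessingError(Exception):
    def __init__(self, message: str, code: int = 500):
        super().__init__(message)
        self.message, self.code = message, code

@app.exception_handler(ProcessingError)
async def processing_error_handler(request: Request, exc: ProcessingError):
    return JSONResponse(
        status_code=exc.code,
        content={
            "errorMsg": exc.message,
            "code": exc.code,
            "processingTime": 0.0,  # the real handler reports elapsed time
            "previousuuid": getattr(request.state, "previousuuid", None),
        },
    )
```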


12. Lifespan and entrypoint (~1324–1352, 2045–2057)

  • Startup: load_model() then model_state.start_prefetch_pipeline() (starts prefetch workers).

  • Shutdown: stop prefetch pipeline, close aiohttp session, shutdown thread pools, delete model/tokenizer, torch.cuda.empty_cache().

  • Run: uvicorn.run(app, host=…, port=…, workers=1) — single worker so the same process holds the GPU model.

For SageMaker you’ll run the same app in the container (e.g. uvicorn or gunicorn with 1 worker) and let SageMaker send HTTP requests to /process_request (or to a custom path you wire in the container).
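
A skeletal version of that lifecycle, with stand-ins for the service's load_model and model_state:

```python
from contextlib import asynccontextmanager

import torch
import uvicorn
from fastapi import FastAPI

class _ModelState:                      # stand-in for the service's model_state
    def start_prefetch_pipeline(self): ...
    def stop_prefetch_pipeline(self): ...

model_state = _ModelState()

def load_model(): ...                   # stand-in for the real GPU loader

@asynccontextmanager
async def lifespan(app: FastAPI):
    load_model()                              # startup: load InternVL
    model_state.start_prefetch_pipeline()     # startup: prefetch workers
    yield
    model_state.stop_prefetch_pipeline()      # shutdown: drain workers
    torch.cuda.empty_cache()                  # shutdown: release GPU memory

app = FastAPI(lifespan=lifespan)

if __name__ == "__main__":
    # single worker: one process owns the GPU-resident model
    uvicorn.run(app, host="0.0.0.0", port=8001, workers=1)
```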


13. What you need for AWS SageMaker deployment

  • Container:

  • Base image with CUDA, PyTorch, transformers, and dependencies (fastapi, uvicorn, aiohttp, PIL, pyaml, jsonschema, fuzzywuzzy, pandas, etc.).

  • Copy fast_prod_new.py and sql_conn (or replacement).

  • Set MODEL_PATH to the SageMaker model artifact path (e.g. /opt/ml/model).

  • Serve the FastAPI app (e.g. CMD ["uvicorn", "fast_prod_new:app", "--host", "0.0.0.0", "--port", "8000"]).

  • Implement SageMaker's invocation contract in the container: for a bring-your-own-container endpoint, SageMaker sends GET /ping health checks and POST /invocations requests over HTTP (port 8080). The simplest route is to add those two endpoints to the existing FastAPI app (build RequestItem from the body and call process_request_with_prefetch), keeping POST /process_request for direct clients; a hedged sketch follows.
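
A sketch of that adapter, meant to be added to the existing fast_prod_new.py (assumes the app object, model_state, RequestItem, and process_request_with_prefetch from the service):

```python
from fastapi import Request
from fastapi.responses import JSONResponse

@app.get("/ping")
async def ping():
    # SageMaker health check: must return 200 once the model is ready
    ready = model_state.model is not None
    return JSONResponse({"status": "ok" if ready else "loading"},
                        status_code=200 if ready else 503)

@app.post("/invocations")
async def invocations(request: Request):
    body = await request.json()
    item = RequestItem(**body)                        # reuse existing validation
    return await process_request_with_prefetch(item)  # reuse existing pipeline
```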

  • Model artifact:

  • Upload full_14_08_2025_ch_28k (or the same weights in the format the code expects) to S3 and register it as a SageMaker model; the container reads from the path SageMaker provides.

  • Instance:

  • GPU instance (e.g. ml.g5/ml.g6) to match the current g6.2xlarge usage; multiple instances + autoscaling to cover the "peak concurrency 15" target and queueing.

  • DB and networking:

  • Run the container in a VPC that can reach your DB (or use RDS/Redshift and adapt sql_conn to that). Ensure S3/CloudFront and any external URLs used for images are reachable (no need to change rewrite_s3_to_cloudfront unless domain changes).

  • Env and config:

  • Pass all Config overrides (concurrency, timeouts, model path, etc.) via environment variables in the SageMaker model/endpoint configuration.

  • CI/CD and MLOps:

  • Use SageMaker Pipelines or CodePipeline to build the container, run tests, and deploy to dev/prod endpoints; use SageMaker Model Monitor for drift/infra and your own metrics (e.g. from /metrics and DB) for “inspections done” and performance.

If you tell me your next step (e.g. “container Dockerfile”, “SageMaker invocation handler”, or “replace sql_conn with RDS”), I can outline the exact code changes and file layout next (still in Ask mode — no edits, only guidance).