Efficient Image Processing: Golang, Asynq, Redis, and Fiber for Asynchronous Queue Handling
Introduction
Hey there 👋,
In this brief article, I’ll explain a common method for speeding up web applications: moving heavy tasks off the main thread and into a queue, where they are organized and processed asynchronously.
In the world of server-side applications, the efficient handling of resource-intensive tasks like image processing is a significant challenge. These operations, often heavy on computation, can bog down the main thread, leading to decreased performance and poor scalability.
The Challenge at Hand
In server-side apps, managing image processing, which demands a lot of compute, is a big challenge. If not handled properly, these tasks can overload the app’s main thread, making it slow and frustrating for users. I aimed to create a solution that improves performance in a simple way. The main goal was to keep the app’s core running smoothly, even while it’s doing a lot of image processing.
Used Tools and Prerequisites
I picked Golang as the primary language for this image-processing platform because it excels at handling multiple tasks concurrently, allowing for swift image processing. Golang contributes to the speed and reliability of the service.
Fiber, a lightweight web framework built on Golang, is what I use for handling web-related tasks. Its efficient design ensures seamless internet communication and reliable web operations. Fiber is effective in maintaining smooth web service performance.
For handling background tasks, I use Asynq. It arranges these tasks so they don’t overwhelm the main part of the service. This is important for keeping the service running well without any delays or issues.
Redis is a fast in-memory key-value store that I’m using as the queue backend, which is a popular way to use it. It’s vital here because Asynq depends on it to manage tasks. Redis keeps everything organized and ensures tasks are completed efficiently and reliably.
Also, I used GoLand as the main IDE for coding and testing. It’s designed specifically for Go, making it incredibly handy for this project. With GoLand, coding is streamlined and efficient, facilitating the development of Go applications.
I use Postman to manage and test API requests. It’s great for API work and makes the job easier and faster.
Implementing the Solution: Asynq & Redis
To achieve this, I turned to Asynq and Redis. Asynq serves as a powerful task queue, allowing me to offload heavy image-processing tasks away from the main thread. This meant that these tasks could be handled asynchronously, ensuring that the core functionalities of the application remained unaffected. Redis, known for its high performance, acted as the backbone, supporting Asynq with fast, in-memory data storage.
Overview of the Image-Processing Service
I built a service that makes working with images a lot easier. It’s written in a programming language called Go, which is great at doing many things at once without getting mixed up. This service can change image sizes, switch them to different formats, and make them look better, all very quickly. It’s made to handle lots of work without getting slow, which means the main part of an app using this service doesn’t have to do all the hard work by itself and can run smoothly.
Folder Structure
Now, let’s explore the folder structure of the image-processing service:
- docker-compose.yaml: This file contains the configuration for running the service using Docker Compose.
- go.mod and go.sum: These files are used for managing Go dependencies and version control.
- handlers: This directory holds the code for handling different aspects of the service, such as image processing.
- images: This directory is where the processed images are stored.
- routes: Here, you’ll find the routes defined for the service, specifying how different HTTP requests are handled.
- server: The server.go file in this directory contains the code for setting up and running the server.
- tasks: In this directory, you’ll find code related to managing tasks, including queuing and processing.
- worker: The worker.go file here handles background tasks and ensures efficient task execution.
This organized folder structure keeps the service’s components separated, making it easier to manage and maintain the codebase.
image-processing-service/
├── docker-compose.yaml
├── go.mod
├── go.sum
├── handlers
│ └── handlers.go
├── images
├── routes
│ └── routes.go
├── server
│ └── server.go
├── tasks
│ ├── queue.go
│ └── tasks.go
└── worker
└── worker.go
Now let’s look at the docker-compose.yaml file:
version: "3.9"
services:
  redis:
    image: redis:latest
    ports:
      - "6379:6379"
    volumes:
      - redis-data:/data
    networks:
      - image-processing-network
  asynqmon:
    image: hibiken/asynqmon:latest
    environment:
      - REDIS_ADDR=redis:6379
    ports:
      - "8080:8080"
    depends_on:
      - redis
    networks:
      - image-processing-network
networks:
  image-processing-network:
volumes:
  redis-data:
- Redis Service: This service uses the latest version of the Redis image. It’s like a container that runs the Redis database. It’s connected to the image-processing-network network, allowing it to communicate with other services on the same network. Additionally, it uses a volume named redis-data to store data.
- Asynqmon Service: This service uses the latest version of the hibiken/asynqmon image. It’s a monitoring tool for Asynq. It also connects to the image-processing-network network to communicate with other services. It depends on the Redis service, which means it waits for the Redis service to start before it does.
So, in summary, this docker-compose.yaml file defines two services (Redis and Asynqmon) that are part of the "image-processing-network" network and use a volume called "redis-data" for data storage.
Now we run the underlying infrastructure through the command docker-compose up -d
Now let’s look at the main entry point of the application, server/server.go.
This code sets up the image-processing service’s server using Fiber, configures routes, connects to Redis, and listens on port 3000 for requests.
package main

import (
	"log"

	"github.com/gofiber/fiber/v2"
	"github.com/qahta0/image-processing-service/routes"
	"github.com/qahta0/image-processing-service/tasks"
)

const redisAddress = "127.0.0.1:6379"

func main() {
	app := fiber.New()
	routes.Setup(app)
	tasks.Init(redisAddress)
	defer tasks.Close()
	log.Fatal(app.Listen(":3000"))
}
Now let’s look at the routes of the application, defined in routes/routes.go.
This code sets up a single POST route at "/image-process" using the Fiber framework, allowing image uploads to be processed by the handlers.UploadImage function.
package routes

import (
	"github.com/gofiber/fiber/v2"
	"github.com/qahta0/image-processing-service/handlers"
)

func Setup(app *fiber.App) {
	app.Post("/image-process", handlers.UploadImage)
}
In the tasks package, we create a centralized task queue client using the hibiken/asynq library. This client efficiently manages background tasks, connects to Redis for seamless processing, and streamlines task management in the image-processing service. A sync.Once guard ensures every part of the service gets the same client object.
package tasks

import (
	"sync"

	"github.com/hibiken/asynq"
)

var (
	client *asynq.Client
	once   sync.Once
)

// Init creates the shared Asynq client exactly once.
func Init(redisAddress string) {
	once.Do(func() {
		client = asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddress})
	})
}

// Close releases the client's connections to Redis.
func Close() {
	if client != nil {
		client.Close()
	}
}

// GetClient returns the shared Asynq client.
func GetClient() *asynq.Client {
	return client
}
Now let’s look at the handlers of the application, in handlers/handlers.go.
This code defines the handler for uploading and processing images. Here’s a concise summary:
The UploadImage function handles image uploads via a POST request. It first checks if the upload was successful and handles errors accordingly. If the upload is successful, it reads the uploaded image data, creates image resize tasks, and enqueues them for processing. Finally, it responds with a success message.
package handlers

import (
	"fmt"
	"io"

	"github.com/gofiber/fiber/v2"
	"github.com/qahta0/image-processing-service/tasks"
)

func UploadImage(c *fiber.Ctx) error {
	// Grab the uploaded file from the multipart form field "file".
	file, err := c.FormFile("file")
	if err != nil {
		return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{"error": "Upload failed"})
	}
	fileData, err := file.Open()
	if err != nil {
		return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{"error": "Failed to open the file"})
	}
	defer fileData.Close()
	data, err := io.ReadAll(fileData)
	if err != nil {
		return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{"error": "Failed to read the file"})
	}
	// Build one resize task per standard width; the original filename is
	// passed along so the worker can keep the file extension.
	resizeTasks, err := tasks.NewImageResizeTasks(data, file.Filename)
	if err != nil {
		return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{"error": "Could not create image resize tasks"})
	}
	client := tasks.GetClient()
	for _, task := range resizeTasks {
		if _, err := client.Enqueue(task); err != nil {
			fmt.Printf("Error enqueuing task: %v\n", err)
			return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{"error": "Could not enqueue image resize task"})
		}
	}
	return c.JSON(fiber.Map{"message": "Image uploaded and resizing tasks started"})
}
In the next crucial step, the function creates the image resize tasks and queues them for asynchronous execution. This ensures that image processing doesn’t block the main thread, which can return a response to the user right away. This separation of concerns is what keeps the image-processing service responsive and efficient.
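The handler above enqueues each task with Asynq’s defaults. If you want more control, Asynq accepts per-task options at enqueue time. The helper below is a hypothetical variation, not part of the original code; the queue name "images" and the retry/timeout values are placeholder choices for illustration.

package tasks

import (
	"time"

	"github.com/hibiken/asynq"
)

// EnqueueWithOptions shows how the enqueue loop in handlers.UploadImage
// could attach per-task options. The queue name, retry count, and timeout
// here are placeholder values chosen for illustration.
func EnqueueWithOptions(ts []*asynq.Task) error {
	for _, t := range ts {
		if _, err := GetClient().Enqueue(
			t,
			asynq.Queue("images"),        // route resize jobs to a dedicated queue
			asynq.MaxRetry(5),            // retry a failed resize up to 5 times
			asynq.Timeout(2*time.Minute), // fail the task if a single resize hangs
		); err != nil {
			return err
		}
	}
	return nil
}

Note that if you route tasks to a non-default queue like "images", the worker’s asynq.Config must list that queue in its Queues map; otherwise the tasks will sit unprocessed.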
Now we run the main server through the following command go run ./server/server.go
Now let’s check the Asynqmon portal by visiting http://localhost:8080. There is nothing special to see yet, because no jobs are being executed right now!
Now, use Postman to send a POST request to the image-processing endpoint at http://127.0.0.1:3000/image-process, uploading the image as form-data in the file field.
After sending the request, we observe that the response comes back very quickly, because enqueuing tasks in Redis happens almost instantly, with O(1) complexity. Back in Asynqmon, we can see that there are now 10 tasks waiting in the queue.
Now, to consume tasks asynchronously, we use the following code. It handles asynchronous image resizing with the Asynq library, defining an image:resize task type and a payload structure containing the image data, width, height, and filename.
The NewImageResizeTasks function generates one resizing task per standard width, while the HandleResizeImageTask function processes each task by resizing the image, saving it under a unique UUID-based filename, and printing the output UUID. Together they enable efficient, concurrent image resizing within the service.
package tasks

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"image"
	"image/jpeg"
	_ "image/png" // register the PNG decoder for image.Decode
	"os"
	"path/filepath"
	"time"

	"github.com/google/uuid"
	"github.com/hibiken/asynq"
	"github.com/nfnt/resize"
)

const TypeResizeImage = "image:resize"

type ResizeImagePayload struct {
	ImageData []byte
	Width     uint
	Height    uint
	FileName  string
}

var StandardWidths = []uint{16, 32, 128, 240, 320, 480, 540, 640, 800, 1024}

// NewImageResizeTasks builds one resize task per standard width, preserving
// the original aspect ratio.
func NewImageResizeTasks(imageData []byte, fileName string) ([]*asynq.Task, error) {
	img, _, err := image.Decode(bytes.NewReader(imageData))
	if err != nil {
		return nil, err
	}
	originalBounds := img.Bounds()
	originalWidth := uint(originalBounds.Dx())
	originalHeight := uint(originalBounds.Dy())
	var tasks []*asynq.Task
	for _, width := range StandardWidths {
		height := (width * originalHeight) / originalWidth
		payload := ResizeImagePayload{
			ImageData: imageData,
			Width:     width,
			Height:    height,
			FileName:  fileName,
		}
		payloadBytes, err := json.Marshal(payload)
		if err != nil {
			return nil, err
		}
		task := asynq.NewTask(TypeResizeImage, payloadBytes)
		tasks = append(tasks, task)
	}
	return tasks, nil
}

// HandleResizeImageTask decodes the payload, resizes the image, and writes
// the result to images/<date>/<uuid><ext>.
func HandleResizeImageTask(ctx context.Context, t *asynq.Task) error {
	var payload ResizeImagePayload
	if err := json.Unmarshal(t.Payload(), &payload); err != nil {
		return fmt.Errorf("failed to parse resize image task payload: %v", err)
	}
	img, _, err := image.Decode(bytes.NewReader(payload.ImageData))
	if err != nil {
		return fmt.Errorf("image decode failed: %v", err)
	}
	resizedImg := resize.Resize(payload.Width, payload.Height, img, resize.Lanczos3)
	outputUUID := uuid.New()
	outputFileName := fmt.Sprintf("images/%s/%s%s", time.Now().Format("2006-01-02"), outputUUID.String(), filepath.Ext(payload.FileName))
	outputDir := filepath.Dir(outputFileName)
	if _, err := os.Stat(outputDir); os.IsNotExist(err) {
		if err := os.MkdirAll(outputDir, 0755); err != nil {
			return err
		}
	}
	outFile, err := os.Create(outputFileName)
	if err != nil {
		return err
	}
	defer outFile.Close()
	if err := jpeg.Encode(outFile, resizedImg, nil); err != nil {
		return err
	}
	fmt.Printf("Output UUID for the processed image: %s\n", outputUUID.String())
	return nil
}
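The worker itself lives in worker/worker.go, which isn’t reproduced above. A minimal sketch of such a worker looks like the following: it creates an Asynq server pointed at the same Redis address and registers HandleResizeImageTask for the image:resize task type. Treat it as a starting point; the Concurrency value of 10 is just an example.

package main

import (
	"log"

	"github.com/hibiken/asynq"
	"github.com/qahta0/image-processing-service/tasks"
)

const redisAddress = "127.0.0.1:6379"

func main() {
	// Connect the worker to the same Redis instance the Fiber app enqueues into.
	srv := asynq.NewServer(
		asynq.RedisClientOpt{Addr: redisAddress},
		asynq.Config{
			Concurrency: 10, // number of tasks processed in parallel (example value)
		},
	)

	// Map the image:resize task type to its handler.
	mux := asynq.NewServeMux()
	mux.HandleFunc(tasks.TypeResizeImage, tasks.HandleResizeImageTask)

	if err := srv.Run(mux); err != nil {
		log.Fatalf("could not run worker server: %v", err)
	}
}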
Next, we execute the workers by running go run worker/worker.go. Once the worker is up, the 10 tasks that were previously waiting in the queue are processed and completed asynchronously, entirely separate from the main thread.
As a final touch, the function responds with a message confirming the successful upload of the image. Additionally, it informs the user that the resizing tasks have been initiated in the background. This seamless process not only enhances user satisfaction but also showcases the power of asynchronous task handling in the image-processing service.
Future Enhancement: Cloud Integration via GCP Bucket
Moving forward, I plan to enhance the service by integrating cloud storage, specifically Google Cloud Platform (GCP) buckets. This will let processed images be uploaded and stored in the cloud, further streamlining the workflow and adding a layer of convenience and security.
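As a rough sketch of what that could look like with the official cloud.google.com/go/storage client (the helper name, bucket name, and object path are placeholders, not part of the current codebase):

package tasks

import (
	"context"
	"fmt"
	"io"
	"os"

	"cloud.google.com/go/storage"
)

// uploadToGCS is a hypothetical helper: it copies a locally processed image
// into a GCS bucket using Application Default Credentials. It could be
// called from HandleResizeImageTask after the file is written.
func uploadToGCS(ctx context.Context, bucketName, objectName, localPath string) error {
	client, err := storage.NewClient(ctx)
	if err != nil {
		return fmt.Errorf("storage.NewClient: %w", err)
	}
	defer client.Close()

	f, err := os.Open(localPath)
	if err != nil {
		return err
	}
	defer f.Close()

	// Stream the file into the bucket; Close flushes the upload and
	// returns any final error from the API.
	w := client.Bucket(bucketName).Object(objectName).NewWriter(ctx)
	if _, err := io.Copy(w, f); err != nil {
		w.Close()
		return err
	}
	return w.Close()
}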
Conclusion
In conclusion, this article addresses the critical challenge of optimizing image processing within web development. By leveraging Golang, Asynq, Redis, and Fiber, the image-processing service enhances performance through efficient asynchronous task handling. This article shows the power of well-considered software techniques in approaching crucial bottlenecks in web development, leading to a more responsive and efficient user experience.
Source Code
You can find the source code for this article down here 👇