Optimizing Concurrency in a Coffee Ordering System
Understanding Concurrency in Software Development
In the realm of contemporary software development, enhancing application efficiency is paramount. Concurrency plays a vital role, as modern applications often execute multiple tasks at once, thereby maximizing hardware utilization. To illustrate this concept, let’s consider an example involving a small coffee shop.
Imagine we are developing a mobile application that allows users to place coffee orders online. Below is a straightforward server implementation designed to handle these order requests.
from socket import create_server

BUFFER_SIZE = 1024
ADDRESS = ("127.0.0.1", 12345)


class CoffeeServer:
    def __init__(self):
        try:
            self.server_socket = create_server(ADDRESS)
            print("Coffee server started at:", ADDRESS)
        except OSError:
            print("Could not start the coffee server at:", ADDRESS)
            raise

    def accept(self):
        conn, client_address = self.server_socket.accept()
        print("Connected to a coffee lover at:", client_address)
        return conn

    def serve(self, conn):
        try:
            while True:
                order = conn.recv(BUFFER_SIZE).decode().strip()
                if not order:
                    break
                response = self.process_order(order)
                conn.send(response.encode())
        finally:
            print("Connection with", conn.getpeername(), "closed")
            conn.close()

    def process_order(self, order):
        menu = {
            "1": "Your espresso is on its way!",
            "2": "Enjoy your latte!",
            "3": "Your cappuccino is coming right up!",
        }
        return menu.get(order, "Sorry, we don't have that option.")

    def start(self):
        print("Coffee server is ready to take orders")
        try:
            while True:
                conn = self.accept()
                self.serve(conn)
        finally:
            self.server_socket.close()
            print("\nCoffee server stopped.")


if __name__ == "__main__":
    coffee_server = CoffeeServer()
    coffee_server.start()
In this example, we set up a coffee server that communicates with users via sockets. Users can connect, place orders, and receive responses. However, a significant drawback of this implementation is that it can only process one request at a time, leading to potential delays for other customers.
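To exercise the server, a client only needs a plain socket connection. Here is a minimal client sketch; the `place_order` helper is our own illustration (not part of the article's repository), and it assumes the server above is already running on its `ADDRESS`:

```python
from socket import create_connection

BUFFER_SIZE = 1024
SERVER_ADDRESS = ("127.0.0.1", 12345)  # must match the server's ADDRESS


def place_order(order, address=SERVER_ADDRESS):
    """Open a connection, send one order code, and return the reply."""
    with create_connection(address) as conn:
        conn.send(order.encode())
        return conn.recv(BUFFER_SIZE).decode()


if __name__ == "__main__":
    # Requires the coffee server to be running first.
    print(place_order("1"))
```

Running two such clients at once makes the single-request limitation visible: the second client's order is not served until the first disconnects.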
Enhancing Concurrency with Multi-Threading
To address the concurrency limitation, we can utilize multi-threading. Here’s how we can modify our server using Python's threading module.
import threading
from socket import create_server

BUFFER_SIZE = 1024
ADDRESS = ("127.0.0.1", 12345)


class CoffeeServer:
    def __init__(self):
        try:
            self.server_socket = create_server(ADDRESS)
            print("Coffee server started at:", ADDRESS)
        except OSError:
            print("Could not start the coffee server at:", ADDRESS)
            raise

    def accept(self):
        conn, client_address = self.server_socket.accept()
        print("Connected to a coffee lover at:", client_address)
        return conn

    def serve(self, conn):
        try:
            while True:
                order = conn.recv(BUFFER_SIZE).decode().strip()
                if not order:
                    break
                response = self.process_order(order)
                conn.send(response.encode())
        finally:
            print("Connection with", conn.getpeername(), "closed")

    def process_order(self, order):
        menu = {
            "1": "Your espresso is on its way!",
            "2": "Enjoy your latte!",
            "3": "Your cappuccino is coming right up!",
        }
        return menu.get(order, "Sorry, we don't have that option.")

    def handle_client(self, conn):
        try:
            self.serve(conn)
        except Exception as e:
            print("An error occurred:", e)
        finally:
            conn.close()

    def start(self):
        print("Coffee server is ready to take orders")
        try:
            while True:
                conn = self.accept()
                client_thread = threading.Thread(
                    target=self.handle_client, args=(conn,))
                client_thread.start()
        finally:
            self.server_socket.close()
            print("\nCoffee server stopped.")


if __name__ == "__main__":
    coffee_server = CoffeeServer()
    coffee_server.start()
In this updated version, we've introduced a handle_client method that wraps serve. Each accepted connection spawns a new thread, so multiple requests can be handled simultaneously. However, a thread per connection consumes significant memory and CPU as the number of clients grows into the thousands — the scaling challenge known as the C10K problem.
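A common mitigation is to cap the number of threads with a pool rather than spawning one per connection. The sketch below uses a stand-in task in place of a real socket handler, and `max_workers=8` is an arbitrary choice; in the server's start loop, `pool.submit(self.handle_client, conn)` would take the place of `threading.Thread(...)`:

```python
from concurrent.futures import ThreadPoolExecutor

MENU = {
    "1": "Your espresso is on its way!",
    "2": "Enjoy your latte!",
    "3": "Your cappuccino is coming right up!",
}


def handle_order(order):
    # Stand-in for handle_client: the real server would receive a
    # connection object here and loop over incoming orders.
    return MENU.get(order, "Sorry, we don't have that option.")


# At most 8 handler threads ever exist, however many requests arrive;
# excess submissions simply queue until a worker frees up.
with ThreadPoolExecutor(max_workers=8) as pool:
    futures = [pool.submit(handle_order, o) for o in ["1", "2", "404"]]
    results = [f.result() for f in futures]

print(results)
```

The trade-off: a bounded pool protects memory and CPU, but once all workers are busy with long-lived connections, new clients wait in the queue.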
Exploring Non-Blocking Sockets and Busy Waiting
Traditional sockets, as previously used, are blocking, meaning they halt the server's execution during I/O operations. This limitation can be problematic when managing multiple clients.
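The contrast is easy to demonstrate: a blocking accept() waits indefinitely for a client, while a non-blocking one returns immediately by raising BlockingIOError. A small sketch (port 0 asks the OS for any free port):

```python
from socket import create_server

server = create_server(("127.0.0.1", 0))  # port 0: any free port
server.setblocking(False)

try:
    server.accept()  # no client is waiting, so this cannot succeed
    outcome = "accepted"
except BlockingIOError:
    outcome = "would block"  # returned immediately instead of halting
finally:
    server.close()

print(outcome)
```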
To overcome this, we can implement non-blocking sockets and utilize busy waiting. Here’s the revised implementation:
from socket import create_server

BUFFER_SIZE = 1024
ADDRESS = ("127.0.0.1", 12345)


class CoffeeServer:
    def __init__(self):
        try:
            self.server_socket = create_server(ADDRESS)
            self.server_socket.setblocking(False)
            print("Coffee server started at:", ADDRESS)
            self.clients = []
        except OSError:
            print("Could not start the coffee server at:", ADDRESS)
            raise

    def accept(self):
        try:
            conn, client_address = self.server_socket.accept()
            print("Connected to a coffee lover at:", client_address)
            conn.setblocking(False)
            self.clients.append(conn)
        except BlockingIOError:
            pass  # No pending connection; try again on the next loop pass

    def serve(self, conn):
        try:
            data = conn.recv(BUFFER_SIZE)
            if not data:
                self.clients.remove(conn)
                conn.close()
                return
            try:
                order = data.decode().strip()
                response = self.process_order(order)
            except ValueError:
                response = "Invalid order, please try again."
            conn.send(response.encode())
        except BlockingIOError:
            pass  # No data available yet; try again on the next loop pass
        except OSError:
            self.clients.remove(conn)
            conn.close()

    def process_order(self, order):
        menu = {
            "1": "Your espresso is on its way!",
            "2": "Enjoy your latte!",
            "3": "Your cappuccino is coming right up!",
        }
        return menu.get(order, "Sorry, we don't have that option.")

    def start(self):
        print("Coffee server is ready to take orders")
        try:
            while True:
                self.accept()
                for conn in self.clients.copy():
                    self.serve(conn)
        finally:
            for conn in self.clients:
                conn.close()
            self.server_socket.close()
            print("\nCoffee server stopped.")


if __name__ == "__main__":
    coffee_server = CoffeeServer()
    coffee_server.start()
This approach involves configuring the socket to be non-blocking, allowing the server to check for I/O completion without halting execution. However, busy waiting can consume CPU resources unnecessarily.
Event-Based Concurrency: The Reactor Pattern
To address the inefficiencies of busy waiting, we can implement event-based concurrency using the Reactor pattern. This method waits for events and processes them via callback functions.
Here’s how we can rewrite our server using this approach:
import selectors
from socket import create_server

BUFFER_SIZE = 1024
ADDRESS = ("127.0.0.1", 12345)


class CoffeeServer:
    def __init__(self):
        try:
            self.server_socket = create_server(ADDRESS)
            self.server_socket.setblocking(False)
            print("Coffee server started at:", ADDRESS)
            self.clients = []
            self.selector = selectors.DefaultSelector()
            self.selector.register(
                self.server_socket, selectors.EVENT_READ, self.accept)
        except OSError:
            print("Could not start the coffee server at:", ADDRESS)
            raise

    def accept(self, source, _):
        try:
            conn, client_address = source.accept()
            print("Connected to a coffee lover at:", client_address)
            conn.setblocking(False)
            self.clients.append(conn)
            self.selector.register(conn, selectors.EVENT_READ, self.serve)
        except BlockingIOError:
            pass

    def serve(self, source, _):
        try:
            data = source.recv(BUFFER_SIZE)
            if not data:
                self.clients.remove(source)
                self.selector.unregister(source)
                source.close()
                return
            try:
                order = data.decode().strip()
                response = self.process_order(order)
            except ValueError:
                response = "Invalid order, please try again."
            source.send(response.encode())
        except BlockingIOError:
            pass
        except OSError:
            self.clients.remove(source)
            self.selector.unregister(source)
            source.close()

    def process_order(self, order):
        menu = {
            "1": "Your espresso is on its way!",
            "2": "Enjoy your latte!",
            "3": "Your cappuccino is coming right up!",
        }
        return menu.get(order, "Sorry, we don't have that option.")

    def start(self):
        print("Coffee server is ready to take orders")
        try:
            while True:
                events = self.selector.select()
                for key, mask in events:
                    callback = key.data
                    callback(key.fileobj, mask)
        finally:
            for conn in self.clients:
                conn.close()
            self.server_socket.close()
            print("\nCoffee server stopped.")


if __name__ == "__main__":
    coffee_server = CoffeeServer()
    coffee_server.start()
In this version, we leverage the selectors module to manage events efficiently, allowing a single-threaded server to handle multiple connections concurrently without the overhead of thread management.
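As an aside, Python's asyncio library is built on this same selector machinery but hides the callbacks behind coroutines. The sketch below is our own illustration of an equivalent server (it is not part of the linked repository):

```python
import asyncio

MENU = {
    "1": "Your espresso is on its way!",
    "2": "Enjoy your latte!",
    "3": "Your cappuccino is coming right up!",
}


async def handle_client(reader, writer):
    # Each connection gets its own coroutine; awaits yield control to
    # the event loop, which multiplexes all clients on one thread.
    while data := await reader.read(1024):
        order = data.decode().strip()
        reply = MENU.get(order, "Sorry, we don't have that option.")
        writer.write(reply.encode())
        await writer.drain()
    writer.close()
    await writer.wait_closed()


async def main():
    server = await asyncio.start_server(handle_client, "127.0.0.1", 12345)
    async with server:
        await server.serve_forever()


if __name__ == "__main__":
    asyncio.run(main())
```

The structure mirrors the selectors version — one thread, readiness-driven I/O — but the per-client logic reads top to bottom like the original blocking code.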
For those interested in exploring the complete code, please visit the GitHub repository linked below:
GitHub: ashishbamania/Concurrent-Coffee-Server
Further Reading
- Grokking Concurrency by Kirill Bobrov (Affiliate link)
- Modern Operating Systems by Andrew Tanenbaum (Affiliate link)
Stay connected through my Substack newsletters:
- Ashish’s Substack | Dr. Ashish Bamania | Substack
- Byte Surgery | Dr. Ashish Bamania | Substack
Explore my publications on Gumroad and Medium.