Commit
This commit is contained in:
parent
cf452a7f06
commit
c3f5624f8d
79
app/lib/App/SSE/EventLoop.php
Normal file
79
app/lib/App/SSE/EventLoop.php
Normal file
@ -0,0 +1,79 @@
|
||||
<?php
|
||||
|
||||
namespace App\SSE;
|
||||
|
||||
/**
|
||||
* Small class to abstract Server-Sent Events (SSE)
|
||||
*
|
||||
* https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events
|
||||
*/
|
||||
class EventLoop
|
||||
{
|
||||
public int $interval = 3;
|
||||
|
||||
public int $heartbeat = 5; // send heartbeat every num seconds to ensure connection is still alive
|
||||
public int $timeLimit = 3600;
|
||||
public int $execLimit = 30;
|
||||
|
||||
public function start(callable $callback): void
|
||||
{
|
||||
// as session data is locked to prevent concurrent writes we
|
||||
// make it read only to prevent the server from locking up
|
||||
if (session_status() === PHP_SESSION_ACTIVE)
|
||||
{
|
||||
session_write_close();
|
||||
}
|
||||
|
||||
header("Cache-Control: no-store");
|
||||
header("Content-Type: text/event-stream");
|
||||
header('X-Accel-Buffering: no'); // for nginx buffering
|
||||
|
||||
// send headers to client indicating we are now a stream
|
||||
ob_end_flush();
|
||||
flush();
|
||||
|
||||
$expirationTime = time() + $this->timeLimit;
|
||||
|
||||
$lastHeartbeat = time();
|
||||
|
||||
while (!connection_aborted() && time() < $expirationTime)
|
||||
{
|
||||
set_time_limit($this->execLimit);
|
||||
try {
|
||||
$data = call_user_func($callback);
|
||||
if ($data !== NULL)
|
||||
{
|
||||
$this->send($data);
|
||||
$lastHeartbeat = time();
|
||||
}
|
||||
} catch (StopEventLoopException $th) {
|
||||
break;
|
||||
}
|
||||
|
||||
// sleep and perform heartbeat to ensure connection is still alive
|
||||
for ($i = 0; $i < $this->interval; $i++)
|
||||
{
|
||||
if (time() >= $lastHeartbeat + $this->heartbeat)
|
||||
{
|
||||
echo ": \n\n";
|
||||
ob_end_flush();
|
||||
flush();
|
||||
$lastHeartbeat = time();
|
||||
}
|
||||
sleep(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Send data to client encoded as json
|
||||
*/
|
||||
private function send($data): void
|
||||
{
|
||||
echo "data: " . json_encode($data);
|
||||
echo "\n\n";
|
||||
// send data to stream
|
||||
ob_end_flush();
|
||||
flush();
|
||||
}
|
||||
}
|
@ -1,39 +0,0 @@
|
||||
<?php
|
||||
|
||||
namespace App\SSE;
|
||||
|
||||
/**
|
||||
* Small class to abstract Server-Sent Events (SSE)
|
||||
*
|
||||
* https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events
|
||||
*/
|
||||
class Sender
|
||||
{
|
||||
public function start(): void
|
||||
{
|
||||
header("Cache-Control: no-store");
|
||||
header("Content-Type: text/event-stream");
|
||||
|
||||
// as session data is locked to prevent concurrent writes we
|
||||
// make it read only to prevent the server from locking up
|
||||
session_write_close();
|
||||
|
||||
// we have to flush before because idk
|
||||
ob_end_flush();
|
||||
flush();
|
||||
}
|
||||
|
||||
public function flush($data, $event = NULL): void
|
||||
{
|
||||
if ($event)
|
||||
{
|
||||
echo "event: $event\n";
|
||||
}
|
||||
echo "data: " . json_encode($data) ."";
|
||||
echo "\n\n";
|
||||
|
||||
// send data to stream
|
||||
ob_end_flush();
|
||||
flush();
|
||||
}
|
||||
}
|
10
app/lib/App/SSE/StopEventLoopException.php
Normal file
10
app/lib/App/SSE/StopEventLoopException.php
Normal file
@ -0,0 +1,10 @@
|
||||
<?php
|
||||
|
||||
namespace App\SSE;
|
||||
|
||||
use \Exception;
|
||||
|
||||
class StopEventLoopException extends Exception
|
||||
{
|
||||
// not much here...
|
||||
}
|
@ -8,19 +8,9 @@
|
||||
</table>
|
||||
|
||||
<script>
|
||||
var source = new EventSource('stream.php');
|
||||
|
||||
source.addEventListener('message', function(e)
|
||||
var es = new EventSource('stream.php');
|
||||
es.onmessage = function(event)
|
||||
{
|
||||
rows = JSON.parse(e.data);
|
||||
console.log(rows);
|
||||
|
||||
// update table
|
||||
root = document.getElementById("root");
|
||||
rows.forEach(row => root.insertAdjacentHTML(
|
||||
'beforebegin',
|
||||
`<tr><td>${row.teamId}</td><td>${row.id}</td></tr>`
|
||||
));
|
||||
|
||||
}, false);
|
||||
console.log("New message", event.data);
|
||||
};
|
||||
</script>
|
@ -2,33 +2,40 @@
|
||||
|
||||
use App\Teamtable\TeamMapper;
|
||||
use App\Timetable\TimeMapper;
|
||||
use App\SSE\Sender;
|
||||
use App\SSE\EventLoop;
|
||||
use App\SSE\StopEventLoopException;
|
||||
|
||||
$teamMapper = new TimeMapper($app->database->conn);
|
||||
$timeMapper = new TimeMapper($app->database->conn);
|
||||
|
||||
/**
|
||||
* Server-Sent Events (SSE)
|
||||
* Send events to client with Server-Sent Events(SSE)
|
||||
*/
|
||||
$sse = new Sender();
|
||||
$sse->start();
|
||||
$sse = new EventLoop();
|
||||
$sse->interval = 1;
|
||||
|
||||
$persist_obj = new class {
|
||||
public ?int $prev_last_insert = NULL;
|
||||
};
|
||||
|
||||
$sse->start(
|
||||
function () use ($timeMapper, $teamMapper, &$persist_obj) {
|
||||
|
||||
$prev_last_insert = NULL;
|
||||
while (!connection_aborted())
|
||||
{
|
||||
$time = $timeMapper->getLatest();
|
||||
|
||||
if ($time)
|
||||
{
|
||||
$last_insert = $time->date->getTimestamp();
|
||||
|
||||
if ($prev_last_insert == NULL || $last_insert > $prev_last_insert)
|
||||
{
|
||||
$sse->flush($timeMapper->getAll());
|
||||
if ( $persist_obj->prev_last_insert == NULL
|
||||
|| $last_insert > $persist_obj->prev_last_insert
|
||||
) {
|
||||
$persist_obj->prev_last_insert = $last_insert;
|
||||
|
||||
$prev_last_insert = $last_insert;
|
||||
return($timeMapper->getAll());
|
||||
}
|
||||
}
|
||||
|
||||
sleep(1);
|
||||
return;
|
||||
}
|
||||
);
|
Reference in New Issue
Block a user