From 2f462ffa1674c9f79acba4c317eeb07ffb105227 Mon Sep 17 00:00:00 2001 From: Michael Date: Mon, 18 Jul 2022 03:31:00 +0000 Subject: [PATCH] Store incoming posts in a queue --- src/Protocol/ActivityPub/Processor.php | 3 ++ src/Protocol/ActivityPub/Receiver.php | 51 ++++++++++++++++++++------ static/dbstructure.config.php | 32 +++++++++++++++- 3 files changed, 74 insertions(+), 12 deletions(-) diff --git a/src/Protocol/ActivityPub/Processor.php b/src/Protocol/ActivityPub/Processor.php index c6420436e8..c49a2da686 100644 --- a/src/Protocol/ActivityPub/Processor.php +++ b/src/Protocol/ActivityPub/Processor.php @@ -223,6 +223,8 @@ class Processor Post\History::add($item['uri-id'], $item); Item::update($item, ['uri' => $activity['id']]); + DBA::delete('inbox-queue', ['url' => $item['uri']]); + if ($activity['object_type'] == 'as:Event') { $posts = Post::select(['event-id', 'uid'], ["`uri` = ? AND `event-id` > ?", $activity['id'], 0]); while ($post = DBA::fetch($posts)) { @@ -890,6 +892,7 @@ class Processor $item_id = Item::insert($item); if ($item_id) { Logger::info('Item insertion successful', ['user' => $item['uid'], 'item_id' => $item_id]); + DBA::delete('inbox-queue', ['url' => $item['uri']]); } else { Logger::notice('Item insertion aborted', ['user' => $item['uid']]); } diff --git a/src/Protocol/ActivityPub/Receiver.php b/src/Protocol/ActivityPub/Receiver.php index 2a56e9e5e7..b55636a30e 100644 --- a/src/Protocol/ActivityPub/Receiver.php +++ b/src/Protocol/ActivityPub/Receiver.php @@ -28,6 +28,7 @@ use Friendica\Content\Text\Markdown; use Friendica\Core\Logger; use Friendica\Core\Protocol; use Friendica\Core\System; +use Friendica\Database\Database; use Friendica\DI; use Friendica\Model\Contact; use Friendica\Model\APContact; @@ -36,6 +37,7 @@ use Friendica\Model\Post; use Friendica\Model\User; use Friendica\Protocol\Activity; use Friendica\Protocol\ActivityPub; +use Friendica\Util\DateTimeFormat; use Friendica\Util\HTTPSignature; use Friendica\Util\JsonLD; use Friendica\Util\LDSignature; @@ -96,7 +98,21 @@ class Receiver $ldactivity = JsonLD::compact($activity); + $http_signer = HTTPSignature::getSigner($body, $header); + if ($http_signer === false) { + Logger::warning('Invalid HTTP signature, message will be discarded.'); + return; + } elseif (empty($http_signer)) { + Logger::info('Signer is a tombstone. The message will be discarded, the signer account is deleted.'); + return; + } else { + Logger::info('Valid HTTP signature', ['signer' => $http_signer]); + } + + self::enqueuePost($ldactivity, $http_signer, $uid); + $actor = JsonLD::fetchElement($ldactivity, 'as:actor', '@id') ?? ''; + $apcontact = APContact::getByURL($actor); if (empty($apcontact)) { @@ -109,17 +125,6 @@ class Receiver APContact::unmarkForArchival($apcontact); } - $http_signer = HTTPSignature::getSigner($body, $header); - if ($http_signer === false) { - Logger::warning('Invalid HTTP signature, message will be discarded.'); - return; - } elseif (empty($http_signer)) { - Logger::info('Signer is a tombstone. The message will be discarded, the signer account is deleted.'); - return; - } else { - Logger::info('Valid HTTP signature', ['signer' => $http_signer]); - } - $signer = [$http_signer]; Logger::info('Message for user ' . $uid . ' is from actor ' . $actor); @@ -157,6 +162,30 @@ class Receiver $fetchQueue->process(); } + private static function enqueuePost(array $ldactivity = [], string $signer, int $uid) + { + if (empty($ldactivity['as:object'])) { + return; + } + + $url = JsonLD::fetchElement($ldactivity, 'as:object', '@id'); + $fields = [ + 'url' => $url, + 'in-reply-to-url' => JsonLD::fetchElement($ldactivity['as:object'], 'as:inReplyTo', '@id'), + 'signer' => $signer, + 'type' => JsonLD::fetchElement($ldactivity, '@type'), + 'object-type' => JsonLD::fetchElement($ldactivity['as:object'], '@type'), + 'activity' => json_encode($ldactivity, JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE | JSON_PRETTY_PRINT), + 'received' => DateTimeFormat::utcNow(), + ]; + DBA::insert('inbox-queue', $fields, Database::INSERT_IGNORE); + + $queue = DBA::selectFirst('inbox-queue', ['id'], ['url' => $url]); + if (!empty($queue['id'])) { + DBA::insert('inbox-queue-receiver', ['queue-id' => $queue['id'], 'uid' => $uid], Database::INSERT_IGNORE); + } + } + /** * Process incoming posts from relays * diff --git a/static/dbstructure.config.php b/static/dbstructure.config.php index 758c33d0da..1d8b21b02c 100644 --- a/static/dbstructure.config.php +++ b/static/dbstructure.config.php @@ -55,7 +55,7 @@ use Friendica\Database\DBA; if (!defined('DB_UPDATE_VERSION')) { - define('DB_UPDATE_VERSION', 1473); + define('DB_UPDATE_VERSION', 1474); } return [ @@ -784,6 +784,36 @@ return [ "hook_file_function" => ["UNIQUE", "hook", "file", "function"], ] ], + "inbox-queue" => [ + "comment" => "Incoming activity", + "fields" => [ + "id" => ["type" => "int unsigned", "not null" => "1", "extra" => "auto_increment", "primary" => "1", "comment" => "sequential ID"], + "url" => ["type" => "varbinary(255)", "comment" => "id of the incoming activity"], + "in-reply-to-url" => ["type" => "varbinary(255)", "comment" => "related id of the incoming activity"], + "signer" => ["type" => "varbinary(255)", "comment" => "Signer of the incoming activity"], + "activity" => ["type" => "mediumtext", "comment" => "The JSON activity"], + "received" => ["type" => "datetime", "not null" => "1", "default" => DBA::NULL_DATETIME, "comment" => "Receiving date"], + "type" => ["type" => "varchar(64)", "not null" => "1", "default" => "", "comment" => "Type of the activity"], + "object-type" => ["type" => "varchar(64)", "not null" => "1", "default" => "", "comment" => "Type of the object activity"], + ], + "indexes" => [ + "PRIMARY" => ["id"], + "url" => ["UNIQUE", "url"], + "in-reply-to-url" => ["in-reply-to-url"], + "received" => ["received"], + ] + ], + "inbox-queue-receiver" => [ + "comment" => "Receiver for the incoming activity", + "fields" => [ + "queue-id" => ["type" => "int unsigned", "not null" => "1", "foreign" => ["inbox-queue" => "id"], "comment" => ""], + "uid" => ["type" => "mediumint unsigned", "not null" => "1", "default" => "0", "foreign" => ["user" => "uid"], "comment" => "User id"], + ], + "indexes" => [ + "PRIMARY" => ["queue-id", "uid"], + "uid" => ["uid"], + ] + ], "inbox-status" => [ "comment" => "Status of ActivityPub inboxes", "fields" => [